 * Simple manager that keeps track of which TaskManager are available and alive.
public class InstanceManager {

    // ------------------------------------------------------------------------
    // Fields
    // ------------------------------------------------------------------------

    /** Set of hosts known to run a task manager that are thus able to execute tasks (by ID). */
    private final Map<InstanceID, Instance> registeredHostsById;
    /** Set of hosts known to run a task manager that are thus able to execute tasks (by ResourceID). */
    private final Map<ResourceID, Instance> registeredHostsByResource;

    /** Set of hosts that were present once and have died */
    private final Set<ResourceID> deadHosts;

    /** Listeners that want to be notified about availability and disappearance of instances */
    private final List<InstanceListener> instanceListeners = new ArrayList<>(); //Instance资源发生变化时,需要通知谁,如Scheduler

    /** The total number of task slots that the system has */
    private int totalNumberOfAliveTaskSlots;




 * Registers a task manager. Registration of a task manager makes it available to be used
 * for the job execution.
 * @param taskManagerGateway gateway to the task manager
 * @param taskManagerLocation Location info of the TaskManager
 * @param resources Hardware description of the TaskManager
 * @param numberOfSlots Number of available slots on the TaskManager
 * @return The assigned InstanceID of the registered task manager
public InstanceID registerTaskManager(
        TaskManagerGateway taskManagerGateway,
        TaskManagerLocation taskManagerLocation,
        HardwareDescription resources,
        int numberOfSlots) {

    synchronized (this.lock) {
        InstanceID instanceID = new InstanceID();

        Instance host = new Instance( //创建新的instance

        registeredHostsById.put(instanceID, host); //register
        registeredHostsByResource.put(taskManagerLocation.getResourceID(), host);

        totalNumberOfAliveTaskSlots += numberOfSlots;


        // notify all listeners (for example the scheduler)

        return instanceID;


private void notifyNewInstance(Instance instance) {
    synchronized (this.instanceListeners) {
        for (InstanceListener listener : this.instanceListeners) {
            try {
                listener.newInstanceAvailable(instance); //调用listener的newInstanceAvailable
            catch (Throwable t) {
                LOG.error("Notification of new instance availability failed.", t);





 * An instance represents a {@link org.apache.flink.runtime.taskmanager.TaskManager}
 * registered at a JobManager and ready to receive work.
public class Instance implements SlotOwner {

    /** The instance gateway to communicate with the instance */
    private final TaskManagerGateway taskManagerGateway;

    /** The instance connection information for the data transfer. */
    private final TaskManagerLocation location;

    /** A description of the resources of the task manager */
    private final HardwareDescription resources;

    /** The ID identifying the taskManager. */
    private final InstanceID instanceId;

    /** The number of task slots available on the node */
    private final int numberOfSlots;

    /** A list of available slot positions */
    private final Queue<Integer> availableSlots; //注意这里记录的不是slot,而是position,因为slot是在用的时候创建的

    /** Allocated slots on this taskManager */
    private final Set<Slot> allocatedSlots = new HashSet<Slot>();

    /** A listener to be notified upon new slot availability */
    private SlotAvailabilityListener slotAvailabilityListener;  //listener用于通知当slot状态发生变化

    /** Time when last heat beat has been received from the task manager running on this taskManager. */
    private volatile long lastReceivedHeartBeat = System.currentTimeMillis();



 * Allocates a simple slot on this TaskManager instance. This method returns {@code null}, if no slot
 * is available at the moment.
 * @param jobID The ID of the job that the slot is allocated for.
 * @return A simple slot that represents a task slot on this TaskManager instance, or null, if the
 *         TaskManager instance has no more slots available.
 * @throws InstanceDiedException Thrown if the instance is no longer alive by the time the
 *                               slot is allocated.
public SimpleSlot allocateSimpleSlot(JobID jobID) throws InstanceDiedException {

    synchronized (instanceLock) {
        Integer nextSlot = availableSlots.poll(); //看看有没有available的slot position
        if (nextSlot == null) {
            return null;
        else {
            SimpleSlot slot = new SimpleSlot(jobID, this, location, nextSlot, taskManagerGateway);
            return slot;



 * Returns a slot that has been allocated from this instance. The slot needs have been canceled
 * prior to calling this method.
 * <p>The method will transition the slot to the "released" state. If the slot is already in state
 * "released", this method will do nothing.</p>
 * @param slot The slot to return.
 * @return True, if the slot was returned, false if not.
public boolean returnAllocatedSlot(Slot slot) {

    if (slot.markReleased()) {
        LOG.debug("Return allocated slot {}.", slot);
        synchronized (instanceLock) {

            if (this.allocatedSlots.remove(slot)) {

                if (this.slotAvailabilityListener != null) {
                    this.slotAvailabilityListener.newSlotAvailable(this); //通知有个slot可以用

                return true;

