Flink - InstanceManager

InstanceManager manages the TaskManager and slot resources that the JobManager has acquired.

/**
 * Simple manager that keeps track of which TaskManager are available and alive.
 */
public class InstanceManager {

    // ------------------------------------------------------------------------
    // Fields
    // ------------------------------------------------------------------------

    // Instances are indexed by both InstanceID and ResourceID
    /** Set of hosts known to run a task manager that are thus able to execute tasks (by ID). */
    private final Map<InstanceID, Instance> registeredHostsById;
    /** Set of hosts known to run a task manager that are thus able to execute tasks (by ResourceID). */
    private final Map<ResourceID, Instance> registeredHostsByResource;

    /** Set of hosts that were present once and have died */
    private final Set<ResourceID> deadHosts;

    /** Listeners that want to be notified about availability and disappearance of instances */
    private final List<InstanceListener> instanceListeners = new ArrayList<>(); // who to notify when Instance resources change, e.g. the Scheduler

    /** The total number of task slots that the system has */
    private int totalNumberOfAliveTaskSlots;

    // ... (remaining fields and methods omitted)
}

 

The key operation:

registerTaskManager

/**
 * Registers a task manager. Registration of a task manager makes it available to be used
 * for the job execution.
 *
 * @param taskManagerGateway gateway to the task manager
 * @param taskManagerLocation Location info of the TaskManager
 * @param resources Hardware description of the TaskManager
 * @param numberOfSlots Number of available slots on the TaskManager
 * @return The assigned InstanceID of the registered task manager
 */
public InstanceID registerTaskManager(
        TaskManagerGateway taskManagerGateway,
        TaskManagerLocation taskManagerLocation,
        HardwareDescription resources,
        int numberOfSlots) {

    synchronized (this.lock) {
        InstanceID instanceID = new InstanceID();

        Instance host = new Instance( // create a new Instance
            taskManagerGateway,
            taskManagerLocation,
            instanceID,
            resources,
            numberOfSlots);

        registeredHostsById.put(instanceID, host); //register
        registeredHostsByResource.put(taskManagerLocation.getResourceID(), host);

        totalNumberOfAliveTaskSlots += numberOfSlots;

        host.reportHeartBeat();

        // notify all listeners (for example the scheduler)
        notifyNewInstance(host);

        return instanceID;
    }
}
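The bookkeeping in registerTaskManager can be sketched in isolation. The following is a minimal, simplified stand-in and not Flink code: the names DualIndexRegistry, Host, unregister, and isKnownResource are hypothetical, UUID and String replace InstanceID and ResourceID, and the unregister path is an assumption about how the two indexes would be kept consistent, since the excerpt above shows only registration. The sketch also does not guard against registering the same resource ID twice.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical, simplified stand-in for InstanceManager's bookkeeping; not Flink code.
class DualIndexRegistry {

    /** Minimal stand-in for Instance: only a resource ID and a slot count. */
    static final class Host {
        final String resourceId;
        final int numberOfSlots;

        Host(String resourceId, int numberOfSlots) {
            this.resourceId = resourceId;
            this.numberOfSlots = numberOfSlots;
        }
    }

    private final Object lock = new Object();
    private final Map<UUID, Host> hostsById = new HashMap<>();
    private final Map<String, Host> hostsByResource = new HashMap<>();
    private int totalSlots;

    /** Registers a host under a freshly generated ID and under its resource ID. */
    UUID register(String resourceId, int numberOfSlots) {
        synchronized (lock) {
            UUID id = UUID.randomUUID();
            Host host = new Host(resourceId, numberOfSlots);
            hostsById.put(id, host);
            hostsByResource.put(resourceId, host);
            totalSlots += numberOfSlots;
            return id;
        }
    }

    /** Removes a host from both indexes; returns false if the ID is unknown. */
    boolean unregister(UUID id) {
        synchronized (lock) {
            Host host = hostsById.remove(id);
            if (host == null) {
                return false;
            }
            hostsByResource.remove(host.resourceId);
            totalSlots -= host.numberOfSlots;
            return true;
        }
    }

    int totalSlots() {
        synchronized (lock) {
            return totalSlots;
        }
    }

    boolean isKnownResource(String resourceId) {
        synchronized (lock) {
            return hostsByResource.containsKey(resourceId);
        }
    }
}
```

Updating both maps and the slot counter under one lock means a reader never sees a host in one index but not the other.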

Here, notifyNewInstance is implemented as follows:

private void notifyNewInstance(Instance instance) {
    synchronized (this.instanceListeners) {
        for (InstanceListener listener : this.instanceListeners) {
            try {
                listener.newInstanceAvailable(instance); // call the listener's newInstanceAvailable
            }
            catch (Throwable t) {
                LOG.error("Notification of new instance availability failed.", t);
            }
        }
    }
}
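The notable detail in notifyNewInstance is that each listener call is wrapped in its own try/catch, so one failing listener does not prevent the others from being notified. A minimal sketch of that pattern follows; ListenerNotifier and its Listener interface are hypothetical names used only for illustration, a String stands in for Instance, and the return value (a count of successful notifications) is an addition that the Flink method does not have.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of per-listener failure isolation; not Flink code.
class ListenerNotifier {

    /** Hypothetical listener interface, loosely modeled on InstanceListener. */
    interface Listener {
        void newInstanceAvailable(String instanceName);
    }

    private final List<Listener> listeners = new ArrayList<>();

    void addListener(Listener listener) {
        synchronized (listeners) {
            listeners.add(listener);
        }
    }

    /** Notifies every listener and returns how many completed without throwing. */
    int notifyNewInstance(String instanceName) {
        int succeeded = 0;
        synchronized (listeners) {
            for (Listener listener : listeners) {
                try {
                    listener.newInstanceAvailable(instanceName);
                    succeeded++;
                } catch (Throwable t) {
                    // A failing listener is reported but does not stop the loop.
                    System.err.println("Notification of new instance failed: " + t);
                }
            }
        }
        return succeeded;
    }
}
```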

 

Instance

As the Javadoc says, an Instance is an abstraction: it describes a TaskManager that has registered with the JobManager and is ready to accept work.

/**
 * An instance represents a {@link org.apache.flink.runtime.taskmanager.TaskManager}
 * registered at a JobManager and ready to receive work.
 */
public class Instance implements SlotOwner {

    /** The instance gateway to communicate with the instance */
    private final TaskManagerGateway taskManagerGateway;

    /** The instance connection information for the data transfer. */
    private final TaskManagerLocation location;

    /** A description of the resources of the task manager */
    private final HardwareDescription resources;

    /** The ID identifying the taskManager. */
    private final InstanceID instanceId;

    /** The number of task slots available on the node */
    private final int numberOfSlots;

    /** A list of available slot positions */
    private final Queue<Integer> availableSlots; // note that this stores slot positions, not slots, because a slot object is only created when it is allocated

    /** Allocated slots on this taskManager */
    private final Set<Slot> allocatedSlots = new HashSet<Slot>();

    /** A listener to be notified upon new slot availability */
    private SlotAvailabilityListener slotAvailabilityListener;  // listener to notify when slot availability changes

    /** Time when the last heartbeat was received from the task manager running on this taskManager. */
    private volatile long lastReceivedHeartBeat = System.currentTimeMillis();

    // ... (remaining fields and methods omitted)
}

The core operations:

Allocating a slot

/**
 * Allocates a simple slot on this TaskManager instance. This method returns {@code null}, if no slot
 * is available at the moment.
 *
 * @param jobID The ID of the job that the slot is allocated for.
 *
 * @return A simple slot that represents a task slot on this TaskManager instance, or null, if the
 *         TaskManager instance has no more slots available.
 *
 * @throws InstanceDiedException Thrown if the instance is no longer alive by the time the
 *                               slot is allocated.
 */
public SimpleSlot allocateSimpleSlot(JobID jobID) throws InstanceDiedException {

    synchronized (instanceLock) {
        Integer nextSlot = availableSlots.poll(); // check whether a slot position is available
        if (nextSlot == null) {
            return null;
        }
        else {
            SimpleSlot slot = new SimpleSlot(jobID, this, location, nextSlot, taskManagerGateway);
            allocatedSlots.add(slot);
            return slot;
        }
    }
}
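The allocation logic relies on a queue of free slot positions rather than on pre-built slot objects. A minimal sketch of that idea follows, assuming the positions are numbered 0 to numberOfSlots - 1; SlotPositionPool and its methods are hypothetical names, not part of Flink.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical illustration of a free-position queue; not Flink code.
class SlotPositionPool {

    private final Queue<Integer> availablePositions = new ArrayDeque<>();

    /** Fills the queue with positions 0 .. numberOfSlots - 1 (an assumption of this sketch). */
    SlotPositionPool(int numberOfSlots) {
        for (int i = 0; i < numberOfSlots; i++) {
            availablePositions.add(i);
        }
    }

    /** Returns the next free position, or null if every position is in use. */
    synchronized Integer allocate() {
        return availablePositions.poll();
    }

    /** Puts a position back at the end of the queue. */
    synchronized void release(int position) {
        availablePositions.add(position);
    }
}
```

Because Queue.poll() returns null on an empty queue instead of throwing, "no slot available" becomes an ordinary return value, matching the null contract documented on allocateSimpleSlot.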

 

Returning a slot

/**
 * Returns a slot that has been allocated from this instance. The slot needs to have been canceled
 * prior to calling this method.
 *
 * <p>The method will transition the slot to the "released" state. If the slot is already in state
 * "released", this method will do nothing.</p>
 *
 * @param slot The slot to return.
 * @return True, if the slot was returned, false if not.
 */
@Override
public boolean returnAllocatedSlot(Slot slot) {

    if (slot.markReleased()) {
        LOG.debug("Return allocated slot {}.", slot);
        synchronized (instanceLock) {

            if (this.allocatedSlots.remove(slot)) {
                this.availableSlots.add(slot.getSlotNumber());

                if (this.slotAvailabilityListener != null) {
                    this.slotAvailabilityListener.newSlotAvailable(this); // notify that a slot is available again
                }

                return true;
            }
        }
    }

    // The slot was already released, or it is not in allocatedSlots.
    return false;
}
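The return path combines three things: an idempotent "mark released" step, moving the position back to the free queue, and a callback to whoever is waiting for capacity. The sketch below illustrates that combination under stated assumptions: ReturnableSlotPool and its nested Slot are hypothetical, the released flag is modeled with an AtomicBoolean (the excerpt does not show how Flink's Slot implements markReleased), and a plain Runnable stands in for SlotAvailabilityListener.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of an idempotent slot return; not Flink code.
class ReturnableSlotPool {

    static final class Slot {
        final int position;
        private final AtomicBoolean released = new AtomicBoolean(false);

        Slot(int position) {
            this.position = position;
        }

        /** Returns true only for the first caller; later calls return false. */
        boolean markReleased() {
            return released.compareAndSet(false, true);
        }
    }

    private final Object lock = new Object();
    private final Queue<Integer> available = new ArrayDeque<>();
    private final Set<Slot> allocated = new HashSet<>();
    private final Runnable onSlotAvailable; // may be null, like the listener in Instance

    ReturnableSlotPool(int numberOfSlots, Runnable onSlotAvailable) {
        for (int i = 0; i < numberOfSlots; i++) {
            available.add(i);
        }
        this.onSlotAvailable = onSlotAvailable;
    }

    /** Creates a slot object on demand for the next free position, or returns null. */
    Slot allocate() {
        synchronized (lock) {
            Integer position = available.poll();
            if (position == null) {
                return null;
            }
            Slot slot = new Slot(position);
            allocated.add(slot);
            return slot;
        }
    }

    /** Returns true if the slot was handed back by this call, false otherwise. */
    boolean returnSlot(Slot slot) {
        if (!slot.markReleased()) {
            return false; // already released earlier
        }
        synchronized (lock) {
            if (!allocated.remove(slot)) {
                return false; // not allocated from this pool
            }
            available.add(slot.position);
            if (onSlotAvailable != null) {
                onSlotAvailable.run();
            }
            return true;
        }
    }
}
```

Checking markReleased before taking the lock means a second return of the same slot is rejected without touching the shared collections and without firing the callback again.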

Time: 2024-08-16 08:46:22
