Flink运行时之TaskManager执行Task

TaskManager执行任务

当一个任务被JobManager部署到TaskManager之后,它将会被执行。本篇我们将分析任务的执行细节。

submitTask方法分析

一个任务实例被部署所产生的实际影响就是JobManager会将一个TaskDeploymentDescriptor对象封装在SubmitTask消息中发送给TaskManager。而处理该消息的入口方法是submitTask方法,它是TaskManager接收任务部署并启动任务执行的入口方法,值得我们关注一下它的实现细节。

submitTask方法中的第一个关键点是它先构建一个Task对象:

val task = new Task(
    tdd,
    memoryManager,
    ioManager,
    network,
    bcVarManager,
    selfGateway,
    jobManagerGateway,
    config.timeout,
    libCache,
    fileCache,
    runtimeInfo,
    taskMetricGroup)

该Task封装了其在TaskManager中执行时需要的一些关键对象。task对象将会被加入TaskManager中的一个ExecutionAttemptID与Task的Map中,如果发现该ExecutionAttemptID所对应的Task对象已存在于Map中,则将原先的Task实例重新放回到Map中,同时抛出异常:

val execId = tdd.getExecutionId
val prevTask = runningTasks.put(execId, task)
if (prevTask != null) {
    runningTasks.put(execId, prevTask)
    throw new IllegalStateException("TaskManager already contains a task for id " + execId)
}

如果一切正常,接下来就启动线程并执行任务,接着发送应答消息进行回复:

task.startTaskThread()
sender ! decorateMessage(Acknowledge)

submitTask方法比起JobManager的submitJob方法,逻辑和代码量都相对简单。我们会进一步分析两个过程:

  1. Task对象的构造方法
  2. Task作为一个线程,其run方法的实现

首先关注的是Task的构造方法,Task作为TaskManager的启动对象,其需要的参数基本都跟其执行有关,参数如下:

public Task(TaskDeploymentDescriptor tdd,            //任务描述符
    MemoryManager memManager,                        //内存管理器
    IOManager ioManager,                             //IO管理器
    NetworkEnvironment networkEnvironment,           //网络环境对象,处理网络请求
    BroadcastVariableManager bcVarManager,           //广播变量管理器
    ActorGateway taskManagerActor,                   //TaskManager对应的actor通信网关
    ActorGateway jobManagerActor,                    //JobManager对应的actor通信网关
    FiniteDuration actorAskTimeout,                  //actor响应超时时间
    LibraryCacheManager libraryCache,                //用户程序的Jar、类库缓存
    FileCache fileCache,                             //用户定义的文件缓存,执行时需要
    TaskManagerRuntimeInfo taskManagerConfig         //TaskManager运行时配置
)

构造方法的第一段代码是将TaskDeploymentDescriptor封装的大量信息“转交”给Task对象。

接下来会根据结果分区部署描述符ResultPartitionDeploymentDescriptor和输入网关部署描述符InputGateDeploymentDescriptor来初始化结果分区以及输入网关,其中结果分区是当前的task实例产生的,而输入网关是用来从网络上消费前一个任务的结果分区。首先看一下结果分区的初始化:

this.producedPartitions = new ResultPartition[partitions.size()];
this.writers = new ResultPartitionWriter[partitions.size()];
for (int i = 0; i < this.producedPartitions.length; i++) {
    ResultPartitionDeploymentDescriptor desc = partitions.get(i);
    ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), executionId);
    this.producedPartitions[i] = new ResultPartition(
        taskNameWithSubtaskAndId,
        jobId,
        partitionId,
        desc.getPartitionType(),
        desc.getEagerlyDeployConsumers(),
        desc.getNumberOfSubpartitions(),
        networkEnvironment.getPartitionManager(),
        networkEnvironment.getPartitionConsumableNotifier(),
        ioManager,
        networkEnvironment.getDefaultIOMode());   

    this.writers[i] = new ResultPartitionWriter(this.producedPartitions[i]);
}

以上代码主要的逻辑是循环初始化结果分区对象数组producedPartitions以及结果分区写入器数组writers。结果分区对象初始化时,会根据ResultPartitionType的类型来判断是创建阻塞式的子分区还是创建管道式的子分区,这涉及到数据传输的方式。ResultPartitionWriter是面向结果分区的运行时结果写入器对象。

下面的代码用于输入网关的初始化:

this.inputGates = new SingleInputGate[consumedPartitions.size()];
this.inputGatesById = new HashMap<IntermediateDataSetID, SingleInputGate>();
for (int i = 0; i < this.inputGates.length; i++) {
    SingleInputGate gate = SingleInputGate.create(
        taskNameWithSubtaskAndId, jobId, executionId, consumedPartitions.get(i), networkEnvironment);
    this.inputGates[i] = gate;   inputGatesById.put(gate.getConsumedResultId(), gate);
}

输入网关的初始化则是根据上游task产生的结果分区来进行挨个初始化。

最终它会为该任务的执行创建一个线程:

executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);

其实Task类实现了Runnable接口,它的实例本身就可以被线程执行,然后它又在内部实例化了一个线程对象并保存了执行它自身的线程引用,进而获得了对该线程的完全控制。比如,用startTaskThread方法来启动执行Task的线程。Task线程的执行细节,我们将会在接下来进行分析。

从这里我们也能看到,每个任务的部署会产生一个Task对象,而一个Task对象恰好对应一个执行它的线程实例。

Task线程的执行

Task实现了Runnable接口,那么毫无疑问其run方法承载了Task被执行的核心逻辑。而之前,我们将会分析Task线程的执行流程。

首先,第一步先对Task的执行状态进行转换:

while (true) {
    ExecutionState current = this.executionState;
    //如果当前的执行状态为CREATED,则对其应用CAS操作,将其设置为DEPLOYING状态,如果设置成功,将退出while无限循环
    if (current == ExecutionState.CREATED) {
        if (STATE_UPDATER.compareAndSet(this, ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
            // success, we can start our work
            break;
        }
    }
    //如果当前执行状态为FAILED,则发出最终状态的通知消息,并退出run方法的执行
    else if (current == ExecutionState.FAILED) {
        notifyFinalState();
        return;
    }
    //如果当前执行状态为CANCELING,则对其应用cas操作,并将其修改为CANCELED状态,如果修改成功则发出最终状态通知消息,
    //同时退出run方法的执行
    else if (current == ExecutionState.CANCELING) {
        if (STATE_UPDATER.compareAndSet(this, ExecutionState.CANCELING, ExecutionState.CANCELED)) {
            notifyFinalState();
            return;
        }
    }
    //如果当前的执行状态为其他状态,则抛出异常
    else {
        throw new IllegalStateException("Invalid state for beginning of task operation");
    }
}

接下来,是对用户代码所打成的jar包的加载并生成对应的类加载器,同时获取到程序的执行配置ExecutionConfig。根据类加载器以及用户的可执行体在Flink中所对应的具体的实现类名来加载该类:

invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);

Flink中所有类型的操作都有特定的可执行体,它们无一例外都是对AbstractInvokable类的扩展。每个的可执行体的名称在生产JobGraph时就已确定。

紧接着的一个关键操作就是向网络栈注册该任务对象:

network.registerTask(this);

这个操作是为了让Task之间可以基于网络互相进行数据交换,包含了分配网络缓冲、结果分区注册等一系列内部操作,并且有可能会由于系统无足够的内存而发生失败。

然后会把各种配置、管理对象都打包到Task在执行时的统一环境对象Environment中,并将该环境对象赋予可执行体:

invokable.setEnvironment(env);

在此之后,对于有状态的任务,如果它们的状态不为空,则会对这些有状态的任务进行状态初始化:

SerializedValue<StateHandle<?>> operatorState = this.operatorState;

if (operatorState != null) {
    if (invokable instanceof StatefulTask) {
    try {
        StateHandle<?> state = operatorState.deserializeValue(userCodeClassLoader);
        StatefulTask<?> op = (StatefulTask<?>) invokable;
        StateUtils.setOperatorState(op, state);
    }
    catch (Exception e) {
        throw new RuntimeException("Failed to deserialize state handle and "
            + " setup initial operator state.", e);
    }
    }
    else {
        throw new IllegalStateException("Found operator state for a non-stateful task invokable");
    }
}

通常什么情况下任务会有初始状态呢?当任务并不是首次运行,比如之前发生过失败从某个检查点恢复时会从检查点中获取当前任务的状态,在执行之前先进行初始化。

接下来,会将任务的执行状态变更为RUNNING,并向观察者以及TaskManager发送通知:

if (!STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
    throw new CancelTaskException();
}

notifyObservers(ExecutionState.RUNNING, null);
taskManager.tell(new UpdateTaskExecutionState(
    new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING)));

然后将执行线程的类加载器设置为用户代码的类加载器,然后调用可执行体的invoke方法,invoke方法实现了每个可执行体所要执行的核心逻辑。

executingThread.setContextClassLoader(userCodeClassLoader);
invokable.invoke();

invoke方法的执行是个分界点,在执行之前用户逻辑还没有被触发执行;而该方法被执行之后,说明用户逻辑已被执行完成。

然后对当前任务所生产的所有结果分区调用finish方法进行资源释放:

for (ResultPartition partition : producedPartitions) {
    if (partition != null) {
        partition.finish();
    }
}

最后将任务的执行状态修改为FINISHED,并发出通知:

if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED)) {
    notifyObservers(ExecutionState.FINISHED, null);
}
else {
    throw new CancelTaskException();
}

接下来在finally块里进行一系列资源释放操作。

最终的可执行体

Task是在TaskManager中执行任务的统一抽象,它的核心仍然是如何执行,而不是如何表述。比如,批处理任务和流处理任务,它们有很大的差别,但我们需要一种表述层面上的抽象,使得它们最终都能被Task所接收,然后得到执行。而该表述层面上的抽象即为AbstractInvokable。它是所有在TaskManager中真正被执行的主体。其类图如下:

AbstractInvokable定义了一系列的“上下文”对象,同时提供了核心两个方法:

  • invoke:该抽象方法是描述用户逻辑的核心方法,最终在Task线程中被执行的就是该方法;
  • cancel:取消执行用户逻辑的方法,提供了默认为空的实现,用户取消执行或者执行失败会触发该方法的调用;

跟Flink提供了流处理和批处理的API一致,AbstractInvokable也相应的具有两个派生类:

  • StreamTask:所有流处理任务的基类,实现位于flink-streaming-java模块中;
  • BatchTask:所有批处理任务的基类,实现位于runtime模块中;

无论是哪种形式的任务,在生成JobGraph阶段就已经被确定并加入到JobVertex中:

public void setInvokableClass(Class<? extends AbstractInvokable> invokable) {
    Preconditions.checkNotNull(invokable);
    this.invokableClassName = invokable.getName();
    this.isStoppable = StoppableTask.class.isAssignableFrom(invokable);
}

随后被一直携带到Task类中,并通过反射的机制从特定的类加载器中创建其实例,最终调用其invoke方法执行:

private AbstractInvokable loadAndInstantiateInvokable(ClassLoader classLoader, String className)
    throws Exception {
    Class<? extends AbstractInvokable> invokableClass;
    try {
        invokableClass = Class.forName(className, true, classLoader)
            .asSubclass(AbstractInvokable.class);
    }   catch (Throwable t) {
        throw new Exception("Could not load the task's invokable class.", t);
    }
    try {
        return invokableClass.newInstance();
    }   catch (Throwable t) {
        throw new Exception("Could not instantiate the task's invokable class.", t);
    }
}

关于更多用户逻辑的执行细节,我们后续会进行分析。

原文发布时间为:2017-01-24

本文作者:vinoYang

本文来自合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

时间: 2024-09-17 15:39:23

Flink运行时之TaskManager执行Task的相关文章

网络编程-inet_pton函数运行时,这个函数执行出错,难道我的函数调用有问题吗

问题描述 inet_pton函数运行时,这个函数执行出错,难道我的函数调用有问题吗 #include<sys/types.h> #include<sys/socket.h> #include<netinet/in.h> #include<stdio.h> #include<stdlib.h> #include<strings.h> void outputError(char *message) { fprintf(stdout, &q

当APP在后台运行时,接收到消息.有部分方法会执行两次.导致通知中显示收到的消息数是实际的两倍.

问题描述 当APP在后台运行时,接收到消息.有部分方法会执行两次.导致通知中显示收到的消息数是实际的两倍.求大神解答 解决方案 什么部分方法会执行两次,你是否监听了多次呢?解决方案二:我的也有这样的问题,求答案解决方案三:我也遇到了这个问题.我分析了一下,估计是下面的原因:测试手机使用的是小米,最新的SDK集成了小米的推送服务.导致在接受消息的时候,会接受小米的推送消息,然后环信的连接就连上了,环信又接受了一次.这就出现两条消息提示,但是实际只有一条消息.

使用并发与协调运行时

介绍 并发与协调运行时(Concurrency and Coordination Runtime,CCR)是一个.NET平台 上的异步信息传递类库,提供了一套细小而强大的基础功能,能够使用不同的方式来组织 应用程序.应用程序通过有效使用CCR可以获得更好的响应能力,以及更好的伸缩性及容 错性.而它最神奇的地方则在于,开发人员获得这些便利的同时,还减少(甚至完全消除 )了对线程.锁.互斥体(mutex)或其他同步元素的直接操作(或捕获错误). 如果您的应用程序是单线程的,CCR可以使您的程序提高响

大型Web应用运行时 PHP负载均衡指南

过去当运行一个大的web应用时候意味着需要运行一个大型的web服务器.因为你的应用吸引了大量的用户,你将不得不在你的服务器里增加更多的内存和处理器.今天,"大型服务器"模式已经过去,取而代之的是大量的小服务器,使用各种各样的负载均衡技术. "更多小服务器"的优势超过过去的"大型服务器"模式体现在两个方面: 1. 如果服务器宕机,那么负载均衡系统将停止请求到宕机的服务器,转而分发负载到其他正常运行的服务器上. 2. 扩展你的服务器更加容易.你要做的

.NET基础知识-公共语言运行时

   .NET提供了一个运行环境,叫做公共语言运行时(CLR).CLR管理代码的执行并使开发过程变得更加简单.CLR是一种受控的执行环境,其功能通过编译器与其他工具共同展现.以"运行时"为目标的代码称为受控代码(Managed Code).受控代码指向的对象在执行过程中完全被CLR所控制.在执行过程中,CLR提供自动内存管理.调试支持.增强的安全性及与受控代码(如COM组件)的互操作性.凡是符合CLS(公共语言规范)的程序语言(如C#和Visual Basic.NET等)所开发的程序都

WF从入门到精通(第二章):workflow运行时

学习完本章后,你将掌握: 1.在你的应用程序中使用workflow 2.理解"WorkflowRuntime"对象的的基本功能 3.知道如何启动和停止workflow运行时 4.能够使用各种workflow运行时的相关事件 当你在WF环境中执行任务时,需要一些东西来监管执行的过程,这个东西就是命名为"WorkflowRuntime"的对象.WorkflowRuntime启动一个独立的工作流任务.在你的任务执行过程中,WorkflowRuntime也会针对不同的情况响

.net中如何得到实际运行时的asm代码

在对.net程序进行调试或者性能测试时,常常需要查看生成的IL代码,但仅仅有IL代码还是不够的,有时我们还希望查看CLR生成的最终asm代码.在VS里,可以非常方便的查看最终的asm代码:当程序执行到断点时,在代码窗口右键选择Go To Disassemble就可以.但是,当通过VS Debug程序时,为了方便调试,CLR通常不会生成最优化的代码.所以为了得到实际运行时的asm代码,还必须做以下设置: 1,在Release模式下编译代码: 2. 打开工程属性窗口,选择"Build"页面

iOS运行时(Runtime)总结

声明 本博客中文章不会在此处再更新,只会在微信公众号中更新,请关注微信公众号,以获取最新的学习资源和更多学习资源.本博文末尾有微信公众号二维码,扫一扫添加关注. 原文出自:微信公众号iOSDevShares的文章 引言 相信很多同学都听过运行时,但是我相信还是有很多同学不了解什么是运行时,到底在项目开发中怎么用?什么时候适合使用?想想我们的项目中,到底在哪里使用过运行时呢?还能想起来吗?另外,在面试的时候,是否经常有笔试中要求运用运行时或者在面试时面试官会问是否使用过运行时,又是如何使用的? 回

面向Android上Dalvik运行时的C# 编译器dot42简介

Mono for Android最大的缺点是需要在Mono上面构建,这与Android预期的运行时完全不同.尽管能够直接访问完整的CLR的确有些优势,但是它与Android的Dalvik 运行时之间的封送调用(marshalling call)可能非常昂贵.那为什么不跳过IL代码直接生成Dex代码呢? 事实上这有点夸张.dot42编译器实际上并没有跳过IL.恰恰相反,它读取IL代码并将其转换为一种叫做RL或Register Language的新语言.IL和RL主要的差异在于IL是基于栈的(有点像