Flink运行时之客户端提交作业图-上

客户端提交作业图

作业图(JobGraph)是Flink的运行时所能理解的作业表示,无论程序通过是DataStream还是DataSet
API编写的,它们的JobGraph提交给JobManager以及之后的处理都将得到统一。本篇我们将分析客户端如何提交JobGraph给JobManager。

流处理程序提交作业图

在前面讲解Flink的核心概念的时候我们谈到了Flink利用了“惰性求值”的概念,只有当最终调用execute方法时,才会真正开始执行。因此,execute方法是我们的切入点。

DataStream API所编写的程序生成作业图之后,在提交时产生的方法调用时序图示意如下:

上图中的多个run方法是同名的方法重载。

从时序图中可以看到,ClusterClient对其自身抽象方法submitJob的调用是触发作业图提交的方法。随后真正的提交逻辑由JobClient实现。

ClusterClient封装了提交一个程序到远程集群的必要的功能,而StandaloneClusterClient则扩展了ClusterClient的功能,它专门针对独立的集群提供服务,这两个类都位于flink-clients模块中。JobClient则负责将用户的Job提交给JobManager,它充当了提交代理的角色,并返回表示作业执行结果的JobExecutionResult对象。

JobClient是提交所有类型的Job的统一入口,具体的提交细节我们将会在“公共的提交流程”中详细分析。

批处理程序提交作业图

利用DataSet API所编写的程序生成作业图之后,在提交时产生的方法调用的时序图如下:

上图中出现多个重名的run方法为同名方法重载。

从上图中可以看到,批处理程序的JobGraph跟流处理程序的JobGraph在提交之前有非常明显的不同。它引入了PlanExecutor作为Flink程序的计划执行器。而RemoteExecutor是PlanExecutor的实现,用于将程序提交给远程的Flink集群。具体的提交动作被进一步委托给ClusterClient及其实现(StandaloneClusterClient)最终同样被JobClient代理提交给JobManager。

公共的提交流程

从前面的时序图可见Flink对于不同类型的程序的提交流程最终是殊途同归的。因此,接下来我们将对公共的提交流程进行分析。一个程序的JobGraph真正被提交始于对JobClient的submitJobAndWait方法的调用。

submitJobAndWait方法用于将一个JobGraph发送到指定的JobClient
actor,随后它会将该JobGraph转发给JobManager。该方法会一直阻塞,直到该作业执行完成或者感知不到JobManager的存活。如果作业被顺利执行完成则返回JobExecutionResult对象而如果JobManager产生故障,则抛出抛出JobExecutionException异常。

一个JobGraph从提交开始会经过多个对象层层递交,各个对象之间的交互关系如下图所示:

JobClient在其中起到了“桥接”作用,它桥接了同步的方法调用和异步的消息通信。更具体得说,JobClient可以看做是一个“静态类”提供了一些静态方法,这里我们主要关注上面的submitJobAndWait方法,该方法内部封装了Actor之间的异步通信(具体的通信对象是JobClientActor,它负责跟JobManager的ActorSystem的Actor对象进行通信),并以阻塞的形式返回结果。而ClusterClient只需调用JobClient的submitJobAndWait方法,而无需关注其内部是如何实现的。

通过调用JobClient的submitJobAndWait静态方法,会触发基于Akka的Actor之间的消息通信来完成后续的提交JobGraph的动作。这之间的交互示意图如下:

这里总共有两个ActorSystem,一个归属于JobClient,另一个归属于JobManager。在submitJobAndWait方法中,其首先会创建一个JobClientActor的ActorRef:

ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);

然后向其发起一个SubmitJobAndWait消息,该消息将JobGraph的实例提交给JobClientActor。发起模式是ask,它表示需要一个应答消息。

Akka的消息通信模型有两种:

  1. Fire and
    forget:消息的生产者不期望从消息的消费者那里得到应答。这种消息会以异步的形式发送,发送方法在发送完成之后立即返回。Akka的actor使用tell方法发送这种消息。
  2. Send and
    receive:消息的生产者期待并将等待从消费者那里得到应答。这种消息也会以异步的形式发送,发送完成后会返回一个Future对象,该对象表示一个潜在的应答。Akka的actor使用ask方法发送这种消息,并通过Future来获取响应。

JobClient向JobClientActor发送消息的代码段如下:

Future<Object> future = Patterns.ask(jobClientActor,
                                     new JobClientMessages.SubmitJobAndWait(jobGraph),
                                     new Timeout(AkkaUtils.INF_TIMEOUT()));
answer = Await.result(future, AkkaUtils.INF_TIMEOUT());

该SubmitJobAndWait消息被JobClientActor接收后,最终通过调用tryToSubmitJob方法触发真正的提交动作。在tryToSubmitJob方法中,一个JobGraph的提交将会分为两步:

  1. 将用户程序相关的Jar包上传至JobManager;
  2. 给JobManager Actor发送封装JobGraph的SubmitJob消息;

随后,JobManager
Actor会接收到来自JobClientActor的SubmitJob消息,进而触发submitJob方法,该方法的执行主体已经是JobManager了。submitJob包含的逻辑较为复杂,且任何一个检测或者子调用所产生的异常都可能会导致提交失败。我们列举一下该方法完成的主要任务:

  1. 向BlobLibraryCacheManager注册该Job;
  2. 构建ExecutionGraph对象;
  3. 对JobGraph中的每个顶点进行初始化;
  4. 将DAG拓扑中从source开始排序,排序后的顶点集合附加到ExecutionGraph对象;
  5. 获取检查点相关的配置,并将其设置到ExecutionGraph对象;
  6. 向ExecutionGraph注册相关的listener;
  7. 执行恢复操作或者将JobGraph信息写入SubmittedJobGraphStore以在后续用于恢复目的;
  8. 响应给客户端JobSubmitSuccess消息;
  9. 对ExecutionGraph对象进行调度执行;

如果提交流程顺利,用户程序包以及描述Job的JobGraph将会被JobManager接收,随后JobManager会对Job进行调度、部署并执行。JobClient会阻塞等待提交结果返回。在得到返回结果之后,先进行解析判断它是否是Job被成功执行后返回的结果:

if (answer instanceof JobManagerMessages.JobResultSuccess) {
    SerializedJobExecutionResult result = ((JobManagerMessages.JobResultSuccess) answer).result();
    if (result != null) {
        try {
            return result.toJobExecutionResult(classLoader);
         } catch (Throwable t) {
             throw new JobExecutionException(jobGraph.getJobID(),
             "Job was successfully executed but JobExecutionResult could not be deserialized.");
        }
    } else {
         throw new JobExecutionException(jobGraph.getJobID(),
         "Job was successfully executed but result contained a null JobExecutionResult.");
    }
}

还是失败后返回的结果:

if (answer instanceof JobManagerMessages.JobResultFailure) {
    LOG.info("Job execution failed");
    SerializedThrowable serThrowable = ((JobManagerMessages.JobResultFailure) answer).cause();
    if (serThrowable != null) {
        Throwable cause = serThrowable.deserializeError(classLoader);
        if (cause instanceof JobExecutionException) {
            throw (JobExecutionException) cause;
        } else {
            throw new JobExecutionException(jobGraph.getJobID(), "Job execution failed", cause);
        }
    } else {
        throw new JobExecutionException(jobGraph.getJobID(),
        "Job execution failed with null as failure cause.");
    }
} else {
    throw new JobExecutionException(jobGraph.getJobID(),
        "Unknown answer from JobManager after submitting the job: " + answer);
}

以上就是批处理作业和流处理作业共同的提交流程,这中间涉及了JobManager接收到用户提交后一系列处理,这部分的处理细节我们随后进行分析。

原文发布时间为:2017-03-31

本文作者:vinoYang

本文来自合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

时间: 2024-09-28 23:42:21

Flink运行时之客户端提交作业图-上的相关文章

Flink运行时之客户端提交作业图-下

submitJob方法分析 JobClientActor通过向JobManager的Actor发送SubmitJob消息来提交Job,JobManager接收到消息对象之后,构建一个JobInfo对象以封装Job的基本信息,然后将这两个对象传递给submitJob方法: 我们会以submitJob的关键方法调用来串讲其主要逻辑.首先判断jobGraph参数,如果为空则直接回应JobResultFailure消息: 接着,向类库缓存管理器注册该Job相关的库文件.类路径: 必须确保该步骤率先成功执

Flink运行时之基于Netty的网络通信上

概述 本文以及接下来的几篇文章将介绍Flink运行时TaskManager间进行数据交换的核心部分--基于Netty通信框架远程请求ResultSubpartition.作为系列文章的第一篇,先列出一些需要了解的基础对象. NettyConnectionManager Netty连接管理器(NettyConnectionManager)是连接管理器接口(ConnectionManager)针对基于Netty的远程连接管理的实现者.它是TaskManager中负责网络通信的网络环境对象(Netwo

Flink运行时之生成作业图

生成作业图 在分析完了流处理程序生成的流图(StreamGraph)以及批处理程序生成的优化后的计划(OptimizedPlan)之后,下一步就是生成它们面向Flink运行时执行引擎的共同抽象--作业图(JobGraph). 什么是作业图 作业图(JobGraph)是唯一被Flink的数据流引擎所识别的表述作业的数据结构,也正是这一共同的抽象体现了流处理和批处理在运行时的统一. 相比流图(StreamGraph)以及批处理优化计划(OptimizedPlan),JobGraph发生了一些变化,已

Flink运行时之合久必分的特定任务

合久必分的特定任务 前面我们谈到了TaskManager对每个Task实例会启动一个独立的线程来执行.在分析线程执行的核心代码时,我们看到最终执行的是AbstractInvokable这样执行体的invoke方法.所谓合久必分,鉴于流处理任务跟批处理任务执行模式上存在巨大的差异,在对AbstractInvokable的实现时,它们将会走向两个不同的分支. 流处理相关的任务 流处理所对应的任务的继承关系图如下: 从上面的继承关系图可见,StreamTask是流处理任务的抽象.因为在DataStre

Flink运行时之生产端结果分区

生产端结果分区 生产者结果分区是生产端任务所产生的结果.以一个简单的MapReduce程序为例,从静态的角度来看,生产端的算子(Map)跟消费端的算子(Reduce),两者之间交换数据通过中间结果集(IntermediateResult).形如下图: 而IntermediateResult只是在静态表述时的一种概念,在运行时,算子会被分布式部署.执行,我们假设两个算子的并行度都为2,那么对应的运行时模型如下图: 生产端的Map算子会产生两个子任务实例,它们各自都会产生结果分区(ResultPar

Flink运行时之流处理程序生成流图

流处理程序生成流图 DataStream API所编写的流处理应用程序在生成作业图(JobGraph)并提交给JobManager之前,会预先生成流图(StreamGraph). 什么是流图 流图(StreamGraph)是表示流处理程序拓扑的数据结构,它封装了生成作业图(JobGraph)的必要信息.它的类继承关系如下图所示: 当你基于StreamGraph的继承链向上追溯,会发现它实现了FlinkPlan接口. Flink效仿了传统的关系型数据库在执行SQL时生成执行计划并对其进行优化的思路

Flink运行时之结果分区消费端

结果分区消费端 在前一篇,我们讲解了生产者分区,生产者分区是生产者任务生产中间结果数据的过程.消费者任务在获得结果分区可用的通知之后,会发起对数据的请求.我们仍然以生产者分区的例子作为假设,其在消费端示意图如下: 可以看到在生产端和消费端存在对等的模型,具体ResultSubpartition中的数据如何被消费,我们将在本篇进行深入剖析. 输入网关 输入网关(InputGate)用于消费中间结果(IntermediateResult)在并行执行时由子任务生产的一个或多个结果分区(ResultPa

Flink运行时之统一的数据交换对象

统一的数据交换对象 在Flink的执行引擎中,流动的元素主要有两种:缓冲(Buffer)和事件(Event).Buffer主要针对用户数据交换,而Event则用于一些特殊的控制标识.但在实现时,为了在通信层统一数据交换,Flink提供了数据交换对象--BufferOrEvent.它是一个既可以表示Buffer又可以表示Event的类.上层使用者只需调用isBuffer和isEvent方法即可判断当前收到的这条数据是Buffer还是Event. 缓冲 缓冲(Buffer)是数据交换的载体,几乎所有

Flink运行时之基于Netty的网络通信中

PartitionRequestClient 分区请求客户端(PartitionRequestClient)用于发起远程PartitionRequest请求,它也是RemoteChannel跟Netty通信层之间进行衔接的对象. 对单一的TaskManager而言只存在一个NettyClient实例.但处于同一TaskManager中不同的任务实例可能会跟不同的远程TaskManager上的任务之间交换数据,不同的TaskManager实例会有不同的ConnectionID(用于标识不同的IP地