Flink中task之间的数据交换机制

Flink中的数据交换构建在如下两条设计原则之上:

  • 数据交换的控制流(例如,为实例化交换而进行的消息传输)是接收端初始化的,这非常像最初的MapReduce。
  • 数据交换的数据流(例如,在网络上最终传输的数据)被抽象成一个叫做IntermediateResult的概念,它是可插拔的。这意味着系统基于相同的实现逻辑可以既支持流数据,又支持批处理数据的传输。

数据传输包含多个对象,它们是:

  • JobManager master节点,用于响应任务调度、恢复、协作,以及通过ExecutionGraph数据结构来hold住job的整个图结构。
  • TaskManager worker节点,一个TaskManager(TM)在多线程中并发执行多个task。每一个TM也包含一个CommunicationManager(CM - 任务之间共享),以及一个MemoryManager(MM - 也在任务之间共享)。TM之间彼此可以进行数据交换通过标准的TCP连接,这些连接在需要通信时被创建。

注意,在Flink中,是TaskManager而不是task在网络上交换数据。比如,处于同一个TM内的task,他们之间的数据交换是在一个网络连接(TaskManager创建并维护)上基于多路复用的。

ExecutionGraph: 执行图是一个包含job计算的“ground truth”的数据结构。它包含节点(ExecutionVertex,表示计算任务),以及中间结果(IntermediateResultPartition,表示任务产生的数据)。节点通过ExecutionEdge(EE)来连接到它们要消费的中间结果:

这些都是存活在JobManager中的逻辑数据结构。它们在TaskManager中存在运行时等价的数据结构,用来应对最终的数据处理。在运行时,IntermediateResultPartition的等价数据结构被称为ResultPartition。

ResultPartition(RP)表示BufferWriter写入的data chunk。一个RP是ResultSubpartition(RS)的集合。这是为了区别被不同接收者定义的数据,例如针对一个reduce或一个join的分区shuffle的场景。

ResultSubpartition(RS)表示一个operator创建的数据的一个分区,跟要传输的数据逻辑一起传输给接收operator。RS的特定的实现决定了最终的数据传输逻辑,它被设计为插件化的机制来满足系统各种各样的数据传输需求。例如,PipelinedSubpartition就是一种支持流数据交换的pipeline的实现。而SpillableSubpartition是一个支持批处理的块数据实现。

InputGate: 在接收端,逻辑上等价于RP。它用于处理并收集来自上游的buffer中的数据。

InputChannel: 在接收端,逻辑上等价于RS。用于接收某个特定的分区的数据。

Buffer: 参见memory-management

序列化器、反序列化器用于可靠得将类型化的数据转化为纯粹的二进制数据,处理跨buffer的数据。

数据交换的控制流

上图表示一个简单的map-reduce job并具有两个并行的task。我们有两个TaskManager,每个TaskManager都有两个task(一个map,一个reduce),这两个TaskManager运行在两个不同的节点上,有一个JobManager运行在第三方节点上。我们聚焦在task M1和R2之间的传输初始化。数据传输使用粗箭头表示,消息使用细箭头表示。首先,M1生产一个ResultPartition(RP1)(箭头1)。当RP对于消费端变得可访问(我们后面会讨论),它会通知JobManager(箭头2)。JobManager通知想要接收这个分区数据的接收者(task R1和R2)分区当前已经准备好了。如果接收者还没有被调度,这将会触发task的deployment(箭头3a,3b)。然后接收者将会向RP请求数据(箭头4a,4b)。这将会初始化任务之间的数据传输(5a,5b),这个初始化要么是本地的(5a),或者通过TaskManager的网络栈传输(5b)。这种机制给了RP在决定什么时候通知JobManager自己已经处于准备好状态的时机上拥有充分的自由度。例如,如果RP1希望在通知JM之前,等待数据完整地传输完(比如它将数据写到一个临时文件里),这种数据交换机制粗略来看等同于批处理数据交换,就像在Hadoop中实现的那样。而如果RP1一旦在其第一条记录准备好时就通知JobManager,那么我就拥有了一个流式的数据交换。

字节缓冲区在两个task之间的传输

上面这张图展示了一个更细节的过程,描述了数据从生产者传输到消费者的完整生命周期。最初,MapDriver生产数据记录(通过Collector收集),这些记录被传给RecordWriter对象。RecordWriter包含一组序列化器(RecordSerializer对象)。消费者task可能会消费这些数据。一个ChannelSelector选择一个或者多个序列化器来处理记录。如果记录在broadcast中,它们将被传递给每一个序列化器。如果记录是基于hash分区的,ChannelSelector将会计算记录的hash值,然后选择合适的序列化器。

序列化器将数据记录序列化成二进制的表示形式。然后将它们放到大小合适的buffer中(记录也可以被切割到多个buffer中)。这些buffer首先会被传递给BufferWriter,然后被写到一个ResulePartition(RP)中。一个RP包含多个subpartition(ResultSubpartition - RS),用于为特定的消费者收集buffer数据。在上图中的这个buffer是为TaskManager2中的reducer定义的,然后被放到RS2中。既然首个buffer进来了,RS2就对消费者变成可访问的状态了(注意,这个行为实现了一个streaming shuffle),然后它通知JobManager。

JobManager查找RS2的消费者,然后通知TaskManager 2一个数据块已经可以访问了。通知TM2的消息会被发送到InputChannel,该inputchannel被认为是接收这个buffer的,接着通知RS2可以初始化一个网络传输了。然后,RS2通过TM1的网络栈请求该buffer,然后双方基于netty准备进行数据传输。网络连接是在TaskManager(而非特定的task)之间长时间存在的。

一旦buffer被TM2接收,它会穿过一个类似的对象栈,起始于InputChannel(接收端 等价于IRPQ),进入InputGate(它包含多个IC),最终进入一个RecordDeserializer,它用于从buffer中还原成类型化的记录,然后将其传递给接收task,这个例子中是ReduceDriver。

原文发布时间为:2016-04-24

时间: 2024-10-24 17:10:07

Flink中task之间的数据交换机制的相关文章

利用Java技术搞定两个不同数据库之间的数据交换

 1.建立远程数据库的连接: public static synchronized Connection getConFromRemote() { Connection con = null; String url = "jdbc:sqlserver://admin.xxx.xxx.comdbo:1436;databaseName=remote_jadepool"; String userName = "hkm12345"; String password = &q

中创电子政务数据交换平台解决方案

方案概要 中创软件推出的"电子政务数据交换平台解决方案",是基于中创软件Infor系列中间件技术,结合政府信息化建设现状及发展需求而推出的,使得各政府部门之间的基础数据共享,让基础数据发挥更大的社会价值,使得政府从宏观上把握经济运行的整体情况.该方案主要实现: 实现政府部门之间数据的安全.可靠交换和共享,避免数据重复采集,保持各部门基础数据的一致: 实现数据的即时整合,并对全局数据进行灵活的多维度分析和多样式展示,为管理层监控和决策提供有效支持. 系统概述 中创软件商用中间件有限公司与

Android:Activity+Fragment及它们之间的数据交换

简介: 为什么要用Fragment?使用Fragment可以在一个Activity中实现不同的界面.Fragment与Fragment之间的动画切换,远比Activity与Activity之间的动画切换变化方式多.很多时候,我们通过使用一个Activity,切换多个Fragment.本次博客,主要列举一下Fragment与它的Activity之间进行数据交换的方式. 1.Fragment中通过getActivity()然后进行强制转化,调用Activity中的公有方法 ((XXXXActivit

Excel和Access之间的数据交换

在Microsoft OfficeAccess和 Microsoft OfficeExcel之间存在多种交换数据的方法. ·若要将Access中的数据装入 Excel,可以从Access数据表中复制数据并粘贴到Excel工作表中,从Excel工作表连接到Access数据库,或者将Access数据导出到Excel工作表中. ·若要将Excel中的数据装入 Access,可以从Excel工作表复制数据并粘贴到Access数据表中,将Excel工作表导入Access表中,或者从Access表链接到Ex

虚拟机如何与计算机之间进行数据交换?

虚拟机通过软件模拟的具有完整硬件系统功能的.运行在一个完全隔离环境中的完整计算机系统.通过虚拟机软件,你可以在一台物理计算机上模拟出一台或多台虚拟的计算机,这些虚拟机完全就像真正的计算机那样进行工作,例如你可以安装操作系统.安装应用程序.访问网络资源等等.对于你而言,它只是运行在你物理计算机上的一个应用程序,但是对于在虚拟机中运行的应用程序而言,它就像是在真正的计算机中进行工作.因此,当我在虚拟机中进行软件评测时,可能系统一样会崩溃,但是,崩溃的只是虚拟机上的操作系统,而不是物理计算机上的操作系

OpenCV学习(6) 文件和Mat之间的数据交换

      有时候为了便于调试算法,我们需要从文本文件或二进制文件中读取数据,并把数据放到相应的矩阵中去.我们通常可以通过下面的函数实现.   1.从二进制文件中读取数据.      新建一个txt文件 input.txt,在vs2010中,右键点击该文件,选择open with,然后选择Binary Editor,就可以用二进制的方式打开文件了. 编辑文件并保存之后,我们可以用下面的函数把数据读入到矩阵中去. int gMophEx::LoadData(string fileName, cv:

物理隔离与数据交换:网闸的设计原理与误区

一.什么是网闸 网闸技术的需求来自内网与外网数据互通的要求,比如政府的电子政务是对公众服务,与互联网连通,而内网的政府办公网络,由于保密的要求,内网若与网连通,则面临来自公网的各种威胁.安全专家给出的建议是:由于目前的安全技术,无论防火墙.UTM等防护系统都不能保证攻击的一定阻断,入侵检测等监控系统也不能保证入侵行为完全捕获,所以最安全的方式就是物理的分开,所以在公安部的技术要求中,要求电子政务的内.外网络之间"物理隔离".没有连接,来自外网对内网的攻击就无从谈起. 但是,网络的物理隔

进程之间的数据交互的实现原理与方法?

问题描述 进程之间的数据交互的实现原理与方法? 在网上找了许多答案都不怎么好,有些太难懂了,求大神帮我回答下,谢谢! 解决方案 进程之间,是地址独立的,即各个进程只能访问自己进程内的数据.其它进程内的数据,想要访问必须通过系统! 一般来说,通过消息,共享内存等方式可以完成进程之间的数据交换.

Java多线程编程之使用Exchanger数据交换实例_java

用于实现两个人之间的数据交换,每个人在完成一定的事务后想与对方交换数据,第一个先拿出数据的人将一直等待第二个人拿着数据到来时,才能彼此交换数据. 复制代码 代码如下: package com.ljq.test.thread;   import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;   public cla