Apache Flink Client生成StreamGraph

概述

上文我们分析提交流程时,RemoteStreamEnvironment类的execute方法的第一步就是生成StreamGraph

StreamGraph是用于表示流的拓扑结构的数据结构,它包含了生成JobGraph的必要信息。它的类继承关系图如下:

如果你按照StreamGraph的继承链向上追溯,最终会发现它实现了接口FlinkPlan。Flink在这里效仿的是数据库的执行SQL是产生执行计划的机制,FlinkPlan定义在Flink的优化器相关的包中,针对流应用的计划是StreamingPlan

针对Batch类的应用的计划类是OptimizedPlan。Flink会对Batch类的应用进行优化(这点我们后面会分析),而当前针对Streaming类的应用没有优化措施。

StreamGraph的形象化表示如下图:

Flink官方提供了一个计划可视化器来图形化执行计划

节点和边

上面的图是由“节点”和“边”组成的。节点在Flink中对应的数据结构是StreamNode,而边在Flink中对应的数据结构是StreamEdgeStreamNodeStreamEdge之间存在着组合的依赖关系,依赖关系可见下图:

StreamEdge包含了其连接的源节点sourceVertex和目的节点targetVertex,而StreamNode中包含了与其连接的入边集合inEdges和出边集合outEdgesStreamEdgeStreamNode都有唯一的编号进行标识,但是各自编号的生成规则并不相同。

StreamNode的编号id的生成是通过调用StreamTransformation的静态方法getNewNodeId获得的,其实现是一个静态计数器:

// This is used to assign a unique ID to every StreamTransformation
protected static Integer idCounter = 0;

public static int getNewNodeId() {
    idCounter++;
    return idCounter;
}

StreamEdge的编号edgeId是字符串类型,其生成的规则为:

this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames + "_" + outputPartitioner;

它是由多个段连接起来的,语义的文字表述如下:

源顶点_目的顶点_输入类型数量_输出选择器的名称_输出分区器

edgeId除了用来实现StreamEdge的hashCode及equals方法之外并没有其他实际意义。

StreamNode其实是表示operator的数据结构,了解这一点很重要。从Flink开始生成StreamGraph开始,source、sink都是图中的一个节点都是operator,都通过StreamNode这一数据结构来表示,我们常将它们单独拎出来讲是因为它们是流的的输入和输出,但在数据结构层面上它们是一致的。

StreamNode除了存储了输入端和输出端的StreamEdge集合,还封装了operator的其他关键属性,基于这不是我们关注的重点,所以不再赘述。

回过头来我们看JobGraph就不是那么难理解了。它包含了表述整个流拓扑的所有必要信息(比如所有的节点集合、所有的source集合、所有的sink集合、虚拟输出选择节点、虚拟分区节点)。同时还包含了大量操作这些信息的方法。

生成StreamGraph

了解了基础的数据结构之后,我们来分析如何生成JobGraph。定位到getStreamGraph的实现:


public StreamGraph getStreamGraph() {
    if (transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
    }   

    return StreamGraphGenerator.generate(this, transformations);
}

它依赖于transformations集合,该集合中存储着一个Streaming程序中所有的转换操作对应的StreamTransformation对象。

每当在DataStream对象上调用transform方法或者调用已经被实现了的一些转换操作(如map、flter等,这些转换操作在内部也调用了transform方法),这些调用都会被加入到transformations集合中。

StreamTransformation表示创建DataStream的操作,其实每个DataStream底层都对应着一个StreamTransformation。DataStream持有执行环境对象的引用,当调用transform方法时,它会调用执行环境对象的addOperator方法,将特定的StreamTransformation对象加入到transformations集合中去,这就是transformations集合中元素的来源。

到目前为止我们提到了多个名词,它们之前拥有着强依赖关系,为了避免混淆,我们以flatMap转换操作为例图示各种对象之间的构建关系:

在源码中,其实Flink自身的命名也并不是那么准确,比如上图中的SingleOutputStreamOperator其实是一种DataStream,但却以Operator结尾,让人匪夷所思。这种情况下,鉴定它们类型的方式可以通过查看它们的继承链来进行识别。

StreamGraph的生成依赖于生成器StreamGraphGenerator,每调用一次静态方法generate才会在内部创建一个StreamGraphGenerator的实例,一个实例对应着一个StreamGraph对象。StreamGraphGenerator调用内部的实例方法generateInternal来遍历transformations集合的每个对象:


private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
    for (StreamTransformation<?> transformation: transformations) {
        transform(transformation);
    }   

    return streamGraph;
}

transform方法中,它枚举了Flink中每一种转换类型,并对当前传入的转换类型进行判断,然后将其分发给特定的转换方法进行转换,最终返回当前StreamGraph对象中跟该转换有关的节点编号集合。

你可以将整个过程看作是玩拼图游戏,每遍历完一个转换对象,就离构建完整的StreamGraph更近一步。所有类型各异的转换操作各自持有整个StreamGraph的一部分小图片,根据不同的转换操作类型,它们为StreamGraph提供的“部件”并不完全相同,有的转换只构建节点(如SourceTransformation),有的转换除了构建节点还构建边(如SinkTransformation),有的只构建虚拟节点(如PartitionTransformationSplitTransformationSelectTransformation)。

关于虚拟节点,这里需要说明的是并非所有转换操作都具有实际的物理意义(即物理上对应operator)。有些转换操作只具有逻辑概念,例如unionsplitselectpartition。这些转换操作不会构建真实的StreamNode对象。比如某个流处理应用对应的转换树如下图:

但在运行时,其生成的执行计划,这里也就等同于StreamGraph却是下图这种形式:

从图中可以看到,转换图中对应的一些逻辑操作在产生的执行计划时并不存在,Flink将这些逻辑转换操作转换成了虚拟节点,它们的信息会被绑定到从sourcemap转换的这条边上。

在给StreamGraph创建并添加一个operator时,需要给该operator指定slotSharingGroup,这时需要调用方法determineSlotSharingGroup来获得SlotSharingGroup的名称:

private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
    if (specifiedGroup != null) {
        return specifiedGroup;
    } else {
        String inputGroup = null;
        for (int id: inputIds) {
            String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
            if (inputGroup == null) {
                inputGroup = inputGroupCandidate;
            } else if (!inputGroup.equals(inputGroupCandidate)) {
                return "default";
            }
        }      

        return inputGroup == null ? "default" : inputGroup;
    }
}

当用户指定了组名,则直接使用用户指定的名称。如果用户没有指定特定的名称,则需要结合输入节点来做决定:第一种情况如果所有的输入节点都拥有相同的slotSharingGroup名称,那么就使用该组名;否则组名将被命名为default

Flink当前对于流处理的应用是不作优化的,所以其执行计划就是StreamGraph。Flink提供了一个执行计划的可视化器,它将客户端生成的执行计划以图形的方式展示出来,就像本节开始我们展示的那幅图就是可视化器生成的。那么我们怎么来查看我们自己编写的程序的执行计划呢?其实很简单,我们以Flink的flink-examples-streaming包中的SocketTextStreamWordCount为例,来看一下如何生成执行计划。

我们将SocketTextStreamWordCount最后一行代码注释掉:

env.execute("WordCount from SocketTextStream Example");

然后将其替换成下面这句:

System.out.println(env.getExecutionPlan());

这行语句的作用是打印当前这个程序的执行计划,它将在控制台产生该执行计划的JSON格式表示:

{"nodes":[{"id":1,"type":"Source: Socket Stream","pact":"Data Source","contents":"Source: Socket Stream",
"parallelism":1},{"id":2,"type":"Flat Map","pact":"Operator","contents":"Flat Map","parallelism":2,
"predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":4,"type":"Keyed Aggregation",
"pact":"Operator","contents":"Keyed Aggregation","parallelism":2,"predecessors":[{"id":2,
"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Unnamed","pact":"Data Sink",
"contents":"Sink: Unnamed","parallelism":2,"predecessors":[{"id":4,"ship_strategy":"FORWARD",
"side":"second"}]}]}System.out.println(env.getExecutionPlan());

把上面这段JSON复制到Flink的执行计划可视化器,点击下方的Draw按钮,即可生成。

小结

本文我们谈论了StreamGraph的数据结构以及StreamGraphGenerator如何生成StreamGraph。鉴于StreamEdgeStreamNode是组成StreamGraph不可或缺的部分,我们还对这两个数据结构进行了简单的分析。当然,StreamGraph还有一个关键的实例方法:getJobGraph,它用于获取流处理程序的JobGraph(该方法继承自StreamingPlan)。至于什么是JobGraph以及如何获取它,我们将在下文进行讨论。

原文发布时间为:2016-07-23

本文作者:vinoYang

本文来自合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

时间: 2025-01-03 08:39:24

Apache Flink Client生成StreamGraph的相关文章

Apache Flink fault tolerance源码剖析(四)

上篇文章我们探讨了Zookeeper在Flink的fault tolerance中发挥的作用(存储/恢复已完成的检查点以及检查点编号生成器). 这篇文章会谈论一种特殊的检查点,Flink将之命名为--Savepoint(保存点). 因为保存点只不过是一种特殊的检查点,所以在Flink中并没有太多代码实现.但作为一个特性,值得花费一个篇幅来介绍. 检查点VS保存点 使用数据流API编写的程序可以从保存点来恢复执行.保存点允许你在更新程序的同时还能保证Flink集群不丢失任何状态. 保存点是人工触发

Apache Flink流作业提交流程分析

用户编写的程序逻辑需要提交给Flink才能得到执行.本文来探讨一下客户程序如何提交给Flink.鉴于用户将自己利用Flink的API编写的逻辑打成相应的应用程序包(比如Jar)然后提交到一个目标Flink集群上去运行是比较主流的使用场景,因此我们的分析也基于这一场景进行. Flink的API针对不同的执行环境有不同的Environment对象,这里我们主要基于常用的RemoteStreamEnvironment和RemoteEnvironment进行分析 在前面我们谈到了Flink中实现了"惰性

Apache Flink fault tolerance源码剖析(五)

上一篇文章我们谈论了保存点的相关内容,其中就谈到了保存点状态的存储.这篇文章我们来探讨用户程序状态的存储,也是在之前的文章中多次提及的state backend(中文暂译为状态终端). 基于数据流API而编写的程序经常以各种各样的形式保存着状态: 窗口收集/聚合元素(这里的元素可以看作是窗口的状态)直到它们被触发 转换函数可能会使用key/value状态接口来存储数据 转换函数可能实现Checkpointed接口来让它们的本地变量受益于fault tolerant机制 当检查点机制工作时,上面谈

Apache Flink fault tolerance源码剖析(二)

继续Flink Fault Tolerance机制剖析.上一篇文章我们结合代码讲解了Flink中检查点是如何应用的(如何根据快照做失败恢复,以及检查点被应用的场景),这篇我们来谈谈检查点的触发机制以及基于Actor的消息驱动的协同机制.这篇涉及到一个非常关键的类--CheckpointCoordinator. org.apache.flink.runtime.checkpoint.CheckpointCoordinator 该类可以理解为检查点的协调器,用来协调operator和state的分布

深入理解Apache Flink核心技术

Apache Flink(下简称Flink)项目是大数据处理领域最近冉冉升起的一颗新星,其不同于其他大数据项目的诸多特性吸引了越来越多人的关注.本文将深入分析Flink的一些关键技术与特性,希望能够帮助读者对Flink有更加深入的了解,对其他大数据系统开发者也能有所裨益.本文假设读者已对MapReduce.Spark及Storm等大数据处理框架有所了解,同时熟悉流处理与批处理的基本概念. Flink简介 Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布.数据通信以

Apache Flink实现的数据流体系结构

data Artisans应用程序工程总监Jamie Grier最近在OSCON 2016 Conference大会发言谈到了使用Apache Flink构建的一种数据流体系结构.同时还谈到了数据流应用程序的构建块. 数据流体系结构可用于处理随着时间流逝以事件流方式持续生成的数据,这一点不同于传统的静态数据集.相对于传统的集中式"状态化"数据库和数据仓库,数据流应用程序可以处理事件流以及针对历史事件汇总而来的应用程序本地状态.流式数据处理的一些优势包括: 降低从信号到决策的过程延迟 通

Apache Flink 实现的数据流体系结构

data Artisans应用程序工程总监Jamie Grier最近在OSCON 2016 Conference大会发言谈到了使用Apache Flink构建的一种数据流体系结构.同时还谈到了数据流应用程序的构建块. 数据流体系结构可用于处理随着时间流逝以事件流方式持续生成的数据,这一点不同于传统的静态数据集.相对于传统的集中式"状态化"数据库和数据仓库,数据流应用程序可以处理事件流以及针对历史事件汇总而来的应用程序本地状态.流式数据处理的一些优势包括: 降低从信号到决策的过程延迟 通

《Apache Flink官方文档》 Apache Flink介绍

下面是关于Apache Flink(以下简称Filnk)框架和流式计算的概述.为了更专业.更技术化的介绍,在Flink文档中推荐了一些"概念性"的文章. 1.无穷数据集的持续计算 在我们详细介绍Flink前,复习一下当我们计算数据选择运算模型时,很可能会遇到的一个更高级别的数据集类型.下面有两个观点经常容易混淆,很有必要去澄清它们. (1)两种数据集类型: ①无穷数据集:无穷的持续集成的数据集合. ②有界数据集:有限不会改变的数据集合. 很多现实中传统地认为有界或者批量的数据集合实际上

Apache Flink源码解析之stream-window

window(窗口)是Flink流处理中非常重要的概念,本篇我们来对窗口相关的概念以及关联的实现进行解析.本篇的内容主要集中在package org.apache.flink.streaming.api.windowing下. Window 一个Window代表有限对象的集合.一个窗口有一个最大的时间戳,该时间戳意味着在其代表的某时间点--所有应该进入这个窗口的元素都已经到达. Flink的根窗口对象是一个抽象类,只提供了一个抽象方法: public abstract long maxTimes