Flink流处理之迭代API分析

IterativeStream

Flink在DataStream中也是通过一个特定的可迭代的流(IterativeStream)来构建相关的迭代处理逻辑,这一点跟DataSet提供的可迭代的数据集(IterativeDataSet)的是类似的。

IterativeStream继承自DataStream,因此DataStream支持的转换函数,在IterativeStream上同样可以调用。

IterativeStream的实例是通过DataStream的iterate方法创建的˙。iterate方法存在两个重载形式:一种是无参的,表示不限定最大等待时间;另一种提供一个长整型的maxWaitTimeMillis参数,允许用户指定等待反馈边的下一个输入元素的最大时间间隔。而迭代的关闭是通过调用IterativeStream的实例方法closeWith来实现的。

每一种数据流(DataStream)都会有与之对应的流转换(StreamTransformation)。IterativeStream对应的转换是FeedbackTransformation。

我们来看一下closeWith方法的实现:

public DataStream<T> closeWith(DataStream<T> feedbackStream) {
//基于需要反馈给迭代头的反馈流对象获取其所有前任的StreamTransformation对象,目的是为了下文的检查
Collection<StreamTransformation<?>> predecessors =
feedbackStream.getTransformation().getTransitivePredecessors();

//在反馈流的所有前任StreamTransformation对象集合中查找当前可迭代的流(迭代头)对应的转换对象是否在其中,
//如果不在,则抛出异常
if (!predecessors.contains(this.transformation)) {
throw new UnsupportedOperationException(
"Cannot close an iteration with a feedback DataStream "
+ " that does not originate from said iteration.");
}

//将反馈流对应的转换对象作为迭代头的反馈边
((FeedbackTransformation) getTransformation()).addFeedbackEdge(feedbackStream.getTransformation());

return feedbackStream;
}

解释一下上文代码中为什么要检查前任StreamTransformation对象的原因。我们结合上一篇的案例中的代码片段来看:

IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> iterativeStream =
inputStream.map(new TupleTransformMapFunction()).iterate(5000);
DataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> fibonacciStream =
iterativeStream.map(new FibonacciCalcStepFunction());
SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> branchedStream =
fibonacciStream.split(new FibonacciOverflowSelector());
iterativeStream.closeWith(branchedStream.select(ITERATE_FLAG));

这里传递给closeWith的是branchedStream筛选出的数据流,而从branchedStream向上是能追溯到可迭代的流iterativeStream的,因此满足前任追溯到迭代头的条件。所以这里需要基于前任向上游追溯的原因是确保反馈流是的源头是来自迭代头(从而形成迭代这样一个闭环),而不是任意的某个流都可以作为反馈流。

另外,IterativeStream通过调用withFeedbackType方法还可以改变或者重新指定迭代反馈流的类型,从而形成一个跟最初的输入流组合而成的连接迭代流(ConnectedIterativeStreams),这一点也是批处理中的迭代所不具备的。示例代码如下:

DataStream<Integer> source = ...;
ConnectedIterativeStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(Integer.class);

ConnectedIterativeStreams是ConnectedStreams的特例,它表示将迭代最初的输入与反馈边的输入进行连接所形成的ConnectedStreams。ConnectedIterativeStreams的构造器会要求指定反馈流的数据类型信息(TypeInformation),你可以为其指定Flink所支持的任意类型。ConnectedIterativeStreams对应的转换是CoFeedbackTransformation,我们在下面会顺带介绍。

当IterativeStream转变为双流连接而成的ConnectedIterativeStreams,转换也从FeedbackTransformation转变为CoFeedbackTransformation,因此ConnectedIterativeStreams也提供了自己的closeWith方法来将CoFeedbackTransformation添加为自己的反馈边。在实现上和IterativeStream是类似的,不再赘述。

FeedbackTransformation

迭代流(IterativeStream)对应的转换是反馈转换(FeedbackTransformation),它表示拓扑中的一个反馈点(也即迭代头)。一个反馈点包含一个输入边以及若干个反馈边,且Flink要求每个反馈边的并行度必须跟输入边的并行度一致,这一点在往该转换中加入反馈边时会进行校验。

这里并行度一致的原因是Flink将采用一种CoLocationGroup来优化迭代任务的子任务执行。当一组作业顶点(JobVertex,一个任务在JobGraph中的表示)被包含在同一个CoLocationGroup中的时候,这些JobVertex在运行时所对应的任务的第i个子任务必须运行在同一个TaskManager的JVM实例中。那么一个分布式的迭代作业,其迭代部分是就是并行度个执行体在并行执行,而每个执行体中的子任务都在位于同一个TaskManager的实例中多线程的形式并发地执行,其中还涉及到并发环境下的数据交换,后续会进行分析。

当IterativeStream对象被构造时,FeedbackTransformation的实例会被创建并传递给DataStream的构造方法:

protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
super(dataStream.getExecutionEnvironment(),
new FeedbackTransformation<>(dataStream.getTransformation(), maxWaitTime));
this.originalInput = dataStream;
this.maxWaitTime = maxWaitTime;
setBufferTimeout(dataStream.environment.getBufferTimeout());
}

每一个流转换对象都要实现获得其前任转换对象集合的getTransitivePredecessors方法,FeedbackTransformation对该方法的实现如下:

public Collection<StreamTransformation<?>> getTransitivePredecessors() {
List<StreamTransformation<?>> result = Lists.newArrayList();
//先加入自身
result.add(this);
//加入其输入端的所有前任转换对象
result.addAll(input.getTransitivePredecessors());
return result;
}

在上面分析IterativeStream时,我们提过它可以转换为ConnectedIterativeStreams,ConnectedIterativeStreams对应的CoFeedbackTransformation这里我们也一并分析一下。CoFeedbackTransformation跟FeedbackTransformation一样都表示拓扑中的一个反馈点。对于CoFeedbackTransformation转换,它不要求反馈边元素的类型跟上游输入端元素的类型一致。因为上游流将会成为该转换的第一个输入,而反馈流将会成为该转换的第二个输入。因为两个流会在此连接,所以CoFeedbackTransformation后只允许跟TwoInputTransformations类型的转换。

CoFeedbackTransformation同样对输入端的并行度和反馈边的并行度有一定的限制,它也要求两者的并行度必须相等,但是它们的分区策略可以是不一致的。

Flink在创建ConnectedIterativeStreams流对象时,会用迭代流的初始输入来作为ConnectedIterativeStreams的第一个输入流,然后用CoFeedbackTransformation来构建参与连接的第二个流对象,这里可以指定跟迭代流类型不同的feedbackType作为第二个流的类型:

public ConnectedIterativeStreams(DataStream<I> input,
TypeInformation<F> feedbackType,
long waitTime) {
super(input.getExecutionEnvironment(),
input,
new DataStream<>(input.getExecutionEnvironment(),
new CoFeedbackTransformation<>(input.getParallelism(),
feedbackType,
waitTime)));
this.coFeedbackTransformation = (CoFeedbackTransformation<F>) getSecondInput().getTransformation();
}

这一篇我们介绍了IterativeStream以及ConnectedIterativeStreams所对应的转换对象,下一篇我们分析StreamGraph的迭代相关的内容时,将会剖析Flink如何将FeedbackTransformation转换为算子。

原文发布时间为:2016-12-04

本文作者:vinoYang

本文来自合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

时间: 2024-10-24 12:53:21

Flink流处理之迭代API分析的相关文章

Flink流处理之迭代案例

当前Flink将迭代的重心集中在批处理上,之前我们谈及了批量迭代和增量迭代主要是针对批处理(DataSet)API而言的,并且Flink为批处理中的迭代提供了针对性的优化.但是对于流处理(DataStream),Flink同样提供了对迭代的支持,这一节我们主要来分析流处理中的迭代,我们将会看到流处理中的迭代相较于批处理有相似之处,但差异也是十分之明显. 可迭代的流处理程序允许定义"步函数"(step function)并将其内嵌到一个可迭代的流(IterativeStream)中.因为

Flink流处理之迭代任务

前面我们分析过Flink对迭代在流图中的特殊处理,使得迭代中的反馈环得以转化为普通的DAG模型.这一篇我们将剖析运行时的流处理迭代任务的执行机制.这里涉及到两个任务类: StreamIterationHead:迭代头任务,它借助于反馈阻塞队列从迭代尾部接收参与下一次迭代的反馈数据. StreamIterationTail:迭代尾任务,它借助于阻塞队列作为反馈信道将下一次需要迭代的数据反馈给迭代头. 对于迭代流处理而言,随着任务(task)最终被并行化执行,它们的子任务(sub task,这些任务

Apache Flink流分区器剖析

这篇文章介绍Flink的分区器,在流进行转换操作后,Flink通过分区器来精确得控制数据流向. StreamPartitioner StreamPartitioner是Flink流分区器的基类,它只定义了一个抽象方法: public abstract StreamPartitioner<T> copy(); 但这个方法并不是各个分区器之间互相区别的地方,定义不同的分区器的核心在于--各个分区器需要实现channel选择的接口方法: int[] selectChannels(T record,

Flink流处理迭代之化解反馈环

我们都知道Flink在可迭代的流处理中引入了反馈边来将本次迭代的结果反馈给迭代头以进行下一次迭代,这在执行拓扑中引入了环(反馈环).Flink主要应对的执行拓扑还是有向无环图(DAG),最终它选择了将反馈环进行化解使其能够适配有向无环图的结构,而如何对反馈环进行化解是我们这一篇主要探讨的话题. 任何提交给Flink执行的程序在提交之前都必须先生成作业图,对于用DataStream API编写的流处理程序在生成作业图之前,还会先生成流图.因此,如果想化解迭代产生的反馈环其时机只能是在部署执行之前的

Apache Flink流作业提交流程分析

用户编写的程序逻辑需要提交给Flink才能得到执行.本文来探讨一下客户程序如何提交给Flink.鉴于用户将自己利用Flink的API编写的逻辑打成相应的应用程序包(比如Jar)然后提交到一个目标Flink集群上去运行是比较主流的使用场景,因此我们的分析也基于这一场景进行. Flink的API针对不同的执行环境有不同的Environment对象,这里我们主要基于常用的RemoteStreamEnvironment和RemoteEnvironment进行分析 在前面我们谈到了Flink中实现了"惰性

Open API分析、实践和思索

SOA.SAAS.云计算等等热捧概念词汇层出不穷,也让很多开发者去重新审视未来的软件开发将会何去何从.而Open API的出现,其实已经给国外的互连网应用开发者带来了一种新的创新思维,一种新的开发模式,将SOA的信息互通的理念贯穿到整个互连网行业,让更多的"草根"开发者用创新思维将互联网信息的价值最大化. 对于国内的开发者来说,在SNS热潮中第一次接触了Open API,但这仅仅只是开始.SNS提供的API以及现有的一些分享类网站提供的API,仅仅只是Open API中的一角,所能给开

使用 Java 6 API分析源码

您可曾想过像 Checkstyle 或 FindBugs 这样的工具如何执行静态代码分析吗,或者像 NetBeans 或 Eclipse 这样的集成开发环境(Integrated Development Environments IDE)如何执行快速代码修复或 查找在代码中声明的字段的完全引用吗?在许多情况下,IDE 具有自己的 API 来解析源码并生成标准树 结构,称为 抽象语法树(Abstract Syntax Tree AST) 或"解析树",此树可用于对源码元素的进一步 分析.

espresso基础架构与API分析

Espresso测试框架提供了一组API来构建UI测试,以测试应用程序内的用户流. 这些API让您能够编写简洁,运行可靠的自动化UI测试. Espresso非常适合编写白盒式自动化测试,其中测试代码使用来自所测试的应用程序的实现代码细节. Espresso测试框架的主要功能包括: 用于在目标应用程序中查看和适配器匹配的灵活API. 有关详细信息,请参阅View matching. 一组广泛的操作API,用于自动化UI交互. 有关更多信息,请参阅 Action APIs. UI线程同步提高测试的可

通过管道进行线程间通信:字节流。字符流的用法及API类似

管道流(PipedStream)可以用于不同线程间直接传送数据.一个线程发送数据到输出管道,另一个线程从输入管道中读取数据.通过使用管道,实现不同线程间的通信,而无须借助于类似临时文件之类的东西. package thread.communicate; import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; /*2015-11-19*/ public class