Flink流处理之迭代任务

前面我们分析过Flink对迭代在流图中的特殊处理,使得迭代中的反馈环得以转化为普通的DAG模型。这一篇我们将剖析运行时的流处理迭代任务的执行机制。这里涉及到两个任务类:

  • StreamIterationHead:迭代头任务,它借助于反馈阻塞队列从迭代尾部接收参与下一次迭代的反馈数据。
  • StreamIterationTail:迭代尾任务,它借助于阻塞队列作为反馈信道将下一次需要迭代的数据反馈给迭代头。

对于迭代流处理而言,随着任务(task)最终被并行化执行,它们的子任务(sub
task,这些任务在计算节点中的实例)的并行度要求一致,并且迭代头的子任务与迭代尾的子任务必须成对且处于同一个CoLocationGroup中执行,这些都反映了流处理的迭代任务的特殊性,可以认为是一种并发与并行兼具的模式。

StreamIterationHead和StreamIterationTail之间的交互依赖于阻塞队列代理(BlockingQueueBroker)。Flink设计了一种称之为Broker的数据结构,专门用于对迭代进行并发控制,而BlockingQueueBroker是Broker针对BlockingQueue类型的特定实现。因为Broker内部的核心数据结构就是ConcurrentMap<String,
BlockingQueue<V>>类型,所以BlockingQueueBroker对应的类型即为:ConcurrentMap<String,
BlockingQueue<BlockingQueue>>其图示如下:

迭代头任务跟迭代尾任务之间真正交互的数据结构是容量为1的阻塞队列:

final BlockingQueue<StreamRecord<OUT>> dataChannel = new ArrayBlockingQueue<StreamRecord<OUT>>(1);

它会被加入到BlockingQueueBroker内部的ConcurrentMap中去,对应的键是brokerID。brokerID生成规则如下:

public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) {
    return jid + "-" + iterationID + "-" + subtaskIndex;
}

BlockingQueueBroker是单例的,因为对应的迭代头子任务和迭代尾子任务会生成相同的brokerID,所以两者在同一个JVM中会基于相同的dataChannel进行通信。dataChannel由迭代头创建并递交给BlockingQueueBroker:

BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel);

由迭代尾获取:

BlockingQueue<StreamRecord<IN>> dataChannel =
    (BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(brokerID);

数据交换是基于dataChannel的,由迭代尾负责生产:

public void collect(StreamRecord<IN> record) {
    try {
        if (shouldWait) {
            dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
        } else {
            dataChannel.put(record);
        }
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

由迭代头负责消费:

while (running) {
    StreamRecord<OUT> nextRecord = shouldWait ?
        dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) :
        dataChannel.take();

    if (nextRecord != null) {
        for (RecordWriterOutput<OUT> output : outputs) {
            output.collect(nextRecord);
        }
    } else {
        // done
        break;
    }
}

因此,迭代头跟迭代尾的子任务之间通过反馈信道进行迭代数据处理的大致机制如下图:

相信分析到这里,我们应该明白了为什么迭代头和迭代尾的并行度必须一致,且处于相同的CoLocationGroup中。Flink在执行拓扑中巧妙地化解了反馈环,从而使其适应于DAG计算模型,但却在同一个TaskManager的迭代头和迭代尾对应的子任务中借助于阻塞队列的生产者和消费者模型重塑了反馈“环”。

原文发布时间为:2016-12-12

本文作者:vinoYang

本文来自合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

时间: 2024-08-01 21:45:40

Flink流处理之迭代任务的相关文章

Flink流处理之迭代案例

当前Flink将迭代的重心集中在批处理上,之前我们谈及了批量迭代和增量迭代主要是针对批处理(DataSet)API而言的,并且Flink为批处理中的迭代提供了针对性的优化.但是对于流处理(DataStream),Flink同样提供了对迭代的支持,这一节我们主要来分析流处理中的迭代,我们将会看到流处理中的迭代相较于批处理有相似之处,但差异也是十分之明显. 可迭代的流处理程序允许定义"步函数"(step function)并将其内嵌到一个可迭代的流(IterativeStream)中.因为

Flink流处理之迭代API分析

IterativeStream Flink在DataStream中也是通过一个特定的可迭代的流(IterativeStream)来构建相关的迭代处理逻辑,这一点跟DataSet提供的可迭代的数据集(IterativeDataSet)的是类似的. IterativeStream继承自DataStream,因此DataStream支持的转换函数,在IterativeStream上同样可以调用. IterativeStream的实例是通过DataStream的iterate方法创建的˙.iterate

Apache Flink流分区器剖析

这篇文章介绍Flink的分区器,在流进行转换操作后,Flink通过分区器来精确得控制数据流向. StreamPartitioner StreamPartitioner是Flink流分区器的基类,它只定义了一个抽象方法: public abstract StreamPartitioner<T> copy(); 但这个方法并不是各个分区器之间互相区别的地方,定义不同的分区器的核心在于--各个分区器需要实现channel选择的接口方法: int[] selectChannels(T record,

Flink流处理迭代之化解反馈环

我们都知道Flink在可迭代的流处理中引入了反馈边来将本次迭代的结果反馈给迭代头以进行下一次迭代,这在执行拓扑中引入了环(反馈环).Flink主要应对的执行拓扑还是有向无环图(DAG),最终它选择了将反馈环进行化解使其能够适配有向无环图的结构,而如何对反馈环进行化解是我们这一篇主要探讨的话题. 任何提交给Flink执行的程序在提交之前都必须先生成作业图,对于用DataStream API编写的流处理程序在生成作业图之前,还会先生成流图.因此,如果想化解迭代产生的反馈环其时机只能是在部署执行之前的

Apache Flink流作业提交流程分析

用户编写的程序逻辑需要提交给Flink才能得到执行.本文来探讨一下客户程序如何提交给Flink.鉴于用户将自己利用Flink的API编写的逻辑打成相应的应用程序包(比如Jar)然后提交到一个目标Flink集群上去运行是比较主流的使用场景,因此我们的分析也基于这一场景进行. Flink的API针对不同的执行环境有不同的Environment对象,这里我们主要基于常用的RemoteStreamEnvironment和RemoteEnvironment进行分析 在前面我们谈到了Flink中实现了"惰性

Apache Flink源码解析之stream-source

今天我们来解读一下Flink stream里的source模块.它是整个stream的入口,也是我们了解其流处理体系的入口. SourceFunction SourceFunction是所有stream source的根接口. 它继承自一个标记接口(空接口)Function. SourceFunction定义了两个接口方法: run : 启动一个source,即对接一个外部数据源然后emit元素形成stream(大部分情况下会通过在该方法里运行一个while循环的形式来产生stream). ca

Apache Flink源码解析之stream-windowfunction

Window也即窗口,是Flink流处理的特性之一.前一篇文章我们谈到了Winodw的相关概念及其实现.窗口的目的是将无界的流转换为有界的元素集合,但这还不是最终的目的,最终的目的是在这有限的集合上apply(应用)某种函数,这就是我们本篇要谈的主题--WindowFunction(窗口函数). 那么窗口函数会在什么时候被应用呢?还记得上篇文章我们谈到了触发器Trigger,在触发器触发后会返回TriggerResult这个枚举类型的其中一个枚举值.当返回的是FIRE或者FIRE_AND_PUR

深入理解Apache Flink核心技术

Apache Flink(下简称Flink)项目是大数据处理领域最近冉冉升起的一颗新星,其不同于其他大数据项目的诸多特性吸引了越来越多人的关注.本文将深入分析Flink的一些关键技术与特性,希望能够帮助读者对Flink有更加深入的了解,对其他大数据系统开发者也能有所裨益.本文假设读者已对MapReduce.Spark及Storm等大数据处理框架有所了解,同时熟悉流处理与批处理的基本概念. Flink简介 Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布.数据通信以

深入理解Flink核心技术

Flink项目是大数据处理领域最近冉冉升起的一颗新星,其不同于其他大数据项目的诸多特性吸引了越来越多的人关注Flink项目.本文将深入分析Flink一些关键的技术与特性,希望能够帮助读者对Flink有更加深入的了解,对其他大数据系统的开发者也能有所裨益. 注:本文假设读者对MapReduce,Spark及Storm等大数据处理系统有基本了解,同时熟悉流处理与批处理的基本概念.36大数据(http://www.36dsj.com/) Flink简介 Flink的核心是一个流式的数据流执行引擎,其针