Flink批处理优化器之范围分区重写

为最终计划应用范围分区重写

Flink的批处理程序允许用户使用partitionByRange API来基于某个(或某些)字段进行按范围分区且可以选择性地指定排序顺序,示例代码如下:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

final DataSet<Tuple2<Integer, String>> ds = getTupleDataSet(env);
ds.partitionByRange(0).withOrders(Order.ASCENDING, Order.DESCENDING);

在使用范围分区这一特性时,需要尽可能保证各分区所处理的数据集均衡性以最大化利用计算资源并减少作业的执行时间。为此,优化器提供了范围分区重写器(RangePartitionRewriter)来对范围分区的分区策略进行优化,使其尽可能平均地分配数据,避免数据倾斜。要做到这一点需要对数据集的范围有足够的“了解”,RangePartitionRewriter通过对数据集进行采样来得到分区的范围。接下来我们就来分析RangePartitionRewriter的实现细节。

范围分区重写器

范围分区重写器(RangePartitionRewriter)同样遍历的是最终选择的计划并作用于计划节点(PlanNode),其主要用于在后置遍历时对传输策略为范围分区节点的输入端通道的连接情况进行重写,核心逻辑如下:

//提取当前所有的计划节点的输入通道
final Iterable<Channel> inputChannels = node.getInputs();
//遍历输入通道
for (Channel channel : inputChannels) {
    ShipStrategyType shipStrategy = channel.getShipStrategy();
    // 确保优化的通道的数据传输策略为范围分区
    if (shipStrategy == ShipStrategyType.PARTITION_RANGE) {

        if(channel.getDataDistribution() == null) {
            if (node.isOnDynamicPath()) {
                throw new InvalidProgramException("Range Partitioning not supported within iterations "
                    + " if users do not supply the data distribution.");
            }

            //对该通道的范围分区进行“重写”,并将当前通道从源计划节点的通道中删除,然后加入新的通道集合
            PlanNode channelSource = channel.getSource();
            List<Channel> newSourceOutputChannels = rewriteRangePartitionChannel(channel);
            channelSource.getOutgoingChannels().remove(channel);
            channelSource.getOutgoingChannels().addAll(newSourceOutputChannels);
        }
    }
}

上述代码段的关键在于对rewriteRangePartitionChannel方法的调用,它封装了对最终计划进行改写的逻辑,改写产生的逻辑Dataflow对比示意图如下:

由上图可见,改写后的逻辑dataflow被拆分成两个分支:上层分支主要完成的功能是采样跟构建范围边界,我们将其简称为“采样分支”;下层分支则用于对记录分区的索引进行查找、路由以及相关处理,可简称为“数据处理分支”。两分支之间有一个衔接关系在于:采样分支最终会输出“范围边界”,并将其以广播变量的形式传递给数据处理分支(见图中的虚线部分),数据处理分支将依据范围边界为来自source的记录查找该归属的分区编号。你可能会产生疑惑:依照这种表述来看,采样分支和数据处理分支是有前后的时序依赖关系的,而单纯的逻辑Dataflow中从source分拆的两个分支通常没有这种关系。那么Flink是如何保证该依赖关系的呢?答案在于数据处理分支的第一个channel,其数据交换模式(DataExchangeMode)被设置为Batch模式(见图中括号的标注,没有特别备注的数据交换模式默认都是Pipeline),Batch模式将数据生产者跟消费者解耦并使得它们不必时刻互相依赖(当数据都生产完成之后,消费者才消费),这也避免了数据处理分支开始处理数据流时还没有收到来自采样分支的范围边界广播变量。

具体而言,其核心流程可分解为如下六步:

  1. 为每个分区采样固定数目的记录作为样本;
  2. 让中央协调器从每个分区的样本中采样固定数目的样本作为最终的样本;
  3. 基于最终样本数据构建范围边界;
  4. 将范围边界作为广播变量传递同时为每个记录构建<分区编号,记录>的二元组并输出然后以自定义分区来分区记录;
  5. 找到记录的分区之后,分区编号就没有存在的意义了,因此为流中的记录移除分区编号;
  6. 连接目标节点

关于采样算法的细节我们将会在下一小节专门进行分析,因此这里我们先假设已采样完成并从广播变量中得到了范围边界。接下来我们来分析数据处理分支的核心逻辑。当记录到来后需要确定它要落到哪个分区,这需要对范围边界集合进行查找并定位分区编号,优化器提供了一个RangeBoundaries接口,其定义了一个方法来提供该功能:

int getRangeIndex(T record);

其通用实现CommonRangeBoundaries使用二分查找来实现该方法:

public int getRangeIndex(T record) {
    return binarySearch(record);
}

CommonRangeBoundaries将会被应用在一个名为AssignRangeIndex的UDF(扩展自:RichMapPartitionFunction)中。AssignRangeIndex首先获取“范围边界”这一广播变量,然后构建CommonRangeBoundaries的实例,随之遍历当前聚集的分区数据并一一查找其分区编号以构建二元组,然后输出到下游,代码如下:

public void mapPartition(Iterable<IN> values, Collector<Tuple2<Integer, IN>> out) throws Exception {
    List<Object> broadcastVariable = getRuntimeContext().getBroadcastVariable("RangeBoundaries");
    if (broadcastVariable == null || broadcastVariable.size() != 1) {
        throw new RuntimeException("AssignRangeIndex require a single RangeBoundaries as broadcast input.");
    }
    Object[][] boundaryObjects = (Object[][]) broadcastVariable.get(0);
    RangeBoundaries rangeBoundaries = new CommonRangeBoundaries(typeComparator.createComparator(),
        boundaryObjects);

    Tuple2<Integer, IN> tupleWithPartitionId = new Tuple2<>();

    for (IN record : values) {
        tupleWithPartitionId.f0 = rangeBoundaries.getRangeIndex(record);
        tupleWithPartitionId.f1 = record;
        out.collect(tupleWithPartitionId);
    }
}

以AssignRangeIndex构建的运算符所产生的计划节点连接着自定义的分区器来对为记录路由到指定的分区:

//以下标为0的字段(也即上面查找到的分区索引)作为分区依据
final FieldList keys = new FieldList(0);
partChannel.setShipStrategy(ShipStrategyType.PARTITION_CUSTOM, keys,
    idPartitioner, DataExchangeMode.PIPELINED);

当记录(此时已是上面的二元组了)被路由到正确的分区之后,分区编号已没有用了,不需要再往下游传输了,优化器又定义了一个名为RemoveRangeIndex的UDF来移除分区编号,具体的做法是只输出二元组里下标为1的字段。最终将以RemoveRangeIndex构建的运算符所生成的计划节点替换通道原先的source节点并使得其与target节点进行连接。

原文发布时间为:2017-04-09

本文作者:vinoYang

本文来自合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

时间: 2024-08-01 22:30:50

Flink批处理优化器之范围分区重写的相关文章

Flink批处理优化器之范围分区重写采用算法

采样算法 上一篇我们分析了RangePartitionRewriter的数据处理分支,接下来我们开始分析采样分支,采样分支的核心在于采样算法.因为范围分区输入端每个分区的数据量无从得知,也就是说我们无法得出采样比例.此时,如果先对每分区内的所有数据进行遍历,再记录出数据总量会显得很低效,因此Flink选择借助于水塘抽样算法(https://en.wikipedia.org/wiki/Reservoir_sampling)来解决这个问题. 水塘抽样法是一种在线抽样法,可以在不知道样本总量或因样本数

浅谈Flink批处理优化器之Join优化

跟传统的关系型数据库类似,Flink提供了优化器"hint"(提示)以告诉优化器选择一些执行策略.目前优化提示主要针对批处理中的连接(join).在批处理中共有三个跟连接有关的转换函数: join:默认为等值连接(Equi-join),维基百科将其归类为内连接(inner join)的一种 https://en.wikipedia.org/wiki/Join_(SQL): outerJoin:外连接,具体细分为left-outer join.right-outer join.full-

Flink批处理优化器之数据属性

在一段时间之前我们已介绍过IP(Interesting Property)对于优化器的意义以及它将对优化器的优化决策产生的影响.本篇我们将介绍Flink的批处理优化器中涉及到的所有的IP,我们将其统称为数据属性.后续我们会介绍Flink如何为优化器节点计算IP,并在之后的"剪枝"(pruning)阶段发挥作用. 数据属性 数据属性是个统称,来自于Flink优化器模块定义的子包名:dataproperties,需要注意的是这里属性的含义不是代码实现层面上类里的属性,而是指代对优化器的优化

Flink批处理优化器之成本估算

成本估算 在基于成本的优化器中,成本估算非常重要,它直接影响着候选计划的生成.在Flink中成本估算依赖于每个不同的运算符所提供的自己的"预算",本篇我们将分析什么是成本.运算符如何提供自己的预算以及如何基于预算估算成本. 什么是成本 Flink以类Costs来定义成本,它封装了一些成本估算的因素同时提供了一些针对成本对象的计算方法(加.减.乘.除)以及对这些因素未知值的认定与校验. "cost"一词也有译作:开销.代价,将其视为同义即可. Flink当前将成本估算

Flink运行时之结果分区消费端

结果分区消费端 在前一篇,我们讲解了生产者分区,生产者分区是生产者任务生产中间结果数据的过程.消费者任务在获得结果分区可用的通知之后,会发起对数据的请求.我们仍然以生产者分区的例子作为假设,其在消费端示意图如下: 可以看到在生产端和消费端存在对等的模型,具体ResultSubpartition中的数据如何被消费,我们将在本篇进行深入剖析. 输入网关 输入网关(InputGate)用于消费中间结果(IntermediateResult)在并行执行时由子任务生产的一个或多个结果分区(ResultPa

Sql Server性能优化Partition(管理分区)

在企业管理器中,虽然有"管理分区"的菜单,里面的内容却可能与你的预想不同,这里并没有提供直接对分区进行操作的方法,所以一些普通的操作,比如"增加分区"."删除分区"之类的操作就需要通过脚本实现了. 增加分区(Split Partition) "增加分区"事实上就是将现有的分区分割开,基于此,在SQL Server中应用的是Split操作.在分离分区的时候,不仅仅要在Partition Function上指定分割的分界点,同样需

Flink运行时之生成作业图

生成作业图 在分析完了流处理程序生成的流图(StreamGraph)以及批处理程序生成的优化后的计划(OptimizedPlan)之后,下一步就是生成它们面向Flink运行时执行引擎的共同抽象--作业图(JobGraph). 什么是作业图 作业图(JobGraph)是唯一被Flink的数据流引擎所识别的表述作业的数据结构,也正是这一共同的抽象体现了流处理和批处理在运行时的统一. 相比流图(StreamGraph)以及批处理优化计划(OptimizedPlan),JobGraph发生了一些变化,已

Apache Flink Client生成StreamGraph

概述 上文我们分析提交流程时,RemoteStreamEnvironment类的execute方法的第一步就是生成StreamGraph. StreamGraph是用于表示流的拓扑结构的数据结构,它包含了生成JobGraph的必要信息.它的类继承关系图如下: 如果你按照StreamGraph的继承链向上追溯,最终会发现它实现了接口FlinkPlan.Flink在这里效仿的是数据库的执行SQL是产生执行计划的机制,FlinkPlan定义在Flink的优化器相关的包中,针对流应用的计划是Stream

基于Apache Flink的实时计算引擎Blink在阿里搜索中的应用

阿里巴巴是世界上最大的电子商务零售商. 我们在2015年的年销售额总计3940亿美元,超过eBay和亚马逊之和.阿里巴巴搜索(个性化搜索和推荐平台)是客户的关键入口,并承载了大部分在线收入,因此搜索基础架构团队需要不断探索新技术来改进产品. 在电子商务网站应用场景中,什么能造就一个强大的搜索引擎?答案就是尽可能的为每个用户提供实时相关和准确的结果.同样一个不容忽视的问题就是阿里巴巴的规模,当前很难找到能够适合我们的技术. Apache Flink就是一种这样的技术,阿里巴巴正在使用基于Flink