Flink 原理与实现:Session Window

上一篇文章:Window机制中,我们介绍了窗口的概念和底层实现,以及 Flink 一些内建的窗口,包括滑动窗口、翻滚窗口。本文将深入讲解一种较为特殊的窗口:会话窗口(session window)。建议您在阅读完上一篇文章的基础上再阅读本文。

当我们需要分析用户的一段交互的行为事件时,通常的想法是将用户的事件流按照“session”来分组。session 是指一段持续活跃的期间,由活跃间隙分隔开。通俗一点说,消息之间的间隔小于超时阈值(sessionGap)的,则被分配到同一个窗口,间隔大于阈值的,则被分配到不同的窗口。目前开源领域大部分的流计算引擎都有窗口的概念,但是没有对 session window 的支持,要实现 session window,需要用户自己去做完大部分事情。而当 Flink 1.1.0 版本正式发布时,Flink 将会是开源流计算领域第一个内建支持 session window 的引擎。

在 Flink 1.1.0 之前,Flink 也可以通过自定义的window assigner和trigger来实现一个基本能用的session window。release-1.0 版本中提供了一个实现 session window 的 example:SessionWindowing。这个session window范例的实现原理是,基于GlobleWindow这个window assigner,将所有元素都分配到同一个窗口中,然后指定一个自定义的trigger来触发执行窗口。这个trigger的触发机制是,对于每个到达的元素都会根据其时间戳(timestamp)注册一个会话超时的定时器(timestamp+sessionTimeout),并移除上一次注册的定时器。最新一个元素到达后,如果超过 sessionTimeout 的时间还没有新元素到达,那么trigger就会触发,当前窗口就会是一个session window。处理完窗口后,窗口中的数据会清空,用来缓存下一个session window的数据。

但是这种session window的实现是非常弱的,无法应用到实际生产环境中的。因为它无法处理乱序 event time 的消息。 而在即将到来的 Flink 1.1.0 版本中,Flink 提供了对 session window 的直接支持,用户可以通过SessionWindows.withGap()来轻松地定义 session widnow,而且能够处理乱序消息。Flink 对 session window 的支持主要借鉴自 Google 的 DataFlow 。

Session Window in Flink

假设有这么个场景,用户点开手机淘宝后会进行一系列的操作(点击、浏览、搜索、购买、切换tab等),这些操作以及对应发生的时间都会发送到服务器上进行用户行为分析。那么用户的操作行为流的样例可能会长下面这样:

通过上图,我们可以很直观地观察到,用户的行为是一段一段的,每一段内的行为都是连续紧凑的,段内行为的关联度要远大于段之间行为的关联度。我们把每一段用户行为称之为“session”,段之间的空档我们称之为“session gap”。所以,理所当然地,我们应该按照 session window 对用户的行为流进行切分,并计算每个session的结果。如下图所示:

为了定义上述的窗口切分规则,我们可以使用 Flink 提供的 SessionWindows 这个 widnow assigner API。如果你用过 SlidingEventTimeWindowsTumlingProcessingTimeWindows等,你会对这个很熟悉。

DataStream input = …
DataStream result = input
  .keyBy(<key selector>)
  .window(SessionWindows.withGap(Time.seconds(<seconds>))
  .apply(<window function>) // or reduce() or fold()

这样,Flink 就会基于元素的时间戳,自动地将元素放到不同的session window中。如果两个元素的时间戳间隔小于 session gap,则会在同一个session中。如果两个元素之间的间隔大于session gap,且没有元素能够填补上这个gap,那么它们会被放到不同的session中。

底层实现

为了实现 session window,我们需要扩展 Flink 中的窗口机制,使得能够支持窗口合并。要理解其原因,我们需要先了解窗口的现状。在上一篇文章中,我们谈到了 Flink 中 WindowAssigner 负责将元素分配到哪个/哪些窗口中去,Trigger 决定了一个窗口何时能够被计算或清除。当元素被分配到窗口之后,这些窗口是固定的不会改变的,而且窗口之间不会相互作用。

对于session window来说,我们需要窗口变得更灵活。基本的思想是这样的:SessionWindows assigner 会为每个进入的元素分配一个窗口,该窗口以元素的时间戳作为起始点,时间戳加会话超时时间为结束点,也就是该窗口为[timestamp, timestamp+sessionGap)。比如我们现在到了两个元素,它们被分配到两个独立的窗口中,两个窗口目前不相交,如图:

当第三个元素进入时,分配到的窗口与现有的两个窗口发生了叠加,情况变成了这样:

由于我们支持了窗口的合并,WindowAssigner可以合并这些窗口。它会遍历现有的窗口,并告诉系统哪些窗口需要合并成新的窗口。Flink 会将这些窗口进行合并,合并的主要内容有两部分:

  1. 需要合并的窗口的底层状态的合并(也就是窗口中缓存的数据,或者对于聚合窗口来说是一个聚合值)
  2. 需要合并的窗口的Trigger的合并(比如对于EventTime来说,会删除旧窗口注册的定时器,并注册新窗口的定时器)

总之,结果是三个元素现在在同一个窗口中了:

需要注意的是,对于每一个新进入的元素,都会分配一个属于该元素的窗口,都会检查并合并现有的窗口。在触发窗口计算之前,每一次都会检查该窗口是否可以和其他窗口合并,直到trigger触发后,会将该窗口从窗口列表中移除。对于 event time 来说,窗口的触发是要等到大于窗口结束时间的 watermark 到达,当watermark没有到,窗口会一直缓存着。所以基于这种机制,可以做到对乱序消息的支持。

这里有一个优化点可以做,因为每一个新进入的元素都会创建属于该元素的窗口,然后合并。如果新元素连续不断地进来,并且新元素的窗口一直都是可以和之前的窗口重叠合并的,那么其实这里多了很多不必要的创建窗口、合并窗口的操作,我们可以直接将新元素放到那个已存在的窗口,然后扩展该窗口的大小,看起来就像和新元素的窗口合并了一样。

源码分析

FLINK-3174 这个JIRA中有对 Flink 如何支持 session window 的详细说明,以及代码更新。建议可以结合该 PR 的代码来理解本文讨论的实现原理。

为了扩展 Flink 中的窗口机制,使得能够支持窗口合并,首先 window assigner 要能合并现有的窗口,Flink 增加了一个新的抽象类 MergingWindowAssigner 继承自 WindowAssigner,这里面主要多了一个 mergeWindows 的方法,用来决定哪些窗口是可以合并的。

public abstract class MergingWindowAssigner<T, W extends Window> extends WindowAssigner<T, W> {
    private static final long serialVersionUID = 1L;

    /**
     * 决定哪些窗口需要被合并。对于每组需要合并的窗口, 都会调用 callback.merge(toBeMerged, mergeResult)
     *
     * @param windows 现存的窗口集合 The window candidates.
     * @param callback 需要被合并的窗口会回调 callback.merge 方法
     */
    public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);

    public interface MergeCallback<W> {

        /**
         * 用来声明合并窗口的具体动作(合并窗口底层状态、合并窗口trigger等)。
         *
         * @param toBeMerged  需要被合并的窗口列表
         * @param mergeResult 合并后的窗口
         */
        void merge(Collection<W> toBeMerged, W mergeResult);
    }
}

所有已经存在的 assigner 都继承自 WindowAssigner,只有新加入的 session window assigner 继承自 MergingWindowAssigner,如:ProcessingTimeSessionWindowsEventTimeSessionWindows

另外,Trigger 也需要能支持对合并窗口后的响应,所以 Trigger 添加了一个新的接口 onMerge(W window, OnMergeContext ctx),用来响应发生窗口合并之后对trigger的相关动作,比如根据合并后的窗口注册新的 event time 定时器。

OK,接下来我们看下最核心的代码,也就是对于每个进入的元素的处理,代码位于WindowOperator.processElement方法中,如下所示:

public void processElement(StreamRecord<IN> element) throws Exception {
    Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
    final K key = (K) getStateBackend().getCurrentKey();
    if (windowAssigner instanceof MergingWindowAssigner) {
        // 对于session window 的特殊处理,我们只关注该条件块内的代码
        MergingWindowSet<W> mergingWindows = getMergingWindowSet();

        for (W window: elementWindows) {
            final Tuple1<TriggerResult> mergeTriggerResult = new Tuple1<>(TriggerResult.CONTINUE);

            // 加入新窗口, 如果没有合并发生,那么actualWindow就是新加入的窗口
            // 如果有合并发生, 那么返回的actualWindow即为合并后的窗口,
            // 并且会调用 MergeFunction.merge 方法, 这里方法中的内容主要是更新trigger, 合并旧窗口中的状态到新窗口中
            W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                @Override
                public void merge(W mergeResult,
                        Collection<W> mergedWindows, W stateWindowResult,
                        Collection<W> mergedStateWindows) throws Exception {
                    context.key = key;
                    context.window = mergeResult;

                    // 这里面会根据新窗口的结束时间注册新的定时器
                    mergeTriggerResult.f0 = context.onMerge(mergedWindows);

                    // 删除旧窗口注册的定时器
                    for (W m: mergedWindows) {
                        context.window = m;
                        context.clear();
                    }

                    // 合并旧窗口(mergedStateWindows)中的状态到新窗口(stateWindowResult)中
                    getStateBackend().mergePartitionedStates(stateWindowResult,
                            mergedStateWindows,
                            windowSerializer,
                            (StateDescriptor<? extends MergingState<?,?>, ?>) windowStateDescriptor);
                }
            });

            // 取 actualWindow 对应的用来存状态的窗口
            W stateWindow = mergingWindows.getStateWindow(actualWindow);
            // 从状态后端拿出对应的状态
            AppendingState<IN, ACC> windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
            // 将新进入的元素数据加入到新窗口(或者说合并后的窗口)中对应的状态中
            windowState.add(element.getValue());

            context.key = key;
            context.window = actualWindow;

            // 检查是否需要fire or purge
            TriggerResult triggerResult = context.onElement(element);

            TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0);

            // 根据trigger结果决定怎么处理窗口中的数据
            processTriggerResult(combinedTriggerResult, actualWindow);
        }

    } else {
        // 对于普通window assigner的处理, 这里我们不关注
        for (W window: elementWindows) {

            AppendingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer,
                    windowStateDescriptor);

            windowState.add(element.getValue());

            context.key = key;
            context.window = window;
            TriggerResult triggerResult = context.onElement(element);

            processTriggerResult(triggerResult, window);
        }
    }
}

其实这段代码写的并不是很clean,并且不是很好理解。在第六行中有用到MergingWindowSet,这个类很重要所以我们先介绍它。这是一个用来跟踪窗口合并的类。比如我们有A、B、C三个窗口需要合并,合并后的窗口为D窗口。这三个窗口在底层都有对应的状态集合,为了避免代价高昂的状态替换(创建新状态是很昂贵的),我们保持其中一个窗口作为原始的状态窗口,其他几个窗口的数据合并到该状态窗口中去,比如随机选择B作为状态窗口,那么A和C窗口中的数据需要合并到B窗口中去。这样就没有新状态产生了,但是我们需要额外维护窗口与状态窗口之间的映射关系(D->B),这就是MergingWindowSet负责的工作。这个映射关系需要在失败重启后能够恢复,所以MergingWindowSet内部也是对该映射关系做了容错。状态合并的工作示意图如下所示:

然后我们来解释下processElement的代码,首先根据window assigner为新进入的元素分配窗口集合。接着进入第一个条件块,取出当前的MergingWindowSet。对于每个分配到的窗口,我们就会将其加入到MergingWindowSet中(addWindow方法),由MergingWindowSet维护窗口与状态窗口之间的关系,并在需要窗口合并的时候,合并状态和trigger。然后根据映射关系,取出结果窗口对应的状态窗口,根据状态窗口取出对应的状态。将新进入的元素数据加入到该状态中。最后,根据trigger结果来对窗口数据进行处理,对于session window来说,这里都是不进行任何处理的。真正对窗口处理是由定时器超时后对完成的窗口调用processTriggerResult

总结

本文在上一篇文章:Window机制的基础上,深入讲解了 Flink 是如何支持 session window 的,核心的原理是窗口的合并。Flink 对于 session window 的支持很大程度上受到了 Google DataFlow 的启发,所以也建议阅读下 DataFlow 的论文。

参考资料

时间: 2024-10-02 04:12:02

Flink 原理与实现:Session Window的相关文章

Flink原理与实现:Window的实现原理

Flink原理与实现系列文章: Flink 原理与实现:架构和拓扑概览Flink 原理与实现:如何生成 StreamGraphFlink 原理与实现:如何生成 JobGraphFlink原理与实现:如何生成ExecutionGraph及物理执行图Flink原理与实现:Operator Chain原理Flink原理与实现:详解Flink中的状态管理 在阅读本文之前,请先阅读Flink 原理与实现:Window机制,这篇文章从用户的角度,对Window做了比较详细的分析,而本文主要是从Flink框架

Flink 原理与实现:Window 机制

Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理.而窗口(window)就是从 Streaming 到 Batch 的一个桥梁.Flink 提供了非常完善的窗口机制,这是我认为的 Flink 最大的亮点之一(其他的亮点包括消息乱序处理,和 checkpoint 机制).本文我们将介绍流式处理中的窗口概念,介绍 Flink 内建的一些窗口和 Window API,最后讨论下窗口在底层是如何实现的. 什么是 Win

Flink原理与实现:详解Flink中的状态管理

Flink原理与实现系列文章 : Flink 原理与实现:架构和拓扑概览Flink 原理与实现:如何生成 StreamGraphFlink 原理与实现:如何生成 JobGraphFlink原理与实现:如何生成ExecutionGraph及物理执行图Flink原理与实现:Operator Chain原理 上面Flink原理与实现的文章中,有引用word count的例子,但是都没有包含状态管理.也就是说,如果一个task在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算.从

Flink原理与实现:如何生成ExecutionGraph及物理执行图

阅读本文之前,请先阅读Flink原理与实现系列前面的几篇文章 : Flink 原理与实现:架构和拓扑概览Flink 原理与实现:如何生成 StreamGraphFlink 原理与实现:如何生成 JobGraph ExecutionGraph生成过程 StreamGraph和JobGraph都是在client生成的,这篇文章将描述如何生成ExecutionGraph以及物理执行图.同时会讲解一个作业提交后如何被调度和执行. client生成JobGraph之后,就通过submitJob提交至Job

Flink 原理与实现:理解 Flink 中的计算资源

本文所讨论的计算资源是指用来执行 Task 的资源,是一个逻辑概念.本文会介绍 Flink 计算资源相关的一些核心概念,如:Slot.SlotSharingGroup.CoLocationGroup.Chain等.并会着重讨论 Flink 如何对计算资源进行管理和隔离,如何将计算资源利用率最大化等等.理解 Flink 中的计算资源对于理解 Job 如何在集群中运行的有很大的帮助,也有利于我们更透彻地理解 Flink 原理,更快速地定位问题. Operator Chains 为了更高效地分布式执行

Flink 原理与实现:如何处理反压问题

流处理系统需要能优雅地处理反压(backpressure)问题.反压通常产生于这样的场景:短时负载高峰导致系统接收数据的速率远高于它处理数据的速率.许多日常问题都会导致反压,例如,垃圾回收停顿可能会导致流入的数据快速堆积,或者遇到大促或秒杀活动导致流量陡增.反压如果不能得到正确的处理,可能会导致资源耗尽甚至系统崩溃. 目前主流的流处理系统 Storm/JStorm/Spark Streaming/Flink 都已经提供了反压机制,不过其实现各不相同. Storm 是通过监控 Bolt 中的接收队

Flink 原理与实现:内存管理

如今,大数据领域的开源框架(Hadoop,Spark,Storm)都使用的 JVM,当然也包括 Flink.基于 JVM 的数据分析引擎都需要面对将大量数据存到内存中,这就不得不面对 JVM 存在的几个问题: Java 对象存储密度低.一个只包含 boolean 属性的对象占用了16个字节内存:对象头占了8个,boolean 属性占了1个,对齐填充占了7个.而实际上只需要一个bit(1/8字节)就够了. Full GC 会极大地影响性能,尤其是为了处理更大数据而开了很大内存空间的JVM来说,GC

Flink 原理与实现:架构和拓扑概览

架构 要了解一个系统,一般都是从架构开始.我们关心的问题是:系统部署成功后各个节点都启动了哪些服务,各个服务之间又是怎么交互和协调的.下方是 Flink 集群启动后架构图. 当 Flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager.由 Client 提交任务给 JobManager,JobManager 再调度任务到各个 TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager.TaskManager

【资料合集】Apache Flink 精选PDF下载

Apache Flink是一款分布式.高性能的开源流式处理框架,在2015年1月12日,Apache Flink正式成为Apache顶级项目.目前Flink在阿里巴巴.Bouygues Teleccom.Capital One等公司得到应用,如阿里巴巴对Apache Flink的应用案例. 为了更好地让大家了解和使用Apache Flink,我们收集了25+个Flink相关的演讲PDF(资料来自Apache Flink官网推荐)和相关文章,供大家参考. PDF下载 Robert Metzger: