Apache Flink源码解析之stream-windowfunction

Window也即窗口,是Flink流处理的特性之一。前一篇文章我们谈到了Winodw的相关概念及其实现。窗口的目的是将无界的流转换为有界的元素集合,但这还不是最终的目的,最终的目的是在这有限的集合上apply(应用)某种函数,这就是我们本篇要谈的主题——WindowFunction(窗口函数)。

那么窗口函数会在什么时候被应用呢?还记得上篇文章我们谈到了触发器Trigger,在触发器触发后会返回TriggerResult这个枚举类型的其中一个枚举值。当返回的是FIRE或者FIRE_AND_PURGE时,窗口函数就会在窗口上应用。

Flink中将窗口函数分为两种:

  • AllWindowFunction : 针对全局的不基于某个key进行分组的window的窗口函数的实现
  • WindowFunction : 针对基于某个key进行分组的window的窗口函数的实现

它们在类型继承体系中分属两个不同的体系:

但可以看到,针对这两个体系几乎都提供了相同功能的窗口函数的实现。

AllWindowFunction

所有不基于某个key进行分组的window的窗口函数的实现的基类。该接口是个泛型接口,需要指定三个泛型参数:

  • IN :input数据的类型
  • OUT :output对象的类型
  • W : 继承自Window,表示需要在其上应用该操作的Window的类型

该接口只有一个接口方法:

    void apply(W window, Iterable<IN> values, Collector<OUT> out) throws Exception;

该方法用于在window上的元素集合values进行计算,然后out出0个或多个值。

RichAllWindowFunction

抽象类,继承AbstractRichFunction以提供rich
的AllWindowFunction(AbstractRichFunction提供了open/close方法对以及获得运行时上下文对象的手段)。我们在之前解析SourceFunctionSinkFunction时多次看到这种实现模式。这里该类不提供任何实现。

ReduceIterableAllWindowFunction

ReduceIterableAllWindowFunction用于对其窗口内的所有元素迭代应用reduce操作并合并为一个元素,然后再发射出去。它接收ReduceFunction的实例,以提供reduce函数。

该类apply方法实现如下:

    public void apply(W window, Iterable<T> input, Collector<T> out) throws Exception {

        T curr = null;
        for (T val: input) {
            if (curr == null) {
                curr = val;
            } else {
                curr = reduceFunction.reduce(curr, val);
            }
        }
        out.collect(curr);
    }

reduceFunction#reduce方法,用于将第一个参数和第二个参数进行合并为一个元素。

ReduceApplyAllWindowFunction

ReduceApplyAllWindowFunction用于对窗口内的所有元素进行reduce操作后再进行调用apply。其构造器接收两个参数:

  • reduceFunction : 提供reduce操作的ReduceFunction
  • windowFunction :
    提供apply操作的AllWindowFunction,该参数用于对window中元素进行reduce之后产生的单个元素再进行最终的apply操作。

该类的apply实现如下:

public void apply(W window, Iterable<T> input, Collector<R> out) throws Exception {

        T curr = null;
        for (T val: input) {
            if (curr == null) {
                curr = val;
            } else {
                curr = reduceFunction.reduce(curr, val);
            }
        }
        windowFunction.apply(window, Collections.singletonList(curr), out);
    }

PassThroughAllWindowFunction

PassThroughAllWindowFunction该类仅仅提供passthrough功能,也即直接通过发射器将窗口内的元素迭代发射出去,除此之外不进行任何操作。

FoldApplyAllWindowFunction

FoldApplyAllWindowFunction用于对窗口中的数据先进行fold操作,得到一个最终合并的元素,再进行apply操作。因此它需要如下三个参数:

  • initialValue : 应用foldFunction的初始值
  • foldFunction :执行fold操作
  • windowFunction :对fold之后的最终值应用apply操作

该类继承自WrappingFunctionWrappingFunction类似于一个包装器,包装传进来的某个Function,给一些模式化方法(open/close)提供了一些便捷处理。

这里有一点需要区分一下,因为ReduceFunctionFoldFuction都具有将一组元素合并为单个元素的功能,所以他们看起来非常相似。不过他们还是有区别的,其中的一个区别就是,FoldFunction在进行fold操作的时候,还会进行潜在的类型转换。看下面的示例:

ReduceFunction<Integer> {

  public Integer reduce(Integer a, Integer b) { return a + b; }
}

[1, 2, 3, 4, 5] -> reduce()  means: ((((1 + 2) + 3) + 4) + 5) = 15
FoldFunction<String, Integer> {

  public String fold(String current, Integer i) { return current +
String.valueOf(i); }
}

[1, 2, 3, 4, 5] -> fold("start-")  means: ((((("start-" + 1) + 2) + 3) + 4)
+ 5) = "start-12345" (as a String)

WindowFunction

这是Flink的另一个基于key进行分组的WindowFunction。因此跟AllWindowFunction主要的不同的是,其泛型参数多了一个KEY,表示进行分组的key的类型。

同时其接口方法中也相应多了一个参数:

    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;

具体的实现跟AllWindowFunction的实现大同小异,不再多谈。

小结

本篇主要剖析了Flink提供的两种不同的窗口函数AllWindowFunction以及WindowFunction。并对Flink针对AllWindowFunction的实现进行了解读。

原文发布时间为:2016-05-12

本文作者:vinoYang

本文来自合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

时间: 2024-09-14 18:17:59

Apache Flink源码解析之stream-windowfunction的相关文章

Apache Flink源码解析之stream-window

window(窗口)是Flink流处理中非常重要的概念,本篇我们来对窗口相关的概念以及关联的实现进行解析.本篇的内容主要集中在package org.apache.flink.streaming.api.windowing下. Window 一个Window代表有限对象的集合.一个窗口有一个最大的时间戳,该时间戳意味着在其代表的某时间点--所有应该进入这个窗口的元素都已经到达. Flink的根窗口对象是一个抽象类,只提供了一个抽象方法: public abstract long maxTimes

Apache Flink源码解析之stream-source

今天我们来解读一下Flink stream里的source模块.它是整个stream的入口,也是我们了解其流处理体系的入口. SourceFunction SourceFunction是所有stream source的根接口. 它继承自一个标记接口(空接口)Function. SourceFunction定义了两个接口方法: run : 启动一个source,即对接一个外部数据源然后emit元素形成stream(大部分情况下会通过在该方法里运行一个while循环的形式来产生stream). ca

Apache Flink源码解析之stream-sink

上一篇我们谈论了Flink stream source,它作为流的数据入口是整个DAG(有向无环图)拓扑的起点.那么与此对应的,流的数据出口就是跟source对应的Sink.这是我们本篇解读的内容. SinkFunction 跟SourceFunction对应,Flink针对Sink的根接口被称为SinkFunction.继承自Function这一标记接口.SinkFunction接口只提供了一个方法: void invoke(IN value) throws Exception; 该方法提供基

Java集合学习(十二) TreeMap详细介绍(源码解析)和使用示例

这一章,我们对TreeMap进行学习. 第1部分 TreeMap介绍 TreeMap 简介 TreeMap 是一个有序的key-value集合,它是通过红黑树实现的. TreeMap继承于AbstractMap,所以它是一个Map,即一个key-value集合. TreeMap 实现了NavigableMap接口,意味着它支持一系列的导航方法.比如返回有序的key集合. TreeMap 实现了Cloneable接口,意味着它能被克隆. TreeMap 实现了java.io.Serializabl

Java 集合系列12之 TreeMap详细介绍(源码解析)和使用示例

概要 这一章,我们对TreeMap进行学习.我们先对TreeMap有个整体认识,然后再学习它的源码,最后再通过实例来学会使用TreeMap.内容包括:第1部分 TreeMap介绍第2部分 TreeMap数据结构第3部分 TreeMap源码解析(基于JDK1.6.0_45)第4部分 TreeMap遍历方式第5部分 TreeMap示例 转载请注明出处:http://www.cnblogs.com/skywang12345/admin/EditPosts.aspx?postid=3310928   第

Step.js 使用教程(附源码解析)

Step.js(https://github.com/creationix/step)是控制流程工具(大小仅 150 行代码),解决回调嵌套层次过多等问题.适用于读文件.查询数据库等回调函数相互依赖,或者分别获取内容最后组合数据返回等应用情景.异步执行简单地可以分为"串行执行"和"并行"执行,下面我们分别去看看. 串行执行 这个库只有一个方法 Step(fns...).Step 方法其参数 fns 允许是多个函数,这些函数被依次执行.Step 利用 this 对象指

Java集合学习(十七) TreeSet详细介绍(源码解析)和使用示例

这一章,我们对TreeSet进行学习. 我们先对TreeSet有个整体认识,然后再学习它的源码,最后再通过实例来学会使用TreeSet. 第1部分 TreeSet介绍 TreeSet简介 TreeSet 是一个有序的集合,它的作用是提供有序的Set集合.它继承于AbstractSet抽象类,实现了NavigableSet<E>, Cloneable, java.io.Serializable接口. TreeSet 继承于AbstractSet,所以它是一个Set集合,具有Set的属性和方法.

Java集合学习(十六) HashSet详细介绍(源码解析)和使用示例

这一章,我们对HashSet进行学习. 我们先对HashSet有个整体认识,然后再学习它的源码,最后再通过实例来学会使用HashSet. 第1部分 HashSet介绍 HashSet 简介 HashSet 是一个没有重复元素的集合. 它是由HashMap实现的,不保证元素的顺序,而且HashSet允许使用 null 元素. HashSet是非同步的.如果多个线程同时访问一个哈希 set,而其中至少一个线程修改了该 set,那么它必须 保持外部同步.这通常是通过对自然封装该 set 的对象执行同步

Java集合学习(十三) WeakHashMap详细介绍(源码解析)和使用示例

这一章,我们对WeakHashMap进行学习. 我们先对WeakHashMap有个整体认识,然后再学习它的源码,最后再通过实例来学会使用WeakHashMap. 第1部分 WeakHashMap介绍 WeakHashMap简介    WeakHashMap 继承于AbstractMap,实现了Map接口.    和HashMap一样,WeakHashMap 也是一个散列表,它存储的内容也是键值对(key-value)映射,而且键和值都可以是null.   不过WeakHashMap的键是"弱键&