Flink - watermark

Watermarks only come into play when windows are involved, so it is enough to call assignTimestampsAndWatermarks before the window operator.

They do not necessarily have to be emitted from the source.
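
For illustration, a minimal sketch of that pattern (the source, the Tuple2 layout, and the 10-second bound are assumptions, not from this article): timestamps and watermarks are assigned once, right before the window operator. BoundedOutOfOrdernessTimestampExtractor is the built-in assigner mentioned again later in this article.

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<Tuple2<String, Long>> stream = env.addSource(new MySource()); // MySource is hypothetical

    stream
        .assignTimestampsAndWatermarks(
            // watermark trails the max timestamp seen so far by 10 seconds
            new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(10)) {
                @Override
                public long extractTimestamp(Tuple2<String, Long> element) {
                    return element.f1; // the event time carried in the element
                }
            })
        .keyBy(0)
        .timeWindow(Time.minutes(1))
        .sum(1)
        .print();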

 

1. First, the source itself can emit watermarks

Let's look at the Kafka source implementation:

    protected AbstractFetcher(
            SourceContext<T> sourceContext,
            List<KafkaTopicPartition> assignedPartitions,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,  //set via assignTimestampsAndWatermarks when creating the KafkaConsumer
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            ProcessingTimeService processingTimeProvider,
            long autoWatermarkInterval,  //env.getConfig().setAutoWatermarkInterval()
            ClassLoader userCodeClassLoader,
            boolean useMetrics) throws Exception
    {
        //determine which watermark mode to use
        if (watermarksPeriodic == null) {
            if (watermarksPunctuated == null) {
                // simple case, no watermarks involved
                timestampWatermarkMode = NO_TIMESTAMPS_WATERMARKS;
            } else {
                timestampWatermarkMode = PUNCTUATED_WATERMARKS;
            }
        } else {
            if (watermarksPunctuated == null) {
                timestampWatermarkMode = PERIODIC_WATERMARKS;
            } else {
                throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
            }
        }

        // create our partition state according to the timestamp/watermark mode
        this.allPartitions = initializePartitions(
                assignedPartitions,
                timestampWatermarkMode,
                watermarksPeriodic, watermarksPunctuated,
                userCodeClassLoader);

        // if we have periodic watermarks, kick off the interval scheduler
        if (timestampWatermarkMode == PERIODIC_WATERMARKS) { //if watermarks are to be emitted periodically
            KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] parts =
                    (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;

            PeriodicWatermarkEmitter periodicEmitter =
                    new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval);
            periodicEmitter.start();
        }
    }

 

FlinkKafkaConsumerBase

    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
        checkNotNull(assigner);

        if (this.punctuatedWatermarkAssigner != null) {
            throw new IllegalStateException("A punctuated watermark emitter has already been set.");
        }
        try {
            ClosureCleaner.clean(assigner, true);
            this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
            return this;
        } catch (Exception e) {
            throw new IllegalArgumentException("The given assigner is not serializable", e);
        }
    }

The core methods of this interface define how to extract a timestamp and how to generate a watermark:

public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
    Watermark getCurrentWatermark();
}
public interface TimestampAssigner<T> extends Function {
    long extractTimestamp(T element, long previousElementTimestamp);
}
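
For illustration, a minimal periodic assigner (a sketch; MyEvent and its getEventTime() are hypothetical): track the maximum timestamp seen so far and emit a watermark that trails it by a fixed out-of-orderness bound.

    public class MyPeriodicAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {

        private static final long MAX_OUT_OF_ORDERNESS = 5000; // allow 5s of out-of-order events

        private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS; // avoid underflow below

        @Override
        public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
            long ts = element.getEventTime();
            currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
            return ts;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // called at every autoWatermarkInterval; the watermark trails the
            // max timestamp seen so far by the out-of-orderness bound
            return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS);
        }
    }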

If assignTimestampsAndWatermarks is not called when the KafkaConsumer is initialized, no watermarks are generated.
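
Wiring this up might look as follows (a sketch; the topic, properties, MyEventSchema, and the FlinkKafkaConsumer09 connector version are assumptions):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "demo");

    // MyEventSchema (a DeserializationSchema<MyEvent>) is hypothetical
    FlinkKafkaConsumer09<MyEvent> consumer =
            new FlinkKafkaConsumer09<>("my-topic", new MyEventSchema(), props);

    // without this call, timestampWatermarkMode stays NO_TIMESTAMPS_WATERMARKS
    // and the source emits no watermarks at all
    consumer.assignTimestampsAndWatermarks(new MyPeriodicAssigner()); // from the sketch above

    env.getConfig().setAutoWatermarkInterval(200); // drives the PeriodicWatermarkEmitter
    DataStream<MyEvent> stream = env.addSource(consumer);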

 

As you can see, there are two watermark modes:

PERIODIC_WATERMARKS: watermarks that are emitted at a regular interval.

PUNCTUATED_WATERMARKS: watermarks triggered by the elements themselves, e.g. an attribute of an element or a special marker element indicates that a watermark should be emitted; this gives the developer direct control over watermark generation.
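
A sketch of the punctuated variant (MyEvent and its isWatermarkMarker() flag are hypothetical): checkAndGetNextWatermark is invoked for every element, right after extractTimestamp, and any non-null result is emitted immediately.

    public class MyPunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {

        @Override
        public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
            return element.getEventTime();
        }

        @Override
        public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
            // returning null means "no watermark for this element"
            return lastElement.isWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
        }
    }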

 

initializePartitions

case PERIODIC_WATERMARKS: {
    @SuppressWarnings("unchecked")
    KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
            (KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
                    new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()];

    int pos = 0;
    for (KafkaTopicPartition partition : assignedPartitions) {
        KPH kafkaHandle = createKafkaPartitionHandle(partition);

        AssignerWithPeriodicWatermarks<T> assignerInstance =
                watermarksPeriodic.deserializeValue(userCodeClassLoader);

        partitions[pos++] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
                partition, kafkaHandle, assignerInstance);
    }

    return partitions;
}

KafkaTopicPartitionStateWithPeriodicWatermarks

The most important functions in this class:

    public long getTimestampForRecord(T record, long kafkaEventTimestamp) {
        return timestampsAndWatermarks.extractTimestamp(record, kafkaEventTimestamp);
    }

    public long getCurrentWatermarkTimestamp() {
        Watermark wm = timestampsAndWatermarks.getCurrentWatermark();
        if (wm != null) {
            partitionWatermark = Math.max(partitionWatermark, wm.getTimestamp());
        }
        return partitionWatermark;
    }

As you can see, they are implemented by calling the AssignerWithPeriodicWatermarks you defined.

 

PeriodicWatermarkEmitter

    private static class PeriodicWatermarkEmitter implements ProcessingTimeCallback {

        public void start() {
            timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this); //start the timer; it fires periodically
        }

        @Override
        public void onProcessingTime(long timestamp) throws Exception { //callback invoked when the timer fires

            long minAcrossAll = Long.MAX_VALUE;
            for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) { //for each partition

                // we access the current watermark for the periodic assigners under the state
                // lock, to prevent concurrent modification to any internal variables
                final long curr;
                //noinspection SynchronizationOnLocalVariableOrMethodParameter
                synchronized (state) {
                    curr = state.getCurrentWatermarkTimestamp(); //read the current partition's watermark
                }

                minAcrossAll = Math.min(minAcrossAll, curr); //take the min: the smallest per-partition watermark becomes the overall watermark (e.g. partitions at 10, 15, 12 yield 10)
            }

            // emit next watermark, if there is one
            if (minAcrossAll > lastWatermarkTimestamp) {
                lastWatermarkTimestamp = minAcrossAll;
                emitter.emitWatermark(new Watermark(minAcrossAll)); //emit
            }

            // schedule the next watermark
            timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this); //re-register the timer
        }
    }

 

2. A DataStream can also be set up to emit watermarks periodically

Under the hood this simply adds a chained TimestampsAndPeriodicWatermarksOperator.

DataStream

    /**
     * Assigns timestamps to the elements in the data stream and periodically creates
     * watermarks to signal event time progress.
     *
     * <p>This method creates watermarks periodically (for example every second), based
     * on the watermarks indicated by the given watermark generator. Even when no new elements
     * in the stream arrive, the given watermark generator will be periodically checked for
     * new watermarks. The interval in which watermarks are generated is defined in
     * {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
     *
     * <p>Use this method for the common cases, where some characteristic over all elements
     * should generate the watermarks, or where watermarks are simply trailing behind the
     * wall clock time by a certain amount.
     *
     * <p>For the second case and when the watermarks are required to lag behind the maximum
     * timestamp seen so far in the elements of the stream by a fixed amount of time, and this
     * amount is known in advance, use the
     * {@link BoundedOutOfOrdernessTimestampExtractor}.
     *
     * <p>For cases where watermarks should be created in an irregular fashion, for example
     * based on certain markers that some element carry, use the
     * {@link AssignerWithPunctuatedWatermarks}.
     *
     * @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and
     *                                      watermark generator.
     * @return The stream after the transformation, with assigned timestamps and watermarks.
     *
     * @see AssignerWithPeriodicWatermarks
     * @see AssignerWithPunctuatedWatermarks
     * @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)
     */
    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
            AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

        // match parallelism to input, otherwise dop=1 sources could lead to some strange
        // behaviour: the watermark will creep along very slowly because the elements
        // from the source go to each extraction operator round robin.
        final int inputParallelism = getTransformation().getParallelism();
        final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

        TimestampsAndPeriodicWatermarksOperator<T> operator =
                new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

        return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
                .setParallelism(inputParallelism);
    }

 

TimestampsAndPeriodicWatermarksOperator

public class TimestampsAndPeriodicWatermarksOperator<T>
        extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
        implements OneInputStreamOperator<T, T>, Triggerable {

    private transient long watermarkInterval;
    private transient long currentWatermark;

    public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
        super(assigner); //AbstractUdfStreamOperator(F userFunction)
        this.chainingStrategy = ChainingStrategy.ALWAYS; //always chained to the upstream operator
    }

    @Override
    public void open() throws Exception {
        super.open();

        currentWatermark = Long.MIN_VALUE;
        watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();

        if (watermarkInterval > 0) {
            registerTimer(System.currentTimeMillis() + watermarkInterval, this); //register with the timer service
        }
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        final long newTimestamp = userFunction.extractTimestamp(element.getValue(), //extract the timestamp from the element via the AssignerWithPeriodicWatermarks
                element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

        output.collect(element.replace(element.getValue(), newTimestamp)); //update the element's timestamp and re-emit it
    }

    @Override
    public void trigger(long timestamp) throws Exception { //invoked when the timer fires
        // register next timer
        Watermark newWatermark = userFunction.getCurrentWatermark(); //get the current watermark
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
            output.emitWatermark(newWatermark); //emit the watermark
        }

        registerTimer(System.currentTimeMillis() + watermarkInterval, this); //re-register the timer
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // if we receive a Long.MAX_VALUE watermark we forward it since it is used
        // to signal the end of input and to not block watermark progress downstream
        if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            output.emitWatermark(mark); //forward watermark
        }
    }
}

 

As you can see, processElement calls AssignerWithPeriodicWatermarks.extractTimestamp to extract the event time, and then overwrites the StreamRecord's timestamp with it.

 

Then, in the WindowOperator:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

windowAssigner.assignWindows uses the element's timestamp, set above, as the time for window assignment.
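
For a tumbling event-time window, for example, that timestamp alone determines the window; a sketch of the start-of-window arithmetic (no offset, non-negative timestamps assumed, mirroring what TumblingEventTimeWindows does via TimeWindow.getWindowStartWithOffset):

    // maps an element timestamp to its tumbling window [start, start + size)
    public static TimeWindow assignTumblingWindow(long timestamp, long sizeMillis) {
        long start = timestamp - (timestamp % sizeMillis); // align down to a window boundary
        return new TimeWindow(start, start + sizeMillis);
    }

This is why the timestamp written by TimestampsAndPeriodicWatermarksOperator.processElement must already be in place when the element reaches the window operator.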

