Flink-CEP之模式流与运算符

之前我们分析了CEP的API,接下来我们将分析CEP API的内部实现包括模式流与运算符。

模式流

模式流(PatternStream)是CEP模式匹配的流抽象,一个PatternStream对象表示模式检测到的序列所对应的流。该序列以映射来表示,以模式名关联一组事件对象。

为了使用PatternStream,我们首先要构建它,为此Flink提供了一个名为CEP的帮助类,它定义了一个pattern静态方法:

DataStream<String> inputStream = ...
Pattern<String, ?> pattern = ...

PatternStream<String> patternStream = CEP.pattern(inputStream, pattern);

该方法接收初始事件流DataStream对象以及用于匹配的Pattern对象,在pattern方法内部通过将这两个参数传递给PatternStream的构造方法来构建该对象的。

从之前的案例代码中我们看到,通常会在PatternStream上调用select或flatSelect来获取某个模式下匹配到的事件来实现我们的业务逻辑。而在select/flatSelect方法内部,其实仍然是借助于常规DataStream实现的,我们以其中select方法(存在多个重载)一个作为示例:

public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectFunction,
    TypeInformation<R> outTypeInfo) {
    DataStream<Map<String, T>> patternStream = CEPOperatorUtils.createPatternStream(inputStream, pattern);

    return patternStream
        .map(new PatternSelectMapper<>(
            patternStream.getExecutionEnvironment().clean(patternSelectFunction)))
        .returns(outTypeInfo);
}

方法的第一行,借助于CEPOperatorUtils这一帮助类构建DataStream

final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, false);

接着,会判断初始输入流是否是基于键分组的(KeyedStream),这是为了采用不同的运算符对初始输入流进行转换,如果是KeyedStream,则将其初始输入流进行强制转换为KeyedStream并采用KeyedCEPPatternOperator:

patternStream = keyedStream.transform(
    "KeyedCEPPatternOperator",
    (TypeInformation<Map<String, T>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
    new KeyedCEPPatternOperator<>(
        inputSerializer,
        isProcessingTime,
        keySelector,
        keySerializer,
        nfaFactory)
);

如果是普通未分组的数据流,则采用CEPPatternOperator:

patternStream = inputStream.transform(
    "CEPPatternOperator",
    (TypeInformation<Map<String, T>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
    new CEPPatternOperator<T>(
        inputSerializer,
        isProcessingTime,
        nfaFactory
    )
).setParallelism(1);

从上面我们看到无论是哪种运算符都要求传递NFA工厂,说明NFA是在运算符内部工作的。另外需要注意的是,如果是普通数据流,其并行度被设置为1,也就是整个数据流没办法分区以并行执行,而是作为一个全局数据流参与模式匹配。这一点其实不难想象,因为我们在分析模式时,其有事件选择策略(严格紧邻还是非严格紧邻),也就是说事件前后顺序是模式的一部分,那么这时候如果普通事件流再分区执行,将会打破这种顺序,从而导致匹配失效。

通过对PatternStream的解析可知,它其实不同于DataStream
API里的各种数据流对象,它并不是�DataStream的特例,也不是由转换函数得来,它只是对DataStream的二次封装。

上面我们提及了两种运算符,但其实并不止这么多,具体它们的实现以及差别,我们接下来会进行详细分析。

运算符

CEP的运算符实现有两个考虑因素:是否针对基于键分区的数据流以及是否支持对超时的匹配序列进行处理。因此针对这两个因素的组合将会产生四种运算符的实现,所有运算符相关的类图如下所示:

AbstractCEPBasePatternOperator为所有的运算符提供基础模板,它自身继承自流运算符(AbstractStreamOperator)并扩展了单输入流运算符接口(OneInputStreamOperator)。AbstractCEPBasePatternOperator定义了两对抽象方法,分别是:

  • (get/update)NFA:(获得/更新)NFA的实例;
  • (get/update)PriorityQueue:(获得/更新)优先级队列;

其实,这两对方法主要是为了实现基于键分组的运算符提供的,因为它们会利用用户状态API来获取并更新NFA实例以及优先级队列(优先级队列用于缓存事件时间语义时的事件以等待水位线),这一点我们会在下文剖析。借助于这两对抽象方法,提供了对processElement(定义在OneInputStreamOperator接口中)的实现:

public void processElement(StreamRecord<IN> element) throws Exception {
    if (isProcessingTime) {
        //获得NFA对象,处理事件并更新NFA对象
        NFA<IN> nfa = getNFA();
        processEvent(nfa, element.getValue(), System.currentTimeMillis());
        updateNFA(nfa);
    } else {
        //获得优先级队列缓存该元素(直到接收到水位线),更新优先级队列
        PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();

        if (getExecutionConfig().isObjectReuseEnabled()) {
            priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()),
                element.getTimestamp()));
        } else {
            priorityQueue.offer(element);
        }
        updatePriorityQueue(priorityQueue);
    }
}

从代码实现来看,真正执行模式匹配的是processEvent方法。AbstractCEPBasePatternOperator有两个抽象派生类,分别是:

  • AbstractCEPPatternOperator:普通的CEP模式运算符;
  • AbstractKeyedCEPPatternOperator:基于键分区的CEP模式运算符

由于AbstractCEPPatternOperator相对较为简单,因此我们先分析它的实现。AbstractCEPPatternOperator在运行时是单实例的,因为它的并行度为一,因此它不需要用到用户状态API,同时也就不需要实现抽象方法updateNFA以及updatePriorityQueue。它实现了processWatermark方法:

public void processWatermark(Watermark mark) throws Exception {
    //如果优先级队列不为空且队首元素的时间戳小于等于水位线的时间戳,则出队元素并调用processEvent方法处理
    while(!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
        StreamRecord<IN> streamRecord = priorityQueue.poll();

        processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
    }

    //发射水位线
    output.emitWatermark(mark);
}

由于AbstractCEPPatternOperator最终继承自AbstractStreamOperator,所以它还需要实现运算符状态的快照/恢复方法对。我们可以直接利用运算符状态快照来保存相关状态,这里主要的状态就是NFA对象以及优先级队列。

接下来,我们来分析AbstractKeyedCEPPatternOperator的实现,不同于AbstractCEPPatternOperator所处理的全局事件流。AbstractKeyedCEPPatternOperator所面对的是基于键分区的事件流,因此除了NFA对象以及优先级队列,还有所有用户的键集合需要存储。且因为是多分区并行执行,那么NFA对象和优先级队列也将会在多个分区内并行存在。这时,将不得不使用用户状态API,以在内部将这些状态是跟键关联(内部是KVState):

private transient ValueState<NFA<IN>> nfaOperatorState;
private transient ValueState<PriorityQueue<StreamRecord<IN>>> priorityQueueOperatorState;

因此get/updateNFA方法对是为了配合ValueState的value/update方法对。但键集合仍然可以使用运算符状态来保存。

AbstractKeyedCEPPatternOperator跟AbstractCEPPatternOperator还有一个区别比较大的地方在于对processWatermark方法的实现,在processWatermark内部它会迭代所有的键,并使得它们内部符合计算条件(参照水位线)的元素都被计算。

参照我们上面给出的运算符继承关系图,到目前为止,我们已经解析了上面两层运算符。其中,第一层为processElement提供模板实现,第二层为processWatermark(跟事件时间有关)提供模板实现以及对运算符逻辑相关的状态进行维护。而最后一层则才是真正处理事件的模式匹配的processEvent方法的实现,该方法由AbstractCEPBasePatternOperator定义。

运算符对processEvent方法的实现,其逻辑基本上都是类似的:调用NFA对象的process方法,逐个处理事件,该方法我们在分析NFA时做过重点剖析。下面我们选择四个运算符里最为复杂的基于键分区且支持超时的运算符(TimeoutKeyedCEPPatternOperator)进行分析:

protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
    //调用NFA的process实例方法,会得到由两个集合组成的二元组,其中二元组第一个下标表示匹配模式的事件序列;
    //第二个下标表示超时的序列
    Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns =
        nfa.process(event, timestamp);

    Collection<Map<String, IN>> matchedPatterns = patterns.f0;
    Collection<Tuple2<Map<String, IN>, Long>> partialPatterns = patterns.f1;

    //构建用于输出的流记录对象,其内部存储的数据结构是一个Either对象,它表示这样一个语义:
    //该对象要么左边有值,要么右边有值
    StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord =
        new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(null, timestamp);

    //如果有匹配模式的事件序列,则加入Either的右对象
    if (!matchedPatterns.isEmpty()) {
        for (Map<String, IN> matchedPattern : matchedPatterns) {
            streamRecord.replace(Either.Right(matchedPattern));
            output.collect(streamRecord);
        }
    }

    //如果有超时事件序列,则加入Either的左对象
    if (!partialPatterns.isEmpty()) {
        for (Tuple2<Map<String, IN>, Long> partialPattern: partialPatterns) {
            streamRecord.replace(Either.Left(partialPattern));
            output.collect(streamRecord);
        }
    }
}

其他运算符对processEvent的实现大同小异,由于篇幅有限,不再赘述。

原文发布时间为:2017-03-16

本文作者:vinoYang

本文来自合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

时间: 2025-01-01 13:21:21

Flink-CEP之模式流与运算符的相关文章

详解C++编程中的重载流插入运算符和流提取运算符_C 语言

C++的流插入运算符"<<"和流提取运算符">>"是C++在类库中提供的,所有C++编译系统都在类库中提供输入流类istream和输出流类ostream.cin和cout分别是istream类和ostream类的对象.在类库提供的头文件中已经对"<<"和">>"进行了重载,使之作为流插入运算符和流提取运算符,能用来输出和输入C++标准类型的数据.因此,凡是用"cout&

C++语言基础 例程 重载流插入运算符和流提取运算符

贺老师的教学链接  本课讲解 重载流插入运算符"<<" #include <iostream> using namespace std; class Complex { public: Complex( ) { real=0; imag=0; } Complex(double r,double i) { real=r; imag=i; } Complex operator + (Complex &c2); //运算符"+"重载为成员函

Flink运行时之流处理程序生成流图

流处理程序生成流图 DataStream API所编写的流处理应用程序在生成作业图(JobGraph)并提交给JobManager之前,会预先生成流图(StreamGraph). 什么是流图 流图(StreamGraph)是表示流处理程序拓扑的数据结构,它封装了生成作业图(JobGraph)的必要信息.它的类继承关系如下图所示: 当你基于StreamGraph的继承链向上追溯,会发现它实现了FlinkPlan接口. Flink效仿了传统的关系型数据库在执行SQL时生成执行计划并对其进行优化的思路

Flink之CEP案例分析-网络攻击检测

上一篇我们介绍了Flink CEP的API,这一篇我们将以结合一个案例来练习使用CEP的API编写应用程序,以强化对API的理解.所选取的案例是对网络遭受的潜在攻击进行检测并给出告警.当下互联网安全形势仍然严峻,网络攻击屡见不鲜且花样众多,这里我们以DDOS(分布式拒绝服务攻击)产生的流入流量来作为遭受攻击的判断依据. 假定一家云服务提供商,有多个跨地区的数据中心,每个数据中心会定时向监控中心上报其瞬时流量. 我们将检测的结果分为三个等级: 正常:流量在预设的正常范围内: 警告:某数据中心在10

以Flink为例,消除流处理常见的六大谬见

我们在思考流处理问题上花了很多时间,更酷的是,我们也花了很多时间帮助其他人认识流处理,以及如何在他们的组织里应用流处理来解决数据问题. 我们首先要做的是纠正人们对流处理(作为一个快速变化的领域,这里有很多误见值得我们思考)的错误认识. 在这篇文章里,我们选出了其中的六个作为例子.因为我们对Apache Flink比较熟悉,所以我们会基于Flink来讲解这些例子. 谬见1:没有不使用批处理的流(Lambda架构) 谬见2:延迟和吞吐量:只能选择一个 谬见3:微批次意味着更好的吞吐量 谬见4:Exa

阿里蒋晓伟谈流计算和批处理引擎Blink,以及Flink和Spark的异同与优势

首届阿里巴巴在线技术峰会(Alibaba Online Technology Summit),将于7月19日-21日 20:00-21:30 在线举办.本次峰会邀请到阿里集团9位技术大V,分享电商架构.安全.数据处理.数据库.多应用部署.互动技术.Docker持续交付与微服务等一线实战经验,解读最新技术在阿里集团的应用实践. 7月19日晚8点,阿里搜索事业部资深搜索专家蒋晓伟将在线分享<阿里流计算和批处理引擎Blink>,其基于Apache Flink项目并且在API和它上兼容,深度分享阿里为

Flink流处理之迭代API分析

IterativeStream Flink在DataStream中也是通过一个特定的可迭代的流(IterativeStream)来构建相关的迭代处理逻辑,这一点跟DataSet提供的可迭代的数据集(IterativeDataSet)的是类似的. IterativeStream继承自DataStream,因此DataStream支持的转换函数,在IterativeStream上同样可以调用. IterativeStream的实例是通过DataStream的iterate方法创建的˙.iterate

专访阿里云高级技术专家吴威:Kafka、Spark和Flink类支持流式计算的软件会越来越流行

杭州·云栖大会将于2016年10月13-16日在云栖小镇举办,在这场标签为互联网.创新.创业的云计算盛宴上,众多行业精英都将在这几天里分享超过450个演讲主题. 为了帮助大家进一步了解这场全球前言技术共振盛会的内容,采访了各个论坛的大咖,以飨读者. 以下为正文: 吴威,阿里云高级技术专家.E-MapReduce产品是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,为用户提供集群.作业.数据等管理的一站式大数据处理分析服务,他在其中参与产品设计讨论.平台性能调优等工作,并为用户提供技

【C++】流插入、提取运算符和类型转换

流插入,流提取运算符重载和类型转换的讲解和实例 重载流插入和提取运算符的运算符函数,不能作为类的成员函数,只能作为普通函数 流插入,流提取运算符重载 流插入运算符"<<"和流提取运算符">>"也可以被用来重载. 我们可以使用cout对一个int,string等等类型的数据进行输出,我们却不可以对我们自己定义的一个类的对象进行直接输出,因为我们这个类是没有"<<"运算符的. "<<"