Flink之CEP-API简介

CEP API的核心是Pattern
API,它允许你快速定义复杂的事件模式。每个模式包含多个阶段(stage)或者我们也可称为状态(state)。为了从一个状态切换到另一个状态,用户可以指定条件,这些条件可以作用在邻近的事件或独立事件上。

Pattern在外部无法通过构造器进行实例化,构造器的访问限定符是protected的,因此Pattern对象只能通过begin和next以及followedBy(用于创建其派生类FollowedByPattern)来创建,在创建时需要指定其名称。

每个模式必须以一个初始状态开始且必须指定唯一的名称来标识被匹配的事件:

Pattern<Event, ?> start = Pattern.<Event>begin("start");

通过where方法可以为起始状态指定一个过滤条件:

start.where(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // some condition
    }
});

当然,也可以严格地限制接收事件为初始化的事件类型(Event)的子类型(SubEvent),通过subtype方法:

start.subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
    @Override
    public boolean filter(SubEvent value) {
        return ... // some condition
    }
});

正如你所看到的,子类型条件也可以与在子类型上的一个额外的过滤条件合并。事实上,你可以通过多次调用where以及subtype方法并指定多个条件,这些条件将会通过逻辑“与”运算符合并。在构建匹配条件时,CEP提供了两个函数类:AndFilterFunction以及SubtypeFilterFunction。其中SubtypeFilterFunction专用于subtype
API来判定事件的类型是否符合要求。AndFilterFunction则是通用的逻辑“与”运算符来连接左右表达式。其类图如下:

图中展示了它的构造器,它可以注入两个FilterFunction函数的实例,分别作为逻辑与的左表达式和右表达式。由于AndFilterFunction扩展了接口FilterFunction,而当调用where或subtype
API时其实都是在不断扩充左表达式:

接下来,我们可以追加更多的状态来构建更复杂的模式,多个状态的转换涉及到模式对事件的选择策略:

Pattern当前支持严格邻近非严格邻近这两种事件选择策略。事件选择策略在Pattern的API上通过如下两个方法来指定:

  • next:会追加一个新的Pattern对象到既有的Pattern之后,它表示当前模式运算符所匹配的事件必须是严格紧邻的,这意味着两个被匹配的事件必须是前后紧邻,中间没有其他元素;
  • followedBy:会追加一个新的Pattern到既有的Pattern之后(其实返回的是一个FollowedByPattern对象,它是Pattern的派生类),它表示当前运算符所匹配的事件不必严格紧邻,这意味着其他事件被允许穿插在匹配的两个事件之间;

事实上Flink的CEP实现简化了论文里提及的四种事件选择策略。

创建一个新的严格邻近的Pattern:

Pattern<Event, ?> strictNext = start.next("middle");

创建一个非严格邻近的Pattern:

Pattern<Event, ?> nonStrictNext = start.followedBy("middle");

一个复杂的CEP程序可能其模式也较为复杂,而多个Pattern之间通过前向指针建立连接关系从而形成“模式链”,形如下图:

也可以通过within
API为模式定义一个时间约束(也即时间窗口),它表示第一个元素和最后一个元素之间的时间间隔不能超过窗口时间。例如,通过within定义一个模式匹配必须发生在10秒之内。

next.within(Time.seconds(10));

为了在你的事件流上运行模式检测,你得创建一个PatternStream。给定一个输入流input以及一个模式pattern(很有可能是一个模式链的头),你可以通过如下的示例代码创建一个PatternStream:

DataStream<Event> input = ...
Pattern<Event, ?> pattern = ...

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

一旦你获得PatternStream你可以从匹配的事件序列中通过select API或者flatSelect API选择匹配的事件。select
API要求一个PatternSelectFunction函数的实现。PatternSelectFunction有一个select方法,会被每个匹配的事件序列调用。它接收一个匹配事件的”状态名/事件”对映射,并恰好返回一个结果:

class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
    @Override
    public OUT select(Map<String, IN> pattern) {
        IN startEvent = pattern.get("start");
        IN endEvent = pattern.get("end");

        return new OUT(startEvent, endEvent);
    }
}

PatternFlatSelectFunction跟PatternSelectFunction类似,唯一的区别就是它可以返回任意数量的结果。为了做到这一点,select方法带了一个额外的Collector参数用来输出元素:

class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
    @Override
    public void select(Map<String, IN> pattern, Collector<OUT> collector) {
        IN startEvent = pattern.get("start");
        IN endEvent = pattern.get("end");

        for (int i = 0; i < startEvent.getValue(); i++ ) {
            collector.collect(new OUT(startEvent, endEvent));
        }
    }
}

下一小篇我们将以一个示例来展示这些API的应用。

原文发布时间为:2017-02-28

本文作者:vinoYang

本文来自合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

时间: 2024-09-03 19:22:05

Flink之CEP-API简介的相关文章

Flink关系型API简介

在接触关系型API之前,用户通常会采用DataStream.DataSet API来编写Flink程序,它们都提供了丰富的处理能力,以DataStream为例,它有如下这些优点: 富有表现力的流处理,包括但不限于:转换数据,更新状态,定义窗口.聚合,事件时间语义,有状态且保证正确性等: 高度自定义的窗口逻辑:分配器.触发器.逐出器以及允许延迟等: 提升与外部系统连接能力的异步I/O接口: ProcessFunction给予用户访问时间戳和定时器等低层级的操作能力: 但它同时也存在一些使用壁垒导致

JavaMail API简介

JavaMail API简介JavaMail API是一种可选的.能用于读取.编写和发送电子消息的包(标准扩展).您可使用这种包创建邮件用户代理(Mail User Agent ,MUA) 类型的程序,它类似于Eudora.Pine及Microsoft Outlook这些邮件程序.其主要目的不是像发送邮件或其他邮件传输代理(Mail Transfer Agent,MTA)类型的程序那样用于传输.发送和转发消息.换句话说,用户可以与MUA类型的程序交互,以阅读和撰写电子邮件.MUA依靠MTA处理实

JavaMail学习笔记(二)、JavaMail API简介和配置开发环境

一.JavaMail API 简介        JavaMail API是Sun公司为方便Java开发人员在应用程序中实现邮件发送和接收功能而提供的一套标准开发包,它支持一些常用的邮件协议,如:SMTP.POP3.IMAP和MIME等.开发人员使用JavaMail API编写邮件处理软件时,无须考虑邮件协议底层的实现细节,只要调用JavaMail开发包中相应的API类就可以了.JavaMail API封装在一个名mail.jar的文件中,它是开发JavaMail应用程序时所必须使用的核心jar

《Java数字图像处理:编程技巧与应用实践》——第1章 Java Graphics及其API简介 1.1 什么是Java图形设备Graphics

第1章 Java Graphics及其API简介 在开始本书内容之前,笔者假设你已经有了面向对象语言编程的基本概念,了解Java语言的基本语法与特征,原因在于本书的所有源代码都是基于Java语言实现的,而且是基于Java开发环境运行与演示所有图像处理算法的.本书第1章到第3章是为了帮助读者了解与掌握Java 图形与GUI编程的基本知识与概念而写的.本章主要介绍Java GUI编程中基本的图形知识,针对GUI编程,Java语言提供了两套几乎并行的API,分别是Swing与AWT.早期的Java G

【VLC-Android】LibVLC API简介(相当于VLC的MediaPlayer)

vlc-android的LibVLC相当于MediaPlayer对象,这里列一下对应关系. 1.public void playMRL(String mrl) 对应MediaPlayer的setDataSource,注意不要转成Uri再toString,否则无法播放.用法如下: playMRL("http://live.3gv.ifeng.com/zixun.m3u8")  2. public native void play() 对应MediaPlayer的start(),开始播放.

Flink之CEP案例分析-网络攻击检测

上一篇我们介绍了Flink CEP的API,这一篇我们将以结合一个案例来练习使用CEP的API编写应用程序,以强化对API的理解.所选取的案例是对网络遭受的潜在攻击进行检测并给出告警.当下互联网安全形势仍然严峻,网络攻击屡见不鲜且花样众多,这里我们以DDOS(分布式拒绝服务攻击)产生的流入流量来作为遭受攻击的判断依据. 假定一家云服务提供商,有多个跨地区的数据中心,每个数据中心会定时向监控中心上报其瞬时流量. 我们将检测的结果分为三个等级: 正常:流量在预设的正常范围内: 警告:某数据中心在10

immutable.js 在React、Redux中的实践以及常用API简介

简介 这个immutable Data 是什么鬼,有什么优点,好处等等,我就不赘述了,这篇Immutable 详解及 React 中实践讲的很透彻. 一个说明不可变的例子 这个可变和不可变是相对于 JavaScript原生引用类型来说的. // 原生对象 let a1 = { b: 1, c: { c1: 123 } }; let b1 = a1; b1.b = 2; console.log(a1.b, b1.b); // 2, 2 console.log(a1 === b1); // true

Flink Table/SQL API 规划 —— Dynamic Table

动态表的概念是社区很早就提出的但并没有全部实现下文中所有介绍都是基于已有规划和proposal给出的,可能与之后实现存在出入仅供参考 概念 动态表直观上看是一个类似于数据库中的Materialized View概念.动态表随着时间改变:类似静态的batch table一样可以用标准SQL进行查询然后一个新的动态表:可以和流无损地互相转换(对偶的).对现有的API最大的改进关键在表的内容随着时间改变,而现在的状态只是append.当前的streaming table可以认为是一种动态表,appen

JNDI简介,jndi在tomcat中的配置,jdbc api简介,java连接数据库服务

连接数据库 JNDI(Java 命名和目录接口)   JNDI(Java 命名和目录接口) 分布式计算环境通常使用命名和目录服务来获取共享的组件和资源.命名和目录服务将名称与位置.服务.信息和资源关联起来.  命名服务提供名称-对象的映射.目录服务提供有关对象的信息,并提供定位这些对象所需的搜索工具.有许多命名和目录服务实现,并且到它们的接口是不同的. Java 命名和目录接口或 JNDI 提供了一个用于访问不同的命名和目录服务的公共接口.请参阅 URL java.sun.com/product

Java Reflection API简介

Java Reflection API提供对JVM中的类,接口和对象的深入洞察.开发者通常使用API来完成以下的任务,这解释了为什么总是使用开发工具,例如debugger和Integrated Development Environments (IDEs): · 决定一个对象的类. · 获得关于一个类的modifiers, fields, methods, constructors, etc的信息. · 获得关于一个接口的常量和方法声明. · 创建一个类的instance,这个类的名字直到运行时