Flink-CEP之带版本的共享缓冲区

带版本的共享缓冲区

当股票模式以一个事件流作为输入时,状态转换将会作用于事件流从而引起事件的状态变化。结合窗口对参与匹配的事件的限制以及模式中结合事件上下文(状态)的过滤条件,同一事件流随着时间的流动或者多次运行都会产生多种不同的匹配结果。在此我们为示例模式构建了一个事件流以及其可能产生的三种匹配结果,如下图:

在事件e6到达后,会产生两个结果:R1和R2,而结果R3将会在e8到来之后匹配成功。图中可见R1、R2和R3这三个匹配结果在一些事件上产生了重叠。

为了保留已匹配的结果,需要将匹配结果中包含的事件保存起来,这种数据结构在论文中称之为缓冲区。首先,初步的解决方案是为独立匹配而设计缓冲区,在缓冲区中为了让不同的状态存储不同的事件,每个状态对应一个栈空间(除了最终态),针对上面三个匹配的独立缓冲区如下图a-c所示:

上图中的a-c描述了存储R1-R3三个匹配结果的独立缓冲区。每个栈包含事件和指向事件的指针,它们通常是因为“begin”或者“take”状态转换而被加入到缓冲区中。每个事件有一个前置指针指向之前被选择的事件,之前的事件要么在相同的栈中要么在之前的栈中。当一个事件被加入到缓冲区中,它的指针也一同被设置,在缓冲区中从该事件开始沿着前置指针的一次遍历将能检索到完整的匹配。

为每个匹配单独构建缓冲区,从技术实现上来看是没有问题的,但随着事件的流入,模式的匹配结果也将会变得更多,从而导致缓冲区的数量也极具上升。为了避免缓冲区、栈的数目过多以及在栈中频繁地复制事件,一种优化措施是将这些独立的缓冲区合并为单一共享的缓冲区。这个过程最终是基于合并这些独立缓冲区中相应的栈来实现的。为了在遍历时找到匹配的事件流,合并栈中相同的事件时必须保留他们的前置指针,这一步是整个优化措施的关键,如果草率地合并这些栈中的事件,在共享缓冲区中沿着这些已存在的指针所进行的遍历将会导致错误的结果。举个例子,假设我们将R2的a[i]栈中的e4元素以及b栈中的e6元素与R3缓冲区里的a[i]栈以及b栈合并(来达到合并R1和R2缓冲区的目的),从e6开始的一次遍历会产生包含:e1,e2,e3,e4和e6元素的结果,而这是一个错误的结果。产生这一问题的原因是因为在合并的过程中,没有区分来自不同缓冲区中的不同指针。

为了解决这个问题,

NFAb

设计了一个带版本的共享缓冲区。它会给每一次匹配分配一个版本号并使用该版本号来标记在这次匹配中的所有指针。但这里又会面临另一个问题:无法为某次匹配预分配版本号,因为任何非确定性的状态都能派生出新的匹配。而解决这一问题的技术是采用杜威十进制分类法[^1]来编码版本号,它以

id1

(.

idj

)∗(1≤j≤t)的形式动态增长,这里t关联着当前状态。直观地说,它表示这次运行从

idth1

状态开始被初始化然后到达状态

qj

,并从中分割出

idthj

的实例,这被称之为祖先运行。这种版本号编码技术也保证一个运行的版本号v跟它的祖先运行的版本号兼容。具体而言也就是说:(1)v包含了v’作为前缀或者(2)v与v’仅最后一个数值

idt

不同,而

idt

对于版本v而言要大于版本v’。

在程序实现上,Flink定义了一个DeweyNumber类来表示这种点分十进制形式的版本号,其内部使用数组来存储“点分”的每一个数值并提供了几个对版本号操作的方法。比如,增加版本号:

进入到一个新状态,将新增一位版本号:

以及检测当前DeweyNumber与另一个DeweyNumber是否兼容:

�一个带版本的共享缓冲区合并那三个独立缓冲区后的结果如上图中的图(d)所示,所有来自单个缓冲区的指针现在都被标记了兼容的版本号。而之前提到的那个因为不具备版本号而导致遍历产生错误结果的问题在这里也将不再出现,因为从e6指向e4版本号2.0.0跟e4指向e3(处于a[i]栈中)的版本号1.0不兼容,而只有版本号兼容,遍历才会继续。带有版本号的缓冲区对所有的匹配提供简洁的编码,并且被标记了兼容版本号的指针和事件构建了一个满足恰好匹配一次的带版本的视图。为了返回一次匹配成功的结果,检索算法会沿着兼容指针从栈中最近的事件开始遍历。

版本号的实现以及理论分析完成之后,我们来看代码中如何实现这个共享缓冲区。SharedBuffer就是这一数据结构的实现,它是一个嵌套多层略显复杂的数据结构。一个SharedBuffer包含一个键与SharedBufferPage的映射(Map):

SharedBufferPage表示一组拥有相同键的元素的存储。但是元素也是由映射构成的,该映射的键是ValueTimeWrapper类型,而值为SharedBufferEntry类型:

其中ValueTimeWarpper类似于一个封装了值和时间戳的二元组。对于SharedBufferEntry,它保存了一组关联着的SharedBufferEdge。SharedBufferEdge包含指向目标SharedBufferEntry的指针(多个SharedBufferEntry之间的边)以及边上的版本号DeweyNumber。因此,SharedBuffer的整体视图如下所示:

从类图上来展示它们之间的关联关系如下图:

从上面的关系图可见,往SharedBuffer中添加元素如果有前置元素将会涉及到跟前置元素的SharedBufferEntry构建关联关系。因此对于设置元素的put方法被分成了两种情况:

  1. 无前置元素:不需要处理跟之前元素的关系,也不需要初始化对应的SharedBufferEntry;
  2. 有前置元素:需要提供前置元素的信息,并在内部查找到前置元素所对应的SharedBufferEntry,然后再构建ValueTimeWrapper与SharedBufferEntry的映射关系。

通常SharedBuffer如果用户的模式配置了时间窗口,那么它会基于窗口长度来对过期元素进行清理。提供该服务的方法是:prune。该方法会在每个page上进行prune。而在page上的prune则会对其内部Map的每一项的ValueTimeWrapper的时间戳进行比对,凡是小于等于清理时间戳的元素,都予以清理。

另外,由<key, value,
timestamp>结合所映射到的SharedBufferEntry,可能会被多次引用(如之前三次匹配中的e4),SharedBuffer采用的是引用计数机制(它是一种资源回收时常用的机制)来标记引用次数。具体而言是由lock、release以及remove这三个方法共同组合来完成这一功能的,而引用计数器实现在SharedBufferEntry上。当然,在删除该SharedBufferEntry时需要一并清除它被其他SharedBufferEdge的引用关系。

为了基于版本号提取某个匹配的的所有元素,Flink定义了一个ExtractionState来存储提取状态的信息,该数据结构内部以栈结构来存储向前遍历的整个路径。下面我们来分析一下,SharedBuffer是如何提取模式的匹配元素,该逻辑被封装在方法extractPatterns中:

注意,上面代码段中有两个栈:

  1. extractionStates:类型为Stack<ExtractionState<K,
    V>>,对当前处理状态压栈,辅助深度遍历;
  2. currentPath:类型为Stack<SharedBufferEntry<K,
    V>>,保存匹配模式中各状态的“路径”因为是从后往前遍历,所以恰好适合用栈来存储,出栈时正好是顺序的。 

原文发布时间为:2017-03-05

本文作者:vinoYang

本文来自合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

时间: 2025-01-25 06:03:45

Flink-CEP之带版本的共享缓冲区的相关文章

巧用笔记本电脑自带的无线网卡共享上网

对于现在每一个在用计算机的人来说,网络是与计算机搭配的必不可缺的重要部分之一,笔记本电脑更是如此.笔记本电脑的设计本身就是为了移动办公的设计,而网络与办公也是密不可分的,但是对于我们这些笔记本电脑用户却经常用到这样一个问题:有时候只有一个网络接口,但是却有多台计算机要接入互联网,这要怎么办呢? 其使答案很简单,可以利用无线共享上网. 方案1:利用无线路由器.这是一个最简单的方法,但是前提是笔记本电脑要有无线网卡模块,否则就只能只用有线的端口了.无线路由器的设置十分简单,以笔者的D-Link 62

Flink之CEP案例分析-网络攻击检测

上一篇我们介绍了Flink CEP的API,这一篇我们将以结合一个案例来练习使用CEP的API编写应用程序,以强化对API的理解.所选取的案例是对网络遭受的潜在攻击进行检测并给出告警.当下互联网安全形势仍然严峻,网络攻击屡见不鲜且花样众多,这里我们以DDOS(分布式拒绝服务攻击)产生的流入流量来作为遭受攻击的判断依据. 假定一家云服务提供商,有多个跨地区的数据中心,每个数据中心会定时向监控中心上报其瞬时流量. 我们将检测的结果分为三个等级: 正常:流量在预设的正常范围内: 警告:某数据中心在10

共享法拉利、购物车、遛娃车……不是所有带轮子的都能“走”下去

共享经济到底有多热?看看现在乌烟瘴气.花样百出的共享经济项目就能知晓.除了共享打车.共享单车.共享充电宝等已经有大笔融资进账,且已经展现自身潜力并对大众生活产生极大影响的项目外,共享篮球.共享马扎等不知所谓的项目也来凑热闹.甚至很多创业者认为只要是有"轮子"的共享经济项目,就都能一直"走"下去. 于是就在近日,共享法拉利.共享购物车.共享遛娃车等带轮子的共享经济项目出现在大众视野中.它们以各种特性来"搏出位",试图成为下一个一夜暴富的幸运儿.但事

Flink如何应对背压问题

经常有人会问Flink如何处理背压问题.其实,答案很简单:Flink没用使用任何通用方案来解决这个问题,因为那根本不需要那样的方案.它利用自身作为一个纯数据流引擎的优势来优雅地响应背压问题.这篇文章,我们将介绍背压问题,然后我们将深挖Flink的运行时如何在task之间传输数据缓冲区内的数据以及流数据如何自然地两端降速来应对背压,最终将以一个小示例来演示它. 什么是背压 像Flink这样的流处理系统需要能够优雅地应对背压问题.背压通常产生于这样一种场景:当一个系统接收数据的速率高于它在一个瞬时脉

基于Apache Flink的实时计算引擎Blink在阿里搜索中的应用

阿里巴巴是世界上最大的电子商务零售商. 我们在2015年的年销售额总计3940亿美元,超过eBay和亚马逊之和.阿里巴巴搜索(个性化搜索和推荐平台)是客户的关键入口,并承载了大部分在线收入,因此搜索基础架构团队需要不断探索新技术来改进产品. 在电子商务网站应用场景中,什么能造就一个强大的搜索引擎?答案就是尽可能的为每个用户提供实时相关和准确的结果.同样一个不容忽视的问题就是阿里巴巴的规模,当前很难找到能够适合我们的技术. Apache Flink就是一种这样的技术,阿里巴巴正在使用基于Flink

在驱动和应用程序间共享内存

在不同的场合,很多驱动编写人员需要在驱动和用户程序间共享内存.两种最容易的技术是:    l 应用程序发送IOCTL给驱动程序,提供一个指向内存的指针,之后驱动程序和应用程序就可以共享内存.(应用程序分配共享内存)    l 由驱动程序分配内存页,并映射这些内存页到指定用户模式进程的地址空间,并且将地址返回给应用程序.(驱动程序分配共享内存)    使用IOCTL共享Buffer:    使用一个IOCT描述的Buffer,在驱动和用户程序间共享内存是内存共享最简单的实现形式.毕竟,IOCTL也

局域网共享上网的方法

这里介绍的局域网共享上网的方法,是使用Windows 2000 Professional自带的Internet共享完成. 安装好上网用的设备(如Modem,TA,xDSL类Modem等等)后,与上网设备直联的计算机在网络属性中会提供"共享"的选项.在共享的选项中选择启用此连接的Internet连接共享,就可以基本上完成共享上网的设置. 一般的步骤包括: 1.安装上网的硬件设备及相关驱动程序; 2.确认在单机运行下,上网设备正常工作; 3.局域网中所有待上网的计算机都安装了TCP/IP协

获得文件的版本信息

我们在许多情况下,(如编制安装程序时),需要获得应用程序的版本信息,以决定是否更新文件.最简单的办法是比较文件的生成日期.其实,对于EXE.DLL.OCX等类型的文件有更加准确的方法获得文件的版本信息.利用API函数GetFileVersionInfo,GetFileVersionInfoSize,VerQueryValue,我们可以很方便地获得文件的一系列信息. 以下使用C++ Builder 4为例演示具体的操作: 首先,建立一个新项目,在主窗口上放置一个Edit控件,一个Button控件.

在Windows Phone 8与Windows 8应用程序之间共享代码

Visual Studio 2012 为构建 Windows 8 和 Windows Phone 8 应用程序提供了一套出色的工具.因此,可以进 行适当的探究,以了解可在应用程序的 Windows 应用商店版本与 Windows Phone 版本之间共享多少代 码. 您可以采用多种不同语言编写 Windows 应用商店应用程序:XAML 搭配 C#.Visual Basic .C++,甚至是 HTML5 搭配 JavaScript. 通常采用 XAML 搭配 C# 或 Visual Basic