http://esper.codehaus.org/tutorials/tutorial/tutorial.html
http://esper.codehaus.org/esper-4.6.0/doc/reference/en-US/html/index.html
http://www.slideshare.net/hemapani/siddhi-a-second-look-at-complex-event-processing-implementations
Esper Reference Version 4.6.0
1.1. Introduction to CEP and event stream analysis
The Esper engine has been developed to address the requirements of applications that analyze and react to events. Some typical examples of applications are:
• Business process management and automation (process monitoring, BAM, reporting exceptions)
• Finance (algorithmic trading, fraud detection, risk management)
• Network and application monitoring (intrusion detection, SLA monitoring)
• Sensor network applications (RFID reading, scheduling and control of fabrication lines, air traffic)
What these applications have in common is the requirement to process events (or messages) in real-time or near real-time. This is sometimes referred to as complex event processing (CEP) and event stream analysis. Key considerations for these types of applications are throughput, latency and the complexity of the logic required.
• High throughput - applications that process large volumes of messages (between 1,000 and 100k messages per second)
• Low latency - applications that react in real-time to conditions that occur (from a few milliseconds to a few seconds)
• Complex computations - applications that detect patterns among events (event correlation), filter events, aggregate time or length windows of events, join event streams, trigger based on absence of events etc.
1.2. CEP and relational databases
Relational databases and the standard query language (SQL) are designed for applications in which most data is fairly static and complex queries are less frequent. Also, most databases store all data on disks (except for in-memory databases) and are therefore optimized for disk access.
To retrieve data from a database an application must issue a query. If an application needs the data 10 times per second, it must fire the query 10 times per second. This does not scale well to hundreds or thousands of queries per second.
Database triggers can be used to fire in response to database update events. However, database triggers tend to be slow and often cannot easily perform complex condition checking and implement logic to react.
In-memory databases may be better suited to CEP applications than traditional relational databases, as they generally have good query performance. Yet they are not optimized to provide the immediate, real-time query results required for CEP and event stream analysis.
1.3. The Esper engine for CEP
Relational databases or message-based systems such as JMS make it really hard to deal with temporal data and real-time queries.
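Indeed, databases require explicit querying (pull) to return meaningful data and are not suited to push data as it changes. In other words, a relational database must store the data first and query it afterwards, which becomes very inefficient when the volume of streaming data is large.
JMS systems are stateless and require the developer to implement the temporal and aggregation logic himself. A messaging system leaves aggregation logic, such as counting the messages within a time period, entirely to you.
The Esper engine works a bit like a database turned upside-down.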
Instead of storing the data and running queries against stored data, the Esper engine allows applications to store queries and run the data through.
Response from the Esper engine is real-time when conditions occur that match queries. The execution model is thus continuous rather than only when a query is submitted.
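These two sentences capture the essence of Esper's design.
Unlike an ordinary database, which runs queries against stored data, Esper stores the queries and produces results as the data streams through.
Esper's greatest strength is that it is real-time: it can monitor massive data streams and produce statistics as they happen. Its results are also continuous. A database returns results only when you query it, whereas with Esper the statistics are produced automatically as the data flows through.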
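To make the "store the queries, run the data through" idea concrete, here is a minimal plain-Java sketch. It is not Esper's API. `ContinuousQueryEngine`, `register` and `send` are hypothetical names. A query, here just a filter plus a listener, is registered once. Every incoming event is then pushed through all stored queries, so results arrive as the data flows in rather than when someone asks for them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical engine illustrating "store the queries, run the data through".
// This is NOT Esper's API; it only mimics the inverted execution model.
class ContinuousQueryEngine<E> {
    private final List<Predicate<E>> filters = new ArrayList<>();
    private final List<Consumer<E>> listeners = new ArrayList<>();

    // Register a standing query once; it stays active for all future events.
    void register(Predicate<E> filter, Consumer<E> listener) {
        filters.add(filter);
        listeners.add(listener);
    }

    // Push one event through every stored query; matching listeners fire immediately.
    void send(E event) {
        for (int i = 0; i < filters.size(); i++) {
            if (filters.get(i).test(event)) {
                listeners.get(i).accept(event);
            }
        }
    }
}
```

For example, suppose `amount -> amount >= 200` is registered with a listener that collects results into a list. Sending the amounts 50, 250, 199 and 300 then delivers only 250 and 300, each at the moment it is sent.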
Esper provides two principal methods or mechanisms to process events: event patterns and event stream queries.
Event Patterns
Esper offers an event pattern language to specify expression-based event pattern matching.
Underlying the pattern matching engine is a state machine implementation. This method of event processing matches expected sequences of presence or absence of events or combinations of events. It includes time-based correlation of events.
Event Stream Queries
Esper also offers event stream queries that address the event stream analysis requirements of CEP applications.
Event stream queries provide the windows, aggregation, joining and analysis functions for use with streams of events.
These queries follow the EPL syntax. EPL has been designed for similarity with the SQL query language but differs from SQL in its use of views rather than tables.
Views represent the different operations needed to structure data in an event stream and to derive data from an event stream.
Event Stream Analysis
EPL statements are used to derive and aggregate information from one or more streams of events, to join or merge event streams, and to feed results from one event stream to subsequent statements.
EPL is similar to SQL in its use of the select clause and the where clause.
However, instead of tables, EPL statements use event streams and a concept called views. Similar to tables in an SQL statement, views define the data available for querying and filtering. Views can represent windows over a stream of events. Views can also sort events, derive statistics from event properties, group events or handle unique event property values.
This is a sample EPL statement that computes the average price for the last 30 seconds of stock tick events:
select avg(price) from StockTickEvent.win:time(30 sec)
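As a rough illustration of what `win:time(30 sec)` combined with `avg(price)` computes, the plain-Java sketch below keeps a sliding time window and a running sum. It makes three assumptions. Each tick carries an explicit millisecond timestamp, ticks arrive in time order, and a tick counts as expired once it is 30 seconds old. `TimeWindowAverage` is a hypothetical name. Esper's own implementation is driven by its internal timer and differs in detail.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sliding time window with an incrementally maintained average, roughly what
// "select avg(price) from StockTickEvent.win:time(30 sec)" reports after each tick.
class TimeWindowAverage {
    private static final class Tick {
        final long timeMillis;
        final double price;
        Tick(long timeMillis, double price) { this.timeMillis = timeMillis; this.price = price; }
    }

    private final long windowMillis;
    private final Deque<Tick> window = new ArrayDeque<>();
    private double sum = 0.0;

    TimeWindowAverage(long windowMillis) { this.windowMillis = windowMillis; }

    // Add a tick and return the average price over the last windowMillis.
    // Assumes ticks arrive in non-decreasing time order.
    double onTick(long timeMillis, double price) {
        window.addLast(new Tick(timeMillis, price));
        sum += price;
        // Expire ticks that are windowMillis old or older; the newest tick always stays.
        while (window.peekFirst().timeMillis <= timeMillis - windowMillis) {
            sum -= window.pollFirst().price;
        }
        return sum / window.size();
    }
}
```

Consider ticks of 10.0 at t=0 s, 20.0 at t=10 s and 30.0 at t=35 s. With a 30-second window they yield averages of 10.0, 15.0 and 25.0, because the first tick has left the window by t=35 s.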
A sample EPL statement that returns the average price per symbol for the last 100 stock ticks.
select symbol, avg(price) as averagePrice from StockTickEvent.win:length(100) group by symbol
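The sketch below mimics `win:length(100)` with `group by symbol`. The window holds the last N ticks of all symbols combined. The average is then computed per symbol over whichever of that symbol's ticks are still in the window. `LengthWindowGroupAverage` is a hypothetical name. The window size is a constructor parameter so the example can stay small.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Length window over all ticks with per-symbol sum/count, approximating
// "select symbol, avg(price) ... win:length(100) group by symbol".
class LengthWindowGroupAverage {
    private static final class Tick {
        final String symbol;
        final double price;
        Tick(String symbol, double price) { this.symbol = symbol; this.price = price; }
    }

    private final int size;
    private final Deque<Tick> window = new ArrayDeque<>();
    private final Map<String, double[]> sumAndCount = new HashMap<>(); // symbol -> {sum, count}

    LengthWindowGroupAverage(int size) { this.size = size; }

    // Add a tick; once the window exceeds its size, evict the oldest tick of any symbol.
    void onTick(String symbol, double price) {
        window.addLast(new Tick(symbol, price));
        double[] agg = sumAndCount.computeIfAbsent(symbol, s -> new double[2]);
        agg[0] += price;
        agg[1] += 1;
        if (window.size() > size) {
            Tick old = window.pollFirst();
            double[] oldAgg = sumAndCount.get(old.symbol);
            oldAgg[0] -= old.price;
            oldAgg[1] -= 1;
            if (oldAgg[1] == 0) {
                sumAndCount.remove(old.symbol); // symbol no longer present in the window
            }
        }
    }

    // Current average for a symbol, or NaN if it has no ticks in the window.
    double average(String symbol) {
        double[] agg = sumAndCount.get(symbol);
        return agg == null ? Double.NaN : agg[0] / agg[1];
    }
}
```

One consequence of this design is that a busy symbol can push a quiet symbol's ticks out of the shared window, because the 100 slots are counted across all symbols rather than per symbol.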
This example joins 2 event streams. The first event stream consists of fraud warning events for which we keep the last 30 minutes (1800 seconds). The second stream is withdrawal events for which we consider the last 30 seconds. The streams are joined on account number.
select fraud.accountNumber as accntNum, fraud.warning as warn, withdraw.amount as amount, MAX(fraud.timestamp, withdraw.timestamp) as timestamp, 'withdrawlFraud' as desc from FraudWarningEvent.win:time(30 min) as fraud, WithdrawalEvent.win:time(30 sec) as withdraw where fraud.accountNumber = withdraw.accountNumber
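Below is a rough sketch of how this join behaves. Each stream keeps its own time window: 30 minutes for fraud warnings and 30 seconds for withdrawals. An event arriving on either side is matched by account number against the other side's current window. Several parts of this are assumptions:

- Events carry explicit timestamps and arrive in global time order.
- Only the new combinations involving the arriving event are returned.
- Output rows are simple strings rather than result events.
- `WindowedJoin` and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Two independently windowed streams joined on account number.
class WindowedJoin {
    private static final class Event {
        final long time;
        final String account;
        final String payload; // warning text or withdrawal amount, kept as a string for brevity
        Event(long time, String account, String payload) {
            this.time = time; this.account = account; this.payload = payload;
        }
    }

    private static final long FRAUD_WINDOW_MS = 30 * 60 * 1000L; // win:time(30 min)
    private static final long WITHDRAW_WINDOW_MS = 30 * 1000L;   // win:time(30 sec)

    private final Deque<Event> fraud = new ArrayDeque<>();
    private final Deque<Event> withdrawals = new ArrayDeque<>();

    // Drop events that have been in a window for windowMs or longer.
    private static void expire(Deque<Event> window, long now, long windowMs) {
        while (!window.isEmpty() && window.peekFirst().time <= now - windowMs) {
            window.pollFirst();
        }
    }

    private void expireAll(long now) {
        expire(fraud, now, FRAUD_WINDOW_MS);
        expire(withdrawals, now, WITHDRAW_WINDOW_MS);
    }

    // A fraud warning arrives: join it with the withdrawals currently in their window.
    List<String> onFraudWarning(long time, String account, String warning) {
        expireAll(time);
        fraud.addLast(new Event(time, account, warning));
        List<String> out = new ArrayList<>();
        for (Event w : withdrawals) {
            if (w.account.equals(account)) out.add(account + ":" + warning + ":" + w.payload);
        }
        return out;
    }

    // A withdrawal arrives: join it with the fraud warnings currently in their window.
    List<String> onWithdrawal(long time, String account, String amount) {
        expireAll(time);
        withdrawals.addLast(new Event(time, account, amount));
        List<String> out = new ArrayList<>();
        for (Event f : fraud) {
            if (f.account.equals(account)) out.add(account + ":" + f.payload + ":" + amount);
        }
        return out;
    }
}
```

The asymmetric windows matter here. A withdrawal 10 minutes after a warning still joins, but a warning 10 minutes after a withdrawal does not, because the withdrawal has long since left its 30-second window.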
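As the examples above show, EPL is an SQL-like language that makes it very convenient to query and analyze stream data within time windows.
Two more examples follow. The first keeps the last 5 withdrawals whose amount is at least 200, and the second keeps all withdrawals received in the last 4 seconds.
select * from Withdrawal(amount>=200).win:length(5)
select * from Withdrawal.win:time(4 sec)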
Event Pattern Matching
Event patterns match when an event or multiple events occur that match the pattern's definition.
Patterns can also be temporal (time-based). Pattern matching is implemented via state machines.
Pattern expressions can consist of filter expressions combined with pattern operators. Expressions can contain further nested pattern expressions by including the nested expression(s) in round brackets.
There are 5 types of operators:
- Operators that control pattern finder creation and termination: every
- Logical operators: and, or, not
- Temporal operators that operate on event order: -> (followed-by)
- Guards are where-conditions that filter out events and cause termination of the pattern finder, such as timer:within
- Observers observe time events as well as other events, such as timer:interval, timer:at
A sample pattern that alerts on each IBM stock tick with a price greater than 80 and within the next 60 seconds:
every StockTickEvent(symbol="IBM", price>80) where timer:within(60 seconds)
A sample pattern that alerts every 5 minutes past the hour:
every timer:at(5, *, *, *, *)
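`timer:at` takes crontab-like parameters: minutes, hours, days of month, months and days of week. So `timer:at(5, *, *, *, *)` fires at minute 5 of every hour. The sketch below uses `java.time` and a hypothetical `nextFiring` helper to compute when such a schedule would next fire. It handles only this single fixed-minute case, not the full crontab syntax. It also assumes "next" means strictly after the given instant.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Next firing time of a schedule like timer:at(5, *, *, *, *), i.e. minute 5 of every hour.
class MinutePastHourSchedule {
    // Returns the first instant strictly after 'now' whose minute equals 'minute' (seconds = 0).
    static LocalDateTime nextFiring(LocalDateTime now, int minute) {
        LocalDateTime candidate = now.truncatedTo(ChronoUnit.HOURS).withMinute(minute);
        if (!candidate.isAfter(now)) {
            candidate = candidate.plusHours(1); // this hour's slot has passed; use the next hour
        }
        return candidate;
    }
}
```

For example, from 10:00 the next firing is 10:05. From 10:05 or 10:30 it is 11:05, and from 23:59 it rolls over to 00:05 the next day.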
A sample pattern that alerts when event A occurs, followed by either event B or event C:
A -> ( B or C )
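Pattern matching is described above as a state machine. For `A -> ( B or C )` without `every`, a minimal hand-written state machine could look like the following. Events are modelled as plain strings, and `FollowedByMatcher` is a hypothetical name. Without `every`, the pattern fires once and then stops. A B or C that arrives before any A does not count.

```java
// Hand-written state machine for the pattern "A -> (B or C)" (no 'every', so it matches once).
class FollowedByMatcher {
    private enum State { WAITING_FOR_A, WAITING_FOR_B_OR_C, DONE }

    private State state = State.WAITING_FOR_A;

    // Feed one event type; returns true exactly when the pattern completes.
    boolean onEvent(String type) {
        switch (state) {
            case WAITING_FOR_A:
                if (type.equals("A")) state = State.WAITING_FOR_B_OR_C;
                return false;
            case WAITING_FOR_B_OR_C:
                if (type.equals("B") || type.equals("C")) {
                    state = State.DONE;
                    return true;
                }
                return false; // e.g. a second A is simply ignored
            default:
                return false; // pattern already matched; later events are ignored
        }
    }
}
```

Feeding C, A, A, B, C produces a match only on the B. The leading C precedes any A, and the trailing C arrives after the pattern has already completed.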
An event pattern where a property of a following event must match a property from the first event:
every a=EventX -> every b=EventY(objectID=a.objectID)
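Because both sides carry `every`, each EventX starts its own sub-pattern, and that sub-pattern keeps matching every later EventY with the same objectID. The sketch below captures this with a map from objectID to the X events seen so far, so each Y is paired with all earlier X events sharing its ID. Names are hypothetical, and events are reduced to a label plus an objectID. Nothing ever expires here, which shows how the state of such an unguarded pattern grows without bound.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of "every a=EventX -> every b=EventY(objectID=a.objectID)".
class EveryFollowedByCorrelation {
    // objectID -> labels of all EventX seen so far with that ID (each one is a live sub-pattern).
    private final Map<String, List<String>> activeX = new HashMap<>();

    void onEventX(String label, String objectId) {
        activeX.computeIfAbsent(objectId, id -> new ArrayList<>()).add(label);
    }

    // Every earlier X with the same objectID matches this Y; returns "x->y" pairs.
    List<String> onEventY(String label, String objectId) {
        List<String> matches = new ArrayList<>();
        for (String x : activeX.getOrDefault(objectId, List.of())) {
            matches.add(x + "->" + label);
        }
        return matches;
    }
}
```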
Esper also supports triggering on specific event patterns, including complex relationships among multiple events.
Combining Patterns Matching with Event Stream Analysis
Patterns match when a sequence (or absence) of events is detected. Pattern match results are available for further analysis and processing. The pattern below detects a situation where a Status event is not followed by another Status event with the same id within 10 seconds. The statement further counts all such occurrences grouped per id.
select a.id, count(*) from pattern [every a=Status -> (timer:interval(10 sec) and not Status(id=a.id))] group by id
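The pattern above can be approximated with explicit timers, as sketched below. Each Status starts a 10-second timer for its id. A later Status with the same id before the deadline cancels that timer and starts its own, because of `every`. A timer that expires counts as one occurrence for its id. The sketch rests on these assumptions:

- Time is driven by an explicit `advanceTo` call, standing in for Esper's internal timer.
- `AbsenceDetector` and its methods are hypothetical.
- A Status arriving exactly 10 seconds later is treated as too late.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Sketch of: every a=Status -> (timer:interval(10 sec) and not Status(id=a.id)), counted per id.
class AbsenceDetector {
    private static final long TIMEOUT_MS = 10_000;

    private final Map<String, Long> deadlines = new HashMap<>(); // id -> pending timer deadline
    private final Map<String, Integer> counts = new HashMap<>(); // id -> number of misses

    // Fire every pending timer whose deadline has been reached.
    void advanceTo(long now) {
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() <= now) {
                counts.merge(e.getKey(), 1, Integer::sum); // no follow-up Status within 10 s
                it.remove();
            }
        }
    }

    // A Status event cancels the pending timer for its id and starts a new one.
    void onStatus(String id, long now) {
        advanceTo(now);                      // timers due at or before 'now' fire first
        deadlines.put(id, now + TIMEOUT_MS); // replaces (cancels) any earlier pending timer
    }

    int count(String id) {
        return counts.getOrDefault(id, 0);
    }
}
```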
Getting Started with Twitter Storm
http://xumingming.sinaapp.com/138/twitter-storm%E5%85%A5%E9%97%A8/
https://github.com/nathanmarz/storm/wiki/Documentation, Twitter Storm Manual
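Basic components of a Storm cluster
A Storm cluster superficially resembles a Hadoop cluster. But whereas on Hadoop you run MapReduce jobs, on Storm you run topologies, and the two are very different. One key difference is that a MapReduce job eventually finishes, whereas a topology runs forever (until you explicitly kill it).
A Storm cluster has two kinds of nodes: the master node and the worker nodes. The master node runs a daemon called Nimbus, which plays a role similar to Hadoop's JobTracker. Nimbus is responsible for distributing code around the cluster, assigning work to machines, and monitoring the cluster's status.
Each worker node runs a daemon called the Supervisor. The Supervisor listens for work assigned to its machine and starts and stops worker processes as needed. Each worker process executes a subset of a topology; a running topology consists of many worker processes spread across many machines.
All coordination between Nimbus and the Supervisors is done through a ZooKeeper cluster. In addition, the Nimbus and Supervisor daemons are fail-fast and stateless; all state is kept either in ZooKeeper or on local disk. This means you can kill -9 Nimbus or the Supervisors and restart them, and they will carry on as if nothing had happened. This design makes Storm incredibly stable.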
Stream
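The stream is the core abstraction in Storm. A stream is an unbounded sequence of tuples. Storm provides primitives for transforming a stream into a new stream in a distributed and reliable way. For example, you can transform a stream of tweets into a stream of trending topics.
The basic primitives Storm provides for stream transformations are spouts and bolts. You implement the Spout and Bolt interfaces to run your application-specific logic.
Spout: a source of streams. For example, a spout might read messages from a Kestrel queue and emit them as a stream, or it might call the Twitter API and emit the returned tweets as a stream.
Bolt: processes streams. A bolt can consume any number of input streams and can also emit new streams. Complex stream transformations, such as computing trending topics from a stream of tweets, require multiple steps and therefore multiple bolts. Bolts can do anything: run functions, filter tuples, perform aggregations, do joins, talk to databases, and more.
A network of spouts and bolts is packaged into a topology. The topology is the top-level abstraction in Storm, and you submit it to a Storm cluster for execution.
Every node in a topology executes in parallel. In your topology you can specify the parallelism of each node, and Storm will spawn that many threads across the cluster to do the processing.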
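// Topology wiring from the Storm tutorial; component ids are integers here, as in the tutorial this was taken from
TopologyBuilder builder = new TopologyBuilder();
// Spout 1: emits random sentences, parallelism 5
builder.setSpout(1, new RandomSentenceSpout(), 5);
// Bolt 2: splits sentences into words, parallelism 8; tuples from spout 1 are shuffled randomly across its tasks
builder.setBolt(2, new SplitSentence(), 8)
        .shuffleGrouping(1);
// Bolt 3: counts words, parallelism 12; tuples with the same "word" field always go to the same task
builder.setBolt(3, new WordCount(), 12)
        .fieldsGrouping(2, new Fields("word"));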
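This simple example is a topology with one spout and two bolts.
setSpout and setBolt take the same parameters:
- id: the component's unique identifier within the topology
- the processing logic; for a spout, this is the logic that produces the data
- the parallelism, i.e. the number of threads to run concurrently
From this example you can clearly see that the processing flow is spout 1 -> bolt 2 -> bolt 3.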
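To see what the three components do to the data, here is a single-threaded plain-Java simulation of the same pipeline. A fixed list of sentences stands in for RandomSentenceSpout, a split step stands in for SplitSentence, and a map stands in for WordCount. It deliberately ignores Storm's interfaces, parallelism and groupings. `WordCountPipeline` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-process simulation of spout 1 -> bolt 2 (split) -> bolt 3 (count).
class WordCountPipeline {
    // "Spout": emits one tuple (a sentence) per element.
    static List<String> spout(List<String> sentences) {
        return new ArrayList<>(sentences);
    }

    // "SplitSentence bolt": turns each sentence tuple into one tuple per word.
    static List<String> split(List<String> sentences) {
        List<String> words = new ArrayList<>();
        for (String s : sentences) {
            for (String w : s.split("\\s+")) {
                if (!w.isEmpty()) words.add(w);
            }
        }
        return words;
    }

    // "WordCount bolt": keeps a running count per word.
    static Map<String, Integer> count(List<String> words) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String w : words) counts.merge(w, 1, Integer::sum);
        return counts;
    }

    static Map<String, Integer> run(List<String> sentences) {
        return count(split(spout(sentences)));
    }
}
```

In a real topology, tuples flow one at a time and the stream never ends. The WordCount bolt would therefore update its counts incrementally rather than computing them over a finished list.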
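Each spout and bolt executes as many tasks spread across the topology, all running in parallel. Viewed at the granularity of tasks, a running topology is a set of spout and bolt tasks, with tuples flowing from one set of tasks to the next.
When a task of Bolt A emits a tuple to Bolt B, which of Bolt B's tasks should receive it?
Stream groupings answer exactly this question.
There are several kinds of stream groupings:
- The simplest is shuffle grouping, which sends each tuple to a random task. In the example above, RandomSentenceSpout and SplitSentence are connected with shuffle grouping, which distributes tuples fairly evenly across the tasks.
- A more interesting kind is fields grouping, which is used between SplitSentence and WordCount. It guarantees that tuples with the same value of the given field go to the same task. This is critical for WordCount: if the same word did not always go to the same task, the word counts would be wrong.
Fields grouping is the basis of streaming joins, streaming aggregations, and many other use cases. Under the hood, fields grouping is implemented with mod hashing.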
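The difference between the two groupings comes down to how the target task index is chosen. The sketch below uses hypothetical helpers, not Storm's implementation classes. Shuffle grouping picks a random task. Fields grouping takes the hash of the grouping field's value modulo the number of tasks, which is what guarantees that the same word always reaches the same WordCount task.

```java
import java.util.Random;

// How a sending task might choose which of a bolt's tasks receives a tuple.
class StreamGroupings {
    // Shuffle grouping: any task, chosen at random, which spreads load evenly on average.
    static int shuffleTarget(Random random, int numTasks) {
        return random.nextInt(numTasks);
    }

    // Fields grouping: hash of the grouping field's value modulo the task count,
    // so equal values (e.g. the same word) always map to the same task.
    static int fieldsTarget(Object fieldValue, int numTasks) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }
}
```

`Math.floorMod` is used instead of `%` because `hashCode()` can be negative, and a task index must fall between 0 and numTasks - 1.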
A rough comparison of Yahoo! S4 and Twitter Storm
http://www.blogjava.net/killme2008/archive/2011/11/08/363238.html
An introduction to, and brief analysis of, real-time computing and stream processing systems
http://www.cnblogs.com/MGGOON/archive/2012/04/27/2473152.html
I. Some basic concepts of real-time computing
http://www.cnblogs.com/panfeng412/archive/2011/10/28/2227195.html
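II. Early products
1. StreamBase:
StreamBase is a commercial stream computing system developed by StreamBase Systems, used in the financial industry and in government departments.
Official website: http://www.streambase.com
2. Borealis: a distributed stream processing system jointly developed by Brandeis University, Brown University and MIT, which evolved from the earlier stream systems Aurora and Medusa. It is an academic research product and has not been maintained since 2008.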
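III. Recent products:
1. Yahoo!'s S4: S4 is a general-purpose, distributed, scalable, partition-tolerant, pluggable stream processing system. Yahoo! developed S4 mainly for search advertising: serving ads and processing users' click feedback.
Official website: http://s4.io/
Introduction to S4: http://www.programmer.com.cn/5304/
2. Real-time computing at Twitter
2.1 Twitter's Storm: Storm is a distributed, fault-tolerant real-time computation system.
Uses of Storm: processing messages and updating databases (stream processing); running continuous queries over data streams and streaming the results back to clients (continuous computation); and parallelizing an intensive query, such as a search query, on the fly (distributed RPC).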
Official tutorial: https://storm.canonical.com/Tutorial
Download from GitHub: https://github.com/nathanmarz/storm/downloads
Storm in detail: http://duanple.blog.163.com/blog/static/7097176720111020102057795/
Storm configuration in detail: http://www.tbdata.org/archives/2118
Blog with Storm translations and summaries: http://chenlx.blog.51cto.com/4096635/d-1/p-1
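2.2 Twitter's Rainbird: Rainbird is a distributed real-time statistics system. It can be used for real-time statistics such as (1) counting clicks on each page and domain of a website, (2) monitoring internal systems (tracking the running state of monitored servers), and (3) recording maximum and minimum values.
Official introduction: http://www.slideshare.net/kevinweil/rainbird-realtime-analytics-at-twitter-strata-2011
Chinese introduction: http://www.cnblogs.com/gpcuster/archive/2011/02/06/1949466.html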
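3. Facebook's Puma: Facebook combines Puma with HBase to process real-time data. Facebook has also published a paper on real-time data processing with HBase/Hadoop (Apache Hadoop Goes Realtime at Facebook), describing how a series of real-time modifications gave its batch computing platform real-time computing capabilities.
4. Real-time computing and stream processing at Taobao
4.1 Yinhe (银河, "Galaxy") stream data processing platform: a general-purpose real-time stream computing system, designed from the outset for low-latency real-time data output, high throughput, and reusability. It builds a distributed stream computing framework on the actor model (on top of Akka); it is easy to extend, partially fault-tolerant, and its data and state can be monitored. Yinhe can process both real-time stream data (such as real-time data collected by TimeTunnel) and static data (such as local files and HDFS files). It provides flexible real-time data output and a custom output interface for extending its real-time computing capabilities. Yinhe currently serves mainly Mofang (魔方), providing real-time computation and analysis of transaction, browsing, and search log data.
4.2 Storm-based stream processing: statistical computation, continuous computation, and real-time message processing.
4.3 Online applications built on HBase.
5. Some other real-time computing systems:
HStreaming: official website: http://www.hstreaming.com/technology/hstreaming/
Esper: Esper can be used in systems with high real-time requirements, such as stock trading systems and risk monitoring systems.
Official website: http://www.espertech.com/
English introduction on the official website: http://www.espertech.com/products/esper.php
Chinese introduction: http://www.cnblogs.com/qlee/archive/2011/06/22/2086550.html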
The complex (event) world
http://www.javacodegeeks.com/2012/03/complex-event-world.html
Recently, while searching for an open source solution for real time CEP, our group stumbled across Twitter's Storm project (https://github.com/nathanmarz/storm). It claims to be most comparable to Yahoo's S4, while being in the same space as "Complex Event Processing" systems like Esper and Streambase. I am not sure about Streambase, but digging deeper into the Storm project made it look much different from CEP and from the ESPER solution. Ditto with S4 (http://incubator.apache.org/s4/). While S4 and Storm seem to be good at real time stream processing in a distributed mode and they appeared (as they claim) to be the "Hadoop for Real Time", they didn't seem to have provisions to match patterns (and thus to indicate complex events).
Searching for a definition of CEP (that our study can relate to) led to the following bullets (http://colinclark.sys-con.com/node/1985400), which list the four below as prerequisites for a system/solution to be called a CEP component/project/solution:
- Domain Specific Language
- Continuous Query
- Time or Length Windows
- Temporal Pattern Matching
Continuous queries supporting time/length windows, as well as temporal pattern matching, seem to be altogether missing in current versions of the S4 and Storm projects. This is probably due to their infancy, and they may mature to include such features in the future. As of now, they only seem fit for pre-processing the event stream before passing it over to a CEP engine like ESPER. Their ability to do distributed processing (à la MapReduce) can help to speed up the pre-processing, where events can be filtered out or enriched via some lookup/computation etc. There have also been some attempts to integrate Storm with Esper (http://tomdzk.wordpress.com/2011/09/28/storm-esper/).
While processing systems like S4 and Storm lack important features of CEP, ESPER-based systems have the disadvantage of being memory-bound. Having too many events or large time windows can potentially cause ESPER to run out of memory. If ESPER is used to process real-time streams, e.g. from social media, a lot of data will accumulate in ESPER's memory. At a high level, the problem statement is to invent a CEP solution for big data. At a finer level, the problem statement includes architecting a CEP solution for handling on-board (batched) as well as in-flight (real-time) data.
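A back-of-the-envelope estimate shows why large windows become a problem for a single in-memory engine. The events a time window retains are roughly rate × window length × bytes per event. The figures in the usage note below (100,000 events/s, a 30-minute window, 200 bytes per event) are illustrative assumptions, not measurements of Esper. `WindowMemoryEstimate` is a hypothetical name.

```java
// Rough memory needed to retain every event of a time window in memory.
class WindowMemoryEstimate {
    // events/second x window length in seconds x average in-memory size of one event
    static long bytesRetained(long eventsPerSecond, long windowSeconds, long bytesPerEvent) {
        return eventsPerSecond * windowSeconds * bytesPerEvent;
    }
}
```

With those assumed figures, `bytesRetained(100_000, 30 * 60, 200)` gives 36,000,000,000 bytes, about 36 GB, before counting any JVM object overhead.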
Beyond MapReduce: the stream computing systems that took off in 2011
http://www.programmer.com.cn/9642/
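This article focuses on real-time data stream applications. Stream computing stems from a belief: the value of data decreases over time, so events must be processed as soon as possible after they occur. Ideally the data is processed the moment it appears, one event at a time, rather than being buffered and processed in batches. This is the criterion for deciding whether you need a stream computing platform: the purpose of a real-time stream platform is to achieve low latency on big data.
Facebook Data Freeway and Puma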
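Facebook's real-time data stream use cases come mainly from site and page analytics. Facebook Insights lets webmasters, advertisers, application developers and page owners view impression/click/action counts over time, broken down by demographics such as gender and age. They can also view unique visitors and hot items such as the most popular URLs.
Building the back end for Facebook Insights posed two main challenges. First, the data volume is very large, including traffic from both Facebook and non-Facebook sites. Second, users want to see summaries as quickly as possible so they can immediately know the most popular articles or the latest games.
Early on, Facebook processed Insights traffic with its existing data warehouse solution. Log data was generated by HTTP servers and delivered within seconds by the log collection framework Scribe to a shared NFS file system; hourly Copier/Loader jobs then uploaded the data files to Hadoop. Data summaries were produced by daily pipeline jobs, and the results were periodically pushed to front-end MySQL servers so that reports could be generated with OLTP tools. The Hadoop cluster had 3,000 nodes, so scalability and fault tolerance were handled well. Copier/Loader were in fact MapReduce jobs, which shielded them from problems such as machine failures. The pipeline jobs were written in Hive's SQL-like language.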
[Figure: Facebook's traditional log processing pipeline]
[Figure: Facebook Insights web query illustration]
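This early system scaled very well, up to the deployment limits of a data center, but its overall latency was high: final reports were available only 1 to 2 days after the logs were generated. To reduce end-to-end latency, Facebook worked on two fronts: optimizing the data transport channel and optimizing the data processing system.
The optimized data channel, called Data Freeway, can handle peaks of 9 GB/s with end-to-end latency under 10 seconds and supports more than 2,500 log categories. Data Freeway consists of four main components: Scribe, Calligraphus, Continuous Copier and PTail.
- Scribe runs on the client side and sends data via RPC;
- Calligraphus shuffles the data and writes it to HDFS; it manages log categories, with ZooKeeper's help;
- Continuous Copier copies files from one HDFS cluster to another;
- PTail tails multiple HDFS directories in parallel and writes the file data to standard output.
In other words, Data Freeway provides four transport modes (file-to-message, message-to-message, message-to-file and file-to-file), which can be deployed flexibly according to the application scenario to improve transport efficiency.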
[Figure: Facebook Data Freeway data transport system]
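After optimizing the transport channel, Facebook optimized its stream processing system, Puma. In the earlier system, PTail streamed data to Puma2, which had to process millions of messages per second. The processing consisted mostly of aggregations over time series, including complex ones such as unique visitors and most frequent events. For each message, Puma2 sent an "increment" operation to HBase. Puma chose HBase rather than MySQL as its storage engine for its automatic load balancing, automatic fault tolerance and write throughput. Puma2 servers are all peers, so several Puma2 servers may modify the same HBase row at the same time. Facebook therefore added a feature to HBase allowing a single increment operation to modify multiple columns of the same row.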
[Figure: Puma2 data processing path]
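Puma2's architecture is very simple and easy to maintain. Its only state is the PTail checkpoint, i.e. the position in the upstream data, which is periodically stored in HBase. Because the architecture is symmetric, scaling out the cluster and handling machine failures are both easy. However, Puma2 also has serious drawbacks. First, HBase increment operations are very expensive, because each one involves both a read and a write, and HBase's random-read performance is poor. Second, complex aggregations are hard to support and require writing a lot of user code on top of HBase. Third, Puma2 produces a small amount of duplicate data on failure, because the HBase increment and the PTail checkpoint are not a single atomic operation.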
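The duplicate-data problem follows directly from the two writes not being atomic. The simulation below is a hypothetical model, not Facebook's code. A `HashMap` stands in for the HBase counters and a `long` field stands in for the stored PTail checkpoint. A crash is injected after the increments are written but before the checkpoint is saved. On restart, the consumer resumes from the old checkpoint and applies the same increments a second time.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Models Puma2-style processing: per-message increments, then a separate checkpoint write.
class NonAtomicCheckpointDemo {
    final Map<String, Long> counters = new HashMap<>(); // stands in for HBase rows
    long checkpoint = 0;                                 // stands in for the stored PTail offset

    // Process messages from the stored checkpoint up to 'end' (exclusive).
    // If crashBeforeCheckpoint is true, the increments land but the checkpoint is never saved.
    void processBatch(List<String> log, int end, boolean crashBeforeCheckpoint) {
        for (int i = (int) checkpoint; i < end; i++) {
            counters.merge(log.get(i), 1L, Long::sum); // "increment" sent to the store
        }
        if (!crashBeforeCheckpoint) {
            checkpoint = end; // separate, non-atomic second write
        }
    }

    static Map<String, Long> demo() {
        List<String> log = List.of("pageA", "pageB", "pageA");
        NonAtomicCheckpointDemo puma = new NonAtomicCheckpointDemo();
        puma.processBatch(log, 2, true);  // crash after incrementing pageA and pageB
        puma.processBatch(log, 3, false); // restart replays from checkpoint 0
        return puma.counters;             // pageA=3, pageB=2, although the log has 2 and 1
    }
}
```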
[Figure: Puma3 data processing path]
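Because Puma2's drawbacks limited further latency reductions across the whole pipeline, Facebook developed a new system, Puma3. The biggest difference between the two generations is that Puma3 performs aggregation in local memory rather than in HBase, which gives Puma3 very low processing latency. To support in-memory aggregation, data in Puma3 is sharded by aggregation key, and this bucketing is done at the Calligraphus stage of the transport pipeline. Each Puma3 shard is an in-memory hash table. Each entry holds a key and its user-defined aggregations, such as count, sum and avg. HBase serves only as persistent storage. Puma3 periodically checkpoints its in-memory data to HBase and reads data back from HBase for replay only when Puma3 fails. Because normal processing bypasses HBase reads entirely, the efficiency of the whole system improves greatly.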
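Below is a minimal sketch of the Puma3 idea, built on these assumptions:

- Messages are routed to a shard by the hash of their aggregation key, standing in for the bucketing done at the Calligraphus stage.
- Each shard keeps an in-memory hash table of running count and sum per key, with avg derived from them.
- `checkpoint()` copies the tables into a separate map that stands in for HBase.

All class and method names are hypothetical. Replay after a failure is not shown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory, key-sharded aggregation with periodic checkpoints to a persistent store.
class ShardedAggregator {
    // Running aggregates for one key; avg is derived from sum and count.
    static final class Agg {
        long count;
        double sum;
        double avg() { return count == 0 ? 0.0 : sum / count; }
    }

    private final List<Map<String, Agg>> shards = new ArrayList<>();   // one hash table per shard
    final Map<String, double[]> persistentStore = new HashMap<>();    // stands in for HBase: key -> {count, sum}

    ShardedAggregator(int numShards) {
        for (int i = 0; i < numShards; i++) shards.add(new HashMap<>());
    }

    // Route by aggregation key so one key is always aggregated by the same shard.
    int shardOf(String key) {
        return Math.floorMod(key.hashCode(), shards.size());
    }

    // Normal path: update memory only; nothing is read from the persistent store.
    void onMessage(String key, double value) {
        Agg agg = shards.get(shardOf(key)).computeIfAbsent(key, k -> new Agg());
        agg.count++;
        agg.sum += value;
    }

    Agg get(String key) {
        return shards.get(shardOf(key)).get(key);
    }

    // Periodic checkpoint: write the current in-memory state out to the store.
    void checkpoint() {
        for (Map<String, Agg> shard : shards) {
            for (Map.Entry<String, Agg> e : shard.entrySet()) {
                persistentStore.put(e.getKey(), new double[] {e.getValue().count, e.getValue().sum});
            }
        }
    }
}
```

Compared with the Puma2 model, each message now costs only a hash-table update. The store is touched once per checkpoint rather than once per message.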
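Summary
First, don't adopt a technology for its own sake. Before adopting a real-time stream processing platform, consider whether you really need one. If you can tolerate a latency of several hours, and can accept how much the value of your data decays in that time, then Hadoop is a good solution for you.
If you have big data and need strictly low latency, then you need a real-time stream processing platform. The main stream processing platforms mentioned above are:
- Kafka, Flume: aggregation systems. I am more familiar with Kafka, which is commonly used for data crawling, aggregation and data preprocessing.
- Storm, S4, Puma: Storm can be thought of as a real-time version of Hadoop, but it only defines the topology of spouts and bolts; the logic inside the spouts and bolts is entirely up to you.
- Esper, HStreaming: CEP systems.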
What are the main differences between Storm and Kafka?
Nathan Marz, I'm the creator of Storm.
At a high level: Kafka is a distributed, persistent message broker. Storm is a realtime computation system.
They actually can be used together. You could use Kafka to store the source stream and use Storm to feed off that stream to do complex computation.
Storm 0.7.0 introduces transactional topologies, which let you get exactly-once messaging semantics in your processing (https://github.com/nathanmarz/st... ). Transactional topologies require a source queue more sophisticated than Kestrel or RabbitMQ, and Kafka is a perfect fit. storm-contrib has a TransactionalSpout implementation for Kafka:
storm-kafka provides a regular spout implementation and a TransactionalSpout implementation for Apache Kafka 0.7.
https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka
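As Storm's transactional-topology documentation describes it, the core trick behind exactly-once updates is to store the transaction id next to each stored value. An update whose id has already been applied is skipped. This relies on a failed batch being replayed with the same id and on commits happening in transaction order. The sketch below shows only that idea, with a plain map as the database. `TxCountStore` and `applyBatch` are hypothetical names, not part of Storm's API.

```java
import java.util.HashMap;
import java.util.Map;

// Idempotent counter updates keyed by transaction id (the idea behind exactly-once counts).
class TxCountStore {
    // Stored value per key: the count and the id of the last transaction applied to it.
    static final class Entry {
        long count;
        long lastTxId = -1;
    }

    final Map<String, Entry> db = new HashMap<>();

    // Apply a batch's partial count for a key; a replayed batch (same txId) is ignored.
    void applyBatch(long txId, String key, long delta) {
        Entry e = db.computeIfAbsent(key, k -> new Entry());
        if (e.lastTxId == txId) {
            return; // already applied before a failure; skipping keeps the count exact
        }
        e.count += delta;
        e.lastTxId = txId;
    }
}
```

The same replay that would double-count in a plain increment scheme becomes harmless here, because the second application of transaction 2 sees its own id already stored.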
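This is the Storm author's explanation of the difference between Kafka and Storm. My understanding is that Kafka, as a message streaming system, can also be used for real-time analysis; when LinkedIn released it, this was presented as one of Kafka's strengths.
But Kafka's main purpose is still to be a distributed, persistent message broker, and it is not flexible enough for analysis. The typical Kafka flow is to fetch data, process it, and forward it. If you need aggregation algorithms or complex analysis, Kafka cannot handle them.
Storm, by contrast, is designed specifically for realtime computation. So, as Nathan says, combining the two may work better, with computation and analysis left to Storm.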
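Storm vs. Esper
As described in the previous post, Storm provides a framework for distributed real-time computation, but it does not implement Esper's many convenient features, such as support for time/length windows and temporal pattern matching.
Esper's problem, on the other hand, is a lack of scalability: with too much data or overly large time windows, it runs out of memory.
So it makes sense to combine the two. For example, the package below lets you use Esper as a Storm bolt.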
https://github.com/tomdz/storm-esper
The storm-esper library provides a bolt that allows you to use Esper queries on Storm data streams.
This article is excerpted from cnblogs (博客园); original publication date: 2012-08-04.