让Storm插上CEP的翅膀 - Siddhi调研和集成

什么是 Siddhi?

Siddhi 是一种 lightweight, easy-to-use, open source CEP(Complex Event Processing)引擎,由wso2公司开发(http://wso2.com/about/)。

像绝大多数的 CEP 系统一样,Siddhi 支持对于流式数据的类 SQL 的查询,SQL 式的 query 通过 complier 翻译成 Java 代码。 
当一条数据流或多条数据流流入时,Siddhi Core 会实时的 check 当前数据流是否满足定义的 query,如果满足则触发 Callback 执行相应的逻辑。

Siddhi和传统的CEP系统,如Esper,相比区别? 
主要是比较轻量和高效,之所以可以达到更高的 performance,因为:

  • Multi-threading
  • Queues and use of pipelining
  • Nested queries and chaining streams
  • Query optimization and common sub query elimination

尤其是前两点非常关键,传统的CEP系统,如果Esper,都是使用单线程去处理所有的 query matching,这样虽然简单,但是效率不高,无法利用 cpu 多核。 
所以 Siddhi 采用多线程,并且结合pipeline机制,如下图

Siddhi 将整个 query 切分成独立的 stages,即 processors,这样做的好处,首先是便于多线程化,再者,可以重用相同的 processor; 
而 processor 之间通过 queue 进行连接,这里就不详细描述了,有兴趣的同学可以去仔细看 Siddhi 的论文和文档。

 

Siddhi 能做什么?

下面我们就来看看,最关键的,Siddhi 可以为我们做什么?

这里就用几个形象的例子来说明 Siddhi 使用的典型的场景

简单 ETL

我们先用个最简单的例子,看看如果 run 一个真正的 Siddhi 例子,

上面说了,Siddhi 是用类 SQL 的查询语言,

首先需要先定义流的格式,

define stream TempStream (deviceID long, roomNo int, temp double);

然后定义查询,

from TempStream
select roomNo, temp * 9/5 + 32 as temp, 'F' as scale, roomNo >= 100 and roomNo < 110 as isServerRoom
insert into RoomTempStream;

这样就能实现一个完整的 ETL 过程, 
extraction,将需要的字段从 TempStream 里面 select 出来; 
transform, 将摄氏温度转换为华氏温度; 
loading,将结果输出到RoomTempStream流;

很方便,不用再另外写任何的逻辑,只需要写类SQL的Query语句。 
为了增加感性认识,我给出一个完成的 Java 测试例子,

SiddhiManager siddhiManager = new SiddhiManager();

String executionPlan = "" +
        "ddefine stream TempStream (deviceID int, roomNo int, temp float);" +
        "" +
        "@info(name = 'query1') " +
        "from TempStream " +
        "select roomNo, temp * 9/5 + 32 as temp, 'F' as scale, roomNo >= 100 and roomNo < 110 as isServerRoom " +
        "insert into RoomTempStream;";

ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);

executionPlanRuntime.addCallback("query1", new QueryCallback() {
    @Override
    public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
        EventPrinter.print(timeStamp, inEvents, removeEvents);
    }
});

InputHandler inputHandler = executionPlanRuntime.getInputHandler("TempStream");
executionPlanRuntime.start();

inputHandler.send(new Object[] {12344, 201, 28.2f});
inputHandler.send(new Object[] {12345, 202, 22.2f});inputHandler.send(new Object[] {12346, 203, 24.2f});
//Shutting down the runtime
executionPlanRuntime.shutdown();

//Shutting down Siddhi
siddhiManager.shutdown();

 

基于 window 聚合

Siddhi 支持很多中类型的 window,具体参考https://docs.wso2.com/display/CEP400/Inbuilt+Windows#InbuiltWindows-time

这里给出最基本的,基于时间窗口的例子,

from TempStream#window.time(1 min)
select roomNo, avg(temp) as avgTemp
group by roomNo
insert all events into AvgRoomTempStream ;

这个查询会计算以1分钟为滑动窗口的,每个 room 的平均温度

Siddhi时间窗口也支持,按照外部的输入的时间进行聚合,但是它要求时间是必须递增的;这点我们brain的聚合库比它通用,可以适用于非单调递增的场景

 

多个流 Join

Siddhi 支持基于 window 的多个流的实时 join,

from TempStream[temp > 30.0]#window.time(1 min) as T
  join RegulatorStream[isOn == false]#window.length(1) as R
  on T.roomNo == R.roomNo
select T.roomNo, T.temp, R.deviceID, 'start' as action
insert into RegulatorActionStream ;

上面的查询将,TempStream 和RegulatorStream 通过 roomNo 进行 join

 

Pattern Query

这种 query 最能表达出 CEP 的威力,什么是Pattern Query? 
“Pattern allows event streams to be correlated over time and detect event patterns based on the order of event arrival.”

直接看个例子,用 Pattern 查询来 detect credit card/ATM transaction frauds:

from every a1 = atmStatsStream[amountWithdrawed < 100]
    -> b1 = atmStatsStream[amountWithdrawed > 10000 and a1.cardNo == b1.cardNo]
    within 1 day
select a1.cardNo as cardNo, a1.cardHolderName as cardHolderName, b1.amountWithdrawed as amountWithdrawed, b1.location as location, b1.cardHolderMobile as cardHolderMobile
insert into possibleFraudStream;

注意看到这个符号‘->’,这个表示 event 发生顺序, 
上面这个查询的意思就是,在一天内,出现一次取现金额 < 100后,同一张卡,出现取现金额 > 10000,则认为可能是 fraud。

当然这只是个例子,不是说这样真的可以 detect fraud。你可以参照这个,写出更为复杂的查询。

 

Sequence Query

和 pattern 的区别是,pattern 的多个 event 之间可以是不连续的,但 sequence 的 events 之间必须是连续的。

我们可以看个例子,用 sequence 来发现股票价格的 peak:

from every e1=FilteredStockStream[price>20],            e2=FilteredStockStream[((e2[last].price is null) and price>=e1.price) or ((not (e2[last].price is null)) and price>=e2[last].price)],
e3=FilteredStockStream[price<e2[last].price]
select e1.price as priceInitial, e2[last].price as pricePeak, e3.price as priceAfterPeak
insert into PeakStream ;

上面的查询的意思, 
e1,收到一条 event.price>20 
e2,后续收到的所有 events 的 price,都大于前一条 event 
e3,最终收到一条 event 的 price,小于前一条 event

ok,我们发现了一个peak

 

Siddhi 还有其他很多的功能,这里就不一一说明。。。。。。

 

集成到 Storm

那么最后,我们看看如何将 Siddhi 融入到我们当前的框架中,达到作为 Brain 补充的目的。

我将 Siddhi core 封装成一个 Siddhi Bolt,这样可以在 JStorm 的 topology 中很灵活的,选择是否什么方案,可以部分统计用 brain,部分用 Siddhi,非常简单。

废话不说,直接给出源码,供大家参考,

public class SiddhiBolt implements IRichBolt {
protected OutputCollector collector;
    protected  SiddhiManager siddhiManager = null;
    protected String executionPlan = null;
    ExecutionPlanRuntime executionPlanRuntime = null;
    protected HashMap<String,InputHandler> handlers = null;

    public SiddhiBolt(String plan) {
        this.executionPlan = plan;
    }

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.siddhiManager = new SiddhiManager();
        this.executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
        addCallbacks();
        handlers = new HashMap<String,InputHandler>();
        executionPlanRuntime.start();
    }

    public void execute(Tuple tuple) {
        String inputStream = tuple.getSourceStreamId();
        InputHandler inputHandler = getInputHandler(inputStream);

        List<Object> values = tuple.getValues();
        Object[] objects = values.toArray();

        try {
            inputHandler.send(objects);
        }catch (Exception e){
            LOG.error("Send stream event error: ", e);
        }
        // collector.fail(tuple); //test replay
        collector.ack(tuple); // remember ack the tuple
        // Make sure that add anchor tuple if you want to track it
        // collector.emit(streamid, tuple,new Values(counters, now));
    }

    public InputHandler getInputHandler(String streamName){
        InputHandler handler = null;
        if(handlers.containsKey(streamName))
            handler = handlers.get(streamName);
        else {
            handler = executionPlanRuntime.getInputHandler(streamName);
            if (handler != null) {
                handlers.put(streamName, handler);
            }
        }
        return handler;
    }

    //Need Override
    public void addCallbacks( ){
        //StreamCallback example
        executionPlanRuntime.addCallback("outputStream", new StreamCallback() {

            @Override
            public void receive(Event[] events) {
                LOG.info("receive events: " + events.length);
                for (Event e:events)
                    LOG.info(e);
            }
        });

        //QueryCallback example
        executionPlanRuntime.addCallback("query1", new QueryCallback() {
            @Override
            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
                printEvents(timeStamp, inEvents, removeEvents);
            }
        });
    }

    public void printEvents(long timeStamp, Event[] inEvents, Event[] removeEvents){
        StringBuilder sb = new StringBuilder();
        sb.append("Events{ @timeStamp = ").append(timeStamp).append(", inEvents = ").append(
            Arrays.deepToString(inEvents)).append(", RemoveEvents = ").append(Arrays.deepToString(removeEvents)).append(" }");
        LOG.info(sb.toString());
    }

    public void cleanup() {
        //Shutting down the runtime
        executionPlanRuntime.shutdown();

        //Shutting down Siddhi
        siddhiManager.shutdown();
    }
}

2015-12-15
时间: 2024-08-01 10:17:40

让Storm插上CEP的翅膀 - Siddhi调研和集成的相关文章

为PowerPoint文档插上音乐的翅膀

Powerpoint 2003的功能已经相当丰富,但还没有一种直接为整个幻灯片添加背景音乐的功能.虽然我们可以通过依次点击"插入-影片和声音-文件中的声音"的方法来插入音乐(图1),但是当演示到下一张幻灯片的时候音乐就会停止.因此要使整个幻灯片使用一个背景音乐似乎是不可能实现的事情. 文档插上音乐的翅膀-powerpoint文档恢复"> 但事实并非如此,下面我就将我在实际操作中所总结的在PowerPoint中实现插入背景音乐的两种方法和大家一起分享(本文以PowerPo

中海阳:拥抱“十三五” 让光伏插上腾飞的翅膀

回顾光伏行业过去的2015年,脑海里立马浮现出了好几个记忆深刻的关键词,欧盟反倾销.光伏补贴.汉能李河君等等.虽然在光伏的这条路上充满着血和泪,但是,他们挺过了2015,迎来了崭新的2016年,迎来了"十三五"规划的开局之年,迎来了整个光伏行业涅槃重生---. 针对光伏行业的"十三五"规划,我们采访了光伏行业的领军企业中海阳的相关负责人,该负责人表示:"随着光伏"十三五"规划的成功落地,国家针对光伏行业的政策红利将会在极大程度上促进我国

视频监控为校园安全插上“隐形的翅膀”

随着公众社会安全防范意识的提高,安防系统慢慢在各种公共场合普及.而我国教育事业的发展,高校的规模越来越大,学生人数不断增多,校区开放程度和后勤服务社会化程度亦越来越高,学校的管理工作和安全保安工作面临新的课题,校园安防也凸显了无限的商机.最早接触的校园监控系统是高中的学校,在没个教师安装的高速球,但当时的主要作用是监督学生的上课情况和自习情况.而现在校园安防已经在各大中小学慢慢普及开来. 校园安防--安防行业的下一片"蓝海",各地高校的开放度高,人员杂,流动大,因而增加了校园安防工作的

给传统营销插上数字化的翅膀

营销中所谓的传统在于传统的思维与意识.传统的传播内容与手段,然而在数字化时代背景下,杂糅了新营销元素的整合营销多了起来.若单纯只进行传统营销而 忽视了数字营销,确实不合时宜,所谓与时俱进,在营销上也如此. 在众多行业中,譬如在央视广告招标上闪亮夺目的酒类企业,其通过央视覆盖力强大的传播阵势,塑造品牌,这是一个行业的传播特征,高举高打的品牌需要强 曝光,需要选择对的媒介.早前在业界一直流传这样一句话:"品牌"知名度=广告投入量.比如耐克品牌每年全球广告投入15-25亿美元的广告费用,广告

AMD郭可尊:用信息化为农民插上致富的翅膀

为了配合国家"电脑下乡"政策的深入推广,加速农村信息化建设,11月19日,由工业和信息化部指导,全球领先的半导体企业AMD公司负责建设的 "农村综合信息服务培训中心"落户安顺市平坝县夏云镇.工业和信息化部信息化推进司副巡视员张保泰,AMD公司全球高级副总裁.大中华区总裁郭可尊女士以及贵州省经济和信息化委员会.安顺市的相关领导出席了该培训中心的揭牌仪式. 据悉,"农村综合信息服务培训中心"由工业和信息化部指导支持,AMD公司负责建设.在建设过程中,

当教育插上互联网的翅膀,在线教育还需本质突破

被不断"神化"的互联网思维,随着互联网的不断普及,已深入各行各业,教育行业也不例外. 用互联网思维改造传统行业似乎成了一个致胜的法宝.在线教育就是在这样的背景下,在2014年彻底地爆发了. 这一年以来,互联网对教育行业发起了前所未有的冲击,教育,这一个人类最悠久的行为分支,在近一两年中被迅速地改变着.学生们从单纯依赖教室授课的模式,变得更多的利用屏幕接触知识. 据统计数据显示,2013年年初以来,平均每天有2.6家在线教育公司诞生,另外多达60家在线教育企业获得了投资,有的甚至未上线就

为电子商务插上社会化的翅膀(二):四字箴言

中介交易 SEO诊断 淘宝客 云主机 技术大厅 接昨天的话题为电子商务插上社会化的翅膀(一),今天开始写写PPT中最后一部分,电子商务网站(这里主要是针对网络零售商)如何实现社会化.也许现在很多电子商务网站或多或少都在做这个事情,或者有计划要做,而做的最多的事情是通过社会化媒体去做一些推广工作,今天我会比较全面的为大家介绍下电子商务网站如何实现社会化. 电子商务网站实现社会化的四字箴言 电子商务网站实现社会化,必须要内外兼修,而我提出的四字箴言添(特性),育(社区),接(关系),建(基地)也是从

腾讯林松涛:希望腾讯为开发者插上梦想的翅膀

中介交易 http://www.aliyun.com/zixun/aggregation/6858.html">SEO诊断 淘宝客 云主机 技术大厅 速途网讯 腾讯开放平台总经理林松涛在2013年腾讯合作伙伴大会中介绍平台上线后每年都有跨越式的发展.根据林松涛的介绍,这个数字预计在2013年会达到30亿. 林松涛表示,在过去的一年中腾讯的开放平台开发者收入呈井喷式增长,开放平台应用流水增长率在过去的一年中增长了138%,开放平台开发者收入增长率为142%.此外,虽然PC端在全球走衰,但据林

周忻:给易居插上互联网的“翅膀”

徐健 "2000年公司成立的时候,我只想在卖房子这块做第一,我做到了:2005年,成立CRIC数据信息公司的时候,我希望这套数据系统能够在全国做大做强,我做到了:2009年,我们和新浪合并上市,进驻互联网媒体平台,把乐居作为中国最具有影响力的房地产互联网媒体平台,我也做到了.现在做电子商务平台,我希望我们也能脚踏实地做到." 12月14日,易居董事局主席周忻接受<第一财经日报>记者专访时,再次表达了他对发展房地产电商的"一意孤行". "不疯魔,