Siddhi CEP Window Mechanism

https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Window

https://docs.wso2.com/display/CEP400/Inbuilt+Windows#InbuiltWindows

http://wso2.com/library/articles/2013/06/understanding-siddhi-powers-wso2-cep-2x/

https://docs.wso2.com/display/CEP400/Samples+on+Processing+Events

 

The window mechanism is a bit obscure and the official examples are not sufficient, so let's look at it in detail here.

Basic syntax:

from <input stream name>[<filter condition>]#window.<window name>(<parameter>, <parameter>, ... )

select <attribute name>, <attribute name>, ...

insert [current events | expired events | all events] into <output stream name>

window.length

Let's go straight to an example. It uses expired events here, although in practice you usually don't use expired.

"define stream cseEventStream (symbol string, price float, volume long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.length(6) " +
"select symbol, price, avg(price) as ap, sum(price) as sp, count(price) as cp " +
"group by symbol " +
"insert expired events into outputStream;";

A quick explanation:

define: defines the stream, i.e. the structure of each event in the stream.

@info: optional, names the query.

What the query means: for cseEventStream, events with price < 700 go into a sliding length window (length 6 here).
Once the window is full, each new event pushes the oldest one out as an expired event, and that is what triggers the insert.

What gets inserted is determined by the select clause.
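For reference, this is roughly how such a query string gets wired into a runtime, and where the "receive events: N" lines in the outputs below come from. It is only a minimal sketch assuming the Siddhi 3.x API shipped with WSO2 CEP 4.0.0 (SiddhiManager, ExecutionPlanRuntime, QueryCallback, InputHandler); package names and details may differ in other versions:

import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.siddhi.core.stream.input.InputHandler;

public class WindowLengthDemo {
    public static void main(String[] args) throws Exception {
        String executionPlan =
                "define stream cseEventStream (symbol string, price float, volume long);" +
                "@info(name = 'query1') " +
                "from cseEventStream[700 > price]#window.length(6) " +
                "select symbol, price, avg(price) as ap, sum(price) as sp, count(price) as cp " +
                "group by symbol " +
                "insert expired events into outputStream;";

        SiddhiManager siddhiManager = new SiddhiManager();
        ExecutionPlanRuntime runtime = siddhiManager.createExecutionPlanRuntime(executionPlan);

        // The callback is registered against the query name given in @info and simply
        // prints whatever the query emits; this produces the "receive events: N" lines.
        runtime.addCallback("query1", new QueryCallback() {
            @Override
            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
                Event[] events = (inEvents != null) ? inEvents : removeEvents;
                if (events == null) {
                    return;
                }
                System.out.println("receive events: " + events.length);
                for (Event event : events) {
                    System.out.println(event);
                }
            }
        });

        InputHandler inputHandler = runtime.getInputHandler("cseEventStream");
        runtime.start();
        // ... feed events through inputHandler.send(...) as in the loop below,
        // then call runtime.shutdown() when done.
    }
}

In the logs below, the window's expired events arrive at this callback as current events of outputStream, which is why they are printed with isExpired=false.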

Below I feed in the following stream of data:

int i = 0;
while (i < 10) {
    float p = i*10;
    inputHandler.send(new Object[]{"WSO2", p, 100});
    System.out.println("\"WSO2\", " + p);
    inputHandler.send(new Object[] {"IBM", p, 100});
    System.out.println("\"IBM\", " + p);
    Thread.sleep(1000);
    i++;
}

Part of the resulting output:

"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
"WSO2", 20.0
"IBM", 20.0
"WSO2", 30.0
receive events: 1
Event{timestamp=1447906176329, data=[WSO2, 0.0, 15.0, 30.0, 2], isExpired=false}
"IBM", 30.0
receive events: 1
Event{timestamp=1447906176331, data=[IBM, 0.0, 15.0, 30.0, 2], isExpired=false}
"WSO2", 40.0
receive events: 1
Event{timestamp=1447906177331, data=[WSO2, 10.0, 25.0, 50.0, 2], isExpired=false}
"IBM", 40.0
receive events: 1
Event{timestamp=1447906177331, data=[IBM, 10.0, 25.0, 50.0, 2], isExpired=false}

A few things this demonstrates:

1. The window length is 6, so sending the 7th event triggers the expiry.

2. At that point, outputStream receives the expired event.

3. From that event we can of course read all of its own attributes, and via the aggregate functions we also get statistics over the events currently in the window.
This part is easy to misunderstand: the event we receive is only the expired one, and we cannot see the other events in the window directly, yet the aggregate functions are still computed over the events inside the window (Siddhi maintains the running aggregates internally and attaches their current values to the emitted event).

We compute three statistics here: average, sum, and count. So how is the avg arrived at?
Take Event{timestamp=1447906176329, data=[WSO2, 0.0, 15.0, 30.0, 2], isExpired=false} as an example.
Because we added group by, only events with symbol = WSO2 are aggregated.
When we send "WSO2", 30.0, it triggers the expiry of "WSO2", 0.0; if you look at the aggregation at that moment, both of these events are excluded, and the events still taking part are:

"IBM", 0.0 "WSO2", 10.0 "IBM", 10.0 "WSO2", 20.0 "IBM", 20.0

Of those, only the two WSO2 events (10.0 and 20.0) count towards the WSO2 group, so count = 2, sum = 30, and avg = 15.

Without the group by, the result looks like this:

"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
"WSO2", 20.0
"IBM", 20.0
"WSO2", 30.0
receive events: 1
Event{timestamp=1447913986723, data=[WSO2, 0.0, 12.0, 60.0, 5], isExpired=false}
"IBM", 30.0
receive events: 1
Event{timestamp=1447913986725, data=[IBM, 0.0, 18.0, 90.0, 5], isExpired=false}

In that case the symbol is ignored and everything in the window is aggregated together.

 

The expired events keyword here is optional; the other options are current events and all events.
expired events fires when an event expires, current events fires when an event arrives, and all events fires in both cases.

Let's see what happens if we switch to all events. In my test the result was the same as with current events: output was only triggered when events arrived. A bug?

"WSO2", 10.0
"IBM", 10.0
receive events: 1
Event{timestamp=1447914310502, data=[WSO2, 10.0, 5.0, 10.0, 2], isExpired=false}
receive events: 1
Event{timestamp=1447914310502, data=[IBM, 10.0, 5.0, 10.0, 2], isExpired=false}
"WSO2", 20.0
"IBM", 20.0
receive events: 1
Event{timestamp=1447914311503, data=[WSO2, 20.0, 10.0, 30.0, 3], isExpired=false}
receive events: 1
Event{timestamp=1447914311503, data=[IBM, 20.0, 10.0, 30.0, 3], isExpired=false}
"WSO2", 30.0
"IBM", 30.0
receive events: 1
Event{timestamp=1447914312503, data=[WSO2, 30.0, 20.0, 60.0, 3], isExpired=false}
receive events: 1
Event{timestamp=1447914312503, data=[IBM, 30.0, 20.0, 60.0, 3], isExpired=false}
"WSO2", 40.0
"IBM", 40.0
receive events: 1
Event{timestamp=1447914313503, data=[WSO2, 40.0, 30.0, 90.0, 3], isExpired=false}
receive events: 1
Event{timestamp=1447914313503, data=[IBM, 40.0, 30.0, 90.0, 3], isExpired=false}

window.time

This works the same way as length, except the expiry condition is time-based.

"define stream cseEventStream (symbol string, price float, volume long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.time(2 sec) " +
"select symbol, price, avg(price) as ap, sum(price) as sp, count(price) as cp " +
"group by symbol " +
"insert expired events into outputStream;";

The result:

"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
"WSO2", 20.0
"IBM", 20.0
receive events: 1
Event{timestamp=1447915287974, data=[WSO2, 0.0, 10.0, 10.0, 1], isExpired=false}
receive events: 1
Event{timestamp=1447915287977, data=[IBM, 0.0, 15.0, 30.0, 2], isExpired=false}
"WSO2", 30.0
"IBM", 30.0
receive events: 2
Event{timestamp=1447915288975, data=[WSO2, 10.0, 20.0, 20.0, 1], isExpired=false}
Event{timestamp=1447915288975, data=[IBM, 10.0, 20.0, 20.0, 1], isExpired=false}

As you can see, expiry here is time-based, so it is not necessarily evaluated when an event arrives; it is driven by a scheduled timer.

So when the aggregates are computed, the result depends on how many events happen to be in the window at the moment the timer fires, which is why the counts above can be either 1 or 2.

 

window.lengthBatch;timeBatch

These windows are non-sliding (tumbling). Straight to an example:

"define stream cseEventStream (symbol string, price float, volume long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.lengthBatch(4) " +
"select symbol, price " +
"insert expired events into outputStream;";

With the same input as before, the result is:

"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
"WSO2", 20.0
"IBM", 20.0
"WSO2", 30.0
"IBM", 30.0
receive events: 4
Event{timestamp=1447923776094, data=[WSO2, 0.0], isExpired=false}
Event{timestamp=1447923776094, data=[IBM, 0.0], isExpired=false}
Event{timestamp=1447923776094, data=[WSO2, 10.0], isExpired=false}
Event{timestamp=1447923776094, data=[IBM, 10.0], isExpired=false}
"WSO2", 40.0
"IBM", 40.0
"WSO2", 50.0
"IBM", 50.0
receive events: 4
Event{timestamp=1447923778094, data=[WSO2, 20.0], isExpired=false}
Event{timestamp=1447923778094, data=[IBM, 20.0], isExpired=false}
Event{timestamp=1447923778094, data=[WSO2, 30.0], isExpired=false}
Event{timestamp=1447923778094, data=[IBM, 30.0], isExpired=false}

As you can see, with lengthBatch set to 4, expired events are only emitted once 8 events have arrived: the first batch expires when the second batch of 4 becomes full.

Expiry happens one whole batch at a time, so each output carries 4 events with no overlap between outputs; the window does not slide.

 

Now a timeBatch example, this time using all events:

"define stream cseEventStream (symbol string, price float, volume long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.timeBatch(3 sec) " +
"select symbol, price " +
"insert all events into outputStream;";

The result is below. We sleep 1 s between pairs, so the first expiry fires after 6 pairs have been sent, expiring the 6 events of the first batch.
You can also see that this time, besides the expired events, output is also produced for the current events of each batch, because we used all events (the first receive of 6 is the first batch arriving as current events; the receive of 12 is the second batch's current events plus the first batch's expired events).

"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
"WSO2", 20.0
"IBM", 20.0
receive events: 6
Event{timestamp=1447924146613, data=[WSO2, 0.0], isExpired=false}
Event{timestamp=1447924146614, data=[IBM, 0.0], isExpired=false}
Event{timestamp=1447924147614, data=[WSO2, 10.0], isExpired=false}
Event{timestamp=1447924147614, data=[IBM, 10.0], isExpired=false}
Event{timestamp=1447924148614, data=[WSO2, 20.0], isExpired=false}
Event{timestamp=1447924148614, data=[IBM, 20.0], isExpired=false}
"WSO2", 30.0
"IBM", 30.0
"WSO2", 40.0
"IBM", 40.0
"WSO2", 50.0
"IBM", 50.0
receive events: 12
Event{timestamp=1447924152571, data=[WSO2, 0.0], isExpired=false}
Event{timestamp=1447924152571, data=[IBM, 0.0], isExpired=false}
Event{timestamp=1447924152571, data=[WSO2, 10.0], isExpired=false}
Event{timestamp=1447924152571, data=[IBM, 10.0], isExpired=false}
Event{timestamp=1447924152571, data=[WSO2, 20.0], isExpired=false}
Event{timestamp=1447924152571, data=[IBM, 20.0], isExpired=false}
Event{timestamp=1447924149614, data=[WSO2, 30.0], isExpired=false}
Event{timestamp=1447924149614, data=[IBM, 30.0], isExpired=false}
Event{timestamp=1447924150614, data=[WSO2, 40.0], isExpired=false}
Event{timestamp=1447924150614, data=[IBM, 40.0], isExpired=false}
Event{timestamp=1447924151614, data=[WSO2, 50.0], isExpired=false}
Event{timestamp=1447924151614, data=[IBM, 50.0], isExpired=false}

For this kind of scenario, though, what we usually want is to compute some statistics over each batch. Example:

"define stream cseEventStream (symbol string, price float, volume long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.lengthBatch(4) " +
"select symbol, price, avg(price) as avgPrice " +
"group by symbol " +
"insert into outputStream;";

The result:

"WSO2", 0.0
"IBM", 0.0
"WSO2", 10.0
"IBM", 10.0
receive events: 2
Event{timestamp=1447991871794, data=[WSO2, 10.0, 5.0], isExpired=false}
Event{timestamp=1447991871794, data=[IBM, 10.0, 5.0], isExpired=false}
"WSO2", 20.0
"IBM", 20.0
"WSO2", 30.0
"IBM", 30.0
receive events: 2
Event{timestamp=1447991873795, data=[WSO2, 30.0, 25.0], isExpired=false}
Event{timestamp=1447991873795, data=[IBM, 30.0, 25.0], isExpired=false}

As you can see, the data within each batch can be grouped by symbol and averaged.
Note: do not use expired events here, or the aggregate results will always be 0, because for a batch window the window is empty after each expiry.

 

window.externalTime

https://docs.wso2.com/display/CEP400/Sample+0114+-+Using+External+Time+Windows

This one is quite useful: it slides the window by an external timestamp, since most of the time aggregation should be based on collection time rather than arrival time.

The limitation is that the externalTime attribute must be non-decreasing, and in real scenarios strict ordering sometimes cannot be guaranteed.
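Since externalTime assumes the timestamp attribute never goes backwards, one simple workaround on the sending side is to guard inputHandler.send() and drop (or divert) anything out of order. This is just my own sketch of such a guard, not something Siddhi provides; the class and the drop policy are purely illustrative:

import org.wso2.siddhi.core.stream.input.InputHandler;

public class OrderedSender {
    private long lastSeen = Long.MIN_VALUE;   // highest external timestamp forwarded so far

    // Forwards the event only if its external timestamp does not move backwards;
    // older events are dropped here (they could also be buffered and re-sorted upstream).
    public void send(InputHandler inputHandler, String symbol, float price, long time)
            throws InterruptedException {
        if (time >= lastSeen) {
            inputHandler.send(new Object[]{symbol, price, time});
            lastSeen = time;
        } else {
            System.out.println("dropped out-of-order event: " + symbol + ", " + time);
        }
    }
}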

An example:

"define stream cseEventStream (symbol string, price float, time long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.externalTime(time, 3 sec) " +
"select symbol, price, time, sum(price) as ap, count(price) as cp " +
"group by symbol " +
"insert expired events into outputStream;";

The sender code:

int i = 0;
long time = 1447921187000L;
while (i < 10) {
    float p = i*10;
    inputHandler.send(new Object[]{"WSO2", p, time});
    System.out.println("\"WSO2\", " + p + ", " + time);
    inputHandler.send(new Object[] {"IBM", p, time});
    System.out.println("\"IBM\", " + p + ", " + time);
    Thread.sleep(1000);
    i++;
    time = time + 1000;
}

The goal is to run a sliding window over the external time attribute. The result:

"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
"WSO2", 30.0, 1447921190000
"IBM", 30.0, 1447921190000
receive events: 2
Event{timestamp=1447921190000, data=[WSO2, 0.0, 1447921187000, 30.0, 2], isExpired=false}
Event{timestamp=1447921190000, data=[IBM, 0.0, 1447921187000, 30.0, 2], isExpired=false}
"WSO2", 40.0, 1447921191000
"IBM", 40.0, 1447921191000
receive events: 2
Event{timestamp=1447921191000, data=[WSO2, 10.0, 1447921188000, 50.0, 2], isExpired=false}
Event{timestamp=1447921191000, data=[IBM, 10.0, 1447921188000, 50.0, 2], isExpired=false}

You can see that, going by the supplied time attribute, receiving "WSO2", 30.0, 1447921190000 triggers the 3-second expiry;
apart from that it behaves just like an ordinary sliding window.

window.cron

https://docs.wso2.com/display/CEP400/Sample+0115+-+Quartz+scheduler+based+alerts

A scheduled job, essentially; you could achieve the same with timeBatch, but cron is more convenient.

Example:

"define stream cseEventStream (symbol string, price float, time long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.cron('*/4 * * * * ?') " +
"select symbol, time, sum(price) as ap, count(price) as cp " +
"group by symbol " +
"insert into outputStream;";

The key is understanding the cron syntax; see http://www.cnblogs.com/wangyuyu/p/4230742.html

Siddhi's cron expression has an extra leading seconds field, so the first field is seconds; */4 means fire every 4 seconds.
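For reference, the fields of the expression used above line up as follows (this is standard Quartz cron, nothing Siddhi-specific):

second   minute   hour   day-of-month   month   day-of-week
  */4       *       *          *           *          ?

So */4 in the seconds field means: fire every 4 seconds.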

The result is below; it does indeed fire every 4 seconds:

"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
"WSO2", 30.0, 1447921190000
"IBM", 30.0, 1447921190000
"WSO2", 40.0, 1447921191000
"IBM", 40.0, 1447921191000
receive events: 2
Event{timestamp=1448006719652, data=[WSO2, 1447921191000, 100.0, 4], isExpired=false}
Event{timestamp=1448006719652, data=[IBM, 1447921191000, 100.0, 4], isExpired=false}
"WSO2", 50.0, 1447921192000
"IBM", 50.0, 1447921192000
"WSO2", 60.0, 1447921193000
"IBM", 60.0, 1447921193000
"WSO2", 70.0, 1447921194000
"IBM", 70.0, 1447921194000
"WSO2", 80.0, 1447921195000
"IBM", 80.0, 1447921195000
receive events: 2
Event{timestamp=1448006723653, data=[WSO2, 1447921195000, 260.0, 4], isExpired=false}
Event{timestamp=1448006723653, data=[IBM, 1447921195000, 260.0, 4], isExpired=false}

 

window.unique, window.firstUnique

They do what their names suggest. Straight to an example:

"define stream cseEventStream (symbol string, price float, time long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.unique(symbol) " +
"select symbol, price, time " +
"insert into outputStream;";

The result looks just like the stream passing straight through,
because every update to a symbol triggers an output event:

"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
receive events: 2
Event{timestamp=1448009613618, data=[WSO2, 0.0, 1447921187000], isExpired=false}
Event{timestamp=1448009613620, data=[IBM, 0.0, 1447921187000], isExpired=false}
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
receive events: 1
Event{timestamp=1448009614633, data=[WSO2, 10.0, 1447921188000], isExpired=false}
receive events: 1
Event{timestamp=1448009614633, data=[IBM, 10.0, 1447921188000], isExpired=false}
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
receive events: 2
Event{timestamp=1448009615650, data=[WSO2, 20.0, 1447921189000], isExpired=false}
Event{timestamp=1448009615650, data=[IBM, 20.0, 1447921189000], isExpired=false}
"WSO2", 30.0, 1447921190000
receive events: 1
Event{timestamp=1448009616650, data=[WSO2, 30.0, 1447921190000], isExpired=false}
"IBM", 30.0, 1447921190000
receive events: 1
Event{timestamp=1448009616650, data=[IBM, 30.0, 1447921190000], isExpired=false}

Now firstUnique:

"define stream cseEventStream (symbol string, price float, time long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.firstUnique(symbol) " +
"select symbol, price, time " +
"insert into outputStream;";

From the result you can see that output is only triggered the first time a symbol appears:

"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
receive events: 1
Event{timestamp=1448008769827, data=[WSO2, 0.0, 1447921187000], isExpired=false}
receive events: 1
Event{timestamp=1448008769831, data=[IBM, 0.0, 1447921187000], isExpired=false}
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
"WSO2", 30.0, 1447921190000
"IBM", 30.0, 1447921190000
"WSO2", 40.0, 1447921191000
"IBM", 40.0, 1447921191000
"WSO2", 50.0, 1447921192000
"IBM", 50.0, 1447921192000
"WSO2", 60.0, 1447921193000
"IBM", 60.0, 1447921193000
"WSO2", 70.0, 1447921194000
"IBM", 70.0, 1447921194000
"WSO2", 80.0, 1447921195000
"IBM", 80.0, 1447921195000
"WSO2", 90.0, 1447921196000
"IBM", 90.0, 1447921196000

 

unique is often used together with a join, for example:

from SymbolStream#window.length(1) unidirectional join StockExchangeStream#window.unique(symbol)
select StockExchangeStream.symbol as symbol, StockExchangeStream.price as lastTradedPrice
insert into StockQuote

 

Output Rate Limiting

The reason for introducing this here is that it pairs very well with unique.

Basic syntax: output ({<output-type>} every (<time interval>|<event interval> events) | snapshot every <time interval>)

Here <output-type> is one of first, last, and all; the default is all.

For an ordinary window, firing on every single event can be too frequent; often I just want output once per fixed number of events or per time interval.
This fits unique especially well: with unique you usually only care about the latest state per key, so firing on every event is pointless and periodic output is enough. A few representative forms are spelled out right below.
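Spelling the syntax out with a few arbitrary example intervals of my own (purely illustrative, not from a test run):

output every 5 events          (output type defaults to all: emit everything, once per 5 arriving events)
output first every 10 events   (emit only the first output event of each 10-event interval)
output last every 5 sec        (emit only the last output event of each 5-second interval)
output snapshot every 2 sec    (periodically emit all the current events that have arrived so far)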

Using the earlier example again:

"define stream cseEventStream (symbol string, price float, time long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.unique(symbol) " +
"select symbol, price, time " +
"group by symbol " +
"output last every 5 events " +
"insert into outputStream;";

The result: since we added group by symbol, each output contains two events, one for WSO2 and one for IBM;
but the event count is still evaluated over the stream as a whole, not as 5 WSO2 events or 5 IBM events.

"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
receive events: 2
Event{timestamp=1448010405404, data=[WSO2, 20.0, 1447921189000], isExpired=false}
Event{timestamp=1448010404405, data=[IBM, 10.0, 1447921188000], isExpired=false}
"WSO2", 30.0, 1447921190000
"IBM", 30.0, 1447921190000
"WSO2", 40.0, 1447921191000
"IBM", 40.0, 1447921191000
receive events: 2
Event{timestamp=1448010407404, data=[IBM, 40.0, 1447921191000], isExpired=false}
Event{timestamp=1448010407404, data=[WSO2, 40.0, 1447921191000], isExpired=false}

Using a time interval works the same way:

"define stream cseEventStream (symbol string, price float, time long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.unique(symbol) " +
"select symbol, price, time " +
"group by symbol " +
"output last every 5 sec " +
"insert into outputStream;";

The result:

"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
"WSO2", 30.0, 1447921190000
"IBM", 30.0, 1447921190000
"WSO2", 40.0, 1447921191000
"IBM", 40.0, 1447921191000
receive events: 2
Event{timestamp=1448010645533, data=[WSO2, 40.0, 1447921191000], isExpired=false}
Event{timestamp=1448010645533, data=[IBM, 40.0, 1447921191000], isExpired=false}
"WSO2", 50.0, 1447921192000
"IBM", 50.0, 1447921192000
"WSO2", 60.0, 1447921193000
"IBM", 60.0, 1447921193000
"WSO2", 70.0, 1447921194000
"IBM", 70.0, 1447921194000
"WSO2", 80.0, 1447921195000
"IBM", 80.0, 1447921195000
"WSO2", 90.0, 1447921196000
"IBM", 90.0, 1447921196000
receive events: 2
Event{timestamp=1448010650533, data=[WSO2, 90.0, 1447921196000], isExpired=false}
Event{timestamp=1448010650533, data=[IBM, 90.0, 1447921196000], isExpired=false}

 

The snapshot option emits all the current events that have arrived so far. It is rarely used directly like this; I can't think of an obvious scenario for it.

Example:

"define stream cseEventStream (symbol string, price float, time long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.unique(symbol) " +
"select symbol, price, time " +
"group by symbol " +
"output snapshot every 2 sec " +
"insert into outputStream;";

The result:

"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
receive events: 4
Event{timestamp=1448011434403, data=[WSO2, 0.0, 1447921187000], isExpired=false}
Event{timestamp=1448011434405, data=[IBM, 0.0, 1447921187000], isExpired=false}
Event{timestamp=1448011435405, data=[WSO2, 10.0, 1447921188000], isExpired=false}
Event{timestamp=1448011435405, data=[IBM, 10.0, 1447921188000], isExpired=false}
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
"WSO2", 30.0, 1447921190000
"IBM", 30.0, 1447921190000
receive events: 8
Event{timestamp=1448011434403, data=[WSO2, 0.0, 1447921187000], isExpired=false}
Event{timestamp=1448011434405, data=[IBM, 0.0, 1447921187000], isExpired=false}
Event{timestamp=1448011435405, data=[WSO2, 10.0, 1447921188000], isExpired=false}
Event{timestamp=1448011435405, data=[IBM, 10.0, 1447921188000], isExpired=false}
Event{timestamp=1448011436405, data=[WSO2, 20.0, 1447921189000], isExpired=false}
Event{timestamp=1448011436405, data=[IBM, 20.0, 1447921189000], isExpired=false}
Event{timestamp=1448011437405, data=[WSO2, 30.0, 1447921190000], isExpired=false}
Event{timestamp=1448011437405, data=[IBM, 30.0, 1447921190000], isExpired=false}

 

window.sort

Sorts the events held in the window:

<event> sort(<int> windowLength, <string> attribute, <string> order, .. , <string> attributeN, <string> orderN)

order is "asc" or "desc"; the default is asc.

Example:

"define stream cseEventStream (symbol string, price float, time long);" +
"" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.sort(3, price, 'asc') " +
"select symbol, price, time " +
"group by symbol " +
"insert all events into outputStream;";

The length is 3 and the sort is ascending on price. This means that once the window would hold more than 3 events (i.e. when a 4th arrives), the window keeps the 3 smallest prices and immediately expires the event that is largest in ascending order.

The result:

"WSO2", 0.0, 1447921187000
"IBM", 0.0, 1447921187000
Events{ @timeStamp = 1448875633289, inEvents = [Event{timestamp=1448875633289, data=[WSO2, 0.0, 1447921187000], isExpired=false}], RemoveEvents = null }
Events{ @timeStamp = 1448875633290, inEvents = [Event{timestamp=1448875633290, data=[IBM, 0.0, 1447921187000], isExpired=false}], RemoveEvents = null }
"WSO2", 10.0, 1447921188000
"IBM", 10.0, 1447921188000
Events{ @timeStamp = 1448875634291, inEvents = [Event{timestamp=1448875634291, data=[WSO2, 10.0, 1447921188000], isExpired=false}], RemoveEvents = null }
Events{ @timeStamp = 1448875634291, inEvents = [Event{timestamp=1448875634291, data=[IBM, 10.0, 1447921188000], isExpired=false}], RemoveEvents = [Event{timestamp=1448875634291, data=[IBM, 10.0, 1447921188000], isExpired=true}] }
"WSO2", 20.0, 1447921189000
"IBM", 20.0, 1447921189000
Events{ @timeStamp = 1448875635292, inEvents = [Event{timestamp=1448875635292, data=[WSO2, 20.0, 1447921189000], isExpired=false}], RemoveEvents = [Event{timestamp=1448875635292, data=[WSO2, 20.0, 1447921189000], isExpired=true}] }
Events{ @timeStamp = 1448875635292, inEvents = [Event{timestamp=1448875635292, data=[IBM, 20.0, 1447921189000], isExpired=false}], RemoveEvents = [Event{timestamp=1448875635292, data=[IBM, 20.0, 1447921189000], isExpired=true}] }

You can see that once the window holds more than 3 events, the current event and the expired event received are one and the same: with ascending order, any incoming event larger than the three already kept is expired immediately.

 

window.frequent;window.lossyFrequent

<event> frequent(<int> eventCount, <string> attribute, .. , <string> attributeN), based on the Misra-Gries counting algorithm; see http://www.zhihu.com/question/23480657

For how this processor is implemented, see http://mail.wso2.org/mailarchive/dev/2015-September/055230.html

Honestly, if you are not familiar with the algorithm, this window is quite obscure.

"define stream cseEventStream (symbol string, price float, time long);" +
"@info(name = 'query1') " +
"from cseEventStream[700 > price]#window.frequent(2, symbol) " +
"select symbol, price, time " +
"insert all events into outputStream;";

What frequent means: if you listen to current events, an incoming event is emitted only when it belongs to the top-frequent set; otherwise it is dropped.
Put plainly, from current events you keep receiving the events that are among the top-frequent ones, and everything else gets discarded.

The input:

int i = 0;
long time = 1447921187000L;
String str = "attributes to attributes to to events. If no no no no attributes";
String[] strs = str.split(" ");
for (String s : strs) {
    float p = i * 10;
    inputHandler.send(new Object[]{s, p, time});
    System.out.println(s + ", " + p + ", " + time);
    Thread.sleep(1000);
    i++;
    time = time + 1000;
}

The result; let's walk through it:

attributes, 0.0, 1447921187000
Events{ @timeStamp = 1448873866506, inEvents = [Event{timestamp=1448873866506, data=[attributes, 0.0, 1447921187000], isExpired=false}], RemoveEvents = null }
to, 10.0, 1447921188000
Events{ @timeStamp = 1448873867509, inEvents = [Event{timestamp=1448873867509, data=[to, 10.0, 1447921188000], isExpired=false}], RemoveEvents = null }
attributes, 20.0, 1447921189000
Events{ @timeStamp = 1448873868509, inEvents = [Event{timestamp=1448873868509, data=[attributes, 20.0, 1447921189000], isExpired=false}], RemoveEvents = null }
to, 30.0, 1447921190000
Events{ @timeStamp = 1448873869509, inEvents = [Event{timestamp=1448873869509, data=[to, 30.0, 1447921190000], isExpired=false}], RemoveEvents = null }
to, 40.0, 1447921191000
Events{ @timeStamp = 1448873870509, inEvents = [Event{timestamp=1448873870509, data=[to, 40.0, 1447921191000], isExpired=false}], RemoveEvents = null }
events., 50.0, 1447921192000
If, 60.0, 1447921193000
Events{ @timeStamp = 1448873872509, inEvents = [Event{timestamp=1448873872509, data=[If, 60.0, 1447921193000], isExpired=false}], RemoveEvents = [Event{timestamp=1448873868509, data=[attributes, 20.0, 1447921189000], isExpired=true}] }
no, 70.0, 1447921194000
Events{ @timeStamp = 1448873873509, inEvents = [Event{timestamp=1448873873509, data=[no, 70.0, 1447921194000], isExpired=false}], RemoveEvents = [Event{timestamp=1448873870509, data=[to, 40.0, 1447921191000], isExpired=true}, Event{timestamp=1448873872509, data=[If, 60.0, 1447921193000], isExpired=true}] }

Everything is fine at first while we keep sending attributes and to.
It changes when events. arrives: attributes and to already occupy both slots, so an expiry pass is triggered, where the frequency of every event in the window is decremented by 1 and any event whose frequency reaches 0 is expired.
But here the frequencies of attributes and to are both still above 0, so there is nothing in the window that can be expired.
The only option is to drop the incoming events., which is why that event never shows up in the current events:
we only ever receive the top-frequent events.

When If arrives, another expiry pass is triggered and every frequency in the window is decremented again.
This time the frequency of attributes has reached 0, so attributes is expired and the event If is placed into the window.
That is why at this point we see If among the current events and attributes among the expired events.
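To make the walkthrough above concrete, here is a minimal sketch of the counting scheme it describes (decrement everything when the slots are full, evict whatever reaches zero, admit the newcomer only if a slot frees up). This is my own illustration of the Misra-Gries idea as observed here, not Siddhi's actual implementation:

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class FrequentCounterSketch {
    private final int capacity;                        // the "2" in window.frequent(2, symbol)
    private final Map<String, Integer> counts = new HashMap<String, Integer>();

    public FrequentCounterSketch(int capacity) {
        this.capacity = capacity;
    }

    // Returns true if the key is kept (emitted as a current event), false if it is dropped.
    public boolean offer(String key) {
        Integer count = counts.get(key);
        if (count != null) {
            counts.put(key, count + 1);                // already tracked: just bump its count
            return true;
        }
        if (counts.size() < capacity) {
            counts.put(key, 1);                        // a free slot: start tracking the key
            return true;
        }
        // All slots taken: decrement every tracked key; keys that hit zero are evicted
        // (these correspond to the expired/RemoveEvents seen in the output above).
        Iterator<Map.Entry<String, Integer>> it = counts.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> entry = it.next();
            int decremented = entry.getValue() - 1;
            if (decremented == 0) {
                it.remove();
            } else {
                entry.setValue(decremented);
            }
        }
        if (counts.size() < capacity) {                // an eviction freed a slot for the newcomer
            counts.put(key, 1);
            return true;
        }
        return false;                                  // nothing evicted: the incoming event is dropped
    }
}

Feeding the word sequence from the test input into offer() reproduces the trace: events. returns false because both counters stay above zero, If drives attributes to zero, evicts it and takes its slot, and the next word no evicts both to and If, matching the RemoveEvents in the last log line.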

 

<event> lossyFrequent(<double> supportThreshold, <double> errorBound, <string> attribute, .. , <string> attributeN), based on the Lossy Counting algorithm; see http://stackoverflow.com/questions/8033012/what-is-lossy-counting

I haven't tested this one; presumably it differs only in how expiry is decided, and is otherwise much the same.

This article is reproduced from 博客园 (cnblogs); originally published on 2015-11-24.
