3.5 Trident聚合器
和function类似,aggregator(聚合器)允许topology组合tuple。不同的是,它会替换tuple的字段和值。有三种聚合器:CombinerAggregator、ReducerAggregator和Aggregator。
3.5.1 CombinerAggregator
CombinerAggregator用来将一个集合的tuple组合到一个单独的字段中,Combiner的签名(Signature)如下所示:
Storm对每个tuple调用init()方法,然后重复调用combine()方法直到一个分片的数据处理完成。传递给combine()方法的两个参数是局部聚合的结果,以及调用了init()返回的值。分片会在后面的部分详细介绍,分片实际上就是tuples组成的数据流在同一个机器上的一个子集。将tuple生成的值进行组合之后,Storm发送组合结果作为一个新的字段。如果分片是空的,Storm会发送zero()方法执行的返回。
3.5.2 ReducerAggregator
ReducerAggregator接口有一点区别,签名如下:
Storm调用init()方法来获取原始值。然后为分片中的每一个tuple调用reduce()方法,直到分片数据处理完成。第一个参数是局部的聚合结果。这个方法的实现需要将第二个参数tuple合并到局部聚合结果中返回。
3.5.3 Aggregator
最通用的聚合操作是Aggregator。签名如下所示:
Aggregator接口的aggregate()方法和function接口的execute()方法类似,但是多了一个value参数。这样Aggregator就可以在处理tuple的时候累积值。注意,在Aggregator接口中,aggregate()和complete()方法都有collector这个参数,通过它可以发射任意个数的tuple。在我们的topology例子中,我们利用了一个内置的Count的Aggregator。Count的实现如下面代码片段所示:
我们在示例topology中使用了分组和计数来统计在一个城市附近一个小时内发生疾病的次数。实现代码如下所示:
回顾Storm在不同机器上的数据的分片,如图3-2所示。
https://yqfile.alicdn.com/899f99dbd07c00661c7faf29135d61beb6420e4e.png
" >
groupBy()方法强制数据重新分片,将特定字段值相同的tuple分组到同一个分片中。为了做到这个,Storm必须将相似的tuple发送到相同的主机上。图3-3展示了数据被groupBy()重新分组后的分片情况。
https://yqfile.alicdn.com/3c5b95458c4e0eda57a90ddd66123171e5f7d382.png" >
分片后,agreagte函数在每个分片数据的每个分组中运行。在我们的例子里,根据城市、小时、疾病代码作为分组的关键词。然后Count aggregator在每个分组上执行,将计数发射给下游的消费者组件。