3.6 Trident状态
我们现在已经给每个aggregator的分组数据进行了计数,现在想将信息进行持久化存储,以便进一步分析。在Trident中,持久化操作从状态管理开始。Trident对状态有底层的操作原语,但不同于Storm API,它不关心要哪些数据会作为状态存储或者如何存储这些状态。Trident在高层提供了下述的状态接口:
上面提到了,Trident将tuple分组成一批批数据。每批数据都有自己的事务标识符。在前面的接口中,Trident告诉State对象什么时候开始提交状态,什么时候提交状态应该结束。
和function类似,Stream对象也有方法向topology引入基于状态的操作。更具体说,Trident有两种数据流:Stream和GroupedStream。一个GroupedStream是GroupBy操作的结果。在我们的topology中,我们根据HourAssignment function生成的key对tuple进行分组。
在Steam对象中,下列方法允许topology读和写状态信息:
stateQuery()方法从state生成了一个输入流,不同参数的几个partitionPersist()方法允许topology从数据流中的tuple更新状态信息。partitionPersist()方法的操作对象是每个数据分片。
在Stream对象的方法外,GroupedStream对象允许topology对一批tuple进行聚合统计,并且将收集到的信息持久化在state中。下列代码是GroupedSteam类中和状态相关的方法:
和Steam对象类似,stateQuery()方法从State生成一个输入数据流。不同参数的几个persistAggregate()方法允许topology从数据流中的tuple更新状态信息。注意GroupedStream方法有一个Aggregator参数,它在信息写入State对象之前执行。
现在考虑将这些function应用到我们的例子中来。在我们的系统中,需要将事件发生的城市、疾病代码、每小时内产生疾病统计量进行持久存储。这样可以生成报表如表3-2所示。
为了实现这个功能,我们需要将聚集操作中生成的统计量进行持久化存储。我们可以使用groupBy函数返回的GroupedStream接口(如前面所示)调用persistAggregate方法。下面代码是示例topology中具体的调用方式:
要了解持久化存储,我们首先来看这个方法的第一个参数。Trident使用一个工厂类来生成State的实例。OutbreakTrendFactory是我们的topology提供给Storm的工厂类。OutbreakTrendFactory代码如下:
工厂类返回一个State对象,Storm用它来持久化存储信息。在Storm中,有三种类型的状态。每个类型的描述如表3-3所示。
在分布式环境下,数据可能被重放,为了支持计数和状态更新,Trident将状态更新操作进行序列化,使用不同的状态更新模式对重放和错误数据进行容错。接下来会介绍这些模式。
3.6.1 重复事务型状态
在重复事务型状态中,最后一批提交的数据的标识符存在数据中。当且仅当一批数据标识符的序号大于当前标识符时,才进行更新操作。如果小于或者等于当前标识符,将会忽略更新操作。
为了演示这个实现方法,考虑如表3-4所示的数据批次的序列,这些记录对我们例子中的数据按照key进行聚合计数。
这些批次数据按照下列将顺序处理完成:
1 à 2 à 3 à 3 (重放)
处理结果将按照表3-5中的状态变更操作,中间的一列数据用来存储数据标识符,记录最近一次合并进状态的数据批次编号。