一、什么是Trident State
直译过来就是trident状态,这里的状态主要涉及到Trident如何实现一致性语义规则,Trident的计算结果将被如何提交,如何保存,如何更新等等。我们知道Trident的计算都是以batch为单位的,但是batch在中的tuple在处理过程中有可能会失败,失败之后bach又有可能会被重播,这就涉及到很多事务一致性问题。Trident State就是管理这些问题的一套方案,与这套方案对应的就是Trident State API。这样说可能还比较抽象,下面就用一个例子具体说明一下。
1.1 举例具体例子来说明
假设有这么一个需求,统计一个数据流中各个单词出现的数量,并把单词和其数量更新到数据库中。假设我们在数据库中只有两个字段,单词和其数量,在计数过程中,如果遇到相同的单词则就把其数量加一。但是这么做有一个问题,如果某个单词是被重播的单词,就有可能导致这个单词被多加了一遍。因此,在数据库中只保存单词和其数量两个字段是无法做到“数据只被处理一次”的语义要求的。
1.2 Trident是怎么解决这个问题的呢?
Trident定义了如下语义规则:
1. 所有的Tuple都是以batch的形式处理的
2. 每个batch都会被分配一个唯一的“transaction id”(txid),如果batch被重发,txid不变
3. 各个batch状态的更新是有序的,也就是说batch2一定会在batch3之前更新
有了这三个规则,我们就可以通过txid知道batch是否被处理过,然后就可以根据实际情况来更新状态信息了。很明显,要满足这几个语义规则,就需要spout来支持,因为把tuple封装成batch,分配txid等等都是有spout来负责的。
但是在具体应用场景中,storm应该能够提供不同的容错级别,因为某些情况下我们并不需要强一致性。为了更灵活的处理,Trident提供了三类spout,分别是:
1. Transactional spouts : 事务spout,提供了强一致性
2. Opaque Transactional spouts:不透明事务spout,提供了弱一致性
3. No-Transactional spouts:非事务spout,对一致性无法保证
注意,所有的Trident Spout都是以batch的形式发送数据,每个batch也都会分配一个唯一的txid,决定它们有不同性质的地方在于它们对各自的batch提供了什么样的保证。
1.3 Trident State的类型
我们已经知道Trident 提供了三种类型的spout来服务Trident State管理,那么对应的Trident State也有三种类型:
1. Transactional
2. Opaque Transactional
3. No-Transactional
二、各类Trident Spout详解
2.1 Transactional spouts
Transactional spouts对batch的发送提供了如下保证:
1. 相同txid的batch完全一样,如果一个batch被重播,重播的batch的txid及其所有tuple和原batch的完全一致
2. 两个batch中的tuple不会有重合
3. 每个tuple都在batch中,不会有batch漏掉某个tuple
这三个特性是“最完美”的保证,也最容易理解,Stream被分割成固定的batch,而且不会改变。Storm就提供了一个Transactional spout的实现:TransactionalTridentKafkaSpout。
我们现在再看上面1.1节提到的那个实例,我们要把单词和其数量保存在数据库中,为了保证“数据只被处理一次”,除了要保存单词和数量两个字段之外,我们再加一个字段txid。在更新数据时,我们先对比一下当前的数据的txid和数据库中数据的txid,若txid相同,说明是被重播的数据,直接跳过即可,如果不同,则把两个数值相加即可。
下面具体说明一下,假设当前处理的batch的txid=3,其中的tuples为:
[man]
[man]
[dog]
再假设数据库中保存的数据为:
man => [count=3, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]
数据库中“man”单词的txid为1,而当前batch的txid为3,说明当前batch中的“man”单词未被累加过,所以需要把当前batch中”man”的个数累加到数据库中。数据库中“dog”单词的txid为3,和当前batch的txid相同,说明已经被累计过了直接跳过。最终数据库中的结果变为:
man => [count=5, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]
总结一下整个处理过程:
if(database txid=current txid){//两次更新的txid相同
跳过;
}else{
用current value替换掉database value;
}
2.2 Opaque Transactional spouts
上面已经提到过,并不是所有情形下都需要保证强一致性。例如在TransactionalTridentKafkaSpout中(关于Kafka相关介绍,点这里),如果它的一个batch中的tuples来自一个topic的所有partitions,如果要满足Transactionnal Spout语义的话,一旦这个batch因为某些失败而被重发,重发batch中的所有tuple必须与这个batch中的完全一致,而恰好kafka集群某个节点down掉导致这个topic其中一个partition无法使用,那么就会导致这个batch无法凑齐所有tuple(无法获取失败partition上的数据),整个处理过程被挂起。而Opaque Transactional spouts就可以解决这个问题。
Opaque Transactional spouts提供了如下保证:
- 每个tuple只在一个batch中被成功处理,如果一个batch中的tuple处理失败的话,会被后面的batch继续处理
怎么理解这个特性呢,简要来说就OpaqueTransactional spout和Transactional spouts基本差不多,只是在Opaque Transactional spout中,相同txid的batch中的tuple集合可能不一样。OpaqueTridentKafkaSpout就是符合这种特性的spout的,所以它可以容忍kafka节点失败。
因为重播的batch中的tuple集合可能不一样,所以对于Opaque Transactional Spout,就不能根据txid是否一致来决定是否需要更新状态了。我们需要在数据库中保存更多的状态信息,除了单词名,数量、txid之外,我们还需要保存一个pre-value来记录前一次计算的值。我们再用上面例子具体说明一下。
假设数据库中的记录如下:
{ value = 4,
preValue = 1,
txid = 2
}
假设当前batch的count值为2,txid=3。因为当前txid和数据库中的不同,我们需要把preValue替换成value的值,累计value值,然后更新txid值为3,结果如下:
{ value = 6,
prevValue = 4,
txid = 3
}
再假设当前batch的count值为1,txid=2。这是当前txid和数据库中的相同,虽然两个txid值相同,但由于两个batch的内容已经变了,所以上次的更新可以忽略掉,需要对数据库中的value值进行重新计算,即把当前值和preValue值相加,结果如下:
{ value =3,
prevValue = 1,
txid = 2
}
总结一下整个处理过程:
if(database txid=current txid){
value=preValue+current value;//重新更新value
//preValue不变;
}else{
preValue=value;//更新preValue
value=preValue+current value;//更新value
txid=current txid;//更新txid
}
2.3 No-Transactional spouts
No-Transactional spouts对每个batch的内容不做任何保证。如果失败的batch没被重发,它有会出现“最多被处理一次”的请况,如果tuples被多个batch处理,则会发生“最少被处理一次的情况”,很难保证“数据只被处理一次”的情况。
三、Spout和State的类型总结
下面这个表格描述了“数据只被处理一次”的spout/state的类型组合:
总的来说, Opaque transactional states即有一定的容错性又能保证数据一致性,但它的代价是需要在数据库中保存更多的状态信息(txid和preValue)。Transactional states虽然需要较少的状态信息(txid),但是它需要transactional spouts的支持。non-transactional states需要在数据库中保存最少的状态信息但难以保证“数据只被处理一次”的语义。
因此,在实际应用中,spout和state类型的选择需要根据我们具体应用需求来决定,当然在容错性和增加存储代价之间也需要做个权衡。
四、State API
上面讲的看上去有点啰嗦,庆幸的是Trident State API 在内部为我们实现了所有状态管理的逻辑,我们不需要再进行诸如对比txid,在数据库中存储多个值等操作,仅需要简单调用Trident API即可,例如:
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))
.parallelismHint(6);
所有的管理Opaque transactional states状态的逻辑都在MemcachedState.opaque()方法内部实现了。另外,所有的更新操作都是以batch为单位的,这样减少了对数据库的调用次数,极大的提高了效率。下面就向大家介绍一下和Trident State 相关的API。
4.1 State接口
Trident API中最基本的State接口只有两个方法:
public interface State {
void beginCommit(Long txid);
void commit(Long txid);
}
State接口只定义了状态什么时候开始更新,什么时候结束更新,并且我们都能获得一个txid。具体这个State如何工作,如何更新State,如何查询State,Trident并没有对此作出限制,我们可以自己任意实现。
假设我们有一个Location数据库,我们要通过Trident查新和更新这个数据库,那么我们可以自己实现这样一个LocationDB State,因为我们需要查询和更新,所以我们为这个LocationDB 可以添加对Location的get和set的实现:
public class LocationDB implements State {
public void beginCommit(Long txid) {
}
public void commit(Long txid) {
}
public void setLocation(long userId, String location) {
// code to access database and set location
}
public String getLocation(long userId) {
// code to get location from database
}
}
4.2 StateFactory工厂接口
Trident提供了State Factory接口,我们实现了这个接口之后,Trident 就可以通过这个接口获得具体的Trident State实例了,下面我们就实现一个可以制造LocationDB实例的LocationDBFactory:
public class LocationDBFactory implements StateFactory {
public State makeState(Map conf, int partitionIndex, int numPartitions) {
return new LocationDB();
}
}
4.3 QueryFunction接口
这个接口是用来帮助Trident查询一个State,这个接口定义了两个方法:
public interface QueryFunction<S extends State, T> extends EachOperation {
List<T> batchRetrieve(S state, List<TridentTuple> args);
void execute(TridentTuple tuple, T result, TridentCollector collector);
}
接口的第一个方法batchRetrieve()
有两个参数,分别是要查询的State源和查询参数,因为trident都是以batch为单位处理的,所以这个查询参数是一个List<TridentTuple>
集合。关于第二个方法execute()
有三个参数,第一个代表查询参数中的某个tuple,第二个代表这个查询参数tuple对应的查询结果,第三个则是一个消息发送器。下面就看一个QuaryLocation的实例:
public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
List<String> ret = new ArrayList();
for(TridentTuple input: inputs) {
ret.add(state.getLocation(input.getLong(0)));
}
return ret;
}
public void execute(TridentTuple tuple, String location, TridentCollector collector) {
collector.emit(new Values(location));
}
}
QueryLocation
接收到Trident发送的查询参数,参数是一个batch,batch中tuple内容是userId信息,然后batchRetrieve()
方法负责从State源中获取每个userId对应的的location。最终batchRetrieve()查询的结果会被execute()
方法发送出去。
但这里有个问题,batchRetrieve()
方法中针对每个userid都做了一次查询State操作,这样处理显然效率不高,也不符合Trident所有操作都是针对batch的原则。所以,我们要对LocationDB
这个State做一下改造,提供一个bulkGetLocations()
方法来替换掉getLocation()
方法,请看改造后的LocationDB的实现:
public class LocationDB implements State {
public void beginCommit(Long txid) {
}
public void commit(Long txid) {
}
public void setLocationsBulk(List<Long> userIds, List<String> locations) {
// set locations in bulk
}
public List<String> bulkGetLocations(List<Long> userIds) {
// get locations in bulk
}
}
我们可以看到,改造的LocationDB
对Location的查询和更新都是批量操作的,这样显然可以提高处理效率。此时,我们再稍微改一下QueryFunction中
的batchRetrieve()
方法:
public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
List<Long> userIds = new ArrayList<Long>();
for(TridentTuple input: inputs) {
userIds.add(input.getLong(0));
}
return state.bulkGetLocations(userIds);
}
public void execute(TridentTuple tuple, String location, TridentCollector collector) {
collector.emit(new Values(location));
}
}
QueryLocation
在topology中可以这么使用:
TridentTopology topology = new TridentTopology();
topology.newStream("myspout", spout)
.stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))
4.4 UpdateState接口
当我们要更新一个State源时,我们需要实现一个UpdateState
接口。UpdateState接口只提供了一个方法:
public interface StateUpdater<S extends State> extends Operation {
void updateState(S state, List<TridentTuple> tuples, TridentCollector collector);
}
下面我们来具体看一下LocationUpdater
的实现:
public class LocationUpdater extends BaseStateUpdater<LocationDB> {
public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {
List<Long> ids = new ArrayList<Long>();
List<String> locations = new ArrayList<String>();
for(TridentTuple t: tuples) {
ids.add(t.getLong(0));
locations.add(t.getString(1));
}
state.setLocationsBulk(ids, locations);
}
}
对于LocationUpdater
在topology中可以这么使用:
TridentTopology topology = new TridentTopology();
TridentState locations =
topology.newStream("locations", locationsSpout)
通过调用Trident Stream的partitionPersist
方法可以更新一个State。在上面这个实例中,LocationUpdater
接收一个State和要更新的batch,最终通过调用LocationFactory
制造的LocationDB
中的setLocationsBulk()
方法把batch中的userid及其location批量更新到State中。
partitionPersist操作会返回一个TridentState对象,这个对象即是被TridentTopology更新后的LocationDB,所以,我们可以在topology中续继续对这个返回的State做查询操作。
另外一点需要注意的是,从上面StateUpdater接口可以看出,在它的updateState()
方法中还提供了一个TridentCollector
,因此在执行StateUpdate的同时仍然可以形成一个新的Stream。若要操作StateUpdater形成的Stream,可以通过调用TridentState.newValueStream()
方法实现。
五、persistentAggregate
Trident另一个update state的方法时persistentAggregate
,请看下面word count的例子:
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
5.1 MapState接口
persistentAggregate是在partitionPersist之上的另一个抽象,它会对Trident Stream进行聚合之后再把聚合结果更新到State中。在上面这个例子中,因为聚合的是一个groupedStream
,Trident要求这种情况下State需要实现MapState
接口,被grouped的字段会被做为MapSate的key,被grouped的数据计算的结果会被做为MapSate的value。MapSate接口定义如下:
public interface MapState<T> extends State {
List<T> multiGet(List<List<Object>> keys);
List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
void multiPut(List<List<Object>> keys, List<T> vals);
}
对于MapSate的实现有MemoryMapState。
5.2 Snapshottable接口
如果我们聚合的不是一个groupedStream,Trident要求我们的State实现Snapshottable
接口:
public interface Snapshottable<T> extends State {
T get();
T update(ValueUpdater updater);
void set(T o);
}
对于Snapshottable的实现有 MemcachedState。
六、Map States的实现
在Trident中实现MapState
很简单,大部分工作Trident已经替我们做了。OpaqueMap
,TransactionalMap
, 和NonTransactionalMap
类已经替我们完成了和容错相关的处理逻辑. 我们仅仅提供一个 IBackingMap
的实现类即可, IBackingMap的接口定义如下:
public interface IBackingMap<T> {
List<T> multiGet(List<List<Object>> keys);
void multiPut(List<List<Object>> keys, List<T> vals);
}
OpaqueMap’s调用的multiPut将会把value值自动封装成OpaqueValue来处理, TransactionalMap’s 将会把value封装成TransactionalValue再进行处理, 而NonTransactionalMaps 则不会对value做处理,直接传递给topology。
另外,
Trident提供的CachedMap 类会对Map中的key/value做自动的LRU缓存 。
Trident提供的SnapshottableMap类会把MapState转换成SnapShottable对象(把MapState中的所有key/value对聚合成一个固定的key)。
详细更详细的了解整个MapState的实现过程,请查看 MemcachedState 的实现,MemcachedState除了把上面介绍的相关接口整合到一起之外,还提供了对opaque transactional, transactional, non-transactional三个语义规则的支持。