IBasicOutputCollector.java
List<Integer> emit(String streamId, List<Object> tuple);
提交一系列的tuple,返回接收到这些tuple的taskId
void emitDirect(int taskId, String streamId, List<Object> tuple);
直接向某个task提交一系列的tuple
BasicOutputCollector.java
这个类里封装了一个OutputCollector的代理和一个inputTuple
OutputCollector是在构造函数里传入的,在Bolt处理完tuple之后调用此类的emit方法时,方法内部会调用封装的OutputCollector来进行emit,
最终的emit是OutputCollector的emit
此类还提供了两个emit方法的重载,目的是在没有指定streamId的时候提供一个默认名为“default”的streamId.
此类的emit方法没有提供anchors参数,每次bolt执行完之后进行emit时会自动将输入tuples和输出tuples关联,如果不需要关联,则可不用此类。
IOutputCollector.java
List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);
提交一系列的tuple,返回接收到这些tuple的taskId,anchors参数是指接收到的tuples
void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);
直接向某个task提交一系列tuple,同样的也会附带上输入的tuple
void ack(Tuple input);
ack某个tuple
void fail(Tuple input);
fail某个tuple
OutputCollector.java
封装了一个IOutputCollector的代理,该代理在构造函数时传递进来被初始化。
提供了多个emit方法的重载,基本上包括有(单个anchor,无anchor,无指定streamId,只有输出的tuple)这些
emitDirect重载的方式也基本上一样,都是为了使用方便来做的。
当然,最终的emit,ack和fail都是通过代理来实现的。
CoordinatedOutputCollector.java
这个类比较奇葩,是定义在CoordinatedBolt的内部类,只有CoordinatedBolt这个类使用。
封装了一个IOutputCollector代理,该代理在构造函数时被初始化。
此类没有重载emit和emitDirect方法,但是在emit和emitDirect方法内部会调用一个名为updateTaskCounts的方法
private void updateTaskCounts(Object id, List<Integer> tasks) {
synchronized(_tracked) {
TrackingInfo track = _tracked.get(id);
if (track != null) {
Map<Integer, Integer> taskEmittedTuples = track.taskEmittedTuples;
for(Integer task: tasks) {
int newCount = get(taskEmittedTuples, task, 0) + 1;
taskEmittedTuples.put(task, newCount);
}
}
}
}
这个方法主要是更新目标task和向其发送的tuple数量关系,其关系维护在_tracked变量里,关系链为
tuple_id —> task_id —> num
public void ack(Tuple tuple) {
Object id = tuple.getValue(0);
synchronized(_tracked) {
TrackingInfo track = _tracked.get(id);
if (track != null)
track.receivedTuples++;
}
boolean failed = checkFinishId(tuple, TupleType.REGULAR);
if(failed) {
_delegate.fail(tuple);
} else {
_delegate.ack(tuple);
}
}
将收到的tupleID对应的跟踪信息中receivedTuples(已接收数量)+1 ,然后检查是否已经处理完该tupleID对应的任务,如果检查失败就fail回上一个bolt
TupleType.REGULAR是为了保证不是传递ID的也不是传递数量的流。
public void fail(Tuple tuple) {
Object id = tuple.getValue(0);
synchronized(_tracked) {
TrackingInfo track = _tracked.get(id);
if (track != null)
track.failed = true;
}
checkFinishId(tuple, TupleType.REGULAR);
_delegate.fail(tuple);
}
设置跟踪信息failed,然后checkFinishId方法中会fail所有应该ack的tuples,然后删除这个tupleID对应的跟踪信息