Storm之Collector-p1



IBasicOutputCollector.java   

List<Integer> emit(String streamId, List<Object> tuple);

提交一系列的tuple,返回接收到这些tuple的taskId

void emitDirect(int taskId, String streamId, List<Object> tuple);

直接向某个task提交一系列的tuple


BasicOutputCollector.java 

这个类里封装了一个OutputCollector的代理和一个inputTuple

OutputCollector是在构造函数里传入的,在Bolt处理完tuple之后调用此类的emit方法时,方法内部会调用封装的OutputCollector来进行emit,

最终的emit是OutputCollector的emit

此类还提供了两个emit方法的重载,目的是在没有指定streamId的时候提供一个默认名为“default”的streamId.

此类的emit方法没有提供anchors参数,每次bolt执行完之后进行emit时会自动将输入tuples和输出tuples关联,如果不需要关联,则可不用此类。

IOutputCollector.java

List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);

提交一系列的tuple,返回接收到这些tuple的taskId,anchors参数是指接收到的tuples

void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);

直接向某个task提交一系列tuple,同样的也会附带上输入的tuple

void ack(Tuple input);

ack某个tuple

void fail(Tuple input);

fail某个tuple

OutputCollector.java

封装了一个IOutputCollector的代理,该代理在构造函数时传递进来被初始化。

提供了多个emit方法的重载,基本上包括有(单个anchor,无anchor,无指定streamId,只有输出的tuple)这些

emitDirect重载的方式也基本上一样,都是为了使用方便来做的。

当然,最终的emit,ack和fail都是通过代理来实现的。

CoordinatedOutputCollector.java

这个类比较奇葩,是定义在CoordinatedBolt的内部类,只有CoordinatedBolt这个类使用。

封装了一个IOutputCollector代理,该代理在构造函数时被初始化。

此类没有重载emit和emitDirect方法,但是在emit和emitDirect方法内部会调用一个名为updateTaskCounts的方法

private void updateTaskCounts(Object id, List<Integer> tasks) {
    synchronized(_tracked) {
        TrackingInfo track = _tracked.get(id);
        if (track != null) {
            Map<Integer, Integer> taskEmittedTuples = track.taskEmittedTuples;
            for(Integer task: tasks) {
                int newCount = get(taskEmittedTuples, task, 0) + 1;
                taskEmittedTuples.put(task, newCount);
            }
        }
    }
}

这个方法主要是更新目标task和向其发送的tuple数量关系,其关系维护在_tracked变量里,关系链为 

tuple_id —> task_id —> num

public void ack(Tuple tuple) {
    Object id = tuple.getValue(0);
    synchronized(_tracked) {
        TrackingInfo track = _tracked.get(id);
        if (track != null)
            track.receivedTuples++;
    }
    boolean failed = checkFinishId(tuple, TupleType.REGULAR);
    if(failed) {
        _delegate.fail(tuple);
    } else {
        _delegate.ack(tuple);
    }
}

将收到的tupleID对应的跟踪信息中receivedTuples(已接收数量)+1 ,然后检查是否已经处理完该tupleID对应的任务,如果检查失败就fail回上一个bolt

TupleType.REGULAR是为了保证不是传递ID的也不是传递数量的流。

public void fail(Tuple tuple) {
    Object id = tuple.getValue(0);
    synchronized(_tracked) {
        TrackingInfo track = _tracked.get(id);
        if (track != null)
            track.failed = true;
    }
    checkFinishId(tuple, TupleType.REGULAR);
    _delegate.fail(tuple);
}

设置跟踪信息failed,然后checkFinishId方法中会fail所有应该ack的tuples,然后删除这个tupleID对应的跟踪信息

时间: 2025-01-01 17:17:21

Storm之Collector-p1的相关文章

Storm专题二:Storm Trident API 使用详解

一.概述      Storm Trident中的核心数据模型就是"Stream",也就是说,Storm Trident处理的是Stream,但是实际上Stream是被成批处理的,Stream被切分成一个个的Batch分布到集群中,所有应用在Stream上的函数最终会应用到每个节点的Batch中,实现并行计算,具体如下图所示:       在Trident中有五种操作类型: Apply Locally:本地操作,所有操作应用在本地节点数据上,不会产生网络传输      Repartit

Storm之Bolt-接口

IBolt:           bolt接口类,定义了常用的几个接口,IBolt的实现类在client上被创建,然后序列化到拓扑里并被提交到集群的master上,之后nimbus会启动worker进行反序列化,调用prepare进行准备完毕之后就开始处理tuples         如果是在java里定义bolts ,建议实现IRichBolt.java接口类,IRichBolt.java同时继承了IComponent.java接口,提供了更多对拓扑进行操作的方法. /** * 当集群中的wo

算法:hdu 2639 Bone Collector II (dp 01背包求第k优解)

题目大意: 有n件物品,每件物品有体积和价值两个属性, 一个小偷带着一个大小为v的背包,要 偷这些东西,问小偷能偷的第k大的价值是多少? 思路: 这题和典型的01背包求最优解不同, 是要求第k大的解,所以,最直观的想法就是在01背包的基础上再增加一维,用来保存前k大小的数,然后在 递推时,根据前一个状态的前k 大小的数推出下一个阶段的前k个数保存下来. d[i][j][k], 表示取前i个物品,用j的费用,第k大价值是多少 在递推d[i][j][1...k]时,先获取上一个状态d[i- 1][j

Storm中的可靠性

     我们知道Storm有一个很重要的特性,那就是Storm API能够保证它的一个Tuple能够被完全处理,这一点尤为重要,其实storm中的可靠性是由spout和bolt组件共同完成的,下面就从spout和bolt两个方便给大家介绍一下storm中的可靠性,最后会给出一个实现了可靠性的例子. 1.Spout的可靠性保证      在Storm中,消息处理可靠性从Spout开始的.storm为了保证数据能正确的被处理, 对于spout产生的每一个tuple,storm都能够进行跟踪,这里面

Storm - Guaranteeing message processing

https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing http://xumingming.sinaapp.com/127/twitter-storm%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%B6%88%E6%81%AF%E4%B8%8D%E4%B8%A2%E5%A4%B1/   这章讨论Storm's reliability capabilities, 如何保证从sp

Storm-源码分析- bolt (backtype.storm.task)

Bolt关键的接口为execute,  Tuple的真正处理逻辑, 通过OutputCollector.emit发出新的tuples, 调用ack或fail处理的tuple /** * An IBolt represents a component that takes tuples as input and produces tuples * as output. An IBolt can do everything from filtering to joining to functions

如何在eclipse调试storm程序

 一.介绍       storm提供了两种运行模式:本地模式和分布式模式.本地模式针对开发调试storm topologies非常有用.       Storm has two modes of operation: local mode and distributed mode. In local mode, Storm executes completely in process by simulating worker nodes with threads. Local mode is

Storm实时计算:流操作入门编程实践

Storm是一个分布式是实时计算系统,它设计了一种对流和计算的抽象,概念比较简单,实际编程开发起来相对容易.下面,简单介绍编程实践过程中需要理解的Storm中的几个概念: Topology Storm中Topology的概念类似于Hadoop中的MapReduce Job,是一个用来编排.容纳一组计算逻辑组件(Spout.Bolt)的对象(Hadoop MapReduce中一个Job包含一组Map Task.Reduce Task),这一组计算组件可以按照DAG图的方式编排起来(通过选择Stre

Storm专题一、Storm DRPC 分布式计算

本文译自:https://storm.incubator.apache.org/documentation/Distributed-RPC.html  Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU intensive的计算.DRPC的storm topology以函数的参数流作为输入,而把这些函数调用的返回值作为topology的输出流. DRPC其实不能算是storm本身的一个特性, 它是通过组合storm的原语spout,bolt, topology而成的一种模

Storm详解二、写第一个Storm应用

     在全面介绍Storm之前,我们先通过一个简单的Demo让大家整体感受一下什么是Storm. Storm运行模式: 本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解)  运行在本地机器的单一JVM上,这个模式主要用来开发.调试. 远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的,因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式. 写一个Hel