Storm-源码分析- hook (backtype.storm.hooks)

task hook

在某些task事件发生时, 如果用户希望执行一些额外的逻辑, 就需要使用hook

当前定义如下事件, emit, cleanup, spoutAck……

用户只需要开发实现ITaskHook的类, 并将类名配置到(storm-conf TOPOLOGY-AUTO-TASK-HOOKS)

系统会在这些事件发生时, 自动调用所有注册的hook中的相应的functions

 

public interface ITaskHook {
    void prepare(Map conf, TopologyContext context);
    void cleanup();
    void emit(EmitInfo info);
    void spoutAck(SpoutAckInfo info);
    void spoutFail(SpoutFailInfo info);
    void boltExecute(BoltExecuteInfo info);
    void boltAck(BoltAckInfo info);
    void boltFail(BoltFailInfo info);
}
public class EmitInfo {
    public List<Object> values;
    public String stream;
    public int taskId;
    public Collection<Integer> outTasks;

    public EmitInfo(List<Object> values, String stream, int taskId, Collection<Integer> outTasks) {
        this.values = values;
        this.stream = stream;
        this.taskId = taskId;
        this.outTasks = outTasks;
    }
}

 

1. add hook

在mk-task的时候, 会从storm-conf配置里面读出hooks的class names 
创建hook对象, 加入到TopologyContext的_hooks中

(defn mk-task [executor-data task-id]
    (doseq [klass (storm-conf TOPOLOGY-AUTO-TASK-HOOKS)]
      (.addTaskHook ^TopologyContext (:user-context task-data) (-> klass Class/forName .newInstance)))
)
public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
    private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();

    public void addTaskHook(ITaskHook hook) {
        hook.prepare(_stormConf, this);
        _hooks.add(hook);
    }

    public Collection<ITaskHook> getHooks() {
        return _hooks;
    }
}

 

2. apply hook

当发生相应的事件时, 调用事先注册的hooks

下面的例子是在emit时, 调用相应的hooks 
apply-hooks宏实现也很简单, 从topology context中取出hooks列表, 对每个hook调用emit(EmitInfo)

(apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
(defmacro apply-hooks [topology-context method-sym info-form]
  (let [hook-sym (with-meta (gensym "hook") {:tag 'backtype.storm.hooks.ITaskHook})]
    `(let [hooks# (get-context-hooks ~topology-context)]
       (when-not (hooks-empty? hooks#)
         (let [info# ~info-form]
           (fast-list-iter [~hook-sym hooks#]
             (~method-sym ~hook-sym info#)
             ))))))

本文章摘自博客园,原文发布日期:2013-07-30
时间: 2024-10-26 05:24:57

Storm-源码分析- hook (backtype.storm.hooks)的相关文章

Storm-源码分析- Scheduler (backtype.storm.scheduler)

首先看看IScheduler接口的定义, 主要实现两个接口, prepare和schedule 对于schedule的参数注释写的非常清楚,  topologies包含所有topology的静态信息, 而cluster中包含了topology的运行态信息  根据他们就可以来判断如何assignment package backtype.storm.scheduler; import java.util.Map; public interface IScheduler { void prepare

Storm-源码分析-acker (backtype.storm.daemon.acker)

backtype.storm.daemon.acker  设计的巧妙在于, 不用分别记录和track, stream过程中所有的tuple, 而只需要track root tuple, 而所有中间过程都通过异或更新track entry acker-init, 在spout发送一个tuple时触发, 初始化这个root tuple的track entry   acker-ack, 在blot ack一个tuple的时候触发, 会对该tuple的anchors-to-ids中记录的每个(root,

Storm-源码分析- spout (backtype.storm.spout)

1. ISpout接口 ISpout作为实现spout的核心interface, spout负责feeding message, 并且track这些message.  如果需要Spout track发出的message, 必须给出message-id, 这个message-id可以是任意类型, 但是如果不指定或将message-id置空, storm就不会track这个message 必须要注意的是, spout线程会在一个线程中调用ack, fail, nextTuple, 所以不用考虑互斥,

Storm-源码分析-EventManager (backtype.storm.event)

大体结构, 定义protocol EventManager, 其实就是定义interface 函数event-manager, 主要做2件事  1. 启动event queue的处理线程, 不断从queue中取出event-fn并执行  2. 返回实现EventManager的匿名record(reify部分, 实现protocol) 这里使用了reify的close over特性, reify会将用到的局部变量打包到闭包内, 包含queue, runner (ns backtype.storm

Storm-源码分析- Messaging (backtype.storm.messaging)

先定义两个接口和一个类  TaskMessage类本身比较好理解, 抽象storm的message格式  对于IContext, 注释也说了, 定义messaging plugin, 通过什么渠道去发送message, storm这里设计成可替换的  默认定义storm实现了local和ZMQ两种plugin, 当然你可以实现更多的  local应该是用于local mode, 而ZMQ用于distributed mode IContext接口主要是用于创建IConnection, 体现对soc

Storm-源码分析- bolt (backtype.storm.task)

Bolt关键的接口为execute,  Tuple的真正处理逻辑, 通过OutputCollector.emit发出新的tuples, 调用ack或fail处理的tuple /** * An IBolt represents a component that takes tuples as input and produces tuples * as output. An IBolt can do everything from filtering to joining to functions

Storm-源码分析-Stats (backtype.storm.stats)

会发现, 现在storm里面有两套metrics系统, metrics framework和stats framework 并且在所有地方都是同时注册两套, 貌似准备用metrics来替代stats, 但当前版本UI仍然使用stats   这个模块统计的数据怎么被使用, 1. 在worker中, 会定期调用do-executor-heartbeats去往zk同步hb  可以看到, stats也会作为hb的一部分被同步到zk上 (defnk do-executor-heartbeats [work

Storm-源码分析-LocalState (backtype.storm.utils)

LocalState A simple, durable, atomic K/V database. *Very inefficient*, should only be used for occasional reads/writes. Every read/write hits disk. 基于map实现, 每次读写都需要从磁盘上将数据读出, 并反序列化成map, 这个过程称为snapshot. 所以说是比较简单和低效的, 只能用于读取配置或参数, 这种偶尔读取的场景. public syn

Storm-源码分析- timer (backtype.storm.timer)

mk-timer timer是基于PriorityQueue实现的(和PriorityBlockingQueue区别, 在于没有阻塞机制, 不是线程安全的), 优先级队列是堆数据结构的典型应用  默认情况下, 按照自然顺序(其实就是默认comparator的定义), 最小的元素排在堆头  当然也可以自己重新实现comparator接口, 比如timer就用reify重新实现了comparator接口 整个过程其实比较简单, 开个timer-thread, 不断check PriorityQueu