Storm-源码分析-acker (backtype.storm.daemon.acker)

backtype.storm.daemon.acker 
设计的巧妙在于, 不用分别记录和track, stream过程中所有的tuple, 而只需要track root tuple, 而所有中间过程都通过异或更新track entry

acker-init, 在spout发送一个tuple时触发, 初始化这个root tuple的track entry  
acker-ack, 在blot ack一个tuple的时候触发, 会对该tuple的anchors-to-ids中记录的每个(root, edge)进行ack, 并出于优化还会附带登记新的edge(对acker透明, 在发送前已经完成) 
acker-fail, 任一个过程中的tuple fail, 都会导致这个root tuple失败

 

(defn mk-acker-bolt []
  (let [output-collector (MutableObject.)
        pending (MutableObject.)]
    (reify IBolt
      (^void prepare [this ^Map storm-conf ^TopologyContext context ^OutputCollector collector]
               (.setObject output-collector collector)
               (.setObject pending (RotatingMap. 2)) ;;用RotatingMap来缓存每个tuple的track信息
               )
      (^void execute [this ^Tuple tuple]
             (let [^RotatingMap pending (.getObject pending)
                   stream-id (.getSourceStreamId tuple)]  ;;从ack tuple中取出streamid
               (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID) ;;收到system_tick_stream, rotate pending, spout的pending和acker的pending超期时间是一样的, 都取决于system-tick
                 (.rotate pending)
                 (let [id (.getValue tuple 0) ;;else,其他的stream,取出tuple id
                       ^OutputCollector output-collector (.getObject output-collector)
                       curr (.get pending id) ;;取出相应tuple的track entry
                       curr (condp = stream-id
                                ACKER-INIT-STREAM-ID (-> curr  ;;初始化tuple的track entry
                                                         (update-ack (.getValue tuple 1)) ;;更新entry中的track value
                                                         (assoc :spout-task (.getValue tuple 2))) ;;记录该tuple和spout-task的关系, 这样在ack或fail的时候才知道通知谁
                                ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1));;ack, 用val和原来的entry value做异或
                                ACKER-FAIL-STREAM-ID (assoc curr :failed true))] ;;fail, 直接把entry的:failed设true
                   (.put pending id curr)
                   (when (and curr (:spout-task curr))
                     (cond (= 0 (:val curr)) ;;val为0, 表示该tuple的所有edge都被成功ack
                           (do
                             (.remove pending id) ;;从pending中删除track entry, 并向相应的spout-task发送ack消息
                             (acker-emit-direct output-collector
                                                (:spout-task curr)
                                                ACKER-ACK-STREAM-ID
                                                [id]
                                                ))
                           (:failed curr) ;;:failed为true, 表示该tuple失败
                           (do
                             (.remove pending id) ;;从pending中删除track entry, 并向相应的spout-task发送fail消息 
                             (acker-emit-direct output-collector
                                                (:spout-task curr)
                                                ACKER-FAIL-STREAM-ID
                                                [id]
                                                ))
                           ))
                   (.ack output-collector tuple) ;;acker bolt也是bolt, 所以最后完成对该ack tuple的ack
                   ))))
      (^void cleanup [this]
        )
      )))

 

(defn- update-ack [curr-entry val]
  (let [old (get curr-entry :val 0)] ;;取出entry中的value值,默认设为0
    (assoc curr-entry :val (bit-xor old val)) ;;将old和新val异或, 赋给entry的value
    ))

本文章摘自博客园,原文发布日期:2013-08-06
时间: 2024-10-03 12:13:55

Storm-源码分析-acker (backtype.storm.daemon.acker)的相关文章

Storm-源码分析- Scheduler (backtype.storm.scheduler)

首先看看IScheduler接口的定义, 主要实现两个接口, prepare和schedule 对于schedule的参数注释写的非常清楚,  topologies包含所有topology的静态信息, 而cluster中包含了topology的运行态信息  根据他们就可以来判断如何assignment package backtype.storm.scheduler; import java.util.Map; public interface IScheduler { void prepare

Storm-源码分析-EventManager (backtype.storm.event)

大体结构, 定义protocol EventManager, 其实就是定义interface 函数event-manager, 主要做2件事  1. 启动event queue的处理线程, 不断从queue中取出event-fn并执行  2. 返回实现EventManager的匿名record(reify部分, 实现protocol) 这里使用了reify的close over特性, reify会将用到的局部变量打包到闭包内, 包含queue, runner (ns backtype.storm

Storm-源码分析- hook (backtype.storm.hooks)

task hook 在某些task事件发生时, 如果用户希望执行一些额外的逻辑, 就需要使用hook 当前定义如下事件, emit, cleanup, spoutAck-- 用户只需要开发实现ITaskHook的类, 并将类名配置到(storm-conf TOPOLOGY-AUTO-TASK-HOOKS) 系统会在这些事件发生时, 自动调用所有注册的hook中的相应的functions   public interface ITaskHook { void prepare(Map conf, T

Storm-源码分析- spout (backtype.storm.spout)

1. ISpout接口 ISpout作为实现spout的核心interface, spout负责feeding message, 并且track这些message.  如果需要Spout track发出的message, 必须给出message-id, 这个message-id可以是任意类型, 但是如果不指定或将message-id置空, storm就不会track这个message 必须要注意的是, spout线程会在一个线程中调用ack, fail, nextTuple, 所以不用考虑互斥,

Storm-源码分析- Messaging (backtype.storm.messaging)

先定义两个接口和一个类  TaskMessage类本身比较好理解, 抽象storm的message格式  对于IContext, 注释也说了, 定义messaging plugin, 通过什么渠道去发送message, storm这里设计成可替换的  默认定义storm实现了local和ZMQ两种plugin, 当然你可以实现更多的  local应该是用于local mode, 而ZMQ用于distributed mode IContext接口主要是用于创建IConnection, 体现对soc

Storm-源码分析- bolt (backtype.storm.task)

Bolt关键的接口为execute,  Tuple的真正处理逻辑, 通过OutputCollector.emit发出新的tuples, 调用ack或fail处理的tuple /** * An IBolt represents a component that takes tuples as input and produces tuples * as output. An IBolt can do everything from filtering to joining to functions

Storm-源码分析-Stats (backtype.storm.stats)

会发现, 现在storm里面有两套metrics系统, metrics framework和stats framework 并且在所有地方都是同时注册两套, 貌似准备用metrics来替代stats, 但当前版本UI仍然使用stats   这个模块统计的数据怎么被使用, 1. 在worker中, 会定期调用do-executor-heartbeats去往zk同步hb  可以看到, stats也会作为hb的一部分被同步到zk上 (defnk do-executor-heartbeats [work

Storm-源码分析-LocalState (backtype.storm.utils)

LocalState A simple, durable, atomic K/V database. *Very inefficient*, should only be used for occasional reads/writes. Every read/write hits disk. 基于map实现, 每次读写都需要从磁盘上将数据读出, 并反序列化成map, 这个过程称为snapshot. 所以说是比较简单和低效的, 只能用于读取配置或参数, 这种偶尔读取的场景. public syn

Storm-源码分析- timer (backtype.storm.timer)

mk-timer timer是基于PriorityQueue实现的(和PriorityBlockingQueue区别, 在于没有阻塞机制, 不是线程安全的), 优先级队列是堆数据结构的典型应用  默认情况下, 按照自然顺序(其实就是默认comparator的定义), 最小的元素排在堆头  当然也可以自己重新实现comparator接口, 比如timer就用reify重新实现了comparator接口 整个过程其实比较简单, 开个timer-thread, 不断check PriorityQueu

Storm源码结构 (来源Storm Github Wiki)

写在前面 本文译自Storm Github Wiki: Structure of the codebase,有助于深入了解Storm的设计和源码学习.本人也是参照这个进行学习的,觉得在理解Storm设计的过程中起到了重要作用,所以也帖一份放在自己博客里.以下的模块分析里没有包括Storm 0.9.0增加的Netty模块,对应的代码包在Storm Github下的storm-netty文件夹内,内容比较简单,关于这块的release note可以参考Storm 0.9.0 Released Net