Storm-源码分析-EventManager (backtype.storm.event)

大体结构,

定义protocol EventManager, 其实就是定义interface

函数event-manager, 主要做2件事 
1. 启动event queue的处理线程, 不断从queue中取出event-fn并执行 
2. 返回实现EventManager的匿名record(reify部分, 实现protocol)

这里使用了reify的close over特性, reify会将用到的局部变量打包到闭包内, 包含queue, runner

(ns backtype.storm.event
  (:use [backtype.storm log util])
  (:import [backtype.storm.utils Time Utils])
  (:import [java.util.concurrent LinkedBlockingQueue TimeUnit])
  )

(defprotocol EventManager
  (add [this event-fn])
  (waiting? [this])
  (shutdown [this]))

(defn event-manager
  "Creates a thread to respond to events. Any error will cause process to halt"
  [daemon?]
  (let [added (atom 0)
        processed (atom 0)
        ^LinkedBlockingQueue queue (LinkedBlockingQueue.)
        running (atom true)
        runner (Thread.
                  (fn []
                    (try-cause
                      (while @running
                        (let [r (.take queue)]
                          (r)
                          (swap! processed inc)))
                    (catch InterruptedException t
                      (log-message "Event manager interrupted"))
                    (catch Throwable t
                      (log-error t "Error when processing event")
                      (halt-process! 20 "Error when processing an event"))
                      )))]
    (.setDaemon runner daemon?)
    (.start runner)
    (reify
      EventManager
      (add [this event-fn]
        ;; should keep track of total added and processed to know if this is finished yet
        (when-not @running
          (throw (RuntimeException. "Cannot add events to a shutdown event manager")))
        (swap! added inc)
        (.put queue event-fn)
        )
      (waiting? [this]
        (or (Time/isThreadWaiting runner)
            (= @processed @added)
            ))
      (shutdown [this]
        (reset! running false)
        (.interrupt runner)
        (.join runner)
        )
        )))

 

使用的时候很简单, 如下

let [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
(.add processes-event-manager sync-processes)

可以直接调用add或其他的function 
相当于给event-manager增加EventManager protocol, 反过来说, 给add或其他接口functions增加对event-manager record的support, 因为protocol函数的第一个参数总是类型

比较神奇的是, 闭包产生的效果, 可以在完全没有queue, runner定义或声明的情况下, 方便的操作他们

本文章摘自博客园,原文发布日期:2013-06-24

时间: 2024-11-27 14:26:07

Storm-源码分析-EventManager (backtype.storm.event)的相关文章

Storm-源码分析- Scheduler (backtype.storm.scheduler)

首先看看IScheduler接口的定义, 主要实现两个接口, prepare和schedule 对于schedule的参数注释写的非常清楚,  topologies包含所有topology的静态信息, 而cluster中包含了topology的运行态信息  根据他们就可以来判断如何assignment package backtype.storm.scheduler; import java.util.Map; public interface IScheduler { void prepare

Storm-源码分析-acker (backtype.storm.daemon.acker)

backtype.storm.daemon.acker  设计的巧妙在于, 不用分别记录和track, stream过程中所有的tuple, 而只需要track root tuple, 而所有中间过程都通过异或更新track entry acker-init, 在spout发送一个tuple时触发, 初始化这个root tuple的track entry   acker-ack, 在blot ack一个tuple的时候触发, 会对该tuple的anchors-to-ids中记录的每个(root,

Storm-源码分析- hook (backtype.storm.hooks)

task hook 在某些task事件发生时, 如果用户希望执行一些额外的逻辑, 就需要使用hook 当前定义如下事件, emit, cleanup, spoutAck-- 用户只需要开发实现ITaskHook的类, 并将类名配置到(storm-conf TOPOLOGY-AUTO-TASK-HOOKS) 系统会在这些事件发生时, 自动调用所有注册的hook中的相应的functions   public interface ITaskHook { void prepare(Map conf, T

Storm-源码分析- spout (backtype.storm.spout)

1. ISpout接口 ISpout作为实现spout的核心interface, spout负责feeding message, 并且track这些message.  如果需要Spout track发出的message, 必须给出message-id, 这个message-id可以是任意类型, 但是如果不指定或将message-id置空, storm就不会track这个message 必须要注意的是, spout线程会在一个线程中调用ack, fail, nextTuple, 所以不用考虑互斥,

Storm-源码分析- Messaging (backtype.storm.messaging)

先定义两个接口和一个类  TaskMessage类本身比较好理解, 抽象storm的message格式  对于IContext, 注释也说了, 定义messaging plugin, 通过什么渠道去发送message, storm这里设计成可替换的  默认定义storm实现了local和ZMQ两种plugin, 当然你可以实现更多的  local应该是用于local mode, 而ZMQ用于distributed mode IContext接口主要是用于创建IConnection, 体现对soc

Storm-源码分析- bolt (backtype.storm.task)

Bolt关键的接口为execute,  Tuple的真正处理逻辑, 通过OutputCollector.emit发出新的tuples, 调用ack或fail处理的tuple /** * An IBolt represents a component that takes tuples as input and produces tuples * as output. An IBolt can do everything from filtering to joining to functions

Storm-源码分析-Stats (backtype.storm.stats)

会发现, 现在storm里面有两套metrics系统, metrics framework和stats framework 并且在所有地方都是同时注册两套, 貌似准备用metrics来替代stats, 但当前版本UI仍然使用stats   这个模块统计的数据怎么被使用, 1. 在worker中, 会定期调用do-executor-heartbeats去往zk同步hb  可以看到, stats也会作为hb的一部分被同步到zk上 (defnk do-executor-heartbeats [work

Storm-源码分析- timer (backtype.storm.timer)

mk-timer timer是基于PriorityQueue实现的(和PriorityBlockingQueue区别, 在于没有阻塞机制, 不是线程安全的), 优先级队列是堆数据结构的典型应用  默认情况下, 按照自然顺序(其实就是默认comparator的定义), 最小的元素排在堆头  当然也可以自己重新实现comparator接口, 比如timer就用reify重新实现了comparator接口 整个过程其实比较简单, 开个timer-thread, 不断check PriorityQueu

Storm-源码分析-LocalState (backtype.storm.utils)

LocalState A simple, durable, atomic K/V database. *Very inefficient*, should only be used for occasional reads/writes. Every read/write hits disk. 基于map实现, 每次读写都需要从磁盘上将数据读出, 并反序列化成map, 这个过程称为snapshot. 所以说是比较简单和低效的, 只能用于读取配置或参数, 这种偶尔读取的场景. public syn