Storm Source Code Analysis - Stats (backtype.storm.stats)

You will notice that Storm currently carries two metrics systems: the metrics framework and the stats framework.

Both are registered side by side everywhere; it appears metrics is meant to eventually replace stats, but in the current version the UI still reads from stats.

 

How is the data collected by this module used?

1. In the worker, do-executor-heartbeats is called periodically to sync the heartbeat to ZooKeeper.
As the code shows, stats are synced to ZK as part of that heartbeat.

(defnk do-executor-heartbeats [worker :executors nil]
  ;; stats is how we know what executors are assigned to this worker
  (let [stats (if-not executors
                  (into {} (map (fn [e] {e nil}) (:executors worker)))
                  (->> executors
                    (map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)}))
                    (apply merge)))
        zk-hb {:storm-id (:storm-id worker)
               :executor-stats stats
               :uptime ((:uptime worker))
               :time-secs (current-time-secs)
               }]
    ;; do the zookeeper heartbeat
    (.worker-heartbeat! (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (:port worker) zk-hb)
    ))
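The shape of the map written to ZK can be sketched as a plain Python dict (a hypothetical illustration, not Storm code; the keys mirror the Clojure keywords above, the values are placeholders):

```python
# Hypothetical sketch of the heartbeat map that do-executor-heartbeats
# writes to ZooKeeper; function name and values are illustrative.
def build_zk_heartbeat(storm_id, executor_stats, uptime_secs, now_secs):
    return {
        "storm-id": storm_id,
        # executor-id -> rendered stats (None when the executor has no stats yet)
        "executor-stats": executor_stats,
        "uptime": uptime_secs,
        "time-secs": now_secs,
    }

hb = build_zk_heartbeat("topo-1", {(1, 1): None}, 120, 1375000000)
```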

2. Anyone can now fetch this information through nimbus's thrift interface (the excerpt below keeps only the stats-related bindings):

(^TopologyInfo getTopologyInfo [this ^String storm-id]
   beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment))
   stats (:stats heartbeat))

3. The most direct consumer is the Storm UI: when rendering the topology page it calls getTopologyInfo to fetch the data.

(defn topology-page [id window include-sys?]
  (with-nimbus nimbus
    (let [summ (.getTopologyInfo ^Nimbus$Client nimbus id)]
      ...)))

 

Stats

This module is used by spouts and bolts to record sampled statistics. The concrete metrics to be tracked are as follows:

(def COMMON-FIELDS [:emitted :transferred])
(defrecord CommonStats [emitted transferred rate])

(def BOLT-FIELDS [:acked :failed :process-latencies :executed :execute-latencies])
;;acked and failed count individual tuples
(defrecord BoltExecutorStats [common acked failed process-latencies executed execute-latencies])

(def SPOUT-FIELDS [:acked :failed :complete-latencies])
;;acked and failed count tuple completion
(defrecord SpoutExecutorStats [common acked failed complete-latencies])

 

The sampling ratio is configured in storm-conf via TOPOLOGY_STATS_SAMPLE_RATE.

Why does each update add rate instead of 1?

Because the statistics are sampled: with a 10% sampling ratio, each sampled event stands for 1/(10%) = 10 real events, so 10 is added.

(defn sampling-rate [conf]
  (->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
       (/ 1)
       int))
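The arithmetic can be sketched in Python (a hypothetical re-implementation, not Storm code): with TOPOLOGY_STATS_SAMPLE_RATE set to 0.05, sampling-rate is int(1/0.05) = 20, so every sampled tuple is recorded with weight 20.

```python
# Hypothetical sketch of Storm's sampling-rate computation; the conf key
# string and function name are illustrative.
def sampling_rate(conf):
    # mirrors (->> (conf TOPOLOGY-STATS-SAMPLE-RATE) (/ 1) int)
    return int(1 / conf["topology.stats.sample.rate"])

rate = sampling_rate({"topology.stats.sample.rate": 0.05})
# each sampled event is then counted as `rate` (here 20) events
```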

 

The statistics are kept per time window; below are the default bucket and window definitions.

(def NUM-STAT-BUCKETS 20) ;; number of buckets per window
;; 10 minutes, 3 hours, 1 day ;; the three window lengths
(def STAT-BUCKETS [30 540 4320]) ;; bucket sizes: 30, 540 and 4320 seconds
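The window lengths follow directly from num-buckets × bucket-size; a quick arithmetic check in Python:

```python
NUM_STAT_BUCKETS = 20
STAT_BUCKETS = [30, 540, 4320]  # bucket sizes in seconds

# window length in seconds = number of buckets * bucket size
window_secs = [NUM_STAT_BUCKETS * s for s in STAT_BUCKETS]
# 600 s = 10 minutes, 10800 s = 3 hours, 86400 s = 1 day
```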

 

The core data structure is RollingWindowSet, which contains:
the functions needed to compute the statistics, updater and extractor; they are needed here as well because the all-time aggregate must also be maintained
a group of rolling windows, by default the 3 windows above: 10 minutes, 3 hours, 1 day
all-time, the aggregate over the entire lifetime

(defrecord RollingWindowSet [updater extractor windows all-time])
(defn rolling-window-set [updater merger extractor num-buckets & bucket-sizes]
  (RollingWindowSet. updater extractor (dofor [s bucket-sizes] (rolling-window updater merger extractor s num-buckets)) nil)
  )

 

Next, the definition of RollingWindow:
the core data, buckets: a hashmap from time-bucket to the per-stream data ({streamid data}), initialized to {}
the functions needed on the data: updater, merger, extractor
the window parameters: bucket size in seconds and number of buckets

(defrecord RollingWindow [updater merger extractor bucket-size-secs num-buckets buckets])
(defn rolling-window [updater merger extractor bucket-size-secs num-buckets]
  (RollingWindow. updater merger extractor bucket-size-secs num-buckets {}))

 

1. mk-stats

Stats are created while mk-executor-data builds the executor data:

mk-executor-stats <> (sampling-rate storm-conf)

 

;; TODO: refactor this to be part of an executor-specific map
(defmethod mk-executor-stats :spout [_ rate]
  (stats/mk-spout-stats rate))
(defmethod mk-executor-stats :bolt [_ rate]
  (stats/mk-bolt-stats rate))

The first parameter is ignored; this simply dispatches to stats/mk-spout-stats or stats/mk-bolt-stats. As you can see, a rolling-window-set (wrapped in an atom) is created for each metric to be tracked.

(defn- mk-common-stats [rate]
  (CommonStats. (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                rate
                ))

(defn mk-bolt-stats [rate]
  (BoltExecutorStats. (mk-common-stats rate)
                  (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                  (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                  (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                  (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                  (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                  ))

(defn mk-spout-stats [rate]
  (SpoutExecutorStats. (mk-common-stats rate)
                   (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                   (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                   (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                   ))

 

2. Updating the data

(defn spout-acked-tuple! [^SpoutExecutorStats stats stream latency-ms]
  (update-executor-stat! stats :acked stream (stats-rate stats))
  (update-executor-stat! stats :complete-latencies stream latency-ms)
  )
(defmacro update-executor-stat! [stats path & args]
  (let [path (collectify path)]
    `(swap! (-> ~stats ~@path) update-rolling-window-set ~@args)
    ))

Take update-executor-stat! stats :acked stream (stats-rate stats) as an example; how does it work?

It takes from SpoutExecutorStats the rolling-window-set atom that records spout acks,
then swap!s that atom with update-rolling-window-set.

How is the rolling-window-set that records acks defined?

keyed-counter-rolling-window-set predefines updater, merger and extractor:
updater: incr-val [amap key amt], adds the given amt to the value under key in amap
merger: (partial merge-with +), merges maps with +, i.e. values under the same key are summed
extractor: counter-extract, (if v v {}), returns v if present, otherwise {}
windows: the list of rolling windows
all-time: initialized to nil

(defn keyed-counter-rolling-window-set [num-buckets & bucket-sizes]
  (apply rolling-window-set incr-val (partial merge-with +) counter-extract num-buckets bucket-sizes))
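A Python analogue of the three functions (hypothetical re-implementations of incr-val, (partial merge-with +) and counter-extract, for the keyed-counter case):

```python
def incr_val(amap, key, amt):
    # add amt to the value under key; amap may be None (Clojure nil)
    amap = dict(amap or {})
    amap[key] = amap.get(key, 0) + amt
    return amap

def merger(m1, m2):
    # merge-with +: sum values that share a key
    out = dict(m1)
    for k, v in m2.items():
        out[k] = out.get(k, 0) + v
    return out

def counter_extract(v):
    # (if v v {})
    return v if v else {}

b1 = incr_val(None, "stream1", 20)   # first sampled ack on stream1, rate 20
b2 = incr_val(b1, "stream1", 20)     # second sampled ack
merged = merger(b2, {"stream1": 20, "stream2": 20})
```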

 

Now, when spout-acked-tuple! updates :acked, how does the update proceed?

First each rolling window is updated, and the updated windows are stored back into :windows.
:all-time is also updated, via (apply (:updater rws) (:all-time rws) args)
updater: incr-val [amap key amt]
args: streamid, rate
all-time records the per-stream statistics over the entire lifetime

(defn update-rolling-window-set
  ([^RollingWindowSet rws & args]
     (let [now (current-time-secs)
           new-windows (dofor [w (:windows rws)]
                         (apply update-rolling-window w now args))]
       (assoc rws :windows new-windows :all-time (apply (:updater rws) (:all-time rws) args))
       )))

Finally, how a single rolling window is updated:
from now, compute which bucket the update belongs to, time-bucket
take buckets and apply :updater to the corresponding bucket; the operation is again adding rate to the value under streamid

(defn update-rolling-window
  ([^RollingWindow rw time-secs & args]
     ;; this is 2.5x faster than using update-in...
     (let [time-bucket (curr-time-bucket time-secs (:bucket-size-secs rw))
           buckets (:buckets rw)
           curr (get buckets time-bucket)
           curr (apply (:updater rw) curr args)
           ]
       (assoc rw :buckets (assoc buckets time-bucket curr))
       )))
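Putting it together, the update path can be sketched in Python (a simplified, hypothetical re-implementation of curr-time-bucket, update-rolling-window and update-rolling-window-set for the keyed-counter case; Storm's Clojure version returns new immutable structures, while this sketch mutates in place for brevity, and bucket expiry/cleanup is omitted):

```python
import time
from dataclasses import dataclass, field

def incr_val(amap, key, amt):
    # keyed-counter updater: add amt to the value under key
    amap = dict(amap or {})
    amap[key] = amap.get(key, 0) + amt
    return amap

def curr_time_bucket(time_secs, bucket_size_secs):
    # which bucket a timestamp falls into
    return bucket_size_secs * (time_secs // bucket_size_secs)

@dataclass
class RollingWindow:
    bucket_size_secs: int
    num_buckets: int
    buckets: dict = field(default_factory=dict)  # time-bucket -> {stream: count}

def update_rolling_window(rw, time_secs, stream, amt):
    tb = curr_time_bucket(time_secs, rw.bucket_size_secs)
    rw.buckets[tb] = incr_val(rw.buckets.get(tb), stream, amt)
    return rw

@dataclass
class RollingWindowSet:
    windows: list
    all_time: dict = None

def update_rolling_window_set(rws, stream, amt, now=None):
    now = int(time.time()) if now is None else now
    for w in rws.windows:
        update_rolling_window(w, now, stream, amt)
    # all-time is updated with the same updater, outside any window
    rws.all_time = incr_val(rws.all_time, stream, amt)
    return rws

# two sampled acks on "default", rate 20, ten seconds apart
rws = RollingWindowSet([RollingWindow(s, 20) for s in (30, 540, 4320)])
update_rolling_window_set(rws, "default", 20, now=1000)
update_rolling_window_set(rws, "default", 20, now=1010)
```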

This article is excerpted from 博客园 (cnblogs); originally published 2013-07-29.


写在前面 本文译自Storm Github Wiki: Structure of the codebase,有助于深入了解Storm的设计和源码学习.本人也是参照这个进行学习的,觉得在理解Storm设计的过程中起到了重要作用,所以也帖一份放在自己博客里.以下的模块分析里没有包括Storm 0.9.0增加的Netty模块,对应的代码包在Storm Github下的storm-netty文件夹内,内容比较简单,关于这块的release note可以参考Storm 0.9.0 Released Net