Storm Source Code Analysis - Topology Submit - Supervisor

mk-supervisor

(defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor]
  (log-message "Starting Supervisor with conf " conf)
  (.prepare isupervisor conf (supervisor-isupervisor-dir conf)) ;;initialize supervisor-id and store it in local state (see the ISupervisor implementation)
  (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf))) ;;clean this machine's supervisor tmp directory
  (let [supervisor (supervisor-data conf shared-context isupervisor)
        ;;create two event-managers for running functions in the background
        [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
        sync-processes (partial sync-processes supervisor) ;;partial application of sync-processes
        ;;mk-synchronize-supervisor, the main work of mk-supervisor, see below
        synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
        ;;function that generates the supervisor heartbeat
        heartbeat-fn (fn [] (.supervisor-heartbeat!
                               (:storm-cluster-state supervisor)
                               (:supervisor-id supervisor)
                               (SupervisorInfo. (current-time-secs)
                                                (:my-hostname supervisor)
                                                (:assignment-id supervisor)
                                                (keys @(:curr-assignment supervisor))
                                                ;; used ports
                                                (.getMetadata isupervisor)
                                                (conf SUPERVISOR-SCHEDULER-META)
                                                ((:uptime supervisor)))))]
    ;;call heartbeat-fn once to publish the supervisor's heartbeat immediately,
    ;;then use schedule-recurring to invoke heartbeat-fn periodically
    (heartbeat-fn)
    ;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
    (schedule-recurring (:timer supervisor)
                        0
                        (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
                        heartbeat-fn))
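The heartbeat pattern above, send once immediately and then repeat on a fixed period, can be sketched in Python. This is only an illustration with hypothetical names; Storm's real schedule-recurring runs on its own timer thread and supports cancellation:

```python
import threading
import time

def schedule_recurring(delay_secs, period_secs, fn):
    """Run fn after delay_secs, then again every period_secs, on a daemon thread.
    (A real timer would also support cancellation; omitted here.)"""
    def loop():
        time.sleep(delay_secs)
        while True:
            fn()
            time.sleep(period_secs)
    t = threading.Thread(target=loop, daemon=True)
    t.start()
    return t
```

With delay 0 this matches the Clojure call: the first heartbeat goes out right away, then one per SUPERVISOR-HEARTBEAT-FREQUENCY-SECS.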
 

mk-synchronize-supervisor

The supervisor itself is fairly simple; it mainly handles two things:
when the assignment changes, sync the topology code from nimbus to the local machine;
when the assignment changes, check the workers' state and make sure every assigned worker is valid.

This implies two requirements:
1. Trigger when the assignment changes
    For how this repeated triggering is implemented with Zookeeper watchers, see
Storm Source Code Analysis - Use of Zookeeper in Storm.

2. Run the work in the background, because it is time-consuming
    Two event-managers are created, one to run mk-synchronize-supervisor in the background and one for sync-processes.

A peculiarity of mk-synchronize-supervisor is that it wraps its body in a named anonymous function, `this`.
It looks strange at first, but the point is to let sync-callback add this very function back to the event-manager:
each time it runs, it must re-register sync-callback with zk so that the next change can trigger it again.

 

(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
  (fn this []
    (let [conf (:conf supervisor)
          storm-cluster-state (:storm-cluster-state supervisor)
          ^ISupervisor isupervisor (:isupervisor supervisor)
          ^LocalState local-state (:local-state supervisor) ;;local cache database
          sync-callback (fn [& ignored] (.add event-manager this)) ;;callback that runs mk-synchronize-supervisor in the background
          assignments-snapshot (assignments-snapshot storm-cluster-state sync-callback) ;;read assignments and register the callback, triggered when the zk assignments change
          storm-code-map (read-storm-code-locations assignments-snapshot) ;;where to download topology code from
          downloaded-storm-ids (set (read-downloaded-storm-ids conf)) ;;which topologies have already been downloaded
          all-assignment (read-assignments  ;;which executors are assigned to this supervisor's ports
                           assignments-snapshot
                           (:assignment-id supervisor)) ;;supervisor-id
          new-assignment (->> all-assignment ;;new = all, since confirmAssigned has no real implementation and always returns true
                              (filter-key #(.confirmAssigned isupervisor %)))
          assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment) ;;set of topology ids assigned to this supervisor
          existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)] ;;read the currently saved local assignments from local-state

      ;;download code for newly assigned topologies
      (doseq [[storm-id master-code-dir] storm-code-map]
        (when (and (not (downloaded-storm-ids storm-id))
                   (assigned-storm-ids storm-id))
          (download-storm-code conf storm-id master-code-dir)))
      (.put local-state     ;;store new-assignment in the local-state database
            LS-LOCAL-ASSIGNMENTS
            new-assignment)
      (reset! (:curr-assignment supervisor) new-assignment) ;;cache new-assignment in the supervisor object

      ;;remove any downloaded code that's no longer assigned or active
      (doseq [storm-id downloaded-storm-ids]
        (when-not (assigned-storm-ids storm-id)
          (log-message "Removing code for storm id " storm-id)
          (rmr (supervisor-stormdist-root conf storm-id))
          ))
      ;;run sync-processes in the background
      (.add processes-event-manager sync-processes)
      )))
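The download/cleanup step is essentially a set difference between what is assigned and what is on disk. A minimal sketch with hypothetical names (the real code also consults storm-code-map for the nimbus code location):

```python
def sync_topology_code(assigned_ids, downloaded_ids, download, remove):
    """Download code for newly assigned topologies; remove code that is
    downloaded but no longer assigned. download/remove take a storm-id."""
    for storm_id in assigned_ids - downloaded_ids:
        download(storm_id)  # newly assigned, not yet on disk
    for storm_id in downloaded_ids - assigned_ids:
        remove(storm_id)    # on disk, no longer assigned
```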

sync-processes

sync-processes manages the workers: it handles abnormal or dead workers and launches new ones.
It first reads the workers' heartbeats from local state to judge their condition, and shuts down every worker whose state is not valid;
for every slot that is assigned but whose worker state is not valid, it launches a new worker.

(defn sync-processes [supervisor]
  (let [conf (:conf supervisor)
        ^LocalState local-state (:local-state supervisor)
        assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
        now (current-time-secs)
        allocated (read-allocated-workers supervisor assigned-executors now) ;;1. read the current state of each worker
        keepers (filter-val     ;;find the workers whose state is :valid
                 (fn [[state _]] (= state :valid))
                 allocated)
        keep-ports (set (for [[id [_ hb]] keepers] (:port hb))) ;;set of the keepers' ports
        ;;select-keys-pred (pred map) filters the map's keys with pred;
        ;;find the executors in assigned-executors whose port is not in keep-ports,
        ;;i.e. newly assigned workers, or workers that are assigned but not :valid (dead or never started);
        ;;these executors need to be reassigned to new workers
        reassign-executors (select-keys-pred (complement keep-ports) assigned-executors)
        new-worker-ids (into
                        {}
                        (for [port (keys reassign-executors)]  ;;generate a new worker-id for each port in reassign-executors
                          [port (uuid)]))
        ]
    ;; 1. to kill are those in allocated that are dead or disallowed
    ;; 2. kill the ones that should be dead
    ;;     - read pids, kill -9 and individually remove file
    ;;     - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log)
    ;; 3. of the rest, figure out what assignments aren't yet satisfied
    ;; 4. generate new worker ids, write new "approved workers" to LS
    ;; 5. create local dir for worker id
    ;; 5. launch new workers (give worker-id, port, and supervisor-id)
    ;; 6. wait for workers launch
    (doseq [[id [state heartbeat]] allocated]
      (when (not= :valid state)  ;;shut down every worker whose state is not :valid
        (shutdown-worker supervisor id)))
    (doseq [id (vals new-worker-ids)]
      (local-mkdirs (worker-pids-root conf id))) ;;create a directory for each new worker and record it in LS-APPROVED-WORKERS in local-state
    (.put local-state LS-APPROVED-WORKERS   ;;updated approved workers: the still-:valid ones + the new workers
          (merge
           (select-keys (.get local-state LS-APPROVED-WORKERS) ;;existing approved workers that are still :valid
                        (keys keepers))
           (zipmap (vals new-worker-ids) (keys new-worker-ids)) ;;new workers
           ))
    (wait-for-workers-launch ;;2. wait-for-workers-launch
     conf
     (dofor [[port assignment] reassign-executors]
       (let [id (new-worker-ids port)]
         (launch-worker supervisor
                        (:storm-id assignment)
                        port
                        id)
         id)))
    ))
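The core partition, keep the :valid workers and reassign everything else, can be sketched as follows (hypothetical names; `allocated` mirrors the `{worker-id [state hb]}` map above):

```python
import uuid

def plan_sync(assigned_executors, allocated):
    """assigned_executors: {port: assignment}; allocated: {worker_id: (state, hb)}
    where hb is a dict carrying the worker's 'port'.
    Returns (worker ids to shut down, {port: new worker id} to launch)."""
    to_kill = [wid for wid, (state, _) in allocated.items() if state != 'valid']
    keep_ports = {hb['port'] for _, (state, hb) in allocated.items()
                  if state == 'valid'}
    # ports that are assigned but not held by a valid worker get a fresh worker id
    new_worker_ids = {port: str(uuid.uuid4())
                      for port in assigned_executors if port not in keep_ports}
    return to_kill, new_worker_ids
```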

1. read-allocated-workers

(defn read-allocated-workers
  "Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)"
  [supervisor assigned-executors now]
  (let [conf (:conf supervisor)
        ^LocalState local-state (:local-state supervisor) ;;read each worker's heartbeat from local-state; each worker process keeps updating its own local heartbeat
        id->heartbeat (read-worker-heartbeats conf)
        approved-ids (set (keys (.get local-state LS-APPROVED-WORKERS)))] ;;read the approved workers from local-state
    (into
     {}
     (dofor [[id hb] id->heartbeat] ;;determine each worker's current state from its heartbeat
            (let [state (cond
                         (or (not (contains? approved-ids id))
                             (not (matches-an-assignment? hb assigned-executors)))
                           :disallowed  ;;not allowed
                         (not hb)
                           :not-started ;;no heartbeat, never started
                         (> (- now (:time-secs hb))
                            (conf SUPERVISOR-WORKER-TIMEOUT-SECS))
                           :timed-out  ;;timed out, dead
                         true
                           :valid)]
              (log-debug "Worker " id " is " state ": " (pr-str hb) " at supervisor time-secs " now)
              [id [state hb]] ;;return each worker's current state and heartbeat
              ))
     )))
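The state classification is a first-match cond over the heartbeat; a sketch with hypothetical names (timeout in seconds, `hb` a dict like the Clojure heartbeat map):

```python
def classify_worker(worker_id, hb, approved_ids, matches_assignment,
                    now_secs, timeout_secs):
    """Mirror of the cond above: the first matching clause wins."""
    if worker_id not in approved_ids or not matches_assignment(hb):
        return 'disallowed'   # not approved, or heartbeat doesn't match an assignment
    if hb is None:
        return 'not-started'  # no heartbeat ever written
    if now_secs - hb['time-secs'] > timeout_secs:
        return 'timed-out'    # heartbeat too old: treated as dead
    return 'valid'
```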

 

2. wait-for-workers-launch

launch-worker is called for each new worker id generated for reassign-executors,

and finally wait-for-workers-launch is called to wait for the workers to be launched successfully.

The logic is simple: check the heartbeat, and keep sleeping while it is absent until the timeout is reached, then log "failed to start".

(defn- wait-for-worker-launch [conf id start-time]
  (let [state (worker-state conf id)]
    (loop []
      (let [hb (.get state LS-WORKER-HEARTBEAT)]
        (when (and
               (not hb)
               (<
                (- (current-time-secs) start-time)
                (conf SUPERVISOR-WORKER-START-TIMEOUT-SECS)
                ))
          (log-message id " still hasn't started")
          (Time/sleep 500)
          (recur)
          )))
    (when-not (.get state LS-WORKER-HEARTBEAT)
      (log-message "Worker " id " failed to start")
      )))

(defn- wait-for-workers-launch [conf ids]
  (let [start-time (current-time-secs)]
    (doseq [id ids]
      (wait-for-worker-launch conf id start-time))
    ))
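The wait is a plain poll loop with one shared start time, so the launch-timeout budget is shared across all workers rather than reset per worker. A sketch with hypothetical names:

```python
import time

def wait_for_worker_launch(read_heartbeat, worker_id, start_time, timeout_secs,
                           sleep_secs=0.5):
    """Poll until the worker writes a heartbeat or the timeout elapses."""
    while (read_heartbeat(worker_id) is None
           and time.time() - start_time < timeout_secs):
        print(worker_id, "still hasn't started")
        time.sleep(sleep_secs)
    if read_heartbeat(worker_id) is None:
        print("Worker", worker_id, "failed to start")
        return False
    return True

def wait_for_workers_launch(read_heartbeat, worker_ids, timeout_secs):
    start = time.time()  # one shared start time, as in the Clojure code
    return [wait_for_worker_launch(read_heartbeat, wid, start, timeout_secs)
            for wid in worker_ids]
```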

This article is excerpted from 博客园 (cnblogs); originally published 2013-06-28.
