twitter storm源码走读(三)

worker进程中线程的分类及用途

本文重点分析storm的worker进程在正常启动之后有哪些类型的线程,针对每种类型的线程,剖析其用途及消息的接收与发送流程。

概述

worker进程启动过程中最重要的两个函数是mk-workerworker-data,代码就不一一列出了。worker顺利启动之后会拥有如下图所示的各类线程。

 

接收和发送线程

worker在启动的时候会生成进程级别的消息接收和消息发送线程,它们视具体配置而定,可以是基于zmq,也可以基于netty,这个没有太多好说的。socket connection的建立过程可以在tuple消息传递一文中找到说明。

zk client

worker需要定期的向zk server发送心跳消息,与zk server之间的连接处理就落到zk client这个线程身上了。具体代码见函数do-heartbeat及do-executor-heartbeats。

定时器线程

worker进程需要定期的做些事情,比如发送心跳消息,刷新socket连接,这些定时器归为如下几类,每类定时器运行在各自的线程。

  1. :heartbeat-timer worker
  2. :refresh-connections-timer worker
  3. :refresh-active-timer worker
  4. :executor-heartbeat-timer worker
  5. :user-timer worker

 

上述定时器分类见于worker的shutdown函数,有时候在分析代码的时候,如果从入口看不清楚的话,不妨试试从退出的处理逻辑哪里找找答案。

SystemBolt

在topology提交的时候曾经见过函数system-topology!,这个函数会创建SystemBolt,每个worker内有且只有一个SystemBolt,可以见SystemBolt.java中注释的说明或参考github上storm对该改变的说明,https://github.com/nathanmarz/storm/pull/517

SystemBolt主要进行进程相关的统计功能,比如内存使用情况,网络包的吞吐量,具体可见SystemBolt.java。SystemBolt是不接收tuple,只有出度,没有入度。

Metrics Bolt线程

MetricsBolt主要也是处理统计工作,与systembolt不同的是,metricsbolt主要处理executor级别的,如果用户在配置文件中定义了相关的MetricsConsumer类,那么这些类会在此被执行。

与之相关的配置内容,

## Metrics Consumers
# topology.metrics.consumer.register:
#   - class: "backtype.storm.metrics.LoggingMetricsConsumer"
#     parallelism.hint: 1
#   - class: "org.mycompany.MyMetricsConsumer"
#     parallelism.hint: 1
#     argument:
#       - endpoint: "metrics-collector.mycompany.org"

 

Shared Executor

这个是在storm 0.8中引入的,其用途可在0.8的release notes中找到,创建共享线程池,具体用途没太搞清楚,:).

Metrics的执行流程

metrics所做的计量工作是在什么时候被唤醒的呢,也就是说如何一步步的触发直到MetricsConsumeBolt的execute函数被调用。

下图勾勒出与metrics相关的线程间的消息传递过程。

 

简要说明如下

  1. worker在启动的时候,会往:user-timer中注册metrics timer(见setup-metrics!函数).
  2. 一旦metrics timer超时,会发送一个stream-id为metrics-tick-stream-id的tuple到非metrics类型的bolt,如user/acker/system bolt.
  3. 接收到tuple之后,会调用metrics-tick函数发送task-data给MetricsConsumerBolt, stream-id为metrics-stream-id
  4. MetricsConsumerBolt接收到stream-id为metrics-stream-id的tuple后,会执行execute

注:在worker内部还有另一套计量api,定义于builtin-metrics.clj中,与MetricsConsumerBolt的区别在于,builtin-metrics是在处理外部进程发送过来的tuple时进行计量统计,而MetricsConsumerBolt是定时触发。

worker进程内部消息传递处理和数据结构分析

本文从外部消息在worker进程内部的转化,传递及处理过程入手,一步步分析在worker-data中的数据项存在的原因和意义。试图从代码实现的角度来回答,如果是从头开始实现worker的话,该如何来定义消息接口,如何实现各自接口上的消息处理。

Topology到Worker的映射关系

Topology由Spout,Bolt组成,其逻辑关系大体如下图所示。

无论是Spout或Bolt的处理逻辑都需要在进程或线程内执行,那么它们与进程及线程间的映射关系又是如何呢。有关这个问题,Understanding the Parallelism of a Storm Topology 一文作了很好的总结,现重复一下其要点。

  1. worker是进程,executor对应于线程,spout或bolt是一个个的task
  2. 同一个worker只会执行同一个topology相关的task
  3. 在同一个executor中可以执行多个同类型的task, 即在同一个executor中,要么全部是bolt类的task,要么全部是spout类的task
  4. 运行的时候,spout和bolt需要被包装成一个又一个task

worker,executor, task三者之间的关系可以用下图表示

小结一下,Worker=Process, Executor=Thread, Task=Spout or Bolt.

每一个executor使用的是actor pattern,high level的处理逻辑如下图所示

外部消息的接收和处理

在源码走读之四一文中总结了worker进程内的各种类型的thread,也即executor,这个等同于定义了进程内部和进程间的接口类型。那么这些接口上的消息在具体流传和处理过程中需要定义哪些数据结构,针对这些数据结构,又要做哪些必要的处理呢?

换句话说,就是为什么在worker.clj中有哪些数据和函数存在,不这样做,可以不?

先图示一下,外部消息处理的大概流程。

注:圈起来的数字表示消息转换和处理的序列。

步骤一

监听端口准备就绪,接收线程在收到外部的消息后,面临的问题就是如何确定由哪个task来处理该消息。接收到的tuple中含有task-id,根据task-id可以知道运行该task的executor,executor中有receive-message-queue即(incoming queue)来存放外部的tuple. 定义的数据结构需要反映这个转换过程task-id->executor->receive-queue-map.

那么在worker-data中哪些数据项与这个过程相关呢

  1. :port
  2. :executor-receive-queue-map
  3. :short-executor-receive-queue-map
  4. :task->short-executor
  5. :transfer-local-fn

transfer-local-fn将数据从接收线程发送到spout或bolt所在的executor线程。

步骤二

接下来数据会被传递到executor,于是又牵涉到executor的数据结构问题。executor-data由函数mk-executor-data创建,其内容与worker-data比较起来相对较少。

executor收到tuple之后,第一步需要进行反序列化,storm中使用kyro来进行序列化和反序列化,这也是为什么在executor中有该数据项的原因。

executor中与步骤2相关的数据项

  1. :type executor-type
  2. :receive-queue
  3. :deserializer (executor-data中的数据项)

步骤三:

步骤2处理结束,会产生相应的tuple发送到外部。这个过程需要多解释一下,首先tuple不是直接发送给worker的transfer-thread(负责向其它进程发送消息),而是发送给send-handler线程,每一个executor在创建的时候最起码会有两个线程被创建,一个用于运行bolt或spout的处理逻辑,另一个用以负责缓存bolt或spout产生的对外发送的tuple。

一旦snd-hander中的tuple数量达到阀值,这些被缓存的tuple会一次性发送给worker级别的transfer-thread.

executor中与步骤3相关的数据项

  1. :transfer-fn (mk-executor-transfer-fn batch-transfer->worker)
  2. :batch-transfer-queue

在步骤3中生成outgoing的tuple,tuple生成的时候需要回答两个基本问题

  1. tuple中含有哪些字段 --   该问题的解答由spout或bolt中的declareOutFields来解决
  2. 由哪个node+port来接收该tuple -- 由grouping来解决,这个时候就可以看出为什么需要task这一层的逻辑抽象了,有关grouping的详细解释,请参考fxjwind撰写的Storm-源码分析-Streaming Grouping (backtype.storm.daemon.executor)

步骤四:

处理逻辑很简单,先将数据缓存,然后在达到阀值之后,一起传送给transfer-thread.

start-batch-transfer->worker-handler

(defn start-batch-transfer->worker-handler! [worker executor-data]
  (let [worker-transfer-fn (:transfer-fn worker)
        cached-emit (MutableObject. (ArrayList.))
        storm-conf (:storm-conf executor-data)
        serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data))
        ]
    (disruptor/consume-loop*
      (:batch-transfer-queue executor-data)
      (disruptor/handler [o seq-id batch-end?]
        (let [^ArrayList alist (.getObject cached-emit)]
          (.add alist o)
          (when batch-end?
            (worker-transfer-fn serializer alist)
            (.setObject cached-emit (ArrayList.))
            )))
      :kill-fn (:report-error-and-die executor-data))))

worker-transfer-fn是worker中的transfer-fn,由mk-transfer-fn生成。

(defn mk-transfer-fn [worker]
  (let [local-tasks (-> worker :task-ids set)
        local-transfer (:transfer-local-fn worker)
        ^DisruptorQueue transfer-queue (:transfer-queue worker)]
    (fn [^KryoTupleSerializer serializer tuple-batch]
      (let [local (ArrayList.)
            remote (ArrayList.)]
        (fast-list-iter [[task tuple :as pair] tuple-batch]
          (if (local-tasks task)
            (.add local pair)
            (.add remote pair)
            ))
        (local-transfer local)
        ;; not using map because the lazy seq shows up in perf profiles
        (let [serialized-pairs (fast-list-for [[task ^TupleImpl tuple] remote] [task (.serialize serializer tuple)])]
          (disruptor/publish transfer-queue serialized-pairs)
          )))))

步骤五:

处理函数mk-transfer-tuples-handler,主要进行序列化,将序列化后的数据发送给目的地址。

(defn mk-transfer-tuples-handler [worker]
  (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
        drainer (ArrayList.)
        node+port->socket (:cached-node+port->socket worker)
        task->node+port (:cached-task->node+port worker)
        endpoint-socket-lock (:endpoint-socket-lock worker)
        ]
    (disruptor/clojure-handler
      (fn [packets _ batch-end?]
        (.addAll drainer packets)
        (when batch-end?
          (read-locked endpoint-socket-lock
            (let [node+port->socket @node+port->socket
                  task->node+port @task->node+port]
              ;; consider doing some automatic batching here (would need to not be serialized at this point to remove per-tuple overhead)
              ;; try using multipart messages ... first sort the tuples by the target node (without changing the local ordering)

              (fast-list-iter [[task ser-tuple] drainer]
                ;; TODO: consider write a batch of tuples here to every target worker
                ;; group by node+port, do multipart send
                (let [node-port (get task->node+port task)]
                  (when node-port
                    (.send ^IConnection (get node+port->socket node-port) task ser-tuple))
                    ))))
          (.clear drainer))))))

tuple发送的时候需要用到connection,但目前只知道task-id,所以在worker中需要保存task-id到node+port的映射,node+port与outgoing connections之间的映射。

worker中与步骤5相关的数据项:

  1. :cached-node+port->socket
  2. :cached-task->node+port
  3. :component->stream->fields
  4. :component->sorted-tasks
  5. :endpoint-socket-lock
  6. :transfer-queue (线程内部的消息队列)
  7. :task->component

其它的数据项

上述五个步骤并没有涵盖worker-data所有的数据项,那么其它的数据项归一归类,大体如下

timer相关,timer相关的数据项包括timer及其对应的处理句柄

  1. :heartbeat-timer
  2. :refresh-connection-timer
  3. :refresh-active-timer
  4. :executor-heartbeat-timer
  5. :user-timer

zk相关

  1. :storm-cluster-state
  2. :storm-active-atom
  3. :cluster-state

配置相关

  1. :conf
  2. :mq-context 在transport layer是使用zmq还是netty

Assignment相关

  1. :storm-id
  2. :assigment-id
  3. :worker-id
  4. :executors
  5. :task-ids
  6. :storm-conf
  7. :topology
  8. :system-topology

进程关闭相关

  1. :suicide-fn

其它的其它

  1. :uptime 运行时间,统计用
  2. :default-shared-resources 线程池
  3. :user-shared-resources 未启用

 小结

设计的时候,一定是先画出一个大概的蓝图,然后逐步的细化并加以实现。具体来说,步骤如下

  1. manifest 定义主要的功能
  2. draw skeleton 画出实现草图,定义主要的接口
  3. discussion 与团队讨论
  4. data structures 数据结构
  5. function 函数实现
  6. testing 测试
时间: 2024-09-09 05:50:50

twitter storm源码走读(三)的相关文章

twitter storm源码走读(二)

topology提交过程分析 概要 storm cluster可以想像成为一个工厂,nimbus主要负责从外部接收订单和任务分配.除了从外部接单,nimbus还要将这些外部订单转换成为内部工作分配,这个时候nimbus充当了调度室的角色.supervisor作为中层干部,职责就是生产车间的主任,他的日常工作就是时刻等待着调度到给他下达新的工作.作为车间主任,supervisor领到的活是不用自己亲力亲为去作的,他手下有着一班的普通工人.supervisor对这些工人只会喊两句话,开工,收工.注意

twitter storm源码走读(一)

nimbus启动场景分析 本文详细介绍了twitter storm中的nimbus节点的启动场景,分析nimbus是如何一步步实现定义于storm.thrift中的service,以及如何利用curator来和zookeeper server建立通讯. 对于storm client来说,nimbus是storm cluster与外部的唯一接口,是总的接口人,在这个接口上使用thrift定义的各种service.但是nimbus光接单并不干活,具体的脏活累活,这哥们都是分配到各个slots上的,让

twitter storm源码走读(四)

Trident Topology执行过程分析 TridentTopology是storm提供的高层使用接口,常见的一些SQL中的操作在tridenttopology提供的api中都有类似的影射.关于TridentTopology的使用及运行原理,当前进行详细分析的文章不多. 从TridentTopology到vanilla topology(普通的topology)由三个层次组成: 面向最终用户的概念stream, operation 利用planner将tridenttopology转换成va

twitter storm源码走读(五)

TridentTopology创建过程详解 从用户层面来看TridentTopology,有两个重要的概念一是Stream,另一个是作用于Stream上的各种Operation.在实现层面来看,无论是stream,还是后续的operation都会转变成为各个Node,这些Node之间的关系通过重要的数据结构图来维护.具体到TridentTopology,实现图的各种操作的组件是jgrapht. 说到图,两个基本的概念会闪现出来,一是结点,二是描述结点之间关系的边.要想很好的理解TridentTo

Storm源码结构 (来源Storm Github Wiki)

写在前面 本文译自Storm Github Wiki: Structure of the codebase,有助于深入了解Storm的设计和源码学习.本人也是参照这个进行学习的,觉得在理解Storm设计的过程中起到了重要作用,所以也帖一份放在自己博客里.以下的模块分析里没有包括Storm 0.9.0增加的Netty模块,对应的代码包在Storm Github下的storm-netty文件夹内,内容比较简单,关于这块的release note可以参考Storm 0.9.0 Released Net

Apache Spark源码走读(十一)浅谈mllib中线性回归的算法实现&Spark MLLib中拟牛顿法L-BFGS的源码实现

<一>浅谈mllib中线性回归的算法实现 概要 本文简要描述线性回归算法在Spark MLLib中的具体实现,涉及线性回归算法本身及线性回归并行处理的理论基础,然后对代码实现部分进行走读. 线性回归模型 机器学习算法是的主要目的是找到最能够对数据做出合理解释的模型,这个模型是假设函数,一步步的推导基本遵循这样的思路 假设函数 为了找到最好的假设函数,需要找到合理的评估标准,一般来说使用损失函数来做为评估标准 根据损失函数推出目标函数 现在问题转换成为如何找到目标函数的最优解,也就是目标函数的最

Apache Spark源码走读(八)Graphx实现剖析&amp;spark repl实现详解

<一>Graphx实现剖析 概要 图的并行化处理一直是一个非常热门的话题,这里头的重点有两个,一是如何将图的算法并行化,二是找到一个合适的并行化处理框架.Spark作为一个非常优秀的并行处理框架,将一些并行化的算法移到其上面就成了一个很自然的事情. Graphx是一些图的常用算法在Spark上的并行化实现,同时提供了丰富的API接口.本文就Graphx的代码架构及pagerank在graphx中的具体实现做一个初步的学习. Google为什么赢得了搜索引擎大战 当Google还在起步的时候,在

Apache Spark源码走读(七)Standalone部署方式分析&amp;sql的解析与执行

<一>Standalone部署方式分析 楔子 在Spark源码走读系列之2中曾经提到Spark能以Standalone的方式来运行cluster,但没有对Application的提交与具体运行流程做详细的分析,本文就这些问题做一个比较详细的分析,并且对在standalone模式下如何实现HA进行讲解. 没有HA的Standalone运行模式 先从比较简单的说起,所谓的没有ha是指master节点没有ha. 组成cluster的两大元素即Master和Worker.slave worker可以有

Apache Storm源码阅读笔记&amp;OLAP在大数据时代的挑战

 <一>Apache Storm源码阅读笔记 楔子 自从建了Spark交流的QQ群之后,热情加入的同学不少,大家不仅对Spark很热衷对于Storm也是充满好奇.大家都提到一个问题就是有关storm内部实现机理的资料比较少,理解起来非常费劲. 尽管自己也陆续对storm的源码走读发表了一些博文,当时写的时候比较匆忙,有时候衔接的不是太好,此番做了一些整理,主要是针对TridentTopology部分,修改过的内容采用pdf格式发布,方便打印. 文章中有些内容的理解得益于徐明明和fxjwind两