Storm-源码分析-Streaming Grouping (backtype.storm.daemon.executor)

executor在发送outbounding message的时候, 需要决定发送到next component的哪些tasks 
这里就需要用到streaming grouping,

 

1. mk-grouper

除了direct grouping, 返回的是grouper function, 执行该grouper function得到target tasks list 
direct grouping返回, :direct

(defn- mk-grouper
  "Returns a function that returns a vector of which task indices to send tuple to, or just a single task index."
  [^WorkerTopologyContext context component-id stream-id ^Fields out-fields thrift-grouping ^List target-tasks]
  (let [num-tasks (count target-tasks)
        random (Random.)
        target-tasks (vec (sort target-tasks))]
    (condp = (thrift/grouping-type thrift-grouping)
      :fields                                         ;;1.1 fields-grouping, 根据某个field进行grouping
        (if (thrift/global-grouping? thrift-grouping) ;;1.2 fields为空时,代表global-grouping,所有tuple发到一个task
          (fn [task-id tuple]
            ;; It's possible for target to have multiple tasks if it reads multiple sources
            (first target-tasks))                     ;;对于global-grouping,取排过序的第一个task, taskid最小的task
          (let [group-fields (Fields. (thrift/field-grouping thrift-grouping))] ;;取出group-fields
            (mk-fields-grouper out-fields group-fields target-tasks)
            ))
      :all
        (fn [task-id tuple] target-tasks) ;;1.3 all-grouping, 比较简单, 发送到所有task, 所以返回整个target-tasks
      :shuffle
        (mk-shuffle-grouper target-tasks) ;;1.4 shuffle-grouping
      :local-or-shuffle                   ;;1.5 local优先, 如果目标tasks有local的则shuffle到local的tasks
        (let [same-tasks (set/intersection
                           (set target-tasks)
                           (set (.getThisWorkerTasks context)))]
          (if-not (empty? same-tasks)
            (mk-shuffle-grouper (vec same-tasks))
            (mk-shuffle-grouper target-tasks)))
      :none                              ;;1.6 简单的版本的random,从target-tasks随机取一个
        (fn [task-id tuple]
          (let [i (mod (.nextInt random) num-tasks)]
            (.get target-tasks i)
            ))
      :custom-object
        (let [grouping (thrift/instantiate-java-object (.get_custom_object thrift-grouping))]
          (mk-custom-grouper grouping context component-id stream-id target-tasks))
      :custom-serialized
        (let [grouping (Utils/deserialize (.get_custom_serialized thrift-grouping))]
          (mk-custom-grouper grouping context component-id stream-id target-tasks))
      :direct
        :direct
      )))

 

1.1 fields-groups

使用.select取出group-fields在tuple中对应的values list, 你可以使用多个fields来进行group 
使用tuple/list-hash-code, 对values list产生hash code 
对num-tasks取mod, 并使用task-getter取出对应的target-tasks

(defn- mk-fields-grouper [^Fields out-fields ^Fields group-fields ^List target-tasks]
  (let [num-tasks (count target-tasks)
        task-getter (fn [i] (.get target-tasks i))]
    (fn [task-id ^List values]
      (-> (.select out-fields group-fields values)
          tuple/list-hash-code
          (mod num-tasks)
          task-getter))))

Fields类, 除了存放fields的list, 还有个用于快速field读取的index 
index的生成, 很简单, 就是记录fields以及自然排序 
使用时调用select, 给出需要哪几个fields的value, 以及tuple 
从index读出fields的index值, 直接从tuple中读出对应index的value (当然生成tuple的时候, 也必须安装fields的顺序生成)

public class Fields implements Iterable<String>, Serializable {
    private List<String> _fields;
    private Map<String, Integer> _index = new HashMap<String, Integer>();

    private void index() {
        for(int i=0; i<_fields.size(); i++) {
            _index.put(_fields.get(i), i);
        }
    }

    public List<Object> select(Fields selector, List<Object> tuple) {
        List<Object> ret = new ArrayList<Object>(selector.size());
        for(String s: selector) {
            ret.add(tuple.get(_index.get(s)));
        }
        return ret;
    }
}

1.2 globle-groups

fields grouping, 但是field为空, 就代表globle grouping, 所有tuple都发送到一个task

默认选取第一个task

1.3 all-groups

发送到所有的tasks

1.4 shuffle-grouper

没有采用比较简单的直接用random取值的方式(区别于none-grouping) 
因为考虑到load balance, 所以采用下面这种伪随机的实现方式

对target-tasks, 先随机shuffle, 打乱次序 
在acquire-random-range-id, 会依次读所有的task, 这样保证, 虽然顺序是随机的, 但是每个task都会被选中一次 
当curr越界时, 清空curr, 并从新shuffle target-tasks

(defn- mk-shuffle-grouper [^List target-tasks]
  (let [choices (rotating-random-range target-tasks)]
    (fn [task-id tuple]
      (acquire-random-range-id choices))))
(defn rotating-random-range [choices]
  (let [rand (Random.)
        choices (ArrayList. choices)]
    (Collections/shuffle choices rand)
    [(MutableInt. -1) choices rand]))

(defn acquire-random-range-id [[^MutableInt curr ^List state ^Random rand]]
  (when (>= (.increment curr) (.size state))
    (.set curr 0)
    (Collections/shuffle state rand))
  (.get state (.get curr)))

1.5 local-or-shuffle

local tasks优先选取, 并采用shuffle的方式  

1.6 none-grouping

不care grouping的方式, 现在的实现就是简单的random  

1.7 customing-grouping

可以自定义CustomStreamGrouping, 关键就是定义chooseTasks逻辑, 来实现自己的tasks choose策略

(defn- mk-custom-grouper [^CustomStreamGrouping grouping ^WorkerTopologyContext context ^String component-id ^String stream-id target-tasks]
  (.prepare grouping context (GlobalStreamId. component-id stream-id) target-tasks)
  (fn [task-id ^List values]
    (.chooseTasks grouping task-id values)
    ))
public interface CustomStreamGrouping extends Serializable {
   /**
     * Tells the stream grouping at runtime the tasks in the target bolt.
     * This information should be used in chooseTasks to determine the target tasks.
     *
     * It also tells the grouping the metadata on the stream this grouping will be used on.
     */
   void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);

   /**
     * This function implements a custom stream grouping. It takes in as input
     * the number of tasks in the target bolt in prepare and returns the
     * tasks to send the tuples to.
     *
     * @param values the values to group on
     */
   List<Integer> chooseTasks(int taskId, List<Object> values);
}

:custom-object 和:custom-serialized 的不同仅仅是, thrift-grouping是否被序列化过 
没有就可以直接读出object, 否则需要反序列成object

1.8 direct-grouping

producer of the tuple decides which task of the consumer will receive this tuple. 
Direct groupings can only be declared on streams that have been declared as direct streams.

这里直接返回:direct, 因为direct-grouping, 发送到哪个tasks, 是由producer产生tuple的时候已经决定了, 所以这里不需要做任何grouping相关工作  
 

2 stream->component->grouper

outbound-components 
一个executor只会对应于一个component, 所以给出当前executor的component-id 
getTargets, 可以得出所有outbound components, [streamid, [target-componentid, grouping]]

调用outbound-groupings, 
最终返回[streamid [component grouper]]的hashmap, 并赋值给executor-data中的stream->component->grouper

task在最终发送message的时候, 就会通过stream->component->grouper来产生真正的target tasks list

(defn outbound-components
  "Returns map of stream id to component id to grouper"
  [^WorkerTopologyContext worker-context component-id]
  (->> (.getTargets worker-context component-id) ;;[streamid, [target-componentid, grouping]]
        clojurify-structure
        (map (fn [[stream-id component->grouping]]
               [stream-id
                (outbound-groupings
                  worker-context
                  component-id
                  stream-id
                  (.getComponentOutputFields worker-context component-id stream-id)
                  component->grouping)]))
         (into {})
         (HashMap.)))

 

outbound-groupings 
对每个task不为空的target component调用mk-grouper 
mk-grouper返回的是grouper fn, 所以, 最终的返回, [component, grouper]

(defn- outbound-groupings [^WorkerTopologyContext worker-context this-component-id stream-id out-fields component->grouping]
  (->> component->grouping
       (filter-key #(-> worker-context  ;;component对应的tasks不为0
                        (.getComponentTasks %)
                        count
                        pos?))
       (map (fn [[component tgrouping]]
               [component
                (mk-grouper worker-context
                            this-component-id
                            stream-id
                            out-fields
                            tgrouping
                            (.getComponentTasks worker-context component)
                            )]))
       (into {})
       (HashMap.)))

本文章摘自博客园,原文发布日期:2013-07-26
时间: 2025-01-13 12:25:24

Storm-源码分析-Streaming Grouping (backtype.storm.daemon.executor)的相关文章

Storm-源码分析-Topology Submit-Task-TopologyContext (backtype.storm.task)

1. GeneralTopologyContext 记录了Topology的基本信息, 包含StormTopology, StormConf  已经从他们推导出的, task和component, component的streams, input/output信息 public class GeneralTopologyContext implements JSONAware { private StormTopology _topology; private Map<Integer, Stri

twitter storm源码走读(三)

worker进程中线程的分类及用途 本文重点分析storm的worker进程在正常启动之后有哪些类型的线程,针对每种类型的线程,剖析其用途及消息的接收与发送流程. 概述 worker进程启动过程中最重要的两个函数是mk-worker和worker-data,代码就不一一列出了.worker顺利启动之后会拥有如下图所示的各类线程.   接收和发送线程 worker在启动的时候会生成进程级别的消息接收和消息发送线程,它们视具体配置而定,可以是基于zmq,也可以基于netty,这个没有太多好说的.so

Storm源码结构 (来源Storm Github Wiki)

写在前面 本文译自Storm Github Wiki: Structure of the codebase,有助于深入了解Storm的设计和源码学习.本人也是参照这个进行学习的,觉得在理解Storm设计的过程中起到了重要作用,所以也帖一份放在自己博客里.以下的模块分析里没有包括Storm 0.9.0增加的Netty模块,对应的代码包在Storm Github下的storm-netty文件夹内,内容比较简单,关于这块的release note可以参考Storm 0.9.0 Released Net

Apache Storm源码阅读笔记&amp;OLAP在大数据时代的挑战

 <一>Apache Storm源码阅读笔记 楔子 自从建了Spark交流的QQ群之后,热情加入的同学不少,大家不仅对Spark很热衷对于Storm也是充满好奇.大家都提到一个问题就是有关storm内部实现机理的资料比较少,理解起来非常费劲. 尽管自己也陆续对storm的源码走读发表了一些博文,当时写的时候比较匆忙,有时候衔接的不是太好,此番做了一些整理,主要是针对TridentTopology部分,修改过的内容采用pdf格式发布,方便打印. 文章中有些内容的理解得益于徐明明和fxjwind两

深入理解Spark:核心思想与源码分析

大数据技术丛书 深入理解Spark:核心思想与源码分析 耿嘉安 著 图书在版编目(CIP)数据 深入理解Spark:核心思想与源码分析/耿嘉安著. -北京:机械工业出版社,2015.12 (大数据技术丛书) ISBN 978-7-111-52234-8 I. 深- II.耿- III.数据处理软件 IV. TP274 中国版本图书馆CIP数据核字(2015)第280808号 深入理解Spark:核心思想与源码分析 出版发行:机械工业出版社(北京市西城区百万庄大街22号 邮政编码:100037)

Hadoop2源码分析-MapReduce篇

1.概述 前面我们已经对Hadoop有了一个初步认识,接下来我们开始学习Hadoop的一些核心的功能,其中包含mapreduce,fs,hdfs,ipc,io,yarn,今天为大家分享的是mapreduce部分,其内容目录如下所示: MapReduce V1 MapReduce V2 MR V1和MR V2的区别 MR V2的重构思路 本篇文章的源码是基于hadoop-2.6.0-src.tar.gz来完成的.代码下载地址,请参考<Hadoop2源码分析-准备篇>. 2.MapReduce V

《深入理解SPARK:核心思想与源码分析》一书正式出版上市

自己牺牲了7个月的周末和下班空闲时间,通过研究Spark源码和原理,总结整理的<深入理解Spark:核心思想与源码分析>一书现在已经正式出版上市,目前亚马逊.京东.当当.天猫等网站均有销售,欢迎感兴趣的同学购买.我开始研究源码时的Spark版本是1.2.0,经过7个多月的研究和出版社近4个月的流程,Spark自身的版本迭代也很快,如今最新已经是1.6.0.目前市面上另外2本源码研究的Spark书籍的版本分别是0.9.0版本和1.2.0版本,看来这些书的作者都与我一样,遇到了这种问题.由于研究和

深入理解Spark:核心思想与源码分析. 导读

  大数据技术丛书   深入理解Spark:核心思想与源码分析 耿嘉安 著     Preface  前言 为什么写这本书 要回答这个问题,需要从我个人的经历说起.说来惭愧,我第一次接触计算机是在高三.当时跟大家一起去网吧玩CS,跟身边的同学学怎么"玩".正是通过这种"玩"的过程,让我了解到计算机并没有那么神秘,它也只是台机器,用起来似乎并不比打开电视机费劲多少.高考填志愿的时候,凭着直觉"糊里糊涂"就选择了计算机专业.等到真正学习计算机课程的时

WebWork2源码分析

web Author: zhuam   昨晚一口气看完了夏昕写的<<Webwork2_Guide>>,虽然文档资料很简洁,但仍不失为一本好的WebWork2书籍,看的出作者的经验和能力都是非常的老道,在此向作者的开源精神致敬,并在此引用夏昕的那句话So many open source projects, Why not Open your Documents?   今天下载了最新的WebWork2版本, 开始了源码分析,这份文档只能算是我的个人笔记,也没时间细细校对,且个人能力有