Storm源码浅析之topology的提交

    原文:http://www.blogjava.net/killme2008/archive/2011/11/17/364112.html
    作者:dennis (killme2008@gmail.com)
    转载请注明出处。

    最近一直在读twitter开源的这个分布式流计算框架——storm的源码,还是有必要记录下一些比较有意思的地方。我按照storm的主要概念进行组织,并且只分析我关注的东西,因此称之为浅析。       

一、介绍
    Storm的开发语言主要是Java和Clojure,其中Java定义骨架,而Clojure编写核心逻辑。源码统计结果:

     180 text files.
     177 unique files.                                          
       7 files ignored.

http://cloc.sourceforge.net v 1.55  T=1.0 s (171.0 files/s, 46869.0 lines/s)
-------------------------------------------------------------------------------
Language                     files          blank        comment           code
-------------------------------------------------------------------------------
Java                           125           5010           2414          25661
Lisp                            33            732            283           4871
Python                           7            742            433           4675
CSS                              1             12             45           1837
ruby                             2             22              0            104
Bourne Shell                     1              0              0              6
Javascript                       2              1             15              6
-------------------------------------------------------------------------------
SUM:                           171           6519           3190          37160
-------------------------------------------------------------------------------

    Java代码25000多行,而Clojure(Lisp)只有4871行,说语言不重要再次证明是扯淡。
        
二、Topology和Nimbus       
    Topology是storm的核心理念,将spout和bolt组织成一个topology,运行在storm集群里,完成实时分析和计算的任务。这里我主要想介绍下topology部署到storm集群的大概过程。提交一个topology任务到Storm集群是通过StormSubmitter.submitTopology方法提交:

StormSubmitter.submitTopology(name, conf, builder.createTopology());

    我们将topology打成jar包后,利用bin/storm这个python脚本,执行如下命令:

bin/storm jar xxxx.jar com.taobao.MyTopology args

    将jar包提交给storm集群。storm脚本会启动JVM执行Topology的main方法,执行submitTopology的过程。而submitTopology会将jar文件上传到nimbus,上传是通过socket传输。在storm这个python脚本的jar方法里可以看到:

def jar(jarfile, klass, *args):                                                                                                                               
   exec_storm_class(                                                                                                                                          
        klass,                                                                                                                                                
        jvmtype="-client",                                                                                                                                    
        extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"],                                                                                                    
        args=args,                                                                                                                                            
        prefix="export STORM_JAR=" + jarfile + ";")

     将jar文件的地址设置为环境变量STORM_JAR,这个环境变量在执行submitTopology的时候用到:

//StormSubmitter.java 
private static void submitJar(Map conf) {
        if(submittedJar==null) {
            LOG.info("Jar not uploaded to master yet. Submitting jar");
            String localJar = System.getenv("STORM_JAR");
            submittedJar = submitJar(conf, localJar);
        } else {
            LOG.info("Jar already uploaded to master. Not submitting jar.");
        }
    }

    通过环境变量找到jar包的地址,然后上传。利用环境变量传参是个小技巧。

    其次,nimbus在接收到jar文件后,存放到数据目录的inbox目录,nimbus数据目录的结构

-nimbus
     -inbox
         -stormjar-57f1d694-2865-4b3b-8a7c-99104fc0aea3.jar
         -stormjar-76b4e316-b430-4215-9e26-4f33ba4ee520.jar

     -stormdist
        -storm-id
           -stormjar.jar
           -stormconf.ser
           -stormcode.ser

     其中inbox用于存放提交的jar文件,每个jar文件都重命名为stormjar加上一个32位的UUID。而stormdist存放的是启动topology后生成的文件,每个topology都分配一个唯一的id,ID的规则是“name-计数-时间戳”。启动后的topology的jar文件名命名为storm.jar ,而它的配置经过java序列化后存放在stormconf.ser文件,而stormcode.ser是将topology本身序列化后存放的文件。这些文件在部署的时候,supervisor会从这个目录下载这些文件,然后在supervisor本地执行这些代码。
    进入重点,topology任务的分配过程(zookeeper路径说明忽略root):
1.在zookeeper上创建/taskheartbeats/{storm id} 路径,用于任务的心跳检测。storm对zookeeper的一个重要应用就是利用zk的临时节点做存活检测。task将定时刷新节点的时间戳,然后nimbus会检测这个时间戳是否超过timeout设置。
2.从topology中获取bolts,spouts设置的并行数目以及全局配置的最大并行数,然后产生task id列表,如[1 2 3 4]
3.在zookeeper上创建/tasks/{strom id}/{task id}路径,并存储task信息
4.开始分配任务(内部称为assignment), 具体步骤:
 (1)从zk上获得已有的assignment(新的toplogy当然没有了)
 (2)查找所有可用的slot,所谓slot就是可用的worker,在所有supervisor上配置的多个worker的端口。
 (3)将任务均匀地分配给可用的worker,这里有两种情况:
 (a)task数目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最终是这样分配

{1: [host1:port1] 2 : [host2:port1]
         3 : [host1:port1] 4 : [host2:port1]}

,可以看到任务平均地分配在两个worker上。
(b)如果task数目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先会将woker排序,将不同host间隔排列,保证task不会全部分配到同一个worker上,也就是将worker排列成

[host1:port1 host2:port1 host1:port2 host2:port2]

,然后分配任务为

{1: host1:port1 , 2 : host2:port2}

(4)记录启动时间
(5)判断现有的assignment是否跟重新分配的assignment相同,如果相同,不需要变更,否则更新assignment到zookeeper的/assignments/{storm id}上。
5.启动topology,所谓启动,只是将zookeeper上/storms/{storm id}对应的数据里的active设置为true。
6.nimbus会检查task的心跳,如果发现task心跳超过超时时间,那么会重新跳到第4步做re-assignment。

文章转自庄周梦蝶  ,原文发布时间2011-12-01

时间: 2025-01-29 11:50:58

Storm源码浅析之topology的提交的相关文章

Storm源码结构 (来源Storm Github Wiki)

写在前面 本文译自Storm Github Wiki: Structure of the codebase,有助于深入了解Storm的设计和源码学习.本人也是参照这个进行学习的,觉得在理解Storm设计的过程中起到了重要作用,所以也帖一份放在自己博客里.以下的模块分析里没有包括Storm 0.9.0增加的Netty模块,对应的代码包在Storm Github下的storm-netty文件夹内,内容比较简单,关于这块的release note可以参考Storm 0.9.0 Released Net

PgSQL · 最佳实践 · pg_rman源码浅析与使用

背景 对于商业数据库来说,备份的功能一般都非常的全面. 比如Oracle,它的备份工具rman是非常强大的,很多年前就已经支持全量.增量.归档的备份模式,支持压缩等. 还支持元数据存储到数据库中,管理也非常的方便,例如保留多少归档,备份集的管理也很方便,例如要恢复到什么时间点,将此前的备份清除等等. 对于开源数据库来说,支持向商业版本这么丰富功能的比较少,PostgreSQL算是非常完善的一个. PostgreSQL作为最高级的开源数据库,备份方面已经向商业数据库看齐. 目前PostgreSQL

我对java String的理解 及 源码浅析

一.char说起到String 这也是自己第二次回过头来啃java基础书,小生自认为愚昧无知.如果大神有好的教育,可以评论私信.以下都是我的看法: 为什么说char 呢,我这里先卖个关子.在java中,char是用unicode编码的,占16位(2字节).从ansi编码(1字节)到unicode编码(2字 节).Java中使用Unicode的原因是,Java的Applet(网页)运行,Unicode里面包含最多最广比如:中 文,English,Spanish,German, French等.因此

Android源码浅析(一)——VMware Workstation Pro和Ubuntu Kylin 16.04 LTS安装配置

Android源码浅析(一)--VMware Workstation Pro和Ubuntu Kylin 16.04 LTS安装配置 最近地方工作,就是接触源码的东西了,所以好东西还是要分享,系列开了这么多,完结 的也没几个,主要还是自己覆盖的太广了,却又不精通,嘿嘿,工作需要,所以写下了本篇博客 一.VMware 12 我选择的虚拟机试VMware,挺好用的感觉,下载VMware就不说了,善用搜索键嘛,这里我提供一个我现在在用的 下载地址:链接:http://pan.baidu.com/s/1k

苹果推出最受欢迎的iOS 到 民用与商用数据库备份的差异与源码浅析

背景 苹果推出了有史以来最受欢迎的一版iOS,为什么这么受欢迎? 最主要的还是使用了最新的APFS文件系统,这个文件系统几乎集成了ZFS,Btrfs的所有优良特性,比如最为好用的快照(块级增量).压缩.使得苹果的操作系统一下子瘦了,而且备份占用空间也非常小. 对于数据库来说,备份也不是小事,如何实现高效的备份.节省空间的备份以及具备可以定义SLA的恢复(不会随着数据库的大小.REDO的多少而变化). 对于商业数据库来说,备份的功能一般都非常的全面. 比如Oracle,它的备份工具rman是非常强

PostgreSQL 9.6 快照过旧 - 源码浅析

PostgreSQL 9.6 快照过旧 - 源码浅析 作者 digoal 日期 2016-10-05 标签 PostgreSQL , 9.6 , 快照过旧 , snapshot too old 背景 在PostgreSQL 9.6以前,垃圾回收存在的问题. 当vacuum回收垃圾时,遇到垃圾记录的xmax大于数据库中现存的最早未提交事务xmin时,不会对其进行回收. 因此当数据库中存在很久为结束的事务时,可能会导致数据库膨胀. PostgreSQL 9.6加入了快照过旧的功能,目的是防止过长的事

Android源码浅析(二)——Ubuntu Root,Git,VMware Tools,安装输入法,主题美化,Dock,安装JDK和配置环境

Android源码浅析(二)--Ubuntu Root,Git,VMware Tools,安装输入法,主题美化,Dock,安装JDK和配置环境 接着上篇,上片主要是介绍了一些安装工具的小知识点Android源码浅析(一)--VMware Workstation Pro和Ubuntu Kylin 16.04 LTS安装配置,其实Ubuntu Kylin 16.04 LTS也只是为了体验,我们为了追求稳定,还是使用了Ubuntu14.04 这里提供一个国内镜像的下载链接,可以用迅雷,下载下来之后后缀

Apache Storm源码阅读笔记&OLAP在大数据时代的挑战

 <一>Apache Storm源码阅读笔记 楔子 自从建了Spark交流的QQ群之后,热情加入的同学不少,大家不仅对Spark很热衷对于Storm也是充满好奇.大家都提到一个问题就是有关storm内部实现机理的资料比较少,理解起来非常费劲. 尽管自己也陆续对storm的源码走读发表了一些博文,当时写的时候比较匆忙,有时候衔接的不是太好,此番做了一些整理,主要是针对TridentTopology部分,修改过的内容采用pdf格式发布,方便打印. 文章中有些内容的理解得益于徐明明和fxjwind两

【深入浅出jQuery】源码浅析2--奇技淫巧

最近一直在研读 jQuery 源码,初看源码一头雾水毫无头绪,真正静下心来细看写的真是精妙,让你感叹代码之美. 其结构明晰,高内聚.低耦合,兼具优秀的性能与便利的扩展性,在浏览器的兼容性(功能缺陷.渐进增强)优雅的处理能力以及 Ajax 等方面周到而强大的定制功能无不令人惊叹. 另外,阅读源码让我接触到了大量底层的知识.对原生JS .框架设计.代码优化有了全新的认识,接下来将会写一系列关于 jQuery 解析的文章. 我在 github 上关于 jQuery 源码的全文注解,感兴趣的可以围观一下