Introduction to Storm

https://github.com/nathanmarz/storm/wiki/Documentation

 

Installation and Configuration

Installing Storm is simple: download a Storm release, unpack it, add the bin/ directory to your PATH, and you're done. See "Setting up a Storm development environment".
Of course, running Storm requires a few other dependencies to be installed first; see "Twitter Storm installation in practice".

Storm supports a local (single-machine) debug mode, so if you already have a jar containing a topology, you can test it right away in local mode:

storm jar storm-starter-0.0.1-SNAPSHOT-standalone.jar storm.starter.WordCountTopology

Of course, real deployments need a Storm cluster: set up Zookeeper and the Storm machines, and launch the daemons (nimbus, supervisor, UI); see Setting up a Storm cluster.
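As a rough sketch, a minimal storm.yaml for the cluster nodes might look like the following (hostnames, paths, and ports are illustrative, and nimbus.host applies to the 0.x releases; see the linked guide for the authoritative settings):

storm.zookeeper.servers:
    - "zk1.example.com"
    - "zk2.example.com"
nimbus.host: "nimbus.example.com"    # where the nimbus daemon runs
storm.local.dir: "/var/storm"        # local scratch space for the daemons
supervisor.slots.ports:              # one worker slot per port
    - 6700
    - 6701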

The storm command is essentially Storm's command-line client; besides the jar subcommand shown above, it supports a series of other operations, see Command line client.
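For example, a few commonly used subcommands (the topology name is illustrative; exact flags can vary by version):

storm list                         # show running topologies and their status
storm kill word-count              # kill a topology (after its message timeout)
storm deactivate word-count        # stop its spouts from emitting
storm activate word-count          # resume emitting
storm rebalance word-count -n 4    # redistribute, e.g. onto 4 worker processes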

 

How to Build a Jar that Runs on Storm: Storm Starter

See "Creating a new Storm project".

https://github.com/nathanmarz/storm-starter

storm-starter gives good examples. After downloading it, use Leiningen to produce the jar; lein automatically downloads and adds the dependencies declared in project.clj:

lein deps
lein compile
lein uberjar

Once the jar is generated, you can test it with the storm command.

To start a new project, simply scaffold it with lein new, then adapt its project.clj from Storm Starter's.
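For reference, a minimal project.clj in the spirit of Storm Starter might look like this sketch (the project name and version are illustrative, and the exact keys depend on your Leiningen version):

(defproject my-topology "0.0.1-SNAPSHOT"
  ;; storm is a dev dependency: the cluster provides it at runtime,
  ;; so it must not be bundled into the submitted jar
  :dev-dependencies [[storm "0.8.1"]]
  :aot :all)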

 

Basic Concepts

See:

https://github.com/nathanmarz/storm/wiki/Concepts
http://xumingming.sinaapp.com/138/twitter-storm%E5%85%A5%E9%97%A8/

 

Streams

The stream is Storm's core concept, defined as an unbounded sequence of tuples.

What is a tuple?
A named list of values, which you can think of as a sequence of key/value pairs; each value can be of any type, and since tuples are dynamically typed, field types need not be declared in advance.
Tuples are serialized and deserialized in transit; Storm has serialization built in for the common types, and users can define serialization logic for custom types.
A tuple is a named list of values, where each value can be any type. 
Tuples are dynamically typed -- the types of the fields do not need to be declared. 
Tuples have helper methods like getInteger and getString to get field values without having to cast the result. 
Storm needs to know how to serialize all the values in a tuple.
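For example, inside a bolt's execute(Tuple tuple), field values can be read positionally or by declared field name; a small sketch (the field name "count" is illustrative):

String word = tuple.getString(0);                 // positional access, no cast needed
Integer count = tuple.getIntegerByField("count"); // access by the name declared in the schema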

The stream is the core abstraction in Storm. A stream is an unbounded sequence of tuples that is processed and created in parallel in a distributed fashion. 
Streams are defined with a schema that names the fields in the stream's tuples. By default, tuples can contain integers, longs, shorts, bytes, strings, doubles, floats, booleans, and byte arrays. You can also define your own serializers so that custom types can be used natively within tuples.

Every stream is given an id when declared. Since single-stream spouts and bolts are so common, OutputFieldsDeclarer has convenience methods for declaring a single stream without specifying an id. In this case, the stream is given the default id of "default".
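As a sketch of how this looks in code, a component might declare a default stream plus one explicitly named stream in declareOutputFields (the stream id "updates" is illustrative):

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // the "default" stream, with a one-field schema
    declarer.declare(new Fields("word"));
    // a second stream, addressed by its explicit id
    declarer.declareStream("updates", new Fields("word", "count"));
}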

Resources:

Tuple: streams are composed of tuples

OutputFieldsDeclarer: used to declare streams and their schemas

Serialization: Information about Storm's dynamic typing of tuples and declaring custom serializations

ISerialization: custom serializers must implement this interface

CONFIG.TOPOLOGY_SERIALIZATIONS: custom serializers can be registered using this configuration

 

Spouts: Sources of Streams

Spout is Storm's term for a source of streams. A spout usually reads tuples from an external data source and emits them into the topology.
A spout can emit multiple tuple streams at once, declared with the declareStream method of OutputFieldsDeclarer.
A spout implements the IRichSpout interface; its most important method is nextTuple, which Storm calls continuously to pull data from the spout.
Also note that spouts come in two flavors, reliable or unreliable; a reliable spout additionally supports the ack and fail methods, see "Reliability" below.

Here is an example nextTuple that emits a randomly chosen entry from words as its output tuple:

public void nextTuple() {
    Utils.sleep(100); // back off briefly so the loop doesn't spin hot
    final String[] words = new String[] {"nathan", "mike",
                     "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word)); // emit a one-field tuple to the default stream
}

A spout is a source of streams in a topology. 
Generally spouts will read tuples from an external source and emit them into the topology (e.g. a Kestrel queue or the Twitter API). 
Spouts can either be reliable or unreliable. A reliable spout is capable of replaying a tuple if it failed to be processed by Storm, whereas an unreliable spout forgets about the tuple as soon as it is emitted.

Spouts can emit more than one stream. To do so, declare multiple streams using the declareStream method of OutputFieldsDeclarer and specify the stream to emit to when using the emit method on SpoutOutputCollector.

The main method on spouts is nextTuple. nextTuple either emits a new tuple into the topology or simply returns if there are no new tuples to emit. It is imperative that nextTuple does not block for any spout implementation, because Storm calls all the spout methods on the same thread.

The other main methods on spouts are ack and fail. These are called when Storm detects that a tuple emitted from the spout either successfully completed through the topology or failed to be completed. ack and fail are only called for reliable spouts. See the Javadoc for more information.
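A minimal sketch of how a reliable spout wires this together; the queue client and QueueMessage type here are hypothetical, standing in for Kestrel or a similar source:

public void nextTuple() {
    QueueMessage msg = queue.poll();   // hypothetical external queue client
    if (msg == null) return;           // nothing to emit; never block here
    // passing a message id as the second argument makes the tuple tracked,
    // so ack/fail will later be called with that id
    _collector.emit(new Values(msg.body()), msg.id());
}

public void ack(Object msgId) {
    queue.delete(msgId);               // fully processed: safe to discard
}

public void fail(Object msgId) {
    queue.requeue(msgId);              // failed or timed out: make it replayable
}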

Resources:

IRichSpout: this is the interface that spouts must implement.

Guaranteeing message processing

 

Bolts: Stream Processing Nodes

A bolt can contain arbitrary user-defined processing logic. Its most important method is execute, which takes a tuple as input and emits zero or more tuples to the OutputCollector.

A bolt can consume multiple input streams and emit multiple output streams. Output streams are declared just as for spouts, with the declareStream method of OutputFieldsDeclarer. For input streams, to subscribe to several output streams of upstream components you must subscribe to each one explicitly by stream id; if no stream id is specified, the default stream is subscribed.

public static class ExclamationBolt implements IRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context,
                        OutputCollector collector) {
        _collector = collector; // keep the collector for use in execute()
    }

    public void execute(Tuple tuple) {
        // anchor the output to the input tuple, appending "!!!" to its first field
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple); // mark the input tuple as processed
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word")); // one output stream with a single field
    }
}

All processing in topologies is done in bolts. Bolts can do anything: filtering, functions, aggregations, joins, talking to databases, and more.

Bolts can do simple stream transformations. Doing complex stream transformations often requires multiple steps and thus multiple bolts. For example, transforming a stream of tweets into a stream of trending images requires at least two steps: a bolt to do a rolling count of retweets for each image, and one or more bolts to stream out the top X images (you can do this particular stream transformation in a more scalable way with three bolts than with two).

Bolts can emit more than one stream. To do so, declare multiple streams using the declareStream method of OutputFieldsDeclarer and specify the stream to emit to when using the emit method on OutputCollector.

When you declare a bolt's input streams, you always subscribe to specific streams of another component. If you want to subscribe to all the streams of another component, you have to subscribe to each one individually. InputDeclarer has syntactic sugar for subscribing to streams declared on the default stream id. Saying declarer.shuffleGrouping("1") subscribes to the default stream on component "1" and is equivalent to declarer.shuffleGrouping("1", DEFAULT_STREAM_ID).
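For example, a sketch of subscribing to both the default stream and a named stream of an upstream component, assuming the string-id API used in the paragraph above (the ids are illustrative):

builder.setBolt("exclaim", new ExclamationBolt(), 3)
       .shuffleGrouping("words")              // the default stream of component "words"
       .shuffleGrouping("words", "updates");  // an explicitly named stream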

The main method in bolts is the execute method which takes in as input a new tuple. Bolts emit new tuples using the OutputCollector object. Bolts must call the ack method on the OutputCollector for every tuple they process so that Storm knows when tuples are completed (and can eventually determine that it's safe to ack the original spout tuples). For the common case of processing an input tuple, emitting 0 or more tuples based on that tuple, and then acking the input tuple, Storm provides an IBasicBolt interface which does the acking automatically.
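As a sketch, the exclamation bolt above could be rewritten on top of the BaseBasicBolt helper class (which implements IBasicBolt), letting Storm handle anchoring and acking:

public static class ExclamationBasicBolt extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // emitted tuples are automatically anchored to `tuple`,
        // and `tuple` is acked automatically after execute returns
        collector.emit(new Values(tuple.getString(0) + "!!!"));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}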

It's perfectly fine to launch new threads in bolts that do processing asynchronously. OutputCollector is thread-safe and can be called at any time.

Resources:

IRichBolt: this is the general interface for bolts.

IBasicBolt: this is a convenience interface for defining bolts that do filtering or simple functions.

OutputCollector: bolts emit tuples to their output streams using an instance of this class

Guaranteeing message processing

 

Topologies

A topology is roughly analogous to a MapReduce job.
The fundamental difference: an MR job eventually finishes, whereas a topology runs forever. In MR it is the code that moves to the data, while in Storm it is the data that flows through the code.
So Storm cannot replace MR: for truly massive data sets, moving the data around is unreasonable.
Another difference (my own observation): topologies support workflows better. An MR job performs a single map/reduce pass, and complex operations need several chained MR jobs to complete,
whereas a topology is defined more flexibly, and a single topology can express a fairly complex workflow.

A Storm topology is a Thrift structure, and Nimbus is a Thrift server, so topologies can be defined in any language; they are ultimately all converted into the Thrift structure.

A concrete Java example of a topology:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(1, new RandomSentenceSpout(), 5);

builder.setBolt(2, new SplitSentence(), 8).shuffleGrouping(1);

builder.setBolt(3, new WordCount(), 12).fieldsGrouping(2, new Fields("word"));

This topology has one spout and two bolts. setSpout and setBolt take the same kinds of parameters: an id (unique within the topology), the processing logic (for a spout, the tuple-producing function), and the parallelism (number of tasks).
The spout implements the IRichSpout interface, and the bolts implement IRichBolt.
Notably, setBolt returns an InputDeclarer object used to define the bolt's input; for example, .shuffleGrouping(1) above takes the output stream of component 1 (the spout) as this bolt's input.
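To actually run the topology, it is submitted either to an in-process LocalCluster for testing or to a real cluster via StormSubmitter; a sketch (the topology name and conf values are illustrative):

Config conf = new Config();
conf.setNumWorkers(4);   // number of worker processes across the cluster

// local testing in the current JVM
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());

// or, for a production cluster:
// StormSubmitter.submitTopology("word-count", conf, builder.createTopology());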

The logic for a realtime application is packaged into a Storm topology. A Storm topology is analogous to a MapReduce job. One key difference is that a MapReduce job eventually finishes, whereas a topology runs forever (or until you kill it, of course). A topology is a graph of spouts and bolts that are connected with stream groupings. These concepts are described below.

Since topology definitions are just Thrift structs, and Nimbus is a Thrift service, you can create and submit topologies using any programming language.

Resources:

TopologyBuilder: use this class to construct topologies in Java

Running topologies on a production cluster

Local mode: Read this to learn how to develop and test topologies in local mode.

 

Nimbus and Supervisor

A Storm cluster has two kinds of nodes:
the Nimbus node, the master, whose role is similar to Hadoop's JobTracker: Nimbus is responsible for distributing code around the cluster, assigning tasks to machines, and monitoring for failures;
and the Supervisor nodes, the workers, each of which listens for work assigned to its machine and starts and stops worker processes as necessary based on what Nimbus has assigned to it.
Each worker process executes a subset of a topology; a running topology consists of many worker processes spread across many machines.

All coordination between Nimbus and the Supervisors is done through a Zookeeper cluster.
The nimbus and supervisor daemons are fail-fast and stateless; all state is kept in Zookeeper or on local disk.
This means you can kill -9 the nimbus or supervisor processes and restart them, and they pick up where they left off.
More importantly, a failure or restart of nimbus or a supervisor does not affect the running workers, unlike Hadoop, where a JobTracker failure causes running jobs to fail.

 

Workers, Executors, Tasks

See the separate write-up on Storm topology parallelism.
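In brief, the three levels map onto API calls like the following sketch (assuming the 0.8+ string-id API; the values and ids are illustrative): setNumWorkers sets worker processes, the parallelism hint sets executors (threads), and setNumTasks sets tasks.

Config conf = new Config();
conf.setNumWorkers(2);                            // 2 worker processes for this topology

builder.setBolt("split", new SplitSentence(), 4)  // parallelism hint: 4 executors (threads)
       .setNumTasks(8)                            // 8 tasks shared among those executors
       .shuffleGrouping("spout");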

 

Stream groupings

Viewed at the granularity of tasks, a running topology is a graph of spout and bolt tasks (the original post includes a figure), so a policy is needed to decide how tuples flow from the spout's tasks to the bolts' tasks, and between bolts.

Part of defining a topology is specifying for each bolt which streams it should receive as input. A stream grouping defines how that stream should be partitioned among the bolt's tasks.

There are seven built-in stream groupings in Storm, and you can implement a custom stream grouping by implementing the CustomStreamGrouping interface (a short usage sketch follows the list):

  1. Shuffle grouping: Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples.
  2. Fields grouping: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" will always go to the same task, but tuples with different "user-id"'s may go to different tasks.
  3. All grouping: The stream is replicated across all the bolt's tasks. Use this grouping with care.
  4. Global grouping: The entire stream goes to a single one of the bolt's tasks. Specifically, it goes to the task with the lowest id.
  5. None grouping: This grouping specifies that you don't care how the stream is grouped. Currently, none groupings are equivalent to shuffle groupings. Eventually though, Storm will push down bolts with none groupings to execute in the same thread as the bolt or spout they subscribe from (when possible).
  6. Direct grouping: This is a special kind of grouping. A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the emitDirect methods. A bolt can get the task ids of its consumers by either using the provided TopologyContext or by keeping track of the output of the emit method in OutputCollector (which returns the task ids that the tuple was sent to).
  7. Local or shuffle grouping: If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a normal shuffle grouping.
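A sketch of declaring a few of these groupings, again assuming the string-id API (PrinterBolt and the component ids are illustrative, not from the original example):

builder.setBolt("counter", new WordCount(), 12)
       .fieldsGrouping("splitter", new Fields("word"));  // same word always goes to the same task

builder.setBolt("printer", new PrinterBolt(), 1)         // PrinterBolt is a hypothetical sink
       .globalGrouping("counter");                       // the entire stream goes to one task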

Resources:

TopologyBuilder: use this class to define topologies

InputDeclarer: this object is returned whenever setBolt is called on TopologyBuilder and is used for declaring a bolt's input streams and how those streams should be grouped

CoordinatedBolt: this bolt is useful for distributed RPC topologies and makes heavy use of direct streams and direct groupings

 

Reliability

Storm guarantees that every spout tuple will be fully processed by the topology. It does this by tracking the tree of tuples triggered by every spout tuple and determining when that tree of tuples has been successfully completed. Every topology has a "message timeout" associated with it. If Storm fails to detect that a spout tuple has been completed within that timeout, then it fails the tuple and replays it later.

To take advantage of Storm's reliability capabilities, you must tell Storm when new edges in a tuple tree are being created and tell Storm whenever you've finished processing an individual tuple. These are done using the OutputCollector object that bolts use to emit tuples. Anchoring is done in the emit method, and you declare that you're finished with a tuple using the ack method.
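A sketch of both halves of that contract: an anchored emit plus ack inside a bolt, and the per-topology message timeout (the 30-second value is illustrative):

// inside a bolt:
public void execute(Tuple input) {
    // passing `input` as the anchor adds an edge to the tuple tree
    _collector.emit(input, new Values(input.getString(0)));
    _collector.ack(input);   // tell Storm this bolt is finished with `input`
}

// when building the topology:
Config conf = new Config();
conf.setMessageTimeoutSecs(30);  // spout tuples not fully acked within 30s are failed and replayed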

This is all explained in much more detail in Guaranteeing message processing (available in Chinese translation as "How Storm guarantees messages are not lost").
