Storm入门之第五章Bolts

本文翻译自《Getting Started With Storm》  译者:吴京润   编辑:方腾飞

第5章 Bolts

正如你已经看到的,bolts是一个Storm集群中的关键组件。你将在这一章学到bolt生命周期,一些bolt设计策略,以及几个有关这些内容的例子。

Bolt生命周期

Bolt是这样一种组件,它把元组作为输入,然后产生新的元组作为输出。实现一个bolt时,通常需要实现IRichBolt接口。Bolts对象由客户端机器创建,序列化为拓扑,并提交给集群中的主机。然后集群启动工人进程反序列化bolt,调用prepare,最后开始处理元组。

NOTE:要创建一个bolt对象,它通过构造器参数初始化成员属性,bolt被提交到集群时,这些属性值会随着一起序列化。

Bolt结构

Bolts拥有如下方法:

declareOutputFields(OutputFieldsDeclarer declarer)bolt声明输出模式
prepare(java.util.Map stormConf, TopologyContext context, OutputCollector collector)
    仅在bolt开始处理元组之前调用
execute(Tuple input)
    处理输入的单个元组
cleanup()
    在bolt即将关闭时调用

下面看一个例子,在这个例子中bolt把一句话分割成单词列表:

class SplitSentence implements IRichBolt {
    private OutputCollector collector;
    publlic void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void cleanup(){}

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

正如你所看到的,这是一个很简单的bolt。值得一提的是在这个例子里,没有消息担保。这就意味着,如果bolt因为某些原因丢弃了一些消息——不论是因为bolt挂了,还是因为程序故意丢弃的——生成这条消息的spout不会收到任何通知,任何其它的spoutsbolts也不会收到。

然而在许多情况下,你想确保消息在整个拓扑范围内都被处理过了。

可靠的bolts和不可靠的bolts

正如前面所说的,Storm保证通过spout发送的每条消息会得到所有bolt的全面处理。基于设计上的考虑,这意味着你要自己决定你的bolts是否保证这一点。

拓扑是一个树型结构,消息(元组)穿过其中一条或多条分支。树上的每个节点都会调用ack(tuple)fail(tuple),Storm因此知道一条消息是否失败了,并通知那个/那些制造了这些消息的spout(s)。既然一个Storm拓扑运行在高度并行化的环境里,跟踪始发spout实例的最好方法就是在消息元组内包含一个始发spout引用。这一技巧称做锚定(译者注:原文为Anchoring)。修改一下刚刚讲过的SplitSentence,使它能够确保消息都被处理了。

class SplitSentence implenents IRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word : sentence.split(" ")) {
            collector.emit(tuple, new Values(word));
        }
        collector.ack(tuple);
    }

    public void cleanup(){}

    public void declareOutputFields(OutputFieldsDeclarer declarer){
        declar.declare(new Fields("word"));
    }
}

锚定发生在调用collector.emit()时。正如前面提到的,Storm可以沿着元组追踪到始发spoutcollector.ack(tuple)collector.fail(tuple)会告知spout每条消息都发生了什么。当树上的每条消息都已被处理了,Storm就认为来自spout的元组被全面的处理了。如果一个元组没有在设置的超时时间内完成对消息树的处理,就认为这个元组处理失败。默认超时时间为30秒。

NOTE:你可以通过修改Config.TOPOLOGY_MESSAGE_TIMEOUT修改拓扑的超时时间。

当然了spout需要考虑消息的失败情况,并相应的重试或丢弃消息。

NOTE:你处理的每条消息要么是确认的(译者注:collector.ack())要么是失败的(译者注:collector.fail())。Storm使用内存跟踪每个元组,所以如果你不调用这两个方法,该任务最终将耗尽内存。

多数据流

一个bolt可以使用emit(streamId, tuple)把元组分发到多个流,其中参数streamId是一个用来标识流的字符串。然后,你可以在TopologyBuilder决定由哪个流订阅它。

多锚定

为了用bolt连接或聚合数据流,你需要借助内存缓冲元组。为了在这一场景下确保消息完成,你不得不把流锚定到多个元组上。可以向emit方法传入一个元组列表来达成目的。

...
List anchors = new ArrayList();
anchors.add(tuple1);
anchors.add(tuple2);
collector.emit(anchors, values);
...

通过这种方式,bolt在任意时刻调用ackfail方法,都会通知消息树,而且由于流锚定了多个元组,所有相关的spout都会收到通知。

使用IBasicBolt自动确认

你可能已经注意到了,在许多情况下都需要消息确认。简单起见,Storm提供了另一个用来实现bolt的接口,IBasicBolt。对于该接口的实现类的对象,会在执行execute方法之后自动调用ack方法。

class SplitSentence extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for(String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
}

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

NOTE:分发消息的BasicOutputCollector自动锚定到作为参数传入的元组。

文章转自 并发编程网-ifeve.com

时间: 2024-09-21 18:48:36

Storm入门之第五章Bolts的相关文章

Storm入门之第8章事务性拓扑

Storm入门之第8章事务性拓扑 本文翻译自<Getting Started With Storm>译者:吴京润    编辑:郭蕾 方腾飞 正如书中之前所提到的,使用Storm编程,可以通过调用ack和fail方法来确保一条消息的处理成功或失败.不过当元组被重发时,会发生什么呢?你又该如何砍不会重复计算?   Storm0.7.0实现了一个新特性--事务性拓扑,这一特性使消息在语义上确保你可以安全的方式重发消息,并保证它们只会被处理一次.在不支持事务性拓扑的情况下,你无法在准确性,可扩展性,以

Storm入门之第三章拓扑

本文翻译自<Getting Started With Storm>  译者:吴京润   编辑:方腾飞 在这一章,你将学到如何在同一个Storm拓扑结构内的不同组件之间传递元组,以及如何向一个运行中的Storm集群发布一个拓扑. 数据流组 设计一个拓扑时,你要做的最重要的事情之一就是定义如何在各组件之间交换数据(数据流是如何被bolts消费的).一个数据流组指定了每个bolt会消费哪些数据流,以及如何消费它们. NOTE:一个节点能够发布一个以上的数据流,一个数据流组允许我们选择接收哪个. 数据

Storm入门之第四章Spouts

本文翻译自<Getting Started With Storm>  译者:吴京润   编辑:方腾飞 你将在本章了解到spout作为拓扑入口和它的容错机制相关的最常见的设计策略. 可靠的消息 VS 不可靠的消息 在设计拓扑结构时,始终在头脑中记着的一件重要事情就是消息的可靠性.当有无法处理的消息时,你就要决定该怎么办,以及作为一个整体的拓扑结构该做些什么.举个例子,在处理银行存款时,不要丢失任何事务报文就是很重要的事情.但是如果你要统计分析数以百万的tweeter消息,即使有一条丢失了,仍然可

Storm入门之第7章使用非JVM语言开发

本文翻译自<Getting Started With Storm>译者:吴京润 编辑:郭蕾 方腾飞 有时候你可能想使用不是基于JVM的语言开发一个Storm工程,你可能更喜欢使用别的语言或者想使用用某种语言编写的库. Storm是用Java实现的,你看到的所有这本书中的spout和bolt都是用java编写的.那么有可能使用像Python.Ruby.或者JavaScript这样的语言编写spout和bolt吗?答案是当然 可以!可以使用多语言协议达到这一目的. 多语言协议是Storm实现的一种

Storm入门之第6章一个实际的例子

本文翻译自<Getting Started With Storm>译者:吴京润    编辑:郭蕾 方腾飞 本章要阐述一个典型的网络分析解决方案,而这类问题通常利用Hadoop批处理作为解决方案.与Hadoop不同的是,基于Storm的方案会实时输出结果.     我们的这个例子有三个主要组件(见图6-1) 一个基于Node.js的web应用,用于测试系统 一个Redis服务器,用于持久化数据 一个Storm拓扑,用于分布式实时处理数据 图6-1  架构概览 NOTE:你如果想先把这个例子运行起

《Storm入门》中文版

本文翻译自<Getting Started With Storm>译者:吴京润    编辑:郭蕾 方腾飞 本书的译文仅限于学习和研究之用,没有原作者和译者的授权不能用于商业用途. 译者序 Storm入门终于翻译完了.首先感谢并发编程网同意本人在网站上首发本书译文,同时还要感谢并发编程网的各位大牛们的耐心帮助.这是本人翻译的第一本书,其中必有各种不足请诸位读者朋友不吝斧正. 译完此书之后,我已经忘记了是如何知道的Storm这个工具了.本人读过的所有技术书籍大部分都是在地铁上完成的,现在已经成了习

Storm入门 第二章准备开始

本文翻译自<Getting Started With Storm>  译者:吴京润   编辑:方腾飞 准备开始 在本章,我们要创建一个Storm工程和我们的第一个Storm拓扑结构. NOTE: 下面假设你的JRE版本在1.6以上.我们推荐Oracle提供的JRE.你可以到http://www.java .com/downloads/下载. 操作模式 开始之前,有必要了解一下Storm的操作模式.有下面两种方式. 本地模式 在本地模式下,Storm拓扑结构运行在本地计算机的单一JVM进程上.这

ArcGIS for Desktop入门教程_第五章_ArcCatalog使用 - ArcGIS知乎-新一代ArcGIS问答社区

原文:ArcGIS for Desktop入门教程_第五章_ArcCatalog使用 - ArcGIS知乎-新一代ArcGIS问答社区 1 ArcCatalog使用1.1 GIS数据 地理信息系统,就是将真实的地物或地理现象抽象为计算机可表达的简单的集合类型(比如点.线.多边形),再按其几何类型和专题信息进行分类,通过计算机技术来管理和分析这些数据.从数据的组织形式上,我们通常将其分为矢量数据和栅格数据两大类. 如上图所示,分别用矢量和栅格的形式来表达一条高速公路.我们可以清晰地发现矢量数据与栅

SEO从零开始第五章——新闻源与百度敏感词

  营销手段中包含了SEO,SEO是营销一部分如果想在SEO界有所成就那一定要学会网络营销手段,网络营销会了SEO自然就会了,因为SEO是根本网络营销而变的.2013年5月28日百度外链工具全新升级,我记得在SEO从零开始第2章节说过,快照全部停留在27号,28号百度肯定有动作,百度拒绝外链技术越来越成熟, 回到正题,最近很多灰色行业在大量收购百度新闻源来做百度敏感词语,先说说我自己理解的原理,先知道原理然后再推荐百度如何针对这些问题.目前主流获得敏感词语排名的方案企业,新闻源站劫持获取排名.百