《Storm分布式实时计算模式》——1.3 实现单词计数topology

1.3 实现单词计数topology

前面介绍了Storm的基础概念,我们已经准备好实现一个简单的应用。现在开始着手开发一个Storm topology,并且在本地模式执行。Storm本地模式会在一个JVM实例中模拟出一个Storm集群。大大简化了用户在开发环境或者IDE中进行开发和调试。后续章节将会演示如何将本地模式下开发好的topology部署到真实的Storm集群环境。
1.3.1 配置开发环境
新建一个Storm项目其实就是将Storm及其依赖的类库添加到Java classpath中。在第2章中,你将了解到,将Storm topology发布到集群环境中,需要将编译好的类和相关依赖打包在一起。基于这个原因,我们强烈建议使用构建管理工具来管理项目,比如Apache Maven、Gradle或者Leinengen。在单词计数这个例子中,我们使用Maven。
首先,建立一个Maven项目:

然后,编辑配置文件pom.xml,添加Storm依赖

之后,通过执行下述命令编译项目,来测试配置Maven是否正确。

1.3.2 实现SentenceSpout
为简化起见,SentenceSpout的实现通过重复静态语句列表来模拟数据源。每句话作为一个单值的tuple向后循环发射。完整实现如例1.1所示。
例1.1 SentenceSpout.java

BaseRichSpout类是ISpout接口和IComponent接口的一个简便的实现。接口对本例中用不到的方法提供了默认实现。使用这个类,我们可以专注在所需要的方法上。方法declareOutputFields()是在IComponent接口中定义的,所有Storm的组件(spout和bolt)都必须实现这个接口。Storm的组件通过这个方法告诉Storm该组件会发射哪些数据流,每个数据流的tuple中包含哪些字段。本例中,我们声明了spout会发射一个数据流,其中的tuple包含一个字段(sentence)
Open()方法在ISpout接口中定义,所有Spout组件在初始化时调用这个方法。Open()方法接收三个参数,一个包含了Storm配置信息的map,TopologyContext对象提供了topology中组件的信息,SpoutOutputCollector对象提供了发射tuple的方法。本例中,初始化时不需要做额外操作,因此open()方法实现仅仅是简单将SpoutOutputCollector对象的引用保存在变量中。
nextTuple()方法是所有spout实现的核心所在,Storm通过调用这个方法向输出的collector发射tuple。这个例子中,我们发射当前索引对应的语句,并且递增索引指向下一个语句。
1.3.3 实现语句分割bolt
例1.2列出了SplitSentenceBolt类的实现。
例1.2 SplitSentenceBolt.java

BaseRichBolt类是IComponent和IBolt接口的一个简便实现。继承这个类,就不用去实现本例不关心的方法,将注意力放在实现我们需要的功能上。
prepare()方法在IBolt中定义,类同与ISpout接口中定义的open()方法。这个方法在bolt初始化时调用,可以用来准备bolt用到的资源,如数据库连接。和SentenceSpout类一样,SplitSentenceBolt类在初始化时没有额外操作,因此prepare()方法仅仅保存OutputCollector对象的引用。
在declareOutputFields()方法中,SplitSentenceBolt声明了一个输出流,每个tuple包含一个字段“word”。
SplitSentenceBolt类的核心功能在execute()方法中实现,这个方法是IBolt接口定义的。每当从订阅的数据流中接收一个tuple,都会调用这个方法。本例中,execute()方法按照字符串读取“sentence”字段的值,然后将其拆分为单词,每个单词向后面的输出流发射一个tuple。
1.3.4 实现单词计数bolt
WordCountBolt类(见例1.3)是topology中实际进行单词计数的组件。该bolt的prepare()方法中,实例化了一个HashMap的实例,用来存储单词和对应的计数。大部分实例变量通常是在prepare()方法中进行实例化,这个设计模式是由topology的部署方式决定的。当topology发布时,所有的bolt和spout组件首先会进行序列化,然后通过网络发送到集群中。如果spout或者bolt在序列化之前(比如说在构造函数中生成)实例化了任何无法序列化的实例变量,在进行序列化时会抛出NotSerializableException异常,topology就会部署失败。本例中,因为HashMap是可序列化的,所以在构造函数中进行实例化也是安全的。但是,通常情况下最好是在构造函数中对基本数据类型和可序列化的对象进行赋值和实例化,在prepare()方法中对不可序列化的对象进行实例化。
在declareOutputFields()方法中,类WordCountBolt声明了一个输出流,其中的tuple包括了单词和对应的计数。execute()方法中,当接收到一个单词时,首先查找这个单词对应的计数(如果单词没有出现过则计数初始化为0),递增并存储计数,然后将单词和最新计数作为tuple向后发射。将单词计数作为数据流发射,topology中的其他bolt就可以订阅这个数据流进行进一步的处理。
例1.3 WordCountBolt.java

1.3.5 实现上报bolt
ReportBolt类的作用是对所有单词的计数生成一份报告。和WordCountBolt类似,ReportBolt使用一个HashMap对象来保存单词和对应计数。本例中,它的功能是简单的存储接收到计数bolt发射出的计数tuple。
上报bolt和上述其他bolt的一个区别是,它是一个位于数据流末端的bolt,只接收tuple。因为它不发射任何数据流,所以declareOutputFields()方法是空的。
上报bolt中初次引入了cleanup()方法,这个方法在IBolt接口中定义。Storm在终止一个bolt之前会调用这个方法。本例中我们利用cleanup()方法在topology关闭时输出最终的计数结果。通常情况下,cleanup()方法用来释放bolt占用的资源,如打开的文件句柄或者数据库连接。
开发bolt时需要谨记的是,当topology在Storm集群上运行时,IBolt.cleanup()方法是不可靠的,不能保证会执行。下一章讲到Storm的容错机制时,会讨论其中的原因。但这个例子我们是运行在开发模式中的,可以保证cleanup()被调用。
类ReportBolt的完整代码见示例1.4。
例1.4 ReportBolt.java

1.3.6 实现单词计数topology
我们已经定义了计算所需要的spout和bolt。下面将它们整合为一个可运行的topology(见例1.5)
例1.5 WordCountTopology.java

Storm topology通常由Java的main()函数进行定义,运行或者提交(部署到集群的操作)。在本例中,我们首先定义了一系列字符串常量,作为Storm组件的唯一标识符。main()方法中,首先实例化了spout和bolt,并生成一个TopologyBuilder实例。TopologyBuilder类提供了流式接口风格的API来定义topology组件之间的数据流。首先注册一个sentence spout并且赋值给其唯一的ID:

然后注册一个SplitSentenceBolt,这个bolt订阅SentenceSpout发射出来的数据流:

类TopologyBuilder的setBolt()方法会注册一个bolt,并且返回BoltDeclarer的实例,可以定义bolt的数据源。这个例子中,我们将SentenceSpout的唯一ID赋值给shuffleGrouping()方法确立了这种订阅关系。shuffleGrouping()方法告诉Storm,要将类SentenceSpout发射的tuple随机均匀的分发给SplitSentenceBolt的实例。后续在讨论Storm的并发性时,会解释数据流分组的详情。代码下一行确立了类SplitSentenceBolt和类theWordCountBolt之间的连接关系:

你将了解到,有时候需要将含有特定数据的tuple路由到特殊的bolt实例中。在此我们使用类BoltDeclarer的fieldsGrouping()方法来保证所有“word”字段值相同的tuple会被路由到同一个WordCountBolt实例中。
定义数据流的最后一步是将WordCountBolt实例发射出的tuple流路由到类ReportBolt上。本例中,我们希望WordCountBolt发射的所有tuple路由到唯一的ReportBolt任务中。globalGrouping()方法提供了这种用法:

所有的数据流都已经定义好,运行单词计数计算的最后一步是编译并提交到集群上:

这里我们采用了Storm的本地模式,使用Storm的LocalCluster类在本地开发环境来模拟一个完整的Storm集群。本地模式是开发和测试的简便方式,省去了在分布式集群中反复部署的开销。本地模式还能够很方便地在IDE中执行Storm topology,设置断点,暂停运行,观察变量,分析程序性能。当topology发布到分布式集群后,这些事情会很耗时甚至难以做到。
Storm的Config类是一个HashMap的子类,并定义了一些Storm特有的常量和简便的方法,用来配置topology运行时行为。当一个topology提交时,Storm会将默认配置和Config实例中的配置合并后作为参数传递给submitTopology()方法。合并后的配置被分发给各个spout的bolt的open()、prepare()方法。从这个层面上讲,Config对象代表了对topology所有组件全局生效的配置参数集合。现在可以运行WordCountTopology类了,main()方法会提交topology,在执行10秒后,停止(卸载)该topology,最后关闭本地模式的集群。程序执行完毕后,在控制台可以看到类似以下的输出:

时间: 2025-01-26 23:13:35

《Storm分布式实时计算模式》——1.3 实现单词计数topology的相关文章

《Storm分布式实时计算模式》——3.7 执行topology

3.7 执行topology OutbreakDetectionTopology类有下列方法: https://yqfile.alicdn.com/f60f4a59d0169ba5d18c37d5992d18f5c1b86bfd.png" > 执行这个方法会将topology部署到本地集群中.spout会立即开始发送疾病事件,由Count aggregator收集计数.OutbreakDetector类中的阈值故意设置得很小,这样计数很快就超过阈值,这时程序结束,输出如下日志: 注意当数据

《Storm分布式实时计算模式》——导读

前 言 目前对信息高时效性.可操作性的需求不断增长,这要求软件系统在更少的时间内能处理更多的数据.随着可连接设备数量不断增加,以及在众多行业领域广泛应用,这种信息需求已无处不在.传统企业的运营系统被迫处理原先只有互联网企业才会遇到的大规模数据.这种重大转变正不断瓦解传统架构和解决方案,传统上会将在线事务处理和离线分析分割开来.与此同时,人们正在重新勾勒从数据中提取信息的意义和价值.软件框架和基础设施也在不断进化,以适应这种新场景. 具体地说,数据的生成可以看作一连串发生的离散事件,这些事件流会伴

《Storm分布式实时计算模式》——1.4 Storm的并发机制

1.4 Storm的并发机制 在Storm的间接中提到过,Storm计算支持在多台机器上水平扩容,通过将计算切分为多个独立的tasks在集群上并发执行来实现.在Storm中,一个task可以简单地理解为在集群某节点上运行的一个spout或者bolt实例.为了理解storm的并发机制是如何运行的,我们先来解释下在集群中运行的topology的四个主要组成部分: Nodes(服务器):指配置在一个Storm集群中的服务器,会执行topology的一部分运算.一个Storm集群可以包括一个或者多个工作

《Storm分布式实时计算模式》——1.5 理解数据流分组

1.5 理解数据流分组 看了前面的例子,你会纳闷为什么没有增加ReportBolt的并发度.答案是,这样做没有任何意义.为了理解其中的原因,需要了解Storm中数据流分组的概念.数据流分组定义了一个数据流中的tuple如何分发给topology中不同bolt的task.举例说明,在并发版本的单词计数topology中,SplitSentenceBolt类指派了四个task.数据流分组决定了指定的一个tuple会分发到哪个task上.Storm定义了七种内置数据流分组的方式: Shuffle gr

《Storm分布式实时计算模式》——第1章 分布式单词计数1.1 Storm topology的组成部分——stream、spout和bolt

第1章 分布式单词计数 本章将介绍使用Storm建立一个分布式流式计算应用时涉及的核心概念.我们通过建立一个简单的计数器程序实现这个目的.计数器将持续输入的一句句话作为输入流,统计其中单词出现的次数.单词计数这个例子浅显易懂,引入了多种数据结构.技术和设计模式.这些都是实现更复杂计算所必须的基础. 本章首先概要介绍Storm的数据结构,然后实现一个完整Storm程序所需的各个组成部分.读完本章,读者将会了解Storm计算的基本结构.搭建开发环境的方法.Storm程序的开发和调试技术. 本章包括以

《Storm分布式实时计算模式》——2.3 在Linux上安装Storm

2.3 在Linux上安装Storm Storm是设计运行在Unix兼容的操作系统上.但在0.9.1版本,它也支持在Windows机器上部署. 为了简化部署,我们使用Ubuntu 12.04LTS的发行版作为安装服务器.将会使用服务器版本,默认不包括图形界面接口,因为我们用不到..在实体机和虚拟机上安装ubuntu都是非常方便的.出于学习和开发的目的,你会发现在虚拟机里进行部署更加方便,尤其是手头没有那么多实体机的情况. OSX.Linux.Windows都有着对应的虚拟机软件.我们建议从下面集

《Storm分布式实时计算模式》——1.6 有保障机制的数据处理

1.6 有保障机制的数据处理 Storm提供了一种API能够保证spout发送出来的每个tuple都能够执行完整的处理过程.在我们上面的例子中,不担心执行失败的情况.可以看到在一个topology中一个spout的数据流会被分割生成任意多的数据流,取决于下游bolt的行为.如果发生了执行失败会怎样?举个例子,考虑一个负责将数据持久化到数据库的bolt.怎样处理数据库更新失败的情况?1.6.1 spout的可靠性 在Storm中,可靠的消息处理机制是从spout开始的.一个提供了可靠的处理机制的s

《Storm分布式实时计算模式》——3.6 Trident状态

3.6 Trident状态 我们现在已经给每个aggregator的分组数据进行了计数,现在想将信息进行持久化存储,以便进一步分析.在Trident中,持久化操作从状态管理开始.Trident对状态有底层的操作原语,但不同于Storm API,它不关心要哪些数据会作为状态存储或者如何存储这些状态.Trident在高层提供了下述的状态接口: 上面提到了,Trident将tuple分组成一批批数据.每批数据都有自己的事务标识符.在前面的接口中,Trident告诉State对象什么时候开始提交状态,什

《Storm分布式实时计算模式》——2.1 Storm集群的框架

第2章 配置Storm集群 在本章中你将深入理解Storm的技术栈,它的软件依赖,以及搭建和部署Storm集群的过程.我们首先会在伪分布式模式下安装Storm,所有的组件都安装在同一台机器上,而不是在多台机器上.一旦你了解了安装和配置Storm的基本步骤,我们就可以通过Puppet这个工具进行自动化的安装,这样的话部署多节点的集群可以节省大量的时间和精力. 本章包括以下内容: 组成Storm集群的不同组件和服务 Storm的技术栈 在Linux上安装和配置Storm Storm的配置参数 Sto