Storm入门之第7章使用非JVM语言开发

本文翻译自《Getting Started With Storm》译者:吴京润 编辑:郭蕾 方腾飞

有时候你可能想使用不是基于JVM的语言开发一个Storm工程,你可能更喜欢使用别的语言或者想使用用某种语言编写的库。

Storm是用Java实现的,你看到的所有这本书中的spoutbolt都是用java编写的。那么有可能使用像Python、Ruby、或者JavaScript这样的语言编写spoutbolt吗?答案是当然



可以!可以使用多语言协议达到这一目的。

多语言协议是Storm实现的一种特殊的协议,它使用标准输入输出作为spoutbolt进程间的通讯通道。消息以JSON格式或纯文本格式在通道中传递。

我们看一个用非JVM语言开发spoutbolt的简单例子。在这个例子中有一个spout产生从1到10,000的数字,一个bolt过滤素数,二者都用PHP实现。

NOTE: 在这个例子中,我们使用一个很笨的办法验证素数。有更好当然也更复杂的方法,它们已经超出了这个例子的范围。

有一个专门为Storm实现的PHP DSL(译者注:领域特定语言),我们将会在例子中展示我们的实现。首先定义拓扑。

...
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("numbers-generator", new NumberGeneratorSpout(1, 10000));
builder.setBolt("prime-numbers-filter", new
PrimeNumbersFilterBolt()).shuffleGrouping("numbers-generator");
StormTopology topology = builder.createTopology();
...

NOTE:有一种使用非JVM语言定义拓扑的方式。既然Storm拓扑是Thrift架构,而且Nimbus是一个Thrift守护进程,你就可以使用任何你想用的语言创建并提交拓扑。但是这已经超出了本书的范畴了。

这里没什么新鲜了。我们看一下NumbersGeneratorSpout的实现。

public class NumberGeneratorSpout extends ShellSpout implements IRichSpout {
 public NumberGeneratorSpout(Integer from, Integer to) {
 super("php", "-f", "NumberGeneratorSpout.php", from.toString(), to.toString());
 }
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
 declarer.declare(new Fields("number"));
 }
 public Map<String, Object> getComponentConfiguration() {
 return null;
 }
}

你可能已经注意到了,这个spout继承了ShellSpout。这是个由Storm提供的特殊的类,用来帮助你运行并控制用其它语言编写的spout。在这种情况下它告诉Storm如何执行你的PHP脚本。

NumberGeneratorSpout的PHP脚本向标准输出分发元组,并从标准输入读取确认或失败信号。

在开始实现NumberGeneratorSpout.php脚本之前,多观察一下多语言协议是如何工作的。

spout按照传递给构造器的参数从fromto顺序生成数字。

接下来看看PrimeNumbersFilterBolt。这个类实现了之前提到的壳。它告诉Storm如何执行你的PHP脚本。Storm为这一目的提供了一个特殊的叫做ShellBolt的类,你惟一要做的事就是指出如何运行脚本以及声明要分发的属性。

public class PrimeNumbersFilterBolt extends ShellBolt implements IRichBolt {
 public PrimeNumbersFilterBolt() {
 super("php", "-f", "PrimeNumbersFilterBolt.php");
 }
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
 declarer.declare(new Fields("number"));
 }
}

在这个构造器中只是告诉Storm如何运行PHP脚本。它与下列命令等价。

 php -f PrimeNumbersFilterBolt.php

PrimeNumbersFilterBolt.php脚本从标准输入读取元组,处理它们,然后向标准输出分发、确认或失败。在开始这个脚本之前,我们先多了解一些多语言协议的工作方式。

  1. 发起一次握手
  2. 开始循环
  3. 读/写元组

NOTE:有一种特殊的方式可以使用Storm的内建日志机制在你的脚本中记录日志,所以你不需要自己实现日志系统。

下面我们来看一看上述每一步的细节,以及如何用PHP实现它。

发起握手

为了控制整个流程(开始以及结束它),Storm需要知道它执行的脚本进程号(PID)。根据多语言协议,你的进程开始时发生的第一件事就是Storm要向标准输入(译者注:根据上下文理解,本章提到的标准输入输出都是从非JVM语言的角度理解的,这里提到的标准输入也就是PHP的标准输入)发送一段JSON数据,它包含Storm配置、拓扑上下文和一个进程号目录。它看起来就像下面的样子:

{
 "conf": {
 "topology.message.timeout.secs": 3,
 // etc
 },
 "context": {
 "task->component": {
 "1": "example-spout",
 "2": "__acker",
 "3": "example-bolt"
 },
 "taskid": 3
 },
 "pidDir": "..."
}

脚本进程必须在pidDir指定的目录下以自己的进程号为名字创建一个文件,并以JSON格式把进程号写到标准输出。

{"pid": 1234}

举个例子,如果你收到/tmp/example\n而你的脚本进程号是123,你应该创建一个名为/tmp/example/123的空文件并向标准输出打印文本行 {“pid”: 123}\n(译者注:此处原文只有一个n,译者猜测应是排版错误)和end\n。这样Storm就能持续追踪进程号并在它关闭时杀死脚本进程。下面是PHP实现:

$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];
$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();

你已经实现了一个叫做read_msg的函数,用来处理从标准输入读取的消息。按照多语言协议的声明,消息可以是单行或多行JSON文本。一条消息以end\n结束。

function read_msg() {
 $msg = "";
 while(true) {
 $l = fgets(STDIN);
 $line = substr($l,0,-1);
 if($line=="end") {
 break;
 }
 $msg = "$msg$line\n";
 }
 return substr($msg, 0, -1);
}
function storm_send($json) {
 write_line(json_encode($json));
 write_line("end");
}
function write_line($line) {
 echo("$line\n");
}

NOTE:flush()方法非常重要;有可能字符缓冲只有在积累到一定程度时才会清空。这意味着你的脚本可能会为了等待一个来自Storm的输入而永远挂起,而Storm却在等待来自你的脚本的输出。因此当你的脚本有内容输出时立即清空缓冲是很重要的。

开始循环以及读/写元组

这是整个工作中最重要的一步。这一步的实现取决于你开发的spoutbolt

如果是spout,你应当开始分发元组。如果是bolt,就循环读取元组,处理它们,分发它发,确认成功或失败。

下面我们就看看用来分发数字的spout

$from = intval($argv[1]);
$to = intval($argv[2]);
while(true) {
 $msg = read_msg();
 $cmd = json_decode($msg, true);
 if ($cmd['command']=='next') {
 if ($from<$to) {
 storm_emit(array("$from"));
 $task_ids = read_msg();
 $from++;
 } else {
 sleep(1);
 }
 }
 storm_sync();
}

从命令行获取参数fromto,并开始迭代。每次从Storm得到一条next消息,这意味着你已准备好分发下一个元组。

一旦你发送了所有的数字,而且没有更多元组可发了,就休眠一段时间。

为了确保脚本已准备好发送下一个元组,Storm会在发送下一条之前等待sync\n文本行。调用read_msg(),读取一条命令,解析JSON。

对于bolts来说,有少许不同。

while(true) {
 $msg = read_msg();
 $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
 if (!empty($tuple["id"])) {
 if (isPrime($tuple["tuple"][0])) {
 storm_emit(array($tuple["tuple"][0]));
 }
 storm_ack($tuple["id"]);
 }
}

循环的从标准输入读取元组。解析读取每一条JSON消息,判断它是不是一个元组,如果是,再检查它是不是一个素数,如果是素数再次分发一个元组,否则就忽略掉,最后不论如何都要确认成功。

NOTE:json_decode函数中使用的JSON_BIGINT_AS_STRING是为了解决一个在JAVA和PHP之间的数据转换问题。JAVA发送的一些很大的数字,在PHP中会丢失精度,这样就会导致问题。为了避开这个问题,告诉PHP把大数字当作字符串处理,并在JSON消息中输出数字时不使用双引号。PHP5.4.0或更高版本要求使用这个参数。

emit,ack,fail,以及log消息都是如下结构:

emit

{
 "command": "emit",
 "tuple": ["foo", "bar"]
}

其中的数组包含了你分发的元组数据。

ack

{
 "command": "ack",
 "id": 123456789
}

其中的id就是你处理的元组的ID。
fail

{
 "command": "fail",
 "id": 123456789
}

ack(译者注:原文是emit从上下JSON的内容和每个方法的功能上判断此处就是ack,可能是排版错误)相同,其中id就是你处理的元组ID。
log

{
 "command": "log",
 "msg": "some message to be logged by storm."
}

下面是完整的的PHP代码。

//你的spout:
<?php
function read_msg() {
 $msg = "";
 while(true) {
 $l = fgets(STDIN);
 $line = substr($l,0,-1);
 if ($line=="end") {
 break;
 }
 $msg = "$msg$line\n";
 }
 return substr($msg, 0, -1);
}
function write_line($line) {
 echo("$line\n");
}
function storm_emit($tuple) {
 $msg = array("command" => "emit", "tuple" => $tuple);
 storm_send($msg);
}
function storm_send($json) {
 write_line(json_encode($json));
 write_line("end");
}
function storm_sync() {
 storm_send(array("command" => "sync"));
}
function storm_log($msg) {
 $msg = array("command" => "log", "msg" => $msg);
 storm_send($msg);
 flush();
}
$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];
$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();
$from = intval($argv[1]);
$to = intval($argv[2]);
while(true) {
 $msg = read_msg();
 $cmd = json_decode($msg, true);
 if ($cmd['command']=='next') {
 if ($from<$to) {
 storm_emit(array("$from"));
 $task_ids = read_msg();
 $from++;
 } else {
 sleep(1);
 }
 }
 storm_sync();
}
?>
//你的bolt:
<?php
function isPrime($number) {
 if ($number < 2) {
 return false;
 }
 if ($number==2) {
 return true;
 }
 for ($i=2; $i<=$number-1; $i++) {
 if ($number % $i == 0) {
 return false;
 }
 }
 return true;
}
function read_msg() {
 $msg = "";
 while(true) {
 $l = fgets(STDIN);
 $line = substr($l,0,-1);
 if ($line=="end") {
 break;
 }
 $msg = "$msg$line\n";
 }
 return substr($msg, 0, -1);
}
function write_line($line) {
 echo("$line\n");
}
function storm_emit($tuple) {
 $msg = array("command" => "emit", "tuple" => $tuple);
 storm_send($msg);
}
function storm_send($json) {
 write_line(json_encode($json));
 write_line("end");
}
function storm_ack($id) {
 storm_send(["command"=>"ack", "id"=>"$id"]);
}
function storm_log($msg) {
 $msg = array("command" => "log", "msg" => "$msg");
 storm_send($msg);
}
$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];
$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();
while(true) {
 $msg = read_msg();
 $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
 if (!empty($tuple["id"])) {
 if (isPrime($tuple["tuple"][0])) {
 storm_emit(array($tuple["tuple"][0]));
 }
 storm_ack($tuple["id"]);
 }
}
?>

NOTE:需要重点指出的是,应当把所有的脚本文件保存在你的工程目录下的一个名为multilang/resources的子目录中。这个子目录被包含在发送给工人进程的jar文件中。如果你不把脚本包含在这个目录中,Storm就不能运行它们,并抛出一个错误。

文章转自 并发编程网-ifeve.com

时间: 2024-10-03 07:58:49

Storm入门之第7章使用非JVM语言开发的相关文章

Storm入门之第8章事务性拓扑

Storm入门之第8章事务性拓扑 本文翻译自<Getting Started With Storm>译者:吴京润    编辑:郭蕾 方腾飞 正如书中之前所提到的,使用Storm编程,可以通过调用ack和fail方法来确保一条消息的处理成功或失败.不过当元组被重发时,会发生什么呢?你又该如何砍不会重复计算?   Storm0.7.0实现了一个新特性--事务性拓扑,这一特性使消息在语义上确保你可以安全的方式重发消息,并保证它们只会被处理一次.在不支持事务性拓扑的情况下,你无法在准确性,可扩展性,以

Apache Storm 官方文档 —— 定义 Storm 的非 JVM 语言 DSL

实现非 JVM 语言 DSL(Domain Specific Language,领域专用语言)应该从 storm-core/src/storm.thrift 文件开始.由于 Storm 拓扑是 Thrift 结构,而且 Nimbus 是一个 Thrift 后台进程,你可以以任意语言创建并提交拓扑. 当你创建 Thrift 结构的 spouts 与 bolts 时,spout 或者 bolt 的代码是以 ComponentObject 结构体的形式定义的: union ComponentObjec

《嵌入式 Linux C 语言应用程序设计(修订版)》——第 2 章 嵌入式Linux C语言开发工具 2.1 嵌入式Linux下C语言概述

第 2 章 嵌入式Linux C语言开发工具 本章目标 任何应用程序的开发都离不开编辑器.编译器及调试器,嵌入式Linux的C语言开发也一样,它也有一套优秀的编辑.编译及调试工具. 掌握这些工具的使用是至关重要的,它直接影响到程序开发的效率.因此,希望读者能自己动手操作,切实熟练掌握这些工具的使用.通过本章的学习,读者将会掌握如下内容: 2.1 嵌入式Linux下C语言概述 读者在第一章中已经了解了嵌入式开发的基本流程,在嵌入式系统中应用程序的主体是在宿主机中开发完成的,就嵌入式Linux而言,

Storm入门之第三章拓扑

本文翻译自<Getting Started With Storm>  译者:吴京润   编辑:方腾飞 在这一章,你将学到如何在同一个Storm拓扑结构内的不同组件之间传递元组,以及如何向一个运行中的Storm集群发布一个拓扑. 数据流组 设计一个拓扑时,你要做的最重要的事情之一就是定义如何在各组件之间交换数据(数据流是如何被bolts消费的).一个数据流组指定了每个bolt会消费哪些数据流,以及如何消费它们. NOTE:一个节点能够发布一个以上的数据流,一个数据流组允许我们选择接收哪个. 数据

Storm入门之第四章Spouts

本文翻译自<Getting Started With Storm>  译者:吴京润   编辑:方腾飞 你将在本章了解到spout作为拓扑入口和它的容错机制相关的最常见的设计策略. 可靠的消息 VS 不可靠的消息 在设计拓扑结构时,始终在头脑中记着的一件重要事情就是消息的可靠性.当有无法处理的消息时,你就要决定该怎么办,以及作为一个整体的拓扑结构该做些什么.举个例子,在处理银行存款时,不要丢失任何事务报文就是很重要的事情.但是如果你要统计分析数以百万的tweeter消息,即使有一条丢失了,仍然可

Storm入门之第五章Bolts

本文翻译自<Getting Started With Storm>  译者:吴京润   编辑:方腾飞 第5章 Bolts 正如你已经看到的,bolts是一个Storm集群中的关键组件.你将在这一章学到bolt生命周期,一些bolt设计策略,以及几个有关这些内容的例子. Bolt生命周期 Bolt是这样一种组件,它把元组作为输入,然后产生新的元组作为输出.实现一个bolt时,通常需要实现IRichBolt接口.Bolts对象由客户端机器创建,序列化为拓扑,并提交给集群中的主机.然后集群启动工人进

Storm入门之第6章一个实际的例子

本文翻译自<Getting Started With Storm>译者:吴京润    编辑:郭蕾 方腾飞 本章要阐述一个典型的网络分析解决方案,而这类问题通常利用Hadoop批处理作为解决方案.与Hadoop不同的是,基于Storm的方案会实时输出结果.     我们的这个例子有三个主要组件(见图6-1) 一个基于Node.js的web应用,用于测试系统 一个Redis服务器,用于持久化数据 一个Storm拓扑,用于分布式实时处理数据 图6-1  架构概览 NOTE:你如果想先把这个例子运行起

《Storm入门》中文版

本文翻译自<Getting Started With Storm>译者:吴京润    编辑:郭蕾 方腾飞 本书的译文仅限于学习和研究之用,没有原作者和译者的授权不能用于商业用途. 译者序 Storm入门终于翻译完了.首先感谢并发编程网同意本人在网站上首发本书译文,同时还要感谢并发编程网的各位大牛们的耐心帮助.这是本人翻译的第一本书,其中必有各种不足请诸位读者朋友不吝斧正. 译完此书之后,我已经忘记了是如何知道的Storm这个工具了.本人读过的所有技术书籍大部分都是在地铁上完成的,现在已经成了习

Storm入门 第二章准备开始

本文翻译自<Getting Started With Storm>  译者:吴京润   编辑:方腾飞 准备开始 在本章,我们要创建一个Storm工程和我们的第一个Storm拓扑结构. NOTE: 下面假设你的JRE版本在1.6以上.我们推荐Oracle提供的JRE.你可以到http://www.java .com/downloads/下载. 操作模式 开始之前,有必要了解一下Storm的操作模式.有下面两种方式. 本地模式 在本地模式下,Storm拓扑结构运行在本地计算机的单一JVM进程上.这