Apache Storm 官方文档 —— Storm 与 Kestrel

原文链接    译者:魏勇

本文说明了如何使用 Storm 从 Kestrel 集群中消费数据。

前言

Storm

本教程中使用了 storm-kestrel 项目和 storm-starter 项目中的例子。建议读者将这几个项目 clone 到本地,并动手运行其中的例子。

Kestrel

本文假定读者可以如此项目所述在本地运行一个 Kestrel 集群。

Kestrel 服务器与队列

Kestrel 服务中包含有一组消息队列。Kestrel 队列是一种非常简单的消息队列,可以运行于 JVM 上,并使用 memcache 协议(以及一些扩展)与客户端交互。详情可以参考 storm-kestrel 项目中的 KestrelThriftClient 类的实现。

每个队列均严格遵循先入先出的规则。为了提高服务性能,数据都是缓存在系统内存中的;不过,只有开头的 128MB 是保存在内存中的。在服务停止的时候,队列的状态会保存到一个日志文件中。

请参阅此文了解更多详细信息。

Kestrel 具有 * 快速 * 小巧 * 持久 * 可靠 等特点。

例如,Twitter 就使用 Kestrel 作为消息系统的核心环节,此文中介绍了相关信息。

** 向 Kestrel 中添加数据

首先,我们需要一个可以向 Kestrel 的队列添加数据的程序。下述方法使用了 storm-kestrel 项目中的 KestrelClient 的实现。该方法从一个包含 5 个句子的数组中随机选择一个句子添加到 Kestrel 的队列中。

  private static void queueSentenceItems(KestrelClient kestrelClient, String queueName)
            throws ParseError, IOException {

        String[] sentences = new String[] {
                "the cow jumped over the moon",
                "an apple a day keeps the doctor away",
                "four score and seven years ago",
                "snow white and the seven dwarfs",
                "i am at two with nature"};

        Random _rand = new Random();

        for(int i=1; i<=10; i++){

            String sentence = sentences[_rand.nextInt(sentences.length)];

            String val = "ID " + i + " " + sentence;

            boolean queueSucess = kestrelClient.queue(queueName, val);

            System.out.println("queueSucess=" +queueSucess+ " [" + val +"]");
        }
    }

从 Kestrel 中移除数据

此方法从一个队列中取出一个数据,但并不把该数据从队列中删除:

private static void dequeueItems(KestrelClient kestrelClient, String queueName) throws IOException, ParseError { for(int i=1; i<=12; i++){

        Item item = kestrelClient.dequeue(queueName);

        if(item==null){
            System.out.println("The queue (" + queueName + ") contains no items.");
        }
        else
        {
            byte[] data = item._data;

            String receivedVal = new String(data);

            System.out.println("receivedItem=" + receivedVal);
        }
    }

此方法会从队列中取出并移除数据:

private static void dequeueAndRemoveItems(KestrelClient kestrelClient, String queueName)
throws IOException, ParseError
     {
        for(int i=1; i<=12; i++){

            Item item = kestrelClient.dequeue(queueName);

            if(item==null){
                System.out.println("The queue (" + queueName + ") contains no items.");
            }
            else
            {
                int itemID = item._id;

                byte[] data = item._data;

                String receivedVal = new String(data);

                kestrelClient.ack(queueName, itemID);

                System.out.println("receivedItem=" + receivedVal);
            }
        }
}

向 Kestrel 中连续添加数据

下面的程序可以向本地 Kestrel 服务的一个 sentence_queue 队列中连续添加句子,这也是我们的最后一个程序。

可以在命令行窗口中输入一个右中括号 ] 并回车来停止程序。

import java.io.IOException;
import java.io.InputStream;
import java.util.Random;

import backtype.storm.spout.KestrelClient;
import backtype.storm.spout.KestrelClient.Item;
import backtype.storm.spout.KestrelClient.ParseError;

public class AddSentenceItemsToKestrel {

    /**
     * @param args
     */
    public static void main(String[] args) {

        InputStream is = System.in;

        char closing_bracket = ']';

        int val = closing_bracket;

        boolean aux = true;

        try {

            KestrelClient kestrelClient = null;
            String queueName = "sentence_queue";

            while(aux){

                kestrelClient = new KestrelClient("localhost",22133);

                queueSentenceItems(kestrelClient, queueName);

                kestrelClient.close();

                Thread.sleep(1000);

                if(is.available()>0){
                 if(val==is.read())
                     aux=false;
                }
            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        catch (ParseError e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        System.out.println("end");

    }
}

使用 KestrelSpout

下面的拓扑使用 KestrelSpout 从一个 Kestrel 队列中读取句子,并将句子分割成若干个单词(Bolt:SplitSentence),然后输出每个单词出现的次数(Bolt:WordCount)。数据处理的细节可以参考消息的可靠性保证一文。

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("localhost",22133,"sentence_queue",new StringScheme()));
builder.setBolt("split", new SplitSentence(), 10)
            .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
        .fieldsGrouping("split", new Fields("word"));

运行

首先,以生产模式或者开发者模式启动你的本地 Kestrel 服务。

然后,等待大约 5 秒钟以防出现网络连接异常。

现在可以运行向队列中添加数据的程序,并启动 Storm 拓扑。程序启动的顺序并不重要。

如果你以 TOPOLOGY_DEBUG 模式运行拓扑你会观察到拓扑中 tuple 发送的细节信息。 

时间: 2024-11-03 02:23:59

Apache Storm 官方文档 —— Storm 与 Kestrel的相关文章

Apache Storm 官方文档 —— Storm 集群安装配置

原文链接    译者:魏勇 本文详细介绍了 Storm 集群的安装配置方法.如果需要在 AWS 上安装 Storm,你应该先了解一下 storm-deploy 项目.storm-deploy 可以自动完成 E2 上 Storm 集群的准备.配置.安装的全部过程,同时还设置好了 Ganglia,方便监控 CPU.磁盘以及网络的使用信息. 如果你在使用 Storm 集群时遇到问题,请先查看"问题与解决"一文中是否已有相应的解决方案.如果检索不到有效的解决方法,请向社区的邮件列表发送关于问题

Apache Storm 官方文档中文版

原文链接    译者:魏勇 About 本项目是 Apache Storm 官方文档的中文翻译版,致力于为有实时流计算项目需求和对 Apache Storm 感兴趣的同学提供有价值的中文资料,希望能够对大家的工作和学习有所帮助. 虽然 Storm 的正式推出已经有好几个年头了,发行版也已经到了 0.10.x,但是目前网络上靠谱的学习资料仍然不多,很多比较有价值的资料都过时了(甚至官方网站自己的资料都没有及时更新,这大概也是发展太快的社区的通病),而较新的资料大多比较零碎,在关键内容的描述上也有些

Apache Storm 官方文档 —— 内部技术实现

这部分的 wiki 是为了说明 Storm 是怎样实现的.在阅读本章之前你需要先了解怎样使用 Storm. 代码库架构 拓扑的生命周期1 消息传递的实现1 Ack 框架的实现 Metrics 事务型拓扑的工作机制1 单元测试2 时间模拟 完整的拓扑 集群跟踪 说明 1 该文内容已过期.2 该文官方文档暂未提供. 转载自 并发编程网 - ifeve.com

Apache Storm 官方文档 —— 源码组织结构

原文链接    译者:魏勇 Strom 的代码有三个层次: 第一,Storm 在一开始就是按照兼容多语言的目的来设计的.Nimbus 是一个 Thrift 服务,拓扑也被定义为 Thrift 架构.Thrift 的使用使得 Storm 可以用于任何一种语言. 第二,所有的 Storm 接口都设计为 Java 接口.所以,尽管 Storm 核心代码中有大量的 Clojure 实现,所有的访问都必须经过 Java API.这就意味着 Storm 的每个特性都可以通过 Java 来实现. 第三,Sto

Apache Storm 官方文档 —— 使用 Maven 构建 Storm 应用

在开发拓扑的时候,你需要在 classpath 中包含 Storm 的相关 jar 包.你可以将各个 jar 包直接包含到你的项目的 classpath 中,也可以使用 Maven 将 Storm 添加到依赖项中.Storm 已经集成到 Maven 的中心仓库中.你可以在项目的 pom.xml 中添加以下依赖来将 Storm 包含进项目中: <dependency> <groupId>org.apache.storm</groupId> <artifactId&g

Apache Storm 官方文档 —— 消息的可靠性保障

原文链接    译者:魏勇 Storm 能够保证每一个由 Spout 发送的消息都能够得到完整地处理.本文详细解释了 Storm 如何实现这种保障机制,以及作为用户如何使用好 Storm 的可靠性机制. 消息的"完整性处理"是什么意思 一个从 spout 中发送出的 tuple 会产生上千个基于它创建的 tuples.例如,有这样一个 word-count 拓扑: TopologyBuilder builder = new TopologyBuilder(); builder.setS

Apache Storm 官方文档 —— FAQ

Storm 最佳实践 关于配置 Storm + Trident 的建议 worker 的数量最好是服务器数量的倍数:topology 的总并发度(parallelism)最好是 worker 数量的倍数:Kafka 的分区数(partitions)最好是 Spout(特指 KafkaSpout)并发度的倍数 在每个机器(supervisor)上每个拓扑应用只配置一个 worker 在拓扑最开始运行的时候设置使用较少的大聚合器,并且最好是每个 worker 进程分配一个 使用独立的调度器(sche

Apache Storm 官方文档 —— Trident 教程

Trident 是 Storm 的一种高度抽象的实时计算模型,它可以将高吞吐量(每秒百万级)数据输入.有状态的流式处理与低延时的分布式查询无缝结合起来.如果你了解 Pig 或者 Cascading 这样的高级批处理工具,你就会发现他们和 Trident 的概念非常相似.Trident 同样有联结(join).聚合(aggregation).分组(grouping).函数(function)以及过滤器(filter)这些功能.Trident 为数据库或者其他持久化存储上层的状态化.增量式处理提供了

Apache Storm 官方文档 —— 序列化

本文阐述了 Storm 0.6.0 以上版本的序列化机制.在低于 0.6.0 版本的 Storm 中使用了另一种序列化系统,详细信息可以参考 Serialization (prior to 0.6.0) 一文. Storm 中的 tuple 可以包含任何类型的对象.由于 Storm 是一个分布式系统,所以在不同的任务之间传递消息时 Storm 必须知道怎样序列化.反序列化消息对象. Storm 使用 Kryo 对对象进行序列化.Kryo 是一个生成小序列的灵活.快速的序列化库. Storm 本身