Storm starter - Overview

Storm的starter例子, 都给的很有诚意, 不光是例子, 而是可以直接使用在实际的场景里面. 
并且提高一些很有用的tool, 比如SlidingWindowCounter, TimeCacheMap 
所以starter可以说是提高了基于storm编程的框架, 值得认真研究一下...

 

ExclamationTopology, 基本的Topology

没有什么特别的地方, 标准的例子

/**
 * This is a basic example of a Storm topology.
 */
public class ExclamationTopology {

    public static class ExclamationBolt extends BaseRichBolt {
        OutputCollector _collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("word", new TestWordSpout(), 10);
        builder.setBolt("exclaim1", new ExclamationBolt(), 3)
                .shuffleGrouping("word");
        builder.setBolt("exclaim2", new ExclamationBolt(), 2)
                .shuffleGrouping("exclaim1");

        Config conf = new Config();
        conf.setDebug(true);

        if(args!=null && args.length > 0) {
            conf.setNumWorkers(3);

            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}

 

RollingTopWords

实现了TopN和滑动窗口功能 
这个例子的Bolt实现的很有指导意义, Storm starter - RollingTopWords

 

SingleJoinExample

通过TimeCacheMap, 实现基于memory的join, Storm starter - SingleJoinExample

 

BasicDRPCTopology, ReachTopology

关于DRPC的例子, 参考Twitter Storm – DRPC

 

TransactionalGlobalCount, TransactionalWords

Transactional Topology, Storm - Transactional-topologies

TransactionalGlobalCount比较简单, 看看TransactionalWords 
在对word计数的基础上, 加上word count分布统计信息

public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();

使用Count_Database来记录word的计数 
使用Bucket_Database来记录word计数的分布, 比如, 出现0~9次的word有多少, 10~20的word有多少 


public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter

对于KeyedCountUpdater和前面的简单例子没有啥大区别, 在execute时对word进行count, 在finishBatch时, 直接commit到Count_Database 
输出, new Fields("id", "key", "count", "prev-count"), 其他都好理解, 为啥需要prev-count? 因为在更新Bucket_Database, 需要知道该word的bucket是否发生迁移, 所以必须知道之前的count

 

Bucketize, 根据count/BUCKET_SIZE, 算出应该属于哪个bucket 
如果新的word, 直接在某bucket +1 
如果word的bucket发生变化, 在新的bucket +1, 旧的bucket –1 
如果没有变化, 不需要输出

    public static class Bucketize extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
            int curr = tuple.getInteger(2);
            Integer prev = tuple.getInteger(3);

            int currBucket = curr / BUCKET_SIZE;
            Integer prevBucket = null;
            if(prev!=null) {
                prevBucket = prev / BUCKET_SIZE;
            }

            if(prevBucket==null) {
                collector.emit(new Values(attempt, currBucket, 1));
            } else if(currBucket != prevBucket) {
                collector.emit(new Values(attempt, currBucket, 1));
                collector.emit(new Values(attempt, prevBucket, -1));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("attempt", "bucket", "delta"));
        }
    }

BucketCountUpdater, 也就是将上面的bucket的更新, 更新到Bucket_Database 

Topology定义如下,

MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
builder.setBolt("count", new KeyedCountUpdater(), 5)
        .fieldsGrouping("spout", new Fields("word"));
builder.setBolt("bucketize", new Bucketize())
        .noneGrouping("count");
builder.setBolt("buckets", new BucketCountUpdater(), 5)
        .fieldsGrouping("bucketize", new Fields("bucket"));

 

WordCountTopology, 多语言的支持

Storm 多语言支持

分别使用ShellBolt和BaseBasicBolt来声明使用python和Java实现的Blot

    public static class SplitSentence extends ShellBolt implements IRichBolt {

        public SplitSentence() {
            super("python", "splitsentence.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }  

    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if(count==null) count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

在定义Topology的时候, 可以直接将ShellBolt和BaseBasicBolt混合使用, 非常方便

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new RandomSentenceSpout(), 5);

        builder.setBolt("split", new SplitSentence(), 8)
                 .shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), 12)
                 .fieldsGrouping("split", new Fields("word"));

本文章摘自博客园,原文发布日期:2013-05-24
时间: 2024-09-20 01:16:54

Storm starter - Overview的相关文章

Storm starter - SingleJoinExample

Storm常见模式--流聚合 Topology 1.定义两个spout, 分别是genderSpout, ageSpout    Fields, ("id", "gender"), ("id", "age"), 最终join的结果应该是("id", "gender", "age") 2. 在设置SingleJoinBolt需要将outFields作为参数, 即告诉bo

Storm starter - RollingTopWords

计算top N words的topology, 用于比如trending topics or trending images on Twitter. 实现了滑动窗口计数和TopN排序, 比较有意思, 具体分析一下代码  Topology 这是一个稍微复杂些的topology, 主要体现在使用不同的grouping方式, fieldsGrouping和globalGrouping String spoutId = "wordGenerator"; String counterId = &

Twitter Storm

第 121 章 Twitter Storm 目录 121.1. 单机版121.2. lein 安装 121.1. 单机版 操作系统环境:Ubuntu 13.04 KVM虚拟机 安装 storm 涉及到安装以下包:Python.zookeeper.zeromq.jzmq.storm 过程 121.1. Ubuntu + Storm 单机环境安装 安装 zookeeper $ sudo apt-get install zookeeper zookeeper-bin zookeeperd 安装 zer

如何在eclipse调试storm程序

 一.介绍       storm提供了两种运行模式:本地模式和分布式模式.本地模式针对开发调试storm topologies非常有用.       Storm has two modes of operation: local mode and distributed mode. In local mode, Storm executes completely in process by simulating worker nodes with threads. Local mode is

Docker集群轻松部署Apache Storm

Apache Storm是一个非常常用的实时流计算框架.最近有客户来咨询如何在Docker中运行Apache Storm的问题.我之前读过一篇文章介绍Apache Storm在Docker环境的部署,The Joy Of Deploying Apache Storm On Docker Swarm.文章写的很好,但是整个过程需要从手工构建Docker集群环境开始,再一步步把Storm配置起来,虽然作者提到整个过程是"a real joy",估计绝大多数用户依然会望而生畏. 利用Dock

Apache Storm技术实战

 WordCountTopology "源码走读系列"从代码层面分析了storm的具体实现,接下来通过具体的实例来说明storm的使用.因为目前storm已经正式迁移到Apache,文章系列也由twitter storm转为apache storm. WordCountTopology 使用storm来统计文件中的每个单词的出现次数. 通过该例子来说明tuple发送时的几个要素 source component   发送源 destination component 接收者 strea

CentOS 6.4单机环境下安装配置Storm

Storm是一个分布式的.高容错的实时计算系统,在实时性要求比较强的应用场景下,可以用它来处理海量数据.我们尝试着搭建Storm平台,来实现实时计算.下面,我们在CentOS 6.4上安装配置Storm系统. 安装配置 安装配置过程,按照如下步骤进行: 1.安装配置sunjdk 下载sunjdk,并安装Java运行环境: 1 wget http://download.oracle.com/otn/java/jdk/6u45-b06/jdk-6u45-linux-x64.bin 2 chmod +

twitter storm源码走读(二)

topology提交过程分析 概要 storm cluster可以想像成为一个工厂,nimbus主要负责从外部接收订单和任务分配.除了从外部接单,nimbus还要将这些外部订单转换成为内部工作分配,这个时候nimbus充当了调度室的角色.supervisor作为中层干部,职责就是生产车间的主任,他的日常工作就是时刻等待着调度到给他下达新的工作.作为车间主任,supervisor领到的活是不用自己亲力亲为去作的,他手下有着一班的普通工人.supervisor对这些工人只会喊两句话,开工,收工.注意

Storm 简介

https://github.com/nathanmarz/storm/wiki/Documentation   安装和配置 Storm的安装比较简单, 下载storm的release版本, 解压, 并且把bin/目录加到环境变量PATH里面去, 就ok了. 参考配置storm开发环境  当然为了运行Storm, 需要装一些其他的依赖的包, 可以参考Twitter Storm 安装实战 Storm支持单机调试模式, 所以现在如果你已经有包含topology的jar包, 就可以直接运行单机模式来进