Storm starter - SingleJoinExample

Storm常见模式——流聚合

Topology

1.定义两个spout, 分别是genderSpout, ageSpout 
  Fields, ("id", "gender"), ("id", "age"), 最终join的结果应该是("id", "gender", "age")

2. 在设置SingleJoinBolt需要将outFields作为参数, 即告诉bolt, join完的结果应该包含哪些fields 
   并且对于两个spout都是以Fields("id")进行fieldsGrouping, 保证相同id都会发到同一个task

public class SingleJoinExample {
    public static void main(String[] args) {
        FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
        FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("gender", genderSpout);
        builder.setSpout("age", ageSpout);
        builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age")))
                .fieldsGrouping("gender", new Fields("id"))
                .fieldsGrouping("age", new Fields("id"));
}

SingleJoinBolt

由于不能保证bolt可以同时收到某个id的所有tuple, 所以必须把收到的tuple都先在memory里面cache, 至到收到某id的所有的tuples, 再做join. 
做完join后, 这些tuple就可以从cache里面删除, 但是如果某id的某些tuple丢失, 就会导致该id的其他tuples被一直cache. 
解决这个问题, 对cache数据设置timeout, 过期后就删除, 并发送这些tuples的fail通知.

可见这个场景, 使用TimeCacheMap正合适,

TimeCacheMap<List<Object>, , Map,>

List<Object>, 被join的field, 对于上面的例子就是"id”, 之所以是List, 应该是为了支持多fields join 
Map<GlobalStreamId, Tuple>,记录tuple和stream的关系

对于这个例子, 从TimeCacheMap的bucket里面取出下面两个k,v, 然后进行join 
{id, {agestream, (id, age)}} 
{id, {genderstream, (id, gender)}}

 

1. prepare 
一般的prepare的逻辑都很简单, 而这里确很复杂... 
a, 设置Timeout和ExpireCallback 
timeout 设的是, Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 默认是30s, 这个可以根据场景自己调整 
应该设法保证不同spout中tuple的发送顺序, 以保证相同id的tuple以较短时间间隔被收到, 比如这个例子应该按id排序然后emit 
否则如果出现, ("id", "gender")被第一个emit, 而 ("id", "age")被最后一个emit, 会导致不断的timeout 
设置ExpireCallback, 对于所有timeout的tuples, 发送fail通知

    private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
        @Override
        public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
            for(Tuple tuple: tuples.values()) {
                _collector.fail(tuple);
            }
        }
    }

b. 找出_idFields(哪些field是相同的, 可以用作join) 和_fieldLocations (outfield和spout stream的关系, 比如gender属于genderstream) 
通过context.getThisSources()取出spout sources列表, 并通过getComponentOutputFields取到fields列表 
_idFields, 逻辑很简单, 每次都拿新的fields和idFields做retainAll(取出set共同部分), 最终会得到所有spout fields的相同部分 
_fieldLocations, 拿_outFields和spout fields进行匹配, 找到后记录下关系

其实, 我觉得这部分准备工作, 在调用的时候用参数指明就可以了, 犯不着那么麻烦的来做 
比如参数变为("id", {"gender", genderstream}, {"age", agestream})

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _fieldLocations = new HashMap<String, GlobalStreamId>();
        _collector = collector;
        int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
        _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
        _numSources = context.getThisSources().size();
        Set<String> idFields = null;
        for(GlobalStreamId source: context.getThisSources().keySet()) {
            Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
            Set<String> setFields = new HashSet<String>(fields.toList());
            if(idFields==null) idFields = setFields;
            else idFields.retainAll(setFields);

            for(String outfield: _outFields) {
                for(String sourcefield: fields) {
                    if(outfield.equals(sourcefield)) {
                        _fieldLocations.put(outfield, source);
                    }
                }
            }
        }
        _idFields = new Fields(new ArrayList<String>(idFields));

        if(_fieldLocations.size()!=_outFields.size()) {
            throw new RuntimeException("Cannot find all outfields among sources");
        }
    }

2, execute

a, 从tuple中取出_idFields和streamid  
   如果在_pending(TimeCacheMap)中没有此_idFields, 为这个_idFields创新新的hashmap并put到bucket 
b, 取出该_idFields所对应的所有Map<GlobalStreamId, Tuple> parts, 并检测当前收到的是否是无效tuple(从同一个stream emit的具有相同id的tuple) 
   将新的tuple, put到该_idFields所对应的map. parts.put(streamId, tuple); 
c, 判断如果parts的size等于spout sources的数目, 对于这个例子为2, 意思是当从genderstream和agestream过来的tuple都已经收到时 
       从_pending(TimeCacheMap)删除该_idFields的cache数据, 因为已经可以join, 不需要继续等待了 
       并根据_outFields以及_fieldLocations, 去各个stream的tuple中取出值 
       最终emit结果, (((id, age), (id, gender)),            (age, gender)) 
                       ArrayList<Tuple>(parts.values()),   joinResult 
       Ack所有的tuple

    @Override
    public void execute(Tuple tuple) {
        List<Object> id = tuple.select(_idFields);
        GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
        if(!_pending.containsKey(id)) {
            _pending.put(id, new HashMap<GlobalStreamId, Tuple>());
        }
        Map<GlobalStreamId, Tuple> parts = _pending.get(id);
        if(parts.containsKey(streamId)) throw new RuntimeException("Received same side of single join twice");
        parts.put(streamId, tuple);
        if(parts.size()==_numSources) {
            _pending.remove(id);
            List<Object> joinResult = new ArrayList<Object>();
            for(String outField: _outFields) {
                GlobalStreamId loc = _fieldLocations.get(outField);
                joinResult.add(parts.get(loc).getValueByField(outField));
            }
            _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);

            for(Tuple part: parts.values()) {
                _collector.ack(part);
            }
        }
    }

 

TimeCacheMap

Storm常见模式——TimeCacheMap

解决什么问题? 
常常需要在memory里面cache key-value, 比如实现快速查找表 
但是memeory是有限的, 所以希望只保留最新的cache的, 过期的key-value可以被删除. 所以TimeCacheMap就是用来解决这个问题的, 在一定time内cache map(kv set)

1. 构造参数

TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback)

首先需要expirationSecs, 表示多久过期 
然后, numBuckets, 表示时间粒度, 比如expirationSecs = 60s, 而numBuckets=10, 那么一个bucket就代表6s的时间窗, 并且6s会发生一次过期数据删除 
最后, ExpiredCallback<K, V> callback, 当发生超时的时候, 需要对超时的K,V做些操作的话, 可以定义这个callback, 比如发送fail通知

2. 数据成员

核心结构, 使用linkedlist来实现bucket list, 用HashMap<K, V>来实现每个bucket

private LinkedList<HashMap<K, V>> _buckets; 

辅助成员, lock对象和定期的cleaner thread

private final Object _lock = new Object();
private Thread _cleaner;

3. 构造函数

其实核心就是启动_cleaner Daemon线程 
_cleaner的逻辑其实很简单, 
定期的把最后一个bucket删除, 在bucket list开头加上新的bucket, 并且如果有定义callback, 对所有timeout的kv调用callback 
同时这里考虑线程安全, 会对操作过程加锁synchronized(_lock)

唯一需要讨论的是, sleepTime 
即如果保证数据在定义的expirationSecs时间后, 被删除 
定义, sleepTime = expirationMillis / (numBuckets-1) 
a, 如果cleaner刚刚完成删除last, 添加first bucket, 这时put的K,V的过期时间为, 
   expirationSecs / (numBuckets-1) * numBuckets = expirationSecs * (1 + 1 / (numBuckets-1)) 
   需要等待完整的numBuckets个sleepTime, 所以时间会略大于expirationSecs

b, 如果反之, 刚完成put k,v操作后, cleaner开始clean操作, 那么k,v的过期时间为, 
   expirationSecs / (numBuckets-1) * numBuckets - expirationSecs / (numBuckets-1) = expirationSecs 
   这种case会比a少等一个sleepTime, 时间恰恰是expirationSecs

所以这个方法保证, 数据会在[b,a]的时间区间内被删除

    public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
        if(numBuckets<2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        _buckets = new LinkedList<HashMap<K, V>>();
        for(int i=0; i<numBuckets; i++) {
            _buckets.add(new HashMap<K, V>());
        }

        _callback = callback;
        final long expirationMillis = expirationSecs * 1000L;
        final long sleepTime = expirationMillis / (numBuckets-1);
        _cleaner = new Thread(new Runnable() {
            public void run() {
                try {
                    while(true) {
                        Map<K, V> dead = null;
                        Time.sleep(sleepTime);
                        synchronized(_lock) {
                            dead = _buckets.removeLast();
                            _buckets.addFirst(new HashMap<K, V>());
                        }
                        if(_callback!=null) {
                            for(Entry<K, V> entry: dead.entrySet()) {
                                _callback.expire(entry.getKey(), entry.getValue());
                            }
                        }
                    }
                } catch (InterruptedException ex) {

                }
            }
        });
        _cleaner.setDaemon(true);
        _cleaner.start();
    }

4. 其他操作

首先, 所有操作都会使用synchronized(_lock)保证线程互斥 
其次, 所有操作的复杂度都是O(numBuckets), 因为每个item都是hashmap, 都是O(1)操作

 

最重要的是Put, 只会将新的k,v, put到第一个(即最新的)bucket, 并且将之前旧bucket里面的相同key的cache数据删除

public void put(K key, V value) {
        synchronized(_lock) {
            Iterator<HashMap<K, V>> it = _buckets.iterator();
            HashMap<K, V> bucket = it.next();
            bucket.put(key, value);
            while(it.hasNext()) {
                bucket = it.next();
                bucket.remove(key);
            }
        }
    }

其他还支持如下操作,

public boolean containsKey(K key) 
public V get(K key) 
public Object remove(K key) 
public int size() //将所有bucket的HashMap的size累加

本文章摘自博客园,原文发布日期:2013-05-24

时间: 2024-08-01 11:38:59

Storm starter - SingleJoinExample的相关文章

Storm starter - Overview

Storm的starter例子, 都给的很有诚意, 不光是例子, 而是可以直接使用在实际的场景里面.  并且提高一些很有用的tool, 比如SlidingWindowCounter, TimeCacheMap  所以starter可以说是提高了基于storm编程的框架, 值得认真研究一下...   ExclamationTopology, 基本的Topology 没有什么特别的地方, 标准的例子 /** * This is a basic example of a Storm topology.

Storm starter - RollingTopWords

计算top N words的topology, 用于比如trending topics or trending images on Twitter. 实现了滑动窗口计数和TopN排序, 比较有意思, 具体分析一下代码  Topology 这是一个稍微复杂些的topology, 主要体现在使用不同的grouping方式, fieldsGrouping和globalGrouping String spoutId = "wordGenerator"; String counterId = &

Storm-源码分析-Topology Submit-Executor-mk-threads

对于executor thread是整个storm最为核心的代码, 因为在这个thread里面真正完成了大部分工作, 而其他的如supervisor,worker都是封装调用. 对于executor的mk-threads, 是通过mutilmethods对spout和bolt分别定义不同的逻辑 1. Spout Thread (defmethod mk-threads :spout [executor-data task-datas] (let [{:keys [storm-conf compo

Twitter Storm

第 121 章 Twitter Storm 目录 121.1. 单机版121.2. lein 安装 121.1. 单机版 操作系统环境:Ubuntu 13.04 KVM虚拟机 安装 storm 涉及到安装以下包:Python.zookeeper.zeromq.jzmq.storm 过程 121.1. Ubuntu + Storm 单机环境安装 安装 zookeeper $ sudo apt-get install zookeeper zookeeper-bin zookeeperd 安装 zer

如何在eclipse调试storm程序

 一.介绍       storm提供了两种运行模式:本地模式和分布式模式.本地模式针对开发调试storm topologies非常有用.       Storm has two modes of operation: local mode and distributed mode. In local mode, Storm executes completely in process by simulating worker nodes with threads. Local mode is

Docker集群轻松部署Apache Storm

Apache Storm是一个非常常用的实时流计算框架.最近有客户来咨询如何在Docker中运行Apache Storm的问题.我之前读过一篇文章介绍Apache Storm在Docker环境的部署,The Joy Of Deploying Apache Storm On Docker Swarm.文章写的很好,但是整个过程需要从手工构建Docker集群环境开始,再一步步把Storm配置起来,虽然作者提到整个过程是"a real joy",估计绝大多数用户依然会望而生畏. 利用Dock

Apache Storm技术实战

 WordCountTopology "源码走读系列"从代码层面分析了storm的具体实现,接下来通过具体的实例来说明storm的使用.因为目前storm已经正式迁移到Apache,文章系列也由twitter storm转为apache storm. WordCountTopology 使用storm来统计文件中的每个单词的出现次数. 通过该例子来说明tuple发送时的几个要素 source component   发送源 destination component 接收者 strea

CentOS 6.4单机环境下安装配置Storm

Storm是一个分布式的.高容错的实时计算系统,在实时性要求比较强的应用场景下,可以用它来处理海量数据.我们尝试着搭建Storm平台,来实现实时计算.下面,我们在CentOS 6.4上安装配置Storm系统. 安装配置 安装配置过程,按照如下步骤进行: 1.安装配置sunjdk 下载sunjdk,并安装Java运行环境: 1 wget http://download.oracle.com/otn/java/jdk/6u45-b06/jdk-6u45-linux-x64.bin 2 chmod +

twitter storm源码走读(二)

topology提交过程分析 概要 storm cluster可以想像成为一个工厂,nimbus主要负责从外部接收订单和任务分配.除了从外部接单,nimbus还要将这些外部订单转换成为内部工作分配,这个时候nimbus充当了调度室的角色.supervisor作为中层干部,职责就是生产车间的主任,他的日常工作就是时刻等待着调度到给他下达新的工作.作为车间主任,supervisor领到的活是不用自己亲力亲为去作的,他手下有着一班的普通工人.supervisor对这些工人只会喊两句话,开工,收工.注意