Storm-源码分析- Disruptor在storm中的使用

Disruptor 2.0, (http://ifeve.com/disruptor-2-change/)

Disruptor为了更便于使用, 在2.0做了比较大的调整, 比较突出的是更换了几乎所有的概念名

老版本,

新版本,

从左到右的变化如下,

1. Producer –> Publisher 
2. ProducerBarrier被integrate到RingBuffer里面, 叫做PublishPort, 提供publish接口 
3. Entry –> Event 
4, Cursor封装成Sequence, 其实Sequence就是将cursor+pading封装一下 
5. Consumer –> EventProcesser 
6. ConsumerBarrier 变为DependencyBarrier, 或SequenceBarrier

并且对于publisher和EventProcesser, 存在ClaimStrategy和WaitStrategy 
对于publisher的ClaimStrategy, 由于publisher需要先claim到sequencer才能publish: SingleThreadedClaimStrategy, MultiThreadedClaimStrategy, 应该是对于singlethread不需要使用CAS更为高效 
对于EventProcesser的WaitStrategy, 当取不到数据的时候采用什么样的策略进行等待: BlockingWaitStrategy, BusySpinWaitStrategy, SleepingWaitStrategy, YieldingWaitStrategy 
Blocking就是同步加锁, BusySpin就是忙等耗CPU, 都比较低效 
Yielding就是调用thread.yeild(), 把线程的从可执行状态调整成就绪装, 意思我先息下, 你们忙你们先来, 就是把CPU让给其他的线程, 但是yeild并不保证过多久线程被执行, 如果没有其他线程, 可能会被立即执行 
而sleep, 会强制线程休眠指定时间, 然后再重新调度

 

DisruptorQueue.java

    static final Object FLUSH_CACHE = new Object(); //特殊对象, 当consumer取到时, 触发cache queue的flush
    static final Object INTERRUPT = new Object(); //特殊对象, 当consumer取到时, 触发InterruptedException

    RingBuffer<MutableObject> _buffer; //Disruptor的主要的数据结构RingBuffer
    Sequence _consumer; //consumer读取序号
    SequenceBarrier _barrier; //用于consumer监听RingBuffer的序号情况

    // TODO: consider having a threadlocal cache of this variable to speed up reads?
    volatile boolean consumerStartedFlag = false; //标志consumer是否start, 由于需要在change后其他线程可以马上知道, 所以使用volatile
    ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue(); //当consumer没有start的时候, cache event的queue

ConcurrentLinkedQueue, 使用CAS而非lock来实现的线程安全队列, 具体参考(http://blog.sina.com.cn/s/blog_5efa3473010129pj.html)

首先声明一组变量, 部分会在构造函数中被初始化 
最重要的结构就是RingBuffer, 这是个模板类, 这里从ObjectEventFactory()的实现也可以看出来, 初始化的时候在ringbuffer的每个entry上都创建一个MutableObject对象

MutableObject的实现很简单, 这是封装了object o, 为什么要做这层封装? 
为了避免Java GC, 对于RingBuffer一旦初始化好, 上面的所有的MutableObject都不会被释放, 你只是去对object o, set不同的值

_buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
public static class ObjectEventFactory implements EventFactory<MutableObject> {
    @Override
    public MutableObject newInstance() {
        return new MutableObject();
    }
}
public class MutableObject {
    Object o = null;
}

Publish

Publish过程, 可见当前ProducerBarrier已经被集成到RingBuffer里面, 所以直接调用_buffer的接口 
首先调用next, claim序号 
取出序号上的MutableObject, 并将输入obj set 
最后, publish当前序号, 表示consumer可以读取 
当consumer没有start时, 会将obj cache在_cache中, 而不会放到ringbuffer中 (我没有想明白why? 为何要使用低效的链表queue来cache, 而不直接放到ringbuffer里面)

    public void publish(Object obj, boolean block) throws InsufficientCapacityException {
        if(consumerStartedFlag) {
            final long id;
            if(block) {
                id = _buffer.next();
            } else {
                id = _buffer.tryNext(1);
            }
            final MutableObject m = _buffer.get(id);
            m.setObject(obj);
            _buffer.publish(id);
        } else {
            _cache.add(obj);
            if(consumerStartedFlag) flushCache();
        }
    }

Consume

consume的过程, 这里实现的时Batch consume, 即给定Cursor, 会一直consume到该cursor为止

_consumer代表当前已经被consume的序号, 所以从_consumer.get() + 1开始读 
取出MutableObject中的o, 并将MutableObject 清空 
根据o的情况, 3种情况, 
    1. 如果是FLUSH_CACHE对象, 将cache中的event读出调用handler.onEvent 
    2. 如果是INTERRUPT对象, 触发InterruptedException 
    3. 正常情况, 直接调用handler.onEvent处理该o, curr == cursor判断表示batch是否结束, 当读到cursor的时候结束

最终将_consumer置为cursor, 表示已经读到cursor位置

    private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
        for(long curr = _consumer.get() + 1; curr <= cursor; curr++) {
            try {
                MutableObject mo = _buffer.get(curr);
                Object o = mo.o;
                mo.setObject(null);
                if(o==FLUSH_CACHE) {
                    Object c = null;
                    while(true) {
                        c = _cache.poll();
                        if(c==null) break;
                        else handler.onEvent(c, curr, true);
                    }
                } else if(o==INTERRUPT) {
                    throw new InterruptedException("Disruptor processing interrupted");
                } else {
                    handler.onEvent(o, curr, curr == cursor);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        //TODO: only set this if the consumer cursor has changed?
        _consumer.set(cursor);
    }

backtype.storm.disruptor.clj

创建DisruptorQueue, 选用MultiThreadedClaimStrategy和BlockingWaitStrategy

(defnk disruptor-queue [buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
  (DisruptorQueue. ((CLAIM-STRATEGY claim-strategy) buffer-size)
                   (mk-wait-strategy wait-strategy)
                   ))

并封装一系列Java接口 
最重要的工作是, 启动consume-loop 
这里ret是closeover了一个间隔为0的不停执行(consume-batch-when-available queue handler) 的线程, 而consumeBatchWhenAvailable的实现就是不停的sleep并调用consumeBatchToCursor

并且通过consumer-started!通知其他线程consumer已经start

(defnk consume-loop* [^DisruptorQueue queue handler :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
                      :thread-name nil]
  (let [ret (async-loop
              (fn []
                (consume-batch-when-available queue handler)
                0 )
              :kill-fn kill-fn
              :thread-name thread-name
              )]
     (consumer-started! queue)
     ret
     ))

(defmacro consume-loop [queue & handler-args]
  `(let [handler# (handler ~@handler-args)]
     (consume-loop* ~queue handler#)
     ))

看看async-loop实现什么功能? 
返回reify实现的record, 其中closeover了thread 
这个thread主要就是死循环的执行传入的afn, 并且以afn的返回值作为执行间隔

主要功能, 异步的loop, 开启新的线程来执行loop, 而不是在当前主线程, 并且提供了sleep设置

;; afn returns amount of time to sleep
(defnk async-loop [afn
                   :daemon false
                   :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
                   :priority Thread/NORM_PRIORITY
                   :factory? false
                   :start true
                   :thread-name nil]
  (let [thread (Thread.
                (fn []
                  (try-cause
                    (let [afn (if factory? (afn) afn)]
                      (loop []
                        (let [sleep-time (afn)]
                          (when-not (nil? sleep-time)
                            (sleep-secs sleep-time)
                            (recur))
                          )))
                    (catch InterruptedException e
                      (log-message "Async loop interrupted!")
                      )
                    (catch Throwable t
                      (log-error t "Async loop died!")
                      (kill-fn t)
                      ))
                  ))]
    (.setDaemon thread daemon)
    (.setPriority thread priority)
    (when thread-name
      (.setName thread (str (.getName thread) "-" thread-name)))
    (when start
      (.start thread))
    ;; should return object that supports stop, interrupt, join, and waiting?
    (reify SmartThread
      (start [this]
        (.start thread))
      (join [this]
        (.join thread))
      (interrupt [this]
        (.interrupt thread))
      (sleeping? [this]
        (Time/isThreadWaiting thread)
        ))
      ))

 

Storm在Worker中executors线程间通信, 如何使用Disruptor的?

 

Understanding the Internal Message Buffers of Storm, 可以参考

 本文章摘自博客园,原文发布日期:2013-07-10

时间: 2024-09-08 11:49:53

Storm-源码分析- Disruptor在storm中的使用的相关文章

springboot源码分析14-ApplicationContextInitializer原理Springboot中PropertySource注解多环境支持以及原理

摘要:Springboot中PropertySource注解的使用一文中,详细讲解了PropertySource注解的使用,通过PropertySource注解去加载指定的资源文件.然后将加载的属性注入到指定的配置类,@value以及@ConfigurationProperties的使用.但是也遗留一个问题,PropertySource注解貌似是不支持多种环境的动态切换?这个问题该如何解决呢?我们需要从源码中看看他到底是否支持. 首先,我们开始回顾一下上节课说的PropertySource注解的

Yarn源码分析之MapReduce作业中任务Task调度整体流程(一)

        v2版本的MapReduce作业中,作业JOB_SETUP_COMPLETED事件的发生,即作业SETUP阶段完成事件,会触发作业由SETUP状态转换到RUNNING状态,而作业状态转换中涉及作业信息的处理,是由SetupCompletedTransition来完成的,它主要做了四件事:         1.通过设置作业Job的成员变量setupProgress为1,标记作业setup已完成:         2.调度作业Job的Map Task:         3.调度作业的

Jquery源码分析---DOM元素(中)

5.2.2 width&heigth 对于元素的宽度和高度,dom元素提供了 client(clientHeight,clientWidth).offset(offsetHeight,offsetwidth). scroll(srollHeight,srollWidth)三种方式,这三种有什么区别呢? client=content+padding.Offset=content+padding+border.Scroll的宽度和 高度都是没有经过scroll的原始宽度和高度.也就是这个一般会大于现

PgSQL · 源码分析 · PG 优化器中的pathkey与索引在排序时的使用

概要 SQL在PostgreSQL中的处理,是类似于流水线方式的处理,先后由: 词法.语法解析,生成解析树后,将其交给语义解析 语义解析,生成查询树,将其交给Planner Planner根据查询树,生成执行计划,交给执行器 执行器执行完成后返回结果 数据库优化器在生成执行计划的时候,优化器会考虑是否需要使用索引,而使用了索引之后,则会考虑如何利用索引已经排过序的特点,来优化相关的排序,比如ORDER BY / GROUP BY等. 先来看个索引对ORDER BY起作用的例子: postgres

Storm源码结构 (来源Storm Github Wiki)

写在前面 本文译自Storm Github Wiki: Structure of the codebase,有助于深入了解Storm的设计和源码学习.本人也是参照这个进行学习的,觉得在理解Storm设计的过程中起到了重要作用,所以也帖一份放在自己博客里.以下的模块分析里没有包括Storm 0.9.0增加的Netty模块,对应的代码包在Storm Github下的storm-netty文件夹内,内容比较简单,关于这块的release note可以参考Storm 0.9.0 Released Net

Apache Storm源码阅读笔记&amp;OLAP在大数据时代的挑战

 <一>Apache Storm源码阅读笔记 楔子 自从建了Spark交流的QQ群之后,热情加入的同学不少,大家不仅对Spark很热衷对于Storm也是充满好奇.大家都提到一个问题就是有关storm内部实现机理的资料比较少,理解起来非常费劲. 尽管自己也陆续对storm的源码走读发表了一些博文,当时写的时候比较匆忙,有时候衔接的不是太好,此番做了一些整理,主要是针对TridentTopology部分,修改过的内容采用pdf格式发布,方便打印. 文章中有些内容的理解得益于徐明明和fxjwind两

详解SpringMVC中Controller的方法中参数的工作原理[附带源码分析] good

目录 前言 现象 源码分析 HandlerMethodArgumentResolver与HandlerMethodReturnValueHandler接口介绍 HandlerMethodArgumentResolver与HandlerMethodReturnValueHandler接口的具体应用 常用HandlerMethodArgumentResolver介绍 常用HandlerMethodReturnValueHandler介绍 本文开头现象解释以及解决方案 编写自定义的HandlerMet

Hhadoop-2.7.0中HDFS写文件源码分析(二):客户端实现之DFSPacket

一.简介       HDFS在数据传输过程中,针对数据块Block,不是整个block进行传输的,而是将block切分成一个个的数据包进行传输.而DFSPacket就是HDFS数据传输过程中对数据包的抽象. 二.实现       HDFS客户端在往DataNodes节点写数据时,会以数据包packet的形式写入,且每个数据包包含一个包头,n个连续的校验和数据块checksum chunks和n个连续的实际数据块 actual data chunks,每个校验和数据块对应一个实际数据块,被用来做

Hhadoop-2.7.0中HDFS写文件源码分析(二):客户端实现(1)

一.综述       HDFS写文件是整个Hadoop中最为复杂的流程之一,它涉及到HDFS中NameNode.DataNode.DFSClient等众多角色的分工与合作.       首先上一段代码,客户端是如何写文件的: Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Path file = new Path("demo.txt"); FSDataOutputStream