Storm Source Code Analysis: Messaging (backtype.storm.messaging)

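The package starts by defining two interfaces and one class.
TaskMessage is easy to understand: it abstracts the format of a Storm message.
IContext, as its comment says, defines a messaging plugin, that is, the channel through which messages are sent. Storm designs this part to be replaceable.
By default Storm ships two plugins, local and ZMQ, and you can of course implement more.
local is presumably used in local mode, while ZMQ is used in distributed mode.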

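The IContext interface is mainly used to create IConnection objects and reflects the management of sockets: bind defines a server-side connection and connect defines a client-side connection.
The IConnection interface defines the logic that actually sends and receives messages.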

Finally, TransportFactory reads the Config.STORM_MESSAGING_TRANSPORT setting and uses Java reflection to dynamically create the configured type of context.

The IContext interface

/**
 * This interface needs to be implemented for messaging plugin.
 *
 * Messaging plugin is specified via Storm config parameter, storm.messaging.transport.
 *
 * A messaging plugin should have a default constructor and implements IContext interface.
 * Upon construction, we will invoke IContext::prepare(storm_conf) to enable context to be configured
 * according to storm configuration.
 */
public interface IContext {
    /**
     * This method is invoked at the startup of messaging plugin
     * @param storm_conf storm configuration
     */
    public void prepare(Map storm_conf);

    /**
     * This method is invoked when a worker is unload a messaging plugin
     */
    public void term();

    /**
     * This method establishes a server side connection
     * @param storm_id topology ID
     * @param port port #
     * @return server side connection
     */
    public IConnection bind(String storm_id, int port);

    /**
     * This method establish a client side connection to a remote server
     * @param storm_id topology ID
     * @param host remote host
     * @param port remote port
     * @return client side connection
     */
    public IConnection connect(String storm_id, String host, int port);
};

The IConnection interface

public interface IConnection {
    /**
     * receive a message (consists taskId and payload)
     * @param flags 0: block, 1: non-block
     * @return
     */
    public TaskMessage recv(int flags);
    /**
     * send a message with taskId and payload
     * @param taskId task ID
     * @param payload
     */
    public void send(int taskId,  byte[] payload);

    /**
     * close this connection
     */
    public void close();
}

TaskMessage

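As its name suggests, TaskMessage contains a task field and a message field, which together say which task a message is addressed to.
It also defines serialization and deserialization functions.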

public class TaskMessage {
    private int _task;
    private byte[] _message;

    public TaskMessage(int task, byte[] message) {
        _task = task;
        _message = message;
    }

    public int task() {
        return _task;
    }

    public byte[] message() {
        return _message;
    }

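    public ByteBuffer serialize() {
        // Layout: 2-byte task id followed by the raw payload bytes.
        ByteBuffer bb = ByteBuffer.allocate(_message.length+2);
        // The (short) cast keeps only the low 16 bits of the task id.
        bb.putShort((short)_task);
        bb.put(_message);
        // The buffer is returned unflipped, with its position at the end.
        return bb;
    }

    public void deserialize(ByteBuffer packet) {
        if (packet==null) return;
        // Reads from the current position, so the packet must start at the task id.
        _task = packet.getShort();
        _message = new byte[packet.limit()-2];
        packet.get(_message);
    }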
}
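
The wire layout above (a 2-byte task id followed by the payload) can be checked with a small round trip. This is a minimal sketch, not Storm code: the class TaskMessageRoundTrip and its encode, decodeTask and decodePayload helpers are hypothetical names that repeat the same ByteBuffer calls as TaskMessage. It assumes the caller flips the buffer before reading, since serialize leaves the position at the end.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class; it only mirrors the ByteBuffer calls used by TaskMessage.
public class TaskMessageRoundTrip {
    // Same layout as TaskMessage.serialize(): 2-byte task id, then the payload.
    public static ByteBuffer encode(int task, byte[] payload) {
        ByteBuffer bb = ByteBuffer.allocate(payload.length + 2);
        bb.putShort((short) task);
        bb.put(payload);
        return bb; // position is at the end of the buffer, as in the original
    }

    // Same first read as TaskMessage.deserialize(): expects position 0.
    public static int decodeTask(ByteBuffer packet) {
        return packet.getShort();
    }

    // Same second read: everything after the 2-byte header is the payload.
    public static byte[] decodePayload(ByteBuffer packet) {
        byte[] payload = new byte[packet.limit() - 2];
        packet.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        ByteBuffer bb = encode(7, "hello".getBytes(StandardCharsets.UTF_8));
        bb.flip(); // rewind before reading; encode() leaves the position at the end
        int task = decodeTask(bb);
        String text = new String(decodePayload(bb), StandardCharsets.UTF_8);
        System.out.println(task + " " + text);

        // A task id above Short.MAX_VALUE does not survive the (short) cast.
        ByteBuffer big = encode(40000, new byte[0]);
        big.flip();
        System.out.println(decodeTask(big)); // prints -25536
    }
}
```

The second case shows a consequence of the 2-byte header: task ids are only preserved while they fit in a signed short.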

TransportFactory

public class TransportFactory {
    public static IContext makeContext(Map storm_conf) {
        //get factory class name
        String transport_plugin_klassName = (String)storm_conf.get(Config.STORM_MESSAGING_TRANSPORT);
        LOG.info("Storm peer transport plugin:"+transport_plugin_klassName);

        IContext transport = null;
        try {
            //create a factory class
            Class klass = Class.forName(transport_plugin_klassName);
            //obtain a context object
            Object obj = klass.newInstance();
            if (obj instanceof IContext) {
                //case 1: plugin is a IContext class
                transport = (IContext)obj;
                //initialize with storm configuration
                transport.prepare(storm_conf);
            } else {
                //case 2: Non-IContext plugin must have a makeContext(storm_conf) method that returns IContext object
                Method method = klass.getMethod("makeContext", Map.class);
                LOG.debug("object:"+obj+" method:"+method);
                transport = (IContext) method.invoke(obj, storm_conf);
            }
        } catch(Exception e) {
            throw new RuntimeException("Fail to construct messaging plugin from plugin "+transport_plugin_klassName, e);
        }
        return transport;
    }
}
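
The two cases handled by makeContext can be tried out without a Storm installation. The following is a sketch under stated assumptions: Context stands in for IContext, DirectPlugin and FactoryPlugin are hypothetical plugins, and the config key "transport.class" is made up for the example (Storm itself uses Config.STORM_MESSAGING_TRANSPORT). It also uses getDeclaredConstructor().newInstance() in place of the older Class.newInstance().

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class PluginLoaderSketch {
    /** Hypothetical stand-in for IContext; only prepare() is modeled. */
    public interface Context {
        void prepare(Map<String, Object> conf);
    }

    /** Case 1: the plugin class itself implements the context interface. */
    public static class DirectPlugin implements Context {
        public Map<String, Object> conf;

        @Override
        public void prepare(Map<String, Object> conf) {
            this.conf = conf;
        }
    }

    /** Case 2: the plugin class is a factory exposing makeContext(Map). */
    public static class FactoryPlugin {
        public Context makeContext(Map<String, Object> conf) {
            DirectPlugin ctx = new DirectPlugin();
            ctx.prepare(conf);
            return ctx;
        }
    }

    public static Context makeContext(Map<String, Object> conf) {
        // "transport.class" is an assumed key used only in this sketch.
        String className = (String) conf.get("transport.class");
        try {
            Class<?> klass = Class.forName(className);
            Object obj = klass.getDeclaredConstructor().newInstance();
            if (obj instanceof Context) {
                Context ctx = (Context) obj;
                ctx.prepare(conf); // configure the plugin right after construction
                return ctx;
            }
            // Otherwise the class must offer makeContext(Map) returning a Context.
            Method method = klass.getMethod("makeContext", Map.class);
            return (Context) method.invoke(obj, conf);
        } catch (Exception e) {
            throw new RuntimeException("Failed to construct plugin " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("transport.class", DirectPlugin.class.getName());
        System.out.println(makeContext(conf).getClass().getSimpleName());
        conf.put("transport.class", FactoryPlugin.class.getName());
        System.out.println(makeContext(conf).getClass().getSimpleName());
    }
}
```

Because the class name is only a string in the configuration, swapping the transport needs no change to the calling code, which is the point of the plugin design.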

 

Now let's look at the local and ZMQ plugin implementations in detail.

Local

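This is the message plugin used in local mode.
The implementation is fairly simple: everything is built on queues-map, and each queue is a plain LinkedBlockingQueue. Since local is used for testing, efficiency is not a concern.
Every queue, whether used for receiving or sending, is added to queues-map through add-queue!, with storm-id and port joined by "-" as the key.
All recv and send calls are therefore just queue operations.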

(defn add-queue! [queues-map lock storm-id port]
  (let [id (str storm-id "-" port)]
    (locking lock
      (when-not (contains? @queues-map id)
        (swap! queues-map assoc id (LinkedBlockingQueue.))))
    (@queues-map id)))

(deftype LocalConnection [storm-id port queues-map lock queue]
  IConnection
  (^TaskMessage recv [this ^int flags]
    (when-not queue
      (throw (IllegalArgumentException. "Cannot receive on this socket")))
    (if (= flags 1)
      (.poll queue)
      (.take queue)))
  (^void send [this ^int taskId ^bytes payload]
    (let [send-queue (add-queue! queues-map lock storm-id port)]
      (.put send-queue (TaskMessage. taskId payload))
      ))
  (^void close [this]
    ))

(deftype LocalContext [^{:unsynchronized-mutable true} queues-map
                       ^{:unsynchronized-mutable true} lock]
  IContext
  (^void prepare [this ^Map storm-conf]
    (set! queues-map (atom {}))
    (set! lock (Object.)))
  (^IConnection bind [this ^String storm-id ^int port]
    (LocalConnection. storm-id port queues-map lock (add-queue! queues-map lock storm-id port)))
  (^IConnection connect [this ^String storm-id ^String host ^int port]
    (LocalConnection. storm-id port queues-map lock nil))
  (^void term [this]
    ))

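deftype is used here rather than defrecord, meaning the connection and the context do not need map-like behavior.
The IContext implementation also uses mutable fields, which are said to be an advanced feature that is hard to use correctly.
My personal understanding is that deftype, like defrecord, gives no closure effect: only fields (object members) can be accessed by the interface methods at any time, so some cases need a mutable field, such as queues-map here, which is only set in prepare.
Similar cases seen earlier were implemented with reify; this one shows a deftype version.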
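
For readers more comfortable with Java, the queue-map design of the local plugin can be sketched as follows. This is an illustration, not Storm code: LocalTransportSketch, Msg and Connection are hypothetical names, and a ConcurrentHashMap with computeIfAbsent is swapped in for the atom plus lock used by add-queue!. The flags convention (1 means non-blocking) follows the IConnection comment.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

public class LocalTransportSketch {
    /** Hypothetical message holder mirroring TaskMessage's two fields. */
    public static final class Msg {
        public final int task;
        public final byte[] payload;

        public Msg(int task, byte[] payload) {
            this.task = task;
            this.payload = payload;
        }
    }

    // One queue per "stormId-port" key, shared by every connection of this context.
    private final ConcurrentMap<String, BlockingQueue<Msg>> queues = new ConcurrentHashMap<>();

    // Counterpart of add-queue!: create the queue on first use, then reuse it.
    private BlockingQueue<Msg> queueFor(String stormId, int port) {
        return queues.computeIfAbsent(stormId + "-" + port, k -> new LinkedBlockingQueue<>());
    }

    public final class Connection {
        private final String stormId;
        private final int port;
        private final BlockingQueue<Msg> recvQueue; // null for client-side connections

        Connection(String stormId, int port, BlockingQueue<Msg> recvQueue) {
            this.stormId = stormId;
            this.port = port;
            this.recvQueue = recvQueue;
        }

        public Msg recv(int flags) {
            if (recvQueue == null) {
                throw new IllegalArgumentException("Cannot receive on this socket");
            }
            if (flags == 1) {
                return recvQueue.poll(); // non-blocking: null when the queue is empty
            }
            try {
                return recvQueue.take(); // blocking: waits for the next message
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        public void send(int task, byte[] payload) {
            // The queue is unbounded, so add() never rejects an element here.
            queueFor(stormId, port).add(new Msg(task, payload));
        }
    }

    // Server side: owns the receive queue for this storm id and port.
    public Connection bind(String stormId, int port) {
        return new Connection(stormId, port, queueFor(stormId, port));
    }

    // Client side: can only send; the host is ignored because everything is in-process.
    public Connection connect(String stormId, String host, int port) {
        return new Connection(stormId, port, null);
    }

    public static void main(String[] args) {
        LocalTransportSketch ctx = new LocalTransportSketch();
        Connection server = ctx.bind("topo-1", 6700);
        Connection client = ctx.connect("topo-1", "localhost", 6700);
        client.send(3, new byte[] {42});
        Msg m = server.recv(0);
        System.out.println(m.task + " " + m.payload[0]);
    }
}
```

As in the Clojure version, a connection created by connect has no receive queue, so calling recv on it fails, and a message sent before any bind still lands in the queue for that key.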

ZMQ

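ZMQ is billed as the fastest message queue, with performance close to the raw socket API; see http://www.cnblogs.com/yjf512/archive/2012/03/03/2378024.html
In distributed mode, Storm uses ZMQ for communication between processes and between instances.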

(deftype ZMQConnection [socket]
  IConnection
  (^TaskMessage recv [this ^int flags]
    (require 'backtype.storm.messaging.zmq)
    (if-let [packet (mq/recv socket flags)]
      (parse-packet packet)))
  (^void send [this ^int taskId ^bytes payload]
    (require 'backtype.storm.messaging.zmq)
    (mq/send socket (mk-packet taskId payload) ZMQ/NOBLOCK)) ;; TODO: how to do backpressure if doing noblock?... need to only unblock if the target disappears
  (^void close [this]
    (.close socket)))

(deftype ZMQContext [^{:unsynchronized-mutable true} context
                     ^{:unsynchronized-mutable true} linger-ms
                     ^{:unsynchronized-mutable true} hwm
                     ^{:unsynchronized-mutable true} local?]
  IContext
  (^void prepare [this ^Map storm-conf]
    (let [num-threads (storm-conf ZMQ-THREADS)]
      (set! context (mq/context num-threads))
      (set! linger-ms (storm-conf ZMQ-LINGER-MILLIS))
      (set! hwm (storm-conf ZMQ-HWM))
      (set! local? (= (storm-conf STORM-CLUSTER-MODE) "local"))))
  (^IConnection bind [this ^String storm-id ^int port]
    (require 'backtype.storm.messaging.zmq)
    (-> context
      (mq/socket mq/pull)
      (mq/set-hwm hwm)
      (mq/bind (get-bind-zmq-url local? port))
      mk-connection
      ))
  (^IConnection connect [this ^String storm-id ^String host ^int port]
    (require 'backtype.storm.messaging.zmq)
    (-> context
      (mq/socket mq/push)
      (mq/set-hwm hwm)
      (mq/set-linger linger-ms)
      (mq/connect (get-connect-zmq-url local? host port))
      mk-connection))
  (^void term [this]
    (.term context))

  ZMQContextQuery
  (zmq-context [this]
    context))

This article is excerpted from cnblogs (博客园); original publication date: 2013-07-16