Storm中的可靠性

     我们知道Storm有一个很重要的特性,那就是Storm API能够保证它的一个Tuple能够被完全处理,这一点尤为重要,其实storm中的可靠性是由spout和bolt组件共同完成的,下面就从spout和bolt两个方便给大家介绍一下storm中的可靠性,最后会给出一个实现了可靠性的例子。

1.Spout的可靠性保证

     在Storm中,消息处理可靠性从Spout开始的。storm为了保证数据能正确的被处理, 对于spout产生的每一个tuple,storm都能够进行跟踪,这里面涉及到了ack/fail的处理, 如果一个tuple被处理成功,那么spout便会调用其ack方法,如果失败,则会调用fail方法。而topology中处理tuple的每一个bolt都会通过OutputCollector来告知storm,当前bolt处理是否成功。

     我们知道spout必须能够追踪它发射的所有tuples或其子tuples,并且在这些tuples处理失败时能够重发。那么spout如何追踪tuple呢?storm是通过一个简单的anchor机制来实现的(在下面的bolt可靠性中会讲到)。

     

     如上图所示,实线代表的是spout发射的根tuple,而虚线代表的就是来源于根tuple的子tuples。这个图就是一个TupleTree。在这个tree中,所有的bolt都会ack或fail一个tuple,如果tree中所有的bolt都ack了经过它的tuple,那么Spout的ack方法就会被调用,表示整个消息被处理完成。如果tree中的任何一个bolt fail一个tuple,或者整个处理过程超时,则Spout的fail方法便会被调用。

     另外一点, storm只是通过ack/fail机制来告诉应用方bolt中间的处理情况, 对于成功/失败该如何处理, 必须由应用自己来决定, 因为storm内部也没有保存失败的具体数据, 但是也有办法知道失败记录,因为spout的ack/fail方法会附带一个msgId对象, 我们可以在最初发射tuple的时候将将msgId设置为tuple, 然后在ack/fail中对该tuple进行处理。这里其实有个问题, 就是每个bolt执行完之后要显式的调用ack/fail,否则会出现tuple不释放导致oom.
不知道storm在最初设计的时候,为什么不将bolt的ack设置为默认调用。

     Storm的ISpout接口定义了三个与可靠性有关的方法:nextTuple,ack和fail。

public interface ISpout extends Serializable {
           void open( Map conf, TopologyContext context, SpoutOutputCollector collector);
           void close();
           void nextTuple();
           void ack(Object msgId);
           void fail(Object msgId);
    }

     我们知道,当Storm的Spout发射一个Tuple后,他便会调用nextTuple()方法,在这个过程中,保证可靠性处理的第一步就是为发射出的Tuple分配一个唯一的ID,并把这个ID传给emit()方法:

collector.emit( new Values("value1" , "value2") , msgId );

     为Tuple分配一个唯一ID的目的就是为了告诉Storm,Spout希望这个Tuple产生的Tuple tree在处理完成或失败后告知它,如果Tuple被处理成功,Spout的ack()方法就会被调用,相反如果处理失败,Spout的fail()方法就会被调用,Tuple的ID也都会传入这两个方法中。

     需要注意的是,虽然spout有可靠性机制,但这个机制是否启用由我们控制的。IBasicBolt在emit一个tuple后自动调用ack()方法,用来实现比较简单的计算。如果是IRichBolt的话,如果想要实现anchor,必须自己调用ack方法。

2.Bolt中的可靠性

     Bolt中的可靠性主要靠两步来实现:

    1. 发射衍生Tuple的同时anchor原Tuple
    2. 对各个Tuples做ack或fail处理     

     anchor一个Tuple就意味着在输入Tuple和其衍生Tuple之间建立了关联,关联之后的Tuple便加入了Tuple tree。我们可以通过如下方式anchor一个Tuple:

collector.emit( tuple, new Values( word));

     如果我们发射新tuple的时候不同时发射元tuple,那么新发射的Tuple不会参与到整个可靠性机制中,它们的fail不会引起root tuple的重发,我们成为unanchor:

 collector.emit( new Values( word));

     ack和fail一个tuple的操作方法: 

this .collector.ack(tuple);
this .collector.fail(tuple);

     上面讲过了,IBasicBolt 实现类不关心ack/fail, spout的ack/fail完全由后面的bolt的ack/fail来决定. 其execute方法的BasicOutputCollector参数也没有提供ack/fail方法给你调用. 相当于忽略了该bolt的ack/fail行为。
     在 IRichBolt实现类中, 如果OutputCollector.emit(oldTuple,newTuple)这样调用来发射tuple(anchoring), 那么后面的bolt的ack/fail会影响spout ack/fail, 如果collector.emit(newTuple)这样来发射tuple(在storm称之为anchoring), 则相当于断开了后面bolt的ack/fail对spout的影响.spout将立即根据当前bolt前面的ack/fail的情况来决定调用spout的ack/fail.
所以某个bolt后面的bolt的成功失败对你来说不关心, 你可以直接通过这种方式来忽略.中间的某个bolt fail了, 不会影响后面的bolt执行, 但是会立即触发spout的fail. 相当于短路了, 后面bolt虽然也执行了, 但是ack/fail对spout已经无意义了. 也就是说, 只要bolt集合中的任何一个fail了, 会立即触发spout的fail方法. 而ack方法需要所有的bolt调用为ack才能触发. 所以IBasicBolt用来做filter或者简单的计算比较合适。

3.总结

    storm的可靠性是由spout和bolt共同决定的,storm利用了anchor机制来保证处理的可靠性。如果spout发射的一个tuple被完全处理,那么spout的ack方法即会被调用,如果失败,则其fail方法便会被调用。在bolt中,通过在emit(oldTuple,newTuple)的方式来anchor一个tuple,如果处理成功,则需要调用bolt的ack方法,如果失败,则调用其fail方法。一个tuple及其子tuple共同构成了一个tupletree,当这个tree中所有tuple在指定时间内都完成时spout的ack才会被调用,但是当tree中任何一个tuple失败时,spout的fail方法则会被调用。

     IBasicBolt类会自动调用ack/fail方法,而IRichBolt则需要我们手动调用ack/fail方法。我们可以通过TOPOLOGY_MESSAGE_TIMEOUT_SECS参数来指定一个tuple的处理完成时间,若这个时间未被处理完成,则spout也会调用fail方法。

4.一个可靠的WordCount例子

一个实现可靠性的spout:    

 public class ReliableSentenceSpout extends BaseRichSpout {
     private static final long serialVersionUID = 1L;
     private ConcurrentHashMap<UUID, Values> pending;
     private SpoutOutputCollector collector;
     private String[] sentences = { "my dog has fleas", "i like cold beverages" , "the dog ate my homework" , "don't have a cow man" , "i don't think i like fleas" };
     private int index = 0;
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare( new Fields( "sentence"));
      }
     public void open( Map config, TopologyContext context, SpoutOutputCollector collector) {
           this. collector = collector;
           this. pending = new ConcurrentHashMap<UUID, Values>();
      }
     public void nextTuple() {
          Values values = new Values( sentences[ index]);
          UUID msgId = UUID. randomUUID();
           this. pending.put(msgId, values);
           this. collector.emit(values, msgId);
           index++;
           if ( index >= sentences. length) {
               index = 0;
          }
           //Utils.waitForMillis(1);
      }
     public void ack(Object msgId) {
           this. pending.remove(msgId);
      }
     public void fail(Object msgId) {
           this. collector.emit( this. pending.get(msgId), msgId);
      }
 }

一个实现可靠性的bolt:

public class ReliableSplitSentenceBolt extends BaseRichBolt {
     private OutputCollector collector;
     public void prepare( Map config, TopologyContext context, OutputCollector collector) {
           this. collector = collector;
      }
     public void execute(Tuple tuple) {
          String sentence = tuple.getStringByField("sentence" );
          String[] words = sentence.split( " ");
           for (String word : words) {
               this. collector.emit(tuple, new Values(word));
          }
           this. collector.ack(tuple);
      }
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare( new Fields( "word"));
      }
 }

 

时间: 2024-11-03 19:39:57

Storm中的可靠性的相关文章

分布式计算系统storm中worker、executor、task比较

storm基础框架分析 本文我们要证明的主要问题是:在Topology中我们可以指定spout.bolt的并行度,在提交Topology时Storm如何将spout.bolt自动发布到每个服务器并且控制服务的CPU.磁盘等资源的? worker.executor.task的关系 nimbus将可以工作的worker称为worker-slot. nimbus是整个集群的控管核心,总体负责了topology的提交.运行状态监控.负载均衡及任务重新分配,等等工作. nimbus分配的任务包含了topo

如何将win8电脑中的可靠性历史记录功能关闭

  首先,小编有必要来解释一下什么是这个可靠性历史记录功能,这个可靠性历史记录功能就是咱们win8系统中对于系统过去状态的一种评分机制,如果系统在一段时间内都保持稳定运行的话,那么这个可靠性历史记录功能便会将它的分数慢慢提高,而咱们的win8系统也会根据这个评分不定时的向咱们的用户推送可靠性问题报告的解决方法,不少用户会觉得这个功能没有很实用,而且时间一长,这个报告就会堆积起来,占用咱们win8系统的空间,那么如何才能解决这个问题呢?下面,小编就来详细的介绍一下吧! 1.首先,咱们需要进入到wi

Storm-源码分析- Disruptor在storm中的使用

Disruptor 2.0, (http://ifeve.com/disruptor-2-change/) Disruptor为了更便于使用, 在2.0做了比较大的调整, 比较突出的是更换了几乎所有的概念名 老版本, 新版本, 从左到右的变化如下, 1. Producer –> Publisher  2. ProducerBarrier被integrate到RingBuffer里面, 叫做PublishPort, 提供publish接口  3. Entry –> Event  4, Curso

Twitter Storm中Bolt消息传递路径之源码解读

Bolt作为task被executor执行,而executor是一个个的线程,所以executor必须存在于具体的process之中,而这个process就是worker.至于worker是如何被supervisor创建,尔后worker又如何创建executor线程,这些暂且按下不表.   假设同属于一个Topology的Spout与Bolt分别处于不同的JVM,即不同的worker中,不同的JVM可能处于同一台物理机器,也可能处于不同的物理机器中.为了让情景简单,认为JVM处于不同的物理机器

Storm-源码分析- Storm中Zookeeper的使用

在backtype.storm.cluster.clj中, 定义了storm对于Zookeeper的使用   ClusterState 首先定义操作Zookeeper集群的interface (defprotocol ClusterState (set-ephemeral-node [this path data]) (delete-node [this path]) (create-sequential [this path data]) (set-data [this path data])

【Hadoop Summit Tokyo 2016】Apache Storm中的资源感知调度

本讲义出自Jerry Peng在Hadoop Summit Tokyo 2016上的演讲,主要介绍了Apache Storm的相关知识内容.目前遇到的挑战和问题并且对于资源感知调度器进行了详细介绍.

Apache Storm 官方文档 —— 消息的可靠性保障

原文链接    译者:魏勇 Storm 能够保证每一个由 Spout 发送的消息都能够得到完整地处理.本文详细解释了 Storm 如何实现这种保障机制,以及作为用户如何使用好 Storm 的可靠性机制. 消息的"完整性处理"是什么意思 一个从 spout 中发送出的 tuple 会产生上千个基于它创建的 tuples.例如,有这样一个 word-count 拓扑: TopologyBuilder builder = new TopologyBuilder(); builder.setS

Apache Storm 官方文档 —— 在生产环境中运行拓扑

在生产环境集群中运行拓扑的方式与本地模式非常相似,主要包括以下几个步骤: 1) 定义拓扑(如果使用 Java 进行开发就可以使用 TopologyBuilder) 2) 使用 StormSubmitter 向集群提交拓扑.StormSubmitter 接收拓扑名称.拓扑配置信息以及拓扑对象本身作为参数,如下所示: Config conf = new Config(); conf.setNumWorkers(20); conf.setMaxSpoutPending(5000); StormSubm

Storm集群中运行的各种组件及其并行

一.Storm中运行的组件      我们知道,Storm的强大之处就是可以很容易地在集群中横向拓展它的计算能力,它会把整个运算过程分割成多个独立的tasks在集群中进行并行计算.在Storm中,一个task就是运行在集群中的一个Spout或Bolt实例.      为了方便理解Storm如何并行处理我们分给它的任务,这里我先介绍一下在集群中涉及到Topology的四种组件: Nodes(machines):集群中的节点,就是这些节点一起工作来执行Topology. Workers(JVMs):