HBase源码分析之事件处理模型

        HBase是一个复杂的分布式非结构化数据库,它将表中的数据按照行的方向切分成一个个的Region,并在若干RegionServer上上线,依靠所在RegionServer对外提供数据读写IO服务。一开始,表中数据由于很少,只有一个Region。随着数据越来越多,一个Region已难以满足频繁的数据读写请求,所以,Region开始分裂。分裂后的两个Region又会按照一定策略选择RegionServer上线,继续对外提供数据读写服务。并且,HBase作为一个分布式数据库,肯定需要考虑负载均衡,它会按照某些策略选择若干Region,在比较繁忙的RegionServer上下线,转移到较为空闲的RegionSever上线继续提供高质量的数据读写服务。所有涉及到的这些Region的上线、下线、分裂,以及我们还没提到的合并等等流程,在HBase内部都是通过不同组件之间发送事件,然后按照一定策略调度执行的。这就是HBase的事件处理模型。

        那么,HBase的事件处理模型是如何实现的呢?本文,我们就将研究下HBase内部事件处理模型的实现。

        在HBase中有一个抽象类EventHandler,定义如下:

@InterfaceAudience.Private
public abstract class EventHandler implements Runnable, Comparable<Runnable> {

        它实现了Runnable接口,说明其子类是一个线程,而且,在它内部定义了以下成员变量:

  // type of event this object represents
  // 该对象代表的事件类型
  protected EventType eventType;

  // 服务器
  protected Server server;

  // sequence id generator for default FIFO ordering of events
  // 默认的FIFO调度的事件的序列化ID生成器
  protected static final AtomicLong seqids = new AtomicLong(0);

  // sequence id for this event
  // 该事件的序列化ID
  private final long seqid;

  // Listener to call pre- and post- processing.  May be null.
  // 监听器,可能为空
  private EventHandlerListener listener;

  // Time to wait for events to happen, should be kept short
  // 等待事件发生的时间
  protected int waitingTimeForEvents;

  // 祖先
  private final Span parent;

        监听器listener是实现了EventHandlerListener接口的实例,接口定义如下:

  /**
   * This interface provides pre- and post-process hooks for events.
   * 为事件提供事前和事后处理钩子的接口
   */
  public interface EventHandlerListener {
    /**
     * Called before any event is processed
     * 任何事件执行前被调用
     * @param event The event handler whose process method is about to be called.
     */
    void beforeProcess(EventHandler event);
    /**
     * Called after any event is processed
     * 任何事件执行后被调用
     * @param event The event handler whose process method is about to be called.
     */
    void afterProcess(EventHandler event);
  }

        接口就定义了两个方法,一个是任何事件执行前被调用的beforeProcess()方法和任何事件执行后被调用的afterProcess()方法。

        抽象类EventHandler既然实现了Runnable接口,那么其子类肯定是一个线程,而且其功能的实现,必然在核心方法run()方法内。下面,我们就看下这个run()方法,代码如下:

  /**
   * 线程实现功能的主方法,run()方法,是不是很像一个模板方法啊
   */
  public void run() {
<span style="white-space:pre">	</span>  
<span style="white-space:pre">	</span>// 开启一个TraceScope
    TraceScope chunk = Trace.startSpan(this.getClass().getSimpleName(), parent);
    
    try {
      
      // 先执行监听器的beforeProcess()方法
      if (getListener() != null) getListener().beforeProcess(this);
      
      // 接着执行process()方法
      process();
      
      // 最后执行监听器的afterProcess()方法
      if (getListener() != null) getListener().afterProcess(this);
      
    } catch(Throwable t) {
    <span style="white-space:pre">	</span>
      // 处理事件异常t
      handleException(t);
      
    } finally {
      
      // 关闭TraceScope
      chunk.close();
    }
  }

        第一感觉是不是个模板方法呢?它的主要流程就是:

        1、开启一个TraceScope;

        2、先执行监听器的beforeProcess()方法;

        3、接着执行process()方法;

        4、最后执行监听器的afterProcess()方法;

        5、关闭TraceScope。

        如果中间出现Throwable异常,则调用handleException()方法处理事件异常。

        关于监听器的beforeProcess()方法和afterProcess()方法我们在上面已经提到过了,这里不再赘述。关键是看一下process()方法,其定义如下:

  /**
   * This method is the main processing loop to be implemented by the various
   * subclasses.
   *
   * 核心方法process()方法是一个抽象方法,子类必须实现
   * @throws IOException
   */
  public abstract void process() throws IOException;

        它是一个抽象方法,也就意味着子类必须要实现它。并且,它是子类完成事件处理核心逻辑所必须执行的方法。

        另外,还实现了诸如获取监听器、设置监听器、获取优先级、获取事件序列ID等工具方法,十分简单,不再一一介绍,代码粘贴如下,读者可自行查看:

  /**
   * This method is the main processing loop to be implemented by the various
   * subclasses.
   *
   * 核心方法process()方法是一个抽象方法,子类必须实现
   * @throws IOException
   */
  public abstract void process() throws IOException;

  /**
   * Return the event type
   * 获取时事件类型
   * @return The event type.
   */
  public EventType getEventType() {
    return this.eventType;
  }

  /**
   * Get the priority level for this handler instance.  This uses natural
   * ordering so lower numbers are higher priority.
   * 获取handler实例的优先级,数字越低级别越高
   *
   * <p>
   * Lowest priority is Integer.MAX_VALUE.  Highest priority is 0.
   * <p>
   * Subclasses should override this method to allow prioritizing handlers.
   * <p>
   * Handlers with the same priority are handled in FIFO order.
   * <p>
   * @return Integer.MAX_VALUE by default, override to set higher priorities
   */
  public int getPriority() {
    return Integer.MAX_VALUE;
  }

  /**
   * 获取事件的序列号ID
   * @return This events' sequence id.
   */
  public long getSeqid() {
    return this.seqid;
  }

  /**
   * Default prioritized runnable comparator which implements a FIFO ordering.
   * <p>
   * Subclasses should not override this.  Instead, if they want to implement
   * priority beyond FIFO, they should override {@link #getPriority()}.
   *
   * 实现可比较接口的compareTo()方法,先比较优先级Priority,谁的优先级越小谁就越小。
   * 优先级相同的话,再比较事件序列号ID,谁的事件序列号ID越小谁就越小
   */
  @Override
  public int compareTo(Runnable o) {
    EventHandler eh = (EventHandler)o;
    if(getPriority() != eh.getPriority()) {
      return (getPriority() < eh.getPriority()) ? -1 : 1;
    }
    return (this.seqid < eh.seqid) ? -1 : 1;
  }

  /**
   * 获取事件的监听器
   * @return Current listener or null if none set.
   */
  public synchronized EventHandlerListener getListener() {
    return listener;
  }

  /**
   * 设置事件的监听器
   * @param listener Listener to call pre- and post- {@link #process()}.
   */
  public synchronized void setListener(EventHandlerListener listener) {
    this.listener = listener;
  }

  @Override
  public String toString() {
    return "Event #" + getSeqid() +
      " of type " + eventType +
      " (" + getInformativeName() + ")";
  }

  /**
   * Event implementations should override thie class to provide an
   * informative name about what event they are handling. For example,
   * event-specific information such as which region or server is
   * being processed should be included if possible.
   */
  public String getInformativeName() {
    return this.getClass().toString();
  }

  /**
   * 处理事件异常,可能被覆写
   * Event exception handler, may be overridden
   * @param t Throwable object
   */
  protected void handleException(Throwable t) {
    LOG.error("Caught throwable while processing event " + eventType, t);
  }

        接下来就有一个问题,继承了抽象类EventHandler的各种事件是如何被提交的?它们被提交到哪里,又是如何被调度执行的呢?别慌,下面我一一为大家解答。

        首先在HRegionServer上有一个叫做service的成员变量,定义如下:

  // Instance of the hbase executor service.
  // HBase执行服务的实例
  protected ExecutorService service;

        它是HRegionServer上执行各种事件的ExecutorService实例,而ExecutorService提供了通用的事件执行机制,它抽象了线程池、队列,EventType可以被提交,使用线程处理被添加到队列中的对象。如果要创建一个的服务, 创建该类的一个实例,并调用实例的startExecutorService()方法。当服务完成后,调用shutdown()方法。

        那么事件是如何被提交的呢?我们以Region上线为例,在HRegionServer对外提供RPC服务的RSRpcServices类的openRegion()方法中,Region上线事件OpenRegionHandler是通过以下方式被提交的,代码如下:

          // If there is no action in progress, we can submit a specific handler.
          // Need to pass the expected version in the constructor.
          // 如果对应Region上没有相关的操作在进行,我们可以提交一个特定的处理者
          if (region.isMetaRegion()) {
            regionServer.service.submit(new OpenMetaHandler(
              regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
          } else {
            regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
              regionOpenInfo.getFavoredNodesList());
            regionServer.service.submit(new OpenRegionHandler(
              regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
          }

        它就是通过HRegionServer中成员变量service的submit()方法,来提交OpenRegionHandler事件的。

        

时间: 2024-10-14 15:16:05

HBase源码分析之事件处理模型的相关文章

HBase源码分析之compact请求发起时机、判断条件等详情(一)

        一般说来,任何一个比较复杂的分布式系统,针对能够使得其性能得到大幅提升的某一内部处理流程,必然有一个定期检查机制,使得该流程在满足一定条件的情况下,能够自发的进行,这样才能够很好的体现出复杂系统的自我适应与自我调节能力.我们知道,HBase内部的compact处理流程是为了解决MemStore Flush之后,文件数目太多,导致读数据性能大大下降的一种自我调节手段,它会将文件按照某种策略进行合并,大大提升HBase的数据读性能.那么,基于我刚才的陈述,compact流程是否有一个

HBase源码分析之MemStore的flush发起时机、判断条件等详情(二)

        在<HBase源码分析之MemStore的flush发起时机.判断条件等详情>一文中,我们详细介绍了MemStore flush的发起时机.判断条件等详情,主要是两类操作,一是会引起MemStore数据大小变化的Put.Delete.Append.Increment等操作,二是会引起HRegion变化的诸如Regin的分裂.合并以及做快照时的复制拷贝等,同样会触发MemStore的flush流程.同时,在<HBase源码分析之compact请求发起时机.判断条件等详情(一

HBase源码分析之HRegionServer上MemStore的flush处理流程(一)

        在<HBase源码分析之HRegion上MemStore的flsuh流程(一)>.<HBase源码分析之HRegion上MemStore的flsuh流程(二)>等文中,我们介绍了HRegion上Memstore flush的主体流程和主要细节.但是,HRegion只是HBase表中按照行的方向对一片连续的数据区域的抽象,它并不能对外提供单独的服务,供客户端或者HBase其它实体调用.而HRegion上MemStore的flush还是要通过HRegionServer来

HBase源码分析之HRegion上MemStore的flsuh流程(二)

        继上篇<HBase源码分析之HRegion上MemStore的flsuh流程(一)>之后,我们继续分析下HRegion上MemStore flush的核心方法internalFlushcache(),它的主要流程如图所示:         其中,internalFlushcache()方法的代码如下: /** * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of upda

HBase源码分析之HRegion上compact流程分析(三)

        在<HBase源码分析之HRegion上compact流程分析(二)>一文中,我们没有讲解真正执行合并的CompactionContext的compact()方法.现在我们来分析下它的具体实现.         首先,CompactionContext表示合并的上下文信息,它只是一个抽象类,其compact()并没有实现,代码如下: /** * Runs the compaction based on current selection. select/forceSelect

HBase源码分析之MemStore的flush发起时机、判断条件等详情

        前面的几篇文章,我们详细介绍了HBase中HRegion上MemStore的flsuh流程,以及HRegionServer上MemStore的flush处理流程.那么,flush到底是在什么情况下触发的呢?本文我们将详细探究下HBase中MemStore的flush流程的发起时机,看看到底都有哪些操作,或者哪些后台服务进程会触发MemStore的flush.         首先,在<HBase源码分析之HRegionServer上MemStore的flush处理流程(一)>和

HBase源码分析之HRegionServer上MemStore的flush处理流程(二)

        继上篇文章<HBase源码分析之HRegionServer上MemStore的flush处理流程(一)>遗留的问题之后,本文我们接着研究HRegionServer上MemStore的flush处理流程,重点讲述下如何选择一个HRegion进行flush以缓解MemStore压力,还有HRegion的flush是如何发起的.         我们先来看下第一个问题:如何选择一个HRegion进行flush以缓解MemStore压力.上文中我们讲到过flush处理线程如果从flus

HBase源码分析之HRegion上MemStore的flsuh流程(一)

        了解HBase架构的用户应该知道,HBase是一种基于LSM模型的分布式数据库.LSM的全称是Log-Structured Merge-Trees,即日志-结构化合并-树.相比于Oracle普通索引所采用的B+树,LSM模型的最大特点就是,在读写之间采取一种平衡,牺牲部分读数据的性能,来大幅度的提升写数据的性能.通俗的讲,HBase写数据如此快,正是由于基于LSM模型,将数据写入内存和日志文件后即立即返回.         但是,数据始终在内存和日志中是不妥当的,首先内存毕竟是有

写在HBase源码分析前面的话

       本版块全部为HBase1.0.2相关源码分析文章,系个人研究源码原创写成,除对部分引用标示外,其余均为原创,或翻译源码注释.        该系列文章为与网友交流HBase学习,不做任何其他商业用途~~O(∩_∩)O哈哈~         由于水平有限,文章基本上是边读源码,边翻译注释,边分析源码写成,没有较强的前后逻辑性,我会在写完全部文章后再回头整理.         由于水平有限,文章中可能存在理解错误的地方,欢迎各位针对文章中的错误.问题或者其他指导建议,踊跃评论,多多指点