HBase是一个复杂的分布式非结构化数据库,它将表中的数据按照行的方向切分成一个个的Region,并在若干RegionServer上上线,依靠所在RegionServer对外提供数据读写IO服务。一开始,表中数据由于很少,只有一个Region。随着数据越来越多,一个Region已难以满足频繁的数据读写请求,所以,Region开始分裂。分裂后的两个Region又会按照一定策略选择RegionServer上线,继续对外提供数据读写服务。并且,HBase作为一个分布式数据库,肯定需要考虑负载均衡,它会按照某些策略选择若干Region,在比较繁忙的RegionServer上下线,转移到较为空闲的RegionSever上线继续提供高质量的数据读写服务。所有涉及到的这些Region的上线、下线、分裂,以及我们还没提到的合并等等流程,在HBase内部都是通过不同组件之间发送事件,然后按照一定策略调度执行的。这就是HBase的事件处理模型。
那么,HBase的事件处理模型是如何实现的呢?本文,我们就将研究下HBase内部事件处理模型的实现。
在HBase中有一个抽象类EventHandler,定义如下:
@InterfaceAudience.Private public abstract class EventHandler implements Runnable, Comparable<Runnable> {
它实现了Runnable接口,说明其子类是一个线程,而且,在它内部定义了以下成员变量:
// type of event this object represents // 该对象代表的事件类型 protected EventType eventType; // 服务器 protected Server server; // sequence id generator for default FIFO ordering of events // 默认的FIFO调度的事件的序列化ID生成器 protected static final AtomicLong seqids = new AtomicLong(0); // sequence id for this event // 该事件的序列化ID private final long seqid; // Listener to call pre- and post- processing. May be null. // 监听器,可能为空 private EventHandlerListener listener; // Time to wait for events to happen, should be kept short // 等待事件发生的时间 protected int waitingTimeForEvents; // 祖先 private final Span parent;
监听器listener是实现了EventHandlerListener接口的实例,接口定义如下:
/** * This interface provides pre- and post-process hooks for events. * 为事件提供事前和事后处理钩子的接口 */ public interface EventHandlerListener { /** * Called before any event is processed * 任何事件执行前被调用 * @param event The event handler whose process method is about to be called. */ void beforeProcess(EventHandler event); /** * Called after any event is processed * 任何事件执行后被调用 * @param event The event handler whose process method is about to be called. */ void afterProcess(EventHandler event); }
接口就定义了两个方法,一个是任何事件执行前被调用的beforeProcess()方法和任何事件执行后被调用的afterProcess()方法。
抽象类EventHandler既然实现了Runnable接口,那么其子类肯定是一个线程,而且其功能的实现,必然在核心方法run()方法内。下面,我们就看下这个run()方法,代码如下:
/** * 线程实现功能的主方法,run()方法,是不是很像一个模板方法啊 */ public void run() { <span style="white-space:pre"> </span> <span style="white-space:pre"> </span>// 开启一个TraceScope TraceScope chunk = Trace.startSpan(this.getClass().getSimpleName(), parent); try { // 先执行监听器的beforeProcess()方法 if (getListener() != null) getListener().beforeProcess(this); // 接着执行process()方法 process(); // 最后执行监听器的afterProcess()方法 if (getListener() != null) getListener().afterProcess(this); } catch(Throwable t) { <span style="white-space:pre"> </span> // 处理事件异常t handleException(t); } finally { // 关闭TraceScope chunk.close(); } }
第一感觉是不是个模板方法呢?它的主要流程就是:
1、开启一个TraceScope;
2、先执行监听器的beforeProcess()方法;
3、接着执行process()方法;
4、最后执行监听器的afterProcess()方法;
5、关闭TraceScope。
如果中间出现Throwable异常,则调用handleException()方法处理事件异常。
关于监听器的beforeProcess()方法和afterProcess()方法我们在上面已经提到过了,这里不再赘述。关键是看一下process()方法,其定义如下:
/** * This method is the main processing loop to be implemented by the various * subclasses. * * 核心方法process()方法是一个抽象方法,子类必须实现 * @throws IOException */ public abstract void process() throws IOException;
它是一个抽象方法,也就意味着子类必须要实现它。并且,它是子类完成事件处理核心逻辑所必须执行的方法。
另外,还实现了诸如获取监听器、设置监听器、获取优先级、获取事件序列ID等工具方法,十分简单,不再一一介绍,代码粘贴如下,读者可自行查看:
/** * This method is the main processing loop to be implemented by the various * subclasses. * * 核心方法process()方法是一个抽象方法,子类必须实现 * @throws IOException */ public abstract void process() throws IOException; /** * Return the event type * 获取时事件类型 * @return The event type. */ public EventType getEventType() { return this.eventType; } /** * Get the priority level for this handler instance. This uses natural * ordering so lower numbers are higher priority. * 获取handler实例的优先级,数字越低级别越高 * * <p> * Lowest priority is Integer.MAX_VALUE. Highest priority is 0. * <p> * Subclasses should override this method to allow prioritizing handlers. * <p> * Handlers with the same priority are handled in FIFO order. * <p> * @return Integer.MAX_VALUE by default, override to set higher priorities */ public int getPriority() { return Integer.MAX_VALUE; } /** * 获取事件的序列号ID * @return This events' sequence id. */ public long getSeqid() { return this.seqid; } /** * Default prioritized runnable comparator which implements a FIFO ordering. * <p> * Subclasses should not override this. Instead, if they want to implement * priority beyond FIFO, they should override {@link #getPriority()}. * * 实现可比较接口的compareTo()方法,先比较优先级Priority,谁的优先级越小谁就越小。 * 优先级相同的话,再比较事件序列号ID,谁的事件序列号ID越小谁就越小 */ @Override public int compareTo(Runnable o) { EventHandler eh = (EventHandler)o; if(getPriority() != eh.getPriority()) { return (getPriority() < eh.getPriority()) ? -1 : 1; } return (this.seqid < eh.seqid) ? -1 : 1; } /** * 获取事件的监听器 * @return Current listener or null if none set. */ public synchronized EventHandlerListener getListener() { return listener; } /** * 设置事件的监听器 * @param listener Listener to call pre- and post- {@link #process()}. */ public synchronized void setListener(EventHandlerListener listener) { this.listener = listener; } @Override public String toString() { return "Event #" + getSeqid() + " of type " + eventType + " (" + getInformativeName() + ")"; } /** * Event implementations should override thie class to provide an * informative name about what event they are handling. For example, * event-specific information such as which region or server is * being processed should be included if possible. */ public String getInformativeName() { return this.getClass().toString(); } /** * 处理事件异常,可能被覆写 * Event exception handler, may be overridden * @param t Throwable object */ protected void handleException(Throwable t) { LOG.error("Caught throwable while processing event " + eventType, t); }
接下来就有一个问题,继承了抽象类EventHandler的各种事件是如何被提交的?它们被提交到哪里,又是如何被调度执行的呢?别慌,下面我一一为大家解答。
首先在HRegionServer上有一个叫做service的成员变量,定义如下:
// Instance of the hbase executor service. // HBase执行服务的实例 protected ExecutorService service;
它是HRegionServer上执行各种事件的ExecutorService实例,而ExecutorService提供了通用的事件执行机制,它抽象了线程池、队列,EventType可以被提交,使用线程处理被添加到队列中的对象。如果要创建一个的服务, 创建该类的一个实例,并调用实例的startExecutorService()方法。当服务完成后,调用shutdown()方法。
那么事件是如何被提交的呢?我们以Region上线为例,在HRegionServer对外提供RPC服务的RSRpcServices类的openRegion()方法中,Region上线事件OpenRegionHandler是通过以下方式被提交的,代码如下:
// If there is no action in progress, we can submit a specific handler. // Need to pass the expected version in the constructor. // 如果对应Region上没有相关的操作在进行,我们可以提交一个特定的处理者 if (region.isMetaRegion()) { regionServer.service.submit(new OpenMetaHandler( regionServer, regionServer, region, htd, masterSystemTime, coordination, ord)); } else { regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(), regionOpenInfo.getFavoredNodesList()); regionServer.service.submit(new OpenRegionHandler( regionServer, regionServer, region, htd, masterSystemTime, coordination, ord)); }
它就是通过HRegionServer中成员变量service的submit()方法,来提交OpenRegionHandler事件的。