AsyncDispatcher是Yarn中事件异步分发器,它是ResourceManager中的一个基于阻塞队列的分发或者调度事件的组件,其在一个特定的单线程中分派事件,交给AsyncDispatcher中之前注册的针对该事件所属事件类型的事件处理器EventHandler来处理。每个事件类型类可能会有多个处理渠道,即多个事件处理器,可以使用一个线程池调度事件。在Yarn的主节点ResourceManager中,就有一个Dispatcher类型的成员变量rmDispatcher,定义如下:
private Dispatcher rmDispatcher;
而rmDispatcher的初始化则在基于AbstractService的ResourceManager服务初始化的serviceInit()方法中,关键代码如下:
rmDispatcher = setupDispatcher();
继续追踪setupDispatcher()方法,如下:
/** * Register the handlers for alwaysOn services */ private Dispatcher setupDispatcher() { Dispatcher dispatcher = createDispatcher(); dispatcher.register(RMFatalEventType.class, new ResourceManager.RMFatalEventDispatcher()); return dispatcher; }
实际上就是通过createDispatcher()方法创建了一个AsyncDispatcher实例,代码如下:
protected Dispatcher createDispatcher() { return new AsyncDispatcher(); }
我们先看下AsyncDispatcher的成员变量有哪些,如下:
// 待调度处理事件阻塞队列 private final BlockingQueue<Event> eventQueue; // AsyncDispatcher是否停止的标志位 private volatile boolean stopped = false; // Configuration flag for enabling/disabling draining dispatcher's events on // stop functionality. // 在stop功能中开启/禁用流尽分发器事件的配置标志位 private volatile boolean drainEventsOnStop = false; // Indicates all the remaining dispatcher's events on stop have been drained // and processed. // stop功能中所有剩余分发器事件已经被处理或流尽的标志位 private volatile boolean drained = true; // drained的等待锁 private Object waitForDrained = new Object(); // For drainEventsOnStop enabled only, block newly coming events into the // queue while stopping. // 在AsyncDispatcher停止过程中阻塞新近到来的事件进入队列的标志位,仅当drainEventsOnStop启用(即为true)时有效 private volatile boolean blockNewEvents = false; // 事件处理器实例 private EventHandler handlerInstance = null; // 事件处理调度线程 private Thread eventHandlingThread; // 事件类型枚举类Enum到事件处理器EventHandler实例的映射集合 protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers; // 标志位:确保调度程序崩溃,但不做系统退出system-exit private boolean exitOnDispatchException;
AsyncDispatcher中最重要的一个成员变量则是待调度处理事件阻塞队列eventQueue,它是一个阻塞队列,存储的是全部等待调度处理的事件,默认的实现为线程安全的链式阻塞队列LinkedBlockingQueue,这在其无参构造方法中有体现,如下:
// 无参构造函数 public AsyncDispatcher() { // 调用有参构造函数,传入线程安全的链式阻塞队列LinkedBlockingQueue实例 this(new LinkedBlockingQueue<Event>()); }
而有参构造函数除了赋值eventQueue外,还会初始化eventDispatchers集合为HashMap,其专门用来存储事件类型枚举类Enum至事件处理器EventHandler实例的映射关系,所有被分发器分发的事件,都必须在按照其所属事件类型在eventDispatchers中注册一个事件处理器EventHandler,等待指定线程调度到该事件后,由其所属事件类型对应的事件处理器EventHandler进行处理,如果不注册事件处理器,则分发器不会对事件进行分发。
我们上面所说的特定线程就是eventHandlingThread,它是AsyncDispatcher中一个特定的单线程,由其从事件队列中取出事件,并从eventDispatchers中查找事件处理器EventHandler,然后转交EventHandler进行事件的处理。
AsyncDispatcher中还有一些标志位,如下:
1、stopped:AsyncDispatcher是否停止的标志位;
2、drainEventsOnStop:在stop功能中开启/禁用流尽分发器事件的配置标志位,如果启动,则AsyncDispatcher停止前需要先处理完待调度处理事件队列eventQueue中的事件,否则直接停止;
3、drained:stop功能中所有剩余分发器事件已经被处理或流尽的标志位,为true表示待调度处理事件已处理完,为false则表示尚未处理完;
4、waitForDrained:标志位drained上的等待锁;
5、blockNewEvents:在AsyncDispatcher停止过程中阻塞新近到来的事件进入队列的标志位,仅当drainEventsOnStop启用(即为true)时有效;
6、exitOnDispatchException:调度程序崩溃时是否做系统退出system-exit;
我们发现,AsyncDispatcher继承自AbstractService,那么它就是Hadoop中的一种抽象服务,其就必须遵循构造实例后先初始化再启动的规则。我们下看下它初始化的serviceInit()方法,如下:
@Override protected void serviceInit(Configuration conf) throws Exception { // 取参数yarn.dispatcher.exit-on-error, 参数未配置默认为false this.exitOnDispatchException = conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); super.serviceInit(conf); }
除了调用父类的serviceInit()方法设置配置信息成员变量外,它所做的唯一一件事就是确定exitOnDispatchException,取参数yarn.dispatcher.exit-on-error, 参数未配置默认为false。
做为一个服务,初始化过后就该启动,我们再看其启动serviceStart()方法,如下:
@Override protected void serviceStart() throws Exception { //start all the components super.serviceStart(); // 创建事件处理调度线程eventHandlingThread eventHandlingThread = new Thread(createThread()); // 设置线程名为AsyncDispatcher event handler eventHandlingThread.setName("AsyncDispatcher event handler"); // 启动事件处理调度线程eventHandlingThread eventHandlingThread.start(); }
很简单,创建一个事件处理调度线程eventHandlingThread,设置线程名为"AsyncDispatcher event handler",并启动线程。这个事件处理调度线程eventHandlingThread是通过createThread()来定义的,如下:
Runnable createThread() { return new Runnable() { @Override public void run() { // 标志位stopped为false,即AsyncDispatcher实例未停止的话,且当前线程未中断的话,一直运行 while (!stopped && !Thread.currentThread().isInterrupted()) { // 判断事件调度队列eventQueue是否为空,并赋值给标志位drained drained = eventQueue.isEmpty(); // blockNewEvents is only set when dispatcher is draining to stop, // adding this check is to avoid the overhead of acquiring the lock // and calling notify every time in the normal run of the loop. // 如果停止过程中阻止新的事件加入待处理队列,即标志位blockNewEvents为true if (blockNewEvents) { synchronized (waitForDrained) { <span style="white-space:pre"> </span> // 如果待处理队列中的事件都已调度完毕,调用waitForDrained的notify()方法通知等待者 if (drained) { waitForDrained.notify(); } } } Event event; try { // 从事件调度队列eventQueue中取出一个事件 // take()方法为取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻塞进入等待状态直到 // BlockingQueue有新的数据被加入 event = eventQueue.take(); } catch(InterruptedException ie) { if (!stopped) { LOG.warn("AsyncDispatcher thread interrupted", ie); } return; } // 如果取出待处理事件event,即不为null if (event != null) { // 调用dispatch()方法进行分发 dispatch(event); } } } }; }
我们看下线程的主体逻辑,它的run()方法有一个while循环,标志位stopped为false,即AsyncDispatcher实例未停止的话,且当前线程未中断的话,一直运行,大体如下:
(一)先处理下特殊情况:
1、判断事件调度队列eventQueue是否为空,并赋值给标志位drained;
2、如果停止过程中阻止新的事件加入待处理队列,即标志位blockNewEvents为true,这个标志位为true是在停止服务的serviceStop()方法中,当drainEventsOnStop为true时被设置的,即AsyncDispatcher停止前需要先处理完待调度处理事件队列eventQueue中的事件:
2.1如果待处理队列中的事件都已调度完毕(标志位drained为true),调用waitForDrained的notify()方法通知等待者,也就是服务停止serviceStop()方法;
(二)然后是正常事件调度处理过程:
1、从事件调度队列eventQueue中取出一个事件:
take()方法为取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻塞进入等待状态直到lockingQueue有新的数据被加入;
2、如果取出待处理事件event,即不为null,调用dispatch()方法进行分发:
2.1、根据事件event获取事件类型枚举类type;
2.2、根据事件类型枚举类type,从eventDispatchers中获取事件处理器EventHandler实例handler;
2.3、如果handler不为空,调用handler的handle()方法处理事件event,否则抛出异常,提示针对事件类型type的事件处理器handler没有注册;
而当线程遇到InterruptedException异常时,即外部中断该线程时,如果stopped标志位为false,非AsyncDispatcher服务正常停止情况下的中断,则记录warn级别日志信息,最后统一返回。
上面提到的dispatch()方法代码如下:
@SuppressWarnings("unchecked") protected void dispatch(Event event) { //all events go thru this loop if (LOG.isDebugEnabled()) { LOG.debug("Dispatching the event " + event.getClass().getName() + "." + event.toString()); } // 根据事件event获取事件类型枚举类type Class<? extends Enum> type = event.getType().getDeclaringClass(); try{ // 根据事件类型枚举类type,从eventDispatchers中获取事件处理器EventHandler实例handler EventHandler handler = eventDispatchers.get(type); if(handler != null) { // 如果handler不为空,调用handler的handle()方法处理事件event handler.handle(event); } else { // 否则抛出异常,提示针对事件类型type的事件处理器handler没有注册 throw new Exception("No handler for registered for " + type); } } catch (Throwable t) { //TODO Maybe log the state of the queue LOG.fatal("Error in dispatcher thread", t); // If serviceStop is called, we should exit this thread gracefully. if (exitOnDispatchException && (ShutdownHookManager.get().isShutdownInProgress()) == false && stopped == false) { LOG.info("Exiting, bbye.."); System.exit(-1); } } }
dispatch()方法还有一部分,就是当异常Throwable发生时的处理。正常情况下,如果是正常调用serviceStop()方法停止服务,那么当前线程应该优雅的退出,而这里,如果发生了异常,同时exitOnDispatchException配置为true,即发生异常时退出系统,且stopped为false,不是通过服务停止发生的异常,那么,系统非正常退出,System.exit(-1)。
以上就是整个AsyncDispatcher服务构造、初始化、启动、并处理的主要内容,下面我们再看下其服务停止方面的内容,serviceStop()方法如下:
@Override protected void serviceStop() throws Exception { // 如果标志位drainEventsOnStop为true,则 if (drainEventsOnStop) { // 标志位blockNewEvents设置为true,阻止新的事件被加入待处理队列 blockNewEvents = true; // 记录info级别Log信息 LOG.info("AsyncDispatcher is draining to stop, igonring any new events."); // waitForDrained上通过synchronized进行同步: synchronized (waitForDrained) { // 如果队列中的事件还没有处理完(drained为false),同时事件处理调度线程eventHandlingThread仍然存活 while (!drained && eventHandlingThread.isAlive()) { // waitForDrained释放锁,等待1s waitForDrained.wait(1000); LOG.info("Waiting for AsyncDispatcher to drain."); } } } // 停止标志位stopped设置为true stopped = true; // 如果事件处理调度线程eventHandlingThread不为null, if (eventHandlingThread != null) { // 中断eventHandlingThread线程 eventHandlingThread.interrupt(); try { // 等待eventHandlingThread线程结束 eventHandlingThread.join(); } catch (InterruptedException ie) { LOG.warn("Interrupted Exception while stopping", ie); } } // stop all the components super.serviceStop(); }
服务停止时的处理流程如下:
1、如果标志位drainEventsOnStop为true,即AsyncDispatcher停止前需要先处理完待调度处理事件队列eventQueue中的事件,则:
1.1、标志位blockNewEvents设置为true,阻止新的事件被加入待处理队列;
1.2、记录info级别Log信息--AsyncDispatcher is draining to stop, igonring any new events;
1.3、waitForDrained上通过synchronized进行同步:如果队列中的事件还没有处理完(drained为false),同时事件处理调度线程eventHandlingThread仍然存活,waitForDrained释放锁,等待1s,并记录info级别Log信息--Waiting for AsyncDispatcher to drain,这里的wait其实是等待事件处理调度线程eventHandlingThread调度完eventQueue队列中的剩余事件,见上面线程run()方法解释;
2、停止标志位stopped设置为true,标志AsyncDispatcher服务已停止;
3、如果事件处理调度线程eventHandlingThread不为null,中断eventHandlingThread线程,上面也有对中断异常的处理;
4、等待eventHandlingThread线程结束;
5、调用父类AbstractService的serviceStop()方法(其实是个空方法)。
等等,上面说了那么多,是不是少了些什么?AsyncDispatcher是不是很像一个生产者消费者模型?通过上面看来,eventHandlingThread事件处理调度线程不断的从待调度处理事件队列eventQueue中取出事件进行处理的过程,就是消费过程。那么生产过程是怎样的呢?下面我们就这个问题展开分析。
正常的分析过程应该是看看哪些地方如何使用的AsyncDispatcher。这里,我们根据AsyncDispatcher中eventQueue的private特性,看看AsyncDispatcher中都有哪些地方会将事件加入到eventQueue队列,答案很明显而且是唯一的,在GenericEventHandler中handle()方法会通过eventQueue.put(event)往队列中添加数据,即所谓的生产过程。那么,这个GenericEventHandler是如何获得的呢?getEventHandler()方法告诉了我们答案,如下:
@Override public EventHandler getEventHandler() { if (handlerInstance == null) { handlerInstance = new GenericEventHandler(); } return handlerInstance; }
成员变量handlerInstance如果为空,构造一个GenericEventHandler实例赋值给成员变量handlerInstance,并返回,下次再获取的话,就可以直接返回handlerInstance了。而这个handlerInstance在AsyncDispatcher内部也是一个私有的事件处理器实例,仅仅在第一次调用getEventHandler()方法时才会完成初始化。现在你也会不会困惑上面我们为什么没有讲解handlerInstance,这里我们可以简单的归纳下:
handlerInstance是AsyncDispatcher内部一个私有的事件处理器实例,它负责处理添加待调度处理事件这一事件,虽然感觉说法有些拗口,但是AsyncDispatcher很聪明的将添加待调度处理事件这一生产过程也当作一个事件来处理,我们看下GenericEventHandler定义及其handle()方法,如下:
class GenericEventHandler implements EventHandler<Event> { public void handle(Event event) { // 如果blockNewEvents为true,即AsyncDispatcher服务停止过程正在发生, // 且阻止新的事件加入待调度处理事件队列eventQueue,直接返回 if (blockNewEvents) { return; } // 标志位drained设置为false,说明队列中尚有事件需要调度 drained = false; /* all this method does is enqueue all the events onto the queue */ // 获取队列eventQueue大小qSize int qSize = eventQueue.size(); // 每隔1000记录一条info级别日志信息,比如:Size of event-queue is 2000 if (qSize !=0 && qSize %1000 == 0) { LOG.info("Size of event-queue is " + qSize); } // 获取队列eventQueue剩余容量remCapacity int remCapacity = eventQueue.remainingCapacity(); // 如果剩余容量remCapacity小于1000,记录warn级别日志信息,比如:Very low remaining capacity in the event-queue: 888 if (remCapacity < 1000) { LOG.warn("Very low remaining capacity in the event-queue: " + remCapacity); } // 队列eventQueue中添加事件event try { eventQueue.put(event); } catch (InterruptedException e) { if (!stopped) { LOG.warn("AsyncDispatcher thread interrupted", e); } throw new YarnRuntimeException(e); } }; }
handle()方法的处理逻辑如下:
1、如果blockNewEvents为true,即AsyncDispatcher服务停止过程正在发生,且阻止新的事件加入待调度处理事件队列eventQueue,直接返回;
2、标志位drained设置为false,说明队列中尚有事件需要调度;
3、获取队列eventQueue大小qSize,每隔1000记录一条info级别日志信息,比如:Size of event-queue is 2000;
4、获取队列eventQueue剩余容量remCapacity,如果剩余容量remCapacity小于1000,记录warn级别日志信息,比如:Very low remaining capacity in the event-queue: 888;
5、队列eventQueue中添加事件event。
上面,我们讲了AsyncDispatcher作为一个Dispatcher接口实现者所实现的getEventHandler()方法了,下面我们再讲接口另外一个重要方法的实现,register()方法,即将指定事件类型枚举类eventType与事件处理器EventHandler实例的映射关系,注册到AsyncDispatcher的eventDispatchers集合,代码如下:
@SuppressWarnings("unchecked") @Override public void register(Class<? extends Enum> eventType, EventHandler handler) { /* check to see if we have a listener registered */ // 查看事件类型eventType对应的事件处理器EventHandler之前是否在eventDispatchers中注册过 EventHandler<Event> registeredHandler = (EventHandler<Event>) eventDispatchers.get(eventType); LOG.info("Registering " + eventType + " for " + handler.getClass()); if (registeredHandler == null) {// 没有注册过 eventDispatchers.put(eventType, handler); } else if (!(registeredHandler instanceof MultiListenerHandler)){// 已经注册过,且不是MultiListenerHandler /* for multiple listeners of an event add the multiple listener handler */ // 构造一个MultiListenerHandler实例,将之前注册过的事件处理器registeredHandler连同这次需要注册的事件处理器handler, // 做为一个符合监听事件处理器注册到eventDispatchers MultiListenerHandler multiHandler = new MultiListenerHandler(); multiHandler.addHandler(registeredHandler); multiHandler.addHandler(handler); eventDispatchers.put(eventType, multiHandler); } else {// 已经注册过,且是MultiListenerHandler /* already a multilistener, just add to it */ // 已经是一个MultiListenerHandler的话,强制转换下 MultiListenerHandler multiHandler = (MultiListenerHandler) registeredHandler; // 直接追加注册新的handler multiHandler.addHandler(handler); } }
其实register()的方法逻辑很简单,总结如下:
1、查看事件类型eventType对应的事件处理器EventHandler之前是否在eventDispatchers中注册过,取出赋值给registeredHandler;
2、判断registeredHandler:
2.1、如果没有注册过,直接放入eventDispatchers进行注册;
2.2、如果已经注册过,且不是MultiListenerHandler:构造一个MultiListenerHandler实例,将之前注册过的事件处理器registeredHandler连同这次需要注册的事件处理器handler,做为一个多路复合监听事件处理器注册到eventDispatchers;
2.3、如果已经注册过,且是MultiListenerHandler:强制转换下,直接追加注册新的handler。
最后,我们再讲解下这个多路复合监听事件处理器MultiListenerHandler,它是AsyncDispatcher的内部类,也实现了EventHandler<Event>接口,其构造方法如下:
public MultiListenerHandler() { listofHandlers = new ArrayList<EventHandler<Event>>(); }
初始化处理器列表listofHandlers。
它的addHandler()方法就是将需要注册的新的事件处理器追加到listofHandlers列表中,代码如下:
void addHandler(EventHandler<Event> handler) { listofHandlers.add(handler); }
而作为一个事件处理器核心功能实现的handle()方法,如下:
@Override public void handle(Event event) { for (EventHandler<Event> handler: listofHandlers) { handler.handle(event); } }
依次有序遍历listofHandlers中的事件处理器,分别调用它们的handle方法进行事件处理,真正实现了它所设计的Multiplexing an event目标,将事件依次通过列表中的handler的处理。