由于想在项目中使用类似yarn的事件处理机制,就看了实现。主要是由Dispatcher.java,EventHandler.java,Service.java这3个类撑起来的。
在事件处理之前,先注册相应的事件处理handler,收到事件event后,由派发事件的Dispatcher进行派发,默认采用异步事件处理方式将事件放到事件队列(LinkedBlockingQueue)中,消费者会循环从队列中取出事件进行处理。
要使用事件处理,首先需要创建Dispatcher,示例代码如下:
dispatcher = new AsyncDispatcher();//创建 addIfService(dispatcher);// 由于继承AbstractService,可以方便完成服务统一管理,比如初始化和资源释放等操作 dispatcher.register(EventType.class,new EventHandler());//注册对应的事件处理方法
然后通过AsyncDispatcher调用getEventHandler()返回的EventHandler的处理对应事件,AsyncDispatcher类的getEventHandler()方法如下:
@Override public EventHandler getEventHandler() { if (handlerInstance == null) { handlerInstance = new GenericEventHandler();//如果没有注册生产事件处理,就走通用事件处理 } return handlerInstance; }
class GenericEventHandler implements EventHandler<Event> { public void handle(Event event) { if (blockNewEvents) { return; } /* all this method does is enqueue all the events onto the queue */ int qSize = eventQueue.size(); if (qSize !=0 && qSize %1000 == 0) { LOG.info("Size of event-queue is " + qSize); } int remCapacity = eventQueue.remainingCapacity(); if (remCapacity < 1000) { LOG.warn("Very low remaining capacity in the event-queue: " + remCapacity); } try { eventQueue.put(event);//放进队列 drained = false; } catch (InterruptedException e) { if (!stopped) { LOG.warn("AsyncDispatcher thread interrupted", e); } throw new RuntimeException(e); } }; }
上述完成生产,再看消费如下实现的:
@Override protected void serviceStart() throws Exception { //start all the components super.serviceStart(); eventHandlingThread = new Thread(createThread()); // 调用创建消费eventQueue队列中事件的线程 eventHandlingThread.setName("AsyncDispatcher event handler"); eventHandlingThread.start(); }
查看createThread()方法,如下所示:
Runnable createThread() { return new Runnable() { @Override public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { drained = eventQueue.isEmpty(); // blockNewEvents is only set when dispatcher is draining to stop, // adding this check is to avoid the overhead of acquiring the lock // and calling notify every time in the normal run of the loop. if (blockNewEvents) { synchronized (waitForDrained) { if (drained) { waitForDrained.notify(); } } } Event event; try { event = eventQueue.take(); } catch(InterruptedException ie) { if (!stopped) { LOG.warn("AsyncDispatcher thread interrupted", ie); } return; } if (event != null) { dispatch(event);//分发事件 } } } }; }
从eventQueue队列中取出Event,然后调用dispatch(event);来处理事件,看dispatch(event)方法,如下所示:
protected void dispatch(Event event) { //all events go thru this loop if (LOG.isDebugEnabled()) { LOG.debug("Dispatching the event " + event.getClass().getName() + "." + event.toString()); } Class<? extends Enum> type = event.getType().getDeclaringClass(); try{ EventHandler handler = eventDispatchers.get(type); //通过event获取事件类型,根据事件类型得到注册的EventHandler if(handler != null) { handler.handle(event); //EventHandler处理事件event } else { throw new Exception("No handler for registered for " + type); } } catch (Throwable t) { //TODO Maybe log the state of the queue LOG.fatal("Error in dispatcher thread", t); // If serviceStop is called, we should exit this thread gracefully. if (exitOnDispatchException && (ShutdownHookManager.get().isShutdownInProgress()) == false && stopped == false) { LOG.info("Exiting, bbye.."); System.exit(-1); } } }
整个过程使用生产--消费者模型,异步事件处理,整体实现起来还是很简单的!
时间: 2024-09-14 07:21:39