Hadoop Yarn事件处理框架源码分析

由于想在项目中使用类似yarn的事件处理机制,就看了实现。主要是由Dispatcher.java,EventHandler.java,Service.java这3个类撑起来的。

 在事件处理之前,先注册相应的事件处理handler,收到事件event后,由派发事件的Dispatcher进行派发,默认采用异步事件处理方式将事件放到事件队列(LinkedBlockingQueue)中,消费者会循环从队列中取出事件进行处理。

要使用事件处理,首先需要创建Dispatcher,示例代码如下:

 dispatcher = new AsyncDispatcher();//创建
  addIfService(dispatcher);// 由于继承AbstractService,可以方便完成服务统一管理,比如初始化和资源释放等操作
  dispatcher.register(EventType.class,new EventHandler());//注册对应的事件处理方法

然后通过AsyncDispatcher调用getEventHandler()返回的EventHandler的处理对应事件,AsyncDispatcher类的getEventHandler()方法如下:

@Override
  public EventHandler getEventHandler() {
    if (handlerInstance == null) {
      handlerInstance = new GenericEventHandler();//如果没有注册生产事件处理,就走通用事件处理
    }
    return handlerInstance;
  }
class GenericEventHandler implements EventHandler<Event> {
    public void handle(Event event) {
      if (blockNewEvents) {
        return;
      }

      /* all this method does is enqueue all the events onto the queue */
      int qSize = eventQueue.size();
      if (qSize !=0 && qSize %1000 == 0) {
        LOG.info("Size of event-queue is " + qSize);
      }
      int remCapacity = eventQueue.remainingCapacity();
      if (remCapacity < 1000) {
        LOG.warn("Very low remaining capacity in the event-queue: "
            + remCapacity);
      }
      try {
        eventQueue.put(event);//放进队列
        drained = false;
      } catch (InterruptedException e) {
        if (!stopped) {
          LOG.warn("AsyncDispatcher thread interrupted", e);
        }
        throw new RuntimeException(e);
      }
    };
  }

上述完成生产,再看消费如下实现的:

@Override
protected void serviceStart() throws Exception {
  //start all the components
  super.serviceStart();
  eventHandlingThread = new Thread(createThread()); // 调用创建消费eventQueue队列中事件的线程
  eventHandlingThread.setName("AsyncDispatcher event handler");
  eventHandlingThread.start();
}

查看createThread()方法,如下所示:

Runnable createThread() {
    return new Runnable() {
      @Override
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          drained = eventQueue.isEmpty();
          // blockNewEvents is only set when dispatcher is draining to stop,
          // adding this check is to avoid the overhead of acquiring the lock
          // and calling notify every time in the normal run of the loop.
          if (blockNewEvents) {
            synchronized (waitForDrained) {
              if (drained) {
                waitForDrained.notify();
              }
            }
          }
          Event event;
          try {
            event = eventQueue.take();
          } catch(InterruptedException ie) {
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", ie);
            }
            return;
          }
          if (event != null) {
            dispatch(event);//分发事件
          }
        }
      }
    };
  }

从eventQueue队列中取出Event,然后调用dispatch(event);来处理事件,看dispatch(event)方法,如下所示:

protected void dispatch(Event event) {
  //all events go thru this loop
  if (LOG.isDebugEnabled()) {
    LOG.debug("Dispatching the event " + event.getClass().getName() + "."
        + event.toString());
  }

  Class<? extends Enum> type = event.getType().getDeclaringClass();

  try{
    EventHandler handler = eventDispatchers.get(type); //通过event获取事件类型,根据事件类型得到注册的EventHandler
    if(handler != null) {
      handler.handle(event); //EventHandler处理事件event
    } else {
      throw new Exception("No handler for registered for " + type);
    }
  } catch (Throwable t) {
    //TODO Maybe log the state of the queue
    LOG.fatal("Error in dispatcher thread", t);
    // If serviceStop is called, we should exit this thread gracefully.
    if (exitOnDispatchException
        && (ShutdownHookManager.get().isShutdownInProgress()) == false
        && stopped == false) {
      LOG.info("Exiting, bbye..");
      System.exit(-1);
    }
  }
}

整个过程使用生产--消费者模型,异步事件处理,整体实现起来还是很简单的!

时间: 2024-09-14 07:21:39

Hadoop Yarn事件处理框架源码分析的相关文章

[cocos2dx]TestCpp框架源码分析

Cocos2d-x2.0 TestCpp框架源码分析                     [本版教程使用的Cocos2d-x版本为cocos2d-2.0-x-2.0.2]          好的引擎,会提供一系列完整的功能示例,Cocos2d-x之所以能得到很多人的喜爱,其重要的原因是它提供了丰富而易学的示例.在cocos2d-2.0-x-2.0.2中这些示例被放在一个名叫TestCpp的工程中,为了更好的学习Cocos2d-x的功能示例,我们今天来学习一下这个工程的框架结构.      

android-async-http框架源码分析

async-http使用地址 android-async-http仓库:git clone https://github.com/loopj/android-async-http 源码分析 我们在做网络请求的时候经常通过下面的方式实例化AsyncHttpClient client=new AsyncHttpClient();然后通过系统内置的请求发送请求,通过async内部的请求去做真正的网络请求. 首先得到的是AsyncHttpClient实例,所以从这里入手分析一下: <code class

[jjzhu学java]之JDK集合框架源码分析

Java Collection Collection接口 AbstractCollection类 AbstractList类 Vector类 Stack栈 ArrayList AbstractSequentialList LinkedList线性链表 Map接口 AbstractMap HashMap LinkedHashMap treeMap HashTable 总结 Java Collection 图中实线边框表示的是实现类(ArrayList, Hashtable等),虚线边框的是抽象类(

yii框架源码分析之创建controller代码

使用yii框架的url路径一般形如hostname/?r=xxxx/xxxx/xxxx&sdfs=dsfdsf 我们可以看到有时会使用protected目录下的controller,有时会使用module中controller,具体是如何处理的呢,请看如下的分析: 以下代码摘自yii框架核心代码%Yiiroot%/framework/web/CWebApplication.php 复制代码 代码如下: ===============================================

Linux驱动修炼之道-SPI驱动框架源码分析(上)【转】

转自:http://blog.csdn.net/lanmanck/article/details/6895318 SPI驱动架构,以前用过,不过没这个详细,跟各位一起分享: 来自:http://blog.csdn.net/woshixingaaa/article/details/6574215     SPI协议是一种同步的串行数据连接标准,由摩托罗拉公司命名,可工作于全双工模式.相关通讯设备可工作于m/s模式.主设备发起数据帧,允许多个从设备的存在.每个从设备 有独立的片选信号,SPI一般来说

PHP yii框架源码阅读(二) 整体执行流程分析

一 程序入口 <?php // change the following paths if necessary $yii=dirname(__FILE__).'/http://www.cnblogs.com/framework/yii.php'; $config=dirname(__FILE__).'/protected/config/main.php'; // remove the following line when in production mode // defined('YII_D

Hadoop2源码分析-YARN RPC 示例介绍

1.概述 之前在<Hadoop2源码分析-RPC探索实战>一文当中介绍了Hadoop的RPC机制,今天给大家分享关于YARN的RPC的机制.下面是今天的分享目录: YARN的RPC介绍 YARN的RPC示例 截图预览 下面开始今天的内容分享. 2.YARN的RPC介绍 我们知道在Hadoop的RPC当中,其主要由RPC,Client及Server这三个大类组成,分别实现对外提供编程接口.客户端实现及服务端实现.如下图所示:     图中是Hadoop的RPC的一个类的关系图,大家可以到<

Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(二)

        本文继<Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)>,接着讲述MapReduce作业在MRAppMaster上处理总流程,继上篇讲到作业初始化之后的作业启动,关于作业初始化主体流程的详细介绍,请参见<Yarn源码分析之MRAppMaster上MapReduce作业初始化解析>一文.         (三)启动         作业的启动是通过MRAppMaster的startJobs()方法实现的,其代码如下: /** * Th

Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)

        我们知道,如果想要在Yarn上运行MapReduce作业,仅需实现一个ApplicationMaster组件即可,而MRAppMaster正是MapReduce在Yarn上ApplicationMaster的实现,由其控制MR作业在Yarn上的执行.如此,随之而来的一个问题就是,MRAppMaster是如何控制MapReduce作业在Yarn上运行的,换句话说,MRAppMaster上MapReduce作业处理总流程是什么?这就是本文要研究的重点.         通过MRApp