Hadoop服务库与事件库的使用及其工作流程

Hadoop服务库:

&">nbsp;   YARN采用了基于服务的对象管理模型,主要特点有:

被服务化的对象分4个状态:NOTINITED,INITED,STARTED,STOPED 任何服务状态变化都可以触发另外一些动作 可通过组合方式对任意服务进行组合,统一管理

具体类请参见 org.apache.hadoop.service包下.核心接口是Service,抽象实现是AbstractService

YARN中,ResourceManager和NodeManager属于组合服务,内部包含多个单一和组合服务.以实现对内部多种服务的统一管理.

Hadoop事件库:

YARN采用事件驱动并发模型, 把各种逻辑抽象成事件,进入事件队列,然后由中央异步调度器负责传递给相应的事件调度器处理,或者调度器之间再传递,直至完成任务.

具体参见org.apache.hadoop.yarn.event.主要类和接口是:Event, AsyncDispatcher,EventHandler

按照惯例, 先给出一个Demo,然后顺着Demo研究代码实现.

示例我是直接抄<hadoop技术内幕>:

例子涉及如下几个模块:

Task
TaskType
Job
JobType
Dispatcher

package com.demo1;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.service.CompositeService;

import org.apache.hadoop.service.Service;

import org.apache.hadoop.yarn.event.AsyncDispatcher;

import org.apache.hadoop.yarn.event.Dispatcher;

import org.apache.hadoop.yarn.event.EventHandler;

/**

* Created by yang on 2014/8/25.

*/

public class SimpleService extends CompositeService {

private Dispatcher dispatcher;

private String jobID;

private int taskNum;

private String[] taskIDs;

public SimpleService(String name, String jobID, int taskNum) {

super(name);

this.jobID = jobID;

this.taskNum = taskNum;

this.taskIDs = new String[taskNum];

for (int i = 0; i < taskNum; i++) {

taskIDs[i] = new String(jobID + "_task_" + i);

}

}

public Dispatcher getDispatcher() {

return dispatcher;

}

public void serviceInit(Configuration conf) throws Exception {

dispatcher = new AsyncDispatcher();

dispatcher.register(JobEventType.class, new JobEventDIspatcher());

dispatcher.register(TaskEventType.class, new TaskEventDIspatcher());

addService((Service)dispatcher);

super.serviceInit(conf);

}

private class JobEventDIspatcher implements EventHandler<JobEvent> {

@Override

public void handle(JobEvent jobEvent) {

if (jobEvent.getType() == JobEventType.JOB_KILL) {

System.out.println("JOB KILL EVENT");

for (int i = 0; i < taskNum; i++) {

dispatcher.getEventHandler().handle(new TaskEvent(taskIDs[i], TaskEventType.T_KILL));

}

} else if (jobEvent.getType() == JobEventType.JOB_INIT) {

System.out.println("JOB INIT EVENT");

for (int i = 0; i < taskNum; i++) {

dispatcher.getEventHandler().handle(new TaskEvent(taskIDs[i], TaskEventType.T_SCHEDULE));

}

}

}

}

private class TaskEventDIspatcher implements EventHandler<TaskEvent> {

@Override

public void handle(TaskEvent taskEvent) {

if (taskEvent.getType() == TaskEventType.T_KILL) {

System.out.println("TASK KILL EVENT" + taskEvent.getTaskID());

} else if (taskEvent.getType() == TaskEventType.T_SCHEDULE) {

System.out.println("TASK INIT EVENT" + taskEvent.getTaskID());

}

}

}

}

测试程序

package com.demo1;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.yarn.conf.YarnConfiguration;

/**

* Created by yang on 2014/8/25.

*/

public class Test {

public static void main(String[] args) throws Exception {

String jobID="job_1";

SimpleService ss = new SimpleService("test",jobID,5);

YarnConfiguration config = new YarnConfiguration(new Configuration());

ss.serviceInit(config);

ss.init(config);

ss.start();

ss.getDispatcher().getEventHandler().handle(new JobEvent(jobID,JobEventType.JOB_KILL));

ss.getDispatcher().getEventHandler().handle(new JobEvent(jobID,JobEventType.JOB_KILL));

}

}

不出意外的话,运行结果应该类似:

14/08/25 16:02:20 INFO event.AsyncDispatcher: Registering class com.yws.demo1.JobEventType for class com.yws.demo1.SimpleService$JobEventDIspatcher

14/08/25 16:02:42 INFO event.AsyncDispatcher: Registering class com.yws.demo1.TaskEventType for class com.yws.demo1.SimpleService$TaskEventDIspatcher

14/08/25 16:02:54 INFO event.AsyncDispatcher: Registering class com.yws.demo1.JobEventType for class com.yws.demo1.SimpleService$JobEventDIspatcher

14/08/25 16:03:03 INFO event.AsyncDispatcher: Registering class com.yws.demo1.TaskEventType for class com.yws.demo1.SimpleService$TaskEventDIspatcher

JOB KILL EVENT

JOB KILL EVENT

TASK KILL EVENTjob_1_task_0

TASK KILL EVENTjob_1_task_1

TASK KILL EVENTjob_1_task_2

TASK KILL EVENTjob_1_task_3

TASK KILL EVENTjob_1_task_4

TASK KILL EVENTjob_1_task_0

TASK KILL EVENTjob_1_task_1

TASK KILL EVENTjob_1_task_2

TASK KILL EVENTjob_1_task_3

TASK KILL EVENTjob_1_task_4

我们开始分析:

所谓的Task,Job,其实是按业务逻辑划分的, 他们都继承AbstractEvent类.

SimpleService是一个组合服务,里面放了EventHandler和Dispatcher

从Test开始,看看Service是如何创建的

构造函数比较简单,就是将一个job拆分成taskNum个Task

ss.serviceInit(config);做了什么呢:

创建一个中央事件调度器: AsyncDispatcher(具体实现我们在后文分析)

并把Job和Task的Event及2者对应的EventHandler注册到调度器中.

这里就是初始化和启动服务了.最后2行就是模拟2个事件的JOB_KILL事件.

我们进到ss.getDispatcher().getEventHandler(),发现他其实是创建一个GenericEventHandler

这个handler干什么是呢?

就是把

塞到BlockingQueue<Event> eventQueue; 中.

不知道你发现没有, 这个方法仅仅是一个入队操作啊. 那具体调用JobEventDIspatcher.handler是在什么地方呢?

这时联想到之前不是有个中央调度器嘛, AsyncDispatcher, Line 80行, 他创建了一个线程,并不断的从之前说的EventQueue中不断的取Event,然后执行,这里的执行也就是调用了具体的handler了

就这样一个基于事件驱动的程序这么完成了.

按照hadoop 早起版本中, 业务逻辑之间是通过函数调用方式实现的,也就是串行的. 现在基于事件驱动后,大大提高了并发性.很值得我们学习.

来张全家福:

HandlerThread就是前文说的那个隐藏线程. EventHandler会产生一些新的Event,然后又重新进入队列.循环.

时间: 2024-08-06 12:53:52

Hadoop服务库与事件库的使用及其工作流程的相关文章

eclipse-Linux下Eclipse导出Hadoop项目找不到库文件

问题描述 Linux下Eclipse导出Hadoop项目找不到库文件 求教各位hadoop技术大牛~~~~~ Linux环境下,Eclipse已经集成了Hadoop插件,运行Hadoop程序没有问题 代码:CLibrary Instance = (CLibrary) Native.loadLibrary("libNLPIR.so", CLibrary.class);能够成功加载libNLPIR.so文件,其中libNLPIR.so文件是用来分词的第三方库 但是导出成Runnable j

《Redis官方文档》Redis事件库

原文链接 译者:cndpzc Redis实现了自己的事件库,代码在ae.c中.想要理解Redis事件库的工作原理,最好的方法就是去理解Redis如何使用它. 事件循环初始化 redis.c中的initServer函数初始化了redisServer结构体变量的众多成员,其中一个就是Redis事件循环(event loop)el: 1 aeEventLoop *el initServer调用aeCreateEventLoop(定义在ae.c)初始化server.el的成员.aeEventLoop的定

Threejs 的场景查看 - 几个交互事件库助你方便查看场景

Threejs 的场景查看 - 几个交互事件库助你方便查看场景 太阳火神的美丽人生 (http://blog.csdn.net/opengl_es) 本文遵循"署名-非商业用途-保持一致"创作公用协议 转载请保留此句:太阳火神的美丽人生 -  本博客专注于 敏捷开发及移动和物联设备研究:iOS.Android.Html5.Arduino.pcDuino,否则,出自本博客的文章拒绝转载或再转载,谢谢合作. 先提到一篇 <使用 WebGL 进行 3D 开发,第 3 部分: 添加用户交

《Redis官方文档》事件库

究竟为什么需要一个事件库呢?让我们通过下面一系列问答来了解为什么. 问:你希望网络服务器持续不断地做什么事? 答:监听端口上进来的连接请求并接收它们. 问:调用套接字的Accept方法产生一个描述符,我们用这个描述符做什么? 答:保存这个描述符,并在它上面完成一次非阻塞读写操作. 问:为什么读写操作必须是非阻塞式的? 答:如果文件操作(在Unix系统中甚至于套接字都被描述成一个文件)是阻塞式的那么当它在一次文件I/O操作中被锁定时它怎么可能接收另外一个请求 问:我猜我必须在套接字上做很多次非阻塞

《Redis官方教程》- 事件库

为什么需要一个事件库(Event Library)? 让我们通过一系列Q&A来弄明白. Q:你期望网络服务器都做些什么事情? A:在它监听的端口上等待连接的到来,然后接收(accpet)它们. Q:调用accept会产生一个描述符,我该怎么处理它? A:先保存这个描述符,然后对它进行非阻塞(non-blocking)的读写(read/write)操作. Q:为什么读写操作必须用非阻塞的方式呢? A:如果服务器阻塞在文件(在Unix世界socket也是文件)I/O操作上,这期间它还怎么处理其他连接

拖库还是撞库?网易邮箱罗生门

       从乌云言之凿凿的报告和网友的如潮控诉来看,网易邮箱信息泄露,似乎已经被坐实. 然而,对于数据泄露的方式,网易和乌云存在着一定的分歧.那么,乌云在说什么,网易又在说什么呢?我们可以简单梳理一下. 1.10月中旬开始,不断在微博爆出iPhone被锁.用网易邮箱注册的iCloud服务出现异常等等事件. 2.10月18日,网易公开回应,称自身安全不存在问题,泄露是由于用户用与网易邮箱相同的资料在其他网站注册,而其他网站的资料泄露所致. 3.10月19日乌云发布报告,称发现网易存在漏洞,可能

Jquery库及其他库之间的$命名冲突怎么办

 首先我们应该知道,在jquery中,$(美元符号)就是jquery的别名,也就是说使用$和使用jquery是一样的,在很多时候我们命名空间时,正是因为这个$而产生的冲突的发生.比如说:$('#xmlas')和JQuery('#xmlas') 虽然在写法上不同,但在实际上却是完全等同的. 要想解决这个冲突,其实最简单的方法就是使用不同的名称来命名,或者让执行代码认为是不同的命名空间即可. 一. jQuery库在其他库之前导入,直接使用jQuery(callback)方法如: 代码如下 <html

Windows系统运行库/游戏运行库组件怎么补全?

  SecuLauncher:failed to start application.[2000], Windows运行库以及游戏运行库不全,运行程序或游戏容易出现各种各样的错误提示.而Windows运行库主要包括Microsoft Visual C++ 2005 Runtime,Microsoft Visual C++ 2008Runtime,Microsoft Visual C++ 2010 Runtim,Direct 9.0C,.Net Framework3.5,.Net Framewor

Python标准库与第三方库详解_python

本文详细罗列并说明了Python的标准库与第三方库如下,供对此有需要的朋友进行参考: Tkinter---- Python默认的图形界面接口. Tkinter是一个和Tk接口的模块,Tkinter库提供了对Tk API的接口,它属于Tcl/Tk的GUI工具组.Tcl/Tk是由John Ousterhout发展的书写和图形设备.Tcl(工具命令语言)是个宏语言,用于简化shell下复杂程序的开发,Tk工具包是和Tcl一起开发的, 目的是为了简化用户接口的设计过程.Tk工具包由许多不同的小部件,如一