Hadoop实战之 hadoop作业调度详解

对Hadoop的最感兴趣的地方,也就在于Hadoop的作业调度了,在正式介绍如何搭建Hadoop之前,深入理解一下Hadoop的作业调度很有必要。我们不一定能用得上Hadoop,但是如果理通顺Hadoop的分布式调度原理,在有需要的时候未必不能自己写一个Mini Hadoop~: )

开始

Map/Reduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的,Google已经将它完整的MapReduce论 文公开发布了。其中对它的定义是,Map/Reduce是一个编程模型(programming model),是一个用于处理和生成大规模数据集(processing and generating large data sets)的相关的实现。用户定义一个map函数来处理一个key/value对以生成一批中间的key/value对,再定义一个reduce函数将所 有这些中间的有着相同key的values合并起来。很多现实世界中的任务都可用这个模型来表达。

Hadoop的Map/Reduce框架也是基于这个原理实现的,下面简要介绍一下Map/Reduce框架主要组成及相互的关系。

2.1       总体结构2.1.1            Mapper和Reducer

运行于Hadoop的MapReduce应用程序最基本的组成部分包括一个Mapper和一个Reducer类,以及一个创建JobConf的执行程序,在一些应用中还可以包括一个Combiner类,它实际也是Reducer的实现。

2.1.2            JobTracker和TaskTracker

它们都是由一个master服务JobTracker和多个运行于多个节点的slaver服务TaskTracker两个类提供的服务调度的。 master负责调度job的每一个子任务task运行于slave上,并监控它们,如果发现有失败的task就重新运行它,slave则负责直接执行每 一个task。TaskTracker都需要运行在HDFS的DataNode上,而JobTracker则不需要,一般情况应该把JobTracker 部署在单独的机器上。

2.1.3            JobClient

每一个job都会在用户端通过JobClient类将应用程序以及配置参数Configuration打包成jar文件存储在HDFS,并把路径提 交到JobTracker的master服务,然后由master创建每一个Task(即MapTask和ReduceTask)将它们分发到各个 TaskTracker服务中去执行。

2.1.4            JobInProgress

JobClient提交job后,JobTracker会创建一个JobInProgress来跟踪和调度这个job,并把它添加到job队列里。 JobInProgress会根据提交的job jar中定义的输入数据集(已分解成FileSplit)创建对应的一批TaskInProgress用于监控和调度MapTask,同时在创建指定数目 的TaskInProgress用于监控和调度ReduceTask,缺省为1个ReduceTask。

2.1.5            TaskInProgress

JobTracker启动任务时通过每一个TaskInProgress来launchTask,这时会把Task对象(即MapTask和 ReduceTask)序列化写入相应的TaskTracker服务中,TaskTracker收到后会创建对应的TaskInProgress(此 TaskInProgress实现非JobTracker中使用的TaskInProgress,作用类似)用于监控和调度该Task。启动具体的 Task进程是通过TaskInProgress管理的TaskRunner对象来运行的。TaskRunner会自动装载job.jar,并设置好环境 变量后启动一个独立的java child进程来执行Task,即MapTask或者ReduceTask,但它们不一定运行在同一个TaskTracker中。

2.1.6            MapTask和ReduceTask

一个完整的job会自动依次执行Mapper、Combiner(在JobConf指定了Combiner时执行)和Reducer,其中 Mapper和Combiner是由MapTask调用执行,Reducer则由ReduceTask调用,Combiner实际也是Reducer接口 类的实现。Mapper会根据job jar中定义的输入数据集按<key1,value1>对读入,处理完成生成临时的<key2,value2>对,如果定义了 Combiner,MapTask会在Mapper完成调用该Combiner将相同key的值做合并处理,以减少输出结果集。MapTask的任务全完 成即交给ReduceTask进程调用Reducer处理,生成最终结果<key3,value3>对。这个过程在下一部分再详细介绍。

下图描述了Map/Reduce框架中主要组成和它们之间的关系:

2.2       Job创建过程2.2.1            JobClient.runJob() 开始运行job并分解输入数据集

一个MapReduce的Job会通过JobClient类根据用户在JobConf类中定义的InputFormat实现类来将输入的数据集分解成一批小的数据集,每一个小数据集会对应创建一个MapTask来处理。JobClient会使用缺省的FileInputFormat类调用FileInputFormat.getSplits()方 法生成小数据集,如果判断数据文件是isSplitable()的话,会将大的文件分解成小的FileSplit,当然只是记录文件在HDFS里的路径及 偏移量和Split大小。这些信息会统一打包到jobFile的jar中并存储在HDFS中,再将jobFile路径提交给JobTracker去调度和 执行。

2.2.2            JobClient.submitJob() 提交job到JobTracker

jobFile的提交过程是通过RPC模块(有单独一章来详细介绍)来实现的。大致过程是,JobClient类中通过RPC实现的Proxy接口 调用JobTracker的submitJob()方法,而JobTracker必须实现JobSubmissionProtocol接口。 JobTracker则根据获得的jobFile路径创建与job有关的一系列对象(即JobInProgress和TaskInProgress等)来 调度并执行job。

JobTracker创建job成功后会给JobClient传回一个JobStatus对象用于记录job的状态信息,如执行时间、Map和 Reduce任务完成的比例等。JobClient会根据这个JobStatus对象创建一个NetworkedJob的RunningJob对象,用于 定时从JobTracker获得执行过程的统计数据来监控并打印到用户的控制台。

与创建Job过程相关的类和方法如下图所示

2.3       Job执行过程

上面已经提到,job是统一由JobTracker来调度的,具体的Task分发给各个TaskTracker节点来执行。下面通过源码来详细解析执行过程,首先先从JobTracker收到JobClient的提交请求开始。

2.3.1            JobTracker初始化Job和Task队列过程2.3.1.1     JobTracker.submitJob() 收到请求

当JobTracker接收到新的job请求(即submitJob()函数被调用)后,会创建一个JobInProgress对象并通过它来管理 和调度任务。JobInProgress在创建的时候会初始化一系列与任务有关的参数,如job jar的位置(会把它从HDFS复制本地的文件系统中的临时目录里),Map和Reduce的数据,job的优先级别,以及记录统计报告的对象等。

2.3.1.2     JobTracker.resortPriority() 加入队列并按优先级排序

JobInProgress创建后,首先将它加入到jobs队列里,分别用一个map成员变量jobs用来管理所有jobs对象,一个list成员 变量jobsByPriority用来维护jobs的执行优先级别。之后JobTracker会调用resortPriority()函数,将jobs先 按优先级别排序,再按提交时间排序,这样保证最高优先并且先提交的job会先执行。

2.3.1.3     JobTracker.JobInitThread 通知初始化线程

然后JobTracker会把此job加入到一个管理需要初始化的队列里,即一个list成员变量jobInitQueue里。通过此成员变量调用 notifyAll()函数,会唤起一个用于初始化job的线程JobInitThread来处理(JobTracker会有几个内部的线程来维护 jobs队列,它们的实现都在JobTracker代码里,稍候再详细介绍)。JobInitThread收到信号后即取出最靠前的job,即优先级别最 高的job,调用JobInProgress的initTasks()函数执行真正的初始化工作。

2.3.1.4     JobInProgress.initTasks() 初始化TaskInProgress

Task的初始化过程稍复杂些,首先步骤JobInProgress会创建Map的监控对象。在initTasks()函数里通过调用 JobClient的readSplitFile()获得已分解的输入数据的RawSplit列表,然后根据这个列表创建对应数目的Map执行管理对象 TaskInProgress。在这个过程中,还会记录该RawSplit块对应的所有在HDFS里的blocks所在的DataNode节点的 host,这个会在RawSplit创建时通过FileSplit的getLocations()函数获取,该函数会调用 DistributedFileSystem的getFileCacheHints()获得(这个细节会在HDFS模块中讲解)。当然如果是存储在本地文 件系统中,即使用LocalFileSystem时当然只有一个location即“localhost”了。

其次JobInProgress会创建Reduce的监控对象,这个比较简单,根据JobConf里指定的Reduce数目创建,缺省只创建1个 Reduce任务。监控和调度Reduce任务的也是TaskInProgress类,不过构造方法有所不同,TaskInProgress会根据不同参 数分别创建具体的MapTask或者ReduceTask。

JobInProgress创建完TaskInProgress后,最后构造JobStatus并记录job正在执行中,然后再调用 JobHistory.JobInfo.logStarted()记录job的执行日志。到这里JobTracker里初始化job的过程全部结束,执行 则是通过另一异步的方式处理的,下面接着介绍它。

与初始化Job过程相关的类和方法如下图所示

2.3.2            TaskTracker执行Task的过程

Task的执行实际是由TaskTracker发起的,TaskTracker会定期(缺省为10秒钟,参见MRConstants类中定义的 HEARTBEAT_INTERVAL变量)与JobTracker进行一次通信,报告自己Task的执行状态,接收JobTracker的指令等。如果 发现有自己需要执行的新任务也会在这时启动,即是在TaskTracker调用JobTracker的heartbeat()方法时进行,此调用底层是通 过IPC层调用Proxy接口(在IPC章节详细介绍)实现。这个过程实际比较复杂,下面一一简单介绍下每个步骤。

2.3.2.1     TaskTracker.run() 连接JobTracker

TaskTracker的启动过程会初始化一系列参数和服务(另有单独的一节介绍),然后尝试连接JobTracker服务(即必须实现 InterTrackerProtocol接口),如果连接断开,则会循环尝试连接JobTracker,并重新初始化所有成员和参数,此过程参见 run()方法。

2.3.2.2     TaskTracker.offerService() 主循环

如果连接JobTracker服务成功,TaskTracker就会调用offerService()函数进入主执行循环中。这个循环会每隔10秒 与JobTracker通讯一次,调用transmitHeartBeat()获得HeartbeatResponse信息。然后调用 HeartbeatResponse的getActions()函数获得JobTracker传过来的所有指令即一个TaskTrackerAction 数组。再遍历这个数组,如果是一个新任务指令即LaunchTaskAction则调用startNewTask()函数执行新任务,否则加入到 tasksToCleanup队列,交给一个taskCleanupThread线程来处理,如执行KillJobAction或者 KillTaskAction等。

2.3.2.3     TaskTracker.transmitHeartBeat() 获取JobTracker指令

在transmitHeartBeat()函数处理中,TaskTracker会创建一个新的TaskTrackerStatus对象记录目前任务 的执行状况,然后通过IPC接口调用JobTracker的heartbeat()方法发送过去,并接受新的指令,即返回值 TaskTrackerAction数组。在这个调用之前,TaskTracker会先检查目前执行的Task数目以及本地磁盘的空间使用情况等,如果可 以接收新的Task则设置heartbeat()的askForNewTask参数为true。操作成功后再更新相关的统计信息等。

2.3.2.4     TaskTracker.startNewTask() 启动新任务

此函数的主要任务就是创建TaskTracker$TaskInProgress对象来调度和监控任务,并把它加入到runningTasks队列中。完成后则调用localizeJob()真正初始化Task并开始执行。

2.3.2.5     TaskTracker.localizeJob() 初始化job目录等

此函数主要任务是初始化工作目录workDir,再将job jar包从HDFS复制到本地文件系统中,调用RunJar.unJar()将包解压到工作目录。然后创建一个RunningJob并调用 addTaskToJob()函数将它添加到runningJobs监控队列中。完成后即调用launchTaskForJob()开始执行Task。

2.3.2.6     TaskTracker.launchTaskForJob() 执行任务

启动Task的工作实际是调用TaskTracker$TaskInProgress的launchTask()函数来执行的。

2.3.2.7     TaskTracker$TaskInProgress.launchTask() 执行任务

执行任务前先调用localizeTask()更新一下jobConf文件并写入到本地目录中。然后通过调用Task的createRunner()方法创建TaskRunner对象并调用其start()方法最后启动Task独立的java执行子进程。

2.3.2.8     Task.createRunner() 创建启动Runner对象

Task有两个实现版本,即MapTask和ReduceTask,它们分别用于创建Map和Reduce任务。MapTask会创建MapTaskRunner来启动Task子进程,而ReduceTask则创建ReduceTaskRunner来启动。

2.3.2.9     TaskRunner.start() 启动子进程真正执行Task

这里是真正启动子进程并执行Task的地方。它会调用run()函数来处理。执行的过程比较复杂,主要的工作就是初始化启动java子进程的一系列 环境变量,包括设定工作目录workDir,设置CLASSPATH环境变量等(需要将TaskTracker的环境变量以及job jar的路径合并起来)。然后装载job jar包,调用runChild()方法启动子进程,即通过ProcessBuilder来创建,同时子进程的stdout/stdin/syslog的 输出定向到该Task指定的输出日志目录中,具体的输出通过TaskLog类来实现。这里有个小问题,Task子进程只能输出INFO级别日志,而且该级 别是在run()函数中直接指定,不过改进也不复杂。

与Job执行过程相关的类和方法如下图所示

本文转载自:http://www.cnblogs.com/shipengzhi/articles/2487429.html

12下一页

时间: 2024-09-21 05:05:50

Hadoop实战之 hadoop作业调度详解的相关文章

《Hadoop海量数据处理:技术详解与项目实战》一1.2 Hadoop和大数据

1.2 Hadoop和大数据 Hadoop海量数据处理:技术详解与项目实战 在人们对云计算这个词汇耳熟能详之后,大数据这个词汇又在最短时间内进入大众视野.云计算对于普通人来说就像云一样,一直没有机会能够真正感受到,而大数据则更加实际,是确确实实能够改变人们生活的事物.Hadoop从某个方面来说,与大数据结合得更加紧密,它就是为大数据而生的. 1.2.1 大数据的定义 "大数据"(big data),一个看似通俗直白.简单朴实的名词,却无疑成为了时下IT界最炙手可热的名词,在全球引领了新

《Hadoop海量数据处理:技术详解与项目实战》一1.1 Hadoop和云计算

1.1 Hadoop和云计算 Hadoop海量数据处理:技术详解与项目实战 Hadoop从问世之日起,就和云计算有着千丝万缕的联系.本节将在介绍Hadoop的同时,介绍Hadoop和云计算之间的关系,为后面的学习打下基础. 1.1.1 Hadoop的电梯演讲 如果你是一名创业者或者是一名项目经理,那么最好准备一份"电梯演讲".所谓电梯演讲,是对自己产品的简单介绍,通常都是1-2分钟(电梯从1层-30层的时间),以便如果你恰巧和投资人挤上同一部电梯的时候,能够说服他投资你的项目或者产品.

《Hadoop海量数据处理:技术详解与项目实战》一导读

前 言 Hadoop海量数据处理:技术详解与项目实战 为什么要写这本书 2013年被称为"大数据元年",标志着世界正式进入了大数据时代,而就在这一年,我加入了清华大学苏州汽车研究院大数据处理中心,从事Hadoop的开发.运维和数据挖掘等方面的工作.从出现之日起,Hadoop就深刻地改变了人们处理数据的方式.作为一款开源软件,Hadoop能让所有人享受到大数据红利,让所有人在大数据时代站在了同一起跑线上.Hadoop很好地诠释了什么是"大道至简,衍化至繁",Hadoo

《Hadoop海量数据处理:技术详解与项目实战》一 3.2 HDFS读取文件和写入文件

3.2 HDFS读取文件和写入文件 Hadoop海量数据处理:技术详解与项目实战我们知道在HDFS中,NameNode作为集群的大脑,保存着整个文件系统的元数据,而真正数据是存储在DataNode的块中.本节将介绍HDFS如何读取和写入文件,组成同一文件的块在HDFS的分布情况如何影响HDFS读取和写入速度. 3.2.1 块的分布HDFS会将文件切片成块并存储至各个DataNode中,文件数据块在HDFS的布局情况由NameNode和hdfs-site.xml中的配置dfs.replicatio

《Hadoop海量数据处理:技术详解与项目实战》一第1章 绪论

第1章 绪论 Hadoop海量数据处理:技术详解与项目实战本章作为绪论,目的是在学习Hadoop之前,让读者理清相关概念以及这些概念之间的联系.

《Hadoop海量数据处理:技术详解与项目实战》一3.1 认识HDFS

3.1 认识HDFS Hadoop海量数据处理:技术详解与项目实战HDFS的设计理念源于非常朴素的思想:当数据集的大小超过单台计算机的存储能力时,就有必要将其进行分区(partition)并存储到若干台单独的计算机上,而管理网络中跨多台计算机存储的文件系统称为分布式文件系统(distribute filesystem).该系统架构于网络之上,势必会引入网络编程的复杂性,因此分布式文件系统比普通文件系统更为复杂,例如,使文件系统能够容忍节点故障且不丢失任何数据,就是一个极大的挑战.通过本章的介绍,

《Hadoop海量数据处理:技术详解与项目实战(第2版)》一2.7 Cloudera Manager

2.7 Cloudera Manager 读到这里的时候,读者可能觉得安装Hadoop是一件比较麻烦的事情,特别是在需要安装的组件特别多.安装的主机特别多的情况下(例如几百台),这种安装方式就不太可取了.在CDH中,有一个特殊的组件Cloudera Manager,它正是考虑到用户的这种需求.它的主要功能有3个:集群自动化安装部署.集群监控和集群运维. 在CDH4时,Cloudera Manager还有集群规模限制,而在CDH5中,则去除了这个限制.这样中小企业在使用Cloudera Manag

Hadoop新MapReduce框架Yarn详解

Hadoop MapReduceV2(Yarn) 框架简介 原 Hadoop MapReduce 框架的问题 对于业界的大数据存储及分布式 处理系统来说,Hadoop 是耳熟能详的卓越开源分布式文件存储及处理框架,对于 Hadoop 框架的介绍在此不再累述,读者 可参考 Hadoop 官方简介.使用和学习过老 Hadoop 框架(0.20.0 及之前版本)的同仁应该很熟悉如下的原 MapReduce 框 架图: 图 1.Hadoop 原 MapReduce 架构 从上图中可以清楚的看出原 Map

hadoop中典型Writable类详解

Hadoop将很多Writable类归入org.apache.hadoop.io包中,在这些类中,比较重要的有Java基本类.Text.Writable集合.ObjectWritable等,重点介绍Java基本类和ObjectWritable的实现. 1. Java基本类型的Writable封装 目前Java基本类型对应的Writable封装如下表所示.所有这些Writable类都继承自WritableComparable.也就是说,它们是可比较的.同时,它们都有get()和set()方法,用于