MapReduce V1:JobTracker端Job/Task数据结构

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。在MapReduce程序运行的过程中,JobTracker端会在内存中维护一些与Job/Task运行相关的信息,了解这些内容对分析MapReduce程序执行流程的源码会非常有帮助。
在编写MapReduce程序时,我们是以Job为单位进行编程处理,一个应用程序可能由一组Job组成,而MapReduce框架给我们暴露的只是一些Map和Reduce的函数接口,在运行期它会构建对应MapTask和ReduceTask,所以我们知道一个Job是由一个或多个MapTask,以及0个或1个ReduceTask组成。而对于MapTask,它是根据输入的数据文件的的逻辑分片(InputSplit)而定的,通常有多少个分片就会有多少个MapTask;而对于ReduceTask,它会根据我们编写的MapReduce程序配置的个数来运行。
有了这些信息,我们能够预想到,在Job运行过程中,无非也需要维护与这些Job/Task相关的一些状态信息,通过一定的调度策略来管理Job/Task的运行。这里,我们主要关注JobTracker端的一些非常有用的数据结构:JobTracker、JobInProgress、TaskInProgress,来熟悉各种数据结构的定义及作用。

数据结构总体抽象

MapReduce框架就为了运行Job,所以我们基于Job的抽象来对JobTracker端的相关对象进行抽象,总体上理解它们之间的关系,如下图所示:


在JobTracker端,通过维护JobInProgress的信息来跟踪Job的运行生命周期,那么,JobTracker端肯定有一个用来维护所有Job状态的JobInProgress对象集合。而Job又是由Task组成,所以自然而然JobInProgress中应该有对Task运行状态的维护,Task的状态在JobTracker端通过TaskInProgress来抽象。一个Task可能运行失败,所以可能经过多次运行才能成功,而每一次运行会对应一个TaskAttempID,那么一个TaskInProgress又可能对应着多个TaskAttempID。

TaskInProgress数据结构

  • TaskAttemptID结构

一个TaskInProgress结构中包含了3个TaskAttemptID类型的数据,如下图所示:

  • TaskSplitMetaInfo结构

JobTracker会创建每一个Task需要运行Split的信息,包含了该Split所在的位置信息、起始偏移量、总输入字节数,结构图如下所示:

其他结构

结构图 说明
1 TreeSet<TaskAttemptID> tasks

一个TaskInProgress包含的TaskAttemptID的集合。
一个Task(MapTask/ReduceTask)可能包含多个TaskAttemptID在TaskTracker上运行,比如一个
ReduceTask在TaskTracker上运行,同时可能存在一个推测执行的ReduceTask,他们对应了2个不同的
TaskAttemptID。

1 TreeMap<TaskAttemptID, String> activeTasks

维护了当前运行的Task,该Task运行在哪个TaskTracker上。

1 TreeMap<TaskAttemptID, String> cleanupTasks

记录了某个cleanup task在哪个TaskTracker上。

1 TreeSet<TaskAttemptID> tasksReportedClosed

满足如下3种条件的Task会被加入到该数据结构中:

  • this.failed)
    || ((job.getStatus().getRunState() != JobStatus.RUNNING &&
    (job.getStatus().getRunState() != JobStatus.PREP))
  • isComplete() && !(isMapTask() && !jobSetup && !jobCleanup && isComplete(taskid))
  • isCommitPending(taskid) && !shouldCommit(taskid)

该数据结构用来辅助判断,是否一个Task已经完成(成功/失败),需要被TaskTracker终止掉,这个需要JobTracker发送KillTaskAction指令,通知TaskTracker终止该Task运行。

1 TreeMap<TaskAttemptID, Boolean> tasksToKill

记录了某个Task是否需要被Kill掉。


1 TreeSet<String> machinesWhereFailed

记录了执行Task失败的TaskTracker的host信息。

JobInProgress数据结构

  • JobID结构

JobID的结构,如下图所示:

上图中jtIdentifier的值为job,它是组成一个Job的ID字符串的前缀,唯一标识一个Job的完整ID的组成,如下所示:

1 job_<JobTracker启动时间字符串>_<序号>

例如,一个Job的ID字符串为job_200912121733_0002 。

  • JobProfile结构

JobProfile描述了一个Job的基本信息,它的结构,如下图所示:



通过上图可以看出,JobProfile包含了一个Job的如下信息:

标识名称 类型 说明
jobid JobID 唯一标识一个Job的ID,例如:job_200912121733_0002
user String 提交的该Job的所属用户名称,例如:shirdrn
url String 在Web UI页面上查看该Job信息的链接,例如:http://jobtracker.hadoopcluster.com:8080/jobdetails.jsp?jobid=job_200912121733_0002
name String 提交Job的用户为该Job设置的名称字符串,例如:ChainUserEventsJob
jobFile String 该Job所对应的配置文件,例如:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.xml
queueName String 提交的该Job所在的队列的名称,例如:default
  • 组成Job的Task的信息

一个Job可能包含多个Task(MapTask/ReduceTask),每个Task在JobTracker端使用TaskInProgress结构来跟踪Task的信息,一个Job由下面4组结构来表示这些信息,如下图所示:

上图中出现了4种类型的Task,我们需要明白每种Task的作用是什么。一个Job在调度时,需要分解为上述4种类型的Task,基于类型来说明,一个Job对应的这4种类型的Task的运行顺序为:setup task、MapTask、ReduceTask、cleanup task,其中setup task和cleanup task运行也需要申请slot来运行,map setup运行需要占用Map Slot,而reduce setup运行需要占用Reduce Slot,对于cleanup task也是类似的。
这里说明一下cleanup task和setup task的作用。其实在JobTracker端来看,setup task、cleanup task都与MapTask、ReduceTask使用相同的TaskInProgress数据结构来维护状态。setup task主要是在一个Job开始运行之前,初始化一些状态信息,由于存在MapTask和ReduceTask两种计算型Task,那么对应就存在map setup task和reduce setup task两种setup task。cleanup task主要是在一个Job运行结束后,负责清理在TaskTracker上运行的Task生成的临时数据,更新TaskTracker端维护的相关对象的状态信息,等等,类似地也存在map cleanup task和reduce cleanup task两种cleanup task。

  • JobStatus结构

JobStatus结构定义一个Job的当前状态信息,如下图所示:



除了定义Job运行状态信息,还包含了其他信息,如下表所示:

标识名称 类型 说明
jobid JobID 唯一标识一个Job的ID,例如:job_200912121733_0002
mapProgress float MapTask运行进度百分比
reduceProgress float ReduceTask运行进度百分比
cleanupProgress float cleanup task运行进度百分比
setupProgress float setup task运行进度百分比
runState int Job运行状态:RUNNING = 1;SUCCEEDED = 2;FAILED = 3;PREP = 4;KILLED = 5;
user String 提交的该Job的所属用户名称,例如:shirdrn
priority JobPriority Job优先级信息,是一个枚举类型,包含如下优先级:VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW
schedulingInfo String Job调度信息
jobACLs Map 该Job设置的ACL(访问控制列表)列表信息
  • Counters结构

Counters包含了一组计数器,用来跟踪一个Job运行的信息,结构如下图所示:



每个Job都包含一组Counter计数器,如下表所示:

标识名称 类型
NUM_FAILED_MAPS 失败的MapTask数量
NUM_FAILED_REDUCES 失败的ReduceTask数量
TOTAL_LAUNCHED_MAPS 所有启动的 MapTask数量
TOTAL_LAUNCHED_REDUCES 所有启动的 ReduceTask数量
OTHER_LOCAL_MAPS 其他Local MapTask数量
DATA_LOCAL_MAPS DATA_LOCAL的MapTask数量
NODEGROUP_LOCAL_MAPS NODEGROUP_LOCAL的MapTask数量
RACK_LOCAL_MAPS RACK_LOCAL的MapTask数量
SLOTS_MILLIS_MAPS 被占用的Map slot的“Slot个数 * (结束时间 – 开始时间)”
SLOTS_MILLIS_REDUCES 被占用的Reduce slot:“Slot个数 * (结束时间 – 开始时间)”
FALLOW_SLOTS_MILLIS_MAPS 空闲Map Slot:“(当前时间 – 开始时间) * Slot个数”
FALLOW_SLOTS_MILLIS_REDUCES 空闲Reduce Slot:“(当前时间 – 开始时间) * Slot个数”
  • 其他结构

JobInProgress中使用了大量的集合来维护Job/Task相关的状态信息,具体内容如下表所示:

结构图 说明
1 Map<Node, List<TaskInProgress>> nonRunningMapCache

JobTracker端维护了某个Node上,没有运行的MapTask列表的信息。
在调度MapTask之前,需要计算某个MapTask将要运行在哪些Node上,这里维护了某个Node所对应的没有运行的MapTask的列表信息。

1 Map<Node, Set<TaskInProgress>> runningMapCache

JobTracker端维护了某个Node上,当前正在运行的MapTask列表的信息。

1 List<TaskInProgress> nonLocalMaps

JobTracker端维护的、非Local,并且还没有运行的MapTask的列表。

1 SortedSet<TaskInProgress> failedMaps

JobTracker端维护了失败的MapTask的信息,在该集合中的TaskInProgress基于失败次数降序排序。
当某个MapTask失败以后,就会被放到该集合中,后续重新调度MapTask运行时,会检索该集合。

1 Set<TaskInProgress> nonLocalRunningMaps

JobTracker端维护的、 非Local、正在运行的MapTask保存在该数据结构中。

1 Set<TaskInProgress> nonRunningReduces

JobTracker端维护的、没有运行的ReduceTask的列表。

1 List<TaskAttemptID> mapCleanupTasks

为MapTask运行的cleanup task列表。

1 List<TaskAttemptID> reduceCleanupTasks

为ReduceTask运行的cleanup task列表。

1 List<TaskCompletionEvent> taskCompletionEvents

TaskCompletionEvent是用来跟踪Task完成事件的数据结构,该列表结构保存了TaskCompletionEvent。
当一个Task的状态为TaskStatus.State.SUCCEEDED/TaskStatus.State.FAILED/TaskStatus.State.KILLED的时候,会创建一个对应的TaskCompletionEvent对象,根据该对象来更新JobTracker端维护的Task的状态信息。

1 Map<TaskAttemptID, Integer> mapTaskIdToFetchFailuresMap

TaskTracker发送心跳的时候,会将TaskTracker的状态信息发送给JobTracker,状态信息通过TaskTrackerStatus表示,该对象中包含了Task的报告信息TaskStatus。如果是在运行ReduceTask时,抓取MapTask输出的结果失败时,会在根据Task报告信息,更新JobTracker端维护的mapTaskIdToFetchFailuresMap,记录了Task抓取MapTask输出失败的次数计数信息。

1 Map<TaskType, Long> firstTaskLaunchTimes

TaskType枚举类型定义如下:

1 public enum TaskType {
2 MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, TASK_CLEANUP
3 }

该firstTaskLaunchTimes数据结构保存了某个TaskType类型第一次运行的时间戳信息。

1 Map<TaskTracker, FallowSlotInfo> trackersReservedForMaps

FallowSlotInfo包含了空闲的slot信息,主要九个包含了空闲的slot的个数信息。
该数据结构维护了在某个TaskTracker上为MapTask运行所预留的空闲slot的信息。

1 Map<TaskTracker, FallowSlotInfo> trackersReservedForReduces

该数据结构维护了在某个TaskTracker上为ReduceTask运行所预留的空闲slot的信息。

JobTracker数据结构

JobTracker通过在内存中维护有关Job、Task相关的所有信息,来跟踪他们运行、交互过程中所发生的数据交换,等等,如下表所示:

结构图 说明
1 List<ServicePlugin> plugins

通过ServicePlugin接口,可以基于任意的RPC协议暴露DataNode或NameNode的功能。
通过配置项mapreduce.jobtracker.plugins可以设置ServicePlugin,JobTracker启动的时候会加载初始化配置的ServicePlugin。

1 List<JobInProgressListener> jobInProgressListeners

JobTracker维护了一组JobInProgressListener监听器,在JobTracker运行过程中,发生某些事件会触发注册的JobInProgressListener的执行。比如,JobClient提交一个Job,JobTracker端会触发对应的JobInProgressListener调用jobAdded()初始化该Job;比如,Job执行过程中状态发生变更,会触发JobInProgressListener调用jobUpdated()执行;比如,Job运行完成,会触发obInProgressListener调用jobRemoved()执行。
JobTracker初始化时会创建TaskScheduler,而启动TaskScheduler的时候,会把TaskScheduler所维护的JobInProgressListener添加到jobInProgressListeners列表中。

1 Map<JobID, JobInProgress> jobs

JobTracker维护一个JobID->JobInProgress映射的列表,JobID标识一个提交的Job,JobInProgress是JobTracker端维护的Job的所有信息的数据结构。在如下情况下,会检索/操作该jobs数据结构:

  • JobClient提交Job的时候,会创建JobInProgress,并加入到jobs集合中
  • JobClient远程调用Kill掉指定Job的时候,会根据JobID从jobs中获取JobInProgress信息,并Kill掉该Job,更新状态信息
  • JobClient查询当前运行的所有Job信息时,会检索jobs列表
  • 在JobTracker端检索一个Job所维护的Task信息时,会根据JobInProgress所维护的数据结构获取到对应的Task的信息TaskInProgress
  • Job运行状态不为RUNNING,并且也不为PREP,并且完成时间早于当前时间,会将Job从jobs列表删除
  • JobTracker解析接收到的TaskTracker发送的心跳的过程中,会检索并更新jobs列表中的Job信息,找到可以分配给该TaskTracker的属于满足条件的Job所包含的Task
1 TreeMap<String, ArrayList<JobInProgress>> userToJobsMap

用来跟踪某个用户提交的需要运行的Job集合的数据结构。
当Job完成(success/failure/killed)后,会在JobTracker内存中保存一些Job,这些Job属于哪些用户的。默认情况下会保存MAX_COMPLETE_USER_JOBS_IN_MEMORY=100个用户的已完成的Job,当超过该值时,会清理掉最早的用户以及对应的完成的Job信息。
可以通过配置项mapred.jobtracker.completeuserjobs.maximum来设置该值。

1 Map<String, Set<JobID>> trackerToJobsToCleanup

用来跟踪某个TaskTracker上运行的Job集合的数据结构。
当一个Job已经运行完成,TaskTracker需要知道哪些运行在该节点上的Job已经完成,并等待通知进行清理,这时会在JobTracker端检索该Map,取出该TaskTracker对应的需要进行清理的Job的集合。
另外,还有一种情况,当JobTracker一段时间内没有收到TaskTracker发送的心跳报告,这时会将该TaskTracker对应的Job集合从trackerToJobsToCleanup中删除,后续会重新调度这些运行在该有问题的TaskTracker上的Task(这些Task属于某些Job,JobTracker分配任务的单位是Task)。

1 Map<String, Set<TaskAttemptID>> trackerToTasksToCleanup

用来跟踪某个TaskTracker上运行的Task集合的数据结构。
当Job运行完成(成功或者失败)后,一个TaskTracker需要知道属于该Job的哪些Task运行在该TaskTracker上,需要对这些Task进行清理。JobTracker端会查询出这类Task,并通过心跳的响应,向对应的TaskTracker发送KillTaskAction指令,通知TaskTracker清理这些Task运行时生成的临时文件等。

1 Map<TaskAttemptID, TaskInProgress> taskidToTIPMap

TaskAttemptID用来标识一个MapTask或一个ReduceTask,通过该数据结构可以根据TaskAttemptID获取到MapTask/ReduceTask的运行信息,也就是TaskInProgress对象。
当需要检索MapTask/ReduceTask,或者对JobTracker端所维护的该Task的状态信息进行更新的时候,需要通过该数据结构获取到。

1 TreeMap<TaskAttemptID, String> taskidToTrackerMap

维护TaskAttemptID到TaskTracker的映射关系,可以通过一个Task的ID获取到该Task运行在哪个TaskTracker上。

1 Map<String, Set<TaskTracker>> hostnameToTaskTracker

一台主机上,可能运行着多个TaskTracker进程,该数据结构用来维护host到TaskTracker集合的映射关系。如果一个host被加入了黑名单,则该host上面的所有TaskTracker都无法接收任务。

1 TreeMap<String, Set<TaskAttemptID>> trackerToTaskMap

某个TaskTracker上都运行着哪些Task,通过该数据结构来维护这种映射关系。

1 TreeMap<String, Set<TaskAttemptID>> trackerToMarkedTasksMap

在某个TaskTracker上都运行完成了哪些Task,通过该数据结构来维护这种映射关系。

1 Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap

TaskTracker会周期性地向JobTracker发送心跳报告,最近一次发送的心跳报告,JobTracker会给其一个响应,最后的这个响应的数据保存在该数据结构中。

1 Map<String, Node> hostnameToNodeMap

JobTracker维护了一个网络拓扑结构(NetworkTopology),组成该拓扑结构的是一个一个的Node,每个Node都包含了网络位置信息、继承关系信息、名称等。
每个TaskTracker都是整个Hadoop集群的一个节点,通过该数据结构维护了TaskTracker在集群拓扑结构中相关信息。
比如,根据给定TaskTracker ID,从hostnameToNodeMap中检索出其对应的Node信息,在调度一个Job的MapTask运行时(MapTask运行具有Locality特性),可以基于local、rack-local、off-switch的顺序优先选择前面的Node运行该MapTask。

附录
这里给出文中(文字/图片上)一些缩写词对应的完整名称,如下表所示:

简写词 完整名称
JIP JobInProgress
TIP TaskInProgress
TAID TaskAttemptID
TTID TaskTracker ID
TT HOST TaskTracker Host Name
TCE TaskCompletionEvent
JIPL JobInProgressListener
时间: 2024-10-29 16:49:55

MapReduce V1:JobTracker端Job/Task数据结构的相关文章

MapReduce V1:TaskTracker端启动Task流程分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程. TaskTracker周期性地向JobTracker发送心跳报告,在RPC调用返回结果后,解析结果得到JobTracker下发的运行Task的指令,即LaunchTaskAction,就会在TaskTracker节点上准备运行这个Task.Task的运行是在一个与TaskTracker进程隔离的JVM实例中执行,该JVM实例是通过org.apache.hadoop.mapred.Child来创建的,所以在创建Child

MapReduce V1:Job提交流程之JobTracker端分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程.MapReduce V1实现中,主要存在3个主要的分布式进程(角色):JobClient.JobTracker和TaskTracker,我们主要是以这三个角色的实际处理活动为主线,并结合源码,分析实际处理流程. 上一篇我们分析了Job提交过程中JobClient端的处理流程(详见文章 MapReduce V1:Job提交流程之JobClient端分析),这里我们继续详细分析Job提交在JobTracker端的具体流程.通

MapReduce V1:JobTracker处理Heartbeat流程分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程.这篇文章的内容,更多地主要是描述处理/交互流程性的东西,大部分流程图都是经过我梳理后画出来的(开始我打算使用序列图来描述流程,但是发现很多流程在单个对象内部都已经非常复杂,想要通过序列图表达有点担心描述不清,所以选择最基本的程序流程图),可能看起来比较枯燥,重点还是关注主要的处理流程要点,特别的地方我会刻意标示出来,便于理解. JobTracker与TaskTracker之间通过org.apache.hadoop.map

MapReduce V1:Job提交流程之JobClient端分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程. MapReduce V1实现中,主要存在3个主要的分布式进程(角色):JobClient.JobTracker和TaskTracker,我们主要是以这三个角色的实际处理活动为主线,并结合源码,分析实际处理流程.下图是<Hadoop权威指南>一书给出的MapReduce V1处理Job的抽象流程图: 如上图,我们展开阴影部分的处理逻辑,详细分析Job提交在JobClient端的具体流程.在编写好MapReduce程序以

MapReduce V1:MapTask执行流程分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程. 在文章<MapReduce V1:TaskTracker设计要点概要分析>中我们已经了解了org.apache.hadoop.mapred.Child启动的基本流程,在Child VM启动的过程中会运行MapTask,实际是运行用户编写的MapReduce程序中的map方法中的处理逻辑,我们首先看一下,在Child类中,Child基于TaskUmbilicalProtocol协议与TaskTracker通信,获取到该

MapReduce V1:TaskTracker设计要点概要分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程. 本文不打算深入地详细分析TaskTracker某个具体的处理流程,而是概要地分析TaskTracker在MapReduce框架中的主要负责处理那些事情,是我们能够在宏观上了解TaskTracker端都做了哪些工作.我尽量将TaskTracker端的全部要点内容提出来,但是涉及到详细的分析,只是点到为止,后续会对相应模块的处理流程结合代码进行分析. TaskTracker主要负责MapReduce计算集群中Task运行的

【C/C++学院】0828-STL入门与简介/STL容器概念/容器迭代器仿函数算法STL概念例子/栈队列双端队列优先队列/数据结构堆的概念/红黑树容器

STL入门与简介 #include<iostream> #include <vector>//容器 #include<array>//数组 #include <algorithm>//算法 using namespace std; //实现一个类模板,专门实现打印的功能 template<class T> //类模板实现了方法 class myvectorprint { public: void operator ()(const T &

MapReduce源码分析之Task中关于对应TaskAttempt存储Map方案的一些思考

        我们知道,MapReduce有三层调度模型,即Job-->Task-->TaskAttempt,并且:         1.通常一个Job存在多个Task,这些Task总共有Map Task和Redcue Task两种大的类型(为简化描述,Map-Only作业.JobSetup Task等复杂的情况这里不做考虑):         2.每个Task可以尝试运行1-n此,而且通常很多情况下都是1次,只有当开启了推测执行原理且存在拖后腿Task,或者Task之前执行失败时,Task

Hadoop专业解决方案-第3章:MapReduce处理数据

前言:非常感谢团队的努力,最新的章节终于有了成果,因为自己的懒惰,好久没有最新的进展了,感谢群里兄弟的努力. 群名称是Hadoop专业解决方案群  313702010 本章主要内容: 理解MapReduce基本原理 了解MapReduce应用的执行 理解MapReduce应用的设计 截止到目前,我们已经知道Hadoop如何存储数据,但Hadoop不仅仅是一个高可用 的,规模巨大的数据存储引擎,它的另一个主要特点是可以将数据存储与处理相结合. Hadoop的核心处理模块是MapReduce,也是当