MapReduce V1:TaskTracker端启动Task流程分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。
TaskTracker周期性地向JobTracker发送心跳报告,在RPC调用返回结果后,解析结果得到JobTracker下发的运行Task的指令,即LaunchTaskAction,就会在TaskTracker节点上准备运行这个Task。Task的运行是在一个与TaskTracker进程隔离的JVM实例中执行,该JVM实例是通过org.apache.hadoop.mapred.Child来创建的,所以在创建Child VM实例之前,需要做大量的准备工作来启动Task运行。一个Task的启动过程,如下序列图所示:


通过上图,结合源码,我们将一个Task启动的过程,分为下面3个主要的步骤:

  1. 初始化跟踪Task运行的相关数据结构
  2. 准备Task运行所共享的Job资源
  3. 启动Task

下面,我们详细分析上面3个步骤的流程:

初始化跟踪Task运行的相关数据结构

如果是LaunchTaskAction,则TaskTracker会将该指令加入到一个启动Task的队列中,进行一步加载处理,如下所示:

1 private void addToTaskQueue(LaunchTaskAction action) {
2 if (action.getTask().isMapTask()) {
3 mapLauncher.addToTaskQueue(action);
4 } else {
5 reduceLauncher.addToTaskQueue(action);
6 }
7 }

根据Task的类型,分别加入到对应类型的TaskLauncher的队列中。这里需要了解一下TaskLauncher线程类,在TaskTracker中创建了2个TaskLauncher线程,一个是为启动MapTask,另一个是为启动ReduceTask。下面是TaskLauncher类的构造方法:

1 public TaskLauncher(TaskType taskType, int numSlots) {
2 this.maxSlots = numSlots;
3 this.numFreeSlots = new IntWritable(numSlots);
4 this.tasksToLaunch = new LinkedList<TaskInProgress>();
5 setDaemon(true);
6 setName("TaskLauncher for " + taskType + " tasks");
7 }

构造方法中,参数taskType表示Task类型,分为MapTask和ReduceTask,参数numSlots表示对每一种类型的Task每个TaskTracker上最多可以启动的Task的实例数,默认都是2个。在TaskTracker初始化时,会读取mapred-site.xml配置文件,读取mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum配置的参数值,分别赋值给maxMapSlots和maxReduceSlots这2个属性,如下TaskTracker构造方法中初始化这2个属性:

1 maxMapSlots = conf.getInt("mapred.tasktracker.map.tasks.maximum", 2);
2 maxReduceSlots = conf.getInt("mapred.tasktracker.reduce.tasks.maximum", 2);

然后,在TaskTracker创建时,会根据上述maxMapSlots和maxReduceSlots的值来创建并启动2个TaskLauncher线程:

1 mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);
2 reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
3 mapLauncher.start();
4 reduceLauncher.start();

将LaunchTaskAction加入到TaskLauncher的队列中,这个是调用TaskLauncher的addToTaskQueue()方法:

1 public void addToTaskQueue(LaunchTaskAction action) {
2 synchronized (tasksToLaunch) {
3 TaskInProgress tip = registerTask(action, this); // 注册Task,初始化用来跟踪该待启动的Task相关的数据结构
4 tasksToLaunch.add(tip); // 将TIP加入队列
5 tasksToLaunch.notifyAll(); // 通知TaskLauncher线程自己(在run()方法中会调用wait())启动Task
6 }
7 }

上面方法中,最关键的就是registerTask()方法,调用该方法来初始化TaskTracker端Task对应TaskInProgress结构,代码如下所示:

01 private TaskInProgress registerTask(LaunchTaskAction action, TaskLauncher launcher) {
02 Task t = action.getTask();
03 LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() + " task's state:" + t.getState());
04 TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher); // 创建TIP
05 synchronized (this) {
06 tasks.put(t.getTaskID(), tip); // 加入到队列tasks:TaskAttemptID -> TaskInProgress
07 runningTasks.put(t.getTaskID(), tip); // 加入到队列runningTasks:TaskAttemptID -> TaskInProgress
08 boolean isMap = t.isMapTask();
09 if (isMap) {
10 mapTotal++;
11 } else {
12 reduceTotal++;
13 }
14 }
15 return tip;
16 }

上面方法中,tasks队列用来记录该TaskTracker上所有的Task,包括正在运行和已经完成的Task,而队列runningTasks则表示当前TaskTracker上正在运行的Task。同时,通过mapTotal和reduceTotal来分别记录当前TaskTracker上运行的总的MapTask和ReduceTask的数量。
根据LaunchTaskAction创建的TaskInProgress结构被加入到队列tasksToLaunch中,然后通知TaskLauncher线程,在方法run中检测并取出队列中TaskInProgress对象,并判断当前TaskTracker的资源状态能否启动一个Task,如果可以则调用startNewTask()方法启动Task,代码如下所示:

01 TaskInProgress tip;
02 Task task;
03 synchronized (tasksToLaunch) {
04 while (tasksToLaunch.isEmpty()) { // 队列为空,则没有Task需要启动,等待向队列加入LaunchTaskAction指令及其通知
05 tasksToLaunch.wait();
06 }
07 tip = tasksToLaunch.remove(0); // 队列不空,则取出LaunchTaskAction
08 task = tip.getTask();
09 LOG.info("Trying to launch : " + tip.getTask().getTaskID() + " which needs " + task.getNumSlotsRequired() + " slots");
10 }
11 synchronized (numFreeSlots) { // 检查当前是否存在空闲的slot,以便运行Task
12 boolean canLaunch = true;
13 while (numFreeSlots.get() < task.getNumSlotsRequired()) { // 如果当前空闲slot小于该Task运行所需的slot数量
14 if (!tip.canBeLaunched()) { // 如果TIP状态不是下面3种状态:UNASSIGNED、FAILED_UNCLEAN、KILLED_UNCLEAN
15 canLaunch = false; // 检查TIP状态不能启动Task,但也不能阻塞该方法
16 break;
17 }
18 LOG.info("TaskLauncher : Waiting for " + task.getNumSlotsRequired() + " to launch " + task.getTaskID() + ", currently we have " + numFreeSlots.get() + " free slots");
19 numFreeSlots.wait(); // 如果没有空闲slot,则等待
20 }
21 if (!canLaunch) {
22 continue;
23 }
24 LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()+ " and trying to launch "+tip.getTask().getTaskID() + " which needs " + task.getNumSlotsRequired() +" slots");
25 numFreeSlots.set(numFreeSlots.get() - task.getNumSlotsRequired()); // 标记将满足该Task的Slot数已经分配
26 assert (numFreeSlots.get() >= 0);
27 }
28 synchronized (tip) {
29 // 到这里已经获取到了满足运行Task要求的空闲slot,但还要检查该TIP状态是否指示为被kill了
30 if (!tip.canBeLaunched()) {
31 LOG.info("Not launching task " + task.getTaskID() + " as it got killed externally. Task's state is " + tip.getRunState());
32 addFreeSlots(task.getNumSlotsRequired()); // 如果Task状态TIP标识不能启动,则释放slot
33 continue;
34 }
35 tip.slotTaken = true;
36 }
37
38 startNewTask(tip); // 获取到了满足Task启动所需的空闲slot,开始启动Task

这样,当前TaskTracker所在节点的资源状态,和Task对应的TIP状态都已经满足启动Task的要求,可以启动一个Task去运行。

准备Task运行所共享的Job资源

调用startNewTask()方法,异步地启动了一个单独的线程去启动Task,该方法如下所示:

01 void startNewTask(final TaskInProgress tip) throws InterruptedException {
02 Thread launchThread = new Thread(new Runnable() {
03 @Override
04 public void run() {
05 try {
06 RunningJob rjob = localizeJob(tip); // 在TaskTracker节点上初始化Job信息
07 tip.getTask().setJobFile(rjob.getLocalizedJobConf().toString());
08 launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob); // 启动Task
09 } catch (Throwable e) {
10 ... ...
11 }
12 }
13 });
14 launchThread.start();
15 }

如果在一个TaskTracker节点上运行的多个Task都属于同一个Job(一个TaskTracker上运行的Task按照Job来分组,每一组Task都属于同一个Job),那么第一次初始化时,还没有建立一个Task到Job的映射关系,也就是说,在TaskTracker端也要维护Job的状态,以及属于该Job的所有Task的状态信息。比如,如果用户提交了一个kill掉Job的请求,那么正在运行的属于该Job的所有Task都应该被kill掉。
上面代码中调用localizeJob()方法,执行了如下处理:

  • 创建一个RunningJob对象,并加入到TaskTracker维护的runningJobs队列(包含了JobID到RunningJob的映射关系)中,同时将Task对应的TIP对象加入到RunningJob所维护的tasks队列中。
  • 一个Job完成初始化,还需要将Job相关的信息,如Job配置信息从HDFS上下载到TaskTracker所在节点本地,供该Job的一组Task运行共享。我们知道,在JobClient提交Job时,会将相关资源拷贝到HDFS上的指定目录中,例如,在HDFS上的/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/目录下存储Job相关的资源文件,拷贝到TaskTracker本地目录下,例如/tmp/mapred/local/ttprivate/taskTracker/shirdrn/jobcache/job_200912121733_0002/目录。
  • 调用TaskController的initializeJob()方法初始化Job所包含的相关资源信息,为属于该Job的一组Task所共享。

这里,TaskController使用的LinuxTaskController实现类,通过调用该方法,实际上构造了一个Shell命令行,用来在TaskTracker节点上初始化目录和拷贝相关资源,该命令行示例如下所示:

1 /usr/local/java/bin/java -classpath .:/usr/local/java/lib/*.jar;/usr/local/java/jre/lib/*.jar -Dhadoop.log.dir=/tmp/hadoop/logs -Dhadoop.root.logger=INFO,console -Djava.library.path= org.apache.hadoop.mapred.JobLocalizer shirdrn job_200912121733_0002

通过工具ShellCommandExecutor来执行上述命令行,启动一个单独的JVM实例,完成Job资源初始化,完成即退出。通过上述命令行可以看到,主要的初始化工作都在JobLocalizer中完成的,需要传入2个参数:用户、jobid,在JobLocalizer中创建了一个Job所包含的各种资源,供Task在TaskTracker节点上运行共享,这些相关的目录或资源文件包括:

1 ${mapred.local.dir}/taskTracker/${user}
2 ${mapred.local.dir}/taskTracker/${user}/jobcache
3 ${mapred.local.dir}/taskTracker/${user}/jobcache/${jobid}/work
4 ${mapred.local.dir}/taskTracker/${user}/jobcache/${jobid}/jars
5 ${mapred.local.dir}/taskTracker/${user}/jobcache/${jobid}/jars/job.jar
6 ${mapred.local.dir}/taskTracker/${user}/jobcache/${jobid}/job.xml
7 ${mapred.local.dir}/taskTracker/${user}/jobcache/${jobid}/jobToken
8 ${mapred.local.dir}/taskTracker/${user}/distcache

这样,在一个TaskTracker节点上运行的一组Task所共享的对应唯一Job相关的资源已经满足,接下来就可以启动Task了。

启动Task

启动Task的流程相对复杂一些,我们分几个阶段/要点来进行说明:

启动Task准备

在startNewTask()方法中调用localizeJob()方法,完成了Job资源在TaskTracker节点上的初始化,接着就可以调用launchTaskForJob()方法进入启动Task的处理流程,代码如下所示:

1 protected void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
2 RunningJob rjob) throws IOException {
3 synchronized (tip) {
4 jobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY, localStorage.getDirsString());
5 tip.setJobConf(jobConf);
6 tip.setUGI(rjob.ugi);
7 tip.launchTask(rjob); // 这里才是启动Task的核心方法
8 }
9 }

通过调用TaskInProgress tip的launchTask()方法来启动Task,我们看一下该方法实现代码:

01 public synchronized void launchTask(RunningJob rjob) throws IOException {
02 if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
03 this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
04 this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
05 localizeTask(task);
06 if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) { // 如果状态UNASSIGNED,则初始化完成后,将当前状态改为RUNNING
07 this.taskStatus.setRunState(TaskStatus.State.RUNNING);
08 }
09 setTaskRunner(task.createRunner(TaskTracker.this, this, rjob)); // 启动一个TaskRunner线程
10 this.runner.start();
11 long now = System.currentTimeMillis();
12 this.taskStatus.setStartTime(now);
13 this.lastProgressReport = now;
14 } else {
15 LOG.info("Not launching task: " + task.getTaskID() + " since it's state is " +this.taskStatus.getRunState());
16 }
17 }

TaskInProgress里面taskStatus维护了一个TIP的状态,通过上述代码可以看出,一个Task只有具备下面3个状态之一:UNASSIGNED、FAILED_UNCLEAN、KILLED_UNCLEAN,才能够被启动。
首先要进行Task的初始化,调用localizeTask()方法,如下所示:

1 void localizeTask(Task task) throws IOException{
2 task.localizeConfiguration(localJobConf);
3 task.setConf(localJobConf);
4 }

在这里,Task可能是MapTask,也可能是ReduceTask,所以调用task.localizeConfiguration()的初始化逻辑稍微有些不同,具体可以查看MapTask和ReduceTask类实现。另外,对于不同类型的Task,也会创建不同类型的TaskRunner线程,分别对应于MapTaskRunner和ReduceTaskRunner,实际所有Task启动的相关逻辑都是在这2个TaskRunner中实现的。
在TaskRunner中,主要逻辑是在run()方法中实现的,其中在调用launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir)之前,做了一些准备工作:

  • 构建setupCmds:读取系统环境变量,或者hadoop设置的环境变量,LD_LIBRARY_PATH、LD_LIBRARY_PATH、USER、SHELL、LOGNAME、HOME、HADOOP_TOKEN_FILE_LOCATION、HADOOP_ROOT_LOGGER、HADOOP_CLIENT_OPTS、HADOOP_CLIENT_OPTS,这些变量都是键值对的形式,最后会通过export在当前环境下导出这些变量配置
  • 构建vargs:设置启动Child VM的配置,读取mapred-site.xml配置文件中mapred.map.child.java.opts和mapred.reduce.child.java.opts的配置内容,最终会使用org.apache.hadoop.mapred.Child创建一个JVM实例来启动Task
  • 目录文件设置:包括2个日志文件stdout和stderr,以及当前启动JVM所在的目录workDir

使用JvmManager管理启动Task相关数据

完成上述准备工作以后,调用launchJvmAndWait()方法,创建Child VM实例,如下所示:

01 void launchJvmAndWait(List <String> setup, Vector<String> vargs, File stdout,
02 File stderr, long logSize, File workDir)
03 throws InterruptedException, IOException {
04 jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup, vargs, stdout, stderr, logSize, workDir, conf));
05 synchronized (lock) {
06 while (!done) {
07 lock.wait();
08 }
09 }
10 }

最终是通过JvmManager来实现JVM实例的创建,下面是JvmManager保存的一些数据结构,用来维护JVM相关数据的数据结构,如下图所示:


可以看到,一个JvmManager对应2个JvmManagerForType,分别负责管理MapTask和ReduceTask启动对应的Child VM等数据,JvmManager的构造方法,如下所示:

1 public JvmManager(TaskTracker tracker) {
2 mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(), true, tracker);
3 reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(), false, tracker);
4 }

上面调用了jvmManager.launchJvm()方法,其中内部根据Task类型,选择调用mapJvmManager或reduceJvmManager的reapJvm()方法,如下所示:

01 private synchronized void reapJvm(TaskRunner t, JvmEnv env) throws IOException, InterruptedException {
02 if (t.getTaskInProgress().wasKilled()) { // 检查当前准备启动的Task是否已经被kill掉,如果是则直接返回
03 return;
04 }
05 boolean spawnNewJvm = false;
06 JobID jobId = t.getTask().getJobID();
07 int numJvmsSpawned = jvmIdToRunner.size();
08 JvmRunner runnerToKill = null;
09 // 检查是否存在空闲slot来启动Task
10 if (numJvmsSpawned >= maxJvms) { // 检查当前TaskTracker上运行的某类Task对应的JVM实例数是否大于全局设置允许的最大slot数
11 Iterator<Map.Entry<JVMId, JvmRunner>> jvmIter = jvmIdToRunner.entrySet().iterator();
12 while (jvmIter.hasNext()) { // 遍历当前存在的<JVMId, JvmRunner>队列,检查每个JvmRunner的状态
13 JvmRunner jvmRunner = jvmIter.next().getValue();
14 JobID jId = jvmRunner.jvmId.getJobId();
15 if (jId.equals(jobId) && !jvmRunner.isBusy() && !jvmRunner.ranAll()){ // 如果在当前要启动的Task之前,已经有该Task对应的Job的其他Task运行完成,则预留该JVM以重用
16 setRunningTaskForJvm(jvmRunner.jvmId, t); // 将该JvmRunner映射到当前Task
17 LOG.info("No new JVM spawned for jobId/taskid: " + jobId+"/"+t.getTask().getTaskID() + ". Attempting to reuse: " + jvmRunner.jvmId);
18 return;
19 }
20 //一个JVM实例需要kill掉,需要满足下面条件之一:
21 // (1) 如果属于当前要启动的Task对应的Job,该Job对应的其他Task都已经运行完成;
22 // (2) 如果不属于当前要启动的Task所对应的Job,那些Job对应的Task都已经运行完成。
23 if ((jId.equals(jobId) && jvmRunner.ranAll()) || (!jId.equals(jobId) && !jvmRunner.isBusy())) {
24 runnerToKill = jvmRunner;
25 spawnNewJvm = true;
26 }
27 }
28 } else {
29 spawnNewJvm = true;
30 }
31
32 if (spawnNewJvm) {
33 if (runnerToKill != null) {
34 LOG.info("Killing JVM: " + runnerToKill.jvmId);
35 killJvmRunner(runnerToKill); // kill掉该JvmRunner
36 }
37 spawnNewJvm(jobId, env, t); // 创建一个新的JVM实例来启动该Task
38 return;
39 }
40 } catch (Exception e) {
41 LOG.fatal(e);
42 } finally {
43 System.exit(-1);
44 }
45 }

上面代码中,调用setRunningTaskForJvm()很关键,实际上把需要启动的Task与JvmRunner建立映射关系,更新相应的内存数据结构(队列),如下所示:

1 synchronized public void setRunningTaskForJvm(JVMId jvmId,
2 TaskRunner t) {
3 jvmToRunningTask.put(jvmId, t);
4 runningTaskToJvm.put(t,jvmId);
5 jvmIdToRunner.get(jvmId).setBusy(true); // 设置当前JvmRunner被占用,不允许释放该资源
6 }

该方法,在spawnNewJvm()方法也调用了,spawnNewJvm()方法创建了一个新的JVM,代码如下所示:

1 private void spawnNewJvm(JobID jobId, JvmEnv env, TaskRunner t) {
2 JvmRunner jvmRunner = new JvmRunner(env, jobId, t.getTask());
3 jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
4 jvmRunner.setDaemon(true);
5 jvmRunner.setName("JVM Runner " + jvmRunner.jvmId + " spawned.");
6 setRunningTaskForJvm(jvmRunner.jvmId, t); // 调用setRunningTaskForJvm()方法
7 LOG.info(jvmRunner.getName());
8 jvmRunner.start(); // 启动JvmRunner线程,用来启动Child VM
9 }

接下来,我们看一下JvmRunner线程类,该线程体run()方法中直接调用了runChild()方法,该方法实现代码,如下所示:

01 public void runChild(JvmEnv env) throws IOException, InterruptedException{
02 int exitCode = 0;
03 try {
04 env.vargs.add(Integer.toString(jvmId.getId()));
05 TaskRunner runner = jvmToRunningTask.get(jvmId); // 从队列jvmToRunningTask中取出TaskRunner
06 if (runner != null) {
07 Task task = runner.getTask();
08 String user = task.getUser();
09 TaskAttemptID taskAttemptId = task.getTaskID();
10 String taskAttemptIdStr = task.isTaskCleanupTask() ? (taskAttemptId.toString() + TaskTracker.TASK_CLEANUP_SUFFIX) : taskAttemptId.toString();
11 exitCode = tracker.getTaskController().launchTask(user, jvmId.jobId.toString(), taskAttemptIdStr, env.setup, env.vargs, env.workDir, env.stdout.toString(), env.stderr.toString()); // 通过TaskController来启动Task
12 }
13 } catch (IOException ioe) {
14 // do nothing
15 } finally { // handle the exit code
16 // although the process has exited before we get here,
17 // make sure the entire process group has also been killed.
18 kill(); // Task运行完成,kill掉运行Task的Child VM实例
19 updateOnJvmExit(jvmId, exitCode);
20 LOG.info("JVM : " + jvmId + " exited with exit code " + exitCode + ". Number of tasks it ran: " + numTasksRan);
21 deleteWorkDir(tracker, firstTask); // 清理临时目录
22 }
23 }

在JvmRunner线程类中,其中委托TaskController来控制Task的实际启动。

使用TaskController控制启动Child VM

下面,我们看TaskController启动Task的实现方法launchTask(),代码如下所示:

01 @Override
02 public int launchTask(String user, String jobId, String attemptId, List<String> setup, List<String> jvmArguments,
03 File currentWorkDirectory, String stdout, String stderr) throws IOException {
04 ShellCommandExecutor shExec = null;
05 try {
06 FileSystem rawFs = FileSystem.getLocal(getConf()).getRaw();
07 long logSize = 0; //TODO MAPREDUCE-1100
08 String cmdLine = TaskLog.buildCommandLine(setup, jvmArguments, new File(stdout),new File(stderr), logSize, true);
09 // 将上述命令行写入本地缓存的目录中
10 Path p = new Path(allocator.getLocalPathForWrite( TaskTracker.getPrivateDirTaskScriptLocation(user, jobId, attemptId), getConf()), COMMAND_FILE);
11 String commandFile = writeCommand(cmdLine, rawFs, p);
12 String[] command =
13 new String[]{taskControllerExe,
14 user,
15 localStorage.getDirsString(),
16 Integer.toString(Commands.LAUNCH_TASK_JVM.getValue()),
17 jobId,
18 attemptId,
19 currentWorkDirectory.toString(),
20 commandFile};
21 shExec = new ShellCommandExecutor(command);
22
23 if (LOG.isDebugEnabled()) {
24 LOG.debug("launchTask: " + Arrays.toString(command));
25 }
26 shExec.execute(); // 执行启动Task的命令
27 } catch (Exception e) {
28 if (shExec == null) {
29 return -1;
30 }
31 int exitCode = shExec.getExitCode();
32 LOG.warn("Exit code from task is : " + exitCode);
33 // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the task was
34 // terminated/killed forcefully. In all other cases, log the
35 // task-controller output
36 if (exitCode != 143 && exitCode != 137) {
37 LOG.warn("Exception thrown while launching task JVM : " + StringUtils.stringifyException(e));
38 LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
39 logOutput(shExec.getOutput());
40 }
41 return exitCode;
42 }
43 if (LOG.isDebugEnabled()) {
44 LOG.debug("Output from LinuxTaskController's launchTask follows:");
45 logOutput(shExec.getOutput());
46 }
47 return 0;
48 }

将构造好的启动Child的命令行写入到本地目录下的文件中,该脚本文件的绝对路径,示例如下所示:

1 /tmp/mapred/local/ttprivate/taskTracker/shirdrn/jobcache/job_200912121733_0002/attempt_200912121733_0002_m_000005_0/taskjvm.sh

在TaskController(实际上是LinuxTaskController)的launchTask()方法中,使用ShellCommandExecutor工具执行的命令行,类似如下这样:

1 /usr/local/hadoop/bin/task-controller shirdrn /tmp/mapred/local 1 job_200912121733_0002 attempt_200912121733_0002_m_000005_0 /tmp/mapred/local/ttprivate/taskTracker/shirdrn/jobcache/job_200912121733_0002/attempt_200912121733_0002_m_000005_0/taskjvm.sh

在taskjvm.sh脚本中的内容,才是真正启动Child VM的命令行,示例如下所示:

1 /usr/local/bin/java -Xmx 512M -verbose:gc -Xloggc:/tmp/attempt_200912121733_0002_m_000005_0.gc -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.library.path= -Djava.io.tmpdir= -classpath .:/usr/local/java/lib/*.jar:/usr/local/java/jre/lib/*.jar -Dlog4j.configuration=task-log4j.properties -Dhadoop.log.dir=/tmp/hadoop/logs -Dhadoop.root.logger=INFO,TLA -Dhadoop.tasklog.taskid=attempt_200912121733_0002_m_000005_0 -Dhadoop.tasklog.iscleanup=false -Dhadoop.tasklog.totalLogFileSize= org.apache.hadoop.mapred.Child 127.0.0.1 0 attempt_200912121733_0002_m_000005_0 /tmp/hadoop/logs/userlogs/job_200912121733_0002/attempt_200912121733_0002_m_000005_0/ 2134

至此,一个Task通过Child VM的加载已经启动,就可以运行一个Task了,我们后续再详细介绍。

时间: 2024-11-03 00:01:14

MapReduce V1:TaskTracker端启动Task流程分析的相关文章

MapReduce V1:JobTracker处理Heartbeat流程分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程.这篇文章的内容,更多地主要是描述处理/交互流程性的东西,大部分流程图都是经过我梳理后画出来的(开始我打算使用序列图来描述流程,但是发现很多流程在单个对象内部都已经非常复杂,想要通过序列图表达有点担心描述不清,所以选择最基本的程序流程图),可能看起来比较枯燥,重点还是关注主要的处理流程要点,特别的地方我会刻意标示出来,便于理解. JobTracker与TaskTracker之间通过org.apache.hadoop.map

MapReduce V1:MapTask执行流程分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程. 在文章<MapReduce V1:TaskTracker设计要点概要分析>中我们已经了解了org.apache.hadoop.mapred.Child启动的基本流程,在Child VM启动的过程中会运行MapTask,实际是运行用户编写的MapReduce程序中的map方法中的处理逻辑,我们首先看一下,在Child类中,Child基于TaskUmbilicalProtocol协议与TaskTracker通信,获取到该

MapReduce V1:TaskTracker设计要点概要分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程. 本文不打算深入地详细分析TaskTracker某个具体的处理流程,而是概要地分析TaskTracker在MapReduce框架中的主要负责处理那些事情,是我们能够在宏观上了解TaskTracker端都做了哪些工作.我尽量将TaskTracker端的全部要点内容提出来,但是涉及到详细的分析,只是点到为止,后续会对相应模块的处理流程结合代码进行分析. TaskTracker主要负责MapReduce计算集群中Task运行的

MapReduce V1:Job提交流程之JobTracker端分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程.MapReduce V1实现中,主要存在3个主要的分布式进程(角色):JobClient.JobTracker和TaskTracker,我们主要是以这三个角色的实际处理活动为主线,并结合源码,分析实际处理流程. 上一篇我们分析了Job提交过程中JobClient端的处理流程(详见文章 MapReduce V1:Job提交流程之JobClient端分析),这里我们继续详细分析Job提交在JobTracker端的具体流程.通

MapReduce V1:JobTracker端Job/Task数据结构

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程.在MapReduce程序运行的过程中,JobTracker端会在内存中维护一些与Job/Task运行相关的信息,了解这些内容对分析MapReduce程序执行流程的源码会非常有帮助. 在编写MapReduce程序时,我们是以Job为单位进行编程处理,一个应用程序可能由一组Job组成,而MapReduce框架给我们暴露的只是一些Map和Reduce的函数接口,在运行期它会构建对应MapTask和ReduceTask,所以我们

MapReduce V1:Job提交流程之JobClient端分析

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程. MapReduce V1实现中,主要存在3个主要的分布式进程(角色):JobClient.JobTracker和TaskTracker,我们主要是以这三个角色的实际处理活动为主线,并结合源码,分析实际处理流程.下图是<Hadoop权威指南>一书给出的MapReduce V1处理Job的抽象流程图: 如上图,我们展开阴影部分的处理逻辑,详细分析Job提交在JobClient端的具体流程.在编写好MapReduce程序以

myeclipse-MyEclipse启动后的运行流程分析

问题描述 MyEclipse启动后的运行流程分析 例如数据流的流向,以及在各个部分的编码格式转化问题等等. tomcat网卡监听的是什么数据?post提交的表单需要被tomcat编码一下不?tomcat端有jsp界面否? 解决方案 MyEclipse是开发环境,tomcat是web服务器,和网卡打交道的是操作系统和驱动,三者根本没关系,你在问什么. 解决方案二: Hadoop运行流程分析Hadoop运行流程分析SpringMVC运行流程分析 解决方案三: 编程尚未成功,同志仍需努力! Myecl

WebGL 启动加载触发更新流程分析

WebGL 启动加载触发更新流程分析 太阳火神的美丽人生 (http://blog.csdn.net/opengl_es) 本文遵循"署名-非商业用途-保持一致"创作公用协议 转载请保留此句:太阳火神的美丽人生 -  本博客专注于 敏捷开发及移动和物联设备研究:iOS.Android.Html5.Arduino.pcDuino,否则,出自本博客的文章拒绝转载或再转载,谢谢合作. requestAnimFrame(tick); 此命令是 HTML5 中新增的用于替换定时器触发更新的命令,

Hadoop MapReduce两种常见的容错场景分析

本文将分析Hadoop MapReduce(包括MRv1和MRv2)的两种常见的容错场景,第一种是,作业的某个任务阻塞了,长时间占用资源不释放,如何处理?另外一种是,作 业的Map http://www.aliyun.com/zixun/aggregation/17034.html">Task全部运行完成后,在Reduce Task运行过程中,某个Map Task所在节点挂了,或者某个Map Task结果存放磁盘损坏了,该如何处理? 第一种场景:作业的某个任务阻塞了,长时间占用资源不释放,