Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)

        我们知道,如果想要在Yarn上运行MapReduce作业,仅需实现一个ApplicationMaster组件即可,而MRAppMaster正是MapReduce在Yarn上ApplicationMaster的实现,由其控制MR作业在Yarn上的执行。如此,随之而来的一个问题就是,MRAppMaster是如何控制MapReduce作业在Yarn上运行的,换句话说,MRAppMaster上MapReduce作业处理总流程是什么?这就是本文要研究的重点。

        通过MRAppMaster类的定义我们就能看出,MRAppMaster继承自CompositeService,而CompositeService又继承自AbstractService,也就是说MRAppMaster也是Hadoop中的一种服务,我们看下服务启动的serviceStart()方法中关于MapReduce作业的处理,关键代码如下:

  @SuppressWarnings("unchecked")
  @Override
  protected void serviceStart() throws Exception {

    // ......省略部分代码

    // 调用createJob()方法创建作业Job实例job
    // /////////////////// Create the job itself.
    job = createJob(getConfig(), forcedState, shutDownMessage);

    // End of creating the job.

    // ......省略部分代码

    // 作业初始化失败标志位initFailed默认为false,即初始化成功,没有错误
    boolean initFailed = false;
    if (!errorHappenedShutDown) {

      // create a job event for job intialization
      // 创建一个Job初始化事件initJobEvent
      JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);

      // Send init to the job (this does NOT trigger job execution)
      // This is a synchronous call, not an event through dispatcher. We want
      // job-init to be done completely here.
      // 调用jobEventDispatcher的handle()方法,处理Job初始化事件initJobEvent,即将Job初始化事件交由事件分发器jobEventDispatcher处理,
      jobEventDispatcher.handle(initJobEvent);

      // If job is still not initialized, an error happened during
      // initialization. Must complete starting all of the services so failure
      // events can be processed.
      // 获取Job初始化结果initFailed
      initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED);

      // JobImpl's InitTransition is done (call above is synchronous), so the
      // "uber-decision" (MR-1220) has been made.  Query job and switch to
      // ubermode if appropriate (by registering different container-allocator
      // and container-launcher services/event-handlers).

      // ......省略部分代码

      // Start ClientService here, since it's not initialized if
      // errorHappenedShutDown is true

      // 启动客户端服务clientService
      clientService.start();
    }

    //start all the components
    // 调用父类的serviceStart(),启动所有组件
    super.serviceStart();

    // finally set the job classloader
    // 最终设置作业类加载器
    MRApps.setClassLoader(jobClassLoader, getConfig());

    if (initFailed) {
      // 如果作业初始化失败,构造作业初始化失败JOB_INIT_FAILED事件,并交由事件分发器jobEventDispatcher处理
      JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
      jobEventDispatcher.handle(initFailedEvent);
    } else {
      // All components have started, start the job.
      // 调用startJobs()方法启动作业
      startJobs();
    }
  }

        通过MRAppMaster服务启动的serviceStart()方法我们大致知道,MapReduce作业在MRAppMaster中经历了创建--初始化--启动三个主要过程,剪去枝叶,保留主干,具体如下:

        1、创建:调用createJob()方法创建作业Job实例job;

        2、初始化:

              2.1、创建一个Job初始化事件initJobEvent;

              2.2、调用jobEventDispatcher的handle()方法,处理Job初始化事件initJobEvent,即将Job初始化事件交由事件分发器jobEventDispatcher处理;

              2.3、获取Job初始化结果initFailed;

              2.4、如果作业初始化失败,构造作业初始化失败JOB_INIT_FAILED事件,并交由事件分发器jobEventDispatcher处理。

        3、启动:调用startJobs()方法启动作业。

         实际上,作业启动后不可能永远都不停止,MRAppMaster最终会将作业停止,这也是作业处理流程的第四步,即最后一步,作业停止!在哪里处理的呢?我们先卖个关子,请您暂时忽略这个问题,我们稍后会给出答案!

        下面,我们针对MapReduce作业的上述三个主要过程,分别展开描述。

        一、创建

        首先看作业创建,createJob()方法如下:

  /** Create and initialize (but don't start) a single job.
   * @param forcedState a state to force the job into or null for normal operation.
   * @param diagnostic a diagnostic message to include with the job.
   */
  protected Job createJob(Configuration conf, JobStateInternal forcedState,
      String diagnostic) {

    // create single job
	// 创建一个作业Job实例newJob,其实现为JobImpl
    Job newJob =
        new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
            taskAttemptListener, jobTokenSecretManager, jobCredentials, clock,
            completedTasksFromPreviousRun, metrics,
            committer, newApiCommitter,
            currentUser.getUserName(), appSubmitTime, amInfos, context,
            forcedState, diagnostic);

    // 将新创建的作业newJob的jobId与其自身的映射关系存储到应用运行上下文信息context中的jobs集合中
    ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);

    // 异步事件分发器dispatcher注册作业完成事件JobFinishEvent对应的事件处理器,通过createJobFinishEventHandler()方法获得
    dispatcher.register(JobFinishEvent.Type.class,
        createJobFinishEventHandler());
    // 返回新创建的作业newJob
    return newJob;
  } // end createJob()

        其主要逻辑如下:

        1、创建一个作业Job实例newJob,其实现为JobImpl,传入作业艾迪jobId、应用尝试艾迪appAttemptID、任务尝试监听器taskAttemptListener、输出提交器committer、用户名currentUser.getUserName()、应用运行上下文信息context等关键成员变量;

        2、将新创建的作业newJob的jobId与其自身的映射关系存储到应用运行上下文信息context中的jobs集合中;

        3、异步事件分发器dispatcher注册作业完成事件JobFinishEvent对应的事件处理器,通过createJobFinishEventHandler()方法获得;

        4、返回新创建的作业newJob。

        关于作业创建中的一些细节,我们暂时先不做过多关注,留待以后的文章专门进行分析。这里,我们先重点看看第3步,异步事件分发器dispatcher注册作业完成事件JobFinishEvent对应的事件处理器,通过createJobFinishEventHandler()方法获得,而createJobFinishEventHandler()方法代码如下:

  /**
   * create an event handler that handles the job finish event.
   * @return the job finish event handler.
   */
  protected EventHandler<JobFinishEvent> createJobFinishEventHandler() {
    return new JobFinishEventHandler();
  }

        也就是说,当作业被创建后,它就被定义了作业完成事件JobFinishEvent的处理器为JobFinishEventHandler,而JobFinishEventHandler的定义如下:

  private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
    @Override
    public void handle(JobFinishEvent event) {
      // Create a new thread to shutdown the AM. We should not do it in-line
      // to avoid blocking the dispatcher itself.
      new Thread() {

        @Override
        public void run() {
          shutDownJob();
        }
      }.start();
    }
  }

        这就是我们上面没有详细介绍的第四步--作业停止,它最终是调用的shutDownJob()方法,并开启一个新的线程来完成作业停止的,我们稍后再做介绍。

        二、初始化

        我们再来看作业的初始化,它是通过创建一个Job初始化事件JobEvent实例initJobEvent,事件类型为JobEventType.JOB_INIT,然后交由事件分发器jobEventDispatcher处理的。我们先来看下这个jobEventDispatcher的定义及实例化,如下:

  // 作业事件分发器
  private JobEventDispatcher jobEventDispatcher;

         jobEventDispatcher是一个JobEventDispatcher类型的作业事件分发器,其实例化为:

this.jobEventDispatcher = new JobEventDispatcher();

        而JobEventDispatcher的定义如下:

  private class JobEventDispatcher implements EventHandler<JobEvent> {
    @SuppressWarnings("unchecked")
    @Override
    public void handle(JobEvent event) {
      // 从应用运行上下文信息context中根据jobId获取Job实例,即JobImpl对象,调用其handle()方法,处理对应事件
      ((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event);
    }
  }

        很简单,从应用运行上下文信息context中根据jobId获取Job实例,即JobImpl对象,调用其handle()方法,处理对应事件,而这个Job实例,还记得上面描述的吗,就是在Job最初被创建时,被添加到应用运行上下文信息context中jobs集合中的,key为jobId,value就是JobImpl对象。context的实现RunningAppContext中,根据jobId获取job实例的代码如下:

    @Override
    public Job getJob(JobId jobID) {
      return jobs.get(jobID);
    }

        好了,我们就看下JobImpl中handle()方法是如何对类型为JobEventType.JOB_INIT的JobEvent进行处理的吧!

  @Override
  /**
   * The only entry point to change the Job.
   */
  public void handle(JobEvent event) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Processing " + event.getJobId() + " of type "
          + event.getType());
    }
    try {
      writeLock.lock();
      JobStateInternal oldState = getInternalState();
      try {
         getStateMachine().doTransition(event.getType(), event);
      } catch (InvalidStateTransitonException e) {
        LOG.error("Can't handle this event at current state", e);
        addDiagnostic("Invalid event " + event.getType() +
            " on Job " + this.jobId);
        eventHandler.handle(new JobEvent(this.jobId,
            JobEventType.INTERNAL_ERROR));
      }
      //notify the eventhandler of state change
      if (oldState != getInternalState()) {
        LOG.info(jobId + "Job Transitioned from " + oldState + " to "
                 + getInternalState());
        rememberLastNonFinalState(oldState);
      }
    }

    finally {
      writeLock.unlock();
    }
  }

        最核心的就是通过语句getStateMachine().doTransition(event.getType(), event)进行处理,实际上这牵着到了Yarn中MapReduce作业的状态机,为了本文叙述的流畅性、简洁性、重点明确性,我们对于作业状态机先不做解释,这部分内容留待以后的文章专门进行介绍,这里你只要知道作业初始化最终是通过JobImpl静态内部类InitTransition的transition()方法来实现的就行。我们看下InitTransition的transition()方法,如下:

/**
     * Note that this transition method is called directly (and synchronously)
     * by MRAppMaster's init() method (i.e., no RPC, no thread-switching;
     * just plain sequential call within AM context), so we can trigger
     * modifications in AM state from here (at least, if AM is written that
     * way; MR version is).
     */
    @Override
    public JobStateInternal transition(JobImpl job, JobEvent event) {

      // 调用作业度量指标体系metrics的submittedJob()方法,提交作业
      job.metrics.submittedJob(job);

      // 调用作业度量指标体系metrics的preparingJob()方法,开始作业准备
      job.metrics.preparingJob(job);

      // 新旧API创建不同的作业上下文JobContextImpl实例
      if (job.newApiCommitter) {
        job.jobContext = new JobContextImpl(job.conf,
            job.oldJobId);
      } else {
        job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
            job.conf, job.oldJobId);
      }

      try {

    	// 调用setup()方法,完成作业启动前的部分初始化工作
        setup(job);

        // 设置作业job对应的文件系统fs
        job.fs = job.getFileSystem(job.conf);

        //log to job history
        // 创建作业已提交事件JobSubmittedEvent实例jse
        JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
              job.conf.get(MRJobConfig.JOB_NAME, "test"),
            job.conf.get(MRJobConfig.USER_NAME, "mapred"),
            job.appSubmitTime,
            job.remoteJobConfFile.toString(),
            job.jobACLs, job.queueName,
            job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
            job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
            job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
            getWorkflowAdjacencies(job.conf),
            job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""));

        // 将作业已提交事件JobSubmittedEvent实例jse封装成作业历史事件JobHistoryEvent交由作业的时事件处理器eventHandler处理
        job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
        //TODO JH Verify jobACLs, UserName via UGI?

        // 调用createSplits()方法,创建分片,并获取任务分片元数据信息TaskSplitMetaInfo数组taskSplitMetaInfo
        TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);

        // 确定Map Task数目numMapTasks:分片元数据信息数组的长度,即有多少分片就有多少numMapTasks
        job.numMapTasks = taskSplitMetaInfo.length;
        // 确定Reduce Task数目numReduceTasks,取作业参数mapreduce.job.reduces,参数未配置默认为0
        job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);

        // 确定作业的map和reduce权重mapWeight、reduceWeight
        if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
          job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
        } else if (job.numMapTasks == 0) {
          job.reduceWeight = 0.9f;
        } else if (job.numReduceTasks == 0) {
          job.mapWeight = 0.9f;
        } else {
          job.mapWeight = job.reduceWeight = 0.45f;
        }

        checkTaskLimits();

        // 根据分片元数据信息计算输入长度inputLength,也就是作业大小
        long inputLength = 0;
        for (int i = 0; i < job.numMapTasks; ++i) {
          inputLength += taskSplitMetaInfo[i].getInputDataLength();
        }

        // 根据作业大小inputLength,调用作业的makeUberDecision()方法,决定作业运行模式是Uber模式还是Non-Uber模式
        job.makeUberDecision(inputLength);

        // 根据作业的Map、Reduce任务数目之和,外加10,
        // 初始化任务尝试完成事件TaskAttemptCompletionEvent列表taskAttemptCompletionEvents
        job.taskAttemptCompletionEvents =
            new ArrayList<TaskAttemptCompletionEvent>(
                job.numMapTasks + job.numReduceTasks + 10);

        // 根据作业的Map任务数目,外加10,
        // 初始化Map任务尝试完成事件TaskCompletionEvent列表mapAttemptCompletionEvents
        job.mapAttemptCompletionEvents =
            new ArrayList<TaskCompletionEvent>(job.numMapTasks + 10);

        // 根据作业的Map、Reduce任务数目之和,外加10,
        // 初始化列表taskCompletionIdxToMapCompletionIdx
        job.taskCompletionIdxToMapCompletionIdx = new ArrayList<Integer>(
            job.numMapTasks + job.numReduceTasks + 10);

        // 确定允许Map、Reduce任务失败百分比,
        // 取参数mapreduce.map.failures.maxpercent、mapreduce.reduce.failures.maxpercent,
        // 参数未配置均默认为0,即不允许Map和Reduce任务失败
        job.allowedMapFailuresPercent =
            job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
        job.allowedReduceFailuresPercent =
            job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);

        // create the Tasks but don't start them yet
        // 创建Map Task
        createMapTasks(job, inputLength, taskSplitMetaInfo);
        // 创建Reduce Task
        createReduceTasks(job);

        // 调用作业度量指标体系metrics的endPreparingJob()方法,结束作业准备
        job.metrics.endPreparingJob(job);

        // 返回作业内部状态,JobStateInternal.INITED,即已经初始化
        return JobStateInternal.INITED;
      } catch (Exception e) {

    	// 记录warn级别日志信息:Job init failed,并打印出具体异常
        LOG.warn("Job init failed", e);
        // 调用作业度量指标体系metrics的endPreparingJob()方法,结束作业准备
        job.metrics.endPreparingJob(job);
        job.addDiagnostic("Job init failed : "
            + StringUtils.stringifyException(e));
        // Leave job in the NEW state. The MR AM will detect that the state is
        // not INITED and send a JOB_INIT_FAILED event.

        // 返回作业内部状态,JobStateInternal.NEW,即初始化失败后的新建
        return JobStateInternal.NEW;
      }
    }

        为了主体逻辑清晰,我们去掉部分细节,保留主干,将作业初始化总结如下:

        1、调用setup()方法,完成作业启动前的部分初始化工作,实际上最重要的两件事就是:

               1.1、获取并设置作业远程提交路径remoteJobSubmitDir;

               1.2、获取并设置作业远程配置文件remoteJobConfFile;

        2、调用createSplits()方法,创建分片,并获取任务分片元数据信息TaskSplitMetaInfo数组taskSplitMetaInfo:

             通过SplitMetaInfoReader的静态方法readSplitMetaInfo(),从作业远程提交路径remoteJobSubmitDir中读取作业分片元数据信息,也就是每个任务的分片元数据信息,以此确定Map任务数、作业运行方式等一些列后续内容;

        3、确定Map Task数目numMapTasks:分片元数据信息数组的长度,即有多少分片就有多少numMapTasks;

        4、确定Reduce Task数目numReduceTasks,取作业参数mapreduce.job.reduces,参数未配置默认为0;

        5、根据分片元数据信息计算输入长度inputLength,也就是作业大小;

        6、根据作业大小inputLength,调用作业的makeUberDecision()方法,决定作业运行模式是Uber模式还是Non-Uber模式:

              小作业会通过Uber模式运行,相反,大作业会通过Non-Uber模式运行,可参见《Yarn源码分析之MRAppMaster:作业运行方式Local、Uber、Non-Uber》一文!

        7、确定允许Map、Reduce任务失败百分比,取参数mapreduce.map.failures.maxpercent、mapreduce.reduce.failures.maxpercent,参数未配置均默认为0,即不允许Map和Reduce任务失败;

        8、创建Map Task;

        9、创建Reduce Task;

        10、返回作业内部状态,JobStateInternal.INITED,即已经初始化;

        11、如果出现异常:

                11.1、记录warn级别日志信息:Job init failed,并打印出具体异常;

                11.2、返回作业内部状态,JobStateInternal.NEW,即初始化失败后的新建;

         未完待续,后续作业初始化部分详细描述、作业启动、作业停止等内容,请关注《Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(二)》

时间: 2024-11-03 21:48:55

Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)的相关文章

Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(二)

        本文继<Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)>,接着讲述MapReduce作业在MRAppMaster上处理总流程,继上篇讲到作业初始化之后的作业启动,关于作业初始化主体流程的详细介绍,请参见<Yarn源码分析之MRAppMaster上MapReduce作业初始化解析>一文.         (三)启动         作业的启动是通过MRAppMaster的startJobs()方法实现的,其代码如下: /** * Th

HBase源码分析之HRegion上MemStore的flsuh流程(二)

        继上篇<HBase源码分析之HRegion上MemStore的flsuh流程(一)>之后,我们继续分析下HRegion上MemStore flush的核心方法internalFlushcache(),它的主要流程如图所示:         其中,internalFlushcache()方法的代码如下: /** * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of upda

HBase源码分析之HRegion上MemStore的flsuh流程(一)

        了解HBase架构的用户应该知道,HBase是一种基于LSM模型的分布式数据库.LSM的全称是Log-Structured Merge-Trees,即日志-结构化合并-树.相比于Oracle普通索引所采用的B+树,LSM模型的最大特点就是,在读写之间采取一种平衡,牺牲部分读数据的性能,来大幅度的提升写数据的性能.通俗的讲,HBase写数据如此快,正是由于基于LSM模型,将数据写入内存和日志文件后即立即返回.         但是,数据始终在内存和日志中是不妥当的,首先内存毕竟是有

Yarn源码分析之MRAppMaster:作业运行方式Local、Uber、Non-Uber

        基于作业大小因素,MRAppMaster提供了三种作业运行方式:本地Local模式.Uber模式.Non-Uber模式.其中,         1.本地Local模式:通常用于调试:         2.Uber模式:为降低小作业延迟而设计的一种模式,所有任务,不管是Map Task,还是Reduce Task,均在同一个Container中顺序执行,这个Container其实也是MRAppMaster所在Container:         3.Non-Uber模式:对于运行时

Yarn源码分析之如何确定作业运行方式Uber or Non-Uber?

        在MRAppMaster中,当MapReduce作业初始化时,它会通过作业状态机JobImpl中InitTransition的transition()方法,进行MapReduce作业初始化相关操作,而这其中就包括:         1.调用createSplits()方法,创建分片,并获取任务分片元数据信息TaskSplitMetaInfo数组taskSplitMetaInfo:         2.确定Map Task数目numMapTasks:分片元数据信息数组的长度,即有多

HBase源码分析之HRegionServer上MemStore的flush处理流程(一)

        在<HBase源码分析之HRegion上MemStore的flsuh流程(一)>.<HBase源码分析之HRegion上MemStore的flsuh流程(二)>等文中,我们介绍了HRegion上Memstore flush的主体流程和主要细节.但是,HRegion只是HBase表中按照行的方向对一片连续的数据区域的抽象,它并不能对外提供单独的服务,供客户端或者HBase其它实体调用.而HRegion上MemStore的flush还是要通过HRegionServer来

HBase源码分析之HRegion上compact流程分析(三)

        在<HBase源码分析之HRegion上compact流程分析(二)>一文中,我们没有讲解真正执行合并的CompactionContext的compact()方法.现在我们来分析下它的具体实现.         首先,CompactionContext表示合并的上下文信息,它只是一个抽象类,其compact()并没有实现,代码如下: /** * Runs the compaction based on current selection. select/forceSelect

HBase源码分析之HRegionServer上MemStore的flush处理流程(二)

        继上篇文章<HBase源码分析之HRegionServer上MemStore的flush处理流程(一)>遗留的问题之后,本文我们接着研究HRegionServer上MemStore的flush处理流程,重点讲述下如何选择一个HRegion进行flush以缓解MemStore压力,还有HRegion的flush是如何发起的.         我们先来看下第一个问题:如何选择一个HRegion进行flush以缓解MemStore压力.上文中我们讲到过flush处理线程如果从flus

从源码分析Android的Volley库的工作流程_Android

Volley现在已经被官方放到AOSP里面,已经逐步成为Android官方推荐的网络框架. 类抽象 对Http协议的抽象 Requeset顾名思义,对请求的封装,实现了Comparable接口,因为在Volley中是可以指定请求的优先级的,实现Comparable是为了在Request任务队列中进行排序,优先级高的Request会被优先调度执行.NetworkResponseHttp响应的封装,其中包括返回的状态码 头部 数据等.Response给调用者返回的结果封装,它比NetworkResp