Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(二)

        本文继《Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)》,接着讲述MapReduce作业在MRAppMaster上处理总流程,继上篇讲到作业初始化之后的作业启动,关于作业初始化主体流程的详细介绍,请参见《Yarn源码分析之MRAppMaster上MapReduce作业初始化解析》一文。

        (三)启动

        作业的启动是通过MRAppMaster的startJobs()方法实现的,其代码如下:

  /**
   * This can be overridden to instantiate multiple jobs and create a
   * workflow.
   *
   * TODO:  Rework the design to actually support this.  Currently much of the
   * job stuff has been moved to init() above to support uberization (MR-1220).
   * In a typical workflow, one presumably would want to uberize only a subset
   * of the jobs (the "small" ones), which is awkward with the current design.
   */
  @SuppressWarnings("unchecked")
  protected void startJobs() {
    /** create a job-start event to get this ball rolling */
	// 构造作业启动事件JobStartEvent实例startJobEvent
    JobEvent startJobEvent = new JobStartEvent(job.getID(),
        recoveredJobStartTime);
    /** send the job-start event. this triggers the job execution. */
    // 将作业启动事件JobStartEvent实例startJobEvent交由事件分发器dispatcher的事件处理器处理
    dispatcher.getEventHandler().handle(startJobEvent);
  }

        很简单,首先构造作业启动事件JobStartEvent实例startJobEvent,然后将作业启动事件JobStartEvent实例startJobEvent交由事件分发器dispatcher的事件处理器处理。我们首先看下事件分发器dispatcher是如何初始化的,其在MRAppMaster服务初始化的serviceInit()方法中,关键代码如下:

dispatcher = createDispatcher();

        再来看下createDispatcher()方法,如下:

  protected Dispatcher createDispatcher() {
    return new AsyncDispatcher();
  }

        就是创建一个AsyncDispatcher对象,其代表的是一个事件异步分发器AsyncDispatcher,我们曾经在《Yarn源码分析之事件异步分发器AsyncDispatcher》一文中专门介绍过这个AsyncDispatcher。AsyncDispatcher其实是一个生产者-消费者模型的事件异步分发器。在其内部有一个待分发事件队列eventQueue,并有一个GenericEventHandler类型的事件处理器handlerInstance,由其handle()方法负责将外部事件event添加到待分发队列eventQueue中,等到AsyncDispatcher中的消费者eventHandlingThread不断的获取待分发队列eventQueue中的事件,分发并交由之前注册的事件类型对应的事件处理器处理。关于这部分的内容请阅读《Yarn源码分析之事件异步分发器AsyncDispatcher》一文,此处不再做过多介绍。

        那么dispatcher中是如何注册JobStartEvent事件的处理器的呢?注册的事件处理器又是谁呢?还是在服务初始化的方法中,如下:

      //register the event dispatchers
      dispatcher.register(JobEventType.class, jobEventDispatcher);

        通过查看JobStartEvent的源码我们知道,JobStartEvent继承自JobEvent,它也是一种JobEvent,所以其处理会交给jobEventDispatcher来处理。细心的读者获取会发现,在此之前,dispatcher已经注册过一个JobEventType对应的事件处理器,NoopEventHandler类型的eater了,代码如下:

      NoopEventHandler eater = new NoopEventHandler();
      //We do not have a JobEventDispatcher in this path
      dispatcher.register(JobEventType.class, eater);

        我们先看下NoopEventHandler的定义,如下:

  /**
   * Eats events that are not needed in some error cases.
   */
  private static class NoopEventHandler implements EventHandler<Event> {

    @Override
    public void handle(Event event) {
      //Empty
    }
  }

        四个字,空空如也!那么,读者在这里可能就有疑问了,到底是由jobEventDispatcher还是eater来处理作业启动JobStartEvent事件内。这里要说的是,这两次注册实际上是形成了一个JobEventType事件类型的链式事件处理器,它会将事件挨个通过链式事件处理器中的每个处理器进行处理,这在《Yarn源码分析之事件异步分发器AsyncDispatcher》一文中的register()方法介绍中也提到过了,读者可自行查看。而这里,既然eater为空,不对事件做任何处理,我们还是看看jobEventDispatcher吧。

        那么,jobEventDispatcher是如何定义及初始化的呢?其实这个jobEventDispatcher在Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)中的作业初始化事件JobEventType.JOB_INIT处理时已经讲到过了,它就是一个JobEventDispatcher对象,这里再回顾一下,其定义如下:

  private class JobEventDispatcher implements EventHandler<JobEvent> {
    @SuppressWarnings("unchecked")
    @Override
    public void handle(JobEvent event) {
      // 从应用运行上下文信息context中根据jobId获取Job实例,即JobImpl对象,调用其handle()方法,处理对应事件
      ((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event);
    }
  }

        它实际上并不真正干活,而是从应用运行上下文信息context中根据jobId获取Job实例,即JobImpl对象,调用其handle()方法,处理对应事件。那么在处理作业初始化事件时我们也提到过了,它是根据作业状态机的doTransition()方法根据事件类型来处理的,关于作业状态机,我们这里还是不做介绍,你还是只要知道作业启动事件是通过JobImpl的静态内部类StartTransition的transition()方法来处理的就行,其代码如下:

  public static class StartTransition
  implements SingleArcTransition<JobImpl, JobEvent> {
    /**
     * This transition executes in the event-dispatcher thread, though it's
     * triggered in MRAppMaster's startJobs() method.
     */
    @Override
    public void transition(JobImpl job, JobEvent event) {
      JobStartEvent jse = (JobStartEvent) event;

      // 设置作业的起始时间startTime
      if (jse.getRecoveredJobStartTime() != 0) {
        job.startTime = jse.getRecoveredJobStartTime();
      } else {
        job.startTime = job.clock.getTime();
      }

      // 创建作业已初始化事件JobInitedEvent实例jie
      JobInitedEvent jie =
        new JobInitedEvent(job.oldJobId,
             job.startTime,
             job.numMapTasks, job.numReduceTasks,
             job.getState().toString(),
             job.isUber());

      // 将作业已初始化事件JobInitedEvent实例jie包装成作业历史事件JobHistoryEvent,并交给作业的事件处理器eventHandler处理
      job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));

      // 创建作业信息变更事件JobInfoChangeEvent实例jice
      JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
          job.appSubmitTime, job.startTime);

      // 将作业信息变更事件JobInfoChangeEvent实例jice包装成作业历史事件JobHistoryEvent,并交给作业的事件处理器eventHandler处理
      job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));

      // 调用作业度量指标metrics的runningJob()方法,标识作业已开始运行
      job.metrics.runningJob(job);

      // 构造提交作业Setup事件CommitterJobSetupEvent,并交由作业的事件处理器eventHandler处理
      job.eventHandler.handle(new CommitterJobSetupEvent(
              job.jobId, job.jobContext));
    }
  }

        去掉关于作业历史信息等不是十分关键的细节,整体主体流程如下:

        1、设置作业的起始时间startTime;

        2、构造提交作业Setup事件CommitterJobSetupEvent,并交由作业的事件处理器eventHandler处理。

        那么,作业的事件处理器eventHandler是什么呢?它又是如何处理提交作业Setup事件CommitterJobSetupEvent的呢?

        我们先看下作业的事件处理器eventHandler,在MRAppMaster服务启动时创建作业JobImpl实例时,eventHandler是通过传入的dispatcher.getEventHandler()来初始化的,基于上面的陈述,这我们就不用讲了吧。

        我们还是看下dispatcher是如何注册事件CommitterJobSetupEvent对应的事件处理器的吧,代码如下:

     dispatcher.register(CommitterEventType.class, committerEventHandler);

        我们知道,CommitterJobSetupEvent继承自CommitterEvent,所以它实际上是通过committerEventHandler来处理的,那么什么是committerEventHandler呢?其初始化如下:

      //service to handle the output committer
      committerEventHandler = createCommitterEventHandler(context, committer);

        通过调用createCommitterEventHandler()方法,构造了一个CommitterEventHandler实例,如下:

  protected EventHandler<CommitterEvent> createCommitterEventHandler(
      AppContext context, OutputCommitter committer) {
    return new CommitterEventHandler(context, committer,
        getRMHeartbeatHandler(), jobClassLoader);
  }

        关于CommitterEventHandler的介绍,我们后续会写相关文章进行详细的介绍,这里,你只要知道,它类似AsyncDispatcher,也是一个生产者-消费者模式的事件分发器,而最终是通过其内部EventProcessor类型的事件处理线程eventHandlingThread来处理的,在EventProcessor中,有针对JOB_SETUP事件处理的逻辑,关键代码如下:

      switch (event.getType()) {
      case JOB_SETUP:
        handleJobSetup((CommitterJobSetupEvent) event);
        break;

        继续追踪handleJobSetup()方法,如下:

    @SuppressWarnings("unchecked")
    protected void handleJobSetup(CommitterJobSetupEvent event) {
      try {
        committer.setupJob(event.getJobContext());
        context.getEventHandler().handle(
            new JobSetupCompletedEvent(event.getJobID()));
      } catch (Exception e) {
        LOG.warn("Job setup failed", e);
        context.getEventHandler().handle(new JobSetupFailedEvent(
            event.getJobID(), StringUtils.stringifyException(e)));
      }
    }

        它做了两件事情,如下:

        1、调用committer的setupJob()方法处理该CommitterJobSetupEvent事件;

        2、又构造了一个JobSetupCompletedEvent事件,交由应用运行上下文context的事件处理器进行处理。

        而首先要说的是,committer、context均是由MRAppMaster在创建CommitterEventHandler时传入的,其对应的对象类型分别是:

        1、committer:

              1.1、新版API是通过OutputFormat组件的getOutputCommitter()方法获取的;

              1.2、旧版API是通过参数mapred.output.committer.class获取的,参数未配置默认为FileOutputCommitter。

        2、context:RunningAppContext。

        对于committer,我们这里以较为通用的FileOutputCommitter为例,看下其setupJob()方法,如下:

  /**
   * Create the temporary directory that is the root of all of the task
   * work directories.
   * @param context the job's context
   */
  public void setupJob(JobContext context) throws IOException {
    if (hasOutputPath()) {
      Path jobAttemptPath = getJobAttemptPath(context);
      FileSystem fs = jobAttemptPath.getFileSystem(
          context.getConfiguration());
      if (!fs.mkdirs(jobAttemptPath)) {
        LOG.error("Mkdirs failed to create " + jobAttemptPath);
      }
    } else {
      LOG.warn("Output Path is null in setupJob()");
    }
  }

        实际上就做了一件事情,创建作业中所有任务工作的临时根目录。

        再来看下context是如何处理JobSetupCompletedEvent的,还记得之前我们讲述的,RunningAppContext实际上什么都不干,而是交给了JobImpl对应的作业状态机了吗?我们就看下JobImpl中是如何处理JobSetupCompletedEvent事件的,其对应的处理在其静态内部类SetupCompletedTransition的transition()中,代码如下:

  private static class SetupCompletedTransition
      implements SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {

      // 通过设置作业的setupProgress为1,标记作业setup已完成
      job.setupProgress = 1.0f;

      // 调度作业的Map Task
      job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
      // 调度作业的Reduce Task
      job.scheduleTasks(job.reduceTasks, true);

      // If we have no tasks, just transition to job completed
      // 如果没有task了,则生成JOB_COMPLETED事件并交由作业的事件处理器eventHandler进行处理
      if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
        job.eventHandler.handle(new JobEvent(job.jobId,
            JobEventType.JOB_COMPLETED));
      }
    }
  }

        是不是很简单,而且也很理所当然?处理流程如下:

        1、通过设置作业的setupProgress为1,标记作业setup已完成;

        2、调度作业的Map Task;

        3、调度作业的Reduce Task;

        4、如果没有task了,则生成JOB_COMPLETED事件并交由作业的事件处理器eventHandler进行处理。

        未完待续,后续作业启动部分内容详细描述、作业停止等内容,请关注《Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(三)》。

 

时间: 2025-01-31 01:59:47

Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(二)的相关文章

Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)

        我们知道,如果想要在Yarn上运行MapReduce作业,仅需实现一个ApplicationMaster组件即可,而MRAppMaster正是MapReduce在Yarn上ApplicationMaster的实现,由其控制MR作业在Yarn上的执行.如此,随之而来的一个问题就是,MRAppMaster是如何控制MapReduce作业在Yarn上运行的,换句话说,MRAppMaster上MapReduce作业处理总流程是什么?这就是本文要研究的重点.         通过MRApp

HBase源码分析之HRegion上MemStore的flsuh流程(二)

        继上篇<HBase源码分析之HRegion上MemStore的flsuh流程(一)>之后,我们继续分析下HRegion上MemStore flush的核心方法internalFlushcache(),它的主要流程如图所示:         其中,internalFlushcache()方法的代码如下: /** * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of upda

HBase源码分析之HRegion上MemStore的flsuh流程(一)

        了解HBase架构的用户应该知道,HBase是一种基于LSM模型的分布式数据库.LSM的全称是Log-Structured Merge-Trees,即日志-结构化合并-树.相比于Oracle普通索引所采用的B+树,LSM模型的最大特点就是,在读写之间采取一种平衡,牺牲部分读数据的性能,来大幅度的提升写数据的性能.通俗的讲,HBase写数据如此快,正是由于基于LSM模型,将数据写入内存和日志文件后即立即返回.         但是,数据始终在内存和日志中是不妥当的,首先内存毕竟是有

Yarn源码分析之MRAppMaster:作业运行方式Local、Uber、Non-Uber

        基于作业大小因素,MRAppMaster提供了三种作业运行方式:本地Local模式.Uber模式.Non-Uber模式.其中,         1.本地Local模式:通常用于调试:         2.Uber模式:为降低小作业延迟而设计的一种模式,所有任务,不管是Map Task,还是Reduce Task,均在同一个Container中顺序执行,这个Container其实也是MRAppMaster所在Container:         3.Non-Uber模式:对于运行时

Yarn源码分析之如何确定作业运行方式Uber or Non-Uber?

        在MRAppMaster中,当MapReduce作业初始化时,它会通过作业状态机JobImpl中InitTransition的transition()方法,进行MapReduce作业初始化相关操作,而这其中就包括:         1.调用createSplits()方法,创建分片,并获取任务分片元数据信息TaskSplitMetaInfo数组taskSplitMetaInfo:         2.确定Map Task数目numMapTasks:分片元数据信息数组的长度,即有多

HBase源码分析之HRegionServer上MemStore的flush处理流程(一)

        在<HBase源码分析之HRegion上MemStore的flsuh流程(一)>.<HBase源码分析之HRegion上MemStore的flsuh流程(二)>等文中,我们介绍了HRegion上Memstore flush的主体流程和主要细节.但是,HRegion只是HBase表中按照行的方向对一片连续的数据区域的抽象,它并不能对外提供单独的服务,供客户端或者HBase其它实体调用.而HRegion上MemStore的flush还是要通过HRegionServer来

HBase源码分析之HRegion上compact流程分析(三)

        在<HBase源码分析之HRegion上compact流程分析(二)>一文中,我们没有讲解真正执行合并的CompactionContext的compact()方法.现在我们来分析下它的具体实现.         首先,CompactionContext表示合并的上下文信息,它只是一个抽象类,其compact()并没有实现,代码如下: /** * Runs the compaction based on current selection. select/forceSelect

HBase源码分析之HRegionServer上MemStore的flush处理流程(二)

        继上篇文章<HBase源码分析之HRegionServer上MemStore的flush处理流程(一)>遗留的问题之后,本文我们接着研究HRegionServer上MemStore的flush处理流程,重点讲述下如何选择一个HRegion进行flush以缓解MemStore压力,还有HRegion的flush是如何发起的.         我们先来看下第一个问题:如何选择一个HRegion进行flush以缓解MemStore压力.上文中我们讲到过flush处理线程如果从flus

从源码分析Android的Volley库的工作流程_Android

Volley现在已经被官方放到AOSP里面,已经逐步成为Android官方推荐的网络框架. 类抽象 对Http协议的抽象 Requeset顾名思义,对请求的封装,实现了Comparable接口,因为在Volley中是可以指定请求的优先级的,实现Comparable是为了在Request任务队列中进行排序,优先级高的Request会被优先调度执行.NetworkResponseHttp响应的封装,其中包括返回的状态码 头部 数据等.Response给调用者返回的结果封装,它比NetworkResp