Yarn源码分析之参数mapreduce.job.reduce.slowstart.completedmaps介绍

        mapreduce.job.reduce.slowstart.completedmaps是MapReduce编程模型中的一个参数,这个参数的含义是,当Map Task完成的比例达到该值后才会为Reduce Task申请资源,默认是0.05,其在接口MRJobConfig中表示如下:

  // 当Map Task完成的比例达到该值后才会为Reduce Task申请资源,默认是0.05
  public static final String COMPLETED_MAPS_FOR_REDUCE_SLOWSTART = "mapreduce.job.reduce.slowstart.completedmaps";

        那么这个参数在Yarn中是如何使用的呢?本文我们将解答这个问题。

        既然这个参数的含义是当Map Task完成的比例达到该值后才会为Reduce Task申请资源,那么在Yarn中关于资源分配申请服务的RMContainerAllocator中,自然会用到它。在服务初始化的serviceInit()方法中,有如下代码:

    // reduceSlowStart取参数mapreduce.job.reduce.slowstart.completedmaps,默认为0.05,
    // 其代表当Map Task完成的比例达到该值后才会为Reduce Task申请资源
    reduceSlowStart = conf.getFloat(
        MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
        DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART);

        这个reduceSlowStart就代表当Map Task完成的比例达到该值后才会为Reduce Task申请资源,它取参数mapreduce.job.reduce.slowstart.completedmaps,参数未配置时默认为0.05。而在进行Reduce任务相关资源申请调度时,会传入这个reduceSlowStart,如下:

      scheduleReduces(
          getJob().getTotalMaps(), completedMaps,
          scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
          assignedRequests.maps.size(), assignedRequests.reduces.size(),
          mapResourceRequest, reduceResourceRequest,
          pendingReduces.size(),
          maxReduceRampupLimit, reduceSlowStart);

        scheduleReduces()方法是专门处理Reduce任务相关资源申请调度的,其中对于reduceSlowStart是按照以下方式进行处理的,如下:

    //check for slow start
    // 在Reduce调度尚未启动时,即标志位reduceStarted为false时
    if (!getIsReduceStarted()) {//not set yet

      // 计算Reduce Task启动时最低要求完成的Map Task数目completedMapsForReduceSlowstar,
      // 计算公式为reduceSlowStart * totalMaps,向上取整,totalMaps表示Map Task总数目
      int completedMapsForReduceSlowstart = (int)Math.ceil(reduceSlowStart *
                      totalMaps);

      // 如果已完成Map Task数目completedMaps小于上述要求完成的Map Task数目completedMapsForReduceSlowstart,
      // 记录info级别日志信息:Reduce slow start threshold not met.completedMapsForReduceSlowstart ?
      // 即Reduce任务最低启动门槛没有满足,并输出最低启动门槛,即要求已完成的Map Task数目:completedMapsForReduceSlowstart数目,然后返回,不进行资源申请调度
      if(completedMaps < completedMapsForReduceSlowstart) {
        LOG.info("Reduce slow start threshold not met. " +
              "completedMapsForReduceSlowstart " +
            completedMapsForReduceSlowstart);
        return;
      } else {
    	// 如果达到了最低启动门槛,同样记录info级别日志信息:Reduce slow start threshold reached. Scheduling reduces.
    	// 即Reduce最低启动门槛已达到,开始调度Reduce
        LOG.info("Reduce slow start threshold reached. Scheduling reduces.");
        // 并设置标志位reduceStarted为true,即该Reduce资源申请已被调度
        setIsReduceStarted(true);
      }
    }

        我们看到,在Reduce调度尚未启动时,即标志位reduceStarted为false时:

        1、首先计算Reduce Task启动时最低要求完成的Map Task数目completedMapsForReduceSlowstar,计算公式为reduceSlowStart * totalMaps,向上取整,totalMaps表示Map Task总数目;

        2、如果已完成Map Task数目completedMaps小于上述要求完成的Map Task数目completedMapsForReduceSlowstart,记录info级别日志信息:Reduce slow start threshold not met.completedMapsForReduceSlowstart * ,即表示Reduce任务最低启动门槛没有满足,并输出最低启动门槛,即要求已完成的Map Task数目:completedMapsForReduceSlowstart数目,然后返回,不进行资源申请调度;

        3、如果达到了最低启动门槛,同样记录info级别日志信息:Reduce slow start threshold reached. Scheduling reduces.即Reduce最低启动门槛已达到,开始调度Reduce,并设置标志位reduceStarted为true,即该Reduce资源申请已被调度。

        需要特别注意的是,在JobImpl中,如果处于Uber模式下,会将mapreduce.job.reduce.slowstart.completedmaps参数设置为1,这很好理解,因为不管Map Task,还是Reduce Task,均是串行执行的,所以当Map Task完成的比例达到多少值后才会为Reduce Task申请资源,这个值百分百应该是1。处理该参数相关代码如下:

    if (isUber) {
      LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
          + numReduceTasks + "r tasks (" + dataInputLength
          + " input bytes) will run sequentially on single node.");

      // make sure reduces are scheduled only after all map are completed
      conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
                        1.0f);
    //......省略后续相关代码
时间: 2024-09-11 13:29:19

Yarn源码分析之参数mapreduce.job.reduce.slowstart.completedmaps介绍的相关文章

Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(二)

        本文继<Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)>,接着讲述MapReduce作业在MRAppMaster上处理总流程,继上篇讲到作业初始化之后的作业启动,关于作业初始化主体流程的详细介绍,请参见<Yarn源码分析之MRAppMaster上MapReduce作业初始化解析>一文.         (三)启动         作业的启动是通过MRAppMaster的startJobs()方法实现的,其代码如下: /** * Th

Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)

        我们知道,如果想要在Yarn上运行MapReduce作业,仅需实现一个ApplicationMaster组件即可,而MRAppMaster正是MapReduce在Yarn上ApplicationMaster的实现,由其控制MR作业在Yarn上的执行.如此,随之而来的一个问题就是,MRAppMaster是如何控制MapReduce作业在Yarn上运行的,换句话说,MRAppMaster上MapReduce作业处理总流程是什么?这就是本文要研究的重点.         通过MRApp

Yarn源码分析之如何确定作业运行方式Uber or Non-Uber?

        在MRAppMaster中,当MapReduce作业初始化时,它会通过作业状态机JobImpl中InitTransition的transition()方法,进行MapReduce作业初始化相关操作,而这其中就包括:         1.调用createSplits()方法,创建分片,并获取任务分片元数据信息TaskSplitMetaInfo数组taskSplitMetaInfo:         2.确定Map Task数目numMapTasks:分片元数据信息数组的长度,即有多

Yarn源码分析之MRAppMaster:作业运行方式Local、Uber、Non-Uber

        基于作业大小因素,MRAppMaster提供了三种作业运行方式:本地Local模式.Uber模式.Non-Uber模式.其中,         1.本地Local模式:通常用于调试:         2.Uber模式:为降低小作业延迟而设计的一种模式,所有任务,不管是Map Task,还是Reduce Task,均在同一个Container中顺序执行,这个Container其实也是MRAppMaster所在Container:         3.Non-Uber模式:对于运行时

Yarn源码分析之MapReduce作业中任务Task调度整体流程(一)

        v2版本的MapReduce作业中,作业JOB_SETUP_COMPLETED事件的发生,即作业SETUP阶段完成事件,会触发作业由SETUP状态转换到RUNNING状态,而作业状态转换中涉及作业信息的处理,是由SetupCompletedTransition来完成的,它主要做了四件事:         1.通过设置作业Job的成员变量setupProgress为1,标记作业setup已完成:         2.调度作业Job的Map Task:         3.调度作业的

Yarn源码分析之事件异步分发器AsyncDispatcher

        AsyncDispatcher是Yarn中事件异步分发器,它是ResourceManager中的一个基于阻塞队列的分发或者调度事件的组件,其在一个特定的单线程中分派事件,交给AsyncDispatcher中之前注册的针对该事件所属事件类型的事件处理器EventHandler来处理.每个事件类型类可能会有多个处理渠道,即多个事件处理器,可以使用一个线程池调度事件.在Yarn的主节点ResourceManager中,就有一个Dispatcher类型的成员变量rmDispatcher,

MapReduce源码分析之新API作业提交(二):连接集群

         MapReduce作业提交时连接集群是通过Job的connect()方法实现的,它实际上是构造集群Cluster实例cluster,代码如下: private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException { // 如果cluster为null,构造Cluster实例cluster, // Cluster为连接MapReduce集群的一种工

MapReduce源码分析之JobSubmitter(一)

        JobSubmitter,顾名思义,它是MapReduce中作业提交者,而实际上JobSubmitter除了构造方法外,对外提供的唯一一个非private成员变量或方法就是submitJobInternal()方法,它是提交Job的内部方法,实现了提交Job的所有业务逻辑.本文,我们将深入研究MapReduce中用于提交Job的组件JobSubmitter.         首先,我们先看下JobSubmitter的类成员变量,如下: // 文件系统FileSystem实例 pr

MapReduce源码分析之LocatedFileStatusFetcher

        LocatedFileStatusFetcher是MapReduce中一个针对给定输入路径数组,使用配置的线程数目来获取数据块位置的实用类.它的主要作用就是利用多线程技术,每个线程对应一个任务,每个任务针对给定输入路径数组Path[],解析出文件状态列表队列BlockingQueue<List<FileStatus>>.其中,输入数据输入路径只不过是一个Path,而输出数据则是文件状态列表队列BlockingQueue<List<FileStatus&g