MapReduce源码分析之JobSubmitter(一)

        JobSubmitter,顾名思义,它是MapReduce中作业提交者,而实际上JobSubmitter除了构造方法外,对外提供的唯一一个非private成员变量或方法就是submitJobInternal()方法,它是提交Job的内部方法,实现了提交Job的所有业务逻辑。本文,我们将深入研究MapReduce中用于提交Job的组件JobSubmitter。

        首先,我们先看下JobSubmitter的类成员变量,如下:

  // 文件系统FileSystem实例
  private FileSystem jtFs;
  // 客户端通信协议ClientProtocol实例
  private ClientProtocol submitClient;
  // 提交作业的主机名
  private String submitHostName;
  // 提交作业的主机地址
  private String submitHostAddress;

        它一共有四个类成员变量,分别为:

        1、文件系统FileSystem实例jtFs:用于操作作业运行需要的各种文件等;

        2、客户端通信协议ClientProtocol实例submitClient:用于与集群交互,完成作业提交、作业状态查询等;

        3、提交作业的主机名submitHostName;

        4、提交作业的主机地址submitHostAddress。

        其中,客户端通信协议ClientProtocol实例submitClient是通过Cluster的客户端通信协议ClientProtocol实例client来赋值的,我们在《MapReduce源码分析之新API作业提交(二):连接集群》一文中曾经提到过,它根据MapReduce中参数mapreduce.framework.name的配置为yarn或local,有Yarn模式的YARNRunner和Local模式的LocalJobRunner两种情况。
        接下来,我们再看下JobSubmitter的构造函数,如下:

  JobSubmitter(FileSystem submitFs, ClientProtocol submitClient)
  throws IOException {

	// 根据入参赋值成员变量submitClient、jtFs
    this.submitClient = submitClient;
    this.jtFs = submitFs;
  }

        很简单,根据入参赋值成员变量submitClient、jtFs而已。

        关键的来了,我们看下JobSubmitter唯一的对外核心功能方法submitJobInternal(),它被用于提交作业至集群,代码如下:

  /**
   * Internal method for submitting jobs to the system.
   *
   * <p>The job submission process involves:
   * <ol>
   *   <li>
   *   Checking the input and output specifications of the job.
   *   </li>
   *   <li>
   *   Computing the {@link InputSplit}s for the job.
   *   </li>
   *   <li>
   *   Setup the requisite accounting information for the
   *   {@link DistributedCache} of the job, if necessary.
   *   </li>
   *   <li>
   *   Copying the job's jar and configuration to the map-reduce system
   *   directory on the distributed file-system.
   *   </li>
   *   <li>
   *   Submitting the job to the <code>JobTracker</code> and optionally
   *   monitoring it's status.
   *   </li>
   * </ol></p>
   * @param job the configuration to submit
   * @param cluster the handle to the Cluster
   * @throws ClassNotFoundException
   * @throws InterruptedException
   * @throws IOException
   */
  JobStatus submitJobInternal(Job job, Cluster cluster)
  throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs
	// 调用checkSpecs()方法,校验作业输出路径是否配置,且是否已存在,
	// 正确的情况应该是已配置且未存在,输出路径配置参数为mapreduce.output.fileoutputformat.outputdir,
	// 之前WordCount作业的输出路径配置为hdfs://nameservice1/output/output
    checkSpecs(job);

    // 从作业job中获取配置信息conf
    Configuration conf = job.getConfiguration();

    // 调用addMRFrameworkToDistributedCache()方法添加应用框架路径到分布式缓存中
    addMRFrameworkToDistributedCache(conf);

    // 通过JobSubmissionFiles的getStagingDir()静态方法获取作业执行时阶段区域路径jobStagingArea
    // 取参数yarn.app.mapreduce.am.staging-dir,参数未配置默认为/tmp/hadoop-yarn/staging
	// 然后后面是/提交作业用户名/.staging
	// 通过之前的WordCount任务的执行,我们查看历史记录,得知参数yarn.app.mapreduce.am.staging-dir配置的为/user,
	// 而提交作业用户名为hdfs,所以完整的路径应该为/user/hdfs/.staging
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

    //configure the command line options correctly on the submitting dfs
    // 获取当前本机地址
    InetAddress ip = InetAddress.getLocalHost();

    // 确定提交作业的主机地址、主机名,并设置入配置信息conf,对应参数分别为
    // mapreduce.job.submithostname
    // mapreduce.job.submithostaddress
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }

    // 生成作业ID,即JobID实例jobId
    JobID jobId = submitClient.getNewJobID();

    // 将jobId设置入job
    job.setJobID(jobId);

    // 构造提交作业路径Path实例submitJobDir,jobStagingArea后接/jobId,比如/job_1459913635503_0005
    // 之前WordCount作业的完整路径为/user/hdfs/.staging/job_1459913635503_0005
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;

    // 设置作业一些参数:
    try {

      // 设置mapreduce.job.user.name为当前用户,之前的WordCount示例配置的为hdfs用户
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());

      // 设置hadoop.http.filter.initializers为AmFilterInitializer
      conf.set("hadoop.http.filter.initializers",
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");

      // 设置mapreduce.job.dir为submitJobDir,比如/user/hdfs/.staging/job_1459913635503_0005
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir
          + " as the submit dir");

      // get delegation token for the dir
      // 获取路径的授权令牌:调用TokenCache的obtainTokensForNamenodes()静态方法
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);

      // 获取密钥和令牌,并将它们存储到令牌缓存TokenCache中
      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {

          int keyLen = CryptoUtils.isShuffleEncrypted(conf)
              ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
                  MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
              : SHUFFLE_KEY_LENGTH;
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(keyLen);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }

      // 复制并且配置相关文件
      copyAndConfigureFiles(job, submitJobDir);

      // 获取配置文件路径:job.xml
      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));

      // 调用writeSplits()方法,写分片数据文件job.split和分片元数据文件job.splitmetainfo,
      // 并获得计算得到的map任务数目maps
      int maps = writeSplits(job, submitJobDir);

      // 配置信息中设置map任务数目mapreduce.job.maps为上面得到的maps
      conf.setInt(MRJobConfig.NUM_MAPS, maps);

      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.

      // 获取作业队列名queue,取参数mapreduce.job.queuename,参数未配置默认为default,
      // 之前的WordCount任务示例中,作业队列名queue就为default
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);

      // 获取队列的访问权限控制列表AccessControlList实例acl,通过客户端通信协议ClientProtocol实例submitClient的getQueueAdmins()方法,传入队列名queue,
      // 实际上之前的WordCount任务示例中,这里获取的是*
      AccessControlList acl = submitClient.getQueueAdmins(queue);

      // 配置信息中设置队列参数mapred.queue.default.acl-administer-jobs
      // 之前的WordCount任务示例中,该参数被设置成为*
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      // 清空缓存的令牌
      TokenCache.cleanUpTokenReferral(conf);

      // 根据参数确定是否需要追踪令牌ID
      // 取参数mapreduce.job.token.tracking.ids.enabled,参数未配置默认为false
      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {

    	// 通过job获取令牌ID,并存储到trackingIds列表中
    	// Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }

        // 将trackingIds列表中的内容设置到参数mapreduce.job.token.tracking.ids中
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      // 如有必要,设置存在的预订信息
      // 参数为mapreduce.job.reservation.id
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file to submit dir
      // 调用writeConf()方法,写入作业配置信息至文件job.xml
      writeConf(conf, submitJobFile);

      //
      // Now, actually submit the job (using the submit name)
      // 调用printTokens()方法打印令牌信息到Log文件
      printTokens(jobId, job.getCredentials());

      // 通过客户端通信协议ClientProtocol实例submitClient的submitJob()方法提交作业,
      // 并获取作业状态JobStatus实例status
      // 由集群连接一文的分析我们可以知道,这个submitClient实际上是YARNRunner或LocalJobRunner对象,
      // 最终调用的是二者的submitJob()方法,我们留待以后分析
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());

      // 如果作业状态JobStatus实例status不为null,直接返回,否则抛出无法加载作业的IO异常
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {

      // 最终,抛出无法加载作业的IO异常前,调用文件系统FileSystem实例jtFs的delete()方法,
      // 删除作业提交的相关目录或文件submitJobDir
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);

      }
    }
  }

        submitJobInternal()方法篇幅比较长,逻辑也很复杂,本文先介绍下它的大体逻辑,后续分文会介绍各个环节的详细内容,且下面涉及到的之前WordCount作业示例在《Hadoop2.6.0版本MapReudce示例之WordCount(一)》及其姊妹篇中,敬请注意!submitJobInternal()方法大体逻辑如下:

        1、调用checkSpecs()方法,校验作业输出路径是否配置,且是否已存在:

              正确的情况应该是已配置且未存在,输出路径配置参数为mapreduce.output.fileoutputformat.outputdir,之前WordCount作业的输出路径配置为hdfs://nameservice1/output/output;

        2、从作业job中获取配置信息conf;

        3、调用addMRFrameworkToDistributedCache()方法添加应用框架路径到分布式缓存中;

        4、通过JobSubmissionFiles的getStagingDir()静态方法获取作业执行时阶段区域路径jobStagingArea:

              取参数yarn.app.mapreduce.am.staging-dir,参数未配置默认为/tmp/hadoop-yarn/staging,然后后面是/提交作业用户名/.staging,通过之前的WordCount任务的执行,我们查看历史记录,得知参数yarn.app.mapreduce.am.staging-dir配置的为/user,而提交作业用户名为hdfs,所以完整的路径应该为/user/hdfs/.staging;

        5、获取当前本机地址ip;

        6、确定提交作业的主机地址、主机名,并设置入配置信息conf,对应参数分别为mapreduce.job.submithostname、mapreduce.job.submithostaddress;

        7、生成作业ID,即JobID实例jobId:

              通过客户端通信协议ClientProtocol实例submitClient的getNewJobID()方法生成作业ID,即JobID实例jobId;

        8、 将jobId设置入job;

        9、构造提交作业路径Path实例submitJobDir:

               jobStagingArea后接/jobId,比如/job_1459913635503_0005,之前WordCount作业的完整路径为/user/hdfs/.staging/job_1459913635503_0005;

        10、设置作业一些参数:

                 10.1、设置mapreduce.job.user.name为当前用户,之前的WordCount示例配置的为hdfs用户;

                 10.2、设置hadoop.http.filter.initializers为AmFilterInitializer;

                 10.3、设置mapreduce.job.dir为submitJobDir,比如/user/hdfs/.staging/job_1459913635503_0005;

        11、获取路径的授权令牌:调用TokenCache的obtainTokensForNamenodes()静态方法;

        12、通过populateTokenCache()方法获取密钥和令牌,并将它们存储到令牌缓存TokenCache中;

        14、复制并且配置相关文件:通过copyAndConfigureFiles()方法实现;

        15、获取配置文件路径:job.xml;

        16、调用writeSplits()方法,写分片数据文件job.split和分片元数据文件job.splitmetainfo,并获得计算得到的map任务数目maps;

        17、配置信息中设置map任务数目mapreduce.job.maps为上面得到的maps;

        18、获取作业队列名queue,取参数mapreduce.job.queuename,参数未配置默认为default,之前的WordCount任务示例中,作业队列名queue就为default;

        19、获取队列的访问权限控制列表AccessControlList实例acl:

                通过客户端通信协议ClientProtocol实例submitClient的getQueueAdmins()方法,传入队列名queue,实际上之前的WordCount任务示例中,这里获取的是*;

        20、配置信息中设置队列参数mapred.queue.default.acl-administer-jobs,之前的WordCount任务示例中,该参数被设置成为*;

        21、清空缓存的令牌:通过TokenCache的cleanUpTokenReferral()方法实现;

        22、根据参数确定是否需要追踪令牌ID,如果需要的话:

                取参数mapreduce.job.token.tracking.ids.enabled,参数未配置默认为false,通过job获取令牌ID,并存储到trackingIds列表中,将trackingIds列表中的内容设置到参数mapreduce.job.token.tracking.ids中;

        23、如有必要,设置存在的预订信息:参数为mapreduce.job.reservation.id;

        24、调用writeConf()方法,写入作业配置信息至文件job.xml;

        25、调用printTokens()方法打印令牌信息到Log文件;

        26、通过客户端通信协议ClientProtocol实例submitClient的submitJob()方法提交作业,并获取作业状态JobStatus实例status:

                由集群连接一文的分析我们可以知道,这个submitClient实际上是YARNRunner或LocalJobRunner对象,最终调用的是二者的submitJob()方法,我们留待以后分析;

        27、如果作业状态JobStatus实例status不为null,直接返回,否则抛出无法加载作业的IO异常:

                最终,抛出无法加载作业的IO异常前,调用文件系统FileSystem实例jtFs的delete()方法,删除作业提交的相关目录或文件submitJobDir。

        整体流程如上,对于关键步骤的主要细节,限于篇幅,敬请关注《MapReduce源码分析之JobSubmitter(二)》!

时间: 2024-09-17 04:48:28

MapReduce源码分析之JobSubmitter(一)的相关文章

MapReduce源码分析之新API作业提交(二):连接集群

         MapReduce作业提交时连接集群是通过Job的connect()方法实现的,它实际上是构造集群Cluster实例cluster,代码如下: private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException { // 如果cluster为null,构造Cluster实例cluster, // Cluster为连接MapReduce集群的一种工

MapReduce源码分析之LocatedFileStatusFetcher

        LocatedFileStatusFetcher是MapReduce中一个针对给定输入路径数组,使用配置的线程数目来获取数据块位置的实用类.它的主要作用就是利用多线程技术,每个线程对应一个任务,每个任务针对给定输入路径数组Path[],解析出文件状态列表队列BlockingQueue<List<FileStatus>>.其中,输入数据输入路径只不过是一个Path,而输出数据则是文件状态列表队列BlockingQueue<List<FileStatus&g

MapReduce源码分析之JobSplitWriter

        JobSplitWriter被作业客户端用于写分片相关文件,包括分片数据文件job.split和分片元数据信息文件job.splitmetainfo.它有两个静态成员变量,如下: // 分片版本,当前默认为1 private static final int splitVersion = JobSplit.META_SPLIT_VERSION; // 分片文件头部,为UTF-8格式的字符串"SPL"的字节数组"SPL" private static

MapReduce源码分析之作业Job状态机解析(一)简介与正常流程浅析

        作业Job状态机维护了MapReduce作业的整个生命周期,即从提交到运行结束的整个过程.Job状态机被封装在JobImpl中,其主要包括14种状态和19种导致状态发生的事件.         作业Job的全部状态维护在类JobStateInternal中,如下所示: public enum JobStateInternal { // 作业新建状态,当作业Job被新创建时所处的状态 NEW, // 作业启动状态,此时运行时间已被设置,任务处于开始被调度阶段 SETUP, // 作

MapReduce源码分析之InputFormat

        InputFormat描述了一个Map-Reduce作业中的输入规范.Map-Reduce框架依靠作业的InputFormat实现以下内容:         1.校验作业的输入规范:         2.分割输入文件(可能为多个),生成逻辑输入分片InputSplit(往往为多个),每个输入分片InputSplit接着被分配给单独的Mapper:         3.提供记录读取器RecordReader的实现,RecordReader被用于从逻辑输入分片InputSplit收集

MapReduce源码分析之Task中关于对应TaskAttempt存储Map方案的一些思考

        我们知道,MapReduce有三层调度模型,即Job-->Task-->TaskAttempt,并且:         1.通常一个Job存在多个Task,这些Task总共有Map Task和Redcue Task两种大的类型(为简化描述,Map-Only作业.JobSetup Task等复杂的情况这里不做考虑):         2.每个Task可以尝试运行1-n此,而且通常很多情况下都是1次,只有当开启了推测执行原理且存在拖后腿Task,或者Task之前执行失败时,Task

Hadoop2源码分析-MapReduce篇

1.概述 前面我们已经对Hadoop有了一个初步认识,接下来我们开始学习Hadoop的一些核心的功能,其中包含mapreduce,fs,hdfs,ipc,io,yarn,今天为大家分享的是mapreduce部分,其内容目录如下所示: MapReduce V1 MapReduce V2 MR V1和MR V2的区别 MR V2的重构思路 本篇文章的源码是基于hadoop-2.6.0-src.tar.gz来完成的.代码下载地址,请参考<Hadoop2源码分析-准备篇>. 2.MapReduce V

《MapReduce 2.0源码分析与编程实战》一导读

前 言 MapReduce 2.0源码分析与编程实战 我们处于一个数据大爆炸的时代.每时每刻.各行各业都在产生和积累海量的数据内容.这些数据中蕴含着进行业务活动.获取商业信息.做出管理决策的重要信息.如何处理这些数据并获取有价值的信息,是众多组织和单位面临的共同问题.而这个问题的解决又依赖两项技术,一是能够对产生的业务数据进行统一管理和综合,并且能够无限扩展存储空间:二是能够有效处理获得的海量数据,在限定时间内获得处理结果的处理程序. 因此,寻求一个合理可靠的大数据处理决方案是目前数据处理的热点

Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(二)

        本文继<Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)>,接着讲述MapReduce作业在MRAppMaster上处理总流程,继上篇讲到作业初始化之后的作业启动,关于作业初始化主体流程的详细介绍,请参见<Yarn源码分析之MRAppMaster上MapReduce作业初始化解析>一文.         (三)启动         作业的启动是通过MRAppMaster的startJobs()方法实现的,其代码如下: /** * Th