MapReduce源码分析之新API作业提交(二):连接集群

         MapReduce作业提交时连接集群是通过Job的connect()方法实现的,它实际上是构造集群Cluster实例cluster,代码如下:

  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {

	// 如果cluster为null,构造Cluster实例cluster,
	// Cluster为连接MapReduce集群的一种工具,提供了一种获取MapReduce集群信息的方法
	if (cluster == null) {
      cluster =
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException,
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }

        这个方法用synchronized关键字标识,处理逻辑为:如果cluster为null,构造Cluster实例cluster。

        Cluster为连接MapReduce集群的一种工具,提供了一种获取MapReduce集群信息的方法,我们看下它的成员变量,如下所示:

  // 客户端通信协议提供者
  private ClientProtocolProvider clientProtocolProvider;
  // 客户端通信协议实例
  private ClientProtocol client;

  // 用户信息
  private UserGroupInformation ugi;

  // 配置信息
  private Configuration conf;

  // 文件系统实例
  private FileSystem fs = null;

  // 系统路径
  private Path sysDir = null;

  // 阶段区域路径
  private Path stagingAreaDir = null;

  // 作业历史路径
  private Path jobHistoryDir = null;

  // 日志
  private static final Log LOG = LogFactory.getLog(Cluster.class);

  // 客户端通信协议提供者加载器
  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);

        Cluster最重要的两个成员变量是客户端通信协议提供者ClientProtocolProvider实例clientProtocolProvider,客户端通信协议ClientProtocol实例client,而后者是依托前者的create()方法生成的。

        Cluster提供了两个构造函数,如下:

  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {
<span style="white-space:pre">	</span>// 设置配置信息
    this.conf = conf;

    // 获取当前用户
    this.ugi = UserGroupInformation.getCurrentUser();

    // 调用initialize()方法完成初始化
    initialize(jobTrackAddr, conf);
  }

        最终会调用initialize()方法完成初始化,代码如下:

  // 确定客户端ClientProtocol实例client
  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    synchronized (frameworkLoader) {

      // 取出每个ClientProtocolProvider实例provider,通过其create()方法,
      // 构造ClientProtocol实例clientProtocol,
      // 并将两者赋值给对类应成员变量,退出循环
      for (ClientProtocolProvider provider : frameworkLoader) {

    	LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());

    	ClientProtocol clientProtocol = null; 

    	try {

          // 通过ClientProtocolProvider的create()方法,获取客户端与集群通讯ClientProtocol实例clientProtocol
          if (jobTrackAddr == null) {
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          // 设置类成员变量clientProtocolProvider、client,并退出循环
          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;

            // 记录debug级别日志信息
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
          else {

        	// 记录debug级别日志信息
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        }
        catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: " + e.getMessage());
        }
      }
    }

    // 如果clientProtocolProvider、client任一为空,直接抛出IO异常
    if (null == clientProtocolProvider || null == client) {
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the correspond server addresses.");
    }
  }

        initialize()方法唯一的一个任务就是确定客户端通信协议提供者clientProtocolProvider,并通过其create()方法构造客户端通信协议ClientProtocol实例client。

        MapReduce中,ClientProtocolProvider抽象类的实现共有YarnClientProtocolProvider、LocalClientProtocolProvider两种,前者为Yarn模式,而后者为Local模式。

        我们先看下Yarn模式,看下YarnClientProtocolProvider的create()方法,代码如下:

  @Override
  public ClientProtocol create(Configuration conf) throws IOException {

	// 如果参数mapreduce.framework.name配置的为yarn,构造一个YARNRunner实例并返回,否则返回null
    if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
      return new YARNRunner(conf);
    }
    return null;
  }

        Yarn模式下,如果参数mapreduce.framework.name配置的为yarn,构造一个YARNRunner实例并返回,否则返回null,关于YARNRunner,我们待会再讲,我们接着再看下Local模式,LocalClientProtocolProvider的create()方法,代码如下:

  @Override
  public ClientProtocol create(Configuration conf) throws IOException {

	// 初始化framework:取参数mapreduce.framework.name,参数未配置默认为local
	String framework =
        conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);

	// 如果framework是local,,则返回LocalJobRunner实例,并设置map任务数量为1,否则返回null
    if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
      return null;
    }
    conf.setInt(JobContext.NUM_MAPS, 1);

    return new LocalJobRunner(conf);
  }

        Local模式也是需要看参数mapreduce.framework.name的配置是否为local,是的话,返回LocalJobRunner实例,并设置map任务数量为1,否则返回null,值得一提的是,这里参数mapreduce.framework.name未配置的话,默认为local,也就是说,MapReduce需要看参数mapreduce.framework.name确定连接模式,但默认是Local模式的。

        到了这里,我们就能够知道一个很重要的信息,Cluster中客户端通信协议ClientProtocol实例,要么是Yarn模式下的YARNRunner,要么就是Local模式下的LocalJobRunner,记住这点,对透彻了解MapReduce作业提交的整体流程非常重要。

        好了,我们继续以Yarn模式来分析MapReduce集群连接,看下YARNRunner的实现,先看下它的成员变量,如下:

  // 记录工厂RecordFactory实例
  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

  // ResourceManager代理ResourceMgrDelegate实例
  private ResourceMgrDelegate resMgrDelegate;

  // 客户端缓存ClientCache实例
  private ClientCache clientCache;

  // 配置信息Configuration实例
  private Configuration conf;

  // 文件上下文FileContext实例
  private final FileContext defaultFileContext;

        其中,最重要的一个变量就是ResourceManager代理ResourceMgrDelegate实例resMgrDelegate,Yarn模式下整个MapReduce客户端就是由它负责与Yarn集群进行通信,完成诸如作业提交、作业状态查询等过程,通过它获取集群的信息,其内部有一个YarnClient实例YarnClient,负责与Yarn进行通信,还有ApplicationId、ApplicationSubmissionContext等与特定应用程序相关的成员变量。关于ResourceMgrDelegate的详细介绍,请阅读《MapReduce源码分析ResourceMgrDelegate》一文,这里不再做详细介绍。

        另外一个比较重要的变量就是客户端缓存ClientCache实例clientCache,

        接下来,我们看下YARNRunner的构造函数,如下:

  /**
   * Yarn runner incapsulates the client interface of
   * yarn
   * @param conf the configuration object for the client
   */
  public YARNRunner(Configuration conf) {

   // 先构造ResourceManager代理ResourceMgrDelegate实例,然后再调用两个参数的构造函数
   this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
  }

  /**
   * Similar to {@link #YARNRunner(Configuration)} but allowing injecting
   * {@link ResourceMgrDelegate}. Enables mocking and testing.
   * @param conf the configuration object for the client
   * @param resMgrDelegate the resourcemanager client handle.
   */
  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
   // 先构造客户端缓存ClientCache实例,然后再调用三个参数的构造函数
   this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
  }

  /**
   * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
   * but allowing injecting {@link ClientCache}. Enable mocking and testing.
   * @param conf the configuration object
   * @param resMgrDelegate the resource manager delegate
   * @param clientCache the client cache object.
   */
  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
      ClientCache clientCache) {

	// 成员变量赋值
	this.conf = conf;
    try {
      this.resMgrDelegate = resMgrDelegate;
      this.clientCache = clientCache;

      // 获取文件山下文FileContext实例defaultFileContext
      this.defaultFileContext = FileContext.getFileContext(this.conf);
    } catch (UnsupportedFileSystemException ufe) {
      throw new RuntimeException("Error in instantiating YarnClient", ufe);
    }
  }

        YARNRunner一共提供了三个构造函数,而我们之前说的WordCount作业提交时,其内部调用的是YARNRunner带有一个参数的构造函数,它会先构造ResourceManager代理ResourceMgrDelegate实例,然后再调用两个参数的构造函数,继而构造客户端缓存ClientCache实例,然后再调用三个参数的构造函数,而最终的构造函数只是进行简单的类成员变量赋值,然后通过FileContext的静态getFileContext()方法获取文件山下文FileContext实例defaultFileContext。

        总结

        MapReduce作业提交时连接集群是通过Job的connect()方法实现的,它实际上是构造集群Cluster实例cluster。Cluster为连接MapReduce集群的一种工具,提供了一种获取MapReduce集群信息的方法。在Cluster内部,有一个与集群进行通信的客户端通信协议ClientProtocol实例client,它由ClientProtocolProvider的静态create()方法构造,而Hadoop2.6.0中提供了两种模式的ClientProtocol,分别为Yarn模式的YARNRunner和Local模式的LocalJobRunner,Cluster实际上是由它们负责与集群进行通信的,而Yarn模式下,ClientProtocol实例YARNRunner对象内部有一个ResourceManager代理ResourceMgrDelegate实例resMgrDelegate,Yarn模式下整个MapReduce客户端就是由它负责与Yarn集群进行通信,完成诸如作业提交、作业状态查询等过程,通过它获取集群的信息。

时间: 2024-12-31 15:08:54

MapReduce源码分析之新API作业提交(二):连接集群的相关文章

MapReduce源码分析之JobSubmitter(一)

        JobSubmitter,顾名思义,它是MapReduce中作业提交者,而实际上JobSubmitter除了构造方法外,对外提供的唯一一个非private成员变量或方法就是submitJobInternal()方法,它是提交Job的内部方法,实现了提交Job的所有业务逻辑.本文,我们将深入研究MapReduce中用于提交Job的组件JobSubmitter.         首先,我们先看下JobSubmitter的类成员变量,如下: // 文件系统FileSystem实例 pr

Yarn源码分析之如何确定作业运行方式Uber or Non-Uber?

        在MRAppMaster中,当MapReduce作业初始化时,它会通过作业状态机JobImpl中InitTransition的transition()方法,进行MapReduce作业初始化相关操作,而这其中就包括:         1.调用createSplits()方法,创建分片,并获取任务分片元数据信息TaskSplitMetaInfo数组taskSplitMetaInfo:         2.确定Map Task数目numMapTasks:分片元数据信息数组的长度,即有多

MapReduce源码分析之LocatedFileStatusFetcher

        LocatedFileStatusFetcher是MapReduce中一个针对给定输入路径数组,使用配置的线程数目来获取数据块位置的实用类.它的主要作用就是利用多线程技术,每个线程对应一个任务,每个任务针对给定输入路径数组Path[],解析出文件状态列表队列BlockingQueue<List<FileStatus>>.其中,输入数据输入路径只不过是一个Path,而输出数据则是文件状态列表队列BlockingQueue<List<FileStatus&g

Spark源码分析之六:Task调度(二)

        话说在<Spark源码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这个方法针对接收到的ReviveOffers事件进行处理.代码如下: // Make fake resource offers on all executors     // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的)     private def makeOffers() {  

Spark源码分析之八:Task运行(二)

        在<Spark源码分析之七:Task运行(一)>一文中,我们详细叙述了Task运行的整体流程,最终Task被传输到Executor上,启动一个对应的TaskRunner线程,并且在线程池中被调度执行.继而,我们对TaskRunner的run()方法进行了详细的分析,总结出了其内Task执行的三个主要步骤:         Step1:Task及其运行时需要的辅助对象构造,主要包括:                        1.当前线程设置上下文类加载器:        

Alluxio源码分析:RPC框架浅析(二)

        Alluxio源码分析是一个基于内存的分布式文件系统,和HDFS.HBase等一样,也是由主从节点构成的.而节点之间的通信,一般都是采用的RPC通讯模型.Alluxio中RPC是基于何种技术如何实现的呢?它对于RPC请求是如何处理的?都涉及到哪些组件?本文将针对这些问题,为您一一解答.         继<Alluxio源码分析:RPC框架浅析(一)>一文后,本文继续讲解Alluxio中RPC实现.         3.Server端实现:RPC Server端口绑定.传输协议

MapReduce源码分析之Task中关于对应TaskAttempt存储Map方案的一些思考

        我们知道,MapReduce有三层调度模型,即Job-->Task-->TaskAttempt,并且:         1.通常一个Job存在多个Task,这些Task总共有Map Task和Redcue Task两种大的类型(为简化描述,Map-Only作业.JobSetup Task等复杂的情况这里不做考虑):         2.每个Task可以尝试运行1-n此,而且通常很多情况下都是1次,只有当开启了推测执行原理且存在拖后腿Task,或者Task之前执行失败时,Task

Eureka源码分析:Eureka不会进行二次Replication的原因

Eureka不会进行二次同步注册信息 Eureka会将本实例中的注册信息同步到它的peer节点上,这是我们都知道的特性.然而,当peer节点收到同步数据后,并不会将这些信息再同步到它自己的peer节点上.如果有A, B, C三个实例,A配B, B配C, C配A, 那么当向A注册一个新服务时,只有A, B两个Eureka实例会有新服务的注册信息,C是没有的.这一点在官方wiki上并没有明确说明.下面通过源码来查找一下原因. 构建 Eureka当前版本 (https://github.com/Net

MapReduce源码分析之作业Job状态机解析(一)简介与正常流程浅析

        作业Job状态机维护了MapReduce作业的整个生命周期,即从提交到运行结束的整个过程.Job状态机被封装在JobImpl中,其主要包括14种状态和19种导致状态发生的事件.         作业Job的全部状态维护在类JobStateInternal中,如下所示: public enum JobStateInternal { // 作业新建状态,当作业Job被新创建时所处的状态 NEW, // 作业启动状态,此时运行时间已被设置,任务处于开始被调度阶段 SETUP, // 作