MapReduce源码分析之Task中关于对应TaskAttempt存储Map方案的一些思考

        我们知道,MapReduce有三层调度模型,即Job——>Task——>TaskAttempt,并且:

        1、通常一个Job存在多个Task,这些Task总共有Map Task和Redcue Task两种大的类型(为简化描述,Map-Only作业、JobSetup Task等复杂的情况这里不做考虑);

        2、每个Task可以尝试运行1-n此,而且通常很多情况下都是1次,只有当开启了推测执行原理且存在拖后腿Task,或者Task之前执行失败时,Task才执行多次。

        而TaskImpl中存在一个成员变量attempts,用来存储Task所包含TaskAttempt中TaskAttemptId与TaskAttempt的映射关系,定义及初始化如下:

    private Map<TaskAttemptId, TaskAttempt> attempts;
    this.attempts = Collections.emptyMap();

        也就是说,attempts一开始被初始化为Collections.emptyMap(),我们看下其实现:

    @SuppressWarnings("unchecked")
    public static final <K,V> Map<K,V> emptyMap() {
        return (Map<K,V>) EMPTY_MAP;
    }
    @SuppressWarnings("unchecked")
    public static final Map EMPTY_MAP = new EmptyMap<>();
    /**
     * @serial include
     */
    private static class EmptyMap<K,V>
        extends AbstractMap<K,V>
        implements Serializable
    {
        private static final long serialVersionUID = 6428348081105594320L;

        public int size()                          {return 0;}
        public boolean isEmpty()                   {return true;}
        public boolean containsKey(Object key)     {return false;}
        public boolean containsValue(Object value) {return false;}
        public V get(Object key)                   {return null;}
        public Set<K> keySet()                     {return emptySet();}
        public Collection<V> values()              {return emptySet();}
        public Set<Map.Entry<K,V>> entrySet()      {return emptySet();}

        public boolean equals(Object o) {
            return (o instanceof Map) && ((Map<?,?>)o).isEmpty();
        }

        public int hashCode()                      {return 0;}

        // Preserves singleton property
        private Object readResolve() {
            return EMPTY_MAP;
        }
    }

        可以看出,EmptyMap就是一个空的Map,大小为0,isEmpty为true,containsKey和containsValue等针对任何key或value均为false。

        而在生成TaskAttempt后将其添加至attempts的逻辑如下:

    // 将创建的任务运行尝试TaskAttemptImpl实例attempt与其ID的对应关系添加到TaskImpl的任务运行尝试集合attempts中,
    // attempts先被初始化为Collections.emptyMap()
    // this.attempts = Collections.emptyMap();
    switch (attempts.size()) {
      case 0:

    	// 如果attempts大小为0,即为Collections.emptyMap(),则将其更换为Collections.singletonMap(),并加入该TaskAttemptImpl实例attempt
        attempts = Collections.singletonMap(attempt.getID(),
            (TaskAttempt) attempt);
        break;

      case 1:

    	// 如果attempts大小为1,即为Collections.singletonMap(),则将其替换为LinkedHashMap,并加入之前和现在的TaskAttemptImpl实例attempt
        Map<TaskAttemptId, TaskAttempt> newAttempts
            = new LinkedHashMap<TaskAttemptId, TaskAttempt>(maxAttempts);
        newAttempts.putAll(attempts);
        attempts = newAttempts;
        attempts.put(attempt.getID(), attempt);
        break;

      default:
    	// 如果attempts大小大于1,说明其实一个LinkedHashMap,直接put吧
        attempts.put(attempt.getID(), attempt);
        break;
    }

        当Task第一次生成TaskAttempt,并将其加入attempts时,attempts为Collections.emptyMap(),其大小肯定为0,此时将TaskAttempt加入attempts时,会将attempts转换成Collections.singletonMap,即只含有一个Key-Value对的Map。而Collections.singletonMap定义如下:

    public static <K,V> Map<K,V> singletonMap(K key, V value) {
        return new SingletonMap<>(key, value);
    }
    private static class SingletonMap<K,V>
          extends AbstractMap<K,V>
          implements Serializable {
        private static final long serialVersionUID = -6979724477215052911L;

        private final K k;
        private final V v;

        SingletonMap(K key, V value) {
            k = key;
            v = value;
        }

        public int size()                          {return 1;}

        public boolean isEmpty()                   {return false;}

        public boolean containsKey(Object key)     {return eq(key, k);}

        public boolean containsValue(Object value) {return eq(value, v);}

        public V get(Object key)                   {return (eq(key, k) ? v : null);}

        private transient Set<K> keySet = null;
        private transient Set<Map.Entry<K,V>> entrySet = null;
        private transient Collection<V> values = null;

        public Set<K> keySet() {
            if (keySet==null)
                keySet = singleton(k);
            return keySet;
        }

        public Set<Map.Entry<K,V>> entrySet() {
            if (entrySet==null)
                entrySet = Collections.<Map.Entry<K,V>>singleton(
                    new SimpleImmutableEntry<>(k, v));
            return entrySet;
        }

        public Collection<V> values() {
            if (values==null)
                values = singleton(v);
            return values;
        }

    }

         
由此可以看出,SingletonMap是只包含一对Key-Value的Map,其size大小固定为1,containsKey和containsValue返回入参key、value是否与SingletonMap内部的k、v相等,get会根据入参是否为k,来确定返回v还是null,等等。

        而当attempts大小为1,即为Collections.singletonMap时,再添加TaskAttempt的话,就需要将attempts更换为LinkedHashMap,将之前的和新添加的TaskAttempt加入,此后,如果再有TaskAttempt要加入的话,直接put即可。LinkedHashMap初始化时,其容量已被确定,为maxAttempts,这个maxAttempts取自方法getMaxAttempts(),它在TaskImpl中是一个抽象方法,由其两个子类MapTaskImpl、ReduceTaskImpl分别实现,如下:

        TaskImpl.java

  // No override of this method may require that the subclass be initialized.
  protected abstract int getMaxAttempts();

        MapTaskImpl.java

  @Override
  protected int getMaxAttempts() {
    return conf.getInt(MRJobConfig.MAP_MAX_ATTEMPTS, 4);
  }

        ReduceTaskImpl.java

  @Override
  protected int getMaxAttempts() {
    return conf.getInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 4);
  }

        可见,Map和Reduce任务的TaskAttempt都有一个限制,分别取自参数mapreduce.map.maxattempts、mapreduce.reduce.maxattempts,参数未配置的话,均默认为4。既然有了TaskAttempt个数的上限,那么我们初始化LinkedHashMap指定容量即可,其构造如下:

    /**
     * Constructs an empty insertion-ordered <tt>LinkedHashMap</tt> instance
     * with the specified initial capacity and a default load factor (0.75).
     *
     * @param  initialCapacity the initial capacity
     * @throws IllegalArgumentException if the initial capacity is negative
     */
    public LinkedHashMap(int initialCapacity) {
        super(initialCapacity);
        accessOrder = false;
    }

        调用父类HashMap的构造函数,如下:

    /**
     * Constructs an empty <tt>HashMap</tt> with the specified initial
     * capacity and the default load factor (0.75).
     *
     * @param  initialCapacity the initial capacity.
     * @throws IllegalArgumentException if the initial capacity is negative.
     */
    public HashMap(int initialCapacity) {
        this(initialCapacity, DEFAULT_LOAD_FACTOR);
    }

         确定其初始容量为指定的initialCapacity。

        思考:

        MapReduce为什么要这么设计呢?我想了想,大体有关于业务逻辑和性能等方面的两个原因:

        1、Task的调度执行是有顺序的,而Task的抽象类TaskImpl的实现类,无论是MapTaskImpl,还是ReduceTaskImpl的构造,都是必须先进行的,这样就有一个问题,如果attempts上来就被构造为指定大小的LinkedHashMap,势必会造成空间的浪费,还有性能的消耗,况且,作业执行成功与否,还是后话,而如果我们初始化为Collections.emptyMap(),则很容易解决上面两个问题;

        2、按照常理来说,理想情况下,每个Task应该有且只有一个TaskAttempt,只有当任务运行失败后重试,或开启推测执行机制后为有效加快拖后腿任务的执行而开启的备份任务等情况时,才会存在多个TaskAttempt,而在第一个TaskAttempt被构造时,将attempts由Collections.emptyMap()升级为Collections.singletonMap(),无论是在空间利用、性能上,还是业务逻辑上,都比较贴合实际情况;

        3、再需要重试任务或开启备份任务时,才将attempts由Collections.singletonMap()升级为指定容量的LinkedHashMap,里面有延迟加载的理念;

        4、占用资源越少,性能越高,对于其他作业或任务来说,是一种福音,能够整体提高集群的资源利用效率。

        上述性能和业务逻辑方面的考虑,您或许不以为然,可能觉得性能提升不大,但是如果在大规模集群中,当作业数量庞大、任务数目数量庞大时,这种优势就愈发明显,而它带来的好处,于已,于别的作业来说,都会是一种福音!这种设计上的细节,值得我们学习、借鉴与反思!

        

时间: 2024-09-20 20:25:37

MapReduce源码分析之Task中关于对应TaskAttempt存储Map方案的一些思考的相关文章

MapReduce源码分析之新API作业提交(二):连接集群

         MapReduce作业提交时连接集群是通过Job的connect()方法实现的,它实际上是构造集群Cluster实例cluster,代码如下: private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException { // 如果cluster为null,构造Cluster实例cluster, // Cluster为连接MapReduce集群的一种工

MapReduce源码分析之JobSubmitter(一)

        JobSubmitter,顾名思义,它是MapReduce中作业提交者,而实际上JobSubmitter除了构造方法外,对外提供的唯一一个非private成员变量或方法就是submitJobInternal()方法,它是提交Job的内部方法,实现了提交Job的所有业务逻辑.本文,我们将深入研究MapReduce中用于提交Job的组件JobSubmitter.         首先,我们先看下JobSubmitter的类成员变量,如下: // 文件系统FileSystem实例 pr

MapReduce源码分析之LocatedFileStatusFetcher

        LocatedFileStatusFetcher是MapReduce中一个针对给定输入路径数组,使用配置的线程数目来获取数据块位置的实用类.它的主要作用就是利用多线程技术,每个线程对应一个任务,每个任务针对给定输入路径数组Path[],解析出文件状态列表队列BlockingQueue<List<FileStatus>>.其中,输入数据输入路径只不过是一个Path,而输出数据则是文件状态列表队列BlockingQueue<List<FileStatus&g

jQuery源码分析之jQuery中的循环技巧详解_jquery

jQuery的源码中有很多值得学习借鉴的技巧,本文即收集了jQuery中出现的各种遍历技巧和场景.具体分析如下: // 简单的for-in(事件) for ( type in events ) { } // 缓存length属性,避免每次都去查找length属性,稍微提升遍历速度 // 但是如果遍历HTMLCollection时,性能提升非常明显,因为每次访问HTMLCollection的属性,HTMLCollection都会内部匹配一次所有的节点 for ( var j = 0, l = ha

【原创】源码分析 TCP 协议中的 SYN queue 和 accept queue 处理

若要理解本文意图说明的问题,可能需要以下知识背景:  listen 系统调用的 backlog 参数含义,以及与 net.core.somaxconn 参数的关系: SYN flood 攻击与防护: SYN queue 和 accept queue 的用途,以及在不同 linux 版本中的实现差异: ----  在 SYN queue 未满的情况下,在收到 SYN 包后,TCP 协议栈自动回复 SYN,ACK 包,之后在收到 ACK 时,根据 accept queue 状态进行后续处理: 若 S

MapReduce源码分析之作业Job状态机解析(一)简介与正常流程浅析

        作业Job状态机维护了MapReduce作业的整个生命周期,即从提交到运行结束的整个过程.Job状态机被封装在JobImpl中,其主要包括14种状态和19种导致状态发生的事件.         作业Job的全部状态维护在类JobStateInternal中,如下所示: public enum JobStateInternal { // 作业新建状态,当作业Job被新创建时所处的状态 NEW, // 作业启动状态,此时运行时间已被设置,任务处于开始被调度阶段 SETUP, // 作

MapReduce源码分析之JobSplitWriter

        JobSplitWriter被作业客户端用于写分片相关文件,包括分片数据文件job.split和分片元数据信息文件job.splitmetainfo.它有两个静态成员变量,如下: // 分片版本,当前默认为1 private static final int splitVersion = JobSplit.META_SPLIT_VERSION; // 分片文件头部,为UTF-8格式的字符串"SPL"的字节数组"SPL" private static

MapReduce源码分析之InputFormat

        InputFormat描述了一个Map-Reduce作业中的输入规范.Map-Reduce框架依靠作业的InputFormat实现以下内容:         1.校验作业的输入规范:         2.分割输入文件(可能为多个),生成逻辑输入分片InputSplit(往往为多个),每个输入分片InputSplit接着被分配给单独的Mapper:         3.提供记录读取器RecordReader的实现,RecordReader被用于从逻辑输入分片InputSplit收集

Spark源码分析之六:Task调度(二)

        话说在<Spark源码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这个方法针对接收到的ReviveOffers事件进行处理.代码如下: // Make fake resource offers on all executors     // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的)     private def makeOffers() {