我们知道,MapReduce有三层调度模型,即Job——>Task——>TaskAttempt,并且:
1、通常一个Job存在多个Task,这些Task总共有Map Task和Redcue Task两种大的类型(为简化描述,Map-Only作业、JobSetup Task等复杂的情况这里不做考虑);
2、每个Task可以尝试运行1-n此,而且通常很多情况下都是1次,只有当开启了推测执行原理且存在拖后腿Task,或者Task之前执行失败时,Task才执行多次。
而TaskImpl中存在一个成员变量attempts,用来存储Task所包含TaskAttempt中TaskAttemptId与TaskAttempt的映射关系,定义及初始化如下:
private Map<TaskAttemptId, TaskAttempt> attempts;
this.attempts = Collections.emptyMap();
也就是说,attempts一开始被初始化为Collections.emptyMap(),我们看下其实现:
@SuppressWarnings("unchecked") public static final <K,V> Map<K,V> emptyMap() { return (Map<K,V>) EMPTY_MAP; }
@SuppressWarnings("unchecked") public static final Map EMPTY_MAP = new EmptyMap<>();
/** * @serial include */ private static class EmptyMap<K,V> extends AbstractMap<K,V> implements Serializable { private static final long serialVersionUID = 6428348081105594320L; public int size() {return 0;} public boolean isEmpty() {return true;} public boolean containsKey(Object key) {return false;} public boolean containsValue(Object value) {return false;} public V get(Object key) {return null;} public Set<K> keySet() {return emptySet();} public Collection<V> values() {return emptySet();} public Set<Map.Entry<K,V>> entrySet() {return emptySet();} public boolean equals(Object o) { return (o instanceof Map) && ((Map<?,?>)o).isEmpty(); } public int hashCode() {return 0;} // Preserves singleton property private Object readResolve() { return EMPTY_MAP; } }
可以看出,EmptyMap就是一个空的Map,大小为0,isEmpty为true,containsKey和containsValue等针对任何key或value均为false。
而在生成TaskAttempt后将其添加至attempts的逻辑如下:
// 将创建的任务运行尝试TaskAttemptImpl实例attempt与其ID的对应关系添加到TaskImpl的任务运行尝试集合attempts中, // attempts先被初始化为Collections.emptyMap() // this.attempts = Collections.emptyMap(); switch (attempts.size()) { case 0: // 如果attempts大小为0,即为Collections.emptyMap(),则将其更换为Collections.singletonMap(),并加入该TaskAttemptImpl实例attempt attempts = Collections.singletonMap(attempt.getID(), (TaskAttempt) attempt); break; case 1: // 如果attempts大小为1,即为Collections.singletonMap(),则将其替换为LinkedHashMap,并加入之前和现在的TaskAttemptImpl实例attempt Map<TaskAttemptId, TaskAttempt> newAttempts = new LinkedHashMap<TaskAttemptId, TaskAttempt>(maxAttempts); newAttempts.putAll(attempts); attempts = newAttempts; attempts.put(attempt.getID(), attempt); break; default: // 如果attempts大小大于1,说明其实一个LinkedHashMap,直接put吧 attempts.put(attempt.getID(), attempt); break; }
当Task第一次生成TaskAttempt,并将其加入attempts时,attempts为Collections.emptyMap(),其大小肯定为0,此时将TaskAttempt加入attempts时,会将attempts转换成Collections.singletonMap,即只含有一个Key-Value对的Map。而Collections.singletonMap定义如下:
public static <K,V> Map<K,V> singletonMap(K key, V value) { return new SingletonMap<>(key, value); }
private static class SingletonMap<K,V> extends AbstractMap<K,V> implements Serializable { private static final long serialVersionUID = -6979724477215052911L; private final K k; private final V v; SingletonMap(K key, V value) { k = key; v = value; } public int size() {return 1;} public boolean isEmpty() {return false;} public boolean containsKey(Object key) {return eq(key, k);} public boolean containsValue(Object value) {return eq(value, v);} public V get(Object key) {return (eq(key, k) ? v : null);} private transient Set<K> keySet = null; private transient Set<Map.Entry<K,V>> entrySet = null; private transient Collection<V> values = null; public Set<K> keySet() { if (keySet==null) keySet = singleton(k); return keySet; } public Set<Map.Entry<K,V>> entrySet() { if (entrySet==null) entrySet = Collections.<Map.Entry<K,V>>singleton( new SimpleImmutableEntry<>(k, v)); return entrySet; } public Collection<V> values() { if (values==null) values = singleton(v); return values; } }
由此可以看出,SingletonMap是只包含一对Key-Value的Map,其size大小固定为1,containsKey和containsValue返回入参key、value是否与SingletonMap内部的k、v相等,get会根据入参是否为k,来确定返回v还是null,等等。
而当attempts大小为1,即为Collections.singletonMap时,再添加TaskAttempt的话,就需要将attempts更换为LinkedHashMap,将之前的和新添加的TaskAttempt加入,此后,如果再有TaskAttempt要加入的话,直接put即可。LinkedHashMap初始化时,其容量已被确定,为maxAttempts,这个maxAttempts取自方法getMaxAttempts(),它在TaskImpl中是一个抽象方法,由其两个子类MapTaskImpl、ReduceTaskImpl分别实现,如下:
TaskImpl.java
// No override of this method may require that the subclass be initialized. protected abstract int getMaxAttempts();
MapTaskImpl.java
@Override protected int getMaxAttempts() { return conf.getInt(MRJobConfig.MAP_MAX_ATTEMPTS, 4); }
ReduceTaskImpl.java
@Override protected int getMaxAttempts() { return conf.getInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 4); }
可见,Map和Reduce任务的TaskAttempt都有一个限制,分别取自参数mapreduce.map.maxattempts、mapreduce.reduce.maxattempts,参数未配置的话,均默认为4。既然有了TaskAttempt个数的上限,那么我们初始化LinkedHashMap指定容量即可,其构造如下:
/** * Constructs an empty insertion-ordered <tt>LinkedHashMap</tt> instance * with the specified initial capacity and a default load factor (0.75). * * @param initialCapacity the initial capacity * @throws IllegalArgumentException if the initial capacity is negative */ public LinkedHashMap(int initialCapacity) { super(initialCapacity); accessOrder = false; }
调用父类HashMap的构造函数,如下:
/** * Constructs an empty <tt>HashMap</tt> with the specified initial * capacity and the default load factor (0.75). * * @param initialCapacity the initial capacity. * @throws IllegalArgumentException if the initial capacity is negative. */ public HashMap(int initialCapacity) { this(initialCapacity, DEFAULT_LOAD_FACTOR); }
确定其初始容量为指定的initialCapacity。
思考:
MapReduce为什么要这么设计呢?我想了想,大体有关于业务逻辑和性能等方面的两个原因:
1、Task的调度执行是有顺序的,而Task的抽象类TaskImpl的实现类,无论是MapTaskImpl,还是ReduceTaskImpl的构造,都是必须先进行的,这样就有一个问题,如果attempts上来就被构造为指定大小的LinkedHashMap,势必会造成空间的浪费,还有性能的消耗,况且,作业执行成功与否,还是后话,而如果我们初始化为Collections.emptyMap(),则很容易解决上面两个问题;
2、按照常理来说,理想情况下,每个Task应该有且只有一个TaskAttempt,只有当任务运行失败后重试,或开启推测执行机制后为有效加快拖后腿任务的执行而开启的备份任务等情况时,才会存在多个TaskAttempt,而在第一个TaskAttempt被构造时,将attempts由Collections.emptyMap()升级为Collections.singletonMap(),无论是在空间利用、性能上,还是业务逻辑上,都比较贴合实际情况;
3、再需要重试任务或开启备份任务时,才将attempts由Collections.singletonMap()升级为指定容量的LinkedHashMap,里面有延迟加载的理念;
4、占用资源越少,性能越高,对于其他作业或任务来说,是一种福音,能够整体提高集群的资源利用效率。
上述性能和业务逻辑方面的考虑,您或许不以为然,可能觉得性能提升不大,但是如果在大规模集群中,当作业数量庞大、任务数目数量庞大时,这种优势就愈发明显,而它带来的好处,于已,于别的作业来说,都会是一种福音!这种设计上的细节,值得我们学习、借鉴与反思!