Hadoop2源码分析-MapReduce篇

1.概述

  前面我们已经对Hadoop有了一个初步认识,接下来我们开始学习Hadoop的一些核心的功能,其中包含mapreduce,fs,hdfs,ipc,io,yarn,今天为大家分享的是mapreduce部分,其内容目录如下所示:

  • MapReduce V1
  • MapReduce V2
  • MR V1和MR V2的区别
  • MR V2的重构思路

  本篇文章的源码是基于hadoop-2.6.0-src.tar.gz来完成的。代码下载地址,请参考《Hadoop2源码分析-准备篇》。

2.MapReduce V1

  下面我们给出第一代的MapReduce的架构图,如下所示:

  上图描述了第一代MapReduce框架的流程以及设计思路,下面为大家解释下这张图的具体含义:

  • 当我们编写完MR作业后,需要通过JobClient来提交一个job,提交的信息会发送到JobTracker模块,这个模块是第一代
    MapReduce计算框架的核心之一,它负责与集群中的其他节点维持心跳,为提交的作业分配资源,管理提交的作业的正常运作(失败,重启等)。
  • 第一代MapReduce的另一个核心的功能是TaskTracker,在各个TaskTracker安装节点上,它的主要功能是监控自己所在节点的资源使用情况。
  • TaskTracker监控当前节点的Tasks的运行情况,其中包含Map Task和Reduce Task,最后由Reduce
    Task到Reduce阶段,将结果输送到HDFS的文件系统中;其中的具体流程如图中描述的1-7步骤。TaskTracker在监控期间,需要把这些
    信息通过心跳机制发送给JobTracker,JobTracker收集到这些信息后,给新提交的作业分配其他的资源,避免重复资源分配。

  可以看出,第一代的MapReduce架构简单清晰,在刚面世的那几年,也曾获得总多企业的支持和认可。但随着分布式集群的规模和企业业务的增长,第一代框架的问题也逐渐暴露出来,主要有以下问题:

  • JobTracker是第一代MapReduce的入口点,若是JobTracker服务宕机,整个服务将会瘫痪,存在单点问题。
  • JobTracker负责的事情太多,完成来太多的任务,占用过多的资源,当Job数非常多的时候,会消耗很多内存,容易出现性能瓶颈。
  • 对TaskTracker而言,Task担当的角色过于简单,没有考虑到CPU及内存的使用情况,若存在多个大内存的Task被集中调度,容易出现内存溢出。
  • 另外,TaskTracker把资源强制分为map task slot和reduce task slot,若是MR任务中只存在其中一个(map或是reduce),会出现资源浪费的情况,资源利用率低。
  • 从开发人员的角度来说,源码分析的时候,阅读性不够友好,代码量大,任务不清晰,给开发人员在修复BUG和维护的时候增大了难度。

3.MapReduce V2

  在Hadoop V2中,加入了YARN的概念,所以MapReduce V2的架构和MapReduce V1的架构有些许的变化,如下图所示:

  从上图中,我们可以清晰的看出,架构重构的基本思想在于将JobTracker的两个核心的功能单独分离成独立的组件了。分离后的组件分别为资
源管理(Applications Manager)和任务调度器(Resource Scheduler)。新的资源管理器(Resource
Manager)管理整个系统的资源分配,而每一个Node Manager下的App Master(Application
Master)负责对应的调度和协调工作,而在实际中,App Master从Resource Manager上获得资源,让Node
Manager来协同工作和任务监控。

  从图中我们可以看出,Resource Manager是支持队列分层的,这些队列可以从集群中获取一定比例的资源,也就是说Resource Manager可以算得上是一个调度器,它在执行的过程当中本身不负责对应用的监控和状态的定位跟踪。

  Resource
Manager在内存,CPU,IO等方面是动态分配的,相比第一代MapReduce计算框架,在资源使用上大大的加强了资源使用的灵活性。上图中的
Node Manager是一个代理框架,负责应用程序的执行,监控应用程序的资源利用率,并将信息上报给资源管理器。另外,App
Master所担当的角色职责包含:在运行任务是,向任务调度器动态的申请资源,对应用程序的状态进行监控,处理异常情况,如若出现问题,会在其他节点进
行重启。

4.MR V1和MR V2的区别

  在和大家分析完 MR V1 和 MR V2 的架构后,我们来看看二者有哪些变化。在MR
V2版本中,大部分的API接口都是兼容的保留下来,MR V1中的JobTracker和TaskTracker被替换成相应的Resource
Manager,Node Manager。对比于MR V1中的Task的监控,重启等内热都交由App Master来处理,Resource
Manager提供中心服务,负责资源的分配与调度。Node Manager负责维护Container的状态,并将收集的信息上报给Resource
Manager,以及负责和Resource Manager维持心跳。

  MR V2中加入Yarn的概念后,体现以下设计优点:

  • 减少来资源消耗,让监控每一个作业更加分布式了。
  • 能够支持更多的变成模型,如:Spark,Storm,以及其他待开发的编程模型。
  • 将资源以内存量的概念来描述,比MR V1中的slot更加合理。

  另外,在工程目录结构也有了些许的变化,如下表所示:

改变目录 MR V1 MR V2 描述
配置文件 ${HADOOP_HOME}/conf ${HADOOP_HOME}/etc/hadoop MR V2中的配置文件路径修改为etc/hadoop目录下
脚本 ${HADOOP_HOME}/bin ${HADOOP_HOME}/sbin和${HADOOP_HOME}/bin 在MR V2中启动,停止等命令都位于sbin目录下,操作hdfs的命令存放在bin目录下
JAVA_HOME ${HADOOP_HOME}/conf/hadoop-env.sh ${HADOOP_HOME}/etc/hadoop/hadoop-env.sh和${HADOOP_HOME}/etc/hadoop/yarn-env.sh 在MR V2中需要同时在hadoop-env.sh和yarn-env.sh中配置JDK的路径

  由于添加Yarn特性,与第一代MR的框架变化较大,第一代的核心配置文件许多项也在新框架中摒弃了,具体新框架的核心配置文件信息,请参考《配置高可用的Hadoop平台》。

5.MR V2的重构思路

  在V2中的MapReduce重构的思路主要有以下几点:

  • 层次化的管理:分层级对资源的调度和分配进行管理。
  • 资源管理方式:由第一代的slot作为资源单位元,调整为更加细粒的内存单位元。
  • 编程模型拓展:V2版的设计支持除MapReduce以外的编程模型。

  MapReduce:WordCount V2,代码如下:

package cn.hdfs.mapreduce.example;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

/**
* @date Apr 17, 2015
*
* @author dengjie
*/
public class WordCount2 {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    static enum CountersEnum { INPUT_WORDS }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private boolean caseSensitive;
    private Set<String> patternsToSkip = new HashSet<String>();

    private Configuration conf;
    private BufferedReader fis;

    @Override
    public void setup(Context context) throws IOException,
        InterruptedException {
      conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
      if (conf.getBoolean("wordcount.skip.patterns", true)) {
        URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
        for (URI patternsURI : patternsURIs) {
          Path patternsPath = new Path(patternsURI.getPath());
          String patternsFileName = patternsPath.getName().toString();
          parseSkipFile(patternsFileName);
        }
      }
    }

    private void parseSkipFile(String fileName) {
      try {
        fis = new BufferedReader(new FileReader(fileName));
        String pattern = null;
        while ((pattern = fis.readLine()) != null) {
          patternsToSkip.add(pattern);
        }
      } catch (IOException ioe) {
        System.err.println("Caught exception while parsing the cached file '"
            + StringUtils.stringifyException(ioe));
      }
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String line = (caseSensitive) ?
          value.toString() : value.toString().toLowerCase();
      for (String pattern : patternsToSkip) {
        line = line.replaceAll(pattern, "");
      }
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
        Counter counter = context.getCounter(CountersEnum.class.getName(),
            CountersEnum.INPUT_WORDS.toString());
        counter.increment(1);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    if (!(remainingArgs.length != 2 || remainingArgs.length != 4)) {
      System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    List<String> otherArgs = new ArrayList<String>();
    for (int i=0; i < remainingArgs.length; ++i) {
      if ("-skip".equals(remainingArgs[i])) {
        job.addCacheFile(new Path(remainingArgs[++i]).toUri());
        job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
      } else {
        otherArgs.add(remainingArgs[i]);
      }
    }
    FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

  Spark:WordCount,代码如下:

package com.hdfs.spark.example

/**
* @date Apr 17, 2015
*
* @author dengjie
*/
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

/**
* 统计字符出现次数
*/
object WordCount {
   def main(args: Array[String]) {
     if (args.length < 1) {
       System.err.println("Usage: <file>")
       System.exit(1)
     }

     val conf = new SparkConf()
     val sc = new SparkContext(conf)
     val line = sc.textFile(args(0))
     line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect().foreach(println)
     sc.stop()
}
}

6.结束语

  这篇文章就和大家分享到这里,如果大家在研究和学习的过程中有什么疑问,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

时间: 2025-01-19 07:15:10

Hadoop2源码分析-MapReduce篇的相关文章

Hadoop2源码分析-准备篇

1.概述 我们已经能够搭建一个高可用的Hadoop平台了,也熟悉并掌握了一个项目在Hadoop平台下的开发流程,基于Hadoop的一些套件我们也能够使用,并且能利用这些套件进行一些任务的开发.在Hadoop的应用级别上,我们接着往后面去研究学习,那就是Hadoop的源码了,作为Hadoop开发人员,我们得去学习和研究Hadoop得实现原理,底层框架的设计,编码的实现过程等等,下面就开始我们今天的Hadoop源码分析之旅. 2.准备 在分析源码之前,我们需要准备好分析源码的环境,以及如何去分析(分

Hadoop2源码分析-HDFS核心模块分析

1.概述 这篇博客接着<Hadoop2源码分析-RPC机制初识> 来讲述,前面我们对MapReduce.序列化.RPC进行了分析和探索,对Hadoop V2的这些模块都有了大致的了解,通过对这些模块的研究,我们明白了MapReduce的运行流程以及内部的实现机制,Hadoop的序列化以及它的通信 机制(RPC).今天我们来研究另一个核心的模块,那就是Hadoop的分布式文件存储系统--HDFS,下面是今天分享的内容目录: HDFS简述 NameNode DataNode 接下来,我们开始今天的

Hadoop2源码分析-YARN RPC 示例介绍

1.概述 之前在<Hadoop2源码分析-RPC探索实战>一文当中介绍了Hadoop的RPC机制,今天给大家分享关于YARN的RPC的机制.下面是今天的分享目录: YARN的RPC介绍 YARN的RPC示例 截图预览 下面开始今天的内容分享. 2.YARN的RPC介绍 我们知道在Hadoop的RPC当中,其主要由RPC,Client及Server这三个大类组成,分别实现对外提供编程接口.客户端实现及服务端实现.如下图所示:     图中是Hadoop的RPC的一个类的关系图,大家可以到<

Hadoop2源码分析-RPC机制初识

1.概述 上一篇博客,讲述Hadoop V2的序列化机制,这为我们学习Hadoop V2的RPC机制奠定了基础.RPC的内容涵盖的信息有点多,包含Hadoop的序列化机制,RPC,代理,NIO等.若对Hadoop序列化不了解的同学,可以参考<Hadoop2源码分析-序列化篇>.今天这篇博客为大家介绍的内容目录如下: RPC概述 第三方RPC Hadoop V2的RPC简述 那么,下面开始今天的学习之路. 2.RPC概述 首先,我们要弄明白,什么是RPC?RPC能用来做什么? 2.1什么是RPC

Hadoop2源码分析-RPC探索实战

1.概述 在<Hadoop2源码分析-RPC机制初识>博客中,我们对RPC机制有了初步的认识和了解,下面我们对Hadoop V2的RPC机制做进一步探索,在研究Hadoop V2的RPC机制,我们需要掌握相关的Java基础知识,如:Java NIO.动态代理与反射等.本篇博客介绍的内容目录如下所示: Java NIO简述 Java NIO实例演示 动态代理与反射简述 动态代理与反射实例演示 Hadoop V2 RPC框架使用实例 下面开始今天的博客介绍. 2.Java NIO简述 Java N

Hadoop2源码分析-Hadoop V2初识

1.概述 在完成分析Hadoop2源码的准备工作后,我们进入到后续的源码学习阶段.本篇博客给大家分享,让大家对Hadoop V2有个初步认识,博客的目录内容如下所示: Hadoop的渊源 Hadoop V2部分项目图 各个包的功能介绍 本篇文章的源码是基于Hadoop-2.6.0来分析,其他版本的Hadoop的源码可以此作为参考分析. 2.Hadoop的渊源 其实,早年Google的核心竞争力是它的计算平台,Google对外公布的论文有一下内容: GoogleCluster  Chubby  G

Hadoop2源码分析-序列化篇

1.概述 上一篇我们了解了MapReduce的相关流程,包含MapReduce V2的重构思路,新的设计架构,与MapReduce V1的区别等内容,今天我们在来学习下在Hadoop V2中的序列化的相关内容,其目录如下所示: 序列化的由来 Hadoop序列化依赖图详解 Writable常用实现类 下面,我们开始学习今天的内容. 2.序列化的由来 我们知道Java语言对序列化提供了非常友好的支持,在定义一个类时,如果我们需要序列化一个类,只需要实现该类的序列化接口即可.场景:让一个AppInfo

MapReduce源码分析之新API作业提交(二):连接集群

         MapReduce作业提交时连接集群是通过Job的connect()方法实现的,它实际上是构造集群Cluster实例cluster,代码如下: private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException { // 如果cluster为null,构造Cluster实例cluster, // Cluster为连接MapReduce集群的一种工

MapReduce源码分析之JobSubmitter(一)

        JobSubmitter,顾名思义,它是MapReduce中作业提交者,而实际上JobSubmitter除了构造方法外,对外提供的唯一一个非private成员变量或方法就是submitJobInternal()方法,它是提交Job的内部方法,实现了提交Job的所有业务逻辑.本文,我们将深入研究MapReduce中用于提交Job的组件JobSubmitter.         首先,我们先看下JobSubmitter的类成员变量,如下: // 文件系统FileSystem实例 pr