Hadoop 使用Combiner提高Map/Reduce程序效率

      众所周知,Hadoop框架使用Mapper将数据处理成一个<key,value>键值对,再网络节点间对其进行整理(shuffle),然后使用Reducer处理数据并进行最终输出。

      在上述过程中,我们看到至少两个性能瓶颈:

  1. 如果我们有10亿个数据,Mapper会生成10亿个键值对在网络间进行传输,但如果我们只是对数据求最大值,那么很明显的Mapper只需要输出它所知道的最大值即可。这样做不仅可以减轻网络压力,同样也可以大幅度提高程序效率。
  2. 使用专利中的国家一项来阐述数据倾斜这个定义。这样的数据远远不是一致性的或者说平衡分布的,由于大多数专利的国家都属于美国,这样不仅Mapper中的键值对、中间阶段(shuffle)的键值对等,大多数的键值对最终会聚集于一个单一的Reducer之上,压倒这个Reducer,从而大大降低程序的性能。

        

        Hadoop通过使用一个介于Mapper和Reducer之间的Combiner步骤来解决上述瓶颈。你可以将Combiner视为Reducer的一个帮手,它主要是为了削减Mapper的输出从而减少网络带宽和Reducer之上的负载。如果我们定义一个Combiner,MapReducer框架会对中间数据多次地使用它进行处理。

        如果Reducer只运行简单的分布式方法,例如最大值、最小值、或者计数,那么我们可以让Reducer自己作为Combiner。但许多有用的方法不是分布式的。以下我们使用求平均值作为例子进行讲解:

        Mapper输出它所处理的键值对,为了使单个DataNode计算平均值Reducer会对它收到的<key,value>键值对进行排序,求和。

        由于Reducer将它所收到的<key,value>键值的数目视为输入数据中的<key,value>键值对的数目,此时使用Combiner的主要障碍就是计数操作。我们可以重写MapReduce程序来明确的跟踪计数过程。

    

代码如下:

[java] view
plain
copy

  1. package com;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.conf.Configuration;  
  6. import org.apache.hadoop.conf.Configured;  
  7. import org.apache.hadoop.fs.Path;  
  8. import org.apache.hadoop.io.DoubleWritable;  
  9. import org.apache.hadoop.io.LongWritable;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.mapreduce.Job;  
  12. import org.apache.hadoop.mapreduce.Mapper;  
  13. import org.apache.hadoop.mapreduce.Reducer;  
  14. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  15. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  16. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  17. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  18. import org.apache.hadoop.util.Tool;  
  19. import org.apache.hadoop.util.ToolRunner;  
  20.   
  21. public class AveragingWithCombiner extends Configured implements Tool {  
  22.   
  23.     public static class MapClass extends Mapper<LongWritable,Text,Text,Text> {  
  24.           
  25.         static enum ClaimsCounters { MISSING, QUOTED };  
  26.         // Map Method  
  27.         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
  28.             String fields[] = value.toString().split(",", -20);  
  29.             String country = fields[4];  
  30.             String numClaims = fields[8];  
  31.               
  32.             if (numClaims.length() > 0 && !numClaims.startsWith("\"")) {  
  33.                 context.write(new Text(country), new Text(numClaims + ",1"));  
  34.             }  
  35.         }  
  36.     }  
  37.       
  38.     public static class Reduce extends Reducer<Text,Text,Text,DoubleWritable> {  
  39.           
  40.         // Reduce Method  
  41.         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {  
  42.             double sum = 0;  
  43.             int count = 0;  
  44.             for (Text value : values) {  
  45.                 String fields[] = value.toString().split(",");  
  46.                 sum += Double.parseDouble(fields[0]);  
  47.                 count += Integer.parseInt(fields[1]);  
  48.             }  
  49.             context.write(key, new DoubleWritable(sum/count));  
  50.         }  
  51.     }  
  52.       
  53.     public static class Combine extends Reducer<Text,Text,Text,Text> {  
  54.           
  55.         // Reduce Method  
  56.         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {  
  57.             double sum = 0;  
  58.             int count = 0;  
  59.             for (Text value : values) {  
  60.                 String fields[] = value.toString().split(",");  
  61.                 sum += Double.parseDouble(fields[0]);  
  62.                 count += Integer.parseInt(fields[1]);  
  63.             }  
  64.             context.write(key, new Text(sum+","+count));  
  65.         }  
  66.     }  
  67.       
  68.     // run Method  
  69.     public int run(String[] args) throws Exception {  
  70.         // Create and Run the Job  
  71.         Job job = new Job();  
  72.         job.setJarByClass(AveragingWithCombiner.class);  
  73.           
  74.         FileInputFormat.addInputPath(job, new Path(args[0]));  
  75.         FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  76.           
  77.         job.setJobName("AveragingWithCombiner");  
  78.         job.setMapperClass(MapClass.class);  
  79.         job.setCombinerClass(Combine.class);  
  80.         job.setReducerClass(Reduce.class);  
  81.         job.setInputFormatClass(TextInputFormat.class);  
  82.         job.setOutputFormatClass(TextOutputFormat.class);  
  83.           
  84.         job.setOutputKeyClass(Text.class);  
  85.         job.setOutputValueClass(Text.class);  
  86.           
  87.         System.exit(job.waitForCompletion(true) ? 0 : 1);  
  88.         return 0;  
  89.     }  
  90.       
  91.     public static void main(String[] args) throws Exception {  
  92.         int res = ToolRunner.run(new Configuration(), new AveragingWithCombiner(), args);  
  93.         System.exit(res);  
  94.     }  
  95.   
  96. }  
时间: 2024-09-19 09:16:21

Hadoop 使用Combiner提高Map/Reduce程序效率的相关文章

Hadoop Map/Reduce教程

目的 这篇教程从用户的角度出发,全面地介绍了Hadoop Map/Reduce框架的各个方面. 先决条件 请先确认Hadoop被正确安装.配置和正常运行中.更多信息见: Hadoop快速入门对初次使用者. Hadoop集群搭建对大规模分布式集群. 概述 Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集. 一个Map/Reduce 作业(job) 通常会把输入的数据集切分为若

Lucene-Hadoop, GFS中Map/Reduce的简单实现

Hadoop是一个用于构建分布式应用程序的框架.Hadoop框架给应用程序透明的提供了一组稳定和可靠的接口.这项技术的实现得易于映射/ 归约编程范式.在这个范式里,一个应用程序被分割成为许多的小的任务块.每一个这样的任务块被集群中的任意一个节点的计算机执行或重新执行.此外,这种范 式还提供了一种分布式的文件系统,这种文件系统用来存储数据于集群中相互间具有高带宽的计算机上.映射/归约和分布式文件系统都被设计成为容错的结构.也 就是说,当集群中某个节点发生了故障整个文件系统或者映射/归约操作仍然能够

Parallel Processing of cluster by Map Reduce

Parallel Processing of cluster by Map Reduce Madhavi Vaidya, Department of Computer Science This paper gives an overview of MapReduce programming model and its applications. The author has described here the workflow of MapReduce process. Some import

基于Hadoop的Map reduce编程(一)

翻译的一篇国外的关于hadoop mapreduce的文章,文章比较长,先翻译第一部分吧 翻译者:pconlin900 博客:http://pconline900.javaeye.com Hadoop是apache的一个开源的map-reduce框架,MapReduce是一个并行计算模型,用来处理海量数据.模型思想来源于google的Jeffrey Dean 和 Sanjay Ghemawat,包括map() reduce()两个主要的功能. 这是一个很简单的类似于Hadoop的MapReduc

Hadoop 少量map/reduce任务执行慢问题

最近在做报表统计,跑hadoop任务. 之前也跑过map/reduce但是数据量不大,遇到某些map/reduce执行时间特别长的问题. 执行时间长有几种可能性: 1. 单个map/reduce任务处理的任务大.     需要注意每个任务的数据处理量大小不至于偏差太大.可以切割部分大文件. 2. map数量过多, reduce拉取各方数据慢     这种情况,可以在中间加一轮map过程A.     即map -> mapA - > reduce,来减少reduce拉取数据的源头的个数. 3.

分布式基础学习【二】 —— 分布式计算系统(Map/Reduce)

二. 分布式计算(Map/Reduce) 分布式式计算,同样是一个宽泛的概念,在这里,它狭义的指代,按Google Map/Reduce 框架所设计的分布式框架.在Hadoop中,分布式文件系统,很大程度上,是为各种分布式计 算需求所服务的.我们说分布式文件系统就是加了分布式的文件系统,类似的定义推广到分 布式计算上,我们可以将其视为增加了分布式支持的计算函数.从计算的角度上看, Map/Reduce框架接受各种格式的键值对文件作为输入,读取计算后,最终生成自定义格式的 输出文件.而从分布式的角

Map Reduce - the Free Lunch is not over?

微软著名的C++大师 Herb Sutter在2005年初的时候曾经写过一篇重量级的文章:"The Free Lunch Is Over: A Fundamental Turn Toward Concurrency in Software",预言OO之后软件开发将要面临的又一次重大变革-并行计算. 摩尔定律统制下的软件开发时代有一个非常有意思的现象:"Andy giveth, and Bill taketh away.".不管CPU的主频有多快,我们始终有办法来利用

hadoop mapreduce 在编写好的程序下 运行程序出现错误,求错误所在

问题描述 hadoop mapreduce 在编写好的程序下 运行程序出现错误,求错误所在 15/09/01 10:05:06 INFO mapred.JobClient: map 0% reduce 0% 15/09/01 10:05:22 INFO mapred.JobClient: Task Id : attempt_201509011003_0001_m_000002_0, Status : FAILED java.util.NoSuchElementException at java.

Ruby中的类Google Map/Reduce框架Skynet介绍_ruby专题

Skynet是一个很响亮的名字,因为它是阿诺施瓦辛格主演的经典系列电影<终结者>里面的统治人类的超级计算机网络.不过本文的Skynet没这么恐怖,它是一个ruby版本的Google Map/Reduce框架的名字而已. Google的Map/Reduce框架实在太有名气了,他可以把一个任务切分为很多份,交给n台计算机并行执行,返回的结果再并行的归并,最后得到运算的结果.据说Google一个搜索结果会Map到7000台服务器并行执行,这么多么可怕的分布式运算能力阿!有了Map/Reduce,程序