利用hadoop mapreduce 做数据排序

  我们的需求是想统计一个文件中用IK分词后每个词出现的次数,
然后按照出现的次数降序排列。也就是高频词统计。

  由于hadoop在reduce之后就不能对结果做什么了,
所以只能分为两个job完成,第一个job统计次数,第二个job对第一个job的结果排序。 第一个job的就是hadoop最简单的例子countwords,
我要说的是用hadoop对结果排序。 假设第一个job的结果输出
如下:

  part-r-0000文件内容:

  a&">nbsp; 5

  b  4

  c  74

  d  78

  e  1

  r  64

  f    4

  要做的就是按照每个词出现的次数降序排列。

  **********************************分割线*****************************************
首先可能会出现这样的问题:

  1.可能上一个job为多个reduce,也就是会产生多个结果文件,因为一个reduce就会生成一个结果文件,结果存放在上一个job输出目录下类似part-r-00的文件里。

  2.需要排序的文件内容很大,所以需要考虑多个reduce的情况。

  *********************************分割线*******************************怎么去设计mapreduce

  1.在map阶段按照行读取文本,然后调用map方法时把上一个job的结果颠倒,也就是map后结果应该是这样的

  5    a

  4    b

  74    c

  ................

  .........................

  4    f2.然后map后,hadoop会对结果进行分组,这时结果就会变成

  (5:a)

  (4:b,f)

  (74:c)

  3.然后按照reduce数目的
大小自定义分区函数,让结果形成多个区间,比如我
认为大于50的应该在一个区间,一共3个reduce,
那么最后的数据应该是三个区间,大于50的直接分到第一个分区0,25到50之间的分到第二个分区1,小于25的分到第三个分区2.因为分区数和reduce数是相同的,所以不同的分区对应不同的reduce,因为分区是从0开始的,分区是0的会分到第一个reduce处理,分区是1的会分到第2个reduce处理,依次类推。并且reduce对应着输出文件,所以,第一个reduce生成的文件就会是part-r-0000,第二个reduce对应的生成文件就会是part-r-0001,依次类推,所以reduce处理时只需要把key和value再倒过来直接输出。这样最后就会让形成数目最大的字符串就会在第一个生成文件里,排好的序就会文件命的顺序。 代码如下:

  *******************************分割线*****************************************map:

  /**

  * 把上一个mapreduce的结果的key和value颠倒,调到后就可以按照key排序了。

  *

  * @author zhangdonghao

  *

  */

  public class SortIntValueMapper extends

  Mapper<LongWritable, Text, IntWritable, Text> {

  private final static IntWritable wordCount = new IntWritable(1);

  private Text word = new Text();

  public SortIntValueMapper() {

  super();

  }

  @Override

  public void map(LongWritable key, Text value, Context context)

  throws IOException, InterruptedException {

  StringTokenizer
tokenizer = new StringTokenizer(value.toString());

  while (tokenizer.hasMoreTokens()) {

  word.set(tokenizer.nextToken().trim());

  wordCount.set(Integer.valueOf(tokenizer.nextToken().trim()));

  context.write(wordCount, word);

  }

  }

  }reudce:

  /**

  * 把key和value颠倒过来输出

  * @author zhangdonghao

  *

  */

  public class SortIntValueReduce extends

  Reducer<IntWritable, Text, Text, IntWritable> {

  private Text result = new Text();

  @Override

  public void reduce(IntWritable key, Iterable<Text> values, Context context)

  throws IOException, InterruptedException {

  for (Text val : values) {

  result.set(val.toString());

  context.write(result, key);

  }

  }

  }Partitioner:

  /**

  * 按照key的大小来划分区间,当然,key 是 int值

  *

  * @author zhangdonghao

  *

  */

  public class KeySectionPartitioner<K, V> extends Partitioner<K, V> {

  public KeySectionPartitioner() {

  }

  @Override

  public int getPartition(K key, V value, int numReduceTasks) {

  /**

  * int值的hashcode还是自己本身的数值

  */

  //这里我认为大于maxValue的就应该在第一个分区

  int maxValue = 50;

  int keySection = 0;

  // 只有传过来的key值大于maxValue 并且numReduceTasks比如大于1个才需要分区,否则直接返回0

  if (numReduceTasks > 1 && key.hashCode() < maxValue) {

  int sectionValue = maxValue / (numReduceTasks - 1);

  int count = 0;

  while ((key.hashCode() - sectionValue * count) > sectionValue) {

  count++;

  }

  keySection = numReduceTasks - 1 - count;

  }

  return keySection;

  }

  }Comparator:

  /**

  * int的key按照降序排列

  *

  * @author zhangdonghao

  *

  */

  public class IntKeyAscComparator extends WritableComparator {

  protected IntKeyAscComparator() {

  super(IntWritable.class, true);

  }

  @Override

  public int compare(WritableComparable a, WritableComparable b) {

  return -super.compare(a, b);

  }

  }job的关键设置:

  /**

  * 这里是map输出的key和value类型

  */

  job.setOutputKeyClass(IntWritable.class);

  job.setOutputValueClass(Text.class);

  job.setMapperClass(SortIntValueMapper.class);

  // job.setCombinerClass(WordCountReduce.class);

  job.setReducerClass(SortIntValueReduce.class);

  // key按照降序排列

  job.setSortComparatorClass(IntKeyAscComparator.class);

  job.setPartitionerClass(KeySectionPartitioner.class);

  job.setInputFormatClass(TextInputFormat.class);

  job.setOutputFormatClass(TextOutputFormat.class);

  /**

  *这里可以放输入目录数组,也就是可以把上一个job所
有的结果都放进去

  **/

  FileInputFormat.setInputPaths(job, inputPath);

  FileOutputFormat.setOutputPath(job,outputPath);大概就是这样,亲测可用。(^__^)

时间: 2024-09-15 06:24:15

利用hadoop mapreduce 做数据排序的相关文章

用Excel做数据排序地常用办法与灵活技术

在用Excel制作相关的数据表格时,我们可以利用其强大的排序功能,浏览.查询.统计相关的数字.下面,我们以图1所示的"员工基本情况登记表"为例,来全面体验一番Excel的排序功能. 列表能否灵活排序"> 一.快速排序 如果我们希望对员工资料按某列属性(如"工龄"由长到短)进行排列,可以这样操作:选中"工龄"列任意一个单元格(如I3),然后按一下"常用"工具栏上的"降序排序"按钮即可(参见图1

Hadoop MapReduce:数据科学家探索之路

Forrester分析师James Kobielus在一篇关于"大数据"的博客中指出:"关键不在于采用什么方法,而在于能够使用任意可用工具或方法真正地解决问题." 近几年在解决大数据问题的迫切感驱使下,许多组织的数据架构师开始走向探索之路.简单而言,他们通常用于分析企业数据的传统数据库和商业智能工具已经无法胜任大数据处理任务. 要理解这个挑战,必须回到十年前:当时很少有TB级的企业数据仓库.Forrester分析报告指出,在2009年之前,有三分之二的企业数据仓库(

Hadoop专业解决方案-第3章:MapReduce处理数据

前言:非常感谢团队的努力,最新的章节终于有了成果,因为自己的懒惰,好久没有最新的进展了,感谢群里兄弟的努力. 群名称是Hadoop专业解决方案群  313702010 本章主要内容: 理解MapReduce基本原理 了解MapReduce应用的执行 理解MapReduce应用的设计 截止到目前,我们已经知道Hadoop如何存储数据,但Hadoop不仅仅是一个高可用 的,规模巨大的数据存储引擎,它的另一个主要特点是可以将数据存储与处理相结合. Hadoop的核心处理模块是MapReduce,也是当

MapReduce初级案例——数据排序

" 数据排序"是许多实际任务执行时要完成的第一项工作,比如学生成绩评比. 数据建立索引等.这个实例和数据去重类似,都是先对原始数据进行初步处理,为进一步的数据操作打好基础.下面进入这个示例. 1 实例描述 对输入文件中数据进行排序. 输入文件中的每行内容均为一个数字, 即一个数据.要求在输出中每行有两个间隔的数字,其中, 第一个代表原始数据在原始数据集中的位次, 第二个代表原始数据. 样例输入: (1) file1: (2) file2: (3) file3: 样例输出: 2 设计思路

如何利用Apache Sqoop在DB2与Hadoop之间传递数据

随着云计算和物联网等技术在全球的快速发展,企业对大数据 (http://www.aliyun.com/zixun/aggregation/13527.html">Big Data) 业务的关注也持续升温.在大数据时代,数据无疑是企业的核心资产之一,若能盘活好数据,则能使企业在公司治理.企业决策和客户服务等方方面面受益匪浅:反之,则在现代企业竞争中,容易导致其核心竞争力下降,甚至衰落. Apache Hadoop 由于擅长处理大数据分析业务,受到了广大企业的青睐.目前,多数使用 Hadoop

hadoop mapreduce 数据分析 丢数据

问题描述 hadoop mapreduce 数据分析 丢数据 最近发现hadoop的mapreduce程序会丢数据,不知道是什么原因,请教各位:hadoop环境,通过mapreduce程序分析hdfs上的数据,一天的数据是按小时存储的,每一个小时一个文件价,数据格式都是一样的,现在如果在16点这个文件价里有一条数据a,如果我用mr分析一整天的数据,数据a则丢失,如果单独跑16点这个文件夹里的数据,则数据a不会丢失,可以正常被分析出来,只要一加上其他时间段的数据,数据a就分析不出来,请问这是为什么

hadoop-求救高手,Mapreduce导入数据到Hadoop报ClassNotFoundException

问题描述 求救高手,Mapreduce导入数据到Hadoop报ClassNotFoundException 最近在用Mapreduce连Hadoop,出现各类问题.请高手答疑. 环境: hadoop:2.7.0 Hbase:1.0.1.1 刚开始的时候报:HBaseConfiguration 找不到,百度之,说将 hbase的lib下的jar复制到hadoop的lib下 复制之,无果,找各类参考资料修改hadoop参数,都还是报异常. 最后无奈,只能修改 hadoop-env.sh,将 hbas

《Hadoop实战手册》一1.11 利用Flume加载数据到HDFS中

1.11 利用Flume加载数据到HDFS中 Apache Flume是Hadoop社区的一个项目,由多个相关项目组成,用于从不同的数据源可靠有效地加载数据流到HDFS中.Flume最常见的一个场景是加载多个数据源的网站日志数据.本节将介绍如何使用Flume加载数据到HDFS中. 准备工作在本节中假定你已经安装和配置好Flume. Flume可以从Apache网页(http://incubator.apache.org/flume/)下载. 如果你使用的是CDH3,那么默认已经安装了Flume

《Hadoop MapReduce性能优化》一2.1 研究Hadoop参数

2.1 研究Hadoop参数 Hadoop MapReduce性能优化 正如第1章中提到的那样,有很多因素会对Hadoop MapReduce性能产生影响.一般说来,与工作负载相关的Hadoop性能优化需要关注以下3个主要方面:系统硬件.系统软件,以及Hadoop基础设施组件的配置和调优/优化. 需要指出的是,Hadoop被归类为高扩展性解决方案,但却不足以归类为高性能集群解决方案.系统管理员可以通过各种配置选项来配置和调优Hadoop集群.性能配置参数主要关注CPU利用率.内存占用情况.磁盘I