MapReduce TotalOrderPartitioner的全局排序

我们知道Mapreduce框架在feed数据给reducer之前会对map output key排序,这种排序机制保证了每一个reducer局部有序,hadoop 默认的partitioner是HashPartitioner,它依赖于output key的hashcode,使得相同key会去相同reducer,但是不保证全局有序,如果想要获得全局排序结果(比如获取top N, bottom N),就需要用到TotalOrderPartitioner了,它保证了相同key去相同reducer的同时也保证了全局有序。

public class HashPartitioner<K, V> extends Partitioner<K, V> {
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
/**
 * Partitioner effecting a total order by reading split points from
 * an externally generated source.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
    extends Partitioner<K,V> implements Configurable {
  // by construction, we know if our keytype
  @SuppressWarnings("unchecked") // is memcmp-able and uses the trie
  public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(key);
  }
}

TotalOrderPartitioner依赖于一个partition file来distribute keys,partition file是一个实现计算好的sequence file,如果我们设置的reducer number是N,那么这个文件包含(N-1)个key分割点,并且是基于key comparator排好序的。TotalOrderPartitioner会检查每一个key属于哪一个reducer的范围内,然后决定分发给哪一个reducer。

InputSampler类的writePartitionFile方法会对input files取样并创建partition file。有三种取样方法:

1. RandomSampler  随机取样

2. IntervalSampler  从s个split里面按照一定间隔取样,通常适用于有序数据

3. SplitSampler  从s个split中选取前n条记录取样

更多精彩内容:http://www.bianceng.cnhttp://www.bianceng.cn/webkf/tools/

paritition file可以通过TotalOrderPartitioner.setPartitionFile(conf, partitionFile)来设置,在TotalOrderPartitioner instance创建的时候会调用setConf函数,这时会读入partition file中key值,如果key是BinaryComparable(可以认为是字符串类型)的话会构建trie,时间复杂度是O(n), n是树的深度。如果是非BinaryComparable类型就构建BinarySearchNode,用二分查找,时间复杂度O(log(n)),n是reduce数

boolean natOrder =
  conf.getBoolean(NATURAL_ORDER, true);
if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
  partitions = buildTrie((BinaryComparable[])splitPoints, 0,
      splitPoints.length, new byte[0],
      // Now that blocks of identical splitless trie nodes are
      // represented reentrantly, and we develop a leaf for any trie
      // node with only one split point, the only reason for a depth
      // limit is to refute stack overflow or bloat in the pathological
      // case where the split points are long and mostly look like bytes
      // iii...iixii...iii   .  Therefore, we make the default depth
      // limit large but not huge.
      conf.getInt(MAX_TRIE_DEPTH, 200));
} else {
  partitions = new BinarySearchNode(splitPoints, comparator);
}

示例程序

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler.RandomSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;  

public class TotalSortMR {  

    public static int runTotalSortJob(String[] args) throws Exception {
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        Path partitionFile = new Path(args[2]);
        int reduceNumber = Integer.parseInt(args[3]);  

        // RandomSampler第一个参数表示key会被选中的概率,第二个参数是一个选取samples数,第三个参数是最大读取input splits数
        RandomSampler<Text, Text> sampler = new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);  

        Configuration conf = new Configuration();
        // 设置partition file全路径到conf
        TotalOrderPartitioner.setPartitionFile(conf, partitionFile);  

        Job job = new Job(conf);
        job.setJobName("Total-Sort");
        job.setJarByClass(TotalSortMR.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(reduceNumber);  

        // partitioner class设置成TotalOrderPartitioner
        job.setPartitionerClass(TotalOrderPartitioner.class);  

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        outputPath.getFileSystem(conf).delete(outputPath, true);  

        // 写partition file到mapreduce.totalorderpartitioner.path
        InputSampler.writePartitionFile(job, sampler);  

        return job.waitForCompletion(true)? 0 : 1;  

    }  

    public static void main(String[] args) throws Exception{
        System.exit(runTotalSortJob(args));
    }
}

上面的例子是采用InputSampler来创建partition file,其实还可以使用mapreduce来创建,可以自定义一个inputformat来取样,将output key输出到一个reducer

ps:hive 0.12实现了parallel ORDER BY(https://issues.apache.org/jira/browse/HIVE-1402),也是基于TotalOrderPartitioner,非常靠谱的new feature啊

作者:csdn博客 lalaguozhe

以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索mapreduce
, 排序
, key
, public
, 全局
, 相同
有序
mapreduce全局变量、mapreduce 全局排序、mapreduce 全局计数器、total order、sql ordertotal,以便于您获取更多的相关知识。

时间: 2024-10-03 17:14:49

MapReduce TotalOrderPartitioner的全局排序的相关文章

C# datagrid全局排序

问题描述 C#使用datagridview+bindingnavigator自行定义了一个分页排序的功能,排序是全局排序,不是在分页内排序:分页已搞定,但全局排序有点问题:1.重写了ColumnHeaderMouseClick,代码大致如下:privatevoid***_ColumnHeaderMouseClick(objectsender,DataGridViewCellMouseEventArgse){//获取点击列DataGridViewColumnnewColumn=this.dgvAc

MapReduce初级案例——数据排序

" 数据排序"是许多实际任务执行时要完成的第一项工作,比如学生成绩评比. 数据建立索引等.这个实例和数据去重类似,都是先对原始数据进行初步处理,为进一步的数据操作打好基础.下面进入这个示例. 1 实例描述 对输入文件中数据进行排序. 输入文件中的每行内容均为一个数字, 即一个数据.要求在输出中每行有两个间隔的数字,其中, 第一个代表原始数据在原始数据集中的位次, 第二个代表原始数据. 样例输入: (1) file1: (2) file2: (3) file3: 样例输出: 2 设计思路

Hive中的排序语法

ORDER BY hive中的ORDER BY语句和关系数据库中的sql语法相似.他会对查询结果做全局排序,这意味着所有的数据会传送到一个Reduce任务上,这样会导致在大数量的情况下,花费大量时间. 与数据库中 ORDER BY 的区别在于在hive.mapred.mode = strict模式下,必须指定 limit 否则执行会报错. hive> set hive.mapred.mode=strict; hive> select * from test order by id; FAILE

从MapReduce的执行来看如何优化MaxCompute(原ODPS) SQL

SQL基础有这些操作(按照执行顺序来排列): from join(left join, right join, inner join, outer join ,semi join) where group by select sum distinct count order by 如果我们能理解mapreduce是怎么实现这些SQL中的基本操作的,那么我们将很容易理解怎么优化SQL写法.接下来我们一个一个的谈: from 这个操作是在解析过程中就完成了,目的就是找出输入的表(文件). join(

MapReduce:超大机群上的简单数据处理

MapReduce:超大机群上的简单数据处理                                           摘要MapReduce是一个编程模型,和处理,产生大数据集的相关实现.用户指定一个 map函数处理一个key/value对,从而产生中间的key/value对集.然后再指定一个reduce函数合并所有的具有相同中间key的中间 value.下面将列举许多可以用这个模型来表示的现实世界的工作.以这种方式写的程序能自动的在大规模的普通机器上实现并行化.这个运行时系统关心这

hive中的排序和分组

order by order by 会对输入坐全局排序,因此 只有一个reducer(多个reducer会无法保证全局有序)只有一个reducer,会导致当输入规模较大时,需要较长的计算时间. set hive.mapred.mode =nonstrict;(default value/默认值) set hive.mapred.mode =strict; order by 和数据库中的order by 功能一致,按照某一项或几项的排序输出. 与数据库中order by 的区别在于在 hive.m

《R与Hadoop大数据分析实战》一2.3 Hadoop MapReduce原理

2.3 Hadoop MapReduce原理 为了更好地理解MapReduce的工作原理,我们将会: 学习MapReduce对象. MapReduce中实现Map阶段的执行单元数目. MapReduce中实现Reduce阶段的执行单元数目. 理解MapReduce的数据流. 深入理解Hadoop MapReduce. 2.3.1 MapReduce对象 由Hadoop的MapReduce技术可以引申出如下3个主要对象: Mapper:它主要用于实现MapReduce的Map阶段的操作.该对象在M

ext表头正序倒序设置成全局

问题描述 ext表头正序倒序设置成全局 在ext列表的表头点击的正序倒序想要全局排序,用了sortremote:true,没用,有没有大神指点一下 解决方案 点击正序 倒序 全局查询 解决方案二: remote为true需要你服务器依据排序条件服务器端进行排序,客户端不在干预排序结果,你服务器端修改了没有先.. ext会发送一个sort参数到服务器,里面包含要排序的字段和怎么样排序(升或者降) sort:[{"property":"rating","dir

《Hadoop MapReduce性能优化》一2.1 研究Hadoop参数

2.1 研究Hadoop参数 Hadoop MapReduce性能优化 正如第1章中提到的那样,有很多因素会对Hadoop MapReduce性能产生影响.一般说来,与工作负载相关的Hadoop性能优化需要关注以下3个主要方面:系统硬件.系统软件,以及Hadoop基础设施组件的配置和调优/优化. 需要指出的是,Hadoop被归类为高扩展性解决方案,但却不足以归类为高性能集群解决方案.系统管理员可以通过各种配置选项来配置和调优Hadoop集群.性能配置参数主要关注CPU利用率.内存占用情况.磁盘I