MapReduce实战--倒排索引

1.倒排索引简介

倒排索引(Inverted index),也常被称为反向索引置入档案反向档案,是一种索引方法,被用来存储在全文搜索下某个单词在一个文档或者一组文档中的存储位置的映射。它是文档检索系统中最常用的数据结构。

有两种不同的反向索引形式:

  • 一条记录的水平反向索引(或者反向档案索引)包含每个引用单词的文档的列表。
  • 一个单词的水平反向索引(或者完全反向索引)又包含每个单词在一个文档中的位置。

后者的形式提供了更多的兼容性(比如短语搜索),但是需要更多的时间和空间来创建。

举例:

以英文为例,下面是要被索引的文本:

  • T0 = "it is what it is"
  • T1 = "what is it"
  • T2 = "it is a banana"

我们就能得到下面的反向文件索引:

 "a":      {2}
 "banana": {2}
 "is":     {0, 1, 2}
 "it":     {0, 1, 2}
 "what":   {0, 1}

检索的条件"what""is" 和 "it" 将对应这个集合:{0,1}∩{0,1,2}∩{0,1,2}={0,1}。

对相同的文字,我们得到后面这些完全反向索引,有文档数量和当前查询的单词结果组成的的成对数据。 同样,文档数量和当前查询的单词结果都从零开始。

所以,"banana": {(2, 3)} 就是说 "banana"在第三个文档里 (T2),而且在第三个文档的位置是第四个单词(地址为 3)。

"a":      {(2, 2)}
"banana": {(2, 3)}
"is":     {(0, 1), (0, 4), (1, 1), (2, 1)}
"it":     {(0, 0), (0, 3), (1, 2), (2, 0)}
"what":   {(0, 2), (1, 0)}

如果我们执行短语搜索"what is it" 我们得到这个短语的全部单词各自的结果所在文档为文档0和文档1。但是这个短语检索的连续的条件仅仅在文档1得到。

2.分析和设计

(1)Map过程

首先使用默认的TextInputFormat类对输入文件进行处理,得到文本中每行的偏移量及其内容,Map过程首先必须分析输入的<key, value>对,得到倒排索引中需要的三个信息:单词、文档URI和词频,如图所示:

存在两个问题,第一:<key, value>对只能有两个值,在不使用Hadoop自定义数据类型的情况下,需要根据情况将其中的两个值合并成一个值,作为value或key值;

第二,通过一个Reduce过程无法同时完成词频统计和生成文档列表,所以必须增加一个Combine过程完成词频统计

public static class InvertedIndexMapper extends Mapper<Object, Text, Text, Text> {
    private Text keyInfo = new Text();  //存储单词和URI的组合
    private Text valueInfo = new Text();//存储词频
    private FileSplit split;            //存储Split对象

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        //获得<key,value>对所属的FileSplit对象
        split = (FileSplit)context.getInputSplit();
        StringTokenizer itr = new StringTokenizer(value.toString());

        while(itr.hasMoreTokens()) {
            //key值由单词和URI组成,如"MapReduce:1.txt"
            keyInfo.set(itr.nextToken() + ":" + split.getPath().toString());
            // 词频初始为1
            valueInfo.set("1");
            context.write(keyInfo, valueInfo);
        }
    }
}

(2)Combine过程

将key值相同的value值累加,得到一个单词在文档中的词频,如图

public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
    private Text info = new Text();
    public void reduce(Text key, Iterable<Text>values, Context context) throws IOException, InterruptedException {
        //统计词频
        int sum = 0;
        for(Text value : values) {
            sum += Integer.parseInt(value.toString());
        }
        int splitIndex= key.toString().indexOf(":");

        //重新设置value值由URI和词频组成
        info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
        //重新设置key值为单词
        key.set(key.toString().substring(0, splitIndex));
        context.write(key, info);
    }
}

(3)Reduce过程

讲过上述两个过程后,Reduce过程只需将相同key值的value值组合成倒排索引文件所需的格式即可,剩下的事情就可以直接交给MapReduce框架进行处理了

public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
    private Text result = new Text();
    public void reducer(Text key, Iterable<Text>values, Context context) throws IOException, InterruptedException {
        //生成文档列表
        String fileList = new String();
        for(Text value : values) {
            fileList += value.toString() + ";";
        }
        result.set(fileList);
        context.write(key, result);
    }
}

完整代码如下:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class InvertedIndex {
    public static class InvertedIndexMapper extends Mapper<Object, Text, Text, Text> {
        private Text keyInfo = new Text();
        private Text valueInfo = new Text();
        private FileSplit split;

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            split = (FileSplit)context.getInputSplit();
            StringTokenizer itr = new StringTokenizer(value.toString());

            while(itr.hasMoreTokens()) {
                keyInfo.set(itr.nextToken() + ":" + split.getPath().toString());
                valueInfo.set("1");
                context.write(keyInfo, valueInfo);
            }
        }

    }
    public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
        private Text info = new Text();
        public void reduce(Text key, Iterable<Text>values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for(Text value : values) {
                sum += Integer.parseInt(value.toString());
            }
            int splitIndex= key.toString().indexOf(":");
            info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
            key.set(key.toString().substring(0, splitIndex));
            context.write(key, info);
        }
    }
    public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();
        public void reducer(Text key, Iterable<Text>values, Context context) throws IOException, InterruptedException {
            String fileList = new String();
            for(Text value : values) {
                fileList += value.toString() + ";";
            }
            result.set(fileList);
            context.write(key, result);
        }
    }
    public static void main(String[] args) throws Exception{
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if(otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "InvertedIndex");
        job.setJarByClass(InvertedIndex.class);
        job.setMapperClass(InvertedIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
时间: 2024-08-04 06:12:28

MapReduce实战--倒排索引的相关文章

《Hadoop MapReduce实战手册》一2.10 挂载HDFS(Fuse-DFS)

2.10 挂载HDFS(Fuse-DFS) Hadoop MapReduce实战手册 Fuse-DFS项目使我们能够在Linux上挂载HDFS(也支持许多其他版本的Unix)作为标准的文件系统.这样做,可以允许任何程序或用户使用类似于传统的文件系统的方式访问HDFS和与HDFS交互. 准备工作 系统中必须安装以下软件: Apache Ant(http://ant.apache.org/): Fuse和fuse开发包.Fuse开发文件可以通过Redhat/Fedora安装fuse-devel RP

《Hadoop MapReduce实战手册》一导读

前 言 Hadoop MapReduce实战手册 本书目标是帮助读者学会处理大型的复杂数据集.本书虽从简单的例子开始,但仍然可以看到深入的内容.这是一本简单的一站式指南,传授如何完成复杂的事情.它以一种简单而直接的方式呈现了90个攻略,给出了一步步的指导和真实环境的应用示例. 本产品包括在Apache软件基金会(http://www.apache.org/)开发的软件. 本书涵盖的内容 第1章解释了如何以单点模式以及集群模式安装和运行Hadoop. 第2章介绍了一套高级的HDFS操作,在处理大规

《Hadoop MapReduce实战手册》一1.8 在分布式集群环境中设置Hadoop

1.8 在分布式集群环境中设置Hadoop Hadoop MapReduce实战手册 Hadoop的部署包括一套HDFS.一个JobTracker和多个TaskTracker.在1.5节中,我们讨论了HDFS的部署.为了设置Hadoop,我们需要配置JobTracker和TaskTracker,然后在HADOOP_ HOME/conf/slaves文件中指定TaskTracker列表.当我们启动JobTracker时,它会启动相应的TaskTracker节点列表.图1-5描述了一套完整的Hado

《Hadoop MapReduce实战手册》一2.11 在HDFS中合并文件

2.11 在HDFS中合并文件 Hadoop MapReduce实战手册本节将传授如何合并HDFS中的若干文件,以创建一个单独的文件.对于获取有多个输出部分结果的reducer的MapReduce作业的计算结果来说,这招非常有用. 操作步骤HDFS的getMerge命令可以将HDFS中给定路径下的文件,复制到本地文件系统的单个合并后的文件中. >bin/hadoopfs -getmerge /user/foo/demofiles merged.txt 工作原理getmerge命令的语法如下: h

《Hadoop MapReduce实战手册》一第1章 搭建Hadoop并在集群中运行

第1章 搭建Hadoop并在集群中运行 Hadoop MapReduce实战手册本章将学习以下内容: 在你的机器上安装Hadoop 写WordCountMapReduce示例程序,打包并使用独立的Hadoop运行它 给WordCountMapReduce程序增加combiner步骤 安装HDFS 使用HDFS监控UI HDFS的基本命令行文件操作 在分布式集群环境中设置Hadoop 在分布式集群环境中运行WordCount程序 使用MapReduce监控UI

《Hadoop MapReduce实战手册》一1.7 HDFS的基本命令行文件操作

1.7 HDFS的基本命令行文件操作 Hadoop MapReduce实战手册HDFS是一个分布式的文件系统,就像一个Unix文件系统一样,它允许用户使用shell命令操纵文件系统.本节将说明如何使用HDFS的基本命令行来执行这些操作. 值得注意的是,每一条HDFS命令都有一个与之一一对应的Unix命令.例如,下面的命令: >hadoopdfs –cat /data/foo.txt 该命令用于读取/data/foo.txt文件,并把它打印到屏幕上,就像Unix系统的cat命令一样. 准备工作通过

《Hadoop MapReduce实战手册》一2.2 HDFS基准测试

2.2 HDFS基准测试 Hadoop MapReduce实战手册运行基准测试程序,可以很好地验证HDFS集群是否已如预期般正确设置并执行.DFSIO是一个Hadoop自带的基准测试,可以用来分析一个HDFS集群的I/O性能.该部分展示了如何使用DFSIO来对HDFS集群的读取和写入性能进行基准测试. 准备工作在运行这些基准程序之前,必须安装和部署HDFS和MapReduce.导出HADOOP_HOME环境变量,将其指向Hadoop安装根目录: >export HADOOP_HOME = /..

《Hadoop MapReduce实战手册》一2.7 设置文件冗余因子

2.7 设置文件冗余因子 Hadoop MapReduce实战手册HDFS跨集群存储文件时,会把文件切分成粗粒度的.大小固定的块.出于容错的目的,这些粗粒度的数据块会被复制到不同的DataNode中.数据块的冗余有助于增加数据本地化MapReduce计算的能力,同时也可以增加总的数据访问带宽.减少冗余因子则有助于节省HDFS上的存储空间. HDFS冗余因子(HDFS replication factor)是文件级属性,可以基于每个文件进行单独配置.本节将展示如何通过改变HDFS部署的默认冗余因子

《Hadoop MapReduce实战手册》一2.6 设置HDFS块大小

2.6 设置HDFS块大小 Hadoop MapReduce实战手册HDFS跨集群存储文件时,会把文件切分成粗粒度的.大小固定的块.默认的HDFS块大小为64 MB.数据产品的块大小会影响文件系统操作的性能,如果存储和处理非常大的文件,那么较大的块大小会更高效.数据产品的块大小会影响MapReduce计算的性能,因为Hadoop的默认行为是为输入文件中的每个数据块创建一个map任务. 操作步骤 要使用NameNode的配置文件来设置HDFS的块大小,需要在$HADOOP_HOME/conf/hd