文本挖掘分词mapreduce化

软件版本

paoding-analysis3.0

项目jar包和拷贝庖丁dic目录到项目的类路径下

修改paoding-analysis.jar下的paoding-dic-home.properties文件设置词典文件路径

paoding.dic.home=classpath:dic

分词程序demo

import java.io.IOException;
import java.io.StringReader;

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

import net.paoding.analysis.analyzer.PaodingAnalyzer;

public class TokenizeWithPaoding {
public static void main(String[] args) {
    
    String line="中华民族共和国";
    PaodingAnalyzer analyzer =new PaodingAnalyzer();
    StringReader sr=new StringReader(line);
    TokenStream ts=analyzer.tokenStream("", sr);//分词流,第一个参数无意义
    //迭代分词流
    try {
        while(ts.incrementToken()){
            CharTermAttribute ta=ts.getAttribute(CharTermAttribute.class);
            System.out.println(ta.toString());
        }
    } catch (Exception e) {
        
        e.printStackTrace();
    }
}
}

新闻文文本分类源文件

http://people.csail.mit.edu/jrennie/20Newsgroups/20news-bydate.tar.gz

每个文件夹代表一个类别,每个类别下的文件代表一条新闻

中文新闻分类需要先分词

对于大量小文件可以使用FileInputFormat的另一个抽象子类CombineFileInputFormat实现createRecordReader方法

CombineFileInputFormat重写了getSpilt方法,返回的分片类型是CombineFileSpilt,是InputSpilt的子类,可包含多个文件

RecordReader怎么由文件生成key-value是由nextKeyValue函数决定

自定义的CombineFileInputFormat类

package org.conan.myhadoop.fengci;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

/**
 * 自定义MyInputFormat类, 用于实现一个Split包含多个文件
 * @author BOB
 *
 */
public class MyInputFormat extends CombineFileInputFormat<Text, Text>{
        
        //禁止文件切分
        @Override
        protected boolean isSplitable(JobContext context, Path file) {
                return false;
        }

        @Override
        public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
                return new CombineFileRecordReader<Text, Text>((CombineFileSplit)split, context, MyRecordReader.class);
        }

}

自定义的RecordReader类

package org.conan.myhadoop.fengci;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

/**
 * 自定义MyRecordReader类, 用于读取MyInputFormat对象切分的Split分片中的内容
 * @author BOB
 *
 */
public class MyRecordReader extends RecordReader<Text, Text> {

        private CombineFileSplit combineFileSplit;                //当前处理的分片
        private Configuration conf;                        //作业的配置信息
        private Text currentKey = new Text();                //当前读入的key
        private Text currentValue = new Text();        //当前读入的value
        private int totalLength;                        //当前分片中文件的数量
        private int currentIndex;                                //正在读取的文件在当前分片中的位置索引
        private float currentProgress = 0F;                //当前进度
        private boolean processed = false;        //标记当前文件是否已经被处理过
        
        //构造方法
        public MyRecordReader(CombineFileSplit combineFileSplit,
                        TaskAttemptContext context, Integer fileIndex) {
                super();
                this.combineFileSplit = combineFileSplit;
                this.currentIndex = fileIndex;
                this.conf = context.getConfiguration();
                this.totalLength = combineFileSplit.getPaths().length;
        }

     
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

        }
        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
                return currentKey;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
                return currentValue;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
                if(currentIndex >= 0 && currentIndex < totalLength) {
                        return currentProgress = (float) currentIndex/totalLength;
                }
                return currentProgress;
        }

        @Override
        public void close() throws IOException {

        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
                if(!processed) {
                        //由文件的父目录, 文件名以及目录分割符组成key
                        Path file = combineFileSplit.getPath(currentIndex);
                        StringBuilder sb = new StringBuilder();
                        sb.append("/");
                        sb.append(file.getParent().getName()).append("/");
                        sb.append(file.getName());
                        currentKey.set(sb.toString());
                        
                        //以整个文件的内容作为value
                        FSDataInputStream in = null;
                        byte[] content = new byte[(int)combineFileSplit.getLength(currentIndex)];
                        FileSystem fs = file.getFileSystem(conf);
                        in = fs.open(file);
                        in.readFully(content);
                        currentValue.set(content);
                        in.close();
                        processed = true;
                        return true;
                }
                return false;
        }

}

分词驱动类

package org.conan.myhadoop.fengci;

import java.io.IOException;
import java.io.StringReader;

import net.paoding.analysis.analyzer.PaodingAnalyzer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

/**
 * 分词驱动器类, 用于给输入文件进行分词
 * @author BOB
 *
 */
public class TokenizerDriver extends Configured implements Tool{

        
        public static void main(String[] args) throws Exception{
                int res = ToolRunner.run(new Configuration(), new TokenizerDriver(), args);
                System.exit(res);
        }

        @Override
        public int run(String[] args) throws Exception {
                Configuration conf = new Configuration();
                //参数设置
                conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 4000000);
              //作业名称
                Job job = new Job(conf,"Tokenizer");
                job.setJarByClass(TokenizerDriver.class);
                
                job.setMapperClass(Map.class);
                
                job.setInputFormatClass(MyInputFormat.class);
                
                job.setOutputFormatClass(SequenceFileOutputFormat.class);
                
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                
                Path inpath=new Path(args[0]);
                Path outpath=new Path(args[1]);
                FileSystem fs = inpath.getFileSystem(conf);
                FileStatus[] status = fs.listStatus(inpath);
                Path[] paths = FileUtil.stat2Paths(status);
                for(Path path : paths) {
                        FileInputFormat.addInputPath(job, path);
                }
                FileOutputFormat.setOutputPath(job, outpath);
                
                //输出文件夹已经存在则删除
                FileSystem hdfs = outpath.getFileSystem(conf);
                if(hdfs.exists(outpath)){
                    hdfs.delete(outpath,true);
                    hdfs.close();
                }
                //没有Reduce任务
                job.setNumReduceTasks(0); 
                return job.waitForCompletion(true) ? 0 : 1;
        }
        
        /**
         * Hadoop计算框架下的Map类, 用于并行处理文本分词任务
         * @author BOB
         *
         */
        static class Map extends Mapper<Text, Text, Text, Text> {
                
                @Override
                protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
                        //创建分词器
                        Analyzer analyzer = new PaodingAnalyzer();
                        String line = value.toString();
                        StringReader reader = new StringReader(line);
                        //获取分词流对象
                        TokenStream ts = analyzer.tokenStream("", reader);
                        StringBuilder sb = new StringBuilder();
                        
                        //遍历分词流中的词语
                        while(ts.incrementToken()) {
                                CharTermAttribute ta = ts.getAttribute(CharTermAttribute.class);
                                if(sb.length() != 0) {
                                        sb.append(" ").append(ta.toString());
                                } else {
                                        sb.append(ta.toString());
                                }
                        }
                        value.set(sb.toString());
                        context.write(key, value);
                }
                
        }
}

分词预先处理结果,将所有新闻集中到一个文本中,key为类别,一行代表一篇新闻,单词之间用空格分开

处理后的数据可用于mahout做贝叶斯分类器

参考文章:

http://f.dataguru.cn/thread-244375-1-1.html

http://www.cnblogs.com/panweishadow/p/4320720.html

 

本文出自 “点滴积累” 博客,请务必保留此出处http://tianxingzhe.blog.51cto.com/3390077/1712931

时间: 2024-11-03 01:59:32

文本挖掘分词mapreduce化的相关文章

百度分词理论之标题的设计实例解析(上篇)

影响一个页面在搜索引擎排名最重要的因素之一是相关度,而最直接体现一个页面与用户搜索行为是否相关的就是标题,那么想写好一个标题你就不能不去深入的了解搜索引擎的分词原理!以百度为例,经过十年左右的完善,在中文搜索里百度已经是一个非常高效的搜索引擎,大家也和点水一样肯定都知道一个高效的搜索引擎工作肯定会涉及到很多为人知的技术点,但我们如果把复杂的搜索引擎工作简化为三个步骤那么分别是:查询,分词,匹配.那我们通过实例来看看搜索引擎是如何处理的呢,为了能理解的更加直观,我们以水手(年轻时很喜欢这首歌)为主

MapReduce朝不保夕的江湖地位

可怜的MapReduce,直到2013年末,都是Hadoop系统中的关键一环,在这个开源大数据处理框架中,它既是集群的资源管理器,又作为主要编程手段和处理环境存在.但如今看来,情况正在发生变化. Apache Software Foundation的Hadoop 2版本添加了一个名叫YARN的新技术,取代了MapReduce的资源管理角色,并将Hadoop发展成了超越MapReduce批处理作业的应用程序.目前有很多厂商推出了SQL-on-Hadoop工具,让用户编写针对Hadoop数据分析查询

面向云环境的移动信息服务情景化协同过滤推荐

面向云环境的移动信息服务情景化协同过滤推荐 张亚明 刘海鸥 如何有效融合用户情景信息参量以实现其在移动信息推荐服务中的无缝耦合,是个性化推荐领域研究的热点问题.针对移动信息服务推荐的情景"缺位"问题,将情景信息引入协同过滤(CF)推荐算法,结合云计算处理技术深入探讨面向云环境的移动信息服务情景化协同过滤推荐机制,并提出Mapreduce化的协同过滤推荐方法.面向云环境的"旅游景点移动信息服务情景化推荐系统"实验结果表明,本文方法可有效改进CF在海量移动数据环境下的执

R语言为Hadoop注入统计血脉

R是GNU的一个开源工具,具有S语言血统,擅长统计计算和统计制图.由Revolution Analytics发起的一个开源项目RHadoop将R语言与Hadoop结合在一起,很好发挥了R语言特长.广大R语言爱好者借助强大工具RHadoop,可以在大数据领域大展拳脚,这对R语言程序员来说无疑是个喜讯.作者从一个程序员的角度对R语言和Hadoop做了一次详细的讲解. 以下为原文: 前言 写过几篇关于RHadoop的技术性文章,都是从统计的角度,介绍如何让R语言利用Hadoop处理大数据.今天决定反过

菜鸟网络高级专家章天锋:用大数据把夜晚还给分拣员

WOT2015"互联网+"时代大数据技术峰会于2015年11月28日于深圳前海华侨城JW万豪酒店盛大揭幕,42位业内重量级嘉宾汇聚,重磅解析大数据技术的点睛应用.秉承专注技术.服务技术人员的理念.  本次峰会涵盖九大技术主题,分别是:互联网金融.O2O电商架构.医疗应用.商业创新.移动大数据.技术创业.社交网络.数据安全.广告数据技术.DBA+社群作为本次大会合作方,将通过图文直播为大家全程跟踪报道这场技术盛宴.   在惨烈的11.11厮杀中,谁能在用户体验上出奇制胜,谁就拥有了主战场

基于hadoop的推荐系统设计与实现

基于hadoop的推荐系统设计与实现 电子科技大学  唐真 主要工作内容如下:1.通过对Hadoop运行机制和MapReduce编程原理的研究,结合对推荐系统与推荐算法,特别是对以物质扩散推荐算法和热传导推荐算法为代表的网络推荐算法的深入分析,设计和实现基于Hadoop平台的网络推荐算法MapReduce化编程实现方案,将该算法复杂的计算任务分解为一系列MapReduce作业流程,以便于在Hadoop和云计算平台上进行分布式并行化处理,通过一系列实验测试证明算法在集群上具有良好的并行性和可扩展性

如何让Hadoop结合R语言做大数据分析?

为什么要让Hadoop结合R语言? R语言和Hadoop让我们体会到了,两种技术在各自领域的强大.很多http://www.aliyun.com/zixun/aggregation/7155.html">开发人员在计算机的角度,都会提出下面2个问题.问题1: Hadoop的家族如此之强大,为什么还要结合R语言? 问题2: Mahout同样可以做数据挖掘和机器学习,和R语言的区别是什么?下面我尝试着做一个解答:问题1: Hadoop的家族如此之强大,为什么还要结合R语言? a. Hadoop

并行化的情感分类算法的研究

并行化的情感分类算法的研究 余永红 向小军  商琳 在海量数据集上执行情感分类任务时,传统的单机情感分类算法的扩展性成为系统的瓶颈.在云计算平台Hadoop上,实现了情感分类任务中特征提取.特征向量加权和情感分类等算法的MapReduce化.在情感语料数据集上,对各种子步骤组合下情感分类算法的精度及每种算法的时间开销进行了对比分析.实验结果验证了实现的并行化情感分类算法的有效性,同时它为用户选择合适算法实现情感分类任务提供了有价值的参考信息. 并行化的情感分类算法的研究

基于Hadoop的视频摘要的设计与实现

基于Hadoop的视频摘要的设计与实现 华南理工大学 彭华聪 本文意图利用云计算技术将目前主流的基于运动过程的动态视频摘要技术由单机提取模式改造成分布式模式.从而在面对数据膨胀时,仅仅通过加入廉价的计算节点来提高视频摘要的提取速率,使得视频摘要技术能更好的投入实际生产中.首先,对云计算技术和视频摘要技术进行了大量的研究.选取了合适的开源云计算平台Hadoop分布式系统,并对Hadoop分布式系统与处理视频摘要的相关机制进行了分析.在视频摘要技术方面,概述了视频摘要技术的总体状况,详细介绍了基于运