Hadoop: A Few Basic MapReduce2 Examples

1) WordCount 

Not much needs to be said about this one; examples are everywhere. Two detailed walkthroughs of WordCount can be found online:

http://www.sxt.cn/u/235/blog/5809

http://www.cnblogs.com/zhanghuijunjava/archive/2013/04/27/3036549.html

Both are well written; the diagrams in particular are very clear.

 

2) Deduplication (Distinct)

This is similar to select distinct(x) from table in a database, and it is even simpler than WordCount. Suppose we want to deduplicate the contents of the following file (note: this file is also the input for the remaining examples):

2
8
8
3
2
3
5
3
0
2
7

There is almost nothing to do: in the map phase, emit each line's value as the key, and in the reduce phase simply collect the keys.

Note: the code uses a small home-grown helper class, HDFSUtil, which can be found in the earlier post hadoop: hdfs API示例.
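Only its deleteFile method is used here, and it is presumably just a thin wrapper around FileSystem.delete. A minimal sketch of such a helper, as an approximation for readers who skip that post (the real class may differ):

package yjmyzz.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

// Approximate sketch of the helper used below: recursively delete a path if it exists.
public class HDFSUtil {
    public static boolean deleteFile(Configuration conf, String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path p = new Path(path);
        return fs.exists(p) && fs.delete(p, true); // true = recursive delete
    }
}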

How it works: between map and reduce, the shuffle/sort phase groups records with identical keys together (and the combiner may merge them even earlier), so each distinct value reaches the reducer exactly once and the duplicates simply disappear.

package yjmyzz.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import yjmyzz.util.HDFSUtil;

import java.io.IOException;


public class RemoveDup {

    public static class RemoveDupMapper
            extends Mapper<Object, Text, Text, NullWritable> {

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit the whole line as the key; the value carries no information
            context.write(value, NullWritable.get());
            //System.out.println("map: key=" + key + ",value=" + value);
        }

    }

    public static class RemoveDupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        public void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Each distinct key arrives here exactly once, so just write it out
            context.write(key, NullWritable.get());
            //System.out.println("reduce: key=" + key);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: RemoveDup <in> [<in>...] <out>");
            System.exit(2);
        }

        // Delete the output directory (optional; avoids the "output directory already exists" error on repeated runs)
        HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);

        Job job = Job.getInstance(conf, "RemoveDup");
        job.setJarByClass(RemoveDup.class);
        job.setMapperClass(RemoveDupMapper.class);
        job.setCombinerClass(RemoveDupReducer.class);
        job.setReducerClass(RemoveDupReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}


Output:

0
2
3
5
7
8

 

3) Record Count (Count)

This one differs slightly from WordCount; it is the equivalent of select count(*) from table. The code is also trivial: just take WordCount and tweak it a little.

package yjmyzz.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import yjmyzz.util.HDFSUtil;

import java.io.IOException;


public class RowCount {

    public static class RowCountMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private final static Text countKey = new Text("count");

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Every input line contributes ("count", 1)
            context.write(countKey, one);
        }
    }

    public static class RowCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: RowCount <in> [<in>...] <out>");
            System.exit(2);
        }
        // Delete the output directory (optional)
        HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);

        Job job = Job.getInstance(conf, "RowCount");
        job.setJarByClass(RowCount.class);
        job.setMapperClass(RowCountMapper.class);
        job.setCombinerClass(RowCountReducer.class);
        job.setReducerClass(RowCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}


Output: count 11

Note: if you want the output to be just a number, without the "count" key, it can be improved a little:

package yjmyzz.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import yjmyzz.util.HDFSUtil;

import java.io.IOException;


public class RowCount2 {

    public static class RowCount2Mapper
            extends Mapper<LongWritable, Text, LongWritable, NullWritable> {

        public long count = 0;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Just count the records; nothing is written per record
            count += 1;
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit this mapper's total once, after all records have been processed
            context.write(new LongWritable(count), NullWritable.get());
        }

    }

    public static class RowCount2Reducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {

        public long count = 0;

        public void reduce(LongWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Identical partial counts from different mappers arrive grouped under one key,
            // so add the key once per value
            for (NullWritable ignored : values) {
                count += key.get();
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(count), NullWritable.get());
        }

    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: RowCount2 <in> [<in>...] <out>");
            System.exit(2);
        }

        // Delete the output directory (optional; avoids the "output directory already exists" error on repeated runs)
        HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);

        Job job = Job.getInstance(conf, "RowCount2");
        job.setJarByClass(RowCount2.class);
        job.setMapperClass(RowCount2Mapper.class);
        job.setCombinerClass(RowCount2Reducer.class);
        job.setReducerClass(RowCount2Reducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}


With this version the output is just a single number, 11.

Note: here context.write(...) must be placed in the cleanup method. cleanup exists on both Mapper and Reducer and is invoked once after all the map (or reduce) calls have finished. Try moving context.write(...) into map or reduce instead and you will see multiple output rows rather than the single number you expect.
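The reason is the task lifecycle: the framework drives each Mapper (and, analogously, each Reducer) through a run() method that calls setup once, then map once per input record, then cleanup exactly once at the end. Roughly, simplified from the Hadoop source:

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        // cleanup runs exactly once, after the last record has been mapped
        cleanup(context);
    }
}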

 

4) Maximum (Max)

package yjmyzz.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import yjmyzz.util.HDFSUtil;

import java.io.IOException;


public class Max {

    public static class MaxMapper
            extends Mapper<LongWritable, Text, LongWritable, NullWritable> {

        public long max = Long.MIN_VALUE;

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Track this mapper's running maximum
            max = Math.max(Long.parseLong(value.toString()), max);
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit the per-mapper maximum once all records have been seen
            context.write(new LongWritable(max), NullWritable.get());
        }

    }

    public static class MaxReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {

        public long max = Long.MIN_VALUE;

        public void reduce(LongWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            max = Math.max(max, key.get());
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(max), NullWritable.get());
        }

    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: Max <in> [<in>...] <out>");
            System.exit(2);
        }

        // Delete the output directory (optional; avoids the "output directory already exists" error on repeated runs)
        HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);

        Job job = Job.getInstance(conf, "Max");
        job.setJarByClass(Max.class);
        job.setMapperClass(MaxMapper.class);
        job.setCombinerClass(MaxReducer.class);
        job.setReducerClass(MaxReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}


Output: 8

If the RowCount2 code above made sense, this one needs no further explanation.

 

5) Summation (Sum)

package yjmyzz.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import yjmyzz.util.HDFSUtil;

import java.io.IOException;


public class Sum {

    public static class SumMapper
            extends Mapper<LongWritable, Text, LongWritable, NullWritable> {

        public long sum = 0;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            sum += Long.parseLong(value.toString());
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit this mapper's partial sum once, after all records have been processed
            context.write(new LongWritable(sum), NullWritable.get());
        }

    }

    public static class SumReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {

        public long sum = 0;

        public void reduce(LongWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Identical partial sums from different mappers arrive grouped under one key,
            // so add the key once per value
            for (NullWritable ignored : values) {
                sum += key.get();
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(sum), NullWritable.get());
        }

    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: Sum <in> [<in>...] <out>");
            System.exit(2);
        }

        // Delete the output directory (optional; avoids the "output directory already exists" error on repeated runs)
        HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);

        Job job = Job.getInstance(conf, "Sum");
        job.setJarByClass(Sum.class);
        job.setMapperClass(SumMapper.class);
        job.setCombinerClass(SumReducer.class);
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}


Output: 43

Sum works on exactly the same principle as Max, again relying on the cleanup method, so no further explanation is needed.

 

6) Average (Avg)

package yjmyzz.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import yjmyzz.util.HDFSUtil;

import java.io.IOException;


public class Average {

    public static class AvgMapper
            extends Mapper<LongWritable, Text, LongWritable, LongWritable> {

        public long sum = 0;
        public long count = 0;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            sum += Long.parseLong(value.toString());
            count += 1;
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit one (partial sum, partial count) pair per mapper
            context.write(new LongWritable(sum), new LongWritable(count));
        }

    }

    public static class AvgCombiner extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {

        public long sum = 0;
        public long count = 0;

        public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // key = a partial sum, each value = the matching partial count;
            // identical partial sums arrive grouped, so add the key once per value
            for (LongWritable v : values) {
                sum += key.get();
                count += v.get();
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(sum), new LongWritable(count));
        }

    }

    public static class AvgReducer extends Reducer<LongWritable, LongWritable, DoubleWritable, NullWritable> {

        public long sum = 0;
        public long count = 0;

        public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            for (LongWritable v : values) {
                sum += key.get();
                count += v.get();
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            // The average is only computed once, after all (sum, count) pairs have been folded in
            context.write(new DoubleWritable((double) sum / count), NullWritable.get());
        }

    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: Avg <in> [<in>...] <out>");
            System.exit(2);
        }

        // Delete the output directory (optional; avoids the "output directory already exists" error on repeated runs)
        HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);

        Job job = Job.getInstance(conf, "Avg");
        job.setJarByClass(Average.class);
        job.setMapperClass(AvgMapper.class);
        job.setCombinerClass(AvgCombiner.class);
        job.setReducerClass(AvgReducer.class);

        // Note: the Mapper and Reducer have different output key/value types,
        // so the map output types must be set separately
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setOutputKeyClass(DoubleWritable.class);
        job.setOutputValueClass(NullWritable.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}


Output: 3.909090909090909

This one is slightly more involved. The average is just Sum/Count, so it is really a combination of the earlier Sum and Count examples: each mapper emits a {sum, count} pair (sum as the key, count as the value), and in the final cleanup sum/count gives the average (here 43 / 11 ≈ 3.909). One detail deserves attention: because the Mapper's output {key, value} types differ from the Reducer's, the map output types must be set explicitly with setMapOutputKeyClass/setMapOutputValueClass. Without those two calls, setOutputKeyClass/setOutputValueClass would force the Mapper, Combiner and Reducer to share the same output types. This is also why the combiner cannot simply reuse AvgReducer: a combiner's output must match the map output types, so AvgCombiner keeps emitting (sum, count) pairs and only the reducer produces the final DoubleWritable.

 

7) Improved WordCount (sorted by descending word frequency)

The official WordCount example only counts how many times each word appears; it does not sort the words by frequency in descending order. The code below adds that.

package yjmyzz.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import yjmyzz.util.HDFSUtil;

import java.io.IOException;
import java.util.Comparator;
import java.util.StringTokenizer;
import java.util.TreeMap;


public class WordCount2 {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        // A TreeMap holds the counts; a TreeMap sorts by key in ascending order,
        // so a custom Comparator is supplied to get descending order
        private TreeMap<Integer, String> treeMap = new TreeMap<Integer, String>(new Comparator<Integer>() {
            @Override
            public int compare(Integer x, Integer y) {
                return y.compareTo(x);
            }
        });

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Accumulate the result into the TreeMap instead of writing it to the context
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            if (treeMap.containsKey(sum)) {
                String value = treeMap.get(sum) + "," + key.toString();
                treeMap.put(sum, value);
            } else {
                treeMap.put(sum, key.toString());
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Write the TreeMap contents to the context, highest count first
            for (Integer key : treeMap.keySet()) {
                context.write(new Text(treeMap.get(key)), new IntWritable(key));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount2 <in> [<in>...] <out>");
            System.exit(2);
        }
        // Delete the output directory
        HDFSUtil.deleteFile(conf, otherArgs[otherArgs.length - 1]);
        Job job = Job.getInstance(conf, "word count2");
        job.setJarByClass(WordCount2.class);
        job.setMapperClass(TokenizerMapper.class);
        // Do not reuse IntSumReducer as a combiner: its cleanup emits comma-joined word lists
        // as keys, which would corrupt the per-word counts if a combiner ran on map output
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}


How it works: cleanup is used once again, and the sorting is handled by a TreeMap, a data structure that keeps its keys ordered (with a custom Comparator to make the order descending).
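As a side note, the hand-written Comparator in IntSumReducer could equally be replaced by the JDK's built-in reverse-order comparator; a tiny standalone demo of the same idea:

import java.util.Collections;
import java.util.TreeMap;

public class ReverseTreeMapDemo {
    public static void main(String[] args) {
        // Same effect as the anonymous Comparator above: keys iterate in descending order
        TreeMap<Integer, String> byCountDesc = new TreeMap<Integer, String>(Collections.reverseOrder());
        byCountDesc.put(4, "I,the");
        byCountDesc.put(1, "But,Of");
        byCountDesc.put(3, "be,we");
        System.out.println(byCountDesc); // prints {4=I,the, 3=be,we, 1=But,Of}
    }
}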

To make the result easier to read, the first verse of the theme song from the movie Big Hero 6 (超能陆战队) is used as input:

They say we are what we are
But we do not have to be
I am  bad behavior but I do it in the best way
I will be the watcher
Of the eternal flame
I will be the guard dog
of all your fever dreams

After processing with the original WordCount, the result looks like this:

But	1
I	4
Of	1
They	1
all	1
am	1
are	2
bad	1
be	3
behavior	1
best	1
but	1
do	2
dog	1
dreams	1
eternal	1
fever	1
flame	1
guard	1
have	1
in	1
it	1
not	1
of	1
say	1
the	4
to	1
watcher	1
way	1
we	3
what	1
will	2
your	1

The improved WordCount2 produces:

I,the	4
be,we	3
are,do,will	2
But,Of,They,all,am,bad,behavior,best,but,dog,dreams,eternal,fever,flame,guard,have,in,it,not,of,say,to,watcher,way,what,your	1

 
