In fact, computing a maximum with MapReduce is really no different from Hadoop's bundled WordCount program: one Reducer takes a maximum, the other accumulates a sum, but the essence is the same and quite simple. Let's work through an example below.
Test data
We generated a set of simple test samples with our own simulation program. A fragment of the input data looks like this:
SG 253654006139495 253654006164392 619850464
KG 253654006225166 253654006252433 743485698
UZ 253654006248058 253654006271941 570409379
TT 253654006282019 253654006286839 23236775
BE 253654006276984 253654006301435 597874033
BO 253654006293624 253654006315946 498265375
SR 253654006308428 253654006330442 484613339
SV 253654006320312 253654006345405 629640166
LV 253654006330384 253654006359891 870680704
FJ 253654006351709 253654006374468 517965666
The text data above is stored one record per line, and each line contains four parts:
- Country code
- Start time
- End time
- Random cost/weight estimate
The fields are separated by whitespace. What we want to compute is, for each country (identified by its country code), the maximum of its cost estimates.
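The simulation program that actually produced the data is not shown in this post. The following is only a minimal, hypothetical sketch of what such a generator could look like; the class name CostDataGenerator, the country-code list, the record count, and the value ranges are all assumptions for illustration, not the author's actual code:

package org.shirdrn.kodz.inaction.hadoop.extremum.max;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;

// Hypothetical test-data generator: writes lines of
// "<country code> <start time> <end time> <cost>", separated by single spaces.
public class CostDataGenerator {

    private static final String[] CODES = { "SG", "KG", "UZ", "TT", "BE", "BO", "SR", "SV", "LV", "FJ" };

    public static void main(String[] args) throws IOException {
        int lines = 1000;                          // number of records to generate (assumed)
        long start = 253654006139495L;             // starting timestamp taken from the sample data
        Random random = new Random();
        try (FileWriter writer = new FileWriter("cost_data.txt")) {
            for (int i = 0; i < lines; i++) {
                String code = CODES[random.nextInt(CODES.length)];
                long end = start + random.nextInt(30000);        // end time shortly after start time
                long cost = 1 + random.nextInt(1000000000);      // random cost estimate
                writer.write(code + " " + start + " " + end + " " + cost + "\n");
                start = end + random.nextInt(20000);             // advance the clock for the next record
            }
        }
    }
}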
Implementation
Since the program is fairly simple, let's look at the actual code directly. It consists of three parts, namely the Mapper, the Reducer, and the Driver. The Mapper implementation class is GlobalCostMapper, shown below:
package org.shirdrn.kodz.inaction.hadoop.extremum.max;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class GlobalCostMapper extends
        Mapper<LongWritable, Text, Text, LongWritable> {

    private final static LongWritable costValue = new LongWritable(0);
    private Text code = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // a line, such as 'SG 253654006139495 253654006164392 619850464'
        String line = value.toString();
        String[] array = line.split("\\s");
        if (array.length == 4) {
            String countryCode = array[0];
            String strCost = array[3];
            long cost = 0L;
            try {
                cost = Long.parseLong(strCost);
            } catch (NumberFormatException e) {
                cost = 0L; // malformed cost field, fall back to 0
            }
            // emit (country code, cost)
            code.set(countryCode);
            costValue.set(cost);
            context.write(code, costValue);
        }
    }
}
The logic above is very simple: split each line on the whitespace separator, pull out the individual fields, and finally emit a key-value pair.
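To sanity-check the Mapper in isolation, a small unit test can be written. The sketch below is not part of the original post; it assumes MRUnit (1.x) and JUnit are on the classpath, feeds a single sample line to the Mapper, and asserts the emitted pair:

package org.shirdrn.kodz.inaction.hadoop.extremum.max;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class GlobalCostMapperTest {

    @Test
    public void emitsCountryCodeAndCost() throws Exception {
        // Drive the mapper with one input line and check the expected output pair.
        MapDriver<LongWritable, Text, Text, LongWritable> driver =
                MapDriver.newMapDriver(new GlobalCostMapper());
        driver.withInput(new LongWritable(0),
                        new Text("SG 253654006139495 253654006164392 619850464"))
              .withOutput(new Text("SG"), new LongWritable(619850464L))
              .runTest();
    }
}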
Next, the key-value pairs output by the Mapper need to be merged and reduced in the Reducer. The Reducer implementation class is GlobalCostReducer, shown below:
package org.shirdrn.kodz.inaction.hadoop.extremum.max;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class GlobalCostReducer extends
        Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
            Context context) throws IOException, InterruptedException {
        // scan all values for this key and keep the maximum
        long max = 0L;
        Iterator<LongWritable> iter = values.iterator();
        while (iter.hasNext()) {
            LongWritable current = iter.next();
            if (current.get() > max) {
                max = current.get();
            }
        }
        context.write(key, new LongWritable(max));
    }
}
The code above computes the maximum cost estimate among the list of values for a key; the logic is quite simple. As an optimization, this Reducer can also be applied to the Map output as a Combiner, which reduces the amount of data transferred from the Mappers to the Reducer; taking a maximum is associative and commutative, so reusing the Reducer as the Combiner is safe. The Combiner is specified when the Job is configured.
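The max logic, and therefore its suitability as a Combiner, can be checked with a similar test sketch. Again, this is not from the original post and assumes MRUnit and JUnit are available:

package org.shirdrn.kodz.inaction.hadoop.extremum.max;

import java.util.Arrays;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class GlobalCostReducerTest {

    @Test
    public void keepsOnlyTheMaximumValue() throws Exception {
        // Three costs for one country code; only the largest should be written out.
        ReduceDriver<Text, LongWritable, Text, LongWritable> driver =
                ReduceDriver.newReduceDriver(new GlobalCostReducer());
        driver.withInput(new Text("SG"),
                        Arrays.asList(new LongWritable(619850464L),
                                      new LongWritable(23236775L),
                                      new LongWritable(870680704L)))
              .withOutput(new Text("SG"), new LongWritable(870680704L))
              .runTest();
    }
}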
Next, let's see how to configure and run a Job. The implementation class is GlobalMaxCostDriver, shown below:
package org.shirdrn.kodz.inaction.hadoop.extremum.max;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class GlobalMaxCostDriver {

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: maxcost <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "max cost");

        job.setJarByClass(GlobalMaxCostDriver.class);
        job.setMapperClass(GlobalCostMapper.class);
        job.setCombinerClass(GlobalCostReducer.class); // reuse the reducer as the combiner
        job.setReducerClass(GlobalCostReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        int exitFlag = job.waitForCompletion(true) ? 0 : 1;
        System.exit(exitFlag);
    }
}
Running the program
First, make sure the Hadoop cluster is up and running; in my setup the NameNode is on host ubuntu3. The steps to run the program are as follows:
- Compile the code (I build directly with Maven) and package it into a jar file:

shirdrn@SYJ:~/programs/eclipse-jee-juno/workspace/kodz-all/kodz-hadoop/target/classes$ jar -cvf global-max-cost.jar -C ./ org

- Copy the generated jar file to the NameNode environment:

xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ scp shirdrn@172.0.8.212:~/programs/eclipse-jee-juno/workspace/kodz-all/kodz-hadoop/target/classes/global-max-cost.jar ./

- Upload the test data to HDFS:

xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -copyFromLocal /opt/stone/cloud/dataset/data_10m /user/xiaoxiang/datasets/cost/

- Run the program:

xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop jar global-max-cost.jar org.shirdrn.kodz.inaction.hadoop.extremum.max.GlobalMaxCostDriver /user/xiaoxiang/datasets/cost /user/xiaoxiang/output/cost
The console output during the run looks roughly like this:
13/03/22 16:30:16 INFO input.FileInputFormat: Total input paths to process : 1
13/03/22 16:30:16 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/03/22 16:30:16 WARN snappy.LoadSnappy: Snappy native library not loaded
13/03/22 16:30:16 INFO mapred.JobClient: Running job: job_201303111631_0004
13/03/22 16:30:17 INFO mapred.JobClient: map 0% reduce 0%
13/03/22 16:30:33 INFO mapred.JobClient: map 22% reduce 0%
13/03/22 16:30:36 INFO mapred.JobClient: map 28% reduce 0%
13/03/22 16:30:45 INFO mapred.JobClient: map 52% reduce 9%
13/03/22 16:30:48 INFO mapred.JobClient: map 57% reduce 9%
13/03/22 16:30:57 INFO mapred.JobClient: map 80% reduce 9%
13/03/22 16:31:00 INFO mapred.JobClient: map 85% reduce 19%
13/03/22 16:31:10 INFO mapred.JobClient: map 100% reduce 28%
13/03/22 16:31:19 INFO mapred.JobClient: map 100% reduce 100%
13/03/22 16:31:24 INFO mapred.JobClient: Job complete: job_201303111631_0004
13/03/22 16:31:24 INFO mapred.JobClient: Counters: 29
13/03/22 16:31:24 INFO mapred.JobClient: Job Counters
13/03/22 16:31:24 INFO mapred.JobClient: Launched reduce tasks=1
13/03/22 16:31:24 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=76773
13/03/22 16:31:24 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/03/22 16:31:24 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/03/22 16:31:24 INFO mapred.JobClient: Launched map tasks=7
13/03/22 16:31:24 INFO mapred.JobClient: Data-local map tasks=7
13/03/22 16:31:24 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=40497
13/03/22 16:31:24 INFO mapred.JobClient: File Output Format Counters
13/03/22 16:31:24 INFO mapred.JobClient: Bytes Written=3029
13/03/22 16:31:24 INFO mapred.JobClient: FileSystemCounters
13/03/22 16:31:24 INFO mapred.JobClient: FILE_BYTES_READ=142609
13/03/22 16:31:24 INFO mapred.JobClient: HDFS_BYTES_READ=448913653
13/03/22 16:31:24 INFO mapred.JobClient: FILE_BYTES_WRITTEN=338151
13/03/22 16:31:24 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=3029
13/03/22 16:31:24 INFO mapred.JobClient: File Input Format Counters
13/03/22 16:31:24 INFO mapred.JobClient: Bytes Read=448912799
13/03/22 16:31:24 INFO mapred.JobClient: Map-Reduce Framework
13/03/22 16:31:24 INFO mapred.JobClient: Map output materialized bytes=21245
13/03/22 16:31:24 INFO mapred.JobClient: Map input records=10000000
13/03/22 16:31:24 INFO mapred.JobClient: Reduce shuffle bytes=18210
13/03/22 16:31:24 INFO mapred.JobClient: Spilled Records=12582
13/03/22 16:31:24 INFO mapred.JobClient: Map output bytes=110000000
13/03/22 16:31:24 INFO mapred.JobClient: CPU time spent (ms)=80320
13/03/22 16:31:24 INFO mapred.JobClient: Total committed heap usage (bytes)=1535639552
13/03/22 16:31:24 INFO mapred.JobClient: Combine input records=10009320
13/03/22 16:31:24 INFO mapred.JobClient: SPLIT_RAW_BYTES=854
13/03/22 16:31:24 INFO mapred.JobClient: Reduce input records=1631
13/03/22 16:31:24 INFO mapred.JobClient: Reduce input groups=233
13/03/22 16:31:24 INFO mapred.JobClient: Combine output records=10951
13/03/22 16:31:24 INFO mapred.JobClient: Physical memory (bytes) snapshot=1706708992
13/03/22 16:31:24 INFO mapred.JobClient: Reduce output records=233
13/03/22 16:31:24 INFO mapred.JobClient: Virtual memory (bytes) snapshot=4316872704
13/03/22 16:31:24 INFO mapred.JobClient: Map output records=10000000
Finally, view the job output, which lists the maximum cost estimate for each country code:

xiaoxiang@ubuntu3:/opt/stone/cloud/hadoop-1.0.3$ bin/hadoop fs -cat /user/xiaoxiang/output/cost/part-r-00000
As you can see, the results are what we expected.