MapReduce多个job同时使用的方式(从网上找到的案例,原始博文:http://www.cnblogs.com/yjmyzz/p/4540469.html)

复杂的MapReduce处理中,往往需要将复杂的处理过程,分解成多个简单的Job来执行,第1个Job的输出做为第2个Job的输入,相互之间有一定依赖关系。以上一篇中的求平均数为例,可以分解成三个步骤:

1. 求Sum

2. 求Count

3. 计算平均数

每1个步骤看成一个Job,其中Job3必须等待Job1、Job2完成,并将Job1、Job2的输出结果做为输入,下面的代码演示了如何将这3个Job串起来

代码:

package cn.toto.bigdata.mr.wc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Avg2 {

	private static final Text TEXT_SUM = new Text("SUM");
	private static final Text TEXT_COUNT = new Text("COUNT");
	private static final Text TEXT_AVG = new Text("AVG");

	public static class SumMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
		public long sum = 0;

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			sum += value.toString().length();
		}

		@Override
		protected void cleanup(Mapper<LongWritable, Text, Text, LongWritable>.Context context)
				throws IOException, InterruptedException {
			context.write(TEXT_SUM, new LongWritable(sum));
		}
	}

	public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
		public long sum = 0;

		@Override
		protected void reduce(Text key, Iterable<LongWritable> values,Context context)
				throws IOException, InterruptedException {
			for (LongWritable v : values) {
				sum += v.get();
			}
			context.write(TEXT_SUM, new LongWritable(sum));
		}
	}

	//计算Count
	public static class CountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
		public long count = 0;

		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
				throws IOException, InterruptedException {
			count += 1;
		}

		@Override
		protected void cleanup(Context context)
				throws IOException, InterruptedException {
			context.write(TEXT_COUNT, new LongWritable(count));
		}
	}

	public static class CountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
		public long count = 0;

		@Override
		public void reduce(Text key, Iterable<LongWritable> values, Context context)
				throws IOException, InterruptedException {
			for (LongWritable v : values) {
				count += v.get();
			}
			context.write(TEXT_COUNT, new LongWritable(count));
		}
	}

	//计算Avg
	public static class AvgMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
		public long count = 0;
		public long sum = 0;

		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String[] v = value.toString().split("\t");
			if (v[0].equals("COUNT")) {
				count = Long.parseLong(v[1]);
			} else if (v[0].equals("SUM")) {
				sum = Long.parseLong(v[1]);
			}
		}

		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			context.write(new LongWritable(sum), new LongWritable(count));
		}
	}

	public static class AvgReducer extends Reducer<LongWritable, LongWritable, Text, DoubleWritable> {
		public long sum = 0;
		public long count = 0;

		@Override
		protected void reduce(LongWritable key, Iterable<LongWritable> values,Context context)
				throws IOException, InterruptedException {
			sum += key.get();
			for(LongWritable v : values) {
				count += v.get();
			}
		}

		@Override
		protected void cleanup(Reducer<LongWritable, LongWritable, Text, DoubleWritable>.Context context)
				throws IOException, InterruptedException {
			context.write(TEXT_AVG, new DoubleWritable(new Double(sum) / count));
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		String inputPath = "E:/wordcount/input/a.txt";
		String maxOutputPath = "E:/wordcount/output/max/";
		String countOutputPath = "E:/wordcount/output/count/";
		String avgOutputPath = "E:/wordcount/output/avg/";

		Job job1 = Job.getInstance(conf, "Sum");
		job1.setJarByClass(Avg2.class);
		job1.setMapperClass(SumMapper.class);
		job1.setCombinerClass(SumReducer.class);
		job1.setReducerClass(SumReducer.class);
		job1.setOutputKeyClass(Text.class);
		job1.setOutputValueClass(LongWritable.class);
		FileInputFormat.addInputPath(job1, new Path(inputPath));
		FileOutputFormat.setOutputPath(job1, new Path(maxOutputPath));

		Job job2 = Job.getInstance(conf, "Count");
		job2.setJarByClass(Avg2.class);
		job2.setMapperClass(CountMapper.class);
		job2.setCombinerClass(CountReducer.class);
		job2.setReducerClass(CountReducer.class);
		job2.setOutputKeyClass(Text.class);
		job2.setOutputValueClass(LongWritable.class);
		FileInputFormat.addInputPath(job2, new Path(inputPath));
		FileOutputFormat.setOutputPath(job2, new Path(countOutputPath));

		Job job3 = Job.getInstance(conf, "Average");
		job3.setJarByClass(Avg2.class);
		job3.setMapperClass(AvgMapper.class);
		job3.setReducerClass(AvgReducer.class);
		job3.setMapOutputKeyClass(LongWritable.class);
		job3.setMapOutputValueClass(LongWritable.class);
		job3.setOutputKeyClass(Text.class);
		job3.setOutputValueClass(DoubleWritable.class);

		//将job1及job2的输出为做job3的输入
		FileInputFormat.addInputPath(job3, new Path(maxOutputPath));
		FileInputFormat.addInputPath(job3, new Path(countOutputPath));
		FileOutputFormat.setOutputPath(job3, new Path(avgOutputPath));

		//提交job1及job2,并等待完成
		if (job1.waitForCompletion(true) && job2.waitForCompletion(true)) {
			System.exit(job3.waitForCompletion(true) ? 0 : 1);
		}
	}

}

运行准备:
准备数据文件:

E:/wordcount/input/a.txt

数据文件的内容如下:

运行后:E:\wordcount\output\count\part-r-00000的值如下:

运行后:

E:\wordcount\output\max\part-r-00000的内容如下:

最终的平均值是:E:\wordcount\output\avg\part-r-00000

时间: 2024-11-08 22:33:21

MapReduce多个job同时使用的方式(从网上找到的案例,原始博文:http://www.cnblogs.com/yjmyzz/p/4540469.html)的相关文章

网上找到MD5的两种实现方式区别没弄明白

问题描述 byte[]result=Encoding.Default.GetBytes("123abc456");MD5md5=newMD5CryptoServiceProvider();byte[]output=md5.ComputeHash(result);Console.WriteLine(BitConverter.ToString(output).Replace("-",""));//为什么要Replace("-",&

百度钱包我可以用关闭窗口的方式退出网上银行网页吗?

  请不要用关闭浏览器窗口的方式退出,为确保网上银行的连接被安全地终止,在完成所有交易后,使用系统中的"退出"功能退出网上银行服务.

河南焦作创新追逃方式民警网上聊来逃犯自首

本报讯(穆荣磊 记者王怡波)一个偶然的机会,民警通过QQ聊天,"聊"动了逃犯的亲属,亲属将逃跑5年的赵某带到公安机关投案自首.随着"清网行动"的进一步开展,逃犯的抓获难度越来越大,为此,河南省焦作市公安局定和派出所的民警通过创新网上追逃方式,近期成功追回81.40%的逃犯. 近日,定和派出所案件侦查大队四中队队长刘波如往常一般,在QQ上和朋友聊天,一个朋友问刘波什么叫"清网行动".刘波灵机一动,便将公安机关对待网上逃犯的政策.法规等告诉在线的 网

Android(Xamarin)之旅(一)

原文:Android(Xamarin)之旅(一) Xamarin废话我就不多说了. 就是一款编写Android和IOS应用的IDE,从Visual Studio2010就开始有个这个插件.只要发展什么的,我觉得在这里说还不如自己去百度呢. 入正题: 一.安装和配置(以Visual Studio Pro 2015为例) Visual Studio2015直接提供了这个插件的选择项,稍微提示一下,如果要安装的话,最好准备好十个小时的打算,而且是网速不错的情况下,因为要下载Android api和 J

MapReduce原理与设计思想

简单解释 MapReduce 算法 一个有趣的例子 你想数出一摞牌中有多少张黑桃.直观方式是一张一张检查并且数出有多少张是黑桃 MapReduce方法则是 给在座的所有玩家中分配这摞牌 让每个玩家数自己手中的牌有几张是黑桃然后把这个数目汇报给你 你把所有玩家告诉你的数字加起来得到最后的结论 拆分 MapReduce合并了两种经典函数 映射Mapping对集合里的每个目标应用同一个操作.即如果你想把表单里每个单元格乘以二那么把这个函数单独地应用在每个单元格上的操作就属于mapping. 化简Red

《MapReduce 2.0源码分析与编程实战》一1.4 MapReduce与Hadoop

1.4 MapReduce与Hadoop 如果将Hadoop比作一头大象的话,那么MapReduce就是那头大象的大脑.MapReduce是Hadoop核心编程模型.在Hadoop中,数据处理核心为MapReduce程序设计模型.MapReduce把数据处理和分析分成两个主要阶段,即Map阶段和Reduce阶段.Map阶段主要是对输入进行整合,通过定义的输入格式获取文件信息和类型,并且确定读取方式,最终将读取的内容以键值对的形式保存.而Reduce是用来对结果进行后续处理,通过对Map获取内容中

【书摘】大数据开发之走进MapReduce

本文节选于清华大学出版社推出的<Hadoop权威指南>一书,作者为Tom White,译者是华东师范大学数据科学与工程学院.本书从Hadoop的缘起开始,由浅入深,结合理论和实践,全方位地介绍Hadoop这一高性能处理海量数据集的理想工具.全书共16章,3个附录,涉及的主题包括:Haddoop:MapReduce:Hadoop分布式文件系统:Hadoop的I/O.MapReduce应用程序开发:MapReduce的工作机制:MapReduce的类型和格式:MapReduce的特性:如何构建Ha

ASP中存储过程调用的两种方式及比较

比较|存储过程|存储过程 beerfroth(原作) 本人用sql server 和asp写了一个简单的留言本,在不断的尝试中发现,分页显示留言的时候,不同的执行方式,时间上的一些差别. 下面通过对比来看看几种方式的用时对比. 一,使用存储过程分页,这种情况又分为两种方式: 第一种,使用command对象,如下: Set Cmd=server.CreateObject("Adodb.Command")Cmd.ActiveConnection=connCmd.CommandText=&q

PHP 数据库驱动、连接数据不同方式学习笔记

目录   1. PHP数据库驱动简介 2. PHP连接数据库的不同方式      1. PHP数据库驱动简介   驱动是一段设计用来于一种特定类型的数据库服务器进行交互的软件代码.驱动可能会调用一些库.类似于Java中的数据库驱动的概念   复制代码 1. JDBC-ODPC桥: 它将JDBC API映射到ODPC API.再让JDBC-ODPC调用数据库本地驱动代码(也就是数据库厂商提供的数据库操作二进制代码库,例如Oracle中的oci.dll) 2. 本地API驱动 直接将JDBC API