Reduce Side Join Implementation

The key to a reduce-side join is the MultipleInputs.addInputPath API: assign a different Mapper to each table, tag every record in its Mapper with an identifier for that table, and then, on the Reduce side, use those tags to tell the tables apart.

Reduce Side Join Example

User and comment join

In this example, we’ll be using the users and comments tables from the StackOverflow dataset. Storing data in this manner makes sense, as storing repetitive user data with each comment is unnecessary. This would also make updating user information difficult. However, having disjoint data sets poses problems when it comes to associating a comment with the user who wrote it. Through the use of a reduce side join, these two data sets can be merged together using the user ID as the foreign key. In this example, we’ll perform an inner, outer, and antijoin. The choice of which join to execute is set during job configuration.

Hadoop supports the ability to use multiple input data types at once, allowing you to create a mapper class and input format for each input split from different data sources. This is extremely helpful, because you don’t have to code logic for two different data inputs in the same map implementation. In the following example, two mapper classes are created: one for the user data and one for the comments. Each mapper class outputs the user ID as the foreign key, and the entire record as the value along with a single character to flag which record came from what set. The reducer then copies all values for each group in memory, keeping track of which record came from what data set. The records are then joined together and output.

 

The following descriptions of each code section explain the solution to the problem.

Problem: Given a set of user information and a list of users’ comments, enrich each comment with the information about the user who created the comment.

 

Driver code. The job configuration is slightly different from the standard configuration due to the use of the multiple input utility. We also set the join type in the job configuration to args[2] so it can be used in the reducer. The relevant piece of the driver code to use MultipleInputs follows:

...

// Use MultipleInputs to set which input uses what mapper
// This will keep parsing of each data set separate from a logical standpoint
// The first two elements of the args array are the two inputs
MultipleInputs.addInputPath(job, new Path(args[0]),
        TextInputFormat.class, UserJoinMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]),
        TextInputFormat.class, CommentJoinMapper.class);

job.getConfiguration().set("join.type", args[2]);

...
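For completeness, here is a minimal sketch of what the rest of the driver might look like. Only the MultipleInputs calls and the join.type setting come from the original snippet; the ReduceSideJoinDriver class name, the job name, and the assumption that args[3] is the output directory are hypothetical.

// A minimal driver sketch (assumed structure; uses the standard
// org.apache.hadoop.mapreduce classes: Configuration, Job, Path, Text,
// TextInputFormat, FileOutputFormat, MultipleInputs)
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Reduce Side Join");
    job.setJarByClass(ReduceSideJoinDriver.class); // hypothetical driver class name

    // Register one mapper per input, as in the snippet above
    MultipleInputs.addInputPath(job, new Path(args[0]),
            TextInputFormat.class, UserJoinMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]),
            TextInputFormat.class, CommentJoinMapper.class);

    // Pass the join type ("inner", "leftouter", "rightouter", "fullouter", "anti")
    // through the configuration so the reducer's setup method can read it
    job.getConfiguration().set("join.type", args[2]);

    job.setReducerClass(UserJoinReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileOutputFormat.setOutputPath(job, new Path(args[3])); // assumed: fourth argument is the output directory

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}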

 

User mapper code. This mapper parses each input line of user data XML. It grabs the user ID associated with each record and outputs it along with the entire input value. It prepends the letter A in front of the entire value. This allows the reducer to know which values came from what data set.

public static class UserJoinMapper extends Mapper<Object, Text, Text, Text> {

    private Text outkey = new Text();
    private Text outvalue = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        // Parse the input string into a nice map
        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());

        String userId = parsed.get("Id");

        // The foreign join key is the user ID
        outkey.set(userId);

        // Flag this record for the reducer and then output
        outvalue.set("A" + value.toString());
        context.write(outkey, outvalue);
    }
}

When you output the value from the map side, the entire record doesn’t have to be sent. This is an opportunity to optimize the join by keeping only the fields of data you want to join together. It requires more processing on the map side, but is worth it in the long run. Also, since the foreign key is in the map output key, you don’t need to keep that in the value, either.
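As a hedged sketch of that optimization, the user mapper’s map method could emit only the fields needed downstream instead of the whole record. The Reputation and DisplayName attribute names below are assumptions about the users data set, not part of the original code.

// Sketch only: emit a trimmed, tagged value instead of the full XML record.
// "Reputation" and "DisplayName" are assumed attribute names.
public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {

    Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());

    // The user ID is already the output key, so it is not repeated in the value
    outkey.set(parsed.get("Id"));

    // Keep only the fields we actually want in the joined output, still tagged with "A"
    outvalue.set("A" + parsed.get("Reputation") + "\t" + parsed.get("DisplayName"));
    context.write(outkey, outvalue);
}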

 

 

Comment mapper code. This mapper parses each input line of comment XML. Very similar to the UserJoinMapper, it too grabs the user ID associated with each record and outputs it along with the entire input value. The only difference here is that the XML attribute UserId represents the user that posted the comment, whereas Id in the user data set is the user ID. Here, this mapper prepends the letter B in front of the entire value.

public static class CommentJoinMapper extends Mapper<Object, Text, Text, Text> {

    private Text outkey = new Text();
    private Text outvalue = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        // Parse the input string into a nice map
        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());

        // The foreign join key is the user ID
        outkey.set(parsed.get("UserId"));

        // Flag this record for the reducer and then output
        outvalue.set("B" + value.toString());
        context.write(outkey, outvalue);
    }
}

 

Reducer code. The reducer code iterates through all the values of each group, looks at what each record is tagged with, and then puts the record in one of two lists. After all values are binned in either list, the actual join logic is executed using the two lists. The join logic differs slightly based on the type of join, but always involves iterating through both lists and writing to the Context object. The type of join is pulled from the job configuration in the setup method. Let’s look at the main reduce method before looking at the join logic.

public static class UserJoinReducer extends Reducer<Text, Text, Text, Text> {

    private static final Text EMPTY_TEXT = new Text("");
    private Text tmp = new Text();
    private ArrayList<Text> listA = new ArrayList<Text>();
    private ArrayList<Text> listB = new ArrayList<Text>();
    private String joinType = null;

    public void setup(Context context) {
        // Get the type of join from our configuration
        joinType = context.getConfiguration().get("join.type");
    }

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        // Clear our lists
        listA.clear();
        listB.clear();

        // Iterate through all our values, binning each record based on what
        // it was tagged with. Make sure to remove the tag!
        for (Text value : values) {
            tmp = value;
            if (tmp.charAt(0) == 'A') {
                listA.add(new Text(tmp.toString().substring(1)));
            } else if (tmp.charAt(0) == 'B') {
                listB.add(new Text(tmp.toString().substring(1)));
            }
        }

        // Execute our join logic now that the lists are filled
        executeJoinLogic(context);
    }

    private void executeJoinLogic(Context context)
            throws IOException, InterruptedException {
        ...
    }

The input data types to the reducer are two Text objects. The input key is the foreign join key, which in this example is the user’s ID. The input values associated with the foreign key contain one record from the users data set tagged with ‘A’, as well as all the comments the user posted tagged with ‘B’. Any type of data formatting you would want to perform should be done here prior to outputting. For simplicity, the raw XML value from the left data set (users) is output as the key and the raw XML value from the right data set (comments) is output as the value.
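If formatted output is preferred over raw XML, one hedged option is to re-parse the two records inside the join loops and write selected fields instead of calling context.write(A, B) directly. The DisplayName and Text attribute names below are assumptions about the two data sets, not part of the original code.

// Sketch: replace context.write(A, B) in the join loops with formatted output.
// "DisplayName" (users) and "Text" (comments) are assumed attribute names.
Map<String, String> user = MRDPUtils.transformXmlToMap(A.toString());
Map<String, String> comment = MRDPUtils.transformXmlToMap(B.toString());
context.write(new Text(user.get("DisplayName")), new Text(comment.get("Text")));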

Next, let’s look at each of the join types. First up is an inner join. If both lists are not empty, simply perform two nested for loops and join each of the values together.

if (joinType.equalsIgnoreCase("inner")) {
    // If both lists are not empty, join A with B
    if (!listA.isEmpty() && !listB.isEmpty()) {
        for (Text A : listA) {
            for (Text B : listB) {
                context.write(A, B);
            }
        }
    }
} ...

For a left outer join, if the right list is not empty, join A with B. If the right list is empty, output each record of A with an empty string.

... else if (joinType.equalsIgnoreCase("leftouter")) {
    // For each entry in A,
    for (Text A : listA) {
        // If list B is not empty, join A and B
        if (!listB.isEmpty()) {
            for (Text B : listB) {
                context.write(A, B);
            }
        } else {
            // Else, output A by itself
            context.write(A, EMPTY_TEXT);
        }
    }
} ...

A right outer join is very similar, except the check for an empty list switches from B to A. If the left list is empty, write each record from B with an empty output key.

... else if (joinType.equalsIgnoreCase("rightouter")) {
    // For each entry in B,
    for (Text B : listB) {
        // If list A is not empty, join A and B
        if (!listA.isEmpty()) {
            for (Text A : listA) {
                context.write(A, B);
            }
        } else {
            // Else, output B by itself
            context.write(EMPTY_TEXT, B);
        }
    }
} ...

A full outer join is more complex, in that we want to keep all records, ensuring that we join records where appropriate. If list A is not empty, then for every element in A, join with B when the B list is not empty, or output A by itself. If A is empty, then just output B.

... else if (joinType.equalsIgnoreCase("fullouter")) {
    // If list A is not empty
    if (!listA.isEmpty()) {
        // For each entry in A
        for (Text A : listA) {
            // If list B is not empty, join A with B
            if (!listB.isEmpty()) {
                for (Text B : listB) {
                    context.write(A, B);
                }
            } else {
                // Else, output A by itself
                context.write(A, EMPTY_TEXT);
            }
        }
    } else {
        // If list A is empty, just output B
        for (Text B : listB) {
            context.write(EMPTY_TEXT, B);
        }
    }
} ...

For an antijoin, if exactly one of the lists is empty, output the records from the non-empty list with an empty Text object.

... else if (joinType.equalsIgnoreCase("anti")) {
    // If exactly one of list A and list B is empty
    if (listA.isEmpty() ^ listB.isEmpty()) {

        // Iterate both A and B with empty values
        // The previous XOR check makes sure exactly one of
        // these lists is empty, so that loop is simply skipped
        for (Text A : listA) {
            context.write(A, EMPTY_TEXT);
        }

        for (Text B : listB) {
            context.write(EMPTY_TEXT, B);
        }
    }
}
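Assuming the driver sketched earlier is packaged into a jar named mrdp.jar with a main class named ReduceSideJoinDriver (both hypothetical names; the original does not give them), the job could be launched with the two input paths, the join type, and an output directory, for example:

hadoop jar mrdp.jar ReduceSideJoinDriver users/ comments/ inner joinoutput/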
