Along the way I ran into values that could not be parsed into an int when reading the value field; I worked around it with a try/catch (see SortMapper below). The input file sort.txt looks like this:
str2	2
str1	1
str3	3
str1	4
str4	7
str2	5
str3	9
The two columns are separated by \t, and the expected result is:
str1 1,4
str2 2,5
str3 3,9
str4 7
Here the Mapper and Reducer are standalone classes, and the map output uses a custom composite key.
The composite key (IntPair.java):

package com.kane.mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class IntPair implements WritableComparable<IntPair> {

    private String firstKey; // e.g. "str1"
    private int secondKey;   // e.g. 1

    public String getFirstKey() {
        return firstKey;
    }

    public void setFirstKey(String firstKey) {
        this.firstKey = firstKey;
    }

    public int getSecondKey() {
        return secondKey;
    }

    public void setSecondKey(int secondKey) {
        this.secondKey = secondKey;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(firstKey);
        out.writeInt(secondKey);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        firstKey = in.readUTF();
        secondKey = in.readInt();
    }

    // Compares this key with another IntPair; only the first field matters here,
    // because the job registers TextIntCompartor as the real sort comparator.
    @Override
    public int compareTo(IntPair o) {
        return o.getFirstKey().compareTo(this.getFirstKey());
    }
}

The mapper (SortMapper.java):

package com.kane.mr;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortMapper extends Mapper<Object, Text, IntPair, IntWritable> {

    private IntPair intPair = new IntPair();
    private IntWritable intWritable = new IntWritable(0);

    @Override
    protected void map(Object key, Text value, // e.g. key = str1, value = 1
            Context context) throws IOException, InterruptedException {
        System.out.println(value);
        int intValue;
        try {
            intValue = Integer.parseInt(value.toString());
        } catch (NumberFormatException e) {
            // Without this try/catch the job kept failing to convert the value to an int.
            intValue = 6;
        }
        intPair.setFirstKey(key.toString());
        intPair.setSecondKey(intValue);
        intWritable.set(intValue);
        context.write(intPair, intWritable); // e.g. key = (str2, 2), value = 2
    }
}

The reducer (SortReducer.java):

package com.kane.mr;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SortReducer extends Reducer<IntPair, IntWritable, Text, Text> {

    @Override
    protected void reduce(IntPair key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        StringBuffer combineValue = new StringBuffer();
        Iterator<IntWritable> itr = values.iterator();
        while (itr.hasNext()) {
            int value = itr.next().get();
            combineValue.append(value).append(",");
        }
        if (combineValue.length() > 0) {
            combineValue.setLength(combineValue.length() - 1); // drop the trailing comma
        }
        context.write(new Text(key.getFirstKey()), new Text(combineValue.toString()));
    }
}

The partitioner (PartionTest.java):

package com.kane.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartionTest extends Partitioner<IntPair, IntWritable> {

    @Override
    public int getPartition(IntPair key, IntWritable value, int numPartitions) { // numPartitions = number of reducers
        // Mask the sign bit before taking the modulus so the partition index is never negative.
        return (key.getFirstKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

The grouping comparator (TextComparator.java):

package com.kane.mr;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TextComparator extends WritableComparator {

    public TextComparator() {
        super(IntPair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        IntPair o1 = (IntPair) a;
        IntPair o2 = (IntPair) b;
        // Group by the first field only, so all values of one key end up in the same reduce() call.
        return o1.getFirstKey().compareTo(o2.getFirstKey());
    }
}

The sort comparator (TextIntCompartor.java):

package com.kane.mr;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

@SuppressWarnings("rawtypes")
public class TextIntCompartor extends WritableComparator {

    protected TextIntCompartor() {
        super(IntPair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        IntPair o1 = (IntPair) a;
        IntPair o2 = (IntPair) b;
        // Sort by the first field, then by the int value, so the values reach the reducer in ascending order.
        int first = o1.getFirstKey().compareTo(o2.getFirstKey());
        if (first != 0) {
            return first;
        }
        return o1.getSecondKey() - o2.getSecondKey();
    }
}

The driver (SortMain.java):

package com.kane.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class SortMain {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: sort <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "Sort");
        job.setJarByClass(SortMain.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class); // each input line is key \t value
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setSortComparatorClass(TextIntCompartor.class);   // sort by (firstKey, secondKey)
        job.setGroupingComparatorClass(TextComparator.class); // group by firstKey only
        job.setPartitionerClass(PartionTest.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));   // first argument after the class name on the hadoop jar command line
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // second argument: the output directory
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
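If you want to sanity-check the comparator logic without submitting a job, a minimal local sketch like the one below sorts the seven sample keys with TextIntCompartor and prints them in the order the reducer would see them. ComparatorDemo is a hypothetical helper class, not part of the original project; it just reuses IntPair and TextIntCompartor from the same package.

package com.kane.mr;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical local check: build the sample keys and sort them the way the shuffle would.
public class ComparatorDemo {

    public static void main(String[] args) {
        String[][] rows = { { "str2", "2" }, { "str1", "1" }, { "str3", "3" },
                { "str1", "4" }, { "str4", "7" }, { "str2", "5" }, { "str3", "9" } };
        List<IntPair> keys = new ArrayList<IntPair>();
        for (String[] r : rows) {
            IntPair p = new IntPair();
            p.setFirstKey(r[0]);
            p.setSecondKey(Integer.parseInt(r[1]));
            keys.add(p);
        }
        final TextIntCompartor sortCmp = new TextIntCompartor();
        Collections.sort(keys, new Comparator<IntPair>() {
            @Override
            public int compare(IntPair a, IntPair b) {
                return sortCmp.compare(a, b);
            }
        });
        for (IntPair p : keys) {
            System.out.println(p.getFirstKey() + "\t" + p.getSecondKey());
        }
        // Prints str1 1, str1 4, str2 2, str2 5, str3 3, str3 9, str4 7 (one per line);
        // the grouping comparator then merges each str key's values into a single reduce() call.
    }
}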
Export the project as a jar and put it under the Hadoop directory, copy sort.txt into HDFS, and then run the job with: hadoop jar KaneTest/sort.jar com.kane.mr.SortMain /kane/sort.txt /kane/output
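Once the job finishes, the result can be read straight from HDFS. Assuming a single reducer, the output file is normally named part-r-00000 under the output directory given above:

hadoop fs -cat /kane/output/part-r-00000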