package com.mr.test;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.log4j.Logger;

/**
 * Reads the column charactor:hobby from the HBase table "person" and writes
 * <hobby value, row key> pairs to a text file on HDFS (args[0] is the output path).
 */
public class MRHbase {

    private static Logger log = Logger.getLogger(MRHbase.class);

    public static String family = "charactor";
    public static String col = "hobby";

    public static class HMap extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // KeyValue kv = value.getColumnLatest(family.getBytes(), col.getBytes());
            // context.write(new Text(Bytes.toString(kv.getKey())),
            //         new Text(Bytes.toString(kv.getValue())));
            byte[] v = value.getValue(family.getBytes(), col.getBytes()); // cell value of charactor:hobby
            byte[] r = value.getRow();                                    // row key
            context.write(new Text(Bytes.toString(v)), new Text(Bytes.toString(r)));
        }
    }

    /**
     * Note: this reducer is written against the old mapred API and is never
     * registered with the Job below, so the job actually runs with the
     * default identity reducer.
     */
    public static class Reduce extends MapReduceBase implements
            Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            while (values.hasNext()) {
                output.collect(key, values.next());
            }
        }
    }

    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        try {
            Job job = new Job(conf, "hbase test");
            job.setJarByClass(MRHbase.class);

            // Restrict the scan to the single column we need.
            Scan scan = new Scan();
            scan.addColumn(family.getBytes(), col.getBytes());

            TableMapReduceUtil.initTableMapperJob("person", scan, HMap.class,
                    Text.class, Text.class, job);

            job.setOutputFormatClass(TextOutputFormat.class);
            FileOutputFormat.setOutputPath(job, new Path(args[0]));
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
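The job above scans an existing table named "person" with column family "charactor" and qualifier "hobby"; the article does not show how that table was created. As a rough sketch under that assumption (the class name, row key and cell value below are made up for illustration), the table could be set up with the same classic client API used by the loader further down:

// Sketch only, not part of the original article: create and seed the "person"
// table that MRHbase expects. Row key "row1" and value "reading" are sample data.
package com.mr.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class CreatePersonTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists("person")) {
            HTableDescriptor htd = new HTableDescriptor("person");
            htd.addFamily(new HColumnDescriptor("charactor"));
            admin.createTable(htd);
        }
        HTable table = new HTable(conf, "person");
        Put put = new Put(Bytes.toBytes("row1"));        // sample row key
        put.add(Bytes.toBytes("charactor"), Bytes.toBytes("hobby"),
                Bytes.toBytes("reading"));               // sample value
        table.put(put);
        table.close();
    }
}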
Note: zookeeper.jar has to be added to the hadoop/lib directory on both the master and the slaves.
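An alternative worth considering (my suggestion, not part of the original note): TableMapReduceUtil can ship the HBase and ZooKeeper jars with the job through the distributed cache, so nothing has to be copied into hadoop/lib by hand. In MRHbase.main() that would be roughly one extra call after initTableMapperJob:

// Sketch only; availability and exact behaviour depend on the HBase version in use.
// Ships hbase, zookeeper and their dependencies with the job via the distributed cache.
TableMapReduceUtil.addDependencyJars(job);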
// Load the ntf_data files into HBase: a MapReduce job whose reducer writes
// Puts directly into the ntf_data table.
package com.ntf.data;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.GenericOptionsParser;

public class BulkImportData {

    /**
     * Emits <"<id>_<file id>", raw CSV line> for every three-field line,
     * where the file id is taken from the input file name (mv_<file id>.txt).
     */
    public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> {
        public Text _key = new Text();
        public Text _value = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] splits = value.toString().split(",");
            if (splits.length == 3) {
                InputSplit inputSplit = (InputSplit) context.getInputSplit();
                String filename = ((FileSplit) inputSplit).getPath().getName();
                filename = filename.replace("mv_", "");
                filename = filename.replace(".txt", "");
                _key.set(splits[0] + "_" + filename);
                context.write(_key, value);
            }
        }
    }

    /**
     * Turns each CSV line into a Put on the "content" family
     * (content:score and content:date), keyed by the mapper's composite key.
     */
    public static class IntSumReducer extends
            TableReducer<Text, Text, ImmutableBytesWritable> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Text.getBytes() may return a buffer with trailing bytes beyond
            // getLength(), so build the row key from the string instead.
            byte[] rowKey = Bytes.toBytes(key.toString());
            Iterator<Text> itr = values.iterator();
            while (itr.hasNext()) {
                Text t = itr.next();
                String[] strs = t.toString().split(",");
                if (strs.length != 3) continue;
                Put put = new Put(rowKey);
                put.add(Bytes.toBytes("content"), Bytes.toBytes("score"),
                        Bytes.toBytes(strs[1]));
                put.add(Bytes.toBytes("content"), Bytes.toBytes("date"),
                        Bytes.toBytes(strs[2]));
                context.write(new ImmutableBytesWritable(rowKey), put);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        String tablename = "ntf_data";
        Configuration conf = HBaseConfiguration.create();

        // Drop and recreate the target table on every run.
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (admin.tableExists(tablename)) {
            admin.disableTable(tablename);
            admin.deleteTable(tablename);
        }
        HTableDescriptor htd = new HTableDescriptor(tablename);
        HColumnDescriptor hcd = new HColumnDescriptor("content");
        htd.addFamily(hcd);
        admin.createTable(htd);

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 1) {
            System.err.println("Usage: BulkImportData <in>");
            System.exit(2);
        }

        Job job = new Job(conf, "bulk import ntf_data");
        job.setMapperClass(TokenizerMapper.class);
        job.setJarByClass(BulkImportData.class);
        job.setNumReduceTasks(5);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        TableMapReduceUtil.initTableReducerJob(tablename, IntSumReducer.class, job);
        // With no setMapOutputKeyClass/ValueClass calls, these also declare
        // the Text/Text map output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
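Reading the mapper and reducer together: each input file is expected to be named mv_<id>.txt and to contain comma-separated lines with exactly three fields. As a made-up sample, the line 1488844,3,2005-09-06 in a file mv_123.txt would be stored under row key 1488844_123 with content:score = 3 and content:date = 2005-09-06. Also note that main() drops and recreates the ntf_data table before submitting the job, so every run starts from an empty table.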