如何利用mapreduce批量读写hbase数据

package com.mr.test;  

import java.io.IOException;
import java.util.Iterator;  

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.log4j.Logger;  

public class MRHbase {
    private static Logger log = Logger.getLogger(MRHbase.class);
    public static String family = "charactor";
    public static String col = "hobby";  

    public static class HMap extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Context context) throws IOException, InterruptedException {
//          KeyValue kv = value.getColumnLatest(family.getBytes(),
//                  col.getBytes());
//          context.write(new Text(Bytes.toString(kv.getKey())),
//                  new Text(Bytes.toString(kv.getValue())));
            byte[] v = value.getValue(family.getBytes(), col.getBytes());
            byte[] r = value.getRow();
            context.write(new Text(Bytes.toString(v)), new Text(Bytes.toString(r)));
        }
    }  

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            while (values.hasNext()) {
                output.collect(key, values.next());
            }
        }
    }  

    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        try {
            Job job = new Job(conf, "hbase test");
            job.setJarByClass(MRHbase.class);
            Scan scan = new Scan();
            scan.addColumn(family.getBytes(), col.getBytes());
            TableMapReduceUtil.initTableMapperJob("person", scan, HMap.class,
                    Text.class, Text.class, job);
            job.setOutputFormatClass(TextOutputFormat.class);
            FileOutputFormat.setOutputPath(job, new Path(args[0]));
            job.waitForCompletion(true);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }  

    }
}

更多精彩内容:http://www.bianceng.cnhttp://www.bianceng.cn/database/extra/

注:要把zookeeper.jar添加到hadoop/lib目录下,master&slaves

//load ntf_data to hbase
package com.ntf.data;  

import java.io.IOException;
import java.util.Iterator;  

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.GenericOptionsParser;  

public class BulkImportData {  

    public static class TokenizerMapper extends
            Mapper<Object, Text, Text, Text> {
        public Text _key = new Text();
        public Text _value = new Text();
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] splits = value.toString().split(",");
            if(splits.length==3){
                InputSplit inputSplit=(InputSplit)context.getInputSplit();
                String filename=((FileSplit)inputSplit).getPath().getName();
                filename = filename.replace("mv_", "");
                filename = filename.replace(".txt", "");
                _key.set(splits[0]+"_"+filename);
                context.write(_key, value);
            }
        }
    }  

    public static class IntSumReducer extends
            TableReducer<Text, Text, ImmutableBytesWritable> {
        public void reduce(Text key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            Iterator<Text> itr = values.iterator();
            while(itr.hasNext()){
                Text t = itr.next();
                String[] strs = t.toString().split(",");
                if(strs.length!=3)continue;
                Put put = new Put(key.getBytes());
                put.add(Bytes.toBytes("content"), Bytes.toBytes("score"), Bytes.toBytes(strs[1]));
                put.add(Bytes.toBytes("content"), Bytes.toBytes("date"), Bytes.toBytes(strs[2]));
                context.write(new ImmutableBytesWritable(key.getBytes()), put);
            }
        }
    }  

    public static void main(String[] args) throws Exception {
        String tablename = "ntf_data";
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (admin.tableExists(tablename)) {
            admin.disableTable(tablename);
            admin.deleteTable(tablename);
        }
        HTableDescriptor htd = new HTableDescriptor(tablename);
        HColumnDescriptor hcd = new HColumnDescriptor("content");
        htd.addFamily(hcd);
        admin.createTable(htd);
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 1) {
            System.err
                    .println("Usage: wordcount <in> <out>" + otherArgs.length);
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setMapperClass(TokenizerMapper.class);
        job.setJarByClass(BulkImportData.class);
        job.setNumReduceTasks(5);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        TableMapReduceUtil.initTableReducerJob(tablename, IntSumReducer.class,
                job);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索hbase
, hadoop
, class
, mapreduce
, apache
, java hbase
, hadoop 视频点播
, ambari hadoop
, import
, hadoop mapreduce
, hadoop mapreduce
, hadoop job
hadoop hbase
mapreduce读写hbase、mapreduce hbase、hbase mapreduce 例子、mapreduce写入hbase、mapreduce读取hbase,以便于您获取更多的相关知识。

时间: 2024-12-24 07:08:50

如何利用mapreduce批量读写hbase数据的相关文章

如何提高spark批量读取HBase数据的性能

问题描述 Configurationconf=HBaseConfiguration.create();StringtableName="testTable";Scanscan=newScan();scan.setCaching(10000);scan.setCacheBlocks(false);conf.set(TableInputFormat.INPUT_TABLE,tableName);ClientProtos.Scanproto=ProtobufUtil.toScan(scan)

利用 MapReduce分析明星微博数据实战

互联网时代的到来,使得名人的形象变得更加鲜活,也拉近了明星和粉丝之间的距离.歌星.影星.体育明星.作家等名人通过互联网能够轻易实现和粉丝的互动,赚钱也变得前所未有的简单.同时,互联网的飞速发展本身也造就了一批互联网明星,这些人借助新的手段,最大程度发挥了粉丝经济的能量和作用,在互联网时代赚得盆满钵满. 正是基于这样一个大背景,今天我们做一个分析明星微博数据的小项目. 1.项目需求 自定义输入格式,将明星微博数据排序后按粉丝数关注数 微博数分别输出到不同文件中. 2.数据集 明星 明星微博名称 粉

hive数据批量导入hbase过程中遇到分区文件不存在异常

问题描述 hive数据批量导入hbase过程中遇到分区文件不存在异常 在做一个hive数据批量导入hbase的方法,根据官方文档一步一步的做下来,但是在生成HFILE文件时却报了一个比较让人纠结的错误,在网上找了很长时间,都木有合适的答案,在hive命令行中执行的代码如下: SET mapred.reduce.tasks=5; SET hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner; SET t

sql 利用存储过程批量导入数据

什么是存储过程 存储过程(stored procedure)是一组为了完成特定功能的sql语句集,是利用sql server所提供的transact-sql语言所编写的程序.经编译后存储在数据库教程中.存储过程是数据库中的一个重要对象,用户通过指定存储过程的名字并给出参数(如果该存储过程带有参数)来执行它.存储过程是由流控制和sql语句书写的过程,这个过程经编译和优化后存储在数据库服务器中,存储过程可由应用程序通过一个调用来执行,而且允许用户声明变量 .同时,存储过程可以接收和输出参数.返回执行

如何利用php数组对百万数据进行排重

如何利用php数组对百万数据进行排重 在平时的工作中,经常接到要对网站的会员进行站内信.手机短信.email进行群发信息的通知,用户列表一般由别的同事提供,当中难免会有重复,为了避免重复发送,所以我在进行发送信息前要对他们提供的用户列表进行排重,下面我以uid列表来讲讲我是如何利用php数组进行排重的. 假如得到一个uid列表,数量在百万行以上,格式如下: 10001000 10001001 10001002 ................ 10001000 ................

HBase数据同步到ElasticSearch的方案

ElasticSearch的River机制 ElasticSearch自身提供了一个River机制,用于同步数据. 这里可以找到官方目前推荐的River: http://www.elasticsearch.org/guide/en/elasticsearch/rivers/current/ 但是官方没有提供HBase的River. 其实ES的River非常简单,就是一个用户打包好的jar包,ES负责找到一个node,并启动这个River.如果node失效了,会自动找另外一个node来启动这个Ri

HBase数据导入工具总结

本文对HBase常用的数据导入工具进行介绍,并结合云HBase常见的导入场景,给出建议的迁移工具和参考资料. HBase之间数据导入常用工具 HBase提供了几种数据迁移工具,其中基于API调用的有CopyTable,Export&Import.基于写HDFS的有distcp,snapshot. 这里要说明的是,本文作为一般性的介绍,不能忽略常用的工具distcp和snapshot,但是由于云HBase默认不开启HDFS端口,所以在云HBase上面基于HDFS的方法都是用不了的.我们推荐用户使用

Impala之加载HBase数据

        Impala如何加载HBase数据?本文将为大家进行详细介绍Impala加载HBase数据的步骤.         第一步:HBase创建表(或选择已有表)         HBase shell命令行执行命令: create 'impala_hbase_test_table', {NAME => 'f', VERSION => 3, COMPRESSION => 'SNAPPY'}         如图所示:         第二步:HBase表存入数据        

使用CopyTable同步HBase数据

CopyTable是Hbase提供的一个数据同步工具,可以用于同步表的部分或全部数据.本文介绍如何使用CopyTable同步HBase数据.针对没有hadoop集群的用户,还介绍了单机运行CopyTable的配置和参数.根据我们的测试,在表不压缩的情况下,单机版CopyTable可以达到1小时100G左右的导入速度.10T以下的数据都可以使用CopyTable导入数据. 准备工作 1 安装HBaseCopyTable依赖于hadoop mapreduce.如果源HBase集群中开启了mapred