HBase——使用Put迁移MySql数据到Hbase

先上code:

/**
 * 功能:迁移mysql上电池历史数据到hbase
 * Created by liuhuichao on 2016/12/6.
 */
public class MySqlToHBase {

    /**
     * 获取表
     * @param tableName
     * @return
     * @throws IOException
     */
    private  HTable connectHBase(String tableName) throws IOException{
        HTable table=null;
        Configuration conf= HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","lhc-centos");
        table=new HTable(conf,tableName);
        return table;
    }

    /**
     * 获取mysql连接
     * @return
     * @throws Exception
     */
    private  java.sql.Connection connectDB() throws  Exception{
        String userName="root";
        String password="root";
        String url="jdbc:mysql://10.0.1.42:3306/energy?useUnicode=true&characterEncoding=UTF-8";
        Class.forName("com.mysql.jdbc.Driver").newInstance();
        java.sql.Connection conn=DriverManager .getConnection(url,userName,password);
        return  conn;
    }

    /**
     * 将电池历史数据表中输入导入HBase
     * @throws Exception
     */
    @Test
    public void exportFromMySqlToHBase() throws Exception{
        java.sql.Connection dbConn=null;
        HTable table=null;

        Statement stmt=null;
        String strQuery="SELECT * FROM `res_battery_data_history` limit 10000,1000000";
        dbConn=connectDB();//连接mysql
        table=connectHBase("batteryDataHistory");//连接HBase
        try{

            stmt=dbConn.createStatement();
            ResultSet rs=stmt.executeQuery(strQuery);
            long beginTime=System.currentTimeMillis();//开始
            System.out.println( "beginTime:---"+beginTime);
            while (rs.next()) {
                UUID uuid = UUID.randomUUID();
                String rowKey = uuid.toString();//作为行健
                Put put = new Put(Bytes.toBytes(rowKey));
                /**
                 * family:baseData
                 */
                // Integer id=rs.getInt("id");
                String batteryNo = rs.getString("battery_no");
                Integer batteryType = rs.getInt("battery_type");
                Float voltageDeviation = rs.getFloat("voltage_deviation");
                Float totalVoltage = rs.getFloat("total_voltage");
                Float temprature1 = rs.getFloat("temprature1");
                Float temprature2 = rs.getFloat("temprature2");
                Float chargeNum = rs.getFloat("charge_num");
                Float longtitude = rs.getFloat("longtitude");
                Float latitude = rs.getFloat("latitude");
                Float totalCurrent = rs.getFloat("total_current");
                Float soc = rs.getFloat("soc");

                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("batteryNo"), Bytes.toBytes(batteryNo));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("batteryType"), Bytes.toBytes(batteryType));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("voltageDeviation"), Bytes.toBytes(voltageDeviation));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("totalVoltage"), Bytes.toBytes(totalVoltage));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("temprature1"), Bytes.toBytes(temprature1));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("temprature2"), Bytes.toBytes(temprature2));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("chargeNum"), Bytes.toBytes(chargeNum));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("longtitude"), Bytes.toBytes(longtitude));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("latitude"), Bytes.toBytes(latitude));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("totalCurrent"), Bytes.toBytes(totalCurrent));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("soc"), Bytes.toBytes(soc));
                /**
                 * family:volumnData
                 */
                Float vol1 = rs.getFloat("vol1");
                Float vol2 = rs.getFloat("vol2");
                Float vol3 = rs.getFloat("vol3");
                Float vol4 = rs.getFloat("vol4");
                Float vol5 = rs.getFloat("vol5");
                Float vol6 = rs.getFloat("vol6");
                Float vol7 = rs.getFloat("vol7");
                Float vol8 = rs.getFloat("vol8");
                Float vol9 = rs.getFloat("vol9");
                Float vol10 = rs.getFloat("vol10");
                Float vol11 = rs.getFloat("vol11");
                Float vol12 = rs.getFloat("vol12");
                Float vol13 = rs.getFloat("vol13");
                Float vol14 = rs.getFloat("vol14");
                Float vol15 = rs.getFloat("vol15");
                Float vol16 = rs.getFloat("vol16");
                Float vol17 = rs.getFloat("vol17");
                Float vol18 = rs.getFloat("vol18");
                Float vol19 = rs.getFloat("vol19");
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v1"), Bytes.toBytes(vol1));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v2"), Bytes.toBytes(vol2));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v3"), Bytes.toBytes(vol3));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v4"), Bytes.toBytes(vol4));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v5"), Bytes.toBytes(vol5));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v6"), Bytes.toBytes(vol6));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v7"), Bytes.toBytes(vol7));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v8"), Bytes.toBytes(vol8));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v9"), Bytes.toBytes(vol9));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v10"), Bytes.toBytes(vol10));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v11"), Bytes.toBytes(vol11));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v12"), Bytes.toBytes(vol12));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v13"), Bytes.toBytes(vol13));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v14"), Bytes.toBytes(vol14));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v15"), Bytes.toBytes(vol15));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v16"), Bytes.toBytes(vol16));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v17"), Bytes.toBytes(vol17));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v18"), Bytes.toBytes(vol18));
                put.add(Bytes.toBytes("volumnData"), Bytes.toBytes("v19"), Bytes.toBytes(vol19));
                /**
                 * family:extraData
                 */
                String remarks = rs.getString("remarks");
                String testUserName = rs.getString("test_user_name");
                String createTime = rs.getString("create_time");
                Integer createUser = rs.getInt("create_user");
                Integer source = rs.getInt("source");
                put.add(Bytes.toBytes("extraData"), Bytes.toBytes("remarks"), Bytes.toBytes(remarks));
                put.add(Bytes.toBytes("extraData"), Bytes.toBytes("testUserName"), Bytes.toBytes(testUserName));
                put.add(Bytes.toBytes("extraData"), Bytes.toBytes("createTime"), Bytes.toBytes(createTime));
                put.add(Bytes.toBytes("extraData"), Bytes.toBytes("createUser"), Bytes.toBytes(createUser));
                put.add(Bytes.toBytes("extraData"), Bytes.toBytes("source"), Bytes.toBytes(source));
                table.put(put);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        finally {
            try{
                if(stmt !=null){
                    stmt.close();
                }
                if(dbConn !=null){
                    dbConn.close();
                }
                if(table!=null){
                    table.close();
                }
                long endTime=System.currentTimeMillis();//开始
                System.out.println( "endTime:---"+endTime);
            }catch(Exception e){
                e.printStackTrace();
            }
        }

    }
}

   结果:10000 row(s) in 218.0310 seconds

      (我是单机版的HBase,未使用HDFS做底层存储。)

     刚开始的sql没有加limit,直接上来把测试表里面的三百多万条直接插出来,结果还没查完,就堆栈溢出了;额。。。后来先塞了一万条到HBase,额,还好,速度很快,但是感觉这种导入方式不具有通用性,对于大量数据的导入是不ok的。

时间: 2024-08-31 23:58:30

HBase——使用Put迁移MySql数据到Hbase的相关文章

数据导入HBase最常用的三种方式及实践分析

要使用Hadoop,数据合并至关重要,HBase应用甚广.一般而言,需要 针对不同情景模式将现有的各种类型的数据库或数据文件中的数据转入至HBase 中.常见方式为:使用HBase的API中的Put方法: 使用HBase 的bulk load 工具:使用定制的MapReduce Job方式.<HBase Administration Cookbook>一书对这三种方式有着详尽描述,由 ImportNew 的陈晨进行了编译,很有收获,推荐给大家. HBase数据迁移(1)-使用HBase的API

从关系型Mysql到Nosql HBase的迁移实践

2013年11月22-23日,作为国内唯一专注于Hadoop技术与应用分享的大规模行业盛会,2013 Hadoop中国技术峰会(China Hadoop Summit 2013)于北京福朋喜来登集团酒店隆重举行.来自国内外各http://www.aliyun.com/zixun/aggregation/17611.html">行业领域的近千名CIO.CTO.架构师.IT经理.咨询顾问.工程师.Hadoop技术爱好者,以及从事Hadoop研究与推广的IT厂商和技术专家将共襄盛举. 在SQL&

使用Sqoop从Mysql向云HBase同步数据

Sqoop是一个用来将Hadoop和关系型数据库中的数据相互转移的工具.本文介绍如何使用sqoop将数据从Mysql导入到HBase.从成本的角度考虑,针对没有hadoop集群的用户,重点介绍单机运行sqoop的配置和参数. 安装 要完成从Mysql向HBase导入数据的任务,需要安装和配置的软件包括hadoop,sqoop,mysql-connector和HBase.我们针对单机运行sqoop的情况提供了四合一的安装包简化安装流程.如果是在hadoop集群上运行sqoop,可以参考Sqoop官

sqoop 从oracle导数据到hbase中报错

问题描述 sqoop 从oracle导数据到hbase中报错 解决方案 通过Sqoop实现Mysql / Oracle 与HDFS / Hbase互导数据通过Sqoop实现Mysql / Oracle 与HDFS / Hbase互导数据通过Sqoop实现Mysql / Oracle 与HDFS / Hbase互导数据

HBase原理-迟到的‘数据读取流程’部分细节

笔者去年年底分享了一篇关于HBase中数据读取(scan)逻辑的文章(戳这里),主要介绍了scan的基本流程以及实现框架,看官反应甚是强烈.文章最后还挖了一个不大不小的坑,承诺后期会就部分细节进行深入分析,然而因为部分原因这个坑一直没填上.HBase-Scan的细节其实并不好讲,涉及太多代码层面的底层逻辑,大部分童鞋应该都不会太过关心.虽说如此,挖了的坑,含着泪也要填上,当然为了把坑填好,笔者将会使出洪荒之力将这些核心细节通过各种辅助化方式(示例.图解等)进行解读,方便读者理解.注:笔者能力有限

利用importtsv导入数据到hbase。假如数据第一列不是唯一怎么办

问题描述 利用importtsv导入数据到hbase.假如数据第一列不是唯一怎么办 利用importtsv导入数据,假如数据文件里第一列不是唯一的,请问怎么导入?可以指定主键吗?比如指定主键是两个字段相加 解决方案 可以使用HBASE_ROWKEY关键字指定主键

sqoop将oracle数据导入hbase的问题,求各位大神们指导

问题描述 sqoop将oracle数据导入hbase的问题,求各位大神们指导 sqoop将oracle数据导入hbase,要求可以Java连接服务器上的sqoop,sqoop1可以直接实现但是没有Java client的API,sqoop2 有client但是不能直接实现oracle到hbase,这是我得出的结论,请教大神们,有没有好的方法?

HBase伪分布式安装(HDFS)+ZooKeeper安装+HBase数据操作+HBase架构体系

HBase1.2.2伪分布式安装(HDFS)+ZooKeeper-3.4.8安装配置+HBase表和数据操作+HBase的架构体系+单例安装,记录了在Ubuntu下对HBase1.2.2的实践操作,HBase的安装到数据库表的操作.包含内容1.HBase单例安装2.HBase伪分布式安装(基于Hadoop的HDFS)过程,3.HBase的shell编程,对HBase表的创建,删除等的命令,HBase对数据的增删查等操作.4.简单概述了Hbase的架构体系.5.zookeeper的单例安装和常用操

hbase-Hive中在整合HBase的表中插入数据时报错

问题描述 Hive中在整合HBase的表中插入数据时报错 伪分布式模式下整合Hadoop 2.2.0(自己基于Ubuntu 64位系统编译的)+HBase 0.98+Hive 0.14,其他功能操作都正常,但是在Hive中往基于HBase存储的表中插入数据时报错,网上找了很多方法,但都没用,具体错误如下: java.lang.IllegalArgumentException: Can not create a Path from an empty string at org.apache.had