大数据应用之HBase数据插入性能优化之多线程并行插入测试案例

一、引言:

  上篇文章提起关于HBase插入性能优化设计到的五个参数,从参数配置的角度给大家提供了一个性能测试环境的实验代码。根据网友的反馈,基于单线程的模式实现的数据插入毕竟有限。通过个人实测,在我的虚拟机环境下,单线程插入数据的值约为4w/s。集群指标是:CPU双核1.83,虚拟机512M内存,集群部署单点模式。本文给出了基于多线程并发模式的,测试代码案例和实测结果,希望能给大家一些启示:

二、源程序:


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;

public class HBaseImportEx {
    static Configuration hbaseConfig = null;
    public static HTablePool pool = null;
    public static String tableName = "T_TEST_1";
    static{
         //conf = HBaseConfiguration.create();
         Configuration HBASE_CONFIG = new Configuration();
         HBASE_CONFIG.set("hbase.master", "192.168.230.133:60000");
         HBASE_CONFIG.set("hbase.zookeeper.quorum", "192.168.230.133");
         HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
         hbaseConfig = HBaseConfiguration.create(HBASE_CONFIG);

         pool = new HTablePool(hbaseConfig, 1000);
    }
    /*
     * Insert Test single thread
     * */
    public static void SingleThreadInsert()throws IOException
    {
        System.out.println("---------开始SingleThreadInsert测试----------");
        long start = System.currentTimeMillis();
        //HTableInterface table = null;
        HTable table = null;
        table = (HTable)pool.getTable(tableName);
        table.setAutoFlush(false);
        table.setWriteBufferSize(24*1024*1024);
        //构造测试数据
        List<Put> list = new ArrayList<Put>();
        int count = 10000;
        byte[] buffer = new byte[350];
        Random rand = new Random();
        for(int i=0;i<count;i++)
        {
            Put put = new Put(String.format("row %d",i).getBytes());
            rand.nextBytes(buffer);
            put.add("f1".getBytes(), null, buffer);
            //wal=false
            put.setWriteToWAL(false);
            list.add(put);
            if(i%10000 == 0)
            {
                table.put(list);
                list.clear();
                table.flushCommits();
            }
        }
        long stop = System.currentTimeMillis();
        //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count);

        System.out.println("插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s");

        System.out.println("---------结束SingleThreadInsert测试----------");
    }
    /*
     * 多线程环境下线程插入函数
     *
     * */
    public static void InsertProcess()throws IOException
    {
        long start = System.currentTimeMillis();
        //HTableInterface table = null;
        HTable table = null;
        table = (HTable)pool.getTable(tableName);
        table.setAutoFlush(false);
        table.setWriteBufferSize(24*1024*1024);
        //构造测试数据
        List<Put> list = new ArrayList<Put>();
        int count = 10000;
        byte[] buffer = new byte[256];
        Random rand = new Random();
        for(int i=0;i<count;i++)
        {
            Put put = new Put(String.format("row %d",i).getBytes());
            rand.nextBytes(buffer);
            put.add("f1".getBytes(), null, buffer);
            //wal=false
            put.setWriteToWAL(false);
            list.add(put);
            if(i%10000 == 0)
            {
                table.put(list);
                list.clear();
                table.flushCommits();
            }
        }
        long stop = System.currentTimeMillis();
        //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count);

        System.out.println("线程:"+Thread.currentThread().getId()+"插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s");
    }

    /*
     * Mutil thread insert test
     * */
    public static void MultThreadInsert() throws InterruptedException
    {
        System.out.println("---------开始MultThreadInsert测试----------");
        long start = System.currentTimeMillis();
        int threadNumber = 10;
        Thread[] threads=new Thread[threadNumber];
        for(int i=0;i<threads.length;i++)
        {
            threads[i]= new ImportThread();
            threads[i].start();
        }
        for(int j=0;j< threads.length;j++)
        {
             (threads[j]).join();
        }
        long stop = System.currentTimeMillis();

        System.out.println("MultThreadInsert:"+threadNumber*10000+"共耗时:"+ (stop - start)*1.0/1000+"s");
        System.out.println("---------结束MultThreadInsert测试----------");
    }    

    /**
     * @param args
     */
    public static void main(String[] args)  throws Exception{
        // TODO Auto-generated method stub
        //SingleThreadInsert();
        MultThreadInsert();

    }

    public static class ImportThread extends Thread{
        public void HandleThread()
        {
            //this.TableName = "T_TEST_1";

        }
        //
        public void run(){
            try{
                InsertProcess();
            }
            catch(IOException e){
                e.printStackTrace();
            }finally{
                System.gc();
                }
            }
        }

}




三、说明

1.线程数设置需要根据本集群硬件参数,实际测试得出。否则线程过多的情况下,总耗时反而是下降的。

2.单笔提交数对性能的影响非常明显,需要在自己的环境下,找到最理想的数值,这个需要与单条记录的字节数相关。

四、测试结果

---------开始MultThreadInsert测试----------

线程:8插入数据:10000共耗时:1.328s
线程:16插入数据:10000共耗时:1.562s
线程:11插入数据:10000共耗时:1.562s
线程:10插入数据:10000共耗时:1.812s
线程:13插入数据:10000共耗时:2.0s
线程:17插入数据:10000共耗时:2.14s
线程:14插入数据:10000共耗时:2.265s
线程:9插入数据:10000共耗时:2.468s
线程:15插入数据:10000共耗时:2.562s
线程:12插入数据:10000共耗时:2.671s
MultThreadInsert:100000共耗时:2.703s
---------结束MultThreadInsert测试----------

 备注:该技术专题讨论正在群Hadoop高级交流群:293503507同步直播中,敬请关注。

时间: 2024-12-30 19:43:49

大数据应用之HBase数据插入性能优化之多线程并行插入测试案例的相关文章

APMCon2017 | 一大波技术大神来袭,你要的性能优化干货这里全都有

APMCon2017 | 一大波技术大神来袭,你要的性能优化干货这里全都有 在波涛汹涌的技术世界里,如果不希望自己的小船说翻就翻,就需要跟着大神多学学, 所谓听君一席话,胜读十年书.现在就有个跟着大神学技术的好机会 就在今年的8月10-11日,听云联合了极客邦科技 / InfoQ将共同主办国内第二届应用性能管理大会-APMCon 2017,会议内容聚焦了行业内最新的技术和最接地气的实践案例,将共同探讨APM相关的性能优化.技术方案以及创新思路,可以为更多的行业从业者指点应用效能提升的迷津.就在第

【数据蒋堂】多维分析的后台性能优化手段

" 开篇辞 <数据蒋堂>的作者蒋步星,从事信息系统建设和数据处理长达20多年的时间.他丰富的工程经验与深厚的理论功底相互融合.创新思想与传统观念的相互碰撞,虚拟与现实的相互交织,产生出了一篇篇的沥血之作.此连载的内容涉及从数据呈现.采集到加工计算再到存储以及挖掘等各个方面.大可观数据世界之远景.小可看技术疑难之细节.针对数据领域一些技术难点,站在研发人员的角度从浅入深,进行全方位.360度无死角深度剖析:对于一些业内观点,站在技术人员角度阐述自己的思考和理解.蒋步星还会对大数据的发展

MySQL批量SQL插入性能优化

对于一些数据量较大的系统,数据库面临的问题除了查询效率低下,还有就是数据入库时间长.特别像报表系统,每天花费在数据导入上的时间可能会长达几个小时或十几个小时之久.因此,优化数据库插入性能是很有意义的. 经过对MySQL innodb的一些性能测试,发现一些可以提高insert效率的方法,供大家参考参考. 1. 一条SQL语句插入多条数据. 常用的插入语句如: INSERT INTO `insert_table` (`datetime`, `uid`, `content`, `type`) VAL

【推荐】 RAC 性能优化全攻略与经典案例剖析

在近期的第七届数据技术嘉年华上,云和恩墨技术专家曾令军做了"RAC性能优化实战"为主题的演讲,分享了从硬件架构.系统与参数配置.应用设计以及工作负载管理这四个层面,剖析在RAC性能优化的过程中,应当注意的问题以及可以借鉴的经验和思路.我们再次分享出来,希望对各位有所指导借鉴. RAC硬件架构 "千尺之台,始于垒土",硬件架构是决定RAC环境运行性能最基础的部分.下面是一个比较简单的RAC架构拓扑图,一个存储.两台主机.三条网络,构成了一套RAC环境. 用户通过业务网

CYQ.Data 轻量数据层之路 MDataTable 绑定性能优化之章(十一)

昨天jyk进群后,用Microsoft Application Center Test 对CYQ.Data 框架进行进行了一下压力测试 然后截了几张图上来,只有纯图如下: 1:使用了框架:sql 2000的分页存储过程[临时表分的页]: 2:把存储过程直接换成select语句: 3:他的框架测试结果: 4:这是测试结果了. 以下是说明: 1.DataTable :714次/秒 2.MDataTable:559 次/秒 (简单存储过程) 3.MDataTable:500 次/秒 (完整存储过程)

《Spark大数据处理:技术、应用与性能优化》——第1章 Spark 简 介1.1 Spark是什么

第1章 Spark 简 介 本章主要介绍Spark大数据计算框架.架构.计算模型和数据管理策略及Spark在工业界的应用.围绕Spark的BDAS 项目及其子项目进行了简要介绍.目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL.Spark Streaming.GraphX.MLlib等子项目,本章只进行简要 1.1 Spark是什么 介绍,后续章节再详细阐述.Spark是基于内存计算的大数据并行计算框架.Spark基于内存计算,提高了在大数据环境下数据处理的

《Spark大数据处理:技术、应用与性能优化》——3.2 弹性分布式数据集

3.2 弹性分布式数据集 本节简单介绍RDD,并介绍RDD与分布式共享内存的异同.3.2.1 RDD简介在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(resilient distributed dataset,RDD),它是逻辑集中的实体,在集群中的多台机器上进行了数据分区.通过对多台机器上不同RDD分区的控制,就能够减少机器之间的数据重排(data shuffling).Spark提供了"partitionBy"运算符,能够通过集群中多台机器之间对原始RDD进行数据

《Spark大数据处理:技术、应用与性能优化》——1.2 Spark生态系统BDAS

1.2 Spark生态系统BDAS 目前,Spark已经发展成为包含众多子项目的大数据计算平台.伯克利将Spark的整个生态系统称为伯克利数据分析栈(BDAS).其核心框架是Spark,同时BDAS涵盖支持结构化数据SQL查询与分析的查询引擎Spark SQL和Shark,提供机器学习功能的系统MLbase及底层的分布式机器学习库MLlib.并行图计算框架GraphX.流计算框架Spark Streaming.采样近似计算查询引擎BlinkDB.内存分布式文件系统Tachyon.资源管理框架Me

《Spark大数据处理:技术、应用与性能优化》——第3章 Spark计算模型3.1 Spark程序模型

第3章 Spark计算模型 创新都是站在巨人的肩膀上产生的,在大数据领域也不例外.微软的Dryad使用DAG执行模式.子任务自由组合的范型.该范型虽稍显复杂,但较为灵活.Pig也针对大关系表的处理提出了很多有创意的处理方式,如flatten.cogroup.经典虽难以突破,但作为后继者的Spark借鉴经典范式并进行创新.经过实践检验,Spark的编程范型在处理大数据时显得简单有效.的数据处理与传输模式也大获全胜.Spark站在巨人的肩膀上,依靠Scala强有力的函数式编程.Actor通信模式.闭