如何利用CombineFileInputFormat把netflix data set 导入到Hbase里

package com.mr.test;  

import java.io.IOException;  

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;  

public class CombineSmallfileInputFormat extends CombineFileInputFormat<LongWritable, BytesWritable> {  

    @Override
    public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {  

        CombineFileSplit combineFileSplit = (CombineFileSplit) split;
        CombineFileRecordReader<LongWritable, BytesWritable> recordReader = new CombineFileRecordReader<LongWritable, BytesWritable>(combineFileSplit, context, CombineSmallfileRecordReader.class);
        try {
            recordReader.initialize(combineFileSplit, context);
        } catch (InterruptedException e) {
            new RuntimeException("Error to initialize CombineSmallfileRecordReader.");
        }
        return recordReader;
    }  

}
package com.mr.test;  

import java.io.IOException;  

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;  

public class CombineSmallfileRecordReader extends RecordReader<LongWritable, BytesWritable> {  

    private CombineFileSplit combineFileSplit;
    private LineRecordReader lineRecordReader = new LineRecordReader();
    private Path[] paths;
    private int totalLength;
    private int currentIndex;
    private float currentProgress = 0;
    private LongWritable currentKey;
    private BytesWritable currentValue = new BytesWritable();  

    public CombineSmallfileRecordReader(CombineFileSplit combineFileSplit, TaskAttemptContext context, Integer index) throws IOException {
        super();
        this.combineFileSplit = combineFileSplit;
        this.currentIndex = index; // 当前要处理的小文件Block在CombineFileSplit中的索引
    }  

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.combineFileSplit = (CombineFileSplit) split;
        // 处理CombineFileSplit中的一个小文件Block,因为使用LineRecordReader,需要构造一个FileSplit对象,然后才能够读取数据
        FileSplit fileSplit = new FileSplit(combineFileSplit.getPath(currentIndex), combineFileSplit.getOffset(currentIndex), combineFileSplit.getLength(currentIndex), combineFileSplit.getLocations());
        lineRecordReader.initialize(fileSplit, context);  

        this.paths = combineFileSplit.getPaths();
        totalLength = paths.length;
        context.getConfiguration().set("map.input.file.name", combineFileSplit.getPath(currentIndex).getName());
    }  

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        currentKey = lineRecordReader.getCurrentKey();
        return currentKey;
    }  

<strong><span style="color:#ff0000;">   @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        System.out.println("lineRecordReader:"+lineRecordReader.getCurrentValue().toString());
        byte[] content = lineRecordReader.getCurrentValue().toString().getBytes();
        System.out.println("content:"+new String(content));
        currentValue = new BytesWritable();
        currentValue.set(content, 0, content.length);
        System.out.println("currentValue:"+new String(currentValue.getBytes()));
        return currentValue;
    }</span></strong>
    public static void main(String args[]){
        BytesWritable cv = new BytesWritable();
        String str1 = "1234567";
        String str2 = "123450";
        cv.set(str1.getBytes(), 0, str1.getBytes().length);
        System.out.println(new String(cv.getBytes()));  

        cv.setCapacity(0);  

        cv.set(str2.getBytes(), 0, str2.getBytes().length);
        System.out.println(new String(cv.getBytes()));
    }
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (currentIndex >= 0 && currentIndex < totalLength) {
            return lineRecordReader.nextKeyValue();
        } else {
            return false;
        }
    }  

    @Override
    public float getProgress() throws IOException {
        if (currentIndex >= 0 && currentIndex < totalLength) {
            currentProgress = (float) currentIndex / totalLength;
            return currentProgress;
        }
        return currentProgress;
    }  

    @Override
    public void close() throws IOException {
        lineRecordReader.close();
    }
}

更多精彩内容:http://www.bianceng.cnhttp://www.bianceng.cn/database/extra/

以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索hadoop
, mapreduce
, apache
, java hbase
, import
, netflix
, hbase二级索引
ioException
combineinputformat、hbase inputformat、kettle hbase input、hadoop inputformat、av find input format,以便于您获取更多的相关知识。

时间: 2024-09-13 05:14:17

如何利用CombineFileInputFormat把netflix data set 导入到Hbase里的相关文章

线程-如何将存储量很大的txt文档数据导入到hbase当中

问题描述 如何将存储量很大的txt文档数据导入到hbase当中 我现在要写一个程序,将第三方导出的txt文件内容读取并放到hbase当中. 由于txt文档非常大(超过10G),我初步思路是通过单线程读取到一个线程安全的容器当中,然后再利用多线程向hbase中写入. 请问有没有什么更好的思路,谢谢! 解决方案 方案一: 1.在Linux环境利用split命令把文件分块: 2.多线程对多文件读取写入HBase 方案二: 1.要看文件的格式是怎么样的,你想存HBase的格式怎样的 2.把文件按照格式分

eclipse源代码-eclipse的源代码怎么导入到eclipse里?

问题描述 eclipse的源代码怎么导入到eclipse里? 我在网上找了一个eclipse的源代码的zip包,但是用eclipse无法将源代码import进去.在导入界面勾选源代码所在的文件夹后,下面的next按钮也没变成可用状态,也没有错误提示.

利用公路地图获取信息-利用公路地图写最短路径,怎么获取地图里的信息?百度搜索什么地图文件?

问题描述 利用公路地图写最短路径,怎么获取地图里的信息?百度搜索什么地图文件? 学软件的对地图一点不懂,就是想获取地图里的信息.比如有哪几个路口,各个路口的距离,然后写最短路径,做一个软件.为毕业设计做基础.我该找哪些东西呢?主要是怎么利用地图里的数据为我的代码服务. 解决方案 要二次开发用ArcGis这个库,地图有很多,比如http://download.csdn.net/detail/shaojun007/1630938,Google arcgis 地图数据

java-我在菜鸟教程上下载了一个JQuery的Demo,但是无法导入进Eclipse里,如何才能导入呢?

问题描述 我在菜鸟教程上下载了一个JQuery的Demo,但是无法导入进Eclipse里,如何才能导入呢? 是不是因为没有Project文件 解决方案 因为没有.project文件,所以它不是一个java工程,当然无法导入了.可以自己新建一个工程,再把这些文件复制到项目根目录下就可以了. 解决方案二: jquery是js不是java,用不着什么项目,也没有什么编译的过程,在eclipse中打开也就是把它当一个大号的文本编辑器罢了. 解决方案三: Jquery不是用eclipse打开的,Jquer

数据读写-qt 怎么将txt文本文档的数据导入到pdf里

问题描述 qt 怎么将txt文本文档的数据导入到pdf里 为什么我导入的txt数据在pdf里只有一行呢!!!! 生成的pdf 原来的txt文档内容 解决方案 明显是回车换行的处理有问题,你是怎么样处理的? 解决方案二: 用n等添加到换行的位置.

jar包导入到项目里报错

问题描述 jar包导入到项目里报错 我在网上下载的ant.jar包,每次导入到Myeclipse里总是提示missing,这个包明明存在怎么没有呢 解决方案 build path refresh validate --上面3个不好用就换个jar包 我本地测试没问题的 PS:提示能不能写全点-- 解决方案二: 完整的提示是什么,什么missing了.它所依赖的包是否齐全,下载的包本身有没有问题. 解决方案三: 其他包不存在,仔细读一下提示

swing-请问:用SWING容器怎么实现excel导入导出mysql里?谢谢!!!

问题描述 请问:用SWING容器怎么实现excel导入导出mysql里?谢谢!!! 也可以不用SWING容器,只要能使用excel批量导入导出mysql 解决方案 这个就是数据的读取和存储吧,一般使用jxl包来进行读取,参考下这个http://www.bkjia.com/ASPjc/892365.html

利用Java进行MySql数据库的导入和导出

利用Java来进行Mysql数据库的导入和导出的总体思想是通过Java来调用命令窗口执行相应的命令. MySql导出数据库的命令如下: mysqldump -uusername -ppassword -hhost -Pport exportDatabaseName > exportPath 利用Java调用命令窗口执行命令来进行MySql导入数据库一般分三步走: 第一步:登录Mysql数据库,在登录数据库的时候也可以指定登录到哪个数据库,如果指定了则可以跳过第二步: 第二步:切换数据库到需要导入

利用phpExcel实现Excel数据的导入导出(全步骤详细解析)_php技巧

很多文章都有提到关于使用phpExcel实现Excel数据的导入导出,大部分文章都差不多,或者就是转载的,都会出现一些问题,下面是本人研究phpExcel的使用例程总结出来的使用方法,接下来直接进入正题. 首先先说一下,本人的这段例程是使用在Thinkphp的开发框架上,要是使用在其他框架也是同样的方法,很多人可能不能正确的实现Excel的导入导出,问题基本上都是phpExcel的核心类引用路径出错,如果有问题大家务必要对路劲是否引用正确进行测试. (一)导入Excel 第一,在前台html页面