将HBase通过mr传到hdfs上

package com.zhiyou100.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MRFromHBase {

    public static class MrFromHBaseMap extends TableMapper<Text, Text> {
        private Text outputKey = new Text();
        private Text outputValue = new Text();
        private Cell cell;
        private String rowKey;
        private String columnFamily;
        private String columnQualify;
        private String columnValue;

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // 从Result 中获取数据输出初速
            CellScanner scanner = value.cellScanner();
            while (scanner.advance()) {
                cell = scanner.current();
                rowKey = Bytes.toString(CellUtil.cloneRow(cell));
                columnFamily = Bytes.toString(CellUtil.cloneFamily(cell));
                columnQualify = Bytes.toString(CellUtil.cloneQualifier(cell));
                columnValue = Bytes.toString(CellUtil.cloneValue(cell));
                outputKey.set(rowKey);
                outputValue.set("columnFamily:" + columnFamily + "columnQualify:" + columnQualify + "columnValue:"
                        + columnValue);
                context.write(outputKey, outputValue);
            }
        }

    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration =HBaseConfiguration.create();
        Job job =Job.getInstance(configuration);
        job.setJarByClass(MRFromHBase.class);
        job.setJobName("mapreduce 从hbase中读取数据");
        //不需要reducer
        job.setNumReduceTasks(0);
        Scan scan =new Scan();
        TableMapReduceUtil.initTableMapperJob("bd17:fromjava", scan,MrFromHBaseMap.class, Text.class, Text.class, job);
        //设置输出路径
        Path outputDir =new Path("/fromhbase");
        outputDir.getFileSystem(configuration).delete(outputDir, true);
        FileOutputFormat.setOutputPath(job, outputDir);

        System.exit(job.waitForCompletion(true)?1:0);
    }
}

时间: 2024-11-02 00:20:30

将HBase通过mr传到hdfs上的相关文章

HDFS设计思路,HDFS使用,查看集群状态,HDFS,HDFS上传文件,HDFS下载文件,yarn web管理界面信息查看,运行一个mapreduce程序,mapreduce的demo

26 集群使用初步 HDFS的设计思路 l 设计思想   分而治之:将大文件.大批量文件,分布式存放在大量服务器上,以便于采取分而治之的方式对海量数据进行运算分析:   l 在大数据系统中作用: 为各类分布式运算框架(如:mapreduce,spark,tez,--)提供数据存储服务   l 重点概念:文件切块,副本存放,元数据 26.1 HDFS使用 1.查看集群状态 命令:   hdfs  dfsadmin –report 可以看出,集群共有3个datanode可用 也可打开web控制台查看

hdfs上传文件难以刷新-hdfs文件系统刷新问题

问题描述 hdfs文件系统刷新问题 hdfs上传文件难以刷新,先是在/usr那里跳个1 ,然后立马变成0 解决方案 看看是不是权限不够,包括hdfs权限和登录用户权限

如何用java程序把本地文件拷贝到hdfs上并显示进度

把程序打成jar包放到Linux上 转到目录下执行命令 hadoop jar mapreducer.jar /home/clq/export/java/count.jar  hdfs://ubuntu:9000/out06/count/ 上面一个是本地文件,一个是上传hdfs位置 成功后出现:打印出来,你所要打印的字符. package com.clq.hdfs; import java.io.BufferedInputStream; import java.io.FileInputStream

spark计算hdfs上的文件时报错

问题描述 spark计算hdfs上的文件时报错 scala> val rdd = sc.textFile("hdfs://...") scala> rdd.count java.lang.VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$AppendRequestProto overrides final method getUnknownFields.

hbase-HBase中的数据export到HDFS上

问题描述 HBase中的数据export到HDFS上 我现在把HDFS上的数据import到HBase中去了,现在想要知道,如何将HBase中的数据export到HDFS上,并且是以自己想要的格式存放到HDFS上,如:原文件是什么格式,我导出的就是什么格式,大神帮帮忙啊

做一个flume收集到另一个flume,再传给hdfs,但是现在flume连接hdfs出现如下错误

问题描述 做一个flume收集到另一个flume,再传给hdfs,但是现在flume连接hdfs出现如下错误 ![ 错误主要是这个:Failed to start agent because dependencies were not found in classpath.上图是报错,麻烦大神解决 下面是配置文件 #master_agent master_agent.channels = c2 master_agent.sources = s2 master_agent.sinks = k2 #

HBase源码分析之HRegionServer上MemStore的flush处理流程(二)

        继上篇文章<HBase源码分析之HRegionServer上MemStore的flush处理流程(一)>遗留的问题之后,本文我们接着研究HRegionServer上MemStore的flush处理流程,重点讲述下如何选择一个HRegion进行flush以缓解MemStore压力,还有HRegion的flush是如何发起的.         我们先来看下第一个问题:如何选择一个HRegion进行flush以缓解MemStore压力.上文中我们讲到过flush处理线程如果从flus

PHP单文件上传原理及上传函数的封装

服务器(临时文件)-->指定目录,当文件进入服务器时它就是临时文件了,这时操作中要用临时文件的名称tmp_name. //在客户端设置上传文件的限制(文件类型和大小)是不安全的,因为客户能通过源代码修改限制,所以在服务端这里设置限制. //设置编码为UTF-8,以避免中文乱码 header('Content-Type:text/html;charset=utf-8'); //通过$_FILES接收上传文件的信息 $fileInfo = $_FILES['myFile']; function up

如何在上传的图片上加上版权文字

上传 很多时候需要在用户上传的图片上加上版权或者一些其他的附加文字信息,如何实现这样的功能,下面帖个简单实现的例子,起到抛砖引玉的作用.<%@ Page Language="c#" Debug="true" Trace="true"%><%@ Import Namespace="System.IO" %><%@ Import Namespace="System.Drawing"