hcatalog简介和使用

Hcatalog是apache开源的对于表和底层数据管理统一服务平台,目前最新release版本是0.5,不过需要hive 0.10支持,由于我们hive集群版本是0.9.0,所以只能降级使用hcatalog 0.4,由于hcatalog中所有的底层数据信息都是保存在hive metastore里,所以hive版本升级后schema变动或者api变动会对hacatalog产生影响,因此在hive 0.11中已经集成了了hcatalog,以后也会成为hive的一部分,而不是独立的项目。

HCatalog底层依赖于Hive Metastore,执行过程中会创建一个HiveMetaStoreClient,通过这个instance提供的api来获取表结构数据,如果是local metastore mode的话,会直接返回一个HiveMetaStore.HMSHandler,如果是remote mode的话(hive.metastore.local设置为false),会依据hive.metastore.uris(比如thrift://10.1.8.42:9083, thrift://10.1.8.51:9083)中设定的一串uri逐一顺序建立连接。只要有一个链接建立就可以了,同时为了避免所有client都和第一个uri建立连接,导致负载过大,我加了点小trick,对这串uris随机shuffle来做load balance

由于我们的集群开启了kerberos security,需要获取DelegationToken,但是local mode是不支持的,所以只用能remote mode

HiveMetaStoreClient.java

public String getDelegationToken(String owner, String renewerKerberosPrincipalName) throws
    MetaException, TException {
  if (localMetaStore) {
    throw new UnsupportedOperationException("getDelegationToken() can be " +
        "called only in thrift (non local) mode");
  }
  return client.get_delegation_token(owner, renewerKerberosPrincipalName);
}

HCatInputFormat和HCatOutputFormat提供一些mapreduce api来读取表和写入表

HCatInputFormat API:

public static void setInput(Job job,
    InputJobInfo inputJobInfo) throws IOException;

先实例化一个InputJobInfo对象,该对象包含三个参数dbname,tablename,filter,然后传给setInput函数,来读取相应的数据

public static HCatSchema getTableSchema(JobContext context)
    throws IOException;

在运行时(比如mapper阶段的setup函数中),可以传进去JobContext,调用静态getTableSchema来获取先前setInput时设置的table schema信息

HCatOutputFormat API:

public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException;

OutPutJobInfo接受三个参数databaseName, tableName, partitionValues,其中第三个参数类型是Map<String, String>,partition key放在map key里,partition value放在对应map key的value中,该参数可传入null或空map,如果指定的partition存在的话,会抛org.apache.hcatalog.common.HCatException : 2002 : Partition already present with given partition key values

比如要要写入指定的partition(dt='2013-06-13',country='china' ),可以这样写

Map<String, String> partitionValues = new HashMap<String, String>();
partitionValues.put("dt", "2013-06-13");
partitionValues.put("country", "china");
HCatTableInfo info = HCatTableInfo.getOutputTableInfo(dbName, tblName, partitionValues);
HCatOutputFormat.setOutput(job, info);

public static HCatSchema getTableSchema(JobContext context) throws IOException;

获取之前HCatOutputFormat.setOutput指定的table schema信息

public static void setSchema(final Job job, final HCatSchema schema) throws IOException;

设置最终写入数据的schema信息,若不调用这个方法,则默认会使用table schema信息

更多精彩内容:http://www.bianceng.cnhttp://www.bianceng.cn/webkf/tools/

下面提供一个完整mapreduce例子计算一天每个guid访问页面次数,map阶段从表中读取guid字段,reduce阶段统计该guid对应pageview的总数,然后写回另外一张带有guid和count字段的表中

import java.io.IOException;
import java.util.Iterator;  

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.OutputJobInfo;  

public class GroupByGuid extends Configured implements Tool {  

    @SuppressWarnings("rawtypes")
    public static class Map extends
            Mapper<WritableComparable, HCatRecord, Text, IntWritable> {
        HCatSchema schema;
        Text guid;
        IntWritable one;  

        @Override
        protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
                throws IOException, InterruptedException {
            guid = new Text();
            one = new IntWritable(1);
            schema = HCatInputFormat.getTableSchema(context);
        }  

        @Override
        protected void map(WritableComparable key, HCatRecord value,
                Context context) throws IOException, InterruptedException {
            guid.set(value.getString("guid", schema));
            context.write(guid, one);
        }
    }  

    @SuppressWarnings("rawtypes")
    public static class Reduce extends
            Reducer<Text, IntWritable, WritableComparable, HCatRecord> {
        HCatSchema schema;  

        @Override
        protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)
                throws IOException, InterruptedException {
            schema = HCatOutputFormat.getTableSchema(context);
        }  

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            Iterator<IntWritable> iter = values.iterator();
            while (iter.hasNext()) {
                sum++;
                iter.next();
            }
            HCatRecord record = new DefaultHCatRecord(2);
            record.setString("guid", schema, key.toString());
            record.setInteger("count", schema, sum);
            context.write(null, record);
        }
    }  

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();  

        String dbname = args[0];
        String inputTable = args[1];
        String filter = args[2];
        String outputTable = args[3];
        int reduceNum = Integer.parseInt(args[4]);  

        Job job = new Job(conf,
                "GroupByGuid, Calculating every guid's pageview");
        HCatInputFormat.setInput(job,
                InputJobInfo.create(dbname, inputTable, filter));  

        job.setJarByClass(GroupByGuid.class);
        job.setInputFormatClass(HCatInputFormat.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(WritableComparable.class);
        job.setOutputValueClass(DefaultHCatRecord.class);
        job.setNumReduceTasks(reduceNum);  

        HCatOutputFormat.setOutput(job,
                OutputJobInfo.create(dbname, outputTable, null));
        HCatSchema s = HCatOutputFormat.getTableSchema(job);
        HCatOutputFormat.setSchema(job, s);  

        job.setOutputFormatClass(HCatOutputFormat.class);  

        return (job.waitForCompletion(true) ? 0 : 1);
    }  

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new GroupByGuid(), args);
        System.exit(exitCode);
    }
}

其实hcatalog还支持动态分区dynamic partition,我们可以在OutJobInfo中指定部分partition keyvalue pair,在运行时候根据传进来的值设置HCatRecord对应的其他partition keyvalue pair,这样就能在一个job中同时写多个partition了

作者:csdn博客 lalaguozhe

以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索mapreduce
, apache
, metastore
, schema
, exitcode
, job
, import
, hive mapreduce
ioException
hcatalog 简介、hive hcatalog 使用、hcatalog 使用、hcatalog、hive hcatalog,以便于您获取更多的相关知识。

时间: 2024-10-06 03:20:52

hcatalog简介和使用的相关文章

大数据相关开源系统简介汇总

本片博客介绍大数据相关的开源系统以及他们对应的一句话简介, 对于各位想大概了解大数据都有哪些开源系统的同学有帮助.各种相关开源系统简介:   如下是Apache基金支持的开源软件 hdfs   跟GFS类似, 一个分布式文件系统.   mapreduce   跟Google的MapReduce类似, 一个典型的简单的分布式计算框架.   yarn   资源管理系统, 跟Mesos类比.   Avro   跟PB类似, 用于将数据结构序列化成字节码, 在不同的语言之间切换.   官方举例是将C转换

《Hadoop技术详解》一第1章 简介

第1章 简介 Hadoop技术详解 在过去的几年里,数据的存储.管理和处理发生了巨大的变化.各个公司存储的数据比以前更多,数据来源更加多样,数据格式也更加丰富.这不是因为我们变成了林鼠(译注:林鼠喜欢收集各种物品),而是因为我们想要创造出可以让我们进一步了解某一领域的产品.功能以及对其智能预测(这个领域可以是指用户.数据搜索.机器日志或者是某机构的任何信息).为了更好地服务其成员,各组织正在寻找新的方式来使用那些曾经被认为没有什么价值或者存储起来过于昂贵的数据.采集和存储数据只是其中的一部分工作

基于 Hive 的文件格式:RCFile 简介及其应用

Hadoop 作为MR 的开源实现,一直以动态运行解析文件格式并获得比MPP数据库快上几倍的装载速度为优势.不过,MPP数据库社区也一直批评Hadoop由于文件格式并非为特定目的而建,因此序列化和反序列化的成本过高. 1.hadoop 文件格式简介 目前 hadoop 中流行的文件格式有如下几种: (1)SequenceFile SequenceFile是Hadoop API 提供的一种二进制文件,它将数据以的形式序列化到文件中.这种二进制文件内部使用Hadoop 的标准的Writable 接口

Python中title()方法的使用简介

  这篇文章主要介绍了Python中title()方法的使用简介,是Python入门中的基础知识,需要的朋友可以参考下 title()方法返回所有单词的第一个字符大写的字符串的一个副本. 语法 以下是title()方法的语法: ? 1 str.title(); 参数 NA 返回值 此方法返回其中所有单词的前几个字符都是大写的字符串的一个副本. 例子 下面的例子显示了title()方法的使用. ? 1 2 3 4 #!/usr/bin/python   str = "this is string

shiro(1)-简介

简介 apache shiro 是一个功能强大和易于使用的Java安全框架,为开发人员提供一个直观而全面的的解决方案的认证,授权,加密,会话管理. 在实际应用中,它实现了应用程序的安全管理的各个方面. shiro的功能 apache shiro能做什么? 支持认证跨一个或多个数据源(LDAP,JDBC,kerberos身份等) 执行授权,基于角色的细粒度的权限控制. 增强的缓存的支持. 支持web或者非web环境,可以在任何单点登录(SSO)或集群分布式会话中使用. 主要功能是:认证,授权,会话

Tutum公司简介

2015年10月21日,由Tutum公司的CEO Borja Burgos对外宣布,Tutum与Docker公司正式合作,大家对Tutum和Docker的合作还是很期待的.下面我简单介绍一下Tutum公司. Tutum的历史 Tutum创立的时间很难确定.Tutum(拉丁语里安全的意思)的最初构思是在2012年秋季,它是作为Borja Burgos在卡内基梅隆大学(匹兹堡)的研究生课程和在日本兵库县大学的硕士论文,Tutum是一个可以帮助企业过渡到云的安全支持系统. 在2013年初,Tutum有

在应用中加入全文检索功能——基于Java的全文索引引擎Lucene简介

全文检索|索引 内容摘要: Lucene是一个基于Java的全文索引工具包. 基于Java的全文索引引擎Lucene简介:关于作者和Lucene的历史 全文检索的实现:Luene全文索引和数据库索引的比较 中文切分词机制简介:基于词库和自动切分词算法的比较 具体的安装和使用简介:系统结构介绍和演示 Hacking Lucene:简化的查询分析器,删除的实现,定制的排序,应用接口的扩展 从Lucene我们还可以学到什么 基于Java的全文索引/检索引擎--Lucene Lucene不是一个完整的全

Linux Namespace机制简介

最近Docker技术越来越受到关注,作为Docker中很重要的一项技术,Namespace也就经常在Docker的简介里面看到. 在这里总结一下它的内部机制.也解决一下自己原来的一些疑惑. Namespace是什么: C++中的Namespace: 首先,先提一下Namespace是什么.最早知道这个名词是在学习C++语言的时候.由于现在的系统越来越复杂,代码中不同的模块就可能使用相同变量,于是就出现了Namespace,来对全局作用域进行划分. 比如C++的标注库都定义在STD Namespa

Cloudera Manager简介

Hadoop家族 整个Hadoop家族由以下几个子项目组成: Hadoop Common: Hadoop体系最底层的一个模块,为Hadoop各子项目提供各 种工具,如:配置文件和日志操作等. HDFS: 是Hadoop应用程序中主要的分布式储存系统, HDFS集群包含了一个NameNode(主节点),这个节点负责管理所有文件系统的元数据及存储了真实数据的DataNode(数据节点,可以有很多).HDFS针对海量数据所设计,所以相比传统文件系统在大批量小文件上的优化,HDFS优化的则是对小批量大型