Hcatalog是apache开源的对于表和底层数据管理统一服务平台,目前最新release版本是0.5,不过需要hive 0.10支持,由于我们hive集群版本是0.9.0,所以只能降级使用hcatalog 0.4,由于hcatalog中所有的底层数据信息都是保存在hive metastore里,所以hive版本升级后schema变动或者api变动会对hacatalog产生影响,因此在hive 0.11中已经集成了了hcatalog,以后也会成为hive的一部分,而不是独立的项目。
HCatalog底层依赖于Hive Metastore,执行过程中会创建一个HiveMetaStoreClient,通过这个instance提供的api来获取表结构数据,如果是local metastore mode的话,会直接返回一个HiveMetaStore.HMSHandler,如果是remote mode的话(hive.metastore.local设置为false),会依据hive.metastore.uris(比如thrift://10.1.8.42:9083, thrift://10.1.8.51:9083)中设定的一串uri逐一顺序建立连接。只要有一个链接建立就可以了,同时为了避免所有client都和第一个uri建立连接,导致负载过大,我加了点小trick,对这串uris随机shuffle来做load balance
由于我们的集群开启了kerberos security,需要获取DelegationToken,但是local mode是不支持的,所以只用能remote mode
HiveMetaStoreClient.java
public String getDelegationToken(String owner, String renewerKerberosPrincipalName) throws MetaException, TException { if (localMetaStore) { throw new UnsupportedOperationException("getDelegationToken() can be " + "called only in thrift (non local) mode"); } return client.get_delegation_token(owner, renewerKerberosPrincipalName); }
HCatInputFormat和HCatOutputFormat提供一些mapreduce api来读取表和写入表
HCatInputFormat API:
public static void setInput(Job job, InputJobInfo inputJobInfo) throws IOException;
先实例化一个InputJobInfo对象,该对象包含三个参数dbname,tablename,filter,然后传给setInput函数,来读取相应的数据
public static HCatSchema getTableSchema(JobContext context) throws IOException;
在运行时(比如mapper阶段的setup函数中),可以传进去JobContext,调用静态getTableSchema来获取先前setInput时设置的table schema信息
HCatOutputFormat API:
public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException;
OutPutJobInfo接受三个参数databaseName, tableName, partitionValues,其中第三个参数类型是Map<String, String>,partition key放在map key里,partition value放在对应map key的value中,该参数可传入null或空map,如果指定的partition存在的话,会抛org.apache.hcatalog.common.HCatException : 2002 : Partition already present with given partition key values
比如要要写入指定的partition(dt='2013-06-13',country='china' ),可以这样写
Map<String, String> partitionValues = new HashMap<String, String>(); partitionValues.put("dt", "2013-06-13"); partitionValues.put("country", "china"); HCatTableInfo info = HCatTableInfo.getOutputTableInfo(dbName, tblName, partitionValues); HCatOutputFormat.setOutput(job, info);
public static HCatSchema getTableSchema(JobContext context) throws IOException;
获取之前HCatOutputFormat.setOutput指定的table schema信息
public static void setSchema(final Job job, final HCatSchema schema) throws IOException;
设置最终写入数据的schema信息,若不调用这个方法,则默认会使用table schema信息
更多精彩内容:http://www.bianceng.cnhttp://www.bianceng.cn/webkf/tools/
下面提供一个完整mapreduce例子计算一天每个guid访问页面次数,map阶段从表中读取guid字段,reduce阶段统计该guid对应pageview的总数,然后写回另外一张带有guid和count字段的表中
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hcatalog.data.DefaultHCatRecord; import org.apache.hcatalog.data.HCatRecord; import org.apache.hcatalog.data.schema.HCatSchema; import org.apache.hcatalog.mapreduce.HCatInputFormat; import org.apache.hcatalog.mapreduce.HCatOutputFormat; import org.apache.hcatalog.mapreduce.InputJobInfo; import org.apache.hcatalog.mapreduce.OutputJobInfo; public class GroupByGuid extends Configured implements Tool { @SuppressWarnings("rawtypes") public static class Map extends Mapper<WritableComparable, HCatRecord, Text, IntWritable> { HCatSchema schema; Text guid; IntWritable one; @Override protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException { guid = new Text(); one = new IntWritable(1); schema = HCatInputFormat.getTableSchema(context); } @Override protected void map(WritableComparable key, HCatRecord value, Context context) throws IOException, InterruptedException { guid.set(value.getString("guid", schema)); context.write(guid, one); } } @SuppressWarnings("rawtypes") public static class Reduce extends Reducer<Text, IntWritable, WritableComparable, HCatRecord> { HCatSchema schema; @Override protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context) throws IOException, InterruptedException { schema = HCatOutputFormat.getTableSchema(context); } @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; Iterator<IntWritable> iter = values.iterator(); while (iter.hasNext()) { sum++; iter.next(); } HCatRecord record = new DefaultHCatRecord(2); record.setString("guid", schema, key.toString()); record.setInteger("count", schema, sum); context.write(null, record); } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); String dbname = args[0]; String inputTable = args[1]; String filter = args[2]; String outputTable = args[3]; int reduceNum = Integer.parseInt(args[4]); Job job = new Job(conf, "GroupByGuid, Calculating every guid's pageview"); HCatInputFormat.setInput(job, InputJobInfo.create(dbname, inputTable, filter)); job.setJarByClass(GroupByGuid.class); job.setInputFormatClass(HCatInputFormat.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(WritableComparable.class); job.setOutputValueClass(DefaultHCatRecord.class); job.setNumReduceTasks(reduceNum); HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbname, outputTable, null)); HCatSchema s = HCatOutputFormat.getTableSchema(job); HCatOutputFormat.setSchema(job, s); job.setOutputFormatClass(HCatOutputFormat.class); return (job.waitForCompletion(true) ? 0 : 1); } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new GroupByGuid(), args); System.exit(exitCode); } }
其实hcatalog还支持动态分区dynamic partition,我们可以在OutJobInfo中指定部分partition keyvalue pair,在运行时候根据传进来的值设置HCatRecord对应的其他partition keyvalue pair,这样就能在一个job中同时写多个partition了
作者:csdn博客 lalaguozhe
以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索mapreduce
, apache
, metastore
, schema
, exitcode
, job
, import
, hive mapreduce
ioException
hcatalog 简介、hive hcatalog 使用、hcatalog 使用、hcatalog、hive hcatalog,以便于您获取更多的相关知识。