背景
Hive query将运算好的数据写回hdfs(比如insert into语句),有时候会产生大量的小文件,如果不采用CombineHiveInputFormat就对这些小文件进行操作的话会产生大量的map task,耗费大量集群资源,而且小文件过多会对namenode造成很大压力。所以Hive在正常job执行完之后,会起一个conditional task,来判断是否需要合并小文件,如果满足要求就会另外启动一个map-only job 或者mapred job来完成合并
参数解释
hive.mergejob.maponly (默认为true)
如果hadoop版本支持CombineFileInputFormat,则启动Map-only job for merge,否则启动 MapReduce merge job,map端combine file是比较高效的做法
hive.merge.mapfiles(默认为true)
正常的map-only job后,是否启动merge job来合并map端输出的结果
hive.merge.mapredfiles(默认为false)
正常的map-reduce job后,是否启动merge job来合并reduce端输出的结果,建议开启
hive.merge.smallfiles.avgsize(默认为16MB)
如果不是partitioned table的话,输出table文件的平均大小小于这个值,启动merge job,如果是partitioned table,则分别计算每个partition下文件平均大小,只merge平均大小小于这个值的partition。这个值只有当hive.merge.mapfiles或hive.merge.mapredfiles设定为true时,才有效
hive.exec.reducers.bytes.per.reducer(默认为1G)
更多精彩内容:http://www.bianceng.cnhttp://www.bianceng.cn/database/extra/
如果用户不主动设置mapred.reduce.tasks数,则会根据input directory计算出所有读入文件的input summary size,然后除以这个值算出reduce number
reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
reducers = Math.max(1, reducers);
reducers = Math.min(maxReducers, reducers);
hive.merge.size.per.task(默认是256MB)
merge job后每个文件的目标大小(targetSize),用之前job输出文件的total size除以这个值,就可以决定merge job的reduce数目。merge job的map端相当于identity map,然后shuffle到reduce,每个reduce dump一个文件,通过这种方式控制文件的数量和大小
MapredWork work = (MapredWork) mrTask.getWork();
if (work.getNumReduceTasks() > 0) {
int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
int reducers = (int) ((totalSize +targetSize - 1) / targetSize);
reducers = Math.max(1, reducers);
reducers = Math.min(maxReducers, reducers);
work.setNumReduceTasks(reducers);
}
mapred.max.split.size(默认256MB)
mapred.min.split.size.per.node(默认1 byte)
mapred.min.split.size.per.rack(默认1 byte)
这三个参数CombineFileInputFormat中会使用,Hive默认的InputFormat是CombineHiveInputFormat,里面所有的调用(包括最重要的getSplits和getRecordReader)都会转换成CombineFileInputFormat的调用,所以可以看成是它的一个包装。CombineFileInputFormat 可以将许多小文件合并成一个map的输入,如果文件很大,也可以对大文件进行切分,分成多个map的输入。一个CombineFileSplit对应一个map的输入,包含一组path(hdfs路径list),startoffset, lengths, locations(文件所在hostname list)mapred.max.split.size是一个split 最大的大小,mapred.min.split.size.per.node是一个节点上(datanode)split至少的大小,mapred.min.split.size.per.rack是同一个交换机(rack locality)下split至少的大小通过这三个数的调节,组成了一串CombineFileSplit用户可以通过增大mapred.max.split.size的值来减少Map Task数量
结论
hive 通过上述几个值来控制是否启动merge file job,通常是建议大家都开启,如果是一堆顺序执行的作业链,只有最后一张表需要固化落地,中间表用好就删除的话,可以在最后一个insert into table之前再开启,防止之前的作业也会launch merge job使得作业变慢。
上周还发现目前启动的针对RCFile的Block Merger在某种少见情况下,会生成duplicated files,Hive代码中本身已经考虑到这点,所以会在Merger Task RCFileMergeMapper的JobClose函数中调用Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx), 不过不知道为什么没有生效,还会存在重复文件,需要再研究下
Hive是否起merge job是由conditional task在运行时决定的,如果hadoop job或者hive未如预期般执行合并作业,则可以利用github上的file crush工具完成合并,它的原理也是启动一个mapreduce job完成合并,不过目前只支持textfile 和 sequencefile
链接地址:https://github.com/edwardcapriolo/filecrush