前几天有DW用户反馈,在往一张表(RCFile表)中用“insert overwrite table partition(xx) select ...” 插入数据的时候,会产生重复文件。看了下这个作业log,发现map task 000005起了两个task attempt ,第二个attempt是推测执行,并且这两个attemp都在task close函数里面重命名temp文件成正式文件,而不是通过mapreduce框架的两阶段提交协议(two phrase commit protocol)在收到tasktracker发过来的commitTaskAction时再commit task来保证只有一个attemp的结果成为正式结果。
task log中的输出如下:
attempt_201304111550_268224_m_000005_0 renamed path hdfs://10.2.6.102/tmp/hive-deploy/hive_2013-05-30_10-13-59_124_8643833043783438119/_task_tmp.-ext-10000/hp_cal_month=2013-04/_tmp.000005_0 to hdfs://10.2.6.102/tmp/hive-deploy/hive_2013-05-30_10-13-59_124_8643833043783438119/_tmp.-ext-10000/hp_cal_month=2013-04/000005_0 . File size is 666922 attempt_201304111550_268234_m_000005_1 renamed path hdfs://10.2.6.102/tmp/hive-deploy/hive_2013-05-30_10-13-59_124_8643833043783438119/_task_tmp.-ext-10000/hp_cal_month=2013-04/_tmp.000005_1 to hdfs://10.2.6.102/tmp/hive-deploy/hive_2013-05-30_10-13-59_124_8643833043783438119/_tmp.-ext-10000/hp_cal_month=2013-04/000005_1 . File size is 666922
其实这条hive语句原本只会起1个job(Launching Job 1 out of 1),当第一个job结束的时候,会起一个conditional task分析每个partition下平均文件的大小,如果小于hive.merge.smallfiles.avgsize(默认为16MB), 第一个job又是map-only job,并且打开hive.merge.mapfiles(默认为true) ,则会另外起一个merge-file job来合并小文件,第二个job中用到RCFileMergeMapper就是合并之前生成的小文件的。workaround的方式有两种,一种是关闭推测执行,不过会有可能有一个task比较慢导致瓶颈,另一种是关闭merge file job(set hive.merge.mapfiles=fasle),从而不使用RCFileMergeMapper,不过这样产生大量的小文件就无法合并。
如果解析出来是要起merge job,会创建一个BlockMergeTask(继承自Task),执行里面的execute方法,先设置好JobConf相应的参数,比如mapred.mapper.class, hive.rcfile.merge.output.dir等,然后创建一个JobClient并submitJob,map的执行逻辑在RCFileMergeMapper类中,它继承了老的MapRed API中的抽象类MapReduceBase,覆盖了configure和close方法,前面提到的rename操作就是在close方法中,MapRunner类中的run方法会循环调用真正执行mapper的map方法,并在finally调用mapper的close方法
public void close() throws IOException { // close writer if (outWriter == null) { return; } outWriter.close(); outWriter = null; if (!exception) { FileStatus fss = fs.getFileStatus(outPath); LOG.info("renamed path " + outPath + " to " + finalPath + " . File size is " + fss.getLen()); if (!fs.rename(outPath, finalPath)) { throw new IOException("Unable to rename output to " + finalPath); } } else { if (!autoDelete) { fs.delete(outPath, true); } } }
job执行完成后,是有可能有同一task的不同attempt产生结果文件同时存在的,不过hive显然考虑到了这点,所以在merge作业执行后会调用RCFileMergeMapper.jobClose方法,它会先备份输出目录,然后将数据写入输出目录并调用Utilities.removeTempOrDuplicateFiles方法来删除重复文件,删除的逻辑是从文件名中提取taskid,如果同一个taskid有两个文件,则会将小的那个删除,不过在0.9版本中,RCFileMergeMapper对于目标表是动态分区表情况下不支持,所以还会有duplicated files,打上patch(https://issues.apache.org/jira/browse/HIVE-3149?attachmentOrder=asc)后解决问题
RCFileMergeMapper execute方法finally处理逻辑,源代码中catch住exception后无任何处理,我加了一些stack trace输出和设置return value
finally { try { if (ctxCreated) { ctx.clear(); } if (rj != null) { if (returnVal != 0) { rj.killJob(); } HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID()); jobID = rj.getID().toString(); } RCFileMergeMapper.jobClose(outputPath, success, job, console, work.getDynPartCtx()); } catch (Exception e) { console.printError("RCFile Merger Job Close Error", "n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); e.printStackTrace(System.err); success = false; returnVal = -500; } }
以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索文件
, deploy path
, hive
, 方法
, sqoop hive mysql
, merge
, close
, 一个
fs.rename方法
hive rcfile、hive rcfile 压缩、hive rcfile orcfile、hive rcfile 导入、hive rcfilecat,以便于您获取更多的相关知识。