1 概念
InputFormat用于描述输入数据的格式,提供以下两个功能:
A、数据切分:按照某种策略将输入的数据切分成若干split,以便确定Map Task个数,以及对应的Split。
B、提供数据:为Mapper提供输入数据,对于给定split,能将其解析为<k,v>格式。即<K1,V1>。
2 新老版本
老版本:package org.apache.hadoop.mapred
public interface InputFormat<K, V> { InputSplit[] getSplits(JobConf job, int numSplits) throws IOException; RecordReader<K, V> getRecordReader(InputSplit split,JobConf job, Reporter reporter) throws IOException; }
新版本:package org.apache.hadoop.mapreduce
public abstract class InputFormat<K, V> { public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException; public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,InterruptedException; }
3 解析
3.1 设计思想
所有基于文件的InputFormat的实现基类都是FileInputFormat。
针对文本格式:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat
针对二进制格式:SequenceFileInputFormat
基于文件的FileInputFormat的设计思想是:
A 由公共基类FileInputFormat采用统一的方法,对文件进行切分成InputSplit(如按照统一的大小)。getSplit方法。
B 由各个派生类根据自己的需求,解析InputSplit。即各个子类实现的createRecordReader方法。
3.2 getSplits
主要完成数据切分的功能,它会尝试着将输入数据切分为numSplit个inputSplit。有以下两个特点:
A、逻辑分片:inputSplit只记录分片的元信息。
B、可序列化:为了进程间通信。
在Hadoop1.X在JobClient的中writeNewSplits方法使用了getSplits。
// 通过反射获得设置的inputFormat.class的inputFormat对象 InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf); // 获取逻辑分片信息 List<InputSplit> splits = input.getSplits(job);
3.3 getRecordReader
该方法返回一个RecordReader对象,它实现了类似迭代器的功能,将某个split解析为一个个<k,v>对。该类需要考虑以下两点:
A、定位边界记录:为了识别一条完整的记录,记录之间要加上一些同步标志。
对于TextInputFormat:同步标识就是换行符。
对于SequenceFileInputFormat:每隔离若干条记录,会添加固定长度同步字符串。
B、解析<k,v>:定位到一条记录后,需要将该记录分解为key和value两部分。
对于TextInputFormat:key就是该行在文件的中的偏移量,value就是该行的内容。
对于SequenceFileInputFormat: 每条记录的格式为[record length] [key length] [key] [value]。
前两个字段分别是整条记录的长度和key的长度,均为4个字节,后半部分分别是key和value的内容。知道每条记录的格式后,很容易解析。
整理自董西成老师的《Hadoop技术内幕》,并阅读源码小有体会。