1 Overview
Hadoop ships with the following built-in input file format classes:
1) FileInputFormat<K,V>: the basic parent class; custom input formats normally extend it directly.
2) TextInputFormat<LongWritable,Text>: the default format. The key is the byte offset of the current line from the beginning of the file, and the value is the content of the current line as a string.
3) SequenceFileInputFormat<K,V>: the input format for sequence files. Sequence files improve efficiency but are hard to inspect, so it is advisable to use them for intermediate stages and switch to a human-readable output format when presenting the final results.
4) KeyValueTextInputFormat<Text,Text>: reads data delimited by a Tab character (\t). If each line is separated by \t, this format automatically takes the part before the \t as the key and the part after it as the value.
5) CombineFileInputFormat<K,V>: used when combining large numbers of small files.
6) MultipleInputs: multiple inputs; a separate Mapper can be assigned to handle the processing logic for each input (a sketch follows this list).
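A minimal sketch of point 6, assuming hypothetical input paths (/data/plain, /data/tab_separated) and placeholder mapper classes; only the MultipleInputs.addInputPath calls are the actual Hadoop API, everything else is illustrative.

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class MultipleInputsExample {

    // Placeholder mapper for plain text lines: emits (offset, line)
    public static class LineMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            context.write(new Text(offset.toString()), line);
        }
    }

    // Placeholder identity mapper for tab-separated key/value lines
    public static class KvMapper extends Mapper<Text, Text, Text, Text> { }

    public static void configure(Job job) {
        // Each input path gets its own InputFormat and its own Mapper
        MultipleInputs.addInputPath(job, new Path("/data/plain"),
                TextInputFormat.class, LineMapper.class);
        MultipleInputs.addInputPath(job, new Path("/data/tab_separated"),
                KeyValueTextInputFormat.class, KvMapper.class);
    }
}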
2 Execution Trace
2.1 Mapper
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        cleanup(context);
    }
}
Stepping into context.nextKeyValue() leads into the WrappedMapper class.
2.2 WrappedMapper
public boolean nextKeyValue() throws IOException, InterruptedException {
    return mapContext.nextKeyValue();
}
Stepping into the nextKeyValue() call here leads into the MapContextImpl class.
2.3 MapContextImpl
public boolean nextKeyValue() throws IOException, InterruptedException {
    return reader.nextKeyValue();
}
To find out the concrete type of reader, first look at its declaration and assignment.
public class MapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends TaskInputOutputContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

    private RecordReader<KEYIN, VALUEIN> reader;
    private InputSplit split;

    public MapContextImpl(Configuration conf, TaskAttemptID taskid,
                          RecordReader<KEYIN, VALUEIN> reader,
                          RecordWriter<KEYOUT, VALUEOUT> writer,
                          OutputCommitter committer,
                          StatusReporter reporter,
                          InputSplit split) {
        super(conf, taskid, writer, committer, reporter);
        this.reader = reader;
        this.split = split;
    }
}
Here reader is assigned in the MapContextImpl constructor, so the next question is where MapContextImpl is instantiated. Right-click MapContextImpl > Open Call Hierarchy and follow it into a method called runNewMapper; tracing the variable declarations step by step shows that inputFormat is exactly the InputFormat class we configured in our job code.
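To see this relationship outside the framework source, here is a minimal standalone sketch (the class name InputFormatLookupDemo and the choice of TextInputFormat are illustrative assumptions, not part of the trace above) that mirrors what runNewMapper does: it resolves the class registered via setInputFormatClass from the configuration and instantiates it by reflection.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class InputFormatLookupDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "demo");
        job.setInputFormatClass(TextInputFormat.class);

        // Same pattern as runNewMapper: resolve the configured class
        // from the job configuration and instantiate it by reflection.
        InputFormat<?, ?> inputFormat = ReflectionUtils.newInstance(
                job.getInputFormatClass(), job.getConfiguration());

        // Prints org.apache.hadoop.mapreduce.lib.input.TextInputFormat
        System.out.println(inputFormat.getClass().getName());
    }
}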
3 Custom InputFormat
The design idea behind the file-based FileInputFormat is:
A. The common base class FileInputFormat splits files into InputSplits in a uniform way (for example, by a uniform size): the getSplits method (see the sketch after this list).
B. Each subclass parses the InputSplit according to its own needs, i.e. the createRecordReader method implemented by each subclass. A custom InputFormat therefore only has to provide its own createRecordReader implementation.
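As a side note on point A: a subclass can also opt out of splitting entirely by overriding isSplitable. The class below is only an illustrative sketch (WholeFileInputFormat is a made-up name, and it is left abstract because it shows just this one hook); it is not required for the example that follows.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Illustrative only: forces getSplits() to produce one split per file,
// useful when records must never straddle a split boundary.
public abstract class WholeFileInputFormat extends FileInputFormat<Text, Text> {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
}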
3.1 MyInputFormat
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class MyInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new MyRecordReader();
    }
}
3.2 MyRecordReader
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class MyRecordReader extends RecordReader<Text, Text> {

    private LineReader lr;
    private Text key = new Text();
    private Text value = new Text();
    private long start;
    private long end;
    private long currentPos;
    private Text line = new Text();

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext cxt)
            throws IOException, InterruptedException {
        FileSplit split = (FileSplit) inputSplit;
        Configuration conf = cxt.getConfiguration();
        // Open the file this split belongs to
        Path path = split.getPath();
        FileSystem fs = path.getFileSystem(conf);
        FSDataInputStream is = fs.open(path);
        lr = new LineReader(is, conf);
        // Start position of the split
        start = split.getStart();
        // End position of the split
        end = start + split.getLength();
        is.seek(start);
        // If the split does not start at the beginning of the file, skip the
        // (possibly partial) first line; it belongs to the previous split
        if (start != 0) {
            start += lr.readLine(new Text(), 0,
                    (int) Math.min(Integer.MAX_VALUE, end - start));
        }
        currentPos = start;
    }

    // Process the input line by line
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (currentPos > end) {
            return false;
        }
        currentPos += lr.readLine(line);
        if (line.getLength() == 0) {
            return false;
        }
        // If this line should be ignored, read the next line instead
        if (line.toString().startsWith("ignore")) {
            currentPos += lr.readLine(line);
        }
        String[] words = line.toString().split(",");
        if (words.length < 2) {
            System.err.println("line:" + line.toString() + ".");
            return false;
        }
        key.set(words[0]);
        value.set(words[1]);
        return true;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (currentPos - start) / (float) (end - start));
        }
    }

    @Override
    public void close() throws IOException {
        lr.close();
    }
}
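To illustrate the parsing logic with made-up data: an input file containing the lines

    hello,world
    ignore,this line
    foo,bar

would reach the Mapper as the pairs (hello, world) and (foo, bar); the line starting with "ignore" is replaced by the line that follows it, and a malformed line (fewer than two comma-separated fields) stops reading for that split.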
3.3 TestFormat
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestFormat extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new TestFormat(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String inPath = "hdfs://192.XXX.XXX.XXX:9000/test/bigFile.txt";
        String outPath = "hdfs://192.XXX.XXX.XXX:9000/test/out/";
        Path in = new Path(inPath);
        Path out = new Path(outPath);
        // Delete any previous output so the job can be rerun
        out.getFileSystem(conf).delete(out, true);

        Job job = Job.getInstance(conf, "fileinputformat test job");
        job.setJarByClass(getClass());

        job.setInputFormatClass(MyInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapperClass(Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        return job.waitForCompletion(true) ? 0 : -1;
    }
}
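Because Mapper.class is the identity mapper and the number of reduce tasks is 0, the job writes out exactly the key/value pairs produced by MyRecordReader; with TextOutputFormat each output line under /test/out/ is a key and a value separated by a tab.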
Reference: http://www.cnblogs.com/hyl8218/p/5198030.html