MapReduce中的InputFormat(2)自定义InputFormat

1 概述

Hadoop内置的输入文件格式类有:
1)FileInputFormat<K,V>这个是基本的父类,自定义就直接使用它作为父类。
2)TextInputFormat<LongWritable,Text>这个是默认的数据格式类。key代表当前行数据距离文件开始的距离,value代码当前行字符串。
3)SequenceFileInputFormat<K,V>这个是序列文件输入格式,使用序列文件可以提高效率,但是不利于查看结果,建议在过程中使用序列文件,最后展示可以使用可视化输出。
4)KeyValueTextInputFormat<Text,Text>这个是读取以Tab(也即是\t)分隔的数据,每行数据如果以\t分隔,那么使用这个读入,就可以自动把\t前面的当做key,后面的当做value。
5)CombineFileInputFormat<K,V>合并大量小数据是使用。
6)MultipleInputs,多种输入,可以为每个输入指定逻辑处理的Mapper。

2 运行轨迹

2.1 Mapper

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
	}
}

进入context.nextKeyValue()方法,从而进入WrappedMapper类。


2.2 WrappedMapper

public boolean nextKeyValue() throws IOException, InterruptedException{
      return mapContext.nextKeyValue();
}

进入该方法的nextKeyValue(),从而进入MapContextImpl类。


2.3 MapContextImpl

public boolean nextKeyValue() throws IOException, InterruptedException {
    return reader.nextKeyValue();
}

现希望知道reader具体类型是什么,先看reader的申明和赋值。

public class MapContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    extends TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  private RecordReader<KEYIN,VALUEIN> reader;
  private InputSplit split;
  public MapContextImpl(Configuration conf, TaskAttemptID taskid,
                        RecordReader<KEYIN,VALUEIN> reader,
                        RecordWriter<KEYOUT,VALUEOUT> writer,
                        OutputCommitter committer,
                        StatusReporter reporter,
                        InputSplit split) {
    super(conf, taskid, writer, committer, reporter);
    this.reader = reader;
    this.split = split;
  }
}

此处看到是调用MapContextImpl构造方法进行赋值的,那么继续跟进看何处调用了MapContextImpl方法。右击MapContextImpl > open call Hierarchy。跟进一个方法叫runNewMapper可以看到,一步步看变量申明,就可以看到inputFormat就是我们代码中设置的InputFormat.class类型。

3 自定义InputFormat

基于文件的FileInputFormat的设计思想是:
A 由公共基类FileInputFormat采用统一的方法,对文件进行切分成InputSplit(如按照统一的大小)。getSplit方法。
B 由各个派生类根据自己的需求,解析InputSplit。即各个子类实现的createRecordReader方法。那么Input只需实现自定义createRecordReader方法即可。


3.1 MyInputFormat

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;;

public class MyInputFormat extends FileInputFormat<Text, Text> {
	@Override
	public RecordReader<Text, Text> createRecordReader(InputSplit split,
			TaskAttemptContext context) throws IOException,
			InterruptedException {
		return new MyRecordReader();
	}
}

3.2 MyRecordReader

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class MyRecordReader extends RecordReader<Text, Text> {
	private LineReader lr;
	private Text key = new Text();
	private Text value = new Text();
	private long start;
	private long end;
	private long currentPos;
	private Text line = new Text();

	@Override
	public void initialize(InputSplit inputSplit, TaskAttemptContext cxt)
			throws IOException, InterruptedException {

		FileSplit split = (FileSplit) inputSplit;
		Configuration conf = cxt.getConfiguration();
		// 获取分片文件对应的完整文件
		Path path = split.getPath();
		FileSystem fs = path.getFileSystem(conf);
		FSDataInputStream is = fs.open(path);
		lr = new LineReader(is, conf);
		// 获取分片文件的启始位置
		start = split.getStart();
		// 获取分片文件的结束位置
		end = start + split.getLength();
		is.seek(start);
		if (start != 0) {
			start += lr.readLine(new Text(), 0,
					(int) Math.min(Integer.MAX_VALUE, end - start));
		}
		currentPos = start;
	}

	// 针对每行数据进行处理
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException{
		if (currentPos > end) {
			return false;
		}
		currentPos += lr.readLine(line);
		if (line.getLength() == 0) {
			return false;
		}
		// 若是需要被忽略的行,直接读下一行
		if (line.toString().startsWith("ignore")) {
			currentPos += lr.readLine(line);
		}
		String[] words = line.toString().split(",");
		if (words.length < 2) {
			System.err.println("line:" + line.toString() + ".");
			return false;
		}
		key.set(words[0]);
		value.set(words[1]);
		return true;
	}

	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	@Override
	public Text getCurrentValue() throws IOException, InterruptedException{
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		if (start == end) {
			return 0.0f;
		} else {
			return Math.min(1.0f, (currentPos - start) / (float) (end - start));
		}
	}

	@Override
	public void close() throws IOException {
		lr.close();
	}
} 

3.3 TestFormat

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestFormat extends Configured implements Tool {

	public static void main(String[] args) throws Exception {
		ToolRunner.run(new Configuration(), new TestFormat(), args);
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		String inPath = "hdfs://192.XXX.XXX.XXX:9000/test/bigFile.txt";
		String outPath = "hdfs://192.XXX.XXX.XXX:9000/test/out/";
		Path in = new Path(inPath);
		Path out = new Path(outPath);
		out.getFileSystem(conf).delete(out, true);

		Job job = Job.getInstance(conf, "fileintputformat test job");
		job.setJarByClass(getClass());
		job.setInputFormatClass(MyInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		job.setMapperClass(Mapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setNumReduceTasks(0);

		FileInputFormat.setInputPaths(job, in);
		FileOutputFormat.setOutputPath(job, out);
		return job.waitForCompletion(true) ? 0 : -1;
	}
}

参考地址:http://www.cnblogs.com/hyl8218/p/5198030.html

时间: 2024-10-25 11:30:22

MapReduce中的InputFormat(2)自定义InputFormat的相关文章

Hadoop MapReduce处理海量小文件:自定义InputFormat和RecordReader

一般来说,基于Hadoop的MapReduce框架来处理数据,主要是面向海量大数据,对于这类数据,Hadoop能够使其真正发挥其能力.对于海量小文件,不是说不能使用Hadoop来处理,只不过直接进行处理效率不会高,而且海量的小文件对于HDFS的架构设计来说,会占用NameNode大量的内存来保存文件的元数据(Bookkeeping).另外,由于文件比较小,我们是指远远小于HDFS默认Block大小(64M),比如1k~2M,都很小了,在进行运算的时候,可能无法最大限度地充分Locality特性带

MapReduce中的InputFormat(1)概述

1 概念InputFormat用于描述输入数据的格式,提供以下两个功能:A.数据切分:按照某种策略将输入的数据切分成若干split,以便确定Map Task个数,以及对应的Split.B.提供数据:为Mapper提供输入数据,对于给定split,能将其解析为<k,v>格式.即<K1,V1>. 2 新老版本 老版本:package org.apache.hadoop.mapred public interface InputFormat<K, V> { InputSpli

ASP.NET中利用DataGrid的自定义分页功能

asp.net|datagrid|分页 ASP.NET中利用DataGrid的自定义分页功能和存储过程结合实现高效分页 ASP.Net中的DataGrid有内置分页功能, 但是它的默认的分页方式效率是很低的,特别是在数据量很大的时候,用它内置的分页功能几乎是不可能的事,因为它会把所有的数据从数据库读出来再进行分页, 这种只选取了一小部分而丢掉大部分的方法是不可去取的. 在最进的一个项目中因为一个管理页面要管理的数据量非常大,所以必须分页显示,并且不能用DataGrid的内置分页功能,于是自己实现

Excel2007中创建或删除自定义数字格式

 Excel 2007提供了许多内置数字格式,但如果这些格式无法满足您的需要,您可以自定义内置数字格式以便创建自己的数字格式. 创建自定义数字格式 1.打开要创建并存储自定义数字格式的工作簿. 2.在"开始"选项卡上,单击"数字"旁边的"对话框启动器". 3.在"分类"框中,单击"自定义". 4.在"类型"列表中,选择要自定义的数字格式. 选择的数字格式将显示在"类型"

ios-在AppDelegate中能不能创建自定义的delegate?

问题描述 在AppDelegate中能不能创建自定义的delegate? 能不能在AppDelegate类中创建自定义的delegate?像下面实例这样: @protocol AppDelegateDelegate <NSObject>- (void)finishSync:(BOOL)success;@end@interface AppDelegate : UIResponder <UIApplicationDelegate> {@property (nonatomic weak)

android-Android自定义控件中为Button设置自定义监听器出现空指针。

问题描述 Android自定义控件中为Button设置自定义监听器出现空指针. Android自定义TopBar中给ImageButton设置自定义点击监听器,当点击ImageButton时在listener处出现NullPointerException. 会出错的地方我在后面写了//TODO . 请问为什么会出现listener没有被实例化的情况? 还有为什么在MainActivity中使用findViewById实例化TopBar也会出现不能实例化控件的情况? IDE : Android S

c#-C#中如何声明一个自定义类型的全局变量

问题描述 C#中如何声明一个自定义类型的全局变量 项目中添加一个Person.cs类文件定义了一个Person类想声明一个Person类型的全局变量应该如何做? 解决方案 在某个类中,声明 public static Person person

android中怎么实现一个自定义对话框

问题描述 android中怎么实现一个自定义对话框 android界面中,单击EditText弹出一个时间对话框, 解决方案 1.可以自定义对话框的布局 2.新建一个activity 设置android:theme="@style/mydialog" parent="@android:style/Theme.Dialog" ><br> <item name="android:windowNoTitle">true&l

mongodb-MongoDB中的mapReduce中reduce文件只作用在一个文档上

问题描述 MongoDB中的mapReduce中reduce文件只作用在一个文档上 小白刚接触MongoDB,看到mapReduce这里不是很清楚,自己编了一段代码,想做个计数,然后发觉map方法出来的value如果只有一个元素的时候,好像不参与到reduce函数的计算中.如: var map=function(){ emit(this.name{count:1})}var reduce=function(keyvalues){ var count=0; for(var i in values)