package com.mr.test;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

/**
 * InputFormat that packs many small files into {@link CombineFileSplit}s so a
 * single map task processes several files, reducing per-file task overhead.
 * Keys are byte offsets within a file; values are the raw bytes of one line.
 */
public class CombineSmallfileInputFormat extends CombineFileInputFormat<LongWritable, BytesWritable> {

    /**
     * Creates a {@link CombineFileRecordReader} that delegates each file chunk
     * inside the combined split to a {@link CombineSmallfileRecordReader}.
     *
     * @param split   expected to be a {@link CombineFileSplit}
     * @param context task context used to initialize the reader
     * @return an initialized record reader over the combined split
     * @throws IOException      if reader initialization fails with an I/O error
     * @throws RuntimeException if initialization is interrupted
     */
    @Override
    public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException {
        CombineFileSplit combineFileSplit = (CombineFileSplit) split;
        CombineFileRecordReader<LongWritable, BytesWritable> recordReader =
                new CombineFileRecordReader<LongWritable, BytesWritable>(
                        combineFileSplit, context, CombineSmallfileRecordReader.class);
        try {
            recordReader.initialize(combineFileSplit, context);
        } catch (InterruptedException e) {
            // BUG FIX: the original code built the RuntimeException but never
            // threw it, silently swallowing the interruption. Restore the
            // interrupt flag and propagate with the original cause attached.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Error to initialize CombineSmallfileRecordReader.", e);
        }
        return recordReader;
    }
}
package com.mr.test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

/**
 * Reads ONE file chunk of a {@link CombineFileSplit}, line by line, emitting
 * (byte offset, line bytes) pairs. {@code CombineFileRecordReader} constructs
 * one instance of this class per chunk, passing the chunk's index in the
 * combined split via the {@code Integer index} constructor argument.
 */
public class CombineSmallfileRecordReader extends RecordReader<LongWritable, BytesWritable> {

    private CombineFileSplit combineFileSplit;
    // Underlying line reader; this class wraps it and converts Text -> BytesWritable.
    private LineRecordReader lineRecordReader = new LineRecordReader();
    private Path[] paths;
    // Number of file chunks in the combined split (used for bounds/progress).
    private int totalLength;
    // Index of the chunk this reader is responsible for.
    private int currentIndex;
    private float currentProgress = 0;
    private LongWritable currentKey;
    // Reused across calls, per the usual Hadoop Writable-reuse convention.
    private BytesWritable currentValue = new BytesWritable();

    /**
     * @param combineFileSplit the combined split containing many small-file chunks
     * @param context          task context (unused here; initialize() receives its own)
     * @param index            index of the chunk within the combined split that
     *                         this reader must process
     */
    public CombineSmallfileRecordReader(CombineFileSplit combineFileSplit, TaskAttemptContext context, Integer index)
            throws IOException {
        super();
        this.combineFileSplit = combineFileSplit;
        this.currentIndex = index;
    }

    /**
     * Wraps this reader's chunk in a {@link FileSplit} so the embedded
     * {@link LineRecordReader} can read it, and records the source file name
     * in the configuration under {@code map.input.file.name}.
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.combineFileSplit = (CombineFileSplit) split;
        // LineRecordReader only understands FileSplit, so build one that covers
        // exactly this chunk (path, offset, length) of the combined split.
        FileSplit fileSplit = new FileSplit(
                combineFileSplit.getPath(currentIndex),
                combineFileSplit.getOffset(currentIndex),
                combineFileSplit.getLength(currentIndex),
                combineFileSplit.getLocations());
        lineRecordReader.initialize(fileSplit, context);

        this.paths = combineFileSplit.getPaths();
        totalLength = paths.length;
        // Expose the current small file's name so the mapper can tell which
        // file each record came from.
        context.getConfiguration().set("map.input.file.name",
                combineFileSplit.getPath(currentIndex).getName());
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        currentKey = lineRecordReader.getCurrentKey();
        return currentKey;
    }

    /**
     * Returns the current line as raw bytes.
     *
     * <p>BUG FIX: the original went through {@code toString().getBytes()}
     * (platform-default charset) and printed debug output on every record.
     * Copying {@code Text.getBytes()} bounded by {@code Text.getLength()} is
     * both charset-safe (Text is UTF-8) and avoids reading the stale padding
     * that Text's backing array may carry beyond its logical length.
     */
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        Text line = lineRecordReader.getCurrentValue();
        currentValue.set(line.getBytes(), 0, line.getLength());
        return currentValue;
    }

    /**
     * Demo of the {@link BytesWritable} padding pitfall this class works
     * around: {@code getBytes()} returns the padded backing array, so after
     * setting a shorter value the tail of the previous value leaks through
     * unless reads are bounded by {@code getLength()}.
     */
    public static void main(String[] args) {
        BytesWritable cv = new BytesWritable();
        String str1 = "1234567";
        String str2 = "123450";

        byte[] b1 = str1.getBytes(StandardCharsets.UTF_8);
        cv.set(b1, 0, b1.length);
        // Only the first cv.getLength() bytes of the backing array are valid.
        System.out.println(new String(cv.getBytes(), 0, cv.getLength(), StandardCharsets.UTF_8));

        byte[] b2 = str2.getBytes(StandardCharsets.UTF_8);
        cv.set(b2, 0, b2.length);
        // No setCapacity(0) reset needed when reads honor getLength().
        System.out.println(new String(cv.getBytes(), 0, cv.getLength(), StandardCharsets.UTF_8));
    }

    /**
     * Advances the underlying line reader while this chunk index is in range.
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (currentIndex >= 0 && currentIndex < totalLength) {
            return lineRecordReader.nextKeyValue();
        }
        return false;
    }

    /**
     * Coarse progress: fraction of chunks preceding this one. (Per-record
     * progress within the chunk is not reported.)
     */
    @Override
    public float getProgress() throws IOException {
        if (currentIndex >= 0 && currentIndex < totalLength) {
            currentProgress = (float) currentIndex / totalLength;
        }
        return currentProgress;
    }

    @Override
    public void close() throws IOException {
        lineRecordReader.close();
    }
}
更多精彩内容：http://www.bianceng.cn/database/extra/
以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索hadoop
, mapreduce
, apache
, java hbase
, import
, netflix
, hbase二级索引
ioException
combineinputformat、hbase inputformat、kettle hbase input、hadoop inputformat、av find input format,以便于您获取更多的相关知识。
时间: 2024-09-13 05:14:17