hadoop中OutputFormat 接口的设计与实现

OutputFormat 主要用于描述输出数据的格式,它能够将用户提供的 key/value 对写入特定格式的文件中。 本文将介绍 Hadoop 如何设计 OutputFormat 接口 , 以及一些常用的OutputFormat 实现。

1.旧版 API 的 OutputFormat 解析

如图所示, 在旧版 API 中,OutputFormat 是一个接口,它包含两个方法:

RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
                                     String name, Progressable progress)
                                             throws IOException;
void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;

checkOutputSpecs 方法一般在用户作业被提交到 JobTracker 之前, 由 JobClient 自动调用,以检查输出目录是否合法。

getRecordWriter 方法返回一个 RecordWriter 类对象。 该类中的方法 write 接收一个key/value
对, 并将之写入文件。在 Task 执行过程中, MapReduce 框架会将 map() 或者reduce() 函数产生的结果传入 write
方法, 主要代码(经过简化)如下。假设用户编写的 map() 函数如下:

public void map(Text key, Text value,
         OutputCollector<Text, Text> output,
         Reporter reporter) throws IOException {
         // 根据当前 key/value 产生新的输出 <newKey, newValue>, 并输出
         ……
         output.collect(newKey, newValue);
}

则函数 output.collect(newKey, newValue) 内部执行代码如下:

RecordWriter<K, V> out = job.getOutputFormat().getRecordWriter(...);
out.write(newKey, newValue);

Hadoop 自带了很多OutputFormat 实现, 它们与 InputFormat 实现相对应,具体如图所示。所有基于文件的
OutputFormat 实现的基类为 FileOutputFormat, 并由此派生出一些基于文本文件格式、
二进制文件格式的或者多输出的实现。

为了深入分析OutputFormat的实现方法,选取比较有代表性的FileOutputFormat类进行分析。同介绍
InputFormat 实现的思路一样,我们先介绍基类FileOutputFormat,再介绍其派生类 TextOutputFormat。基类
FileOutputFormat 需要提供所有基于文件的 OutputFormat 实现的公共功能,总结起来,主要有以下两个:
(1) 实现 checkOutputSpecs 接口
该接口 在作业运行之前被调用, 默认功能是检查用户配置的输出目 录是否存在,如果存在则抛出异常,以防止之前的数据被覆盖。
(2) 处理 side-effect file

务的 side-effect file 并不是任务的最终输出文件,而是具有特殊用途的任务专属文件。 它的典型应用是执行推测式任务。 在
Hadoop 中,因为硬件老化、网络故障等原因,同一个作业的某些任务执行速度可能明显慢于其他任务,这种任务会拖慢整个作业的执行速度。为了对这种“
慢任务” 进行优化, Hadoop
会为之在另外一个节点上启动一个相同的任务,该任务便被称为推测式任务,最先完成任务的计算结果便是这块数据对应的处理结果。为防止这两个任务同
时往一个输出 文件中 写入数据时发生写冲突, FileOutputFormat会为每个 Task 的数据创建一个 side-effect
file,并将产生的数据临时写入该文件,待 Task完成后,再移动到最终输出目 录中。 这些文件的相关操作, 比如创建、删除、移动等,均由
OutputCommitter 完成。它是一个接口,Hadoop 提供了默认实现
FileOutputCommitter,用户也可以根据自己的需求编写 OutputCommitter 实现, 并通过参数
{mapred.output.committer.class} 指定。OutputCommitter 接口定义以及
FileOutputCommitter 对应的实现如表所示。

表-- OutputCommitter 接口定义以及 FileOutputCommitter 对应的实现

方法 何时被调用 FileOutputCommitter 实现
setupJob 作业初始化 创建临时目录 ${mapred.out.dir} /_temporary
commitJob 作业成功运行完成 删除临时目录,并在${mapred.out.dir} 目录下创建空文件_SUCCESS
abortJob 作业运行失败  删除临时目录
setupTask 任务初始化 不进行任何操作。原本是需要在临时目录下创建 side-effect file
的,但它是用时创建的(create on demand)
needsTaskCommit  判断是否需要提交结果 只要存在side-effect file,就返回 true
commitTask 任务成功运行完成 提交结果, 即将 side-effect file 移动到 ${mapred.out.dir} 目录下
abortTask 任务运行失败 删除任务的 side-effect file注意默认情况下,当作业成功运行完成后,会在最终结果目录 ${mapred.out.dir} 下生成

注意:默认情况下,当作业成功运行完成后,会在最终结果目录 ${mapred.out.dir} 下生成空文件 _SUCCESS。该文件主要为高层应用提供作业运行完成的标识,比如,Oozie 需要通过检测结果目 录下是否存在该文件判 断作业是否运行完成。

2. 新版 API 的 OutputFormat 解析

如图所示,除了接口变为抽象类外,新 API 中的 OutputFormat 增加了一个新的方法:getOutputCommitter,以允许用户自 己定制合适的 OutputCommitter 实现。

时间: 2024-09-17 07:20:39

hadoop中OutputFormat 接口的设计与实现的相关文章

hadoop中InputFormat 接口的设计与实现

InputFormat 主要用于描述输入数据的格式, 它提供以下两个功能. 数据切分:按照某个策略将输入数据切分成若干个 split, 以便确定 Map Task 个数以及对应的 split. 为 Mapper 提供输入数据: 给定某个 split, 能将其解析成一个个 key/value 对. 本文将介绍 Hadoop 如何设计 InputFormat 接口,以及提供了哪些常用的 InputFormat实现. 1 .旧版 API 的 InputFormat 解析 如图所示: 在旧版 API 中

hadoop中的序列化与Writable接口

简介 序列化和反序列化就是结构化对象和字节流之间的转换,主要用在内部进程的通讯和持久化存储方面. 通讯格式需求 hadoop在节点间的内部通讯使用的是RPC,RPC协议把消息翻译成二进制字节流发送到远程节点,远程节点再通过反序列化把二进制流转成原始的信息.RPC的序列化需要实现以下几点: 1.压缩,可以起到压缩的效果,占用的宽带资源要小. 2.快速,内部进程为分布式系统构建了高速链路,因此在序列化和反序列化间必须是快速的,不能让传输速度成为瓶颈. 3.可扩展的,新的服务端为新的客户端增加了一个参

hadoop中使用lzo的压缩

在hadoop中使用lzo的压缩算法可以减小数据的大小和数据的磁盘读写时间,不仅如此,lzo是基于block分块的,这样他就允许数据被分解成chunk,并行的被hadoop处理.这样的特点,就可以让lzo在hadoop上成为一种非常好用的压缩格式. lzo本身不是splitable的,所以当数据为text格式时,用lzo压缩出来的数据当做job的输入是一个文件作为一个map.但是 sequencefile本身是分块的,所以sequencefile格式的文件,再配上lzo的压缩格式,就可实现lzo

Hadoop中的Python框架的使用指南_python

 最近,我加入了Cloudera,在这之前,我在计算生物学/基因组学上已经工作了差不多10年.我的分析工作主要是利用Python语言和它很棒的科学计算栈来进行的.但Apache Hadoop的生态系统大部分都是用Java来实现的,也是为Java准备的,这让我很恼火.所以,我的头等大事变成了寻找一些Python可以用的Hadoop框架. 在这篇文章里,我会把我个人对这些框架的一些无关科学的看法写下来,这些框架包括:     Hadoop流     mrjob     dumbo     hadoo

Hadoop核心之HDFS 架构设计

概述:HDFS即Hadoop Distributed File System分布式文件系统,它的设计目标是把超大数据集存储到分布在网络中的多台普通商用计算机上,并且能够提供高可靠性和高吞吐量的服务.分布式文件系统要比普通磁盘文件系统复杂,因为它要引入网络编程,分布式文件系统要容忍节点故障也是一个很大的挑战. 设计前提和目标 专为存储超大文件而设计:hdfs应该能够支持GB级别大小的文件:它应该能够提供很大的数据带宽并且能够在集群中拓展到成百上千个节点:它的一个实例应该能够支持千万数量级别的文件.

基于.NET平台的分层架构实战(五)—接口的设计与实现

接下来,将进行接口的设计.这里包括数据访问层接口和业务逻辑层接口.在 分层架构中,接口扮演着非常重要的角色,它不但直接决定了各层中的各个操作 类需要实现何种操作,而且它明确了各个层次的职责.接口也是系统实现依赖注 入机制不可缺少的部分. 本项目的接口设计将按如下顺序进行: 1.首先由前文的需求分析,列出主要的UI部分. 2.分析各个UI需 要什么业务逻辑支持,从而确定业务逻辑层接口. 3.分析业务逻辑层接口 需要何种数据访问操作,从而确定数据访问层接口. 另外,为保证完全的 面向对象特性,接口之

hadoop中实现定制Writable类

Hadoop中有一套Writable实现可以满足大部分需求,但是在有些情况下,我们需要根据自己的需要构造一个新的实现,有了定制的Writable,我们就可以完全控制二进制表示和排序顺序. 为了演示如何新建一个定制的writable类型,我们需要写一个表示一对字符串的实现: blic class TextPair implements WritableComparable<TextPair> { private Text first; private Text second; public Te

Hadoop核心之MapReduce架构设计

Hadoop主要由两大部分组成,一个是分布式文件系统即HDFS,另一个是分布式计算框架MapReduce. 关于HDFS详细介绍请参考:[Hadoop核心之HDFS 架构设计] 本篇重点介绍分布式计算框架MapReduce.在Hadoop的MapReduce框架中主要涉及到两个组件:JobTracker和TaskTracker(HDFS中的组件是NameNode和DataNode),下面我们就分别看一下这两个组件. TaskTracker TaskTracker一个hadoop计算进程,运行在h

面向对象编程语言中的接口(Interface)

在大多面向对象的编程语言中都提供了Interface(接口)的概念.如果你事先学过这个概念,那么在谈到"接口测试"时,会不会想起这个概念来!?本篇文章简单介绍一下面向对象编程语言中的Interface.     Java中的Interface                                                                   在Java中定义接口使用interface关键字来声明,可以看做是一种特殊的抽象类,可以指定一个类必须做什么,而不