本文节选于清华大学出版社推出的《Hadoop权威指南》一书,作者为Tom White,译者是华东师范大学数据科学与工程学院。本书从Hadoop的缘起开始,由浅入深,结合理论和实践,全方位地介绍Hadoop这一高性能处理海量数据集的理想工具。全书共16章,3个附录,涉及的主题包括:Haddoop;MapReduce;Hadoop分布式文件系统;Hadoop的I/O、MapReduce应用程序开发;MapReduce的工作机制;MapReduce的类型和格式;MapReduce的特性;如何构建Hadoop集群,如何管理Hadoop;Pig;HBase;Hive;ZooKeeper;开源工具Sqoop,最后还提供了丰富的案例分析。本书是Hadoop权威参考,程序员可从中探索如何分析海量数据集,管理员可以从中了解如何安装与运行Hadoop集群。
下文包括了第二章的所有内容:
2.1 气象数据集
2.2 使用Unix工具来分析数据
2.3 使用Hadoop来分析数据
2.4 横向扩展
2.5 Hadoop Streaming
2.6 Hadoop Pipes
MapReduce是一种可用于数据处理的编程模型。该模型比较简单,但要想写出有用的程序却不太容易。Hadoop可以运行各种语言版本的MapReduce程序。在本章中,我们将看到同一个程序的Java、Ruby、Python 和C++语言版本。最重要的是,MapReduce程序本质上是并行运行的,因此可以将大规模的数据分析任务分发给任何一个拥有足够多机器的数据中心。MapReduce的优势在于处理大规模数据集,所以这里先来看一个数据集。
2.1 气象数据集
在我们的例子里,要写一个挖掘气象数据的程序。分布在全球各地的很多气象传感器每隔一小时收集气象数据和收集大量日志数据,这些数据是半结构化数据且是按照记录方式存储的,因此非常适合使用MapReduce来分析。
数据格式
我们使用的数据来自美国国家气候数据中心(National Climatic Data Center,简称NCDC,http://www.ncdc.noaa.gov/)。这些数据按行并以ASCII格式存储,其中每一行是一条记录。该存储格式支持丰富的气象要素,其中许多要素可以选择性地列入收集范围或其数据所需的存储长度是可变的。为了简单起见,我们重点讨论一些基本要素(比如气温),这些要素始终都有而且长度都是固定的。
范例2-1显示了一行采样数据,其中重要字段被突出显示。该行数据被分成很多行以突出每个字段,但在实际文件中,这些字段被整合成一行且没有任何分隔符。
0057 332130 # USAF weather station identifier 99999 # WBAN weather station identifier 19500101 # observation date 0300 # observation time 4 +51317 # latitude (degrees x 1000) +028783 # longitude (degrees x 1000) FM-12 +0171 # elevation (meters) 99999 V020 320 # wind direction (degrees) 1 # quality code N 0072 1 00450 # sky ceiling height (meters) 1 # quality code C N 010000 # visibility distance (meters) 1 # quality code N 9 -0128 # air temperature (degrees Celsius x 10) 1 # quality code -0139 # dew point temperature (degrees Celsius x 10) 1 # quality code 10268 # atmospheric pressure (hectopascals x 10) 1 # quality code数据文件按照日期和气象站进行组织。从1901 年到2001 年,每一年都有一个目录,每一个目录中包含各个气象站该年气象数据的打包文件及其说明文件。例如,1999年对应文件夹下面就包含下面的记录:
% ls raw/1990 | head 010010-99999-1990.gz 010014-99999-1990.gz 010015-99999-1990.gz 010016-99999-1990.gz 010017-99999-1990.gz 010030-99999-1990.gz 010040-99999-1990.gz 010080-99999-1990.gz 010100-99999-1990.gz 010150-99999-1990.gz因为有成千上万个气象台,所以整个数据集由大量的小文件组成。通常情况下,处理少量的大型文件更容易、更有效,因此,这些数据需要经过预处理,将每年的数据文件拼接成一个单独的文件。具体做法请参见附录C。
2.2 使用Unix工具来分析数据
在这个数据集中,每年全球气温的最高记录是多少?我们先不使用Hadoop来解决这个问题,因为只有提供了性能基准和结果检查工具,才能和Hadoop进行有效对比。
传统处理按行存储数据的工具是awk。范例2-2是一个程序脚本,用于计算每年的最高气温。
范例2-2. 该程序从NCDC气象记录中找出每年最高气温
#!/usr/bin/env bash for year in all/* do echo -ne `basename $year .gz`"\t" gunzip -c $year | \ awk'{ temp = substr($0, 88, 5) + 0; q = substr($0, 93, 1); if ( temp!=9999 && q ~ /[01459]/ && temp > max) max = temp} END { print max }' done这个脚本循环遍历按年压缩的数据文件,首先显示年份,然后使用awk处理每一个文件。awk从数据中提取两个字段:气温和质量代码。气温值加0后转换为整数。接着测试气温值是否有效(用9999替代NCDC 数据集中的缺失的值),通过质量代码来检测读取的数值是否有疑问或错误。如果数据读取正确,那么该值将与目前读取到的最大气温值进行比较,如果该值比原先的最大值大,就替换目前的最大值。处理完文件中所有的行后,再执行END块中的代码并在屏幕上输出最大气温值。
下面是某次运行结果的起始部分:
% ./max_temperature.sh 1901 317 1902 244 1903 289 1904 256 1905 283 ...由于源文件中的气温值被放大10倍,所以1901年的最高气温是31.7℃(20世纪初记录的气温数据比较少,所以这个结果也是可能的)。使用亚马逊的EC2High-CPU Extra Large Instance运行这个程序,只需要42分钟就可以处理完一个世纪的气象数据,找出最高气温。
为了加快处理速度,我们需要并行处理程序来进行数据分析。从理论上讲,这很简单:我们可以使用计算机上所有可用的硬件线程(hardware thread)来处理,每个线程负责处理不同年份的数据。但这样做仍然存在一些问题。
首先,将任务划分成大小相同的作业通常并不是一件容易的事情。在我们这个例子中,不同年份数据文件的大小差异很大,所以有一部分线程会比其他线程更早结束运行。即使可以再为它们分配下一个作业,但总的运行时间仍然取决于处理最长文件所需要的时间。另一种更好的方法是将输入数据分成固定大小的块(chunk),然后每块分到各个进程去执行,这样一来,即使有一些进程可以处理更多数据,我们也可以为它们分配更多的数据。
其次,合并各个独立进程的运行结果,可能还需要额外进行处理。在我们的例子中,每年的结果独立于其他年份,所以可能需要把所有结果拼接起来,然后再按年份进行排序。如果使用固定块大小的方法,则需要一种精巧的方法来合并结果。在这个例子中,某年的数据通常被分割成几个块,每个块独立处理。我们最终获得每个块的最高气温,所以最后一步找出最大值作为该年的最高气温,其他年份的数据都像这样处理。
最后,还是得受限于单台计算机的处理能力。即便开足马力,用上所有处理器,至少也得花20分钟,系统无法更快了。另外,某些数据集的增长可能会超出单台计算机的处理能力。一旦开始使用多台计算机,整个大环境中的其他因素就会互相影响,最主要的两个因素是协调性和可靠性。哪个进程负责运行整个作业?我们如何处理失败的进程?
因此,虽然并行处理是可行的,不过实际上也很麻烦。使用Hadoop这样的框架来解决这些问题很有帮助。
2.3 使用Hadoop来分析数据
为了充分利用Hadoop 提供的并行处理优势,我们需要将查询表示成MapReduce 作业。完成某种本地端的小规模测试之后,就可以把作业部署到在集群上运行。
2.3.1 map和reduce
MapReduce任务过程分为两个处理阶段:map阶段和reduce阶段。每个阶段都以键值对作为输入和输出,其类型由程序员来选择。程序员还需要写两个函数:map函数和reduce 函数。
map阶段的输入是NCDC原始数据。我们选择文本格式作为输入格式,将数据集的每一行作为文本输入。键是某一行起始位置相对于文件起始位置的偏移量,不过我们不需要这个信息,所以将其忽略。
我们的map函数很简单。由于我们只对年份和气温属性感兴趣,所以只需要取出这两个字段数据。在本例中,map函数只是一个数据准备阶段,通过这种方式来准备数据,使reducer函数能够继续对它进行处理:即找出每年的最高气温。map函数还是一个比较适合去除已损记录的地方:此处,我们筛掉缺失的、可疑的或错误的气温数据。
为了全面了解map 的工作方式,我们考虑以下输入数据的示例数据(考虑到篇幅,去除了一些未使用的列,并用省略号表示):
0067011990999991950051507004...9999999N9+00001+99999999999... 0043011990999991950051512004...9999999N9+00221+99999999999... 0043011990999991950051518004...9999999N9-00111+99999999999... 0043012650999991949032412004...0500001N9+01111+99999999999... 0043012650999991949032418004...0500001N9+00781+99999999999...这些行以键/值对的方式作为map函数的输入:
(0, 0067011990999991950051507004...9999999N9+00001+99999999999...) (106, 0043011990999991950051512004...9999999N9+00221+99999999999...) (212, 0043011990999991950051518004...9999999N9-00111+99999999999...) (318, 0043012650999991949032412004...0500001N9+01111+99999999999...) (424, 0043012650999991949032418004...0500001N9+00781+99999999999...)键(key)是文件中的行偏移量,map函数并不需要这个信息,所以将其忽略。map函数的功能仅限于提取年份和气温信息(以粗体显示),并将它们作为输出(气温值已用整数表示):
(1950, 0)(1950, 22)(1950, −11) (1949, 111) (1949, 78)map函数的输出经由MapReduce框架处理后,最后发送到reduce函数。这个处理过程基于键来对键值对进行排序和分组。因此,在这一示例中,reduce函数看到的是如下输入:
(1949, [111, 78]) (1950, [0, 22, −11])每一年份后紧跟着一系列气温数据。reduce函数现在要做的是遍历整个列表并从中找出最大的读数:
(1949, 111) (1950, 22)
这是最终输出结果:每一年的全球最高气温记录。
整个数据流如图2-1所示。在图的底部是Unix管线,用于模拟整个MapReduce的流程,部分内容将在讨论Hadoop Streaming时再次涉及。
图2-1. MapReduce的逻辑数据流
2.3.2 Java MapReduce
明白MapReduce 程序的工作原理之后,下一步就是写代码实现它。我们需要三样东西:一个map 函数、一个reduce 函数和一些用来运行作业的代码。map函数由Mapper 类实现来表示,后者声明一个map()虚方法。范例2-3显示了我们的map函数实现。
范例2-3. 查找最高气温的Mapper类
import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private static final int MISSING = 9999; @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs airTemperature = Integer.parseInt(line.substring(88, 92)); } else { airTemperature = Integer.parseInt(line.substring(87, 92)); } String quality = line.substring(92, 93); if (airTemperature != MISSING && quality.matches("[01459]")) { context.write(new Text(year), new IntWritable(airTemperature)); } }}
这个Mapper类是一个泛型类型,它有四个形参类型,分别指定map函数的输入键、输入值、输出键和输出值的类型。就现在这个例子来说,输入键是一个长整数偏移量,输入值是一行文本,输出键是年份,输出值是气温(整数)。Hadoop本身提供了一套可优化网络序列化传输的基本类型,而不直接使用Java内嵌的类型。这些类型都在org.apache.hadoop.io包中。这里使用LongWritable类型(相当于Java的Long类型)、Text类型(相当于Java中的String类型)和IntWritable类型(相当于Java的Integer类型)。
map()方法的输入是一个键和一个值。我们首先将包含有一行输入的Text值转换成Java的String类型,之后使用substring()方法提取我们感兴趣的列。
map()方法还提供了Context实例用于输出内容的写入。在这种情况下,我们将年份数据按Text对象进行读/写 (因为我们把年份当作键),将气温值封装在IntWritable 类型中。只有气温数据不缺并且所对应质量代码显示为正确的气温读数时,这些数据才会被写入输出记录中。
以类似方法用Reducer来定义reduce函数,如范例2-4所示。
范例2-4. 查找最高气温的Reducer类
import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int maxValue = Integer.MIN_VALUE; for (IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); } context.write(key, new IntWritable(maxValue)); }}
同样,reduce函数也有四个形式参数类型用于指定输入和输出类型。reduce 函数的输入类型必须匹配map 函数的输出类型:即Text类型和IntWritable类型。在这种情况下,reduce函数的输出类型也必须是Text和IntWritable类型,分别输出年份及其最高气温。这个最高气温是通过循环比较每个气温与当前所知最高气温所得到的。
第三部分代码负责运行MapReduce 作业(请参见范例2-5)。
范例2-5. 这个应用程序在气象数据集中找出最高气温
import java.io.IOException;import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.input.FileOutputFormat; import org.apache.hadoop.mapredduce.input.FileOutputFormatpublic class MaxTemperature { public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: MaxTemperature <input path> <output path>"); System.exit(-1); } Job job = new Job(); job.setJarByClass(MaxTemperature.class); job.setJobName("Max temperature"); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(MaxTemperatureMapper.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); }}Job对象指定作业执行规范。我们可以用它来控制整个作业的运行。我们在Hadoop 集群上运行这个作业时,要把代码打包成一个JAR文件(Hadoop在集群上发布这个文件)。不必明确指定JAR文件的名称,在Job对象的setJarByClass()方法中传递一个类即可,Hadoop利用这个类来查找包含它的JAR文件,进而找到相关的JAR文件。
构造Job对象之后,需要指定输入和输出数据的路径。调用 FileInputFormat类的静态方法addInputPath()来定义输入数据的路径,这个路径可以是单个的文件、一个目录(此时,将目录下所有文件当作输入)或符合特定文件模式的一系列文件。由函数名可知,可以多次调用addInputPath()来实现多路径的输入。
调用FileOutputFormat 类中的静态方法 setOutputPath()来指定输出路径(只能有一个输出路径)。这个方法指定的是reduce 函数输出文件的写入目录。在运行作业前该目录是不应该存在的,否则Hadoop 会报错并拒绝运行作业。这种预防措施的目的是防止数据丢失(长时间运行的作业如果结果被意外覆盖,肯定是非常恼人的)。
接着,通过setMapperClass()和setReducerClass()指定map类型和reduce类型。
setOutputKeyClass()和setOutputValueClass()控制map和reduce函数的输出类型,正如本例所示,这两个输出类型一般都是相同的。如果不同,则通过setMapOutputKeyClass()和setMapOutputValueClass()来设置map函数的输出类型。
输入的类型通过InputFormat类来控制,我们的例子中没有设置,因为使用的是默认的TextInputFormat(文本输入格式)。
在设置定义map 和reduce 函数的类之后,可以开始运行作业。Job中的waitForCompletion()方法提交作业并等待执行完成。该方法中的布尔参数是个详细标识,所以作业会把进度写到控制台。
waitForCompletion()方法返回一个布尔值,表示执行的成 (true)败(false),这个布尔值被转换成程序的退出代码0或者1。
2.3.1.1 运行测试
写好MapReduce 作业之后,通常要拿一个小型数据集进行测试以排除代码问题。首先,以独立(本机)模式安装Hadoop,详细说明请参见附录A。在这种模式下,Hadoop在本地文件系统上运行作业程序。然后,使用本书网站上的指令安装和编译示例。
以前面讨过的5行采样数据为例来测试MapReduce作业(考虑到篇幅,这里对输出稍有修改):
% export HADOOP_CLASSPATH=hadoop-examples.jar% hadoop MaxTemperature input/ncdc/sample.txt output12/02/04 11:50:41 WARN util.NativeCodeLoader: Unable to load native-hadoop libraryfor your platform... using builtin-java classes where applicable12/02/04 11:50:41 WARN mapred.JobClient: Use GenericOptionsParser for parsing thearguments. Applications should implement Tool for the same.12/02/04 11:50:41 INFO input.FileInputFormat: Total input paths to process : 112/02/04 11:50:41 INFO mapred.JobClient: Running job: job_local_000112/02/04 11:50:41 INFO mapred.Task: Using ResourceCalculatorPlugin : null12/02/04 11:50:41 INFO mapred.MapTask: io.sort.mb = 10012/02/04 11:50:42 INFO mapred.MapTask: data buffer = 79691776/9961472012/02/04 11:50:42 INFO mapred.MapTask: record buffer = 262144/32768012/02/04 11:50:42 INFO mapred.MapTask: Starting flush of map output12/02/04 11:50:42 INFO mapred.MapTask: Finished spill 012/02/04 11:50:42 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting12/02/04 11:50:42 INFO mapred.JobClient: map 0% reduce 0%12/02/04 11:50:44 INFO mapred.LocalJobRunner:12/02/04 11:50:44 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.12/02/04 11:50:44 INFO mapred.Task: Using ResourceCalculatorPlugin : null12/02/04 11:50:44 INFO mapred.LocalJobRunner:12/02/04 11:50:44 INFO mapred.Merger: Merging 1 sorted segments12/02/04 11:50:44 INFO mapred.Merger: Down to the last merge-pass, with 1 segmentsleft of total size: 57 bytes12/02/04 11:50:44 INFO mapred.LocalJobRunner:12/02/04 11:50:45 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. Andis in the process of commiting12/02/04 11:50:45 INFO mapred.LocalJobRunner:12/02/04 11:50:45 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed tocommit now12/02/04 11:50:45 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to output12/02/04 11:50:45 INFO mapred.JobClient: map 100% reduce 0%12/02/04 11:50:47 INFO mapred.LocalJobRunner: reduce > reduce12/02/04 11:50:47 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.12/02/04 11:50:48 INFO mapred.JobClient: map 100% reduce 100%12/02/04 11:50:48 INFO mapred.JobClient: Job complete: job_local_000112/02/04 11:50:48 INFO mapred.JobClient: Counters: 1712/02/04 11:50:48 INFO mapred.JobClient: File Output Format Counters12/02/04 11:50:48 INFO mapred.JobClient: Bytes Written=2912/02/04 11:50:48 INFO mapred.JobClient: FileSystemCounters12/02/04 11:50:48 INFO mapred.JobClient: FILE_BYTES_READ=35750312/02/04 11:50:48 INFO mapred.JobClient: FILE_BYTES_WRITTEN=42581712/02/04 11:50:48 INFO mapred.JobClient: File Input Format Counters12/02/04 11:50:48 INFO mapred.JobClient: Bytes Read=52912/02/04 11:50:48 INFO mapred.JobClient: Map-Reduce Framework12/02/04 11:50:48 INFO mapred.JobClient: Map output materialized bytes=6112/02/04 11:50:48 INFO mapred.JobClient: Map input records=512/02/04 11:50:48 INFO mapred.JobClient: Reduce shuffle bytes=012/02/04 11:50:48 INFO mapred.JobClient: Spilled Records=1012/02/04 11:50:48 INFO mapred.JobClient: Map output bytes=4512/02/04 11:50:48 INFO mapred.JobClient: Total committed heap usage (bytes)=36923801612/02/04 11:50:48 INFO mapred.JobClient: SPLIT_RAW_BYTES=12912/02/04 11:50:48 INFO mapred.JobClient: Combine input records=012/02/04 11:50:48 INFO mapred.JobClient: Reduce input records=512/02/04 11:50:48 INFO mapred.JobClient: Reduce input groups=212/02/04 11:50:48 INFO mapred.JobClient: Combine output records=012/02/04 11:50:48 INFO mapred.JobClient: Reduce output records=212/02/04 11:50:48 INFO mapred.JobClient: Map output records=5如果调用hadoop命令的第一个参数是类名,Hadoop就会启动一个JVM(Java虚拟机)来运行这个类。使用hadoop命令运行作业比直接使用Java命令来运行更方便,因为前者将Hadoop库文件(及其依赖关系)路径加入到类路径参数中,同时也能获得Hadoop的配置文件。需要定义一个HADOOP_CLASSPATH 环境变量用于添加应用程序类的路径,然后由Hadoop 脚本来执行相关操作。
以本地(独立)模式运行时,本书中所有程序均假设按照这种方式来设置HADOOP_CLASSPATH。命令的运行需要在范例代码所在的文件夹下进行。
运行作业所得到的输出提供了一些有用的信息。例如,我们可以看到,这个作业有指定的标识,即job_local_0001,并且执行了一个map 任务和一个reduce 任务(使用attempt_local_0001_m_000000_0和attempt_ local_0001_r_000000_0两个ID)。在调试MapReduce作业时,知道作业ID和任务ID 是非常有用的。
输出的最后一部分,以Counters为标题,显示Hadoop 上运行的每个作业的一些统计信息。这些信息对检查数据是否按照预期进行处理非常有用。例如,我们查看系统输出的记录信息可知:5个map输入产生了5个map输出,然后5个reduce 输入产生2个reduce 输出。
输出数据写入output目录,其中每个reducer都有一个输出文件。我们的例子中只有一个 reducer,所以只能找到一个名为part-00000的文件:
% cat output/part-00000 1949 111 1950 22这个结果和我们之前手动寻找的结果一样。我们把这个结果解释为1949年的最高气温记录为11.1℃,而1950年为2.2℃。
2.3.1.2 旧的和新的Java MapReduce API
前一小节中使用的Java MapReduce API率先在Hadoop 0.20.0中发布。这一新的API,有时也称为“上下文对象”(context object),设计意图是使API日后更容易扩展。新API 在类型上不兼容旧的API,所以需要重写以前的应用程序才能使新的API发挥作用。
除了缺失的极少数MapReduce类库之外(请查看最新发行版本,以确定org.apache.hadoop.mapreduce.lib的子程序包中是否包含自己想要的类库),新的API在最新发布的1.x系列(该系列是0.20系列的后继版本)中得以显著改善。
本书的前两个版本是基于0.20发行版本的,一直使用的是旧的API。除了极少几个地方,本书中将新的API作为主要使用的API。因为本书网站上针对书中的范例提供了使用旧的API的代码,所以你希望使用旧的API也是可以的。(部分早期的0.20发行版本反对使用旧API,但是在后续版本中可以继续使用旧API,因此1.x和2.x发行版本同时支持新旧API,而不会提示废弃旧API的警告。)
新旧API之间有如下几个明显的区别。
新API 倾向于使用虚类,而不是接口,因为更有利于扩展。这意味着用不着修改类的实现,即可在虚类中添加一个方法(即默认的实现)。在旧API中使用Mapper和Reducer接口,而在新API 中使用虚类。
新API放在org.apache.hadoop.mapreduce包(和子包)中。之前版本的API依旧放在org.apache.hadoop.mapred中。 l新API充分使用上下文对象,使用户代码能与MapReduce系统通信。例如,新的Context基本统一了旧API中的JobConf、OutputCollector和Reporter的功能。 键/值对记录在这两类API中都被推给mapper和reducer,但除此之外,新的API 通过重写run()方法允许mapper和reducer控制执行流程。例如,既可以批处理记录,也可以在处理完所有的记录之前停止。在旧API中可以通过写MapRunnable类在mapper中实现上述功能,但是在reducer中没有对等的实现。 新的API中作业控制由Job类实现,而非旧API中的JobClient类,新的API中删除了JobClient类。 新增的API实现了配置的统一。旧API 通过一个特殊的JobConf 对象配置作业,该对象是Hadoop配置对象的一个扩展(用于配置守护进程,详情请参见5.1节的“API配置”)。在新API 中,作业的配置由Configuration(或许通过Job类中的一些辅助方法)来完成。 输出文件的命名方式稍有不同。在旧的API中map和reduce的输出被统一命名为part-nnmm,但是在新API中map的输出文件名为part-m-nnnnn,而reduce的输出文件名为part-r-nnnnn(其中nnnnn是从0开始的表示分块序号的整数)。 新API中的用户重载函数被声明为抛出异常java.lang.InterruptedException。这意味着可以用代码来实现中断响应,从而使该框架在必要时可以优雅地取消需长时间运行的作业。 在新的API中,reduce()传递的值是java.lang.Iterable类型的,而非java.lang.Iterator类型(旧API中传递该类型的值)。这一改变使我们更容易通过Java的for-each循环结构来来迭代这些值。
For (VALUEIN valueLvalues){…}范例2-6所示为使用旧API重写的MaxTemperature应用。不同的地方已经加粗显示。
将Mapper和Reducer类转换为新API时,记住将map()和reduce()的签名转换为新形式。如果只是将类的继承修改为对新的Mapper和Reducer类的继承,编译的时候不会报错或显示警告信息,因为新的Mapper和Reducer类也同样提供了等价的map()和reduce()函数。但是,自己写的mapper或reducer代码是不会被调用的,这会导致难以诊断的错误。
对map()和reduce()方法添加@override注释,Java编译器会发现这些错误。
范例2-6. 使用旧MapReduce API重写后的MaxTemperature应用
public class OldMaxTemperature { static class OldMaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private static final int MISSING = 9999; @Override public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs airTemperature = Integer.parseInt(line.substring(88, 92)); } else { airTemperature = Integer.parseInt(line.substring(87, 92)); } String quality = line.substring(92, 93); if (airTemperature != MISSING && quality.matches("[01459]")) { output.collect(new Text(year), new IntWritable(airTemperature)); } } } static class OldMaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int maxValue = Integer.MIN_VALUE; while (values.hasNext()) { maxValue = Math.max(maxValue, values.next().get()); } output.collect(key, new IntWritable(maxValue)); }} public static void main(String[] args) throws IOException { if (args.length != 2) { System.err.println("Usage: OldMaxTemperature <input path> <output path>"); System.exit(-1); } JobConf conf = new JobConf(OldMaxTemperature.class); conf.setJobName("Max temperature"); FileInputFormat.addInputPath(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); conf.setMapperClass(OldMaxTemperatureMapper.class); conf.setReducerClass(OldMaxTemperatureReducer.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); JobClient.runJob(conf); }}2.4 横向扩展
前面介绍了MapReduce针对少量输入数据是如何工作的,现在我们开始鸟瞰整个系统以及有大量输入时的数据流。为了简单起见,到目前为止,我们的例子都只是用了本地文件系统中的文件。然而,为了实现横向扩展(scaling out),我们需要把数据存储在分布式文件系统中,一般为HDFS (详见第3章),由此允许Hadoop将MapReduce 计算转移到存储有部分数据的各台机器上。下面我们看看具体过程。
2.4.1 数据流
首先定义一些术语。MapReduce作业(job) 是客户端需要执行的一个工作单元:它包括输入数据、MapReduce程序和配置信息。Hadoop将作业分成若干个小任务 (task)来执行,其中包括两类任务:map任务和reduce任务。
有两类节点控制着作业执行过程:一个jobtracker及一系列tasktracker。jobtracker通过调度tasktracker上运行的任务来协调所有运行在系统上的作业。tasktracker在运行任务的同时将运行进度报告发送给jobtracker,jobtracker由此记录每项作业任务的整体进度情况。如果其中一个任务失败,jobtracker可以在另外一个tasktracker节点上重新调度该任务。
Hadoop将MapReduce的输入数据划分成等长的小数据块,称为输入分片(input split)或简称“分片”。Hadoop为每个分片构建一个map任务,并由该任务来运行用户自定义的map 函数从而处理分片中的每条记录。
拥有许多分片,意味着处理每个分片所需要的时间少于处理整个输入数据所花的时间。因此,如果我们并行处理每个分片,且每个分片数据比较小,那么整个处理过程将获得更好的负载平衡,因为一台较快的计算机能够处理的数据分片比一台较慢的计算机更多,且成一定的比例。即使使用相同的机器,失败的进程或其他同时运行的作业能够实现满意的负载平衡,并且如果分片被切分得更细,负载平衡的会更高。
另一方面,如果分片切分得太小,那么管理分片的总时间和构建map 任务的总时间将决定作业的整个执行时间。对于大多数作业来说,一个合理的分片大小趋向于HDFS的一个块的大小,默认是64 MB,不过可以针对集群调整这个默认值(对新建的所有文件),或对新建的每个文件具体指定。
Hadoop在存储有输入数据(HDFS中的数据)的节点上运行map任务,可以获得最佳性能。这就是所谓的“数据本地化优化”(data locality optimization),因为它无需使用宝贵的集群带宽资源。但是,有时对于一个map任务的输入来说,存储有某个HDFS数据块备份的三个节点可能正在运行其他map任务,此时作业调度需要在三个备份中的某个数据寻求同个机架中空闲的机器来运行该map任务。仅仅在非常偶然的情况下(该情况基本上不会发生),会使用其他机架中的机器运行该map任务,这将导致机架与机架之间的网络传输。
图2-2显示了这三种可能性。
图2-2. 本地数据(a)、本地机架(b)和跨机架(c)map任务
现在我们应该清楚为什么最佳分片的大小应该与块大小相同:因为它是确保可以存储在单个节点上的最大输入块的大小。如果分片跨越两个数据块,那么对于任何一个HDFS节点,基本上都不可能同时存储这两个数据块,因此分片中的部分数据需要通过网络传输到map任务节点。与使用本地数据运行整个map任务相比,这种方法显然效率更低。
map任务将其输出写入本地硬盘,而非HDFS。这是为什么?因为map的输出是中间结果:该中间结果由reduce任务处理后才产生最终输出结果,而且一旦作业完成,map的输出结果就可以删除。因此,如果把它存储在HDFS中并实现备份,难免有些小题大做。如果该节点上运行的map任务在将map 中间结果传送给reduce 任务之前失败,Hadoop将在另一个节点上重新运行这个map任务以再次构建map中间结果。
reduce任务并不具备数据本地化的优势——单个reduce任务的输入通常来自于所有mapper的输出。在本例中,我们仅有一个reduce 任务,其输入是所有map任务的输出。因此,排过序的map输出需通过网络传输发送到运行reduce 任务的节点。数据在reduce端合并,然后由用户定义的reduce 函数处理。reduce的输出通常存储在HDFS中以实现可靠存储。如第3章所述,对于每个reduce 输出的HDFS块,第一个复本存储在本地节点上,其他复本存储在其他机架节点中。因此,将reduce的输出写入HDFS确实需要占用网络带宽,但这与正常的HDFS流水线写入的消耗一样。
一个reduce任务的完整数据流如图2-3所示。虚线框表示节点,虚线箭头表示节点内部的数据传输,而实线箭头表示不同节点之间的数据传输。
图2-3. 一个reduce任务的MapReduce数据流
reduce任务的数量并非由输入数据的大小决定,而事实上是独立指定的。7.1.1节将介绍如何为指定的作业选择reduce任务的数量。
如果有好多个reduce任务,每个map任务就会针对输出进行分区(partition),即为每个reduce任务建一个分区。每个分区有许多键(及其对应的值),但每个键对应的键/值对记录都在同一分区中。分区由用户定义的partition函数控制,但通常用默认的partitioner通过哈希函数来分区,很高效。
一般情况下,多个reduce任务的数据流如图2-4所示。该图清楚地表明了为什么map任务和reduce任务之间的数据流称为shuffle(混洗),因为每个reduce 任务的输入都来自许多map任务。shuffle一般比图中所示的更复杂,而且调整混洗参数对作业总执行时间的影响非常大,详情参见6.4节。
最后,当数据处理可以完全并行,即无需混洗时,可能会出现无reduce任务的情况(示例参见7.2.2节)。在这种情况下,唯一的非本地节点数据传输是map任务将结果写入HDFS(参见图2-5)。
图2-3. 多个reduce任务的数据流
2.4.2 combiner函数
集群上的可用带宽限制了MapReduce作业的数量,因此尽量避免map和reduce任务之间的数据传输是有利的。Hadoop允许用户针对map任务的输出指定一个combiner(就像mapper和reducer一样)——combiner函数的输出作为reduce函数的输入。由于combiner属于优化方案,所以Hadoop无法确定要对map任务输出记录调用多少次combiner (如果需要)。换而言之,不管调用combiner多少次,0次、1次或多次,reducer的输出结果都是一样的。
combiner的规则制约着可用的函数类型。这里最好用一个例子来说明。还是假设以前计算最高气温的例子,1950年的读数由两个map任务处理(因为它们在不同的分片中)。假设第一个map 的输出如下:
(1950, 0) (1950, 20) (1950, 10)第二个map的输出如下:
(1950, 25) (1950, 15)
图2-5. 无reduce任务的MapReduce数据流
reduce函数被调用时,输入如下:
(1950, [0, 20, 10, 25, 15])因为25为该列数据中最大的,所以它的输出如下:
(1950, 25)我们可以像使用reduce函数那样,使用combiner找出每个map任务输出结果中的最高气温。如此一来,reduce函数调用时将被传入以下数据:
(1950, [20, 25]) reduce输出的结果和以前一样。更简单地说,我们可以通过下面的表达式来说明气温数值的函数调用:
max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25
并非所有函数都具有该属性。[ 有此属性的函数叫commutative和associative。有时也将它们称为distributive,比如在Gray等人1995年发表的论文“Data Cube: A Relational Aggregation Operatior Generalizing Groupby, Cross-Tab, and Sub-Totals”中。]例如,如果我们计算平均气温,就不能用平均数作为combiner,因为
mean(0, 20, 10, 25, 15) = 14 但是combiner不能取代reduce函数:
mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15
为什么呢?我们仍然需要reduce函数来处理不同map输出中具有相同键的记录。但它能有效减少mapper和reducer之间的数据传输量,在MapReduce作业中使用combiner函数需要慎重考虑。
指定一个combiner
让我们回到Java MapReduce 程序,combiner是通过Reducer类来定义的,并且在这个例子中,它的实现与MaxTemperatureReducer中的reduce函数相同。唯一的改动是在Job中设置combiner类(参见范例2-7)。
范例2-7. 使用combiner快速找出最高气温
public class MaxTemperatureWithCombiner { public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: MaxTemperatureWithCombiner <input path> " + "<output path>"); System.exit(-1); } Job job = new Job(); job.setJarByClass(MaxTemperatureWithCombiner.class); job.setJobName("Max temperature"); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(MaxTemperatureMapper.class); job.setCombinerClass(MaxTemperatureReducer.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); }}
2.4.3 运行分布式的MapReduce作业
这个程序无需修改便可以在一个完整的数据集上直接运行。这是MapReduce的优势:它可以根据数据量的大小和硬件规模进行扩展。这里有一个运行结果:在一个10节点EC2集群运行High-CPU Extra Large lnstance,程序执行时间只花了6分钟。
我们将在第5章分析在集群上运行程序的机制。
2.5 Hadoop Streaming
Hadoop提供了MapReduce的API,允许你使用非Java的其他语言来写自己的map和reduce函数。HadoopStreaming使用Unix标准流作为Hadoop和应用程序之间的接口,所以我们可以使用任何编程语言通过标准输入/输出来写MapReduce程序。
Streaming天生适合用于文本处理。map的输入数据通过标准输入流传递给map函数,并且是一行一行地传输,最后将结果行写到标准输出。map输出的键/值对是以一个制表符分隔的行,并且写入标准输出reduce 函数的输入格式与之相同(通过制表符来分隔的键/值对)并通过标准输入流进行传输。reduce函数从标准输入流中读取输入行,该输入已由Hadoop框架根据键排过序,最后将结果写入标准输出。
下面使用Streaming来重写按年份查找最高气温的MapReduce程序。
2.5.1 Ruby版本
范例2-8显示了用Ruby编写的map函数。
范例2-8. 用Ruby编写查找最高气温的map函数
#!/usr/bin/env rubySTDIN.each_line do |line| val = line year, temp, q = val[15,4], val[87,5], val[92,1] puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/) end程序通过程序块执行STDIN(一个IO类型的全局常量)中的每一行来迭代执行标准输入中的每一行。该程序块从输入的每一行中取出相关字段,如果气温有效,就将年份以及气温以制表符\t隔开写为标准输出(使用puts)。
值得一提的是Streaming和Java MapReduce API之间的设计差异。Java API控制的map函数一次只处理一条记录。针对输入数据中的每一条记录,该框架均需调用Mapper的map()方法来处理。然而在Streaming中,map程序可以自己决定如何处理输入数据,例如,它可以轻松读取并同时处理若干行,因为它受读操作的控制。用户的Java map实现的是“推”记录方式,但它仍然可以同时处理多行,具体做法是通过mapper中实例变量将之前读取的多行汇聚在一起。[ 另一种方法是,可以在新增的MapReduce API中使用“拉”的方式来处理。详情参见2.3.1节对新旧Java MapReduce API的讨论。]在这种情况下,需要实现close()方法,以便知道何时读到最后一条记录,进而完成对最后一组记录行的处理。
由于这个脚本只能在标准输入和输出上运行,所以最简单的方式是在Unix管道上进行测试,而不是在Hadoop中进行测试:
% cat input/ncdc/sample.txt | ch02/src/main/ruby/max_temperature_map.rb 1950 +0000 1950 +0022 1950 -0011 1949 +0111 1949 +0078
范例2-9显示的reduce函数更复杂一些。
范例2-9. 用Ruby编写的查找最高气温的reduce函数
#!/usr/bin/env rubylast_key, max_val = nil, -1000000 STDIN.each_line do |line| key, val = line.split("\t") if last_key && last_key != key puts "#{last_key}\t#{max_val}" last_key, max_val = key, val.to_i else last_key, max_val = key, [max_val, val.to_i].max end end puts "#{last_key}\t#{max_val}" if last_key同样,程序遍历标准输入中的行,但在我们处理每个键组时,要存储一些状态。在这种情况下,键是年份,我们存储最后一个看到的键和迄今为止见到的该键对应的最高气温。MapReduce框架保证了键的有序性,我们由此可知,如果读到一个键与前一个键不同,就需要开始处理一个新的键组。相比之下,Java API系统提供一个针对每个键组的迭代器,而在Streaming中,需要在程序中找出键组的边界。
我们从每行取出键和值,然后如果正好完成一个键组的处理(last_key & last_key != key),就针对该键组写入该键及其最高气温,用一个制表符来进行分隔,最后开始处理新键组时我们需要重置最高气温值。如果尚未完成对一个键组的处理,那么就只更新当前键的最高气温。
程序的最后一行确保处理完输入的最后一个键组之后,会有一行输出。
现在可以用Unix管线来模拟整个MapReduce管线,该管线与图2-1中显示的Unix管线是相同的:
% cat input/ncdc/sample.txt | ch02/src/main/ruby/max_temperature_map.rb | \ sort | ch02/src/main/ruby/max_temperature_reduce.rb 1949 111 1950 22输出结果和Java程序的一样,所以下一步是通过Hadoop运行它。
hadoop命令不支持Streaming,因此,我们需要在指定Streaming JAR文件流与jar选项时指定。Streaming程序的选项指定了输入和输出路径以及map和reduce脚本。如下所示:
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \ -input input/ncdc/sample.txt \ -output output \ -mapper ch02/src/main/ruby/max_temperature_map.rb \ -reducer ch02/src/main/ruby/max_temperature_reduce.rb
在一个集群上运行一个庞大的数据集时,我们应该使用-combiner选项来设置combiner。
在1.x之后的发行版本,combiner可以是任何一个Streaming命令。对于早期版本,combiner只能用Java写,所以一个变通的方法是在mapper中进行手动合并,从而避开Java语言。在这里,我们可以把mapper改成管线:
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \ -input input/ncdc/all \ -output output \ -mapper "ch02/src/main/ruby/max_temperature_map.rb | sort | ch02/src/main/ruby/max_temperature_reduce.rb" \ -reducer ch02/src/main/ruby/max_temperature_reduce.rb \ -file ch02/src/main/ruby/max_temperature_map.rb \ -file ch02/src/main/ruby/max_temperature_reduce.rb还需注意-file选项的使用,在集群上运行Streaming程序时,我们会使用这个选项,从而将脚本传输到集群。
2.5.2 Python版本
Streaming支持任何可以从标准输入读取和写入到标准输出中的编程语言,因此对于更熟悉Python的读者,下面提供了同一个例子的Python 版本。map脚本参见范例2-10,reduce脚本参见范例2-11。
范例2-10. 用于查找最高气温的map函数(python版)
#!/usr/bin/env pythonimport re import sysfor line in sys.stdin: val = line.strip() (year, temp, q) = (val[15:19], val[87:92], val[92:93]) if (temp != "+9999" and re.match("[01459]", q)): print "%s\t%s" % (year, temp)范例2-11. 用于查找最高气温的reduce函数(python版)
#!/usr/bin/env pythonimport