3.4 Hadoop流
Hadoop流提供了一个API,允许用户使用任何脚本语言写Map函数或Reduce函数。Hadoop流的关键是,它使用UNIX标准流作为程序与Hadoop之间的接口。因此,任何程序只要可以从标准输入流中读取数据并且可以写入数据到标准输出流,那么就可以通过Hadoop流使用其他语言编写MapReduce程序的Map函数或Reduce函数。
举个最简单的例子(本例的运行环境:Ubuntu,Hadoop-0.20.2):
bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -input input -output output -mapper /bin/cat –reducer usr/bin/wc
从这个例子中可以看到,Hadoop流引入的包是hadoop-0.20.2-streaming.jar,并且具有如下命令:
-input 指明输入文件路径
-output 指明输出文件路径
-mapper 指定map函数
-reducer 指定reduce函数
Hadoop流的操作还有其他参数,后面会一一列出。
3.4.1 Hadoop流的工作原理
先来看Hadoop流的工作原理。在上例中,Map和Reduce都是Linux内的可执行文件,更重要的是,它们接受的都是标准输入(stdin),输出的都是标准输出(stdout)。如果大家熟悉Linux,那么对它们一定不会陌生。执行上一节中的示例程序的过程如下所示。
程序的输入与WordCount程序是一样的,具体如下:
file01:
hello world bye world
file02
hello hadoop bye hadoop
输入命令:
bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -input input -output output -mapper /bin/cat -reducer /usr/bin/wc
显示:
packageJobJar: [/root/tmp/hadoop-unjar7103575849190765740/] [] /tmp/streamjob2314757737747407133.jar tmpDir=null
11/01/23 02:07:36 INFO mapred.FileInputFormat: Total input paths to process : 2
11/01/23 02:07:37 INFO streaming.StreamJob: getLocalDirs(): [/root/tmp/mapred/local]
11/01/23 02:07:37 INFO streaming.StreamJob: Running job: job_201101111819_0020
11/01/23 02:07:37 INFO streaming.StreamJob: To kill this job, run:
11/01/23 02:07:37 INFO streaming.StreamJob: /root/hadoop/bin/hadoop job -Dmapred.job.tracker=localhost:9001 -kill job_201101111819_0020
11/01/23 02:07:37 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201101111819_0020
11/01/23 02:07:38 INFO streaming.StreamJob: map 0% reduce 0%
11/01/23 02:07:47 INFO streaming.StreamJob: map 100% reduce 0%
11/01/23 02:07:59 INFO streaming.StreamJob: map 100% reduce 100%
11/01/23 02:08:02 INFO streaming.StreamJob: Job complete: job_201101111819_0020
11/01/23 02:08:02 INFO streaming.StreamJob: Output: output
程序的输出是:
2 8 46
wc命令用来统计文件中的行数、单词数与字节数,可以看到,这个结果是正确的。
Hadoop流的工作原理并不复杂,其中Map的工作原理如图3-4所示(Reduce与其相同)。
当一个可执行文件作为Mapper时,每一个Map任务会以一个独立的进程启动这个可执行文件,然后在Map任务运行时,会把输入切分成行提供给可执行文件,并作为它的标准输入(stdin)内容。当可执行文件运行出结果时,Map从标准输出(stdout)中收集数据,并将其转化为对,作为Map的输出。
Reduce与Map相同,如果可执行文件做Reducer时,Reduce任务会启动这个可执行文件,并且将对转化为行作为这个可执行文件的标准输入(stdin)。然后Reduce会收集这个可执行文件的标准输出(stdout)的内容。并把每一行转化为对,作为Reduce的输出。
Map与Reduce将输出转化为对的默认方法是:将每行的第一个tab符号(制表符)之前的内容作为key,之后的内容作为value。如果没有tab符号,那么这一行的所有内容会作为key,而value值为null。当然这是可以更改的。
值得一提的是,可以使用Java类作为Map,而用一个可执行程序作为Reduce;或使用Java类作为Reduce,而用可执行程序作为Map。例如:
/bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar
-input myInputDirs -output myOutputDir –mapper
org.apache.hadoop.mapred.lib.IdentityMapper -reducer /bin/wc
3.4.2 Hadoop流的命令
Hadoop流提供自己的流命令选项及一个通用的命令选项,用于设置Hadoop流任务。首先介绍一下流命令。
- Hadoop流命令选项
表3-1所示的Hadoop流命令中,必选的4个很好理解,分别用于指定输入/输出文件的位置及Map/Reduce函数。在其他的可选命令中,这里我们只解释常用的几个。
-file
-file指令用于将文件加入到Hadoop的Job中。上面的例子中,cat和wc都是Linux系统中的命令,而在Hadoop流的使用中,往往需要使用自己写的文件(作为Map函数或Reduce函数)。一般而言,这些文件是Hadoop集群中的机器上没有的,这时就需要使用Hadoop流中的-file命令将这个可执行文件加入到Hadoop的Job中。-combiner
这个命令用来加入combiner程序。
-inputformat和-outputformat
这两个命令用来设置输入输出文件的处理方法,这两个命令后面的参数必须是Java类。
- Hadoop流通用的命令选项
Hadoop流的通用命令用来配置Hadoop流的Job。需要注意的是,如果使用这部分配置,就必须将其置于流命令配置之前,否则命令会失败。这里简要列出命令列表(如表3-2所示),供大家参考。
3.4.3 两个例子
从上面的内容可以知道,Hadoop流的API是一个扩展性非常强的框架,它与程序相连的部分只有数据,因此可以接受任何适用于UNIX标准输入/输出的脚本语言,比如Bash、PHP、Ruby、Python等。
下面举两个非常简单的例子来进一步说明它的特性。- Bash
MapReduce框架是一个非常适合在大规模的非结构化数据中查找数据的编程模型,grep就是这种类型的一个例子。
在Linux中,grep命令用来在一个或多个文件中查找某个字符模式(这个字符模式可以代表字符串,多用正则表达式表示)。
下面尝试在如下的数据中查找带有Hadoop字符串的行,如下所示。
输入文件为:file01: hello world bye world file02: hello hadoop bye hadoop reduce文件为: reduce.sh: grep hadoop 输入命令为: bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -input input -output output -mapper /bin/cat -reducer ~/Desktop/test/reducer.sh -file ~/Desktop/test/reducer.sh 结果为: hello hadoop bye hadoop
显然,这个结果是正确的。
- Python
对于Python来说,情况有些特殊。因为Python是可以编译为JAR包的,如果将程序编译为JAR包,那么就可以采用运行JAR包的方式来运行了。
不过,同样也可以用流的方式运行Python程序。请看如下代码:
Reduce.py #!/usr/bin/python import sys; def generateLongCountToken(id): return "LongValueSum:" + id + "\t" + "1" def main(argv): line = sys.stdin.readline(); try: while line: line = line[:-1]; fields = line.split("\t"); print generateLongCountToken(fields[0]); line = sys.stdin.readline(); except "end of file": return None if __name__ == "__main__": main(sys.argv) 使用如下命令来运行: bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -input input -output pyoutput -mapper reduce.py -reducer aggregate -file reduce.py
注意其中的aggregate是Hadoop提供的一个包,它提供一个Reduce函数和一个combine函数。这个函数实现一些简单的类似求和、取最大值最小值等的功能。
- Hadoop流通用的命令选项