《Hadoop实战第2版》——3.4节Hadoop流

3.4 Hadoop流
Hadoop流提供了一个API,允许用户使用任何脚本语言写Map函数或Reduce函数。Hadoop流的关键是,它使用UNIX标准流作为程序与Hadoop之间的接口。因此,任何程序只要可以从标准输入流中读取数据并且可以写入数据到标准输出流,那么就可以通过Hadoop流使用其他语言编写MapReduce程序的Map函数或Reduce函数。
举个最简单的例子(本例的运行环境:Ubuntu,Hadoop-0.20.2):

bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -input input -output output -mapper /bin/cat –reducer usr/bin/wc

从这个例子中可以看到,Hadoop流引入的包是hadoop-0.20.2-streaming.jar,并且具有如下命令:
-input 指明输入文件路径
-output 指明输出文件路径
-mapper 指定map函数
-reducer 指定reduce函数
Hadoop流的操作还有其他参数,后面会一一列出。

3.4.1 Hadoop流的工作原理
先来看Hadoop流的工作原理。在上例中,Map和Reduce都是Linux内的可执行文件,更重要的是,它们接受的都是标准输入(stdin),输出的都是标准输出(stdout)。如果大家熟悉Linux,那么对它们一定不会陌生。执行上一节中的示例程序的过程如下所示。
程序的输入与WordCount程序是一样的,具体如下:

file01:
hello world bye world
file02
hello hadoop bye hadoop
输入命令:
bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -input input -output output -mapper /bin/cat -reducer /usr/bin/wc
显示:
packageJobJar: [/root/tmp/hadoop-unjar7103575849190765740/] [] /tmp/streamjob2314757737747407133.jar tmpDir=null
11/01/23 02:07:36 INFO mapred.FileInputFormat: Total input paths to process : 2
11/01/23 02:07:37 INFO streaming.StreamJob: getLocalDirs(): [/root/tmp/mapred/local]
11/01/23 02:07:37 INFO streaming.StreamJob: Running job: job_201101111819_0020
11/01/23 02:07:37 INFO streaming.StreamJob: To kill this job, run:
11/01/23 02:07:37 INFO streaming.StreamJob: /root/hadoop/bin/hadoop job  -Dmapred.job.tracker=localhost:9001 -kill job_201101111819_0020
11/01/23 02:07:37 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201101111819_0020
11/01/23 02:07:38 INFO streaming.StreamJob:  map 0%  reduce 0%
11/01/23 02:07:47 INFO streaming.StreamJob:  map 100%  reduce 0%
11/01/23 02:07:59 INFO streaming.StreamJob:  map 100%  reduce 100%
11/01/23 02:08:02 INFO streaming.StreamJob: Job complete: job_201101111819_0020
11/01/23 02:08:02 INFO streaming.StreamJob: Output: output
程序的输出是:
2       8      46

wc命令用来统计文件中的行数、单词数与字节数,可以看到,这个结果是正确的。
Hadoop流的工作原理并不复杂,其中Map的工作原理如图3-4所示(Reduce与其相同)。

当一个可执行文件作为Mapper时,每一个Map任务会以一个独立的进程启动这个可执行文件,然后在Map任务运行时,会把输入切分成行提供给可执行文件,并作为它的标准输入(stdin)内容。当可执行文件运行出结果时,Map从标准输出(stdout)中收集数据,并将其转化为对,作为Map的输出。
Reduce与Map相同,如果可执行文件做Reducer时,Reduce任务会启动这个可执行文件,并且将对转化为行作为这个可执行文件的标准输入(stdin)。然后Reduce会收集这个可执行文件的标准输出(stdout)的内容。并把每一行转化为对,作为Reduce的输出。
Map与Reduce将输出转化为对的默认方法是:将每行的第一个tab符号(制表符)之前的内容作为key,之后的内容作为value。如果没有tab符号,那么这一行的所有内容会作为key,而value值为null。当然这是可以更改的。
值得一提的是,可以使用Java类作为Map,而用一个可执行程序作为Reduce;或使用Java类作为Reduce,而用可执行程序作为Map。例如:

/bin/hadoop  jar contrib/streaming/hadoop-0.20.2-streaming.jar
 -input myInputDirs -output myOutputDir –mapper
org.apache.hadoop.mapred.lib.IdentityMapper -reducer /bin/wc

3.4.2 Hadoop流的命令
Hadoop流提供自己的流命令选项及一个通用的命令选项,用于设置Hadoop流任务。首先介绍一下流命令。

  1. Hadoop流命令选项

    表3-1所示的Hadoop流命令中,必选的4个很好理解,分别用于指定输入/输出文件的位置及Map/Reduce函数。在其他的可选命令中,这里我们只解释常用的几个。
    -file
    -file指令用于将文件加入到Hadoop的Job中。上面的例子中,cat和wc都是Linux系统中的命令,而在Hadoop流的使用中,往往需要使用自己写的文件(作为Map函数或Reduce函数)。一般而言,这些文件是Hadoop集群中的机器上没有的,这时就需要使用Hadoop流中的-file命令将这个可执行文件加入到Hadoop的Job中。

    -combiner
    

    这个命令用来加入combiner程序。

    -inputformat和-outputformat
    

    这两个命令用来设置输入输出文件的处理方法,这两个命令后面的参数必须是Java类。

    1. Hadoop流通用的命令选项
      Hadoop流的通用命令用来配置Hadoop流的Job。需要注意的是,如果使用这部分配置,就必须将其置于流命令配置之前,否则命令会失败。这里简要列出命令列表(如表3-2所示),供大家参考。

    3.4.3 两个例子
    从上面的内容可以知道,Hadoop流的API是一个扩展性非常强的框架,它与程序相连的部分只有数据,因此可以接受任何适用于UNIX标准输入/输出的脚本语言,比如Bash、PHP、Ruby、Python等。
    下面举两个非常简单的例子来进一步说明它的特性。

    1. Bash
      MapReduce框架是一个非常适合在大规模的非结构化数据中查找数据的编程模型,grep就是这种类型的一个例子。

    在Linux中,grep命令用来在一个或多个文件中查找某个字符模式(这个字符模式可以代表字符串,多用正则表达式表示)。
    下面尝试在如下的数据中查找带有Hadoop字符串的行,如下所示。
    输入文件为:

    file01:
    hello    world bye world
    file02:
    hello    hadoop bye hadoop
    reduce文件为:
    reduce.sh:
    grep hadoop
    输入命令为:
    bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -input input -output output -mapper /bin/cat -reducer ~/Desktop/test/reducer.sh -file ~/Desktop/test/reducer.sh
    结果为:
    hello      hadoop bye hadoop
    

    显然,这个结果是正确的。

    1. Python
      对于Python来说,情况有些特殊。因为Python是可以编译为JAR包的,如果将程序编译为JAR包,那么就可以采用运行JAR包的方式来运行了。

    不过,同样也可以用流的方式运行Python程序。请看如下代码:

    Reduce.py
    #!/usr/bin/python
    
    import sys;
    
    def generateLongCountToken(id):
        return "LongValueSum:" + id + "\t" + "1"
    def main(argv):
        line = sys.stdin.readline();
        try:
            while line:
                line = line[:-1];
                fields = line.split("\t");
                print generateLongCountToken(fields[0]);
                line = sys.stdin.readline();
        except "end of file":
            return None
    if __name__ == "__main__":
         main(sys.argv)
    使用如下命令来运行:
    bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -input input -output pyoutput -mapper reduce.py -reducer aggregate -file reduce.py
    

    注意其中的aggregate是Hadoop提供的一个包,它提供一个Reduce函数和一个combine函数。这个函数实现一些简单的类似求和、取最大值最小值等的功能。

时间: 2024-08-03 18:53:31

《Hadoop实战第2版》——3.4节Hadoop流的相关文章

《Hadoop实战第2版》——2.1节在Linux上安装与配置Hadoop

2.1 在Linux上安装与配置Hadoop 在Linux上安装Hadoop之前,需要先安装两个程序: 1)JDK 1.6(或更高版本).Hadoop是用Java编写的程序,Hadoop的编译及MapReduce的运行都需要使用JDK.因此在安装Hadoop前,必须安装JDK 1.6或更高版本. 2)SSH(安全外壳协议),推荐安装OpenSSH.Hadoop需要通过SSH来启动Slave列表中各台主机的守护进程,因此SSH也是必须安装的,即使是安装伪分布式版本(因为Hadoop并没有区分开集群

《Hadoop实战第2版》——1.2节Hadoop项目及其结构

1.2 Hadoop项目及其结构 现在Hadoop已经发展成为包含很多项目的集合.虽然其核心内容是MapReduce和Hadoop分布式文件系统,但与Hadoop相关的Common.Avro.Chukwa.Hive.HBase等项目也是不可或缺的.它们提供了互补性服务或在核心层上提供了更高层的服务.图1-1是Hadoop的项目结构图. 下面将对Hadoop的各个关联项目进行更详细的介绍. 1)Common:Common是为Hadoop其他子项目提供支持的常用工具,它主要包括FileSystem.

《Hadoop实战第2版》——1.1节什么是Hadoop

1.1 什么是Hadoop 1.1.1 Hadoop概述 Hadoop是Apache软件基金会旗下的一个开源分布式计算平台.以Hadoop分布式文件系统(Hadoop Distributed File System,HDFS)和MapReduce(Google MapReduce的开源实现)为核心的Hadoop为用户提供了系统底层细节透明的分布式基础架构.HDFS的高容错性.高伸缩性等优点允许用户将Hadoop部署在低廉的硬件上,形成分布式系统:MapReduce分布式编程模型允许用户在不了解分

《Hadoop实战第2版》——1.7节Hadoop集群安全策略

1.7 Hadoop集群安全策略众所周知,Hadoop的优势在于其能够将廉价的普通PC组织成能够高效稳定处理事务的大型集群,企业正是利用这一特点来构架Hadoop集群.获取海量数据的高效处理能力的.但是,Hadoop集群搭建起来后如何保证它安全稳定地运行呢?旧版本的Hadoop中没有完善的安全策略,导致Hadoop集群面临很多风险,例如,用户可以以任何身份访问HDFS或MapReduce集群,可以在Hadoop集群上运行自己的代码来冒充Hadoop集群的服务,任何未被授权的用户都可以访问Data

《Hadoop实战第2版》——3.5节Hadoop Pipes

3.5 Hadoop PipesHadoop Pipes提供了一个在Hadoop上运行C++程序的方法.与流不同的是,流使用的是标准输入输出作为可执行程序与Hadoop相关进程间通信的工具,而Pipes使用的是Sockets.先看一个示例程序wordcount.cpp: #include "hadoop/Pipes.hh" #include "hadoop/TemplateFactory.hh" #include "hadoop/StringUtils.h

《Hadoop实战第2版》——1.3节Hadoop体系结构

1.3 Hadoop体系结构如上文所说,HDFS和MapReduce是Hadoop的两大核心.而整个Hadoop的体系结构主要是通过HDFS来实现分布式存储的底层支持的,并且它会通过MapReduce来实现分布式并行任务处理的程序支持.下面首先介绍HDFS的体系结构.HDFS采用了主从(Master/Slave)结构模型,一个HDFS集群是由一个NameNode和若干个DataNode组成的.其中NameNode作为主服务器,管理文件系统的命名空间和客户端对文件的访问操作:集群中的DataNod

《Hadoop实战第2版》——1.8节本章小结

1.8 本章小结本章首先介绍了Hadoop分布式计算平台:它是由Apache软件基金会开发的一个开源分布式计算平台.以Hadoop分布式文件系统(HDFS)和MapReduce(Google MapReduce的开源实现)为核心的Hadoop为用户提供了系统底层细节透明的分布式基础架构.由于Hadoop拥有可计量.成本低.高效.可信等突出特点,基于Hadoop的应用已经遍地开花,尤其是在互联网领域.本章接下来介绍了Hadoop项目及其结构,现在Hadoop已经发展成为一个包含多个子项目的集合,被

《Hadoop实战第2版》——2.6节本章小结

2.6 本章小结本章主要讲解了Hadoop的安装和配置过程.Hadoop的安装过程并不复杂,基本配置也简单明了,其中有几个关键点: Hadoop主要是用Java语言写的,它无法使用一般Linux预装的OpenJDK,因此在安装Hadoop前要先安装JDK(版本要在1.6以上): 作为分布式系统,Hadoop需要通过SSH的方式启动处于slave上的程序,因此必须安装和配置SSH.由此可见,在安装Hadoop前需要安装JDK及SSH.Hadoop在Mac OS X上的安装与Linux雷同,在Win

《Hadoop实战手册》一1.2 使用Hadoop shell命令导入和导出数据到HDFS

1.2 使用Hadoop shell命令导入和导出数据到HDFS HDFS提供了许多shell命令来实现访问文件系统的功能,这些命令都是构建在HDFS FileSystem API之上的.Hadoop自带的shell脚本是通过命令行来执行所有操作的.这个脚本的名称叫做hadoop,通常安装在$HADOOP_BIN目录下,其中$HADOOP_BIN是Hadoopbin文件完整的安装目录,同时有必要将$HADOOP_BIN配置到$PATH环境变量中,这样所有的命令都可以通过hadoop fs -co