《Hadoop实战第2版》——3.5节Hadoop Pipes

3.5 Hadoop Pipes
Hadoop Pipes提供了一个在Hadoop上运行C++程序的方法。与流不同的是,流使用的是标准输入输出作为可执行程序与Hadoop相关进程间通信的工具,而Pipes使用的是Sockets。先看一个示例程序wordcount.cpp:

#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"

const std::string WORDCOUNT = "WORDCOUNT";
const std::string INPUT_WORDS = "INPUT_WORDS";
const std::string OUTPUT_WORDS = "OUTPUT_WORDS";

class WordCountMap: public HadoopPipes::Mapper {
public:
  HadoopPipes::TaskContext::Counter* inputWords;

  WordCountMap(HadoopPipes::TaskContext& context) {
    inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS);
  }

  void map(HadoopPipes::MapContext& context) {
    std::vector<std::string> words =
      HadoopUtils::splitString(context.getInputValue(), " ");
    for(unsigned int i=0; i < words.size(); ++i) {
      context.emit(words[i], "1");
    }
    context.incrementCounter(inputWords, words.size());
  }
};

class WordCountReduce: public HadoopPipes::Reducer {
public:
  HadoopPipes::TaskContext::Counter* outputWords;

  WordCountReduce(HadoopPipes::TaskContext& context) {
    outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS);
  }

  void reduce(HadoopPipes::ReduceContext& context) {
    int sum = 0;
    while (context.nextValue()) {
      sum += HadoopUtils::toInt(context.getInputValue());
    }
    context.emit(context.getInputKey(), HadoopUtils::toString(sum));
    context.incrementCounter(outputWords, 1);
  }
};
int main(int argc, char *argv[]) {
  return HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMap, WordCountReduce>());
}

这个程序连接的是一个C++库,结构类似于Java编写的程序。如新版API一样,这个程序使用context方法读入和收集对。在使用时要重写HadoopPipes名字空间下的Mapper和Reducer函数,并用context.emit()方法输出对。main函数是应用程序的入口,它调用HadoopPipes::runTask方法,这个方法由一个TemplateFactory参数来创建Map和Reduce实例,也可以重载factory设置combiner()、partitioner()、record reader、record writer。
接下来,编译这个程序。这个编译命令需要用到g++,读者可以使用apt自动安装这个程序。g++的命令格式如下所示:

apt-get install g++

然后建立文件Makerfile,如下所示:

HADOOP_INSTALL="你的hadoop安装文件夹"
PLATFORM=Linux-i386-32(如果是AMD的CPU,请使用Linux-amd64-64)

CC = g++
CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include

wordcount: wordcount.cpp
$(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes -lhadooputils -lpthread -g -O2 -o $@

注意在$(CC)前有一个符号,这个分隔符是很关键的。
在当前目录下建立一个WordCount可执行文件。
接着,上传可执行文件到HDFS上,这是为了TaskTracker能够获得这个可执行文件。这里上传到bin文件夹内。

~/hadoop/bin/hadoop fs –mkdir bin
~/hadoop/bin/hadoop dfs –put wordcount bin

然后,就可以运行这个MapReduce程序了,可以采用两种配置方式运行这个程序。一种方式是直接在命令中运行指定配置,如下所示:

~/hadoop/bin/hadoop pipes\
-D hadoop.pipes.java.recordreader=true\
-D hadoop.pipes.java.recordwriter=true\
-input input\
-output Coutput\
-program bin/wordcount

另一种方式是预先将配置写入配置文件中,如下所示:

<?xml version="1.0"?>
<configuration>
  <property>
    // Set the binary path on DFS
    <name>hadoop.pipes.executable</name>
    <value>bin/wordcount</value>
  </property>
  <property>
    <name>hadoop.pipes.java.recordreader</name>
    <value>true</value>
  </property>
  <property>
    <name>hadoop.pipes.java.recordwriter</name>
    <value>true</value>
  </property>
</configuration>

然后通过如下命令运行这个程序:

~/hadoop/bin/hadoop pipes -conf word.xml -input input -output output

将参数hadoop.pipes.executable和hadoop.pipes.java.recordreader设置为true表示使用Hadoop默认的输入输出方式(即Java的)。同样的,也可以设置一个Java语言编写的Mapper函数、Reducer函数、combiner函数和partitioner函数。实际上,在任何一个作业中,都可以混用Java类和C++类。

时间: 2024-09-18 04:06:14

《Hadoop实战第2版》——3.5节Hadoop Pipes的相关文章

《Hadoop实战第2版》——2.1节在Linux上安装与配置Hadoop

2.1 在Linux上安装与配置Hadoop 在Linux上安装Hadoop之前,需要先安装两个程序: 1)JDK 1.6(或更高版本).Hadoop是用Java编写的程序,Hadoop的编译及MapReduce的运行都需要使用JDK.因此在安装Hadoop前,必须安装JDK 1.6或更高版本. 2)SSH(安全外壳协议),推荐安装OpenSSH.Hadoop需要通过SSH来启动Slave列表中各台主机的守护进程,因此SSH也是必须安装的,即使是安装伪分布式版本(因为Hadoop并没有区分开集群

《Hadoop实战第2版》——1.2节Hadoop项目及其结构

1.2 Hadoop项目及其结构 现在Hadoop已经发展成为包含很多项目的集合.虽然其核心内容是MapReduce和Hadoop分布式文件系统,但与Hadoop相关的Common.Avro.Chukwa.Hive.HBase等项目也是不可或缺的.它们提供了互补性服务或在核心层上提供了更高层的服务.图1-1是Hadoop的项目结构图. 下面将对Hadoop的各个关联项目进行更详细的介绍. 1)Common:Common是为Hadoop其他子项目提供支持的常用工具,它主要包括FileSystem.

《Hadoop实战第2版》——3.4节Hadoop流

3.4 Hadoop流 Hadoop流提供了一个API,允许用户使用任何脚本语言写Map函数或Reduce函数.Hadoop流的关键是,它使用UNIX标准流作为程序与Hadoop之间的接口.因此,任何程序只要可以从标准输入流中读取数据并且可以写入数据到标准输出流,那么就可以通过Hadoop流使用其他语言编写MapReduce程序的Map函数或Reduce函数.举个最简单的例子(本例的运行环境:Ubuntu,Hadoop-0.20.2): bin/hadoop jar contrib/stream

《Hadoop实战第2版》——1.1节什么是Hadoop

1.1 什么是Hadoop 1.1.1 Hadoop概述 Hadoop是Apache软件基金会旗下的一个开源分布式计算平台.以Hadoop分布式文件系统(Hadoop Distributed File System,HDFS)和MapReduce(Google MapReduce的开源实现)为核心的Hadoop为用户提供了系统底层细节透明的分布式基础架构.HDFS的高容错性.高伸缩性等优点允许用户将Hadoop部署在低廉的硬件上,形成分布式系统:MapReduce分布式编程模型允许用户在不了解分

《Hadoop实战第2版》——1.7节Hadoop集群安全策略

1.7 Hadoop集群安全策略众所周知,Hadoop的优势在于其能够将廉价的普通PC组织成能够高效稳定处理事务的大型集群,企业正是利用这一特点来构架Hadoop集群.获取海量数据的高效处理能力的.但是,Hadoop集群搭建起来后如何保证它安全稳定地运行呢?旧版本的Hadoop中没有完善的安全策略,导致Hadoop集群面临很多风险,例如,用户可以以任何身份访问HDFS或MapReduce集群,可以在Hadoop集群上运行自己的代码来冒充Hadoop集群的服务,任何未被授权的用户都可以访问Data

《Hadoop实战第2版》——1.3节Hadoop体系结构

1.3 Hadoop体系结构如上文所说,HDFS和MapReduce是Hadoop的两大核心.而整个Hadoop的体系结构主要是通过HDFS来实现分布式存储的底层支持的,并且它会通过MapReduce来实现分布式并行任务处理的程序支持.下面首先介绍HDFS的体系结构.HDFS采用了主从(Master/Slave)结构模型,一个HDFS集群是由一个NameNode和若干个DataNode组成的.其中NameNode作为主服务器,管理文件系统的命名空间和客户端对文件的访问操作:集群中的DataNod

《Hadoop实战第2版》——1.8节本章小结

1.8 本章小结本章首先介绍了Hadoop分布式计算平台:它是由Apache软件基金会开发的一个开源分布式计算平台.以Hadoop分布式文件系统(HDFS)和MapReduce(Google MapReduce的开源实现)为核心的Hadoop为用户提供了系统底层细节透明的分布式基础架构.由于Hadoop拥有可计量.成本低.高效.可信等突出特点,基于Hadoop的应用已经遍地开花,尤其是在互联网领域.本章接下来介绍了Hadoop项目及其结构,现在Hadoop已经发展成为一个包含多个子项目的集合,被

《Hadoop实战第2版》——2.6节本章小结

2.6 本章小结本章主要讲解了Hadoop的安装和配置过程.Hadoop的安装过程并不复杂,基本配置也简单明了,其中有几个关键点: Hadoop主要是用Java语言写的,它无法使用一般Linux预装的OpenJDK,因此在安装Hadoop前要先安装JDK(版本要在1.6以上): 作为分布式系统,Hadoop需要通过SSH的方式启动处于slave上的程序,因此必须安装和配置SSH.由此可见,在安装Hadoop前需要安装JDK及SSH.Hadoop在Mac OS X上的安装与Linux雷同,在Win

《Hadoop实战手册》一1.2 使用Hadoop shell命令导入和导出数据到HDFS

1.2 使用Hadoop shell命令导入和导出数据到HDFS HDFS提供了许多shell命令来实现访问文件系统的功能,这些命令都是构建在HDFS FileSystem API之上的.Hadoop自带的shell脚本是通过命令行来执行所有操作的.这个脚本的名称叫做hadoop,通常安装在$HADOOP_BIN目录下,其中$HADOOP_BIN是Hadoopbin文件完整的安装目录,同时有必要将$HADOOP_BIN配置到$PATH环境变量中,这样所有的命令都可以通过hadoop fs -co

《Hadoop实战手册》一第1章 Hadoop分布式文件系统——导入和导出数据

第1章 Hadoop分布式文件系统--导入和导出数据 Hadoop实战手册 本章我们将介绍: 使用Hadoop shell命令导入和导出数据到HDFS 使用distcp实现集群间数据复制 使用Sqoop从MySQL数据库导入数据到HDFS 使用Sqoop从HDFS导出数据到MySQL 配置Sqoop以支持SQL Server 从HDFS导出数据到MongoDB 从MongoDB导入数据到HDFS 使用Pig从HDFS导出数据到MongoDB 在Greenplum外部表中使用HDFS 利用Flum