3.5 Hadoop Pipes
Hadoop Pipes提供了一个在Hadoop上运行C++程序的方法。与流不同的是,流使用的是标准输入输出作为可执行程序与Hadoop相关进程间通信的工具,而Pipes使用的是Sockets。先看一个示例程序wordcount.cpp:
#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"
const std::string WORDCOUNT = "WORDCOUNT";
const std::string INPUT_WORDS = "INPUT_WORDS";
const std::string OUTPUT_WORDS = "OUTPUT_WORDS";
class WordCountMap: public HadoopPipes::Mapper {
public:
HadoopPipes::TaskContext::Counter* inputWords;
WordCountMap(HadoopPipes::TaskContext& context) {
inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS);
}
void map(HadoopPipes::MapContext& context) {
std::vector<std::string> words =
HadoopUtils::splitString(context.getInputValue(), " ");
for(unsigned int i=0; i < words.size(); ++i) {
context.emit(words[i], "1");
}
context.incrementCounter(inputWords, words.size());
}
};
class WordCountReduce: public HadoopPipes::Reducer {
public:
HadoopPipes::TaskContext::Counter* outputWords;
WordCountReduce(HadoopPipes::TaskContext& context) {
outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS);
}
void reduce(HadoopPipes::ReduceContext& context) {
int sum = 0;
while (context.nextValue()) {
sum += HadoopUtils::toInt(context.getInputValue());
}
context.emit(context.getInputKey(), HadoopUtils::toString(sum));
context.incrementCounter(outputWords, 1);
}
};
int main(int argc, char *argv[]) {
return HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMap, WordCountReduce>());
}
这个程序连接的是一个C++库,结构类似于Java编写的程序。如新版API一样,这个程序使用context方法读入和收集对。在使用时要重写HadoopPipes名字空间下的Mapper和Reducer函数,并用context.emit()方法输出对。main函数是应用程序的入口,它调用HadoopPipes::runTask方法,这个方法由一个TemplateFactory参数来创建Map和Reduce实例,也可以重载factory设置combiner()、partitioner()、record reader、record writer。
接下来,编译这个程序。这个编译命令需要用到g++,读者可以使用apt自动安装这个程序。g++的命令格式如下所示:
apt-get install g++
然后建立文件Makerfile,如下所示:
HADOOP_INSTALL="你的hadoop安装文件夹"
PLATFORM=Linux-i386-32(如果是AMD的CPU,请使用Linux-amd64-64)
CC = g++
CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include
wordcount: wordcount.cpp
$(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes -lhadooputils -lpthread -g -O2 -o $@
注意在$(CC)前有一个符号,这个分隔符是很关键的。
在当前目录下建立一个WordCount可执行文件。
接着,上传可执行文件到HDFS上,这是为了TaskTracker能够获得这个可执行文件。这里上传到bin文件夹内。
~/hadoop/bin/hadoop fs –mkdir bin
~/hadoop/bin/hadoop dfs –put wordcount bin
然后,就可以运行这个MapReduce程序了,可以采用两种配置方式运行这个程序。一种方式是直接在命令中运行指定配置,如下所示:
~/hadoop/bin/hadoop pipes\
-D hadoop.pipes.java.recordreader=true\
-D hadoop.pipes.java.recordwriter=true\
-input input\
-output Coutput\
-program bin/wordcount
另一种方式是预先将配置写入配置文件中,如下所示:
<?xml version="1.0"?>
<configuration>
<property>
// Set the binary path on DFS
<name>hadoop.pipes.executable</name>
<value>bin/wordcount</value>
</property>
<property>
<name>hadoop.pipes.java.recordreader</name>
<value>true</value>
</property>
<property>
<name>hadoop.pipes.java.recordwriter</name>
<value>true</value>
</property>
</configuration>
然后通过如下命令运行这个程序:
~/hadoop/bin/hadoop pipes -conf word.xml -input input -output output
将参数hadoop.pipes.executable和hadoop.pipes.java.recordreader设置为true表示使用Hadoop默认的输入输出方式(即Java的)。同样的,也可以设置一个Java语言编写的Mapper函数、Reducer函数、combiner函数和partitioner函数。实际上,在任何一个作业中,都可以混用Java类和C++类。