intel-hadoop/HiBench Workflow Analysis: the Bayes Algorithm as an Example

1. Overview of the HiBench workloads

HiBench contains nine typical Hadoop workloads across five categories (micro benchmarks, HDFS benchmarks, web search benchmarks, machine learning benchmarks, and data analytics benchmarks).

For details, see Section 3 of the CDH cluster installation & testing summary.

  • micro benchmarks
    Sort: generates data with hadoop randomtextwriter and sorts it.
    Wordcount: counts the occurrences of each word in the input data, which is generated with hadoop randomtextwriter.
    TeraSort: input data is produced by hadoop teragen and sorted by key.
  • hdfs benchmarks
    Enhanced DFSIO: tests the HDFS throughput of a Hadoop cluster by generating a large number of tasks that perform reads and writes concurrently.
  • web search benchmarks
    Nutch indexing: large-scale search-engine indexing; this workload tests the indexing subsystem of Nutch (an Apache open-source search engine), using automatically generated web data whose links and words follow a Zipfian distribution (a word's frequency is inversely proportional to its rank in the frequency table).
    Pagerank: an implementation of the PageRank algorithm on Hadoop, using automatically generated web data whose links follow a Zipfian distribution (for any term, the product of its frequency and its rank is roughly constant).
  • machine learning benchmarks
    Mahout bayesian classification (bayes): large-scale machine learning; this workload tests the naive Bayes trainer in Mahout (Apache's open-source machine learning library). The input is automatically generated documents whose words follow a Zipfian distribution.
    Mahout k-means clustering (kmeans): tests the k-means clustering algorithm in Mahout; the input data set is produced by GenKMeansDataset from uniform and Gaussian distributions.
  • data analytics benchmarks
    Hive query benchmarks (hivebench): contains Hive queries (aggregation and join) that perform typical OLAP work, using automatically generated web data whose links follow a Zipfian distribution.

Note: the data-generation programs live in the hadoop-mapreduce-examples-2.6.0 jar and can be inspected with a decompiler.
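
Several of the workloads above generate words and links that follow a Zipfian distribution. As a quick illustration (this is not HiBench code; the toy vocabulary and sample size are made up), here is a minimal Python sketch that draws words with probability proportional to 1/rank:

import random

def zipf_sample(words, n, seed=42):
    """Draw n words where P(word at rank r) is proportional to 1/r (Zipf, s=1)."""
    rng = random.Random(seed)
    weights = [1.0 / rank for rank in range(1, len(words) + 1)]
    return rng.choices(words, weights=weights, k=n)

vocab = ["the", "of", "and", "to", "a", "in", "is", "it"]  # toy dictionary, ordered by rank
sample = zipf_sample(vocab, 10000)
for rank, word in enumerate(vocab, start=1):
    print(rank, word, sample.count(word))  # counts fall off roughly as 1/rank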


2. The bayes workflow in HiBench

  1. The main flow: configure the workloads under test, the language, and the DataSize under conf, then run bin/run-all.sh to complete one test. This is a manual process; a script can repeat it to perform many tests with less manual effort, e.g.:
#!/bin/bash

#       Time: 20160930, created by sunfei
#       Describe: automatically run the HiBench workloads
#       Functions:
#               search(): find the scale profile in 99-user_defined_properties.conf, e.g. tiny, small, ...
#               exec_application_noSQL(): run the workload N times, without Hive
#               exec_application_SQL(): run the workload N times, dropping the Hive tables first
#               save_result(): save the workload's results from the report
#               main_function(): run every workload at every scale
#               main(): entry point for running all workloads or a single one

cpuLoad()
{
        cpu=`grep -c 'model name' /proc/cpuinfo`
        load_15=`uptime | awk '{print $NF}'`
        average_load=`echo "scale=2;a=${load_15}/${cpu};if(length(a)==scale(a)) print 0;print a" | bc`
        date >> datetime-load.txt
        ${average_load} >> cpu-load.txt
        paste datetime-load.txt cpu-load.txt >> load-day.txt
}

search()
{
        #config="/opt/HiBench/HiBench-master/conf/99-user_defined_properties.conf"
        config=/usr/HiBench-master/conf/99-user_defined_properties.conf
        sed -n '/hibench.scale.profile/p' ${config} >> hibench.txt
        var=''
        while read line
        do
                        if [ "${line:0:13}" = "hibench.scale" ];then
                                        echo -e "\033[32m match successful! \033[0m"
                                        var=${line:22}
                        fi
        done<"hibench.txt"

        if [ "$var" = "${1}" ];then
                echo -e "\033[31m The requested scale profile is already configured, do you want to continue? yes | no \033[0m"
                read -p "Input your choice :" chose
                if [ "${chose}" = "no" ];then
                        exit 1
                else
                        echo -e "\033[32m The ${1} scale profile will be run! \033[0m"
                fi
        fi

        if [ -f "hibench.txt" ];then
                        rm -rf "hibench.txt"
                        echo -e "\033[32m hibench.txt has been deleted! \033[0m"
        fi

        echo -e "\033[32m The workload will run at the ${1} scale \033[0m"
    sed -i "s/${var}/${1}/" ${config}

exec_application_noSQL()
{
        var=0
        for ((i=1;i<=${1};i++))
        do
                        let "var=$i%1"
                        if [ "$var" -eq 0 ];then
                                        hadoop fs -rm  -r hdfs://archive.cloudera.com:8020/user/hdfs/.Trash/*
                                        hadoop fs -rm -r hdfs://archive.cloudera.com:8020/HiBench/*
                        fi
                        echo -e  "\033[32m **********************The current times is ********************:\033[0m" ${i}
                        #/opt/HiBench/HiBench-master/bin/run-all.sh
                        /usr/HiBench-master/bin/run-all.sh
                        echo -e  "\033[32m ********************** The current time is "${i}" ,and it has exec finished successfully! ********************:\033[0m"
        done
        echo -e "\033[32m *********The application has finished,please modify the configuration!***** \033[0m"
}

exec_application_SQL()
{
        var=0
        for ((i=1;i<=${1};i++))
        do
                        echo "drop table uservisits;drop table uservisits_aggre;drop table rankings;drop table rankings_uservisits_join;drop table uservisits_copy;exit;" | /usr/bin/hive
                        let "var=$i%1"
                        if [ "$var" -eq 0 ];then
                                        hadoop fs -rm  -r hdfs://archive.cloudera.com:8020/user/hdfs/.Trash/*
                                        hadoop fs -rm -r hdfs://archive.cloudera.com:8020/HiBench/*
                        fi
                        echo -e  "\033[32m **********************The current times is ********************:\033[0m" ${i}
                        #/opt/HiBench/HiBench-master/bin/run-all.sh
                        /usr/HiBench-master/bin/run-all.sh
                        echo -e  "\033[32m **********************The current time is "${i}" ,and it has exec finished successfully! ********************:\033[0m"
        done
        echo -e "\033[32m *********The application has finished,please modify the configuration!***** \033[0m"

}

save_result()
{
        if [ -f result.txt ];then
                        rm -rf result.txt
                         echo -e "\033[32m The hibench.txt has deleted! \033[0m"
        fi
        #select the words in the report
        #filepath=/opt/HiBench/HiBench-master/report/hibench.report
        filepath=/usr/HiBench-master/report/hibench.report
        word=""
        var1=`date +"%m/%d/%Y-%k:%M:%S"`   # timestamp, not currently used
        var2=${1}
        var5=".txt"
        var4=${var2}${var5}
        case ${1} in
        "aggregation")
                word="JavaSparkAggregation"
                ;;
        "join")
                word="JavaSparkJoin"
                ;;
        "scan")
                word="JavaSparkScan"
                ;;
        "kmeans")
                word="JavaSparkKmeans"
                ;;
        "pagerank")
                word="JavaSparkPagerank"
                ;;
        "sleep")
                word="JavaSparkSleep"
                ;;
        "sort")
                word="JavaSparkSort"
                ;;
        "wordcount")
                word="JavaSparkWordcount"
                ;;
        "bayes")
                word="JavaSparkBayes"
                ;;
        "terasort")
                word="JavaSparkTerasort"
                ;;
        *)
                echo -e "\033[32m The name of application is wrong,please change it! \033[0m"
                ;;
        esac

        while read line
        do
                        echo $line | sed -n "/${word}/p" >> ${var4}
        done <$filepath
        echo -e "\033[32m The job has finished! \033[0m"
}

main_function()
{
        # iterate over every workload to execute
        for appName in aggregation join scan pagerank sleep sort wordcount bayes terasort kmeans
        do
                #appConfig=/opt/HiBench/HiBench-master/conf/benchmarks.lst
                appConfig=/usr/HiBench-master/conf/benchmarks.lst
                echo "The name of application is :"${appName}
                echo ${appName} > ${appConfig}
                        for style in tiny small large huge gigantic
                        do
                                search ${style}
                                if [ "aggregation" = ${appName} ] || [ "join" = ${appName} ] || [ "scan" = ${appName} ];then
                                                        exec_application_SQL ${1}
                                else
                                                        exec_application_noSQL ${1}
                                fi
                        done
                save_result ${appName}
        done
}

main()
{
        # run the application
        read -p "Input the times of exec: " times
        if [ "${times}" -eq 0 -o "${times}" -gt 60 ];then
                echo -e "\033[31m The times of application can't be empty or gt 60 ! Do you want to continue ? yes | no\033[0m"
                read -p "Input your chose :" chose
                if [ "${chose}" = "no" ];then
                        exit 1
                else
                        echo -e "\033[32m The application will be run ${times} times ! \033[0m"
                fi
        fi
        echo -e "\033[33m Select the style of application : \033[0m \033[31m All | Signal \033[0m"
        read -p "Input your chose :" style
        if [ "${style}" = "" ];then
                echo -e "\033[31m The style of application can't be empty \033[0m"
                exit 1
        elif [ "${style}" != "All" -a "${style}" != "Signal" ];then
                echo -e "\033[31m The style of application is wrong,please correct! \033[0m"
                exit 1
        else
                echo -e "\033[32m The style of application is ok ! \033[0m"
        fi
        if [ "All" = "${style}" ];then
                main_function ${times}
        else
                echo -e "\033[033m Input the name of apliaction,eg:\033[0m \033[31m aggregation | join | scan | kmeans | pagerank | sleep | sort | wordcount | bayes | terasort\033[0m"
                read -p "Input you chose :" application
                if [ "${application}" = "" ];then
                                echo -e "\033[31m The name of application can't be empty! \033[0m"
                                exit 1
                fi
                echo "********************The ${application} will be exec**********************"
                appConfig=/usr/HiBench-master/conf/benchmarks.lst
                #appConfig=/opt/HiBench/HiBench-master/conf/benchmarks.lst
                read -p "Do you want exec all the style of application,eg:tiny,small,large,huge,gigantic? yes | no " chose
                if [ "${chose}" = "" ];then
                        echo -e "\033[31m The style of application can't be empty! \033[0m"
                        exit 1
                elif [ "yes" != ${chose} ] && [ "no" != ${chose} ];then
                        echo -e "\033[31m The style of application is wrong,please correct! \033[0m"
                        exit 1
                else
                        echo -e "\033[32m The style of application is ok ! \033[0m"
                fi
                read -p "Input the sytle of application,eg:( tiny small large huge gigantic )!" appStyle
                echo "***************************The ${appStyle} style will be exec***************************"
                for appName in ${application}
                do
                        echo ${appName} > ${appConfig}
                        if [ "yes" = "${chose}" ];then
                                for var in tiny small large huge gigantic
                                do
                                        echo "******************The ${appName} will be exec!************************************"
                                        search ${var}
                                        if [ "aggregation" = ${appName} ] || [ "join" = ${appName} ] || [ "scan" = ${appName} ];then
                                                        exec_application_SQL ${times}
                                        else
                                                        exec_application_noSQL ${times}
                                        fi
                                done
                        else
                        #       read -p "Input the sytle of application,eg:( tiny small large huge gigantic )!" appStyle
                                echo "**************************The ${appName} will be exec!************************"
                                if [ "${appStyle}" = "" ];then
                                                echo -e "\033[31m The style of application can't be empty! \033[0m"
                                                exit 1
                                fi
                                for var in ${appStyle}
                                do
                                        search ${var}
                                        if [ "aggregation" = ${appName} ] || [ "join" = ${appName} ] || [ "scan" = ${appName} ];then
                                                exec_application_SQL ${times}
                                        else
                                                exec_application_noSQL ${times}
                                        fi
                                done
                        fi
                        save_result ${appName}
                done
        fi
}

# the main function of application
main
  1. prepare.sh->run.sh为run-all.sh的子流程;
  2. enter_bench->…->leave_bench为prepare.sh和run.sh的子流程;
  3. enter_bench…..gen_report等为workload-functions.sh中的公共函数。

The flow chart (image not reproduced here) summarizes this call chain.

2.1 Data-generation code analysis; entry point: HiBench.DataGen

I am not very familiar with Java, but the entry point mainly uses a switch statement.

The DataGen class builds DataOptions options = new DataOptions(args);
for a bayes test it then dispatches to the corresponding data-generation class. Part of the dispatch code:

case BAYES: {
    BayesData data = new BayesData(options);
    data.generate();
    break;
}

The BayesData implementation:

package HiBench;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.NLineInputFormat;

public class BayesData {

    private static final Log log = LogFactory.getLog(BayesData.class.getName());

    private DataOptions options;
    private Dummy dummy;
    private int cgroups;

    BayesData(DataOptions options) {
        this.options = options;
        parseArgs(options.getRemainArgs());
    }

    private void parseArgs(String[] args) {

        for (int i=0; i<args.length; i++) {
            if ("-class".equals(args[i])) {
                cgroups = Integer.parseInt(args[++i]);
            } else {
                DataOptions.printUsage("Unknown bayes data arguments -- " + args[i] + "!!!");
                System.exit(-1);
            }
        }
    }

    private static class CreateBayesPages extends MapReduceBase implements
    Mapper<LongWritable, Text, Text, Text> {

        private static final Log log = LogFactory.getLog(CreateBayesPages.class.getName());

        private long pages, slotpages;
        private int groups;
        private HtmlCore generator;
        private Random rand;

        public void configure(JobConf job) {
            try {
                pages = job.getLong("pages", 0);
                slotpages = job.getLong("slotpages", 0);
                groups = job.getInt("groups", 0);

                generator = new HtmlCore(job);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {

            int slotId = Integer.parseInt(value.toString().trim());
            long[] range = HtmlCore.getPageRange(slotId, pages, slotpages);
            generator.fireRandom(slotId);
            rand = new Random(slotId * 1000 + 101);

            Text k = new Text();
            for (long i=range[0]; i<range[1]; i++) {
                // randomly assign the page to one of the `groups` classes; the
                // class name becomes the key, i.e. the document's label
                String classname = "/class" + rand.nextInt(groups);
                k.set(classname);
                value.set(generator.genBayesWords());
                output.collect(k, value);
                reporter.incrCounter(HiBench.Counters.BYTES_DATA_GENERATED,
                    k.getLength()+value.getLength());
                if (0==(i % 10000)) {
                    log.info("still running: " + (i - range[0]) + " of " + slotpages);
                }
            }
        }
    }

    private void setBayesOptions(JobConf job) throws URISyntaxException {
        job.setLong("pages", options.getNumPages());
        job.setLong("slotpages", options.getNumSlotPages());
        job.setInt("groups", cgroups);

        Utils.shareWordZipfCore(options, job);
    }

    private void createBayesData() throws IOException, URISyntaxException {

        log.info("creating bayes text data ... ");

        JobConf job = new JobConf();

        Path fout = options.getResultPath();
        Utils.checkHdfsPath(fout);

        String jobname = "Create bayes data";
        job.setJobName(jobname);

        Utils.shareDict(options, job);

        setBayesOptions(job);

        FileInputFormat.setInputPaths(job, dummy.getPath());
        job.setInputFormat(NLineInputFormat.class);

        job.setJarByClass(CreateBayesPages.class);
        job.setMapperClass(CreateBayesPages.class);
        job.setNumReduceTasks(0);

        FileOutputFormat.setOutputPath(job, fout);
        job.setOutputFormat(SequenceFileOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        log.info("Running Job: " +jobname);
        log.info("Pages file " + dummy.getPath() + " as input");
        log.info("Rankings file " + fout + " as output");
        JobClient.runJob(job);
        log.info("Finished Running Job: " + jobname);
    }

    private void init() throws IOException {

        Utils.checkHdfsPath(options.getResultPath(), true);
        Utils.checkHdfsPath(options.getWorkPath(), true);

        dummy = new Dummy(options.getWorkPath(), options.getNumMaps());

        int words = RawData.putDictToHdfs(new Path(options.getWorkPath(), HtmlCore.getDictName()), options.getNumWords());
        options.setNumWords(words);

        Utils.serialWordZipf(options);
    }

    public void generate() throws Exception {

        init();

        createBayesData();

        close();
    }

    private void close() throws IOException {
        log.info("Closing bayes data generator...");
        Utils.checkHdfsPath(options.getWorkPath());
    }
}

Running prepare.sh produces the output below. It first parses the configuration files, then invokes hadoop with the autogen jar to run a MapReduce job; that job generates the input data for bayes text classification. Note that the submitted job's parameters (-p 500000 -class 100) match the huge profile listed in Section 3. As described in Section 1 and in the official documentation, the text is built from the Linux dictionary /usr/share/dict/words and follows a Zipfian distribution.

[hdfs@sf11 prepare]$ ./prepare.sh
patching args=
Parsing conf: /opt/HiBench/HiBench-master/conf/00-default-properties.conf
Parsing conf: /opt/HiBench/HiBench-master/conf/01-default-streamingbench.conf
Parsing conf: /opt/HiBench/HiBench-master/conf/10-data-scale-profile.conf
Parsing conf: /opt/HiBench/HiBench-master/conf/20-samza-common.conf
Parsing conf: /opt/HiBench/HiBench-master/conf/30-samza-workloads.conf
Parsing conf: /opt/HiBench/HiBench-master/conf/99-user_defined_properties.conf
Parsing conf: /opt/HiBench/HiBench-master/workloads/bayes/conf/00-bayes-default.conf
Parsing conf: /opt/HiBench/HiBench-master/workloads/bayes/conf/10-bayes-userdefine.conf
probe sleep jar: /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/share/hadoop/mapreduce2/hadoop-mapreduce-client-jobclient-tests.jar
start HadoopPrepareBayes bench
/opt/HiBench/HiBench-master/bin/functions/workload-functions.sh: line 120: /dev/stderr: Permission denied
rm: `hdfs://archive.cloudera.com:8020/HiBench/Bayes/Input': No such file or directory
Submit MapReduce Job: /opt/cloudera/parcels/CDH/lib/hadoop/bin/hadoop --config /etc/hadoop/conf jar /opt/HiBench/HiBench-master/src/autogen/target/autogen-5.0-SNAPSHOT-jar-with-dependencies.jar HiBench.DataGen -t bayes -b hdfs://archive.cloudera.com:8020/HiBench/Bayes -n Input -m 300 -r 1600 -p 500000 -class 100 -o sequence
16/10/21 16:34:02 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/10/21 16:34:32 INFO HiBench.BayesData: Closing bayes data generator...
finish HadoopPrepareBayes bench

A sample of the generated data (screenshot not reproduced here):

After nearly two weeks of reading the HiBench code and running tests, I finally worked out the flow described above. Intel's test framework is actually quite concise: with configuration files, shell scripts, and the examples bundled with the big-data frameworks themselves (the HiBench wordcount test, for instance, directly calls the program shipped with Hadoop or Spark), it accomplishes the whole large testing job. Below we analyze the three language implementations HiBench uses for the naive Bayes text-classification algorithm: Python, Scala, and Java.

2.3 Python code analysis


Part of the Python code. Note that it targets Python 2 and the old RDD-based MLlib API; the tuple-unpacking lambdas below (e.g. lambda (key, doc): ...) are invalid syntax on Python 3:

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A naive bayes program using MLlib.

This example requires NumPy (http://www.numpy.org/).
"""

import sys
import numpy as np  # used by parseVector below; this import is missing from the original listing

from pyspark import SparkContext
from pyspark.mllib.util import MLUtils
from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from pyspark.storagelevel import StorageLevel
from operator import add
from itertools import groupby
#
# Adopted from spark's doc: http://spark.apache.org/docs/latest/mllib-naive-bayes.html
#
def parseVector(line):  # helper kept from the Spark doc example; not used below
    return np.array([float(x) for x in line.split(' ')])

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: bayes <file>"
        exit(-1)
    sc = SparkContext(appName="PythonNaiveBayes")
    filename = sys.argv[1]

    data = sc.sequenceFile(filename, "org.apache.hadoop.io.Text", "org.apache.hadoop.io.Text")
    wordCount = data                                \
        .flatMap(lambda (key, doc):doc.split(" "))    \
        .map(lambda x:(x, 1))                                \
        .reduceByKey(add)

    wordSum = wordCount.map(lambda x:x[1]).reduce(lambda x,y:x+y)
    wordDict = wordCount.zipWithIndex()             \
        .map(lambda ((key, count), index): (key, (index, count*1.0 / wordSum)) )             \
        .collectAsMap()
    sharedWordDict = sc.broadcast(wordDict)

    # for each document, generate vector based on word freq
    def doc2vector(dockey, doc):
        # map to word index: freq
        # combine freq with same word
        docVector = [(key, sum((z[1] for z in values))) for key, values in
                     groupby(sorted([sharedWordDict.value[x] for x in doc.split(" ")],
                                    key=lambda x:x[0]),
                             key=lambda x:x[0])]

        (indices, values) = zip(*docVector)      # unzip
        label = float(dockey[6:])
        return label, indices, values

    vector = data.map( lambda (dockey, doc) : doc2vector(dockey, doc))

    vector.persist(StorageLevel.MEMORY_ONLY)
    d = vector.map( lambda (label, indices, values) : indices[-1] if indices else 0)\
              .reduce(lambda a,b:max(a,b)) + 1

#    print "###### Load svm file", filename
    #examples = MLUtils.loadLibSVMFile(sc, filename, numFeatures = numFeatures)
    examples = vector.map( lambda (label, indices, values) : LabeledPoint(label, Vectors.sparse(d, indices, values)))

    examples.cache()

    # FIXME: need randomSplit!
    training = examples.sample(False, 0.8, 2)
    test = examples.sample(False, 0.2, 2)

    numTraining = training.count()
    numTest = test.count()
    print " numTraining = %d, numTest = %d." % (numTraining, numTest)
    model = NaiveBayes.train(training, 1.0)

    model_share = sc.broadcast(model)
    predictionAndLabel = test.map( lambda x: (x.label, model_share.value.predict(x.features)))
#    prediction = model.predict(test.map( lambda x: x.features ))
#    predictionAndLabel = prediction.zip(test.map( lambda x:x.label ))
    accuracy = predictionAndLabel.filter(lambda x: x[0] == x[1]).count() * 1.0 / numTest

    print "Test accuracy = %s." % accuracy

2.4 Scala code analysis

run-spark-job org.apache.spark.examples.mllib.SparseNaiveBayes ${INPUT_HDFS}

Clearly, the Scala naive Bayes workload simply calls the SparseNaiveBayes example that ships with Spark's MLlib example code.



2.5 Java code analysis

run-spark-job com.intel.sparkbench.bayes.JavaBayes ${INPUT_HDFS}

Somewhat surprisingly, for Java HiBench does not reuse stock code or an existing jar but ships its own implementation. The code follows; it mirrors the Python version step by step (build a word dictionary, vectorize each document, train, then evaluate):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.intel.sparkbench.bayes;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.mllib.classification.NaiveBayesModel;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.rdd.RDD;
import org.apache.spark.storage.StorageLevel;
import scala.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.mllib.classification.NaiveBayes;
import org.apache.hadoop.io.Text;

import java.lang.Boolean;
import java.lang.Double;
import java.lang.Long;
import java.util.*;
import java.util.regex.Pattern;

/*
 * Adopted from spark's doc: http://spark.apache.org/docs/latest/mllib-naive-bayes.html
 */
public final class JavaBayes {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {

    if (args.length < 1) {
      System.err.println("Usage: JavaBayes <file>");
      System.exit(1);
    }

    Random rand = new Random();

    SparkConf sparkConf = new SparkConf().setAppName("JavaBayes");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
//    int numFeatures = Integer.parseInt(args[1]);

    // Generate vectors according to input documents
    JavaPairRDD<String, String> data = ctx.sequenceFile(args[0], Text.class, Text.class)
            .mapToPair(new PairFunction<Tuple2<Text, Text>, String, String>() {
                @Override
                public Tuple2<String, String> call(Tuple2<Text, Text> e) {
                    return new Tuple2<String, String>(e._1().toString(), e._2().toString());
                }
            });

    JavaPairRDD<String, Long> wordCount = data
            .flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
                @Override
                public Iterable<String> call(Tuple2<String, String> e) {
                    return Arrays.asList(SPACE.split(e._2()));
                }
            })
            .mapToPair(new PairFunction<String, String, Long>() {
                @Override
                public Tuple2<String, Long> call(String e) {
                    return new Tuple2<String, Long>(e, 1L);
                }
            })
            .reduceByKey(new Function2<Long, Long, Long>() {
                @Override
                public Long call(Long i1, Long i2) {
                    return i1 + i2;
                }
            });

      final Long wordSum = wordCount.map(new Function<Tuple2<String, Long>, Long>(){
          @Override
          public Long call(Tuple2<String, Long> e) {
              return e._2();
          }
      })
      .reduce(new Function2<Long, Long, Long>() {
          @Override
          public Long call(Long v1, Long v2) throws Exception {
              return v1 + v2;
          }
      });

    List<Tuple2<String, Tuple2<Long, Double>>> wordDictList = wordCount.zipWithIndex()
            .map(new Function<Tuple2<Tuple2<String, Long>, Long>, Tuple2<String, Tuple2<Long, Double>>>() {
                @Override
                public Tuple2<String, Tuple2<Long, Double>> call(Tuple2<Tuple2<String, Long>, Long> e) throws Exception {
                    String key = e._1()._1();
                    Long count = e._1()._2();
                    Long index = e._2();
                    return new Tuple2<String, Tuple2<Long, Double>>(key, new Tuple2<Long, Double>(index,
                            count.doubleValue() / wordSum));
                }
            }).collect();

    Map<String, Tuple2<Long, Double>> wordDict = new HashMap();
    for (Tuple2<String, Tuple2<Long, Double>> item : wordDictList) {
        wordDict.put(item._1(), item._2());
    }

    final Broadcast<Map<String, Tuple2<Long, Double>>> sharedWordDict = ctx.broadcast(wordDict);

    // for each document, generate vector based on word freq
      JavaRDD<Tuple3<Double, Long[], Double[]>> vector = data.map(new Function<Tuple2<String, String>, Tuple3<Double, Long[], Double[]>>() {
          @Override
          public Tuple3<Double, Long[], Double[]> call(Tuple2<String, String> v1) throws Exception {
              String dockey = v1._1();
              String doc = v1._2();
              String[] keys = SPACE.split(doc);
              Tuple2<Long, Double>[] datas = new Tuple2[keys.length];
              for (int i = 0; i < keys.length; i++) {
                  datas[i] = sharedWordDict.getValue().get(keys[i]);
              }
              Map<Long, Double> vector = new HashMap<Long, Double>();
              for (int i = 0; i < datas.length; i++) {
                  Long indic = datas[i]._1();
                  Double value = datas[i]._2();
                  if (vector.containsKey(indic)) {
                      vector.put(indic, value + vector.get(indic));
                  } else {
                      vector.put(indic, value);
                  }
              }

              Long[] indices = new Long[vector.size()];
              Double[] values = new Double[vector.size()];

              SortedSet<Long> sortedKeys = new TreeSet<Long>(vector.keySet());
              int c = 0;
              for (Long key : sortedKeys) {
                  indices[c] = key;
                  values[c] = vector.get(key);
                  c+=1;
              }

              Double label = Double.parseDouble(dockey.substring(6));
              return new Tuple3<Double, Long[], Double[]>(label, indices, values);
          }
      });

      vector.persist(StorageLevel.MEMORY_ONLY());
       final Long d = vector
               .map(new Function<Tuple3<Double,Long[],Double[]>, Long>() {
                   @Override
                   public Long call(Tuple3<Double, Long[], Double[]> v1) throws Exception {
                       Long[] indices = v1._2();
                       if (indices.length > 0) {
//                           System.out.println("v_length:"+indices.length+"  v_val:" + indices[indices.length - 1]);
                           return indices[indices.length - 1];
                       } else return Long.valueOf(0);
                   }
               })
              .reduce(new Function2<Long, Long, Long>() {
                  @Override
                  public Long call(Long v1, Long v2) throws Exception {
//                      System.out.println("v1:"+v1+"  v2:"+v2);
                      return v1 > v2 ? v1 : v2;
                  }
              }) + 1;

    RDD<LabeledPoint> examples = vector.map(new Function<Tuple3<Double,Long[],Double[]>, LabeledPoint>() {
        @Override
        public LabeledPoint call(Tuple3<Double, Long[], Double[]> v1) throws Exception {
            int intIndices [] = new int[v1._2().length];
            double intValues [] = new double[v1._3().length];
            for (int i=0; i< v1._2().length; i++){
                intIndices[i] = v1._2()[i].intValue();
                intValues[i] = v1._3()[i];
            }
            return new LabeledPoint(v1._1(), Vectors.sparse(d.intValue(),
                    intIndices, intValues));
        }
    }).rdd();

    //RDD<LabeledPoint> examples = MLUtils.loadLibSVMFile(ctx.sc(), args[0], false, numFeatures);
    RDD<LabeledPoint>[] split = examples.randomSplit(new double[]{0.8, 0.2}, rand.nextLong());

    JavaRDD<LabeledPoint> training = split[0].toJavaRDD();
    JavaRDD<LabeledPoint> test = split[1].toJavaRDD();

    final NaiveBayesModel model = NaiveBayes.train(training.rdd(), 1.0);
    JavaRDD<Double> prediction =
        test.map(new Function<LabeledPoint, Double>() {
            @Override
            public Double call(LabeledPoint p) {
                return model.predict(p.features());
            }
        });

    JavaPairRDD<Double, Double> predictionAndLabel =
        prediction.zip(test.map(new Function<LabeledPoint, Double>() {
            @Override
            public Double call(LabeledPoint p) {
                return p.label();
            }
        }));

    double accuracy = (double) predictionAndLabel.filter(
            new Function<Tuple2<Double, Double>, Boolean>() {
                @Override
                public Boolean call(Tuple2<Double, Double> pl) {
                    return pl._1().equals(pl._2());
                }
            }).count() / test.count();

    System.out.println(String.format("Test accuracy = %f", accuracy));
    ctx.stop();
  }
}


3. Results

Type              Date        Time      Input_data_size  Duration(s)  Throughput(bytes/s)  Throughput/node
JavaSparkBayes    2016-10-09  16:41:09  113387030        48.857       2320793              2320793
ScalaSparkBayes   2016-10-09  16:42:00  113387030        45.164       2510562              2510562
PythonSparkBayes  2016-10-09  16:44:03  113387030        118.521      956683               956683
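
The Throughput column is simply Input_data_size divided by Duration(s), and Throughput/node equals Throughput here, which suggests a single-node run. A quick sanity check in Python:

# verify Throughput(bytes/s) = Input_data_size / Duration(s), up to rounding
for name, size, secs in [("JavaSparkBayes",   113387030,  48.857),
                         ("ScalaSparkBayes",  113387030,  45.164),
                         ("PythonSparkBayes", 113387030, 118.521)]:
    print(name, round(size / secs))  # ~2320793, 2510562, 956683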

Reference data scales for the bayes workload:

#Bayes
hibench.bayes.tiny.pages 25000
hibench.bayes.tiny.classes 10
hibench.bayes.tiny.ngrams 1
hibench.bayes.small.pages 30000
hibench.bayes.small.classes 100
hibench.bayes.small.ngrams 2
hibench.bayes.large.pages 100000
hibench.bayes.large.classes 100
hibench.bayes.large.ngrams 2
hibench.bayes.huge.pages 500000
hibench.bayes.huge.classes 100
hibench.bayes.huge.ngrams 2
hibench.bayes.gigantic.pages 1000000
hibench.bayes.gigantic.classes 100
hibench.bayes.gigantic.ngrams 2
hibench.bayes.bigdata.pages 20000000
hibench.bayes.bigdata.classes 20000
hibench.bayes.bigdata.ngrams 2
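
A minimal Python sketch to tabulate these profiles (assuming the property snippet above is saved locally as bayes-scale.conf, a made-up file name); pages divided by classes gives a rough count of generated documents per class label:

from collections import defaultdict

profiles = defaultdict(dict)
with open("bayes-scale.conf") as f:
    for line in f:
        parts = line.split()
        if len(parts) == 2 and parts[0].startswith("hibench.bayes."):
            _, _, profile, key = parts[0].split(".")
            profiles[profile][key] = int(parts[1])

for profile, conf in profiles.items():
    print(profile, conf, "pages/class =", conf["pages"] // conf["classes"])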


References

https://github.com/intel-hadoop/HiBench
