mahout源码分析之DistributedLanczosSolver(二) Job1

Mahout版本:0.7,hadoop版本:1.0.4,jdk:1.7.0_25 64bit。

在上篇blog中的最后终端的信息可以看到,svd算法一共有5个Job任务。下面通过Mahout中DistributedLanczosSolver源代码来一个个分析:

为了方便后面的数据随时对照,使用wine.dat修改后的数据,如下(5行,13列):

14.23,1.71,2.43,15.6,127,2.8,3.06,0.28,2.29,5.64,1.04,3.92,1065
13.2,1.78,2.14,11.2,100,2.65,2.76,0.26,1.28,4.38,1.05,3.4,1050
13.16,2.36,2.67,18.6,101,2.8,3.24,0.3,2.81,5.68,1.03,3.17,1185
14.37,1.95,2.5,16.8,113,3.85,3.49,0.24,2.18,7.8,0.86,3.45,1480
13.24,2.59,2.87,21,118,2.8,2.69,0.39,1.82,4.32,1.04,2.93,735

1.首先,算法使用main方法进行调用,看到main方法中只用一句:

ToolRunner.run(new DistributedLanczosSolver().job(), args);  

所以直接去找run方法,进入96行的run方法,这里都是初始化我们设置的参数了,这里有一些参数是相对好理解的,比如数据的输入、输出以及输入数据的行数、列数,但是,这里有一个working dir参数,不知道是干什么的,还有symmetric,这个应该是说输入数据是否是对称的?rank参数,这个不理解。cleansvd,好像是对输出结果的一个修饰还是什么的,看官网上面设置为true,那这里也按照设置为true的思路来分析。如果cleansvd是true的话,那么接下来就有一个if条件判断了,就会进入:

if (cleansvd) {
      double maxError = Double.parseDouble(AbstractJob.getOption(parsedArgs, "--maxError"));
      double minEigenvalue = Double.parseDouble(AbstractJob.getOption(parsedArgs, "--minEigenvalue"));
      boolean inMemory = Boolean.parseBoolean(AbstractJob.getOption(parsedArgs, "--inMemory"));
      return run(inputPath,
                 outputPath,
                 outputTmpPath,
                 workingDirPath,
                 numRows,
                 numCols,
                 isSymmetric,
                 desiredRank,
                 maxError,
                 minEigenvalue,
                 inMemory);
    }

这里先初始化三个参数,这里都是按照默认的,maxError是0.05,minEigenvalue是0,inMemory是false;

然后进入run方法,这个run方法是调用142行的方法,这个方法中还有一个run方法以及另外一个Job,如下:

public int run(Path inputPath,
                 Path outputPath,
                 Path outputTmpPath,
                 Path workingDirPath,
                 int numRows,
                 int numCols,
                 boolean isSymmetric,
                 int desiredRank,
                 double maxError,
                 double minEigenvalue,
                 boolean inMemory) throws Exception {
    int result = run(inputPath, outputPath, outputTmpPath, workingDirPath, numRows, numCols,
        isSymmetric, desiredRank);
    if (result != 0) {
      return result;
    }
    Path rawEigenVectorPath = new Path(outputPath, RAW_EIGENVECTORS);
    return new EigenVerificationJob().run(inputPath,
                                          rawEigenVectorPath,
                                          outputPath,
                                          outputTmpPath,
                                          maxError,
                                          minEigenvalue,
                                          inMemory,
                                          getConf() != null ? new Configuration(getConf()) : new Configuration());
  }

这里看到有一个run方法,所以应该是这个run方法调用了三个Job,然后最后调用EigenVerificationJob.run()方法最后运行一个job,然后一共四个job,这个只是猜测。先看run方法里面的吧。进入181行的run方法,额,好吧,还在这个类中。这个run方法中才有点实质的内容:

public int run(Path inputPath,
                 Path outputPath,
                 Path outputTmpPath,
                 Path workingDirPath,
                 int numRows,
                 int numCols,
                 boolean isSymmetric,
                 int desiredRank) throws Exception {
    DistributedRowMatrix matrix = new DistributedRowMatrix(inputPath, outputTmpPath, numRows, numCols);
    matrix.setConf(new Configuration(getConf() != null ? getConf() : new Configuration()));  

    LanczosState state;
    if (workingDirPath == null) {
      state = new LanczosState(matrix, desiredRank, getInitialVector(matrix));
    } else {
      HdfsBackedLanczosState hState =
          new HdfsBackedLanczosState(matrix, desiredRank, getInitialVector(matrix), workingDirPath);
      hState.setConf(matrix.getConf());
      state = hState;
    }
    solve(state, desiredRank, isSymmetric);  

    Path outputEigenVectorPath = new Path(outputPath, RAW_EIGENVECTORS);
    serializeOutput(state, outputEigenVectorPath);
    return 0;
  }

以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索path
, 方法
, running job
, run
inmemory
mahout 自定义job、lanczos算法、lanczos、lanczos算法原理、block lanczos,以便于您获取更多的相关知识。

时间: 2024-12-22 04:15:54

mahout源码分析之DistributedLanczosSolver(二) Job1的相关文章

mahout源码分析之DistributedLanczosSolver(七) 总结篇

Mahout版本:0.7,hadoop版本:1.0.4,jdk:1.7.0_25 64bit. 看svd算法官网上面使用的是亚马逊的云平台计算的,不过给出了svd算法的调用方式,当算出了eigenVectors后,应该怎么做呢?比如原始数据是600*60(600行,60列)的数据,计算得到的eigenVectors是24*60(其中的24是不大于rank的一个值),那么最后得到的结果应该是original_data乘以eigenVectors的转置这样就会得到一个600*24的矩阵,这样就达到了

mahout源码分析之DistributedLanczosSolver(五)

Mahout版本:0.7,hadoop版本:1.0.4,jdk:1.7.0_25 64bit. 1. Job 篇 接上篇,分析到EigenVerificationJob的run方法: public int run(Path corpusInput, Path eigenInput, Path output, Path tempOut, double maxError, double minEigenValue, boolean inMemory, Configuration conf) thro

mahout源码分析之DistributedLanczosSolver(六) 完结篇

Mahout版本:0.7,hadoop版本:1.0.4,jdk:1.7.0_25 64bit. 接上篇,分析完3个Job后得到继续往下:其实就剩下两个函数了: List<Map.Entry<MatrixSlice, EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData); saveCleanEigens(new Configuration(), prunedEigenMeta); 看pruneEigens函数: priv

mahout源码分析之DistributedLanczosSolver(四)rawEigen简介

Mahout版本:0.7,hadoop版本:1.0.4,jdk:1.7.0_25 64bit. 接上篇,eigen分解,额,太复杂了,人太浮躁了,静不下来分析(说java对矩阵操作支持度不足,额,好吧是外部原因). 1. 前奏: eigen分解的是triDiag矩阵,这个矩阵,上篇求得的结果是: [[0.315642761491587, 0.9488780991876485, 0.0], [0.9488780991876485, 2.855117440373572, 0.0], [0.0, 0.

mahout源码分析之DistributedLanczosSolver(三) Job2

Mahout版本:0.7,hadoop版本:1.0.4,jdk:1.7.0_25 64bit. 1. 前奏: 本篇接着上篇继续分析,分析LanczosSolver中的:Vector nextVector = isSymmetric ? corpus.times(currentVector) : corpus.timesSquared(currentVector);之后.前篇说到这个是建立了一个job任务,并且按照一定的算法求得了一个nextVector,那么接下来是? if (state.get

mahout源码分析之DistributedLanczosSolver(一) 实战

Mahout版本:0.7,hadoop版本:1.0.4,jdk:1.7.0_25 64bit. 本篇开始系列svd,即降维.这个在mahout中可以直接运行MAHOUT_HOME/mahout/svd -h 即可看到该算法的调用参数,或者在官网相应页面也可以看到,本次实战使用的svd的调用参数如下: package mahout.fansy.svd; import org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver;

Hhadoop-2.7.0中HDFS写文件源码分析(二):客户端实现之DFSPacket

一.简介       HDFS在数据传输过程中,针对数据块Block,不是整个block进行传输的,而是将block切分成一个个的数据包进行传输.而DFSPacket就是HDFS数据传输过程中对数据包的抽象. 二.实现       HDFS客户端在往DataNodes节点写数据时,会以数据包packet的形式写入,且每个数据包包含一个包头,n个连续的校验和数据块checksum chunks和n个连续的实际数据块 actual data chunks,每个校验和数据块对应一个实际数据块,被用来做

TOMCAT源码分析——生命周期管理(二)

前言 我在<TOMCAT源码分析--生命周期管理(一)>一文中介绍了TOMCAT生命周期类接口设计.JMX.容器以及基于容器的事件与监听等内容,本文将接着介绍Tomcat7.0中容器生命周期的管理. 容器生命周期 每个容器都会有自身的生命周期,其中也涉及状态的迁移,以及伴随的事件生成,本节详细介绍Tomcat中的容器生命周期实现.所有容器的转态转换(如新疆.初始化.启动.停止等)都是由外到内,由上到下进行,即先执行父容器的状态转换及相关操作,然后再执行子容器的转态转换,这个过程是层层迭代执行的

HBase源码分析之MemStore的flush发起时机、判断条件等详情(二)

        在<HBase源码分析之MemStore的flush发起时机.判断条件等详情>一文中,我们详细介绍了MemStore flush的发起时机.判断条件等详情,主要是两类操作,一是会引起MemStore数据大小变化的Put.Delete.Append.Increment等操作,二是会引起HRegion变化的诸如Regin的分裂.合并以及做快照时的复制拷贝等,同样会触发MemStore的flush流程.同时,在<HBase源码分析之compact请求发起时机.判断条件等详情(一