Storm Source Code Analysis - metric

First, a set of metric-related interfaces is defined: IMetric, IReducer, ICombiner (backtype.storm.metric.api).

Each task creates a set of builtin metrics (backtype.storm.daemon.builtin-metrics) and registers them in the topology context.

The task continuously updates these builtin metrics through functions such as spout-acked-tuple!.

The task periodically sends the statistics accumulated in the builtin metrics over METRICS-STREAM to the metrics bolt (backtype.storm.metric.MetricsConsumerBolt), which creates an object implementing backtype.storm.metric.api.IMetricsConsumer and uses it to produce the metrics.
So how are these metrics consumed?
Since they are builtin metrics, they are not used by the outside world directly.
How they are handled depends on _metricsConsumer.handleDataPoints, where _metricsConsumer is configured through the topology's configuration.
For example, with backtype.storm.metric.LoggingMetricsConsumer the metrics are written to a log.
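For reference, wiring a consumer up from the topology side looks roughly like this (a sketch; LoggingMetricsConsumer and Config.registerMetricsConsumer are part of the standard Storm API, the 10-second bucket size is just an example):

import backtype.storm.Config;
import backtype.storm.metric.LoggingMetricsConsumer;

public class MetricsConfigExample {
    public static Config buildConf() {
        Config conf = new Config();
        // flush the builtin metrics every 10 seconds
        conf.put(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS, 10);
        // one MetricsConsumerBolt task, writing the data points to the worker log
        conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
        return conf;
    }
}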

 

1. backtype.storm.metric.api

IMetric

package backtype.storm.metric.api;
public interface IMetric {
    public Object getValueAndReset(); // return the current value and reset the metric to its initial state
}

CountMetric - a single counter; reset clears it back to zero
AssignableMetric - holds whatever value is assigned to it; nothing to reset
MultiCountMetric - a HashMap of named counts; on reset, getValueAndReset is called on each CountMetric (a sketch follows the CountMetric listing below)

public class CountMetric implements IMetric {
    long _value = 0;

    public CountMetric() {
    }

    public void incr() {
        _value++;
    }

    public void incrBy(long incrementBy) {
        _value += incrementBy;
    }

    public Object getValueAndReset() {
        long ret = _value;
        _value = 0;
        return ret;
    }
}
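MultiCountMetric is essentially a map of named CountMetrics; abridged, it looks roughly like this:

import java.util.HashMap;
import java.util.Map;

public class MultiCountMetric implements IMetric {
    Map<String, CountMetric> _value = new HashMap<String, CountMetric>();

    // return (and lazily create) the CountMetric for a given key, e.g. a stream id
    public CountMetric scope(String key) {
        CountMetric val = _value.get(key);
        if (val == null) {
            _value.put(key, val = new CountMetric());
        }
        return val;
    }

    // reset every scoped counter and return a map of key -> count
    public Object getValueAndReset() {
        Map<String, Object> ret = new HashMap<String, Object>();
        for (Map.Entry<String, CountMetric> e : _value.entrySet()) {
            ret.put(e.getKey(), e.getValue().getValueAndReset());
        }
        return ret;
    }
}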

 

ICombiner

public interface ICombiner<T> {
    public T identity();
    public T combine(T a, T b);
}

CombinedMetric combines an ICombiner with an IMetric:

public class CombinedMetric implements IMetric {
    private final ICombiner _combiner;
    private Object _value;

    public CombinedMetric(ICombiner combiner) {
        _combiner = combiner;
        _value = _combiner.identity();
    }

    public void update(Object value) {
        _value = _combiner.combine(_value, value);
    }

    public Object getValueAndReset() {
        Object ret = _value;
        _value = _combiner.identity();
        return ret;
    }
}
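As an illustration, a metric that tracks the maximum value seen in a bucket only needs an ICombiner; a sketch (MaxCombiner is a made-up name, not part of Storm):

// hypothetical combiner: keep the largest value seen so far in the bucket
public class MaxCombiner implements ICombiner<Long> {
    public Long identity() {
        return Long.MIN_VALUE;
    }

    public Long combine(Long a, Long b) {
        return Math.max(a, b);
    }
}

// usage:
//   CombinedMetric maxLatency = new CombinedMetric(new MaxCombiner());
//   maxLatency.update(latencyMs);  // combine() folds each update into the current value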

 

IReducer

public interface IReducer<T> { 
    T init(); 
    T reduce(T accumulator, Object input); 
    Object extractResult(T accumulator); 
}

MeanReducer implements the IReducer interface to compute a mean: reduce accumulates the running sum and count, and extractResult returns sum/count.

class MeanReducerState {
    public int count = 0;
    public double sum = 0.0;
}

public class MeanReducer implements IReducer<MeanReducerState> {
    public MeanReducerState init() {
        return new MeanReducerState();
    }

    public MeanReducerState reduce(MeanReducerState acc, Object input) {
        acc.count++;
        if(input instanceof Double) {
            acc.sum += (Double)input;
        } else if(input instanceof Long) {
            acc.sum += ((Long)input).doubleValue();
        } else if(input instanceof Integer) {
            acc.sum += ((Integer)input).doubleValue();
        } else {
            throw new RuntimeException(
                "MeanReducer::reduce called with unsupported input type `" + input.getClass()
                + "`. Supported types are Double, Long, Integer.");
        }
        return acc;
    }

    public Object extractResult(MeanReducerState acc) {
        if(acc.count > 0) {
            return new Double(acc.sum / (double)acc.count);
        } else {
            return null;
        }
    }
}

 

ReducedMetric

Combines an IReducer with an IMetric:

public class ReducedMetric implements IMetric {
    private final IReducer _reducer;
    private Object _accumulator;

    public ReducedMetric(IReducer reducer) {
        _reducer = reducer;
        _accumulator = _reducer.init();
    }

    public void update(Object value) {
        _accumulator = _reducer.reduce(_accumulator, value);
    }

    public Object getValueAndReset() {
        Object ret = _reducer.extractResult(_accumulator);
        _accumulator = _reducer.init();
        return ret;
    }
}
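A sketch of how user code would typically use this pair: register a ReducedMetric backed by a MeanReducer in prepare, then feed it from execute (the class name and metric name are illustrative; registerMetric is the standard TopologyContext API):

import java.util.Map;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class TimedBolt extends BaseRichBolt {
    private transient ReducedMetric _meanLatency;
    private OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
        // aggregate a mean over each 60-second bucket
        _meanLatency = context.registerMetric("execute-latency-mean",
                                              new ReducedMetric(new MeanReducer()), 60);
    }

    public void execute(Tuple input) {
        long start = System.currentTimeMillis();
        // ... user logic ...
        _meanLatency.update(System.currentTimeMillis() - start);
        _collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}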

 

IMetricsConsumer

This interface nests the TaskInfo and DataPoint classes.
handleDataPoints is where the consumer adds the logic for processing the DataPoints reported by a task.

public interface IMetricsConsumer {
    public static class TaskInfo {
        public TaskInfo() {}
        public TaskInfo(String srcWorkerHost, int srcWorkerPort, String srcComponentId, int srcTaskId, long timestamp, int updateIntervalSecs) {
            this.srcWorkerHost = srcWorkerHost;
            this.srcWorkerPort = srcWorkerPort;
            this.srcComponentId = srcComponentId;
            this.srcTaskId = srcTaskId;
            this.timestamp = timestamp;
            this.updateIntervalSecs = updateIntervalSecs;
        }
        public String srcWorkerHost;
        public int srcWorkerPort;
        public String srcComponentId;
        public int srcTaskId;
        public long timestamp;
        public int updateIntervalSecs;
    }
    public static class DataPoint {
        public DataPoint() {}
        public DataPoint(String name, Object value) {
            this.name = name;
            this.value = value;
        }
        @Override
        public String toString() {
            return "[" + name + " = " + value + "]";
        }
        public String name;
        public Object value;
    }

    void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter);
    void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
    void cleanup();
}
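A minimal consumer sketch, in the spirit of LoggingMetricsConsumer but simply printing each data point (the class name is made up):

import java.util.Collection;
import java.util.Map;
import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;

public class PrintingMetricsConsumer implements IMetricsConsumer {
    public void prepare(Map stormConf, Object registrationArgument,
                        TopologyContext context, IErrorReporter errorReporter) {
    }

    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        // one TaskInfo per batch, plus the data points collected from that task
        for (DataPoint p : dataPoints) {
            System.out.println(taskInfo.srcComponentId + "[" + taskInfo.srcTaskId + "] " + p);
        }
    }

    public void cleanup() {
    }
}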

 

2. backtype.storm.daemon.builtin-metrics

Defines the metrics that spouts and bolts need. The core is two records, BuiltinSpoutMetrics and BuiltinBoltMetrics, each a map of [metric-name, metric-object]:

(defrecord BuiltinSpoutMetrics [^MultiCountMetric ack-count
                                ^MultiReducedMetric complete-latency
                                ^MultiCountMetric fail-count
                                ^MultiCountMetric emit-count
                                ^MultiCountMetric transfer-count])
(defrecord BuiltinBoltMetrics [^MultiCountMetric ack-count
                               ^MultiReducedMetric process-latency
                               ^MultiCountMetric fail-count
                               ^MultiCountMetric execute-count
                               ^MultiReducedMetric execute-latency
                               ^MultiCountMetric emit-count
                               ^MultiCountMetric transfer-count])

(defn make-data [executor-type]
  (condp = executor-type
    :spout (BuiltinSpoutMetrics. (MultiCountMetric.)
                                 (MultiReducedMetric. (MeanReducer.))
                                 (MultiCountMetric.)
                                 (MultiCountMetric.)
                                 (MultiCountMetric.))
    :bolt (BuiltinBoltMetrics. (MultiCountMetric.)
                               (MultiReducedMetric. (MeanReducer.))
                               (MultiCountMetric.)
                               (MultiCountMetric.)
                               (MultiReducedMetric. (MeanReducer.))
                               (MultiCountMetric.)
                               (MultiCountMetric.))))

(defn register-all [builtin-metrics  storm-conf topology-context]
  (doseq [[kw imetric] builtin-metrics]
    (.registerMetric topology-context (str "__" (name kw)) imetric
                     (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
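Written out in Java, what register-all does for each [kw imetric] pair is just a registerMetric call on the TopologyContext; roughly (a fragment, assuming stormConf and context are in scope as in a prepare method):

// the "__" prefix matches (str "__" (name kw)) above
int bucketSize = ((Number) stormConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)).intValue();
context.registerMetric("__ack-count", new MultiCountMetric(), bucketSize);
context.registerMetric("__complete-latency", new MultiReducedMetric(new MeanReducer()), bucketSize);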

When mk-task-data is called, make-data creates the corresponding metrics:

:builtin-metrics (builtin-metrics/make-data (:type executor-data))

and in the executor's mk-threads, these builtin metrics are registered into the TopologyContext:

(builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data))

This completes the creation and registration of the builtin metrics. A series of functions is then defined to update them.

Take spout-acked-tuple! as an example: it updates the MultiCountMetric ack-count and the MultiReducedMetric complete-latency.
.scope pulls the CountMetric for the given stream out of the MultiCountMetric, and incrBy adds the stats rate to that count (a Java rendering of the same calls follows the snippet).

(defn spout-acked-tuple! [^BuiltinSpoutMetrics m stats stream latency-ms]
  (-> m .ack-count (.scope stream) (.incrBy (stats-rate stats)))
  (-> m .complete-latency (.scope stream) (.update latency-ms)))
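The same two calls rendered in Java look roughly like this (class and method names are illustrative):

class SpoutAckMetrics {
    final MultiCountMetric ackCount = new MultiCountMetric();
    final MultiReducedMetric completeLatency = new MultiReducedMetric(new MeanReducer());

    void onAcked(String stream, long latencyMs, long statsRate) {
        ackCount.scope(stream).incrBy(statsRate);         // (-> m .ack-count (.scope stream) (.incrBy (stats-rate stats)))
        completeLatency.scope(stream).update(latencyMs);  // (-> m .complete-latency (.scope stream) (.update latency-ms))
    }
}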

 

3. backtype.storm.metric

MetricsConsumerBolt

It creates the object implementing IMetricsConsumer and calls handleDataPoints in execute:

package backtype.storm.metric;
public class MetricsConsumerBolt implements IBolt {
    IMetricsConsumer _metricsConsumer;
    String _consumerClassName;
    OutputCollector _collector;
    Object _registrationArgument;

    public MetricsConsumerBolt(String consumerClassName, Object registrationArgument) {
        _consumerClassName = consumerClassName;
        _registrationArgument = registrationArgument;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        try {
            _metricsConsumer = (IMetricsConsumer)Class.forName(_consumerClassName).newInstance();
        } catch (Exception e) {
            throw new RuntimeException("Could not instantiate a class listed in config under section " +
                Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e);
        }
        _metricsConsumer.prepare(stormConf, _registrationArgument, context, (IErrorReporter)collector);
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        _metricsConsumer.handleDataPoints((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1));
        _collector.ack(input);
    }

    @Override
    public void cleanup() {
        _metricsConsumer.cleanup();
    }

}

 

SystemBolt

SystemBolt - according to the comments, there is one per worker, with task id -1.
It defines system-level metrics and registers them in the TopologyContext.

 

The Java code needs to call into Clojure, so the following classes are imported:

import clojure.lang.AFn;
import clojure.lang.IFn; // function
import clojure.lang.RT;  // runtime

It also uses the java.lang.management classes for monitoring memory and the JVM:

java.lang.management.MemoryUsage - a snapshot of memory usage
java.lang.management.GarbageCollectorMXBean - the management interface for JVM garbage collection, e.g. the total number of collections and the accumulated collection time
java.lang.management.RuntimeMXBean - the management interface for the runtime system of the JVM

What is special about this bolt is that only prepare contains any logic, and _prepareWasCalled guarantees prepare is executed only once per worker.
The logic in prepare simply defines the various metrics and registers them in the TopologyContext via registerMetric.
The metrics cover JVM uptime, start time, memory usage, and the statistics of each GarbageCollector.
These system metrics are sent to the MetricsConsumerBolt for processing along with everything else.
(Shouldn't this have been implemented as a spout - why a bolt?)

// There is one task inside one executor for each worker of the topology.
// TaskID is always -1, therefore you can only send-unanchored tuples to co-located SystemBolt.
// This bolt was conceived to export worker stats via metrics api.
public class SystemBolt implements IBolt {
    private static Logger LOG = LoggerFactory.getLogger(SystemBolt.class);
    private static boolean _prepareWasCalled = false;

    private static class MemoryUsageMetric implements IMetric {
        IFn _getUsage;
        public MemoryUsageMetric(IFn getUsage) {
            _getUsage = getUsage;
        }
        @Override
        public Object getValueAndReset() {
            MemoryUsage memUsage = (MemoryUsage)_getUsage.invoke();
            HashMap m = new HashMap();
            m.put("maxBytes", memUsage.getMax());
            m.put("committedBytes", memUsage.getCommitted());
            m.put("initBytes", memUsage.getInit());
            m.put("usedBytes", memUsage.getUsed());
            m.put("virtualFreeBytes", memUsage.getMax() - memUsage.getUsed());
            m.put("unusedBytes", memUsage.getCommitted() - memUsage.getUsed());
            return m;
        }
    }

    // canonically the metrics data exported is time bucketed when doing counts.
    // convert the absolute values here into time buckets.
    private static class GarbageCollectorMetric implements IMetric {
        GarbageCollectorMXBean _gcBean;
        Long _collectionCount;
        Long _collectionTime;
        public GarbageCollectorMetric(GarbageCollectorMXBean gcBean) {
            _gcBean = gcBean;
        }
        @Override
        public Object getValueAndReset() {
            Long collectionCountP = _gcBean.getCollectionCount();
            Long collectionTimeP = _gcBean.getCollectionTime();

            Map ret = null;
            if(_collectionCount!=null && _collectionTime!=null) {
                ret = new HashMap();
                ret.put("count", collectionCountP - _collectionCount);
                ret.put("timeMs", collectionTimeP - _collectionTime);
            }

            _collectionCount = collectionCountP;
            _collectionTime = collectionTimeP;
            return ret;
        }
    }

    @Override
    public void prepare(final Map stormConf, TopologyContext context, OutputCollector collector) {
        if(_prepareWasCalled && stormConf.get(Config.STORM_CLUSTER_MODE) != "local") {
            throw new RuntimeException("A single worker should have 1 SystemBolt instance.");
        }
        _prepareWasCalled = true;

        int bucketSize = RT.intCast(stormConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));

        final RuntimeMXBean jvmRT = ManagementFactory.getRuntimeMXBean();

        context.registerMetric("uptimeSecs", new IMetric() {
            @Override
            public Object getValueAndReset() {
                return jvmRT.getUptime()/1000.0;
            }
        }, bucketSize);

        context.registerMetric("startTimeSecs", new IMetric() {
            @Override
            public Object getValueAndReset() {
                return jvmRT.getStartTime()/1000.0;
            }
        }, bucketSize);

        context.registerMetric("newWorkerEvent", new IMetric() {
            boolean doEvent = true;

            @Override
            public Object getValueAndReset() {
                if (doEvent) {
                    doEvent = false;
                    return 1;
                } else return 0;
            }
        }, bucketSize);

        final MemoryMXBean jvmMemRT = ManagementFactory.getMemoryMXBean();

        context.registerMetric("memory/heap", new MemoryUsageMetric(new AFn() {
            public Object invoke() {
                return jvmMemRT.getHeapMemoryUsage();
            }
        }), bucketSize);
        context.registerMetric("memory/nonHeap", new MemoryUsageMetric(new AFn() {
            public Object invoke() {
                return jvmMemRT.getNonHeapMemoryUsage();
            }
        }), bucketSize);

        for(GarbageCollectorMXBean b : ManagementFactory.getGarbageCollectorMXBeans()) {
            context.registerMetric("GC/" + b.getName().replaceAll("\\W", ""), new GarbageCollectorMetric(b), bucketSize);
        }
    }

    @Override
    public void execute(Tuple input) {
        throw new RuntimeException("Non-system tuples should never be sent to __system bolt.");
    }

    @Override
    public void cleanup() {
    }
}

 

4. system-topology!

Here the metric component (MetricsConsumerBolt), the system component (SystemBolt), and the corresponding stream definitions are added to the topology dynamically.

system-topology! adds the following to the topology:

1. the acker, discussed later
2. the metrics bolt, whose input is the METRICS-STREAM sent by the tasks of every component; it has no output
3. the system bolt, which has no input and declares two tick streams as output
4. two extra output streams, metrics-stream and system-stream, added to every component

(defn system-topology! [storm-conf ^StormTopology topology]
  (validate-basic! topology)
  (let [ret (.deepCopy topology)]
    (add-acker! storm-conf ret)
    (add-metric-components! storm-conf ret)
    (add-system-components! storm-conf ret)
    (add-metric-streams! ret)
    (add-system-streams! ret)
    (validate-structure! ret)
    ret
    ))

4.1 Adding components

Looking at the Thrift definitions, adding a bolt component to a topology simply means adding a [string, Bolt] entry to a map.
The interesting part is how thrift/mk-bolt-spec* is used to build the bolt spec.

struct StormTopology {
  1: required map<string, SpoutSpec> spouts;
  2: required map<string, Bolt> bolts;
  3: required map<string, StateSpoutSpec> state_spouts;
}
struct Bolt {
  1: required ComponentObject bolt_object;
  2: required ComponentCommon common;
}
struct ComponentCommon {
  1: required map<GlobalStreamId, Grouping> inputs;
  2: required map<string, StreamInfo> streams; //key is stream id, outputs
  3: optional i32 parallelism_hint; //how many threads across the cluster should be dedicated to this component
  4: optional string json_conf;
}
struct StreamInfo {
  1: required list<string> output_fields;
  2: required bool direct;
}
(defn add-metric-components! [storm-conf ^StormTopology topology]
  (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)] ;; as metrics-consumer-bolt-specs shows, this bolt takes METRICS-STREAM-ID as input and has no output
    (.put_to_bolts topology comp-id bolt-spec)))

(defn add-system-components! [conf ^StormTopology topology]
  (let [system-bolt-spec (thrift/mk-bolt-spec*
                          {} ;; no inputs
                          (SystemBolt.) ;; the bolt object
                          {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
                           METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])} ;; two output streams are declared, but the code never emits on them
                          :p 0
                          :conf {TOPOLOGY-TASKS 0})]
    (.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec)))

 

metric-components

First, every component in the topology (including the system component) has to send its statistics to the metrics bolt, so component-ids-that-emit-metrics is all-components-ids plus SYSTEM-COMPONENT-ID.
For each such comp-id, the metrics bolt gets an input entry of the form {[comp-id METRICS-STREAM-ID] :shuffle} (shuffle grouping).

Then thrift/mk-bolt-spec* is wrapped into mk-bolt-spec, the fn used to create the bolt spec.

Finally, mk-bolt-spec is called to build the metrics bolt spec (see the definition above).
The key step is constructing the MetricsConsumerBolt object; the IMetricsConsumer implementation class and its argument are read from storm-conf.
This bolt calls handleDataPoints on the data received from each task to produce the metrics, as described earlier.

(defn metrics-consumer-bolt-specs [storm-conf topology]
  (let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology)))
        inputs (->> (for [comp-id component-ids-that-emit-metrics]
                      {[comp-id METRICS-STREAM-ID] :shuffle})
                    (into {}))

        mk-bolt-spec (fn [class arg p]
                       (thrift/mk-bolt-spec*
                        inputs  ;; the inputs map built above
                        (backtype.storm.metric.MetricsConsumerBolt. class arg) ;; the bolt object
                        {} ;; no outputs
                        :p p :conf {TOPOLOGY-TASKS p}))]

    (map
     (fn [component-id register]
       [component-id (mk-bolt-spec (get register "class")
                                   (get register "argument")
                                   (or (get register "parallelism.hint") 1))])

     (metrics-consumer-register-ids storm-conf)
     (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))

 

4.2 Adding streams

Every component gets two additional output streams:
METRICS-STREAM-ID, sent to the metrics bolt, with output-fields ["task-info" "data-points"]
SYSTEM-STREAM-ID, with output-fields ["event"]

(defn add-metric-streams! [^StormTopology topology]
  (doseq [[_ component] (all-components topology)
          :let [common (.get_common component)]]
    (.put_to_streams common METRICS-STREAM-ID
                     (thrift/output-fields ["task-info" "data-points"]))))

(defn add-system-streams! [^StormTopology topology]
  (doseq [[_ component] (all-components topology)
          :let [common (.get_common component)]]
    (.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"]))))

Originally published on 博客园 (cnblogs), 2013-07-30.
