《Spark大数据分析实战》——3.4节MLlib

3.4 MLlib
MLlib是构建在Spark上的分布式机器学习库,充分利用了Spark的内存计算和适合迭代型计算的优势,将性能大幅度提升。同时由于Spark算子丰富的表现力,让大规模机器学习的算法开发不再复杂。
3.4.1 MLlib简介
MLlib是一些常用的机器学习算法和库在Spark平台上的实现。MLlib是AMPLab的在研机器学习项目MLBase的底层组件。MLBase是一个机器学习平台,MLI是一个接口层,提供很多结构,MLlib是底层算法实现层,如图3-17所示。

MLlib中包含分类与回归、聚类、协同过滤、数据降维组件以及底层的优化库,如

通过图3-18读者可以对MLlib的整体组件和依赖库有一个宏观的把握。
下面对图3-18中读者可能不太熟悉的底层组件进行简要介绍。
BLAS/LAPACK层:LAPACK是用Fortran编写的算法库,顾名思义,Linear Algebra PACKage,是为了解决通用的线性代数问题的。另外必须要提的算法包是BLAS(Basic Linear Algebra Subprograms),其实LAPACK底层是使用了BLAS库的。不少计算机厂商都提供了针对不同处理器进行了优化的BLAS/LAPACK算法包。
Netlib-java(官网为:https://github.com/fommil/netlib-java/)是一个对底层BLAS, LAPACK封装的Java接口层。
Breeze(官网为:https://github.com/scalanlp/breeze)是一个Scala写的数值处理库,提供向量、矩阵运算等API。
库依赖:MLlib底层使用到了Scala书写的线性代数库Breeze,Breeze底层依赖netlib-java库。netlib-java底层依赖原生的Fortran routines。所以,当用户使用时需要在节点上预先安装gfortran runtime library(下载地址:https://github.com/mikiobraun/jblas/wiki/Missing-Libraries)。由于许可证(license)问题,官方的MLlib依赖集中没有引入netlib-java原生库的依赖。如果运行时环境没有可用原生库,用户将会看到警告信息。如果程序中需要使用netlib-java的库,用户需要在项目中引入com.github.fommil.netlib:all:1.1.2的依赖或者参照指南(网址为:https://github.com/fommil/netlib-java/blob/master/README.md#machine-optimised-system-libraries)来建立用户自己的项目。如果用户需要使用python接口,则需要1.4或者更高版本的NumPy(注意:MLlib源码中注释有Experimental/DeveloperApi的API在未来的发布版本中可能会进行调整和改变,官方会在不同版本发布时提供迁移指南)。
3.4.2 MLlib中的聚类和分类
聚类和分类是机器学习中两个常用的算法,聚类将数据分开为不同的集合,分类对新数据进行类别预测,下面将就两类算法进行介绍。
1.?聚类和分类
(1)什么是聚类
聚类(Clustering)指将数据对象分组成为多个类或者簇(Cluster),它的目标是:在同一个簇中的对象之间具有较高的相似度,而不同簇中的对象差别较大。其实,聚类在人们日常生活中是一种常见行为,即所谓的“物以类聚,人以群分”,其核心思想在于分组,人们不断地改进聚类模式来学习如何区分各个事物和人。
(2)什么是分类
数据仓库、数据库或者其他信息库中有许多可以为商业、科研等活动的决策提供所需要的知识。分类与预测即是其中的两种数据分析形式,可以用来抽取能够描述重要数据集合或预测未来数据趋势。分类方法(Classif?ication)用于预测数据对象的离散类别(Categorical Label);预测方法(Prediction)用于预测数据对象的连续取值。
分类流程:新样本→特征选取→分类→评价
训练流程:训练集→特征选取→训练→分类器
最初,机器学习的分类应用大多都是在这些方法及基于内存基础上所构造的算法。目前,数据挖掘方法都要求具有基于外存以处理大规模数据集合能力,同时具有可扩展能力。
2.?MLlib中的聚类和分类
MLlib目前已经实现了K-Means聚类算法、朴素贝叶斯和决策树分类算法。这里主要介绍被广泛使用的K-Means聚类算法和朴素贝叶斯分类算法。
(1)K-Means算法
1)K-Means算法简介。
K-Means聚类算法能轻松地对聚类问题建模。K-Means聚类算法容易理解,并且能在分布式的环境下并行运行。学习K-Means聚类算法,能更容易地理解聚类算法的优缺点,以及其他算法对于特定数据的高效性。
K-Means聚类算法中的K是聚类的数目,在算法中会强制要求用户输入。如果将新闻聚类成诸如政治、经济、文化等大类,可以选择10~20的数字作为K。因为这种顶级类别的数量是很小的。如果要对这些新闻详细分类,选择50~100的数字也是没有问题的。K-Means聚类算法主要可以分为三步。第一步是为待聚类的点寻找聚类中心;第二步是计算每个点聚类中心的距离,将每个点聚类到离该点最近的聚类中去;第三步是计算聚类中所有点的坐标平均值,并将这个平均值作为新的聚类中心点。反复执行第二步,直到聚类中心不再进行大范围的移动,或者聚类次数达到要求为止。
2)k-Means示例。
下面的例子中有7名选手,每名选手有两个类别的比分,A类比分和B类比分如表3-6所示。

将1号和4号选手分别作为两个簇的中心点,下面每一步将选取的点计算和两个簇中心的欧几里德距离,哪个中心距离小就放到哪个簇中,如表3-8所示。

第二轮将使用(1.8,2.3)和(4.1,5.4)作为新的簇中心,重复以上的过程。直到迭代次数达到用户设定的次数终止。最后一轮的迭代分出的两个簇就是最后的聚类结果。
3)MLlib之K-Means源码解析。
MLlib中的K-Means的原理是:在同一个数据集上,跑多个K-Means算法(每个称为一个run),然后返回效果最好的那个聚类的类簇中心。初始的类簇中心点的选取有两种方法,一种是随机,另一种是采用KMeans||(KMeans++的一个变种)。算法的停止条件是迭代次数达到设置的次数,或者在某一次迭代后所有run的K-Means算法都收敛。
①?类簇中心初始化。
本节介绍的初始化方法是对于每个运行的K-Means都随机选择K个点作为初始
类簇:

private def initRandom(data: RDD[Array[Double]]): Array[ClusterCenters] = {
    // Sample all the cluster centers in one pass to avoid repeated scans
val sample = data.takeSample(true, runs * k, new Random().nextInt()).toSeq
Array.tabulate(runs)(r => sample.slice(r  k, (r + 1)  k).toArray)
  }
②?计算属于某个类簇的点。
在每一次迭代中,首先会计算属于各个类簇的点,然后更新各个类簇的中心。
// K-Means算法的并行实现通过Spark 的mapPartitions函数,通过该函数获取到分区的迭代器。
可以在每个分区内计算该分区内的点属于哪个类簇
// 之后对于每个运行算法中的每个类簇计算属于该类簇的点的个数以及累加和。
val totalContribs = data.mapPartitions { points =>
val runs = activeCenters.length
val k = activeCenters(0).length
val dims = activeCenters(0)(0).length

val sums = Array.fill(runs, k)(new DoubleMatrix(dims))
val counts = Array.fill(runs, k)(0L)

for (point <- points; (centers, runIndex) <- activeCenters.zipWithIndex) {
          // 找到距离该点最近的类簇中心点
val (bestCenter, cost) = KMeans.findClosest(centers, point)
          // 统计该运行算法开销, 用于在之后选取开销最小的那个运行的算法
costAccums(runIndex) += cost
          // 将该点加到最近的类簇的统计总和中去, 方便之后计算该类簇的新中心点
sums(runIndex)(bestCenter).addi(new DoubleMatrix(point))
          // 将距离该点最近的类簇的点数量加1,sum.divi(count)就是类簇的新中心
counts(runIndex)(bestCenter) += 1
        }

val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
          ((i, j), (sums(i)(j), counts(i)(j)))
        }
        contribs.iterator
// 对于每个运行算法的每个类簇计算属于该类簇的点的个数和加和
}.reduceByKey(mergeContribs).collectAsMap()

// mergeContribs是一个负责合并的函数:
def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
         (p1._1.addi(p2._1), p1._2 + p2._2)
}
③?更新类簇的中心点。
for ((run, i) <- activeRuns.zipWithIndex) {
var changed = false
for (j <- 0 until k) {
val (sum, count) = totalContribs((i, j))
if (count != 0) {
            // 计算类簇的新的中心点
val newCenter = sum.divi(count).data
if (MLUtils.squaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
            // 此处与代码和算法的停止条件有关
changed = true
            }
centers(run)(j) = newCenter
          }
        }
        // 如果某个run的KMeans算法的某轮次迭代中K个类簇的中心点变化都不超过指定阈值,
      则认为该KMeans算法收敛。
if (!changed) {
active(run) = false
logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations")
        }
costs(run) = costAccums(i).value
        }
④?算法停止条件。
算法的停止条件是迭代次数达到设置的次数,或者所有运行的K-Means算法都
收敛:
while (iteration < maxIterations && !activeRuns.isEmpty)

上文对典型聚类算法K-Means原理进行介绍,下面将对典型的分类算法朴素贝叶斯算法进行介绍。
(2)朴素贝叶斯分类算法
朴素贝叶斯分类算法是贝叶斯分类算法多个变种之一。朴素指假设各属性之间是相互独立的。研究发现,在大多数情况下,朴素贝叶斯分类算法(naive bayes classif?ier)在性能上与决策树(decision tree)、神经网络(netural network)相当。贝叶斯分类算法在大数据集的应用中具有方法简便、准确率高和速度快的优点。但事实上,贝叶斯分类也有其缺点。由于贝叶斯定理假设一个属性值对给定类的影响独立于其他的属性值,而此假设在实际情况中经常是不成立的,则其分类准确率可能会下降。
朴素贝叶斯分类算法是一种监督学习算法,使用朴素贝叶斯分类算法对文本进行分类,主要有两种模型,即多项式模型(multinomial model)和伯努利模型(bernoulli model)。MLlib使用的是被广泛使用的多项式模型。本书将以一个实际的例子来简略介绍使用多项式模型的朴素贝叶斯分类算法。
在多项式模型中,设某文档d=(t1,t2,…,tk),tk是该文档中出现过的单词,允许重复。
先验概率P(c) = 类c下单词总数/整个训练样本的单词总数
类条件概率P(tk|c) = (类c下单词tk在各个文档中出现过的次数之和+1)
/(类c下单词总数+|V|)
V是训练样本的单词表(即抽取单词,单词出现多次,只算一个),|V|则表示训练样本包含多少种单词。P(tk|c)可以看作是单词tk在证明d属于类c上提供了多大的证据,而P(c)则可以认为是类别c在整体上占多大比例(有多大可能性)。
给定一组分好类的文本训练数据,如表3-10所示。
给定一个新样本(河北河北河北吉林香港),对其进行分类。该文本用属性向量表示为d=(河北, 河北, 河北, 吉林, 香港),类别集合为Y={yes, no}。

类yes下总共有8个单词,类no下总共有3个单词,训练样本单词总数为11,因此P(yes)=8/11, P(no)=3/11。类条件概率计算如下:

P(河北 | yes)=(5+1)/(8+6)=6/14=3/7
P(河北 | yes)=P(吉林 | yes)= (0+1)/(8+6)=1/14
P(河北|no)=(1+1)/(3+6)=2/9
P(Japan|no)=P(吉林| no) =(1+1)/(3+6)=2/9

分母中的8,是指yes类别下textc的长度,也即训练样本的单词总数,6是指训练样本有河北、北京、上海、广东、吉林、香港,共6个单词,3是指no类下共有3个单词。
有了以上类条件概率,开始计算后验概率:

P(yes | d)=(3/7)3×1/14×1/14×8/11=108/184877≈0.00058417
P(no | d)= (2/9)3×2/9×2/9×3/11=32/216513≈0.00014780

比较大小,即可知道这个文档属于类别河北。

时间: 2024-12-06 14:05:15

《Spark大数据分析实战》——3.4节MLlib的相关文章

《Spark大数据分析实战》——3.3节GraphX

3.3 GraphX GraphX是Spark中的一个重要子项目,它利用Spark作为计算引擎,实现了大规模图计算的功能,并提供了类似Pregel的编程接口.GraphX的出现,将Spark生态系统变得更加完善和丰富:同时以其与Spark生态系统其他组件很好的融合,以及强大的图数据处理能力,在工业界得到了广泛的应用.本章主要介绍GraphX的架构.原理和使用方式.3.3.1 GraphX简介 GraphX是常用图算法在Spark上的并行化实现,同时提供了丰富的API接口.图算法是很多复杂机器学习

《Spark大数据分析实战》——1.2节Spark生态系统BDAS

1.2 Spark生态系统BDAS 目前,Spark已经发展成为包含众多子项目的大数据计算平台.BDAS是伯克利大学提出的基于Spark的数据分析栈(BDAS).其核心框架是Spark,同时涵盖支持结构化数据SQL查询与分析的查询引擎Spark SQL,提供机器学习功能的系统MLBase及底层的分布式机器学习库MLlib,并行图计算框架GraphX,流计算框架Spark Streaming,近似查询引擎BlinkDB,内存分布式文件系统Tachyon,资源管理框架Mesos等子项目.这些子项目在

《Spark大数据分析实战》——3.1节SQL on Spark

3.1 SQL on Spark AMPLab将大数据分析负载分为三大类型:批量数据处理.交互式查询.实时流处理.而其中很重要的一环便是交互式查询.大数据分析栈中需要满足用户ad-hoc.reporting.iterative等类型的查询需求,也需要提供SQL接口来兼容原有数据库用户的使用习惯,同时也需要SQL能够进行关系模式的重组.完成这些重要的SQL任务的便是Spark SQL和Shark这两个开源分布式大数据查询引擎,它们可以理解为轻量级Hive SQL在Spark上的实现,业界将该类技术

《Spark大数据分析实战》——3.5节本章小结

3.5 本章小结本章主要介绍了BDAS中广泛应用的几个数据分析组件.SQL on Spark提供在Spark上的SQL查询功能.让用户可以基于内存计算和SQL进行大数据分析.通过Spark Streaming,用户可以构建实时流处理应用,其高吞吐量,以及适合历史和实时数据混合分析的特性使其在流数据处理框架中突出重围.GraphX充当Spark生态系统中图计算的角色,其简洁的API让图处理算法的书写更加便捷.最后介绍了MLlib--Spark上的机器学习库,它充分利用Spark内存计算和适合迭代的

《Spark大数据分析实战》——1.4节弹性分布式数据集

1.4 弹性分布式数据集 本节将介绍弹性分布式数据集RDD.Spark是一个分布式计算框架,而RDD是其对分布式内存数据的抽象,可以认为RDD就是Spark分布式算法的数据结构,而RDD之上的操作是Spark分布式算法的核心原语,由数据结构和原语设计上层算法.Spark最终会将算法(RDD上的一连串操作)翻译为DAG形式的工作流进行调度,并进行分布式任务的分发.1.4.1 RDD简介 在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(Resilient Distributed Da

《Spark大数据分析实战》——2.1节Spark应用开发环境配置

2.1 Spark应用开发环境配置 Spark的开发可以通过Intellij或者Eclipse IDE进行,在环境配置的开始阶段,还需要安装相应的Scala插件.2.1.1 使用Intellij开发Spark程序 本节介绍如何使用Intellij IDEA构建Spark开发环境和源码阅读环境.由于Intellij对Scala的支持更好,目前Spark开发团队主要使用Intellij作为开发环境. 1.?配置开发环境 (1)安装JDK 用户可以自行安装JDK8.官网地址:http://www.or

《Spark大数据分析实战》——3.2节Spark Streaming

3.2 Spark StreamingSpark Streaming是一个批处理的流式计算框架.它的核心执行引擎是Spark,适合处理实时数据与历史数据混合处理的场景,并保证容错性.下面将对Spark Streaming进行详细的介绍.3.2.1 Spark Streaming简介Spark Streaming是构建在Spark上的实时计算框架,扩展了Spark流式大数据处理能力.Spark Streaming将数据流以时间片为单位进行分割形成RDD,使用RDD操作处理每一块数据,每块数据(也就

《Spark大数据分析实战》——1.1节初识Spark

1.1 初识SparkSpark是基于内存计算的大数据并行计算框架,因为它基于内存计算,所以提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群.1.?Spark执行的特点Hadoop中包含计算框架MapReduce和分布式文件系统HDFS.Spark是MapReduce的替代方案,而且兼容HDFS.Hive等分布式存储层,融入Hadoop的生态系统,并弥补MapReduce的不足.(1)中间结果输出Spark将执行工作流抽象

《Spark大数据分析实战》——1.3节Spark架构与运行逻辑

1.3 Spark架构与运行逻辑 1.?Spark的架构 Driver:运行Application的main()函数并且创建SparkContext. Client:用户提交作业的客户端. Worker:集群中任何可以运行Application代码的节点,运行一个或多个Executor进程. Executor:运行在Worker的Task执行器,Executor启动线程池运行Task,并且负责将数据存在内存或者磁盘上.每个Application都会申请各自的Executor来处理任务. Spar