Spark机器学习之推荐引擎

Spark机器学习之推荐引擎

一. 最小二乘法建立模型

关于最小二乘法矩阵分解,我们可以参阅:

一、矩阵分解模型。

用户对物品的打分行为可以表示成一个评分矩阵A(m*n),表示m个用户对n各物品的打分情况。如下图所示:

其中,A(i,j)表示用户user i对物品item j的打分。但是,ALS 的核心就是下面这个假设:的打分矩阵
A 可以用两个小矩阵和的乘积来近似:。这样我们就把整个系统的自由度从一下降到了。我们接下来就聊聊为什么
ALS 的低秩假设是合理的。世上万千事物,人们的喜好各不相同。但。举个例子,我喜欢看略带黑色幽默的警匪电影,那么大家根据这个描述就知道我大概会喜欢昆汀的《低俗小说》、《落水狗》和韦家辉的《一个字头的诞生》。这些电影都符合我对自己喜好的描述,也就是说他们在这个抽象的低维空间的投影和我的喜好相似。再抽象一些,把人们的喜好和电影的特征都投到这个低维空间,一个人的喜好映射到了一个低维向量,一个电影的特征变成了纬度相同的向量,那么这个人和这个电影的相似度就可以表述成这两个向量之间的内积。
我们把打分理解成相似度,那么“打分矩阵A(mn)”就可以由“用户喜好特征矩阵U(mk)”和“产品特征矩阵V(n*k)”的乘积来近似了。矩阵U、矩阵V如下图所示:

U
V

二、交替最小二乘法(ALS)。
矩阵分解模型的损失函数为:

有了损失函数之后,下面就开始谈优化方法了,通常的优化方法分为两种:交叉最小二乘法(alternative
least squares)和随机梯度下降法(stochastic gradient descent)。本文使用算法的思想就是:我们先随机生成然后固定它求解,再固定求解,这样交替进行下去,直到取得最优解min(C)。因为每步迭代都会降低误差,并且误差是有下界的,所以
ALS 一定会收敛。但由于问题是非凸的,ALS 并不保证会收敛到全局最优解。但在实际应用中,ALS 对初始点不是很敏感,是不是全局最优解造成的影响并不大。

算法的执行步骤:

1、先随机生成一个。一般可以取0值或者全局均值。

2、固定(即:认为是已知的常量),来求解。

此时,损失函数为:

由于C中只有Vj一个未知变量,因此C的最优化问题转化为最小二乘问题,用最小二乘法求解Vj的最优解:

固定j(j=1,2,......,n),则:C的导数


按照上式依次计算v1,v2,......,vn,从而得到。

3、固定(即:认为是已知的量),来求解。

此时,损失函数为:

同理,用步骤2中类似的方法,可以计算ui的值:

依照上式依次计算u1,u2,......,um,从而得到。

4、循环执行步骤2、3,直到损失函数C的值收敛(或者设置一个迭代次数N,迭代执行步骤2、3
N次后停止)。这样,就得到了C最优解对应的矩阵U、V。

MovieLens 数据
该数据集由用户ID,影片ID,评分,时间戳组成

我们只需要前3个字段

复制代码
/ Load the raw ratings data from a file. Replace 'PATH' with the path to the MovieLens data /
val rawData = sc.textFile("/PATH/ml-100k/u.data")
rawData.first()
// 14/03/30 13:21:25 INFO SparkContext: Job finished: first at :17, took 0.002843 s
// res24: String = 196 242 3 881250949

/ Extract the user id, movie id and rating only from the dataset /
val rawRatings = rawData.map(_.split("\t").take(3))
rawRatings.first()
// 14/03/30 13:22:44 INFO SparkContext: Job finished: first at :21, took 0.003703 s
// res25: Array[String] = Array(196, 242, 3)
复制代码

MLlib ALS模型
MLlib导入ALS模型:

import org.apache.spark.mllib.recommendation.ALS
我们看一下ALS.train函数:

复制代码
ALS.train
/*
:13: error: ambiguous reference to overloaded definition,
both method train in object ALS of type (ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating], rank: Int, iterations: Int)org.apache.spark.mllib.recommendation.MatrixFactorizationModel
and method train in object ALS of type (ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating], rank: Int, iterations: Int, lambda: Double)org.apache.spark.mllib.recommendation.MatrixFactorizationModel
match expected type ?
ALS.train
^
*/
复制代码

我们可以得知train函数需要四个参数:ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating], rank: Int, iterations: Int, lambda: Double

  1. ratings

org.apache.spark.mllib.recommendation.Rating类是对用户ID,影片ID,评分的封装

我们可以这样生成Rating的org.apache.spark.rdd.RDD:

val ratings = rawRatings.map { case Array(user, movie, rating) => Rating(user.toInt, movie.toInt, rating.toDouble) }
ratings.first()
// 14/03/30 13:26:43 INFO SparkContext: Job finished: first at :24, took 0.002808 s
// res28: org.apache.spark.mllib.recommendation.Rating = Rating(196,242,3.0)

  1. rank

对应ALS模型中的因子个数,即“两个小矩阵和”中的k

  1. iterations

对应运行时的迭代次数

  1. lambda:

控制模型的正则化过程,从而控制模型的过拟合情况。

由此,我们可以得到模型:

复制代码
/ Train the ALS model with rank=50, iterations=10, lambda=0.01 /
val model = ALS.train(ratings, 50, 10, 0.01)
// ...
// 14/03/30 13:28:44 INFO MemoryStore: ensureFreeSpace(128) called with curMem=7544924, maxMem=311387750
// 14/03/30 13:28:44 INFO MemoryStore: Block broadcast_120 stored as values to memory (estimated size 128.0 B, free 289.8 MB)
// model: org.apache.spark.mllib.recommendation.MatrixFactorizationModel = org.apache.spark.mllib.recommendation.MatrixFactorizationModel@7c7fbd3b

/ Inspect the user factors /
model.userFeatures
// res29: org.apache.spark.rdd.RDD[(Int, Array[Double])] = FlatMappedRDD[1099] at flatMap at ALS.scala:231

/ Count user factors and force computation /
model.userFeatures.count
// ...
// 14/03/30 13:30:08 INFO SparkContext: Job finished: count at :26, took 5.009689 s
// res30: Long = 943

model.productFeatures.count
// ...
// 14/03/30 13:30:59 INFO SparkContext: Job finished: count at :26, took 0.247783 s
// res31: Long = 1682

/ Make a prediction for a single user and movie pair /
val predictedRating = model.predict(789, 123)
复制代码

二. 使用推荐模型

用户推荐
用户推荐,向给定用户推荐物品。这里,我们给用户789推荐前10个他可能喜欢的电影。我们可以先解析下电影资料数据集

该数据集是由“|”分割,我们只需要前两个字段电影ID和电影名称

val movies = sc.textFile("/PATH/ml-100k/u.item")
val titles = movies.map(line => line.split("\|").take(2)).map(array => (array(0).toInt, array(1))).collectAsMap()
titles(123)
// res68: String = Frighteners, The (1996)
我们看一下预测的结果:

复制代码
/* Make predictions for a single user across all movies /
val userId = 789
val K = 10
val topKRecs = model.recommendProducts(userId, K)
println(topKRecs.mkString("\n"))
/

Rating(789,715,5.931851273771102)
Rating(789,12,5.582301095666215)
Rating(789,959,5.516272981542168)
Rating(789,42,5.458065302395629)
Rating(789,584,5.449949837103569)
Rating(789,750,5.348768847643657)
Rating(789,663,5.30832117499004)
Rating(789,134,5.278933936827717)
Rating(789,156,5.250959077906759)
Rating(789,432,5.169863417126231)
/
topKRecs.map(rating => (titles(rating.product), rating.rating)).foreach(println)
/

(To Die For (1995),5.931851273771102)
(Usual Suspects, The (1995),5.582301095666215)
(Dazed and Confused (1993),5.516272981542168)
(Clerks (1994),5.458065302395629)
(Secret Garden, The (1993),5.449949837103569)
(Amistad (1997),5.348768847643657)
(Being There (1979),5.30832117499004)
(Citizen Kane (1941),5.278933936827717)
(Reservoir Dogs (1992),5.250959077906759)
(Fantasia (1940),5.169863417126231)
*/
复制代码
我们再来看一下实际上的结果是:

复制代码
val moviesForUser = ratings.keyBy(_.user).lookup(789)
// moviesForUser: Seq[org.apache.spark.mllib.recommendation.Rating] = WrappedArray(Rating(789,1012,4.0), Rating(789,127,5.0), Rating(789,475,5.0), Rating(789,93,4.0), ...
// ...
println(moviesForUser.size)
// 33
moviesForUser.sortBy(-_.rating).take(10).map(rating => (titles(rating.product), rating.rating)).foreach(println)
/*
(Godfather, The (1972),5.0)
(Trainspotting (1996),5.0)
(Dead Man Walking (1995),5.0)
(Star Wars (1977),5.0)
(Swingers (1996),5.0)
(Leaving Las Vegas (1995),5.0)
(Bound (1996),5.0)
(Fargo (1996),5.0)
(Last Supper, The (1995),5.0)
(Private Parts (1997),4.0)
*/
复制代码
很遗憾,一个都没对上~不过,这很正常。因为预测的结果恰好都是用户789没看过的电影,其预测的评分都在5.0以上,而实际上的结果是根据用户789已经看过的电影按评分排序获得的,这也体现的推荐系统的作用~

物品推荐
物品推荐,给定一个物品,哪些物品和它最相似。这里我们使用余弦相似度:

Cosine相似度计算

将查询语句的特征词的权值组成向量 a

网页中对应的特征词的权值组成向量 b

查询语句与该网页的Cosine相似度:

/ Compute the cosine similarity between two vectors /
def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
}

jblas线性代数库
这里MLlib库需要依赖jblas线性代数库,如果大家编译jblas的jar包有问题,可以到我的百度云上获取。把jar包加到lib文件夹后,记得在spark-env.sh添加配置:

SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:$SPARK_LIBRARY_PATH/jblas-1.2.4-SNAPSHOT.jar"

import org.jblas.DoubleMatrix
val aMatrix = new DoubleMatrix(Array(1.0, 2.0, 3.0))
// aMatrix: org.jblas.DoubleMatrix = [1.000000; 2.000000; 3.000000]

求各个产品的余弦相似度:

val sims = model.productFeatures.map{ case (id, factor) =>
val factorVector = new DoubleMatrix(factor)
val sim = cosineSimilarity(factorVector, itemVector)
(id, sim)
}
求相似度最高的前10个相识电影。第一名肯定是自己,所以要取前11个,再除去第1个:

复制代码
val sortedSims2 = sims.top(K + 1)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })
sortedSims2.slice(1, 11).map{ case (id, sim) => (titles(id), sim) }.mkString("\n")
/*
(Hideaway (1995),0.6932331537649621)
(Body Snatchers (1993),0.6898690594544726)
(Evil Dead II (1987),0.6897964975027041)
(Alien: Resurrection (1997),0.6891221044611473)
(Stephen King's The Langoliers (1995),0.6864214133620066)
(Liar Liar (1997),0.6812075443259535)
(Tales from the Crypt Presents: Bordello of Blood (1996),0.6754663844488256)
(Army of Darkness (1993),0.6702643811753909)
(Mystery Science Theater 3000: The Movie (1996),0.6594872765176396)
(Scream (1996),0.6538249646863378)
*/
复制代码

三.推荐模型评估

1.MSE/RMSE
均方差(MSE),就是对各个实际存在评分的项,pow(预测评分-实际评分,2)的值进行累加,在除以项数。而均方根差(RMSE)就是MSE开根号。

我们先用ratings生成(user,product)RDD,作为model.predict()的参数,从而生成以(user,product)为key,value为预测的rating的RDD。然后,用ratings生成以(user,product)为key,实际rating为value的RDD,并join上前者:

复制代码
val usersProducts = ratings.map{ case Rating(user, product, rating) => (user, product)}
val predictions = model.predict(usersProducts).map{
case Rating(user, product, rating) => ((user, product), rating)
}
val ratingsAndPredictions = ratings.map{
case Rating(user, product, rating) => ((user, product), rating)
}.join(predictions)
ratingsAndPredictions.first()
//res21: ((Int, Int), (Double, Double)) = ((291,800),(2.0,2.052364223387371))
复制代码
使用MLLib的评估函数,我们要传入一个(actual,predicted)的RDD。actual和predicted左右位置可以交换:

复制代码
import org.apache.spark.mllib.evaluation.RegressionMetrics
val predictedAndTrue = ratingsAndPredictions.map { case ((user, product), (actual, predicted)) => (actual, predicted) }
val regressionMetrics = new RegressionMetrics(predictedAndTrue)
println("Mean Squared Error = " + regressionMetrics.meanSquaredError)
println("Root Mean Squared Error = " + regressionMetrics.rootMeanSquaredError)
// Mean Squared Error = 0.08231947642632852
// Root Mean Squared Error = 0.2869137090247319
复制代码

  1. MAPK/MAP
    K值平均准确率(MAPK)可以简单的这么理解:

设定推荐K=10,即推荐10个物品。预测该用户评分最高的10个物品ID作为文本1,实际上用户评分过所有物品ID作为文本2,求二者的相关度。(个人认为该评估方法在这里不是很适用)

我们可以按评分排序预测物品ID,再从头遍历,如果该预测ID出现在实际评分过ID的集合中,那么就增加一定分数(当然,排名高的应该比排名低的增加更多的分数,因为前者更能体现推荐的准确性)。最后将累加得到的分数除以min(K,actual.size)

如果是针对所有用户,我们需要把各个用户的累加分数进行累加,在除以用户数。

在MLlib里面,使用的是全局平均准确率(MAP,不设定K)。它需要我们传入(predicted.Array,actual.Array)的RDD。

现在,我们先来生成predicted:

我们先生成产品矩阵:

/ Compute recommendations for all users /
val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect()
val itemMatrix = new DoubleMatrix(itemFactors)
println(itemMatrix.rows, itemMatrix.columns)
// (1682,50)
以便工作节点能够访问到,我们把该矩阵以广播变量的形式分发出去:

// broadcast the item factor matrix
val imBroadcast = sc.broadcast(itemMatrix)
“”,矩阵相乘,计算出评分。scores.data.zipWithIndex,scores.data再按评分排序。生成recommendedIds,构建(userId, recommendedIds)RDD。

复制代码
val allRecs = model.userFeatures.map{ case (userId, array) =>
val userVector = new DoubleMatrix(array)
val scores = imBroadcast.value.mmul(userVector)
val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
(userId, recommendedIds)
}
复制代码
生成actual:

// next get all the movie ids per user, grouped by user id
val userMovies = ratings.map{ case Rating(user, product, rating) => (user, product) }.groupBy(_._1)
// userMovies: org.apache.spark.rdd.RDD[(Int, Seq[(Int, Int)])] = MapPartitionsRDD[277] at groupBy at :21
生成(predicted.Array,actual.Array)的RDD,并使用评估函数:

复制代码
import org.apache.spark.mllib.evaluation.RankingMetrics
val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) =>
val actual = actualWithIds.map(_._2)
(predicted.toArray, actual.toArray)
}
val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)
// Mean Average Precision = 0.07171412913757183
复制代码

时间: 2025-01-02 21:44:40

Spark机器学习之推荐引擎的相关文章

Spark机器学习3·推荐引擎(spark-shell)

Spark机器学习 准备环境 jblashttps://gcc.gnu.org/wiki/GFortranBinaries#MacOS org.jblas:jblas:1.2.4-SNAPSHOT git clone https://github.com/mikiobraun/jblas.git cd jblas mvn install 运行环境 cd /Users/erichan/Garden/spark-1.5.1-bin-cdh4 bin/spark-shell --name my_mli

《 Apache Spark机器学习.》导读

本节书摘来自华章出版社< Apache Spark机器学习.>一书中作者[美] 刘永川(Alex Liu) 著 闫龙川 高德荃 李君婷 译  前 言 作为数据科学家和机器学习专业人员,我们的工作是建立模型进行欺诈检测.预测客户流失,或者在广泛的领域将数据转换为洞见.为此,我们有时需要处理大量的数据和复杂的计算.因此,我们一直对新的计算工具满怀期待,例如Spark,我们花费了很多时间来学习新工具.有很多可用的资料来学习这些新的工具,但这些资料大多都由计算机科学家编写,更多的是从计算角度来描述.

Spark-构建基于Spark的推荐引擎

推荐引擎 推荐引擎就是是预测人们可能喜好的物品并通过探寻物品之间的联系来辅助这个过 程.从这点上来说,它同样也做预测的搜索引擎互补.但与搜索引擎不同,推荐引擎试图向人 们呈现的相关内容并不一定就是人们所搜索的,其返回的某些结果甚至人们都没听说过.推荐引擎试图对用户与某类物品之间的联系建模.比如上一个博客案 例中,我们使用推荐引擎来告诉用户有哪些电影他们可能会喜欢.如果这点做得很好,就能吸引 用户持续使用我们的服务.这对双方都有好处.同样,如果能准确告诉用户有哪些电影与某一电 影相似,就能方便用户

《Spark机器学习》读书笔记总结

Spark机器学习 <Machine Learning with Spark>书评与作者访谈 1 编程入门 2 准备数据 3 推荐引擎 4 分类模型 5 回归模型 6 聚类模型 7 降维模型 8 文本处理 9 实时学习 示意图 潜在语义分析 推荐算法 分类算法

用Spark机器学习数据流水线进行广告检测

在这篇文章中,我们Spark的其它机器学习API,名为Spark ML,如果要用数据流水线来开发大数据应用程序的话,这个是推荐的解决方案.关键点: 了解机器学习数据流水线有关内容. 怎么用Apache Spark机器学习包来实现机器学习数据流水线. 数据价值链处理的步骤. Spark机器学习流水线模块和API. 文字分类和广告检测用例. Spark ML(spark.ml)包提供了构建在DataFrame之上的机器学习API,它已经成了Spark SQL库的核心部分.这个包可以用于开发和管理机器

日处理数亿次请求的工作推荐引擎是如何演化的?

◆ ◆ ◆ 从搜索引擎到推荐 Indeed的产品运行在世界各地的许多数据中心上.来自每个数据中心的点击流数据以及其他应用事件被复制到我们在奥斯丁数据中心的一个中心化的HDFS数据仓库中.我们在这个数据仓库上进行计算分析并且构建我们的机器学习模型. 我们的职位搜索引擎是简单而直观的,只有两个输入:关键字和地点.搜索结果页面展示了一个匹配的职位列表,并基于相关性进行了排序.搜索引擎是我们的用户发现职位的主要方式. 我们决定在搜索引擎之上加入职位推荐作为一个新的交互模式是基于以下几个关键原因: Ind

探究推荐引擎瞬间被“秒”背后:究竟是什么让用户接踵而至?

6月16日,阿里云技术专家郑重(卢梭)将做客,直播分享<技术实战:21天搭建推荐系统>,报名地址:https://yq.aliyun.com/webinar/join/14 推荐引擎官网 "我是做电商CRM的,市场中有非常多的CRM在相互竞争,必须要找到一个能让自己的产品区别于竞争对手,甚至优于对手的核心能力.现在产品的未来都压在推荐引擎上,我需要你们帮助定制出这项能力."一位企业的负责人如此坦言. 5月18日上午11点,推荐引擎新版上线,在限量提供折扣抢购后,所有的产品瞬

基于Apache Mahout构建社会化推荐引擎

推荐引擎简介 推荐引擎利用特殊的信息过滤(IF,Information Filtering)技术,将不同的内容(例如电影.音乐.书籍.新闻.图片.网页等)推荐给可能感兴趣的用户.通常情况下,推荐引擎的实现是通过将用户的个人喜好与特定的参考特征进行比较,并试图预测用户对一些未评分项目的喜好程度.参考特征的选取可能是从项目本身的信息中提取的,或是基于用户所在的社会或社团环境. 根据如何抽取参考特征,我们可以将推荐引擎分为以下四大类: 基于内容的推荐引擎:它将计算得到并推荐给用户一些与该用户已选择过的

Apache Spark机器学习.1.4 MLlib

1.4 MLlib MLlib是一个可扩展的Spark机器学习库,包括很多常用的机器学习算法.MLlib内置的算法如下: 以向量和矩阵形式处理数据 基本的统计计算,例如:汇总统计和相关性.简单随机数生成.分层抽样.执行简单的假设检验 分类和回归建模 协同过滤 聚类 降维 特征提取与转换 频繁模式挖掘 开发优化 PMML模型导出 Spark MLlib还处在活跃开发阶段,预计每次新版发布都会有新的算法加入其中. MLlib符合Apache Spark的计算哲学,简单易用,性能卓越. MLlib使用