Spark机器学习8· 文本处理(spark-shell)


Spark机器学习

自然语言处理(NLP,Natural Language Processing)

  • 提取特征
  • 建模
  • 机器学习

TF-IDF(词频 term frequency–逆向文件频率 inverse document frequency)

  • 短语加权:根据词频,为单词赋予权值
  • 特征哈希:使用哈希方程对特征赋予向量下标

0 运行环境

tar xfvz 20news-bydate.tar.gz

export SPARK_HOME=/Users/erichan/Garden/spark-1.5.1-bin-hadoop2.6
cd $SPARK_HOME
bin/spark-shell --name my_mlib --packages org.jblas:jblas:1.2.4-SNAPSHOT --driver-memory 4G --executor-memory 4G --driver-cores 2

1 提取特征

val PATH = "/Users/erichan/sourcecode/book/Spark机器学习/20news-bydate"
val path = PATH+"/20news-bydate-train/*"
val rdd = sc.wholeTextFiles(path)
println(rdd.count)

11314

查看新闻组主题

val newsgroups = rdd.map { case (file, text) => file.split("/").takeRight(2).head }
val countByGroup = newsgroups.map(n => (n, 1)).reduceByKey(_ + _).collect.sortBy(-_._2).mkString("\n")
println(countByGroup)

(rec.sport.hockey,600)
(soc.religion.christian,599)
(rec.motorcycles,598)
(rec.sport.baseball,597)
(sci.crypt,595)
(rec.autos,594)
(sci.med,594)
(comp.windows.x,593)
(sci.space,593)
(sci.electronics,591)
(comp.os.ms-windows.misc,591)
(comp.sys.ibm.pc.hardware,590)
(misc.forsale,585)
(comp.graphics,584)
(comp.sys.mac.hardware,578)
(talk.politics.mideast,564)
(talk.politics.guns,546)
(alt.atheism,480)
(talk.politics.misc,465)
(talk.religion.misc,377)

2 建模

2.1 分词

val text = rdd.map { case (file, text) => text }
val whiteSpaceSplit = text.flatMap(t => t.split(" ").map(_.toLowerCase))
println(whiteSpaceSplit.distinct.count)
println(whiteSpaceSplit.sample(true, 0.3, 42).take(100).mkString(","))

402978
from:,mathew,mathew,faq:,faq:,atheist,resources
summary:,music,--,fiction,,mantis,consultants,,uk.
supersedes:,290

archive-name:,1.0

,,,,,,,,,,,,,,,,,,,organizations

,organizations

,,,,,,,,,,,,,,,,stickers,and,and,the,from,from,in,to:,to:,ffrf,,256-8900

evolution,designs

evolution,a,stick,cars,,written
inside.,fish,us.

write,evolution,,,,,,,bay,can,get,get,,to,the
price,is,of,the,the,so,on.,and,foote.,,atheist,pp.,0-910309-26-4,,,atrocities,,foote:,aap.,,the

2.2 改进分词

val nonWordSplit = text.flatMap(t => t.split("""\W+""").map(_.toLowerCase))
println(nonWordSplit.distinct.count)
println(nonWordSplit.distinct.sample(true, 0.3, 42).take(100).mkString(","))
val regex = """[^0-9]*""".r
val filterNumbers = nonWordSplit.filter(token => regex.pattern.matcher(token).matches)
println(filterNumbers.distinct.count)
println(filterNumbers.distinct.sample(true, 0.3, 42).take(100).mkString(","))

2.3 移除停用词

val tokenCounts = filterNumbers.map(t => (t, 1)).reduceByKey(_ + _)
val oreringDesc = Ordering.by[(String, Int), Int](_._2)
//println(tokenCounts.top(20)(oreringDesc).mkString("\n"))

val stopwords = Set(
    "the","a","an","of","or","in","for","by","on","but", "is", "not", "with", "as", "was", "if",
    "they", "are", "this", "and", "it", "have", "from", "at", "my", "be", "that", "to"
)
val tokenCountsFilteredStopwords = tokenCounts.filter { case (k, v) => !stopwords.contains(k) }
//println(tokenCountsFilteredStopwords.top(20)(oreringDesc).mkString("\n"))

val tokenCountsFilteredSize = tokenCountsFilteredStopwords.filter { case (k, v) => k.size >= 2 }
println(tokenCountsFilteredSize.top(20)(oreringDesc).mkString("\n"))

2.4 移除低频词

val oreringAsc = Ordering.by[(String, Int), Int](-_._2)
//println(tokenCountsFilteredSize.top(20)(oreringAsc).mkString("\n"))

val rareTokens = tokenCounts.filter{ case (k, v) => v < 2 }.map { case (k, v) => k }.collect.toSet
val tokenCountsFilteredAll = tokenCountsFilteredSize.filter { case (k, v) => !rareTokens.contains(k) }
println(tokenCountsFilteredAll.top(20)(oreringAsc).mkString("\n"))

def tokenize(line: String): Seq[String] = {
    line.split("""\W+""")
        .map(_.toLowerCase)
        .filter(token => regex.pattern.matcher(token).matches)
        .filterNot(token => stopwords.contains(token))
        .filterNot(token => rareTokens.contains(token))
        .filter(token => token.size >= 2)
        .toSeq
}

//println(text.flatMap(doc => tokenize(doc)).distinct.count)
val tokens = text.map(doc => tokenize(doc))
println(tokens.first.take(20))

2.5 提取词干

  • 标准NLP方法
  • 搜索引擎
    • NLTK
    • OpenNLP
    • Lucene

3 训练模型

3.1 HashingTF 特征哈希

import org.apache.spark.mllib.linalg.{ SparseVector => SV }
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.feature.IDF
// set the dimensionality of TF-IDF vectors to 2^18
val dim = math.pow(2, 18).toInt
val hashingTF = new HashingTF(dim)
val tf = hashingTF.transform(tokens)
tf.cache
val v = tf.first.asInstanceOf[SV]
println(v.size)
println(v.values.size)
println(v.values.take(10).toSeq)
println(v.indices.take(10).toSeq)

262144
706
WrappedArray(1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0, 1.0)
WrappedArray(313, 713, 871, 1202, 1203, 1209, 1795, 1862, 3115, 3166)

fit & transform

val idf = new IDF().fit(tf)
val tfidf = idf.transform(tf)
val v2 = tfidf.first.asInstanceOf[SV]
println(v2.values.size)
println(v2.values.take(10).toSeq)
println(v2.indices.take(10).toSeq)

706
WrappedArray(2.3869085659322193, 4.670445463955571, 6.561295835827856, 4.597686109673142, 8.932700215224111, 5.750365619611528, 2.1871123786150006, 5.520408782213984, 3.4312512246662714, 1.7430324343790569)
WrappedArray(313, 713, 871, 1202, 1203, 1209, 1795, 1862, 3115, 3166)

3.2 分析权重

val minMaxVals = tfidf.map { v =>
    val sv = v.asInstanceOf[SV]
    (sv.values.min, sv.values.max)
}
val globalMinMax = minMaxVals.reduce { case ((min1, max1), (min2, max2)) =>
    (math.min(min1, min2), math.max(max1, max2))
}
println(globalMinMax)

globalMinMax: (Double, Double) = (0.0,66155.39470409753)

常用词

val common = sc.parallelize(Seq(Seq("you", "do", "we")))
val tfCommon = hashingTF.transform(common)
val tfidfCommon = idf.transform(tfCommon)
val commonVector = tfidfCommon.first.asInstanceOf[SV]
println(commonVector.values.toSeq)

WrappedArray(0.9965359935704624, 1.3348773448236835, 0.5457486182039175)

不常出现的单词

val uncommon = sc.parallelize(Seq(Seq("telescope", "legislation", "investment")))
val tfUncommon = hashingTF.transform(uncommon)
val tfidfUncommon = idf.transform(tfUncommon)
val uncommonVector = tfidfUncommon.first.asInstanceOf[SV]
println(uncommonVector.values.toSeq)

WrappedArray(5.3265513728351666, 5.308532867332488, 5.483736956357579)

4 使用模型

4.1 余弦相似度

import breeze.linalg._

val hockeyText = rdd.filter { case (file, text) => file.contains("hockey") }
val hockeyTF = hockeyText.mapValues(doc => hashingTF.transform(tokenize(doc)))
val hockeyTfIdf = idf.transform(hockeyTF.map(_._2))

val hockey1 = hockeyTfIdf.sample(true, 0.1, 42).first.asInstanceOf[SV]
val breeze1 = new SparseVector(hockey1.indices, hockey1.values, hockey1.size)

val hockey2 = hockeyTfIdf.sample(true, 0.1, 43).first.asInstanceOf[SV]
val breeze2 = new SparseVector(hockey2.indices, hockey2.values, hockey2.size)
val cosineSim = breeze1.dot(breeze2) / (norm(breeze1) * norm(breeze2))
println(cosineSim)

cosineSim: Double = 0.060250114361164626

val graphicsText = rdd.filter { case (file, text) => file.contains("comp.graphics") }
val graphicsTF = graphicsText.mapValues(doc => hashingTF.transform(tokenize(doc)))
val graphicsTfIdf = idf.transform(graphicsTF.map(_._2))
val graphics = graphicsTfIdf.sample(true, 0.1, 42).first.asInstanceOf[SV]
val breezeGraphics = new SparseVector(graphics.indices, graphics.values, graphics.size)
val cosineSim2 = breeze1.dot(breezeGraphics) / (norm(breeze1) * norm(breezeGraphics))
println(cosineSim2)

cosineSim2: Double = 0.004664850323792852

val baseballText = rdd.filter { case (file, text) => file.contains("baseball") }
val baseballTF = baseballText.mapValues(doc => hashingTF.transform(tokenize(doc)))
val baseballTfIdf = idf.transform(baseballTF.map(_._2))
val baseball = baseballTfIdf.sample(true, 0.1, 42).first.asInstanceOf[SV]
val breezeBaseball = new SparseVector(baseball.indices, baseball.values, baseball.size)
val cosineSim3 = breeze1.dot(breezeBaseball) / (norm(breeze1) * norm(breezeBaseball))
println(cosineSim3)

0.05047395039466008

4.2 学习单词与主题的映射关系

多分类映射
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.evaluation.MulticlassMetrics

val newsgroupsMap = newsgroups.distinct.collect().zipWithIndex.toMap
val zipped = newsgroups.zip(tfidf)
val train = zipped.map { case (topic, vector) => LabeledPoint(newsgroupsMap(topic), vector) }
train.cache
朴素贝叶斯训练
val model = NaiveBayes.train(train, lambda = 0.1)
加载测试数据集
val testPath = PATH+"/20news-bydate-test/*"
val testRDD = sc.wholeTextFiles(testPath)
val testLabels = testRDD.map { case (file, text) =>
    val topic = file.split("/").takeRight(2).head
    newsgroupsMap(topic)
}
val testTf = testRDD.map { case (file, text) => hashingTF.transform(tokenize(text)) }
val testTfIdf = idf.transform(testTf)
val zippedTest = testLabels.zip(testTfIdf)
val test = zippedTest.map { case (topic, vector) => LabeledPoint(topic, vector) }
计算准确度和多分类加权F-指标
val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()
println(accuracy)

0.7915560276155071

val metrics = new MulticlassMetrics(predictionAndLabel)
println(metrics.weightedFMeasure)

0.7810675969031116

5 评估

val rawTokens = rdd.map { case (file, text) => text.split(" ") }
val rawTF = rawTokens.map(doc => hashingTF.transform(doc))
val rawTrain = newsgroups.zip(rawTF).map { case (topic, vector) => LabeledPoint(newsgroupsMap(topic), vector) }
val rawModel = NaiveBayes.train(rawTrain, lambda = 0.1)
val rawTestTF = testRDD.map { case (file, text) => hashingTF.transform(text.split(" ")) }
val rawZippedTest = testLabels.zip(rawTestTF)
val rawTest = rawZippedTest.map { case (topic, vector) => LabeledPoint(topic, vector) }
val rawPredictionAndLabel = rawTest.map(p => (rawModel.predict(p.features), p.label))
val rawAccuracy = 1.0 * rawPredictionAndLabel.filter(x => x._1 == x._2).count() / rawTest.count()
println(rawAccuracy)

0.7648698884758365

val rawMetrics = new MulticlassMetrics(rawPredictionAndLabel)
println(rawMetrics.weightedFMeasure)

0.7653320418573546

6 Word2Vec模型

Word2Vec模型(分布向量表示):把每个单词表示成一个向量,MLlib中使用skip-gram模型

6.1 训练

import org.apache.spark.mllib.feature.Word2Vec
val word2vec = new Word2Vec()
word2vec.setSeed(42) // we do this to generate the same results each time
val word2vecModel = word2vec.fit(tokens)

6.2 使用

最相似的20个单词
word2vecModel.findSynonyms("hockey", 20).foreach(println)

(sport,1.4818968962277133)
(ecac,1.467546566194254)
(hispanic,1.4166835301985194)
(glens,1.4061103042432825)
(woofers,1.3810090447028116)
(tournament,1.3148823031671586)
(champs,1.3133863003013941)
(boxscores,1.307735040384543)
(aargh,1.274986851270267)
(ahl,1.265165428167253)
(playoff,1.2645991118770572)
(ncaa,1.2383382015648046)
(pool,1.2261154635870224)
(champion,1.2119919989539134)
(filinuk,1.2062208620660915)
(olympic,1.2026738930160243)
(motorcycles,1.2008032355579679)
(yankees,1.1989755767973371)
(calder,1.194001886835493)
(homeruns,1.1800625883573932)

word2vecModel.findSynonyms("legislation", 20).foreach(println)

(accommodates,0.9918184454068688)
(briefed,0.9256758135452989)
(amended,0.9105987267173344)
(telephony,0.8679173760123956)
(pitted,0.8609974033962533)
(aclu,0.8605885863332372)
(licensee,0.8493930472487975)
(agency,0.836706135804648)
(policies,0.8337986602365566)
(senate,0.8327312936220903)
(businesses,0.8291191155630467)
(permit,0.8266658804181389)
(cpsr,0.8231228090944367)
(cooperation,0.8195562469006543)
(surveillance,0.8134342524628756)
(congress,0.8132899468772855)
(restricted,0.8115013134507126)
(procure,0.8096839595766356)
(inquiry,0.8086297702914405)
(industry,0.8077900093754752)

  • legislation 立法
  • aclu 美国公民自由协会
  • senate 参议院
  • surveillance 监视
  • inquiry 调查
时间: 2024-09-20 09:26:49

Spark机器学习8· 文本处理(spark-shell)的相关文章

Apache Spark机器学习.1.5 Spark RDD和DataFrame

1.5 Spark RDD和DataFrame 本节关注数据以及Spark如何表示和组织数据.我们将介绍Spark RDD和DataFrame技术. 通过本节的学习,读者将掌握Spark的两个关键概念:RDD和DataFrame,并将它们应用于机器学习项目. 1.5.1 Spark RDD Spark主要以一种分布式项集合的形式进行数据抽象,称之为弹性分布式数据集(Resilient Distributed Dataset,RDD).RDD是Spark的关键创新,使其比其他框架计算更加快速和高效

Apache Spark机器学习.1.1 Spark概述和技术优势

摘要 Spark机器学习简介 本章从机器学习和数据分析视角介绍Apache Spark,并讨论Spark中的机器学习计算处理技术.本章首先概括介绍Apache Spark,通过与MapReduce等计算平台进行比较,展示Spark在数据分析中的技术优势和特点.接着,讨论如下五个方面的内容: 机器学习算法与程序库 Spark RDD和DataFrame 机器学习框架 Spark pipeline技术 Spark notebook技术 以上是数据科学家或机器学习专业人员必须掌握的五项最重要的技术内容

Apache Spark机器学习.1.8 Spark notebook简介

1.8 Spark notebook简介 在本节中,我们首先讨论有关面向机器学习的notebook方法.然后,我们介绍R Markdown,以其作为一个成熟的notebook案例,最后介绍Spark中的R notebook. 学习完本节,读者将掌握notebook相关的方法和概念,并为将其用于管理和开发机器学习项目做好准备. 1.8.1 面向机器学习的notebook方法 notebook已经成为众人青睐的机器学习工具,因为该工具既能动态驱动,还具备可重复生成的特点. 大部分notebook接口

Apache Spark机器学习3.1 Spark整体视图

摘要 基于Spark的整体视图 通过第1章,我们建立起了Spark系统,根据第2章的内容,我们完成了数据准备.现在将进入Spark系统应用的新阶段:从数据中获得洞见. 根据Gartner等机构的研究结果,许多公司仅仅是因为缺乏其商业的整体视图而损失了大量的价值.本章我们将回顾机器学习的方法和获得商业整体视图的步骤,然后讨论Spark如何简单.快速地进行相关计算,同时通过一个实例,循序渐进地展示使用Spark从数据到整体视图的开发过程. Spark整体视图 整体视图的方法 特征准备 模型估计 模型

用Spark机器学习数据流水线进行广告检测

在这篇文章中,我们Spark的其它机器学习API,名为Spark ML,如果要用数据流水线来开发大数据应用程序的话,这个是推荐的解决方案.关键点: 了解机器学习数据流水线有关内容. 怎么用Apache Spark机器学习包来实现机器学习数据流水线. 数据价值链处理的步骤. Spark机器学习流水线模块和API. 文字分类和广告检测用例. Spark ML(spark.ml)包提供了构建在DataFrame之上的机器学习API,它已经成了Spark SQL库的核心部分.这个包可以用于开发和管理机器

Spark机器学习2·准备数据(pyspark)

Spark机器学习 准备环境 anaconda nano ~/.zshrc export PATH=$PATH:/anaconda/bin source ~/.zshrc echo $HOME echo $PATH ipython conda update conda && conda update ipython ipython-notebook ipython-qtconsole conda install scipy PYTHONPATH export SPARK_HOME=/Use

《Spark机器学习》读书笔记总结

Spark机器学习 <Machine Learning with Spark>书评与作者访谈 1 编程入门 2 准备数据 3 推荐引擎 4 分类模型 5 回归模型 6 聚类模型 7 降维模型 8 文本处理 9 实时学习 示意图 潜在语义分析 推荐算法 分类算法

Spark机器学习之推荐引擎

Spark机器学习之推荐引擎 一. 最小二乘法建立模型 关于最小二乘法矩阵分解,我们可以参阅: 一.矩阵分解模型. 用户对物品的打分行为可以表示成一个评分矩阵A(m*n),表示m个用户对n各物品的打分情况.如下图所示: 其中,A(i,j)表示用户user i对物品item j的打分.但是,ALS 的核心就是下面这个假设:的打分矩阵 A 可以用两个小矩阵和的乘积来近似:.这样我们就把整个系统的自由度从一下降到了.我们接下来就聊聊为什么 ALS 的低秩假设是合理的.世上万千事物,人们的喜好各不相同.

Apache Spark机器学习.2.1 访问和加载数据集

摘要 Spark机器学习的数据准备 机器学习从业者和数据科学家时常耗费70%或80%的时间为机器学习项目准备数据.数据准备可能是很艰辛的工作,但是它影响到接下来的各方面工作,因此是非常必要和极其重要的.所以,在本章中,我们将讨论机器学习中所有必要的数据准备方面的内容,通常包括数据获取.数据清洗.数据集连接,再到特征开发,从而让我们为基于Spark平台构建机器学习模型准备好数据集.具体而言,我们将讨论前面提到的以下6个数据准备任务,然后在针对复用性和自动化的讨论中结束本章: 访问和加载数据集 开放