《Scala机器学习》一一3.3 应用

3.3 应用
下面会介绍Spark/Scala中的一些实际示例和库,具体会从一个非常经典的单词计数问题开始。
3.3.1 单词计数
大多数现代机器学习算法需要多次传递数据。如果数据能存放在单台机器的内存中,则该数据会容易获得,并且不会呈现性能瓶颈。如果数据太大,单台机器的内存容纳不下,则可保存在磁盘(或数据库)上,这样虽然可得到更大的存储空间,但存取速度大约会降为原来的1/100。另外还有一种方式就是分割数据集,将其存储在网络中的多台机器上,并通过网络来传输结果。虽然对这种方式仍有争议,但分析表明,对于大多数实际系统而言,如果能有效地在多个CPU之间拆分工作负载,则通过一组网络连接节点存储数据比从单个节点上的硬盘重复存储和读取数据略有优势。
磁盘的平均带宽约为100 MB/s,由于磁盘的转速和缓存不同,其传输时会有几毫秒的延迟。相对于直接从内存中读取数据,速度要降为原来的1/100左右,当然,这也会取决于数据大小和缓存的实现。现代数据总线可以超过10 GB/s的速度传输数据。而网络速度仍然落后于直接的内存访问,特别是标准网络层中TCP/IP内核的开销会对网络速度影响很大。但专用硬件可以达到每秒几十吉字节,如果并行运行,则可能和从内存读取一样快。当前的网络传输速度介于1~10 GB/s之间,但在实际应用中仍然比磁盘更快。因此,可以将数据分配到集群节点中所有机器的内存中,并在集群上执行迭代机器学习算法。
但内存也有一个问题:在节点出现故障并重新启动后,内存中的数据不会跨节点持久保存。一个流行的大数据框架Hadoop解决了这个问题。Hadoop受益于Dean/Ghemawat的论文(Jeff Dean和Sanjay Ghemawat, MapReduce: Simplified Data Processing on Large Clusters, OSDI, 2004.),这篇文章提出使用磁盘层持久性来保证容错和存储中间结果。Hadoop MapReduce程序首先会在数据集的每一行上运行map函数,得到一个或多个键/值对。然后按键值对这些键/值对进行排序、分组和聚合,使得具有相同键的记录最终会在同一个reducer上处理,该reducer可能在一个(或多个)节点上运行。reducer会使用一个reduce函数,遍历同一个键对应的所有值,并将它们聚合在一起。如果reducer因为一些原因失败,由于其中间结果持久保存,则可以丢弃部分计算,然后可从检查点保存的结果重新开始reduce计算。很多简单的类ETL应用程序仅在保留非常少的状态信息的情况下才遍历数据集,这些状态信息是从一个记录到另一个记录的。
单词计数是MapReduce的经典应用程序。该程序可统计文档中每个单词的出现次数。在Scala中,对排好序的单词列表采用foldLeft方法,很容易得到单词计数。

如果运行这个程序,会输出(字,计数)这样的元组列表。该程序会按行来分词,并对得到的单词排序,然后将每个单词与(字,计数)元组列表中的最新条目(entry)进行匹配。同样的计算在MapReduce中会表示成如下形式:

首先需要按行处理文本,将行拆分成单词,并生成(word,1)对。这个任务很容易并行化。为了并行化全局计数,需对计数部分进行划分,具体的分解通过对单词子集分配计数任务来实现。在Hadoop中需计算单词的哈希值,并根据哈希值来划分工作。
一旦map任务找到给定哈希的所有条目,它就可以将键/值对发送到reducer,在MapReduce中,发送部分通常称为shuffle。从所有mapper中接收完所有的键/值对后,reducer才会组合这些值(如果可能,在mapper中也可部分组合这些值),并对整个聚合进行计算,在这种情况下只进行求和。单个reducer将查看给定单词的所有值。
下面介绍Spark中单词计数程序的日志输出(Spark在默认情况下输出的日志会非常冗长,为了输出关键的日志信息,可将conf /log4j.properties文件中的INFO替换为ERROR或FATAL):

这个过程发生的唯一的事情是元数据操作,Spark不会触及数据本身,它会估计数据集的大小和分区数。默认情况下是HDFS块数,但是可使用minPartitions参数明确指定最小分区数:

下面定义另一个RDD,它源于linesRdd:

在2 GB的文本数据(共有40 291行,353 087个单词)上执行单词计算程序时,进行读取、分词和按词分组所花的时间不到1秒。通过扩展日志记录可看到以下内容:
Spark打开几个端口与执行器和用户通信
Spark UI运行的端口为4040(可通过http://localhost: 4040打开)
可从本地或分布式存储(HDFS、Cassandra和S3)中读取文件
如果Spark构建时支持Hive,它会连接到Hive上
Spark使用惰性求值(仅当输出请求时)来执行管道
Spark使用内部调度器将作业拆分为任务,优化执行任务,然后执行它们
结果存储在RDD中,可用集合方法来保存或导入到执行shell的节点的RAM中
并行性能调整的原则是在不同节点或线程之间分割工作负载,使得开销相对较小,而且要保持负载平衡。
3.3.2 基于流的单词计数
Spark支持对输入流进行监听,能对其进行分区,并以接近实时的方式来计算聚合。目前支持来自Kafka、Flume、HDFS/S3、Kinesis、Twitter,以及传统的MQ(如ZeroMQ和MQTT)的数据流。在Spark中,流的传输是以小批量(micro-batch)方式进行的。在Spark内部会将输入数据分成小批量,通常按大小的不同,有些所花的时间不到1秒,有些却要几分钟,然后会对这些小批量数据执行RDD聚合操作。
下面扩展前面介绍的Flume示例。这需要修改Flume配置文件来创建一个Spark轮询槽(polling sink),用这种槽来替代HDFS:

现在不用写入HDFS,Flume将会等待Spark的轮询数据:

为了运行程序,在一个窗口中启动Flume代理:

然后在另一个窗口运行FlumeWordCount对象:

现在任何输入到netcat连接的文本都将被分词并在6秒的滑动窗口上按每2秒计算单词的量:

Spark/Scala允许在不同的流之间无缝切换。例如,Kafka发布/订阅主题模型类似于如下形式:

要启动Kafka代理,首先下载最新发布的二进制包并启动ZooKeeper。ZooKeeper是一个分布式服务协调器,即使Kafka部署在单节点上也需要它:

在另一个窗口中启动Kafka服务器:

运行KafkaWordCount对象:

现在将单词流发布到Kafka主题中,这需要再开启一个计数窗口:

从上面的结果可以看出程序每两秒输出一次。Spark流有时被称为小批次处理(micro-batch processing)。数据流有许多其他应用程序(和框架),但要完全讨论清楚会涉及很多内容,因此需要单独进行介绍。在第5章会讨论一些数据流上的机器学习问题。下面将介绍更传统的类SQL接口。
3.3.3 Spark SQL和数据框
数据框(Data Frame)相对较新,在Spark的1.3版本中才引入,它允许人们使用标准的SQL语言来分析数据。在第1章就使用了一些SQL命令来进行数据分析。SQL对于简单的数据分析和聚合非常有用。
最新的调查结果表明大约有70%的Spark用户使用DataFrame。虽然DataFrame最近成为表格数据最流行的工作框架,但它是一个相对重量级的对象。DataFrame使用的管道在执行速度上可能比基于Scala的vector或LabeledPoint(这两个对象将在下一章讨论)的速度慢得多。来自多名开发人员的证据表明:响应时间可为几十或几百毫秒,这与具体查询有关,若是更简单的对象会小于1毫秒。
Spark为SQL实现了自己的shell,这是除标准Scala REPL shell以外的另一个shell。可通过./bin/spark-sql来运行该shell,还可通过这种shell来访问Hive/Impala或关系数据库表:

在标准Spark的REPL中,可以通过运行相同的查询来执行以下命令:

时间: 2024-11-08 23:20:39

《Scala机器学习》一一3.3 应用的相关文章

《Scala机器学习》一一2.5 数据驱动系统的基本组件

2.5 数据驱动系统的基本组件 简单地说,一个数据驱动架构包含如下的组件(或者可精简为以下这些组件): 数据收集:需要从系统和设备上收集数据.大多数的系统有日志,或者至少可选择将日志写入本地文件系统.一些系统可以通过网络来传输信息,比如syslog.但若没有审计信息,缺少持久层意味着有可能丢失数据. 数据转换层:也被称为提取.变换和加载(ETL).现在数据转换层也可以进行实时处理,即通过最近的数据来计算汇总信息.数据转换层也用来重新格式化数据和索引数据,以便能被UI组件有效地访问. 数据分析和机

《Scala机器学习》一一3.4 机器学习库

3.4 机器学习库 Spark是基于内存的存储系统,它本质上能提高节点内和节点之间的数据访问速度.这似乎与ML有一种自然契合,因为许多算法需要对数据进行多次传递或重新分区.MLlib是一个开源库,但仍有一些私人公司还在不断按自己的方式来实现MLlib中的算法. 在第5章会看到大多数标准机器学习算法可以表示为优化问题.例如,经典线性回归会最小化回归直线与实际y值之间的距离平方和: 其中,是由下面的线性表达式所得到的预测值: A通常称为斜率,B通常称为截距.线性优化问题更一般化的公式可以写成最小化加

《Scala机器学习》一一导读

前 言 这是一本关于机器学习的书,它以Scala为重点,介绍了函数式编程方法以及如何在Spark上处理大数据.九个月前,当我受邀写作本书时,我的第一反应是:Scala.大数据.机器学习,每一个主题我都曾彻底调研过,也参加了很多的讨论,结合任何两个话题来写都具有挑战性,更不用说在一本书中结合这三个主题.这个挑战激发了我的兴趣,于是就有了这本书.并不是每一章的内容都像我所希望的那样圆满,但技术每天都在快速发展.我有一份具体的工作,写作只是表达我想法的一种方式. 下面先介绍机器学习.机器学习经历了翻天

《Scala机器学习》一一1.7 总结

1.7 总结本章试图为后面更复杂的数据科学建立一个通用平台.不要认为这里介绍了一套完整的探索性技术,因为探索性技术可扩展到非常复杂的模式上.但是,本章已经涉及了简单的汇总.抽样.文件操作(如读和写),并使用notebook和Spark DataFrame等工具来工作,Spark的DataFrame也为使用Spark/Scala的数据分析师引入了他们所熟悉的SQL结构.下一章开始介绍数据管道,可将其看作基于数据驱动企业的一部分,并从商业角度给出数据发现的过程:做数据分析试图要完成的最终目标是什么.

《Scala机器学习》一一3.7 总结

3.7 总结 本章概述了Spark/Hadoop以及它们与Scala和函数式编程的关系.重点介绍了一个经典的单词计数的例子,它是用Scala和Spark来实现的,并以单词计数和流为例介绍了Spark生态系统的高级组件.通过本章的学习,读者已经具备有了用Scala/Spark实现经典的机器学习算法的知识.下一章将开始介绍监督学习和无监督学习,这是对基于结构数据的学习算法的传统划分.

《Scala机器学习》一一第2章 数据管道和建模

**第2章数据管道和建模**上一章介绍了一些研究数据的基本工具.本章将深入介绍一些更复杂的主题,其中包括建立统计模型.最优控制以及科学驱动(science-driven)的工具等问题.不过事先声明,本书只会涉及最优控制的若干主题,因为本书是介绍基于Scala的机器学习(ML),而不是数据驱动的企业管理理论,企业管理理论本身就足以写成一本书.本章不会介绍基于Scala的具体实现,而是在一个高层次上探讨构建数据驱动型企业的问题.后面的章节将详细讨论如何实现这些细节.本章也特别强调不确定性的处理.不确

《Scala机器学习》一一1.6 相关性的基础

1.6 相关性的基础读者可能已经注意到,从列联表检测相关性是很难的.检测模式来源于实践,但许多人更擅长于识别可视化的模式.检测行为模式是机器学习的基本目标之一.虽然高级的监督机器学习技术将在第4章和第5章中讨论,但对变量之间相互依存关系的初步分析可得到正确的数据转换(或最佳的推理技术).目前有很多成熟的可视化工具及相关的网站(如http://www.kdnuggets.com)都专注于数据分析.数据研究和可视化软件的排名以及推荐.本书不会去质疑该排名的有效性和准确性,但确实很少有网站会介绍用Sc

《Scala机器学习》一一第3章 使用Spark和MLlib

第3章 使用Spark和MLlib 上一章介绍了在全局数据驱动的企业架构中的什么地方以及如何利用统计和机器学习来处理知识,但接下来不会介绍Spark和MLlib的具体实现,MLlib是Spark顶层的机器学习库.Spark是大数据生态系统中相对较新的成员,它基于内存使用而不是磁盘来进行优化.数据仍然可以根据需要转储到磁盘上,但Spark只有在明确指示这样做或活动数据集不适合内存时才会执行转储.如果节点出现故障或由于某些原因从内存中擦除信息,Spark会利用存储的信息来重新计算活动数据集.这与传统

《Scala机器学习》一一2.7 总结

2.7 总结 本章介绍了一种用于设计数据驱动企业的高级架构方法.同时还向读者介绍了影响图,它是一个用来了解传统企业和数据驱动企业是如何做决策的工具.接着介绍了几个重要的模型,如Kelly准则和多臂老虎机,并从数学的角度来说明这些问题是至关重要的.在这些内容的基础上还介绍了马尔可夫决策过程,该过程通过已有的决定和观察的结果来得到决策策略.本章深入研究了构建决策数据管道较为实用的方法,以及可用于构建它们的主要组件和框架.最后讨论了不同阶段和节点之间传递数据和建模结果的问题,以及将结果如何呈现给用户.