本节书摘来自华章出版社《Spark大数据分析:核心概念、技术及实践》一书中的第3章,第3.5节,作者[美] 穆罕默德·古勒(Mohammed Guller),更多章节内容可以访问“华章计算机”公众号查看。
3.5 API
应用可以通过使用Spark提供的库获得Spark集群计算的能力。这些库都是用Scala编写的。但是Spark提供了各种语言的API。在本书编写之际,Spark API提供了如下语言的支持:Scala、Java、Python和R。可以使用上面的任何语言来开发Spark应用。也有其他语言(比如Clojure)的非官方支持。
Spark API主要由两个抽象部件SparkContext和弹性分布式数据集(RDD)构成。应用程序通过这两个部件和Spark进行交互。应用程序可以连接到Spark集群并使用相关资源。接下来会介绍这两个抽象部件,然后详细介绍RDD。
3.5.1 SparkContext
SparkContext是一个在Spark库中定义的类。它是Spark库的入口点。它表示与Spark集群的一个连接。使用Spark API创建的其他一些重要对象都依赖于它。
每个Spark应用程序都必须创建一个SparkContext类实例。目前,每个Spark应用程序只能拥有一个激活的SparkContext类实例。如果要创建一个新的实例,那么在此之前必须让当前激活的类实例失活。
SparkContext有多个构造函数。最简单的一个不需要任何参数。一个SparkContext类实例可以用如下代码创建。
在这种情况下,SparkContext的配置信息都从系统属性中获取,比如Spark master的地址、应用名称等。也可以创建一个SparkConf类实例,然后把它作为SparkContext的参数从而设定配置信息。SparkConf 是Spark库中定义的一个类。通过这种方式可以像下面这样设置各种Spark配置信息。
SparkConf为设置诸如Spark master这样的常用配置信息都提供了对应的显式方法。此外,它还提供了一个通用的方法用于设置配置信息,它使用键-值对进行设置。SparkContext和SparkConf可以使用的参数将在第4章进行详细介绍。
在本章接下来的例子中会继续使用上面创建的变量sc。
3.5.2 RDD
弹性分布式数据集(RDD)表示一个关于分区数据元素的集合,可以在其上进行并行操作。它是Spark的主要数据抽象概念。它是Spark库中定义的一个抽象类。
从概念上看,除了可以用于表示分布式数据集和支持惰性操作的特性外,RDD类似于Spark的集合。惰性操作将在本章稍后部分详细介绍。
下面分别简要描述RDD的特点。
不可变性
RDD是一种不可变的数据结构。一旦创建,它将不可以在原地修改。基本上,一个修改RDD的操作都会返回一个新的RDD。
分片
RDD表示的是一组数据的分区。这些分区分布在多个集群节点上。然而,当Spark在单个节点运行时,所有的分区数据都会在当前节点上。
Spark存储RDD的分区和数据集物理分区之间关系的映射关系。RDD是各个分布式数据源之中数据的一个抽象,它通常表示分布在多个集群节点上的分区数据。比如HDFS将数据分片或分块分散存储在集群中。默认情况下,一个RDD分区对应一个HDFS文件分片。其他的分布式数据源(比如Cassandra)同样也将数据分片分散存储在集群多个节点上。然而,一个RDD对应多个Cassandra分片。
容错性
RDD为可容错的。RDD代表了分散在集群中多个节点的数据,但是任何一个节点都有可能出故障。诚如之前所说的,一个节点出故障的可能性和集群节点数量成正比。集群越大,在任何一个节点它出故障的可能性就越高。
RDD会自动处理节点出故障的情况。当一个节点出故障时,该节点上存储的数据将无法被访问。此时,Spark会在其他节点上重建丢失的RDD分区数据。Spark存储每一个RDD的血统信息。通过这些血统信息,Spark可以恢复RDD的部分信息,当节点出故障的时候,它甚至可以恢复整个RDD。
接口
需要着重指出的是,RDD是一个处理数据的接口。在Spark库中它定义为一个抽象类。RDD为多种数据源提供了一个处理数据的统一接口,包括HDFS、HBase、Cassandra等。这个接口同样可以用于处理存储于多个节点内存中的数据。
Spark为不同数据源提供了各自具体的实现类,比如HadoopRDD、ParallelCollection-RDD、JdbcRDD和CassandraRDD。它们都支持基础的RDD接口。
强类型
RDD类有一个参数用于表示类型,这使得RDD可以表示不同类型的数据。RDD可以表示同一类型数据的分布式集合,包括Integer、Long、Float、String或者应用开发者自己定义的类型。而且,一个应用总会使用某种类型的RDD,包括Integer、Long、Float、Double、String或自定义类型。
驻留在内存中
之前已经提及了Spark的内存集群计算特性。RDD类提供一套支持内存计算的API。Spark允许RDD在内存中缓存或长期驻留。就像之前所说的,对一个缓存在内存中的RDD进行操作比操作没缓存的RDD要快很多。
3.5.3 创建RDD
由于RDD是一个抽象类,因此无法直接创建一个RDD的类实例。SparkContext类提供了一个工厂方法用来创建RDD实现类的类实例。RDD也可以通过由其他RDD执行转换操作得到。就像之前所说的,RDD是不可变的。任何一个对RDD的修改操作都将返回一个代表修改后数据的新RDD。
本节总结了几种创建RDD的常见方法。在下面的示例代码中,sc是一个SparkContext的类实例。之前的章节已经介绍了怎么创建它。
parallelize
这个方法用于从本地Scala集合创建RDD实例。它会对Scala集合中的数据重新分区、重新分布,然后返回一个代表这些数据的RDD。这个方法很少用在生产上,但是使用它有助于学习Spark。
textFile
textFile方法用于从文本文件创建RDD实例。它可以从多种来源读取数据,包括单个文件、本地同一目录下的多个文件、HDFS、Amazon S3,或其他Hadoop支持的存储系统。这个方法返回一个RDD,这个RDD代表的数据集每个元素都是一个字符串,每一个字符串代表输入文件中的一行。
上面的代码表示从存储于HDFS上的一个文件或者目录创建RDD实例。
textFile方法也可以读取压缩文件中的数据。而且,它的参数中可以存在通配符,用于从一个目录中读取多个文件。下面是一个例子。
textFile的第二个参数是一个可选参数,它用于指定分区的个数。默认情况下,Spark为每一个文件分块创建一个分区。可以设置成一个更大的数字从而提高并行化程度,但是设置成一个小于文件分块数的数字是不可以的。
wholeTextFiles
这个方法读取目录下的所有文本文件,然后返回一个由键值型RDD。返回RDD中的每一个键值对对应一个文件。键为文件路径,对应的值为该文件的内容。这个方法可以从多种来源读取数据,包括本地文件系统、HDFS、Amazon S3,或者其他Hadoop支持的存储系统。
sequenceFile
sequenceFile方法从SequenceFile文件中获取键值对数据,这些SequenceFile文件可以存储于本地文件系统、HDFS或者其他Hadoop支持的存储系统。这个方法返回一个键值对型RDD实例。当使用这个方法的时候,不仅需要提供文件名,还需要提供文件中数据键和值各自的类型。
3.5.4 RDD操作
Spark应用使用RDD类或其继承类中定义的方法来处理数据。这些方法也称为操作。既然Scala中可以把一个方法当成操作符使用,那么RDD中的方法有时也称为操作符。
Spark的美好之处就在于同样一个RDD方法既可以处理几字节的数据也可以处理PB级的数据。而且Spark应用可以使用同样的方法去处理数据,无论它是存储于本地还是存储于一个分布式存储系统。这样的灵活性使得开发者可以在单机上开发、调试、测试Spark应用,然后不用改动任何代码就可以将它部署到一个大集群上。
RDD操作可以归为两类:转换和行动。转换将会创建一个新的RDD实例。行动则会将结果返回给驱动程序。
转换
转换指的是在原RDD实例上进行计算,而后创建一个新的RDD实例。本节将介绍一些常见的转换操作。
从概念上看,RDD转换操作的类似于Scala集合上的方法。主要的区别在于Scala集合方法操作的数据是在单机内存中的,而RDD的转换操作可以处理分布在集群各个节点上的数据。另外一个重要的区别是,RDD转换操作是惰性的,而Scala集合方法不是。本章余下部分会详细介绍这些内容。
map
map方法是一个高阶方法,它把一个函数作为它的参数,并把这个函数作用在原RDD的每个元素上,从而创建一个新RDD实例。这个作为参数的函数拥有一个参数并返回一个值。
filter
filter方法是一个高阶方法,它把一个布尔函数作为它的参数,并把这个函数作用在原RDD的每个元素上,从而创建一个新RDD实例。一个布尔函数只有一个参数作为输入,返回true或false。filter方法返回一个新的RDD实例,这个RDD实例代表的数据集由布尔函数返回true的元素构成。因此,新RDD实例代表的数据集是原RDD的子集。
flatMap
flatMap方法是一个高阶方法,它把一个函数作为它的参数,这个函数处理原RDD中每个元素返回一个序列。扁平化这个序列的集合得到一个数据集,flatMap方法返回的RDD就代表这个数据集。
mapPartitions
mapPartitions是一个高阶方法,它使你可以以分区的粒度来处理数据。相比于一次处理一个元素,mapPartitions一次处理处理一个分区,每个分区被当成一个迭代器。mapPartitions方法的函数参数把迭代器作为输入,返回另外一个迭代器作为输出。map-Partitions将自定义函数参数作用于每一个分区上,从而返回一个新RDD实例。
union
union方法把一个RDD实例作为输入,返回一个新RDD实例,这个新RDD实例的数据集是原RDD和输入RDD的合集。
intersection
intersection方法把一个RDD实例作为输入,返回一个新RDD实例,这个新RDD实例代表的数据集是原RDD和输入RDD的交集。
这是另外一个例子。
subtract
subtract方法把一个RDD实例作为输入,返回一个新RDD实例,这个新RDD实例代表的数据集由那些存在于原RDD实例中但不在输入RDD实例中的元素构成。
这是另外一个例子。
distinct
RDD实例上的distinct方法返回一个新RDD实例,这个新RDD实例的数据集由原RDD的数据集去重后得到。
cartesian
cartesian方法把一个RDD实例作为输入,返回一个新RDD实例,这个新RDD实例的数据集由原RDD和输入RDD的所有元素的笛卡儿积构成。返回的RDD实例的每一个元素都是一个有序二元组,每一个有序二元组的第一个元素来自原RDD,第二个元素来自输入RDD。元素的个数等于原RDD的元素个数乘以输入RDD的元素个数。
这个方法类似于SQL中的join操作。
zip
zip方法把一个RDD实例作为输入,返回一个新RDD实例,这个新RDD实例的每一个元素是一个二元组,二元组的第一个元素来自原RDD,第二个元素来自输入RDD。和cartesian方法不同的是,zip方法返回的RDD的元素个数于原RDD的元素个数。原RDD的元素个数和输入RDD的相同。进一步地说,原RDD和输入RDD不仅有相同的分区数,每个分区还有相同的元素个数。
zipWithIndex
zipWithIndex方法返回一个新RDD实例,这个新RDD实例的每个元素都是由原RDD元素及其下标构成的二元组。
groupBy
groupBy是一个高阶方法,它将原RDD中的元素按照用户定义的标准分组从而组成一个RDD。它把一个函数作为它的参数,这个函数为原RDD中的每一个元素生成一个键。groupBy把这个函数作用在原RDD的每一个元素上,然后返回一个由二元组构成的新RDD实例,每个二元组的第一个元素是函数生成的键,第二个元素是对应这个键的所有原RDD元素的集合。其中,键和原RDD元素的对应关系由那个作为参数的函数决定。
需要注意的是,groupBy是一个费时操作,因为它可能需要对数据做shuffle操作。
假设有一个CSV文件,文件的内容为公司客户的姓名、年龄、性别和邮编。下面的示例代码演示了按照邮编将客户分组。
keyBy
keyBy方法与groupBy方法相类似。它是一个高阶方法,把一个函数作为参数,这个函数为原RDD中的每一个元素生成一个键。keyBy方法把这个函数作用在原RDD的每一个元素上,然后返回一个由二元组构成的新RDD实例,每个二元组的第一个元素是函数生成的键,第二个元素是对应这个键的原RDD元素。其中,键和原RDD元素的对应关系由那个作为参数的函数决定。返回的RDD实例的元素个数和原RDD的相同。
groupBy和KeyBy的区别在于返回RDD实例的元素上。虽然都是二元组,但是
groupBy返回的二元组中的第二个元素是一个集合,而keyBy的是单个值。
sortBy
sortBy是一个高阶方法,它将原RDD中的元素进行排序后组成一个新的RDD实例返回。它拥有两个参数。第一个参数是一个函数,这个函数将为原RDD的每一个元素生成一个键。第二个参数用来指明是升序还是降序排列。
下面是另一个示例。
pipe
pipe方法可以让你创建子进程来运行一段外部程序,然后捕获它的输出作为字符串,用这些字符串构成RDD实例返回。
randomSplit
randomSplit方法将原RDD分解成一个RDD数组。它的参数是分解的权重。
coalesce
coalesce方法用于减少RDD的分区数量。它把分区数作为参数,返回分区数等于这个参数的RDD实例。
使用coalesce方法时需要小心,因为减少了RDD的分区数也就意味着降低了Spark的并行能力。它通常用于合并小分区。举例来说,在执行filter操作之后,RDD可能会有很多小分区。在这种情况下,减少分区数能提升性能。
repartition
repartition方法把一个整数作为参数,返回分区数等于这个参数的RDD实例。它有助于提高Spark的并行能力。它会重新分布数据,因此它是一个耗时操作。
coalesce和repartition方法看起来一样,但是前者用于减少RDD中的分区,后者用于增加RDD中的分区。
sample
sample方法返回原RDD数据集的一个抽样子集。它拥有三个参数。第一个参数指定是有放回抽样还是无放回抽样。第二个参数指定抽样比例。第三个参数是可选的,指定抽样的随机数种子。
键值对型RDD的转换
除了上面介绍的RDD转换之外,针对键值对型RDD还支持其他的一些转换。下面将介绍只能作用于键值对型RDD的常用转换操作。
keys
keys方法返回只由原RDD中的键构成的RDD。
values
values方法返回只由原RDD中的值构成的RDD。
mapValues
mapValues是一个高阶方法,它把一个函数作为它的参数,并把这个函数作用在原RDD的每个值上。它返回一个由键值对构成的RDD。它和map方法类似,不同点在于它把作为参数的函数作用在原RDD的值上,所以原RDD的键都没有变。返回的RDD和原RDD拥有相同的键。
join
join方法把一个键值对型RDD作为参数输入,而后在原RDD和输入RDD上做内连接操作。它返回一个由二元组构成的RDD。二元组的第一个元素是原RDD和输入RDD都有的键,第二个元素是一个元组,这个元组由原RDD和输入RDD中键对应的值构成。
leftOuterJoin
leftOuterJoin方法把一个键值对型RDD作为参数输入,而后在原RDD和输入RDD之间做左连接操作。它返回一个由键值对构成的RDD。键值对的第一个元素是原RDD中的键,第二个元素是一个元组,这个元组由原RDD中键对应的值和输入RDD中的可选值构成。可选值用Option类型表示。
rightOuterJoin
rightOuterJoin方法把一个键值对型RDD作为参数输入,而后在原RDD和输入RDD之间做右连接操作。它返回一个由键值对构成的RDD。键值对的第一个元素是输入RDD中的键,第二个元素是一个元组,这个元组由原RDD中的可选值和输入RDD中键对应的值构成。可选值用Option类型表示。
fullOuterJoin
fullOuterJoin方法把一个键值对型RDD作为参数输入,而后在原RDD和输入RDD之间做全连接操作。它返回一个由键值对构成的RDD。
sampleByKey
sampleByKey通过在键上抽样返回原RDD的一个子集。它把对每个键的抽样比例作为输入参数,返回原RDD的一个抽样。
subtractByKey
subtractByKey方法把一个键值对型RDD作为输入参数,返回一个键值对RDD,这个键值对RDD的键都是只存在原RDD中但是不存在于输入RDD中。
groupByKey
groupByKey方法返回一个由二元组构成的RDD,二元组的第一个元素是原RDD的键,第二个元素是一个集合,集合由该键对应的所有值构成。它类似于上面介绍过的group-By方法。二者的区别在于groupBy是一个高阶方法,它的参数是一个函数,这个函数为原RDD的每一个元素生成一个键。groupByKey方法作用于RDD的每一个键值对上,故不需要一个生成键的函数作为输入参数。
应当尽量避免使用groupByKey。它是一个耗时操作,因为它可能会对数据进行shuffle操作。在大多数情况下,都有不使用groupByKey的更好的替代方案。
reduceByKey
reduceByKey是一个高阶方法,它把一个满足结合律的二元操作符当作输入参数。它把这个操作符作用于有相同键的值上。
一个二元操作符把两个值当作输入参数,返回一个值。一个满足结合律的二元操作符返回同样的结果,但是它不关心操作数的分组情况。
reduceByKey方法可以用于对同一键对应的值进行汇总操作。比如它可以用于对同一键对应的值进行求和,求乘积,求最小值,求最大值。
对于基于键的汇总操作、合并操作,reduceByKey比groupByKey更合适。
操作
操作指的是那些返回值给驱动程序的RDD方法。本节介绍一些RDD中常用的操作。
collect
collect方法返回一个数组,这个数组由原RDD中的元素构成。在使用这个方法的时候需要小心,因为它把在worker节点的数据移给了驱动程序。如果操作一个有大数据集的RDD,它有可能会导致驱动程序崩溃。
count
count方法返回原RDD中元素的个数。
countByValue
countByValue方法返回原RDD中每个元素的个数。它返回是一个map类实例,其中,键为元素的值,值为该元素的个数。
first
first方法返回原RDD中的第一个元素。
max
max方法返回RDD中最大的元素。
min
min方法返回RDD中最小的元素。
take
take方法的输入参数为一个整数N,它返回一个由原RDD中前N个元素构成的RDD。
takeOrdered
takeOrdered方法的输入参数为一个整数N,它返回一个由原RDD中前N小的元素构成的RDD。
top
top方法的输入参数为一个整数N,它返回一个由原RDD中前N大的元素构成的RDD。
fold
fold是一个高阶方法,用于对原RDD的元素做汇总操作,汇总的时候使用一个自定义的初值和一个满足结合律的二元操作符。它首先在每一个RDD的分区中进行汇总,然后再汇总这些结果。
初值的取值取决于RDD中的元素类型和汇总操作的目的。比如,给定一个元素为整数的RDD,为了计算这个RDD中所有元素的和,初值取为0。相反,给定一个元素为整数的RDD,为了计算这个RDD中所有元素的乘积,初值则应取为1。
reduce
reduce是一个高阶方法,用于对原RDD的元素做汇总操作,汇总的时候使用一个满足结合律和交换律的二元操作符。它类似于fold方法,然而,它并不需要初值。
键值对型RDD上的操作
键值对RDD上有一些额外的操作,我们在下面进行介绍。
countByKey
countByKey方法用于统计原RDD每个键的个数。它返回一个map类实例,其中,键为原RDD中的键,值为个数。
lookup
lookup方法的输入参数为一个键,返回一个序列,这个序列的元素为原RDD中这个键对应的值。
数值型RDD上的操作
如果RDD的元素类型为Integer、Long、Float或Double,则这样的RDD为数值型RDD。这类RDD还有一些对于统计分析十分有用的额外操作,下面将介绍一些常用的行动。
mean
mean方法返回原RDD中元素的平均值。
stdev
stdev方法返回原RDD中元素的标准差。
sum
sum方法返回原RDD中所有元素的和。
variance
variance方法返回原RDD中元素的方差。
3.5.5 保存RDD
一般来说,数据处理完毕后,结果会保存在硬盘上。Spark允许开发者将RDD保存在任何Hadoop支持的存储系统中。保存在硬盘上的RDD可以被其他Spark应用或Hadoop应用使用。
本节介绍将RDD保存成文件的常用方法。
saveAsTextFile
saveAsTextFile方法将原RDD中的元素保存在指定目录中,这个目录位于任何Hadoop支持的存储系统中。每一个RDD中的元素都用字符串表示并另存为文本中的一行。
saveAsObjectFile
saveAsObjectFile方法将原RDD中的元素序列化成Java对象,存储在指定目录中。
saveAsSequenceFile
saveAsSequenceFile方法将键值对型RDD以SequenceFile的格式保存。键值对型RDD也可以以文本的格式保存,只须使用saveAsTextFile方法即可。
需要注意的是,上面的方法都把一个目录的名字作为输入参数,然后在这个目录为每个RDD分区创建一个文件。这种设计不仅高效而且可容错。因为每一个分区被存成一个文件,所以Spark在保存RDD的时候可以启动多个任务,并行执行,将数据写入文件系统中。这样也保证了写入数据的过程是可容错的。一旦有一个将分区写入文件的任务失败了,Spark可以再启动一个任务,重写刚才失败任务创建的文件。