[大数据之Spark]——Transformations转换入门经典实例

Spark相比于Mapreduce的一大优势就是提供了很多的方法,可以直接使用;另一个优势就是执行速度快,这要得益于DAG的调度,想要理解这个调度规则,还要理解函数之间的依赖关系。

本篇就着重描述下Spark提供的Transformations方法.

依赖关系

宽依赖和窄依赖

窄依赖(narrow dependencies)

窄依赖是指父RDD仅仅被一个子RDD所使用,子RDD的每个分区依赖于常数个父分区(O(1),与数据规模无关)。

  • 输入输出一对一的算子,且结果RDD的分区结构不变。主要是map/flatmap
  • 输入输出一对一的算子,但结果RDD的分区结构发生了变化,如union/coalesce
  • 从输入中选择部分元素的算子,如filter、distinct、substract、sample
宽依赖(wide dependencies)

宽依赖是指父RDD被多个子分区使用,子RDD的每个分区依赖于所有的父RDD分区(O(n),与数据规模有关)

  • 对单个RDD基于key进行重组和reduce,如groupByKey,reduceByKey
  • 对两个RDD基于key进行join和重组,如join(父RDD不是hash-partitioned )
  • 需要进行分区,如partitionBy

Transformations转换方法实例

map(func)

map用于遍历rdd中的每个元素,可以针对每个元素做操作处理:

scala> var data = sc.parallelize(1 to 9,3)
//内容为 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> data.map(x=>x*2).collect()
//输出内容 Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

filter(func)

filter用于过滤元素信息,仅仅返回满足过滤条件的元素

scala> var data = sc.parallelize(1 to 9,3)
//内容为 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> data.filter(x=> x%2==0).collect()
//输出内容 Array[Int] = Array(2, 4, 6, 8)

flatMap(func)

flatMap与map相比,不同的是可以输出多个结果,比如

scala> var data = sc.parallelize(1 to 4,1)
//输出内容为 Array[Int] = Array(1, 2, 3, 4)

scala> data.flatMap(x=> 1 to x).collect()
//输出内容为 Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

mapPartitions(func)

mapPartitions与map类似,只不过每个元素都是一个分区的迭代器,因此内部可以针对分区为单位进行处理。

比如,针对每个分区做和

//首先创建三个分区
scala> var data = sc.parallelize(1 to 9,3)
//输出为 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

//查看分区的个数
scala> data.partitions.size
//输出为 Int = 3

//使用mapPartitions
scala> var result = data.mapPartitions{ x=> {
     | var res = List[Int]()
     | var i = 0
     | while(x.hasNext){
     | i+=x.next()
     | }
     | res.::(i).iterator
     | }}

scala> result.collect
//输出为 Array[Int] = Array(6, 15, 24)

mapPartitionsWithIndex(func)

这个方法与上面的mapPartitions相同,只不过多提供了一个Index参数。

//首先创建三个分区
scala> var data = sc.parallelize(1 to 9,3)
//输出为 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

//查看分区的个数
scala> data.partitions.size
//输出为 Int = 3

scala> var result = data.mapPartitionsWithIndex{
     | (x,iter) => {
     | var result = List[String]()
     | var i = 0
     | while(iter.hasNext){
     | i += iter.next()
     | }
     | result.::( x + "|" +i).iterator
     | }}

result.collect
//输出结果为 Array[String] = Array(0|6, 1|15, 2|24)

sample(withReplacement, fraction, seed)

这个方法可以用于对数据进行采样,比如从1000个数据里面随机5个数据。

  • 第一个参数withReplacement代表是否进行替换,如果选true,上面的例子中,会出现重复的数据
  • 第二个参数fraction 表示随机的比例
  • 第三个参数seed 表示随机的种子
//创建数据
var data = sc.parallelize(1 to 1000,1)

//采用固定的种子seed随机
data.sample(false,0.005,0).collect
//输出为 Array[Int] = Array(53, 423, 433, 523, 956, 990)

//采用随机种子
data.sample(false,0.005,scala.util.Random.nextInt(1000)).collect
//输出为 Array[Int] = Array(136, 158)

union(otherDataset)

union方法可以合并两个数据集,但是不会去重,仅仅合并而已。

//创建第一个数据集
scala> var data1 = sc.parallelize(1 to 5,1)

//创建第二个数据集
scala> var data2 = sc.parallelize(3 to 7,1)

//取并集
scala> data1.union(data2).collect
//输出为 Array[Int] = Array(1, 2, 3, 4, 5, 3, 4, 5, 6, 7)

intersection(otherDataset)

这个方法用于取两个数据集的交集

//创建第一个数据集
scala> var data1 = sc.parallelize(1 to 5,1)

//创建第二个数据集
scala> var data2 = sc.parallelize(3 to 7,1)

//取交集
scala> data1.intersection(data2).collect
//输出为 Array[Int] = Array(4, 3, 5)

distinct([numTasks]))

这个方法用于对本身的数据集进行去重处理。

//创建数据集
scala> var data = sc.parallelize(List(1,1,1,2,2,3,4),1)

//执行去重
scala> data.distinct.collect
//输出为 Array[Int] = Array(4, 1, 3, 2)

//如果是键值对的数据,kv都相同,才算是相同的元素
scala> var data = sc.parallelize(List(("A",1),("A",1),("A",2),("B",1)))

//执行去重
scala> data.distinct.collect
//输出为 Array[(String, Int)] = Array((A,1), (B,1), (A,2))

groupByKey([numTasks])

这个方法属于宽依赖的方法,针对所有的kv进行分组,可以把相同的k的聚合起来。如果要想计算sum等操作,最好使用reduceByKey或者combineByKey

//创建数据集
scala> var data = sc.parallelize(List(("A",1),("A",1),("A",2),("B",1)))

//分组输出
scala> data.groupByKey.collect
//输出为 Array[(String, Iterable[Int])] = Array((B,CompactBuffer(1)), (A,CompactBuffer(1, 1, 2)))

reduceByKey(func, [numTasks])

这个方法用于根据key作分组计算,但是它跟reduce不同,它还是属于transfomation的方法。

//创建数据集
scala> var data = sc.parallelize(List(("A",1),("A",1),("A",2),("B",1)))

scala> data.reduceByKey((x,y) => x+y).collect
//输出为 Array[(String, Int)] = Array((B,1), (A,4))

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

aggregateByKey比较复杂,我也不是很熟练,不过试验了下,大概的意思是针对分区内部使用seqOp方法,针对最后的结果使用combOp方法。

比如,想要统计分区内的最大值,然后再全部统计加和:

scala> var data = sc.parallelize(List((1,1),(1,2),(1,3),(2,4)),2)
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[54] at parallelize at <console>:27

scala> def sum(a:Int,b:Int):Int = { a+b }
sum: (a: Int, b: Int)Int

scala> data.aggregateByKey(0)(sum,sum).collect
res42: Array[(Int, Int)] = Array((2,4), (1,6))

scala> def max(a:Int,b:Int):Int = { math.max(a,b) }
max: (a: Int, b: Int)Int

scala> data.aggregateByKey(0)(max,sum).collect
res44: Array[(Int, Int)] = Array((2,4), (1,5))

sortByKey([ascending], [numTasks])

sortByKey用于针对Key做排序,默认是按照升序排序。

//创建数据集
scala> var data = sc.parallelize(List(("A",2),("B",2),("A",1),("B",1),("C",1)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:27

//对数据集按照key进行默认排序
scala> data.sortByKey().collect
res23: Array[(String, Int)] = Array((A,2), (A,1), (B,2), (B,1), (C,1))

//升序排序
scala> data.sortByKey(true).collect
res24: Array[(String, Int)] = Array((A,2), (A,1), (B,2), (B,1), (C,1))

//降序排序
scala> data.sortByKey(false).collect
res25: Array[(String, Int)] = Array((C,1), (B,2), (B,1), (A,2), (A,1))

join(otherDataset, [numTasks])

join方法为(K,V)和(K,W)的数据集调用,返回相同的K,所组成的数据集。相当于sql中的按照key做连接。

有点类似于 select a.value,b.value from a inner join b on a.key = b.key;

举个例子

//创建第一个数据集
scala> var data1 = sc.parallelize(List(("A",1),("B",2),("C",3)))

//创建第二个数据集
scala> var data2 = sc.parallelize(List(("A",4)))

//创建第三个数据集
scala> var data3 = sc.parallelize(List(("A",4),("A",5)))

data1.join(data2).collect
//输出为 Array[(String, (Int, Int))] = Array((A,(1,4)))

data1.join(data3).collect
//输出为 Array[(String, (Int, Int))] = Array((A,(1,4)), (A,(1,5)))

cogroup(otherDataset, [numTasks])

在类型为(K,V)和(K,W)的数据集上调用,返回一个 (K, (Seq[V], Seq[W]))元组的数据集。

//创建第一个数据集
scala> var data1 = sc.parallelize(List(("A",1),("B",2),("C",3)))

//创建第二个数据集
scala> var data2 = sc.parallelize(List(("A",4)))

//创建第三个数据集
scala> var data3 = sc.parallelize(List(("A",4),("A",5)))

scala> data1.cogroup(data2).collect
//输出为 Array[(String, (Iterable[Int], Iterable[Int]))] = Array((B,(CompactBuffer(2),CompactBuffer())), (A,(CompactBuffer(1),CompactBuffer(4))), (C,(CompactBuffer(3),CompactBuffer())))

scala> data1.cogroup(data3).collect
//输出为 Array[(String, (Iterable[Int], Iterable[Int]))] = Array((B,(CompactBuffer(2),CompactBuffer())), (A,(CompactBuffer(1),CompactBuffer(4, 5))), (C,(CompactBuffer(3),CompactBuffer())))

cartesian(otherDataset)

这个方法用于计算两个(K,V)数据集之间的笛卡尔积

//创建第一个数据集
scala> var a = sc.parallelize(List(1,2))

//创建第二个数据集
scala> var b = sc.parallelize(List("A","B"))

//计算笛卡尔积
scala> a.cartesian(b).collect
//输出结果 res2: Array[(Int, String)] = Array((1,A), (1,B), (2,A), (2,B))

pipe(command, [envVars])

pipe方法用于针对每个分区的RDD执行一个shell脚本命令,可以使perl或者bash。分区的元素将会被当做输入,脚本的输出则被当做返回的RDD值。

//创建数据集
scala> var data = sc.parallelize(1 to 9,3)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:27

//测试脚本
scala> data.pipe("head -n 1").collect
res26: Array[String] = Array(1, 4, 7)

scala> data.pipe("tail -n 1").collect
res27: Array[String] = Array(3, 6, 9)

scala> data.pipe("tail -n 2").collect
res28: Array[String] = Array(2, 3, 5, 6, 8, 9)

coalesce(numPartitions)

这个方法用于对RDD进行重新分区,第一个参数是分区的数量,第二个参数是是否进行shuffle

//创建数据集
scala> var data = sc.parallelize(1 to 9,3)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:27

//查看分区的大小
scala> data.partitions.size
res3: Int = 3

//不使用shuffle重新分区
scala> var result = data.coalesce(2,false)
result: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[19] at coalesce at <console>:29

scala> result.partitions.length
res12: Int = 2

scala> result.toDebugString
res13: String =
(2) CoalescedRDD[19] at coalesce at <console>:29 []
 |  ParallelCollectionRDD[9] at parallelize at <console>:27 []

//使用shuffle重新分区
scala> var result = data.coalesce(2,true)
result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[23] at coalesce at <console>:29

scala> result.partitions.length
res14: Int = 2

scala> result.toDebugString
res15: String =
(2) MapPartitionsRDD[23] at coalesce at <console>:29 []
 |  CoalescedRDD[22] at coalesce at <console>:29 []
 |  ShuffledRDD[21] at coalesce at <console>:29 []
 +-(3) MapPartitionsRDD[20] at coalesce at <console>:29 []
    |  ParallelCollectionRDD[9] at parallelize at <console>:27 []

repartition(numPartitions)

这个方法作用于coalesce一样,重新对RDD进行分区,相当于shuffle版的calesce

//创建数据集
scala> var data = sc.parallelize(1 to 9,3)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:27

//查看分区的大小
scala> data.partitions.size
res3: Int = 3

scala> var result = data.repartition(2)
result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[27] at repartition at <console>:29

scala> result.partitions.length
res16: Int = 2

scala> result.toDebugString
res17: String =
(2) MapPartitionsRDD[27] at repartition at <console>:29 []
 |  CoalescedRDD[26] at repartition at <console>:29 []
 |  ShuffledRDD[25] at repartition at <console>:29 []
 +-(3) MapPartitionsRDD[24] at repartition at <console>:29 []
    |  ParallelCollectionRDD[9] at parallelize at <console>:27 []

scala>

repartitionAndSortWithinPartitions(partitioner)

这个方法是在分区中按照key进行排序,这种方式比先分区再sort更高效,因为相当于在shuffle阶段就进行排序。

下面的例子中,由于看不到分区里面的数据。可以通过设置分区个数为1,看到排序的效果。

scala> var data = sc.parallelize(List((1,2),(1,1),(2,3),(2,1),(1,4),(3,5)),2)
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[60] at parallelize at <console>:27

scala> data.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(2)).collect
res52: Array[(Int, Int)] = Array((2,3), (2,1), (1,2), (1,1), (1,4), (3,5))

scala> data.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(1)).collect
res53: Array[(Int, Int)] = Array((1,2), (1,1), (1,4), (2,3), (2,1), (3,5))

scala> data.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(3)).collect
res54: Array[(Int, Int)] = Array((3,5), (1,2), (1,1), (1,4), (2,3), (2,1))

参考

spark 官方文档

本文转自博客园xingoo的博客,原文链接:[大数据之Spark]——Transformations转换入门经典实例,如需转载请自行联系原博主。

时间: 2024-10-31 10:24:31

[大数据之Spark]——Transformations转换入门经典实例的相关文章

[大数据之Spark]——快速入门

本篇文档是介绍如何快速使用spark,首先将会介绍下spark在shell中的交互api,然后展示下如何使用java,scala,python等语言编写应用.可以查看编程指南了解更多的内容. 为了良好的阅读下面的文档,最好是结合实际的练习.首先需要下载spark,然后安装hdfs,可以下载任意版本的hdfs. Spark Shell 交互 基本操作 Spark Shell提供给用户一个简单的学习API的方式 以及 快速分析数据的工具.在shell中,既可以使用scala(运行在java虚拟机,因

[大数据之Spark]——Actions算子操作入门实例

Actions reduce(func) Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. 这个方法会传入两个参数,计算这两个参数返回一个

【Spark Summit East 2017】2017年大数据与Spark的发展趋势

本讲义出自Matei Zaharia在Spark Summit East 2017上的演讲,主要介绍了2016年以及2017年大数据与Spark技术的未来的汇合的发展趋势以及Databricks对于使Spark与像深度学习库这样的原生代码能够更好地进行交互所做的工作.

大数据:Spark和Hadoop是友非敌

Spark 在 6 月份取得了激动人心的成绩.在圣何塞举办的 Hadoop 峰会上,Spark 成了人们经常提及的话题和许多演讲的主题.IBM 还在 6 月 15 号宣布,将对 Spark 相关的技术进行巨额投资. 这一声明帮助推动了旧金山Spark 峰会的召开.在这里,人们会看到有越来越多的工程师在学习 Spark,也有越来越多的公司在试验和采用 Spark. 对 Spark 的投资和采用形成了一个正向循环,迅速推动这一重要技术的成熟和发展,让整个大数据社区受益.然而,人们对 Spark 的日

云计算大数据核心技术Spark,星星之火即将燎原

大数据时代,Spark是继Hadoop之后,成为替代Hadoop的下一代云计算大数据核心技术,目前Spark已经构建了自己的整个大数据处理生态系统,如流处理.图技术.机器学习.NoSQL查询等方面都有自己的技术,并且是Apache顶级Project,可以 预计的是 2014年下半年到2015年Spark在社区和商业应用上会有爆发式的增长.目前Spark的技术在国内还属于起步阶段,为了更好地帮助大家了解Spark技术特点及应用前景,我们走访了Spark亚太研究院的院长王家林.       王家林,

助人就是助己:IBM宣布大规模资助开源大数据项目Spark

本周一,IBM宣布将对开源实时大数据分析项目Apache Spark进行大规模资助,蓝色巨人宣称,其资助的力度之大相当于每年数亿美元的投入. Hadoop技术出自Google.Yahoo这些互联网公司,主要是为了对规模庞大的各类数据进行处理和分析.不过近年来随着大数据应用的流行,越来越多的公司也希望自己具备类似的能力,这使得Hadoop逐步进入了主流.Hadoop以及相关的分发企业如Cloudera, Hortonworks等也成为了大数据领域的投资热点. Spark同样也提供大数据处理与分析能

大数据在云计算中转换的4个步骤

如今的企业必须向顾客提供始终如一的高价值体验,否则会失去顾客.他们正在求助于大数据技术.通过大数据分析,组织可以更好地了解他们的客户,了解他们的习惯,并预测他们的需求,以提供更好的客户体验. 但是,大数据转换的路径并不简单.传统数据库管理和数据仓库设备变得过于昂贵,难以维护和规模化.此外,他们无法应对当今面临的挑战,其中包括非结构化数据,物联网(IoT),流数据,以及数字转型相结合的其他技术. 大数据转换的答案是云计算.参与大数据决策的IT专业人士中有64%的人表示已将技术堆栈转移到云端,或正在

预告:大数据(hadoop\spark)解决方案构建详解,以阿里云E-MapReduce为例

在2016年09月22日 20:00 - 21:00笔者将在 CSDN学院,分享<大数据解决方案构建详解 :以阿里云E-MapReduce为例>欢迎大家报名,报名链接:http://edu.csdn.net/huiyiCourse/detail/186 ppt地址:https://yq.aliyun.com/attachment/download/?spm=0.0.0.0.PpnciK&filename=%E5%A4%A7%E6%95%B0%E6%8D%AE%E8%A7%A3%E5%8

大数据在银行的七个应用实例

Hadoop is present in nearly every vertical today that isleveraging big data in order to analyze information and gain competitiveadvantages. Many financial organizations firms are already using Hadoopsolutions successfully and the ones who are not hav