Spark学习之RDD简单算子

collect

返回RDD的所有元素


  1. scala> var input=sc.parallelize(Array(-1,0,1,2,2)) 
  2. input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:27 
  3. scala> var result=input.collect 
  4. result: Array[Int] = Array(-1, 0, 1, 2, 2) 

count,coutByValue

count返回RDD的元素数量,countByValue返回每个值的出现次数


  1. scala> var input=sc.parallelize(Array(-1,0,1,2,2)) 
  2. scala> var result=input.count 
  3. result: Long = 5 
  4. scala> var result=input.countByValue 
  5. result: scala.collection.Map[Int,Long] = Map(0 -> 1, 1 -> 1, 2 -> 2, -1 -> 1) 

take,top,takeOrdered

take返回RDD的前N个元素 takeOrdered默认返回升序排序的前N个元素,可以指定排序算法 Top返回降序排序的前N个元素


  1. var input=sc.parallelize(Array(1,2,3,4,9,8,7,5,6)) 
  2.  
  3. scala> var result=input.take(6) 
  4. result: Array[Int] = Array(1, 2, 3, 4, 9, 8) 
  5. scala> var result=input.take(20) 
  6. result: Array[Int] = Array(1, 2, 3, 4, 9, 8, 7, 5, 6) 
  7.  
  8. scala> var result=input.takeOrdered(6) 
  9. result: Array[Int] = Array(1, 2, 3, 4, 5, 6) 
  10. scala> var result=input.takeOrdered(6)(Ordering[Int].reverse) 
  11. result: Array[Int] = Array(9, 8, 7, 6, 5, 4) 
  12.  
  13. scala> var result=input.top(6) 
  14. result: Array[Int] = Array(9, 8, 7, 6, 5, 4 

Filter

传入返回值为boolean的函数,返回改函数结果为true的RDD


  1. scala> var input=sc.parallelize(Array(-1,0,1,2)) 
  2. scala> var result=input.filter(_>0).collect() 
  3. result: Array[Int] = Array(1, 2) 

map,flatmap

map对每个元素执行函数,转换为新的RDD,flatMap和map类似,但会把map的返回结果做flat处理,就是把多个Seq的结果拼接成一个Seq输出


  1. scala> var input=sc.parallelize(Array(-1,0,1,2)) 
  2. scala> var result=input.map(_+1).collect 
  3. result: Array[Int] = Array(0, 1, 2, 3) 
  4.  
  5. scala>var result=input.map(x=>x.to(3)).collect 
  6. result: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(-1, 0, 1, 2, 3), Range(0, 1, 2, 3), Range(1, 2, 3), Range(2, 3)) 
  7.  
  8. scala>var result=input.flatMap(x=>x.to(3)).collect 
  9. result: Array[Int] = Array(-1, 0, 1, 2, 3, 0, 1, 2, 3, 1, 2, 3, 2, 3) 

distinct

RDD去重


  1. scala>var input=sc.parallelize(Array(-1,0,1,2,2)) 
  2. scala>var result=input.distinct.collect 
  3. result: Array[Int] = Array(0, 1, 2, -1) 

Reduce

通过函数聚集RDD中的所有元素


  1. scala> var input=sc.parallelize(Array(-1,0,1,2)) 
  2. scala> var result=input.reduce((x,y)=>{println(x,y);x+y}) 
  3. (-1,1)  //处理-1,1,结果为0,RDD剩余元素为{0,2} 
  4. (0,2)   //上面的结果为0,在处理0,2,结果为2,RDD剩余元素为{0} 
  5. (2,0)   //上面结果为2,再处理(2,0),结果为2,RDD剩余元素为{} 
  6. result: Int = 2 

sample,takeSample

sample就是从RDD中抽样,第一个参数withReplacement是指是否有放回的抽样,true为放回,为false为不放回,放回就是抽样结果可能重复,第二个参数是fraction,0到1之间的小数,表明抽样的百分比 takeSample类似,但返回类型是Array,第一个参数是withReplacement,第二个参数是样本个数


  1. var rdd=sc.parallelize(1 to 20) 
  2.  
  3. scala> rdd.sample(true,0.5).collect 
  4. res33: Array[Int] = Array(6, 8, 13, 15, 17, 17, 17, 18, 20) 
  5.  
  6. scala> rdd.sample(false,0.5).collect 
  7. res35: Array[Int] = Array(1, 3, 10, 11, 12, 13, 14, 17, 18) 
  8.  
  9. scala> rdd.sample(true,1).collect 
  10. res44: Array[Int] = Array(2, 2, 3, 5, 6, 6, 8, 9, 9, 10, 10, 10, 14, 15, 16, 17, 17, 18, 19, 19, 20, 20) 
  11.  
  12. scala> rdd.sample(false,1).collect 
  13. res46: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) 
  14.  
  15. scala> rdd.takeSample(true,3) 
  16. res1: Array[Int] = Array(1, 15, 19) 
  17.  
  18. scala> rdd.takeSample(false,3) 
  19. res2: Array[Int] = Array(7, 16, 6) 

collectAsMap,countByKey,lookup

collectAsMap把PairRDD转为Map,如果存在相同的key,后面的会覆盖前面的。 countByKey统计每个key出现的次数 Lookup返回给定key的所有value


  1. scala> var input=sc.parallelize(List((1,"1"),(1,"one"),(2,"two"),(3,"three"),(4,"four"))) 
  2.  
  3. scala> var result=input.collectAsMap 
  4. result: scala.collection.Map[Int,String] = Map(2 -> two, 4 -> four, 1 -> one, 3 -> three) 
  5.  
  6. scala> var result=input.countByKey 
  7. result: scala.collection.Map[Int,Long] = Map(1 -> 2, 2 -> 1, 3 -> 1, 4 -> 1) 
  8.  
  9. scala> var result=input.lookup(1) 
  10. result: Seq[String] = WrappedArray(1, one) 
  11.  
  12. scala> var result=input.lookup(2) 
  13. result: Seq[String] = WrappedArray(two) 

groupBy,keyBy

groupBy根据传入的函数产生的key,形成元素为K-V形式的RDD,然后对key相同的元素分组 keyBy对每个value,为它加上key


  1. scala> var rdd=sc.parallelize(List("A1","A2","B1","B2","C")) 
  2. scala> var result=rdd.groupBy(_.substring(0,1)).collect 
  3. result: Array[(String, Iterable[String])] = Array((A,CompactBuffer(A1, A2)), (B,CompactBuffer(B1, B2)), (C,CompactBuffer(C))) 
  4.  
  5. scala> var rdd=sc.parallelize(List("hello","world","spark","is","fun")) 
  6. scala> var result=rdd.keyBy(_.length).collect 
  7. result: Array[(Int, String)] = Array((5,hello), (5,world), (5,spark), (2,is), (3,fun)) 

keys,values


  1. scala> var input=sc.parallelize(List((1,"1"),(1,"one"),(2,"two"),(3,"three"),(4,"four"))) 
  2. scala> var result=input.keys.collect 
  3. result: Array[Int] = Array(1, 1, 2, 3, 4) 
  4. scala> var result=input.values.collect 
  5. result: Array[String] = Array(1, one, two, three, four) 
  6.  
  7. mapvalues 
  8. mapvalues对K-V形式的RDD的每个Value进行操作 
  9. scala> var input=sc.parallelize(List((1,"1"),(1,"one"),(2,"two"),(3,"three"),(4,"four"))) 
  10. scala> var result=input.mapValues(_*2).collect 
  11. result: Array[(Int, String)] = Array((1,11), (1,oneone), (2,twotwo), (3,threethree), (4,fourfour)) 

union,intersection,subtract,cartesian

union合并2个集合,不去重 subtract将第一个集合中的同时存在于第二个集合的元素去掉 intersection返回2个集合的交集 cartesian返回2个集合的笛卡儿积


  1. scala> var rdd1=sc.parallelize(Array(-1,1,1,2,3)) 
  2. scala> var rdd2=sc.parallelize(Array(0,1,2,3,4)) 
  3.  
  4. scala> var result=rdd1.union(rdd2).collect 
  5. result: Array[Int] = Array(-1, 1, 1, 2, 3, 0, 1, 2, 3, 4) 
  6.  
  7. scala> var result=rdd1.intersection(rdd2).collect 
  8. result: Array[Int] = Array(1, 2, 3) 
  9.  
  10. scala> var result=rdd1.subtract(rdd2).collect 
  11. result: Array[Int] = Array(-1) 
  12.  
  13. scala> var result=rdd1.cartesian(rdd2).collect 
  14. result: Array[(Int, Int)] = Array((-1,0), (-1,1), (-1,2), (-1,3), (-1,4), (1,0), (1,1), (1,2), (1,3), (1,4), (1,0), (1,1), (1,2), (1,3), (1,4), (2,0), (2,1), (2,2), (2,3), (2,4), (3,0), (3,1), (3,2), (3,3), (3,4))
    本文作者:Endless2010
    来源:51CTO
时间: 2024-08-08 05:52:49

Spark学习之RDD简单算子的相关文章

Spark学习之RDD编程(2)

Spark学习之RDD编程(2) 1. Spark中的RDD是一个不可变的分布式对象集合. 2. 在Spark中数据的操作不外乎创建RDD.转化已有的RDD以及调用RDD操作进行求值. 3. 创建RDD:1)读取一个外部数据集2)在驱动器程序里分发驱动器程序中的对象集合. 4. RDD支持的操作: 1)转换操作,由一个RDD生成一个新的RDD. 2)行动操作,对RDD进行计算结果,并把结果返回到驱动器程序中,或者把结果存储到外部存储系统(如HDFS). 5. Spark程序或者shell会话都会

Spark学习之键值对(pair RDD)操作(3)

Spark学习之键值对(pair RDD)操作(3) 1. 我们通常从一个RDD中提取某些字段(如代表事件时间.用户ID或者其他标识符的字段),并使用这些字段为pair RDD操作中的键. 2. 创建pair RDD 1)读取本身就是键值对的数据 2)一个普通的RDD通过map()转为pair RDD,传递的函数需要返回键值对. Python中使用第一个单词作为键创建出一个pair RDD pairs = lines.amp(lambda x: (x.split(" ")[0],x))

从Storm和Spark 学习流式实时分布式计算的设计

0. 背景 最近我在做流式实时分布式计算系统的架构设计,而正好又要参加CSDN博文大赛的决赛.本来想就写Spark源码分析的文章吧.但是又想毕竟是决赛,要拿出一些自己的干货出来,仅仅是源码分析貌似分量不够.因此,我将最近一直在做的系统架构的思路整理出来,形成此文.为什么要参考Storm和Spark,因为没有参照效果可能不会太好,尤其是对于Storm和Spark由了解的同学来说,可能通过对比,更能体会到每个具体实现背后的意义. 本文对流式系统出现的背景,特点,数据HA,服务HA,节点间和计算逻辑间

Spark学习之数据读取与保存(4)

Spark学习之数据读取与保存(4) 1. 文件格式 Spark对很多种文件格式的读取和保存方式都很简单. 如文本文件的非结构化的文件,如JSON的半结构化文件,如SequenceFile结构化文件.通过扩展名进行处理. 2. 读取/保存文本文件 Python中读取一个文本文件 input = sc.textfile("file:///home/holen/repos/spark/README.md") Scala中读取一个文本文件 val input = sc.textFile(&q

Spark学习之Spark调优与调试(7)

Spark学习之Spark调优与调试(7) 1. 对Spark进行调优与调试通常需要修改Spark应用运行时配置的选项. 当创建一个SparkContext时就会创建一个SparkConf实例. 2. Spark特定的优先级顺序来选择实际配置: 优先级最高的是在用户代码中显示调用set()方法设置选项: 其次是通过spark-submit传递的参数: 再次是写在配置文件里的值: 最后是系统的默认值. 3.查看应用进度信息和性能指标有两种方式:网页用户界面.驱动器和执行器进程生成的日志文件. 4.

用Spark学习矩阵分解推荐算法

在矩阵分解在协同过滤推荐算法中的应用中,我们对矩阵分解在推荐算法中的应用原理做了总结,这里我们就从实践的角度来用Spark学习矩阵分解推荐算法. 1. Spark推荐算法概述 在Spark MLlib中,推荐算法这块只实现了基于矩阵分解的协同过滤推荐算法.而基于的算法是FunkSVD算法,即将m个用户和n个物品对应的评分矩阵M分解为两个低维的矩阵: 其中k为分解成低维的维数,一般远比m和n小.如果大家对FunkSVD算法不熟悉,可以复习对应的原理篇. 2. Spark推荐算法类库介绍 在Spar

『 Spark 』5. 这些年,你不能错过的 spark 学习资源

原文链接:『 Spark 』5. 这些年,你不能错过的 spark 学习资源 写在前面 本系列是综合了自己在学习spark过程中的理解记录 + 对参考文章中的一些理解 + 个人实践spark过程中的一些心得而来.写这样一个系列仅仅是为了梳理个人学习spark的笔记记录,所以一切以能够理解为主,没有必要的细节就不会记录了,而且文中有时候会出现英文原版文档,只要不影响理解,都不翻译了.若想深入了解,最好阅读参考文章和官方文档. 其次,本系列是基于目前最新的 spark 1.6.0 系列开始的,spa

Spark学习之Spark Streaming(9)

Spark学习之Spark Streaming(9) 1. Spark Streaming允许用户使用一套和批处理非常接近的API来编写流式计算应用,这就可以大量重用批处理应用的技术甚至代码. 2. Spark Streaming使用离散化(discretized steam)作为抽象表示,叫做DStream.DStream是随时间推移而收到的数据的序列. 3. DSteam支持两种操作:转换操作(transformation),会生成一个新的DStream:另一种是输出操作(output op

Spark学习之在集群上运行Spark(6)

Spark学习之在集群上运行Spark(6) 1. Spark的一个优点在于可以通过增加机器数量并使用集群模式运行,来扩展程序的计算能力. 2. Spark既能适用于专用集群,也可以适用于共享的云计算环境. 3. Spark在分布式环境中的架构: Created with Raphaël 2.1.0我的操作集群管理器Mesos.YARN.或独立集群管理器N个集群工作节点(执行器进程) Spark集群采用的是主/从结构,驱动器(Driver)节点和所有执行器(executor)节点一起被称为一个S