The Way of Spark (Advanced): From Beginner to Expert — Section 6: The Spark Programming Model (Part 3)

Author: 周志湖
Screen name: 摇摆少年梦
WeChat: zhouzhihubeyond

Main topics of this section

  1. RDD transformations (continued)
  2. RDD actions

1. RDD transformations (continued)

(1) repartitionAndSortWithinPartitions(partitioner)
repartitionAndSortWithinPartitions is a variant of repartition. Unlike plain repartition, it sorts the records by key within each partition produced by the given partitioner, which performs better than calling repartition and then sorting each partition separately, because the sorting can be pushed down into the shuffle machinery.
Function definition:
/**
* Repartition the RDD according to the given partitioner and, within each resulting partition,
* sort records by their keys.
*
* This is more efficient than calling repartition and then sorting within each partition
* because it can push the sorting down into the shuffle machinery.
*/
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]

Example:

scala> val data = sc.parallelize(List((1,3),(1,2),(5,4),(1, 4),(2,3),(2,4)),3)
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:21

scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner

scala> data.repartitionAndSortWithinPartitions(new HashPartitioner(3)).collect
res3: Array[(Int, Int)] = Array((1,4), (1,3), (1,2), (2,3), (2,4), (5,4))
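
To see how the keys were laid out, glom can gather each partition into its own array; a quick sketch (HashPartitioner(3) sends key k to partition k % 3, and within each partition the pairs are sorted by key):

scala> data.repartitionAndSortWithinPartitions(new HashPartitioner(3)).glom.collect
// one inner array per partition; keys 1, 2 and 5 land in partitions 1, 2 and 2 respectively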

(2)aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

aggregateByKey aggregates the values that share the same key in a pair RDD, again starting from a neutral initial ("zero") value. Its definition is:
/**
* Aggregate the values of each key, using given combine functions and a neutral "zero value".
* This function can return a different result type, U, than the type of the values in this RDD,
* V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
* as in scala.TraversableOnce. The former operation is used for merging values within a
* partition, and the latter is used for merging values between partitions. To avoid memory
* allocation, both of these functions are allowed to modify and return their first argument
* instead of creating a new U.
*/
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)]

Example code:

import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf, SparkContext}

object AggregateByKeyDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("AggregateByKeyDemo").setMaster("local")
    val sc = new SparkContext(conf)

    val data = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(2,4)))

    // seqOp: merges a value into the per-partition accumulator (here: keep the maximum)
    def seqOp(a: Int, b: Int): Int = {
      println("seq: " + a + "\t " + b)
      math.max(a, b)
    }

    // combineOp: merges accumulators coming from different partitions (here: add them)
    def combineOp(a: Int, b: Int): Int = {
      println("comb: " + a + "\t " + b)
      a + b
    }

    val localIterator = data.aggregateByKey(1)(seqOp, combineOp).toLocalIterator
    for (i <- localIterator) println(i)
    sc.stop()
  }
}

Output:

seq: 1 3
seq: 3 2
seq: 3 4
seq: 1 3
seq: 3 4

(1,4)
(2,4)

Judging from the output, seqOp clearly ran, but combineOp never seems to have been invoked; I got the same result on Spark 1.5, 1.4 and 1.3. The article at http://www.iteblog.com/archives/1261 walks through aggregateByKey on Spark 1.1 and its result matches expectations, so at first sight this looks like a version difference. A more likely explanation, however, is that with setMaster("local") the parallelized data ends up in a single partition, so there are no per-partition results to merge and combOp simply never gets called. I will analyze this in more detail when time permits.
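
A quick way to test this in the shell is to split the data across several partitions explicitly; once the values of a key span more than one partition, combOp has something to merge. A minimal sketch (the exact split depends on how parallelize slices the list):

scala> val data = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(2,4)), 3)

scala> data.aggregateByKey(1)((a, b) => math.max(a, b), (a, b) => a + b).collect
// key 1 now has per-partition maxima in more than one partition, so the combine step adds them,
// giving something like Array((1,7), (2,4)) instead of Array((1,4), (2,4))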

RDDs offer other very useful transformations; see the API documentation: http://spark.apache.org/docs/latest/api/scala/index.html

2. RDD actions

This subsection introduces the commonly used actions. The collect method used earlier is one of them: it returns all elements of the RDD to the driver as an array. Its definition is:

/**
* Return an array that contains all of the elements in this RDD.
*/
def collect(): Array[T]
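
A quick example in the shell (safe here because the RDD is tiny; collect brings everything back to the driver):

scala> sc.parallelize(1 to 3).collect
// Array(1, 2, 3)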

(1) reduce(func)
reduce aggregates the elements of the RDD into a single value using the given commutative and associative binary operator. Its definition is:
/**
* Reduces the elements of this RDD using the specified commutative and
* associative binary operator.
*/
def reduce(f: (T, T) => T): T
Example:

scala> val data=sc.parallelize(1 to 9)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:22

scala> data.reduce((x,y)=>x+y)
res12: Int = 45

scala> data.reduce(_+_)
res13: Int = 45

(2)count()

/**
* Return the number of elements in the RDD.
*/
def count(): Long

Example:

scala> val data=sc.parallelize(1 to 9)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:22
scala> data.count
res14: Long = 9

(3)first()
/**
* Return the first element in this RDD.
*/
def first(): T

scala> val data=sc.parallelize(1 to 9)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:22
scala> data.first
res15: Int = 1

(4)take(n)

/**
* Take the first num elements of the RDD. It works by first scanning one partition, and use the
* results from that partition to estimate the number of additional partitions needed to satisfy
* the limit.
*
* @note due to complications in the internal implementation, this method will raise
* an exception if called on an RDD of Nothing or Null.
*/
def take(num: Int): Array[T]

scala> val data=sc.parallelize(1 to 9)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:22
scala> data.take(2)
res16: Array[Int] = Array(1, 2)

(5) takeSample(withReplacement, num, [seed])

Draws a fixed-size random sample from the RDD and returns it as an array.
/**
* Return a fixed-size sampled subset of this RDD in an array
*
* @param withReplacement whether sampling is done with replacement
* @param num size of the returned sample
* @param seed seed for the random number generator
* @return sample of specified size in an array
*/
// TODO: rewrite this without return statements so we can wrap it in a scope
def takeSample(
    withReplacement: Boolean,
    num: Int,
    seed: Long = Utils.random.nextLong): Array[T]

scala> val data=sc.parallelize(1 to 9)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:22

scala> data.takeSample(false,5)
res17: Array[Int] = Array(6, 7, 4, 1, 2)

scala> data.takeSample(true,5)
res18: Array[Int] = Array(3, 3, 8, 3, 8)

(6) takeOrdered(n, [ordering])

/**
* Returns the first k (smallest) elements from this RDD as defined by the specified
* implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]].
* For example:
* {{{
* sc.parallelize(Seq(10, 4, 2, 12, 3)).takeOrdered(1)
* // returns Array(2)
*
* sc.parallelize(Seq(2, 3, 4, 5, 6)).takeOrdered(2)
* // returns Array(2, 3)
* }}}
*
* @param num k, the number of elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
*/
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
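
A quick usage sketch in the shell, reusing the data RDD (1 to 9) from the earlier examples; passing a reversed Ordering returns the largest elements instead:

scala> data.takeOrdered(3)
// Array(1, 2, 3)

scala> data.takeOrdered(3)(Ordering[Int].reverse)
// Array(9, 8, 7)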

(7) saveAsTextFile(path)

Saves the RDD to a text file using the string representation of each element. In local mode the output is written to the local file system; when running on a cluster on top of Hadoop, the path typically refers to a location on HDFS.
/**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String): Unit

scala> data.saveAsTextFile("/data.txt")
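
Note that saveAsTextFile actually creates a directory at the given path, with one part file per partition, rather than a single file. The saved data can be read back as lines of text with textFile; a minimal sketch using the illustrative path from above:

scala> sc.textFile("/data.txt").count
// one line per saved element, so the count matches the original RDD's size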

(8) countByKey()
Counts the number of elements for each key in the RDD and returns the result as a local Map.
/**
* Count the number of elements for each key, collecting the results to a local Map.
*
* Note that this method should only be used if the resulting map is expected to be small, as
* the whole thing is loaded into the driver's memory.
* To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which
* returns an RDD[T, Long] instead of a map.
*/
def countByKey(): Map[K, Long]

Example:

scala> val data = sc.parallelize(List((1,3),(1,2),(5,4),(1, 4),(2,3),(2,4)),3)
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:22

scala> data.countByKey()
res22: scala.collection.Map[Int,Long] = Map(1 -> 3, 5 -> 1, 2 -> 2)
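
As the doc comment above suggests, when there are many distinct keys the counting can be kept distributed instead of materialized as a driver-side Map; a minimal sketch of that alternative on the same data:

scala> data.mapValues(_ => 1L).reduceByKey(_ + _).collect
// Array((1,3), (5,1), (2,2)) -- the same counts, but produced as an RDD (ordering may differ)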

(9) foreach(func)
The foreach method applies the given function to every element of the RDD.
// Actions (launch a job to return a value to the user program)

/**
* Applies a function f to all elements of this RDD.
*/
def foreach(f: T => Unit): Unit

import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf, SparkContext}

object ForEachDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("ForEachDemo").setMaster("local")
    val sc = new SparkContext(conf)

    val data = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(2,4)))

    // print every key/value pair; with a local master the output shows up in the console
    data.foreach(x => println("key=" + x._1 + ",value=" + x._2))
    sc.stop()
  }
}
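
One caveat: because the closure passed to foreach runs on the executors, on a real cluster the println output lands in the executor logs rather than in the driver console; it only appears locally here because of setMaster("local"). To print on the driver, bring the data back first, for example:

data.collect().foreach(x => println("key=" + x._1 + ",value=" + x._2))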

Spark also provides other very useful operations, such as foldByKey and sampleByKey; see the API documentation: http://spark.apache.org/docs/latest/api/scala/index.html
