[大数据之Spark]——Actions算子操作入门实例

Actions

reduce(func)

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

这个方法会传入两个参数,计算这两个参数返回一个结果。返回的结果与下一个参数一起当做参数继续进行计算。

比如,计算一个数组的和。

//创建数据集
scala> var data = sc.parallelize(1 to 3,1)

scala> data.collect
res6: Array[Int] = Array(1, 2, 3)

//collect计算
scala> data.reduce((x,y)=>x+y)
res5: Int = 6

collect()

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

返回数据集的所有元素,通常是在使用filter或者其他操作的时候,返回的数据量比较少时使用。

比如,显示刚刚定义的数据集内容。

//创建数据集
scala> var data = sc.parallelize(1 to 3,1)

scala> data.collect
res6: Array[Int] = Array(1, 2, 3)

count()

Return the number of elements in the dataset.

计算数据集的数据个数,一般都是统计内部元素的个数。

//创建数据集
scala> var data = sc.parallelize(1 to 3,1)

//统计个数
scala> data.count
res7: Long = 3

scala> var data = sc.parallelize(List(("A",1),("B",1)))
scala> data.count
res8: Long = 2

first()

Return the first element of the dataset (similar to take(1)).

返回数据集的第一个元素,类似take(1)

//创建数据集
scala> var data = sc.parallelize(List(("A",1),("B",1)))

//获取第一条元素
scala> data.first
res9: (String, Int) = (A,1)

take(n)

Return an array with the first n elements of the dataset.

返回数组的头n个元素

//创建数据集
scala> var data = sc.parallelize(List(("A",1),("B",1)))

scala> data.take(1)
res10: Array[(String, Int)] = Array((A,1))

//如果n大于总数,则会返回所有的数据
scala> data.take(8)
res12: Array[(String, Int)] = Array((A,1), (B,1))

//如果n小于等于0,会返回空数组
scala> data.take(-1)
res13: Array[(String, Int)] = Array()

scala> data.take(0)
res14: Array[(String, Int)] = Array()

takeSample(withReplacement, num, [seed])

Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

这个方法与sample还是有一些不同的,主要表现在:

  • 返回具体个数的样本(第二个参数指定)
  • 直接返回array而不是RDD
  • 内部会将返回结果随机打散
//创建数据集
scala> var data = sc.parallelize(List(1,3,5,7))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

//随机2个数据
scala> data.takeSample(true,2,1)
res0: Array[Int] = Array(7, 1)

//随机4个数据,注意随机的数据可能是重复的
scala> data.takeSample(true,4,1)
res1: Array[Int] = Array(7, 7, 3, 7)

//第一个参数是是否重复
scala> data.takeSample(false,4,1)
res2: Array[Int] = Array(3, 5, 7, 1)

scala> data.takeSample(false,5,1)
res3: Array[Int] = Array(3, 5, 7, 1)

takeOrdered(n, [ordering])

Return the first n elements of the RDD using either their natural order or a custom comparator.

基于内置的排序规则或者自定义的排序规则排序,返回前n个元素

//创建数据集
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:21

//返回排序数据
scala> data.takeOrdered(3)
res4: Array[String] = Array(a, b, c)

saveAsTextFile(path)

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.

将数据集作为文本文件保存到指定的文件系统、hdfs、或者hadoop支持的其他文件系统中。

//创建数据集
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:21

//保存为test_data_save文件
scala> data.saveAsTextFile("test_data_save")

scala> data.saveAsTextFile("test_data_save2",classOf[GzipCodec])
<console>:24: error: not found: type GzipCodec
              data.saveAsTextFile("test_data_save2",classOf[GzipCodec])
                                                            ^
//引入必要的class
scala> import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.io.compress.GzipCodec

//保存为压缩文件
scala> data.saveAsTextFile("test_data_save2",classOf[GzipCodec])

查看文件

[xingoo@localhost bin]$ ll
drwxrwxr-x. 2 xingoo xingoo 4096 Oct 10 23:07 test_data_save
drwxrwxr-x. 2 xingoo xingoo 4096 Oct 10 23:07 test_data_save2
[xingoo@localhost bin]$ cd test_data_save2
[xingoo@localhost test_data_save2]$ ll
total 4
-rw-r--r--. 1 xingoo xingoo 30 Oct 10 23:07 part-00000.gz
-rw-r--r--. 1 xingoo xingoo  0 Oct 10 23:07 _SUCCESS
[xingoo@localhost test_data_save2]$ cd ..
[xingoo@localhost bin]$ cd test_data_save
[xingoo@localhost test_data_save]$ ll
total 4
-rw-r--r--. 1 xingoo xingoo 10 Oct 10 23:07 part-00000
-rw-r--r--. 1 xingoo xingoo  0 Oct 10 23:07 _SUCCESS
[xingoo@localhost test_data_save]$ cat part-00000
b
a
e
f
c

saveAsSequenceFile(path)

Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).

保存为sequence文件

scala> var data = sc.parallelize(List(("A",1),("A",2),("B",1)),3)
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[21] at parallelize at <console>:22

scala> data.saveAsSequenceFile("kv_test")

[xingoo@localhost bin]$ cd kv_test/
[xingoo@localhost kv_test]$ ll
total 12
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00000
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00001
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00002
-rw-r--r--. 1 xingoo xingoo  0 Oct 10 23:25 _SUCCESS

saveAsObjectFile(path)

Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().

基于Java序列化保存文件

scala> var data = sc.parallelize(List("a","b","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at parallelize at <console>:22

scala> data.saveAsObjectFile("str_test")

scala> var data2 = sc.objectFile[Array[String]]("str_test")
data2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[20] at objectFile at <console>:22

scala> data2.collect

countByKey()

Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.

统计KV中,相同K的V的个数

//创建数据集
scala> var data = sc.parallelize(List(("A",1),("A",2),("B",1)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:22

//统计个数
scala> data.countByKey
res9: scala.collection.Map[String,Long] = Map(B -> 1, A -> 2)

foreach(func)

Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.

Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

针对每个参数执行,通常在更新互斥或者与外部存储系统交互的时候使用

// 创建数据集
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:22

// 遍历
scala> data.foreach(x=>println(x+" hello"))
b hello
a hello
e hello
f hello
c hello

本文转自博客园xingoo的博客,原文链接:[大数据之Spark]——Actions算子操作入门实例,如需转载请自行联系原博主。

时间: 2024-09-13 08:19:57

[大数据之Spark]——Actions算子操作入门实例的相关文章

[大数据之Spark]——快速入门

本篇文档是介绍如何快速使用spark,首先将会介绍下spark在shell中的交互api,然后展示下如何使用java,scala,python等语言编写应用.可以查看编程指南了解更多的内容. 为了良好的阅读下面的文档,最好是结合实际的练习.首先需要下载spark,然后安装hdfs,可以下载任意版本的hdfs. Spark Shell 交互 基本操作 Spark Shell提供给用户一个简单的学习API的方式 以及 快速分析数据的工具.在shell中,既可以使用scala(运行在java虚拟机,因

[大数据之Spark]——Transformations转换入门经典实例

Spark相比于Mapreduce的一大优势就是提供了很多的方法,可以直接使用:另一个优势就是执行速度快,这要得益于DAG的调度,想要理解这个调度规则,还要理解函数之间的依赖关系. 本篇就着重描述下Spark提供的Transformations方法. 依赖关系 宽依赖和窄依赖 窄依赖(narrow dependencies) 窄依赖是指父RDD仅仅被一个子RDD所使用,子RDD的每个分区依赖于常数个父分区(O(1),与数据规模无关). 输入输出一对一的算子,且结果RDD的分区结构不变.主要是ma

【Spark Summit East 2017】2017年大数据与Spark的发展趋势

本讲义出自Matei Zaharia在Spark Summit East 2017上的演讲,主要介绍了2016年以及2017年大数据与Spark技术的未来的汇合的发展趋势以及Databricks对于使Spark与像深度学习库这样的原生代码能够更好地进行交互所做的工作.

大数据应用之:MongoDB从入门到精通你不得不知的21个为什么?

一.引言: 互联网的发展和电子商务平台的崛起,催生了大数据时代的来临,作为大数据典型开发框架的MongoDB成为了No-sql数据库的典型代表.MongoDB从入门到精通你不得不知的21个为什么专为大数据时代,大数据应用系统系统分析.架构设计和平台开发人员而准备.希望能够为大家起到提纲挈领,指明大家学习目标和方向的作用. 一.正文 n1.MongoDB概述   1.1 MongoDB为何而生?   1.2 MongoDB有哪些技术特点?为何适应大数据时代的发展?   1.3 MongoDB不是万

大数据:Spark和Hadoop是友非敌

Spark 在 6 月份取得了激动人心的成绩.在圣何塞举办的 Hadoop 峰会上,Spark 成了人们经常提及的话题和许多演讲的主题.IBM 还在 6 月 15 号宣布,将对 Spark 相关的技术进行巨额投资. 这一声明帮助推动了旧金山Spark 峰会的召开.在这里,人们会看到有越来越多的工程师在学习 Spark,也有越来越多的公司在试验和采用 Spark. 对 Spark 的投资和采用形成了一个正向循环,迅速推动这一重要技术的成熟和发展,让整个大数据社区受益.然而,人们对 Spark 的日

云计算大数据核心技术Spark,星星之火即将燎原

大数据时代,Spark是继Hadoop之后,成为替代Hadoop的下一代云计算大数据核心技术,目前Spark已经构建了自己的整个大数据处理生态系统,如流处理.图技术.机器学习.NoSQL查询等方面都有自己的技术,并且是Apache顶级Project,可以 预计的是 2014年下半年到2015年Spark在社区和商业应用上会有爆发式的增长.目前Spark的技术在国内还属于起步阶段,为了更好地帮助大家了解Spark技术特点及应用前景,我们走访了Spark亚太研究院的院长王家林.       王家林,

助人就是助己:IBM宣布大规模资助开源大数据项目Spark

本周一,IBM宣布将对开源实时大数据分析项目Apache Spark进行大规模资助,蓝色巨人宣称,其资助的力度之大相当于每年数亿美元的投入. Hadoop技术出自Google.Yahoo这些互联网公司,主要是为了对规模庞大的各类数据进行处理和分析.不过近年来随着大数据应用的流行,越来越多的公司也希望自己具备类似的能力,这使得Hadoop逐步进入了主流.Hadoop以及相关的分发企业如Cloudera, Hortonworks等也成为了大数据领域的投资热点. Spark同样也提供大数据处理与分析能

预告:大数据(hadoop\spark)解决方案构建详解,以阿里云E-MapReduce为例

在2016年09月22日 20:00 - 21:00笔者将在 CSDN学院,分享<大数据解决方案构建详解 :以阿里云E-MapReduce为例>欢迎大家报名,报名链接:http://edu.csdn.net/huiyiCourse/detail/186 ppt地址:https://yq.aliyun.com/attachment/download/?spm=0.0.0.0.PpnciK&filename=%E5%A4%A7%E6%95%B0%E6%8D%AE%E8%A7%A3%E5%8

大数据在银行的七个应用实例

Hadoop is present in nearly every vertical today that isleveraging big data in order to analyze information and gain competitiveadvantages. Many financial organizations firms are already using Hadoopsolutions successfully and the ones who are not hav