Spark的RDD原理以及2.0特性的介绍(转)

Spark 是什么 

Spark 是 Apache 顶级项目里面最火的大数据处理的计算引擎,它目前是负责大数据计算的工作。包括离线计算或交互式查询、数据挖掘算法、流式计算以及图计算等。全世界有许多公司和组织使用或给社区贡献代码,社区的活跃度见 www.github.com/apache/spark。

2013 年开始 Spark开发团队成立 Databricks,来对 Spark 进行运作和管理,并提供 Cloud 服务。Spark 社区基本保持一个季度一个版本,不出意外的话 Spark 2.0 将在五月底发布。

与 Mapreduce 相比,Spark 具备 DAG 执行引擎以及基于内存的多轮迭代计算等优势,在SQL 层面上,比 Hive/Pig 相比,引入关系数据库的许多特性,以及内存管理技术。另外在 Spark 上所有的计算模型最终都统一基于 RDD 之上运行执行,包括流式和离线计算。Spark 基于磁盘的性能是 MR 的 10 倍,基于内存的性能是 MR 的 100 倍 。

Spark 提供 SQL、机器学习库 MLlib、流计算 Streaming 和图计算 Graphx,同时也支持 Scala、Java、Python 和 R 语言开发的基于 API 的应用程序。

RDD 的原理

RDD,英文全称叫 Resilient Distributed Datasets。

an RDD is a read-only, partitioned collection of records. 字面意思是只读的分布式数据集。

但其实个人觉得可以把 RDD 理解为关系数据库 里的一个个操作,比如 map,filter,Join 等。在 Spark 里面实现了许多这样的 RDD 类,即可以看成是操作类。当我们调用一个 map 接口,底层实现是会生成一个 MapPartitionsRDD 对象,当 RDD 真正执行时,会调用 MapPartitionsRDD 对象里面的 compute 方法来执行这个操作的计算逻辑。但是不同的是,RDD 是 lazy 模式,只有像 count,saveasText 这种 action 动作被调用后再会去触发 runJob 动作。

RDD 分为二类:transformation 和 action。

transformation 是从一个 RDD 转换为一个新的 RDD 或者从数据源生成一个新的 RDD;

action 是触发 job 的执行。所有的 transformation 都是 lazy 执行,只有在 action 被提交的时候才触发前面整个 RDD 的执行图。如下

val file = sc.textFile(args(0))

val words = file.flatMap(line => line.split(" "))

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _, 2) wordCounts.saveAsTextFile(args(1))

这段代码生成的 RDD 的执行树是如下图所示:

最终在 saveAsTextFile 方法时才会将整个 RDD 的执行图提交给 DAG 执行引擎,根据相关信息切分成一个一个 Stage,每个 Stage 去执行多个 task,最终完成整个 Job 的执行。

还有一个区别就是,RDD 计算后的中间结果是可以被持久化,当下一次需要使用时,可以直接使用之前持久化好的结果,而不是重新计算,并且这些结果被存储在各个结点的 executor 上。下一次使用时,调度器可以直接把 task 分发到存储持久化数据的结点上,减少数据的网络传输开稍。这种场景在数据挖掘迭代计算是经常出现。如下代码

val links = spark.textFile(...).map(...).persist() var ranks = // RDD of (URL, rank) pairs

for (i <- 1 to ITERATIONS) {

// Build an RDD of (targetURL, float) pairs // with the contributions sent by each page val contribs = links.join(ranks).flatMap {

(url, (links, rank)) =>

links.map(dest => (dest, rank/links.size)) }

// Sum contributions by URL and get new ranks

ranks = contribs.reduceByKey((x,y) => x+y)

.mapValues(sum => a/N + (1-a)*sum) }

以上代码生成的 RDD 执行树如下图所示:

计算 contribs-0 时需要使用 links 的计算逻辑,当 links 每个分片计算完后,会将这个结果保存到本地内存或磁盘上,下一次 contribs-1 计算要使用 links 的数据时,直接从上一次保存的内存和磁盘上读取就可以了。这个持久化系统叫做 blockManager,类似于在内部再构建了一个 KV 系统,K 表示每个分区 ID 号,V 表示这个分区计算后的结果。

另外在 streaming 计算时,每个 batch 会去消息队列上拉取这个时间段的数据,每个 Recevier 接收过来数据形成 block 块并存放到 blockManager 上,为了可靠性,这个 block 块可以远程备份,后续的 batch 计算就直接在之前已读取的 block 块上进行计算,这样不断循环迭代来完成流处理。

一个 RDD 一般会有以下四个函数组成。

1. 操作算子的物理执行逻辑

定义为:

def compute(split: Partition, context: TaskContext): Iterator[T]

如在 MapPartitionsRDD 里的实现是如下:

override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context))

函数定义

f: (TaskContext, Int, Iterator[T]) => Iterator[U]

2. 获取分片信息

protected def getPartitions: Array[Partition] 

即这个操作的数据划分为多少个分 区。跟 mapreduce 里的 map 上的 split 类似的。

3. 获取父 RDD 的依赖关系

protected def getDependencies: Seq[Dependency[_]] 

依赖分二种:如果 RDD 的每个分区最多只能被一个 Child RDD 的一个分区使用,则称之为 narrow dependency;若依赖于多个 Child RDD 分区,则称之为 wide dependency。不同的操作根据其特性,可能会产生不同的依赖 。如下图所示

map 操作前后二个 RDD 操作之间的分区是一对一的关系,故产生 narrow dependency,而 join 操作的分区分别对应于它的二个子操作相对应的分区,故产生 wide dependency。当最后要生成具体的 task 运行时,就需要利用这个依赖关系也生成 Stage 的 DAG 图。

4. 获取该操作对应数据的存放位置信息,主要是针对 HDFS 这类有数据源的 RDD。

protected def getPreferredLocations(split: Partition): Seq[String]

Spark 的执行模式

Spark 的执行模式有 local、Yarn、Standalone、Mesos 四类。后面三个分别有 cluster 和 client 二种。client 和 cluster 的区别就是指 Driver 是在程序提交客户端还是在集群的 AM 上。 比如常见的 Yarn-cluster 模式如下图所示:

一般来说,运行简单测试或 UT 用的是 local 模式运行,其实就是用多线程模似分布式执行。 如果业务部门较少且不需要对部门或组之间的资源做划分和优先级调度的话,可以使用 Standalone 模式来部署。

当如果有多个部门或组,且希望每个组织可以限制固定运行的最大资源,另外组或者任务需要有优先级执行的话,可以选择 Yarn 或 Mesos。

Spark 2.0 的特性

Unifying DataFrames and Datasets in Scala/Java

DataFrame 和 Dataset 的功能是什么?

它们都是提供给用户使用,包括各类操作接口的 API。1.3 版本引入 DataFrame,1.6 版本引入 Dataset,2.0 提供的功能是将二者统一,即保留 Dataset,而把 DataFrame 定义为 Dataset[Row],即是 Dataset 里的元素对象为 Row 的一种(SPARK-13485)。

DataFrame,它就是提供了一系列操作 API,与 RDD API 相比较,DataFrame 里操作的数据都是带有 Schema 信息,所以 DataFrame 里的所有操作是可以享受 Spark SQL Catalyst optimizer 带来的性能提升,比如 code generation 以及 Tungsten等。执行过程如下图所示

但是 DataFrame 出来后发现有些情况下 RDD 可以表达的逻辑用 DataFrame 无法表达。比如 要对 group by 或 join 后的结果用自定义的函数,可能用 SQL 是无法表达的。如下代码:

case class ClassData(a: String, b: Int)

case class ClassNullableData(a: String, b: Integer)

val ds = Seq(ClassData("a", 1), ClassData("a", 2)).toDS()

val agged = ds.groupByKey(d => ClassNullableData(d.a, null))

.mapGroups {

case (key, values) => key.a + values.map(_.b).sum

}

中间处理过程的数据是自定义的类型,并且 groupby 后的聚合逻辑也是自定义的,故用 SQL 比较难以表达,所以提出了 Dataset API。Dataset API 扩展 DataFrame API 支持静态类型和运行已经存在的 Scala 或 Java 语言的用户自定义函数。同时 Dataset 也能享受 Spark SQL 里所有性能 带来的提升。

那么后面发现 Dataset 是包含了 DataFrame 的功能,这样二者就出现了很大的冗余,故在 2.0 时将二者统一,保留 Dataset API,把 DataFrame 表示为 Dataset[Row],即 Dataset 的子集。

因此我们在使用 API 时,优先选择 DataFrame & Dataset,因为它的性能很好,而且以后的优化它都可以享受到,但是为了兼容早期版本的程序,RDD API 也会一直保留着。后续 Spark 上层的库将全部会用 DataFrame,比如 MLlib、Streaming、Graphx 等。

Whole-stage code generation

其中一个例子:

select count(*) from store_sales where ss_item_sk = 1000

那么在翻译成计算引擎的执行计划如下图:

而通常物理计划的代码是这样实现的:

class Filter {

def next(): Boolean = {

var found = false

while (!found && child.next()) {

found = predicate(child.fetch())

}

return found

}

def fetch(): InternalRow = {

child.fetch()

}...

}

但是真正如果我们用 hard code 写的话,代码是这样的:

var count = 0

for (ss_item_sk in store_sales) {

if (ss_item_sk == 1000) {

count += 1

}

}

发现二者相关如下图所示:

那么如何使得计算引擎的物理执行速度能达到 hard code 的性能呢?这就提出了 whole-stage code generation,即对物理执行的多次调用转换为代码 for 循环,类似 hard code 方式,减少中间执行的函数调用次数,当数据记录多时,这个调用次数是很大。 最后这个优化带来的性能提升如下图所示:

从 benchmark 的结果可以看出,使用了该特性后各操作的性能都有很大的提升。

Structured Streaming

Spark Streaming 是把流式计算看成一个一个的离线计算来完成流式计算,提供了一套 Dstream 的流 API,相比于其他的流式计算,Spark Streaming 的优点是容错性和吞吐量上要有优势,关于 Spark Streaming 的详细设计思想和分析,可以到 https://github.com/lw-lin/CoolplaySpark 进行详细学习和了解。

在 2.0 以前的版本,用户在使用时,如果有流计算,又有离线计算,就需要用二套 API 去编写程序,一套是 RDD API,一套是 Dstream API。而且 Dstream API 在易用性上远不如 SQL 或 DataFrame。

为了真正将流式计算和离线计算在编程 API 上统一,同时也让 Streaming 作业能够享受 DataFrame/Dataset 上所带来的优势:性能提升和 API 易用,于是提出了 Structured Streaming。最后我们只需要基于 DataFrame/Dataset 可以开发离线计算和流式计算的程序,很容易使得 Spark 在 API 跟业界所说的 DataFlow 来统一离线计算和流式计算效果一样。

比如在做 Batch Aggregation 时我们可以写成下面的代码

那么对于流式计算时,我们仅仅是调用了 DataFrame/Dataset 的不同函数代码,如下:

最后,在 DataFrame/Dataset 这个 API 上可以完成如下图所示的所有应用:

其他主要性能提升

采用 vectorized Parquet decoder 读取 parquet 上数据。以前是一行一行的读取,然后处理。现在改为一次读取 4096 行记录,不需要每处理一行记录去调用一次 Parquet 获取记录的方法,而是改为一批去调用一次(SPARK-12854)。加上 Parquet 本身是列式存储,这个优化使得 Parquet 读取速度提高 3 倍。

采有 radix sort 来提高 sort 的性能(SPARK-14724)。在某些情况下排序性能可以提高 10-20 倍。

使用 VectorizedHashmap 来代替 Java 的 HashMap 加速 group by 的执行(SPARK-14319)。

将 Hive 中的 Window 函数用 Native Spark Window 实现,因为 Native Spark Window 在内存管理上有优势(SPARK-8641)。

避免复杂语句中的逻辑相同部分在执行时重复计算(SPARK-13523)。

压缩算法默认使用 LZ4(SPARK-12388)。

语句的增强

建立新的语法解析(SPARK-12362)满足所有的 SQL 语法,这样即合并 Hive 和标准 SQL 的语句解析,同时不依赖 Hive 的语法解析 jar(SPARK-14776)。之前版本二者的语法解析是独立的,这样导致在标准 SQL 中无法使用窗口函数或者 Hive 的语法,而在使用 Hive 语法时无法使用标准 SQL 的语法,比如 In/Exists 子句等。在 SQL 编写时,没法在一个 Context 把二者的范围全部支持,然而有了这个特性后,使得 SQL 语句表达更强大,后续要增加任何语法,只需要维护这一个语法解析即可。当然缺点是后续 Hive 版本的新语法,需要手动添加进来。

支持 intersect/except(SPARK-12542)。如 select * from t1 except select * from t2 或者 select * from t1 intersect select * from t2。

支持 uncorrelated scalar subquery(SPARK-13417)。如 select (select min(value) from testData where key = (select max(key) from testData) - 1)。

支持 DDL/DML(SPARK-14118)。之前 DDL/DML 语句是调用 Hive 的 DDL/DML 语句命令来完成,而现在是直接在 Spark SQL 上就可以完成。

支持 multi-insert(SPARK-13924)。

支持 exist(SPARK-12545)和 NOT EXISTS(SPARK-10600),如 select * from (select 1 as a union all select 2 as a) t where exists (select * from (select 1 as b) t2 where b = a and b < 2)。

支持 subqueries 带有 In/Not In 子句(SPARK-4226),如 select * from (select 1 as a union all select 2 as a) t where a in (select b as a from t2 where b < 2)。

支持 select/where/having 中使用 subquery(SPARK-12543),如 select * from t where a = (select max(b) from t2) 或 select max(a) as ma from t having ma = (select max(b) from t2)。

支持 LeftSemi/LeftAnti(SPARK-14853)。

支持在条件表达式 In/Not In 里使用子句(SPARK-14781),如 select * from l where l.a in (select c from r) or l.a in (select c from r where l.b < r.d)。

支持所有的 TPCDS 语句(SPARK-12540)。

与以前版本兼容(SPARK-11806)

不支持运行在 Hadoop 版本 < 2.2 上(SPARK-11807)。

去掉 HTTPBroadcast(SPARK-12588)。

去掉 HashShuffleManager(SPARK-14667)。

去掉 Akka RPC。

简化与完善 accumulators and task metrics(SPARK-14626)。

将 Hive 语法解析以及语法移至 Core 里(SPARK-14825),在没有 Hive 元数据库和 Hive 依赖包时,我们可以像之前版本使用标准 SQL 一样去使用 HiveQL 语句。

1.6 版本严重问题的解决

在 http://geek.csdn.net/news/detail/70162 提到的 1.6 问题中 Spillable 集合内存溢出问题在 SPARK-4452 里已解决,BlockManager 死锁问题在 SPARK-12757 里已解决。

最后 2.0 版本还有一些其他的特性,如:

用 SparkSession 替换掉原来的 SQLContext and HiveContext。

mllib 里的计算用 DataFrame-based API 代替以前的 RDD 计算逻辑。

提供更多的 R 语言算法。

默认使用 Scala 2.11 编译与运行。

原文地址:http://www.dataguru.cn/article-9353-1.html

时间: 2024-09-15 11:06:36

Spark的RDD原理以及2.0特性的介绍(转)的相关文章

BlogEngine.Net架构与源代码分析系列part7:Web2.0特性

Pingback&Trackback 今天这篇文章主要向大家讲述一下Blog系统中应用最多的,具有Web2.0特性的,也是一种标准化的--Pingback&Trackback.分析一下BlogEngine.Net提供的比较全面的Pingback&Trackback支持.本文内容相对来说比较独立,如果您对整个系列感兴趣请参照这里. Pingback&Trackback参考 简单地说,Pingback&Trackback是博客在链接.引用其他博客内容时通知对方博客的一种

将Spark部署到Hadoop 2.2.0上

本文介绍的是如何将http://www.aliyun.com/zixun/aggregation/14417.html">Apache Spark部署到Hadoop 2.2.0上,如果你们的Hadoop是其他版本,比如CDH4,可直接参考官方说明操作. 需要注意两点:(1)使用的Hadoop必须是2.0系列,比如0.23.x,2.0.x,2.x.x或CDH4.CDH5等,将Spark运行在 Hadoop上,本质上是将Spark运行在Hadoop YARN上,因为Spark自身只提供了作业管

呼之欲出!比Spark快10倍的Hadoop3.0有哪些实用新特性?

Apache hadoop 项目组最新消息,hadoop3.x以后将会调整方案架构,将Mapreduce 基于内存+io+磁盘,共同处理数据. 其实最大改变的是hdfs,hdfs 通过最近black块计算,根据最近计算原则,本地black块,加入到内存,先计算,通过IO,共享内存计算区域,最后快速形成计算结果. Hadoop Hadoop 3.0简介 Hadoop 2.0是基于JDK 1.7开发的,而JDK 1.7在2015年4月已停止更新,这直接迫使Hadoop社区基于JDK 1.8重新发布一

Spark之RDD的transformation&amp;action(Java&amp;Scala实现)

1,transformation是得到一个新的RDD,方式很多,比如: 1.1 从Hadoop文件系统(如HDFS.Hive.HBase)输入创建 1.2 从父RDD转换得到新RDD 1.3 通过parallelize或makeRDD将单机数据创建为分布式RDD (区别: A)makeRDD函数比parallelize函数多提供了数据的位置信息. B)两者的返回值都是ParallelCollectionRDD,但parallelize函数可以自己指定分区的数量,而 makeRDD函数固定为seq

ADO.NET2.0跟ADO.NET3.0的一些新特性简要介绍

ado 觉得很多人在写关于ASP.NET2.0的东东,很少有人写关于ADO.NET2.0的新特性.查找了一下MSDN,给大家介绍几点好了.(如果需要察看所有ADO.NET2.0的新特性,请查看 http://msdn2.microsoft.com/en-us/library/ex6y04yf.aspx) Server Enumeration 用来枚举活动状态的SQL Server实例,版本需要在SQL2000及更新版本.使用的是SqlDataSourceEnumerator类 可以参考以下示例代

C# 3.0特性

在本篇中我要介绍两个概念,我觉得这两个东西必须一起来介绍,这样才能 连贯. C# 2.0里我们已经匿名方法了,现在类型也玩起匿名来了,怪不得大家 "举报"的时候都喜欢匿名,为啥?因为匿名被举报人就找不着报复 对象了呗,是的,匿名就是把名字隐藏起来,没有名字谁还能找得到你啊. 匿名类型 在C#里有这样一些类型,它是作为临时储存数据的,生命周期只在这个方法 内,方法结束了,这个类型的生命周期也没有了.那么这里我们就可以使用一个 匿名类型. var KeyPair = new {Key=&q

【转载】OpenEJB 3.0支持对枚举和集合的依赖注入及OSGi和EJB 3.0特性

    开源轻量级EJB实现框架OpenEJB的最新版支持对枚举.集合和Maps的依赖注入(Dependency Injection,即DI),并且支持OSGi和EJB 3.0规范.在经历了一年半的开发后,OpenEJB 3.0最终版近期发布了.该版本还支持@EJB引用其他EAR文件中的本地接口.事务日志及基于HTTP协议的EJBd,同时它还支持EJB 3.0的新特性如Business Interfaces.Java Persistence API (JPA)及JAX-WS Web Servic

spring boot 2.0 特性之springApplication

SpringApplication SpringApplication 类提供了在main函数中启动spring应用程序的便利性,通过调用其静态方法其实运行,代码如下: public static void main(String[] args) { SpringApplication.run(MySpringConfiguration.class, args); } 1. 默认情况下,其日志级别为info.如果上述启动失败,会注册FailureAnalyzers分析器获取一个对应的错误信息以及

spring boot 2.0特性之外部化配置

简介 spring boot允许你外部化其配置以便你能够在不同的环境中使用相同的代码.可以通过使用properties,YAML 文件,环境变量,以及命名行参数等形式外部化其配置.属性值可以通过@value的形式直接注入到bean中去(其通过spring 的抽象或者是通过使用@ConfigurationProperties绑定其结构化对象) spring boot使用了一种特别的 PropertySource允许你覆盖其值,其覆盖顺序如下: 1. devtools的环境变量被设置,则优先使用(其