《循序渐进学Spark》一3.6 Shuffle机制

3.6 Shuffle机制

在MapReduce框架中,Shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过Shuffle这个环节,Shuffle的性能高低直接影响了整个程序的性能和吞吐量。Spark作为MapReduce框架的一种实现,自然也实现了Shuffle的逻辑。对于大数据计算框架而言,Shuffle阶段的效率是决定性能好坏的关键因素之一。

3.6.1 什么是Shuffle

Shuffle是MapReduce框架中的一个特定的阶段,介于Map阶段和Reduce阶段之间,当Map的输出结果要被Reduce使用时,输出结果需要按关键字值(key)哈希,并且分发到每一个Reducer上,这个过程就是Shuffle。直观来讲,Spark Shuffle机制是将一组无规则的数据转换为一组具有一定规则数据的过程。由于Shuffle涉及了磁盘的读写和网络的传输,因此Shuffle性能的高低直接影响整个程序的运行效率。

在MapReduce计算框架中,Shuffle连接了Map阶段和Reduce阶段,即每个Reduce Task从每个Map Task产生的数据中读取一片数据,极限情况下可能触发M*R个数据拷贝通道(M是Map Task数目,R是Reduce Task数目)。通常Shuffle分为两部分:Map阶段的数据准备和Reduce阶段的数据拷贝。首先,Map阶段需根据Reduce阶段的Task数量决定每个Map Task输出的数据分片数目,有多种方式存放这些数据分片:

1) 保存在内存中或者磁盘上(Spark和MapReduce都存放在磁盘上)。

2) 每个分片对应一个文件(现在Spark采用的方式,以及以前MapReduce采用的方式),或者所有分片放到一个数据文件中,外加一个索引文件记录每个分片在数据文件中的偏移量(现在MapReduce采用的方式)。

因此可以认为Spark Shuffle与Mapreduce Shuffle的设计思想相同,但在实现细节和优化方式上不同。

在Spark中,任务通常分为两种,Shuffle
mapTask和reduceTask,具体逻辑如图3-11所示:

图3-11 Spark Shuffle

图3-11中的主要逻辑如下:

1)首先每一个MapTask会根据ReduceTask的数量创建出相应的bucket,bucket的数量是M×R,其中M是Map的个数,R是Reduce的个数。

2)其次MapTask产生的结果会根据设置的partition算法填充到每个bucket中。这里的partition算法是可以自定义的,当然默认的算法是根据key哈希到不同的bucket中。

当ReduceTask启动时,它会根据自己task的id和所依赖的Mapper的id从远端或本地的block
manager中取得相应的bucket作为Reducer的输入进行处理。

这里的bucket是一个抽象概念,在实现中每个bucket可以对应一个文件,可以对应文件的一部分或是其他等。Spark shuffle可以分为两部分:

1) 将数据分成bucket,并将其写入磁盘的过程称为Shuffle Write。

2) 在存储Shuffle数据的节点Fetch数据,并执行用户定义的聚集操作,这个过程称为Shuffle Fetch。

3.6.2 Shuffle历史及细节

下面介绍Shuffle Write与Fetch。

1. Shuffle Write

在Spark的早期版本实现中,Spark在每一个MapTask中为每个ReduceTask创建一个bucket,并将RDD计算结果放进bucket中。

但早期的Shuffle Write有两个比较大的问题。

1)Map的输出必须先全部存储到内存中,然后写入磁盘。这对内存是非常大的开销,当内存不足以存储所有的Map输出时就会出现OOM(Out of Memory)。

2)每个MapTask会产生与ReduceTask数量一致的Shuffle文件,如果MapTask个数是1k,ReduceTask个数也是1k,就会产生1M个Shuffle文件。这对于文件系统是比较大的压力,同时在Shuffle数据量不大而Shuffle文件又非常多的情况下,随机写也会严重降低IO的性能。

后来到了Spark 0.8版实现时,显著减少了Shuffle的内存压力,现在Map输出不需要先全部存储在内存中,再flush到硬盘,而是record-by-record写入磁盘中。对于Shuffle文件的管理也独立出新的ShuffleBlockManager进行管理,而不是与RDD cache文件在一起了。

但是Spark 0.8版的Shuffle Write仍然有两个大的问题没有解决。

1)Shuffle文件过多的问题。这会导致文件系统的压力过大并降低IO的吞吐量。

2)虽然Map输出数据不再需要预先存储在内存中然后写入磁盘,从而显著减少了内存压力。但是新引入的DiskObjectWriter所带来的buffer开销也是不容小视的内存开销。假定有1k个MapTask和1k个ReduceTask,就会有1M个bucket,相应地就会有1M个write handler,而每一个write handler默认需要100KB内存,那么总共需要100GB内存。这样仅仅是buffer就需要这么多的内存。因此当ReduceTask数量很多时,内存开销会很大。

为了解决shuffle文件过多的情况,Spark后来引入了新的Shuffle
consolidation,以期显著减少Shuffle文件的数量。

Shuffle
consolidation的原理如图3-12所示:

在图3-12中,假定该job有4个Mapper和4个Reducer,有2个core能并行运行两个task。可以算出Spark的Shuffle Write共需要16个bucket,也就有了16个write handler。在之前的Spark版本中,每个bucket对应一个文件,因此在这里会产生16个shuffle文件。

图3-12 Shuffle
consolidation

而在Shuffle consolidation中,每个bucket并非对应一个文件,而是对应文件中的一个segment。同时Shuffle consolidation产生的Shuffle文件数量与Spark core的个数也有关系。在图3-12中,job中的4个Mapper分为两批运行,在第一批2个Mapper运行时会申请8个bucket,产生8个Shuffle文件;而在第二批Mapper运行时,申请的8个bucket并不会再产生8个新的文件,而是追加写到之前的8个文件后面,这样一共就只有8个Shuffle文件,而在文件内部共有16个不同的segment。因此从理论上讲Shuffle consolidation产生的Shuffle文件数量为C×R,其中C是Spark集群的core number,R是Reducer的个数。

很显然,当M=C时,Shuffle
consolidation产生的文件数和之前的实现相同。

Shuffle
consolidation显著减少了Shuffle文件的数量,解决了Spark之前实现中一个比较严重的问题。但是Writer handler的buffer开销过大依然没有减少,若要减少Writer handler的buffer开销,只能减少Reducer的数量,但是这又会引入新的问题。

2. Shuffle Fetch与Aggregator

Shuffle Write写出去的数据要被Reducer使用,就需要Shuffle Fetch将所需的数据Fetch过来。这里的Fetch操作包括本地和远端,因为Shuffle数据有可能一部分是存储在本地的。在早期版本中,Spark对Shuffle Fetcher实现了两套不同的框架:NIO通过socket连接Fetch数据;OIO通过netty server去fetch数据。分别对应的类是Basic-BlockFetcherIterator和NettyBlockFetcherIterator。

目前在Spark1.5.0中做了优化。新版本定义了类ShuffleBlockFetcherIterator来完成数据的fetch。对于local的数据,ShuffleBlockFetcherIterator会通过local的BlockMan-ager来fetch。对于远端的数据块,它通过BlockTransferService类来完成。具体实现参见如下代码:

在MapReduce的Shuffle过程中,Shuffle fetch过来的数据会进行归并排序(merge sort),使得相同key下的不同value按序归并到一起供Reducer使用,这个过程如图3-13所示:

这些归并排序都是在磁盘上进行的,这样做虽然有效地控制了内存使用,但磁盘IO却大幅增加了。虽然Spark属于MapReduce体系,但是对传统的MapReduce算法进行了一定的改变。Spark假定在大多数应用场景下,Shuffle数据的排序不是必须的,如word count。强制进行排序只会使性能变差,因此Spark并不在Reducer端做归并排序。既然没有归并排序,那Spark是如何进行reduce的呢?这就涉及下面要讲的Shuffle Aggregator了。

图3-13 Fetch merge

Aggregator本质上是一个hashmap,它是以map output的key为key,以任意所要combine的类型为value的hashmap。

在做word count reduce计算count值时,它会将Shuffle
fetch到的每一个key-value对更新或是插入hashmap中(若在hashmap中没有查找到,则插入其中;若查找到,则更新value值)。这样就不需要预先把所有的key-value进行merge sort,而是来一个处理一个,省去了外部排序这一步骤。但同时需要注意的是,reducer的内存必须足以存放这个partition的所有key和count值,因此对内存有一定的要求。

在上面word count的例子中,因为value会不断地更新,而不需要将其全部记录在内存中,因此内存的使用还是比较少的。考虑一下如果是groupByKey这样的操作,Reducer需要得到key对应的所有value。在Hadoop MapReduce中,由于有了归并排序,因此给予Reducer的数据已经是group by key了,而Spark没有这一步,因此需要将key和对应的value全部存放在hashmap中,并将value合并成一个array。可以想象为了能够存放所有数据,用户必须确保每一个partition小到内存能够容纳,这对于内存是非常严峻的考验。因此在Spark文档中,建议用户涉及这类操作时尽量增加partition,也就是增加Mapper和Reducer的数量。

增加Mapper和Reducer的数量固然可以减小partition的大小,使内存可以容纳这个partition。但是在Shuffle write中提到,bucket和对应于bucket的write handler是由Mapper和Reducer的数量决定的,task越多,bucket就会增加得更多,由此带来write handler所需的buffer也会更多。在一方面我们为了减少内存的使用采取了增加task数量的策略,另一方面task数量增多又会带来buffer开销更大的问题,因此陷入了内存使用的两难境地。

为了减少内存的使用,只能将Aggregator的操作从内存移到磁盘上进行,因此Spark新版本中提供了外部排序的实现,以解决这个问题。

Spark将需要聚集的数据分为两类:不需要归并排序和需要归并排序的数据。对于前者,在内存中的AppendOnlyMap中对数据聚集。对于需要归并排序的数据,现在内存中进行聚集,当内存数据达到阈值时,将数据排序后写入磁盘。事实上,磁盘上的数据只是全部数据的一部分,最后将磁盘数据全部进行归并排序和聚集。具体Aggregator的逻辑可以参见Aggregator类的实现。

本节就Shuffle的概念与原理先介绍到这里。在下一章讲解Spark源码时,会对Shuffle的核心机制——Shuffle存储做代码层面的讲解。相信学习完本章和第4章的Shuffle存储机制后,读者会对Shuffle机制掌握得更加深入。

时间: 2024-09-25 19:22:11

《循序渐进学Spark》一3.6 Shuffle机制的相关文章

《循序渐进学Spark 》Spark 编程模型

本节书摘来自华章出版社<循序渐进学Spark >一书中的第1章,第3节,作者 小象学院 杨 磊,更多章节内容可以访问"华章计算机"公众号查看. Spark机制原理 本书前面几章分别介绍了Spark的生态系统.Spark运行模式及Spark的核心概念RDD和基本算子操作等重要基础知识.本章重点讲解Spark的主要机制原理,因为这是Spark程序得以高效执行的核心.本章先从Application.job.stage和task等层次阐述Spark的调度逻辑,并且介绍FIFO.FA

《循序渐进学Spark》一第2章

 本节书摘来自华章出版社<循序渐进学Spark>一书中的第2章,第2.1节,作者 小象学院 杨 磊,更多章节内容可以访问"华章计算机"公众号查看. 第2章 Spark 编程模型 与Hadoop相比,Spark最初为提升性能而诞生.Spark是Hadoop MapReduce的演化和改进,并兼容了一些数据库的基本思想,可以说,Spark一开始就站在Hadoop与数据库这两个巨人的肩膀上.同时,Spark依靠Scala强大的函数式编程Actor通信模式.闭包.容器.泛型,并借助

《循序渐进学Spark》一2.3 Spark算子

 本节书摘来自华章出版社<循序渐进学Spark>一书中的第2章,第2.3节,作者 小象学院 杨 磊,更多章节内容可以访问"华章计算机"公众号查看. 2.3 Spark算子 本节介绍Spark算子的分类及其功能. 2.3.1 算子简介 Spark应用程序的本质,无非是把需要处理的数据转换为RDD,然后将RDD通过一系列变换(transformation)和操作(action)得到结果,简单来说,这些变换和操作即为算子. Spark支持的主要算子如图2-4所示. 根据所处理的数

《循序渐进学Spark》一3.5 容错机制及依赖

3.5 容错机制及依赖 一般而言,对于分布式系统,数据集的容错性通常有两种方式: 1) 数据检查点(在Spark中对应Checkpoint机制). 2) 记录数据的更新(在Spark中对应Lineage血统机制). 对于大数据分析而言,数据检查点操作成本较高,需要通过数据中心的网络连接在机器之间复制庞大的数据集,而网络带宽往往比内存带宽低,同时会消耗大量存储资源. Spark选择记录更新的方式.但更新粒度过细时,记录更新成本也不低.因此,RDD只支持粗粒度转换,即只记录单个块上执行的单个操作,然

《循序渐进学Spark》一第3章

第3章 Spark机制原理 本书前面几章分别介绍了Spark的生态系统.Spark运行模式及Spark的核心概念RDD和基本算子操作等重要基础知识.本章重点讲解Spark的主要机制原理,因为这是Spark程序得以高效执行的核心.本章先从Application.job.stage和task等层次阐述Spark的调度逻辑,并且介绍FIFO.FAIR等经典算法,然后对Spark的重要组成模块:I/O与通信控制模块.容错模块及Shuffle模块做了深入的阐述.其中,在Spark I/O模块中,数据以数据

《循序渐进学Spark》一导读

Preface 前 言 Spark诞生于美国加州大学伯克利分校AMP实验室.随着大数据技术在互联网.金融等领域的突破式进展,Spark在近些年得到更为广泛的应用.这是一个核心贡献者超过一半为华人的大数据平台开源项目,且正处于飞速发展.快速成熟的阶段. 为什么写这本书 Spark已经成为大数据计算.分析领域新的热点和发展方向.相对于Hadoop传统的MapReduce计算模型,Spark提供更为高效的计算框架以及更为丰富的功能,因此在大数据生产应用领域中不断攻城略地,势如破竹. 与企业不断涌现的对

《循序渐进学Spark 》导读

目 录 前 言 第1章 Spark架构与集群环境    1.1 Spark概述与架构     1.1.1 Spark概述     1.1.2 Spark生态     1.1.3 Spark架构     1.2 在Linux集群上部署Spark     1.2.1 安装OpenJDK     1.2.2 安装Scala     1.2.3 配置SSH免密码登录     1.2.4 Hadoop的安装配置     1.2.5 Spark的安装部署     1.2.6 Hadoop与Spark的集群复

《循序渐进学Spark》一3.7 本章小结

3.7 本章小结 本章主要讲述了Spark的工作机制与原理.首先剖析了Spark的提交和执行时的具体机制,重点强调了Spark程序的宏观执行过程: 提交后的Job在Spark中形成了RDD DAG(有向无环图),然后进入一系列切分调度的过程.在剖析过程中,结合Spark的源码呈现了这些调度过程的代码细节.本章后半部分接着剖析了Spark的存储及IO.Spark通信机制,最后讲述了Spark的容错机制及Shuffle机制. 本章内容比较多,希望读者仔细体会.

《循序渐进学Spark》一3.2 Spark调度机制

3.2 Spark调度机制 Spark调度机制是保证Spark应用高效执行的关键.本节从Application.job.stage和task的维度,从上层到底层来一步一步揭示Spark的调度策略. 3.2.1 Application的调度 Spark中,每个Application对应一个SparkContext.SparkContext之间的调度关系取决于Spark的运行模式.对Standalone模式而言,Spark Master节点先计算集群内的计算资源能否满足等待队列中的应用对内存和CPU