3.6 Shuffle机制
在MapReduce框架中,Shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过Shuffle这个环节,Shuffle的性能高低直接影响了整个程序的性能和吞吐量。Spark作为MapReduce框架的一种实现,自然也实现了Shuffle的逻辑。对于大数据计算框架而言,Shuffle阶段的效率是决定性能好坏的关键因素之一。
3.6.1 什么是Shuffle
Shuffle是MapReduce框架中的一个特定的阶段,介于Map阶段和Reduce阶段之间,当Map的输出结果要被Reduce使用时,输出结果需要按关键字值(key)哈希,并且分发到每一个Reducer上,这个过程就是Shuffle。直观来讲,Spark Shuffle机制是将一组无规则的数据转换为一组具有一定规则数据的过程。由于Shuffle涉及了磁盘的读写和网络的传输,因此Shuffle性能的高低直接影响整个程序的运行效率。
在MapReduce计算框架中,Shuffle连接了Map阶段和Reduce阶段,即每个Reduce Task从每个Map Task产生的数据中读取一片数据,极限情况下可能触发M*R个数据拷贝通道(M是Map Task数目,R是Reduce Task数目)。通常Shuffle分为两部分:Map阶段的数据准备和Reduce阶段的数据拷贝。首先,Map阶段需根据Reduce阶段的Task数量决定每个Map Task输出的数据分片数目,有多种方式存放这些数据分片:
1) 保存在内存中或者磁盘上(Spark和MapReduce都存放在磁盘上)。
2) 每个分片对应一个文件(现在Spark采用的方式,以及以前MapReduce采用的方式),或者所有分片放到一个数据文件中,外加一个索引文件记录每个分片在数据文件中的偏移量(现在MapReduce采用的方式)。
因此可以认为Spark Shuffle与Mapreduce Shuffle的设计思想相同,但在实现细节和优化方式上不同。
在Spark中,任务通常分为两种,Shuffle
mapTask和reduceTask,具体逻辑如图3-11所示:
图3-11 Spark Shuffle
图3-11中的主要逻辑如下:
1)首先每一个MapTask会根据ReduceTask的数量创建出相应的bucket,bucket的数量是M×R,其中M是Map的个数,R是Reduce的个数。
2)其次MapTask产生的结果会根据设置的partition算法填充到每个bucket中。这里的partition算法是可以自定义的,当然默认的算法是根据key哈希到不同的bucket中。
当ReduceTask启动时,它会根据自己task的id和所依赖的Mapper的id从远端或本地的block
manager中取得相应的bucket作为Reducer的输入进行处理。
这里的bucket是一个抽象概念,在实现中每个bucket可以对应一个文件,可以对应文件的一部分或是其他等。Spark shuffle可以分为两部分:
1) 将数据分成bucket,并将其写入磁盘的过程称为Shuffle Write。
2) 在存储Shuffle数据的节点Fetch数据,并执行用户定义的聚集操作,这个过程称为Shuffle Fetch。
3.6.2 Shuffle历史及细节
下面介绍Shuffle Write与Fetch。
1. Shuffle Write
在Spark的早期版本实现中,Spark在每一个MapTask中为每个ReduceTask创建一个bucket,并将RDD计算结果放进bucket中。
但早期的Shuffle Write有两个比较大的问题。
1)Map的输出必须先全部存储到内存中,然后写入磁盘。这对内存是非常大的开销,当内存不足以存储所有的Map输出时就会出现OOM(Out of Memory)。
2)每个MapTask会产生与ReduceTask数量一致的Shuffle文件,如果MapTask个数是1k,ReduceTask个数也是1k,就会产生1M个Shuffle文件。这对于文件系统是比较大的压力,同时在Shuffle数据量不大而Shuffle文件又非常多的情况下,随机写也会严重降低IO的性能。
后来到了Spark 0.8版实现时,显著减少了Shuffle的内存压力,现在Map输出不需要先全部存储在内存中,再flush到硬盘,而是record-by-record写入磁盘中。对于Shuffle文件的管理也独立出新的ShuffleBlockManager进行管理,而不是与RDD cache文件在一起了。
但是Spark 0.8版的Shuffle Write仍然有两个大的问题没有解决。
1)Shuffle文件过多的问题。这会导致文件系统的压力过大并降低IO的吞吐量。
2)虽然Map输出数据不再需要预先存储在内存中然后写入磁盘,从而显著减少了内存压力。但是新引入的DiskObjectWriter所带来的buffer开销也是不容小视的内存开销。假定有1k个MapTask和1k个ReduceTask,就会有1M个bucket,相应地就会有1M个write handler,而每一个write handler默认需要100KB内存,那么总共需要100GB内存。这样仅仅是buffer就需要这么多的内存。因此当ReduceTask数量很多时,内存开销会很大。
为了解决shuffle文件过多的情况,Spark后来引入了新的Shuffle
consolidation,以期显著减少Shuffle文件的数量。
Shuffle
consolidation的原理如图3-12所示:
在图3-12中,假定该job有4个Mapper和4个Reducer,有2个core能并行运行两个task。可以算出Spark的Shuffle Write共需要16个bucket,也就有了16个write handler。在之前的Spark版本中,每个bucket对应一个文件,因此在这里会产生16个shuffle文件。
图3-12 Shuffle
consolidation
而在Shuffle consolidation中,每个bucket并非对应一个文件,而是对应文件中的一个segment。同时Shuffle consolidation产生的Shuffle文件数量与Spark core的个数也有关系。在图3-12中,job中的4个Mapper分为两批运行,在第一批2个Mapper运行时会申请8个bucket,产生8个Shuffle文件;而在第二批Mapper运行时,申请的8个bucket并不会再产生8个新的文件,而是追加写到之前的8个文件后面,这样一共就只有8个Shuffle文件,而在文件内部共有16个不同的segment。因此从理论上讲Shuffle consolidation产生的Shuffle文件数量为C×R,其中C是Spark集群的core number,R是Reducer的个数。
很显然,当M=C时,Shuffle
consolidation产生的文件数和之前的实现相同。
Shuffle
consolidation显著减少了Shuffle文件的数量,解决了Spark之前实现中一个比较严重的问题。但是Writer handler的buffer开销过大依然没有减少,若要减少Writer handler的buffer开销,只能减少Reducer的数量,但是这又会引入新的问题。
2. Shuffle Fetch与Aggregator
Shuffle Write写出去的数据要被Reducer使用,就需要Shuffle Fetch将所需的数据Fetch过来。这里的Fetch操作包括本地和远端,因为Shuffle数据有可能一部分是存储在本地的。在早期版本中,Spark对Shuffle Fetcher实现了两套不同的框架:NIO通过socket连接Fetch数据;OIO通过netty server去fetch数据。分别对应的类是Basic-BlockFetcherIterator和NettyBlockFetcherIterator。
目前在Spark1.5.0中做了优化。新版本定义了类ShuffleBlockFetcherIterator来完成数据的fetch。对于local的数据,ShuffleBlockFetcherIterator会通过local的BlockMan-ager来fetch。对于远端的数据块,它通过BlockTransferService类来完成。具体实现参见如下代码:
在MapReduce的Shuffle过程中,Shuffle fetch过来的数据会进行归并排序(merge sort),使得相同key下的不同value按序归并到一起供Reducer使用,这个过程如图3-13所示:
这些归并排序都是在磁盘上进行的,这样做虽然有效地控制了内存使用,但磁盘IO却大幅增加了。虽然Spark属于MapReduce体系,但是对传统的MapReduce算法进行了一定的改变。Spark假定在大多数应用场景下,Shuffle数据的排序不是必须的,如word count。强制进行排序只会使性能变差,因此Spark并不在Reducer端做归并排序。既然没有归并排序,那Spark是如何进行reduce的呢?这就涉及下面要讲的Shuffle Aggregator了。
图3-13 Fetch merge
Aggregator本质上是一个hashmap,它是以map output的key为key,以任意所要combine的类型为value的hashmap。
在做word count reduce计算count值时,它会将Shuffle
fetch到的每一个key-value对更新或是插入hashmap中(若在hashmap中没有查找到,则插入其中;若查找到,则更新value值)。这样就不需要预先把所有的key-value进行merge sort,而是来一个处理一个,省去了外部排序这一步骤。但同时需要注意的是,reducer的内存必须足以存放这个partition的所有key和count值,因此对内存有一定的要求。
在上面word count的例子中,因为value会不断地更新,而不需要将其全部记录在内存中,因此内存的使用还是比较少的。考虑一下如果是groupByKey这样的操作,Reducer需要得到key对应的所有value。在Hadoop MapReduce中,由于有了归并排序,因此给予Reducer的数据已经是group by key了,而Spark没有这一步,因此需要将key和对应的value全部存放在hashmap中,并将value合并成一个array。可以想象为了能够存放所有数据,用户必须确保每一个partition小到内存能够容纳,这对于内存是非常严峻的考验。因此在Spark文档中,建议用户涉及这类操作时尽量增加partition,也就是增加Mapper和Reducer的数量。
增加Mapper和Reducer的数量固然可以减小partition的大小,使内存可以容纳这个partition。但是在Shuffle write中提到,bucket和对应于bucket的write handler是由Mapper和Reducer的数量决定的,task越多,bucket就会增加得更多,由此带来write handler所需的buffer也会更多。在一方面我们为了减少内存的使用采取了增加task数量的策略,另一方面task数量增多又会带来buffer开销更大的问题,因此陷入了内存使用的两难境地。
为了减少内存的使用,只能将Aggregator的操作从内存移到磁盘上进行,因此Spark新版本中提供了外部排序的实现,以解决这个问题。
Spark将需要聚集的数据分为两类:不需要归并排序和需要归并排序的数据。对于前者,在内存中的AppendOnlyMap中对数据聚集。对于需要归并排序的数据,现在内存中进行聚集,当内存数据达到阈值时,将数据排序后写入磁盘。事实上,磁盘上的数据只是全部数据的一部分,最后将磁盘数据全部进行归并排序和聚集。具体Aggregator的逻辑可以参见Aggregator类的实现。
本节就Shuffle的概念与原理先介绍到这里。在下一章讲解Spark源码时,会对Shuffle的核心机制——Shuffle存储做代码层面的讲解。相信学习完本章和第4章的Shuffle存储机制后,读者会对Shuffle机制掌握得更加深入。