Spark技术内幕:Shuffle的性能调优

通过上面的架构和源码实现的分析,不难得出Shuffle是Spark Core比较复杂的模块的结论。它也是非常影响性能的操作之一。因此,在这里整理了会影响Shuffle性能的各项配置。尽管大部分的配置项在前文已经解释过它的含义,由于这些参数的确是非常重要,这里算是做一个详细的总结。

1.1.1  spark.shuffle.manager

前文也多次提到过,Spark1.2.0官方支持两种方式的Shuffle,即Hash Based Shuffle和Sort Based Shuffle。其中在Spark 1.0之前仅支持Hash Based Shuffle。Spark 1.1的时候引入了Sort Based Shuffle。Spark 1.2的默认Shuffle机制从Hash变成了Sort。如果需要Hash Based Shuffle,可以将spark.shuffle.manager设置成“hash”即可。

如果对性能有比较苛刻的要求,那么就要理解这两种不同的Shuffle机制的原理,结合具体的应用场景进行选择。

Hash Based Shuffle,就是将数据根据Hash的结果,将各个Reducer partition的数据写到单独的文件中去,写数据时不会有排序的操作。这个问题就是如果Reducer的partition比较多的时候,会产生大量的磁盘文件。这会带来两个问题:

1)       同时打开的文件比较多,那么大量的文件句柄和写操作分配的临时内存会非常大,对于内存的使用和GC带来很多的压力。尤其是在Sparkon YARN的模式下,Executor分配的内存普遍比较小的时候,这个问题会更严重。

2)       从整体来看,这些文件带来大量的随机读,读性能可能会遇到瓶颈。

更加细节的讨论可以参见7.1节和7.6.6(尝试去解决写的文件太多的问题)。

 

Sort Based Shuffle会根据实际情况对数据采用不同的方式进行Sort。这个排序可能仅仅是按照Reducer的partition进行排序,保证同一个Shuffle Map Task的对应于不同的Reducer的partition的数据都可以写到同一个数据文件,通过一个Offset来标记不同的Reducer partition的分界。因此一个Shuffle Map Task仅仅会生成一个数据文件(还有一个index索引文件),从而避免了Hash Based Shuffle文件数量过多的问题。

 

选择Hash还是Sort,取决于内存,排序和文件操作等因素的综合影响。

对于不需要进行排序的Shuffle而且Shuffle产生的文件数量不是特别多,Hash Based Shuffle可能是个更好的选择;毕竟Sort Based Shuffle至少会按照Reducer的partition进行排序。

而Sort BasedShuffle的优势就在于Scalability,它的出现实际上很大程度上是解决Hash Based Shuffle的Scalability的问题。由于Sort Based Shuffle还在不断的演进中,因此Sort Based Shuffle的性能会得到不断的改善。

对选择那种Shuffle,如果对于性能要求苛刻,最好还是通过实际的场景中测试后再决定。不过选择默认的Sort,可以满足大部分的场景需要。

1.1.2  spark.shuffle.spill

这个参数的默认值是true,用于指定Shuffle过程中如果内存中的数据超过阈值(参考spark.shuffle.memoryFraction的设置),那么是否需要将部分数据临时写入外部存储。如果设置为false,那么这个过程就会一直使用内存,会有Out Of Memory的风险。因此只有在确定内存足够使用时,才可以将这个选项设置为false。

对于Hash BasedShuffle的Shuffle Write过程中使用的org.apache.spark.util.collection.AppendOnlyMap就是全内存的方式,而org.apache.spark.util.collection.ExternalAppendOnlyMap对org.apache.spark.util.collection.AppendOnlyMap有了进一步的封装,在内存使用超过阈值时会将它spill到外部存储,在最后的时候会对这些临时文件进行Merge。

而Sort BasedShuffle Write使用到的org.apache.spark.util.collection.ExternalSorter也会有类似的spill。

而对于ShuffleRead,如果需要做aggregate,也可能在aggregate的过程中将数据spill的外部存储。

1.1.3  spark.shuffle.memoryFraction和spark.shuffle.safetyFraction

在启用spark.shuffle.spill的情况下,spark.shuffle.memoryFraction决定了当Shuffle过程中使用的内存达到总内存多少比例的时候开始Spill。在Spark 1.2.0里,这个值是0.2。通过这个参数可以设置Shuffle过程占用内存的大小,它直接影响了Spill的频率和GC。

 如果Spill的频率太高,那么可以适当的增加spark.shuffle.memoryFraction来增加Shuffle过程的可用内存数,进而减少Spill的频率。当然为了避免OOM(内存溢出),可能就需要减少RDD cache所用的内存,即需要减少spark.storage.memoryFraction的值;但是减少RDD cache所用的内存有可能会带来其他的影响,因此需要综合考量。

在Shuffle过程中,Shuffle占用的内存数是估计出来的,并不是每次新增的数据项都会计算一次占用的内存大小,这样做是为了降低时间开销。但是估计也会有误差,因此存在实际使用的内存数比估算值要大的情况,因此参数 spark.shuffle.safetyFraction作为一个保险系数降低实际Shuffle过程所需要的内存值,降低实际内存超出用户配置值的风险。

1.1.4  spark.shuffle.sort.bypassMergeThreshold

这个配置的默认值是200,用于设置在Reducer的partition数目少于多少的时候,Sort Based Shuffle内部不使用Merge Sort的方式处理数据,而是直接将每个partition写入单独的文件。这个方式和Hash Based的方式是类似的,区别就是在最后这些文件还是会合并成一个单独的文件,并通过一个index索引文件来标记不同partition的位置信息。从Reducer看来,数据文件和索引文件的格式和内部是否做过Merge Sort是完全相同的。

这个可以看做SortBased Shuffle在Shuffle量比较小的时候对于Hash Based Shuffle的一种折衷。当然了它和Hash Based Shuffle一样,也存在同时打开文件过多导致内存占用增加的问题。因此如果GC比较严重或者内存比较紧张,可以适当的降低这个值。

1.1.5  spark.shuffle.blockTransferService

在Spark 1.2.0,这个配置的默认值是netty,而之前是nio。这个主要是用于在各个Executor之间传输Shuffle数据。Netty的实现更加简洁,但实际上用户不用太关心这个选项。除非是有特殊的需求,否则采用默认配置就可以。

1.1.6  spark.shuffle.consolidateFiles

这个配置的默认配置是false。主要是为了解决在Hash Based Shuffle过程中产生过多文件的问题。如果配置选项为true,那么对于同一个Core上运行的Shuffle Map Task不会新产生一个Shuffle文件而是重用原来的。但是每个Shuffle Map Task还是需要产生下游Task数量的文件,因此它并没有减少同时打开文件的数量。如果需要了解更加详细的细节,可以阅读7.1节。

但是consolidateFiles的机制在Spark 0.8.1就引入了,到Spark 1.2.0还是没有稳定下来。从源码实现的角度看,实现源码是非常简单的,但是由于涉及本地的文件系统等限制,这个策略可能会带来各种各样的问题。由于它并没有减少同时打开文件的数量,因此不能减少由文件句柄带来的内存消耗。如果面临Shuffle的文件数量非常大,那么是否打开这个选项最好还是通过实际测试后再决定。

 

1.1.7  spark.shuffle.service.enabled

(false)

1.1.8  spark.shuffle.compress和 spark.shuffle.spill.compress

 这两个参数的默认配置都是true。spark.shuffle.compress和spark.shuffle.spill.compress都是用来设置Shuffle过程中是否对Shuffle数据进行压缩;其中前者针对最终写入本地文件系统的输出文件,后者针对在处理过程需要spill到外部存储的中间数据,后者针对最终的shuffle输出文件。

 如何设置spark.shuffle.compress?

如果下游的Task通过网络获取上游Shuffle Map Task的结果的网络IO成为瓶颈,那么就需要考虑将它设置为true:通过压缩数据来减少网络IO。由于上游Shuffle Map Task和下游的Task现阶段是不会并行处理的,即上游Shuffle Map Task处理完成,然后下游的Task才会开始执行。因此如果需要压缩的时间消耗就是Shuffle MapTask压缩数据的时间 + 网络传输的时间 + 下游Task解压的时间;而不需要压缩的时间消耗仅仅是网络传输的时间。因此需要评估压缩解压时间带来的时间消耗和因为数据压缩带来的时间节省。如果网络成为瓶颈,比如集群普遍使用的是千兆网络,那么可能将这个选项设置为true是合理的;如果计算是CPU密集型的,那么可能将这个选项设置为false才更好。

 如何设置spark.shuffle.spill.compress?

如果设置为true,代表处理的中间结果在spill到本地硬盘时都会进行压缩,在将中间结果取回进行merge的时候,要进行解压。因此要综合考虑CPU由于引入压缩解压的消耗时间和Disk IO因为压缩带来的节省时间的比较。在Disk IO成为瓶颈的场景下,这个被设置为true可能比较合适;如果本地硬盘是SSD,那么这个设置为false可能比较合适。

1.1.9  spark.reducer.maxMbInFlight

这个参数用于限制一个ReducerTask向其他的Executor请求Shuffle数据时所占用的最大内存数,尤其是如果网卡是千兆和千兆以下的网卡时。默认值是48MB。设置这个值需要中和考虑网卡带宽和内存。

如果您喜欢 本文,那么请动一下手指支持以下博客之星的评比吧。非常感谢您的投票。每天可以一票哦。

点我投票

时间: 2024-10-28 10:30:53

Spark技术内幕:Shuffle的性能调优的相关文章

Spark技术内幕: Shuffle详解(一)

通过上面一系列文章,我们知道在集群启动时,在Standalone模式下,Worker会向Master注册,使得Master可以感知进而管理整个集群:Master通过借助ZK,可以简单的实现HA:而应用方通过SparkContext这个与集群的交互接口,在创建SparkContext时就完成了Application的注册,Master为其分配Executor:在应用方创建了RDD并且在这个RDD上进行了很多的Transformation后,触发action,通过DAGScheduler将DAG划分

Spark技术内幕: Shuffle详解(三)

前两篇文章写了Shuffle Read的一些实现细节.但是要想彻底理清楚这里边的实现逻辑,还是需要更多篇幅的:本篇开始,将按照Job的执行顺序,来讲解Shuffle.即,结果数据(ShuffleMapTask的结果和ResultTask的结果)是如何产生的:结果是如何处理的:结果是如何读取的. 在Worker上接收Task执行命令的是org.apache.spark.executor.CoarseGrainedExecutorBackend.它在接收到LaunchTask的命令后,通过在Driv

Spark技术内幕: Shuffle详解(二)

本文主要关注ShuffledRDD的Shuffle Read是如何从其他的node上读取数据的. 上文讲到了获取如何获取的策略都在org.apache.spark.storage.BlockFetcherIterator.BasicBlockFetcherIterator#splitLocalRemoteBlocks中.可以见注释. protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = { // Make remo

## Spark作业性能调优总结

Spark作业性能调优总结 前段时间在集群上运行Spark作业,但是发现作业运行到某个stage之后就卡住了,之后也不再有日志输出.于是开始着手对作业进行调优,下面是遇到的问题和解决过程: 运行时错误 Out Of Memory: Java heap space / GC overhead limit exceeded 使用yarn logs -applicationId=appliation_xxx_xxx 命令查看Yarn收集的各个Executor的日志. 可以发现OOM的错误,以及一些re

oracle数据库性能调优技术:深入理解单表执行计划

一.概述 这篇文章是数据库性能调优技术的第二篇.上一篇讲解的索引调优是数据库性能调优技术的基础.这篇讲解的深入理解单表执行计划,是数据库性能调优的有力工具. 查询语句可以有多种可选执行计划,如何选择效率最高的执行计划?达梦数据库.oracle数据库.sql server数据库都是采用基于成本的查询优化,对备选执行计划进行打分,选择大家最小的执行计划进行执行.这些内容,我会在后续的几篇文章中进行详细的描述.在此之前,我们首先需要掌握如何理解数据库执行计划.这篇文章讲解只涉及单表操作的执行计划. 达

数据库性能调优技术

一.概述 随着数据库在各个领域的使用不断增长,越来越多的应用提出了高性能的要求.数据库性能调优是知识密集型的学科,需要综合考虑各种复杂的因素:数据库缓冲区的大小.索引的创建.语句改写等等.总之,数据库性能调优的目的在于使系统运行得更快. 调优需要有广泛的知识,这使得它既简单又复杂. 说调优简单,是因为调优者不必纠缠于复杂的公式和规则.许多学术界和业界的研究者都在尝试将调优和查询处理建立在数学基础之上. 称调优复杂,是因为如果要完全理解常识所依赖的原理,还需要对应用.数据库管理系统.操作系统以及硬

oracle数据库性能调优技术:索引调优

一.概述 随着数据库在各个领域的使用不断增长,越来越多的应用提出了高性能的要求.数据库性能调优是知识密集型的学科,需要综合考虑各种复杂的因素:数据库缓冲区的大小.索引的创建.语句改写等等.总之,数据库性能调优的目的在于使系统运行得更快. 调优需要有广泛的知识,这使得它既简单又复杂. 说调优简单,是因为调优者不必纠缠于复杂的公式和规则.许多学术界和业界的研究者都在尝试将调优和查询处理建立在数学基础之上. 称调优复杂,是因为如果要完全理解常识所依赖的原理,还需要对应用.数据库管理系统.操作系统以及硬

oracle数据库性能调优技术:深入理解嵌套循环执行计划

一.概述 这篇文章是数据库性能调优技术的第三篇.上一篇文章讲解了深入了解单表执行计划,单表执行计划是理解多表执行计划的基础. 两张表的连接有三种执行方式:1)嵌套循环连接:2)散列连接:3)归并连接.两张表连接时选择这三种中的哪一种呢?这取决于索引.以及连接的代价.在该系列的第三篇(本文)文章中讲解嵌套循环连接,第四篇文章中讲解散列连接,第五篇文章中讲解归并连接.在第六篇以后会分析IN子查询以及EXISTS子查询. 达梦数据库.oracle数据库.sql server数据库在数据库执行计划方面并

oracle数据库性能调优技术:深入理解散列连接执行计划

一.概述 这篇文章是数据库性能调优技术系列的第四篇.上一篇文章讲解了深入理解嵌套循环连接执行计划. 上一篇文章中提到两张表的连接有三种执行方式:1)嵌套循环连接:2)散列连接:3)归并连接.散列连接是很重要的连接方式,包含比较多的内容,这篇文章中讲解为什么需要散列连接?如何理解散列连接? 和前三篇文章一样,本文讲解的是些比较抽象的内容,不拘泥于具体的数据.所以本文中使用的代价评估模型也是抽象的,假设了数据库缓冲区大小只有一个页,新页的读取必然导致旧页的释放.读完本文之后应该能够读懂达梦数据库.o