Spark技术内幕: 如何解决Shuffle Write一定要落盘的问题?

在Spark 0.6和0.7时,Shuffle的结果都需要先存储到内存中(有可能要写入磁盘),因此对于大数据量的情况下,发生GC和OOM的概率非常大。因此在Spark 0.8的时候,Shuffle的每个record都会直接写入磁盘,并且为下游的每个Task都生成一个单独的文件。这样解决了Shuffle解决都需要存入内存的问题,但是又引入了另外一个问题:生成的小文件过多,尤其在每个文件的数据量不大而文件特别多的时候,大量的随机读会非常影响性能。Spark 0.8.1为了解决0.8中引入的问题,引入了FileConsolidation机制,在一定程度上解决了这个问题。由此可见,Hash Based Shuffle在Scalability方面的确有局限性。而Spark1.0中引入的Shuffle Pluggable Framework,为加入新的Shuffle机制和引入第三方的Shuffle机制奠定了基础。在Spark1.1的时候,引入了Sort Based Shuffle;并且在Spark1.2.0时,Sort Based Shuffle已经成为Shuffle的默认选项。但是,随着内存成本的不断下降和容量的不断上升,Spark Core会在未来重新将Shuffle的过程全部是in memory的吗?我认为这个不太可能也没太大必要,如果用户对于性能有比较苛刻的要求而Shuffle的过程的确是性能优化的重点,那么可以尝试以下实现方式:

1)       Worker的节点采用固态硬盘

2)       Woker的Shuffle结果保存到RAMDisk上

3)       根据自己的应用场景,实现自己的Shuffle机制

 

时间: 2024-10-01 01:41:30

Spark技术内幕: 如何解决Shuffle Write一定要落盘的问题?的相关文章

Spark技术内幕: Task向Executor提交的源码解析

在上文<Spark技术内幕:Stage划分及提交源码分析>中,我们分析了Stage的生成和提交.但是Stage的提交,只是DAGScheduler完成了对DAG的划分,生成了一个计算拓扑,即需要按照顺序计算的Stage,Stage中包含了可以以partition为单位并行计算的Task.我们并没有分析Stage中得Task是如何生成并且最终提交到Executor中去的. 这就是本文的主题. 从org.apache.spark.scheduler.DAGScheduler#submitMissi

Spark技术内幕:Master的故障恢复

Spark技术内幕:Master基于ZooKeeper的High Availability(HA)源码实现  详细阐述了使用ZK实现的Master的HA,那么Master是如何快速故障恢复的呢? 处于Standby状态的Master在接收到org.apache.spark.deploy.master.ZooKeeperLeaderElectionAgent发送的ElectedLeader消息后,就开始通过ZK中保存的Application,Driver和Worker的元数据信息进行故障恢复了,它

Spark技术内幕:Shuffle的性能调优

通过上面的架构和源码实现的分析,不难得出Shuffle是Spark Core比较复杂的模块的结论.它也是非常影响性能的操作之一.因此,在这里整理了会影响Shuffle性能的各项配置.尽管大部分的配置项在前文已经解释过它的含义,由于这些参数的确是非常重要,这里算是做一个详细的总结. 1.1.1  spark.shuffle.manager 前文也多次提到过,Spark1.2.0官方支持两种方式的Shuffle,即Hash Based Shuffle和Sort Based Shuffle.其中在Sp

Spark技术内幕:Sort Based Shuffle实现解析

在Spark 1.2.0中,Spark Core的一个重要的升级就是将默认的Hash Based Shuffle换成了Sort Based Shuffle,即spark.shuffle.manager 从hash换成了sort,对应的实现类分别是org.apache.spark.shuffle.hash.HashShuffleManager和org.apache.spark.shuffle.sort.SortShuffleManager. 这个方式的选择是在org.apache.spark.Sp

Spark技术内幕:Shuffle Read的整体流程

回忆一下,每个Stage的上边界,要么需要从外部存储读取数据,要么需要读取上一个Stage的输出:而下边界,要么是需要写入本地文件系统(需要Shuffle),以供childStage读取,要么是最后一个Stage,需要输出结果.这里的Stage,在运行时的时候就是可以以pipeline的方式运行的一组Task,除了最后一个Stage对应的是ResultTask,其余的Stage对应的都是ShuffleMap Task. 而除了需要从外部存储读取数据和RDD已经做过cache或者checkpoin

Spark技术内幕: Shuffle详解(一)

通过上面一系列文章,我们知道在集群启动时,在Standalone模式下,Worker会向Master注册,使得Master可以感知进而管理整个集群:Master通过借助ZK,可以简单的实现HA:而应用方通过SparkContext这个与集群的交互接口,在创建SparkContext时就完成了Application的注册,Master为其分配Executor:在应用方创建了RDD并且在这个RDD上进行了很多的Transformation后,触发action,通过DAGScheduler将DAG划分

Spark技术内幕:Shuffle Pluggable框架详解,你怎么开发自己的Shuffle Service?

首先介绍一下需要实现的接口.框架的类图如图所示(今天CSDN抽风,竟然上传不了图片.如果需要实现新的Shuffle机制,那么需要实现这些接口. 1.1.1  org.apache.spark.shuffle.ShuffleManager Driver和每个Executor都会持有一个ShuffleManager,这个ShuffleManager可以通过配置项spark.shuffle.manager指定,并且由SparkEnv创建.Driver中的ShuffleManager负责注册Shuffl

Spark技术内幕: Shuffle详解(三)

前两篇文章写了Shuffle Read的一些实现细节.但是要想彻底理清楚这里边的实现逻辑,还是需要更多篇幅的:本篇开始,将按照Job的执行顺序,来讲解Shuffle.即,结果数据(ShuffleMapTask的结果和ResultTask的结果)是如何产生的:结果是如何处理的:结果是如何读取的. 在Worker上接收Task执行命令的是org.apache.spark.executor.CoarseGrainedExecutorBackend.它在接收到LaunchTask的命令后,通过在Driv

Spark技术内幕:Shuffle Map Task运算结果的处理

Shuffle Map Task运算结果的处理 这个结果的处理,分为两部分,一个是在Executor端是如何直接处理Task的结果的:还有就是Driver端,如果在接到Task运行结束的消息时,如何对Shuffle Write的结果进行处理,从而在调度下游的Task时,下游的Task可以得到其需要的数据. Executor端的处理 在解析BasicShuffle Writer时,我们知道ShuffleMap Task在Executor上运行时,最终会调用org.apache.spark.sche