Spark技术内幕:Shuffle Read的整体流程

回忆一下,每个Stage的上边界,要么需要从外部存储读取数据,要么需要读取上一个Stage的输出;而下边界,要么是需要写入本地文件系统(需要Shuffle),以供childStage读取,要么是最后一个Stage,需要输出结果。这里的Stage,在运行时的时候就是可以以pipeline的方式运行的一组Task,除了最后一个Stage对应的是ResultTask,其余的Stage对应的都是ShuffleMap Task。

而除了需要从外部存储读取数据和RDD已经做过cache或者checkpoint的Task,一般Task的开始都是从ShuffledRDD的ShuffleRead开始的。本节将详细讲解Shuffle Read的过程。

先看一下ShuffleRead的整体架构图。

org.apache.spark.rdd.ShuffledRDD#compute 开始,通过调用org.apache.spark.shuffle.ShuffleManager的getReader方法,获取到org.apache.spark.shuffle.ShuffleReader,然后调用其read()方法进行读取。在Spark1.2.0中,不管是Hash BasedShuffle或者是Sort BasedShuffle,内置的Shuffle Reader都是 org.apache.spark.shuffle.hash.HashShuffleReader。核心实现:

 override def read(): Iterator[Product2[K, C]] = {
val ser =Serializer.getSerializer(dep.serializer)
// 获取结果
   val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId,startPartition, context, ser)
   // 处理结果
   val aggregatedIter: Iterator[Product2[K, C]] = if(dep.aggregator.isDefined) {//需要聚合
     if (dep.mapSideCombine) {//需要map side的聚合
       new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(
                            iter, context))
     } else {//只需要reducer端的聚合
       new InterruptibleIterator(context,dep.aggregator.get.combineValuesByKey(
                            iter, context))

}
    }else { // 无需聚合操作
       iter.asInstanceOf[Iterator[Product2[K,C]]].map(pair => (pair._1, pair._2))
    }

   // Sort the output if there is a sort ordering defined.
   dep.keyOrdering match {//判断是否需要排序
     case Some(keyOrd: Ordering[K]) => //对于需要排序的情况
       // 使用ExternalSorter进行排序,注意如果spark.shuffle.spill是false,那么数据是
       // 不会spill到硬盘的
       val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd),
                                         serializer= Some(ser))
       sorter.insertAll(aggregatedIter)
       context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
       context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
       sorter.iterator
     case None => //无需排序
       aggregatedIter
    }
  }

org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher#fetch会获得数据,它首先会通过

org.apache.spark.MapOutputTracker#getServerStatuses来获得数据的meta信息,这个过程有可能需要向org.apache.spark.MapOutputTrackerMasterActor发送读请求,这个读请求是在org.apache.spark.MapOutputTracker#askTracker发出的。在获得了数据的meta信息后,它会将这些数据存入Seq[(BlockManagerId,Seq[(BlockId, Long)])]中,然后调用org.apache.spark.storage.ShuffleBlockFetcherIterator最终发起请求。ShuffleBlockFetcherIterator根据数据的本地性原则进行数据获取。如果数据在本地,那么会调用org.apache.spark.storage.BlockManager#getBlockData进行本地数据块的读取。而getBlockData对于shuffle类型的数据,会调用ShuffleManager的ShuffleBlockManager的getBlockData。

如果数据在其他的Executor上,那么如果用户使用的spark.shuffle.blockTransferService是netty,那么就会通过org.apache.spark.network.netty.NettyBlockTransferService#fetchBlocks获取;如果使用的是nio,那么就会通过org.apache.spark.network.nio.NioBlockTransferService#fetchBlocks获取。

数据读取策略的划分

org.apache.spark.storage.ShuffleBlockFetcherIterator会通过splitLocalRemoteBlocks划分数据的读取策略:如果在本地有,那么可以直接从BlockManager中获取数据;如果需要从其他的节点上获取,那么需要走网络。由于Shuffle的数据量可能会很大,因此这里的网络读有以下的策略:

1)       每次最多启动5个线程去最多5个节点上读取数据

2)       每次请求的数据大小不会超过spark.reducer.maxMbInFlight(默认值为48MB)/5

这样做的原因有几个:

1)  避免占用目标机器的过多带宽,在千兆网卡为主流的今天,带宽还是比较重要的。如果机器使用的万兆网卡,那么可以通过设置spark.reducer.maxMbInFlight来充分利用带宽。

2)  请求数据可以平行化,这样请求数据的时间可以大大减少。请求数据的总时间就是请求中耗时最长的。这样可以缓解一个节点出现网络拥塞时的影响。

主要的实现:

private[this] def splitLocalRemoteBlocks():ArrayBuffer[FetchRequest] = {
   val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
   val remoteRequests = new ArrayBuffer[FetchRequest]
   for ((address, blockInfos) <- blocksByAddress) {
     if (address.executorId == blockManager.blockManagerId.executorId) {
       // Block在本地,需要过滤大小为0的block。
       localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
       numBlocksToFetch += localBlocks.size
     } else { //需要远程获取的Block
       val iterator = blockInfos.iterator
        var curRequestSize = 0L
       var curBlocks = new ArrayBuffer[(BlockId, Long)]
       while (iterator.hasNext) {
          //blockId 是org.apache.spark.storage.ShuffleBlockId,
          // 格式:"shuffle_" +shuffleId + "_" + mapId + "_" + reduceId
         val (blockId, size) = iterator.next()
         // Skip empty blocks
         if (size > 0) {
           curBlocks += ((blockId, size))
           remoteBlocks += blockId
           numBlocksToFetch += 1
           curRequestSize += size
          }

         if (curRequestSize >= targetRequestSize) {
           // 当前总的size已经可以批量放入一次request中
           remoteRequests += new FetchRequest(address, curBlocks)
           curBlocks = new ArrayBuffer[(BlockId, Long)]
           curRequestSize = 0
         }
       }
       // 剩余的请求组成一次request
       if (curBlocks.nonEmpty) {
         remoteRequests += new FetchRequest(address, curBlocks)
       }
     }
    }
   remoteRequests
  }

本地读取

fetchLocalBlocks() 负责本地Block的获取。在splitLocalRemoteBlocks中,已经将本地的Block列表存入了localBlocks:private[this] val localBlocks = newArrayBuffer[BlockId]()

具体过程如下:

  val iter = localBlocks.iterator
   while (iter.hasNext) {
     val blockId = iter.next()
     try {
       val buf = blockManager.getBlockData(blockId)
       shuffleMetrics.localBlocksFetched += 1
       buf.retain()
       results.put(new SuccessFetchResult(blockId, 0, buf))
     } catch {
     }
    }

而blockManager.getBlockData(blockId)的实现是:

override def getBlockData(blockId:BlockId): ManagedBuffer = {
   if (blockId.isShuffle) {
    shuffleManager.shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
}

这就调用了ShuffleBlockManager的getBlockData方法。在Shuffle Pluggable框架中我们介绍了实现一个Shuffle Service之一就是要实现ShuffleBlockManager。

以Hash BasedShuffle为例,它的ShuffleBlockManager是org.apache.spark.shuffle.FileShuffleBlockManager。FileShuffleBlockManager有两种情况,一种是File consolidate的,这种的话需要根据Map ID和 Reduce ID首先获得FileGroup的一个文件,然后根据在文件中的offset和size来获取需要的数据;如果是没有File consolidate,那么直接根据Shuffle Block ID直接读取整个文件就可以。

override def getBlockData(blockId:ShuffleBlockId): ManagedBuffer = {
   if (consolidateShuffleFiles) {
     val shuffleState = shuffleStates(blockId.shuffleId)
     val iter = shuffleState.allFileGroups.iterator
while(iter.hasNext) {
  // 根据Map ID和Reduce ID获取File Segment的信息
       val segmentOpt = iter.next.getFileSegmentFor(blockId.mapId,blockId.reduceId)
       if (segmentOpt.isDefined) {
         val segment = segmentOpt.get
         // 根据File Segment的信息,从FileGroup中找到相应的File和Block在
          // 文件中的offset和size
         return new FileSegmentManagedBuffer(
           transportConf, segment.file, segment.offset, segment.length)
       }
     }
     throw new IllegalStateException("Failed to find shuffle block:" + blockId)
    }else {
     val file = blockManager.diskBlockManager.getFile(blockId) //直接获取文件句柄
     new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
    }
  }

对于Sort BasedShuffle,它需要通过索引文件来获得数据块在数据文件中的具体位置信息,从而读取这个数据。

具体实现在org.apache.spark.shuffle.IndexShuffleBlockManager#getBlockData中。 

override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
   // 根据ShuffleID和MapID从org.apache.spark.storage.DiskBlockManager 获取索引文件
   val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
   val in = new DataInputStream(new FileInputStream(indexFile))
   try {
     ByteStreams.skipFully(in, blockId.reduceId * 8) //跳到本次Block的数据区
     val offset = in.readLong() //数据文件中的开始位置
     val nextOffset = in.readLong() //数据文件中的结束位置
     new FileSegmentManagedBuffer(
       transportConf,
       getDataFile(blockId.shuffleId, blockId.mapId),
       offset,
       nextOffset - offset)
    }finally {
     in.close()
    }
  }

如果您喜欢 本文,那么请动一下手指支持以下博客之星的评比吧。非常感谢您的投票。每天可以一票哦。

点我投票

时间: 2024-11-16 00:47:25

Spark技术内幕:Shuffle Read的整体流程的相关文章

Spark技术内幕:Storage 模块整体架构

Storage模块负责了Spark计算过程中所有的存储,包括基于Disk的和基于Memory的.用户在实际编程中,面对的是RDD,可以将RDD的数据通过调用org.apache.spark.rdd.RDD#cache将数据持久化:持久化的动作都是由Storage模块完成的.包括Shuffle过程中的数据,也都是由Storage模块管理的.可以说,RDD实现了用户的逻辑,而Storage则管理了用户的数据.本章将讲解Storage模块的实现. 1.1     模块整体架构 org.apache.s

Spark技术内幕: Shuffle详解(一)

通过上面一系列文章,我们知道在集群启动时,在Standalone模式下,Worker会向Master注册,使得Master可以感知进而管理整个集群:Master通过借助ZK,可以简单的实现HA:而应用方通过SparkContext这个与集群的交互接口,在创建SparkContext时就完成了Application的注册,Master为其分配Executor:在应用方创建了RDD并且在这个RDD上进行了很多的Transformation后,触发action,通过DAGScheduler将DAG划分

Spark技术内幕: Shuffle详解(三)

前两篇文章写了Shuffle Read的一些实现细节.但是要想彻底理清楚这里边的实现逻辑,还是需要更多篇幅的:本篇开始,将按照Job的执行顺序,来讲解Shuffle.即,结果数据(ShuffleMapTask的结果和ResultTask的结果)是如何产生的:结果是如何处理的:结果是如何读取的. 在Worker上接收Task执行命令的是org.apache.spark.executor.CoarseGrainedExecutorBackend.它在接收到LaunchTask的命令后,通过在Driv

Spark技术内幕: Shuffle详解(二)

本文主要关注ShuffledRDD的Shuffle Read是如何从其他的node上读取数据的. 上文讲到了获取如何获取的策略都在org.apache.spark.storage.BlockFetcherIterator.BasicBlockFetcherIterator#splitLocalRemoteBlocks中.可以见注释. protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = { // Make remo

Spark技术内幕: Task向Executor提交的源码解析

在上文<Spark技术内幕:Stage划分及提交源码分析>中,我们分析了Stage的生成和提交.但是Stage的提交,只是DAGScheduler完成了对DAG的划分,生成了一个计算拓扑,即需要按照顺序计算的Stage,Stage中包含了可以以partition为单位并行计算的Task.我们并没有分析Stage中得Task是如何生成并且最终提交到Executor中去的. 这就是本文的主题. 从org.apache.spark.scheduler.DAGScheduler#submitMissi

Spark技术内幕:Master的故障恢复

Spark技术内幕:Master基于ZooKeeper的High Availability(HA)源码实现  详细阐述了使用ZK实现的Master的HA,那么Master是如何快速故障恢复的呢? 处于Standby状态的Master在接收到org.apache.spark.deploy.master.ZooKeeperLeaderElectionAgent发送的ElectedLeader消息后,就开始通过ZK中保存的Application,Driver和Worker的元数据信息进行故障恢复了,它

Spark技术内幕:Shuffle的性能调优

通过上面的架构和源码实现的分析,不难得出Shuffle是Spark Core比较复杂的模块的结论.它也是非常影响性能的操作之一.因此,在这里整理了会影响Shuffle性能的各项配置.尽管大部分的配置项在前文已经解释过它的含义,由于这些参数的确是非常重要,这里算是做一个详细的总结. 1.1.1  spark.shuffle.manager 前文也多次提到过,Spark1.2.0官方支持两种方式的Shuffle,即Hash Based Shuffle和Sort Based Shuffle.其中在Sp

Spark技术内幕: 如何解决Shuffle Write一定要落盘的问题?

在Spark 0.6和0.7时,Shuffle的结果都需要先存储到内存中(有可能要写入磁盘),因此对于大数据量的情况下,发生GC和OOM的概率非常大.因此在Spark 0.8的时候,Shuffle的每个record都会直接写入磁盘,并且为下游的每个Task都生成一个单独的文件.这样解决了Shuffle解决都需要存入内存的问题,但是又引入了另外一个问题:生成的小文件过多,尤其在每个文件的数据量不大而文件特别多的时候,大量的随机读会非常影响性能.Spark 0.8.1为了解决0.8中引入的问题,引入

Spark技术内幕:Shuffle Pluggable框架详解,你怎么开发自己的Shuffle Service?

首先介绍一下需要实现的接口.框架的类图如图所示(今天CSDN抽风,竟然上传不了图片.如果需要实现新的Shuffle机制,那么需要实现这些接口. 1.1.1  org.apache.spark.shuffle.ShuffleManager Driver和每个Executor都会持有一个ShuffleManager,这个ShuffleManager可以通过配置项spark.shuffle.manager指定,并且由SparkEnv创建.Driver中的ShuffleManager负责注册Shuffl