Spark Source Code Analysis – BlockManagerMaster & Slave

BlockManagerMaster

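This class only maintains a set of interface methods in front of BlockManagerMasterActor; every one of them gets its data from BlockManagerMasterActor through tell and askDriverWithReply.
It is a thin class that adds little of its own.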

private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging {
  /** Remove a dead executor from the driver actor. This is only called on the driver side. */
  def removeExecutor(execId: String)
  /**
   * Send the driver actor a heart beat from the slave. Returns true if everything works out,
   * false if the driver does not know about the given block manager, which means the block
   * manager should re-register.
   */
  def sendHeartBeat(blockManagerId: BlockManagerId): Boolean
  /** Register the BlockManager's id with the driver. */
  def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef)
  def updateBlockInfo(
      blockManagerId: BlockManagerId,
      blockId: String,
      storageLevel: StorageLevel,
      memSize: Long,
      diskSize: Long): Boolean
  /** Get locations of the blockId from the driver */
  def getLocations(blockId: String): Seq[BlockManagerId]
  /** Get locations of multiple blockIds from the driver */
  def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]]
  /** Get ids of other nodes in the cluster from the driver */
  def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId]
  /**
   * Remove a block from the slaves that have it. This can only be used to remove
   * blocks that the driver knows about.
   */
  def removeBlock(blockId: String)
  /**
   * Remove all blocks belonging to the given RDD.
   */
  def removeRdd(rddId: Int, blocking: Boolean)
  /**
   * Return the memory status for each block manager, in the form of a map from
   * the block manager's id to two long values. The first value is the maximum
   * amount of memory allocated for the block manager, while the second is the
   * amount of remaining memory.
   */
  def getMemoryStatus: Map[BlockManagerId, (Long, Long)]
  def getStorageStatus: Array[StorageStatus]
  /** Stop the driver actor, called only on the Spark driver node */
  def stop() {
    if (driverActor != null) {
      tell(StopBlockManagerMaster)
      driverActor = null
      logInfo("BlockManagerMaster stopped")
    }
  }

  /** Send a one-way message to the master actor, to which we expect it to reply with true. */
  private def tell(message: Any) {
    if (!askDriverWithReply[Boolean](message)) {
      throw new SparkException("BlockManagerMasterActor returned false, expected true.")
    }
  }

  /**
   * Send a message to the driver actor and get its result within a default timeout, or
   * throw a SparkException if this fails.
   */
  private def askDriverWithReply[T](message: Any): T = {
    // TODO: Consider removing multiple attempts
    if (driverActor == null) {
      throw new SparkException("Error sending message to BlockManager as driverActor is null " +
        "[message = " + message + "]")
    }
    var attempts = 0
    var lastException: Exception = null
    while (attempts < AKKA_RETRY_ATTEMPTS) {
      attempts += 1
      try {
        val future = driverActor.ask(message)(timeout)
        val result = Await.result(future, timeout)
        if (result == null) {
          throw new SparkException("BlockManagerMaster returned null")
        }
        return result.asInstanceOf[T]
      } catch {
        case ie: InterruptedException => throw ie
        case e: Exception =>
          lastException = e
          logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
      }
      Thread.sleep(AKKA_RETRY_INTERVAL_MS)
    }
    throw new SparkException(
      "Error sending message to BlockManagerMaster [message =" + message + "]", lastException)
  }
}
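
The retry loop in askDriverWithReply, and the way tell is built on top of it, can be tried out without Akka. The following is a minimal sketch that assumes the actor ask is replaced by a plain function returning a Future; the names RetryAskSketch, maxAttempts, retryIntervalMs and askWithReply are hypothetical stand-ins and are not part of Spark.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object RetryAskSketch {
  // Hypothetical stand-ins for AKKA_RETRY_ATTEMPTS, AKKA_RETRY_INTERVAL_MS and timeout.
  val maxAttempts = 3
  val retryIntervalMs = 10L
  val timeout: FiniteDuration = 1.second

  /** Calls `ask` up to maxAttempts times and returns the first non-null reply. */
  def askWithReply[T](ask: () => Future[Any]): T = {
    var attempts = 0
    var lastException: Exception = null
    while (attempts < maxAttempts) {
      attempts += 1
      try {
        val result = Await.result(ask(), timeout)
        if (result == null) throw new IllegalStateException("reply was null")
        return result.asInstanceOf[T]
      } catch {
        case ie: InterruptedException => throw ie   // never retry an interrupt
        case e: Exception => lastException = e      // remember the failure and retry
      }
      Thread.sleep(retryIntervalMs)
    }
    throw new RuntimeException("no reply after " + maxAttempts + " attempts", lastException)
  }

  /** Mirrors `tell`: the reply must be `true`, otherwise the call fails. */
  def tell(ask: () => Future[Any]): Unit = {
    if (!askWithReply[Boolean](ask)) {
      throw new RuntimeException("expected true, got false")
    }
  }
}
```

Despite its name, tell here is not fire-and-forget: it blocks until the other side answers true, which is why every handler in the master actor has to reply.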

 

BlockManagerInfo

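The BlockManagerMasterActor object mainly exists to define BlockManagerInfo.
BlockManagerInfo tracks the BlockStatus of every block under one BlockManager, together with that BlockManager's heartbeat, and handles updates and removals.

Why is it defined here?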

private[spark]
object BlockManagerMasterActor {
  case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)

  class BlockManagerInfo(
      val blockManagerId: BlockManagerId,
      timeMs: Long,
      val maxMem: Long,
      val slaveActor: ActorRef)
    extends Logging {
    private var _remainingMem: Long = maxMem  // remaining memory of the BlockManager
    private var _lastSeenMs: Long = timeMs    // last heartbeat time of the BlockManager, updated continually
    // Mapping from block id to its status.
    private val _blocks = new JHashMap[String, BlockStatus] // caches the BlockStatus of each block
    
    // memSize here defaults to 0 and stands for droppedMemorySize
    def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long) {
      if (_blocks.containsKey(blockId)) {
        // The block exists on the slave already.
        val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
        if (originalLevel.useMemory) {
          _remainingMem += memSize
        }
      }

      if (storageLevel.isValid) {// isValid means it is either stored in-memory or on-disk.
        _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize))

        if (storageLevel.useMemory) {
          _remainingMem -= memSize
        }
      } else if (_blocks.containsKey(blockId)) {
        // If isValid is not true, drop the block.
        val blockStatus: BlockStatus = _blocks.get(blockId)
        _blocks.remove(blockId)
        if (blockStatus.storageLevel.useMemory) {
          _remainingMem += blockStatus.memSize
        }
      }
    }

    def removeBlock(blockId: String) {
      if (_blocks.containsKey(blockId)) {
        _remainingMem += _blocks.get(blockId).memSize
        _blocks.remove(blockId)
      }
    }
  }
}
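
The memory bookkeeping in BlockManagerInfo is easier to follow with concrete numbers. The sketch below follows the branches of updateBlockInfo and removeBlock from the listing, assuming a simplified Level in place of StorageLevel; BlockInfoSketch, Level, Status and Info are hypothetical names used only for illustration.

```scala
import scala.collection.mutable

object BlockInfoSketch {
  // Simplified stand-in for StorageLevel: only the flags the accounting reads.
  final case class Level(useMemory: Boolean, useDisk: Boolean) {
    def isValid: Boolean = useMemory || useDisk
  }
  final case class Status(level: Level, memSize: Long, diskSize: Long)

  class Info(val maxMem: Long) {
    private var remaining: Long = maxMem
    private val blocks = mutable.HashMap.empty[String, Status]

    def remainingMem: Long = remaining

    // Follows the branches of updateBlockInfo in the listing.
    def update(blockId: String, level: Level, memSize: Long, diskSize: Long): Unit = {
      // The block already exists and was held in memory.
      if (blocks.get(blockId).exists(_.level.useMemory)) {
        remaining += memSize
      }
      if (level.isValid) {
        blocks(blockId) = Status(level, memSize, diskSize)
        if (level.useMemory) remaining -= memSize
      } else {
        // An invalid level means the block was dropped.
        blocks.remove(blockId).foreach { old =>
          if (old.level.useMemory) remaining += old.memSize
        }
      }
    }

    // Follows removeBlock in the listing.
    def remove(blockId: String): Unit =
      blocks.remove(blockId).foreach(old => remaining += old.memSize)
  }
}
```

For example, storing a 100-byte in-memory block on an Info created with maxMem = 1000 leaves 900 bytes remaining, and removing it brings the figure back to 1000.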

 

BlockManagerMasterActor

Maintains the BlockManagerInfo of every slave, plus the locations of every block (which BlockManagers hold it).
Its core job is to manage and update this metadata:
RegisterBlockManager
updateBlockInfo
heartBeat
Removal of an RDD, an Executor (BlockManager), or a Block

/**
 * BlockManagerMasterActor is an actor on the master node to track statuses of
 * all slaves' block managers.
 */
private[spark]
class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
  // Mapping from block manager id to the block manager's information.
  // Caches the info of all BlockManagers
  private val blockManagerInfo =
    new mutable.HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]

  // Mapping from executor ID to block manager ID.
  private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

  // Mapping from block id to the set of block managers that have the block.
  // Caches block locations; a BlockManagerId represents a location because the executor can be derived from it
  private val blockLocations = new JHashMap[String, mutable.HashSet[BlockManagerId]]
  def receive = {
    case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
      register(blockManagerId, maxMemSize, slaveActor)
      sender ! true // BlockManagerMaster.tell requires a reply of true
    // ... the remaining messages mirror the BlockManagerMaster interface and are omitted
  }

  // Handles the RegisterBlockManager event, which a slave uses to register its BlockManager with the master
  // In essence it records the slave's BlockManagerInfo on the master

  private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
    if (id.executorId == "<driver>" && !isLocal) { // the driver itself does not need to register
      // Got a register message from the master node; don't register it
    } else if (!blockManagerInfo.contains(id)) { // if it is already present, it has been registered before
      blockManagerIdByExecutor.get(id.executorId) match {
        case Some(manager) => // an executor should have only one BlockManager, so if this executor has already registered one ...
          // A block manager of the same executor already exists.
          // This should never happen. Let's just quit.
          logError("Got two different block manager registrations on " + id.executorId)
          System.exit(1)
        case None =>
          blockManagerIdByExecutor(id.executorId) = id
      }
      blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo( // create a new BlockManagerInfo and cache it in blockManagerInfo
        id, System.currentTimeMillis(), maxMemSize, slaveActor)
    }
  }
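
The registration rules above (skip the driver outside local mode, ignore repeats, reject a second BlockManager on the same executor) can be sketched with two plain maps. This is only an illustration under stated assumptions: RegisterSketch, BmId and Registry are hypothetical names, and the sketch throws an exception where the listing calls System.exit(1), so that the behaviour can be exercised safely.

```scala
import scala.collection.mutable

object RegisterSketch {
  // Hypothetical simplification of BlockManagerId.
  final case class BmId(executorId: String, host: String, port: Int)

  class Registry(isLocal: Boolean) {
    private val infoById = mutable.HashMap.empty[BmId, Long]      // id -> registration time
    private val idByExecutor = mutable.HashMap.empty[String, BmId]

    /** Returns true only when a new entry was created. */
    def register(id: BmId, nowMs: Long): Boolean = {
      if (id.executorId == "<driver>" && !isLocal) {
        false // the driver is not tracked outside local mode
      } else if (infoById.contains(id)) {
        false // already registered
      } else {
        idByExecutor.get(id.executorId) match {
          case Some(_) =>
            // The listing exits the JVM here; the sketch throws instead.
            throw new IllegalStateException(
              "two block manager registrations on " + id.executorId)
          case None =>
            idByExecutor(id.executorId) = id
        }
        infoById(id) = nowMs
        true
      }
    }

    def size: Int = infoById.size
  }
}
```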

 

  // Handles updateBlockInfo

  private def updateBlockInfo(
      blockManagerId: BlockManagerId,
      blockId: String,
      storageLevel: StorageLevel,
      memSize: Long,
      diskSize: Long) {

    if (!blockManagerInfo.contains(blockManagerId)) { // blockManagerInfo does not contain this blockManagerId
      if (blockManagerId.executorId == "<driver>" && !isLocal) {
        // We intentionally do not register the master (except in local mode),
        // so we should not indicate failure.
        sender ! true
      } else {
        sender ! false
      }
      return
    }
    // Delegate to BlockManagerInfo.updateBlockInfo
    blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)

    var locations: mutable.HashSet[BlockManagerId] = null
    if (blockLocations.containsKey(blockId)) {
      locations = blockLocations.get(blockId)
    } else {
      locations = new mutable.HashSet[BlockManagerId]
      blockLocations.put(blockId, locations) // cache the location set for this block
    }

    if (storageLevel.isValid) {
      locations.add(blockManagerId)
    } else {
      locations.remove(blockManagerId)
    }

    // Remove the block from master tracking if it has been removed on all slaves.
    if (locations.size == 0) {
      blockLocations.remove(blockId)
    }
    sender ! true
  }
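
The second half of updateBlockInfo keeps blockLocations in step with what each slave reports. A minimal sketch of that bookkeeping follows, assuming block manager ids are plain strings and that a Boolean named stored plays the role of storageLevel.isValid; LocationsSketch, Locations, holdersOf and isTracked are hypothetical names.

```scala
import scala.collection.mutable

object LocationsSketch {
  class Locations {
    // Block id -> ids of the block managers that hold the block.
    private val byBlock = mutable.HashMap.empty[String, mutable.HashSet[String]]

    /** `stored` plays the role of storageLevel.isValid in the listing. */
    def update(blockId: String, bmId: String, stored: Boolean): Unit = {
      val holders = byBlock.getOrElseUpdate(blockId, mutable.HashSet.empty[String])
      if (stored) holders += bmId else holders -= bmId
      // Stop tracking a block once no block manager holds it.
      if (holders.isEmpty) byBlock.remove(blockId)
    }

    def holdersOf(blockId: String): Set[String] =
      byBlock.get(blockId).map(_.toSet).getOrElse(Set.empty[String])

    def isTracked(blockId: String): Boolean = byBlock.contains(blockId)
  }
}
```

Dropping the entry when the holder set becomes empty is what keeps the master from accumulating ids of blocks that no longer exist anywhere.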

  // Handles removeRdd: removes an RDD

  private def removeRdd(rddId: Int): Future[Seq[Int]] = {
    // First remove the metadata for the given RDD, and then asynchronously remove the blocks from the slaves.
    val prefix = "rdd_" + rddId + "_"
    // Find all blocks for the given RDD, remove the block from both blockLocations and
    // the blockManagerInfo that is tracking the blocks.
    val blocks = blockLocations.keySet().filter(_.startsWith(prefix)) // find all blocks of this RDD in blockLocations
    blocks.foreach { blockId =>  // drop these blocks from blockManagerInfo and blockLocations
      val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
      bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
      blockLocations.remove(blockId)
    }
    // Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
    // The dispatcher is used as an implicit argument into the Future sequence construction.
    import context.dispatcher
    val removeMsg = RemoveRdd(rddId)
    Future.sequence(blockManagerInfo.values.map { bm =>   // Future.sequence transforms a Traversable[Future[A]] into a Future[Traversable[A]]
      bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int] // send the RemoveRdd message to every slave actor
    }.toSeq)
  }
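
Two details of removeRdd are worth trying in isolation: the "rdd_<id>_" prefix match and the fan-out with Future.sequence. The sketch below assumes each slave is modelled as a function from an RDD id to a Future of the number of blocks it removed, instead of an actor; RemoveRddSketch, blocksOfRdd and removeOnAll are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RemoveRddSketch {
  /** Block ids that belong to one RDD share the prefix "rdd_<rddId>_". */
  def blocksOfRdd(blockIds: Iterable[String], rddId: Int): Seq[String] = {
    val prefix = "rdd_" + rddId + "_"
    blockIds.filter(_.startsWith(prefix)).toSeq
  }

  /** Sends one request to every slave and gathers the per-slave removal counts. */
  def removeOnAll(slaves: Seq[Int => Future[Int]], rddId: Int): Future[Seq[Int]] =
    Future.sequence(slaves.map(slave => slave(rddId)))

  def main(args: Array[String]): Unit = {
    // Two fake slaves that report how many blocks they removed.
    val slaves: Seq[Int => Future[Int]] =
      Seq(_ => Future.successful(2), _ => Future.successful(0))
    val counts = Await.result(removeOnAll(slaves, 1), 1.second)
    println("blocks removed in total: " + counts.sum)
  }
}
```

The trailing underscore in the prefix matters: without it, the blocks of RDD 12 would also match when removing RDD 1.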
  
  // Handles removeExecutor: removes the BlockManager on the given executor (the name is poorly chosen)

  private def removeExecutor(execId: String) {
    logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
    blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
  }
  private def removeBlockManager(blockManagerId: BlockManagerId) {
    val info = blockManagerInfo(blockManagerId)

    // Remove the block manager from blockManagerIdByExecutor.
    blockManagerIdByExecutor -= blockManagerId.executorId

    // Remove it from blockManagerInfo and remove all the blocks.
    blockManagerInfo.remove(blockManagerId)
    val iterator = info.blocks.keySet.iterator
    while (iterator.hasNext) {
      val blockId = iterator.next
      val locations = blockLocations.get(blockId)
      locations -= blockManagerId
      if (locations.size == 0) {
        blockLocations.remove(blockId)
      }
    }
  }
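
The cleanup performed when a BlockManager goes away can be sketched as follows. The location map is keyed by block id, so an entry that ends up with no holders has to be removed by its blockId. The sketch assumes string ids and a caller that passes in the blocks the departing manager held; RemoveManagerSketch and removeManager are hypothetical names.

```scala
import scala.collection.mutable

object RemoveManagerSketch {
  /** Removes `bmId` from every block it held and drops blocks left without holders. */
  def removeManager(
      locations: mutable.HashMap[String, mutable.HashSet[String]],
      heldBlocks: Iterable[String],
      bmId: String): Unit = {
    heldBlocks.foreach { blockId =>
      locations.get(blockId).foreach { holders =>
        holders -= bmId
        // The map is keyed by block id, so remove the entry by its key.
        if (holders.isEmpty) locations.remove(blockId)
      }
    }
  }
}
```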

  // Handles sendHeartBeat
  // A BlockManager's heartbeat is represented by lastSeenMs in its BlockManagerInfo

  private def heartBeat(blockManagerId: BlockManagerId): Boolean = {
    if (!blockManagerInfo.contains(blockManagerId)) {
      blockManagerId.executorId == "<driver>" && !isLocal
    } else {
      blockManagerInfo(blockManagerId).updateLastSeenMs()
      true
    }
  }
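
The heartbeat contract is small: the master answers false when it does not know the sender, and the sender is then expected to register again. A minimal sketch of both sides follows, assuming executors are identified by plain strings and time is passed in explicitly; HeartBeatSketch, Master and beat are hypothetical names.

```scala
import scala.collection.mutable

object HeartBeatSketch {
  class Master(isLocal: Boolean) {
    // Executor id -> time of the last heartbeat.
    private val lastSeenMs = mutable.HashMap.empty[String, Long]

    def register(executorId: String, nowMs: Long): Unit = {
      lastSeenMs(executorId) = nowMs
    }

    /** Same branches as heartBeat in the listing. */
    def heartBeat(executorId: String, nowMs: Long): Boolean = {
      if (!lastSeenMs.contains(executorId)) {
        // An unknown sender is only acceptable if it is the unregistered driver.
        executorId == "<driver>" && !isLocal
      } else {
        lastSeenMs(executorId) = nowMs
        true
      }
    }

    def lastSeen(executorId: String): Option[Long] = lastSeenMs.get(executorId)
  }

  /** Slave side: register again when the master does not know this sender. */
  def beat(master: Master, executorId: String, nowMs: Long): Unit = {
    if (!master.heartBeat(executorId, nowMs)) master.register(executorId, nowMs)
  }
}
```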

  // Handles removeBlock

  // Remove a block from the slaves that have it. This can only be used to remove
  // blocks that the master knows about.
  private def removeBlockFromWorkers(blockId: String) {
    val locations = blockLocations.get(blockId)
    if (locations != null) {
      locations.foreach { blockManagerId: BlockManagerId =>
        val blockManager = blockManagerInfo.get(blockManagerId)
        if (blockManager.isDefined) {
          // Remove the block from the slave's BlockManager.
          // Doesn't actually wait for a confirmation and the message might get lost.
          // If message loss becomes frequent, we should add retry logic here.
          blockManager.get.slaveActor ! RemoveBlock(blockId)
        }
      }
    }
  }

 

BlockManagerSlaveActor

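The master can send only two kinds of messages to a slave, so this class is very simple, almost too simple.
That is because it only handles events sent by the master; most of the actual data reads and writes are implemented directly in BlockManager.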

/**
 * An actor to take commands from the master to execute options. For example,
 * this is used to remove blocks from the slave's BlockManager.
 */
class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor {
  override def receive = {
    case RemoveBlock(blockId) =>
      blockManager.removeBlock(blockId)
    case RemoveRdd(rddId) =>
      val numBlocksRemoved = blockManager.removeRdd(rddId)
      sender ! numBlocksRemoved
  }
}
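
The difference between the two messages is that RemoveBlock is fire-and-forget while RemoveRdd is answered with a count. The sketch below shows that dispatch without Akka, assuming a small in-memory Store in place of BlockManager; SlaveMessagesSketch, ToSlave, Store and handle are hypothetical names.

```scala
object SlaveMessagesSketch {
  sealed trait ToSlave
  final case class RemoveBlock(blockId: String) extends ToSlave
  final case class RemoveRdd(rddId: Int) extends ToSlave

  /** Hypothetical in-memory store standing in for BlockManager. */
  class Store(initial: Set[String]) {
    private var blocks = initial
    def contains(blockId: String): Boolean = blocks.contains(blockId)
    def removeBlock(blockId: String): Unit = blocks -= blockId
    def removeRdd(rddId: Int): Int = {
      val (gone, kept) = blocks.partition(_.startsWith("rdd_" + rddId + "_"))
      blocks = kept
      gone.size
    }
  }

  /** Returns Some(reply) only for the message that is answered in the listing. */
  def handle(store: Store, msg: ToSlave): Option[Int] = msg match {
    case RemoveBlock(blockId) =>
      store.removeBlock(blockId)
      None // no confirmation is sent back
    case RemoveRdd(rddId) =>
      Some(store.removeRdd(rddId)) // number of blocks removed
  }
}
```

Declaring the messages under a sealed trait lets the compiler warn about an unhandled message type, which an untyped receive cannot do.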

This article is excerpted from 博客园 (cnblogs); original publication date: 2014-01-10