Spark Source Code Analysis – SchedulerBackend

SchedulerBackend has two jobs: requesting resources, and executing and managing tasks.

SparkDeploySchedulerBackend is built on the actor model; its main job is to start and manage two actors:
Deploy.Client actor, which handles resource requests. It is created when SparkDeploySchedulerBackend is initialized; the Client then registers with the Master, which eventually results in ExecutorBackends being launched on the Workers (see "Spark Source Code Analysis – Deploy"), and each of those ExecutorBackends registers itself with the Driver actor.
Driver actor, which handles task execution.
Because Spark was originally built on Mesos and standalone mode was added later for compatibility, the Driver actor's interface is Mesos-style: under Mesos, resources would presumably be offered dynamically and tasks launched against those offers (a guess; I have not read that code yet).
For coarse-grained Mesos mode and Spark's standalone deploy mode, however, this step is simplified: the resources are already allocated by the time the TaskScheduler is initialized, and the Driver actor merely schedules tasks onto those executors.
That is why the comment on makeOffers reads "Make fake resource offers": no real resource offering happens here.
As for how the Driver actor gets tasks running, the key is scheduler.resourceOffers; a simplified sketch of the offer/launch cycle follows.
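
To make that concrete, here is a minimal, self-contained sketch of the cycle (not Spark code: SimpleOffer, SimpleTask, and ToyScheduler are illustrative stand-ins for WorkerOffer, TaskDescription, and ClusterScheduler). It mirrors what DriverActor.makeOffers does: build one offer per already-registered executor from its free cores, ask the scheduler which tasks fit, then "launch" them and decrement the free-core counts.

import scala.collection.mutable

case class SimpleOffer(executorId: String, host: String, cores: Int)
case class SimpleTask(taskId: Long, executorId: String)

// Stand-in for ClusterScheduler.resourceOffers: greedily assigns pending tasks
// to the offered cores, one core per task.
class ToyScheduler(var pendingTasks: Int) {
  private var nextTaskId = 0L
  def resourceOffers(offers: Seq[SimpleOffer]): Seq[Seq[SimpleTask]] =
    offers.map { offer =>
      val n = math.min(offer.cores, pendingTasks)
      pendingTasks -= n
      (1 to n).map { _ => nextTaskId += 1; SimpleTask(nextTaskId, offer.executorId) }
    }
}

object FakeOfferCycle extends App {
  val scheduler = new ToyScheduler(pendingTasks = 5)
  val executorHost = Map("exec-1" -> "host-a", "exec-2" -> "host-b")
  val freeCores = mutable.HashMap("exec-1" -> 2, "exec-2" -> 2)

  // Mirrors makeOffers(): the offers are derived from executors we already hold,
  // not from dynamic cluster offers -- hence "fake" resource offers.
  def makeOffers(): Unit =
    for (task <- scheduler.resourceOffers(
           executorHost.toSeq.map { case (id, host) => SimpleOffer(id, host, freeCores(id)) }).flatten) {
      freeCores(task.executorId) -= 1   // mirrors launchTasks()
      println("LaunchTask(" + task.taskId + ") -> " + task.executorId)
    }

  makeOffers()   // launches 4 tasks (2 cores x 2 executors); 1 task stays pending
}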

SchedulerBackend

package org.apache.spark.scheduler.cluster
/**
 * A backend interface for cluster scheduling systems that allows plugging in different ones under
 * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as
 * machines become available and can launch tasks on them.
 */
private[spark] trait SchedulerBackend {
  def start(): Unit
  def stop(): Unit
  def reviveOffers(): Unit
  def defaultParallelism(): Int

  // Memory used by each executor (in megabytes)
  protected val executorMemory: Int = SparkContext.executorMemoryRequested

  // TODO: Probably want to add a killTask too
}
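
As a quick illustration of this plug-in point, here is a minimal sketch of what implementing the contract looks like, written against a simplified stand-in for the trait (the real one is private[spark] and pulls executorMemory from SparkContext, so those details are dropped here).

// Simplified stand-in for the SchedulerBackend contract above (illustrative only).
trait MiniSchedulerBackend {
  def start(): Unit
  def stop(): Unit
  def reviveOffers(): Unit
  def defaultParallelism(): Int
}

// A trivial in-process backend; a real one would turn every reviveOffers()
// call into resource offers handed back to the task scheduler.
class LoggingBackend extends MiniSchedulerBackend {
  def start(): Unit = println("backend started")
  def stop(): Unit = println("backend stopped")
  def reviveOffers(): Unit = println("revive offers requested")
  def defaultParallelism(): Int = Runtime.getRuntime.availableProcessors()
}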

 

StandaloneSchedulerBackend

Used for coarse-grained Mesos mode and Spark's standalone deploy mode.
As you can see, its main purpose is to create and maintain the driverActor.
Most of the logic lives inside the DriverActor.

/**
 * A standalone scheduler backend, which waits for standalone executors to connect to it through
 * Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained
 * Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
 */
private[spark]
class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
  extends SchedulerBackend with Logging
{
  // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
  var totalCoreCount = new AtomicInteger(0)

  class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
  // ...... analyzed in the DriverActor section below
  }

  var driverActor: ActorRef = null
  val taskIdsOnSlave = new HashMap[String, HashSet[String]]

  override def start() {
    val properties = new ArrayBuffer[(String, String)]
    val iterator = System.getProperties.entrySet.iterator
    while (iterator.hasNext) {
      val entry = iterator.next
      val (key, value) = (entry.getKey.toString, entry.getValue.toString)
      if (key.startsWith("spark.") && !key.equals("spark.hostPort")) {
        properties += ((key, value))
      }
    }
    driverActor = actorSystem.actorOf( // the key step: create the DriverActor
      Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME)
  }

  private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")

  override def stop() {
    try {
      if (driverActor != null) {
        val future = driverActor.ask(StopDriver)(timeout) // shut down the DriverActor
        Await.result(future, timeout)
      }
    } catch {
      case e: Exception =>
        throw new SparkException("Error stopping standalone scheduler's driver actor", e)
    }
  }

  override def reviveOffers() {
    driverActor ! ReviveOffers  // send the ReviveOffers event to the DriverActor
  }

  override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism"))
      .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))

  // Called by subclasses when notified of a lost worker
  def removeExecutor(executorId: String, reason: String) {
    try {
      val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)
      Await.result(future, timeout)
    } catch {
      case e: Exception =>
        throw new SparkException("Error notifying standalone scheduler's driver actor", e)
    }
  }
}
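
Both spark.akka.askTimeout and spark.default.parallelism are read as plain system properties in the code above; a minimal sketch of overriding them before the backend is created (the values are examples only, not recommendations):

// Wait up to 30s (default 10s) for the DriverActor to ack StopDriver / RemoveExecutor.
System.setProperty("spark.akka.askTimeout", "30")
// Override defaultParallelism(), which otherwise falls back to max(totalCoreCount, 2).
System.setProperty("spark.default.parallelism", "8")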

DriverActor

The key function is makeOffers, which launches tasks on the executors. When is it called?
When a RegisterExecutor message arrives,
when a task StatusUpdate arrives,
and when a ReviveOffers event is received, i.e. when new tasks are submitted and when the delay-scheduling timer fires (once per second by default).
The periodic revive for delay scheduling is presumably there for liveness: even when nothing changes, the backend keeps trying to launch tasks.
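
The once-per-second cadence is simply the default of the spark.scheduler.revive.interval property read in preStart() below; a minimal sketch of tuning it (the value is an example, not a recommendation):

// Milliseconds between the periodic ReviveOffers self-messages scheduled in preStart() (default 1000).
System.setProperty("spark.scheduler.revive.interval", "500")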

  class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
    private val executorActor = new HashMap[String, ActorRef] // tracks the ActorRef of every registered executor
    private val executorAddress = new HashMap[String, Address]
    private val executorHost = new HashMap[String, String]
    private val freeCores = new HashMap[String, Int]
    private val actorToExecutorId = new HashMap[ActorRef, String]
    private val addressToExecutorId = new HashMap[Address, String]

    override def preStart() {
      // Listen for remote client disconnection events, since they don't go through Akka's watch()
      context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])

      // Periodically revive offers to allow delay scheduling to work
      val reviveInterval = System.getProperty("spark.scheduler.revive.interval", "1000").toLong
      context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
    }

    def receive = {
      case RegisterExecutor(executorId, hostPort, cores) =>  // RegisterExecutor sent by a StandaloneExecutorBackend
        Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
        if (executorActor.contains(executorId)) {
          sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
        } else {
          logInfo("Registered executor: " + sender + " with ID " + executorId)
          sender ! RegisteredExecutor(sparkProperties)
          context.watch(sender) // watch executor actor
          executorActor(executorId) = sender
          executorHost(executorId) = Utils.parseHostPort(hostPort)._1
          freeCores(executorId) = cores
          executorAddress(executorId) = sender.path.address
          actorToExecutorId(sender) = executorId
          addressToExecutorId(sender.path.address) = executorId
          totalCoreCount.addAndGet(cores)
          makeOffers()
        }

      case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          freeCores(executorId) += 1
          makeOffers(executorId)
        }

      case ReviveOffers => // ReviveOffers from StandaloneSchedulerBackend.reviveOffers() or the periodic timer
        makeOffers()

      case StopDriver =>
        sender ! true
        context.stop(self)

      case RemoveExecutor(executorId, reason) =>
        removeExecutor(executorId, reason)
        sender ! true

      case Terminated(actor) =>
        actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))

      case RemoteClientDisconnected(transport, address) =>
        addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected"))

      case RemoteClientShutdown(transport, address) =>
        addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))
    }

    // Make fake resource offers on all executors
    def makeOffers() {
      launchTasks(scheduler.resourceOffers(
        executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
    }

    // Make fake resource offers on just one executor
    // Note that the WorkerOffers passed to scheduler.resourceOffers here are generated statically
    // from the executors that were allocated earlier, not obtained dynamically. Under Mesos the
    // offers would be fetched dynamically from the cluster and then handed to scheduler.resourceOffers.
    def makeOffers(executorId: String) {
      launchTasks(scheduler.resourceOffers(
        Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
    }

    // Launch tasks returned by a set of resource offers
    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        freeCores(task.executorId) -= 1
        executorActor(task.executorId) ! LaunchTask(task) // launching = sending a LaunchTask event to the executor's actor
      }
    }

    // Remove a disconnected slave from the cluster
    def removeExecutor(executorId: String, reason: String) {
      if (executorActor.contains(executorId)) {
        logInfo("Executor " + executorId + " disconnected, so removing it")
        val numCores = freeCores(executorId)
        actorToExecutorId -= executorActor(executorId)
        addressToExecutorId -= executorAddress(executorId)
        executorActor -= executorId
        executorHost -= executorId
        freeCores -= executorId
        totalCoreCount.addAndGet(-numCores)
        scheduler.executorLost(executorId, SlaveLost(reason))
      }
    }
  }
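
For reference, the messages the DriverActor exchanges with each StandaloneExecutorBackend form a small protocol: RegisterExecutor / RegisteredExecutor (or RegisterExecutorFailed) for the handshake, then LaunchTask from driver to executor and StatusUpdate from executor to driver. The sketch below models that sequence with simplified stand-in case classes; the real messages carry more fields (e.g. LaunchTask wraps a TaskDescription, StatusUpdate carries a TaskState plus serialized result data).

// Simplified stand-ins for the real messages (fields pared down, illustrative only).
sealed trait SchedulerMessage
case class RegisterExecutor(executorId: String, hostPort: String, cores: Int) extends SchedulerMessage
case class RegisteredExecutor(sparkProperties: Seq[(String, String)]) extends SchedulerMessage
case class LaunchTask(taskId: Long) extends SchedulerMessage
case class StatusUpdate(executorId: String, taskId: Long, finished: Boolean) extends SchedulerMessage

object ProtocolTrace extends App {
  // Handshake: executor announces itself, driver acks with the spark.* properties.
  println("executor -> driver: " + RegisterExecutor("exec-1", "host-a:5555", 2))
  println("driver -> executor: " + RegisteredExecutor(Seq("spark.app.name" -> "demo")))
  // Steady state: driver launches, executor reports back; a finished StatusUpdate
  // frees one core and triggers a fresh single-executor offer on the driver side.
  println("driver -> executor: " + LaunchTask(42L))
  println("executor -> driver: " + StatusUpdate("exec-1", 42L, finished = true))
}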

 

SparkDeploySchedulerBackend

The key job here is to create and manage the Driver and Client actors.

private[spark] class SparkDeploySchedulerBackend(
    scheduler: ClusterScheduler,
    sc: SparkContext,
    master: String,
    appName: String)
  extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
  with ClientListener
  with Logging {
  var client: Client = null
  override def start() {
    super.start() // call StandaloneSchedulerBackend.start(), which creates the DriverActor

    // The endpoint for executors to talk to us
    val driverUrl = "akka://spark@%s:%s/user/%s".format(
      System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
      StandaloneSchedulerBackend.ACTOR_NAME)
    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
    val command = Command(  // the command ExecutorRunner will run on the Worker: essentially launching StandaloneExecutorBackend
      "org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
    val sparkHome = sc.getSparkHome().getOrElse(null)
    val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
        "http://" + sc.ui.appUIAddress) // build the ApplicationDescription

    client = new Client(sc.env.actorSystem, master, appDesc, this) // create the Client actor and start it
    client.start()
  }

  override def stop() {
    stopping = true
    super.stop()
    client.stop()
    if (shutdownCallback != null) {
      shutdownCallback(this)
    }
  }
}
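
To see what the executor launch command ends up looking like, here is a minimal sketch of the driverUrl format and the {{...}} placeholder substitution. In the real flow the Worker side (ExecutorRunner) performs the substitution with values it knows; the host, port, actor name, and substituted values below are examples only (the actor name assumes StandaloneSchedulerBackend.ACTOR_NAME == "StandaloneScheduler").

object LaunchCommandSketch extends App {
  // The driver URL built in start(); host and port are examples.
  val driverUrl = "akka://spark@%s:%s/user/%s".format("192.168.1.10", "50501", "StandaloneScheduler")

  // The args template handed to the Worker inside the Command; the Worker fills in
  // the {{...}} placeholders for each executor it launches.
  val argsTemplate = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
  val filled = argsTemplate.map { arg =>
    arg.replace("{{EXECUTOR_ID}}", "3")
       .replace("{{HOSTNAME}}", "worker-7")
       .replace("{{CORES}}", "4")
  }

  println(filled.mkString(" "))
  // akka://spark@192.168.1.10:50501/user/StandaloneScheduler 3 worker-7 4
}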

Excerpted from a cnblogs (博客园) post; originally published 2014-01-03.
