Spark Source Code Analysis -- SchedulableBuilder

SchedulableBuilder is a wrapper around the Schedulable tree. 
At the Pool level (the inner nodes), it schedules TaskSets (FIFO or FAIR). 
At the TaskSetManager level (the leaf nodes), it schedules the tasks within a TaskSet (locality) and tracks them (retries).

TaskSetManager

Wraps a TaskSet and is mainly responsible for tracking and scheduling the tasks inside that single TaskSet. 
Its main interfaces are: 
resourceOffer, which decides, for a given resource offer, which task to schedule for execution 
statusUpdate, which tracks task state changes

/**
 * Tracks and schedules the tasks within a single TaskSet. This class keeps track of the status of
 * each task and is responsible for retries on failure and locality. The main interfaces to it
 * are resourceOffer, which asks the TaskSet whether it wants to run a task on one node, and
 * statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
 *
 * THREADING: This class is designed to only be called from code with a lock on the TaskScheduler
 * (e.g. its event handlers). It should not be called from other threads.
 */
private[spark] trait TaskSetManager extends Schedulable {
  def schedulableQueue = null
  def schedulingMode = SchedulingMode.NONE
  def taskSet: TaskSet
  def resourceOffer(
      execId: String,
      host: String,
      availableCpus: Int,
      maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription]
  def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)
  def error(message: String)
}

 

ClusterTaskSetManager

The implementation of TaskSetManager used by ClusterScheduler.

1 addPendingTask 
Locality has to be considered when scheduling: tasks should preferably run as close to their data as possible. 
All tasks that have not yet run are pending tasks, and they are stored in hash maps keyed by different locality granularities: 
pendingTasksForExecutor, a HashMap of the tasks preferring each executor 
pendingTasksForHost, a HashMap of the tasks preferring each host 
pendingTasksForRack, a HashMap of the tasks preferring each rack 
pendingTasksWithNoPrefs, an ArrayBuffer of tasks with no locality preference, which can run anywhere 
allPendingTasks, an ArrayBuffer of all pending tasks 
speculatableTasks, speculative (duplicate) copies of slow tasks; anyone familiar with Hadoop's speculative execution will recognize this 
Next, look at addPendingTask to see how a task gets added to each of these lists.

addPendingTask(index: Int, readding: Boolean = false) 
It takes two parameters: 
index, the index of the task, used to fetch the task from the TaskSet 
readding, which indicates whether this is a re-added task rather than a new one: when an executor fails, its tasks need to be added back to the lists. Duplicate entries in the lists are harmless, because tasks that are already running are skipped when a task is picked (see the findTaskFromList sketch further below).
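As a toy illustration (this is not Spark code, just the bookkeeping in isolation, in Scala REPL style): suppose task 7 prefers host "h1", executor "e1" is alive on that host, and "h1" sits in rack "r1". After addPendingTask(7) the index appears in four lists at once:

import scala.collection.mutable.{ArrayBuffer, HashMap}

// simplified stand-ins for the real ClusterTaskSetManager fields
val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
val pendingTasksForHost     = new HashMap[String, ArrayBuffer[Int]]
val pendingTasksForRack     = new HashMap[String, ArrayBuffer[Int]]
val allPendingTasks         = new ArrayBuffer[Int]

// what addPendingTask(7) effectively does for this example
pendingTasksForExecutor.getOrElseUpdate("e1", new ArrayBuffer) += 7 // PROCESS_LOCAL candidates
pendingTasksForHost.getOrElseUpdate("h1", new ArrayBuffer) += 7     // NODE_LOCAL candidates
pendingTasksForRack.getOrElseUpdate("r1", new ArrayBuffer) += 7     // RACK_LOCAL candidates
allPendingTasks += 7                                                // fallback for ANY

// index 7 now sits in four lists; once it has been launched from one of them,
// the stale copies in the other lists are simply skipped by findTask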

 

2 resourceOffer 
Decides how to schedule one task from within the TaskSet; the main concern is locality, see the annotated source below. 
The interesting part is how currentLocalityIndex is maintained: 
It starts at 0, i.e. PROCESS_LOCAL, so only pendingTasksForExecutor can be chosen. 
On every call to resourceOffer, the time elapsed since the previous task launch is computed; if it exceeds the wait time (each locality level has its own wait time), currentLocalityIndex is incremented, i.e. the constraint is gradually relaxed. 
lastLaunchTime, which records the previous launch, is only updated when findTask succeeds inside resourceOffer. So the logic is: prefer the more local tasks, but if findTask keeps failing, the constraint needs to be relaxed. 
After relaxing, once a more local task does get chosen, currentLocalityIndex shrinks back down, because the launched task's locality is assigned back to it every time.
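To make the relax/tighten behaviour easier to follow, here is a minimal standalone sketch of just that piece of state -- a toy model of the two rules above, not the real ClusterTaskSetManager (the levels and 3000 ms waits below are illustrative assumptions; the real values come from computeValidLocalityLevels and the spark.locality.wait* properties):

object DelaySchedulingSketch {
  val levels = Array("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
  val waits  = Array(3000L, 3000L, 3000L, 0L) // per-level wait in ms

  var currentIndex   = 0   // start at the most local level
  var lastLaunchTime = 0L

  // Rule 1: on every offer, move up one level for every wait that has fully expired
  def allowedLevel(now: Long): String = {
    while (now - lastLaunchTime >= waits(currentIndex) && currentIndex < levels.length - 1) {
      lastLaunchTime += waits(currentIndex) // don't count the elapsed wait twice
      currentIndex += 1
    }
    levels(currentIndex)
  }

  // Rule 2: when a task is actually launched, jump (possibly back down) to that
  // task's locality level and reset the timer
  def taskLaunched(taskLocality: String, now: Long): Unit = {
    currentIndex = levels.indexOf(taskLocality)
    lastLaunchTime = now
  }
}

So a long stretch with no successful findTask pushes allowedLevel towards ANY, while a successful launch of, say, a PROCESS_LOCAL task immediately snaps the index back to 0.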

 

3 statusUpdate 
Handles status updates, mainly by notifying the DAGScheduler through the listener registered with the ClusterScheduler. 
For failed tasks, of course, the task also has to be added back to the pending lists.

/**
 * Schedules the tasks within a single TaskSet in the ClusterScheduler. This class keeps track of
 * the status of each task, retries tasks if they fail (up to a limited number of times), and
 * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
 * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
 * and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
 *
 * THREADING: This class is designed to only be called from code with a lock on the
 * ClusterScheduler (e.g. its event handlers). It should not be called from other threads.
 */
private[spark] class ClusterTaskSetManager(
    sched: ClusterScheduler,
    val taskSet: TaskSet,
    clock: Clock = SystemClock)
  extends TaskSetManager
  with Logging
{
  val tasks = taskSet.tasks
  val numTasks = tasks.length
  // Set of pending tasks for each executor. These collections are actually
  // treated as stacks, in which new tasks are added to the end of the
  // ArrayBuffer and removed from the end. This makes it faster to detect
  // tasks that repeatedly fail because whenever a task failed, it is put
  // back at the head of the stack. They are also only cleaned up lazily;
  // when a task is launched, it remains in all the pending lists except
  // the one that it was launched from, but gets removed from them later.
  private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
  // Set of pending tasks for each host. Similar to pendingTasksForExecutor,
  // but at host level.
  private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
  // Set of pending tasks for each rack -- similar to the above.
  private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
  // Set containing pending tasks with no locality preferences.
  val pendingTasksWithNoPrefs = new ArrayBuffer[Int]
  // Set containing all pending tasks (also used as a stack, as above).
  val allPendingTasks = new ArrayBuffer[Int]
  // Tasks that can be speculated. Since these will be a small fraction of total
  // tasks, we'll just hold them in a HashSet.
  val speculatableTasks = new HashSet[Int]
 

  // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
  val myLocalityLevels = computeValidLocalityLevels() // the locality levels present among this TaskSet's tasks
  val localityWaits = myLocalityLevels.map(getLocalityWait) // the wait time for each locality level (read from the configuration)

  // Delay scheduling variables: we keep track of our current locality level and the time we
  // last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
  // We then move down if we manage to launch a "more local" task.
  var currentLocalityIndex = 0    // current index into myLocalityLevels; starts at 0, i.e. at the most local level
  var lastLaunchTime = clock.getTime()  // time of the last task launch, used to detect timeouts; on timeout, currentLocalityIndex is incremented
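  // Note: getLocalityWait is not shown in this excerpt. Roughly -- a sketch based on the
  // spark.locality.wait* properties, not necessarily the verbatim source -- it looks like:
  private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
    val defaultWait = System.getProperty("spark.locality.wait", "3000")
    level match {
      case TaskLocality.PROCESS_LOCAL =>
        System.getProperty("spark.locality.wait.process", defaultWait).toLong
      case TaskLocality.NODE_LOCAL =>
        System.getProperty("spark.locality.wait.node", defaultWait).toLong
      case TaskLocality.RACK_LOCAL =>
        System.getProperty("spark.locality.wait.rack", defaultWait).toLong
      case TaskLocality.ANY =>
        0L
    }
  }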
 
  /**
   * Add a task to all the pending-task lists that it should be on. If readding is set, we are
   * re-adding the task so only include it in each list if it's not already there.
   */
  private def addPendingTask(index: Int, readding: Boolean = false) {
    // Utility method that adds `index` to a list only if readding=false or it's not already there
    def addTo(list: ArrayBuffer[Int]) {
      if (!readding || !list.contains(index)) { // a new task, or not already in this list
        list += index
      }
    }
    var hadAliveLocations = false
    for (loc <- tasks(index).preferredLocations) {
      for (execId <- loc.executorId) {
        if (sched.isExecutorAlive(execId)) {
          addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer)) // first add it to the corresponding executor's list
          hadAliveLocations = true
        }
      }
      if (sched.hasExecutorsAliveOnHost(loc.host)) {
        addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer)) // add it to the host's list
        for (rack <- sched.getRackForHost(loc.host)) {
          addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer)) // add it to the rack's list
        }
        hadAliveLocations = true
      }
    }

    if (!hadAliveLocations) { // if none of the above applied, or there were no preferred locations at all, add it to pendingTasksWithNoPrefs
      // Even though the task might've had preferred locations, all of those hosts or executors
      // are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
      addTo(pendingTasksWithNoPrefs)
    }

    if (!readding) { // a genuinely new task also needs to be added to allPendingTasks
      allPendingTasks += index  // No point scanning this whole list to find the old task there
    }
  }
 
  /**
   * Dequeue a pending task for a given node and return its index and locality level.
   * Only search for tasks matching the given locality constraint.
   */
  private def findTask(execId: String, host: String, locality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value)] =
  {
    // first try executor-local (PROCESS_LOCAL) tasks
    for (index <- findTaskFromList(getPendingTasksForExecutor(execId))) { // findTaskFromList, Dequeue a pending task from the given list and return its index.
      return Some((index, TaskLocality.PROCESS_LOCAL))
    }
    // then node-local; the locality constraint must be checked first
    if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) { // locality >= TaskLocality.NODE_LOCAL
      for (index <- findTaskFromList(getPendingTasksForHost(host))) {
        return Some((index, TaskLocality.NODE_LOCAL))
      }
    }
    // then rack-local; again the locality constraint must be checked first
    if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
      for {
        rack <- sched.getRackForHost(host)
        index <- findTaskFromList(getPendingTasksForRack(rack))
      } {
        return Some((index, TaskLocality.RACK_LOCAL))
      }
    }
    // Look for no-pref tasks after rack-local tasks since they can run anywhere.
    for (index <- findTaskFromList(pendingTasksWithNoPrefs)) {
      return Some((index, TaskLocality.PROCESS_LOCAL))
    }
    if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
      for (index <- findTaskFromList(allPendingTasks)) {
        return Some((index, TaskLocality.ANY))
      }
    }
    // Finally, if all else has failed, find a speculative task
    return findSpeculativeTask(execId, host, locality)
  }

 

  /**
   * Respond to an offer of a single executor from the scheduler by finding a task
   */
  override def resourceOffer(
      execId: String,
      host: String,
      availableCpus: Int,
      maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription] =
  {
    if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) { // only if some tasks are still unfinished and enough cores are available (>= CPUS_PER_TASK)
      val curTime = clock.getTime()

      var allowedLocality = getAllowedLocalityLevel(curTime) // get the currently allowed locality level
      if (allowedLocality > maxLocality) {  // must not exceed the maxLocality passed in by the caller
        allowedLocality = maxLocality   // We're not allowed to search for farther-away tasks
      }

      findTask(execId, host, allowedLocality) match { // call findTask and match on its result; findTask simply tries each locality level in turn
        case Some((index, taskLocality)) => {
          // Found a task; do some bookkeeping and return a task description
          val task = tasks(index)
          val taskId = sched.newTaskId()
          // Figure out whether this should count as a preferred launch
          logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format(
            taskSet.id, index, taskId, execId, host, taskLocality))
          // Do various bookkeeping
          copiesRunning(index) += 1
          val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
          taskInfos(taskId) = info
          taskAttempts(index) = info :: taskAttempts(index)
          // Update our locality level for delay scheduling
          currentLocalityIndex = getLocalityIndex(taskLocality) // update currentLocalityIndex with this task's locality; the index may decrease here, since taskLocality <= the currently allowed locality
          lastLaunchTime = curTime      // update lastLaunchTime
          // Serialize and return the task
          val startTime = clock.getTime()
          // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
          // we assume the task can be serialized without exceptions.
          val serializedTask = Task.serializeWithDependencies(
            task, sched.sc.addedFiles, sched.sc.addedJars, ser)
          val timeTaken = clock.getTime() - startTime
          increaseRunningTasks(1)
          logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
            taskSet.id, index, serializedTask.limit, timeTaken))
          val taskName = "task %s:%d".format(taskSet.id, index)
          if (taskAttempts(index).size == 1)
            taskStarted(task,info)
          return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask)) // finally return the scheduled task
        }
        case _ =>
      }
    }
    return None
  }
 
  /**
   * Get the level we can launch tasks according to delay scheduling, based on current wait time.
   */
  private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
    while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) &&  // the wait time for the current level has expired
        currentLocalityIndex < myLocalityLevels.length - 1)
    {
      // Jump to the next locality level, and remove our waiting time for the current one since
      // we don't want to count it again on the next one
      lastLaunchTime += localityWaits(currentLocalityIndex)
      currentLocalityIndex += 1   // increment currentLocalityIndex
    }
    myLocalityLevels(currentLocalityIndex)
  }
 
  /**
   * Find the index in myLocalityLevels for a given locality. This is also designed to work with
   * localities that are not in myLocalityLevels (in case we somehow get those) by returning the
   * next-biggest level we have. Uses the fact that the last value in myLocalityLevels is ANY.
   */
  def getLocalityIndex(locality: TaskLocality.TaskLocality): Int = { // look up the index of this locality in myLocalityLevels
    var index = 0
    while (locality > myLocalityLevels(index)) {
      index += 1
    }
    index
  }
  
  /**
   * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
   * added to queues using addPendingTask.
   */
  // simply checks the pending lists to see which preferred locality levels this TaskSet's tasks actually have, from most to least local
  private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
    import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
    val levels = new ArrayBuffer[TaskLocality.TaskLocality]
    if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0) {
      levels += PROCESS_LOCAL
    }
    if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0) {
      levels += NODE_LOCAL
    }
    if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0) {
      levels += RACK_LOCAL
    }
    levels += ANY
    logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
    levels.toArray
  }
  
  /** Called by cluster scheduler when one of our tasks changes state */
  override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
    SparkEnv.set(env)
    state match {
      case TaskState.FINISHED =>
        taskFinished(tid, state, serializedData)
      case TaskState.LOST =>
        taskLost(tid, state, serializedData)
      case TaskState.FAILED =>
        taskLost(tid, state, serializedData)
      case TaskState.KILLED =>
        taskLost(tid, state, serializedData)
      case _ =>
    }
  }
  def taskStarted(task: Task[_], info: TaskInfo) {
    sched.listener.taskStarted(task, info)
  }
}

 

Pool

An abstraction over a schedulableQueue. What is a Schedulable? 
As the comment says, it covers both Pools and TaskSetManagers. The design is somewhat questionable here: the core interfaces of Pools and TaskSetManagers are completely different, and although TaskSetManager also implements these interfaces, its implementations are meaningless stubs. 
Put simply, the author wanted to treat Pools and TaskSetManagers uniformly, generalizing over both, hence this design.

A Pool can therefore be understood as a container of TaskSetManagers; and since a Pool is itself a Schedulable, the container may also contain Pools. 
The core interface is getSortedTaskSetQueue, which schedules the TaskSetManagers (or Pools) according to the configured SchedulingAlgorithm (a sketch of the FIFO comparator follows after the Pool code below).

Note that FIFO and FAIR are thus used to schedule TaskSets, so the basic unit of Spark scheduling is the stage (each TaskSet corresponds to a stage).

/**
 * An interface for schedulable entities.
 * there are two type of Schedulable entities(Pools and TaskSetManagers)
 */
private[spark] trait Schedulable {
  var parent: Schedulable
  // child queues
  def schedulableQueue: ArrayBuffer[Schedulable]
  def schedulingMode: SchedulingMode
  def weight: Int
  def minShare: Int
  def runningTasks: Int
  def priority: Int
  def stageId: Int
  def name: String

  def increaseRunningTasks(taskNum: Int): Unit
  def decreaseRunningTasks(taskNum: Int): Unit
  def addSchedulable(schedulable: Schedulable): Unit
  def removeSchedulable(schedulable: Schedulable): Unit
  def getSchedulableByName(name: String): Schedulable
  def executorLost(executorId: String, host: String): Unit
  def checkSpeculatableTasks(): Boolean
  def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager]
  def hasPendingTasks(): Boolean
}

 

package org.apache.spark.scheduler.cluster
//An Schedulable entity that represent collection of Pools or TaskSetManagers
private[spark] class Pool(
    val poolName: String,
    val schedulingMode: SchedulingMode,
    initMinShare: Int,
    initWeight: Int)
  extends Schedulable
  with Logging {

  var schedulableQueue = new ArrayBuffer[Schedulable] // holds the child Schedulables: TaskSetManagers or Pools
  var schedulableNameToSchedulable = new HashMap[String, Schedulable]

  var priority = 0
  var stageId = 0
  var name = poolName
  var parent:Schedulable = null

  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = { // a SchedulingAlgorithm essentially just defines a comparator, used below to sort the TaskSets
    schedulingMode match {
      case SchedulingMode.FAIR =>
        new FairSchedulingAlgorithm() // Fair
      case SchedulingMode.FIFO =>
        new FIFOSchedulingAlgorithm() // FIFO
    }
  }

  override def addSchedulable(schedulable: Schedulable) { // add a TaskSetManager (or Pool)
    schedulableQueue += schedulable
    schedulableNameToSchedulable(schedulable.name) = schedulable
    schedulable.parent= this
  }

  override def removeSchedulable(schedulable: Schedulable) { // remove a TaskSetManager (or Pool)
    schedulableQueue -= schedulable
    schedulableNameToSchedulable -= schedulable.name
  }

  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = { // return the sorted list of TaskSetManagers
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator) // sortWith
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue() // this schedulable may itself be a Pool, hence the recursive call
    }
    return sortedTaskSetQueue
  }
}
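The SchedulingAlgorithm implementations themselves are not shown above. The FIFO comparator is essentially "smaller priority value (i.e. earlier job) first, then smaller stageId first"; a sketch of what it looks like (close to, but not guaranteed to be, the verbatim source) follows. The FAIR one (not shown) instead compares Schedulables on whether they are running fewer tasks than their minShare and then on their runningTasks-to-weight ratios.

private[spark] trait SchedulingAlgorithm {
  def comparator(s1: Schedulable, s2: Schedulable): Boolean
}

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val res = math.signum(s1.priority - s2.priority) match {
      case 0 => math.signum(s1.stageId - s2.stageId) // same priority: earlier stage first
      case x => x                                    // otherwise: smaller priority (earlier job) first
    }
    res < 0
  }
}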

 

SchedulableBuilder

As noted above, a Pool may contain TaskSetManagers as well as Pools, so the whole thing can form a tree. 
SchedulableBuilder is the wrapper around that Schedulable tree: it builds the tree out of TaskSetManagers (leaf nodes) and Pools (inner nodes). 
Only the simplest case, FIFO, is listed here, so the tree shape is not really visible. 
FIFO is trivial: a single Pool is enough; all TaskSets are added via addSchedulable and then read back out in sorted order (a usage sketch follows after the code below).

The Fair implementation is not listed here; it is more involved and will be analyzed later.

/**
 * An interface to build Schedulable tree
 * buildPools: build the tree nodes(pools)
 * addTaskSetManager: build the leaf nodes(TaskSetManagers)
 */
private[spark] trait SchedulableBuilder {
  def buildPools()
  def addTaskSetManager(manager: Schedulable, properties: Properties)
}

private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
  extends SchedulableBuilder with Logging {

  override def buildPools() {
    // nothing
  }

  override def addTaskSetManager(manager: Schedulable, properties: Properties) {
    rootPool.addSchedulable(manager)
  }
}
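Finally, a condensed sketch of how the ClusterScheduler wires these pieces together -- the call sequence only, not verbatim ClusterScheduler code; sched, taskSet and the offer loop are placeholders:

val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)   // initMinShare = 0, initWeight = 0
val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
schedulableBuilder.buildPools()                          // a no-op in the FIFO case

// on each TaskSet submission (ClusterScheduler.submitTasks):
val manager = new ClusterTaskSetManager(sched, taskSet)  // sched: the ClusterScheduler itself
schedulableBuilder.addTaskSetManager(manager, taskSet.properties)

// on each round of resource offers (ClusterScheduler.resourceOffers):
val sortedTaskSets = rootPool.getSortedTaskSetQueue()    // FIFO order: priority, then stageId
for (tsm <- sortedTaskSets) {
  // tsm.resourceOffer(execId, host, availableCpus, maxLocality) for each free executor
}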

This article is excerpted from 博客园 (cnblogs); originally published on 2014-01-06.