Spark源码分析之三:Stage划分

        继上篇《Spark源码分析之Job的调度模型与运行反馈》之后,我们继续来看第二阶段--Stage划分。

        Stage划分的大体流程如下图所示:

        前面提到,对于JobSubmitted事件,我们通过调用DAGScheduler的handleJobSubmitted()方法来处理。那么我们先来看下代码:

// 处理Job提交的函数
  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null

    // 利用最后一个RDD(finalRDD),创建最后的stage对象:finalStage
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      // 根据最后一个RDD获取最后的stage
      finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }

    // 创建一个ActiveJob对象
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)

    // 清除RDD分区位置缓存
    // private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
    clearCacheLocs()

    // 调用logInfo()方法记录日志信息
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()

    // 将jobId-->ActiveJob的对应关系添加到HashMap类型的数据结构jobIdToActiveJob中去
    jobIdToActiveJob(jobId) = job

    // 将ActiveJob添加到HashSet类型的数据结构activeJobs中去
    activeJobs += job

    finalStage.setActiveJob(job)

    //2 获取stageIds列表
    // jobIdToStageIds存储的是jobId--stageIds的对应关系
    // stageIds为HashSet[Int]类型的
    // jobIdToStageIds在上面newResultStage过程中已被处理
    val stageIds = jobIdToStageIds(jobId).toArray
    // stageIdToStage存储的是stageId-->Stage的对应关系
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))

    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))

    // 提交最后一个stage
    submitStage(finalStage)

    // 提交其他正在等待的stage
    submitWaitingStages()
  }

        这个handleJobSubmitted()方法一共做了这么几件事:

        第一,调用newResultStage()方法,生成Stage,包括最后一个Stage:ResultStage和前面的Parent Stage:ShuffleMapStage;

        第二,创建一个ActiveJob对象job;

        第三,清除RDD分区位置缓存;

        第四,调用logInfo()方法记录日志信息;

        第五,维护各种数据对应关系涉及到的数据结构:

        (1)将jobId-->ActiveJob的对应关系添加到HashMap类型的数据结构jobIdToActiveJob中去;

        (2)将ActiveJob添加到HashSet类型的数据结构activeJobs中去;

        第六,提交Stage;

        下面,除了提交Stage留在第三阶段外,我们挨个分析第二阶段的每一步。

        首先是调用newResultStage()方法,生成Stage,包括最后一个Stage:ResultStage和前面的Parent Stage:ShuffleMapStage。代码如下:

/**
   * Create a ResultStage associated with the provided jobId.
   * 用提供的jobId创建一个ResultStage
   */
  private def newResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {

    // 根据fianl RDD获取parent stage及id,这个id为ResultStage的stageId
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)

    // 创建一个ResultStage,即为整个Job的finalStage
    // 参数:id为stage的id,rdd为stage中最后一个rdd,func为在分区上执行的函数操作,
    // partitions为rdd中可以执行操作的分区,parentStages为该stage的父stages,jobId为该stage
    val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)

    // 将stage加入到stageIdToStage中
    stageIdToStage(id) = stage

    // 更新数据结构jobIdToStageIds
    updateJobIdStageIdMaps(jobId, stage)

    // 返回stage
    stage
  }

        首先,根据fianl RDD获取parent stages及id,这个id为ResultStage的stageId;

        其次,创建一个ResultStage,即为整个Job的finalStage;

        然后,将stage加入到数据结构stageIdToStage中;

        接着,更新数据结构jobIdToStageIds;

        最后,返回这个ResultStage。

        我们一步步来看。首先调用getParentStagesAndId()方法,根据fianl RDD获取parent stages及id,这个id为ResultStage的stageId。代码如下:

/**
   * Helper function to eliminate some code re-use when creating new stages.
   */
  private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
    // 获取parent stages
    val parentStages = getParentStages(rdd, firstJobId)

    // 获取下一个stageId,为AtomicInteger类型,getAndIncrement()能保证原子操作
    val id = nextStageId.getAndIncrement()

    // 返回parentStages和id
    (parentStages, id)
  }

        这个id即为下一个stageId,通过AtomicInteger类型的getAndIncrement()获得,能够保证原子性。继续分析getParentStages()方法,通过它来获取final RDD的parent stage。代码如下:

/**
   * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
   * the provided firstJobId.
   */
  private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    // 用HashSet存储parents stage
    val parents = new HashSet[Stage]

    // 用HashSet存储已经被访问过的RDD
    val visited = new HashSet[RDD[_]]

    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    // 存储需要被处理的RDD。Stack中得RDD都需要被处理
    val waitingForVisit = new Stack[RDD[_]]

    // 定义一个visit函数,根据传入的RDD,如果之前没有处理过,标记为已处理,循环此RDD的依赖关系dependencies
    // 如果是ShuffleDependency,获取其parents;如果不是,则说明为同一stage,并压入Stack:waitingForVisit顶部
    def visit(r: RDD[_]) {
      if (!visited(r)) {// visited中没有的话
        // 将RDD r加入到visited,表示已经处理过了
        visited += r

        // Kind of ugly: need to register RDDs with the cache here since
        // we can't do it in its constructor because # of partitions is unknown
        // 循环Rdd r的依赖关系
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              // 如果是ShuffleDependency,获取其parents,添加到parents中去
              parents += getShuffleMapStage(shufDep, firstJobId)
            case _ =>
              // 否则,属于同一个stage,压入Stack顶部,后续再递归处理
              waitingForVisit.push(dep.rdd)
          }
        }
      }
    }

    // 将rdd压入Stack顶部
    waitingForVisit.push(rdd)

    // 循环waitingForVisit,弹出每个rdd
    while (waitingForVisit.nonEmpty) {
      // 调用visit()方法,处理每个rdd
      visit(waitingForVisit.pop())
    }

    // 返回得到的parents列表
    parents.toList
  }

        getParentStages()方法在其内部定义了如下数据结构:

        parents:用HashSet存储parents stages,即finalRDD的所有parent stages,也就是ShuffleMapStage;

        visited:用HashSet存储已经被访问过的RDD,在RDD被处理前先存入该HashSet,保证存储在里面的RDD将不会被重复处理;

        waitingForVisit:存储需要被处理的RDD。Stack中得RDD都需要被处理。

        getParentStages()方法在其内部还定义了一个visit()方法,传入一个RDD,如果之前没有处理过,标记为已处理,并循环此RDD的依赖关系dependencies,如果是ShuffleDependency,调用getShuffleMapStage()方法获取其parent stage;如果不是,则说明为同一stage,并压入Stack:waitingForVisit顶部,等待后续通过visit()方法处理。所以,getParentStages()方法从finalRDD开始,逐渐往上查找,如果是窄依赖,证明在同一个Stage中,继续往上查找,如果是宽依赖,通过getShuffleMapStage()方法获取其parent
stage,就能得到整个Job中所有的parent stages,也就是ShuffleMapStage。

        接下来,我们看下getShuffleMapStage()方法的实现。代码如下:

/**
   * Get or create a shuffle map stage for the given shuffle dependency's map side.
   * 针对给定的shuffle dependency的map端,获取或者创建一个ShuffleMapStage
   */
  private def getShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {

    // 从数据结构shuffleToMapStage中根据shuffleId获取,如果有直接返回,否则
    // 获取ShuffleDependency中的rdd,调用getAncestorShuffleDependencies()方法,
    // 循环每个parent,调用newOrUsedShuffleStage()方法,创建一个新的ShuffleMapStage,
    // 并加入到数据结构shuffleToMapStage中去
    //
    // 它的定义为:private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage]
    shuffleToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) => stage // 有则直接返回
      case None => // 没有
        // We are going to register ancestor shuffle dependencies
        // 调用getAncestorShuffleDependencies()方法,传入ShuffleDependency中的rdd

        // 发现还没有在shuffleToMapStage中注册的祖先shuffle dependencies
        getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          // 并循环返回的parents,调用newOrUsedShuffleStage()方法,创建一个新的ShuffleMapStage,
          // 并加入到数据结构shuffleToMapStage中去
          shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
        }

        // Then register current shuffleDep
        // 最后注册当前shuffleDep,并加入到数据结构shuffleToMapStage中,返回stage
        val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
        shuffleToMapStage(shuffleDep.shuffleId) = stage
        stage
    }
  }

        从getShuffleMapStage()方法的注释就能看出,这个方法的主要作用就是针对给定的shuffle dependency的map端,获取或者创建一个ShuffleMapStage。为何是Get or create呢?通过源码得知,getShuffleMapStage()方法首先会根据shuffleDep.shuffleId从数据结构shuffleToMapStage中查找哦是否存在对应的stage,如果存在则直接返回,如果不存在,则调用newOrUsedShuffleStage()方法创建一个Stage并添加到数据结构shuffleToMapStage中,方便后续需要使用此Stage者直接使用。在此之前,会根据入参ShuffleDependency的rdd发现还没有在shuffleToMapStage中注册的祖先shuffle
dependencies,然后遍历每个ShuffleDependency,调用newOrUsedShuffleStage()方法为每个ShuffleDependency产生Stage并添加到数据结构shuffleToMapStage中。

        下面,我们看下这个getAncestorShuffleDependencies()方法的实现,代码如下:

/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
  // 根据传入的RDD,发现还没有在shuffleToMapStage中未注册过的祖先shuffle dependencies
  private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {

    // 存放parents的栈:Stack
    val parents = new Stack[ShuffleDependency[_, _, _]]

    // 存放已经处理过的RDD的哈希表:HashSet
    val visited = new HashSet[RDD[_]]

    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    // 存放等待调用visit的RDD的栈:Stack
    val waitingForVisit = new Stack[RDD[_]]

    // 定义方法visit()
    def visit(r: RDD[_]) {
      if (!visited(r)) {// 如果之前没有处理过
        visited += r // 标记为已处理

        // 循环RDD的所有依赖
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] => // 如果是ShuffleDependency
              // 如果shuffleToMapStage中没有,添加到parents中
              if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
                parents.push(shufDep)
              }
            case _ =>
          }

          // 将该dependence的rdd压入waitingForVisit栈顶部
          waitingForVisit.push(dep.rdd)
        }
      }
    }

    // 将RDD压入到waitingForVisit顶部
    waitingForVisit.push(rdd)
    // 循环waitingForVisit,针对每个RDD调用visit()方法
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }

    // 返回parents
    parents
  }

        通过代码我们可以发现,它和getParentStages()方法的代码风格非常相似。在其内部也定义了三个数据结构:

        parents:存放parents的栈,即Stack,用于存放入参RDD的在shuffleToMapStage中未注册过的祖先shuffle dependencies;

        visited:存放已经处理过的RDD的哈希表,即HashSet;

        waitingForVisit:存放等待被处理的RDD的栈,即Stack;

        定义了一个visit()方法,入参为RDD,针对传入的RDD,如果之前没有处理过则标记为已处理,并循环RDD的所有依赖,如果是如果是ShuffleDependency,并且其依赖的shuffleId在shuffleToMapStage中没有,添加到parents中,否则直接跳过,最后无论为何种Dependency,都将该dependence的rdd压入waitingForVisit栈顶部,等待后续处理。

        接下来,我们再看下newOrUsedShuffleStage()方法,其代码如下:

/**
   * Create a shuffle map Stage for the given RDD.  The stage will also be associated with the
   * provided firstJobId.  If a stage for the shuffleId existed previously so that the shuffleId is
   * present in the MapOutputTracker, then the number and location of available outputs are
   * recovered from the MapOutputTracker
   *
   * 为给定的RDD创建一个ShuffleStage
   */
  private def newOrUsedShuffleStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {

    // 从shuffleDep中获取RDD
    val rdd = shuffleDep.rdd

    // 获取RDD的分区个数,即未来的task数目
    val numTasks = rdd.partitions.length

    // 构造一个ShuffleMapStage实例
    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)

    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      // 如果mapOutputTracker中存在

      // 根据shuffleId从mapOutputTracker中获取序列化的多个MapOutputStatus对象
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)

      // 反序列化
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)

      // 循环
      (0 until locs.length).foreach { i =>
        if (locs(i) ne null) {
          // locs(i) will be null if missing
          // 将
          stage.addOutputLoc(i, locs(i))
        }
      }
    } else {
      // 如果mapOutputTracker中不存在,注册一个

      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      // 注册的内容为
      // 1、根据shuffleDep获取的shuffleId;
      // 2、rdd中分区的个数
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
  }

        这个方法的主要完成了以下两件事:

        1、构造一个ShuffleMapStage实例stage;

        2、判断是否在mapOutputTracker中存在:

           (1)如果不存在,调用mapOutputTracker的registerShuffle()方法注册一个,注册的内容为根据shuffleDep获取的shuffleId和rdd中分区的个数;

           (2)如果存在,根据shuffleId从mapOutputTracker中获取序列化的多个MapOutputStatus对象,反序列化后循环,逐个添加到stage中。

        紧接着,看下newShuffleMapStage()方法,其代码如下:

/**
   * Create a ShuffleMapStage as part of the (re)-creation of a shuffle map stage in
   * newOrUsedShuffleStage.  The stage will be associated with the provided firstJobId.
   * Production of shuffle map stages should always use newOrUsedShuffleStage, not
   * newShuffleMapStage directly.
   */
  private def newShuffleMapStage(
      rdd: RDD[_],
      numTasks: Int,
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int,
      callSite: CallSite): ShuffleMapStage = {

    // 获得parentStages和下一个stageId
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, firstJobId)

    // 创建一个ShuffleMapStage
    val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
      firstJobId, callSite, shuffleDep)

    // 将stage加入到数据结构stageIdToStage
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(firstJobId, stage)
    stage
  }

        可以发现,这个方法也调用了getParentStagesAndId()方法,这样,就形成了一个递归,按照RDD的依赖关系,由后往前,逐渐生成Stage。代码剩余的部分就是创建一个ShuffleMapStage,并将stage加入到数据结构stageIdToStage,以及调用updateJobIdStageIdMaps()方法更新相关数据结构。这个updateJobIdStageIdMaps()方法留待下面分析。

        下面,简单看下mapOutputTracker注册的代码。

// 注册shuffle
  def registerShuffle(shuffleId: Int, numMaps: Int) {
    // 将shuffleId、numMaps大小和MapStatus类型的Array数组的映射关系,放入mapStatuses中
    // mapStatuses为TimeStampedHashMap[Int, Array[MapStatus]]类型的数据结构
    if (mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)).isDefined) {
      throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
    }
  }

        很简单,将shuffleId、numMaps大小和MapStatus类型的Array数组的映射关系,放入mapStatuses中,mapStatuses为TimeStampedHashMap[Int, Array[MapStatus]]类型的数据结构。
       经历了这多又长又大篇幅的叙述,现在返回newResultStage()方法,在通过getParentStagesAndId()方法获取parent stages及其result stage的id后,紧接着创建一个ResultStage,并将stage加入到stageIdToStage中,最后在调用updateJobIdStageIdMaps()更新数据结构jobIdToStageIds后,返回stage。

        下面,简单看下updateJobIdStageIdMaps()方法。代码如下:

/**
   * Registers the given jobId among the jobs that need the given stage and
   * all of that stage's ancestors.
   */
  private def updateJobIdStageIdMaps(jobId: Int, stage: Stage): Unit = {
    // 定义一个函数updateJobIdStageIdMapsList()
    def updateJobIdStageIdMapsList(stages: List[Stage]) {

      if (stages.nonEmpty) {

        // 获取列表头元素
        val s = stages.head

        // 将jobId添加到Stage的jobIds中
        s.jobIds += jobId

        // 更新jobIdToStageIds,将jobId与stageIds的对应关系添加进去
        jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id

        val parents: List[Stage] = getParentStages(s.rdd, jobId)

        val parentsWithoutThisJobId = parents.filter { ! _.jobIds.contains(jobId) }
        updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
      }
    }
    // 调用函数updateJobIdStageIdMapsList()
    updateJobIdStageIdMapsList(List(stage))
  }

        这个方法的实现比较简单,在其内部定义了一个函数updateJobIdStageIdMapsList(),首选传入result stage,将jobId添加到stage的jobIds中,更新jobIdToStageIds,将jobId与stageIds的对应关系添加进去,然后根据给定stage的RDD获取其parent stages,过滤出不包含此JobId的parents stages,再递归调用updateJobIdStageIdMapsList()方法,直到全部stage都处理完。

        至此,第二阶段Stage划分大体流程已分析完毕,有遗漏或不清楚的地方,以后再查缺补漏以及细化及更正错误。

时间: 2024-12-24 08:10:44

Spark源码分析之三:Stage划分的相关文章

Spark源码分析之四:Stage提交

        各位看官,上一篇<Spark源码分析之Stage划分>详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交.         Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示:         与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那

Spark源码分析之五:Task调度(一)

        在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段:         1.Job的调度模型与运行反馈:         2.Stage划分:         3.Stage提交:对应TaskSet的生成.         Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分

Spark源码分析 – Shuffle

参考详细探究Spark的shuffle实现, 写的很清楚, 当前设计的来龙去脉   Hadoop Hadoop的思路是, 在mapper端每次当memory buffer中的数据快满的时候, 先将memory中的数据, 按partition进行划分, 然后各自存成小文件, 这样当buffer不断的spill的时候, 就会产生大量的小文件  所以Hadoop后面直到reduce之前做的所有的事情其实就是不断的merge, 基于文件的多路并归排序, 在map端的将相同partition的merge到

Spark源码分析之二:Job的调度模型与运行反馈

        在<Spark源码分析之Job提交运行总流程概述>一文中,我们提到了,Job提交与运行的第一阶段Stage划分与提交,可以分为三个阶段:         1.Job的调度模型与运行反馈:         2.Stage划分:         3.Stage提交:对应TaskSet的生成.         今天,我们就结合源码来分析下第一个小阶段:Job的调度模型与运行反馈.         首先由DAGScheduler负责将Job提交到事件队列eventProcessLoop

Spark源码分析 – Dependency

Dependency 依赖, 用于表示RDD之间的因果关系, 一个dependency表示一个parent rdd, 所以在RDD中使用Seq[Dependency[_]]来表示所有的依赖关系   Dependency的base class  可见Dependency唯一的成员就是rdd, 即所依赖的rdd, 或parent rdd /** * Base class for dependencies. */ abstract class Dependency[T](val rdd: RDD[T]

Spark源码分析之七:Task运行(一)

        在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在Task调度逻辑的最后,CoarseGrainedSchedulerBackend的内部类DriverEndpoint中的makeOffers()方法的最后,我们通过调用TaskSchedulerImpl的resourceOffers()方法,得到了TaskDescription序列的序列Seq

Spark源码分析之六:Task调度(二)

        话说在<Spark源码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这个方法针对接收到的ReviveOffers事件进行处理.代码如下: // Make fake resource offers on all executors     // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的)     private def makeOffers() {  

Spark源码分析 – SparkContext

Spark源码分析之-scheduler模块  这位写的非常好, 让我对Spark的源码分析, 变的轻松了许多  这里自己再梳理一遍 先看一个简单的spark操作, val sc = new SparkContext(--) val textFile = sc.textFile("README.md") textFile.filter(line => line.contains("Spark")).count()   1. SparkContext 这是Spa

Spark源码分析 -- TaskScheduler

Spark在设计上将DAGScheduler和TaskScheduler完全解耦合, 所以在资源管理和task调度上可以有更多的方案 现在支持, LocalSheduler, ClusterScheduler, MesosScheduler, YarnClusterScheduler 先分析ClusterScheduler, 即standalone的Spark集群上, 因为比较单纯不涉及其他的系统, 看看Spark的任务是如何被执行的   private var taskScheduler: T