Spark技术内幕:Stage划分及提交源码分析

当触发一个RDD的action后,以count为例,调用关系如下:

  1. org.apache.spark.rdd.RDD#count
  2. org.apache.spark.SparkContext#runJob
  3. org.apache.spark.scheduler.DAGScheduler#runJob
  4. org.apache.spark.scheduler.DAGScheduler#submitJob
  5. org.apache.spark.scheduler.DAGSchedulerEventProcessActor#receive(JobSubmitted)
  6. org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted

其中步骤五的DAGSchedulerEventProcessActor是DAGScheduler 的与外部交互的接口代理,DAGScheduler在创建时会创建名字为eventProcessActor的actor。这个actor的作用看它的实现就一目了然了:

  /**
   * The main event loop of the DAG scheduler.
   */
  def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties) // 提交job,来自与RDD->SparkContext->DAGScheduler的消息。之所以在这需要在这里中转一下,是为了模块功能的一致性。

    case StageCancelled(stageId) => // 消息源org.apache.spark.ui.jobs.JobProgressTab,在GUI上显示一个SparkContext的Job的执行状态。
      // 用户可以cancel一个Stage,会通过SparkContext->DAGScheduler 传递到这里。
      dagScheduler.handleStageCancellation(stageId)

    case JobCancelled(jobId) => // 来自于org.apache.spark.scheduler.JobWaiter的消息。取消一个Job
      dagScheduler.handleJobCancellation(jobId)

    case JobGroupCancelled(groupId) => // 取消整个Job Group
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled => //取消所有Job
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) => // TaskScheduler得到一个Executor被添加的消息。具体来自org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId) => //来自TaskScheduler
      dagScheduler.handleExecutorLost(execId)

    case BeginEvent(task, taskInfo) => // 来自TaskScheduler
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) => //处理获得TaskResult信息的消息
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => //来自TaskScheduler,报告task是完成或者失败
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason) => //来自TaskScheduler,要么TaskSet失败次数超过阈值或者由于Job Cancel。
      dagScheduler.handleTaskSetFailed(taskSet, reason)

    case ResubmitFailedStages => //当一个Stage处理失败时,重试。来自org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion
      dagScheduler.resubmitFailedStages()
  }

总结一下org.apache.spark.scheduler.DAGSchedulerEventProcessActor的作用:可以把他理解成DAGScheduler的对外的功能接口。它对外隐藏了自己内部实现的细节,也更易于理解其逻辑;也降低了维护成本,将DAGScheduler的比较复杂功能接口化。

handleJobSubmitted

org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted首先会根据RDD创建finalStage。finalStage,顾名思义,就是最后的那个Stage。然后创建job,最后提交。提交的job如果满足一下条件,那么它将以本地模式运行:

1)spark.localExecution.enabled设置为true  并且 2)用户程序显式指定可以本地运行 并且 3)finalStage的没有父Stage 并且 4)仅有一个partition

3)和 4)的话主要为了任务可以快速执行;如果有多个stage或者多个partition的话,本地运行可能会因为本机的计算资源的问题而影响任务的计算速度。

要理解什么是Stage,首先要搞明白什么是Task。Task是在集群上运行的基本单位。一个Task负责处理RDD的一个partition。RDD的多个patition会分别由不同的Task去处理。当然了这些Task的处理逻辑完全是一致的。这一组Task就组成了一个Stage。有两种Task:

  1.  org.apache.spark.scheduler.ShuffleMapTask
  2.  org.apache.spark.scheduler.ResultTask

ShuffleMapTask根据Task的partitioner将计算结果放到不同的bucket中。而ResultTask将计算结果发送回Driver Application。一个Job包含了多个Stage,而Stage是由一组完全相同的Task组成的。最后的Stage包含了一组ResultTask。

在用户触发了一个action后,比如count,collect,SparkContext会通过runJob的函数开始进行任务提交。最后会通过DAG的event processor 传递到DAGScheduler本身的handleJobSubmitted,它首先会划分Stage,提交Stage,提交Task。至此,Task就开始在运行在集群上了。

一个Stage的开始就是从外部存储或者shuffle结果中读取数据;一个Stage的结束就是由于发生shuffle或者生成结果时。

创建finalStage

handleJobSubmitted 通过调用newStage来创建finalStage:

finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)

创建一个result stage,或者说finalStage,是通过调用org.apache.spark.scheduler.DAGScheduler#newStage完成的;而创建一个shuffle stage,需要通过调用org.apache.spark.scheduler.DAGScheduler#newOrUsedStage。 

private def newStage(
      rdd: RDD[_],
      numTasks: Int,
      shuffleDep: Option[ShuffleDependency[_, _, _]],
      jobId: Int,
      callSite: CallSite)
    : Stage =
  {
    val id = nextStageId.getAndIncrement()
    val stage =
      new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

对于result 的final stage来说,传入的shuffleDep是None。

我们知道,RDD通过org.apache.spark.rdd.RDD#getDependencies可以获得它依赖的parent RDD。而Stage也可能会有parent Stage。看一个RDD论文的Stage划分吧:

一个stage的边界,输入是外部的存储或者一个stage shuffle的结果;输入则是Job的结果(result task对应的stage)或者shuffle的结果。

上图的话stage3的输入则是RDD A和RDD F shuffle的结果。而A和F由于到B和G需要shuffle,因此需要划分到不同的stage。

从源码实现的角度来看,通过触发action也就是最后一个RDD创建final stage(上图的stage 3),我们注意到new Stage的第五个参数就是该Stage的parent Stage:通过rdd和job id获取:

// 生成rdd的parent Stage。没遇到一个ShuffleDependency,就会生成一个Stage
  private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
    val parents = new HashSet[Stage] //存储parent stage
    val visited = new HashSet[RDD[_]] //存储已经被访问到得RDD
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting // 存储需要被处理的RDD。Stack中得RDD都需要被处理。
    val waitingForVisit = new Stack[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        // Kind of ugly: need to register RDDs with the cache here since
        // we can't do it in its constructor because # of partitions is unknown
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] => // 在ShuffleDependency时需要生成新的stage
              parents += getShuffleMapStage(shufDep, jobId)
            case _ =>
              waitingForVisit.push(dep.rdd) //不是ShuffleDependency,那么就属于同一个Stage
          }
        }
      }
    }
    waitingForVisit.push(rdd) // 输入的rdd作为第一个需要处理的RDD。然后从该rdd开始,顺序访问其parent rdd
    while (!waitingForVisit.isEmpty) { //只要stack不为空,则一直处理。
      visit(waitingForVisit.pop()) //每次visit如果遇到了ShuffleDependency,那么就会形成一个Stage,否则这些RDD属于同一个Stage
    }
    parents.toList
  }

生成了finalStage后,就需要提交Stage了。

  // 提交Stage,如果有parent Stage没有提交,那么递归提交它。
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      // 如果当前stage不在等待其parent stage的返回,并且 不在运行的状态, 并且 没有已经失败(失败会有重试机制,不会通过这里再次提交)
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing == Nil) { // 如果所有的parent stage都已经完成,那么提交该stage所包含的task
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) { // 有parent stage为完成,则递归提交它
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id)
    }
  }

DAGScheduler将Stage划分完成后,提交实际上是通过把Stage转换为TaskSet,然后通过TaskScheduler将计算任务最终提交到集群。其所在的位置如下图所示。

接下来,将分析Stage是如何转换为TaskSet,并最终提交到Executor去运行的。

BTW,最近工作太忙了,基本上到家洗漱完都要10点多。也再没有精力去进行源码解析了。幸运的是周末不用加班。因此以后的博文更新都要集中在周末了。加油。

时间: 2024-09-14 11:34:47

Spark技术内幕:Stage划分及提交源码分析的相关文章

Spark技术内幕: Task向Executor提交的源码解析

在上文<Spark技术内幕:Stage划分及提交源码分析>中,我们分析了Stage的生成和提交.但是Stage的提交,只是DAGScheduler完成了对DAG的划分,生成了一个计算拓扑,即需要按照顺序计算的Stage,Stage中包含了可以以partition为单位并行计算的Task.我们并没有分析Stage中得Task是如何生成并且最终提交到Executor中去的. 这就是本文的主题. 从org.apache.spark.scheduler.DAGScheduler#submitMissi

Spark源码分析之四:Stage提交

        各位看官,上一篇<Spark源码分析之Stage划分>详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交.         Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示:         与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那

Spark源码分析之三:Stage划分

        继上篇<Spark源码分析之Job的调度模型与运行反馈>之后,我们继续来看第二阶段--Stage划分.         Stage划分的大体流程如下图所示:         前面提到,对于JobSubmitted事件,我们通过调用DAGScheduler的handleJobSubmitted()方法来处理.那么我们先来看下代码: // 处理Job提交的函数 private[scheduler] def handleJobSubmitted(jobId: Int, finalRD

Spark技术内幕:Master的故障恢复

Spark技术内幕:Master基于ZooKeeper的High Availability(HA)源码实现  详细阐述了使用ZK实现的Master的HA,那么Master是如何快速故障恢复的呢? 处于Standby状态的Master在接收到org.apache.spark.deploy.master.ZooKeeperLeaderElectionAgent发送的ElectedLeader消息后,就开始通过ZK中保存的Application,Driver和Worker的元数据信息进行故障恢复了,它

深入理解Spark:核心思想与源码分析

大数据技术丛书 深入理解Spark:核心思想与源码分析 耿嘉安 著 图书在版编目(CIP)数据 深入理解Spark:核心思想与源码分析/耿嘉安著. -北京:机械工业出版社,2015.12 (大数据技术丛书) ISBN 978-7-111-52234-8 I. 深- II.耿- III.数据处理软件 IV. TP274 中国版本图书馆CIP数据核字(2015)第280808号 深入理解Spark:核心思想与源码分析 出版发行:机械工业出版社(北京市西城区百万庄大街22号 邮政编码:100037)

Spark源码分析之二:Job的调度模型与运行反馈

        在<Spark源码分析之Job提交运行总流程概述>一文中,我们提到了,Job提交与运行的第一阶段Stage划分与提交,可以分为三个阶段:         1.Job的调度模型与运行反馈:         2.Stage划分:         3.Stage提交:对应TaskSet的生成.         今天,我们就结合源码来分析下第一个小阶段:Job的调度模型与运行反馈.         首先由DAGScheduler负责将Job提交到事件队列eventProcessLoop

Spark源码分析之七:Task运行(一)

        在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在Task调度逻辑的最后,CoarseGrainedSchedulerBackend的内部类DriverEndpoint中的makeOffers()方法的最后,我们通过调用TaskSchedulerImpl的resourceOffers()方法,得到了TaskDescription序列的序列Seq

Spark源码分析之五:Task调度(一)

        在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段:         1.Job的调度模型与运行反馈:         2.Stage划分:         3.Stage提交:对应TaskSet的生成.         Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分

《深入理解SPARK:核心思想与源码分析》(第1章)

       自己牺牲了7个月的周末和下班空闲时间,通过研究Spark源码和原理,总结整理的<深入理解Spark:核心思想与源码分析>一书现在已经正式出版上市,目前亚马逊.京东.当当.天猫等网站均有销售,欢迎感兴趣的同学购买.我开始研究源码时的Spark版本是1.2.0,经过7个多月的研究和出版社近4个月的流程,Spark自身的版本迭代也很快,如今最新已经是1.6.0.目前市面上另外2本源码研究的Spark书籍的版本分别是0.9.0版本和1.2.0版本,看来这些书的作者都与我一样,遇到了这种问