Spark Source Code Analysis, Part 2: The Job Scheduling Model and Execution Feedback

In 《Spark源码分析之Job提交运行总流程概述》 (the overview of the overall Job submission and execution flow), we mentioned that the first phase of Job submission and execution — Stage division and submission — can itself be broken down into three sub-phases:

1. The Job scheduling model and execution feedback;

2. Stage division;

3. Stage submission, i.e. generating the corresponding TaskSets.

Today, we will walk through the source code of the first sub-phase: the Job scheduling model and execution feedback.

It starts with the DAGScheduler submitting the Job to the event queue eventProcessLoop, where it waits to be scheduled and executed. The entry point is DAGScheduler's runJob() method. The code is as follows:

/**
   * Run an action job on the given RDD and pass all the results to the resultHandler function as
   * they arrive.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   *   partitions of the target RDD, e.g. for operations like first()
   * @param callSite where in the user program this job was called
   * @param resultHandler callback to pass each result to
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   *
   * @throws Exception when the job fails
   */
  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {

    // Record the start time so the Job's total running time can be computed at the end
    val start = System.nanoTime

    // Call submitJob() to submit the Job; it returns a JobWaiter
    // rdd is the final RDD of the chain, i.e. the target RDD to run tasks on
    // func is the function to run on each partition of that RDD
    // partitions is the set of partitions of that RDD to run on
    // callSite is where in the user program this job was called
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

    // Wait for the result through JobWaiter.awaitResult()
    waiter.awaitResult() match {
      case JobSucceeded => // the Job succeeded
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case JobFailed(exception: Exception) => // the Job failed
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
  }
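Before walking through it, a quick user-side sketch may help show when this entry point actually fires: every RDD action ends up going through SparkContext.runJob(), which in turn calls the DAGScheduler.runJob() shown above. The snippet below is illustrative demo code, not Spark source; the application name and master URL are just placeholders.

import org.apache.spark.{SparkConf, SparkContext}

object RunJobEntryDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("run-job-demo").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 100, 4)

    // count() is an action: it asks SparkContext to run a job that measures the size of
    // every partition, and SparkContext.runJob() ends up calling DAGScheduler.runJob().
    val n = rdd.count()
    println(s"counted $n elements")

    sc.stop()
  }
}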

runJob() does three things:

First, it records the start time, so that the Job's total running time can be computed at the end;

Second, it calls submitJob() to submit the Job, which returns an object waiter of type JobWaiter;

Finally, waiter calls JobWaiter's awaitResult() method to wait for the Job's outcome, and there are only two possible results: JobSucceeded for success and JobFailed for failure.

awaitResult() loops on the flag _jobFinished: as long as it is false, it calls this.wait() and keeps waiting; once the flag is true, the Job has finished and the method returns jobResult. Its code is as follows:

def awaitResult(): JobResult = synchronized {

    // Loop: as long as _jobFinished is false, keep waiting; once it is true, exit and return jobResult
    while (!_jobFinished) {
      this.wait()
    }
    return jobResult
  }

The flag _jobFinished is set in two places: when a Task finishes and the number of finished Tasks reaches the total number of Tasks, and when the whole Job fails. In both cases the Job result jobResult is set at the same time as the flag. The code is as follows:

// Called when a task finishes successfully
  override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
    if (_jobFinished) {
      throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
    }
    resultHandler(index, result.asInstanceOf[T])
    finishedTasks += 1
    // Has the number of finished tasks reached the total number of tasks?
    if (finishedTasks == totalTasks) {
      // Set the _jobFinished flag to true
      _jobFinished = true
      // The job result is success
      jobResult = JobSucceeded
      this.notifyAll()
    }
  }

  // Called when the job fails
  override def jobFailed(exception: Exception): Unit = synchronized {
    // Set the _jobFinished flag to true
    _jobFinished = true
    // The job result is failure
    jobResult = JobFailed(exception)
    this.notifyAll()
  }
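To make the wait()/notifyAll() handshake above explicit, here is a small self-contained sketch. ToyWaiter is a hypothetical class written only for illustration, not Spark's JobWaiter, but it follows the same protocol: every callback runs under the object lock, each successful task bumps a counter, and the waiting thread blocks until either all tasks have reported in or the job has been marked as failed.

// A toy re-implementation (hypothetical, for illustration only) of the JobWaiter handshake.
class ToyWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
  private var finishedTasks = 0
  private var finished = totalTasks == 0   // a zero-task job is finished immediately
  private var failure: Option[Exception] = None

  // Called by the "scheduler" side when task `index` produces `result`.
  def taskSucceeded(index: Int, result: T): Unit = synchronized {
    resultHandler(index, result)
    finishedTasks += 1
    if (finishedTasks == totalTasks) {
      finished = true
      notifyAll()                          // wake up the thread blocked in awaitResult()
    }
  }

  // Called once if the job fails as a whole.
  def jobFailed(exception: Exception): Unit = synchronized {
    failure = Some(exception)
    finished = true
    notifyAll()
  }

  // Called by the "driver" side; blocks until one of the two callbacks above fires.
  def awaitResult(): Either[Exception, Unit] = synchronized {
    while (!finished) {
      wait()
    }
    failure.toLeft(())
  }
}

A driver thread would call awaitResult() while task-completion callbacks arrive from other threads, mirroring how the scheduler reports into JobWaiter.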

Next, let's look at the submitJob() method, defined as follows:

/**
   * Submit an action job to the scheduler.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   *   partitions of the target RDD, e.g. for operations like first()
   * @param callSite where in the user program this job was called
   * @param resultHandler callback to pass each result to
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   *
   * @return a JobWaiter object that can be used to block until the job finishes executing
   *         or can be used to cancel the job.
   *
   * @throws IllegalArgumentException when partitions ids are illegal
   */
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {

    // Check to make sure we are not launching a task on a partition that does not exist.
    // i.e. every requested partition id must fall within [0, rdd.partitions.length)
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    // Generate a jobId for this Job. nextJobId is an AtomicInteger, and getAndIncrement()
    // makes the allocation atomic, incrementing the counter for the next job
    val jobId = nextJobId.getAndIncrement()

    // If partitions is empty, there are no partitions to run on, so return right away
    if (partitions.size == 0) {
      // Return immediately if the job is running 0 tasks
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)

    // Cast func so that the JobSubmitted event can accept it: the element type T becomes _
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]

    // Create a JobWaiter object
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)

    // Post a JobSubmitted event to eventProcessLoop's event queue
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))

    // Return the JobWaiter
    waiter
  }

submitJob() does five things in total:

First, input checks: verify the requested partitions so that we never launch a task on a partition that does not exist, and if partitions is empty (there is nothing to run), return a trivial JobWaiter immediately (a user-side sketch of jobs that touch only some partitions follows right after this list);

Second, generate a jobId for the Job: nextJobId is an AtomicInteger, and getAndIncrement() makes the allocation atomic, incrementing the counter for each new job;

Third, cast func so that the JobSubmitted event can accept it, the element type T becoming _;

Fourth, create a JobWaiter object, waiter, which is returned to the caller at the end of the method so that the caller can monitor the Job's outcome;

Fifth, post a JobSubmitted event to the event queue eventProcessLoop, where the worker thread will pick it up for scheduling almost immediately.
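Before moving on to the event queue, a quick aside on the partitions argument mentioned in the first point. The demo below is hypothetical user code (not Spark source) showing why submitJob() takes an explicit partition list rather than always running on every partition of the target RDD:

import org.apache.spark.{SparkConf, SparkContext}

object PartialPartitionsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("partitions-demo").setMaster("local[2]"))
    val data = sc.parallelize(1 to 1000, 8)

    // count() needs every element, so the job it submits covers all 8 partitions.
    data.count()

    // first() only needs one element, so it starts with a job over partition 0 alone
    // and only submits jobs over further partitions if that one turns out to be empty.
    data.first()

    sc.stop()
  }
}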

Here it is worth digging into the event queue eventProcessLoop. It is of type DAGSchedulerEventProcessLoop, defined and initialized when the DAGScheduler is constructed:

// Create the member eventProcessLoop, of type DAGSchedulerEventProcessLoop
  private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

DAGSchedulerEventProcessLoop extends EventLoop, so let's first look at how EventLoop itself is defined.

/**
 * An event loop to receive events from the caller and process all events in the event thread. It
 * will start an exclusive event thread to process all events.
 *
 * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
 * handle events in time to avoid the potential OOM.
 */
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  // The event queue: a LinkedBlockingDeque whose elements are of type E
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  // Flag marking whether the loop has been stopped
  private val stopped = new AtomicBoolean(false)

  // The event-processing thread
  private val eventThread = new Thread(name) {
    // Run it as a daemon (background) thread
    setDaemon(true)

    override def run(): Unit = {
      try {
        // Keep looping until the stopped flag is set to true
        while (!stopped.get) {
          // Take one event from the event queue (blocking while it is empty)
          val event = eventQueue.take()
          try {
            // Hand the event to onReceive() for processing
            onReceive(event)
          } catch {
            case NonFatal(e) => {
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
            }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop
        onStopCalled = true
        onStop()
      } catch {
        case ie: InterruptedException =>
          Thread.currentThread().interrupt()
          if (!onStopCalled) {
            // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
            // it's already called.
            onStop()
          }
      }
    } else {
      // Keep quiet to allow calling `stop` multiple times.
    }
  }

  /**
   * Put the event into the event queue. The event thread will process it later.
   */
  def post(event: E): Unit = {
    // Put the event on the pending queue
    eventQueue.put(event)
  }

  /**
   * Return if the event thread has already been started but not yet stopped.
   */
  def isActive: Boolean = eventThread.isAlive

  /**
   * Invoked when `start()` is called but before the event thread starts.
   */
  protected def onStart(): Unit = {}

  /**
   * Invoked when `stop()` is called and the event thread exits.
   */
  protected def onStop(): Unit = {}

  /**
   * Invoked in the event thread when polling events from the event queue.
   *
   * Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be blocked
   * and cannot process events in time. If you want to call some blocking actions, run them in
   * another thread.
   */
  protected def onReceive(event: E): Unit

  /**
   * Invoked if `onReceive` throws any non fatal error. Any non fatal error thrown from `onError`
   * will be ignored.
   */
  protected def onError(e: Throwable): Unit

}

As we can see, EventLoop is essentially an event queue plus a thin wrapper of operations around it. Internally, it first defines a LinkedBlockingDeque-based event queue whose elements are of type E; in DAGSchedulerEventProcessLoop the stored events are of type DAGSchedulerEvent. The code is as follows:

// The event queue: a LinkedBlockingDeque whose elements are of type E
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

It then provides a background thread dedicated to watching the event queue and handing each event to onReceive() for processing:

// The event-processing thread
  private val eventThread = new Thread(name) {
    // Run it as a daemon (background) thread
    setDaemon(true)

    override def run(): Unit = {
      try {
        // Keep looping until the stopped flag is set to true
        while (!stopped.get) {
          // Take one event from the event queue (blocking while it is empty)
          val event = eventQueue.take()
          try {
            // Hand the event to onReceive() for processing
            onReceive(event)
          } catch {
            case NonFatal(e) => {
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
            }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

So how are events added to the queue? Simply call its post() method with the event:

/**
   * Put the event into the event queue. The event thread will process it later.
   */
  def post(event: E): Unit = {
    // Put the event on the pending queue
    eventQueue.put(event)
  }
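To make the EventLoop contract concrete, here is a minimal hypothetical subclass. LoggingEventLoop is an illustrative name, not a Spark class, and because EventLoop is private[spark] the sketch assumes it lives under the org.apache.spark.util package alongside EventLoop itself. post() enqueues an event, the daemon thread drains the queue and hands each event to onReceive(), and onError() sees any non-fatal error thrown by onReceive().

package org.apache.spark.util

// A hypothetical EventLoop subclass, used only to illustrate the contract.
private[spark] class LoggingEventLoop extends EventLoop[String]("logging-event-loop") {

  // Invoked on the event thread for every event taken from the queue.
  override protected def onReceive(event: String): Unit = {
    logInfo(s"received event: $event")
  }

  // Invoked if onReceive() throws a non-fatal error.
  override protected def onError(e: Throwable): Unit = {
    logError("failed to process event", e)
  }
}

// Usage sketch:
//   val loop = new LoggingEventLoop
//   loop.start()          // starts the daemon event thread (onStart() runs first)
//   loop.post("hello")    // enqueued; processed asynchronously by onReceive()
//   loop.stop()           // interrupts and joins the event thread, then calls onStop()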

Back to the main thread of discussion. As mentioned above, submitJob() uses eventProcessLoop's post() method to put a JobSubmitted event into the event queue. So how does DAGSchedulerEventProcessLoop handle a JobSubmitted event? Let's look at its onReceive() method:

/**
   * The main event loop of the DAG scheduler.
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      // Delegate to doOnReceive(), passing the DAGSchedulerEvent along
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }

Let's continue into doOnReceive(), whose code is as follows:

// The event-dispatch function
  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {

    // For a JobSubmitted event, call dagScheduler.handleJobSubmitted() to handle it
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    // For a MapStageSubmitted event, call dagScheduler.handleMapStageSubmitted() to handle it
    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)

    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

So a JobSubmitted event is handled by calling DAGScheduler's handleJobSubmitted() method.
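As a closing note on the dispatch style: doOnReceive() is plain Scala pattern matching over a family of event case classes, so adding a new event type means adding a case class and one more case clause. The stripped-down sketch below uses hypothetical types (ToyEvent, ToyDispatcher and friends are not Spark classes) to show the same shape:

// A stripped-down sketch of the doOnReceive() dispatch style, using hypothetical types.
sealed trait ToyEvent
case class ToyJobSubmitted(jobId: Int) extends ToyEvent
case class ToyJobCancelled(jobId: Int) extends ToyEvent
case object ToyResubmitFailedStages extends ToyEvent

class ToyDispatcher {
  // Each incoming event is matched on its case class and routed to the matching handler;
  // because ToyEvent is sealed, the compiler warns if a case is ever left unhandled.
  def doOnReceive(event: ToyEvent): Unit = event match {
    case ToyJobSubmitted(jobId)  => handleJobSubmitted(jobId)
    case ToyJobCancelled(jobId)  => handleJobCancellation(jobId)
    case ToyResubmitFailedStages => resubmitFailedStages()
  }

  private def handleJobSubmitted(jobId: Int): Unit = println(s"handling job $jobId")
  private def handleJobCancellation(jobId: Int): Unit = println(s"cancelling job $jobId")
  private def resubmitFailedStages(): Unit = println("resubmitting failed stages")
}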

That wraps up the first sub-phase, the Job scheduling model and execution feedback. The second and third sub-phases will be covered in follow-up posts~
