Spark 源码分析 -- task实际执行过程

Spark源码分析 – SparkContext 中的例子, 只分析到sc.runJob

那么最终是怎么执行的? 通过DAGScheduler切分成Stage, 封装成taskset, 提交给TaskScheduler, 然后等待调度, 最终到Executor上执行

val sc = new SparkContext(……)
val textFile = sc.textFile("README.md")
textFile.filter(line => line.contains("Spark")).count()

这是一个比较简单的没有shuffle的例子, 看看在Executor上是如何被执行的 
首先这个job只有一个stage, 所以只会产生resultTask

最关键的执行语句,

func(context, rdd.iterator(split, context))

对于这个例子, func就是最终产生结果的count(), 而rdd就是count前最后一个rdd, 即filter产生的rdd

可以看到Spark中rdd的执行, 不是从前往后, 而是从后往前推的, 为什么? 因为需要考虑cache和checkpoint

所以对于stage只会保留最后一个rdd, 其他的rdd通过dep去反推, 这里调用rdd.iterator来读取最后一个rdd

 

我可以说iterator是spark中最为核心的一个function吗:-)

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

如果结果被cache在memory或disk中, 则调用cacheManager.getOrCompute来读取, 否则直接从checkpoint读或compute 
通过CacheManager来完成从cache中读取数据, 或重新compute数据并且完成cache的过程

private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
  private val loading = new HashSet[String]

  /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
  def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
      : Iterator[T] = {
    val key = "rdd_%d_%d".format(rdd.id, split.index)
    blockManager.get(key) match {  // 从blockManager中获取cached值
      case Some(cachedValues) =>  // 从blockManager读到数据, 说明之前cache过, 直接返回即可
        // Partition is in cache, so just return its values
        return cachedValues.asInstanceOf[Iterator[T]]

      case None => // 没有读到数据说明没有cache过,需要重新load(compute或读cp)
        // Mark the split as loading (unless someone else marks it first)
        loading.synchronized { // 防止多次load相同的rdd, 加锁
          if (loading.contains(key)) {
            while (loading.contains(key)) {
              try {loading.wait()} catch {case _ : Throwable =>} // 如果已经在loading, 只需要wait
            }
            // See whether someone else has successfully loaded it. The main way this would fail
            // is for the RDD-level cache eviction policy if someone else has loaded the same RDD
            // partition but we didn't want to make space for it. However, that case is unlikely
            // because it's unlikely that two threads would work on the same RDD partition. One
            // downside of the current code is that threads wait serially if this does happen.
            blockManager.get(key) match {
              case Some(values) =>
                return values.asInstanceOf[Iterator[T]]
              case None =>
                logInfo("Whoever was loading " + key + " failed; we'll try it ourselves")
                loading.add(key)
            }
          } else {
            loading.add(key) // 记录当前key, 开始loading
          }
        }
        try {
          // If we got here, we have to load the split
          logInfo("Computing partition " + split)  // loading的过程,就是读cp或重新compute
          val computedValues = rdd.computeOrReadCheckpoint(split, context) // compute的结果是iterator, 何处遍历产生真实数据?
          // Persist the result, so long as the task is not running locally
          if (context.runningLocally) { return computedValues }
          val elements = new ArrayBuffer[Any]
          elements ++= computedValues  // ++会触发iterator的遍历产生data放到elements中
          blockManager.put(key, elements, storageLevel, true) // 对新产生的数据经行cache, 调用blockManager.put
          return elements.iterator.asInstanceOf[Iterator[T]]
        } finally {
          loading.synchronized {
            loading.remove(key)
            loading.notifyAll()
          }
        }
    }
  }
}

 

Task执行的结果, 如何传到DAGScheduler

task执行的结果value, 参考Spark 源码分析 -- Task 
对于ResultTask是计算的值,比如count值, 
对于ShuffleTask为MapStatus(blockManager.blockManagerId, compressedSizes), 其中compressedSizes所有shuffle buckets写到文件中的data size

//TaskRunner
val value = task.run(taskId.toInt)
val result = new TaskResult(value, accumUpdates, task.metrics.getOrElse(null))
context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)  //context,StandaloneExecutorBackend

//StandaloneExecutorBackend.statusUpdate
driver ! StatusUpdate(executorId, taskId, state, data)

//DriverActor.StatusUpdate
scheduler.statusUpdate(taskId, state, data.value)

//ClusterScheduler.statusUpdate
var taskSetToUpdate: Option[TaskSetManager] = None
taskSetToUpdate.get.statusUpdate(tid, state, serializedData)

//ClusterTaskSetManager.statusUpdate
case TaskState.FINISHED =>
  taskFinished(tid, state, serializedData)

//ClusterTaskSetManager.taskFinished
val result = ser.deserialize[TaskResult[_]](serializedData)
result.metrics.resultSize = serializedData.limit()
sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
  //tasks = taskSet.tasks
  //info为TaskInfo
  class TaskInfo(
    val taskId: Long,
    val index: Int,
    val launchTime: Long,
    val executorId: String,
    val host: String,
    val taskLocality: TaskLocality.TaskLocality) 

//DAGScheduler.taskEnded
  override def taskEnded(
      task: Task[_],
      reason: TaskEndReason,
      result: Any,
      accumUpdates: Map[Long, Any],
      taskInfo: TaskInfo,
      taskMetrics: TaskMetrics) {
    eventQueue.put(CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
  }

//DAGScheduler.processEvent
handleTaskCompletion(completion)

//DAGScheduler.handleTaskCompletion
......

本文章摘自博客园,原文发布日期:2014-01-21
时间: 2024-09-20 06:10:12

Spark 源码分析 -- task实际执行过程的相关文章

spark源码分析之Checkpoint的过程

概述 checkpoint 的机制保证了需要访问重复数据的应用 Spark 的DAG执行行图可能很庞大,task 中计算链可能会很长,这时如果 task 中途运行出错,那么 task 的整个需要重算非常耗时,因此,有必要将计算代价较大的 RDD checkpoint 一下,当下游 RDD 计算出错时,可以直接从 checkpoint 过的 RDD 那里读取数据继续算. 我们先来看一个例子,checkpoint的使用 import org.apache.spark.SparkContext imp

Spark源码分析之七:Task运行(一)

        在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在Task调度逻辑的最后,CoarseGrainedSchedulerBackend的内部类DriverEndpoint中的makeOffers()方法的最后,我们通过调用TaskSchedulerImpl的resourceOffers()方法,得到了TaskDescription序列的序列Seq

Spark源码分析之五:Task调度(一)

        在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段:         1.Job的调度模型与运行反馈:         2.Stage划分:         3.Stage提交:对应TaskSet的生成.         Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分

Spark源码分析之八:Task运行(二)

        在<Spark源码分析之七:Task运行(一)>一文中,我们详细叙述了Task运行的整体流程,最终Task被传输到Executor上,启动一个对应的TaskRunner线程,并且在线程池中被调度执行.继而,我们对TaskRunner的run()方法进行了详细的分析,总结出了其内Task执行的三个主要步骤:         Step1:Task及其运行时需要的辅助对象构造,主要包括:                        1.当前线程设置上下文类加载器:        

Spark源码分析之六:Task调度(二)

        话说在<Spark源码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这个方法针对接收到的ReviveOffers事件进行处理.代码如下: // Make fake resource offers on all executors     // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的)     private def makeOffers() {  

Spark源码分析之四:Stage提交

        各位看官,上一篇<Spark源码分析之Stage划分>详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交.         Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示:         与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那

Spark源码分析 – Shuffle

参考详细探究Spark的shuffle实现, 写的很清楚, 当前设计的来龙去脉   Hadoop Hadoop的思路是, 在mapper端每次当memory buffer中的数据快满的时候, 先将memory中的数据, 按partition进行划分, 然后各自存成小文件, 这样当buffer不断的spill的时候, 就会产生大量的小文件  所以Hadoop后面直到reduce之前做的所有的事情其实就是不断的merge, 基于文件的多路并归排序, 在map端的将相同partition的merge到

Spark源码分析之九:内存管理模型

        Spark是现在很流行的一个基于内存的分布式计算框架,既然是基于内存,那么自然而然的,内存的管理就是Spark存储管理的重中之重了.那么,Spark究竟采用什么样的内存管理模型呢?本文就为大家揭开Spark内存管理模型的神秘面纱.         我们在<Spark源码分析之七:Task运行(一)>一文中曾经提到过,在Task被传递到Executor上去执行时,在为其分配的TaskRunner线程的run()方法内,在Task真正运行之前,我们就要构造一个任务内存管理器Task

Spark源码分析之三:Stage划分

        继上篇<Spark源码分析之Job的调度模型与运行反馈>之后,我们继续来看第二阶段--Stage划分.         Stage划分的大体流程如下图所示:         前面提到,对于JobSubmitted事件,我们通过调用DAGScheduler的handleJobSubmitted()方法来处理.那么我们先来看下代码: // 处理Job提交的函数 private[scheduler] def handleJobSubmitted(jobId: Int, finalRD