SparkContext的初始化(叔篇)——TaskScheduler的启动

《深入理解Spark:核心思想与源码分析》一书前言的内容请看链接《深入理解SPARK:核心思想与源码分析》一书正式出版上市

《深入理解Spark:核心思想与源码分析》一书第一章的内容请看链接《第1章 环境准备》

《深入理解Spark:核心思想与源码分析》一书第二章的内容请看链接《第2章 SPARK设计理念与基本架构》

由于本书的第3章内容较多,所以打算分别开辟四篇随笔分别展现。

《深入理解Spark:核心思想与源码分析》一书第三章第一部分的内容请看链接《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(伯篇)》
《深入理解Spark:核心思想与源码分析》一书第三章第一部分的内容请看链接《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(仲篇)》

本文展现第3章第三部分的内容:

3.8 TaskScheduler的启动

  3.7节介绍了任务调度器TaskScheduler的创建,要想TaskScheduler发挥作用,必须要启动它,代码如下。

taskScheduler.start()
TaskScheduler在启动的时候,实际调用了backend的start方法。
  override def start() {
    backend.start()
  }

以LocalBackend为例,启动LocalBackend时向actorSystem注册了LocalActor,见代码清单3-30所示(在《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(仲篇)》一文中)。

3.8.1 创建LocalActor

  创建LocalActor的过程主要是构建本地的Executor,见代码清单3-36。
代码清单3-36 LocalActor的实现

private[spark] class LocalActor(scheduler: TaskSchedulerImpl, executorBackend: LocalBackend,
  private val totalCores: Int) extends Actor with ActorLogReceive with Logging {
  import context.dispatcher   // to use Akka's scheduler.scheduleOnce()
  private var freeCores = totalCores
  private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
  private val localExecutorHostname = "localhost"

  val executor = new Executor(
    localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true)

  override def receiveWithLogging = {
    case ReviveOffers =>
      reviveOffers()

    case StatusUpdate(taskId, state, serializedData) =>
      scheduler.statusUpdate(taskId, state, serializedData)
      if (TaskState.isFinished(state)) {
        freeCores += scheduler.CPUS_PER_TASK
        reviveOffers()
      }

    case KillTask(taskId, interruptThread) =>
      executor.killTask(taskId, interruptThread)

    case StopExecutor =>
      executor.stop()
  }

}

Executor的构建,见代码清单3-37,主要包括以下步骤:
1) 创建并注册ExecutorSource。ExecutorSource是做什么的呢?笔者将在3.10.2节详细介绍。
2) 获取SparkEnv。如果是非local模式,Worker上的CoarseGrainedExecutorBackend向Driver上的CoarseGrainedExecutorBackend注册Executor时,则需要新建SparkEnv。可以修改属性spark.executor.port(默认为0,表示随机生成)来配置Executor中的ActorSystem的端口号。
3) 创建并注册ExecutorActor。ExecutorActor负责接受发送给Executor的消息。
4) urlClassLoader的创建。为什么需要创建这个ClassLoader?在非local模式中,Driver或者Worker上都会有多个Executor,每个Executor都设置自身的urlClassLoader,用于加载任务上传的jar包中的类,有效对任务的类加载环境进行隔离。
5) 创建Executor执行TaskRunner任务(TaskRunner将在5.5节介绍)的线程池。此线程池是通过调用Utils.newDaemonCachedThreadPool创建的,具体实现请参阅附录A。
6) 启动Executor的心跳线程。此线程用于向Driver发送心跳。
此外,还包括Akka发送消息的帧大小(10485760字节)、结果总大小的字节限制(1073741824字节)、正在运行的task的列表、设置serializer的默认ClassLoader为创建的ClassLoader等。
代码清单3-37 Executor的构建

  val executorSource = new ExecutorSource(this, executorId)
private val env = {
    if (!isLocal) {
      val port = conf.getInt("spark.executor.port", 0)
      val _env = SparkEnv.createExecutorEnv(
        conf, executorId, executorHostname, port, numCores, isLocal, actorSystem)
      SparkEnv.set(_env)
      _env.metricsSystem.registerSource(executorSource)
      _env.blockManager.initialize(conf.getAppId)
      _env
    } else {
      SparkEnv.get
    }
  }

  private val executorActor = env.actorSystem.actorOf(
    Props(new ExecutorActor(executorId)), "ExecutorActor")

  private val urlClassLoader = createClassLoader()
  private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
  env.serializer.setDefaultClassLoader(urlClassLoader)

  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
  private val maxResultSize = Utils.getMaxResultSize(conf)

  val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
  private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
  startDriverHeartbeater()

3.8.2 ExecutorSource的创建与注册

  ExecutorSource用于测量系统。通过metricRegistry的register方法注册计量,这些计量信息包括threadpool.activeTasks、threadpool.completeTasks、threadpool.currentPool_size、threadpool.maxPool_size、filesystem.hdfs.write_bytes、filesystem.hdfs.read_ops、filesystem.file.write_bytes、filesystem.hdfs.largeRead_ops、filesystem.hdfs.write_ops等,ExecutorSource的实现见代码清单3-38。Metric接口的具体实现,参考附录D。
代码清单3-38 ExecutorSource的实现

private[spark] class ExecutorSource(val executor: Executor, executorId: String) extends Source {
  private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
    FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption

  private def registerFileSystemStat[T](
        scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {
    metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {
      override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue)
    })
  }
  override val metricRegistry = new MetricRegistry()
  override val sourceName = "executor"

metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
    override def getValue: Int = executor.threadPool.getActiveCount()
  })
 metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
    override def getValue: Long = executor.threadPool.getCompletedTaskCount()
  })
  metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {
    override def getValue: Int = executor.threadPool.getPoolSize()
  })
  metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
    override def getValue: Int = executor.threadPool.getMaximumPoolSize()
  })

  // Gauge for file system stats of this executor
  for (scheme <- Array("hdfs", "file")) {
    registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
    registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
    registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
    registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
    registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
  }
}

创建完ExecutorSource后,调用MetricsSystem的registerSource方法将ExecutorSource注册到MetricsSystem。registerSource方法使用MetricRegistry的register方法,将Source注册到MetricRegistry,见代码清单3-39。关于MetricRegistry,具体参阅附录D。
代码清单3-39 MetricsSystem注册Source的实现

  def registerSource(source: Source) {
    sources += source
    try {
      val regName = buildRegistryName(source)
      registry.register(regName, source.metricRegistry)
    } catch {
      case e: IllegalArgumentException => logInfo("Metrics already registered", e)
    }
  }

3.8.3 ExecutorActor的构建与注册

  ExecutorActor很简单,当接收到SparkUI发来的消息时,将所有线程的栈信息发送回去,代码实现如下。

  override def receiveWithLogging = {
    case TriggerThreadDump =>
      sender ! Utils.getThreadDump()
  }

3.8.4 Spark自身ClassLoader的创建

  获取要创建的ClassLoader的父加载器currentLoader,然后根据currentJars生成URL数组,spark.files.userClassPathFirst属性指定加载类时是否先从用户的classpath下加载,最后创建ExecutorURLClassLoader或者ChildExecutorURLClassLoader,见代码清单3-40。
代码清单3-40 Spark自身ClassLoader的创建

  private def createClassLoader(): MutableURLClassLoader = {
    val currentLoader = Utils.getContextOrSparkClassLoader

    val urls = currentJars.keySet.map { uri =>
      new File(uri.split("/").last).toURI.toURL
    }.toArray
    val userClassPathFirst = conf.getBoolean("spark.files.userClassPathFirst", false)
    userClassPathFirst match {
      case true => new ChildExecutorURLClassLoader(urls, currentLoader)
      case false => new ExecutorURLClassLoader(urls, currentLoader)
    }
  }

Utils.getContextOrSparkClassLoader的实现见附录A。ExecutorURLClassLoader或者ChildExecutorURLClassLoader实际上都继承了URLClassLoader,见代码清单3-41。

代码清单3-41 ChildExecutorURLClassLoader与ExecutorURLClassLoader的实现

private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
  extends MutableURLClassLoader {

  private object userClassLoader extends URLClassLoader(urls, null){
    override def addURL(url: URL) {
      super.addURL(url)
    }
    override def findClass(name: String): Class[_] = {
      super.findClass(name)
    }
  }

  private val parentClassLoader = new ParentClassLoader(parent)

  override def findClass(name: String): Class[_] = {
    try {
      userClassLoader.findClass(name)
    } catch {
      case e: ClassNotFoundException => {
        parentClassLoader.loadClass(name)
      }
    }
  }

  def addURL(url: URL) {
    userClassLoader.addURL(url)
  }

  def getURLs() = {
    userClassLoader.getURLs()
  }
}

private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
  extends URLClassLoader(urls, parent) with MutableURLClassLoader {

  override def addURL(url: URL) {
    super.addURL(url)
  }
}

如果需要REPL交互,还会调用addReplClassLoaderIfNeeded创建replClassLoader,见代码清单3-42。

代码清单3-42 addReplClassLoaderIfNeeded的实现

  private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {
    val classUri = conf.get("spark.repl.class.uri", null)
    if (classUri != null) {
      logInfo("Using REPL class URI: " + classUri)
      val userClassPathFirst: java.lang.Boolean =
        conf.getBoolean("spark.files.userClassPathFirst", false)
      try {
        val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
          .asInstanceOf[Class[_ <: ClassLoader]]
        val constructor = klass.getConstructor(classOf[SparkConf], classOf[String],
          classOf[ClassLoader], classOf[Boolean])
        constructor.newInstance(conf, classUri, parent, userClassPathFirst)
      } catch {
        case _: ClassNotFoundException =>
          logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
          System.exit(1)
          null
      }
    } else {
      parent
    }
  }

3.8.5 启动Executor的心跳线程

  Executor的心跳由startDriverHeartbeater启动,见代码清单3-43。Executor心跳线程的间隔由属性spark.executor.heartbeatInterval配置,默认是10000毫秒。此外,超时时间是30秒,超时重试次数是3次,重试间隔是3000毫秒,使用actorSystem.actorSelection (url)方法查找到匹配的Actor引用, url是akka.tcp://sparkDriver@ $driverHost:$driverPort/user/HeartbeatReceiver,最终创建一个运行过程中,每次会休眠10000到20000毫秒的线程。此线程从runningTasks获取最新的有关Task的测量信息,将其与executorId、blockManagerId封装为Heartbeat消息,向HeartbeatReceiver发送Heartbeat消息。
代码清单3-43 启动Executor的心跳线程

  def startDriverHeartbeater() {
    val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)
    val timeout = AkkaUtils.lookupTimeout(conf)
    val retryAttempts = AkkaUtils.numRetries(conf)
    val retryIntervalMs = AkkaUtils.retryWaitMs(conf)
    val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf,env.actorSystem)
    val t = new Thread() {
      override def run() {
        // Sleep a random interval so the heartbeats don't end up in sync
        Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])
        while (!isStopped) {
          val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
          val curGCTime = gcTime
          for (taskRunner <- runningTasks.values()) {
            if (!taskRunner.attemptedTask.isEmpty) {
              Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
                metrics.updateShuffleReadMetrics
                metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
                if (isLocal) {
                  val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
                  tasksMetrics += ((taskRunner.taskId, copiedMetrics))
                } else {
                  // It will be copied by serialization
                  tasksMetrics += ((taskRunner.taskId, metrics))
                }
              }
            }
          }
          val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
          try {
            val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
              retryAttempts, retryIntervalMs, timeout)
            if (response.reregisterBlockManager) {
              logWarning("Told to re-register on heartbeat")
              env.blockManager.reregister()
            }
          } catch {
            case NonFatal(t) => logWarning("Issue communicating with driver in heartbeater", t)
          }
          Thread.sleep(interval)
        }
      }
    }
    t.setDaemon(true)
    t.setName("Driver Heartbeater")
    t.start()
  }

这个心跳线程的作用是什么呢?其作用有两个:
 更新正在处理的任务的测量信息;
 通知BlockManagerMaster,此Executor上的BlockManager依然活着。

下面对心跳线程的实现详细分析下,读者可以自行选择是否需要阅读。
初始化TaskSchedulerImpl后会创建心跳接收器HeartbeatReceiver。HeartbeatReceiver接受所有分配给当前Driver Application的Executor的心跳,并将Task、Task计量信息、心跳等交给TaskSchedulerImpl和DAGScheduler作进一步处理。创建心跳接收器的代码如下。

  private val heartbeatReceiver = env.actorSystem.actorOf(
    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")

HeartbeatReceiver在收到心跳消息后,会调用TaskScheduler的executorHeartbeatReceived方法,代码如下。

  override def receiveWithLogging = {
    case Heartbeat(executorId, taskMetrics, blockManagerId) =>
      val response = HeartbeatResponse(
        !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
      sender ! response
  }

executorHeartbeatReceived的实现代码如下。

    val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized {
      taskMetrics.flatMap { case (id, metrics) =>
        taskIdToTaskSetId.get(id)
          .flatMap(activeTaskSets.get)
          .map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics))
      }
    }
    dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)

这段程序通过遍历taskMetrics,依据taskIdToTaskSetId和activeTaskSets找到TaskSetManager。然后将taskId、TaskSetManager.stageId、TaskSetManager .taskSet.attempt、TaskMetrics封装到Array[(Long, Int, Int, TaskMetrics)]的数组metricsWithStageIds中。最后调用了dagScheduler的executorHeartbeatReceived方法,其实现如下。

    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
    implicit val timeout = Timeout(600 seconds)

    Await.result(
      blockManagerMaster.driverActor ? BlockManagerHeartbeat(blockManagerId),
      timeout.duration).asInstanceOf[Boolean]

dagScheduler将executorId、metricsWithStageIds封装为SparkListenerExecutorMetricsUpdate事件,并post到listenerBus中,此事件用于更新Stage的各种测量数据。最后给BlockManagerMaster持有的BlockManagerMasterActor发送BlockManagerHeartbeat消息。BlockManagerMasterActor在收到消息后会匹配执行heartbeatReceived方法(会在4.3.1节介绍)。heartbeatReceived最终更新BlockManagerMaster对BlockManager最后可见时间(即更新BlockManagerId对应的BlockManagerInfo的_lastSeenMs,见代码清单3-44)。
代码清单3-44 BlockManagerMasterActor的心跳处理

private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
    if (!blockManagerInfo.contains(blockManagerId)) {
      blockManagerId.isDriver && !isLocal
    } else {
      blockManagerInfo(blockManagerId).updateLastSeenMs()
      true
    }
  }

local模式下Executor的心跳通信过程,可以用图3-3来表示。

图3-3 Executor的心跳通信过程

注意:在非local模式中Executor发送心跳的过程是一样的,主要的区别是Executor进程与Driver不在同一个进程,甚至不在同一个节点上。

接下来会初始化块管理器BlockManager,代码如下。

env.blockManager.initialize(applicationId)

具体的初始化过程,请参阅第4章。

时间: 2024-09-19 09:17:41

SparkContext的初始化(叔篇)——TaskScheduler的启动的相关文章

SparkContext的初始化(季篇)——测量系统、ContextCleaner等组件介绍

<深入理解Spark:核心思想与源码分析>一书前言的内容请看链接<深入理解SPARK:核心思想与源码分析>一书正式出版上市 <深入理解Spark:核心思想与源码分析>一书第一章的内容请看链接<第1章 环境准备> <深入理解Spark:核心思想与源码分析>一书第二章的内容请看链接<第2章 SPARK设计理念与基本架构> 由于本书的第3章内容较多,所以打算分别开辟四篇随笔分别展现. <深入理解Spark:核心思想与源码分析>一

SparkContext的初始化(伯篇)——执行环境与元数据清理器

<深入理解Spark:核心思想与源码分析>一书前言的内容请看链接<深入理解SPARK:核心思想与源码分析>一书正式出版上市 <深入理解Spark:核心思想与源码分析>一书第一章的内容请看链接<第1章 环境准备> <深入理解Spark:核心思想与源码分析>一书第二章的内容请看链接<第2章 SPARK设计理念与基本架构> 由于本书的第3章内容较多,所以打算分别开辟三篇随笔分别展现.本文展现第3章第一部分的内容: 第3章 SparkCont

SparkContext的初始化(仲篇)——SparkUI、环境变量及调度

<深入理解Spark:核心思想与源码分析>一书前言的内容请看链接<深入理解SPARK:核心思想与源码分析>一书正式出版上市 <深入理解Spark:核心思想与源码分析>一书第一章的内容请看链接<第1章 环境准备> <深入理解Spark:核心思想与源码分析>一书第二章的内容请看链接<第2章 SPARK设计理念与基本架构> 由于本书的第3章内容较多,所以打算分别开辟四篇随笔分别展现. <深入理解Spark:核心思想与源码分析>一

《深入理解Spark:核心思想与源码分析》——3.8节TaskScheduler的启动

3.8 TaskScheduler的启动3.6节介绍了任务调度器TaskScheduler的创建,要想TaskScheduler发挥作用,必须要启动它,代码如下.taskScheduler.start()TaskScheduler在启动的时候,实际调用了backend的start方法. override def start() { backend.start() } 以LocalBackend为例,启动LocalBackend时向actorSystem注册了LocalActor,见代码清单3-3

《深入理解Spark:核心思想与源码分析》——第3章SparkContext的初始化

第3章 SparkContext的初始化 道生一, 一生二, 二生三, 三生万物. -<道德经> 本章导读 SparkContext的初始化是Driver应用程序提交执行的前提,本章内容以local模式为主,并按照代码执行顺序讲解,这将有助于首次接触Spark的读者理解源码.读者朋友如果能边跟踪代码,边学习本章内容,也许是快速理解SparkContext初始化过程的便捷途径.已经熟练使用Spark的开发人员可以选择跳过本章内容. 本章将在介绍SparkContext初始化过程的同时,向读者介绍

2000条你应知的WPF小姿势 基础篇&lt;28-33 WPF启动故事&gt;

原文:2000条你应知的WPF小姿势 基础篇<28-33 WPF启动故事> 在正文开始之前需要介绍一个人:Sean Sexton. 来自明尼苏达双城的软件工程师.最为出色的是他维护了两个博客:2,000Things You Should Know About C#  和 2,000 Things You Should Know About WPF .他以类似微博式的150字简短语言来每天更新一条WPF和C#重要又容易被遗忘的知识.Follow他的博客也有一段日子了,很希望能够分享给大家. 本系

Tomcat源码阅读#3:Server初始化与启动

b) Catalina.load() b1) initDirs() -> set properties like catalina.home catalina.base == catalina.home (most cases) b2) initNaming setProperty(javax.naming.Context.INITIAL_CONTEXT_FACTORY, org.apache.naming.java.javaURLContextFactory ->default) b3) c

docker中mysql初始化及启动失败问题解决方案_docker

最近做项目,遇到这样问题,docker 中的mysql 不能启动,经过上网查资料,终于解决了这个问题,这里记录下,也许还能帮助到大家,  在docker中有一个mysql服务,其数据文件是挂在在主机外面的文件,在docker中的root有访问该数据文件的权限,但是docker中mysql访问数据文件的时候提示权限不足,于是只有以root用户来启动mysql了.    数据初始化: mysql_install_db --user=root --explicit_defaults_for_times

《深入理解Spark:核心思想与源码分析》——3.1节SparkContext概述

3.1 SparkContext概述 Spark Driver用于提交用户应用程序,实际可以看作Spark的客户端.了解Spark Driver的初始化,有助于读者理解用户应用程序在客户端的处理过程. Spark Driver的初始化始终围绕着SparkContext的初始化.SparkContext可以算得上是所有Spark应用程序的发动机引擎,轿车要想跑起来,发动机首先要启动.SparkContext初始化完毕,才能向Spark集群提交任务.在平坦的公路上,发动机只需以较低的转速.较低的功率