SparkContext的初始化(季篇)——测量系统、ContextCleaner等组件介绍

《深入理解Spark:核心思想与源码分析》一书前言的内容请看链接《深入理解SPARK:核心思想与源码分析》一书正式出版上市

《深入理解Spark:核心思想与源码分析》一书第一章的内容请看链接《第1章 环境准备》

《深入理解Spark:核心思想与源码分析》一书第二章的内容请看链接《第2章 SPARK设计理念与基本架构》

由于本书的第3章内容较多,所以打算分别开辟四篇随笔分别展现。

《深入理解Spark:核心思想与源码分析》一书第三章第一部分的内容请看链接《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(伯篇)》
《深入理解Spark:核心思想与源码分析》一书第三章第一部分的内容请看链接《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(仲篇)》
《深入理解Spark:核心思想与源码分析》一书第三章第三部分的内容请看链接《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(叔篇)》

本文展现第3章第四部分的内容:

3.9 启动测量系统MetricsSystem

  MetricsSystem使用codahale提供的第三方测量仓库Metrics,有关Metrics的具体信息可以参考附录D。MetricsSystem中有三个概念:

  • Instance:指定了谁在使用测量系统;
  • Source:指定了从哪里收集测量数据;
  • Sink:指定了往哪里输出测量数据。
    Spark按照Instance的不同,区分为Master、Worker、Application、Driver和Executor。Spark目前提供的Sink有ConsoleSink、CsvSink、JmxSink、MetricsServlet、GraphiteSink等。Spark中使用MetricsServlet作为默认的Sink。MetricsSystem的启动代码如下。

    val metricsSystem = env.metricsSystem
    metricsSystem.start()
    

    MetricsSystem的启动过程包括以下步骤:
    1) 注册Sources;
    2) 注册Sinks;
    3) 给Sinks增加Jetty的ServletContextHandler。
    MetricsSystem启动完毕后,会遍历与Sinks有关的ServletContextHandler,并调用attachHandler将它们绑定到SparkUI上。

    metricsSystem.getServletHandlers.foreach(handler=> ui.foreach(_.attachHandler(handler)))
    

    3.9.1 注册Sources

      registerSources方法用于注册Sources,它的实现见代码清单3-44。注册Sources的过程分为以下步骤:
    1) 从metricsConfig获取Driver的Properties,默认为创建MetricsSystem的过程中解析的{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet,sink.servlet.path=/metrics/json}。
    2) 从Driver的Properties中用正则匹配以source.开头的属性。然后将属性中的Source反射得到的实例,加入ArrayBuffer[Source]。
    3) 将每个Source的metricRegistry(也是MetricSet的子类型)注册到ConcurrentMap metrics。这里的registerSource方法已在3.8.2节讲解过。
    代码清单3-44 MetricsSystem

    private def registerSources() {
    val instConfig =metricsConfig.getInstance(instance)
    val sourceConfigs =metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)  
    
    //Register all the sources related to instance
    sourceConfigs.foreach{ kv =>
    val classPath = kv._2.getProperty("class")
    try {
      val source =Class.forName(classPath).newInstance()
      registerSource(source.asInstanceOf[Source])
    } catch {
      case e: Exception =>logError("Source class" + classPath + " cannot beinstantiated", e)
    }
    }
    }
    

    3.9.2 注册Sinks

      registerSinks方法用于注册Sinks,它的实现见代码清单3-45。注册Sinks的步骤如下:
    1) 从Driver的Properties中用正则匹配以sink.开头的属性,如:{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet,sink.servlet.path=/metrics/json}。将其转换为Map(servlet-> {class=org.apache.spark.metrics.sink.MetricsServlet, path=/metrics/json})。
    2) 将子属性class对应的类metricsServlet反射得到MetricsServlet实例。如果属性的key是servlet,将其设置为metricsServlet;如果是Sink,则加入到ArrayBuffer[Sink]中。
    代码清单3-45 MetricsSystem注册Sinks的实现

    private def registerSinks() {
    val instConfig = metricsConfig.getInstance(instance)
    val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
    sinkConfigs.foreach { kv =>
    val classPath = kv._2.getProperty("class")
    if (null != classPath) {
      try {
        val sink = Class.forName(classPath)
          .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
          .newInstance(kv._2, registry, securityMgr)
        if (kv._1 == "servlet") {
          metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
        } else {
          sinks += sink.asInstanceOf[Sink]
        }
      } catch {
        case e: Exception => logError("Sink class "+ classPath + " cannot be instantialized",e)
      }
    }
    }
    }
    

    3.9.3 给Sinks增加Jetty的ServletContextHandler

      MetricsSystem的getServletHandlers方法,实现如下。

    def getServletHandlers = {
    require(running, "Canonly call getServletHandlers on a running MetricsSystem")
    metricsServlet.map(_.getHandlers).getOrElse(Array())
    }
    

    可以看到调用了metricsServlet的getHandlers,其实现如下。

    def getHandlers = Array[ServletContextHandler](
    createServletHandler(servletPath,
     newServletParams(request => getMetricsSnapshot(request), "text/json"), securityMgr)
    )
    

    最终生成处理/metrics/json请求的ServletContextHandler,而请求的真正处理由getMetricsSnapshot方法,利用fastjson解析。生成的ServletContextHandler通过SparkUI的attachHandler方法,也被绑定到SparkUI。createServletHandler与attachHandler方法都已经在3.4.4节详细阐述。最终我们可以使用以下这些地址来访问测量数据。
    http://localhost:4040/metrics/applications/json
    http://localhost:4040/metrics/json
    http://localhost:4040/metrics/master/json

3.10 创建和启动ExecutorAllocationManager

  ExecutorAllocationManager用于动态分配executor,创建和启动ExecutorAllocationManager的代码如下。

 private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
   if (conf.getBoolean("spark.dynamicAllocation.enabled",false)) {
     Some(newExecutorAllocationManager(this, listenerBus, conf))
   } else {
     None
   }
executorAllocationManager.foreach(_.start())

默认情况下不会创建ExecutorAllocationManager,可以修改属性spark.dynamicAllocation.enabled为true来创建。ExecutorAllocationManager可以设置动态分配最小Executor数量、动态分配最大Executor数量、每个Executor可以运行的Task数量等配置信息,并对配置信息进行校验。start方法将ExecutorAllocationListener加入到listenerBus中,ExecutorAllocationListener通过监听listenerBus里的事件,动态添加删除executor。并且通过Thread不断的添加executor,并且遍历executor,将超时的executor杀掉并且移除。ExecutorAllocationListener的实现与其他SparkListener类似,不再赘述。ExecutorAllocationManager的关键代码见代码清单3-46。
代码清单3-46 ExecutorAllocationManagerr的关键代码

private valintervalMillis: Long = 100
private var clock: Clock = new RealClock
private val listener = newExecutorAllocationListener
def start():Unit = {
 listenerBus.addListener(listener)
 startPolling()
}  

private defstartPolling(): Unit = {
  val t = new Thread {
    override def run(): Unit= {
      while (true) {
        try {
         schedule()
        } catch {
          case e: Exception =>logError("Exception in dynamic executor allocation thread!", e)
        }
       Thread.sleep(intervalMillis)
      }
    }
  }
  t.setName("spark-dynamic-executor-allocation")
  t.setDaemon(true)
  t.start()
}

根据3.4.1节的内容,我们知道listenerBus内置了线程listenerThread,此线程不断从eventQueue中拉出事件对象,调用监听器的监听方法。要启动此线程,需要调用listenerBus的start方法,代码如下。

listenerBus.start()

3.11 ContextCleaner的创建与启动

  由于配置属性spark.cleaner.referenceTracking默认是true,所以会构造并启动ContextCleaner,代码如下。

private[spark] val cleaner:Option[ContextCleaner] = {
  if (conf.getBoolean("spark.cleaner.referenceTracking",true)) {
    Some(newContextCleaner(this))
  } else {
    None
  }
}
cleaner.foreach(_.start())

ContextCleaner用于清理那些超出应用范围的RDD、ShuffleDependency和Broadcast对象。ContextCleaner的组成如下:

  • referenceQueue:缓存顶级的AnyRef引用;
  • referenceBuffer:缓存AnyRef的虚引用;
  • listeners:缓存清理工作的监听器数组;
  • cleaningThread:用于具体清理工作的线程。
    ContextCleaner的工作原理和listenerBus一样,也采用监听器模式,由线程来处理,此线程实际只是调用keepCleaning方法。keepCleaning的实现见代码清单3-47。
    代码清单3-47 ContextCleaner的实现
private defkeepCleaning(): Unit = Utils.logUncaughtExceptions {
  while (!stopped) {
    try {
      val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
       .map(_.asInstanceOf[CleanupTaskWeakReference])
      //Synchronize here to avoid being interrupted on stop()
     synchronized {
        reference.map(_.task).foreach { task=>
         logDebug("Gotcleaning task " + task)
          referenceBuffer -= reference.get
         task match {
           case CleanRDD(rddId) =>
              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
           case CleanShuffle(shuffleId) =>
             doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
           case CleanBroadcast(broadcastId) =>
             doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
         }
        }
      }
    } catch {
      case ie:InterruptedException if stopped => // ignore
      case e: Exception =>logError("Error in cleaningthread", e)
    }
  }
}

3.12 Spark环境更新

  在SparkContext的初始化过程中,可能对其环境造成影响,所以需要更新环境,代码如下。

postEnvironmentUpdate()
postApplicationStart()

SparkContext初始化过程中,如果设置了spark.jars属性, spark.jars指定的jar包将由addJar方法加入到httpFileServer的jarDir变量指定的路径下。spark.files指定的文件将由addFile方法加入到httpFileServer的fileDir变量指定的路径下。见代码清单3-48。
代码清单3-48 依赖文件处理

  val jars: Seq[String] =
    conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size!= 0)).toSeq.flatten  

  val files: Seq[String] =
    conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size!= 0)).toSeq.flatten  

// Addeach JAR given through the constructor
  if (jars != null) {
    jars.foreach(addJar)
  }  

  if (files != null) {
    files.foreach(addFile)
  }

httpFileServer的addFile和addJar方法,见代码清单3-49。
代码清单3-49 HttpFileServer提供对依赖文件的访问

def addFile(file: File) : String = {
  addFileToDir(file, fileDir)
  serverUri + "/files/"+ file.getName
}  

def addJar(file:File) : String = {
  addFileToDir(file, jarDir)
  serverUri + "/jars/" +file.getName
}  

def addFileToDir(file: File, dir: File) : String = {
  if (file.isDirectory) {
    throw newIllegalArgumentException(s"$filecannot be a directory.")
  }
  Files.copy(file, new File(dir, file.getName))
  dir + "/" + file.getName
}

postEnvironmentUpdate的实现见代码清单3-50,其处理步骤如下:
1) 通过调用SparkEnv的方法environmentDetails最终影响环境的JVM参数、Spark 属性、系统属性、classPath等,参见代码清单3-51。
2) 生成事件SparkListenerEnvironmentUpdate,并post到listenerBus,此事件被EnvironmentListener监听,最终影响EnvironmentPage页面中的输出内容。
代码清单3-50 SparkContext环境更新

private def postEnvironmentUpdate() {
  if (taskScheduler != null) {
    val schedulingMode =getSchedulingMode.toString
    val addedJarPaths = addedJars.keys.toSeq
    val addedFilePaths = addedFiles.keys.toSeq
    val environmentDetails =
      SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)
    val environmentUpdate =SparkListenerEnvironmentUpdate(environmentDetails)
    listenerBus.post(environmentUpdate)
  }
}

代码清单3-51 environmentDetails的实现

val jvmInformation = Seq(
  ("JavaVersion", s"$javaVersion ($javaVendor)"),
  ("Java Home", javaHome),
  ("Scala Version", versionString)
).sorted  

val schedulerMode =
  if (!conf.contains("spark.scheduler.mode")) {
    Seq(("spark.scheduler.mode", schedulingMode))
  } else {
    Seq[(String, String)]()
  }
val sparkProperties = (conf.getAll ++ schedulerMode).sorted  

// System properties that are not java classpaths
val systemProperties = Utils.getSystemProperties.toSeq
val otherProperties = systemProperties.filter { case (k, _) =>
  k != "java.class.path" && !k.startsWith("spark.")
}.sorted  

// Class paths including all added jars andfiles
val classPathEntries = javaClassPath
  .split(File.pathSeparator)
  .filterNot(_.isEmpty)
  .map((_, "System Classpath"))
val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "AddedBy User"))
val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted  

Map[String, Seq[(String, String)]](
  "JVMInformation" -> jvmInformation,
  "Spark Properties" ->sparkProperties,
  "System Properties"-> otherProperties,
  "Classpath Entries"-> classPaths)

postApplicationStart方法很简单,只是向listenerBus发送了SparkListenerApplicationStart事件,代码如下。

listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
      startTime, sparkUser))

3.13 创建DAGSchedulerSource和BlockManagerSource

  在创建DAGSchedulerSource、BlockManagerSource之前首先调用taskScheduler的postStartHook方法,其目的是为了等待backend就绪,见代码清单3-52。postStartHook的实现见代码清单3-53。
创建DAGSchedulerSource和BlockManagerSource的过程类似于ExecutorSource,只不过DAGSchedulerSource测量的信息是stage. failedStages、stage.runningStages、stage. waitingStages、stage. allJobs、stage.activeJobs,BlockManagerSource测量的信息是memory. maxMem_MB、memory.remainingMem_MB、memory. memUsed_MB、memory. diskSpaceUsed_MB。
代码清单3-52 创建DAGSchedulerSource和BlockManagerSource

taskScheduler.postStartHook()  

private val dagSchedulerSource =new DAGSchedulerSource(this.dagScheduler)
private val blockManagerSource =new BlockManagerSource(SparkEnv.get.blockManager)  

ivate def initDriverMetrics() {
 SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
 SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
}  

initDriverMetrics()

代码清单3-53 等待backend就绪的实现

override def postStartHook() {
   waitBackendReady()
  }  

private def waitBackendReady(): Unit = {
    if (backend.isReady) {
      return
    }
    while (!backend.isReady) {
     synchronized {
        this.wait(100)
      }
    }
  }

3.14 将SparkContext标记为激活

  SparkContext初始化的最后将当前SparkContext的状态从contextBeingConstructed(正在构建中)改为activeContext(已激活),代码如下。
[java] view plain copy 在CODE上查看代码片派生到我的代码片
SparkContext.setActiveContext(this, allowMultipleContexts)
setActiveContext方法的实现如下。

private[spark] defsetActiveContext(
    sc:SparkContext,
   allowMultipleContexts: Boolean): Unit = {
 SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
   assertNoOtherContextIsRunning(sc, allowMultipleContexts)
   contextBeingConstructed = None
    activeContext =Some(sc)
  }
}

3.15 小结

  回顾本章, Scala与Akka基于Actor的并发编程模型给人带来深刻的印象,改变了我本人每当需要提升性能时就想到使用多线程的传统观念,Actor与事件模型有类似之处,通过异步处理,减少线程切换开销,值得开发人员借鉴。listenerBus对于监听器模式的经典应用将处理转化为事件并交给统一的线程处理,减少了线程阻塞与切换,提升了性能,希望读者朋友能应用到自己的产品开发中去。此外,使用Netty所提供的异步网络框架构建的Block传输服务,基于Jetty构建的内嵌web服务、HTTP文件服务器和SparkUI,基于codahale提供的第三方测量仓库创建的测量系统,Executor中的心跳实现等内容,都值得借鉴。

时间: 2024-08-03 16:11:25

SparkContext的初始化(季篇)——测量系统、ContextCleaner等组件介绍的相关文章

SparkContext的初始化(伯篇)——执行环境与元数据清理器

<深入理解Spark:核心思想与源码分析>一书前言的内容请看链接<深入理解SPARK:核心思想与源码分析>一书正式出版上市 <深入理解Spark:核心思想与源码分析>一书第一章的内容请看链接<第1章 环境准备> <深入理解Spark:核心思想与源码分析>一书第二章的内容请看链接<第2章 SPARK设计理念与基本架构> 由于本书的第3章内容较多,所以打算分别开辟三篇随笔分别展现.本文展现第3章第一部分的内容: 第3章 SparkCont

SparkContext的初始化(叔篇)——TaskScheduler的启动

<深入理解Spark:核心思想与源码分析>一书前言的内容请看链接<深入理解SPARK:核心思想与源码分析>一书正式出版上市 <深入理解Spark:核心思想与源码分析>一书第一章的内容请看链接<第1章 环境准备> <深入理解Spark:核心思想与源码分析>一书第二章的内容请看链接<第2章 SPARK设计理念与基本架构> 由于本书的第3章内容较多,所以打算分别开辟四篇随笔分别展现. <深入理解Spark:核心思想与源码分析>一

SparkContext的初始化(仲篇)——SparkUI、环境变量及调度

<深入理解Spark:核心思想与源码分析>一书前言的内容请看链接<深入理解SPARK:核心思想与源码分析>一书正式出版上市 <深入理解Spark:核心思想与源码分析>一书第一章的内容请看链接<第1章 环境准备> <深入理解Spark:核心思想与源码分析>一书第二章的内容请看链接<第2章 SPARK设计理念与基本架构> 由于本书的第3章内容较多,所以打算分别开辟四篇随笔分别展现. <深入理解Spark:核心思想与源码分析>一

《深入理解Spark:核心思想与源码分析》——第3章SparkContext的初始化

第3章 SparkContext的初始化 道生一, 一生二, 二生三, 三生万物. -<道德经> 本章导读 SparkContext的初始化是Driver应用程序提交执行的前提,本章内容以local模式为主,并按照代码执行顺序讲解,这将有助于首次接触Spark的读者理解源码.读者朋友如果能边跟踪代码,边学习本章内容,也许是快速理解SparkContext初始化过程的便捷途径.已经熟练使用Spark的开发人员可以选择跳过本章内容. 本章将在介绍SparkContext初始化过程的同时,向读者介绍

Vuejs第十篇之vuejs父子组件通信_javascript技巧

本篇文章是小编结合官方文档整理的一套更加细致,代码更多更全的教程,非常不错,比较适合新手阅读. 本篇资料来于官方文档: http://cn.vuejs.org/guide/components.html 父子组件通信 ①访问子组件.父组件.根组件: this.$parent 访问父组件 this.$children 访问子组件(是一个数组) this.$root 根实例的后代访问根实例 示例代码: <div id="app"> 父组件: <input v-model=

基本教程篇----第九节MasterSampleDemo.cs介绍

最近一直挺忙的,都没时间写博客了,好在这是基础篇的最后一篇了,我也可以歇歇了,关于其它的 深入章节我会在以后的时间补上的. 先来看看这一节的示图和源代码吧. using System; using System.Drawing; using System.Collections; using ZedGraph; namespace ZedGraph.Demo { ///<summary> /// Summary description for SimpleDemo. ///</summa

rpm 神器第二篇-multipkg 高阶用法介绍与实战

前言 前一篇文章中介绍了 multipkg 的安装和基本用法,这两天又结合之前的例子,整理了一篇类似于"实战系列"的文档, 详细说明了 rpm 中的一些细节配置在multipkg中怎么写,以便读者能通过multipkg实现rpm/spec的高阶功能:另外, 晚上总结了最近和以前的几个思路,给multipkg加了一些功能,还未合并到作者的master分支,需要使用的同学可以直接从我 的仓库来获取: git clone https://github.com/duanjigang1983/m

基本教程篇--第八节PieSampleDemo.cs介绍

这节我把饼形图粗略的介绍一下,如下图. using System; using System.Drawing; using System.Collections; using ZedGraph; namespace ZedGraph.Demo { ///<summary> /// Summary description for SimpleDemo. ///</summary> public class PieSampleDemo : DemoBase { public PieSa

iOS开发UI篇—UITableview控件简单介绍

一.基本介绍 在众多移动应⽤用中,能看到各式各样的表格数据 . 在iOS中,要实现表格数据展示,最常用的做法就是使用UITableView,UITableView继承自UIScrollView,因此支持垂直滚动,⽽且性能极佳 . UITableview有分组和不分组两种样式,可以在storyboard或者是用代码设置. 二.数据展示 UITableView需要⼀一个数据源(dataSource)来显示数据 UITableView会向数据源查询一共有多少行数据以及每⼀行显示什么数据等 没有设置数据