深入理解Spark:核心思想与源码分析. 3.4 SparkUI详解

3.4 SparkUI详解

任何系统都需要提供监控功能,用浏览器能访问具有样式及布局并提供丰富监控数据的页面无疑是一种简单、高效的方式。SparkUI就是这样的服务,它的架构如图3-1所示。

在大型分布式系统中,采用事件监听机制是最常见的。为什么要使用事件监听机制?假如SparkUI采用Scala的函数调用方式,那么随着整个集群规模的增加,对函数的调用会越来越多,最终会受到Driver所在JVM的线程数量限制而影响监控数据的更新,甚至出现监控数据无法及时显示给用户的情况。由于函数调用多数情况下是同步调用,这就导致线程被阻塞,在分布式环境中,还可能因为网络问题,导致线程被长时间占用。将函数调用更换为发送事件,事件的处理是异步的,当前线程可以继续执行后续逻辑,线程池中的线程还可以被重用,这样整个系统的并发度会大大增加。发送的事件会存入缓存,由定时调度器取出后,分配给监听此事件的监听器对监控数据进行更新。

 

图3-1 SparkUI架构

我们先简单介绍图3-1中的各个组件:DAGScheduler是主要的产生各类SparkListener-Event的源头,它将各种SparkListenerEvent发送到listenerBus的事件队列中,listenerBus通过定时器将SparkListenerEvent事件匹配到具体的SparkListener,改变SparkListener中的统计监控数据,最终由SparkUI的界面展示。从图3-1中还可以看到Spark里定义了很多监听器SparkListener的实现,包括JobProgressListener、EnvironmentListener、StorageListener、ExecutorsListener,它们的类继承体系如图3-2所示。

 

图3-2 SparkListener的类继承体系

3.4.1 listenerBus详解

listenerBus的类型是LiveListenerBus。LiveListenerBus实现了监听器模型,通过监听事件触发对各种监听器监听状态信息的修改,达到UI界面的数据刷新效果。LiveListenerBus由以下部分组成:

事件阻塞队列:类型为LinkedBlockingQueue[SparkListenerEvent],固定大小是10 000;

监听器数组:类型为ArrayBuffer[SparkListener],存放各类监听器SparkListener。

事件匹配监听器的线程:此Thread不断拉取LinkedBlockingQueue中的事件,遍历监听器,调用监听器的方法。任何事件都会在LinkedBlockingQueue中存在一段时间,然后Thread处理了此事件后,会将其清除。因此使用listenerBus这个名字再合适不过了,到站就下车。listenerBus的实现见代码清单3-15。

代码清单3-15 LiveListenerBus的事件处理实现

private val EVENT_QUEUE_CAPACITY = 10000

    private
val eventQueue = new
LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

   
private var queueFullErrorMessageLogged = false

   
private var started = false

   
// A counter that represents the number of events produced and consumed
in the queue

   
private val eventLock = new Semaphore(0)

 

   
private val listenerThread = new Thread("SparkListenerBus") {

     
setDaemon(true)

     
override def run(): Unit = Utils.logUncaughtExceptions {

       
while (true) {

           
eventLock.acquire()

           
// Atomically remove and process this event

           
LiveListenerBus.this.synchronized {

                val event = eventQueue.poll

                if (event ==
SparkListenerShutdown) {

                    // Get out of the while
loop and shutdown the daemon thread

                    return

                }

               
Option(event).foreach(postToAll)

           
}

       
}

    }

}

 

def start() {

   
if (started) {

       
throw new IllegalStateException("Listener bus already
started!")

    }

   
listenerThread.start()

   
started = true

    }

def post(event: SparkListenerEvent) {

   
val eventAdded = eventQueue.offer(event)

   
if (eventAdded) {

       
eventLock.release()

    }
else {

       
logQueueFullErrorMessage()

    }

}

 

def listenerThreadIsAlive: Boolean =
synchronized { listenerThread.isAlive }

 

def queueIsEmpty: Boolean = synchronized {
eventQueue.isEmpty }

 

def stop() {

   if
(!started) {

       
throw new IllegalStateException("Attempted to stop a listener bus
that has not yet started!")

    }

   
post(SparkListenerShutdown)

   
listenerThread.join()

}

LiveListenerBus中调用的postToAll方法实际定义在父类SparkListenerBus中,如代码清单3-16所示。

代码清单3-16 SparkListenerBus中的监听器调用

protected val sparkListeners = new
ArrayBuffer[SparkListener]

   
with mutable.SynchronizedBuffer[SparkListener]

 

def addListener(listener: SparkListener) {

   
sparkListeners += listener

}

 

def postToAll(event: SparkListenerEvent) {

   
event match {

       
case stageSubmitted: SparkListenerStageSubmitted =>

           
foreachListener(_.onStageSubmitted(stageSubmitted))

       
case stageCompleted: SparkListenerStageCompleted =>

           
foreachListener(_.onStageCompleted(stageCompleted))

       
case jobStart: SparkListenerJobStart =>

           
foreachListener(_.onJobStart(jobStart))

       
case jobEnd: SparkListenerJobEnd =>

           
foreachListener(_.onJobEnd(jobEnd))

       
case taskStart: SparkListenerTaskStart =>

           
foreachListener(_.onTaskStart(taskStart))

       
case taskGettingResult: SparkListenerTaskGettingResult =>

           
foreachListener(_.onTaskGettingResult(taskGettingResult))

       
case taskEnd: SparkListenerTaskEnd =>

           
foreachListener(_.onTaskEnd(taskEnd))

       
case environmentUpdate: SparkListenerEnvironmentUpdate =>

           
foreachListener(_.onEnvironmentUpdate(environmentUpdate))

       
case blockManagerAdded: SparkListenerBlockManagerAdded =>

           
foreachListener(_.onBlockManagerAdded(blockManagerAdded))

       
case blockManagerRemoved: SparkListenerBlockManagerRemoved =>

           
foreachListener(_.onBlockManagerRemoved(blockManagerRemoved))

       
case unpersistRDD: SparkListenerUnpersistRDD =>

         
  foreachListener(_.onUnpersistRDD(unpersistRDD))

       
case applicationStart: SparkListenerApplicationStart =>

           
foreachListener(_.onApplicationStart(applicationStart))

       
case applicationEnd: SparkListenerApplicationEnd =>

           
foreachListener(_.onApplicationEnd(applicationEnd))

       
case metricsUpdate: SparkListenerExecutorMetricsUpdate =>

           
foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))

       
case SparkListenerShutdown =>

    }

}

 

private def foreachListener(f:
SparkListener => Unit): Unit = {

   
sparkListeners.foreach { listener =>

       
try {

           
f(listener)

       
} catch {

           
case e: Exception =>

           
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw
an exception", e)

      
}

    }

}

3.4.2 构造JobProgressListener

我们以JobProgressListener为例来讲解SparkListener。JobProgressListener是SparkContext中一个重要的组成部分,通过监听listenerBus中的事件更新任务进度。SparkStatusTracker和SparkUI实际上也是通过JobProgressListener来实现任务状态跟踪的。创建JobProgressListener的代码如下。

private[spark] val jobProgressListener =
new JobProgressListener(conf)

listenerBus.addListener(jobProgressListener)

 

val statusTracker = new
SparkStatusTracker(this)

JobProgressListener的作用是通过HashMap、ListBuffer等数据结构存储JobId及对应的JobUIData信息,并按照激活、完成、失败等job状态统计。对于StageId、StageInfo等信息按照激活、完成、忽略、失败等Stage状态统计,并且存储StageId与JobId的一对多关系。这些统计信息最终会被JobPage和StagePage等页面访问和渲染。JobProgressListener的数据结构见代码清单3-17。

代码清单3-17 JobProgressListener维护的信息

class JobProgressListener(conf: SparkConf)
extends SparkListener with Logging {

 

   
import JobProgressListener._

 

   
type JobId = Int

   
type StageId = Int

   
type StageAttemptId = Int

   
type PoolName = String

   
type ExecutorId = String

 

    // Jobs:

   
val activeJobs = new HashMap[JobId, JobUIData]

   
val completedJobs = ListBuffer[JobUIData]()

   
val failedJobs = ListBuffer[JobUIData]()

   
val jobIdToData = new HashMap[JobId, JobUIData]

 

   
// Stages:

   
val activeStages = new HashMap[StageId, StageInfo]

   
val completedStages = ListBuffer[StageInfo]()

   
val skippedStages = ListBuffer[StageInfo]()

   
val failedStages = ListBuffer[StageInfo]()

   
val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]

   
val stageIdToInfo = new HashMap[StageId, StageInfo]

   
val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]]

   
val poolToActiveStages = HashMap[PoolName, HashMap[StageId,
StageInfo]]()

   
var numCompletedStages = 0         //
总共完成的Stage数量

   
var numFailedStages = 0         // 总共失败的Stage数量

 

   
// Misc:

   
val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()

   
def blockManagerIds = executorIdToBlockManagerId.values.toSeq

 

   
var schedulingMode: Option[SchedulingMode] = None

 

   
// number of non-active jobs and stages (there is no limit for active
jobs   and stages):

   
val retainedStages = conf.getInt("spark.ui.retainedStages",
DEFAULT_RETAINED_STAGES)

   
val retainedJobs = conf.getInt("spark.ui.retainedJobs",
DEFAULT_RETAINED_JOBS)

JobProgressListener 实现了onJobStart、onJobEnd、onStageCompleted、onStageSubmitted、onTaskStart、onTaskEnd等方法,这些方法正是在listenerBus的驱动下,改变JobProgress-Listener中的各种Job、Stage相关的数据。

3.4.3 SparkUI的创建与初始化

SparkUI的创建,见代码清单3-18。

代码清单3-18 SparkUI的声明

private[spark] val ui: Option[SparkUI] =

   
if (conf.getBoolean("spark.ui.enabled", true)) {

       
Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,

           
env.securityManager,appName))

    }
else {

       
None

    }

 

ui.foreach(_.bind())

可以看到如果不需要提供SparkUI服务,可以将属性spark.ui.enabled修改为false。其中createLiveUI实际是调用了create方法,见代码清单3-19。

代码清单3-19 SparkUI的创建

def createLiveUI(

       
sc: SparkContext,

       
conf: SparkConf,

       
listenerBus: SparkListenerBus,

       
jobProgressListener: JobProgressListener,

       
securityManager: SecurityManager,

       
appName: String): SparkUI =  {

   
create(Some(sc), conf, listenerBus, securityManager, appName,

       
jobProgressListener = Some(jobProgressListener))

  }

create方法的实现参见代码清单3-20。

代码清单3-20 creat方法的实现

private def create(

       
sc: Option[SparkContext],

       
conf: SparkConf,

       
listenerBus: SparkListenerBus,

       
securityManager: SecurityManager,

       
appName: String,

       
basePath: String = "",

       
jobProgressListener: Option[JobProgressListener] = None): SparkUI = {

 

   
val _jobProgressListener: JobProgressListener =
jobProgressListener.getOrElse {

       
val listener = new JobProgressListener(conf)

       
listenerBus.addListener(listener)

       
listener

    }

 

   
val environmentListener = new EnvironmentListener

   
val storageStatusListener = new StorageStatusListener

   
val executorsListener = new ExecutorsListener(storageStatusListener)

   
val storageListener = new StorageListener(storageStatusListener)

 

   
listenerBus.addListener(environmentListener)

   
listenerBus.addListener(storageStatusListener)

   
listenerBus.addListener(executorsListener)

   
listenerBus.addListener(storageListener)

 

   
new SparkUI(sc, conf, securityManager, environmentListener,
storageStatusListener,

       
executorsListener, _jobProgressListener, storageListener, appName,
basePath)

}

根据代码清单3-20,可以知道在create方法里除了JobProgressListener是外部传入的之外,又增加了一些SparkListener。例如,用于对JVM参数、Spark属性、Java系统属性、classpath等进行监控的EnvironmentListener;用于维护Executor的存储状态的StorageStatusListener;用于准备将Executor的信息展示在ExecutorsTab的ExecutorsListener;用于准备将Executor相关存储信息展示在BlockManagerUI的StorageListener等。最后创建SparkUI,Spark UI服务默认是可以被杀掉的,通过修改属性spark.ui.killEnabled为false可以保证不被杀死。initialize方法会组织前端页面各个Tab和Page的展示及布局,参见代码清单3-21。

代码清单3-21 SparkUI的初始化

private[spark] class SparkUI private (

   
val sc: Option[SparkContext],

   
val conf: SparkConf,

   
val securityManager: SecurityManager,

   
val environmentListener: EnvironmentListener,

   
val storageStatusListener: StorageStatusListener,

   
val executorsListener: ExecutorsListener,

   
val jobProgressListener: JobProgressListener,

   
val storageListener: StorageListener,

   
var appName: String,

   
val basePath: String)

extends WebUI(securityManager,
SparkUI.getUIPort(conf), conf, basePath, "SparkUI")

with Logging {

 

val killEnabled =
sc.map(_.conf.getBoolean("spark.ui.killEnabled",
true)).getOrElse(false)

 

/** Initialize all components of the
server. */

def initialize() {

   
attachTab(new JobsTab(this))

   
val stagesTab = new StagesTab(this)

   
attachTab(stagesTab)

   
attachTab(new StorageTab(this))

   
attachTab(new EnvironmentTab(this))

   
attachTab(new ExecutorsTab(this))

   
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR,
"/static"))

   
attachHandler(createRedirectHandler("/", "/jobs",
basePath = basePath))

   
attachHandler(

       
createRedirectHandler("/stages/stage/kill",
"/stages", stagesTab.handleKillRequest))

}

initialize()

3.4.4 Spark UI的页面布局与展示

SparkUI究竟是如何实现页面布局及展示的?JobsTab展示所有Job的进度、状态信息,这里我们以它为例来说明。JobsTab会复用SparkUI的killEnabled、SparkContext、job-ProgressListener,包括AllJobsPage和JobPage两个页面,见代码清单3-22。

代码清单3-22 JobsTab的实现

private[ui] class JobsTab(parent: SparkUI)
extends SparkUITab(parent, "jobs") {

   
val sc = parent.sc

   
val killEnabled = parent.killEnabled

   
def isFairScheduler = listener.schedulingMode.exists(_ ==
SchedulingMode.FAIR)

   
val listener = parent.jobProgressListener

 

   
attachPage(new AllJobsPage(this))

   
attachPage(new JobPage(this))

}

AllJobsPage由render方法渲染,利用jobProgressListener中的统计监控数据生成激活、完成、失败等状态的Job摘要信息,并调用jobsTable方法生成表格等html元素,最终使用UIUtils的headerSparkPage封装好css、js、header及页面布局等,见代码清单3-23。

代码清单3-23 AllJobsPage的实现

def render(request: HttpServletRequest):
Seq[Node] = {

   
listener.synchronized {

       
val activeJobs = listener.activeJobs.values.toSeq

       
val completedJobs = listener.completedJobs.reverse.toSeq

       
val failedJobs = listener.failedJobs.reverse.toSeq

       
val now = System.currentTimeMillis

 

       
val activeJobsTable =

           
jobsTable(activeJobs.sortBy(_.startTime.getOrElse(-1L)).reverse)

       
val completedJobsTable =

           
jobsTable(completedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)

       
val failedJobsTable =

           
jobsTable(failedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)

 

       
val summary: NodeSeq =

           
<div>

                <ul
class="unstyled">

                    {if (startTime.isDefined) {

                        // Total duration is
not meaningful unless the UI is live

                        <li>

                            <strong>Total
Duration: </strong>

                           
{UIUtils.formatDuration(now - startTime.get)}

                        </li>

                    }}

                    <li>

                       
<strong>Scheduling Mode: </strong>

                       
{listener.schedulingMode.map(_.toString).getOrElse("Unknown")}

                    </li>

                    <li>

                        <a
href="#active"><strong>Active Jobs:</strong></a>

                        {activeJobs.size}

                    </li>

                    <li>

                        <a
href="#completed"><strong>Completed
Jobs:</strong></a>

                        {completedJobs.size}

                    </li>

                    <li>

                        <a
href="#failed"><strong>Failed Jobs:</strong></a>

                        {failedJobs.size}

                    </li>

                </ul>

           
</div>

jobsTable用来生成表格数据,见代码清单3-24。

代码清单3-24 jobsTable处理表格的实现

private def jobsTable(jobs:
Seq[JobUIData]): Seq[Node] = {

   
val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)

 

   
val columns: Seq[Node] = {

       
<th>{if (someJobHasJobGroup) "Job Id (Job Group)" else
"Job Id"}</th>

       
<th>Description</th>

       
<th>Submitted</th>

       
<th>Duration</th>

       
<th class="sorttable_nosort">Stages:
Succeeded/Total</th>

       
<th class="sorttable_nosort">Tasks (for all stages):
Succeeded/Total</th>

    }

 

   
<table class="table table-bordered table-striped table-condensed
sortable">

       
<thead>{columns}</thead>

       
<tbody>

           
{jobs.map(makeRow)}

     
  </tbody>

   
</table>

}

表格中每行数据又是通过makeRow方法渲染的,参见代码清单3-25。

代码清单3-25 生成表格中的行

def makeRow(job: JobUIData): Seq[Node] = {

   
val lastStageInfo = Option(job.stageIds)

       
.filter(_.nonEmpty)

       
.flatMap { ids => listener.stageIdToInfo.get(ids.max) }

   
val lastStageData = lastStageInfo.flatMap { s =>

       
listener.stageIdToData.get((s.stageId, s.attemptId))

    }

   
val isComplete = job.status == JobExecutionStatus.SUCCEEDED

   
val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown
Stage Name)")

   
val lastStageDescription =
lastStageData.flatMap(_.description).getOrElse("")

   
val duration: Option[Long] = {

       
job.startTime.map { start =>

           
val end = job.endTime.getOrElse(System.currentTimeMillis())

       
end - start

       
}

    }

   
val formattedDuration = duration.map(d =>
UIUtils.formatDuration(d)).getOrElse("Unknown")

   
val formattedSubmissionTime =
job.startTime.map(UIUtils.formatDate).getOrElse("Unknown")

   
val detailUrl =

       
"%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath),
job.jobId)

   
<tr>

       
<td sorttable_customkey={job.jobId.toString}>

           
{job.jobId} {job.jobGroup.map(id =>
s"($id)").getOrElse("")}

       
</td>

       
<td>

     
      <div><em>{lastStageDescription}</em></div>

           
<a href={detailUrl}>{lastStageName}</a>

       
</td>

           
<td sorttable_customkey={job.startTime.getOrElse(-1).toString}>

           
{formattedSubmissionTime}

       
</td>

       
<td sorttable_customkey={duration.getOrElse(-1).toString}>{formatted-Duration}</td>

       
<td class="stage-progress-cell">

           
{job.completedStageIndices.size}/{job.stageIds.size -
job.numSkipped-Stages}

           
{if (job.numFailedStages > 0) s"(${job.numFailedStages}
failed)"}

           
{if (job.numSkippedStages > 0) s"(${job.numSkippedStages}
skipped)"}

       
</td>

       
<td class="progress-cell">

           
{UIUtils.makeProgressBar(started = job.numActiveTasks, completed =
job.numCompletedTasks,

           
failed = job.numFailedTasks, skipped = job.numSkippedTasks,

           
total = job.numTasks - job.numSkippedTasks)}

       
</td>

   
</tr>

}

代码清单3-22中的attachPage方法存在于JobsTab的父类WebUITab中,WebUITab维护有ArrayBuffer[WebUIPage]的数据结构,AllJobsPage和JobPage将被放入此ArrayBuffer中,参见代码清单3-26。

代码清单3-26 WebUITab的实现

private[spark] abstract class
WebUITab(parent: WebUI, val prefix: String) {

   
val pages = ArrayBuffer[WebUIPage]()

   
val name = prefix.capitalize

 

   
/** Attach a page to this tab. This prepends the page's prefix with the
tab's own prefix. */

   
def attachPage(page: WebUIPage) {

       
page.prefix = (prefix + "/" +
page.prefix).stripSuffix("/")

       
pages += page

    }

 

   
/** Get a list of header tabs from the parent UI. */

   
def headerTabs: Seq[WebUITab] = parent.getTabs

 

   
def basePath: String = parent.getBasePath

}

JobsTab创建之后,将被attachTab方法加入SparkUI的ArrayBuffer[WebUITab]中,并且通过attachPage方法,给每一个page生成org.eclipse.jetty.servlet.ServletContextHandler,最后调用attachHandler方法将ServletContextHandler绑定到SparkUI,即加入到handlers
:ArrayBuffer[ServletContextHandler]和样例类ServerInfo的rootHandler(ContextHandlerCollection)中。SparkUI继承自WebUI,attachTab方法在WebUI中实现,参见代码清单3-27。

代码清单3-27 WebUI的实现

private[spark] abstract class WebUI(
securityManager: SecurityManager, port: Int,

       
conf: SparkConf, basePath: String = "", name: String =
"") extends Logging {

 

   
protected val tabs = ArrayBuffer[WebUITab]()

   
protected val handlers = ArrayBuffer[ServletContextHandler]()

   
protected var serverInfo: Option[ServerInfo] = None

   
protected val localHostName = Utils.localHostName()

   
protected val publicHostName =
Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)

   
private val className = Utils.getFormattedClassName(this)

 

   
def getBasePath: String = basePath

   
def getTabs: Seq[WebUITab] = tabs.toSeq

   
def getHandlers: Seq[ServletContextHandler] = handlers.toSeq

   
def getSecurityManager: SecurityManager = securityManager

 

   
/** Attach a tab to this UI, along with all of its attached pages. */

   
def attachTab(tab: WebUITab) {

       
tab.pages.foreach(attachPage)

       
tabs += tab

    }

 

   
/** Attach a page to this UI. */

   
def attachPage(page: WebUIPage) {

      
 val pagePath = "/" +
page.prefix

       
attachHandler(createServletHandler(pagePath,

       
(request: HttpServletRequest) => page.render(request),
securityManager, basePath))

   
attachHandler(createServletHandler(pagePath.stripSuffix("/") +
"/json",

        (request: HttpServletRequest) =>
page.renderJson(request), security-Manager, basePath))

}

 

   
/** Attach a handler to this UI. */

   
def attachHandler(handler: ServletContextHandler) {

       
handlers += handler

       
serverInfo.foreach { info =>

           
info.rootHandler.addHandler(handler)

           
if (!handler.isStarted) {

                handler.start()

       
}

    }

}

由于代码清单3-27所在的类中使用import org.apache.spark.ui.JettyUtils._导入了JettyUtils的静态方法,所以createServletHandler方法实际是JettyUtils
的静态方法createServletHandler。createServletHandler实际创建了javax.servlet.http.HttpServlet的匿名内部类实例,此实例实际使用(request:
HttpServletRequest) => page.render(request)函数参数来处理请求,进而渲染页面呈现给用户。有关createServletHandler的实现及Jetty的相关信息,请参阅附录C。

3.4.5 SparkUI的启动

SparkUI创建好后,需要调用父类WebUI的bind方法,绑定服务和端口,bind方法中主要的代码实现如下。

serverInfo =
Some(startJettyServer("0.0.0.0", port, handlers, conf, name))

JettyUtils的静态方法startJettyServer的实现请参阅附录C。最终启动了Jetty提供的服务,默认端口是4040。

时间: 2024-09-13 04:53:00

深入理解Spark:核心思想与源码分析. 3.4 SparkUI详解的相关文章

《深入理解Spark:核心思想与源码分析》——3.4节SparkUI详解

3.4 SparkUI详解 任何系统都需要提供监控功能,用浏览器能访问具有样式及布局并提供丰富监控数据的页面无疑是一种简单.高效的方式.SparkUI就是这样的服务,它的架构如图3-1所示. 在大型分布式系统中,采用事件监听机制是最常见的.为什么要使用事件监听机制?假如SparkUI采用Scala的函数调用方式,那么随着整个集群规模的增加,对函数的调用会越来越多,最终会受到Driver所在JVM的线程数量限制而影响监控数据的更新,甚至出现监控数据无法及时显示给用户的情况.由于函数调用多数情况下是

《深入理解Spark:核心思想与源码分析》——2.3节Spark基本设计思想

2.3 Spark基本设计思想2.3.1 Spark模块设计 整个Spark主要由以下模块组成: Spark Core:Spark的核心功能实现,包括:SparkContext的初始化(Driver Application通过SparkContext提交).部署模式.存储体系.任务提交与执行.计算引擎等. Spark SQL:提供SQL处理能力,便于熟悉关系型数据库操作的工程师进行交互查询.此外,还为熟悉Hadoop的用户提供Hive SQL处理能力. Spark Streaming:提供流式计

《深入理解Spark:核心思想与源码分析》——第1章环境准备

第1章 环 境 准 备 凡事豫则立,不豫则废:言前定,则不跲:事前定,则不困. -<礼记·中庸> 本章导读 在深入了解一个系统的原理.实现细节之前,应当先准备好它的源码编译环境.运行环境.如果能在实际环境安装和运行Spark,显然能够提升读者对于Spark的一些感受,对系统能有个大体的印象,有经验的技术人员甚至能够猜出一些Spark采用的编程模型.部署模式等.当你通过一些途径知道了系统的原理之后,难道不会问问自己:"这是怎么做到的?"如果只是游走于系统使用.原理了解的层面,

《深入理解Spark:核心思想与源码分析》——第3章SparkContext的初始化

第3章 SparkContext的初始化 道生一, 一生二, 二生三, 三生万物. -<道德经> 本章导读 SparkContext的初始化是Driver应用程序提交执行的前提,本章内容以local模式为主,并按照代码执行顺序讲解,这将有助于首次接触Spark的读者理解源码.读者朋友如果能边跟踪代码,边学习本章内容,也许是快速理解SparkContext初始化过程的便捷途径.已经熟练使用Spark的开发人员可以选择跳过本章内容. 本章将在介绍SparkContext初始化过程的同时,向读者介绍

《深入理解Spark:核心思想与源码分析》——1.4节Spark源码编译与调试

1.4 Spark源码编译与调试 1.下载Spark源码 首先,访问Spark官网http://spark.apache.org/,如图1-18所示. 2.构建Scala应用 使用cmd命令行进到Spark根目录,执行sbt命令.会下载和解析很多jar包,要等很长时间,笔者大概花了一个多小时才执行完. 3.使用sbt生成Eclipse工程文件 等sbt提示符(>)出现后,输入Eclipse命令,开始生成Eclipse工程文件,也需要花费很长时间,笔者本地大致花了40分钟.完成时的状况如图1-21

《深入理解Spark:核心思想与源码分析》——3.1节SparkContext概述

3.1 SparkContext概述 Spark Driver用于提交用户应用程序,实际可以看作Spark的客户端.了解Spark Driver的初始化,有助于读者理解用户应用程序在客户端的处理过程. Spark Driver的初始化始终围绕着SparkContext的初始化.SparkContext可以算得上是所有Spark应用程序的发动机引擎,轿车要想跑起来,发动机首先要启动.SparkContext初始化完毕,才能向Spark集群提交任务.在平坦的公路上,发动机只需以较低的转速.较低的功率

《深入理解Spark:核心思想与源码分析》——1.5节小结

1.5 小结 本章通过引导大家在Linux操作系统下搭建基本的执行环境,并且介绍spark-shell等脚本的执行,来帮助读者由浅入深地进行Spark源码的学习.由于目前多数开发工作都在Windows系统下进行,并且Eclipse有最广大的用户群,即便是一些开始使用IntelliJ的用户对Eclipse也不陌生,所以在Windows环境下搭建源码阅读环境时,选择这些最常用的工具,能降低读者的学习门槛,并且替大家节省时间.

《深入理解Spark:核心思想与源码分析》——2.2节Spark基础知识

2.2 Spark基础知识 1.版本变迁 经过4年多的发展,Spark目前的版本是1.4.1.我们简单看看它的版本发展过程. 1)Spark诞生于UCBerkeley的AMP实验室(2009). 2)Spark正式对外开源(2010年). 3)Spark 0.6.0版本发布(2012-10-15),进行了大范围的性能改进,增加了一些新特性,并对Standalone部署模式进行了简化. 4)Spark 0.6.2版本发布(2013-02-07),解决了一些bug,并增强了系统的可用性. 5)Spa

《深入理解Spark:核心思想与源码分析》——3.6节创建任务调度器TaskScheduler

3.6 创建任务调度器TaskScheduler TaskScheduler也是SparkContext的重要组成部分,负责任务的提交,并且请求集群管理器对任务调度.TaskScheduler也可以看做任务调度的客户端.创建TaskScheduler的代码如下. private[spark] var (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master) createTaskSchedu

《深入理解Spark:核心思想与源码分析》——2.4节Spark基本架构

2.4 Spark基本架构从集群部署的角度来看,Spark集群由以下部分组成:Cluster Manager:Spark的集群管理器,主要负责资源的分配与管理.集群管理器分配的资源属于一级分配,它将各个Worker上的内存.CPU等资源分配给应用程序,但是并不负责对Executor的资源分配.目前,Standalone.YARN.Mesos.EC2等都可以作为Spark的集群管理器.Worker:Spark的工作节点.对Spark应用程序来说,由集群管理器分配得到资源的Worker节点主要负责以