Apache Spark源码走读(十)ShuffleMapTask计算结果的保存与读取 &WEB UI和Metrics初始化及数据更新过程分析

<一>ShuffleMapTask计算结果的保存与读取

概要

ShuffleMapTask的计算结果保存在哪,随后Stage中的task又是如何知道从哪里去读取的呢,这个过程一直让我困惑不已。

用比较通俗一点的说法来解释一下Shuffle数据的写入和读取过程

  1. 每一个task负责处理一个特定的data partition
  2. task在初始化的时候就已经明确处理结果可能会产生多少个不同的data partition
  3. 利用partitioner函数,task将处理结果存入到不同的partition,这些数据存放在当前task执行的机器上
  4. 假设当前是stage 2有两个task, stage 2可能输出4个不同的data partition, task 0和task 1各自运行于不同的机器上,task 0中的部分处理结果会存入到data partition 0,task 1的部分处理结果也可能存入到data partition 0.
  5. 由于stage 2产生了4个不同的data partition, 后续stage 1中的task个数就为4. task 0 就负责读取data partition 0的数据,对于(stage1, task0)来说,所要读取的data partition 0的内容由task 0和task 1中的partition 0共同组成。
  6. 现在问题的关键转换成为(stage_1, task_0)如何知道(stage_2, task_x)有没有相应的输出是属于data partition 0的呢?这个问题的解决就是MapStatus
  7. 每一个ShuffleMapTask在执行结束,都会上报一个MapStatus,在MapStatus中会反应出朝哪些data partition写入了数据,写入了数据则size为非零值,否则为零值
  8. (stage_1,task_0)会去获取stage_2中所有task的MapStatus,以判定(stage_2, task_x)产生的数据中有自己需要读入的内容
  9. 假设(stage_1,task_0)知道(stage_2, task_0)生成了data partition 0中的数据,于是去(stage_2, task_0)运行时的机器去获取具体的数据,如果恰巧这个时候远端机器已经挂掉了,获取失败,怎么办?
  10. 上报异常,由DAGScheduler重新调度(stage_2,task_0),重新生成所需要的数据。
  11. Spark不像Hadoop中的MapReduce有一个明显的combine阶段,在spark中combine过程有两次调用,一是Shuffle数据写入过程,另一个是Shuffle数据读取过程。

如果能够明白上述的过程,并对应到相应的代码,那就无须看下述的详细解释了。

好了,让我们开始代码跟踪吧。

数据写入过程

数据写入动作最原始的触发点是ShuffleMapTask.runTask函数,看一看源码先。

  override def runTask(context: TaskContext): MapStatus = {
    metrics = Some(context.taskMetrics)
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      writer.write(rdd.iterator(split, context).asInstanceOf[Iterator[_
        if (writer != null) {
          writer.stop(success = false)
        }
        throw e
    } finally {
      context.executeOnCompleteCallbacks()
    }
  }

managerGetWriter返回的是HashShuffleWriter,所以调用过程是:ShuffleMapTask.runTask->HashShuffleWriter.write->BlockObjectWriter.write. 注意dep.mapSideCombine这一分支判断。ReduceByKey(_ + _)中的(_ + _)在此处被执行一次,另一次执行是在read过程。

  override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
    val iter = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        dep.aggregator.get.combineValuesByKey(records, context)
      } else {
        records
      }
    } else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
      throw new IllegalStateException("Aggregator is empty for map-side combine")
    } else {
      records
    }

    for (elem <- iter) {
      val bucketId = dep.partitioner.getPartition(elem._1)
      shuffle.writers(bucketId).write(elem)
    }

HashShuffleWriter.write中主要处理两件事:

  1. 判断是否需要进行聚合,比如<hello,1>和<hello,1>都要写入的话,那么先生成<hello,2>然后再进行后续的写入工作
  2. 利用Partitioner函数来决定<k,val>写入到哪一个文件中

Partitioner是在什么时候注入的,RDD抽象类中,Partitioner为空?以reduceByKey为例,HashPartitioner会在后面combineByKey的代码创建ShuffledRDD的时候作为ShuffledRDD的构造函数传入。

  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
    reduceByKey(new HashPartitioner(numPartitions), func)
  }

Stage在创建的时候通过构造函数入参明确需要从多少Partition读取数据,生成的Partition会有多少。看一看Stage的构造函数,读取的分区数目由RDD.partitions.size决定,输出的partitions由shuffleDep决定。

private[spark] class Stage(
    val id: Int,
    val rdd: RDD[_],
    val numTasks: Int,
    val shuffleDep: Option[ShuffleDependency[_, _, _]],  // Output shuffle if stage is a map stage
    val parents: List[Stage],
    val jobId: Int,
    val callSite: CallSite)
extends Logging {
  val isShuffleMap = shuffleDep.isDefined
  val numPartitions = rdd.partitions.size
  val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
  var numAvailableOutputs = 0
  private var nextAttemptId = 0

回到数据写入的问题上来,结果写入时的一个主要问题就是已经知道shuffle_id, map_id和要写入的elem,如何找到对应的写入文件。每一个临时文件由三元组(shuffle_id,map_id,reduce_id)来决定,当前已经知道了两个,还剩下一下reduce_id待确定。

reduce_id是使用partitioner计算出来的结果,输入的是elem的键值。也就是dep.partitioner.getPartition(elem._1)。 根据计算出来的bucketid找到对应的writer,然后真正写入。

在HashShuffleWriter.write中使用到的shuffle由ShuffleBlockManager的forMapTask函数生成,注意forMapTask中产生writers的代码逻辑。

每个writer分配一下文件, 文件名由三元组(shuffle_id,map_id,reduce_id)组成,如果知道了这个三元组就可以找到对应的文件。

如果consolidation没有打开,那么在一个task中,有多少个输出的partition就会有多少个中间文件。

      val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
        fileGroup = getUnusedFileGroup()
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize)
        }
      } else {
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          val blockFile = blockManager.diskBlockManager.getFile(blockId)
          // Because of previous failures, the shuffle file may already exist on this machine.
          // If so, remove it.
          if (blockFile.exists) {
            if (blockFile.delete()) {
              logInfo(s"Removed existing shuffle file $blockFile")
            } else {
              logWarning(s"Failed to remove existing shuffle file $blockFile")
            }
          }
          blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
        }
      }

getFile负责将三元组(shuffle_id,map_id,reduce_id)映射到文件名

def getFile(filename: String): File = {
    // Figure out which local directory it hashes to, and which subdirectory in that
    val hash = Utils.nonNegativeHash(filename)
    val dirId = hash % localDirs.length
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

    // Create the subdirectory if it doesn't already exist
    var subDir = subDirs(dirId)(subDirId)
    if (subDir == null) {
      subDir = subDirs(dirId).synchronized {
        val old = subDirs(dirId)(subDirId)
        if (old != null) {
          old
        } else {
          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
          newDir.mkdir()
          subDirs(dirId)(subDirId) = newDir
          newDir
        }
      }
    }

    new File(subDir, filename)
  }

  def getFile(blockId: BlockId): File = getFile(blockId.name)

产生的文件在哪呢,如果没有更改默认的配置,生成的目录结构类似于下

/tmp/spark-local-20140723092540-7f24
/tmp/spark-local-20140723092540-7f24/0d
/tmp/spark-local-20140723092540-7f24/0d/shuffle_0_0_1
/tmp/spark-local-20140723092540-7f24/0d/shuffle_0_1_0
/tmp/spark-local-20140723092540-7f24/0c
/tmp/spark-local-20140723092540-7f24/0c/shuffle_0_0_0
/tmp/spark-local-20140723092540-7f24/0e
/tmp/spark-local-20140723092540-7f24/0e/shuffle_0_1_1 

当所有的数据写入文件并提交以后,还需要生成MapStatus汇报给driver application. MapStatus在哪生成的呢?commitWritesAndBuildStatus就干这活。

调用关系HashShuffleWriter.stop->commitWritesAndBuildStatus

private def commitWritesAndBuildStatus(): MapStatus = {
    // Commit the writes. Get the size of each bucket block (total block size).
    var totalBytes = 0L
    var totalTime = 0L
    val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
      writer.commit()
      writer.close()
      val size = writer.fileSegment().length
      totalBytes += size
      totalTime += writer.timeWriting()
      MapOutputTracker.compressSize(size)
    }

    // Update shuffle metrics.
    val shuffleMetrics = new ShuffleWriteMetrics
    shuffleMetrics.shuffleBytesWritten = totalBytes
    shuffleMetrics.shuffleWriteTime = totalTime
    metrics.shuffleWriteMetrics = Some(shuffleMetrics)

    new MapStatus(blockManager.blockManagerId, compressedSizes)
  }

compressedSize是一个非常让人疑惑的地方,原因慢慢道来,先看一下MapStatus的构造函数

class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])

 compressedSize是一个byte数组,每一个byte反应了该partiton中的数据大小。如Array(0)=128就表示在data partition 0中有128byte数据。

问题的问题是一个byte只能表示255,如果超过255怎么办呢?

当当当,数学闪亮登场了,注意到compressSize没,通过转换将2^8变换为1.1^256。一下子由255byte延伸到近35G.

看一看这神奇的compressSize函数吧,只是聊聊几行代码而已。

  def compressSize(size: Long): Byte = {
    if (size == 0) {
      0
    } else if (size <= 1L) {
      1
    } else {
      math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
    }
  }

 ShuffleMapTask运行结束时,会将MapStatus结果封装在StatusUpdate消息中汇报给SchedulerBackend, 由DAGScheduler在handleTaskCompletion函数中将MapStatus加入到相应的Stage。这一过程略过,不再详述。

MapOutputTrackerMaster会保存所有最新的MapStatus.

只画张图来表示存储之后的示意。

 

数据读取过程

ShuffledRDD.compute函数是读取过程的触发点。

  override def compute(split: Partition, context: TaskContext): Iterator[P] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[P]]
  }

shuffleManager.getReader返回的是HashShuffleReader,所以看一看HashShuffleReader中的read函数的具体实现。

read函数处理逻辑中需要注意到一点即combine过程有可能会被再次执行。注意dep.aggregator.isDefined这一分支判断。ReduceByKey(_ + _)中的(_ + _)在此处被执行。

override def read(): Iterator[Product2[K, C]] = {
    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context,
      Serializer.getSerializer(dep.serializer))

    if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
      } else {
        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
      }
    } else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
      throw new IllegalStateException("Aggregator is empty for map-side combine")
    } else {
      iter
    }
  }

一路辗转,终于来到了读取过程中非常关键的所在BlockStoreShuffleFetcher。

BlockStoreShuffleFetcher需要回答如下问题

  1. 所要获取的mapid的mapstatus的内容是什么
  2. 根据获得的mapstatus去相应的blockmanager获取具体的数据
 val blockManager = SparkEnv.get.blockManager

  val startTime = System.currentTimeMillis
  val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
  logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
  shuffleId, reduceId, System.currentTimeMillis - startTime))

  val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
  for (((address, size), index)
    (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
  }
  val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
  val itr = blockFetcherItr.flatMap(unpackBlock)

一个ShuffleMapTask会生成一个MapStatus,MapStatus中含有当前ShuffleMapTask产生的数据落到各个Partition中的大小。如果大小为0,则表示该分区没有数据产生。MapStatus中另一个重要的成员变量就是BlockManagerId,该变量表示目标数据在哪个BlockManager当中。

MapoutputTrackerMaster拥有最新的MapStatus信息,为了执行效率,MapoutputTrackerWorker会定期更新数据到本地,所以MapoutputTracker先从本地查找,如果找不到再从MapoutputTrackerMaster上同步最新数据。

索引即是reduceId,如果array(0) == 0,就表示上一个ShuffleMapTask中生成的数据中没有任意的内容可以作为reduceId为0的ResultTask的输入。如果不能理解,返回仔细看一下MapStatus的结构图。

BlockManager.getMultiple用于读取BlockManager中的数据,根据配置确定生成tNettyBlockFetcherIterator还是BasicBlockFetcherIterator。

如果所要获取的文件落在本地,则调用getLocal读取,否则发送请求到远端blockmanager。看一下BlockFetcherIterator的initialize函数

    override def initialize() {
      // Split local and remote blocks.
      val remoteRequests = splitLocalRemoteBlocks()
      // Add the remote requests into our queue in a random order
      fetchRequests ++= Utils.randomize(remoteRequests)

      // Send out initial requests for blocks, up to our maxBytesInFlight
      while (!fetchRequests.isEmpty &&
        (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
        sendRequest(fetchRequests.dequeue())
      }

      val numFetches = remoteRequests.size - fetchRequests.size
      logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))

      // Get Local Blocks
      startTime = System.currentTimeMillis
      getLocalBlocks()
      logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
}

至此,数据读取的正常流程讲述完毕。

数据读取异常

如果数据读取中碰到异常怎么办?比如,

  1. 已知(stage_2,task_0)产生的parition_0的数据在机器m1, 当前任务在m2执行,于是从m2向m1发起远程获取请求,如果m2中拥有目标数据的JVM进程异常退出,则相应的目标数据无法获取。

如果无法获取目标数据,就会上报FetchFailedException.

    def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) : Iterator[T] = {
      val blockId = blockPair._1
      val blockOption = blockPair._2
      blockOption match {
        case Some(block) => {
          block.asInstanceOf[Iterator[T]]
        }
        case None => {
          blockId match {
            case ShuffleBlockId(shufId, mapId, _) =>
              val address = statuses(mapId.toInt)._1
              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId)
            case _ =>
              throw new SparkException(
                "Failed to get block " + blockId + ", which is not a shuffle block")
          }
        }
      }
    }

 FetchFailedExecption会被包装在StatutsUpdate上报给SchedulerBackend,然后一路处理下去,最终将丢失目标数据的归属Task重新提交。比如当前是(stage_1, task_0),需要读取(stage_2, task_1)产生的目标数据,但是对应的目标数据丢失,这个时候就需要将(stage_2, task_1)重新提交运行。

注意DAGScheduler中的FetchFailed处理分支,一路跟踪下去就会看到任务被重新提交了

  case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
        // Mark the stage that the reducer was in as unrunnable
        val failedStage = stageIdToStage(task.stageId)
        runningStages -= failedStage
        // TODO: Cancel running tasks in the stage
        logInfo("Marking " + failedStage + " (" + failedStage.name +
          ") for resubmision due to a fetch failure")
        // Mark the map whose fetch failed as broken in the map stage
        val mapStage = shuffleToMapStage(shuffleId)
        if (mapId != -1) {
          mapStage.removeOutputLoc(mapId, bmAddress)
          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
        }
        logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
          "); marking it for resubmission")
        if (failedStages.isEmpty && eventProcessActor != null) {
          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
          // in that case the event will already have been scheduled. eventProcessActor may be
          // null during unit tests.
          import env.actorSystem.dispatcher
          env.actorSystem.scheduler.scheduleOnce(
            RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
        }
        failedStages += failedStage
        failedStages += mapStage
        // TODO: mark the executor as failed only if there were lots of fetch failures on it
        if (bmAddress != null) {
          handleExecutorLost(bmAddress.executorId, Some(task.epoch))
        }

文件清除

生成的中间数据是在什么时候被清除的呢?

当Driver Application退出的时候,该Application生成的临时文件将会被一一清除,注意是application结束生命,不是job。一个application可以包含一至多个job。

实验

以local-cluster方式运行spark-shell,观察/tmp/spark-local*目录下的文件变化,具体指令如下

MASTER=local-cluster[2,2,512] bin/spark-shell
#进入spark-shell之后,输入
sc.textFile("README.md").flatMap(_.split(" ")).map(w=>(w,1)).reduceByKey(_ + _)

小结

Shuffle数据的写入和读取是Spark Core这一部分最为复杂的内容,彻底了解该部分内容才能深刻意识到Spark实现的精髓所在。

<二>WEB UI和Metrics初始化及数据更新过程分析

概要

WEB UI和Metrics子系统为外部观察监测Spark内部运行情况提供了必要的窗口,本文将简略的过一下其内部代码实现。

WEB UI

先上图感受一下spark webui 假设当前已经在本机运行standalone cluster模式,输入http://127.0.0.1:8080将会看到如下页面

  driver application默认会打开4040端口进行http监听,可以看到application相关的详细信息

显示每个stage的详细信息

启动过程

本节要讨论的重点是http server是如何启动的,页面中的数据是从哪里获取到的?Spark中用到的http server是jetty, jetty采用java编写,是非常轻巧的servlet engine和http server。能够嵌入到用户程序中执行,不用像tomcat或jboss那样需要自己独立的jvm进程。

SparkUI在SparkContext初始化的时候创建

// Initialize the Spark UI , registering all
associated listeners
private [spark] val ui = new SparkUI (this)
ui.bind ()

initialize的主要工作是注册页面处理句柄,WebUI的子类需要实现自己的initialize函数

bind将真正启动jetty server.

def bind () {
assert (! serverInfo .isDefined , " Attempted to bind %
s more than once!". format ( className ))
try {
// 启 动 JettyServer
serverInfo = Some( startJettyServer (" 0.0.0.0 ",
port , handlers , conf))
logInfo (" Started %s at http ://%s:%d". format (
className , publicHostName , boundPort ))
} catch {
case e: Exception =>
logError (" Failed to bind %s". format ( className )
, e)
System .exit (1)
}
}

在startJettyServer函数中将JettyServer运行起来的关键处理函数是connect

 def connect(currentPort: Int): (Server, Int) = {
      val server = new Server(new InetSocketAddress(hostName, currentPort))
      val pool = new QueuedThreadPool
      pool.setDaemon(true)
      server.setThreadPool(pool)
      server.setHandler(collection)

      Try {
        server.start()
      } match {
        case s: Success[_] =>
          (server, server.getConnectors.head.getLocalPort)
        case f: Failure[_] =>
          val nextPort = (currentPort + 1) % 65536
          server.stop()
          pool.stop()
          val msg = s"Failed to create UI on port $currentPort. Trying again on port $nextPort."
          if (f.toString.contains("Address already in use")) {
            logWarning(s"$msg - $f")
          } else {
            logError(msg, f.exception)
          }
          connect(nextPort)
      }
    }

    val (server, boundPort) = connect(port)
    ServerInfo(server, boundPort, collection)
  }

数据获取

页面中的数据是如何获取的呢,这就要归功于SparkListener了,典型的观察者设计模式。当有与stage及task相关的事件发生时,这些Listener都将收到通知,并进行数据更新。

需要指出的是,数据尽管得以自动更新,但页面并没有,还是需要手工刷新才能得到最新的数据。

 

上图显示的是SparkUI中注册了哪些SparkListener子类。来看一看这些子类是在什么时候注册进去的, 注意研究一下SparkUI.initialize函

 def initialize() {
    listenerBus.addListener(storageStatusListener)
    val jobProgressTab = new JobProgressTab(this)
    attachTab(jobProgressTab)
    attachTab(new StorageTab(this))
    attachTab(new EnvironmentTab(this))
    attachTab(new ExecutorsTab(this))
    attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
    attachHandler(createRedirectHandler("/", "/stages", basePath = basePath))
    attachHandler(
      createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest))
    if (live) {
      sc.env.metricsSystem.getServletHandlers.foreach(attachHandler)
    }
  }

举一个实际例子来看看Notifier发送Event的时刻,比如有任务提交的时 resourceOffer->taskStarted->handleBeginEvent

private [ scheduler ] def handleBeginEvent (task: Task[_
], taskInfo : TaskInfo ) {
listenerBus .post( SparkListenerTaskStart (task.
stageId , taskInfo ))
submitWaitingStages ()
}

post其实是向listenerBus的消息队列中添加一个消息,真正将消息发送 出去的时另一个处理线程listenerThread

override def run (): Unit = Utils.
logUncaughtExceptions {
while (true) {
eventLock . acquire ()
// Atomically remove and process this event
LiveListenerBus .this. synchronized {
val event = eventQueue .poll
if (event == SparkListenerShutdown ) {
// Get out of the while loop and shutdown
the daemon thread
return
}
Option (event). foreach ( postToAll )
}
}
}

Option(event).foreach(postToAll)负责将事件通知给各个Observer.postToAll的函数实现如下

def postToAll(event: SparkListenerEvent) {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        foreachListener(_.onStageSubmitted(stageSubmitted))
      case stageCompleted: SparkListenerStageCompleted =>
        foreachListener(_.onStageCompleted(stageCompleted))
      case jobStart: SparkListenerJobStart =>
        foreachListener(_.onJobStart(jobStart))
      case jobEnd: SparkListenerJobEnd =>
        foreachListener(_.onJobEnd(jobEnd))
      case taskStart: SparkListenerTaskStart =>
        foreachListener(_.onTaskStart(taskStart))
      case taskGettingResult: SparkListenerTaskGettingResult =>
        foreachListener(_.onTaskGettingResult(taskGettingResult))
      case taskEnd: SparkListenerTaskEnd =>
        foreachListener(_.onTaskEnd(taskEnd))
      case environmentUpdate: SparkListenerEnvironmentUpdate =>
        foreachListener(_.onEnvironmentUpdate(environmentUpdate))
      case blockManagerAdded: SparkListenerBlockManagerAdded =>
        foreachListener(_.onBlockManagerAdded(blockManagerAdded))
      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
        foreachListener(_.onBlockManagerRemoved(blockManagerRemoved))
      case unpersistRDD: SparkListenerUnpersistRDD =>
        foreachListener(_.onUnpersistRDD(unpersistRDD))
      case applicationStart: SparkListenerApplicationStart =>
        foreachListener(_.onApplicationStart(applicationStart))
      case applicationEnd: SparkListenerApplicationEnd =>
        foreachListener(_.onApplicationEnd(applicationEnd))
      case SparkListenerShutdown =>
    }
  }

Metrics

在系统设计中,测量模块是不可或缺的组成部分。通过这些测量数据来感知系统的运行情况。

在Spark中,测量模块由MetricsSystem来担任,MetricsSystem中有三个重要的概念,分述如下。

  • instance 表示谁在使用metrics system, 目前已知的有master, worker, executor和client driver会创建metrics system用以测量
  • source 表示数据源,从哪里获取数据
  • sinks 数据目的地,将从source获取的数据发送到哪

Spark目前支持将测量数据保存或发送到如下目的地

  • ConsoleSink 输出到console
  • CSVSink 定期保存成为CSV文件
  • JmxSink 注册到JMX,以通过JMXConsole来查看
  • MetricsServlet 在SparkUI中添加MetricsServlet用以查看Task运行时的测量数据
  • GraphiteSink 发送给Graphite以对整个系统(不仅仅包括spark)进行监控

下面从MetricsSystem的创建,数据源的添加,数据更新与发送几个方面来跟踪一下源码。

初始化过程

MetricsSystem依赖于由codahale提供的第三方库Metrics,可以在metrics.codahale.com找到更为详细的介绍。

以Driver Application为例,driver application首先会初始化SparkContext,在SparkContext的初始化过程中就会创建MetricsSystem,具体调用关系如下。 SparkContext.init->SparkEnv.init->MetricsSystem.createMetricsSystem

注册数据源,继续以SparkContext为例

  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)

  private def initDriverMetrics() {
    SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
    SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
  }
initDriverMetrics()

数据读取

数据读取由Sink来完成,在Spark中创建的Sink子类如下图所示

读取最新的数据,以CsvSink为例,最主要的就是创建CsvReporter,启动之后会定期更新最近的数据到console。不同类型的Sink所使用的Reporter是不一样的。

 val reporter: CsvReporter = CsvReporter.forRegistry(registry)
      .formatFor(Locale.US)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
      .convertRatesTo(TimeUnit.SECONDS)
      .build(new File(pollDir))

  override def start() {
    reporter.start(pollPeriod, pollUnit)
  }

Spark中关于metrics子系统的配置文件详见conf/metrics.properties. 默认的Sink是MetricsServlet,在任务提交执行之后,输入http://127.0.0.1:4040/metrics/json会得到以json格式保存的metrics信息。

时间: 2024-07-29 02:19:12

Apache Spark源码走读(十)ShuffleMapTask计算结果的保存与读取 &WEB UI和Metrics初始化及数据更新过程分析的相关文章

Apache Spark源码走读

http://www.aliyun.com/zixun/aggregation/13383.html">Spark是发源于美国加州大学伯克利分校AMPLab的集群计算平台,它立足于内存计算,性能超过Hadoop百倍,即使使用磁盘,迭代类型的计算也会有10倍速度的提升.Spark从多迭代批量处理出发,兼收并蓄数据仓库.流处理和图计算等多种计算范式,是罕见的全能选手.Spark当下已成为Apache基金会的顶级开源项目,拥有着庞大的社区支持--活跃开发者人数已超过Hadoop MapReduc

Apache Spark源码走读(七)Standalone部署方式分析&amp;sql的解析与执行

<一>Standalone部署方式分析 楔子 在Spark源码走读系列之2中曾经提到Spark能以Standalone的方式来运行cluster,但没有对Application的提交与具体运行流程做详细的分析,本文就这些问题做一个比较详细的分析,并且对在standalone模式下如何实现HA进行讲解. 没有HA的Standalone运行模式 先从比较简单的说起,所谓的没有ha是指master节点没有ha. 组成cluster的两大元素即Master和Worker.slave worker可以有

Apache Spark源码走读(十一)浅谈mllib中线性回归的算法实现&amp;Spark MLLib中拟牛顿法L-BFGS的源码实现

<一>浅谈mllib中线性回归的算法实现 概要 本文简要描述线性回归算法在Spark MLLib中的具体实现,涉及线性回归算法本身及线性回归并行处理的理论基础,然后对代码实现部分进行走读. 线性回归模型 机器学习算法是的主要目的是找到最能够对数据做出合理解释的模型,这个模型是假设函数,一步步的推导基本遵循这样的思路 假设函数 为了找到最好的假设函数,需要找到合理的评估标准,一般来说使用损失函数来做为评估标准 根据损失函数推出目标函数 现在问题转换成为如何找到目标函数的最优解,也就是目标函数的最

Apache Spark源码走读(四)Hive on Spark运行环境搭建 &amp;hiveql on spark实现详解

<一>Hive on Spark运行环境搭建 楔子 Hive是基于Hadoop的开源数据仓库工具,提供了类似于SQL的HiveQL语言,使得上层的数据分析人员不用知道太多MapReduce的知识就能对存储于Hdfs中的海量数据进行分析.由于这一特性而收到广泛的欢迎. Hive的整体框架中有一个重要的模块是执行模块,这一部分是用Hadoop中MapReduce计算框架来实现,因而在处理速度上不是非常令人满意.由于Spark出色的处理速度,有人已经成功将HiveQL的执行利用Spark来运行,这就

Apache Spark源码走读(六)Task运行期之函数调用关系分析 &amp;存储子系统分析

<一>Task运行期之函数调用关系分析 概要 本篇主要阐述在TaskRunner中执行的task其业务逻辑是如何被调用到的,另外试图讲清楚运行着的task其输入的数据从哪获取,处理的结果返回到哪里,如何返回. 准备 spark已经安装完毕 spark运行在local mode或local-cluster mode local-cluster mode local-cluster模式也称为伪分布式,可以使用如下指令运行 MASTER=local[1,2,1024] bin/spark-shell

Apache Spark源码走读(八)Graphx实现剖析&amp;spark repl实现详解

<一>Graphx实现剖析 概要 图的并行化处理一直是一个非常热门的话题,这里头的重点有两个,一是如何将图的算法并行化,二是找到一个合适的并行化处理框架.Spark作为一个非常优秀的并行处理框架,将一些并行化的算法移到其上面就成了一个很自然的事情. Graphx是一些图的常用算法在Spark上的并行化实现,同时提供了丰富的API接口.本文就Graphx的代码架构及pagerank在graphx中的具体实现做一个初步的学习. Google为什么赢得了搜索引擎大战 当Google还在起步的时候,在

Apache Spark源码走读(三)Spark on Yarn &amp;Spark源码编译 &amp;在YARN上运行SparkPi

<一>Spark on Yarn 概要 Hadoop2中的Yarn是一个分布式计算资源的管理平台,由于其有极好的模型抽象,非常有可能成为分布式计算资源管理的事实标准.其主要职责将是分布式计算集群的管理,集群中计算资源的管理与分配. Yarn为应用程序开发提供了比较好的实现标准,Spark支持Yarn部署,本文将就Spark如何实现在Yarn平台上的部署作比较详尽的分析. Spark Standalone部署模式回顾 上图是Spark Standalone Cluster中计算模块的简要示意,从

Apache Spark源码走读(九)如何进行代码跟读&amp;使用Intellij idea调试Spark源码

<一>如何进行代码跟读 概要 今天不谈Spark中什么复杂的技术实现,只稍为聊聊如何进行代码跟读.众所周知,Spark使用scala进行开发,由于scala有众多的语法糖,很多时候代码跟着跟着就觉着线索跟丢掉了,另外Spark基于Akka来进行消息交互,那如何知道谁是接收方呢? new Throwable().printStackTrace 代码跟读的时候,经常会借助于日志,针对日志中输出的每一句,我们都很想知道它们的调用者是谁.但有时苦于对spark系统的了解程度不深,或者对scala认识不

Apache Spark源码走读(十二)Sort-based Shuffle的设计与实现

概要 Spark 1.1中对spark core的一个重大改进就是引入了sort-based shuffle处理机制,本文就该处理机制的实现进行初步的分析. Sort-based Shuffle之初体验 通过一个小的实验来直观的感受一下sort-based shuffle算法会产生哪些中间文件,具体实验步骤如下所述. 步骤1: 修改conf/spark-default.conf, 加入如下内容 spark.shuffle.manager SORT 步骤2: 运行spark-shell SPARK