Spark修炼之道 (Advanced) - Spark Source Code Reading, Section 7: The resourceOffers and launchTasks Methods

In the previous section we saw that submitted tasks reach the Executors through the makeOffers method:

    // Make fake resource offers on just one executor
    private def makeOffers(executorId: String) {
      // Filter out executors under killing
      if (!executorsPendingToRemove.contains(executorId)) {
        val executorData = executorDataMap(executorId)
        val workOffers = Seq(
          new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
        launchTasks(scheduler.resourceOffers(workOffers))
      }
    }
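
Each element handed to the scheduler here is a WorkerOffer, a small value object describing the free resources on one executor. Roughly, it carries nothing more than the following (a paraphrased sketch of the class in org.apache.spark.scheduler, not the verbatim source):

    // A WorkerOffer records which executor is offering resources, which host it
    // runs on, and how many CPU cores it currently has free.
    private[spark] case class WorkerOffer(executorId: String, host: String, cores: Int)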

The makeOffers code above relies on two key methods: TaskSchedulerImpl.resourceOffers and CoarseGrainedSchedulerBackend.launchTasks.

    // TaskSchedulerImpl.resourceOffers
    /**
     * Called by cluster manager to offer resources on slaves. We respond by asking our active task
     * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
     * that tasks are balanced across the cluster.
     */
    def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
      // Mark each slave as alive and remember its hostname
      // Also track if new executor is added
      // Handle any newly added executors
      var newExecAvail = false
      for (o <- offers) {
        executorIdToHost(o.executorId) = o.host
        activeExecutorIds += o.executorId
        if (!executorsByHost.contains(o.host)) {
          executorsByHost(o.host) = new HashSet[String]()
          executorAdded(o.executorId, o.host)
          newExecAvail = true
        }
        for (rack <- getRackForHost(o.host)) {
          hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
        }
      }

      // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
      // Shuffling spreads tasks evenly across the Worker nodes
      val shuffledOffers = Random.shuffle(offers)
      // Build a list of tasks to assign to each worker.
      val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
      val availableCpus = shuffledOffers.map(o => o.cores).toArray

      // Get the ArrayBuffer[TaskSetManager] ordered by the configured scheduling policy (FIFO or FAIR)
      val sortedTaskSets = rootPool.getSortedTaskSetQueue
      for (taskSet <- sortedTaskSets) {
        logDebug("parentName: %s, name: %s, runningTasks: %s".format(
          taskSet.parent.name, taskSet.name, taskSet.runningTasks))
        if (newExecAvail) {
          taskSet.executorAdded()
        }
      }

      // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
      // of locality levels so that it gets a chance to launch local tasks on all of them.
      // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
      // Schedule tasks according to data-locality preference
      var launchedTask = false
      for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
        do {
          launchedTask = resourceOfferSingleTaskSet(
              taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
        } while (launchedTask)
      }

      if (tasks.size > 0) {
        hasLaunchedTask = true
      }
      return tasks
    }
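
The inner do/while loop keeps calling resourceOfferSingleTaskSet for the same TaskSet and locality level until it stops launching tasks. Roughly, that helper makes one round-robin pass over the shuffled offers and, wherever enough CPUs remain, asks the TaskSetManager for a task that can run there at the given locality level. A simplified sketch of what it does in this version (paraphrased; error handling and some bookkeeping omitted):

    private def resourceOfferSingleTaskSet(
        taskSet: TaskSetManager,
        maxLocality: TaskLocality,
        shuffledOffers: Seq[WorkerOffer],
        availableCpus: Array[Int],
        tasks: Seq[ArrayBuffer[TaskDescription]]): Boolean = {
      var launchedTask = false
      // One pass over the (already shuffled) offers: at most one task is assigned
      // per executor per call, which yields the round-robin filling behaviour.
      for (i <- 0 until shuffledOffers.size) {
        val execId = shuffledOffers(i).executorId
        val host = shuffledOffers(i).host
        if (availableCpus(i) >= CPUS_PER_TASK) {
          // TaskSetManager.resourceOffer returns Some(TaskDescription) if it still has
          // a task that may run on this executor at (or better than) maxLocality
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            tasks(i) += task
            taskIdToTaskSetManager(task.taskId) = taskSet
            taskIdToExecutorId(task.taskId) = execId
            availableCpus(i) -= CPUS_PER_TASK
            launchedTask = true
          }
        }
      }
      launchedTask
    }

Because the outer loop in resourceOffers walks the locality levels from most to least restrictive (PROCESS_LOCAL first, ANY last), a task only falls back to a worse locality level when nothing more can be launched at the better ones.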

Once resourceOffers has produced the task assignments, launchTasks is called, which ultimately starts the tasks running on the Worker nodes.

    // launchTasks in CoarseGrainedSchedulerBackend
    // Launch tasks returned by a set of resource offers
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val serializedTask = ser.serialize(task)
        // The serialized task must not exceed the configured Akka frame size
        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
                "spark.akka.frameSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
                AkkaUtils.reservedSizeBytes)
              taskSetMgr.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK
          // The CoarseGrainedExecutorBackend on the Worker node receives this LaunchTask
          // message and starts the task on that node's Executor
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }
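
As the abort message above suggests, when a serialized task exceeds the frame size the usual fixes are to ship large values through broadcast variables instead of task closures, or to raise the limit. A minimal, illustrative way to do the latter at application setup time (the app name below is made up; spark.akka.frameSize is given in MB in Spark 1.x):

    import org.apache.spark.{SparkConf, SparkContext}

    // Example only: raise the Akka frame size so that larger serialized tasks fit.
    val conf = new SparkConf()
      .setAppName("frame-size-demo")
      .set("spark.akka.frameSize", "128") // value interpreted in MB
    val sc = new SparkContext(conf)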