Spark sc.textFile(...).map(...).count() 执行完整流程

引子

今天正好有人在群里问到相关的问题,不过他的原始问题是:

我在RDD里面看到很多  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)),但是我找不到context是从哪里来的

另外还有pid,iter都是哪来的呢? 如果你照着源码点进去你会很困惑。为莫名其妙怎么就有了这些iterator呢?

Transform 和Action的来源

一般刚接触Spark 的同学,都会被告知这两个概念。Transform就是RDD的转换,从一个RDD转化到另一个RDD(也有多个的情况)。 Action则是出发实际的执行动作。

标题中的map就是一个典型的tansform操作,看源码,无非就是从当前的RDD构建了一个新的MapPartitionsRDD

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

这个新的RDD 接受了this作为参数,也就记住了他的父RDD。同时接受了一个匿名函数:

 (context, pid, iter) => iter.map(cleanF))

至于这个context,pid,iter是怎么来的,你当前是不知道的。你只是知道这个新的RDD,有这么一个函数。至于什么时候这个函数会被调用,我们下面会讲解到。

而一个Action是什么样的呢?我们看看count:

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

发现不一样了,要真的开始run Job了。sparkContext 的runJob 有很多种形态,这里你看到的是接受当前这个RDD 以及一个函数(Utils.getIteratorSize _)。

当然,这里的Utils.getItteratorSize 是一个已经实现好的函数:

  def getIteratorSize[T](iterator: Iterator[T]): Long = {
    var count = 0L
    while (iterator.hasNext) {
      count += 1L
      iterator.next()
    }
    count
  }

它符合 sc.runJob 需要接受的签名形态:

 func: Iterator[T] => U

Driver端的工作

这里你会见到一些熟悉的身影,比如dagScheduler,TaskScheduler,SchedulerBackend等。我们慢慢分解。

我们深入runJob,你马上就可以看到了dagScheduler了。

dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)

这里的cleanedFunc 就是前面那个 func: Iterator[T] => U 函数。在我们的例子里,就是一个计数的函数。

这样我们就顺利的离开SparkContext 进入DAGScheduler的王国了。

dagScheduler会进一步提交任务。

 val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

请记住上面第二个参数,func其实就是前面的 Utils.getItteratorSize 函数,不过签名略有改变,添加了context,变成了这种形态:

(TaskContext, Iterator[_]) =>

接着会变成一个事件,发到事件队列里,其中 func2 还是上面的func,只是被改了名字而已。

eventProcessLoop.post(JobSubmitted(  jobId, rdd, func2, partitions.toArray, callSite, waiter,  SerializationUtils.clone(properties)))

dag会通过handleJobSubmitted 函数处理这个事件。在这里完成Stage的拆分。这个不是我们这次关注的主题,所以不详细讨论。最后,会把Stage进行提交:

 submitMissingTasks(finalStage)

提交到哪去了呢?会根据Stage的类型,生成实际的任务,然后序列化。序列化后通过广播机制发送到所有节点上去。

var taskBinary: Broadcast[Array[Byte]] = null
    try {
      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      val taskBinaryBytes: Array[Byte] = stage match {
        case stage: ShuffleMapStage =>
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
        case stage: ResultStage =>
          closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
      }

      taskBinary = sc.broadcast(taskBinaryBytes)

然后生成tasks对象,ShuffleMapTask 或者ResultTask,我们这里的count是ResultTask,通过下面的方式提交:

 taskScheduler.submitTasks(new TaskSet(  tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))

现在我们进入 TaskSchedulerImpl 的地盘了。在submitTasks里我们调用了backend.我们接着就进入到CoarseGrainedSchedulerBackend.DriverEndpoint里。这个DriverEndPoint做完应该怎么把Task分配到哪些Executor的计算后,最后会去做真正的launchTask的工作:

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

把序列化好的任务发送到Executor 上。到这里,Driver端的工作就完整了。

有一点你可能会比较好奇,为什么要做两次序列化,发送两次的? 也就是前面的taskBinary,还有serializedTask。 taskBinany 包括一些RDD,函数等信息。而serializedTask 这是整个Task的任务信息,比如对应的那个分区号等。后面我们还会看到taskBinary的身影。

Executor端

Executor 的入口是org.apache.spark.executor. Executor类。你可以看到梦寐以求的launchTask 方法

 def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer): Unit = {
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

核心你看到了,是TaskRunner方法。进去看看,核心代码如下:

 val res = task.run(
            taskAttemptId = taskId,
            attemptNumber = attemptNumber,
            metricsSystem = env.metricsSystem)

这个task(ResultTask).run里是我们最后的核心,真正的逻辑调用发生在这里:

override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val deserializeStartTime = System.currentTimeMillis()
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

    metrics = Some(context.taskMetrics)
    func(context, rdd.iterator(partition, context))
  }

前面通过taskBinary 还原出RDD,func。 而这里的func就是我们那个经过改良的

Utils.getItteratorSize函数,前面在driver端就被改造成func(context, rdd.iterator(partition, context)) 这种形态了。但是函数体还是下面的
  def getIteratorSize[T](iterator: Iterator[T]): Long = {
    var count = 0L
    while (iterator.hasNext) {
      count += 1L
      iterator.next()
    }
    count
  }

也就是是一个计数函数。参数iterator则是通过rdd.iterator(partition, context)拿到了。

总结

到此,我们完成了整个代码的流转过程。之所以很多人看到这些地会比较疑惑,是因为看到的代码都是在driver端的。但是最后这些任务都要被序列化发送到Executor端。所以一般我们看到的流程不是连续的。

时间: 2024-09-11 06:25:01

Spark sc.textFile(...).map(...).count() 执行完整流程的相关文章

线程互斥问题,程序执行的流程

问题描述 线程互斥问题,程序执行的流程 public class TT implements Runnable { int b = 100; public synchronized void m1() throws Exception{ b = 1000; Thread.sleep(5000); System.out.println("b = " + b); } public synchronized void m2() throws Exception { Thread.sleep(

hadoop-linux搭建spark源码环境,执行build/sdt gen-idea要twitter东西?

问题描述 linux搭建spark源码环境,执行build/sdt gen-idea要twitter东西? 我linux不会翻墙啊.... [error] Server access Error: 连接超时 url=http://maven.twttr.com/org/mortbay/jetty/jetty-parent/10/jetty-parent-10.jar [info] Resolving org.apache.hadoop#hadoop-project;2.2.0 ... [erro

php微信开发之自定义菜单完整流程_php技巧

一.自定义菜单概述 自定义菜单能够帮助公众号丰富界面,让用户更好更快地理解公众号的功能.开启自定义菜单后,公众号界面如图所示: 二.申请自定义菜单 个人订阅号使用微博认证.企业订阅号通过微信认证:可以申请到自定义菜单资格 服务号默认有菜单权限. 三.获得AppId 和AppSecert AppId和AppSecret在开发者中心-开发者ID中,可以找到. 四.获得Access Token 用appid和appsecert获得access token,接口为 https://api.weixin.

微信 java 实现js-sdk 图片上传下载完整流程_javascript技巧

最近做的一个项目刚好用到微信js-sdk的图片上传接口,在这里做一下总结. 在这里能知道使用js api的基本配置 https://mp.weixin.qq.com/wiki t=resource/res_main&id=mp1421141115&token=〈=zh_CN 我这里没有用checkJsApi去判断当前客户端版本是否支持指定JS接口,好.通过看开发文档,我们知道调用js接口直接都要通过config接口注入权限验证配置 <code class="hljs cs&

vim利用map映射执行脚本

最近开始捡起C语音来看,<c语言实用基础>,一边看一边做例子学习.然后发现,每次写完一个例子后,都要写几个字符,去编译并执行一下,然后就想看看能不能利用vim的map映射,把这种常规性的命令绑定到一个快捷键上去,查了一些资料后,果真是可以实现的. 先摆出来怎么用的. 打开~/.vimrc文件,并在最后以后之后粘贴上一下代码: Example  代码如下 复制代码 "映射命令行下的c编译并执行 cmap  !gcc % && ./a.out 然后保存并关闭. 再重新打开

艾伟也谈项目管理,让亲身实践者执行工作流程

文 / 黄易山 在这里,我使用"工作流程"这个词来描述"个人或团体为了完成一项活动而遵循的步骤"意义上的流程,以及组织的一般制度.随着一家公司的成长,有必要增加或整理工作流程. 最重要的利弊权衡通常是工作流程所带来的阻力,以及效率或效益上的收益孰轻孰重. 一方面,很难评估这种权衡中的利弊,因为其中牵涉到很多因素,所以有一条可能会有帮助的原则:只允许那些有特殊需要的工作流程被执行,而且要由那些直接使用它的人来执行.通常,经理和管理人员会提议工作流程,因为它会帮助他们更

IOS安装CocoaPods完整流程

安装CocoaPods之前要先安装Ruby环境 先安装这个home-brew:http://mxcl.github.com/homebrew/   步骤1 -安装RVM  RVM 是干什么的这里就不解释了,后面你将会慢慢搞明白.             <span class="gp" style="background-color:#c0c0c0"> $ curl -L https://get.rvm.io | bash -s stable</s

App项目设计开发完整流程

转载自:http://blog.csdn.net/demon614/article/details/39692827 作为一个PHP程序猿想转行APP开发可不是件容易的事情,话说隔行如隔山,这隔着一层语言也是多东西需要学习啊,一直对APP开发很感兴趣,最近请教了几个做移动开发的朋友,看了很多的资料,决定把自己学到的东西总结一下分享给和我一样刚做开发的菜鸟们. 1. idea形成--APP项目雏形 一个APP项目的最初首先要确定项目整体方案,整个项目的规划,大体框架,做成文档展现出来,以便大家提意

Ajax执行顺序流程及回调问题分析_基础知识

一个全局的变量var JsonData; 我这里有一个Ajax处理的方法: JScript code: 复制代码 代码如下: function GetJson(DataSourceName) { $.ajax({ type: "post", url: "Ajax/AjaxData.ashx?MethodName=" + DataSourceName, contentType: "application/json;", data: "&q