Spark源码分析:多种部署方式之间的区别与联系(1)

  《">Spark源码分析:多种部署方式之间的区别与联系(1)》

  《Spark源码分析:多种部署方式之间的区别与联系(2)》

  从官方的文档我们可以知道,Spark的部署方式有很多种:local、Standalone、Mesos、YARN…..不同部署方式的后台处理进程是不一样的,但是如果我们从代码的角度来看,其实流程都差不多。
  从代码中,我们可以得知其实Spark的部署方式其实比官方文档中介绍的还要多,这里我来列举一下:

  1、local:这种方式是在本地启动一个线程来运行作业;

  2、local[N]:也是本地模式,但是启动了N个线程;

  3、local[*]:还是本地模式,但是用了系统中所
有的核;

  4、local[N,M]:这里有两个参数,第一个代表的是用到的核个数;第二个参数代表的是容许该作业失败M次。上面的几种模式没有指定M参数,其默认值都是1;

  5、local-cluster[N, cores, memory]:本地伪集群模式,参数的含义我就不说了,看名字就知道;式;

  6、spark:// :这是用到了Spark的Standalone模

  7、(mesos|zk)://:这是Mesos模式;

  8、yarn-standalone\yarn-cluster\yarn-client:这是YARN模式。
前面两种代表的是集群模式;后面代表的是客户端模式;

  9、simr://:这种你就不知道了吧?simr其实是Spark In MapReduce的缩写。我们知道MapReduce 1中是没有YARN的,如果你在MapReduce 1中使用Spark,
那么就用这种模式吧。

  总体来说,上面列出的各种部署方式运行的流程大致一样:都是从SparkContext切入,在SparkContext的初始化过程中主要做了以下几件事:
  1、根据SparkConf创建SparkEnv

01// Create the Spark execution environment (cache, map output tracker, etc)02  private[spark] val env = SparkEnv.create(03    conf,04    "<driver>",05    conf.get("spark.driver.host"),06    conf.get("spark.driver.port").toInt,07    isDriver = true,08    isLocal = isLocal,09    listenerBus = listenerBus)10  SparkEnv.set(env)

  2、初始化executor的环境变量executorEnvs
  这个步骤代码太多了,我就不贴出来。
  3、创建TaskScheduler

1// Create and start the scheduler2  private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)

  4、创建DAGScheduler

1@volatile private[spark] var dagScheduler: DAGScheduler = _2  try {3    dagScheduler = new DAGScheduler(this)4  } catch {5    case e: Exception => throw6      new SparkException("DAGScheduler 7                     cannot be initialized due to %s".format(e.getMessage))8  }

  5、启动TaskScheduler

1// start TaskScheduler after taskScheduler 2// sets DAGScheduler reference in DAGScheduler's3  // constructor4  taskScheduler.start()

  那么,DAGScheduler和TaskScheduler都是什么?
  DAGScheduler称为作业调度,它基于Stage的高层调度模块的实现,它为每个Job的Stages计算DAG,记录哪些RDD和 Stage的输出已经实物化,然后找到最小的调度方式来运行这个Job。然后以Task Sets的形式提交给底层的任务调度模块来具体执行。
  TaskScheduler称为任务调度。它是低层次的task调度接口,目前仅仅被TaskSchedulerImpl实现。这个接口可以以插件的 形式应用在不同的task调度器中。每个TaskScheduler只给一个SparkContext调度task,这些调度器接受来自 DAGScheduler中的每个stage提交的tasks,并负责将这些tasks提交给cluster运行。如果提交失败了,它将会重试;并处理 stragglers。所有的事件都返回到DAGScheduler中。
  在创建DAGScheduler的时候,程序已经将taskScheduler作为参数传进去了,代码如下:

01def this(sc: SparkContext, taskScheduler: TaskScheduler) = {02    this(03      sc,04      taskScheduler,05      sc.listenerBus,06      sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],07      sc.env.blockManager.master,08      sc.env)09  }10 11  def this(sc: SparkContext) = this(sc, sc.taskScheduler)

也就是DAGScheduler封装了TaskScheduler。TaskScheduler中有两个比较重要的方法:

1// Submit a sequence of tasks to run.2def submitTasks(taskSet: TaskSet): Unit3 4// Cancel a stage.5def cancelTasks(stageId: Int, interruptThread: Boolean)

  这些方法在DAGScheduler中被调用,而TaskSchedulerImpl实现了TaskScheduler,为各种调度模式提供了 任务调度接口,在TaskSchedulerImpl中还实现了resourceOffers和statusUpdate两个接口给Backend调用, 用于提供调度资源和更新任务状态。
  在YARN模式中,还提供了YarnClusterScheduler类,他只是简单地继承TaskSchedulerImpl类,主要重写了getRackForHost(hostPort: String)和postStartHook() 方法。继承图如下:

在下篇文章中,我将介绍上面九种部署模式涉及到的各种类及其之间的关系。欢迎关注本博客!这里先列出下篇文章用到的类图

尊重原创,转载请注明: 转载自过往记忆(http://www.iteblog.com/)                  原文链接:http://www.iteblog.com/archives/1181

时间: 2024-12-18 22:15:16

Spark源码分析:多种部署方式之间的区别与联系(1)的相关文章

Spark源码分析之五:Task调度(一)

        在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段:         1.Job的调度模型与运行反馈:         2.Stage划分:         3.Stage提交:对应TaskSet的生成.         Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分

Spark源码分析之六:Task调度(二)

        话说在<Spark源码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这个方法针对接收到的ReviveOffers事件进行处理.代码如下: // Make fake resource offers on all executors     // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的)     private def makeOffers() {  

Spark源码分析 – BlockManager

参考, Spark源码分析之-Storage模块 对于storage, 为何Spark需要storage模块?为了cache RDD  Spark的特点就是可以将RDD cache在memory或disk中,RDD是由partitions组成的,对应于block  所以storage模块,就是要实现RDD在memory和disk上的persistent功能 首先每个节点都有一个BlockManager, 其中有一个是Driver(master), 其余的都是slave  master负责trac

Spark源码分析 – Dependency

Dependency 依赖, 用于表示RDD之间的因果关系, 一个dependency表示一个parent rdd, 所以在RDD中使用Seq[Dependency[_]]来表示所有的依赖关系   Dependency的base class  可见Dependency唯一的成员就是rdd, 即所依赖的rdd, 或parent rdd /** * Base class for dependencies. */ abstract class Dependency[T](val rdd: RDD[T]

Spark源码分析之七:Task运行(一)

        在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在Task调度逻辑的最后,CoarseGrainedSchedulerBackend的内部类DriverEndpoint中的makeOffers()方法的最后,我们通过调用TaskSchedulerImpl的resourceOffers()方法,得到了TaskDescription序列的序列Seq

Spark源码分析之九:内存管理模型

        Spark是现在很流行的一个基于内存的分布式计算框架,既然是基于内存,那么自然而然的,内存的管理就是Spark存储管理的重中之重了.那么,Spark究竟采用什么样的内存管理模型呢?本文就为大家揭开Spark内存管理模型的神秘面纱.         我们在<Spark源码分析之七:Task运行(一)>一文中曾经提到过,在Task被传递到Executor上去执行时,在为其分配的TaskRunner线程的run()方法内,在Task真正运行之前,我们就要构造一个任务内存管理器Task

Spark源码分析之八:Task运行(二)

        在<Spark源码分析之七:Task运行(一)>一文中,我们详细叙述了Task运行的整体流程,最终Task被传输到Executor上,启动一个对应的TaskRunner线程,并且在线程池中被调度执行.继而,我们对TaskRunner的run()方法进行了详细的分析,总结出了其内Task执行的三个主要步骤:         Step1:Task及其运行时需要的辅助对象构造,主要包括:                        1.当前线程设置上下文类加载器:        

Spark源码分析之Spark Shell(下)

继上次的Spark-shell脚本源码分析,还剩下后面半段.由于上次涉及了不少shell的基本内容,因此就把trap和stty放在这篇来讲述. 上篇回顾:Spark源码分析之Spark Shell(上) function main() { if $cygwin; then # Workaround for issue involving JLine and Cygwin # (see http://sourceforge.net/p/jline/bugs/40/). # If you're us

Spark源码分析之四:Stage提交

        各位看官,上一篇<Spark源码分析之Stage划分>详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交.         Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示:         与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那