从源码角度看Spark on yarn client & cluster模式的本质区别

首先区分下AppMaster和Driver,任何一个yarn上运行的任务都必须有一个AppMaster,而任何一个Spark任务都会有一个Driver,Driver就是运行SparkContext(它会构建TaskScheduler和DAGScheduler)的进程,当然在Driver上你也可以做很多非Spark的事情,这些事情只会在Driver上面执行,而由SparkContext上牵引出来的代码则会由DAGScheduler分析,并形成Job和Stage交由TaskScheduler,再由TaskScheduler交由各Executor分布式执行。

所以Driver和AppMaster是两个完全不同的东西,Driver是控制Spark计算和任务资源的,而AppMaster是控制yarn app运行和任务资源的,只不过在Spark on Yarn上,这两者就出现了交叉,而在standalone模式下,资源则由Driver管理。在Spark on Yarn上,Driver会和AppMaster通信,资源的申请由AppMaster来完成,而任务的调度和执行则由Driver完成,Driver会通过与AppMaster通信来让Executor的执行具体的任务。

client与cluster的区别

对于yarn-client和yarn-cluster的唯一区别在于,yarn-client的Driver运行在本地,而AppMaster运行在yarn的一个节点上,他们之间进行远程通信,AppMaster只负责资源申请和释放(当然还有DelegationToken的刷新),然后等待Driver的完成;而yarn-cluster的Driver则运行在AppMaster所在的container里,Driver和AppMaster是同一个进程的两个不同线程,它们之间也会进行通信,AppMaster同样等待Driver的完成,从而释放资源。

Spark里AppMaster的实现:org.apache.spark.deploy.yarn.ApplicationMaster Yarn里MapReduce的AppMaster实现:org.apache.hadoop.mapreduce.v2.app.MRAppMaster

在yarn-client模式里,优先运行的是Driver(我们写的应用代码就是入口),然后在初始化SparkContext的时候,会作为client端向yarn申请AppMaster资源,当AppMaster运行后,它会向yarn注册自己并申请Executor资源,之后由本地Driver与其通信控制任务运行,而AppMaster则时刻监控Driver的运行情况,如果Driver完成或意外退出,AppMaster会释放资源并注销自己。所以在该模式下,如果运行spark-submit的程序退出了,整个任务也就退出了

在yarn-cluster模式里,本地进程则仅仅只是一个client,它会优先向yarn申请AppMaster资源运行AppMaster,在运行AppMaster的时候通过反射启动Driver(我们的应用代码),在SparkContext初始化成功后,再向yarn注册自己并申请Executor资源,此时Driver与AppMaster运行在同一个container里,是两个不同的线程,当Driver运行完毕,AppMaster会释放资源并注销自己。所以在该模式下,本地进程仅仅是一个client,如果结束了该进程,整个Spark任务也不会退出,因为Driver是在远程运行的

下面从源码的角度看看SparkSubmit的代码调用(基于Spark2.0.0):

代码公共部分

SparkSubmit#main =>


  1. val appArgs = new SparkSubmitArguments(args) 
  2. appArgs.action match { 
  3.   // normal spark-submit 
  4.   case SparkSubmitAction.SUBMIT => submit(appArgs) 
  5.   // use --kill specified 
  6.   case SparkSubmitAction.KILL => kill(appArgs) 
  7.   // use --status specified 
  8.   case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs) 

SparkSubmit的main方法是在用户使用spark-submit脚本提交Spark app的时候调用的,可以看到正常情况下,它会调用SparkSubmit#submit方法

SparkSubmit#submit =>


  1. val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args) 
  2. // 此处省略掉代理账户,异常处理,提交失败的重提交逻辑,只看主干代码 
  3. runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose) 

在submit方法内部,会先进行提交环境相关的处理,调用的是SparkSubmit#prepareSubmitEnvironment方法,之后利用拿到的mainClass等信息,再调用SparkSubmit#runMain方法来执行对于主函数

SparkSubmit#prepareSubmitEnvironment =>

主干相关的代码如下:


  1. // yarn client mode 
  2. if (deployMode == CLIENT) { 
  3.   // client 模式下,运行的是 --class 后指定的mainClass,也即我们的代码 
  4.   childMainClass = args.mainClass 
  5.   if (isUserJar(args.primaryResource)) { 
  6.     childClasspath += args.primaryResource 
  7.   } 
  8.   if (args.jars != null) { childClasspath ++= args.jars.split(",") } 
  9.   if (args.childArgs != null) { childArgs ++= args.childArgs } 
  10.  
  11. // yarn cluster mode 
  12. val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER 
  13. if (isYarnCluster) { 
  14.   // cluster 模式下,运行的是Client类 
  15.   childMainClass = "org.apache.spark.deploy.yarn.Client" 
  16.   if (args.isPython) { 
  17.     childArgs += ("--primary-py-file", args.primaryResource) 
  18.     childArgs += ("--class", "org.apache.spark.deploy.PythonRunner") 
  19.   } else if (args.isR) { 
  20.     val mainFile = new Path(args.primaryResource).getName 
  21.     childArgs += ("--primary-r-file", mainFile) 
  22.     childArgs += ("--class", "org.apache.spark.deploy.RRunner") 
  23.   } else { 
  24.     if (args.primaryResource != SparkLauncher.NO_RESOURCE) { 
  25.       childArgs += ("--jar", args.primaryResource) 
  26.     } 
  27.     // 这里 --class 指定的是AppMaster里启动的Driver,也即我们的代码 
  28.     childArgs += ("--class", args.mainClass) 
  29.   } 
  30.   if (args.childArgs != null) { 
  31.     args.childArgs.foreach { arg => childArgs += ("--arg", arg) } 
  32.   } 

在 prepareSubmitEnvironment 里,主要负责解析用户参数,设置环境变量env,处理python/R等依赖,然后针对不同的部署模式,匹配不同的运行主类,比如: yarn-client>args.mainClass,yarn-cluster>o.a.s.deploy.yarn.Client

SparkSubmit#runMain =>

骨干代码如下


  1. try { 
  2.   mainClass = Utils.classForName(childMainClass) 
  3. } catch { 
  4.   // ... 
  5. val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass) 
  6. try { 
  7.   // childArgs就是用户自己传给Spark应用代码的参数 
  8.   mainMethod.invoke(null, childArgs.toArray) 
  9. } catch { 
  10.   // ... 

在runMain方法里,会设置ClassLoader,根据用户代码优先的设置(spark.driver.userClassPathFirst)来加载对应的类,然后反射调用prepareSubmitEnvironment方法返回的主类,并调用其main方法

从所反射的不同主类,我们来看看具体调用方式的不同:

对于yarn-cluster

o.a.s.deploy.yarn.Client#main =>


  1. val sparkConf = new SparkConf  
  2. val args = new ClientArguments(argStrings) 
  3. new Client(args, sparkConf).run() 

在Client伴生对象里构建了Client类的对象,然后调用了Client#run方法

o.a.s.deploy.yarn.Client#run =>


  1. this.appId = submitApplication() 
  2. // report application ... 

run方法核心的就是提交任务到yarn,其调用了Client#submitApplication方法,拿到提交完的appID后,监控app的状态

o.a.s.deploy.yarn.Client#submitApplication =>


  1. try { 
  2.   // 获取提交用户的Credentials,用于后面获取delegationToken 
  3.   setupCredentials() 
  4.   yarnClient.init(yarnConf) 
  5.   yarnClient.start() 
  6.  
  7.   // Get a new application from our RM 
  8.   val newApp = yarnClient.createApplication() 
  9.   val newAppResponse = newApp.getNewApplicationResponse() 
  10.   // 拿到appID 
  11.   appId = newAppResponse.getApplicationId() 
  12.   // 报告状态 
  13.   reportLauncherState(SparkAppHandle.State.SUBMITTED) 
  14.   launcherBackend.setAppId(appId.toString) 
  15.  
  16.   // Verify whether the cluster has enough resources for our AM 
  17.   verifyClusterResources(newAppResponse) 
  18.  
  19.   // 创建AppMaster运行的context,为其准备运行环境,java options,以及需要运行的java命令,AppMaster通过该命令在yarn节点上启动 
  20.   val containerContext = createContainerLaunchContext(newAppResponse) 
  21.   val appContext = createApplicationSubmissionContext(newApp, containerContext) 
  22.  
  23.   // Finally, submit and monitor the application 
  24.   logInfo(s"Submitting application $appId to ResourceManager") 
  25.   yarnClient.submitApplication(appContext) 
  26.   appId 
  27. } catch { 
  28.   case e: Throwable => 
  29.     if (appId != null) { 
  30.       cleanupStagingDir(appId) 
  31.     } 
  32.     throw e 

在 submitApplication 里完成了app的申请,AppMaster context的创建,最后完成了任务的提交,对于cluster模式而言,任务提交后本地进程就只是一个client而已,Driver就运行在与AppMaster同一container里,对于client模式而言,执行 submitApplication 方法时,Driver已经在本地运行,这一步就只是提交任务到yarn而已

o.a.s.deploy.yarn.Client#createContainerLaunchContext


  1. val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId)) 
  2. // 非pySpark时,pySparkArchives为Nil 
  3. val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives) 
  4. // 这一步会进行delegationtoken的获取,存于Credentials,在AppMasterContainer构建完的最后将其存入到context里 
  5. val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives) 
  6.  
  7. val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) 
  8. // 设置AppMaster container运行的资源和环境 
  9. amContainer.setLocalResources(localResources.asJava) 
  10. amContainer.setEnvironment(launchEnv.asJava) 
  11. // 设置JVM参数 
  12. val javaOpts = ListBuffer[String]() 
  13. javaOpts += "-Djava.io.tmpdir=" + tmpDir 
  14. // other java opts setting... 
  15.  
  16. // 对于cluster模式,通过 --class 指定AppMaster运行我们的Driver端,对于client模式则纯作为资源申请和分配的工具 
  17. val userClass = 
  18.   if (isClusterMode) { 
  19.     Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass)) 
  20.   } else { 
  21.     Nil 
  22.   } 
  23. // 设置AppMaster运行的主类 
  24. val amClass = 
  25.   if (isClusterMode) { 
  26.     Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName 
  27.   } else { 
  28.     // ExecutorLauncher只是ApplicationMaster的一个warpper 
  29.     Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName 
  30.   } 
  31.  
  32. val amArgs = 
  33.   Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ 
  34.     userArgs ++ Seq( 
  35.       "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), 
  36.         LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) 
  37.  
  38. // Command for the ApplicationMaster 
  39. val commands = prefixEnv ++ Seq( 
  40.     YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server" 
  41.   ) ++ 
  42.   javaOpts ++ amArgs ++ 
  43.   Seq( 
  44.     "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", 
  45.     "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") 
  46.  
  47. val printableCommands = commands.map(s => if (s == null) "null" else s).toList 
  48. // 设置需运行的命令 
  49. amContainer.setCommands(printableCommands.asJava) 
  50.  
  51. val securityManager = new SecurityManager(sparkConf) 
  52. // 设置应用权限 
  53. amContainer.setApplicationACLs( 
  54.       YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava) 
  55. // 设置delegationToken 
  56. setupSecurityToken(amContainer) 

对于yarn-client

args.mainClass =>

在我们的Spark代码里,需要创建一个SparkContext来执行Spark任务,而在其构造器里创建TaskScheduler的时候,对于client模式就会向yarn申请资源提交任务,如下


  1. // 调用createTaskScheduler方法,对于yarn模式,master=="yarn" 
  2. val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) 
  3. _schedulerBackend = sched 
  4. _taskScheduler = ts 
  5. // 创建DAGScheduler 
  6. _dagScheduler = new DAGScheduler(this) 

SparkContext#createTaskScheduler =>

这里会根据master匹配不同模式,比如local/standalone/yarn,在yarn模式下会利用ServiceLoader装载YarnClusterManager,然后由它创建TaskScheduler和SchedulerBackend,如下:


  1. // 当为yarn模式的时候 
  2. case masterUrl => 
  3.   // 利用当前loader装载YarnClusterManager,masterUrl为"yarn" 
  4.   val cm = getClusterManager(masterUrl) match { 
  5.     case Some(clusterMgr) => clusterMgr 
  6.     case None => throw new SparkException("Could not parse Master URL: '" + master + "'") 
  7.   } 
  8.   try { 
  9.     // 创建TaskScheduler,这里masterUrl并没有用到 
  10.     val scheduler = cm.createTaskScheduler(sc, masterUrl) 
  11.     // 创建SchedulerBackend,对于client模式,这一步会向yarn申请AppMaster,提交任务 
  12.     val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler) 
  13.     cm.initialize(scheduler, backend) 
  14.     (backend, scheduler) 
  15.   } catch { 
  16.     case se: SparkException => throw se 
  17.     case NonFatal(e) => 
  18.       throw new SparkException("External scheduler cannot be instantiated", e) 
  19.   } 

YarnClusterManager#createSchedulerBackend


  1. sc.deployMode match { 
  2.   case "cluster" => 
  3.     new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc) 
  4.   case "client" => 
  5.     new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc) 
  6.   case  _ => 
  7.     throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn") 

可以看到yarn下的SchedulerBackend实现对于client和cluster模式是不同的,yarn-client模式为YarnClientSchedulerBackend,yarn-cluster模式为 YarnClusterSchedulerBackend,之所以不同,是因为在client模式下,YarnClientSchedulerBackend 相当于 yarn application 的client,它会调用o.a.s.deploy.yarn.Client#submitApplication 来准备环境,申请资源并提交yarn任务,如下:


  1. val driverHost = conf.get("spark.driver.host") 
  2. val driverPort = conf.get("spark.driver.port") 
  3. val hostport = driverHost + ":" + driverPort 
  4. sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.appUIAddress) } 
  5.  
  6. val argsArrayBuf = new ArrayBuffer[String]() 
  7. argsArrayBuf += ("--arg", hostport) 
  8.  
  9. val args = new ClientArguments(argsArrayBuf.toArray) 
  10. totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf) 
  11. // 创建o.a.s.deploy.yarn.Client对象 
  12. client = new Client(args, conf) 
  13. // 调用submitApplication准备环境,申请资源,提交任务,并把appID保存下来 
  14. // 对于submitApplication,前文有详细的分析,这里与前面是一致的 
  15. bindToYarn(client.submitApplication(), None) 

而在 YarnClusterSchedulerBackend 里,由于 AppMaster 已经运行起来了,所以它并不需要再做申请资源等等工作,只需要保存appID和attemptID并启动SchedulerBackend即可.

本文作者:佚名

来源:51CTO

时间: 2024-11-08 22:24:52

从源码角度看Spark on yarn client & cluster模式的本质区别的相关文章

从JDK源码角度看Float

关于IEEE 754 在看Float前需要先了解IEEE 754标准,该标准定义了浮点数的格式还有一些特殊值,它规定了计算机中二进制与十进制浮点数转换的格式及方法.规定了四种表示浮点数值的方法,单精确度(32位).双精确度(64位).延伸单精确度(43位以上)与延伸双精确度(79位以上).多数编程语言支持单精确度和双精确度,这里讨论的Float就是Java的单精确度的实现. 浮点数的表示 浮点数由三部分组成,如下图,符号位s.指数e和尾数f. 对于求值我们是有一个公式对应的,根据该公式来看会更简

从JDK源码角度看Byte

Java的Byte类主要的作用就是对基本类型byte进行封装,提供了一些处理byte类型的方法,比如byte到String类型的转换方法或String类型到byte类型的转换方法,当然也包含与其他类型之间的转换方法. 主要实现代码如下: public final class Byte extends Number implements Comparable<Byte> { public static final byte MIN_VALUE = -128; public static fina

从JDK源码角度看Long

概况 Java的Long类主要的作用就是对基本类型long进行封装,提供了一些处理long类型的方法,比如long到String类型的转换方法或String类型到long类型的转换方法,当然也包含与其他类型之间的转换方法.除此之外还有一些位相关的操作. 继承结构 --java.lang.Object --java.lang.Number --java.lang.Long 主要属性 public static final long MIN_VALUE = 0x8000000000000000L;

从JDK源码角度看Integer

概况 Java的Integer类主要的作用就是对基本类型int进行封装,提供了一些处理int类型的方法,比如int到String类型的转换方法或String类型到int类型的转换方法,当然也包含与其他类型之间的转换方法.除此之外还有一些位相关的操作. 继承结构 --java.lang.Object --java.lang.Number --java.lang.Integer 主要属性 第一部分 public static final int MIN_VALUE = 0x80000000; pub

从JDK源码角度看Short

概况 Java的Short类主要的作用就是对基本类型short进行封装,提供了一些处理short类型的方法,比如short到String类型的转换方法或String类型到short类型的转换方法,当然也包含与其他类型之间的转换方法. 继承结构 --java.lang.Object --java.lang.Number --java.lang.Short 主要属性 public static final short MIN_VALUE = -32768; public static final s

从JDK源码角度看Boolean

Java的Boolean类主要作用就是对基本类型boolean进行封装,提供了一些处理boolean类型的方法,比如String类型和boolean类型的转换. 主要实现源码如下: public final class Boolean implements java.io.Serializable, Comparable<Boolean> { private final boolean value; public static final Boolean TRUE = new Boolean(

从JDK源码角度看java并发的公平性

        JAVA为简化开发者开发提供了很多并发的工具,包括各种同步器,有了JDK我们只要学会简单使用类API即可.但这并不意味着不需要探索其具体的实现机制,本文从JDK源码角度简单讲讲并发时线程竞争的公平性.         所谓公平性指所有线程对临界资源申请访问权限的成功率都一样,不会让某些线程拥有优先权.我们知道CLH Node FIFO等待队列是一个先进先出的队列,那么是否就可以说每条线程获取锁时就是公平的呢?关于公平性这里分拆成三个点分别阐述:         ① 准备入队列的节

从JDK源码角度看线程池原理

        "池"技术对我们来说是非常熟悉的一个概念,它的引入是为了在某些场景下提高系统某些关键节点性能,最典型的例子就是数据库连接池,JDBC是一种服务供应接口(SPI),具体的数据库连接实现类由不同厂商实现,数据库连接的建立和销毁都是很耗时耗资源的操作,为了查询数据库中某条记录,最原始的一个过程是建立连接.发送查询语句.返回查询结果.销毁连接,假如仅仅是一个很简单的查询语句,那么可能建立连接与销毁连接两个步骤就已经占所有资源时间消耗的绝大部分,如此低下的效率显然让人无法接受.针

从JDK源码角度看并发竞争的超时

        JDK中的并发框架提供的另外一个优秀机制是锁获取超时的支持,当大量线程对某一锁竞争时可能导致某些线程在很长一段时间都获取不了锁,在某些场景下可能希望如果线程在一段时间内不能成功获取锁就取消对该锁的等待以提高性能,这时就需要用到超时机制.在JDK1.5之前并没有对此支持,当时的并发控制职能通过JVM内置的synchronized关键词实现锁,但对一些特殊要求却力不从心,例如超时取消控制.JDK1.5开始引入并发工具完美解决了此问题,JDK对并发线程开始提供超时的支持.