《循序渐进学Spark》一3.2 Spark调度机制

3.2 Spark调度机制

Spark调度机制是保证Spark应用高效执行的关键。本节从Application、job、stage和task的维度,从上层到底层来一步一步揭示Spark的调度策略。

3.2.1 Application的调度

Spark中,每个Application对应一个SparkContext。SparkContext之间的调度关系取决于Spark的运行模式。对Standalone模式而言,Spark Master节点先计算集群内的计算资源能否满足等待队列中的应用对内存和CPU资源的需求,如果可以,则Master创建Spark Driver,启动应用的执行。宏观上来讲,这种对应用的调度类似于FIFO策略。在Mesos和YARN模式下,底层的资源调度系统的调度策略都是由Mesos和YARN决定的。具体分类描述如下:

1. Standalone模式

默认以用户提交Application的顺序来调度,即FIFO策略。每个应用执行时独占所有资源。如果有多个用户要共享集群资源,则可以使用参数spark.cores.max来配置应用在集群中可以使用的最大CPU核数。如果不配置,则采用默认参数spark.deploy.defaultCore的值来确定。

2. Mesos模式

如果在Mesos上运行Spark,用户想要静态配置资源的话,可以设置spark.mesos.coarse为true,这样Mesos变为粗粒度调度模式,然后可以设置spark.cores.max指定集群中可以使用的最大核数,与上面的Standalone模式类似。同时,在Mesos模式下,用户还可以设置参数spark.executor.memory来配置每个executor的内存使用量。如果想使Mesos在细粒度模式下运行,可以通过mesos://<url-info>设置动态共享cpu core的执行模式。在这种模式下,应用不执行时的空闲CPU资源得以被其他用户使用,提升了CPU使用率。

3. YARN模式

如果在YARN上运行Spark,用户可以在YARN的客户端上设置--num-executors 来控制为应用分配的Executor数量,然后设置--executor-memory指定每个Executor的内存大小,设置--executor-cores指定Executor占用的CPU核数。

3.2.2 job的调度

前面章节提到过,Spark应用程序实际上是一系列对RDD的操作,这些操作直至遇见Action算子,才触发Job的提交。事实上,在底层实现中,Action算子最后调用了runJob函数提交Job给Spark。其他的操作只是生成对应的RDD关系链。如在RDD.scala程序文件中,count函数源码所示。

def count():
Long = sc.runJob(this, Utils.getIteratorSize _).sum

其中sc为SparkContext的对象。可见在Spark中,对Job的提交都是在Action算子中隐式完成的,并不需要用户显式地提交作业。在SparkContext中Job提交的实现中,最后会调用DAGScheduler中的Job提交接口。DAGScheduler最重要的任务之一就是计算Job与Task的依赖关系,制定调度逻辑。

Job调度的基本工作流程如图3-4所示,每个Job从提交到完成,都要经历一系列步骤,拆分成以Tsk为最小单位,按照一定逻辑依赖关系的执行序列。

图3-4 Job的调度流程

图3-5则从Job调度流程中的细节模块出发,揭示了工作流程与对应模块之间的关系。从整体上描述了各个类在Job调度流程中的交互关系。

图3-5 Job调度流程细节

在Spark1.5.0的调度目录下的SchedulingAlgorithm.scala文件中,描述了Spark对Job的调度模式。

1. FIFO模式

默认情况下,Spark对Job以FIFO(先进先出)的模式进行调度。在SchedulingAlgorithm.scala文件中声明了FIFO算法实现。

3. 配置调度池

DAGScheduler构建了具有依赖关系的任务集。TaskScheduler负责提供任务给Task-SetManager作为调度的先决条件。TaskSetManager负责具体任务集内部的调度任务。调度池(pool)则用于调度每个SparkContext运行时并存的多个互相独立无依赖关系的任务集。调度池负责管理下一级的调度池和TaskSetManager对象。

用户可以通过配置文件定义调度池的属性。一般调度池支持如下3个参数:

1)调度模式Scheduling mode:用户可以设置FIFO或者FAIR调度方式。

2)weight:调度池的权重,在获取集群资源上权重高的可以获取多个资源。

3)miniShare:代表计算资源中的CPU核数。

用户可以通过conf/fairscheduler.xml配置调度池的属性,同时要在SparkConf对象中配置属性。

3.2.3 stage(调度阶段)和TasksetManager的调度

1. Stage划分

当一个Job被提交后,DAGScheduler会从RDD依赖链的末端触发,遍历整个RDD依赖链,划分Stage(调度阶段)。划分依据主要基于ShuffleDependency依赖关系。换句话说,当某RDD在计算中需要将数据进行Shuffle操作时,这个包含Shuffle操作的RDD将会被用来作为输入信息,构成一个新的Stage。以这个基准作为划分Stage,可以保证存在依赖关系的数据按照正确数据得到处理和运算。在Spark1.5.0的源代码中,DAGScheduler.scala中的getParentStages函数的实现从一定角度揭示了Stage的划分逻辑。

2. Stage调度

在第一步的Stage划分过程中,会产生一个或者多个互相关联的Stage。其中,真正执行Action算子的RDD所在的Stage被称为Final Stage。DAGScheduler会从这个final stage生成作业实例。

在Stage提交时,DAGScheduler首先会判断该Stage的父Stage的执行结果是否可用。如果所有父Stage的执行结果都可用,则提交该Stage。如果有任意一个父Stage的结果不可用,则尝试迭代提交该父Stage。所有结果不可用的Stage都将会被加入waiting队列,等待执行,如图3-6所示。

图3-6 Stage依赖

在图3-6中,虚箭头表示依赖关系。Stage序号越小,表示Stage越靠近上游。

图3-6中的Stage调度运行顺序如图3-7所示。

图3-7 Stage执行顺序

从图3-7可以看出,上游父Stage先得到执行,waiting queue中的stage随后得到执行。

3.
TasksetManager

每个Stage的提交会被转化为一组task的提交。DAGScheduler最终通过调用taskscheduler的接口来提交这组任务。在taskScheduler内部实现中创建了taskSetManager实例来管理任务集taskSet的生命周期。事实上可以说每个stage对应一个tasksetmanager。

至此,DAGScheduler的工作基本完毕。taskScheduler在得到集群计算资源时,taskSet-Manager会分配task到具体worker节点上执行。在Spark1.5.0的taskSchedulerImpl.scala文件中,提交task的函数实现如下:

当taskSetManager进入到调度池中时,会依据job id对taskSetManager排序,总体上先进入的taskSetManager先得到调度。对于同一job内的taskSetManager而言,job id较小的先得到调度。如果有的taskSetManager父Stage还未执行完,则该taskSet-Manager不会被放到调度池。

3.2.4 task的调度

在DAGScheduler.scala中,定义了函数submitMissingTasks,读者阅读完整实现,从中可以看到task的调度方式。限于篇幅,以下截取部分代码。

时间: 2024-10-28 10:15:26

《循序渐进学Spark》一3.2 Spark调度机制的相关文章

《循序渐进学Spark 》Spark 编程模型

本节书摘来自华章出版社<循序渐进学Spark >一书中的第1章,第3节,作者 小象学院 杨 磊,更多章节内容可以访问"华章计算机"公众号查看. Spark机制原理 本书前面几章分别介绍了Spark的生态系统.Spark运行模式及Spark的核心概念RDD和基本算子操作等重要基础知识.本章重点讲解Spark的主要机制原理,因为这是Spark程序得以高效执行的核心.本章先从Application.job.stage和task等层次阐述Spark的调度逻辑,并且介绍FIFO.FA

《循序渐进学Spark》一第2章

 本节书摘来自华章出版社<循序渐进学Spark>一书中的第2章,第2.1节,作者 小象学院 杨 磊,更多章节内容可以访问"华章计算机"公众号查看. 第2章 Spark 编程模型 与Hadoop相比,Spark最初为提升性能而诞生.Spark是Hadoop MapReduce的演化和改进,并兼容了一些数据库的基本思想,可以说,Spark一开始就站在Hadoop与数据库这两个巨人的肩膀上.同时,Spark依靠Scala强大的函数式编程Actor通信模式.闭包.容器.泛型,并借助

《循序渐进学Spark》一2.3 Spark算子

 本节书摘来自华章出版社<循序渐进学Spark>一书中的第2章,第2.3节,作者 小象学院 杨 磊,更多章节内容可以访问"华章计算机"公众号查看. 2.3 Spark算子 本节介绍Spark算子的分类及其功能. 2.3.1 算子简介 Spark应用程序的本质,无非是把需要处理的数据转换为RDD,然后将RDD通过一系列变换(transformation)和操作(action)得到结果,简单来说,这些变换和操作即为算子. Spark支持的主要算子如图2-4所示. 根据所处理的数

《循序渐进学Spark》一第3章

第3章 Spark机制原理 本书前面几章分别介绍了Spark的生态系统.Spark运行模式及Spark的核心概念RDD和基本算子操作等重要基础知识.本章重点讲解Spark的主要机制原理,因为这是Spark程序得以高效执行的核心.本章先从Application.job.stage和task等层次阐述Spark的调度逻辑,并且介绍FIFO.FAIR等经典算法,然后对Spark的重要组成模块:I/O与通信控制模块.容错模块及Shuffle模块做了深入的阐述.其中,在Spark I/O模块中,数据以数据

《循序渐进学Spark》一导读

Preface 前 言 Spark诞生于美国加州大学伯克利分校AMP实验室.随着大数据技术在互联网.金融等领域的突破式进展,Spark在近些年得到更为广泛的应用.这是一个核心贡献者超过一半为华人的大数据平台开源项目,且正处于飞速发展.快速成熟的阶段. 为什么写这本书 Spark已经成为大数据计算.分析领域新的热点和发展方向.相对于Hadoop传统的MapReduce计算模型,Spark提供更为高效的计算框架以及更为丰富的功能,因此在大数据生产应用领域中不断攻城略地,势如破竹. 与企业不断涌现的对

《循序渐进学Spark 》Spark架构与集群环境

Spark架构与集群环境 本章首先介绍Spark大数据处理框架的基本概念,然后介绍Spark生态系统的主要组成部分,包括Spark SQL.Spark Streaming.MLlib和GraphX,接着简要描述了Spark的架构,便于读者认识和把握,最后描述了Spark集群环境搭建及Spark开发环境的构建方法. 1.1 Spark概述与架构 随着互联网规模的爆发式增长,不断增加的数据量要求应用程序能够延伸到更大的集群中去计算.与单台机器计算不同,集群计算引发了几个关键问题,如集群计算资源的共享

《循序渐进学Spark 》导读

目 录 前 言 第1章 Spark架构与集群环境    1.1 Spark概述与架构     1.1.1 Spark概述     1.1.2 Spark生态     1.1.3 Spark架构     1.2 在Linux集群上部署Spark     1.2.1 安装OpenJDK     1.2.2 安装Scala     1.2.3 配置SSH免密码登录     1.2.4 Hadoop的安装配置     1.2.5 Spark的安装部署     1.2.6 Hadoop与Spark的集群复

《循序渐进学Spark》一第1章

第1章 Spark架构与集群环境 本章首先介绍Spark大数据处理框架的基本概念,然后介绍Spark生态系统的主要组成部分,包括Spark SQL.Spark Streaming.MLlib和GraphX,接着简要描述了Spark的架构,便于读者认识和把握,最后描述了Spark集群环境搭建及Spark开发环境的构建方法. 1.1 Spark概述与架构 随着互联网规模的爆发式增长,不断增加的数据量要求应用程序能够延伸到更大的集群中去计算.与单台机器计算不同,集群计算引发了几个关键问题,如集群计算资

《循序渐进学Spark》一3.5 容错机制及依赖

3.5 容错机制及依赖 一般而言,对于分布式系统,数据集的容错性通常有两种方式: 1) 数据检查点(在Spark中对应Checkpoint机制). 2) 记录数据的更新(在Spark中对应Lineage血统机制). 对于大数据分析而言,数据检查点操作成本较高,需要通过数据中心的网络连接在机器之间复制庞大的数据集,而网络带宽往往比内存带宽低,同时会消耗大量存储资源. Spark选择记录更新的方式.但更新粒度过细时,记录更新成本也不低.因此,RDD只支持粗粒度转换,即只记录单个块上执行的单个操作,然