Spark 动态资源分配(Dynamic Resource Allocation) 解析

前言

最近在使用Spark Streaming程序时,发现如下几个问题:

  1. 高峰和低峰Spark Streaming每个周期要处理的数据量相差三倍以上,预分配资源会导致低峰的时候资源的大量浪费。
  2. Spark Streaming 跑的数量多了后,资源占用相当可观。

所以便有了要开发一套针对Spark Streaming 动态资源调整的想法。我在文章最后一个章节给出了一个可能的设计方案。不过要做这件事情,首先我们需要了解现有的Spark 已经实现的 Dynamic Resource Allocation 机制,以及为什么它无法满足现有的需求。

入口

在SparkContext 中可以看到这一行:

_executorAllocationManager =
      if (dynamicAllocationEnabled) {
        Some(new ExecutorAllocationManager(this, listenerBus, _conf))
      } else {
        None
      }

通过spark.dynamicAllocation.enabled参数开启后就会启动ExecutorAllocationManager。

这里有我第一个吐槽的点,这么直接new出来,好歹也做个配置,方便第三方开发个新的组件可以集成进去。但是Spark很多地方都是这么搞的,完全没有原来Java社区的风格。

动态调整资源面临的问题

我们先看看,动态资源调整需要解决哪几个问题:

  1. Cache问题。如果需要移除的Executor含有RDD cache该如何办?
  2. Shuffle问题。 如果需要移除的Executor包含了Shuffle Write先关数据该怎么办?
  3. 添加和删除之后都需要告知DAGSchedule进行相关信息更新。

Cache去掉了重算即可。为了防止数据抖动,默认包含有Cache的Executor是不会被删除的,因为默认的Idle时间设置的非常大:

private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.cachedExecutorIdleTimeout",
s"${Integer.MAX_VALUE}s")

你可以自己设置从而去掉这个限制。

而对于Shuffle,则需要和Yarn集成,需要配置yarn.nodemanager.aux-services。具体配置方式,大家可以Google。这样Spark Executor就不用保存Shuffle状态了。

触发条件

添加Worker的触发条件是:

  1. 有Stage正在运行,并且预估需要的Executors > 现有的
  2. 删除Woker的触发条件是:
  3. 一定时间内(默认60s)没有task运行的Executor

我们看到触发条件还是比较简单的。这种简单就意味着用户需要根据实际场景,调整各个时间参数,比如到底多久没有运行task的Executor才需要删除。

默认检测时间是100ms:

private val intervalMillis: Long = 100

如何实现Container的添加和释放

只有ApplicationMaster才能够向Yarn发布这些动作。而真正的中控是org.apache.spark.ExecutorAllocationManager,所以他们之间需要建立一个通讯机制。对应的方式是在ApplicationMaster有一个private class AMEndpoint(类,比如删除释放容器的动作在里就有:

  case KillExecutors(executorIds) =>
        logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.")
        Option(allocator) match {
          case Some(a) => executorIds.foreach(a.killExecutor)
          case None => logWarning("Container allocator is not ready to kill executors yet.")
        }
        context.reply(true)

而ExecutorAllocationManager则是引用YarnSchedulerBackend实例,该实例持有ApplicationMaster的 RPC引用

private var amEndpoint: Option[RpcEndpointRef]

如何获取调度信息

要触发上面描述的操作,就需要任务的调度信息。这个是通过ExecutorAllocationListener extends SparkListener来完成的。具体是在 ExecutorAllocationMaster的start函数里,会将该Listener实例添加到SparkContext里的listenerBus里,从而实现对DAGSchecude等模块的监听。机制可以参看这篇文章 Spark ListenerBus 和 MetricsSystem 体系分析。

根据上面的分析,我们至少要知道如下三个信息:

  1. Executor上是否为空,如果为空,就可以标记为Idle.只要超过一定的时间,就可以删除掉这个Executor.
  2. 正在跑的Task有多少
  3. 等待调度的Task有多少

这里是以Stage为区分的。分别以三个变量来表示:

private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]

名字已经很清楚了。值得说的是stageIdToTaskIndices,其实就是stageId 对应的正在运行的task id 集合。

那么怎么计算出等待调度的task数量呢?计算方法如下: 

 stageIdToNumTasks(stageId) - stageIdToTaskIndices(stageId).size

这些都是动态更新变化的,因为有了监听器,所以任务那边有啥变化,这边都会得到通知。

定时扫描器

有了上面的铺垫,我们现在进入核心方法:

private def schedule(): Unit = synchronized {
    val now = clock.getTimeMillis

    updateAndSyncNumExecutorsTarget(now)

    removeTimes.retain { case (executorId, expireTime) =>
      val expired = now >= expireTime
      if (expired) {
        initializing = false
        removeExecutor(executorId)
      }
      !expired
    }
  }

该方法会每隔100ms被调度一次。你可以理解为一个监控线程。

Executor判定为空闲的机制

只要有一个task结束,就会判定有哪些Executor已经没有任务了。然后会被加入待移除列表。在放到removeTimes的时候,会把当前时间now + executorIdleTimeoutS * 1000 作为时间戳存储起来。当调度进程扫描这个到Executor时,会判定时间是不是到了,到了的话就执行实际的remove动作。在这个期间,一旦有task再启动,并且正好运行在这个Executor上,则又会从removeTimes列表中被移除。 那么这个Executor就不会被真实的删除了。

Executor 需要增加的情况

首先,系统会根据下面的公式计算出实际需要的Executors数目:

private def maxNumExecutorsNeeded(): Int = {
    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
  }

接着每个计算周期到了之后,会和当前已经有的Executors数:numExecutorsTarget 进行比较。

  1. 如果发现  maxNumExecutorsNeeded < numExecutorsTarget 则会发出取消还有没有执行的Container申请。并且重置每次申请的容器数为1,也就是numExecutorsToAdd=1
  2. 否则如果发现当前时间now >= addTime(addTime 每次会增加一个sustainedSchedulerBacklogTimeoutS ,避免申请容器过于频繁),则会进行新容器的申请,如果是第一次,则增加一个(numExecutorsToAdd),如果是第二次则增加2个以此按倍数类推。直到maxNumExecutorsNeeded <= numExecutorsTarget ,然后就会重置numExecutorsToAdd。

所以我们会发现,我们并不是一次性就申请足够的资源,而是每隔sustainedSchedulerBacklogTimeoutS次时间,按[1,2,4,8]这种节奏去申请资源的。因为在某个sustainedSchedulerBacklogTimeoutS期间,可能已经有很多任务完成了,其实不需要那么多资源了。而按倍数上升的原因是,防止为了申请到足够的资源时间花费过长。这是一种权衡。

DRA评价

我们发现,DRA(Dynamic Resource Allocation)涉及到的点还是很多的,虽然逻辑比较简单,但是和任务调度密切相关,是一个非常动态的过程。这个设计本身也是面向一个通用的调度方式。

我个人建议如果采用了DRA,可以注意如下几点:

  1. 设置一个合理的minExecutors-maxExecutors值
  2. 将Executor对应的cpuCore 最好设置为<=3 ,避免Executor数目下降时,等不及新申请到资源,已有的Executor就因为任务过重而导致集群挂掉。
  3. 如果程序中有shuffle,例如(reduce,groupBy),建议设置一个合理的并行数,避免杀掉过多的Executors。
  4. 对于每个Stage持续时间很短的应用,其实不适合这套机制。这样会频繁增加和杀掉Executors,造成系统颠簸。而Yarn对资源的申请处理速度并不快。

Spark Streaming该使用什么机制动态调整资源

现有的DRA机制其实适合长时的批处理过程中,每个Stage需要的资源量不一样,并且耗时都比较长。Spark Streaming 可以理解为循环的微批处理。而DRA是在每次微批处理起作用,可能还没等DRA反应过来,这个周期就已经过了。

Spark Streaming需要一个从全局一天24小时来考虑。每个调度周期的processing time可能更适合作为增减Executors的标准。同时如果发生delay的话,则可以扩大资源申请的速度。并且,因为是周期性的,释放和新增动作只会发生在一个新的周期的开始,所以他并不会面临现有 DRA的问题,譬如需要通过额外的方式保存Shuffle 状态等。 所以实现起来更加容易。我们可能需要同时监听StreamingContext的一些信息。

具体而言:

 每个周期检查上个周期的处理时间 ,设为 preProcessingTime,周期为duration, 一般而言,我们的Spark Streaming程序都会让preProcessingTime < duration。否则会发生delay。 

如果 preProcessingTime > 0.8 * duration,则一次性将资源申请到maxExecutors。

如果preProcessingTime < duration,则应该删除的Worker为

    removeExecutorNum =  currentExecutors * ((duration -preProcessingTime)/duration - 0.2)

其中0.2 为预留的worker数。如果removeExecutorNum如果<=0 则不进行任何操作。

假设duration =10s, preProcessingTime= 5s, currentExecutors=100,则我们理论上认为 只要保留50%的资源即可。但是为了防止延时,我们其实额外保留一些20%资源。也就意味着我们删除30个Executor。 我们并不会一次性将资源都释放掉。假设我们增加一个新的参数spark.streaming.release.num.duration=5,这个参数意味着我们需要花费5个周期释放掉这30个Executor的资源。也就是当前这个周期,我们要释放掉 6个Executor。

接着到下一个周期,重复上面的计算。 直到计算结果 <=0 为止。

时间: 2024-12-20 17:02:33

Spark 动态资源分配(Dynamic Resource Allocation) 解析的相关文章

Spark Streaming Dynamic Resource Allocation

Problem Statement DRA has already been implemented since Spark 1.2 . However the existing Spark DRA on Yarn implementation does not embody the specific property of Spark Streaming.   Spark DRA works when there are some executors being idle for  remov

Spark Streaming Dynamic Resource Allocation 文档(非官方特性)

必要配置 通过下面参数开启DRA spark.streaming.dynamicAllocation.enabled=true 设置最大最小的Executor 数目: spark.streaming.dynamicAllocation.minExecutors=0 spark.streaming.dynamicAllocation.maxExecutors=50 可选配置 这些参数可以不用配置,都已经提供了一个较为合理的默认值 开启日志: spark.streaming.dynamicAlloc

DRM - Dynamic Resource management (Doc ID 390483.1)

 DRM - Dynamic Resource management (Doc ID 390483.1) To Bottom Modified:09-Jul-2013Type:BULLETIN In this Document   Purpose   Scope   Details   DRM - Dynamic Resource Mastering APPLIES TO: Oracle Database - Standard Edition - Version 10.1.0.2 to 11

Dynamic resource management for energy saving in the cloud computing environment

Dynamic resource management for energy saving in the cloud computing environment Liang-Teh Lee, Kang-Yuan Liu, and Hui-Yang Huang This paper presents a method of dynamic voltage scaling for dynamic adjustment of resources by inspecting CPU utilizatio

Hyper-V的动态内存(Dynamic Memory)

Hyper-V不支持Memory Overcommitment,一直为VMware和其他虚拟化厂商所诟病.当然,微软一直不承认这是他们的软肋,认为为了保证生产环境VM的性能,过量分配内存是不合时宜的.但是,微软的态度似乎突然转变,3月18日在其虚拟化的官方博客上发表了这样的一篇文章,声称微软听取了很多客户的意见,认为如何在避免性能下降的同时,最大限度的利用资源和投资,是客户所关心的,也是微软的努力方向,于是他们计划推出"动态内存"技术. http://blogs.technet.com

动态加载的网页解析

问题描述 hiall:我需要解析网页为http://auto.qq.com/buycar/tag_article_list.htm?tags=rav4查看源代码<divclass="listCon"><divclass="hd"><h2>关于<spanclass="tag-wrap"></span>的新闻</h2></div>是动态加载的源码中动态的js为:<

设计模式之动态代理(dynamic proxy)

1 动态代理与静态代理 我们从上一篇设计模式之代理模式一文中已经知道,在代理模式中代理对象和被代理对象一般实现相同的接口,调用者与代理对象进行交互.代理的存在对于调用者来说是透明的,调用者看到的只是接口.这就是传统的代理模式静态代理的特点. 那么传统的静态代理模式有什么问题呢?如果需要代理的类只有一个,那么静态代理没什么问题,如果有很多类需要代理呢,用静态代理的话就需要为每一个类创建一个代理类,显然这么做太过繁琐也容易出错.为此,JDK 5引入的动态代理机制,允许开发人员在运行时刻动态的创建出代

Android自定义View制作动态炫酷按钮实例解析_Android

普通按钮也就那么几种样式,看着都审美疲劳,先放效果图:   你会不会以为这个按钮是集结了很多动画的产物,我告诉你,并没有.所有的实现都是基于自定义View,采用最底层的onDraw一点一点的画出来的.没有采用一丁点的动画.虽然演示时间很短,但是要完成这么多变化,还是挺吃力. 首先讲解用法:  public class MainActivity extends Activity { @Override protected void onCreate(Bundle savedInstanceStat

Android自定义View制作动态炫酷按钮实例解析

普通按钮也就那么几种样式,看着都审美疲劳,先放效果图: 你会不会以为这个按钮是集结了很多动画的产物,我告诉你,并没有.所有的实现都是基于自定义View,采用最底层的onDraw一点一点的画出来的.没有采用一丁点的动画.虽然演示时间很短,但是要完成这么多变化,还是挺吃力. 首先讲解用法: public class MainActivity extends Activity { @Override protected void onCreate(Bundle savedInstanceState)