Spark Streaming 数据清理机制

前言

为啥要了解机制呢?这就好比JVM的垃圾回收,虽然JVM的垃圾回收已经巨牛了,但是依然会遇到很多和它相关的case导致系统运行不正常。

这个内容我记得自己刚接触Spark Streaming的时候,老板也问过我,运行期间会保留多少个RDD? 当时没回答出来。后面在群里也有人问到了,所以就整理了下。文中如有谬误之处,还望指出。

DStream 和 RDD

我们知道Spark Streaming 计算还是基于Spark Core的,Spark Core 的核心又是RDD. 所以Spark Streaming 肯定也要和RDD扯上关系。然而Spark Streaming 并没有直接让用户使用RDD而是自己抽象了一套DStream的概念。 DStream 和 RDD 是包含的关系,你可以理解为Java里的装饰模式,也就是DStream  是对RDD的增强,但是行为表现和RDD是基本上差不多的。都具备几个条件:

  1. 具有类似的tranformation动作,比如map,reduceByKey等,也有一些自己独有的,比如Window,mapWithStated等
  2. 都具有Action动作,比如foreachRDD,count等

从编程模型上看是一致的。

所以很可能你写的那堆Spark Streaming代码看起来好像和Spark 一致的,然而并不能直接复用,因为一个是DStream的变换,一个是RDD的变化。

Spark Streaming中 DStream 介绍

DStream 下面包含几个类:

  1. 数据源类,比如InputDStream,具体如DirectKafkaInputStream等
  2. 转换类,典型比如MappedDStream,ShuffledDStream
  3. 输出类,典型比如ForEachDStream

从上面来看,数据从开始(输入)到结束(输出)都是DStream体系来完成的,也就意味着用户正常情况是无法直接去产生和操作RDD的,这也就是说,DStream有机会和义务去负责RDD的生命周期。

这就回答了前言中的问题了。Spark Streaming具备自动清理功能。

RDD 在Spark Stream中产生的流程

在Spark Streaming中RDD的生命流程大体如下:

  1. 在InputDStream会将接受到的数据转化成RDD,比如DirectKafkaInputStream 产生的就是 KafkaRDD
  2. 接着通过MappedDStream等进行数据转换,这个时候是直接调用RDD对应的map方法进行转换的
  3. 在进行输出类操作时,才暴露出RDD,可以让用户执行相应的存储,其他计算等操作。

我们这里就以下面的代码来进行更详细的解释:

val source  =   KafkaUtils.createDirectInputStream(....)
source.map(....).foreachRDD{rdd=>
    rdd.saveTextFile(....)
}

foreachRDD 产生ForEachDStream,因为foreachRDD是个Action,所以会触发任务的执行,会被调用generateJob方法。

 override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

对应的parent是MappedDStream,也就是说调用MappedDStream.getOrCompute.该方法在DStream中,首先会在MappedDStream对象中的generatedRDDs 变量中查找是否已经有RDD,如果没有则触发计算,并且将产生的RDD放到generatedRDDs

@transientprivate[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()

private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If RDD was already generated, then retrieve it from HashMap,
    // or else compute the RDD
    generatedRDDs.get(time).orElse {
....
generatedRDDs.put(time, newRDD)
....

计算RDD是调用的compute方法,MappedDStream 的compute方法很简单,直接调用的父类也就是DirectKafkaInputStream的getOrCompute方法:

override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }

在上面的例子中,MappedDStream 的parent是DirectKafkaInputStream中,这是个数据源,所以他的compute方法会直接new出一个RDD.

从上面可以得出几个结论:

  1. 数据源以及转换类DStream都会维护一个generatedRDDs,可以按batchTime 进行获取
  2. 内部本质还是进行的RDD的转换

如果我们调用了cache会发生什么

这里又会有两种情况,一种是调用DStream.cache,第二种是RDD.cache。事实上他们是完全一样的。

DStream的cache 动作只是将DStream的变量storageLevel 设置为MEMORY_ONLY_SER,然后在产生(或者获取)RDD的时候,调用RDD的persit方法进行设置。所以DStream.cache 产生的效果等价于RDD.cache(也就是你自己调用foreachRDD 将RDD 都设置一遍)

进入正题,我们是怎么释放Cache住的RDD的

其实无所谓Cache不Cache住,RDD最终都是要释放的,否则运行久了,光RDD对象也能承包了你的内存。我们知道,在Spark Streaming中,周期性产生事件驱动Spark Streaming 的类其实是:

org.apache.spark.streaming.scheduler.JobGenerator

他内部有个永动机(定时器),定时发布一个产生任务的事件:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

然后通过processEvent进行事件处理:

  /** Processes all events */
  private def processEvent(event: JobGeneratorEvent) {
    logDebug("Got event " + event)
    event match {
      case GenerateJobs(time) => generateJobs(time)
      case ClearMetadata(time) => clearMetadata(time)
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      case ClearCheckpointData(time) => clearCheckpointData(time)
    }
  }

目前我们只关注ClearMetadata 事件。对应的方法为:

private def clearMetadata(time: Time) {
    ssc.graph.clearMetadata(time)

    // If checkpointing is enabled, then checkpoint,
    // else mark batch to be fully processed
    if (shouldCheckpoint) {
      eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
    } else {
      // If checkpointing is not enabled, then delete metadata information about
      // received blocks (block data not saved in any case). Otherwise, wait for
      // checkpointing of this batch to complete.
      val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
      jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
      jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
      markBatchFullyProcessed(time)
    }
  }

首先是清理输出DStream(比如ForeachDStream),接着是清理输入类(基于Receiver模式)的数据。

ForeachDStream 其实调用的也是DStream的方法。该方法大体如下:

private[streaming] def clearMetadata(time: Time) {
    val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
    val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
    logDebug("Clearing references to old RDDs: [" +
      oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
    generatedRDDs --= oldRDDs.keys
    if (unpersistData) {
      logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", "))
      oldRDDs.values.foreach { rdd =>
        rdd.unpersist(false)
        // Explicitly remove blocks of BlockRDD
        rdd match {
          case b: BlockRDD[_] =>
            logInfo("Removing blocks of RDD " + b + " of time " + time)
            b.removeBlocks()
          case _ =>
        }
      }
    }
    logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
      (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
    dependencies.foreach(_.clearMetadata(time))
  }

大体执行动作如下描述:

  1. 根据记忆周期得到应该剔除的RDD
  2. 根据是否要清理cache数据,进行unpersit 操作,并且显示的移除block
  3. 根据依赖调用其他的DStream进行动作清理

这里我们还可以看到,通过参数spark.streaming.unpersist 你是可以决定是否手工控制是否需要对cache住的数据进行清理。

这里你会有两个疑问:

  1. dependencies 是什么?
  2. rememberDuration 是怎么来的?

dependencies 你可以简单理解为父DStream,通过dependencies 我们可以获得已完整DStream链。

rememberDuration 的设置略微复杂些,大体是 slideDuration,如果设置了checkpointDuration 则是2*checkpointDuration 或者通过DStreamGraph.rememberDuration(如果设置了的话,譬如通过StreamingContext.remember方法,不过通过该方法设置的值要大于计算得到的值会生效)

另外值得一提的就是后面的DStream 会调整前面的DStream的rememberDuration,譬如如果你用了window* 相关的操作,则在此之前的DStream 的rememberDuration 都需要加上windowDuration。

然后根据Spark Streaming的定时性,每个周期只要完成了,都会触发清理动作,这个就是清理动作发生的时机。代码如下:

def onBatchCompletion(time: Time) {
    eventLoop.post(ClearMetadata(time))
}

总结下

Spark Streaming 会在每个Batch任务结束时进行一次清理动作。每个DStream 都会被扫描,不同的DStream根据情况不同,保留的RDD数量也是不一致的,但都是根据rememberDuration变量决定,而该变量会被下游的DStream所影响,所以不同的DStream的rememberDuration取值是不一样的。

时间: 2024-12-23 10:58:11

Spark Streaming 数据清理机制的相关文章

Spark Streaming 数据接收优化

看这篇文章前,请先移步Spark Streaming 数据产生与导入相关的内存分析, 文章重点讲的是从Kafka消费到数据进入BlockManager的这条线路的分析. 这篇内容是个人的一些经验,大家用的时候还是建议好好理解内部的原理,不可照搬 让Receiver均匀的分布到你的Executor上 在Spark Streaming 数据产生与导入相关的内存分析中我说了这么一句话: 我发现在数据量很大的情况下,最容易挂掉的就是Receiver所在的Executor了. 建议Spark Stream

Spark Streaming 数据产生与导入相关的内存分析

前言 我这篇文章会分几个点来描述Spark Streaming 的Receiver在内存方面的表现. 一个大致的数据接受流程 一些存储结构的介绍 哪些点可能导致内存问题,以及相关的配置参数 另外,有位大牛写了Spark Streaming 源码解析系列,我觉得写的不错,这里也推荐下. 我在部门尽力推荐使用Spark Streaming做数据处理,目前已经应用在日志处理,机器学习等领域.这期间也遇到不少问题,尤其是Kafka在接受到的数据量非常大的情况下,会有一些内存相关的问题. 另外特别说明下,

Spark Streaming 误用.transform(func)函数导致的问题解析

问题描述 今天有朋友贴了一段 gist,大家可以先看看这段代码有什么问题. 特定情况你会发现UI 的Storage标签上有很多新的Cache RDD,然后你以为是Cache RDD 不被释放,但是通过Spark Streaming 数据清理机制分析我们可以排除这个问题. 接着通过给RDD的设置名字,名字带上时间,发现是延时的Batch 也会产生cache RDD.那这是怎么回事呢? 另外还有一个问题,也是相同的原因造成的:我通过KafkaInputStream.transform 方法获取Kaf

Spark修炼之道(进阶篇)——Spark入门到精通:第十四节 Spark Streaming 缓存、Checkpoint机制

作者:周志湖 微信号:zhouzhihubeyond 主要内容 本节内容基于官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html Spark Stream 缓存 Checkpoint 案例 1. Spark Stream 缓存 通过前面一系列的课程介绍,我们知道DStream是由一系列的RDD构成的,它同一般的RDD一样,也可以将流式数据持久化到内容当中,采用的同样是persisit方法,调用该方法后D

Spark Streaming Crash 如何保证Exactly Once Semantics

前言 其实这次写Spark Streaming相关的内容,主要是解决在其使用过程中大家真正关心的一些问题.我觉得应该有两块: 数据接收.我在用的过程中确实产生了问题. 应用的可靠性.因为SS是7*24小时运行的问题,我想知道如果它Crash了,会不会丢数据. 第一个问题在之前的三篇文章已经有所阐述: Spark Streaming 数据产生与导入相关的内存分析 Spark Streaming 数据接收优化 Spark Streaming Direct Approach (No Receivers

《Spark大数据分析实战》——3.2节Spark Streaming

3.2 Spark StreamingSpark Streaming是一个批处理的流式计算框架.它的核心执行引擎是Spark,适合处理实时数据与历史数据混合处理的场景,并保证容错性.下面将对Spark Streaming进行详细的介绍.3.2.1 Spark Streaming简介Spark Streaming是构建在Spark上的实时计算框架,扩展了Spark流式大数据处理能力.Spark Streaming将数据流以时间片为单位进行分割形成RDD,使用RDD操作处理每一块数据,每块数据(也就

Spark Streaming Direct Approach (No Receivers) 分析

前言 这个算是Spark Streaming 接收数据相关的第三篇文章了. 前面两篇是: Spark Streaming 数据产生与导入相关的内存分析 Spark Streaming 数据接收优化 Spark Streaming 接受数据的方式有两种: Receiver-based Approach Direct Approach (No Receivers) 上面提到的两篇文章讲的是 Receiver-based Approach . 而这篇文章则重点会分析Direct Approach (N

spark-在使用Spark Streaming向HDFS中保存数据时,文件内容会被覆盖掉,怎么解决?

问题描述 在使用Spark Streaming向HDFS中保存数据时,文件内容会被覆盖掉,怎么解决? 我的Spark Streaming代码如下所示: val lines=FlumeUtils.createStream(ssc,"hdp2.domain",22222,StorageLevel.MEMORY_AND_DISK_SER_2) val words = lines.filter(examtep(_)) words.foreachRDD(exam(_)) //some other

spark streaming问题-六台机器集群,40M数据就报错,spark streaming运行例子程序wordcount

问题描述 六台机器集群,40M数据就报错,spark streaming运行例子程序wordcount 请大神帮忙解决一下:六台机器,SparkStreaming的例子程序,运行在yarn上四个计算节点(nodemanager),每台8G内存,i7处理器,想测测性能. 自己写了socket一直向一个端口发送数据,spark 接收并处理 运行十几分钟汇报错:WARN scheduler TaskSetManagerost task 0.1 in stage 265.0 :java.lang.Exc