Spark Streaming Crash 如何保证Exactly Once Semantics

前言

其实这次写Spark Streaming相关的内容,主要是解决在其使用过程中大家真正关心的一些问题。我觉得应该有两块:

  1. 数据接收。我在用的过程中确实产生了问题。
  2. 应用的可靠性。因为SS是7*24小时运行的问题,我想知道如果它Crash了,会不会丢数据。

第一个问题在之前的三篇文章已经有所阐述:

  • Spark Streaming 数据产生与导入相关的内存分析
  • Spark Streaming 数据接收优化
  • Spark Streaming Direct Approach (No Receivers) 分析

第二个问题则是这篇文章重点会分析的。需要了解的是,基本上Receiver Based Approach 已经被我否决掉了,所以这篇文章会以 Direct Approach 为基准点,详细分析应用Crash后,数据的安全情况。(PS:我这前言好像有点长 O(∩_∩)O~)

下文中所有涉及到Spark Streaming 的词汇我都直接用 SS了哈。

SS 自身可以做到at least once语义

SS 是靠CheckPoint 机制 来保证 at least once 语义的。 

如果你并不想了解这个机制,只是想看结论,可跳过这段,直接看  两个结论 

CheckPoint 机制

CheckPoint 会涉及到一些类,以及他们之间的关系:

DStreamGraph类负责生成任务执行图,而JobGenerator则是任务真实的提交者。任务的数据源则来源于DirectKafkaInputDStream,checkPoint 一些相关信息则是由类DirectKafkaInputDStreamCheckpointData 负责。

好像涉及的类有点多,其实没关系,我们完全可以不用关心他们。先看看checkpoint都干了些啥,checkpoint 其实就序列化了一个类而已:

org.apache.spark.streaming.Checkpoint

看看类成员都有哪些:

val master = ssc.sc.master
val framework = ssc.sc.appName
val jars = ssc.sc.jars
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDurationval pendingTimes = ssc.scheduler.getPendingTimes().toArray
val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
val sparkConfPairs = ssc.conf.getAll

其他的都比较容易理解,最重要的是 graph,该类全路径名是:

org.apache.spark.streaming.DStreamGraph

里面有两个核心的数据结构是:

private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()

inputStreams 对应的就是 DirectKafkaInputDStream 了。

再进一步,DirectKafkaInputDStream  有一个重要的对象

protected[streaming] override val checkpointData =  new DirectKafkaInputDStreamCheckpointData

checkpointData 里则有一个data 对象,里面存储的内容也很简单

data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]

就是每个batch 的唯一标识 time 对象,以及每个KafkaRDD对应的的Kafka偏移信息。

而 outputStreams 里则是RDD,如果你存储的时候做了foreach操作,那么应该就是 ForEachRDD了,他被序列化的时候是不包含数据的。

而downtime由checkpoint 时间决定,pending time之类的也会被序列化。

由上面的分析,我们可以得到如下的结论:

两个结论

  1. checkpoint 是非常高效的。没有涉及到实际数据的存储。一般大小只有几十K,因为只存了Kafka的偏移量等信息。
  2. checkpoint 采用的是序列化机制,尤其是DStreamGraph的引入,里面包含了可能如ForeachRDD等,而ForeachRDD里面的函数应该也会被序列化。如果采用了CheckPoint机制,而你的程序包做了做了变更,恢复后可能会有一定的问题。

扯远了,其实上面分析了那么多,就是想让你知道,SS 的checkpoint 到底都存储了哪些东西?我们看看JobGenerator是怎么提交一个真实的batch任务的,就清楚了。

  1. 产生jobs
  2. 成功则提交jobs 然后异步执行
  3. 失败则会发出一个失败的事件
  4. 无论成功或者失败,都会发出一个 DoCheckpoint 事件。
  5. 当任务运行完成后,还会再调用一次DoCheckpoint 事件。

只要任务运行完成后没能顺利执行完DoCheckpoint前crash,都会导致这次Batch被重新调度。也就说无论怎样,不存在丢数据的问题,而这种稳定性是靠checkpoint 机制以及Kafka的可回溯性来完成的。

那现在会产生一个问题,假设我们的业务逻辑会对每一条数据都处理,则

  1. 我们没有处理一条数据
  2. 我们可能只处理了部分数据
  3. 我们处理了全部数据

根据我们上面的分析,无论如何,这次失败了,都会被重新调度,那么我们可能会重复处理数据,可能最后失败的那一次数据的一部分,也可能是全部,但不会更多了。

业务需要做事务,保证 Exactly Once 语义

这里业务场景被区分为两个:

  1. 幂等操作
  2. 业务代码需要自身添加事物操作

所谓幂等操作就是重复执行不会产生问题,如果是这种场景下,你不需要额外做任何工作。但如果你的应用场景是不允许数据被重复执行的,那只能通过业务自身的逻辑代码来解决了。

这个SS 倒是也给出了官方方案:

dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}

这代码啥含义呢? 就是说针对每个partition的数据,产生一个uniqueId,只有这个partion的所有数据被完全消费,则算成功,否则算失败,要回滚。下次重复执行这个uniqueId 时,如果已经被执行成功过的,则skip掉。

这样,就能保证数据 Exactly Once 语义啦。

其实Direct Approach 的容错性比较容易做,而且稳定。

后话

这篇内容本来不想做源码分析的,但是或多或少还是引入了一些。重要的是,为了保证Exactly Once Semantics ,你需要知道SS做了什么,你还需要做什么。

时间: 2025-01-20 19:04:26

Spark Streaming Crash 如何保证Exactly Once Semantics的相关文章

Spark Streaming Programming Guide

参考,http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html  Overview SparkStreaming支持多种流输入,like Kafka, Flume, Twitter, ZeroMQ or plain old TCP sockets,并且可以在上面进行transform操作,最终数据存入HDFS,数据库或dashboard 另外可以把Spark's in-built machine

Spark Streaming 流式计算实战

这篇文章由一次平安夜的微信分享整理而来.在Stuq 做的分享,原文内容.  业务场景 这次分享会比较实战些.具体业务场景描述: 我们每分钟会有几百万条的日志进入系统,我们希望根据日志提取出时间以及用户名称,然后根据这两个信息形成 userName/year/month/day/hh/normal  userName/year/month/day/hh/delay 路径,存储到HDFS中.如果我们发现日志产生的时间和到达的时间相差超过的一定的阈值,那么会放到 delay 目录,否则放在正常的 no

Spark Streaming Dynamic Resource Allocation 文档(非官方特性)

必要配置 通过下面参数开启DRA spark.streaming.dynamicAllocation.enabled=true 设置最大最小的Executor 数目: spark.streaming.dynamicAllocation.minExecutors=0 spark.streaming.dynamicAllocation.maxExecutors=50 可选配置 这些参数可以不用配置,都已经提供了一个较为合理的默认值 开启日志: spark.streaming.dynamicAlloc

Spark Streaming 不同Batch任务可以并行计算么?

关于Spark Streaming中的任务有如下几个概念: Batch Job Stage Task 其实Job,Stage,Task都是Spark Core里就有的概念,Batch则是Streaming特有的概念.同一Stage里的Task一般都是并行的.同一Job里的Stage可以并行,但是一般如果有依赖则是串行,可以参考我这篇文章Spark 多个Stage执行是串行执行的么?. Job的并行度复杂些,由两个配置决定: spark.scheduler.mode(FIFO/FAIR) spar

《Spark官方文档》Spark Streaming编程指南(二)

累加器和广播变量 首先需要注意的是,累加器(Accumulators)和广播变量(Broadcast variables)是无法从Spark Streaming的检查点中恢复回来的.所以如果你开启了检查点功能,并同时在使用累加器和广播变量,那么你最好是使用懒惰实例化的单例模式,因为这样累加器和广播变量才能在驱动器(driver)故障恢复后重新实例化.代码示例如下: Scala Java Python object WordBlacklist { @volatile private var ins

Spark Streaming 1.6 流式状态管理分析

关于状态管理 在流式计算中,数据是持续不断来的,有时候我们要对一些数据做跨周期(Duration)的统计,这个时候就不得不维护状态了.而状态管理对Spark 的 RDD模型是个挑战,因为在spark里,任何数据集都需要通过RDD来呈现,而RDD 的定义是一个不变的分布式集合.在状态管理中,比如Spark Streaming中的word-count 就涉及到更新原有的记录,比如在batch 1 中  A 出现1次,batch 2中出现3次,则总共出现了4次.这里就有两种实现: 获取batch 1

Spark Streaming容错的改进和零数据丢失

本文来自Spark Streaming项目带头人 Tathagata Das的博客文章,他现在就职于Databricks公司.过去曾在UC Berkeley的AMPLab实验室进行大数据和Spark Streaming的研究工作.本文主要谈及了Spark Streaming容错的改进和零数据丢失. 以下为原文: 实时流处理系统必须要能在24/7时间内工作,因此它需要具备从各种系统故障中恢复过来的能力.最开始,Spark Streaming就支持从driver和worker故障恢复的能力.然而有些

Spark Streaming场景应用- Spark Streaming计算模型及监控

Spark Streaming是一套优秀的实时计算框架.其良好的可扩展性.高吞吐量以及容错机制能够满足我们很多的场景应用.本篇结合我们的应用场景,介结我们在使用Spark Streaming方面的技术架构,并着重讲解Spark Streaming两种计算模型,无状态和状态计算模型以及该两种模型的注意事项;接着介绍了Spark Streaming在监控方面所做的一些事情,最后总结了Spark Streaming的优缺点. 一.概述 数据是非常宝贵的资源,对各级企事业单均有非常高的价值.但是数据的爆

《Spark大数据分析实战》——3.2节Spark Streaming

3.2 Spark StreamingSpark Streaming是一个批处理的流式计算框架.它的核心执行引擎是Spark,适合处理实时数据与历史数据混合处理的场景,并保证容错性.下面将对Spark Streaming进行详细的介绍.3.2.1 Spark Streaming简介Spark Streaming是构建在Spark上的实时计算框架,扩展了Spark流式大数据处理能力.Spark Streaming将数据流以时间片为单位进行分割形成RDD,使用RDD操作处理每一块数据,每块数据(也就