Spark Streaming 1.6 流式状态管理分析

关于状态管理

在流式计算中,数据是持续不断来的,有时候我们要对一些数据做跨周期(Duration)的统计,这个时候就不得不维护状态了。而状态管理对Spark 的 RDD模型是个挑战,因为在spark里,任何数据集都需要通过RDD来呈现,而RDD 的定义是一个不变的分布式集合。在状态管理中,比如Spark Streaming中的word-count 就涉及到更新原有的记录,比如在batch 1 中  A 出现1次,batch 2中出现3次,则总共出现了4次。这里就有两种实现:

  1. 获取batch 1 中的 状态RDD  和当前的batch RDD 做co-group 得到一个新的状态RDD。这种方式完美的契合了RDD的不变性,但是对性能却会有比较大的影响,因为需要对所有数据做处理,计算量和数据集大小是成线性相关的。这个我们后续会详细讨论。
  2. 第二种是一种变通的实现。因为没法变更RDD/Partition等核心概念,所以Spark Streaming在集合元素上做了文章,定义了MapWithStateRDD,将该RDD的元素做了限定,必须是MapWithStateRDDRecord 这个东西。该MapWithStateRDDRecord 保持有某个分区的所有key的状态(通过stateMap记录)以及计算结果(mappedData),元素MapWithStateRDDRecord 变成可变的,但是RDD 依然是不变的。

这两个方案分别对应了 updateStateByKey/mapWithState  的实现。

前言

在这篇文章中,Apache Spark 1.6发布,提到了spark1.6 三块性能改进:

  • Parquet性能
  • 自动内存管理模型
  • 流式状态管理10倍性能提升

之前就想系统的对这三块仔细阐述下。现在总算有了第二篇。

本文会从三个方面展开:

  1. updateStateByKey的实现;
  2. mapWithState(1.6新引入的流式状态管理)的实现
  3. mapWithState额外内容

updateStateByKey的实现

在 关于状态管理中,我们已经描述了一个大概。该方法可以在org.apache.spark.streaming.dstream.PairDStreamFunctions中找到。调用该方法后会构建出一个org.apache.spark.streaming.dstream.StateDStream对象。计算的方式也较为简单,核心逻辑是下面两行代码:

val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
Some(stateRDD)

首先将prevStateRDD 和 parentRDD(新batch 的数据) 做一次cogroup,形成了 (K, Seq[V], Seq[W]) 这样的结果集。你会发现和updateStateByKey 要求的(Seq[V], Option[S])签名还是有些类似的。事实上这里的Seq[V] 就是parentRDD的对应K 的新的值。为了适配他两,Spark 内部会对你传进来的updateFunc 做两次转换,从而使得你的函数能够接受(K, Seq[V], Seq[W])这样的参数。看到这,想必你也就知道为啥updateStateByKey  接受的函数签名是那样的了。

前文我们提到,这样做很漂亮,代码也少,契合RDD的概念,然而你会发现无论parentRDD里有多少key,哪怕是只有一个,也需要对原有所有的数据做cogroup 并且全部做一遍处理(也就是应用你的update函数)。显然这是很低效的。很多场景下,新的batch 里只有一小部分数据,但是我们却不得不对所有的数据都进行计算。

正因为上面的问题,所以Spark Streaming 提出了一个新的API mapWithState,对应的jira为:Improved state management for Spark Streaming。除了我前面提到的性能问题,新的API 还提供两个新的功能:

  1. 可以为Key 设置TTL(Timeout)
  2. 用户可以对返回值进行控制

mapWithState(1.6新引入的流式状态管理)的实现

前面我们提到,在新的mapWithState API 中,核心思路是创建一个新的MapWithStateRDD,该RDD的元素是 MapWithStateRDDRecord,每个MapWithStateRDDRecord 记录某个Partiton下所有key的State。

依然的,你在org.apache.spark.streaming.dstream.PairDStreamFunctions 可以看到mapWithState 签名。

@Experimental
  def mapWithState[StateType: ClassTag, MappedType: ClassTag](
      spec: StateSpec[K, V, StateType, MappedType]
    ): MapWithStateDStream[K, V, StateType, MappedType] = {
    new MapWithStateDStreamImpl[K, V, StateType, MappedType](
      self,
      spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
    )
  }

这一段代码有三点值得注意:

  1. 该接口在1.6 中还是 Experimental 状态
  2. 接受的不是一函数,而是一个StateSpec 的对象。
  3. 返回了一个新的DStream

其实StateSpec 只是一个包裹,你在实际操作上依然是定义一个函数,然后通过StateSpec进行包裹一下。以 wordcount 为例:

   val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }

接着StateSpec.function(mappingFunc) 包裹一下就可以传递给mapWithState。我们看到该函数更加清晰,word 是K,one新值,state 是原始值(本batch之前的状态值)。这里你需要把state 更新为新值,该实现是做了一个内部状态维护的,不像updateStateByKey一样,一切都是现算的。

MapWithStateDStreamImpl 的compute逻辑都委托给了InternalMapWithStateDStream,最终要得到MapWithStateRDD,基本是通过下面的逻辑来计算的:

val prevStateRDD = getOrCompute(validTime - slideDuration) match ...
val dataRDD = parent.getOrCompute(validTime).getOrElse {  context.sparkContext.emptyRDD[(K, V)]}
.....
val partitionedDataRDD = dataRDD.partitionBy(partitioner)
Some(new MapWithStateRDD(  prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime))

这里有个很重要的操作是对dataRDD进行了partition操作,保证和prevStateRDD 按相同的分区规则进行分区。这个在后面做计算时有用。

获取到prevStateRDD,接着获取当前batch的数据的RDD,最后组装成一个新的MapWithStateRDD。MapWithStateRDD 还接受你定义的函数mappingFunction以及key的超时时间。

其中MapWithStateRDD 和别的RDD 不同之处在于RDD里的元素是MapWithStateRDDRecord 对象。其实prevStateRDD  也是个MapWithStateRDD 。

整个实际计算逻辑都在MapWithStateRDDRecord.updateRecordWithData 方法里。

前面我们提到,MapWithStateRDDRecord 是prevStateRDD 里的元素。有多少个分区,就有多少个MapWithStateRDDRecord 。一个Record 对应一个分区下所有数据的状态。在MapWithStateRDDRecord.updateRecordWithData 方法中,第一步是copy 当前record 的状态。这个copy是非常快的。我们会在mapWithSate额外内容 那个章节有更详细的分析。

 val newStateMap = prevRecord.map { _.stateMap.copy() }. getOrElse { new EmptyStateMap[K, S]() } //prevRecord=MapWithStateRDDRecord[K, S, E]

接着定义了两个变量,其中mappedData  会作为最后的计算结果返回,wrappedState 类似Hadoop里的 Text,你可以不断给它赋值,然后获得一些新的功能,避免返回创建对象。它主要是给state添加了一些方法,比如update,define状态等。

 val mappedData = new ArrayBuffer[E]
 val wrappedState = new StateImpl[S]()

接着遍历当前batch 所有的数据,并且应用用户定义的函数。这里我们看到,我们只对当前batch的数据进行函数计算,而不是针对历史全集数据进行计算,这是一个很大的性能提升点。接着根据wrappedState的状态对newStateMap做更新,主要是删除或者数据的更新。最后将新的结果返回并且放到mappedData 。

dataIterator.foreach { case (key, value) =>
      wrappedState.wrap(newStateMap.get(key))
      val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
      if (wrappedState.isRemoved) {
        newStateMap.remove(key)
      } else if (wrappedState.isUpdated
          || (wrappedState.exists && timeoutThresholdTime.isDefined)) {
        newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
      }
      mappedData ++= returned
    }

上面这段逻辑,你会发现一个问题,如果dataIterator 里有重复的数据比如某个K 出现多次,则mappedData也会有多次。以wordcount 为例:

hello 出现了三次,所以会加入到mappedData中三次。其实我没发现这么做的意义,并且我认为会对内存占用造成一定的压力。

如果你想要最后的结果,需要调用完mapWithState 之后需要再调用一次stateSnapshots,就可以拿到第三栏的计算结果了。

经过上面的计算,我们对parentRDD里的每个分区进行计算,得到了mappedData以及newStateMap,这两个对象一起构建出MapWithStateRDDRecord,而该Record 则形成一个Partition,最后构成新的MapWithStateRDD。 

mapWithState额外内容

MapWithStateRDDRecord 透过stateMap 维护了某个分区下所有key的当前状态。 在前面的分析中,我们第一步便是clone old stateMap。如果集合非常大,拷贝也是很费时才对,而且还耗费内存。

所以如何实现好stateMap 变得非常重要:

  1. 实现过程采用的是 增量copy。也叫deltaMap。 新创建的stateMap 会引用旧的stateMap。新增数据会放到新的stateMap中,而更新,删除,查找等操作则有可能发生在老得stateMap上。缺点也是有的,如果stateMap 链路太长,则可能会对性能造成一定的影响。我们只要在特定条件下做合并即可。目前是超过DELTA_CHAIN_LENGTH_THRESHOLD=20 时会做合并。
  2. 使用 org.apache.spark.util.collection.OpenHashMap,该实现比java.util.HashMap 快5倍,并且占用更少的内存空间。不过该HashMap 无法进行删除操作。
时间: 2024-09-20 18:38:04

Spark Streaming 1.6 流式状态管理分析的相关文章

Spark Streaming 昨日 Active Batches,状态Processing,Scheduling Delay 8.2min

问题描述 SparkStreaming中的ActiveBatches中有许多昨天的batches处于processing状态SchedulingDelay却显示只有8.2min,今日的batches都处理正常,求助大侠 解决方案

ASP应用程序设计的Web状态管理分析

web|程序|设计     许多开发人员把应用程序传送到Web之前从来没考虑状态的概念.正如前面说过的,Web是一个无状态的环境.因此应该探讨一下状态是什么,了解能够避免产生问题的方法. 状态的准确定义 在单用户程序中,创建一个可执行的应用程序时,例如使用VB建立一个.exe文件,可以声明一个全局(或Public)变量,然后在代码中任何地方可对其进行访问.在应用程序运行的所有时刻,时刻值一直是有效,并且是可访问的. 对于一个传统的客户机/服务器解决方案,例如一个基于客户机的应用程序对一个基于服务

从Storm和Spark 学习流式实时分布式计算的设计

0. 背景 最近我在做流式实时分布式计算系统的架构设计,而正好又要参加CSDN博文大赛的决赛.本来想就写Spark源码分析的文章吧.但是又想毕竟是决赛,要拿出一些自己的干货出来,仅仅是源码分析貌似分量不够.因此,我将最近一直在做的系统架构的思路整理出来,形成此文.为什么要参考Storm和Spark,因为没有参照效果可能不会太好,尤其是对于Storm和Spark由了解的同学来说,可能通过对比,更能体会到每个具体实现背后的意义. 本文对流式系统出现的背景,特点,数据HA,服务HA,节点间和计算逻辑间

流式大数据实时处理技术、平台及应用

摘要:大数据处理系统根据其时效性可分为批式大数据和流式大数据两类.上述两类系统均无法满足"事中"感知查询分析处理模式的需求.为此,从分析大数据应用场景入手,提出了"流立方"流式大数据实时处理技术和平台,在完整大数据集上实现了低迟滞.高实时的即席查询分析.目前基于"流立方"平台开发的业务系统已应用到金融风控反欺诈.机器防御等领域,具有广阔的应用前景. 1.引言 大数据技术的广泛应用使其成为引领众多行业技术进步.促进效益增长的关键支撑技术.根据数据处

利用Spark Streaming实现分布式采集系统

前言 前两天我刚在自己的一篇文章中鼓吹数据天生就是流式的,并且指出: 批量计算已经在慢慢退化,未来必然是属于流式计算的,数据的流动必定是由数据自己驱动流转的. 而Spark Streaming 在上层概念上,完美融合了批量计算和流式计算,让他们你中有我,我中有你,这种设计使得Spark Streaming 作为流式计算的一个载体,同时也能作为其他一些需要分布式架构的问题提供解决方案. Spark Streaming 作为一些分布式任务系统基础的优势 天然就是分布式的,不用再为实现分布式协调而蛋疼

Spark Streaming 流式计算实战

这篇文章由一次平安夜的微信分享整理而来.在Stuq 做的分享,原文内容.  业务场景 这次分享会比较实战些.具体业务场景描述: 我们每分钟会有几百万条的日志进入系统,我们希望根据日志提取出时间以及用户名称,然后根据这两个信息形成 userName/year/month/day/hh/normal  userName/year/month/day/hh/delay 路径,存储到HDFS中.如果我们发现日志产生的时间和到达的时间相差超过的一定的阈值,那么会放到 delay 目录,否则放在正常的 no

Spark与EVAM组CP,应对实时流式事件处理

文章讲的是Spark与EVAM组CP,应对实时流式事件处理,Apache Spark是一个开源大数据处理框架,其速度快并具有很好的易用性,并且可用来应对复杂分析.Spark可以支持流式处理,图表数据,机器学习,以及Hadoop和map/reduce.它是探索建立实时事件处理的理想平台. EVAM是一个实时事件处理方面的领跑者,有超过40种解决方案可以支持全球超两亿用户的实时事件处理.最近几年,EVAM已经建立了多种客户解决方案,比较流行的几个比如AWS Kinesis和RedShift.既然Ap

流式大数据处理的三种框架:Storm,Spark和Samza

许多分布式计算系统都可以实时或接近实时地处理大数据流.本文将对三种Apache框架分别进行简单介绍,然后尝试快速.高度概述其异同. Apache Storm 在Storm中,先要设计一个用于实时计算的图状结构,我们称之为拓扑(topology).这个拓扑将会被提交给集群,由集群中的主控节点(master node)分发代码,将任务分配给工作节点(worker node)执行.一个拓扑中包括spout和bolt两种角色,其中spout发送消息,负责将数据流以tuple元组的形式发送出去;而bolt

专访阿里云高级技术专家吴威:Kafka、Spark和Flink类支持流式计算的软件会越来越流行

杭州·云栖大会将于2016年10月13-16日在云栖小镇举办,在这场标签为互联网.创新.创业的云计算盛宴上,众多行业精英都将在这几天里分享超过450个演讲主题. 为了帮助大家进一步了解这场全球前言技术共振盛会的内容,采访了各个论坛的大咖,以飨读者. 以下为正文: 吴威,阿里云高级技术专家.E-MapReduce产品是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,为用户提供集群.作业.数据等管理的一站式大数据处理分析服务,他在其中参与产品设计讨论.平台性能调优等工作,并为用户提供技