Spark Streaming场景应用- Spark Streaming计算模型及监控

Spark Streaming是一套优秀的实时计算框架。其良好的可扩展性、高吞吐量以及容错机制能够满足我们很多的场景应用。本篇结合我们的应用场景,介结我们在使用Spark Streaming方面的技术架构,并着重讲解Spark Streaming两种计算模型,无状态和状态计算模型以及该两种模型的注意事项;接着介绍了Spark Streaming在监控方面所做的一些事情,最后总结了Spark Streaming的优缺点。

一、概述

数据是非常宝贵的资源,对各级企事业单均有非常高的价值。但是数据的爆炸,导致原先单机的数据处理已经无法满足业务的场景需求。因此在此基础上出现了一些优秀的分布式计算框架,诸如Hadoop、Spark等。离线分布式处理框架虽然能够处理非常大量的数据,但是其迟滞性很难满足一些特定的需求场景,比如push反馈、实时推荐、实时用户行为等。为了满足这些场景,使数据处理能够达到实时的响应和反馈,又随之出现了实时计算框架。目前的实时处理框架有Apache Storm、Apache Flink以及Spark Streaming等。其中Spark Streaming由于其本身的扩展性、高吞吐量以及容错能力等特性,并且能够和离线各种框架有效结合起来,因而是当下是比较受欢迎的一种流式处理框架。

根据其官方文档介绍,Spark Streaming 有高扩展性、高吞吐量和容错能力强的特点。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。另外 Spark Streaming 也能和 MLlib(机器学习)以及 Graphx 完美融合。其架构见下图:

Spark Streaming 其优秀的特点给我们带来很多的应用场景,如网站监控和网络监控、异常监测、网页点击、用户行为、用户迁移等。本文中,将为大家详细介绍,我们的应用场景中,Spark Streaming的技术架构、两种状态模型以及Spark Streaming监控等。

二、应用场景

在 Spark Streaming 中,处理数据的单位是一批而不是单条,而数据采集却是逐条进行的,因此 Spark Streaming 系统需要设置间隔使得数据汇总到一定的量后再一并操作,这个间隔就是批处理间隔。批处理间隔是 Spark Streaming 的核心概念和关键参数,它决定了 Spark Streaming 提交作业的频率和数据处理的延迟,同时也影响着数据处理的吞吐量和性能。

2.1 框架

目前我们Spark Streaming的业务应用场景包括异常监测、网页点击、用户行为以及用户地图迁徙等场景。按计算模型来看大体可分为无状态的计算模型以及状态计算模型两种。在实际的应用场景中,我们采用Kafka作为实时输入源,Spark Streaming作为计算引擎处理完数据之后,再持久化到存储中,包括MySQL、HDFS、ElasticSearch以及MongoDB等;同时Spark Streaming 数据清洗后也会写入Kafka,然后经由Flume持久化到HDFS;接着基于持久化的内容做一些UI的展现。架构见下图:

2.2 无状态模型

无状态模型只关注当前新生成的DStream数据,所以的计算逻辑均基于该批次的数据进行处理。无状态模型能够很好地适应一些应用场景,比如网站点击实时排行榜、指定batch时间段的用户访问以及点击情况等。该模型由于没有状态,并不需要考虑有状态的情况,只需要根据业务场景保证数据不丢就行。此种情况一般采用Direct方式读取Kafka数据,并采用监听器方式持久化Offsets即可。具体流程如下:

其上模型框架包含以下几个处理步骤:

  • 读取Kafka实时数据;
  • Spark Streaming Transformations以及actions操作;
  • 将数据结果持久化到存储中,跳转到步骤一。

受网络、集群等一些因素的影响,实时程序出现长时失败,导致数据出现堆积。此种情况下是丢掉堆积的数据从Kafka largest处消费还是从之前的Kafka offsets处消费,这个取决具体的业务场景。

2.3 状态模型

有状态模型是指DStreams在指定的时间范围内有依赖关系,具体的时间范围由业务场景来指定,可以是2个及以上的多个batch time RDD组成。Spark Streaming提供了updateStateByKey方法来满足此类的业务场景。因涉及状态的问题,所以在实际的计算过程中需要保存计算的状态,Spark Streaming中通过checkpoint来保存计算的元数据以及计算的进度。该状态模型的应用场景有网站具体模块的累计访问统计、最近N batch time 的网站访问情况以及app新增累计统计等等。具体流程如下:

上述流程中,每batch time计算时,需要依赖最近2个batch time内的数据,经过转换及相关统计,最终持久化到MySQL中去。不过为了确保每个计算仅计算2个batch time内的数据,需要维护数据的状态,清除过期的数据。我们先来看下updateStateByKey的实现,其代码如下:

  • 暴露了全局状态数据中的key类型的方法。

  1. def updateStateByKey[S: ClassTag]( 
  2.       updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], 
  3.       partitioner: Partitioner, 
  4.       rememberPartitioner: Boolean 
  5.     ): DStream[(K, S)] = ssc.withScope { 
  6.      new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None) 

隐藏了全局状态数据中的key类型,仅对Value提供自定义的方法。


  1. def updateStateByKey[S: ClassTag]( 
  2.       updateFunc: (Seq[V], Option[S]) => Option[S], 
  3.       partitioner: Partitioner, 
  4.       initialRDD: RDD[(K, S)] 
  5.     ): DStream[(K, S)] = ssc.withScope { 
  6.     val cleanedUpdateF = sparkContext.clean(updateFunc) 
  7.     val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => { 
  8.       iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s))) 
  9.     } 
  10.     updateStateByKey(newUpdateFunc, partitioner, true, initialRDD) 

以上两种方法分别给我们提供清理过期数据的思路:

  • 泛型K进行过滤。K表示全局状态数据中对应的key,如若K不满足指定条件则反回false;
  • 返回值过滤。第二个方法中自定义函数指定了Option[S]返回值,若过期数据返回None,那么该数据将从全局状态中清除。

三、Spark Streaming监控

同Spark一样,Spark Streaming也提供了Jobs、Stages、Storage、Enviorment、Executors以及Streaming的监控,其中Streaming监控页的内容如下图:

上图是Spark UI中提供一些数据监控,包括实时输入数据、Scheduling Delay、处理时间以及总延迟的相关监控数据的趋势展现。另外除了提供上述数据监控外,Spark UI还提供了Active Batches以及Completed Batches相关信息。Active Batches包含当前正在处理的batch信息以及堆积的batch相关信息,而Completed Batches刚提供每个batch处理的明细数据,具体包括batch time、input size、scheduling delay、processing Time、Total Delay等,具体信息见下图:

Spark Streaming能够提供如此优雅的数据监控,是因在对监听器设计模式的使用。如若Spark UI无法满足你所需的监控需要,用户可以定制个性化监控信息。Spark Streaming提供了StreamingListener特质,通过继承此方法,就可以定制所需的监控,其代码如下:


  1. @DeveloperApi 
  2.     trait StreamingListener { 
  3.  
  4.       /** Called when a receiver has been started */ 
  5.       def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { } 
  6.  
  7.       /** Called when a receiver has reported an error */ 
  8.       def onReceiverError(receiverError: StreamingListenerReceiverError) { } 
  9.  
  10.       /** Called when a receiver has been stopped */ 
  11.       def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { } 
  12.  
  13.       /** Called when a batch of jobs has been submitted for processing. */ 
  14.       def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { } 
  15.  
  16.       /** Called when processing of a batch of jobs has started.  */ 
  17.       def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { } 
  18.  
  19.       /** Called when processing of a batch of jobs has completed. */ 
  20.       def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { } 
  21.  
  22.       /** Called when processing of a job of a batch has started. */ 
  23.       def onOutputOperationStarted( 
  24.           outputOperationStarted: StreamingListenerOutputOperationStarted) { } 
  25.  
  26.       /** Called when processing of a job of a batch has completed. */ 
  27.       def onOutputOperationCompleted( 
  28.           outputOperationCompleted: StreamingListenerOutputOperationCompleted) { } 
  29.     } 

目前,我们保存Offsets时,采用继承StreamingListener方式,此是一种应用场景。当然也可以监控实时计算程序的堆积情况,并在达到一阈值后发送报警邮件。具体监听器的定制还得依据应用场景而定。

四、Spark Streaming优缺点

Spark Streaming并非是Storm那样,其并非是真正的流式处理框架,而是一次处理一批次数据。也正是这种方式,能够较好地集成Spark 其他计算模块,包括MLlib(机器学习)、Graphx以及Spark SQL。这给实时计算带来很大的便利,与此带来便利的同时,也牺牲作为流式的实时性等性能。

4.1 优点

  • Spark Streaming基于Spark Core API,因此其能够与Spark中的其他模块保持良好的兼容性,为编程提供了良好的可扩展性;
  • Spark Streaming 是粗粒度的准实时处理框架,一次读取完或异步读完之后处理数据,且其计算可基于大内存进行,因而具有较高的吞吐量;
  • Spark Streaming采用统一的DAG调度以及RDD,因此能够利用其lineage机制,对实时计算有很好的容错支持;
  • Spark Streaming的DStream是基于RDD的在流式数据处理方面的抽象,其transformations 以及actions有较大的相似性,这在一定程度上降低了用户的使用门槛,在熟悉Spark之后,能够快速上手Spark Streaming。

4.2 缺点

  • Spark Streaming是准实时的数据处理框架,采用粗粒度的处理方式,当batch time到时才会触发计算,这并非像Storm那样是纯流式的数据处理方式。此种方式不可避免会出现相应的计算延迟 。
  • 目前来看,Spark Streaming稳定性方面还是会存在一些问题。有时会因一些莫名的异常导致退出,这种情况下得需要自己来保证数据一致性以及失败重启功能等。

四、总结

本篇文章主要介绍了Spark Streaming在实际应用场景中的两种计算模型,包括无状态模型以及状态模型;并且重点关注了下Spark Streaming在监控方面所作的努力。首先本文介绍了Spark Streaming应用场景以及在我们的实际应用中所采取的技术架构。在此基础上,引入无状态计算模型以及有状态模型两种计算模型;接着通过监听器模式介绍Spark UI相关监控信息等;最后对Spark Streaming的优缺点进行概括。希望本篇文章能够给各位带来帮助,后续我们会介绍Spark Streaming在场景应用中我们所做的优化方面的努力,敬请期待!

本文作者:徐胜国

来源:51CTO

时间: 2024-10-02 15:22:09

Spark Streaming场景应用- Spark Streaming计算模型及监控的相关文章

大数据-spark streaming如何更好的计算关系型数据库中数据?

问题描述 spark streaming如何更好的计算关系型数据库中数据? 各位大虾过来围观一下. spark streaming在计算日志时通常会使用kafka+spark的架构, 目前很少看到有大虾讲spark streaming计算关系型数据库中的数据. 希望有大虾过来围观讨论,如何更好的把关系型数据库中的数据同步至spark中, 进行实时计算.有什么更好的架构或者开源软件的解决方案 解决方案 官网上看到Spark Streaming内置就支持两类数据源, 1) 基础数据源(Basic s

《Spark大数据处理:技术、应用与性能优化》——第3章 Spark计算模型3.1 Spark程序模型

第3章 Spark计算模型 创新都是站在巨人的肩膀上产生的,在大数据领域也不例外.微软的Dryad使用DAG执行模式.子任务自由组合的范型.该范型虽稍显复杂,但较为灵活.Pig也针对大关系表的处理提出了很多有创意的处理方式,如flatten.cogroup.经典虽难以突破,但作为后继者的Spark借鉴经典范式并进行创新.经过实践检验,Spark的编程范型在处理大数据时显得简单有效.的数据处理与传输模式也大获全胜.Spark站在巨人的肩膀上,依靠Scala强有力的函数式编程.Actor通信模式.闭

Spark :工作组上的集群计算的框架

翻译:Esri 卢萌 本文翻译自加州伯克利大学AMP lab的Matei大神发表的关于Spark框架的第一篇论文,限于本人英文水平很烂,所以翻译中肯定有很多错误,请发现了错误的直接与我联系,感谢. (括号中,斜体字部分是我自己做的解释) 摘要: MapReduce以及其的各种变种,在商业集群上进行的对大规模密集型数据集的应用上已经取得了很大的成功.然而大多数这类系统都是围绕着一个非迭代型 的数据流模型,这种模型不适用于目前很多主流的应用程序.本文的研究侧重于介绍其中这样一类应用:重复使用跨多个并

hadoop-在OLTP环境下,spark使用场景

问题描述 在OLTP环境下,spark使用场景 现在有个OLTP系统,调用的接口规则比较复杂,接口传入数据也比较多,需要1-2秒返回结果,有1000条记录2-3个字段,能否利用spark相关组件作为服务程序,并行处理这些记录,加快响应时间,或在本地多线程并行处理,spark设计主要目的是处理异步模型,对于同步模型处理是否在机制上有时延. 解决方案 http://zhidao.baidu.com/link?url=g_Nc2TOTL3ysGK4u83d0DP3EVcx_QJUYZcnMcp7eSG

spark使用场景咨询

问题描述 现在有个OLTP系统,调用的接口规则比较复杂,接口传入数据也比较多,需要1-2秒返回结果,有1000条记录2-3个字段,能否利用spark相关组件作为服务程序,并行处理这些记录,加快响应时间,或在本地多线程并行处理,spark设计主要目的是处理异步模型,对于同步模型处理是否在机制上有时延.

满满的技术干货!Spark顶级会议Apache Spark Summit精华讲义分享

Apache Spark Summit是Spark技术的顶级会议,这里大咖云集,一同探讨世界上最新的Spark发展动态以及产品应用和技术实践. 讲义资料持续更新中... 2月20日更新 [Spark Summit East 2017]工程快速索引[Spark Summit East 2017]提升Python与Spark的性能和互操作性[Spark Summit East 2017]Spark中的容错:从生产实践中获取的经验[Spark Summit East 2017]Spark:将数据科学作

Spark修炼之道——Spark学习路线、课程大纲

课程内容 Spark修炼之道(基础篇)--Linux基础(15讲).Akka分布式编程(8讲) Spark修炼之道(进阶篇)--Spark入门到精通(30讲) Spark修炼之道(实战篇)--Spark应用开发实战篇(20讲) Spark修炼之道(高级篇)--Spark源码解析(50讲) 部分内容会在实际编写时动态调整,或补充.或删除. Spark修炼之道(基础篇)--Linux大数据开发基础(15讲). Linux大数据开发基础--第一节:Ubuntu Linux安装与介绍 Linux大数据开

AI瑞文智力测验超美国人平均IQ,计算模型用类比推理解决视觉问题

作为广泛应用于无国界的智力/推理能力测试,瑞文标准推理测验可以测验一个人的观察力及推理能力.在此前一项广受争议的对超过 80 个国家和地区进行的 IQ 调查中,曾得出了所谓的"国家(和地区)平均 IQ".美国西北大学的研究团队开发出了一个新的模型,能够在标准智力测试中超过到美国人的平均 IQ 水平.这项研究构建了用类比推理解决视觉问题的模型,研究者表示:"目前绝大多关于视觉的 AI 研究都集中在对象识别或场景标记,而非推理.但是识别只有能够为后续推理所用才有其意义.我们的研究

基于MapReduce计算模型的气象资料处理调优试验

基于MapReduce计算模型的气象资料处理调优试验 杨润芝 沈文海 肖卫青 胡开喜 杨昕 王颖 田伟 云计算技术使用分布式的计算技术实现了并行计算的计算能力和计算效率,解决了单机服务器计算能力低的问题.基于长序列历史资料所计算得出的气候标准值对于气象领域实时业务.准实时业务及科学研究中均具有重要的意义.由于长序列历史资料数据量大.运算逻辑较复杂,在传统单节点计算平台上进行整编计算耗时非常长.该文基于Hadoop分布式计算框架搭建了集群模式的云计算平台,以长序列历史资料作为源数据,基于MapRe