Spark Streaming 数据接收优化

看这篇文章前,请先移步Spark Streaming 数据产生与导入相关的内存分析, 文章重点讲的是从Kafka消费到数据进入BlockManager的这条线路的分析。

这篇内容是个人的一些经验,大家用的时候还是建议好好理解内部的原理,不可照搬

让Receiver均匀的分布到你的Executor上

在Spark Streaming 数据产生与导入相关的内存分析中我说了这么一句话:

我发现在数据量很大的情况下,最容易挂掉的就是Receiver所在的Executor了。 建议Spark Streaming团队最好是能将数据写入到多个BlockManager上。

从现在的API来看,是没有提供这种途径的。但是Spark Streaming 提供了同时读多个topic的功能,每个topic是一个InputStream。 我们可以复用这个功能,具体代码如下:

val kafkaDStreams = (1 to kafkaDStreamsNum).map { _ => KafkaUtils.createStream(
ssc,
zookeeper,
groupId,
Map("your topic" -> 1),
if (memoryOnly) StorageLevel.MEMORY_ONLY else StorageLevel.MEMORY_AND_DISK_SER_2)}
val unionDStream = ssc.union(kafkaDStreams)
unionDStream

kafkaDStreamsNum 是你自己定义的,希望有多少个Executor 启动Receiver 去接收kafka数据。我的经验值是 1/4 个Executors 数目。因为数据还要做replication 一般,所以这样内存最大可以占到  1/2 的storage.

另外,务必给你系统设置 spark.streaming.receiver.maxRate。假设你启动了 N个 Receiver,那么你系统实际会接受到的数据不会超过 N*MaxRate,也就是说,maxRate参数是针对每个 Receiver 设置的。

减少非Storage 内存的占用

也就是我们尽量让数据都占用Spark 的Storage 内存。方法是把spark.streaming.blockInterval 调小点。当然也会造成一个副作用,就是input-block 会多。每个Receiver 产生的的input-block数为: batchInterval* 1000/blockInterval。 这里假设你的batchInterval 是以秒为单位的。 blockInterval 其实我也不知道会有啥影响。其实说白了,就是为了防止GC的压力。实时计算有一个很大问题是GC。

减少单个Executor的内存

一般在Spark Streaming中不建议把 Executor 的内存调的太大。对GC是个压力,大内存一FullGC比较可怕,很可能会拖垮整个计算。 多Executor的容错性也会更好些。

时间: 2024-08-31 20:22:59

Spark Streaming 数据接收优化的相关文章

Spark Streaming 数据清理机制

前言 为啥要了解机制呢?这就好比JVM的垃圾回收,虽然JVM的垃圾回收已经巨牛了,但是依然会遇到很多和它相关的case导致系统运行不正常. 这个内容我记得自己刚接触Spark Streaming的时候,老板也问过我,运行期间会保留多少个RDD? 当时没回答出来.后面在群里也有人问到了,所以就整理了下.文中如有谬误之处,还望指出. DStream 和 RDD 我们知道Spark Streaming 计算还是基于Spark Core的,Spark Core 的核心又是RDD. 所以Spark Str

Spark Streaming 数据产生与导入相关的内存分析

前言 我这篇文章会分几个点来描述Spark Streaming 的Receiver在内存方面的表现. 一个大致的数据接受流程 一些存储结构的介绍 哪些点可能导致内存问题,以及相关的配置参数 另外,有位大牛写了Spark Streaming 源码解析系列,我觉得写的不错,这里也推荐下. 我在部门尽力推荐使用Spark Streaming做数据处理,目前已经应用在日志处理,机器学习等领域.这期间也遇到不少问题,尤其是Kafka在接受到的数据量非常大的情况下,会有一些内存相关的问题. 另外特别说明下,

Spark Streaming Crash 如何保证Exactly Once Semantics

前言 其实这次写Spark Streaming相关的内容,主要是解决在其使用过程中大家真正关心的一些问题.我觉得应该有两块: 数据接收.我在用的过程中确实产生了问题. 应用的可靠性.因为SS是7*24小时运行的问题,我想知道如果它Crash了,会不会丢数据. 第一个问题在之前的三篇文章已经有所阐述: Spark Streaming 数据产生与导入相关的内存分析 Spark Streaming 数据接收优化 Spark Streaming Direct Approach (No Receivers

Spark Streaming Direct Approach (No Receivers) 分析

前言 这个算是Spark Streaming 接收数据相关的第三篇文章了. 前面两篇是: Spark Streaming 数据产生与导入相关的内存分析 Spark Streaming 数据接收优化 Spark Streaming 接受数据的方式有两种: Receiver-based Approach Direct Approach (No Receivers) 上面提到的两篇文章讲的是 Receiver-based Approach . 而这篇文章则重点会分析Direct Approach (N

《Spark大数据分析实战》——3.2节Spark Streaming

3.2 Spark StreamingSpark Streaming是一个批处理的流式计算框架.它的核心执行引擎是Spark,适合处理实时数据与历史数据混合处理的场景,并保证容错性.下面将对Spark Streaming进行详细的介绍.3.2.1 Spark Streaming简介Spark Streaming是构建在Spark上的实时计算框架,扩展了Spark流式大数据处理能力.Spark Streaming将数据流以时间片为单位进行分割形成RDD,使用RDD操作处理每一块数据,每块数据(也就

Spark Streaming 误用.transform(func)函数导致的问题解析

问题描述 今天有朋友贴了一段 gist,大家可以先看看这段代码有什么问题. 特定情况你会发现UI 的Storage标签上有很多新的Cache RDD,然后你以为是Cache RDD 不被释放,但是通过Spark Streaming 数据清理机制分析我们可以排除这个问题. 接着通过给RDD的设置名字,名字带上时间,发现是延时的Batch 也会产生cache RDD.那这是怎么回事呢? 另外还有一个问题,也是相同的原因造成的:我通过KafkaInputStream.transform 方法获取Kaf

spark streaming问题-六台机器集群,40M数据就报错,spark streaming运行例子程序wordcount

问题描述 六台机器集群,40M数据就报错,spark streaming运行例子程序wordcount 请大神帮忙解决一下:六台机器,SparkStreaming的例子程序,运行在yarn上四个计算节点(nodemanager),每台8G内存,i7处理器,想测测性能. 自己写了socket一直向一个端口发送数据,spark 接收并处理 运行十几分钟汇报错:WARN scheduler TaskSetManagerost task 0.1 in stage 265.0 :java.lang.Exc

spark-在使用Spark Streaming向HDFS中保存数据时,文件内容会被覆盖掉,怎么解决?

问题描述 在使用Spark Streaming向HDFS中保存数据时,文件内容会被覆盖掉,怎么解决? 我的Spark Streaming代码如下所示: val lines=FlumeUtils.createStream(ssc,"hdp2.domain",22222,StorageLevel.MEMORY_AND_DISK_SER_2) val words = lines.filter(examtep(_)) words.foreachRDD(exam(_)) //some other

大数据-spark streaming如何更好的计算关系型数据库中数据?

问题描述 spark streaming如何更好的计算关系型数据库中数据? 各位大虾过来围观一下. spark streaming在计算日志时通常会使用kafka+spark的架构, 目前很少看到有大虾讲spark streaming计算关系型数据库中的数据. 希望有大虾过来围观讨论,如何更好的把关系型数据库中的数据同步至spark中, 进行实时计算.有什么更好的架构或者开源软件的解决方案 解决方案 官网上看到Spark Streaming内置就支持两类数据源, 1) 基础数据源(Basic s