Tuning Spark

Data Serialization

数据序列化,对于任意分布式系统都是性能的关键点

Spark默认使用Java serialization,这个比较低效

推荐使用,Kryo serialization,会比Java序列化,更快更小, Spark使用Twitter chill library(Kryo的scala扩展)

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

conf.set("spark.kryoserializer.buffer.mb“, 2), 需要大于最大的需要序列化的对象size

之所以,spark不默认使用Kryo,因为Kryo需要显式的注册program中使用到的class,参考

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

只所以要做注册是因为,在把对象序列化成byte[]时,要记录下classname,classname带namespace一般很长的,所以每个里面加上这个classname比较费空间
在kryo里面注册过后,会用一个int来替代classname
当然不注册kryo也是可以用的,只是会多占空间

 

Memory Tuning

Tuning之前需要知道当前dataset的内存消耗是多少,
简单的方法是,以该dataset创建rdd,然后cache
这样从SparkContext的日志里面可以看到每个partition的大小,加一下,就可以得到整个数据集的大小

INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)
This means that partition 1 of RDD 0 consumed 717.5 KB.
然后可以从几个方面去进行优化,

Tuning Data Structures

Java对象虽然便于访问,但是和raw data比,java对象的size要大2~5倍
Each distinct Java object has an “object header”, which is about 16 bytes
Java Strings have about 40 bytes of overhead over the raw string data, and store each character as two bytes due to String’s internal usage of UTF-16 encoding
其他的比如HashMap或LinkedList,除了header,还需要8 bytes pointer来指向下个对象
总之,就是对于内存敏感的应用,直接使用Java对象是非常不经济的
可以从以下几点去优化,
a, 优先使用arrays of objects, and primitive types,而非java或scala的标准collection class
或者使用fastutil library,这个库提供了用primitive types实现的collection class
b, 避免含有大量小对象或pointer的嵌套数据结构
c, Consider using numeric IDs or enumeration objects instead of strings for keys
d, If you have less than 32 GB of RAM, set the JVM flag -XX:+UseCompressedOops to make pointers be four bytes instead of eight. You can add these options in spark-env.sh.

Serialized RDD Storage

使用MEMORY_ONLY_SER,在memory中cache序列化后的数据,降低内存使用,当然响应的访问速度会降低,由于需要反序列化

Garbage Collection Tuning

首先需要打开gc日志,
adding -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps to the Java options

Cache Size Tuning
默认Spark使用60% 的executor memory(spark.executor.memory)来cache RDDs.
也就是说只有40%的memory用于task执行,如果发现频繁gc或是oom,可以调低用于cache的比例,
conf.set("spark.storage.memoryFraction", "0.5"), 这样设成50%
Advanced GC Tuning
Spark做gc tuning的目标是,避免在task执行过程中发生full gc, 即需要让Young区足够容纳short-lived objects
a, 如果发生多次full gc或是OldGen已经接近full,说明内存不够,可以降低cache比例
b, 如果很多minor gc,但没有major gc,说明young区过小, 我们可以根据task dataset需要消耗内存来预估eden区,young区大小= eden区 × (4/3),因为要加上survivor区
c, 如果从hdfs读取数据,可以根据hdfs block大小来预估eden区大小,比如,如果解压比例3倍,4个tasks并行,block大小64M,那么eden区大小 = 3×4×64M
 

其他的一些考虑,

调整并发的level, 通过增加并发来降低reduce task的内存消耗

broadcast functionality来处理大的变量, data locality

本文章摘自博客园,原文发布日期:2015-04-21

时间: 2024-09-08 13:28:32

Tuning Spark的相关文章

『 Spark 』5. 这些年,你不能错过的 spark 学习资源

原文链接:『 Spark 』5. 这些年,你不能错过的 spark 学习资源 写在前面 本系列是综合了自己在学习spark过程中的理解记录 + 对参考文章中的一些理解 + 个人实践spark过程中的一些心得而来.写这样一个系列仅仅是为了梳理个人学习spark的笔记记录,所以一切以能够理解为主,没有必要的细节就不会记录了,而且文中有时候会出现英文原版文档,只要不影响理解,都不翻译了.若想深入了解,最好阅读参考文章和官方文档. 其次,本系列是基于目前最新的 spark 1.6.0 系列开始的,spa

《Spark官方文档》Spark Streaming编程指南(一)

Spark Streaming编程指南 概览   Spark Streaming是对核心Spark API的一个扩展,它能够实现对实时数据流的流式处理,并具有很好的可扩展性.高吞吐量和容错性.Spark Streaming支持从多种数据源提取数据,如:Kafka.Flume.Twitter.ZeroMQ.Kinesis以及TCP套接字,并且可以提供一些高级API来表达复杂的处理算法,如:map.reduce.join和window等.最后,Spark Streaming支持将处理完的数据推送到文

《Spark 官方文档》Spark编程指南

Spark编程指南 概述 总体上来说,每个Spark应用都包含一个驱动器(driver)程序,驱动器运行用户的main函数,并在集群上执行各种并行操作. Spark最重要的一个抽象概念就是弹性分布式数据集(resilient distributed dataset – RDD),RDD是一个可分区的元素集合,其包含的元素可以分布在集群各个节点上,并且可以执行一些分布式并行操作.RDD通常是通过,HDFS(或者其他Hadoop支持的文件系统)上的文件,或者驱动器中的Scala集合对象,来创建或转换

《Spark官方文档》Spark Streaming编程指南(二)

累加器和广播变量 首先需要注意的是,累加器(Accumulators)和广播变量(Broadcast variables)是无法从Spark Streaming的检查点中恢复回来的.所以如果你开启了检查点功能,并同时在使用累加器和广播变量,那么你最好是使用懒惰实例化的单例模式,因为这样累加器和广播变量才能在驱动器(driver)故障恢复后重新实例化.代码示例如下: Scala Java Python object WordBlacklist { @volatile private var ins

Spark Streaming Programming Guide

参考,http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html  Overview SparkStreaming支持多种流输入,like Kafka, Flume, Twitter, ZeroMQ or plain old TCP sockets,并且可以在上面进行transform操作,最终数据存入HDFS,数据库或dashboard 另外可以把Spark's in-built machine

《Spark 官方文档》Spark配置(一)

Spark配置 Spark有以下三种方式修改配置: Spark properties (Spark属性)可以控制绝大多数应用程序参数,而且既可以通过 SparkConf 对象来设置,也可以通过Java系统属性来设置. Environment variables (环境变量)可以指定一些各个机器相关的设置,如IP地址,其设置方法是写在每台机器上的conf/spark-env.sh中. Logging (日志)可以通过log4j.properties配置日志. Spark属性 Spark属性可以控制

Spark调优经验总结

概述 本文以Spark实践经验和Spark原理为依据,总结了Spark性能调优的一些方法.这些总结基于Spark-1.0.0版本.对于最近推出的Spark-1.1.0版本,本文介绍了几个版本增强. Spark性能调优 Executor和分区 Executor是一个独立的JVM进程,每个任务会有独立的线程来执行,Executor最大可并发任务数量与其拥有的核心数量相同,执行过程中的数据缓存放在Executor的全局空间中.根据以上我们可以得出: 同一个Executor中执行的任务,可以共享同一个数

《Spark 官方文档》Spark配置(二)

内存管理 属性名 默认值 含义 spark.memory.fraction 0.75 堆内存中用于执行.混洗和存储(缓存)的比例.这个值越低,则执行中溢出到磁盘越频繁,同时缓存被逐出内存也更频繁.这个配置的目的,是为了留出用户自定义数据结构.内部元数据使用的内存.推荐使用默认值.请参考this description. spark.memory.storageFraction 0.5 不会被逐出内存的总量,表示一个相对于 spark.memory.fraction的比例.这个越高,那么执行混洗等

Apache Spark 1.5新特性介绍

Apache Spark社区刚刚发布了1.5版本,大家一定想知道这个版本的主要变化,这篇文章告诉你答案. DataFrame执行后端优化(Tungsten第一阶段) DataFrame可以说是整个Spark项目最核心的部分,在1.5这个开发周期内最大的变化就是Tungsten项目的第一阶段已经完成.主要的变化是由Spark自己来管理内存而不是使用JVM,这样可以避免JVM GC带来的性能损失.内存中的Java对象被存储成Spark自己的二进制格式,计算直接发生在二进制格式上,省去了序列化和反序列