Spark性能优化——和shuffle搏斗

Spark的性能分析和调优很有意思,今天再写一篇。主要话题是shuffle,当然也牵涉一些其他代码上的小把戏。

以前写过一篇文章,比较了几种不同场景的性能优化,包括portal的性能优化,web service的性能优化,还有Spark job的性能优化。Spark的性能优化有一些特殊的地方,比如实时性一般不在考虑范围之内,通常我们用Spark来处理的数据,都是要求异步得到结果的数据;再比如数据量一般都很大,要不然也没有必要在集群上操纵这么一个大家伙,等等。事实上,我们都知道没有银弹,但是每一种性能优化场景都有一些特定的“大boss”,通常抓住和解决大boss以后,能解决其中一大部分问题。比如对于portal来说,是页面静态化,对于web service来说,是高并发(当然,这两种可以说并不确切,这只是针对我参与的项目总结的经验而已),而对于Spark来说,这个大boss就是shuffle。

首先要明确什么是shuffle。Shuffle指的是从map阶段到reduce阶段转换的时候,即map的output向着reduce的input映射的时候,并非节点一一对应的,即干map工作的slave A,它的输出可能要分散跑到reduce节点A、B、C、D …… X、Y、Z去,就好像shuffle的字面意思“洗牌”一样,这些map的输出数据要打散然后根据新的路由算法(比如对key进行某种hash算法),发送到不同的reduce节点上去。(下面这幅图来自《Spark Architecture: Shuffle》

为什么说shuffle是Spark job的大boss,就是因为Spark本身的计算通常都是在内存中完成的,比如这样一个map结构的RDD:(String, Seq),key是字符串,value是一个Seq,如果只是对value进行一一映射的map操作,比如(1)先计算Seq的长度,(2)再把这个长度作为元素添加到Seq里面去。这两步计算,都可以在local完成,而事实上也是在内存中操作完成的,换言之,不需要跑到别的node上去拿数据,因此执行的速度是非常快的。但是,如果对于一个大的rdd,shuffle发生的时候,就会因为网络传输、数据序列化/反序列化产生大量的磁盘IO和CPU开销。这个性能上的损失是非常巨大的。

要减少shuffle的开销,主要有两个思路:

  1. 减少shuffle次数,尽量不改变key,把数据处理在local完成;
  2. 减少shuffle的数据规模。

先去重,再合并

比如有A、B这样两个规模比较大的RDD,如果各自内部有大量重复,那么二者一合并,再去重:


1

A.union(B).distinct()

这样的操作固然正确,但是如果可以先各自去重,再合并,再去重,可以大幅度减小shuffle的开销(注意Spark的默认union和Oracle里面的“union all”很像——不去重):


1

A.distinct().union(B.distinct()).distinct()

看起来变复杂了对不对,但是当时我解决这个问题的时候,用第二种方法时间开销从3个小时减到20分钟。

如果中间结果rdd如果被调用多次,可以显式调用cache()和persist(),以告知Spark,保留当前rdd。当然,即便不这么做,Spark依然存放不久前计算过的结果(以下来自官方指南):

Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.

数据量大,并不一定慢。通常情况下,由于Spark的job是放到内存里面进行运算的,因此一个复杂的map操作不一定执行起来很慢。但是如果牵涉到shuffle,这里面有网络传输和序列化的问题,就有可能非常慢。

类似地,还有filter等等操作,目的也是要先对大的RDD进行“瘦身”操作,然后在做其他操作。

mapValues比map好

明确key不会变的map,就用mapValues来替代,因为这样可以保证Spark不会shuffle你的数据:


1

A.map{case (A, ((B, C), (D, E))) => (A, (B, C, E))}

改成:


1

A.mapValues{case ((B, C), (D, E)) => (B, C, E)}

用broadcast + filter来代替join

这种优化是一种特定场景的神器,就是拿大的RDD A去join一个小的RDD B,比如有这样两个RDD:

  • A的结构为(name, age, sex),表示全国人民的RDD,超大
  • B的结果为(age, title),表示“年龄 -> 称号”的映射,比如60岁有称号“花甲之年”,70岁则是“古稀之年”,这个RDD显然很小,因为人的年龄范围在0~200岁之间,而且有的“年龄”还没有“称号”

现在我要从全国人民中找出这些有称号的人来。如果直接写成:


1

2

3

A.map{case (name, age, sex) => (age, (name, sex))}

 .join(B)

 .map{case (age, ((name, sex), title)) => (name, age, sex)}

你就可以想象,执行的时候超大的A被打散和分发到各个节点去。而且更要命的是,为了恢复一开始的(name, age, sex)的结构,又做了一次map,而这次map一样导致shuffle。两次shuffle,太疯狂了。但是如果这样写:


1

2

val b = sc.broadcast(B.collectAsMap)

A.filter{case (name, age, sex) => b.values.contains(age)}

一次shuffle都没有,A老老实实待着不动,等着全量的B被分发过来。

另外,在Spark SQL里面直接有BroadcastHashJoin,也是把小的rdd广播出去。

不均匀的shuffle

在工作中遇到这样一个问题,需要转换成这样一个非常巨大的RDD A,结构是(countryId, product),key是国家id,value是商品的具体信息。当时在shuffle的时候,这个hash算法是根据key来选择节点的,但是事实上这个countryId的分布是极其不均匀的,大部分商品都在美国(countryId=1),于是我们通过Ganglia看到,其中一台slave的CPU特别高,计算全部聚集到那一台去了。

找到原因以后,问题解决就容易了,要么避免这个shuffle,要么改进一下key,让它的shuffle能够均匀分布(比如可以拿countryId+商品名称的tuple作key,甚至生成一个随机串)。

明确哪些操作必须在master完成

如果想打印一些东西到stdout里去:


1

A.foreach(println)

想把RDD的内容逐条打印出来,但是结果却没有出现在stdout里面,因为这一步操作被放到slave上面去执行了。其实只需要collect一下,这些内容就被加载到master的内存中打印了:


1

A.collect.foreach(println)

再比如,如果遇到RDD操作嵌套的情况,通常考虑优化掉,因为只有master才能去理解和执行RDD的操作,slave只能处理被分配的task而已。比如:


1

A.map{case (keyA, valueA) => doSomething(B.lookup(keyA).head, valueA)}

就可以用join来代替:


1

A.join(B).map{case (key, (valueA, valueB)) => doSomething(valueB, valueA)}

用reduceByKey代替groupByKey

这一条应该是比较经典的了。reduceByKey会在当前节点(local)中做reduce操作,也就是说,会在shuffle前,尽可能地减小数据量。而groupByKey则不是,它会不做任何处理而直接去shuffle。当然,有一些场景下,功能上二者并不能互相替换。因为reduceByKey要求参与运算的value,并且和输出的value类型要一样,但是groupByKey则没有这个要求。

有一些类似的xxxByKey操作,都比groupByKey好,比如foldByKey和aggregateByKey。

另外,还有一条类似的是用treeReduce来代替reduce,主要是用于单个reduce操作开销比较大,可以条件treeReduce的深度来控制每次reduce的规模。

转自:http://www.raychase.net/3788

ps:如果大家像尝试一下spark的速度,可以使用一下阿里云的 E-MapReduce

链接地址为:https://emr.console.aliyun.com/#/cluster/region/cn-hangzhou

时间: 2024-09-19 08:09:22

Spark性能优化——和shuffle搏斗的相关文章

Spark性能优化

Spark的性能分析和调优很有意思,今天再写一篇.主要话题是shuffle,当然也牵涉一些其他代码上的小把戏. 以前写过一篇文章,比较了几种不同场景的性能优化,包括portal的性能优化,web service的性能优化,还有Spark job的性能优化.Spark的性能优化有一些特殊的地方,比如实时性一般不在考虑范围之内,通常我们用Spark来处理的数据,都是要求异步得到结果的数据:再比如数据量一般都很大,要不然也没有必要在集群上操纵这么一个大家伙,等等.事实上,我们都知道没有银弹,但是每一种

Spark性能优化之道——解决Spark数据倾斜(Data Skew)的N种姿势

本文结合实例详细阐明了Spark数据倾斜的几种场景以及对应的解决方案,包括避免数据源倾斜,调整并行度,使用自定义Partitioner,使用Map侧Join代替Reduce侧Join,给倾斜Key加上随机前缀等. 为何要处理数据倾斜(Data Skew) 什么是数据倾斜 对Spark/Hadoop这样的大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜. 何谓数据倾斜?数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著多于其它部分,从而使得

[看图说话] 基于Spark UI性能优化与调试——初级篇

Spark有几种部署的模式,单机版.集群版等等,平时单机版在数据量不大的时候可以跟传统的java程序一样进行断电调试.但是在集群上调试就比较麻烦了...远程断点不太方便,只能通过Log的形式进行数据分析,利用spark ui做性能调整和优化. 那么本篇就介绍下如何利用Ui做性能分析,因为本人的经验也不是很丰富,所以只能作为一个入门的介绍. 大体上会按照下面的思路进行讲解: 怎么访问Spark UI SparkUI能看到什么东西?job,stage,storage,environment,excu

Spark SQL性能优化

性能优化参数 针对Spark SQL 性能调优参数如下: 代码示例 import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.api.java.JavaSQLContext; import org.apache.spark.sql.api.java.Row; import org.a

《Hadoop MapReduce性能优化》一2.1 研究Hadoop参数

2.1 研究Hadoop参数 Hadoop MapReduce性能优化 正如第1章中提到的那样,有很多因素会对Hadoop MapReduce性能产生影响.一般说来,与工作负载相关的Hadoop性能优化需要关注以下3个主要方面:系统硬件.系统软件,以及Hadoop基础设施组件的配置和调优/优化. 需要指出的是,Hadoop被归类为高扩展性解决方案,但却不足以归类为高性能集群解决方案.系统管理员可以通过各种配置选项来配置和调优Hadoop集群.性能配置参数主要关注CPU利用率.内存占用情况.磁盘I

作为大数据工程师,你必须熟练运用的性能优化技术

最近几年一直参与大数据产品的研发,同时大数据产品在海量数据场景下其处理性能又是其主要卖点和突破,所以个人在这几年经常忙于如何对大数据产品进行性能上面的优化,并且想通过本文和大家聊聊具体的几种比较常见大数据性能优化技术. 常见的大数据性能优化技术一般分为两部分,其一是硬件和系统层面的观测,从而来发现具体的瓶颈,并进行硬件或者系统级的调整;其二是主要通过对软件具体使用方法的调整来实现优化. 硬件方面的监测 图1. Windows7性能指数 关于硬件性能本身,个人觉得最好对性能的诠释就像图1大家比较熟

《Hadoop MapReduce性能优化》一1.3 Hadoop MapReduce的工作原理

1.3 Hadoop MapReduce的工作原理 Hadoop MapReduce性能优化经过一个或者多个步骤,MapReduce编程模型可以用来处理许多大规模数据问题,还可以更高效地实现MapReduce编程模型来支持使用大量机器处理大量数据的问题.在大数据的背景下,可以处理的数据规模可以大到无法在单机存储. 在典型的Hadoop MapReduce框架下,数据拆分为数据块并分发到集群内的多个节点上.MapReduce框架通过把计算逻辑转移到数据所在的机器,而不是把数据转移到其能够得以处理的

丰趣海淘:跨境电商平台的前端性能优化实践

原文出自[听云技术博客]:http://blog.tingyun.com/web/article/detail/586 随着互联网的发展,尤其是在2000年之后浏览器技术渐渐成熟,Web产品也越来越丰富,这时我们被浏览器窗口内的丰富"内容"所吸引,关注HTML/CSS,深入研究Dom.Bom和浏览器的渲染机制等,接触JavaScript库,"前端"这个职业,由此而生. 前端技术在这10多年中飞速发展,到了今天,我们可能发现"内容"的美在视觉上是有

如何确保无线局域网性能优化应

虽然很多无线局域网经理关注于解决问题和避免安全漏洞,但是无线局域网性能优化更注重于任务清单.这个方法不单单考虑不周全,而且忽视优化还是造成灾难的另一个因素.如果每个AP只需要支持十来个用户上网流量,一般的性能可能还是可以接受的,但是当AP接近最大容量而应用程序需求变得更高时,带宽消耗和瓶颈就会变得很严重. 为了实现无线局域网性能优化,网络经理和管理员必须使用新的无线局域网测量和测试工具,它们能够根据网络中运行的应用程序类型来检查网络性能.寻找这些工具会迫使网络管理员寻找目前的供应商以及测试方法之