Spark结合源码解决数据倾斜造成Too Large Frame

新公司遇到的第一个spark的坑,寻找原因的过程其实还挺有意思,最终在源码和spark ui上的统计数据的帮助下找到根源,具体如下。

先说下问题

由于严重的数据倾斜,大量数据集中在单个task中,导致shuffle过程中发生异常

完整的exeception是这样的

但奇怪的是,经过尝试减小executor数量后任务反而成功,增大反而失败,经过多次测试,问题稳定复现。

成功的executor数量是7,失败的则是15,集群的active node是7

这结果直接改变了认知,也没爆内存,cpu也够,怎么会这样,executor数量应该越多越快啊,居然还失败了。

解决过程

这个数在几个失败里不一样,但是都超过了Integer.MaxValue。在spark源码中,这条异常发生在TransportFrameDecoder这个类中

检查发现是frame的大小超过了MAX_FRAME_SIZE,而MAX_FRAME_SIZE的大小定义如下

这个TransportFrameDecoder继承自ChannelInboundHandlerAdapter,这是Netty里的类,好了,看到这就明白了,原来错误发生在网络传输过程中,是数据量超大了。

但是对比了成功与失败的任务,都是单个task严重倾斜啊。再看下两个任务的executor分配。

失败的任务

成功的任务

失败的任务里,分配到的节点上都有多个executor;成功的任务里则每个节点只有一个executor。

再看下stage,失败的任务失败在stage26,这个stage依赖于stage24。看图说话

两个任务的stage24都是成功的,看下24的executor的数据量情况

可以看到,两个任务在这个stage上由于数据倾斜,所有数据输入输出都在一个executor中完成。但在stage26中,区别来了

为了提升性能,在hadoop和spark中都会尽量选择数据本地性,尽量让数据local,不行再选择rack等其他方案。而24的输出会作为26的输入。所以24之后自然会选择相同节点上的executor,看下stage26的情况

成功的任务

失败的任务

在成功的任务里,stage26与24的executor完全是同一个,这样数据是完全本地化的,甚至是同一个进程,因而经过优化不再需要通过网络传输

而在失败的任务里,stage26在执行时发现这个node上有3个executor,为了性能的提升,将数据分配给3个executor执行计算。可见其中也成功了一半,32686这个端口的executor是24中执行的那个,因而虽然它要处理3.3g的数据,但是因为不需要网络传输,也仍然可以成功。可是对于另外两个,即使是同一个节点,但是由于是不同进程,仍然需要通过netty的server拉取数据,而这一次的拉取最大不能超过int最大值,因而失败了一个,导致整个stage失败,也就导致了整个job的失败。

总结

由此可见在数据极度倾斜的情况下,增大executor的数量未见得是好事,还是要根据具体情况而来。减小了数量解决了问题,但是这其实并不是最好的解决方案,因为这种情况下,可见数据基本等同于本地执行,完全浪费了集群的并发性,更好的解决方案还需要再继续深入理解。

时间: 2024-08-20 00:20:36

Spark结合源码解决数据倾斜造成Too Large Frame的相关文章

HDFS源码分析数据块校验之DataBlockScanner

        DataBlockScanner是运行在数据节点DataNode上的一个后台线程.它为所有的块池管理块扫描.针对每个块池,一个BlockPoolSliceScanner对象将会被创建,其运行在一个单独的线程中,为该块池扫描.校验数据块.当一个BPOfferService服务变成活跃或死亡状态,该类中的blockPoolScannerMap将会更新.         我们先看下DataBlockScanner的成员变量,如下: // 所属数据节点DataNode实例 private

Spark Catalyst 源码分析

Architecture Ø 把输入的SQL,parse成unresolved logical plan,这一步参考SqlParser的实现 Ø 把unresolved logical plan转化成resolved logical plan,这一步参考analysis的实现 Ø 把resolved logical plan转化成optimized logical plan,这一步参考optimize的实现 Ø 把optimized logical plan转化成physical plan,这一

Spark Core源码分析: Spark任务执行模型

DAGScheduler 面向stage的调度层,为job生成以stage组成的DAG,提交TaskSet给TaskScheduler执行. 每一个Stage内,都是独立的tasks,他们共同执行同一个compute function,享有相同的shuffledependencies.DAG在切分stage的时候是依照出现shuffle为界限的. private[spark] class DAGScheduler( taskScheduler: TaskScheduler, listenerBu

专访周金可:我们更倾向于Greenplum来解决数据倾斜的问题

周金可,就职于听云,维护MySQL和GreenPlum的正常运行,以及调研适合听云业务场景的数据库技术方案. 听云周金可   9月24日,周金可将参加在北京举办的线下活动,并做主题为<GreenPlum在听云大数据实时分析的实践>的分享.值此,他分享了PG.工作上的一些经历和经验. 9月24日开源数据库企业应用实践PostgreSQL.Greenplum专场培训,点击这里>>>免费报名   正文:   周金可刚参加工作时是做系统运维的,后来慢慢接触了各种数据库,开始对数据库感

HDFS源码分析数据块复制监控线程ReplicationMonitor(一)

        ReplicationMonitor是HDFS中关于数据块复制的监控线程,它的主要作用就是计算DataNode工作,并将复制请求超时的块重新加入到待调度队列.其定义及作为线程核心的run()方法如下: /** * Periodically calls computeReplicationWork(). * 周期性调用computeReplicationWork()方法 */ private class ReplicationMonitor implements Runnable

HDFS源码分析数据块汇报之损坏数据块检测checkReplicaCorrupt()

        无论是第一次,还是之后的每次数据块汇报,名字名字节点都会对汇报上来的数据块进行检测,看看其是否为损坏的数据块.那么,损坏数据块是如何被检测的呢?本文,我们将研究下损坏数据块检测的checkReplicaCorrupt()方法.         关于数据块及其副本的状态,请阅读<HDFS源码分析之数据块及副本状态BlockUCState.ReplicaState>一文.         checkReplicaCorrupt()方法专门用于损坏数据块检测,代码如下: /** *

Android修改源码解决Alertdialog触摸对话框边缘消失的问题_Android

研究其父类时候发现,可以设置这么一条属性,在AlertDialog.Builder.create()之后才能调用这两个方法 方法一: setCanceledOnTouchOutside(false);调用这个方法时,按对话框以外的地方不起作用.按返回键还起作用 方法二: setCanceleable(false);调用这个方法时,按对话框以外的地方不起作用.按返回键也不起作用 这两个方法都属于Dialog方法,可查阅源码 修改后的源码如下: 复制代码 代码如下: case 1:         

Android修改源码解决Alertdialog触摸对话框边缘消失的问题

研究其父类时候发现,可以设置这么一条属性,在AlertDialog.Builder.create()之后才能调用这两个方法 方法一: setCanceledOnTouchOutside(false);调用这个方法时,按对话框以外的地方不起作用.按返回键还起作用 方法二: setCanceleable(false);调用这个方法时,按对话框以外的地方不起作用.按返回键也不起作用 这两个方法都属于Dialog方法,可查阅源码 修改后的源码如下: 复制代码 代码如下:case 1:          

HDFS源码分析数据块复制选取复制源节点

        数据块的复制当然需要一个源数据节点,从其上拷贝数据块至目标数据节点.那么数据块复制是如何选取复制源节点的呢?本文我们将针对这一问题进行研究.         在BlockManager中,chooseSourceDatanode()方法就是用来选取数据块复制时的源节点的,它负责解析数据块所属数据节点列表,并选择一个,用它作为数据块的复制源.其核心逻辑如下:         我们优先选择正处于退役过程中的数据节点而不是其他节点,因为前者没有写数据传输量因此相对不是很繁忙.我们不使用