spark job运行参数优化

一、问题

      使用spark join两张表(5000w*500w)总是出错,报的异常显示是在shuffle阶段。


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

14/11/27 12:05:49 ERROR storage.DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /hadoop/application_1415632483774_448143/spark-local-20141127115224-9ca8/04/shuffle_1_1562_27

java.io.FileNotFoundException: /hadoop/application_1415632483774_448143/spark-local-20141127115224-9ca8/04/shuffle_1_1562_27 (No such file or directory)

        at java.io.FileOutputStream.open(Native Method)

        at java.io.FileOutputStream.<init>(FileOutputStream.java:212)

        at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(BlockObjectWriter.scala:178)

        at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$revertWrites$1.apply(HashShuffleWriter.scala:118)

        at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$revertWrites$1.apply(HashShuffleWriter.scala:117)

        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

        at org.apache.spark.shuffle.hash.HashShuffleWriter.revertWrites(HashShuffleWriter.scala:117)

        at org.apache.spark.shuffle.hash.HashShuffleWriter.stop(HashShuffleWriter.scala:89)

        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)

        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

        at org.apache.spark.scheduler.Task.run(Task.scala:54)

        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

        at java.lang.Thread.run(Thread.java:724)

     

    出问题的代码块(scala)

1 val cRdd = iRdd.leftOuterJoin(label).map {
2      case (id, (iMap, Some(set))) => (id, (iMap, set))
3      case (id, (iMap, None)) => (id, (iMap, new HashSet[Int]()))
4    }.persist(StorageLevel.MEMORY_AND_DISK)

 

二、问题分析与解决

     一般spark job很多问题都是来源于系统资源不够用,通过监控日志等判断是内存资源占用过高导致的问题,因此尝试通过配置参数的方法来解决。

1)--conf spark.akka.frameSize=100

     此参数控制Spark中通信消息的最大容量 (如task的输出结果),默认为10M。当处理大数据时,task的输出可能会大于这个值,需要根据实际数据设置一个更高的值。尝试将此参数设置成100M后,问题未能解决。

2)--conf spark.shuffle.manager=SORT

     Spark默认的shuffle采用Hash模式,在HASH模式下,每一次shuffle会生成M*R的数量的文件(M指的是Map的数目,R指的是Reduce的数目),而当Map和Reduce的数目开得较大时,会产生相当规模的文件,与此同时带来了大量的内存开销。

     为了降低系统资源,可以采用Sort模式,Sort模式只产生M数量的文件。具体可以参考:Sort-based Shuffle之初体验

     在我们的应用场景下,采用Sort模式后,shuffle时间比之前增大了1/3,但是问题依旧未解决。

3)--conf spark.yarn.executor.memoryOverhead=4096

     executor堆外内存设置。起初是1024M,未能跑过,后改为4096M,Job就能跑通,原因是程序使用了大量的堆外内存。

时间: 2024-09-02 09:51:08

spark job运行参数优化的相关文章

Spark程序运行常见错误解决方法以及优化

一.org.apache.spark.shuffle.FetchFailedException 1.问题描述 这种问题一般发生在有大量shuffle操作的时候,task不断的failed,然后又重执行,一直循环下去,非常的耗时. 2.报错提示 (1) missing output location org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0  (2)

Mapreduce和Yarn概念,参数优化,作用,原理,MapReduce计数器 Counter,MapReduce 多job串联之ControlledJob(来自学习资料)

3.3. MapReduce与YARN 3.3.1 YARN概述 Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而mapreduce等运算程序则相当于运行于操作系统之上的应用程序 3.3.2 YARN的重要概念 1.  yarn并不清楚用户提交的程序的运行机制 2.  yarn只提供运算资源的调度(用户程序向yarn申请资源,yarn就负责分配资源) 3.  yarn中的主管角色叫ResourceManager 4.  yarn中具体提供运算资源的

基于WebSphere Commerce的电子商务应用性能优化(5) 参数优化建议

参数优化建议 WebSphere Commerce 是基于 WebSphere 应用程序服务器开发的大型电子商务应用程序.在初次成功安装 WebSphere Commerce 应用程序之后,安装程序已经对服务器上的关键参数进行了初始化调整.这组默认值是 WebSphere Commerce 测试团队经过反复测试总结出来的一组初始化的参数值.建议维护人员以这组默认值作 为初始值进行测试,比较测试结果与期望值的差距,从而有计划的对应用程序服务器的部分参数进行优化.如 果您正在维护的生产环境运行正常,

Mysql运行环境优化(Linux系统)

这篇文章主要介绍了Mysql运行环境优化(Linux系统),本文优化了修改Linux默认的IO调度算法.扩大文件描述符.禁用numa特性.修改swappiness设置.优化文件系统挂载参数等配置,需要的朋友可以参考下 一.修改Linux默认的IO调度算法. linux默认的IO调度算法为cfq,需要修改为dealine,如果是SSD或者PCIe-SSD设备,需要修改为noop,可以使用下面两种修改方式. 1.在线动态修改,重启失效. 代码如下: echo "deadline" >

MySQL配置文件my.cnf参数优化和中文详解_Mysql

Mysql参数优化对于新手来讲,是比较难懂的东西,其实这个参数优化,是个很复杂的东西,对于不同的网站,及其在线量,访问量,帖子数量,网络情况,以及机器硬件配置都有关系,优化不可能一次性完成,需要不断的观察以及调试,才有可能得到最佳效果. 复制代码 代码如下: [client]port = 3306socket = /tmp/mysql.sock [mysqld]port = 3306socket = /tmp/mysql.sock basedir = /usr/local/mysqldatadi

linux中nginx内核参数优化配置

内核参数优化配置 vi /etc/sysctl.conf 添加以下参数设置后运行命令: /sbin/sysctl -p 关于Nginx内核参数的优化: net.ipv4.tcp_max_tw_buckets = 6000 设定timewait的数量,默认是180000. net.ipv4.ip_local_port_range = 1024 65000 允许系统打开的端口范围. net.ipv4.tcp_tw_recycle = 1 启用timewait快速回收. net.ipv4.tcp_tw

Spark修炼之道(进阶篇)——Spark入门到精通:第九节 Spark SQL运行流程解析

1.整体运行流程 使用下列代码对SparkSQL流程进行分析,让大家明白LogicalPlan的几种状态,理解SparkSQL整体执行流程 // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits

一个价值“千万”的秒杀场景参数优化

       秒杀最早来自天猫双11各种商品的促销活动中,现在已经有很多业务场景在使用,比如抢红包,抢票等.其特点有三高:瞬时并发高,数据一致性高,热点更新频度高.这样三高的场景下往往给数据库造成极大的压力,大量更新数据库中的同一行,这样必然会产生锁等待,导致数据库的性能急剧下降的问题,很容出现雪崩效应.笔者记得有一年春节,一个电视台定时在整点发放红包,结果由于压力太高,导致更新数据库红包数额的请求全部堆积,业务全部挂掉,面对这样的情况我们当时也束手无策.       面对秒杀业务的场景,数据库

二次规划-含松弛变量的svm参数优化问题,具体实现的方法有哪些

问题描述 含松弛变量的svm参数优化问题,具体实现的方法有哪些 有谁写过它的matlab实现吗