3.5 Spark的性能调整
虽然数据管道的高效执行是任务调度器优先考虑的,这是Spark驱动的一部分,有时Spark需要人为给出一些提示。Spark调度主要与两个参数有关:CPU和内存。当然其他资源(如磁盘和网络I/O)也在Spark性能方面发挥重要作用,但目前Spark、Mesos或YARN都不能主动管理它们。
要监控的第一个参数是RDD的分区数,可以从文件中读取RDD时明确指定。常见的Spark错误是分区太多,这样做需要提供更多的并行性。当任务开始/结束时间相对较小的情况下,这样做也可以工作。但是建议减少分区数,特别是在有聚合的情况时。
每个RDD的默认分区数和并行级别由spark.default.parallelism参数决定,可在$ SPARK_HOME/conf/spark-defaults.conf配置文件中定义此参数。具体的RDD的分区数也可以通过coalesce( )或repartition( )方法来显式地更改。
内核总数和有效内存不足会导致任务无法继续进行,通常会造成死锁。当从命令行调用spark-submit、spark-shell或PySpark时,可以用--executor-cores选项来指定每个执行器的内核数。也可以在之前讨论的spark-defaults.conf文件中设置相应的参数。如果内核数量设置得太大,调度器将无法在节点上分配资源,从而导致死锁。
类似地,可通过--executor-memory(或spark.executor.memory属性)选项来指定所有任务请求的堆大小(默认为1G)。如果执行器的内存设制得太大,调度器可能会被死锁,或只能调度节点上有限的执行器。
在计算内核和内存数量时,独立模式中隐含的假设是:Spark是唯一运行的应用程序,这可能是正确。当在Mesos或YARN下运行时,配置集群调度器很重要,它通过Spark驱动来调度执行器对资源的有效请求。相关的YARN属性有:yarn.nodemanager.resource. cpu-vcores和yarn.nodemanager.resource.memory-mb。YARN可能会多给一点请求的内存。YARN的yarn.scheduler.minimum-allocation-mb和yarn.scheduler.increment-allocation-mb属性分别控制着最小值和增量请求值。
JVM还可以使用堆以外的一些内存,例如,用于内部字符串和直接字节缓冲区。spark.yarn.executor.memoryOverhead的属性值被添加到执行器内存,以确定每个执行器对YARN的内存请求。它默认为max(384, .07 * spark.executor.memory)。
由于Spark需要在执行器和客户机节点之间传输数据,高效的序列化非常重要。第6章会介绍不同的序列化框架,但在默认情况下,Spark会使用Kryo来进行序列化,这要求按静态方法显式地注册类。如果运行时发现序列化错误,可能是因为相应的类没有被注册或Kryo不支持它,这种情形出现嵌套和复杂的数据类型。一般来说,若不能非常有效地完成对象序列化,建议避免在执行器之间传递复杂的对象。
驱动具有类似的参数:spark.driver.cores、spark.driver.memory和spark.driver.maxRe-sultSize。后者为从所有执行器收集的结果设置限制,是通过collect方法来进行收集的。让驱动进程不出现内存不足的异常很重要。另一种避免内存不足异常和后续问题的方法是修改管道返回的聚合(或过滤)的结果,也可改用take方法。
《Scala机器学习》一一3.5 Spark的性能调整
时间: 2024-09-28 02:28:48
《Scala机器学习》一一3.5 Spark的性能调整的相关文章
《Scala机器学习》一一导读
前 言 这是一本关于机器学习的书,它以Scala为重点,介绍了函数式编程方法以及如何在Spark上处理大数据.九个月前,当我受邀写作本书时,我的第一反应是:Scala.大数据.机器学习,每一个主题我都曾彻底调研过,也参加了很多的讨论,结合任何两个话题来写都具有挑战性,更不用说在一本书中结合这三个主题.这个挑战激发了我的兴趣,于是就有了这本书.并不是每一章的内容都像我所希望的那样圆满,但技术每天都在快速发展.我有一份具体的工作,写作只是表达我想法的一种方式. 下面先介绍机器学习.机器学习经历了翻天
《Scala机器学习》一一第3章 使用Spark和MLlib
第3章 使用Spark和MLlib 上一章介绍了在全局数据驱动的企业架构中的什么地方以及如何利用统计和机器学习来处理知识,但接下来不会介绍Spark和MLlib的具体实现,MLlib是Spark顶层的机器学习库.Spark是大数据生态系统中相对较新的成员,它基于内存使用而不是磁盘来进行优化.数据仍然可以根据需要转储到磁盘上,但Spark只有在明确指示这样做或活动数据集不适合内存时才会执行转储.如果节点出现故障或由于某些原因从内存中擦除信息,Spark会利用存储的信息来重新计算活动数据集.这与传统
《Scala机器学习》一一1.5 使用Scala和Spark的Notebook工作
1.5 使用Scala和Spark的Notebook工作通常,这五种数字汇总方式不足以对数据形成初步认识.描述性统计(descriptive statistics)的术语非常通用,并且可以采用非常复杂的方法来描述数据.分位数和帕雷托图(Pareto chart)都是描述性统计的例子,当分析一个以上的属性时,相关性也是.在大多数情况下都能查阅到这些数据汇总的方法,但通过具体的计算来理解这些方法也很重要.Scala或者Spark Notebook(https://github.com/Bridgew
《Scala机器学习》一一3.3 应用
3.3 应用 下面会介绍Spark/Scala中的一些实际示例和库,具体会从一个非常经典的单词计数问题开始.3.3.1 单词计数 大多数现代机器学习算法需要多次传递数据.如果数据能存放在单台机器的内存中,则该数据会容易获得,并且不会呈现性能瓶颈.如果数据太大,单台机器的内存容纳不下,则可保存在磁盘(或数据库)上,这样虽然可得到更大的存储空间,但存取速度大约会降为原来的1/100.另外还有一种方式就是分割数据集,将其存储在网络中的多台机器上,并通过网络来传输结果.虽然对这种方式仍有争议,但分析表明
《Scala机器学习》一一2.5 数据驱动系统的基本组件
2.5 数据驱动系统的基本组件 简单地说,一个数据驱动架构包含如下的组件(或者可精简为以下这些组件): 数据收集:需要从系统和设备上收集数据.大多数的系统有日志,或者至少可选择将日志写入本地文件系统.一些系统可以通过网络来传输信息,比如syslog.但若没有审计信息,缺少持久层意味着有可能丢失数据. 数据转换层:也被称为提取.变换和加载(ETL).现在数据转换层也可以进行实时处理,即通过最近的数据来计算汇总信息.数据转换层也用来重新格式化数据和索引数据,以便能被UI组件有效地访问. 数据分析和机
Apache Spark机器学习.1.1 Spark概述和技术优势
摘要 Spark机器学习简介 本章从机器学习和数据分析视角介绍Apache Spark,并讨论Spark中的机器学习计算处理技术.本章首先概括介绍Apache Spark,通过与MapReduce等计算平台进行比较,展示Spark在数据分析中的技术优势和特点.接着,讨论如下五个方面的内容: 机器学习算法与程序库 Spark RDD和DataFrame 机器学习框架 Spark pipeline技术 Spark notebook技术 以上是数据科学家或机器学习专业人员必须掌握的五项最重要的技术内容
## Spark作业性能调优总结
Spark作业性能调优总结 前段时间在集群上运行Spark作业,但是发现作业运行到某个stage之后就卡住了,之后也不再有日志输出.于是开始着手对作业进行调优,下面是遇到的问题和解决过程: 运行时错误 Out Of Memory: Java heap space / GC overhead limit exceeded 使用yarn logs -applicationId=appliation_xxx_xxx 命令查看Yarn收集的各个Executor的日志. 可以发现OOM的错误,以及一些re
Apache Spark机器学习.1.5 Spark RDD和DataFrame
1.5 Spark RDD和DataFrame 本节关注数据以及Spark如何表示和组织数据.我们将介绍Spark RDD和DataFrame技术. 通过本节的学习,读者将掌握Spark的两个关键概念:RDD和DataFrame,并将它们应用于机器学习项目. 1.5.1 Spark RDD Spark主要以一种分布式项集合的形式进行数据抽象,称之为弹性分布式数据集(Resilient Distributed Dataset,RDD).RDD是Spark的关键创新,使其比其他框架计算更加快速和高效
Spark SQL性能优化
性能优化参数 针对Spark SQL 性能调优参数如下: 代码示例 import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.api.java.JavaSQLContext; import org.apache.spark.sql.api.java.Row; import org.a