Spark独立模式
Spark除了可以在Mesos和YARN集群上运行之外,还支持一种简单的独立部署模式。独立部署模式下,你既可以手工启动(手动运行master和workers),也可以利用我们提供的启动脚本(launch scripts)。同时,独立部署模式下,你可以在单机上运行这些程序,以方便测试。
Spark集群独立安装
要独立安装Spark,你只需要将编译好的Spark包复制到集群中每一个节点上即可。你可以下载一个编译好的Spark版本,也可以在这里自己编译一个版本(build it yourself)。
手动启动集群
执行以下命令,启动独立部署的master server:
./sbin/start-master.sh
一旦启动完成,master会打印出master URL(spark://HOST:PORT),后续worker需要用这个URL来连接master,写代码的时候SparkContext中的master参数也需要设置成这个master URL。你还可以在master的web UI(默认为http://localhost:8080)上查看master URL。
类似地,你可以通过以下命令,启动一个或多个worker节点,并将其连接到master:
./sbin/start-slave.sh <master-spark-URL>
启动一个worker以后,刷新一下master的web UI(默认为http://localhost:8080),你应该可以在这里看到一个新的节点。
最后,以下配置选项将会被传给master和worker:
参数 | 含义 |
---|---|
-h HOST , --host HOST
|
监听的主机名 |
-i HOST , --ip HOST
|
监听的主机名(已经废弃,请使用-h 或者 –host) |
-p PORT , --port PORT
|
服务监听的端口(master节点默认7077,worker节点随机) |
--webui-port PORT |
web UI端口(master节点默认8080,worker节点默认8081) |
-c CORES , --cores CORES
|
单节点上,Spark应用能够使用的CPU core数上限(默认,等于CPU core的个数);仅worker节点有效 |
-m MEM , --memory MEM
|
单节点上,Spark应用能够使用的内存上限,格式为1000M 或者 2G(默认为,机器上所有内存减去1G);仅worker节点有效 |
-d DIR , --work-dir DIR
|
工作目录,同时job的日志也输出到该目录(默认:${SPAKR_HOME}/work);仅worker节点有效 |
--properties-file FILE |
自定义Spark属性文件的加载路径(默认:conf/spark-defaults.conf) |
集群启动脚本
要使用启动脚本,来启动一个Spark独立部署集群,首先你需要在Spark目录下创建一个文件conf/slaves,并且在文件中写入你需要作为worker节点启动的每一台机器主机名(或IP),每行一台机器。如果conf/slaves文件不存在,启动脚本会默认会用单机方式启动,这种方式对测试很有帮助。注意,master节点访问各个worker时使用ssh。默认情况下,你需要配置ssh免密码登陆(使用秘钥文件)。如果你没有设置免密码登陆,那么你也可以通过环境变量SPARK_SSH_FOREGROUND来一个一个地设置每个worker的密码。
设置好conf/slaves文件以后,你就可以用一下shell脚本来启动或停止集群了,类似于Hadoop的部署,这些脚本都在${SPARK_HOME}/sbin目录下:
sbin/start-master.sh
– 在本机启动一个master实例sbin/start-slaves.sh
– 在conf/slaves文件所指定的每一台机器上都启动一个slave实例sbin/start-slave.sh
– 在本机启动一个slave实例sbin/start-all.sh
– 启动一个master和多个slave实例,详细见上面的描述。sbin/stop-master.sh
– 停止 start-master.sh所启动的master实例sbin/stop-slaves.sh
– 停止所有在conf/slaves中指定的slave实例sbin/stop-all.sh
– 停止master节点和所有slave节点,详细见上面的描述
注意,这些脚本都需要在你启动Spark master的机器上运行,而不是你的本地机器。
Spark独立部署集群的其他可选的环境变量见conf/spark-env.sh。你可以通过复制conf/spark-env.sh.template来创建这个文件,同时你还需要将配置好的文件复制到所有的worker节点上。以下是可用的设置:
环境变量 | 含义 |
---|---|
SPARK_MASTER_IP |
master实例绑定的IP地址,例如,绑定到一个公网IP |
SPARK_MASTER_PORT |
mater实例绑定的端口(默认7077) |
SPARK_MASTER_WEBUI_PORT |
master web UI的端口(默认8080) |
SPARK_MASTER_OPTS |
master专用配置属性,格式如”-Dx=y” (默认空),可能的选项请参考下面的列表。 |
SPARK_LOCAL_DIRS |
Spark的本地工作目录,包括:映射输出的临时文件和RDD保存到磁盘上的临时数据。这个目录需要快速访问,最好设成本地磁盘上的目录。也可以通过使用逗号分隔列表,将其设成多个磁盘上的不同路径。 |
SPARK_WORKER_CORES |
本机上Spark应用可以使用的CPU core上限(默认所有CPU core) |
SPARK_WORKER_MEMORY |
本机上Spark应用可以使用的内存上限,如:1000m,2g(默认为本机所有内存减去1GB);注意每个应用单独使用的内存大小要用 spark.executor.memory 属性配置的。 |
SPARK_WORKER_PORT |
Spark worker绑定的端口(默认随机) |
SPARK_WORKER_WEBUI_PORT |
worker web UI端口(默认8081) |
SPARK_WORKER_INSTANCES |
每个slave机器上启动的worker实例个数(默认:1)。如果你的slave机器非常强劲,可以把这个值设为大于1;相应的,你需要设置SPARK_WORKER_CORES参数来显式地限制每个worker实例使用的CPU个数,否则每个worker实例都会使用所有的CPU。 |
SPARK_WORKER_DIR |
Spark worker的工作目录,包括worker的日志以及临时存储空间(默认:${SPARK_HOME}/work) |
SPARK_WORKER_OPTS |
worker的专用配置属性,格式为:”-Dx=y”,可能的选项请参考下面的列表。 |
SPARK_DAEMON_MEMORY |
Spark master和worker后台进程所使用的内存(默认:1g) |
SPARK_DAEMON_JAVA_OPTS |
Spark master和workers后台进程所使用的JVM选项,格式为:”-Dx=y”(默认空) |
SPARK_PUBLIC_DNS |
Spark master和workers使用的公共DNS(默认空) |
注意: 启动脚本目前不支持Windows。如需在Windows上运行,请手工启动master和workers。
SPARK_MASTER_OPTS支持以下属性:
属性名 | 默认值 | 含义 |
---|---|---|
spark.deploy.retainedApplications |
200 | web UI上最多展示几个已结束应用。更早的应用的数将被删除。 |
spark.deploy.retainedDrivers |
200 | web UI上最多展示几个已结束的驱动器。更早的驱动器进程数据将被删除。 |
spark.deploy.spreadOut |
true | 独立部署集群的master是否应该尽可能将应用分布到更多的节点上;设为true,对数据本地性支持较好;设为false,计算会收缩到少数几台机器上,这对计算密集型任务比较有利。 |
spark.deploy.defaultCores |
(无限制) | Spark独立模式下应用程序默认使用的CPU个数(没有设置spark.cores.max的情况下)。如果不设置,则为所有可用CPU个数(除非设置了spark.cores.max)。如果集群是共享的,最好将此值设小一些,以避免用户占满整个集群。 |
spark.worker.timeout |
60 | 如果master没有收到worker的心跳,那么将在这么多秒之后,master将丢弃该worker。 |
SPARK_WORKER_OPTS支持以下属性:
属性名 | 默认值 | 含义 |
---|---|---|
spark.worker.cleanup.enabled |
false | 是否定期清理 worker 和应用的工作目录。注意,该设置仅在独立模式下有效,YARN有自己的清理方式;同时,只会清理已经结束的应用对应的目录。 |
spark.worker.cleanup.interval |
1800 (30 minutes) | worker清理本地应用工作目录的时间间隔(秒) |
spark.worker.cleanup.appDataTtl |
7 * 24 * 3600 (7 days) | 清理多久以前的应用的工作目录。这个选项值将取决于你的磁盘总量。spark应用会将日志和jar包都放在其对应的工作目录下。随着时间流逝,应用的工作目录很快会占满磁盘,尤其是在你的应用提交比较频繁的情况下。 |
连接到集群
要在Spark集群上运行一个应用,只需把spark://IP:PORT这个master URL传给SparkContext(参考SparkContext
constructor)
如需要运行交互式的spark shell,运行如下命令:
./bin/spark-shell --master spark://IP:PORT
你也可以通过设置选线 –total-executor-cores <numCores> 来控制spark-shell在集群上使用的CPU总数。
启动Spark应用
spark-submit脚本(spark-submit
script )是提交spark应用最简洁的方式。对于独立安装的集群来说,spark目前支持两种运行模式。客户端(client)模式下,驱动器进程(driver)将在提交应用的机器上启动。而在集群(cluster)模式下,驱动器(driver)将会在集群中的某一台worker上启动,同时提交应用的客户端在提交动作完成之后立即退出,而不会等到Spark应用运行结束。
如果你的应用时通过spark-submit提交启动的,那么应用对应的jar包会自动发布到所有的worker节点上。任何额外的依赖项jar包,都必须在–jars参数中指明,并以逗号分隔(如:–jars jar1,jar2)。
另外,独立安装的集群还支持异常退出(返回值非0)时自动重启。要启用这个特性,你需要在spark-submit时指定–supervise参数。其后,如果你需要杀掉一个重复失败的应用,你可能需要运行如下指令:
./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>
你可以在master web UI(http://<master url>:8080)上查看驱动器ID。
资源调度
独立安装集群目前只支持简单的先进先出(FIFO)调度器。这个调度器可以支持多用户,你可以控制每个应用所使用的最大资源。默认情况下,Spark应用会申请集群中所有的CPU,这不太合理,除非你的进群同一时刻只运行一个应用。你可以通过SparkConf中的spark.cores.max,来设置一个CPU帽子以限制其使用的CPU总数。例如:
val conf = new SparkConf()
.setMaster(...)
.setAppName(...)
.set("spark.cores.max", "10")
val sc = new SparkContext(conf)
另外,你也可以通过conf/spark-env.sh中的spark.deploy.defaultCores设置应用默认使用的CPU个数(特别针对没有设置spark.cores.max的应用)。
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"
在一些共享的集群上,用户很可能忘记单独设置一个最大CPU限制,那么这个参数将很有用。
监控和日志
Spark独立安装模式提供了一个基于web的集群监控用户界面。master和每个worker都有其对应的web UI,展示集群和Spark作业的统计数据。默认情况下,你可以在master机器的8080端口上访问到这个web UI。这个端口可以通过配置文件或者命令行来设置。
另外,每个作业的详细日志,将被输出到每个slave节点上的工作目录下(默认为:${SPARK_HOME}/work)。每个Spark作业下都至少有两个日志文件,stdout和stderr,这里将包含所有的输出到控制台的信息。
和Hadoop同时运行
你可以让Spark和已有的Hadoop在同一集群上同时运行,只需要将Spark作为独立的服务启动即可。这样Spark可以通过hdfs:// URL来访问Hadoop上的数据(通常情况下是,hdfs://<namenode>:9000/path,你可以在Hadoop Namenode的web UI上找到正确的链接)。当然,你也可以为Spark部署一个独立的集群,这时候Spark仍然可以通过网络访问HDFS上的数据;这会比访问本地磁盘慢一些,但如果Spark和Hadoop集群都在同一个本地局域网内的话,问题不大(例如,你可以在Hadoop集群的每个机架上新增一些部署Spark的机器)。
网络安全端口配置
Spark会大量使用网络资源,而有些环境会设置严密的防火墙设置,以严格限制网络访问。完整的端口列表,请参考这里:security page.
高可用性
默认情况下,独立调度的集群能够容忍worker节点的失败(在Spark本身来说,它能够将失败的工作移到其他worker节点上)。然而,调度器需要master做出调度决策,而这(默认行为)会造成单点失败:如果master挂了,任何应用都不能提交和调度。为了绕过这个单点问题,我们有两种高可用方案,具体如下:
基于Zookeeper的热备master
概要
利用Zookeeper来提供领导节点选举以及一些状态数据的存储,你可以在集群中启动多个master并连接到同一个Zookeeper。其中一个将被选举为“领导”,而另一个将处于备用(standby)状态。如果“领导”挂了,则另一个master会立即被选举,并从Zookeeper恢复已挂“领导”的状态,并继续调度。整个恢复流程(从“领导”挂开始计时)可能需要1到2分钟的时间。注意,整个延时只会影响新增应用 – 已经运行的应用不会受到影响。
更多关于Zookeeper信息请参考这里:here
配置
要启用这种恢复模式,你可以在spark-env中设置 SPARK_DAEMON_JAVA_OPTS,可用的属性如下:
系统属性 | 含义 |
---|---|
spark.deploy.recoveryMode |
设为ZOOKEEPER以启用热备master恢复模式(默认空) |
spark.deploy.zookeeper.url |
Zookeeper集群URL(如:192.168.1.100:2181,192.168.1.101:2181) |
spark.deploy.zookeeper.dir |
用于存储可恢复状态的Zookeeper目录(默认 /spark) |
可能的问题:如果你有多个master,但没有正确设置好master使用Zookeeper的配置,那么这些master彼此都不可见,并且每个master都认为自己是“领导”。这件会导致整个集群处于不稳定状态(多个master都会独立地进行调度)
详细
如果你已经有一个Zookeeper集群,那么启动高可用特性是很简单的。只需要在不同节点上启动多个master,并且配置相同的Zookeeper(包括Zookeeper URL和目录)即可。masters可以随时添加和删除。
在调度新提交的Spark应用或者新增worker节点时,需要知道当前”领导“的IP地址。你只需要将以前单个的master地址替换成多个master地址即可。例如,你可以在SparkContext中设置master URL为spark://host1:port1.host2:port2。这会导致SparkContext在两个master中都进行登记 – 那么这时候,如果host1挂了,这个应用的配置同样可以在新”领导“(host2)中找到。
”在master注册“和普通操作有一个显著的区别。在Spark应用或worker启动时,它们需要找当前的”领导“master,并在该master上注册。一旦注册成功,它们的状态将被存储到Zookeeper上。如果”老领导“挂了,”新领导“将会联系所有之前注册过的Spark应用和worker并通知它们领导权的变更,所以Spark应用和worker在启动时甚至没有必要知道”新领导“的存在。
由于这一特性,新的master可以在任何时间添加进来,你唯一需要关注的就是,新的应用或worker能够访问到这个master。总之,只要应用和worker注册成功,其他的你都不用管了。
基于本地文件系统的单点恢复
概要
利用Zookeeper当然是生成环境下高可用的最佳选择,但有时候你仍然希望在master挂了的时候能够重启之,FILESYSTEM模式能帮你实现这一需求。当应用和worker注册到master的时候,他们的状态都将被写入文件系统目录中,一旦master挂了,你只需要重启master,这些状态都能够恢复。
配置
要使用这种恢复模式,你需要在spark-env中设置SPARK_DAEMON_JAVA_OPTS,可用的属性如下:
系统属性 | 含义 |
---|---|
spark.deploy.recoveryMode |
设为FILESYSTEM以启用单点恢复模式(默认空) |
spark.deploy.recoveryDirectory |
用于存储可恢复状态数据的目录,master进程必须有访问权限 |
Details(详细)
- 这个解决方案可以用于和一些监控、管理系统进行串联(如:monit),或者与手动重启相结合。
- 至少,基于文件系统工单恢复总比不能恢复强;同时,这种恢复模式也会是开发或者实验场景下的不错的选择。在某些情况下,通过stop-master.sh杀死master可能不会清理其状态恢复目录下的数据,那么这时候你启动或重启master,将会进入恢复模式。这可能导致master的启动时间长达一分钟(master可能要等待之前注册的worker和客户端超时)。
- 你也可以使用NFS目录来作为数据恢复目录(虽然这不是官方声明支持的)。如果老的master挂了,你可以在另一个节点上启动master,这个master只要能访问同一个NFS目录,它就能够正确地恢复状态数据,包括之前注册的worker和应用(等价于Zookeeper模式)。后续的应用必须使用新的master来进行注册。