深入理解Spark:核心思想与源码分析. 1.2 Spark初体验

1.2 Spark初体验

本节通过Spark的基本使用,让读者对Spark能有初步的认识,便于引导读者逐步深入学习。

1.2.1 运行spark-shell

要运行spark-shell,需要先对Spark进行配置。

1)进入Spark的conf文件夹:

cd ~/install/spark-1.2.0-bin-hadoop1/conf

2)复制一份spark-env.sh.template,命名为spark-env.sh,对它进行编辑,命令如下:

cp spark-env.sh.template spark-env.sh

vim spark-env.sh

3)添加如下配置:

export SPARK_MASTER_IP=127.0.0.1

export SPARK_LOCAL_IP=127.0.0.1

4)启动spark-shell:

cd ~/install/spark-1.2.0-bin-hadoop1/bin

./spark-shell

最后我们会看到spark启动的过程,如图1-3所示。

 

图1-3 Spark启动过程

从以上启动日志中我们可以看到SparkEnv、MapOutputTracker、BlockManagerMaster、DiskBlockManager、MemoryStore、HttpFileServer、SparkUI等信息。它们是做什么的?此处望文生义即可,具体内容将在后边的章节详细讲解。

1.2.2 执行word count

这一节,我们通过word count这个耳熟能详的例子来感受下Spark任务的执行过程。启动spark-shell后,会打开scala命令行,然后按照以下步骤输入脚本。

1)输入val lines =
sc.textFile("../README.md", 2),执行结果如图1-4所示。

 

图1-4 步骤1执行结果

2)输入val words =
lines.flatMap(line => line.split(" ")),执行结果如图1-5所示。

 

图1-5 步骤2执行结果

3)输入val ones = words.map(w
=> (w,1)),执行结果如图1-6所示。

 

图1-6 步骤3执行结果

4)输入val counts =
ones.reduceByKey(_ + _),执行结果如图1-7所示。

 

图1-7 步骤4执行结果

5)输入counts.foreach(println),任务执行过程如图1-8和图1-9所示。输出结果如图1-10所示。

 

图1-8 步骤5执行过程部分(一)

 

图1-9 步骤5执行过程部分(二)

 

图1-10 步骤5输出结果

在这些输出日志中,我们先是看到Spark中任务的提交与执行过程,然后看到单词计数的输出结果,最后打印一些任务结束的日志信息。有关任务的执行分析,笔者将在第5章中展开。

1.2.3 剖析spark-shell

通过word count在spark-shell中执行的过程,我们想看看spark-shell做了什么。spark-shell中有以下一段脚本,见代码清单1-1。

代码清单1-1 spark-shell中的一段脚本

function main() {

   
if $cygwin; then

stty -icanonmin 1 -echo > /dev/null
2>&1

       
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS
-Djline.terminal=unix"

       
"$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main
"${SUBMISSION_OPTS[@]}" spark-shell
"${APPLICATION_OPTS[@]}"

sttyicanon echo > /dev/null 2>&1

   
else

       
export SPARK_SUBMIT_OPTS

       
"$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main
"${SUBMISSION_OPTS[@]}" spark-shell
"${APPLICATION_OPTS[@]}"

fi

}

我们看到脚本spark-shell里执行了spark-submit脚本,打开spark-submit脚本,发现其中包含以下脚本。

exec "$SPARK_HOME"/bin/spark-class
org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"

脚本spark-submit在执行spark-class脚本时,给它增加了参数SparkSubmit。打开spark-class脚本,其中包含以下脚本,见代码清单1-2。

代码清单1-2 spark-class

if [ -n "${JAVA_HOME}" ]; then

   
RUNNER="${JAVA_HOME}/bin/java"

else

    if
[ `command -v java` ]; then

       
RUNNER="java"

   
else

      
echo "JAVA_HOME is not set" >&2

      
exit 1

   
fi

fi

 

exec "$RUNNER" -cp
"$CLASSPATH" $JAVA_OPTS "$@"

读到这里,应该知道Spark启动了以SparkSubmit为主类的jvm进程。

为便于在本地对Spark进程使用远程监控,给spark-class脚本追加以下jmx配置:

JAVA_OPTS="-XX:MaxPermSize=128m
$OUR_JAVA_OPTS -Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=10207
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false"

在本地打开jvisualvm,添加远程主机,如图1-11所示。

右击已添加的远程主机,添加JMX连接,如图1-12所示。

 

单击右侧的“线程”选项卡,选择main线程,然后单击“线程Dump”按钮,如图1-13所示。

从dump的内容中找到线程main的信息,如代码清单1-3所示。

 

图1-13 查看Spark线程

代码清单1-3 main线程dump信息

"main" - Thread t@1

   
java.lang.Thread.State: RUNNABLE

       
at java.io.FileInputStream.read0(Native Method)

       
at java.io.FileInputStream.read(FileInputStream.java:210)

       
at
scala.tools.jline.TerminalSupport.readCharacter(TerminalSupport.java:152)

       
at scala.tools.jline.UnixTerminal.readVirtualKey(UnixTerminal.java:125)

       
at scala.tools.jline.console.ConsoleReader.readVirtualKey(ConsoleReader.

       
java:933)

       
at
scala.tools.jline.console.ConsoleReader.readBinding(ConsoleReader.java:1136)

       
at
scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1218)

       
at
scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1170)

       
at org.apache.spark.repl.SparkJLineReader.readOneLine(SparkJLineReader.

       
scala:80)

       
at
scala.tools.nsc.interpreter.InteractiveReader$class.readLine(Interactive-

       
Reader.scala:43)

       
at
org.apache.spark.repl.SparkJLineReader.readLine(SparkJLineReader.scala:25)

       
at org.apache.spark.repl.SparkILoop.readOneLine$1(SparkILoop.scala:619)

       
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)

       
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)

       
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp

       
(SparkI-Loop.scala:968)

       
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.

       
scala:916)

       
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.

       
scala:916)

       
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClass

       
Loader.scala:135)

       
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)

       
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)

       
at org.apache.spark.repl.Main$.main(Main.scala:31)

       
at org.apache.spark.repl.Main.main(Main.scala)

       
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

       
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.

       
java:57)

       
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces-

       
sorImpl.java:43)

       
at java.lang.reflect.Method.invoke(Method.java:606)

       
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)

       
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)

       
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

从main线程的栈信息中可看出程序的调用顺序:SparkSubmit.main→repl.Main→SparkI-Loop.process。SparkILoop.process方法中会调用initializeSpark方法,initializeSpark的实现见代码清单1-4。

代码清单1-4 initializeSpark的实现

def initializeSpark() {

intp.beQuietDuring {

   
command("""

       
@transient val sc = {

           
val _sc = org.apache.spark.repl.Main.interp.createSparkContext()

           
println("Spark context available as sc.")

           
_sc

       
}

       
""")

       
command("import org.apache.spark.SparkContext._")

    }

}

我们看到initializeSpark调用了createSparkContext方法,createSparkContext的实现见代码清单1-5。

代码清单1-5 createSparkContext的实现

def createSparkContext(): SparkContext = {

valexecUri =
System.getenv("SPARK_EXECUTOR_URI")

valjars = SparkILoop.getAddedJars

valconf = new SparkConf()

   
.setMaster(getMaster())

   
.setAppName("Spark shell")

   
.setJars(jars)

   
.set("spark.repl.class.uri", intp.classServer.uri)

if (execUri != null) {

                      conf.set("spark.executor.uri",
execUri)

    }

sparkContext = new SparkContext(conf)

   
logInfo("Created spark context..")

   
sparkContext

}

这里最终使用SparkConf和SparkContext来完成初始化,具体内容将在第3章讲解。代码分析中涉及的repl主要用于与Spark实时交互。

时间: 2024-09-07 16:04:50

深入理解Spark:核心思想与源码分析. 1.2 Spark初体验的相关文章

《深入理解Spark:核心思想与源码分析》——1.4节Spark源码编译与调试

1.4 Spark源码编译与调试 1.下载Spark源码 首先,访问Spark官网http://spark.apache.org/,如图1-18所示. 2.构建Scala应用 使用cmd命令行进到Spark根目录,执行sbt命令.会下载和解析很多jar包,要等很长时间,笔者大概花了一个多小时才执行完. 3.使用sbt生成Eclipse工程文件 等sbt提示符(>)出现后,输入Eclipse命令,开始生成Eclipse工程文件,也需要花费很长时间,笔者本地大致花了40分钟.完成时的状况如图1-21

《深入理解Spark:核心思想与源码分析》——2.3节Spark基本设计思想

2.3 Spark基本设计思想2.3.1 Spark模块设计 整个Spark主要由以下模块组成: Spark Core:Spark的核心功能实现,包括:SparkContext的初始化(Driver Application通过SparkContext提交).部署模式.存储体系.任务提交与执行.计算引擎等. Spark SQL:提供SQL处理能力,便于熟悉关系型数据库操作的工程师进行交互查询.此外,还为熟悉Hadoop的用户提供Hive SQL处理能力. Spark Streaming:提供流式计

《深入理解Spark:核心思想与源码分析》——第1章环境准备

第1章 环 境 准 备 凡事豫则立,不豫则废:言前定,则不跲:事前定,则不困. -<礼记·中庸> 本章导读 在深入了解一个系统的原理.实现细节之前,应当先准备好它的源码编译环境.运行环境.如果能在实际环境安装和运行Spark,显然能够提升读者对于Spark的一些感受,对系统能有个大体的印象,有经验的技术人员甚至能够猜出一些Spark采用的编程模型.部署模式等.当你通过一些途径知道了系统的原理之后,难道不会问问自己:"这是怎么做到的?"如果只是游走于系统使用.原理了解的层面,

《深入理解Spark:核心思想与源码分析》——第3章SparkContext的初始化

第3章 SparkContext的初始化 道生一, 一生二, 二生三, 三生万物. -<道德经> 本章导读 SparkContext的初始化是Driver应用程序提交执行的前提,本章内容以local模式为主,并按照代码执行顺序讲解,这将有助于首次接触Spark的读者理解源码.读者朋友如果能边跟踪代码,边学习本章内容,也许是快速理解SparkContext初始化过程的便捷途径.已经熟练使用Spark的开发人员可以选择跳过本章内容. 本章将在介绍SparkContext初始化过程的同时,向读者介绍

《深入理解Spark:核心思想与源码分析》——3.1节SparkContext概述

3.1 SparkContext概述 Spark Driver用于提交用户应用程序,实际可以看作Spark的客户端.了解Spark Driver的初始化,有助于读者理解用户应用程序在客户端的处理过程. Spark Driver的初始化始终围绕着SparkContext的初始化.SparkContext可以算得上是所有Spark应用程序的发动机引擎,轿车要想跑起来,发动机首先要启动.SparkContext初始化完毕,才能向Spark集群提交任务.在平坦的公路上,发动机只需以较低的转速.较低的功率

《深入理解Spark:核心思想与源码分析》——1.5节小结

1.5 小结 本章通过引导大家在Linux操作系统下搭建基本的执行环境,并且介绍spark-shell等脚本的执行,来帮助读者由浅入深地进行Spark源码的学习.由于目前多数开发工作都在Windows系统下进行,并且Eclipse有最广大的用户群,即便是一些开始使用IntelliJ的用户对Eclipse也不陌生,所以在Windows环境下搭建源码阅读环境时,选择这些最常用的工具,能降低读者的学习门槛,并且替大家节省时间.

《深入理解Spark:核心思想与源码分析》——2.2节Spark基础知识

2.2 Spark基础知识 1.版本变迁 经过4年多的发展,Spark目前的版本是1.4.1.我们简单看看它的版本发展过程. 1)Spark诞生于UCBerkeley的AMP实验室(2009). 2)Spark正式对外开源(2010年). 3)Spark 0.6.0版本发布(2012-10-15),进行了大范围的性能改进,增加了一些新特性,并对Standalone部署模式进行了简化. 4)Spark 0.6.2版本发布(2013-02-07),解决了一些bug,并增强了系统的可用性. 5)Spa

《深入理解Spark:核心思想与源码分析》——2.4节Spark基本架构

2.4 Spark基本架构从集群部署的角度来看,Spark集群由以下部分组成:Cluster Manager:Spark的集群管理器,主要负责资源的分配与管理.集群管理器分配的资源属于一级分配,它将各个Worker上的内存.CPU等资源分配给应用程序,但是并不负责对Executor的资源分配.目前,Standalone.YARN.Mesos.EC2等都可以作为Spark的集群管理器.Worker:Spark的工作节点.对Spark应用程序来说,由集群管理器分配得到资源的Worker节点主要负责以

《深入理解Spark:核心思想与源码分析》——3.2节创建执行环境SparkEnv

3.2 创建执行环境SparkEnv SparkEnv是Spark的执行环境对象,其中包括众多与Executor执行相关的对象.由于在local模式下Driver会创建Executor,local-cluster部署模式或者Standalone部署模式下Worker另起的CoarseGrainedExecutorBackend进程中也会创建Executor,所以SparkEnv存在于Driver或者CoarseGrainedExecutorBackend进程中.创建SparkEnv 主要使用Sp

《深入理解Spark:核心思想与源码分析》——第2章Spark设计理念与基本架构

第2章 Spark设计理念与基本架构 若夫乘天地之正,而御六气之辩,以游无穷者,彼且恶乎待哉? -<庄子·逍遥游> 本章导读 上一章,介绍了Spark环境的搭建,为方便读者学习Spark做好准备.本章首先从Spark产生的背景开始,介绍Spark的主要特点.基本概念.版本变迁.然后简要说明Spark的主要模块和编程模型.最后从Spark的设计理念和基本架构入手,使读者能够对Spark有宏观的认识,为之后的内容做一些准备工作. Spark是一个通用的并行计算框架,由加州伯克利大学(UCBerke