Apache Spark源码走读(七)Standalone部署方式分析&sql的解析与执行

<一>Standalone部署方式分析

楔子

在Spark源码走读系列之2中曾经提到Spark能以Standalone的方式来运行cluster,但没有对Application的提交与具体运行流程做详细的分析,本文就这些问题做一个比较详细的分析,并且对在standalone模式下如何实现HA进行讲解。

没有HA的Standalone运行模式

先从比较简单的说起,所谓的没有ha是指master节点没有ha。

组成cluster的两大元素即Master和Worker。slave worker可以有1到多个,这些worker都处于active状态。

Driver Application可以运行在Cluster之内,也可以在cluster之外运行,先从简单的讲起即Driver Application独立于Cluster。那么这样的整体框架如下图所示,由driver,master和多个slave worker来共同组成整个的运行环境。

执行顺序

步骤1 运行master

$SPARK_HOME/sbin/start_master.sh

start_master.sh中最关键的一句就是

"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT

检测Master的jvm进程

root     23438     1 67 22:57 pts/0    00:00:05 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip localhost --port 7077 --webui-port 8080

Master的日志在$SPARK_HOME/logs目录下

步骤2 运行worker,可以启动多个

./bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077

worker运行时,需要注册到指定的master url,这里就是spark://localhost:7077.

Master侧收到RegisterWorker通知,其处理代码如下

case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
    {
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else if (idToWorker.contains(id)) {
        sender ! RegisterWorkerFailed("Duplicate worker ID")
      } else {
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          sender, workerUiPort, publicAddress)
        if (registerWorker(worker)) {
          persistenceEngine.addWorker(worker)
          sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
          schedule()
        } else {
          val workerAddress = worker.actor.path.address
          logWarning("Worker registration failed. Attempted to re-register worker at same " +
            "address: " + workerAddress)
          sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress)
        }
      }
    }

步骤3 运行Spark-shell

MASTER=spark://localhost:7077 $SPARK_HOME/bin/spark-shell

spark-shell属于application,有关appliation的运行日志存储在$SPARK_HOME/works目录下

spark-shell作为application,在Master侧其处理的分支是RegisterApplication,具体处理代码如下。

case RegisterApplication(description) => {
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, sender)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        sender ! RegisteredApplication(app.id, masterUrl)
        schedule()
      }
    }

每当有新的application注册到master,master都要调度schedule函数将application发送到相应的worker,在对应的worker启动相应的ExecutorBackend. 具体代码请参考Master.scala中的schedule函数,代码就不再列出。

步骤4 结果检测

/opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip localhost --port 7077 --webui-port 8080
root     23752 23745 21 23:00 pts/0    00:00:25 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.repl.Main
root     23986 23938 25 23:02 pts/2    00:00:03 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://localhost:7077
root     24047 23986 34 23:02 pts/2    00:00:04 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@localhost:40053/user/CoarseGrainedScheduler 0 localhost 4 akka.tcp://sparkWorker@localhost:53568/user/Worker app-20140511230059-0000

从运行的进程之间的关系可以看出,worker和master之间的连接建立完毕之后,如果有新的driver application连接上master,master会要求worker启动相应的ExecutorBackend进程。此后若有什么Task需要运行,则会运行在这些Executor之上。可以从以下的日志信息得出此结论,当然看源码亦可。

14/05/11 23:02:36 INFO Worker: Asked to launch executor app-20140511230059-0000/0 for Spark shell
14/05/11 23:02:36 INFO ExecutorRunner: Launch command: "/opt/java/bin/java" "-cp" ":/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar" "-Xms512M" "-Xmx512M" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "akka.tcp://spark@localhost:40053/user/CoarseGrainedScheduler" "0" "localhost" "4" "akka.tcp://sparkWorker@localhost:53568/user/Worker" "app-20140511230059-0000"

worker中启动exectuor的相关源码见worker中的receive函数,相关代码如下

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
          val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
            self, workerId, host,
            appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
            workDir, akkaUrl, ExecutorState.RUNNING)
          executors(appId + "/" + execId) = manager
          manager.start()
          coresUsed += cores_
          memoryUsed += memory_
          masterLock.synchronized {
            master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
          }
        } catch {
          case e: Exception => {
            logError("Failed to launch exector %s/%d for %s".format(appId, execId, appDesc.name))
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            masterLock.synchronized {
              master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
            }
          }
        }
      }

关于standalone的部署,需要详细研究的源码文件如下所列。

  • deploy/master/Master.scala
  • deploy/worker/worker.scala
  • executor/CoarseGrainedExecutorBackend.scala

查看进程之间的父子关系,请用"pstree"

使用下图来小结单Master的部署情况。

类的动态加载和反射

在谈部署Driver到Cluster上之前,我们先回顾一下java的一大特性“类的动态加载和反射机制”。本人不是一直写java代码出身,所以好多东西都是边用边学,难免挂一漏万。

所谓的反射,其实就是要解决在运行期实现类的动态加载。

来个简单的例子

package test;

public class Demo {

    public Demo() {
        System.out.println("Hi!");
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        Class clazz = Class.forName("test.Demo");
        Demo demo = (Demo) clazz.newInstance();
    }
}

谈到这里,就自然想到了一个面试题,“谈一谈Class.forName和ClassLoader.loadClass的区别"。说到面试,我总是很没有信心,面试官都很屌的, :)。

在cluster中运行Driver Application

上一节之所以写到类的动态加载与反射都是为了谈这一节的内容奠定基础。

将Driver application部署到Cluster中,启动的时序大体如下图所示。

  •  首先启动Master,然后启动Worker
  • 使用”deploy.Client"将Driver Application提交到Cluster中
./bin/spark-class org.apache.spark.deploy.Client launch
   [client-options] \
      \
   [application-options]
  • Master在收到RegisterDriver的请求之后,会发送LaunchDriver给worker,要求worker启动一个Driver的jvm process
  • Driver Application在新生成的JVM进程中运行开始时会注册到master中,发送RegisterApplication给Master
  • Master发送LaunchExecutor给Worker,要求Worker启动执行ExecutorBackend的JVM Process
  • 一当ExecutorBackend启动完毕,Driver Application就可以将任务提交到ExecutorBackend上面执行,即LaunchTask指令

提交侧的代码,详见deploy/Client.scala

    driverArgs.cmd match {
      case "launch" =>
        // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
        //       truncate filesystem paths similar to what YARN does. For now, we just require
        //       people call `addJar` assuming the jar is in the same directory.
        val env = Map[String, String]()
        System.getenv().foreach{case (k, v) => env(k) = v}

        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

        val classPathConf = "spark.driver.extraClassPath"
        val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
        }

        val libraryPathConf = "spark.driver.extraLibraryPath"
        val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
        }

        val javaOptionsConf = "spark.driver.extraJavaOptions"
        val javaOpts = sys.props.get(javaOptionsConf)
        val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
          driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts)

        val driverDescription = new DriverDescription(
          driverArgs.jarUrl,
          driverArgs.memory,
          driverArgs.cores,
          driverArgs.supervise,
          command)

        masterActor ! RequestSubmitDriver(driverDescription)

接收侧

从Deploy.client发送出来的消息被谁接收呢?答案比较明显,那就是Master。 Master.scala中的receive函数有专门针对RequestSubmitDriver的处理,具体代码如下

case RequestSubmitDriver(description) => {
      if (state != RecoveryState.ALIVE) {
        val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
        sender ! SubmitDriverResponse(false, None, msg)
      } else {
        logInfo("Driver submitted " + description.command.mainClass)
        val driver = createDriver(description)
        persistenceEngine.addDriver(driver)
        waitingDrivers += driver
        drivers.add(driver)
        schedule()

        // TODO: It might be good to instead have the submission client poll the master to determine
        //       the current status of the driver. For now it's simply "fire and forget".

        sender ! SubmitDriverResponse(true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}")
      }
    }

SparkEnv

SparkEnv对于整个Spark的任务来说非常关键,不同的role在创建SparkEnv时传入的参数是不相同的,如Driver和Executor则存在重要区别。

在Executor.scala中,创建SparkEnv的代码如下所示:

  private val env = {
    if (!isLocal) {
      val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
        isDriver = false, isLocal = false)
      SparkEnv.set(_env)
      _env.metricsSystem.registerSource(executorSource)
      _env
    } else {
      SparkEnv.get
    }
  }

Driver Application则会创建SparkContext,在SparkContext创建过程中,比较重要的一步就是生成SparkEnv,其代码如下:

 private[spark] val env = SparkEnv.create(
    conf,
    "",
    conf.get("spark.driver.host"),
    conf.get("spark.driver.port").toInt,
    isDriver = true,
    isLocal = isLocal,
    listenerBus = listenerBus)
  SparkEnv.set(env)

Standalone模式下HA的实现

Spark在standalone模式下利用zookeeper来实现了HA机制,这里所说的HA是专门针对Master节点的,因为上面所有的分析可以看出Master是整个cluster中唯一可能出现单点失效的节点。

采用zookeeper之后,整个cluster的组成如下图所示。

为了使用zookeeper,Master在启动的时候需要指定如下的参数,修改conf/spark-env.sh, SPARK_DAEMON_JAVA_OPTS中添加如下选项。

System property Meaning
spark.deploy.recoveryMode Set to ZOOKEEPER to enable standby Master recovery mode (default: NONE).
spark.deploy.zookeeper.url The ZooKeeper cluster url (e.g., 192.168.1.100:2181,192.168.1.101:2181).
spark.deploy.zookeeper.dir The directory in ZooKeeper to store recovery state (default: /spark).

实现HA的原理

 zookeeper提供了一个Leader Election机制,利用这个机制,可以实现HA功能,具体请参考zookeeper recipes

在Spark中没有直接使用zookeeper的api,而是使用了curator,curator对zookeeper做了相应的封装,在使用上更为友好。

小结

步步演进讲到在standalone模式下,如何利用zookeeper来实现ha。从中可以看出standalone master一个最主要的任务就是resource management和job scheduling,看到这两个主要功能的时候,您也许会想到这不就是YARN要解决的问题。对了,从本质上来说standalone是yarn的一个简化版本。

本系列下篇内容就要仔细讲讲spark部署到YARN上的实现细节。

 参考资料

  1. Spark Standalone Mode http://spark.apache.org/docs/latest/spark-standalone.html
  2. Cluster Mode Overview  http://spark.apache.org/docs/latest/cluster-overview.html

<二>sql的解析与执行

概要

在即将发布的spark 1.0中有一个新增的功能,即对sql的支持,也就是说可以用sql来对数据进行查询,这对于DBA来说无疑是一大福音,因为以前的知识继续生效,而无须去学什么scala或其它script.

一般来说任意一个sql子系统都需要有parser,optimizer,execution三大功能模块,在spark中这些又都是如何实现的呢,这些实现又有哪些亮点和问题?带着这些疑问,本文准备做一些比较深入的分析。

SQL模块分析有几大难点,分别为:

  1. sql分析和执行的通用过程,这个与是否用spark无关,应该是非常general的问题
  2. spark sql中具体实现时的整体架构
  3. 源码阅读时碰到的scala特殊语法,也就是常说的语法糖问题

为什么需要SQL

SQL是一种标准,一种用来进行数据分析的标准,已经存在多年。

在大数据的背景下,随着数据规模的日渐增大,原有的分析技巧是否就过时了呢?答案显然是否定的,原来的分析技巧在既有的分析维度上依然保持有效,当然对于新的数据我们想挖掘出更多有意思有价值的内容,这个目标可以交给数据挖掘或者机器学习去完成。

那么原有的数据分析人员如何快速的转换到Big Data的平台上来呢,去重新学一种脚本吗,直接用scala或python去编写RDD。显然这样的代价太高,学习成本大。数据分析人员希望底层存储机制和分析引擎的变换不要对上层分析的应用有直接的影响,需求用一句话来表达就是,“直接使用sql语句来对数据进行分析”。

这也是为什么Hive兴起的原因了。Hive的流行直接证明这种设计迎合了市场的需求。由于Hive是采用了Hadoop的MapReduce作为分析执行引擎,其处理速度上不是尽如人意。Spark以快著称,很快有好事者写出了Shark,Shark取得了非常不俗的成绩,迎得了极好的口碑。

毕竟Shark是游离于Spark之外的一个项目,不受Spark节制,那么Spark开发团队的目标是将对SQL支持作用Spark的核心功能里面。以上分析就是Spark中的sql功能的由来。

应用举例

val sqlContext = new org.apache.spark.sql.SQLContext(sc);
import sqlContext._
case class Person(name: String, age: Int)
val person = sc.textFile("examples/src/main/resources/people.txt").map(_.split(" ")).map(p => Person(p(0), p(1).trim.toInt))
person.registerAsTable("person")
val teenagers = sql("SELECT name, age FROM person WHERE age >= 13 and age <= 19")
teenagers.map(t => "name:" + t(0)).collect().foreach(println)

上述代码的逻辑非常清晰,就是将存在于person.txt中年龄界于13到19岁的年轻人名字打印出来。

SQL通用执行过程

SQL的组成部分

SQL语句大家都很熟悉,那么有没有仔细想过其有几大部分组成呢?可能你会说,”这还用问,不就是“select * from tablex where f1=?”,有什么好想吗?“

还是先来看看再说吧,说不定有些新的思维在里面呢?

 

上图是对最简单的sql语句的重新标注,SELECT表示是一种具体的操作,即查询数据,”f1,f2,f3"表示返回的结果,tableX是数据源,condition部分是查询条件。有没有发觉SQL表达式中的顺序与常见的RDD处理逻辑其在表达的顺序上有差异。还是继续用图来表示不同吧。

 

SQL语句在分析执行过程中会经历下图所示的几个步骤

  1. 语法解析
  2. 操作绑定
  3. 优化执行策略
  4. 交付执行

语法解析

语法解析之后,会形成一棵语法树,如下图所示。树中的每个节点是执行的rule,整棵树称之为执行策略。

策略优化

形成上述的执行策略树还只是第一步,因为这个执行策略可以进行优化,所谓的优化就是对树中节点进行合并或是进行顺序上的调整。

以大家熟悉的join操作为例,下图给出一个join优化的示例。A JOIN B等同于B JOIN A,但是顺序的调整可能给执行的性能带来极大的影响,下图就是调整前后的对比图。

再举一例,一般来说尽可能的先实施聚合操作(Aggregate)然后再join

小结

上述一大通分析,希望达到的目的就两个。

  1. 语法解析之后生成一个执行策略树
  2. 执行策略树可以优化,优化的过程就是对树中节点进行合并或者顺序调整

有关SQL查询分析优化的具体过程,强烈推荐参考query optimizer deep dive系列文章

SQL在spark中的实现

有了上述内容的铺垫,想必你已经意识到Spark如果要很好的支持sql,势必也要完成,解析,优化,执行的三大过程。

整个SQL部分的代码,其大致分类如下图所示:

  1. SqlParser生成LogicPlan Tree
  2. Analyzer和Optimizer将各种rule作用于LogicalPlan Tree
  3. 最终优化生成的LogicalPlan生成Spark RDD
  4. 最后将生成的RDD交由Spark执行

阶段1:生成LogicalPlan

在sql中引入了一种新的RDD,即SchemaRDD,且看SchemaRDD的构造函数:

class SchemaRDD(
    @transient val sqlContext: SQLContext,
    @transient protected[spark] val logicalPlan: LogicalPlan)

 构造函数中总共两入参一为SparkContext,另一个LogicalPlan。LogicalPlan又是如何生成的呢?

要回答这个问题,不得不回到整个问题的入口点sql函数,sql函数的定义如下

  def sql(sqlText: String): SchemaRDD = {
    val result = new SchemaRDD(this, parseSql(sqlText))

    result.queryExecution.toRdd
    result
  }

parseSql(sqlText)负责生成LogicalPlan,parseSql就是SqlParser的一个实例。

SqlParser这一部分的代码要理解起来关键是要搞清楚StandardTokenParsers的调用规则,里面有一大堆的符号,如果不理解是什么意思,估计很难理清头绪。

由于apply函数可以不被显示调用,所以parseSql(sqlText)一句其实会隐式的调用SqlParser中的apply函数。

  def apply(input: String): LogicalPlan = {
    phrase(query)(new lexical.Scanner(input)) match {
      case Success(r, x) => r
      case x => sys.error(x.toString)
    }
  }

最最最让人蛋疼的一行代码就是phrase(query)(new lexical.Scanner(input))这里了,翻译过来就是如果输入的input字符串符合Lexical中定义的规则,则继续使用query处理。

看一下query的定义是什么:

  protected lazy val query: Parser[LogicalPlan] =
    select * (
      UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
      UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
    ) | insert

到了这里终于看到有LogicalPlan了,也就是说将普通的string转换成LogicalPlan在这里发生了

query这段代码同时说明,在目前的spark sql中仅支持selectinsert两种操作,至于delete, update暂不支持。

注:即便是到现在,估计你和当初一样对于SqlParser的使用还是一头雾水,不要紧,请参考ref[3]和[4]中的内容,至于那些稀奇古怪的符号到底是什么意思,请参考ref[5].

阶段2:QueryExecution

第一阶段,将string转换成为logicalplan tree,第二阶段将各种规则作用于LogicalPlan。

在第一阶段中展示的代码,哪一句会触发优化规则呢?是sql函数中的"result.queryExecution.toRdd",此处的queryExecution就是QueryExecution。这里又涉及到scala的一个语法糖问题。QueryExecution是一个抽象类,但却看到了下述的代码

 protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
    new this.QueryExecution { val logical = plan }

怎么可以创建抽象类的实例?我的世界坍塌了,呵呵。不要紧张,这在scala的世界是允许的,只不过scala是隐含的创建了一个QueryExecution的子类并初始化而已,java里的原则还是对的,人家背后有猫腻。

Ok,轮到阶段2中最重要的角色QueryExecution闪亮登场了

protected abstract class QueryExecution {
    def logical: LogicalPlan

    lazy val analyzed = analyzer(logical)
    lazy val optimizedPlan = optimizer(analyzed)
    lazy val sparkPlan = planner(optimizedPlan).next()
    lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

    /** Internal version of the RDD. Avoids copies and has no schema */
    lazy val toRdd: RDD[Row] = executedPlan.execute()

    protected def stringOrError[A](f: => A): String =
      try f.toString catch { case e: Throwable => e.toString }

    def simpleString: String = stringOrError(executedPlan)

    override def toString: String =
      s"""== Logical Plan ==
         |${stringOrError(analyzed)}
         |== Optimized Logical Plan ==
         |${stringOrError(optimizedPlan)}
         |== Physical Plan ==
         |${stringOrError(executedPlan)}
      """.stripMargin.trim

    def debugExec() = DebugQuery(executedPlan).execute().collect()
  }

三大步:

  1. lazy val analyzed = analyzer(logical)
  2. lazy val optimizedPlan = optimizer(analyzed)
  3. lazy val sparkPlan = planner(optimizedPlan).next()

无论analyzer还是optimizer,它们都是RuleExecutor的子类,

RuleExecutor的默认处理函数是apply,对所有的子类都是一样的,RuleExecutorapply函数定义如下,

def apply(plan: TreeType): TreeType = {
    var curPlan = plan

    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true

      // Run until fix point (or the max number of iterations as specified in the strategy.
      while (continue) {
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val result = rule(plan)
            if (!result.fastEquals(plan)) {
              logger.trace(
                s"""
                  |=== Applying Rule ${rule.ruleName} ===
                  |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
                """.stripMargin)
            }

            result
        }
        iteration += 1
        if (iteration > batch.strategy.maxIterations) {
          logger.info(s"Max iterations ($iteration) reached for batch ${batch.name}")
          continue = false
        }

        if (curPlan.fastEquals(lastPlan)) {
          logger.trace(s"Fixed point reached for batch ${batch.name} after $iteration iterations.")
          continue = false
        }
        lastPlan = curPlan
      }

      if (!batchStartPlan.fastEquals(curPlan)) {
        logger.debug(
          s"""
          |=== Result of Batch ${batch.name} ===
          |${sideBySide(plan.treeString, curPlan.treeString).mkString("\n")}
        """.stripMargin)
      } else {
        logger.trace(s"Batch ${batch.name} has no effect.")
      }
    }

    curPlan
  }

对于RuleExecutor的子类来说,最主要的是定义自己的batches,来看analyzer中的batches是如何定义的

val batches: Seq[Batch] = Seq(
    Batch("MultiInstanceRelations", Once,
      NewRelationInstances),
    Batch("CaseInsensitiveAttributeReferences", Once,
      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
    Batch("Resolution", fixedPoint,
      ResolveReferences ::
      ResolveRelations ::
      NewRelationInstances ::
      ImplicitGenerate ::
      StarExpansion ::
      ResolveFunctions ::
      GlobalAggregates ::
      typeCoercionRules :_*),
    Batch("AnalysisOperators", fixedPoint,
      EliminateAnalysisOperators)
  )

batch中定义了一系列的规则,这里再次出现语法糖问题。“如何理解::这个操作符”? ::表示cons的意思,即连接生成一个list.

Batch构造函数中需要指定一系列的Rule,像ResolveReferences就是Rule,有关Rule的代码就不一一分析了。

 阶段3:LogicalPlan转换成Physical Plan

在阶段3最主要的代码就两行:

  1. lazy val executePlan: SparkPlan = prepareForExecution(sparkPlan)
  2. lazy val toRdd: RDD[Row] = executedPlan.execute()

与LogicalPlan不同,SparkPlan最重要的区别就是有execute函数

针对Sparkplan的具体实现,又要分成UnaryNode, LeafNode和BinaryNode,简要来说即单目运算符操作,叶子结点,双目运算符操作。每个子类的具体实现可以自行参考源码。

阶段4: 触发RDD执行

RDD被触发真正执行的过程在看了前面几篇文章之后想来难不住你来,所有的所有都在这一行代码。

teenagers.map(p => "name:"+p(0)).foreach(println)

如果真的不明白,建议回头再读一下Spark Job的执行过程分析。

总结

行为至此,可以收笔了。应该说SQL部分的代码涉及到的知识点还是比较多的,最重要的是理清两点,即SQL语句的通用处理过程。另一个是Spark SQL子系统中具体实现机制。

Spark Sql子模块的具体实现紧紧围绕LogicalPlan Tree展开,一是用sqlparser来生成logicalplan,二是用RuleExecutor将各种Rule作用于LogicalPlan。最后生成普通的RDD将会给Spark core处理。

参考资料

  1. Spark Catalyst 源码分析
  2. Query Optimizer Deep Diver
  3. playing with Scala Parser Combinator
  4. Parsing Text With Scala
  5. Parser Api
时间: 2024-10-01 01:46:49

Apache Spark源码走读(七)Standalone部署方式分析&sql的解析与执行的相关文章

Apache Spark源码走读

http://www.aliyun.com/zixun/aggregation/13383.html">Spark是发源于美国加州大学伯克利分校AMPLab的集群计算平台,它立足于内存计算,性能超过Hadoop百倍,即使使用磁盘,迭代类型的计算也会有10倍速度的提升.Spark从多迭代批量处理出发,兼收并蓄数据仓库.流处理和图计算等多种计算范式,是罕见的全能选手.Spark当下已成为Apache基金会的顶级开源项目,拥有着庞大的社区支持--活跃开发者人数已超过Hadoop MapReduc

Apache Spark源码走读(五)部署模式下的容错性分析 &amp;standalone cluster模式下资源的申请与释放

<一>部署模式下的容错性分析 概要 本文就standalone部署方式下的容错性问题做比较细致的分析,主要回答standalone部署方式下的包含哪些主要节点,当某一类节点出现问题时,系统是如何处理的. Standalone部署的节点组成 介绍Spark的资料中对于RDD这个概念涉及的比较多,但对于RDD如何运行起来,如何对应到进程和线程的,着墨的不是很多. 在实际的生产环境中,Spark总是会以集群的方式进行运行的,其中standalone的部署方式是所有集群方式中最为精简的一种,另外是Me

Apache Spark源码走读(三)Spark on Yarn &amp;Spark源码编译 &amp;在YARN上运行SparkPi

<一>Spark on Yarn 概要 Hadoop2中的Yarn是一个分布式计算资源的管理平台,由于其有极好的模型抽象,非常有可能成为分布式计算资源管理的事实标准.其主要职责将是分布式计算集群的管理,集群中计算资源的管理与分配. Yarn为应用程序开发提供了比较好的实现标准,Spark支持Yarn部署,本文将就Spark如何实现在Yarn平台上的部署作比较详尽的分析. Spark Standalone部署模式回顾 上图是Spark Standalone Cluster中计算模块的简要示意,从

Apache Spark源码走读(十一)浅谈mllib中线性回归的算法实现&amp;Spark MLLib中拟牛顿法L-BFGS的源码实现

<一>浅谈mllib中线性回归的算法实现 概要 本文简要描述线性回归算法在Spark MLLib中的具体实现,涉及线性回归算法本身及线性回归并行处理的理论基础,然后对代码实现部分进行走读. 线性回归模型 机器学习算法是的主要目的是找到最能够对数据做出合理解释的模型,这个模型是假设函数,一步步的推导基本遵循这样的思路 假设函数 为了找到最好的假设函数,需要找到合理的评估标准,一般来说使用损失函数来做为评估标准 根据损失函数推出目标函数 现在问题转换成为如何找到目标函数的最优解,也就是目标函数的最

Apache Spark源码走读(四)Hive on Spark运行环境搭建 &amp;hiveql on spark实现详解

<一>Hive on Spark运行环境搭建 楔子 Hive是基于Hadoop的开源数据仓库工具,提供了类似于SQL的HiveQL语言,使得上层的数据分析人员不用知道太多MapReduce的知识就能对存储于Hdfs中的海量数据进行分析.由于这一特性而收到广泛的欢迎. Hive的整体框架中有一个重要的模块是执行模块,这一部分是用Hadoop中MapReduce计算框架来实现,因而在处理速度上不是非常令人满意.由于Spark出色的处理速度,有人已经成功将HiveQL的执行利用Spark来运行,这就

Apache Spark源码走读(八)Graphx实现剖析&amp;spark repl实现详解

<一>Graphx实现剖析 概要 图的并行化处理一直是一个非常热门的话题,这里头的重点有两个,一是如何将图的算法并行化,二是找到一个合适的并行化处理框架.Spark作为一个非常优秀的并行处理框架,将一些并行化的算法移到其上面就成了一个很自然的事情. Graphx是一些图的常用算法在Spark上的并行化实现,同时提供了丰富的API接口.本文就Graphx的代码架构及pagerank在graphx中的具体实现做一个初步的学习. Google为什么赢得了搜索引擎大战 当Google还在起步的时候,在

Apache Spark源码走读(六)Task运行期之函数调用关系分析 &amp;存储子系统分析

<一>Task运行期之函数调用关系分析 概要 本篇主要阐述在TaskRunner中执行的task其业务逻辑是如何被调用到的,另外试图讲清楚运行着的task其输入的数据从哪获取,处理的结果返回到哪里,如何返回. 准备 spark已经安装完毕 spark运行在local mode或local-cluster mode local-cluster mode local-cluster模式也称为伪分布式,可以使用如下指令运行 MASTER=local[1,2,1024] bin/spark-shell

Apache Spark源码走读(九)如何进行代码跟读&amp;使用Intellij idea调试Spark源码

<一>如何进行代码跟读 概要 今天不谈Spark中什么复杂的技术实现,只稍为聊聊如何进行代码跟读.众所周知,Spark使用scala进行开发,由于scala有众多的语法糖,很多时候代码跟着跟着就觉着线索跟丢掉了,另外Spark基于Akka来进行消息交互,那如何知道谁是接收方呢? new Throwable().printStackTrace 代码跟读的时候,经常会借助于日志,针对日志中输出的每一句,我们都很想知道它们的调用者是谁.但有时苦于对spark系统的了解程度不深,或者对scala认识不

Apache Spark源码走读(一)Spark论文阅读笔记&amp;Job提交与运行

<一>Spark论文阅读笔记 楔子 源码阅读是一件非常容易的事,也是一件非常难的事.容易的是代码就在那里,一打开就可以看到.难的是要通过代码明白作者当初为什么要这样设计,设计之初要解决的主要问题是什么. 在对Spark的源码进行具体的走读之前,如果想要快速对Spark的有一个整体性的认识,阅读Matei Zaharia做的Spark论文是一个非常不错的选择. 在阅读该论文的基础之上,再结合Spark作者在2012 Developer Meetup上做的演讲Introduction to Spa