Spark源码分析之Spark-submit和Spark-class

有了前面spark-shell的经验,看这两个脚本就容易多啦。前面总结的Spark-shell的分析可以参考:

Spark-submit

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

跟Spark-shell一样,先检查是否设置了${SPARK_HOME},然后启动spark-class,并传递了org.apache.spark.deploy.SparkSubmit作为第一个参数,然后把前面Spark-shell的参数都传给spark-class

Spark-class

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

. "${SPARK_HOME}"/bin/load-spark-env.sh

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Find assembly jar
SPARK_ASSEMBLY_JAR=
if [ -f "${SPARK_HOME}/RELEASE" ]; then
  ASSEMBLY_DIR="${SPARK_HOME}/lib"
else
  ASSEMBLY_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION"
fi

GREP_OPTIONS=
num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" -a "$SPARK_PREPEND_CLASSES" != "1" ]; then
  echo "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2
  echo "You need to build Spark before running this program." 1>&2
  exit 1
fi
if [ -d "$ASSEMBLY_DIR" ]; then
  ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
  if [ "$num_jars" -gt "1" ]; then
    echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
    echo "$ASSEMBLY_JARS" 1>&2
    echo "Please remove all but one jar." 1>&2
    exit 1
  fi
fi

SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}"

LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
exec "${CMD[@]}"

这个类是真正的执行者,我们好好看看这个真正的入口在哪里?

首先,依然是设置项目主目录:

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

然后,配置一些环境变量:

. "${SPARK_HOME}"/bin/load-spark-env.sh

在spark-env中设置了assembly相关的信息。

然后寻找java,并赋值给RUNNER变量

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

中间是一大坨跟assembly相关的内容。

最关键的就是下面这句了:

CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
exec "${CMD[@]}"

首先循环读取ARG参数,加入到CMD中。然后执行了"$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@这个是真正执行的第一个spark的类。

该类在launcher模块下,简单的浏览下代码:

public static void main(String[] argsArray) throws Exception {
   ...
    List<String> args = new ArrayList<String>(Arrays.asList(argsArray));
    String className = args.remove(0);
    ...
    //创建命令解析器
    AbstractCommandBuilder builder;
    if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
      try {
        builder = new SparkSubmitCommandBuilder(args);
      } catch (IllegalArgumentException e) {
        ...
      }
    } else {
      builder = new SparkClassCommandBuilder(className, args);
    }

    List<String> cmd = builder.buildCommand(env);//解析器解析参数
    ...
    //返回有效的参数
    if (isWindows()) {
      System.out.println(prepareWindowsCommand(cmd, env));
    } else {
      List<String> bashCmd = prepareBashCommand(cmd, env);
      for (String c : bashCmd) {
        System.out.print(c);
        System.out.print('\0');
      }
    }
  }

launcher.Main返回的数据存储到CMD中。

然后执行命令:

exec "${CMD[@]}"

这里开始真正执行某个Spark的类。

最后来说说这个exec命令,想要理解它跟着其他几个命令一起学习:

  • source命令,在执行脚本的时候,会在当前的shell中直接把source执行的脚本给挪到自己的shell中执行。换句话说,就是把目标脚本的任务拿过来自己执行。
  • exec命令,是创建一个新的进程,只不过这个进程与前一个进程的ID是一样的。这样,原来的脚本剩余的部分就不能执行了,因为相当于换了一个进程。另外,创建新进程并不是说把所有的东西都直接复制,而是采用写时复制,即在新进程使用到某些内容时,才拷贝这些内容
  • sh命令则是开启一个新的shell执行,相当于创建一个新进程

举个简单的例子,下面有三个脚本:
xingoo-test-1.sh

exec -c sh /home/xinghl/test/xingoo-test-2.sh

xingoo-test-2.sh

while true
do
        echo "a2"
        sleep 3
done

xingoo-test-3.sh

sh /home/xinghl/test/xingoo-test-2.sh

xingoo-test-4.sh

source /home/xinghl/test/xingoo-test-2.sh

在执行xingoo-test-1.sh和xingoo-test-4.sh的效果是一样的,都只有一个进程。
在执行xingoo-test-3.sh的时候会出现两个进程。

参考

linux里source、sh、bash、./有什么区别

本文转自博客园xingoo的博客,原文链接:Spark源码分析之Spark-submit和Spark-class,如需转载请自行联系原博主。

时间: 2024-09-27 19:48:18

Spark源码分析之Spark-submit和Spark-class的相关文章

Spark源码分析之二:Job的调度模型与运行反馈

        在<Spark源码分析之Job提交运行总流程概述>一文中,我们提到了,Job提交与运行的第一阶段Stage划分与提交,可以分为三个阶段:         1.Job的调度模型与运行反馈:         2.Stage划分:         3.Stage提交:对应TaskSet的生成.         今天,我们就结合源码来分析下第一个小阶段:Job的调度模型与运行反馈.         首先由DAGScheduler负责将Job提交到事件队列eventProcessLoop

Spark源码分析之五:Task调度(一)

        在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段:         1.Job的调度模型与运行反馈:         2.Stage划分:         3.Stage提交:对应TaskSet的生成.         Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分

Spark源码分析 -- TaskScheduler

Spark在设计上将DAGScheduler和TaskScheduler完全解耦合, 所以在资源管理和task调度上可以有更多的方案 现在支持, LocalSheduler, ClusterScheduler, MesosScheduler, YarnClusterScheduler 先分析ClusterScheduler, 即standalone的Spark集群上, 因为比较单纯不涉及其他的系统, 看看Spark的任务是如何被执行的   private var taskScheduler: T

Spark源码分析 – SchedulerBackend

SchedulerBackend, 两个任务, 申请资源和task执行和管理 对于SparkDeploySchedulerBackend, 基于actor模式, 主要就是启动和管理两个actor  Deploy.Client Actor, 负责资源申请, 在SparkDeploySchedulerBackend初始化的时候就会被创建, 然后Client会去到Master上注册, 最终完成在Worker上的ExecutorBackend的创建(参考, Spark源码分析 – Deploy), 并且

Spark源码分析:多种部署方式之间的区别与联系(1)

<http://www.aliyun.com/zixun/aggregation/13383.html">Spark源码分析:多种部署方式之间的区别与联系(1)> <Spark源码分析:多种部署方式之间的区别与联系(2)> 从官方的文档我们可以知道,Spark的部署方式有很多种:local.Standalone.Mesos.YARN-..不同部署方式的后台处理进程是不一样的,但是如果我们从代码的角度来看,其实流程都差不多. 从代码中,我们可以得知其实Spark的部署

Spark源码分析之六:Task调度(二)

        话说在<Spark源码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这个方法针对接收到的ReviveOffers事件进行处理.代码如下: // Make fake resource offers on all executors     // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的)     private def makeOffers() {  

Spark源码分析之四:Stage提交

        各位看官,上一篇<Spark源码分析之Stage划分>详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交.         Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示:         与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那

Spark源码分析 – Shuffle

参考详细探究Spark的shuffle实现, 写的很清楚, 当前设计的来龙去脉   Hadoop Hadoop的思路是, 在mapper端每次当memory buffer中的数据快满的时候, 先将memory中的数据, 按partition进行划分, 然后各自存成小文件, 这样当buffer不断的spill的时候, 就会产生大量的小文件  所以Hadoop后面直到reduce之前做的所有的事情其实就是不断的merge, 基于文件的多路并归排序, 在map端的将相同partition的merge到

Spark源码分析 – BlockManager

参考, Spark源码分析之-Storage模块 对于storage, 为何Spark需要storage模块?为了cache RDD  Spark的特点就是可以将RDD cache在memory或disk中,RDD是由partitions组成的,对应于block  所以storage模块,就是要实现RDD在memory和disk上的persistent功能 首先每个节点都有一个BlockManager, 其中有一个是Driver(master), 其余的都是slave  master负责trac

Spark源码分析 – SparkContext

Spark源码分析之-scheduler模块  这位写的非常好, 让我对Spark的源码分析, 变的轻松了许多  这里自己再梳理一遍 先看一个简单的spark操作, val sc = new SparkContext(--) val textFile = sc.textFile("README.md") textFile.filter(line => line.contains("Spark")).count()   1. SparkContext 这是Spa