Spark SQL CLI 实现分析

背景

本文主要介绍了Spark SQL里目前的CLI实现,代码之后肯定会有不少变动,所以我关注的是比较核心的逻辑。主要是对比了Hive CLI的实现方式,比较Spark SQL在哪块地方做了修改,哪些地方与Hive CLI是保持一致的。可以先看下总结一节里的内容。

Spark SQL的hive-thriftserver项目里是其CLI实现代码,下面先说明Hive CLI的主要实现类和关系,再说明Spark SQL CLI的做法。

Hive CLI

核心启动类是org.apache.hive.service.server.HiveServer2,启动方式:

    try {
      ServerOptionsProcessor oproc = new ServerOptionsProcessor("hiveserver2");
      if (!oproc.process(args)) {
        LOG.fatal("Error starting HiveServer2 with given arguments");
        System.exit(-1);
      }
      HiveConf hiveConf = new HiveConf();
      HiveServer2 server = new HiveServer2();
      server.init(hiveConf);
      server.start();
    } catch (Throwable t) {
      LOG.fatal("Error starting HiveServer2", t);
      System.exit(-1);
    }

HiveServer2继承CompositeService类,CompositeService类内部维护一个serviceList,能够加入、删除、启动、停止不同的服务。HiveServer2在init(hiveConf)的时候,会加入CLIService和ThriftCLIService两个Service。根据传输模式,如果是http或https的话,就使用ThriftHttpCLIService,否则使用ThriftBinaryCLIService。无论是哪个ThriftCLIService,都传入了CLIService的引用,thrift只是一个封装。

加入了这些服务后,把服务都启动起来。

CLIService也继承自CompositeService,CLIService 在init的时候会加入SessionManager服务,并且根据hiveConf,从 hadoop shims里得到UGI里的serverUsername。

SessionManager管理hive连接的开启、关闭等管理功能,已有的连接会维护在一个HashMap里,value为HiveSession类,里面大致是用户名、密码、hive配置等info。

所以CLIService里几乎所有的事情都是委托给SessionManager做的。

 

SessionManager内主要是OperationManager这个服务,是最重要的和执行逻辑有关的类,下面会具体说。

 

另外,关于ThriftCLIService,有两个实现子类,子类只复写了run()方法,设置thrift server相关的网络连接,其他对CLIService的调用逻辑都在父类ThriftCLIService本身里面。

实际上,ThriftCLIService里很多事情也是委托给CLIService做的。

 

那么上面大致是Hive CLI、Thrift server启动的流程,以及几个主要类的相互关系。

Spark SQL CLI

根据上面Hive CLI的逻辑,看看Spark SQL的CLI是怎么做的。

Spark里的HiveThriftServer2(这个类名看起来有点奇怪)继承了Hive的HiveServer2,并且复写了init方法,其初始化的时候加入的是SparkSQLCLIService和ThriftBinaryCLIService两个服务。前者继承了Hive的CLIService,有一些不同的逻辑;后者直接使用的是Hive的类,但传入的是SparkSQLCLIService的引用。

SparkSQLCLIService内部,类似Hive的CLIService,有一个SparkSQLSessionManager,继承自Hive的SessionManager。也有得到serverUsername的逻辑,代码和CLIService是一样的。

 

SparkSQLSessionManager复写了init这个方法,里面有Spark自己的SparkSQLOperationManager服务,继承自Hive的OperationManager类。

 

可能上面这几个类有点看晕了,本质上都是一些封装而已,没什么大的区别。真正重要的是SparkSQLOperationManager这个类里面,定义了如何使用Spark SQL来处理query操作。

SparkSQLOperationManager关键逻辑

Hive的CLI Operation父类有如下的子类继承体系,代表hive cli会处理的不同操作类型:



上半部分ExecuteStatementOperation子类体系是实际和查询相关的操作,下半部分是一些元数据读取操作。SparkSQLOperationManager实际改写的就是ExecuteStatementOperation子类的执行逻辑,而元数据相关的操作还是沿用hive本来的处理逻辑。

 

原本hive的ExecuteStatementOperation处理逻辑是这样的:

  public static ExecuteStatementOperation newExecuteStatementOperation(
      HiveSession parentSession, String statement, Map<String, String> confOverlay, boolean runAsync) {
    String[] tokens = statement.trim().split("\\s+");
    String command = tokens[0].toLowerCase();

    if ("set".equals(command)) {
      return new SetOperation(parentSession, statement, confOverlay);
    } else if ("dfs".equals(command)) {
      return new DfsOperation(parentSession, statement, confOverlay);
    } else if ("add".equals(command)) {
      return new AddResourceOperation(parentSession, statement, confOverlay);
    } else if ("delete".equals(command)) {
      return new DeleteResourceOperation(parentSession, statement, confOverlay);
    } else {
      return new SQLOperation(parentSession, statement, confOverlay, runAsync);
    }
  }

ExecuteStatementOperation也分两部分,HiveCommandOperation和SQLOperation。

不同的ExecuteStatementOperation子类最终由对应的CommandProcessor子类来完成操作请求。

那Spark是如何改写ExecuteStatementOperation的执行逻辑的呢?

最核心的逻辑如下:

      def run(): Unit = {
        logInfo(s"Running query '$statement'")
        setState(OperationState.RUNNING)
        try {
          result = hiveContext.sql(statement)
          logDebug(result.queryExecution.toString())
          val groupId = round(random * 1000000).toString
          hiveContext.sparkContext.setJobGroup(groupId, statement)
          iter = result.queryExecution.toRdd.toLocalIterator
          dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
          setHasResultSet(true)
        } catch {
          // Actually do need to catch Throwable as some failures don't inherit from Exception and
          // HiveServer will silently swallow them.
          case e: Throwable =>
            logError("Error executing query:",e)
            throw new HiveSQLException(e.toString)
        }
        setState(OperationState.FINISHED)
      }

statement是一个String,即query本身,调用HiveContext的sql()方法,返回的是一个SchemaRDD。HiveContext的这段逻辑如下:

  override def sql(sqlText: String): SchemaRDD = {
    // TODO: Create a framework for registering parsers instead of just hardcoding if statements.
    if (dialect == "sql") {
      super.sql(sqlText)
    } else if (dialect == "hiveql") {
      new SchemaRDD(this, HiveQl.parseSql(sqlText))
    }  else {
      sys.error(s"Unsupported SQL dialect: $dialect.  Try 'sql' or 'hiveql'")
    }
  }

调完sql()后返回的是一个带被解析过了的基础逻辑计划的SchemaRDD。后续,

logDebug(result.queryExecution.toString())

这一步触发了逻辑执行计划的进一步分析、优化和变成物理执行计划的几个过程。之后,

result.queryExecution.toRdd

toRdd这步是触发计算并返回结果。这几个逻辑在之前Spark SQL源码分析的文章里都提到过。

除了上面这部分,还有一些schema转化、数据类型转化的逻辑,是因为Catalyst这边,有自己的数据行表示方法,也有自己的dataType,而且schema这块呢,在生成SchemaRDD的时候也转化过一次。所以在返回执行结果的时候,需要有转换回Hive的TableSchema、FieldSchema的逻辑。

 

以上说明了Spark SQL是如何把query的执行转换到Spark SQL里的。

总结

基本上Spark SQL在CLI这块的实现很靠近Hive Service项目里的CLI模块,主要类继承体系、执行逻辑差不多都一样。Spark SQL修改的关键逻辑在CLIService内的SessionManager内的OperationManager里,将非元数据查询操作的query丢给了Spark SQL的Hive工程里的HiveContext.sql()来完成,通过返回的SchemaRDD,来进一步得到结果数据、得到中间执行计划的Schema信息。

全文完 :)

时间: 2025-01-21 08:05:12

Spark SQL CLI 实现分析的相关文章

Spark修炼之道(高级篇)——Spark源码阅读:第十二节 Spark SQL 处理流程分析

作者:周志湖 下面的代码演示了通过Case Class进行表Schema定义的例子: // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ // Define the schema usi

Spark SQL组件源码分析

功能 Spark新发布的Spark SQL组件让Spark对SQL有了别样于Shark基于Hive的支持.参考官方手册,具体分三部分: 其一,能在Scala代码里写SQL,支持简单的SQL语法检查,能把RDD指定为Table存储起来.此外支持部分SQL语法的DSL. 其二,支持Parquet文件的读写,且保留Schema. 其三,能在Scala代码里访问Hive元数据,能执行Hive语句,并且把结果取回作为RDD使用. 第一点对SQL的支持主要依赖了Catalyst这个新的查询优化框架(下面会给

《Spark 官方文档》Spark SQL, DataFrames 以及 Datasets 编程指南(四)

使用Spark SQL命令行工具 Spark SQL CLI是一个很方便的工具,它可以用local mode运行hive metastore service,并且在命令行中执行输入的查询.注意Spark SQL CLI目前还不支持和Thrift JDBC server通信. 用如下命令,在spark目录下启动一个Spark SQL CLI ./bin/spark-sql Hive配置在conf目录下hive-site.xml,core-site.xml,hdfs-site.xml中设置.你可以用

Spark SQL Columnar模块源码分析

概述 本文介绍Spark SQL增加的Columnar模块代码实现. 首先介绍Columnar内的代码结构和实现,然后介绍在SqlContext里的使用方式. Columnar InMemoryColumnarTableScan 实现 InMemoryColumnarTableScan类是SparkPlan LeafNode的实现,即是一个物理执行计划. private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attri

整理对Spark SQL的理解

Catalyst Catalyst是与Spark解耦的一个独立库,是一个impl-free的执行计划的生成和优化框架. 目前与Spark Core还是耦合的,对此user邮件组里有人对此提出疑问,见mail.   以下是Catalyst较早时候的架构图,展示的是代码结构和处理流程. Catalyst定位 其他系统如果想基于Spark做一些类sql.标准sql甚至其他查询语言的查询,需要基于Catalyst提供的解析器.执行计划树结构.逻辑执行计划的处理规则体系等类体系来实现执行计划的解析.生成.

关于CarbonData+Spark SQL的一些应用实践和调优经验分享

大数据时代,中大型企业数据的爆发式增长,几乎每天都能产生约 100GB 到 10TB 的数据.而企业数据分系统构建与扩张,导致不同应用场景下大数据冗余严重.行业亟需一个高效.统一的融合数仓,从海量数据中快速获取有效信息,从而洞察机遇.规避风险. 在这样的现状下,CarbonData 诞生了,作为首个由中国贡献给Apache社区的顶级开源项目,CarbonData 提供了一种新的融合数据存储方案,以一份数据同时支持多种大数据应用场景,并通过丰富的索引技术.字典编码.列存等特性提升了 IO 扫描和计

Spark SQL中的DataFrame

在2014年7月1日的 Spark Summit 上,Databricks 宣布终止对 Shark 的开发,将重点放到 Spark SQL 上.在会议上,Databricks 表示,Shark 更多是对 Hive 的改造,替换了 Hive 的物理执行引擎,因此会有一个很快的速度.然而,不容忽视的是,Shark 继承了大量的 Hive 代码,因此给优化和维护带来了大量的麻烦.随着性能优化和先进分析整合的进一步加深,基于 MapReduce 设计的部分无疑成为了整个项目的瓶颈. 详细内容请参看 Sh

Spark Tungsten-sort Based Shuffle 分析

前言 看这篇文章前,建议你先简单看看Spark Sort Based Shuffle内存分析. Tungsten 中文是钨丝的意思. Tungsten Project 是 Databricks 公司提出的对Spark优化内存和CPU使用的计划,该计划初期似乎对Spark SQL优化的最多.不过部分RDD API 还有Shuffle也因此受益. 简述 Tungsten-sort优化点主要在三个方面: 直接在serialized binary data上sort而不是java objects,减少了

Spark Catalyst 源码分析

Architecture Ø 把输入的SQL,parse成unresolved logical plan,这一步参考SqlParser的实现 Ø 把unresolved logical plan转化成resolved logical plan,这一步参考analysis的实现 Ø 把resolved logical plan转化成optimized logical plan,这一步参考optimize的实现 Ø 把optimized logical plan转化成physical plan,这一