Spark-SparkSQL深入学习系列二(转自OopsOutOfMemory)

   /** Spark SQL源码分析系列文章*/

    Spark SQL的核心执行流程我们已经分析完毕,可以参见Spark
SQL核心执行流程
,下面我们来分析执行流程中各个核心组件的工作职责。

    本文先从入口开始分析,即如何解析SQL文本生成逻辑计划的,主要设计的核心组件式SqlParser是一个SQL语言的解析器,用scala实现的Parser将解析的结果封装为Catalyst TreeNode ,关于Catalyst这个框架后续文章会介绍。

一、SQL Parser入口

    Sql Parser 其实是封装了scala.util.parsing.combinator下的诸多Parser,并结合Parser下的一些解析方法,构成了Catalyst的组件UnResolved Logical Plan。

    先来看流程图:

     

     一段SQL会经过SQL Parser解析生成UnResolved Logical Plan(包含UnresolvedRelation、 UnresolvedFunction、 UnresolvedAttribute)。

    在源代码里是:  

[java] view
plain
 copy

  1. def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))//sql("select name,value from temp_shengli") 实例化一个SchemaRDD  
  2.   
  3. protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql) //实例化SqlParser  
  4.   
  5. class SqlParser extends StandardTokenParsers with PackratParsers {  
  6.   
  7.   def apply(input: String): LogicalPlan = {  //传入sql语句调用apply方法,input参数即sql语句  
  8.     // Special-case out set commands since the value fields can be  
  9.     // complex to handle without RegexParsers. Also this approach  
  10.     // is clearer for the several possible cases of set commands.  
  11.     if (input.trim.toLowerCase.startsWith("set")) {  
  12.       input.trim.drop(3).split("=", 2).map(_.trim) match {  
  13.         case Array("") => // "set"  
  14.           SetCommand(None, None)  
  15.         case Array(key) => // "set key"  
  16.           SetCommand(Some(key), None)  
  17.         case Array(key, value) => // "set key=value"  
  18.           SetCommand(Some(key), Some(value))  
  19.       }  
  20.     } else {  
  21.       phrase(query)(new lexical.Scanner(input)) match {  
  22.         case Success(r, x) => r  
  23.         case x => sys.error(x.toString)  
  24.       }  
  25.     }  
  26.   }  

    1.  当我们调用sql("select name,value from temp_shengli")时,实际上是new了一个SchemaRDD

    2. new SchemaRDD时,构造方法调用parseSql方法,parseSql方法实例化了一个SqlParser,这个Parser初始化调用其apply方法。

    3. apply方法分支:

         3.1 如果sql命令是set开头的就调用SetCommand,这个类似Hive里的参数设定,SetCommand其实是一个Catalyst里TreeNode之LeafNode,也是继承自LogicalPlan,关于Catalyst的TreeNode库这个暂不详细介绍,后面会有文章来详细讲解。

         3.2 关键是else语句块里,才是SqlParser解析SQL的核心代码:

[java] view
plain
 copy

  1. phrase(query)(new lexical.Scanner(input)) match {  
  2.        case Success(r, x) => r  
  3.        case x => sys.error(x.toString)  
  4.      }  

        可能 phrase方法大家很陌生,不知道是干什么的,那么我们首先看一下SqlParser的类图:

      

      SqlParser类继承了scala内置集合Parsers,这个Parsers。我们可以看到SqlParser现在是具有了分词的功能,也能解析combiner的语句(类似p ~> q,后面会介绍)。

     Phrase方法:

[java] view
plain
 copy

  1. /** A parser generator delimiting whole phrases (i.e. programs). 
  2.  * 
  3.  *  `phrase(p)` succeeds if `p` succeeds and no input is left over after `p`. 
  4.  * 
  5.  *  @param p the parser that must consume all input for the resulting parser 
  6.  *           to succeed. 
  7.  *  @return  a parser that has the same result as `p`, but that only succeeds 
  8.  *           if `p` consumed all the input. 
  9.  */  
  10. def phrase[T](p: Parser[T]) = new Parser[T] {  
  11.   def apply(in: Input) = lastNoSuccessVar.withValue(None) {  
  12.     p(in) match {  
  13.     case s @ Success(out, in1) =>  
  14.       if (in1.atEnd)  
  15.         s  
  16.       else  
  17.           lastNoSuccessVar.value filterNot { _.next.pos < in1.pos } getOrElse Failure("end of input expected", in1)  
  18.       case ns => lastNoSuccessVar.value.getOrElse(ns)  
  19.     }  
  20.   }  
  21. }  

     Phrase是一个循环读取输入字符的方法,如果输入in没有到达最后一个字符,就继续对parser进行解析,直到最后一个输入字符。

     我们注意到Success这个类,出现在Parser里, 在else块里最终返回的也有Success:

[java] view
plain
 copy

  1. /** The success case of `ParseResult`: contains the result and the remaining input. 
  2.   * 
  3.   *  @param result The parser's output 
  4.   *  @param next   The parser's remaining input 
  5.   */  
  6.  case class Success[+T](result: T, override val next: Input) extends ParseResult[T] {  

    通过源码可知,Success封装了当前解析器的解析结果result, 和还没有解析的语句。

   所以上面判断了Success的解析结果中in1.atEnd? 如果输入流结束了,就返回s,即Success对象,这个Success包含了SqlParser解析的输出。

二、Sql Parser核心

在SqlParser里phrase接受2个参数:

第一个是query,一种带模式的解析规则,返回的是LogicalPlan。

第二个是lexical词汇扫描输入。

SqlParser parse的流程是,用lexical词汇扫描接受SQL关键字,使用query模式来解析符合规则的SQL。

2.1 lexical keyword

在SqlParser里定义了KeyWord这个类:

[java] view
plain
 copy

  1. protected case class Keyword(str: String)  

在我使用的spark1.0.0版本里目前只支持了一下SQL保留字:

[java] view
plain
 copy

  1. protected val ALL = Keyword("ALL")  
  2.  protected val AND = Keyword("AND")  
  3.  protected val AS = Keyword("AS")  
  4.  protected val ASC = Keyword("ASC")  
  5.  protected val APPROXIMATE = Keyword("APPROXIMATE")  
  6.  protected val AVG = Keyword("AVG")  
  7.  protected val BY = Keyword("BY")  
  8.  protected val CACHE = Keyword("CACHE")  
  9.  protected val CAST = Keyword("CAST")  
  10.  protected val COUNT = Keyword("COUNT")  
  11.  protected val DESC = Keyword("DESC")  
  12.  protected val DISTINCT = Keyword("DISTINCT")  
  13.  protected val FALSE = Keyword("FALSE")  
  14.  protected val FIRST = Keyword("FIRST")  
  15.  protected val FROM = Keyword("FROM")  
  16.  protected val FULL = Keyword("FULL")  
  17.  protected val GROUP = Keyword("GROUP")  
  18.  protected val HAVING = Keyword("HAVING")  
  19.  protected val IF = Keyword("IF")  
  20.  protected val IN = Keyword("IN")  
  21.  protected val INNER = Keyword("INNER")  
  22.  protected val INSERT = Keyword("INSERT")  
  23.  protected val INTO = Keyword("INTO")  
  24.  protected val IS = Keyword("IS")  
  25.  protected val JOIN = Keyword("JOIN")  
  26.  protected val LEFT = Keyword("LEFT")  
  27.  protected val LIMIT = Keyword("LIMIT")  
  28.  protected val MAX = Keyword("MAX")  
  29.  protected val MIN = Keyword("MIN")  
  30.  protected val NOT = Keyword("NOT")  
  31.  protected val NULL = Keyword("NULL")  
  32.  protected val ON = Keyword("ON")  
  33.  protected val OR = Keyword("OR")  
  34.  protected val OVERWRITE = Keyword("OVERWRITE")  
  35.  protected val LIKE = Keyword("LIKE")  
  36.  protected val RLIKE = Keyword("RLIKE")  
  37.  protected val UPPER = Keyword("UPPER")  
  38.  protected val LOWER = Keyword("LOWER")  
  39.  protected val REGEXP = Keyword("REGEXP")  
  40.  protected val ORDER = Keyword("ORDER")  
  41.  protected val OUTER = Keyword("OUTER")  
  42.  protected val RIGHT = Keyword("RIGHT")  
  43.  protected val SELECT = Keyword("SELECT")  
  44.  protected val SEMI = Keyword("SEMI")  
  45.  protected val STRING = Keyword("STRING")  
  46.  protected val SUM = Keyword("SUM")  
  47.  protected val TABLE = Keyword("TABLE")  
  48.  protected val TRUE = Keyword("TRUE")  
  49.  protected val UNCACHE = Keyword("UNCACHE")  
  50.  protected val UNION = Keyword("UNION")  
  51.  protected val WHERE = Keyword("WHERE")  

这里根据这些保留字,反射,生成了一个SqlLexical

[java] view
plain
 copy

  1. override val lexical = new SqlLexical(reservedWords)  

SqlLexical利用它的Scanner这个Parser来读取输入,传递给query。

2.2 query

query的定义是Parser[LogicalPlan]  和 一堆奇怪的连接符(其实都是Parser的方法啦,看上图),*,~,^^^,看起来很让人费解。通过查阅读源码,以下列出几个常用的:

|  is the alternation combinator. It says “succeed if either the left or right operand parse successfully” 
左边算子和右边的算子只要有一个成功了,就返回succeed,类似or

~ is the sequential combinator. It says “succeed if the left operand parses successfully, and then the right parses successfully on the remaining input”
左边的算子成功后,右边的算子对后续的输入也计算成功,就返回succeed

opt  `opt(p)` is a parser that returns `Some(x)` if `p` returns `x` and `None` if `p` fails.
如果p算子成功则返回则返回Some(x) 如果p算子失败,返回fails

^^^ `p ^^^ v` succeeds if `p` succeeds; discards its result, and returns `v` instead.
如果左边的算子成功,取消左边算子的结果,返回右边算子。

~> says “succeed if the left operand parses successfully followed by the right, but do not include the left content in the result”
如果左边的算子和右边的算子都成功了,返回的结果中不包含左边的返回值。
  protected lazy val limit: Parser[Expression] =
    LIMIT ~> expression

<~ is the reverse, “succeed if the left operand is parsed successfully followed by the right, but do not include the right content in the result”
这个和~>操作符的意思相反,如果左边的算子和右边的算子都成功了,返回的结果中不包含右边的
    termExpression <~ IS ~ NOT ~ NULL ^^ { case e => IsNotNull(e) } |

^^{} 或者 ^^=> is the transformation combinator. It says “if the left operand parses successfully, transform the result using the function on the right”
rep => simply says “expect N-many repetitions of parser X” where X is the parser passed as an argument to rep
变形连接符,意思是如果左边的算子成功了,用^^右边的算子函数作用于返回的结果

接下来看query的定义:

[java] view
plain
 copy

  1. protected lazy val query: Parser[LogicalPlan] = (  
  2.    select * (  
  3.        UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |  
  4.        UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }  
  5.      )  
  6.    | insert | cache  
  7.  )  

没错,返回的是一个Parser,里面的类型是LogicalPlan。

query的定义其实是一种模式,用到了上述的诸多操作符,如|, ^^, ~> 等等

给定一种sql模式,如select,select xxx from yyy where ccc =ddd  如果匹配这种写法,则返回Success,否则返回Failure.

这里的模式是select 模式后面可以接union all 或者 union distinct。

即如下书写式合法的,否则出错。  

[java] view
plain
 copy

  1. select a,b from c   
  2. union all  
  3. select e,f from g  

这个 *号是一个repeat符号,即可以支持多个union all 子句。

看来目前spark1.0.0只支持这三种模式,即select, insert, cache。

那到底是怎么生成LogicalPlan的呢? 我们再看一个详细的:

[java] view
plain
 copy

  1. protected lazy val select: Parser[LogicalPlan] =  
  2.     SELECT ~> opt(DISTINCT) ~ projections ~  
  3.     opt(from) ~ opt(filter) ~  
  4.     opt(grouping) ~  
  5.     opt(having) ~  
  6.     opt(orderBy) ~  
  7.     opt(limit) <~ opt(";") ^^ {  
  8.       case d ~ p ~ r ~ f ~ g ~ h ~ o ~ l  =>  
  9.         val base = r.getOrElse(NoRelation)  
  10.         val withFilter = f.map(f => Filter(f, base)).getOrElse(base)  
  11.         val withProjection =  
  12.           g.map {g =>  
  13.             Aggregate(assignAliases(g), assignAliases(p), withFilter)  
  14.           }.getOrElse(Project(assignAliases(p), withFilter))  
  15.         val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection)  
  16.         val withHaving = h.map(h => Filter(h, withDistinct)).getOrElse(withDistinct)  
  17.         val withOrder = o.map(o => Sort(o, withHaving)).getOrElse(withHaving)  
  18.         val withLimit = l.map { l => Limit(l, withOrder) }.getOrElse(withOrder)  
  19.         withLimit  
  20.   }  

这里我给称它为select模式。

看这个select语句支持什么模式的写法:

select  distinct  projections from filter grouping having orderBy limit. 

给出一个符合的该select 模式的sql, 注意到 带opt连接符的是可选的,可以写distinct也可以不写。

[java] view
plain
 copy

  1. select  game_id, user_name from game_log where date<='2014-07-19' and user_name='shengli' group by game_id having game_id > 1 orderBy game_id limit 50.  

projections是什么呢?

其实是一个表达式,是一个Seq类型,一连串的表达式可以使 game_id也可以是 game_id AS gmid 。

返回的确实是一个Expression,是Catalyst里TreeNode。

[java] view
plain
 copy

  1. protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",")  
  2.   
  3.   protected lazy val projection: Parser[Expression] =  
  4.     expression ~ (opt(AS) ~> opt(ident)) ^^ {  
  5.       case e ~ None => e  
  6.       case e ~ Some(a) => Alias(e, a)()  
  7.     }  

模式里from是什么的?

其实是一个relations,就是一个关系,在SQL里可以是表,表join表

[java] view
plain
 copy

  1. protected lazy val from: Parser[LogicalPlan] = FROM ~> relations  

[java] view
plain
 copy

  1. protected lazy val relation: Parser[LogicalPlan] =  
  2.   joinedRelation |  
  3.   relationFactor  
  4.   
  5. protected lazy val relationFactor: Parser[LogicalPlan] =  
  6.   ident ~ (opt(AS) ~> opt(ident)) ^^ {  
  7.     case tableName ~ alias => UnresolvedRelation(None, tableName, alias)  
  8.   } |  
  9.   "(" ~> query ~ ")" ~ opt(AS) ~ ident ^^ { case s ~ _ ~ _ ~ a => Subquery(a, s) }  
  10.   
  11.  protected lazy val joinedRelation: Parser[LogicalPlan] =  
  12.    relationFactor ~ opt(joinType) ~ JOIN ~ relationFactor ~ opt(joinConditions) ^^ {  
  13.     case r1 ~ jt ~ _ ~ r2 ~ cond =>  
  14.       Join(r1, r2, joinType = jt.getOrElse(Inner), cond)  
  15.    }  

这里看出来,其实就是table之间的操作,但是返回的Subquery确实是一个LogicalPlan

[java] view
plain
 copy

  1. case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {  
  2.   override def output = child.output.map(_.withQualifiers(alias :: Nil))  
  3.   override def references = Set.empty  
  4. }  

scala里的语法糖很多,这样写的确比较方便,但是对初学者可能有点晦涩了。

至此我们知道,SqlParser是怎么生成LogicalPlan的了。

三、总结

    本文从源代码剖析了Spark Catalyst 是如何将Sql解析成Unresolved逻辑计划(包含UnresolvedRelation、 UnresolvedFunction、 UnresolvedAttribute)的。

    sql文本作为输入,实例化了SqlParser,SqlParser的apply方法被调用,分别处理2种输入,一种是命令参数,一种是sql。对应命令参数的会生成一个叶子节点,SetCommand,对于sql语句,会调用Parser的phrase方法,由lexical的Scanner来扫描输入,分词,最后由query这个由我们定义好的sql模式利用parser的连接符来验证是否符合sql标准,如果符合则随即生成LogicalPlan语法树,不符合则会提示解析失败。

    通过对spark catalyst sql parser的解析,使我理解了,sql语言的语法标准是如何实现的和如何解析sql生成逻辑计划语法树。

——EOF——

原创文章,转载请注明:

转载自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

本文链接地址:http://blog.csdn.net/oopsoom/article/details/37943507

注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

时间: 2024-10-21 17:43:38

Spark-SparkSQL深入学习系列二(转自OopsOutOfMemory)的相关文章

ABP架构学习系列二:ABP中配置的注册和初始化

一.手工搭建平台 1.创建项目 创建MVC5项目,手动引入Abp.Abp.Web.Abp.Web.Mvc.Abp.Web.Api 使用nuget添加Newtonsoft.Json.Castle.Core.Castle.Windsor Install-Package Newtonsoft.Json -Version 8.0.3 Install-Package Castle.Windsor -Version 3.3.0 2.创建WebModule类 在App_Start下创建一个ZmBlogWebM

Android开发系列二之窗口Activity的生命周期_Android

在上篇文章给大家介绍了android开发系列一之用按钮实现显示时间,感兴趣的朋友可以点击阅读详情. 在Activity从创建到销毁的过程中需要在不同的阶段调用7个生命周期的方法这7个生命周期方法定义如下: protected void onCreate(Bundle savedInstanceState) protected void onStart() protected void onResume() protected void onPause() protected void onSto

ABP架构学习系列一 整体项目结构及目录

本系列是基于aspnetboilerplate-0.8.4.0版本写的,其中原因是由于较高的版本太抽象难以理解和分析,对于还菜菜的我要花更多的时间去学习. abp的源码分析学习主要来源于 HK Zhang ,他的博客是https://www.cnblogs.com/1zhk/ 一.什么是ABP ASP.NET Boilerplate(ABP)是现代新的Web应用程序的最佳实践和最流行的工具的起点.它的目标是实体模型.通用应用程序框架和项目模板. ABP是一个建立在最新的ASP.NET的MVC和W

Android开发系列二之窗口Activity的生命周期

在上篇文章给大家介绍了android开发系列一之用按钮实现显示时间,感兴趣的朋友可以点击阅读详情. 在Activity从创建到销毁的过程中需要在不同的阶段调用7个生命周期的方法这7个生命周期方法定义如下: protected void onCreate(Bundle savedInstanceState) protected void onStart() protected void onResume() protected void onPause() protected void onSto

使用腾讯云 GPU 学习深度学习系列之二:Tensorflow 简明原理【转】

转自:https://www.qcloud.com/community/article/598765?fromSource=gwzcw.117333.117333.117333 这是<使用腾讯云 GPU 学习深度学习>系列文章的第二篇,主要介绍了 Tensorflow 的原理,以及如何用最简单的Python代码进行功能实现.本系列文章主要介绍如何使用 腾讯云GPU服务器 进行深度学习运算,前面主要介绍原理部分,后期则以实践为主. 往期内容: 使用腾讯云 GPU 学习深度学习系列之一:传统机器学

ExtJs2.0学习系列(6)--Ext.FormPanel之第三式(ComboBox篇)

前言:说句实话,此extjs系列的文章在博客园中的热度不高,可能是学这玩意的人不多吧,但是我觉得有这么个系列的文章对于中国朋友非常有帮助!请大家支持! 上篇ExtJs2.0学习系列(5)--Ext.FormPanel之第二式中我们讨论了下fieldset和表单验证的知识,今天我们接着深入解析表单元素中ComboBox组件的使用.会涉及 到.net简单服务器数据交互,但暂不做深入讨论,以后会详细分析服务器交互相关,不过可能要等较长一段时间,呵呵! 5.服务器数据作为ComboBox的数据源实例 首

JAVA/JSP学习系列之八(改写MySQL翻页例子)

js|mysql|翻页 一.前言 其实,改写后的JDBC Data-Source是运行在Servlet中的,通过JNDI去查找数据源.我用Orion试的,将本站<JAVA/JSP学习系列之六(MySQL翻页例子) > 简单改写了一下. 二.配置 (1)JDBC 需要将用到的JDBC驱动Copy到[ORION]/lib目录下 (2)data-source 在[ORION]/config/data-sources.xml文件中加入如下: 〈data-source class="com.e

ExtJs2.0学习系列(12)--Ext.TreePanel之第一式

今天开始,我们就开始一起学习TreePanel了,道个歉,上篇的代码很乱阿. 我总是喜欢用最简单的例子开始,去理解最基本的使用方法,减少对i后面高级使用的干扰! TreePanel是继承自Panel,所以很多在Panel中谈到的属性这里可能会一笔带过,如有问题,请参考ExtJs2.0学习系列(2)--Ext.Panel 1.第一个静态树--最简单的树 效果图: html代码: <div id="container"> </div> js代码: Ext.onRea

WorldWind系列二:擒贼先擒王篇1

有了WorldWind系列一的基础,我们已经可以进行正常调试运行啦!可以先操作看看软件的功能吧,这样我们才可以知道WorldWind有哪些功能等待我们学习的. 开始我们的"WorldWind系列二:擒贼先擒王"分析WorldWind主窗体,从Main函数入口一步步解析学习.至少对于我来说,里面有很多知识要学的.(补充一下:无法进入WorldWind.cs窗体的设计界面,这个问题我早就发现了,但没解决,我们根据功能直接看代码吧) 1.使用System.Version在内部,读取软件版本信