作者:一帅
简介
SQL任务是ODPS中使用最频繁的一类作业,大部分用户开始使用ODPS时要做的第一件事情就是学习怎么写ODPS的SQL。ODPS SQL是一种非常灵活的语言,兼容大部分的SQL92规范,也对大规模计算场景做了一些特别的定制。有些用户写出的SQL让人看了之后茅塞顿开的感觉,也有一些神级用户经常写一些1000多行的SQL,让人看的只想撞墙。本文会介绍一下SQL是如何分析解析,并拆解成分布式飞天任务的一些实现原理。
ps.由于一些历史包袱和工程实现的原因,ODPS某些内部实现细节可能与本文提到的不一致
1. 编译
语法分析的作用是将一个输入的‘字符串’变换为一个描述这个字符串的‘结构体’,让计算机可以更容易的理解用户输入的字符串是什么意义。这个阶段包含三个过程,分别是词法分析、语法分析、输出抽象语法树。
1.1词法分析
词法分析器是一个确定有限自动机(DFA),可以按照我们定义好的词法,将输入的字符集转换为‘单词’。如下:
1.2语法分析
在词法分析之后,接下来的过程就是语法分析了,词法分析的结果会作为语法分析的输入,语法分析在词法分析的基础上,来判断用户输入的单词是否符合语法逻辑,*SELECT FOO+100 FROM POKES*就是一个符合语法的句子,而*SELECT FOO+100 FROM*,是个不合法的语句,因为在FROM之后,一定要跟着一个表名。此时语法分析器会报错:
1.3抽象语法树
抽象语法树
(AST)的英文全拼是:*abstract syntax tree
*,这是用户输入语句的树形结构的表现形式,树上的每一个节点都是一个单词
,树的结构体现了语法
。抽象语法树是随着语法分析的过程构造的,当语法分析正常结束后,语法分析器就会输出一个抽象语法树,用户的输入和抽象语法树的结构内容是一一对应的,至此,用户输入的‘字符串’完完全全的变成了一个‘结构体’, SELECT FOO+100 FROM POKES转换为抽象语法树后如下所示:
ps.在ODPS中,真实的抽象语法树会复杂许多,为了方便大家理解,我将输出的抽象语法树做了一些简化。
编译的过程在过去曾经是最为复杂繁琐的,涉及到很多编译原理的理论,但是现在,开源的编译器工具已经足够的多,我们可以定义好语法,让编译器工具来帮我们完成这个转换。目前我们使用编译工具:Antlr来完成我们的编译。
2.语义分析
语义分析阶段是SQL解析过程中最为复杂最有难度的一环,涉及到SQL标准,SQL优化,和MapReduce的相关理论和概念。在这里,接着上面环输出的抽象语法树,语意分析后会输出一个查询计划
,这个查询计划
会指导着物理执行算子一步步的运行在我们的分布式系统之上,去读取表的内容,根据SQL的语意做运算,最后输入用户的内容。接下来我们会逐步分解语义分析的过程,揭开庐山真面目
语义分析阶段包含两大块,先逻辑分析
后物理分析
,逻辑分析基本上是纯代数的分析过程,与底层的分布式环境无关,而物理分析则是将逻辑分析后的结果做变换,与底层的执行环境密切相关。如我们使用飞天的分布式环境,物理分析时就需要确定在MapReduce时如何将数据分区、排序、读取数据量的大小、启动多少个进程来执行任务,等等。
2.1逻辑分析
顾名思义,逻辑分析过程就是要分析一下输入的SQL语句到底是干什么的,都有哪些操作。一般来讲,一个SQL语句总有一个输入,一个输出,输入数据经过SQL加工后得到输出数据,
2.1.1语句的执行顺序
SQL语句基本可以分解成下面7大块:
(5)SELECT (6)DISTINCT < select list >
(1)FROM < table source >
(2)WHERE < condition >
(3)GROUP BY < group by list >
(4)HAVING < having condition >
(7) ORDER BY < order by list >
在执行时,按照1-7的标号顺序执行,有些子句是可选的,比如where子句。当没有出现的时候就跳过这步。我们发现,写在最前面的select子句其实并不是最先执行的,这是因为SQL语句设计时为了让用SQL的人更容易与自己的思维相衔接。
2.1.2逻辑算子
根据上述的几个SQL基本操作,我们抽象出了一些逻辑算子(Operator)
,这些算子的功能是单一的不可再拆分的单位。分别是:
这些奇怪的算子是干什么用的呢?说白了,一个逻辑查询计划就是由这些算子组成的一个有向无环图(DAG)
,每一个算子都描述了SQL操作里的不同动作,由算子组成的有向无环图(DAG)描述了数据流的方向.
对于大部分算子而言,都有一个输入数据集,和一个输出数据集。JoinOperator和UnionAllOperator比较特殊,拥有两个或者两个以上的输入数据集,因为这两个算子的操作就是要将多个数据集做关联。我们将算子的输入数据集
和输出数据集
称之为虚表(vtable)
用户是看不到虚表(vtable)的,它只用来做内部分析,是算子和算子之间的桥梁,如下图所示:
2.1.3表达式分析
在SQL里,有很多子句都可以带有表达式,比如
其中SELECT子句中,GROUP BY子句中, WHERE子句中都带有表达式。表达式的解析和计算贯穿着整个SQL解析的过程,所以这里单独讲讲表达式。
1.类型推导
在分析表达式时,会遇到用户输入的常量,我们需要通过类型推导给输入的每一个常量做标记,识别SQL中常量的类型,规则较为简单,如:
2.隐式类型转换
所有的编程语言都会遇到隐式类型转换的问题,即当调用一个函数时,如果输入参数类型不符合函数签名时,就要尝试对输入的参数做隐式类型转换。当然,并不一定每次隐式类型转换都是成功的,如果发现无法无论如何转换都无法满足函数的签名,就会有异常抛出,终止分析过程。
3.布尔表达式分析
布尔表达式的分析主要作用是可以让之后的SQL优化更容易的进行下去,如Join时的条件下推优化,分区裁剪优化,都需要使用布尔表达式分析后的结果来进行。这步分析会用到很多布尔代数的知识,目的只有一个,那就是将用户输入的冗长的布尔表达式变换为最简合取范式
,简而言之,就是将用户输入的一大推’and’ ‘or’组成的布尔表达式变换成由’and’连接的最简形式,如:
看起来这是一个很神奇的变换,实际上已经有很现成的算法来解决这个问题了。总共需要2步:
- 利用Quine McCluskey 算法对输入的布尔表达式生成合取范式(CNF)
- 利用Petrick’s method 算法对第一步生成的CNF计算最简合取范式(Minimal CNF)
4.CASE WHEN表达式的分析
CASE WHEN表达式是一个略显奇葩的表达式,它本身上是一个值函数(ScalarFunction)
,但又有逻辑判断,返回值又不固定,并且还可以嵌套使用,而且在语法上还有两种形式(简单CASE函数和CASE搜索函数) – -! 想在计算机里优雅的记录表达这个CASE WHEN真的很不容易。
condition参数是casewhen子句的条件,returnvalue1代表这THEN后的返回值,returnvalue2代表ELSE后的返回值。
这样,我们就可以很好的在计算机中结构化的表达,如:
2.1.4逻辑查询计划生成
有了以上的基础,我们就可以开始生成我们的查询计划了。严格按照SQL语句的执行顺序来遍历编译阶段生成的AST树,遇到什么操作就生成什么样的算子,遇到表达式就调用之前的表达式分析,真是兵来将挡水来土掩。
举个例子:
需要注意的是,在聚合函数里的值函数、Group by列表中的值函数,需要在聚合操作以前就计算完成,否则无法进行聚合操作,于是乎,出现了一个叫初始投影
的东西,本质上这是一个SelectOperator,只是用来计算一下聚合需要用到的表达式。
题外话,在很久以前,group by 列表中和聚合函数里都是不允许使用表达式的,只能使用单一的值或者列,所以那时也不需要初始投影。用户想使用类似功能时只能通过子查询来实现。后来SQL语法扩展了,支持了group by、聚合函数中调用值函数,于是,在SQL解析时要先判断一下是否需要初始投影
还有很多结构的SQL没有讲到,比如JOIN, UNION ALL, WINDOWN FUNCTION,由于篇幅原因,这里先不提了,感兴趣的同学可以来找我们私下交流。
2.1.5子查询
SQL语法本身就是一个递归的结构,支持在FROM之后写一个子查询,如:
面对这样的语句,我们只要先去生成子查询的逻辑查询计划,将子查询的的结果虚表作为父查询的输入即可,在逻辑上很方便去应对。上面这个示例的查询计划如下图所示:
2.1.6逻辑优化
生成逻辑查询计划后,需要先对查询计划做一次优化,将一些显而易见的点优化掉,避免冗余的计算。主要包含三个优化:
- 常量表达式的计算举个例子:
SELECT 1+2 FROM POKES
“1+2
“就是一个常量表达式,此时,我们可以将1+2的结果先计算出来,然后将结果放入查询计划,避免在执行时,对每一行数据都去计算这个固定结果的表达式。 - 列裁剪在生成查询计划时,默认会把全表中没一列的数据都读取出来,但现实的情况是用户可能只需要其中的某几列做计算,其他的列就变成了冗余数据,读取出来耗时耗力,但没有被用到。此时,我们就使用列裁剪这个优化去把不必要的列裁剪掉。
- Predict Push Down在遇有JOIN运算时,用户很有可能还要在JOIN之后做WHERE运算,此时就要从代数逻辑上分析,WHERE中计算的条件是否可以被提前到JOIN之前运算,以此来减少JOIN运算的数据量,提升效率,千言万语不胜一张图,(又称no pic you say a bird):
SELECT * FROM A JOIN B ON A.ID=B.ID WHERE A.AGE>10 AND B.AGE>5左面的是未优化前的查询计划,在FIL_4中计算了A.AGE>10 AND B.AGE>5这个表达式,右面的是优化后的查询计划,将A.AGE>10放入了FIL_7计算并且提前,将B.AGE>5放入了FIL_8中计算并且提前,最后将原有的FIL_4删除,以此来达到减少JOIN输入数据量的目的。
至此,逻辑查询与逻辑优化就结束了,逻辑查询计划和逻辑优化在所有的SQL系统中都是差不多的,下面来讲讲与我们分布式系统MapReduce相关的物理查询计划。
2.2物理分析
物理查询计划是通过之前产生的逻辑查询计划生成的,在转换的过程中,要与飞天的MapReduce编程框架做适配,生成飞天系统可以识别的DAG
2.2.1物理算子
飞天的DAG是一个类似MapReduce的编程框架,想把刚刚一个SQL跑在分布式的飞天系统上,就需要按照分布式系统编程框架来抽象出一些新的物理运算符。
- Shuffle-Sort算子(在ODPS中,这个算子叫ReduceSink)在飞天系统上,我们如果想做Group by或者Join操作,那么必须把相同key的数据放到同一个进程节点上来执行,而在这直线,这些相同key的数据也许是被打散在各个进程里的,这时我们就需要一个专门的算子来做数据的重新分区、排序的操作
- GroupBy的不同阶段在飞天系统上,我们想实现一个GroupBy需要有4步:
- 准备阶段(AggregationPrepare), 在做一些非线性的聚合函数操作时,比如AVG求平均值,需要将AVG()拆解成SUM(),COUNT()两个线性的聚合函数,最后再使用SUM()/COUNT()来算出AVG()的值。在这步,只做拆解。
- 本地聚合(SemiHashAggregation), 对于Group by来说,需要将所有Group by 列表的字段数据放倒一个机器上才可以进行完全聚合,但是出于优化考虑,我们可以在数据片不全的时候先做一次聚合,虽然这次聚合操作不完全,但是可以减少输出的数据量,并且可以保证数据的正确性
- 流式聚合(StremAggregation), 这个聚合有个前提,一定是要求前趋的虚表Group by 列表中的数据都会在这一个进程里,并且排好序。一般而言,在本地聚合之后,数据会通过Shuffle-Sort运算数据重新分区和排序,再输入到流式聚合算子中
- 合并(FinalAggregation),这里输入的其实是已经聚合好的结果了,但是由于第一步提到的原因,有些非线性聚合函数被分解成了线性聚合函数,这里要将他们合并。如:AVG()=SUM()/COUNT()
在只有线性聚合函数时,上面的1,4步可以省略。
- MapJoin 算子和MergeJoin算子
- MergeJoinMergeJoin是最常见的一种Join算子,一般而言,MergeJoin是要求输入数据的虚表按照Join的Key分区并且排序的,所以MergeJoin一般出现在Shuffle-Sort算子之后。
- MapJoin使用过的人应该都知道有一种Join的优化叫MapJoin,这个名字的本意是Map-side JOIN,就是JOIN运算在MapReduce的Map阶段完成。如果用户在做Join时,知道有一个数据表的数据量很小,可以选择使用MapJoin,MapJoin算子会在每一个进程里都把小表中的数据加载到内存,与打表一一做Join。这样可以减少一次Shuffle-Sort,提升执行效率。
2.2.2生成物理查询计划
逻辑查询计划是物理查询计划的输入,我们按照拓扑序去遍历逻辑查询计划上的每一个逻辑算子,生成物理算子,当我们认为虚表需要重新分区排序才能满足下一个阶段的运算时,我们就在中间加入一个Shuffle-Sort运算符.
还是使用逻辑查询计划生成的那个例子来描述一下物理查询计划是什么样子:
2.2.3物理优化
现在,又进入了一个优化的环节。此时的优化与底层的分布式系统更相关,主要目标就是减少读取的数据量,减少整个SQL执行的过程中,数据分区排序落地的过程。以此来提高执行效率。
- 分区裁剪大家知道,我们的业务表一般都是有分区的,而且一般都是按照时间来分区。大部分情况下不需要全表扫描,只需读出几个分区的数据就可以完成我们的业务逻辑。于是,分区裁剪优化诞了。
我们会分析用户写在WHERE子句中的分区字段,将分区字段的条件拿出来,再去metastore中读取所有的分区信息,用WHERE子句中的条件做过滤,最后,我们就知道哪些分区是需要读取的了,我们把要读取的分区信息放入对应的TableScanOperator,在执行是时,就不用读取不必要的数据了。
需要注意的是,并不是所有的WHERE条件中的分区条件都可以做裁剪,当用户写了LEFT JOIN,RIGHT JOIN, FULL OUTER JOIN时,如果在JOIN条件中涉及到了分区字段,那么很有可能就无法完成分区裁剪的优化,因为裁剪后SQL的结果就不对了。 - 减少不必要的Shuffle-Sort有时我们会写出这样的语句:
在上面这个例子中,Join 后做Group by ,应该在Join和Group by之间加入一个Shuffle-Sort算子,以保证Group by 算子的输入虚表按照固定的A.ID来排序,但是我们发现,JOIN之后A.ID这个字段本来就是有序的,所以,我们可以将中间这个Shuffle-Sort算子删除,减少数据的网络传输和落地。
2.2.4生成飞天的DAG
物理查询计划已经生成好了,下一步就是按照飞天的DAG编程模型把物理查询计划的算子适配进去。飞天DAG的单位是Stage,由多个Stage组成了DAG,Stage和Stage之间可以进行对数据的分区和排序,有点想Map和Reduce的关系。
生成飞天DAG的规则也很简单:
- 按照拓扑序遍历物理查询计划上的每一个算子,每一个算子都在一个独立的集和里。如果两个算子相连接,则将这两个集和合并。当遇到Shuffle-Sort算子时终止,并开始新一轮的合并集和过程。
对于上面这个语句,按照规则生成DAG后的样子如下:
其中每一个灰色的方块代表Fuxi的一个Stage。TS_1在STAGE1中读取表A,RS_3进按A.ID进行分区排序,TS_2在STAGE2中读取表B,RS_4按照B.ID进行分区排序。JOIN_5在STAGTE3中,按照A.ID=B.ID做MergeJoin, SEMIHASH_7为Group by A.AGE做半聚合,通过RS_8将数据按照A.AGE重新分区排序。 STAGE3的第一算子是STREAMEDAGG_9,接收按照A.AGE排序后的数据做流式聚合,最后SEL_10将数据做投影,FS_11将数据写出到磁盘。
3.结语
洋洋洒洒写了这么多,SQL解析的逻辑基本就结束了,SQL解析是一个逻辑非常复杂繁琐的过程,有很多细节和恶心的坑本文中还没有提到,稍有不慎就可能引起SQL正确性的错误。