Catalyst 优化逻辑执行计划规则

Optimizer

本文分析Catalyst Optimize部分实现的对逻辑执行计划(LogicalPlan)的处理规则

Optimizer处理的是LogicalPlan对象。

Optimizer的batches如下:

object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("ConstantFolding", Once,
      ConstantFolding, // 可静态分析的常量表达式
      BooleanSimplification, // 布尔表达式提前短路
      SimplifyFilters, // 简化过滤操作(false, true, null)
      SimplifyCasts) :: // 简化转换(对象所属类已经是Cast目标类)
    Batch("Filter Pushdown", Once,
      CombineFilters, // 相邻(上下级)Filter操作合并
      PushPredicateThroughProject, // 映射操作中的Filter谓词下推
      PushPredicateThroughInnerJoin) :: Nil // inner join操作谓词下推
}

这是4.1号最新的Catalyst  Optimizer的代码。

ConstantFolding 

把可以静态分析出结果的表达式替换成Literal表达式。

object ConstantFolding extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsDown {
      // Skip redundant folding of literals.
      case l: Literal => l
      case e if e.foldable => Literal(e.apply(null), e.dataType)
    }
  }
}

Literal能处理的类型包括Int, Long, Double, Float, Byte,Short, String, Boolean, null。这些类型分别对应的是Catalyst框架的DataType,包括IntegerType, LongType, DoubleType,FloatType, ByteType, ShortType, StringType, BooleanType, NullType。

普通的Literal是不可变的,还有一个可变的MutalLiteral类,有update方法可以改变里面的value。

BooleanSimplification 

提前短路可以短路的布尔表达式

object BooleanSimplification extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressionsUp {
      case and @ And(left, right) =>
        (left, right) match {
          case (Literal(true, BooleanType), r) => r
          case (l, Literal(true, BooleanType)) => l
          case (Literal(false, BooleanType), _) => Literal(false)
          case (_, Literal(false, BooleanType)) => Literal(false)
          case (_, _) => and
        }

      case or @ Or(left, right) =>
        (left, right) match {
          case (Literal(true, BooleanType), _) => Literal(true)
          case (_, Literal(true, BooleanType)) => Literal(true)
          case (Literal(false, BooleanType), r) => r
          case (l, Literal(false, BooleanType)) => l
          case (_, _) => or
        }
    }
  }
}

SimplifyFilters 

提前处理可以被判断的过滤操作

object SimplifyFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(Literal(true, BooleanType), child) =>
      child
    case Filter(Literal(null, _), child) =>
      LocalRelation(child.output)
    case Filter(Literal(false, BooleanType), child) =>
      LocalRelation(child.output)
  }
}

SimplifyCasts 

把已经是目标类的Cast表达式替换掉

object SimplifyCasts extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case Cast(e, dataType) if e.dataType == dataType => e
  }
}

CombineFilters 

相邻都是过滤操作的话,把两个过滤操作合起来。相邻指的是上下两级。

object CombineFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case ff @ Filter(fc, nf @ Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild)
  }
}

PushPredicateThroughProject 

把Project操作中的过滤操作下推。这一步里顺带做了别名转换的操作(认为开销不大的前提下)。

object PushPredicateThroughProject extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case filter @ Filter(condition, project @ Project(fields, grandChild)) =>
      val sourceAliases = fields.collect { case a @ Alias(c, _) =>
        (a.toAttribute: Attribute) -> c
      }.toMap // 把fields中的别名属性都取出来
      project.copy(child = filter.copy( // 生成新的Filter操作
        replaceAlias(condition, sourceAliases), // condition中有别名的替换掉
        grandChild))
  }

  def replaceAlias(condition: Expression, sourceAliases: Map[Attribute, Expression]): Expression = {
    condition transform {
      case a: AttributeReference => sourceAliases.getOrElse(a, a)
    }
  }
}

PushPredicateThroughInnerJoin 

先找到Filter操作,若Filter操作里面是一次inner join,那么先把Filter条件和inner join条件先全部取出来,

然后把只涉及到左侧或右侧的过滤操作下推到join外部,把剩下来不能下推的条件放到join操作的condition里。

object PushPredicateThroughInnerJoin extends Rule[LogicalPlan] with PredicateHelper {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f @ Filter(filterCondition, Join(left, right, Inner, joinCondition)) =>
      // 这一步是把过滤条件和join条件里的condition都提取出来
      val allConditions = splitConjunctivePredicates(filterCondition) ++
        joinCondition.map(splitConjunctivePredicates).getOrElse(Nil)

      // 把参考属性都属于右侧输出属性的condition挑选到rightCondition里
      val (rightConditions, leftOrJoinConditions) =
        allConditions.partition(_.references subsetOf right.outputSet)
      // 同理,把剩余condition里面,参考属性都属于左侧输出属性的condition挑选到
      // leftCondition里,剩余的就属于joinCondition
      val (leftConditions, joinConditions) =
        leftOrJoinConditions.partition(_.references subsetOf left.outputSet)

      // 生成新的left和right:先把condition里的操作用AND折叠起来,然后将该折叠后的表达式和原始的left/right logical plan合起来生成新的Filter操作,即新的Fil      // ter logical plan
      // 这样就做到了把过滤条件中的谓词下推到了left/right里,即本次inner join的“外部”
      val newLeft = leftConditions.reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
      val newRight = rightConditions.reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
      Join(newLeft, newRight, Inner, joinConditions.reduceLeftOption(And))
  }
}

以下帮助理解上面这段代码。

Join操作(LogicalPlan的Binary)

case class Join(
  left: LogicalPlan,
  right: LogicalPlan,
  joinType: JoinType,
  condition: Option[Expression]) extends BinaryNode {

  def references = condition.map(_.references).getOrElse(Set.empty)
  def output = left.output ++ right.output
}

Filter操作(LogicalPlan的Unary)

case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode {
  def output = child.output
  def references = condition.references
}

reduceLeftOption逻辑是这样的:

def reduceLeftOption[B >: A](op: (B, A) => B): Option[B] =
    if (isEmpty) None else Some(reduceLeft(op))

reduceLeft(op)的结果是op( op( ... op(x_1, x_2) ...,x_{n-1}), x_n)

谓词助手这个trait,负责把And操作里的condition分离开,返回表达式Seq

trait PredicateHelper {
  def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
    case And(cond1, cond2) => splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
    case other => other :: Nil
  }
}

Example

case class Person(name:String, age: Int)

case classNum(v1: Int, v2: Int)

case one

SELECT  people.age, num.v1,  num.v2

FROM

    people

    JOIN  num

    ON   people.age > 20  and  num.v1> 0

WHERE  num.v2< 50

== QueryPlan ==

Project [age#1:1,v1#2:2,v2#3:3]

CartesianProduct

      Filter(age#1:1 > 20)

          ExistingRdd[name#0,age#1], MappedRDD[4] at map at basicOperators.scala:124

      Filter((v2#3:1 < 50) && (v1#2:0 > 0))

          ExistingRdd [v1#2,v2#3],MappedRDD[10] at map at basicOperators.scala:124

 

分析:where条件 num.v2 < 50 下推到Join里

case two

SELECT people.age,  1+2

FROM

    people

    JOIN  num

    ON   people.name<>’abc’ 
and
  num.v1> 0

WHERE num.v2 < 50

 

== QueryPlan ==

Project [age#1:1,3 AS c1#14]

    CartesianProduct

        Filter
NOT
(name#0:0 = abc)

            ExistingRdd[name#0,age#1], MappedRDD[4] at map at basicOperators.scala:124

        Filter((v2#3:1 < 50) && (v1#2:0 > 0))

            ExistingRdd[v1#2,v2#3], MappedRDD[10] at map at basicOperators.scala:124

 

分析:1+2 被提前常量折叠,并被取了一个别名

全文完 :)

时间: 2024-11-02 21:29:57

Catalyst 优化逻辑执行计划规则的相关文章

Pig源码分析: 逻辑执行计划优化

Whole View 本文分析的是逻辑执行计划优化的代码结构,具体每种Rule的实现不做分析. 看本文之前最好参考之前那篇逻辑执行计划模型的文章. Architecture 几个关键类/接口的关系: 每个关键类/接口的实现和继承结构在下面各节展开. Optimizer PlanOptimizer是抽象类,主要和Rule.PlanTransformListener.OperatorPlan打交道. public abstract class PlanOptimizer { protected Li

【性能优化】执行计划与直方图

在Oracle中直方图是一种对数据分布质量情况进行描述的工具.它会按照某一列不同值出现数量多少,以及出现的频率高低来绘制数据的分布情况,以便能够指导优化器根据数据的分布做出正确的选择.在某些情况下,表的列中的数值分布将会影响优化器使用索引还是执行全表扫描的决策.当 where 子句的值具有不成比例数量的数值时,将出现这种情况,使得全表扫描比索引访问的成本更低.这种情况下如果where 子句的过滤谓词列之上上有一个合理的正确的直方图,将会对优化器做出正确的选择发挥巨大的作用,使得SQL语句执行成本

关键时刻HINT出彩 - PG优化器的参数优化、执行计划固化CASE

背景 有过数据库使用经验的童鞋可曾遇到过SQL执行计划不准确,或者SQL执行计划抖动的问题. PostgreSQL的执行计划与大多数的企业数据库是一样的,都是基于成本优化. 基于成本优化的优化器,在算法靠谱,统计信息准确的前提下,通常得到的执行计划是比较准确的. 那么什么时候执行计划可能不准确呢? 成本估算的算法不好 这个需要内核的不断改进,完善.在没有合理的算法支撑的情况下,内核中往往会带有一些经验值,或者将这些经验值开放给用户设置. 统计信息不准确 PG的统计信息收集调度是几个参数共同决定的

Pig源码分析: 逻辑执行计划模块

Whole View 本文分析的是Pig Logical模块的代码(newplan package下),具体每种逻辑执行的实现类不会做具体分析. Architecture 关键类/接口关系图 下面对关键类/接口具体实现做分析 Operator public abstract class Operator { protected SourceLocation location; // The location of the operator in the original pig script.

Pig源码分析: 简析执行计划的生成

摘要 本文通过跟代码的方式,分析从输入一批Pig-latin到输出物理执行计划(与launcher引擎有关,一般是MR执行计划,也可以是Spark RDD的执行算子)的整体流程. 不会具体涉及AST如何解析.如何使用了Anltr.逻辑执行计划如何映射.逻辑执行计划如何优化.MR执行计划如何切分为MR Job,而是从输入一批Pig DSL到待执行的真正执行计划的关键变化步骤(方法和类). 执行计划完整解析 入口处书Main类的main函数 /** * The Main-Class for the

《Oracle高性能SQL引擎剖析:SQL优化与调优机制详解》一第一篇 执行计划

第一篇 执行计划 执行计划是指示Oracle如何获取和过滤数据.产生最终结果集,是影响SQL语句执行性能的关键因素.我们在深入了解执行计划之前,首先需要知道执行计划是在什么时候产生的,以及如何让SQL引擎为语句生成执行计划. 在深入了解执行计划之前,我们先了解SQL语句的处理执行过程.当一条语句提交到Oracle后,SQL引擎会分为三个步骤对其处理和执行:解析(Parse).执行(Execute)和获取(Fetch),分别由SQL引擎的不同组件完成.SQL引擎的组件如图1-1所示. 1. SQL

《Oracle高性能SQL引擎剖析:SQL优化与调优机制详解》一1.2 显示执行计划

1.2 显示执行计划 我们现在知道,有三个途径可以获取查询计划:v$sql_plan.dba_hist_sql_plan和PLAN_TABLE.如果需要读取一条SQL语句的执行计划,就需要知道该条语句的SQL_ID,如果该语句存在多个游标或者执行计划,则还需要知道游标的CHILD_NUMBER或计划的哈希值(可选).而无论我们通过哪个途径来获取执行计划,显示方式主要是两种:语句查询和包DBMS_XPLAN显示. 1.2.1 通过查询语句显示计划 通过查询语句从一些视图里读出执行计划并作格式化输出

Spark SQL 物理执行计划各操作实现

SparkStrategy: logical to physical Catalyst作为一个实现无关的查询优化框架,在优化后的逻辑执行计划到真正的物理执行计划这部分只提供了接口,没有提供像Analyzer和Optimizer那样的实现. 本文介绍的是Spark SQL组件各个物理执行计划的操作实现.把优化后的逻辑执行计划映射到物理执行操作类这部分由SparkStrategies类实现,内部基于Catalyst提供的Strategy接口,实现了一些策略,用于分辨logicalPlan子类并替换为

Oracle中获取执行计划的几种方法

1. 预估执行计划 - Explain Plan Explain plan以SQL语句作为输入,得到这条 SQL语句的执行计划,并将执行计划输出存储到计划表中. 首先,在你要执行的SQL语 句前加explain plan for,此时将生成的执行计划存储到计划表中,语句如下: explain plan for SQL语句 然后,在计划表中查询刚刚生成的执行计划,语 句如下: select * from table(dbms_xplan.display); 注意:Explain plan 只生成执