Spark-SparkSQL深入学习系列六(转自OopsOutOfMemory)

  /** Spark SQL源码分析系列文章*/

  前面几篇文章主要介绍的是Spark sql包里的的spark
sql执行流程
,以及Catalyst包内的SqlParserAnalyzerOptimizer,最后要介绍一下Catalyst里最后的一个Plan了,即Physical
Plan。物理计划是Spark SQL执行Spark job的前置,也是最后一道计划。

  如图:

  

一、SparkPlanner

 话接上回,Optimizer接受输入的Analyzed Logical Plan后,会有SparkPlanner来对Optimized Logical Plan进行转换,生成Physical plans。

[java] view
plain
 copy

  1. lazy val optimizedPlan = optimizer(analyzed)  
  2.     // TODO: Don't just pick the first one...  
  3.     lazy val sparkPlan = planner(optimizedPlan).next()  

  SparkPlanner的apply方法,会返回一个Iterator[PhysicalPlan]。
  SparkPlanner继承了SparkStrategies,SparkStrategies继承了QueryPlanner。
  SparkStrategies包含了一系列特定的Strategies,这些Strategies是继承自QueryPlanner中定义的Strategy,它定义接受一个Logical Plan,生成一系列的Physical Plan

[java] view
plain
 copy

  1. @transient  
  2. protected[sql] val planner = new SparkPlanner  
  3.   
  4.   protected[sql] class SparkPlanner extends SparkStrategies {  
  5.   val sparkContext: SparkContext = self.sparkContext  
  6.   
  7.   val sqlContext: SQLContext = self  
  8.   
  9.   def numPartitions = self.numShufflePartitions //partitions的个数  
  10.   
  11.   val strategies: Seq[Strategy] =  //策略的集合  
  12.     CommandStrategy(self) ::  
  13.     TakeOrdered ::  
  14.     PartialAggregation ::  
  15.     LeftSemiJoin ::  
  16.     HashJoin ::  
  17.     InMemoryScans ::  
  18.     ParquetOperations ::  
  19.     BasicOperators ::  
  20.     CartesianProduct ::  
  21.     BroadcastNestedLoopJoin :: Nil  
  22. etc......  
  23. }  

QueryPlanner 是SparkPlanner的基类,定义了一系列的关键点,如Strategy,planLater和apply。

[java] view
plain
 copy

  1. abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {  
  2.   /** A list of execution strategies that can be used by the planner */  
  3.   def strategies: Seq[Strategy]  
  4.   
  5.   /** 
  6.    * Given a [[plans.logical.LogicalPlan LogicalPlan]], returns a list of `PhysicalPlan`s that can 
  7.    * be used for execution. If this strategy does not apply to the give logical operation then an 
  8.    * empty list should be returned. 
  9.    */  
  10.   abstract protected class Strategy extends Logging {  
  11.     def apply(plan: LogicalPlan): Seq[PhysicalPlan]  //接受一个logical plan,返回Seq[PhysicalPlan]  
  12.   }  
  13.   
  14.   /** 
  15.    * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be 
  16.    * filled in automatically by the QueryPlanner using the other execution strategies that are 
  17.    * available. 
  18.    */  
  19.   protected def planLater(plan: LogicalPlan) = apply(plan).next() //返回一个占位符,占位符会自动被QueryPlanner用其它的strategies apply  
  20.   
  21.   def apply(plan: LogicalPlan): Iterator[PhysicalPlan] = {  
  22.     // Obviously a lot to do here still...  
  23.     val iter = strategies.view.flatMap(_(plan)).toIterator //整合所有的Strategy,_(plan)每个Strategy应用plan上,得到所有Strategies执行完后生成的所有Physical Plan的集合,一个iter  
  24.     assert(iter.hasNext, s"No plan for $plan")  
  25.     iter //返回所有物理计划  
  26.   }  
  27. }  

  继承关系:

二、Spark Plan

 Spark Plan是Catalyst里经过所有Strategies apply 的最终的物理执行计划的抽象类,它只是用来执行spark job的。

[java] view
plain
 copy

  1. lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)  

prepareForExecution其实是一个RuleExecutor[SparkPlan],当然这里的Rule就是SparkPlan了。

[java] view
plain
 copy

  1. @transient  
  2.  protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {  
  3.    val batches =  
  4.      Batch("Add exchange", Once, AddExchange(self)) :: //添加shuffler操作如果必要的话  
  5.      Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil //Bind references  
  6.  }  

Spark Plan继承Query Plan[Spark Plan],里面定义的partition,requiredChildDistribution以及spark sql启动执行的execute方法。

[java] view
plain
 copy

  1. abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {  
  2.   self: Product =>  
  3.   
  4.   // TODO: Move to `DistributedPlan`  
  5.   /** Specifies how data is partitioned across different nodes in the cluster. */  
  6.   def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH!  
  7.   /** Specifies any partition requirements on the input data for this operator. */  
  8.   def requiredChildDistribution: Seq[Distribution] =  
  9.     Seq.fill(children.size)(UnspecifiedDistribution)  
  10.   
  11.   /** 
  12.    * Runs this query returning the result as an RDD. 
  13.    */  
  14.   def execute(): RDD[Row]  //真正执行查询的方法execute,返回的是一个RDD  
  15.   
  16.   /** 
  17.    * Runs this query returning the result as an array. 
  18.    */  
  19.   def executeCollect(): Array[Row] = execute().map(_.copy()).collect() //exe & collect  
  20.   
  21.   protected def buildRow(values: Seq[Any]): Row =  //根据当前的值,生成Row对象,其实是一个封装了Array的对象。  
  22.     new GenericRow(values.toArray)  
  23. }  

  关于Spark Plan的继承关系,如图:

三、Strategies

  Strategy,注意这里Strategy是在execution包下的,在SparkPlanner里定义了目前的几种策略:
  LeftSemiJoin、HashJoin、PartialAggregation、BroadcastNestedLoopJoin、CartesianProduct、TakeOrdered、ParquetOperations、InMemoryScans、BasicOperators、CommandStrategy

 3.1、LeftSemiJoin

Join分为好几种类型:

[java] view
plain
 copy

  1. case object Inner extends JoinType  
  2. case object LeftOuter extends JoinType  
  3. case object RightOuter extends JoinType  
  4. case object FullOuter extends JoinType  
  5. case object LeftSemi extends JoinType  

  如果Logical Plan里的Join是joinType为LeftSemi的话,就会执行这种策略,
  这里ExtractEquiJoinKeys是一个pattern定义在patterns.scala里,主要是做模式匹配用的。
  这里匹配只要是等值的join操作,都会封装为ExtractEquiJoinKeys对象,它会解析当前join,最后返回(joinType, rightKeys, leftKeys, condition, leftChild, rightChild)的格式。
  最后返回一个execution.LeftSemiJoinHash这个Spark Plan,可见Spark Plan的类图继承关系图。

[java] view
plain
 copy

  1. object LeftSemiJoin extends Strategy with PredicateHelper {  
  2.    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {  
  3.      // Find left semi joins where at least some predicates can be evaluated by matching join keys  
  4.      case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) =>  
  5.        val semiJoin = execution.LeftSemiJoinHash(  //根据解析后的Join,实例化execution.LeftSemiJoinHash这个Spark Plan 返回  
  6.          leftKeys, rightKeys, planLater(left), planLater(right))  
  7.        condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil  
  8.      // no predicate can be evaluated by matching hash keys  
  9.      case logical.Join(left, right, LeftSemi, condition) =>  //没有Join key的,即非等值join连接的,返回LeftSemiJoinBNL这个Spark Plan  
  10.        execution.LeftSemiJoinBNL(   
  11.          planLater(left), planLater(right), condition)(sqlContext) :: Nil  
  12.      case _ => Nil  
  13.    }  
  14.  }  

3.2、HashJoin

  HashJoin是我们最见的操作,innerJoin类型,里面提供了2种Spark Plan,BroadcastHashJoin 和 ShuffledHashJoin
  BroadcastHashJoin的实现是一种广播变量的实现方法,如果设置了spark.sql.join.broadcastTables这个参数的表(表面逗号隔开)
  就会用spark的Broadcast Variables方式先将一张表给查询出来,然后广播到各个机器中,相当于Hive中的map join。
  ShuffledHashJoin是一种最传统的默认的join方式,会根据shuffle key进行shuffle的hash join。

[java] view
plain
 copy

  1. object HashJoin extends Strategy with PredicateHelper {  
  2.    private[this] def broadcastHashJoin(  
  3.        leftKeys: Seq[Expression],  
  4.        rightKeys: Seq[Expression],  
  5.        left: LogicalPlan,  
  6.        right: LogicalPlan,  
  7.        condition: Option[Expression],  
  8.        side: BuildSide) = {  
  9.      val broadcastHashJoin = execution.BroadcastHashJoin(  
  10.        leftKeys, rightKeys, side, planLater(left), planLater(right))(sqlContext)  
  11.      condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil  
  12.    }  
  13.   
  14.    def broadcastTables: Seq[String] = sqlContext.joinBroadcastTables.split(",").toBuffer //获取需要广播的表  
  15.   
  16.    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {  
  17.      case ExtractEquiJoinKeys(  
  18.              Inner,  
  19.              leftKeys,  
  20.              rightKeys,  
  21.              condition,  
  22.              left,  
  23.              right @ PhysicalOperation(_, _, b: BaseRelation))  
  24.        if broadcastTables.contains(b.tableName) => //如果右孩子是广播的表,则buildSide取BuildRight  
  25.          broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)  
  26.   
  27.      case ExtractEquiJoinKeys(  
  28.              Inner,  
  29.              leftKeys,  
  30.              rightKeys,  
  31.              condition,  
  32.              left @ PhysicalOperation(_, _, b: BaseRelation),  
  33.              right)  
  34.        if broadcastTables.contains(b.tableName) =>//如果左孩子是广播的表,则buildSide取BuildLeft  
  35.          broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)  
  36.   
  37.      case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>  
  38.        val hashJoin =  
  39.          execution.ShuffledHashJoin( //根据hash key shuffle的 Hash Join  
  40.            leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))  
  41.        condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil  
  42.   
  43.      case _ => Nil  
  44.    }  
  45.  }  

3.3、PartialAggregation

  PartialAggregation是一个部分聚合的策略,即有些聚合操作可以在local里面完成的,就在local data里完成,而不必要的去shuffle所有的字段。

[java] view
plain
 copy

  1. object PartialAggregation extends Strategy {  
  2.     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {  
  3.       case logical.Aggregate(groupingExpressions, aggregateExpressions, child) =>   
  4.         // Collect all aggregate expressions.  
  5.         val allAggregates =  
  6.           aggregateExpressions.flatMap(_ collect { case a: AggregateExpression => a })  
  7.         // Collect all aggregate expressions that can be computed partially.  
  8.         val partialAggregates =  
  9.           aggregateExpressions.flatMap(_ collect { case p: PartialAggregate => p })  
  10.   
  11.         // Only do partial aggregation if supported by all aggregate expressions.  
  12.         if (allAggregates.size == partialAggregates.size) {  
  13.           // Create a map of expressions to their partial evaluations for all aggregate expressions.  
  14.           val partialEvaluations: Map[Long, SplitEvaluation] =  
  15.             partialAggregates.map(a => (a.id, a.asPartial)).toMap  
  16.   
  17.           // We need to pass all grouping expressions though so the grouping can happen a second  
  18.           // time. However some of them might be unnamed so we alias them allowing them to be  
  19.           // referenced in the second aggregation.  
  20.           val namedGroupingExpressions: Map[Expression, NamedExpression] = groupingExpressions.map {  
  21.             case n: NamedExpression => (n, n)  
  22.             case other => (other, Alias(other, "PartialGroup")())  
  23.           }.toMap  
  24.   
  25.           // Replace aggregations with a new expression that computes the result from the already  
  26.           // computed partial evaluations and grouping values.  
  27.           val rewrittenAggregateExpressions = aggregateExpressions.map(_.transformUp {  
  28.             case e: Expression if partialEvaluations.contains(e.id) =>  
  29.               partialEvaluations(e.id).finalEvaluation  
  30.             case e: Expression if namedGroupingExpressions.contains(e) =>  
  31.               namedGroupingExpressions(e).toAttribute  
  32.           }).asInstanceOf[Seq[NamedExpression]]  
  33.   
  34.           val partialComputation =  
  35.             (namedGroupingExpressions.values ++  
  36.              partialEvaluations.values.flatMap(_.partialEvaluations)).toSeq  
  37.   
  38.           // Construct two phased aggregation.  
  39.           execution.Aggregate( //返回execution.Aggregate这个Spark Plan  
  40.             partial = false,  
  41.             namedGroupingExpressions.values.map(_.toAttribute).toSeq,  
  42.             rewrittenAggregateExpressions,  
  43.             execution.Aggregate(  
  44.               partial = true,  
  45.               groupingExpressions,  
  46.               partialComputation,  
  47.               planLater(child))(sqlContext))(sqlContext) :: Nil  
  48.         } else {  
  49.           Nil  
  50.         }  
  51.       case _ => Nil  
  52.     }  
  53.   }  

 3.4、BroadcastNestedLoopJoin

  BroadcastNestedLoopJoin是用于Left Outer Join, RightOuter, FullOuter这三种类型的join
 而上述的Hash Join仅仅用于InnerJoin,这点要区分开来。

[java] view
plain
 copy

  1. object BroadcastNestedLoopJoin extends Strategy {  
  2.   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {  
  3.     case logical.Join(left, right, joinType, condition) =>  
  4.       execution.BroadcastNestedLoopJoin(  
  5.         planLater(left), planLater(right), joinType, condition)(sqlContext) :: Nil  
  6.     case _ => Nil  
  7.   }  
  8. }  

部分代码;

[java] view
plain
 copy

  1.     if (!matched && (joinType == LeftOuter || joinType == FullOuter)) {  //LeftOuter or FullOuter  
  2.       matchedRows += buildRow(streamedRow ++ Array.fill(right.output.size)(null))  
  3.     }  
  4.   }  
  5.   Iterator((matchedRows, includedBroadcastTuples))  
  6. }  
  7.   
  8. val includedBroadcastTuples = streamedPlusMatches.map(_._2)  
  9. val allIncludedBroadcastTuples =  
  10.   if (includedBroadcastTuples.count == 0) {  
  11.     new scala.collection.mutable.BitSet(broadcastedRelation.value.size)  
  12.   } else {  
  13.     streamedPlusMatches.map(_._2).reduce(_ ++ _)  
  14.   }  
  15.   
  16. val rightOuterMatches: Seq[Row] =  
  17.   if (joinType == RightOuter || joinType == FullOuter) { //RightOuter or FullOuter  
  18.     broadcastedRelation.value.zipWithIndex.filter {  
  19.       case (row, i) => !allIncludedBroadcastTuples.contains(i)  
  20.     }.map {  
  21.       // TODO: Use projection.  
  22.       case (row, _) => buildRow(Vector.fill(left.output.size)(null) ++ row)  
  23.     }  
  24.   } else {  
  25.     Vector()  
  26.   }  

3.5、CartesianProduct 

[java] view
plain
 copy

  1. 笛卡尔积的Join,有待过滤条件的Join。  
  2. 主要是利用RDD的cartesian实现的。  
  3. object CartesianProduct extends Strategy {  
  4.   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {  
  5.     case logical.Join(left, right, _, None) =>  
  6.       execution.CartesianProduct(planLater(left), planLater(right)) :: Nil  
  7.     case logical.Join(left, right, Inner, Some(condition)) =>  
  8.       execution.Filter(condition,  
  9.         execution.CartesianProduct(planLater(left), planLater(right))) :: Nil  
  10.     case _ => Nil  
  11.   }  
  12. }  

3.6、TakeOrdered

  TakeOrdered是用于Limit操作的,如果有Limit和Sort操作。
  则返回一个TakeOrdered的Spark Plan。
  主要也是利用RDD的takeOrdered方法来实现的排序后取TopN。

[java] view
plain
 copy

  1. object TakeOrdered extends Strategy {  
  2.   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {  
  3.     case logical.Limit(IntegerLiteral(limit), logical.Sort(order, child)) =>  
  4.       execution.TakeOrdered(limit, order, planLater(child))(sqlContext) :: Nil  
  5.     case _ => Nil  
  6.   }  
  7. }  

 3.7、ParquetOperations

支持ParquetOperations的读写,插入Table等。

[java] view
plain
 copy

  1. object ParquetOperations extends Strategy {  
  2.   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {  
  3.     // TODO: need to support writing to other types of files.  Unify the below code paths.  
  4.     case logical.WriteToFile(path, child) =>  
  5.       val relation =  
  6.         ParquetRelation.create(path, child, sparkContext.hadoopConfiguration)  
  7.       // Note: overwrite=false because otherwise the metadata we just created will be deleted  
  8.       InsertIntoParquetTable(relation, planLater(child), overwrite=false)(sqlContext) :: Nil  
  9.     case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>  
  10.       InsertIntoParquetTable(table, planLater(child), overwrite)(sqlContext) :: Nil  
  11.     case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>  
  12.       val prunePushedDownFilters =  
  13.         if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {  
  14.           (filters: Seq[Expression]) => {  
  15.             filters.filter { filter =>  
  16.               // Note: filters cannot be pushed down to Parquet if they contain more complex  
  17.               // expressions than simple "Attribute cmp Literal" comparisons. Here we remove  
  18.               // all filters that have been pushed down. Note that a predicate such as  
  19.               // "(A AND B) OR C" can result in "A OR C" being pushed down.  
  20.               val recordFilter = ParquetFilters.createFilter(filter)  
  21.               if (!recordFilter.isDefined) {  
  22.                 // First case: the pushdown did not result in any record filter.  
  23.                 true  
  24.               } else {  
  25.                 // Second case: a record filter was created; here we are conservative in  
  26.                 // the sense that even if "A" was pushed and we check for "A AND B" we  
  27.                 // still want to keep "A AND B" in the higher-level filter, not just "B".  
  28.                 !ParquetFilters.findExpression(recordFilter.get, filter).isDefined  
  29.               }  
  30.             }  
  31.           }  
  32.         } else {  
  33.           identity[Seq[Expression]] _  
  34.         }  
  35.       pruneFilterProject(  
  36.         projectList,  
  37.         filters,  
  38.         prunePushedDownFilters,  
  39.         ParquetTableScan(_, relation, filters)(sqlContext)) :: Nil  
  40.   
  41.     case _ => Nil  
  42.   }  
  43. }  

  3.8、InMemoryScans

  InMemoryScans主要是对InMemoryRelation这个Logical Plan操作。
  调用的其实是Spark Planner里的pruneFilterProject这个方法。

[java] view
plain
 copy

  1. object InMemoryScans extends Strategy {  
  2.    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {  
  3.      case PhysicalOperation(projectList, filters, mem: InMemoryRelation) =>  
  4.        pruneFilterProject(  
  5.          projectList,  
  6.          filters,  
  7.          identity[Seq[Expression]], // No filters are pushed down.  
  8.          InMemoryColumnarTableScan(_, mem)) :: Nil  
  9.      case _ => Nil  
  10.    }  
  11.  }  

3.9、BasicOperators

  所有定义在org.apache.spark.sql.execution里的基本的Spark Plan,它们都在org.apache.spark.sql.execution包下basicOperators.scala内的
  有Project、Filter、Sample、Union、Limit、TakeOrdered、Sort、ExistingRdd。
  这些是基本元素,实现都相对简单,基本上都是RDD里的方法来实现的。

[java] view
plain
 copy

  1. object BasicOperators extends Strategy {  
  2.    def numPartitions = self.numPartitions  
  3.   
  4.    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {  
  5.      case logical.Distinct(child) =>  
  6.        execution.Aggregate(  
  7.          partial = false, child.output, child.output, planLater(child))(sqlContext) :: Nil  
  8.      case logical.Sort(sortExprs, child) =>  
  9.        // This sort is a global sort. Its requiredDistribution will be an OrderedDistribution.  
  10.        execution.Sort(sortExprs, global = true, planLater(child)):: Nil  
  11.      case logical.SortPartitions(sortExprs, child) =>  
  12.        // This sort only sorts tuples within a partition. Its requiredDistribution will be  
  13.        // an UnspecifiedDistribution.  
  14.        execution.Sort(sortExprs, global = false, planLater(child)) :: Nil  
  15.      case logical.Project(projectList, child) =>  
  16.        execution.Project(projectList, planLater(child)) :: Nil  
  17.      case logical.Filter(condition, child) =>  
  18.        execution.Filter(condition, planLater(child)) :: Nil  
  19.      case logical.Aggregate(group, agg, child) =>  
  20.        execution.Aggregate(partial = false, group, agg, planLater(child))(sqlContext) :: Nil  
  21.      case logical.Sample(fraction, withReplacement, seed, child) =>  
  22.        execution.Sample(fraction, withReplacement, seed, planLater(child)) :: Nil  
  23.      case logical.LocalRelation(output, data) =>  
  24.        val dataAsRdd =  
  25.          sparkContext.parallelize(data.map(r =>  
  26.            new GenericRow(r.productIterator.map(convertToCatalyst).toArray): Row))  
  27.        execution.ExistingRdd(output, dataAsRdd) :: Nil  
  28.      case logical.Limit(IntegerLiteral(limit), child) =>  
  29.        execution.Limit(limit, planLater(child))(sqlContext) :: Nil  
  30.      case Unions(unionChildren) =>  
  31.        execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil  
  32.      case logical.Generate(generator, join, outer, _, child) =>  
  33.        execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil  
  34.      case logical.NoRelation =>  
  35.        execution.ExistingRdd(Nil, singleRowRdd) :: Nil  
  36.      case logical.Repartition(expressions, child) =>  
  37.        execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil  
  38.      case SparkLogicalPlan(existingPlan, _) => existingPlan :: Nil  
  39.      case _ => Nil  
  40.    }  
  41.  }  

  3.10 CommandStrategy

  CommandStrategy是专门针对Command类型的Logical Plan
  即set key = value 、 explain sql、 cache table xxx 这类操作
  SetCommand主要实现方式是SparkContext的参数
  ExplainCommand主要实现方式是利用executed Plan打印出tree string
  CacheCommand主要实现方式SparkContext的cache table和uncache table

 

[java] view
plain
 copy

  1. case class CommandStrategy(context: SQLContext) extends Strategy {  
  2.     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {  
  3.       case logical.SetCommand(key, value) =>  
  4.         Seq(execution.SetCommand(key, value, plan.output)(context))  
  5.       case logical.ExplainCommand(logicalPlan) =>  
  6.         Seq(execution.ExplainCommand(logicalPlan, plan.output)(context))  
  7.       case logical.CacheCommand(tableName, cache) =>  
  8.         Seq(execution.CacheCommand(tableName, cache)(context))  
  9.       case _ => Nil  
  10.     }  
  11.   }  

四、Execution

Spark Plan的Execution方式均为调用其execute()方法生成RDD,除了简单的基本操作例如上面的basic operator实现比较简单,其它的实现都比较复杂,大致的实现我都在上面介绍了,本文就不详细讨论了。

五、总结

  本文从介绍了Spark SQL的Catalyst框架的Physical plan以及其如何从Optimized Logical Plan转化为Spark Plan的过程,这个过程用到了很多的物理计划策略Strategies,每个Strategies最后还是在RuleExecutor里面被执行,最后生成一系列物理计划Executed Spark Plans。
  Spark Plan是执行前最后一种计划,当生成executed spark plan后,就可以调用collect()方法来启动Spark Job来进行Spark SQL的真正执行了。
——EOF——

原创文章,转载请注明:

转载自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

本文链接地址:http://blog.csdn.net/oopsoom/article/details/38235247

注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

时间: 2024-09-14 04:37:27

Spark-SparkSQL深入学习系列六(转自OopsOutOfMemory)的相关文章

Spark修炼之道系列教程预告

课程内容 Spark修炼之道(基础篇)--Linux基础(15讲).Akka分布式编程(8讲) Spark修炼之道(进阶篇)--Spark入门到精通(30讲) Spark修炼之道(实战篇)--Spark应用开发实战篇(20讲) Spark修炼之道(高级篇)--Spark源码解析(50讲) 部分内容会在实际编写时动态调整,或补充.或删除. Spark修炼之道(基础篇)--Linux大数据开发基础(15讲). Linux大数据开发基础--第一节:Ubuntu Linux安装与介绍 Linux大数据开

Python爬虫学习系列教程

Python版本:2.7 一.爬虫入门 1. Python爬虫入门一之综述 2. Python爬虫入门二之爬虫基础了解 3. Python爬虫入门三之Urllib库的基本使用 4. Python爬虫入门四之Urllib库的高级用法 5. Python爬虫入门五之URLError异常处理 6. Python爬虫入门六之Cookie的使用 7. Python爬虫入门七之正则表达式 二.爬虫实战 1. Python爬虫实战一之爬取糗事百科段子 2. Python爬虫实战二之爬取百度贴吧帖子 3. Py

Asp.Net Web API 2 官网菜鸟学习系列导航[持续更新中]

原文:Asp.Net Web API 2 官网菜鸟学习系列导航[持续更新中] 前言 本来一直参见于微软官网进行学习的, 官网网址http://www.asp.net/web-api.出于自己想锻炼一下学习阅读英文文章的目的,又可以学习下微软新发布的技术,其实也很久了,但自己菜鸟一枚,对自己来说都是新技术了.鉴于以上两个原因,本人打算借助google翻译和有道词典,来翻译学习这个系列,并通过博客园来记录自己的翻译学习过程.由于自己阅读水平的确太菜,在借助工具的情况下,有时候搞出来的也是蹩脚的语句,

kvm虚拟化学习笔记(六)之kvm虚拟机控制台登录配置

原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 .作者信息和本声明.否则将追究法律责任.http://koumm.blog.51cto.com/703525/1290996 KVM虚拟化学习笔记系列文章列表 ---------------------------------------- kvm虚拟化学习笔记(一)之kvm虚拟化环境安装http://koumm.blog.51cto.com/703525/1288795 kvm虚拟化学习笔记(二)之linux kvm虚拟机安装 h

前端学习系列教程

Bootstrap学习系列教程 本系列教程是自己在工作中使用到而记录的,如有错误之处,请给与指正 第一章 Bootstrap简介 第二章 时间控件(DateTime Picker) 第三章 续:时间控件(TimePicker) 第四章 标签页 第五章 使用 Bootstrap Typeahead 组件(百度下拉效果) 第六章 使用 Bootstrap Typeahead 组件(百度下拉效果)(续) 第七章 模态框 第八章 让Bootstrap轮播插件carousel支持左右滑动手势的三种方法 第

ExtJs2.0学习系列(2)--Ext.Panel

上一篇文章ExtJs2.0学习系列(1)--Ext.MessageBox ,受到了大家的褒贬不一,还是有的朋友提出好的建议,在此表示感谢! 今天介绍extjs中的Panel组件. //html代码 <div id="container"> </div> //js代码 var p = new Ext.Panel({ title: 'My Panel',//标题 collapsible:true,//右上角上的那个收缩按钮,设为false则不显示 renderTo:

ExtJs2.0学习系列(6)--Ext.FormPanel之第三式(ComboBox篇)

前言:说句实话,此extjs系列的文章在博客园中的热度不高,可能是学这玩意的人不多吧,但是我觉得有这么个系列的文章对于中国朋友非常有帮助!请大家支持! 上篇ExtJs2.0学习系列(5)--Ext.FormPanel之第二式中我们讨论了下fieldset和表单验证的知识,今天我们接着深入解析表单元素中ComboBox组件的使用.会涉及 到.net简单服务器数据交互,但暂不做深入讨论,以后会详细分析服务器交互相关,不过可能要等较长一段时间,呵呵! 5.服务器数据作为ComboBox的数据源实例 首

Silverlight &amp;amp; Blend动画设计系列六

Silverlight & Blend动画设计系列六:动画技巧(Animation Techniques)之对象与路径转化.波感特效 当我们在进行Silverlight & Blend进行动画设计的过程中,可能需要设计出很多效 果不一的图形图像出来作为动画的基本组成元素.然而在设计过程中可能会出现许多的问题 ,比如当前绘制了一个椭圆,但是在动画中仅仅只需要椭圆的一半或是更多更少的部分用作 与动画元素,这时候就需要对椭圆对象进行相应的处理才能满足我们的需求,那到底该怎么 做才能实现最终想要的

JAVA/JSP学习系列之八(改写MySQL翻页例子)

js|mysql|翻页 一.前言 其实,改写后的JDBC Data-Source是运行在Servlet中的,通过JNDI去查找数据源.我用Orion试的,将本站<JAVA/JSP学习系列之六(MySQL翻页例子) > 简单改写了一下. 二.配置 (1)JDBC 需要将用到的JDBC驱动Copy到[ORION]/lib目录下 (2)data-source 在[ORION]/config/data-sources.xml文件中加入如下: 〈data-source class="com.e