Spark SQL Columnar模块源码分析

概述

本文介绍Spark SQL增加的Columnar模块代码实现。

首先介绍Columnar内的代码结构和实现,然后介绍在SqlContext里的使用方式。

Columnar

InMemoryColumnarTableScan

实现

InMemoryColumnarTableScan类是SparkPlan LeafNode的实现,即是一个物理执行计划。

private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan)
  extends LeafNode {

传入的child是一个SparkPlan(确认了的物理执行计划)和一个属性序列。

 

行转列并cache的过程如下:

  lazy val cachedColumnBuffers = {
    val output = child.output
    // 遍历每个RDD的partiti	on
    val cached = child.execute().mapPartitions { iterator =>
      // 把属性Seq转换成为ColumnBuilder数组
      val columnBuilders = output.map { attribute =>
        // 都是基本ColumnBuilder,默认ByteBuffer大小
        ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name)
      }.toArray

      var row: Row = null
      // RDD每个Partition的Rows,每个Row的所有field信息存到ColumnBuilder里
      while (iterator.hasNext) {
        row = iterator.next()
        var i = 0
        while (i < row.length) {
          columnBuilders(i).appendFrom(row, i)
          i += 1
        }
      }

      Iterator.single(columnBuilders.map(_.build()))
    }.cache()

    cached.setName(child.toString)
    // Force the materialization of the cached RDD.
    cached.count()
    cached
  }

ColumnType类用于表示Column的类型,他的typeId变量用来区分数据类型,生成对应的ColumnBuilder(typeId, initialSize=0, columnName)。ColumnBuilder的生成如下:

  def apply(typeId: Int, initialSize: Int = 0, columnName: String = ""): ColumnBuilder = {
    val builder = (typeId match {
      case INT.typeId     => new IntColumnBuilder
      case LONG.typeId    => new LongColumnBuilder
      case FLOAT.typeId   => new FloatColumnBuilder
      case DOUBLE.typeId  => new DoubleColumnBuilder
      case BOOLEAN.typeId => new BooleanColumnBuilder
      case BYTE.typeId    => new ByteColumnBuilder
      case SHORT.typeId   => new ShortColumnBuilder
      case STRING.typeId  => new StringColumnBuilder
      case BINARY.typeId  => new BinaryColumnBuilder
      case GENERIC.typeId => new GenericColumnBuilder
    }).asInstanceOf[ColumnBuilder]

    builder.initialize(initialSize, columnName)
    builder
  }

他的继承结构如下,主要有三大体系:

这里涉及到的是Basic这个体系,继承结构如下:


BasicColumnBuilder里,initialSize = 0,指使用ByteBuffer的默认大小,即10*1024*104。然后在initialize()方法,会初始化ByteBuffer。

 

接下来,针对RDD每个partition,

      var row: Row = null
      while (iterator.hasNext) {
        row = iterator.next()
        var i = 0
        while (i < row.length) {
          columnBuilders(i).appendFrom(row, i)
          i += 1
        }
      }

进行了appendFrom操作:

  override def appendFrom(row: Row, ordinal: Int) {
    val field = columnType.getField(row, ordinal)
    buffer = ensureFreeSpace(buffer, columnType.actualSize(field))
    columnType.append(field, buffer)
  }

用于把一个Row的每一个field,都存到一个ColumnBuilder里。在这里指BasicColumnBuilder这个类,维护了一个自己的ByteBuffer,把row里的各个field信息都存在了buffer里。

最后ColumnBuilders里的每个ColumnBuilder进行build(),即BasicColumnBuilder.build()方法,进行了一次ByteBuffer的rewind()方法。

 

这个方法的结果是一个RDD集合。由于在结束前调用了.count()方法,所以RDD的计算是被执行了的,返回的是新的RDD。

在Spark SQL里,外部调用cachedColumnBuffers方法只有在uncache table的时候,进行了unpersisit()操作。

下面看execute()方法:

  override def execute() = {
cachedColumnBuffers.mapPartitions { iterator =>
  // 在RDD partition里,iterator.next()返回的是一个ByteBuffer
  // 也就是说,cachedColumnBuffers返回的结果RDD,类型是ByteBuffer
      val columnBuffers = iterator.next()
      assert(!iterator.hasNext)

      new Iterator[Row] {
	    // 访问每一个ByteBuffer里的列信息
        val columnAccessors = columnBuffers.map(ColumnAccessor(_))
        val nextRow = new GenericMutableRow(columnAccessors.length)

        override def next() = {
          var i = 0
          // 把column里的信息再转到Row里
          while (i < nextRow.length) {
            columnAccessors(i).extractTo(nextRow, i)
            i += 1
          }
          nextRow
        }

        override def hasNext = columnAccessors.head.hasNext
      }
    }
  }

使用

在SqlContext里选择cache table的时候,会使用该类。

其实在cache的时候,首先去catalog里寻找这个table的信息和table的执行计划,然后会进行执行(执行到物理执行计划生成),然后把这个table再放回catalog里维护起来,这个时候的执行计划已经是最终要执行的物理执行计划了。但是此时Columner模块相关的转换等操作都是没有触发的。

真正的触发还是在execute()的时候,同其他SparkPlan的execute()方法触发场景是一样的。

ColumnBuilder 与 ColumnAccessor

一个包装Row的每个field成Column;一个访问column,然后可以转回Row

关于压缩

private[sql] abstract class NativeColumnBuilder[T <: NativeType](
    override val columnStats: NativeColumnStats[T],
    override val columnType: NativeColumnType[T])
  extends BasicColumnBuilder[T, T#JvmType](columnStats, columnType)
  with NullableColumnBuilder
  with AllCompressionSchemes
  with CompressibleColumnBuilder[T]

private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)

private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT)

从继承结构看,压缩的builder和Accessor都以trait的方式继承了ColumnBuilder,而子类比如IntColumnBuilder,不但继承了BaseColumnBuilder,同时也具备压缩处理能力。

 

具体压缩处理可以参考CompressibleColumnBuilder类里的实现。

是否压缩会做一次判断,压缩比在0.8以下才执行压缩。

在build()的时候实施压缩,并且按照以下结构存在bytebuffer内。

 *    .--------------------------- Column type ID (4 bytes)
 *    |   .----------------------- Null count N (4 bytes)
 *    |   |   .------------------- Null positions (4 x N bytes, empty if null count is zero)
 *    |   |   |     .------------- Compression scheme ID (4 bytes)
 *    |   |   |     |   .--------- Compressed non-null elements
 *    V   V   V     V   V
 *    +---+---+-----+---+---------+
 *    |   |   | ... |   | ... ... |
 *    +---+---+-----+---+---------+
 *    \-----------/ \-----------/
 *        header         body

CompressionScheme子类是不同的压缩实现



都是scala实现的,未借助第三方库。不同的实现,指定了支持的column data类型。在build()的时候,会比较每种压缩,选择压缩率最小的(若仍大于0.8就不压缩了)。

这里的估算能力,在子类实现里,好像是由gatherCompressibilityStats方法实现的。

SqlContext

分析SqlContext内目前cache和uncache table的实现细节与Columnar的关系。

Cache Table

  /** Caches the specified table in-memory. */
  def cacheTable(tableName: String): Unit = {
    // 得到的是一个logicalPlan
    val currentTable = catalog.lookupRelation(None, tableName)

    // 物理执行计划生成之后交给InMemoryColumnarTableScan
    val asInMemoryRelation =
      InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan)

    // SparkLogicalPlan接受的Plan必须是已经确定plan好的SparkPlan
    catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
  }

从上面那段代码可以看到,cache之前,需要先把本次cache的table的物理执行计划生成出来。上述的currentTable其实是一个logicalPlan,来自catalog的lookupRelation。

最后注册表的时候,涉及到的SparkLogicalPlan类是LogicalPlan的实现类(但是本身其实是一个SparkPlan),它接受的是SparkPlan,并且是已经确定Plan好了的逻辑执行计划,目前接受两类:ExistingRdd和InMemoryColumnarTableScan。

 

在cache这个过程里,InMemoryColumnarTableScan并没有执行,但是生成了以InMemoryColumnarTableScan为物理执行计划的SparkLogicalPlan,并存成table的plan。

Uncache Table

在这一步,除了删除catalog里的table信息之外,还调用了InMemoryColumnarTableScan的cacheColumnBuffers方法,得到RDD集合,并进行了unpersist()操作。cacheColumnBuffers方法具体见Columner内,主要做了把RDD每个partition里的ROW的每个Field存到了ColumnBuilder内。

全文完 :)

时间: 2024-08-31 08:17:46

Spark SQL Columnar模块源码分析的相关文章

Spark MLlib - Decision Tree源码分析

以决策树作为开始,因为简单,而且也比较容易用到,当前的boosting或random forest也是常以其为基础的 决策树算法本身参考之前的blog,其实就是贪婪算法,每次切分使得数据变得最为有序   那么如何来定义有序或无序? 无序,node impurity  对于分类问题,我们可以用熵entropy或Gini来表示信息的无序程度  对于回归问题,我们用方差Variance来表示无序程度,方差越大,说明数据间差异越大 information gain 用于表示,由父节点划分后得到子节点,所

《深入理解Spark:核心思想与源码分析》——第3章SparkContext的初始化

第3章 SparkContext的初始化 道生一, 一生二, 二生三, 三生万物. -<道德经> 本章导读 SparkContext的初始化是Driver应用程序提交执行的前提,本章内容以local模式为主,并按照代码执行顺序讲解,这将有助于首次接触Spark的读者理解源码.读者朋友如果能边跟踪代码,边学习本章内容,也许是快速理解SparkContext初始化过程的便捷途径.已经熟练使用Spark的开发人员可以选择跳过本章内容. 本章将在介绍SparkContext初始化过程的同时,向读者介绍

《深入理解Spark:核心思想与源码分析》——2.3节Spark基本设计思想

2.3 Spark基本设计思想2.3.1 Spark模块设计 整个Spark主要由以下模块组成: Spark Core:Spark的核心功能实现,包括:SparkContext的初始化(Driver Application通过SparkContext提交).部署模式.存储体系.任务提交与执行.计算引擎等. Spark SQL:提供SQL处理能力,便于熟悉关系型数据库操作的工程师进行交互查询.此外,还为熟悉Hadoop的用户提供Hive SQL处理能力. Spark Streaming:提供流式计

《深入理解Spark:核心思想与源码分析》——第1章环境准备

第1章 环 境 准 备 凡事豫则立,不豫则废:言前定,则不跲:事前定,则不困. -<礼记·中庸> 本章导读 在深入了解一个系统的原理.实现细节之前,应当先准备好它的源码编译环境.运行环境.如果能在实际环境安装和运行Spark,显然能够提升读者对于Spark的一些感受,对系统能有个大体的印象,有经验的技术人员甚至能够猜出一些Spark采用的编程模型.部署模式等.当你通过一些途径知道了系统的原理之后,难道不会问问自己:"这是怎么做到的?"如果只是游走于系统使用.原理了解的层面,

《深入理解Spark:核心思想与源码分析》——1.4节Spark源码编译与调试

1.4 Spark源码编译与调试 1.下载Spark源码 首先,访问Spark官网http://spark.apache.org/,如图1-18所示. 2.构建Scala应用 使用cmd命令行进到Spark根目录,执行sbt命令.会下载和解析很多jar包,要等很长时间,笔者大概花了一个多小时才执行完. 3.使用sbt生成Eclipse工程文件 等sbt提示符(>)出现后,输入Eclipse命令,开始生成Eclipse工程文件,也需要花费很长时间,笔者本地大致花了40分钟.完成时的状况如图1-21

《深入理解Spark:核心思想与源码分析》——1.5节小结

1.5 小结 本章通过引导大家在Linux操作系统下搭建基本的执行环境,并且介绍spark-shell等脚本的执行,来帮助读者由浅入深地进行Spark源码的学习.由于目前多数开发工作都在Windows系统下进行,并且Eclipse有最广大的用户群,即便是一些开始使用IntelliJ的用户对Eclipse也不陌生,所以在Windows环境下搭建源码阅读环境时,选择这些最常用的工具,能降低读者的学习门槛,并且替大家节省时间.

《深入理解Spark:核心思想与源码分析》——2.2节Spark基础知识

2.2 Spark基础知识 1.版本变迁 经过4年多的发展,Spark目前的版本是1.4.1.我们简单看看它的版本发展过程. 1)Spark诞生于UCBerkeley的AMP实验室(2009). 2)Spark正式对外开源(2010年). 3)Spark 0.6.0版本发布(2012-10-15),进行了大范围的性能改进,增加了一些新特性,并对Standalone部署模式进行了简化. 4)Spark 0.6.2版本发布(2013-02-07),解决了一些bug,并增强了系统的可用性. 5)Spa

《深入理解Spark:核心思想与源码分析》——第2章Spark设计理念与基本架构

第2章 Spark设计理念与基本架构 若夫乘天地之正,而御六气之辩,以游无穷者,彼且恶乎待哉? -<庄子·逍遥游> 本章导读 上一章,介绍了Spark环境的搭建,为方便读者学习Spark做好准备.本章首先从Spark产生的背景开始,介绍Spark的主要特点.基本概念.版本变迁.然后简要说明Spark的主要模块和编程模型.最后从Spark的设计理念和基本架构入手,使读者能够对Spark有宏观的认识,为之后的内容做一些准备工作. Spark是一个通用的并行计算框架,由加州伯克利大学(UCBerke

《深入理解Spark:核心思想与源码分析》——3.6节创建任务调度器TaskScheduler

3.6 创建任务调度器TaskScheduler TaskScheduler也是SparkContext的重要组成部分,负责任务的提交,并且请求集群管理器对任务调度.TaskScheduler也可以看做任务调度的客户端.创建TaskScheduler的代码如下. private[spark] var (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master) createTaskSchedu