Spark-SparkSQL深入学习系列九(转自OopsOutOfMemory)

  /** Spark SQL源码分析系列文章*/

    Spark SQL 可以将数据缓存到内存中,我们可以见到的通过调用cache table tableName即可将一张表缓存到内存中,来极大的提高查询效率。

    这就涉及到内存中的数据的存储形式,我们知道基于关系型的数据可以存储为基于行存储结构 或 者基于列存储结构,或者基于行和列的混合存储,即Row Based Storage、Column Based Storage、 PAX Storage。

    Spark SQL 的内存数据是如何组织的?

    Spark SQL 将数据加载到内存是以列的存储结构。称为In-Memory Columnar Storage。

    若直接存储Java Object 会产生很大的内存开销,并且这样是基于Row的存储结构。查询某些列速度略慢,虽然数据以及载入内存,查询效率还是低于面向列的存储结构。

基于Row的Java Object存储:

内存开销大,且容易FULL GC,按列查询比较慢。


基于Column的ByteBuffer存储(Spark SQL):

内存开销小,按列查询速度较快。


    Spark SQL的In-Memory Columnar Storage是位于spark列下面org.apache.spark.sql.columnar包内:

    核心的类有 ColumnBuilder,  InMemoryColumnarTableScan, ColumnAccessor, ColumnType.

    如果列有压缩的情况:compression包下面有具体的build列和access列的类。

    

一、引子

    当我们调用spark sql 里的cache table command时,会生成一CacheCommand,这个Command是一个物理计划。

[java] view
plain
 copy

  1. scala> val cached = sql("cache table src")  

[java] view
plain
 copy

  1. cached: org.apache.spark.sql.SchemaRDD =   
  2. SchemaRDD[0] at RDD at SchemaRDD.scala:103  
  3. == Query Plan ==  
  4. == Physical Plan ==  
  5. CacheCommand src, true  

这里打印出来tableName是src, 和一个是否要cache的boolean flag.

我们看下CacheCommand的构造:

CacheCommand支持2种操作,一种是把数据源加载带内存中,一种是将数据源从内存中卸载。

对应于SQLContext下的cacheTable和uncacheTabele。  

[java] view
plain
 copy

  1. case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext)  
  2.   extends LeafNode with Command {  
  3.   
  4.   override protected[sql] lazy val sideEffectResult = {  
  5.     if (doCache) {  
  6.       context.cacheTable(tableName) //缓存表到内存  
  7.     } else {  
  8.       context.uncacheTable(tableName)//从内存中移除该表的数据  
  9.     }  
  10.     Seq.empty[Any]  
  11.   }  
  12.   override def execute(): RDD[Row] = {  
  13.     sideEffectResult  
  14.     context.emptyResult  
  15.   }  
  16.   override def output: Seq[Attribute] = Seq.empty  
  17. }  

如果调用cached.collect(),则会根据Command命令来执行cache或者uncache操作,这里我们执行cache操作。

cached.collect()将会调用SQLContext下的cacheTable函数:

首先通过catalog查询关系,构造一个SchemaRDD。

[java] view
plain
 copy

  1. /** Returns the specified table as a SchemaRDD */  
  2. def table(tableName: String): SchemaRDD =  
  3.   new SchemaRDD(this, catalog.lookupRelation(None, tableName))  

找到该Schema的analyzed计划。匹配构造InMemoryRelation:

[java] view
plain
 copy

  1. /** Caches the specified table in-memory. */  
  2. def cacheTable(tableName: String): Unit = {  
  3.   val currentTable = table(tableName).queryExecution.analyzed //构造schemaRDD并将其执行analyze计划操作  
  4.   val asInMemoryRelation = currentTable match {  
  5.     case _: InMemoryRelation => //如果已经是InMemoryRelation,则返回  
  6.       currentTable.logicalPlan  
  7.   
  8.     case _ => //如果不是(默认刚刚cache的时候是空的)则构建一个内存关系InMemoryRelation  
  9.       InMemoryRelation(useCompression, columnBatchSize, executePlan(currentTable).executedPlan)  
  10.   }  
  11.   //将构建好的InMemoryRelation注册到catalog里。  
  12.   catalog.registerTable(None, tableName, asInMemoryRelation)  
  13. }  

二、InMemoryRelation

 InMemoryRelation继承自LogicalPlan,是Spark1.1 Spark SQL里新添加的一种TreeNode,也是catalyst里的一种plan. 现在TreeNode变成了4种:

1、BinaryNode 二元节点

2、LeafNode 叶子节点

3、UnaryNode 单孩子节点

4、InMemoryRelation 内存关系型节点

 

类图如下:

值得注意的是,_cachedColumnBuffers这个类型为RDD[Array[ByteBuffer]]的私有字段。

这个封装就是面向列的存储ByteBuffer。前面提到相较于plain java object存储记录,用ByteBuffer能显著的提高存储效率,减少内存占用。并且按列查询的速度会非常快。

InMemoryRelation具体实现如下:

构造一个InMemoryRelation需要该Relation的output Attributes,是否需要useCoompression来压缩,默认为false,一次处理的多少行数据batchSize, child 即SparkPlan。

[java] view
plain
 copy

  1. private[sql] case class InMemoryRelation(  
  2.     output: Seq[Attribute], //输出属性,比如src表里就是[key,value]  
  3.     useCompression: Boolean, //操作时是否使用压缩,默认false  
  4.     batchSize: Int, //批的大小量  
  5.     child: SparkPlan) //spark plan 具体child  

可以通过设置:

spark.sql.inMemoryColumnarStorage.compressed 为true来设置内存中的列存储是否需要压缩。

spark.sql.inMemoryColumnarStorage.batchSize 来设置一次处理多少row

spark.sql.defaultSizeInBytes 来设置初始化的column的bufferbytes的默认大小,这里只是其中一个参数。

这些参数都可以在源码中设置,都在SQL Conf

[java] view
plain
 copy

  1. private[spark] object SQLConf {  
  2.   val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"  
  3.   val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"   
  4.   val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"  

 再回到case class InMemoryRelation:

_cachedColumnBuffers就是我们最终将table放入内存的存储句柄,是一个RDD[Array[ByteBuffer]。

缓存主流程:

1、判断_cachedColumnBuffers是否为null,如果不是null,则已经Cache了当前table,重复cache不会触发cache操作。

2、child是SparkPlan,即执行hive table scan,测试我拿sbt/sbt hive/console里test里的src table为例,操作是扫描这张表。这个表有2个字的key是int, value 是string

3、拿到child的output, 这里的output就是 key, value2个列。

4、执行mapPartitions操作,对当前RDD的每个分区的数据进行操作。

5、对于每一个分区,迭代里面的数据生成新的Iterator。每个Iterator里面是Array[ByteBuffer]

6、对于child.output的每一列,都会生成一个ColumnBuilder,最后组合为一个columnBuilders是一个数组。

7、数组内每个CommandBuilder持有一个ByteBuffer

8、遍历原始分区的记录,将对于的行转为列,并将数据存到ByteBuffer内。

9、最后将此RDD调用cache方法,将RDD缓存。

10、将cached赋给_cachedColumnBuffers。

此操作总结下来是:执行hive table scan操作,返回的MapPartitionsRDD对其重新定义mapPartition方法,将其行转列,并且最终cache到内存中。

所有流程如下:

[java] view
plain
 copy

  1. // If the cached column buffers were not passed in, we calculate them in the constructor.  
  2. // As in Spark, the actual work of caching is lazy.  
  3. if (_cachedColumnBuffers == null) { //判断是否已经cache了当前table  
  4.   val output = child.output  
  5.     /** 
  6.          * child.output 
  7.         res65: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(key#6, value#7) 
  8.          */  
  9.   val cached = child.execute().mapPartitions { baseIterator =>  
  10.     /** 
  11.      * child.execute()是Row的集合,迭代Row 
  12.      * res66: Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([238,val_238]) 
  13.      *  
  14.      * val row1 = child.execute().take(1) 
  15.      * res67: Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([238,val_238]) 
  16.      * */  
  17.     /* 
  18.      * 对每个Partition进行map,映射生成一个Iterator[Array[ByteBuffer],对应java的Iterator<List<ByteBuffer>> 
  19.      * */  
  20.     new Iterator[Array[ByteBuffer]] {  
  21.       def next() = {  
  22.         //遍历每一列,首先attribute是key 为 IntegerType ,然后attribute是value是String  
  23.         //最后封装成一个Array, index 0 是 IntColumnBuilder, 1 是StringColumnBuilder  
  24.         val columnBuilders = output.map { attribute =>  
  25.           val columnType = ColumnType(attribute.dataType)  
  26.           val initialBufferSize = columnType.defaultSize * batchSize  
  27.           ColumnBuilder(columnType.typeId, initialBufferSize, attribute.name, useCompression)  
  28.         }.toArray  
  29.         //src表里Row是[238,val_238] 这行Row的length就是2  
  30.         var row: Row = null  
  31.         var rowCount = 0  
  32.         //batchSize默认1000  
  33.         while (baseIterator.hasNext && rowCount < batchSize) {  
  34.           //遍历每一条记录  
  35.           row = baseIterator.next()  
  36.           var i = 0  
  37.           //这里row length是2,i的取值是0 和 1  
  38.           while (i < row.length) {  
  39.             //获取columnBuilders, 0是IntColumnBuilder,   
  40.             //BasicColumnBuilder的appendFrom  
  41.             //Appends `row(ordinal)` to the column builder.  
  42.             columnBuilders(i).appendFrom(row, i)  
  43.             i += 1  
  44.           }  
  45.           //该行已经插入完毕  
  46.           rowCount += 1  
  47.         }  
  48.         //limit and rewind,Returns the final columnar byte buffer.  
  49.         columnBuilders.map(_.build())  
  50.       }  
  51.   
  52.       def hasNext = baseIterator.hasNext  
  53.     }  
  54.   }.cache()  
  55.   
  56.   cached.setName(child.toString)  
  57.   _cachedColumnBuffers = cached  
  58. }  

三、Columnar Storage

初始化ColumnBuilders:

[java] view
plain
 copy

  1. val columnBuilders = output.map { attribute =>  
  2.               val columnType = ColumnType(attribute.dataType)  
  3.               val initialBufferSize = columnType.defaultSize * batchSize  
  4.               ColumnBuilder(columnType.typeId, initialBufferSize, attribute.name, useCompression)  
  5.             }.toArray  

这里会声明一个数组,来对应每一列的存储,如下图:

然后初始化类型builder的时候会传入的参数:

initialBufferSize:文章开头的图中会有ByteBuffer,ByteBuffer的初始化大小是如何计算的?

initialBufferSize = 列类型默认长度 × batchSize ,默认batchSize是1000

拿Int类型举例,initialBufferSize of IntegerType = 4 * 1000 

attribute.name即字段名age,name etc。。。

ColumnType:

ColumnType封装了 该类型的 typeId  和  该类型的 defaultSize。并且提供了extract、append\getField方法,来向buffer里追加和获取数据。

如IntegerType  typeId 为0, defaultSize 4 ......

详细看下类图,画的不是非常严格的类图,主要为了展示目前类型系统:

ColumnBuilder:

ColumnBuilder的主要职责是:管理ByteBuffer,包括初始化buffer,添加数据到buffer内,检查剩余空间,和申请新的空间这几项主要职责。

initialize负责初始化buffer。

appendFrom是负责添加数据。

ensureFreeSpace确保buffer的长度动态增加。

类图如下:

ByteBuffer的初始化过程:

初始化大小initialSize:拿Int举例,在前面builder初始化传入的是4×batchSize=4*1000,initialSize也就是4KB,如果没有传入initialSize,则默认是1024×1024。

列名称,是否需要压缩,都是需要传入的。

ByteBuffer声明时预留了4个字节,为了放column type id,这个在ColumnType的构造里有介绍过。

[java] view
plain
 copy

  1. override def initialize(  
  2.     initialSize: Int,  
  3.     columnName: String = "",  
  4.     useCompression: Boolean = false) = {  
  5.   
  6.   val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize //如果没有默认1024×1024 byte  
  7.   this.columnName = columnName  
  8.   
  9.   // Reserves 4 bytes for column type ID  
  10.   buffer = ByteBuffer.allocate(4 + size * columnType.defaultSize) // buffer的初始化长度,需要加上4byte类型ID空间。  
  11.   buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)//根据nativeOrder排序,然后首先放入typeId  
  12. }  

存储的方式如下:

Int的type id 是0, string的 type id 是 7. 后面就是实际存储的数据了。

ByteBuffer写入过程:

存储结构都介绍完毕,最后开始对Table进行scan了,scan后对每一个分区的每个Row进行操作遍历:

1、读每个分区的每条Row

2、获取每个列的值,从builders数组里找到索引 i 对应的bytebuffer,追加至bytebuffer。

[java] view
plain
 copy

  1. while (baseIterator.hasNext && rowCount < batchSize) {  
  2.            //遍历每一条记录  
  3.            row = baseIterator.next()  
  4.            var i = 0  
  5.            //这里row length是2,i的取值是0 和 1 Ps:还是拿src table做测试,每一个Row只有2个字段,key, value所有长度为2  
  6.            while (i < row.length) {  
  7.              //获取columnBuilders, 0是IntColumnBuilder,   
  8.              //BasicColumnBuilder的appendFrom  
  9.              //Appends `row(ordinal)` to the column builder.  
  10.              columnBuilders(i).appendFrom(row, i) //追加到对应的bytebuffer  
  11.              i += 1  
  12.            }  
  13.            //该行已经插入完毕  
  14.            rowCount += 1  
  15.          }  
  16.          //limit and rewind,Returns the final columnar byte buffer.  
  17.          columnBuilders.map(_.build())  

追加过程:

根据当前builder的类型,从row的对应索引中取出值,最后追加到builder的bytebuffer内。

[java] view
plain
 copy

  1. override def appendFrom(row: Row, ordinal: Int) {  
  2.   //ordinal是Row的index,0就是第一列值,1就是第二列值,获取列的值为field  
  3.   //最后在将该列的值put到该buffer内  
  4.   val field = columnType.getField(row, ordinal)  
  5.   buffer = ensureFreeSpace(buffer, columnType.actualSize(field))//动态扩容  
  6.   columnType.append(field, buffer)  
  7. }  

ensureFreeSpace:

主要是操作buffer,如果要追加的数据大于剩余空间,就扩大buffer。

[java] view
plain
 copy

  1. //确保剩余空间能容下,如果剩余空间小于 要放入的大小,则重新分配一看内存空间  
  2. private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = {  
  3.   if (orig.remaining >= size) { //当前buffer剩余空间比要追加的数据大,则什么都不做,返回自身  
  4.     orig  
  5.   } else { //否则扩容  
  6.     // grow in steps of initial size  
  7.     val capacity = orig.capacity()  
  8.     val newSize = capacity + size.max(capacity / 8 + 1)  
  9.     val pos = orig.position()  
  10.   
  11.     orig.clear()  
  12.     ByteBuffer  
  13.       .allocate(newSize)  
  14.       .order(ByteOrder.nativeOrder())  
  15.       .put(orig.array(), 0, pos)  
  16.   }  
  17. }  

......

最后调用MapPartitionsRDD.cache(),将该RDD缓存并添加到spark cache管理中。

至此,我们将一张spark sql table缓存到了spark的jvm中。

四、总结

    对于数据的存储结构,我们常常关注持久化的存储结构,并且在长久时间内有了很多种高效结构。

    但是在实时性的要求下,内存数据库越来越被关注,如何优化内存数据库的存储结构,是一个重点,也是一个难点。

    对于Spark SQL 和 Shark 里的列存储 是一种优化方案,提高了关系查询中列查询的速度,和减少了内存占用。但是中存储方式还是比较简单的,没有额外的元数据和索引来提高查询效率,希望以后能了解到更多的In-Memory Storage。

——EOF——

创文章,转载请注明:

转载自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

本文链接地址:http://blog.csdn.net/oopsoom/article/details/39525483

注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

时间: 2024-10-23 08:02:04

Spark-SparkSQL深入学习系列九(转自OopsOutOfMemory)的相关文章

Spark修炼之道系列教程预告

课程内容 Spark修炼之道(基础篇)--Linux基础(15讲).Akka分布式编程(8讲) Spark修炼之道(进阶篇)--Spark入门到精通(30讲) Spark修炼之道(实战篇)--Spark应用开发实战篇(20讲) Spark修炼之道(高级篇)--Spark源码解析(50讲) 部分内容会在实际编写时动态调整,或补充.或删除. Spark修炼之道(基础篇)--Linux大数据开发基础(15讲). Linux大数据开发基础--第一节:Ubuntu Linux安装与介绍 Linux大数据开

Asp.Net Web API 2 官网菜鸟学习系列导航[持续更新中]

原文:Asp.Net Web API 2 官网菜鸟学习系列导航[持续更新中] 前言 本来一直参见于微软官网进行学习的, 官网网址http://www.asp.net/web-api.出于自己想锻炼一下学习阅读英文文章的目的,又可以学习下微软新发布的技术,其实也很久了,但自己菜鸟一枚,对自己来说都是新技术了.鉴于以上两个原因,本人打算借助google翻译和有道词典,来翻译学习这个系列,并通过博客园来记录自己的翻译学习过程.由于自己阅读水平的确太菜,在借助工具的情况下,有时候搞出来的也是蹩脚的语句,

kvm虚拟化学习笔记(九)之kvm虚拟机时间配置

原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 .作者信息和本声明.否则将追究法律责任.http://koumm.blog.51cto.com/703525/1291862 KVM虚拟化学习笔记系列文章列表 ---------------------------------------- kvm虚拟化学习笔记(一)之kvm虚拟化环境安装http://koumm.blog.51cto.com/703525/1288795 kvm虚拟化学习笔记(二)之linux kvm虚拟机安装 h

ExtJs2.0学习系列(2)--Ext.Panel

上一篇文章ExtJs2.0学习系列(1)--Ext.MessageBox ,受到了大家的褒贬不一,还是有的朋友提出好的建议,在此表示感谢! 今天介绍extjs中的Panel组件. //html代码 <div id="container"> </div> //js代码 var p = new Ext.Panel({ title: 'My Panel',//标题 collapsible:true,//右上角上的那个收缩按钮,设为false则不显示 renderTo:

ExtJs2.0学习系列(6)--Ext.FormPanel之第三式(ComboBox篇)

前言:说句实话,此extjs系列的文章在博客园中的热度不高,可能是学这玩意的人不多吧,但是我觉得有这么个系列的文章对于中国朋友非常有帮助!请大家支持! 上篇ExtJs2.0学习系列(5)--Ext.FormPanel之第二式中我们讨论了下fieldset和表单验证的知识,今天我们接着深入解析表单元素中ComboBox组件的使用.会涉及 到.net简单服务器数据交互,但暂不做深入讨论,以后会详细分析服务器交互相关,不过可能要等较长一段时间,呵呵! 5.服务器数据作为ComboBox的数据源实例 首

JAVA/JSP学习系列之八(改写MySQL翻页例子)

js|mysql|翻页 一.前言 其实,改写后的JDBC Data-Source是运行在Servlet中的,通过JNDI去查找数据源.我用Orion试的,将本站<JAVA/JSP学习系列之六(MySQL翻页例子) > 简单改写了一下. 二.配置 (1)JDBC 需要将用到的JDBC驱动Copy到[ORION]/lib目录下 (2)data-source 在[ORION]/config/data-sources.xml文件中加入如下: 〈data-source class="com.e

ExtJs2.0学习系列

ExtJs2.0学习系列(15)--extjs换肤 ExtJs2.0学习系列(14)--Ext.TreePanel之第三式(可增删改的树) ExtJs2.0学习系列(13)--Ext.TreePanel之第二式 ExtJs2.0学习系列(12)--Ext.TreePanel之第一式 ExtJs2.0学习系列(11)--Ext.XTemplate ExtJs2.0学习系列(10)--Ext.TabPanel之第二式 ExtJs2.0学习系列(9)--Ext.TabPanel之第一式 ExtJs2.

ExtJs2.0学习系列(12)--Ext.TreePanel之第一式

今天开始,我们就开始一起学习TreePanel了,道个歉,上篇的代码很乱阿. 我总是喜欢用最简单的例子开始,去理解最基本的使用方法,减少对i后面高级使用的干扰! TreePanel是继承自Panel,所以很多在Panel中谈到的属性这里可能会一笔带过,如有问题,请参考ExtJs2.0学习系列(2)--Ext.Panel 1.第一个静态树--最简单的树 效果图: html代码: <div id="container"> </div> js代码: Ext.onRea

ExtJs2.0学习系列(3)--Ext.Window

前面介绍了panel组件--ExtJs2.0学习系列(2)--Ext.Panel,今天将介绍window组件,它继承自panel. 先介绍个最简单例子 //html代码 <div id="win" class="x-hidden"> </div> //js代码 var w=new Ext.Window({ contentEl:"win",//主体显示的html元素,也可以写为el:"win" width