Spark-SparkSQL深入学习系列十(转自OopsOutOfMemory)

    /** Spark SQL源码分析系列文章*/

    前面讲到了Spark SQL In-Memory Columnar Storage的存储结构是基于列存储的。

    那么基于以上存储结构,我们查询cache在jvm内的数据又是如何查询的,本文将揭示查询In-Memory Data的方式。

一、引子

本例使用hive console里查询cache后的src表。

select value from src

当我们将src表cache到了内存后,再次查询src,可以通过analyzed执行计划来观察内部调用。

即parse后,会形成InMemoryRelation结点,最后执行物理计划时,会调用InMemoryColumnarTableScan这个结点的方法。

如下:

[java] view
plain
 copy

  1. scala> val exe = executePlan(sql("select value from src").queryExecution.analyzed)  
  2. 14/09/26 10:30:26 INFO parse.ParseDriver: Parsing command: select value from src  
  3. 14/09/26 10:30:26 INFO parse.ParseDriver: Parse Completed  
  4. exe: org.apache.spark.sql.hive.test.TestHive.QueryExecution =   
  5. == Parsed Logical Plan ==  
  6. Project [value#5]  
  7.  InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)  
  8.   
  9. == Analyzed Logical Plan ==  
  10. Project [value#5]  
  11.  InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)  
  12.   
  13. == Optimized Logical Plan ==  
  14. Project [value#5]  
  15.  InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)  
  16.   
  17. == Physical Plan ==  
  18. InMemoryColumnarTableScan [value#5], (InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)) //查询内存中表的入口  
  19.   
  20. Code Generation: false  
  21. == RDD ==  

二、InMemoryColumnarTableScan

InMemoryColumnarTableScan是Catalyst里的一个叶子结点,包含了要查询的attributes,和InMemoryRelation(封装了我们缓存的In-Columnar Storage数据结构)。

执行叶子节点,出发execute方法对内存数据进行查询。

1、查询时,调用InMemoryRelation,对其封装的内存数据结构的每个分区进行操作。

2、获取要请求的attributes,如上,查询请求的是src表的value属性。

3、根据目的查询表达式,来获取在对应存储结构中,请求列的index索引。

4、通过ColumnAccessor来对每个buffer进行访问,获取对应查询数据,并封装为Row对象返回。

[java] view
plain
 copy

  1. private[sql] case class InMemoryColumnarTableScan(  
  2.     attributes: Seq[Attribute],  
  3.     relation: InMemoryRelation)  
  4.   extends LeafNode {  
  5.   
  6.   
  7.   override def output: Seq[Attribute] = attributes  
  8.   
  9.   
  10.   override def execute() = {  
  11.     relation.cachedColumnBuffers.mapPartitions { iterator =>  
  12.       // Find the ordinals of the requested columns.  If none are requested, use the first.  
  13.       val requestedColumns = if (attributes.isEmpty) {  
  14.         Seq(0)  
  15.       } else {  
  16.         attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) //根据表达式exprId找出对应列的ByteBuffer的索引  
  17.       }  
  18.   
  19.   
  20.       iterator  
  21.         .map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_)))//根据索引取得对应请求列的ByteBuffer,并封装为ColumnAccessor。  
  22.         .flatMap { columnAccessors =>  
  23.           val nextRow = new GenericMutableRow(columnAccessors.length) //Row的长度  
  24.           new Iterator[Row] {  
  25.             override def next() = {  
  26.               var i = 0  
  27.               while (i < nextRow.length) {  
  28.                 columnAccessors(i).extractTo(nextRow, i) //根据对应index和长度,从byterbuffer里取得值,封装到row里  
  29.                 i += 1  
  30.               }  
  31.               nextRow  
  32.             }  
  33.   
  34.   
  35.             override def hasNext = columnAccessors.head.hasNext  
  36.           }  
  37.         }  
  38.     }  
  39.   }  
  40. }  

查询请求的列,如下:

[java] view
plain
 copy

  1. scala> exe.optimizedPlan  
  2. res93: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
  3. Project [value#5]  
  4.  InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)  
  5.   
  6.   
  7. scala> val relation =  exe.optimizedPlan(1)  
  8. relation: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   
  9. InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)  
  10.   
  11.   
  12. scala> val request_relation = exe.executedPlan  
  13. request_relation: org.apache.spark.sql.execution.SparkPlan =   
  14. InMemoryColumnarTableScan [value#5], (InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None))  
  15.   
  16.   
  17. scala> request_relation.output //请求的列,我们请求的只有value列  
  18. res95: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(value#5)  
  19.   
  20. scala> relation.output //默认保存在relation中的所有列  
  21. res96: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(key#4, value#5)  
  22.   
  23.   
  24. scala> val attributes = request_relation.output   
  25. attributes: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(value#5)  

整个流程很简洁,关键步骤是第三步。根据ExprId来查找到,请求列的索引

attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))

[java] view
plain
 copy

  1. //根据exprId找出对应ID  
  2. scala> val attr_index = attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))  
  3. attr_index: Seq[Int] = ArrayBuffer(1) //找到请求的列value的索引是1, 我们查询就从Index为1的bytebuffer中,请求数据  
  4.   
  5. scala> relation.output.foreach(e=>println(e.exprId))  
  6. ExprId(4)    //对应<span style="font-family: Arial, Helvetica, sans-serif;">[key#4,value#5]</span>  
  7. ExprId(5)  
  8.   
  9. scala> request_relation.output.foreach(e=>println(e.exprId))  
  10. ExprId(5)  

三、ColumnAccessor

ColumnAccessor对应每一种类型,类图如下:

最后返回一个新的迭代器:

[java] view
plain
 copy

  1. new Iterator[Row] {  
  2.   override def next() = {  
  3.     var i = 0  
  4.     while (i < nextRow.length) { //请求列的长度  
  5.       columnAccessors(i).extractTo(nextRow, i)//调用columnType.setField(row, ordinal, extractSingle(buffer))解析buffer  
  6.       i += 1  
  7.     }  
  8.     nextRow//返回解析后的row  
  9.   }  
  10.   
  11.   override def hasNext = columnAccessors.head.hasNext  
  12. }  

四、总结

    Spark SQL In-Memory Columnar Storage的查询相对来说还是比较简单的,其查询思想主要和存储的数据结构有关。

    即存储时,按每列放到一个bytebuffer,形成一个bytebuffer数组。

    查询时,根据请求列的exprId查找到上述数组的索引,然后使用ColumnAccessor对buffer中字段进行解析,最后封装为Row对象,返回。

——EOF——

创文章,转载请注明:

转载自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

本文链接地址:http://blog.csdn.net/oopsoom/article/details/39577419

注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

时间: 2024-07-31 06:00:04

Spark-SparkSQL深入学习系列十(转自OopsOutOfMemory)的相关文章

WorldWind系列十五:如何切割影像和DEM数据及其在WW中的应用配置

WorldWind学习系列十四中我从代码上分析如何加载DEM数据,里面涉及了算法,有学习和借鉴意义.但对于只求应用或者说是急于求成的网友来说,实用价值不是太大!我们分析代码是一种学习过程,不是目的,终究要落在如何应用自己的影像和DEM数据.如何基于自己的数据开发满足自己项目需求的新的WW插件.这是一过程,需要循序渐进,今天只跟大家分享一下如何切割自己的影像或DEM,及如何设置相应的XML配置. 我的学习和实践,主要是参考http://worldwindcentral.com/wiki/Dstil

【玩转数据系列十二】PAI平台深度学习Caffe框架实现图像分类的模型训练

PAI平台深度学习Caffe框架实现图像分类的模型训练 背景 我们在之前的文章中介绍过如何通过PAI内置的TensorFlow框架实验基于Cifar10的图像分类,文章链接:https://yq.aliyun.com/articles/72841.使用Tensorflow做深度学习做深度学习的网络搭建和训练需要通过PYTHON代码才能使用,对于不太会写代码的同学还是有一定的使用门槛的.本文将介绍另一个深度学习框架Caffe,通过Caffe只需要填写一些配置文件就可以实现图像分类的模型训练.关于P

Silverlight &amp;amp; Blend动画设计系列十

Silverlight & Blend动画设计系列十:Silverlight中的坐标系统(Coordinate System)与向量(Vector)运动 如果我们习惯于数学坐标系,那么对于Silverlight中的坐标系可能会有些不习惯.因为 在Silverlight中的坐标系与Flash中的坐标系一样,一切都的颠倒的.在标准的数学坐标系 中,X轴表示水平轴,Y轴表是垂直轴,然而Silverlight中的坐标系是基于视频屏幕的坐标系 . Silverlight中的坐标系统和Flash中的坐标系统

【玩转数据系列十四】如何通过PAI实现云端实时心脏状况监测

背景 我们通过之前的案例已经为大家介绍了如何通过常规的体检数据预测心脏病的发生,请见https://yq.aliyun.com/articles/54260.通过前文的案例我们可以生成一个算法模型,通过向这个模型输入用户实时的体检数据就会返回用户患有心胀病的概率.那么我们该如何搭建这套实时监测用户健康情况的服务呢?PAI最新推出的在线预测服务帮您实现.目前,机器学习PAI已经支持实验模型一键部署到云端生成API,通过向这个API推送用户的实时体检数据,就可以实时拿到反馈结果,做到心脏状况的云端的

kvm虚拟化学习笔记(十五)之kvm虚拟机动态迁移

原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 .作者信息和本声明.否则将追究法律责任.http://koumm.blog.51cto.com/703525/1300783 相比KVM虚拟机静态迁移中需要拷贝虚拟机虚拟磁盘文件,kvm虚拟机动态迁移无需拷贝虚拟磁盘文件,但是需要迁移到的虚拟主机之间需要有相同的目录结构虚拟机磁盘文件,本文这部分内容通过nfs来实现,当然也可以采用GFS2集群文件系统来实现,本文的动态迁移是基于共享存储动态迁移. KVM动态迁移目前有两种,一种是基于

kvm虚拟化学习笔记(十六)之kvm虚拟化存储池配置

原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 .作者信息和本声明.否则将追究法律责任.http://koumm.blog.51cto.com/703525/1304196 KVM虚拟化学习笔记系列文章列表 ---------------------------------------- kvm虚拟化学习笔记(一)之kvm虚拟化环境安装http://koumm.blog.51cto.com/703525/1288795 kvm虚拟化学习笔记(二)之linuxkvm虚拟机安装htt

kvm虚拟化学习笔记(十二)之kvm linux虚拟机在线扩展磁盘

原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 .作者信息和本声明.否则将追究法律责任.http://koumm.blog.51cto.com/703525/1295296 KVM虚拟化学习笔记系列文章列表 ---------------------------------------- kvm虚拟化学习笔记(一)之kvm虚拟化环境安装http://koumm.blog.51cto.com/703525/1288795 kvm虚拟化学习笔记(二)之linux kvm虚拟机安装 h

kvm虚拟化学习笔记(十八)之ESXi到KVM之v2v迁移

原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 .作者信息和本声明.否则将追究法律责任.http://koumm.blog.51cto.com/703525/1304461 KVM虚拟化学习笔记系列文章列表 ---------------------------------------- kvm虚拟化学习笔记(一)之kvm虚拟化环境安装http://koumm.blog.51cto.com/703525/1288795 kvm虚拟化学习笔记(二)之linuxkvm虚拟机安装htt

Spark修炼之道系列教程预告

课程内容 Spark修炼之道(基础篇)--Linux基础(15讲).Akka分布式编程(8讲) Spark修炼之道(进阶篇)--Spark入门到精通(30讲) Spark修炼之道(实战篇)--Spark应用开发实战篇(20讲) Spark修炼之道(高级篇)--Spark源码解析(50讲) 部分内容会在实际编写时动态调整,或补充.或删除. Spark修炼之道(基础篇)--Linux大数据开发基础(15讲). Linux大数据开发基础--第一节:Ubuntu Linux安装与介绍 Linux大数据开