BigData – Join中竟然也有谓词下推!?

上文简要介绍了Join在大数据领域中的使用背景以及常用的几种算法-broadcast hash join 、shuffle hash join以及sort merge join等,对每一种算法的核心应用场景也做了相关介绍,这里再重点说明一番:大表与小表进行join会使用broadcast hash join,一旦小表稍微大点不再适合广播分发就会选择shuffle hash join,最后,两张大表的话无疑选择sort merge join。

好了,问题来了,说是这么一说,但到底选择哪种算法归根结底是SQL执行引擎干的事情,按照上文逻辑,SQL执行引擎肯定要知道参与Join的两表大小,才能选择最优的算法喽!那么斗胆问一句,怎么知道两表大小?衡量两表大小的是物理大小还是纪录多少抑或两者都有?其实,这是另一门学问-基于代价优化(Cost Based Optimization,简称CBO),它不仅能够解释Join算法的选择问题,更重要的,它还能确定多表联合Join场景下的Join顺序问题。

是不是对CBO很期待呢?好吧,这里先刨个坑,下一个话题我们再聊。那今天要聊点什么呢?Join算法选择、Join顺序选择确实对Join性能影响极大,但,还有一个很重要的因素对Join的性能至关重要,那就是Join算法优化!无论是broadcast hash join、shuffle hash join还是sort merge join,都是最基础的join算法,有没有什么优化方案呢?还真有,这就是今天要聊的主角-Runtime Filter(下文简称RF)

RF预备知识:bloom filter

RF说白了是使用bloomfilter对参与join的表进行过滤,减少实际参与join的数据量。为了下文详细解释整个流程,有必要先解释一下bloomfilter这个数据结构(对之熟悉的看官可以绕道)。Bloom Filter使用位数组来实现过滤,初始状态下位数组每一位都为0,如下图所示:

假如此时有一个集合S = {x1, x2, … xn},Bloom Filter使用k个独立的hash函数,分别将集合中的每一个元素映射到{1,…,m}的范围。对于任何一个元素,被映射到的数字作为对应的位数组的索引,该位会被置为1。比如元素x1被hash函数映射到数字8,那么位数组的第8位就会被置为1。下图中集合S只有两个元素x和y,分别被3个hash函数进行映射,映射到的位置分别为(0,3,6)和(4,7,10),对应的位会被置为1:

现在假如要判断另一个元素是否是在此集合中,只需要被这3个hash函数进行映射,查看对应的位置是否有0存在,如果有的话,表示此元素肯定不存在于这个集合,否则有可能存在。下图所示就表示z肯定不在集合{x,y}中:

RF算法理论

为了更好地说明整个过程,这里使用一个SQL示例对RF算法进行完整讲解,SQL:select item.name, order.* from order , item where order.item_id = item.id and item.category = ‘book’,其中order为订单表,item为商品表,两张表根据商品id字段进行join,该SQL意为取出商品类别为书籍的所有订单详情。假设商品类型为书籍的商品并不多,join算法因此确定为broadcast hash join。整个流程如下图所示:

Step 1:将item表的join字段(item.id)经过多个hash函数映射处理为一个bloomfilter(如果对bloomfilter不了解,自行google)

Step 2:将映射好的bloomfilter分别广播到order表的所有partition上,准备进行过滤

Step 3:以Partition2为例,存储进程(比如DataNode进程)将order表中join列(order.item_id)数据一条一条读出来,使用bloomfilter进行过滤。淘汰该订单数据不是书籍相关商品的订单,这条数据直接跳过;否则该条订单数据有可能是待检索订单,将该行数据全部扫描出来。

Step 4:将所有未被bloomfilter过滤掉的订单数据,通过本地socket通信发送到计算进程(impalad)。

Step 5:再将所有书籍商品数据广播到所有Partition节点与step4所得订单数据进行真正的hashjoin操作,得到最终的选择结果

RF算法分析

上面通过一个SQL示例简单演示了整个RF算法在broadcast hash join中的操作流程,根据流程对该算法进行一下理论层次分析:

  • RF本质:通过谓词( bloomfilter)下推,在存储层通过bloomfilter对数据进行过滤,可以从三个方面实现对Join的优化。其一,如果可以跳过很多记录,就可以减少了数据IO扫描次数。这点需要重点解释一下,许多朋友会有这样的疑问:既然需要把数据扫描出来使用BloomFilter进行过滤,为什么还会减少IO扫描次数呢?这里需要关注一个事实:大多数表存储行为都是列存,列之间独立存储,扫描过滤只需要扫描join列数据(而不是所有列),如果某一列被过滤掉了,其他对应的同一行的列就不需要扫描了,这样减少IO扫描次数。其二,减少了数据从存储层通过socket(甚至TPC)发送到计算层的开销,其三,减少了最终hash join执行的开销。
  • RF代价:对照未使用RF的Broadcast Hash Join来看,前者主要增加了bloomfilter的生成、广播以及大表根据bloomfilter进行过滤这三个开销。通常情况下,这几个步骤在小表较小的情况下代价并不大,基本可以忽略。
  • RF优化效果:基本取决于bloomfilter的过滤效果,如果大量数据被过滤掉了,那么join的性能就会得到极大提升;否则性能提升就会有限。
  • RF实现:和常见的谓词下推(’=‘,’>’,’<‘等)一样,RF实现需要在计算层以及存储层分别进行相关逻辑实现,计算层要构造bloomfilter并将bloomfilter下传到存储层,存储层要实现使用该bloomfilter对指定数据进行过滤。

RF效果验证

事实上,RF这个东东的优化效果是在组内同事何大神做impala on parquet以及impala on kudu的基准对比测试的时候分析发现的。实际测试中,impala on parquet 比之impala on kudu性能有明显优势,目测至少10倍性能提升。同一SQL解析引擎,不同存储引擎,性能竟然天壤之别!为了分析具体原因,同事就使用impala的执行计划分析工具对两者的执行计划分别进行了分析,才透过蛛丝马迹发现前者使用了RF,而后者并没有(当然可能还有其他因素,但RF肯定是原因之一)。

简单复盘一下这次测试吧,基准测试使用TPCDS测试,数据规模为1T,本文使用测试过程中的一个典型SQL(Q40)作为示例对RF的神奇功效进行回放演示。下图是Q40的对比性能,直观上来看RF可以直接带来40x的性能提升,40倍哎,这到底是怎么做到的?

先来简单看看Q40的SQL语句,如下所示,看起来比较复杂,核心涉及到3个表(catalog_sales join date_dim 、catalog_sales join warehouse 、catalog_sales join item)的join操作:

select  

   w_state

  ,i_item_id

  ,sum(case when (cast(d_date as date) < cast (‘1998-04-08’ as date)) 

                then cs_sales_price – coalesce(cr_refunded_cash,0) else 0 end) as sales_before

  ,sum(case when (cast(d_date as date) >= cast (‘1998-04-08’ as date)) 

                then cs_sales_price – coalesce(cr_refunded_cash,0) else 0 end) as sales_after

 from

   catalog_sales left outer join catalog_returns on

       (catalog_sales.cs_order_number = catalog_returns.cr_order_number 

        and catalog_sales.cs_item_sk = catalog_returns.cr_item_sk)

  ,warehouse 

  ,item

  ,date_dim

 where

     i_current_price between 0.99 and 1.49

 and item.i_item_sk          = catalog_sales.cs_item_sk

 and catalog_sales.cs_warehouse_sk    = warehouse.w_warehouse_sk 

 and catalog_sales.cs_sold_date_sk    = date_dim.d_date_sk

 and date_dim.d_date between ‘1998-03-09’ and ‘1998-05-08’

 group by

    w_state,i_item_id

 order by w_state,i_item_id

limit 100;

典型的星型结构,其中catalog_sales是事实表,其他表为纬度表。本次分析选择其中catalog_sales join item这个纬度的join。因为对比测试中两者的SQL解析引擎都是使用impala,所以SQL执行计划基本都相同。在此基础上,来看看执行计划中单个执行节点在执行catalog_sales join item操作时由先到后的主要阶段耗时,其中只贴出来重要耗时阶段(Q40中Join算法为shuffle hash join,与上文所举broadcast hash join示例略有不同,不过不影响结论):

impala on kudu(without runtime filter) impala on parquet(with runtime filter)
total time  43s996ms 2s385ms
bloomfilter生成
Filter 0 arrival: 857ms

Filter 1 arrival: 879ms

Filter 2 arrival: 939ms

大表scan扫描
HDFS_SCAN_NODE (id=0):(Total: 3s479ms)

   – RowsRead: 72.01M 

   – RowsReturned: 72.01M 

   – RowsReturnedRate: 20.69 M/s


HDFS_SCAN_NODE (id=0):(Total: 2s011ms)

  – RowsRead: 72.01M 

  – RowsReturned: 35.92K 

  – RowsReturnedRate: 17.86 K/sec

Filter 0 (1.00 MB):

  – Rows processed: 72.01M

  – Rows rejected: 71.43M 

  – Rows total: 72.01M

Filter 1 (1.00 MB):

  – Rows processed: 49.15K 

  – Rows rejected: 126 

  – Rows total: 49.15K 

Filter 2 (1.00 MB):

  – Rows processed: 584.38K 

  – Rows rejected: 548.46K 

  – Rows total: 584.38K 

数据加载计算进程内存
DataStreamSender (dst_id=11):(Total: 15s984ms)

– NetworkThroughput(*): 298.78 MB/sec

– OverallThroughput: 100.85 MB/sec

– RowsReturned: 72.01M

– SerializeBatchTime: 10s567ms

– TransmitDataRPCTime: 5s395ms

DataStreamSender (dst_id=11):(Total: 10.725ms)

 – NetworkThroughput(*): 244.06 MB/sec

 – OverallThroughput: 71.23 MB/sec

 – RowsReturned: 35.92K

 – SerializeBatchTime: 7.544ms

 – TransmitDataRPCTime: 3.130ms

           

Hash Join
HASH_JOIN_NODE (id=5):(Total: 19s104ms)

– BuildPartitionTime: 862.560ms

– BuildRows: 8.99M 

– BuildRowsPartitioned: 8.99M

– BuildTime: 373.855ms

– …… 

– ProbeRows: 90.00M 

– ProbeRowsPartitioned: 0 (0)

– ProbeTime: 17s628ms

– RowsReturned: 90.00M

– RowsReturnedRate: 985.85 K/s

– SpilledPartitions: 0 (0)

– UnpinTime: 960.000ns


HASH_JOIN_NODE (id=6):(Total: 21.707ms)

– BuildPartitionTime: 3.487ms

– BuildRows: 18.81K (18814)

– BuildRowsPartitioned: 18.81K

– BuildTime: 646.817us

– …… 

– ProbeRows: 85.28K (85278)

– ProbeRowsPartitioned: 0 (0)

– ProbeTime: 6.396ms

– RowsReturned: 85.27K

– RowsReturnedRate: 38.88 K/s

– SpilledPartitions: 0 (0)

– UnpinTime: 915.000ns

经过对两种场景执行计划的解析,可以基本验证上文所做的基本理论结果:

1. 确认经过RF之后大表的数据量得到大量滤除,只剩下少量数据参与最终的HashJoin。参见第二行大表scan扫描结果,未使用rf的返回结果有7千万行+纪录,而经过RF过滤之后满足条件的只有3w+纪录。3万相比7千万,性能优化效果自然不言而喻。

2. 经过RF滤除之后,少量数据经过网络从存储进程加载到计算进程内存的网络耗时大量减少。参见第三行“数据加载到计算进程内存”,前者耗时15s,后者耗时仅仅11ms。主要耗时分为两部分,其中数据序列化时间占到2/3-10s左右,数据经过RPC传输时间占另外1/3 -5s左右。

3. 最后,经过RF滤除之后,参与到最终Hash Join的数据量大幅减少,Hash Join耗时前者是19s,后者是21ms左右。主要耗时在于大表Probe Time,前者消耗了17s左右,而后者仅需6ms。

说好的谓词下推呢?

讲真,刚开始接触RF的时候觉得这简直是一个实实在在的神器,崇拜之情溢于言表。然而,经过一段时间的探索消化,直至把这篇文章写完,也就是此时此刻,忽然觉得它并不高深莫测,说白了就是一个谓词下推,不同的是这里的谓词稍微奇怪一点,是一个bloomfilter而已。

提到谓词下推,这里再引申一下下。以前经常满大街听到谓词下推,然而对谓词下推却总感觉懵懵懂懂,并不明白的很真切。经过RF的洗礼,现在确信有了更进一步的理解。这里拿出来和大家交流交流。个人认为谓词下推有两个层面的理解:

  • 其一是逻辑执行计划优化层面的说法,比如SQL语句:select * from order ,item where item.id = order.item_id and item.category = ‘book’,正常情况语法解析之后应该是先执行Join操作,再执行Filter操作。通过谓词下推,可以将Filter操作下推到Join操作之前执行。即将where item.category = ‘book’下推到 item.id = order.item_id之前先行执行。
  • 其二是真正实现层面的说法,谓词下推是将过滤条件从计算进程下推到存储进程先行执行,注意这里有两种类型进程:计算进程以及存储进程。计算与存储分离思想,这在大数据领域相当常见,比如最常见的计算进程有SparkSQL、Hive、impala等,负责SQL解析优化、数据计算聚合等,存储进程有HDFS(DataNode)、Kudu、HBase,负责数据存储。正常情况下应该是将所有数据从存储进程加载到计算进程,再进行过滤计算。谓词下推是说将一些过滤条件下推到存储进程,直接让存储进程将数据过滤掉。这样的好处显而易见,过滤的越早,数据量越少,序列化开销、网络开销、计算开销这一系列都会减少,性能自然会提高。

写到这里,忽然意识到笔者在上文出现了一个很严重的认知错误:RF机制并不仅仅是一个简单的谓词下推,它的精髓在于提出了一个重要的谓词-bloomfilter。当前对RF支持的系统并不多,笔者只知道目前唯有Impala on Parquet进行了支持。Impala on Kudu虽说Impala支持,但Kudu并不支持。SparkSQL on Parqeut中虽有存储系统支持,无奈计算引擎-SparkSQL目前还不支持。

本文主要介绍了一种类似于semi-join的优化方法,对优化细节进行了深入地探讨,并结合分析过程对谓词下推技术谈了谈自己的理解。下篇文章将会为看官带来基于代价优化(CBO)相关的议题,期待哦~

本文转载自:http://hbasefly.com

原文链接

时间: 2024-08-03 19:28:08

BigData – Join中竟然也有谓词下推!?的相关文章

Java FP: Java中函数式编程的谓词函数(Predicates)第二部分

在上一篇文章中我们介绍了谓词函数.通过一个简单的只带一个返回值是true或者false的函数的接口,把函数式编程语言的优势带入到了类似Java的面向对象编程语言中.这一小节,我们将会介绍一些高级特性,方便你高效利用谓词函数. 测试 在测试代码中使用谓词的优势尤为明显.当你需要测试一个混合了数据结构与某些条件逻辑的方法时,通过使用谓词,你可以先单独测试数据结构,再测试条件逻辑. 第一步,先利用永真谓词或者永假谓词屏蔽用于判断的逻辑,将注意力集中在测试数据结构上: 1 // check with t

Java FP: Java中函数式编程的谓词函数(Predicates)第一部分

你一直在听说函数式编程将称霸整个编程届,而自己仍然沉浸在普通的Java里?请不要担心,因为你已经在日常Java代码中加入了函数式编程的特性.此外,函数式编程很有趣,能够帮你节省多行代码并且降低错误率. 什么是谓词函数? 许久之前,那时我还在用Java 1.4进行编码,当我第一次发现Apache Commons Collections,便爱上了谓词函数.Apache Commons Collections里的谓词函数仅仅只是一个只有一个方法的接口: evaluate(Object object):

c++中 算法sort带 谓词的怎么回事

问题描述 c++中 算法sort带 谓词的怎么回事 请大神详细解释一下这个带谓词的sort算法的使用与规则 c++中 算法sort带 谓词的怎么回事 大神求帮助 解决方案 简单来说,就是用一个函数指针传一个自己写的比较函数,告诉sort你的排序规则.从而实现按照特定方式(比如结构体排序按照某个字段排序,或者升序降序排序,或者字符串排序按照ascii顺序还是按照长度)排序. 比如 int cmp(const void * a, const void * b) { return *(int *)a

sql语句中left join、inner join中的on与where的区别

原文:sql语句中left join.inner join中的on与where的区别 table a(id, type): id     type ---------------------------------- 1      1         2      1          3      2          table b(id, class): id    class --------------------------------- 1      1 2      2 sql语

SQL语句Left join 中On和Where的用法区别

原文地址:点击打开链接 SQL语句如下: SELECT * FROM 表1 LEFT JOIN 表2 ON 表1.id = 表2.id AND 表2.Name != 'ff' WHERE 表1.NAME != 'aa'        步骤1:返回笛卡尔积(SELECT * FROM 表1 CROSS JOIN 表2) 步骤2:应用ON筛选器(当前的条件为  表1.id = 表2.id AND 表2.Name != 'ff') 步骤3:添加外部行 这一步只对OUTER JOIN起作用,如果是LEF

深入Oracle的left join中on和where的区别详解_oracle

今天遇到一个求某月所有天数的统计结果,如果某日的结果是0也需要显示出来,即: 日期                  交易次数   交易金额 2009-4-01           1              10 2009-4-02           2              20 2009-4-03           0              0 2009-4-04          5                50 .... 一开始我用的左连接,用on做为两表关联条件,

BigData-‘基于代价优化’究竟是怎么一回事?

还记得笔者在上篇文章无意中挖的一个坑么?如若不知,强烈建议看官先行阅读前面两文-<SparkSQL – 有必要坐下来聊聊Join>和<BigData – Join中竟然也有谓词下推!?>.第一篇文章主要分析了大数据领域Join的三种基础算法以及各自的适用场景,第二篇文章在第一篇的基础上进一步深入,讨论了Join基础算法的一种优化方案  – Runtime Filter,文章最后还引申地聊了聊谓词下推技术.同时,在第二篇文章开头,笔者引出了两个问题,SQL执行引擎如何知晓参与Join

开源大数据周刊-第52期

阿里云E-Mapreduce动态 E-MapReduce调度功能添加重试机制 ## 资讯 重磅|MapD宣布开源:在多GPU服务器上二次查询数十亿条记录的核心数据库和代 全球人工智能:专注为AI开发者提供全球最新AI技术动态和社群交流.用户来源包括:北大.清华.中科院.复旦.麻省理工.卡内基梅隆.斯坦福.哈佛.牛津.剑桥等世界名校的AI技术硕士.博士和教授:以及谷歌.腾讯.百度.脸谱.微软.华为.阿里.海康威视.滴滴.英伟达等全球名企的AI开发者和AI科学家. 实时离线融合在唯品会的进展:在实时

开源大数据周刊-第48期

资讯 云数据库厂商Snowflake获一亿美元融资,Iconiq领投Snowflake Computing,是一家提供数据存储和查询服务的云数据库公司,以方便分析师用BI工具进行分析.该公司近日宣布获得一亿美元的融资. 多省市出台大数据发展规划 14地产值目标过2.8万亿日前,数据中心联盟大数据发展促进委员会发布的报告显示,中国大数据产业正在形成五大产业集聚区,剔除重复因素,已确定2020年大数据产业规模目标的14个省市的规划总和已达28400亿元,远远超过了国家整体规划的目标. 未来医疗科技大