每天万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践

背景

横看成岭侧成峰,

远近高低各不同。

不识庐山真面目,

只缘身在此山中。

不同的视角我们所看到的物体是不一样的,

http://t.m.china.com.cn/convert/c_ovWL9w.html

图为墨西哥城放射状的街区广场。

图为西班牙迷宫般的果树漩涡。

地心说和日心说也是视角不同所呈现的。

实际上数据也有这样,我们每天产生海量的数据,有各种属性,以每个属性为视角(分组、归类、聚合),看到的是对应属性为中心的数据。

对应的业务场景也非常多,例如:

1、物联网,

每个传感器有很多属性:ID,地理位置,传感器归属,各类指标等。

以传感器的角度进行观察,观察某个传感器ID在流金岁月中的值。

以归属角度(例如归属于公安、城管、某家公司、。。。)进行观察,

以地理位置为视角进行观察,。。。。

2、车联网、站长网。。。。

按车辆、按客户、按访问者。。。多重视角进行观察

观察在数据库中可以触发两种行为,一种是实时计算的行为,另一种是数据规整的行为。

数据规整指将数据按视角重新归类存放。(例如在云端汇聚了各个网站的被访问记录,站长只关注他自己的网站的被访问记录,当需要向站长提供数据时,可以按网站进行数据规整)。

那么如何在云端实现实时分析、准实时数据归类的需求呢?

1 架构

HybridDB for PostgreSQL是阿里云的一款分析型MPP数据库产品(基于Greenplum开源版本而来,新增了插件功能、云端特性以及内核代码优化),提供了水平扩展的能力以及强大的分析SQL兼容性,同时与云端海量存储OSS进行了深度整合,可以并行读写OSS,将OSS作为数据存储来使用。

实时计算架构

海量数据源,写入OSS,通过HybridDB for PostgreSQL的oss_ext插件,实时分析写入的数据。

OSS带宽指标:目前每个计算节点每个会话约30MB/s的读写速率。

对于列式存储格式,数值类型。1亿记录约381MB,压缩比5:1的话,约76.3MB。

按行换算的性能指标:2048个计算节点,读写吞吐约 805亿行/s。每天处理6900万亿行(当然,按多列进出打个折扣,万亿级别也是没有问题的)。

准实时数据规整架构

实时数据规整的目的是按视角将数据规整,数据进入OSS时,是打乱的。由HybridDB for PostgreSQL对接重分布分组规整后,再写出到OSS,形成规整的数据。

为什么需要重分布?

前面谈到了视角问题,我们可能有多重视角来观察数据,而在数据库中只能选择一种固定的分布键,当视角与之不同时,就需要重分布。

准实时导出的优化:

对于一个视角,可能有少量或多种属性,例如用户实际,假设有100万个用户,如果每个计算节点分别导出100万用户,每个用户对应到OSS的一个规整文件,那么由于文件数过多,导出会较慢。

那么可以对用户重分布,例如1000个节点,每个节点分配到1000个用户的数据,这样的话,并行写出到OSS时,一下子就降低到了每个节点写1000个文件的规模。

如何强制重分布呢?后面讲到。

2 实时计算

HybridDB for PostgreSQL与OSS对接的详细文档请参考:

https://help.aliyun.com/document_detail/35457.html

简略步骤如下:

1、创建OSS用户

2、创建OSS BUCKET,例如每个小时一个BUCKET

3、写入数据(最好写入小文件,数量为HybridDB for PostgreSQL的倍数)

4、在HybridDB for PostgreSQL中创建OSS外部表,例如每个小时一个

5、直接读取OSS外表进行分析

3 准实时数据规整

1、需求

按不同的视觉维度,进行分组,每个视觉属性规整到一个OSS文件。也就是说一个OSS文件不能存在多个对象。

2、查询

比如需要按视角分组,按时间排序输出。

select 聚合函数(t order by 时间) from tbl t group by 视角字段;

以上SQL,数据库可能不会按视角字段重分布,而是使用两阶段提交的方式。例如

postgres=# create table tbl(uid int, info text, c1 int);
NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'uid' as the Greenplum Database data distribution key for this table.
HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
CREATE TABLE
表按UID分布  

但是查询不按它,看看会不会重分布  

postgres=# explain select string_agg(textin(record_out(t)), chr(10)) from tbl t group by c1;
                                               QUERY PLAN
--------------------------------------------------------------------------------------------------------
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.03..0.06 rows=1 width=36)
   ->  GroupAggregate  (cost=0.03..0.06 rows=1 width=36)
         Group By: c1
         ->  Sort  (cost=0.03..0.04 rows=1 width=36)
               Sort Key: c1
	       -- 按c1重分布
               ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=36)
                     Hash Key: c1
                     ->  Seq Scan on tbl t  (cost=0.00..0.00 rows=1 width=36)
 Settings:  optimizer=off
 Optimizer status: legacy query optimizer
(10 rows)  

这条是不被期望的,因为发生了两次聚合。
我们在将数据写入OSS时,不希望两次聚合。  

postgres=# explain select max(c1) from tbl group by info;
                                            QUERY PLAN
--------------------------------------------------------------------------------------------------
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.04..0.05 rows=1 width=36)
   -- 第二次聚合
   ->  HashAggregate  (cost=0.04..0.05 rows=1 width=36)
         Group By: tbl.info
         -- 重分布
	 ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.01..0.03 rows=1 width=36)
               Hash Key: tbl.info
               -- 第一次聚合
	       ->  HashAggregate  (cost=0.01..0.01 rows=1 width=36)
                     Group By: tbl.info
                     ->  Seq Scan on tbl  (cost=0.00..0.00 rows=1 width=36)
 Settings:  optimizer=off
 Optimizer status: legacy query optimizer
(10 rows)

强制按指定键重分布

postgres=# explain select row_number() over (partition by c1 order by info), * from tbl;
                                               QUERY PLAN
--------------------------------------------------------------------------------------------------------
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.03..0.04 rows=1 width=40)
   ->  Window  (cost=0.03..0.04 rows=1 width=40)
         Partition By: c1
         Order By: info
         ->  Sort  (cost=0.03..0.04 rows=1 width=40)
               Sort Key: c1, info
               ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=40)
                     Hash Key: c1
                     ->  Seq Scan on tbl  (cost=0.00..0.00 rows=1 width=40)
 Settings:  optimizer=off
 Optimizer status: legacy query optimizer
(11 rows)

按强制重分布, 改写SQL

使用窗口查询,将数据强制重分布,然后再进行计算节点的原地聚合。

postgres=# explain select string_agg(textin(record_out(t)), chr(10)) from (select row_number() over (partition by c1 order by info), * from tbl) t group by c1;
                                                     QUERY PLAN
--------------------------------------------------------------------------------------------------------------------
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.03..0.07 rows=1 width=36)
   -- 计算节点原地聚合
   ->  GroupAggregate  (cost=0.03..0.07 rows=1 width=36)
         Group By: t.c1
         ->  Subquery Scan t  (cost=0.03..0.05 rows=1 width=36)
               ->  Window  (cost=0.03..0.04 rows=1 width=40)
                     Partition By: tbl.c1
                     Order By: tbl.info
                     -- 按指定要求的顺序排序,例如按时间
		     ->  Sort  (cost=0.03..0.04 rows=1 width=40)
                           Sort Key: tbl.c1, tbl.info
                           -- 按C1重分布
			   ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=40)
                                 Hash Key: tbl.c1
                                 ->  Seq Scan on tbl  (cost=0.00..0.00 rows=1 width=40)
 Settings:  optimizer=off
 Optimizer status: legacy query optimizer
(14 rows)

3、自定义聚合函数

Greenplum的自定义聚合与单节点聚合不同,一种为单阶段模式,另一种为两阶段聚合模式。

单阶段模式,将数据收到MASTER后进行聚合。流水:

初始值INITCOND,MASTER过程函数SFUNC,MASTER FINAL函数FINALFUNC。

两阶段模式先在数据节点并行执行,然后在MASTER执行第二阶段。流水:

初始值INITCOND,数据节点过程函数SFUNC(数据节点并行执行),MASTER聚合函数PREFUNC,MASTER FINAL函数FINALFUNC。

SFUNC操作流水如下

1、每个节点调用sfunc聚合,输入参数为(input_type数据 , 临时结果stype),输出为stype。处理第一条记录时,临时结果stype为 NULL 或 初始值INITCOND。

postgres=# \h create aggre
Command:     CREATE AGGREGATE
Description: define a new aggregate function
Syntax:
CREATE AGGREGATE name ( input_data_type [ , ... ] ) (
    SFUNC = sfunc,
    STYPE = state_data_type
    [ , PREFUNC = prefunc ]
    [ , FINALFUNC = ffunc ]
    [ , INITCOND = initial_condition ]
    [ , SORTOP = sort_operator ]
)  

or the old syntax  

CREATE AGGREGATE name (
    BASETYPE = base_type,
    SFUNC = sfunc,
    STYPE = state_data_type
    [ , FINALFUNC = ffunc ]
    [ , INITCOND = initial_condition ]
    [ , SORTOP = sort_operator ]
)

两阶段聚合优化方法如下

在节点调用sfunc聚合,输入参数为(input_type数据 , 临时结果stype),输出为stype

sfunc( internal-state, next-data-values ) ---> next-internal-state

segment第一阶段收集结果传输到master调用prefunc,输入(stype , stype),得到的结果为stype

prefunc( internal-state, internal-state ) ---> next-internal-state

最后再将stype转换为聚合的输出类型即可(可选使用finalfunc)。

hll_union_agg 优化例子

CREATE AGGREGATE gp_hll_union_agg (hll) (
  SFUNC = hll_union,
  prefunc = hll_union, -- 第二阶段函数
  STYPE = hll
);

hll_add_agg 优化例子

# select hll_empty();
  hll_empty
--------------
 \021\213\177
(1 row)  

CREATE AGGREGATE gp_hll_add_agg (hll_hashval) (
  SFUNC = hll_add,
  STYPE = hll,
  prefunc = hll_union, -- 第二阶段函数
  initcond='\021\213\177'  -- 初始值
);

但是请注意,由于在segment节点sfunc执行完没有断点接口,所以我们无法在SEGMENT节点直接将一阶段聚合的数据写入到OSS。(除非改GPDB代码,加入一个断点接口。)

怎么办呢?

通过UDF函数来实现,并要求它在每个数据节点单独执行。

create or replace function f(gid int, v anyarray) returns void as $$
declare
  oss_ext_tbl name;
begin
  oss_ext_tbl := 'ext_tbl_'||gid;
  execute format ('insert into %I select unnest(%L)', oss_ext_tbl, v);
end;
$$ language plpgsql strict;

虽然这是一种方法,但是这种方式依旧不是最高效的,因为还有一次聚合的过程。

更高效率的方法是首先对数据重分布和排序,同时在导出到文件时自动根据上下文的VALUE变化,切换文件,根据新的VALUE命名并写入新文件。

这部分工作需要修改数据库的导出代码来实现。

4、并行写出到OSS

实现了在导出到文件时自动根据上下文的VALUE变化,切换文件,根据新的VALUE命名并写入新文件这部分工作后,规整数据变得异常简单。

1、非规整外部表(来源表)

例子

create external table origin (c1 int, c2 int, c3 int, c4 text, info text, uid int, crt_time timestamp)
.........  -- 外部表OSS位置
;

同样需要使用这种方法进行强制重分布

按UID规整,按crt_time排序

postgres=# explain select (t.tbl).* from (select row_number() over (partition by uid order by crt_time) as rn, tbl from origin tbl) t;
                                                  QUERY PLAN
--------------------------------------------------------------------------------------------------------------
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.03..0.05 rows=1 width=32)
   ->  Subquery Scan t  (cost=0.03..0.05 rows=1 width=32)
         ->  Window  (cost=0.03..0.04 rows=1 width=44)
               Partition By: tbl.uid
               Order By: tbl.crt_time
               ->  Sort  (cost=0.03..0.04 rows=1 width=44)
                     Sort Key: tbl.uid, tbl.crt_time
                     ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=44)
                           Hash Key: tbl.uid
                           ->  Seq Scan on origin tbl  (cost=0.00..0.00 rows=1 width=44)
 Settings:  optimizer=off
 Optimizer status: legacy query optimizer
(12 rows)

2、创建规整后OSS外部表

参考 阿里云HybridDB for PostgreSQL OSS存储用法

create external table dest (c1 int, c2 int, c3 int, c4 text, info text, uid int, crt_time timestamp)
.........  -- 外部表OSS位置
;

3、将数据写入规整后OSS外部表

postgres=# explain insert into dest select (t.tbl).* from (select row_number() over (partition by uid order by crt_time) as rn, tbl from origin tbl) t;
                                                     QUERY PLAN
--------------------------------------------------------------------------------------------------------------------
 Insert (slice0; segments: 48)  (rows=1 width=32)
         ->  Subquery Scan t  (cost=0.03..0.05 rows=1 width=32)
               ->  Window  (cost=0.03..0.04 rows=1 width=44)
                     Partition By: tbl.uid
                     Order By: tbl.crt_time
                     ->  Sort  (cost=0.03..0.04 rows=1 width=44)
                           Sort Key: tbl.uid, tbl.crt_time
                           ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=44)
                                 Hash Key: tbl.uid
                                 ->  Seq Scan on origin tbl  (cost=0.00..0.00 rows=1 width=44)
 Settings:  optimizer=off
 Optimizer status: legacy query optimizer
(14 rows)

小结

使用HybridDB for PostgreSQL,同时实现了实时分析,准实时数据规整两个需求。

OSS作为海量数据入口,HDB作为OSS的计算引擎,实现海量数据实时分析。

同时HDB作为数据规整引擎,被规整的数据不需要在数据库本地落地,直接从OSS到OSS,只是用到了HDB的规整能力。

性能可以通过扩展HDB的计算节点线性扩展:

海量数据源,写入OSS,通过HybridDB for PostgreSQL的oss_ext插件,实时分析写入的数据。

OSS带宽指标:目前每个计算节点每个会话约30MB/s的读写速率。

对于列式存储格式,数值类型。1亿记录约381MB,压缩比5:1的话,约76.3MB。

按行换算的性能指标:2048个计算节点,读写吞吐约 805亿行/s。每天处理6900万亿行(当然,按多列进出打个折扣,万亿级别也是没有问题的)。

参考

阿里云HybridDB for PostgreSQL

阿里云HybridDB for PostgreSQL OSS存储用法

《Greenplum 性能评估公式 - 阿里云HybridDB for PostgreSQL最佳实践》

《Greenplum 最佳实践 - 估值插件hll的使用(以及hll分式聚合函数优化)》

《Postgres-XC customized aggregate introduction》

《PostgreSQL aggregate function customize》

时间: 2024-10-30 19:03:23

每天万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践的相关文章

动态输出(ToB海量日志转换业务) - 阿里云HybridDB for PostgreSQL最佳实践

标签 PostgreSQL , UDF , 动态格式 , format , JOIN , OSS外部表 背景 有一些业务需要将数据归类动态的输出,比如一些公共日志服务,所有用户的日志都被统一的按格式记录到一起,但是每个最终用户关心的字段都不一样,甚至每个用户对数据转换的需求都不一样. 比如这个业务: <日增量万亿+级 实时分析.数据规整 - 阿里云HybridDB for PostgreSQL最佳实践> 一.需求 1.可以根据ToB的用户的定义,输出不同的格式. 2.每个ToB的用户,写入到一

Greenplum 空间(GIS)数据检索 B-Tree &amp; GiST 索引实践 - 阿里云HybridDB for PostgreSQL最佳实践

标签 PostgreSQL , GIS , PostGIS , Greenplum , 空间检索 , GiST , B-Tree , geohash 背景 气象数据.地震数据.室内定位.室外定位.手机.车联网.还有我们最喜欢的"左划不喜欢.右划喜欢",越来越多的位置属性的数据.将来会越来越多. 基于GIS的数据分析.OLTP业务也越来越受到决策者的青睐,例如商场的选址决策,O2O的广告营销等.有很多基于多边形.时间.用户对象属性过滤的需求. 阿里云HybridDB for Postgr

分布式DB数据倾斜的原因和解法 - 阿里云HybridDB for PostgreSQL最佳实践

标签 PostgreSQL , Greenplum , query倾斜 , 存储倾斜 , OOM , disk full , 短板 , 数据分布 背景 对于分布式数据库来说,QUERY的运行效率取决于最慢的那个节点. 当数据出现倾斜时,某些节点的运算量可能比其他节点大.除了带来运行慢的问题,还有其他的问题,例如导致OOM,或者DISK FULL等问题. 如何监控倾斜 1.监控数据库级别倾斜 postgres=# select gp_execution_dbid(), datname, pg_si

MPP分布式数据库性能评估方法 - 阿里云HybridDB for PostgreSQL最佳实践

背景 通常评估一个数据库的性能,可以选择工业标准测试,或者根据业务模型,建模进行测试. 例如PostgreSQL pgbench支持的tpc-b测试,以及自定义模型测试. benchmarksql支持的tpc-c测试. gp_tpch支持的tpc-h测试等等. 参考文档如下 <TPC-H测试 - PostgreSQL 10 vs Deepgreen(Greenplum)> <PostgreSQL 使用 pgbench 测试 sysbench 相关case> <PostgreS

分布式DB锁问题排查方法 - 阿里云HybridDB for PostgreSQL最佳实践

标签 PostgreSQL , Greenplum , 锁 , SEGMENT不一致 , gp_session_role=utility , gp_dist_random 背景 Greenplum(GPDB)是一个分布式数据库,分布式数据库的锁管理比单机更加复杂.例如在加锁时,需要对所有节点加锁(包括MASTER和所有的SEGMENT节点),在释放锁时,则需要释放所有节点的锁. 如果在释放过程中,MASTER的锁释放了,而SEGMENT锁没有释放,会造成什么问题呢? 不用说,会有很诡异的问题出现

行存、列存,堆表、AO表性能对比 - 阿里云HDB for PostgreSQL最佳实践

标签 PostgreSQL , GIS , PostGIS , Greenplum , 空间检索 , GiST , B-Tree , geohash 背景 <Greenplum 行存.列存,堆表.AO表的原理和选择> 以上文档详细的介绍了行存.列存,堆表.AO表的原理以及选择的依据. <一个简单算法可以帮助物联网,金融 用户 节约98%的数据存储成本 (PostgreSQL,Greenplum帮你做到)> 以上文档介绍了提升基于列存的全局数据压缩比的方法. <解密上帝之手 -

如何检测、清理Greenplum垃圾 - 阿里云HybridDB for PG最佳实践

标签 PostgreSQL , Greenplum , HDB for PG 背景 Greenplum通过多版本支持数据的删除和更新的并发和回滚,在删除数据时(使用DELETE删除),对记录的头部xmax值进行标记.在删除记录时,对记录的头部进行标记,同时插入新的版本. 这一就会导致一个问题,如果用户经常删除和插入或更新数据,表和索引都会膨胀. PostgreSQL是通过HOT技术以及autovacuum来避免或减少垃圾的.但是Greenplum没有自动回收的worker进程,所以需要人为的触发

云端流计算、在线业务、实时分析 闭环设计 - 阿里云RDS、HybridDB for PostgreSQL最佳实践

背景 水的流动汇成江河大海,孕育生命,形成大自然生态.数据流动,推进社会进步,拓展业务边界. <从人类河流文明 洞察 数据流动的重要性> 以某淘系业务案例展开,看看用户如何利用阿里云RDS PostgreSQL,HybridDB for PostgreSQL,海量对象存储OSS,打造一个从流计算到在线业务,再到数据分析和挖掘的业务,发挥数据的价值,拓展业务的边界. 业务简介 一个电商业务通常会涉及 商家.门店.物流.用户.支付渠道.贷款渠道.商品.平台.小二.广告商.厂家.分销商.店主.店员.

音视图(泛内容)网站透视分析 DB设计 - 阿里云(RDS、HybridDB) for PostgreSQL最佳实践

标签 PostgreSQL , 用户透视 , 设备透视 , 圈人 , 标签 , 视频网站 , 优酷 , 土豆 , 喜马拉雅 背景 日常生活中,人们使用最多的除了社交类网站.购物网站,估计就是音频.视频.图文信息类内容网站了. 视频网站,已经渗透到各种终端,除了喜闻乐见的手机,还包括移动终端.电脑.盒子.电视.投影仪等.有设备属性.会员属性.渠道属性等. 内容运营是非常重要的环节,而透视则是运营的重要武器. 业务需求 1.生成设备.会员画像 ID.各个维度的标签.其中包括一些多值列标签(例如最近7