PostgreSQL、Greenplum DML合并操作 最佳实践

标签

PostgreSQL , Greenplum , 合并删除 , 合并更新 , 合并DML


背景

在很多场景中会涉及到数据的合并,比如

1. 某业务系统的总用户有1亿,每天的活跃用户有100万,新增用户10万,每天需要将新增、活跃用户的数据(比如他们的余额变化、等等)合并到数据仓库的用户信息表。

2. 物化视图,某个表被用户不断的增、删、改。需要将这个表(基表)的某些字段或者某部分数据提取到一个物化视图中。这个物化视图不需要对每一笔基表的DML都实施操作,比如对单条记录的操作,合并成一次操作。

3. 数据同步,将OLTP的数据,同步到OLAP系统,由于OLAP系统的事务处理能力没有TP系统强,所以也必须采用合并的方法,同一条记录被多次更新时,需要将多次更新合并成一次更新。

4. 基于REDO日志的逻辑数据复制,优化手段除了并行复制,还有一种就是合并复制。

不管是哪种数据合并,被合并的表最好是有主键的,本文也假设有主键来处理。否则会增加复杂度(需要使用整行记录来区分),而且整行记录有一个缺陷,例如根据行号定位重复记录中的一条,这样变更后,合并时可能会出错。

数据合并的方法

对于以上几种情况,比较复杂的是逻辑数据复制,它可能涉及到任意操作,单个KEY可能被删除,后续这个KEY又被插入、多次更新的情况。

同时还需要考虑事务一致性的问题,每一次合并操作都需要保证一致性。例如基于REDO的逻辑复制,对于未结束的事务产生的REDO,不能参与合并。

保证单个KEY,在合并时只操作一次,同时确保未结束的事务不参与合并。

REDO要素

table : 库\schema\表名

old : 主键值

new : 新插入的值 、 被变更的字段变更后的值

tag : insert 、 update 、 delete 、 truncate

例子

以逻辑复制为例,分解一下数据合并的过程。

创建测试表

create table tbl (pk1 int, pk2 int, c1 int, c2 text, crt_time timestamp, primary key(pk1,pk2));

产生一些DML

...忽略中间部分, 假设忽略的这部分已经同步到目标表...  

delete from tbl where pk1=2 and pk2=2;   -- 说明目标表已经存在pk1=2,pk2=2的记录
insert into tbl (pk1,pk2,c1,c2,crt_time) values (2,2,2,'test22','2017-02-14');
insert into tbl (pk1,pk2,c1,c2,crt_time) values (1,1,2,'test23','2017-02-14');
insert into tbl (pk1,pk2,c1,c2,crt_time) values (1,2,3,'test34','2017-02-14');  

update tbl set c2='new', crt_time=null where pk1=1 and pk2=2;  

delete from tbl where pk1=1 and pk2=1;  

insert into tbl (pk1,pk2,c1,c2,crt_time) values (1,1,5,'test56','2017-02-14');  

update tbl set c2='new11', crt_time=null where pk1=1 and pk2=1;
update tbl set c2='new12', crt_time='2017-02-15' where pk1=1 and pk2=2;

将以上DML转换为REDO要素如下(通常以下信息可以在数据库的REDO日志得到)

old: pk1=2,pk2=2
new: null
tag: delete  

old: null
new: pk1=2,pk2=2,c1=2,c2='test22',crt_time='2017-02-14'
tag: insert  

old: null
new: pk1=1,pk2=1,c1=2,c2='test23',crt_time='2017-02-14'
tag: insert  

old: null
new: pk1=1,pk2=2,c1=3,c2='test34',crt_time='2017-02-14'
tag: insert  

old: pk1=1,pk2=2
new: c2='new',crt_time=null
tag: update  

old: pk1=1,pk2=1
new: null
tag: delete  

old: null
new: pk1=1,pk2=1,c1=5,c2='test56',crt_time='2017-02-14'
tag: insert  

old: pk1=1,pk2=1
new: c2='new11',crt_time=null
tag: update  

old: pk1=1,pk2=2
new: c2='new12',crt_time='2017-02-15'
tag: update

合并过程,对已提交的记录,按PK进行分组,按执行先后顺序排序

delete from tbl where pk1=2 and pk2=2;   -- 说明目标表已经存在pk1=2,pk2=2的记录
insert into tbl (pk1,pk2,c1,c2,crt_time) values (2,2,2,'test22','2017-02-14');  

合并后,目标表应该仅仅执行如下SQL
update tbl set c1=2,c2='test22',crt_time='2017-02-14' where pk1=2 and pk2=2;
insert into tbl (pk1,pk2,c1,c2,crt_time) values (1,1,2,'test23','2017-02-14');
delete from tbl where pk1=1 and pk2=1;
insert into tbl (pk1,pk2,c1,c2,crt_time) values (1,1,5,'test56','2017-02-14');
update tbl set c2='new11', crt_time=null where pk1=1 and pk2=1;  

合并后,目标表应该仅仅执行如下SQL
insert into tbl (pk1,pk2,c1,c2,crt_time) values (1,1,5,'new11',null);
insert into tbl (pk1,pk2,c1,c2,crt_time) values (1,2,3,'test34','2017-02-14');
update tbl set c2='new', crt_time=null where pk1=1 and pk2=2;
update tbl set c2='new12', crt_time='2017-02-15' where pk1=1 and pk2=2;  

合并后,目标表应该仅仅执行如下SQL
insert into tbl (pk1,pk2,c1,c2,crt_time) values (1,2,3,'new12','2017-02-15');

如果涉及到PK的变更,需要将其分解为delete和insert两条

例如
old: pk1=1,pk2=1
new: pk1=1,pk2=3,c2='new11',crt_time=null
tag: update  

需要分解为
delete from tbl where pk1=1,pk2=1;
insert into tbl (pk1,pk2,c2,crt_time) values (1,3,'new11',null);

OLAP数据合并更新、删除的例子1

逻辑复制的合并相对来说比较复杂,但是PostgreSQL是一个功能强大的数据库,它支持窗口查询,编程能力强大的plpgsql函数语言(还有python, java, perl等数据库函数语言),使用SQL还是比较方便的可以完成以上合并的。

除了逻辑复制,在OLAP中也经常要用到合并更新,主要的目的是减少OLAP系统SQL的执行次数(因为OLAP系统并不是为TP业务设计,而是为批处理或大量运算设计的,多次SQL如果能合并成一次的话,可以大幅提升效率)

下面就以更新的合并为例,简单的讲解一下数据合并的例子。

比如一张表有1亿记录,每天要更新其中的10万条记录。我们要做的是将10万条UPDATE语句,合并成一条UPDATE语句。

合并方法

1. 首先将更新语句转换为数据,插入一张临时表

2. 然后使用join update来更新目标表

过程如下

创建一个生产表(目标表,必须有PK),假设它有1亿(为了演示,仅使用100万记录)用户数据。

create table prod(id int primary key, c1 int, info text, crt_time timestamp, mod_time timestamp);

创建一张临时表,用来存储合并前的DML,表结构如下,需要包含一个新增的序列PK,以及目标表的所有字段,以及每个字段对应的SET位(表示该字段是否被更新)

我们这里假设一条记录,可能被多次更新。

create table tmp1
(
  pk serial8 primary key,   -- 标记插入顺序
  id int, c1 int, info text, crt_time timestamp, mod_time timestamp, -- 更新后的值
  set_id boolean, set_c1 boolean, set_info boolean, set_crt_time boolean, set_mod_time boolean -- 被更新的字段
);

插入100万数据到prod表

insert into prod select generate_series(1,1000000), 1, 'test', now(), null;

TP系统中的UPDATE语句,我们将它转换为目标值,插入临时表

insert into tmp1 (id,c1,info,crt_time,mod_time,set_id,set_c1,set_info,set_crt_time,set_mod_time)
  select random()*10000, 2, null, null, clock_timestamp(), true,true,false,false,true from generate_series(1,10000);  -- c1=2, mod_time=clock_timestamp()  

insert into tmp1 (id,c1,info,crt_time,mod_time,set_id,set_c1,set_info,set_crt_time,set_mod_time)
  select random()*10000, 3, 'new', null, clock_timestamp(), true,true,true,false,true from generate_series(1,10000);  -- c1=3, info='new', mod_time=clock_timestamp()  

insert into tmp1 (id,c1,info,crt_time,mod_time,set_id,set_c1,set_info,set_crt_time,set_mod_time)
  select random()*10000, null, 'new1', null, clock_timestamp(), true,false,true,false,true from generate_series(1,10000);  -- info='new1', mod_time=clock_timestamp()  

insert into tmp1 (id,c1,info,crt_time,mod_time,set_id,set_c1,set_info,set_crt_time,set_mod_time)
  select random()*10000, 5, null, null, clock_timestamp(), true,true,true,false,true from generate_series(1,10000);  -- c1=5, info=null, mod_time=clock_timestamp()

true大于false,将用于多条记录的合并

postgres=# select true > false;
 ?column?
----------
 t
(1 row)

如果一条记录被多次UPDATE,需要将多个UPDATE合并为一个UPDATE

用到了窗口查询,以目标表的PK为分组,按不同字段的set位优先取true的最后一条值,以及它的set状态。

select id, c1, set_c1, info, set_info, crt_time, set_crt_time, mod_time, set_mod_time from
(
  select
    row_number() over (partition by id) as rn,
    id,
    first_value(c1) over (partition by id order by set_c1 desc, pk desc) c1,
    first_value(set_c1) over (partition by id order by set_c1 desc, pk desc) set_c1,
    first_value(info) over (partition by id order by set_info desc, pk desc) info,
    first_value(set_info) over (partition by id order by set_info desc, pk desc) set_info,
    first_value(crt_time) over (partition by id order by set_crt_time desc, pk desc) crt_time,
    first_value(set_crt_time) over (partition by id order by set_crt_time desc, pk desc) set_crt_time,
    first_value(mod_time) over (partition by id order by set_mod_time desc, pk desc) mod_time,
    first_value(set_mod_time) over (partition by id order by set_mod_time desc, pk desc) set_mod_time
  from tmp1
) t
where t.rn=1;

以上就是合并后的数据

更新时,使用case,将字段set位为true的值更新为新的值,false的不变。

update prod set
  c1=(case when t.set_c1 then t.c1 else prod.c1 end) ,  -- 将字段set位为true的值更新为新的值,false的不变。
  info=(case when t.set_info then t.info else prod.info end) ,
  crt_time=(case when t.set_crt_time then t.crt_time else prod.crt_time end) ,
  mod_time=(case when t.set_mod_time then t.mod_time else prod.mod_time end)
from
(
  select id, c1, set_c1, info, set_info, crt_time, set_crt_time, mod_time, set_mod_time
  from
  (
    select
      row_number() over (partition by id) as rn,
      id,
      first_value(c1) over (partition by id order by set_c1 desc, pk desc) c1,
      first_value(set_c1) over (partition by id order by set_c1 desc, pk desc) set_c1,
      first_value(info) over (partition by id order by set_info desc, pk desc) info,
      first_value(set_info) over (partition by id order by set_info desc, pk desc) set_info,
      first_value(crt_time) over (partition by id order by set_crt_time desc, pk desc) crt_time,
      first_value(set_crt_time) over (partition by id order by set_crt_time desc, pk desc) set_crt_time,
      first_value(mod_time) over (partition by id order by set_mod_time desc, pk desc) mod_time,
      first_value(set_mod_time) over (partition by id order by set_mod_time desc, pk desc) set_mod_time
      from tmp1
  ) t
  where t.rn=1
) t
where prod.id=t.id;

删除更加简单,只需要将ID记录下来,delete from tbl where id in (...)即可,不再列举。

验证以上合并方法的一致性

使用两张目标表,一张为合并更新(合并更新的数据来源于实时更新的触发器日志),一张为实时更新。

drop table IF EXISTS prod;
drop table IF EXISTS prod_ck;
drop table IF EXISTS tmp1;  

create table prod(id int primary key, c1 int, info text, crt_time timestamp, mod_time timestamp);
insert into prod select generate_series(1,1000000), 1, 'test', now(), null;  

create table prod_ck(id int primary key, c1 int, info text, crt_time timestamp, mod_time timestamp);
insert into prod_ck select * from prod;  

create table tmp1
(
  pk serial8 primary key,   -- 标记插入顺序
  id int, c1 int, info text, crt_time timestamp, mod_time timestamp, -- 更新后的值
  set_id boolean, set_c1 boolean, set_info boolean, set_crt_time boolean, set_mod_time boolean -- 被更新的字段
);    

create or replace function f_tg() returns trigger as $$
declare
begin
  insert into tmp1 (id,c1,info,crt_time,mod_time,set_id,set_c1,set_info,set_crt_time,set_mod_time) values
    (NEW.id, NEW.c1, NEW.info, NEW.crt_time, NEW.mod_time, true, true, true, true, true);
  return null;
end;
$$ language plpgsql strict;  

create trigger tg after update on prod_ck for each row execute procedure f_tg();

使用pgbench,不断更新prod_ck

vi test.sql  

\set id random(1,10000)
\set c1 random(1,1000000)
update prod_ck set c1=:c1, crt_time=clock_timestamp(), mod_time=null where id=:id;
update prod_ck set c1=:c1, crt_time=null, mod_time=clock_timestamp() where id=:id+1;  

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 30  

progress: 3.0 s, 26379.3 tps, lat 1.211 ms stddev 0.660
progress: 4.0 s, 26339.9 tps, lat 1.213 ms stddev 0.700

将tmp1的更新合并到prod

update prod set
  c1=(case when t.set_c1 then t.c1 else prod.c1 end) ,  -- 将字段set位为true的值更新为新的值,false的不变。
  info=(case when t.set_info then t.info else prod.info end) ,
  crt_time=(case when t.set_crt_time then t.crt_time else prod.crt_time end) ,
  mod_time=(case when t.set_mod_time then t.mod_time else prod.mod_time end)
from
(
  select id, c1, set_c1, info, set_info, crt_time, set_crt_time, mod_time, set_mod_time
  from
  (
    select
      row_number() over (partition by id) as rn,
      id,
      first_value(c1) over (partition by id order by set_c1 desc, pk desc) c1,
      first_value(set_c1) over (partition by id order by set_c1 desc, pk desc) set_c1,
      first_value(info) over (partition by id order by set_info desc, pk desc) info,
      first_value(set_info) over (partition by id order by set_info desc, pk desc) set_info,
      first_value(crt_time) over (partition by id order by set_crt_time desc, pk desc) crt_time,
      first_value(set_crt_time) over (partition by id order by set_crt_time desc, pk desc) set_crt_time,
      first_value(mod_time) over (partition by id order by set_mod_time desc, pk desc) mod_time,
      first_value(set_mod_time) over (partition by id order by set_mod_time desc, pk desc) set_mod_time
      from tmp1
  ) t
  where t.rn=1
) t
where prod.id=t.id;  

UPDATE 10001

验证合并更新后prod和prod_ck是否一致

postgres=# select sum(hashtext(t.*::text)) from prod t;
      sum
----------------
 -2538529730583
(1 row)  

postgres=# select sum(hashtext(t.*::text)) from prod_ck t;
      sum
----------------
 -2538529730583
(1 row)

小结

数据合并的目标是将多条DML语句合并成一条,

包括将单条记录的多次更新、插入、删除合并为一次更新、插入或删除操作,

也包括将多条记录的多次DML合并成一条DML语句。

在数据逻辑复制、TP到AP业务系统的同步、物化视图 等场景有着广泛的应用。

特别是OLAP系统,由于并不是针对TP场景涉及,使用合并操作,可以大幅提升AP系统的操作效率。(Greenplum更新和删除都是表级锁, 效率也一般)

在greenplum单条记录,基于PK的更新速度测试

pgbench -M simple -n -r -f ./test.sql -P 1 -c 4 -j 4 -T 100 -h 127.0.0.1 -p 29999 -U digoal
progress: 1.0 s, 203.0 tps, lat 19.449 ms stddev 29.442
progress: 2.0 s, 290.0 tps, lat 13.739 ms stddev 0.337
progress: 3.0 s, 291.0 tps, lat 13.763 ms stddev 0.627
progress: 4.0 s, 276.0 tps, lat 14.035 ms stddev 3.919
progress: 5.0 s, 280.0 tps, lat 14.791 ms stddev 5.954
progress: 6.0 s, 296.0 tps, lat 13.493 ms stddev 1.720
progress: 7.0 s, 300.0 tps, lat 13.347 ms stddev 1.433

1万次更新需要十几秒才能完成,而使用合并更新,只需要0.几秒
时间: 2024-11-02 15:57:01

PostgreSQL、Greenplum DML合并操作 最佳实践的相关文章

容器服务节点重启操作最佳实践

直接重启节点可能会导致集群出现异常.比如,对于 Swarm Mode 集群内的 Manager 节点,如果 Manager 健康节点数小于 2,则可能会导致集群无法自愈,最终导致集群不可用.本文结合阿里云历史案例经验,说明了在对容器服务进行主动运维等场景下,需要重启节点时的操作最佳实践. 检查业务高可用配置 在重启容器服务节点前,建议先检查或修正如下业务配置,以避免节点重启触发单点异常,进而导致业务可用性受损: 1. 配置数据持久化策略 建议为日志.业务配置等重要数据配置外部卷进行数据持久化,以

PostgreSQL 10.0 逻辑复制原理与最佳实践

标签 PostgreSQL , logical replication , 逻辑复制 , 最佳实践 背景 PostgreSQL 从2010年发布的9.0开始支持流式物理复制,备库可以作为只读库打开,提供给用户使用. 物理复制的好处 1. 物理层面完全一致,这是许多商业数据库的惯用手段.例如Oracle的DG. 2. 延迟低,事务执行过程中产生REDO record,实时的在备库apply,事务结束时,备库立马能见到数据.不论事务多大,都一样. 3. 物理复制的一致性.可靠性达到了金融级的需求,不

Greenplum 空间(GIS)数据检索 B-Tree & GiST 索引实践 - 阿里云HybridDB for PostgreSQL最佳实践

标签 PostgreSQL , GIS , PostGIS , Greenplum , 空间检索 , GiST , B-Tree , geohash 背景 气象数据.地震数据.室内定位.室外定位.手机.车联网.还有我们最喜欢的"左划不喜欢.右划喜欢",越来越多的位置属性的数据.将来会越来越多. 基于GIS的数据分析.OLTP业务也越来越受到决策者的青睐,例如商场的选址决策,O2O的广告营销等.有很多基于多边形.时间.用户对象属性过滤的需求. 阿里云HybridDB for Postgr

云端流计算、在线业务、实时分析 闭环设计 - 阿里云RDS、HybridDB for PostgreSQL最佳实践

背景 水的流动汇成江河大海,孕育生命,形成大自然生态.数据流动,推进社会进步,拓展业务边界. <从人类河流文明 洞察 数据流动的重要性> 以某淘系业务案例展开,看看用户如何利用阿里云RDS PostgreSQL,HybridDB for PostgreSQL,海量对象存储OSS,打造一个从流计算到在线业务,再到数据分析和挖掘的业务,发挥数据的价值,拓展业务的边界. 业务简介 一个电商业务通常会涉及 商家.门店.物流.用户.支付渠道.贷款渠道.商品.平台.小二.广告商.厂家.分销商.店主.店员.

贷款、天使投资(风控助手)业务数据库设计 - 阿里云RDS PostgreSQL, HybridDB for PostgreSQL最佳实践

标签 PostgreSQL , HybridDB for PostgreSQL , 小微贷款 , 金融风控 , 企业图谱 , 图式搜索 , 舆情分析 , 自动贷款 , 贷款审查 , 审查神器 背景 贷款是银行的主营业务之一,但是并不是只有银行能提供贷款,实际上资金雄厚的公司都有能力提供贷款(比如保险行业.资源垄断型企业等). 除了放贷,我们常说的天使投资.A轮B轮啥的,也是类似的场景,凭什么投你,背后如何决策也需要决策系统的支撑. 与贷款相反的是吸金类业务,比如我们现在发现越来越多的理财产品.股

机票业务(单实例 2700万行/s return)数据库架构设计 - 阿里云RDS PostgreSQL最佳实践

背景 机票业务的某个模块,数据量10亿+,写.更新.删除量较低.根据KEY查询一些数据,每次查询返回1万条左右的记录. 就是这样简单的需求,业务方发现读成为了巨大的瓶颈,每次返回1万条,100个并发请求,每秒就是100万条(500MB左右),主要的瓶颈: 1.网络是个较大的开销. 2.不同KEY的数据可能是分散存放的,存在查询时的IO放大,可能有一定的性能影响. 3.每次请求的返回记录数较多,数据库search buffer调用可能开销会上升. 就这几个问题,我们来看看如何优化或解决业务方的问题

音视图(泛内容)网站透视分析 DB设计 - 阿里云(RDS、HybridDB) for PostgreSQL最佳实践

标签 PostgreSQL , 用户透视 , 设备透视 , 圈人 , 标签 , 视频网站 , 优酷 , 土豆 , 喜马拉雅 背景 日常生活中,人们使用最多的除了社交类网站.购物网站,估计就是音频.视频.图文信息类内容网站了. 视频网站,已经渗透到各种终端,除了喜闻乐见的手机,还包括移动终端.电脑.盒子.电视.投影仪等.有设备属性.会员属性.渠道属性等. 内容运营是非常重要的环节,而透视则是运营的重要武器. 业务需求 1.生成设备.会员画像 ID.各个维度的标签.其中包括一些多值列标签(例如最近7

每天万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践

背景 横看成岭侧成峰, 远近高低各不同. 不识庐山真面目, 只缘身在此山中. 不同的视角我们所看到的物体是不一样的, http://t.m.china.com.cn/convert/c_ovWL9w.html 图为墨西哥城放射状的街区广场. 图为西班牙迷宫般的果树漩涡. 地心说和日心说也是视角不同所呈现的. 实际上数据也有这样,我们每天产生海量的数据,有各种属性,以每个属性为视角(分组.归类.聚合),看到的是对应属性为中心的数据. 对应的业务场景也非常多,例如: 1.物联网, 每个传感器有很多属

Greenplum 最佳实践 - 什么时候选择bitmap索引

标签 PostgreSQL , Greenplum , bitmap index 背景 PostgreSQL 目前支持8种索引接口,包括B-Tree, hash, gin, gist, sp-gist, brin, rum, bloom. Greenplum 目前支持B-Tree, GiST, bitmap三种索引接口. 用户可以根据不同的数据类型,不同的请求类型,使用不同的索引接口建立相应的索引.例如对于数组,全文检索类型,可以使用GIN索引,对于地理位置数据,范围数据类型,图像特征值数据,几