阿里云RDS PG实践 - 流式标签 - 万亿级,实时任意标签圈人

标签

PostgreSQL , 阅后即焚 , 流计算 , 标签


背景

varbitx是阿里云RDS PG提供的一个BIT操作插件,使用这个插件已经成功的帮助用户提供了万亿级的毫秒级实时圈人功能。

《阿里云RDS for PostgreSQL varbitx插件与实时画像应用场景介绍》

《基于 阿里云 RDS PostgreSQL 打造实时用户画像推荐系统(varbitx)》

结合阅后即焚的流式批量处理,schemaless UDF,可以实现高效的增、删标签,以及毫秒级别的按标签圈人。

《PostgreSQL 在铁老大订单系统中的schemaless设计和性能压测》

目标是百亿级用户体量,千万级标签。

架构图

阿里云varbitx 添加标签的函数接口

1. set_bit_array

set_bit_array (
  varbit,
  int,   -- 目标BIT (0|1)
  int,   -- 填充BIT (0|1)
  int[]  -- 目标位置
) returns varbit  

  将指定位置的BIT设置为0|1,(起始位=0),超出原始长度的部分填充0|1
  例如 set_bit_array('111100001111', 0, 1, array[1,15]) 返回 1011000011111110

正在添加这个函数,用于一次性处理添加和删除的BIT的设置

2. set_bit_array

set_bit_array (
  varbit,
  int,     -- 1目标BIT (0|1)
  int[]    -- 1目标位置
  int,     -- 2目标BIT (0|1)
  int[],   -- 2目标位置
  int      -- 填充BIT (0|1)
) returns varbit  

  将指定位置的BIT设置为0|1,(起始位=0),超出原始长度的部分填充0|1
  例如 set_bit_array('111100001111', 0, array[1,15], 1, array[0,4], 0) 返回 1011100011111110

阿里云varbitx 从bitmap得到字典ID

1. bit_posite

bit_posite (
  varbit,
  int,      -- (0|1)
  boolean
) returns int[]      

  返回 0|1 的位置,(起始位=0), true时正向返回,false时反向返回
  例如 bit_posite ('11110010011', 1, true) 返回 [0,1,2,3,6,9,10]
       bit_posite ('11110010011', 1, false) 返回 [10,9,6,3,2,1,0]

从字典ID得到USERID

select uid from dict where id = any ( bit_posite(x,x,x) );

demo

1 字典表

将USERID转换为ARRAY下标,即字典表

首先需要用到无缝自增ID

《PostgreSQL 无缝自增ID的实现 - by advisory lock》

1、字典表如下

create table dict(id int8 primary key, uid int8 not null);    

create unique index idx_dict_uid on dict (uid);

2 生成已有用户

create table t_uid (
  uid int8 primary key  -- 已有用户的USER ID
);

插入一批 USERID (一亿)

insert into t_uid select id from generate_series(1,100000000) t(id) order by random() ;

3 已有用户,一次性生成mapping下标

create sequence seq minvalue 0 start 0;    

insert into dict select nextval('seq'), uid from t_uid;    

select min(id),max(id),count(*) from dict;
 min |   max    |   count
-----+----------+-----------
   0 | 99999999 | 100000000
(1 row)

4 无缝自增下标 函数

新增的用户,通过这个函数写入,确保无缝自增下标:

create or replace function f_uniq(i_uid int8) returns int as $$
declare
  newid int;
  i int := 0;
  res int;
begin
  loop
    if i>0 then
      perform pg_sleep(0.2*random());
    else
      i := i+1;
    end if;  

    -- 获取已有的最大ID+1 (即将插入的ID)
    select max(id)+1 into newid from dict;
    if newid is not null then
      -- 获取AD LOCK
      if pg_try_advisory_xact_lock(newid) then
        -- 插入
        insert into dict (id,uid) values (newid,i_uid) ;
        -- 返回此次获取到的UID
        return newid;
      else
        -- 没有获取到AD LOCK则继续循环
        continue;
      end if;
    else
      -- 表示这是第一条记录,获取AD=0 的LOCK
      if pg_try_advisory_xact_lock(0) then
        insert into dict (id, uid) values (0, i_uid) ;
        return 0;
      else
        continue;
      end if;
    end if;
  end loop;    

  -- 如果因为瞬态导致PK冲突了,继续调用
  exception when others then
    select f_uniq(i_uid) into res;
    return res;
end;
$$ language plpgsql strict;

例如,新增一个USERID。

select f_uniq(?);

5 从ID或者USERID或从USERID获得ID

create or replace function get_id_from_uid (int8) returns int8 as $$
  select id from dict where uid=$1;
$$ language sql strict;  

create or replace function get_uid_from_id (int8) returns int8 as $$
  select uid from dict where id=$1;
$$ language sql strict;

6 标签描述表

create table t_tags(tagid int primary key, desc text);

7 流式标签表

1、UID范围映射表,落在什么区间的ID,对应什么后缀的表名

建议在写入时,自动写到不同的t_tag_log表,这样的话,不需要在消费时过滤。

create table t_mapping (
  dict_id int8range,      -- 字典ID区间
  suffix text unique             -- 表名后缀
);  

alter table t_mapping add constraint ck_exclude_dict_id exclude using gist(dict_id with &&);  -- 防止交叉区间
insert into t_mapping values (int8range(0,100000000), '0');
insert into t_mapping values (int8range(100000000,200000000), '1');
insert into t_mapping values (int8range(200000000,300000000), '2');
insert into t_mapping values (int8range(300000000,400000000), '3');

2、流式标签主表

create table t_tag_log (
  dict_id int8,        -- 用户下标ID
  action int2,         -- 删除0、新增1
  tagid int,           -- 标签ID
  crt_time timestamp   -- 时间
);  

create index idx_t_tag_log on t_tag_log (crt_time);

3、创建流式标签子表

按tagid分区,按t_mapping后缀对应关系分区。

do language plpgsql $$
declare
  x text;
begin
  for i in 0..63 loop
    for x in select suffix from t_mapping loop
      execute format('create table t_tag_log_%s_%s (like t_tag_log including all) inherits (t_tag_log)', i, x);
    end loop;
  end loop;
end;
$$;  

postgres=# \dt t_tag_log_*
             List of relations
 Schema |      Name      | Type  |  Owner
--------+----------------+-------+----------
 public | t_tag_log_0_0  | table | postgres
 public | t_tag_log_0_1  | table | postgres
 public | t_tag_log_0_2  | table | postgres
 public | t_tag_log_0_3  | table | postgres
 public | t_tag_log_10_0 | table | postgres
 public | t_tag_log_10_1 | table | postgres
 public | t_tag_log_10_2 | table | postgres
 public | t_tag_log_10_3 | table | postgres
 public | t_tag_log_11_0 | table | postgres
 public | t_tag_log_11_1 | table | postgres
.....................

4、使用schema lessUDF写入标签信息

create or replace function ins_tag(v_uid int8, v_action int2, v_tagid int, v_crt_time timestamp) returns void as $$
declare
  i int := mod(v_tagid, 64);
  x text;
begin
  select suffix into x from t_mapping where dict_id @> $1;
  execute format ('insert into t_tag_log_%s_%s (dict_id, action, tagid, crt_time) values (%s, %s, %s, %L)', i, x, get_id_from_uid(v_uid), v_action, v_tagid, v_crt_time);
end;
$$ language plpgsql strict;

8 高速写入贴标签、删除标签

vi test.sql  

\set uid random(1,100000000)
\set tagid random(1,100000)
\set action random(0,1)
select ins_tag (:uid::int8, :action::int2, :tagid, now()::timestamp);  

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120

单表单条写入性能:

单表或多表 批量写入 性能可以超过100万行/s。

transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 32
number of threads: 32
duration: 120 s
number of transactions actually processed: 19013298
latency average = 0.202 ms
latency stddev = 0.267 ms
tps = 158442.952245 (including connections establishing)
tps = 158449.386772 (excluding connections establishing)
script statistics:
 - statement latencies in milliseconds:
         0.001  \set uid random(1,100000000)
         0.000  \set tagid random(1,100000)
         0.000  \set action random(0,1)
         0.200  insert into t_tag_log (dict_id, action, tagid, crt_time) values (get_id_from_uid(:uid), :action, :tagid, now());

9 标签反转表

对应到前面的拆分模式,有两种模式,一种模式,使用多表存储。另一种模式使用单表存储。

1、多表存储。

标签反转主表

create table tag_userbitmap (
  tagid int primary key,             -- 标签ID
  usrebitmap varbit                  -- 用户bitmap
);

标签反转子表

do language plpgsql $$
declare
  x text;
begin
  for x in select suffix from t_mapping loop
    execute format('create table tag_userbitmap_%s (like tag_userbitmap including all) inherits (tag_userbitmap)', x);
  end loop;
end;
$$;

tag不需要分区,因为增量更新时,没有行锁冲突,而且表大小也足够。

postgres=# \dt tag_userbitmap*
              List of relations
 Schema |       Name       | Type  |  Owner
--------+------------------+-------+----------
 public | tag_userbitmap   | table | postgres
 public | tag_userbitmap_0 | table | postgres
 public | tag_userbitmap_1 | table | postgres
 public | tag_userbitmap_2 | table | postgres
 public | tag_userbitmap_3 | table | postgres
(5 rows)

2、单表存储,则加一列区分OFFSET。

create table tag_userbitmap_single (
  tagid int ,               -- 标签ID
  bitmap_offset int ,       -- bit号段offset 值
  usrebitmap varbit,        -- 当前号段的bitmap
  primary key (tagid, bitmap_offset)
);

10 阅后即焚-流式消费标签,合并到标签反转表

不考虑分区表时,这样来进行消费。

with tmp as (delete from t_tag_log     -- 查询哪个表
  where ctid = any ( array (
    select ctid from t_tag_log order by crt_time limit 100000       -- 批量处理10万条
  )) returning *
)
select tagid, action, array_agg(dict_id) as dict_id from
(
  select row_number() over w1 as rn, *
  from tmp
  window w1 as (partition by dict_id, tagid order by crt_time desc)  -- 每个ID,每个标签,取最后一条
) t where rn=1
group by tagid, action                                               -- 聚合为下标数组
;

11 schemaless UDF, 阅后即焚-流式消费标签,合并到标签反转表

对应tag_userbimap表的设计(单表、或多表),schemaless UDF也有不同。

1、多表:在UDF中自动拼接表名,将BITMAP合并到对应子表。

create or replace function merge_tags(v_limit int, v_suffix1 int, v_suffix2 int) returns void as $$
declare
  v_tagid int;
  v_action int2;
  v_dict_id int[];
  v_offset int;
  v_fixed_dict_id int[];
begin
  select substring(dict_id::text,'(\d+),') into v_offset from t_mapping where suffix=v_suffix2::text;   

  for v_tagid, v_action, v_dict_id in
    execute format
('
with tmp as (delete from t_tag_log_%s_%s     -- 查询哪个表
  where ctid = any ( array (
    select ctid from t_tag_log_%s_%s order by crt_time limit %s       -- 批量处理N条
  )) returning *
)
select tagid, action, array_agg(dict_id) as dict_id from
(
  select row_number() over w1 as rn, *
  from tmp
  window w1 as (partition by dict_id, tagid order by crt_time desc)   -- 每个ID,每个标签,取最后一条
) t where rn=1
group by tagid, action
',
v_suffix1, v_suffix2, v_suffix1, v_suffix2, v_limit)    

loop
  select array(select unnest(v_dict_id)-v_offset) into v_fixed_dict_id;
  -- raise notice '% ,% ,% ,%', v_tagid, v_action, v_dict_id, v_fixed_dict_id;
  execute format('insert into tag_userbitmap_%s (tagid, usrebitmap) values (%s, set_bit_array(''0'', %s, %s, %L))
                  on conflict (tagid)
                  do update set usrebitmap=set_bit_array(tag_userbitmap_%s.usrebitmap, %s, %s, %L)',
                  v_suffix2, v_tagid, v_action, 0, v_fixed_dict_id, v_suffix2, v_action, 0, v_fixed_dict_id);
end loop;
end;
$$ language plpgsql strict;

2、单表:

create or replace function merge_tags(v_limit int, v_suffix1 int, v_suffix2 int) returns void as $$
declare
  v_tagid int;
  v_action int2;
  v_dict_id int[];
  v_offset int;
  v_fixed_dict_id int[];
begin
  select substring(dict_id::text,'(\d+),') into v_offset from t_mapping where suffix=v_suffix2::text;   

  for v_tagid, v_action, v_dict_id in
    execute format
('
with tmp as (delete from t_tag_log_%s_%s     -- 查询哪个表
  where ctid = any ( array (
    select ctid from t_tag_log_%s_%s order by crt_time limit %s       -- 批量处理N条
  )) returning *
)
select tagid, action, array_agg(dict_id) as dict_id from
(
  select row_number() over w1 as rn, *
  from tmp
  window w1 as (partition by dict_id, tagid order by crt_time desc)   -- 每个ID,每个标签,取最后一条
) t where rn=1
group by tagid, action
',
v_suffix1, v_suffix2, v_suffix1, v_suffix2, v_limit)    

loop
  select array(select unnest(v_dict_id)-v_offset) into v_fixed_dict_id;
  -- raise notice '% ,% ,% ,%', v_tagid, v_action, v_dict_id, v_fixed_dict_id;
  execute format('insert into tag_userbitmap (tagid, bitmap_offset, usrebitmap) values (%s, %s, set_bit_array(''0'', %s, %s, %L))
                  on conflict (tagid, bitmap_offset)
                  do update set usrebitmap=set_bit_array(tag_userbitmap.usrebitmap, %s, %s, %L)',
                  v_suffix2, v_suffix1, v_action, 0, v_fixed_dict_id, v_action, 0, v_fixed_dict_id);
end loop;
end;
$$ language plpgsql strict;

3、单表和多表的函数接口是一样的,都是调用merge_tags函数,增量更新TAG,不同组合,允许并行调用。

以下QUERY可以并行调用:

select merge_tags(100000, 0, 0);
select merge_tags(100000, 0, 1);
select merge_tags(100000, 0, 2);
select merge_tags(100000, 0, 3);
.....
select merge_tags(100000, 63, 0);
select merge_tags(100000, 63, 1);
select merge_tags(100000, 63, 2);
select merge_tags(100000, 63, 3);

12 查询包含哪些TAG的USEID

这个SQL可以略微修改,包括coalesce,补齐varbit长度到固定长度等。

full outer JOIN 得到最终的varbit。

根据bitmap求dict_id, 再根据dict_id求user_id。

多表:

select bit_posite(t1.usrebitmap||t2.usrebitmap||t3.usrebitmap||t4.usrebitmap, '1', true)
from tag_userbitmap_0 t1 , ....
where tx.tagid in (....);

或单表:

create aggregate bit_agg (varbit) (sfunc = bitcat, stype=varbit) ;
select bit_posite(bit_and(tag), '1', true) from (
  select bit_agg(usrebitmap) from tag_userbitmap where tagid in (...) group by tagid
) t

详见用法

《阿里云RDS for PostgreSQL varbitx插件与实时画像应用场景介绍》

其他

1、维护字典表(删除僵尸用户)

2、同时收缩VARBITX

查询某个用户有哪些标签

这种点查,建议使用以下结构,数组作为标签。与本文相反。

create table t_user_tags(
  uid int8 primary key,   -- 用户ID
  tagid int[]             -- 标签ID
);

相似案例

《PostgreSQL手机行业经营分析、决策系统设计 - 实时圈选、透视、估算》

《阿里云RDS for PostgreSQL varbitx插件与实时画像应用场景介绍》

《基于 阿里云 RDS PostgreSQL 打造实时用户画像推荐系统(varbitx)》

《HTAP数据库 PostgreSQL 场景与性能测试之 32 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(JSON + 函数流式计算)》

《HTAP数据库 PostgreSQL 场景与性能测试之 31 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(读写大吞吐并测)》

《HTAP数据库 PostgreSQL 场景与性能测试之 27 - (OLTP) 物联网 - FEED日志, 流式处理 与 阅后即焚 (CTE)》

《在PostgreSQL中实现update | delete limit - CTID扫描实践 (高效阅后即焚)》

《PostgreSQL 异步消息实践 - Feed系统实时监测与响应(如 电商主动服务) - 分钟级到毫秒级的实现》

时间: 2024-12-24 20:12:54

阿里云RDS PG实践 - 流式标签 - 万亿级,实时任意标签圈人的相关文章

阿里云发布首个流式存储与播放解决方案

本文讲的是阿里云发布首个流式存储与播放解决方案[IT168资讯]9月11日,首个面向视频监控行业的流式存储与播放一体化方案在阿里云官网上线.借助这份技术方案,摄像头接入云端的时间将从一两个月缩短到一周.由此带来的存储成本大幅下降,将加速推动监控企业从卖硬件向卖服务转型. 阿里云产品经理吴贻刚介绍,传统的技术方案普遍存在以下几个问题:码流上传占用极大的带宽,而下行回放录像的相对较小,造成带宽浪费;存储到一定规模后,达到设备的极限,水平扩容困难;灾备考虑不足,常常由于单块硬盘的损坏导致数据的丢失;一

德歌:阿里云RDS PG最佳实践

直播视频: (点击图片查看视频) 幻灯片下载地址:https://oss-cn-hangzhou.aliyuncs.com/yqfiles/1138a8a3aff5f63b426162e265d98375.pdf 上云实践 在上云之前,首先需要评估RDS的规格,这是因为线下使用的硬件可能与线上的硬件不能一一对应,并且线上的RDS可能还做了一定的优化.在评估RDS规格的时候,需要考虑以下几个方面: 可用区:  尽量与应用服务器在同一可用区:  否则只能通过公网地址访问. 数据库版本:根据业务需求选

Azure SQL数据库迁移阿里云RDS SQLserver实践

    一.背景          由于尝试直接使用DTS工具迁移,从微软云迁移SQL数据库到RDS SQLserver时发现,DTS虽然能够连接到Azure SQL, 但是无法获取结构,主要由于Azure SQL是微软针对微软云定制的数据库版本.与原本的MSSQL server还是不一样的.为了方便大家能够顺利迁移.整理了导入导出的迁移方式.   二.Azure SQL 数据库迁移到RDS SQLserver实践步骤   1.         在阿里云控制台创建好目标数据库和登陆用户. a) 

使用Londiste3 增量同步 线下PostgreSQL 到 阿里云RDS PG

源端 CentOS 7 PostgreSQL 9.5.2 , listen port 1922 公网IP 101.xxx.xxx.171 skytools 3.2.6 目标端 RDS PG xxx.digoal.pg.rds.aliyuncs.com port=3433 user=digoal dbname=db1 password=digoal 源端安装 PostgreSQL 略 源库 postgres=# create database db1; CREATE DATABASE 目标库 RD

海量实时计算+OLTP+OLAP DB设计 - 阿里云(RDS、HybridDB) for PostgreSQL最佳实践 - 泛电网系统应用

标签 PostgreSQL , 国家电网 , 电表 , 余额 , 流式计算 , 状态监测 , 上下文相关 背景 电网系统是一个关系民生,又非常典型的传统系统,虽然传统,量可不小.在互联网化(物联网化)的今天,有很多值得借鉴和思考的点供给其他相关系统参考. 每个省份大概有亿级户电表,最大的地市可能有千万户级别. 以往我们电费是怎么交的呢?我们小区是两个月交一次,也就是说先消费,再付款的方式.这么说起来电网真的是很仁义啊,现在哪有这么多先消费再付款的呀.移动话费.家庭宽带.天然气等等,都是充值后使用

(新零售)商户网格化运营 - 阿里云RDS PostgreSQL最佳实践

标签 PostgreSQL , PostGIS , 地理位置 , KNN , 近邻检索 , 网格检索 , polygon中心点 , 半径搜索 背景 伟大的马老师说: "纯电商时代很快会结束,未来的十年.二十年,没有电子商务这一说,只有新零售这一说,也就是说线上线下和物流必须结合在一起,才能诞生真正的新零售" 线上是指云平台,线下是指销售门店或生产商,新物流消灭库存,减少囤货量. 电子商务平台消失是指,现有的电商平台分散,每个人都有自己的电商平台,不再入驻天猫.京东.亚马逊大型电子商务平

音视图(泛内容)网站透视分析 DB设计 - 阿里云(RDS、HybridDB) for PostgreSQL最佳实践

标签 PostgreSQL , 用户透视 , 设备透视 , 圈人 , 标签 , 视频网站 , 优酷 , 土豆 , 喜马拉雅 背景 日常生活中,人们使用最多的除了社交类网站.购物网站,估计就是音频.视频.图文信息类内容网站了. 视频网站,已经渗透到各种终端,除了喜闻乐见的手机,还包括移动终端.电脑.盒子.电视.投影仪等.有设备属性.会员属性.渠道属性等. 内容运营是非常重要的环节,而透视则是运营的重要武器. 业务需求 1.生成设备.会员画像 ID.各个维度的标签.其中包括一些多值列标签(例如最近7

阿里云RDS PostgreSQL OSS 外部表 - 并行写提速案例

标签 PostgreSQL , oss对象存储 , 阿里云RDS PG , 并行写 , dblink , 异步调用 , 异步任务监控 , OSS外部表 , 数据传输 背景 阿里云RDS PostgreSQL.HybridDB for PostgreSQL提供了一个非常强大的功能,OSS对象存储外部表. 阿里云的RDS PostgreSQL用户可以利用OSS存储冷数据(OSS外部表的形态呈现),实现冷热分离:也可以利用OSS作为数据的中转桥梁,打通其他云端业务,例如HDB FOR PostgreS

云端流计算、在线业务、实时分析 闭环设计 - 阿里云RDS、HybridDB for PostgreSQL最佳实践

背景 水的流动汇成江河大海,孕育生命,形成大自然生态.数据流动,推进社会进步,拓展业务边界. <从人类河流文明 洞察 数据流动的重要性> 以某淘系业务案例展开,看看用户如何利用阿里云RDS PostgreSQL,HybridDB for PostgreSQL,海量对象存储OSS,打造一个从流计算到在线业务,再到数据分析和挖掘的业务,发挥数据的价值,拓展业务的边界. 业务简介 一个电商业务通常会涉及 商家.门店.物流.用户.支付渠道.贷款渠道.商品.平台.小二.广告商.厂家.分销商.店主.店员.