动态输出(ToB海量日志转换业务) - 阿里云HybridDB for PostgreSQL最佳实践

标签

PostgreSQL , UDF , 动态格式 , format , JOIN , OSS外部表


背景

有一些业务需要将数据归类动态的输出,比如一些公共日志服务,所有用户的日志都被统一的按格式记录到一起,但是每个最终用户关心的字段都不一样,甚至每个用户对数据转换的需求都不一样。

比如这个业务:

《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》

一、需求

1、可以根据ToB的用户的定义,输出不同的格式。

2、每个ToB的用户,写入到一个文件或多个文件。

3、一个文件不能出现两个用户的内容。

其他需求见:

《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》

二、架构设计

1、采用OSS存储实时载入的海量公共日志。

2、采用HybridDB for PostgreSQL或RDS PostgreSQL的OSS外部表接口,直接并行读取OSS的文件。

3、为了防止ToB ID的数据倾斜问题,引入一个时间字段进行截断,采用 "ToB ID+截断时间" 两个字段进行重分布。

为了实现自动生成截断格式,需要起一个任务,自动计算截断格式。并将格式写入一张表。

4、通过HDB PG的窗口函数,按 "ToB ID+截断时间" 强制重分布。

5、通过UDF,将公共日志的格式,按ToB ID对应的UDF转换为对应的格式。

为了实现动态的格式需求,采用UDF,并将ToB ID对应的UDF写入一张表。

6、将转换后的数据,写入OSS。自动按ToB ID切换,绝对保证每个ToB的用户,写入到一个文件或多个文件。一个文件不出现两个用户的内容。

以上功能是阿里云HybridDB for PostgreSQL或RDS PostgreSQL的独有功能。

三、DEMO与性能

这里介绍如何动态的转换数据,其他内容请详见案例:

《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》

1、创建公共日志OSS外部表 - 数据来源

根据domain字段的值,动态格式输出,每个domain写一个或多个文件。不同的domain绝对不能出现在同一个文件中。

create external table demo_source (
UnixTime text,  -- 这个字段为unixtime,需要使用to_timestamp进行格式输出
domain  text,
c1 text,
c2 text,
c3 text,
c4 text,
c5 text,
c6 int,
c7 text,
c8 text,
c9 text
)
location('oss://xxxxx
        dir=xxxx/xxx/xxx/xxx id=xxx
        key=xxx bucket=xxx') FORMAT 'csv' (QUOTE '''' ESCAPE ''''   DELIMITER ',')
LOG ERRORS SEGMENT REJECT LIMIT 10000
;

2、写入一批测试数据,根据对应的CSV格式,将文件写入OSS对应的dir/bucket中。

3、创建公共日志OSS外部表 - 动态格式输出

create WRITABLE external table demo_output (
rn int8,   -- 输出到OSS时,忽略该字段
domain_and_key  text,   -- 输出到OSS时,忽略该字段
Data  text
)
location('oss://xxxxx
        dir=xxxx/xxx/xxx/xxx id=xxx id=xxx
        key=xxx bucket=xxx distributed_column=domain_and_key') FORMAT 'csv'
DISTRIBUTED BY (domain_and_key)
;

将按domain_and_key字段值的不同,自动将Data字段的数据写入对应的OSS文件。实现数据的重规整。(domain_and_key字段不写入OSS文件。)

这是阿里云HybridDB for PG的定制功能。

4、创建domain的时间格式化表。

create table domain_tsfmt(
  domain text,
  tsfmt text
);

根据domain的count(*),即数据量的多少,决定使用的时间截断格式,(yyyy, yyyymm, yyyymmdd, yyyymmddhh24, yyyymmddhh24miss)

越大的domain,需要越长(yyyymmhh24miss)的截断。

业务方可以来维护这个表,例如一天生成一次。

对于很小的domain,不需要写入这张表,可以采用统一格式,例如0000。

insert into domain_tsfmt values ('domain_a', 'yyyymmhh24');
insert into domain_tsfmt values ('domain_b', 'yyyymmhh24mi');

5、创建UDF元信息表,存储每个ToB ID对应的UDF名字

create table domain_udf(domain text, udf name);

6、创建UDF,需要定制格式的ToB ID,创建对应的UDF

PostgreSQL 例子

建议用format。

create or replace function f1(demo_source) returns text as $$
  select format('domain: %L , c2: %L , c4: %L', $1.domain, $1.c2, $1.c4);
$$ language sql strict;      

create or replace function f2(demo_source) returns text as $$
declare
  res text := format('%L , %L, %L', upper($1.c2), $1.c4, $1.c3);
begin
  return res;
end;
$$ language plpgsql strict;

HybridDB for PostgreSQL 例子

create or replace function f1(demo_source) returns text as $$
  select 'domain: '||$1.domain||' , c2: '||$1.c2||' , c4: '||$1.c4;
$$ language sql strict;      

create or replace function f2(demo_source) returns text as $$
  select upper($1.c2)||' , '||$1.c4||' , '||$1.c3;
$$ language sql strict;     

create or replace function f3(demo_source) returns text as $$
  select upper($1.c2)||' , '||$1.c4||' , '||$1.c9;
$$ language sql strict;

7、创建动态UDF,根据输入,动态调用对应的UDF

create or replace function ff(domain_source, name) returns text as $$
declare
  sql text := 'select '||quote_ident($2)||'('||quote_literal($1)||')';
  res text;
begin
  execute sql into res;
  return res;
end;
$$ language plpgsql strict;      

-- 由于hdb版本太老,不支持format,不支持record和text互转,不支持quote_literal(record)。
-- 调整如下    

create or replace function ff(domain_source, name) returns text as $$
declare
  sql text := 'select '||quote_ident($2)||'($abc_abc_abc$'||textin(record_out($1))||'$abc_abc_abc$)';
  res text;
begin
  execute sql into res;
  return res;
end;
$$ language plpgsql strict;

8、写入UDF映射,例如1-100的domain,使用F1进行转换,0的ID使用F2进行转换。

insert into domain_udf select 'domain_'||generate_series(1,100), 'f1';
insert into domain_udf values ('domain_0', 'f2');

不在这里的DOMAIN,采用默认UDF转换格式,例如f3。

9、根据 "domain + 时间截断" 重分发,根据UDF动态转换查询如下:

select domain_and_key, data from
(
select row_number() over (partition by domain||key order by domain,key) as RN, domain||key as domain_and_key, data from
-- partition by的窗口与第二个字段domain||key(目标表的分布键)保持一致,确保只需要重分发一次
(
  select
    t1.domain,
    (case when t2.* is null then '0000' else to_char(to_timestamp(t1.UnixTime::text::float8), t2.tsfmt) end) as key,
    ff(t1, (case when t3.* is null then 'f3' else t3.udf end)) as data
  from domain_source t1
    left join domain_tsfmt t2 using (domain)
    left join domain_udf t3 using (domain)
) t
) t
;

阿里云HDB FOR PG将自动根据OSS目标外部表中定义的distributed_column参数,当VALUE发生变化时,自动写新文件,从而实现不同的DOMAIN VALUE,写到不同的文件中。

10、将规整后的数据输出到OSS

insert into domain_output
select domain_and_key, data from
(
select row_number() over (partition by domain||key order by domain,key) as RN, domain||key as domain_and_key, data from
-- partition by的窗口与第二个字段domain||key(目标表的分布键)保持一致,确保只需要重分发一次
(
  select
    t1.domain,
    (case when t2.* is null then '0000' else to_char(to_timestamp(t1.UnixTime::text::float8), t2.tsfmt) end) as key,
    ff(t1, (case when t3.* is null then 'f3' else t3.udf end)) as data
  from domain_source t1
    left join domain_tsfmt t2 using (domain)
    left join domain_udf t3 using (domain)
) t
) t
;    

执行计划
只需要重分布一次  

                                                               QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------
 Insert (slice0; segments: 3)  (rows=333334 width=64)
   ->  Subquery Scan t  (cost=375072.68..395072.68 rows=333334 width=64)
         ->  Window  (cost=375072.68..385072.68 rows=333334 width=96)
               Partition By: domain_and_key
               Order By: domain_and_key
	       -- 有按目标列排序,所以OSS切换文件没有问题
	       -- 注意同一个domain的不同时间的数据,可能会切成多个文件
	       -- 如果想让同一个domain的数据,切到一个文件中,那么请使用domain, key作为窗口分组,同时OSS外表的实现上需要修改一下,忽略两个列。
               ->  Sort  (cost=375072.68..377572.68 rows=333334 width=96)
                     Sort Key: domain_and_key
                     ->  Redistribute Motion 3:3  (slice3; segments: 3)  (cost=12.84..86770.34 rows=333334 width=96)
		     -- 重分布一次。采用窗口函数,强制了重分布
                           Hash Key: domain_and_key
                           ->  Subquery Scan t  (cost=12.84..66770.34 rows=333334 width=96)
                                 ->  Hash Left Join  (cost=12.84..54270.34 rows=333334 width=237)
                                       Hash Cond: t1.domain = t2.domain
                                       ->  Hash Left Join  (cost=11.75..24261.75 rows=333334 width=192)
                                             Hash Cond: t1.domain = t3.domain
                                             ->  External Scan on domain_source t1  (cost=0.00..11000.00 rows=333334 width=96)
                                             ->  Hash  (cost=8.00..8.00 rows=100 width=105)
                                                   ->  Broadcast Motion 3:3  (slice1; segments: 3)  (cost=0.00..8.00 rows=100 width=105)
                                                         ->  Seq Scan on domain_udf t3  (cost=0.00..4.00 rows=34 width=105)
                                       ->  Hash  (cost=1.05..1.05 rows=1 width=61)
                                             ->  Broadcast Motion 3:3  (slice2; segments: 3)  (cost=0.00..1.05 rows=1 width=61)
                                                   ->  Seq Scan on domain_tsfmt t2  (cost=0.00..1.01 rows=1 width=61)
(21 rows)

简化DEMO (不依赖OSS,仅仅演示)

1、创建公共日志表

create table t1 (tid int, c1 text, c2 text, c3 int, c4 timestamp, c5 numeric);

2、写入一批测试数据

insert into t1 select random()*100, md5(random()::text), 'test', random()*10000, clock_timestamp(), random() from generate_series(1,1000000);

3、创建目标表

create table t1_output (rn int8, fmt text, data text);

4、创建UDF元信息表,存储每个ToB ID对应的UDF名字

create table t2(tid int, udf name);

5、创建UDF,需要定制格式的ToB ID,创建对应的UDF

create or replace function f1(t1) returns text as $$
  select 'tid: '||$1.tid||' , c2: '||$1.c2||' , c4: '||$1.c4;
$$ language sql strict;      

create or replace function f2(t1) returns text as $$
  select $1.tid||' , '||upper($1.c2)||' , '||$1.c4||' , '||$1.c3;
$$ language sql strict;      

create or replace function f3(t1) returns text as $$
  select $1.tid||' , '||upper($1.c2)||' , '||$1.c3||' , '||$1.c3;
$$ language sql strict;

默认采用f3()函数。

6、创建动态UDF,根据输入,动态调用对应的UDF

create or replace function ff(t1, name) returns text as $$
declare
  sql text := 'select '||quote_ident($2)||'($abc_abc_abc$'||textin(record_out($1))||'$abc_abc_abc$)';
  res text;
begin
  execute sql into res;
  return res;
end;
$$ language plpgsql strict;

7、写入UDF映射,例如1-10的ID,使用F1进行转换,0的ID使用F2进行转换。

insert into t2 select generate_series(1,10), 'f1';
insert into t2 values (0, 'f2');

8、创建格式表

create table t1_fmt (tid int, fmt text);    

insert into t1_fmt values (1, 'yyyymm');
insert into t1_fmt values (2, 'yyyy');

默认采样'0000'的格式。

9、动态转换查询如下:

select tid_key, data from
(
select row_number() over (partition by tid||key order by key) as RN, tid||key as tid_key, data from
(
  select
    t1.tid,
    (case when t2.* is null then '0000' else to_char(t1.c4, t2.fmt) end) as key,
    ff(t1, (case when t3.* is null then 'f3' else t3.udf end)) as data
  from t1
    left join t1_fmt t2 using (tid)
    left join t2 t3 using (tid)
) t
) t
;

10、将规整后的数据输出到目标表

insert into t1_output
select tid_key, data from
(
select row_number() over (partition by tid||key order by key) as RN, tid||key as tid_key, data from
(
  select
    t1.tid,
    (case when t2.* is null then '0000' else to_char(t1.c4, t2.fmt) end) as key,
    ff(t1, (case when t3.* is null then 'f3' else t3.udf end)) as data
  from t1
    left join t1_fmt t2 using (tid)
    left join t2 t3 using (tid)
) t
) t
;    

INSERT 0 1000000    

postgres=# select * from t1_output  limit 100;
   fmt   |                        data
---------+-----------------------------------------------------
 870000  | 87 , TEST , 7108 , 7108
 870000  | 87 , TEST , 787 , 787
 870000  | 87 , TEST , 6748 , 6748
 870000  | 87 , TEST , 6385 , 6385
 870000  | 87 , TEST , 5278 , 5278
 870000  | 87 , TEST , 8132 , 8132
 870000  | 87 , TEST , 7513 , 7513
 870000  | 87 , TEST , 2025 , 2025
 870000  | 87 , TEST , 7322 , 7322
 870000  | 87 , TEST , 2019 , 2019
 870000  | 87 , TEST , 6416 , 6416
 870000  | 87 , TEST , 9959 , 9959
 870000  | 87 , TEST , 7876 , 7876
 870000  | 87 , TEST , 5022 , 5022
.....  

写OSS外部表时,根据fmt列的VALUE进行识别,当遇到不同的VALUE就切换文件名,不同FMT VALUE的数据写入不同的文件。

四、技术点

这里只谈本文涉及的技术点。

1、UDF

PostgreSQL支持多种UDF语言(例如C,plpgsql, sql, plpython, pljava, plv8, ......),用户通过UDF定义需要转换的格式。

2、动态调用

用户通过动态调用,可以动态的调用对应的UDF,在一个请求中生成不同的格式。

五、云端产品

阿里云 RDS PostgreSQL

阿里云 HybridDB for PostgreSQL

阿里云 OSS

六、类似场景、案例

《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》

七、小结

一些公共日志服务,所有用户的日志都被统一的按格式记录到一起,但是每个最终用户关心的字段都不一样,甚至每个用户对数据转换的需求都不一样。

PostgreSQL支持多种UDF语言(例如C,plpgsql, sql, plpython, pljava, plv8, ......),用户通过UDF定义需要转换的格式。

用户通过动态调用,可以动态的调用对应的UDF,在一个请求中生成不同的格式。

八、参考

《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》

时间: 2024-12-11 18:37:56

动态输出(ToB海量日志转换业务) - 阿里云HybridDB for PostgreSQL最佳实践的相关文章

每天万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践

背景 横看成岭侧成峰, 远近高低各不同. 不识庐山真面目, 只缘身在此山中. 不同的视角我们所看到的物体是不一样的, http://t.m.china.com.cn/convert/c_ovWL9w.html 图为墨西哥城放射状的街区广场. 图为西班牙迷宫般的果树漩涡. 地心说和日心说也是视角不同所呈现的. 实际上数据也有这样,我们每天产生海量的数据,有各种属性,以每个属性为视角(分组.归类.聚合),看到的是对应属性为中心的数据. 对应的业务场景也非常多,例如: 1.物联网, 每个传感器有很多属

Greenplum 空间(GIS)数据检索 B-Tree & GiST 索引实践 - 阿里云HybridDB for PostgreSQL最佳实践

标签 PostgreSQL , GIS , PostGIS , Greenplum , 空间检索 , GiST , B-Tree , geohash 背景 气象数据.地震数据.室内定位.室外定位.手机.车联网.还有我们最喜欢的"左划不喜欢.右划喜欢",越来越多的位置属性的数据.将来会越来越多. 基于GIS的数据分析.OLTP业务也越来越受到决策者的青睐,例如商场的选址决策,O2O的广告营销等.有很多基于多边形.时间.用户对象属性过滤的需求. 阿里云HybridDB for Postgr

MPP分布式数据库性能评估方法 - 阿里云HybridDB for PostgreSQL最佳实践

背景 通常评估一个数据库的性能,可以选择工业标准测试,或者根据业务模型,建模进行测试. 例如PostgreSQL pgbench支持的tpc-b测试,以及自定义模型测试. benchmarksql支持的tpc-c测试. gp_tpch支持的tpc-h测试等等. 参考文档如下 <TPC-H测试 - PostgreSQL 10 vs Deepgreen(Greenplum)> <PostgreSQL 使用 pgbench 测试 sysbench 相关case> <PostgreS

分布式DB锁问题排查方法 - 阿里云HybridDB for PostgreSQL最佳实践

标签 PostgreSQL , Greenplum , 锁 , SEGMENT不一致 , gp_session_role=utility , gp_dist_random 背景 Greenplum(GPDB)是一个分布式数据库,分布式数据库的锁管理比单机更加复杂.例如在加锁时,需要对所有节点加锁(包括MASTER和所有的SEGMENT节点),在释放锁时,则需要释放所有节点的锁. 如果在释放过程中,MASTER的锁释放了,而SEGMENT锁没有释放,会造成什么问题呢? 不用说,会有很诡异的问题出现

分布式DB数据倾斜的原因和解法 - 阿里云HybridDB for PostgreSQL最佳实践

标签 PostgreSQL , Greenplum , query倾斜 , 存储倾斜 , OOM , disk full , 短板 , 数据分布 背景 对于分布式数据库来说,QUERY的运行效率取决于最慢的那个节点. 当数据出现倾斜时,某些节点的运算量可能比其他节点大.除了带来运行慢的问题,还有其他的问题,例如导致OOM,或者DISK FULL等问题. 如何监控倾斜 1.监控数据库级别倾斜 postgres=# select gp_execution_dbid(), datname, pg_si

行存、列存,堆表、AO表性能对比 - 阿里云HDB for PostgreSQL最佳实践

标签 PostgreSQL , GIS , PostGIS , Greenplum , 空间检索 , GiST , B-Tree , geohash 背景 <Greenplum 行存.列存,堆表.AO表的原理和选择> 以上文档详细的介绍了行存.列存,堆表.AO表的原理以及选择的依据. <一个简单算法可以帮助物联网,金融 用户 节约98%的数据存储成本 (PostgreSQL,Greenplum帮你做到)> 以上文档介绍了提升基于列存的全局数据压缩比的方法. <解密上帝之手 -

如何检测、清理Greenplum垃圾 - 阿里云HybridDB for PG最佳实践

标签 PostgreSQL , Greenplum , HDB for PG 背景 Greenplum通过多版本支持数据的删除和更新的并发和回滚,在删除数据时(使用DELETE删除),对记录的头部xmax值进行标记.在删除记录时,对记录的头部进行标记,同时插入新的版本. 这一就会导致一个问题,如果用户经常删除和插入或更新数据,表和索引都会膨胀. PostgreSQL是通过HOT技术以及autovacuum来避免或减少垃圾的.但是Greenplum没有自动回收的worker进程,所以需要人为的触发

云端流计算、在线业务、实时分析 闭环设计 - 阿里云RDS、HybridDB for PostgreSQL最佳实践

背景 水的流动汇成江河大海,孕育生命,形成大自然生态.数据流动,推进社会进步,拓展业务边界. <从人类河流文明 洞察 数据流动的重要性> 以某淘系业务案例展开,看看用户如何利用阿里云RDS PostgreSQL,HybridDB for PostgreSQL,海量对象存储OSS,打造一个从流计算到在线业务,再到数据分析和挖掘的业务,发挥数据的价值,拓展业务的边界. 业务简介 一个电商业务通常会涉及 商家.门店.物流.用户.支付渠道.贷款渠道.商品.平台.小二.广告商.厂家.分销商.店主.店员.

音视图(泛内容)网站透视分析 DB设计 - 阿里云(RDS、HybridDB) for PostgreSQL最佳实践

标签 PostgreSQL , 用户透视 , 设备透视 , 圈人 , 标签 , 视频网站 , 优酷 , 土豆 , 喜马拉雅 背景 日常生活中,人们使用最多的除了社交类网站.购物网站,估计就是音频.视频.图文信息类内容网站了. 视频网站,已经渗透到各种终端,除了喜闻乐见的手机,还包括移动终端.电脑.盒子.电视.投影仪等.有设备属性.会员属性.渠道属性等. 内容运营是非常重要的环节,而透视则是运营的重要武器. 业务需求 1.生成设备.会员画像 ID.各个维度的标签.其中包括一些多值列标签(例如最近7