标签
PostgreSQL , UDF , 动态格式 , format , JOIN , OSS外部表
背景
有一些业务需要将数据归类动态的输出,比如一些公共日志服务,所有用户的日志都被统一的按格式记录到一起,但是每个最终用户关心的字段都不一样,甚至每个用户对数据转换的需求都不一样。
比如这个业务:
《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》
一、需求
1、可以根据ToB的用户的定义,输出不同的格式。
2、每个ToB的用户,写入到一个文件或多个文件。
3、一个文件不能出现两个用户的内容。
其他需求见:
《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》
二、架构设计
1、采用OSS存储实时载入的海量公共日志。
2、采用HybridDB for PostgreSQL或RDS PostgreSQL的OSS外部表接口,直接并行读取OSS的文件。
3、为了防止ToB ID的数据倾斜问题,引入一个时间字段进行截断,采用 "ToB ID+截断时间" 两个字段进行重分布。
为了实现自动生成截断格式,需要起一个任务,自动计算截断格式。并将格式写入一张表。
4、通过HDB PG的窗口函数,按 "ToB ID+截断时间" 强制重分布。
5、通过UDF,将公共日志的格式,按ToB ID对应的UDF转换为对应的格式。
为了实现动态的格式需求,采用UDF,并将ToB ID对应的UDF写入一张表。
6、将转换后的数据,写入OSS。自动按ToB ID切换,绝对保证每个ToB的用户,写入到一个文件或多个文件。一个文件不出现两个用户的内容。
以上功能是阿里云HybridDB for PostgreSQL或RDS PostgreSQL的独有功能。
三、DEMO与性能
这里介绍如何动态的转换数据,其他内容请详见案例:
《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》
1、创建公共日志OSS外部表 - 数据来源
根据domain字段的值,动态格式输出,每个domain写一个或多个文件。不同的domain绝对不能出现在同一个文件中。
create external table demo_source (
UnixTime text, -- 这个字段为unixtime,需要使用to_timestamp进行格式输出
domain text,
c1 text,
c2 text,
c3 text,
c4 text,
c5 text,
c6 int,
c7 text,
c8 text,
c9 text
)
location('oss://xxxxx
dir=xxxx/xxx/xxx/xxx id=xxx
key=xxx bucket=xxx') FORMAT 'csv' (QUOTE '''' ESCAPE '''' DELIMITER ',')
LOG ERRORS SEGMENT REJECT LIMIT 10000
;
2、写入一批测试数据,根据对应的CSV格式,将文件写入OSS对应的dir/bucket中。
3、创建公共日志OSS外部表 - 动态格式输出
create WRITABLE external table demo_output (
rn int8, -- 输出到OSS时,忽略该字段
domain_and_key text, -- 输出到OSS时,忽略该字段
Data text
)
location('oss://xxxxx
dir=xxxx/xxx/xxx/xxx id=xxx id=xxx
key=xxx bucket=xxx distributed_column=domain_and_key') FORMAT 'csv'
DISTRIBUTED BY (domain_and_key)
;
将按domain_and_key字段值的不同,自动将Data字段的数据写入对应的OSS文件。实现数据的重规整。(domain_and_key字段不写入OSS文件。)
这是阿里云HybridDB for PG的定制功能。
4、创建domain的时间格式化表。
create table domain_tsfmt(
domain text,
tsfmt text
);
根据domain的count(*),即数据量的多少,决定使用的时间截断格式,(yyyy, yyyymm, yyyymmdd, yyyymmddhh24, yyyymmddhh24miss)
越大的domain,需要越长(yyyymmhh24miss)的截断。
业务方可以来维护这个表,例如一天生成一次。
对于很小的domain,不需要写入这张表,可以采用统一格式,例如0000。
insert into domain_tsfmt values ('domain_a', 'yyyymmhh24');
insert into domain_tsfmt values ('domain_b', 'yyyymmhh24mi');
5、创建UDF元信息表,存储每个ToB ID对应的UDF名字
create table domain_udf(domain text, udf name);
6、创建UDF,需要定制格式的ToB ID,创建对应的UDF
PostgreSQL 例子
建议用format。
create or replace function f1(demo_source) returns text as $$
select format('domain: %L , c2: %L , c4: %L', $1.domain, $1.c2, $1.c4);
$$ language sql strict;
create or replace function f2(demo_source) returns text as $$
declare
res text := format('%L , %L, %L', upper($1.c2), $1.c4, $1.c3);
begin
return res;
end;
$$ language plpgsql strict;
HybridDB for PostgreSQL 例子
create or replace function f1(demo_source) returns text as $$
select 'domain: '||$1.domain||' , c2: '||$1.c2||' , c4: '||$1.c4;
$$ language sql strict;
create or replace function f2(demo_source) returns text as $$
select upper($1.c2)||' , '||$1.c4||' , '||$1.c3;
$$ language sql strict;
create or replace function f3(demo_source) returns text as $$
select upper($1.c2)||' , '||$1.c4||' , '||$1.c9;
$$ language sql strict;
7、创建动态UDF,根据输入,动态调用对应的UDF
create or replace function ff(domain_source, name) returns text as $$
declare
sql text := 'select '||quote_ident($2)||'('||quote_literal($1)||')';
res text;
begin
execute sql into res;
return res;
end;
$$ language plpgsql strict;
-- 由于hdb版本太老,不支持format,不支持record和text互转,不支持quote_literal(record)。
-- 调整如下
create or replace function ff(domain_source, name) returns text as $$
declare
sql text := 'select '||quote_ident($2)||'($abc_abc_abc$'||textin(record_out($1))||'$abc_abc_abc$)';
res text;
begin
execute sql into res;
return res;
end;
$$ language plpgsql strict;
8、写入UDF映射,例如1-100的domain,使用F1进行转换,0的ID使用F2进行转换。
insert into domain_udf select 'domain_'||generate_series(1,100), 'f1';
insert into domain_udf values ('domain_0', 'f2');
不在这里的DOMAIN,采用默认UDF转换格式,例如f3。
9、根据 "domain + 时间截断" 重分发,根据UDF动态转换查询如下:
select domain_and_key, data from
(
select row_number() over (partition by domain||key order by domain,key) as RN, domain||key as domain_and_key, data from
-- partition by的窗口与第二个字段domain||key(目标表的分布键)保持一致,确保只需要重分发一次
(
select
t1.domain,
(case when t2.* is null then '0000' else to_char(to_timestamp(t1.UnixTime::text::float8), t2.tsfmt) end) as key,
ff(t1, (case when t3.* is null then 'f3' else t3.udf end)) as data
from domain_source t1
left join domain_tsfmt t2 using (domain)
left join domain_udf t3 using (domain)
) t
) t
;
阿里云HDB FOR PG将自动根据OSS目标外部表中定义的distributed_column参数,当VALUE发生变化时,自动写新文件,从而实现不同的DOMAIN VALUE,写到不同的文件中。
10、将规整后的数据输出到OSS
insert into domain_output
select domain_and_key, data from
(
select row_number() over (partition by domain||key order by domain,key) as RN, domain||key as domain_and_key, data from
-- partition by的窗口与第二个字段domain||key(目标表的分布键)保持一致,确保只需要重分发一次
(
select
t1.domain,
(case when t2.* is null then '0000' else to_char(to_timestamp(t1.UnixTime::text::float8), t2.tsfmt) end) as key,
ff(t1, (case when t3.* is null then 'f3' else t3.udf end)) as data
from domain_source t1
left join domain_tsfmt t2 using (domain)
left join domain_udf t3 using (domain)
) t
) t
;
执行计划
只需要重分布一次
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------
Insert (slice0; segments: 3) (rows=333334 width=64)
-> Subquery Scan t (cost=375072.68..395072.68 rows=333334 width=64)
-> Window (cost=375072.68..385072.68 rows=333334 width=96)
Partition By: domain_and_key
Order By: domain_and_key
-- 有按目标列排序,所以OSS切换文件没有问题
-- 注意同一个domain的不同时间的数据,可能会切成多个文件
-- 如果想让同一个domain的数据,切到一个文件中,那么请使用domain, key作为窗口分组,同时OSS外表的实现上需要修改一下,忽略两个列。
-> Sort (cost=375072.68..377572.68 rows=333334 width=96)
Sort Key: domain_and_key
-> Redistribute Motion 3:3 (slice3; segments: 3) (cost=12.84..86770.34 rows=333334 width=96)
-- 重分布一次。采用窗口函数,强制了重分布
Hash Key: domain_and_key
-> Subquery Scan t (cost=12.84..66770.34 rows=333334 width=96)
-> Hash Left Join (cost=12.84..54270.34 rows=333334 width=237)
Hash Cond: t1.domain = t2.domain
-> Hash Left Join (cost=11.75..24261.75 rows=333334 width=192)
Hash Cond: t1.domain = t3.domain
-> External Scan on domain_source t1 (cost=0.00..11000.00 rows=333334 width=96)
-> Hash (cost=8.00..8.00 rows=100 width=105)
-> Broadcast Motion 3:3 (slice1; segments: 3) (cost=0.00..8.00 rows=100 width=105)
-> Seq Scan on domain_udf t3 (cost=0.00..4.00 rows=34 width=105)
-> Hash (cost=1.05..1.05 rows=1 width=61)
-> Broadcast Motion 3:3 (slice2; segments: 3) (cost=0.00..1.05 rows=1 width=61)
-> Seq Scan on domain_tsfmt t2 (cost=0.00..1.01 rows=1 width=61)
(21 rows)
简化DEMO (不依赖OSS,仅仅演示)
1、创建公共日志表
create table t1 (tid int, c1 text, c2 text, c3 int, c4 timestamp, c5 numeric);
2、写入一批测试数据
insert into t1 select random()*100, md5(random()::text), 'test', random()*10000, clock_timestamp(), random() from generate_series(1,1000000);
3、创建目标表
create table t1_output (rn int8, fmt text, data text);
4、创建UDF元信息表,存储每个ToB ID对应的UDF名字
create table t2(tid int, udf name);
5、创建UDF,需要定制格式的ToB ID,创建对应的UDF
create or replace function f1(t1) returns text as $$
select 'tid: '||$1.tid||' , c2: '||$1.c2||' , c4: '||$1.c4;
$$ language sql strict;
create or replace function f2(t1) returns text as $$
select $1.tid||' , '||upper($1.c2)||' , '||$1.c4||' , '||$1.c3;
$$ language sql strict;
create or replace function f3(t1) returns text as $$
select $1.tid||' , '||upper($1.c2)||' , '||$1.c3||' , '||$1.c3;
$$ language sql strict;
默认采用f3()函数。
6、创建动态UDF,根据输入,动态调用对应的UDF
create or replace function ff(t1, name) returns text as $$
declare
sql text := 'select '||quote_ident($2)||'($abc_abc_abc$'||textin(record_out($1))||'$abc_abc_abc$)';
res text;
begin
execute sql into res;
return res;
end;
$$ language plpgsql strict;
7、写入UDF映射,例如1-10的ID,使用F1进行转换,0的ID使用F2进行转换。
insert into t2 select generate_series(1,10), 'f1';
insert into t2 values (0, 'f2');
8、创建格式表
create table t1_fmt (tid int, fmt text);
insert into t1_fmt values (1, 'yyyymm');
insert into t1_fmt values (2, 'yyyy');
默认采样'0000'的格式。
9、动态转换查询如下:
select tid_key, data from
(
select row_number() over (partition by tid||key order by key) as RN, tid||key as tid_key, data from
(
select
t1.tid,
(case when t2.* is null then '0000' else to_char(t1.c4, t2.fmt) end) as key,
ff(t1, (case when t3.* is null then 'f3' else t3.udf end)) as data
from t1
left join t1_fmt t2 using (tid)
left join t2 t3 using (tid)
) t
) t
;
10、将规整后的数据输出到目标表
insert into t1_output
select tid_key, data from
(
select row_number() over (partition by tid||key order by key) as RN, tid||key as tid_key, data from
(
select
t1.tid,
(case when t2.* is null then '0000' else to_char(t1.c4, t2.fmt) end) as key,
ff(t1, (case when t3.* is null then 'f3' else t3.udf end)) as data
from t1
left join t1_fmt t2 using (tid)
left join t2 t3 using (tid)
) t
) t
;
INSERT 0 1000000
postgres=# select * from t1_output limit 100;
fmt | data
---------+-----------------------------------------------------
870000 | 87 , TEST , 7108 , 7108
870000 | 87 , TEST , 787 , 787
870000 | 87 , TEST , 6748 , 6748
870000 | 87 , TEST , 6385 , 6385
870000 | 87 , TEST , 5278 , 5278
870000 | 87 , TEST , 8132 , 8132
870000 | 87 , TEST , 7513 , 7513
870000 | 87 , TEST , 2025 , 2025
870000 | 87 , TEST , 7322 , 7322
870000 | 87 , TEST , 2019 , 2019
870000 | 87 , TEST , 6416 , 6416
870000 | 87 , TEST , 9959 , 9959
870000 | 87 , TEST , 7876 , 7876
870000 | 87 , TEST , 5022 , 5022
.....
写OSS外部表时,根据fmt列的VALUE进行识别,当遇到不同的VALUE就切换文件名,不同FMT VALUE的数据写入不同的文件。
四、技术点
这里只谈本文涉及的技术点。
1、UDF
PostgreSQL支持多种UDF语言(例如C,plpgsql, sql, plpython, pljava, plv8, ......),用户通过UDF定义需要转换的格式。
2、动态调用
用户通过动态调用,可以动态的调用对应的UDF,在一个请求中生成不同的格式。
五、云端产品
阿里云 RDS PostgreSQL
阿里云 HybridDB for PostgreSQL
阿里云 OSS
六、类似场景、案例
《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》
七、小结
一些公共日志服务,所有用户的日志都被统一的按格式记录到一起,但是每个最终用户关心的字段都不一样,甚至每个用户对数据转换的需求都不一样。
PostgreSQL支持多种UDF语言(例如C,plpgsql, sql, plpython, pljava, plv8, ......),用户通过UDF定义需要转换的格式。
用户通过动态调用,可以动态的调用对应的UDF,在一个请求中生成不同的格式。
八、参考
《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》