PostgreSQL Fine-Grained Table,Column,Row Level Audit

通过配置用户级或数据库级的参数可以实现用户以及数据库级别的审计, 但是这样的粒度可能还是太粗糙了.

如果需要更细致的审计, 例如针对某些表的操作审计, 某些用户对某些表的审计, 或者仅仅当某个列的值发生变化时才被审计(记录到LOG或表里面, 本文的例子是将审计信息输出到LOG, 使用raise).

这样的需求可以通过触发器来实现.

接下来以PostgreSQL 9.2为例进行讲解.

# 基础的参数配置

log_destination = 'csvlog'
logging_collector = on
log_directory = 'pg_log'
log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log'
log_file_mode = 0600
log_truncate_on_rotation = on
log_rotation_age = 1d
log_rotation_size = 10MB
log_connections = on
log_error_verbosity = verbose
log_timezone = 'PRC'
log_statement = 'none'
log_min_duration_statement = -1

# 创建测试表 : 

digoal=> create table user_account_kb(id int, info text, balance numeric, crt_time timestamp, mod_time timestamp);
CREATE TABLE

# 插入测试数据 : 

digoal=> insert into user_account_kb select generate_series(1,10),'test',trunc(100*random()),now(),null;
INSERT 0 10
digoal=> select * from user_account_kb ;
 id | info | balance |          crt_time          | mod_time
----+------+---------+----------------------------+----------
  1 | test |      66 | 2013-03-20 10:08:15.969523 |
  2 | test |      50 | 2013-03-20 10:08:15.969523 |
  3 | test |      95 | 2013-03-20 10:08:15.969523 |
  4 | test |      90 | 2013-03-20 10:08:15.969523 |
  5 | test |      50 | 2013-03-20 10:08:15.969523 |
  6 | test |      12 | 2013-03-20 10:08:15.969523 |
  7 | test |      39 | 2013-03-20 10:08:15.969523 |
  8 | test |      42 | 2013-03-20 10:08:15.969523 |
  9 | test |       6 | 2013-03-20 10:08:15.969523 |
 10 | test |      11 | 2013-03-20 10:08:15.969523 |
(10 rows)

【审计场景1】

1. 审计某个表的insert, update, delete, truncate语句.

使用after for each statement触发器.

# 创建触发器函数

digoal=> create or replace function trace_statement() returns trigger as $$
declare
  v_user name;
  v_db name;
  v_query text;
begin
  select current_user, current_database(), current_query() into v_user, v_db, v_query;
  raise warning 'user:%, db:%, query:%', v_user, v_db, v_query;
  return null;
end;
$$ language plpgsql;

# 创建触发器

digoal=> create trigger tg1 after insert or update or delete or truncate on user_account_kb for each statement execute procedure trace_statement();
CREATE TRIGGER

# 测试插入

digoal=> insert into user_account_kb values(11,'test',100,now(),null);
WARNING:  user:digoal, db:digoal, query:insert into user_account_kb values(11,'test',100,now(),null);
INSERT 0 1
digoal=> select * from user_account_kb where id=11;
 id | info | balance |          crt_time          | mod_time
----+------+---------+----------------------------+----------
 11 | test |     100 | 2013-03-20 10:18:02.495836 |
(1 row)

# 测试更新

digoal=> update user_account_kb set info='new' where id=11;
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id=11;
UPDATE 1
digoal=> select * from user_account_kb where id=11;
 id | info | balance |          crt_time          | mod_time
----+------+---------+----------------------------+----------
 11 | new  |     100 | 2013-03-20 10:18:02.495836 |
(1 row)

# 测试删除

digoal=> delete from user_account_kb where id=11;
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id=11;
DELETE 1

# 测试truncate

digoal=> begin;
BEGIN
digoal=> truncate user_account_kb ;
WARNING:  user:digoal, db:digoal, query:truncate user_account_kb ;
TRUNCATE TABLE
digoal=> rollback;
ROLLBACK

# 注意回滚的操作不会被记录. 即使log_statement = 'ddl', 所以rollback没有被记录下来.

# 这是个弊端. 需要注意. 希望未来的PostgreSQL版本加以改进. 现在的解决办法是修正触发器的触发点, 小结部分会提到.

# 以上操作的日志输出如下.

2013-03-20 10:18:02.496 CST,"digoal","digoal",4521,"[local]",51491867.11a9,9,"INSERT",2013-03-20 10:01:11 CST,1/229,3355,WARNING,01000,"user:digoal, db:digoal, query:insert into user_account_kb values(11,'test',100,now(),null);",,,,,,,,"exec_stmt_raise, pl_exec.c:2840","psql"
2013-03-20 10:19:42.980 CST,"digoal","digoal",4521,"[local]",51491867.11a9,10,"UPDATE",2013-03-20 10:01:11 CST,1/233,3356,WARNING,01000,"user:digoal, db:digoal, query:update user_account_kb set info='new' where id=11;",,,,,,,,"exec_stmt_raise, pl_exec.c:2840","psql"
2013-03-20 10:19:53.612 CST,"digoal","digoal",4521,"[local]",51491867.11a9,11,"DELETE",2013-03-20 10:01:11 CST,1/236,3357,WARNING,01000,"user:digoal, db:digoal, query:delete from user_account_kb where id=11;",,,,,,,,"exec_stmt_raise, pl_exec.c:2840","psql"
2013-03-20 10:20:18.361 CST,"digoal","digoal",4521,"[local]",51491867.11a9,12,"TRUNCATE TABLE",2013-03-20 10:01:11 CST,1/237,3358,WARNING,01000,"user:digoal, db:digoal, query:truncate user_account_kb ;",,,,,,,,"exec_stmt_raise, pl_exec.c:2840","psql"

【审计场景2】

2. 按用户审计某个表的insert, update, delete, truncate语句.

使用after for each statement when (current_user='')触发器.

# 删除前面用到的触发器

digoal=> drop trigger tg1 on user_account_kb;
DROP TRIGGER

# 创建触发器, 这次带上when条件

digoal=> create trigger tg1 after insert or update or delete or truncate on user_account_kb for each statement when (current_user='digoal') execute procedure trace_statement();
CREATE TRIGGER

# 测试digoal用户的操作

digoal=> update user_account_kb set info='new' where id=11;
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id=11;
UPDATE 0

# 测试其他用户的操作, 不被审计

digoal=> \c digoal postgres
You are now connected to database "digoal" as user "postgres".
digoal=# update digoal.user_account_kb set info='new' where id=11;
UPDATE 0

【审计场景3】

3. 按条件审计某个表的insert, update, delete语句.

使用after for each row when (new.balance <> old.balance)触发器.

# 删除前面用到的触发器

digoal=> drop trigger tg1 on user_account_kb;
DROP TRIGGER

# 新建触发器函数

digoal=> create or replace function trace_row() returns trigger as $$
declare
  v_user name;
  v_db name;
  v_query text;
begin
select current_user, current_database(), current_query() into v_user, v_db, v_query;
case TG_OP
  when 'UPDATE' then
    raise warning 'user:%, db:%, query:%, newdata:%, olddata:%', v_user, v_db, v_query, NEW, OLD;
  when 'INSERT' then
    raise warning 'user:%, db:%, query:%, newdata:%', v_user, v_db, v_query, NEW;
  when 'DELETE' then
    raise warning 'user:%, db:%, query:%, olddata:%', v_user, v_db, v_query, OLD;
  else
    null;
end case;
return null;
end;
$$ language plpgsql;
CREATE FUNCTION

# 新建触发器

digoal=> create trigger tg1 after insert or update or delete on user_account_kb for each row execute procedure trace_row();
CREATE TRIGGER

# 测试插入

digoal=> insert into user_account_kb select * from user_account_kb limit 3;
WARNING:  user:digoal, db:digoal, query:insert into user_account_kb select * from user_account_kb limit 3;, newdata:(1,test,66,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:insert into user_account_kb select * from user_account_kb limit 3;, newdata:(2,test,50,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:insert into user_account_kb select * from user_account_kb limit 3;, newdata:(3,test,95,"2013-03-20 10:08:15.969523",)
INSERT 0 3

# 测试更新

digoal=> update user_account_kb set info='new' where id<3;
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id<3;, newdata:(1,new,66,"2013-03-20 10:08:15.969523",), olddata:(1,new,66,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id<3;, newdata:(2,new,50,"2013-03-20 10:08:15.969523",), olddata:(2,new,50,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id<3;, newdata:(1,new,66,"2013-03-20 10:08:15.969523",), olddata:(1,new,66,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id<3;, newdata:(2,new,50,"2013-03-20 10:08:15.969523",), olddata:(2,new,50,"2013-03-20 10:08:15.969523",)
UPDATE 4

# 测试删除

digoal=> delete from user_account_kb where id<3;
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id<3;, olddata:(1,new,66,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id<3;, olddata:(2,new,50,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id<3;, olddata:(1,new,66,"2013-03-20 10:08:15.969523",)
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id<3;, olddata:(2,new,50,"2013-03-20 10:08:15.969523",)
DELETE 4

【审计场景4】

# 基于列的判断审计

# 删除前面用到的触发器

digoal=> drop trigger tg1 on user_account_kb;
DROP TRIGGER

# 创建触发器, 这里用到when条件, 只有当balance变化时才审计

digoal=> create trigger tg1 after update on user_account_kb for each row when (new.balance<>old.balance) execute procedure trace_row();
CREATE TRIGGER

# 测试

digoal=> update user_account_kb set info='new' where id=4;
UPDATE 1
digoal=> update user_account_kb set info='new',balance=balance where id=4;
UPDATE 1

# balance变化才审计

digoal=> update user_account_kb set info='new',balance=balance-1 where id=4;
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new',balance=balance-1 where id=4;, newdata:(4,new,89,"2013-03-20 10:08:15.969523",), olddata:(4,new,90,"2013-03-20 10:08:15.969523",)
UPDATE 1

【小结】

1. 前面提到ROLLBACK等事务相关的SQL不会被审计到, 所以当SQL执行失败时, LOG已经记录了, 但是没有记录回滚的动作, 所以信息是不完整的, 除非从XLOG/CLOG中取出对应的XID是提交还是回滚. 

为了使记录在LOG中的语句一定是提交的, 那么需要调整一下触发器的创建方法, 使得回滚的事务中所有的SQL都不被审计.

如下,

触发器只有在提交时才会触发, 回滚不触发. (使用constraint来创建触发器)

digoal=> create constraint trigger tg1 after update on user_account_kb DEFERRABLE INITIALLY deferred for each row when (new.balance<>old.balance) execute procedure trace_row();
CREATE TRIGGER
digoal=> begin;
BEGIN
digoal=> update user_account_kb set balance=balance+1 where id=1;
UPDATE 0
digoal=> update user_account_kb set balance=balance+1 where id=4;
UPDATE 1
digoal=> end;
WARNING:  user:digoal, db:digoal, query:end;, newdata:(4,new,90,"2013-03-20 10:08:15.969523",), olddata:(4,new,89,"2013-03-20 10:08:15.969523",)
COMMIT
digoal=> begin;
BEGIN
digoal=> update user_account_kb set balance=balance+1 where id=4;
UPDATE 1
digoal=> rollback;
ROLLBACK

注意以上方法只有after ... for each row才能被用到.

When the CONSTRAINT option is specified, this command creates a constraint trigger. This is the same as a regular trigger except that the timing of the trigger firing can be adjusted using SET CONSTRAINTS. 

Constraint triggers must be AFTER ROW triggers. They can be fired either at the end of the statement causing the triggering event, or at the end of the containing transaction; in the latter case they are said to be deferred. 

A pending deferred-trigger firing can also be forced to happen immediately by using SET CONSTRAINTS. 

Constraint triggers are expected to raise an exception when the constraints they implement are violated.

【参考】

1. http://blog.163.com/digoal@126/blog/static/16387704020132208241607/

2. http://blog.163.com/digoal@126/blog/static/1638770402013283547959/

3. http://blog.163.com/digoal@126/blog/static/1638770402013211102130526/

4. http://blog.163.com/digoal@126/blog/static/163877040201252575529358/
5. http://blog.163.com/digoal@126/blog/static/16387704020132131361949/

6. http://www.postgresql.org/docs/9.2/static/sql-createtrigger.html

时间: 2024-08-21 09:16:19

PostgreSQL Fine-Grained Table,Column,Row Level Audit的相关文章

RLS(Row Level Security) 在 PostgreSQL 9.5 中的使用

PostgreSQL 9.5 更新增加了许多新的功能,比如增加新的JSONB函数,新的GROUPING函数等.RLS(Row Level Security)功能在此次更新中也得到相应的提升.RLS是一种表格行安全机制,利用为每一个表加上你需要使用数据库角色作为一个主要的安全机制. 以下是一个RLS示例: 创建表.修改表.创建策略:使用RLS CREATE TABLE ratings2 ( user_role_name NAME, rating_type_name TEXT, artist_nam

obiee-OBIEE Report 怎么设置table column header 始终显示?

问题描述 OBIEE Report 怎么设置table column header 始终显示? 如图所示 即使这个table里的数据对应的DataSet没有数据了,也能在报表里显示标记的部分(table column header) 请大神,指教!

PostgreSQL cstore_fdw column-ORI table VS MonetDB (with fixed-length width table)

前面测试了PostgreSQL 行存储引擎和MonetDB列存储的性能差别. 包括导入, 查询, 关联查询等. 本文将测试一下PostgreSQL使用cstore_fdw插件, 对比MonetDB的性能. 测试方法和性能数据见 :  http://blog.163.com/digoal@126/blog/static/1638770402014715113449394/ http://blog.163.com/digoal@126/blog/static/16387704020147159365

MonetDB vs PostgreSQL 2, width table with random data

前面一篇简单的对比了一下PostgreSQL和MonetDB在bulk load和简单的统计查询的性能. 因为测试数据比较单一, 可能没有什么说服力.  本文测试环境与之前的一致, 但是使用宽表, 随机离散字符串进行测试. 5000万测试数据, 60个字段, 单表, 容量149GB, (PG96GB) http://blog.163.com/digoal@126/blog/static/16387704020147139412871/ [注意]  PostgreSQL注重的是高并发, 而Mone

Oracle SQL : delete from (query), delete which table&#039;s row?

今天群里面一位朋友问了一个问题如下 : DELETE FROM (SELECT * FROM EPAD_MENU EM, MSG_DESC MD WHERE MD.MD_TABLE = 'epad_menu' AND MD.MD_FIELD = 'em_id' AND MD.MD_VALUE = EM.EM_ID AND EM.EM_BOOK_CODE = 'aa') 为什么删除的是 MSG_DESC的数据? 于是找了个测试库测试一下 : SQL> create table t1(id int

又来勒索,有完没完 - 数据库安全指南

背景 数据库在一个企业中通常都处于非常核心的位置,数据库安全是一个非常严肃的话题. 从机房.网络.服务器.数据交换设备.操作系统.应用程序.数据库本身,数据库所处的环境非常复杂,安全隐患也非常多. 所以本文将从各个层面帮助大家理解和避免一些常见的安全隐患问题. 本文是PostgreSQL使用安全指导性的文章,涉及详细的用法或原理请参考相关链接. 如何安全的使用PostgreSQL,让用户高枕无忧呢? 可以分为如下几个方面来加固你的数据库. 一.认证安全 认证前的安全,端口暴露度的把握. Post

PostgreSQL SQL 语言:数据定义

本文档为PostgreSQL 9.6.0文档,本转载已得到原译者彭煜玮授权. 1.表基础 关系型数据库中的一个表非常像纸上的一张表:它由行和列组成.列的数量和顺序是固定的,并且每一列拥有一个名字.行的数目是变化的,它反映了在一个给定时刻表中存储的数据量.SQL并不保证表中行的顺序.当一个表被读取时,表中的行将以非特定顺序出现,除非明确地指定需要排序.这些将在Chapter 7介绍.此外,SQL不会为行分配唯一的标识符,因此在一个表中可能会存在一些完全相同的行.这是SQL之下的数学模型导致的结果,

Microsoft Word 对象

word|对象 Microsoft Word 对象 目 录 运用Application对象 运用Document对象 运用Range对象 运用Selection对象 运用Find和Replacement对象 运用Table, Column, Row,和 Cell对象 运用其他普通对象 判断对象是否有效 修改Word 命令 运用事件 使用自动宏 使用自动化 Visual Basic支持一个对象集合,该集合中的对象直接对应于Microsoft Word 97中的元素,并且通过用户界面,用户熟悉这些元

Perl访问MSSQL并迁移到MySQL数据库脚本实例_perl

Linux下没有专门为MSSQL设计的访问库,不过介于MSSQL本是从sybase派生出来的,因此用来访问Sybase的库自然也能访问MSSQL,FreeTDS就是这么一个实现.Perl中通常使用DBI来访问数据库,因此在系统安装了FreeTDS之后,可以使用DBI来通过FreeTDS来访问MSSQL数据库,例子: 复制代码 代码如下: using DBI;my $cs = "DRIVER={FreeTDS};SERVER=主机;PORT=1433;DATABASE=数据库;UID=sa;PWD