标签
PostgreSQL , 10.0 , sharding , 分布式事务 , 2pc , 两阶段事务
背景
作为一个完整的分布式数据库(sharding),没有分布式事务支持是不行的。
什么是分布式事务呢?比如我们把一个数据库比作一个小朋友,每个小朋友都有100块钱,然后A小朋友给了B小朋友20块钱,这样的话应该是A剩余80块,B剩余120块。如果在交易过程中B小朋友不小心把20块弄丢了,那么会怎么样呢?
理论上应该是交易不成功,A和B都回到100块的状态。 不应该出现中间状态。
PostgreSQL 10.0内置了基于postgres_fdw的sharding功能,同时也对postgres_fdw新增了2PC模块,你可以设置foreign server是否支持2PC.
创建foreign server时,通过参数two_phase_commit 指定即可。
分布式事务实现的原理
什么时候会使用两阶段事务
当写事务涉及到>=2个two_phase_commit=on的shard节点的写操作时。
当事务仅仅涉及单个shard时(包括本地的写),不需要2PC,在本地提交时可以通过pre-commit of notify确保本地和异地的一致性。
如何处理crash后的未知两阶段状态
两阶段事务,如果在第二阶段(即prepare成功后,准备commit前)时,数据库CRASH了,如何处理呢?
PostgreSQL提供了两种处理方法
1. 调用 pg_fdw_xact_resolve() 函数手工处理.
2. 使用pg_fdw_xact_resolver 模块自动处理.
pg_fdw_xact_resolver 是一个worker process,会自动检测是否有未知状态的2PC事务,自动处理.
API
为了支持分布式事务,新增的API如下
两阶段事务,获取shard节点1st prepare阶段产生的事务ID
GetPreparedID() is called to get transaction identifier on pre-commit phase.
非两阶段事务,在shard节点执行提交或回滚
EndForeignTransaction() is called on commit phase and executes either COMMIT or ROLLBACK on foreign servers.
两阶段事务,1st,在shard节点执行预提交操作
PrepareForeignTransaction() is called on pre-commit phase and executes PREPARE TRANSACTION on foreign servers.
两阶段事务,2nd,在shard节点执行提交或回滚prepared xact
ResolvePrepareForeignTransaction() is called on commit phase and execute either COMMIT PREPARED or ROLLBACK PREPARED with given transaction identifier on foreign servers.
如果foreign server没有开启两阶段支持,则不需要使用后两个API
If the foreign data wrapper is not capable of two-phase-commit protocol, last two APIs are not required.
目前的限制
注意,目前只要事务中涉及到2个或以上开启了Two-phase-commit的shard,那么都会开启两阶段事务。
可能会影响一些性能,所以请酌情使用Two-phase-commit开关。
Two-phase-commit protocol is used even when the transaction involves with multiple servers but does not modify data.
分布式2PC事务处理过程原理剖析
为了实现分布式两阶段事务,coordinator节点需要跟踪shard节点的事务。PG通过开辟一块共享内存区域(KnownFDWXact list)来记录shard节点的事务状态,这块区域的改写需要记录REDO日志,在检查点时持久化到$PGDATA/fdw_xact 目录中,每个2PC事务对应这个目录中的一个文件,文件名为(xid, foreign server oid, user oid)。
分布式2PC事务处理过程如下:
1. two_phase_commit = on的foreign server,当开启一个事务时,通过RegisterXactForeignServer()注册连接到MyFDWConnection结构中
2. pre-commit阶段执行如下动作(涉及到事务中用到的shard节点)
2.1 获取two_phase_commit = on的foreign server的本事务的 xact id
2.2 在two_phase_commit = off的节点, 执行commit,需要一个notify,shard才会真正提交。
2.3 将xact id写入fdw_xact共享内存结构中,同时写redo日志XLOG_FDW_XACT_INSERT 。
2.4 在two_phase_commit = on的节点,执行PREPARE TRANSACTION .
2.5 本地调用RecordTransactionCommit(),提交。完成2PC的第一阶段。如果第一阶段执行失败,所有节点包括本地节点都可以回滚,(包括two_phase_commit = off的shard)。
3. 如果第一阶段执行成功,那么表示two_phase_commit = off的节点事务已经成功提交了,本地事务也成功提交了,进入第二阶段。
3.1 从共享内存解析出shard(foreign)节点的xact id,Resolve foreign prepared transaction.
3.2 从共享内存移除 foreign transaction entry 同时写WAL日志 XLOG_FDW_XACT_REMOVE .
如果第一阶段成功了,但是第二阶段没有成功,或者还没有成功前,主备发生了切换,那么就需要recovery两阶段事务了。
分布式两阶段事务的恢复过程剖析
恢复过程可能出现在主库crash,或者备库上面。分布式两阶段事务恢复的原理与普通两阶段事务恢复原理类似。
恢复时,从wal日志中解析XLOG_FDW_XACT_INSERT和XLOG_FDW_XACT_REMOVE record,复原fdw_xact目录中的2PC状态记录文件。
redo恢复阶段结束后,PostgreSQL会扫描pg_fdw_xact目录中还有哪些文件,如果有,说明有未知状态的2PC事务。未提交的2PC事务包括事务ID,shard id, user id,数据库需要处理未知状态的2PC事务,提交或者回滚。
Crash recovery
During crash recovery, the fdw_xact entry are inserted to KnownFDWXact List or removed from KnownFDWXact list when corresponding WAL records are replayed.
After the redo is done fdw_xact file is re-created and then pg_fdw_xact directory is scanned for unresolved foreign prepared transactions.
The files in this directory are named as triplet (xid, foreign server oid, user oid) to create a unique name for each file.
This scan also emits the oldest transaction id with an unresolved prepared foreign transactions.
This affects oldest active transaction id, since the status of this transaction id is required to decide the fate of unresolved prepared foreign transaction.
On standby during WAL replay files are just inserted or removed.
If the standby is required to finish recovery and take over the master, pg_fdw_xact is scanned to read unresolved foreign prepared transactions into the shared memory.
Many of fdw_xact.c code is inspired by two_phase.c code. So recovery mechanism and process are almost same as two_phase.
The patch incorporated recent optimization of two_phase.c.
环境部署例子
1. coordinator库设置
为了支持2PC,在coordinator库上要保留一些2PC状态,所以coordinator库也有一个参数控制最大允许开多少个2PC事务,max_prepared_foreign_transactions 参数就是控制这个的。
公式如下
max_prepared_foreign_transactions = (max_connections) * (# of foreign server with two_phase_commit = on)
同时,coordinator库还要设置一个参数来支持2PC事务
max_prepared_transactions = 10 # 建议设置为max_connections相等即可。
2. sharding库,也需要设置如下
max_prepared_transactions = 100 # same as max_connections
log_statement = all # 方便观察
log_line_prefix = '[S1]' # on shard2 server, we can set '[S2]'
3. coordinator库上面创建postgres_fdw插件
4. 创建foreign server,假设有两个shard库
$ psql
=# CREATE SERVER shard_node1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'shard1', dbname 'postgres', port '5432', two_phase_commit 'on');
CREATE SERVER
=# CREATE SERVER shard_node2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'shard2', dbname 'postgres', port '5342', two_phase_commit 'on')
CREATE SERVER
=# SELECT * FROM pg_foreign_server;
srvname | srvowner | srvfdw | srvtype | srvversion | srvacl | srvoptions
-------------+----------+--------+---------+------------+--------+-------------------------------------------------
shard_node1 | 10 | 16387 | | | | {host=shard1,dbname=postgres,port=5432,two_phase_commit=on}
shard_node2 | 10 | 16387 | | | | {host=shard2,dbname=postgres,port=5432,two_phase_commit=on}
(2 rows)
5. 创建foreign server的user mapping
测试
1. 先来一个非两阶段事务的测试(只涉及到一个foreign server, 不会启用两阶段事务)
=# BEGIN;
=# INSERT INTO ft1 VALUES(1);
=# COMMIT;
查看日志
[S1] LOG: statement: SET search_path = pg_catalog
[S1] LOG: statement: SET timezone = 'UTC'
[S1] LOG: statement: SET datestyle = ISO
[S1] LOG: statement: SET intervalstyle = postgres
[S1] LOG: statement: SET extra_float_digits = 3
[S1] LOG: statement: START TRANSACTION ISOLATION LEVEL REPEATABLE READ
[S1] LOG: execute pgsql_fdw_prep_1: INSERT INTO public.ft1(c) VALUES ($1)
[S1] DETAIL: parameters: $1 = '1'
[S1] LOG: statement: DEALLOCATE pgsql_fdw_prep_1
[S1] LOG: statement: COMMIT TRANSACTION
2. 涉及多个two_phase_commit is on 的foreign server,自动开启两阶段事务
$ psql
=# BEGIN;
=# INSERT INTO ft1 VALUES(2);
=# INSERT INTO ft2 VALUES(2);
=# COMMIT;
日志如下,看到启动了两阶段事务
[S1] LOG: statement: SET search_path = pg_catalog
[S1] LOG: statement: SET timezone = 'UTC'
[S1] LOG: statement: SET datestyle = ISO
[S1] LOG: statement: SET intervalstyle = postgres
[S1] LOG: statement: SET extra_float_digits = 3
[S1] LOG: statement: START TRANSACTION ISOLATION LEVEL REPEATABLE READ
[S1] LOG: execute pgsql_fdw_prep_1: INSERT INTO public.ft1(c) VALUES ($1)
[S1] DETAIL: parameters: $1 = '2'
[S1] LOG: statement: DEALLOCATE pgsql_fdw_prep_1
[S2] LOG: statement: SET search_path = pg_catalog
[S2] LOG: statement: SET timezone = 'UTC'
[S2] LOG: statement: SET datestyle = ISO
[S2] LOG: statement: SET intervalstyle = postgres
[S2] LOG: statement: SET extra_float_digits = 3
[S2] LOG: statement: START TRANSACTION ISOLATION LEVEL REPEATABLE READ
[S2] LOG: execute pgsql_fdw_prep_2: INSERT INTO public.ft2(c) VALUES ($1)
[S2] DETAIL: parameters: $1 = '2'
[S2] LOG: statement: DEALLOCATE pgsql_fdw_prep_2
[S1] LOG: statement: PREPARE TRANSACTION 'px_1389361800_16388_10'
[S2] LOG: statement: PREPARE TRANSACTION 'px_53866648_16389_10'
[S1] LOG: statement: COMMIT PREPARED 'px_1389361800_16388_10'
[S2] LOG: statement: COMMIT PREPARED 'px_53866648_16389_10'
3. 在coordinator节点使用两阶段事务
=# BEGIN;
=# INSERT INTO ft1 VALUES (3);
=# INSERT INTO ft2 VALUES (3);
=# PREPARE TRANSACTION 'gxid';
此时,可以查看到2PC会下发到shard节点
=# SELECT * FROM pg_fdw_xacts;
dbid | transaction | serverid | userid | status | identifier
-------+-------------+----------+--------+----------+-----------------------
13182 | 564 | 16389 | 10 | prepared | px_450388264_16389_10
13182 | 564 | 16388 | 10 | prepared | px_569713952_16388_10
(2 rows)
当执行COMMIT PREPARED 'gxid'时,会提交远程的两阶段事务。
4. 两阶段事务的回滚,保证分布式一致性
=# BEGIN;
=# INSERT INTO lt VALUES(4);
=# INSERT INTO ft1 VALUES(4);
=# INSERT INTO ft2 VALUES(4);
关闭一个shard,提交失败
=# COMMIT; -- error
检查数据是否一致,所有节点均无数据
=# SELECT * FROM lt WHERE c = '4'; -- data on local server
c
---
(0 rows)
=# SELECT * FROM ft2 WHERE c = '4'; -- data on shard2 server
c
---
(0 rows)
当shard1恢复后,可以看到自动回滚掉了
=# SELECT * FROM ft1 WHERE c = '4'; -- data on shard1 server
c
---
(0 rows)
这个patch的讨论,详见邮件组,本文末尾URL。
PostgreSQL社区的作风非常严谨,一个patch可能在邮件组中讨论几个月甚至几年,根据大家的意见反复的修正,patch合并到master已经非常成熟,所以PostgreSQL的稳定性也是远近闻名的。
参考
https://wiki.postgresql.org/wiki/2PC_on_FDW