diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 76d4a697726..c5e28ce5cca 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -7,7 +7,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ spill slot truncate stream stats twophase twophase_stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ - oldest_xmin snapshot_transfer subxact_without_top concurrent_stream + oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ + twophase_snapshot REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/twophase_snapshot.out b/contrib/test_decoding/expected/twophase_snapshot.out new file mode 100644 index 00000000000..3b7f23b5b45 --- /dev/null +++ b/contrib/test_decoding/expected/twophase_snapshot.out @@ -0,0 +1,41 @@ +Parsed test spec with 3 sessions + +starting permutation: s2b s2txid s1init s3b s3txid s2c s2b s2insert s2p s3c s1insert s1start s2cp s1start +step s2b: BEGIN; +step s2txid: SELECT pg_current_xact_id() IS NULL; +?column? + +f +step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +step s3b: BEGIN; +step s3txid: SELECT pg_current_xact_id() IS NULL; +?column? + +f +step s2c: COMMIT; +step s2b: BEGIN; +step s2insert: INSERT INTO do_write DEFAULT VALUES; +step s2p: PREPARE TRANSACTION 'test1'; +step s3c: COMMIT; +step s1init: <... completed> +?column? + +init +step s1insert: INSERT INTO do_write DEFAULT VALUES; +step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'two-phase-commit', '1'); +data + +BEGIN +table public.do_write: INSERT: id[integer]:2 +COMMIT +step s2cp: COMMIT PREPARED 'test1'; +step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'two-phase-commit', '1'); +data + +BEGIN +table public.do_write: INSERT: id[integer]:1 +PREPARE TRANSACTION 'test1' +COMMIT PREPARED 'test1' +?column? + +stop diff --git a/contrib/test_decoding/specs/twophase_snapshot.spec b/contrib/test_decoding/specs/twophase_snapshot.spec new file mode 100644 index 00000000000..bcaf68c6dc3 --- /dev/null +++ b/contrib/test_decoding/specs/twophase_snapshot.spec @@ -0,0 +1,53 @@ +# Test decoding of two-phase transactions during the build of a consistent snapshot. +setup +{ + DROP TABLE IF EXISTS do_write; + CREATE TABLE do_write(id serial primary key); +} + +teardown +{ + DROP TABLE do_write; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + + +session "s1" +setup { SET synchronous_commit=on; } + +step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');} +step "s1start" {SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'two-phase-commit', '1');} +step "s1insert" { INSERT INTO do_write DEFAULT VALUES; } + +session "s2" +setup { SET synchronous_commit=on; } + +step "s2b" { BEGIN; } +step "s2txid" { SELECT pg_current_xact_id() IS NULL; } +step "s2c" { COMMIT; } +step "s2insert" { INSERT INTO do_write DEFAULT VALUES; } +step "s2p" { PREPARE TRANSACTION 'test1'; } +step "s2cp" { COMMIT PREPARED 'test1'; } + + +session "s3" +setup { SET synchronous_commit=on; } + +step "s3b" { BEGIN; } +step "s3txid" { SELECT pg_current_xact_id() IS NULL; } +step "s3c" { COMMIT; } + +# Force building of a consistent snapshot between a PREPARE and COMMIT PREPARED +# and ensure that the whole transaction is decoded at the time of COMMIT +# PREPARED. +# +# 's1init' step will initialize the replication slot and cause logical decoding +# to wait in initial starting point till the in-progress transaction in s2 is +# committed. 's2c' step will cause logical decoding to go to initial consistent +# point and wait for in-progress transaction s3 to commit. 's3c' step will cause +# logical decoding to find a consistent point while the transaction s2 is +# prepared and not yet committed. This will cause the first s1start to skip +# prepared transaction s2 as that will be before consistent point. The second +# s1start will allow decoding of skipped prepare along with commit prepared done +# as part of s2cp. +permutation "s2b" "s2txid" "s1init" "s3b" "s3txid" "s2c" "s2b" "s2insert" "s2p" "s3c" "s1insert" "s1start" "s2cp" "s1start"