diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 9a4c76f0136..76d4a697726 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages \
- spill slot truncate stream stats
+ spill slot truncate stream stats twophase twophase_stream
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream
diff --git a/contrib/test_decoding/expected/twophase.out b/contrib/test_decoding/expected/twophase.out
new file mode 100644
index 00000000000..f9f6bedd1cf
--- /dev/null
+++ b/contrib/test_decoding/expected/twophase.out
@@ -0,0 +1,235 @@
+-- Test prepared transactions. When two-phase-commit is enabled, transactions are
+-- decoded at PREPARE time rather than at COMMIT PREPARED time.
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
+-- Test that decoding happens at PREPARE time when two-phase-commit is enabled.
+-- Decoding after COMMIT PREPARED must have all the commands in the transaction.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (1);
+INSERT INTO test_prepared1 VALUES (2);
+-- should show nothing because the xact has not been prepared yet.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------
+(0 rows)
+
+PREPARE TRANSACTION 'test_prepared#1';
+-- should show both the above inserts and the PREPARE TRANSACTION.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ table public.test_prepared1: INSERT: id[integer]:2
+ PREPARE TRANSACTION 'test_prepared#1'
+(4 rows)
+
+COMMIT PREPARED 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ table public.test_prepared1: INSERT: id[integer]:2
+ PREPARE TRANSACTION 'test_prepared#1'
+ COMMIT PREPARED 'test_prepared#1'
+(5 rows)
+
+-- Test that rollback of a prepared xact is decoded.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (3);
+PREPARE TRANSACTION 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:3
+ PREPARE TRANSACTION 'test_prepared#2'
+(3 rows)
+
+ROLLBACK PREPARED 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+-------------------------------------
+ ROLLBACK PREPARED 'test_prepared#2'
+(1 row)
+
+-- Test prepare of a xact containing ddl. Leaving xact uncommitted for next test.
+BEGIN;
+ALTER TABLE test_prepared1 ADD COLUMN data text;
+INSERT INTO test_prepared1 VALUES (4, 'frakbar');
+PREPARE TRANSACTION 'test_prepared#3';
+-- confirm that exclusive lock from the ALTER command is held on test_prepared1 table
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+ AND relation = 'test_prepared1'::regclass;
+ relation | locktype | mode
+-----------------+----------+---------------------
+ test_prepared_1 | relation | RowExclusiveLock
+ test_prepared_1 | relation | AccessExclusiveLock
+(2 rows)
+
+-- The insert should show the newly altered column but not the DDL.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+-------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
+ PREPARE TRANSACTION 'test_prepared#3'
+(3 rows)
+
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+INSERT INTO test_prepared2 VALUES (5);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:5
+ COMMIT
+(3 rows)
+
+COMMIT PREPARED 'test_prepared#3';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+-------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
+ PREPARE TRANSACTION 'test_prepared#3'
+ COMMIT PREPARED 'test_prepared#3'
+(4 rows)
+
+-- make sure stuff still works
+INSERT INTO test_prepared1 VALUES (6);
+INSERT INTO test_prepared2 VALUES (7);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+--------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:6 data[text]:null
+ COMMIT
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:7
+ COMMIT
+(6 rows)
+
+-- Check 'CLUSTER' (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (8, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (9, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+SELECT 'test_prepared1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+ AND relation = 'test_prepared1'::regclass;
+ relation | locktype | mode
+----------------+----------+---------------------
+ test_prepared1 | relation | RowExclusiveLock
+ test_prepared1 | relation | ShareLock
+ test_prepared1 | relation | AccessExclusiveLock
+(3 rows)
+
+-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding. The
+-- call should return within a second.
+SET statement_timeout = '1s';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+---------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
+ table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
+ PREPARE TRANSACTION 'test_prepared_lock'
+(4 rows)
+
+RESET statement_timeout;
+COMMIT PREPARED 'test_prepared_lock';
+-- consume the commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+---------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
+ table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
+ PREPARE TRANSACTION 'test_prepared_lock'
+ COMMIT PREPARED 'test_prepared_lock'
+(5 rows)
+
+-- Test savepoints and sub-xacts. Creating savepoints will create
+-- sub-xacts implicitly.
+BEGIN;
+CREATE TABLE test_prepared_savepoint (a int);
+INSERT INTO test_prepared_savepoint VALUES (1);
+SAVEPOINT test_savepoint;
+INSERT INTO test_prepared_savepoint VALUES (2);
+ROLLBACK TO SAVEPOINT test_savepoint;
+PREPARE TRANSACTION 'test_prepared_savepoint';
+-- should show only 1, not 2
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------------------------------------------------------------
+ BEGIN
+ table public.test_prepared_savepoint: INSERT: a[integer]:1
+ PREPARE TRANSACTION 'test_prepared_savepoint'
+(3 rows)
+
+COMMIT PREPARED 'test_prepared_savepoint';
+-- consume the commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------------------------------------------------------------
+ BEGIN
+ table public.test_prepared_savepoint: INSERT: a[integer]:1
+ PREPARE TRANSACTION 'test_prepared_savepoint'
+ COMMIT PREPARED 'test_prepared_savepoint'
+(4 rows)
+
+-- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (20);
+PREPARE TRANSACTION 'test_prepared_nodecode';
+-- should show nothing
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------
+(0 rows)
+
+COMMIT PREPARED 'test_prepared_nodecode';
+-- should be decoded now
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+---------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:20 data[text]:null
+ COMMIT
+(3 rows)
+
+-- Test 8:
+-- cleanup and make sure results are also empty
+DROP TABLE test_prepared1;
+DROP TABLE test_prepared2;
+-- show results. There should be nothing to show
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------
+(0 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/expected/twophase_stream.out b/contrib/test_decoding/expected/twophase_stream.out
new file mode 100644
index 00000000000..3acc4acd365
--- /dev/null
+++ b/contrib/test_decoding/expected/twophase_stream.out
@@ -0,0 +1,147 @@
+-- Test streaming of two-phase commits
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+CREATE TABLE stream_test(data text);
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+------
+(0 rows)
+
+-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED
+BEGIN;
+SAVEPOINT s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+ ?column?
+----------
+ msg5
+(1 row)
+
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+ROLLBACK TO s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+PREPARE TRANSACTION 'test1';
+-- should show the inserts after a ROLLBACK
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+ data
+----------------------------------------------------------
+ streaming message: transactional: 1 prefix: test, sz: 50
+ opening a streamed block for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ closing a streamed block for transaction
+ preparing streamed transaction 'test1'
+(24 rows)
+
+COMMIT PREPARED 'test1';
+--should show the COMMIT PREPARED and the other changes in the transaction
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+ data
+-------------------------------------------------------------
+ BEGIN
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
+ PREPARE TRANSACTION 'test1'
+ COMMIT PREPARED 'test1'
+(23 rows)
+
+-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
+-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
+BEGIN;
+SAVEPOINT s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+ ?column?
+----------
+ msg5
+(1 row)
+
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+ROLLBACK to s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+PREPARE TRANSACTION 'test1_nodecode';
+-- should NOT show inserts after a ROLLBACK
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+ data
+----------------------------------------------------------
+ streaming message: transactional: 1 prefix: test, sz: 50
+(1 row)
+
+COMMIT PREPARED 'test1_nodecode';
+-- should show the inserts but not show a COMMIT PREPARED but a COMMIT
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+ data
+-------------------------------------------------------------
+ BEGIN
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
+ COMMIT
+(22 rows)
+
+DROP TABLE stream_test;
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/sql/twophase.sql b/contrib/test_decoding/sql/twophase.sql
new file mode 100644
index 00000000000..894e4f5baf1
--- /dev/null
+++ b/contrib/test_decoding/sql/twophase.sql
@@ -0,0 +1,112 @@
+-- Test prepared transactions. When two-phase-commit is enabled, transactions are
+-- decoded at PREPARE time rather than at COMMIT PREPARED time.
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
+
+-- Test that decoding happens at PREPARE time when two-phase-commit is enabled.
+-- Decoding after COMMIT PREPARED must have all the commands in the transaction.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (1);
+INSERT INTO test_prepared1 VALUES (2);
+-- should show nothing because the xact has not been prepared yet.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+PREPARE TRANSACTION 'test_prepared#1';
+-- should show both the above inserts and the PREPARE TRANSACTION.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+COMMIT PREPARED 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test that rollback of a prepared xact is decoded.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (3);
+PREPARE TRANSACTION 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ROLLBACK PREPARED 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test prepare of a xact containing ddl. Leaving xact uncommitted for next test.
+BEGIN;
+ALTER TABLE test_prepared1 ADD COLUMN data text;
+INSERT INTO test_prepared1 VALUES (4, 'frakbar');
+PREPARE TRANSACTION 'test_prepared#3';
+-- confirm that exclusive lock from the ALTER command is held on test_prepared1 table
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+ AND relation = 'test_prepared1'::regclass;
+-- The insert should show the newly altered column but not the DDL.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+INSERT INTO test_prepared2 VALUES (5);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+COMMIT PREPARED 'test_prepared#3';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+-- make sure stuff still works
+INSERT INTO test_prepared1 VALUES (6);
+INSERT INTO test_prepared2 VALUES (7);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Check 'CLUSTER' (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (8, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (9, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+
+SELECT 'test_prepared1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+ AND relation = 'test_prepared1'::regclass;
+-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding. The
+-- call should return within a second.
+SET statement_timeout = '1s';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+RESET statement_timeout;
+COMMIT PREPARED 'test_prepared_lock';
+-- consume the commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test savepoints and sub-xacts. Creating savepoints will create
+-- sub-xacts implicitly.
+BEGIN;
+CREATE TABLE test_prepared_savepoint (a int);
+INSERT INTO test_prepared_savepoint VALUES (1);
+SAVEPOINT test_savepoint;
+INSERT INTO test_prepared_savepoint VALUES (2);
+ROLLBACK TO SAVEPOINT test_savepoint;
+PREPARE TRANSACTION 'test_prepared_savepoint';
+-- should show only 1, not 2
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+COMMIT PREPARED 'test_prepared_savepoint';
+-- consume the commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (20);
+PREPARE TRANSACTION 'test_prepared_nodecode';
+-- should show nothing
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+COMMIT PREPARED 'test_prepared_nodecode';
+-- should be decoded now
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test 8:
+-- cleanup and make sure results are also empty
+DROP TABLE test_prepared1;
+DROP TABLE test_prepared2;
+-- show results. There should be nothing to show
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/twophase_stream.sql b/contrib/test_decoding/sql/twophase_stream.sql
new file mode 100644
index 00000000000..e9dd44fdb37
--- /dev/null
+++ b/contrib/test_decoding/sql/twophase_stream.sql
@@ -0,0 +1,45 @@
+-- Test streaming of two-phase commits
+
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE stream_test(data text);
+
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED
+BEGIN;
+SAVEPOINT s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+ROLLBACK TO s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+PREPARE TRANSACTION 'test1';
+-- should show the inserts after a ROLLBACK
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+COMMIT PREPARED 'test1';
+--should show the COMMIT PREPARED and the other changes in the transaction
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
+-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
+BEGIN;
+SAVEPOINT s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+ROLLBACK to s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+PREPARE TRANSACTION 'test1_nodecode';
+-- should NOT show inserts after a ROLLBACK
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+COMMIT PREPARED 'test1_nodecode';
+-- should show the inserts but not show a COMMIT PREPARED but a COMMIT
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+DROP TABLE stream_test;
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index d63f90ff282..cf705ed9cda 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -165,7 +165,58 @@ COMMIT 693
ControlC
$ pg_recvlogical -d postgres --slot=test --drop-slot
-
+
+
+ The following example shows SQL interface that can be used to decode prepared
+ transactions. Before you use two-phase commit commands, you must set
+ max_prepared_transactions to at least 1. You must also set
+ the option 'two-phase-commit' to 1 while calling
+ pg_logical_slot_get_changes. Note that we will stream
+ the entire transaction after the commit if it is not already decoded.
+
+
+postgres=# BEGIN;
+postgres=*# INSERT INTO data(data) VALUES('5');
+postgres=*# PREPARE TRANSACTION 'test_prepared1';
+
+postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
+ lsn | xid | data
+-----------+-----+---------------------------------------------------------
+ 0/1689DC0 | 529 | BEGIN 529
+ 0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
+ 0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
+(3 rows)
+
+postgres=# COMMIT PREPARED 'test_prepared1';
+postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
+ lsn | xid | data
+-----------+-----+--------------------------------------------
+ 0/1689DC0 | 529 | BEGIN 529
+ 0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
+ 0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
+ 0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
+(4 row)
+
+postgres=#-- you can also rollback a prepared transaction
+postgres=# BEGIN;
+postgres=*# INSERT INTO data(data) VALUES('6');
+postgres=*# PREPARE TRANSACTION 'test_prepared2';
+postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
+ lsn | xid | data
+-----------+-----+---------------------------------------------------------
+ 0/168A180 | 530 | BEGIN 530
+ 0/168A1E8 | 530 | table public.data: INSERT: id[integer]:4 data[text]:'6'
+ 0/168A430 | 530 | PREPARE TRANSACTION 'test_prepared2', txid 530
+(3 rows)
+
+postgres=# ROLLBACK PREPARED 'test_prepared2';
+postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
+ lsn | xid | data
+-----------+-----+----------------------------------------------
+ 0/168A4B8 | 530 | ROLLBACK PREPARED 'test_prepared2', txid 530
+(1 row)
+
+
Logical Decoding Concepts
@@ -1126,4 +1177,55 @@ stream_commit_cb(...); <-- commit of the streamed transaction
+
+
+ Two-phase commit support for Logical Decoding
+
+
+ With the basic output plugin callbacks (eg., begin_cb,
+ change_cb, commit_cb and
+ message_cb) two-phase commit commands like
+ PREPARE TRANSACTION, COMMIT PREPARED
+ and ROLLBACK PREPARED are not decoded. While the
+ PREPARE TRANSACTION is ignored,
+ COMMIT PREPARED is decoded as a COMMIT
+ and ROLLBACK PREPARED is decoded as a
+ ROLLBACK.
+
+
+
+ To support the streaming of two-phase commands, an output plugin needs to
+ provide additional callbacks. There are multiple two-phase commit callbacks
+ that are required, (begin_prepare_cb,
+ prepare_cb, commit_prepared_cb,
+ rollback_prepared_cb and
+ stream_prepare_cb) and an optional callback
+ (filter_prepare_cb).
+
+
+
+ If the output plugin callbacks for decoding two-phase commit commands are
+ provided, then on PREPARE TRANSACTION, the changes of
+ that transaction are decoded, passed to the output plugin, and the
+ prepare_cb callback is invoked. This differs from the
+ basic decoding setup where changes are only passed to the output plugin
+ when a transaction is committed. The start of a prepared transaction is
+ indicated by the begin_prepare_cb callback.
+
+
+
+ When a prepared transaction is rollbacked using the
+ ROLLBACK PREPARED, then the
+ rollback_prepared_cb callback is invoked and when the
+ prepared transaction is committed using COMMIT PREPARED,
+ then the commit_prepared_cb callback is invoked.
+
+
+
+ Optionally the output plugin can specify a name pattern in the
+ filter_prepare_cb and transactions with gid containing
+ that name pattern will not be decoded as a two-phase commit transaction.
+
+
+
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 1887ba79440..23ab3cf6052 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -67,13 +67,24 @@ static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- xl_xact_parsed_commit *parsed, TransactionId xid);
+ xl_xact_parsed_commit *parsed, TransactionId xid,
+ bool two_phase);
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- xl_xact_parsed_abort *parsed, TransactionId xid);
+ xl_xact_parsed_abort *parsed, TransactionId xid,
+ bool two_phase);
+static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_prepare *parsed);
+
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
+/* helper functions for decoding transactions */
+static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
+static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
+ XLogRecordBuffer *buf, Oid dbId,
+ RepOriginId origin_id);
+
/*
* Take every XLogReadRecord()ed record and perform the actions required to
* decode it using the output plugin already setup in the logical decoding
@@ -244,6 +255,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
xl_xact_commit *xlrec;
xl_xact_parsed_commit parsed;
TransactionId xid;
+ bool two_phase = false;
xlrec = (xl_xact_commit *) XLogRecGetData(r);
ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
@@ -253,7 +265,15 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
else
xid = parsed.twophase_xid;
- DecodeCommit(ctx, buf, &parsed, xid);
+ /*
+ * We would like to process the transaction in a two-phase
+ * manner iff output plugin supports two-phase commits and
+ * doesn't filter the transaction at prepare time.
+ */
+ if (info == XLOG_XACT_COMMIT_PREPARED)
+ two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+
+ DecodeCommit(ctx, buf, &parsed, xid, two_phase);
break;
}
case XLOG_XACT_ABORT:
@@ -262,6 +282,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
xl_xact_abort *xlrec;
xl_xact_parsed_abort parsed;
TransactionId xid;
+ bool two_phase = false;
xlrec = (xl_xact_abort *) XLogRecGetData(r);
ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
@@ -271,7 +292,15 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
else
xid = parsed.twophase_xid;
- DecodeAbort(ctx, buf, &parsed, xid);
+ /*
+ * We would like to process the transaction in a two-phase
+ * manner iff output plugin supports two-phase commits and
+ * doesn't filter the transaction at prepare time.
+ */
+ if (info == XLOG_XACT_ABORT_PREPARED)
+ two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+
+ DecodeAbort(ctx, buf, &parsed, xid, two_phase);
break;
}
case XLOG_XACT_ASSIGNMENT:
@@ -312,17 +341,30 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
}
break;
case XLOG_XACT_PREPARE:
+ {
+ xl_xact_parsed_prepare parsed;
+ xl_xact_prepare *xlrec;
- /*
- * Currently decoding ignores PREPARE TRANSACTION and will just
- * decode the transaction when the COMMIT PREPARED is sent or
- * throw away the transaction's contents when a ROLLBACK PREPARED
- * is received. In the future we could add code to expose prepared
- * transactions in the changestream allowing for a kind of
- * distributed 2PC.
- */
- ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
- break;
+ /* ok, parse it */
+ xlrec = (xl_xact_prepare *) XLogRecGetData(r);
+ ParsePrepareRecord(XLogRecGetInfo(buf->record),
+ xlrec, &parsed);
+
+ /*
+ * We would like to process the transaction in a two-phase
+ * manner iff output plugin supports two-phase commits and
+ * doesn't filter the transaction at prepare time.
+ */
+ if (FilterPrepare(ctx, parsed.twophase_gid))
+ {
+ ReorderBufferProcessXid(reorder, parsed.twophase_xid,
+ buf->origptr);
+ break;
+ }
+
+ DecodePrepare(ctx, buf, &parsed);
+ break;
+ }
default:
elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
}
@@ -520,6 +562,32 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
}
}
+/*
+ * Ask output plugin whether we want to skip this PREPARE and send
+ * this transaction as a regular commit later.
+ */
+static inline bool
+FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
+{
+ /*
+ * Skip if decoding of two-phase transactions at PREPARE time is not
+ * enabled. In that case, all two-phase transactions are considered
+ * filtered out and will be applied as regular transactions at COMMIT
+ * PREPARED.
+ */
+ if (!ctx->twophase)
+ return true;
+
+ /*
+ * The filter_prepare callback is optional. When not supplied, all
+ * prepared transactions should go through.
+ */
+ if (ctx->callbacks.filter_prepare_cb == NULL)
+ return false;
+
+ return filter_prepare_cb_wrapper(ctx, gid);
+}
+
static inline bool
FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
@@ -582,10 +650,15 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* Consolidated commit record handling between the different form of commit
* records.
+ *
+ * 'two_phase' indicates that caller wants to process the transaction in two
+ * phases, first process prepare if not already done and then process
+ * commit_prepared.
*/
static void
DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- xl_xact_parsed_commit *parsed, TransactionId xid)
+ xl_xact_parsed_commit *parsed, TransactionId xid,
+ bool two_phase)
{
XLogRecPtr origin_lsn = InvalidXLogRecPtr;
TimestampTz commit_time = parsed->xact_time;
@@ -606,15 +679,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* the reorderbuffer to forget the content of the (sub-)transactions
* if not.
*
- * There can be several reasons we might not be interested in this
- * transaction:
- * 1) We might not be interested in decoding transactions up to this
- * LSN. This can happen because we previously decoded it and now just
- * are restarting or if we haven't assembled a consistent snapshot yet.
- * 2) The transaction happened in another database.
- * 3) The output plugin is not interested in the origin.
- * 4) We are doing fast-forwarding
- *
* We can't just use ReorderBufferAbort() here, because we need to execute
* the transaction's invalidations. This currently won't be needed if
* we're just skipping over the transaction because currently we only do
@@ -627,9 +691,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* relevant syscaches.
* ---
*/
- if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
- (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
- ctx->fast_forward || FilterByOrigin(ctx, origin_id))
+ if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
{
for (i = 0; i < parsed->nsubxacts; i++)
{
@@ -647,34 +709,163 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
buf->origptr, buf->endptr);
}
- /* replay actions of all transaction + subtransactions in order */
- ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
- commit_time, origin_id, origin_lsn);
+ /*
+ * Send the final commit record if the transaction data is already
+ * decoded, otherwise, process the entire transaction.
+ */
+ if (two_phase)
+ {
+ ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+ commit_time, origin_id, origin_lsn,
+ parsed->twophase_gid, true);
+ }
+ else
+ {
+ ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
+ commit_time, origin_id, origin_lsn);
+ }
/*
- * Update the decoding stats at transaction commit/abort. It is not clear
- * that sending more or less frequently than this would be better.
+ * Update the decoding stats at transaction prepare/commit/abort. It is
+ * not clear that sending more or less frequently than this would be
+ * better.
*/
UpdateDecodingStats(ctx);
}
+/*
+ * Decode PREPARE record. Similar logic as in DecodeCommit.
+ *
+ * Note that we don't skip prepare even if have detected concurrent abort
+ * because it is quite possible that we had already sent some changes before we
+ * detect abort in which case we need to abort those changes in the subscriber.
+ * To abort such changes, we do send the prepare and then the rollback prepared
+ * which is what happened on the publisher-side as well. Now, we can invent a
+ * new abort API wherein in such cases we send abort and skip sending prepared
+ * and rollback prepared but then it is not that straightforward because we
+ * might have streamed this transaction by that time in which case it is
+ * handled when the rollback is encountered. It is not impossible to optimize
+ * the concurrent abort case but it can introduce design complexity w.r.t
+ * handling different cases so leaving it for now as it doesn't seem worth it.
+ */
+static void
+DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_prepare *parsed)
+{
+ SnapBuild *builder = ctx->snapshot_builder;
+ XLogRecPtr origin_lsn = parsed->origin_lsn;
+ TimestampTz prepare_time = parsed->xact_time;
+ XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
+ int i;
+ TransactionId xid = parsed->twophase_xid;
+
+ if (parsed->origin_timestamp != 0)
+ prepare_time = parsed->origin_timestamp;
+
+ /*
+ * Remember the prepare info for a txn so that it can be used later in
+ * commit prepared if required. See ReorderBufferFinishPrepared.
+ */
+ if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr,
+ buf->endptr, prepare_time, origin_id,
+ origin_lsn))
+ return;
+
+ /* We can't start streaming unless a consistent state is reached. */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
+ {
+ ReorderBufferSkipPrepare(ctx->reorder, xid);
+ return;
+ }
+
+ /*
+ * Check whether we need to process this transaction. See
+ * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
+ * transaction.
+ *
+ * We can't call ReorderBufferForget as we did in DecodeCommit as the txn
+ * hasn't yet been committed, removing this txn before a commit might
+ * result in the computation of an incorrect restart_lsn. See
+ * SnapBuildProcessRunningXacts. But we need to process cache
+ * invalidations if there are any for the reasons mentioned in
+ * DecodeCommit.
+ */
+ if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
+ {
+ ReorderBufferSkipPrepare(ctx->reorder, xid);
+ ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
+ return;
+ }
+
+ /* Tell the reorderbuffer about the surviving subtransactions. */
+ for (i = 0; i < parsed->nsubxacts; i++)
+ {
+ ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
+ buf->origptr, buf->endptr);
+ }
+
+ /* replay actions of all transaction + subtransactions in order */
+ ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
+
+ /*
+ * Update the decoding stats at transaction prepare/commit/abort. It is
+ * not clear that sending more or less frequently than this would be
+ * better.
+ */
+ UpdateDecodingStats(ctx);
+}
+
+
/*
* Get the data from the various forms of abort records and pass it on to
- * snapbuild.c and reorderbuffer.c
+ * snapbuild.c and reorderbuffer.c.
+ *
+ * 'two_phase' indicates to finish prepared transaction.
*/
static void
DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- xl_xact_parsed_abort *parsed, TransactionId xid)
+ xl_xact_parsed_abort *parsed, TransactionId xid,
+ bool two_phase)
{
int i;
+ XLogRecPtr origin_lsn = InvalidXLogRecPtr;
+ TimestampTz abort_time = parsed->xact_time;
+ XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
+ bool skip_xact;
- for (i = 0; i < parsed->nsubxacts; i++)
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
{
- ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
- buf->record->EndRecPtr);
+ origin_lsn = parsed->origin_lsn;
+ abort_time = parsed->origin_timestamp;
}
- ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
+ /*
+ * Check whether we need to process this transaction. See
+ * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
+ * transaction.
+ */
+ skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id);
+
+ /*
+ * Send the final rollback record for a prepared transaction unless we
+ * need to skip it. For non-two-phase xacts, simply forget the xact.
+ */
+ if (two_phase && !skip_xact)
+ {
+ ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+ abort_time, origin_id, origin_lsn,
+ parsed->twophase_gid, false);
+ }
+ else
+ {
+ for (i = 0; i < parsed->nsubxacts; i++)
+ {
+ ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
+ buf->record->EndRecPtr);
+ }
+
+ ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
+ }
/* update the decoding stats */
UpdateDecodingStats(ctx);
@@ -1080,3 +1271,24 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
header->t_infomask2 = xlhdr.t_infomask2;
header->t_hoff = xlhdr.t_hoff;
}
+
+/*
+ * Check whether we are interested in this specific transaction.
+ *
+ * There can be several reasons we might not be interested in this
+ * transaction:
+ * 1) We might not be interested in decoding transactions up to this
+ * LSN. This can happen because we previously decoded it and now just
+ * are restarting or if we haven't assembled a consistent snapshot yet.
+ * 2) The transaction happened in another database.
+ * 3) The output plugin is not interested in the origin.
+ * 4) We are doing fast-forwarding
+ */
+static bool
+DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ Oid txn_dbid, RepOriginId origin_id)
+{
+ return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+ (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
+ ctx->fast_forward || FilterByOrigin(ctx, origin_id));
+}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index e49b5115175..605ec0986ca 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1083,15 +1083,6 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
Assert(!ctx->fast_forward);
- /*
- * Skip if decoding of two-phase transactions at PREPARE time is not
- * enabled. In that case, all two-phase transactions are considered
- * filtered out and will be applied as regular transactions at COMMIT
- * PREPARED.
- */
- if (!ctx->twophase)
- return true;
-
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "filter_prepare";
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index df63b90a67a..315bfe7cae2 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -251,7 +251,8 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
-static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ bool txn_prepared);
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
TransactionId xid, XLogSegNo segno);
@@ -422,6 +423,12 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* free data that's contained */
+ if (txn->gid != NULL)
+ {
+ pfree(txn->gid);
+ txn->gid = NULL;
+ }
+
if (txn->tuplecid_hash != NULL)
{
hash_destroy(txn->tuplecid_hash);
@@ -1516,12 +1523,18 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
}
/*
- * Discard changes from a transaction (and subtransactions), after streaming
- * them. Keep the remaining info - transactions, tuplecids, invalidations and
- * snapshots.
+ * Discard changes from a transaction (and subtransactions), either after
+ * streaming or decoding them at PREPARE. Keep the remaining info -
+ * transactions, tuplecids, invalidations and snapshots.
+ *
+ * We additionaly remove tuplecids after decoding the transaction at prepare
+ * time as we only need to perform invalidation at rollback or commit prepared.
+ *
+ * 'txn_prepared' indicates that we have decoded the transaction at prepare
+ * time.
*/
static void
-ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
{
dlist_mutable_iter iter;
@@ -1540,7 +1553,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
Assert(rbtxn_is_known_subxact(subtxn));
Assert(subtxn->nsubtxns == 0);
- ReorderBufferTruncateTXN(rb, subtxn);
+ ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
}
/* cleanup changes in the txn */
@@ -1574,9 +1587,33 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
* about the toplevel xact (we send the XID in all messages), but we never
* stream XIDs of empty subxacts.
*/
- if ((!txn->toptxn) || (txn->nentries_mem != 0))
+ if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0)))
txn->txn_flags |= RBTXN_IS_STREAMED;
+ if (txn_prepared)
+ {
+ /*
+ * If this is a prepared txn, cleanup the tuplecids we stored for
+ * decoding catalog snapshot access. They are always stored in the
+ * toplevel transaction.
+ */
+ dlist_foreach_modify(iter, &txn->tuplecids)
+ {
+ ReorderBufferChange *change;
+
+ change = dlist_container(ReorderBufferChange, node, iter.cur);
+
+ /* Check we're not mixing changes from different transactions. */
+ Assert(change->txn == txn);
+ Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
+
+ /* Remove the change from its containing list. */
+ dlist_delete(&change->node);
+
+ ReorderBufferReturnChange(rb, change, true);
+ }
+ }
+
/*
* Destroy the (relfilenode, ctid) hashtable, so that we don't leak any
* memory. We could also keep the hash table and update it with new ctid
@@ -1756,9 +1793,10 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
}
/*
- * If the transaction was (partially) streamed, we need to commit it in a
- * 'streamed' way. That is, we first stream the remaining part of the
- * transaction, and then invoke stream_commit message.
+ * If the transaction was (partially) streamed, we need to prepare or commit
+ * it in a 'streamed' way. That is, we first stream the remaining part of the
+ * transaction, and then invoke stream_prepare or stream_commit message as per
+ * the case.
*/
static void
ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
@@ -1768,29 +1806,49 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferStreamTXN(rb, txn);
- rb->stream_commit(rb, txn, txn->final_lsn);
+ if (rbtxn_prepared(txn))
+ {
+ /*
+ * Note, we send stream prepare even if a concurrent abort is
+ * detected. See DecodePrepare for more information.
+ */
+ rb->stream_prepare(rb, txn, txn->final_lsn);
- ReorderBufferCleanupTXN(rb, txn);
+ /*
+ * This is a PREPARED transaction, part of a two-phase commit. The
+ * full cleanup will happen as part of the COMMIT PREPAREDs, so now
+ * just truncate txn by removing changes and tuple_cids.
+ */
+ ReorderBufferTruncateTXN(rb, txn, true);
+ /* Reset the CheckXidAlive */
+ CheckXidAlive = InvalidTransactionId;
+ }
+ else
+ {
+ rb->stream_commit(rb, txn, txn->final_lsn);
+ ReorderBufferCleanupTXN(rb, txn);
+ }
}
/*
* Set xid to detect concurrent aborts.
*
- * While streaming an in-progress transaction there is a possibility that the
- * (sub)transaction might get aborted concurrently. In such case if the
- * (sub)transaction has catalog update then we might decode the tuple using
- * wrong catalog version. For example, suppose there is one catalog tuple with
- * (xmin: 500, xmax: 0). Now, the transaction 501 updates the catalog tuple
- * and after that we will have two tuples (xmin: 500, xmax: 501) and
- * (xmin: 501, xmax: 0). Now, if 501 is aborted and some other transaction
- * say 502 updates the same catalog tuple then the first tuple will be changed
- * to (xmin: 500, xmax: 502). So, the problem is that when we try to decode
- * the tuple inserted/updated in 501 after the catalog update, we will see the
- * catalog tuple with (xmin: 500, xmax: 502) as visible because it will
- * consider that the tuple is deleted by xid 502 which is not visible to our
- * snapshot. And when we will try to decode with that catalog tuple, it can
- * lead to a wrong result or a crash. So, it is necessary to detect
- * concurrent aborts to allow streaming of in-progress transactions.
+ * While streaming an in-progress transaction or decoding a prepared
+ * transaction there is a possibility that the (sub)transaction might get
+ * aborted concurrently. In such case if the (sub)transaction has catalog
+ * update then we might decode the tuple using wrong catalog version. For
+ * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now,
+ * the transaction 501 updates the catalog tuple and after that we will have
+ * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). Now, if 501 is
+ * aborted and some other transaction say 502 updates the same catalog tuple
+ * then the first tuple will be changed to (xmin: 500, xmax: 502). So, the
+ * problem is that when we try to decode the tuple inserted/updated in 501
+ * after the catalog update, we will see the catalog tuple with (xmin: 500,
+ * xmax: 502) as visible because it will consider that the tuple is deleted by
+ * xid 502 which is not visible to our snapshot. And when we will try to
+ * decode with that catalog tuple, it can lead to a wrong result or a crash.
+ * So, it is necessary to detect concurrent aborts to allow streaming of
+ * in-progress transactions or decoding of prepared transactions.
*
* For detecting the concurrent abort we set CheckXidAlive to the current
* (sub)transaction's xid for which this change belongs to. And, during
@@ -1799,7 +1857,10 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
* and discard the already streamed changes on such an error. We might have
* already streamed some of the changes for the aborted (sub)transaction, but
* that is fine because when we decode the abort we will stream abort message
- * to truncate the changes in the subscriber.
+ * to truncate the changes in the subscriber. Similarly, for prepared
+ * transactions, we stop decoding if concurrent abort is detected and then
+ * rollback the changes when rollback prepared is encountered. See
+ * DecodePreare.
*/
static inline void
SetupCheckXidLive(TransactionId xid)
@@ -1901,7 +1962,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferChange *specinsert)
{
/* Discard the changes that we just streamed */
- ReorderBufferTruncateTXN(rb, txn);
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
/* Free all resources allocated for toast reconstruction */
ReorderBufferToastReset(rb, txn);
@@ -1913,15 +1974,19 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
specinsert = NULL;
}
- /* Stop the stream. */
- rb->stream_stop(rb, txn, last_lsn);
-
- /* Remember the command ID and snapshot for the streaming run */
- ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
+ /*
+ * For the streaming case, stop the stream and remember the command ID and
+ * snapshot for the streaming run.
+ */
+ if (rbtxn_is_streamed(txn))
+ {
+ rb->stream_stop(rb, txn, last_lsn);
+ ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
+ }
}
/*
- * Helper function for ReorderBufferCommit and ReorderBufferStreamTXN.
+ * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
*
* Send data of a transaction (and its subtransactions) to the
* output plugin. We iterate over the top and subtransactions (using a k-way
@@ -1974,9 +2039,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
else
StartTransactionCommand();
- /* We only need to send begin/commit for non-streamed transactions. */
+ /*
+ * We only need to send begin/begin-prepare for non-streamed
+ * transactions.
+ */
if (!streaming)
- rb->begin(rb, txn);
+ {
+ if (rbtxn_prepared(txn))
+ rb->begin_prepare(rb, txn);
+ else
+ rb->begin(rb, txn);
+ }
ReorderBufferIterTXNInit(rb, txn, &iterstate);
while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
@@ -2007,8 +2080,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
prev_lsn = change->lsn;
- /* Set the current xid to detect concurrent aborts. */
- if (streaming)
+ /*
+ * Set the current xid to detect concurrent aborts. This is
+ * required for the cases when we decode the changes before the
+ * COMMIT record is processed.
+ */
+ if (streaming || rbtxn_prepared(change->txn))
{
curtxn = change->txn;
SetupCheckXidLive(curtxn->xid);
@@ -2299,7 +2376,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
}
}
else
- rb->commit(rb, txn, commit_lsn);
+ {
+ /*
+ * Call either PREPARE (for two-phase transactions) or COMMIT (for
+ * regular ones).
+ */
+ if (rbtxn_prepared(txn))
+ rb->prepare(rb, txn, commit_lsn);
+ else
+ rb->commit(rb, txn, commit_lsn);
+ }
/* this is just a sanity check against bad output plugin behaviour */
if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
@@ -2333,15 +2419,22 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
RollbackAndReleaseCurrentSubTransaction();
/*
- * If we are streaming the in-progress transaction then discard the
- * changes that we just streamed, and mark the transactions as
- * streamed (if they contained changes). Otherwise, remove all the
- * changes and deallocate the ReorderBufferTXN.
+ * We are here due to one of the four reasons: 1. Decoding an
+ * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
+ * prepared txn that was (partially) streamed. 4. Decoding a committed
+ * txn.
+ *
+ * For 1, we allow truncation of txn data by removing the changes
+ * already streamed but still keeping other things like invalidations,
+ * snapshot, and tuplecids. For 2 and 3, we indicate
+ * ReorderBufferTruncateTXN to do more elaborate truncation of txn
+ * data as the entire transaction has been decoded except for commit.
+ * For 4, as the entire txn has been decoded, we can fully clean up
+ * the TXN reorder buffer.
*/
- if (streaming)
+ if (streaming || rbtxn_prepared(txn))
{
- ReorderBufferTruncateTXN(rb, txn);
-
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
/* Reset the CheckXidAlive */
CheckXidAlive = InvalidTransactionId;
}
@@ -2374,17 +2467,20 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
/*
* The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
- * abort of the (sub)transaction we are streaming. We need to do the
- * cleanup and return gracefully on this error, see SetupCheckXidLive.
+ * abort of the (sub)transaction we are streaming or preparing. We
+ * need to do the cleanup and return gracefully on this error, see
+ * SetupCheckXidLive.
*/
if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK)
{
/*
- * This error can only occur when we are sending the data in
- * streaming mode and the streaming is not finished yet.
+ * This error can occur either when we are sending the data in
+ * streaming mode and the streaming is not finished yet or when we
+ * are sending the data out on a PREPARE during a two-phase
+ * commit.
*/
- Assert(streaming);
- Assert(stream_started);
+ Assert(streaming || rbtxn_prepared(txn));
+ Assert(stream_started || rbtxn_prepared(txn));
/* Cleanup the temporary error state. */
FlushErrorState();
@@ -2414,26 +2510,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
* ReorderBufferCommitChild(), even if previously assigned to the toplevel
* transaction with ReorderBufferAssignChild.
*
- * This interface is called once a toplevel commit is read for both streamed
- * as well as non-streamed transactions.
+ * This interface is called once a prepare or toplevel commit is read for both
+ * streamed as well as non-streamed transactions.
*/
-void
-ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+static void
+ReorderBufferReplay(ReorderBufferTXN *txn,
+ ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time,
RepOriginId origin_id, XLogRecPtr origin_lsn)
{
- ReorderBufferTXN *txn;
Snapshot snapshot_now;
CommandId command_id = FirstCommandId;
- txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
- false);
-
- /* unknown transaction, nothing to replay */
- if (txn == NULL)
- return;
-
txn->final_lsn = commit_lsn;
txn->end_lsn = end_lsn;
txn->commit_time = commit_time;
@@ -2463,7 +2552,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
if (txn->base_snapshot == NULL)
{
Assert(txn->ninvalidations == 0);
- ReorderBufferCleanupTXN(rb, txn);
+
+ /*
+ * Removing this txn before a commit might result in the computation
+ * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
+ */
+ if (!rbtxn_prepared(txn))
+ ReorderBufferCleanupTXN(rb, txn);
return;
}
@@ -2474,6 +2569,178 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
command_id, false);
}
+/*
+ * Commit a transaction.
+ *
+ * See comments for ReorderBufferReplay().
+ */
+void
+ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+ false);
+
+ /* unknown transaction, nothing to replay */
+ if (txn == NULL)
+ return;
+
+ ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
+ origin_id, origin_lsn);
+}
+
+/*
+ * Record the prepare information for a transaction.
+ */
+bool
+ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
+ TimestampTz prepare_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+
+ /* unknown transaction, nothing to do */
+ if (txn == NULL)
+ return false;
+
+ /*
+ * Remember the prepare information to be later used by commit prepared in
+ * case we skip doing prepare.
+ */
+ txn->final_lsn = prepare_lsn;
+ txn->end_lsn = end_lsn;
+ txn->commit_time = prepare_time;
+ txn->origin_id = origin_id;
+ txn->origin_lsn = origin_lsn;
+
+ return true;
+}
+
+/* Remember that we have skipped prepare */
+void
+ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+
+ /* unknown transaction, nothing to do */
+ if (txn == NULL)
+ return;
+
+ txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
+}
+
+/*
+ * Prepare a two-phase transaction.
+ *
+ * See comments for ReorderBufferReplay().
+ */
+void
+ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+ char *gid)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+ false);
+
+ /* unknown transaction, nothing to replay */
+ if (txn == NULL)
+ return;
+
+ txn->txn_flags |= RBTXN_PREPARE;
+ txn->gid = pstrdup(gid);
+
+ /* The prepare info must have been updated in txn by now. */
+ Assert(txn->final_lsn != InvalidXLogRecPtr);
+
+ ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
+ txn->commit_time, txn->origin_id, txn->origin_lsn);
+}
+
+/*
+ * This is used to handle COMMIT/ROLLBACK PREPARED.
+ */
+void
+ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time, RepOriginId origin_id,
+ XLogRecPtr origin_lsn, char *gid, bool is_commit)
+{
+ ReorderBufferTXN *txn;
+ XLogRecPtr prepare_end_lsn;
+ TimestampTz prepare_time;
+
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn, false);
+
+ /* unknown transaction, nothing to do */
+ if (txn == NULL)
+ return;
+
+ /*
+ * By this time the txn has the prepare record information, remember it to
+ * be later used for rollback.
+ */
+ prepare_end_lsn = txn->end_lsn;
+ prepare_time = txn->commit_time;
+
+ /* add the gid in the txn */
+ txn->gid = pstrdup(gid);
+
+ /*
+ * It is possible that this transaction is not decoded at prepare time
+ * either because by that time we didn't have a consistent snapshot or it
+ * was decoded earlier but we have restarted. We can't distinguish between
+ * those two cases so we send the prepare in both the cases and let
+ * downstream decide whether to process or skip it. We don't need to
+ * decode the xact for aborts if it is not done already.
+ */
+ if (!rbtxn_prepared(txn) && is_commit)
+ {
+ txn->txn_flags |= RBTXN_PREPARE;
+
+ /*
+ * The prepare info must have been updated in txn even if we skip
+ * prepare.
+ */
+ Assert(txn->final_lsn != InvalidXLogRecPtr);
+
+ /*
+ * By this time the txn has the prepare record information and it is
+ * important to use that so that downstream gets the accurate
+ * information. If instead, we have passed commit information here
+ * then downstream can behave as it has already replayed commit
+ * prepared after the restart.
+ */
+ ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
+ txn->commit_time, txn->origin_id, txn->origin_lsn);
+ }
+
+ txn->final_lsn = commit_lsn;
+ txn->end_lsn = end_lsn;
+ txn->commit_time = commit_time;
+ txn->origin_id = origin_id;
+ txn->origin_lsn = origin_lsn;
+
+ if (is_commit)
+ rb->commit_prepared(rb, txn, commit_lsn);
+ else
+ rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
+
+ /* cleanup: make sure there's no cache pollution */
+ ReorderBufferExecuteInvalidations(txn->ninvalidations,
+ txn->invalidations);
+ ReorderBufferCleanupTXN(rb, txn);
+}
+
/*
* Abort a transaction that possibly has previous changes. Needs to be first
* called for subtransactions and then for the toplevel xid.
@@ -2605,6 +2872,39 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
ReorderBufferCleanupTXN(rb, txn);
}
+/*
+ * Invalidate cache for those transactions that need to be skipped just in case
+ * catalogs were manipulated as part of the transaction.
+ *
+ * Note that this is a special-purpose function for prepared transactions where
+ * we don't want to clean up the TXN even when we decide to skip it. See
+ * DecodePrepare.
+ */
+void
+ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+ false);
+
+ /* unknown, nothing to do */
+ if (txn == NULL)
+ return;
+
+ /*
+ * Process cache invalidation messages if there are any. Even if we're not
+ * interested in the transaction's contents, it could have manipulated the
+ * catalog and we need to update the caches according to that.
+ */
+ if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
+ ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
+ txn->invalidations);
+ else
+ Assert(txn->ninvalidations == 0);
+}
+
+
/*
* Execute invalidations happening outside the context of a decoded
* transaction. That currently happens either for xid-less commits
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 6afc25e8d3d..15b07a54c11 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -834,6 +834,13 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
continue;
+ /*
+ * We don't need to add snapshot to prepared transactions as they
+ * should not see the new catalog contents.
+ */
+ if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
+ continue;
+
elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
txn->xid, (uint32) (lsn >> 32), (uint32) lsn);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 9f982137d93..bab31bf7af7 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -174,6 +174,8 @@ typedef struct ReorderBufferChange
#define RBTXN_IS_STREAMED 0x0010
#define RBTXN_HAS_TOAST_INSERT 0x0020
#define RBTXN_HAS_SPEC_INSERT 0x0040
+#define RBTXN_PREPARE 0x0080
+#define RBTXN_SKIPPED_PREPARE 0x0100
/* Does the transaction have catalog changes? */
#define rbtxn_has_catalog_changes(txn) \
@@ -233,6 +235,18 @@ typedef struct ReorderBufferChange
((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
)
+/* Has this transaction been prepared? */
+#define rbtxn_prepared(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_PREPARE) != 0 \
+)
+
+/* prepare for this transaction skipped? */
+#define rbtxn_skip_prepared(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
+)
+
typedef struct ReorderBufferTXN
{
/* See above */
@@ -258,10 +272,11 @@ typedef struct ReorderBufferTXN
XLogRecPtr first_lsn;
/* ----
- * LSN of the record that lead to this xact to be committed or
+ * LSN of the record that lead to this xact to be prepared or committed or
* aborted. This can be a
* * plain commit record
* * plain commit record, of a parent transaction
+ * * prepared tansaction
* * prepared transaction commit
* * plain abort record
* * prepared transaction abort
@@ -293,7 +308,8 @@ typedef struct ReorderBufferTXN
XLogRecPtr origin_lsn;
/*
- * Commit time, only known when we read the actual commit record.
+ * Commit or Prepare time, only known when we read the actual commit or
+ * prepare record.
*/
TimestampTz commit_time;
@@ -625,12 +641,18 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapsho
void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
+void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn,
+ char *gid, bool is_commit);
void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
void ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid);
void ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
+void ReorderBufferInvalidate(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
void ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
void ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
@@ -644,10 +666,17 @@ void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr l
void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
SharedInvalidationMessage *invalidations);
void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
+
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
+bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
+ TimestampTz prepare_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn);
+void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid);
+void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid);
ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);