diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index e9a59af8c34..4f336ee0adf 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2854,6 +2854,17 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx + + + two_phase_at pg_lsn + + + The address (LSN) from which the decoding of prepared + transactions is enabled. NULL for logical slots + where two_phase is false and for physical slots. + + + inactive_since timestamptz diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 64a7240aa77..273008db37f 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1025,6 +1025,7 @@ CREATE VIEW pg_replication_slots AS L.wal_status, L.safe_wal_size, L.two_phase, + L.two_phase_at, L.inactive_since, L.conflicting, L.invalidation_reason, diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 2c0a7439be4..e22d41891e6 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -139,6 +139,7 @@ typedef struct RemoteSlot bool failover; XLogRecPtr restart_lsn; XLogRecPtr confirmed_lsn; + XLogRecPtr two_phase_at; TransactionId catalog_xmin; /* RS_INVAL_NONE if valid, or the reason of invalidation */ @@ -276,7 +277,8 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, if (remote_dbid != slot->data.database || remote_slot->two_phase != slot->data.two_phase || remote_slot->failover != slot->data.failover || - strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0) + strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0 || + remote_slot->two_phase_at != slot->data.two_phase_at) { NameData plugin_name; @@ -287,6 +289,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, slot->data.plugin = plugin_name; slot->data.database = remote_dbid; slot->data.two_phase = remote_slot->two_phase; + slot->data.two_phase_at = remote_slot->two_phase_at; slot->data.failover = remote_slot->failover; SpinLockRelease(&slot->mutex); @@ -788,9 +791,9 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) static bool synchronize_slots(WalReceiverConn *wrconn) { -#define SLOTSYNC_COLUMN_COUNT 9 +#define SLOTSYNC_COLUMN_COUNT 10 Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, - LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID}; + LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID}; WalRcvExecResult *res; TupleTableSlot *tupslot; @@ -798,7 +801,7 @@ synchronize_slots(WalReceiverConn *wrconn) bool some_slot_updated = false; bool started_tx = false; const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn," - " restart_lsn, catalog_xmin, two_phase, failover," + " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover," " database, invalidation_reason" " FROM pg_catalog.pg_replication_slots" " WHERE failover and NOT temporary"; @@ -853,6 +856,9 @@ synchronize_slots(WalReceiverConn *wrconn) &isnull)); Assert(!isnull); + d = slot_getattr(tupslot, ++col, &isnull); + remote_slot->two_phase_at = isnull ? InvalidXLogRecPtr : DatumGetLSN(d); + remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col, &isnull)); Assert(!isnull); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 146eef5871e..8a314b5ff3b 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -235,7 +235,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 19 +#define PG_GET_REPLICATION_SLOTS_COLS 20 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -406,6 +406,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(slot_contents.data.two_phase); + if (slot_contents.data.two_phase && + !XLogRecPtrIsInvalid(slot_contents.data.two_phase_at)) + values[i++] = LSNGetDatum(slot_contents.data.two_phase_at); + else + nulls[i++] = true; + if (slot_contents.inactive_since > 0) values[i++] = TimestampTzGetDatum(slot_contents.inactive_since); else diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 0c9b8ac9de1..950fda777fb 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202504021 +#define CATALOG_VERSION_NO 202504031 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 4962d611f31..a28a15993a2 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11424,9 +11424,9 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,timestamptz,bool,text,bool,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,inactive_since,conflicting,invalidation_reason,failover,synced}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,pg_lsn,timestamptz,bool,text,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,two_phase_at,inactive_since,conflicting,invalidation_reason,failover,synced}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl index 8f65142909a..9c8b49e942d 100644 --- a/src/test/recovery/t/040_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl @@ -22,7 +22,11 @@ $publisher->init( # Disable autovacuum to avoid generating xid during stats update as otherwise # the new XID could then be replicated to standby at some random point making # slots at primary lag behind standby during slot sync. -$publisher->append_conf('postgresql.conf', 'autovacuum = off'); +$publisher->append_conf( + 'postgresql.conf', qq{ +autovacuum = off +max_prepared_transactions = 1 +}); $publisher->start; $publisher->safe_psql('postgres', @@ -33,6 +37,7 @@ my $publisher_connstr = $publisher->connstr . ' dbname=postgres'; # Create a subscriber node, wait for sync to complete my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1'); $subscriber1->init; +$subscriber1->append_conf('postgresql.conf', 'max_prepared_transactions = 1'); $subscriber1->start; # Capture the time before the logical failover slot is created on the @@ -830,13 +835,72 @@ $primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "'sb1_slot'"); $primary->reload; +################################################## +# Test the synchronization of the two_phase setting for a subscription with the +# standby. Additionally, prepare a transaction before enabling the two_phase +# option; subsequent tests will verify if it can be correctly replicated to the +# subscriber after committing it on the promoted standby. +################################################## + +$standby1->start; + +# Prepare a transaction +$primary->safe_psql( + 'postgres', qq[ + BEGIN; + INSERT INTO tab_int values(0); + PREPARE TRANSACTION 'test_twophase_slotsync'; +]); + +$primary->wait_for_replay_catchup($standby1); +$primary->wait_for_catchup('regress_mysub1'); + +# Disable the subscription to allow changing the two_phase option. +$subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_mysub1 DISABLE"); + +# Wait for the replication slot to become inactive on the publisher +$primary->poll_query_until( + 'postgres', + "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'", + 1); + +# Set two_phase to true and enable the subscription +$subscriber1->safe_psql( + 'postgres', qq[ + ALTER SUBSCRIPTION regress_mysub1 SET (two_phase = true); + ALTER SUBSCRIPTION regress_mysub1 ENABLE; +]); + +$primary->wait_for_catchup('regress_mysub1'); + +my $two_phase_at = $primary->safe_psql('postgres', + "SELECT two_phase_at from pg_replication_slots WHERE slot_name = 'lsub1_slot';" +); + +# Confirm that two_phase setting of lsub1_slot slot is synced to the standby +ok( $standby1->poll_query_until( + 'postgres', + "SELECT two_phase AND '$two_phase_at' = two_phase_at from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;" + ), + 'two_phase setting of slot lsub1_slot synced to standby'); + +# Confirm that the prepared transaction is not yet replicated to the +# subscriber. +$result = $subscriber1->safe_psql('postgres', + "SELECT count(*) = 0 FROM pg_prepared_xacts;"); +is($result, 't', + "the prepared transaction is not replicated to the subscriber"); + ################################################## # Promote the standby1 to primary. Confirm that: # a) the slot 'lsub1_slot' and 'snap_test_slot' are retained on the new primary # b) logical replication for regress_mysub1 is resumed successfully after failover -# c) changes can be consumed from the synced slot 'snap_test_slot' +# c) changes from the transaction prepared 'test_twophase_slotsync' can be +# consumed from the synced slot 'snap_test_slot' once committed on the new +# primary. +# d) changes can be consumed from the synced slot 'snap_test_slot' ################################################## -$standby1->start; $primary->wait_for_replay_catchup($standby1); # Capture the time before the standby is promoted @@ -876,6 +940,15 @@ is( $standby1->safe_psql( 't', 'synced slot retained on the new primary'); +# Commit the prepared transaction +$standby1->safe_psql('postgres', + "COMMIT PREPARED 'test_twophase_slotsync';"); +$standby1->wait_for_catchup('regress_mysub1'); + +# Confirm that the prepared transaction is replicated to the subscriber +is($subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}), + "11", 'prepared data replicated from the new primary'); + # Insert data on the new primary $standby1->safe_psql('postgres', "INSERT INTO tab_int SELECT generate_series(11, 20);"); @@ -883,7 +956,7 @@ $standby1->wait_for_catchup('regress_mysub1'); # Confirm that data in tab_int replicated on the subscriber is($subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}), - "20", 'data replicated from the new primary'); + "21", 'data replicated from the new primary'); # Consume the data from the snap_test_slot. The synced slot should reach a # consistent point by restoring the snapshot at the restart_lsn serialized diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index d9533deb04e..673c63b8d1b 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1490,12 +1490,13 @@ pg_replication_slots| SELECT l.slot_name, l.wal_status, l.safe_wal_size, l.two_phase, + l.two_phase_at, l.inactive_since, l.conflicting, l.invalidation_reason, l.failover, l.synced - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, inactive_since, conflicting, invalidation_reason, failover, synced) + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, two_phase_at, inactive_since, conflicting, invalidation_reason, failover, synced) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper,