1
0
mirror of https://github.com/postgres/postgres.git synced 2025-04-20 00:42:27 +03:00

Fix slot synchronization for two_phase enabled slots.

The issue is that the transactions prepared before two-phase decoding is
enabled can fail to replicate to the subscriber after being committed on a
promoted standby following a failover. This is because the two_phase_at
field of a slot, which tracks the LSN from which two-phase decoding
starts, is not synchronized to standby servers. Without two_phase_at, the
logical decoding might incorrectly identify prepared transaction as
already replicated to the subscriber after promotion of standby server,
causing them to be skipped.

To address the issue on HEAD, the two_phase_at field of the slot is
exposed by the pg_replication_slots view and allows the slot
synchronization to copy this value to the corresponding synced slot on the
standby server.

This bug is likely to occur if the user toggles the two_phase option to
true after initial slot creation. Given that altering the two_phase option
of a replication slot is not allowed in PostgreSQL 17, this bug is less
likely to occur. We can't change the view/function definition in
backbranch so we can't push the same fix but we are brainstorming an
appropriate solution for PG17.

Author: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Discussion: https://postgr.es/m/TYAPR01MB5724CC7C288535BBCEEE65DA94A72@TYAPR01MB5724.jpnprd01.prod.outlook.com
This commit is contained in:
Amit Kapila 2025-04-03 12:07:46 +05:30
parent a7187c3723
commit 4868c96bc8
8 changed files with 112 additions and 14 deletions

View File

@ -2854,6 +2854,17 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
</para></entry> </para></entry>
</row> </row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>two_phase_at</structfield> <type>pg_lsn</type>
</para>
<para>
The address (<literal>LSN</literal>) from which the decoding of prepared
transactions is enabled. <literal>NULL</literal> for logical slots
where <structfield>two_phase</structfield> is false and for physical slots.
</para></entry>
</row>
<row> <row>
<entry role="catalog_table_entry"><para role="column_definition"> <entry role="catalog_table_entry"><para role="column_definition">
<structfield>inactive_since</structfield> <type>timestamptz</type> <structfield>inactive_since</structfield> <type>timestamptz</type>

View File

@ -1025,6 +1025,7 @@ CREATE VIEW pg_replication_slots AS
L.wal_status, L.wal_status,
L.safe_wal_size, L.safe_wal_size,
L.two_phase, L.two_phase,
L.two_phase_at,
L.inactive_since, L.inactive_since,
L.conflicting, L.conflicting,
L.invalidation_reason, L.invalidation_reason,

View File

@ -139,6 +139,7 @@ typedef struct RemoteSlot
bool failover; bool failover;
XLogRecPtr restart_lsn; XLogRecPtr restart_lsn;
XLogRecPtr confirmed_lsn; XLogRecPtr confirmed_lsn;
XLogRecPtr two_phase_at;
TransactionId catalog_xmin; TransactionId catalog_xmin;
/* RS_INVAL_NONE if valid, or the reason of invalidation */ /* RS_INVAL_NONE if valid, or the reason of invalidation */
@ -276,7 +277,8 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
if (remote_dbid != slot->data.database || if (remote_dbid != slot->data.database ||
remote_slot->two_phase != slot->data.two_phase || remote_slot->two_phase != slot->data.two_phase ||
remote_slot->failover != slot->data.failover || remote_slot->failover != slot->data.failover ||
strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0) strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0 ||
remote_slot->two_phase_at != slot->data.two_phase_at)
{ {
NameData plugin_name; NameData plugin_name;
@ -287,6 +289,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
slot->data.plugin = plugin_name; slot->data.plugin = plugin_name;
slot->data.database = remote_dbid; slot->data.database = remote_dbid;
slot->data.two_phase = remote_slot->two_phase; slot->data.two_phase = remote_slot->two_phase;
slot->data.two_phase_at = remote_slot->two_phase_at;
slot->data.failover = remote_slot->failover; slot->data.failover = remote_slot->failover;
SpinLockRelease(&slot->mutex); SpinLockRelease(&slot->mutex);
@ -788,9 +791,9 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
static bool static bool
synchronize_slots(WalReceiverConn *wrconn) synchronize_slots(WalReceiverConn *wrconn)
{ {
#define SLOTSYNC_COLUMN_COUNT 9 #define SLOTSYNC_COLUMN_COUNT 10
Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID}; LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID};
WalRcvExecResult *res; WalRcvExecResult *res;
TupleTableSlot *tupslot; TupleTableSlot *tupslot;
@ -798,7 +801,7 @@ synchronize_slots(WalReceiverConn *wrconn)
bool some_slot_updated = false; bool some_slot_updated = false;
bool started_tx = false; bool started_tx = false;
const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn," const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
" restart_lsn, catalog_xmin, two_phase, failover," " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
" database, invalidation_reason" " database, invalidation_reason"
" FROM pg_catalog.pg_replication_slots" " FROM pg_catalog.pg_replication_slots"
" WHERE failover and NOT temporary"; " WHERE failover and NOT temporary";
@ -853,6 +856,9 @@ synchronize_slots(WalReceiverConn *wrconn)
&isnull)); &isnull));
Assert(!isnull); Assert(!isnull);
d = slot_getattr(tupslot, ++col, &isnull);
remote_slot->two_phase_at = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col, remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
&isnull)); &isnull));
Assert(!isnull); Assert(!isnull);

View File

@ -235,7 +235,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum Datum
pg_get_replication_slots(PG_FUNCTION_ARGS) pg_get_replication_slots(PG_FUNCTION_ARGS)
{ {
#define PG_GET_REPLICATION_SLOTS_COLS 19 #define PG_GET_REPLICATION_SLOTS_COLS 20
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
XLogRecPtr currlsn; XLogRecPtr currlsn;
int slotno; int slotno;
@ -406,6 +406,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
values[i++] = BoolGetDatum(slot_contents.data.two_phase); values[i++] = BoolGetDatum(slot_contents.data.two_phase);
if (slot_contents.data.two_phase &&
!XLogRecPtrIsInvalid(slot_contents.data.two_phase_at))
values[i++] = LSNGetDatum(slot_contents.data.two_phase_at);
else
nulls[i++] = true;
if (slot_contents.inactive_since > 0) if (slot_contents.inactive_since > 0)
values[i++] = TimestampTzGetDatum(slot_contents.inactive_since); values[i++] = TimestampTzGetDatum(slot_contents.inactive_since);
else else

View File

@ -57,6 +57,6 @@
*/ */
/* yyyymmddN */ /* yyyymmddN */
#define CATALOG_VERSION_NO 202504021 #define CATALOG_VERSION_NO 202504031
#endif #endif

View File

@ -11424,9 +11424,9 @@
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', prorettype => 'record', proretset => 't', provolatile => 's', prorettype => 'record',
proargtypes => '', proargtypes => '',
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,timestamptz,bool,text,bool,bool}', proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,pg_lsn,timestamptz,bool,text,bool,bool}',
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,inactive_since,conflicting,invalidation_reason,failover,synced}', proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,two_phase_at,inactive_since,conflicting,invalidation_reason,failover,synced}',
prosrc => 'pg_get_replication_slots' }, prosrc => 'pg_get_replication_slots' },
{ oid => '3786', descr => 'set up a logical replication slot', { oid => '3786', descr => 'set up a logical replication slot',
proname => 'pg_create_logical_replication_slot', provolatile => 'v', proname => 'pg_create_logical_replication_slot', provolatile => 'v',

View File

@ -22,7 +22,11 @@ $publisher->init(
# Disable autovacuum to avoid generating xid during stats update as otherwise # Disable autovacuum to avoid generating xid during stats update as otherwise
# the new XID could then be replicated to standby at some random point making # the new XID could then be replicated to standby at some random point making
# slots at primary lag behind standby during slot sync. # slots at primary lag behind standby during slot sync.
$publisher->append_conf('postgresql.conf', 'autovacuum = off'); $publisher->append_conf(
'postgresql.conf', qq{
autovacuum = off
max_prepared_transactions = 1
});
$publisher->start; $publisher->start;
$publisher->safe_psql('postgres', $publisher->safe_psql('postgres',
@ -33,6 +37,7 @@ my $publisher_connstr = $publisher->connstr . ' dbname=postgres';
# Create a subscriber node, wait for sync to complete # Create a subscriber node, wait for sync to complete
my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1'); my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
$subscriber1->init; $subscriber1->init;
$subscriber1->append_conf('postgresql.conf', 'max_prepared_transactions = 1');
$subscriber1->start; $subscriber1->start;
# Capture the time before the logical failover slot is created on the # Capture the time before the logical failover slot is created on the
@ -830,13 +835,72 @@ $primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
"'sb1_slot'"); "'sb1_slot'");
$primary->reload; $primary->reload;
##################################################
# Test the synchronization of the two_phase setting for a subscription with the
# standby. Additionally, prepare a transaction before enabling the two_phase
# option; subsequent tests will verify if it can be correctly replicated to the
# subscriber after committing it on the promoted standby.
##################################################
$standby1->start;
# Prepare a transaction
$primary->safe_psql(
'postgres', qq[
BEGIN;
INSERT INTO tab_int values(0);
PREPARE TRANSACTION 'test_twophase_slotsync';
]);
$primary->wait_for_replay_catchup($standby1);
$primary->wait_for_catchup('regress_mysub1');
# Disable the subscription to allow changing the two_phase option.
$subscriber1->safe_psql('postgres',
"ALTER SUBSCRIPTION regress_mysub1 DISABLE");
# Wait for the replication slot to become inactive on the publisher
$primary->poll_query_until(
'postgres',
"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'",
1);
# Set two_phase to true and enable the subscription
$subscriber1->safe_psql(
'postgres', qq[
ALTER SUBSCRIPTION regress_mysub1 SET (two_phase = true);
ALTER SUBSCRIPTION regress_mysub1 ENABLE;
]);
$primary->wait_for_catchup('regress_mysub1');
my $two_phase_at = $primary->safe_psql('postgres',
"SELECT two_phase_at from pg_replication_slots WHERE slot_name = 'lsub1_slot';"
);
# Confirm that two_phase setting of lsub1_slot slot is synced to the standby
ok( $standby1->poll_query_until(
'postgres',
"SELECT two_phase AND '$two_phase_at' = two_phase_at from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"
),
'two_phase setting of slot lsub1_slot synced to standby');
# Confirm that the prepared transaction is not yet replicated to the
# subscriber.
$result = $subscriber1->safe_psql('postgres',
"SELECT count(*) = 0 FROM pg_prepared_xacts;");
is($result, 't',
"the prepared transaction is not replicated to the subscriber");
################################################## ##################################################
# Promote the standby1 to primary. Confirm that: # Promote the standby1 to primary. Confirm that:
# a) the slot 'lsub1_slot' and 'snap_test_slot' are retained on the new primary # a) the slot 'lsub1_slot' and 'snap_test_slot' are retained on the new primary
# b) logical replication for regress_mysub1 is resumed successfully after failover # b) logical replication for regress_mysub1 is resumed successfully after failover
# c) changes can be consumed from the synced slot 'snap_test_slot' # c) changes from the transaction prepared 'test_twophase_slotsync' can be
# consumed from the synced slot 'snap_test_slot' once committed on the new
# primary.
# d) changes can be consumed from the synced slot 'snap_test_slot'
################################################## ##################################################
$standby1->start;
$primary->wait_for_replay_catchup($standby1); $primary->wait_for_replay_catchup($standby1);
# Capture the time before the standby is promoted # Capture the time before the standby is promoted
@ -876,6 +940,15 @@ is( $standby1->safe_psql(
't', 't',
'synced slot retained on the new primary'); 'synced slot retained on the new primary');
# Commit the prepared transaction
$standby1->safe_psql('postgres',
"COMMIT PREPARED 'test_twophase_slotsync';");
$standby1->wait_for_catchup('regress_mysub1');
# Confirm that the prepared transaction is replicated to the subscriber
is($subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
"11", 'prepared data replicated from the new primary');
# Insert data on the new primary # Insert data on the new primary
$standby1->safe_psql('postgres', $standby1->safe_psql('postgres',
"INSERT INTO tab_int SELECT generate_series(11, 20);"); "INSERT INTO tab_int SELECT generate_series(11, 20);");
@ -883,7 +956,7 @@ $standby1->wait_for_catchup('regress_mysub1');
# Confirm that data in tab_int replicated on the subscriber # Confirm that data in tab_int replicated on the subscriber
is($subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}), is($subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
"20", 'data replicated from the new primary'); "21", 'data replicated from the new primary');
# Consume the data from the snap_test_slot. The synced slot should reach a # Consume the data from the snap_test_slot. The synced slot should reach a
# consistent point by restoring the snapshot at the restart_lsn serialized # consistent point by restoring the snapshot at the restart_lsn serialized

View File

@ -1490,12 +1490,13 @@ pg_replication_slots| SELECT l.slot_name,
l.wal_status, l.wal_status,
l.safe_wal_size, l.safe_wal_size,
l.two_phase, l.two_phase,
l.two_phase_at,
l.inactive_since, l.inactive_since,
l.conflicting, l.conflicting,
l.invalidation_reason, l.invalidation_reason,
l.failover, l.failover,
l.synced l.synced
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, inactive_since, conflicting, invalidation_reason, failover, synced) FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, two_phase_at, inactive_since, conflicting, invalidation_reason, failover, synced)
LEFT JOIN pg_database d ON ((l.datoid = d.oid))); LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname, pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper, pg_authid.rolsuper,