diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3991e1495d4..b9780c7bc99 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1540,6 +1540,7 @@ init_conflict_slot_xmin(void)
 	Assert(MyReplicationSlot &&
 		   !TransactionIdIsValid(MyReplicationSlot->data.xmin));
 
+	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
 	xmin_horizon = GetOldestSafeDecodingTransactionId(false);
@@ -1552,6 +1553,7 @@ init_conflict_slot_xmin(void)
 	ReplicationSlotsComputeRequiredXmin(true);
 
 	LWLockRelease(ProcArrayLock);
+	LWLockRelease(ReplicationSlotControlLock);
 
 	/* Write this slot to disk */
 	ReplicationSlotMarkDirty();
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index c8858e06616..9ed95cda677 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -394,11 +394,11 @@ CreateInitDecodingContext(const char *plugin,
 	 * without further interlock its return value might immediately be out of
 	 * date.
 	 *
-	 * So we have to acquire the ProcArrayLock to prevent computation of new
-	 * xmin horizons by other backends, get the safe decoding xid, and inform
-	 * the slot machinery about the new limit. Once that's done the
-	 * ProcArrayLock can be released as the slot machinery now is
-	 * protecting against vacuum.
+	 * So we have to acquire both the ReplicationSlotControlLock and the
+	 * ProcArrayLock to prevent concurrent computation and update of new xmin
+	 * horizons by other backends, get the safe decoding xid, and inform the
+	 * slot machinery about the new limit. Once that's done both locks can be
+	 * released as the slot machinery now is protecting against vacuum.
 	 *
 	 * Note that, temporarily, the data, not just the catalog, xmin has to be
 	 * reserved if a data snapshot is to be exported.  Otherwise the initial
@@ -411,6 +411,7 @@ CreateInitDecodingContext(const char *plugin,
 	 *
 	 * ----
 	 */
+	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
 	xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
@@ -425,6 +426,7 @@ CreateInitDecodingContext(const char *plugin,
 	ReplicationSlotsComputeRequiredXmin(true);
 
 	LWLockRelease(ProcArrayLock);
+	LWLockRelease(ReplicationSlotControlLock);
 
 	ReplicationSlotMarkDirty();
 	ReplicationSlotSave();
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 2aea776352d..0feebffd431 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -857,6 +857,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 
 		reserve_wal_for_local_slot(remote_slot->restart_lsn);
 
+		LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 		xmin_horizon = GetOldestSafeDecodingTransactionId(true);
 		SpinLockAcquire(&slot->mutex);
@@ -865,6 +866,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 		SpinLockRelease(&slot->mutex);
 		ReplicationSlotsComputeRequiredXmin(true);
 		LWLockRelease(ProcArrayLock);
+		LWLockRelease(ReplicationSlotControlLock);
 
 		/*
 		 * Make sure that concerned WAL is received and flushed before syncing
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 58c41d45516..4c1cd55f979 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1201,8 +1201,11 @@ ReplicationSlotPersist(void)
 /*
  * Compute the oldest xmin across all slots and store it in the ProcArray.
  *
- * If already_locked is true, ProcArrayLock has already been acquired
- * exclusively.
+ * If already_locked is true, both the ReplicationSlotControlLock and the
+ * ProcArrayLock have already been acquired exclusively. It is crucial that
+ * the caller acquires the ReplicationSlotControlLock first, followed by the
+ * ProcArrayLock, to prevent undetected deadlocks, since this function
+ * acquires them in that order.
  */
 void
 ReplicationSlotsComputeRequiredXmin(bool already_locked)
@@ -1212,8 +1215,33 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 	TransactionId agg_catalog_xmin = InvalidTransactionId;
 
 	Assert(ReplicationSlotCtl != NULL);
+	Assert(!already_locked ||
+		   (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
+			LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
 
-	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	/*
+	 * Hold the ReplicationSlotControlLock until after updating the slot xmin
+	 * values, so that no backend can concurrently update the initial xmin of
+	 * a newly created slot. A shared lock is used here to minimize lock
+	 * contention, especially when many slots exist and advancements occur
+	 * frequently. This is safe since an exclusive lock is taken during the
+	 * initial slot xmin update at slot creation.
+	 *
+	 * One might think that we could hold the ProcArrayLock exclusively while
+	 * updating the slot xmin values, but that would increase lock contention
+	 * on the ProcArrayLock, which is not great since this function can be
+	 * called at non-negligible frequency.
+	 *
+	 * Concurrent invocation of this function may cause the computed slot xmin
+	 * to regress. However, this is harmless because tuples prior to the most
+	 * recent xmin are no longer useful once advancement occurs (see
+	 * LogicalConfirmReceivedLocation, where the slot's xmin value is flushed
+	 * before updating the effective_xmin). Thus, such a regression merely
+	 * causes VACUUM to retain some tuples longer than necessary, without
+	 * risking the early removal of required data.
+	 */
+	if (!already_locked)
+		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -1248,9 +1276,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 			agg_catalog_xmin = effective_catalog_xmin;
 	}
 
-	LWLockRelease(ReplicationSlotControlLock);
-
 	ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
+
+	if (!already_locked)
+		LWLockRelease(ReplicationSlotControlLock);
 }
 
 /*
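For reference, the caller-side convention documented by the new comment in ReplicationSlotsComputeRequiredXmin() can be illustrated with a minimal sketch; this is not part of the patch, the helper name below is hypothetical, and the body simply mirrors what the patched init_conflict_slot_xmin() does: take ReplicationSlotControlLock exclusively before ProcArrayLock, call the xmin computation with already_locked = true, and release in reverse order.

#include "postgres.h"

#include "replication/slot.h"
#include "storage/lwlock.h"
#include "storage/procarray.h"
#include "storage/spin.h"

/* Hypothetical helper, shown only to illustrate the documented lock order. */
static void
initialize_slot_xmin_sketch(void)
{
	TransactionId xmin_horizon;

	/* Block concurrent recomputation of the slot xmin first ... */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
	/* ... then block concurrent changes to the xmin horizon. */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

	xmin_horizon = GetOldestSafeDecodingTransactionId(false);

	SpinLockAcquire(&MyReplicationSlot->mutex);
	MyReplicationSlot->effective_xmin = xmin_horizon;
	MyReplicationSlot->data.xmin = xmin_horizon;
	SpinLockRelease(&MyReplicationSlot->mutex);

	/* Both locks are already held exclusively, so pass already_locked = true. */
	ReplicationSlotsComputeRequiredXmin(true);

	/* Release in the opposite order of acquisition. */
	LWLockRelease(ProcArrayLock);
	LWLockRelease(ReplicationSlotControlLock);
}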