diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 86ae99a3ca9..682eccd116c 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1572,63 +1572,65 @@ void
 ReplicationSlotReserveWal(void)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
+	XLogSegNo	segno;
+	XLogRecPtr	restart_lsn;
 
 	Assert(slot != NULL);
 	Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
 	Assert(!XLogRecPtrIsValid(slot->last_saved_restart_lsn));
 
 	/*
-	 * The replication slot mechanism is used to prevent removal of required
-	 * WAL. As there is no interlock between this routine and checkpoints, WAL
-	 * segments could concurrently be removed when a now stale return value of
-	 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
-	 * this happens we'll just retry.
+	 * The replication slot mechanism is used to prevent the removal of
+	 * required WAL.
+	 *
+	 * Acquire an exclusive lock to prevent the checkpoint process from
+	 * concurrently computing the minimum slot LSN (see
+	 * CheckPointReplicationSlots). This ensures that the WAL reserved for
+	 * replication cannot be removed during a checkpoint.
+	 *
+	 * The mechanism is reliable because if WAL reservation occurs first, the
+	 * checkpoint must wait for the restart_lsn update before determining the
+	 * minimum non-removable LSN. On the other hand, if the checkpoint happens
+	 * first, subsequent WAL reservations will select positions at or beyond
+	 * the redo pointer of that checkpoint.
 	 */
-	while (true)
-	{
-		XLogSegNo	segno;
-		XLogRecPtr	restart_lsn;
+	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
 
-		/*
-		 * For logical slots log a standby snapshot and start logical decoding
-		 * at exactly that position. That allows the slot to start up more
-		 * quickly. But on a standby we cannot do WAL writes, so just use the
-		 * replay pointer; effectively, an attempt to create a logical slot on
-		 * standby will cause it to wait for an xl_running_xact record to be
-		 * logged independently on the primary, so that a snapshot can be
-		 * built using the record.
-		 *
-		 * None of this is needed (or indeed helpful) for physical slots as
-		 * they'll start replay at the last logged checkpoint anyway. Instead
-		 * return the location of the last redo LSN. While that slightly
-		 * increases the chance that we have to retry, it's where a base
-		 * backup has to start replay at.
-		 */
-		if (SlotIsPhysical(slot))
-			restart_lsn = GetRedoRecPtr();
-		else if (RecoveryInProgress())
-			restart_lsn = GetXLogReplayRecPtr(NULL);
-		else
-			restart_lsn = GetXLogInsertRecPtr();
+	/*
+	 * For logical slots log a standby snapshot and start logical decoding at
+	 * exactly that position. That allows the slot to start up more quickly.
+	 * But on a standby we cannot do WAL writes, so just use the replay
+	 * pointer; effectively, an attempt to create a logical slot on standby
+	 * will cause it to wait for an xl_running_xact record to be logged
+	 * independently on the primary, so that a snapshot can be built using the
+	 * record.
+	 *
+	 * None of this is needed (or indeed helpful) for physical slots as
+	 * they'll start replay at the last logged checkpoint anyway. Instead,
+	 * return the location of the last redo LSN, where a base backup has to
+	 * start replay at.
+	 */
+	if (SlotIsPhysical(slot))
+		restart_lsn = GetRedoRecPtr();
+	else if (RecoveryInProgress())
+		restart_lsn = GetXLogReplayRecPtr(NULL);
+	else
+		restart_lsn = GetXLogInsertRecPtr();
 
-		SpinLockAcquire(&slot->mutex);
-		slot->data.restart_lsn = restart_lsn;
-		SpinLockRelease(&slot->mutex);
+	SpinLockAcquire(&slot->mutex);
+	slot->data.restart_lsn = restart_lsn;
+	SpinLockRelease(&slot->mutex);
 
-		/* prevent WAL removal as fast as possible */
-		ReplicationSlotsComputeRequiredLSN();
+	/* prevent WAL removal as fast as possible */
+	ReplicationSlotsComputeRequiredLSN();
 
-		/*
-		 * If all required WAL is still there, great, otherwise retry. The
-		 * slot should prevent further removal of WAL, unless there's a
-		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
-		 * the new restart_lsn above, so normally we should never need to loop
-		 * more than twice.
-		 */
-		XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
-		if (XLogGetLastRemovedSegno() < segno)
-			break;
-	}
+	/* Checkpoint shouldn't remove the required WAL. */
+	XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
+	if (XLogGetLastRemovedSegno() >= segno)
+		elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
+			 NameStr(slot->data.name));
+
+	LWLockRelease(ReplicationSlotAllocationLock);
 
 	if (!RecoveryInProgress() && SlotIsLogical(slot))
 	{
@@ -2136,6 +2138,12 @@ CheckPointReplicationSlots(bool is_shutdown)
 	 * acquiring a slot we cannot take the control lock - but that's OK,
 	 * because holding ReplicationSlotAllocationLock is strictly stronger, and
 	 * enough to guarantee that nobody can change the in_use bits on us.
+	 *
+	 * Additionally, acquiring the Allocation lock is necessary to serialize
+	 * the slot flush process with concurrent slot WAL reservation. This
+	 * ensures that the WAL position being reserved is either flushed to disk
+	 * or is beyond or equal to the redo pointer of the current checkpoint
+	 * (See ReplicationSlotReserveWal for details).
 	 */
 	LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
 
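A minimal standalone sketch (not PostgreSQL code) of the serialization argument the patch relies on, using a POSIX rwlock as a stand-in for ReplicationSlotAllocationLock: the reservation path takes it exclusively, as ReplicationSlotReserveWal now does, while the checkpoint path takes it shared before computing the minimum LSN that must be retained, as CheckPointReplicationSlots does. All names, the LSN bookkeeping, and the removal logic below are invented for illustration only.

	/* toy_slot_reserve.c: compile with cc -pthread toy_slot_reserve.c */
	#include <assert.h>
	#include <pthread.h>
	#include <stdint.h>
	#include <stdio.h>

	/* Stand-in for ReplicationSlotAllocationLock (hypothetical model). */
	static pthread_rwlock_t allocation_lock = PTHREAD_RWLOCK_INITIALIZER;

	static uint64_t insert_lsn = 1000;		/* pretend current WAL position */
	static uint64_t slot_restart_lsn = 0;	/* 0 = invalid, nothing reserved yet */
	static uint64_t last_removed_lsn = 0;	/* highest WAL position already removed */

	/* Plays ReplicationSlotReserveWal: reserve WAL under the exclusive lock. */
	static void *
	reserve_wal(void *arg)
	{
		(void) arg;
		pthread_rwlock_wrlock(&allocation_lock);
		slot_restart_lsn = insert_lsn;	/* pick the position to retain */

		/*
		 * While we hold the lock exclusively, no checkpoint can compute a new
		 * minimum, so removal cannot have advanced past the position we just
		 * reserved.  This mirrors the elog(ERROR) cross-check in the patch.
		 */
		assert(last_removed_lsn < slot_restart_lsn);
		pthread_rwlock_unlock(&allocation_lock);
		return NULL;
	}

	/* Plays the checkpoint: compute the minimum under the shared lock, then remove. */
	static void *
	checkpoint(void *arg)
	{
		uint64_t	min_required;

		(void) arg;
		pthread_rwlock_rdlock(&allocation_lock);
		min_required = slot_restart_lsn ? slot_restart_lsn : insert_lsn;
		pthread_rwlock_unlock(&allocation_lock);

		/* Only WAL strictly below the minimum computed under the lock is removed. */
		if (min_required > last_removed_lsn)
			last_removed_lsn = min_required - 1;
		return NULL;
	}

	int
	main(void)
	{
		pthread_t	t1, t2;

		pthread_create(&t1, NULL, reserve_wal, NULL);
		pthread_create(&t2, NULL, checkpoint, NULL);
		pthread_join(t1, NULL);
		pthread_join(t2, NULL);

		printf("reserved restart_lsn=%llu, last removed=%llu\n",
			   (unsigned long long) slot_restart_lsn,
			   (unsigned long long) last_removed_lsn);
		return 0;
	}

In this model the assertion holds under either ordering: if the reservation wins the lock first, the checkpoint's minimum includes the reserved position; if the checkpoint wins, the reservation picks a position at or beyond what that checkpoint could have allowed to be removed, which is the same two-case argument made in the new comment in ReplicationSlotReserveWal.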