diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 57bbb6288c6..1dc7ec64e46 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1123,6 +1123,158 @@ ReplicationSlotReserveWal(void) } } +/* + * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot + * and mark it invalid, if necessary and possible. + * + * Returns whether ReplicationSlotControlLock was released in the interim (and + * in that case we're not holding the lock at return, otherwise we are). + * + * This is inherently racy, because we release the LWLock + * for syscalls, so caller must restart if we return true. + */ +static bool +InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN) +{ + int last_signaled_pid = 0; + bool released_lock = false; + + for (;;) + { + XLogRecPtr restart_lsn; + NameData slotname; + int active_pid = 0; + + Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED)); + + if (!s->in_use) + { + if (released_lock) + LWLockRelease(ReplicationSlotControlLock); + break; + } + + /* + * Check if the slot needs to be invalidated. If it needs to be + * invalidated, and is not currently acquired, acquire it and mark it + * as having been invalidated. We do this with the spinlock held to + * avoid race conditions -- for example the restart_lsn could move + * forward, or the slot could be dropped. + */ + SpinLockAcquire(&s->mutex); + + restart_lsn = s->data.restart_lsn; + + /* + * If the slot is already invalid or is fresh enough, we don't need to + * do anything. + */ + if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN) + { + SpinLockRelease(&s->mutex); + if (released_lock) + LWLockRelease(ReplicationSlotControlLock); + break; + } + + slotname = s->data.name; + active_pid = s->active_pid; + + /* + * If the slot can be acquired, do so and mark it invalidated + * immediately. Otherwise we'll signal the owning process, below, and + * retry. + */ + if (active_pid == 0) + { + MyReplicationSlot = s; + s->active_pid = MyProcPid; + s->data.invalidated_at = restart_lsn; + s->data.restart_lsn = InvalidXLogRecPtr; + } + + SpinLockRelease(&s->mutex); + + if (active_pid != 0) + { + /* + * Prepare the sleep on the slot's condition variable before + * releasing the lock, to close a possible race condition if the + * slot is released before the sleep below. + */ + ConditionVariablePrepareToSleep(&s->active_cv); + + LWLockRelease(ReplicationSlotControlLock); + released_lock = true; + + /* + * Signal to terminate the process that owns the slot, if we + * haven't already signalled it. (Avoidance of repeated + * signalling is the only reason for there to be a loop in this + * routine; otherwise we could rely on caller's restart loop.) + * + * There is the race condition that other process may own the slot + * after its current owner process is terminated and before this + * process owns it. To handle that, we signal only if the PID of + * the owning process has changed from the previous time. (This + * logic assumes that the same PID is not reused very quickly.) + */ + if (last_signaled_pid != active_pid) + { + ereport(LOG, + (errmsg("terminating process %d to release replication slot \"%s\"", + active_pid, NameStr(slotname)))); + + (void) kill(active_pid, SIGTERM); + last_signaled_pid = active_pid; + } + + /* Wait until the slot is released. */ + ConditionVariableSleep(&s->active_cv, + WAIT_EVENT_REPLICATION_SLOT_DROP); + + /* + * Re-acquire lock and start over; we expect to invalidate the slot + * next time (unless another process acquires the slot in the + * meantime). + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + continue; + } + else + { + /* + * We hold the slot now and have already invalidated it; flush it + * to ensure that state persists. + * + * Don't want to hold ReplicationSlotControlLock across file + * system operations, so release it now but be sure to tell caller + * to restart from scratch. + */ + LWLockRelease(ReplicationSlotControlLock); + released_lock = true; + + /* Make sure the invalidated state persists across server restart */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + ReplicationSlotRelease(); + + ereport(LOG, + (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size", + NameStr(slotname), + (uint32) (restart_lsn >> 32), + (uint32) restart_lsn))); + + /* done with this slot for now */ + break; + } + } + + Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock)); + + return released_lock; +} + /* * Mark any slot that points to an LSN older than the given segment * as invalid; it requires WAL that's about to be removed. @@ -1141,100 +1293,15 @@ restart: for (int i = 0; i < max_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; - XLogRecPtr restart_lsn = InvalidXLogRecPtr; - NameData slotname; - int wspid; - int last_signaled_pid = 0; if (!s->in_use) continue; - SpinLockAcquire(&s->mutex); - slotname = s->data.name; - restart_lsn = s->data.restart_lsn; - SpinLockRelease(&s->mutex); - - if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN) - continue; - LWLockRelease(ReplicationSlotControlLock); - CHECK_FOR_INTERRUPTS(); - - /* Get ready to sleep on the slot in case it is active */ - ConditionVariablePrepareToSleep(&s->active_cv); - - for (;;) + if (InvalidatePossiblyObsoleteSlot(s, oldestLSN)) { - /* - * Try to mark this slot as used by this process. - * - * Note that ReplicationSlotAcquireInternal(SAB_Inquire) - * should not cancel the prepared condition variable - * if this slot is active in other process. Because in this case - * we have to wait on that CV for the process owning - * the slot to be terminated, later. - */ - wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire); - - /* - * Exit the loop if we successfully acquired the slot or - * the slot was dropped during waiting for the owning process - * to be terminated. For example, the latter case is likely to - * happen when the slot is temporary because it's automatically - * dropped by the termination of the owning process. - */ - if (wspid <= 0) - break; - - /* - * Signal to terminate the process that owns the slot. - * - * There is the race condition where other process may own - * the slot after the process using it was terminated and before - * this process owns it. To handle this case, we signal again - * if the PID of the owning process is changed than the last. - * - * XXX This logic assumes that the same PID is not reused - * very quickly. - */ - if (last_signaled_pid != wspid) - { - ereport(LOG, - (errmsg("terminating process %d because replication slot \"%s\" is too far behind", - wspid, NameStr(slotname)))); - (void) kill(wspid, SIGTERM); - last_signaled_pid = wspid; - } - - ConditionVariableTimedSleep(&s->active_cv, 10, - WAIT_EVENT_REPLICATION_SLOT_DROP); - } - ConditionVariableCancelSleep(); - - /* - * Do nothing here and start from scratch if the slot has - * already been dropped. - */ - if (wspid == -1) + /* if the lock was released, start from scratch */ goto restart; - - ereport(LOG, - (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size", - NameStr(slotname), - (uint32) (restart_lsn >> 32), - (uint32) restart_lsn))); - - SpinLockAcquire(&s->mutex); - s->data.invalidated_at = s->data.restart_lsn; - s->data.restart_lsn = InvalidXLogRecPtr; - SpinLockRelease(&s->mutex); - - /* Make sure the invalidated state persists across server restart */ - ReplicationSlotMarkDirty(); - ReplicationSlotSave(); - ReplicationSlotRelease(); - - /* if we did anything, start from scratch */ - goto restart; + } } LWLockRelease(ReplicationSlotControlLock); }