diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 101157ed8c9..90f9e8068a6 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1705,17 +1705,16 @@ static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, - TransactionId initial_effective_xmin, - TransactionId initial_catalog_effective_xmin, - XLogRecPtr initial_restart_lsn, TimestampTz *inactive_since, TimestampTz now) { Assert(possible_causes != RS_INVAL_NONE); if (possible_causes & RS_INVAL_WAL_REMOVED) { - if (initial_restart_lsn != InvalidXLogRecPtr && - initial_restart_lsn < oldestLSN) + XLogRecPtr restart_lsn = s->data.restart_lsn; + + if (restart_lsn != InvalidXLogRecPtr && + restart_lsn < oldestLSN) return RS_INVAL_WAL_REMOVED; } @@ -1725,12 +1724,15 @@ DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, if (SlotIsLogical(s) && (dboid == InvalidOid || dboid == s->data.database)) { - if (TransactionIdIsValid(initial_effective_xmin) && - TransactionIdPrecedesOrEquals(initial_effective_xmin, + TransactionId effective_xmin = s->effective_xmin; + TransactionId catalog_effective_xmin = s->effective_catalog_xmin; + + if (TransactionIdIsValid(effective_xmin) && + TransactionIdPrecedesOrEquals(effective_xmin, snapshotConflictHorizon)) return RS_INVAL_HORIZON; - else if (TransactionIdIsValid(initial_catalog_effective_xmin) && - TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin, + else if (TransactionIdIsValid(catalog_effective_xmin) && + TransactionIdPrecedesOrEquals(catalog_effective_xmin, snapshotConflictHorizon)) return RS_INVAL_HORIZON; } @@ -1799,11 +1801,6 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, { int last_signaled_pid = 0; bool released_lock = false; - bool terminated = false; - TransactionId initial_effective_xmin = InvalidTransactionId; - TransactionId initial_catalog_effective_xmin = InvalidTransactionId; - XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr; - ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE; TimestampTz inactive_since = 0; for (;;) @@ -1846,42 +1843,12 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, /* we do nothing if the slot is already invalid */ if (s->data.invalidated == RS_INVAL_NONE) - { - /* - * The slot's mutex will be released soon, and it is possible that - * those values change since the process holding the slot has been - * terminated (if any), so record them here to ensure that we - * would report the correct invalidation cause. - * - * Unlike other slot attributes, slot's inactive_since can't be - * changed until the acquired slot is released or the owning - * process is terminated. So, the inactive slot can only be - * invalidated immediately without being terminated. - */ - if (!terminated) - { - initial_restart_lsn = s->data.restart_lsn; - initial_effective_xmin = s->effective_xmin; - initial_catalog_effective_xmin = s->effective_catalog_xmin; - } - invalidation_cause = DetermineSlotInvalidationCause(possible_causes, s, oldestLSN, dboid, snapshotConflictHorizon, - initial_effective_xmin, - initial_catalog_effective_xmin, - initial_restart_lsn, &inactive_since, now); - } - - /* - * The invalidation cause recorded previously should not change while - * the process owning the slot (if any) has been terminated. - */ - Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated && - invalidation_cause_prev != invalidation_cause)); /* if there's no invalidation, we're done */ if (invalidation_cause == RS_INVAL_NONE) @@ -1899,6 +1866,11 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, * If the slot can be acquired, do so and mark it invalidated * immediately. Otherwise we'll signal the owning process, below, and * retry. + * + * Note: Unlike other slot attributes, slot's inactive_since can't be + * changed until the acquired slot is released or the owning process + * is terminated. So, the inactive slot can only be invalidated + * immediately without being terminated. */ if (active_pid == 0) { @@ -1973,8 +1945,6 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, (void) kill(active_pid, SIGTERM); last_signaled_pid = active_pid; - terminated = true; - invalidation_cause_prev = invalidation_cause; } /* Wait until the slot is released. */ @@ -1985,6 +1955,14 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, * Re-acquire lock and start over; we expect to invalidate the * slot next time (unless another process acquires the slot in the * meantime). + * + * Note: It is possible for a slot to advance its restart_lsn or + * xmin values sufficiently between when we release the mutex and + * when we recheck, moving from a conflicting state to a non + * conflicting state. This is intentional and safe: if the slot + * has caught up while we're busy here, the resources we were + * concerned about (WAL segments or tuples) have not yet been + * removed, and there's no reason to invalidate the slot. */ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); continue;