diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 8e2b0a7927b..be81c2b51d2 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2566,7 +2566,8 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx The time when the slot became inactive. NULL if the - slot is currently being streamed. + slot is currently being streamed. If the slot becomes invalid, + this value will never be updated. Note that for slots on the standby that are being synced from a primary server (whose synced field is true), the inactive_since diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index be6f87f00b2..987857b9491 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -1541,9 +1541,7 @@ update_synced_slots_inactive_since(void) if (now == 0) now = GetCurrentTimestamp(); - SpinLockAcquire(&s->mutex); - s->inactive_since = now; - SpinLockRelease(&s->mutex); + ReplicationSlotSetInactiveSince(s, now, true); } } diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index c57a13d8208..fe5acd8b1fc 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -644,9 +644,7 @@ retry: * Reset the time since the slot has become inactive as the slot is active * now. */ - SpinLockAcquire(&s->mutex); - s->inactive_since = 0; - SpinLockRelease(&s->mutex); + ReplicationSlotSetInactiveSince(s, 0, true); if (am_walsender) { @@ -720,16 +718,12 @@ ReplicationSlotRelease(void) */ SpinLockAcquire(&slot->mutex); slot->active_pid = 0; - slot->inactive_since = now; + ReplicationSlotSetInactiveSince(slot, now, false); SpinLockRelease(&slot->mutex); ConditionVariableBroadcast(&slot->active_cv); } else - { - SpinLockAcquire(&slot->mutex); - slot->inactive_since = now; - SpinLockRelease(&slot->mutex); - } + ReplicationSlotSetInactiveSince(slot, now, true); MyReplicationSlot = NULL; @@ -2218,6 +2212,7 @@ RestoreSlotFromDisk(const char *name) bool restored = false; int readBytes; pg_crc32c checksum; + TimestampTz now = 0; /* no need to lock here, no concurrent access allowed yet */ @@ -2408,9 +2403,13 @@ RestoreSlotFromDisk(const char *name) /* * Set the time since the slot has become inactive after loading the * slot from the disk into memory. Whoever acquires the slot i.e. - * makes the slot active will reset it. + * makes the slot active will reset it. Use the same inactive_since + * time for all the slots. */ - slot->inactive_since = GetCurrentTimestamp(); + if (now == 0) + now = GetCurrentTimestamp(); + + ReplicationSlotSetInactiveSince(slot, now, false); restored = true; break; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 47ebdaecb6a..000c36d30dd 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -228,6 +228,23 @@ typedef struct ReplicationSlotCtlData ReplicationSlot replication_slots[1]; } ReplicationSlotCtlData; +/* + * Set slot's inactive_since property unless it was previously invalidated. + */ +static inline void +ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, + bool acquire_lock) +{ + if (acquire_lock) + SpinLockAcquire(&s->mutex); + + if (s->data.invalidated == RS_INVAL_NONE) + s->inactive_since = ts; + + if (acquire_lock) + SpinLockRelease(&s->mutex); +} + /* * Pointers to shared memory */