diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 336630ce417..9eedcf6f0f4 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4429,6 +4429,46 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + idle_replication_slot_timeout (integer) + + idle_replication_slot_timeout configuration parameter + + + + + Invalidate replication slots that have remained idle longer than this + duration. If this value is specified without units, it is taken as + minutes. A value of zero (the default) disables the idle timeout + invalidation mechanism. This parameter can only be set in the + postgresql.conf file or on the server command + line. + + + + Slot invalidation due to idle timeout occurs during checkpoint. + Because checkpoints happen at checkpoint_timeout + intervals, there can be some lag between when the + idle_replication_slot_timeout was exceeded and when + the slot invalidation is triggered at the next checkpoint. + To avoid such lags, users can force a checkpoint to promptly invalidate + inactive slots. The duration of slot inactivity is calculated using the + slot's pg_replication_slots.inactive_since + value. + + + + Note that the idle timeout invalidation mechanism is not applicable + for slots that do not reserve WAL or for slots on the standby server + that are being synced from the primary server (i.e., standby slots + having pg_replication_slots.synced + value true). Synced slots are always considered to + be inactive because they don't perform logical decoding to produce + changes. + + + + wal_sender_timeout (integer) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 613abcd28b7..3d18e507bbc 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -2390,6 +2390,11 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER plus some reserve for table synchronization. + + Logical replication slots are also affected by + idle_replication_slot_timeout. + + max_wal_senders should be set to at least the same as diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index ad2903d5ac7..3f5a306247e 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2619,6 +2619,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx perform logical decoding. It is set only for logical slots. + + + idle_timeout means that the slot has remained + idle longer than the configured + duration. + + diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 25a5c605404..f9bf5ba7509 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7337,7 +7337,7 @@ CreateCheckPoint(int flags) */ XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); KeepLogSeg(recptr, &_logSegNo); - if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED, + if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT, _logSegNo, InvalidOid, InvalidTransactionId)) { @@ -7792,7 +7792,7 @@ CreateRestartPoint(int flags) replayPtr = GetXLogReplayRecPtr(&replayTLI); endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; KeepLogSeg(endptr, &_logSegNo); - if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED, + if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT, _logSegNo, InvalidOid, InvalidTransactionId)) { diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index fe5acd8b1fc..d73c9c2fc32 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -102,16 +102,24 @@ typedef struct /* * Lookup table for slot invalidation causes. */ -const char *const SlotInvalidationCauses[] = { - [RS_INVAL_NONE] = "none", - [RS_INVAL_WAL_REMOVED] = "wal_removed", - [RS_INVAL_HORIZON] = "rows_removed", - [RS_INVAL_WAL_LEVEL] = "wal_level_insufficient", +typedef struct SlotInvalidationCauseMap +{ + ReplicationSlotInvalidationCause cause; + const char *cause_name; +} SlotInvalidationCauseMap; + +static const SlotInvalidationCauseMap SlotInvalidationCauses[] = { + {RS_INVAL_NONE, "none"}, + {RS_INVAL_WAL_REMOVED, "wal_removed"}, + {RS_INVAL_HORIZON, "rows_removed"}, + {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"}, + {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"}, }; -/* Maximum number of invalidation causes */ -#define RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL - +/* + * Ensure that the lookup table is up-to-date with the enums defined in + * ReplicationSlotInvalidationCause. + */ StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1), "array length mismatch"); @@ -141,6 +149,12 @@ ReplicationSlot *MyReplicationSlot = NULL; int max_replication_slots = 10; /* the maximum number of replication * slots */ +/* + * Invalidate replication slots that have remained idle longer than this + * duration; '0' disables it. + */ +int idle_replication_slot_timeout_mins = 0; + /* * This GUC lists streaming replication standby server slot names that * logical WAL sender processes will wait for. @@ -575,7 +589,7 @@ retry: errmsg("can no longer access replication slot \"%s\"", NameStr(s->data.name)), errdetail("This replication slot has been invalidated due to \"%s\".", - SlotInvalidationCauses[s->data.invalidated])); + GetSlotInvalidationCauseName(s->data.invalidated))); } /* @@ -592,14 +606,23 @@ retry: if (!nowait) ConditionVariablePrepareToSleep(&s->active_cv); + /* + * It is important to reset the inactive_since under spinlock here to + * avoid race conditions with slot invalidation. See comments related + * to inactive_since in InvalidatePossiblyObsoleteSlot. + */ SpinLockAcquire(&s->mutex); if (s->active_pid == 0) s->active_pid = MyProcPid; active_pid = s->active_pid; + ReplicationSlotSetInactiveSince(s, 0, false); SpinLockRelease(&s->mutex); } else + { active_pid = MyProcPid; + ReplicationSlotSetInactiveSince(s, 0, true); + } LWLockRelease(ReplicationSlotControlLock); /* @@ -640,11 +663,6 @@ retry: if (SlotIsLogical(s)) pgstat_acquire_replslot(s); - /* - * Reset the time since the slot has become inactive as the slot is active - * now. - */ - ReplicationSlotSetInactiveSince(s, 0, true); if (am_walsender) { @@ -1512,12 +1530,14 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, - TransactionId snapshotConflictHorizon) + TransactionId snapshotConflictHorizon, + long slot_idle_seconds) { StringInfoData err_detail; - bool hint = false; + StringInfoData err_hint; initStringInfo(&err_detail); + initStringInfo(&err_hint); switch (cause) { @@ -1525,13 +1545,15 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, { unsigned long long ex = oldestLSN - restart_lsn; - hint = true; appendStringInfo(&err_detail, ngettext("The slot's restart_lsn %X/%X exceeds the limit by %llu byte.", "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.", ex), LSN_FORMAT_ARGS(restart_lsn), ex); + /* translator: %s is a GUC variable name */ + appendStringInfo(&err_hint, _("You might need to increase \"%s\"."), + "max_slot_wal_keep_size"); break; } case RS_INVAL_HORIZON: @@ -1542,6 +1564,21 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, case RS_INVAL_WAL_LEVEL: appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server.")); break; + + case RS_INVAL_IDLE_TIMEOUT: + { + int minutes = slot_idle_seconds / SECS_PER_MINUTE; + int secs = slot_idle_seconds % SECS_PER_MINUTE; + + /* translator: %s is a GUC variable name */ + appendStringInfo(&err_detail, _("The slot's idle time of %dmin %02ds exceeds the configured \"%s\" duration of %dmin."), + minutes, secs, "idle_replication_slot_timeout", + idle_replication_slot_timeout_mins); + /* translator: %s is a GUC variable name */ + appendStringInfo(&err_hint, _("You might need to increase \"%s\"."), + "idle_replication_slot_timeout"); + break; + } case RS_INVAL_NONE: pg_unreachable(); } @@ -1553,9 +1590,99 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, errmsg("invalidating obsolete replication slot \"%s\"", NameStr(slotname)), errdetail_internal("%s", err_detail.data), - hint ? errhint("You might need to increase \"%s\".", "max_slot_wal_keep_size") : 0); + err_hint.len ? errhint("%s", err_hint.data) : 0); pfree(err_detail.data); + pfree(err_hint.data); +} + +/* + * Can we invalidate an idle replication slot? + * + * Idle timeout invalidation is allowed only when: + * + * 1. Idle timeout is set + * 2. Slot has reserved WAL + * 3. Slot is inactive + * 4. The slot is not being synced from the primary while the server is in + * recovery. This is because synced slots are always considered to be + * inactive because they don't perform logical decoding to produce changes. + */ +static inline bool +CanInvalidateIdleSlot(ReplicationSlot *s) +{ + return (idle_replication_slot_timeout_mins != 0 && + !XLogRecPtrIsInvalid(s->data.restart_lsn) && + s->inactive_since > 0 && + !(RecoveryInProgress() && s->data.synced)); +} + +/* + * DetermineSlotInvalidationCause - Determine the cause for which a slot + * becomes invalid among the given possible causes. + * + * This function sequentially checks all possible invalidation causes and + * returns the first one for which the slot is eligible for invalidation. + */ +static ReplicationSlotInvalidationCause +DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, + XLogRecPtr oldestLSN, Oid dboid, + TransactionId snapshotConflictHorizon, + TransactionId initial_effective_xmin, + TransactionId initial_catalog_effective_xmin, + XLogRecPtr initial_restart_lsn, + TimestampTz *inactive_since, TimestampTz now) +{ + Assert(possible_causes != RS_INVAL_NONE); + + if (possible_causes & RS_INVAL_WAL_REMOVED) + { + if (initial_restart_lsn != InvalidXLogRecPtr && + initial_restart_lsn < oldestLSN) + return RS_INVAL_WAL_REMOVED; + } + + if (possible_causes & RS_INVAL_HORIZON) + { + /* invalid DB oid signals a shared relation */ + if (SlotIsLogical(s) && + (dboid == InvalidOid || dboid == s->data.database)) + { + if (TransactionIdIsValid(initial_effective_xmin) && + TransactionIdPrecedesOrEquals(initial_effective_xmin, + snapshotConflictHorizon)) + return RS_INVAL_HORIZON; + else if (TransactionIdIsValid(initial_catalog_effective_xmin) && + TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin, + snapshotConflictHorizon)) + return RS_INVAL_HORIZON; + } + } + + if (possible_causes & RS_INVAL_WAL_LEVEL) + { + if (SlotIsLogical(s)) + return RS_INVAL_WAL_LEVEL; + } + + if (possible_causes & RS_INVAL_IDLE_TIMEOUT) + { + Assert(now > 0); + + /* + * Check if the slot needs to be invalidated due to + * idle_replication_slot_timeout GUC. + */ + if (CanInvalidateIdleSlot(s) && + TimestampDifferenceExceedsSeconds(s->inactive_since, now, + idle_replication_slot_timeout_mins * SECS_PER_MINUTE)) + { + *inactive_since = s->inactive_since; + return RS_INVAL_IDLE_TIMEOUT; + } + } + + return RS_INVAL_NONE; } /* @@ -1572,7 +1699,7 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, * for syscalls, so caller must restart if we return true. */ static bool -InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, +InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, @@ -1585,6 +1712,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, TransactionId initial_catalog_effective_xmin = InvalidTransactionId; XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr; ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE; + TimestampTz inactive_since = 0; for (;;) { @@ -1592,6 +1720,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, NameData slotname; int active_pid = 0; ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE; + TimestampTz now = 0; + long slot_idle_secs = 0; Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED)); @@ -1602,6 +1732,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, break; } + if (possible_causes & RS_INVAL_IDLE_TIMEOUT) + { + /* + * Assign the current time here to avoid system call overhead + * while holding the spinlock in subsequent code. + */ + now = GetCurrentTimestamp(); + } + /* * Check if the slot needs to be invalidated. If it needs to be * invalidated, and is not currently acquired, acquire it and mark it @@ -1621,6 +1760,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, * those values change since the process holding the slot has been * terminated (if any), so record them here to ensure that we * would report the correct invalidation cause. + * + * Unlike other slot attributes, slot's inactive_since can't be + * changed until the acquired slot is released or the owning + * process is terminated. So, the inactive slot can only be + * invalidated immediately without being terminated. */ if (!terminated) { @@ -1629,35 +1773,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, initial_catalog_effective_xmin = s->effective_catalog_xmin; } - switch (cause) - { - case RS_INVAL_WAL_REMOVED: - if (initial_restart_lsn != InvalidXLogRecPtr && - initial_restart_lsn < oldestLSN) - invalidation_cause = cause; - break; - case RS_INVAL_HORIZON: - if (!SlotIsLogical(s)) - break; - /* invalid DB oid signals a shared relation */ - if (dboid != InvalidOid && dboid != s->data.database) - break; - if (TransactionIdIsValid(initial_effective_xmin) && - TransactionIdPrecedesOrEquals(initial_effective_xmin, - snapshotConflictHorizon)) - invalidation_cause = cause; - else if (TransactionIdIsValid(initial_catalog_effective_xmin) && - TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin, - snapshotConflictHorizon)) - invalidation_cause = cause; - break; - case RS_INVAL_WAL_LEVEL: - if (SlotIsLogical(s)) - invalidation_cause = cause; - break; - case RS_INVAL_NONE: - pg_unreachable(); - } + invalidation_cause = DetermineSlotInvalidationCause(possible_causes, + s, oldestLSN, + dboid, + snapshotConflictHorizon, + initial_effective_xmin, + initial_catalog_effective_xmin, + initial_restart_lsn, + &inactive_since, + now); } /* @@ -1705,12 +1829,25 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, /* * The logical replication slots shouldn't be invalidated as GUC - * max_slot_wal_keep_size is set to -1 during the binary upgrade. See - * check_old_cluster_for_valid_slots() where we ensure that no - * invalidated before the upgrade. + * max_slot_wal_keep_size is set to -1 and + * idle_replication_slot_timeout is set to 0 during the binary + * upgrade. See check_old_cluster_for_valid_slots() where we ensure + * that no invalidated before the upgrade. */ Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade)); + /* + * Calculate the idle time duration of the slot if slot is marked + * invalidated with RS_INVAL_IDLE_TIMEOUT. + */ + if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT) + { + int slot_idle_usecs; + + TimestampDifference(inactive_since, now, &slot_idle_secs, + &slot_idle_usecs); + } + if (active_pid != 0) { /* @@ -1739,7 +1876,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, { ReportSlotInvalidation(invalidation_cause, true, active_pid, slotname, restart_lsn, - oldestLSN, snapshotConflictHorizon); + oldestLSN, snapshotConflictHorizon, + slot_idle_secs); if (MyBackendType == B_STARTUP) (void) SendProcSignal(active_pid, @@ -1785,7 +1923,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, ReportSlotInvalidation(invalidation_cause, false, active_pid, slotname, restart_lsn, - oldestLSN, snapshotConflictHorizon); + oldestLSN, snapshotConflictHorizon, + slot_idle_secs); /* done with this slot for now */ break; @@ -1802,26 +1941,32 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, * * Returns true when any slot have got invalidated. * - * Whether a slot needs to be invalidated depends on the cause. A slot is - * removed if it: + * Whether a slot needs to be invalidated depends on the invalidation cause. + * A slot is invalidated if it: * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given * db; dboid may be InvalidOid for shared relations - * - RS_INVAL_WAL_LEVEL: is logical + * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient + * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured + * "idle_replication_slot_timeout" duration. + * + * Note: This function attempts to invalidate the slot for multiple possible + * causes in a single pass, minimizing redundant iterations. The "cause" + * parameter can be a MASK representing one or more of the defined causes. * * NB - this runs as part of checkpoint, so avoid raising errors if possible. */ bool -InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause, +InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon) { XLogRecPtr oldestLSN; bool invalidated = false; - Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon)); - Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0); - Assert(cause != RS_INVAL_NONE); + Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon)); + Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0); + Assert(possible_causes != RS_INVAL_NONE); if (max_replication_slots == 0) return invalidated; @@ -1837,7 +1982,7 @@ restart: if (!s->in_use) continue; - if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid, + if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid, snapshotConflictHorizon, &invalidated)) { @@ -2426,26 +2571,37 @@ RestoreSlotFromDisk(const char *name) * ReplicationSlotInvalidationCause. */ ReplicationSlotInvalidationCause -GetSlotInvalidationCause(const char *invalidation_reason) +GetSlotInvalidationCause(const char *cause_name) { - ReplicationSlotInvalidationCause cause; - ReplicationSlotInvalidationCause result = RS_INVAL_NONE; - bool found PG_USED_FOR_ASSERTS_ONLY = false; + Assert(cause_name); - Assert(invalidation_reason); - - for (cause = RS_INVAL_NONE; cause <= RS_INVAL_MAX_CAUSES; cause++) + /* Search lookup table for the cause having this name */ + for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++) { - if (strcmp(SlotInvalidationCauses[cause], invalidation_reason) == 0) - { - found = true; - result = cause; - break; - } + if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0) + return SlotInvalidationCauses[i].cause; } - Assert(found); - return result; + Assert(false); + return RS_INVAL_NONE; /* to keep compiler quiet */ +} + +/* + * Maps an ReplicationSlotInvalidationCause to the invalidation + * reason for a replication slot. + */ +const char * +GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause) +{ + /* Search lookup table for the name of this cause */ + for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++) + { + if (SlotInvalidationCauses[i].cause == cause) + return SlotInvalidationCauses[i].cause_name; + } + + Assert(false); + return "none"; /* to keep compiler quiet */ } /* @@ -2802,3 +2958,22 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn) ConditionVariableCancelSleep(); } + +/* + * GUC check_hook for idle_replication_slot_timeout + * + * The value of idle_replication_slot_timeout must be set to 0 during + * a binary upgrade. See start_postmaster() in pg_upgrade for more details. + */ +bool +check_idle_replication_slot_timeout(int *newval, void **extra, GucSource source) +{ + if (IsBinaryUpgrade && *newval != 0) + { + GUC_check_errdetail("The value of \"%s\" must be set to 0 during binary upgrade mode.", + "idle_replication_slot_timeout"); + return false; + } + + return true; +} diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 8be4b8c65b5..f652ec8a73e 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -431,7 +431,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) if (cause == RS_INVAL_NONE) nulls[i++] = true; else - values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]); + values[i++] = CStringGetTextDatum(GetSlotInvalidationCauseName(cause)); values[i++] = BoolGetDatum(slot_contents.data.failover); diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c index ba9bae05069..9682f9dbdca 100644 --- a/src/backend/utils/adt/timestamp.c +++ b/src/backend/utils/adt/timestamp.c @@ -1786,6 +1786,24 @@ TimestampDifferenceExceeds(TimestampTz start_time, return (diff >= msec * INT64CONST(1000)); } +/* + * Check if the difference between two timestamps is >= a given + * threshold (expressed in seconds). + */ +bool +TimestampDifferenceExceedsSeconds(TimestampTz start_time, + TimestampTz stop_time, + int threshold_sec) +{ + long secs; + int usecs; + + /* Calculate the difference in seconds */ + TimestampDifference(start_time, stop_time, &secs, &usecs); + + return (secs >= threshold_sec); +} + /* * Convert a time_t to TimestampTz. * diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index aac91a6e31f..3cde94a1759 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -3068,6 +3068,18 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"idle_replication_slot_timeout", PGC_SIGHUP, REPLICATION_SENDING, + gettext_noop("Sets the duration a replication slot can remain idle before " + "it is invalidated."), + NULL, + GUC_UNIT_MIN + }, + &idle_replication_slot_timeout_mins, + 0, 0, INT_MAX / SECS_PER_MINUTE, + check_idle_replication_slot_timeout, NULL, NULL + }, + { {"commit_delay", PGC_SUSET, WAL_SETTINGS, gettext_noop("Sets the delay in microseconds between transaction commit and " diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index d472987ed46..415f253096c 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -329,6 +329,7 @@ # (change requires restart) #wal_keep_size = 0 # in megabytes; 0 disables #max_slot_wal_keep_size = -1 # in megabytes; -1 disables +#idle_replication_slot_timeout = 0 # in minutes; 0 disables #wal_sender_timeout = 60s # in milliseconds; 0 disables #track_commit_timestamp = off # collect timestamp of transaction commit # (change requires restart) diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index 37fdf150b41..9fdf15e5ac0 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -1457,6 +1457,10 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_ appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path); appendShellString(pg_ctl_cmd, subscriber_dir); appendPQExpBuffer(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\""); + + /* Prevent unintended slot invalidation */ + appendPQExpBuffer(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\""); + if (restricted_access) { appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port); diff --git a/src/bin/pg_upgrade/server.c b/src/bin/pg_upgrade/server.c index de6971cde6a..873e5b5117b 100644 --- a/src/bin/pg_upgrade/server.c +++ b/src/bin/pg_upgrade/server.c @@ -252,6 +252,13 @@ start_postmaster(ClusterInfo *cluster, bool report_and_exit_on_error) if (GET_MAJOR_VERSION(cluster->major_version) >= 1700) appendPQExpBufferStr(&pgoptions, " -c max_slot_wal_keep_size=-1"); + /* + * Use idle_replication_slot_timeout=0 to prevent slot invalidation due to + * idle_timeout by checkpointer process during upgrade. + */ + if (GET_MAJOR_VERSION(cluster->major_version) >= 1800) + appendPQExpBufferStr(&pgoptions, " -c idle_replication_slot_timeout=0"); + /* * Use -b to disable autovacuum and logical replication launcher * (effective in PG17 or later for the latter). diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 000c36d30dd..f5a24ccfbf2 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -44,21 +44,25 @@ typedef enum ReplicationSlotPersistency * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the * 'invalidated' field is set to a value other than _NONE. * - * When adding a new invalidation cause here, remember to update - * SlotInvalidationCauses and RS_INVAL_MAX_CAUSES. + * When adding a new invalidation cause here, the value must be powers of 2 + * (e.g., 1, 2, 4...) for proper bitwise operations. Also, remember to update + * RS_INVAL_MAX_CAUSES below, and SlotInvalidationCauses in slot.c. */ typedef enum ReplicationSlotInvalidationCause { - RS_INVAL_NONE, + RS_INVAL_NONE = 0, /* required WAL has been removed */ - RS_INVAL_WAL_REMOVED, + RS_INVAL_WAL_REMOVED = (1 << 0), /* required rows have been removed */ - RS_INVAL_HORIZON, + RS_INVAL_HORIZON = (1 << 1), /* wal_level insufficient for slot */ - RS_INVAL_WAL_LEVEL, + RS_INVAL_WAL_LEVEL = (1 << 2), + /* idle slot timeout has occurred */ + RS_INVAL_IDLE_TIMEOUT = (1 << 3), } ReplicationSlotInvalidationCause; -extern PGDLLIMPORT const char *const SlotInvalidationCauses[]; +/* Maximum number of invalidation causes */ +#define RS_INVAL_MAX_CAUSES 4 /* * On-Disk data of a replication slot, preserved across restarts. @@ -254,6 +258,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot; /* GUCs */ extern PGDLLIMPORT int max_replication_slots; extern PGDLLIMPORT char *synchronized_standby_slots; +extern PGDLLIMPORT int idle_replication_slot_timeout_mins; /* shmem initialization functions */ extern Size ReplicationSlotsShmemSize(void); @@ -286,7 +291,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); -extern bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause, +extern bool InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon); @@ -303,6 +308,7 @@ extern void CheckSlotRequirements(void); extern void CheckSlotPermissions(void); extern ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *invalidation_reason); +extern const char *GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause); extern bool SlotExistsInSyncStandbySlots(const char *slot_name); extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel); diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index 87999218d68..951451a9765 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -174,5 +174,7 @@ extern void assign_wal_sync_method(int new_wal_sync_method, void *extra); extern bool check_synchronized_standby_slots(char **newval, void **extra, GucSource source); extern void assign_synchronized_standby_slots(const char *newval, void *extra); +extern bool check_idle_replication_slot_timeout(int *newval, void **extra, + GucSource source); #endif /* GUC_HOOKS_H */ diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h index d26f023fb87..9963bddc0ec 100644 --- a/src/include/utils/timestamp.h +++ b/src/include/utils/timestamp.h @@ -107,6 +107,9 @@ extern long TimestampDifferenceMilliseconds(TimestampTz start_time, extern bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec); +extern bool TimestampDifferenceExceedsSeconds(TimestampTz start_time, + TimestampTz stop_time, + int threshold_sec); extern TimestampTz time_t_to_timestamptz(pg_time_t tm); extern pg_time_t timestamptz_to_time_t(TimestampTz t); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 80aa50d55a4..64c6bf7a891 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2690,6 +2690,7 @@ SkipPages SlabBlock SlabContext SlabSlot +SlotInvalidationCauseMap SlotNumber SlotSyncCtxStruct SlruCtl