1
0
mirror of https://github.com/postgres/postgres.git synced 2025-08-30 06:01:21 +03:00

Allow users to limit storage reserved by replication slots

Replication slots are useful to retain data that may be needed by a
replication system.  But experience has shown that allowing them to
retain excessive data can lead to the primary failing because of running
out of space.  This new feature allows the user to configure a maximum
amount of space to be reserved using the new option
max_slot_wal_keep_size.  Slots that overrun that space are invalidated
at checkpoint time, enabling the storage to be released.

Author: Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Jehan-Guillaume de Rorthais <jgdr@dalibo.com>
Reviewed-by: Álvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/20170228.122736.123383594.horiguchi.kyotaro@lab.ntt.co.jp
This commit is contained in:
Alvaro Herrera
2020-04-07 18:35:00 -04:00
parent b63c293bcb
commit c655077639
17 changed files with 595 additions and 43 deletions

View File

@@ -225,7 +225,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
else
end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
ReplicationSlotAcquire(NameStr(*name), true);
(void) ReplicationSlotAcquire(NameStr(*name), SAB_Error);
PG_TRY();
{

View File

@@ -325,9 +325,15 @@ ReplicationSlotCreate(const char *name, bool db_specific,
/*
* Find a previously created slot and mark it as used by this backend.
*
* The return value is only useful if behavior is SAB_Inquire, in which
* it's zero if we successfully acquired the slot, or the PID of the
* owning process otherwise. If behavior is SAB_Error, then trying to
* acquire an owned slot is an error. If SAB_Block, we sleep until the
* slot is released by the owning process.
*/
void
ReplicationSlotAcquire(const char *name, bool nowait)
int
ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
{
ReplicationSlot *slot;
int active_pid;
@@ -392,11 +398,13 @@ retry:
*/
if (active_pid != MyProcPid)
{
if (nowait)
if (behavior == SAB_Error)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("replication slot \"%s\" is active for PID %d",
name, active_pid)));
else if (behavior == SAB_Inquire)
return active_pid;
/* Wait here until we get signaled, and then restart */
ConditionVariableSleep(&slot->active_cv,
@@ -412,6 +420,9 @@ retry:
/* We made this slot active, so it's ours now. */
MyReplicationSlot = slot;
/* success */
return 0;
}
/*
@@ -518,7 +529,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
{
Assert(MyReplicationSlot == NULL);
ReplicationSlotAcquire(name, nowait);
(void) ReplicationSlotAcquire(name, nowait ? SAB_Error : SAB_Block);
ReplicationSlotDropAcquired();
}
@@ -743,6 +754,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
/*
* Compute the oldest restart LSN across all slots and inform xlog module.
*
* Note: while max_slot_wal_keep_size is theoretically relevant for this
* purpose, we don't try to account for that, because this module doesn't
* know what to compare against.
*/
void
ReplicationSlotsComputeRequiredLSN(void)
@@ -818,6 +833,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
restart_lsn = s->data.restart_lsn;
SpinLockRelease(&s->mutex);
if (restart_lsn == InvalidXLogRecPtr)
continue;
if (result == InvalidXLogRecPtr ||
restart_lsn < result)
result = restart_lsn;
@@ -1064,6 +1082,80 @@ ReplicationSlotReserveWal(void)
}
}
/*
* Mark any slot that points to an LSN older than the given segment
* as invalid; it requires WAL that's about to be removed.
*
* NB - this runs as part of checkpoint, so avoid raising errors if possible.
*/
void
InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
{
XLogRecPtr oldestLSN;
XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
restart:
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (int i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
XLogRecPtr restart_lsn = InvalidXLogRecPtr;
char *slotname;
if (!s->in_use)
continue;
SpinLockAcquire(&s->mutex);
if (s->data.restart_lsn == InvalidXLogRecPtr ||
s->data.restart_lsn >= oldestLSN)
{
SpinLockRelease(&s->mutex);
continue;
}
slotname = pstrdup(NameStr(s->data.name));
restart_lsn = s->data.restart_lsn;
SpinLockRelease(&s->mutex);
LWLockRelease(ReplicationSlotControlLock);
for (;;)
{
int wspid = ReplicationSlotAcquire(slotname, SAB_Inquire);
/* no walsender? success! */
if (wspid == 0)
break;
ereport(LOG,
(errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
wspid, slotname)));
(void) kill(wspid, SIGTERM);
ConditionVariableTimedSleep(&s->active_cv, 10,
WAIT_EVENT_REPLICATION_SLOT_DROP);
}
ConditionVariableCancelSleep();
ereport(LOG,
(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
slotname,
(uint32) (restart_lsn >> 32),
(uint32) restart_lsn)));
SpinLockAcquire(&s->mutex);
s->data.restart_lsn = InvalidXLogRecPtr;
SpinLockRelease(&s->mutex);
ReplicationSlotRelease();
/* if we did anything, start from scratch */
CHECK_FOR_INTERRUPTS();
goto restart;
}
LWLockRelease(ReplicationSlotControlLock);
}
/*
* Flush all replication slots to disk.
*

View File

@@ -234,7 +234,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
#define PG_GET_REPLICATION_SLOTS_COLS 11
#define PG_GET_REPLICATION_SLOTS_COLS 13
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -288,6 +288,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
Oid database;
NameData slot_name;
NameData plugin;
WALAvailability walstate;
XLogSegNo last_removed_seg;
int i;
if (!slot->in_use)
@@ -355,6 +357,40 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
else
nulls[i++] = true;
walstate = GetWALAvailability(restart_lsn);
switch (walstate)
{
case WALAVAIL_INVALID_LSN:
nulls[i++] = true;
break;
case WALAVAIL_NORMAL:
values[i++] = CStringGetTextDatum("normal");
break;
case WALAVAIL_RESERVED:
values[i++] = CStringGetTextDatum("reserved");
break;
case WALAVAIL_REMOVED:
values[i++] = CStringGetTextDatum("lost");
break;
}
if (max_slot_wal_keep_size_mb >= 0 &&
(walstate == WALAVAIL_NORMAL || walstate == WALAVAIL_RESERVED) &&
((last_removed_seg = XLogGetLastRemovedSegno()) != 0))
{
XLogRecPtr min_safe_lsn;
XLogSegNoOffsetToRecPtr(last_removed_seg + 1, 0,
wal_segment_size, min_safe_lsn);
values[i++] = Int64GetDatum(min_safe_lsn);
}
else
nulls[i++] = true;
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
LWLockRelease(ReplicationSlotControlLock);
@@ -377,6 +413,8 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn;
XLogRecPtr retlsn = startlsn;
Assert(moveto != InvalidXLogRecPtr);
if (startlsn < moveto)
{
SpinLockAcquire(&MyReplicationSlot->mutex);
@@ -414,6 +452,8 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
ResourceOwner old_resowner = CurrentResourceOwner;
XLogRecPtr retlsn;
Assert(moveto != InvalidXLogRecPtr);
PG_TRY();
{
/*
@@ -552,7 +592,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
/* Acquire the slot so we "own" it */
ReplicationSlotAcquire(NameStr(*slotname), true);
(void) ReplicationSlotAcquire(NameStr(*slotname), SAB_Error);
/* A slot whose restart_lsn has never been reserved cannot be advanced */
if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))

View File

@@ -595,7 +595,7 @@ StartReplication(StartReplicationCmd *cmd)
if (cmd->slotname)
{
ReplicationSlotAcquire(cmd->slotname, true);
(void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
if (SlotIsLogical(MyReplicationSlot))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1132,7 +1132,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
Assert(!MyReplicationSlot);
ReplicationSlotAcquire(cmd->slotname, true);
(void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
/*
* Force a disconnect, so that the decoding code doesn't need to care