mirror of
https://github.com/postgres/postgres.git
synced 2025-07-27 12:41:57 +03:00
Track invalidation_reason in pg_replication_slots.
Till now, the reason for replication slot invalidation is not tracked directly in pg_replication_slots. A recent commit007693f2a3
added 'conflict_reason' to show the reasons for slot conflict/invalidation, but only for logical slots. This commit adds a new column 'invalidation_reason' to show invalidation reasons for both physical and logical slots. And, this commit also turns 'conflict_reason' text column to 'conflicting' boolean column (effectively reverting commit007693f2a3
). The 'conflicting' column is true for invalidation reasons 'rows_removed' and 'wal_level_insufficient' because those make the slot conflict with recovery. When 'conflicting' is true, one can now look at the new 'invalidation_reason' column for the reason for the logical slot's conflict with recovery. The new 'invalidation_reason' column will also be useful to track other invalidation reasons in the future commit. Author: Bharath Rupireddy Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik Discussion: https://www.postgresql.org/message-id/ZfR7HuzFEswakt/a%40ip-10-97-1-34.eu-west-3.compute.internal Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
This commit is contained in:
@ -663,7 +663,7 @@ synchronize_slots(WalReceiverConn *wrconn)
|
||||
bool started_tx = false;
|
||||
const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
|
||||
" restart_lsn, catalog_xmin, two_phase, failover,"
|
||||
" database, conflict_reason"
|
||||
" database, invalidation_reason"
|
||||
" FROM pg_catalog.pg_replication_slots"
|
||||
" WHERE failover and NOT temporary";
|
||||
|
||||
|
@ -1525,14 +1525,14 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
|
||||
XLogRecPtr initial_effective_xmin = InvalidXLogRecPtr;
|
||||
XLogRecPtr initial_catalog_effective_xmin = InvalidXLogRecPtr;
|
||||
XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr;
|
||||
ReplicationSlotInvalidationCause conflict_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
|
||||
ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
|
||||
|
||||
for (;;)
|
||||
{
|
||||
XLogRecPtr restart_lsn;
|
||||
NameData slotname;
|
||||
int active_pid = 0;
|
||||
ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE;
|
||||
ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
|
||||
|
||||
Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
|
||||
|
||||
@ -1554,17 +1554,14 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
|
||||
|
||||
restart_lsn = s->data.restart_lsn;
|
||||
|
||||
/*
|
||||
* If the slot is already invalid or is a non conflicting slot, we
|
||||
* don't need to do anything.
|
||||
*/
|
||||
/* we do nothing if the slot is already invalid */
|
||||
if (s->data.invalidated == RS_INVAL_NONE)
|
||||
{
|
||||
/*
|
||||
* The slot's mutex will be released soon, and it is possible that
|
||||
* those values change since the process holding the slot has been
|
||||
* terminated (if any), so record them here to ensure that we
|
||||
* would report the correct conflict cause.
|
||||
* would report the correct invalidation cause.
|
||||
*/
|
||||
if (!terminated)
|
||||
{
|
||||
@ -1578,7 +1575,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
|
||||
case RS_INVAL_WAL_REMOVED:
|
||||
if (initial_restart_lsn != InvalidXLogRecPtr &&
|
||||
initial_restart_lsn < oldestLSN)
|
||||
conflict = cause;
|
||||
invalidation_cause = cause;
|
||||
break;
|
||||
case RS_INVAL_HORIZON:
|
||||
if (!SlotIsLogical(s))
|
||||
@ -1589,15 +1586,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
|
||||
if (TransactionIdIsValid(initial_effective_xmin) &&
|
||||
TransactionIdPrecedesOrEquals(initial_effective_xmin,
|
||||
snapshotConflictHorizon))
|
||||
conflict = cause;
|
||||
invalidation_cause = cause;
|
||||
else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
|
||||
TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
|
||||
snapshotConflictHorizon))
|
||||
conflict = cause;
|
||||
invalidation_cause = cause;
|
||||
break;
|
||||
case RS_INVAL_WAL_LEVEL:
|
||||
if (SlotIsLogical(s))
|
||||
conflict = cause;
|
||||
invalidation_cause = cause;
|
||||
break;
|
||||
case RS_INVAL_NONE:
|
||||
pg_unreachable();
|
||||
@ -1605,14 +1602,14 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
|
||||
}
|
||||
|
||||
/*
|
||||
* The conflict cause recorded previously should not change while the
|
||||
* process owning the slot (if any) has been terminated.
|
||||
* The invalidation cause recorded previously should not change while
|
||||
* the process owning the slot (if any) has been terminated.
|
||||
*/
|
||||
Assert(!(conflict_prev != RS_INVAL_NONE && terminated &&
|
||||
conflict_prev != conflict));
|
||||
Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated &&
|
||||
invalidation_cause_prev != invalidation_cause));
|
||||
|
||||
/* if there's no conflict, we're done */
|
||||
if (conflict == RS_INVAL_NONE)
|
||||
/* if there's no invalidation, we're done */
|
||||
if (invalidation_cause == RS_INVAL_NONE)
|
||||
{
|
||||
SpinLockRelease(&s->mutex);
|
||||
if (released_lock)
|
||||
@ -1632,13 +1629,13 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
|
||||
{
|
||||
MyReplicationSlot = s;
|
||||
s->active_pid = MyProcPid;
|
||||
s->data.invalidated = conflict;
|
||||
s->data.invalidated = invalidation_cause;
|
||||
|
||||
/*
|
||||
* XXX: We should consider not overwriting restart_lsn and instead
|
||||
* just rely on .invalidated.
|
||||
*/
|
||||
if (conflict == RS_INVAL_WAL_REMOVED)
|
||||
if (invalidation_cause == RS_INVAL_WAL_REMOVED)
|
||||
s->data.restart_lsn = InvalidXLogRecPtr;
|
||||
|
||||
/* Let caller know */
|
||||
@ -1681,7 +1678,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
|
||||
*/
|
||||
if (last_signaled_pid != active_pid)
|
||||
{
|
||||
ReportSlotInvalidation(conflict, true, active_pid,
|
||||
ReportSlotInvalidation(invalidation_cause, true, active_pid,
|
||||
slotname, restart_lsn,
|
||||
oldestLSN, snapshotConflictHorizon);
|
||||
|
||||
@ -1694,7 +1691,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
|
||||
|
||||
last_signaled_pid = active_pid;
|
||||
terminated = true;
|
||||
conflict_prev = conflict;
|
||||
invalidation_cause_prev = invalidation_cause;
|
||||
}
|
||||
|
||||
/* Wait until the slot is released. */
|
||||
@ -1727,7 +1724,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
|
||||
ReplicationSlotSave();
|
||||
ReplicationSlotRelease();
|
||||
|
||||
ReportSlotInvalidation(conflict, false, active_pid,
|
||||
ReportSlotInvalidation(invalidation_cause, false, active_pid,
|
||||
slotname, restart_lsn,
|
||||
oldestLSN, snapshotConflictHorizon);
|
||||
|
||||
@ -2356,21 +2353,21 @@ RestoreSlotFromDisk(const char *name)
|
||||
}
|
||||
|
||||
/*
|
||||
* Maps a conflict reason for a replication slot to
|
||||
* Maps an invalidation reason for a replication slot to
|
||||
* ReplicationSlotInvalidationCause.
|
||||
*/
|
||||
ReplicationSlotInvalidationCause
|
||||
GetSlotInvalidationCause(const char *conflict_reason)
|
||||
GetSlotInvalidationCause(const char *invalidation_reason)
|
||||
{
|
||||
ReplicationSlotInvalidationCause cause;
|
||||
ReplicationSlotInvalidationCause result = RS_INVAL_NONE;
|
||||
bool found PG_USED_FOR_ASSERTS_ONLY = false;
|
||||
|
||||
Assert(conflict_reason);
|
||||
Assert(invalidation_reason);
|
||||
|
||||
for (cause = RS_INVAL_NONE; cause <= RS_INVAL_MAX_CAUSES; cause++)
|
||||
{
|
||||
if (strcmp(SlotInvalidationCauses[cause], conflict_reason) == 0)
|
||||
if (strcmp(SlotInvalidationCauses[cause], invalidation_reason) == 0)
|
||||
{
|
||||
found = true;
|
||||
result = cause;
|
||||
|
@ -239,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
|
||||
Datum
|
||||
pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
{
|
||||
#define PG_GET_REPLICATION_SLOTS_COLS 17
|
||||
#define PG_GET_REPLICATION_SLOTS_COLS 18
|
||||
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
||||
XLogRecPtr currlsn;
|
||||
int slotno;
|
||||
@ -263,6 +263,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
|
||||
WALAvailability walstate;
|
||||
int i;
|
||||
ReplicationSlotInvalidationCause cause;
|
||||
|
||||
if (!slot->in_use)
|
||||
continue;
|
||||
@ -409,18 +410,28 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
||||
|
||||
values[i++] = BoolGetDatum(slot_contents.data.two_phase);
|
||||
|
||||
if (slot_contents.data.database == InvalidOid)
|
||||
cause = slot_contents.data.invalidated;
|
||||
|
||||
if (SlotIsPhysical(&slot_contents))
|
||||
nulls[i++] = true;
|
||||
else
|
||||
{
|
||||
ReplicationSlotInvalidationCause cause = slot_contents.data.invalidated;
|
||||
|
||||
if (cause == RS_INVAL_NONE)
|
||||
nulls[i++] = true;
|
||||
/*
|
||||
* rows_removed and wal_level_insufficient are the only two
|
||||
* reasons for the logical slot's conflict with recovery.
|
||||
*/
|
||||
if (cause == RS_INVAL_HORIZON ||
|
||||
cause == RS_INVAL_WAL_LEVEL)
|
||||
values[i++] = BoolGetDatum(true);
|
||||
else
|
||||
values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]);
|
||||
values[i++] = BoolGetDatum(false);
|
||||
}
|
||||
|
||||
if (cause == RS_INVAL_NONE)
|
||||
nulls[i++] = true;
|
||||
else
|
||||
values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]);
|
||||
|
||||
values[i++] = BoolGetDatum(slot_contents.data.failover);
|
||||
|
||||
values[i++] = BoolGetDatum(slot_contents.data.synced);
|
||||
|
Reference in New Issue
Block a user