1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-28 11:44:57 +03:00

Add slotsync_skip_reason column to pg_replication_slots view.

Introduce a new column, slotsync_skip_reason, in the pg_replication_slots
view. This column records the reason why the last slot synchronization was
skipped. It is primarily relevant for logical replication slots on standby
servers where the 'synced' field is true. The value is NULL when
synchronization succeeds.

Author: Shlok Kyal <shlok.kyal.oss@gmail.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Ashutosh Sharma <ashu.coek88@gmail.com>
Reviewed-by: Hou Zhijie <houzj.fnst@fujitsu.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/CAE9k0PkhfKrTEAsGz4DjOhEj1nQ+hbQVfvWUxNacD38ibW3a1g@mail.gmail.com
This commit is contained in:
Amit Kapila
2025-11-28 05:21:35 +00:00
parent 9ccc049dfe
commit e68b6adad9
13 changed files with 149 additions and 17 deletions

View File

@@ -1665,7 +1665,7 @@ description | Waiting for a newly initialized WAL file to reach durable storage
</para>
<para>
Number of times the slot synchronization is skipped. Slot
synchronization occur only on standby servers and thus this column has
synchronization occurs only on standby servers and thus this column has
no meaning on the primary server.
</para>
</entry>
@@ -1677,7 +1677,7 @@ description | Waiting for a newly initialized WAL file to reach durable storage
</para>
<para>
Time at which the last slot synchronization was skipped. Slot
synchronization occur only on standby servers and thus this column has
synchronization occurs only on standby servers and thus this column has
no meaning on the primary server.
</para>
</entry>

View File

@@ -3102,6 +3102,49 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>slotsync_skip_reason</structfield><type>text</type>
</para>
<para>
The reason for the last slot synchronization skip. Slot
synchronization occurs only on standby servers and thus this column has
no meaning on the primary server. It is relevant mainly for logical slots
on standby servers whose <structfield>synced</structfield> field is
<literal>true</literal>. It is <literal>NULL</literal> if slot
synchronization is successful.
Possible values are:
<itemizedlist spacing="compact">
<listitem>
<para>
<literal>wal_or_rows_removed</literal> means that the required WALs or
catalog rows have already been removed or are at risk of removal
from the standby.
</para>
</listitem>
<listitem>
<para>
<literal>wal_not_flushed</literal> means that the standby had not
flushed the WAL corresponding to the position reserved on the failover
slot.
</para>
</listitem>
<listitem>
<para>
<literal>no_consistent_snapshot</literal> means that the standby could
not build a consistent snapshot to decode WALs from
<structfield>restart_lsn</structfield>.
</para>
</listitem>
<listitem>
<para>
<literal>slot_invalidated</literal> means that the synced slot is
invalidated.
</para>
</listitem>
</itemizedlist>
</para></entry>
</row>
</tbody>
</tgroup>
</table>

View File

@@ -1060,7 +1060,8 @@ CREATE VIEW pg_replication_slots AS
L.conflicting,
L.invalidation_reason,
L.failover,
L.synced
L.synced,
L.slotsync_skip_reason
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);

View File

@@ -148,6 +148,35 @@ typedef struct RemoteSlot
static void slotsync_failure_callback(int code, Datum arg);
static void update_synced_slots_inactive_since(void);
/*
 * Record the outcome of a slot synchronization attempt for the slot held in
 * MyReplicationSlot.  The caller must have acquired that slot beforehand.
 *
 * With a reason other than SS_SKIP_NONE the skip is also reported to the
 * slot sync statistics.  SS_SKIP_NONE reports nothing and simply resets any
 * previously stored skip reason.
 */
static void
update_slotsync_skip_stats(SlotSyncSkipReason skip_reason)
{
	ReplicationSlot *synced_slot = MyReplicationSlot;

	Assert(synced_slot);

	/*
	 * Only an actual skip is counted in the slot sync related stats shown by
	 * pg_stat_replication_slots.
	 */
	if (skip_reason != SS_SKIP_NONE)
		pgstat_report_replslotsync(synced_slot);

	/* Nothing to store when the recorded reason is already up to date. */
	if (synced_slot->slotsync_skip_reason == skip_reason)
		return;

	/* Publish the new skip reason under the slot's spinlock. */
	SpinLockAcquire(&synced_slot->mutex);
	synced_slot->slotsync_skip_reason = skip_reason;
	SpinLockRelease(&synced_slot->mutex);
}
/*
* If necessary, update the local synced slot's metadata based on the data
* from the remote slot.
@@ -170,6 +199,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
ReplicationSlot *slot = MyReplicationSlot;
bool updated_xmin_or_lsn = false;
bool updated_config = false;
SlotSyncSkipReason skip_reason = SS_SKIP_NONE;
Assert(slot->data.invalidated == RS_INVAL_NONE);
@@ -188,7 +218,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
slot->data.catalog_xmin))
{
/* Update slot sync skip stats */
pgstat_report_replslotsync(slot);
update_slotsync_skip_stats(SS_SKIP_WAL_OR_ROWS_REMOVED);
/*
* This can happen in the following situations:
@@ -286,12 +316,15 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
* persisted. See update_and_persist_local_synced_slot().
*/
if (found_consistent_snapshot && !(*found_consistent_snapshot))
pgstat_report_replslotsync(slot);
skip_reason = SS_SKIP_NO_CONSISTENT_SNAPSHOT;
}
updated_xmin_or_lsn = true;
}
/* Update slot sync skip stats */
update_slotsync_skip_stats(skip_reason);
if (remote_dbid != slot->data.database ||
remote_slot->two_phase != slot->data.two_phase ||
remote_slot->failover != slot->data.failover ||
@@ -696,7 +729,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
/* Skip the sync of an invalidated slot */
if (slot->data.invalidated != RS_INVAL_NONE)
{
pgstat_report_replslotsync(slot);
update_slotsync_skip_stats(SS_SKIP_INVALID);
ReplicationSlotRelease();
return slot_updated;
@@ -711,7 +744,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
*/
if (remote_slot->confirmed_lsn > latestFlushPtr)
{
pgstat_report_replslotsync(slot);
update_slotsync_skip_stats(SS_SKIP_WAL_NOT_FLUSHED);
/*
* Can get here only if GUC 'synchronized_standby_slots' on the
@@ -812,7 +845,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
*/
if (remote_slot->confirmed_lsn > latestFlushPtr)
{
pgstat_report_replslotsync(slot);
update_slotsync_skip_stats(SS_SKIP_WAL_NOT_FLUSHED);
/*
* Can get here only if GUC 'synchronized_standby_slots' on the

View File

@@ -491,6 +491,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
slot->last_saved_restart_lsn = InvalidXLogRecPtr;
slot->inactive_since = 0;
slot->slotsync_skip_reason = SS_SKIP_NONE;
/*
* Create the slot on disk. We haven't actually marked the slot allocated

View File

@@ -24,6 +24,17 @@
#include "utils/guc.h"
#include "utils/pg_lsn.h"
/*
 * Map SlotSyncSkipReason enum values to human-readable names.
 *
 * The table is indexed directly by the enum value, so every enum member
 * needs an entry here.  Both the strings and the pointer array itself are
 * const, since this is a read-only lookup table.
 *
 * The "none" entry only keeps the table complete:
 * pg_get_replication_slots() reports SS_SKIP_NONE as NULL and does not look
 * up a name for it.
 */
static const char *const SlotSyncSkipReasonNames[] = {
	[SS_SKIP_NONE] = "none",
	[SS_SKIP_WAL_NOT_FLUSHED] = "wal_not_flushed",
	[SS_SKIP_WAL_OR_ROWS_REMOVED] = "wal_or_rows_removed",
	[SS_SKIP_NO_CONSISTENT_SNAPSHOT] = "no_consistent_snapshot",
	[SS_SKIP_INVALID] = "slot_invalidated"
};
/*
* Helper function for creating a new physical replication slot with
* given arguments. Note that this function doesn't release the created
@@ -235,7 +246,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
#define PG_GET_REPLICATION_SLOTS_COLS 20
#define PG_GET_REPLICATION_SLOTS_COLS 21
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
XLogRecPtr currlsn;
int slotno;
@@ -443,6 +454,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
values[i++] = BoolGetDatum(slot_contents.data.synced);
if (slot_contents.slotsync_skip_reason == SS_SKIP_NONE)
nulls[i++] = true;
else
values[i++] = CStringGetTextDatum(SlotSyncSkipReasonNames[slot_contents.slotsync_skip_reason]);
Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,

View File

@@ -115,8 +115,8 @@ pgstat_report_replslotsync(ReplicationSlot *slot)
PgStatShared_ReplSlot *shstatent;
PgStat_StatReplSlotEntry *statent;
/* Slot sync stats are valid only for logical slots on standby. */
Assert(SlotIsLogical(slot));
/* Slot sync stats are valid only for synced logical slots on standby. */
Assert(slot->data.synced);
Assert(RecoveryInProgress());
entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid,

View File

@@ -57,6 +57,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202511251
#define CATALOG_VERSION_NO 202511281
#endif

View File

@@ -11519,9 +11519,9 @@
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', prorettype => 'record',
proargtypes => '',
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,pg_lsn,timestamptz,bool,text,bool,bool}',
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,two_phase_at,inactive_since,conflicting,invalidation_reason,failover,synced}',
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,pg_lsn,timestamptz,bool,text,bool,bool,text}',
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,two_phase_at,inactive_since,conflicting,invalidation_reason,failover,synced,slotsync_skip_reason}',
prosrc => 'pg_get_replication_slots' },
{ oid => '3786', descr => 'set up a logical replication slot',
proname => 'pg_create_logical_replication_slot', provolatile => 'v',

View File

@@ -71,6 +71,24 @@ typedef enum ReplicationSlotInvalidationCause
/* Maximum number of invalidation causes */
#define RS_INVAL_MAX_CAUSES 4
/*
 * When the slot synchronization worker is running, or when
 * pg_sync_replication_slots is executed, slot synchronization may be
 * skipped. This enum defines the possible reasons for skipping slot
 * synchronization.
 *
 * The values are used as indexes into SlotSyncSkipReasonNames[], so a new
 * member needs a matching name entry there.
 */
typedef enum SlotSyncSkipReason
{
SS_SKIP_NONE, /* Synchronization was not skipped */
SS_SKIP_WAL_NOT_FLUSHED, /* Standby has not flushed the WAL corresponding
* to the confirmed flush of the remote slot */
SS_SKIP_WAL_OR_ROWS_REMOVED, /* Remote slot is behind; required WAL or
* rows may be removed or at risk */
SS_SKIP_NO_CONSISTENT_SNAPSHOT, /* Standby could not build a consistent
* snapshot */
SS_SKIP_INVALID /* Local slot is invalidated */
} SlotSyncSkipReason;
/*
* On-Disk data of a replication slot, preserved across restarts.
*/
@@ -249,6 +267,18 @@ typedef struct ReplicationSlot
*/
XLogRecPtr last_saved_restart_lsn;
/*
* Reason for the most recent slot synchronization skip.
*
* Slot sync skips can occur for both temporary and persistent replication
* slots. They are more common for temporary slots, but persistent slots
* may also skip synchronization in rare cases (e.g.,
* SS_SKIP_WAL_NOT_FLUSHED or SS_SKIP_WAL_OR_ROWS_REMOVED).
*
* Since temporary slots are dropped after server restart, persisting
* slotsync_skip_reason provides no practical benefit.
*/
SlotSyncSkipReason slotsync_skip_reason;
} ReplicationSlot;
#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)

View File

@@ -1049,6 +1049,12 @@ $standby2->wait_for_log(qr/slot sync worker started/, $log_offset);
$standby2->wait_for_log(
qr/could not synchronize replication slot \"lsub1_slot\"/, $log_offset);
# Confirm that the slotsync skip reason is updated
$result = $standby2->safe_psql('postgres',
"SELECT slotsync_skip_reason FROM pg_replication_slots WHERE slot_name = 'lsub1_slot'"
);
is($result, 'wal_or_rows_removed', "check slot sync skip reason");
# Confirm that the slotsync skip statistics are updated
$result = $standby2->safe_psql('postgres',
"SELECT slotsync_skip_count > 0 FROM pg_stat_replication_slots WHERE slot_name = 'lsub1_slot'"

View File

@@ -1507,8 +1507,9 @@ pg_replication_slots| SELECT l.slot_name,
l.conflicting,
l.invalidation_reason,
l.failover,
l.synced
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, two_phase_at, inactive_since, conflicting, invalidation_reason, failover, synced)
l.synced,
l.slotsync_skip_reason
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, two_phase_at, inactive_since, conflicting, invalidation_reason, failover, synced, slotsync_skip_reason)
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,

View File

@@ -2807,6 +2807,7 @@ SlabSlot
SlotInvalidationCauseMap
SlotNumber
SlotSyncCtxStruct
SlotSyncSkipReason
SlruCtl
SlruCtlData
SlruErrorCause