mirror of
https://github.com/postgres/postgres.git
synced 2025-11-28 11:44:57 +03:00
Add slotsync skip statistics.
This patch adds two new columns to the pg_stat_replication_slots view: slotsync_skip_count - the total number of times a slotsync operation was skipped. slotsync_skip_at - the timestamp of the most recent skip. These additions provide better visibility into replication slot synchronization behavior. A future patch will introduce the slotsync_skip_reason column in pg_replication_slots to capture the reason for skip. Author: Shlok Kyal <shlok.kyal.oss@gmail.com> Reviewed-by: shveta malik <shveta.malik@gmail.com> Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com> Reviewed-by: Ashutosh Sharma <ashu.coek88@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Discussion: https://postgr.es/m/CAE9k0PkhfKrTEAsGz4DjOhEj1nQ+hbQVfvWUxNacD38ibW3a1g@mail.gmail.com
This commit is contained in:
@@ -78,17 +78,17 @@ SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count,
|
|||||||
|
|
||||||
-- verify accessing/resetting stats for non-existent slot does something reasonable
|
-- verify accessing/resetting stats for non-existent slot does something reasonable
|
||||||
SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
|
SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
|
||||||
slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | stats_reset
|
slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | slotsync_skip_count | slotsync_skip_at | stats_reset
|
||||||
--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+-------------
|
--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+---------------------+------------------+-------------
|
||||||
do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
|
do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT pg_stat_reset_replication_slot('do-not-exist');
|
SELECT pg_stat_reset_replication_slot('do-not-exist');
|
||||||
ERROR: replication slot "do-not-exist" does not exist
|
ERROR: replication slot "do-not-exist" does not exist
|
||||||
SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
|
SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
|
||||||
slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | stats_reset
|
slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | slotsync_skip_count | slotsync_skip_at | stats_reset
|
||||||
--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+-------------
|
--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+---------------------+------------------+-------------
|
||||||
do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
|
do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- spilling the xact
|
-- spilling the xact
|
||||||
|
|||||||
@@ -1659,6 +1659,30 @@ description | Waiting for a newly initialized WAL file to reach durable storage
|
|||||||
</entry>
|
</entry>
|
||||||
</row>
|
</row>
|
||||||
|
|
||||||
|
<row>
|
||||||
|
<entry role="catalog_table_entry"><para role="column_definition">
|
||||||
|
<structfield>slotsync_skip_count</structfield><type>bigint</type>
|
||||||
|
</para>
|
||||||
|
<para>
|
||||||
|
Number of times the slot synchronization is skipped. Slot
|
||||||
|
synchronization occur only on standby servers and thus this column has
|
||||||
|
no meaning on the primary server.
|
||||||
|
</para>
|
||||||
|
</entry>
|
||||||
|
</row>
|
||||||
|
|
||||||
|
<row>
|
||||||
|
<entry role="catalog_table_entry"><para role="column_definition">
|
||||||
|
<structfield>slotsync_skip_at</structfield><type>timestamp with time zone</type>
|
||||||
|
</para>
|
||||||
|
<para>
|
||||||
|
Time at which last slot synchronization was skipped. Slot
|
||||||
|
synchronization occur only on standby servers and thus this column has
|
||||||
|
no meaning on the primary server.
|
||||||
|
</para>
|
||||||
|
</entry>
|
||||||
|
</row>
|
||||||
|
|
||||||
<row>
|
<row>
|
||||||
<entry role="catalog_table_entry"><para role="column_definition">
|
<entry role="catalog_table_entry"><para role="column_definition">
|
||||||
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>
|
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>
|
||||||
|
|||||||
@@ -1076,6 +1076,8 @@ CREATE VIEW pg_stat_replication_slots AS
|
|||||||
s.mem_exceeded_count,
|
s.mem_exceeded_count,
|
||||||
s.total_txns,
|
s.total_txns,
|
||||||
s.total_bytes,
|
s.total_bytes,
|
||||||
|
s.slotsync_skip_count,
|
||||||
|
s.slotsync_skip_at,
|
||||||
s.stats_reset
|
s.stats_reset
|
||||||
FROM pg_replication_slots as r,
|
FROM pg_replication_slots as r,
|
||||||
LATERAL pg_stat_get_replication_slot(slot_name) as s
|
LATERAL pg_stat_get_replication_slot(slot_name) as s
|
||||||
|
|||||||
@@ -187,6 +187,9 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
|
|||||||
TransactionIdPrecedes(remote_slot->catalog_xmin,
|
TransactionIdPrecedes(remote_slot->catalog_xmin,
|
||||||
slot->data.catalog_xmin))
|
slot->data.catalog_xmin))
|
||||||
{
|
{
|
||||||
|
/* Update slot sync skip stats */
|
||||||
|
pgstat_report_replslotsync(slot);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This can happen in following situations:
|
* This can happen in following situations:
|
||||||
*
|
*
|
||||||
@@ -277,6 +280,13 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
|
|||||||
errdetail_internal("Remote slot has LSN %X/%08X but local slot has LSN %X/%08X.",
|
errdetail_internal("Remote slot has LSN %X/%08X but local slot has LSN %X/%08X.",
|
||||||
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
|
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
|
||||||
LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
|
LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If we can't reach a consistent snapshot, the slot won't be
|
||||||
|
* persisted. See update_and_persist_local_synced_slot().
|
||||||
|
*/
|
||||||
|
if (found_consistent_snapshot && !(*found_consistent_snapshot))
|
||||||
|
pgstat_report_replslotsync(slot);
|
||||||
}
|
}
|
||||||
|
|
||||||
updated_xmin_or_lsn = true;
|
updated_xmin_or_lsn = true;
|
||||||
@@ -563,6 +573,7 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
|
|||||||
bool found_consistent_snapshot = false;
|
bool found_consistent_snapshot = false;
|
||||||
bool remote_slot_precedes = false;
|
bool remote_slot_precedes = false;
|
||||||
|
|
||||||
|
/* Slotsync skip stats are handled in function update_local_synced_slot() */
|
||||||
(void) update_local_synced_slot(remote_slot, remote_dbid,
|
(void) update_local_synced_slot(remote_slot, remote_dbid,
|
||||||
&found_consistent_snapshot,
|
&found_consistent_snapshot,
|
||||||
&remote_slot_precedes);
|
&remote_slot_precedes);
|
||||||
@@ -624,31 +635,9 @@ static bool
|
|||||||
synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
|
synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
|
||||||
{
|
{
|
||||||
ReplicationSlot *slot;
|
ReplicationSlot *slot;
|
||||||
XLogRecPtr latestFlushPtr;
|
XLogRecPtr latestFlushPtr = GetStandbyFlushRecPtr(NULL);
|
||||||
bool slot_updated = false;
|
bool slot_updated = false;
|
||||||
|
|
||||||
/*
|
|
||||||
* Make sure that concerned WAL is received and flushed before syncing
|
|
||||||
* slot to target lsn received from the primary server.
|
|
||||||
*/
|
|
||||||
latestFlushPtr = GetStandbyFlushRecPtr(NULL);
|
|
||||||
if (remote_slot->confirmed_lsn > latestFlushPtr)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* Can get here only if GUC 'synchronized_standby_slots' on the
|
|
||||||
* primary server was not configured correctly.
|
|
||||||
*/
|
|
||||||
ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
|
|
||||||
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
||||||
errmsg("skipping slot synchronization because the received slot sync"
|
|
||||||
" LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
|
|
||||||
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
|
|
||||||
remote_slot->name,
|
|
||||||
LSN_FORMAT_ARGS(latestFlushPtr)));
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Search for the named slot */
|
/* Search for the named slot */
|
||||||
if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
|
if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
|
||||||
{
|
{
|
||||||
@@ -707,10 +696,38 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
|
|||||||
/* Skip the sync of an invalidated slot */
|
/* Skip the sync of an invalidated slot */
|
||||||
if (slot->data.invalidated != RS_INVAL_NONE)
|
if (slot->data.invalidated != RS_INVAL_NONE)
|
||||||
{
|
{
|
||||||
|
pgstat_report_replslotsync(slot);
|
||||||
|
|
||||||
ReplicationSlotRelease();
|
ReplicationSlotRelease();
|
||||||
return slot_updated;
|
return slot_updated;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Make sure that concerned WAL is received and flushed before syncing
|
||||||
|
* slot to target lsn received from the primary server.
|
||||||
|
*
|
||||||
|
* Report statistics only after the slot has been acquired, ensuring
|
||||||
|
* it cannot be dropped during the reporting process.
|
||||||
|
*/
|
||||||
|
if (remote_slot->confirmed_lsn > latestFlushPtr)
|
||||||
|
{
|
||||||
|
pgstat_report_replslotsync(slot);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Can get here only if GUC 'synchronized_standby_slots' on the
|
||||||
|
* primary server was not configured correctly.
|
||||||
|
*/
|
||||||
|
ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
|
||||||
|
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||||
|
errmsg("skipping slot synchronization because the received slot sync"
|
||||||
|
" LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
|
||||||
|
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
|
||||||
|
remote_slot->name,
|
||||||
|
LSN_FORMAT_ARGS(latestFlushPtr)));
|
||||||
|
|
||||||
|
return slot_updated;
|
||||||
|
}
|
||||||
|
|
||||||
/* Slot not ready yet, let's attempt to make it sync-ready now. */
|
/* Slot not ready yet, let's attempt to make it sync-ready now. */
|
||||||
if (slot->data.persistency == RS_TEMPORARY)
|
if (slot->data.persistency == RS_TEMPORARY)
|
||||||
{
|
{
|
||||||
@@ -784,6 +801,32 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
|
|||||||
ReplicationSlotsComputeRequiredXmin(true);
|
ReplicationSlotsComputeRequiredXmin(true);
|
||||||
LWLockRelease(ProcArrayLock);
|
LWLockRelease(ProcArrayLock);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Make sure that concerned WAL is received and flushed before syncing
|
||||||
|
* slot to target lsn received from the primary server.
|
||||||
|
*
|
||||||
|
* Report statistics only after the slot has been acquired, ensuring
|
||||||
|
* it cannot be dropped during the reporting process.
|
||||||
|
*/
|
||||||
|
if (remote_slot->confirmed_lsn > latestFlushPtr)
|
||||||
|
{
|
||||||
|
pgstat_report_replslotsync(slot);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Can get here only if GUC 'synchronized_standby_slots' on the
|
||||||
|
* primary server was not configured correctly.
|
||||||
|
*/
|
||||||
|
ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
|
||||||
|
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||||
|
errmsg("skipping slot synchronization because the received slot sync"
|
||||||
|
" LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
|
||||||
|
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
|
||||||
|
remote_slot->name,
|
||||||
|
LSN_FORMAT_ARGS(latestFlushPtr)));
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
update_and_persist_local_synced_slot(remote_slot, remote_dbid);
|
update_and_persist_local_synced_slot(remote_slot, remote_dbid);
|
||||||
|
|
||||||
slot_updated = true;
|
slot_updated = true;
|
||||||
|
|||||||
@@ -102,6 +102,36 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re
|
|||||||
pgstat_unlock_entry(entry_ref);
|
pgstat_unlock_entry(entry_ref);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Report replication slot sync skip statistics.
|
||||||
|
*
|
||||||
|
* Similar to pgstat_report_replslot(), we can rely on the stats for the
|
||||||
|
* slot to exist and to belong to this slot.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
pgstat_report_replslotsync(ReplicationSlot *slot)
|
||||||
|
{
|
||||||
|
PgStat_EntryRef *entry_ref;
|
||||||
|
PgStatShared_ReplSlot *shstatent;
|
||||||
|
PgStat_StatReplSlotEntry *statent;
|
||||||
|
|
||||||
|
/* Slot sync stats are valid only for logical slots on standby. */
|
||||||
|
Assert(SlotIsLogical(slot));
|
||||||
|
Assert(RecoveryInProgress());
|
||||||
|
|
||||||
|
entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid,
|
||||||
|
ReplicationSlotIndex(slot), false);
|
||||||
|
Assert(entry_ref != NULL);
|
||||||
|
|
||||||
|
shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats;
|
||||||
|
statent = &shstatent->stats;
|
||||||
|
|
||||||
|
statent->slotsync_skip_count += 1;
|
||||||
|
statent->slotsync_skip_at = GetCurrentTimestamp();
|
||||||
|
|
||||||
|
pgstat_unlock_entry(entry_ref);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Report replication slot creation.
|
* Report replication slot creation.
|
||||||
*
|
*
|
||||||
@@ -133,7 +163,7 @@ pgstat_create_replslot(ReplicationSlot *slot)
|
|||||||
* Report replication slot has been acquired.
|
* Report replication slot has been acquired.
|
||||||
*
|
*
|
||||||
* This guarantees that a stats entry exists during later
|
* This guarantees that a stats entry exists during later
|
||||||
* pgstat_report_replslot() calls.
|
* pgstat_report_replslot() or pgstat_report_replslotsync() calls.
|
||||||
*
|
*
|
||||||
* If we previously crashed, no stats data exists. But if we did not crash,
|
* If we previously crashed, no stats data exists. But if we did not crash,
|
||||||
* the stats do belong to this slot:
|
* the stats do belong to this slot:
|
||||||
|
|||||||
@@ -2129,7 +2129,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
|
|||||||
Datum
|
Datum
|
||||||
pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
|
pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
#define PG_STAT_GET_REPLICATION_SLOT_COLS 11
|
#define PG_STAT_GET_REPLICATION_SLOT_COLS 13
|
||||||
text *slotname_text = PG_GETARG_TEXT_P(0);
|
text *slotname_text = PG_GETARG_TEXT_P(0);
|
||||||
NameData slotname;
|
NameData slotname;
|
||||||
TupleDesc tupdesc;
|
TupleDesc tupdesc;
|
||||||
@@ -2160,7 +2160,11 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
|
|||||||
INT8OID, -1, 0);
|
INT8OID, -1, 0);
|
||||||
TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_bytes",
|
TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_bytes",
|
||||||
INT8OID, -1, 0);
|
INT8OID, -1, 0);
|
||||||
TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
|
TupleDescInitEntry(tupdesc, (AttrNumber) 11, "slotsync_skip_count",
|
||||||
|
INT8OID, -1, 0);
|
||||||
|
TupleDescInitEntry(tupdesc, (AttrNumber) 12, "slotsync_skip_at",
|
||||||
|
TIMESTAMPTZOID, -1, 0);
|
||||||
|
TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset",
|
||||||
TIMESTAMPTZOID, -1, 0);
|
TIMESTAMPTZOID, -1, 0);
|
||||||
BlessTupleDesc(tupdesc);
|
BlessTupleDesc(tupdesc);
|
||||||
|
|
||||||
@@ -2186,11 +2190,17 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
|
|||||||
values[7] = Int64GetDatum(slotent->mem_exceeded_count);
|
values[7] = Int64GetDatum(slotent->mem_exceeded_count);
|
||||||
values[8] = Int64GetDatum(slotent->total_txns);
|
values[8] = Int64GetDatum(slotent->total_txns);
|
||||||
values[9] = Int64GetDatum(slotent->total_bytes);
|
values[9] = Int64GetDatum(slotent->total_bytes);
|
||||||
|
values[10] = Int64GetDatum(slotent->slotsync_skip_count);
|
||||||
|
|
||||||
|
if (slotent->slotsync_skip_at == 0)
|
||||||
|
nulls[11] = true;
|
||||||
|
else
|
||||||
|
values[11] = TimestampTzGetDatum(slotent->slotsync_skip_at);
|
||||||
|
|
||||||
if (slotent->stat_reset_timestamp == 0)
|
if (slotent->stat_reset_timestamp == 0)
|
||||||
nulls[10] = true;
|
nulls[12] = true;
|
||||||
else
|
else
|
||||||
values[10] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
|
values[12] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
|
||||||
|
|
||||||
/* Returns the record as Datum */
|
/* Returns the record as Datum */
|
||||||
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
|
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
|
||||||
|
|||||||
@@ -57,6 +57,6 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
/* yyyymmddN */
|
/* yyyymmddN */
|
||||||
#define CATALOG_VERSION_NO 202511221
|
#define CATALOG_VERSION_NO 202511251
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@@ -5691,9 +5691,9 @@
|
|||||||
{ oid => '6169', descr => 'statistics: information about replication slot',
|
{ oid => '6169', descr => 'statistics: information about replication slot',
|
||||||
proname => 'pg_stat_get_replication_slot', provolatile => 's',
|
proname => 'pg_stat_get_replication_slot', provolatile => 's',
|
||||||
proparallel => 'r', prorettype => 'record', proargtypes => 'text',
|
proparallel => 'r', prorettype => 'record', proargtypes => 'text',
|
||||||
proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
|
proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz,timestamptz}',
|
||||||
proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
|
proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}',
|
||||||
proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_txns,total_bytes,stats_reset}',
|
proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_txns,total_bytes,slotsync_skip_count,slotsync_skip_at,stats_reset}',
|
||||||
prosrc => 'pg_stat_get_replication_slot' },
|
prosrc => 'pg_stat_get_replication_slot' },
|
||||||
|
|
||||||
{ oid => '6230', descr => 'statistics: check if a stats object exists',
|
{ oid => '6230', descr => 'statistics: check if a stats object exists',
|
||||||
|
|||||||
@@ -214,7 +214,7 @@ typedef struct PgStat_TableXactStatus
|
|||||||
* ------------------------------------------------------------
|
* ------------------------------------------------------------
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define PGSTAT_FILE_FORMAT_ID 0x01A5BCBA
|
#define PGSTAT_FILE_FORMAT_ID 0x01A5BCBB
|
||||||
|
|
||||||
typedef struct PgStat_ArchiverStats
|
typedef struct PgStat_ArchiverStats
|
||||||
{
|
{
|
||||||
@@ -400,6 +400,8 @@ typedef struct PgStat_StatReplSlotEntry
|
|||||||
PgStat_Counter mem_exceeded_count;
|
PgStat_Counter mem_exceeded_count;
|
||||||
PgStat_Counter total_txns;
|
PgStat_Counter total_txns;
|
||||||
PgStat_Counter total_bytes;
|
PgStat_Counter total_bytes;
|
||||||
|
PgStat_Counter slotsync_skip_count;
|
||||||
|
TimestampTz slotsync_skip_at;
|
||||||
TimestampTz stat_reset_timestamp;
|
TimestampTz stat_reset_timestamp;
|
||||||
} PgStat_StatReplSlotEntry;
|
} PgStat_StatReplSlotEntry;
|
||||||
|
|
||||||
@@ -745,6 +747,7 @@ extern PgStat_TableStatus *find_tabstat_entry(Oid rel_id);
|
|||||||
extern void pgstat_reset_replslot(const char *name);
|
extern void pgstat_reset_replslot(const char *name);
|
||||||
struct ReplicationSlot;
|
struct ReplicationSlot;
|
||||||
extern void pgstat_report_replslot(struct ReplicationSlot *slot, const PgStat_StatReplSlotEntry *repSlotStat);
|
extern void pgstat_report_replslot(struct ReplicationSlot *slot, const PgStat_StatReplSlotEntry *repSlotStat);
|
||||||
|
extern void pgstat_report_replslotsync(struct ReplicationSlot *slot);
|
||||||
extern void pgstat_create_replslot(struct ReplicationSlot *slot);
|
extern void pgstat_create_replslot(struct ReplicationSlot *slot);
|
||||||
extern void pgstat_acquire_replslot(struct ReplicationSlot *slot);
|
extern void pgstat_acquire_replslot(struct ReplicationSlot *slot);
|
||||||
extern void pgstat_drop_replslot(struct ReplicationSlot *slot);
|
extern void pgstat_drop_replslot(struct ReplicationSlot *slot);
|
||||||
|
|||||||
@@ -213,19 +213,75 @@ is( $standby1->safe_psql(
|
|||||||
##################################################
|
##################################################
|
||||||
# Test that the synchronized slot will be dropped if the corresponding remote
|
# Test that the synchronized slot will be dropped if the corresponding remote
|
||||||
# slot on the primary server has been dropped.
|
# slot on the primary server has been dropped.
|
||||||
|
#
|
||||||
|
# Note: Both slots need to be dropped for the next test to work
|
||||||
##################################################
|
##################################################
|
||||||
|
|
||||||
$primary->psql('postgres', "SELECT pg_drop_replication_slot('lsub2_slot');");
|
$primary->psql('postgres', "SELECT pg_drop_replication_slot('lsub2_slot');");
|
||||||
|
$primary->psql('postgres', "SELECT pg_drop_replication_slot('lsub1_slot');");
|
||||||
|
|
||||||
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
|
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
|
||||||
|
|
||||||
is( $standby1->safe_psql(
|
is( $standby1->safe_psql(
|
||||||
'postgres',
|
'postgres',
|
||||||
q{SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'lsub2_slot';}
|
q{SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot', 'lsub2_slot');}
|
||||||
),
|
),
|
||||||
"t",
|
"t",
|
||||||
'synchronized slot has been dropped');
|
'synchronized slot has been dropped');
|
||||||
|
|
||||||
|
##################################################
|
||||||
|
# Verify that slotsync skip statistics are correctly updated when the
|
||||||
|
# slotsync operation is skipped.
|
||||||
|
##################################################
|
||||||
|
|
||||||
|
# Create a logical replication slot and create some DDL on the primary so
|
||||||
|
# that the slot lags behind the standby.
|
||||||
|
$primary->safe_psql(
|
||||||
|
'postgres', qq(
|
||||||
|
SELECT pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, true);
|
||||||
|
CREATE TABLE wal_push(a int);
|
||||||
|
));
|
||||||
|
$primary->wait_for_replay_catchup($standby1);
|
||||||
|
|
||||||
|
my $log_offset = -s $standby1->logfile;
|
||||||
|
|
||||||
|
# Enable slot sync worker.
|
||||||
|
$standby1->append_conf('postgresql.conf', qq(sync_replication_slots = on));
|
||||||
|
$standby1->reload;
|
||||||
|
|
||||||
|
# Confirm that the slot sync worker is able to start.
|
||||||
|
$standby1->wait_for_log(qr/slot sync worker started/, $log_offset);
|
||||||
|
|
||||||
|
# Confirm that the slot sync is skipped due to the remote slot lagging behind
|
||||||
|
$standby1->wait_for_log(
|
||||||
|
qr/could not synchronize replication slot \"lsub1_slot\"/, $log_offset);
|
||||||
|
|
||||||
|
# Confirm that the slotsync skip statistics is updated
|
||||||
|
$result = $standby1->safe_psql('postgres',
|
||||||
|
"SELECT slotsync_skip_count > 0 FROM pg_stat_replication_slots WHERE slot_name = 'lsub1_slot'"
|
||||||
|
);
|
||||||
|
is($result, 't', "check slot sync skip count increments");
|
||||||
|
|
||||||
|
# Clean the table
|
||||||
|
$primary->safe_psql(
|
||||||
|
'postgres', qq(
|
||||||
|
DROP TABLE wal_push;
|
||||||
|
));
|
||||||
|
$primary->wait_for_replay_catchup($standby1);
|
||||||
|
|
||||||
|
# Re-create the logical replication slot and sync it to standby for further tests
|
||||||
|
$primary->safe_psql(
|
||||||
|
'postgres', qq(
|
||||||
|
SELECT pg_drop_replication_slot('lsub1_slot');
|
||||||
|
SELECT pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, true);
|
||||||
|
));
|
||||||
|
$standby1->wait_for_log(
|
||||||
|
qr/newly created replication slot \"lsub1_slot\" is sync-ready now/,
|
||||||
|
$log_offset);
|
||||||
|
|
||||||
|
$standby1->append_conf('postgresql.conf', qq(sync_replication_slots = off));
|
||||||
|
$standby1->reload;
|
||||||
|
|
||||||
##################################################
|
##################################################
|
||||||
# Test that if the synchronized slot is invalidated while the remote slot is
|
# Test that if the synchronized slot is invalidated while the remote slot is
|
||||||
# still valid, the slot will be dropped and re-created on the standby by
|
# still valid, the slot will be dropped and re-created on the standby by
|
||||||
@@ -281,7 +337,7 @@ $inactive_since_on_primary =
|
|||||||
# the failover slots.
|
# the failover slots.
|
||||||
$primary->wait_for_replay_catchup($standby1);
|
$primary->wait_for_replay_catchup($standby1);
|
||||||
|
|
||||||
my $log_offset = -s $standby1->logfile;
|
$log_offset = -s $standby1->logfile;
|
||||||
|
|
||||||
# Synchronize the primary server slots to the standby.
|
# Synchronize the primary server slots to the standby.
|
||||||
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
|
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
|
||||||
|
|||||||
@@ -2151,9 +2151,11 @@ pg_stat_replication_slots| SELECT s.slot_name,
|
|||||||
s.mem_exceeded_count,
|
s.mem_exceeded_count,
|
||||||
s.total_txns,
|
s.total_txns,
|
||||||
s.total_bytes,
|
s.total_bytes,
|
||||||
|
s.slotsync_skip_count,
|
||||||
|
s.slotsync_skip_at,
|
||||||
s.stats_reset
|
s.stats_reset
|
||||||
FROM pg_replication_slots r,
|
FROM pg_replication_slots r,
|
||||||
LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_txns, total_bytes, stats_reset)
|
LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_txns, total_bytes, slotsync_skip_count, slotsync_skip_at, stats_reset)
|
||||||
WHERE (r.datoid IS NOT NULL);
|
WHERE (r.datoid IS NOT NULL);
|
||||||
pg_stat_slru| SELECT name,
|
pg_stat_slru| SELECT name,
|
||||||
blks_zeroed,
|
blks_zeroed,
|
||||||
|
|||||||
Reference in New Issue
Block a user