1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-16 15:02:33 +03:00

Add seq_sync_error_count to subscription statistics.

This commit adds a new column, seq_sync_error_count, to the
pg_stat_subscription_stats view. This counter tracks the number of errors
encountered by the sequence synchronization worker during operation.

Since a single worker handles the synchronization of all sequences, this
value may reflect errors from multiple sequences. This addition improves
observability of sequence synchronization behavior and helps monitor
potential issues during replication.

Author: Vignesh C <vignesh21@gmail.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Peter Smith <smithpb2250@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com
This commit is contained in:
Amit Kapila
2025-11-07 08:05:08 +00:00
parent c32e32f763
commit f6a4c498dc
12 changed files with 143 additions and 62 deletions

View File

@@ -1415,6 +1415,7 @@ CREATE VIEW pg_stat_subscription_stats AS
ss.subid,
s.subname,
ss.apply_error_count,
ss.seq_sync_error_count,
ss.sync_error_count,
ss.confl_insert_exists,
ss.confl_update_origin_differs,

View File

@@ -732,6 +732,9 @@ start_sequence_sync()
* idle state.
*/
AbortOutOfAnyTransaction();
pgstat_report_subscription_error(MySubscription->oid,
WORKERTYPE_SEQUENCESYNC);
PG_RE_THROW();
}
}

View File

@@ -1530,7 +1530,8 @@ start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
* idle state.
*/
AbortOutOfAnyTransaction();
pgstat_report_subscription_error(MySubscription->oid, false);
pgstat_report_subscription_error(MySubscription->oid,
WORKERTYPE_TABLESYNC);
PG_RE_THROW();
}

View File

@@ -5606,7 +5606,8 @@ start_apply(XLogRecPtr origin_startpos)
* idle state.
*/
AbortOutOfAnyTransaction();
pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
pgstat_report_subscription_error(MySubscription->oid,
MyLogicalRepWorker->type);
PG_RE_THROW();
}
@@ -5953,15 +5954,12 @@ DisableSubscriptionAndExit(void)
RESUME_INTERRUPTS();
if (am_leader_apply_worker() || am_tablesync_worker())
{
/*
* Report the worker failed during either table synchronization or
* apply.
*/
pgstat_report_subscription_error(MyLogicalRepWorker->subid,
!am_tablesync_worker());
}
/*
* Report the worker failed during sequence synchronization, table
* synchronization, or apply.
*/
pgstat_report_subscription_error(MyLogicalRepWorker->subid,
MyLogicalRepWorker->type);
/* Disable the subscription */
StartTransactionCommand();

View File

@@ -17,6 +17,7 @@
#include "postgres.h"
#include "replication/worker_internal.h"
#include "utils/pgstat_internal.h"
@@ -24,7 +25,7 @@
* Report a subscription error.
*/
void
pgstat_report_subscription_error(Oid subid, bool is_apply_error)
pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype)
{
PgStat_EntryRef *entry_ref;
PgStat_BackendSubEntry *pending;
@@ -33,10 +34,25 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error)
InvalidOid, subid, NULL);
pending = entry_ref->pending;
if (is_apply_error)
pending->apply_error_count++;
else
pending->sync_error_count++;
switch (wtype)
{
case WORKERTYPE_APPLY:
pending->apply_error_count++;
break;
case WORKERTYPE_SEQUENCESYNC:
pending->seq_sync_error_count++;
break;
case WORKERTYPE_TABLESYNC:
pending->sync_error_count++;
break;
default:
/* Should never happen. */
Assert(0);
break;
}
}
/*
@@ -115,6 +131,7 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
#define SUB_ACC(fld) shsubent->stats.fld += localent->fld
SUB_ACC(apply_error_count);
SUB_ACC(seq_sync_error_count);
SUB_ACC(sync_error_count);
for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
SUB_ACC(conflict_count[i]);

View File

@@ -2203,7 +2203,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 12
#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 13
Oid subid = PG_GETARG_OID(0);
TupleDesc tupdesc;
Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
@@ -2221,25 +2221,27 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
OIDOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "apply_error_count",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "seq_sync_error_count",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists",
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "sync_error_count",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs",
TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_insert_exists",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists",
TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_origin_differs",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_deleted",
TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_exists",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_missing",
TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_deleted",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs",
TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_update_missing",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing",
TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_origin_differs",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_multiple_unique_conflicts",
TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_delete_missing",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset",
TupleDescInitEntry(tupdesc, (AttrNumber) 12, "confl_multiple_unique_conflicts",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
@@ -2256,6 +2258,9 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
/* apply_error_count */
values[i++] = Int64GetDatum(subentry->apply_error_count);
/* seq_sync_error_count */
values[i++] = Int64GetDatum(subentry->seq_sync_error_count);
/* sync_error_count */
values[i++] = Int64GetDatum(subentry->sync_error_count);