diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index fc64df43e3f..2741c138593 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2192,6 +2192,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage + + + seq_sync_error_count bigint + + + Number of times an error occurred in the sequence synchronization + worker. A single worker synchronizes all sequences, so one error + increment may represent failures across multiple sequences. + + + sync_error_count bigint diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index dec8df4f8ee..059e8778ca7 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1415,6 +1415,7 @@ CREATE VIEW pg_stat_subscription_stats AS ss.subid, s.subname, ss.apply_error_count, + ss.seq_sync_error_count, ss.sync_error_count, ss.confl_insert_exists, ss.confl_update_origin_differs, diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c index a8a39bec508..e093e65e540 100644 --- a/src/backend/replication/logical/sequencesync.c +++ b/src/backend/replication/logical/sequencesync.c @@ -732,6 +732,9 @@ start_sequence_sync() * idle state. */ AbortOutOfAnyTransaction(); + pgstat_report_subscription_error(MySubscription->oid, + WORKERTYPE_SEQUENCESYNC); + PG_RE_THROW(); } } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e5a2856fd17..dcc6124cc73 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1530,7 +1530,8 @@ start_table_sync(XLogRecPtr *origin_startpos, char **slotname) * idle state. 
*/ AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, false); + pgstat_report_subscription_error(MySubscription->oid, + WORKERTYPE_TABLESYNC); PG_RE_THROW(); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 28f61f96a1a..93970c6af29 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -5606,7 +5606,8 @@ start_apply(XLogRecPtr origin_startpos) * idle state. */ AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); + pgstat_report_subscription_error(MySubscription->oid, + MyLogicalRepWorker->type); PG_RE_THROW(); } @@ -5953,15 +5954,12 @@ DisableSubscriptionAndExit(void) RESUME_INTERRUPTS(); - if (am_leader_apply_worker() || am_tablesync_worker()) - { - /* - * Report the worker failed during either table synchronization or - * apply. - */ - pgstat_report_subscription_error(MyLogicalRepWorker->subid, - !am_tablesync_worker()); - } + /* + * Report the worker failed during sequence synchronization, table + * synchronization, or apply. + */ + pgstat_report_subscription_error(MyLogicalRepWorker->subid, + MyLogicalRepWorker->type); /* Disable the subscription */ StartTransactionCommand(); diff --git a/src/backend/utils/activity/pgstat_subscription.c b/src/backend/utils/activity/pgstat_subscription.c index f9a1c831a07..35916772b9d 100644 --- a/src/backend/utils/activity/pgstat_subscription.c +++ b/src/backend/utils/activity/pgstat_subscription.c @@ -17,6 +17,7 @@ #include "postgres.h" +#include "replication/worker_internal.h" #include "utils/pgstat_internal.h" @@ -24,7 +25,7 @@ * Report a subscription error. 
*/ void -pgstat_report_subscription_error(Oid subid, bool is_apply_error) +pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype) { PgStat_EntryRef *entry_ref; PgStat_BackendSubEntry *pending; @@ -33,10 +34,25 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error) InvalidOid, subid, NULL); pending = entry_ref->pending; - if (is_apply_error) - pending->apply_error_count++; - else - pending->sync_error_count++; + switch (wtype) + { + case WORKERTYPE_APPLY: + pending->apply_error_count++; + break; + + case WORKERTYPE_SEQUENCESYNC: + pending->seq_sync_error_count++; + break; + + case WORKERTYPE_TABLESYNC: + pending->sync_error_count++; + break; + + default: + /* Should never happen. */ + Assert(0); + break; + } } /* @@ -115,6 +131,7 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) #define SUB_ACC(fld) shsubent->stats.fld += localent->fld SUB_ACC(apply_error_count); + SUB_ACC(seq_sync_error_count); SUB_ACC(sync_error_count); for (int i = 0; i < CONFLICT_NUM_TYPES; i++) SUB_ACC(conflict_count[i]); diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index a710508979e..1521d6e2ab4 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2203,7 +2203,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) Datum pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 12 +#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 13 Oid subid = PG_GETARG_OID(0); TupleDesc tupdesc; Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; @@ -2221,25 +2221,27 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) OIDOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 2, "apply_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count", + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "seq_sync_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists", + 
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "sync_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_insert_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists", + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_deleted", + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_deleted", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_update_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_multiple_unique_conflicts", + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_delete_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "confl_multiple_unique_conflicts", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); @@ -2256,6 +2258,9 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) /* apply_error_count */ values[i++] = Int64GetDatum(subentry->apply_error_count); + /* seq_sync_error_count */ + values[i++] = Int64GetDatum(subentry->seq_sync_error_count); + /* sync_error_count */ values[i++] = Int64GetDatum(subentry->sync_error_count); diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 593aed7fe21..60e7fd047d1 100644 --- a/src/include/catalog/catversion.h +++ 
b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202511051 +#define CATALOG_VERSION_NO 202511071 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 34b7fddb0e7..5cf9e12fcb9 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5704,9 +5704,9 @@ { oid => '6231', descr => 'statistics: information about subscription stats', proname => 'pg_stat_get_subscription_stats', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_deleted,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}', + proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subid,apply_error_count,seq_sync_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_deleted,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}', prosrc => 'pg_stat_get_subscription_stats' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 7ae503e71a2..a0610bb3e31 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -16,6 +16,7 @@ #include "portability/instr_time.h" #include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */ #include "replication/conflict.h" +#include "replication/worker_internal.h" #include 
"utils/backend_progress.h" /* for backward compatibility */ /* IWYU pragma: export */ #include "utils/backend_status.h" /* for backward compatibility */ /* IWYU pragma: export */ #include "utils/pgstat_kind.h" @@ -108,6 +109,7 @@ typedef struct PgStat_FunctionCallUsage typedef struct PgStat_BackendSubEntry { PgStat_Counter apply_error_count; + PgStat_Counter seq_sync_error_count; PgStat_Counter sync_error_count; PgStat_Counter conflict_count[CONFLICT_NUM_TYPES]; } PgStat_BackendSubEntry; @@ -416,6 +418,7 @@ typedef struct PgStat_SLRUStats typedef struct PgStat_StatSubEntry { PgStat_Counter apply_error_count; + PgStat_Counter seq_sync_error_count; PgStat_Counter sync_error_count; PgStat_Counter conflict_count[CONFLICT_NUM_TYPES]; TimestampTz stat_reset_timestamp; @@ -769,7 +772,8 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void); * Functions in pgstat_subscription.c */ -extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error); +extern void pgstat_report_subscription_error(Oid subid, + LogicalRepWorkerType wtype); extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type); extern void pgstat_create_subscription(Oid subid); extern void pgstat_drop_subscription(Oid subid); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 2bf968ae3d3..7c52181cbcb 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2191,6 +2191,7 @@ pg_stat_subscription| SELECT su.oid AS subid, pg_stat_subscription_stats| SELECT ss.subid, s.subname, ss.apply_error_count, + ss.seq_sync_error_count, ss.sync_error_count, ss.confl_insert_exists, ss.confl_update_origin_differs, @@ -2202,7 +2203,7 @@ pg_stat_subscription_stats| SELECT ss.subid, ss.confl_multiple_unique_conflicts, ss.stats_reset FROM pg_subscription s, - LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, 
confl_update_deleted, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset); + LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, seq_sync_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_deleted, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset); pg_stat_sys_indexes| SELECT relid, indexrelid, schemaname, diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl index 00a1c2fcd48..fc0bcee5187 100644 --- a/src/test/subscription/t/026_stats.pl +++ b/src/test/subscription/t/026_stats.pl @@ -21,10 +21,16 @@ $node_subscriber->start; sub create_sub_pub_w_errors { - my ($node_publisher, $node_subscriber, $db, $table_name) = @_; - # Initial table setup on both publisher and subscriber. On subscriber we - # create the same tables but with primary keys. Also, insert some data that - # will conflict with the data replicated from publisher later. + my ($node_publisher, $node_subscriber, $db, $table_name, $sequence_name) + = @_; + # Initial table and sequence setup on both publisher and subscriber. + # + # Tables: Created on both nodes, but the subscriber version includes + # primary keys and pre-populated data that will intentionally conflict with + # replicated data from the publisher. + # + # Sequences: Created on both nodes with different INCREMENT values to + # intentionally trigger replication conflicts. 
$node_publisher->safe_psql( $db, qq[ @@ -32,6 +38,7 @@ sub create_sub_pub_w_errors CREATE TABLE $table_name(a int); ALTER TABLE $table_name REPLICA IDENTITY FULL; INSERT INTO $table_name VALUES (1); + CREATE SEQUENCE $sequence_name; COMMIT; ]); $node_subscriber->safe_psql( @@ -40,35 +47,57 @@ sub create_sub_pub_w_errors BEGIN; CREATE TABLE $table_name(a int primary key); INSERT INTO $table_name VALUES (1); + CREATE SEQUENCE $sequence_name INCREMENT BY 10; COMMIT; ]); # Set up publication. my $pub_name = $table_name . '_pub'; + my $pub_seq_name = $sequence_name . '_pub'; my $publisher_connstr = $node_publisher->connstr . qq( dbname=$db); - $node_publisher->safe_psql($db, - qq(CREATE PUBLICATION $pub_name FOR TABLE $table_name)); + $node_publisher->safe_psql( + $db, + qq[ + CREATE PUBLICATION $pub_name FOR TABLE $table_name; + CREATE PUBLICATION $pub_seq_name FOR ALL SEQUENCES; + ]); # Create subscription. The tablesync for table on subscription will enter into - # infinite error loop due to violating the unique constraint. + # infinite error loop due to violating the unique constraint. The sequencesync + # will also fail due to different sequence increment values on publisher and + # subscriber. my $sub_name = $table_name . '_sub'; $node_subscriber->safe_psql($db, - qq(CREATE SUBSCRIPTION $sub_name CONNECTION '$publisher_connstr' PUBLICATION $pub_name) + qq(CREATE SUBSCRIPTION $sub_name CONNECTION '$publisher_connstr' PUBLICATION $pub_name, $pub_seq_name) ); $node_publisher->wait_for_catchup($sub_name); - # Wait for the tablesync error to be reported. + # Wait for the tablesync and sequencesync error to be reported. 
$node_subscriber->poll_query_until( $db, qq[ - SELECT sync_error_count > 0 - FROM pg_stat_subscription_stats - WHERE subname = '$sub_name' + SELECT count(1) = 1 FROM pg_stat_subscription_stats + WHERE subname = '$sub_name' AND seq_sync_error_count > 0 AND sync_error_count > 0 ]) or die - qq(Timed out while waiting for tablesync errors for subscription '$sub_name'); + qq(Timed out while waiting for sequencesync errors and tablesync errors for subscription '$sub_name'); + + # Change the sequence INCREMENT value back to the default on the subscriber + # so it doesn't error out. + $node_subscriber->safe_psql($db, + qq(ALTER SEQUENCE $sequence_name INCREMENT 1)); + + # Wait for sequencesync to finish. + $node_subscriber->poll_query_until( + $db, + qq[ + SELECT count(1) = 1 FROM pg_subscription_rel + WHERE srrelid = '$sequence_name'::regclass AND srsubstate = 'r' + ]) + or die + qq(Timed out while waiting for subscriber to synchronize data for sequence '$sequence_name'.); # Truncate test_tab1 so that tablesync worker can continue. $node_subscriber->safe_psql($db, qq(TRUNCATE $table_name)); @@ -136,14 +165,17 @@ is($result, qq(0), # Create the publication and subscription with sync and apply errors my $table1_name = 'test_tab1'; +my $sequence1_name = 'test_seq1'; my ($pub1_name, $sub1_name) = create_sub_pub_w_errors($node_publisher, $node_subscriber, $db, - $table1_name); + $table1_name, $sequence1_name); -# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL +# Apply errors, sequencesync errors, tablesync errors, and conflicts are > 0 and stats_reset +# timestamp is NULL. 
is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count > 0, + seq_sync_error_count > 0, sync_error_count > 0, confl_insert_exists > 0, confl_delete_missing > 0, @@ -151,8 +183,8 @@ is( $node_subscriber->safe_psql( FROM pg_stat_subscription_stats WHERE subname = '$sub1_name') ), - qq(t|t|t|t|t), - qq(Check that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.) + qq(t|t|t|t|t|t), + qq(Check that apply errors, sequencesync errors, tablesync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.) ); # Reset a single subscription @@ -160,10 +192,12 @@ $node_subscriber->safe_psql($db, qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name'))) ); -# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL +# Apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and +# stats_reset timestamp is not NULL. is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count = 0, + seq_sync_error_count = 0, sync_error_count = 0, confl_insert_exists = 0, confl_delete_missing = 0, @@ -171,8 +205,8 @@ is( $node_subscriber->safe_psql( FROM pg_stat_subscription_stats WHERE subname = '$sub1_name') ), - qq(t|t|t|t|t), - qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.) + qq(t|t|t|t|t|t), + qq(Confirm that apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.) 
); # Get reset timestamp @@ -198,14 +232,17 @@ is( $node_subscriber->safe_psql( # Make second subscription and publication my $table2_name = 'test_tab2'; +my $sequence2_name = 'test_seq2'; my ($pub2_name, $sub2_name) = create_sub_pub_w_errors($node_publisher, $node_subscriber, $db, - $table2_name); + $table2_name, $sequence2_name); -# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL +# Apply errors, sequencesync errors, tablesync errors, and conflicts are > 0 +# and stats_reset timestamp is NULL is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count > 0, + seq_sync_error_count > 0, sync_error_count > 0, confl_insert_exists > 0, confl_delete_missing > 0, @@ -213,18 +250,20 @@ is( $node_subscriber->safe_psql( FROM pg_stat_subscription_stats WHERE subname = '$sub2_name') ), - qq(t|t|t|t|t), - qq(Confirm that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.) + qq(t|t|t|t|t|t), + qq(Confirm that apply errors, sequencesync errors, tablesync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.) ); # Reset all subscriptions $node_subscriber->safe_psql($db, qq(SELECT pg_stat_reset_subscription_stats(NULL))); -# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL +# Apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and +# stats_reset timestamp is not NULL. is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count = 0, + seq_sync_error_count = 0, sync_error_count = 0, confl_insert_exists = 0, confl_delete_missing = 0, @@ -232,13 +271,14 @@ is( $node_subscriber->safe_psql( FROM pg_stat_subscription_stats WHERE subname = '$sub1_name') ), - qq(t|t|t|t|t), - qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.) 
+ qq(t|t|t|t|t|t), + qq(Confirm that apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.) ); is( $node_subscriber->safe_psql( $db, qq(SELECT apply_error_count = 0, + seq_sync_error_count = 0, sync_error_count = 0, confl_insert_exists = 0, confl_delete_missing = 0, @@ -246,8 +286,8 @@ is( $node_subscriber->safe_psql( FROM pg_stat_subscription_stats WHERE subname = '$sub2_name') ), - qq(t|t|t|t|t), - qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.) + qq(t|t|t|t|t|t), + qq(Confirm that apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.) ); $reset_time1 = $node_subscriber->safe_psql($db,