diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 96b4886e08c..fb4472356d5 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -346,9 +346,7 @@
A conflict will produce an error and will stop the replication; it must be
resolved manually by the user. Details about the conflict can be found in
-
- pg_stat_subscription_workers and the
- subscriber's server log.
+ the subscriber's server log.
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index bf7625d9889..9fb62fec8ec 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -628,11 +628,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
- pg_stat_subscription_workerspg_stat_subscription_workers
- One row per subscription worker, showing statistics about errors
- that occurred on that subscription worker.
- See
- pg_stat_subscription_workers for details.
+ pg_stat_subscription_statspg_stat_subscription_stats
+ One row per subscription, showing statistics about errors.
+ See
+ pg_stat_subscription_stats for details.
@@ -3063,23 +3062,20 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
-
- pg_stat_subscription_workers
+
+ pg_stat_subscription_stats
- pg_stat_subscription_workers
+ pg_stat_subscription_stats
- The pg_stat_subscription_workers view will contain
- one row per subscription worker on which errors have occurred, for workers
- applying logical replication changes and workers handling the initial data
- copy of the subscribed tables. The statistics entry is removed when the
- corresponding subscription is dropped.
+ The pg_stat_subscription_stats view will contain
+ one row per subscription.
-
- pg_stat_subscription_workers View
+
+ pg_stat_subscription_stats View
@@ -3113,72 +3109,31 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
- subrelidoid
+ apply_error_countbigint
- OID of the relation that the worker is synchronizing; null for the
- main apply worker
+ Number of times an error occurred while applying changes
- last_error_relidoid
+ sync_error_countbigint
- OID of the relation that the worker was processing when the
- error occurred
+ Number of times an error occurred during the initial table
+ synchronization
- last_error_commandtext
+ stats_resettimestamp with time zone
- Name of command being applied when the error occurred. This field
- is null if the error was reported during the initial data copy.
+ Time at which these statistics were last reset
-
-
-
- last_error_xidxid
-
-
- Transaction ID of the publisher node being applied when the error
- occurred. This field is null if the error was reported
- during the initial data copy.
-
-
-
-
-
- last_error_countuint8
-
-
- Number of consecutive times the error occurred
-
-
-
-
-
- last_error_messagetext
-
-
- The error message
-
-
-
-
-
- last_error_timetimestamp with time zone
-
-
- Last time at which this error occurred
-
-
-
@@ -5320,22 +5275,16 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
- pg_stat_reset_subscription_worker
+ pg_stat_reset_subscription_stats
- pg_stat_reset_subscription_worker ( subidoid, relidoid )
+ pg_stat_reset_subscription_stats ( oid )
void
- Resets the statistics of subscription workers running on the
- subscription with subid shown in the
- pg_stat_subscription_workers view. If the
- argument relid is not NULL,
- resets statistics of the subscription worker handling the initial data
- copy of the relation with relid. Otherwise,
- resets the subscription worker statistics of the main apply worker.
- If the argument relid is omitted, resets the
- statistics of all subscription workers running on the subscription
- with subid.
+ Resets statistics for a single subscription shown in the
+ pg_stat_subscription_stats view to zero. If
+ the argument is NULL, reset statistics for all
+ subscriptions.
This function is restricted to superusers by default, but other users
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index fd1421788e6..758ab6e25a3 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -639,9 +639,7 @@ REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_function_counters(oid) FROM publ
REVOKE EXECUTE ON FUNCTION pg_stat_reset_replication_slot(text) FROM public;
-REVOKE EXECUTE ON FUNCTION pg_stat_reset_subscription_worker(oid) FROM public;
-
-REVOKE EXECUTE ON FUNCTION pg_stat_reset_subscription_worker(oid, oid) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_stat_reset_subscription_stats(oid) FROM public;
REVOKE EXECUTE ON FUNCTION lo_import(text) FROM public;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 3cb69b1f87b..40b7bca5a96 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1264,25 +1264,12 @@ GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
substream, subtwophasestate, subslotname, subsynccommit, subpublications)
ON pg_subscription TO public;
-CREATE VIEW pg_stat_subscription_workers AS
+CREATE VIEW pg_stat_subscription_stats AS
SELECT
- w.subid,
+ ss.subid,
s.subname,
- w.subrelid,
- w.last_error_relid,
- w.last_error_command,
- w.last_error_xid,
- w.last_error_count,
- w.last_error_message,
- w.last_error_time
- FROM (SELECT
- oid as subid,
- NULL as relid
- FROM pg_subscription
- UNION ALL
- SELECT
- srsubid as subid,
- srrelid as relid
- FROM pg_subscription_rel) sr,
- LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w
- JOIN pg_subscription s ON (w.subid = s.oid);
+ ss.apply_error_count,
+ ss.sync_error_count,
+ ss.stats_reset
+ FROM pg_subscription as s,
+ pg_stat_get_subscription_stats(s.oid) as ss;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 0646f530985..53ddd930e6e 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -106,7 +106,7 @@
#define PGSTAT_DB_HASH_SIZE 16
#define PGSTAT_TAB_HASH_SIZE 512
#define PGSTAT_FUNCTION_HASH_SIZE 512
-#define PGSTAT_SUBWORKER_HASH_SIZE 32
+#define PGSTAT_SUBSCRIPTION_HASH_SIZE 32
#define PGSTAT_REPLSLOT_HASH_SIZE 32
@@ -284,6 +284,7 @@ static PgStat_GlobalStats globalStats;
static PgStat_WalStats walStats;
static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
static HTAB *replSlotStatHash = NULL;
+static HTAB *subscriptionStatHash = NULL;
/*
* List of OIDs of databases we need to write out. If an entry is InvalidOid,
@@ -322,14 +323,13 @@ NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_no
static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry,
Oid tableoid, bool create);
-static PgStat_StatSubWorkerEntry *pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry,
- Oid subid, Oid subrelid,
- bool create);
+static PgStat_StatSubEntry *pgstat_get_subscription_entry(Oid subid, bool create);
+static void pgstat_reset_subscription(PgStat_StatSubEntry *subentry, TimestampTz ts);
static void pgstat_write_statsfiles(bool permanent, bool allDbs);
static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent);
static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep);
static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
- HTAB *subworkerhash, bool permanent);
+ bool permanent);
static void backend_read_statsfile(void);
static bool pgstat_write_statsfile_needed(void);
@@ -341,7 +341,6 @@ static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, Timestamp
static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now);
static void pgstat_send_funcstats(void);
static void pgstat_send_slru(void);
-static void pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg);
static HTAB *pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid);
static bool pgstat_should_report_connstat(void);
static void pgstat_report_disconnect(Oid dboid);
@@ -363,6 +362,7 @@ static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, in
static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len);
static void pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len);
static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len);
+static void pgstat_recv_resetsubcounter(PgStat_MsgResetsubcounter *msg, int len);
static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
@@ -380,8 +380,8 @@ static void pgstat_recv_connect(PgStat_MsgConnect *msg, int len);
static void pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len);
static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
-static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len);
-static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len);
+static void pgstat_recv_subscription_drop(PgStat_MsgSubscriptionDrop *msg, int len);
+static void pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int len);
/* ------------------------------------------------------------
* Public functions called from postmaster follow
@@ -1187,6 +1187,32 @@ pgstat_vacuum_stat(void)
}
}
+ /*
+ * Repeat the above steps for subscriptions, if subscription stats are
+ * being collected.
+ */
+ if (subscriptionStatHash)
+ {
+ PgStat_StatSubEntry *subentry;
+
+ /*
+ * Read pg_subscription and make a list of OIDs of all existing
+ * subscriptions.
+ */
+ htab = pgstat_collect_oids(SubscriptionRelationId, Anum_pg_subscription_oid);
+
+ hash_seq_init(&hstat, subscriptionStatHash);
+ while ((subentry = (PgStat_StatSubEntry *) hash_seq_search(&hstat)) != NULL)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ if (hash_search(htab, (void *) &(subentry->subid), HASH_FIND, NULL) == NULL)
+ pgstat_report_subscription_drop(subentry->subid);
+ }
+
+ hash_destroy(htab);
+ }
+
/*
* Lookup our own database entry; if not found, nothing more to do.
*/
@@ -1311,74 +1337,6 @@ pgstat_vacuum_stat(void)
hash_destroy(htab);
}
-
- /*
- * Repeat for subscription workers. Similarly, we needn't bother in the
- * common case where no subscription workers' stats are being collected.
- */
- if (dbentry->subworkers != NULL &&
- hash_get_num_entries(dbentry->subworkers) > 0)
- {
- PgStat_StatSubWorkerEntry *subwentry;
- PgStat_MsgSubscriptionPurge spmsg;
-
- /*
- * Read pg_subscription and make a list of OIDs of all existing
- * subscriptions
- */
- htab = pgstat_collect_oids(SubscriptionRelationId, Anum_pg_subscription_oid);
-
- spmsg.m_databaseid = MyDatabaseId;
- spmsg.m_nentries = 0;
-
- hash_seq_init(&hstat, dbentry->subworkers);
- while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL)
- {
- bool exists = false;
- Oid subid = subwentry->key.subid;
-
- CHECK_FOR_INTERRUPTS();
-
- if (hash_search(htab, (void *) &subid, HASH_FIND, NULL) != NULL)
- continue;
-
- /*
- * It is possible that we have multiple entries for the
- * subscription corresponding to apply worker and tablesync
- * workers. In such cases, we don't need to add the same subid
- * again.
- */
- for (int i = 0; i < spmsg.m_nentries; i++)
- {
- if (spmsg.m_subids[i] == subid)
- {
- exists = true;
- break;
- }
- }
-
- if (exists)
- continue;
-
- /* This subscription is dead, add the subid to the message */
- spmsg.m_subids[spmsg.m_nentries++] = subid;
-
- /*
- * If the message is full, send it out and reinitialize to empty
- */
- if (spmsg.m_nentries >= PGSTAT_NUM_SUBSCRIPTIONPURGE)
- {
- pgstat_send_subscription_purge(&spmsg);
- spmsg.m_nentries = 0;
- }
- }
-
- /* Send the rest of dead subscriptions */
- if (spmsg.m_nentries > 0)
- pgstat_send_subscription_purge(&spmsg);
-
- hash_destroy(htab);
- }
}
@@ -1551,8 +1509,7 @@ pgstat_reset_shared_counters(const char *target)
* ----------
*/
void
-pgstat_reset_single_counter(Oid objoid, Oid subobjoid,
- PgStat_Single_Reset_Type type)
+pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type)
{
PgStat_MsgResetsinglecounter msg;
@@ -1563,7 +1520,6 @@ pgstat_reset_single_counter(Oid objoid, Oid subobjoid,
msg.m_databaseid = MyDatabaseId;
msg.m_resettype = type;
msg.m_objectid = objoid;
- msg.m_subobjectid = subobjoid;
pgstat_send(&msg, sizeof(msg));
}
@@ -1623,6 +1579,30 @@ pgstat_reset_replslot_counter(const char *name)
pgstat_send(&msg, sizeof(msg));
}
+/* ----------
+ * pgstat_reset_subscription_counter() -
+ *
+ * Tell the statistics collector to reset a single subscription
+ * counter, or all subscription counters (when subid is InvalidOid).
+ *
+ * Permission checking for this function is managed through the normal
+ * GRANT system.
+ * ----------
+ */
+void
+pgstat_reset_subscription_counter(Oid subid)
+{
+ PgStat_MsgResetsubcounter msg;
+
+ if (pgStatSock == PGINVALID_SOCKET)
+ return;
+
+ msg.m_subid = subid;
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSUBCOUNTER);
+
+ pgstat_send(&msg, sizeof(msg));
+}
+
/* ----------
* pgstat_report_autovac() -
*
@@ -1949,31 +1929,20 @@ pgstat_report_replslot_drop(const char *slotname)
}
/* ----------
- * pgstat_report_subworker_error() -
+ * pgstat_report_subscription_error() -
*
- * Tell the collector about the subscription worker error.
+ * Tell the collector about the subscription error.
* ----------
*/
void
-pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
- LogicalRepMsgType command, TransactionId xid,
- const char *errmsg)
+pgstat_report_subscription_error(Oid subid, bool is_apply_error)
{
- PgStat_MsgSubWorkerError msg;
- int len;
+ PgStat_MsgSubscriptionError msg;
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERERROR);
- msg.m_databaseid = MyDatabaseId;
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONERROR);
msg.m_subid = subid;
- msg.m_subrelid = subrelid;
- msg.m_relid = relid;
- msg.m_command = command;
- msg.m_xid = xid;
- msg.m_timestamp = GetCurrentTimestamp();
- strlcpy(msg.m_message, errmsg, PGSTAT_SUBWORKERERROR_MSGLEN);
-
- len = offsetof(PgStat_MsgSubWorkerError, m_message) + strlen(msg.m_message) + 1;
- pgstat_send(&msg, len);
+ msg.m_is_apply_error = is_apply_error;
+ pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionError));
}
/* ----------
@@ -1985,12 +1954,11 @@ pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
void
pgstat_report_subscription_drop(Oid subid)
{
- PgStat_MsgSubscriptionPurge msg;
+ PgStat_MsgSubscriptionDrop msg;
- msg.m_databaseid = MyDatabaseId;
- msg.m_subids[0] = subid;
- msg.m_nentries = 1;
- pgstat_send_subscription_purge(&msg);
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONDROP);
+ msg.m_subid = subid;
+ pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionDrop));
}
/* ----------
@@ -2998,36 +2966,6 @@ pgstat_fetch_stat_funcentry(Oid func_id)
return funcentry;
}
-/*
- * ---------
- * pgstat_fetch_stat_subworker_entry() -
- *
- * Support function for the SQL-callable pgstat* functions. Returns
- * the collected statistics for subscription worker or NULL.
- * ---------
- */
-PgStat_StatSubWorkerEntry *
-pgstat_fetch_stat_subworker_entry(Oid subid, Oid subrelid)
-{
- PgStat_StatDBEntry *dbentry;
- PgStat_StatSubWorkerEntry *wentry = NULL;
-
- /* Load the stats file if needed */
- backend_read_statsfile();
-
- /*
- * Lookup our database, then find the requested subscription worker stats.
- */
- dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
- if (dbentry != NULL && dbentry->subworkers != NULL)
- {
- wentry = pgstat_get_subworker_entry(dbentry, subid, subrelid,
- false);
- }
-
- return wentry;
-}
-
/*
* ---------
* pgstat_fetch_stat_archiver() -
@@ -3140,6 +3078,23 @@ pgstat_fetch_replslot(NameData slotname)
return pgstat_get_replslot_entry(slotname, false);
}
+/*
+ * ---------
+ * pgstat_fetch_stat_subscription() -
+ *
+ * Support function for the SQL-callable pgstat* functions. Returns
+ * the collected statistics for one subscription or NULL.
+ * ---------
+ */
+PgStat_StatSubEntry *
+pgstat_fetch_stat_subscription(Oid subid)
+{
+ /* Load the stats file if needed */
+ backend_read_statsfile();
+
+ return pgstat_get_subscription_entry(subid, false);
+}
+
/*
* Shut down a single backend's statistics reporting at process exit.
*
@@ -3465,24 +3420,6 @@ pgstat_send_slru(void)
}
}
-/* --------
- * pgstat_send_subscription_purge() -
- *
- * Send a subscription purge message to the collector
- * --------
- */
-static void
-pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg)
-{
- int len;
-
- len = offsetof(PgStat_MsgSubscriptionPurge, m_subids[0])
- + msg->m_nentries * sizeof(Oid);
-
- pgstat_setheader(&msg->m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONPURGE);
- pgstat_send(msg, len);
-}
-
/* ----------
* PgstatCollectorMain() -
*
@@ -3668,6 +3605,10 @@ PgstatCollectorMain(int argc, char *argv[])
len);
break;
+ case PGSTAT_MTYPE_RESETSUBCOUNTER:
+ pgstat_recv_resetsubcounter(&msg.msg_resetsubcounter, len);
+ break;
+
case PGSTAT_MTYPE_AUTOVAC_START:
pgstat_recv_autovac(&msg.msg_autovacuum_start, len);
break;
@@ -3738,12 +3679,12 @@ PgstatCollectorMain(int argc, char *argv[])
pgstat_recv_disconnect(&msg.msg_disconnect, len);
break;
- case PGSTAT_MTYPE_SUBSCRIPTIONPURGE:
- pgstat_recv_subscription_purge(&msg.msg_subscriptionpurge, len);
+ case PGSTAT_MTYPE_SUBSCRIPTIONDROP:
+ pgstat_recv_subscription_drop(&msg.msg_subscriptiondrop, len);
break;
- case PGSTAT_MTYPE_SUBWORKERERROR:
- pgstat_recv_subworker_error(&msg.msg_subworkererror, len);
+ case PGSTAT_MTYPE_SUBSCRIPTIONERROR:
+ pgstat_recv_subscription_error(&msg.msg_subscriptionerror, len);
break;
default:
@@ -3791,8 +3732,7 @@ PgstatCollectorMain(int argc, char *argv[])
/*
* Subroutine to clear stats in a database entry
*
- * Tables, functions, and subscription workers hashes are initialized
- * to empty.
+ * Tables and functions hashes are initialized to empty.
*/
static void
reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
@@ -3845,13 +3785,6 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
PGSTAT_FUNCTION_HASH_SIZE,
&hash_ctl,
HASH_ELEM | HASH_BLOBS);
-
- hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey);
- hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry);
- dbentry->subworkers = hash_create("Per-database subscription worker",
- PGSTAT_SUBWORKER_HASH_SIZE,
- &hash_ctl,
- HASH_ELEM | HASH_BLOBS);
}
/*
@@ -3876,7 +3809,7 @@ pgstat_get_db_entry(Oid databaseid, bool create)
/*
* If not found, initialize the new one. This creates empty hash tables
- * for tables, functions, and subscription workers, too.
+ * for tables and functions, too.
*/
if (!found)
reset_dbentry_counters(result);
@@ -3934,48 +3867,6 @@ pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create)
return result;
}
-/* ----------
- * pgstat_get_subworker_entry
- *
- * Return subscription worker entry with the given subscription OID and
- * relation OID. If subrelid is InvalidOid, it returns an entry of the
- * apply worker otherwise returns an entry of the table sync worker
- * associated with subrelid. If no subscription worker entry exists,
- * initialize it, if the create parameter is true. Else, return NULL.
- * ----------
- */
-static PgStat_StatSubWorkerEntry *
-pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, Oid subid, Oid subrelid,
- bool create)
-{
- PgStat_StatSubWorkerEntry *subwentry;
- PgStat_StatSubWorkerKey key;
- bool found;
- HASHACTION action = (create ? HASH_ENTER : HASH_FIND);
-
- key.subid = subid;
- key.subrelid = subrelid;
- subwentry = (PgStat_StatSubWorkerEntry *) hash_search(dbentry->subworkers,
- (void *) &key,
- action, &found);
-
- if (!create && !found)
- return NULL;
-
- /* If not found, initialize the new one */
- if (!found)
- {
- subwentry->last_error_relid = InvalidOid;
- subwentry->last_error_command = 0;
- subwentry->last_error_xid = InvalidTransactionId;
- subwentry->last_error_count = 0;
- subwentry->last_error_time = 0;
- subwentry->last_error_message[0] = '\0';
- }
-
- return subwentry;
-}
-
/* ----------
* pgstat_write_statsfiles() -
* Write the global statistics file, as well as requested DB files.
@@ -4059,8 +3950,8 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
{
/*
- * Write out the table, function, and subscription-worker stats for
- * this DB into the appropriate per-DB stat file, if required.
+ * Write out the table and function stats for this DB into the
+ * appropriate per-DB stat file, if required.
*/
if (allDbs || pgstat_db_requested(dbentry->databaseid))
{
@@ -4095,6 +3986,22 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
}
}
+ /*
+ * Write subscription stats struct
+ */
+ if (subscriptionStatHash)
+ {
+ PgStat_StatSubEntry *subentry;
+
+ hash_seq_init(&hstat, subscriptionStatHash);
+ while ((subentry = (PgStat_StatSubEntry *) hash_seq_search(&hstat)) != NULL)
+ {
+ fputc('S', fpout);
+ rc = fwrite(subentry, sizeof(PgStat_StatSubEntry), 1, fpout);
+ (void) rc; /* we'll check for error with ferror */
+ }
+ }
+
/*
* No more output to be done. Close the temp file and replace the old
* pgstat.stat with it. The ferror() check replaces testing for error
@@ -4174,10 +4081,8 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
{
HASH_SEQ_STATUS tstat;
HASH_SEQ_STATUS fstat;
- HASH_SEQ_STATUS sstat;
PgStat_StatTabEntry *tabentry;
PgStat_StatFuncEntry *funcentry;
- PgStat_StatSubWorkerEntry *subwentry;
FILE *fpout;
int32 format_id;
Oid dbid = dbentry->databaseid;
@@ -4232,17 +4137,6 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
(void) rc; /* we'll check for error with ferror */
}
- /*
- * Walk through the database's subscription worker stats table.
- */
- hash_seq_init(&sstat, dbentry->subworkers);
- while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&sstat)) != NULL)
- {
- fputc('S', fpout);
- rc = fwrite(subwentry, sizeof(PgStat_StatSubWorkerEntry), 1, fpout);
- (void) rc; /* we'll check for error with ferror */
- }
-
/*
* No more output to be done. Close the temp file and replace the old
* pgstat.stat with it. The ferror() check replaces testing for error
@@ -4301,9 +4195,8 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
* files after reading; the in-memory status is now authoritative, and the
* files would be out of date in case somebody else reads them.
*
- * If a 'deep' read is requested, table/function/subscription-worker stats are
- * read, otherwise the table/function/subscription-worker hash tables remain
- * empty.
+ * If a 'deep' read is requested, table/function stats are read, otherwise
+ * the table/function hash tables remain empty.
* ----------
*/
static HTAB *
@@ -4482,7 +4375,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
dbentry->tables = NULL;
dbentry->functions = NULL;
- dbentry->subworkers = NULL;
/*
* In the collector, disregard the timestamp we read from the
@@ -4494,8 +4386,8 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
dbentry->stats_timestamp = 0;
/*
- * Don't create tables/functions/subworkers hashtables for
- * uninteresting databases.
+ * Don't create tables/functions hashtables for uninteresting
+ * databases.
*/
if (onlydb != InvalidOid)
{
@@ -4520,14 +4412,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
&hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
- hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey);
- hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry);
- hash_ctl.hcxt = pgStatLocalContext;
- dbentry->subworkers = hash_create("Per-database subscription worker",
- PGSTAT_SUBWORKER_HASH_SIZE,
- &hash_ctl,
- HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
/*
* If requested, read the data from the database-specific
* file. Otherwise we just leave the hashtables empty.
@@ -4536,7 +4420,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
pgstat_read_db_statsfile(dbentry->databaseid,
dbentry->tables,
dbentry->functions,
- dbentry->subworkers,
permanent);
break;
@@ -4580,6 +4463,45 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
break;
}
+ /*
+ * 'S' A PgStat_StatSubEntry struct describing subscription
+ * statistics.
+ */
+ case 'S':
+ {
+ PgStat_StatSubEntry subbuf;
+ PgStat_StatSubEntry *subentry;
+
+ if (fread(&subbuf, 1, sizeof(PgStat_StatSubEntry), fpin)
+ != sizeof(PgStat_StatSubEntry))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"",
+ statfile)));
+ goto done;
+ }
+
+ if (subscriptionStatHash == NULL)
+ {
+ HASHCTL hash_ctl;
+
+ hash_ctl.keysize = sizeof(Oid);
+ hash_ctl.entrysize = sizeof(PgStat_StatSubEntry);
+ hash_ctl.hcxt = pgStatLocalContext;
+ subscriptionStatHash = hash_create("Subscription hash",
+ PGSTAT_SUBSCRIPTION_HASH_SIZE,
+ &hash_ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ }
+
+ subentry = (PgStat_StatSubEntry *) hash_search(subscriptionStatHash,
+ (void *) &subbuf.subid,
+ HASH_ENTER, NULL);
+
+ memcpy(subentry, &subbuf, sizeof(subbuf));
+ break;
+ }
+
case 'E':
goto done;
@@ -4614,21 +4536,19 @@ done:
* As in pgstat_read_statsfiles, if the permanent file is requested, it is
* removed after reading.
*
- * Note: this code has the ability to skip storing per-table, per-function, or
- * per-subscription-worker data, if NULL is passed for the corresponding hashtable.
- * That's not used at the moment though.
+ * Note: this code has the ability to skip storing per-table or per-function
+ * data, if NULL is passed for the corresponding hashtable. That's not used
+ * at the moment though.
* ----------
*/
static void
pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
- HTAB *subworkerhash, bool permanent)
+ bool permanent)
{
PgStat_StatTabEntry *tabentry;
PgStat_StatTabEntry tabbuf;
PgStat_StatFuncEntry funcbuf;
PgStat_StatFuncEntry *funcentry;
- PgStat_StatSubWorkerEntry subwbuf;
- PgStat_StatSubWorkerEntry *subwentry;
FILE *fpin;
int32 format_id;
bool found;
@@ -4742,41 +4662,6 @@ pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
memcpy(funcentry, &funcbuf, sizeof(funcbuf));
break;
- /*
- * 'S' A PgStat_StatSubWorkerEntry struct describing
- * subscription worker statistics.
- */
- case 'S':
- if (fread(&subwbuf, 1, sizeof(PgStat_StatSubWorkerEntry),
- fpin) != sizeof(PgStat_StatSubWorkerEntry))
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"",
- statfile)));
- goto done;
- }
-
- /*
- * Skip if subscription worker data not wanted.
- */
- if (subworkerhash == NULL)
- break;
-
- subwentry = (PgStat_StatSubWorkerEntry *) hash_search(subworkerhash,
- (void *) &subwbuf.key,
- HASH_ENTER, &found);
-
- if (found)
- {
- ereport(pgStatRunningInCollector ? LOG : WARNING,
- (errmsg("corrupted statistics file \"%s\"",
- statfile)));
- goto done;
- }
-
- memcpy(subwentry, &subwbuf, sizeof(subwbuf));
- break;
-
/*
* 'E' The EOF marker of a complete stats file.
*/
@@ -4829,6 +4714,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
PgStat_WalStats myWalStats;
PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
PgStat_StatReplSlotEntry myReplSlotStats;
+ PgStat_StatSubEntry mySubStats;
FILE *fpin;
int32 format_id;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
@@ -4959,6 +4845,22 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
}
break;
+ /*
+ * 'S' A PgStat_StatSubEntry struct describing subscription
+ * statistics follows.
+ */
+ case 'S':
+ if (fread(&mySubStats, 1, sizeof(PgStat_StatSubEntry), fpin)
+ != sizeof(PgStat_StatSubEntry))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"",
+ statfile)));
+ FreeFile(fpin);
+ return false;
+ }
+ break;
+
case 'E':
goto done;
@@ -5164,6 +5066,7 @@ pgstat_clear_snapshot(void)
pgStatLocalContext = NULL;
pgStatDBHash = NULL;
replSlotStatHash = NULL;
+ subscriptionStatHash = NULL;
/*
* Historically the backend_status.c facilities lived in this file, and
@@ -5450,8 +5353,6 @@ pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
hash_destroy(dbentry->tables);
if (dbentry->functions != NULL)
hash_destroy(dbentry->functions);
- if (dbentry->subworkers != NULL)
- hash_destroy(dbentry->subworkers);
if (hash_search(pgStatDBHash,
(void *) &dbid,
@@ -5489,16 +5390,13 @@ pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
hash_destroy(dbentry->tables);
if (dbentry->functions != NULL)
hash_destroy(dbentry->functions);
- if (dbentry->subworkers != NULL)
- hash_destroy(dbentry->subworkers);
dbentry->tables = NULL;
dbentry->functions = NULL;
- dbentry->subworkers = NULL;
/*
* Reset database-level stats, too. This creates empty hash tables for
- * tables, functions, and subscription workers.
+ * tables and functions.
*/
reset_dbentry_counters(dbentry);
}
@@ -5567,14 +5465,6 @@ pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len)
else if (msg->m_resettype == RESET_FUNCTION)
(void) hash_search(dbentry->functions, (void *) &(msg->m_objectid),
HASH_REMOVE, NULL);
- else if (msg->m_resettype == RESET_SUBWORKER)
- {
- PgStat_StatSubWorkerKey key;
-
- key.subid = msg->m_objectid;
- key.subrelid = msg->m_subobjectid;
- (void) hash_search(dbentry->subworkers, (void *) &key, HASH_REMOVE, NULL);
- }
}
/* ----------
@@ -5645,6 +5535,51 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
}
}
+/* ----------
+ * pgstat_recv_resetsubcounter() -
+ *
+ * Reset some subscription statistics of the cluster.
+ * ----------
+ */
+static void
+pgstat_recv_resetsubcounter(PgStat_MsgResetsubcounter *msg, int len)
+{
+ PgStat_StatSubEntry *subentry;
+ TimestampTz ts;
+
+ /* Return if we don't have replication subscription statistics */
+ if (subscriptionStatHash == NULL)
+ return;
+
+ ts = GetCurrentTimestamp();
+ if (!OidIsValid(msg->m_subid))
+ {
+ HASH_SEQ_STATUS sstat;
+
+ /* Clear all subscription counters */
+ hash_seq_init(&sstat, subscriptionStatHash);
+ while ((subentry = (PgStat_StatSubEntry *) hash_seq_search(&sstat)) != NULL)
+ pgstat_reset_subscription(subentry, ts);
+ }
+ else
+ {
+ /* Get the subscription statistics to reset */
+ subentry = pgstat_get_subscription_entry(msg->m_subid, false);
+
+ /*
+ * Nothing to do if the given subscription entry is not found. This
+ * could happen when the subscription with the subid is removed and
+ * the corresponding statistics entry is also removed before receiving
+ * the reset message.
+ */
+ if (!subentry)
+ return;
+
+ /* Reset the stats for the requested subscription */
+ pgstat_reset_subscription(subentry, ts);
+ }
+}
+
/* ----------
* pgstat_recv_autovac() -
@@ -6118,81 +6053,42 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
}
/* ----------
- * pgstat_recv_subscription_purge() -
+ * pgstat_recv_subscription_drop() -
*
- * Process a SUBSCRIPTIONPURGE message.
+ * Process a SUBSCRIPTIONDROP message.
* ----------
*/
static void
-pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len)
+pgstat_recv_subscription_drop(PgStat_MsgSubscriptionDrop *msg, int len)
{
- HASH_SEQ_STATUS hstat;
- PgStat_StatDBEntry *dbentry;
- PgStat_StatSubWorkerEntry *subwentry;
-
- dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
-
- /* No need to purge if we don't even know the database */
- if (!dbentry || !dbentry->subworkers)
+ /* Return if we don't have replication subscription statistics */
+ if (subscriptionStatHash == NULL)
return;
- /* Remove all subscription worker statistics for the given subscriptions */
- hash_seq_init(&hstat, dbentry->subworkers);
- while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL)
- {
- for (int i = 0; i < msg->m_nentries; i++)
- {
- if (subwentry->key.subid == msg->m_subids[i])
- {
- (void) hash_search(dbentry->subworkers, (void *) &(subwentry->key),
- HASH_REMOVE, NULL);
- break;
- }
- }
- }
+ /* Remove from hashtable if present; we don't care if it's not */
+ (void) hash_search(subscriptionStatHash, (void *) &(msg->m_subid),
+ HASH_REMOVE, NULL);
}
/* ----------
- * pgstat_recv_subworker_error() -
+ * pgstat_recv_subscription_error() -
*
- * Process a SUBWORKERERROR message.
+ * Process a SUBSCRIPTIONERROR message.
* ----------
*/
static void
-pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len)
+pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int len)
{
- PgStat_StatDBEntry *dbentry;
- PgStat_StatSubWorkerEntry *subwentry;
+ PgStat_StatSubEntry *subentry;
- dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
+ /* Get the subscription stats */
+ subentry = pgstat_get_subscription_entry(msg->m_subid, true);
+ Assert(subentry);
- /* Get the subscription worker stats */
- subwentry = pgstat_get_subworker_entry(dbentry, msg->m_subid,
- msg->m_subrelid, true);
- Assert(subwentry);
-
- if (subwentry->last_error_relid == msg->m_relid &&
- subwentry->last_error_command == msg->m_command &&
- subwentry->last_error_xid == msg->m_xid &&
- strcmp(subwentry->last_error_message, msg->m_message) == 0)
- {
- /*
- * The same error occurred again in succession, just update its
- * timestamp and count.
- */
- subwentry->last_error_count++;
- subwentry->last_error_time = msg->m_timestamp;
- return;
- }
-
- /* Otherwise, update the error information */
- subwentry->last_error_relid = msg->m_relid;
- subwentry->last_error_command = msg->m_command;
- subwentry->last_error_xid = msg->m_xid;
- subwentry->last_error_count = 1;
- subwentry->last_error_time = msg->m_timestamp;
- strlcpy(subwentry->last_error_message, msg->m_message,
- PGSTAT_SUBWORKERERROR_MSGLEN);
+ if (msg->m_is_apply_error)
+ subentry->apply_error_count++;
+ else
+ subentry->sync_error_count++;
}
/* ----------
@@ -6313,6 +6209,68 @@ pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotent, TimestampTz ts)
slotent->stat_reset_timestamp = ts;
}
+/* ----------
+ * pgstat_get_subscription_entry
+ *
+ * Return the subscription statistics entry with the given subscription OID.
+ * If no subscription entry exists, initialize it, if the create parameter is
+ * true. Else, return NULL.
+ * ----------
+ */
+static PgStat_StatSubEntry *
+pgstat_get_subscription_entry(Oid subid, bool create)
+{
+ PgStat_StatSubEntry *subentry;
+ bool found;
+ HASHACTION action = (create ? HASH_ENTER : HASH_FIND);
+
+ if (subscriptionStatHash == NULL)
+ {
+ HASHCTL hash_ctl;
+
+ /*
+ * Quick return NULL if the hash table is empty and the caller didn't
+ * request to create the entry.
+ */
+ if (!create)
+ return NULL;
+
+ hash_ctl.keysize = sizeof(Oid);
+ hash_ctl.entrysize = sizeof(PgStat_StatSubEntry);
+ subscriptionStatHash = hash_create("Subscription hash",
+ PGSTAT_SUBSCRIPTION_HASH_SIZE,
+ &hash_ctl,
+ HASH_ELEM | HASH_BLOBS);
+ }
+
+ subentry = (PgStat_StatSubEntry *) hash_search(subscriptionStatHash,
+ (void *) &subid,
+ action, &found);
+
+ if (!create && !found)
+ return NULL;
+
+ /* If not found, initialize the new one */
+ if (!found)
+ pgstat_reset_subscription(subentry, 0);
+
+ return subentry;
+}
+
+/* ----------
+ * pgstat_reset_subscription
+ *
+ * Reset the given subscription stats.
+ * ----------
+ */
+static void
+pgstat_reset_subscription(PgStat_StatSubEntry *subentry, TimestampTz ts)
+{
+ subentry->apply_error_count = 0;
+ subentry->sync_error_count = 0;
+ subentry->stat_reset_timestamp = ts;
+}
+
/*
* pgstat_slru_index
*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5d9acc61733..7e267f79607 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3377,7 +3377,6 @@ void
ApplyWorkerMain(Datum main_arg)
{
int worker_slot = DatumGetInt32(main_arg);
- MemoryContext cctx = CurrentMemoryContext;
MemoryContext oldctx;
char originname[NAMEDATALEN];
XLogRecPtr origin_startpos;
@@ -3485,20 +3484,15 @@ ApplyWorkerMain(Datum main_arg)
}
PG_CATCH();
{
- MemoryContext ecxt = MemoryContextSwitchTo(cctx);
- ErrorData *errdata = CopyErrorData();
-
/*
- * Report the table sync error. There is no corresponding message
- * type for table synchronization.
+ * Abort the current transaction so that we send the stats message
+ * in an idle state.
*/
- pgstat_report_subworker_error(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relid,
- 0, /* message type */
- InvalidTransactionId,
- errdata->message);
- MemoryContextSwitchTo(ecxt);
+ AbortOutOfAnyTransaction();
+
+ /* Report the worker failed during table synchronization */
+ pgstat_report_subscription_error(MySubscription->oid, false);
+
PG_RE_THROW();
}
PG_END_TRY();
@@ -3625,22 +3619,14 @@ ApplyWorkerMain(Datum main_arg)
}
PG_CATCH();
{
- /* report the apply error */
- if (apply_error_callback_arg.command != 0)
- {
- MemoryContext ecxt = MemoryContextSwitchTo(cctx);
- ErrorData *errdata = CopyErrorData();
+ /*
+ * Abort the current transaction so that we send the stats message in
+ * an idle state.
+ */
+ AbortOutOfAnyTransaction();
- pgstat_report_subworker_error(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- apply_error_callback_arg.rel != NULL
- ? apply_error_callback_arg.rel->localreloid
- : InvalidOid,
- apply_error_callback_arg.command,
- apply_error_callback_arg.remote_xid,
- errdata->message);
- MemoryContextSwitchTo(ecxt);
- }
+ /* Report the worker failed while applying changes */
+ pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
PG_RE_THROW();
}
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 30e8dfa7c12..fd993d0d5fb 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2163,7 +2163,7 @@ pg_stat_reset_single_table_counters(PG_FUNCTION_ARGS)
{
Oid taboid = PG_GETARG_OID(0);
- pgstat_reset_single_counter(taboid, InvalidOid, RESET_TABLE);
+ pgstat_reset_single_counter(taboid, RESET_TABLE);
PG_RETURN_VOID();
}
@@ -2173,38 +2173,11 @@ pg_stat_reset_single_function_counters(PG_FUNCTION_ARGS)
{
Oid funcoid = PG_GETARG_OID(0);
- pgstat_reset_single_counter(funcoid, InvalidOid, RESET_FUNCTION);
+ pgstat_reset_single_counter(funcoid, RESET_FUNCTION);
PG_RETURN_VOID();
}
-Datum
-pg_stat_reset_subscription_worker_subrel(PG_FUNCTION_ARGS)
-{
- Oid subid = PG_GETARG_OID(0);
- Oid relid = PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
-
- pgstat_reset_single_counter(subid, relid, RESET_SUBWORKER);
-
- PG_RETURN_VOID();
-}
-
-/* Reset all subscription worker stats associated with the given subscription */
-Datum
-pg_stat_reset_subscription_worker_sub(PG_FUNCTION_ARGS)
-{
- Oid subid = PG_GETARG_OID(0);
-
- /*
- * Use subscription drop message to remove statistics of all subscription
- * workers.
- */
- pgstat_report_subscription_drop(subid);
-
- PG_RETURN_VOID();
-}
-
-
/* Reset SLRU counters (a specific one or all of them). */
Datum
pg_stat_reset_slru(PG_FUNCTION_ARGS)
@@ -2258,6 +2231,32 @@ pg_stat_reset_replication_slot(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
+/* Reset subscription stats (a specific one or all of them) */
+Datum
+pg_stat_reset_subscription_stats(PG_FUNCTION_ARGS)
+{
+ Oid subid;
+
+ if (PG_ARGISNULL(0))
+ {
+ /* Clear all subscription stats */
+ subid = InvalidOid;
+ }
+ else
+ {
+ subid = PG_GETARG_OID(0);
+
+ if (!OidIsValid(subid))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid subscription OID %u", subid)));
+ }
+
+ pgstat_reset_subscription_counter(subid);
+
+ PG_RETURN_VOID();
+}
+
Datum
pg_stat_get_archiver(PG_FUNCTION_ARGS)
{
@@ -2400,50 +2399,32 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
}
/*
- * Get the subscription worker statistics for the given subscription
- * (and relation).
+ * Get the subscription statistics for the given subscription. If the
+ * subscription statistics is not available, return all-zeros stats.
*/
Datum
-pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
+pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS 8
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 4
Oid subid = PG_GETARG_OID(0);
- Oid subrelid;
TupleDesc tupdesc;
- Datum values[PG_STAT_GET_SUBSCRIPTION_WORKER_COLS];
- bool nulls[PG_STAT_GET_SUBSCRIPTION_WORKER_COLS];
- PgStat_StatSubWorkerEntry *wentry;
- int i;
+ Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS];
+ bool nulls[PG_STAT_GET_SUBSCRIPTION_STATS_COLS];
+ PgStat_StatSubEntry *subentry;
+ PgStat_StatSubEntry allzero;
- if (PG_ARGISNULL(1))
- subrelid = InvalidOid;
- else
- subrelid = PG_GETARG_OID(1);
-
- /* Get subscription worker stats */
- wentry = pgstat_fetch_stat_subworker_entry(subid, subrelid);
-
- /* Return NULL if there is no worker statistics */
- if (wentry == NULL)
- PG_RETURN_NULL();
+ /* Get subscription stats */
+ subentry = pgstat_fetch_stat_subscription(subid);
/* Initialise attributes information in the tuple descriptor */
- tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_SUBSCRIPTION_WORKER_COLS);
+ tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_SUBSCRIPTION_STATS_COLS);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "subid",
OIDOID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 2, "subrelid",
- OIDOID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 3, "last_error_relid",
- OIDOID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 4, "last_error_command",
- TEXTOID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 5, "last_error_xid",
- XIDOID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 6, "last_error_count",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 2, "apply_error_count",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_error_message",
- TEXTOID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_time",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 4, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
@@ -2451,46 +2432,27 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
MemSet(values, 0, sizeof(values));
MemSet(nulls, 0, sizeof(nulls));
- i = 0;
+ if (!subentry)
+ {
+ /* If the subscription is not found, initialise its stats */
+ memset(&allzero, 0, sizeof(PgStat_StatSubEntry));
+ subentry = &allzero;
+ }
+
/* subid */
- values[i++] = ObjectIdGetDatum(subid);
+ values[0] = ObjectIdGetDatum(subid);
- /* subrelid */
- if (OidIsValid(subrelid))
- values[i++] = ObjectIdGetDatum(subrelid);
+ /* apply_error_count */
+ values[1] = Int64GetDatum(subentry->apply_error_count);
+
+ /* sync_error_count */
+ values[2] = Int64GetDatum(subentry->sync_error_count);
+
+ /* stats_reset */
+ if (subentry->stat_reset_timestamp == 0)
+ nulls[3] = true;
else
- nulls[i++] = true;
-
- /* last_error_relid */
- if (OidIsValid(wentry->last_error_relid))
- values[i++] = ObjectIdGetDatum(wentry->last_error_relid);
- else
- nulls[i++] = true;
-
- /* last_error_command */
- if (wentry->last_error_command != 0)
- values[i++] =
- CStringGetTextDatum(logicalrep_message_type(wentry->last_error_command));
- else
- nulls[i++] = true;
-
- /* last_error_xid */
- if (TransactionIdIsValid(wentry->last_error_xid))
- values[i++] = TransactionIdGetDatum(wentry->last_error_xid);
- else
- nulls[i++] = true;
-
- /* last_error_count */
- values[i++] = Int64GetDatum(wentry->last_error_count);
-
- /* last_error_message */
- values[i++] = CStringGetTextDatum(wentry->last_error_message);
-
- /* last_error_time */
- if (wentry->last_error_time != 0)
- values[i++] = TimestampTzGetDatum(wentry->last_error_time);
- else
- nulls[i++] = true;
+ values[3] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
/* Returns the record as Datum */
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 14194afe1cc..5cf18059b82 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202202251
+#define CATALOG_VERSION_NO 202203011
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 7de8cfc7e91..bf888581716 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5376,14 +5376,14 @@
proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
prosrc => 'pg_stat_get_replication_slot' },
-{ oid => '8523', descr => 'statistics: information about subscription worker',
- proname => 'pg_stat_get_subscription_worker', prorows => '1', proisstrict => 'f',
- proretset => 't', provolatile => 's', proparallel => 'r',
- prorettype => 'record', proargtypes => 'oid oid',
- proallargtypes => '{oid,oid,oid,oid,oid,text,xid,int8,text,timestamptz}',
- proargmodes => '{i,i,o,o,o,o,o,o,o,o}',
- proargnames => '{subid,subrelid,subid,subrelid,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,last_error_time}',
- prosrc => 'pg_stat_get_subscription_worker' },
+{ oid => '8523', descr => 'statistics: information about subscription stats',
+ proname => 'pg_stat_get_subscription_stats', proisstrict => 'f',
+ provolatile => 's', proparallel => 'r',
+ prorettype => 'record', proargtypes => 'oid',
+ proallargtypes => '{oid,oid,int8,int8,timestamptz}',
+ proargmodes => '{i,o,o,o,o}',
+ proargnames => '{subid,subid,apply_error_count,sync_error_count,stats_reset}',
+ prosrc => 'pg_stat_get_subscription_stats' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
@@ -5772,15 +5772,10 @@
provolatile => 'v', prorettype => 'void', proargtypes => 'text',
prosrc => 'pg_stat_reset_replication_slot' },
{ oid => '8524',
- descr => 'statistics: reset collected statistics for a single subscription worker',
- proname => 'pg_stat_reset_subscription_worker', proisstrict => 'f',
- provolatile => 'v', prorettype => 'void', proargtypes => 'oid oid',
- prosrc => 'pg_stat_reset_subscription_worker_subrel' },
-{ oid => '8525',
- descr => 'statistics: reset all collected statistics for a single subscription',
- proname => 'pg_stat_reset_subscription_worker',
+ descr => 'statistics: reset collected statistics for a single subscription',
+ proname => 'pg_stat_reset_subscription_stats', proisstrict => 'f',
provolatile => 'v', prorettype => 'void', proargtypes => 'oid',
- prosrc => 'pg_stat_reset_subscription_worker_sub' },
+ prosrc => 'pg_stat_reset_subscription_stats' },
{ oid => '3163', descr => 'current trigger depth',
proname => 'pg_trigger_depth', provolatile => 's', proparallel => 'r',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index e10d20222a2..be2f7e2bcc7 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -67,6 +67,7 @@ typedef enum StatMsgType
PGSTAT_MTYPE_RESETSINGLECOUNTER,
PGSTAT_MTYPE_RESETSLRUCOUNTER,
PGSTAT_MTYPE_RESETREPLSLOTCOUNTER,
+ PGSTAT_MTYPE_RESETSUBCOUNTER,
PGSTAT_MTYPE_AUTOVAC_START,
PGSTAT_MTYPE_VACUUM,
PGSTAT_MTYPE_ANALYZE,
@@ -84,8 +85,8 @@ typedef enum StatMsgType
PGSTAT_MTYPE_REPLSLOT,
PGSTAT_MTYPE_CONNECT,
PGSTAT_MTYPE_DISCONNECT,
- PGSTAT_MTYPE_SUBSCRIPTIONPURGE,
- PGSTAT_MTYPE_SUBWORKERERROR,
+ PGSTAT_MTYPE_SUBSCRIPTIONDROP,
+ PGSTAT_MTYPE_SUBSCRIPTIONERROR,
} StatMsgType;
/* ----------
@@ -148,8 +149,7 @@ typedef enum PgStat_Shared_Reset_Target
typedef enum PgStat_Single_Reset_Type
{
RESET_TABLE,
- RESET_FUNCTION,
- RESET_SUBWORKER
+ RESET_FUNCTION
} PgStat_Single_Reset_Type;
/* ------------------------------------------------------------
@@ -368,7 +368,6 @@ typedef struct PgStat_MsgResetsinglecounter
Oid m_databaseid;
PgStat_Single_Reset_Type m_resettype;
Oid m_objectid;
- Oid m_subobjectid;
} PgStat_MsgResetsinglecounter;
/* ----------
@@ -394,6 +393,19 @@ typedef struct PgStat_MsgResetreplslotcounter
bool clearall;
} PgStat_MsgResetreplslotcounter;
+/* ----------
+ * PgStat_MsgResetsubcounter Sent by the backend to tell the collector
+ * to reset subscription counter(s)
+ * ----------
+ */
+typedef struct PgStat_MsgResetsubcounter
+{
+ PgStat_MsgHdr m_hdr;
+ Oid m_subid; /* InvalidOid means reset all subscription
+ * stats */
+} PgStat_MsgResetsubcounter;
+
+
/* ----------
* PgStat_MsgAutovacStart Sent by the autovacuum daemon to signal
* that a database is going to be processed
@@ -542,53 +554,28 @@ typedef struct PgStat_MsgReplSlot
} PgStat_MsgReplSlot;
/* ----------
- * PgStat_MsgSubscriptionPurge Sent by the backend and autovacuum to tell the
- * collector about the dead subscriptions.
+ * PgStat_MsgSubscriptionDrop Sent by the backend and autovacuum to tell the
+ * collector about the dead subscription.
* ----------
*/
-#define PGSTAT_NUM_SUBSCRIPTIONPURGE \
- ((PGSTAT_MSG_PAYLOAD - sizeof(Oid) - sizeof(int)) / sizeof(Oid))
-
-typedef struct PgStat_MsgSubscriptionPurge
+typedef struct PgStat_MsgSubscriptionDrop
{
PgStat_MsgHdr m_hdr;
- Oid m_databaseid;
- int m_nentries;
- Oid m_subids[PGSTAT_NUM_SUBSCRIPTIONPURGE];
-} PgStat_MsgSubscriptionPurge;
+ Oid m_subid;
+} PgStat_MsgSubscriptionDrop;
/* ----------
- * PgStat_MsgSubWorkerError Sent by the apply worker or the table sync
- * worker to report the error occurred while
- * processing changes.
+ * PgStat_MsgSubscriptionError Sent by the apply worker or the table sync
+ * worker to report an error on the subscription.
* ----------
*/
-#define PGSTAT_SUBWORKERERROR_MSGLEN 256
-typedef struct PgStat_MsgSubWorkerError
+typedef struct PgStat_MsgSubscriptionError
{
PgStat_MsgHdr m_hdr;
- /*
- * m_subid and m_subrelid are used to determine the subscription and the
- * reporter of the error. m_subrelid is InvalidOid if reported by an apply
- * worker otherwise reported by a table sync worker.
- */
- Oid m_databaseid;
Oid m_subid;
- Oid m_subrelid;
-
- /*
- * Oid of the table that the reporter was actually processing. m_relid can
- * be InvalidOid if an error occurred during worker applying a
- * non-data-modification message such as RELATION.
- */
- Oid m_relid;
-
- LogicalRepMsgType m_command;
- TransactionId m_xid;
- TimestampTz m_timestamp;
- char m_message[PGSTAT_SUBWORKERERROR_MSGLEN];
-} PgStat_MsgSubWorkerError;
+ bool m_is_apply_error;
+} PgStat_MsgSubscriptionError;
/* ----------
* PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict
@@ -750,6 +737,7 @@ typedef union PgStat_Msg
PgStat_MsgResetsinglecounter msg_resetsinglecounter;
PgStat_MsgResetslrucounter msg_resetslrucounter;
PgStat_MsgResetreplslotcounter msg_resetreplslotcounter;
+ PgStat_MsgResetsubcounter msg_resetsubcounter;
PgStat_MsgAutovacStart msg_autovacuum_start;
PgStat_MsgVacuum msg_vacuum;
PgStat_MsgAnalyze msg_analyze;
@@ -767,8 +755,8 @@ typedef union PgStat_Msg
PgStat_MsgReplSlot msg_replslot;
PgStat_MsgConnect msg_connect;
PgStat_MsgDisconnect msg_disconnect;
- PgStat_MsgSubscriptionPurge msg_subscriptionpurge;
- PgStat_MsgSubWorkerError msg_subworkererror;
+ PgStat_MsgSubscriptionError msg_subscriptionerror;
+ PgStat_MsgSubscriptionDrop msg_subscriptiondrop;
} PgStat_Msg;
@@ -780,7 +768,7 @@ typedef union PgStat_Msg
* ------------------------------------------------------------
*/
-#define PGSTAT_FILE_FORMAT_ID 0x01A5BCA5
+#define PGSTAT_FILE_FORMAT_ID 0x01A5BCA6
/* ----------
* PgStat_StatDBEntry The collector's data per database
@@ -823,16 +811,11 @@ typedef struct PgStat_StatDBEntry
TimestampTz stats_timestamp; /* time of db stats file update */
/*
- * tables, functions, and subscription workers must be last in the struct,
- * because we don't write the pointers out to the stats file.
- *
- * subworkers is the hash table of PgStat_StatSubWorkerEntry which stores
- * statistics of logical replication workers: apply worker and table sync
- * worker.
+ * tables and functions must be last in the struct, because we don't write
+ * the pointers out to the stats file.
*/
HTAB *tables;
HTAB *functions;
- HTAB *subworkers;
} PgStat_StatDBEntry;
@@ -989,38 +972,17 @@ typedef struct PgStat_StatReplSlotEntry
TimestampTz stat_reset_timestamp;
} PgStat_StatReplSlotEntry;
-/* The lookup key for subscription worker hash table */
-typedef struct PgStat_StatSubWorkerKey
-{
- Oid subid;
-
- /*
- * Oid of the table for which tablesync worker will copy the initial data.
- * An InvalidOid will be assigned for apply workers.
- */
- Oid subrelid;
-} PgStat_StatSubWorkerKey;
-
/*
- * Logical replication apply worker and table sync worker statistics kept in the
- * stats collector.
+ * Subscription statistics kept in the stats collector.
*/
-typedef struct PgStat_StatSubWorkerEntry
+typedef struct PgStat_StatSubEntry
{
- PgStat_StatSubWorkerKey key; /* hash key (must be first) */
+ Oid subid; /* hash key (must be first) */
- /*
- * Subscription worker error statistics representing an error that
- * occurred during application of changes or the initial table
- * synchronization.
- */
- Oid last_error_relid;
- LogicalRepMsgType last_error_command;
- TransactionId last_error_xid;
- PgStat_Counter last_error_count;
- TimestampTz last_error_time;
- char last_error_message[PGSTAT_SUBWORKERERROR_MSGLEN];
-} PgStat_StatSubWorkerEntry;
+ PgStat_Counter apply_error_count;
+ PgStat_Counter sync_error_count;
+ TimestampTz stat_reset_timestamp;
+} PgStat_StatSubEntry;
/*
* Working state needed to accumulate per-function-call timing statistics.
@@ -1111,10 +1073,10 @@ extern void pgstat_drop_database(Oid databaseid);
extern void pgstat_clear_snapshot(void);
extern void pgstat_reset_counters(void);
extern void pgstat_reset_shared_counters(const char *);
-extern void pgstat_reset_single_counter(Oid objectid, Oid subobjectid,
- PgStat_Single_Reset_Type type);
+extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type);
extern void pgstat_reset_slru_counter(const char *);
extern void pgstat_reset_replslot_counter(const char *name);
+extern void pgstat_reset_subscription_counter(Oid subid);
extern void pgstat_report_connect(Oid dboid);
extern void pgstat_report_autovac(Oid dboid);
@@ -1131,9 +1093,7 @@ extern void pgstat_report_checksum_failure(void);
extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
extern void pgstat_report_replslot_create(const char *slotname);
extern void pgstat_report_replslot_drop(const char *slotname);
-extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
- LogicalRepMsgType command,
- TransactionId xid, const char *errmsg);
+extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error);
extern void pgstat_report_subscription_drop(Oid subid);
extern void pgstat_initialize(void);
@@ -1226,8 +1186,7 @@ extern void pgstat_send_wal(bool force);
extern PgStat_StatDBEntry *pgstat_fetch_stat_dbentry(Oid dbid);
extern PgStat_StatTabEntry *pgstat_fetch_stat_tabentry(Oid relid);
extern PgStat_StatFuncEntry *pgstat_fetch_stat_funcentry(Oid funcid);
-extern PgStat_StatSubWorkerEntry *pgstat_fetch_stat_subworker_entry(Oid subid,
- Oid subrelid);
+extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid);
extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
extern PgStat_BgWriterStats *pgstat_fetch_stat_bgwriter(void);
extern PgStat_CheckpointerStats *pgstat_fetch_stat_checkpointer(void);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 1420288d67b..ac468568a1a 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2072,24 +2072,13 @@ pg_stat_subscription| SELECT su.oid AS subid,
st.latest_end_time
FROM (pg_subscription su
LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
-pg_stat_subscription_workers| SELECT w.subid,
+pg_stat_subscription_stats| SELECT ss.subid,
s.subname,
- w.subrelid,
- w.last_error_relid,
- w.last_error_command,
- w.last_error_xid,
- w.last_error_count,
- w.last_error_message,
- w.last_error_time
- FROM ( SELECT pg_subscription.oid AS subid,
- NULL::oid AS relid
- FROM pg_subscription
- UNION ALL
- SELECT pg_subscription_rel.srsubid AS subid,
- pg_subscription_rel.srrelid AS relid
- FROM pg_subscription_rel) sr,
- (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time)
- JOIN pg_subscription s ON ((w.subid = s.oid)));
+ ss.apply_error_count,
+ ss.sync_error_count,
+ ss.stats_reset
+ FROM pg_subscription s,
+ LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, stats_reset);
pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
pg_stat_all_indexes.indexrelid,
pg_stat_all_indexes.schemaname,
diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl
new file mode 100644
index 00000000000..a42ea3170ec
--- /dev/null
+++ b/src/test/subscription/t/026_stats.pl
@@ -0,0 +1,102 @@
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Tests for subscription stats.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Create publisher node.
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node.
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Initial table setup on both publisher and subscriber. On subscriber we
+# create the same tables but with primary keys. Also, insert some data that
+# will conflict with the data replicated from publisher later.
+$node_publisher->safe_psql(
+ 'postgres',
+ qq[
+BEGIN;
+CREATE TABLE test_tab1 (a int);
+INSERT INTO test_tab1 VALUES (1);
+COMMIT;
+]);
+$node_subscriber->safe_psql(
+ 'postgres',
+ qq[
+BEGIN;
+CREATE TABLE test_tab1 (a int primary key);
+INSERT INTO test_tab1 VALUES (1);
+COMMIT;
+]);
+
+# Setup publication.
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR TABLE test_tab1;");
+
+# There shouldn't be any subscription errors before starting logical replication.
+my $result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(1) FROM pg_stat_subscription_stats");
+is($result, qq(0), 'check no subscription error');
+
+# Create subscription. The tablesync for test_tab1 on tap_sub will enter into
+# infinite error loop due to violating the unique constraint.
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub;"
+);
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Wait for the tablesync error to be reported.
+$node_subscriber->poll_query_until(
+ 'postgres',
+ qq[
+SELECT sync_error_count > 0
+FROM pg_stat_subscription_stats
+WHERE subname = 'tap_sub'
+]) or die "Timed out while waiting for tablesync error";
+
+# Truncate test_tab1 so that tablesync worker can continue.
+$node_subscriber->safe_psql('postgres', "TRUNCATE test_tab1;");
+
+# Wait for initial tablesync for test_tab1 to finish.
+$node_subscriber->poll_query_until(
+ 'postgres',
+ qq[
+SELECT count(1) = 1 FROM pg_subscription_rel
+WHERE srrelid = 'test_tab1'::regclass AND srsubstate in ('r', 's')
+]) or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check test_tab1 on the subscriber has one row.
+$result = $node_subscriber->safe_psql('postgres', "SELECT a FROM test_tab1");
+is($result, qq(1), 'check the table has now row');
+
+# Insert data to test_tab1 on the publisher, raising an error on the subscriber
+# due to violation of the unique constraint on test_tab1.
+$node_publisher->safe_psql('postgres', "INSERT INTO test_tab1 VALUES (1)");
+
+# Wait for the apply error to be reported.
+$node_subscriber->poll_query_until(
+ 'postgres',
+ qq[
+SELECT apply_error_count > 0
+FROM pg_stat_subscription_stats
+WHERE subname = 'tap_sub'
+]) or die "Timed out while waiting for apply error";
+
+# Truncate test_tab1 so that apply worker can continue.
+$node_subscriber->safe_psql('postgres', "TRUNCATE test_tab1;");
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
+
+done_testing();
diff --git a/src/test/subscription/t/026_worker_stats.pl b/src/test/subscription/t/026_worker_stats.pl
deleted file mode 100644
index f72e4766e83..00000000000
--- a/src/test/subscription/t/026_worker_stats.pl
+++ /dev/null
@@ -1,165 +0,0 @@
-
-# Copyright (c) 2021-2022, PostgreSQL Global Development Group
-
-# Tests for subscription error stats.
-use strict;
-use warnings;
-use PostgreSQL::Test::Cluster;
-use PostgreSQL::Test::Utils;
-use Test::More;
-
-# Test if the error reported on pg_stat_subscription_workers view is expected.
-sub test_subscription_error
-{
- my ($node, $relname, $command, $xid, $by_apply_worker, $errmsg_prefix, $msg)
- = @_;
-
- my $check_sql = qq[
-SELECT count(1) > 0
-FROM pg_stat_subscription_workers
-WHERE last_error_relid = '$relname'::regclass
- AND starts_with(last_error_message, '$errmsg_prefix')];
-
- # subrelid
- $check_sql .= $by_apply_worker
- ? qq[ AND subrelid IS NULL]
- : qq[ AND subrelid = '$relname'::regclass];
-
- # last_error_command
- $check_sql .= $command eq ''
- ? qq[ AND last_error_command IS NULL]
- : qq[ AND last_error_command = '$command'];
-
- # last_error_xid
- $check_sql .= $xid eq ''
- ? qq[ AND last_error_xid IS NULL]
- : qq[ AND last_error_xid = '$xid'::xid];
-
- # Wait for the particular error statistics to be reported.
- $node->poll_query_until('postgres', $check_sql,
-) or die "Timed out while waiting for " . $msg;
-}
-
-# Create publisher node.
-my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
-$node_publisher->init(allows_streaming => 'logical');
-$node_publisher->start;
-
-# Create subscriber node.
-my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
-$node_subscriber->init(allows_streaming => 'logical');
-
-# The subscriber will enter an infinite error loop, so we don't want
-# to overflow the server log with error messages.
-$node_subscriber->append_conf('postgresql.conf',
- qq[
-wal_retrieve_retry_interval = 2s
-]);
-$node_subscriber->start;
-
-# Initial table setup on both publisher and subscriber. On subscriber we
-# create the same tables but with primary keys. Also, insert some data that
-# will conflict with the data replicated from publisher later.
-$node_publisher->safe_psql(
- 'postgres',
- qq[
-BEGIN;
-CREATE TABLE test_tab1 (a int);
-CREATE TABLE test_tab2 (a int);
-INSERT INTO test_tab1 VALUES (1);
-INSERT INTO test_tab2 VALUES (1);
-COMMIT;
-]);
-$node_subscriber->safe_psql(
- 'postgres',
- qq[
-BEGIN;
-CREATE TABLE test_tab1 (a int primary key);
-CREATE TABLE test_tab2 (a int primary key);
-INSERT INTO test_tab2 VALUES (1);
-COMMIT;
-]);
-
-# Setup publications.
-my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
-$node_publisher->safe_psql(
- 'postgres',
- "CREATE PUBLICATION tap_pub FOR TABLE test_tab1, test_tab2;");
-
-# There shouldn't be any subscription errors before starting logical replication.
-my $result = $node_subscriber->safe_psql(
- 'postgres',
- "SELECT count(1) FROM pg_stat_subscription_workers");
-is($result, qq(0), 'check no subscription error');
-
-# Create subscription. The table sync for test_tab2 on tap_sub will enter into
-# infinite error loop due to violating the unique constraint.
-$node_subscriber->safe_psql(
- 'postgres',
- "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub;");
-
-$node_publisher->wait_for_catchup('tap_sub');
-
-# Wait for initial table sync for test_tab1 to finish.
-$node_subscriber->poll_query_until(
- 'postgres',
- qq[
-SELECT count(1) = 1 FROM pg_subscription_rel
-WHERE srrelid = 'test_tab1'::regclass AND srsubstate in ('r', 's')
-]) or die "Timed out while waiting for subscriber to synchronize data";
-
-# Check the initial data.
-$result = $node_subscriber->safe_psql(
- 'postgres',
- "SELECT count(a) FROM test_tab1");
-is($result, q(1), 'check initial data are copied to subscriber');
-
-# Insert more data to test_tab1, raising an error on the subscriber due to
-# violation of the unique constraint on test_tab1.
-my $xid = $node_publisher->safe_psql(
- 'postgres',
- qq[
-BEGIN;
-INSERT INTO test_tab1 VALUES (1);
-SELECT pg_current_xact_id()::xid;
-COMMIT;
-]);
-test_subscription_error($node_subscriber, 'test_tab1', 'INSERT', $xid,
- 1, # check apply worker error
- qq(duplicate key value violates unique constraint),
- 'error reported by the apply worker');
-
-# Check the table sync worker's error in the view.
-test_subscription_error($node_subscriber, 'test_tab2', '', '',
- 0, # check tablesync worker error
- qq(duplicate key value violates unique constraint),
- 'the error reported by the table sync worker');
-
-# Test for resetting subscription worker statistics.
-# Truncate test_tab1 and test_tab2 so that applying changes and table sync can
-# continue, respectively.
-$node_subscriber->safe_psql(
- 'postgres',
- "TRUNCATE test_tab1, test_tab2;");
-
-# Wait for the data to be replicated.
-$node_subscriber->poll_query_until(
- 'postgres',
- "SELECT count(1) > 0 FROM test_tab1");
-$node_subscriber->poll_query_until(
- 'postgres',
- "SELECT count(1) > 0 FROM test_tab2");
-
-# There shouldn't be any errors in the view after dropping the subscription.
-$node_subscriber->safe_psql(
- 'postgres',
- "DROP SUBSCRIPTION tap_sub;");
-$result = $node_subscriber->safe_psql(
- 'postgres',
- "SELECT count(1) FROM pg_stat_subscription_workers");
-is($result, q(0), 'no error after dropping subscription');
-
-$node_subscriber->stop('fast');
-$node_publisher->stop('fast');
-
-done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index c6b302c7b2b..d9b83f744fb 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1943,9 +1943,10 @@ PgStat_MsgResetreplslotcounter
PgStat_MsgResetsharedcounter
PgStat_MsgResetsinglecounter
PgStat_MsgResetslrucounter
+PgStat_MsgResetsubcounter
PgStat_MsgSLRU
-PgStat_MsgSubscriptionPurge
-PgStat_MsgSubWorkerError
+PgStat_MsgSubscriptionDrop
+PgStat_MsgSubscriptionError
PgStat_MsgTabpurge
PgStat_MsgTabstat
PgStat_MsgTempFile
@@ -1957,8 +1958,7 @@ PgStat_Single_Reset_Type
PgStat_StatDBEntry
PgStat_StatFuncEntry
PgStat_StatReplSlotEntry
-PgStat_StatSubWorkerEntry
-PgStat_StatSubWorkerKey
+PgStat_StatSubEntry
PgStat_StatTabEntry
PgStat_SubXactStatus
PgStat_TableCounts