1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-11 10:01:57 +03:00

Add a view to show the stats of subscription workers.

This commit adds a new system view, pg_stat_subscription_workers, that
shows information about any errors that occur while applying logical
replication changes or while performing initial table synchronization.
The subscription statistics entries are removed when the corresponding
subscription is removed.

It also adds an SQL function pg_stat_reset_subscription_worker() to reset
single subscription errors.

The contents of this view can be used by an upcoming patch that skips the
particular transaction that conflicts with the existing data on the
subscriber.

This view can be extended in the future to track other xact related
statistics like the number of xacts committed/aborted for subscription
workers.

Author: Masahiko Sawada
Reviewed-by: Greg Nancarrow, Hou Zhijie, Tang Haiying, Vignesh C, Dilip Kumar, Takamichi Osumi, Amit Kapila
Discussion: https://postgr.es/m/CAD21AoDeScrsHhLyEPYqN3sydg6PxAPVBboK=30xJfUVihNZDA@mail.gmail.com
This commit is contained in:
Amit Kapila
2021-11-30 08:54:30 +05:30
parent 98105e53e0
commit 8d74fc96db
13 changed files with 1069 additions and 27 deletions

View File

@ -14,6 +14,7 @@
#include "datatype/timestamp.h"
#include "portability/instr_time.h"
#include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */
#include "replication/logicalproto.h"
#include "utils/backend_progress.h" /* for backward compatibility */
#include "utils/backend_status.h" /* for backward compatibility */
#include "utils/hsearch.h"
@ -83,6 +84,8 @@ typedef enum StatMsgType
PGSTAT_MTYPE_REPLSLOT,
PGSTAT_MTYPE_CONNECT,
PGSTAT_MTYPE_DISCONNECT,
PGSTAT_MTYPE_SUBSCRIPTIONPURGE,
PGSTAT_MTYPE_SUBWORKERERROR,
} StatMsgType;
/* ----------
@ -145,7 +148,8 @@ typedef enum PgStat_Shared_Reset_Target
typedef enum PgStat_Single_Reset_Type
{
RESET_TABLE,
RESET_FUNCTION
RESET_FUNCTION,
RESET_SUBWORKER
} PgStat_Single_Reset_Type;
/* ------------------------------------------------------------
@ -364,6 +368,7 @@ typedef struct PgStat_MsgResetsinglecounter
Oid m_databaseid;
PgStat_Single_Reset_Type m_resettype;
Oid m_objectid;
Oid m_subobjectid;
} PgStat_MsgResetsinglecounter;
/* ----------
@ -536,6 +541,54 @@ typedef struct PgStat_MsgReplSlot
PgStat_Counter m_total_bytes;
} PgStat_MsgReplSlot;
/* ----------
* PgStat_MsgSubscriptionPurge Sent by the backend and autovacuum to tell the
* collector about the dead subscriptions.
*
* One message can hold at most PGSTAT_NUM_SUBSCRIPTIONPURGE entries in
* m_subids.
* ----------
*/
/*
* Capacity of m_subids: the payload space that remains after the
* m_databaseid (Oid) and m_nentries (int) fields, expressed in Oid-sized
* slots. PGSTAT_MSG_PAYLOAD is defined outside this excerpt.
*/
#define PGSTAT_NUM_SUBSCRIPTIONPURGE \
((PGSTAT_MSG_PAYLOAD - sizeof(Oid) - sizeof(int)) / sizeof(Oid))
typedef struct PgStat_MsgSubscriptionPurge
{
PgStat_MsgHdr m_hdr;
Oid m_databaseid;
/* NOTE(review): presumably the number of valid entries in m_subids -- confirm against the sender */
int m_nentries;
/* NOTE(review): presumably the OIDs of the dead subscriptions -- confirm */
Oid m_subids[PGSTAT_NUM_SUBSCRIPTIONPURGE];
} PgStat_MsgSubscriptionPurge;
/* ----------
* PgStat_MsgSubWorkerError Sent by the apply worker or the table sync
* worker to report an error that occurred while
* processing changes.
* ----------
*/
/*
* Size in bytes of the m_message buffer below; the same constant also sizes
* last_error_message in PgStat_StatSubWorkerEntry.
*/
#define PGSTAT_SUBWORKERERROR_MSGLEN 256
typedef struct PgStat_MsgSubWorkerError
{
PgStat_MsgHdr m_hdr;
/*
* m_subid and m_subrelid together identify the subscription and the worker
* that reported the error. m_subrelid is InvalidOid when the reporter is an
* apply worker; otherwise the reporter is a table sync worker.
*/
Oid m_databaseid;
Oid m_subid;
Oid m_subrelid;
/*
* Oid of the table that the reporter was actually processing. m_relid can
* be InvalidOid if the error occurred while the worker was applying a
* non-data-modification message such as RELATION.
*/
Oid m_relid;
/*
* NOTE(review): the three fields below presumably describe the logical
* replication message type being processed, the transaction involved, and
* the time of the error -- confirm against the sender.
*/
LogicalRepMsgType m_command;
TransactionId m_xid;
TimestampTz m_timestamp;
/*
* Error text, in a fixed-size buffer. NOTE(review): truncation and
* NUL-termination are up to the sender and not visible here -- confirm.
*/
char m_message[PGSTAT_SUBWORKERERROR_MSGLEN];
} PgStat_MsgSubWorkerError;
/* ----------
* PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict
@ -714,6 +767,8 @@ typedef union PgStat_Msg
PgStat_MsgReplSlot msg_replslot;
PgStat_MsgConnect msg_connect;
PgStat_MsgDisconnect msg_disconnect;
PgStat_MsgSubscriptionPurge msg_subscriptionpurge;
PgStat_MsgSubWorkerError msg_subworkererror;
} PgStat_Msg;
@ -725,7 +780,7 @@ typedef union PgStat_Msg
* ------------------------------------------------------------
*/
#define PGSTAT_FILE_FORMAT_ID 0x01A5BCA4
#define PGSTAT_FILE_FORMAT_ID 0x01A5BCA5
/* ----------
* PgStat_StatDBEntry The collector's data per database
@ -768,11 +823,16 @@ typedef struct PgStat_StatDBEntry
TimestampTz stats_timestamp; /* time of db stats file update */
/*
* tables and functions must be last in the struct, because we don't write
* the pointers out to the stats file.
* tables, functions, and subscription workers must be last in the struct,
* because we don't write the pointers out to the stats file.
*
* subworkers is the hash table of PgStat_StatSubWorkerEntry which stores
* statistics of logical replication workers: apply worker and table sync
* worker.
*/
HTAB *tables;
HTAB *functions;
HTAB *subworkers;
} PgStat_StatDBEntry;
@ -929,6 +989,38 @@ typedef struct PgStat_StatReplSlotEntry
TimestampTz stat_reset_timestamp;
} PgStat_StatReplSlotEntry;
/*
* The lookup key for subscription worker hash table.
*
* A worker is identified by the pair (subid, subrelid).
*/
typedef struct PgStat_StatSubWorkerKey
{
/* NOTE(review): presumably the OID of the subscription the worker belongs to -- confirm */
Oid subid;
/*
* Oid of the table for which tablesync worker will copy the initial data.
* An InvalidOid will be assigned for apply workers.
*/
Oid subrelid;
} PgStat_StatSubWorkerKey;
/*
* Logical replication apply worker and table sync worker statistics kept in the
* stats collector.
*
* One entry exists per PgStat_StatSubWorkerKey, i.e. per (subid, subrelid)
* pair.
*/
typedef struct PgStat_StatSubWorkerEntry
{
PgStat_StatSubWorkerKey key; /* hash key (must be first) */
/*
* Subscription worker error statistics representing an error that
* occurred during application of changes or the initial table
* synchronization.
*
* NOTE(review): these fields presumably mirror the m_relid, m_command,
* m_xid, m_timestamp and m_message fields of PgStat_MsgSubWorkerError;
* how last_error_count is maintained is not visible here -- confirm
* against the collector code.
*/
Oid last_error_relid;
LogicalRepMsgType last_error_command;
TransactionId last_error_xid;
PgStat_Counter last_error_count;
TimestampTz last_error_time;
/* Fixed-size buffer with the same length as the message's m_message field. */
char last_error_message[PGSTAT_SUBWORKERERROR_MSGLEN];
} PgStat_StatSubWorkerEntry;
/*
* Working state needed to accumulate per-function-call timing statistics.
@ -1019,7 +1111,8 @@ extern void pgstat_drop_database(Oid databaseid);
extern void pgstat_clear_snapshot(void);
extern void pgstat_reset_counters(void);
extern void pgstat_reset_shared_counters(const char *);
extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type);
extern void pgstat_reset_single_counter(Oid objectid, Oid subobjectid,
PgStat_Single_Reset_Type type);
extern void pgstat_reset_slru_counter(const char *);
extern void pgstat_reset_replslot_counter(const char *name);
@ -1038,6 +1131,10 @@ extern void pgstat_report_checksum_failure(void);
extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
extern void pgstat_report_replslot_create(const char *slotname);
extern void pgstat_report_replslot_drop(const char *slotname);
extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
LogicalRepMsgType command,
TransactionId xid, const char *errmsg);
extern void pgstat_report_subscription_drop(Oid subid);
extern void pgstat_initialize(void);
@ -1129,6 +1226,8 @@ extern void pgstat_send_wal(bool force);
extern PgStat_StatDBEntry *pgstat_fetch_stat_dbentry(Oid dbid);
extern PgStat_StatTabEntry *pgstat_fetch_stat_tabentry(Oid relid);
extern PgStat_StatFuncEntry *pgstat_fetch_stat_funcentry(Oid funcid);
extern PgStat_StatSubWorkerEntry *pgstat_fetch_stat_subworker_entry(Oid subid,
Oid subrelid);
extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
extern PgStat_BgWriterStats *pgstat_fetch_stat_bgwriter(void);
extern PgStat_CheckpointerStats *pgstat_fetch_stat_checkpointer(void);