mirror of
https://github.com/postgres/postgres.git
synced 2025-09-02 04:21:28 +03:00
pgstat: scaffolding for transactional stats creation / drop.
One problematic part of the current statistics collector design is that there is no reliable way of getting rid of statistics entries. Because of that pgstat_vacuum_stat() (called by [auto-]vacuum) matches all stats for the current database with the catalog contents and tries to drop now-superfluous entries. That's quite expensive. What's worse, it doesn't work on physical replicas, despite physical replicas collection statistics entries. This commit introduces infrastructure to create / drop statistics entries transactionally, together with the underlying catalog objects (functions, relations, subscriptions). pgstat_xact.c maintains a list of stats entries created / dropped transactionally in the current transaction. To ensure the removal of statistics entries is durable dropped statistics entries are included in commit / abort (and prepare) records, which also ensures that stats entries are dropped on standbys. Statistics entries created separately from creating the underlying catalog object (e.g. when stats were previously lost due to an immediate restart) are *not* WAL logged. However that can only happen outside of the transaction creating the catalog object, so it does not lead to "leaked" statistics entries. For this to work, functions creating / dropping functions / relations / subscriptions need to call into pgstat. For subscriptions this was already done when dropping subscriptions, via pgstat_report_subscription_drop() (now renamed to pgstat_drop_subscription()). This commit does not actually drop stats yet, it just provides the infrastructure. It is however a largely independent piece of infrastructure, so committing it separately makes sense. Bumps XLOG_PAGE_MAGIC. Author: Andres Freund <andres@anarazel.de> Reviewed-By: Thomas Munro <thomas.munro@gmail.com> Reviewed-By: Kyotaro Horiguchi <horikyota.ntt@gmail.com> Discussion: https://postgr.es/m/20220303021600.hs34ghqcw6zcokdh@alap3.anarazel.de
This commit is contained in:
@@ -84,6 +84,17 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
|
||||
data += xl_relfilenodes->nrels * sizeof(RelFileNode);
|
||||
}
|
||||
|
||||
if (parsed->xinfo & XACT_XINFO_HAS_DROPPED_STATS)
|
||||
{
|
||||
xl_xact_stats_items *xl_drops = (xl_xact_stats_items *) data;
|
||||
|
||||
parsed->nstats = xl_drops->nitems;
|
||||
parsed->stats = xl_drops->items;
|
||||
|
||||
data += MinSizeOfXactStatsItems;
|
||||
data += xl_drops->nitems * sizeof(xl_xact_stats_item);
|
||||
}
|
||||
|
||||
if (parsed->xinfo & XACT_XINFO_HAS_INVALS)
|
||||
{
|
||||
xl_xact_invals *xl_invals = (xl_xact_invals *) data;
|
||||
@@ -179,6 +190,17 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
|
||||
data += xl_relfilenodes->nrels * sizeof(RelFileNode);
|
||||
}
|
||||
|
||||
if (parsed->xinfo & XACT_XINFO_HAS_DROPPED_STATS)
|
||||
{
|
||||
xl_xact_stats_items *xl_drops = (xl_xact_stats_items *) data;
|
||||
|
||||
parsed->nstats = xl_drops->nitems;
|
||||
parsed->stats = xl_drops->items;
|
||||
|
||||
data += MinSizeOfXactStatsItems;
|
||||
data += xl_drops->nitems * sizeof(xl_xact_stats_item);
|
||||
}
|
||||
|
||||
if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
|
||||
{
|
||||
xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
|
||||
@@ -244,6 +266,12 @@ ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parsed_prepare *p
|
||||
parsed->abortnodes = (RelFileNode *) bufptr;
|
||||
bufptr += MAXALIGN(xlrec->nabortrels * sizeof(RelFileNode));
|
||||
|
||||
parsed->stats = (xl_xact_stats_item *) bufptr;
|
||||
bufptr += MAXALIGN(xlrec->ncommitstats * sizeof(xl_xact_stats_item));
|
||||
|
||||
parsed->abortstats = (xl_xact_stats_item *) bufptr;
|
||||
bufptr += MAXALIGN(xlrec->nabortstats * sizeof(xl_xact_stats_item));
|
||||
|
||||
parsed->msgs = (SharedInvalidationMessage *) bufptr;
|
||||
bufptr += MAXALIGN(xlrec->ninvalmsgs * sizeof(SharedInvalidationMessage));
|
||||
}
|
||||
@@ -280,6 +308,25 @@ xact_desc_subxacts(StringInfo buf, int nsubxacts, TransactionId *subxacts)
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
xact_desc_stats(StringInfo buf, const char *label,
|
||||
int ndropped, xl_xact_stats_item *dropped_stats)
|
||||
{
|
||||
int i;
|
||||
|
||||
if (ndropped > 0)
|
||||
{
|
||||
appendStringInfo(buf, "; %sdropped stats:", label);
|
||||
for (i = 0; i < ndropped; i++)
|
||||
{
|
||||
appendStringInfo(buf, " %u/%u/%u",
|
||||
dropped_stats[i].kind,
|
||||
dropped_stats[i].dboid,
|
||||
dropped_stats[i].objoid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id)
|
||||
{
|
||||
@@ -295,6 +342,7 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId
|
||||
|
||||
xact_desc_relations(buf, "rels", parsed.nrels, parsed.xnodes);
|
||||
xact_desc_subxacts(buf, parsed.nsubxacts, parsed.subxacts);
|
||||
xact_desc_stats(buf, "", parsed.nstats, parsed.stats);
|
||||
|
||||
standby_desc_invalidations(buf, parsed.nmsgs, parsed.msgs, parsed.dbId,
|
||||
parsed.tsId,
|
||||
@@ -338,6 +386,8 @@ xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec, RepOriginId or
|
||||
LSN_FORMAT_ARGS(parsed.origin_lsn),
|
||||
timestamptz_to_str(parsed.origin_timestamp));
|
||||
}
|
||||
|
||||
xact_desc_stats(buf, "", parsed.nstats, parsed.stats);
|
||||
}
|
||||
|
||||
static void
|
||||
@@ -353,6 +403,8 @@ xact_desc_prepare(StringInfo buf, uint8 info, xl_xact_prepare *xlrec, RepOriginI
|
||||
xact_desc_relations(buf, "rels(commit)", parsed.nrels, parsed.xnodes);
|
||||
xact_desc_relations(buf, "rels(abort)", parsed.nabortrels,
|
||||
parsed.abortnodes);
|
||||
xact_desc_stats(buf, "commit ", parsed.nstats, parsed.stats);
|
||||
xact_desc_stats(buf, "abort ", parsed.nabortstats, parsed.abortstats);
|
||||
xact_desc_subxacts(buf, parsed.nsubxacts, parsed.subxacts);
|
||||
|
||||
standby_desc_invalidations(buf, parsed.nmsgs, parsed.msgs, parsed.dbId,
|
||||
|
@@ -205,6 +205,8 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
|
||||
TransactionId *children,
|
||||
int nrels,
|
||||
RelFileNode *rels,
|
||||
int nstats,
|
||||
xl_xact_stats_item *stats,
|
||||
int ninvalmsgs,
|
||||
SharedInvalidationMessage *invalmsgs,
|
||||
bool initfileinval,
|
||||
@@ -214,6 +216,8 @@ static void RecordTransactionAbortPrepared(TransactionId xid,
|
||||
TransactionId *children,
|
||||
int nrels,
|
||||
RelFileNode *rels,
|
||||
int nstats,
|
||||
xl_xact_stats_item *stats,
|
||||
const char *gid);
|
||||
static void ProcessRecords(char *bufptr, TransactionId xid,
|
||||
const TwoPhaseCallback callbacks[]);
|
||||
@@ -1046,6 +1050,8 @@ StartPrepare(GlobalTransaction gxact)
|
||||
TransactionId *children;
|
||||
RelFileNode *commitrels;
|
||||
RelFileNode *abortrels;
|
||||
xl_xact_stats_item *abortstats = NULL;
|
||||
xl_xact_stats_item *commitstats = NULL;
|
||||
SharedInvalidationMessage *invalmsgs;
|
||||
|
||||
/* Initialize linked list */
|
||||
@@ -1071,6 +1077,10 @@ StartPrepare(GlobalTransaction gxact)
|
||||
hdr.nsubxacts = xactGetCommittedChildren(&children);
|
||||
hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
|
||||
hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
|
||||
hdr.ncommitstats =
|
||||
pgstat_get_transactional_drops(true, &commitstats);
|
||||
hdr.nabortstats =
|
||||
pgstat_get_transactional_drops(false, &abortstats);
|
||||
hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
|
||||
&hdr.initfileinval);
|
||||
hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */
|
||||
@@ -1101,6 +1111,18 @@ StartPrepare(GlobalTransaction gxact)
|
||||
save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
|
||||
pfree(abortrels);
|
||||
}
|
||||
if (hdr.ncommitstats > 0)
|
||||
{
|
||||
save_state_data(commitstats,
|
||||
hdr.ncommitstats * sizeof(xl_xact_stats_item));
|
||||
pfree(commitstats);
|
||||
}
|
||||
if (hdr.nabortstats > 0)
|
||||
{
|
||||
save_state_data(abortstats,
|
||||
hdr.nabortstats * sizeof(xl_xact_stats_item));
|
||||
pfree(abortstats);
|
||||
}
|
||||
if (hdr.ninvalmsgs > 0)
|
||||
{
|
||||
save_state_data(invalmsgs,
|
||||
@@ -1472,6 +1494,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
|
||||
RelFileNode *abortrels;
|
||||
RelFileNode *delrels;
|
||||
int ndelrels;
|
||||
xl_xact_stats_item *commitstats;
|
||||
xl_xact_stats_item *abortstats;
|
||||
SharedInvalidationMessage *invalmsgs;
|
||||
|
||||
/*
|
||||
@@ -1506,6 +1530,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
|
||||
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
|
||||
abortrels = (RelFileNode *) bufptr;
|
||||
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
|
||||
commitstats = (xl_xact_stats_item*) bufptr;
|
||||
bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
|
||||
abortstats = (xl_xact_stats_item*) bufptr;
|
||||
bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
|
||||
invalmsgs = (SharedInvalidationMessage *) bufptr;
|
||||
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
|
||||
|
||||
@@ -1527,12 +1555,16 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
|
||||
RecordTransactionCommitPrepared(xid,
|
||||
hdr->nsubxacts, children,
|
||||
hdr->ncommitrels, commitrels,
|
||||
hdr->ncommitstats,
|
||||
commitstats,
|
||||
hdr->ninvalmsgs, invalmsgs,
|
||||
hdr->initfileinval, gid);
|
||||
else
|
||||
RecordTransactionAbortPrepared(xid,
|
||||
hdr->nsubxacts, children,
|
||||
hdr->nabortrels, abortrels,
|
||||
hdr->nabortstats,
|
||||
abortstats,
|
||||
gid);
|
||||
|
||||
ProcArrayRemove(proc, latestXid);
|
||||
@@ -1568,6 +1600,11 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
|
||||
/* Make sure files supposed to be dropped are dropped */
|
||||
DropRelationFiles(delrels, ndelrels, false);
|
||||
|
||||
if (isCommit)
|
||||
pgstat_execute_transactional_drops(hdr->ncommitstats, commitstats, false);
|
||||
else
|
||||
pgstat_execute_transactional_drops(hdr->nabortstats, abortstats, false);
|
||||
|
||||
/*
|
||||
* Handle cache invalidation messages.
|
||||
*
|
||||
@@ -2066,6 +2103,8 @@ RecoverPreparedTransactions(void)
|
||||
bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
|
||||
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
|
||||
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
|
||||
bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
|
||||
bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
|
||||
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
|
||||
|
||||
/*
|
||||
@@ -2248,6 +2287,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
|
||||
TransactionId *children,
|
||||
int nrels,
|
||||
RelFileNode *rels,
|
||||
int nstats,
|
||||
xl_xact_stats_item *stats,
|
||||
int ninvalmsgs,
|
||||
SharedInvalidationMessage *invalmsgs,
|
||||
bool initfileinval,
|
||||
@@ -2277,6 +2318,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
|
||||
*/
|
||||
recptr = XactLogCommitRecord(committs,
|
||||
nchildren, children, nrels, rels,
|
||||
nstats, stats,
|
||||
ninvalmsgs, invalmsgs,
|
||||
initfileinval,
|
||||
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
|
||||
@@ -2343,6 +2385,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
|
||||
TransactionId *children,
|
||||
int nrels,
|
||||
RelFileNode *rels,
|
||||
int nstats,
|
||||
xl_xact_stats_item *stats,
|
||||
const char *gid)
|
||||
{
|
||||
XLogRecPtr recptr;
|
||||
@@ -2373,6 +2417,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
|
||||
recptr = XactLogAbortRecord(GetCurrentTimestamp(),
|
||||
nchildren, children,
|
||||
nrels, rels,
|
||||
nstats, stats,
|
||||
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
|
||||
xid, gid);
|
||||
|
||||
|
@@ -1285,6 +1285,8 @@ RecordTransactionCommit(void)
|
||||
RelFileNode *rels;
|
||||
int nchildren;
|
||||
TransactionId *children;
|
||||
int ndroppedstats = 0;
|
||||
xl_xact_stats_item *droppedstats = NULL;
|
||||
int nmsgs = 0;
|
||||
SharedInvalidationMessage *invalMessages = NULL;
|
||||
bool RelcacheInitFileInval = false;
|
||||
@@ -1303,6 +1305,7 @@ RecordTransactionCommit(void)
|
||||
/* Get data needed for commit record */
|
||||
nrels = smgrGetPendingDeletes(true, &rels);
|
||||
nchildren = xactGetCommittedChildren(&children);
|
||||
ndroppedstats = pgstat_get_transactional_drops(true, &droppedstats);
|
||||
if (XLogStandbyInfoActive())
|
||||
nmsgs = xactGetCommittedInvalidationMessages(&invalMessages,
|
||||
&RelcacheInitFileInval);
|
||||
@@ -1317,10 +1320,12 @@ RecordTransactionCommit(void)
|
||||
/*
|
||||
* We expect that every RelationDropStorage is followed by a catalog
|
||||
* update, and hence XID assignment, so we shouldn't get here with any
|
||||
* pending deletes. Use a real test not just an Assert to check this,
|
||||
* since it's a bit fragile.
|
||||
* pending deletes. Same is true for dropping stats.
|
||||
*
|
||||
* Use a real test not just an Assert to check this, since it's a bit
|
||||
* fragile.
|
||||
*/
|
||||
if (nrels != 0)
|
||||
if (nrels != 0 || ndroppedstats != 0)
|
||||
elog(ERROR, "cannot commit a transaction that deleted files but has no xid");
|
||||
|
||||
/* Can't have child XIDs either; AssignTransactionId enforces this */
|
||||
@@ -1395,6 +1400,7 @@ RecordTransactionCommit(void)
|
||||
|
||||
XactLogCommitRecord(xactStopTimestamp,
|
||||
nchildren, children, nrels, rels,
|
||||
ndroppedstats, droppedstats,
|
||||
nmsgs, invalMessages,
|
||||
RelcacheInitFileInval,
|
||||
MyXactFlags,
|
||||
@@ -1518,6 +1524,8 @@ cleanup:
|
||||
/* Clean up local data */
|
||||
if (rels)
|
||||
pfree(rels);
|
||||
if (ndroppedstats)
|
||||
pfree(droppedstats);
|
||||
|
||||
return latestXid;
|
||||
}
|
||||
@@ -1698,6 +1706,8 @@ RecordTransactionAbort(bool isSubXact)
|
||||
TransactionId latestXid;
|
||||
int nrels;
|
||||
RelFileNode *rels;
|
||||
int ndroppedstats = 0;
|
||||
xl_xact_stats_item *droppedstats = NULL;
|
||||
int nchildren;
|
||||
TransactionId *children;
|
||||
TimestampTz xact_time;
|
||||
@@ -1734,6 +1744,7 @@ RecordTransactionAbort(bool isSubXact)
|
||||
/* Fetch the data we need for the abort record */
|
||||
nrels = smgrGetPendingDeletes(false, &rels);
|
||||
nchildren = xactGetCommittedChildren(&children);
|
||||
ndroppedstats = pgstat_get_transactional_drops(false, &droppedstats);
|
||||
|
||||
/* XXX do we really need a critical section here? */
|
||||
START_CRIT_SECTION();
|
||||
@@ -1750,6 +1761,7 @@ RecordTransactionAbort(bool isSubXact)
|
||||
XactLogAbortRecord(xact_time,
|
||||
nchildren, children,
|
||||
nrels, rels,
|
||||
ndroppedstats, droppedstats,
|
||||
MyXactFlags, InvalidTransactionId,
|
||||
NULL);
|
||||
|
||||
@@ -1796,6 +1808,8 @@ RecordTransactionAbort(bool isSubXact)
|
||||
/* And clean up local data */
|
||||
if (rels)
|
||||
pfree(rels);
|
||||
if (ndroppedstats)
|
||||
pfree(droppedstats);
|
||||
|
||||
return latestXid;
|
||||
}
|
||||
@@ -5573,6 +5587,7 @@ XLogRecPtr
|
||||
XactLogCommitRecord(TimestampTz commit_time,
|
||||
int nsubxacts, TransactionId *subxacts,
|
||||
int nrels, RelFileNode *rels,
|
||||
int ndroppedstats, xl_xact_stats_item *droppedstats,
|
||||
int nmsgs, SharedInvalidationMessage *msgs,
|
||||
bool relcacheInval,
|
||||
int xactflags, TransactionId twophase_xid,
|
||||
@@ -5583,6 +5598,7 @@ XactLogCommitRecord(TimestampTz commit_time,
|
||||
xl_xact_dbinfo xl_dbinfo;
|
||||
xl_xact_subxacts xl_subxacts;
|
||||
xl_xact_relfilenodes xl_relfilenodes;
|
||||
xl_xact_stats_items xl_dropped_stats;
|
||||
xl_xact_invals xl_invals;
|
||||
xl_xact_twophase xl_twophase;
|
||||
xl_xact_origin xl_origin;
|
||||
@@ -5640,6 +5656,12 @@ XactLogCommitRecord(TimestampTz commit_time,
|
||||
info |= XLR_SPECIAL_REL_UPDATE;
|
||||
}
|
||||
|
||||
if (ndroppedstats > 0)
|
||||
{
|
||||
xl_xinfo.xinfo |= XACT_XINFO_HAS_DROPPED_STATS;
|
||||
xl_dropped_stats.nitems = ndroppedstats;
|
||||
}
|
||||
|
||||
if (nmsgs > 0)
|
||||
{
|
||||
xl_xinfo.xinfo |= XACT_XINFO_HAS_INVALS;
|
||||
@@ -5696,6 +5718,14 @@ XactLogCommitRecord(TimestampTz commit_time,
|
||||
nrels * sizeof(RelFileNode));
|
||||
}
|
||||
|
||||
if (xl_xinfo.xinfo & XACT_XINFO_HAS_DROPPED_STATS)
|
||||
{
|
||||
XLogRegisterData((char *) (&xl_dropped_stats),
|
||||
MinSizeOfXactStatsItems);
|
||||
XLogRegisterData((char *) droppedstats,
|
||||
ndroppedstats * sizeof(xl_xact_stats_item));
|
||||
}
|
||||
|
||||
if (xl_xinfo.xinfo & XACT_XINFO_HAS_INVALS)
|
||||
{
|
||||
XLogRegisterData((char *) (&xl_invals), MinSizeOfXactInvals);
|
||||
@@ -5729,6 +5759,7 @@ XLogRecPtr
|
||||
XactLogAbortRecord(TimestampTz abort_time,
|
||||
int nsubxacts, TransactionId *subxacts,
|
||||
int nrels, RelFileNode *rels,
|
||||
int ndroppedstats, xl_xact_stats_item *droppedstats,
|
||||
int xactflags, TransactionId twophase_xid,
|
||||
const char *twophase_gid)
|
||||
{
|
||||
@@ -5736,6 +5767,7 @@ XactLogAbortRecord(TimestampTz abort_time,
|
||||
xl_xact_xinfo xl_xinfo;
|
||||
xl_xact_subxacts xl_subxacts;
|
||||
xl_xact_relfilenodes xl_relfilenodes;
|
||||
xl_xact_stats_items xl_dropped_stats;
|
||||
xl_xact_twophase xl_twophase;
|
||||
xl_xact_dbinfo xl_dbinfo;
|
||||
xl_xact_origin xl_origin;
|
||||
@@ -5773,6 +5805,12 @@ XactLogAbortRecord(TimestampTz abort_time,
|
||||
info |= XLR_SPECIAL_REL_UPDATE;
|
||||
}
|
||||
|
||||
if (ndroppedstats > 0)
|
||||
{
|
||||
xl_xinfo.xinfo |= XACT_XINFO_HAS_DROPPED_STATS;
|
||||
xl_dropped_stats.nitems = ndroppedstats;
|
||||
}
|
||||
|
||||
if (TransactionIdIsValid(twophase_xid))
|
||||
{
|
||||
xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
|
||||
@@ -5834,6 +5872,14 @@ XactLogAbortRecord(TimestampTz abort_time,
|
||||
nrels * sizeof(RelFileNode));
|
||||
}
|
||||
|
||||
if (xl_xinfo.xinfo & XACT_XINFO_HAS_DROPPED_STATS)
|
||||
{
|
||||
XLogRegisterData((char *) (&xl_dropped_stats),
|
||||
MinSizeOfXactStatsItems);
|
||||
XLogRegisterData((char *) droppedstats,
|
||||
ndroppedstats * sizeof(xl_xact_stats_item));
|
||||
}
|
||||
|
||||
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
|
||||
{
|
||||
XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
|
||||
@@ -5967,6 +6013,14 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
|
||||
DropRelationFiles(parsed->xnodes, parsed->nrels, true);
|
||||
}
|
||||
|
||||
if (parsed->nstats > 0)
|
||||
{
|
||||
/* see equivalent call for relations above */
|
||||
XLogFlush(lsn);
|
||||
|
||||
pgstat_execute_transactional_drops(parsed->nstats, parsed->stats, true);
|
||||
}
|
||||
|
||||
/*
|
||||
* We issue an XLogFlush() for the same reason we emit ForceSyncCommit()
|
||||
* in normal operation. For example, in CREATE DATABASE, we copy all files
|
||||
@@ -6069,6 +6123,14 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid,
|
||||
|
||||
DropRelationFiles(parsed->xnodes, parsed->nrels, true);
|
||||
}
|
||||
|
||||
if (parsed->nstats > 0)
|
||||
{
|
||||
/* see equivalent call for relations above */
|
||||
XLogFlush(lsn);
|
||||
|
||||
pgstat_execute_transactional_drops(parsed->nstats, parsed->stats, true);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
|
@@ -64,6 +64,7 @@
|
||||
#include "parser/parse_relation.h"
|
||||
#include "parser/parsetree.h"
|
||||
#include "partitioning/partdesc.h"
|
||||
#include "pgstat.h"
|
||||
#include "storage/lmgr.h"
|
||||
#include "storage/predicate.h"
|
||||
#include "utils/builtins.h"
|
||||
@@ -1475,6 +1476,9 @@ heap_create_with_catalog(const char *relname,
|
||||
if (oncommit != ONCOMMIT_NOOP)
|
||||
register_on_commit_action(relid, oncommit);
|
||||
|
||||
/* ensure that stats are dropped if transaction aborts */
|
||||
pgstat_create_relation(new_rel_desc);
|
||||
|
||||
/*
|
||||
* ok, the relation has been cataloged, so close our relations and return
|
||||
* the OID of the newly created relation.
|
||||
@@ -1851,6 +1855,9 @@ heap_drop_with_catalog(Oid relid)
|
||||
if (RELKIND_HAS_STORAGE(rel->rd_rel->relkind))
|
||||
RelationDropStorage(rel);
|
||||
|
||||
/* ensure that stats are dropped if transaction commits */
|
||||
pgstat_drop_relation(rel);
|
||||
|
||||
/*
|
||||
* Close relcache entry, but *keep* AccessExclusiveLock on the relation
|
||||
* until transaction commit. This ensures no one else will try to do
|
||||
|
@@ -35,6 +35,7 @@
|
||||
#include "parser/analyze.h"
|
||||
#include "parser/parse_coerce.h"
|
||||
#include "parser/parse_type.h"
|
||||
#include "pgstat.h"
|
||||
#include "rewrite/rewriteHandler.h"
|
||||
#include "tcop/pquery.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
@@ -709,6 +710,10 @@ ProcedureCreate(const char *procedureName,
|
||||
AtEOXact_GUC(true, save_nestlevel);
|
||||
}
|
||||
|
||||
/* ensure that stats are dropped if transaction commits */
|
||||
if (!is_update)
|
||||
pgstat_create_function(retval);
|
||||
|
||||
return myself;
|
||||
}
|
||||
|
||||
|
@@ -1325,6 +1325,8 @@ RemoveFunctionById(Oid funcOid)
|
||||
|
||||
table_close(relation, RowExclusiveLock);
|
||||
|
||||
pgstat_drop_function(funcOid);
|
||||
|
||||
/*
|
||||
* If there's a pg_aggregate tuple, delete that too.
|
||||
*/
|
||||
|
@@ -738,6 +738,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
|
||||
|
||||
table_close(rel, RowExclusiveLock);
|
||||
|
||||
pgstat_create_subscription(subid);
|
||||
|
||||
if (opts.enabled)
|
||||
ApplyLauncherWakeupAtCommit();
|
||||
|
||||
@@ -1592,7 +1594,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
|
||||
* gets lost.
|
||||
*/
|
||||
if (slotname)
|
||||
pgstat_report_subscription_drop(subid);
|
||||
pgstat_drop_subscription(subid);
|
||||
|
||||
table_close(rel, NoLock);
|
||||
}
|
||||
|
@@ -886,7 +886,7 @@ pgstat_vacuum_stat(void)
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
if (hash_search(htab, (void *) &(subentry->subid), HASH_FIND, NULL) == NULL)
|
||||
pgstat_report_subscription_drop(subentry->subid);
|
||||
pgstat_drop_subscription(subentry->subid);
|
||||
}
|
||||
|
||||
hash_destroy(htab);
|
||||
|
@@ -415,11 +415,6 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo)
|
||||
smgrsw[which].smgr_close(rels[i], forknum);
|
||||
}
|
||||
|
||||
/*
|
||||
* It'd be nice to tell the stats collector to forget them immediately,
|
||||
* too. But we can't because we don't know the OIDs.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Send a shared-inval message to force other backends to close any
|
||||
* dangling smgr references they may have for these rels. We should do
|
||||
|
@@ -48,6 +48,28 @@ static HTAB *pgStatFunctions = NULL;
|
||||
static instr_time total_func_time;
|
||||
|
||||
|
||||
/*
|
||||
* Ensure that stats are dropped if transaction aborts.
|
||||
*/
|
||||
void
|
||||
pgstat_create_function(Oid proid)
|
||||
{
|
||||
pgstat_create_transactional(PGSTAT_KIND_FUNCTION,
|
||||
MyDatabaseId,
|
||||
proid);
|
||||
}
|
||||
|
||||
/*
|
||||
* Ensure that stats are dropped if transaction commits.
|
||||
*/
|
||||
void
|
||||
pgstat_drop_function(Oid proid)
|
||||
{
|
||||
pgstat_drop_transactional(PGSTAT_KIND_FUNCTION,
|
||||
MyDatabaseId,
|
||||
proid);
|
||||
}
|
||||
|
||||
/*
|
||||
* Initialize function call usage data.
|
||||
* Called by the executor before invoking a function.
|
||||
|
@@ -171,33 +171,26 @@ pgstat_relation_init(Relation rel)
|
||||
}
|
||||
|
||||
/*
|
||||
* Tell the collector that we just dropped a relation.
|
||||
* (If the message gets lost, we will still clean the dead entry eventually
|
||||
* via future invocations of pgstat_vacuum_stat().)
|
||||
*
|
||||
* Currently not used for lack of any good place to call it; we rely
|
||||
* entirely on pgstat_vacuum_stat() to clean out stats for dead rels.
|
||||
* Ensure that stats are dropped if transaction aborts.
|
||||
*/
|
||||
#ifdef NOT_USED
|
||||
void
|
||||
pgstat_drop_relation(Oid relid)
|
||||
pgstat_create_relation(Relation rel)
|
||||
{
|
||||
PgStat_MsgTabpurge msg;
|
||||
int len;
|
||||
|
||||
if (pgStatSock == PGINVALID_SOCKET)
|
||||
return;
|
||||
|
||||
msg.m_tableid[0] = relid;
|
||||
msg.m_nentries = 1;
|
||||
|
||||
len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) + sizeof(Oid);
|
||||
|
||||
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
|
||||
msg.m_databaseid = MyDatabaseId;
|
||||
pgstat_send(&msg, len);
|
||||
pgstat_create_transactional(PGSTAT_KIND_RELATION,
|
||||
rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId,
|
||||
RelationGetRelid(rel));
|
||||
}
|
||||
|
||||
/*
|
||||
* Ensure that stats are dropped if transaction commits.
|
||||
*/
|
||||
void
|
||||
pgstat_drop_relation(Relation rel)
|
||||
{
|
||||
pgstat_drop_transactional(PGSTAT_KIND_RELATION,
|
||||
rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId,
|
||||
RelationGetRelid(rel));
|
||||
}
|
||||
#endif /* NOT_USED */
|
||||
|
||||
/*
|
||||
* Report that the table was just vacuumed.
|
||||
|
@@ -35,14 +35,31 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error)
|
||||
}
|
||||
|
||||
/*
|
||||
* Report dropping the subscription.
|
||||
* Report creating the subscription.
|
||||
*
|
||||
* Ensures that stats are dropped if transaction rolls back.
|
||||
*/
|
||||
void
|
||||
pgstat_report_subscription_drop(Oid subid)
|
||||
pgstat_create_subscription(Oid subid)
|
||||
{
|
||||
pgstat_create_transactional(PGSTAT_KIND_SUBSCRIPTION,
|
||||
InvalidOid, subid);
|
||||
}
|
||||
|
||||
/*
|
||||
* Report dropping the subscription.
|
||||
*
|
||||
* Ensures that stats are dropped if transaction commits.
|
||||
*/
|
||||
void
|
||||
pgstat_drop_subscription(Oid subid)
|
||||
{
|
||||
PgStat_MsgSubscriptionDrop msg;
|
||||
|
||||
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONDROP);
|
||||
msg.m_subid = subid;
|
||||
pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionDrop));
|
||||
|
||||
pgstat_drop_transactional(PGSTAT_KIND_SUBSCRIPTION,
|
||||
InvalidOid, subid);
|
||||
}
|
||||
|
@@ -19,6 +19,18 @@
|
||||
#include "utils/pgstat_internal.h"
|
||||
|
||||
|
||||
typedef struct PgStat_PendingDroppedStatsItem
|
||||
{
|
||||
xl_xact_stats_item item;
|
||||
bool is_create;
|
||||
dlist_node node;
|
||||
} PgStat_PendingDroppedStatsItem;
|
||||
|
||||
|
||||
static void AtEOXact_PgStat_DroppedStats(PgStat_SubXactStatus *xact_state, bool isCommit);
|
||||
static void AtEOSubXact_PgStat_DroppedStats(PgStat_SubXactStatus *xact_state,
|
||||
bool isCommit, int nestDepth);
|
||||
|
||||
static PgStat_SubXactStatus *pgStatXactStack = NULL;
|
||||
|
||||
|
||||
@@ -40,6 +52,7 @@ AtEOXact_PgStat(bool isCommit, bool parallel)
|
||||
Assert(xact_state->prev == NULL);
|
||||
|
||||
AtEOXact_PgStat_Relations(xact_state, isCommit);
|
||||
AtEOXact_PgStat_DroppedStats(xact_state, isCommit);
|
||||
}
|
||||
pgStatXactStack = NULL;
|
||||
|
||||
@@ -47,6 +60,49 @@ AtEOXact_PgStat(bool isCommit, bool parallel)
|
||||
pgstat_clear_snapshot();
|
||||
}
|
||||
|
||||
/*
|
||||
* When committing, drop stats for objects dropped in the transaction. When
|
||||
* aborting, drop stats for objects created in the transaction.
|
||||
*/
|
||||
static void
|
||||
AtEOXact_PgStat_DroppedStats(PgStat_SubXactStatus *xact_state, bool isCommit)
|
||||
{
|
||||
dlist_mutable_iter iter;
|
||||
|
||||
if (xact_state->pending_drops_count == 0)
|
||||
{
|
||||
Assert(dlist_is_empty(&xact_state->pending_drops));
|
||||
return;
|
||||
}
|
||||
|
||||
dlist_foreach_modify(iter, &xact_state->pending_drops)
|
||||
{
|
||||
PgStat_PendingDroppedStatsItem *pending =
|
||||
dlist_container(PgStat_PendingDroppedStatsItem, node, iter.cur);
|
||||
|
||||
if (isCommit && !pending->is_create)
|
||||
{
|
||||
/*
|
||||
* Transaction that dropped an object committed. Drop the stats
|
||||
* too.
|
||||
*/
|
||||
/* will do work in subsequent commit */
|
||||
}
|
||||
else if (!isCommit && pending->is_create)
|
||||
{
|
||||
/*
|
||||
* Transaction that created an object aborted. Drop the stats
|
||||
* associated with the object.
|
||||
*/
|
||||
/* will do work in subsequent commit */
|
||||
}
|
||||
|
||||
dlist_delete(&pending->node);
|
||||
xact_state->pending_drops_count--;
|
||||
pfree(pending);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Called from access/transam/xact.c at subtransaction commit/abort.
|
||||
*/
|
||||
@@ -64,11 +120,63 @@ AtEOSubXact_PgStat(bool isCommit, int nestDepth)
|
||||
pgStatXactStack = xact_state->prev;
|
||||
|
||||
AtEOSubXact_PgStat_Relations(xact_state, isCommit, nestDepth);
|
||||
AtEOSubXact_PgStat_DroppedStats(xact_state, isCommit, nestDepth);
|
||||
|
||||
pfree(xact_state);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Like AtEOXact_PgStat_DroppedStats(), but for subtransactions.
|
||||
*/
|
||||
static void
|
||||
AtEOSubXact_PgStat_DroppedStats(PgStat_SubXactStatus *xact_state,
|
||||
bool isCommit, int nestDepth)
|
||||
{
|
||||
PgStat_SubXactStatus *parent_xact_state;
|
||||
dlist_mutable_iter iter;
|
||||
|
||||
if (xact_state->pending_drops_count == 0)
|
||||
return;
|
||||
|
||||
parent_xact_state = pgstat_xact_stack_level_get(nestDepth - 1);
|
||||
|
||||
dlist_foreach_modify(iter, &xact_state->pending_drops)
|
||||
{
|
||||
PgStat_PendingDroppedStatsItem *pending =
|
||||
dlist_container(PgStat_PendingDroppedStatsItem, node, iter.cur);
|
||||
|
||||
dlist_delete(&pending->node);
|
||||
xact_state->pending_drops_count--;
|
||||
|
||||
if (!isCommit && pending->is_create)
|
||||
{
|
||||
/*
|
||||
* Subtransaction creating a new stats object aborted. Drop the
|
||||
* stats object.
|
||||
*/
|
||||
/* will do work in subsequent commit */
|
||||
pfree(pending);
|
||||
}
|
||||
else if (isCommit)
|
||||
{
|
||||
/*
|
||||
* Subtransaction dropping a stats object committed. Can't yet
|
||||
* remove the stats object, the surrounding transaction might
|
||||
* still abort. Pass it on to the parent.
|
||||
*/
|
||||
dlist_push_tail(&parent_xact_state->pending_drops, &pending->node);
|
||||
parent_xact_state->pending_drops_count++;
|
||||
}
|
||||
else
|
||||
{
|
||||
pfree(pending);
|
||||
}
|
||||
}
|
||||
|
||||
Assert(xact_state->pending_drops_count == 0);
|
||||
}
|
||||
|
||||
/*
|
||||
* Save the transactional stats state at 2PC transaction prepare.
|
||||
*/
|
||||
@@ -130,6 +238,8 @@ pgstat_xact_stack_level_get(int nest_level)
|
||||
xact_state = (PgStat_SubXactStatus *)
|
||||
MemoryContextAlloc(TopTransactionContext,
|
||||
sizeof(PgStat_SubXactStatus));
|
||||
dlist_init(&xact_state->pending_drops);
|
||||
xact_state->pending_drops_count = 0;
|
||||
xact_state->nest_level = nest_level;
|
||||
xact_state->prev = pgStatXactStack;
|
||||
xact_state->first = NULL;
|
||||
@@ -137,3 +247,116 @@ pgstat_xact_stack_level_get(int nest_level)
|
||||
}
|
||||
return xact_state;
|
||||
}
|
||||
|
||||
/*
|
||||
* Get stat items that need to be dropped at commit / abort.
|
||||
*
|
||||
* When committing, stats for objects that have been dropped in the
|
||||
* transaction are returned. When aborting, stats for newly created objects are
|
||||
* returned.
|
||||
*
|
||||
* Used by COMMIT / ABORT and 2PC PREPARE processing when building their
|
||||
* respective WAL records, to ensure stats are dropped in case of a crash / on
|
||||
* standbys.
|
||||
*
|
||||
* The list of items is allocated in CurrentMemoryContext and must be freed by
|
||||
* the caller (directly or via memory context reset).
|
||||
*/
|
||||
int
|
||||
pgstat_get_transactional_drops(bool isCommit, xl_xact_stats_item **items)
|
||||
{
|
||||
PgStat_SubXactStatus *xact_state = pgStatXactStack;
|
||||
int nitems = 0;
|
||||
dlist_iter iter;
|
||||
|
||||
if (xact_state == NULL)
|
||||
return 0;
|
||||
|
||||
/*
|
||||
* We expect to be called for subtransaction abort (which logs a WAL
|
||||
* record), but not for subtransaction commit (which doesn't).
|
||||
*/
|
||||
Assert(!isCommit || xact_state->nest_level == 1);
|
||||
Assert(!isCommit || xact_state->prev == NULL);
|
||||
|
||||
*items = palloc(xact_state->pending_drops_count
|
||||
* sizeof(xl_xact_stats_item));
|
||||
|
||||
dlist_foreach(iter, &xact_state->pending_drops)
|
||||
{
|
||||
PgStat_PendingDroppedStatsItem *pending =
|
||||
dlist_container(PgStat_PendingDroppedStatsItem, node, iter.cur);
|
||||
|
||||
if (isCommit && pending->is_create)
|
||||
continue;
|
||||
if (!isCommit && !pending->is_create)
|
||||
continue;
|
||||
|
||||
Assert(nitems < xact_state->pending_drops_count);
|
||||
(*items)[nitems++] = pending->item;
|
||||
}
|
||||
|
||||
return nitems;
|
||||
}
|
||||
|
||||
/*
|
||||
* Execute scheduled drops post-commit. Called from xact_redo_commit() /
|
||||
* xact_redo_abort() during recovery, and from FinishPreparedTransaction()
|
||||
* during normal 2PC COMMIT/ABORT PREPARED processing.
|
||||
*/
|
||||
void
|
||||
pgstat_execute_transactional_drops(int ndrops, struct xl_xact_stats_item *items, bool is_redo)
|
||||
{
|
||||
if (ndrops == 0)
|
||||
return;
|
||||
|
||||
for (int i = 0; i < ndrops; i++)
|
||||
{
|
||||
/* will do work in subsequent commit */
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
create_drop_transactional_internal(PgStat_Kind kind, Oid dboid, Oid objoid, bool is_create)
|
||||
{
|
||||
int nest_level = GetCurrentTransactionNestLevel();
|
||||
PgStat_SubXactStatus *xact_state;
|
||||
PgStat_PendingDroppedStatsItem *drop = (PgStat_PendingDroppedStatsItem *)
|
||||
MemoryContextAlloc(TopTransactionContext, sizeof(PgStat_PendingDroppedStatsItem));
|
||||
|
||||
xact_state = pgstat_xact_stack_level_get(nest_level);
|
||||
|
||||
drop->is_create = is_create;
|
||||
drop->item.kind = kind;
|
||||
drop->item.dboid = dboid;
|
||||
drop->item.objoid = objoid;
|
||||
|
||||
dlist_push_tail(&xact_state->pending_drops, &drop->node);
|
||||
xact_state->pending_drops_count++;
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a stats entry for a newly created database object in a transactional
|
||||
* manner.
|
||||
*
|
||||
* I.e. if the current (sub-)transaction aborts, the stats entry will also be
|
||||
* dropped.
|
||||
*/
|
||||
void
|
||||
pgstat_create_transactional(PgStat_Kind kind, Oid dboid, Oid objoid)
|
||||
{
|
||||
create_drop_transactional_internal(kind, dboid, objoid, /* create */ true);
|
||||
}
|
||||
|
||||
/*
|
||||
* Drop a stats entry for a just dropped database object in a transactional
|
||||
* manner.
|
||||
*
|
||||
* I.e. if the current (sub-)transaction aborts, the stats entry will stay
|
||||
* alive.
|
||||
*/
|
||||
void
|
||||
pgstat_drop_transactional(PgStat_Kind kind, Oid dboid, Oid objoid)
|
||||
{
|
||||
create_drop_transactional_internal(kind, dboid, objoid, /* create */ false);
|
||||
}
|
||||
|
Reference in New Issue
Block a user