mirror of
https://github.com/postgres/postgres.git
synced 2025-08-30 06:01:21 +03:00
Add infrastructure to track WAL usage.
This allows gathering the WAL generation statistics for each statement execution. The three statistics that we collect are the number of WAL records, the number of full page writes and the amount of WAL bytes generated. This helps the users who have write-intensive workloads to see the impact of I/O due to WAL. This further enables us to see approximately what percentage of overall WAL is due to full page writes. In the future, we can extend this functionality to allow us to compute the exact amount of WAL data due to full page writes. This patch in itself is just an infrastructure to compute WAL usage data. The upcoming patches will expose this data via explain, auto_explain, pg_stat_statements and verbose (auto)vacuum output. Author: Kirill Bychik, Julien Rouhaud Reviewed-by: Dilip Kumar, Fujii Masao and Amit Kapila Discussion: https://postgr.es/m/CAB-hujrP8ZfUkvL5OYETipQwA=e3n7oqHFU=4ZLxWS_Cza3kQQ@mail.gmail.com
This commit is contained in:
@@ -139,6 +139,7 @@
|
||||
#define PARALLEL_VACUUM_KEY_DEAD_TUPLES 2
|
||||
#define PARALLEL_VACUUM_KEY_QUERY_TEXT 3
|
||||
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 4
|
||||
#define PARALLEL_VACUUM_KEY_WAL_USAGE 5
|
||||
|
||||
/*
|
||||
* Macro to check if we are in a parallel vacuum. If true, we are in the
|
||||
@@ -275,6 +276,9 @@ typedef struct LVParallelState
|
||||
/* Points to buffer usage area in DSM */
|
||||
BufferUsage *buffer_usage;
|
||||
|
||||
/* Points to WAL usage area in DSM */
|
||||
WalUsage *wal_usage;
|
||||
|
||||
/*
|
||||
* The number of indexes that support parallel index bulk-deletion and
|
||||
* parallel index cleanup respectively.
|
||||
@@ -2143,8 +2147,8 @@ lazy_parallel_vacuum_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
|
||||
vacrelstats->dead_tuples, nindexes, vacrelstats);
|
||||
|
||||
/*
|
||||
* Next, accumulate buffer usage. (This must wait for the workers to
|
||||
* finish, or we might get incomplete data.)
|
||||
* Next, accumulate buffer and WAL usage. (This must wait for the workers
|
||||
* to finish, or we might get incomplete data.)
|
||||
*/
|
||||
if (nworkers > 0)
|
||||
{
|
||||
@@ -2154,7 +2158,7 @@ lazy_parallel_vacuum_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
|
||||
WaitForParallelWorkersToFinish(lps->pcxt);
|
||||
|
||||
for (i = 0; i < lps->pcxt->nworkers_launched; i++)
|
||||
InstrAccumParallelQuery(&lps->buffer_usage[i]);
|
||||
InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -3171,6 +3175,7 @@ begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
|
||||
LVShared *shared;
|
||||
LVDeadTuples *dead_tuples;
|
||||
BufferUsage *buffer_usage;
|
||||
WalUsage *wal_usage;
|
||||
bool *can_parallel_vacuum;
|
||||
long maxtuples;
|
||||
char *sharedquery;
|
||||
@@ -3255,15 +3260,19 @@ begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
|
||||
shm_toc_estimate_keys(&pcxt->estimator, 1);
|
||||
|
||||
/*
|
||||
* Estimate space for BufferUsage -- PARALLEL_VACUUM_KEY_BUFFER_USAGE.
|
||||
* Estimate space for BufferUsage and WalUsage --
|
||||
* PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
|
||||
*
|
||||
* If there are no extensions loaded that care, we could skip this. We
|
||||
* have no way of knowing whether anyone's looking at pgBufferUsage, so do
|
||||
* it unconditionally.
|
||||
* have no way of knowing whether anyone's looking at pgBufferUsage or
|
||||
* pgWalUsage, so do it unconditionally.
|
||||
*/
|
||||
shm_toc_estimate_chunk(&pcxt->estimator,
|
||||
mul_size(sizeof(BufferUsage), pcxt->nworkers));
|
||||
shm_toc_estimate_keys(&pcxt->estimator, 1);
|
||||
shm_toc_estimate_chunk(&pcxt->estimator,
|
||||
mul_size(sizeof(WalUsage), pcxt->nworkers));
|
||||
shm_toc_estimate_keys(&pcxt->estimator, 1);
|
||||
|
||||
/* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
|
||||
querylen = strlen(debug_query_string);
|
||||
@@ -3299,11 +3308,18 @@ begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
|
||||
shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, dead_tuples);
|
||||
vacrelstats->dead_tuples = dead_tuples;
|
||||
|
||||
/* Allocate space for each worker's BufferUsage; no need to initialize */
|
||||
/*
|
||||
* Allocate space for each worker's BufferUsage and WalUsage; no need to
|
||||
* initialize
|
||||
*/
|
||||
buffer_usage = shm_toc_allocate(pcxt->toc,
|
||||
mul_size(sizeof(BufferUsage), pcxt->nworkers));
|
||||
shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
|
||||
lps->buffer_usage = buffer_usage;
|
||||
wal_usage = shm_toc_allocate(pcxt->toc,
|
||||
mul_size(sizeof(WalUsage), pcxt->nworkers));
|
||||
shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
|
||||
lps->wal_usage = wal_usage;
|
||||
|
||||
/* Store query string for workers */
|
||||
sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
|
||||
@@ -3435,6 +3451,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
|
||||
LVShared *lvshared;
|
||||
LVDeadTuples *dead_tuples;
|
||||
BufferUsage *buffer_usage;
|
||||
WalUsage *wal_usage;
|
||||
int nindexes;
|
||||
char *sharedquery;
|
||||
IndexBulkDeleteResult **stats;
|
||||
@@ -3511,9 +3528,11 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
|
||||
parallel_vacuum_index(indrels, stats, lvshared, dead_tuples, nindexes,
|
||||
&vacrelstats);
|
||||
|
||||
/* Report buffer usage during parallel execution */
|
||||
/* Report buffer/WAL usage during parallel execution */
|
||||
buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
|
||||
InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
|
||||
wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
|
||||
InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
|
||||
&wal_usage[ParallelWorkerNumber]);
|
||||
|
||||
/* Pop the error context stack */
|
||||
error_context_stack = errcallback.previous;
|
||||
|
@@ -67,6 +67,7 @@
|
||||
#include "access/xloginsert.h"
|
||||
#include "catalog/index.h"
|
||||
#include "commands/progress.h"
|
||||
#include "executor/instrument.h"
|
||||
#include "miscadmin.h"
|
||||
#include "pgstat.h"
|
||||
#include "storage/smgr.h"
|
||||
@@ -81,6 +82,7 @@
|
||||
#define PARALLEL_KEY_TUPLESORT UINT64CONST(0xA000000000000002)
|
||||
#define PARALLEL_KEY_TUPLESORT_SPOOL2 UINT64CONST(0xA000000000000003)
|
||||
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xA000000000000004)
|
||||
#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xA000000000000005)
|
||||
|
||||
/*
|
||||
* DISABLE_LEADER_PARTICIPATION disables the leader's participation in
|
||||
@@ -203,6 +205,7 @@ typedef struct BTLeader
|
||||
Sharedsort *sharedsort;
|
||||
Sharedsort *sharedsort2;
|
||||
Snapshot snapshot;
|
||||
WalUsage *walusage;
|
||||
} BTLeader;
|
||||
|
||||
/*
|
||||
@@ -1476,6 +1479,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
|
||||
Sharedsort *sharedsort2;
|
||||
BTSpool *btspool = buildstate->spool;
|
||||
BTLeader *btleader = (BTLeader *) palloc0(sizeof(BTLeader));
|
||||
WalUsage *walusage;
|
||||
bool leaderparticipates = true;
|
||||
char *sharedquery;
|
||||
int querylen;
|
||||
@@ -1528,6 +1532,18 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
|
||||
shm_toc_estimate_keys(&pcxt->estimator, 3);
|
||||
}
|
||||
|
||||
/*
|
||||
* Estimate space for WalUsage -- PARALLEL_KEY_WAL_USAGE
|
||||
*
|
||||
* WalUsage during execution of maintenance command can be used by an
|
||||
* extension that reports the WAL usage, such as pg_stat_statements. We
|
||||
* have no way of knowing whether anyone's looking at pgWalUsage, so do it
|
||||
* unconditionally.
|
||||
*/
|
||||
shm_toc_estimate_chunk(&pcxt->estimator,
|
||||
mul_size(sizeof(WalUsage), pcxt->nworkers));
|
||||
shm_toc_estimate_keys(&pcxt->estimator, 1);
|
||||
|
||||
/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
|
||||
querylen = strlen(debug_query_string);
|
||||
shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
|
||||
@@ -1599,6 +1615,11 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
|
||||
memcpy(sharedquery, debug_query_string, querylen + 1);
|
||||
shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
|
||||
|
||||
/* Allocate space for each worker's WalUsage; no need to initialize */
|
||||
walusage = shm_toc_allocate(pcxt->toc,
|
||||
mul_size(sizeof(WalUsage), pcxt->nworkers));
|
||||
shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
|
||||
|
||||
/* Launch workers, saving status for leader/caller */
|
||||
LaunchParallelWorkers(pcxt);
|
||||
btleader->pcxt = pcxt;
|
||||
@@ -1609,6 +1630,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
|
||||
btleader->sharedsort = sharedsort;
|
||||
btleader->sharedsort2 = sharedsort2;
|
||||
btleader->snapshot = snapshot;
|
||||
btleader->walusage = walusage;
|
||||
|
||||
/* If no workers were successfully launched, back out (do serial build) */
|
||||
if (pcxt->nworkers_launched == 0)
|
||||
@@ -1637,8 +1659,18 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
|
||||
static void
|
||||
_bt_end_parallel(BTLeader *btleader)
|
||||
{
|
||||
int i;
|
||||
|
||||
/* Shutdown worker processes */
|
||||
WaitForParallelWorkersToFinish(btleader->pcxt);
|
||||
|
||||
/*
|
||||
* Next, accumulate WAL usage. (This must wait for the workers to finish,
|
||||
* or we might get incomplete data.)
|
||||
*/
|
||||
for (i = 0; i < btleader->pcxt->nworkers_launched; i++)
|
||||
InstrAccumParallelQuery(NULL, &btleader->walusage[i]);
|
||||
|
||||
/* Free last reference to MVCC snapshot, if one was used */
|
||||
if (IsMVCCSnapshot(btleader->snapshot))
|
||||
UnregisterSnapshot(btleader->snapshot);
|
||||
@@ -1769,6 +1801,7 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
|
||||
Relation indexRel;
|
||||
LOCKMODE heapLockmode;
|
||||
LOCKMODE indexLockmode;
|
||||
WalUsage *walusage;
|
||||
int sortmem;
|
||||
|
||||
#ifdef BTREE_BUILD_STATS
|
||||
@@ -1830,11 +1863,18 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
|
||||
tuplesort_attach_shared(sharedsort2, seg);
|
||||
}
|
||||
|
||||
/* Prepare to track buffer usage during parallel execution */
|
||||
InstrStartParallelQuery();
|
||||
|
||||
/* Perform sorting of spool, and possibly a spool2 */
|
||||
sortmem = maintenance_work_mem / btshared->scantuplesortstates;
|
||||
_bt_parallel_scan_and_sort(btspool, btspool2, btshared, sharedsort,
|
||||
sharedsort2, sortmem, false);
|
||||
|
||||
/* Report WAL usage during parallel execution */
|
||||
walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
|
||||
InstrEndParallelQuery(NULL, &walusage[ParallelWorkerNumber]);
|
||||
|
||||
#ifdef BTREE_BUILD_STATS
|
||||
if (log_btree_build_stats)
|
||||
{
|
||||
|
@@ -43,6 +43,7 @@
|
||||
#include "commands/progress.h"
|
||||
#include "commands/tablespace.h"
|
||||
#include "common/controldata_utils.h"
|
||||
#include "executor/instrument.h"
|
||||
#include "miscadmin.h"
|
||||
#include "pg_trace.h"
|
||||
#include "pgstat.h"
|
||||
@@ -996,7 +997,8 @@ static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
|
||||
XLogRecPtr
|
||||
XLogInsertRecord(XLogRecData *rdata,
|
||||
XLogRecPtr fpw_lsn,
|
||||
uint8 flags)
|
||||
uint8 flags,
|
||||
int num_fpw)
|
||||
{
|
||||
XLogCtlInsert *Insert = &XLogCtl->Insert;
|
||||
pg_crc32c rdata_crc;
|
||||
@@ -1252,6 +1254,14 @@ XLogInsertRecord(XLogRecData *rdata,
|
||||
ProcLastRecPtr = StartPos;
|
||||
XactLastRecEnd = EndPos;
|
||||
|
||||
/* Report WAL traffic to the instrumentation. */
|
||||
if (inserted)
|
||||
{
|
||||
pgWalUsage.wal_bytes += rechdr->xl_tot_len;
|
||||
pgWalUsage.wal_records++;
|
||||
pgWalUsage.wal_num_fpw += num_fpw;
|
||||
}
|
||||
|
||||
return EndPos;
|
||||
}
|
||||
|
||||
|
@@ -25,6 +25,7 @@
|
||||
#include "access/xloginsert.h"
|
||||
#include "catalog/pg_control.h"
|
||||
#include "common/pg_lzcompress.h"
|
||||
#include "executor/instrument.h"
|
||||
#include "miscadmin.h"
|
||||
#include "pg_trace.h"
|
||||
#include "replication/origin.h"
|
||||
@@ -108,7 +109,7 @@ static MemoryContext xloginsert_cxt;
|
||||
|
||||
static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
|
||||
XLogRecPtr RedoRecPtr, bool doPageWrites,
|
||||
XLogRecPtr *fpw_lsn);
|
||||
XLogRecPtr *fpw_lsn, int *num_fpw);
|
||||
static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
|
||||
uint16 hole_length, char *dest, uint16 *dlen);
|
||||
|
||||
@@ -448,6 +449,7 @@ XLogInsert(RmgrId rmid, uint8 info)
|
||||
bool doPageWrites;
|
||||
XLogRecPtr fpw_lsn;
|
||||
XLogRecData *rdt;
|
||||
int num_fpw = 0;
|
||||
|
||||
/*
|
||||
* Get values needed to decide whether to do full-page writes. Since
|
||||
@@ -457,9 +459,9 @@ XLogInsert(RmgrId rmid, uint8 info)
|
||||
GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
|
||||
|
||||
rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
|
||||
&fpw_lsn);
|
||||
&fpw_lsn, &num_fpw);
|
||||
|
||||
EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);
|
||||
EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpw);
|
||||
} while (EndPos == InvalidXLogRecPtr);
|
||||
|
||||
XLogResetInsertion();
|
||||
@@ -482,7 +484,7 @@ XLogInsert(RmgrId rmid, uint8 info)
|
||||
static XLogRecData *
|
||||
XLogRecordAssemble(RmgrId rmid, uint8 info,
|
||||
XLogRecPtr RedoRecPtr, bool doPageWrites,
|
||||
XLogRecPtr *fpw_lsn)
|
||||
XLogRecPtr *fpw_lsn, int *num_fpw)
|
||||
{
|
||||
XLogRecData *rdt;
|
||||
uint32 total_len = 0;
|
||||
@@ -635,6 +637,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
|
||||
*/
|
||||
bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
|
||||
|
||||
/* Report a full page image constructed for the WAL record */
|
||||
*num_fpw += 1;
|
||||
|
||||
/*
|
||||
* Construct XLogRecData entries for the page content.
|
||||
*/
|
||||
|
Reference in New Issue
Block a user