Mirror of https://github.com/postgres/postgres.git
Allow parallel create index to accumulate buffer usage stats.
Currently, we don't account for buffer usage incurred by parallel workers during parallel create index. This commit allows each worker to record its buffer usage stats and the leader backend to accumulate those stats at the end of the operation. This allows pg_stat_statements to display correct buffer usage stats for the (parallel) create index command.

Reported-by: Julien Rouhaud
Author: Sawada Masahiko
Reviewed-by: Dilip Kumar, Julien Rouhaud and Amit Kapila
Backpatch-through: 11, where this was introduced
Discussion: https://postgr.es/m/20200328151721.GB12854@nol
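The mechanism is the one visible in the diff below: the leader reserves one BufferUsage slot per worker in the shared-memory TOC (PARALLEL_KEY_BUFFER_USAGE), each worker fills its own slot via InstrEndParallelQuery() when it finishes its part of the build, and the leader folds every slot into its own counters with InstrAccumParallelQuery() in _bt_end_parallel(). As a standalone illustration of that accumulate-per-worker-then-sum pattern, here is a minimal toy sketch using plain threads; the ToyBufferUsage struct, worker_main(), and NWORKERS are invented for this example and are not PostgreSQL APIs.

/*
 * Toy, standalone illustration of the pattern this commit adds to parallel
 * CREATE INDEX: each worker records its own buffer usage into a per-worker
 * slot, and the leader sums the slots once all workers have finished.  The
 * array here stands in for the BufferUsage array PostgreSQL places in the
 * DSM segment; the threads stand in for parallel workers.
 */
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

#define NWORKERS 3

typedef struct ToyBufferUsage
{
    long    shared_blks_hit;
    long    shared_blks_read;
} ToyBufferUsage;

static ToyBufferUsage worker_usage[NWORKERS];   /* one slot per worker */

static void *
worker_main(void *arg)
{
    int     worker_number = (int) (intptr_t) arg;

    /* Pretend this worker hit/read some buffers while building its part. */
    worker_usage[worker_number].shared_blks_hit = 100 + worker_number;
    worker_usage[worker_number].shared_blks_read = 10 * (worker_number + 1);
    return NULL;
}

int
main(void)
{
    pthread_t       workers[NWORKERS];
    ToyBufferUsage  total = {0, 0};

    for (int i = 0; i < NWORKERS; i++)
        pthread_create(&workers[i], NULL, worker_main, (void *) (intptr_t) i);

    /* Leader: wait for each worker, then accumulate its slot. */
    for (int i = 0; i < NWORKERS; i++)
    {
        pthread_join(workers[i], NULL);
        total.shared_blks_hit += worker_usage[i].shared_blks_hit;
        total.shared_blks_read += worker_usage[i].shared_blks_read;
    }

    printf("hit=%ld read=%ld\n", total.shared_blks_hit, total.shared_blks_read);
    return 0;
}

The real patch does the same thing with shm_toc_allocate()/shm_toc_lookup() providing the per-worker array and ParallelWorkerNumber selecting the slot, as shown in the hunks that follow.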
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -71,6 +71,7 @@
 #define PARALLEL_KEY_TUPLESORT_SPOOL2	UINT64CONST(0xA000000000000003)
 #define PARALLEL_KEY_QUERY_TEXT			UINT64CONST(0xA000000000000004)
 #define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xA000000000000005)
+#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xA000000000000006)
 
 /*
  * DISABLE_LEADER_PARTICIPATION disables the leader's participation in
@@ -194,6 +195,7 @@ typedef struct BTLeader
 	Sharedsort *sharedsort2;
 	Snapshot	snapshot;
 	WalUsage   *walusage;
+	BufferUsage *bufferusage;
 } BTLeader;
 
 /*
@@ -1457,6 +1459,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	BTSpool    *btspool = buildstate->spool;
 	BTLeader   *btleader = (BTLeader *) palloc0(sizeof(BTLeader));
 	WalUsage   *walusage;
+	BufferUsage *bufferusage;
 	bool		leaderparticipates = true;
 	char	   *sharedquery;
 	int			querylen;
@@ -1510,16 +1513,19 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	}
 
 	/*
-	 * Estimate space for WalUsage -- PARALLEL_KEY_WAL_USAGE
+	 * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE
+	 * and PARALLEL_KEY_BUFFER_USAGE.
 	 *
-	 * WalUsage during execution of maintenance command can be used by an
-	 * extension that reports the WAL usage, such as pg_stat_statements. We
-	 * have no way of knowing whether anyone's looking at pgWalUsage, so do it
-	 * unconditionally.
+	 * If there are no extensions loaded that care, we could skip this. We
+	 * have no way of knowing whether anyone's looking at pgWalUsage or
+	 * pgBufferUsage, so do it unconditionally.
 	 */
 	shm_toc_estimate_chunk(&pcxt->estimator,
 						   mul_size(sizeof(WalUsage), pcxt->nworkers));
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
 	/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
 	querylen = strlen(debug_query_string);
@@ -1592,10 +1598,16 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	memcpy(sharedquery, debug_query_string, querylen + 1);
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
 
-	/* Allocate space for each worker's WalUsage; no need to initialize */
+	/*
+	 * Allocate space for each worker's WalUsage and BufferUsage; no need to
+	 * initialize.
+	 */
 	walusage = shm_toc_allocate(pcxt->toc,
 								mul_size(sizeof(WalUsage), pcxt->nworkers));
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
+	bufferusage = shm_toc_allocate(pcxt->toc,
+								   mul_size(sizeof(BufferUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
 
 	/* Launch workers, saving status for leader/caller */
 	LaunchParallelWorkers(pcxt);
@@ -1608,6 +1620,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	btleader->sharedsort2 = sharedsort2;
 	btleader->snapshot = snapshot;
 	btleader->walusage = walusage;
+	btleader->bufferusage = bufferusage;
 
 	/* If no workers were successfully launched, back out (do serial build) */
 	if (pcxt->nworkers_launched == 0)
@@ -1646,7 +1659,7 @@ _bt_end_parallel(BTLeader *btleader)
 	 * or we might get incomplete data.)
 	 */
 	for (i = 0; i < btleader->pcxt->nworkers_launched; i++)
-		InstrAccumParallelQuery(NULL, &btleader->walusage[i]);
+		InstrAccumParallelQuery(&btleader->bufferusage[i], &btleader->walusage[i]);
 
 	/* Free last reference to MVCC snapshot, if one was used */
 	if (IsMVCCSnapshot(btleader->snapshot))
@@ -1779,6 +1792,7 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 	LOCKMODE	heapLockmode;
 	LOCKMODE	indexLockmode;
 	WalUsage   *walusage;
+	BufferUsage *bufferusage;
 	int			sortmem;
 
 #ifdef BTREE_BUILD_STATS
@@ -1848,9 +1862,11 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 	_bt_parallel_scan_and_sort(btspool, btspool2, btshared, sharedsort,
 							   sharedsort2, sortmem, false);
 
-	/* Report WAL usage during parallel execution */
+	/* Report WAL/buffer usage during parallel execution */
+	bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
 	walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
-	InstrEndParallelQuery(NULL, &walusage[ParallelWorkerNumber]);
+	InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
+						  &walusage[ParallelWorkerNumber]);
 
 #ifdef BTREE_BUILD_STATS
 	if (log_btree_build_stats)
--- a/src/backend/executor/instrument.c
+++ b/src/backend/executor/instrument.c
@@ -188,11 +188,8 @@ InstrStartParallelQuery(void)
 void
 InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
 {
-	if (bufusage)
-	{
 	memset(bufusage, 0, sizeof(BufferUsage));
 	BufferUsageAccumDiff(bufusage, &pgBufferUsage, &save_pgBufferUsage);
-	}
 	memset(walusage, 0, sizeof(WalUsage));
 	WalUsageAccumDiff(walusage, &pgWalUsage, &save_pgWalUsage);
 }
@@ -201,7 +198,6 @@ InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
 void
 InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
 {
-	if (bufusage)
 	BufferUsageAdd(&pgBufferUsage, bufusage);
 	WalUsageAdd(&pgWalUsage, walusage);
 }