1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-03 20:02:46 +03:00

Fix EXPLAIN ANALYZE for parallel HashAgg plans

Since 1f39bce02, HashAgg nodes have had the ability to spill to disk when
memory consumption exceeds work_mem. That commit added new properties to
EXPLAIN ANALYZE to show the maximum memory usage and disk usage, however,
it didn't quite go as far as showing that information for parallel
workers.  Since workers may have experienced something very different from
the main process, we should show this information per worker, as is done
in Sort.

Reviewed-by: Justin Pryzby
Reviewed-by: Jeff Davis
Discussion: https://postgr.es/m/CAApHDvpEKbfZa18mM1TD7qV6PG+w97pwCWq5tVD0dX7e11gRJw@mail.gmail.com
Backpatch-through: 13, where the hashagg spilling code was added.
This commit is contained in:
David Rowley
2020-06-19 17:25:07 +12:00
parent 5fffa8fce3
commit bdee4af8e0
5 changed files with 245 additions and 18 deletions

View File

@ -3051,29 +3051,111 @@ show_hashagg_info(AggState *aggstate, ExplainState *es)
Agg *agg = (Agg *) aggstate->ss.ps.plan;
int64 memPeakKb = (aggstate->hash_mem_peak + 1023) / 1024;
Assert(IsA(aggstate, AggState));
if (agg->aggstrategy != AGG_HASHED &&
agg->aggstrategy != AGG_MIXED)
return;
if (es->costs && aggstate->hash_planned_partitions > 0)
if (es->format != EXPLAIN_FORMAT_TEXT)
{
ExplainPropertyInteger("Planned Partitions", NULL,
aggstate->hash_planned_partitions, es);
if (es->costs && aggstate->hash_planned_partitions > 0)
{
ExplainPropertyInteger("Planned Partitions", NULL,
aggstate->hash_planned_partitions, es);
}
if (!es->analyze)
return;
/* EXPLAIN ANALYZE */
ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb, es);
if (aggstate->hash_batches_used > 0)
{
ExplainPropertyInteger("Disk Usage", "kB",
aggstate->hash_disk_used, es);
ExplainPropertyInteger("HashAgg Batches", NULL,
aggstate->hash_batches_used, es);
}
}
else
{
bool gotone = false;
if (es->costs && aggstate->hash_planned_partitions > 0)
{
ExplainIndentText(es);
appendStringInfo(es->str, "Planned Partitions: %d",
aggstate->hash_planned_partitions);
gotone = true;
}
if (!es->analyze)
{
if (gotone)
appendStringInfoChar(es->str, '\n');
return;
}
if (!gotone)
ExplainIndentText(es);
else
appendStringInfoString(es->str, " ");
appendStringInfo(es->str, "Peak Memory Usage: " INT64_FORMAT " kB",
memPeakKb);
if (aggstate->hash_batches_used > 0)
appendStringInfo(es->str, " Disk Usage: " UINT64_FORMAT " kB HashAgg Batches: %d",
aggstate->hash_disk_used,
aggstate->hash_batches_used);
appendStringInfoChar(es->str, '\n');
}
if (!es->analyze)
return;
/* EXPLAIN ANALYZE */
ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb, es);
if (aggstate->hash_batches_used > 0)
/* Display stats for each parallel worker */
if (es->analyze && aggstate->shared_info != NULL)
{
ExplainPropertyInteger("Disk Usage", "kB",
aggstate->hash_disk_used, es);
ExplainPropertyInteger("HashAgg Batches", NULL,
aggstate->hash_batches_used, es);
for (int n = 0; n < aggstate->shared_info->num_workers; n++)
{
AggregateInstrumentation *sinstrument;
uint64 hash_disk_used;
int hash_batches_used;
sinstrument = &aggstate->shared_info->sinstrument[n];
hash_disk_used = sinstrument->hash_disk_used;
hash_batches_used = sinstrument->hash_batches_used;
memPeakKb = (sinstrument->hash_mem_peak + 1023) / 1024;
if (es->workers_state)
ExplainOpenWorker(n, es);
if (es->format == EXPLAIN_FORMAT_TEXT)
{
ExplainIndentText(es);
appendStringInfo(es->str, "Peak Memory Usage: " INT64_FORMAT " kB",
memPeakKb);
if (hash_batches_used > 0)
appendStringInfo(es->str, " Disk Usage: " UINT64_FORMAT " kB HashAgg Batches: %d",
hash_disk_used, hash_batches_used);
appendStringInfoChar(es->str, '\n');
}
else
{
ExplainPropertyInteger("Peak Memory Usage", "kB", memPeakKb,
es);
if (hash_batches_used > 0)
{
ExplainPropertyInteger("Disk Usage", "kB", hash_disk_used,
es);
ExplainPropertyInteger("HashAgg Batches", NULL,
hash_batches_used, es);
}
}
if (es->workers_state)
ExplainCloseWorker(n, es);
}
}
}

View File

@ -25,6 +25,7 @@
#include "executor/execParallel.h"
#include "executor/executor.h"
#include "executor/nodeAgg.h"
#include "executor/nodeAppend.h"
#include "executor/nodeBitmapHeapscan.h"
#include "executor/nodeCustom.h"
@ -288,7 +289,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
break;
case T_AggState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecAggEstimate((AggState *) planstate, e->pcxt);
break;
default:
break;
}
@ -505,7 +509,10 @@ ExecParallelInitializeDSM(PlanState *planstate,
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
break;
case T_AggState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
break;
default:
break;
}
@ -1048,6 +1055,9 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
case T_HashState:
ExecHashRetrieveInstrumentation((HashState *) planstate);
break;
case T_AggState:
ExecAggRetrieveInstrumentation((AggState *) planstate);
break;
default:
break;
}
@ -1336,7 +1346,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
pwcxt);
break;
case T_AggState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecAggInitializeWorker((AggState *) planstate, pwcxt);
break;
default:
break;
}

View File

@ -240,6 +240,7 @@
#include "postgres.h"
#include "access/htup_details.h"
#include "access/parallel.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h"
@ -4483,6 +4484,22 @@ ExecEndAgg(AggState *node)
int numGroupingSets = Max(node->maxsets, 1);
int setno;
/*
* When ending a parallel worker, copy the statistics gathered by the
* worker back into shared memory so that it can be picked up by the main
* process to report in EXPLAIN ANALYZE.
*/
if (node->shared_info && IsParallelWorker())
{
AggregateInstrumentation *si;
Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
si = &node->shared_info->sinstrument[ParallelWorkerNumber];
si->hash_batches_used = node->hash_batches_used;
si->hash_disk_used = node->hash_disk_used;
si->hash_mem_peak = node->hash_mem_peak;
}
/* Make sure we have closed any open tuplesorts */
if (node->sort_in)
@ -4854,3 +4871,89 @@ aggregate_dummy(PG_FUNCTION_ARGS)
fcinfo->flinfo->fn_oid);
return (Datum) 0; /* keep compiler quiet */
}
/* ----------------------------------------------------------------
* Parallel Query Support
* ----------------------------------------------------------------
*/
/* ----------------------------------------------------------------
* ExecAggEstimate
*
* Estimate space required to propagate aggregate statistics.
* ----------------------------------------------------------------
*/
void
ExecAggEstimate(AggState *node, ParallelContext *pcxt)
{
Size size;
/* don't need this if not instrumenting or no workers */
if (!node->ss.ps.instrument || pcxt->nworkers == 0)
return;
size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
size = add_size(size, offsetof(SharedAggInfo, sinstrument));
shm_toc_estimate_chunk(&pcxt->estimator, size);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
/* ----------------------------------------------------------------
* ExecAggInitializeDSM
*
* Initialize DSM space for aggregate statistics.
* ----------------------------------------------------------------
*/
void
ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
{
Size size;
/* don't need this if not instrumenting or no workers */
if (!node->ss.ps.instrument || pcxt->nworkers == 0)
return;
size = offsetof(SharedAggInfo, sinstrument)
+ pcxt->nworkers * sizeof(AggregateInstrumentation);
node->shared_info = shm_toc_allocate(pcxt->toc, size);
/* ensure any unfilled slots will contain zeroes */
memset(node->shared_info, 0, size);
node->shared_info->num_workers = pcxt->nworkers;
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
node->shared_info);
}
/* ----------------------------------------------------------------
* ExecAggInitializeWorker
*
* Attach worker to DSM space for aggregate statistics.
* ----------------------------------------------------------------
*/
void
ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
{
node->shared_info =
shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
}
/* ----------------------------------------------------------------
* ExecAggRetrieveInstrumentation
*
* Transfer aggregate statistics from DSM to private memory.
* ----------------------------------------------------------------
*/
void
ExecAggRetrieveInstrumentation(AggState *node)
{
Size size;
SharedAggInfo *si;
if (node->shared_info == NULL)
return;
size = offsetof(SharedAggInfo, sinstrument)
+ node->shared_info->num_workers * sizeof(AggregateInstrumentation);
si = palloc(size);
memcpy(si, node->shared_info, size);
node->shared_info = si;
}