From 969f9d0b4ba574bb8df65683dbf7a09c030f3e67 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Sat, 11 Apr 2020 12:39:19 -0400 Subject: [PATCH] Make EXPLAIN report maximum hashtable usage across multiple rescans. Before discarding the old hash table in ExecReScanHashJoin, capture its statistics, ensuring that we report the maximum hashtable size across repeated rescans of the hash input relation. We can repurpose the existing code for reporting hashtable size in parallel workers to help with this, making the patch pretty small. This also ensures that if rescans happen within parallel workers, we get the correct maximums across all instances. Konstantin Knizhnik and Tom Lane, per diagnosis by Thomas Munro of a trouble report from Alvaro Herrera. Discussion: https://postgr.es/m/20200323165059.GA24950@alvherre.pgsql --- src/backend/commands/explain.c | 46 +++++++++-------------- src/backend/executor/nodeHash.c | 58 ++++++++++++++++++++++------- src/backend/executor/nodeHashjoin.c | 10 ++++- src/include/executor/nodeHash.h | 4 +- src/include/nodes/execnodes.h | 18 +++++++-- 5 files changed, 87 insertions(+), 49 deletions(-) diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 455f54ef83f..f3c8da1e01b 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -2964,22 +2964,25 @@ show_hash_info(HashState *hashstate, ExplainState *es) HashInstrumentation hinstrument = {0}; /* + * Collect stats from the local process, even when it's a parallel query. * In a parallel query, the leader process may or may not have run the * hash join, and even if it did it may not have built a hash table due to * timing (if it started late it might have seen no tuples in the outer * relation and skipped building the hash table). Therefore we have to be * prepared to get instrumentation data from all participants. */ - if (hashstate->hashtable) - ExecHashGetInstrumentation(&hinstrument, hashstate->hashtable); + if (hashstate->hinstrument) + memcpy(&hinstrument, hashstate->hinstrument, + sizeof(HashInstrumentation)); /* * Merge results from workers. In the parallel-oblivious case, the * results from all participants should be identical, except where * participants didn't run the join at all so have no data. In the * parallel-aware case, we need to consider all the results. Each worker - * may have seen a different subset of batches and we want to find the - * highest memory usage for any one batch across all batches. + * may have seen a different subset of batches and we want to report the + * highest memory usage across all batches. We take the maxima of other + * values too, for the same reasons as in ExecHashAccumInstrumentation. */ if (hashstate->shared_info) { @@ -2990,31 +2993,16 @@ show_hash_info(HashState *hashstate, ExplainState *es) { HashInstrumentation *worker_hi = &shared_info->hinstrument[i]; - if (worker_hi->nbatch > 0) - { - /* - * Every participant should agree on the buckets, so to be - * sure we have a value we'll just overwrite each time. - */ - hinstrument.nbuckets = worker_hi->nbuckets; - hinstrument.nbuckets_original = worker_hi->nbuckets_original; - - /* - * Normally every participant should agree on the number of - * batches too, but it's possible for a backend that started - * late and missed the whole join not to have the final nbatch - * number. So we'll take the largest number. - */ - hinstrument.nbatch = Max(hinstrument.nbatch, worker_hi->nbatch); - hinstrument.nbatch_original = worker_hi->nbatch_original; - - /* - * In a parallel-aware hash join, for now we report the - * maximum peak memory reported by any worker. - */ - hinstrument.space_peak = - Max(hinstrument.space_peak, worker_hi->space_peak); - } + hinstrument.nbuckets = Max(hinstrument.nbuckets, + worker_hi->nbuckets); + hinstrument.nbuckets_original = Max(hinstrument.nbuckets_original, + worker_hi->nbuckets_original); + hinstrument.nbatch = Max(hinstrument.nbatch, + worker_hi->nbatch); + hinstrument.nbatch_original = Max(hinstrument.nbatch_original, + worker_hi->nbatch_original); + hinstrument.space_peak = Max(hinstrument.space_peak, + worker_hi->space_peak); } } diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index c881dc1de81..5da13ada726 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -2597,7 +2597,10 @@ ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt) size = offsetof(SharedHashInfo, hinstrument) + pcxt->nworkers * sizeof(HashInstrumentation); node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size); + + /* Each per-worker area must start out as zeroes. */ memset(node->shared_info, 0, size); + node->shared_info->num_workers = pcxt->nworkers; shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, node->shared_info); @@ -2616,22 +2619,33 @@ ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt) if (!node->ps.instrument) return; + /* + * Find our entry in the shared area, and set up a pointer to it so that + * we'll accumulate stats there when shutting down or rebuilding the hash + * table. + */ shared_info = (SharedHashInfo *) shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false); node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber]; } /* - * Copy instrumentation data from this worker's hash table (if it built one) - * to DSM memory so the leader can retrieve it. This must be done in an - * ExecShutdownHash() rather than ExecEndHash() because the latter runs after - * we've detached from the DSM segment. + * Collect EXPLAIN stats if needed, saving them into DSM memory if + * ExecHashInitializeWorker was called, or local storage if not. In the + * parallel case, this must be done in ExecShutdownHash() rather than + * ExecEndHash() because the latter runs after we've detached from the DSM + * segment. */ void ExecShutdownHash(HashState *node) { + /* Allocate save space if EXPLAIN'ing and we didn't do so already */ + if (node->ps.instrument && !node->hinstrument) + node->hinstrument = (HashInstrumentation *) + palloc0(sizeof(HashInstrumentation)); + /* Now accumulate data for the current (final) hash table */ if (node->hinstrument && node->hashtable) - ExecHashGetInstrumentation(node->hinstrument, node->hashtable); + ExecHashAccumInstrumentation(node->hinstrument, node->hashtable); } /* @@ -2655,18 +2669,34 @@ ExecHashRetrieveInstrumentation(HashState *node) } /* - * Copy the instrumentation data from 'hashtable' into a HashInstrumentation - * struct. + * Accumulate instrumentation data from 'hashtable' into an + * initially-zeroed HashInstrumentation struct. + * + * This is used to merge information across successive hash table instances + * within a single plan node. We take the maximum values of each interesting + * number. The largest nbuckets and largest nbatch values might have occurred + * in different instances, so there's some risk of confusion from reporting + * unrelated numbers; but there's a bigger risk of misdiagnosing a performance + * issue if we don't report the largest values. Similarly, we want to report + * the largest spacePeak regardless of whether it happened in the same + * instance as the largest nbuckets or nbatch. All the instances should have + * the same nbuckets_original and nbatch_original; but there's little value + * in depending on that here, so handle them the same way. */ void -ExecHashGetInstrumentation(HashInstrumentation *instrument, - HashJoinTable hashtable) +ExecHashAccumInstrumentation(HashInstrumentation *instrument, + HashJoinTable hashtable) { - instrument->nbuckets = hashtable->nbuckets; - instrument->nbuckets_original = hashtable->nbuckets_original; - instrument->nbatch = hashtable->nbatch; - instrument->nbatch_original = hashtable->nbatch_original; - instrument->space_peak = hashtable->spacePeak; + instrument->nbuckets = Max(instrument->nbuckets, + hashtable->nbuckets); + instrument->nbuckets_original = Max(instrument->nbuckets_original, + hashtable->nbuckets_original); + instrument->nbatch = Max(instrument->nbatch, + hashtable->nbatch); + instrument->nbatch_original = Max(instrument->nbatch_original, + hashtable->nbatch_original); + instrument->space_peak = Max(instrument->space_peak, + hashtable->spacePeak); } /* diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 9e28ddd8951..cc8edacdd01 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -1338,8 +1338,16 @@ ExecReScanHashJoin(HashJoinState *node) /* must destroy and rebuild hash table */ HashState *hashNode = castNode(HashState, innerPlanState(node)); - /* for safety, be sure to clear child plan node's pointer too */ Assert(hashNode->hashtable == node->hj_HashTable); + /* accumulate stats from old hash table, if wanted */ + /* (this should match ExecShutdownHash) */ + if (hashNode->ps.instrument && !hashNode->hinstrument) + hashNode->hinstrument = (HashInstrumentation *) + palloc0(sizeof(HashInstrumentation)); + if (hashNode->hinstrument) + ExecHashAccumInstrumentation(hashNode->hinstrument, + hashNode->hashtable); + /* for safety, be sure to clear child plan node's pointer too */ hashNode->hashtable = NULL; ExecHashTableDestroy(node->hj_HashTable); diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 1336fde6b4d..64d2ce693ca 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -73,7 +73,7 @@ extern void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt); extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt); extern void ExecHashRetrieveInstrumentation(HashState *node); extern void ExecShutdownHash(HashState *node); -extern void ExecHashGetInstrumentation(HashInstrumentation *instrument, - HashJoinTable hashtable); +extern void ExecHashAccumInstrumentation(HashInstrumentation *instrument, + HashJoinTable hashtable); #endif /* NODEHASH_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 4c009b1a7c5..4fee043bb2b 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2358,7 +2358,7 @@ typedef struct HashInstrumentation int nbuckets_original; /* planned number of buckets */ int nbatch; /* number of batches at end of execution */ int nbatch_original; /* planned number of batches */ - size_t space_peak; /* peak memory usage in bytes */ + Size space_peak; /* peak memory usage in bytes */ } HashInstrumentation; /* ---------------- @@ -2381,8 +2381,20 @@ typedef struct HashState HashJoinTable hashtable; /* hash table for the hashjoin */ List *hashkeys; /* list of ExprState nodes */ - SharedHashInfo *shared_info; /* one entry per worker */ - HashInstrumentation *hinstrument; /* this worker's entry */ + /* + * In a parallelized hash join, the leader retains a pointer to the + * shared-memory stats area in its shared_info field, and then copies the + * shared-memory info back to local storage before DSM shutdown. The + * shared_info field remains NULL in workers, or in non-parallel joins. + */ + SharedHashInfo *shared_info; + + /* + * If we are collecting hash stats, this points to an initially-zeroed + * collection area, which could be either local storage or in shared + * memory; either way it's for just one process. + */ + HashInstrumentation *hinstrument; /* Parallel hash state. */ struct ParallelHashJoinState *parallel_state;