diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 447f69d044e..7e4fbafc535 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -19,7 +19,7 @@ #include "commands/createas.h" #include "commands/defrem.h" #include "commands/prepare.h" -#include "executor/hashjoin.h" +#include "executor/nodeHash.h" #include "foreign/fdwapi.h" #include "nodes/extensible.h" #include "nodes/nodeFuncs.h" @@ -2379,34 +2379,62 @@ show_sort_info(SortState *sortstate, ExplainState *es) static void show_hash_info(HashState *hashstate, ExplainState *es) { - HashJoinTable hashtable; + HashInstrumentation *hinstrument = NULL; - hashtable = hashstate->hashtable; - - if (hashtable) + /* + * In a parallel query, the leader process may or may not have run the + * hash join, and even if it did it may not have built a hash table due to + * timing (if it started late it might have seen no tuples in the outer + * relation and skipped building the hash table). Therefore we have to be + * prepared to get instrumentation data from a worker if there is no hash + * table. + */ + if (hashstate->hashtable) { - long spacePeakKb = (hashtable->spacePeak + 1023) / 1024; + hinstrument = (HashInstrumentation *) + palloc(sizeof(HashInstrumentation)); + ExecHashGetInstrumentation(hinstrument, hashstate->hashtable); + } + else if (hashstate->shared_info) + { + SharedHashInfo *shared_info = hashstate->shared_info; + int i; + + /* Find the first worker that built a hash table. */ + for (i = 0; i < shared_info->num_workers; ++i) + { + if (shared_info->hinstrument[i].nbatch > 0) + { + hinstrument = &shared_info->hinstrument[i]; + break; + } + } + } + + if (hinstrument) + { + long spacePeakKb = (hinstrument->space_peak + 1023) / 1024; if (es->format != EXPLAIN_FORMAT_TEXT) { - ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es); + ExplainPropertyLong("Hash Buckets", hinstrument->nbuckets, es); ExplainPropertyLong("Original Hash Buckets", - hashtable->nbuckets_original, es); - ExplainPropertyLong("Hash Batches", hashtable->nbatch, es); + hinstrument->nbuckets_original, es); + ExplainPropertyLong("Hash Batches", hinstrument->nbatch, es); ExplainPropertyLong("Original Hash Batches", - hashtable->nbatch_original, es); + hinstrument->nbatch_original, es); ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es); } - else if (hashtable->nbatch_original != hashtable->nbatch || - hashtable->nbuckets_original != hashtable->nbuckets) + else if (hinstrument->nbatch_original != hinstrument->nbatch || + hinstrument->nbuckets_original != hinstrument->nbuckets) { appendStringInfoSpaces(es->str, es->indent * 2); appendStringInfo(es->str, "Buckets: %d (originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n", - hashtable->nbuckets, - hashtable->nbuckets_original, - hashtable->nbatch, - hashtable->nbatch_original, + hinstrument->nbuckets, + hinstrument->nbuckets_original, + hinstrument->nbatch, + hinstrument->nbatch_original, spacePeakKb); } else @@ -2414,7 +2442,7 @@ show_hash_info(HashState *hashstate, ExplainState *es) appendStringInfoSpaces(es->str, es->indent * 2); appendStringInfo(es->str, "Buckets: %d Batches: %d Memory Usage: %ldkB\n", - hashtable->nbuckets, hashtable->nbatch, + hinstrument->nbuckets, hinstrument->nbatch, spacePeakKb); } } diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 53c5254be13..0aca00b0e68 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -29,6 +29,7 @@ #include "executor/nodeBitmapHeapscan.h" #include "executor/nodeCustom.h" #include "executor/nodeForeignscan.h" +#include "executor/nodeHash.h" #include "executor/nodeIndexscan.h" #include "executor/nodeIndexonlyscan.h" #include "executor/nodeSeqscan.h" @@ -259,8 +260,12 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate, e->pcxt); break; + case T_HashState: + /* even when not parallel-aware, for EXPLAIN ANALYZE */ + ExecHashEstimate((HashState *) planstate, e->pcxt); + break; case T_SortState: - /* even when not parallel-aware */ + /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecSortEstimate((SortState *) planstate, e->pcxt); break; @@ -458,8 +463,12 @@ ExecParallelInitializeDSM(PlanState *planstate, ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate, d->pcxt); break; + case T_HashState: + /* even when not parallel-aware, for EXPLAIN ANALYZE */ + ExecHashInitializeDSM((HashState *) planstate, d->pcxt); + break; case T_SortState: - /* even when not parallel-aware */ + /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecSortInitializeDSM((SortState *) planstate, d->pcxt); break; @@ -872,8 +881,12 @@ ExecParallelReInitializeDSM(PlanState *planstate, ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate, pcxt); break; + case T_HashState: + /* even when not parallel-aware, for EXPLAIN ANALYZE */ + ExecHashReInitializeDSM((HashState *) planstate, pcxt); + break; case T_SortState: - /* even when not parallel-aware */ + /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecSortReInitializeDSM((SortState *) planstate, pcxt); break; @@ -928,12 +941,18 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate, planstate->worker_instrument->num_workers = instrumentation->num_workers; memcpy(&planstate->worker_instrument->instrument, instrument, ibytes); - /* - * Perform any node-type-specific work that needs to be done. Currently, - * only Sort nodes need to do anything here. - */ - if (IsA(planstate, SortState)) - ExecSortRetrieveInstrumentation((SortState *) planstate); + /* Perform any node-type-specific work that needs to be done. */ + switch (nodeTag(planstate)) + { + case T_SortState: + ExecSortRetrieveInstrumentation((SortState *) planstate); + break; + case T_HashState: + ExecHashRetrieveInstrumentation((HashState *) planstate); + break; + default: + break; + } return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation, instrumentation); @@ -1160,8 +1179,12 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt) ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, pwcxt); break; + case T_HashState: + /* even when not parallel-aware, for EXPLAIN ANALYZE */ + ExecHashInitializeWorker((HashState *) planstate, pwcxt); + break; case T_SortState: - /* even when not parallel-aware */ + /* even when not parallel-aware, for EXPLAIN ANALYZE */ ExecSortInitializeWorker((SortState *) planstate, pwcxt); break; diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index c1aa5064c90..9befca90161 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -751,6 +751,9 @@ ExecShutdownNode(PlanState *node) case T_GatherMergeState: ExecShutdownGatherMerge((GatherMergeState *) node); break; + case T_HashState: + ExecShutdownHash((HashState *) node); + break; default: break; } diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index f7cd8fb3472..6fe5d69d558 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -1637,6 +1637,110 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) } } +/* + * Reserve space in the DSM segment for instrumentation data. + */ +void +ExecHashEstimate(HashState *node, ParallelContext *pcxt) +{ + size_t size; + + size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation)); + size = add_size(size, offsetof(SharedHashInfo, hinstrument)); + shm_toc_estimate_chunk(&pcxt->estimator, size); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +/* + * Set up a space in the DSM for all workers to record instrumentation data + * about their hash table. + */ +void +ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt) +{ + size_t size; + + size = offsetof(SharedHashInfo, hinstrument) + + pcxt->nworkers * sizeof(HashInstrumentation); + node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size); + memset(node->shared_info, 0, size); + node->shared_info->num_workers = pcxt->nworkers; + shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, + node->shared_info); +} + +/* + * Reset shared state before beginning a fresh scan. + */ +void +ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt) +{ + if (node->shared_info != NULL) + { + memset(node->shared_info->hinstrument, 0, + node->shared_info->num_workers * sizeof(HashInstrumentation)); + } +} + +/* + * Locate the DSM space for hash table instrumentation data that we'll write + * to at shutdown time. + */ +void +ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt) +{ + SharedHashInfo *shared_info; + + shared_info = (SharedHashInfo *) + shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, true); + node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber]; +} + +/* + * Copy instrumentation data from this worker's hash table (if it built one) + * to DSM memory so the leader can retrieve it. This must be done in an + * ExecShutdownHash() rather than ExecEndHash() because the latter runs after + * we've detached from the DSM segment. + */ +void +ExecShutdownHash(HashState *node) +{ + if (node->hinstrument && node->hashtable) + ExecHashGetInstrumentation(node->hinstrument, node->hashtable); +} + +/* + * Retrieve instrumentation data from workers before the DSM segment is + * detached, so that EXPLAIN can access it. + */ +void +ExecHashRetrieveInstrumentation(HashState *node) +{ + SharedHashInfo *shared_info = node->shared_info; + size_t size; + + /* Replace node->shared_info with a copy in backend-local memory. */ + size = offsetof(SharedHashInfo, hinstrument) + + shared_info->num_workers * sizeof(HashInstrumentation); + node->shared_info = palloc(size); + memcpy(node->shared_info, shared_info, size); +} + +/* + * Copy the instrumentation data from 'hashtable' into a HashInstrumentation + * struct. + */ +void +ExecHashGetInstrumentation(HashInstrumentation *instrument, + HashJoinTable hashtable) +{ + instrument->nbuckets = hashtable->nbuckets; + instrument->nbuckets_original = hashtable->nbuckets_original; + instrument->nbatch = hashtable->nbatch; + instrument->nbatch_original = hashtable->nbatch_original; + instrument->space_peak = hashtable->spacePeak; +} + /* * Allocate 'size' bytes from the currently active HashMemoryChunk */ diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 3ae556fb6c5..75d4c70f6f6 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -14,6 +14,7 @@ #ifndef NODEHASH_H #define NODEHASH_H +#include "access/parallel.h" #include "nodes/execnodes.h" extern HashState *ExecInitHash(Hash *node, EState *estate, int eflags); @@ -48,5 +49,13 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, int *numbatches, int *num_skew_mcvs); extern int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue); +extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt); +extern void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt); +extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt); +extern void ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt); +extern void ExecHashRetrieveInstrumentation(HashState *node); +extern void ExecShutdownHash(HashState *node); +extern void ExecHashGetInstrumentation(HashInstrumentation *instrument, + HashJoinTable hashtable); #endif /* NODEHASH_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index e05bc04f525..084d59ef834 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1980,6 +1980,29 @@ typedef struct GatherMergeState struct binaryheap *gm_heap; /* binary heap of slot indices */ } GatherMergeState; +/* ---------------- + * Values displayed by EXPLAIN ANALYZE + * ---------------- + */ +typedef struct HashInstrumentation +{ + int nbuckets; /* number of buckets at end of execution */ + int nbuckets_original; /* planned number of buckets */ + int nbatch; /* number of batches at end of execution */ + int nbatch_original; /* planned number of batches */ + size_t space_peak; /* speak memory usage in bytes */ +} HashInstrumentation; + +/* ---------------- + * Shared memory container for per-worker hash information + * ---------------- + */ +typedef struct SharedHashInfo +{ + int num_workers; + HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER]; +} SharedHashInfo; + /* ---------------- * HashState information * ---------------- @@ -1990,6 +2013,9 @@ typedef struct HashState HashJoinTable hashtable; /* hash table for the hashjoin */ List *hashkeys; /* list of ExprState nodes */ /* hashkeys is same as parent's hj_InnerHashKeys */ + + SharedHashInfo *shared_info; /* one entry per worker */ + HashInstrumentation *hinstrument; /* this worker's entry */ } HashState; /* ---------------- diff --git a/src/test/regress/expected/join.out b/src/test/regress/expected/join.out index b7d17900978..001d96dc2d8 100644 --- a/src/test/regress/expected/join.out +++ b/src/test/regress/expected/join.out @@ -6173,6 +6173,21 @@ $$); rollback to settings; -- A couple of other hash join tests unrelated to work_mem management. +-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate +savepoint settings; +set local max_parallel_workers_per_gather = 2; +set local work_mem = '4MB'; +set local parallel_leader_participation = off; +select * from hash_join_batches( +$$ + select count(*) from simple r join simple s using (id); +$$); + original | final +----------+------- + 1 | 1 +(1 row) + +rollback to settings; -- A full outer join where every record is matched. -- non-parallel savepoint settings; diff --git a/src/test/regress/sql/join.sql b/src/test/regress/sql/join.sql index c6d4a513e86..882601b3388 100644 --- a/src/test/regress/sql/join.sql +++ b/src/test/regress/sql/join.sql @@ -2159,6 +2159,17 @@ rollback to settings; -- A couple of other hash join tests unrelated to work_mem management. +-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate +savepoint settings; +set local max_parallel_workers_per_gather = 2; +set local work_mem = '4MB'; +set local parallel_leader_participation = off; +select * from hash_join_batches( +$$ + select count(*) from simple r join simple s using (id); +$$); +rollback to settings; + -- A full outer join where every record is matched. -- non-parallel