diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index 55fce231ce1..94b6ae5c300 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -71,6 +71,8 @@ ExecInitGather(Gather *node, EState *estate, int eflags) gatherstate->ps.plan = (Plan *) node; gatherstate->ps.state = estate; gatherstate->ps.ExecProcNode = ExecGather; + + gatherstate->initialized = false; gatherstate->need_to_scan_locally = !node->single_copy; /* @@ -81,10 +83,10 @@ ExecInitGather(Gather *node, EState *estate, int eflags) ExecAssignExprContext(estate, &gatherstate->ps); /* - * initialize child expressions + * Gather doesn't support checking a qual (it's always more efficient to + * do it in the child node). */ - gatherstate->ps.qual = - ExecInitQual(node->plan.qual, (PlanState *) gatherstate); + Assert(!node->plan.qual); /* * tuple table initialization @@ -167,15 +169,16 @@ ExecGather(PlanState *pstate) */ pcxt = node->pei->pcxt; LaunchParallelWorkers(pcxt); + /* We save # workers launched for the benefit of EXPLAIN */ node->nworkers_launched = pcxt->nworkers_launched; + node->nreaders = 0; + node->nextreader = 0; /* Set up tuple queue readers to read the results. */ if (pcxt->nworkers_launched > 0) { - node->nreaders = 0; - node->nextreader = 0; - node->reader = - palloc(pcxt->nworkers_launched * sizeof(TupleQueueReader *)); + node->reader = palloc(pcxt->nworkers_launched * + sizeof(TupleQueueReader *)); for (i = 0; i < pcxt->nworkers_launched; ++i) { @@ -314,8 +317,8 @@ gather_readnext(GatherState *gatherstate) tup = TupleQueueReaderNext(reader, true, &readerdone); /* - * If this reader is done, remove it. If all readers are done, clean - * up remaining worker state. + * If this reader is done, remove it, and collapse the array. If all + * readers are done, clean up remaining worker state. */ if (readerdone) { diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index 90c1be96ef4..d9c761b0424 100644 --- a/src/backend/executor/nodeGatherMerge.c +++ b/src/backend/executor/nodeGatherMerge.c @@ -26,24 +26,30 @@ #include "utils/memutils.h" #include "utils/rel.h" -/* - * Tuple array for each worker - */ -typedef struct GMReaderTupleBuffer -{ - HeapTuple *tuple; - int readCounter; - int nTuples; - bool done; -} GMReaderTupleBuffer; - /* * When we read tuples from workers, it's a good idea to read several at once * for efficiency when possible: this minimizes context-switching overhead. * But reading too many at a time wastes memory without improving performance. + * We'll read up to MAX_TUPLE_STORE tuples (in addition to the first one). */ #define MAX_TUPLE_STORE 10 +/* + * Pending-tuple array for each worker. This holds additional tuples that + * we were able to fetch from the worker, but can't process yet. In addition, + * this struct holds the "done" flag indicating the worker is known to have + * no more tuples. (We do not use this struct for the leader; we don't keep + * any pending tuples for the leader, and the need_to_scan_locally flag serves + * as its "done" indicator.) 
+ */ +typedef struct GMReaderTupleBuffer +{ + HeapTuple *tuple; /* array of length MAX_TUPLE_STORE */ + int nTuples; /* number of tuples currently stored */ + int readCounter; /* index of next tuple to extract */ + bool done; /* true if reader is known exhausted */ +} GMReaderTupleBuffer; + static TupleTableSlot *ExecGatherMerge(PlanState *pstate); static int32 heap_compare_slots(Datum a, Datum b, void *arg); static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state); @@ -53,7 +59,7 @@ static void gather_merge_init(GatherMergeState *gm_state); static void ExecShutdownGatherMergeWorkers(GatherMergeState *node); static bool gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait); -static void form_tuple_array(GatherMergeState *gm_state, int reader); +static void load_tuple_array(GatherMergeState *gm_state, int reader); /* ---------------------------------------------------------------- * ExecInitGather @@ -78,6 +84,9 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags) gm_state->ps.state = estate; gm_state->ps.ExecProcNode = ExecGatherMerge; + gm_state->initialized = false; + gm_state->gm_initialized = false; + /* * Miscellaneous initialization * @@ -86,10 +95,10 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags) ExecAssignExprContext(estate, &gm_state->ps); /* - * initialize child expressions + * GatherMerge doesn't support checking a qual (it's always more efficient + * to do it in the child node). */ - gm_state->ps.qual = - ExecInitQual(node->plan.qual, &gm_state->ps); + Assert(!node->plan.qual); /* * tuple table initialization @@ -108,8 +117,6 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags) ExecAssignResultTypeFromTL(&gm_state->ps); ExecAssignProjectionInfo(&gm_state->ps, NULL); - gm_state->gm_initialized = false; - /* * initialize sort-key information */ @@ -176,7 +183,7 @@ ExecGatherMerge(PlanState *pstate) if (!node->initialized) { EState *estate = node->ps.state; - GatherMerge *gm = (GatherMerge *) node->ps.plan; + GatherMerge *gm = castNode(GatherMerge, node->ps.plan); /* * Sometimes we might have to run without parallelism; but if parallel @@ -198,17 +205,16 @@ ExecGatherMerge(PlanState *pstate) /* Try to launch workers. */ pcxt = node->pei->pcxt; LaunchParallelWorkers(pcxt); + /* We save # workers launched for the benefit of EXPLAIN */ node->nworkers_launched = pcxt->nworkers_launched; + node->nreaders = 0; /* Set up tuple queue readers to read the results. */ if (pcxt->nworkers_launched > 0) { - node->nreaders = 0; node->reader = palloc(pcxt->nworkers_launched * sizeof(TupleQueueReader *)); - Assert(gm->numCols); - for (i = 0; i < pcxt->nworkers_launched; ++i) { shm_mq_set_handle(node->pei->tqueue[i], @@ -246,9 +252,7 @@ ExecGatherMerge(PlanState *pstate) return NULL; /* - * form the result tuple using ExecProject(), and return it --- unless the - * projection produces an empty set, in which case we must loop back - * around for another tuple + * Form the result tuple using ExecProject(), and return it. */ econtext->ecxt_outertuple = slot; return ExecProject(node->ps.ps_ProjInfo); @@ -372,17 +376,16 @@ static void gather_merge_init(GatherMergeState *gm_state) { int nreaders = gm_state->nreaders; - bool initialize = true; + bool nowait = true; int i; /* - * Allocate gm_slots for the number of worker + one more slot for leader. + * Allocate gm_slots for the number of workers + one more slot for leader. * Last slot is always for leader. 
Leader always calls ExecProcNode() to * read the tuple which will return the TupleTableSlot. Later it will * directly get assigned to gm_slot. So just initialize leader gm_slot - * with NULL. For other slots below code will call - * ExecInitExtraTupleSlot() which will do the initialization of worker - * slots. + * with NULL. For other slots, code below will call + * ExecInitExtraTupleSlot() to create a slot for the worker's results. */ gm_state->gm_slots = palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *)); @@ -391,10 +394,10 @@ gather_merge_init(GatherMergeState *gm_state) /* Initialize the tuple slot and tuple array for each worker */ gm_state->gm_tuple_buffers = (GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) * - (gm_state->nreaders + 1)); + gm_state->nreaders); for (i = 0; i < gm_state->nreaders; i++) { - /* Allocate the tuple array with MAX_TUPLE_STORE size */ + /* Allocate the tuple array with length MAX_TUPLE_STORE */ gm_state->gm_tuple_buffers[i].tuple = (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE); @@ -411,39 +414,53 @@ gather_merge_init(GatherMergeState *gm_state) /* * First, try to read a tuple from each worker (including leader) in - * nowait mode, so that we initialize read from each worker as well as - * leader. After this, if all active workers are unable to produce a - * tuple, then re-read and this time use wait mode. For workers that were - * able to produce a tuple in the earlier loop and are still active, just - * try to fill the tuple array if more tuples are avaiable. + * nowait mode. After this, if not all workers were able to produce a + * tuple (or a "done" indication), then re-read from remaining workers, + * this time using wait mode. Add all live readers (those producing at + * least one tuple) to the heap. */ reread: for (i = 0; i < nreaders + 1; i++) { CHECK_FOR_INTERRUPTS(); - if (!gm_state->gm_tuple_buffers[i].done && - (TupIsNull(gm_state->gm_slots[i]) || - gm_state->gm_slots[i]->tts_isempty)) + /* ignore this source if already known done */ + if ((i < nreaders) ? + !gm_state->gm_tuple_buffers[i].done : + gm_state->need_to_scan_locally) { - if (gather_merge_readnext(gm_state, i, initialize)) + if (TupIsNull(gm_state->gm_slots[i])) { - binaryheap_add_unordered(gm_state->gm_heap, - Int32GetDatum(i)); + /* Don't have a tuple yet, try to get one */ + if (gather_merge_readnext(gm_state, i, nowait)) + binaryheap_add_unordered(gm_state->gm_heap, + Int32GetDatum(i)); + } + else + { + /* + * We already got at least one tuple from this worker, but + * might as well see if it has any more ready by now. + */ + load_tuple_array(gm_state, i); } } - else - form_tuple_array(gm_state, i); } - initialize = false; + /* need not recheck leader, since nowait doesn't matter for it */ for (i = 0; i < nreaders; i++) + { if (!gm_state->gm_tuple_buffers[i].done && - (TupIsNull(gm_state->gm_slots[i]) || - gm_state->gm_slots[i]->tts_isempty)) + TupIsNull(gm_state->gm_slots[i])) + { + nowait = false; goto reread; + } + } + /* Now heapify the heap. 
*/ binaryheap_build(gm_state->gm_heap); + gm_state->gm_initialized = true; } @@ -458,7 +475,7 @@ gather_merge_clear_slots(GatherMergeState *gm_state) for (i = 0; i < gm_state->nreaders; i++) { pfree(gm_state->gm_tuple_buffers[i].tuple); - gm_state->gm_slots[i] = ExecClearTuple(gm_state->gm_slots[i]); + ExecClearTuple(gm_state->gm_slots[i]); } /* Free tuple array as we don't need it any more */ @@ -498,7 +515,10 @@ gather_merge_getnext(GatherMergeState *gm_state) if (gather_merge_readnext(gm_state, i, false)) binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i)); else + { + /* reader exhausted, remove it from heap */ (void) binaryheap_remove_first(gm_state->gm_heap); + } } if (binaryheap_empty(gm_state->gm_heap)) @@ -516,37 +536,37 @@ gather_merge_getnext(GatherMergeState *gm_state) } /* - * Read the tuple for given reader in nowait mode, and form the tuple array. + * Read tuple(s) for given reader in nowait mode, and load into its tuple + * array, until we have MAX_TUPLE_STORE of them or would have to block. */ static void -form_tuple_array(GatherMergeState *gm_state, int reader) +load_tuple_array(GatherMergeState *gm_state, int reader) { - GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[reader]; + GMReaderTupleBuffer *tuple_buffer; int i; - /* Last slot is for leader and we don't build tuple array for leader */ + /* Don't do anything if this is the leader. */ if (reader == gm_state->nreaders) return; - /* - * We here because we already read all the tuples from the tuple array, so - * initialize the counter to zero. - */ + tuple_buffer = &gm_state->gm_tuple_buffers[reader]; + + /* If there's nothing in the array, reset the counters to zero. */ if (tuple_buffer->nTuples == tuple_buffer->readCounter) tuple_buffer->nTuples = tuple_buffer->readCounter = 0; - /* Tuple array is already full? */ - if (tuple_buffer->nTuples == MAX_TUPLE_STORE) - return; - + /* Try to fill additional slots in the array. */ for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++) { - tuple_buffer->tuple[i] = heap_copytuple(gm_readnext_tuple(gm_state, - reader, - false, - &tuple_buffer->done)); - if (!HeapTupleIsValid(tuple_buffer->tuple[i])) + HeapTuple tuple; + + tuple = gm_readnext_tuple(gm_state, + reader, + true, + &tuple_buffer->done); + if (!HeapTupleIsValid(tuple)) break; + tuple_buffer->tuple[i] = heap_copytuple(tuple); tuple_buffer->nTuples++; } } @@ -554,13 +574,15 @@ form_tuple_array(GatherMergeState *gm_state, int reader) /* * Store the next tuple for a given reader into the appropriate slot. * - * Returns false if the reader is exhausted, and true otherwise. + * Returns true if successful, false if not (either reader is exhausted, + * or we didn't want to wait for a tuple). Sets done flag if reader + * is found to be exhausted. 
*/ static bool gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) { GMReaderTupleBuffer *tuple_buffer; - HeapTuple tup = NULL; + HeapTuple tup; /* * If we're being asked to generate a tuple from the leader, then we just @@ -580,7 +602,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) gm_state->gm_slots[reader] = outerTupleSlot; return true; } - gm_state->gm_tuple_buffers[reader].done = true; + /* need_to_scan_locally serves as "done" flag for leader */ gm_state->need_to_scan_locally = false; } return false; @@ -592,7 +614,6 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) if (tuple_buffer->nTuples > tuple_buffer->readCounter) { /* Return any tuple previously read that is still buffered. */ - tuple_buffer = &gm_state->gm_tuple_buffers[reader]; tup = tuple_buffer->tuple[tuple_buffer->readCounter++]; } else if (tuple_buffer->done) @@ -605,19 +626,19 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) else { /* Read and buffer next tuple. */ - tup = heap_copytuple(gm_readnext_tuple(gm_state, - reader, - nowait, - &tuple_buffer->done)); + tup = gm_readnext_tuple(gm_state, + reader, + nowait, + &tuple_buffer->done); + if (!HeapTupleIsValid(tup)) + return false; + tup = heap_copytuple(tup); /* * Attempt to read more tuples in nowait mode and store them in the - * tuple array. + * pending-tuple array for the reader. */ - if (HeapTupleIsValid(tup)) - form_tuple_array(gm_state, reader); - else - return false; + load_tuple_array(gm_state, reader); } Assert(HeapTupleIsValid(tup)); @@ -640,15 +661,10 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait, bool *done) { TupleQueueReader *reader; - HeapTuple tup = NULL; + HeapTuple tup; MemoryContext oldContext; MemoryContext tupleContext; - tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory; - - if (done != NULL) - *done = false; - /* Check for async events, particularly messages from workers. */ CHECK_FOR_INTERRUPTS(); @@ -656,6 +672,7 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait, reader = gm_state->reader[nreader]; /* Run TupleQueueReaders in per-tuple context */ + tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory; oldContext = MemoryContextSwitchTo(tupleContext); tup = TupleQueueReaderNext(reader, nowait, done); MemoryContextSwitchTo(oldContext); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 3272c4b3155..eb66163a356 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1911,14 +1911,16 @@ typedef struct UniqueState typedef struct GatherState { PlanState ps; /* its first field is NodeTag */ - bool initialized; - struct ParallelExecutorInfo *pei; - int nreaders; - int nextreader; - int nworkers_launched; - struct TupleQueueReader **reader; + bool initialized; /* workers launched? */ + bool need_to_scan_locally; /* need to read from local plan? 
*/ + /* these fields are set up once: */ TupleTableSlot *funnel_slot; - bool need_to_scan_locally; + struct ParallelExecutorInfo *pei; + /* all remaining fields are reinitialized during a rescan: */ + int nworkers_launched; /* original number of workers */ + int nreaders; /* number of still-active workers */ + int nextreader; /* next one to try to read from */ + struct TupleQueueReader **reader; /* array with nreaders active entries */ } GatherState; /* ---------------- @@ -1929,24 +1931,26 @@ typedef struct GatherState * merge the results into a single sorted stream. * ---------------- */ -struct GMReaderTuple; +struct GMReaderTupleBuffer; /* private in nodeGatherMerge.c */ typedef struct GatherMergeState { PlanState ps; /* its first field is NodeTag */ - bool initialized; + bool initialized; /* workers launched? */ + bool gm_initialized; /* gather_merge_init() done? */ + bool need_to_scan_locally; /* need to read from local plan? */ + /* these fields are set up once: */ + TupleDesc tupDesc; /* descriptor for subplan result tuples */ + int gm_nkeys; /* number of sort columns */ + SortSupport gm_sortkeys; /* array of length gm_nkeys */ struct ParallelExecutorInfo *pei; - int nreaders; - int nworkers_launched; - struct TupleQueueReader **reader; - TupleDesc tupDesc; - TupleTableSlot **gm_slots; + /* all remaining fields are reinitialized during a rescan: */ + int nworkers_launched; /* original number of workers */ + int nreaders; /* number of active workers */ + TupleTableSlot **gm_slots; /* array with nreaders+1 entries */ + struct TupleQueueReader **reader; /* array with nreaders active entries */ + struct GMReaderTupleBuffer *gm_tuple_buffers; /* nreaders tuple buffers */ struct binaryheap *gm_heap; /* binary heap of slot indices */ - bool gm_initialized; /* gather merge initilized ? */ - bool need_to_scan_locally; - int gm_nkeys; - SortSupport gm_sortkeys; /* array of length ms_nkeys */ - struct GMReaderTupleBuffer *gm_tuple_buffers; /* tuple buffer per reader */ } GatherMergeState; /* ----------------
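
For illustration only (not part of the patch, and not PostgreSQL code): the heart of the GatherMerge change is the per-worker pending-tuple array. gather_merge_readnext() obtains one tuple from a worker (waiting for it only when the caller asked to), and load_tuple_array() then grabs additional tuples in nowait mode, up to MAX_TUPLE_STORE, so that later calls can be served from local memory instead of the shared queue. The standalone C sketch below models that pattern under simplified assumptions: PendingBuffer, try_fetch(), load_pending() and next_tuple() are hypothetical stand-ins for GMReaderTupleBuffer, TupleQueueReaderNext() in nowait mode, load_tuple_array() and gather_merge_readnext(), and a bursty simulated source replaces a real parallel worker.

/*
 * Standalone sketch of the pending-tuple-array pattern.  All names here are
 * illustrative stand-ins, not the real executor APIs.
 */
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

#define MAX_TUPLE_STORE 10      /* cap on tuples buffered per source */

typedef struct Tuple { int value; } Tuple;

typedef struct PendingBuffer
{
    Tuple  *tuple[MAX_TUPLE_STORE]; /* fetched but not yet consumed */
    int     nTuples;            /* number of tuples currently stored */
    int     readCounter;        /* index of next tuple to hand out */
    bool    done;               /* source known to be exhausted */
} PendingBuffer;

/*
 * Hypothetical nowait fetch: returns a tuple if one is ready, NULL otherwise,
 * and sets *done once the source is exhausted.  Simulates 25 tuples that
 * become ready in bursts of five.
 */
static Tuple *
try_fetch(int reader, bool *done)
{
    static int produced = 0;
    static int burst = 0;
    Tuple  *t;

    (void) reader;
    if (produced >= 25)
    {
        *done = true;
        return NULL;
    }
    if (burst == 0)
    {
        burst = 5;              /* the next five polls will find a tuple */
        return NULL;            /* ...but this one comes up empty */
    }
    burst--;
    t = malloc(sizeof(Tuple));
    t->value = produced++;
    return t;
}

/*
 * After the caller has obtained one tuple, opportunistically buffer as many
 * additional ready tuples as will fit, never blocking.
 */
static void
load_pending(PendingBuffer *buf, int reader)
{
    int     i;

    /* if everything buffered has been consumed, reset the counters */
    if (buf->nTuples == buf->readCounter)
        buf->nTuples = buf->readCounter = 0;

    for (i = buf->nTuples; i < MAX_TUPLE_STORE; i++)
    {
        Tuple  *t = try_fetch(reader, &buf->done);

        if (t == NULL)
            break;              /* nothing more ready right now, or done */
        buf->tuple[i] = t;
        buf->nTuples++;
    }
}

/*
 * Get the next tuple for one source: prefer the pending buffer, otherwise
 * fetch directly and then top the buffer up.  Returns NULL if nothing is
 * ready yet or the source is exhausted.
 */
static Tuple *
next_tuple(PendingBuffer *buf, int reader)
{
    Tuple  *t;

    if (buf->nTuples > buf->readCounter)
        return buf->tuple[buf->readCounter++];  /* buffered tuple available */
    if (buf->done)
        return NULL;            /* exhausted */
    t = try_fetch(reader, &buf->done);
    if (t == NULL)
        return NULL;            /* would have to wait */
    load_pending(buf, reader);  /* batch up further reads while we're here */
    return t;
}

int
main(void)
{
    PendingBuffer buf = {0};
    int     consumed = 0;

    while (!buf.done || buf.nTuples > buf.readCounter)
    {
        Tuple  *t = next_tuple(&buf, 0);

        if (t == NULL)
            continue;           /* nowait miss; a real caller might block */
        consumed++;
        free(t);
    }
    printf("consumed %d tuples\n", consumed);
    return 0;
}

In the sketch, draining the simulated source prints "consumed 25 tuples"; the point is that only one fetch per batch needs to touch the (simulated) queue while up to MAX_TUPLE_STORE follow-up tuples are served from the buffer, which is the context-switch-reduction rationale stated in the nodeGatherMerge.c comment above.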