diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index 313b2344540..93a566ba629 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -214,8 +214,11 @@ ExecGather(GatherState *node) /* * Reset per-tuple memory context to free any expression evaluation * storage allocated in the previous tuple cycle. Note we can't do this - * until we're done projecting. + * until we're done projecting. This will also clear any previous tuple + * returned by a TupleQueueReader; to make sure we don't leave a dangling + * pointer around, clear the working slot first. */ + ExecClearTuple(node->funnel_slot); econtext = node->ps.ps_ExprContext; ResetExprContext(econtext); @@ -274,13 +277,19 @@ gather_getnext(GatherState *gatherstate) PlanState *outerPlan = outerPlanState(gatherstate); TupleTableSlot *outerTupleSlot; TupleTableSlot *fslot = gatherstate->funnel_slot; + MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory; HeapTuple tup; while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally) { if (gatherstate->reader != NULL) { + MemoryContext oldContext; + + /* Run TupleQueueReaders in per-tuple context */ + oldContext = MemoryContextSwitchTo(tupleContext); tup = gather_readnext(gatherstate); + MemoryContextSwitchTo(oldContext); if (HeapTupleIsValid(tup)) { @@ -288,8 +297,7 @@ gather_getnext(GatherState *gatherstate) fslot, /* slot in which to store the tuple */ InvalidBuffer, /* buffer associated with this * tuple */ - true); /* pfree this pointer if not from heap */ - + false); /* slot should not pfree tuple */ return fslot; } } @@ -314,7 +322,7 @@ gather_getnext(GatherState *gatherstate) static HeapTuple gather_readnext(GatherState *gatherstate) { - int waitpos = gatherstate->nextreader; + int nvisited = 0; for (;;) { @@ -335,6 +343,7 @@ gather_readnext(GatherState *gatherstate) */ if (readerdone) { + Assert(!tup); DestroyTupleQueueReader(reader); --gatherstate->nreaders; if (gatherstate->nreaders == 0) @@ -342,17 +351,12 @@ gather_readnext(GatherState *gatherstate) ExecShutdownGatherWorkers(gatherstate); return NULL; } - else - { - memmove(&gatherstate->reader[gatherstate->nextreader], - &gatherstate->reader[gatherstate->nextreader + 1], - sizeof(TupleQueueReader *) - * (gatherstate->nreaders - gatherstate->nextreader)); - if (gatherstate->nextreader >= gatherstate->nreaders) - gatherstate->nextreader = 0; - if (gatherstate->nextreader < waitpos) - --waitpos; - } + memmove(&gatherstate->reader[gatherstate->nextreader], + &gatherstate->reader[gatherstate->nextreader + 1], + sizeof(TupleQueueReader *) + * (gatherstate->nreaders - gatherstate->nextreader)); + if (gatherstate->nextreader >= gatherstate->nreaders) + gatherstate->nextreader = 0; continue; } @@ -367,11 +371,13 @@ gather_readnext(GatherState *gatherstate) * every tuple, but it turns out to be much more efficient to keep * reading from the same queue until that would require blocking. */ - gatherstate->nextreader = - (gatherstate->nextreader + 1) % gatherstate->nreaders; + gatherstate->nextreader++; + if (gatherstate->nextreader >= gatherstate->nreaders) + gatherstate->nextreader = 0; - /* Have we visited every TupleQueueReader? */ - if (gatherstate->nextreader == waitpos) + /* Have we visited every (surviving) TupleQueueReader? */ + nvisited++; + if (nvisited >= gatherstate->nreaders) { /* * If (still) running plan locally, return NULL so caller can @@ -384,6 +390,7 @@ gather_readnext(GatherState *gatherstate) WaitLatch(MyLatch, WL_LATCH_SET, 0); CHECK_FOR_INTERRUPTS(); ResetLatch(MyLatch); + nvisited = 0; } } } diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c index e81c333e4cd..64555599cee 100644 --- a/src/backend/executor/tqueue.c +++ b/src/backend/executor/tqueue.c @@ -524,13 +524,18 @@ DestroyTupleQueueReader(TupleQueueReader *reader) /* * Fetch a tuple from a tuple queue reader. * + * The return value is NULL if there are no remaining tuples or if + * nowait = true and no tuple is ready to return. *done, if not NULL, + * is set to true when there are no remaining tuples and otherwise to false. + * + * The returned tuple, if any, is allocated in CurrentMemoryContext. + * That should be a short-lived (tuple-lifespan) context, because we are + * pretty cavalier about leaking memory in that context if we have to do + * tuple remapping. + * * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still * accumulate bytes from a partially-read message, so it's useful to call * this with nowait = true even if nothing is returned. - * - * The return value is NULL if there are no remaining queues or if - * nowait = true and no tuple is ready to return. *done, if not NULL, - * is set to true when queue is detached and otherwise to false. */ HeapTuple TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done) @@ -565,10 +570,11 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done) * OK, we got a message. Process it. * * One-byte messages are mode switch messages, so that we can switch - * between "control" and "data" mode. When in "data" mode, each - * message (unless exactly one byte) is a tuple. When in "control" - * mode, each message provides a transient-typmod-to-tupledesc mapping - * so we can interpret future tuples. + * between "control" and "data" mode. Otherwise, when in "data" mode, + * each message is a tuple. When in "control" mode, each message + * provides a transient-typmod-to-tupledesc mapping to let us + * interpret future tuples. Both of those cases certainly require + * more than one byte, so no confusion is possible. */ if (nbytes == 1) { diff --git a/src/include/executor/tqueue.h b/src/include/executor/tqueue.h index 4f23c00feb1..3a0aba162d4 100644 --- a/src/include/executor/tqueue.h +++ b/src/include/executor/tqueue.h @@ -17,15 +17,17 @@ #include "storage/shm_mq.h" #include "tcop/dest.h" +/* Opaque struct, only known inside tqueue.c. */ +typedef struct TupleQueueReader TupleQueueReader; + /* Use this to send tuples to a shm_mq. */ extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle); /* Use these to receive tuples from a shm_mq. */ -typedef struct TupleQueueReader TupleQueueReader; extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc); -extern void DestroyTupleQueueReader(TupleQueueReader *funnel); -extern HeapTuple TupleQueueReaderNext(TupleQueueReader *, +extern void DestroyTupleQueueReader(TupleQueueReader *reader); +extern HeapTuple TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done); #endif /* TQUEUE_H */