diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index f9cf1b2f875..d93fbacdf9e 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -71,6 +71,8 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	gatherstate->ps.plan = (Plan *) node;
 	gatherstate->ps.state = estate;
 	gatherstate->ps.ExecProcNode = ExecGather;
+
+	gatherstate->initialized = false;
 	gatherstate->need_to_scan_locally = !node->single_copy;
 	gatherstate->tuples_needed = -1;
 
@@ -82,10 +84,10 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	ExecAssignExprContext(estate, &gatherstate->ps);
 
 	/*
-	 * initialize child expressions
+	 * Gather doesn't support checking a qual (it's always more efficient to
+	 * do it in the child node).
 	 */
-	gatherstate->ps.qual =
-		ExecInitQual(node->plan.qual, (PlanState *) gatherstate);
+	Assert(!node->plan.qual);
 
 	/*
 	 * tuple table initialization
@@ -169,15 +171,16 @@ ExecGather(PlanState *pstate)
 			 */
 			pcxt = node->pei->pcxt;
 			LaunchParallelWorkers(pcxt);
+			/* We save # workers launched for the benefit of EXPLAIN */
 			node->nworkers_launched = pcxt->nworkers_launched;
+			node->nreaders = 0;
+			node->nextreader = 0;
 
 			/* Set up tuple queue readers to read the results. */
 			if (pcxt->nworkers_launched > 0)
 			{
-				node->nreaders = 0;
-				node->nextreader = 0;
-				node->reader =
-					palloc(pcxt->nworkers_launched * sizeof(TupleQueueReader *));
+				node->reader = palloc(pcxt->nworkers_launched *
+									  sizeof(TupleQueueReader *));
 
 				for (i = 0; i < pcxt->nworkers_launched; ++i)
 				{
@@ -316,8 +319,8 @@ gather_readnext(GatherState *gatherstate)
 		tup = TupleQueueReaderNext(reader, true, &readerdone);
 
 		/*
-		 * If this reader is done, remove it.  If all readers are done, clean
-		 * up remaining worker state.
+		 * If this reader is done, remove it, and collapse the array.  If all
+		 * readers are done, clean up remaining worker state.
 		 */
 		if (readerdone)
 		{
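
The "collapse the array" wording above refers to how gather_readnext() keeps
the reader[] array dense once a queue reports done: the tail of the array is
shifted down over the dead entry, and the round-robin cursor is wrapped back
into range.  A minimal standalone sketch of that pattern, with hypothetical
names (Source, remove_source) rather than the patch's actual code:

#include <stdio.h>
#include <string.h>

typedef struct Source { int id; } Source;

static void
remove_source(Source *arr, int *nsources, int *next)
{
	/* drop the entry at *next and close the gap */
	memmove(&arr[*next], &arr[*next + 1],
			sizeof(Source) * (*nsources - *next - 1));
	(*nsources)--;
	/* if the cursor now points past the last live entry, wrap around */
	if (*next >= *nsources)
		*next = 0;
}

int
main(void)
{
	Source	readers[4] = {{0}, {1}, {2}, {3}};
	int		nsources = 4;
	int		next = 2;			/* say reader 2 just reported "done" */

	remove_source(readers, &nsources, &next);

	for (int i = 0; i < nsources; i++)
		printf("slot %d holds reader %d\n", i, readers[i].id);
	printf("next read from slot %d\n", next);
	return 0;
}

Keeping the array dense this way means the poll loop never has to skip over
dead entries, at the cost of one O(nreaders) memmove per worker shutdown.
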
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 0bd5da38b4a..67da5ff71ff 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -26,24 +26,30 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
-/*
- * Tuple array for each worker
- */
-typedef struct GMReaderTupleBuffer
-{
-	HeapTuple  *tuple;
-	int			readCounter;
-	int			nTuples;
-	bool		done;
-} GMReaderTupleBuffer;
-
 /*
  * When we read tuples from workers, it's a good idea to read several at once
  * for efficiency when possible: this minimizes context-switching overhead.
  * But reading too many at a time wastes memory without improving performance.
+ * We'll read up to MAX_TUPLE_STORE tuples (in addition to the first one).
  */
 #define MAX_TUPLE_STORE 10
 
+/*
+ * Pending-tuple array for each worker.  This holds additional tuples that
+ * we were able to fetch from the worker, but can't process yet.  In addition,
+ * this struct holds the "done" flag indicating the worker is known to have
+ * no more tuples.  (We do not use this struct for the leader; we don't keep
+ * any pending tuples for the leader, and the need_to_scan_locally flag serves
+ * as its "done" indicator.)
+ */
+typedef struct GMReaderTupleBuffer
+{
+	HeapTuple  *tuple;			/* array of length MAX_TUPLE_STORE */
+	int			nTuples;		/* number of tuples currently stored */
+	int			readCounter;	/* index of next tuple to extract */
+	bool		done;			/* true if reader is known exhausted */
+} GMReaderTupleBuffer;
+
 static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
 static int32 heap_compare_slots(Datum a, Datum b, void *arg);
 static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
@@ -53,7 +59,7 @@ static void gather_merge_init(GatherMergeState *gm_state);
 static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
 static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
 					  bool nowait);
-static void form_tuple_array(GatherMergeState *gm_state, int reader);
+static void load_tuple_array(GatherMergeState *gm_state, int reader);
 
 /* ----------------------------------------------------------------
  *		ExecInitGather
@@ -77,6 +83,9 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
 	gm_state->ps.plan = (Plan *) node;
 	gm_state->ps.state = estate;
 	gm_state->ps.ExecProcNode = ExecGatherMerge;
+
+	gm_state->initialized = false;
+	gm_state->gm_initialized = false;
 	gm_state->tuples_needed = -1;
 
 	/*
@@ -87,10 +96,10 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
 	ExecAssignExprContext(estate, &gm_state->ps);
 
 	/*
-	 * initialize child expressions
+	 * GatherMerge doesn't support checking a qual (it's always more efficient
+	 * to do it in the child node).
 	 */
-	gm_state->ps.qual =
-		ExecInitQual(node->plan.qual, &gm_state->ps);
+	Assert(!node->plan.qual);
 
 	/*
 	 * tuple table initialization
@@ -109,8 +118,6 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
 	ExecAssignResultTypeFromTL(&gm_state->ps);
 	ExecAssignProjectionInfo(&gm_state->ps, NULL);
 
-	gm_state->gm_initialized = false;
-
 	/*
 	 * initialize sort-key information
 	 */
@@ -177,7 +184,7 @@ ExecGatherMerge(PlanState *pstate)
 	if (!node->initialized)
 	{
 		EState	   *estate = node->ps.state;
-		GatherMerge *gm = (GatherMerge *) node->ps.plan;
+		GatherMerge *gm = castNode(GatherMerge, node->ps.plan);
 
 		/*
 		 * Sometimes we might have to run without parallelism; but if parallel
@@ -200,17 +207,16 @@ ExecGatherMerge(PlanState *pstate)
 			/* Try to launch workers. */
 			pcxt = node->pei->pcxt;
 			LaunchParallelWorkers(pcxt);
+			/* We save # workers launched for the benefit of EXPLAIN */
 			node->nworkers_launched = pcxt->nworkers_launched;
+			node->nreaders = 0;
 
 			/* Set up tuple queue readers to read the results. */
 			if (pcxt->nworkers_launched > 0)
 			{
-				node->nreaders = 0;
 				node->reader = palloc(pcxt->nworkers_launched *
 									  sizeof(TupleQueueReader *));
 
-				Assert(gm->numCols);
-
 				for (i = 0; i < pcxt->nworkers_launched; ++i)
 				{
 					shm_mq_set_handle(node->pei->tqueue[i],
@@ -248,9 +254,7 @@ ExecGatherMerge(PlanState *pstate)
 		return NULL;
 
 	/*
-	 * form the result tuple using ExecProject(), and return it --- unless the
-	 * projection produces an empty set, in which case we must loop back
-	 * around for another tuple
+	 * Form the result tuple using ExecProject(), and return it.
 	 */
 	econtext->ecxt_outertuple = slot;
 	return ExecProject(node->ps.ps_ProjInfo);
@@ -374,17 +378,16 @@ static void
 gather_merge_init(GatherMergeState *gm_state)
 {
 	int			nreaders = gm_state->nreaders;
-	bool		initialize = true;
+	bool		nowait = true;
 	int			i;
 
 	/*
-	 * Allocate gm_slots for the number of worker + one more slot for leader.
+	 * Allocate gm_slots for the workers, plus one more slot for the leader.
 	 * Last slot is always for leader. Leader always calls ExecProcNode() to
 	 * read the tuple which will return the TupleTableSlot. Later it will
 	 * directly get assigned to gm_slot. So just initialize leader gm_slot
-	 * with NULL. For other slots below code will call
-	 * ExecInitExtraTupleSlot() which will do the initialization of worker
-	 * slots.
+	 * with NULL. For other slots, code below will call
+	 * ExecInitExtraTupleSlot() to create a slot for the worker's results.
 	 */
 	gm_state->gm_slots =
 		palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *));
@@ -393,10 +396,10 @@ gather_merge_init(GatherMergeState *gm_state)
 	/* Initialize the tuple slot and tuple array for each worker */
 	gm_state->gm_tuple_buffers =
 		(GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) *
-										(gm_state->nreaders + 1));
+										gm_state->nreaders);
 	for (i = 0; i < gm_state->nreaders; i++)
 	{
-		/* Allocate the tuple array with MAX_TUPLE_STORE size */
+		/* Allocate the tuple array with length MAX_TUPLE_STORE */
 		gm_state->gm_tuple_buffers[i].tuple =
 			(HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);
 
@@ -413,39 +416,53 @@ gather_merge_init(GatherMergeState *gm_state)
 
 	/*
 	 * First, try to read a tuple from each worker (including leader) in
-	 * nowait mode, so that we initialize read from each worker as well as
-	 * leader. After this, if all active workers are unable to produce a
-	 * tuple, then re-read and this time use wait mode. For workers that were
-	 * able to produce a tuple in the earlier loop and are still active, just
-	 * try to fill the tuple array if more tuples are avaiable.
+	 * nowait mode.  After this, if not all workers were able to produce a
+	 * tuple (or a "done" indication), then re-read from remaining workers,
+	 * this time using wait mode.  Add all live readers (those producing at
+	 * least one tuple) to the heap.
 	 */
 reread:
 	for (i = 0; i < nreaders + 1; i++)
 	{
 		CHECK_FOR_INTERRUPTS();
 
-		if (!gm_state->gm_tuple_buffers[i].done &&
-			(TupIsNull(gm_state->gm_slots[i]) ||
-			 gm_state->gm_slots[i]->tts_isempty))
+		/* ignore this source if already known done */
+		if ((i < nreaders) ?
+			!gm_state->gm_tuple_buffers[i].done :
+			gm_state->need_to_scan_locally)
 		{
-			if (gather_merge_readnext(gm_state, i, initialize))
+			if (TupIsNull(gm_state->gm_slots[i]))
 			{
-				binaryheap_add_unordered(gm_state->gm_heap,
-										 Int32GetDatum(i));
+				/* Don't have a tuple yet, try to get one */
+				if (gather_merge_readnext(gm_state, i, nowait))
+					binaryheap_add_unordered(gm_state->gm_heap,
+											 Int32GetDatum(i));
+			}
+			else
+			{
+				/*
+				 * We already got at least one tuple from this worker, but
+				 * might as well see if it has any more ready by now.
+				 */
+				load_tuple_array(gm_state, i);
 			}
 		}
-		else
-			form_tuple_array(gm_state, i);
 	}
-	initialize = false;
 
+	/* need not recheck leader, since nowait doesn't matter for it */
 	for (i = 0; i < nreaders; i++)
+	{
 		if (!gm_state->gm_tuple_buffers[i].done &&
-			(TupIsNull(gm_state->gm_slots[i]) ||
-			 gm_state->gm_slots[i]->tts_isempty))
+			TupIsNull(gm_state->gm_slots[i]))
+		{
+			nowait = false;
 			goto reread;
+		}
+	}
 
+	/* Now heapify the entries added unordered above. */
 	binaryheap_build(gm_state->gm_heap);
+
 	gm_state->gm_initialized = true;
 }
 
@@ -460,7 +477,7 @@ gather_merge_clear_slots(GatherMergeState *gm_state)
 	for (i = 0; i < gm_state->nreaders; i++)
 	{
 		pfree(gm_state->gm_tuple_buffers[i].tuple);
-		gm_state->gm_slots[i] = ExecClearTuple(gm_state->gm_slots[i]);
+		ExecClearTuple(gm_state->gm_slots[i]);
 	}
 
 	/* Free tuple array as we don't need it any more */
@@ -500,7 +517,10 @@ gather_merge_getnext(GatherMergeState *gm_state)
 		if (gather_merge_readnext(gm_state, i, false))
 			binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i));
 		else
+		{
+			/* reader exhausted, remove it from heap */
 			(void) binaryheap_remove_first(gm_state->gm_heap);
+		}
 	}
 
 	if (binaryheap_empty(gm_state->gm_heap))
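
The replace-first/remove-first pair above is the standard k-way-merge step:
emit the heap top, then either reorder the top entry around its source's next
tuple or drop the source once it is exhausted.  A self-contained sketch of
that pattern with a hand-rolled index min-heap (plain C with hypothetical
names; it does not use PostgreSQL's binaryheap API):

#include <stdio.h>

#define NSOURCES 3
#define SRCLEN 4

static const int src[NSOURCES][SRCLEN] = {
	{1, 4, 7, 9}, {2, 3, 8, 10}, {0, 5, 6, 11}
};
static int	pos[NSOURCES];		/* cursor into each sorted source */
static int	heap[NSOURCES];		/* min-heap of source indices */
static int	heap_size;

static int cur(int s) { return src[s][pos[s]]; }

static void
sift_down(int i)
{
	for (;;)
	{
		int		l = 2 * i + 1, r = 2 * i + 2, m = i;

		if (l < heap_size && cur(heap[l]) < cur(heap[m]))
			m = l;
		if (r < heap_size && cur(heap[r]) < cur(heap[m]))
			m = r;
		if (m == i)
			return;
		int		tmp = heap[i]; heap[i] = heap[m]; heap[m] = tmp;
		i = m;
	}
}

int
main(void)
{
	/* add all sources unordered, then heapify (cf. binaryheap_build) */
	for (int s = 0; s < NSOURCES; s++)
		heap[heap_size++] = s;
	for (int i = heap_size / 2 - 1; i >= 0; i--)
		sift_down(i);

	while (heap_size > 0)
	{
		int		s = heap[0];	/* source with the smallest current value */

		printf("%d ", cur(s));
		if (++pos[s] < SRCLEN)
			sift_down(0);		/* "replace first": reorder the top */
		else
		{
			heap[0] = heap[--heap_size];	/* "remove first" */
			sift_down(0);
		}
	}
	printf("\n");				/* prints 0 1 2 ... 11 */
	return 0;
}
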
@@ -518,37 +538,37 @@ gather_merge_getnext(GatherMergeState *gm_state)
 }
 
 /*
- * Read the tuple for given reader in nowait mode, and form the tuple array.
+ * Read tuple(s) for the given reader in nowait mode, and load them into its
+ * tuple array, until we have MAX_TUPLE_STORE of them or would have to block.
  */
 static void
-form_tuple_array(GatherMergeState *gm_state, int reader)
+load_tuple_array(GatherMergeState *gm_state, int reader)
 {
-	GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+	GMReaderTupleBuffer *tuple_buffer;
 	int			i;
 
-	/* Last slot is for leader and we don't build tuple array for leader */
+	/* Don't do anything if this is the leader. */
 	if (reader == gm_state->nreaders)
 		return;
 
-	/*
-	 * We here because we already read all the tuples from the tuple array, so
-	 * initialize the counter to zero.
-	 */
+	tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+
+	/* If there's nothing in the array, reset the counters to zero. */
 	if (tuple_buffer->nTuples == tuple_buffer->readCounter)
 		tuple_buffer->nTuples = tuple_buffer->readCounter = 0;
 
-	/* Tuple array is already full? */
-	if (tuple_buffer->nTuples == MAX_TUPLE_STORE)
-		return;
-
+	/* Try to fill additional slots in the array. */
 	for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
 	{
-		tuple_buffer->tuple[i] = heap_copytuple(gm_readnext_tuple(gm_state,
-																  reader,
-																  false,
-																  &tuple_buffer->done));
-		if (!HeapTupleIsValid(tuple_buffer->tuple[i]))
+		HeapTuple	tuple;
+
+		tuple = gm_readnext_tuple(gm_state,
+								  reader,
+								  true,
+								  &tuple_buffer->done);
+		if (!HeapTupleIsValid(tuple))
 			break;
+		tuple_buffer->tuple[i] = heap_copytuple(tuple);
 		tuple_buffer->nTuples++;
 	}
 }
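
The rewritten load_tuple_array() is the refill half of a fill/drain cycle:
gather_merge_readnext() drains the array via readCounter, and once the array
is empty both counters reset to zero and it is topped up again with nowait
reads.  A standalone sketch of that cycle, substituting a bursty fake source
for the tuple queue (all names hypothetical):

#include <stdbool.h>
#include <stdio.h>

#define MAX_TUPLE_STORE 10

typedef struct Buffer
{
	int		item[MAX_TUPLE_STORE];	/* array of length MAX_TUPLE_STORE */
	int		nItems;			/* number of items currently stored */
	int		readCounter;	/* index of next item to extract */
	bool	done;			/* true if source is known exhausted */
} Buffer;

/* Stand-in source: yields 0..24, but only "has data ready" in bursts. */
static bool
source_next_nowait(int *out, bool *done)
{
	static int	next = 0;
	static int	calls = 0;

	if (next >= 25)
	{
		*done = true;
		return false;
	}
	if (++calls % 7 == 0)
		return false;			/* pretend nothing is ready right now */
	*out = next++;
	return true;
}

/* Analogue of load_tuple_array(): top up the buffer without blocking. */
static void
load_buffer(Buffer *buf)
{
	if (buf->nItems == buf->readCounter)	/* drained: reset counters */
		buf->nItems = buf->readCounter = 0;
	for (int i = buf->nItems; i < MAX_TUPLE_STORE; i++)
	{
		int		v;

		if (!source_next_nowait(&v, &buf->done))
			break;				/* would block, or source is done */
		buf->item[i] = v;
		buf->nItems++;
	}
}

int
main(void)
{
	Buffer		buf = {0};

	while (!(buf.done && buf.readCounter == buf.nItems))
	{
		if (buf.readCounter == buf.nItems)
			load_buffer(&buf);
		while (buf.readCounter < buf.nItems)
			printf("%d ", buf.item[buf.readCounter++]);
	}
	printf("\n");				/* prints 0 1 2 ... 24 */
	return 0;
}
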
@@ -556,13 +576,15 @@ form_tuple_array(GatherMergeState *gm_state, int reader)
 /*
  * Store the next tuple for a given reader into the appropriate slot.
  *
- * Returns false if the reader is exhausted, and true otherwise.
+ * Returns true if successful, false if not (either the reader is exhausted,
+ * or we didn't want to wait for a tuple).  Sets the done flag if the reader
+ * is found to be exhausted.
  */
 static bool
 gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 {
 	GMReaderTupleBuffer *tuple_buffer;
-	HeapTuple	tup = NULL;
+	HeapTuple	tup;
 
 	/*
 	 * If we're being asked to generate a tuple from the leader, then we just
@@ -582,7 +604,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 				gm_state->gm_slots[reader] = outerTupleSlot;
 				return true;
 			}
-			gm_state->gm_tuple_buffers[reader].done = true;
+			/* need_to_scan_locally serves as "done" flag for leader */
 			gm_state->need_to_scan_locally = false;
 		}
 		return false;
@@ -594,7 +616,6 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 	if (tuple_buffer->nTuples > tuple_buffer->readCounter)
 	{
 		/* Return any tuple previously read that is still buffered. */
-		tuple_buffer = &gm_state->gm_tuple_buffers[reader];
 		tup = tuple_buffer->tuple[tuple_buffer->readCounter++];
 	}
 	else if (tuple_buffer->done)
@@ -607,19 +628,19 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 	else
 	{
 		/* Read and buffer next tuple. */
-		tup = heap_copytuple(gm_readnext_tuple(gm_state,
-											   reader,
-											   nowait,
-											   &tuple_buffer->done));
+		tup = gm_readnext_tuple(gm_state,
+								reader,
+								nowait,
+								&tuple_buffer->done);
+		if (!HeapTupleIsValid(tup))
+			return false;
+		tup = heap_copytuple(tup);
 
 		/*
 		 * Attempt to read more tuples in nowait mode and store them in the
-		 * tuple array.
+		 * pending-tuple array for the reader.
 		 */
-		if (HeapTupleIsValid(tup))
-			form_tuple_array(gm_state, reader);
-		else
-			return false;
+		load_tuple_array(gm_state, reader);
 	}
 
 	Assert(HeapTupleIsValid(tup));
@@ -642,15 +663,10 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
 				  bool *done)
 {
 	TupleQueueReader *reader;
-	HeapTuple	tup = NULL;
+	HeapTuple	tup;
 	MemoryContext oldContext;
 	MemoryContext tupleContext;
 
-	tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory;
-
-	if (done != NULL)
-		*done = false;
-
 	/* Check for async events, particularly messages from workers. */
 	CHECK_FOR_INTERRUPTS();
 
@@ -658,6 +674,7 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
 	reader = gm_state->reader[nreader];
 
 	/* Run TupleQueueReaders in per-tuple context */
+	tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory;
 	oldContext = MemoryContextSwitchTo(tupleContext);
 	tup = TupleQueueReaderNext(reader, nowait, done);
 	MemoryContextSwitchTo(oldContext);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index d1565e74961..6cf128a7f02 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1923,15 +1923,17 @@ typedef struct UniqueState
 typedef struct GatherState
 {
 	PlanState	ps;				/* its first field is NodeTag */
-	bool		initialized;
-	struct ParallelExecutorInfo *pei;
-	int			nreaders;
-	int			nextreader;
-	int			nworkers_launched;
-	struct TupleQueueReader **reader;
-	TupleTableSlot *funnel_slot;
-	bool		need_to_scan_locally;
+	bool		initialized;	/* workers launched? */
+	bool		need_to_scan_locally;	/* need to read from local plan? */
 	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
+	/* these fields are set up once: */
+	TupleTableSlot *funnel_slot;
+	struct ParallelExecutorInfo *pei;
+	/* all remaining fields are reinitialized during a rescan: */
+	int			nworkers_launched;	/* original number of workers */
+	int			nreaders;		/* number of still-active workers */
+	int			nextreader;		/* next one to try to read from */
+	struct TupleQueueReader **reader;	/* array with nreaders active entries */
 } GatherState;
 
 /* ----------------
@@ -1942,25 +1944,27 @@ typedef struct GatherState
  *		merge the results into a single sorted stream.
  * ----------------
  */
-struct GMReaderTuple;
+struct GMReaderTupleBuffer;		/* private in nodeGatherMerge.c */
 
 typedef struct GatherMergeState
 {
 	PlanState	ps;				/* its first field is NodeTag */
-	bool		initialized;
-	struct ParallelExecutorInfo *pei;
-	int			nreaders;
-	int			nworkers_launched;
-	struct TupleQueueReader **reader;
-	TupleDesc	tupDesc;
-	TupleTableSlot **gm_slots;
-	struct binaryheap *gm_heap; /* binary heap of slot indices */
-	bool		gm_initialized; /* gather merge initilized ? */
-	bool		need_to_scan_locally;
+	bool		initialized;	/* workers launched? */
+	bool		gm_initialized; /* gather_merge_init() done? */
+	bool		need_to_scan_locally;	/* need to read from local plan? */
 	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
-	int			gm_nkeys;
-	SortSupport gm_sortkeys;	/* array of length ms_nkeys */
-	struct GMReaderTupleBuffer *gm_tuple_buffers;	/* tuple buffer per reader */
+	/* these fields are set up once: */
+	TupleDesc	tupDesc;		/* descriptor for subplan result tuples */
+	int			gm_nkeys;		/* number of sort columns */
+	SortSupport gm_sortkeys;	/* array of length gm_nkeys */
+	struct ParallelExecutorInfo *pei;
+	/* all remaining fields are reinitialized during a rescan: */
+	int			nworkers_launched;	/* original number of workers */
+	int			nreaders;		/* number of active workers */
+	TupleTableSlot **gm_slots;	/* array with nreaders+1 entries */
+	struct TupleQueueReader **reader;	/* array with nreaders active entries */
+	struct GMReaderTupleBuffer *gm_tuple_buffers;	/* nreaders tuple buffers */
+	struct binaryheap *gm_heap; /* binary heap of slot indices */
 } GatherMergeState;
 
 /* ----------------