mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-24 01:29:19 +03:00 
			
		
		
		
	Code review for nodeGatherMerge.c.
Comment the fields of GatherMergeState, and organize them a bit more sensibly. Comment GMReaderTupleBuffer more usefully too. Improve assorted other comments that were obsolete or just not very good English. Get rid of the use of a GMReaderTupleBuffer for the leader process; that was confusing, since only the "done" field was used, and that in a way redundant with need_to_scan_locally. In gather_merge_init, avoid calling load_tuple_array for already-known-exhausted workers. I'm not sure if there's a live bug there, but the case is unlikely to be well tested due to timing considerations. Remove some useless code, such as duplicating the tts_isempty test done by TupIsNull. Remove useless initialization of ps.qual, replacing that with an assertion that we have no qual to check. (If we did, the code would fail to check it.) Avoid applying heap_copytuple to a null tuple. While that fails to crash, it's confusing and it makes the code less legible not more so IMO. Propagate a couple of these changes into nodeGather.c, as well. Back-patch to v10, partly because of the possibility that the gather_merge_init change is fixing a live bug, but mostly to keep the branches in sync to ease future bug fixes.
This commit is contained in:
		| @@ -71,6 +71,8 @@ ExecInitGather(Gather *node, EState *estate, int eflags) | |||||||
| 	gatherstate->ps.plan = (Plan *) node; | 	gatherstate->ps.plan = (Plan *) node; | ||||||
| 	gatherstate->ps.state = estate; | 	gatherstate->ps.state = estate; | ||||||
| 	gatherstate->ps.ExecProcNode = ExecGather; | 	gatherstate->ps.ExecProcNode = ExecGather; | ||||||
|  |  | ||||||
|  | 	gatherstate->initialized = false; | ||||||
| 	gatherstate->need_to_scan_locally = !node->single_copy; | 	gatherstate->need_to_scan_locally = !node->single_copy; | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| @@ -81,10 +83,10 @@ ExecInitGather(Gather *node, EState *estate, int eflags) | |||||||
| 	ExecAssignExprContext(estate, &gatherstate->ps); | 	ExecAssignExprContext(estate, &gatherstate->ps); | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * initialize child expressions | 	 * Gather doesn't support checking a qual (it's always more efficient to | ||||||
|  | 	 * do it in the child node). | ||||||
| 	 */ | 	 */ | ||||||
| 	gatherstate->ps.qual = | 	Assert(!node->plan.qual); | ||||||
| 		ExecInitQual(node->plan.qual, (PlanState *) gatherstate); |  | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * tuple table initialization | 	 * tuple table initialization | ||||||
| @@ -167,15 +169,16 @@ ExecGather(PlanState *pstate) | |||||||
| 			 */ | 			 */ | ||||||
| 			pcxt = node->pei->pcxt; | 			pcxt = node->pei->pcxt; | ||||||
| 			LaunchParallelWorkers(pcxt); | 			LaunchParallelWorkers(pcxt); | ||||||
|  | 			/* We save # workers launched for the benefit of EXPLAIN */ | ||||||
| 			node->nworkers_launched = pcxt->nworkers_launched; | 			node->nworkers_launched = pcxt->nworkers_launched; | ||||||
|  | 			node->nreaders = 0; | ||||||
|  | 			node->nextreader = 0; | ||||||
|  |  | ||||||
| 			/* Set up tuple queue readers to read the results. */ | 			/* Set up tuple queue readers to read the results. */ | ||||||
| 			if (pcxt->nworkers_launched > 0) | 			if (pcxt->nworkers_launched > 0) | ||||||
| 			{ | 			{ | ||||||
| 				node->nreaders = 0; | 				node->reader = palloc(pcxt->nworkers_launched * | ||||||
| 				node->nextreader = 0; | 									  sizeof(TupleQueueReader *)); | ||||||
| 				node->reader = |  | ||||||
| 					palloc(pcxt->nworkers_launched * sizeof(TupleQueueReader *)); |  | ||||||
|  |  | ||||||
| 				for (i = 0; i < pcxt->nworkers_launched; ++i) | 				for (i = 0; i < pcxt->nworkers_launched; ++i) | ||||||
| 				{ | 				{ | ||||||
| @@ -314,8 +317,8 @@ gather_readnext(GatherState *gatherstate) | |||||||
| 		tup = TupleQueueReaderNext(reader, true, &readerdone); | 		tup = TupleQueueReaderNext(reader, true, &readerdone); | ||||||
|  |  | ||||||
| 		/* | 		/* | ||||||
| 		 * If this reader is done, remove it.  If all readers are done, clean | 		 * If this reader is done, remove it, and collapse the array.  If all | ||||||
| 		 * up remaining worker state. | 		 * readers are done, clean up remaining worker state. | ||||||
| 		 */ | 		 */ | ||||||
| 		if (readerdone) | 		if (readerdone) | ||||||
| 		{ | 		{ | ||||||
|   | |||||||
| @@ -26,24 +26,30 @@ | |||||||
| #include "utils/memutils.h" | #include "utils/memutils.h" | ||||||
| #include "utils/rel.h" | #include "utils/rel.h" | ||||||
|  |  | ||||||
| /* |  | ||||||
|  * Tuple array for each worker |  | ||||||
|  */ |  | ||||||
| typedef struct GMReaderTupleBuffer |  | ||||||
| { |  | ||||||
| 	HeapTuple  *tuple; |  | ||||||
| 	int			readCounter; |  | ||||||
| 	int			nTuples; |  | ||||||
| 	bool		done; |  | ||||||
| } GMReaderTupleBuffer; |  | ||||||
|  |  | ||||||
| /* | /* | ||||||
|  * When we read tuples from workers, it's a good idea to read several at once |  * When we read tuples from workers, it's a good idea to read several at once | ||||||
|  * for efficiency when possible: this minimizes context-switching overhead. |  * for efficiency when possible: this minimizes context-switching overhead. | ||||||
|  * But reading too many at a time wastes memory without improving performance. |  * But reading too many at a time wastes memory without improving performance. | ||||||
|  |  * We'll read up to MAX_TUPLE_STORE tuples (in addition to the first one). | ||||||
|  */ |  */ | ||||||
| #define MAX_TUPLE_STORE 10 | #define MAX_TUPLE_STORE 10 | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Pending-tuple array for each worker.  This holds additional tuples that | ||||||
|  |  * we were able to fetch from the worker, but can't process yet.  In addition, | ||||||
|  |  * this struct holds the "done" flag indicating the worker is known to have | ||||||
|  |  * no more tuples.  (We do not use this struct for the leader; we don't keep | ||||||
|  |  * any pending tuples for the leader, and the need_to_scan_locally flag serves | ||||||
|  |  * as its "done" indicator.) | ||||||
|  |  */ | ||||||
|  | typedef struct GMReaderTupleBuffer | ||||||
|  | { | ||||||
|  | 	HeapTuple  *tuple;			/* array of length MAX_TUPLE_STORE */ | ||||||
|  | 	int			nTuples;		/* number of tuples currently stored */ | ||||||
|  | 	int			readCounter;	/* index of next tuple to extract */ | ||||||
|  | 	bool		done;			/* true if reader is known exhausted */ | ||||||
|  | } GMReaderTupleBuffer; | ||||||
|  |  | ||||||
| static TupleTableSlot *ExecGatherMerge(PlanState *pstate); | static TupleTableSlot *ExecGatherMerge(PlanState *pstate); | ||||||
| static int32 heap_compare_slots(Datum a, Datum b, void *arg); | static int32 heap_compare_slots(Datum a, Datum b, void *arg); | ||||||
| static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state); | static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state); | ||||||
| @@ -53,7 +59,7 @@ static void gather_merge_init(GatherMergeState *gm_state); | |||||||
| static void ExecShutdownGatherMergeWorkers(GatherMergeState *node); | static void ExecShutdownGatherMergeWorkers(GatherMergeState *node); | ||||||
| static bool gather_merge_readnext(GatherMergeState *gm_state, int reader, | static bool gather_merge_readnext(GatherMergeState *gm_state, int reader, | ||||||
| 					  bool nowait); | 					  bool nowait); | ||||||
| static void form_tuple_array(GatherMergeState *gm_state, int reader); | static void load_tuple_array(GatherMergeState *gm_state, int reader); | ||||||
|  |  | ||||||
| /* ---------------------------------------------------------------- | /* ---------------------------------------------------------------- | ||||||
|  *		ExecInitGather |  *		ExecInitGather | ||||||
| @@ -78,6 +84,9 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags) | |||||||
| 	gm_state->ps.state = estate; | 	gm_state->ps.state = estate; | ||||||
| 	gm_state->ps.ExecProcNode = ExecGatherMerge; | 	gm_state->ps.ExecProcNode = ExecGatherMerge; | ||||||
|  |  | ||||||
|  | 	gm_state->initialized = false; | ||||||
|  | 	gm_state->gm_initialized = false; | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * Miscellaneous initialization | 	 * Miscellaneous initialization | ||||||
| 	 * | 	 * | ||||||
| @@ -86,10 +95,10 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags) | |||||||
| 	ExecAssignExprContext(estate, &gm_state->ps); | 	ExecAssignExprContext(estate, &gm_state->ps); | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * initialize child expressions | 	 * GatherMerge doesn't support checking a qual (it's always more efficient | ||||||
|  | 	 * to do it in the child node). | ||||||
| 	 */ | 	 */ | ||||||
| 	gm_state->ps.qual = | 	Assert(!node->plan.qual); | ||||||
| 		ExecInitQual(node->plan.qual, &gm_state->ps); |  | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * tuple table initialization | 	 * tuple table initialization | ||||||
| @@ -108,8 +117,6 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags) | |||||||
| 	ExecAssignResultTypeFromTL(&gm_state->ps); | 	ExecAssignResultTypeFromTL(&gm_state->ps); | ||||||
| 	ExecAssignProjectionInfo(&gm_state->ps, NULL); | 	ExecAssignProjectionInfo(&gm_state->ps, NULL); | ||||||
|  |  | ||||||
| 	gm_state->gm_initialized = false; |  | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * initialize sort-key information | 	 * initialize sort-key information | ||||||
| 	 */ | 	 */ | ||||||
| @@ -176,7 +183,7 @@ ExecGatherMerge(PlanState *pstate) | |||||||
| 	if (!node->initialized) | 	if (!node->initialized) | ||||||
| 	{ | 	{ | ||||||
| 		EState	   *estate = node->ps.state; | 		EState	   *estate = node->ps.state; | ||||||
| 		GatherMerge *gm = (GatherMerge *) node->ps.plan; | 		GatherMerge *gm = castNode(GatherMerge, node->ps.plan); | ||||||
|  |  | ||||||
| 		/* | 		/* | ||||||
| 		 * Sometimes we might have to run without parallelism; but if parallel | 		 * Sometimes we might have to run without parallelism; but if parallel | ||||||
| @@ -198,17 +205,16 @@ ExecGatherMerge(PlanState *pstate) | |||||||
| 			/* Try to launch workers. */ | 			/* Try to launch workers. */ | ||||||
| 			pcxt = node->pei->pcxt; | 			pcxt = node->pei->pcxt; | ||||||
| 			LaunchParallelWorkers(pcxt); | 			LaunchParallelWorkers(pcxt); | ||||||
|  | 			/* We save # workers launched for the benefit of EXPLAIN */ | ||||||
| 			node->nworkers_launched = pcxt->nworkers_launched; | 			node->nworkers_launched = pcxt->nworkers_launched; | ||||||
|  | 			node->nreaders = 0; | ||||||
|  |  | ||||||
| 			/* Set up tuple queue readers to read the results. */ | 			/* Set up tuple queue readers to read the results. */ | ||||||
| 			if (pcxt->nworkers_launched > 0) | 			if (pcxt->nworkers_launched > 0) | ||||||
| 			{ | 			{ | ||||||
| 				node->nreaders = 0; |  | ||||||
| 				node->reader = palloc(pcxt->nworkers_launched * | 				node->reader = palloc(pcxt->nworkers_launched * | ||||||
| 									  sizeof(TupleQueueReader *)); | 									  sizeof(TupleQueueReader *)); | ||||||
|  |  | ||||||
| 				Assert(gm->numCols); |  | ||||||
|  |  | ||||||
| 				for (i = 0; i < pcxt->nworkers_launched; ++i) | 				for (i = 0; i < pcxt->nworkers_launched; ++i) | ||||||
| 				{ | 				{ | ||||||
| 					shm_mq_set_handle(node->pei->tqueue[i], | 					shm_mq_set_handle(node->pei->tqueue[i], | ||||||
| @@ -246,9 +252,7 @@ ExecGatherMerge(PlanState *pstate) | |||||||
| 		return NULL; | 		return NULL; | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * form the result tuple using ExecProject(), and return it --- unless the | 	 * Form the result tuple using ExecProject(), and return it. | ||||||
| 	 * projection produces an empty set, in which case we must loop back |  | ||||||
| 	 * around for another tuple |  | ||||||
| 	 */ | 	 */ | ||||||
| 	econtext->ecxt_outertuple = slot; | 	econtext->ecxt_outertuple = slot; | ||||||
| 	return ExecProject(node->ps.ps_ProjInfo); | 	return ExecProject(node->ps.ps_ProjInfo); | ||||||
| @@ -372,17 +376,16 @@ static void | |||||||
| gather_merge_init(GatherMergeState *gm_state) | gather_merge_init(GatherMergeState *gm_state) | ||||||
| { | { | ||||||
| 	int			nreaders = gm_state->nreaders; | 	int			nreaders = gm_state->nreaders; | ||||||
| 	bool		initialize = true; | 	bool		nowait = true; | ||||||
| 	int			i; | 	int			i; | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * Allocate gm_slots for the number of worker + one more slot for leader. | 	 * Allocate gm_slots for the number of workers + one more slot for leader. | ||||||
| 	 * Last slot is always for leader. Leader always calls ExecProcNode() to | 	 * Last slot is always for leader. Leader always calls ExecProcNode() to | ||||||
| 	 * read the tuple which will return the TupleTableSlot. Later it will | 	 * read the tuple which will return the TupleTableSlot. Later it will | ||||||
| 	 * directly get assigned to gm_slot. So just initialize leader gm_slot | 	 * directly get assigned to gm_slot. So just initialize leader gm_slot | ||||||
| 	 * with NULL. For other slots below code will call | 	 * with NULL. For other slots, code below will call | ||||||
| 	 * ExecInitExtraTupleSlot() which will do the initialization of worker | 	 * ExecInitExtraTupleSlot() to create a slot for the worker's results. | ||||||
| 	 * slots. |  | ||||||
| 	 */ | 	 */ | ||||||
| 	gm_state->gm_slots = | 	gm_state->gm_slots = | ||||||
| 		palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *)); | 		palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *)); | ||||||
| @@ -391,10 +394,10 @@ gather_merge_init(GatherMergeState *gm_state) | |||||||
| 	/* Initialize the tuple slot and tuple array for each worker */ | 	/* Initialize the tuple slot and tuple array for each worker */ | ||||||
| 	gm_state->gm_tuple_buffers = | 	gm_state->gm_tuple_buffers = | ||||||
| 		(GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) * | 		(GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) * | ||||||
| 										(gm_state->nreaders + 1)); | 										gm_state->nreaders); | ||||||
| 	for (i = 0; i < gm_state->nreaders; i++) | 	for (i = 0; i < gm_state->nreaders; i++) | ||||||
| 	{ | 	{ | ||||||
| 		/* Allocate the tuple array with MAX_TUPLE_STORE size */ | 		/* Allocate the tuple array with length MAX_TUPLE_STORE */ | ||||||
| 		gm_state->gm_tuple_buffers[i].tuple = | 		gm_state->gm_tuple_buffers[i].tuple = | ||||||
| 			(HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE); | 			(HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE); | ||||||
|  |  | ||||||
| @@ -411,39 +414,53 @@ gather_merge_init(GatherMergeState *gm_state) | |||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * First, try to read a tuple from each worker (including leader) in | 	 * First, try to read a tuple from each worker (including leader) in | ||||||
| 	 * nowait mode, so that we initialize read from each worker as well as | 	 * nowait mode.  After this, if not all workers were able to produce a | ||||||
| 	 * leader. After this, if all active workers are unable to produce a | 	 * tuple (or a "done" indication), then re-read from remaining workers, | ||||||
| 	 * tuple, then re-read and this time use wait mode. For workers that were | 	 * this time using wait mode.  Add all live readers (those producing at | ||||||
| 	 * able to produce a tuple in the earlier loop and are still active, just | 	 * least one tuple) to the heap. | ||||||
| 	 * try to fill the tuple array if more tuples are avaiable. |  | ||||||
| 	 */ | 	 */ | ||||||
| reread: | reread: | ||||||
| 	for (i = 0; i < nreaders + 1; i++) | 	for (i = 0; i < nreaders + 1; i++) | ||||||
| 	{ | 	{ | ||||||
| 		CHECK_FOR_INTERRUPTS(); | 		CHECK_FOR_INTERRUPTS(); | ||||||
|  |  | ||||||
| 		if (!gm_state->gm_tuple_buffers[i].done && | 		/* ignore this source if already known done */ | ||||||
| 			(TupIsNull(gm_state->gm_slots[i]) || | 		if ((i < nreaders) ? | ||||||
| 			 gm_state->gm_slots[i]->tts_isempty)) | 			!gm_state->gm_tuple_buffers[i].done : | ||||||
|  | 			gm_state->need_to_scan_locally) | ||||||
| 		{ | 		{ | ||||||
| 			if (gather_merge_readnext(gm_state, i, initialize)) | 			if (TupIsNull(gm_state->gm_slots[i])) | ||||||
| 			{ | 			{ | ||||||
|  | 				/* Don't have a tuple yet, try to get one */ | ||||||
|  | 				if (gather_merge_readnext(gm_state, i, nowait)) | ||||||
| 					binaryheap_add_unordered(gm_state->gm_heap, | 					binaryheap_add_unordered(gm_state->gm_heap, | ||||||
| 											 Int32GetDatum(i)); | 											 Int32GetDatum(i)); | ||||||
| 			} | 			} | ||||||
| 		} |  | ||||||
| 			else | 			else | ||||||
| 			form_tuple_array(gm_state, i); | 			{ | ||||||
|  | 				/* | ||||||
|  | 				 * We already got at least one tuple from this worker, but | ||||||
|  | 				 * might as well see if it has any more ready by now. | ||||||
|  | 				 */ | ||||||
|  | 				load_tuple_array(gm_state, i); | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| 	initialize = false; |  | ||||||
|  |  | ||||||
|  | 	/* need not recheck leader, since nowait doesn't matter for it */ | ||||||
| 	for (i = 0; i < nreaders; i++) | 	for (i = 0; i < nreaders; i++) | ||||||
|  | 	{ | ||||||
| 		if (!gm_state->gm_tuple_buffers[i].done && | 		if (!gm_state->gm_tuple_buffers[i].done && | ||||||
| 			(TupIsNull(gm_state->gm_slots[i]) || | 			TupIsNull(gm_state->gm_slots[i])) | ||||||
| 			 gm_state->gm_slots[i]->tts_isempty)) | 		{ | ||||||
|  | 			nowait = false; | ||||||
| 			goto reread; | 			goto reread; | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	/* Now heapify the heap. */ | ||||||
| 	binaryheap_build(gm_state->gm_heap); | 	binaryheap_build(gm_state->gm_heap); | ||||||
|  |  | ||||||
| 	gm_state->gm_initialized = true; | 	gm_state->gm_initialized = true; | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -458,7 +475,7 @@ gather_merge_clear_slots(GatherMergeState *gm_state) | |||||||
| 	for (i = 0; i < gm_state->nreaders; i++) | 	for (i = 0; i < gm_state->nreaders; i++) | ||||||
| 	{ | 	{ | ||||||
| 		pfree(gm_state->gm_tuple_buffers[i].tuple); | 		pfree(gm_state->gm_tuple_buffers[i].tuple); | ||||||
| 		gm_state->gm_slots[i] = ExecClearTuple(gm_state->gm_slots[i]); | 		ExecClearTuple(gm_state->gm_slots[i]); | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	/* Free tuple array as we don't need it any more */ | 	/* Free tuple array as we don't need it any more */ | ||||||
| @@ -498,8 +515,11 @@ gather_merge_getnext(GatherMergeState *gm_state) | |||||||
| 		if (gather_merge_readnext(gm_state, i, false)) | 		if (gather_merge_readnext(gm_state, i, false)) | ||||||
| 			binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i)); | 			binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i)); | ||||||
| 		else | 		else | ||||||
|  | 		{ | ||||||
|  | 			/* reader exhausted, remove it from heap */ | ||||||
| 			(void) binaryheap_remove_first(gm_state->gm_heap); | 			(void) binaryheap_remove_first(gm_state->gm_heap); | ||||||
| 		} | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	if (binaryheap_empty(gm_state->gm_heap)) | 	if (binaryheap_empty(gm_state->gm_heap)) | ||||||
| 	{ | 	{ | ||||||
| @@ -516,37 +536,37 @@ gather_merge_getnext(GatherMergeState *gm_state) | |||||||
| } | } | ||||||
|  |  | ||||||
| /* | /* | ||||||
|  * Read the tuple for given reader in nowait mode, and form the tuple array. |  * Read tuple(s) for given reader in nowait mode, and load into its tuple | ||||||
|  |  * array, until we have MAX_TUPLE_STORE of them or would have to block. | ||||||
|  */ |  */ | ||||||
| static void | static void | ||||||
| form_tuple_array(GatherMergeState *gm_state, int reader) | load_tuple_array(GatherMergeState *gm_state, int reader) | ||||||
| { | { | ||||||
| 	GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[reader]; | 	GMReaderTupleBuffer *tuple_buffer; | ||||||
| 	int			i; | 	int			i; | ||||||
|  |  | ||||||
| 	/* Last slot is for leader and we don't build tuple array for leader */ | 	/* Don't do anything if this is the leader. */ | ||||||
| 	if (reader == gm_state->nreaders) | 	if (reader == gm_state->nreaders) | ||||||
| 		return; | 		return; | ||||||
|  |  | ||||||
| 	/* | 	tuple_buffer = &gm_state->gm_tuple_buffers[reader]; | ||||||
| 	 * We here because we already read all the tuples from the tuple array, so |  | ||||||
| 	 * initialize the counter to zero. | 	/* If there's nothing in the array, reset the counters to zero. */ | ||||||
| 	 */ |  | ||||||
| 	if (tuple_buffer->nTuples == tuple_buffer->readCounter) | 	if (tuple_buffer->nTuples == tuple_buffer->readCounter) | ||||||
| 		tuple_buffer->nTuples = tuple_buffer->readCounter = 0; | 		tuple_buffer->nTuples = tuple_buffer->readCounter = 0; | ||||||
|  |  | ||||||
| 	/* Tuple array is already full? */ | 	/* Try to fill additional slots in the array. */ | ||||||
| 	if (tuple_buffer->nTuples == MAX_TUPLE_STORE) |  | ||||||
| 		return; |  | ||||||
|  |  | ||||||
| 	for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++) | 	for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++) | ||||||
| 	{ | 	{ | ||||||
| 		tuple_buffer->tuple[i] = heap_copytuple(gm_readnext_tuple(gm_state, | 		HeapTuple	tuple; | ||||||
|  |  | ||||||
|  | 		tuple = gm_readnext_tuple(gm_state, | ||||||
| 								  reader, | 								  reader, | ||||||
| 																  false, | 								  true, | ||||||
| 																  &tuple_buffer->done)); | 								  &tuple_buffer->done); | ||||||
| 		if (!HeapTupleIsValid(tuple_buffer->tuple[i])) | 		if (!HeapTupleIsValid(tuple)) | ||||||
| 			break; | 			break; | ||||||
|  | 		tuple_buffer->tuple[i] = heap_copytuple(tuple); | ||||||
| 		tuple_buffer->nTuples++; | 		tuple_buffer->nTuples++; | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| @@ -554,13 +574,15 @@ form_tuple_array(GatherMergeState *gm_state, int reader) | |||||||
| /* | /* | ||||||
|  * Store the next tuple for a given reader into the appropriate slot. |  * Store the next tuple for a given reader into the appropriate slot. | ||||||
|  * |  * | ||||||
|  * Returns false if the reader is exhausted, and true otherwise. |  * Returns true if successful, false if not (either reader is exhausted, | ||||||
|  |  * or we didn't want to wait for a tuple).  Sets done flag if reader | ||||||
|  |  * is found to be exhausted. | ||||||
|  */ |  */ | ||||||
| static bool | static bool | ||||||
| gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) | gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) | ||||||
| { | { | ||||||
| 	GMReaderTupleBuffer *tuple_buffer; | 	GMReaderTupleBuffer *tuple_buffer; | ||||||
| 	HeapTuple	tup = NULL; | 	HeapTuple	tup; | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * If we're being asked to generate a tuple from the leader, then we just | 	 * If we're being asked to generate a tuple from the leader, then we just | ||||||
| @@ -580,7 +602,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) | |||||||
| 				gm_state->gm_slots[reader] = outerTupleSlot; | 				gm_state->gm_slots[reader] = outerTupleSlot; | ||||||
| 				return true; | 				return true; | ||||||
| 			} | 			} | ||||||
| 			gm_state->gm_tuple_buffers[reader].done = true; | 			/* need_to_scan_locally serves as "done" flag for leader */ | ||||||
| 			gm_state->need_to_scan_locally = false; | 			gm_state->need_to_scan_locally = false; | ||||||
| 		} | 		} | ||||||
| 		return false; | 		return false; | ||||||
| @@ -592,7 +614,6 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) | |||||||
| 	if (tuple_buffer->nTuples > tuple_buffer->readCounter) | 	if (tuple_buffer->nTuples > tuple_buffer->readCounter) | ||||||
| 	{ | 	{ | ||||||
| 		/* Return any tuple previously read that is still buffered. */ | 		/* Return any tuple previously read that is still buffered. */ | ||||||
| 		tuple_buffer = &gm_state->gm_tuple_buffers[reader]; |  | ||||||
| 		tup = tuple_buffer->tuple[tuple_buffer->readCounter++]; | 		tup = tuple_buffer->tuple[tuple_buffer->readCounter++]; | ||||||
| 	} | 	} | ||||||
| 	else if (tuple_buffer->done) | 	else if (tuple_buffer->done) | ||||||
| @@ -605,19 +626,19 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) | |||||||
| 	else | 	else | ||||||
| 	{ | 	{ | ||||||
| 		/* Read and buffer next tuple. */ | 		/* Read and buffer next tuple. */ | ||||||
| 		tup = heap_copytuple(gm_readnext_tuple(gm_state, | 		tup = gm_readnext_tuple(gm_state, | ||||||
| 								reader, | 								reader, | ||||||
| 								nowait, | 								nowait, | ||||||
| 											   &tuple_buffer->done)); | 								&tuple_buffer->done); | ||||||
|  | 		if (!HeapTupleIsValid(tup)) | ||||||
|  | 			return false; | ||||||
|  | 		tup = heap_copytuple(tup); | ||||||
|  |  | ||||||
| 		/* | 		/* | ||||||
| 		 * Attempt to read more tuples in nowait mode and store them in the | 		 * Attempt to read more tuples in nowait mode and store them in the | ||||||
| 		 * tuple array. | 		 * pending-tuple array for the reader. | ||||||
| 		 */ | 		 */ | ||||||
| 		if (HeapTupleIsValid(tup)) | 		load_tuple_array(gm_state, reader); | ||||||
| 			form_tuple_array(gm_state, reader); |  | ||||||
| 		else |  | ||||||
| 			return false; |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	Assert(HeapTupleIsValid(tup)); | 	Assert(HeapTupleIsValid(tup)); | ||||||
| @@ -640,15 +661,10 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait, | |||||||
| 				  bool *done) | 				  bool *done) | ||||||
| { | { | ||||||
| 	TupleQueueReader *reader; | 	TupleQueueReader *reader; | ||||||
| 	HeapTuple	tup = NULL; | 	HeapTuple	tup; | ||||||
| 	MemoryContext oldContext; | 	MemoryContext oldContext; | ||||||
| 	MemoryContext tupleContext; | 	MemoryContext tupleContext; | ||||||
|  |  | ||||||
| 	tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory; |  | ||||||
|  |  | ||||||
| 	if (done != NULL) |  | ||||||
| 		*done = false; |  | ||||||
|  |  | ||||||
| 	/* Check for async events, particularly messages from workers. */ | 	/* Check for async events, particularly messages from workers. */ | ||||||
| 	CHECK_FOR_INTERRUPTS(); | 	CHECK_FOR_INTERRUPTS(); | ||||||
|  |  | ||||||
| @@ -656,6 +672,7 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait, | |||||||
| 	reader = gm_state->reader[nreader]; | 	reader = gm_state->reader[nreader]; | ||||||
|  |  | ||||||
| 	/* Run TupleQueueReaders in per-tuple context */ | 	/* Run TupleQueueReaders in per-tuple context */ | ||||||
|  | 	tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory; | ||||||
| 	oldContext = MemoryContextSwitchTo(tupleContext); | 	oldContext = MemoryContextSwitchTo(tupleContext); | ||||||
| 	tup = TupleQueueReaderNext(reader, nowait, done); | 	tup = TupleQueueReaderNext(reader, nowait, done); | ||||||
| 	MemoryContextSwitchTo(oldContext); | 	MemoryContextSwitchTo(oldContext); | ||||||
|   | |||||||
| @@ -1911,14 +1911,16 @@ typedef struct UniqueState | |||||||
| typedef struct GatherState | typedef struct GatherState | ||||||
| { | { | ||||||
| 	PlanState	ps;				/* its first field is NodeTag */ | 	PlanState	ps;				/* its first field is NodeTag */ | ||||||
| 	bool		initialized; | 	bool		initialized;	/* workers launched? */ | ||||||
| 	struct ParallelExecutorInfo *pei; | 	bool		need_to_scan_locally;	/* need to read from local plan? */ | ||||||
| 	int			nreaders; | 	/* these fields are set up once: */ | ||||||
| 	int			nextreader; |  | ||||||
| 	int			nworkers_launched; |  | ||||||
| 	struct TupleQueueReader **reader; |  | ||||||
| 	TupleTableSlot *funnel_slot; | 	TupleTableSlot *funnel_slot; | ||||||
| 	bool		need_to_scan_locally; | 	struct ParallelExecutorInfo *pei; | ||||||
|  | 	/* all remaining fields are reinitialized during a rescan: */ | ||||||
|  | 	int			nworkers_launched;	/* original number of workers */ | ||||||
|  | 	int			nreaders;		/* number of still-active workers */ | ||||||
|  | 	int			nextreader;		/* next one to try to read from */ | ||||||
|  | 	struct TupleQueueReader **reader;	/* array with nreaders active entries */ | ||||||
| } GatherState; | } GatherState; | ||||||
|  |  | ||||||
| /* ---------------- | /* ---------------- | ||||||
| @@ -1929,24 +1931,26 @@ typedef struct GatherState | |||||||
|  *		merge the results into a single sorted stream. |  *		merge the results into a single sorted stream. | ||||||
|  * ---------------- |  * ---------------- | ||||||
|  */ |  */ | ||||||
| struct GMReaderTuple; | struct GMReaderTupleBuffer;		/* private in nodeGatherMerge.c */ | ||||||
|  |  | ||||||
| typedef struct GatherMergeState | typedef struct GatherMergeState | ||||||
| { | { | ||||||
| 	PlanState	ps;				/* its first field is NodeTag */ | 	PlanState	ps;				/* its first field is NodeTag */ | ||||||
| 	bool		initialized; | 	bool		initialized;	/* workers launched? */ | ||||||
|  | 	bool		gm_initialized; /* gather_merge_init() done? */ | ||||||
|  | 	bool		need_to_scan_locally;	/* need to read from local plan? */ | ||||||
|  | 	/* these fields are set up once: */ | ||||||
|  | 	TupleDesc	tupDesc;		/* descriptor for subplan result tuples */ | ||||||
|  | 	int			gm_nkeys;		/* number of sort columns */ | ||||||
|  | 	SortSupport gm_sortkeys;	/* array of length gm_nkeys */ | ||||||
| 	struct ParallelExecutorInfo *pei; | 	struct ParallelExecutorInfo *pei; | ||||||
| 	int			nreaders; | 	/* all remaining fields are reinitialized during a rescan: */ | ||||||
| 	int			nworkers_launched; | 	int			nworkers_launched;	/* original number of workers */ | ||||||
| 	struct TupleQueueReader **reader; | 	int			nreaders;		/* number of active workers */ | ||||||
| 	TupleDesc	tupDesc; | 	TupleTableSlot **gm_slots;	/* array with nreaders+1 entries */ | ||||||
| 	TupleTableSlot **gm_slots; | 	struct TupleQueueReader **reader;	/* array with nreaders active entries */ | ||||||
|  | 	struct GMReaderTupleBuffer *gm_tuple_buffers;	/* nreaders tuple buffers */ | ||||||
| 	struct binaryheap *gm_heap; /* binary heap of slot indices */ | 	struct binaryheap *gm_heap; /* binary heap of slot indices */ | ||||||
| 	bool		gm_initialized; /* gather merge initilized ? */ |  | ||||||
| 	bool		need_to_scan_locally; |  | ||||||
| 	int			gm_nkeys; |  | ||||||
| 	SortSupport gm_sortkeys;	/* array of length ms_nkeys */ |  | ||||||
| 	struct GMReaderTupleBuffer *gm_tuple_buffers;	/* tuple buffer per reader */ |  | ||||||
| } GatherMergeState; | } GatherMergeState; | ||||||
|  |  | ||||||
| /* ---------------- | /* ---------------- | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user