mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-25 13:17:41 +03:00 
			
		
		
		
	Parallel executor support.
This code provides infrastructure for a parallel leader to start up parallel workers to execute subtrees of the plan tree being executed in the master. User-supplied parameters from ParamListInfo are passed down, but PARAM_EXEC parameters are not. Various other constructs, such as initplans, subplans, and CTEs, are also not currently shared. Nevertheless, there's enough here to support a basic implementation of parallel query, and we can lift some of the current restrictions as needed. Amit Kapila and Robert Haas
This commit is contained in:
		| @@ -13,7 +13,8 @@ top_builddir = ../../.. | |||||||
| include $(top_builddir)/src/Makefile.global | include $(top_builddir)/src/Makefile.global | ||||||
|  |  | ||||||
| OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \ | OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \ | ||||||
|        execMain.o execProcnode.o execQual.o execScan.o execTuples.o \ |        execMain.o execParallel.o execProcnode.o execQual.o \ | ||||||
|  |        execScan.o execTuples.o \ | ||||||
|        execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \ |        execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \ | ||||||
|        nodeBitmapAnd.o nodeBitmapOr.o \ |        nodeBitmapAnd.o nodeBitmapOr.o \ | ||||||
|        nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeHash.o \ |        nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeHash.o \ | ||||||
|   | |||||||
							
								
								
									
										585
									
								
								src/backend/executor/execParallel.c
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										585
									
								
								src/backend/executor/execParallel.c
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,585 @@ | |||||||
|  | /*------------------------------------------------------------------------- | ||||||
|  |  * | ||||||
|  |  * execParallel.c | ||||||
|  |  *	  Support routines for parallel execution. | ||||||
|  |  * | ||||||
|  |  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group | ||||||
|  |  * Portions Copyright (c) 1994, Regents of the University of California | ||||||
|  |  * | ||||||
|  |  * | ||||||
|  |  * IDENTIFICATION | ||||||
|  |  *	  src/backend/executor/execParallel.c | ||||||
|  |  * | ||||||
|  |  *------------------------------------------------------------------------- | ||||||
|  |  */ | ||||||
|  |  | ||||||
|  | #include "postgres.h" | ||||||
|  |  | ||||||
|  | #include "executor/execParallel.h" | ||||||
|  | #include "executor/executor.h" | ||||||
|  | #include "executor/tqueue.h" | ||||||
|  | #include "nodes/nodeFuncs.h" | ||||||
|  | #include "optimizer/planmain.h" | ||||||
|  | #include "optimizer/planner.h" | ||||||
|  | #include "storage/spin.h" | ||||||
|  | #include "tcop/tcopprot.h" | ||||||
|  | #include "utils/memutils.h" | ||||||
|  | #include "utils/snapmgr.h" | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Magic numbers for parallel executor communication.  We use constants | ||||||
|  |  * greater than any 32-bit integer here so that values < 2^32 can be used | ||||||
|  |  * by individual parallel nodes to store their own state. | ||||||
|  |  */ | ||||||
|  | #define PARALLEL_KEY_PLANNEDSTMT		UINT64CONST(0xE000000000000001) | ||||||
|  | #define PARALLEL_KEY_PARAMS				UINT64CONST(0xE000000000000002) | ||||||
|  | #define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xE000000000000003) | ||||||
|  | #define PARALLEL_KEY_TUPLE_QUEUE		UINT64CONST(0xE000000000000004) | ||||||
|  | #define PARALLEL_KEY_INSTRUMENTATION	UINT64CONST(0xE000000000000005) | ||||||
|  |  | ||||||
|  | #define PARALLEL_TUPLE_QUEUE_SIZE		65536 | ||||||
|  |  | ||||||
|  | /* DSM structure for accumulating per-PlanState instrumentation. */ | ||||||
|  | typedef struct SharedPlanStateInstrumentation | ||||||
|  | { | ||||||
|  | 	int plan_node_id; | ||||||
|  | 	slock_t mutex; | ||||||
|  | 	Instrumentation	instr; | ||||||
|  | } SharedPlanStateInstrumentation; | ||||||
|  |  | ||||||
|  | /* DSM structure for accumulating per-PlanState instrumentation. */ | ||||||
|  | struct SharedExecutorInstrumentation | ||||||
|  | { | ||||||
|  | 	int instrument_options; | ||||||
|  | 	int ps_ninstrument;			/* # of ps_instrument structures following */ | ||||||
|  | 	SharedPlanStateInstrumentation ps_instrument[FLEXIBLE_ARRAY_MEMBER]; | ||||||
|  | }; | ||||||
|  |  | ||||||
|  | /* Context object for ExecParallelEstimate. */ | ||||||
|  | typedef struct ExecParallelEstimateContext | ||||||
|  | { | ||||||
|  | 	ParallelContext *pcxt; | ||||||
|  | 	int nnodes; | ||||||
|  | } ExecParallelEstimateContext; | ||||||
|  |  | ||||||
|  | /* Context object for ExecParallelEstimate. */ | ||||||
|  | typedef struct ExecParallelInitializeDSMContext | ||||||
|  | { | ||||||
|  | 	ParallelContext *pcxt; | ||||||
|  | 	SharedExecutorInstrumentation *instrumentation; | ||||||
|  | 	int nnodes; | ||||||
|  | } ExecParallelInitializeDSMContext; | ||||||
|  |  | ||||||
|  | /* Helper functions that run in the parallel leader. */ | ||||||
|  | static char *ExecSerializePlan(Plan *plan, List *rangetable); | ||||||
|  | static bool ExecParallelEstimate(PlanState *node, | ||||||
|  | 					 ExecParallelEstimateContext *e); | ||||||
|  | static bool ExecParallelInitializeDSM(PlanState *node, | ||||||
|  | 					 ExecParallelInitializeDSMContext *d); | ||||||
|  | static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt); | ||||||
|  | static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, | ||||||
|  | 						  SharedExecutorInstrumentation *instrumentation); | ||||||
|  |  | ||||||
|  | /* Helper functions that run in the parallel worker. */ | ||||||
|  | static void ParallelQueryMain(dsm_segment *seg, shm_toc *toc); | ||||||
|  | static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc); | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Create a serialized representation of the plan to be sent to each worker. | ||||||
|  |  */ | ||||||
|  | static char * | ||||||
|  | ExecSerializePlan(Plan *plan, List *rangetable) | ||||||
|  | { | ||||||
|  | 	PlannedStmt *pstmt; | ||||||
|  | 	ListCell   *tlist; | ||||||
|  |  | ||||||
|  | 	/* We can't scribble on the original plan, so make a copy. */ | ||||||
|  | 	plan = copyObject(plan); | ||||||
|  |  | ||||||
|  | 	/* | ||||||
|  | 	 * The worker will start its own copy of the executor, and that copy will | ||||||
|  | 	 * insert a junk filter if the toplevel node has any resjunk entries. We | ||||||
|  | 	 * don't want that to happen, because while resjunk columns shouldn't be | ||||||
|  | 	 * sent back to the user, here the tuples are coming back to another | ||||||
|  | 	 * backend which may very well need them.  So mutate the target list | ||||||
|  | 	 * accordingly.  This is sort of a hack; there might be better ways to do | ||||||
|  | 	 * this... | ||||||
|  | 	 */ | ||||||
|  | 	foreach(tlist, plan->targetlist) | ||||||
|  | 	{ | ||||||
|  | 		TargetEntry *tle = (TargetEntry *) lfirst(tlist); | ||||||
|  |  | ||||||
|  | 		tle->resjunk = false; | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	/* | ||||||
|  | 	 * Create a dummy PlannedStmt.  Most of the fields don't need to be valid | ||||||
|  | 	 * for our purposes, but the worker will need at least a minimal | ||||||
|  | 	 * PlannedStmt to start the executor. | ||||||
|  | 	 */ | ||||||
|  | 	pstmt = makeNode(PlannedStmt); | ||||||
|  | 	pstmt->commandType = CMD_SELECT; | ||||||
|  | 	pstmt->queryId = 0; | ||||||
|  | 	pstmt->hasReturning = 0; | ||||||
|  | 	pstmt->hasModifyingCTE = 0; | ||||||
|  | 	pstmt->canSetTag = 1; | ||||||
|  | 	pstmt->transientPlan = 0; | ||||||
|  | 	pstmt->planTree = plan; | ||||||
|  | 	pstmt->rtable = rangetable; | ||||||
|  | 	pstmt->resultRelations = NIL; | ||||||
|  | 	pstmt->utilityStmt = NULL; | ||||||
|  | 	pstmt->subplans = NIL; | ||||||
|  | 	pstmt->rewindPlanIDs = NULL; | ||||||
|  | 	pstmt->rowMarks = NIL; | ||||||
|  | 	pstmt->nParamExec = 0; | ||||||
|  | 	pstmt->relationOids = NIL; | ||||||
|  | 	pstmt->invalItems = NIL;	/* workers can't replan anyway... */ | ||||||
|  | 	pstmt->hasRowSecurity = false; | ||||||
|  |  | ||||||
|  | 	/* Return serialized copy of our dummy PlannedStmt. */ | ||||||
|  | 	return nodeToString(pstmt); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Ordinary plan nodes won't do anything here, but parallel-aware plan nodes | ||||||
|  |  * may need some state which is shared across all parallel workers.  Before | ||||||
|  |  * we size the DSM, give them a chance to call shm_toc_estimate_chunk or | ||||||
|  |  * shm_toc_estimate_keys on &pcxt->estimator. | ||||||
|  |  * | ||||||
|  |  * While we're at it, count the number of PlanState nodes in the tree, so | ||||||
|  |  * we know how many SharedPlanStateInstrumentation structures we need. | ||||||
|  |  */ | ||||||
|  | static bool | ||||||
|  | ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) | ||||||
|  | { | ||||||
|  | 	if (planstate == NULL) | ||||||
|  | 		return false; | ||||||
|  |  | ||||||
|  | 	/* Count this node. */ | ||||||
|  | 	e->nnodes++; | ||||||
|  |  | ||||||
|  | 	/* | ||||||
|  | 	 * XXX. Call estimators for parallel-aware nodes here, when we have | ||||||
|  | 	 * some. | ||||||
|  | 	 */ | ||||||
|  |  | ||||||
|  | 	return planstate_tree_walker(planstate, ExecParallelEstimate, e); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Ordinary plan nodes won't do anything here, but parallel-aware plan nodes | ||||||
|  |  * may need to initialize shared state in the DSM before parallel workers | ||||||
|  |  * are available.  They can allocate the space they previous estimated using | ||||||
|  |  * shm_toc_allocate, and add the keys they previously estimated using | ||||||
|  |  * shm_toc_insert, in each case targeting pcxt->toc. | ||||||
|  |  */ | ||||||
|  | static bool | ||||||
|  | ExecParallelInitializeDSM(PlanState *planstate, | ||||||
|  | 						  ExecParallelInitializeDSMContext *d) | ||||||
|  | { | ||||||
|  | 	if (planstate == NULL) | ||||||
|  | 		return false; | ||||||
|  |  | ||||||
|  | 	/* If instrumentation is enabled, initialize array slot for this node. */ | ||||||
|  | 	if (d->instrumentation != NULL) | ||||||
|  | 	{ | ||||||
|  | 		SharedPlanStateInstrumentation *instrumentation; | ||||||
|  |  | ||||||
|  | 		instrumentation = &d->instrumentation->ps_instrument[d->nnodes]; | ||||||
|  | 		Assert(d->nnodes < d->instrumentation->ps_ninstrument); | ||||||
|  | 		instrumentation->plan_node_id = planstate->plan->plan_node_id; | ||||||
|  | 		SpinLockInit(&instrumentation->mutex); | ||||||
|  | 		InstrInit(&instrumentation->instr, | ||||||
|  | 				  d->instrumentation->instrument_options); | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	/* Count this node. */ | ||||||
|  | 	d->nnodes++; | ||||||
|  |  | ||||||
|  | 	/* | ||||||
|  | 	 * XXX. Call initializers for parallel-aware plan nodes, when we have | ||||||
|  | 	 * some. | ||||||
|  | 	 */ | ||||||
|  |  | ||||||
|  | 	return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * It sets up the response queues for backend workers to return tuples | ||||||
|  |  * to the main backend and start the workers. | ||||||
|  |  */ | ||||||
|  | static shm_mq_handle ** | ||||||
|  | ExecParallelSetupTupleQueues(ParallelContext *pcxt) | ||||||
|  | { | ||||||
|  | 	shm_mq_handle **responseq; | ||||||
|  | 	char	   *tqueuespace; | ||||||
|  | 	int			i; | ||||||
|  |  | ||||||
|  | 	/* Skip this if no workers. */ | ||||||
|  | 	if (pcxt->nworkers == 0) | ||||||
|  | 		return NULL; | ||||||
|  |  | ||||||
|  | 	/* Allocate memory for shared memory queue handles. */ | ||||||
|  | 	responseq = (shm_mq_handle **) | ||||||
|  | 		palloc(pcxt->nworkers * sizeof(shm_mq_handle *)); | ||||||
|  |  | ||||||
|  | 	/* Allocate space from the DSM for the queues themselves. */ | ||||||
|  | 	tqueuespace = shm_toc_allocate(pcxt->toc, | ||||||
|  | 								 PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers); | ||||||
|  |  | ||||||
|  | 	/* Create the queues, and become the receiver for each. */ | ||||||
|  | 	for (i = 0; i < pcxt->nworkers; ++i) | ||||||
|  | 	{ | ||||||
|  | 		shm_mq	   *mq; | ||||||
|  |  | ||||||
|  | 		mq = shm_mq_create(tqueuespace + i * PARALLEL_TUPLE_QUEUE_SIZE, | ||||||
|  | 						   (Size) PARALLEL_TUPLE_QUEUE_SIZE); | ||||||
|  |  | ||||||
|  | 		shm_mq_set_receiver(mq, MyProc); | ||||||
|  | 		responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL); | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	/* Add array of queues to shm_toc, so others can find it. */ | ||||||
|  | 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace); | ||||||
|  |  | ||||||
|  | 	/* Return array of handles. */ | ||||||
|  | 	return responseq; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Sets up the required infrastructure for backend workers to perform | ||||||
|  |  * execution and return results to the main backend. | ||||||
|  |  */ | ||||||
|  | ParallelExecutorInfo * | ||||||
|  | ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) | ||||||
|  | { | ||||||
|  | 	ParallelExecutorInfo *pei; | ||||||
|  | 	ParallelContext *pcxt; | ||||||
|  | 	ExecParallelEstimateContext e; | ||||||
|  | 	ExecParallelInitializeDSMContext d; | ||||||
|  | 	char	   *pstmt_data; | ||||||
|  | 	char	   *pstmt_space; | ||||||
|  | 	char	   *param_space; | ||||||
|  | 	BufferUsage *bufusage_space; | ||||||
|  | 	SharedExecutorInstrumentation *instrumentation = NULL; | ||||||
|  | 	int			pstmt_len; | ||||||
|  | 	int			param_len; | ||||||
|  | 	int			instrumentation_len = 0; | ||||||
|  |  | ||||||
|  | 	/* Allocate object for return value. */ | ||||||
|  | 	pei = palloc0(sizeof(ParallelExecutorInfo)); | ||||||
|  | 	pei->planstate = planstate; | ||||||
|  |  | ||||||
|  | 	/* Fix up and serialize plan to be sent to workers. */ | ||||||
|  | 	pstmt_data = ExecSerializePlan(planstate->plan, estate->es_range_table); | ||||||
|  |  | ||||||
|  | 	/* Create a parallel context. */ | ||||||
|  | 	pcxt = CreateParallelContext(ParallelQueryMain, nworkers); | ||||||
|  | 	pei->pcxt = pcxt; | ||||||
|  |  | ||||||
|  | 	/* | ||||||
|  | 	 * Before telling the parallel context to create a dynamic shared memory | ||||||
|  | 	 * segment, we need to figure out how big it should be.  Estimate space | ||||||
|  | 	 * for the various things we need to store. | ||||||
|  | 	 */ | ||||||
|  |  | ||||||
|  | 	/* Estimate space for serialized PlannedStmt. */ | ||||||
|  | 	pstmt_len = strlen(pstmt_data) + 1; | ||||||
|  | 	shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len); | ||||||
|  | 	shm_toc_estimate_keys(&pcxt->estimator, 1); | ||||||
|  |  | ||||||
|  | 	/* Estimate space for serialized ParamListInfo. */ | ||||||
|  | 	param_len = EstimateParamListSpace(estate->es_param_list_info); | ||||||
|  | 	shm_toc_estimate_chunk(&pcxt->estimator, param_len); | ||||||
|  | 	shm_toc_estimate_keys(&pcxt->estimator, 1); | ||||||
|  |  | ||||||
|  | 	/* | ||||||
|  | 	 * Estimate space for BufferUsage. | ||||||
|  | 	 * | ||||||
|  | 	 * If EXPLAIN is not in use and there are no extensions loaded that care, | ||||||
|  | 	 * we could skip this.  But we have no way of knowing whether anyone's | ||||||
|  | 	 * looking at pgBufferUsage, so do it unconditionally. | ||||||
|  | 	 */ | ||||||
|  | 	shm_toc_estimate_chunk(&pcxt->estimator, | ||||||
|  | 						   sizeof(BufferUsage) * pcxt->nworkers); | ||||||
|  | 	shm_toc_estimate_keys(&pcxt->estimator, 1); | ||||||
|  |  | ||||||
|  | 	/* Estimate space for tuple queues. */ | ||||||
|  | 	shm_toc_estimate_chunk(&pcxt->estimator, | ||||||
|  | 						   PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers); | ||||||
|  | 	shm_toc_estimate_keys(&pcxt->estimator, 1); | ||||||
|  |  | ||||||
|  | 	/* | ||||||
|  | 	 * Give parallel-aware nodes a chance to add to the estimates, and get | ||||||
|  | 	 * a count of how many PlanState nodes there are. | ||||||
|  | 	 */ | ||||||
|  | 	e.pcxt = pcxt; | ||||||
|  | 	e.nnodes = 0; | ||||||
|  | 	ExecParallelEstimate(planstate, &e); | ||||||
|  |  | ||||||
|  | 	/* Estimate space for instrumentation, if required. */ | ||||||
|  | 	if (estate->es_instrument) | ||||||
|  | 	{ | ||||||
|  | 		instrumentation_len = | ||||||
|  | 			offsetof(SharedExecutorInstrumentation, ps_instrument) | ||||||
|  | 			+ sizeof(SharedPlanStateInstrumentation) * e.nnodes; | ||||||
|  | 		shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len); | ||||||
|  | 		shm_toc_estimate_keys(&pcxt->estimator, 1); | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	/* Everyone's had a chance to ask for space, so now create the DSM. */ | ||||||
|  | 	InitializeParallelDSM(pcxt); | ||||||
|  |  | ||||||
|  | 	/* | ||||||
|  | 	 * OK, now we have a dynamic shared memory segment, and it should be big | ||||||
|  | 	 * enough to store all of the data we estimated we would want to put into | ||||||
|  | 	 * it, plus whatever general stuff (not specifically executor-related) the | ||||||
|  | 	 * ParallelContext itself needs to store there.  None of the space we | ||||||
|  | 	 * asked for has been allocated or initialized yet, though, so do that. | ||||||
|  | 	 */ | ||||||
|  |  | ||||||
|  | 	/* Store serialized PlannedStmt. */ | ||||||
|  | 	pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len); | ||||||
|  | 	memcpy(pstmt_space, pstmt_data, pstmt_len); | ||||||
|  | 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space); | ||||||
|  |  | ||||||
|  | 	/* Store serialized ParamListInfo. */ | ||||||
|  | 	param_space = shm_toc_allocate(pcxt->toc, param_len); | ||||||
|  | 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMS, param_space); | ||||||
|  | 	SerializeParamList(estate->es_param_list_info, ¶m_space); | ||||||
|  |  | ||||||
|  | 	/* Allocate space for each worker's BufferUsage; no need to initialize. */ | ||||||
|  | 	bufusage_space = shm_toc_allocate(pcxt->toc, | ||||||
|  | 									  sizeof(BufferUsage) * pcxt->nworkers); | ||||||
|  | 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space); | ||||||
|  | 	pei->buffer_usage = bufusage_space; | ||||||
|  |  | ||||||
|  | 	/* Set up tuple queues. */ | ||||||
|  | 	pei->tqueue = ExecParallelSetupTupleQueues(pcxt); | ||||||
|  |  | ||||||
|  | 	/* | ||||||
|  | 	 * If instrumentation options were supplied, allocate space for the | ||||||
|  | 	 * data.  It only gets partially initialized here; the rest happens | ||||||
|  | 	 * during ExecParallelInitializeDSM. | ||||||
|  | 	 */ | ||||||
|  | 	if (estate->es_instrument) | ||||||
|  | 	{ | ||||||
|  | 		instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len); | ||||||
|  | 		instrumentation->instrument_options = estate->es_instrument; | ||||||
|  | 		instrumentation->ps_ninstrument = e.nnodes; | ||||||
|  | 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, | ||||||
|  | 					   instrumentation); | ||||||
|  | 		pei->instrumentation = instrumentation; | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	/* | ||||||
|  | 	 * Give parallel-aware nodes a chance to initialize their shared data. | ||||||
|  | 	 * This also initializes the elements of instrumentation->ps_instrument, | ||||||
|  | 	 * if it exists. | ||||||
|  | 	 */ | ||||||
|  | 	d.pcxt = pcxt; | ||||||
|  | 	d.instrumentation = instrumentation; | ||||||
|  | 	d.nnodes = 0; | ||||||
|  | 	ExecParallelInitializeDSM(planstate, &d); | ||||||
|  |  | ||||||
|  | 	/* | ||||||
|  | 	 * Make sure that the world hasn't shifted under our feat.  This could | ||||||
|  | 	 * probably just be an Assert(), but let's be conservative for now. | ||||||
|  | 	 */ | ||||||
|  | 	if (e.nnodes != d.nnodes) | ||||||
|  | 		elog(ERROR, "inconsistent count of PlanState nodes"); | ||||||
|  |  | ||||||
|  | 	/* OK, we're ready to rock and roll. */ | ||||||
|  | 	return pei; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Copy instrumentation information about this node and its descendents from | ||||||
|  |  * dynamic shared memory. | ||||||
|  |  */ | ||||||
|  | static bool | ||||||
|  | ExecParallelRetrieveInstrumentation(PlanState *planstate, | ||||||
|  | 						  SharedExecutorInstrumentation *instrumentation) | ||||||
|  | { | ||||||
|  | 	int		i; | ||||||
|  | 	int		plan_node_id = planstate->plan->plan_node_id; | ||||||
|  | 	SharedPlanStateInstrumentation *ps_instrument; | ||||||
|  |  | ||||||
|  | 	/* Find the instumentation for this node. */ | ||||||
|  | 	for (i = 0; i < instrumentation->ps_ninstrument; ++i) | ||||||
|  | 		if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id) | ||||||
|  | 			break; | ||||||
|  | 	if (i >= instrumentation->ps_ninstrument) | ||||||
|  | 		elog(ERROR, "plan node %d not found", plan_node_id); | ||||||
|  |  | ||||||
|  | 	/* No need to acquire the spinlock here; workers have exited already. */ | ||||||
|  | 	ps_instrument = &instrumentation->ps_instrument[i]; | ||||||
|  | 	InstrAggNode(planstate->instrument, &ps_instrument->instr); | ||||||
|  |  | ||||||
|  | 	return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation, | ||||||
|  | 								 instrumentation); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Finish parallel execution.  We wait for parallel workers to finish, and | ||||||
|  |  * accumulate their buffer usage and instrumentation. | ||||||
|  |  */ | ||||||
|  | void | ||||||
|  | ExecParallelFinish(ParallelExecutorInfo *pei) | ||||||
|  | { | ||||||
|  | 	int		i; | ||||||
|  |  | ||||||
|  | 	/* First, wait for the workers to finish. */ | ||||||
|  | 	WaitForParallelWorkersToFinish(pei->pcxt); | ||||||
|  |  | ||||||
|  | 	/* Next, accumulate buffer usage. */ | ||||||
|  | 	for (i = 0; i < pei->pcxt->nworkers; ++i) | ||||||
|  | 		InstrAccumParallelQuery(&pei->buffer_usage[i]); | ||||||
|  |  | ||||||
|  | 	/* Finally, accumulate instrumentation, if any. */ | ||||||
|  | 	if (pei->instrumentation) | ||||||
|  | 		ExecParallelRetrieveInstrumentation(pei->planstate, | ||||||
|  | 											pei->instrumentation); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Create a DestReceiver to write tuples we produce to the shm_mq designated | ||||||
|  |  * for that purpose. | ||||||
|  |  */ | ||||||
|  | static DestReceiver * | ||||||
|  | ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc) | ||||||
|  | { | ||||||
|  | 	char	   *mqspace; | ||||||
|  | 	shm_mq	   *mq; | ||||||
|  |  | ||||||
|  | 	mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE); | ||||||
|  | 	mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE; | ||||||
|  | 	mq = (shm_mq *) mqspace; | ||||||
|  | 	shm_mq_set_sender(mq, MyProc); | ||||||
|  | 	return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL)); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Create a QueryDesc for the PlannedStmt we are to execute, and return it. | ||||||
|  |  */ | ||||||
|  | static QueryDesc * | ||||||
|  | ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver, | ||||||
|  | 						 int instrument_options) | ||||||
|  | { | ||||||
|  | 	char	   *pstmtspace; | ||||||
|  | 	char	   *paramspace; | ||||||
|  | 	PlannedStmt *pstmt; | ||||||
|  | 	ParamListInfo paramLI; | ||||||
|  |  | ||||||
|  | 	/* Reconstruct leader-supplied PlannedStmt. */ | ||||||
|  | 	pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT); | ||||||
|  | 	pstmt = (PlannedStmt *) stringToNode(pstmtspace); | ||||||
|  |  | ||||||
|  | 	/* Reconstruct ParamListInfo. */ | ||||||
|  | 	paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS); | ||||||
|  | 	paramLI = RestoreParamList(¶mspace); | ||||||
|  |  | ||||||
|  | 	/* | ||||||
|  | 	 * Create a QueryDesc for the query. | ||||||
|  | 	 * | ||||||
|  | 	 * It's not obvious how to obtain the query string from here; and even if | ||||||
|  | 	 * we could copying it would take more cycles than not copying it. But | ||||||
|  | 	 * it's a bit unsatisfying to just use a dummy string here, so consider | ||||||
|  | 	 * revising this someday. | ||||||
|  | 	 */ | ||||||
|  | 	return CreateQueryDesc(pstmt, | ||||||
|  | 						   "<parallel query>", | ||||||
|  | 						   GetActiveSnapshot(), InvalidSnapshot, | ||||||
|  | 						   receiver, paramLI, instrument_options); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Copy instrumentation information from this node and its descendents into | ||||||
|  |  * dynamic shared memory, so that the parallel leader can retrieve it. | ||||||
|  |  */ | ||||||
|  | static bool | ||||||
|  | ExecParallelReportInstrumentation(PlanState *planstate, | ||||||
|  | 						  SharedExecutorInstrumentation *instrumentation) | ||||||
|  | { | ||||||
|  | 	int		i; | ||||||
|  | 	int		plan_node_id = planstate->plan->plan_node_id; | ||||||
|  | 	SharedPlanStateInstrumentation *ps_instrument; | ||||||
|  |  | ||||||
|  | 	/* | ||||||
|  | 	 * If we shuffled the plan_node_id values in ps_instrument into sorted | ||||||
|  | 	 * order, we could use binary search here.  This might matter someday | ||||||
|  | 	 * if we're pushing down sufficiently large plan trees.  For now, do it | ||||||
|  | 	 * the slow, dumb way. | ||||||
|  | 	 */ | ||||||
|  | 	for (i = 0; i < instrumentation->ps_ninstrument; ++i) | ||||||
|  | 		if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id) | ||||||
|  | 			break; | ||||||
|  | 	if (i >= instrumentation->ps_ninstrument) | ||||||
|  | 		elog(ERROR, "plan node %d not found", plan_node_id); | ||||||
|  |  | ||||||
|  | 	/* | ||||||
|  | 	 * There's one SharedPlanStateInstrumentation per plan_node_id, so we | ||||||
|  | 	 * must use a spinlock in case multiple workers report at the same time. | ||||||
|  | 	 */ | ||||||
|  | 	ps_instrument = &instrumentation->ps_instrument[i]; | ||||||
|  | 	SpinLockAcquire(&ps_instrument->mutex); | ||||||
|  | 	InstrAggNode(&ps_instrument->instr, planstate->instrument); | ||||||
|  | 	SpinLockRelease(&ps_instrument->mutex); | ||||||
|  |  | ||||||
|  | 	return planstate_tree_walker(planstate, ExecParallelReportInstrumentation, | ||||||
|  | 								 instrumentation); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Main entrypoint for parallel query worker processes. | ||||||
|  |  * | ||||||
|  |  * We reach this function from ParallelMain, so the setup necessary to create | ||||||
|  |  * a sensible parallel environment has already been done; ParallelMain worries | ||||||
|  |  * about stuff like the transaction state, combo CID mappings, and GUC values, | ||||||
|  |  * so we don't need to deal with any of that here. | ||||||
|  |  * | ||||||
|  |  * Our job is to deal with concerns specific to the executor.  The parallel | ||||||
|  |  * group leader will have stored a serialized PlannedStmt, and it's our job | ||||||
|  |  * to execute that plan and write the resulting tuples to the appropriate | ||||||
|  |  * tuple queue.  Various bits of supporting information that we need in order | ||||||
|  |  * to do this are also stored in the dsm_segment and can be accessed through | ||||||
|  |  * the shm_toc. | ||||||
|  |  */ | ||||||
|  | static void | ||||||
|  | ParallelQueryMain(dsm_segment *seg, shm_toc *toc) | ||||||
|  | { | ||||||
|  | 	BufferUsage *buffer_usage; | ||||||
|  | 	DestReceiver *receiver; | ||||||
|  | 	QueryDesc  *queryDesc; | ||||||
|  | 	SharedExecutorInstrumentation *instrumentation; | ||||||
|  | 	int			instrument_options = 0; | ||||||
|  |  | ||||||
|  | 	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */ | ||||||
|  | 	receiver = ExecParallelGetReceiver(seg, toc); | ||||||
|  | 	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION); | ||||||
|  | 	if (instrumentation != NULL) | ||||||
|  | 		instrument_options = instrumentation->instrument_options; | ||||||
|  | 	queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options); | ||||||
|  |  | ||||||
|  | 	/* Prepare to track buffer usage during query execution. */ | ||||||
|  | 	InstrStartParallelQuery(); | ||||||
|  |  | ||||||
|  | 	/* Start up the executor, have it run the plan, and then shut it down. */ | ||||||
|  | 	ExecutorStart(queryDesc, 0); | ||||||
|  | 	ExecutorRun(queryDesc, ForwardScanDirection, 0L); | ||||||
|  | 	ExecutorFinish(queryDesc); | ||||||
|  | 	ExecutorEnd(queryDesc); | ||||||
|  |  | ||||||
|  | 	/* Report buffer usage during parallel execution. */ | ||||||
|  | 	buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE); | ||||||
|  | 	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]); | ||||||
|  |  | ||||||
|  | 	/* Report instrumentation data if any instrumentation options are set. */ | ||||||
|  | 	if (instrumentation != NULL) | ||||||
|  | 		ExecParallelReportInstrumentation(queryDesc->planstate, | ||||||
|  | 										  instrumentation); | ||||||
|  |  | ||||||
|  | 	/* Cleanup. */ | ||||||
|  | 	FreeQueryDesc(queryDesc); | ||||||
|  | 	(*receiver->rDestroy) (receiver); | ||||||
|  | } | ||||||
| @@ -18,7 +18,9 @@ | |||||||
| #include "executor/instrument.h" | #include "executor/instrument.h" | ||||||
|  |  | ||||||
| BufferUsage pgBufferUsage; | BufferUsage pgBufferUsage; | ||||||
|  | static BufferUsage save_pgBufferUsage; | ||||||
|  |  | ||||||
|  | static void BufferUsageAdd(BufferUsage *dst, const BufferUsage *add); | ||||||
| static void BufferUsageAccumDiff(BufferUsage *dst, | static void BufferUsageAccumDiff(BufferUsage *dst, | ||||||
| 					 const BufferUsage *add, const BufferUsage *sub); | 					 const BufferUsage *add, const BufferUsage *sub); | ||||||
|  |  | ||||||
| @@ -47,6 +49,15 @@ InstrAlloc(int n, int instrument_options) | |||||||
| 	return instr; | 	return instr; | ||||||
| } | } | ||||||
|  |  | ||||||
|  | /* Initialize an pre-allocated instrumentation structure. */ | ||||||
|  | void | ||||||
|  | InstrInit(Instrumentation *instr, int instrument_options) | ||||||
|  | { | ||||||
|  | 	memset(instr, 0, sizeof(Instrumentation)); | ||||||
|  | 	instr->need_bufusage = (instrument_options & INSTRUMENT_BUFFERS) != 0; | ||||||
|  | 	instr->need_timer = (instrument_options & INSTRUMENT_TIMER) != 0; | ||||||
|  | } | ||||||
|  |  | ||||||
| /* Entry to a plan node */ | /* Entry to a plan node */ | ||||||
| void | void | ||||||
| InstrStartNode(Instrumentation *instr) | InstrStartNode(Instrumentation *instr) | ||||||
| @@ -127,6 +138,73 @@ InstrEndLoop(Instrumentation *instr) | |||||||
| 	instr->tuplecount = 0; | 	instr->tuplecount = 0; | ||||||
| } | } | ||||||
|  |  | ||||||
|  | /* aggregate instrumentation information */ | ||||||
|  | void | ||||||
|  | InstrAggNode(Instrumentation *dst, Instrumentation *add) | ||||||
|  | { | ||||||
|  | 	if (!dst->running && add->running) | ||||||
|  | 	{ | ||||||
|  | 		dst->running = true; | ||||||
|  | 		dst->firsttuple = add->firsttuple; | ||||||
|  | 	} | ||||||
|  | 	else if (dst->running && add->running && dst->firsttuple > add->firsttuple) | ||||||
|  | 		dst->firsttuple = add->firsttuple; | ||||||
|  |  | ||||||
|  | 	INSTR_TIME_ADD(dst->counter, add->counter); | ||||||
|  |  | ||||||
|  | 	dst->tuplecount += add->tuplecount; | ||||||
|  | 	dst->startup += add->startup; | ||||||
|  | 	dst->total += add->total; | ||||||
|  | 	dst->ntuples += add->ntuples; | ||||||
|  | 	dst->nloops += add->nloops; | ||||||
|  | 	dst->nfiltered1 += add->nfiltered1; | ||||||
|  | 	dst->nfiltered2 += add->nfiltered2; | ||||||
|  |  | ||||||
|  | 	/* Add delta of buffer usage since entry to node's totals */ | ||||||
|  | 	if (dst->need_bufusage) | ||||||
|  | 		BufferUsageAdd(&dst->bufusage, &add->bufusage); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* note current values during parallel executor startup */ | ||||||
|  | void | ||||||
|  | InstrStartParallelQuery(void) | ||||||
|  | { | ||||||
|  | 	save_pgBufferUsage = pgBufferUsage; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* report usage after parallel executor shutdown */ | ||||||
|  | void | ||||||
|  | InstrEndParallelQuery(BufferUsage *result) | ||||||
|  | { | ||||||
|  | 	memset(result, 0, sizeof(BufferUsage)); | ||||||
|  | 	BufferUsageAccumDiff(result, &pgBufferUsage, &save_pgBufferUsage); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* accumulate work done by workers in leader's stats */ | ||||||
|  | void | ||||||
|  | InstrAccumParallelQuery(BufferUsage *result) | ||||||
|  | { | ||||||
|  | 	BufferUsageAdd(&pgBufferUsage, result); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* dst += add */ | ||||||
|  | static void | ||||||
|  | BufferUsageAdd(BufferUsage *dst, const BufferUsage *add) | ||||||
|  | { | ||||||
|  | 	dst->shared_blks_hit += add->shared_blks_hit; | ||||||
|  | 	dst->shared_blks_read += add->shared_blks_read; | ||||||
|  | 	dst->shared_blks_dirtied += add->shared_blks_dirtied; | ||||||
|  | 	dst->shared_blks_written += add->shared_blks_written; | ||||||
|  | 	dst->local_blks_hit += add->local_blks_hit; | ||||||
|  | 	dst->local_blks_read += add->local_blks_read; | ||||||
|  | 	dst->local_blks_dirtied += add->local_blks_dirtied; | ||||||
|  | 	dst->local_blks_written += add->local_blks_written; | ||||||
|  | 	dst->temp_blks_read += add->temp_blks_read; | ||||||
|  | 	dst->temp_blks_written += add->temp_blks_written; | ||||||
|  | 	INSTR_TIME_ADD(dst->blk_read_time, add->blk_read_time); | ||||||
|  | 	INSTR_TIME_ADD(dst->blk_write_time, add->blk_write_time); | ||||||
|  | } | ||||||
|  |  | ||||||
| /* dst += add - sub */ | /* dst += add - sub */ | ||||||
| static void | static void | ||||||
| BufferUsageAccumDiff(BufferUsage *dst, | BufferUsageAccumDiff(BufferUsage *dst, | ||||||
|   | |||||||
| @@ -66,7 +66,9 @@ tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo) | |||||||
| static void | static void | ||||||
| tqueueShutdownReceiver(DestReceiver *self) | tqueueShutdownReceiver(DestReceiver *self) | ||||||
| { | { | ||||||
| 	/* do nothing */ | 	TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; | ||||||
|  |  | ||||||
|  | 	shm_mq_detach(shm_mq_get_queue(tqueue->handle)); | ||||||
| } | } | ||||||
|  |  | ||||||
| /* | /* | ||||||
|   | |||||||
| @@ -112,6 +112,7 @@ CopyPlanFields(const Plan *from, Plan *newnode) | |||||||
| 	COPY_SCALAR_FIELD(total_cost); | 	COPY_SCALAR_FIELD(total_cost); | ||||||
| 	COPY_SCALAR_FIELD(plan_rows); | 	COPY_SCALAR_FIELD(plan_rows); | ||||||
| 	COPY_SCALAR_FIELD(plan_width); | 	COPY_SCALAR_FIELD(plan_width); | ||||||
|  | 	COPY_SCALAR_FIELD(plan_node_id); | ||||||
| 	COPY_NODE_FIELD(targetlist); | 	COPY_NODE_FIELD(targetlist); | ||||||
| 	COPY_NODE_FIELD(qual); | 	COPY_NODE_FIELD(qual); | ||||||
| 	COPY_NODE_FIELD(lefttree); | 	COPY_NODE_FIELD(lefttree); | ||||||
|   | |||||||
| @@ -271,6 +271,7 @@ _outPlanInfo(StringInfo str, const Plan *node) | |||||||
| 	WRITE_FLOAT_FIELD(total_cost, "%.2f"); | 	WRITE_FLOAT_FIELD(total_cost, "%.2f"); | ||||||
| 	WRITE_FLOAT_FIELD(plan_rows, "%.0f"); | 	WRITE_FLOAT_FIELD(plan_rows, "%.0f"); | ||||||
| 	WRITE_INT_FIELD(plan_width); | 	WRITE_INT_FIELD(plan_width); | ||||||
|  | 	WRITE_INT_FIELD(plan_node_id); | ||||||
| 	WRITE_NODE_FIELD(targetlist); | 	WRITE_NODE_FIELD(targetlist); | ||||||
| 	WRITE_NODE_FIELD(qual); | 	WRITE_NODE_FIELD(qual); | ||||||
| 	WRITE_NODE_FIELD(lefttree); | 	WRITE_NODE_FIELD(lefttree); | ||||||
|   | |||||||
| @@ -16,6 +16,7 @@ | |||||||
| #include "postgres.h" | #include "postgres.h" | ||||||
|  |  | ||||||
| #include "nodes/params.h" | #include "nodes/params.h" | ||||||
|  | #include "storage/shmem.h" | ||||||
| #include "utils/datum.h" | #include "utils/datum.h" | ||||||
| #include "utils/lsyscache.h" | #include "utils/lsyscache.h" | ||||||
|  |  | ||||||
| @@ -73,3 +74,157 @@ copyParamList(ParamListInfo from) | |||||||
|  |  | ||||||
| 	return retval; | 	return retval; | ||||||
| } | } | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Estimate the amount of space required to serialize a ParamListInfo. | ||||||
|  |  */ | ||||||
|  | Size | ||||||
|  | EstimateParamListSpace(ParamListInfo paramLI) | ||||||
|  | { | ||||||
|  | 	int		i; | ||||||
|  | 	Size	sz = sizeof(int); | ||||||
|  |  | ||||||
|  | 	if (paramLI == NULL || paramLI->numParams <= 0) | ||||||
|  | 		return sz; | ||||||
|  |  | ||||||
|  | 	for (i = 0; i < paramLI->numParams; i++) | ||||||
|  | 	{ | ||||||
|  | 		ParamExternData *prm = ¶mLI->params[i]; | ||||||
|  | 		int16		typLen; | ||||||
|  | 		bool		typByVal; | ||||||
|  |  | ||||||
|  | 		/* give hook a chance in case parameter is dynamic */ | ||||||
|  | 		if (!OidIsValid(prm->ptype) && paramLI->paramFetch != NULL) | ||||||
|  | 			(*paramLI->paramFetch) (paramLI, i + 1); | ||||||
|  |  | ||||||
|  | 		sz = add_size(sz, sizeof(Oid));			/* space for type OID */ | ||||||
|  | 		sz = add_size(sz, sizeof(uint16));		/* space for pflags */ | ||||||
|  |  | ||||||
|  | 		/* space for datum/isnull */ | ||||||
|  | 		if (OidIsValid(prm->ptype)) | ||||||
|  | 			get_typlenbyval(prm->ptype, &typLen, &typByVal); | ||||||
|  | 		else | ||||||
|  | 		{ | ||||||
|  | 			/* If no type OID, assume by-value, like copyParamList does. */ | ||||||
|  | 			typLen = sizeof(Datum); | ||||||
|  | 			typByVal = true; | ||||||
|  | 		} | ||||||
|  | 		sz = add_size(sz, | ||||||
|  | 			datumEstimateSpace(prm->value, prm->isnull, typByVal, typLen)); | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return sz; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Serialize a paramListInfo structure into caller-provided storage. | ||||||
|  |  * | ||||||
|  |  * We write the number of parameters first, as a 4-byte integer, and then | ||||||
|  |  * write details for each parameter in turn.  The details for each parameter | ||||||
|  |  * consist of a 4-byte type OID, 2 bytes of flags, and then the datum as | ||||||
|  |  * serialized by datumSerialize().  The caller is responsible for ensuring | ||||||
|  |  * that there is enough storage to store the number of bytes that will be | ||||||
|  |  * written; use EstimateParamListSpace to find out how many will be needed. | ||||||
|  |  * *start_address is updated to point to the byte immediately following those | ||||||
|  |  * written. | ||||||
|  |  * | ||||||
|  |  * RestoreParamList can be used to recreate a ParamListInfo based on the | ||||||
|  |  * serialized representation; this will be a static, self-contained copy | ||||||
|  |  * just as copyParamList would create. | ||||||
|  |  */ | ||||||
|  | void | ||||||
|  | SerializeParamList(ParamListInfo paramLI, char **start_address) | ||||||
|  | { | ||||||
|  | 	int			nparams; | ||||||
|  | 	int			i; | ||||||
|  |  | ||||||
|  | 	/* Write number of parameters. */ | ||||||
|  | 	if (paramLI == NULL || paramLI->numParams <= 0) | ||||||
|  | 		nparams = 0; | ||||||
|  | 	else | ||||||
|  | 		nparams = paramLI->numParams; | ||||||
|  | 	memcpy(*start_address, &nparams, sizeof(int)); | ||||||
|  | 	*start_address += sizeof(int); | ||||||
|  |  | ||||||
|  | 	/* Write each parameter in turn. */ | ||||||
|  | 	for (i = 0; i < nparams; i++) | ||||||
|  | 	{ | ||||||
|  | 		ParamExternData *prm = ¶mLI->params[i]; | ||||||
|  | 		int16		typLen; | ||||||
|  | 		bool		typByVal; | ||||||
|  |  | ||||||
|  | 		/* give hook a chance in case parameter is dynamic */ | ||||||
|  | 		if (!OidIsValid(prm->ptype) && paramLI->paramFetch != NULL) | ||||||
|  | 			(*paramLI->paramFetch) (paramLI, i + 1); | ||||||
|  |  | ||||||
|  | 		/* Write type OID. */ | ||||||
|  | 		memcpy(*start_address, &prm->ptype, sizeof(Oid)); | ||||||
|  | 		*start_address += sizeof(Oid); | ||||||
|  |  | ||||||
|  | 		/* Write flags. */ | ||||||
|  | 		memcpy(*start_address, &prm->pflags, sizeof(uint16)); | ||||||
|  | 		*start_address += sizeof(uint16); | ||||||
|  |  | ||||||
|  | 		/* Write datum/isnull. */ | ||||||
|  | 		if (OidIsValid(prm->ptype)) | ||||||
|  | 			get_typlenbyval(prm->ptype, &typLen, &typByVal); | ||||||
|  | 		else | ||||||
|  | 		{ | ||||||
|  | 			/* If no type OID, assume by-value, like copyParamList does. */ | ||||||
|  | 			typLen = sizeof(Datum); | ||||||
|  | 			typByVal = true; | ||||||
|  | 		} | ||||||
|  | 		datumSerialize(prm->value, prm->isnull, typByVal, typLen, | ||||||
|  | 					   start_address); | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Copy a ParamListInfo structure. | ||||||
|  |  * | ||||||
|  |  * The result is allocated in CurrentMemoryContext. | ||||||
|  |  * | ||||||
|  |  * Note: the intent of this function is to make a static, self-contained | ||||||
|  |  * set of parameter values.  If dynamic parameter hooks are present, we | ||||||
|  |  * intentionally do not copy them into the result.  Rather, we forcibly | ||||||
|  |  * instantiate all available parameter values and copy the datum values. | ||||||
|  |  */ | ||||||
|  | ParamListInfo | ||||||
|  | RestoreParamList(char **start_address) | ||||||
|  | { | ||||||
|  | 	ParamListInfo paramLI; | ||||||
|  | 	Size		size; | ||||||
|  | 	int			i; | ||||||
|  | 	int			nparams; | ||||||
|  |  | ||||||
|  | 	memcpy(&nparams, *start_address, sizeof(int)); | ||||||
|  | 	*start_address += sizeof(int); | ||||||
|  |  | ||||||
|  | 	size = offsetof(ParamListInfoData, params) + | ||||||
|  | 		nparams * sizeof(ParamExternData); | ||||||
|  |  | ||||||
|  | 	paramLI = (ParamListInfo) palloc(size); | ||||||
|  | 	paramLI->paramFetch = NULL; | ||||||
|  | 	paramLI->paramFetchArg = NULL; | ||||||
|  | 	paramLI->parserSetup = NULL; | ||||||
|  | 	paramLI->parserSetupArg = NULL; | ||||||
|  | 	paramLI->numParams = nparams; | ||||||
|  |  | ||||||
|  | 	for (i = 0; i < nparams; i++) | ||||||
|  | 	{ | ||||||
|  | 		ParamExternData *prm = ¶mLI->params[i]; | ||||||
|  |  | ||||||
|  | 		/* Read type OID. */ | ||||||
|  | 		memcpy(&prm->ptype, *start_address, sizeof(Oid)); | ||||||
|  | 		*start_address += sizeof(Oid); | ||||||
|  |  | ||||||
|  | 		/* Read flags. */ | ||||||
|  | 		memcpy(&prm->pflags, *start_address, sizeof(uint16)); | ||||||
|  | 		*start_address += sizeof(uint16); | ||||||
|  |  | ||||||
|  | 		/* Read datum/isnull. */ | ||||||
|  | 		prm->value = datumRestore(start_address, &prm->isnull); | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return paramLI; | ||||||
|  | } | ||||||
|   | |||||||
| @@ -1413,6 +1413,7 @@ ReadCommonPlan(Plan *local_node) | |||||||
| 	READ_FLOAT_FIELD(total_cost); | 	READ_FLOAT_FIELD(total_cost); | ||||||
| 	READ_FLOAT_FIELD(plan_rows); | 	READ_FLOAT_FIELD(plan_rows); | ||||||
| 	READ_INT_FIELD(plan_width); | 	READ_INT_FIELD(plan_width); | ||||||
|  | 	READ_INT_FIELD(plan_node_id); | ||||||
| 	READ_NODE_FIELD(targetlist); | 	READ_NODE_FIELD(targetlist); | ||||||
| 	READ_NODE_FIELD(qual); | 	READ_NODE_FIELD(qual); | ||||||
| 	READ_NODE_FIELD(lefttree); | 	READ_NODE_FIELD(lefttree); | ||||||
|   | |||||||
| @@ -196,6 +196,7 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) | |||||||
| 	glob->nParamExec = 0; | 	glob->nParamExec = 0; | ||||||
| 	glob->lastPHId = 0; | 	glob->lastPHId = 0; | ||||||
| 	glob->lastRowMarkId = 0; | 	glob->lastRowMarkId = 0; | ||||||
|  | 	glob->lastPlanNodeId = 0; | ||||||
| 	glob->transientPlan = false; | 	glob->transientPlan = false; | ||||||
| 	glob->hasRowSecurity = false; | 	glob->hasRowSecurity = false; | ||||||
|  |  | ||||||
|   | |||||||
| @@ -174,6 +174,8 @@ static bool extract_query_dependencies_walker(Node *node, | |||||||
|  * Currently, relations and user-defined functions are the only types of |  * Currently, relations and user-defined functions are the only types of | ||||||
|  * objects that are explicitly tracked this way. |  * objects that are explicitly tracked this way. | ||||||
|  * |  * | ||||||
|  |  * 7. We assign every plan node in the tree a unique ID. | ||||||
|  |  * | ||||||
|  * We also perform one final optimization step, which is to delete |  * We also perform one final optimization step, which is to delete | ||||||
|  * SubqueryScan plan nodes that aren't doing anything useful (ie, have |  * SubqueryScan plan nodes that aren't doing anything useful (ie, have | ||||||
|  * no qual and a no-op targetlist).  The reason for doing this last is that |  * no qual and a no-op targetlist).  The reason for doing this last is that | ||||||
| @@ -436,6 +438,9 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) | |||||||
| 	if (plan == NULL) | 	if (plan == NULL) | ||||||
| 		return NULL; | 		return NULL; | ||||||
|  |  | ||||||
|  | 	/* Assign this node a unique ID. */ | ||||||
|  | 	plan->plan_node_id = root->glob->lastPlanNodeId++; | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * Plan-type-specific fixes | 	 * Plan-type-specific fixes | ||||||
| 	 */ | 	 */ | ||||||
|   | |||||||
| @@ -246,3 +246,121 @@ datumIsEqual(Datum value1, Datum value2, bool typByVal, int typLen) | |||||||
| 	} | 	} | ||||||
| 	return res; | 	return res; | ||||||
| } | } | ||||||
|  |  | ||||||
|  | /*------------------------------------------------------------------------- | ||||||
|  |  * datumEstimateSpace | ||||||
|  |  * | ||||||
|  |  * Compute the amount of space that datumSerialize will require for a | ||||||
|  |  * particular Datum. | ||||||
|  |  *------------------------------------------------------------------------- | ||||||
|  |  */ | ||||||
|  | Size | ||||||
|  | datumEstimateSpace(Datum value, bool isnull, bool typByVal, int typLen) | ||||||
|  | { | ||||||
|  | 	Size	sz = sizeof(int); | ||||||
|  |  | ||||||
|  | 	if (!isnull) | ||||||
|  | 	{ | ||||||
|  | 		/* no need to use add_size, can't overflow */ | ||||||
|  | 		if (typByVal) | ||||||
|  | 			sz += sizeof(Datum); | ||||||
|  | 		else | ||||||
|  | 			sz += datumGetSize(value, typByVal, typLen); | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return sz; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /*------------------------------------------------------------------------- | ||||||
|  |  * datumSerialize | ||||||
|  |  * | ||||||
|  |  * Serialize a possibly-NULL datum into caller-provided storage. | ||||||
|  |  * | ||||||
|  |  * The format is as follows: first, we write a 4-byte header word, which | ||||||
|  |  * is either the length of a pass-by-reference datum, -1 for a | ||||||
|  |  * pass-by-value datum, or -2 for a NULL.  If the value is NULL, nothing | ||||||
|  |  * further is written.  If it is pass-by-value, sizeof(Datum) bytes | ||||||
|  |  * follow.  Otherwise, the number of bytes indicated by the header word | ||||||
|  |  * follow.  The caller is responsible for ensuring that there is enough | ||||||
|  |  * storage to store the number of bytes that will be written; use | ||||||
|  |  * datumEstimateSpace() to find out how many will be needed. | ||||||
|  |  * *start_address is updated to point to the byte immediately following | ||||||
|  |  * those written. | ||||||
|  |  *------------------------------------------------------------------------- | ||||||
|  |  */ | ||||||
|  | void | ||||||
|  | datumSerialize(Datum value, bool isnull, bool typByVal, int typLen, | ||||||
|  | 			   char **start_address) | ||||||
|  | { | ||||||
|  | 	int		header; | ||||||
|  |  | ||||||
|  | 	/* Write header word. */ | ||||||
|  | 	if (isnull) | ||||||
|  | 		header = -2; | ||||||
|  | 	else if (typByVal) | ||||||
|  | 		header = -1; | ||||||
|  | 	else | ||||||
|  | 		header = datumGetSize(value, typByVal, typLen); | ||||||
|  | 	memcpy(*start_address, &header, sizeof(int)); | ||||||
|  | 	*start_address += sizeof(int); | ||||||
|  |  | ||||||
|  | 	/* If not null, write payload bytes. */ | ||||||
|  | 	if (!isnull) | ||||||
|  | 	{ | ||||||
|  | 		if (typByVal) | ||||||
|  | 		{ | ||||||
|  | 			memcpy(*start_address, &value, sizeof(Datum)); | ||||||
|  | 			*start_address += sizeof(Datum); | ||||||
|  | 		} | ||||||
|  | 		else | ||||||
|  | 		{ | ||||||
|  | 			memcpy(*start_address, DatumGetPointer(value), header); | ||||||
|  | 			*start_address += header; | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /*------------------------------------------------------------------------- | ||||||
|  |  * datumRestore | ||||||
|  |  * | ||||||
|  |  * Restore a possibly-NULL datum previously serialized by datumSerialize. | ||||||
|  |  * *start_address is updated according to the number of bytes consumed. | ||||||
|  |  *------------------------------------------------------------------------- | ||||||
|  |  */ | ||||||
|  | Datum | ||||||
|  | datumRestore(char **start_address, bool *isnull) | ||||||
|  | { | ||||||
|  | 	int		header; | ||||||
|  | 	void   *d; | ||||||
|  |  | ||||||
|  | 	/* Read header word. */ | ||||||
|  | 	memcpy(&header, *start_address, sizeof(int)); | ||||||
|  | 	*start_address += sizeof(int); | ||||||
|  |  | ||||||
|  | 	/* If this datum is NULL, we can stop here. */ | ||||||
|  | 	if (header == -2) | ||||||
|  | 	{ | ||||||
|  | 		*isnull = true; | ||||||
|  | 		return (Datum) 0; | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	/* OK, datum is not null. */ | ||||||
|  | 	*isnull = false; | ||||||
|  |  | ||||||
|  | 	/* If this datum is pass-by-value, sizeof(Datum) bytes follow. */ | ||||||
|  | 	if (header == -1) | ||||||
|  | 	{ | ||||||
|  | 		Datum		val; | ||||||
|  |  | ||||||
|  | 		memcpy(&val, *start_address, sizeof(Datum)); | ||||||
|  | 		*start_address += sizeof(Datum); | ||||||
|  | 		return val; | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	/* Pass-by-reference case; copy indicated number of bytes. */ | ||||||
|  | 	Assert(header > 0); | ||||||
|  | 	d = palloc(header); | ||||||
|  | 	memcpy(d, *start_address, header); | ||||||
|  | 	*start_address += header; | ||||||
|  | 	return PointerGetDatum(d); | ||||||
|  | } | ||||||
|   | |||||||
							
								
								
									
										36
									
								
								src/include/executor/execParallel.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										36
									
								
								src/include/executor/execParallel.h
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,36 @@ | |||||||
|  | /*-------------------------------------------------------------------- | ||||||
|  |  * execParallel.h | ||||||
|  |  *		POSTGRES parallel execution interface | ||||||
|  |  * | ||||||
|  |  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group | ||||||
|  |  * Portions Copyright (c) 1994, Regents of the University of California | ||||||
|  |  * | ||||||
|  |  * IDENTIFICATION | ||||||
|  |  *		src/include/executor/execParallel.h | ||||||
|  |  *-------------------------------------------------------------------- | ||||||
|  |  */ | ||||||
|  |  | ||||||
|  | #ifndef EXECPARALLEL_H | ||||||
|  | #define EXECPARALLEL_H | ||||||
|  |  | ||||||
|  | #include "access/parallel.h" | ||||||
|  | #include "nodes/execnodes.h" | ||||||
|  | #include "nodes/parsenodes.h" | ||||||
|  | #include "nodes/plannodes.h" | ||||||
|  |  | ||||||
|  | typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation; | ||||||
|  |  | ||||||
|  | typedef struct ParallelExecutorInfo | ||||||
|  | { | ||||||
|  | 	PlanState *planstate; | ||||||
|  | 	ParallelContext *pcxt; | ||||||
|  | 	BufferUsage *buffer_usage; | ||||||
|  | 	SharedExecutorInstrumentation *instrumentation; | ||||||
|  | 	shm_mq_handle **tqueue; | ||||||
|  | }	ParallelExecutorInfo; | ||||||
|  |  | ||||||
|  | extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, | ||||||
|  | 					 EState *estate, int nworkers); | ||||||
|  | extern void ExecParallelFinish(ParallelExecutorInfo *pei); | ||||||
|  |  | ||||||
|  | #endif   /* EXECPARALLEL_H */ | ||||||
| @@ -66,8 +66,13 @@ typedef struct Instrumentation | |||||||
| extern PGDLLIMPORT BufferUsage pgBufferUsage; | extern PGDLLIMPORT BufferUsage pgBufferUsage; | ||||||
|  |  | ||||||
| extern Instrumentation *InstrAlloc(int n, int instrument_options); | extern Instrumentation *InstrAlloc(int n, int instrument_options); | ||||||
|  | extern void InstrInit(Instrumentation *instr, int instrument_options); | ||||||
| extern void InstrStartNode(Instrumentation *instr); | extern void InstrStartNode(Instrumentation *instr); | ||||||
| extern void InstrStopNode(Instrumentation *instr, double nTuples); | extern void InstrStopNode(Instrumentation *instr, double nTuples); | ||||||
| extern void InstrEndLoop(Instrumentation *instr); | extern void InstrEndLoop(Instrumentation *instr); | ||||||
|  | extern void InstrAggNode(Instrumentation *dst, Instrumentation *add); | ||||||
|  | extern void InstrStartParallelQuery(void); | ||||||
|  | extern void InstrEndParallelQuery(BufferUsage *result); | ||||||
|  | extern void InstrAccumParallelQuery(BufferUsage *result); | ||||||
|  |  | ||||||
| #endif   /* INSTRUMENT_H */ | #endif   /* INSTRUMENT_H */ | ||||||
|   | |||||||
| @@ -102,5 +102,8 @@ typedef struct ParamExecData | |||||||
|  |  | ||||||
| /* Functions found in src/backend/nodes/params.c */ | /* Functions found in src/backend/nodes/params.c */ | ||||||
| extern ParamListInfo copyParamList(ParamListInfo from); | extern ParamListInfo copyParamList(ParamListInfo from); | ||||||
|  | extern Size EstimateParamListSpace(ParamListInfo paramLI); | ||||||
|  | extern void SerializeParamList(ParamListInfo paramLI, char **start_address); | ||||||
|  | extern ParamListInfo RestoreParamList(char **start_address); | ||||||
|  |  | ||||||
| #endif   /* PARAMS_H */ | #endif   /* PARAMS_H */ | ||||||
|   | |||||||
| @@ -111,6 +111,7 @@ typedef struct Plan | |||||||
| 	/* | 	/* | ||||||
| 	 * Common structural data for all Plan types. | 	 * Common structural data for all Plan types. | ||||||
| 	 */ | 	 */ | ||||||
|  | 	int			plan_node_id;	/* unique across entire final plan tree */ | ||||||
| 	List	   *targetlist;		/* target list to be computed at this node */ | 	List	   *targetlist;		/* target list to be computed at this node */ | ||||||
| 	List	   *qual;			/* implicitly-ANDed qual conditions */ | 	List	   *qual;			/* implicitly-ANDed qual conditions */ | ||||||
| 	struct Plan *lefttree;		/* input plan tree(s) */ | 	struct Plan *lefttree;		/* input plan tree(s) */ | ||||||
|   | |||||||
| @@ -99,6 +99,8 @@ typedef struct PlannerGlobal | |||||||
|  |  | ||||||
| 	Index		lastRowMarkId;	/* highest PlanRowMark ID assigned */ | 	Index		lastRowMarkId;	/* highest PlanRowMark ID assigned */ | ||||||
|  |  | ||||||
|  | 	int			lastPlanNodeId;	/* highest plan node ID assigned */ | ||||||
|  |  | ||||||
| 	bool		transientPlan;	/* redo plan when TransactionXmin changes? */ | 	bool		transientPlan;	/* redo plan when TransactionXmin changes? */ | ||||||
|  |  | ||||||
| 	bool		hasRowSecurity; /* row security applied? */ | 	bool		hasRowSecurity; /* row security applied? */ | ||||||
|   | |||||||
| @@ -46,4 +46,14 @@ extern Datum datumTransfer(Datum value, bool typByVal, int typLen); | |||||||
| extern bool datumIsEqual(Datum value1, Datum value2, | extern bool datumIsEqual(Datum value1, Datum value2, | ||||||
| 			 bool typByVal, int typLen); | 			 bool typByVal, int typLen); | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Serialize and restore datums so that we can transfer them to parallel | ||||||
|  |  * workers. | ||||||
|  |  */ | ||||||
|  | extern Size datumEstimateSpace(Datum value, bool isnull, bool typByVal, | ||||||
|  | 				   int typLen); | ||||||
|  | extern void datumSerialize(Datum value, bool isnull, bool typByVal, | ||||||
|  | 			   int typLen, char **start_address); | ||||||
|  | extern Datum datumRestore(char **start_address, bool *isnull); | ||||||
|  |  | ||||||
| #endif   /* DATUM_H */ | #endif   /* DATUM_H */ | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user