mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-25 13:17:41 +03:00 
			
		
		
		
	Allow EXPLAIN (ANALYZE, VERBOSE) to display per-worker statistics.
The original parallel sequential scan commit included only very limited changes to the EXPLAIN output. Aggregated totals from all workers were displayed, but there was no way to see what each individual worker did or to distinguish the effort made by the workers from the effort made by the leader. Per a gripe by Thom Brown (and maybe others). Patch by me, reviewed by Amit Kapila.
This commit is contained in:
		| @@ -103,6 +103,7 @@ static void show_instrumentation_count(const char *qlabel, int which, | |||||||
| 						   PlanState *planstate, ExplainState *es); | 						   PlanState *planstate, ExplainState *es); | ||||||
| static void show_foreignscan_info(ForeignScanState *fsstate, ExplainState *es); | static void show_foreignscan_info(ForeignScanState *fsstate, ExplainState *es); | ||||||
| static const char *explain_get_index_name(Oid indexId); | static const char *explain_get_index_name(Oid indexId); | ||||||
|  | static void show_buffer_usage(ExplainState *es, const BufferUsage *usage); | ||||||
| static void ExplainIndexScanDetails(Oid indexid, ScanDirection indexorderdir, | static void ExplainIndexScanDetails(Oid indexid, ScanDirection indexorderdir, | ||||||
| 						ExplainState *es); | 						ExplainState *es); | ||||||
| static void ExplainScanTarget(Scan *plan, ExplainState *es); | static void ExplainScanTarget(Scan *plan, ExplainState *es); | ||||||
| @@ -1437,108 +1438,73 @@ ExplainNode(PlanState *planstate, List *ancestors, | |||||||
|  |  | ||||||
| 	/* Show buffer usage */ | 	/* Show buffer usage */ | ||||||
| 	if (es->buffers && planstate->instrument) | 	if (es->buffers && planstate->instrument) | ||||||
|  | 		show_buffer_usage(es, &planstate->instrument->bufusage); | ||||||
|  |  | ||||||
|  | 	/* Show worker detail */ | ||||||
|  | 	if (es->analyze && es->verbose && planstate->worker_instrument) | ||||||
| 	{ | 	{ | ||||||
| 		const BufferUsage *usage = &planstate->instrument->bufusage; | 		WorkerInstrumentation *w = planstate->worker_instrument; | ||||||
|  | 		bool		opened_group = false; | ||||||
|  | 		int			n; | ||||||
|  |  | ||||||
| 		if (es->format == EXPLAIN_FORMAT_TEXT) | 		for (n = 0; n < w->num_workers; ++n) | ||||||
| 		{ | 		{ | ||||||
| 			bool		has_shared = (usage->shared_blks_hit > 0 || | 			Instrumentation *instrument = &w->instrument[n]; | ||||||
| 									  usage->shared_blks_read > 0 || | 			double		nloops = instrument->nloops; | ||||||
| 									  usage->shared_blks_dirtied > 0 || | 			double		startup_sec; | ||||||
| 									  usage->shared_blks_written > 0); | 			double		total_sec; | ||||||
| 			bool		has_local = (usage->local_blks_hit > 0 || | 			double		rows; | ||||||
| 									 usage->local_blks_read > 0 || |  | ||||||
| 									 usage->local_blks_dirtied > 0 || |  | ||||||
| 									 usage->local_blks_written > 0); |  | ||||||
| 			bool		has_temp = (usage->temp_blks_read > 0 || |  | ||||||
| 									usage->temp_blks_written > 0); |  | ||||||
| 			bool		has_timing = (!INSTR_TIME_IS_ZERO(usage->blk_read_time) || |  | ||||||
| 								 !INSTR_TIME_IS_ZERO(usage->blk_write_time)); |  | ||||||
|  |  | ||||||
| 			/* Show only positive counter values. */ | 			if (nloops <= 0) | ||||||
| 			if (has_shared || has_local || has_temp) | 				continue; | ||||||
|  | 			startup_sec = 1000.0 * instrument->startup / nloops; | ||||||
|  | 			total_sec = 1000.0 * instrument->total / nloops; | ||||||
|  | 			rows = instrument->ntuples / nloops; | ||||||
|  |  | ||||||
|  | 			if (es->format == EXPLAIN_FORMAT_TEXT) | ||||||
| 			{ | 			{ | ||||||
| 				appendStringInfoSpaces(es->str, es->indent * 2); | 				appendStringInfoSpaces(es->str, es->indent * 2); | ||||||
| 				appendStringInfoString(es->str, "Buffers:"); | 				appendStringInfo(es->str, "Worker %d: ", n); | ||||||
|  | 				if (es->timing) | ||||||
| 				if (has_shared) | 					appendStringInfo(es->str, | ||||||
| 				{ | 							"actual time=%.3f..%.3f rows=%.0f loops=%.0f\n", | ||||||
| 					appendStringInfoString(es->str, " shared"); | 								 startup_sec, total_sec, rows, nloops); | ||||||
| 					if (usage->shared_blks_hit > 0) | 				else | ||||||
| 						appendStringInfo(es->str, " hit=%ld", | 					appendStringInfo(es->str, | ||||||
| 										 usage->shared_blks_hit); | 									 "actual rows=%.0f loops=%.0f\n", | ||||||
| 					if (usage->shared_blks_read > 0) | 									 rows, nloops); | ||||||
| 						appendStringInfo(es->str, " read=%ld", | 				es->indent++; | ||||||
| 										 usage->shared_blks_read); | 				if (es->buffers) | ||||||
| 					if (usage->shared_blks_dirtied > 0) | 					show_buffer_usage(es, &instrument->bufusage); | ||||||
| 						appendStringInfo(es->str, " dirtied=%ld", | 				es->indent--; | ||||||
| 										 usage->shared_blks_dirtied); |  | ||||||
| 					if (usage->shared_blks_written > 0) |  | ||||||
| 						appendStringInfo(es->str, " written=%ld", |  | ||||||
| 										 usage->shared_blks_written); |  | ||||||
| 					if (has_local || has_temp) |  | ||||||
| 						appendStringInfoChar(es->str, ','); |  | ||||||
| 				} |  | ||||||
| 				if (has_local) |  | ||||||
| 				{ |  | ||||||
| 					appendStringInfoString(es->str, " local"); |  | ||||||
| 					if (usage->local_blks_hit > 0) |  | ||||||
| 						appendStringInfo(es->str, " hit=%ld", |  | ||||||
| 										 usage->local_blks_hit); |  | ||||||
| 					if (usage->local_blks_read > 0) |  | ||||||
| 						appendStringInfo(es->str, " read=%ld", |  | ||||||
| 										 usage->local_blks_read); |  | ||||||
| 					if (usage->local_blks_dirtied > 0) |  | ||||||
| 						appendStringInfo(es->str, " dirtied=%ld", |  | ||||||
| 										 usage->local_blks_dirtied); |  | ||||||
| 					if (usage->local_blks_written > 0) |  | ||||||
| 						appendStringInfo(es->str, " written=%ld", |  | ||||||
| 										 usage->local_blks_written); |  | ||||||
| 					if (has_temp) |  | ||||||
| 						appendStringInfoChar(es->str, ','); |  | ||||||
| 				} |  | ||||||
| 				if (has_temp) |  | ||||||
| 				{ |  | ||||||
| 					appendStringInfoString(es->str, " temp"); |  | ||||||
| 					if (usage->temp_blks_read > 0) |  | ||||||
| 						appendStringInfo(es->str, " read=%ld", |  | ||||||
| 										 usage->temp_blks_read); |  | ||||||
| 					if (usage->temp_blks_written > 0) |  | ||||||
| 						appendStringInfo(es->str, " written=%ld", |  | ||||||
| 										 usage->temp_blks_written); |  | ||||||
| 				} |  | ||||||
| 				appendStringInfoChar(es->str, '\n'); |  | ||||||
| 			} | 			} | ||||||
|  | 			else | ||||||
| 			/* As above, show only positive counter values. */ |  | ||||||
| 			if (has_timing) |  | ||||||
| 			{ | 			{ | ||||||
| 				appendStringInfoSpaces(es->str, es->indent * 2); | 				if (!opened_group) | ||||||
| 				appendStringInfoString(es->str, "I/O Timings:"); | 				{ | ||||||
| 				if (!INSTR_TIME_IS_ZERO(usage->blk_read_time)) | 					ExplainOpenGroup("Workers", "Workers", false, es); | ||||||
| 					appendStringInfo(es->str, " read=%0.3f", | 					opened_group = true; | ||||||
| 							  INSTR_TIME_GET_MILLISEC(usage->blk_read_time)); | 				} | ||||||
| 				if (!INSTR_TIME_IS_ZERO(usage->blk_write_time)) | 				ExplainOpenGroup("Worker", NULL, true, es); | ||||||
| 					appendStringInfo(es->str, " write=%0.3f", | 				ExplainPropertyInteger("Worker Number", n, es); | ||||||
| 							 INSTR_TIME_GET_MILLISEC(usage->blk_write_time)); |  | ||||||
| 				appendStringInfoChar(es->str, '\n'); | 				if (es->timing) | ||||||
|  | 				{ | ||||||
|  | 					ExplainPropertyFloat("Actual Startup Time", startup_sec, 3, es); | ||||||
|  | 					ExplainPropertyFloat("Actual Total Time", total_sec, 3, es); | ||||||
|  | 				} | ||||||
|  | 				ExplainPropertyFloat("Actual Rows", rows, 0, es); | ||||||
|  | 				ExplainPropertyFloat("Actual Loops", nloops, 0, es); | ||||||
|  |  | ||||||
|  | 				if (es->buffers) | ||||||
|  | 					show_buffer_usage(es, &instrument->bufusage); | ||||||
|  |  | ||||||
|  | 				ExplainCloseGroup("Worker", NULL, true, es); | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 		else |  | ||||||
| 		{ | 		if (opened_group) | ||||||
| 			ExplainPropertyLong("Shared Hit Blocks", usage->shared_blks_hit, es); | 			ExplainCloseGroup("Workers", "Workers", false, es); | ||||||
| 			ExplainPropertyLong("Shared Read Blocks", usage->shared_blks_read, es); |  | ||||||
| 			ExplainPropertyLong("Shared Dirtied Blocks", usage->shared_blks_dirtied, es); |  | ||||||
| 			ExplainPropertyLong("Shared Written Blocks", usage->shared_blks_written, es); |  | ||||||
| 			ExplainPropertyLong("Local Hit Blocks", usage->local_blks_hit, es); |  | ||||||
| 			ExplainPropertyLong("Local Read Blocks", usage->local_blks_read, es); |  | ||||||
| 			ExplainPropertyLong("Local Dirtied Blocks", usage->local_blks_dirtied, es); |  | ||||||
| 			ExplainPropertyLong("Local Written Blocks", usage->local_blks_written, es); |  | ||||||
| 			ExplainPropertyLong("Temp Read Blocks", usage->temp_blks_read, es); |  | ||||||
| 			ExplainPropertyLong("Temp Written Blocks", usage->temp_blks_written, es); |  | ||||||
| 			ExplainPropertyFloat("I/O Read Time", INSTR_TIME_GET_MILLISEC(usage->blk_read_time), 3, es); |  | ||||||
| 			ExplainPropertyFloat("I/O Write Time", INSTR_TIME_GET_MILLISEC(usage->blk_write_time), 3, es); |  | ||||||
| 		} |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	/* Get ready to display the child plans */ | 	/* Get ready to display the child plans */ | ||||||
| @@ -2276,6 +2242,113 @@ explain_get_index_name(Oid indexId) | |||||||
| 	return result; | 	return result; | ||||||
| } | } | ||||||
|  |  | ||||||
|  | /* | ||||||
|  |  * Show buffer usage details. | ||||||
|  |  */ | ||||||
|  | static void | ||||||
|  | show_buffer_usage(ExplainState *es, const BufferUsage *usage) | ||||||
|  | { | ||||||
|  | 	if (es->format == EXPLAIN_FORMAT_TEXT) | ||||||
|  | 	{ | ||||||
|  | 		bool		has_shared = (usage->shared_blks_hit > 0 || | ||||||
|  | 								  usage->shared_blks_read > 0 || | ||||||
|  | 								  usage->shared_blks_dirtied > 0 || | ||||||
|  | 								  usage->shared_blks_written > 0); | ||||||
|  | 		bool		has_local = (usage->local_blks_hit > 0 || | ||||||
|  | 								 usage->local_blks_read > 0 || | ||||||
|  | 								 usage->local_blks_dirtied > 0 || | ||||||
|  | 								 usage->local_blks_written > 0); | ||||||
|  | 		bool		has_temp = (usage->temp_blks_read > 0 || | ||||||
|  | 								usage->temp_blks_written > 0); | ||||||
|  | 		bool		has_timing = (!INSTR_TIME_IS_ZERO(usage->blk_read_time) || | ||||||
|  | 								 !INSTR_TIME_IS_ZERO(usage->blk_write_time)); | ||||||
|  |  | ||||||
|  | 		/* Show only positive counter values. */ | ||||||
|  | 		if (has_shared || has_local || has_temp) | ||||||
|  | 		{ | ||||||
|  | 			appendStringInfoSpaces(es->str, es->indent * 2); | ||||||
|  | 			appendStringInfoString(es->str, "Buffers:"); | ||||||
|  |  | ||||||
|  | 			if (has_shared) | ||||||
|  | 			{ | ||||||
|  | 				appendStringInfoString(es->str, " shared"); | ||||||
|  | 				if (usage->shared_blks_hit > 0) | ||||||
|  | 					appendStringInfo(es->str, " hit=%ld", | ||||||
|  | 									 usage->shared_blks_hit); | ||||||
|  | 				if (usage->shared_blks_read > 0) | ||||||
|  | 					appendStringInfo(es->str, " read=%ld", | ||||||
|  | 									 usage->shared_blks_read); | ||||||
|  | 				if (usage->shared_blks_dirtied > 0) | ||||||
|  | 					appendStringInfo(es->str, " dirtied=%ld", | ||||||
|  | 									 usage->shared_blks_dirtied); | ||||||
|  | 				if (usage->shared_blks_written > 0) | ||||||
|  | 					appendStringInfo(es->str, " written=%ld", | ||||||
|  | 									 usage->shared_blks_written); | ||||||
|  | 				if (has_local || has_temp) | ||||||
|  | 					appendStringInfoChar(es->str, ','); | ||||||
|  | 			} | ||||||
|  | 			if (has_local) | ||||||
|  | 			{ | ||||||
|  | 				appendStringInfoString(es->str, " local"); | ||||||
|  | 				if (usage->local_blks_hit > 0) | ||||||
|  | 					appendStringInfo(es->str, " hit=%ld", | ||||||
|  | 									 usage->local_blks_hit); | ||||||
|  | 				if (usage->local_blks_read > 0) | ||||||
|  | 					appendStringInfo(es->str, " read=%ld", | ||||||
|  | 									 usage->local_blks_read); | ||||||
|  | 				if (usage->local_blks_dirtied > 0) | ||||||
|  | 					appendStringInfo(es->str, " dirtied=%ld", | ||||||
|  | 									 usage->local_blks_dirtied); | ||||||
|  | 				if (usage->local_blks_written > 0) | ||||||
|  | 					appendStringInfo(es->str, " written=%ld", | ||||||
|  | 									 usage->local_blks_written); | ||||||
|  | 				if (has_temp) | ||||||
|  | 					appendStringInfoChar(es->str, ','); | ||||||
|  | 			} | ||||||
|  | 			if (has_temp) | ||||||
|  | 			{ | ||||||
|  | 				appendStringInfoString(es->str, " temp"); | ||||||
|  | 				if (usage->temp_blks_read > 0) | ||||||
|  | 					appendStringInfo(es->str, " read=%ld", | ||||||
|  | 									 usage->temp_blks_read); | ||||||
|  | 				if (usage->temp_blks_written > 0) | ||||||
|  | 					appendStringInfo(es->str, " written=%ld", | ||||||
|  | 									 usage->temp_blks_written); | ||||||
|  | 			} | ||||||
|  | 			appendStringInfoChar(es->str, '\n'); | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		/* As above, show only positive counter values. */ | ||||||
|  | 		if (has_timing) | ||||||
|  | 		{ | ||||||
|  | 			appendStringInfoSpaces(es->str, es->indent * 2); | ||||||
|  | 			appendStringInfoString(es->str, "I/O Timings:"); | ||||||
|  | 			if (!INSTR_TIME_IS_ZERO(usage->blk_read_time)) | ||||||
|  | 				appendStringInfo(es->str, " read=%0.3f", | ||||||
|  | 						  INSTR_TIME_GET_MILLISEC(usage->blk_read_time)); | ||||||
|  | 			if (!INSTR_TIME_IS_ZERO(usage->blk_write_time)) | ||||||
|  | 				appendStringInfo(es->str, " write=%0.3f", | ||||||
|  | 						 INSTR_TIME_GET_MILLISEC(usage->blk_write_time)); | ||||||
|  | 			appendStringInfoChar(es->str, '\n'); | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	else | ||||||
|  | 	{ | ||||||
|  | 		ExplainPropertyLong("Shared Hit Blocks", usage->shared_blks_hit, es); | ||||||
|  | 		ExplainPropertyLong("Shared Read Blocks", usage->shared_blks_read, es); | ||||||
|  | 		ExplainPropertyLong("Shared Dirtied Blocks", usage->shared_blks_dirtied, es); | ||||||
|  | 		ExplainPropertyLong("Shared Written Blocks", usage->shared_blks_written, es); | ||||||
|  | 		ExplainPropertyLong("Local Hit Blocks", usage->local_blks_hit, es); | ||||||
|  | 		ExplainPropertyLong("Local Read Blocks", usage->local_blks_read, es); | ||||||
|  | 		ExplainPropertyLong("Local Dirtied Blocks", usage->local_blks_dirtied, es); | ||||||
|  | 		ExplainPropertyLong("Local Written Blocks", usage->local_blks_written, es); | ||||||
|  | 		ExplainPropertyLong("Temp Read Blocks", usage->temp_blks_read, es); | ||||||
|  | 		ExplainPropertyLong("Temp Written Blocks", usage->temp_blks_written, es); | ||||||
|  | 		ExplainPropertyFloat("I/O Read Time", INSTR_TIME_GET_MILLISEC(usage->blk_read_time), 3, es); | ||||||
|  | 		ExplainPropertyFloat("I/O Write Time", INSTR_TIME_GET_MILLISEC(usage->blk_write_time), 3, es); | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| /* | /* | ||||||
|  * Add some additional details about an IndexScan or IndexOnlyScan |  * Add some additional details about an IndexScan or IndexOnlyScan | ||||||
|  */ |  */ | ||||||
|   | |||||||
| @@ -48,21 +48,19 @@ | |||||||
|  |  | ||||||
| #define PARALLEL_TUPLE_QUEUE_SIZE		65536 | #define PARALLEL_TUPLE_QUEUE_SIZE		65536 | ||||||
|  |  | ||||||
| /* DSM structure for accumulating per-PlanState instrumentation. */ |  | ||||||
| typedef struct SharedPlanStateInstrumentation |  | ||||||
| { |  | ||||||
| 	int plan_node_id; |  | ||||||
| 	slock_t mutex; |  | ||||||
| 	Instrumentation	instr; |  | ||||||
| } SharedPlanStateInstrumentation; |  | ||||||
|  |  | ||||||
| /* DSM structure for accumulating per-PlanState instrumentation. */ | /* DSM structure for accumulating per-PlanState instrumentation. */ | ||||||
| struct SharedExecutorInstrumentation | struct SharedExecutorInstrumentation | ||||||
| { | { | ||||||
| 	int instrument_options; | 	int instrument_options; | ||||||
| 	int ps_ninstrument;			/* # of ps_instrument structures following */ | 	int instrument_offset;		/* offset of first Instrumentation struct */ | ||||||
| 	SharedPlanStateInstrumentation ps_instrument[FLEXIBLE_ARRAY_MEMBER]; | 	int num_workers;							/* # of workers */ | ||||||
|  | 	int num_plan_nodes;							/* # of plan nodes */ | ||||||
|  | 	int plan_node_id[FLEXIBLE_ARRAY_MEMBER];	/* array of plan node IDs */ | ||||||
|  | 	/* array of num_plan_nodes * num_workers Instrumentation objects follows */ | ||||||
| }; | }; | ||||||
|  | #define GetInstrumentationArray(sei) \ | ||||||
|  | 	(AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \ | ||||||
|  | 	 (Instrumentation *) (((char *) sei) + sei->instrument_offset)) | ||||||
|  |  | ||||||
| /* Context object for ExecParallelEstimate. */ | /* Context object for ExecParallelEstimate. */ | ||||||
| typedef struct ExecParallelEstimateContext | typedef struct ExecParallelEstimateContext | ||||||
| @@ -196,18 +194,10 @@ ExecParallelInitializeDSM(PlanState *planstate, | |||||||
| 	if (planstate == NULL) | 	if (planstate == NULL) | ||||||
| 		return false; | 		return false; | ||||||
|  |  | ||||||
| 	/* If instrumentation is enabled, initialize array slot for this node. */ | 	/* If instrumentation is enabled, initialize slot for this node. */ | ||||||
| 	if (d->instrumentation != NULL) | 	if (d->instrumentation != NULL) | ||||||
| 	{ | 		d->instrumentation->plan_node_id[d->nnodes] = | ||||||
| 		SharedPlanStateInstrumentation *instrumentation; | 			planstate->plan->plan_node_id; | ||||||
|  |  | ||||||
| 		instrumentation = &d->instrumentation->ps_instrument[d->nnodes]; |  | ||||||
| 		Assert(d->nnodes < d->instrumentation->ps_ninstrument); |  | ||||||
| 		instrumentation->plan_node_id = planstate->plan->plan_node_id; |  | ||||||
| 		SpinLockInit(&instrumentation->mutex); |  | ||||||
| 		InstrInit(&instrumentation->instr, |  | ||||||
| 				  d->instrumentation->instrument_options); |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	/* Count this node. */ | 	/* Count this node. */ | ||||||
| 	d->nnodes++; | 	d->nnodes++; | ||||||
| @@ -307,6 +297,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) | |||||||
| 	int			pstmt_len; | 	int			pstmt_len; | ||||||
| 	int			param_len; | 	int			param_len; | ||||||
| 	int			instrumentation_len = 0; | 	int			instrumentation_len = 0; | ||||||
|  | 	int			instrument_offset = 0; | ||||||
|  |  | ||||||
| 	/* Allocate object for return value. */ | 	/* Allocate object for return value. */ | ||||||
| 	pei = palloc0(sizeof(ParallelExecutorInfo)); | 	pei = palloc0(sizeof(ParallelExecutorInfo)); | ||||||
| @@ -364,8 +355,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) | |||||||
| 	if (estate->es_instrument) | 	if (estate->es_instrument) | ||||||
| 	{ | 	{ | ||||||
| 		instrumentation_len = | 		instrumentation_len = | ||||||
| 			offsetof(SharedExecutorInstrumentation, ps_instrument) | 			offsetof(SharedExecutorInstrumentation, plan_node_id) | ||||||
| 			+ sizeof(SharedPlanStateInstrumentation) * e.nnodes; | 			+ sizeof(int) * e.nnodes; | ||||||
|  | 		instrumentation_len = MAXALIGN(instrumentation_len); | ||||||
|  | 		instrument_offset = instrumentation_len; | ||||||
|  | 		instrumentation_len += sizeof(Instrumentation) * e.nnodes * nworkers; | ||||||
| 		shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len); | 		shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len); | ||||||
| 		shm_toc_estimate_keys(&pcxt->estimator, 1); | 		shm_toc_estimate_keys(&pcxt->estimator, 1); | ||||||
| 	} | 	} | ||||||
| @@ -407,9 +401,17 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) | |||||||
| 	 */ | 	 */ | ||||||
| 	if (estate->es_instrument) | 	if (estate->es_instrument) | ||||||
| 	{ | 	{ | ||||||
|  | 		Instrumentation *instrument; | ||||||
|  | 		int		i; | ||||||
|  |  | ||||||
| 		instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len); | 		instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len); | ||||||
| 		instrumentation->instrument_options = estate->es_instrument; | 		instrumentation->instrument_options = estate->es_instrument; | ||||||
| 		instrumentation->ps_ninstrument = e.nnodes; | 		instrumentation->instrument_offset = instrument_offset; | ||||||
|  | 		instrumentation->num_workers = nworkers; | ||||||
|  | 		instrumentation->num_plan_nodes = e.nnodes; | ||||||
|  | 		instrument = GetInstrumentationArray(instrumentation); | ||||||
|  | 		for (i = 0; i < nworkers * e.nnodes; ++i) | ||||||
|  | 			InstrInit(&instrument[i], estate->es_instrument); | ||||||
| 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, | 		shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, | ||||||
| 					   instrumentation); | 					   instrumentation); | ||||||
| 		pei->instrumentation = instrumentation; | 		pei->instrumentation = instrumentation; | ||||||
| @@ -444,20 +446,31 @@ static bool | |||||||
| ExecParallelRetrieveInstrumentation(PlanState *planstate, | ExecParallelRetrieveInstrumentation(PlanState *planstate, | ||||||
| 						  SharedExecutorInstrumentation *instrumentation) | 						  SharedExecutorInstrumentation *instrumentation) | ||||||
| { | { | ||||||
|  | 	Instrumentation *instrument; | ||||||
| 	int		i; | 	int		i; | ||||||
|  | 	int		n; | ||||||
|  | 	int		ibytes; | ||||||
| 	int		plan_node_id = planstate->plan->plan_node_id; | 	int		plan_node_id = planstate->plan->plan_node_id; | ||||||
| 	SharedPlanStateInstrumentation *ps_instrument; |  | ||||||
|  |  | ||||||
| 	/* Find the instumentation for this node. */ | 	/* Find the instumentation for this node. */ | ||||||
| 	for (i = 0; i < instrumentation->ps_ninstrument; ++i) | 	for (i = 0; i < instrumentation->num_plan_nodes; ++i) | ||||||
| 		if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id) | 		if (instrumentation->plan_node_id[i] == plan_node_id) | ||||||
| 			break; | 			break; | ||||||
| 	if (i >= instrumentation->ps_ninstrument) | 	if (i >= instrumentation->num_plan_nodes) | ||||||
| 		elog(ERROR, "plan node %d not found", plan_node_id); | 		elog(ERROR, "plan node %d not found", plan_node_id); | ||||||
|  |  | ||||||
| 	/* No need to acquire the spinlock here; workers have exited already. */ | 	/* Accumulate the statistics from all workers. */ | ||||||
| 	ps_instrument = &instrumentation->ps_instrument[i]; | 	instrument = GetInstrumentationArray(instrumentation); | ||||||
| 	InstrAggNode(planstate->instrument, &ps_instrument->instr); | 	instrument += i * instrumentation->num_workers; | ||||||
|  | 	for (n = 0; n < instrumentation->num_workers; ++n) | ||||||
|  | 		InstrAggNode(planstate->instrument, &instrument[n]); | ||||||
|  |  | ||||||
|  | 	/* Also store the per-worker detail. */ | ||||||
|  | 	ibytes = instrumentation->num_workers * sizeof(Instrumentation); | ||||||
|  | 	planstate->worker_instrument = | ||||||
|  | 		palloc(offsetof(WorkerInstrumentation, instrument) + ibytes); | ||||||
|  | 	planstate->worker_instrument->num_workers = instrumentation->num_workers; | ||||||
|  | 	memcpy(&planstate->worker_instrument->instrument, instrument, ibytes); | ||||||
|  |  | ||||||
| 	return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation, | 	return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation, | ||||||
| 								 instrumentation); | 								 instrumentation); | ||||||
| @@ -568,7 +581,9 @@ ExecParallelReportInstrumentation(PlanState *planstate, | |||||||
| { | { | ||||||
| 	int		i; | 	int		i; | ||||||
| 	int		plan_node_id = planstate->plan->plan_node_id; | 	int		plan_node_id = planstate->plan->plan_node_id; | ||||||
| 	SharedPlanStateInstrumentation *ps_instrument; | 	Instrumentation *instrument; | ||||||
|  |  | ||||||
|  | 	InstrEndLoop(planstate->instrument); | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * If we shuffled the plan_node_id values in ps_instrument into sorted | 	 * If we shuffled the plan_node_id values in ps_instrument into sorted | ||||||
| @@ -576,20 +591,21 @@ ExecParallelReportInstrumentation(PlanState *planstate, | |||||||
| 	 * if we're pushing down sufficiently large plan trees.  For now, do it | 	 * if we're pushing down sufficiently large plan trees.  For now, do it | ||||||
| 	 * the slow, dumb way. | 	 * the slow, dumb way. | ||||||
| 	 */ | 	 */ | ||||||
| 	for (i = 0; i < instrumentation->ps_ninstrument; ++i) | 	for (i = 0; i < instrumentation->num_plan_nodes; ++i) | ||||||
| 		if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id) | 		if (instrumentation->plan_node_id[i] == plan_node_id) | ||||||
| 			break; | 			break; | ||||||
| 	if (i >= instrumentation->ps_ninstrument) | 	if (i >= instrumentation->num_plan_nodes) | ||||||
| 		elog(ERROR, "plan node %d not found", plan_node_id); | 		elog(ERROR, "plan node %d not found", plan_node_id); | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * There's one SharedPlanStateInstrumentation per plan_node_id, so we | 	 * Add our statistics to the per-node, per-worker totals.  It's possible | ||||||
| 	 * must use a spinlock in case multiple workers report at the same time. | 	 * that this could happen more than once if we relaunched workers. | ||||||
| 	 */ | 	 */ | ||||||
| 	ps_instrument = &instrumentation->ps_instrument[i]; | 	instrument = GetInstrumentationArray(instrumentation); | ||||||
| 	SpinLockAcquire(&ps_instrument->mutex); | 	instrument += i * instrumentation->num_workers; | ||||||
| 	InstrAggNode(&ps_instrument->instr, planstate->instrument); | 	Assert(IsParallelWorker()); | ||||||
| 	SpinLockRelease(&ps_instrument->mutex); | 	Assert(ParallelWorkerNumber < instrumentation->num_workers); | ||||||
|  | 	InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument); | ||||||
|  |  | ||||||
| 	return planstate_tree_walker(planstate, ExecParallelReportInstrumentation, | 	return planstate_tree_walker(planstate, ExecParallelReportInstrumentation, | ||||||
| 								 instrumentation); | 								 instrumentation); | ||||||
|   | |||||||
| @@ -63,6 +63,12 @@ typedef struct Instrumentation | |||||||
| 	BufferUsage bufusage;		/* Total buffer usage */ | 	BufferUsage bufusage;		/* Total buffer usage */ | ||||||
| } Instrumentation; | } Instrumentation; | ||||||
|  |  | ||||||
|  | typedef struct WorkerInstrumentation | ||||||
|  | { | ||||||
|  | 	int			num_workers;	/* # of structures that follow */ | ||||||
|  | 	Instrumentation instrument[FLEXIBLE_ARRAY_MEMBER]; | ||||||
|  | } WorkerInstrumentation; | ||||||
|  |  | ||||||
| extern PGDLLIMPORT BufferUsage pgBufferUsage; | extern PGDLLIMPORT BufferUsage pgBufferUsage; | ||||||
|  |  | ||||||
| extern Instrumentation *InstrAlloc(int n, int instrument_options); | extern Instrumentation *InstrAlloc(int n, int instrument_options); | ||||||
|   | |||||||
| @@ -1029,6 +1029,7 @@ typedef struct PlanState | |||||||
| 								 * top-level plan */ | 								 * top-level plan */ | ||||||
|  |  | ||||||
| 	Instrumentation *instrument;	/* Optional runtime stats for this node */ | 	Instrumentation *instrument;	/* Optional runtime stats for this node */ | ||||||
|  | 	WorkerInstrumentation *worker_instrument; /* per-worker instrumentation */ | ||||||
|  |  | ||||||
| 	/* | 	/* | ||||||
| 	 * Common structural data for all Plan types.  These links to subsidiary | 	 * Common structural data for all Plan types.  These links to subsidiary | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user