1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-03 20:02:46 +03:00

Collect JIT instrumentation from workers.

Previously, when using parallel query, EXPLAIN (ANALYZE)'s JIT
compilation timings did not include the overhead from doing so on the
workers.  Fix that.

We do so by simply aggregating the cost of doing JIT compilation on
workers and the leader together. Arguably that's not quite accurate,
because the total time spend doing so is spent in parallel - but it's
hard to do much better.  For additional detail, when VERBOSE is
specified, the stats for workers are displayed separately.

Author: Amit Khandekar and Andres Freund
Discussion: https://postgr.es/m/CAJ3gD9eLrz51RK_gTkod+71iDcjpB_N8eC6vU2AW-VicsAERpQ@mail.gmail.com
Backpatch: 11-
This commit is contained in:
Andres Freund
2018-09-25 12:54:29 -07:00
parent 9590f7d6c6
commit e63441c3f5
11 changed files with 211 additions and 46 deletions

View File

@ -48,6 +48,7 @@
#include "executor/execdebug.h"
#include "executor/nodeSubplan.h"
#include "foreign/fdwapi.h"
#include "jit/jit.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "optimizer/clauses.h"
@ -494,6 +495,21 @@ standard_ExecutorEnd(QueryDesc *queryDesc)
ExecEndPlan(queryDesc->planstate, estate);
/*
* If this process has done JIT, either merge stats into worker stats, or
* use this process' stats as the global stats if no parallelism was used
* / no workers did JIT.
*/
if (estate->es_instrument && queryDesc->estate->es_jit)
{
if (queryDesc->estate->es_jit_combined_instr)
InstrJitAgg(queryDesc->estate->es_jit_combined_instr,
&queryDesc->estate->es_jit->instr);
else
queryDesc->estate->es_jit_combined_instr =
&queryDesc->estate->es_jit->instr;
}
/* do away with our snapshots */
UnregisterSnapshot(estate->es_snapshot);
UnregisterSnapshot(estate->es_crosscheck_snapshot);

View File

@ -37,6 +37,7 @@
#include "executor/nodeSort.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
#include "jit/jit.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/planmain.h"
#include "optimizer/planner.h"
@ -62,6 +63,7 @@
#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
#define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
@ -573,9 +575,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
char *paramlistinfo_space;
BufferUsage *bufusage_space;
SharedExecutorInstrumentation *instrumentation = NULL;
SharedJitInstrumentation *jit_instrumentation = NULL;
int pstmt_len;
int paramlistinfo_len;
int instrumentation_len = 0;
int jit_instrumentation_len = 0;
int instrument_offset = 0;
Size dsa_minsize = dsa_minimum_size();
char *query_string;
@ -669,6 +673,16 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
mul_size(e.nnodes, nworkers));
shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
/* Estimate space for JIT instrumentation, if required. */
if (estate->es_jit_flags != PGJIT_NONE)
{
jit_instrumentation_len =
offsetof(SharedJitInstrumentation, jit_instr) +
sizeof(JitInstrumentation) * nworkers;
shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
}
/* Estimate space for DSA area. */
@ -742,6 +756,18 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
instrumentation);
pei->instrumentation = instrumentation;
if (estate->es_jit_flags != PGJIT_NONE)
{
jit_instrumentation = shm_toc_allocate(pcxt->toc,
jit_instrumentation_len);
jit_instrumentation->num_workers = nworkers;
memset(jit_instrumentation->jit_instr, 0,
sizeof(JitInstrumentation) * nworkers);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
jit_instrumentation);
pei->jit_instrumentation = jit_instrumentation;
}
}
/*
@ -1003,6 +1029,46 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
instrumentation);
}
/*
* Add up the workers' JIT instrumentation from dynamic shared memory.
*/
static void
ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
SharedJitInstrumentation *shared_jit)
{
JitInstrumentation *combined;
int ibytes;
int n;
/*
* Accumulate worker JIT instrumentation into the combined JIT
* instrumentation, allocating it if required. Note this is kept separate
* from the leader's own instrumentation.
*/
if (!planstate->state->es_jit_combined_instr)
planstate->state->es_jit_combined_instr =
MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
combined = planstate->state->es_jit_combined_instr;
/* Accummulate all the workers' instrumentations. */
for (n = 0; n < shared_jit->num_workers; ++n)
InstrJitAgg(combined, &shared_jit->jit_instr[n]);
/*
* Store the per-worker detail.
*
* Similar to ExecParallelRetrieveInstrumentation(), allocate the
* instrumentation in per-query context.
*/
ibytes = offsetof(SharedJitInstrumentation, jit_instr)
+ mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
planstate->worker_jit_instrument =
MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
}
/*
* Finish parallel execution. We wait for parallel workers to finish, and
* accumulate their buffer usage.
@ -1068,6 +1134,11 @@ ExecParallelCleanup(ParallelExecutorInfo *pei)
ExecParallelRetrieveInstrumentation(pei->planstate,
pei->instrumentation);
/* Accumulate JIT instrumentation, if any. */
if (pei->jit_instrumentation)
ExecParallelRetrieveJitInstrumentation(pei->planstate,
pei->jit_instrumentation);
/* Free any serialized parameters. */
if (DsaPointerIsValid(pei->param_exec))
{
@ -1274,6 +1345,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
DestReceiver *receiver;
QueryDesc *queryDesc;
SharedExecutorInstrumentation *instrumentation;
SharedJitInstrumentation *jit_instrumentation;
int instrument_options = 0;
void *area_space;
dsa_area *area;
@ -1287,6 +1359,8 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
if (instrumentation != NULL)
instrument_options = instrumentation->instrument_options;
jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
true);
queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
/* Setting debug_query_string for individual workers */
@ -1350,6 +1424,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
ExecParallelReportInstrumentation(queryDesc->planstate,
instrumentation);
/* Report JIT instrumentation data if any */
if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
{
Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
jit_instrumentation->jit_instr[ParallelWorkerNumber] =
queryDesc->estate->es_jit->instr;
}
/* Must do this after capturing instrumentation. */
ExecutorEnd(queryDesc);