1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-03 20:02:46 +03:00

Propagate sort instrumentation from workers back to leader.

Up until now, when parallel query was used, no details about the
sort method or space used by the workers were available; details
were shown only for any sorting done by the leader.  Fix that.

Commit 1177ab1dab forced the test case
added by commit 1f6d515a67 to run
without parallelism; now that we have this infrastructure, allow
that again, with a little tweaking to make it pass with and without
force_parallel_mode.

Robert Haas and Tom Lane

Discussion: http://postgr.es/m/CA+Tgmoa2VBZW6S8AAXfhpHczb=Rf6RqQ2br+zJvEgwJ0uoD_tQ@mail.gmail.com
This commit is contained in:
Robert Haas
2017-08-29 13:22:49 -04:00
parent 3452dc5240
commit bf11e7ee2e
9 changed files with 342 additions and 87 deletions

View File

@ -28,9 +28,10 @@
#include "executor/nodeBitmapHeapscan.h"
#include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h"
#include "executor/nodeSeqscan.h"
#include "executor/nodeIndexscan.h"
#include "executor/nodeIndexonlyscan.h"
#include "executor/nodeSeqscan.h"
#include "executor/nodeSort.h"
#include "executor/tqueue.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/planmain.h"
@ -202,10 +203,10 @@ ExecSerializePlan(Plan *plan, EState *estate)
}
/*
* Ordinary plan nodes won't do anything here, but parallel-aware plan nodes
* may need some state which is shared across all parallel workers. Before
* we size the DSM, give them a chance to call shm_toc_estimate_chunk or
* shm_toc_estimate_keys on &pcxt->estimator.
* Parallel-aware plan nodes (and occasionally others) may need some state
* which is shared across all parallel workers. Before we size the DSM, give
* them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
* &pcxt->estimator.
*
* While we're at it, count the number of PlanState nodes in the tree, so
* we know how many SharedPlanStateInstrumentation structures we need.
@ -219,38 +220,43 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
/* Count this node. */
e->nnodes++;
/* Call estimators for parallel-aware nodes. */
if (planstate->plan->parallel_aware)
switch (nodeTag(planstate))
{
switch (nodeTag(planstate))
{
case T_SeqScanState:
case T_SeqScanState:
if (planstate->plan->parallel_aware)
ExecSeqScanEstimate((SeqScanState *) planstate,
e->pcxt);
break;
case T_IndexScanState:
break;
case T_IndexScanState:
if (planstate->plan->parallel_aware)
ExecIndexScanEstimate((IndexScanState *) planstate,
e->pcxt);
break;
case T_IndexOnlyScanState:
break;
case T_IndexOnlyScanState:
if (planstate->plan->parallel_aware)
ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
e->pcxt);
break;
case T_ForeignScanState:
break;
case T_ForeignScanState:
if (planstate->plan->parallel_aware)
ExecForeignScanEstimate((ForeignScanState *) planstate,
e->pcxt);
break;
case T_CustomScanState:
break;
case T_CustomScanState:
if (planstate->plan->parallel_aware)
ExecCustomScanEstimate((CustomScanState *) planstate,
e->pcxt);
break;
case T_BitmapHeapScanState:
break;
case T_BitmapHeapScanState:
if (planstate->plan->parallel_aware)
ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
e->pcxt);
break;
default:
break;
}
break;
case T_SortState:
/* even when not parallel-aware */
ExecSortEstimate((SortState *) planstate, e->pcxt);
default:
break;
}
return planstate_tree_walker(planstate, ExecParallelEstimate, e);
@ -276,46 +282,51 @@ ExecParallelInitializeDSM(PlanState *planstate,
d->nnodes++;
/*
* Call initializers for parallel-aware plan nodes.
* Call initializers for DSM-using plan nodes.
*
* Ordinary plan nodes won't do anything here, but parallel-aware plan
* nodes may need to initialize shared state in the DSM before parallel
* workers are available. They can allocate the space they previously
* Most plan nodes won't do anything here, but plan nodes that allocated
* DSM may need to initialize shared state in the DSM before parallel
* workers are launched. They can allocate the space they previously
* estimated using shm_toc_allocate, and add the keys they previously
* estimated using shm_toc_insert, in each case targeting pcxt->toc.
*/
if (planstate->plan->parallel_aware)
switch (nodeTag(planstate))
{
switch (nodeTag(planstate))
{
case T_SeqScanState:
case T_SeqScanState:
if (planstate->plan->parallel_aware)
ExecSeqScanInitializeDSM((SeqScanState *) planstate,
d->pcxt);
break;
case T_IndexScanState:
break;
case T_IndexScanState:
if (planstate->plan->parallel_aware)
ExecIndexScanInitializeDSM((IndexScanState *) planstate,
d->pcxt);
break;
case T_IndexOnlyScanState:
break;
case T_IndexOnlyScanState:
if (planstate->plan->parallel_aware)
ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
d->pcxt);
break;
case T_ForeignScanState:
break;
case T_ForeignScanState:
if (planstate->plan->parallel_aware)
ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
d->pcxt);
break;
case T_CustomScanState:
break;
case T_CustomScanState:
if (planstate->plan->parallel_aware)
ExecCustomScanInitializeDSM((CustomScanState *) planstate,
d->pcxt);
break;
case T_BitmapHeapScanState:
break;
case T_BitmapHeapScanState:
if (planstate->plan->parallel_aware)
ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
d->pcxt);
break;
default:
break;
}
break;
case T_SortState:
/* even when not parallel-aware */
ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
default:
break;
}
return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
@ -642,6 +653,13 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
planstate->worker_instrument->num_workers = instrumentation->num_workers;
memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
/*
* Perform any node-type-specific work that needs to be done. Currently,
* only Sort nodes need to do anything here.
*/
if (IsA(planstate, SortState))
ExecSortRetrieveInstrumentation((SortState *) planstate);
return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
instrumentation);
}
@ -801,35 +819,40 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
if (planstate == NULL)
return false;
/* Call initializers for parallel-aware plan nodes. */
if (planstate->plan->parallel_aware)
switch (nodeTag(planstate))
{
switch (nodeTag(planstate))
{
case T_SeqScanState:
case T_SeqScanState:
if (planstate->plan->parallel_aware)
ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
break;
case T_IndexScanState:
break;
case T_IndexScanState:
if (planstate->plan->parallel_aware)
ExecIndexScanInitializeWorker((IndexScanState *) planstate, toc);
break;
case T_IndexOnlyScanState:
break;
case T_IndexOnlyScanState:
if (planstate->plan->parallel_aware)
ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate, toc);
break;
case T_ForeignScanState:
break;
case T_ForeignScanState:
if (planstate->plan->parallel_aware)
ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
toc);
break;
case T_CustomScanState:
break;
case T_CustomScanState:
if (planstate->plan->parallel_aware)
ExecCustomScanInitializeWorker((CustomScanState *) planstate,
toc);
break;
case T_BitmapHeapScanState:
break;
case T_BitmapHeapScanState:
if (planstate->plan->parallel_aware)
ExecBitmapHeapInitializeWorker(
(BitmapHeapScanState *) planstate, toc);
break;
default:
break;
}
break;
case T_SortState:
/* even when not parallel-aware */
ExecSortInitializeWorker((SortState *) planstate, toc);
default:
break;
}
return planstate_tree_walker(planstate, ExecParallelInitializeWorker, toc);

View File

@ -15,6 +15,7 @@
#include "postgres.h"
#include "access/parallel.h"
#include "executor/execdebug.h"
#include "executor/nodeSort.h"
#include "miscadmin.h"
@ -127,6 +128,15 @@ ExecSort(PlanState *pstate)
node->sort_Done = true;
node->bounded_Done = node->bounded;
node->bound_Done = node->bound;
if (node->shared_info && node->am_worker)
{
TuplesortInstrumentation *si;
Assert(IsParallelWorker());
Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
si = &node->shared_info->sinstrument[ParallelWorkerNumber];
tuplesort_get_stats(tuplesortstate, si);
}
SO1_printf("ExecSort: %s\n", "sorting done");
}
@ -334,3 +344,90 @@ ExecReScanSort(SortState *node)
else
tuplesort_rescan((Tuplesortstate *) node->tuplesortstate);
}
/* ----------------------------------------------------------------
* Parallel Query Support
* ----------------------------------------------------------------
*/
/* ----------------------------------------------------------------
* ExecSortEstimate
*
* Estimate space required to propagate sort statistics.
* ----------------------------------------------------------------
*/
void
ExecSortEstimate(SortState *node, ParallelContext *pcxt)
{
Size size;
/* don't need this if not instrumenting or no workers */
if (!node->ss.ps.instrument || pcxt->nworkers == 0)
return;
size = mul_size(pcxt->nworkers, sizeof(TuplesortInstrumentation));
size = add_size(size, offsetof(SharedSortInfo, sinstrument));
shm_toc_estimate_chunk(&pcxt->estimator, size);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
/* ----------------------------------------------------------------
* ExecSortInitializeDSM
*
* Initialize DSM space for sort statistics.
* ----------------------------------------------------------------
*/
void
ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt)
{
Size size;
/* don't need this if not instrumenting or no workers */
if (!node->ss.ps.instrument || pcxt->nworkers == 0)
return;
size = offsetof(SharedSortInfo, sinstrument)
+ pcxt->nworkers * sizeof(TuplesortInstrumentation);
node->shared_info = shm_toc_allocate(pcxt->toc, size);
/* ensure any unfilled slots will contain zeroes */
memset(node->shared_info, 0, size);
node->shared_info->num_workers = pcxt->nworkers;
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
node->shared_info);
}
/* ----------------------------------------------------------------
* ExecSortInitializeWorker
*
* Attach worker to DSM space for sort statistics.
* ----------------------------------------------------------------
*/
void
ExecSortInitializeWorker(SortState *node, shm_toc *toc)
{
node->shared_info =
shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, true);
node->am_worker = true;
}
/* ----------------------------------------------------------------
* ExecSortRetrieveInstrumentation
*
* Transfer sort statistics from DSM to private memory.
* ----------------------------------------------------------------
*/
void
ExecSortRetrieveInstrumentation(SortState *node)
{
Size size;
SharedSortInfo *si;
if (node->shared_info == NULL)
return;
size = offsetof(SharedSortInfo, sinstrument)
+ node->shared_info->num_workers * sizeof(TuplesortInstrumentation);
si = palloc(size);
memcpy(si, node->shared_info, size);
node->shared_info = si;
}