1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-28 23:42:10 +03:00

Make sequential scans parallel-aware.

In addition, this patch fills in a number of missing bits and pieces in
the parallel infrastructure.  Paths and plans now have a parallel_aware
flag indicating whether whatever parallel-aware logic they have should
be engaged.  It is believed that we will need this flag for a number of
path/plan types, not just sequential scans, which is why the flag is
generic rather than part of the SeqScan structures specifically.
Also, execParallel.c now gives parallel nodes a chance to initialize
their PlanState nodes from the DSM during parallel worker startup.

Amit Kapila, with a fair amount of adjustment by me.  Review of previous
patch versions by Haribabu Kommi and others.
This commit is contained in:
Robert Haas
2015-11-11 08:57:52 -05:00
parent f764ecd81b
commit f0661c4e8c
18 changed files with 254 additions and 73 deletions

View File

@ -439,6 +439,15 @@ ExecSupportsBackwardScan(Plan *node)
if (node == NULL)
return false;
/*
* Parallel-aware nodes return a subset of the tuples in each worker,
* and in general we can't expect to have enough bookkeeping state to
* know which ones we returned in this worker as opposed to some other
* worker.
*/
if (node->parallel_aware)
return false;
switch (nodeTag(node))
{
case T_Result:

View File

@ -25,6 +25,7 @@
#include "executor/execParallel.h"
#include "executor/executor.h"
#include "executor/nodeSeqscan.h"
#include "executor/tqueue.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/planmain.h"
@ -167,10 +168,16 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
/* Count this node. */
e->nnodes++;
/*
* XXX. Call estimators for parallel-aware nodes here, when we have
* some.
*/
/* Call estimators for parallel-aware nodes. */
switch (nodeTag(planstate))
{
case T_SeqScanState:
ExecSeqScanEstimate((SeqScanState *) planstate,
e->pcxt);
break;
default:
break;
}
return planstate_tree_walker(planstate, ExecParallelEstimate, e);
}
@ -205,10 +212,16 @@ ExecParallelInitializeDSM(PlanState *planstate,
/* Count this node. */
d->nnodes++;
/*
* XXX. Call initializers for parallel-aware plan nodes, when we have
* some.
*/
/* Call initializers for parallel-aware plan nodes. */
switch (nodeTag(planstate))
{
case T_SeqScanState:
ExecSeqScanInitializeDSM((SeqScanState *) planstate,
d->pcxt);
break;
default:
break;
}
return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
}
@ -574,6 +587,30 @@ ExecParallelReportInstrumentation(PlanState *planstate,
instrumentation);
}
/*
 * Walk a worker's PlanState tree and give every parallel-aware node the
 * chance to pick up its state from shared memory.  The PlanState nodes
 * must already exist, so this can only run after ExecutorStart() has
 * allocated and initialized them.
 */
static bool
ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
{
	/* Nothing to initialize for an empty subtree. */
	if (planstate == NULL)
		return false;

	/* Sequential scans are the only node type handled here so far. */
	if (nodeTag(planstate) == T_SeqScanState)
		ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);

	/* Continue with the descendants, using this function as the walker. */
	return planstate_tree_walker(planstate, ExecParallelInitializeWorker, toc);
}
/*
* Main entrypoint for parallel query worker processes.
*
@ -610,6 +647,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
/* Start up the executor, have it run the plan, and then shut it down. */
ExecutorStart(queryDesc, 0);
ExecParallelInitializeWorker(queryDesc->planstate, toc);
ExecutorRun(queryDesc, ForwardScanDirection, 0L);
ExecutorFinish(queryDesc);

View File

@ -19,6 +19,10 @@
* ExecInitSeqScan creates and initializes a seqscan node.
* ExecEndSeqScan releases any storage allocated.
* ExecReScanSeqScan rescans the relation
*
* ExecSeqScanEstimate estimates DSM space needed for parallel scan
* ExecSeqScanInitializeDSM initializes DSM for parallel scan
* ExecSeqScanInitializeWorker attaches to DSM info in parallel worker
*/
#include "postgres.h"
@ -53,10 +57,22 @@ SeqNext(SeqScanState *node)
/*
* get information from the estate and scan state
*/
scandesc = node->ss_currentScanDesc;
estate = node->ps.state;
scandesc = node->ss.ss_currentScanDesc;
estate = node->ss.ps.state;
direction = estate->es_direction;
slot = node->ss_ScanTupleSlot;
slot = node->ss.ss_ScanTupleSlot;
if (scandesc == NULL)
{
/*
* We reach here if the scan is not parallel, or if a scan that was
* planned as parallel is being executed serially.
*/
scandesc = heap_beginscan(node->ss.ss_currentRelation,
estate->es_snapshot,
0, NULL);
node->ss.ss_currentScanDesc = scandesc;
}
/*
* get the next tuple from the table
@ -123,27 +139,19 @@ static void
InitScanRelation(SeqScanState *node, EState *estate, int eflags)
{
Relation currentRelation;
HeapScanDesc currentScanDesc;
/*
* get the relation object id from the relid'th entry in the range table,
* open that relation and acquire appropriate lock on it.
*/
currentRelation = ExecOpenScanRelation(estate,
((SeqScan *) node->ps.plan)->scanrelid,
((SeqScan *) node->ss.ps.plan)->scanrelid,
eflags);
/* initialize a heapscan */
currentScanDesc = heap_beginscan(currentRelation,
estate->es_snapshot,
0,
NULL);
node->ss_currentRelation = currentRelation;
node->ss_currentScanDesc = currentScanDesc;
node->ss.ss_currentRelation = currentRelation;
/* and report the scan tuple slot's rowtype */
ExecAssignScanType(node, RelationGetDescr(currentRelation));
ExecAssignScanType(&node->ss, RelationGetDescr(currentRelation));
}
@ -167,44 +175,44 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
* create state structure
*/
scanstate = makeNode(SeqScanState);
scanstate->ps.plan = (Plan *) node;
scanstate->ps.state = estate;
scanstate->ss.ps.plan = (Plan *) node;
scanstate->ss.ps.state = estate;
/*
* Miscellaneous initialization
*
* create expression context for node
*/
ExecAssignExprContext(estate, &scanstate->ps);
ExecAssignExprContext(estate, &scanstate->ss.ps);
/*
* initialize child expressions
*/
scanstate->ps.targetlist = (List *)
scanstate->ss.ps.targetlist = (List *)
ExecInitExpr((Expr *) node->plan.targetlist,
(PlanState *) scanstate);
scanstate->ps.qual = (List *)
scanstate->ss.ps.qual = (List *)
ExecInitExpr((Expr *) node->plan.qual,
(PlanState *) scanstate);
/*
* tuple table initialization
*/
ExecInitResultTupleSlot(estate, &scanstate->ps);
ExecInitScanTupleSlot(estate, scanstate);
ExecInitResultTupleSlot(estate, &scanstate->ss.ps);
ExecInitScanTupleSlot(estate, &scanstate->ss);
/*
* initialize scan relation
*/
InitScanRelation(scanstate, estate, eflags);
scanstate->ps.ps_TupFromTlist = false;
scanstate->ss.ps.ps_TupFromTlist = false;
/*
* Initialize result tuple type and projection info.
*/
ExecAssignResultTypeFromTL(&scanstate->ps);
ExecAssignScanProjectionInfo(scanstate);
ExecAssignResultTypeFromTL(&scanstate->ss.ps);
ExecAssignScanProjectionInfo(&scanstate->ss);
return scanstate;
}
@ -224,24 +232,25 @@ ExecEndSeqScan(SeqScanState *node)
/*
* get information from node
*/
relation = node->ss_currentRelation;
scanDesc = node->ss_currentScanDesc;
relation = node->ss.ss_currentRelation;
scanDesc = node->ss.ss_currentScanDesc;
/*
* Free the exprcontext
*/
ExecFreeExprContext(&node->ps);
ExecFreeExprContext(&node->ss.ps);
/*
* clean out the tuple table
*/
ExecClearTuple(node->ps.ps_ResultTupleSlot);
ExecClearTuple(node->ss_ScanTupleSlot);
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
ExecClearTuple(node->ss.ss_ScanTupleSlot);
/*
* close heap scan
*/
heap_endscan(scanDesc);
if (scanDesc != NULL)
heap_endscan(scanDesc);
/*
* close the heap relation.
@ -265,10 +274,71 @@ ExecReScanSeqScan(SeqScanState *node)
{
HeapScanDesc scan;
scan = node->ss_currentScanDesc;
scan = node->ss.ss_currentScanDesc;
heap_rescan(scan, /* scan desc */
NULL); /* new scan keys */
if (scan != NULL)
heap_rescan(scan, /* scan desc */
NULL); /* new scan keys */
ExecScanReScan((ScanState *) node);
}
/* ----------------------------------------------------------------
* Parallel Scan Support
* ----------------------------------------------------------------
*/
/* ----------------------------------------------------------------
 *		ExecSeqScanEstimate
 *
 *		Work out how much shared memory the parallel heap scan
 *		descriptor needs for this node's snapshot, remember that size
 *		in node->pscan_len, and register one chunk of that size plus
 *		one key with the parallel context's estimator.
 * ----------------------------------------------------------------
 */
void
ExecSeqScanEstimate(SeqScanState *node,
					ParallelContext *pcxt)
{
	/* The size is kept on the node so the DSM setup step can reuse it. */
	node->pscan_len =
		heap_parallelscan_estimate(node->ss.ps.state->es_snapshot);

	shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);
}
/* ----------------------------------------------------------------
 *		ExecSeqScanInitializeDSM
 *
 *		Allocate the parallel heap scan descriptor in the parallel
 *		context's table of contents, initialize it for this node's
 *		relation and snapshot, publish it under the plan node's id,
 *		and start this node's scan on top of it.
 * ----------------------------------------------------------------
 */
void
ExecSeqScanInitializeDSM(SeqScanState *node,
						 ParallelContext *pcxt)
{
	ParallelHeapScanDesc shared_scan;

	/* Use the size previously recorded in node->pscan_len. */
	shared_scan = shm_toc_allocate(pcxt->toc, node->pscan_len);

	heap_parallelscan_initialize(shared_scan,
								 node->ss.ss_currentRelation,
								 node->ss.ps.state->es_snapshot);

	/* Key the descriptor by plan_node_id; the worker looks it up by the same id. */
	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, shared_scan);

	node->ss.ss_currentScanDesc =
		heap_beginscan_parallel(node->ss.ss_currentRelation, shared_scan);
}
/* ----------------------------------------------------------------
 *		ExecSeqScanInitializeWorker
 *
 *		Look up the parallel heap scan descriptor stored in the TOC
 *		under this plan node's id and begin the node's scan with it.
 * ----------------------------------------------------------------
 */
void
ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc)
{
	ParallelHeapScanDesc shared_scan =
		shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id);

	node->ss.ss_currentScanDesc =
		heap_beginscan_parallel(node->ss.ss_currentRelation, shared_scan);
}