
Add optimizer and executor support for parallel index scans.

In combination with commit 569174f1be, which
taught the btree AM how to perform parallel index scans, this allows
parallel index scan plans on btree indexes.  This infrastructure
should be general enough to support parallel index scans for other
index AMs as well, if someone updates them to support parallel
scans.

Amit Kapila, reviewed and tested by Anastasia Lubennikova, Tushar
Ahuja, and Haribabu Kommi, and me.
Robert Haas
2017-02-15 13:53:24 -05:00
parent 51ee6f3160
commit 5262f7a4fc
29 changed files with 366 additions and 55 deletions
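The claim that this infrastructure is general rests on the index AM API extended by 569174f1be: an AM opts into parallel index scans by filling in three optional IndexAmRoutine callbacks. Below is a minimal sketch of what another AM would supply, using the API as it stood at the time of this commit; the myam_* names and MyAmSharedData are hypothetical placeholders, and a real AM must also synchronize concurrent access to its shared state (btree guards its shared state with a spinlock).

/*
 * Sketch only, not part of this diff: the parallel-scan hooks an index AM
 * implements (amestimateparallelscan, aminitparallelscan, amparallelrescan).
 */
#include "postgres.h"

#include "access/genam.h"
#include "access/relscan.h"

typedef struct MyAmSharedData
{
	int			next_item;		/* whatever state the participants must share */
} MyAmSharedData;

/* Report how much space this AM needs inside the ParallelIndexScanDesc. */
static Size
myam_estimateparallelscan(void)
{
	return sizeof(MyAmSharedData);
}

/* Initialize that space; called once when the parallel scan is set up. */
static void
myam_initparallelscan(void *target)
{
	MyAmSharedData *shared = (MyAmSharedData *) target;

	shared->next_item = 0;
}

/* Reset the shared state when the parallel scan is rescanned. */
static void
myam_parallelrescan(IndexScanDesc scan)
{
	MyAmSharedData *shared = (MyAmSharedData *)
		OffsetToPointer(scan->parallel_scan, scan->parallel_scan->ps_offset);

	shared->next_item = 0;
}

/*
 * In the AM's handler function, these get wired into the IndexAmRoutine:
 *		amroutine->amestimateparallelscan = myam_estimateparallelscan;
 *		amroutine->aminitparallelscan = myam_initparallelscan;
 *		amroutine->amparallelrescan = myam_parallelrescan;
 */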

src/backend/executor/execParallel.c

@@ -28,6 +28,7 @@
#include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h"
#include "executor/nodeSeqscan.h"
#include "executor/nodeIndexscan.h"
#include "executor/tqueue.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/planmain.h"
@@ -197,6 +198,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
ExecSeqScanEstimate((SeqScanState *) planstate,
e->pcxt);
break;
case T_IndexScanState:
ExecIndexScanEstimate((IndexScanState *) planstate,
e->pcxt);
break;
case T_ForeignScanState:
ExecForeignScanEstimate((ForeignScanState *) planstate,
e->pcxt);
@@ -249,6 +254,10 @@ ExecParallelInitializeDSM(PlanState *planstate,
ExecSeqScanInitializeDSM((SeqScanState *) planstate,
d->pcxt);
break;
case T_IndexScanState:
ExecIndexScanInitializeDSM((IndexScanState *) planstate,
d->pcxt);
break;
case T_ForeignScanState:
ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
d->pcxt);
@@ -725,6 +734,9 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
case T_SeqScanState:
ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
break;
case T_IndexScanState:
ExecIndexScanInitializeWorker((IndexScanState *) planstate, toc);
break;
case T_ForeignScanState:
ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
toc);
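The three dispatch points above all feed into the same shared-memory protocol that every parallel-aware scan node follows. A stripped-down sketch of that pattern is below (not part of this commit): MyScanState and the my_* names are placeholders, and shm_toc_lookup() is shown with the two-argument signature it had at the time of this commit.

#include "postgres.h"

#include "access/parallel.h"
#include "nodes/execnodes.h"
#include "storage/shm_toc.h"

typedef struct MyScanState
{
	PlanState	ps;				/* real scan nodes embed a ScanState */
	Size		pscan_len;		/* size of this node's shared chunk */
} MyScanState;

/* Phase 1: leader, before the DSM segment exists: reserve space + one key. */
static void
my_estimate(MyScanState *node, ParallelContext *pcxt)
{
	node->pscan_len = sizeof(int);	/* stand-in for the node-specific size */
	shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
	shm_toc_estimate_keys(&pcxt->estimator, 1);
}

/*
 * Phase 2: leader, after the DSM segment is created: fill the chunk and
 * publish it under the plan node's ID so workers can find the same chunk.
 */
static void
my_initialize_dsm(MyScanState *node, ParallelContext *pcxt)
{
	void	   *shared = shm_toc_allocate(pcxt->toc, node->pscan_len);

	/* ... initialize *shared and attach this node's scan to it ... */
	shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, shared);
}

/* Phase 3: each worker: look the chunk up by the same key and attach. */
static void
my_initialize_worker(MyScanState *node, shm_toc *toc)
{
	void	   *shared = shm_toc_lookup(toc, node->ps.plan->plan_node_id);

	/* ... attach this node's scan to *shared ... */
	(void) shared;				/* silence unused-variable warning in sketch */
}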

src/backend/executor/nodeIndexscan.c

@@ -22,6 +22,9 @@
* ExecEndIndexScan releases all storage.
* ExecIndexMarkPos marks scan position.
* ExecIndexRestrPos restores scan position.
* ExecIndexScanEstimate estimates DSM space needed for parallel index scan
* ExecIndexScanInitializeDSM initialize DSM for parallel indexscan
* ExecIndexScanInitializeWorker attach to DSM info in parallel worker
*/
#include "postgres.h"
@@ -514,6 +517,18 @@ ExecIndexScan(IndexScanState *node)
void
ExecReScanIndexScan(IndexScanState *node)
{
bool reset_parallel_scan = true;
/*
* If we are here to just update the scan keys, then don't reset parallel
* scan. We don't want each of the participating processes in the parallel
* scan to update the shared parallel scan state at the start of the scan.
* It is quite possible that one of the participants has already begun
* scanning the index when another has yet to start it.
*/
if (node->iss_NumRuntimeKeys != 0 && !node->iss_RuntimeKeysReady)
reset_parallel_scan = false;
/*
* If we are doing runtime key calculations (ie, any of the index key
* values weren't simple Consts), compute the new key values. But first,
@@ -539,10 +554,21 @@ ExecReScanIndexScan(IndexScanState *node)
reorderqueue_pop(node);
}
/* reset index scan */
index_rescan(node->iss_ScanDesc,
node->iss_ScanKeys, node->iss_NumScanKeys,
node->iss_OrderByKeys, node->iss_NumOrderByKeys);
/*
* Reset (parallel) index scan. For parallel-aware nodes, the scan
* descriptor is initialized during actual execution of the node, and we can
* reach here before that (ex. during execution of nest loop join). So,
* avoid updating the scan descriptor at that time.
*/
if (node->iss_ScanDesc)
{
index_rescan(node->iss_ScanDesc,
node->iss_ScanKeys, node->iss_NumScanKeys,
node->iss_OrderByKeys, node->iss_NumOrderByKeys);
if (reset_parallel_scan && node->iss_ScanDesc->parallel_scan)
index_parallelrescan(node->iss_ScanDesc);
}
node->iss_ReachedEnd = false;
ExecScanReScan(&node->ss);
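Taken together, the rescan logic now works like this: the shared parallel-scan state is reset only when the rescan is doing more than installing fresh runtime keys (otherwise every participant would overwrite state that another worker may already be consuming), and nothing is reset at all while iss_ScanDesc is still NULL, since parallel-aware nodes only create the scan descriptor at execution time in ExecIndexScanInitializeDSM/Worker further down.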
@@ -1013,22 +1039,29 @@ ExecInitIndexScan(IndexScan *node, EState *estate, int eflags)
}
/*
* Initialize scan descriptor.
* For a parallel-aware node, we initialize the scan descriptor after
* initializing the shared memory for parallel execution.
*/
indexstate->iss_ScanDesc = index_beginscan(currentRelation,
indexstate->iss_RelationDesc,
estate->es_snapshot,
indexstate->iss_NumScanKeys,
if (!node->scan.plan.parallel_aware)
{
/*
* Initialize scan descriptor.
*/
indexstate->iss_ScanDesc = index_beginscan(currentRelation,
indexstate->iss_RelationDesc,
estate->es_snapshot,
indexstate->iss_NumScanKeys,
indexstate->iss_NumOrderByKeys);
/*
* If no run-time keys to calculate, go ahead and pass the scankeys to the
* index AM.
*/
if (indexstate->iss_NumRuntimeKeys == 0)
index_rescan(indexstate->iss_ScanDesc,
indexstate->iss_ScanKeys, indexstate->iss_NumScanKeys,
/*
* If no run-time keys to calculate, go ahead and pass the scankeys to
* the index AM.
*/
if (indexstate->iss_NumRuntimeKeys == 0)
index_rescan(indexstate->iss_ScanDesc,
indexstate->iss_ScanKeys, indexstate->iss_NumScanKeys,
indexstate->iss_OrderByKeys, indexstate->iss_NumOrderByKeys);
}
/*
* all done.
@@ -1590,3 +1623,91 @@ ExecIndexBuildScanKeys(PlanState *planstate, Relation index,
else if (n_array_keys != 0)
elog(ERROR, "ScalarArrayOpExpr index qual found where not allowed");
}
/* ----------------------------------------------------------------
* Parallel Scan Support
* ----------------------------------------------------------------
*/
/* ----------------------------------------------------------------
* ExecIndexScanEstimate
*
* estimates the space required to serialize indexscan node.
* ----------------------------------------------------------------
*/
void
ExecIndexScanEstimate(IndexScanState *node,
ParallelContext *pcxt)
{
EState *estate = node->ss.ps.state;
node->iss_PscanLen = index_parallelscan_estimate(node->iss_RelationDesc,
estate->es_snapshot);
shm_toc_estimate_chunk(&pcxt->estimator, node->iss_PscanLen);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
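The space reserved here is computed by index_parallelscan_estimate(), which (at the time of this commit) adds up the fixed ParallelIndexScanDescData header, the serialized snapshot, and whatever the AM's amestimateparallelscan callback requests. A single TOC key suffices because the whole chunk is published under the plan node's ID in the next step.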
/* ----------------------------------------------------------------
* ExecIndexScanInitializeDSM
*
* Set up a parallel index scan descriptor.
* ----------------------------------------------------------------
*/
void
ExecIndexScanInitializeDSM(IndexScanState *node,
ParallelContext *pcxt)
{
EState *estate = node->ss.ps.state;
ParallelIndexScanDesc piscan;
piscan = shm_toc_allocate(pcxt->toc, node->iss_PscanLen);
index_parallelscan_initialize(node->ss.ss_currentRelation,
node->iss_RelationDesc,
estate->es_snapshot,
piscan);
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, piscan);
node->iss_ScanDesc =
index_beginscan_parallel(node->ss.ss_currentRelation,
node->iss_RelationDesc,
node->iss_NumScanKeys,
node->iss_NumOrderByKeys,
piscan);
/*
* If no run-time keys to calculate, go ahead and pass the scankeys to the
* index AM.
*/
if (node->iss_NumRuntimeKeys == 0)
index_rescan(node->iss_ScanDesc,
node->iss_ScanKeys, node->iss_NumScanKeys,
node->iss_OrderByKeys, node->iss_NumOrderByKeys);
}
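Note that the leader attaches to the freshly initialized parallel descriptor here via index_beginscan_parallel(), so it participates in the scan itself rather than merely coordinating the workers.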
/* ----------------------------------------------------------------
* ExecIndexScanInitializeWorker
*
* Copy relevant information from TOC into planstate.
* ----------------------------------------------------------------
*/
void
ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc)
{
ParallelIndexScanDesc piscan;
piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id);
node->iss_ScanDesc =
index_beginscan_parallel(node->ss.ss_currentRelation,
node->iss_RelationDesc,
node->iss_NumScanKeys,
node->iss_NumOrderByKeys,
piscan);
/*
* If no run-time keys to calculate, go ahead and pass the scankeys to the
* index AM.
*/
if (node->iss_NumRuntimeKeys == 0)
index_rescan(node->iss_ScanDesc,
node->iss_ScanKeys, node->iss_NumScanKeys,
node->iss_OrderByKeys, node->iss_NumOrderByKeys);
}
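The worker-side path mirrors the leader's ExecIndexScanInitializeDSM(): the only difference is that the shared descriptor is located with shm_toc_lookup() instead of being allocated and published. After that, both sides push the (non-runtime) scankeys down with index_rescan(), exactly as the serial path in ExecInitIndexScan() does.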