1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-28 11:44:57 +03:00

Add parallelism support for TID Range Scans

In v14, bb437f995 added support for scanning for ranges of TIDs using a
dedicated executor node for the purpose.  Here, we allow these scans to
be parallelized.  The range of blocks to scan is divvied up similarly to
how a Parallel Seq Scans does that, where 'chunks' of blocks are
allocated to each worker and the size of those chunks is slowly reduced
down to 1 block per worker by the time we're nearing the end of the
scan.  Doing that means workers finish at roughly the same time.

Allowing TID Range Scans to be parallelized removes the dilemma from the
planner as to whether a Parallel Seq Scan will cost less than a
non-parallel TID Range Scan due to the CPU concurrency of the Seq Scan
(disk costs are not divided by the number of workers).  It was possible
the planner could choose the Parallel Seq Scan which would result in
reading additional blocks during execution than the TID Scan would have.
Allowing Parallel TID Range Scans removes the trade-off the planner
makes when choosing between reduced CPU costs due to parallelism vs
additional I/O from the Parallel Seq Scan due to it scanning blocks from
outside of the required TID range.  There is also, of course, the
traditional parallelism performance benefits to be gained as well, which
likely doesn't need to be explained here.

Author: Cary Huang <cary.huang@highgo.ca>
Author: David Rowley <dgrowleyml@gmail.com>
Reviewed-by: Junwang Zhao <zhjwpku@gmail.com>
Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com>
Reviewed-by: Steven Niu <niushiji@gmail.com>
Discussion: https://postgr.es/m/18f2c002a24.11bc2ab825151706.3749144144619388582@highgo.ca
This commit is contained in:
David Rowley
2025-11-27 14:05:04 +13:00
parent 42473b3b31
commit 0ca3b16973
15 changed files with 446 additions and 58 deletions

View File

@@ -299,6 +299,15 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
within each worker process. within each worker process.
</para> </para>
</listitem> </listitem>
<listitem>
<para>
In a <emphasis>parallel tid range scan</emphasis>, the range of blocks
will be subdivided into smaller ranges which are shared among the
cooperating processes. Each worker process will complete the scanning
of its given range of blocks before requesting an additional range of
blocks.
</para>
</listitem>
</itemizedlist> </itemizedlist>
Other scan types, such as scans of non-btree indexes, may support Other scan types, such as scans of non-btree indexes, may support

View File

@@ -258,7 +258,9 @@ heap_scan_stream_read_next_parallel(ReadStream *stream,
/* parallel scan */ /* parallel scan */
table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
scan->rs_parallelworkerdata, scan->rs_parallelworkerdata,
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel); (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel,
scan->rs_startblock,
scan->rs_numblocks);
/* may return InvalidBlockNumber if there are no more blocks */ /* may return InvalidBlockNumber if there are no more blocks */
scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,

View File

@@ -188,6 +188,37 @@ table_beginscan_parallel(Relation relation, ParallelTableScanDesc pscan)
pscan, flags); pscan, flags);
} }
TableScanDesc
table_beginscan_parallel_tidrange(Relation relation,
ParallelTableScanDesc pscan)
{
Snapshot snapshot;
uint32 flags = SO_TYPE_TIDRANGESCAN | SO_ALLOW_PAGEMODE;
TableScanDesc sscan;
Assert(RelFileLocatorEquals(relation->rd_locator, pscan->phs_locator));
/* disable syncscan in parallel tid range scan. */
pscan->phs_syncscan = false;
if (!pscan->phs_snapshot_any)
{
/* Snapshot was serialized -- restore it */
snapshot = RestoreSnapshot((char *) pscan + pscan->phs_snapshot_off);
RegisterSnapshot(snapshot);
flags |= SO_TEMP_SNAPSHOT;
}
else
{
/* SnapshotAny passed by caller (not serialized) */
snapshot = SnapshotAny;
}
sscan = relation->rd_tableam->scan_begin(relation, snapshot, 0, NULL,
pscan, flags);
return sscan;
}
/* ---------------------------------------------------------------------------- /* ----------------------------------------------------------------------------
* Index scan related functions. * Index scan related functions.
@@ -398,6 +429,7 @@ table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
bpscan->phs_nblocks > NBuffers / 4; bpscan->phs_nblocks > NBuffers / 4;
SpinLockInit(&bpscan->phs_mutex); SpinLockInit(&bpscan->phs_mutex);
bpscan->phs_startblock = InvalidBlockNumber; bpscan->phs_startblock = InvalidBlockNumber;
bpscan->phs_numblock = InvalidBlockNumber;
pg_atomic_init_u64(&bpscan->phs_nallocated, 0); pg_atomic_init_u64(&bpscan->phs_nallocated, 0);
return sizeof(ParallelBlockTableScanDescData); return sizeof(ParallelBlockTableScanDescData);
@@ -416,14 +448,22 @@ table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
* *
* Determine where the parallel seq scan should start. This function may be * Determine where the parallel seq scan should start. This function may be
* called many times, once by each parallel worker. We must be careful only * called many times, once by each parallel worker. We must be careful only
* to set the startblock once. * to set the phs_startblock and phs_numblock fields once.
*
* Callers may optionally specify a non-InvalidBlockNumber value for
* 'startblock' to force the scan to start at the given page. Likewise,
* 'numblocks' can be specified as a non-InvalidBlockNumber to limit the
* number of blocks to scan to that many blocks.
*/ */
void void
table_block_parallelscan_startblock_init(Relation rel, table_block_parallelscan_startblock_init(Relation rel,
ParallelBlockTableScanWorker pbscanwork, ParallelBlockTableScanWorker pbscanwork,
ParallelBlockTableScanDesc pbscan) ParallelBlockTableScanDesc pbscan,
BlockNumber startblock,
BlockNumber numblocks)
{ {
BlockNumber sync_startpage = InvalidBlockNumber; BlockNumber sync_startpage = InvalidBlockNumber;
BlockNumber scan_nblocks;
/* Reset the state we use for controlling allocation size. */ /* Reset the state we use for controlling allocation size. */
memset(pbscanwork, 0, sizeof(*pbscanwork)); memset(pbscanwork, 0, sizeof(*pbscanwork));
@@ -431,42 +471,36 @@ table_block_parallelscan_startblock_init(Relation rel,
StaticAssertStmt(MaxBlockNumber <= 0xFFFFFFFE, StaticAssertStmt(MaxBlockNumber <= 0xFFFFFFFE,
"pg_nextpower2_32 may be too small for non-standard BlockNumber width"); "pg_nextpower2_32 may be too small for non-standard BlockNumber width");
/*
* We determine the chunk size based on the size of the relation. First we
* split the relation into PARALLEL_SEQSCAN_NCHUNKS chunks but we then
* take the next highest power of 2 number of the chunk size. This means
* we split the relation into somewhere between PARALLEL_SEQSCAN_NCHUNKS
* and PARALLEL_SEQSCAN_NCHUNKS / 2 chunks.
*/
pbscanwork->phsw_chunk_size = pg_nextpower2_32(Max(pbscan->phs_nblocks /
PARALLEL_SEQSCAN_NCHUNKS, 1));
/*
* Ensure we don't go over the maximum chunk size with larger tables. This
* means we may get much more than PARALLEL_SEQSCAN_NCHUNKS for larger
* tables. Too large a chunk size has been shown to be detrimental to
* synchronous scan performance.
*/
pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size,
PARALLEL_SEQSCAN_MAX_CHUNK_SIZE);
retry: retry:
/* Grab the spinlock. */ /* Grab the spinlock. */
SpinLockAcquire(&pbscan->phs_mutex); SpinLockAcquire(&pbscan->phs_mutex);
/* /*
* If the scan's startblock has not yet been initialized, we must do so * When the caller specified a limit on the number of blocks to scan, set
* now. If this is not a synchronized scan, we just start at block 0, but * that in the ParallelBlockTableScanDesc, if it's not been done by
* if it is a synchronized scan, we must get the starting position from * another worker already.
* the synchronized scan machinery. We can't hold the spinlock while */
* doing that, though, so release the spinlock, get the information we if (numblocks != InvalidBlockNumber &&
* need, and retry. If nobody else has initialized the scan in the pbscan->phs_numblock == InvalidBlockNumber)
* meantime, we'll fill in the value we fetched on the second time {
* through. pbscan->phs_numblock = numblocks;
}
/*
* If the scan's phs_startblock has not yet been initialized, we must do
* so now. If a startblock was specified, start there, otherwise if this
* is not a synchronized scan, we just start at block 0, but if it is a
* synchronized scan, we must get the starting position from the
* synchronized scan machinery. We can't hold the spinlock while doing
* that, though, so release the spinlock, get the information we need, and
* retry. If nobody else has initialized the scan in the meantime, we'll
* fill in the value we fetched on the second time through.
*/ */
if (pbscan->phs_startblock == InvalidBlockNumber) if (pbscan->phs_startblock == InvalidBlockNumber)
{ {
if (!pbscan->base.phs_syncscan) if (startblock != InvalidBlockNumber)
pbscan->phs_startblock = startblock;
else if (!pbscan->base.phs_syncscan)
pbscan->phs_startblock = 0; pbscan->phs_startblock = 0;
else if (sync_startpage != InvalidBlockNumber) else if (sync_startpage != InvalidBlockNumber)
pbscan->phs_startblock = sync_startpage; pbscan->phs_startblock = sync_startpage;
@@ -478,6 +512,34 @@ retry:
} }
} }
SpinLockRelease(&pbscan->phs_mutex); SpinLockRelease(&pbscan->phs_mutex);
/*
* Figure out how many blocks we're going to scan; either all of them, or
* just phs_numblock's worth, if a limit has been imposed.
*/
if (pbscan->phs_numblock == InvalidBlockNumber)
scan_nblocks = pbscan->phs_nblocks;
else
scan_nblocks = pbscan->phs_numblock;
/*
* We determine the chunk size based on scan_nblocks. First we split
* scan_nblocks into PARALLEL_SEQSCAN_NCHUNKS chunks then we calculate the
* next highest power of 2 number of the result. This means we split the
* blocks we're scanning into somewhere between PARALLEL_SEQSCAN_NCHUNKS
* and PARALLEL_SEQSCAN_NCHUNKS / 2 chunks.
*/
pbscanwork->phsw_chunk_size = pg_nextpower2_32(Max(scan_nblocks /
PARALLEL_SEQSCAN_NCHUNKS, 1));
/*
* Ensure we don't go over the maximum chunk size with larger tables. This
* means we may get much more than PARALLEL_SEQSCAN_NCHUNKS for larger
* tables. Too large a chunk size has been shown to be detrimental to
* sequential scan performance.
*/
pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size,
PARALLEL_SEQSCAN_MAX_CHUNK_SIZE);
} }
/* /*
@@ -493,6 +555,7 @@ table_block_parallelscan_nextpage(Relation rel,
ParallelBlockTableScanWorker pbscanwork, ParallelBlockTableScanWorker pbscanwork,
ParallelBlockTableScanDesc pbscan) ParallelBlockTableScanDesc pbscan)
{ {
BlockNumber scan_nblocks;
BlockNumber page; BlockNumber page;
uint64 nallocated; uint64 nallocated;
@@ -513,7 +576,7 @@ table_block_parallelscan_nextpage(Relation rel,
* *
* Here we name these ranges of blocks "chunks". The initial size of * Here we name these ranges of blocks "chunks". The initial size of
* these chunks is determined in table_block_parallelscan_startblock_init * these chunks is determined in table_block_parallelscan_startblock_init
* based on the size of the relation. Towards the end of the scan, we * based on the number of blocks to scan. Towards the end of the scan, we
* start making reductions in the size of the chunks in order to attempt * start making reductions in the size of the chunks in order to attempt
* to divide the remaining work over all the workers as evenly as * to divide the remaining work over all the workers as evenly as
* possible. * possible.
@@ -530,17 +593,23 @@ table_block_parallelscan_nextpage(Relation rel,
* phs_nallocated counter will exceed rs_nblocks, because workers will * phs_nallocated counter will exceed rs_nblocks, because workers will
* still increment the value, when they try to allocate the next block but * still increment the value, when they try to allocate the next block but
* all blocks have been allocated already. The counter must be 64 bits * all blocks have been allocated already. The counter must be 64 bits
* wide because of that, to avoid wrapping around when rs_nblocks is close * wide because of that, to avoid wrapping around when scan_nblocks is
* to 2^32. * close to 2^32.
* *
* The actual block to return is calculated by adding the counter to the * The actual block to return is calculated by adding the counter to the
* starting block number, modulo nblocks. * starting block number, modulo phs_nblocks.
*/ */
/* First, figure out how many blocks we're planning on scanning */
if (pbscan->phs_numblock == InvalidBlockNumber)
scan_nblocks = pbscan->phs_nblocks;
else
scan_nblocks = pbscan->phs_numblock;
/* /*
* First check if we have any remaining blocks in a previous chunk for * Now check if we have any remaining blocks in a previous chunk for this
* this worker. We must consume all of the blocks from that before we * worker. We must consume all of the blocks from that before we allocate
* allocate a new chunk to the worker. * a new chunk to the worker.
*/ */
if (pbscanwork->phsw_chunk_remaining > 0) if (pbscanwork->phsw_chunk_remaining > 0)
{ {
@@ -562,7 +631,7 @@ table_block_parallelscan_nextpage(Relation rel,
* chunk size set to 1. * chunk size set to 1.
*/ */
if (pbscanwork->phsw_chunk_size > 1 && if (pbscanwork->phsw_chunk_size > 1 &&
pbscanwork->phsw_nallocated > pbscan->phs_nblocks - pbscanwork->phsw_nallocated > scan_nblocks -
(pbscanwork->phsw_chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS)) (pbscanwork->phsw_chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS))
pbscanwork->phsw_chunk_size >>= 1; pbscanwork->phsw_chunk_size >>= 1;
@@ -577,7 +646,8 @@ table_block_parallelscan_nextpage(Relation rel,
pbscanwork->phsw_chunk_remaining = pbscanwork->phsw_chunk_size - 1; pbscanwork->phsw_chunk_remaining = pbscanwork->phsw_chunk_size - 1;
} }
if (nallocated >= pbscan->phs_nblocks) /* Check if we've run out of blocks to scan */
if (nallocated >= scan_nblocks)
page = InvalidBlockNumber; /* all blocks have been allocated */ page = InvalidBlockNumber; /* all blocks have been allocated */
else else
page = (nallocated + pbscan->phs_startblock) % pbscan->phs_nblocks; page = (nallocated + pbscan->phs_startblock) % pbscan->phs_nblocks;

View File

@@ -40,6 +40,7 @@
#include "executor/nodeSeqscan.h" #include "executor/nodeSeqscan.h"
#include "executor/nodeSort.h" #include "executor/nodeSort.h"
#include "executor/nodeSubplan.h" #include "executor/nodeSubplan.h"
#include "executor/nodeTidrangescan.h"
#include "executor/tqueue.h" #include "executor/tqueue.h"
#include "jit/jit.h" #include "jit/jit.h"
#include "nodes/nodeFuncs.h" #include "nodes/nodeFuncs.h"
@@ -266,6 +267,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
ExecForeignScanEstimate((ForeignScanState *) planstate, ExecForeignScanEstimate((ForeignScanState *) planstate,
e->pcxt); e->pcxt);
break; break;
case T_TidRangeScanState:
if (planstate->plan->parallel_aware)
ExecTidRangeScanEstimate((TidRangeScanState *) planstate,
e->pcxt);
break;
case T_AppendState: case T_AppendState:
if (planstate->plan->parallel_aware) if (planstate->plan->parallel_aware)
ExecAppendEstimate((AppendState *) planstate, ExecAppendEstimate((AppendState *) planstate,
@@ -493,6 +499,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
ExecForeignScanInitializeDSM((ForeignScanState *) planstate, ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
d->pcxt); d->pcxt);
break; break;
case T_TidRangeScanState:
if (planstate->plan->parallel_aware)
ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
d->pcxt);
break;
case T_AppendState: case T_AppendState:
if (planstate->plan->parallel_aware) if (planstate->plan->parallel_aware)
ExecAppendInitializeDSM((AppendState *) planstate, ExecAppendInitializeDSM((AppendState *) planstate,
@@ -994,6 +1005,11 @@ ExecParallelReInitializeDSM(PlanState *planstate,
ExecForeignScanReInitializeDSM((ForeignScanState *) planstate, ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
pcxt); pcxt);
break; break;
case T_TidRangeScanState:
if (planstate->plan->parallel_aware)
ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate,
pcxt);
break;
case T_AppendState: case T_AppendState:
if (planstate->plan->parallel_aware) if (planstate->plan->parallel_aware)
ExecAppendReInitializeDSM((AppendState *) planstate, pcxt); ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
@@ -1362,6 +1378,11 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
ExecForeignScanInitializeWorker((ForeignScanState *) planstate, ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
pwcxt); pwcxt);
break; break;
case T_TidRangeScanState:
if (planstate->plan->parallel_aware)
ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate,
pwcxt);
break;
case T_AppendState: case T_AppendState:
if (planstate->plan->parallel_aware) if (planstate->plan->parallel_aware)
ExecAppendInitializeWorker((AppendState *) planstate, pwcxt); ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);

View File

@@ -415,3 +415,83 @@ ExecInitTidRangeScan(TidRangeScan *node, EState *estate, int eflags)
*/ */
return tidrangestate; return tidrangestate;
} }
/* ----------------------------------------------------------------
* Parallel Scan Support
* ----------------------------------------------------------------
*/
/* ----------------------------------------------------------------
* ExecTidRangeScanEstimate
*
* Compute the amount of space we'll need in the parallel
* query DSM, and inform pcxt->estimator about our needs.
* ----------------------------------------------------------------
*/
void
ExecTidRangeScanEstimate(TidRangeScanState *node, ParallelContext *pcxt)
{
EState *estate = node->ss.ps.state;
node->trss_pscanlen =
table_parallelscan_estimate(node->ss.ss_currentRelation,
estate->es_snapshot);
shm_toc_estimate_chunk(&pcxt->estimator, node->trss_pscanlen);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
/* ----------------------------------------------------------------
* ExecTidRangeScanInitializeDSM
*
* Set up a parallel TID range scan descriptor.
* ----------------------------------------------------------------
*/
void
ExecTidRangeScanInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt)
{
EState *estate = node->ss.ps.state;
ParallelTableScanDesc pscan;
pscan = shm_toc_allocate(pcxt->toc, node->trss_pscanlen);
table_parallelscan_initialize(node->ss.ss_currentRelation,
pscan,
estate->es_snapshot);
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
node->ss.ss_currentScanDesc =
table_beginscan_parallel_tidrange(node->ss.ss_currentRelation,
pscan);
}
/* ----------------------------------------------------------------
* ExecTidRangeScanReInitializeDSM
*
* Reset shared state before beginning a fresh scan.
* ----------------------------------------------------------------
*/
void
ExecTidRangeScanReInitializeDSM(TidRangeScanState *node,
ParallelContext *pcxt)
{
ParallelTableScanDesc pscan;
pscan = node->ss.ss_currentScanDesc->rs_parallel;
table_parallelscan_reinitialize(node->ss.ss_currentRelation, pscan);
}
/* ----------------------------------------------------------------
* ExecTidRangeScanInitializeWorker
*
* Copy relevant information from TOC into planstate.
* ----------------------------------------------------------------
*/
void
ExecTidRangeScanInitializeWorker(TidRangeScanState *node,
ParallelWorkerContext *pwcxt)
{
ParallelTableScanDesc pscan;
pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
node->ss.ss_currentScanDesc =
table_beginscan_parallel_tidrange(node->ss.ss_currentRelation,
pscan);
}

View File

@@ -1340,8 +1340,9 @@ cost_tidrangescan(Path *path, PlannerInfo *root,
{ {
Selectivity selectivity; Selectivity selectivity;
double pages; double pages;
Cost startup_cost = 0; Cost startup_cost;
Cost run_cost = 0; Cost cpu_run_cost;
Cost disk_run_cost;
QualCost qpqual_cost; QualCost qpqual_cost;
Cost cpu_per_tuple; Cost cpu_per_tuple;
QualCost tid_qual_cost; QualCost tid_qual_cost;
@@ -1373,8 +1374,8 @@ cost_tidrangescan(Path *path, PlannerInfo *root,
* page is just a normal sequential page read. NOTE: it's desirable for * page is just a normal sequential page read. NOTE: it's desirable for
* TID Range Scans to cost more than the equivalent Sequential Scans, * TID Range Scans to cost more than the equivalent Sequential Scans,
* because Seq Scans have some performance advantages such as scan * because Seq Scans have some performance advantages such as scan
* synchronization and parallelizability, and we'd prefer one of them to * synchronization, and we'd prefer one of them to be picked unless a TID
* be picked unless a TID Range Scan really is better. * Range Scan really is better.
*/ */
ntuples = selectivity * baserel->tuples; ntuples = selectivity * baserel->tuples;
nseqpages = pages - 1.0; nseqpages = pages - 1.0;
@@ -1391,7 +1392,7 @@ cost_tidrangescan(Path *path, PlannerInfo *root,
&spc_seq_page_cost); &spc_seq_page_cost);
/* disk costs; 1 random page and the remainder as seq pages */ /* disk costs; 1 random page and the remainder as seq pages */
run_cost += spc_random_page_cost + spc_seq_page_cost * nseqpages; disk_run_cost = spc_random_page_cost + spc_seq_page_cost * nseqpages;
/* Add scanning CPU costs */ /* Add scanning CPU costs */
get_restriction_qual_cost(root, baserel, param_info, &qpqual_cost); get_restriction_qual_cost(root, baserel, param_info, &qpqual_cost);
@@ -1403,20 +1404,35 @@ cost_tidrangescan(Path *path, PlannerInfo *root,
* can't be removed, this is a mistake and we're going to underestimate * can't be removed, this is a mistake and we're going to underestimate
* the CPU cost a bit.) * the CPU cost a bit.)
*/ */
startup_cost += qpqual_cost.startup + tid_qual_cost.per_tuple; startup_cost = qpqual_cost.startup + tid_qual_cost.per_tuple;
cpu_per_tuple = cpu_tuple_cost + qpqual_cost.per_tuple - cpu_per_tuple = cpu_tuple_cost + qpqual_cost.per_tuple -
tid_qual_cost.per_tuple; tid_qual_cost.per_tuple;
run_cost += cpu_per_tuple * ntuples; cpu_run_cost = cpu_per_tuple * ntuples;
/* tlist eval costs are paid per output row, not per tuple scanned */ /* tlist eval costs are paid per output row, not per tuple scanned */
startup_cost += path->pathtarget->cost.startup; startup_cost += path->pathtarget->cost.startup;
run_cost += path->pathtarget->cost.per_tuple * path->rows; cpu_run_cost += path->pathtarget->cost.per_tuple * path->rows;
/* Adjust costing for parallelism, if used. */
if (path->parallel_workers > 0)
{
double parallel_divisor = get_parallel_divisor(path);
/* The CPU cost is divided among all the workers. */
cpu_run_cost /= parallel_divisor;
/*
* In the case of a parallel plan, the row count needs to represent
* the number of tuples processed per worker.
*/
path->rows = clamp_row_est(path->rows / parallel_divisor);
}
/* we should not generate this path type when enable_tidscan=false */ /* we should not generate this path type when enable_tidscan=false */
Assert(enable_tidscan); Assert(enable_tidscan);
path->disabled_nodes = 0; path->disabled_nodes = 0;
path->startup_cost = startup_cost; path->startup_cost = startup_cost;
path->total_cost = startup_cost + run_cost; path->total_cost = startup_cost + cpu_run_cost + disk_run_cost;
} }
/* /*

View File

@@ -490,9 +490,8 @@ ec_member_matches_ctid(PlannerInfo *root, RelOptInfo *rel,
/* /*
* create_tidscan_paths * create_tidscan_paths
* Create paths corresponding to direct TID scans of the given rel. * Create paths corresponding to direct TID scans of the given rel and add
* * them to the corresponding path list via add_path or add_partial_path.
* Candidate paths are added to the rel's pathlist (using add_path).
*/ */
bool bool
create_tidscan_paths(PlannerInfo *root, RelOptInfo *rel) create_tidscan_paths(PlannerInfo *root, RelOptInfo *rel)
@@ -553,7 +552,24 @@ create_tidscan_paths(PlannerInfo *root, RelOptInfo *rel)
add_path(rel, (Path *) create_tidrangescan_path(root, rel, add_path(rel, (Path *) create_tidrangescan_path(root, rel,
tidrangequals, tidrangequals,
required_outer)); required_outer,
0));
/* If appropriate, consider parallel tid range scan. */
if (rel->consider_parallel && required_outer == NULL)
{
int parallel_workers;
parallel_workers = compute_parallel_worker(rel, rel->pages, -1,
max_parallel_workers_per_gather);
if (parallel_workers > 0)
add_partial_path(rel, (Path *) create_tidrangescan_path(root,
rel,
tidrangequals,
required_outer,
parallel_workers));
}
} }
/* /*

View File

@@ -1262,7 +1262,8 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
*/ */
TidRangePath * TidRangePath *
create_tidrangescan_path(PlannerInfo *root, RelOptInfo *rel, create_tidrangescan_path(PlannerInfo *root, RelOptInfo *rel,
List *tidrangequals, Relids required_outer) List *tidrangequals, Relids required_outer,
int parallel_workers)
{ {
TidRangePath *pathnode = makeNode(TidRangePath); TidRangePath *pathnode = makeNode(TidRangePath);
@@ -1271,9 +1272,9 @@ create_tidrangescan_path(PlannerInfo *root, RelOptInfo *rel,
pathnode->path.pathtarget = rel->reltarget; pathnode->path.pathtarget = rel->reltarget;
pathnode->path.param_info = get_baserel_parampathinfo(root, rel, pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
required_outer); required_outer);
pathnode->path.parallel_aware = false; pathnode->path.parallel_aware = (parallel_workers > 0);
pathnode->path.parallel_safe = rel->consider_parallel; pathnode->path.parallel_safe = rel->consider_parallel;
pathnode->path.parallel_workers = 0; pathnode->path.parallel_workers = parallel_workers;
pathnode->path.pathkeys = NIL; /* always unordered */ pathnode->path.pathkeys = NIL; /* always unordered */
pathnode->tidrangequals = tidrangequals; pathnode->tidrangequals = tidrangequals;

View File

@@ -96,6 +96,8 @@ typedef struct ParallelBlockTableScanDescData
BlockNumber phs_nblocks; /* # blocks in relation at start of scan */ BlockNumber phs_nblocks; /* # blocks in relation at start of scan */
slock_t phs_mutex; /* mutual exclusion for setting startblock */ slock_t phs_mutex; /* mutual exclusion for setting startblock */
BlockNumber phs_startblock; /* starting block number */ BlockNumber phs_startblock; /* starting block number */
BlockNumber phs_numblock; /* # blocks to scan, or InvalidBlockNumber if
* no limit */
pg_atomic_uint64 phs_nallocated; /* number of blocks allocated to pg_atomic_uint64 phs_nallocated; /* number of blocks allocated to
* workers so far. */ * workers so far. */
} ParallelBlockTableScanDescData; } ParallelBlockTableScanDescData;

View File

@@ -1130,6 +1130,16 @@ extern void table_parallelscan_initialize(Relation rel,
extern TableScanDesc table_beginscan_parallel(Relation relation, extern TableScanDesc table_beginscan_parallel(Relation relation,
ParallelTableScanDesc pscan); ParallelTableScanDesc pscan);
/*
* Begin a parallel tid range scan. `pscan` needs to have been initialized
* with table_parallelscan_initialize(), for the same relation. The
* initialization does not need to have happened in this backend.
*
* Caller must hold a suitable lock on the relation.
*/
extern TableScanDesc table_beginscan_parallel_tidrange(Relation relation,
ParallelTableScanDesc pscan);
/* /*
* Restart a parallel scan. Call this in the leader process. Caller is * Restart a parallel scan. Call this in the leader process. Caller is
* responsible for making sure that all workers have finished the scan * responsible for making sure that all workers have finished the scan
@@ -2028,7 +2038,9 @@ extern BlockNumber table_block_parallelscan_nextpage(Relation rel,
ParallelBlockTableScanDesc pbscan); ParallelBlockTableScanDesc pbscan);
extern void table_block_parallelscan_startblock_init(Relation rel, extern void table_block_parallelscan_startblock_init(Relation rel,
ParallelBlockTableScanWorker pbscanwork, ParallelBlockTableScanWorker pbscanwork,
ParallelBlockTableScanDesc pbscan); ParallelBlockTableScanDesc pbscan,
BlockNumber startblock,
BlockNumber numblocks);
/* ---------------------------------------------------------------------------- /* ----------------------------------------------------------------------------

View File

@@ -14,6 +14,7 @@
#ifndef NODETIDRANGESCAN_H #ifndef NODETIDRANGESCAN_H
#define NODETIDRANGESCAN_H #define NODETIDRANGESCAN_H
#include "access/parallel.h"
#include "nodes/execnodes.h" #include "nodes/execnodes.h"
extern TidRangeScanState *ExecInitTidRangeScan(TidRangeScan *node, extern TidRangeScanState *ExecInitTidRangeScan(TidRangeScan *node,
@@ -21,4 +22,10 @@ extern TidRangeScanState *ExecInitTidRangeScan(TidRangeScan *node,
extern void ExecEndTidRangeScan(TidRangeScanState *node); extern void ExecEndTidRangeScan(TidRangeScanState *node);
extern void ExecReScanTidRangeScan(TidRangeScanState *node); extern void ExecReScanTidRangeScan(TidRangeScanState *node);
/* parallel scan support */
extern void ExecTidRangeScanEstimate(TidRangeScanState *node, ParallelContext *pcxt);
extern void ExecTidRangeScanInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt);
extern void ExecTidRangeScanReInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt);
extern void ExecTidRangeScanInitializeWorker(TidRangeScanState *node, ParallelWorkerContext *pwcxt);
#endif /* NODETIDRANGESCAN_H */ #endif /* NODETIDRANGESCAN_H */

View File

@@ -1930,6 +1930,7 @@ typedef struct TidScanState
* trss_mintid the lowest TID in the scan range * trss_mintid the lowest TID in the scan range
* trss_maxtid the highest TID in the scan range * trss_maxtid the highest TID in the scan range
* trss_inScan is a scan currently in progress? * trss_inScan is a scan currently in progress?
* trss_pscanlen size of parallel heap scan descriptor
* ---------------- * ----------------
*/ */
typedef struct TidRangeScanState typedef struct TidRangeScanState
@@ -1939,6 +1940,7 @@ typedef struct TidRangeScanState
ItemPointerData trss_mintid; ItemPointerData trss_mintid;
ItemPointerData trss_maxtid; ItemPointerData trss_maxtid;
bool trss_inScan; bool trss_inScan;
Size trss_pscanlen;
} TidRangeScanState; } TidRangeScanState;
/* ---------------- /* ----------------

View File

@@ -67,7 +67,8 @@ extern TidPath *create_tidscan_path(PlannerInfo *root, RelOptInfo *rel,
extern TidRangePath *create_tidrangescan_path(PlannerInfo *root, extern TidRangePath *create_tidrangescan_path(PlannerInfo *root,
RelOptInfo *rel, RelOptInfo *rel,
List *tidrangequals, List *tidrangequals,
Relids required_outer); Relids required_outer,
int parallel_workers);
extern AppendPath *create_append_path(PlannerInfo *root, RelOptInfo *rel, extern AppendPath *create_append_path(PlannerInfo *root, RelOptInfo *rel,
List *subpaths, List *partial_subpaths, List *subpaths, List *partial_subpaths,
List *pathkeys, Relids required_outer, List *pathkeys, Relids required_outer,

View File

@@ -297,4 +297,109 @@ FETCH LAST c;
COMMIT; COMMIT;
DROP TABLE tidrangescan; DROP TABLE tidrangescan;
-- Tests for parallel TID Range Scans
BEGIN;
SET LOCAL parallel_setup_cost TO 0;
SET LOCAL parallel_tuple_cost TO 0;
SET LOCAL min_parallel_table_scan_size TO 0;
SET LOCAL max_parallel_workers_per_gather TO 4;
CREATE TABLE parallel_tidrangescan (id integer, data text)
WITH (fillfactor = 10);
-- Insert enough tuples such that each page gets 5 tuples with fillfactor = 10
INSERT INTO parallel_tidrangescan
SELECT i, repeat('x', 100) FROM generate_series(1,200) AS s(i);
-- Ensure there are 40 pages for parallel test
SELECT min(ctid), max(ctid) FROM parallel_tidrangescan;
min | max
-------+--------
(0,1) | (39,5)
(1 row)
-- Parallel range scans with upper bound
EXPLAIN (COSTS OFF)
SELECT count(*) FROM parallel_tidrangescan WHERE ctid < '(30,1)';
QUERY PLAN
--------------------------------------------------------------------
Finalize Aggregate
-> Gather
Workers Planned: 4
-> Partial Aggregate
-> Parallel Tid Range Scan on parallel_tidrangescan
TID Cond: (ctid < '(30,1)'::tid)
(6 rows)
SELECT count(*) FROM parallel_tidrangescan WHERE ctid < '(30,1)';
count
-------
150
(1 row)
-- Parallel range scans with lower bound
EXPLAIN (COSTS OFF)
SELECT count(*) FROM parallel_tidrangescan WHERE ctid > '(10,0)';
QUERY PLAN
--------------------------------------------------------------------
Finalize Aggregate
-> Gather
Workers Planned: 4
-> Partial Aggregate
-> Parallel Tid Range Scan on parallel_tidrangescan
TID Cond: (ctid > '(10,0)'::tid)
(6 rows)
SELECT count(*) FROM parallel_tidrangescan WHERE ctid > '(10,0)';
count
-------
150
(1 row)
-- Parallel range scans with both bounds
EXPLAIN (COSTS OFF)
SELECT count(*) FROM parallel_tidrangescan WHERE ctid > '(10,0)' AND ctid < '(30,1)';
QUERY PLAN
-----------------------------------------------------------------------------------
Finalize Aggregate
-> Gather
Workers Planned: 4
-> Partial Aggregate
-> Parallel Tid Range Scan on parallel_tidrangescan
TID Cond: ((ctid > '(10,0)'::tid) AND (ctid < '(30,1)'::tid))
(6 rows)
SELECT count(*) FROM parallel_tidrangescan WHERE ctid > '(10,0)' AND ctid < '(30,1)';
count
-------
100
(1 row)
-- Parallel rescans
EXPLAIN (COSTS OFF)
SELECT t.ctid,t2.c FROM parallel_tidrangescan t,
LATERAL (SELECT count(*) c FROM parallel_tidrangescan t2 WHERE t2.ctid <= t.ctid) t2
WHERE t.ctid < '(1,0)';
QUERY PLAN
----------------------------------------------------------------
Nested Loop
-> Gather
Workers Planned: 4
-> Parallel Tid Range Scan on parallel_tidrangescan t
TID Cond: (ctid < '(1,0)'::tid)
-> Aggregate
-> Tid Range Scan on parallel_tidrangescan t2
TID Cond: (ctid <= t.ctid)
(8 rows)
SELECT t.ctid,t2.c FROM parallel_tidrangescan t,
LATERAL (SELECT count(*) c FROM parallel_tidrangescan t2 WHERE t2.ctid <= t.ctid) t2
WHERE t.ctid < '(1,0)';
ctid | c
-------+---
(0,1) | 1
(0,2) | 2
(0,3) | 3
(0,4) | 4
(0,5) | 5
(5 rows)
ROLLBACK;
RESET enable_seqscan; RESET enable_seqscan;

View File

@@ -98,4 +98,48 @@ COMMIT;
DROP TABLE tidrangescan; DROP TABLE tidrangescan;
-- Tests for parallel TID Range Scans
BEGIN;
SET LOCAL parallel_setup_cost TO 0;
SET LOCAL parallel_tuple_cost TO 0;
SET LOCAL min_parallel_table_scan_size TO 0;
SET LOCAL max_parallel_workers_per_gather TO 4;
CREATE TABLE parallel_tidrangescan (id integer, data text)
WITH (fillfactor = 10);
-- Insert enough tuples such that each page gets 5 tuples with fillfactor = 10
INSERT INTO parallel_tidrangescan
SELECT i, repeat('x', 100) FROM generate_series(1,200) AS s(i);
-- Ensure there are 40 pages for parallel test
SELECT min(ctid), max(ctid) FROM parallel_tidrangescan;
-- Parallel range scans with upper bound
EXPLAIN (COSTS OFF)
SELECT count(*) FROM parallel_tidrangescan WHERE ctid < '(30,1)';
SELECT count(*) FROM parallel_tidrangescan WHERE ctid < '(30,1)';
-- Parallel range scans with lower bound
EXPLAIN (COSTS OFF)
SELECT count(*) FROM parallel_tidrangescan WHERE ctid > '(10,0)';
SELECT count(*) FROM parallel_tidrangescan WHERE ctid > '(10,0)';
-- Parallel range scans with both bounds
EXPLAIN (COSTS OFF)
SELECT count(*) FROM parallel_tidrangescan WHERE ctid > '(10,0)' AND ctid < '(30,1)';
SELECT count(*) FROM parallel_tidrangescan WHERE ctid > '(10,0)' AND ctid < '(30,1)';
-- Parallel rescans
EXPLAIN (COSTS OFF)
SELECT t.ctid,t2.c FROM parallel_tidrangescan t,
LATERAL (SELECT count(*) c FROM parallel_tidrangescan t2 WHERE t2.ctid <= t.ctid) t2
WHERE t.ctid < '(1,0)';
SELECT t.ctid,t2.c FROM parallel_tidrangescan t,
LATERAL (SELECT count(*) c FROM parallel_tidrangescan t2 WHERE t2.ctid <= t.ctid) t2
WHERE t.ctid < '(1,0)';
ROLLBACK;
RESET enable_seqscan; RESET enable_seqscan;