1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-25 13:17:41 +03:00

Separate reinitialization of shared parallel-scan state from ExecReScan.

Previously, the parallel executor logic did reinitialization of shared
state within the ExecReScan code for parallel-aware scan nodes.  This is
problematic, because it means that the ExecReScan call has to occur
synchronously (ie, during the parent Gather node's ReScan call).  That is
swimming very much against the tide so far as the ExecReScan machinery is
concerned; the fact that it works at all today depends on a lot of fragile
assumptions, such as that no plan node between Gather and a parallel-aware
scan node is parameterized.  Another objection is that because ExecReScan
might be called in workers as well as the leader, hacky extra tests are
needed in some places to prevent unwanted shared-state resets.

Hence, let's separate this code into two functions, a ReInitializeDSM
call and the ReScan call proper.  ReInitializeDSM is called only in
the leader and is guaranteed to run before we start new workers.
ReScan is returned to its traditional function of resetting only local
state, which means that ExecReScan's usual habits of delaying or
eliminating child rescan calls are safe again.

As with the preceding commit 7df2c1f8d, it doesn't seem to be necessary
to make these changes in 9.6, which is a good thing because the FDW and
CustomScan APIs are impacted.

Discussion: https://postgr.es/m/CAA4eK1JkByysFJNh9M349u_nNjqETuEnY_y1VUc_kJiU0bxtaQ@mail.gmail.com
This commit is contained in:
Tom Lane
2017-08-30 13:18:16 -04:00
parent 5816ddc707
commit d6a149f4e6
22 changed files with 302 additions and 135 deletions

View File

@@ -320,22 +320,39 @@ void (*InitializeDSMCustomScan) (CustomScanState *node,
void *coordinate); void *coordinate);
</programlisting> </programlisting>
Initialize the dynamic shared memory that will be required for parallel Initialize the dynamic shared memory that will be required for parallel
operation; <literal>coordinate</> points to an amount of allocated space operation. <literal>coordinate</> points to a shared memory area of
equal to the return value of <function>EstimateDSMCustomScan</>. size equal to the return value of <function>EstimateDSMCustomScan</>.
This callback is optional, and need only be supplied if this custom This callback is optional, and need only be supplied if this custom
scan provider supports parallel execution. scan provider supports parallel execution.
</para> </para>
<para> <para>
<programlisting> <programlisting>
void (*ReInitializeDSMCustomScan) (CustomScanState *node,
ParallelContext *pcxt,
void *coordinate);
</programlisting>
Re-initialize the dynamic shared memory required for parallel operation
when the custom-scan plan node is about to be re-scanned.
This callback is optional, and need only be supplied if this custom
scan provider supports parallel execution.
Recommended practice is that this callback reset only shared state,
while the <function>ReScanCustomScan</> callback resets only local
state. Currently, this callback will be called
before <function>ReScanCustomScan</>, but it's best not to rely on
that ordering.
</para>
<para>
<programlisting>
void (*InitializeWorkerCustomScan) (CustomScanState *node, void (*InitializeWorkerCustomScan) (CustomScanState *node,
shm_toc *toc, shm_toc *toc,
void *coordinate); void *coordinate);
</programlisting> </programlisting>
Initialize a parallel worker's custom state based on the shared state Initialize a parallel worker's local state based on the shared state
set up in the leader by <literal>InitializeDSMCustomScan</>. set up by the leader during <function>InitializeDSMCustomScan</>.
This callback is optional, and needs only be supplied if this This callback is optional, and need only be supplied if this custom
custom path supports parallel execution. scan provider supports parallel execution.
</para> </para>
<para> <para>

View File

@@ -1191,12 +1191,12 @@ ImportForeignSchema (ImportForeignSchemaStmt *stmt, Oid serverOid);
<para> <para>
A <structname>ForeignScan</> node can, optionally, support parallel A <structname>ForeignScan</> node can, optionally, support parallel
execution. A parallel <structname>ForeignScan</> will be executed execution. A parallel <structname>ForeignScan</> will be executed
in multiple processes and should return each row only once across in multiple processes and must return each row exactly once across
all cooperating processes. To do this, processes can coordinate through all cooperating processes. To do this, processes can coordinate through
fixed size chunks of dynamic shared memory. This shared memory is not fixed-size chunks of dynamic shared memory. This shared memory is not
guaranteed to be mapped at the same address in every process, so pointers guaranteed to be mapped at the same address in every process, so it
may not be used. The following callbacks are all optional in general, must not contain pointers. The following functions are all optional,
but required if parallel execution is to be supported. but most are required if parallel execution is to be supported.
</para> </para>
<para> <para>
@@ -1215,7 +1215,7 @@ IsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
</para> </para>
<para> <para>
If this callback is not defined, it is assumed that the scan must take If this function is not defined, it is assumed that the scan must take
place within the parallel leader. Note that returning true does not mean place within the parallel leader. Note that returning true does not mean
that the scan itself can be done in parallel, only that the scan can be that the scan itself can be done in parallel, only that the scan can be
performed within a parallel worker. Therefore, it can be useful to define performed within a parallel worker. Therefore, it can be useful to define
@@ -1230,6 +1230,9 @@ EstimateDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt);
Estimate the amount of dynamic shared memory that will be required Estimate the amount of dynamic shared memory that will be required
for parallel operation. This may be higher than the amount that will for parallel operation. This may be higher than the amount that will
actually be used, but it must not be lower. The return value is in bytes. actually be used, but it must not be lower. The return value is in bytes.
This function is optional, and can be omitted if not needed; but if it
is omitted, the next three functions must be omitted as well, because
no shared memory will be allocated for the FDW's use.
</para> </para>
<para> <para>
@@ -1239,8 +1242,25 @@ InitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt,
void *coordinate); void *coordinate);
</programlisting> </programlisting>
Initialize the dynamic shared memory that will be required for parallel Initialize the dynamic shared memory that will be required for parallel
operation; <literal>coordinate</> points to an amount of allocated space operation. <literal>coordinate</> points to a shared memory area of
equal to the return value of <function>EstimateDSMForeignScan</>. size equal to the return value of <function>EstimateDSMForeignScan</>.
This function is optional, and can be omitted if not needed.
</para>
<para>
<programlisting>
void
ReInitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt,
void *coordinate);
</programlisting>
Re-initialize the dynamic shared memory required for parallel operation
when the foreign-scan plan node is about to be re-scanned.
This function is optional, and can be omitted if not needed.
Recommended practice is that this function reset only shared state,
while the <function>ReScanForeignScan</> function resets only local
state. Currently, this function will be called
before <function>ReScanForeignScan</>, but it's best not to rely on
that ordering.
</para> </para>
<para> <para>
@@ -1249,10 +1269,9 @@ void
InitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc, InitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc,
void *coordinate); void *coordinate);
</programlisting> </programlisting>
Initialize a parallel worker's custom state based on the shared state Initialize a parallel worker's local state based on the shared state
set up in the leader by <literal>InitializeDSMForeignScan</>. set up by the leader during <function>InitializeDSMForeignScan</>.
This callback is optional, and needs only be supplied if this This function is optional, and can be omitted if not needed.
custom path supports parallel execution.
</para> </para>
<para> <para>

View File

@@ -1525,25 +1525,6 @@ heap_rescan(HeapScanDesc scan,
* reinitialize scan descriptor * reinitialize scan descriptor
*/ */
initscan(scan, key, true); initscan(scan, key, true);
/*
* reset parallel scan, if present
*/
if (scan->rs_parallel != NULL)
{
ParallelHeapScanDesc parallel_scan;
/*
* Caller is responsible for making sure that all workers have
* finished the scan before calling this, so it really shouldn't be
* necessary to acquire the mutex at all. We acquire it anyway, just
* to be tidy.
*/
parallel_scan = scan->rs_parallel;
SpinLockAcquire(&parallel_scan->phs_mutex);
parallel_scan->phs_cblock = parallel_scan->phs_startblock;
SpinLockRelease(&parallel_scan->phs_mutex);
}
} }
/* ---------------- /* ----------------
@@ -1640,6 +1621,25 @@ heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
SerializeSnapshot(snapshot, target->phs_snapshot_data); SerializeSnapshot(snapshot, target->phs_snapshot_data);
} }
/* ----------------
* heap_parallelscan_reinitialize - reset a parallel scan
*
* Call this in the leader process. Caller is responsible for
* making sure that all workers have finished the scan beforehand.
* ----------------
*/
void
heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan)
{
/*
* It shouldn't be necessary to acquire the mutex here, but we do it
* anyway, just to be tidy.
*/
SpinLockAcquire(&parallel_scan->phs_mutex);
parallel_scan->phs_cblock = parallel_scan->phs_startblock;
SpinLockRelease(&parallel_scan->phs_mutex);
}
/* ---------------- /* ----------------
* heap_beginscan_parallel - join a parallel scan * heap_beginscan_parallel - join a parallel scan
* *

View File

@@ -109,6 +109,8 @@ static bool ExecParallelInitializeDSM(PlanState *node,
ExecParallelInitializeDSMContext *d); ExecParallelInitializeDSMContext *d);
static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt, static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
bool reinitialize); bool reinitialize);
static bool ExecParallelReInitializeDSM(PlanState *planstate,
ParallelContext *pcxt);
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
SharedExecutorInstrumentation *instrumentation); SharedExecutorInstrumentation *instrumentation);
@@ -364,18 +366,6 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
return responseq; return responseq;
} }
/*
* Re-initialize the parallel executor info such that it can be reused by
* workers.
*/
void
ExecParallelReinitialize(ParallelExecutorInfo *pei)
{
ReinitializeParallelDSM(pei->pcxt);
pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
pei->finished = false;
}
/* /*
* Sets up the required infrastructure for backend workers to perform * Sets up the required infrastructure for backend workers to perform
* execution and return results to the main backend. * execution and return results to the main backend.
@@ -567,7 +557,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
ExecParallelInitializeDSM(planstate, &d); ExecParallelInitializeDSM(planstate, &d);
/* /*
* Make sure that the world hasn't shifted under our feat. This could * Make sure that the world hasn't shifted under our feet. This could
* probably just be an Assert(), but let's be conservative for now. * probably just be an Assert(), but let's be conservative for now.
*/ */
if (e.nnodes != d.nnodes) if (e.nnodes != d.nnodes)
@@ -577,6 +567,75 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
return pei; return pei;
} }
/*
* Re-initialize the parallel executor shared memory state before launching
* a fresh batch of workers.
*/
void
ExecParallelReinitialize(PlanState *planstate,
ParallelExecutorInfo *pei)
{
/* Old workers must already be shut down */
Assert(pei->finished);
ReinitializeParallelDSM(pei->pcxt);
pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
pei->finished = false;
/* Traverse plan tree and let each child node reset associated state. */
ExecParallelReInitializeDSM(planstate, pei->pcxt);
}
/*
* Traverse plan tree to reinitialize per-node dynamic shared memory state
*/
static bool
ExecParallelReInitializeDSM(PlanState *planstate,
ParallelContext *pcxt)
{
if (planstate == NULL)
return false;
/*
* Call reinitializers for DSM-using plan nodes.
*/
if (planstate->plan->parallel_aware)
{
switch (nodeTag(planstate))
{
case T_SeqScanState:
ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
pcxt);
break;
case T_IndexScanState:
ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
pcxt);
break;
case T_IndexOnlyScanState:
ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
pcxt);
break;
case T_ForeignScanState:
ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
pcxt);
break;
case T_CustomScanState:
ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
pcxt);
break;
case T_BitmapHeapScanState:
ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
pcxt);
break;
default:
break;
}
}
return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
}
/* /*
* Copy instrumentation information about this node and its descendants from * Copy instrumentation information about this node and its descendants from
* dynamic shared memory. * dynamic shared memory.

View File

@@ -705,23 +705,6 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
node->shared_tbmiterator = NULL; node->shared_tbmiterator = NULL;
node->shared_prefetch_iterator = NULL; node->shared_prefetch_iterator = NULL;
/* Reset parallel bitmap state, if present */
if (node->pstate)
{
dsa_area *dsa = node->ss.ps.state->es_query_dsa;
node->pstate->state = BM_INITIAL;
if (DsaPointerIsValid(node->pstate->tbmiterator))
tbm_free_shared_area(dsa, node->pstate->tbmiterator);
if (DsaPointerIsValid(node->pstate->prefetch_iterator))
tbm_free_shared_area(dsa, node->pstate->prefetch_iterator);
node->pstate->tbmiterator = InvalidDsaPointer;
node->pstate->prefetch_iterator = InvalidDsaPointer;
}
ExecScanReScan(&node->ss); ExecScanReScan(&node->ss);
/* /*
@@ -999,6 +982,31 @@ ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
node->pstate = pstate; node->pstate = pstate;
} }
/* ----------------------------------------------------------------
* ExecBitmapHeapReInitializeDSM
*
* Reset shared state before beginning a fresh scan.
* ----------------------------------------------------------------
*/
void
ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node,
ParallelContext *pcxt)
{
ParallelBitmapHeapState *pstate = node->pstate;
dsa_area *dsa = node->ss.ps.state->es_query_dsa;
pstate->state = BM_INITIAL;
if (DsaPointerIsValid(pstate->tbmiterator))
tbm_free_shared_area(dsa, pstate->tbmiterator);
if (DsaPointerIsValid(pstate->prefetch_iterator))
tbm_free_shared_area(dsa, pstate->prefetch_iterator);
pstate->tbmiterator = InvalidDsaPointer;
pstate->prefetch_iterator = InvalidDsaPointer;
}
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
* ExecBitmapHeapInitializeWorker * ExecBitmapHeapInitializeWorker
* *

View File

@@ -194,6 +194,21 @@ ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
} }
} }
void
ExecCustomScanReInitializeDSM(CustomScanState *node, ParallelContext *pcxt)
{
const CustomExecMethods *methods = node->methods;
if (methods->ReInitializeDSMCustomScan)
{
int plan_node_id = node->ss.ps.plan->plan_node_id;
void *coordinate;
coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
methods->ReInitializeDSMCustomScan(node, pcxt, coordinate);
}
}
void void
ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc) ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc)
{ {

View File

@@ -332,7 +332,28 @@ ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
} }
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
* ExecForeignScanInitializeDSM * ExecForeignScanReInitializeDSM
*
* Reset shared state before beginning a fresh scan.
* ----------------------------------------------------------------
*/
void
ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
{
FdwRoutine *fdwroutine = node->fdwroutine;
if (fdwroutine->ReInitializeDSMForeignScan)
{
int plan_node_id = node->ss.ps.plan->plan_node_id;
void *coordinate;
coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate);
}
}
/* ----------------------------------------------------------------
* ExecForeignScanInitializeWorker
* *
* Initialization according to the parallel coordination information * Initialization according to the parallel coordination information
* ---------------------------------------------------------------- * ----------------------------------------------------------------

View File

@@ -152,11 +152,14 @@ ExecGather(PlanState *pstate)
{ {
ParallelContext *pcxt; ParallelContext *pcxt;
/* Initialize the workers required to execute Gather node. */ /* Initialize, or re-initialize, shared state needed by workers. */
if (!node->pei) if (!node->pei)
node->pei = ExecInitParallelPlan(node->ps.lefttree, node->pei = ExecInitParallelPlan(node->ps.lefttree,
estate, estate,
gather->num_workers); gather->num_workers);
else
ExecParallelReinitialize(node->ps.lefttree,
node->pei);
/* /*
* Register backend workers. We might not get as many as we * Register backend workers. We might not get as many as we
@@ -424,7 +427,7 @@ ExecShutdownGather(GatherState *node)
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
* ExecReScanGather * ExecReScanGather
* *
* Re-initialize the workers and rescans a relation via them. * Prepare to re-scan the result of a Gather.
* ---------------------------------------------------------------- * ----------------------------------------------------------------
*/ */
void void
@@ -433,19 +436,12 @@ ExecReScanGather(GatherState *node)
Gather *gather = (Gather *) node->ps.plan; Gather *gather = (Gather *) node->ps.plan;
PlanState *outerPlan = outerPlanState(node); PlanState *outerPlan = outerPlanState(node);
/* /* Make sure any existing workers are gracefully shut down */
* Re-initialize the parallel workers to perform rescan of relation. We
* want to gracefully shutdown all the workers so that they should be able
* to propagate any error or other information to master backend before
* dying. Parallel context will be reused for rescan.
*/
ExecShutdownGatherWorkers(node); ExecShutdownGatherWorkers(node);
/* Mark node so that shared state will be rebuilt at next call */
node->initialized = false; node->initialized = false;
if (node->pei)
ExecParallelReinitialize(node->pei);
/* /*
* Set child node's chgParam to tell it that the next scan might deliver a * Set child node's chgParam to tell it that the next scan might deliver a
* different set of rows within the leader process. (The overall rowset * different set of rows within the leader process. (The overall rowset
@@ -457,10 +453,15 @@ ExecReScanGather(GatherState *node)
outerPlan->chgParam = bms_add_member(outerPlan->chgParam, outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
gather->rescan_param); gather->rescan_param);
/* /*
* if chgParam of subnode is not null then plan will be re-scanned by * If chgParam of subnode is not null then plan will be re-scanned by
* first ExecProcNode. * first ExecProcNode. Note: because this does nothing if we have a
* rescan_param, it's currently guaranteed that parallel-aware child nodes
* will not see a ReScan call until after they get a ReInitializeDSM call.
* That ordering might not be something to rely on, though. A good rule
* of thumb is that ReInitializeDSM should reset only shared state, ReScan
* should reset only local state, and anything that depends on both of
* those steps being finished must wait until the first ExecProcNode call.
*/ */
if (outerPlan->chgParam == NULL) if (outerPlan->chgParam == NULL)
ExecReScan(outerPlan); ExecReScan(outerPlan);

View File

@@ -186,11 +186,14 @@ ExecGatherMerge(PlanState *pstate)
{ {
ParallelContext *pcxt; ParallelContext *pcxt;
/* Initialize data structures for workers. */ /* Initialize, or re-initialize, shared state needed by workers. */
if (!node->pei) if (!node->pei)
node->pei = ExecInitParallelPlan(node->ps.lefttree, node->pei = ExecInitParallelPlan(node->ps.lefttree,
estate, estate,
gm->num_workers); gm->num_workers);
else
ExecParallelReinitialize(node->ps.lefttree,
node->pei);
/* Try to launch workers. */ /* Try to launch workers. */
pcxt = node->pei->pcxt; pcxt = node->pei->pcxt;
@@ -319,7 +322,7 @@ ExecShutdownGatherMergeWorkers(GatherMergeState *node)
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
* ExecReScanGatherMerge * ExecReScanGatherMerge
* *
* Re-initialize the workers and rescans a relation via them. * Prepare to re-scan the result of a GatherMerge.
* ---------------------------------------------------------------- * ----------------------------------------------------------------
*/ */
void void
@@ -328,20 +331,13 @@ ExecReScanGatherMerge(GatherMergeState *node)
GatherMerge *gm = (GatherMerge *) node->ps.plan; GatherMerge *gm = (GatherMerge *) node->ps.plan;
PlanState *outerPlan = outerPlanState(node); PlanState *outerPlan = outerPlanState(node);
/* /* Make sure any existing workers are gracefully shut down */
* Re-initialize the parallel workers to perform rescan of relation. We
* want to gracefully shutdown all the workers so that they should be able
* to propagate any error or other information to master backend before
* dying. Parallel context will be reused for rescan.
*/
ExecShutdownGatherMergeWorkers(node); ExecShutdownGatherMergeWorkers(node);
/* Mark node so that shared state will be rebuilt at next call */
node->initialized = false; node->initialized = false;
node->gm_initialized = false; node->gm_initialized = false;
if (node->pei)
ExecParallelReinitialize(node->pei);
/* /*
* Set child node's chgParam to tell it that the next scan might deliver a * Set child node's chgParam to tell it that the next scan might deliver a
* different set of rows within the leader process. (The overall rowset * different set of rows within the leader process. (The overall rowset
@@ -353,10 +349,15 @@ ExecReScanGatherMerge(GatherMergeState *node)
outerPlan->chgParam = bms_add_member(outerPlan->chgParam, outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
gm->rescan_param); gm->rescan_param);
/* /*
* if chgParam of subnode is not null then plan will be re-scanned by * If chgParam of subnode is not null then plan will be re-scanned by
* first ExecProcNode. * first ExecProcNode. Note: because this does nothing if we have a
* rescan_param, it's currently guaranteed that parallel-aware child nodes
* will not see a ReScan call until after they get a ReInitializeDSM call.
* That ordering might not be something to rely on, though. A good rule
* of thumb is that ReInitializeDSM should reset only shared state, ReScan
* should reset only local state, and anything that depends on both of
* those steps being finished must wait until the first ExecProcNode call.
*/ */
if (outerPlan->chgParam == NULL) if (outerPlan->chgParam == NULL)
ExecReScan(outerPlan); ExecReScan(outerPlan);

View File

@@ -25,6 +25,7 @@
* parallel index-only scan * parallel index-only scan
* ExecIndexOnlyScanInitializeDSM initialize DSM for parallel * ExecIndexOnlyScanInitializeDSM initialize DSM for parallel
* index-only scan * index-only scan
* ExecIndexOnlyScanReInitializeDSM reinitialize DSM for fresh scan
* ExecIndexOnlyScanInitializeWorker attach to DSM info in parallel worker * ExecIndexOnlyScanInitializeWorker attach to DSM info in parallel worker
*/ */
#include "postgres.h" #include "postgres.h"
@@ -336,16 +337,6 @@ ExecIndexOnlyScan(PlanState *pstate)
void void
ExecReScanIndexOnlyScan(IndexOnlyScanState *node) ExecReScanIndexOnlyScan(IndexOnlyScanState *node)
{ {
bool reset_parallel_scan = true;
/*
* If we are here to just update the scan keys, then don't reset parallel
* scan. For detailed reason behind this look in the comments for
* ExecReScanIndexScan.
*/
if (node->ioss_NumRuntimeKeys != 0 && !node->ioss_RuntimeKeysReady)
reset_parallel_scan = false;
/* /*
* If we are doing runtime key calculations (ie, any of the index key * If we are doing runtime key calculations (ie, any of the index key
* values weren't simple Consts), compute the new key values. But first, * values weren't simple Consts), compute the new key values. But first,
@@ -366,15 +357,10 @@ ExecReScanIndexOnlyScan(IndexOnlyScanState *node)
/* reset index scan */ /* reset index scan */
if (node->ioss_ScanDesc) if (node->ioss_ScanDesc)
{
index_rescan(node->ioss_ScanDesc, index_rescan(node->ioss_ScanDesc,
node->ioss_ScanKeys, node->ioss_NumScanKeys, node->ioss_ScanKeys, node->ioss_NumScanKeys,
node->ioss_OrderByKeys, node->ioss_NumOrderByKeys); node->ioss_OrderByKeys, node->ioss_NumOrderByKeys);
if (reset_parallel_scan && node->ioss_ScanDesc->parallel_scan)
index_parallelrescan(node->ioss_ScanDesc);
}
ExecScanReScan(&node->ss); ExecScanReScan(&node->ss);
} }
@@ -671,6 +657,19 @@ ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node,
node->ioss_OrderByKeys, node->ioss_NumOrderByKeys); node->ioss_OrderByKeys, node->ioss_NumOrderByKeys);
} }
/* ----------------------------------------------------------------
* ExecIndexOnlyScanReInitializeDSM
*
* Reset shared state before beginning a fresh scan.
* ----------------------------------------------------------------
*/
void
ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node,
ParallelContext *pcxt)
{
index_parallelrescan(node->ioss_ScanDesc);
}
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
* ExecIndexOnlyScanInitializeWorker * ExecIndexOnlyScanInitializeWorker
* *

View File

@@ -24,6 +24,7 @@
* ExecIndexRestrPos restores scan position. * ExecIndexRestrPos restores scan position.
* ExecIndexScanEstimate estimates DSM space needed for parallel index scan * ExecIndexScanEstimate estimates DSM space needed for parallel index scan
* ExecIndexScanInitializeDSM initialize DSM for parallel indexscan * ExecIndexScanInitializeDSM initialize DSM for parallel indexscan
* ExecIndexScanReInitializeDSM reinitialize DSM for fresh scan
* ExecIndexScanInitializeWorker attach to DSM info in parallel worker * ExecIndexScanInitializeWorker attach to DSM info in parallel worker
*/ */
#include "postgres.h" #include "postgres.h"
@@ -577,18 +578,6 @@ ExecIndexScan(PlanState *pstate)
void void
ExecReScanIndexScan(IndexScanState *node) ExecReScanIndexScan(IndexScanState *node)
{ {
bool reset_parallel_scan = true;
/*
* If we are here to just update the scan keys, then don't reset parallel
* scan. We don't want each of the participating process in the parallel
* scan to update the shared parallel scan state at the start of the scan.
* It is quite possible that one of the participants has already begun
* scanning the index when another has yet to start it.
*/
if (node->iss_NumRuntimeKeys != 0 && !node->iss_RuntimeKeysReady)
reset_parallel_scan = false;
/* /*
* If we are doing runtime key calculations (ie, any of the index key * If we are doing runtime key calculations (ie, any of the index key
* values weren't simple Consts), compute the new key values. But first, * values weren't simple Consts), compute the new key values. But first,
@@ -614,21 +603,11 @@ ExecReScanIndexScan(IndexScanState *node)
reorderqueue_pop(node); reorderqueue_pop(node);
} }
/* /* reset index scan */
* Reset (parallel) index scan. For parallel-aware nodes, the scan
* descriptor is initialized during actual execution of node and we can
* reach here before that (ex. during execution of nest loop join). So,
* avoid updating the scan descriptor at that time.
*/
if (node->iss_ScanDesc) if (node->iss_ScanDesc)
{
index_rescan(node->iss_ScanDesc, index_rescan(node->iss_ScanDesc,
node->iss_ScanKeys, node->iss_NumScanKeys, node->iss_ScanKeys, node->iss_NumScanKeys,
node->iss_OrderByKeys, node->iss_NumOrderByKeys); node->iss_OrderByKeys, node->iss_NumOrderByKeys);
if (reset_parallel_scan && node->iss_ScanDesc->parallel_scan)
index_parallelrescan(node->iss_ScanDesc);
}
node->iss_ReachedEnd = false; node->iss_ReachedEnd = false;
ExecScanReScan(&node->ss); ExecScanReScan(&node->ss);
@@ -1716,6 +1695,19 @@ ExecIndexScanInitializeDSM(IndexScanState *node,
node->iss_OrderByKeys, node->iss_NumOrderByKeys); node->iss_OrderByKeys, node->iss_NumOrderByKeys);
} }
/* ----------------------------------------------------------------
* ExecIndexScanReInitializeDSM
*
* Reset shared state before beginning a fresh scan.
* ----------------------------------------------------------------
*/
void
ExecIndexScanReInitializeDSM(IndexScanState *node,
ParallelContext *pcxt)
{
index_parallelrescan(node->iss_ScanDesc);
}
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
* ExecIndexScanInitializeWorker * ExecIndexScanInitializeWorker
* *

View File

@@ -22,6 +22,7 @@
* *
* ExecSeqScanEstimate estimates DSM space needed for parallel scan * ExecSeqScanEstimate estimates DSM space needed for parallel scan
* ExecSeqScanInitializeDSM initialize DSM for parallel scan * ExecSeqScanInitializeDSM initialize DSM for parallel scan
* ExecSeqScanReInitializeDSM reinitialize DSM for fresh parallel scan
* ExecSeqScanInitializeWorker attach to DSM info in parallel worker * ExecSeqScanInitializeWorker attach to DSM info in parallel worker
*/ */
#include "postgres.h" #include "postgres.h"
@@ -324,6 +325,21 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
heap_beginscan_parallel(node->ss.ss_currentRelation, pscan); heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
} }
/* ----------------------------------------------------------------
* ExecSeqScanReInitializeDSM
*
* Reset shared state before beginning a fresh scan.
* ----------------------------------------------------------------
*/
void
ExecSeqScanReInitializeDSM(SeqScanState *node,
ParallelContext *pcxt)
{
HeapScanDesc scan = node->ss.ss_currentScanDesc;
heap_parallelscan_reinitialize(scan->rs_parallel);
}
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
* ExecSeqScanInitializeWorker * ExecSeqScanInitializeWorker
* *

View File

@@ -130,6 +130,7 @@ extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction);
extern Size heap_parallelscan_estimate(Snapshot snapshot); extern Size heap_parallelscan_estimate(Snapshot snapshot);
extern void heap_parallelscan_initialize(ParallelHeapScanDesc target, extern void heap_parallelscan_initialize(ParallelHeapScanDesc target,
Relation relation, Snapshot snapshot); Relation relation, Snapshot snapshot);
extern void heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan);
extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc); extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc);
extern bool heap_fetch(Relation relation, Snapshot snapshot, extern bool heap_fetch(Relation relation, Snapshot snapshot,

View File

@@ -36,7 +36,8 @@ extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
EState *estate, int nworkers); EState *estate, int nworkers);
extern void ExecParallelFinish(ParallelExecutorInfo *pei); extern void ExecParallelFinish(ParallelExecutorInfo *pei);
extern void ExecParallelCleanup(ParallelExecutorInfo *pei); extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
extern void ExecParallelReinitialize(ParallelExecutorInfo *pei); extern void ExecParallelReinitialize(PlanState *planstate,
ParallelExecutorInfo *pei);
extern void ParallelQueryMain(dsm_segment *seg, shm_toc *toc); extern void ParallelQueryMain(dsm_segment *seg, shm_toc *toc);

View File

@@ -24,6 +24,8 @@ extern void ExecBitmapHeapEstimate(BitmapHeapScanState *node,
ParallelContext *pcxt); ParallelContext *pcxt);
extern void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node, extern void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
ParallelContext *pcxt); ParallelContext *pcxt);
extern void ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node,
ParallelContext *pcxt);
extern void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, extern void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node,
shm_toc *toc); shm_toc *toc);

View File

@@ -34,6 +34,8 @@ extern void ExecCustomScanEstimate(CustomScanState *node,
ParallelContext *pcxt); ParallelContext *pcxt);
extern void ExecCustomScanInitializeDSM(CustomScanState *node, extern void ExecCustomScanInitializeDSM(CustomScanState *node,
ParallelContext *pcxt); ParallelContext *pcxt);
extern void ExecCustomScanReInitializeDSM(CustomScanState *node,
ParallelContext *pcxt);
extern void ExecCustomScanInitializeWorker(CustomScanState *node, extern void ExecCustomScanInitializeWorker(CustomScanState *node,
shm_toc *toc); shm_toc *toc);
extern void ExecShutdownCustomScan(CustomScanState *node); extern void ExecShutdownCustomScan(CustomScanState *node);

View File

@@ -25,6 +25,8 @@ extern void ExecForeignScanEstimate(ForeignScanState *node,
ParallelContext *pcxt); ParallelContext *pcxt);
extern void ExecForeignScanInitializeDSM(ForeignScanState *node, extern void ExecForeignScanInitializeDSM(ForeignScanState *node,
ParallelContext *pcxt); ParallelContext *pcxt);
extern void ExecForeignScanReInitializeDSM(ForeignScanState *node,
ParallelContext *pcxt);
extern void ExecForeignScanInitializeWorker(ForeignScanState *node, extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
shm_toc *toc); shm_toc *toc);
extern void ExecShutdownForeignScan(ForeignScanState *node); extern void ExecShutdownForeignScan(ForeignScanState *node);

View File

@@ -28,6 +28,8 @@ extern void ExecIndexOnlyScanEstimate(IndexOnlyScanState *node,
ParallelContext *pcxt); ParallelContext *pcxt);
extern void ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node, extern void ExecIndexOnlyScanInitializeDSM(IndexOnlyScanState *node,
ParallelContext *pcxt); ParallelContext *pcxt);
extern void ExecIndexOnlyScanReInitializeDSM(IndexOnlyScanState *node,
ParallelContext *pcxt);
extern void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, extern void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node,
shm_toc *toc); shm_toc *toc);

View File

@@ -24,6 +24,7 @@ extern void ExecIndexRestrPos(IndexScanState *node);
extern void ExecReScanIndexScan(IndexScanState *node); extern void ExecReScanIndexScan(IndexScanState *node);
extern void ExecIndexScanEstimate(IndexScanState *node, ParallelContext *pcxt); extern void ExecIndexScanEstimate(IndexScanState *node, ParallelContext *pcxt);
extern void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pcxt); extern void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pcxt);
extern void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt);
extern void ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc); extern void ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc);
/* /*

View File

@@ -24,6 +24,7 @@ extern void ExecReScanSeqScan(SeqScanState *node);
/* parallel scan support */ /* parallel scan support */
extern void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt); extern void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt);
extern void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt); extern void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
extern void ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt);
extern void ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc); extern void ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc);
#endif /* NODESEQSCAN_H */ #endif /* NODESEQSCAN_H */

View File

@@ -148,6 +148,9 @@ typedef Size (*EstimateDSMForeignScan_function) (ForeignScanState *node,
typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node, typedef void (*InitializeDSMForeignScan_function) (ForeignScanState *node,
ParallelContext *pcxt, ParallelContext *pcxt,
void *coordinate); void *coordinate);
typedef void (*ReInitializeDSMForeignScan_function) (ForeignScanState *node,
ParallelContext *pcxt,
void *coordinate);
typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node, typedef void (*InitializeWorkerForeignScan_function) (ForeignScanState *node,
shm_toc *toc, shm_toc *toc,
void *coordinate); void *coordinate);
@@ -224,6 +227,7 @@ typedef struct FdwRoutine
IsForeignScanParallelSafe_function IsForeignScanParallelSafe; IsForeignScanParallelSafe_function IsForeignScanParallelSafe;
EstimateDSMForeignScan_function EstimateDSMForeignScan; EstimateDSMForeignScan_function EstimateDSMForeignScan;
InitializeDSMForeignScan_function InitializeDSMForeignScan; InitializeDSMForeignScan_function InitializeDSMForeignScan;
ReInitializeDSMForeignScan_function ReInitializeDSMForeignScan;
InitializeWorkerForeignScan_function InitializeWorkerForeignScan; InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
ShutdownForeignScan_function ShutdownForeignScan; ShutdownForeignScan_function ShutdownForeignScan;
} FdwRoutine; } FdwRoutine;

View File

@@ -136,6 +136,9 @@ typedef struct CustomExecMethods
void (*InitializeDSMCustomScan) (CustomScanState *node, void (*InitializeDSMCustomScan) (CustomScanState *node,
ParallelContext *pcxt, ParallelContext *pcxt,
void *coordinate); void *coordinate);
void (*ReInitializeDSMCustomScan) (CustomScanState *node,
ParallelContext *pcxt,
void *coordinate);
void (*InitializeWorkerCustomScan) (CustomScanState *node, void (*InitializeWorkerCustomScan) (CustomScanState *node,
shm_toc *toc, shm_toc *toc,
void *coordinate); void *coordinate);