postgres/src/backend/executor/nodeAppend.c
Commit 141225b251 by Tom Lane: Mop up some undue familiarity with the innards of Bitmapsets.
nodeAppend.c used non-nullness of appendstate->as_valid_subplans as
a state flag to indicate whether it'd done ExecFindMatchingSubPlans
(or some sufficient approximation to that).  This was pretty
questionable even in the beginning, since it wouldn't really work
right if there are no valid subplans.  It got more questionable
after commit 27e1f1456 added logic that could reduce as_valid_subplans
to an empty set: at that point we were depending on unspecified
behavior of bms_del_members, namely that it'd not return an empty
set as NULL.  It's about to start doing that, which breaks this
logic entirely.  Hence, add a separate boolean flag to signal
whether as_valid_subplans has been computed.
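
As a minimal sketch of the change (the "after" form matches the code
below; the "before" form paraphrases the old logic rather than quoting
it):

    /* Before: non-nullness doubled as the "already computed" flag,
     * which misbehaves once an empty set may be represented as NULL. */
    if (node->as_valid_subplans == NULL)
        node->as_valid_subplans =
            ExecFindMatchingSubPlans(node->as_prune_state, false);

    /* After: a separate boolean records that the set was computed,
     * independently of whether it came out empty. */
    if (!node->as_valid_subplans_identified)
    {
        node->as_valid_subplans =
            ExecFindMatchingSubPlans(node->as_prune_state, false);
        node->as_valid_subplans_identified = true;
    }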

Also fix a previously-cosmetic bug in nodeAgg.c, wherein it ignored
the return value of bms_del_member instead of updating its pointer.
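
A sketch of that fix, using hypothetical names ('cols', 'attnum'); the
point is that Bitmapset functions may modify, recycle, or free their
input, so the caller must adopt the returned pointer:

    bms_del_member(cols, attnum);        /* wrong: result discarded */
    cols = bms_del_member(cols, attnum); /* right: keep returned set */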

Patch by me; thanks to Nathan Bossart and Richard Guo for review.

Discussion: https://postgr.es/m/1159933.1677621588@sss.pgh.pa.us
2023-03-02 11:37:37 -05:00


/*-------------------------------------------------------------------------
*
* nodeAppend.c
* routines to handle append nodes.
*
* Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/executor/nodeAppend.c
*
*-------------------------------------------------------------------------
*/
/* INTERFACE ROUTINES
* ExecInitAppend - initialize the append node
* ExecAppend - retrieve the next tuple from the node
* ExecEndAppend - shut down the append node
* ExecReScanAppend - rescan the append node
*
* NOTES
* Each append node contains a list of one or more subplans which
* must be iteratively processed (forwards or backwards).
* Tuples are retrieved by executing the 'whichplan'th subplan
* until the subplan stops returning tuples, at which point that
* plan is shut down and the next started up.
*
* Append nodes don't make use of their left and right
* subtrees; rather, they maintain a list of subplans, so
* a typical append node looks like this in the plan tree:
*
*                              ...
*                              /
*                           Append -------+------+------+--- nil
*                           /   \         |      |      |
*                         nil   nil      ...    ...    ...
*                                            subplans
*
* Append nodes are currently used for unions, and to support
* inheritance queries, where several relations need to be scanned.
* For example, in our standard person/student/employee/student-emp
* example, where student and employee inherit from person
* and student-emp inherits from student and employee, the
* query:
*
* select name from person
*
* generates the plan:
*
*                  |
*               Append -------+-------+--------+--------+
*               /   \         |       |        |        |
*             nil   nil      Scan    Scan     Scan     Scan
*                             |       |        |        |
*                          person employee student student-emp
*/
#include "postgres.h"
#include "executor/execAsync.h"
#include "executor/execdebug.h"
#include "executor/execPartition.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/latch.h"
/* Shared state for parallel-aware Append. */
struct ParallelAppendState
{
LWLock pa_lock; /* mutual exclusion to choose next subplan */
int pa_next_plan; /* next plan to choose by any worker */
/*
* pa_finished[i] should be true if no more workers should select subplan
* i.  For a non-partial plan, this should be set to true as soon as a
* worker selects the plan; for a partial plan, it remains false until
* some worker executes the plan to completion.
*/
bool pa_finished[FLEXIBLE_ARRAY_MEMBER];
};
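/* INVALID_SUBPLAN_INDEX marks "no subplan currently chosen" (it is also
 * stored in pa_next_plan when nothing is left); EVENT_BUFFER_SIZE bounds
 * how many wait events one ExecAppendAsyncEventWait call collects. */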
#define INVALID_SUBPLAN_INDEX -1
#define EVENT_BUFFER_SIZE 16
static TupleTableSlot *ExecAppend(PlanState *pstate);
static bool choose_next_subplan_locally(AppendState *node);
static bool choose_next_subplan_for_leader(AppendState *node);
static bool choose_next_subplan_for_worker(AppendState *node);
static void mark_invalid_subplans_as_finished(AppendState *node);
static void ExecAppendAsyncBegin(AppendState *node);
static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
static void ExecAppendAsyncEventWait(AppendState *node);
static void classify_matching_subplans(AppendState *node);
/* ----------------------------------------------------------------
* ExecInitAppend
*
* Begin all of the subscans of the append node.
*
* (This is potentially wasteful, since the entire result of the
* append node may not be scanned, but this way all of the
* structures get allocated in the executor's top level memory
* block instead of that of the call to ExecAppend.)
* ----------------------------------------------------------------
*/
AppendState *
ExecInitAppend(Append *node, EState *estate, int eflags)
{
AppendState *appendstate = makeNode(AppendState);
PlanState **appendplanstates;
Bitmapset *validsubplans;
Bitmapset *asyncplans;
int nplans;
int nasyncplans;
int firstvalid;
int i,
j;
/* check for unsupported flags */
Assert(!(eflags & EXEC_FLAG_MARK));
/*
* create new AppendState for our append node
*/
appendstate->ps.plan = (Plan *) node;
appendstate->ps.state = estate;
appendstate->ps.ExecProcNode = ExecAppend;
/* Let choose_next_subplan_* function handle setting the first subplan */
appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
appendstate->as_syncdone = false;
appendstate->as_begun = false;
/* If run-time partition pruning is enabled, then set that up now */
if (node->part_prune_index >= 0)
{
PartitionPruneState *prunestate;
/*
* Set up pruning data structure. This also initializes the set of
* subplans to initialize (validsubplans) by taking into account the
* result of performing initial pruning if any.
*/
prunestate = ExecInitPartitionPruning(&appendstate->ps,
list_length(node->appendplans),
node->part_prune_index,
node->apprelids,
&validsubplans);
appendstate->as_prune_state = prunestate;
nplans = bms_num_members(validsubplans);
/*
* When no run-time pruning is required and there's at least one
* subplan, we can fill as_valid_subplans immediately, preventing
* later calls to ExecFindMatchingSubPlans.
*/
if (!prunestate->do_exec_prune && nplans > 0)
{
appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
appendstate->as_valid_subplans_identified = true;
}
}
else
{
nplans = list_length(node->appendplans);
/*
* When run-time partition pruning is not enabled we can just mark all
* subplans as valid; they must also all be initialized.
*/
Assert(nplans > 0);
appendstate->as_valid_subplans = validsubplans =
bms_add_range(NULL, 0, nplans - 1);
appendstate->as_valid_subplans_identified = true;
appendstate->as_prune_state = NULL;
}
/*
* Initialize result tuple type and slot.
*/
ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
/* node returns slots from each of its subnodes, therefore not fixed */
appendstate->ps.resultopsset = true;
appendstate->ps.resultopsfixed = false;
appendplanstates = (PlanState **) palloc(nplans *
sizeof(PlanState *));
/*
* call ExecInitNode on each of the valid plans to be executed and save
* the results into the appendplanstates array.
*
* While at it, find out the first valid partial plan.
*/
j = 0;
asyncplans = NULL;
nasyncplans = 0;
firstvalid = nplans;
i = -1;
while ((i = bms_next_member(validsubplans, i)) >= 0)
{
Plan *initNode = (Plan *) list_nth(node->appendplans, i);
/*
* Record async subplans. When executing EvalPlanQual, we treat them
* as sync ones; don't do this when initializing an EvalPlanQual plan
* tree.
*/
if (initNode->async_capable && estate->es_epq_active == NULL)
{
asyncplans = bms_add_member(asyncplans, j);
nasyncplans++;
}
/*
* Record the lowest appendplans index which is a valid partial plan.
*/
if (i >= node->first_partial_plan && j < firstvalid)
firstvalid = j;
appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
}
appendstate->as_first_partial_plan = firstvalid;
appendstate->appendplans = appendplanstates;
appendstate->as_nplans = nplans;
/* Initialize async state */
appendstate->as_asyncplans = asyncplans;
appendstate->as_nasyncplans = nasyncplans;
appendstate->as_asyncrequests = NULL;
appendstate->as_asyncresults = NULL;
appendstate->as_nasyncresults = 0;
appendstate->as_nasyncremain = 0;
appendstate->as_needrequest = NULL;
appendstate->as_eventset = NULL;
appendstate->as_valid_asyncplans = NULL;
if (nasyncplans > 0)
{
appendstate->as_asyncrequests = (AsyncRequest **)
palloc0(nplans * sizeof(AsyncRequest *));
i = -1;
while ((i = bms_next_member(asyncplans, i)) >= 0)
{
AsyncRequest *areq;
areq = palloc(sizeof(AsyncRequest));
areq->requestor = (PlanState *) appendstate;
areq->requestee = appendplanstates[i];
areq->request_index = i;
areq->callback_pending = false;
areq->request_complete = false;
areq->result = NULL;
appendstate->as_asyncrequests[i] = areq;
}
appendstate->as_asyncresults = (TupleTableSlot **)
palloc0(nasyncplans * sizeof(TupleTableSlot *));
if (appendstate->as_valid_subplans_identified)
classify_matching_subplans(appendstate);
}
/*
* Miscellaneous initialization
*/
appendstate->ps.ps_ProjInfo = NULL;
/* For parallel query, this will be overridden later. */
appendstate->choose_next_subplan = choose_next_subplan_locally;
return appendstate;
}
/* ----------------------------------------------------------------
* ExecAppend
*
* Handles iteration over multiple subplans.
* ----------------------------------------------------------------
*/
static TupleTableSlot *
ExecAppend(PlanState *pstate)
{
AppendState *node = castNode(AppendState, pstate);
TupleTableSlot *result;
/*
* If this is the first call after Init or ReScan, we need to do the
* initialization work.
*/
if (!node->as_begun)
{
Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
Assert(!node->as_syncdone);
/* Nothing to do if there are no subplans */
if (node->as_nplans == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
/* If there are any async subplans, begin executing them. */
if (node->as_nasyncplans > 0)
ExecAppendAsyncBegin(node);
/*
* If no sync subplan has been chosen, we must choose one before
* proceeding.
*/
if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
Assert(node->as_syncdone ||
(node->as_whichplan >= 0 &&
node->as_whichplan < node->as_nplans));
/* And we're initialized. */
node->as_begun = true;
}
for (;;)
{
PlanState *subnode;
CHECK_FOR_INTERRUPTS();
/*
* try to get a tuple from an async subplan if any
*/
if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
{
if (ExecAppendAsyncGetNext(node, &result))
return result;
Assert(!node->as_syncdone);
Assert(bms_is_empty(node->as_needrequest));
}
/*
* figure out which sync subplan we are currently processing
*/
Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
subnode = node->appendplans[node->as_whichplan];
/*
* get a tuple from the subplan
*/
result = ExecProcNode(subnode);
if (!TupIsNull(result))
{
/*
* If the subplan gave us something then return it as-is. We do
* NOT make use of the result slot that was set up in
* ExecInitAppend; there's no need for it.
*/
return result;
}
/*
* wait or poll for async events if any. We do this before checking
* for the end of iteration, because it might drain the remaining
* async subplans.
*/
if (node->as_nasyncremain > 0)
ExecAppendAsyncEventWait(node);
/* choose new sync subplan; if no sync/async subplans, we're done */
if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
}
}
/* ----------------------------------------------------------------
* ExecEndAppend
*
* Shuts down the subscans of the append node.
*
* Returns nothing of interest.
* ----------------------------------------------------------------
*/
void
ExecEndAppend(AppendState *node)
{
PlanState **appendplans;
int nplans;
int i;
/*
* get information from the node
*/
appendplans = node->appendplans;
nplans = node->as_nplans;
/*
* shut down each of the subscans
*/
for (i = 0; i < nplans; i++)
ExecEndNode(appendplans[i]);
}
void
ExecReScanAppend(AppendState *node)
{
int nasyncplans = node->as_nasyncplans;
int i;
/*
* If any PARAM_EXEC Params used in pruning expressions have changed, then
* we'd better unset the valid subplans so that they are reselected for
* the new parameter values.
*/
if (node->as_prune_state &&
bms_overlap(node->ps.chgParam,
node->as_prune_state->execparamids))
{
node->as_valid_subplans_identified = false;
bms_free(node->as_valid_subplans);
node->as_valid_subplans = NULL;
bms_free(node->as_valid_asyncplans);
node->as_valid_asyncplans = NULL;
}
for (i = 0; i < node->as_nplans; i++)
{
PlanState *subnode = node->appendplans[i];
/*
* ExecReScan doesn't know about my subplans, so I have to do
* changed-parameter signaling myself.
*/
if (node->ps.chgParam != NULL)
UpdateChangedParamSet(subnode, node->ps.chgParam);
/*
* If chgParam of subnode is not null, then the plan will be re-scanned
* by the first ExecProcNode or by the first ExecAsyncRequest.
*/
if (subnode->chgParam == NULL)
ExecReScan(subnode);
}
/* Reset async state */
if (nasyncplans > 0)
{
i = -1;
while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
{
AsyncRequest *areq = node->as_asyncrequests[i];
areq->callback_pending = false;
areq->request_complete = false;
areq->result = NULL;
}
node->as_nasyncresults = 0;
node->as_nasyncremain = 0;
bms_free(node->as_needrequest);
node->as_needrequest = NULL;
}
/* Let choose_next_subplan_* function handle setting the first subplan */
node->as_whichplan = INVALID_SUBPLAN_INDEX;
node->as_syncdone = false;
node->as_begun = false;
}
/* ----------------------------------------------------------------
* Parallel Append Support
* ----------------------------------------------------------------
*/
/* ----------------------------------------------------------------
* ExecAppendEstimate
*
* Compute the amount of space we'll need in the parallel
* query DSM, and inform pcxt->estimator about our needs.
* ----------------------------------------------------------------
*/
void
ExecAppendEstimate(AppendState *node,
ParallelContext *pcxt)
{
node->pstate_len =
add_size(offsetof(ParallelAppendState, pa_finished),
sizeof(bool) * node->as_nplans);
shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
/* ----------------------------------------------------------------
* ExecAppendInitializeDSM
*
* Set up shared state for Parallel Append.
* ----------------------------------------------------------------
*/
void
ExecAppendInitializeDSM(AppendState *node,
ParallelContext *pcxt)
{
ParallelAppendState *pstate;
pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
memset(pstate, 0, node->pstate_len);
LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
node->as_pstate = pstate;
node->choose_next_subplan = choose_next_subplan_for_leader;
}
/* ----------------------------------------------------------------
* ExecAppendReInitializeDSM
*
* Reset shared state before beginning a fresh scan.
* ----------------------------------------------------------------
*/
void
ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
{
ParallelAppendState *pstate = node->as_pstate;
pstate->pa_next_plan = 0;
memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
}
/* ----------------------------------------------------------------
* ExecAppendInitializeWorker
*
* Copy relevant information from TOC into planstate, and initialize
* whatever is required to choose and execute the optimal subplan.
* ----------------------------------------------------------------
*/
void
ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
{
node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
node->choose_next_subplan = choose_next_subplan_for_worker;
}
/* ----------------------------------------------------------------
* choose_next_subplan_locally
*
* Choose next sync subplan for a non-parallel-aware Append,
* returning false if there are no more.
* ----------------------------------------------------------------
*/
static bool
choose_next_subplan_locally(AppendState *node)
{
int whichplan = node->as_whichplan;
int nextplan;
/* We should never be called when there are no subplans */
Assert(node->as_nplans > 0);
/* Nothing to do if syncdone */
if (node->as_syncdone)
return false;
/*
* If first call then have the bms member function choose the first valid
* sync subplan by initializing whichplan to -1. If there happen to be no
* valid sync subplans then the bms member function will handle that by
* returning a negative number which will allow us to exit returning a
* false value.
*/
if (whichplan == INVALID_SUBPLAN_INDEX)
{
if (node->as_nasyncplans > 0)
{
/* We'd have filled as_valid_subplans already */
Assert(node->as_valid_subplans_identified);
}
else if (!node->as_valid_subplans_identified)
{
node->as_valid_subplans =
ExecFindMatchingSubPlans(node->as_prune_state, false);
node->as_valid_subplans_identified = true;
}
whichplan = -1;
}
/* Ensure whichplan is within the expected range */
Assert(whichplan >= -1 && whichplan <= node->as_nplans);
if (ScanDirectionIsForward(node->ps.state->es_direction))
nextplan = bms_next_member(node->as_valid_subplans, whichplan);
else
nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
if (nextplan < 0)
{
/* Set as_syncdone if in async mode */
if (node->as_nasyncplans > 0)
node->as_syncdone = true;
return false;
}
node->as_whichplan = nextplan;
return true;
}
/* ----------------------------------------------------------------
* choose_next_subplan_for_leader
*
* Try to pick a plan which doesn't commit us to doing much
* work locally, so that as much work as possible is done in
* the workers. Cheapest subplans are at the end.
* ----------------------------------------------------------------
*/
static bool
choose_next_subplan_for_leader(AppendState *node)
{
ParallelAppendState *pstate = node->as_pstate;
/* Backward scan is not supported by parallel-aware plans */
Assert(ScanDirectionIsForward(node->ps.state->es_direction));
/* We should never be called when there are no subplans */
Assert(node->as_nplans > 0);
LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
{
/* Mark just-completed subplan as finished. */
node->as_pstate->pa_finished[node->as_whichplan] = true;
}
else
{
/* Start with last subplan. */
node->as_whichplan = node->as_nplans - 1;
/*
* If we've yet to determine the valid subplans then do so now. If
* run-time pruning is disabled then the valid subplans will always be
* set to all subplans.
*/
if (!node->as_valid_subplans_identified)
{
node->as_valid_subplans =
ExecFindMatchingSubPlans(node->as_prune_state, false);
node->as_valid_subplans_identified = true;
/*
* Mark each invalid plan as finished to allow the loop below to
* select the first valid subplan.
*/
mark_invalid_subplans_as_finished(node);
}
}
/* Loop until we find a subplan to execute. */
while (pstate->pa_finished[node->as_whichplan])
{
if (node->as_whichplan == 0)
{
pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
node->as_whichplan = INVALID_SUBPLAN_INDEX;
LWLockRelease(&pstate->pa_lock);
return false;
}
/*
* We needn't pay attention to as_valid_subplans here as all invalid
* plans have been marked as finished.
*/
node->as_whichplan--;
}
/* If non-partial, immediately mark as finished. */
if (node->as_whichplan < node->as_first_partial_plan)
node->as_pstate->pa_finished[node->as_whichplan] = true;
LWLockRelease(&pstate->pa_lock);
return true;
}
/* ----------------------------------------------------------------
* choose_next_subplan_for_worker
*
* Choose next subplan for a parallel-aware Append, returning
* false if there are no more.
*
* We start from the first plan and advance through the list;
* when we get back to the end, we loop back to the first
* partial plan. This assigns the non-partial plans first in
* order of descending cost and then spreads out the workers
* as evenly as possible across the remaining partial plans.
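*
* For example (hypothetical layout): with non-partial subplans {0,1}
* and partial subplans {2,3} (so as_first_partial_plan == 2), workers
* claim subplans 0 and 1 once each, marking each finished immediately,
* then cycle 2 -> 3 -> 2 -> ... until both partial subplans have been
* executed to completion.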
* ----------------------------------------------------------------
*/
static bool
choose_next_subplan_for_worker(AppendState *node)
{
ParallelAppendState *pstate = node->as_pstate;
/* Backward scan is not supported by parallel-aware plans */
Assert(ScanDirectionIsForward(node->ps.state->es_direction));
/* We should never be called when there are no subplans */
Assert(node->as_nplans > 0);
LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
/* Mark just-completed subplan as finished. */
if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
node->as_pstate->pa_finished[node->as_whichplan] = true;
/*
* If we've yet to determine the valid subplans then do so now. If
* run-time pruning is disabled then the valid subplans will always be set
* to all subplans.
*/
else if (!node->as_valid_subplans_identified)
{
node->as_valid_subplans =
ExecFindMatchingSubPlans(node->as_prune_state, false);
node->as_valid_subplans_identified = true;
mark_invalid_subplans_as_finished(node);
}
/* If all the plans are already done, we have nothing to do */
if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
{
LWLockRelease(&pstate->pa_lock);
return false;
}
/* Save the plan from which we are starting the search. */
node->as_whichplan = pstate->pa_next_plan;
/* Loop until we find a valid subplan to execute. */
while (pstate->pa_finished[pstate->pa_next_plan])
{
int nextplan;
nextplan = bms_next_member(node->as_valid_subplans,
pstate->pa_next_plan);
if (nextplan >= 0)
{
/* Advance to the next valid plan. */
pstate->pa_next_plan = nextplan;
}
else if (node->as_whichplan > node->as_first_partial_plan)
{
/*
* Try looping back to the first valid partial plan, if there is
* one. If there isn't, arrange to bail out below.
*/
nextplan = bms_next_member(node->as_valid_subplans,
node->as_first_partial_plan - 1);
pstate->pa_next_plan =
nextplan < 0 ? node->as_whichplan : nextplan;
}
else
{
/*
* At last plan, and either there are no partial plans or we've
* tried them all. Arrange to bail out.
*/
pstate->pa_next_plan = node->as_whichplan;
}
if (pstate->pa_next_plan == node->as_whichplan)
{
/* We've tried everything! */
pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
LWLockRelease(&pstate->pa_lock);
return false;
}
}
/* Pick the plan we found, and advance pa_next_plan one more time. */
node->as_whichplan = pstate->pa_next_plan;
pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
pstate->pa_next_plan);
/*
* If there are no more valid plans then try setting the next plan to the
* first valid partial plan.
*/
if (pstate->pa_next_plan < 0)
{
int nextplan = bms_next_member(node->as_valid_subplans,
node->as_first_partial_plan - 1);
if (nextplan >= 0)
pstate->pa_next_plan = nextplan;
else
{
/*
* There are no valid partial plans, and we already chose the last
* non-partial plan; so flag that there's nothing more for our
* fellow workers to do.
*/
pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
}
}
/* If non-partial, immediately mark as finished. */
if (node->as_whichplan < node->as_first_partial_plan)
node->as_pstate->pa_finished[node->as_whichplan] = true;
LWLockRelease(&pstate->pa_lock);
return true;
}
/*
* mark_invalid_subplans_as_finished
* Marks the ParallelAppendState's pa_finished as true for each invalid
* subplan.
*
* This function should only be called for parallel Append with run-time
* pruning enabled.
*/
static void
mark_invalid_subplans_as_finished(AppendState *node)
{
int i;
/* Only valid to call this while in parallel Append mode */
Assert(node->as_pstate);
/* Shouldn't have been called when run-time pruning is not enabled */
Assert(node->as_prune_state);
/* Nothing to do if all plans are valid */
if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
return;
/* Mark all non-valid plans as finished */
for (i = 0; i < node->as_nplans; i++)
{
if (!bms_is_member(i, node->as_valid_subplans))
node->as_pstate->pa_finished[i] = true;
}
}
/* ----------------------------------------------------------------
* Asynchronous Append Support
* ----------------------------------------------------------------
*/
/* ----------------------------------------------------------------
* ExecAppendAsyncBegin
*
* Begin executing designated async-capable subplans.
* ----------------------------------------------------------------
*/
static void
ExecAppendAsyncBegin(AppendState *node)
{
int i;
/* Backward scan is not supported by async-aware Appends. */
Assert(ScanDirectionIsForward(node->ps.state->es_direction));
/* We should never be called when there are no subplans */
Assert(node->as_nplans > 0);
/* We should never be called when there are no async subplans. */
Assert(node->as_nasyncplans > 0);
/* If we've yet to determine the valid subplans then do so now. */
if (!node->as_valid_subplans_identified)
{
node->as_valid_subplans =
ExecFindMatchingSubPlans(node->as_prune_state, false);
node->as_valid_subplans_identified = true;
classify_matching_subplans(node);
}
/* Initialize state variables. */
node->as_syncdone = bms_is_empty(node->as_valid_subplans);
node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);
/* Nothing to do if there are no valid async subplans. */
if (node->as_nasyncremain == 0)
return;
/* Make a request for each of the valid async subplans. */
i = -1;
while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
{
AsyncRequest *areq = node->as_asyncrequests[i];
Assert(areq->request_index == i);
Assert(!areq->callback_pending);
/* Do the actual work. */
ExecAsyncRequest(areq);
}
}
/* ----------------------------------------------------------------
* ExecAppendAsyncGetNext
*
* Get the next tuple from any of the asynchronous subplans.
* ----------------------------------------------------------------
*/
static bool
ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
{
*result = NULL;
/* We should never be called when there are no valid async subplans. */
Assert(node->as_nasyncremain > 0);
/* Request a tuple asynchronously. */
if (ExecAppendAsyncRequest(node, result))
return true;
while (node->as_nasyncremain > 0)
{
CHECK_FOR_INTERRUPTS();
/* Wait or poll for async events. */
ExecAppendAsyncEventWait(node);
/* Request a tuple asynchronously. */
if (ExecAppendAsyncRequest(node, result))
return true;
/* Break from loop if there's any sync subplan that isn't complete. */
if (!node->as_syncdone)
break;
}
/*
* If all sync subplans are complete, we're totally done scanning the
* given node. Otherwise, we're done with the asynchronous stuff but must
* continue scanning the sync subplans.
*/
if (node->as_syncdone)
{
Assert(node->as_nasyncremain == 0);
*result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
return true;
}
return false;
}
/* ----------------------------------------------------------------
* ExecAppendAsyncRequest
*
* Request a tuple asynchronously.
* ----------------------------------------------------------------
*/
static bool
ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
{
Bitmapset *needrequest;
int i;
/* Nothing to do if there are no async subplans needing a new request. */
if (bms_is_empty(node->as_needrequest))
{
Assert(node->as_nasyncresults == 0);
return false;
}
/*
* If there are any asynchronously-generated results that have not yet
* been returned, no new requests are needed; just return one of them.
*/
if (node->as_nasyncresults > 0)
{
--node->as_nasyncresults;
*result = node->as_asyncresults[node->as_nasyncresults];
return true;
}
/* Make a new request for each of the async subplans that need it. */
needrequest = node->as_needrequest;
node->as_needrequest = NULL;
i = -1;
while ((i = bms_next_member(needrequest, i)) >= 0)
{
AsyncRequest *areq = node->as_asyncrequests[i];
/* Do the actual work. */
ExecAsyncRequest(areq);
}
bms_free(needrequest);
/* Return one of the asynchronously-generated results if any. */
if (node->as_nasyncresults > 0)
{
--node->as_nasyncresults;
*result = node->as_asyncresults[node->as_nasyncresults];
return true;
}
return false;
}
/* ----------------------------------------------------------------
* ExecAppendAsyncEventWait
*
* Wait or poll for file descriptor events and fire callbacks.
* ----------------------------------------------------------------
*/
static void
ExecAppendAsyncEventWait(AppendState *node)
{
int nevents = node->as_nasyncplans + 1;
long timeout = node->as_syncdone ? -1 : 0;
WaitEvent occurred_event[EVENT_BUFFER_SIZE];
int noccurred;
int i;
/* We should never be called when there are no valid async subplans. */
Assert(node->as_nasyncremain > 0);
node->as_eventset = CreateWaitEventSet(CurrentMemoryContext, nevents);
AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
/* Give each waiting subplan a chance to add an event. */
i = -1;
while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
{
AsyncRequest *areq = node->as_asyncrequests[i];
if (areq->callback_pending)
ExecAsyncConfigureWait(areq);
}
/*
* No need for further processing if there are no configured events other
* than the postmaster death event.
*/
if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
{
FreeWaitEventSet(node->as_eventset);
node->as_eventset = NULL;
return;
}
/* We wait on at most EVENT_BUFFER_SIZE events. */
if (nevents > EVENT_BUFFER_SIZE)
nevents = EVENT_BUFFER_SIZE;
/*
* If the timeout is -1, wait until at least one event occurs. If the
* timeout is 0, poll for events, but do not wait at all.
*/
noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
nevents, WAIT_EVENT_APPEND_READY);
FreeWaitEventSet(node->as_eventset);
node->as_eventset = NULL;
if (noccurred == 0)
return;
/* Deliver notifications. */
for (i = 0; i < noccurred; i++)
{
WaitEvent *w = &occurred_event[i];
/*
* Each waiting subplan should have registered its wait event with
* user_data pointing back to its AsyncRequest.
*/
if ((w->events & WL_SOCKET_READABLE) != 0)
{
AsyncRequest *areq = (AsyncRequest *) w->user_data;
if (areq->callback_pending)
{
/*
* Mark it as no longer needing a callback. We must do this
* before dispatching the callback in case the callback resets
* the flag.
*/
areq->callback_pending = false;
/* Do the actual work. */
ExecAsyncNotify(areq);
}
}
}
}
/* ----------------------------------------------------------------
* ExecAsyncAppendResponse
*
* Receive a response from an asynchronous request we made.
* ----------------------------------------------------------------
*/
void
ExecAsyncAppendResponse(AsyncRequest *areq)
{
AppendState *node = (AppendState *) areq->requestor;
TupleTableSlot *slot = areq->result;
/* The result should be a TupleTableSlot or NULL. */
Assert(slot == NULL || IsA(slot, TupleTableSlot));
/* Nothing to do if the request is pending. */
if (!areq->request_complete)
{
/* The request would have been pending for a callback. */
Assert(areq->callback_pending);
return;
}
/* If the result is NULL or an empty slot, there's nothing more to do. */
if (TupIsNull(slot))
{
/* The ending subplan wouldn't have been pending for a callback. */
Assert(!areq->callback_pending);
--node->as_nasyncremain;
return;
}
/* Save result so we can return it. */
Assert(node->as_nasyncresults < node->as_nasyncplans);
node->as_asyncresults[node->as_nasyncresults++] = slot;
/*
* Mark the subplan that returned a result as ready for a new request. We
* don't launch another one here immediately because it might complete.
*/
node->as_needrequest = bms_add_member(node->as_needrequest,
areq->request_index);
}
/* ----------------------------------------------------------------
* classify_matching_subplans
*
* Classify the node's as_valid_subplans into sync ones and
* async ones, adjust it to contain sync ones only, and save
* async ones in the node's as_valid_asyncplans.
* ----------------------------------------------------------------
*/
static void
classify_matching_subplans(AppendState *node)
{
Bitmapset *valid_asyncplans;
Assert(node->as_valid_subplans_identified);
Assert(node->as_valid_asyncplans == NULL);
/* Nothing to do if there are no valid subplans. */
if (bms_is_empty(node->as_valid_subplans))
{
node->as_syncdone = true;
node->as_nasyncremain = 0;
return;
}
/* Nothing to do if there are no valid async subplans. */
if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
{
node->as_nasyncremain = 0;
return;
}
/* Get valid async subplans. */
valid_asyncplans = bms_intersect(node->as_asyncplans,
node->as_valid_subplans);
/* Adjust the valid subplans to contain sync subplans only. */
node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
valid_asyncplans);
/* Save valid async subplans. */
node->as_valid_asyncplans = valid_asyncplans;
}