1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-16 06:01:02 +03:00

Simplify executor's determination of whether to use parallelism.

Our parallel-mode code only works when we are executing a query
in full, so ExecutePlan must disable parallel mode when it is
asked to do partial execution.  The previous logic for this
involved passing down a flag (variously named execute_once or
run_once) from callers of ExecutorRun or PortalRun.  This is
overcomplicated, and unsurprisingly some of the callers didn't
get it right, since it requires keeping state that not all of
them have handy; not to mention that the requirements for it were
undocumented.  That led to assertion failures in some corner
cases.  The only state we really need for this is the existing
QueryDesc.already_executed flag, so let's just put all the
responsibility in ExecutePlan.  (It could have been done in
ExecutorRun too, leading to a slightly shorter patch -- but if
there's ever more than one caller of ExecutePlan, it seems better
to have this logic in the subroutine than the callers.)

This makes those ExecutorRun/PortalRun parameters unnecessary.
In master it seems okay to just remove them, returning the
API for those functions to what it was before parallelism.
Such an API break is clearly not okay in stable branches,
but for them we can just leave the parameters in place after
documenting that they do nothing.

Per report from Yugo Nagata, who also reviewed and tested
this patch.  Back-patch to all supported branches.

Discussion: https://postgr.es/m/20241206062549.710dc01cf91224809dd6c0e1@sraoss.co.jp
This commit is contained in:
Tom Lane
2024-12-09 14:38:19 -05:00
parent 4d8275046c
commit 3eea7a0c97
19 changed files with 49 additions and 73 deletions

View File

@ -79,7 +79,7 @@ static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
static void explain_ExecutorStart(QueryDesc *queryDesc, int eflags);
static void explain_ExecutorRun(QueryDesc *queryDesc,
ScanDirection direction,
uint64 count, bool execute_once);
uint64 count);
static void explain_ExecutorFinish(QueryDesc *queryDesc);
static void explain_ExecutorEnd(QueryDesc *queryDesc);
@ -321,15 +321,15 @@ explain_ExecutorStart(QueryDesc *queryDesc, int eflags)
*/
static void
explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction,
uint64 count, bool execute_once)
uint64 count)
{
nesting_level++;
PG_TRY();
{
if (prev_ExecutorRun)
prev_ExecutorRun(queryDesc, direction, count, execute_once);
prev_ExecutorRun(queryDesc, direction, count);
else
standard_ExecutorRun(queryDesc, direction, count, execute_once);
standard_ExecutorRun(queryDesc, direction, count);
}
PG_FINALLY();
{

View File

@ -335,7 +335,7 @@ static PlannedStmt *pgss_planner(Query *parse,
static void pgss_ExecutorStart(QueryDesc *queryDesc, int eflags);
static void pgss_ExecutorRun(QueryDesc *queryDesc,
ScanDirection direction,
uint64 count, bool execute_once);
uint64 count);
static void pgss_ExecutorFinish(QueryDesc *queryDesc);
static void pgss_ExecutorEnd(QueryDesc *queryDesc);
static void pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
@ -1021,16 +1021,15 @@ pgss_ExecutorStart(QueryDesc *queryDesc, int eflags)
* ExecutorRun hook: all we need do is track nesting depth
*/
static void
pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count,
bool execute_once)
pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count)
{
nesting_level++;
PG_TRY();
{
if (prev_ExecutorRun)
prev_ExecutorRun(queryDesc, direction, count, execute_once);
prev_ExecutorRun(queryDesc, direction, count);
else
standard_ExecutorRun(queryDesc, direction, count, execute_once);
standard_ExecutorRun(queryDesc, direction, count);
}
PG_FINALLY();
{

View File

@ -880,7 +880,7 @@ DoCopyTo(CopyToState cstate)
else
{
/* run the plan --- the dest receiver will send tuples */
ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0, true);
ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0);
processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
}

View File

@ -340,7 +340,7 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
ExecutorStart(queryDesc, GetIntoRelEFlags(into));
/* run the plan to completion */
ExecutorRun(queryDesc, ForwardScanDirection, 0, true);
ExecutorRun(queryDesc, ForwardScanDirection, 0);
/* save the rowcount if we're given a qc to fill */
if (qc)

View File

@ -719,7 +719,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
dir = ForwardScanDirection;
/* run the plan */
ExecutorRun(queryDesc, dir, 0, true);
ExecutorRun(queryDesc, dir, 0);
/* run cleanup too */
ExecutorFinish(queryDesc);

View File

@ -912,7 +912,7 @@ execute_sql_string(const char *sql, const char *filename)
dest, NULL, NULL, 0);
ExecutorStart(qdesc, 0);
ExecutorRun(qdesc, ForwardScanDirection, 0, true);
ExecutorRun(qdesc, ForwardScanDirection, 0);
ExecutorFinish(qdesc);
ExecutorEnd(qdesc);

View File

@ -446,7 +446,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
ExecutorStart(queryDesc, 0);
/* run the plan */
ExecutorRun(queryDesc, ForwardScanDirection, 0, true);
ExecutorRun(queryDesc, ForwardScanDirection, 0);
processed = queryDesc->estate->es_processed;

View File

@ -427,7 +427,7 @@ PersistHoldablePortal(Portal portal)
NULL);
/* Fetch the result set into the tuplestore */
ExecutorRun(queryDesc, direction, 0, false);
ExecutorRun(queryDesc, direction, 0);
queryDesc->dest->rDestroy(queryDesc->dest);
queryDesc->dest = NULL;

View File

@ -252,7 +252,7 @@ ExecuteQuery(ParseState *pstate,
*/
PortalStart(portal, paramLI, eflags, GetActiveSnapshot());
(void) PortalRun(portal, count, false, true, dest, dest, qc);
(void) PortalRun(portal, count, false, dest, dest, qc);
PortalDrop(portal, false);

View File

@ -77,14 +77,12 @@ static void InitPlan(QueryDesc *queryDesc, int eflags);
static void CheckValidRowMarkRel(Relation rel, RowMarkType markType);
static void ExecPostprocessPlan(EState *estate);
static void ExecEndPlan(PlanState *planstate, EState *estate);
static void ExecutePlan(EState *estate, PlanState *planstate,
bool use_parallel_mode,
static void ExecutePlan(QueryDesc *queryDesc,
CmdType operation,
bool sendTuples,
uint64 numberTuples,
ScanDirection direction,
DestReceiver *dest,
bool execute_once);
DestReceiver *dest);
static bool ExecCheckOneRelPerms(RTEPermissionInfo *perminfo);
static bool ExecCheckPermissionsModified(Oid relOid, Oid userid,
Bitmapset *modifiedCols,
@ -294,18 +292,17 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
*/
void
ExecutorRun(QueryDesc *queryDesc,
ScanDirection direction, uint64 count,
bool execute_once)
ScanDirection direction, uint64 count)
{
if (ExecutorRun_hook)
(*ExecutorRun_hook) (queryDesc, direction, count, execute_once);
(*ExecutorRun_hook) (queryDesc, direction, count);
else
standard_ExecutorRun(queryDesc, direction, count, execute_once);
standard_ExecutorRun(queryDesc, direction, count);
}
void
standard_ExecutorRun(QueryDesc *queryDesc,
ScanDirection direction, uint64 count, bool execute_once)
ScanDirection direction, uint64 count)
{
EState *estate;
CmdType operation;
@ -354,21 +351,12 @@ standard_ExecutorRun(QueryDesc *queryDesc,
* run plan
*/
if (!ScanDirectionIsNoMovement(direction))
{
if (execute_once && queryDesc->already_executed)
elog(ERROR, "can't re-execute query flagged for single execution");
queryDesc->already_executed = true;
ExecutePlan(estate,
queryDesc->planstate,
queryDesc->plannedstmt->parallelModeNeeded,
ExecutePlan(queryDesc,
operation,
sendTuples,
count,
direction,
dest,
execute_once);
}
dest);
/*
* Update es_total_processed to keep track of the number of tuples
@ -1601,22 +1589,19 @@ ExecCloseRangeTableRelations(EState *estate)
* moving in the specified direction.
*
* Runs to completion if numberTuples is 0
*
* Note: the ctid attribute is a 'junk' attribute that is removed before the
* user can see it
* ----------------------------------------------------------------
*/
static void
ExecutePlan(EState *estate,
PlanState *planstate,
bool use_parallel_mode,
ExecutePlan(QueryDesc *queryDesc,
CmdType operation,
bool sendTuples,
uint64 numberTuples,
ScanDirection direction,
DestReceiver *dest,
bool execute_once)
DestReceiver *dest)
{
EState *estate = queryDesc->estate;
PlanState *planstate = queryDesc->planstate;
bool use_parallel_mode;
TupleTableSlot *slot;
uint64 current_tuple_count;
@ -1631,11 +1616,17 @@ ExecutePlan(EState *estate,
estate->es_direction = direction;
/*
* If the plan might potentially be executed multiple times, we must force
* it to run without parallelism, because we might exit early.
* Set up parallel mode if appropriate.
*
* Parallel mode only supports complete execution of a plan. If we've
* already partially executed it, or if the caller asks us to exit early,
* we must force the plan to run without parallelism.
*/
if (!execute_once)
if (queryDesc->already_executed || numberTuples != 0)
use_parallel_mode = false;
else
use_parallel_mode = queryDesc->plannedstmt->parallelModeNeeded;
queryDesc->already_executed = true;
estate->es_use_parallel_mode = use_parallel_mode;
if (use_parallel_mode)

View File

@ -1471,8 +1471,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
*/
ExecutorRun(queryDesc,
ForwardScanDirection,
fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
true);
fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
/* Shut down the executor */
ExecutorFinish(queryDesc);

View File

@ -894,7 +894,7 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache)
/* Run regular commands to completion unless lazyEval */
uint64 count = (es->lazyEval) ? 1 : 0;
ExecutorRun(es->qd, ForwardScanDirection, count, !fcache->returnsSet || !es->lazyEval);
ExecutorRun(es->qd, ForwardScanDirection, count);
/*
* If we requested run to completion OR there was no tuple returned,

View File

@ -2929,7 +2929,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
ExecutorStart(queryDesc, eflags);
ExecutorRun(queryDesc, ForwardScanDirection, tcount, true);
ExecutorRun(queryDesc, ForwardScanDirection, tcount);
_SPI_current->processed = queryDesc->estate->es_processed;

View File

@ -1283,7 +1283,6 @@ exec_simple_query(const char *query_string)
(void) PortalRun(portal,
FETCH_ALL,
true, /* always top level */
true,
receiver,
receiver,
&qc);
@ -2260,7 +2259,6 @@ exec_execute_message(const char *portal_name, long max_rows)
completed = PortalRun(portal,
max_rows,
true, /* always top level */
!execute_is_fetch && max_rows == FETCH_ALL,
receiver,
receiver,
&qc);

View File

@ -157,7 +157,7 @@ ProcessQuery(PlannedStmt *plan,
/*
* Run the plan to completion.
*/
ExecutorRun(queryDesc, ForwardScanDirection, 0, true);
ExecutorRun(queryDesc, ForwardScanDirection, 0);
/*
* Build command completion status data, if caller wants one.
@ -681,7 +681,7 @@ PortalSetResultFormat(Portal portal, int nFormats, int16 *formats)
* suspended due to exhaustion of the count parameter.
*/
bool
PortalRun(Portal portal, long count, bool isTopLevel, bool run_once,
PortalRun(Portal portal, long count, bool isTopLevel,
DestReceiver *dest, DestReceiver *altdest,
QueryCompletion *qc)
{
@ -714,10 +714,6 @@ PortalRun(Portal portal, long count, bool isTopLevel, bool run_once,
*/
MarkPortalActive(portal);
/* Set run_once flag. Shouldn't be clear if previously set. */
Assert(!portal->run_once || run_once);
portal->run_once = run_once;
/*
* Set up global portal context pointers.
*
@ -921,8 +917,7 @@ PortalRunSelect(Portal portal,
else
{
PushActiveSnapshot(queryDesc->snapshot);
ExecutorRun(queryDesc, direction, (uint64) count,
portal->run_once);
ExecutorRun(queryDesc, direction, (uint64) count);
nprocessed = queryDesc->estate->es_processed;
PopActiveSnapshot();
}
@ -961,8 +956,7 @@ PortalRunSelect(Portal portal,
else
{
PushActiveSnapshot(queryDesc->snapshot);
ExecutorRun(queryDesc, direction, (uint64) count,
portal->run_once);
ExecutorRun(queryDesc, direction, (uint64) count);
nprocessed = queryDesc->estate->es_processed;
PopActiveSnapshot();
}
@ -1406,9 +1400,6 @@ PortalRunFetch(Portal portal,
*/
MarkPortalActive(portal);
/* If supporting FETCH, portal can't be run-once. */
Assert(!portal->run_once);
/*
* Set up global portal context pointers.
*/

View File

@ -48,7 +48,7 @@ typedef struct QueryDesc
EState *estate; /* executor's query-wide state */
PlanState *planstate; /* tree of per-plan-node state */
/* This field is set by ExecutorRun */
/* This field is set by ExecutePlan */
bool already_executed; /* true if previously executed */
/* This is always set NULL by the core system, but plugins can change it */

View File

@ -78,8 +78,7 @@ extern PGDLLIMPORT ExecutorStart_hook_type ExecutorStart_hook;
/* Hook for plugins to get control in ExecutorRun() */
typedef void (*ExecutorRun_hook_type) (QueryDesc *queryDesc,
ScanDirection direction,
uint64 count,
bool execute_once);
uint64 count);
extern PGDLLIMPORT ExecutorRun_hook_type ExecutorRun_hook;
/* Hook for plugins to get control in ExecutorFinish() */
@ -200,9 +199,9 @@ ExecGetJunkAttribute(TupleTableSlot *slot, AttrNumber attno, bool *isNull)
extern void ExecutorStart(QueryDesc *queryDesc, int eflags);
extern void standard_ExecutorStart(QueryDesc *queryDesc, int eflags);
extern void ExecutorRun(QueryDesc *queryDesc,
ScanDirection direction, uint64 count, bool execute_once);
ScanDirection direction, uint64 count);
extern void standard_ExecutorRun(QueryDesc *queryDesc,
ScanDirection direction, uint64 count, bool execute_once);
ScanDirection direction, uint64 count);
extern void ExecutorFinish(QueryDesc *queryDesc);
extern void standard_ExecutorFinish(QueryDesc *queryDesc);
extern void ExecutorEnd(QueryDesc *queryDesc);

View File

@ -36,7 +36,7 @@ extern void PortalSetResultFormat(Portal portal, int nFormats,
int16 *formats);
extern bool PortalRun(Portal portal, long count, bool isTopLevel,
bool run_once, DestReceiver *dest, DestReceiver *altdest,
DestReceiver *dest, DestReceiver *altdest,
QueryCompletion *qc);
extern uint64 PortalRunFetch(Portal portal,

View File

@ -145,7 +145,6 @@ typedef struct PortalData
/* Features/options */
PortalStrategy strategy; /* see above */
int cursorOptions; /* DECLARE CURSOR option bits */
bool run_once; /* portal will only be run once */
/* Status data */
PortalStatus status; /* see above */