1
0
mirror of https://github.com/postgres/postgres.git synced 2025-05-28 05:21:27 +03:00

Allow parallel query for prepared statements with generic plans.

This was always intended to work, but due to an oversight in
max_parallel_hazard_walker, it didn't.  In testing, we missed the
fact that it was only working for custom plans, where the parameter
value has been substituted for the parameter itself early enough
that everything worked.  In a generic plan, the Param node survives
and must be treated as parallel-safe.  SerializeParamList provides
for the transmission of parameter values to workers.

Amit Kapila with help from Kuntal Ghosh.  Some changes by me.

Discussion: http://postgr.es/m/CAA4eK1+_BuZrmVCeua5Eqnm4Co9DAXdM5HPAOE2J19ePbR912Q@mail.gmail.com
This commit is contained in:
Robert Haas 2017-10-27 22:22:39 +02:00
parent 69125c883d
commit a87c0c7631
4 changed files with 78 additions and 23 deletions

View File

@ -1223,13 +1223,17 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
/*
* We can't pass Params to workers at the moment either, so they are also
* parallel-restricted, unless they are PARAM_EXEC Params listed in
* safe_param_ids, meaning they could be generated within the worker.
* parallel-restricted, unless they are PARAM_EXTERN Params or are
* PARAM_EXEC Params listed in safe_param_ids, meaning they could be
* generated within the worker.
*/
else if (IsA(node, Param))
{
Param *param = (Param *) node;
if (param->paramkind == PARAM_EXTERN)
return false;
if (param->paramkind != PARAM_EXEC ||
!list_member_int(context->safe_param_ids, param->paramid))
{

View File

@ -6821,7 +6821,7 @@ exec_simple_recheck_plan(PLpgSQL_expr *expr, CachedPlan *cplan)
{
PlannedStmt *stmt;
Plan *plan;
TargetEntry *tle;
Expr *tle_expr;
/*
* Initialize to "not simple", and remember the plan generation number we
@ -6836,18 +6836,51 @@ exec_simple_recheck_plan(PLpgSQL_expr *expr, CachedPlan *cplan)
if (list_length(cplan->stmt_list) != 1)
return;
stmt = linitial_node(PlannedStmt, cplan->stmt_list);
/*
* 2. It must be a RESULT plan --> no scan's required
*/
if (stmt->commandType != CMD_SELECT)
return;
plan = stmt->planTree;
if (!IsA(plan, Result))
return;
/*
* 3. Can't have any subplan or qual clause, either
* 2. Ordinarily, the plan node should be a simple Result. However, if
* force_parallel_mode is on, the planner might've stuck a Gather node
* atop that. The simplest way to deal with this is to look through the
* Gather node. The Gather node's tlist would normally contain a Var
* referencing the child node's output, but it could also be a Param, or
* it could be a Const that setrefs.c copied as-is.
*/
plan = stmt->planTree;
for (;;)
{
/*
* 3. The plan must have a single attribute as result
*/
if (list_length(plan->targetlist) != 1)
return;
tle_expr = castNode(TargetEntry, linitial(plan->targetlist))->expr;
if (IsA(plan, Gather))
{
if (plan->righttree != NULL ||
plan->initPlan != NULL ||
plan->qual != NULL)
return;
/* If setrefs.c copied up a Const, no need to look further */
if (IsA(tle_expr, Const))
break;
/* Otherwise, it had better be a Param or an outer Var */
if (!IsA(tle_expr, Param) && !(IsA(tle_expr, Var) &&
((Var *) tle_expr)->varno == OUTER_VAR))
return;
/* Descend to the child node */
plan = plan->lefttree;
continue;
}
else if (!IsA(plan, Result))
return;
break;
}
/*
* 4. Can't have any subplan or qual clause, either
*/
if (plan->lefttree != NULL ||
plan->righttree != NULL ||
@ -6856,31 +6889,23 @@ exec_simple_recheck_plan(PLpgSQL_expr *expr, CachedPlan *cplan)
((Result *) plan)->resconstantqual != NULL)
return;
/*
* 4. The plan must have a single attribute as result
*/
if (list_length(plan->targetlist) != 1)
return;
tle = (TargetEntry *) linitial(plan->targetlist);
/*
* 5. Check that all the nodes in the expression are non-scary.
*/
if (!exec_simple_check_node((Node *) tle->expr))
if (!exec_simple_check_node((Node *) tle_expr))
return;
/*
* Yes - this is a simple expression. Mark it as such, and initialize
* state to "not valid in current transaction".
*/
expr->expr_simple_expr = tle->expr;
expr->expr_simple_expr = tle_expr;
expr->expr_simple_state = NULL;
expr->expr_simple_in_use = false;
expr->expr_simple_lxid = InvalidLocalTransactionId;
/* Also stash away the expression result type */
expr->expr_simple_type = exprType((Node *) tle->expr);
expr->expr_simple_typmod = exprTypmod((Node *) tle->expr);
expr->expr_simple_type = exprType((Node *) tle_expr);
expr->expr_simple_typmod = exprTypmod((Node *) tle_expr);
}
/*

View File

@ -101,6 +101,26 @@ explain (costs off)
-> Parallel Index Only Scan using tenk1_unique1 on tenk1
(5 rows)
-- test prepared statement
prepare tenk1_count(integer) As select count((unique1)) from tenk1 where hundred > $1;
explain (costs off) execute tenk1_count(1);
QUERY PLAN
----------------------------------------------
Finalize Aggregate
-> Gather
Workers Planned: 4
-> Partial Aggregate
-> Parallel Seq Scan on tenk1
Filter: (hundred > 1)
(6 rows)
execute tenk1_count(1);
count
-------
9800
(1 row)
deallocate tenk1_count;
-- test parallel plans for queries containing un-correlated subplans.
alter table tenk2 set (parallel_workers = 0);
explain (costs off)

View File

@ -39,6 +39,12 @@ explain (costs off)
select sum(parallel_restricted(unique1)) from tenk1
group by(parallel_restricted(unique1));
-- test prepared statement
prepare tenk1_count(integer) As select count((unique1)) from tenk1 where hundred > $1;
explain (costs off) execute tenk1_count(1);
execute tenk1_count(1);
deallocate tenk1_count;
-- test parallel plans for queries containing un-correlated subplans.
alter table tenk2 set (parallel_workers = 0);
explain (costs off)