1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-30 11:03:19 +03:00

Support parallel aggregation.

Parallel workers can now partially aggregate the data and pass the
transition values back to the leader, which can combine the partial
results to produce the final answer.

David Rowley, based on earlier work by Haribabu Kommi.  Reviewed by
Álvaro Herrera, Tomas Vondra, Amit Kapila, James Sewell, and me.
This commit is contained in:
Robert Haas
2016-03-21 09:20:53 -04:00
parent 7fa0064092
commit e06a38965b
23 changed files with 910 additions and 82 deletions

View File

@ -104,6 +104,8 @@ static Node *fix_scan_expr_mutator(Node *node, fix_scan_expr_context *context);
static bool fix_scan_expr_walker(Node *node, fix_scan_expr_context *context);
static void set_join_references(PlannerInfo *root, Join *join, int rtoffset);
static void set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset);
static void set_combineagg_references(PlannerInfo *root, Plan *plan,
int rtoffset);
static void set_dummy_tlist_references(Plan *plan, int rtoffset);
static indexed_tlist *build_tlist_index(List *tlist);
static Var *search_indexed_tlist_for_var(Var *var,
@ -117,6 +119,8 @@ static Var *search_indexed_tlist_for_sortgroupref(Node *node,
Index sortgroupref,
indexed_tlist *itlist,
Index newvarno);
static Var *search_indexed_tlist_for_partial_aggref(Aggref *aggref,
indexed_tlist *itlist, Index newvarno);
static List *fix_join_expr(PlannerInfo *root,
List *clauses,
indexed_tlist *outer_itlist,
@ -131,6 +135,13 @@ static Node *fix_upper_expr(PlannerInfo *root,
int rtoffset);
static Node *fix_upper_expr_mutator(Node *node,
fix_upper_expr_context *context);
static Node *fix_combine_agg_expr(PlannerInfo *root,
Node *node,
indexed_tlist *subplan_itlist,
Index newvarno,
int rtoffset);
static Node *fix_combine_agg_expr_mutator(Node *node,
fix_upper_expr_context *context);
static List *set_returning_clause_references(PlannerInfo *root,
List *rlist,
Plan *topplan,
@ -667,8 +678,16 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
}
break;
case T_Agg:
set_upper_references(root, plan, rtoffset);
break;
{
Agg *aggplan = (Agg *) plan;
if (aggplan->combineStates)
set_combineagg_references(root, plan, rtoffset);
else
set_upper_references(root, plan, rtoffset);
break;
}
case T_Group:
set_upper_references(root, plan, rtoffset);
break;
@ -1701,6 +1720,73 @@ set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset)
pfree(subplan_itlist);
}
/*
* set_combineagg_references
* This serves the same function as set_upper_references(), but treats
* Aggrefs differently. Here we transform Aggref nodes args to suit the
* combine aggregate phase. This means that the Aggref->args are converted
* to reference the corresponding aggregate function in the subplan rather
* than simple Var(s), as would be the case for a non-combine aggregate
* node.
*/
static void
set_combineagg_references(PlannerInfo *root, Plan *plan, int rtoffset)
{
Plan *subplan = plan->lefttree;
indexed_tlist *subplan_itlist;
List *output_targetlist;
ListCell *l;
Assert(IsA(plan, Agg));
Assert(((Agg *) plan)->combineStates);
subplan_itlist = build_tlist_index(subplan->targetlist);
output_targetlist = NIL;
foreach(l, plan->targetlist)
{
TargetEntry *tle = (TargetEntry *) lfirst(l);
Node *newexpr;
/* If it's a non-Var sort/group item, first try to match by sortref */
if (tle->ressortgroupref != 0 && !IsA(tle->expr, Var))
{
newexpr = (Node *)
search_indexed_tlist_for_sortgroupref((Node *) tle->expr,
tle->ressortgroupref,
subplan_itlist,
OUTER_VAR);
if (!newexpr)
newexpr = fix_combine_agg_expr(root,
(Node *) tle->expr,
subplan_itlist,
OUTER_VAR,
rtoffset);
}
else
newexpr = fix_combine_agg_expr(root,
(Node *) tle->expr,
subplan_itlist,
OUTER_VAR,
rtoffset);
tle = flatCopyTargetEntry(tle);
tle->expr = (Expr *) newexpr;
output_targetlist = lappend(output_targetlist, tle);
}
plan->targetlist = output_targetlist;
plan->qual = (List *)
fix_combine_agg_expr(root,
(Node *) plan->qual,
subplan_itlist,
OUTER_VAR,
rtoffset);
pfree(subplan_itlist);
}
/*
* set_dummy_tlist_references
* Replace the targetlist of an upper-level plan node with a simple
@ -1967,6 +2053,68 @@ search_indexed_tlist_for_sortgroupref(Node *node,
return NULL; /* no match */
}
/*
* search_indexed_tlist_for_partial_aggref - find an Aggref in an indexed tlist
*
* Aggrefs for partial aggregates have their aggoutputtype adjusted to set it
* to the aggregate state's type. This means that a standard equal() comparison
* won't match when comparing an Aggref which is in partial mode with an Aggref
* which is not. Here we manually compare all of the fields apart from
* aggoutputtype.
*/
static Var *
search_indexed_tlist_for_partial_aggref(Aggref *aggref, indexed_tlist *itlist,
Index newvarno)
{
ListCell *lc;
foreach(lc, itlist->tlist)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
if (IsA(tle->expr, Aggref))
{
Aggref *tlistaggref = (Aggref *) tle->expr;
Var *newvar;
if (aggref->aggfnoid != tlistaggref->aggfnoid)
continue;
if (aggref->aggtype != tlistaggref->aggtype)
continue;
/* ignore aggoutputtype */
if (aggref->aggcollid != tlistaggref->aggcollid)
continue;
if (aggref->inputcollid != tlistaggref->inputcollid)
continue;
if (!equal(aggref->aggdirectargs, tlistaggref->aggdirectargs))
continue;
if (!equal(aggref->args, tlistaggref->args))
continue;
if (!equal(aggref->aggorder, tlistaggref->aggorder))
continue;
if (!equal(aggref->aggdistinct, tlistaggref->aggdistinct))
continue;
if (!equal(aggref->aggfilter, tlistaggref->aggfilter))
continue;
if (aggref->aggstar != tlistaggref->aggstar)
continue;
if (aggref->aggvariadic != tlistaggref->aggvariadic)
continue;
if (aggref->aggkind != tlistaggref->aggkind)
continue;
if (aggref->agglevelsup != tlistaggref->agglevelsup)
continue;
newvar = makeVarFromTargetEntry(newvarno, tle);
newvar->varnoold = 0; /* wasn't ever a plain Var */
newvar->varoattno = 0;
return newvar;
}
}
return NULL;
}
/*
* fix_join_expr
* Create a new set of targetlist entries or join qual clauses by
@ -2237,6 +2385,105 @@ fix_upper_expr_mutator(Node *node, fix_upper_expr_context *context)
(void *) context);
}
/*
* fix_combine_agg_expr
* Like fix_upper_expr() but additionally adjusts the Aggref->args of
* Aggrefs so that they references the corresponding Aggref in the subplan.
*/
static Node *
fix_combine_agg_expr(PlannerInfo *root,
Node *node,
indexed_tlist *subplan_itlist,
Index newvarno,
int rtoffset)
{
fix_upper_expr_context context;
context.root = root;
context.subplan_itlist = subplan_itlist;
context.newvarno = newvarno;
context.rtoffset = rtoffset;
return fix_combine_agg_expr_mutator(node, &context);
}
static Node *
fix_combine_agg_expr_mutator(Node *node, fix_upper_expr_context *context)
{
Var *newvar;
if (node == NULL)
return NULL;
if (IsA(node, Var))
{
Var *var = (Var *) node;
newvar = search_indexed_tlist_for_var(var,
context->subplan_itlist,
context->newvarno,
context->rtoffset);
if (!newvar)
elog(ERROR, "variable not found in subplan target list");
return (Node *) newvar;
}
if (IsA(node, PlaceHolderVar))
{
PlaceHolderVar *phv = (PlaceHolderVar *) node;
/* See if the PlaceHolderVar has bubbled up from a lower plan node */
if (context->subplan_itlist->has_ph_vars)
{
newvar = search_indexed_tlist_for_non_var((Node *) phv,
context->subplan_itlist,
context->newvarno);
if (newvar)
return (Node *) newvar;
}
/* If not supplied by input plan, evaluate the contained expr */
return fix_upper_expr_mutator((Node *) phv->phexpr, context);
}
if (IsA(node, Param))
return fix_param_node(context->root, (Param *) node);
if (IsA(node, Aggref))
{
Aggref *aggref = (Aggref *) node;
newvar = search_indexed_tlist_for_partial_aggref(aggref,
context->subplan_itlist,
context->newvarno);
if (newvar)
{
Aggref *newaggref;
TargetEntry *newtle;
/*
* Now build a new TargetEntry for the Aggref's arguments which is
* a single Var which references the corresponding AggRef in the
* node below.
*/
newtle = makeTargetEntry((Expr *) newvar, 1, NULL, false);
newaggref = (Aggref *) copyObject(aggref);
newaggref->args = list_make1(newtle);
return (Node *) newaggref;
}
else
elog(ERROR, "Aggref not found in subplan target list");
}
/* Try matching more complex expressions too, if tlist has any */
if (context->subplan_itlist->has_non_vars)
{
newvar = search_indexed_tlist_for_non_var(node,
context->subplan_itlist,
context->newvarno);
if (newvar)
return (Node *) newvar;
}
fix_expr_common(context->root, node);
return expression_tree_mutator(node,
fix_combine_agg_expr_mutator,
(void *) context);
}
/*
* set_returning_clause_references
* Perform setrefs.c's work on a RETURNING targetlist