1
0
mirror of https://github.com/postgres/postgres.git synced 2025-09-02 04:21:28 +03:00

Move per-agg and per-trans duplicate finding to the planner.

This has the advantage that the cost estimates for aggregates can count
the number of calls to transition and final functions correctly.

Bump catalog version, because views can contain Aggrefs.

Reviewed-by: Andres Freund
Discussion: https://www.postgresql.org/message-id/b2e3536b-1dbc-8303-c97e-89cb0b4a9a48%40iki.fi
This commit is contained in:
Heikki Linnakangas
2020-11-24 10:45:00 +02:00
parent e522024bd8
commit 0a2bc5d61e
29 changed files with 955 additions and 747 deletions

View File

@@ -53,14 +53,6 @@
#include "utils/syscache.h"
#include "utils/typcache.h"
typedef struct
{
PlannerInfo *root;
AggSplit aggsplit;
AggClauseCosts *costs;
} get_agg_clause_costs_context;
typedef struct
{
ParamListInfo boundParams;
@@ -98,8 +90,6 @@ typedef struct
} max_parallel_hazard_context;
static bool contain_agg_clause_walker(Node *node, void *context);
static bool get_agg_clause_costs_walker(Node *node,
get_agg_clause_costs_context *context);
static bool find_window_functions_walker(Node *node, WindowFuncLists *lists);
static bool contain_subplans_walker(Node *node, void *context);
static bool contain_mutable_functions_walker(Node *node, void *context);
@@ -200,284 +190,6 @@ contain_agg_clause_walker(Node *node, void *context)
return expression_tree_walker(node, contain_agg_clause_walker, context);
}
/*
* get_agg_clause_costs
* Recursively find the Aggref nodes in an expression tree, and
* accumulate cost information about them.
*
* 'aggsplit' tells us the expected partial-aggregation mode, which affects
* the cost estimates.
*
* NOTE that the counts/costs are ADDED to those already in *costs ... so
* the caller is responsible for zeroing the struct initially.
*
* We count the nodes, estimate their execution costs, and estimate the total
* space needed for their transition state values if all are evaluated in
* parallel (as would be done in a HashAgg plan). Also, we check whether
* partial aggregation is feasible. See AggClauseCosts for the exact set
* of statistics collected.
*
* In addition, we mark Aggref nodes with the correct aggtranstype, so
* that that doesn't need to be done repeatedly. (That makes this function's
* name a bit of a misnomer.)
*
* This does not descend into subqueries, and so should be used only after
* reduction of sublinks to subplans, or in contexts where it's known there
* are no subqueries. There mustn't be outer-aggregate references either.
*/
void
get_agg_clause_costs(PlannerInfo *root, Node *clause, AggSplit aggsplit,
AggClauseCosts *costs)
{
get_agg_clause_costs_context context;
context.root = root;
context.aggsplit = aggsplit;
context.costs = costs;
(void) get_agg_clause_costs_walker(clause, &context);
}
static bool
get_agg_clause_costs_walker(Node *node, get_agg_clause_costs_context *context)
{
if (node == NULL)
return false;
if (IsA(node, Aggref))
{
Aggref *aggref = (Aggref *) node;
AggClauseCosts *costs = context->costs;
HeapTuple aggTuple;
Form_pg_aggregate aggform;
Oid aggtransfn;
Oid aggfinalfn;
Oid aggcombinefn;
Oid aggserialfn;
Oid aggdeserialfn;
Oid aggtranstype;
int32 aggtransspace;
QualCost argcosts;
Assert(aggref->agglevelsup == 0);
/*
* Fetch info about aggregate from pg_aggregate. Note it's correct to
* ignore the moving-aggregate variant, since what we're concerned
* with here is aggregates not window functions.
*/
aggTuple = SearchSysCache1(AGGFNOID,
ObjectIdGetDatum(aggref->aggfnoid));
if (!HeapTupleIsValid(aggTuple))
elog(ERROR, "cache lookup failed for aggregate %u",
aggref->aggfnoid);
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
aggtransfn = aggform->aggtransfn;
aggfinalfn = aggform->aggfinalfn;
aggcombinefn = aggform->aggcombinefn;
aggserialfn = aggform->aggserialfn;
aggdeserialfn = aggform->aggdeserialfn;
aggtranstype = aggform->aggtranstype;
aggtransspace = aggform->aggtransspace;
ReleaseSysCache(aggTuple);
/*
* Resolve the possibly-polymorphic aggregate transition type, unless
* already done in a previous pass over the expression.
*/
if (OidIsValid(aggref->aggtranstype))
aggtranstype = aggref->aggtranstype;
else
{
Oid inputTypes[FUNC_MAX_ARGS];
int numArguments;
/* extract argument types (ignoring any ORDER BY expressions) */
numArguments = get_aggregate_argtypes(aggref, inputTypes);
/* resolve actual type of transition state, if polymorphic */
aggtranstype = resolve_aggregate_transtype(aggref->aggfnoid,
aggtranstype,
inputTypes,
numArguments);
aggref->aggtranstype = aggtranstype;
}
/*
* Count it, and check for cases requiring ordered input. Note that
* ordered-set aggs always have nonempty aggorder. Any ordered-input
* case also defeats partial aggregation.
*/
costs->numAggs++;
if (aggref->aggorder != NIL || aggref->aggdistinct != NIL)
{
costs->numOrderedAggs++;
costs->hasNonPartial = true;
}
/*
* Check whether partial aggregation is feasible, unless we already
* found out that we can't do it.
*/
if (!costs->hasNonPartial)
{
/*
* If there is no combine function, then partial aggregation is
* not possible.
*/
if (!OidIsValid(aggcombinefn))
costs->hasNonPartial = true;
/*
* If we have any aggs with transtype INTERNAL then we must check
* whether they have serialization/deserialization functions; if
* not, we can't serialize partial-aggregation results.
*/
else if (aggtranstype == INTERNALOID &&
(!OidIsValid(aggserialfn) || !OidIsValid(aggdeserialfn)))
costs->hasNonSerial = true;
}
/*
* Add the appropriate component function execution costs to
* appropriate totals.
*/
if (DO_AGGSPLIT_COMBINE(context->aggsplit))
{
/* charge for combining previously aggregated states */
add_function_cost(context->root, aggcombinefn, NULL,
&costs->transCost);
}
else
add_function_cost(context->root, aggtransfn, NULL,
&costs->transCost);
if (DO_AGGSPLIT_DESERIALIZE(context->aggsplit) &&
OidIsValid(aggdeserialfn))
add_function_cost(context->root, aggdeserialfn, NULL,
&costs->transCost);
if (DO_AGGSPLIT_SERIALIZE(context->aggsplit) &&
OidIsValid(aggserialfn))
add_function_cost(context->root, aggserialfn, NULL,
&costs->finalCost);
if (!DO_AGGSPLIT_SKIPFINAL(context->aggsplit) &&
OidIsValid(aggfinalfn))
add_function_cost(context->root, aggfinalfn, NULL,
&costs->finalCost);
/*
* These costs are incurred only by the initial aggregate node, so we
* mustn't include them again at upper levels.
*/
if (!DO_AGGSPLIT_COMBINE(context->aggsplit))
{
/* add the input expressions' cost to per-input-row costs */
cost_qual_eval_node(&argcosts, (Node *) aggref->args, context->root);
costs->transCost.startup += argcosts.startup;
costs->transCost.per_tuple += argcosts.per_tuple;
/*
* Add any filter's cost to per-input-row costs.
*
* XXX Ideally we should reduce input expression costs according
* to filter selectivity, but it's not clear it's worth the
* trouble.
*/
if (aggref->aggfilter)
{
cost_qual_eval_node(&argcosts, (Node *) aggref->aggfilter,
context->root);
costs->transCost.startup += argcosts.startup;
costs->transCost.per_tuple += argcosts.per_tuple;
}
}
/*
* If there are direct arguments, treat their evaluation cost like the
* cost of the finalfn.
*/
if (aggref->aggdirectargs)
{
cost_qual_eval_node(&argcosts, (Node *) aggref->aggdirectargs,
context->root);
costs->finalCost.startup += argcosts.startup;
costs->finalCost.per_tuple += argcosts.per_tuple;
}
/*
* If the transition type is pass-by-value then it doesn't add
* anything to the required size of the hashtable. If it is
* pass-by-reference then we have to add the estimated size of the
* value itself, plus palloc overhead.
*/
if (!get_typbyval(aggtranstype))
{
int32 avgwidth;
/* Use average width if aggregate definition gave one */
if (aggtransspace > 0)
avgwidth = aggtransspace;
else if (aggtransfn == F_ARRAY_APPEND)
{
/*
* If the transition function is array_append(), it'll use an
* expanded array as transvalue, which will occupy at least
* ALLOCSET_SMALL_INITSIZE and possibly more. Use that as the
* estimate for lack of a better idea.
*/
avgwidth = ALLOCSET_SMALL_INITSIZE;
}
else
{
/*
* If transition state is of same type as first aggregated
* input, assume it's the same typmod (same width) as well.
* This works for cases like MAX/MIN and is probably somewhat
* reasonable otherwise.
*/
int32 aggtranstypmod = -1;
if (aggref->args)
{
TargetEntry *tle = (TargetEntry *) linitial(aggref->args);
if (aggtranstype == exprType((Node *) tle->expr))
aggtranstypmod = exprTypmod((Node *) tle->expr);
}
avgwidth = get_typavgwidth(aggtranstype, aggtranstypmod);
}
avgwidth = MAXALIGN(avgwidth);
costs->transitionSpace += avgwidth + 2 * sizeof(void *);
}
else if (aggtranstype == INTERNALOID)
{
/*
* INTERNAL transition type is a special case: although INTERNAL
* is pass-by-value, it's almost certainly being used as a pointer
* to some large data structure. The aggregate definition can
* provide an estimate of the size. If it doesn't, then we assume
* ALLOCSET_DEFAULT_INITSIZE, which is a good guess if the data is
* being kept in a private memory context, as is done by
* array_agg() for instance.
*/
if (aggtransspace > 0)
costs->transitionSpace += aggtransspace;
else
costs->transitionSpace += ALLOCSET_DEFAULT_INITSIZE;
}
/*
* We assume that the parser checked that there are no aggregates (of
* this level anyway) in the aggregated arguments, direct arguments,
* or filter clause. Hence, we need not recurse into any of them.
*/
return false;
}
Assert(!IsA(node, SubLink));
return expression_tree_walker(node, get_agg_clause_costs_walker,
(void *) context);
}
/*****************************************************************************
* Window-function clause manipulation
*****************************************************************************/