1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-24 01:29:19 +03:00

Support ordered-set (WITHIN GROUP) aggregates.

This patch introduces generic support for ordered-set and hypothetical-set
aggregate functions, as well as implementations of the instances defined in
SQL:2008 (percentile_cont(), percentile_disc(), rank(), dense_rank(),
percent_rank(), cume_dist()).  We also added mode() though it is not in the
spec, as well as versions of percentile_cont() and percentile_disc() that
can compute multiple percentile values in one pass over the data.

Unlike the original submission, this patch puts full control of the sorting
process in the hands of the aggregate's support functions.  To allow the
support functions to find out how they're supposed to sort, a new API
function AggGetAggref() is added to nodeAgg.c.  This allows retrieval of
the aggregate call's Aggref node, which may have other uses beyond the
immediate need.  There is also support for ordered-set aggregates to
install cleanup callback functions, so that they can be sure that
infrastructure such as tuplesort objects gets cleaned up.

In passing, make some fixes in the recently-added support for variadic
aggregates, and make some editorial adjustments in the recent FILTER
additions for aggregates.  Also, simplify use of IsBinaryCoercible() by
allowing it to succeed whenever the target type is ANY or ANYELEMENT.
It was inconsistent that it dealt with other polymorphic target types
but not these.

Atri Sharma and Andrew Gierth; reviewed by Pavel Stehule and Vik Fearing,
and rather heavily editorialized upon by Tom Lane
This commit is contained in:
Tom Lane
2013-12-23 16:11:35 -05:00
parent 37484ad2aa
commit 8d65da1f01
64 changed files with 4686 additions and 755 deletions

View File

@@ -14,16 +14,20 @@
*/
#include "postgres.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_type.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/tlist.h"
#include "parser/parse_agg.h"
#include "parser/parse_clause.h"
#include "parser/parse_coerce.h"
#include "parser/parse_expr.h"
#include "parser/parsetree.h"
#include "rewrite/rewriteManip.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
typedef struct
@@ -42,9 +46,13 @@ typedef struct
bool have_non_var_grouping;
List **func_grouped_rels;
int sublevels_up;
bool in_agg_direct_args;
} check_ungrouped_columns_context;
static int check_agg_arguments(ParseState *pstate, List *args, Expr *filter);
static int check_agg_arguments(ParseState *pstate,
List *directargs,
List *args,
Expr *filter);
static bool check_agg_arguments_walker(Node *node,
check_agg_arguments_context *context);
static void check_ungrouped_columns(Node *node, ParseState *pstate, Query *qry,
@@ -59,15 +67,21 @@ static bool check_ungrouped_columns_walker(Node *node,
* Finish initial transformation of an aggregate call
*
* parse_func.c has recognized the function as an aggregate, and has set up
* all the fields of the Aggref except args, aggorder, aggdistinct and
* agglevelsup. The passed-in args list has been through standard expression
* transformation, while the passed-in aggorder list hasn't been transformed
* at all.
* all the fields of the Aggref except aggdirectargs, args, aggorder,
* aggdistinct and agglevelsup. The passed-in args list has been through
* standard expression transformation and type coercion to match the agg's
* declared arg types, while the passed-in aggorder list hasn't been
* transformed at all.
*
* Here we convert the args list into a targetlist by inserting TargetEntry
* nodes, and then transform the aggorder and agg_distinct specifications to
* produce lists of SortGroupClause nodes. (That might also result in adding
* resjunk expressions to the targetlist.)
* Here we separate the args list into direct and aggregated args, storing the
* former in agg->aggdirectargs and the latter in agg->args. The regular
* args, but not the direct args, are converted into a targetlist by inserting
* TargetEntry nodes. We then transform the aggorder and agg_distinct
* specifications to produce lists of SortGroupClause nodes for agg->aggorder
* and agg->aggdistinct. (For a regular aggregate, this might result in
* adding resjunk expressions to the targetlist; but for ordered-set
* aggregates the aggorder list will always be one-to-one with the aggregated
* args.)
*
* We must also determine which query level the aggregate actually belongs to,
* set agglevelsup accordingly, and mark p_hasAggs true in the corresponding
@@ -77,76 +91,122 @@ void
transformAggregateCall(ParseState *pstate, Aggref *agg,
List *args, List *aggorder, bool agg_distinct)
{
List *tlist;
List *torder;
List *tlist = NIL;
List *torder = NIL;
List *tdistinct = NIL;
AttrNumber attno;
AttrNumber attno = 1;
int save_next_resno;
int min_varlevel;
ListCell *lc;
const char *err;
bool errkind;
/*
* Transform the plain list of Exprs into a targetlist. We don't bother
* to assign column names to the entries.
*/
tlist = NIL;
attno = 1;
foreach(lc, args)
if (AGGKIND_IS_ORDERED_SET(agg->aggkind))
{
Expr *arg = (Expr *) lfirst(lc);
TargetEntry *tle = makeTargetEntry(arg, attno++, NULL, false);
/*
* For an ordered-set agg, the args list includes direct args and
* aggregated args; we must split them apart.
*/
int numDirectArgs = list_length(args) - list_length(aggorder);
List *aargs;
ListCell *lc2;
tlist = lappend(tlist, tle);
}
Assert(numDirectArgs >= 0);
/*
* If we have an ORDER BY, transform it. This will add columns to the
* tlist if they appear in ORDER BY but weren't already in the arg list.
* They will be marked resjunk = true so we can tell them apart from
* regular aggregate arguments later.
*
* We need to mess with p_next_resno since it will be used to number any
* new targetlist entries.
*/
save_next_resno = pstate->p_next_resno;
pstate->p_next_resno = attno;
torder = transformSortClause(pstate,
aggorder,
&tlist,
EXPR_KIND_ORDER_BY,
true /* fix unknowns */ ,
true /* force SQL99 rules */ );
/*
* If we have DISTINCT, transform that to produce a distinctList.
*/
if (agg_distinct)
{
tdistinct = transformDistinctClause(pstate, &tlist, torder, true);
aargs = list_copy_tail(args, numDirectArgs);
agg->aggdirectargs = list_truncate(args, numDirectArgs);
/*
* Remove this check if executor support for hashed distinct for
* aggregates is ever added.
* Build a tlist from the aggregated args, and make a sortlist entry
* for each one. Note that the expressions in the SortBy nodes are
* ignored (they are the raw versions of the transformed args); we are
* just looking at the sort information in the SortBy nodes.
*/
foreach(lc, tdistinct)
forboth(lc, aargs, lc2, aggorder)
{
SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
Expr *arg = (Expr *) lfirst(lc);
SortBy *sortby = (SortBy *) lfirst(lc2);
TargetEntry *tle;
if (!OidIsValid(sortcl->sortop))
/* We don't bother to assign column names to the entries */
tle = makeTargetEntry(arg, attno++, NULL, false);
tlist = lappend(tlist, tle);
torder = addTargetToSortList(pstate, tle,
torder, tlist, sortby,
true /* fix unknowns */ );
}
/* Never any DISTINCT in an ordered-set agg */
Assert(!agg_distinct);
}
else
{
/* Regular aggregate, so it has no direct args */
agg->aggdirectargs = NIL;
/*
* Transform the plain list of Exprs into a targetlist.
*/
foreach(lc, args)
{
Expr *arg = (Expr *) lfirst(lc);
TargetEntry *tle;
/* We don't bother to assign column names to the entries */
tle = makeTargetEntry(arg, attno++, NULL, false);
tlist = lappend(tlist, tle);
}
/*
* If we have an ORDER BY, transform it. This will add columns to the
* tlist if they appear in ORDER BY but weren't already in the arg
* list. They will be marked resjunk = true so we can tell them apart
* from regular aggregate arguments later.
*
* We need to mess with p_next_resno since it will be used to number
* any new targetlist entries.
*/
save_next_resno = pstate->p_next_resno;
pstate->p_next_resno = attno;
torder = transformSortClause(pstate,
aggorder,
&tlist,
EXPR_KIND_ORDER_BY,
true /* fix unknowns */ ,
true /* force SQL99 rules */ );
/*
* If we have DISTINCT, transform that to produce a distinctList.
*/
if (agg_distinct)
{
tdistinct = transformDistinctClause(pstate, &tlist, torder, true);
/*
* Remove this check if executor support for hashed distinct for
* aggregates is ever added.
*/
foreach(lc, tdistinct)
{
Node *expr = get_sortgroupclause_expr(sortcl, tlist);
SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_FUNCTION),
errmsg("could not identify an ordering operator for type %s",
format_type_be(exprType(expr))),
errdetail("Aggregates with DISTINCT must be able to sort their inputs."),
parser_errposition(pstate, exprLocation(expr))));
if (!OidIsValid(sortcl->sortop))
{
Node *expr = get_sortgroupclause_expr(sortcl, tlist);
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_FUNCTION),
errmsg("could not identify an ordering operator for type %s",
format_type_be(exprType(expr))),
errdetail("Aggregates with DISTINCT must be able to sort their inputs."),
parser_errposition(pstate, exprLocation(expr))));
}
}
}
pstate->p_next_resno = save_next_resno;
}
/* Update the Aggref with the transformation results */
@@ -154,13 +214,14 @@ transformAggregateCall(ParseState *pstate, Aggref *agg,
agg->aggorder = torder;
agg->aggdistinct = tdistinct;
pstate->p_next_resno = save_next_resno;
/*
* Check the arguments to compute the aggregate's level and detect
* improper nesting.
*/
min_varlevel = check_agg_arguments(pstate, agg->args, agg->aggfilter);
min_varlevel = check_agg_arguments(pstate,
agg->aggdirectargs,
agg->args,
agg->aggfilter);
agg->agglevelsup = min_varlevel;
/* Mark the correct pstate level as having aggregates */
@@ -302,8 +363,17 @@ transformAggregateCall(ParseState *pstate, Aggref *agg,
* one is its parent, etc).
*
* The aggregate's level is the same as the level of the lowest-level variable
* or aggregate in its arguments or filter expression; or if it contains no
* variables at all, we presume it to be local.
* or aggregate in its aggregated arguments (including any ORDER BY columns)
* or filter expression; or if it contains no variables at all, we presume it
* to be local.
*
* Vars/Aggs in direct arguments are *not* counted towards determining the
* agg's level, as those arguments aren't evaluated per-row but only
* per-group, and so in some sense aren't really agg arguments. However,
* this can mean that we decide an agg is upper-level even when its direct
* args contain lower-level Vars/Aggs, and that case has to be disallowed.
* (This is a little strange, but the SQL standard seems pretty definite that
* direct args are not to be considered when setting the agg's level.)
*
* We also take this opportunity to detect any aggregates or window functions
* nested within the arguments. We can throw error immediately if we find
@@ -312,7 +382,10 @@ transformAggregateCall(ParseState *pstate, Aggref *agg,
* which we can't know until we finish scanning the arguments.
*/
static int
check_agg_arguments(ParseState *pstate, List *args, Expr *filter)
check_agg_arguments(ParseState *pstate,
List *directargs,
List *args,
Expr *filter)
{
int agglevel;
check_agg_arguments_context context;
@@ -337,8 +410,9 @@ check_agg_arguments(ParseState *pstate, List *args, Expr *filter)
if (context.min_varlevel < 0)
{
if (context.min_agglevel < 0)
return 0;
agglevel = context.min_agglevel;
agglevel = 0;
else
agglevel = context.min_agglevel;
}
else if (context.min_agglevel < 0)
agglevel = context.min_varlevel;
@@ -349,12 +423,49 @@ check_agg_arguments(ParseState *pstate, List *args, Expr *filter)
* If there's a nested aggregate of the same semantic level, complain.
*/
if (agglevel == context.min_agglevel)
{
int aggloc;
aggloc = locate_agg_of_level((Node *) args, agglevel);
if (aggloc < 0)
aggloc = locate_agg_of_level((Node *) filter, agglevel);
ereport(ERROR,
(errcode(ERRCODE_GROUPING_ERROR),
errmsg("aggregate function calls cannot be nested"),
parser_errposition(pstate,
locate_agg_of_level((Node *) args,
agglevel))));
parser_errposition(pstate, aggloc)));
}
/*
* Now check for vars/aggs in the direct arguments, and throw error if
* needed. Note that we allow a Var of the agg's semantic level, but not
* an Agg of that level. In principle such Aggs could probably be
* supported, but it would create an ordering dependency among the
* aggregates at execution time. Since the case appears neither to be
* required by spec nor particularly useful, we just treat it as a
* nested-aggregate situation.
*/
if (directargs)
{
context.min_varlevel = -1;
context.min_agglevel = -1;
(void) expression_tree_walker((Node *) directargs,
check_agg_arguments_walker,
(void *) &context);
if (context.min_varlevel >= 0 && context.min_varlevel < agglevel)
ereport(ERROR,
(errcode(ERRCODE_GROUPING_ERROR),
errmsg("outer-level aggregate cannot contain a lower-level variable in its direct arguments"),
parser_errposition(pstate,
locate_var_of_level((Node *) directargs,
context.min_varlevel))));
if (context.min_agglevel >= 0 && context.min_agglevel <= agglevel)
ereport(ERROR,
(errcode(ERRCODE_GROUPING_ERROR),
errmsg("aggregate function calls cannot be nested"),
parser_errposition(pstate,
locate_agg_of_level((Node *) directargs,
context.min_agglevel))));
}
return agglevel;
}
@@ -442,6 +553,10 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc,
/*
* A window function call can't contain another one (but aggs are OK). XXX
* is this required by spec, or just an unimplemented feature?
*
* Note: we don't need to check the filter expression here, because the
* context checks done below and in transformAggregateCall would have
* already rejected any window funcs or aggs within the filter.
*/
if (pstate->p_hasWindowFuncs &&
contain_windowfuncs((Node *) wfunc->args))
@@ -800,6 +915,7 @@ check_ungrouped_columns(Node *node, ParseState *pstate, Query *qry,
context.have_non_var_grouping = have_non_var_grouping;
context.func_grouped_rels = func_grouped_rels;
context.sublevels_up = 0;
context.in_agg_direct_args = false;
check_ungrouped_columns_walker(node, &context);
}
@@ -815,16 +931,39 @@ check_ungrouped_columns_walker(Node *node,
IsA(node, Param))
return false; /* constants are always acceptable */
/*
* If we find an aggregate call of the original level, do not recurse into
* its arguments or filter; ungrouped vars there are not an error. We can
* also skip looking at aggregates of higher levels, since they could not
* possibly contain Vars of concern to us (see transformAggregateCall).
* We do need to look at aggregates of lower levels, however.
*/
if (IsA(node, Aggref) &&
(int) ((Aggref *) node)->agglevelsup >= context->sublevels_up)
return false;
if (IsA(node, Aggref))
{
Aggref *agg = (Aggref *) node;
if ((int) agg->agglevelsup == context->sublevels_up)
{
/*
* If we find an aggregate call of the original level, do not
* recurse into its normal arguments, ORDER BY arguments, or
* filter; ungrouped vars there are not an error. But we should
* check direct arguments as though they weren't in an aggregate.
* We set a special flag in the context to help produce a useful
* error message for ungrouped vars in direct arguments.
*/
bool result;
Assert(!context->in_agg_direct_args);
context->in_agg_direct_args = true;
result = check_ungrouped_columns_walker((Node *) agg->aggdirectargs,
context);
context->in_agg_direct_args = false;
return result;
}
/*
* We can skip recursing into aggregates of higher levels altogether,
* since they could not possibly contain Vars of concern to us (see
* transformAggregateCall). We do need to look at aggregates of lower
* levels, however.
*/
if ((int) agg->agglevelsup > context->sublevels_up)
return false;
}
/*
* If we have any GROUP BY items that are not simple Vars, check to see if
@@ -917,6 +1056,8 @@ check_ungrouped_columns_walker(Node *node,
(errcode(ERRCODE_GROUPING_ERROR),
errmsg("column \"%s.%s\" must appear in the GROUP BY clause or be used in an aggregate function",
rte->eref->aliasname, attname),
context->in_agg_direct_args ?
errdetail("Direct arguments of an ordered-set aggregate must use only grouped columns.") : 0,
parser_errposition(context->pstate, var->location)));
else
ereport(ERROR,
@@ -943,6 +1084,93 @@ check_ungrouped_columns_walker(Node *node,
(void *) context);
}
/*
* get_aggregate_argtypes
* Identify the specific datatypes passed to an aggregate call.
*
* Given an Aggref, extract the actual datatypes of the input arguments.
* The input datatypes are reported in a way that matches up with the
* aggregate's declaration, ie, any ORDER BY columns attached to a plain
* aggregate are ignored, but we report both direct and aggregated args of
* an ordered-set aggregate.
*
* Datatypes are returned into inputTypes[], which must reference an array
* of length FUNC_MAX_ARGS.
*
* The function result is the number of actual arguments.
*/
int
get_aggregate_argtypes(Aggref *aggref, Oid *inputTypes)
{
int numArguments = 0;
ListCell *lc;
/* Any direct arguments of an ordered-set aggregate come first */
foreach(lc, aggref->aggdirectargs)
{
Node *expr = (Node *) lfirst(lc);
inputTypes[numArguments] = exprType(expr);
numArguments++;
}
/* Now get the regular (aggregated) arguments */
foreach(lc, aggref->args)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
/* Ignore ordering columns of a plain aggregate */
if (tle->resjunk)
continue;
inputTypes[numArguments] = exprType((Node *) tle->expr);
numArguments++;
}
return numArguments;
}
/*
* resolve_aggregate_transtype
* Identify the transition state value's datatype for an aggregate call.
*
* This function resolves a polymorphic aggregate's state datatype.
* It must be passed the aggtranstype from the aggregate's catalog entry,
* as well as the actual argument types extracted by get_aggregate_argtypes.
* (We could fetch these values internally, but for all existing callers that
* would just duplicate work the caller has to do too, so we pass them in.)
*/
Oid
resolve_aggregate_transtype(Oid aggfuncid,
Oid aggtranstype,
Oid *inputTypes,
int numArguments)
{
/* resolve actual type of transition state, if polymorphic */
if (IsPolymorphicType(aggtranstype))
{
/* have to fetch the agg's declared input types... */
Oid *declaredArgTypes;
int agg_nargs;
(void) get_func_signature(aggfuncid, &declaredArgTypes, &agg_nargs);
/*
* VARIADIC ANY aggs could have more actual than declared args, but
* such extra args can't affect polymorphic type resolution.
*/
Assert(agg_nargs <= numArguments);
aggtranstype = enforce_generic_type_consistency(inputTypes,
declaredArgTypes,
agg_nargs,
aggtranstype,
false);
pfree(declaredArgTypes);
}
return aggtranstype;
}
/*
* Create expression trees for the transition and final functions
* of an aggregate. These are needed so that polymorphic functions
@@ -956,6 +1184,9 @@ check_ungrouped_columns_walker(Node *node,
* resolved to actual types (ie, none should ever be ANYELEMENT etc).
* agg_input_collation is the aggregate function's input collation.
*
* For an ordered-set aggregate, remember that agg_input_types describes
* the direct arguments followed by the aggregated arguments.
*
* transfn_oid and finalfn_oid identify the funcs to be called; the latter
* may be InvalidOid.
*
@@ -965,6 +1196,8 @@ check_ungrouped_columns_walker(Node *node,
void
build_aggregate_fnexprs(Oid *agg_input_types,
int agg_num_inputs,
int agg_num_direct_inputs,
bool agg_ordered_set,
bool agg_variadic,
Oid agg_state_type,
Oid agg_result_type,
@@ -995,7 +1228,7 @@ build_aggregate_fnexprs(Oid *agg_input_types,
args = list_make1(argp);
for (i = 0; i < agg_num_inputs; i++)
for (i = agg_num_direct_inputs; i < agg_num_inputs; i++)
{
argp = makeNode(Param);
argp->paramkind = PARAM_EXEC;
@@ -1035,10 +1268,26 @@ build_aggregate_fnexprs(Oid *agg_input_types,
argp->location = -1;
args = list_make1(argp);
if (agg_ordered_set)
{
for (i = 0; i < agg_num_inputs; i++)
{
argp = makeNode(Param);
argp->paramkind = PARAM_EXEC;
argp->paramid = -1;
argp->paramtype = agg_input_types[i];
argp->paramtypmod = -1;
argp->paramcollid = agg_input_collation;
argp->location = -1;
args = lappend(args, argp);
}
}
*finalfnexpr = (Expr *) makeFuncExpr(finalfn_oid,
agg_result_type,
args,
InvalidOid,
agg_input_collation,
COERCE_EXPLICIT_CALL);
/* finalfn is currently never treated as variadic */
}