mirror of
https://github.com/postgres/postgres.git
synced 2025-11-22 12:22:45 +03:00
Create infrastructure for moving-aggregate optimization.
Until now, when executing an aggregate function as a window function within a window with moving frame start (that is, any frame start mode except UNBOUNDED PRECEDING), we had to recalculate the aggregate from scratch each time the frame head moved. This patch allows an aggregate definition to include an alternate "moving aggregate" implementation that includes an inverse transition function for removing rows from the aggregate's running state. As long as this can be done successfully, runtime is proportional to the total number of input rows, rather than to the number of input rows times the average frame length. This commit includes the core infrastructure, documentation, and regression tests using user-defined aggregates. Follow-on commits will update some of the built-in aggregates to use this feature. David Rowley and Florian Pflug, reviewed by Dean Rasheed; additional hacking by me
This commit is contained in:
@@ -57,10 +57,16 @@ AggregateCreate(const char *aggName,
|
||||
Oid variadicArgType,
|
||||
List *aggtransfnName,
|
||||
List *aggfinalfnName,
|
||||
List *aggmtransfnName,
|
||||
List *aggminvtransfnName,
|
||||
List *aggmfinalfnName,
|
||||
List *aggsortopName,
|
||||
Oid aggTransType,
|
||||
int32 aggTransSpace,
|
||||
const char *agginitval)
|
||||
Oid aggmTransType,
|
||||
int32 aggmTransSpace,
|
||||
const char *agginitval,
|
||||
const char *aggminitval)
|
||||
{
|
||||
Relation aggdesc;
|
||||
HeapTuple tup;
|
||||
@@ -69,14 +75,19 @@ AggregateCreate(const char *aggName,
|
||||
Form_pg_proc proc;
|
||||
Oid transfn;
|
||||
Oid finalfn = InvalidOid; /* can be omitted */
|
||||
Oid mtransfn = InvalidOid; /* can be omitted */
|
||||
Oid minvtransfn = InvalidOid; /* can be omitted */
|
||||
Oid mfinalfn = InvalidOid; /* can be omitted */
|
||||
Oid sortop = InvalidOid; /* can be omitted */
|
||||
Oid *aggArgTypes = parameterTypes->values;
|
||||
bool hasPolyArg;
|
||||
bool hasInternalArg;
|
||||
bool mtransIsStrict = false;
|
||||
Oid rettype;
|
||||
Oid finaltype;
|
||||
Oid fnArgs[FUNC_MAX_ARGS];
|
||||
int nargs_transfn;
|
||||
int nargs_finalfn;
|
||||
Oid procOid;
|
||||
TupleDesc tupDesc;
|
||||
int i;
|
||||
@@ -128,6 +139,16 @@ AggregateCreate(const char *aggName,
|
||||
errmsg("cannot determine transition data type"),
|
||||
errdetail("An aggregate using a polymorphic transition type must have at least one polymorphic argument.")));
|
||||
|
||||
/*
|
||||
* Likewise for moving-aggregate transtype, if any
|
||||
*/
|
||||
if (OidIsValid(aggmTransType) &&
|
||||
IsPolymorphicType(aggmTransType) && !hasPolyArg)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
|
||||
errmsg("cannot determine transition data type"),
|
||||
errdetail("An aggregate using a polymorphic transition type must have at least one polymorphic argument.")));
|
||||
|
||||
/*
|
||||
* An ordered-set aggregate that is VARIADIC must be VARIADIC ANY. In
|
||||
* principle we could support regular variadic types, but it would make
|
||||
@@ -234,32 +255,120 @@ AggregateCreate(const char *aggName,
|
||||
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
|
||||
errmsg("must not omit initial value when transition function is strict and transition type is not compatible with input type")));
|
||||
}
|
||||
|
||||
ReleaseSysCache(tup);
|
||||
|
||||
/* handle moving-aggregate transfn, if supplied */
|
||||
if (aggmtransfnName)
|
||||
{
|
||||
/*
|
||||
* The arguments are the same as for the regular transfn, except that
|
||||
* the transition data type might be different. So re-use the fnArgs
|
||||
* values set up above, except for that one.
|
||||
*/
|
||||
Assert(OidIsValid(aggmTransType));
|
||||
fnArgs[0] = aggmTransType;
|
||||
|
||||
mtransfn = lookup_agg_function(aggmtransfnName, nargs_transfn,
|
||||
fnArgs, variadicArgType,
|
||||
&rettype);
|
||||
|
||||
/* As above, return type must exactly match declared mtranstype. */
|
||||
if (rettype != aggmTransType)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_DATATYPE_MISMATCH),
|
||||
errmsg("return type of transition function %s is not %s",
|
||||
NameListToString(aggmtransfnName),
|
||||
format_type_be(aggmTransType))));
|
||||
|
||||
tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(mtransfn));
|
||||
if (!HeapTupleIsValid(tup))
|
||||
elog(ERROR, "cache lookup failed for function %u", mtransfn);
|
||||
proc = (Form_pg_proc) GETSTRUCT(tup);
|
||||
|
||||
/*
|
||||
* If the mtransfn is strict and the minitval is NULL, check first
|
||||
* input type and mtranstype are binary-compatible.
|
||||
*/
|
||||
if (proc->proisstrict && aggminitval == NULL)
|
||||
{
|
||||
if (numArgs < 1 ||
|
||||
!IsBinaryCoercible(aggArgTypes[0], aggmTransType))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
|
||||
errmsg("must not omit initial value when transition function is strict and transition type is not compatible with input type")));
|
||||
}
|
||||
|
||||
/* Remember if mtransfn is strict; we may need this below */
|
||||
mtransIsStrict = proc->proisstrict;
|
||||
|
||||
ReleaseSysCache(tup);
|
||||
}
|
||||
|
||||
/* handle minvtransfn, if supplied */
|
||||
if (aggminvtransfnName)
|
||||
{
|
||||
/*
|
||||
* This must have the same number of arguments with the same types as
|
||||
* the forward transition function, so just re-use the fnArgs data.
|
||||
*/
|
||||
Assert(aggmtransfnName);
|
||||
|
||||
minvtransfn = lookup_agg_function(aggminvtransfnName, nargs_transfn,
|
||||
fnArgs, variadicArgType,
|
||||
&rettype);
|
||||
|
||||
/* As above, return type must exactly match declared mtranstype. */
|
||||
if (rettype != aggmTransType)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_DATATYPE_MISMATCH),
|
||||
errmsg("return type of inverse transition function %s is not %s",
|
||||
NameListToString(aggminvtransfnName),
|
||||
format_type_be(aggmTransType))));
|
||||
|
||||
tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(minvtransfn));
|
||||
if (!HeapTupleIsValid(tup))
|
||||
elog(ERROR, "cache lookup failed for function %u", minvtransfn);
|
||||
proc = (Form_pg_proc) GETSTRUCT(tup);
|
||||
|
||||
/*
|
||||
* We require the strictness settings of the forward and inverse
|
||||
* transition functions to agree. This saves having to handle
|
||||
* assorted special cases at execution time.
|
||||
*/
|
||||
if (proc->proisstrict != mtransIsStrict)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
|
||||
errmsg("strictness of aggregate's forward and inverse transition functions must match")));
|
||||
|
||||
ReleaseSysCache(tup);
|
||||
}
|
||||
|
||||
/*
|
||||
* Set up fnArgs for looking up finalfn(s)
|
||||
*
|
||||
* For ordinary aggs, the finalfn just takes the transtype. For
|
||||
* ordered-set aggs, it takes the transtype plus all args. (The
|
||||
* aggregated args are useless at runtime, and are actually passed as
|
||||
* NULLs, but we may need them in the function signature to allow
|
||||
* resolution of a polymorphic agg's result type.)
|
||||
*/
|
||||
fnArgs[0] = aggTransType;
|
||||
if (AGGKIND_IS_ORDERED_SET(aggKind))
|
||||
{
|
||||
nargs_finalfn = numArgs + 1;
|
||||
memcpy(fnArgs + 1, aggArgTypes, numArgs * sizeof(Oid));
|
||||
}
|
||||
else
|
||||
{
|
||||
nargs_finalfn = 1;
|
||||
/* variadic-ness of the aggregate doesn't affect finalfn */
|
||||
variadicArgType = InvalidOid;
|
||||
}
|
||||
|
||||
/* handle finalfn, if supplied */
|
||||
if (aggfinalfnName)
|
||||
{
|
||||
int nargs_finalfn;
|
||||
|
||||
/*
|
||||
* For ordinary aggs, the finalfn just takes the transtype. For
|
||||
* ordered-set aggs, it takes the transtype plus all args. (The
|
||||
* aggregated args are useless at runtime, and are actually passed as
|
||||
* NULLs, but we may need them in the function signature to allow
|
||||
* resolution of a polymorphic agg's result type.)
|
||||
*/
|
||||
fnArgs[0] = aggTransType;
|
||||
if (AGGKIND_IS_ORDERED_SET(aggKind))
|
||||
{
|
||||
nargs_finalfn = numArgs + 1;
|
||||
memcpy(fnArgs + 1, aggArgTypes, numArgs * sizeof(Oid));
|
||||
}
|
||||
else
|
||||
{
|
||||
nargs_finalfn = 1;
|
||||
/* variadic-ness of the aggregate doesn't affect finalfn */
|
||||
variadicArgType = InvalidOid;
|
||||
}
|
||||
finalfn = lookup_agg_function(aggfinalfnName, nargs_finalfn,
|
||||
fnArgs, variadicArgType,
|
||||
&finaltype);
|
||||
@@ -314,6 +423,49 @@ AggregateCreate(const char *aggName,
|
||||
errmsg("unsafe use of pseudo-type \"internal\""),
|
||||
errdetail("A function returning \"internal\" must have at least one \"internal\" argument.")));
|
||||
|
||||
/*
|
||||
* If a moving-aggregate implementation is supplied, look up its finalfn
|
||||
* if any, and check that the implied aggregate result type matches the
|
||||
* plain implementation.
|
||||
*/
|
||||
if (OidIsValid(aggmTransType))
|
||||
{
|
||||
/* handle finalfn, if supplied */
|
||||
if (aggmfinalfnName)
|
||||
{
|
||||
/*
|
||||
* The arguments are the same as for the regular finalfn, except
|
||||
* that the transition data type might be different. So re-use
|
||||
* the fnArgs values set up above, except for that one.
|
||||
*/
|
||||
fnArgs[0] = aggmTransType;
|
||||
|
||||
mfinalfn = lookup_agg_function(aggmfinalfnName, nargs_finalfn,
|
||||
fnArgs, variadicArgType,
|
||||
&rettype);
|
||||
|
||||
/* As above, check strictness if it's an ordered-set agg */
|
||||
if (AGGKIND_IS_ORDERED_SET(aggKind) && func_strict(mfinalfn))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
|
||||
errmsg("final function of an ordered-set aggregate must not be declared STRICT")));
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* If no finalfn, aggregate result type is type of the state value
|
||||
*/
|
||||
rettype = aggmTransType;
|
||||
}
|
||||
Assert(OidIsValid(rettype));
|
||||
if (rettype != finaltype)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
|
||||
errmsg("moving-aggregate implementation returns type %s, but plain implementation returns type %s",
|
||||
format_type_be(aggmTransType),
|
||||
format_type_be(aggTransType))));
|
||||
}
|
||||
|
||||
/* handle sortop, if supplied */
|
||||
if (aggsortopName)
|
||||
{
|
||||
@@ -340,6 +492,13 @@ AggregateCreate(const char *aggName,
|
||||
if (aclresult != ACLCHECK_OK)
|
||||
aclcheck_error_type(aclresult, aggTransType);
|
||||
|
||||
if (OidIsValid(aggmTransType))
|
||||
{
|
||||
aclresult = pg_type_aclcheck(aggmTransType, GetUserId(), ACL_USAGE);
|
||||
if (aclresult != ACLCHECK_OK)
|
||||
aclcheck_error_type(aclresult, aggmTransType);
|
||||
}
|
||||
|
||||
aclresult = pg_type_aclcheck(finaltype, GetUserId(), ACL_USAGE);
|
||||
if (aclresult != ACLCHECK_OK)
|
||||
aclcheck_error_type(aclresult, finaltype);
|
||||
@@ -392,13 +551,22 @@ AggregateCreate(const char *aggName,
|
||||
values[Anum_pg_aggregate_aggnumdirectargs - 1] = Int16GetDatum(numDirectArgs);
|
||||
values[Anum_pg_aggregate_aggtransfn - 1] = ObjectIdGetDatum(transfn);
|
||||
values[Anum_pg_aggregate_aggfinalfn - 1] = ObjectIdGetDatum(finalfn);
|
||||
values[Anum_pg_aggregate_aggmtransfn - 1] = ObjectIdGetDatum(mtransfn);
|
||||
values[Anum_pg_aggregate_aggminvtransfn - 1] = ObjectIdGetDatum(minvtransfn);
|
||||
values[Anum_pg_aggregate_aggmfinalfn - 1] = ObjectIdGetDatum(mfinalfn);
|
||||
values[Anum_pg_aggregate_aggsortop - 1] = ObjectIdGetDatum(sortop);
|
||||
values[Anum_pg_aggregate_aggtranstype - 1] = ObjectIdGetDatum(aggTransType);
|
||||
values[Anum_pg_aggregate_aggtransspace - 1] = Int32GetDatum(aggTransSpace);
|
||||
values[Anum_pg_aggregate_aggmtranstype - 1] = ObjectIdGetDatum(aggmTransType);
|
||||
values[Anum_pg_aggregate_aggmtransspace - 1] = Int32GetDatum(aggmTransSpace);
|
||||
if (agginitval)
|
||||
values[Anum_pg_aggregate_agginitval - 1] = CStringGetTextDatum(agginitval);
|
||||
else
|
||||
nulls[Anum_pg_aggregate_agginitval - 1] = true;
|
||||
if (aggminitval)
|
||||
values[Anum_pg_aggregate_aggminitval - 1] = CStringGetTextDatum(aggminitval);
|
||||
else
|
||||
nulls[Anum_pg_aggregate_aggminitval - 1] = true;
|
||||
|
||||
aggdesc = heap_open(AggregateRelationId, RowExclusiveLock);
|
||||
tupDesc = aggdesc->rd_att;
|
||||
@@ -414,6 +582,7 @@ AggregateCreate(const char *aggName,
|
||||
* Create dependencies for the aggregate (above and beyond those already
|
||||
* made by ProcedureCreate). Note: we don't need an explicit dependency
|
||||
* on aggTransType since we depend on it indirectly through transfn.
|
||||
* Likewise for aggmTransType if any.
|
||||
*/
|
||||
myself.classId = ProcedureRelationId;
|
||||
myself.objectId = procOid;
|
||||
@@ -434,6 +603,33 @@ AggregateCreate(const char *aggName,
|
||||
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
|
||||
}
|
||||
|
||||
/* Depends on forward transition function, if any */
|
||||
if (OidIsValid(mtransfn))
|
||||
{
|
||||
referenced.classId = ProcedureRelationId;
|
||||
referenced.objectId = mtransfn;
|
||||
referenced.objectSubId = 0;
|
||||
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
|
||||
}
|
||||
|
||||
/* Depends on inverse transition function, if any */
|
||||
if (OidIsValid(minvtransfn))
|
||||
{
|
||||
referenced.classId = ProcedureRelationId;
|
||||
referenced.objectId = minvtransfn;
|
||||
referenced.objectSubId = 0;
|
||||
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
|
||||
}
|
||||
|
||||
/* Depends on final function, if any */
|
||||
if (OidIsValid(mfinalfn))
|
||||
{
|
||||
referenced.classId = ProcedureRelationId;
|
||||
referenced.objectId = mfinalfn;
|
||||
referenced.objectSubId = 0;
|
||||
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
|
||||
}
|
||||
|
||||
/* Depends on sort operator, if any */
|
||||
if (OidIsValid(sortop))
|
||||
{
|
||||
@@ -447,7 +643,12 @@ AggregateCreate(const char *aggName,
|
||||
}
|
||||
|
||||
/*
|
||||
* lookup_agg_function -- common code for finding both transfn and finalfn
|
||||
* lookup_agg_function
|
||||
* common code for finding transfn, invtransfn and finalfn
|
||||
*
|
||||
* Returns OID of function, and stores its return type into *rettype
|
||||
*
|
||||
* NB: must not scribble on input_types[], as we may re-use those
|
||||
*/
|
||||
static Oid
|
||||
lookup_agg_function(List *fnName,
|
||||
|
||||
Reference in New Issue
Block a user