mirror of
https://github.com/postgres/postgres.git
synced 2025-07-02 09:02:37 +03:00
aggregate(DISTINCT ...) works, per SQL spec.
Note this forces initdb because of change of Aggref node in stored rules.
This commit is contained in:
@ -3,15 +3,35 @@
|
||||
* nodeAgg.c
|
||||
* Routines to handle aggregate nodes.
|
||||
*
|
||||
* ExecAgg evaluates each aggregate in the following steps: (initcond1,
|
||||
* initcond2 are the initial values and sfunc1, sfunc2, and finalfunc are
|
||||
* the transition functions.)
|
||||
*
|
||||
* value1 = initcond1
|
||||
* value2 = initcond2
|
||||
* foreach input_value do
|
||||
* value1 = sfunc1(value1, input_value)
|
||||
* value2 = sfunc2(value2)
|
||||
* value1 = finalfunc(value1, value2)
|
||||
*
|
||||
* If initcond1 is NULL then the first non-NULL input_value is
|
||||
* assigned directly to value1. sfunc1 isn't applied until value1
|
||||
* is non-NULL.
|
||||
*
|
||||
* sfunc1 is never applied when the current tuple's input_value is NULL.
|
||||
* sfunc2 is applied for each tuple if the aggref is marked 'usenulls',
|
||||
* otherwise it is only applied when input_value is not NULL.
|
||||
* (usenulls was formerly used for COUNT(*), but is no longer needed for
|
||||
* that purpose; as of 10/1999 the support for usenulls is dead code.
|
||||
* I have not removed it because it seems like a potentially useful
|
||||
* feature for user-defined aggregates. We'd just need to add a
|
||||
* flag column to pg_aggregate and a parameter to CREATE AGGREGATE...)
|
||||
*
|
||||
*
|
||||
* Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
*
|
||||
* NOTE
|
||||
* The implementation of Agg node has been reworked to handle legal
|
||||
* SQL aggregates. (Do not expect POSTQUEL semantics.) -- ay 2/95
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* $Header: /cvsroot/pgsql/src/backend/executor/nodeAgg.c,v 1.59 1999/10/30 02:35:14 tgl Exp $
|
||||
* $Header: /cvsroot/pgsql/src/backend/executor/nodeAgg.c,v 1.60 1999/12/13 01:26:52 tgl Exp $
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
@ -20,11 +40,15 @@
|
||||
|
||||
#include "access/heapam.h"
|
||||
#include "catalog/pg_aggregate.h"
|
||||
#include "catalog/pg_operator.h"
|
||||
#include "executor/executor.h"
|
||||
#include "executor/nodeAgg.h"
|
||||
#include "optimizer/clauses.h"
|
||||
#include "parser/parse_expr.h"
|
||||
#include "parser/parse_oper.h"
|
||||
#include "parser/parse_type.h"
|
||||
#include "utils/syscache.h"
|
||||
#include "utils/tuplesort.h"
|
||||
|
||||
/*
|
||||
* AggStatePerAggData - per-aggregate working state for the Agg scan
|
||||
@ -36,6 +60,9 @@ typedef struct AggStatePerAggData
|
||||
* thereafter:
|
||||
*/
|
||||
|
||||
/* Link to Aggref node this working state is for */
|
||||
Aggref *aggref;
|
||||
|
||||
/* Oids of transfer functions */
|
||||
Oid xfn1_oid;
|
||||
Oid xfn2_oid;
|
||||
@ -47,6 +74,18 @@ typedef struct AggStatePerAggData
|
||||
FmgrInfo xfn1;
|
||||
FmgrInfo xfn2;
|
||||
FmgrInfo finalfn;
|
||||
/*
|
||||
* Type of input data and Oid of sort operator to use for it;
|
||||
* only set/used when aggregate has DISTINCT flag. (These are not
|
||||
* used directly by nodeAgg, but must be passed to the Tuplesort object.)
|
||||
*/
|
||||
Oid inputType;
|
||||
Oid sortOperator;
|
||||
/*
|
||||
* fmgr lookup data for input type's equality operator --- only set/used
|
||||
* when aggregate has DISTINCT flag.
|
||||
*/
|
||||
FmgrInfo equalfn;
|
||||
/*
|
||||
* initial values from pg_aggregate entry
|
||||
*/
|
||||
@ -55,19 +94,29 @@ typedef struct AggStatePerAggData
|
||||
bool initValue1IsNull,
|
||||
initValue2IsNull;
|
||||
/*
|
||||
* We need the len and byval info for the agg's transition status types
|
||||
* in order to know how to copy/delete values.
|
||||
* We need the len and byval info for the agg's input and transition
|
||||
* data types in order to know how to copy/delete values.
|
||||
*/
|
||||
int transtype1Len,
|
||||
int inputtypeLen,
|
||||
transtype1Len,
|
||||
transtype2Len;
|
||||
bool transtype1ByVal,
|
||||
bool inputtypeByVal,
|
||||
transtype1ByVal,
|
||||
transtype2ByVal;
|
||||
|
||||
/*
|
||||
* These values are working state that is initialized at the start
|
||||
* of an input tuple group and updated for each input tuple:
|
||||
* of an input tuple group and updated for each input tuple.
|
||||
*
|
||||
* For a simple (non DISTINCT) aggregate, we just feed the input values
|
||||
* straight to the transition functions. If it's DISTINCT, we pass the
|
||||
* input values into a Tuplesort object; then at completion of the input
|
||||
* tuple group, we scan the sorted values, eliminate duplicates, and run
|
||||
* the transition functions on the rest.
|
||||
*/
|
||||
|
||||
Tuplesortstate *sortstate; /* sort object, if a DISTINCT agg */
|
||||
|
||||
Datum value1, /* current transfer values 1 and 2 */
|
||||
value2;
|
||||
bool value1IsNull,
|
||||
@ -82,28 +131,248 @@ typedef struct AggStatePerAggData
|
||||
} AggStatePerAggData;
|
||||
|
||||
|
||||
/*
|
||||
* Helper routine to make a copy of a Datum.
|
||||
*
|
||||
* NB: input had better not be a NULL; might cause null-pointer dereference.
|
||||
*/
|
||||
static Datum
|
||||
copyDatum(Datum val, int typLen, bool typByVal)
|
||||
{
|
||||
if (typByVal)
|
||||
return val;
|
||||
else
|
||||
{
|
||||
char *newVal;
|
||||
static void initialize_aggregate (AggStatePerAgg peraggstate);
|
||||
static void advance_transition_functions (AggStatePerAgg peraggstate,
|
||||
Datum newVal, bool isNull);
|
||||
static void finalize_aggregate (AggStatePerAgg peraggstate,
|
||||
Datum *resultVal, bool *resultIsNull);
|
||||
static Datum copyDatum(Datum val, int typLen, bool typByVal);
|
||||
|
||||
if (typLen == -1) /* variable length type? */
|
||||
typLen = VARSIZE((struct varlena *) DatumGetPointer(val));
|
||||
newVal = (char *) palloc(typLen);
|
||||
memcpy(newVal, DatumGetPointer(val), typLen);
|
||||
return PointerGetDatum(newVal);
|
||||
|
||||
/*
|
||||
* Initialize one aggregate for a new set of input values.
|
||||
*/
|
||||
static void
|
||||
initialize_aggregate (AggStatePerAgg peraggstate)
|
||||
{
|
||||
Aggref *aggref = peraggstate->aggref;
|
||||
|
||||
/*
|
||||
* Start a fresh sort operation for each DISTINCT aggregate.
|
||||
*/
|
||||
if (aggref->aggdistinct)
|
||||
{
|
||||
/* In case of rescan, maybe there could be an uncompleted
|
||||
* sort operation? Clean it up if so.
|
||||
*/
|
||||
if (peraggstate->sortstate)
|
||||
tuplesort_end(peraggstate->sortstate);
|
||||
|
||||
peraggstate->sortstate =
|
||||
tuplesort_begin_datum(peraggstate->inputType,
|
||||
peraggstate->sortOperator,
|
||||
false);
|
||||
}
|
||||
|
||||
/*
|
||||
* (Re)set value1 and value2 to their initial values.
|
||||
*/
|
||||
if (OidIsValid(peraggstate->xfn1_oid) &&
|
||||
! peraggstate->initValue1IsNull)
|
||||
peraggstate->value1 = copyDatum(peraggstate->initValue1,
|
||||
peraggstate->transtype1Len,
|
||||
peraggstate->transtype1ByVal);
|
||||
else
|
||||
peraggstate->value1 = (Datum) NULL;
|
||||
peraggstate->value1IsNull = peraggstate->initValue1IsNull;
|
||||
|
||||
if (OidIsValid(peraggstate->xfn2_oid) &&
|
||||
! peraggstate->initValue2IsNull)
|
||||
peraggstate->value2 = copyDatum(peraggstate->initValue2,
|
||||
peraggstate->transtype2Len,
|
||||
peraggstate->transtype2ByVal);
|
||||
else
|
||||
peraggstate->value2 = (Datum) NULL;
|
||||
peraggstate->value2IsNull = peraggstate->initValue2IsNull;
|
||||
|
||||
/* ------------------------------------------
|
||||
* If the initial value for the first transition function
|
||||
* doesn't exist in the pg_aggregate table then we will let
|
||||
* the first value returned from the outer procNode become
|
||||
* the initial value. (This is useful for aggregates like
|
||||
* max{} and min{}.) The noInitValue flag signals that we
|
||||
* still need to do this.
|
||||
* ------------------------------------------
|
||||
*/
|
||||
peraggstate->noInitValue = peraggstate->initValue1IsNull;
|
||||
}
|
||||
|
||||
/*
|
||||
* Given a new input value, advance the transition functions of an aggregate.
|
||||
*
|
||||
* Note: if the agg does not have usenulls set, null inputs will be filtered
|
||||
* out before reaching here.
|
||||
*/
|
||||
static void
|
||||
advance_transition_functions (AggStatePerAgg peraggstate,
|
||||
Datum newVal, bool isNull)
|
||||
{
|
||||
Datum args[2];
|
||||
|
||||
if (OidIsValid(peraggstate->xfn1_oid) && !isNull)
|
||||
{
|
||||
if (peraggstate->noInitValue)
|
||||
{
|
||||
/*
|
||||
* value1 has not been initialized. This is the first non-NULL
|
||||
* input value. We use it as the initial value for value1.
|
||||
*
|
||||
* XXX We assume, without having checked, that the agg's input
|
||||
* type is binary-compatible with its transtype1!
|
||||
*
|
||||
* We have to copy the datum since the tuple from which it came
|
||||
* will be freed on the next iteration of the scan.
|
||||
*/
|
||||
peraggstate->value1 = copyDatum(newVal,
|
||||
peraggstate->transtype1Len,
|
||||
peraggstate->transtype1ByVal);
|
||||
peraggstate->value1IsNull = false;
|
||||
peraggstate->noInitValue = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* apply transition function 1 */
|
||||
args[0] = peraggstate->value1;
|
||||
args[1] = newVal;
|
||||
newVal = (Datum) fmgr_c(&peraggstate->xfn1,
|
||||
(FmgrValues *) args,
|
||||
&isNull);
|
||||
if (! peraggstate->transtype1ByVal)
|
||||
pfree(peraggstate->value1);
|
||||
peraggstate->value1 = newVal;
|
||||
}
|
||||
}
|
||||
|
||||
if (OidIsValid(peraggstate->xfn2_oid))
|
||||
{
|
||||
/* apply transition function 2 */
|
||||
args[0] = peraggstate->value2;
|
||||
isNull = false; /* value2 cannot be null, currently */
|
||||
newVal = (Datum) fmgr_c(&peraggstate->xfn2,
|
||||
(FmgrValues *) args,
|
||||
&isNull);
|
||||
if (! peraggstate->transtype2ByVal)
|
||||
pfree(peraggstate->value2);
|
||||
peraggstate->value2 = newVal;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Compute the final value of one aggregate.
|
||||
*/
|
||||
static void
|
||||
finalize_aggregate (AggStatePerAgg peraggstate,
|
||||
Datum *resultVal, bool *resultIsNull)
|
||||
{
|
||||
Aggref *aggref = peraggstate->aggref;
|
||||
char *args[2];
|
||||
|
||||
/*
|
||||
* If it's a DISTINCT aggregate, all we've done so far is to stuff the
|
||||
* input values into the sort object. Complete the sort, then run
|
||||
* the transition functions on the non-duplicate values. Note that
|
||||
* DISTINCT always suppresses nulls, per SQL spec, regardless of usenulls.
|
||||
*/
|
||||
if (aggref->aggdistinct)
|
||||
{
|
||||
Datum oldVal = (Datum) 0;
|
||||
bool haveOldVal = false;
|
||||
Datum newVal;
|
||||
bool isNull;
|
||||
|
||||
tuplesort_performsort(peraggstate->sortstate);
|
||||
while (tuplesort_getdatum(peraggstate->sortstate, true,
|
||||
&newVal, &isNull))
|
||||
{
|
||||
if (isNull)
|
||||
continue;
|
||||
if (haveOldVal)
|
||||
{
|
||||
Datum equal;
|
||||
|
||||
equal = (Datum) (*fmgr_faddr(&peraggstate->equalfn)) (oldVal,
|
||||
newVal);
|
||||
if (DatumGetInt32(equal) != 0)
|
||||
{
|
||||
if (! peraggstate->inputtypeByVal)
|
||||
pfree(DatumGetPointer(newVal));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
advance_transition_functions(peraggstate, newVal, false);
|
||||
if (haveOldVal && ! peraggstate->inputtypeByVal)
|
||||
pfree(DatumGetPointer(oldVal));
|
||||
oldVal = newVal;
|
||||
haveOldVal = true;
|
||||
}
|
||||
if (haveOldVal && ! peraggstate->inputtypeByVal)
|
||||
pfree(DatumGetPointer(oldVal));
|
||||
tuplesort_end(peraggstate->sortstate);
|
||||
peraggstate->sortstate = NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Now apply the agg's finalfn, or substitute the appropriate transition
|
||||
* value if there is no finalfn.
|
||||
*
|
||||
* XXX For now, only apply finalfn if we got at least one
|
||||
* non-null input value. This prevents zero divide in AVG().
|
||||
* If we had cleaner handling of null inputs/results in functions,
|
||||
* we could probably take out this hack and define the result
|
||||
* for no inputs as whatever finalfn returns for null input.
|
||||
*/
|
||||
if (OidIsValid(peraggstate->finalfn_oid) &&
|
||||
! peraggstate->noInitValue)
|
||||
{
|
||||
if (peraggstate->finalfn.fn_nargs > 1)
|
||||
{
|
||||
args[0] = (char *) peraggstate->value1;
|
||||
args[1] = (char *) peraggstate->value2;
|
||||
}
|
||||
else if (OidIsValid(peraggstate->xfn1_oid))
|
||||
args[0] = (char *) peraggstate->value1;
|
||||
else if (OidIsValid(peraggstate->xfn2_oid))
|
||||
args[0] = (char *) peraggstate->value2;
|
||||
else
|
||||
elog(ERROR, "ExecAgg: no valid transition functions??");
|
||||
*resultIsNull = false;
|
||||
*resultVal = (Datum) fmgr_c(&peraggstate->finalfn,
|
||||
(FmgrValues *) args,
|
||||
resultIsNull);
|
||||
}
|
||||
else if (OidIsValid(peraggstate->xfn1_oid))
|
||||
{
|
||||
/* Return value1 */
|
||||
*resultVal = peraggstate->value1;
|
||||
*resultIsNull = peraggstate->value1IsNull;
|
||||
/* prevent pfree below */
|
||||
peraggstate->value1IsNull = true;
|
||||
}
|
||||
else if (OidIsValid(peraggstate->xfn2_oid))
|
||||
{
|
||||
/* Return value2 */
|
||||
*resultVal = peraggstate->value2;
|
||||
*resultIsNull = peraggstate->value2IsNull;
|
||||
/* prevent pfree below */
|
||||
peraggstate->value2IsNull = true;
|
||||
}
|
||||
else
|
||||
elog(ERROR, "ExecAgg: no valid transition functions??");
|
||||
|
||||
/*
|
||||
* Release any per-group working storage, unless we're passing
|
||||
* it back as the result of the aggregate.
|
||||
*/
|
||||
if (OidIsValid(peraggstate->xfn1_oid) &&
|
||||
! peraggstate->value1IsNull &&
|
||||
! peraggstate->transtype1ByVal)
|
||||
pfree(peraggstate->value1);
|
||||
|
||||
if (OidIsValid(peraggstate->xfn2_oid) &&
|
||||
! peraggstate->value2IsNull &&
|
||||
! peraggstate->transtype2ByVal)
|
||||
pfree(peraggstate->value2);
|
||||
}
|
||||
|
||||
/* ---------------------------------------
|
||||
*
|
||||
@ -118,30 +387,6 @@ copyDatum(Datum val, int typLen, bool typByVal)
|
||||
* the expression context to be used when ExecProject evaluates the
|
||||
* result tuple.
|
||||
*
|
||||
* ExecAgg evaluates each aggregate in the following steps: (initcond1,
|
||||
* initcond2 are the initial values and sfunc1, sfunc2, and finalfunc are
|
||||
* the transition functions.)
|
||||
*
|
||||
* value1 = initcond1
|
||||
* value2 = initcond2
|
||||
* foreach tuple do
|
||||
* value1 = sfunc1(value1, aggregated_value)
|
||||
* value2 = sfunc2(value2)
|
||||
* value1 = finalfunc(value1, value2)
|
||||
*
|
||||
* If initcond1 is NULL then the first non-NULL aggregated_value is
|
||||
* assigned directly to value1. sfunc1 isn't applied until value1
|
||||
* is non-NULL.
|
||||
*
|
||||
* sfunc1 is never applied when the current tuple's aggregated_value
|
||||
* is NULL. sfunc2 is applied for each tuple if the aggref is marked
|
||||
* 'usenulls', otherwise it is only applied when aggregated_value is
|
||||
* not NULL. (usenulls was formerly used for COUNT(*), but is no longer
|
||||
* needed for that purpose; as of 10/1999 the support for usenulls is
|
||||
* dead code. I have not removed it because it seems like a potentially
|
||||
* useful feature for user-defined aggregates. We'd just need to add a
|
||||
* flag column to pg_aggregate and a parameter to CREATE AGGREGATE...)
|
||||
*
|
||||
* If the outer subplan is a Group node, ExecAgg returns as many tuples
|
||||
* as there are groups.
|
||||
*
|
||||
@ -161,7 +406,6 @@ ExecAgg(Agg *node)
|
||||
TupleTableSlot *resultSlot;
|
||||
HeapTuple inputTuple;
|
||||
int aggno;
|
||||
List *alist;
|
||||
bool isDone;
|
||||
bool isNull;
|
||||
|
||||
@ -190,42 +434,11 @@ ExecAgg(Agg *node)
|
||||
/*
|
||||
* Initialize working state for a new input tuple group
|
||||
*/
|
||||
aggno = -1;
|
||||
foreach(alist, aggstate->aggs)
|
||||
for (aggno = 0; aggno < aggstate->numaggs; aggno++)
|
||||
{
|
||||
AggStatePerAgg peraggstate = &peragg[++aggno];
|
||||
AggStatePerAgg peraggstate = &peragg[aggno];
|
||||
|
||||
/*
|
||||
* (Re)set value1 and value2 to their initial values.
|
||||
*/
|
||||
if (OidIsValid(peraggstate->xfn1_oid) &&
|
||||
! peraggstate->initValue1IsNull)
|
||||
peraggstate->value1 = copyDatum(peraggstate->initValue1,
|
||||
peraggstate->transtype1Len,
|
||||
peraggstate->transtype1ByVal);
|
||||
else
|
||||
peraggstate->value1 = (Datum) NULL;
|
||||
peraggstate->value1IsNull = peraggstate->initValue1IsNull;
|
||||
|
||||
if (OidIsValid(peraggstate->xfn2_oid) &&
|
||||
! peraggstate->initValue2IsNull)
|
||||
peraggstate->value2 = copyDatum(peraggstate->initValue2,
|
||||
peraggstate->transtype2Len,
|
||||
peraggstate->transtype2ByVal);
|
||||
else
|
||||
peraggstate->value2 = (Datum) NULL;
|
||||
peraggstate->value2IsNull = peraggstate->initValue2IsNull;
|
||||
|
||||
/* ------------------------------------------
|
||||
* If the initial value for the first transition function
|
||||
* doesn't exist in the pg_aggregate table then we will let
|
||||
* the first value returned from the outer procNode become
|
||||
* the initial value. (This is useful for aggregates like
|
||||
* max{} and min{}.) The noInitValue flag signals that we
|
||||
* still need to do this.
|
||||
* ------------------------------------------
|
||||
*/
|
||||
peraggstate->noInitValue = peraggstate->initValue1IsNull;
|
||||
initialize_aggregate(peraggstate);
|
||||
}
|
||||
|
||||
inputTuple = NULL; /* no saved input tuple yet */
|
||||
@ -243,13 +456,11 @@ ExecAgg(Agg *node)
|
||||
break;
|
||||
econtext->ecxt_scantuple = outerslot;
|
||||
|
||||
aggno = -1;
|
||||
foreach(alist, aggstate->aggs)
|
||||
for (aggno = 0; aggno < aggstate->numaggs; aggno++)
|
||||
{
|
||||
Aggref *aggref = (Aggref *) lfirst(alist);
|
||||
AggStatePerAgg peraggstate = &peragg[++aggno];
|
||||
AggStatePerAgg peraggstate = &peragg[aggno];
|
||||
Aggref *aggref = peraggstate->aggref;
|
||||
Datum newVal;
|
||||
Datum args[2];
|
||||
|
||||
newVal = ExecEvalExpr(aggref->target, econtext,
|
||||
&isNull, &isDone);
|
||||
@ -257,53 +468,12 @@ ExecAgg(Agg *node)
|
||||
if (isNull && !aggref->usenulls)
|
||||
continue; /* ignore this tuple for this agg */
|
||||
|
||||
if (OidIsValid(peraggstate->xfn1_oid) && !isNull)
|
||||
{
|
||||
if (peraggstate->noInitValue)
|
||||
{
|
||||
/*
|
||||
* value1 has not been initialized. This is the
|
||||
* first non-NULL input value. We use it as the
|
||||
* initial value for value1. XXX We assume,
|
||||
* without having checked, that the agg's input type
|
||||
* is binary-compatible with its transtype1!
|
||||
*
|
||||
* We have to copy the datum since the tuple from
|
||||
* which it came will be freed on the next iteration
|
||||
* of the scan.
|
||||
*/
|
||||
peraggstate->value1 = copyDatum(newVal,
|
||||
peraggstate->transtype1Len,
|
||||
peraggstate->transtype1ByVal);
|
||||
peraggstate->value1IsNull = false;
|
||||
peraggstate->noInitValue = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* apply transition function 1 */
|
||||
args[0] = peraggstate->value1;
|
||||
args[1] = newVal;
|
||||
newVal = (Datum) fmgr_c(&peraggstate->xfn1,
|
||||
(FmgrValues *) args,
|
||||
&isNull);
|
||||
if (! peraggstate->transtype1ByVal)
|
||||
pfree(peraggstate->value1);
|
||||
peraggstate->value1 = newVal;
|
||||
}
|
||||
}
|
||||
|
||||
if (OidIsValid(peraggstate->xfn2_oid))
|
||||
{
|
||||
/* apply transition function 2 */
|
||||
args[0] = peraggstate->value2;
|
||||
isNull = false; /* value2 cannot be null, currently */
|
||||
newVal = (Datum) fmgr_c(&peraggstate->xfn2,
|
||||
(FmgrValues *) args,
|
||||
&isNull);
|
||||
if (! peraggstate->transtype2ByVal)
|
||||
pfree(peraggstate->value2);
|
||||
peraggstate->value2 = newVal;
|
||||
}
|
||||
if (aggref->aggdistinct)
|
||||
tuplesort_putdatum(peraggstate->sortstate,
|
||||
newVal, isNull);
|
||||
else
|
||||
advance_transition_functions(peraggstate,
|
||||
newVal, isNull);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -320,70 +490,12 @@ ExecAgg(Agg *node)
|
||||
* Done scanning input tuple group.
|
||||
* Finalize each aggregate calculation.
|
||||
*/
|
||||
aggno = -1;
|
||||
foreach(alist, aggstate->aggs)
|
||||
for (aggno = 0; aggno < aggstate->numaggs; aggno++)
|
||||
{
|
||||
AggStatePerAgg peraggstate = &peragg[++aggno];
|
||||
char *args[2];
|
||||
AggStatePerAgg peraggstate = &peragg[aggno];
|
||||
|
||||
/*
|
||||
* XXX For now, only apply finalfn if we got at least one
|
||||
* non-null input value. This prevents zero divide in AVG().
|
||||
* If we had cleaner handling of null inputs/results in functions,
|
||||
* we could probably take out this hack and define the result
|
||||
* for no inputs as whatever finalfn returns for null input.
|
||||
*/
|
||||
if (OidIsValid(peraggstate->finalfn_oid) &&
|
||||
! peraggstate->noInitValue)
|
||||
{
|
||||
if (peraggstate->finalfn.fn_nargs > 1)
|
||||
{
|
||||
args[0] = (char *) peraggstate->value1;
|
||||
args[1] = (char *) peraggstate->value2;
|
||||
}
|
||||
else if (OidIsValid(peraggstate->xfn1_oid))
|
||||
args[0] = (char *) peraggstate->value1;
|
||||
else if (OidIsValid(peraggstate->xfn2_oid))
|
||||
args[0] = (char *) peraggstate->value2;
|
||||
else
|
||||
elog(ERROR, "ExecAgg: no valid transition functions??");
|
||||
aggnulls[aggno] = false;
|
||||
aggvalues[aggno] = (Datum) fmgr_c(&peraggstate->finalfn,
|
||||
(FmgrValues *) args,
|
||||
&(aggnulls[aggno]));
|
||||
}
|
||||
else if (OidIsValid(peraggstate->xfn1_oid))
|
||||
{
|
||||
/* Return value1 */
|
||||
aggvalues[aggno] = peraggstate->value1;
|
||||
aggnulls[aggno] = peraggstate->value1IsNull;
|
||||
/* prevent pfree below */
|
||||
peraggstate->value1IsNull = true;
|
||||
}
|
||||
else if (OidIsValid(peraggstate->xfn2_oid))
|
||||
{
|
||||
/* Return value2 */
|
||||
aggvalues[aggno] = peraggstate->value2;
|
||||
aggnulls[aggno] = peraggstate->value2IsNull;
|
||||
/* prevent pfree below */
|
||||
peraggstate->value2IsNull = true;
|
||||
}
|
||||
else
|
||||
elog(ERROR, "ExecAgg: no valid transition functions??");
|
||||
|
||||
/*
|
||||
* Release any per-group working storage, unless we're passing
|
||||
* it back as the result of the aggregate.
|
||||
*/
|
||||
if (OidIsValid(peraggstate->xfn1_oid) &&
|
||||
! peraggstate->value1IsNull &&
|
||||
! peraggstate->transtype1ByVal)
|
||||
pfree(peraggstate->value1);
|
||||
|
||||
if (OidIsValid(peraggstate->xfn2_oid) &&
|
||||
! peraggstate->value2IsNull &&
|
||||
! peraggstate->transtype2ByVal)
|
||||
pfree(peraggstate->value2);
|
||||
finalize_aggregate(peraggstate,
|
||||
& aggvalues[aggno], & aggnulls[aggno]);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -458,14 +570,14 @@ ExecAgg(Agg *node)
|
||||
|
||||
/*
|
||||
* Form a projection tuple using the aggregate results and the
|
||||
* representative input tuple. Store it in the result tuple slot,
|
||||
* and return it if it meets my qual condition.
|
||||
* representative input tuple. Store it in the result tuple slot.
|
||||
*/
|
||||
resultSlot = ExecProject(projInfo, &isDone);
|
||||
|
||||
/*
|
||||
* If the completed tuple does not match the qualifications,
|
||||
* it is ignored and we loop back to try to process another group.
|
||||
* Otherwise, return the tuple.
|
||||
*/
|
||||
}
|
||||
while (! ExecQual(node->plan.qual, econtext));
|
||||
@ -505,6 +617,11 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent)
|
||||
|
||||
/*
|
||||
* find aggregates in targetlist and quals
|
||||
*
|
||||
* Note: pull_agg_clauses also checks that no aggs contain other agg
|
||||
* calls in their arguments. This would make no sense under SQL semantics
|
||||
* anyway (and it's forbidden by the spec). Because that is true, we
|
||||
* don't need to worry about evaluating the aggs in any particular order.
|
||||
*/
|
||||
aggstate->aggs = nconc(pull_agg_clause((Node *) node->plan.targetlist),
|
||||
pull_agg_clause((Node *) node->plan.qual));
|
||||
@ -588,6 +705,9 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent)
|
||||
/* Mark Aggref node with its associated index in the result array */
|
||||
aggref->aggno = aggno;
|
||||
|
||||
/* Fill in the peraggstate data */
|
||||
peraggstate->aggref = aggref;
|
||||
|
||||
aggTuple = SearchSysCacheTuple(AGGNAME,
|
||||
PointerGetDatum(aggname),
|
||||
ObjectIdGetDatum(aggref->basetype),
|
||||
@ -644,6 +764,29 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent)
|
||||
{
|
||||
fmgr_info(finalfn_oid, &peraggstate->finalfn);
|
||||
}
|
||||
|
||||
if (aggref->aggdistinct)
|
||||
{
|
||||
Oid inputType = exprType(aggref->target);
|
||||
Operator eq_operator;
|
||||
Form_pg_operator pgopform;
|
||||
|
||||
peraggstate->inputType = inputType;
|
||||
typeInfo = typeidType(inputType);
|
||||
peraggstate->inputtypeLen = typeLen(typeInfo);
|
||||
peraggstate->inputtypeByVal = typeByVal(typeInfo);
|
||||
|
||||
eq_operator = oper("=", inputType, inputType, true);
|
||||
if (!HeapTupleIsValid(eq_operator))
|
||||
{
|
||||
elog(ERROR, "Unable to identify an equality operator for type '%s'",
|
||||
typeidTypeName(inputType));
|
||||
}
|
||||
pgopform = (Form_pg_operator) GETSTRUCT(eq_operator);
|
||||
fmgr_info(pgopform->oprcode, &(peraggstate->equalfn));
|
||||
peraggstate->sortOperator = any_ordering_op(inputType);
|
||||
peraggstate->sortstate = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
return TRUE;
|
||||
@ -690,3 +833,26 @@ ExecReScanAgg(Agg *node, ExprContext *exprCtxt, Plan *parent)
|
||||
ExecReScan(((Plan *) node)->lefttree, exprCtxt, (Plan *) node);
|
||||
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Helper routine to make a copy of a Datum.
|
||||
*
|
||||
* NB: input had better not be a NULL; might cause null-pointer dereference.
|
||||
*/
|
||||
static Datum
|
||||
copyDatum(Datum val, int typLen, bool typByVal)
|
||||
{
|
||||
if (typByVal)
|
||||
return val;
|
||||
else
|
||||
{
|
||||
char *newVal;
|
||||
|
||||
if (typLen == -1) /* variable length type? */
|
||||
typLen = VARSIZE((struct varlena *) DatumGetPointer(val));
|
||||
newVal = (char *) palloc(typLen);
|
||||
memcpy(newVal, DatumGetPointer(val), typLen);
|
||||
return PointerGetDatum(newVal);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user