1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-30 21:42:05 +03:00

First stage of reclaiming memory in executor by resetting short-term

memory contexts.  Currently, only leaks in expressions executed as
quals or projections are handled.  Clean up some old dead cruft in
executor while at it --- unused fields in state nodes, that sort of thing.
This commit is contained in:
Tom Lane
2000-07-12 02:37:39 +00:00
parent 46fb9c29e2
commit badce86a2c
53 changed files with 1536 additions and 1584 deletions

View File

@ -32,7 +32,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/nodeAgg.c,v 1.68 2000/06/28 03:31:33 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/executor/nodeAgg.c,v 1.69 2000/07/12 02:37:03 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -50,6 +50,7 @@
#include "parser/parse_type.h"
#include "utils/syscache.h"
#include "utils/tuplesort.h"
#include "utils/datum.h"
/*
* AggStatePerAggData - per-aggregate working state for the Agg scan
@ -101,13 +102,15 @@ typedef struct AggStatePerAggData
initValue2IsNull;
/*
* We need the len and byval info for the agg's input and transition
* data types in order to know how to copy/delete values.
* We need the len and byval info for the agg's input, result, and
* transition data types in order to know how to copy/delete values.
*/
int inputtypeLen,
resulttypeLen,
transtype1Len,
transtype2Len;
bool inputtypeByVal,
resulttypeByVal,
transtype1ByVal,
transtype2ByVal;
@ -143,13 +146,16 @@ typedef struct AggStatePerAggData
static void initialize_aggregate(AggStatePerAgg peraggstate);
static void advance_transition_functions(AggStatePerAgg peraggstate,
Datum newVal, bool isNull);
static void process_sorted_aggregate(AggState *aggstate,
AggStatePerAgg peraggstate);
static void finalize_aggregate(AggStatePerAgg peraggstate,
Datum *resultVal, bool *resultIsNull);
static Datum copyDatum(Datum val, int typLen, bool typByVal);
/*
* Initialize one aggregate for a new set of input values.
*
* When called, CurrentMemoryContext should be the per-query context.
*/
static void
initialize_aggregate(AggStatePerAgg peraggstate)
@ -177,23 +183,14 @@ initialize_aggregate(AggStatePerAgg peraggstate)
/*
* (Re)set value1 and value2 to their initial values.
*
* Note that when the initial values are pass-by-ref, we just reuse
* them without copying for each group. Hence, transition function
* had better not scribble on its input!
*/
if (OidIsValid(peraggstate->xfn1_oid) &&
!peraggstate->initValue1IsNull)
peraggstate->value1 = copyDatum(peraggstate->initValue1,
peraggstate->transtype1Len,
peraggstate->transtype1ByVal);
else
peraggstate->value1 = (Datum) NULL;
peraggstate->value1 = peraggstate->initValue1;
peraggstate->value1IsNull = peraggstate->initValue1IsNull;
if (OidIsValid(peraggstate->xfn2_oid) &&
!peraggstate->initValue2IsNull)
peraggstate->value2 = copyDatum(peraggstate->initValue2,
peraggstate->transtype2Len,
peraggstate->transtype2ByVal);
else
peraggstate->value2 = (Datum) NULL;
peraggstate->value2 = peraggstate->initValue2;
peraggstate->value2IsNull = peraggstate->initValue2IsNull;
/* ------------------------------------------
@ -211,6 +208,9 @@ initialize_aggregate(AggStatePerAgg peraggstate)
/*
* Given a new input value, advance the transition functions of an aggregate.
*
* When called, CurrentMemoryContext should be the context we want transition
* function results to be delivered into on this cycle.
*
* Note: if the agg does not have usenulls set, null inputs will be filtered
* out before reaching here.
*/
@ -237,12 +237,13 @@ advance_transition_functions(AggStatePerAgg peraggstate,
* XXX We assume, without having checked, that the agg's input
* type is binary-compatible with its transtype1!
*
* We have to copy the datum since the tuple from which it came
* We had better copy the datum if it is pass-by-ref, since
* the given pointer may be pointing into a scan tuple that
* will be freed on the next iteration of the scan.
*/
peraggstate->value1 = copyDatum(newVal,
peraggstate->transtype1Len,
peraggstate->transtype1ByVal);
peraggstate->value1 = datumCopy(newVal,
peraggstate->transtype1ByVal,
peraggstate->transtype1Len);
peraggstate->value1IsNull = false;
peraggstate->noInitValue = false;
}
@ -264,8 +265,18 @@ advance_transition_functions(AggStatePerAgg peraggstate,
}
else
newVal = FunctionCallInvoke(&fcinfo);
if (!peraggstate->transtype1ByVal && !peraggstate->value1IsNull)
pfree(DatumGetPointer(peraggstate->value1));
/*
* If the transition function was uncooperative, it may have
* given us a pass-by-ref result that points at the scan tuple
* or the prior-cycle working memory. Copy it into the active
* context if it doesn't look right.
*/
if (!peraggstate->transtype1ByVal && !fcinfo.isnull &&
! MemoryContextContains(CurrentMemoryContext,
DatumGetPointer(newVal)))
newVal = datumCopy(newVal,
peraggstate->transtype1ByVal,
peraggstate->transtype1Len);
peraggstate->value1 = newVal;
peraggstate->value1IsNull = fcinfo.isnull;
}
@ -287,70 +298,116 @@ advance_transition_functions(AggStatePerAgg peraggstate,
}
else
newVal = FunctionCallInvoke(&fcinfo);
if (!peraggstate->transtype2ByVal && !peraggstate->value2IsNull)
pfree(DatumGetPointer(peraggstate->value2));
/*
* If the transition function was uncooperative, it may have
* given us a pass-by-ref result that points at the scan tuple
* or the prior-cycle working memory. Copy it into the active
* context if it doesn't look right.
*/
if (!peraggstate->transtype2ByVal && !fcinfo.isnull &&
! MemoryContextContains(CurrentMemoryContext,
DatumGetPointer(newVal)))
newVal = datumCopy(newVal,
peraggstate->transtype2ByVal,
peraggstate->transtype2Len);
peraggstate->value2 = newVal;
peraggstate->value2IsNull = fcinfo.isnull;
}
}
/*
* Compute the final value of one aggregate.
* Run the transition functions for a DISTINCT aggregate. This is called
* after we have completed entering all the input values into the sort
* object. We complete the sort, read out the value in sorted order, and
* run the transition functions on each non-duplicate value.
*
* When called, CurrentMemoryContext should be the per-query context.
*/
static void
finalize_aggregate(AggStatePerAgg peraggstate,
Datum *resultVal, bool *resultIsNull)
process_sorted_aggregate(AggState *aggstate,
AggStatePerAgg peraggstate)
{
Aggref *aggref = peraggstate->aggref;
FunctionCallInfoData fcinfo;
Datum oldVal = (Datum) 0;
bool haveOldVal = false;
MemoryContext oldContext;
Datum newVal;
bool isNull;
MemSet(&fcinfo, 0, sizeof(fcinfo));
tuplesort_performsort(peraggstate->sortstate);
/*
* If it's a DISTINCT aggregate, all we've done so far is to stuff the
* input values into the sort object. Complete the sort, then run the
* transition functions on the non-duplicate values. Note that
* DISTINCT always suppresses nulls, per SQL spec, regardless of
* usenulls.
* Note: if input type is pass-by-ref, the datums returned by the sort
* are freshly palloc'd in the per-query context, so we must be careful
* to pfree them when they are no longer needed.
*/
if (aggref->aggdistinct)
{
Datum oldVal = (Datum) 0;
bool haveOldVal = false;
Datum newVal;
bool isNull;
tuplesort_performsort(peraggstate->sortstate);
while (tuplesort_getdatum(peraggstate->sortstate, true,
&newVal, &isNull))
while (tuplesort_getdatum(peraggstate->sortstate, true,
&newVal, &isNull))
{
/*
* DISTINCT always suppresses nulls, per SQL spec, regardless of
* the aggregate's usenulls setting.
*/
if (isNull)
continue;
/*
* Clear and select the current working context for evaluation of
* the equality function and transition functions.
*/
MemoryContextReset(aggstate->agg_cxt[aggstate->which_cxt]);
oldContext =
MemoryContextSwitchTo(aggstate->agg_cxt[aggstate->which_cxt]);
if (haveOldVal &&
DatumGetBool(FunctionCall2(&peraggstate->equalfn,
oldVal, newVal)))
{
/* equal to prior, so forget this one */
if (!peraggstate->inputtypeByVal)
pfree(DatumGetPointer(newVal));
/* note we do NOT flip contexts in this case... */
}
else
{
if (isNull)
continue;
if (haveOldVal)
{
if (DatumGetBool(FunctionCall2(&peraggstate->equalfn,
oldVal, newVal)))
{
/* equal to prior, so forget this one */
if (!peraggstate->inputtypeByVal)
pfree(DatumGetPointer(newVal));
continue;
}
}
advance_transition_functions(peraggstate, newVal, false);
/*
* Make the other context current so that this transition
* result is preserved.
*/
aggstate->which_cxt = 1 - aggstate->which_cxt;
/* forget the old value, if any */
if (haveOldVal && !peraggstate->inputtypeByVal)
pfree(DatumGetPointer(oldVal));
oldVal = newVal;
haveOldVal = true;
}
if (haveOldVal && !peraggstate->inputtypeByVal)
pfree(DatumGetPointer(oldVal));
tuplesort_end(peraggstate->sortstate);
peraggstate->sortstate = NULL;
MemoryContextSwitchTo(oldContext);
}
if (haveOldVal && !peraggstate->inputtypeByVal)
pfree(DatumGetPointer(oldVal));
tuplesort_end(peraggstate->sortstate);
peraggstate->sortstate = NULL;
}
/*
* Compute the final value of one aggregate.
*
* When called, CurrentMemoryContext should be the context where we want
* final values delivered (ie, the per-output-tuple expression context).
*/
static void
finalize_aggregate(AggStatePerAgg peraggstate,
Datum *resultVal, bool *resultIsNull)
{
FunctionCallInfoData fcinfo;
MemSet(&fcinfo, 0, sizeof(fcinfo));
/*
* Now apply the agg's finalfn, or substitute the appropriate
* Apply the agg's finalfn, or substitute the appropriate
* transition value if there is no finalfn.
*
* XXX For now, only apply finalfn if we got at least one non-null input
@ -403,35 +460,27 @@ finalize_aggregate(AggStatePerAgg peraggstate,
/* Return value1 */
*resultVal = peraggstate->value1;
*resultIsNull = peraggstate->value1IsNull;
/* prevent pfree below */
peraggstate->value1IsNull = true;
}
else if (OidIsValid(peraggstate->xfn2_oid))
{
/* Return value2 */
*resultVal = peraggstate->value2;
*resultIsNull = peraggstate->value2IsNull;
/* prevent pfree below */
peraggstate->value2IsNull = true;
}
else
elog(ERROR, "ExecAgg: no valid transition functions??");
/*
* Release any per-group working storage, unless we're passing it back
* as the result of the aggregate.
* If result is pass-by-ref, make sure it is in the right context.
*/
if (OidIsValid(peraggstate->xfn1_oid) &&
!peraggstate->value1IsNull &&
!peraggstate->transtype1ByVal)
pfree(DatumGetPointer(peraggstate->value1));
if (OidIsValid(peraggstate->xfn2_oid) &&
!peraggstate->value2IsNull &&
!peraggstate->transtype2ByVal)
pfree(DatumGetPointer(peraggstate->value2));
if (!peraggstate->resulttypeByVal && ! *resultIsNull &&
! MemoryContextContains(CurrentMemoryContext,
DatumGetPointer(*resultVal)))
*resultVal = datumCopy(*resultVal,
peraggstate->resulttypeByVal,
peraggstate->resulttypeLen);
}
/* ---------------------------------------
*
* ExecAgg -
@ -461,6 +510,7 @@ ExecAgg(Agg *node)
Datum *aggvalues;
bool *aggnulls;
AggStatePerAgg peragg;
MemoryContext oldContext;
TupleTableSlot *resultSlot;
HeapTuple inputTuple;
int aggno;
@ -481,14 +531,18 @@ ExecAgg(Agg *node)
peragg = aggstate->peragg;
/*
* We loop retrieving groups until we find one matching
* node->plan.qual
* We loop retrieving groups until we find one matching node->plan.qual
*/
do
{
if (aggstate->agg_done)
return NULL;
/*
* Clear the per-output-tuple context for each group
*/
MemoryContextReset(aggstate->tup_cxt);
/*
* Initialize working state for a new input tuple group
*/
@ -514,6 +568,17 @@ ExecAgg(Agg *node)
break;
econtext->ecxt_scantuple = outerslot;
/*
* Clear and select the current working context for evaluation
* of the input expressions and transition functions at this
* input tuple.
*/
econtext->ecxt_per_tuple_memory =
aggstate->agg_cxt[aggstate->which_cxt];
ResetExprContext(econtext);
oldContext =
MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
for (aggno = 0; aggno < aggstate->numaggs; aggno++)
{
AggStatePerAgg peraggstate = &peragg[aggno];
@ -527,13 +592,26 @@ ExecAgg(Agg *node)
continue; /* ignore this tuple for this agg */
if (aggref->aggdistinct)
{
/* putdatum has to be called in per-query context */
MemoryContextSwitchTo(oldContext);
tuplesort_putdatum(peraggstate->sortstate,
newVal, isNull);
MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
}
else
advance_transition_functions(peraggstate,
newVal, isNull);
}
/*
* Make the other context current so that these transition
* results are preserved.
*/
aggstate->which_cxt = 1 - aggstate->which_cxt;
MemoryContextSwitchTo(oldContext);
/*
* Keep a copy of the first input tuple for the projection.
* (We only need one since only the GROUP BY columns in it can
@ -546,14 +624,38 @@ ExecAgg(Agg *node)
/*
* Done scanning input tuple group. Finalize each aggregate
* calculation.
* calculation, and stash results in the per-output-tuple context.
*
* This is a bit tricky when there are both DISTINCT and plain
* aggregates: we must first finalize all the plain aggs and then all
* the DISTINCT ones. This is needed because the last transition
* values for the plain aggs are stored in the not-current working
* context, and we have to evaluate those aggs (and stash the results
* in the output tup_cxt!) before we start flipping contexts again
* in process_sorted_aggregate.
*/
oldContext = MemoryContextSwitchTo(aggstate->tup_cxt);
for (aggno = 0; aggno < aggstate->numaggs; aggno++)
{
AggStatePerAgg peraggstate = &peragg[aggno];
finalize_aggregate(peraggstate,
&aggvalues[aggno], &aggnulls[aggno]);
if (! peraggstate->aggref->aggdistinct)
finalize_aggregate(peraggstate,
&aggvalues[aggno], &aggnulls[aggno]);
}
MemoryContextSwitchTo(oldContext);
for (aggno = 0; aggno < aggstate->numaggs; aggno++)
{
AggStatePerAgg peraggstate = &peragg[aggno];
if (peraggstate->aggref->aggdistinct)
{
process_sorted_aggregate(aggstate, peraggstate);
oldContext = MemoryContextSwitchTo(aggstate->tup_cxt);
finalize_aggregate(peraggstate,
&aggvalues[aggno], &aggnulls[aggno]);
MemoryContextSwitchTo(oldContext);
}
}
/*
@ -584,7 +686,7 @@ ExecAgg(Agg *node)
/*
* If inputtuple==NULL (ie, the outerPlan didn't return
* anything), create a dummy all-nulls input tuple for use by
* execProject. 99.44% of the time this is a waste of cycles,
* ExecProject. 99.44% of the time this is a waste of cycles,
* because ordinarily the projected output tuple's targetlist
* cannot contain any direct (non-aggregated) references to
* input columns, so the dummy tuple will not be referenced.
@ -619,7 +721,8 @@ ExecAgg(Agg *node)
/*
* Store the representative input tuple in the tuple table slot
* reserved for it.
* reserved for it. The tuple will be deleted when it is cleared
* from the slot.
*/
ExecStoreTuple(inputTuple,
aggstate->csstate.css_ScanTupleSlot,
@ -627,6 +730,11 @@ ExecAgg(Agg *node)
true);
econtext->ecxt_scantuple = aggstate->csstate.css_ScanTupleSlot;
/*
* Do projection and qual check in the per-output-tuple context.
*/
econtext->ecxt_per_tuple_memory = aggstate->tup_cxt;
/*
* Form a projection tuple using the aggregate results and the
* representative input tuple. Store it in the result tuple slot.
@ -701,11 +809,33 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent)
}
/*
* assign node's base id and create expression context
* Create expression context
*/
ExecAssignNodeBaseInfo(estate, &aggstate->csstate.cstate, (Plan *) parent);
ExecAssignExprContext(estate, &aggstate->csstate.cstate);
/*
* We actually need three separate expression memory contexts: one
* for calculating per-output-tuple values (ie, the finished aggregate
* results), and two that we ping-pong between for per-input-tuple
* evaluation of input expressions and transition functions. The
* context made by ExecAssignExprContext() is used as the output context.
*/
aggstate->tup_cxt =
aggstate->csstate.cstate.cs_ExprContext->ecxt_per_tuple_memory;
aggstate->agg_cxt[0] =
AllocSetContextCreate(CurrentMemoryContext,
"AggExprContext1",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
aggstate->agg_cxt[1] =
AllocSetContextCreate(CurrentMemoryContext,
"AggExprContext2",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
aggstate->which_cxt = 0;
#define AGG_NSLOTS 2
/*
@ -769,16 +899,20 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent)
/* Fill in the peraggstate data */
peraggstate->aggref = aggref;
aggTuple = SearchSysCacheTuple(AGGNAME,
PointerGetDatum(aggname),
ObjectIdGetDatum(aggref->basetype),
0, 0);
aggTuple = SearchSysCacheTupleCopy(AGGNAME,
PointerGetDatum(aggname),
ObjectIdGetDatum(aggref->basetype),
0, 0);
if (!HeapTupleIsValid(aggTuple))
elog(ERROR, "ExecAgg: cache lookup failed for aggregate %s(%s)",
aggname,
typeidTypeName(aggref->basetype));
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
typeInfo = typeidType(aggform->aggfinaltype);
peraggstate->resulttypeLen = typeLen(typeInfo);
peraggstate->resulttypeByVal = typeByVal(typeInfo);
peraggstate->initValue1 =
AggNameGetInitVal(aggname,
aggform->aggbasetype,
@ -846,6 +980,8 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent)
peraggstate->sortOperator = any_ordering_op(inputType);
peraggstate->sortstate = NULL;
}
heap_freetuple(aggTuple);
}
return TRUE;
@ -866,6 +1002,17 @@ ExecEndAgg(Agg *node)
Plan *outerPlan;
ExecFreeProjectionInfo(&aggstate->csstate.cstate);
/*
* Make sure ExecFreeExprContext() frees the right expr context...
*/
aggstate->csstate.cstate.cs_ExprContext->ecxt_per_tuple_memory =
aggstate->tup_cxt;
ExecFreeExprContext(&aggstate->csstate.cstate);
/*
* ... and I free the others.
*/
MemoryContextDelete(aggstate->agg_cxt[0]);
MemoryContextDelete(aggstate->agg_cxt[1]);
outerPlan = outerPlan(node);
ExecEndNode(outerPlan, (Plan *) node);
@ -890,28 +1037,4 @@ ExecReScanAgg(Agg *node, ExprContext *exprCtxt, Plan *parent)
*/
if (((Plan *) node)->lefttree->chgParam == NULL)
ExecReScan(((Plan *) node)->lefttree, exprCtxt, (Plan *) node);
}
/*
* Helper routine to make a copy of a Datum.
*
* NB: input had better not be a NULL; might cause null-pointer dereference.
*/
static Datum
copyDatum(Datum val, int typLen, bool typByVal)
{
if (typByVal)
return val;
else
{
char *newVal;
if (typLen == -1) /* variable length type? */
typLen = VARSIZE((struct varlena *) DatumGetPointer(val));
newVal = (char *) palloc(typLen);
memcpy(newVal, DatumGetPointer(val), typLen);
return PointerGetDatum(newVal);
}
}