mirror of
https://github.com/postgres/postgres.git
synced 2025-05-05 09:19:17 +03:00
HashAgg: before spilling tuples, set unneeded columns to NULL.
This is a replacement for 4cad2534. Instead of projecting all tuples going into a HashAgg, only remove unnecessary attributes when actually spilling. This avoids the regression for the in-memory case. Discussion: https://postgr.es/m/a2fb7dfeb4f50aa0a123e42151ee3013933cb802.camel%40j-davis.com Backpatch-through: 13
This commit is contained in:
parent
926ecf83c0
commit
d8a7ce2450
@ -359,6 +359,14 @@ typedef struct HashAggBatch
|
|||||||
int64 input_tuples; /* number of tuples in this batch */
|
int64 input_tuples; /* number of tuples in this batch */
|
||||||
} HashAggBatch;
|
} HashAggBatch;
|
||||||
|
|
||||||
|
/* used to find referenced colnos */
|
||||||
|
typedef struct FindColsContext
|
||||||
|
{
|
||||||
|
bool is_aggref; /* is under an aggref */
|
||||||
|
Bitmapset *aggregated; /* column references under an aggref */
|
||||||
|
Bitmapset *unaggregated; /* other column references */
|
||||||
|
} FindColsContext;
|
||||||
|
|
||||||
static void select_current_set(AggState *aggstate, int setno, bool is_hash);
|
static void select_current_set(AggState *aggstate, int setno, bool is_hash);
|
||||||
static void initialize_phase(AggState *aggstate, int newphase);
|
static void initialize_phase(AggState *aggstate, int newphase);
|
||||||
static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
|
static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
|
||||||
@ -391,8 +399,9 @@ static void finalize_aggregates(AggState *aggstate,
|
|||||||
AggStatePerAgg peragg,
|
AggStatePerAgg peragg,
|
||||||
AggStatePerGroup pergroup);
|
AggStatePerGroup pergroup);
|
||||||
static TupleTableSlot *project_aggregates(AggState *aggstate);
|
static TupleTableSlot *project_aggregates(AggState *aggstate);
|
||||||
static Bitmapset *find_unaggregated_cols(AggState *aggstate);
|
static void find_cols(AggState *aggstate, Bitmapset **aggregated,
|
||||||
static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
|
Bitmapset **unaggregated);
|
||||||
|
static bool find_cols_walker(Node *node, FindColsContext *context);
|
||||||
static void build_hash_tables(AggState *aggstate);
|
static void build_hash_tables(AggState *aggstate);
|
||||||
static void build_hash_table(AggState *aggstate, int setno, long nbuckets);
|
static void build_hash_table(AggState *aggstate, int setno, long nbuckets);
|
||||||
static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
|
static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
|
||||||
@ -425,8 +434,8 @@ static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
|
|||||||
static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
|
static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
|
||||||
int used_bits, uint64 input_tuples,
|
int used_bits, uint64 input_tuples,
|
||||||
double hashentrysize);
|
double hashentrysize);
|
||||||
static Size hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot,
|
static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
|
||||||
uint32 hash);
|
TupleTableSlot *slot, uint32 hash);
|
||||||
static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
|
static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
|
||||||
int setno);
|
int setno);
|
||||||
static void hashagg_tapeinfo_init(AggState *aggstate);
|
static void hashagg_tapeinfo_init(AggState *aggstate);
|
||||||
@ -1375,26 +1384,28 @@ project_aggregates(AggState *aggstate)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* find_unaggregated_cols
|
* Walk tlist and qual to find referenced colnos, dividing them into
|
||||||
* Construct a bitmapset of the column numbers of un-aggregated Vars
|
* aggregated and unaggregated sets.
|
||||||
* appearing in our targetlist and qual (HAVING clause)
|
|
||||||
*/
|
*/
|
||||||
static Bitmapset *
|
static void
|
||||||
find_unaggregated_cols(AggState *aggstate)
|
find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated)
|
||||||
{
|
{
|
||||||
Agg *node = (Agg *) aggstate->ss.ps.plan;
|
Agg *agg = (Agg *) aggstate->ss.ps.plan;
|
||||||
Bitmapset *colnos;
|
FindColsContext context;
|
||||||
|
|
||||||
colnos = NULL;
|
context.is_aggref = false;
|
||||||
(void) find_unaggregated_cols_walker((Node *) node->plan.targetlist,
|
context.aggregated = NULL;
|
||||||
&colnos);
|
context.unaggregated = NULL;
|
||||||
(void) find_unaggregated_cols_walker((Node *) node->plan.qual,
|
|
||||||
&colnos);
|
(void) find_cols_walker((Node *) agg->plan.targetlist, &context);
|
||||||
return colnos;
|
(void) find_cols_walker((Node *) agg->plan.qual, &context);
|
||||||
|
|
||||||
|
*aggregated = context.aggregated;
|
||||||
|
*unaggregated = context.unaggregated;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
|
find_cols_walker(Node *node, FindColsContext *context)
|
||||||
{
|
{
|
||||||
if (node == NULL)
|
if (node == NULL)
|
||||||
return false;
|
return false;
|
||||||
@ -1405,16 +1416,24 @@ find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
|
|||||||
/* setrefs.c should have set the varno to OUTER_VAR */
|
/* setrefs.c should have set the varno to OUTER_VAR */
|
||||||
Assert(var->varno == OUTER_VAR);
|
Assert(var->varno == OUTER_VAR);
|
||||||
Assert(var->varlevelsup == 0);
|
Assert(var->varlevelsup == 0);
|
||||||
*colnos = bms_add_member(*colnos, var->varattno);
|
if (context->is_aggref)
|
||||||
|
context->aggregated = bms_add_member(context->aggregated,
|
||||||
|
var->varattno);
|
||||||
|
else
|
||||||
|
context->unaggregated = bms_add_member(context->unaggregated,
|
||||||
|
var->varattno);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (IsA(node, Aggref) || IsA(node, GroupingFunc))
|
if (IsA(node, Aggref))
|
||||||
{
|
{
|
||||||
/* do not descend into aggregate exprs */
|
Assert(!context->is_aggref);
|
||||||
|
context->is_aggref = true;
|
||||||
|
expression_tree_walker(node, find_cols_walker, (void *) context);
|
||||||
|
context->is_aggref = false;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return expression_tree_walker(node, find_unaggregated_cols_walker,
|
return expression_tree_walker(node, find_cols_walker,
|
||||||
(void *) colnos);
|
(void *) context);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -1532,13 +1551,27 @@ static void
|
|||||||
find_hash_columns(AggState *aggstate)
|
find_hash_columns(AggState *aggstate)
|
||||||
{
|
{
|
||||||
Bitmapset *base_colnos;
|
Bitmapset *base_colnos;
|
||||||
|
Bitmapset *aggregated_colnos;
|
||||||
|
TupleDesc scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
|
||||||
List *outerTlist = outerPlanState(aggstate)->plan->targetlist;
|
List *outerTlist = outerPlanState(aggstate)->plan->targetlist;
|
||||||
int numHashes = aggstate->num_hashes;
|
int numHashes = aggstate->num_hashes;
|
||||||
EState *estate = aggstate->ss.ps.state;
|
EState *estate = aggstate->ss.ps.state;
|
||||||
int j;
|
int j;
|
||||||
|
|
||||||
/* Find Vars that will be needed in tlist and qual */
|
/* Find Vars that will be needed in tlist and qual */
|
||||||
base_colnos = find_unaggregated_cols(aggstate);
|
find_cols(aggstate, &aggregated_colnos, &base_colnos);
|
||||||
|
aggstate->colnos_needed = bms_union(base_colnos, aggregated_colnos);
|
||||||
|
aggstate->max_colno_needed = 0;
|
||||||
|
aggstate->all_cols_needed = true;
|
||||||
|
|
||||||
|
for (int i = 0; i < scanDesc->natts; i++)
|
||||||
|
{
|
||||||
|
int colno = i + 1;
|
||||||
|
if (bms_is_member(colno, aggstate->colnos_needed))
|
||||||
|
aggstate->max_colno_needed = colno;
|
||||||
|
else
|
||||||
|
aggstate->all_cols_needed = false;
|
||||||
|
}
|
||||||
|
|
||||||
for (j = 0; j < numHashes; ++j)
|
for (j = 0; j < numHashes; ++j)
|
||||||
{
|
{
|
||||||
@ -2097,7 +2130,7 @@ lookup_hash_entries(AggState *aggstate)
|
|||||||
perhash->aggnode->numGroups,
|
perhash->aggnode->numGroups,
|
||||||
aggstate->hashentrysize);
|
aggstate->hashentrysize);
|
||||||
|
|
||||||
hashagg_spill_tuple(spill, slot, hash);
|
hashagg_spill_tuple(aggstate, spill, slot, hash);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2619,7 +2652,7 @@ agg_refill_hash_table(AggState *aggstate)
|
|||||||
HASHAGG_READ_BUFFER_SIZE);
|
HASHAGG_READ_BUFFER_SIZE);
|
||||||
for (;;)
|
for (;;)
|
||||||
{
|
{
|
||||||
TupleTableSlot *slot = aggstate->hash_spill_slot;
|
TupleTableSlot *slot = aggstate->hash_spill_rslot;
|
||||||
MinimalTuple tuple;
|
MinimalTuple tuple;
|
||||||
uint32 hash;
|
uint32 hash;
|
||||||
bool in_hash_table;
|
bool in_hash_table;
|
||||||
@ -2655,7 +2688,7 @@ agg_refill_hash_table(AggState *aggstate)
|
|||||||
ngroups_estimate, aggstate->hashentrysize);
|
ngroups_estimate, aggstate->hashentrysize);
|
||||||
}
|
}
|
||||||
/* no memory for a new group, spill */
|
/* no memory for a new group, spill */
|
||||||
hashagg_spill_tuple(&spill, slot, hash);
|
hashagg_spill_tuple(aggstate, &spill, slot, hash);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -2934,9 +2967,11 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
|
|||||||
* partition.
|
* partition.
|
||||||
*/
|
*/
|
||||||
static Size
|
static Size
|
||||||
hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot, uint32 hash)
|
hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
|
||||||
|
TupleTableSlot *inputslot, uint32 hash)
|
||||||
{
|
{
|
||||||
LogicalTapeSet *tapeset = spill->tapeset;
|
LogicalTapeSet *tapeset = spill->tapeset;
|
||||||
|
TupleTableSlot *spillslot;
|
||||||
int partition;
|
int partition;
|
||||||
MinimalTuple tuple;
|
MinimalTuple tuple;
|
||||||
int tapenum;
|
int tapenum;
|
||||||
@ -2945,8 +2980,28 @@ hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot, uint32 hash)
|
|||||||
|
|
||||||
Assert(spill->partitions != NULL);
|
Assert(spill->partitions != NULL);
|
||||||
|
|
||||||
/* XXX: may contain unnecessary attributes, should project */
|
/* spill only attributes that we actually need */
|
||||||
tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
|
if (!aggstate->all_cols_needed)
|
||||||
|
{
|
||||||
|
spillslot = aggstate->hash_spill_wslot;
|
||||||
|
slot_getsomeattrs(inputslot, aggstate->max_colno_needed);
|
||||||
|
ExecClearTuple(spillslot);
|
||||||
|
for (int i = 0; i < spillslot->tts_tupleDescriptor->natts; i++)
|
||||||
|
{
|
||||||
|
if (bms_is_member(i + 1, aggstate->colnos_needed))
|
||||||
|
{
|
||||||
|
spillslot->tts_values[i] = inputslot->tts_values[i];
|
||||||
|
spillslot->tts_isnull[i] = inputslot->tts_isnull[i];
|
||||||
|
}
|
||||||
|
else
|
||||||
|
spillslot->tts_isnull[i] = true;
|
||||||
|
}
|
||||||
|
ExecStoreVirtualTuple(spillslot);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
spillslot = inputslot;
|
||||||
|
|
||||||
|
tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);
|
||||||
|
|
||||||
partition = (hash & spill->mask) >> spill->shift;
|
partition = (hash & spill->mask) >> spill->shift;
|
||||||
spill->ntuples[partition]++;
|
spill->ntuples[partition]++;
|
||||||
@ -3563,8 +3618,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
|
|||||||
aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt,
|
aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt,
|
||||||
"HashAgg meta context",
|
"HashAgg meta context",
|
||||||
ALLOCSET_DEFAULT_SIZES);
|
ALLOCSET_DEFAULT_SIZES);
|
||||||
aggstate->hash_spill_slot = ExecInitExtraTupleSlot(estate, scanDesc,
|
aggstate->hash_spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc,
|
||||||
&TTSOpsMinimalTuple);
|
&TTSOpsMinimalTuple);
|
||||||
|
aggstate->hash_spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc,
|
||||||
|
&TTSOpsVirtual);
|
||||||
|
|
||||||
/* this is an array of pointers, not structures */
|
/* this is an array of pointers, not structures */
|
||||||
aggstate->hash_pergroup = pergroups;
|
aggstate->hash_pergroup = pergroups;
|
||||||
|
@ -2169,6 +2169,9 @@ typedef struct AggState
|
|||||||
int current_set; /* The current grouping set being evaluated */
|
int current_set; /* The current grouping set being evaluated */
|
||||||
Bitmapset *grouped_cols; /* grouped cols in current projection */
|
Bitmapset *grouped_cols; /* grouped cols in current projection */
|
||||||
List *all_grouped_cols; /* list of all grouped cols in DESC order */
|
List *all_grouped_cols; /* list of all grouped cols in DESC order */
|
||||||
|
Bitmapset *colnos_needed; /* all columns needed from the outer plan */
|
||||||
|
int max_colno_needed; /* highest colno needed from outer plan */
|
||||||
|
bool all_cols_needed; /* are all cols from outer plan needed? */
|
||||||
/* These fields are for grouping set phase data */
|
/* These fields are for grouping set phase data */
|
||||||
int maxsets; /* The max number of sets in any phase */
|
int maxsets; /* The max number of sets in any phase */
|
||||||
AggStatePerPhase phases; /* array of all phases */
|
AggStatePerPhase phases; /* array of all phases */
|
||||||
@ -2186,7 +2189,8 @@ typedef struct AggState
|
|||||||
struct HashTapeInfo *hash_tapeinfo; /* metadata for spill tapes */
|
struct HashTapeInfo *hash_tapeinfo; /* metadata for spill tapes */
|
||||||
struct HashAggSpill *hash_spills; /* HashAggSpill for each grouping set,
|
struct HashAggSpill *hash_spills; /* HashAggSpill for each grouping set,
|
||||||
* exists only during first pass */
|
* exists only during first pass */
|
||||||
TupleTableSlot *hash_spill_slot; /* slot for reading from spill files */
|
TupleTableSlot *hash_spill_rslot; /* for reading spill files */
|
||||||
|
TupleTableSlot *hash_spill_wslot; /* for writing spill files */
|
||||||
List *hash_batches; /* hash batches remaining to be processed */
|
List *hash_batches; /* hash batches remaining to be processed */
|
||||||
bool hash_ever_spilled; /* ever spilled during this execution? */
|
bool hash_ever_spilled; /* ever spilled during this execution? */
|
||||||
bool hash_spill_mode; /* we hit a limit during the current batch
|
bool hash_spill_mode; /* we hit a limit during the current batch
|
||||||
@ -2207,7 +2211,7 @@ typedef struct AggState
|
|||||||
* per-group pointers */
|
* per-group pointers */
|
||||||
|
|
||||||
/* support for evaluation of agg input expressions: */
|
/* support for evaluation of agg input expressions: */
|
||||||
#define FIELDNO_AGGSTATE_ALL_PERGROUPS 49
|
#define FIELDNO_AGGSTATE_ALL_PERGROUPS 53
|
||||||
AggStatePerGroup *all_pergroups; /* array of first ->pergroups, than
|
AggStatePerGroup *all_pergroups; /* array of first ->pergroups, than
|
||||||
* ->hash_pergroup */
|
* ->hash_pergroup */
|
||||||
ProjectionInfo *combinedproj; /* projection machinery */
|
ProjectionInfo *combinedproj; /* projection machinery */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user