1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-29 10:41:53 +03:00

Fix LookupTupleHashEntryHash() pipeline-stall issue.

Refactor hash lookups in nodeAgg.c to improve performance.

Author: Andres Freund and Jeff Davis
Discussion: https://postgr.es/m/20200612213715.op4ye4q7gktqvpuo%40alap3.anarazel.de
Backpatch-through: 13
This commit is contained in:
Jeff Davis
2020-07-26 14:55:52 -07:00
parent 21b0055359
commit 7f5f2249b2
6 changed files with 107 additions and 103 deletions

View File

@@ -391,7 +391,9 @@ static void finalize_partialaggregate(AggState *aggstate,
AggStatePerAgg peragg,
AggStatePerGroup pergroupstate,
Datum *resultVal, bool *resultIsNull);
static void prepare_hash_slot(AggState *aggstate);
static inline void prepare_hash_slot(AggStatePerHash perhash,
TupleTableSlot *inputslot,
TupleTableSlot *hashslot);
static void prepare_projection_slot(AggState *aggstate,
TupleTableSlot *slot,
int currentSet);
@@ -413,8 +415,9 @@ static int hash_choose_num_partitions(uint64 input_groups,
double hashentrysize,
int used_bits,
int *log2_npartittions);
static AggStatePerGroup lookup_hash_entry(AggState *aggstate, uint32 hash,
bool *in_hash_table);
static void initialize_hash_entry(AggState *aggstate,
TupleHashTable hashtable,
TupleHashEntry entry);
static void lookup_hash_entries(AggState *aggstate);
static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
static void agg_fill_hash_table(AggState *aggstate);
@@ -1207,12 +1210,11 @@ finalize_partialaggregate(AggState *aggstate,
* Extract the attributes that make up the grouping key into the
* hashslot. This is necessary to compute the hash or perform a lookup.
*/
static void
prepare_hash_slot(AggState *aggstate)
static inline void
prepare_hash_slot(AggStatePerHash perhash,
TupleTableSlot *inputslot,
TupleTableSlot *hashslot)
{
TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple;
AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
TupleTableSlot *hashslot = perhash->hashslot;
int i;
/* transfer just the needed columns into hashslot */
@@ -2013,75 +2015,39 @@ hash_choose_num_partitions(uint64 input_groups, double hashentrysize,
}
/*
* Find or create a hashtable entry for the tuple group containing the current
* tuple (already set in tmpcontext's outertuple slot), in the current grouping
* set (which the caller must have selected - note that initialize_aggregate
* depends on this).
*
* When called, CurrentMemoryContext should be the per-query context. The
* already-calculated hash value for the tuple must be specified.
*
* If in "spill mode", then only find existing hashtable entries; don't create
* new ones. If a tuple's group is not already present in the hash table for
* the current grouping set, assign *in_hash_table=false and the caller will
* spill it to disk.
* Initialize a freshly-created TupleHashEntry.
*/
static AggStatePerGroup
lookup_hash_entry(AggState *aggstate, uint32 hash, bool *in_hash_table)
static void
initialize_hash_entry(AggState *aggstate, TupleHashTable hashtable,
TupleHashEntry entry)
{
AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
TupleTableSlot *hashslot = perhash->hashslot;
TupleHashEntryData *entry;
bool isnew = false;
bool *p_isnew;
AggStatePerGroup pergroup;
int transno;
/* if hash table already spilled, don't create new entries */
p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
aggstate->hash_ngroups_current++;
hash_agg_check_limits(aggstate);
/* find or create the hashtable entry using the filtered tuple */
entry = LookupTupleHashEntryHash(perhash->hashtable, hashslot, p_isnew,
hash);
/* no need to allocate or initialize per-group state */
if (aggstate->numtrans == 0)
return;
if (entry == NULL)
pergroup = (AggStatePerGroup)
MemoryContextAlloc(hashtable->tablecxt,
sizeof(AggStatePerGroupData) * aggstate->numtrans);
entry->additional = pergroup;
/*
* Initialize aggregates for new tuple group, lookup_hash_entries()
* already has selected the relevant grouping set.
*/
for (transno = 0; transno < aggstate->numtrans; transno++)
{
*in_hash_table = false;
return NULL;
AggStatePerTrans pertrans = &aggstate->pertrans[transno];
AggStatePerGroup pergroupstate = &pergroup[transno];
initialize_aggregate(aggstate, pertrans, pergroupstate);
}
else
*in_hash_table = true;
if (isnew)
{
AggStatePerGroup pergroup;
int transno;
aggstate->hash_ngroups_current++;
hash_agg_check_limits(aggstate);
/* no need to allocate or initialize per-group state */
if (aggstate->numtrans == 0)
return NULL;
pergroup = (AggStatePerGroup)
MemoryContextAlloc(perhash->hashtable->tablecxt,
sizeof(AggStatePerGroupData) * aggstate->numtrans);
entry->additional = pergroup;
/*
* Initialize aggregates for new tuple group, lookup_hash_entries()
* already has selected the relevant grouping set.
*/
for (transno = 0; transno < aggstate->numtrans; transno++)
{
AggStatePerTrans pertrans = &aggstate->pertrans[transno];
AggStatePerGroup pergroupstate = &pergroup[transno];
initialize_aggregate(aggstate, pertrans, pergroupstate);
}
}
return entry->additional;
}
/*
@@ -2106,21 +2072,37 @@ static void
lookup_hash_entries(AggState *aggstate)
{
AggStatePerGroup *pergroup = aggstate->hash_pergroup;
TupleTableSlot *outerslot = aggstate->tmpcontext->ecxt_outertuple;
int setno;
for (setno = 0; setno < aggstate->num_hashes; setno++)
{
AggStatePerHash perhash = &aggstate->perhash[setno];
TupleHashTable hashtable = perhash->hashtable;
TupleTableSlot *hashslot = perhash->hashslot;
TupleHashEntry entry;
uint32 hash;
bool in_hash_table;
bool isnew = false;
bool *p_isnew;
/* if hash table already spilled, don't create new entries */
p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
select_current_set(aggstate, setno, true);
prepare_hash_slot(aggstate);
hash = TupleHashTableHash(perhash->hashtable, perhash->hashslot);
pergroup[setno] = lookup_hash_entry(aggstate, hash, &in_hash_table);
prepare_hash_slot(perhash,
outerslot,
hashslot);
/* check to see if we need to spill the tuple for this grouping set */
if (!in_hash_table)
entry = LookupTupleHashEntry(hashtable, hashslot,
p_isnew, &hash);
if (entry != NULL)
{
if (isnew)
initialize_hash_entry(aggstate, hashtable, entry);
pergroup[setno] = entry->additional;
}
else
{
HashAggSpill *spill = &aggstate->hash_spills[setno];
TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
@@ -2131,6 +2113,7 @@ lookup_hash_entries(AggState *aggstate)
aggstate->hashentrysize);
hashagg_spill_tuple(aggstate, spill, slot, hash);
pergroup[setno] = NULL;
}
}
}
@@ -2588,6 +2571,7 @@ static bool
agg_refill_hash_table(AggState *aggstate)
{
HashAggBatch *batch;
AggStatePerHash perhash;
HashAggSpill spill;
HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
uint64 ngroups_estimate;
@@ -2639,6 +2623,8 @@ agg_refill_hash_table(AggState *aggstate)
select_current_set(aggstate, batch->setno, true);
perhash = &aggstate->perhash[aggstate->current_set];
/*
* Spilled tuples are always read back as MinimalTuples, which may be
* different from the outer plan, so recompile the aggregate expressions.
@@ -2652,10 +2638,13 @@ agg_refill_hash_table(AggState *aggstate)
HASHAGG_READ_BUFFER_SIZE);
for (;;)
{
TupleTableSlot *slot = aggstate->hash_spill_rslot;
TupleTableSlot *spillslot = aggstate->hash_spill_rslot;
TupleTableSlot *hashslot = perhash->hashslot;
TupleHashEntry entry;
MinimalTuple tuple;
uint32 hash;
bool in_hash_table;
bool isnew = false;
bool *p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
CHECK_FOR_INTERRUPTS();
@@ -2663,16 +2652,20 @@ agg_refill_hash_table(AggState *aggstate)
if (tuple == NULL)
break;
ExecStoreMinimalTuple(tuple, slot, true);
aggstate->tmpcontext->ecxt_outertuple = slot;
ExecStoreMinimalTuple(tuple, spillslot, true);
aggstate->tmpcontext->ecxt_outertuple = spillslot;
prepare_hash_slot(aggstate);
aggstate->hash_pergroup[batch->setno] =
lookup_hash_entry(aggstate, hash, &in_hash_table);
prepare_hash_slot(perhash,
aggstate->tmpcontext->ecxt_outertuple,
hashslot);
entry = LookupTupleHashEntryHash(
perhash->hashtable, hashslot, p_isnew, hash);
if (in_hash_table)
if (entry != NULL)
{
/* Advance the aggregates (or combine functions) */
if (isnew)
initialize_hash_entry(aggstate, perhash->hashtable, entry);
aggstate->hash_pergroup[batch->setno] = entry->additional;
advance_aggregates(aggstate);
}
else
@@ -2688,7 +2681,9 @@ agg_refill_hash_table(AggState *aggstate)
ngroups_estimate, aggstate->hashentrysize);
}
/* no memory for a new group, spill */
hashagg_spill_tuple(aggstate, &spill, slot, hash);
hashagg_spill_tuple(aggstate, &spill, spillslot, hash);
aggstate->hash_pergroup[batch->setno] = NULL;
}
/*