
Refactor LogicalTapeSet/LogicalTape interface.

All the tape functions, like LogicalTapeRead and LogicalTapeWrite, now
take a LogicalTape as argument, instead of LogicalTapeSet+tape number.
You can create any number of LogicalTapes in a single LogicalTapeSet, and
you don't need to decide the number upfront when you create the tape set.

This makes the tape management in hash agg spilling in nodeAgg.c simpler.
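
For orientation, here is a minimal sketch of the new calling convention, using only functions and argument orders that appear in the diff below; the variable names and the int payload are illustrative, not code from this patch:

/* Create a tape set; individual tapes are added to it on demand. */
LogicalTapeSet *lts = LogicalTapeSetCreate(true, NULL, -1);
LogicalTape *tape = LogicalTapeCreate(lts);	/* no tape count declared upfront */
int			val = 42;
size_t		nread;

/* Write to the tape, then rewind that same tape for reading. */
LogicalTapeWrite(tape, (void *) &val, sizeof(val));
LogicalTapeRewindForRead(tape, BLCKSZ);	/* second argument is the read buffer size */
nread = LogicalTapeRead(tape, &val, sizeof(val));
if (nread != sizeof(val))
	elog(ERROR, "unexpected EOF on logical tape");

/* Close the tape when done with it, and the set when done with all tapes. */
LogicalTapeClose(tape);
LogicalTapeSetClose(lts);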

Discussion: https://www.postgresql.org/message-id/420a0ec7-602c-d406-1e75-1ef7ddc58d83%40iki.fi
Reviewed-by: Peter Geoghegan, Zhihong Yu, John Naylor
Author: Heikki Linnakangas
Date:   2021-10-18 14:30:00 +03:00
parent 409f9ca447
commit c4649cce39
5 changed files with 377 additions and 571 deletions

src/backend/executor/nodeAgg.c

@@ -208,7 +208,16 @@
*
* Spilled data is written to logical tapes. These provide better control
* over memory usage, disk space, and the number of files than if we were
* to use a BufFile for each spill.
* to use a BufFile for each spill. We don't know the number of tapes needed
* at the start of the algorithm (because it can recurse), so a tape set is
* allocated at the beginning, and individual tapes are created as needed.
* As a particular tape is read, logtape.c recycles its disk space. When a
* tape is read to completion, it is destroyed entirely.
*
* Tapes' buffers can take up substantial memory when many tapes are open at
* once. We only need one tape open at a time in read mode (using a buffer
* that's a multiple of BLCKSZ); but we need one tape open in write mode (each
* requiring a buffer of size BLCKSZ) for each partition.
*
* Note that it's possible for transition states to start small but then
* grow very large; for instance in the case of ARRAY_AGG. In such cases,
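
To make the buffer accounting a few lines above concrete, a rough sketch (not code from this patch; the partition count is hypothetical and HASHAGG_READ_BUFFER_SIZE is the read-buffer size used later in this file):

/* One BLCKSZ write buffer per open partition tape, plus a single read buffer. */
int		npartitions = 256;		/* hypothetical partition count */
Size	write_bufs = (Size) npartitions * BLCKSZ;	/* 256 * 8192 bytes = 2 MB with the default BLCKSZ */
Size	total_buf_mem = write_bufs + HASHAGG_READ_BUFFER_SIZE;
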
@@ -311,27 +320,6 @@
*/
#define CHUNKHDRSZ 16
/*
* Track all tapes needed for a HashAgg that spills. We don't know the maximum
* number of tapes needed at the start of the algorithm (because it can
* recurse), so one tape set is allocated and extended as needed for new
* tapes. When a particular tape is already read, rewind it for write mode and
* put it in the free list.
*
* Tapes' buffers can take up substantial memory when many tapes are open at
* once. We only need one tape open at a time in read mode (using a buffer
* that's a multiple of BLCKSZ); but we need one tape open in write mode (each
* requiring a buffer of size BLCKSZ) for each partition.
*/
typedef struct HashTapeInfo
{
LogicalTapeSet *tapeset;
int ntapes;
int *freetapes;
int nfreetapes;
int freetapes_alloc;
} HashTapeInfo;
/*
* Represents partitioned spill data for a single hashtable. Contains the
* necessary information to route tuples to the correct partition, and to
@@ -343,9 +331,8 @@ typedef struct HashTapeInfo
*/
typedef struct HashAggSpill
{
LogicalTapeSet *tapeset; /* borrowed reference to tape set */
int npartitions; /* number of partitions */
int *partitions; /* spill partition tape numbers */
LogicalTape **partitions; /* spill partition tapes */
int64 *ntuples; /* number of tuples in each partition */
uint32 mask; /* mask to find partition from hash value */
int shift; /* after masking, shift by this amount */
@@ -365,8 +352,7 @@ typedef struct HashAggBatch
{
int setno; /* grouping set */
int used_bits; /* number of bits of hash already used */
LogicalTapeSet *tapeset; /* borrowed reference to tape set */
int input_tapenum; /* input partition tape */
LogicalTape *input_tape; /* input partition tape */
int64 input_tuples; /* number of tuples in this batch */
double input_card; /* estimated group cardinality */
} HashAggBatch;
@@ -442,22 +428,17 @@ static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
int npartitions);
static void hashagg_finish_initial_spills(AggState *aggstate);
static void hashagg_reset_spill_state(AggState *aggstate);
static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
int input_tapenum, int setno,
static HashAggBatch *hashagg_batch_new(LogicalTape *input_tape, int setno,
int64 input_tuples, double input_card,
int used_bits);
static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
static void hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *lts,
int used_bits, double input_groups,
double hashentrysize);
static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
TupleTableSlot *slot, uint32 hash);
static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
int setno);
static void hashagg_tapeinfo_init(AggState *aggstate);
static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest,
int ndest);
static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
AggState *aggstate, EState *estate,
@@ -1887,12 +1868,12 @@ hash_agg_enter_spill_mode(AggState *aggstate)
if (!aggstate->hash_ever_spilled)
{
Assert(aggstate->hash_tapeinfo == NULL);
Assert(aggstate->hash_tapeset == NULL);
Assert(aggstate->hash_spills == NULL);
aggstate->hash_ever_spilled = true;
hashagg_tapeinfo_init(aggstate);
aggstate->hash_tapeset = LogicalTapeSetCreate(true, NULL, -1);
aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes);
@@ -1901,7 +1882,7 @@ hash_agg_enter_spill_mode(AggState *aggstate)
AggStatePerHash perhash = &aggstate->perhash[setno];
HashAggSpill *spill = &aggstate->hash_spills[setno];
hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
perhash->aggnode->numGroups,
aggstate->hashentrysize);
}
@@ -1943,9 +1924,9 @@ hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
aggstate->hash_mem_peak = total_mem;
/* update disk usage */
if (aggstate->hash_tapeinfo != NULL)
if (aggstate->hash_tapeset != NULL)
{
uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024);
uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeset) * (BLCKSZ / 1024);
if (aggstate->hash_disk_used < disk_used)
aggstate->hash_disk_used = disk_used;
@@ -2132,7 +2113,7 @@ lookup_hash_entries(AggState *aggstate)
TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
if (spill->partitions == NULL)
hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
perhash->aggnode->numGroups,
aggstate->hashentrysize);
@@ -2597,7 +2578,7 @@ agg_refill_hash_table(AggState *aggstate)
HashAggBatch *batch;
AggStatePerHash perhash;
HashAggSpill spill;
HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
LogicalTapeSet *tapeset = aggstate->hash_tapeset;
bool spill_initialized = false;
if (aggstate->hash_batches == NIL)
@@ -2693,7 +2674,7 @@ agg_refill_hash_table(AggState *aggstate)
* that we don't assign tapes that will never be used.
*/
spill_initialized = true;
hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
hashagg_spill_init(&spill, tapeset, batch->used_bits,
batch->input_card, aggstate->hashentrysize);
}
/* no memory for a new group, spill */
@@ -2709,7 +2690,7 @@ agg_refill_hash_table(AggState *aggstate)
ResetExprContext(aggstate->tmpcontext);
}
hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);
LogicalTapeClose(batch->input_tape);
/* change back to phase 0 */
aggstate->current_phase = 0;
@@ -2884,67 +2865,6 @@ agg_retrieve_hash_table_in_memory(AggState *aggstate)
return NULL;
}
/*
* Initialize HashTapeInfo
*/
static void
hashagg_tapeinfo_init(AggState *aggstate)
{
HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo));
int init_tapes = 16; /* expanded dynamically */
tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, true, NULL, NULL, -1);
tapeinfo->ntapes = init_tapes;
tapeinfo->nfreetapes = init_tapes;
tapeinfo->freetapes_alloc = init_tapes;
tapeinfo->freetapes = palloc(init_tapes * sizeof(int));
for (int i = 0; i < init_tapes; i++)
tapeinfo->freetapes[i] = i;
aggstate->hash_tapeinfo = tapeinfo;
}
/*
* Assign unused tapes to spill partitions, extending the tape set if
* necessary.
*/
static void
hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *partitions,
int npartitions)
{
int partidx = 0;
/* use free tapes if available */
while (partidx < npartitions && tapeinfo->nfreetapes > 0)
partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes];
if (partidx < npartitions)
{
LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx);
while (partidx < npartitions)
partitions[partidx++] = tapeinfo->ntapes++;
}
}
/*
* After a tape has already been written to and then read, this function
* rewinds it for writing and adds it to the free list.
*/
static void
hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
{
/* rewinding frees the buffer while not in use */
LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum);
if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes)
{
tapeinfo->freetapes_alloc <<= 1;
tapeinfo->freetapes = repalloc(tapeinfo->freetapes,
tapeinfo->freetapes_alloc * sizeof(int));
}
tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum;
}
/*
* hashagg_spill_init
*
@@ -2952,7 +2872,7 @@ hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
* of partitions to create, and initializes them.
*/
static void
hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
double input_groups, double hashentrysize)
{
int npartitions;
@@ -2961,13 +2881,13 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
used_bits, &partition_bits);
spill->partitions = palloc0(sizeof(int) * npartitions);
spill->partitions = palloc0(sizeof(LogicalTape *) * npartitions);
spill->ntuples = palloc0(sizeof(int64) * npartitions);
spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
for (int i = 0; i < npartitions; i++)
spill->partitions[i] = LogicalTapeCreate(tapeset);
spill->tapeset = tapeinfo->tapeset;
spill->shift = 32 - used_bits - partition_bits;
spill->mask = (npartitions - 1) << spill->shift;
spill->npartitions = npartitions;
@@ -2986,11 +2906,10 @@ static Size
hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
TupleTableSlot *inputslot, uint32 hash)
{
LogicalTapeSet *tapeset = spill->tapeset;
TupleTableSlot *spillslot;
int partition;
MinimalTuple tuple;
int tapenum;
LogicalTape *tape;
int total_written = 0;
bool shouldFree;
@@ -3029,12 +2948,12 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
*/
addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
tapenum = spill->partitions[partition];
tape = spill->partitions[partition];
LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
LogicalTapeWrite(tape, (void *) &hash, sizeof(uint32));
total_written += sizeof(uint32);
LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len);
LogicalTapeWrite(tape, (void *) tuple, tuple->t_len);
total_written += tuple->t_len;
if (shouldFree)
@@ -3050,15 +2969,14 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
* be done.
*/
static HashAggBatch *
hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
hashagg_batch_new(LogicalTape *input_tape, int setno,
int64 input_tuples, double input_card, int used_bits)
{
HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
batch->setno = setno;
batch->used_bits = used_bits;
batch->tapeset = tapeset;
batch->input_tapenum = tapenum;
batch->input_tape = input_tape;
batch->input_tuples = input_tuples;
batch->input_card = input_card;
@@ -3072,42 +2990,41 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
static MinimalTuple
hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
{
LogicalTapeSet *tapeset = batch->tapeset;
int tapenum = batch->input_tapenum;
LogicalTape *tape = batch->input_tape;
MinimalTuple tuple;
uint32 t_len;
size_t nread;
uint32 hash;
nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
nread = LogicalTapeRead(tape, &hash, sizeof(uint32));
if (nread == 0)
return NULL;
if (nread != sizeof(uint32))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
tapenum, sizeof(uint32), nread)));
errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
tape, sizeof(uint32), nread)));
if (hashp != NULL)
*hashp = hash;
nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len));
nread = LogicalTapeRead(tape, &t_len, sizeof(t_len));
if (nread != sizeof(uint32))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
tapenum, sizeof(uint32), nread)));
errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
tape, sizeof(uint32), nread)));
tuple = (MinimalTuple) palloc(t_len);
tuple->t_len = t_len;
nread = LogicalTapeRead(tapeset, tapenum,
nread = LogicalTapeRead(tape,
(void *) ((char *) tuple + sizeof(uint32)),
t_len - sizeof(uint32));
if (nread != t_len - sizeof(uint32))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
tapenum, t_len - sizeof(uint32), nread)));
errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
tape, t_len - sizeof(uint32), nread)));
return tuple;
}
@@ -3164,8 +3081,7 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
for (i = 0; i < spill->npartitions; i++)
{
LogicalTapeSet *tapeset = aggstate->hash_tapeinfo->tapeset;
int tapenum = spill->partitions[i];
LogicalTape *tape = spill->partitions[i];
HashAggBatch *new_batch;
double cardinality;
@@ -3177,10 +3093,9 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
freeHyperLogLog(&spill->hll_card[i]);
/* rewinding frees the buffer while not in use */
LogicalTapeRewindForRead(tapeset, tapenum,
HASHAGG_READ_BUFFER_SIZE);
LogicalTapeRewindForRead(tape, HASHAGG_READ_BUFFER_SIZE);
new_batch = hashagg_batch_new(tapeset, tapenum, setno,
new_batch = hashagg_batch_new(tape, setno,
spill->ntuples[i], cardinality,
used_bits);
aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
@@ -3227,14 +3142,10 @@ hashagg_reset_spill_state(AggState *aggstate)
aggstate->hash_batches = NIL;
/* close tape set */
if (aggstate->hash_tapeinfo != NULL)
if (aggstate->hash_tapeset != NULL)
{
HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
LogicalTapeSetClose(tapeinfo->tapeset);
pfree(tapeinfo->freetapes);
pfree(tapeinfo);
aggstate->hash_tapeinfo = NULL;
LogicalTapeSetClose(aggstate->hash_tapeset);
aggstate->hash_tapeset = NULL;
}
}