mirror of
https://github.com/postgres/postgres.git
synced 2025-04-20 00:42:27 +03:00
Injection points for hash aggregation.
Requires adding a guard against shift-by-32. Previously, that was impossible because the number of partitions was always greater than 1, but a new injection point can force the number of partitions to 1. Discussion: https://postgr.es/m/ff4e59305e5d689e03cd256a736348d3e7958f8f.camel@j-davis.com
This commit is contained in:
parent
052026c9b9
commit
38172d1856
@ -269,6 +269,7 @@
|
||||
#include "utils/datum.h"
|
||||
#include "utils/dynahash.h"
|
||||
#include "utils/expandeddatum.h"
|
||||
#include "utils/injection_point.h"
|
||||
#include "utils/logtape.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/memutils.h"
|
||||
@ -1489,6 +1490,14 @@ build_hash_tables(AggState *aggstate)
|
||||
perhash->aggnode->numGroups,
|
||||
memory);
|
||||
|
||||
#ifdef USE_INJECTION_POINTS
|
||||
if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-oversize-table"))
|
||||
{
|
||||
nbuckets = memory / sizeof(TupleHashEntryData);
|
||||
INJECTION_POINT_CACHED("hash-aggregate-oversize-table");
|
||||
}
|
||||
#endif
|
||||
|
||||
build_hash_table(aggstate, setno, nbuckets);
|
||||
}
|
||||
|
||||
@ -1860,6 +1869,18 @@ hash_agg_check_limits(AggState *aggstate)
|
||||
true);
|
||||
Size hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory,
|
||||
true);
|
||||
bool do_spill = false;
|
||||
|
||||
#ifdef USE_INJECTION_POINTS
|
||||
if (ngroups >= 1000)
|
||||
{
|
||||
if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-spill-1000"))
|
||||
{
|
||||
do_spill = true;
|
||||
INJECTION_POINT_CACHED("hash-aggregate-spill-1000");
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Don't spill unless there's at least one group in the hash table so we
|
||||
@ -1869,8 +1890,11 @@ hash_agg_check_limits(AggState *aggstate)
|
||||
(meta_mem + hashkey_mem > aggstate->hash_mem_limit ||
|
||||
ngroups > aggstate->hash_ngroups_limit))
|
||||
{
|
||||
hash_agg_enter_spill_mode(aggstate);
|
||||
do_spill = true;
|
||||
}
|
||||
|
||||
if (do_spill)
|
||||
hash_agg_enter_spill_mode(aggstate);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -1881,6 +1905,7 @@ hash_agg_check_limits(AggState *aggstate)
|
||||
static void
|
||||
hash_agg_enter_spill_mode(AggState *aggstate)
|
||||
{
|
||||
INJECTION_POINT("hash-aggregate-enter-spill-mode");
|
||||
aggstate->hash_spill_mode = true;
|
||||
hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
|
||||
|
||||
@ -2652,6 +2677,7 @@ agg_refill_hash_table(AggState *aggstate)
|
||||
*/
|
||||
hashagg_recompile_expressions(aggstate, true, true);
|
||||
|
||||
INJECTION_POINT("hash-aggregate-process-batch");
|
||||
for (;;)
|
||||
{
|
||||
TupleTableSlot *spillslot = aggstate->hash_spill_rslot;
|
||||
@ -2900,6 +2926,15 @@ hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
|
||||
npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
|
||||
used_bits, &partition_bits);
|
||||
|
||||
#ifdef USE_INJECTION_POINTS
|
||||
if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-single-partition"))
|
||||
{
|
||||
npartitions = 1;
|
||||
partition_bits = 0;
|
||||
INJECTION_POINT_CACHED("hash-aggregate-single-partition");
|
||||
}
|
||||
#endif
|
||||
|
||||
spill->partitions = palloc0(sizeof(LogicalTape *) * npartitions);
|
||||
spill->ntuples = palloc0(sizeof(int64) * npartitions);
|
||||
spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
|
||||
@ -2908,7 +2943,10 @@ hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
|
||||
spill->partitions[i] = LogicalTapeCreate(tapeset);
|
||||
|
||||
spill->shift = 32 - used_bits - partition_bits;
|
||||
spill->mask = (npartitions - 1) << spill->shift;
|
||||
if (spill->shift < 32)
|
||||
spill->mask = (npartitions - 1) << spill->shift;
|
||||
else
|
||||
spill->mask = 0;
|
||||
spill->npartitions = npartitions;
|
||||
|
||||
for (int i = 0; i < npartitions; i++)
|
||||
@ -2957,7 +2995,11 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
|
||||
|
||||
tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);
|
||||
|
||||
partition = (hash & spill->mask) >> spill->shift;
|
||||
if (spill->shift < 32)
|
||||
partition = (hash & spill->mask) >> spill->shift;
|
||||
else
|
||||
partition = 0;
|
||||
|
||||
spill->ntuples[partition]++;
|
||||
|
||||
/*
|
||||
|
@ -11,7 +11,7 @@ EXTENSION = injection_points
|
||||
DATA = injection_points--1.0.sql
|
||||
PGFILEDESC = "injection_points - facility for injection points"
|
||||
|
||||
REGRESS = injection_points reindex_conc
|
||||
REGRESS = injection_points hashagg reindex_conc
|
||||
REGRESS_OPTS = --dlpath=$(top_builddir)/src/test/regress
|
||||
|
||||
ISOLATION = basic inplace syscache-update-pruned
|
||||
|
68
src/test/modules/injection_points/expected/hashagg.out
Normal file
68
src/test/modules/injection_points/expected/hashagg.out
Normal file
@ -0,0 +1,68 @@
|
||||
-- Test for hash aggregation
|
||||
CREATE EXTENSION injection_points;
|
||||
SELECT injection_points_set_local();
|
||||
injection_points_set_local
|
||||
----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT injection_points_attach('hash-aggregate-enter-spill-mode', 'notice');
|
||||
injection_points_attach
|
||||
-------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT injection_points_attach('hash-aggregate-process-batch', 'notice');
|
||||
injection_points_attach
|
||||
-------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- force partition fan-out to 1
|
||||
SELECT injection_points_attach('hash-aggregate-single-partition', 'notice');
|
||||
injection_points_attach
|
||||
-------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- force spilling after 1000 groups
|
||||
SELECT injection_points_attach('hash-aggregate-spill-1000', 'notice');
|
||||
injection_points_attach
|
||||
-------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE hashagg_ij(x INTEGER);
|
||||
INSERT INTO hashagg_ij SELECT g FROM generate_series(1,5100) g;
|
||||
SET max_parallel_workers=0;
|
||||
SET max_parallel_workers_per_gather=0;
|
||||
SET enable_sort=FALSE;
|
||||
SET work_mem='4MB';
|
||||
SELECT COUNT(*) FROM (SELECT DISTINCT x FROM hashagg_ij) s;
|
||||
NOTICE: notice triggered for injection point hash-aggregate-spill-1000
|
||||
NOTICE: notice triggered for injection point hash-aggregate-enter-spill-mode
|
||||
NOTICE: notice triggered for injection point hash-aggregate-single-partition
|
||||
NOTICE: notice triggered for injection point hash-aggregate-process-batch
|
||||
NOTICE: notice triggered for injection point hash-aggregate-spill-1000
|
||||
NOTICE: notice triggered for injection point hash-aggregate-enter-spill-mode
|
||||
NOTICE: notice triggered for injection point hash-aggregate-single-partition
|
||||
NOTICE: notice triggered for injection point hash-aggregate-process-batch
|
||||
NOTICE: notice triggered for injection point hash-aggregate-spill-1000
|
||||
NOTICE: notice triggered for injection point hash-aggregate-enter-spill-mode
|
||||
NOTICE: notice triggered for injection point hash-aggregate-single-partition
|
||||
NOTICE: notice triggered for injection point hash-aggregate-process-batch
|
||||
NOTICE: notice triggered for injection point hash-aggregate-spill-1000
|
||||
NOTICE: notice triggered for injection point hash-aggregate-enter-spill-mode
|
||||
NOTICE: notice triggered for injection point hash-aggregate-single-partition
|
||||
NOTICE: notice triggered for injection point hash-aggregate-process-batch
|
||||
NOTICE: notice triggered for injection point hash-aggregate-spill-1000
|
||||
NOTICE: notice triggered for injection point hash-aggregate-enter-spill-mode
|
||||
NOTICE: notice triggered for injection point hash-aggregate-single-partition
|
||||
NOTICE: notice triggered for injection point hash-aggregate-process-batch
|
||||
count
|
||||
-------
|
||||
5100
|
||||
(1 row)
|
||||
|
||||
DROP TABLE hashagg_ij;
|
||||
DROP EXTENSION injection_points;
|
@ -35,6 +35,7 @@ tests += {
|
||||
'regress': {
|
||||
'sql': [
|
||||
'injection_points',
|
||||
'hashagg',
|
||||
'reindex_conc',
|
||||
],
|
||||
'regress_args': ['--dlpath', meson.build_root() / 'src/test/regress'],
|
||||
|
26
src/test/modules/injection_points/sql/hashagg.sql
Normal file
26
src/test/modules/injection_points/sql/hashagg.sql
Normal file
@ -0,0 +1,26 @@
|
||||
-- Test for hash aggregation
|
||||
CREATE EXTENSION injection_points;
|
||||
|
||||
SELECT injection_points_set_local();
|
||||
|
||||
SELECT injection_points_attach('hash-aggregate-enter-spill-mode', 'notice');
|
||||
SELECT injection_points_attach('hash-aggregate-process-batch', 'notice');
|
||||
|
||||
-- force partition fan-out to 1
|
||||
SELECT injection_points_attach('hash-aggregate-single-partition', 'notice');
|
||||
|
||||
-- force spilling after 1000 groups
|
||||
SELECT injection_points_attach('hash-aggregate-spill-1000', 'notice');
|
||||
|
||||
CREATE TABLE hashagg_ij(x INTEGER);
|
||||
INSERT INTO hashagg_ij SELECT g FROM generate_series(1,5100) g;
|
||||
|
||||
SET max_parallel_workers=0;
|
||||
SET max_parallel_workers_per_gather=0;
|
||||
SET enable_sort=FALSE;
|
||||
SET work_mem='4MB';
|
||||
|
||||
SELECT COUNT(*) FROM (SELECT DISTINCT x FROM hashagg_ij) s;
|
||||
|
||||
DROP TABLE hashagg_ij;
|
||||
DROP EXTENSION injection_points;
|
Loading…
x
Reference in New Issue
Block a user