mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-25 13:17:41 +03:00 
			
		
		
		
	Add hash_mem_multiplier GUC.
Add a GUC that acts as a multiplier on work_mem.  It gets applied when
sizing executor node hash tables that were previously size constrained
using work_mem alone.
The new GUC can be used to preferentially give hash-based nodes more
memory than the generic work_mem limit.  It is intended to enable admin
tuning of the executor's memory usage.  Overall system throughput and
system responsiveness can be improved by giving hash-based executor
nodes more memory (especially over sort-based alternatives, which are
often much less sensitive to being memory constrained).
The default value for hash_mem_multiplier is 1.0, which is also the
minimum valid value.  This means that hash-based nodes continue to apply
work_mem in the traditional way by default.
hash_mem_multiplier is generally useful.  However, it is being added now
due to concerns about hash aggregate performance stability for users
that upgrade to Postgres 13 (which added disk-based hash aggregation in
commit 1f39bce0).  While the old hash aggregate behavior risked
out-of-memory errors, it is nevertheless likely that many users actually
benefited.  Hash agg's previous indifference to work_mem during query
execution was not just faster; it also accidentally made aggregation
resilient to grouping estimate problems (at least in cases where this
didn't create destabilizing memory pressure).
hash_mem_multiplier can provide a certain kind of continuity with the
behavior of Postgres 12 hash aggregates in cases where the planner
incorrectly estimates that all groups (plus related allocations) will
fit in work_mem/hash_mem.  This seems necessary because hash-based
aggregation is usually much slower when only a small fraction of all
groups can fit.  Even when it isn't possible to totally avoid hash
aggregates that spill, giving hash aggregation more memory will reliably
improve performance (the same cannot be said for external sort
operations, which appear to be almost unaffected by memory availability
provided it's at least possible to get a single merge pass).
The PostgreSQL 13 release notes should advise users that increasing
hash_mem_multiplier can help with performance regressions associated
with hash aggregation.  That can be taken care of by a later commit.
Author: Peter Geoghegan
Reviewed-By: Álvaro Herrera, Jeff Davis
Discussion: https://postgr.es/m/20200625203629.7m6yvut7eqblgmfo@alap3.anarazel.de
Discussion: https://postgr.es/m/CAH2-WzmD%2Bi1pG6rc1%2BCjc4V6EaFJ_qSuKCCHVnH%3DoruqD-zqow%40mail.gmail.com
Backpatch: 13-, where disk-based hash aggregation was introduced.
			
			
This commit is contained in:
		| @@ -165,13 +165,14 @@ BuildTupleHashTableExt(PlanState *parent, | ||||
| { | ||||
| 	TupleHashTable hashtable; | ||||
| 	Size		entrysize = sizeof(TupleHashEntryData) + additionalsize; | ||||
| 	int			hash_mem = get_hash_mem(); | ||||
| 	MemoryContext oldcontext; | ||||
| 	bool		allow_jit; | ||||
|  | ||||
| 	Assert(nbuckets > 0); | ||||
|  | ||||
| 	/* Limit initial table size request to not more than work_mem */ | ||||
| 	nbuckets = Min(nbuckets, (long) ((work_mem * 1024L) / entrysize)); | ||||
| 	/* Limit initial table size request to not more than hash_mem */ | ||||
| 	nbuckets = Min(nbuckets, (long) ((hash_mem * 1024L) / entrysize)); | ||||
|  | ||||
| 	oldcontext = MemoryContextSwitchTo(metacxt); | ||||
|  | ||||
|   | ||||
| @@ -203,7 +203,7 @@ | ||||
|  *	  entries (and initialize new transition states), we instead spill them to | ||||
|  *	  disk to be processed later. The tuples are spilled in a partitioned | ||||
|  *	  manner, so that subsequent batches are smaller and less likely to exceed | ||||
|  *	  work_mem (if a batch does exceed work_mem, it must be spilled | ||||
|  *	  hash_mem (if a batch does exceed hash_mem, it must be spilled | ||||
|  *	  recursively). | ||||
|  * | ||||
|  *	  Spilled data is written to logical tapes. These provide better control | ||||
| @@ -212,7 +212,7 @@ | ||||
|  * | ||||
|  *	  Note that it's possible for transition states to start small but then | ||||
|  *	  grow very large; for instance in the case of ARRAY_AGG. In such cases, | ||||
|  *	  it's still possible to significantly exceed work_mem. We try to avoid | ||||
|  *	  it's still possible to significantly exceed hash_mem. We try to avoid | ||||
|  *	  this situation by estimating what will fit in the available memory, and | ||||
|  *	  imposing a limit on the number of groups separately from the amount of | ||||
|  *	  memory consumed. | ||||
| @@ -1516,7 +1516,7 @@ build_hash_table(AggState *aggstate, int setno, long nbuckets) | ||||
|  | ||||
| 	/* | ||||
| 	 * Used to make sure initial hash table allocation does not exceed | ||||
| 	 * work_mem. Note that the estimate does not include space for | ||||
| 	 * hash_mem. Note that the estimate does not include space for | ||||
| 	 * pass-by-reference transition data values, nor for the representative | ||||
| 	 * tuple of each group. | ||||
| 	 */ | ||||
| @@ -1782,7 +1782,7 @@ hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck) | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Set limits that trigger spilling to avoid exceeding work_mem. Consider the | ||||
|  * Set limits that trigger spilling to avoid exceeding hash_mem. Consider the | ||||
|  * number of partitions we expect to create (if we do spill). | ||||
|  * | ||||
|  * There are two limits: a memory limit, and also an ngroups limit. The | ||||
| @@ -1796,13 +1796,14 @@ hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits, | ||||
| { | ||||
| 	int			npartitions; | ||||
| 	Size		partition_mem; | ||||
| 	int			hash_mem = get_hash_mem(); | ||||
|  | ||||
| 	/* if not expected to spill, use all of work_mem */ | ||||
| 	if (input_groups * hashentrysize < work_mem * 1024L) | ||||
| 	/* if not expected to spill, use all of hash_mem */ | ||||
| 	if (input_groups * hashentrysize < hash_mem * 1024L) | ||||
| 	{ | ||||
| 		if (num_partitions != NULL) | ||||
| 			*num_partitions = 0; | ||||
| 		*mem_limit = work_mem * 1024L; | ||||
| 		*mem_limit = hash_mem * 1024L; | ||||
| 		*ngroups_limit = *mem_limit / hashentrysize; | ||||
| 		return; | ||||
| 	} | ||||
| @@ -1824,14 +1825,14 @@ hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits, | ||||
| 		HASHAGG_WRITE_BUFFER_SIZE * npartitions; | ||||
|  | ||||
| 	/* | ||||
| 	 * Don't set the limit below 3/4 of work_mem. In that case, we are at the | ||||
| 	 * Don't set the limit below 3/4 of hash_mem. In that case, we are at the | ||||
| 	 * minimum number of partitions, so we aren't going to dramatically exceed | ||||
| 	 * work mem anyway. | ||||
| 	 */ | ||||
| 	if (work_mem * 1024L > 4 * partition_mem) | ||||
| 		*mem_limit = work_mem * 1024L - partition_mem; | ||||
| 	if (hash_mem * 1024L > 4 * partition_mem) | ||||
| 		*mem_limit = hash_mem * 1024L - partition_mem; | ||||
| 	else | ||||
| 		*mem_limit = work_mem * 1024L * 0.75; | ||||
| 		*mem_limit = hash_mem * 1024L * 0.75; | ||||
|  | ||||
| 	if (*mem_limit > hashentrysize) | ||||
| 		*ngroups_limit = *mem_limit / hashentrysize; | ||||
| @@ -1989,19 +1990,20 @@ hash_choose_num_partitions(double input_groups, double hashentrysize, | ||||
| 	int			partition_limit; | ||||
| 	int			npartitions; | ||||
| 	int			partition_bits; | ||||
| 	int			hash_mem = get_hash_mem(); | ||||
|  | ||||
| 	/* | ||||
| 	 * Avoid creating so many partitions that the memory requirements of the | ||||
| 	 * open partition files are greater than 1/4 of work_mem. | ||||
| 	 * open partition files are greater than 1/4 of hash_mem. | ||||
| 	 */ | ||||
| 	partition_limit = | ||||
| 		(work_mem * 1024L * 0.25 - HASHAGG_READ_BUFFER_SIZE) / | ||||
| 		(hash_mem * 1024L * 0.25 - HASHAGG_READ_BUFFER_SIZE) / | ||||
| 		HASHAGG_WRITE_BUFFER_SIZE; | ||||
|  | ||||
| 	mem_wanted = HASHAGG_PARTITION_FACTOR * input_groups * hashentrysize; | ||||
|  | ||||
| 	/* make enough partitions so that each one is likely to fit in memory */ | ||||
| 	npartitions = 1 + (mem_wanted / (work_mem * 1024L)); | ||||
| 	npartitions = 1 + (mem_wanted / (hash_mem * 1024L)); | ||||
|  | ||||
| 	if (npartitions > partition_limit) | ||||
| 		npartitions = partition_limit; | ||||
|   | ||||
| @@ -39,6 +39,7 @@ | ||||
| #include "port/atomics.h" | ||||
| #include "port/pg_bitutils.h" | ||||
| #include "utils/dynahash.h" | ||||
| #include "utils/guc.h" | ||||
| #include "utils/lsyscache.h" | ||||
| #include "utils/memutils.h" | ||||
| #include "utils/syscache.h" | ||||
| @@ -506,7 +507,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, | ||||
| 	hashtable->spaceAllowed = space_allowed; | ||||
| 	hashtable->spaceUsedSkew = 0; | ||||
| 	hashtable->spaceAllowedSkew = | ||||
| 		hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100; | ||||
| 		hashtable->spaceAllowed * SKEW_HASH_MEM_PERCENT / 100; | ||||
| 	hashtable->chunks = NULL; | ||||
| 	hashtable->current_chunk = NULL; | ||||
| 	hashtable->parallel_state = state->parallel_state; | ||||
| @@ -665,7 +666,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, | ||||
|  | ||||
| void | ||||
| ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, | ||||
| 						bool try_combined_work_mem, | ||||
| 						bool try_combined_hash_mem, | ||||
| 						int parallel_workers, | ||||
| 						size_t *space_allowed, | ||||
| 						int *numbuckets, | ||||
| @@ -682,6 +683,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, | ||||
| 	int			nbatch = 1; | ||||
| 	int			nbuckets; | ||||
| 	double		dbuckets; | ||||
| 	int			hash_mem = get_hash_mem(); | ||||
|  | ||||
| 	/* Force a plausible relation size if no info */ | ||||
| 	if (ntuples <= 0.0) | ||||
| @@ -698,16 +700,16 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, | ||||
| 	inner_rel_bytes = ntuples * tupsize; | ||||
|  | ||||
| 	/* | ||||
| 	 * Target in-memory hashtable size is work_mem kilobytes. | ||||
| 	 * Target in-memory hashtable size is hash_mem kilobytes. | ||||
| 	 */ | ||||
| 	hash_table_bytes = work_mem * 1024L; | ||||
| 	hash_table_bytes = hash_mem * 1024L; | ||||
|  | ||||
| 	/* | ||||
| 	 * Parallel Hash tries to use the combined work_mem of all workers to | ||||
| 	 * avoid the need to batch.  If that won't work, it falls back to work_mem | ||||
| 	 * Parallel Hash tries to use the combined hash_mem of all workers to | ||||
| 	 * avoid the need to batch.  If that won't work, it falls back to hash_mem | ||||
| 	 * per worker and tries to process batches in parallel. | ||||
| 	 */ | ||||
| 	if (try_combined_work_mem) | ||||
| 	if (try_combined_hash_mem) | ||||
| 		hash_table_bytes += hash_table_bytes * parallel_workers; | ||||
|  | ||||
| 	*space_allowed = hash_table_bytes; | ||||
| @@ -728,7 +730,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, | ||||
| 	 */ | ||||
| 	if (useskew) | ||||
| 	{ | ||||
| 		skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100; | ||||
| 		skew_table_bytes = hash_table_bytes * SKEW_HASH_MEM_PERCENT / 100; | ||||
|  | ||||
| 		/*---------- | ||||
| 		 * Divisor is: | ||||
| @@ -751,7 +753,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, | ||||
| 	/* | ||||
| 	 * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when | ||||
| 	 * memory is filled, assuming a single batch; but limit the value so that | ||||
| 	 * the pointer arrays we'll try to allocate do not exceed work_mem nor | ||||
| 	 * the pointer arrays we'll try to allocate do not exceed hash_mem nor | ||||
| 	 * MaxAllocSize. | ||||
| 	 * | ||||
| 	 * Note that both nbuckets and nbatch must be powers of 2 to make | ||||
| @@ -790,10 +792,10 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, | ||||
| 		long		bucket_size; | ||||
|  | ||||
| 		/* | ||||
| 		 * If Parallel Hash with combined work_mem would still need multiple | ||||
| 		 * batches, we'll have to fall back to regular work_mem budget. | ||||
| 		 * If Parallel Hash with combined hash_mem would still need multiple | ||||
| 		 * batches, we'll have to fall back to regular hash_mem budget. | ||||
| 		 */ | ||||
| 		if (try_combined_work_mem) | ||||
| 		if (try_combined_hash_mem) | ||||
| 		{ | ||||
| 			ExecChooseHashTableSize(ntuples, tupwidth, useskew, | ||||
| 									false, parallel_workers, | ||||
| @@ -805,7 +807,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, | ||||
| 		} | ||||
|  | ||||
| 		/* | ||||
| 		 * Estimate the number of buckets we'll want to have when work_mem is | ||||
| 		 * Estimate the number of buckets we'll want to have when hash_mem is | ||||
| 		 * entirely full.  Each bucket will contain a bucket pointer plus | ||||
| 		 * NTUP_PER_BUCKET tuples, whose projected size already includes | ||||
| 		 * overhead for the hash code, pointer to the next tuple, etc. | ||||
| @@ -820,8 +822,8 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, | ||||
| 		/* | ||||
| 		 * Buckets are simple pointers to hashjoin tuples, while tupsize | ||||
| 		 * includes the pointer, hash code, and MinimalTupleData.  So buckets | ||||
| 		 * should never really exceed 25% of work_mem (even for | ||||
| 		 * NTUP_PER_BUCKET=1); except maybe for work_mem values that are not | ||||
| 		 * should never really exceed 25% of hash_mem (even for | ||||
| 		 * NTUP_PER_BUCKET=1); except maybe for hash_mem values that are not | ||||
| 		 * 2^N bytes, where we might get more because of doubling. So let's | ||||
| 		 * look for 50% here. | ||||
| 		 */ | ||||
| @@ -1095,15 +1097,17 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) | ||||
| 				/* Figure out how many batches to use. */ | ||||
| 				if (hashtable->nbatch == 1) | ||||
| 				{ | ||||
| 					int			hash_mem = get_hash_mem(); | ||||
|  | ||||
| 					/* | ||||
| 					 * We are going from single-batch to multi-batch.  We need | ||||
| 					 * to switch from one large combined memory budget to the | ||||
| 					 * regular work_mem budget. | ||||
| 					 * regular hash_mem budget. | ||||
| 					 */ | ||||
| 					pstate->space_allowed = work_mem * 1024L; | ||||
| 					pstate->space_allowed = hash_mem * 1024L; | ||||
|  | ||||
| 					/* | ||||
| 					 * The combined work_mem of all participants wasn't | ||||
| 					 * The combined hash_mem of all participants wasn't | ||||
| 					 * enough. Therefore one batch per participant would be | ||||
| 					 * approximately equivalent and would probably also be | ||||
| 					 * insufficient.  So try two batches per participant, | ||||
| @@ -2855,7 +2859,7 @@ ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, | ||||
|  | ||||
| 		/* | ||||
| 		 * Check if our space limit would be exceeded.  To avoid choking on | ||||
| 		 * very large tuples or very low work_mem setting, we'll always allow | ||||
| 		 * very large tuples or very low hash_mem setting, we'll always allow | ||||
| 		 * each backend to allocate at least one chunk. | ||||
| 		 */ | ||||
| 		if (hashtable->batches[0].at_least_one_chunk && | ||||
| @@ -3366,3 +3370,41 @@ ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size) | ||||
|  | ||||
| 	return true; | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Get a hash_mem value by multiplying the work_mem GUC's value by the | ||||
|  * hash_mem_multiplier GUC's value. | ||||
|  * | ||||
|  * Returns a work_mem style KB value that hash-based nodes (including but not | ||||
|  * limited to hash join) use in place of work_mem.  This is subject to the | ||||
|  * same restrictions as work_mem itself.  (There is no such thing as the | ||||
|  * hash_mem GUC, but it's convenient for our callers to pretend that there | ||||
|  * is.) | ||||
|  * | ||||
|  * Exported for use by the planner, as well as other hash-based executor | ||||
|  * nodes.  This is a rather random place for this, but there is no better | ||||
|  * place. | ||||
|  */ | ||||
| int | ||||
| get_hash_mem(void) | ||||
| { | ||||
| 	double		hash_mem; | ||||
|  | ||||
| 	Assert(hash_mem_multiplier >= 1.0); | ||||
|  | ||||
| 	hash_mem = (double) work_mem * hash_mem_multiplier; | ||||
|  | ||||
| 	/* | ||||
| 	 * guc.c enforces a MAX_KILOBYTES limitation on work_mem in order to | ||||
| 	 * support the assumption that raw derived byte values can be stored in | ||||
| 	 * 'long' variables.  The returned hash_mem value must also meet this | ||||
| 	 * assumption. | ||||
| 	 * | ||||
| 	 * We clamp the final value rather than throw an error because it should | ||||
| 	 * be possible to set work_mem and hash_mem_multiplier independently. | ||||
| 	 */ | ||||
| 	if (hash_mem < MAX_KILOBYTES) | ||||
| 		return (int) hash_mem; | ||||
|  | ||||
| 	return MAX_KILOBYTES; | ||||
| } | ||||
|   | ||||
| @@ -89,9 +89,9 @@ | ||||
|  * PHJ_BUILD_HASHING_INNER so we can skip loading. | ||||
|  * | ||||
|  * Initially we try to plan for a single-batch hash join using the combined | ||||
|  * work_mem of all participants to create a large shared hash table.  If that | ||||
|  * hash_mem of all participants to create a large shared hash table.  If that | ||||
|  * turns out either at planning or execution time to be impossible then we | ||||
|  * fall back to regular work_mem sized hash tables. | ||||
|  * fall back to regular hash_mem sized hash tables. | ||||
|  * | ||||
|  * To avoid deadlocks, we never wait for any barrier unless it is known that | ||||
|  * all other backends attached to it are actively executing the node or have | ||||
|   | ||||
		Reference in New Issue
	
	Block a user