mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-25 13:17:41 +03:00 
			
		
		
		
	Optimize multi-batch hash joins when the outer relation has a nonuniform
distribution, by creating a special fast path for the (first few) most common values of the outer relation. Tuples having hashvalues matching the MCVs are effectively forced to be in the first batch, so that we never write them out to the batch temp files. Bryce Cutt and Ramon Lawrence, with some editorialization by me.
This commit is contained in:
		| @@ -8,7 +8,7 @@ | ||||
|  * | ||||
|  * | ||||
|  * IDENTIFICATION | ||||
|  *	  $PostgreSQL: pgsql/src/backend/executor/nodeHash.c,v 1.117 2009/01/01 17:23:41 momjian Exp $ | ||||
|  *	  $PostgreSQL: pgsql/src/backend/executor/nodeHash.c,v 1.118 2009/03/21 00:04:38 tgl Exp $ | ||||
|  * | ||||
|  *------------------------------------------------------------------------- | ||||
|  */ | ||||
| @@ -24,6 +24,7 @@ | ||||
| #include <math.h> | ||||
| #include <limits.h> | ||||
|  | ||||
| #include "catalog/pg_statistic.h" | ||||
| #include "commands/tablespace.h" | ||||
| #include "executor/execdebug.h" | ||||
| #include "executor/hashjoin.h" | ||||
| @@ -35,9 +36,17 @@ | ||||
| #include "utils/dynahash.h" | ||||
| #include "utils/memutils.h" | ||||
| #include "utils/lsyscache.h" | ||||
| #include "utils/syscache.h" | ||||
|  | ||||
|  | ||||
| static void ExecHashIncreaseNumBatches(HashJoinTable hashtable); | ||||
| static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, | ||||
| 								  int mcvsToUse); | ||||
| static void ExecHashSkewTableInsert(HashJoinTable hashtable, | ||||
| 									TupleTableSlot *slot, | ||||
| 									uint32 hashvalue, | ||||
| 									int bucketNumber); | ||||
| static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable); | ||||
|  | ||||
|  | ||||
| /* ---------------------------------------------------------------- | ||||
| @@ -99,7 +108,20 @@ MultiExecHash(HashState *node) | ||||
| 		if (ExecHashGetHashValue(hashtable, econtext, hashkeys, false, false, | ||||
| 								 &hashvalue)) | ||||
| 		{ | ||||
| 			ExecHashTableInsert(hashtable, slot, hashvalue); | ||||
| 			int		bucketNumber; | ||||
|  | ||||
| 			bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue); | ||||
| 			if (bucketNumber != INVALID_SKEW_BUCKET_NO) | ||||
| 			{ | ||||
| 				/* It's a skew tuple, so put it into that hash table */ | ||||
| 				ExecHashSkewTableInsert(hashtable, slot, hashvalue, | ||||
| 										bucketNumber); | ||||
| 			} | ||||
| 			else | ||||
| 			{ | ||||
| 				/* Not subject to skew optimization, so insert normally */ | ||||
| 				ExecHashTableInsert(hashtable, slot, hashvalue); | ||||
| 			} | ||||
| 			hashtable->totalTuples += 1; | ||||
| 		} | ||||
| 	} | ||||
| @@ -225,6 +247,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators) | ||||
| 	Plan	   *outerNode; | ||||
| 	int			nbuckets; | ||||
| 	int			nbatch; | ||||
| 	int			num_skew_mcvs; | ||||
| 	int			log2_nbuckets; | ||||
| 	int			nkeys; | ||||
| 	int			i; | ||||
| @@ -239,7 +262,8 @@ ExecHashTableCreate(Hash *node, List *hashOperators) | ||||
| 	outerNode = outerPlan(node); | ||||
|  | ||||
| 	ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width, | ||||
| 							&nbuckets, &nbatch); | ||||
| 							OidIsValid(node->skewTable), | ||||
| 							&nbuckets, &nbatch, &num_skew_mcvs); | ||||
|  | ||||
| #ifdef HJDEBUG | ||||
| 	printf("nbatch = %d, nbuckets = %d\n", nbatch, nbuckets); | ||||
| @@ -259,6 +283,11 @@ ExecHashTableCreate(Hash *node, List *hashOperators) | ||||
| 	hashtable->nbuckets = nbuckets; | ||||
| 	hashtable->log2_nbuckets = log2_nbuckets; | ||||
| 	hashtable->buckets = NULL; | ||||
| 	hashtable->skewEnabled = false; | ||||
| 	hashtable->skewBucket = NULL; | ||||
| 	hashtable->skewBucketLen = 0; | ||||
| 	hashtable->nSkewBuckets = 0; | ||||
| 	hashtable->skewBucketNums = NULL; | ||||
| 	hashtable->nbatch = nbatch; | ||||
| 	hashtable->curbatch = 0; | ||||
| 	hashtable->nbatch_original = nbatch; | ||||
| @@ -269,6 +298,9 @@ ExecHashTableCreate(Hash *node, List *hashOperators) | ||||
| 	hashtable->outerBatchFile = NULL; | ||||
| 	hashtable->spaceUsed = 0; | ||||
| 	hashtable->spaceAllowed = work_mem * 1024L; | ||||
| 	hashtable->spaceUsedSkew = 0; | ||||
| 	hashtable->spaceAllowedSkew = | ||||
| 		hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100; | ||||
|  | ||||
| 	/* | ||||
| 	 * Get info about the hash functions to be used for each hash key. Also | ||||
| @@ -339,6 +371,13 @@ ExecHashTableCreate(Hash *node, List *hashOperators) | ||||
| 	hashtable->buckets = (HashJoinTuple *) | ||||
| 		palloc0(nbuckets * sizeof(HashJoinTuple)); | ||||
|  | ||||
| 	/* | ||||
| 	 * Set up for skew optimization, if possible and there's a need for more | ||||
| 	 * than one batch.  (In a one-batch join, there's no point in it.) | ||||
| 	 */ | ||||
| 	if (nbatch > 1) | ||||
| 		ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs); | ||||
|  | ||||
| 	MemoryContextSwitchTo(oldcxt); | ||||
|  | ||||
| 	return hashtable; | ||||
| @@ -356,13 +395,15 @@ ExecHashTableCreate(Hash *node, List *hashOperators) | ||||
| #define NTUP_PER_BUCKET			10 | ||||
|  | ||||
| void | ||||
| ExecChooseHashTableSize(double ntuples, int tupwidth, | ||||
| ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, | ||||
| 						int *numbuckets, | ||||
| 						int *numbatches) | ||||
| 						int *numbatches, | ||||
| 						int *num_skew_mcvs) | ||||
| { | ||||
| 	int			tupsize; | ||||
| 	double		inner_rel_bytes; | ||||
| 	long		hash_table_bytes; | ||||
| 	long		skew_table_bytes; | ||||
| 	int			nbatch; | ||||
| 	int			nbuckets; | ||||
| 	int			i; | ||||
| @@ -386,6 +427,41 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, | ||||
| 	 */ | ||||
| 	hash_table_bytes = work_mem * 1024L; | ||||
|  | ||||
| 	/* | ||||
| 	 * If skew optimization is possible, estimate the number of skew buckets | ||||
| 	 * that will fit in the memory allowed, and decrement the assumed space | ||||
| 	 * available for the main hash table accordingly. | ||||
| 	 * | ||||
| 	 * We make the optimistic assumption that each skew bucket will contain | ||||
| 	 * one inner-relation tuple.  If that turns out to be low, we will recover | ||||
| 	 * at runtime by reducing the number of skew buckets. | ||||
| 	 * | ||||
| 	 * hashtable->skewBucket will have up to 8 times as many HashSkewBucket | ||||
| 	 * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash | ||||
| 	 * will round up to the next power of 2 and then multiply by 4 to reduce | ||||
| 	 * collisions. | ||||
| 	 */ | ||||
| 	if (useskew) | ||||
| 	{ | ||||
| 		skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100; | ||||
|  | ||||
| 		*num_skew_mcvs = skew_table_bytes / ( | ||||
| 			/* size of a hash tuple */ | ||||
| 			tupsize + | ||||
| 			/* worst-case size of skewBucket[] per MCV */ | ||||
| 			(8 * sizeof(HashSkewBucket *)) + | ||||
| 			/* size of skewBucketNums[] entry */ | ||||
| 			sizeof(int) + | ||||
| 			/* size of skew bucket struct itself */ | ||||
| 			SKEW_BUCKET_OVERHEAD | ||||
| 			); | ||||
|  | ||||
| 		if (*num_skew_mcvs > 0) | ||||
| 			hash_table_bytes -= skew_table_bytes; | ||||
| 	} | ||||
| 	else | ||||
| 		*num_skew_mcvs = 0; | ||||
|  | ||||
| 	/* | ||||
| 	 * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when | ||||
| 	 * memory is filled.  Set nbatch to the smallest power of 2 that appears | ||||
| @@ -813,13 +889,18 @@ ExecScanHashBucket(HashJoinState *hjstate, | ||||
| 	uint32		hashvalue = hjstate->hj_CurHashValue; | ||||
|  | ||||
| 	/* | ||||
| 	 * hj_CurTuple is NULL to start scanning a new bucket, or the address of | ||||
| 	 * the last tuple returned from the current bucket. | ||||
| 	 * hj_CurTuple is the address of the tuple last returned from the current | ||||
| 	 * bucket, or NULL if it's time to start scanning a new bucket. | ||||
| 	 * | ||||
| 	 * If the tuple hashed to a skew bucket then scan the skew bucket | ||||
| 	 * otherwise scan the standard hashtable bucket. | ||||
| 	 */ | ||||
| 	if (hashTuple == NULL) | ||||
| 		hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo]; | ||||
| 	else | ||||
| 	if (hashTuple != NULL) | ||||
| 		hashTuple = hashTuple->next; | ||||
| 	else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO) | ||||
| 		hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples; | ||||
| 	else | ||||
| 		hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo]; | ||||
|  | ||||
| 	while (hashTuple != NULL) | ||||
| 	{ | ||||
| @@ -889,3 +970,347 @@ ExecReScanHash(HashState *node, ExprContext *exprCtxt) | ||||
| 	if (((PlanState *) node)->lefttree->chgParam == NULL) | ||||
| 		ExecReScan(((PlanState *) node)->lefttree, exprCtxt); | ||||
| } | ||||
|  | ||||
|  | ||||
| /* | ||||
|  * ExecHashBuildSkewHash | ||||
|  * | ||||
|  *		Set up for skew optimization if we can identify the most common values | ||||
|  *		(MCVs) of the outer relation's join key.  We make a skew hash bucket | ||||
|  *		for the hash value of each MCV, up to the number of slots allowed | ||||
|  *		based on available memory. | ||||
|  */ | ||||
| static void | ||||
| ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse) | ||||
| { | ||||
| 	HeapTupleData	*statsTuple; | ||||
| 	Datum			*values; | ||||
| 	int				nvalues; | ||||
| 	float4			*numbers; | ||||
| 	int				nnumbers; | ||||
|  | ||||
| 	/* Do nothing if planner didn't identify the outer relation's join key */ | ||||
| 	if (!OidIsValid(node->skewTable)) | ||||
| 		return; | ||||
| 	/* Also, do nothing if we don't have room for at least one skew bucket */ | ||||
| 	if (mcvsToUse <= 0) | ||||
| 		return; | ||||
|  | ||||
| 	/* | ||||
| 	 * Try to find the MCV statistics for the outer relation's join key. | ||||
| 	 */ | ||||
| 	statsTuple = SearchSysCache(STATRELATT, | ||||
| 								ObjectIdGetDatum(node->skewTable), | ||||
| 								Int16GetDatum(node->skewColumn), | ||||
| 								0, 0); | ||||
| 	if (!HeapTupleIsValid(statsTuple)) | ||||
| 		return; | ||||
|  | ||||
| 	if (get_attstatsslot(statsTuple, node->skewColType, node->skewColTypmod, | ||||
| 						 STATISTIC_KIND_MCV, InvalidOid, | ||||
| 						 &values, &nvalues, | ||||
| 						 &numbers, &nnumbers)) | ||||
| 	{ | ||||
| 		double		frac; | ||||
| 		int			nbuckets; | ||||
| 		FmgrInfo   *hashfunctions; | ||||
| 		int			i; | ||||
|  | ||||
| 		if (mcvsToUse > nvalues) | ||||
| 			mcvsToUse = nvalues; | ||||
|  | ||||
| 		/* | ||||
| 		 * Calculate the expected fraction of outer relation that will | ||||
| 		 * participate in the skew optimization.  If this isn't at least | ||||
| 		 * SKEW_MIN_OUTER_FRACTION, don't use skew optimization. | ||||
| 		 */ | ||||
| 		frac = 0; | ||||
| 		for (i = 0; i < mcvsToUse; i++) | ||||
| 			frac += numbers[i]; | ||||
| 		if (frac < SKEW_MIN_OUTER_FRACTION) | ||||
| 		{ | ||||
| 			free_attstatsslot(node->skewColType, | ||||
| 							  values, nvalues, numbers, nnumbers); | ||||
| 			ReleaseSysCache(statsTuple); | ||||
| 			return; | ||||
| 		} | ||||
|  | ||||
| 		/* | ||||
| 		 * Okay, set up the skew hashtable. | ||||
| 		 * | ||||
| 		 * skewBucket[] is an open addressing hashtable with a power of 2 size | ||||
| 		 * that is greater than the number of MCV values.  (This ensures there | ||||
| 		 * will be at least one null entry, so searches will always terminate.) | ||||
| 		 * | ||||
| 		 * Note: this code could fail if mcvsToUse exceeds INT_MAX/8, but | ||||
| 		 * that is not currently possible since we limit pg_statistic entries | ||||
| 		 * to much less than that. | ||||
| 		 */ | ||||
| 		nbuckets = 2; | ||||
| 		while (nbuckets <= mcvsToUse) | ||||
| 			nbuckets <<= 1; | ||||
| 		/* use two more bits just to help avoid collisions */ | ||||
| 		nbuckets <<= 2; | ||||
|  | ||||
| 		hashtable->skewEnabled = true; | ||||
| 		hashtable->skewBucketLen = nbuckets; | ||||
|  | ||||
| 		/* | ||||
| 		 * We allocate the bucket memory in the hashtable's batch context. | ||||
| 		 * It is only needed during the first batch, and this ensures it | ||||
| 		 * will be automatically removed once the first batch is done. | ||||
| 		 */ | ||||
| 		hashtable->skewBucket = (HashSkewBucket **) | ||||
| 			MemoryContextAllocZero(hashtable->batchCxt, | ||||
| 								   nbuckets * sizeof(HashSkewBucket *)); | ||||
| 		hashtable->skewBucketNums = (int *) | ||||
| 			MemoryContextAllocZero(hashtable->batchCxt, | ||||
| 								   mcvsToUse * sizeof(int)); | ||||
|  | ||||
| 		hashtable->spaceUsed += nbuckets * sizeof(HashSkewBucket *) | ||||
| 			+ mcvsToUse * sizeof(int); | ||||
| 		hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *) | ||||
| 			+ mcvsToUse * sizeof(int); | ||||
|  | ||||
| 		/* | ||||
| 		 * Create a skew bucket for each MCV hash value. | ||||
| 		 * | ||||
| 		 * Note: it is very important that we create the buckets in order | ||||
| 		 * of decreasing MCV frequency.  If we have to remove some buckets, | ||||
| 		 * they must be removed in reverse order of creation (see notes in | ||||
| 		 * ExecHashRemoveNextSkewBucket) and we want the least common MCVs | ||||
| 		 * to be removed first. | ||||
| 		 */ | ||||
| 		hashfunctions = hashtable->outer_hashfunctions; | ||||
|  | ||||
| 		for (i = 0; i < mcvsToUse; i++) | ||||
| 		{ | ||||
| 			uint32 hashvalue; | ||||
| 			int bucket; | ||||
|  | ||||
| 			hashvalue = DatumGetUInt32(FunctionCall1(&hashfunctions[0], | ||||
| 													 values[i])); | ||||
|  | ||||
| 			/* | ||||
| 			 * While we have not hit a hole in the hashtable and have not hit | ||||
| 			 * the desired bucket, we have collided with some previous hash | ||||
| 			 * value, so try the next bucket location.  NB: this code must | ||||
| 			 * match ExecHashGetSkewBucket. | ||||
| 			 */ | ||||
| 			bucket = hashvalue & (nbuckets - 1); | ||||
| 			while (hashtable->skewBucket[bucket] != NULL && | ||||
| 				   hashtable->skewBucket[bucket]->hashvalue != hashvalue) | ||||
| 				bucket = (bucket + 1) & (nbuckets - 1); | ||||
|  | ||||
| 			/* | ||||
| 			 * If we found an existing bucket with the same hashvalue, | ||||
| 			 * leave it alone.  It's okay for two MCVs to share a hashvalue. | ||||
| 			 */ | ||||
| 			if (hashtable->skewBucket[bucket] != NULL) | ||||
| 				continue; | ||||
|  | ||||
| 			/* Okay, create a new skew bucket for this hashvalue. */ | ||||
| 			hashtable->skewBucket[bucket] = (HashSkewBucket *) | ||||
| 				MemoryContextAlloc(hashtable->batchCxt, | ||||
| 								   sizeof(HashSkewBucket)); | ||||
| 			hashtable->skewBucket[bucket]->hashvalue = hashvalue; | ||||
| 			hashtable->skewBucket[bucket]->tuples = NULL; | ||||
| 			hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket; | ||||
| 			hashtable->nSkewBuckets++; | ||||
| 			hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD; | ||||
| 			hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD; | ||||
| 		} | ||||
|  | ||||
| 		free_attstatsslot(node->skewColType, | ||||
| 						  values, nvalues, numbers, nnumbers); | ||||
| 	} | ||||
|  | ||||
| 	ReleaseSysCache(statsTuple); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * ExecHashGetSkewBucket | ||||
|  * | ||||
|  *		Returns the index of the skew bucket for this hashvalue, | ||||
|  *		or INVALID_SKEW_BUCKET_NO if the hashvalue is not | ||||
|  *		associated with any active skew bucket. | ||||
|  */ | ||||
| int | ||||
| ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue) | ||||
| { | ||||
| 	int			bucket; | ||||
|  | ||||
| 	/* | ||||
| 	 * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization | ||||
| 	 * (in particular, this happens after the initial batch is done). | ||||
| 	 */ | ||||
| 	if (!hashtable->skewEnabled) | ||||
| 		return INVALID_SKEW_BUCKET_NO; | ||||
|  | ||||
| 	/* | ||||
| 	 * Since skewBucketLen is a power of 2, we can do a modulo by ANDing. | ||||
| 	 */ | ||||
| 	bucket = hashvalue & (hashtable->skewBucketLen - 1); | ||||
|  | ||||
| 	/* | ||||
| 	 * While we have not hit a hole in the hashtable and have not hit the | ||||
| 	 * desired bucket, we have collided with some other hash value, so try | ||||
| 	 * the next bucket location. | ||||
| 	 */ | ||||
| 	while (hashtable->skewBucket[bucket] != NULL && | ||||
| 		   hashtable->skewBucket[bucket]->hashvalue != hashvalue) | ||||
| 		bucket = (bucket + 1) & (hashtable->skewBucketLen - 1); | ||||
|  | ||||
| 	/* | ||||
| 	 * Found the desired bucket? | ||||
| 	 */ | ||||
| 	if (hashtable->skewBucket[bucket] != NULL) | ||||
| 		return bucket; | ||||
|  | ||||
| 	/* | ||||
| 	 * There must not be any hashtable entry for this hash value. | ||||
| 	 */ | ||||
| 	return INVALID_SKEW_BUCKET_NO; | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * ExecHashSkewTableInsert | ||||
|  * | ||||
|  *		Insert a tuple into the skew hashtable. | ||||
|  * | ||||
|  * This should generally match up with the current-batch case in | ||||
|  * ExecHashTableInsert. | ||||
|  */ | ||||
| static void | ||||
| ExecHashSkewTableInsert(HashJoinTable hashtable, | ||||
| 						TupleTableSlot *slot, | ||||
| 						uint32 hashvalue, | ||||
| 						int bucketNumber) | ||||
| { | ||||
| 	MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot); | ||||
| 	HashJoinTuple hashTuple; | ||||
| 	int			hashTupleSize; | ||||
|  | ||||
| 	/* Create the HashJoinTuple */ | ||||
| 	hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len; | ||||
| 	hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt, | ||||
| 												   hashTupleSize); | ||||
| 	hashTuple->hashvalue = hashvalue; | ||||
| 	memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); | ||||
|  | ||||
| 	/* Push it onto the front of the skew bucket's list */ | ||||
| 	hashTuple->next = hashtable->skewBucket[bucketNumber]->tuples; | ||||
| 	hashtable->skewBucket[bucketNumber]->tuples = hashTuple; | ||||
|  | ||||
| 	/* Account for space used, and back off if we've used too much */ | ||||
| 	hashtable->spaceUsed += hashTupleSize; | ||||
| 	hashtable->spaceUsedSkew += hashTupleSize; | ||||
| 	while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew) | ||||
| 		ExecHashRemoveNextSkewBucket(hashtable); | ||||
|  | ||||
| 	/* Check we are not over the total spaceAllowed, either */ | ||||
| 	if (hashtable->spaceUsed > hashtable->spaceAllowed) | ||||
| 		ExecHashIncreaseNumBatches(hashtable); | ||||
| } | ||||
|  | ||||
| /* | ||||
|  *		ExecHashRemoveNextSkewBucket | ||||
|  * | ||||
|  *		Remove the least valuable skew bucket by pushing its tuples into | ||||
|  *		the main hash table. | ||||
|  */ | ||||
| static void | ||||
| ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) | ||||
| { | ||||
| 	int bucketToRemove; | ||||
| 	HashSkewBucket *bucket; | ||||
| 	uint32 hashvalue; | ||||
| 	int bucketno; | ||||
| 	int batchno; | ||||
| 	HashJoinTuple hashTuple; | ||||
|  | ||||
| 	/* Locate the bucket to remove */ | ||||
| 	bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1]; | ||||
| 	bucket = hashtable->skewBucket[bucketToRemove]; | ||||
|  | ||||
| 	/* | ||||
| 	 * Calculate which bucket and batch the tuples belong to in the main | ||||
| 	 * hashtable.  They all have the same hash value, so it's the same for all | ||||
| 	 * of them.  Also note that it's not possible for nbatch to increase | ||||
| 	 * while we are processing the tuples. | ||||
| 	 */ | ||||
| 	hashvalue = bucket->hashvalue; | ||||
| 	ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno); | ||||
|  | ||||
| 	/* Process all tuples in the bucket */ | ||||
| 	hashTuple = bucket->tuples; | ||||
| 	while (hashTuple != NULL) | ||||
| 	{ | ||||
| 		HashJoinTuple nextHashTuple = hashTuple->next; | ||||
| 		MinimalTuple tuple; | ||||
| 		Size	tupleSize; | ||||
|  | ||||
| 		/* | ||||
| 		 * This code must agree with ExecHashTableInsert.  We do not use | ||||
| 		 * ExecHashTableInsert directly as ExecHashTableInsert expects a | ||||
| 		 * TupleTableSlot while we already have HashJoinTuples. | ||||
| 		 */ | ||||
| 		tuple = HJTUPLE_MINTUPLE(hashTuple); | ||||
| 		tupleSize = HJTUPLE_OVERHEAD + tuple->t_len; | ||||
|  | ||||
| 		/* Decide whether to put the tuple in the hash table or a temp file */ | ||||
| 		if (batchno == hashtable->curbatch) | ||||
| 		{ | ||||
| 			/* Move the tuple to the main hash table */ | ||||
| 			hashTuple->next = hashtable->buckets[bucketno]; | ||||
| 			hashtable->buckets[bucketno] = hashTuple; | ||||
| 			/* We have reduced skew space, but overall space doesn't change */ | ||||
| 			hashtable->spaceUsedSkew -= tupleSize; | ||||
| 		} | ||||
| 		else | ||||
| 		{ | ||||
| 			/* Put the tuple into a temp file for later batches */ | ||||
| 			Assert(batchno > hashtable->curbatch); | ||||
| 			ExecHashJoinSaveTuple(tuple, hashvalue, | ||||
| 								  &hashtable->innerBatchFile[batchno]); | ||||
| 			pfree(hashTuple); | ||||
| 			hashtable->spaceUsed -= tupleSize; | ||||
| 			hashtable->spaceUsedSkew -= tupleSize; | ||||
| 		} | ||||
|  | ||||
| 		hashTuple = nextHashTuple; | ||||
| 	} | ||||
|  | ||||
| 	/* | ||||
| 	 * Free the bucket struct itself and reset the hashtable entry to NULL. | ||||
| 	 * | ||||
| 	 * NOTE: this is not nearly as simple as it looks on the surface, because | ||||
| 	 * of the possibility of collisions in the hashtable.  Suppose that hash | ||||
| 	 * values A and B collide at a particular hashtable entry, and that A | ||||
| 	 * was entered first so B gets shifted to a different table entry.  If | ||||
| 	 * we were to remove A first then ExecHashGetSkewBucket would mistakenly | ||||
| 	 * start reporting that B is not in the hashtable, because it would hit | ||||
| 	 * the NULL before finding B.  However, we always remove entries in the | ||||
| 	 * reverse order of creation, so this failure cannot happen. | ||||
| 	 */ | ||||
| 	hashtable->skewBucket[bucketToRemove] = NULL; | ||||
| 	hashtable->nSkewBuckets--; | ||||
| 	pfree(bucket); | ||||
| 	hashtable->spaceUsed -= SKEW_BUCKET_OVERHEAD; | ||||
| 	hashtable->spaceUsedSkew -= SKEW_BUCKET_OVERHEAD; | ||||
|  | ||||
| 	/* | ||||
| 	 * If we have removed all skew buckets then give up on skew optimization. | ||||
| 	 * Release the arrays since they aren't useful any more. | ||||
| 	 */ | ||||
| 	if (hashtable->nSkewBuckets == 0) | ||||
| 	{ | ||||
| 		hashtable->skewEnabled = false; | ||||
| 		pfree(hashtable->skewBucket); | ||||
| 		pfree(hashtable->skewBucketNums); | ||||
| 		hashtable->skewBucket = NULL; | ||||
| 		hashtable->skewBucketNums = NULL; | ||||
| 		hashtable->spaceUsed -= hashtable->spaceUsedSkew; | ||||
| 		hashtable->spaceUsedSkew = 0; | ||||
| 	} | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user