mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-25 13:17:41 +03:00 
			
		
		
		
	Basic binary heap implementation.
There are probably other places where this can be used, but for now, this just makes MergeAppend use it, so that this code will have test coverage. There is other work in the queue that will use this, as well. Abhijit Menon-Sen, reviewed by Andres Freund, Robert Haas, Álvaro Herrera, Tom Lane, and others.
This commit is contained in:
		| @@ -41,17 +41,16 @@ | ||||
| #include "executor/execdebug.h" | ||||
| #include "executor/nodeMergeAppend.h" | ||||
|  | ||||
| /* | ||||
|  * It gets quite confusing having a heap array (indexed by integers) which | ||||
|  * contains integers which index into the slots array. These typedefs try to | ||||
|  * clear it up, but they're only documentation. | ||||
|  */ | ||||
| typedef int SlotNumber; | ||||
| typedef int HeapPosition; | ||||
| #include "lib/binaryheap.h" | ||||
|  | ||||
| static void heap_insert_slot(MergeAppendState *node, SlotNumber new_slot); | ||||
| static void heap_siftup_slot(MergeAppendState *node); | ||||
| static int32 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2); | ||||
| /* | ||||
|  * We have one slot for each item in the heap array.  We use SlotNumber | ||||
|  * to store slot indexes.  This doesn't actually provide any formal | ||||
|  * type-safety, but it makes the code more self-documenting. | ||||
|  */ | ||||
| typedef int32 SlotNumber; | ||||
|  | ||||
| static int	heap_compare_slots(Datum a, Datum b, void *arg); | ||||
|  | ||||
|  | ||||
| /* ---------------------------------------------------------------- | ||||
| @@ -88,7 +87,8 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags) | ||||
| 	mergestate->ms_nplans = nplans; | ||||
|  | ||||
| 	mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans); | ||||
| 	mergestate->ms_heap = (int *) palloc0(sizeof(int) * nplans); | ||||
| 	mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots, | ||||
| 											  mergestate); | ||||
|  | ||||
| 	/* | ||||
| 	 * Miscellaneous initialization | ||||
| @@ -143,9 +143,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags) | ||||
| 	/* | ||||
| 	 * initialize to show we have not run the subplans yet | ||||
| 	 */ | ||||
| 	mergestate->ms_heap_size = 0; | ||||
| 	mergestate->ms_initialized = false; | ||||
| 	mergestate->ms_last_slot = -1; | ||||
|  | ||||
| 	return mergestate; | ||||
| } | ||||
| @@ -172,101 +170,53 @@ ExecMergeAppend(MergeAppendState *node) | ||||
| 		{ | ||||
| 			node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); | ||||
| 			if (!TupIsNull(node->ms_slots[i])) | ||||
| 				heap_insert_slot(node, i); | ||||
| 				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i)); | ||||
| 		} | ||||
| 		binaryheap_build(node->ms_heap); | ||||
| 		node->ms_initialized = true; | ||||
| 	} | ||||
| 	else | ||||
| 	{ | ||||
| 		/* | ||||
| 		 * Otherwise, pull the next tuple from whichever subplan we returned | ||||
| 		 * from last time, and insert it into the heap.  (We could simplify | ||||
| 		 * the logic a bit by doing this before returning from the prior call, | ||||
| 		 * but it's better to not pull tuples until necessary.) | ||||
| 		 * from last time, and reinsert the subplan index into the heap, | ||||
| 		 * because it might now compare differently against the existing | ||||
| 		 * elements of the heap.  (We could perhaps simplify the logic a bit | ||||
| 		 * by doing this before returning from the prior call, but it's better | ||||
| 		 * to not pull tuples until necessary.) | ||||
| 		 */ | ||||
| 		i = node->ms_last_slot; | ||||
| 		i = DatumGetInt32(binaryheap_first(node->ms_heap)); | ||||
| 		node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); | ||||
| 		if (!TupIsNull(node->ms_slots[i])) | ||||
| 			heap_insert_slot(node, i); | ||||
| 			binaryheap_replace_first(node->ms_heap, Int32GetDatum(i)); | ||||
| 		else | ||||
| 			(void) binaryheap_remove_first(node->ms_heap); | ||||
| 	} | ||||
|  | ||||
| 	if (node->ms_heap_size > 0) | ||||
| 	{ | ||||
| 		/* Return the topmost heap node, and sift up the remaining nodes */ | ||||
| 		i = node->ms_heap[0]; | ||||
| 		result = node->ms_slots[i]; | ||||
| 		node->ms_last_slot = i; | ||||
| 		heap_siftup_slot(node); | ||||
| 	} | ||||
| 	else | ||||
| 	if (binaryheap_empty(node->ms_heap)) | ||||
| 	{ | ||||
| 		/* All the subplans are exhausted, and so is the heap */ | ||||
| 		result = ExecClearTuple(node->ps.ps_ResultTupleSlot); | ||||
| 	} | ||||
| 	else | ||||
| 	{ | ||||
| 		i = DatumGetInt32(binaryheap_first(node->ms_heap)); | ||||
| 		result = node->ms_slots[i]; | ||||
| 	} | ||||
|  | ||||
| 	return result; | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Insert a new slot into the heap.  The slot must contain a valid tuple. | ||||
|  */ | ||||
| static void | ||||
| heap_insert_slot(MergeAppendState *node, SlotNumber new_slot) | ||||
| { | ||||
| 	SlotNumber *heap = node->ms_heap; | ||||
| 	HeapPosition j; | ||||
|  | ||||
| 	Assert(!TupIsNull(node->ms_slots[new_slot])); | ||||
|  | ||||
| 	j = node->ms_heap_size++;	/* j is where the "hole" is */ | ||||
| 	while (j > 0) | ||||
| 	{ | ||||
| 		int			i = (j - 1) / 2; | ||||
|  | ||||
| 		if (heap_compare_slots(node, new_slot, node->ms_heap[i]) >= 0) | ||||
| 			break; | ||||
| 		heap[j] = heap[i]; | ||||
| 		j = i; | ||||
| 	} | ||||
| 	heap[j] = new_slot; | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Delete the heap top (the slot in heap[0]), and sift up. | ||||
|  */ | ||||
| static void | ||||
| heap_siftup_slot(MergeAppendState *node) | ||||
| { | ||||
| 	SlotNumber *heap = node->ms_heap; | ||||
| 	HeapPosition i, | ||||
| 				n; | ||||
|  | ||||
| 	if (--node->ms_heap_size <= 0) | ||||
| 		return; | ||||
| 	n = node->ms_heap_size;		/* heap[n] needs to be reinserted */ | ||||
| 	i = 0;						/* i is where the "hole" is */ | ||||
| 	for (;;) | ||||
| 	{ | ||||
| 		int			j = 2 * i + 1; | ||||
|  | ||||
| 		if (j >= n) | ||||
| 			break; | ||||
| 		if (j + 1 < n && heap_compare_slots(node, heap[j], heap[j + 1]) > 0) | ||||
| 			j++; | ||||
| 		if (heap_compare_slots(node, heap[n], heap[j]) <= 0) | ||||
| 			break; | ||||
| 		heap[i] = heap[j]; | ||||
| 		i = j; | ||||
| 	} | ||||
| 	heap[i] = heap[n]; | ||||
| } | ||||
|  | ||||
| /* | ||||
|  * Compare the tuples in the two given slots. | ||||
|  */ | ||||
| static int32 | ||||
| heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2) | ||||
| heap_compare_slots(Datum a, Datum b, void *arg) | ||||
| { | ||||
| 	MergeAppendState *node = (MergeAppendState *) arg; | ||||
| 	SlotNumber	slot1 = DatumGetInt32(a); | ||||
| 	SlotNumber	slot2 = DatumGetInt32(b); | ||||
|  | ||||
| 	TupleTableSlot *s1 = node->ms_slots[slot1]; | ||||
| 	TupleTableSlot *s2 = node->ms_slots[slot2]; | ||||
| 	int			nkey; | ||||
| @@ -291,7 +241,7 @@ heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2) | ||||
| 									  datum2, isNull2, | ||||
| 									  sortKey); | ||||
| 		if (compare != 0) | ||||
| 			return compare; | ||||
| 			return -compare; | ||||
| 	} | ||||
| 	return 0; | ||||
| } | ||||
| @@ -347,7 +297,5 @@ ExecReScanMergeAppend(MergeAppendState *node) | ||||
| 		if (subnode->chgParam == NULL) | ||||
| 			ExecReScan(subnode); | ||||
| 	} | ||||
| 	node->ms_heap_size = 0; | ||||
| 	node->ms_initialized = false; | ||||
| 	node->ms_last_slot = -1; | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user