mirror of
https://github.com/postgres/postgres.git
synced 2025-06-26 12:21:12 +03:00
Extend CTE patch to support recursive UNION (ie, without ALL). The
implementation uses an in-memory hash table, so it will poop out for very large recursive results ... but the performance characteristics of a sort-based implementation would be pretty unpleasant too.
This commit is contained in:
@ -8,7 +8,7 @@
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* $PostgreSQL: pgsql/src/backend/executor/nodeRecursiveunion.c,v 1.1 2008/10/04 21:56:53 tgl Exp $
|
||||
* $PostgreSQL: pgsql/src/backend/executor/nodeRecursiveunion.c,v 1.2 2008/10/07 19:27:04 tgl Exp $
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
@ -17,6 +17,41 @@
|
||||
#include "executor/execdebug.h"
|
||||
#include "executor/nodeRecursiveunion.h"
|
||||
#include "miscadmin.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
|
||||
/*
|
||||
* To implement UNION (without ALL), we need a hashtable that stores tuples
|
||||
* already seen. The hash key is computed from the grouping columns.
|
||||
*/
|
||||
typedef struct RUHashEntryData *RUHashEntry;
|
||||
|
||||
typedef struct RUHashEntryData
|
||||
{
|
||||
TupleHashEntryData shared; /* common header for hash table entries */
|
||||
} RUHashEntryData;
|
||||
|
||||
|
||||
/*
|
||||
* Initialize the hash table to empty.
|
||||
*/
|
||||
static void
|
||||
build_hash_table(RecursiveUnionState *rustate)
|
||||
{
|
||||
RecursiveUnion *node = (RecursiveUnion *) rustate->ps.plan;
|
||||
|
||||
Assert(node->numCols > 0);
|
||||
Assert(node->numGroups > 0);
|
||||
|
||||
rustate->hashtable = BuildTupleHashTable(node->numCols,
|
||||
node->dupColIdx,
|
||||
rustate->eqfunctions,
|
||||
rustate->hashfunctions,
|
||||
node->numGroups,
|
||||
sizeof(RUHashEntryData),
|
||||
rustate->tableContext,
|
||||
rustate->tempContext);
|
||||
}
|
||||
|
||||
|
||||
/* ----------------------------------------------------------------
|
||||
@ -44,49 +79,85 @@ ExecRecursiveUnion(RecursiveUnionState *node)
|
||||
PlanState *innerPlan = innerPlanState(node);
|
||||
RecursiveUnion *plan = (RecursiveUnion *) node->ps.plan;
|
||||
TupleTableSlot *slot;
|
||||
RUHashEntry entry;
|
||||
bool isnew;
|
||||
|
||||
/* 1. Evaluate non-recursive term */
|
||||
if (!node->recursing)
|
||||
{
|
||||
slot = ExecProcNode(outerPlan);
|
||||
if (!TupIsNull(slot))
|
||||
for (;;)
|
||||
{
|
||||
slot = ExecProcNode(outerPlan);
|
||||
if (TupIsNull(slot))
|
||||
break;
|
||||
if (plan->numCols > 0)
|
||||
{
|
||||
/* Find or build hashtable entry for this tuple's group */
|
||||
entry = (RUHashEntry)
|
||||
LookupTupleHashEntry(node->hashtable, slot, &isnew);
|
||||
/* Must reset temp context after each hashtable lookup */
|
||||
MemoryContextReset(node->tempContext);
|
||||
/* Ignore tuple if already seen */
|
||||
if (!isnew)
|
||||
continue;
|
||||
}
|
||||
/* Each non-duplicate tuple goes to the working table ... */
|
||||
tuplestore_puttupleslot(node->working_table, slot);
|
||||
/* ... and to the caller */
|
||||
return slot;
|
||||
}
|
||||
node->recursing = true;
|
||||
}
|
||||
|
||||
retry:
|
||||
/* 2. Execute recursive term */
|
||||
slot = ExecProcNode(innerPlan);
|
||||
if (TupIsNull(slot))
|
||||
for (;;)
|
||||
{
|
||||
if (node->intermediate_empty)
|
||||
return NULL;
|
||||
slot = ExecProcNode(innerPlan);
|
||||
if (TupIsNull(slot))
|
||||
{
|
||||
/* Done if there's nothing in the intermediate table */
|
||||
if (node->intermediate_empty)
|
||||
break;
|
||||
|
||||
/* done with old working table ... */
|
||||
tuplestore_end(node->working_table);
|
||||
/* done with old working table ... */
|
||||
tuplestore_end(node->working_table);
|
||||
|
||||
/* intermediate table becomes working table */
|
||||
node->working_table = node->intermediate_table;
|
||||
/* intermediate table becomes working table */
|
||||
node->working_table = node->intermediate_table;
|
||||
|
||||
/* create new empty intermediate table */
|
||||
node->intermediate_table = tuplestore_begin_heap(false, false, work_mem);
|
||||
node->intermediate_empty = true;
|
||||
/* create new empty intermediate table */
|
||||
node->intermediate_table = tuplestore_begin_heap(false, false,
|
||||
work_mem);
|
||||
node->intermediate_empty = true;
|
||||
|
||||
/* and reset the inner plan */
|
||||
innerPlan->chgParam = bms_add_member(innerPlan->chgParam,
|
||||
plan->wtParam);
|
||||
goto retry;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* reset the recursive term */
|
||||
innerPlan->chgParam = bms_add_member(innerPlan->chgParam,
|
||||
plan->wtParam);
|
||||
|
||||
/* and continue fetching from recursive term */
|
||||
continue;
|
||||
}
|
||||
|
||||
if (plan->numCols > 0)
|
||||
{
|
||||
/* Find or build hashtable entry for this tuple's group */
|
||||
entry = (RUHashEntry)
|
||||
LookupTupleHashEntry(node->hashtable, slot, &isnew);
|
||||
/* Must reset temp context after each hashtable lookup */
|
||||
MemoryContextReset(node->tempContext);
|
||||
/* Ignore tuple if already seen */
|
||||
if (!isnew)
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Else, tuple is good; stash it in intermediate table ... */
|
||||
node->intermediate_empty = false;
|
||||
tuplestore_puttupleslot(node->intermediate_table, slot);
|
||||
}
|
||||
/* ... and return it */
|
||||
return slot;
|
||||
}
|
||||
|
||||
return slot;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* ----------------------------------------------------------------
|
||||
@ -109,12 +180,40 @@ ExecInitRecursiveUnion(RecursiveUnion *node, EState *estate, int eflags)
|
||||
rustate->ps.plan = (Plan *) node;
|
||||
rustate->ps.state = estate;
|
||||
|
||||
rustate->eqfunctions = NULL;
|
||||
rustate->hashfunctions = NULL;
|
||||
rustate->hashtable = NULL;
|
||||
rustate->tempContext = NULL;
|
||||
rustate->tableContext = NULL;
|
||||
|
||||
/* initialize processing state */
|
||||
rustate->recursing = false;
|
||||
rustate->intermediate_empty = true;
|
||||
rustate->working_table = tuplestore_begin_heap(false, false, work_mem);
|
||||
rustate->intermediate_table = tuplestore_begin_heap(false, false, work_mem);
|
||||
|
||||
/*
|
||||
* If hashing, we need a per-tuple memory context for comparisons, and a
|
||||
* longer-lived context to store the hash table. The table can't just be
|
||||
* kept in the per-query context because we want to be able to throw it
|
||||
* away when rescanning.
|
||||
*/
|
||||
if (node->numCols > 0)
|
||||
{
|
||||
rustate->tempContext =
|
||||
AllocSetContextCreate(CurrentMemoryContext,
|
||||
"RecursiveUnion",
|
||||
ALLOCSET_DEFAULT_MINSIZE,
|
||||
ALLOCSET_DEFAULT_INITSIZE,
|
||||
ALLOCSET_DEFAULT_MAXSIZE);
|
||||
rustate->tableContext =
|
||||
AllocSetContextCreate(CurrentMemoryContext,
|
||||
"RecursiveUnion hash table",
|
||||
ALLOCSET_DEFAULT_MINSIZE,
|
||||
ALLOCSET_DEFAULT_INITSIZE,
|
||||
ALLOCSET_DEFAULT_MAXSIZE);
|
||||
}
|
||||
|
||||
/*
|
||||
* Make the state structure available to descendant WorkTableScan nodes
|
||||
* via the Param slot reserved for it.
|
||||
@ -154,6 +253,19 @@ ExecInitRecursiveUnion(RecursiveUnion *node, EState *estate, int eflags)
|
||||
outerPlanState(rustate) = ExecInitNode(outerPlan(node), estate, eflags);
|
||||
innerPlanState(rustate) = ExecInitNode(innerPlan(node), estate, eflags);
|
||||
|
||||
/*
|
||||
* If hashing, precompute fmgr lookup data for inner loop, and create
|
||||
* the hash table.
|
||||
*/
|
||||
if (node->numCols > 0)
|
||||
{
|
||||
execTuplesHashPrepare(node->numCols,
|
||||
node->dupOperators,
|
||||
&rustate->eqfunctions,
|
||||
&rustate->hashfunctions);
|
||||
build_hash_table(rustate);
|
||||
}
|
||||
|
||||
return rustate;
|
||||
}
|
||||
|
||||
@ -178,6 +290,12 @@ ExecEndRecursiveUnion(RecursiveUnionState *node)
|
||||
tuplestore_end(node->working_table);
|
||||
tuplestore_end(node->intermediate_table);
|
||||
|
||||
/* free subsidiary stuff including hashtable */
|
||||
if (node->tempContext)
|
||||
MemoryContextDelete(node->tempContext);
|
||||
if (node->tableContext)
|
||||
MemoryContextDelete(node->tableContext);
|
||||
|
||||
/*
|
||||
* clean out the upper tuple table
|
||||
*/
|
||||
@ -217,6 +335,14 @@ ExecRecursiveUnionReScan(RecursiveUnionState *node, ExprContext *exprCtxt)
|
||||
if (outerPlan->chgParam == NULL)
|
||||
ExecReScan(outerPlan, exprCtxt);
|
||||
|
||||
/* Release any hashtable storage */
|
||||
if (node->tableContext)
|
||||
MemoryContextResetAndDeleteChildren(node->tableContext);
|
||||
|
||||
/* And rebuild empty hashtable if needed */
|
||||
if (plan->numCols > 0)
|
||||
build_hash_table(node);
|
||||
|
||||
/* reset processing state */
|
||||
node->recursing = false;
|
||||
node->intermediate_empty = true;
|
||||
|
Reference in New Issue
Block a user