1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-11 20:28:21 +03:00

Revise hash join code so that we can increase the number of batches

on-the-fly, and thereby avoid blowing out memory when the planner has
underestimated the hash table size.  Hash join will now obey the
work_mem limit with some faithfulness.  Per my recent proposal
(hash aggregate part isn't done yet though).
This commit is contained in:
Tom Lane
2005-03-06 22:15:05 +00:00
parent 31b6d840f6
commit 849074f9ae
9 changed files with 654 additions and 450 deletions

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/executor/nodeHashjoin.c,v 1.67 2004/12/31 21:59:45 pgsql Exp $
* $PostgreSQL: pgsql/src/backend/executor/nodeHashjoin.c,v 1.68 2005/03/06 22:15:04 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -16,16 +16,19 @@
#include "postgres.h"
#include "executor/executor.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "optimizer/clauses.h"
#include "utils/memutils.h"
static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *node,
HashJoinState *hjstate);
static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
HashJoinState *hjstate,
uint32 *hashvalue);
static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
BufFile *file,
uint32 *hashvalue,
TupleTableSlot *tupleSlot);
static int ExecHashJoinNewBatch(HashJoinState *hjstate);
@ -34,9 +37,9 @@ static int ExecHashJoinNewBatch(HashJoinState *hjstate);
* ExecHashJoin
*
* This function implements the Hybrid Hashjoin algorithm.
* recursive partitioning remains to be added.
* Note: the relation we build hash table on is the inner
* the other one is outer.
*
* Note: the relation we build hash table on is the "inner"
* the other one is "outer".
* ----------------------------------------------------------------
*/
TupleTableSlot * /* return: a tuple or NULL */
@ -45,8 +48,6 @@ ExecHashJoin(HashJoinState *node)
EState *estate;
PlanState *outerNode;
HashState *hashNode;
List *hjclauses;
List *outerkeys;
List *joinqual;
List *otherqual;
ScanDirection dir;
@ -56,12 +57,12 @@ ExecHashJoin(HashJoinState *node)
HashJoinTable hashtable;
HeapTuple curtuple;
TupleTableSlot *outerTupleSlot;
int i;
uint32 hashvalue;
int batchno;
/*
* get information from HashJoin node
*/
hjclauses = node->hashclauses;
estate = node->js.ps.state;
joinqual = node->js.joinqual;
otherqual = node->js.ps.qual;
@ -73,7 +74,6 @@ ExecHashJoin(HashJoinState *node)
* get information from HashJoin state
*/
hashtable = node->hj_HashTable;
outerkeys = node->hj_OuterHashKeys;
econtext = node->js.ps.ps_ExprContext;
/*
@ -111,12 +111,11 @@ ExecHashJoin(HashJoinState *node)
/*
* if this is the first call, build the hash table for inner relation
*/
if (!node->hj_hashdone)
if (hashtable == NULL)
{
/*
* create the hash table
*/
Assert(hashtable == NULL);
hashtable = ExecHashTableCreate((Hash *) hashNode->ps.plan,
node->hj_HashOperators);
node->hj_HashTable = hashtable;
@ -139,13 +138,10 @@ ExecHashJoin(HashJoinState *node)
}
/*
* Open temp files for outer batches, if needed. Note that file
* buffers are palloc'd in regular executor context.
* need to remember whether nbatch has increased since we began
* scanning the outer relation
*/
for (i = 0; i < hashtable->nbatch; i++)
hashtable->outerBatchFile[i] = BufFileCreateTemp(false);
node->hj_hashdone = true;
hashtable->nbatch_outstart = hashtable->nbatch;
}
/*
@ -159,7 +155,8 @@ ExecHashJoin(HashJoinState *node)
if (node->hj_NeedNewOuter)
{
outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode,
node);
node,
&hashvalue);
if (TupIsNull(outerTupleSlot))
{
/* end of join */
@ -175,32 +172,26 @@ ExecHashJoin(HashJoinState *node)
* now we have an outer tuple, find the corresponding bucket
* for this tuple from the hash table
*/
node->hj_CurBucketNo = ExecHashGetBucket(hashtable, econtext,
outerkeys);
node->hj_CurHashValue = hashvalue;
ExecHashGetBucketAndBatch(hashtable, hashvalue,
&node->hj_CurBucketNo, &batchno);
node->hj_CurTuple = NULL;
/*
* Now we've got an outer tuple and the corresponding hash
* bucket, but this tuple may not belong to the current batch.
* This need only be checked in the first pass.
*/
if (hashtable->curbatch == 0)
if (batchno != hashtable->curbatch)
{
int batchno = ExecHashGetBatch(node->hj_CurBucketNo,
hashtable);
if (batchno >= 0)
{
/*
* Need to postpone this outer tuple to a later batch.
* Save it in the corresponding outer-batch file.
*/
hashtable->outerBatchSize[batchno]++;
ExecHashJoinSaveTuple(outerTupleSlot->val,
hashtable->outerBatchFile[batchno]);
node->hj_NeedNewOuter = true;
continue; /* loop around for a new outer tuple */
}
/*
* Need to postpone this outer tuple to a later batch.
* Save it in the corresponding outer-batch file.
*/
Assert(batchno > hashtable->curbatch);
ExecHashJoinSaveTuple(outerTupleSlot->val, hashvalue,
&hashtable->outerBatchFile[batchno]);
node->hj_NeedNewOuter = true;
continue; /* loop around for a new outer tuple */
}
}
@ -209,9 +200,7 @@ ExecHashJoin(HashJoinState *node)
*/
for (;;)
{
curtuple = ExecScanHashBucket(node,
hjclauses,
econtext);
curtuple = ExecScanHashBucket(node, econtext);
if (curtuple == NULL)
break; /* out of matches */
@ -412,10 +401,9 @@ ExecInitHashJoin(HashJoin *node, EState *estate)
/*
* initialize hash-specific info
*/
hjstate->hj_hashdone = false;
hjstate->hj_HashTable = NULL;
hjstate->hj_CurHashValue = 0;
hjstate->hj_CurBucketNo = 0;
hjstate->hj_CurTuple = NULL;
@ -499,17 +487,21 @@ ExecEndHashJoin(HashJoinState *node)
ExecEndNode(innerPlanState(node));
}
/* ----------------------------------------------------------------
* ExecHashJoinOuterGetTuple
/*
* ExecHashJoinOuterGetTuple
*
* get the next outer tuple for hashjoin: either by
* executing a plan node as in the first pass, or from
* the tmp files for the hashjoin batches.
* ----------------------------------------------------------------
* executing a plan node in the first pass, or from
* the temp files for the hashjoin batches.
*
* Returns a null slot if no more outer tuples. On success, the tuple's
* hash value is stored at *hashvalue --- this is either originally computed,
* or re-read from the temp file.
*/
static TupleTableSlot *
ExecHashJoinOuterGetTuple(PlanState *node, HashJoinState *hjstate)
ExecHashJoinOuterGetTuple(PlanState *outerNode,
HashJoinState *hjstate,
uint32 *hashvalue)
{
HashJoinTable hashtable = hjstate->hj_HashTable;
int curbatch = hashtable->curbatch;
@ -517,9 +509,20 @@ ExecHashJoinOuterGetTuple(PlanState *node, HashJoinState *hjstate)
if (curbatch == 0)
{ /* if it is the first pass */
slot = ExecProcNode(node);
slot = ExecProcNode(outerNode);
if (!TupIsNull(slot))
{
/*
* We have to compute the tuple's hash value.
*/
ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
econtext->ecxt_outertuple = slot;
*hashvalue = ExecHashGetHashValue(hashtable, econtext,
hjstate->hj_OuterHashKeys);
return slot;
}
/*
* We have just reached the end of the first pass. Try to switch
@ -530,12 +533,14 @@ ExecHashJoinOuterGetTuple(PlanState *node, HashJoinState *hjstate)
/*
* Try to read from a temp file. Loop allows us to advance to new
* batch as needed.
* batches as needed. NOTE: nbatch could increase inside
* ExecHashJoinNewBatch, so don't try to optimize this loop.
*/
while (curbatch <= hashtable->nbatch)
while (curbatch < hashtable->nbatch)
{
slot = ExecHashJoinGetSavedTuple(hjstate,
hashtable->outerBatchFile[curbatch - 1],
hashtable->outerBatchFile[curbatch],
hashvalue,
hjstate->hj_OuterTupleSlot);
if (!TupIsNull(slot))
return slot;
@ -546,29 +551,209 @@ ExecHashJoinOuterGetTuple(PlanState *node, HashJoinState *hjstate)
return NULL;
}
/* ----------------------------------------------------------------
* ExecHashJoinGetSavedTuple
/*
* ExecHashJoinNewBatch
* switch to a new hashjoin batch
*
* read the next tuple from a tmp file
* ----------------------------------------------------------------
* Returns the number of the new batch (1..nbatch-1), or nbatch if no more.
* We will never return a batch number that has an empty outer batch file.
*/
static int
ExecHashJoinNewBatch(HashJoinState *hjstate)
{
HashJoinTable hashtable = hjstate->hj_HashTable;
int nbatch;
int curbatch;
BufFile *innerFile;
TupleTableSlot *slot;
uint32 hashvalue;
start_over:
nbatch = hashtable->nbatch;
curbatch = hashtable->curbatch;
if (curbatch > 0)
{
/*
* We no longer need the previous outer batch file; close it right
* away to free disk space.
*/
if (hashtable->outerBatchFile[curbatch])
BufFileClose(hashtable->outerBatchFile[curbatch]);
hashtable->outerBatchFile[curbatch] = NULL;
}
/*
* We can always skip over any batches that are completely empty on both
* sides. We can sometimes skip over batches that are empty on only one
* side, but there are exceptions:
*
* 1. In a LEFT JOIN, we have to process outer batches even if the
* inner batch is empty.
*
* 2. If we have increased nbatch since the initial estimate, we have
* to scan inner batches since they might contain tuples that need to
* be reassigned to later inner batches.
*
* 3. Similarly, if we have increased nbatch since starting the outer
* scan, we have to rescan outer batches in case they contain tuples
* that need to be reassigned.
*/
curbatch++;
while (curbatch < nbatch &&
(hashtable->outerBatchFile[curbatch] == NULL ||
hashtable->innerBatchFile[curbatch] == NULL))
{
if (hashtable->outerBatchFile[curbatch] &&
hjstate->js.jointype == JOIN_LEFT)
break; /* must process due to rule 1 */
if (hashtable->innerBatchFile[curbatch] &&
nbatch != hashtable->nbatch_original)
break; /* must process due to rule 2 */
if (hashtable->outerBatchFile[curbatch] &&
nbatch != hashtable->nbatch_outstart)
break; /* must process due to rule 3 */
/* We can ignore this batch. */
/* Release associated temp files right away. */
if (hashtable->innerBatchFile[curbatch])
BufFileClose(hashtable->innerBatchFile[curbatch]);
hashtable->innerBatchFile[curbatch] = NULL;
if (hashtable->outerBatchFile[curbatch])
BufFileClose(hashtable->outerBatchFile[curbatch]);
hashtable->outerBatchFile[curbatch] = NULL;
curbatch++;
}
if (curbatch >= nbatch)
return curbatch; /* no more batches */
hashtable->curbatch = curbatch;
/*
* Reload the hash table with the new inner batch (which could be empty)
*/
ExecHashTableReset(hashtable);
innerFile = hashtable->innerBatchFile[curbatch];
if (innerFile != NULL)
{
if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not rewind hash-join temporary file: %m")));
while ((slot = ExecHashJoinGetSavedTuple(hjstate,
innerFile,
&hashvalue,
hjstate->hj_HashTupleSlot)))
{
/*
* NOTE: some tuples may be sent to future batches. Also,
* it is possible for hashtable->nbatch to be increased here!
*/
ExecHashTableInsert(hashtable, slot->val, hashvalue);
}
/*
* after we build the hash table, the inner batch file is no longer
* needed
*/
BufFileClose(innerFile);
hashtable->innerBatchFile[curbatch] = NULL;
}
/*
* If there's no outer batch file, advance to next batch.
*/
if (hashtable->outerBatchFile[curbatch] == NULL)
goto start_over;
/*
* Rewind outer batch file, so that we can start reading it.
*/
if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not rewind hash-join temporary file: %m")));
return curbatch;
}
/*
* ExecHashJoinSaveTuple
* save a tuple to a batch file.
*
* The data recorded in the file for each tuple is its hash value,
* then an image of its HeapTupleData (with meaningless t_data pointer)
* followed by the HeapTupleHeader and tuple data.
*
* Note: it is important always to call this in the regular executor
* context, not in a shorter-lived context; else the temp file buffers
* will get messed up.
*/
void
ExecHashJoinSaveTuple(HeapTuple heapTuple, uint32 hashvalue,
BufFile **fileptr)
{
BufFile *file = *fileptr;
size_t written;
if (file == NULL)
{
/* First write to this batch file, so open it. */
file = BufFileCreateTemp(false);
*fileptr = file;
}
written = BufFileWrite(file, (void *) &hashvalue, sizeof(uint32));
if (written != sizeof(uint32))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to hash-join temporary file: %m")));
written = BufFileWrite(file, (void *) heapTuple, sizeof(HeapTupleData));
if (written != sizeof(HeapTupleData))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to hash-join temporary file: %m")));
written = BufFileWrite(file, (void *) heapTuple->t_data, heapTuple->t_len);
if (written != (size_t) heapTuple->t_len)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to hash-join temporary file: %m")));
}
/*
* ExecHashJoinGetSavedTuple
* read the next tuple from a batch file. Return NULL if no more.
*
* On success, *hashvalue is set to the tuple's hash value, and the tuple
* itself is stored in the given slot.
*/
static TupleTableSlot *
ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
BufFile *file,
uint32 *hashvalue,
TupleTableSlot *tupleSlot)
{
HeapTupleData htup;
size_t nread;
HeapTuple heapTuple;
nread = BufFileRead(file, (void *) &htup, sizeof(HeapTupleData));
nread = BufFileRead(file, (void *) hashvalue, sizeof(uint32));
if (nread == 0)
return NULL; /* end of file */
if (nread != sizeof(uint32))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from hash-join temporary file: %m")));
nread = BufFileRead(file, (void *) &htup, sizeof(HeapTupleData));
if (nread != sizeof(HeapTupleData))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from hash-join temporary file: %m")));
errmsg("could not read from hash-join temporary file: %m")));
heapTuple = palloc(HEAPTUPLESIZE + htup.t_len);
memcpy((char *) heapTuple, (char *) &htup, sizeof(HeapTupleData));
heapTuple->t_datamcxt = CurrentMemoryContext;
@ -578,131 +763,10 @@ ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
if (nread != (size_t) htup.t_len)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from hash-join temporary file: %m")));
errmsg("could not read from hash-join temporary file: %m")));
return ExecStoreTuple(heapTuple, tupleSlot, InvalidBuffer, true);
}
/* ----------------------------------------------------------------
* ExecHashJoinNewBatch
*
* switch to a new hashjoin batch
* ----------------------------------------------------------------
*/
static int
ExecHashJoinNewBatch(HashJoinState *hjstate)
{
HashJoinTable hashtable = hjstate->hj_HashTable;
int nbatch = hashtable->nbatch;
int newbatch = hashtable->curbatch + 1;
long *innerBatchSize = hashtable->innerBatchSize;
long *outerBatchSize = hashtable->outerBatchSize;
BufFile *innerFile;
TupleTableSlot *slot;
ExprContext *econtext;
List *innerhashkeys;
if (newbatch > 1)
{
/*
* We no longer need the previous outer batch file; close it right
* away to free disk space.
*/
BufFileClose(hashtable->outerBatchFile[newbatch - 2]);
hashtable->outerBatchFile[newbatch - 2] = NULL;
}
/*
* Normally we can skip over any batches that are empty on either side
* --- but for JOIN_LEFT, can only skip when left side is empty.
* Release associated temp files right away.
*/
while (newbatch <= nbatch &&
(outerBatchSize[newbatch - 1] == 0L ||
(innerBatchSize[newbatch - 1] == 0L &&
hjstate->js.jointype != JOIN_LEFT)))
{
BufFileClose(hashtable->innerBatchFile[newbatch - 1]);
hashtable->innerBatchFile[newbatch - 1] = NULL;
BufFileClose(hashtable->outerBatchFile[newbatch - 1]);
hashtable->outerBatchFile[newbatch - 1] = NULL;
newbatch++;
}
if (newbatch > nbatch)
return newbatch; /* no more batches */
/*
* Rewind inner and outer batch files for this batch, so that we can
* start reading them.
*/
if (BufFileSeek(hashtable->outerBatchFile[newbatch - 1], 0, 0L, SEEK_SET))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not rewind hash-join temporary file: %m")));
innerFile = hashtable->innerBatchFile[newbatch - 1];
if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not rewind hash-join temporary file: %m")));
/*
* Reload the hash table with the new inner batch
*/
ExecHashTableReset(hashtable, innerBatchSize[newbatch - 1]);
econtext = hjstate->js.ps.ps_ExprContext;
innerhashkeys = hjstate->hj_InnerHashKeys;
while ((slot = ExecHashJoinGetSavedTuple(hjstate,
innerFile,
hjstate->hj_HashTupleSlot))
&& !TupIsNull(slot))
{
econtext->ecxt_innertuple = slot;
ExecHashTableInsert(hashtable, econtext, innerhashkeys);
}
/*
* after we build the hash table, the inner batch file is no longer
* needed
*/
BufFileClose(innerFile);
hashtable->innerBatchFile[newbatch - 1] = NULL;
hashtable->curbatch = newbatch;
return newbatch;
}
/* ----------------------------------------------------------------
* ExecHashJoinSaveTuple
*
* save a tuple to a tmp file.
*
* The data recorded in the file for each tuple is an image of its
* HeapTupleData (with meaningless t_data pointer) followed by the
* HeapTupleHeader and tuple data.
* ----------------------------------------------------------------
*/
void
ExecHashJoinSaveTuple(HeapTuple heapTuple,
BufFile *file)
{
size_t written;
written = BufFileWrite(file, (void *) heapTuple, sizeof(HeapTupleData));
if (written != sizeof(HeapTupleData))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to hash-join temporary file: %m")));
written = BufFileWrite(file, (void *) heapTuple->t_data, heapTuple->t_len);
if (written != (size_t) heapTuple->t_len)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to hash-join temporary file: %m")));
}
void
ExecReScanHashJoin(HashJoinState *node, ExprContext *exprCtxt)
@ -711,9 +775,8 @@ ExecReScanHashJoin(HashJoinState *node, ExprContext *exprCtxt)
* If we haven't yet built the hash table then we can just return;
* nothing done yet, so nothing to undo.
*/
if (!node->hj_hashdone)
if (node->hj_HashTable == NULL)
return;
Assert(node->hj_HashTable != NULL);
/*
* In a multi-batch join, we currently have to do rescans the hard
@ -722,7 +785,7 @@ ExecReScanHashJoin(HashJoinState *node, ExprContext *exprCtxt)
* parameter change for the inner subnode, then we can just re-use the
* existing hash table without rebuilding it.
*/
if (node->hj_HashTable->nbatch == 0 &&
if (node->hj_HashTable->nbatch == 1 &&
((PlanState *) node)->righttree->chgParam == NULL)
{
/* okay to reuse the hash table; needn't rescan inner, either */
@ -730,7 +793,6 @@ ExecReScanHashJoin(HashJoinState *node, ExprContext *exprCtxt)
else
{
/* must destroy and rebuild hash table */
node->hj_hashdone = false;
ExecHashTableDestroy(node->hj_HashTable);
node->hj_HashTable = NULL;
@ -743,6 +805,7 @@ ExecReScanHashJoin(HashJoinState *node, ExprContext *exprCtxt)
}
/* Always reset intra-tuple state */
node->hj_CurHashValue = 0;
node->hj_CurBucketNo = 0;
node->hj_CurTuple = NULL;