1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-11 10:01:57 +03:00

Revise tuplestore and nodeMaterial so that we don't have to read the
entire contents of the subplan into the tuplestore before we can return
any tuples.  Instead, the tuplestore holds what we've already read, and
we fetch additional rows from the subplan as needed.  Random access to
the previously-read rows works with the tuplestore, and doesn't affect
the state of the partially-read subplan.  This is a step towards fixing
the problems with cursors over complex queries --- we don't want to
stick in Materialize nodes if they'll prevent quick startup for a cursor.
This commit is contained in:
Tom Lane
2003-03-09 02:19:13 +00:00
parent 05a966fca4
commit aa60eecc37
7 changed files with 280 additions and 240 deletions

View File

@ -6,27 +6,37 @@
* This module handles temporary storage of tuples for purposes such
* as Materialize nodes, hashjoin batch files, etc. It is essentially
* a dumbed-down version of tuplesort.c; it does no sorting of tuples
* but can only store a sequence of tuples and regurgitate it later.
* but can only store and regurgitate a sequence of tuples. However,
* because no sort is required, it is allowed to start reading the sequence
* before it has all been written. This is particularly useful for cursors,
* because it allows random access within the already-scanned portion of
* a query without having to process the underlying scan to completion.
* A temporary file is used to handle the data if it exceeds the
* space limit specified by the caller.
*
* The (approximate) amount of memory allowed to the tuplestore is specified
* in kilobytes by the caller. We absorb tuples and simply store them in an
* in-memory array as long as we haven't exceeded maxKBytes. If we reach the
* end of the input without exceeding maxKBytes, we just return tuples during
* the read phase by scanning the tuple array sequentially. If we do exceed
* in-memory array as long as we haven't exceeded maxKBytes. If we do exceed
* maxKBytes, we dump all the tuples into a temp file and then read from that
* during the read phase.
* when needed.
*
* When the caller requests random access to the data, we write the temp file
* in a format that allows either forward or backward scan.
* in a format that allows either forward or backward scan. Otherwise, only
* forward scan is allowed. But rewind and markpos/restorepos are allowed
* in any case.
*
* Because we allow reading before writing is complete, there are two
* interesting positions in the temp file: the current read position and
* the current write position. At any given instant, the temp file's seek
* position corresponds to one of these, and the other one is remembered in
* the Tuplestore's state.
*
*
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplestore.c,v 1.10 2002/11/13 00:39:48 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplestore.c,v 1.11 2003/03/09 02:19:13 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -43,11 +53,9 @@
*/
typedef enum
{
TSS_INITIAL, /* Loading tuples; still within memory
* limit */
TSS_WRITEFILE, /* Loading tuples; writing to temp file */
TSS_READMEM, /* Reading tuples; entirely in memory */
TSS_READFILE /* Reading tuples from temp file */
TSS_INMEM, /* Tuples still fit in memory */
TSS_WRITEFILE, /* Writing to temp file */
TSS_READFILE /* Reading from temp file */
} TupStoreStatus;
/*
@ -95,25 +103,38 @@ struct Tuplestorestate
/*
* This array holds pointers to tuples in memory if we are in state
* INITIAL or READMEM. In states WRITEFILE and READFILE it's not
* used.
* INMEM. In states WRITEFILE and READFILE it's not used.
*/
void **memtuples; /* array of pointers to palloc'd tuples */
int memtupcount; /* number of tuples currently present */
int memtupsize; /* allocated length of memtuples array */
/*
* These variables are used after completion of storing to keep track
* of the next tuple to return. (In the tape case, the tape's current
* read position is also critical state.)
* These variables are used to keep track of the current position.
*
* In state WRITEFILE, the current file seek position is the write point,
* and the read position is remembered in readpos_xxx; in state READFILE,
* the current file seek position is the read point, and the write position
* is remembered in writepos_xxx. (The write position is the same as EOF,
* but since BufFileSeek doesn't currently implement SEEK_END, we have
* to remember it explicitly.)
*
* Special case: if we are in WRITEFILE state and eof_reached is true,
* then the read position is implicitly equal to the write position
* (and hence to the file seek position); this way we need not update
* the readpos_xxx variables on each write.
*/
int current; /* array index (only used if READMEM) */
bool eof_reached; /* reached EOF (needed for cursors) */
bool eof_reached; /* read reached EOF (always valid) */
int current; /* next array index (valid if INMEM) */
int readpos_file; /* file# (valid if WRITEFILE and not eof) */
long readpos_offset; /* offset (valid if WRITEFILE and not eof) */
int writepos_file; /* file# (valid if READFILE) */
long writepos_offset; /* offset (valid if READFILE) */
/* markpos_xxx holds marked position for mark and restore */
int markpos_file; /* file# (only used if READFILE) */
long markpos_offset; /* saved "current", or offset in tape file */
bool markpos_eof; /* saved "eof_reached" */
int markpos_current; /* saved "current" */
int markpos_file; /* saved "readpos_file" */
long markpos_offset; /* saved "readpos_offset" */
};
#define COPYTUP(state,tup) ((*(state)->copytup) (state, tup))
@ -128,8 +149,8 @@ struct Tuplestorestate
* NOTES about on-tape representation of tuples:
*
* We require the first "unsigned int" of a stored tuple to be the total size
* on-tape of the tuple, including itself (so it is never zero; an all-zero
* unsigned int is used to delimit runs). The remainder of the stored tuple
* on-tape of the tuple, including itself (so it is never zero).
* The remainder of the stored tuple
* may or may not match the in-memory representation of the tuple ---
* any conversion needed is the job of the writetup and readtup routines.
*
@ -172,7 +193,6 @@ static Tuplestorestate *tuplestore_begin_common(bool randomAccess,
int maxKBytes);
static void dumptuples(Tuplestorestate *state);
static unsigned int getlen(Tuplestorestate *state, bool eofOK);
static void markrunend(Tuplestorestate *state);
static void *copytup_heap(Tuplestorestate *state, void *tup);
static void writetup_heap(Tuplestorestate *state, void *tup);
static void *readtup_heap(Tuplestorestate *state, unsigned int len);
@ -182,14 +202,6 @@ static void *readtup_heap(Tuplestorestate *state, unsigned int len);
* tuplestore_begin_xxx
*
* Initialize for a tuple store operation.
*
* After calling tuplestore_begin, the caller should call tuplestore_puttuple
* zero or more times, then call tuplestore_donestoring when all the tuples
* have been supplied. After donestoring, retrieve the tuples in order
* by calling tuplestore_gettuple until it returns NULL. (If random
* access was requested, rescan, markpos, and restorepos can also be called.)
* Call tuplestore_end to terminate the operation and release memory/disk
* space.
*/
static Tuplestorestate *
@ -199,7 +211,7 @@ tuplestore_begin_common(bool randomAccess, int maxKBytes)
state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));
state->status = TSS_INITIAL;
state->status = TSS_INMEM;
state->randomAccess = randomAccess;
state->availMem = maxKBytes * 1024L;
state->myfile = NULL;
@ -213,6 +225,9 @@ tuplestore_begin_common(bool randomAccess, int maxKBytes)
USEMEM(state, GetMemoryChunkSpace(state->memtuples));
state->eof_reached = false;
state->current = 0;
return state;
}
@ -249,9 +264,24 @@ tuplestore_end(Tuplestorestate *state)
}
/*
* Accept one tuple while collecting input data.
* tuplestore_ateof
*
* Returns the current eof_reached state.
*/
bool
tuplestore_ateof(Tuplestorestate *state)
{
	/*
	 * The struct marks eof_reached as "always valid", so no check of the
	 * current status is needed before reporting it.
	 */
	bool		hit_eof = state->eof_reached;

	return hit_eof;
}
/*
* Accept one tuple and append it to the tuplestore.
*
* Note that the input tuple is always copied; the caller need not save it.
*
* If the read status is currently "AT EOF" then it remains so (the read
* pointer advances along with the write pointer); otherwise the read
* pointer is unchanged. This is for the convenience of nodeMaterial.c.
*/
void
tuplestore_puttuple(Tuplestorestate *state, void *tuple)
@ -263,14 +293,10 @@ tuplestore_puttuple(Tuplestorestate *state, void *tuple)
switch (state->status)
{
case TSS_INITIAL:
/*
* Stash the tuple in the in-memory array.
*/
case TSS_INMEM:
/* Grow the array as needed */
if (state->memtupcount >= state->memtupsize)
{
/* Grow the array as needed. */
FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
state->memtupsize *= 2;
state->memtuples = (void **)
@ -278,8 +304,14 @@ tuplestore_puttuple(Tuplestorestate *state, void *tuple)
state->memtupsize * sizeof(void *));
USEMEM(state, GetMemoryChunkSpace(state->memtuples));
}
/* Stash the tuple in the in-memory array */
state->memtuples[state->memtupcount++] = tuple;
/* If eof_reached, keep read position in sync */
if (state->eof_reached)
state->current = state->memtupcount;
/*
* Done if we still fit in available memory.
*/
@ -296,56 +328,26 @@ tuplestore_puttuple(Tuplestorestate *state, void *tuple)
case TSS_WRITEFILE:
WRITETUP(state, tuple);
break;
case TSS_READFILE:
/*
* Switch from reading to writing.
*/
if (!state->eof_reached)
BufFileTell(state->myfile,
&state->readpos_file, &state->readpos_offset);
if (BufFileSeek(state->myfile,
state->writepos_file, state->writepos_offset,
SEEK_SET) != 0)
elog(ERROR, "tuplestore_puttuple: seek(EOF) failed");
state->status = TSS_WRITEFILE;
WRITETUP(state, tuple);
break;
default:
elog(ERROR, "tuplestore_puttuple: invalid state");
break;
}
}
/*
* All tuples have been provided; finish writing.
*/
void
tuplestore_donestoring(Tuplestorestate *state)
{
switch (state->status)
{
case TSS_INITIAL:
/*
* We were able to accumulate all the tuples within the
* allowed amount of memory. Just set up to scan them.
*/
state->current = 0;
state->eof_reached = false;
state->markpos_offset = 0L;
state->markpos_eof = false;
state->status = TSS_READMEM;
break;
case TSS_WRITEFILE:
/*
* Write the EOF marker.
*/
markrunend(state);
/*
* Set up for reading from tape.
*/
if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
elog(ERROR, "tuplestore_donestoring: seek(0) failed");
state->eof_reached = false;
state->markpos_file = 0;
state->markpos_offset = 0L;
state->markpos_eof = false;
state->status = TSS_READFILE;
break;
default:
elog(ERROR, "tuplestore_donestoring: invalid state");
break;
}
}
/*
* Fetch the next tuple in either forward or back direction.
* Returns NULL if no more tuples. If should_free is set, the
@ -358,10 +360,11 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward,
unsigned int tuplen;
void *tup;
Assert(forward || state->randomAccess);
switch (state->status)
{
case TSS_READMEM:
Assert(forward || state->randomAccess);
case TSS_INMEM:
*should_free = false;
if (forward)
{
@ -391,13 +394,27 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward,
}
break;
case TSS_WRITEFILE:
/* Skip state change if we'll just return NULL */
if (state->eof_reached && forward)
return NULL;
/*
* Switch from writing to reading.
*/
BufFileTell(state->myfile,
&state->writepos_file, &state->writepos_offset);
if (!state->eof_reached)
if (BufFileSeek(state->myfile,
state->readpos_file, state->readpos_offset,
SEEK_SET) != 0)
elog(ERROR, "tuplestore_gettuple: seek() failed");
state->status = TSS_READFILE;
/* FALL THRU into READFILE case */
case TSS_READFILE:
Assert(forward || state->randomAccess);
*should_free = true;
if (forward)
{
if (state->eof_reached)
return NULL;
if ((tuplen = getlen(state, true)) != 0)
{
tup = READTUP(state, tuplen);
@ -415,34 +432,23 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward,
*
* If all tuples have already been fetched, we return the last tuple;
* otherwise we return the tuple before the one last returned.
*
* Back up to fetch previously-returned tuple's ending
* length word. If seek fails, assume we are at start of
* file.
*/
if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
SEEK_CUR) != 0)
return NULL;
tuplen = getlen(state, false);
if (state->eof_reached)
{
/*
* Seek position is pointing just past the zero tuplen at
* the end of file; back up to fetch last tuple's ending
* length word. If seek fails we must have a completely
* empty file.
*/
if (BufFileSeek(state->myfile, 0,
-(long) (2 * sizeof(unsigned int)),
SEEK_CUR) != 0)
return NULL;
state->eof_reached = false;
/* We will return the tuple returned before returning NULL */
}
else
{
/*
* Back up and fetch previously-returned tuple's ending
* length word. If seek fails, assume we are at start of
* file.
*/
if (BufFileSeek(state->myfile, 0,
-(long) sizeof(unsigned int),
SEEK_CUR) != 0)
return NULL;
tuplen = getlen(state, false);
/*
* Back up to get ending length word of tuple before it.
*/
@ -462,10 +468,9 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward,
elog(ERROR, "tuplestore_gettuple: bogus tuple len in backward scan");
return NULL;
}
tuplen = getlen(state, false);
}
tuplen = getlen(state, false);
/*
* Now we have the length of the prior tuple, back up and read
* it. Note: READTUP expects we are positioned after the
@ -486,14 +491,28 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward,
/*
* dumptuples - remove tuples from memory and write to tape
*
* As a side effect, we must set readpos and markpos to the file positions
* corresponding to "current" and "markpos_current" respectively; otherwise,
* a dump would lose the current read position and the marked position.
*/
static void
dumptuples(Tuplestorestate *state)
{
int i;
for (i = 0; i < state->memtupcount; i++)
for (i = 0; ; i++)
{
if (i == state->current)
BufFileTell(state->myfile,
&state->readpos_file, &state->readpos_offset);
if (i == state->markpos_current)
BufFileTell(state->myfile,
&state->markpos_file, &state->markpos_offset);
if (i >= state->memtupcount)
break;
WRITETUP(state, state->memtuples[i]);
}
state->memtupcount = 0;
}
@ -503,23 +522,21 @@ dumptuples(Tuplestorestate *state)
void
tuplestore_rescan(Tuplestorestate *state)
{
Assert(state->randomAccess);
switch (state->status)
{
case TSS_READMEM:
state->current = 0;
case TSS_INMEM:
state->eof_reached = false;
state->markpos_offset = 0L;
state->markpos_eof = false;
state->current = 0;
break;
case TSS_WRITEFILE:
state->eof_reached = false;
state->readpos_file = 0;
state->readpos_offset = 0L;
break;
case TSS_READFILE:
state->eof_reached = false;
if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
elog(ERROR, "tuplestore_rescan: seek(0) failed");
state->eof_reached = false;
state->markpos_file = 0;
state->markpos_offset = 0L;
state->markpos_eof = false;
break;
default:
elog(ERROR, "tuplestore_rescan: invalid state");
@ -533,19 +550,29 @@ tuplestore_rescan(Tuplestorestate *state)
void
tuplestore_markpos(Tuplestorestate *state)
{
Assert(state->randomAccess);
switch (state->status)
{
case TSS_READMEM:
state->markpos_offset = state->current;
state->markpos_eof = state->eof_reached;
case TSS_INMEM:
state->markpos_current = state->current;
break;
case TSS_WRITEFILE:
if (state->eof_reached)
{
/* Need to record the implicit read position */
BufFileTell(state->myfile,
&state->markpos_file,
&state->markpos_offset);
}
else
{
state->markpos_file = state->readpos_file;
state->markpos_offset = state->readpos_offset;
}
break;
case TSS_READFILE:
BufFileTell(state->myfile,
&state->markpos_file,
&state->markpos_offset);
state->markpos_eof = state->eof_reached;
break;
default:
elog(ERROR, "tuplestore_markpos: invalid state");
@ -560,21 +587,24 @@ tuplestore_markpos(Tuplestorestate *state)
void
tuplestore_restorepos(Tuplestorestate *state)
{
Assert(state->randomAccess);
switch (state->status)
{
case TSS_READMEM:
state->current = (int) state->markpos_offset;
state->eof_reached = state->markpos_eof;
case TSS_INMEM:
state->eof_reached = false;
state->current = state->markpos_current;
break;
case TSS_WRITEFILE:
state->eof_reached = false;
state->readpos_file = state->markpos_file;
state->readpos_offset = state->markpos_offset;
break;
case TSS_READFILE:
state->eof_reached = false;
if (BufFileSeek(state->myfile,
state->markpos_file,
state->markpos_offset,
SEEK_SET) != 0)
elog(ERROR, "tuplestore_restorepos failed");
state->eof_reached = state->markpos_eof;
break;
default:
elog(ERROR, "tuplestore_restorepos: invalid state");
@ -591,21 +621,16 @@ static unsigned int
getlen(Tuplestorestate *state, bool eofOK)
{
unsigned int len;
size_t nbytes;
if (BufFileRead(state->myfile, (void *) &len, sizeof(len)) != sizeof(len))
nbytes = BufFileRead(state->myfile, (void *) &len, sizeof(len));
if (nbytes == sizeof(len))
return len;
if (nbytes != 0)
elog(ERROR, "tuplestore: unexpected end of tape");
if (len == 0 && !eofOK)
if (!eofOK)
elog(ERROR, "tuplestore: unexpected end of data");
return len;
}
static void
markrunend(Tuplestorestate *state)
{
unsigned int len = 0;
if (BufFileWrite(state->myfile, (void *) &len, sizeof(len)) != sizeof(len))
elog(ERROR, "tuplestore: write failed");
return 0;
}