mirror of https://github.com/postgres/postgres.git
Teach tuplestore.c to throw away data before the "mark" point when the caller is using mark/restore but not rewind or backward-scan capability. Insert a materialize plan node between a mergejoin and its inner child if the inner child is a sort that is expected to spill to disk. The materialize shields the sort from the need to do mark/restore and thereby allows it to perform its final merge pass on-the-fly; the materialize itself is normally cheap, since it won't spill to disk unless the number of tuples with equal key values exceeds work_mem.

Greg Stark, with some kibitzing from Tom Lane.
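For context, the tuplestore_set_eflags() function added in the diff below is what lets a caller narrow a tuplestore's capabilities. The following is a minimal caller-side sketch, not code from this commit: it assumes the EXEC_FLAG_* bits from executor.h and the usual work_mem setting (in kilobytes), and omits the actual executor-node wiring.

    Tuplestorestate *ts;

    /* Default flags; randomAccess = true means BACKWARD | REWIND | MARK. */
    ts = tuplestore_begin_heap(true, false, work_mem);

    /* Narrow to mark/restore only: no rewind, no backward scan. */
    tuplestore_set_eflags(ts, EXEC_FLAG_MARK);

    /* ... insert and read tuples as usual ... */

    /*
     * With neither EXEC_FLAG_BACKWARD nor EXEC_FLAG_REWIND set, each call
     * to tuplestore_markpos() may now trim everything before the new mark.
     */
    tuplestore_markpos(ts);

    tuplestore_end(ts);

A caller that sets flags this way trades rewind and backward fetch for bounded memory use: only the tuples at or after the most recent mark are kept.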
@@ -20,10 +20,12 @@
  * maxKBytes, we dump all the tuples into a temp file and then read from that
  * when needed.
  *
- * When the caller requests random access to the data, we write the temp file
+ * When the caller requests backward-scan capability, we write the temp file
  * in a format that allows either forward or backward scan. Otherwise, only
- * forward scan is allowed. But rewind and markpos/restorepos are allowed
- * in any case.
+ * forward scan is allowed. Rewind and markpos/restorepos are normally allowed
+ * but can be turned off via tuplestore_set_eflags; turning off both backward
+ * scan and rewind enables truncation of the tuplestore at the mark point
+ * (if any) for minimal memory usage.
  *
  * Because we allow reading before writing is complete, there are two
  * interesting positions in the temp file: the current read position and
@@ -36,7 +38,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *    $PostgreSQL: pgsql/src/backend/utils/sort/tuplestore.c,v 1.30 2007/01/05 22:19:47 momjian Exp $
+ *    $PostgreSQL: pgsql/src/backend/utils/sort/tuplestore.c,v 1.31 2007/05/21 17:57:34 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -44,6 +46,7 @@
 #include "postgres.h"

 #include "access/heapam.h"
+#include "executor/executor.h"
 #include "storage/buffile.h"
 #include "utils/memutils.h"
 #include "utils/tuplestore.h"
@@ -66,7 +69,7 @@ typedef enum
 struct Tuplestorestate
 {
     TupStoreStatus status;      /* enumerated value as shown above */
-    bool        randomAccess;   /* did caller request random access? */
+    int         eflags;         /* capability flags */
     bool        interXact;      /* keep open through transactions? */
     long        availMem;       /* remaining memory available, in bytes */
     BufFile    *myfile;         /* underlying file, or NULL if none */
@@ -157,11 +160,11 @@ struct Tuplestorestate
  * may or may not match the in-memory representation of the tuple ---
  * any conversion needed is the job of the writetup and readtup routines.
  *
- * If state->randomAccess is true, then the stored representation of the
- * tuple must be followed by another "unsigned int" that is a copy of the
+ * If state->eflags & EXEC_FLAG_BACKWARD, then the stored representation of
+ * the tuple must be followed by another "unsigned int" that is a copy of the
  * length --- so the total tape space used is actually sizeof(unsigned int)
  * more than the stored length value. This allows read-backwards. When
- * randomAccess is not true, the write/read routines may omit the extra
+ * EXEC_FLAG_BACKWARD is not set, the write/read routines may omit the extra
  * length word.
  *
  * writetup is expected to write both length words as well as the tuple
@@ -192,11 +195,12 @@ struct Tuplestorestate
  */


-static Tuplestorestate *tuplestore_begin_common(bool randomAccess,
+static Tuplestorestate *tuplestore_begin_common(int eflags,
                         bool interXact,
                         int maxKBytes);
 static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
 static void dumptuples(Tuplestorestate *state);
+static void tuplestore_trim(Tuplestorestate *state, int ntuples);
 static unsigned int getlen(Tuplestorestate *state, bool eofOK);
 static void *copytup_heap(Tuplestorestate *state, void *tup);
 static void writetup_heap(Tuplestorestate *state, void *tup);
@@ -209,14 +213,14 @@ static void *readtup_heap(Tuplestorestate *state, unsigned int len);
  *    Initialize for a tuple store operation.
  */
 static Tuplestorestate *
-tuplestore_begin_common(bool randomAccess, bool interXact, int maxKBytes)
+tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
 {
     Tuplestorestate *state;

     state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));

     state->status = TSS_INMEM;
-    state->randomAccess = randomAccess;
+    state->eflags = eflags;
     state->interXact = interXact;
     state->availMem = maxKBytes * 1024L;
     state->myfile = NULL;
@@ -255,9 +259,18 @@ tuplestore_begin_common(bool randomAccess, bool interXact, int maxKBytes)
 Tuplestorestate *
 tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
 {
-    Tuplestorestate *state = tuplestore_begin_common(randomAccess,
-                                                     interXact,
-                                                     maxKBytes);
+    Tuplestorestate *state;
+    int         eflags;
+
+    /*
+     * This interpretation of the meaning of randomAccess is compatible
+     * with the pre-8.3 behavior of tuplestores.
+     */
+    eflags = randomAccess ?
+        (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND | EXEC_FLAG_MARK) :
+        (EXEC_FLAG_REWIND | EXEC_FLAG_MARK);
+
+    state = tuplestore_begin_common(eflags, interXact, maxKBytes);

     state->copytup = copytup_heap;
     state->writetup = writetup_heap;
@@ -266,6 +279,30 @@ tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
     return state;
 }

+/*
+ * tuplestore_set_eflags
+ *
+ * Set capability flags at a finer grain than is allowed by
+ * tuplestore_begin_xxx. This must be called before inserting any data
+ * into the tuplestore.
+ *
+ * eflags is a bitmask following the meanings used for executor node
+ * startup flags (see executor.h). tuplestore pays attention to these bits:
+ *     EXEC_FLAG_REWIND       need rewind to start
+ *     EXEC_FLAG_BACKWARD     need backward fetch
+ *     EXEC_FLAG_MARK         need mark/restore
+ * If tuplestore_set_eflags is not called, REWIND and MARK are allowed,
+ * and BACKWARD is set per "randomAccess" in the tuplestore_begin_xxx call.
+ */
+void
+tuplestore_set_eflags(Tuplestorestate *state, int eflags)
+{
+    Assert(state->status == TSS_INMEM);
+    Assert(state->memtupcount == 0);
+
+    state->eflags = eflags;
+}
+
 /*
  * tuplestore_end
  *
@@ -420,6 +457,9 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
  * Fetch the next tuple in either forward or back direction.
  * Returns NULL if no more tuples. If should_free is set, the
  * caller must pfree the returned tuple when done with it.
+ *
+ * Backward scan is only allowed if randomAccess was set true or
+ * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags().
  */
 static void *
 tuplestore_gettuple(Tuplestorestate *state, bool forward,
@@ -428,7 +468,7 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward,
     unsigned int tuplen;
     void       *tup;

-    Assert(forward || state->randomAccess);
+    Assert(forward || (state->eflags & EXEC_FLAG_BACKWARD));

     switch (state->status)
     {
@@ -643,6 +683,8 @@ dumptuples(Tuplestorestate *state)
 void
 tuplestore_rescan(Tuplestorestate *state)
 {
+    Assert(state->eflags & EXEC_FLAG_REWIND);
+
     switch (state->status)
     {
         case TSS_INMEM:
@@ -671,10 +713,26 @@ tuplestore_rescan(Tuplestorestate *state)
 void
 tuplestore_markpos(Tuplestorestate *state)
 {
+    Assert(state->eflags & EXEC_FLAG_MARK);
+
     switch (state->status)
     {
         case TSS_INMEM:
             state->markpos_current = state->current;
+            /*
+             * We can truncate the tuplestore if neither backward scan nor
+             * rewind capability are required by the caller. There will
+             * never be a need to back up past the mark point.
+             *
+             * Note: you might think we could remove all the tuples before
+             * "current", since that one is the next to be returned. However,
+             * since tuplestore_gettuple returns a direct pointer to our
+             * internal copy of the tuple, it's likely that the caller has
+             * still got the tuple just before "current" referenced in a slot.
+             * Don't free it yet.
+             */
+            if (!(state->eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND)))
+                tuplestore_trim(state, 1);
             break;
         case TSS_WRITEFILE:
             if (state->eof_reached)
@@ -708,6 +766,8 @@ tuplestore_markpos(Tuplestorestate *state)
 void
 tuplestore_restorepos(Tuplestorestate *state)
 {
+    Assert(state->eflags & EXEC_FLAG_MARK);
+
     switch (state->status)
     {
         case TSS_INMEM:
@@ -733,6 +793,55 @@ tuplestore_restorepos(Tuplestorestate *state)
     }
 }

+/*
+ * tuplestore_trim - remove all but ntuples tuples before current
+ */
+static void
+tuplestore_trim(Tuplestorestate *state, int ntuples)
+{
+    int         nremove;
+    int         i;
+
+    /*
+     * We don't bother trimming temp files since it usually would mean more
+     * work than just letting them sit in kernel buffers until they age out.
+     */
+    if (state->status != TSS_INMEM)
+        return;
+
+    nremove = state->current - ntuples;
+    if (nremove <= 0)
+        return;                 /* nothing to do */
+    Assert(nremove <= state->memtupcount);
+
+    /* Release no-longer-needed tuples */
+    for (i = 0; i < nremove; i++)
+    {
+        FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
+        pfree(state->memtuples[i]);
+    }
+
+    /*
+     * Slide the array down and readjust pointers. This may look pretty
+     * stupid, but we expect that there will usually not be very many
+     * tuple-pointers to move, so this isn't that expensive; and it keeps
+     * a lot of other logic simple.
+     *
+     * In fact, in the current usage for merge joins, it's demonstrable that
+     * there will always be exactly one non-removed tuple; so optimize that
+     * case.
+     */
+    if (nremove + 1 == state->memtupcount)
+        state->memtuples[0] = state->memtuples[nremove];
+    else
+        memmove(state->memtuples, state->memtuples + nremove,
+                (state->memtupcount - nremove) * sizeof(void *));
+
+    state->memtupcount -= nremove;
+    state->current -= nremove;
+    state->markpos_current -= nremove;
+}
+

 /*
  * Tape interface routines
@@ -783,7 +892,7 @@ writetup_heap(Tuplestorestate *state, void *tup)

     if (BufFileWrite(state->myfile, (void *) tuple, tuplen) != (size_t) tuplen)
         elog(ERROR, "write failed");
-    if (state->randomAccess)    /* need trailing length word? */
+    if (state->eflags & EXEC_FLAG_BACKWARD)     /* need trailing length word? */
         if (BufFileWrite(state->myfile, (void *) &tuplen,
                          sizeof(tuplen)) != sizeof(tuplen))
             elog(ERROR, "write failed");
@@ -804,7 +913,7 @@ readtup_heap(Tuplestorestate *state, unsigned int len)
     if (BufFileRead(state->myfile, (void *) ((char *) tuple + sizeof(int)),
                     len - sizeof(int)) != (size_t) (len - sizeof(int)))
         elog(ERROR, "unexpected end of data");
-    if (state->randomAccess)    /* need trailing length word? */
+    if (state->eflags & EXEC_FLAG_BACKWARD)     /* need trailing length word? */
        if (BufFileRead(state->myfile, (void *) &tuplen,
                        sizeof(tuplen)) != sizeof(tuplen))
            elog(ERROR, "unexpected end of data");
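For reference, the temp-file record layout that writetup_heap and readtup_heap manage (an informal sketch, not quoted from the source) is a leading length word, the tuple body, and, only when backward scans must be supported, a trailing copy of the length word that lets a backward read locate the start of the previous tuple:

    [unsigned int len][tuple body ...][unsigned int len]    EXEC_FLAG_BACKWARD set
    [unsigned int len][tuple body ...]                       EXEC_FLAG_BACKWARD not set

Before this change the trailing word was written whenever randomAccess was true; it is now tied specifically to EXEC_FLAG_BACKWARD, so callers that request only mark/restore or rewind save one length word per spilled tuple.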