1
0
mirror of https://github.com/postgres/postgres.git synced 2025-09-09 13:09:39 +03:00

Improve tuplesort.c to support variable merge order. The original coding

with fixed merge order (fixed number of "tapes") was based on obsolete
assumptions, namely that tape drives are expensive.  Since our "tapes"
are really just a couple of buffers, we can have a lot of them given
adequate workspace.  This allows reduction of the number of merge passes
with consequent savings of I/O during large sorts.

Simon Riggs with some rework by Tom Lane
This commit is contained in:
Tom Lane
2006-02-19 05:54:06 +00:00
parent 85c0eac1af
commit df700e6b40
3 changed files with 169 additions and 63 deletions

View File

@@ -48,7 +48,7 @@
* each source run; we repeatedly output the smallest tuple and insert the
* next tuple from its source tape (if any). When the heap empties, the merge
* is complete. The basic merge algorithm thus needs very little memory ---
* only M tuples for an M-way merge, and M is at most six in the present code.
* only M tuples for an M-way merge, and M is constrained to a small number.
* However, we can still make good use of our full workMem allocation by
* pre-reading additional tuples from each source tape. Without prereading,
* our access pattern to the temporary file would be very erratic; on average
@@ -73,12 +73,25 @@
* on-the-fly as the caller repeatedly calls tuplesort_gettuple; this
* saves one cycle of writing all the data out to disk and reading it in.
*
* Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
* grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
* to Knuth's figure 70 (section 5.4.2). However, Knuth is assuming that
* tape drives are expensive beasts, and in particular that there will always
* be many more runs than tape drives. In our implementation a "tape drive"
* doesn't cost much more than a few Kb of memory buffers, so we can afford
* to have lots of them. In particular, if we can have as many tape drives
* as sorted runs, we can eliminate any repeated I/O at all. In the current
* code we determine the number of tapes M on the basis of workMem: we want
* workMem/M to be large enough that we read a fair amount of data each time
* we preread from a tape, so as to maintain the locality of access described
* above. Nonetheless, with large workMem we can have many tapes.
*
*
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.57 2006/01/05 01:56:29 momjian Exp $
* $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.58 2006/02/19 05:54:06 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -120,11 +133,18 @@ typedef enum
} TupSortStatus;
/*
* We use a seven-tape polyphase merge, which is the "sweet spot" on the
* tapes-to-passes curve according to Knuth's figure 70 (section 5.4.2).
* Parameters for calculation of number of tapes to use --- see inittapes().
*
* In this calculation we assume that each tape will cost us about 3 blocks
* worth of buffer space (which is an underestimate for very large data
* volumes, but it's probably close enough --- see logtape.c).
*
* MERGE_BUFFER_SIZE is how much data we'd like to read from each
* tape during a preread cycle (see discussion at top of file).
*/
#define MAXTAPES 7 /* Knuth's T */
#define TAPERANGE (MAXTAPES-1) /* Knuth's P */
#define MINTAPES 7 /* minimum number of tapes */
#define TAPE_BUFFER_OVERHEAD (BLCKSZ * 3)
#define MERGE_BUFFER_SIZE (BLCKSZ * 32)
/*
* Private state of a Tuplesort operation.
@@ -135,6 +155,8 @@ struct Tuplesortstate
bool randomAccess; /* did caller request random access? */
long availMem; /* remaining memory available, in bytes */
long allowedMem; /* total memory allowed, in bytes */
int maxTapes; /* number of tapes (Knuth's T) */
int tapeRange; /* maxTapes-1 (Knuth's P) */
LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
/*
@@ -179,7 +201,7 @@ struct Tuplesortstate
* SORTEDINMEM, the tuples are in final sorted order; in states BUILDRUNS
* and FINALMERGE, the tuples are organized in "heap" order per Algorithm
* H. (Note that memtupcount only counts the tuples that are part of the
* heap --- during merge passes, memtuples[] entries beyond TAPERANGE are
* heap --- during merge passes, memtuples[] entries beyond tapeRange are
* never in the heap and are used to hold pre-read tuples.) In state
* SORTEDONTAPE, the array is not used.
*/
@@ -204,6 +226,11 @@ struct Tuplesortstate
*/
int currentRun;
/*
* Unless otherwise noted, all pointer variables below are pointers
* to arrays of length maxTapes, holding per-tape data.
*/
/*
* These variables are only used during merge passes. mergeactive[i] is
* true if we are reading an input run from (actual) tape number i and
@@ -218,11 +245,10 @@ struct Tuplesortstate
* in these lists, because memtuples[0] is part of the merge heap and is
* never a pre-read tuple.
*/
bool mergeactive[MAXTAPES]; /* Active input run source? */
int mergenext[MAXTAPES]; /* first preread tuple for each source */
int mergelast[MAXTAPES]; /* last preread tuple for each source */
long mergeavailmem[MAXTAPES]; /* availMem for prereading
* tapes */
bool *mergeactive; /* Active input run source? */
int *mergenext; /* first preread tuple for each source */
int *mergelast; /* last preread tuple for each source */
long *mergeavailmem; /* availMem for prereading tapes */
long spacePerTape; /* actual per-tape target usage */
int mergefreelist; /* head of freelist of recycled slots */
int mergefirstfree; /* first slot never used in this merge */
@@ -234,10 +260,10 @@ struct Tuplesortstate
*/
int Level; /* Knuth's l */
int destTape; /* current output tape (Knuth's j, less 1) */
int tp_fib[MAXTAPES]; /* Target Fibonacci run counts (A[]) */
int tp_runs[MAXTAPES]; /* # of real runs on each tape */
int tp_dummy[MAXTAPES]; /* # of dummy runs for each tape (D[]) */
int tp_tapenum[MAXTAPES]; /* Actual tape numbers (TAPE[]) */
int *tp_fib; /* Target Fibonacci run counts (A[]) */
int *tp_runs; /* # of real runs on each tape */
int *tp_dummy; /* # of dummy runs for each tape (D[]) */
int *tp_tapenum; /* Actual tape numbers (TAPE[]) */
/*
* These variables are used after completion of sorting to keep track of
@@ -259,8 +285,8 @@ struct Tuplesortstate
*/
TupleDesc tupDesc;
int nKeys;
ScanKey scanKeys;
SortFunctionKind *sortFnKinds;
ScanKey scanKeys; /* array of length nKeys */
SortFunctionKind *sortFnKinds; /* array of length nKeys */
/*
* These variables are specific to the IndexTuple case; they are set by
@@ -448,7 +474,10 @@ tuplesort_begin_common(int workMem, bool randomAccess)
state->currentRun = 0;
/* Algorithm D variables will be initialized by inittapes, if needed */
/*
* maxTapes, tapeRange, and Algorithm D variables will be initialized by
* inittapes(), if needed
*/
state->result_tape = -1; /* flag that result tape has not been formed */
@@ -1041,6 +1070,29 @@ tuplesort_getdatum(Tuplesortstate *state, bool forward,
return true;
}
/*
* tuplesort_merge_order - report merge order we'll use for given memory
*
* This is exported for use by the planner. allowedMem is in bytes.
*
* This must match the calculation in inittapes. The only reason we
* don't fold the code together is that inittapes wants to know if the
* MINTAPES limitation applies or not.
*/
int
tuplesort_merge_order(long allowedMem)
{
int maxTapes;
/* see inittapes for comments */
maxTapes = (int) ((allowedMem - TAPE_BUFFER_OVERHEAD) /
(MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD)) + 1;
maxTapes = Max(maxTapes, MINTAPES);
/* The merge order is one less than the number of tapes */
return maxTapes - 1;
}
/*
* inittapes - initialize for tape sorting.
@@ -1050,16 +1102,64 @@ tuplesort_getdatum(Tuplesortstate *state, bool forward,
static void
inittapes(Tuplesortstate *state)
{
int ntuples,
int maxTapes,
ntuples,
j;
/*
* Determine the number of tapes to use based on allowed memory.
*
* We need T+1 tapes to do a T-way merge, and we want MERGE_BUFFER_SIZE
* tuple workspace for each input tape of the merge. The output tape
* doesn't account for tuple workspace but it does need tape buffer space.
*
* Keep this code in sync with tuplesort_merge_order!
*/
maxTapes = (int) ((state->allowedMem - TAPE_BUFFER_OVERHEAD) /
(MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD)) + 1;
/*
* We will use at least MINTAPES regardless, but otherwise we decrease
* availMem to reflect the space that goes into buffers.
*/
if (maxTapes >= MINTAPES)
{
/* maxTapes is OK, adjust availMem */
USEMEM(state, maxTapes * TAPE_BUFFER_OVERHEAD);
}
else
{
/*
* Force minimum tape count. In this path we ignore the tape buffers
* in our space calculation, to avoid driving availMem permanently
* negative if allowedMem is really tiny. (This matches the pre-8.2
* behavior which was to ignore the tape buffers always, on the
* grounds that they were fixed-size overhead.)
*/
maxTapes = MINTAPES;
}
state->maxTapes = maxTapes;
state->tapeRange = maxTapes - 1;
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG, "switching to external sort: %s",
pg_rusage_show(&state->ru_start));
elog(LOG, "switching to external sort with %d tapes: %s",
maxTapes, pg_rusage_show(&state->ru_start));
#endif
state->tapeset = LogicalTapeSetCreate(MAXTAPES);
/*
* Create the tape set and allocate the per-tape data arrays.
*/
state->tapeset = LogicalTapeSetCreate(maxTapes);
state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
state->mergeavailmem = (long *) palloc0(maxTapes * sizeof(long));
state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
/*
* Allocate the memtupindex array, same size as memtuples.
@@ -1087,15 +1187,15 @@ inittapes(Tuplesortstate *state)
/*
* Initialize variables of Algorithm D (step D1).
*/
for (j = 0; j < MAXTAPES; j++)
for (j = 0; j < maxTapes; j++)
{
state->tp_fib[j] = 1;
state->tp_runs[j] = 0;
state->tp_dummy[j] = 1;
state->tp_tapenum[j] = j;
}
state->tp_fib[TAPERANGE] = 0;
state->tp_dummy[TAPERANGE] = 0;
state->tp_fib[state->tapeRange] = 0;
state->tp_dummy[state->tapeRange] = 0;
state->Level = 1;
state->destTape = 0;
@@ -1130,7 +1230,7 @@ selectnewtape(Tuplesortstate *state)
/* Step D4: increase level */
state->Level++;
a = state->tp_fib[0];
for (j = 0; j < TAPERANGE; j++)
for (j = 0; j < state->tapeRange; j++)
{
state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
state->tp_fib[j] = a + state->tp_fib[j + 1];
@@ -1170,18 +1270,19 @@ mergeruns(Tuplesortstate *state)
}
/* End of step D2: rewind all output tapes to prepare for merging */
for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
LogicalTapeRewind(state->tapeset, tapenum, false);
for (;;)
{
/* Step D5: merge runs onto tape[T] until tape[P] is empty */
while (state->tp_runs[TAPERANGE - 1] || state->tp_dummy[TAPERANGE - 1])
while (state->tp_runs[state->tapeRange - 1] ||
state->tp_dummy[state->tapeRange - 1])
{
bool allDummy = true;
bool allOneRun = true;
for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
{
if (state->tp_dummy[tapenum] == 0)
allDummy = false;
@@ -1203,8 +1304,8 @@ mergeruns(Tuplesortstate *state)
}
if (allDummy)
{
state->tp_dummy[TAPERANGE]++;
for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
state->tp_dummy[state->tapeRange]++;
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
state->tp_dummy[tapenum]--;
}
else
@@ -1214,20 +1315,20 @@ mergeruns(Tuplesortstate *state)
if (--state->Level == 0)
break;
/* rewind output tape T to use as new input */
LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE],
LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange],
false);
/* rewind used-up input tape P, and prepare it for write pass */
LogicalTapeRewind(state->tapeset, state->tp_tapenum[TAPERANGE - 1],
LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange - 1],
true);
state->tp_runs[TAPERANGE - 1] = 0;
state->tp_runs[state->tapeRange - 1] = 0;
/*
* reassign tape units per step D6; note we no longer care about A[]
*/
svTape = state->tp_tapenum[TAPERANGE];
svDummy = state->tp_dummy[TAPERANGE];
svRuns = state->tp_runs[TAPERANGE];
for (tapenum = TAPERANGE; tapenum > 0; tapenum--)
svTape = state->tp_tapenum[state->tapeRange];
svDummy = state->tp_dummy[state->tapeRange];
svRuns = state->tp_runs[state->tapeRange];
for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
{
state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
@@ -1246,7 +1347,7 @@ mergeruns(Tuplesortstate *state)
* output tape while rewinding it. The last iteration of step D6 would be
* a waste of cycles anyway...
*/
state->result_tape = state->tp_tapenum[TAPERANGE];
state->result_tape = state->tp_tapenum[state->tapeRange];
LogicalTapeFreeze(state->tapeset, state->result_tape);
state->status = TSS_SORTEDONTAPE;
}
@@ -1260,7 +1361,7 @@ mergeruns(Tuplesortstate *state)
static void
mergeonerun(Tuplesortstate *state)
{
int destTape = state->tp_tapenum[TAPERANGE];
int destTape = state->tp_tapenum[state->tapeRange];
int srcTape;
int tupIndex;
void *tup;
@@ -1313,7 +1414,7 @@ mergeonerun(Tuplesortstate *state)
* output tape, and increment its count of real runs.
*/
markrunend(state, destTape);
state->tp_runs[TAPERANGE]++;
state->tp_runs[state->tapeRange]++;
#ifdef TRACE_SORT
if (trace_sort)
@@ -1341,16 +1442,16 @@ beginmerge(Tuplesortstate *state)
Assert(state->memtupcount == 0);
/* Clear merge-pass state variables */
memset(state->mergeactive, 0, sizeof(state->mergeactive));
memset(state->mergenext, 0, sizeof(state->mergenext));
memset(state->mergelast, 0, sizeof(state->mergelast));
memset(state->mergeavailmem, 0, sizeof(state->mergeavailmem));
memset(state->mergeactive, 0, state->maxTapes * sizeof(*state->mergeactive));
memset(state->mergenext, 0, state->maxTapes * sizeof(*state->mergenext));
memset(state->mergelast, 0, state->maxTapes * sizeof(*state->mergelast));
memset(state->mergeavailmem, 0, state->maxTapes * sizeof(*state->mergeavailmem));
state->mergefreelist = 0; /* nothing in the freelist */
state->mergefirstfree = MAXTAPES; /* first slot available for preread */
state->mergefirstfree = state->maxTapes; /* 1st slot avail for preread */
/* Adjust run counts and mark the active tapes */
activeTapes = 0;
for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
{
if (state->tp_dummy[tapenum] > 0)
state->tp_dummy[tapenum]--;
@@ -1370,7 +1471,7 @@ beginmerge(Tuplesortstate *state)
*/
Assert(activeTapes > 0);
state->spacePerTape = state->availMem / activeTapes;
for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
{
if (state->mergeactive[srcTape])
state->mergeavailmem[srcTape] = state->spacePerTape;
@@ -1383,7 +1484,7 @@ beginmerge(Tuplesortstate *state)
mergepreread(state);
/* Load the merge heap with the first tuple from each input tape */
for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
{
int tupIndex = state->mergenext[srcTape];
void *tup;
@@ -1420,7 +1521,7 @@ mergepreread(Tuplesortstate *state)
long priorAvail,
spaceUsed;
for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
{
if (!state->mergeactive[srcTape])
continue;
@@ -1534,9 +1635,9 @@ dumptuples(Tuplesortstate *state, bool alltuples)
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG, "finished writing%s run %d: %s",
elog(LOG, "finished writing%s run %d to tape %d: %s",
(state->memtupcount == 0) ? " final" : "",
state->currentRun,
state->currentRun, state->destTape,
pg_rusage_show(&state->ru_start));
#endif