diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c index d5930f258d9..d8e8ccad1ff 100644 --- a/src/backend/utils/sort/tuplesort.c +++ b/src/backend/utils/sort/tuplesort.c @@ -10,14 +10,22 @@ * amounts are sorted using temporary files and a standard external sort * algorithm. * - * See Knuth, volume 3, for more than you want to know about the external - * sorting algorithm. Historically, we divided the input into sorted runs - * using replacement selection, in the form of a priority tree implemented - * as a heap (essentially his Algorithm 5.2.3H), but now we always use - * quicksort for run generation. We merge the runs using polyphase merge, - * Knuth's Algorithm 5.4.2D. The logical "tapes" used by Algorithm D are - * implemented by logtape.c, which avoids space wastage by recycling disk - * space as soon as each block is read from its "tape". + * See Knuth, volume 3, for more than you want to know about external + * sorting algorithms. The algorithm we use is a balanced k-way merge. + * Before PostgreSQL 15, we used the polyphase merge algorithm (Knuth's + * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced + * merge is better. Knuth is assuming that tape drives are expensive + * beasts, and in particular that there will always be many more runs than + * tape drives. The polyphase merge algorithm was good at keeping all the + * tape drives busy, but in our implementation a "tape drive" doesn't cost + * much more than a few Kb of memory buffers, so we can afford to have + * lots of them. In particular, if we can have as many tape drives as + * sorted runs, we can eliminate any repeated I/O at all. + * + * Historically, we divided the input into sorted runs using replacement + * selection, in the form of a priority tree implemented as a heap + * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort + * for run generation. * * The approximate amount of memory allowed for any one sort operation * is specified in kilobytes by the caller (most pass work_mem). Initially, @@ -27,9 +35,11 @@ * tuples just by scanning the tuple array sequentially. If we do exceed * workMem, we begin to emit tuples into sorted runs in temporary tapes. * When tuples are dumped in batch after quicksorting, we begin a new run - * with a new output tape (selected per Algorithm D). After the end of the - * input is reached, we dump out remaining tuples in memory into a final run, - * then merge the runs using Algorithm D. + * with a new output tape. If we reach the max number of tapes, we write + * subsequent runs on the existing tapes in a round-robin fashion. We will + * need multiple merge passes to finish the merge in that case. After the + * end of the input is reached, we dump out remaining tuples in memory into + * a final run, then merge the runs. * * When merging runs, we use a heap containing just the frontmost tuple from * each source run; we repeatedly output the smallest tuple and replace it @@ -52,6 +62,14 @@ * accesses. The pre-reading is handled by logtape.c, we just tell it how * much memory to use for the buffers. * + * In the current code we determine the number of input tapes M on the basis + * of workMem: we want workMem/M to be large enough that we read a fair + * amount of data each time we read from a tape, so as to maintain the + * locality of access described above. Nonetheless, with large workMem we + * can have many tapes. The logical "tapes" are implemented by logtape.c, + * which avoids space wastage by recycling disk space as soon as each block + * is read from its "tape". + * * When the caller requests random access to the sort result, we form * the final sorted run on a logical tape which is then "frozen", so * that we can access it randomly. When the caller does not need random @@ -60,20 +78,6 @@ * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this * saves one cycle of writing all the data out to disk and reading it in. * - * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the - * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according - * to Knuth's figure 70 (section 5.4.2). However, Knuth is assuming that - * tape drives are expensive beasts, and in particular that there will always - * be many more runs than tape drives. In our implementation a "tape drive" - * doesn't cost much more than a few Kb of memory buffers, so we can afford - * to have lots of them. In particular, if we can have as many tape drives - * as sorted runs, we can eliminate any repeated I/O at all. In the current - * code we determine the number of tapes M on the basis of workMem: we want - * workMem/M to be large enough that we read a fair amount of data each time - * we preread from a tape, so as to maintain the locality of access described - * above. Nonetheless, with large workMem we can have many tapes (but not - * too many -- see the comments in tuplesort_merge_order). - * * This module supports parallel sorting. Parallel sorts involve coordination * among one or more worker processes, and a leader process, each with its own * tuplesort state. The leader process (or, more accurately, the @@ -223,8 +227,9 @@ typedef enum * worth of buffer space. This ignores the overhead of all the other data * structures needed for each tape, but it's probably close enough. * - * MERGE_BUFFER_SIZE is how much data we'd like to read from each input - * tape during a preread cycle (see discussion at top of file). + * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each + * input tape, for pre-reading (see discussion at top of file). This is *in + * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD. */ #define MINORDER 6 /* minimum merge order */ #define MAXORDER 500 /* maximum merge order */ @@ -249,8 +254,8 @@ struct Tuplesortstate bool tuples; /* Can SortTuple.tuple ever be set? */ int64 availMem; /* remaining memory available, in bytes */ int64 allowedMem; /* total memory allowed, in bytes */ - int maxTapes; /* number of tapes (Knuth's T) */ - int tapeRange; /* maxTapes-1 (Knuth's P) */ + int maxTapes; /* max number of input tapes to merge in each + * pass */ int64 maxSpace; /* maximum amount of space occupied among sort * of groups, either in-memory or on-disk */ bool isMaxSpaceDisk; /* true when maxSpace is value for on-disk @@ -262,7 +267,6 @@ struct Tuplesortstate MemoryContext sortcontext; /* memory context holding most sort data */ MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */ LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */ - LogicalTape **tapes; /* * These function pointers decouple the routines that must know what kind @@ -347,8 +351,8 @@ struct Tuplesortstate char *slabMemoryEnd; /* end of slab memory arena */ SlabSlot *slabFreeHead; /* head of free list */ - /* Buffer size to use for reading input tapes, during merge. */ - size_t read_buffer_size; + /* Memory used for input and output tape buffers. */ + size_t tape_buffer_mem; /* * When we return a tuple to the caller in tuplesort_gettuple_XXX, that @@ -365,36 +369,29 @@ struct Tuplesortstate int currentRun; /* - * Unless otherwise noted, all pointer variables below are pointers to - * arrays of length maxTapes, holding per-tape data. + * Logical tapes, for merging. + * + * The initial runs are written in the output tapes. In each merge pass, + * the output tapes of the previous pass become the input tapes, and new + * output tapes are created as needed. When nInputTapes equals + * nInputRuns, there is only one merge pass left. */ + LogicalTape **inputTapes; + int nInputTapes; + int nInputRuns; - /* - * This variable is only used during merge passes. mergeactive[i] is true - * if we are reading an input run from (actual) tape number i and have not - * yet exhausted that run. - */ - bool *mergeactive; /* active input run source? */ + LogicalTape **outputTapes; + int nOutputTapes; + int nOutputRuns; - /* - * Variables for Algorithm D. Note that destTape is a "logical" tape - * number, ie, an index into the tp_xxx[] arrays. Be careful to keep - * "logical" and "actual" tape numbers straight! - */ - int Level; /* Knuth's l */ - int destTape; /* current output tape (Knuth's j, less 1) */ - int *tp_fib; /* Target Fibonacci run counts (A[]) */ - int *tp_runs; /* # of real runs on each tape */ - int *tp_dummy; /* # of dummy runs for each tape (D[]) */ - int *tp_tapenum; /* Actual tape numbers (TAPE[]) */ - int activeTapes; /* # of active input tapes in merge pass */ + LogicalTape *destTape; /* current output tape */ /* * These variables are used after completion of sorting to keep track of * the next tuple to return. (In the tape case, the tape's current read * position is also critical state.) */ - LogicalTape *result_tape; /* tape of finished output */ + LogicalTape *result_tape; /* actual tape of finished output */ int current; /* array index (only used if SORTEDINMEM) */ bool eof_reached; /* reached EOF (needed for cursors) */ @@ -415,8 +412,9 @@ struct Tuplesortstate * * nParticipants is the number of worker Tuplesortstates known by the * leader to have actually been launched, which implies that they must - * finish a run leader can merge. Typically includes a worker state held - * by the leader process itself. Set in the leader Tuplesortstate only. + * finish a run that the leader needs to merge. Typically includes a + * worker state held by the leader process itself. Set in the leader + * Tuplesortstate only. */ int worker; Sharedsort *shared; @@ -620,7 +618,7 @@ static void init_slab_allocator(Tuplesortstate *state, int numSlots); static void mergeruns(Tuplesortstate *state); static void mergeonerun(Tuplesortstate *state); static void beginmerge(Tuplesortstate *state); -static bool mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup); +static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup); static void dumptuples(Tuplesortstate *state, bool alltuples); static void make_bounded_heap(Tuplesortstate *state); static void sort_bounded_heap(Tuplesortstate *state); @@ -885,8 +883,8 @@ tuplesort_begin_batch(Tuplesortstate *state) state->currentRun = 0; /* - * maxTapes, tapeRange, and Algorithm D variables will be initialized by - * inittapes(), if needed + * Tape variables (inputTapes, outputTapes, etc.) will be initialized by + * inittapes(), if needed. */ state->result_tape = NULL; /* flag that result tape has not been formed */ @@ -1408,6 +1406,10 @@ tuplesort_free(Tuplesortstate *state) * * Note: want to include this in reported total cost of sort, hence need * for two #ifdef TRACE_SORT sections. + * + * We don't bother to destroy the individual tapes here. They will go away + * with the sortcontext. (In TSS_FINALMERGE state, we have closed + * finished tapes already.) */ if (state->tapeset) LogicalTapeSetClose(state->tapeset); @@ -2130,7 +2132,7 @@ tuplesort_performsort(Tuplesortstate *state) { if (state->status == TSS_FINALMERGE) elog(LOG, "performsort of worker %d done (except %d-way final merge): %s", - state->worker, state->activeTapes, + state->worker, state->nInputTapes, pg_rusage_show(&state->ru_start)); else elog(LOG, "performsort of worker %d done: %s", @@ -2338,7 +2340,8 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, */ if (state->memtupcount > 0) { - int srcTape = state->memtuples[0].srctape; + int srcTapeIndex = state->memtuples[0].srctape; + LogicalTape *srcTape = state->inputTapes[srcTapeIndex]; SortTuple newtup; *stup = state->memtuples[0]; @@ -2360,15 +2363,16 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, * Remove the top node from the heap. */ tuplesort_heap_delete_top(state); + state->nInputRuns--; /* * Close the tape. It'd go away at the end of the sort * anyway, but better to release the memory early. */ - LogicalTapeClose(state->tapes[srcTape]); + LogicalTapeClose(srcTape); return true; } - newtup.srctape = srcTape; + newtup.srctape = srcTapeIndex; tuplesort_heap_replace_top(state, &newtup); return true; } @@ -2599,18 +2603,29 @@ tuplesort_merge_order(int64 allowedMem) { int mOrder; - /* - * We need one tape for each merge input, plus another one for the output, - * and each of these tapes needs buffer space. In addition we want - * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't - * count). + /*---------- + * In the merge phase, we need buffer space for each input and output tape. + * Each pass in the balanced merge algorithm reads from M input tapes, and + * writes to N output tapes. Each tape consumes TAPE_BUFFER_OVERHEAD bytes + * of memory. In addition to that, we want MERGE_BUFFER_SIZE workspace per + * input tape. + * + * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) + + * N * TAPE_BUFFER_OVERHEAD + * + * Except for the last and next-to-last merge passes, where there can be + * fewer tapes left to process, M = N. We choose M so that we have the + * desired amount of memory available for the input buffers + * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory + * available for the tape buffers (allowedMem). * * Note: you might be thinking we need to account for the memtuples[] * array in this calculation, but we effectively treat that as part of the * MERGE_BUFFER_SIZE workspace. + *---------- */ - mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) / - (MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD); + mOrder = allowedMem / + (2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE); /* * Even in minimum memory, use at least a MINORDER merge. On the other @@ -2620,7 +2635,7 @@ tuplesort_merge_order(int64 allowedMem) * which in turn can cause the same sort to need more runs, which makes * merging slower even if it can still be done in a single pass. Also, * high order merges are quite slow due to CPU cache effects; it can be - * faster to pay the I/O cost of a polyphase merge than to perform a + * faster to pay the I/O cost of a multi-pass merge than to perform a * single merge pass across many hundreds of tapes. */ mOrder = Max(mOrder, MINORDER); @@ -2629,6 +2644,42 @@ tuplesort_merge_order(int64 allowedMem) return mOrder; } +/* + * Helper function to calculate how much memory to allocate for the read buffer + * of each input tape in a merge pass. + * + * 'avail_mem' is the amount of memory available for the buffers of all the + * tapes, both input and output. + * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs. + * 'maxOutputTapes' is the max. number of output tapes we should produce. + */ +static int64 +merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns, + int maxOutputTapes) +{ + int nOutputRuns; + int nOutputTapes; + + /* + * How many output tapes will we produce in this pass? + * + * This is nInputRuns / nInputTapes, rounded up. + */ + nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes; + + nOutputTapes = Min(nOutputRuns, maxOutputTapes); + + /* + * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory. All + * remaining memory is divided evenly between the input tapes. + * + * This also follows from the formula in tuplesort_merge_order, but here + * we derive the input buffer size from the amount of memory available, + * and M and N. + */ + return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0); +} + /* * inittapes - initialize for tape sorting. * @@ -2637,58 +2688,49 @@ tuplesort_merge_order(int64 allowedMem) static void inittapes(Tuplesortstate *state, bool mergeruns) { - int maxTapes, - j; - Assert(!LEADER(state)); if (mergeruns) { - /* Compute number of tapes to use: merge order plus 1 */ - maxTapes = tuplesort_merge_order(state->allowedMem) + 1; + /* Compute number of input tapes to use when merging */ + state->maxTapes = tuplesort_merge_order(state->allowedMem); } else { /* Workers can sometimes produce single run, output without merge */ Assert(WORKER(state)); - maxTapes = MINORDER + 1; + state->maxTapes = MINORDER; } #ifdef TRACE_SORT if (trace_sort) elog(LOG, "worker %d switching to external sort with %d tapes: %s", - state->worker, maxTapes, pg_rusage_show(&state->ru_start)); + state->worker, state->maxTapes, pg_rusage_show(&state->ru_start)); #endif - /* Create the tape set and allocate the per-tape data arrays */ - inittapestate(state, maxTapes); + /* Create the tape set */ + inittapestate(state, state->maxTapes); state->tapeset = LogicalTapeSetCreate(false, state->shared ? &state->shared->fileset : NULL, state->worker); - state->tapes = palloc(maxTapes * sizeof(LogicalTape *)); - for (j = 0; j < maxTapes; j++) - state->tapes[j] = LogicalTapeCreate(state->tapeset); state->currentRun = 0; /* - * Initialize variables of Algorithm D (step D1). + * Initialize logical tape arrays. */ - for (j = 0; j < maxTapes; j++) - { - state->tp_fib[j] = 1; - state->tp_runs[j] = 0; - state->tp_dummy[j] = 1; - state->tp_tapenum[j] = j; - } - state->tp_fib[state->tapeRange] = 0; - state->tp_dummy[state->tapeRange] = 0; + state->inputTapes = NULL; + state->nInputTapes = 0; + state->nInputRuns = 0; - state->Level = 1; - state->destTape = 0; + state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *)); + state->nOutputTapes = 0; + state->nOutputRuns = 0; state->status = TSS_BUILDRUNS; + + selectnewtape(state); } /* @@ -2719,52 +2761,37 @@ inittapestate(Tuplesortstate *state, int maxTapes) * called already, but it doesn't matter if it is called a second time. */ PrepareTempTablespaces(); - - state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool)); - state->tp_fib = (int *) palloc0(maxTapes * sizeof(int)); - state->tp_runs = (int *) palloc0(maxTapes * sizeof(int)); - state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int)); - state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int)); - - /* Record # of tapes allocated (for duration of sort) */ - state->maxTapes = maxTapes; - /* Record maximum # of tapes usable as inputs when merging */ - state->tapeRange = maxTapes - 1; } /* - * selectnewtape -- select new tape for new initial run. + * selectnewtape -- select next tape to output to. * * This is called after finishing a run when we know another run - * must be started. This implements steps D3, D4 of Algorithm D. + * must be started. This is used both when building the initial + * runs, and during merge passes. */ static void selectnewtape(Tuplesortstate *state) { - int j; - int a; - - /* Step D3: advance j (destTape) */ - if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1]) + if (state->nOutputRuns < state->maxTapes) { - state->destTape++; - return; + /* Create a new tape to hold the next run */ + Assert(state->outputTapes[state->nOutputRuns] == NULL); + Assert(state->nOutputRuns == state->nOutputTapes); + state->destTape = LogicalTapeCreate(state->tapeset); + state->outputTapes[state->nOutputRuns] = state->destTape; + state->nOutputTapes++; + state->nOutputRuns++; } - if (state->tp_dummy[state->destTape] != 0) + else { - state->destTape = 0; - return; + /* + * We have reached the max number of tapes. Append to an existing + * tape. + */ + state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes]; + state->nOutputRuns++; } - - /* Step D4: increase level */ - state->Level++; - a = state->tp_fib[0]; - for (j = 0; j < state->tapeRange; j++) - { - state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j]; - state->tp_fib[j] = a + state->tp_fib[j + 1]; - } - state->destTape = 0; } /* @@ -2803,18 +2830,13 @@ init_slab_allocator(Tuplesortstate *state, int numSlots) /* * mergeruns -- merge all the completed initial runs. * - * This implements steps D5, D6 of Algorithm D. All input data has + * This implements the Balanced k-Way Merge Algorithm. All input data has * already been written to initial runs on tape (see dumptuples). */ static void mergeruns(Tuplesortstate *state) { - int tapenum, - svTape, - svRuns, - svDummy; - int numTapes; - int numInputTapes; + int tapenum; Assert(state->status == TSS_BUILDRUNS); Assert(state->memtupcount == 0); @@ -2849,99 +2871,111 @@ mergeruns(Tuplesortstate *state) pfree(state->memtuples); state->memtuples = NULL; - /* - * If we had fewer runs than tapes, refund the memory that we imagined we - * would need for the tape buffers of the unused tapes. - * - * numTapes and numInputTapes reflect the actual number of tapes we will - * use. Note that the output tape's tape number is maxTapes - 1, so the - * tape numbers of the used tapes are not consecutive, and you cannot just - * loop from 0 to numTapes to visit all used tapes! - */ - if (state->Level == 1) - { - numInputTapes = state->currentRun; - numTapes = numInputTapes + 1; - FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD); - } - else - { - numInputTapes = state->tapeRange; - numTapes = state->maxTapes; - } - /* * Initialize the slab allocator. We need one slab slot per input tape, * for the tuples in the heap, plus one to hold the tuple last returned * from tuplesort_gettuple. (If we're sorting pass-by-val Datums, * however, we don't need to do allocate anything.) * + * In a multi-pass merge, we could shrink this allocation for the last + * merge pass, if it has fewer tapes than previous passes, but we don't + * bother. + * * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism * to track memory usage of individual tuples. */ if (state->tuples) - init_slab_allocator(state, numInputTapes + 1); + init_slab_allocator(state, state->nOutputTapes + 1); else init_slab_allocator(state, 0); /* * Allocate a new 'memtuples' array, for the heap. It will hold one tuple * from each input tape. + * + * We could shrink this, too, between passes in a multi-pass merge, but we + * don't bother. (The initial input tapes are still in outputTapes. The + * number of input tapes will not increase between passes.) */ - state->memtupsize = numInputTapes; + state->memtupsize = state->nOutputTapes; state->memtuples = (SortTuple *) MemoryContextAlloc(state->maincontext, - numInputTapes * sizeof(SortTuple)); + state->nOutputTapes * sizeof(SortTuple)); USEMEM(state, GetMemoryChunkSpace(state->memtuples)); /* - * Use all the remaining memory we have available for read buffers among - * the input tapes. - * - * We don't try to "rebalance" the memory among tapes, when we start a new - * merge phase, even if some tapes are inactive in the new phase. That - * would be hard, because logtape.c doesn't know where one run ends and - * another begins. When a new merge phase begins, and a tape doesn't - * participate in it, its buffer nevertheless already contains tuples from - * the next run on same tape, so we cannot release the buffer. That's OK - * in practice, merge performance isn't that sensitive to the amount of - * buffers used, and most merge phases use all or almost all tapes, - * anyway. + * Use all the remaining memory we have available for tape buffers among + * all the input tapes. At the beginning of each merge pass, we will + * divide this memory between the input and output tapes in the pass. */ + state->tape_buffer_mem = state->availMem; + USEMEM(state, state->availMem); #ifdef TRACE_SORT if (trace_sort) - elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes", - state->worker, state->availMem / 1024, numInputTapes); + elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for tape buffers", + state->worker, state->tape_buffer_mem / 1024); #endif - state->read_buffer_size = Max(state->availMem / numInputTapes, 0); - USEMEM(state, state->read_buffer_size * numInputTapes); - - /* End of step D2: rewind all output tapes to prepare for merging */ - for (tapenum = 0; tapenum < state->tapeRange; tapenum++) - LogicalTapeRewindForRead(state->tapes[tapenum], state->read_buffer_size); - for (;;) { /* - * At this point we know that tape[T] is empty. If there's just one - * (real or dummy) run left on each input tape, then only one merge - * pass remains. If we don't have to produce a materialized sorted - * tape, we can stop at this point and do the final merge on-the-fly. + * On the first iteration, or if we have read all the runs from the + * input tapes in a multi-pass merge, it's time to start a new pass. + * Rewind all the output tapes, and make them inputs for the next + * pass. */ - if (!state->randomAccess && !WORKER(state)) + if (state->nInputRuns == 0 && !WORKER(state)) { - bool allOneRun = true; + int64 input_buffer_size; - Assert(state->tp_runs[state->tapeRange] == 0); - for (tapenum = 0; tapenum < state->tapeRange; tapenum++) + /* Close the old, emptied, input tapes */ + if (state->nInputTapes > 0) { - if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1) - { - allOneRun = false; - break; - } + for (tapenum = 0; tapenum < state->nInputTapes; tapenum++) + LogicalTapeClose(state->inputTapes[tapenum]); + pfree(state->inputTapes); } - if (allOneRun) + + /* Previous pass's outputs become next pass's inputs. */ + state->inputTapes = state->outputTapes; + state->nInputTapes = state->nOutputTapes; + state->nInputRuns = state->nOutputRuns; + + /* + * Reset output tape variables. The actual LogicalTapes will be + * created as needed, here we only allocate the array to hold + * them. + */ + state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *)); + state->nOutputTapes = 0; + state->nOutputRuns = 0; + + /* + * Redistribute the memory allocated for tape buffers, among the + * new input and output tapes. + */ + input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem, + state->nInputTapes, + state->nInputRuns, + state->maxTapes); + +#ifdef TRACE_SORT + if (trace_sort) + elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s", + state->nInputRuns, state->nInputTapes, input_buffer_size / 1024, + pg_rusage_show(&state->ru_start)); +#endif + + /* Prepare the new input tapes for merge pass. */ + for (tapenum = 0; tapenum < state->nInputTapes; tapenum++) + LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size); + + /* + * If there's just one run left on each input tape, then only one + * merge pass remains. If we don't have to produce a materialized + * sorted tape, we can stop at this point and do the final merge + * on-the-fly. + */ + if (!state->randomAccess && state->nInputRuns <= state->nInputTapes) { /* Tell logtape.c we won't be writing anymore */ LogicalTapeSetForgetFreeSpace(state->tapeset); @@ -2952,103 +2986,47 @@ mergeruns(Tuplesortstate *state) } } - /* Step D5: merge runs onto tape[T] until tape[P] is empty */ - while (state->tp_runs[state->tapeRange - 1] || - state->tp_dummy[state->tapeRange - 1]) - { - bool allDummy = true; + /* Select an output tape */ + selectnewtape(state); - for (tapenum = 0; tapenum < state->tapeRange; tapenum++) - { - if (state->tp_dummy[tapenum] == 0) - { - allDummy = false; - break; - } - } - - if (allDummy) - { - state->tp_dummy[state->tapeRange]++; - for (tapenum = 0; tapenum < state->tapeRange; tapenum++) - state->tp_dummy[tapenum]--; - } - else - mergeonerun(state); - } - - /* Step D6: decrease level */ - if (--state->Level == 0) - break; - - /* rewind output tape T to use as new input */ - LogicalTapeRewindForRead(state->tapes[state->tp_tapenum[state->tapeRange]], - state->read_buffer_size); - - /* close used-up input tape P, and create a new one for write pass */ - LogicalTapeClose(state->tapes[state->tp_tapenum[state->tapeRange - 1]]); - state->tapes[state->tp_tapenum[state->tapeRange - 1]] = LogicalTapeCreate(state->tapeset); - state->tp_runs[state->tapeRange - 1] = 0; + /* Merge one run from each input tape. */ + mergeonerun(state); /* - * reassign tape units per step D6; note we no longer care about A[] + * If the input tapes are empty, and we output only one output run, + * we're done. The current output tape contains the final result. */ - svTape = state->tp_tapenum[state->tapeRange]; - svDummy = state->tp_dummy[state->tapeRange]; - svRuns = state->tp_runs[state->tapeRange]; - for (tapenum = state->tapeRange; tapenum > 0; tapenum--) - { - state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1]; - state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1]; - state->tp_runs[tapenum] = state->tp_runs[tapenum - 1]; - } - state->tp_tapenum[0] = svTape; - state->tp_dummy[0] = svDummy; - state->tp_runs[0] = svRuns; + if (state->nInputRuns == 0 && state->nOutputRuns <= 1) + break; } /* - * Done. Knuth says that the result is on TAPE[1], but since we exited - * the loop without performing the last iteration of step D6, we have not - * rearranged the tape unit assignment, and therefore the result is on - * TAPE[T]. We need to do it this way so that we can freeze the final - * output tape while rewinding it. The last iteration of step D6 would be - * a waste of cycles anyway... + * Done. The result is on a single run on a single tape. */ - state->result_tape = state->tapes[state->tp_tapenum[state->tapeRange]]; + state->result_tape = state->outputTapes[0]; if (!WORKER(state)) LogicalTapeFreeze(state->result_tape, NULL); else worker_freeze_result_tape(state); state->status = TSS_SORTEDONTAPE; - /* Close all the other tapes, to release their read buffers. */ - for (tapenum = 0; tapenum < state->maxTapes; tapenum++) - { - if (state->tapes[tapenum] != state->result_tape) - { - LogicalTapeClose(state->tapes[tapenum]); - state->tapes[tapenum] = NULL; - } - } + /* Close all the now-empty input tapes, to release their read buffers. */ + for (tapenum = 0; tapenum < state->nInputTapes; tapenum++) + LogicalTapeClose(state->inputTapes[tapenum]); } /* - * Merge one run from each input tape, except ones with dummy runs. - * - * This is the inner loop of Algorithm D step D5. We know that the - * output tape is TAPE[T]. + * Merge one run from each input tape. */ static void mergeonerun(Tuplesortstate *state) { - int destTapeNum = state->tp_tapenum[state->tapeRange]; - LogicalTape *destTape = state->tapes[destTapeNum]; - int srcTape; + int srcTapeIndex; + LogicalTape *srcTape; /* * Start the merge by loading one tuple from each active source tape into - * the heap. We can also decrease the input run/dummy run counts. + * the heap. */ beginmerge(state); @@ -3062,8 +3040,9 @@ mergeonerun(Tuplesortstate *state) SortTuple stup; /* write the tuple to destTape */ - srcTape = state->memtuples[0].srctape; - WRITETUP(state, destTape, &state->memtuples[0]); + srcTapeIndex = state->memtuples[0].srctape; + srcTape = state->inputTapes[srcTapeIndex]; + WRITETUP(state, state->destTape, &state->memtuples[0]); /* recycle the slot of the tuple we just wrote out, for the next read */ if (state->memtuples[0].tuple) @@ -3075,72 +3054,47 @@ mergeonerun(Tuplesortstate *state) */ if (mergereadnext(state, srcTape, &stup)) { - stup.srctape = srcTape; + stup.srctape = srcTapeIndex; tuplesort_heap_replace_top(state, &stup); + } else + { tuplesort_heap_delete_top(state); + state->nInputRuns--; + } } /* * When the heap empties, we're done. Write an end-of-run marker on the - * output tape, and increment its count of real runs. + * output tape. */ - markrunend(destTape); - state->tp_runs[state->tapeRange]++; - -#ifdef TRACE_SORT - if (trace_sort) - elog(LOG, "worker %d finished %d-way merge step: %s", state->worker, - state->activeTapes, pg_rusage_show(&state->ru_start)); -#endif + markrunend(state->destTape); } /* * beginmerge - initialize for a merge pass * - * We decrease the counts of real and dummy runs for each tape, and mark - * which tapes contain active input runs in mergeactive[]. Then, fill the - * merge heap with the first tuple from each active tape. + * Fill the merge heap with the first tuple from each input tape. */ static void beginmerge(Tuplesortstate *state) { int activeTapes; - int tapenum; - int srcTape; + int srcTapeIndex; /* Heap should be empty here */ Assert(state->memtupcount == 0); - /* Adjust run counts and mark the active tapes */ - memset(state->mergeactive, 0, - state->maxTapes * sizeof(*state->mergeactive)); - activeTapes = 0; - for (tapenum = 0; tapenum < state->tapeRange; tapenum++) - { - if (state->tp_dummy[tapenum] > 0) - state->tp_dummy[tapenum]--; - else - { - Assert(state->tp_runs[tapenum] > 0); - state->tp_runs[tapenum]--; - srcTape = state->tp_tapenum[tapenum]; - state->mergeactive[srcTape] = true; - activeTapes++; - } - } - Assert(activeTapes > 0); - state->activeTapes = activeTapes; + activeTapes = Min(state->nInputTapes, state->nInputRuns); - /* Load the merge heap with the first tuple from each input tape */ - for (srcTape = 0; srcTape < state->maxTapes; srcTape++) + for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++) { SortTuple tup; - if (mergereadnext(state, srcTape, &tup)) + if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup)) { - tup.srctape = srcTape; + tup.srctape = srcTapeIndex; tuplesort_heap_insert(state, &tup); } } @@ -3152,20 +3106,13 @@ beginmerge(Tuplesortstate *state) * Returns false on EOF. */ static bool -mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup) +mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup) { - LogicalTape *srcTape = state->tapes[srcTapeIndex]; unsigned int tuplen; - if (!state->mergeactive[srcTapeIndex]) - return false; /* tape's run is already exhausted */ - /* read next tuple, if any */ if ((tuplen = getlen(srcTape, true)) == 0) - { - state->mergeactive[srcTapeIndex] = false; return false; - } READTUP(state, stup, srcTape, tuplen); return true; @@ -3180,7 +3127,6 @@ mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup) static void dumptuples(Tuplesortstate *state, bool alltuples) { - LogicalTape *destTape; int memtupwrite; int i; @@ -3196,22 +3142,13 @@ dumptuples(Tuplesortstate *state, bool alltuples) * Final call might require no sorting, in rare cases where we just so * happen to have previously LACKMEM()'d at the point where exactly all * remaining tuples are loaded into memory, just before input was - * exhausted. - * - * In general, short final runs are quite possible. Rather than allowing - * a special case where there was a superfluous selectnewtape() call (i.e. - * a call with no subsequent run actually written to destTape), we prefer - * to write out a 0 tuple run. - * - * mergereadnext() is prepared for 0 tuple runs, and will reliably mark - * the tape inactive for the merge when called from beginmerge(). This - * case is therefore similar to the case where mergeonerun() finds a dummy - * run for the tape, and so doesn't need to merge a run from the tape (or - * conceptually "merges" the dummy run, if you prefer). According to - * Knuth, Algorithm D "isn't strictly optimal" in its method of - * distribution and dummy run assignment; this edge case seems very - * unlikely to make that appreciably worse. + * exhausted. In general, short final runs are quite possible, but avoid + * creating a completely empty run. In a worker, though, we must produce + * at least one tape, even if it's empty. */ + if (state->memtupcount == 0 && state->currentRun > 0) + return; + Assert(state->status == TSS_BUILDRUNS); /* @@ -3224,6 +3161,9 @@ dumptuples(Tuplesortstate *state, bool alltuples) errmsg("cannot have more than %d runs for an external sort", INT_MAX))); + if (state->currentRun > 0) + selectnewtape(state); + state->currentRun++; #ifdef TRACE_SORT @@ -3247,10 +3187,9 @@ dumptuples(Tuplesortstate *state, bool alltuples) #endif memtupwrite = state->memtupcount; - destTape = state->tapes[state->tp_tapenum[state->destTape]]; for (i = 0; i < memtupwrite; i++) { - WRITETUP(state, destTape, &state->memtuples[i]); + WRITETUP(state, state->destTape, &state->memtuples[i]); state->memtupcount--; } @@ -3263,19 +3202,14 @@ dumptuples(Tuplesortstate *state, bool alltuples) */ MemoryContextReset(state->tuplecontext); - markrunend(destTape); - state->tp_runs[state->destTape]++; - state->tp_dummy[state->destTape]--; /* per Alg D step D2 */ + markrunend(state->destTape); #ifdef TRACE_SORT if (trace_sort) elog(LOG, "worker %d finished writing run %d to tape %d: %s", - state->worker, state->currentRun, state->destTape, + state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1, pg_rusage_show(&state->ru_start)); #endif - - if (!alltuples) - selectnewtape(state); } /* @@ -4669,8 +4603,9 @@ worker_nomergeruns(Tuplesortstate *state) { Assert(WORKER(state)); Assert(state->result_tape == NULL); + Assert(state->nOutputRuns == 1); - state->result_tape = state->tapes[state->tp_tapenum[state->destTape]]; + state->result_tape = state->destTape; worker_freeze_result_tape(state); } @@ -4707,47 +4642,36 @@ leader_takeover_tapes(Tuplesortstate *state) * Create the tapeset from worker tapes, including a leader-owned tape at * the end. Parallel workers are far more expensive than logical tapes, * so the number of tapes allocated here should never be excessive. - * - * We still have a leader tape, though it's not possible to write to it - * due to restrictions in the shared fileset infrastructure used by - * logtape.c. It will never be written to in practice because - * randomAccess is disallowed for parallel sorts. */ - inittapestate(state, nParticipants + 1); - state->tapeset = LogicalTapeSetCreate(false, - &shared->fileset, - state->worker); - state->tapes = palloc(state->maxTapes * sizeof(LogicalTape *)); - for (j = 0; j < nParticipants; j++) - state->tapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]); - /* tapes[nParticipants] represents the "leader tape", which is not used */ + inittapestate(state, nParticipants); + state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1); - /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */ + /* + * Set currentRun to reflect the number of runs we will merge (it's not + * used for anything, this is just pro forma) + */ state->currentRun = nParticipants; /* - * Initialize variables of Algorithm D to be consistent with runs from - * workers having been generated in the leader. + * Initialize the state to look the same as after building the initial + * runs. * * There will always be exactly 1 run per worker, and exactly one input * tape per run, because workers always output exactly 1 run, even when * there were no input tuples for workers to sort. */ - for (j = 0; j < state->maxTapes; j++) - { - /* One real run; no dummy runs for worker tapes */ - state->tp_fib[j] = 1; - state->tp_runs[j] = 1; - state->tp_dummy[j] = 0; - state->tp_tapenum[j] = j; - } - /* Leader tape gets one dummy run, and no real runs */ - state->tp_fib[state->tapeRange] = 0; - state->tp_runs[state->tapeRange] = 0; - state->tp_dummy[state->tapeRange] = 1; + state->inputTapes = NULL; + state->nInputTapes = 0; + state->nInputRuns = 0; - state->Level = 1; - state->destTape = 0; + state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *)); + state->nOutputTapes = nParticipants; + state->nOutputRuns = nParticipants; + + for (j = 0; j < nParticipants; j++) + { + state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]); + } state->status = TSS_BUILDRUNS; }