1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-10 17:42:29 +03:00

Simplify the code for logical tape read buffers.

Pass the buffer size as an argument to LogicalTapeRewindForRead, rather than
setting it earlier with the separate LogicalTapeAssignReadBufferSize call.
This way, the buffer size is set closer to where it's actually used, which
makes the code easier to understand.

This makes the calculation for how much memory to use for the buffers less
precise. We now use the same amount of memory for every tape, rounded down
to the nearest BLCKSZ boundary, instead of using one more block for some
tapes to bring the total up to the exact amount of memory available. That
should be OK; merging isn't too sensitive to the exact amount of memory used.

Reviewed by Peter Geoghegan

Discussion: <0f607c4b-df23-353e-bf56-c0389d28495f@iki.fi>
This commit is contained in:
Heikki Linnakangas
2016-10-12 12:05:45 +03:00
parent 2f1eaf87e8
commit b75f467b6e
3 changed files with 139 additions and 200 deletions

View File

@@ -366,6 +366,9 @@ struct Tuplesortstate
char *slabMemoryEnd; /* end of slab memory arena */
SlabSlot *slabFreeHead; /* head of free list */
/* Buffer size to use for reading input tapes, during merge. */
size_t read_buffer_size;
/*
* When we return a tuple to the caller in tuplesort_gettuple_XXX, that
* came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
@@ -579,7 +582,6 @@ static bool useselection(Tuplesortstate *state);
static void inittapes(Tuplesortstate *state);
static void selectnewtape(Tuplesortstate *state);
static void init_slab_allocator(Tuplesortstate *state, int numSlots);
static void init_tape_buffers(Tuplesortstate *state, int numInputTapes);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
@@ -2056,7 +2058,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
* end of the sort anyway, but better to release the
* memory early.
*/
LogicalTapeRewind(state->tapeset, srcTape, true);
LogicalTapeRewindForWrite(state->tapeset, srcTape);
return true;
}
newtup.tupindex = srcTape;
@@ -2511,72 +2513,6 @@ init_slab_allocator(Tuplesortstate *state, int numSlots)
state->slabAllocatorUsed = true;
}
/*
 * init_tape_buffers -- hand out the remaining work memory as read buffers.
 *
 * Every whole BLCKSZ-sized block left in availMem is split among the tapes
 * that will be read during the merge.  The split is computed over the number
 * of possible *input* tapes, not maxTapes: at any moment we read from at
 * most numInputTapes tapes, while one tape serves as output (unless an
 * on-the-fly final merge is done, in which case there is no output tape).
 *
 * state: the sort whose availMem is consumed and whose tapeset is configured.
 * numInputTapes: divisor for the split; the caller must pass a nonzero value.
 */
static void
init_tape_buffers(Tuplesortstate *state, int numInputTapes)
{
	int64		totalBlocks = state->availMem / BLCKSZ;
	int64		totalBytes = totalBlocks * BLCKSZ;
	int64		perTape = totalBlocks / numInputTapes;
	int			extra = totalBlocks % numInputTapes;
	int			tapenum;

	/* Account for every whole block that is about to be given away. */
	USEMEM(state, totalBytes);

#ifdef TRACE_SORT
	if (trace_sort)
		elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
			 totalBytes / 1024, numInputTapes);
#endif

	/*
	 * Never go below one page per tape, even when memory has run out.
	 * tuplesort_merge_order() is expected to pick a tape count that avoids
	 * this situation, but we guard against it anyway; the clamp also covers
	 * a negative availMem.
	 */
	if (perTape < 1)
	{
		perTape = 1;
		extra = 0;
	}

	/*
	 * Assign a size to each tape; the first 'extra' tapes get one block more
	 * than the rest.
	 *
	 * The walk covers all maxTapes tapes, not merely numInputTapes: in a
	 * multi-phase merge the tape that starts out as the output tape is later
	 * rewound and read, and it should have a large buffer by then.
	 *
	 * With fewer runs than tapes, some tapes that are never used get a size
	 * too.  That is harmless, since LogicalTapeAssignReadBufferSize() only
	 * records the size; the buffer itself is allocated when the tape is
	 * rewound for reading and turns out to be non-empty.
	 */
	tapenum = 0;
	while (tapenum < state->maxTapes)
	{
		int64		blocks = perTape;

		if (tapenum < extra)
			blocks++;

		LogicalTapeAssignReadBufferSize(state->tapeset, tapenum,
										blocks * BLCKSZ);
		tapenum++;
	}
}
/*
* mergeruns -- merge all the completed initial runs.
*
@@ -2679,25 +2615,32 @@ mergeruns(Tuplesortstate *state)
}
/*
* Use all the spare memory we have available for read buffers for the
* tapes.
* Use all the spare memory we have available for read buffers among the
* input tapes.
*
* We do this only after checking for the case that we produced only one
* initial run, because there is no need to use a large read buffer when
* we're reading from a single tape. With one tape, the I/O pattern will
* be the same regardless of the buffer size.
*
* We don't try to "rebalance" the amount of memory among tapes, when we
* start a new merge phase, even if some tapes can be inactive in the
* phase. That would be hard, because logtape.c doesn't know where one
* run ends and another begins. When a new merge phase begins, and a tape
* doesn't participate in it, its buffer nevertheless already contains
* tuples from the next run on same tape, so we cannot release the buffer.
* That's OK in practice, merge performance isn't that sensitive to the
* amount of buffers used, and most merge phases use all or almost all
* tapes, anyway.
* We don't try to "rebalance" the memory among tapes, when we start a new
* merge phase, even if some tapes are inactive in the new phase. That
* would be hard, because logtape.c doesn't know where one run ends and
* another begins. When a new merge phase begins, and a tape doesn't
* participate in it, its buffer nevertheless already contains tuples from
* the next run on same tape, so we cannot release the buffer. That's OK
* in practice, merge performance isn't that sensitive to the amount of
* buffers used, and most merge phases use all or almost all tapes,
* anyway.
*/
init_tape_buffers(state, numInputTapes);
#ifdef TRACE_SORT
if (trace_sort)
elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
(state->availMem) / 1024, numInputTapes);
#endif
state->read_buffer_size = state->availMem / numInputTapes;
USEMEM(state, state->availMem);
/*
* Allocate a new 'memtuples' array, for the heap. It will hold one tuple
@@ -2709,7 +2652,7 @@ mergeruns(Tuplesortstate *state)
/* End of step D2: rewind all output tapes to prepare for merging */
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
LogicalTapeRewind(state->tapeset, tapenum, false);
LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
for (;;)
{
@@ -2772,11 +2715,10 @@ mergeruns(Tuplesortstate *state)
if (--state->Level == 0)
break;
/* rewind output tape T to use as new input */
LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange],
false);
LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
state->read_buffer_size);
/* rewind used-up input tape P, and prepare it for write pass */
LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange - 1],
true);
LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
state->tp_runs[state->tapeRange - 1] = 0;
/*
@@ -2812,7 +2754,7 @@ mergeruns(Tuplesortstate *state)
for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
{
if (tapenum != state->result_tape)
LogicalTapeRewind(state->tapeset, tapenum, true);
LogicalTapeRewindForWrite(state->tapeset, tapenum);
}
}
@@ -3174,9 +3116,9 @@ tuplesort_rescan(Tuplesortstate *state)
state->markpos_eof = false;
break;
case TSS_SORTEDONTAPE:
LogicalTapeRewind(state->tapeset,
state->result_tape,
false);
LogicalTapeRewindForRead(state->tapeset,
state->result_tape,
0);
state->eof_reached = false;
state->markpos_block = 0L;
state->markpos_offset = 0;