mirror of
https://github.com/postgres/postgres.git
synced 2025-08-05 07:41:25 +03:00
Improve read_stream.c's fast path.
The "fast path" for well cached scans that don't do any I/O was accidentally coded in a way that could only be triggered by pg_prewarm's usage pattern, which starts out with a higher distance because of the flags it passes in. We want it to work for streaming sequential scans too, once that patch is committed. Adjust. Reviewed-by: Melanie Plageman <melanieplageman@gmail.com> Discussion: https://postgr.es/m/CA%2BhUKGKXZALJ%3D6aArUsXRJzBm%3Dqvc4AWp7%3DiJNXJQqpbRLnD_w%40mail.gmail.com
This commit is contained in:
@@ -169,7 +169,7 @@ get_per_buffer_data(ReadStream *stream, int16 buffer_index)
|
|||||||
/*
|
/*
|
||||||
* Ask the callback which block it would like us to read next, with a small
|
* Ask the callback which block it would like us to read next, with a small
|
||||||
* buffer in front to allow read_stream_unget_block() to work and to allow the
|
* buffer in front to allow read_stream_unget_block() to work and to allow the
|
||||||
* fast path to work in batches.
|
* fast path to skip this function and work directly from the array.
|
||||||
*/
|
*/
|
||||||
static inline BlockNumber
|
static inline BlockNumber
|
||||||
read_stream_get_block(ReadStream *stream, void *per_buffer_data)
|
read_stream_get_block(ReadStream *stream, void *per_buffer_data)
|
||||||
@@ -578,13 +578,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
|
|||||||
if (likely(stream->fast_path))
|
if (likely(stream->fast_path))
|
||||||
{
|
{
|
||||||
BlockNumber next_blocknum;
|
BlockNumber next_blocknum;
|
||||||
bool need_wait;
|
|
||||||
|
|
||||||
/* Fast path assumptions. */
|
/* Fast path assumptions. */
|
||||||
Assert(stream->ios_in_progress == 0);
|
Assert(stream->ios_in_progress == 0);
|
||||||
Assert(stream->pinned_buffers == 1);
|
Assert(stream->pinned_buffers == 1);
|
||||||
Assert(stream->distance == 1);
|
Assert(stream->distance == 1);
|
||||||
Assert(stream->pending_read_nblocks == 1);
|
Assert(stream->pending_read_nblocks == 0);
|
||||||
Assert(stream->per_buffer_data_size == 0);
|
Assert(stream->per_buffer_data_size == 0);
|
||||||
|
|
||||||
/* We're going to return the buffer we pinned last time. */
|
/* We're going to return the buffer we pinned last time. */
|
||||||
@@ -594,40 +593,29 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
|
|||||||
buffer = stream->buffers[oldest_buffer_index];
|
buffer = stream->buffers[oldest_buffer_index];
|
||||||
Assert(buffer != InvalidBuffer);
|
Assert(buffer != InvalidBuffer);
|
||||||
|
|
||||||
/*
|
/* Choose the next block to pin. */
|
||||||
* Pin a buffer for the next call. Same buffer entry, and arbitrary
|
|
||||||
* I/O entry (they're all free).
|
|
||||||
*/
|
|
||||||
need_wait = StartReadBuffer(&stream->ios[0].op,
|
|
||||||
&stream->buffers[oldest_buffer_index],
|
|
||||||
stream->pending_read_blocknum,
|
|
||||||
stream->advice_enabled ?
|
|
||||||
READ_BUFFERS_ISSUE_ADVICE : 0);
|
|
||||||
|
|
||||||
/* Choose the block the next call will pin. */
|
|
||||||
if (unlikely(stream->blocknums_next == stream->blocknums_count))
|
if (unlikely(stream->blocknums_next == stream->blocknums_count))
|
||||||
read_stream_fill_blocknums(stream);
|
read_stream_fill_blocknums(stream);
|
||||||
next_blocknum = stream->blocknums[stream->blocknums_next++];
|
next_blocknum = stream->blocknums[stream->blocknums_next++];
|
||||||
|
|
||||||
/*
|
if (likely(next_blocknum != InvalidBlockNumber))
|
||||||
* Fast return if the next call doesn't require I/O for the buffer we
|
|
||||||
* just pinned, and we have a block number to give it as a pending
|
|
||||||
* read.
|
|
||||||
*/
|
|
||||||
if (likely(!need_wait && next_blocknum != InvalidBlockNumber))
|
|
||||||
{
|
{
|
||||||
stream->pending_read_blocknum = next_blocknum;
|
/*
|
||||||
return buffer;
|
* Pin a buffer for the next call. Same buffer entry, and
|
||||||
}
|
* arbitrary I/O entry (they're all free). We don't have to
|
||||||
|
* adjust pinned_buffers because we're transferring one to caller
|
||||||
|
* but pinning one more.
|
||||||
|
*/
|
||||||
|
if (likely(!StartReadBuffer(&stream->ios[0].op,
|
||||||
|
&stream->buffers[oldest_buffer_index],
|
||||||
|
next_blocknum,
|
||||||
|
stream->advice_enabled ?
|
||||||
|
READ_BUFFERS_ISSUE_ADVICE : 0)))
|
||||||
|
{
|
||||||
|
/* Fast return. */
|
||||||
|
return buffer;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* For anything more complex, set up some more state and take the slow
|
|
||||||
* path next time.
|
|
||||||
*/
|
|
||||||
stream->fast_path = false;
|
|
||||||
|
|
||||||
if (need_wait)
|
|
||||||
{
|
|
||||||
/* Next call must wait for I/O for the newly pinned buffer. */
|
/* Next call must wait for I/O for the newly pinned buffer. */
|
||||||
stream->oldest_io_index = 0;
|
stream->oldest_io_index = 0;
|
||||||
stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
|
stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
|
||||||
@@ -635,17 +623,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
|
|||||||
stream->ios[0].buffer_index = oldest_buffer_index;
|
stream->ios[0].buffer_index = oldest_buffer_index;
|
||||||
stream->seq_blocknum = next_blocknum + 1;
|
stream->seq_blocknum = next_blocknum + 1;
|
||||||
}
|
}
|
||||||
if (next_blocknum == InvalidBlockNumber)
|
|
||||||
{
|
|
||||||
/* Next call hits end of stream and can't pin anything more. */
|
|
||||||
stream->distance = 0;
|
|
||||||
stream->pending_read_nblocks = 0;
|
|
||||||
}
|
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/* Set up the pending read. */
|
/* No more blocks, end of stream. */
|
||||||
stream->pending_read_blocknum = next_blocknum;
|
stream->distance = 0;
|
||||||
|
stream->oldest_buffer_index = stream->next_buffer_index;
|
||||||
|
stream->pinned_buffers = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stream->fast_path = false;
|
||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
@@ -762,15 +748,11 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
|
|||||||
if (stream->ios_in_progress == 0 &&
|
if (stream->ios_in_progress == 0 &&
|
||||||
stream->pinned_buffers == 1 &&
|
stream->pinned_buffers == 1 &&
|
||||||
stream->distance == 1 &&
|
stream->distance == 1 &&
|
||||||
stream->pending_read_nblocks == 1 &&
|
stream->pending_read_nblocks == 0 &&
|
||||||
stream->per_buffer_data_size == 0)
|
stream->per_buffer_data_size == 0)
|
||||||
{
|
{
|
||||||
stream->fast_path = true;
|
stream->fast_path = true;
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
stream->fast_path = false;
|
|
||||||
}
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
return buffer;
|
return buffer;
|
||||||
@@ -790,6 +772,11 @@ read_stream_reset(ReadStream *stream)
|
|||||||
/* Stop looking ahead. */
|
/* Stop looking ahead. */
|
||||||
stream->distance = 0;
|
stream->distance = 0;
|
||||||
|
|
||||||
|
/* Forget buffered block numbers and fast path state. */
|
||||||
|
stream->blocknums_next = 0;
|
||||||
|
stream->blocknums_count = 0;
|
||||||
|
stream->fast_path = false;
|
||||||
|
|
||||||
/* Unpin anything that wasn't consumed. */
|
/* Unpin anything that wasn't consumed. */
|
||||||
while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
|
while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
|
||||||
ReleaseBuffer(buffer);
|
ReleaseBuffer(buffer);
|
||||||
|
Reference in New Issue
Block a user