diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 175f8410baf..9c0163eda3e 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -133,6 +133,7 @@ struct ReadStream /* Next expected block, for detecting sequential access. */ BlockNumber seq_blocknum; + BlockNumber seq_until_processed; /* The read operation we are currently preparing. */ BlockNumber pending_read_blocknum; @@ -238,7 +239,7 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum) * distance to a level that prevents look-ahead until buffers are released. */ static bool -read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) +read_stream_start_pending_read(ReadStream *stream) { bool need_wait; int nblocks; @@ -262,16 +263,32 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) else Assert(stream->next_buffer_index == stream->oldest_buffer_index); - /* - * If advice hasn't been suppressed, this system supports it, and this - * isn't a strictly sequential pattern, then we'll issue advice. - */ - if (!suppress_advice && - stream->advice_enabled && - stream->pending_read_blocknum != stream->seq_blocknum) - flags = READ_BUFFERS_ISSUE_ADVICE; - else - flags = 0; + /* Do we need to issue read-ahead advice? */ + flags = 0; + if (stream->advice_enabled) + { + if (stream->pending_read_blocknum == stream->seq_blocknum) + { + /* + * Sequential: Issue advice until the preadv() calls have caught + * up with the first advice issued for this sequential region, and + * then stay of the way of the kernel's own read-ahead. + */ + if (stream->seq_until_processed != InvalidBlockNumber) + flags = READ_BUFFERS_ISSUE_ADVICE; + } + else + { + /* + * Random jump: Note the starting location of a new potential + * sequential region and start issuing advice. Skip it this time + * if the preadv() follows immediately, eg first block in stream. + */ + stream->seq_until_processed = stream->pending_read_blocknum; + if (stream->pinned_buffers > 0) + flags = READ_BUFFERS_ISSUE_ADVICE; + } + } /* How many more buffers is this backend allowed? */ if (stream->temporary) @@ -360,7 +377,7 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) } static void -read_stream_look_ahead(ReadStream *stream, bool suppress_advice) +read_stream_look_ahead(ReadStream *stream) { while (stream->ios_in_progress < stream->max_ios && stream->pinned_buffers + stream->pending_read_nblocks < stream->distance) @@ -371,8 +388,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) if (stream->pending_read_nblocks == stream->io_combine_limit) { - read_stream_start_pending_read(stream, suppress_advice); - suppress_advice = false; + read_stream_start_pending_read(stream); continue; } @@ -405,15 +421,13 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) /* We have to start the pending read before we can build another. */ while (stream->pending_read_nblocks > 0) { - if (!read_stream_start_pending_read(stream, suppress_advice) || + if (!read_stream_start_pending_read(stream) || stream->ios_in_progress == stream->max_ios) { /* We've hit the buffer or I/O limit. Rewind and stop here. */ read_stream_unget_block(stream, blocknum); return; } - - suppress_advice = false; } /* This is the start of a new pending read. */ @@ -437,7 +451,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) stream->pinned_buffers == 0) || stream->distance == 0) && stream->ios_in_progress < stream->max_ios) - read_stream_start_pending_read(stream, suppress_advice); + read_stream_start_pending_read(stream); /* * There should always be something pinned when we leave this function, @@ -613,6 +627,8 @@ read_stream_begin_impl(int flags, stream->callback = callback; stream->callback_private_data = callback_private_data; stream->buffered_blocknum = InvalidBlockNumber; + stream->seq_blocknum = InvalidBlockNumber; + stream->seq_until_processed = InvalidBlockNumber; stream->temporary = SmgrIsTemp(smgr); /* @@ -793,7 +809,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) * space for more, but if we're just starting up we'll need to crank * the handle to get started. */ - read_stream_look_ahead(stream, true); + read_stream_look_ahead(stream); /* End of stream reached? */ if (stream->pinned_buffers == 0) @@ -854,6 +870,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->distance = distance; } } + + /* + * If we've reached the first block of a sequential region we're + * issuing advice for, cancel that until the next jump. The kernel + * will see the sequential preadv() pattern starting here. + */ + if (stream->advice_enabled && + stream->ios[io_index].op.blocknum == stream->seq_until_processed) + stream->seq_until_processed = InvalidBlockNumber; } #ifdef CLOBBER_FREED_MEMORY @@ -899,7 +924,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->oldest_buffer_index = 0; /* Prepare for the next call. */ - read_stream_look_ahead(stream, false); + read_stream_look_ahead(stream); #ifndef READ_STREAM_DISABLE_FAST_PATH /* See if we can take the fast path for all-cached scans next time. */