diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 45bdf819d57..c60e37e7f7f 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -95,8 +95,10 @@ struct ReadStream int16 ios_in_progress; int16 queue_size; int16 max_pinned_buffers; + int16 forwarded_buffers; int16 pinned_buffers; int16 distance; + int16 initialized_buffers; bool advice_enabled; bool temporary; @@ -224,8 +226,10 @@ static bool read_stream_start_pending_read(ReadStream *stream) { bool need_wait; + int requested_nblocks; int nblocks; int flags; + int forwarded; int16 io_index; int16 overflow; int16 buffer_index; @@ -272,11 +276,21 @@ read_stream_start_pending_read(ReadStream *stream) } } - /* How many more buffers is this backend allowed? */ + /* + * How many more buffers is this backend allowed? + * + * Forwarded buffers are already pinned and map to the leading blocks of + * the pending read (the remaining portion of an earlier short read that + * we're about to continue). They are not counted in pinned_buffers, but + * they are counted as pins already held by this backend according to the + * buffer manager, so they must be added to the limit it grants us. + */ if (stream->temporary) buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX); else buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX); + Assert(stream->forwarded_buffers <= stream->pending_read_nblocks); + buffer_limit += stream->forwarded_buffers; if (buffer_limit == 0 && stream->pinned_buffers == 0) buffer_limit = 1; /* guarantee progress */ @@ -301,10 +315,16 @@ read_stream_start_pending_read(ReadStream *stream) /* * We say how many blocks we want to read, but it may be smaller on return - * if the buffer manager decides to shorten the read. + * if the buffer manager decides to shorten the read. Initialize buffers + * to InvalidBuffer (= not a forwarded buffer) as input on first use only, + * and keep the original nblocks number so we can check for forwarded + * buffers as output, below. */ buffer_index = stream->next_buffer_index; io_index = stream->next_io_index; + while (stream->initialized_buffers < buffer_index + nblocks) + stream->buffers[stream->initialized_buffers++] = InvalidBuffer; + requested_nblocks = nblocks; need_wait = StartReadBuffers(&stream->ios[io_index].op, &stream->buffers[buffer_index], stream->pending_read_blocknum, @@ -334,15 +354,34 @@ read_stream_start_pending_read(ReadStream *stream) } /* - * We gave a contiguous range of buffer space to StartReadBuffers(), but - * we want it to wrap around at queue_size. Slide overflowing buffers to - * the front of the array. + * How many pins were acquired but forwarded to the next call? These need + * to be passed to the next StartReadBuffers() call by leaving them + * exactly where they are in the queue, or released if the stream ends + * early. We need the number for accounting purposes, since they are not + * counted in stream->pinned_buffers but we already hold them. */ - overflow = (buffer_index + nblocks) - stream->queue_size; + forwarded = 0; + while (nblocks + forwarded < requested_nblocks && + stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer) + forwarded++; + stream->forwarded_buffers = forwarded; + + /* + * We gave a contiguous range of buffer space to StartReadBuffers(), but + * we want it to wrap around at queue_size. Copy overflowing buffers to + * the front of the array where they'll be consumed, but also leave a copy + * in the overflow zone which the I/O operation has a pointer to (it needs + * a contiguous array). Both copies will be cleared when the buffers are + * handed to the consumer. + */ + overflow = (buffer_index + nblocks + forwarded) - stream->queue_size; if (overflow > 0) - memmove(&stream->buffers[0], - &stream->buffers[stream->queue_size], - sizeof(stream->buffers[0]) * overflow); + { + Assert(overflow < stream->queue_size); /* can't overlap */ + memcpy(&stream->buffers[0], + &stream->buffers[stream->queue_size], + sizeof(stream->buffers[0]) * overflow); + } /* Compute location of start of next read, without using % operator. */ buffer_index += nblocks; @@ -719,10 +758,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) /* Fast path assumptions. */ Assert(stream->ios_in_progress == 0); + Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 1); Assert(stream->distance == 1); Assert(stream->pending_read_nblocks == 0); Assert(stream->per_buffer_data_size == 0); + Assert(stream->initialized_buffers > stream->oldest_buffer_index); /* We're going to return the buffer we pinned last time. */ oldest_buffer_index = stream->oldest_buffer_index; @@ -771,6 +812,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->distance = 0; stream->oldest_buffer_index = stream->next_buffer_index; stream->pinned_buffers = 0; + stream->buffers[oldest_buffer_index] = InvalidBuffer; } stream->fast_path = false; @@ -846,10 +888,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->seq_until_processed = InvalidBlockNumber; } -#ifdef CLOBBER_FREED_MEMORY - /* Clobber old buffer for debugging purposes. */ + /* + * We must zap this queue entry, or else it would appear as a forwarded + * buffer. If it's potentially in the overflow zone (ie from a + * multi-block I/O that wrapped around the queue), also zap the copy. + */ stream->buffers[oldest_buffer_index] = InvalidBuffer; -#endif + if (oldest_buffer_index < stream->io_combine_limit - 1) + stream->buffers[stream->queue_size + oldest_buffer_index] = + InvalidBuffer; #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND) @@ -894,6 +941,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) #ifndef READ_STREAM_DISABLE_FAST_PATH /* See if we can take the fast path for all-cached scans next time. */ if (stream->ios_in_progress == 0 && + stream->forwarded_buffers == 0 && stream->pinned_buffers == 1 && stream->distance == 1 && stream->pending_read_nblocks == 0 && @@ -929,6 +977,7 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy) void read_stream_reset(ReadStream *stream) { + int16 index; Buffer buffer; /* Stop looking ahead. */ @@ -942,6 +991,24 @@ read_stream_reset(ReadStream *stream) while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer) ReleaseBuffer(buffer); + /* Unpin any unused forwarded buffers. */ + index = stream->next_buffer_index; + while (index < stream->initialized_buffers && + (buffer = stream->buffers[index]) != InvalidBuffer) + { + Assert(stream->forwarded_buffers > 0); + stream->forwarded_buffers--; + ReleaseBuffer(buffer); + + stream->buffers[index] = InvalidBuffer; + if (index < stream->io_combine_limit - 1) + stream->buffers[stream->queue_size + index] = InvalidBuffer; + + if (++index == stream->queue_size) + index = 0; + } + + Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 0); Assert(stream->ios_in_progress == 0);