diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile
index 8376cdfca20..eec03f6f2b4 100644
--- a/src/backend/storage/Makefile
+++ b/src/backend/storage/Makefile
@@ -8,6 +8,6 @@ subdir = src/backend/storage
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS = buffer file freespace ipc large_object lmgr page smgr sync
+SUBDIRS = aio buffer file freespace ipc large_object lmgr page smgr sync
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile
new file mode 100644
index 00000000000..2f29a9ec4d1
--- /dev/null
+++ b/src/backend/storage/aio/Makefile
@@ -0,0 +1,14 @@
+#
+# Makefile for storage/aio
+#
+# src/backend/storage/aio/Makefile
+#
+
+subdir = src/backend/storage/aio
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = \
+    read_stream.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build
new file mode 100644
index 00000000000..10e1aa3b20b
--- /dev/null
+++ b/src/backend/storage/aio/meson.build
@@ -0,0 +1,5 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+backend_sources += files(
+  'read_stream.c',
+)
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
new file mode 100644
index 00000000000..4f21262ff5e
--- /dev/null
+++ b/src/backend/storage/aio/read_stream.c
@@ -0,0 +1,812 @@
+/*-------------------------------------------------------------------------
+ *
+ * read_stream.c
+ *    Mechanism for accessing buffered relation data with look-ahead
+ *
+ * Code that needs to access relation data typically pins blocks one at a
+ * time, often in a predictable order that might be sequential or data-driven.
+ * Calling the simple ReadBuffer() function for each block is inefficient,
+ * because blocks that are not yet in the buffer pool require I/O operations
+ * that are small and might stall waiting for storage. This mechanism looks
+ * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
+ * neighboring blocks together and ahead of time, with an adaptive look-ahead
+ * distance.
+ *
+ * A user-provided callback generates a stream of block numbers that is used
+ * to form reads of up to io_combine_limit, by attempting to merge them with a
+ * pending read. When that isn't possible, the existing pending read is sent
+ * to StartReadBuffers() so that a new one can begin to form.
+ *
+ * The algorithm for controlling the look-ahead distance tries to classify the
+ * stream into three ideal behaviors:
+ *
+ * A) No I/O is necessary, because the requested blocks are fully cached
+ * already. There is no benefit to looking ahead more than one block, so
+ * distance is 1. This is the default initial assumption.
+ *
+ * B) I/O is necessary, but fadvise is undesirable because the access is
+ * sequential, or impossible because direct I/O is enabled or the system
+ * doesn't support advice. There is no benefit in looking ahead more than
+ * io_combine_limit, because in this case the only goal is larger read system
+ * calls. Looking further ahead would pin many buffers and perform
+ * speculative work looking ahead for no benefit.
+ *
+ * C) I/O is necessary, it appears random, and this system supports fadvise.
+ * We'll look further ahead in order to reach the configured level of I/O
+ * concurrency.
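+ *
+ * For illustration (hypothetical numbers, not prescribed by the algorithm):
+ * with max_pinned_buffers = 64 and io_combine_limit = 16, a stream that
+ * keeps triggering advised random I/O ramps up 1 -> 2 -> 4 -> ... -> 64
+ * (behavior C), a stream doing unadvised sequential I/O plateaus at 16
+ * (behavior B), and an all-cached stream decays back towards 1 (behavior A).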
+ *
+ * The distance increases rapidly and decays slowly, so that it moves towards
+ * those levels as different I/O patterns are discovered. For example, a
+ * sequential scan of fully cached data doesn't bother looking ahead, but a
+ * sequential scan that hits a region of uncached blocks will start issuing
+ * increasingly wide read calls until it plateaus at io_combine_limit.
+ *
+ * The main data structure is a circular queue of buffers of size
+ * max_pinned_buffers plus some extra space for technical reasons, ready to be
+ * returned by read_stream_next_buffer(). Each buffer also has an optional
+ * variable-sized object that is passed from the callback to the consumer of
+ * buffers.
+ *
+ * Parallel to the queue of buffers, there is a circular queue of in-progress
+ * I/Os that have been started with StartReadBuffers(), and for which
+ * WaitReadBuffers() must be called before returning the buffer.
+ *
+ * For example, if the callback returns block numbers 10, 42, 43, 60 in
+ * successive calls, then these data structures might appear as follows:
+ *
+ *                          buffers buf/data       ios
+ *
+ *                          +----+  +-----+       +--------+
+ *                          |    |  |     |  +----+ 42..44 | <- oldest_io_index
+ *                          +----+  +-----+  |    +--------+
+ *   oldest_buffer_index -> | 10 |  |  ?  |  | +--+ 60..60 |
+ *                          +----+  +-----+  | |  +--------+
+ *                          | 42 |  |  ?  |<-+ |  |        | <- next_io_index
+ *                          +----+  +-----+    |  +--------+
+ *                          | 43 |  |  ?  |    |  |        |
+ *                          +----+  +-----+    |  +--------+
+ *                          | 44 |  |  ?  |    |  |        |
+ *                          +----+  +-----+    |  +--------+
+ *                          | 60 |  |  ?  |<---+
+ *                          +----+  +-----+
+ *     next_buffer_index -> |    |  |     |
+ *                          +----+  +-----+
+ *
+ * In the example, 5 buffers are pinned, and the next buffer to be streamed to
+ * the client is block 10. Block 10 was a hit and has no associated I/O, but
+ * the range 42..44 requires an I/O wait before its buffers are returned, as
+ * does block 60.
+ *
+ *
+ * Portions Copyright (c) 2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *    src/backend/storage/aio/read_stream.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "catalog/pg_tablespace.h"
+#include "miscadmin.h"
+#include "storage/fd.h"
+#include "storage/smgr.h"
+#include "storage/read_stream.h"
+#include "utils/memdebug.h"
+#include "utils/rel.h"
+#include "utils/spccache.h"
+
+typedef struct InProgressIO
+{
+    int16       buffer_index;
+    ReadBuffersOperation op;
+} InProgressIO;
+
+/*
+ * State for managing a stream of reads.
+ */
+struct ReadStream
+{
+    int16       max_ios;
+    int16       ios_in_progress;
+    int16       queue_size;
+    int16       max_pinned_buffers;
+    int16       pinned_buffers;
+    int16       distance;
+    bool        advice_enabled;
+
+    /*
+     * Small buffer of block numbers, useful for 'ungetting' to resolve flow
+     * control problems when I/Os are split. Also useful for batch-loading
+     * block numbers in the fast path.
+     */
+    BlockNumber blocknums[16];
+    int16       blocknums_count;
+    int16       blocknums_next;
+
+    /*
+     * The callback that will tell us which block numbers to read, and an
+     * opaque pointer that will be passed to it for its own purposes.
+     */
+    ReadStreamBlockNumberCB callback;
+    void       *callback_private_data;
+
+    /* Next expected block, for detecting sequential access. */
+    BlockNumber seq_blocknum;
+
+    /* The read operation we are currently preparing. */
+    BlockNumber pending_read_blocknum;
+    int16       pending_read_nblocks;
+
+    /* Space for buffers and optional per-buffer private data. */
+    size_t      per_buffer_data_size;
+    void       *per_buffer_data;
+
+    /* Read operations that have been started but not waited for yet. */
+    InProgressIO *ios;
+    int16       oldest_io_index;
+    int16       next_io_index;
+
+    bool        fast_path;
+
+    /* Circular queue of buffers. */
+    int16       oldest_buffer_index;    /* Next pinned buffer to return */
+    int16       next_buffer_index;      /* Index of next buffer to pin */
+    Buffer      buffers[FLEXIBLE_ARRAY_MEMBER];
+};
+
+/*
+ * Return a pointer to the per-buffer data by index.
+ */
+static inline void *
+get_per_buffer_data(ReadStream *stream, int16 buffer_index)
+{
+    return (char *) stream->per_buffer_data +
+        stream->per_buffer_data_size * buffer_index;
+}
+
+/*
+ * Ask the callback which block it would like us to read next, with a small
+ * buffer in front to allow read_stream_unget_block() to work and to allow the
+ * fast path to work in batches.
+ */
+static inline BlockNumber
+read_stream_get_block(ReadStream *stream, void *per_buffer_data)
+{
+    if (stream->blocknums_next < stream->blocknums_count)
+        return stream->blocknums[stream->blocknums_next++];
+
+    /*
+     * We only bother to fetch one at a time here (but see the fast path
+     * which uses more).
+     */
+    return stream->callback(stream,
+                            stream->callback_private_data,
+                            per_buffer_data);
+}
+
+/*
+ * In order to deal with short reads in StartReadBuffers(), we sometimes need
+ * to defer handling of a block until later.
+ */
+static inline void
+read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
+{
+    if (stream->blocknums_next == stream->blocknums_count)
+    {
+        /* Never initialized or entirely consumed. Re-initialize. */
+        stream->blocknums[0] = blocknum;
+        stream->blocknums_count = 1;
+        stream->blocknums_next = 0;
+    }
+    else
+    {
+        /* Must be the last value returned from the blocknums array. */
+        Assert(stream->blocknums_next > 0);
+        stream->blocknums_next--;
+        Assert(stream->blocknums[stream->blocknums_next] == blocknum);
+    }
+}
+
+#ifndef READ_STREAM_DISABLE_FAST_PATH
+static void
+read_stream_fill_blocknums(ReadStream *stream)
+{
+    BlockNumber blocknum;
+    int         i = 0;
+
+    do
+    {
+        blocknum = stream->callback(stream,
+                                    stream->callback_private_data,
+                                    NULL);
+        stream->blocknums[i++] = blocknum;
+    } while (i < lengthof(stream->blocknums) &&
+             blocknum != InvalidBlockNumber);
+    stream->blocknums_count = i;
+    stream->blocknums_next = 0;
+}
+#endif
+
+static void
+read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
+{
+    bool        need_wait;
+    int         nblocks;
+    int         flags;
+    int16       io_index;
+    int16       overflow;
+    int16       buffer_index;
+
+    /* This should only be called with a pending read. */
+    Assert(stream->pending_read_nblocks > 0);
+    Assert(stream->pending_read_nblocks <= io_combine_limit);
+
+    /* We had better not exceed the pin limit by starting this read. */
+    Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
+           stream->max_pinned_buffers);
+
+    /* We had better not be overwriting an existing pinned buffer. */
+    if (stream->pinned_buffers > 0)
+        Assert(stream->next_buffer_index != stream->oldest_buffer_index);
+    else
+        Assert(stream->next_buffer_index == stream->oldest_buffer_index);
+
+    /*
+     * If advice hasn't been suppressed, this system supports it, and this
+     * isn't a strictly sequential pattern, then we'll issue advice.
+     */
+    if (!suppress_advice &&
+        stream->advice_enabled &&
+        stream->pending_read_blocknum != stream->seq_blocknum)
+        flags = READ_BUFFERS_ISSUE_ADVICE;
+    else
+        flags = 0;
+
+    /* We say how many blocks we want to read, but it may be smaller on return. */
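+    /*
+     * For illustration (hypothetical numbers): if the pending read covers
+     * blocks 42..45 but block 44 already has a valid buffer, the operation
+     * may cover only 42..43 and return nblocks = 2; the adjustment at the
+     * bottom of this function then keeps 44..45 as the still-pending
+     * remainder.
+     */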
+    buffer_index = stream->next_buffer_index;
+    io_index = stream->next_io_index;
+    nblocks = stream->pending_read_nblocks;
+    need_wait = StartReadBuffers(&stream->ios[io_index].op,
+                                 &stream->buffers[buffer_index],
+                                 stream->pending_read_blocknum,
+                                 &nblocks,
+                                 flags);
+    stream->pinned_buffers += nblocks;
+
+    /* Remember whether we need to wait before returning this buffer. */
+    if (!need_wait)
+    {
+        /* Look-ahead distance decays, no I/O necessary (behavior A). */
+        if (stream->distance > 1)
+            stream->distance--;
+    }
+    else
+    {
+        /*
+         * Remember to call WaitReadBuffers() before returning head buffer.
+         * Look-ahead distance will be adjusted after waiting.
+         */
+        stream->ios[io_index].buffer_index = buffer_index;
+        if (++stream->next_io_index == stream->max_ios)
+            stream->next_io_index = 0;
+        Assert(stream->ios_in_progress < stream->max_ios);
+        stream->ios_in_progress++;
+        stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
+    }
+
+    /*
+     * We gave a contiguous range of buffer space to StartReadBuffers(), but
+     * we want it to wrap around at queue_size. Slide overflowing buffers to
+     * the front of the array.
+     */
+    overflow = (buffer_index + nblocks) - stream->queue_size;
+    if (overflow > 0)
+        memmove(&stream->buffers[0],
+                &stream->buffers[stream->queue_size],
+                sizeof(stream->buffers[0]) * overflow);
+
+    /* Compute location of start of next read, without using % operator. */
+    buffer_index += nblocks;
+    if (buffer_index >= stream->queue_size)
+        buffer_index -= stream->queue_size;
+    Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
+    stream->next_buffer_index = buffer_index;
+
+    /* Adjust the pending read to cover the remaining portion, if any. */
+    stream->pending_read_blocknum += nblocks;
+    stream->pending_read_nblocks -= nblocks;
+}
+
+static void
+read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
+{
+    while (stream->ios_in_progress < stream->max_ios &&
+           stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
+    {
+        BlockNumber blocknum;
+        int16       buffer_index;
+        void       *per_buffer_data;
+
+        if (stream->pending_read_nblocks == io_combine_limit)
+        {
+            read_stream_start_pending_read(stream, suppress_advice);
+            suppress_advice = false;
+            continue;
+        }
+
+        /*
+         * See which block the callback wants next in the stream. We need to
+         * compute the index of the Nth block of the pending read including
+         * wrap-around, but we don't want to use the expensive % operator.
+         */
+        buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
+        if (buffer_index >= stream->queue_size)
+            buffer_index -= stream->queue_size;
+        Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
+        per_buffer_data = get_per_buffer_data(stream, buffer_index);
+        blocknum = read_stream_get_block(stream, per_buffer_data);
+        if (blocknum == InvalidBlockNumber)
+        {
+            /* End of stream. */
+            stream->distance = 0;
+            break;
+        }
+
+        /* Can we merge it with the pending read? */
+        if (stream->pending_read_nblocks > 0 &&
+            stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
+        {
+            stream->pending_read_nblocks++;
+            continue;
+        }
+
+        /* We have to start the pending read before we can build another. */
+        if (stream->pending_read_nblocks > 0)
+        {
+            read_stream_start_pending_read(stream, suppress_advice);
+            suppress_advice = false;
+            if (stream->ios_in_progress == stream->max_ios)
+            {
+                /* And we've hit the limit. Rewind, and stop here. */
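+                /*
+                 * read_stream_unget_block() pushes blocknum back into the
+                 * small blocknums buffer, so the next look-ahead pass will
+                 * see it again before asking the callback for more blocks.
+                 */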
+                read_stream_unget_block(stream, blocknum);
+                return;
+            }
+        }
+
+        /* This is the start of a new pending read. */
+        stream->pending_read_blocknum = blocknum;
+        stream->pending_read_nblocks = 1;
+    }
+
+    /*
+     * We don't start the pending read just because we've hit the distance
+     * limit, preferring to give it another chance to grow to full
+     * io_combine_limit size once more buffers have been consumed. However,
+     * if we've already reached io_combine_limit, or we've reached the
+     * distance limit and there isn't anything pinned yet, or the callback
+     * has signaled end-of-stream, we start the read immediately.
+     */
+    if (stream->pending_read_nblocks > 0 &&
+        (stream->pending_read_nblocks == io_combine_limit ||
+         (stream->pending_read_nblocks == stream->distance &&
+          stream->pinned_buffers == 0) ||
+         stream->distance == 0) &&
+        stream->ios_in_progress < stream->max_ios)
+        read_stream_start_pending_read(stream, suppress_advice);
+}
+
+/*
+ * Create a new read stream object that can be used to perform the equivalent
+ * of a series of ReadBuffer() calls for one fork of one relation.
+ * Internally, it generates larger vectored reads where possible by looking
+ * ahead. The callback should return block numbers or InvalidBlockNumber to
+ * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
+ * write extra data for each block into the space provided to it. It will
+ * also receive callback_private_data for its own purposes.
+ */
+ReadStream *
+read_stream_begin_relation(int flags,
+                           BufferAccessStrategy strategy,
+                           Relation rel,
+                           ForkNumber forknum,
+                           ReadStreamBlockNumberCB callback,
+                           void *callback_private_data,
+                           size_t per_buffer_data_size)
+{
+    ReadStream *stream;
+    size_t      size;
+    int16       queue_size;
+    int16       max_ios;
+    uint32      max_pinned_buffers;
+    Oid         tablespace_id;
+    SMgrRelation smgr;
+
+    smgr = RelationGetSmgr(rel);
+
+    /*
+     * Decide how many I/Os we will allow to run at the same time. That
+     * currently means advice to the kernel to tell it that we will soon
+     * read. This number also affects how far we look ahead for opportunities
+     * to start more I/Os.
+     */
+    tablespace_id = smgr->smgr_rlocator.locator.spcOid;
+    if (!OidIsValid(MyDatabaseId) ||
+        IsCatalogRelation(rel) ||
+        IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
+    {
+        /*
+         * Avoid circularity while trying to look up tablespace settings or
+         * before spccache.c is ready.
+         */
+        max_ios = effective_io_concurrency;
+    }
+    else if (flags & READ_STREAM_MAINTENANCE)
+        max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
+    else
+        max_ios = get_tablespace_io_concurrency(tablespace_id);
+    max_ios = Min(max_ios, PG_INT16_MAX);
+
+    /*
+     * Choose the maximum number of buffers we're prepared to pin. We try to
+     * pin fewer if we can, though. We clamp it to at least io_combine_limit
+     * so that we can have a chance to build up a full io_combine_limit sized
+     * read, even when max_ios is zero. Be careful not to allow int16 to
+     * overflow (even though that's not possible with the current GUC range
+     * limits), allowing also for the spare entry and the overflow space.
+     */
+    max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
+    max_pinned_buffers = Min(max_pinned_buffers,
+                             PG_INT16_MAX - io_combine_limit - 1);
+
+    /* Don't allow this backend to pin more than its share of buffers. */
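+    /*
+     * For illustration (hypothetical settings): with max_ios = 16 and
+     * io_combine_limit = 16, the formula above yields max_pinned_buffers =
+     * Max(16 * 4, 16) = 64, before the clamps above and below are applied.
+     */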
+    if (SmgrIsTemp(smgr))
+        LimitAdditionalLocalPins(&max_pinned_buffers);
+    else
+        LimitAdditionalPins(&max_pinned_buffers);
+    Assert(max_pinned_buffers > 0);
+
+    /*
+     * We need one extra entry for buffers and per-buffer data, because users
+     * of per-buffer data have access to the object until the next call to
+     * read_stream_next_buffer(), so we need a gap between the head and tail
+     * of the queue so that we don't clobber it.
+     */
+    queue_size = max_pinned_buffers + 1;
+
+    /*
+     * Allocate the object, the buffers, the ios and per_buffer_data space in
+     * one big chunk. Though we have queue_size buffers, we want to be able
+     * to assume that all the buffers for a single read are contiguous (i.e.
+     * don't wrap around halfway through), so we allow temporary overflows of
+     * up to the maximum possible read size by allocating an extra
+     * io_combine_limit - 1 elements.
+     */
+    size = offsetof(ReadStream, buffers);
+    size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
+    size += sizeof(InProgressIO) * Max(1, max_ios);
+    size += per_buffer_data_size * queue_size;
+    size += MAXIMUM_ALIGNOF * 2;
+    stream = (ReadStream *) palloc(size);
+    memset(stream, 0, offsetof(ReadStream, buffers));
+    stream->ios = (InProgressIO *)
+        MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
+    if (per_buffer_data_size > 0)
+        stream->per_buffer_data = (void *)
+            MAXALIGN(&stream->ios[Max(1, max_ios)]);
+
+#ifdef USE_PREFETCH
+
+    /*
+     * This system supports prefetching advice. We can use it as long as
+     * direct I/O isn't enabled, the caller hasn't promised sequential access
+     * (overriding our detection heuristics), and max_ios hasn't been set to
+     * zero.
+     */
+    if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
+        (flags & READ_STREAM_SEQUENTIAL) == 0 &&
+        max_ios > 0)
+        stream->advice_enabled = true;
+#endif
+
+    /*
+     * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
+     * above. If we had real asynchronous I/O we might need a slightly
+     * different definition.
+     */
+    if (max_ios == 0)
+        max_ios = 1;
+
+    stream->max_ios = max_ios;
+    stream->per_buffer_data_size = per_buffer_data_size;
+    stream->max_pinned_buffers = max_pinned_buffers;
+    stream->queue_size = queue_size;
+    stream->callback = callback;
+    stream->callback_private_data = callback_private_data;
+
+    /*
+     * Skip the initial ramp-up phase if the caller says we're going to be
+     * reading the whole relation. This way we start out assuming we'll be
+     * doing full io_combine_limit sized reads (behavior B).
+     */
+    if (flags & READ_STREAM_FULL)
+        stream->distance = Min(max_pinned_buffers, io_combine_limit);
+    else
+        stream->distance = 1;
+
+    /*
+     * Since we always access the same relation, we can initialize parts of
+     * the ReadBuffersOperation objects and leave them that way, to avoid
+     * wasting CPU cycles writing to them for each read.
+     */
+    for (int i = 0; i < max_ios; ++i)
+    {
+        stream->ios[i].op.rel = rel;
+        stream->ios[i].op.smgr = RelationGetSmgr(rel);
+        stream->ios[i].op.smgr_persistence = 0;
+        stream->ios[i].op.forknum = forknum;
+        stream->ios[i].op.strategy = strategy;
+    }
+
+    return stream;
+}
+
+/*
+ * Pull one pinned buffer out of a stream. Each call returns successive
+ * blocks in the order specified by the callback. If per_buffer_data_size was
+ * set to a non-zero size, *per_buffer_data receives a pointer to the extra
+ * per-buffer data that the callback had a chance to populate, which remains
+ * valid until the next call to read_stream_next_buffer().
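+ *
+ * A minimal sketch of a typical consumer loop (buf and stream are
+ * caller-provided variables, shown for illustration only):
+ *
+ *     while ((buf = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
+ *     {
+ *         ... examine the page in buf ...
+ *         ReleaseBuffer(buf);
+ *     }
+ *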
+ * When the stream runs out of data, InvalidBuffer is returned. The caller
+ * may decide to end the stream early at any time by calling
+ * read_stream_end().
+ */
+Buffer
+read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
+{
+    Buffer      buffer;
+    int16       oldest_buffer_index;
+
+#ifndef READ_STREAM_DISABLE_FAST_PATH
+
+    /*
+     * A fast path for all-cached scans (behavior A). This is the same as the
+     * usual algorithm, but it is specialized for no I/O and no per-buffer
+     * data, so we can skip the queue management code, stay in the same
+     * buffer slot and use singular StartReadBuffer().
+     */
+    if (likely(stream->fast_path))
+    {
+        BlockNumber next_blocknum;
+        bool        need_wait;
+
+        /* Fast path assumptions. */
+        Assert(stream->ios_in_progress == 0);
+        Assert(stream->pinned_buffers == 1);
+        Assert(stream->distance == 1);
+        Assert(stream->pending_read_nblocks == 1);
+        Assert(stream->per_buffer_data_size == 0);
+
+        /* We're going to return the buffer we pinned last time. */
+        oldest_buffer_index = stream->oldest_buffer_index;
+        Assert((oldest_buffer_index + 1) % stream->queue_size ==
+               stream->next_buffer_index);
+        buffer = stream->buffers[oldest_buffer_index];
+        Assert(buffer != InvalidBuffer);
+
+        /*
+         * Pin a buffer for the next call. Same buffer entry, and arbitrary
+         * I/O entry (they're all free).
+         */
+        need_wait = StartReadBuffer(&stream->ios[0].op,
+                                    &stream->buffers[oldest_buffer_index],
+                                    stream->pending_read_blocknum,
+                                    stream->advice_enabled ?
+                                    READ_BUFFERS_ISSUE_ADVICE : 0);
+
+        /* Choose the block the next call will pin. */
+        if (unlikely(stream->blocknums_next == stream->blocknums_count))
+            read_stream_fill_blocknums(stream);
+        next_blocknum = stream->blocknums[stream->blocknums_next++];
+
+        /*
+         * Fast return if the next call doesn't require I/O for the buffer we
+         * just pinned, and we have a block number to give it as a pending
+         * read.
+         */
+        if (likely(!need_wait && next_blocknum != InvalidBlockNumber))
+        {
+            stream->pending_read_blocknum = next_blocknum;
+            return buffer;
+        }
+
+        /*
+         * For anything more complex, set up some more state and take the
+         * slow path next time.
+         */
+        stream->fast_path = false;
+
+        if (need_wait)
+        {
+            /* Next call must wait for I/O for the newly pinned buffer. */
+            stream->oldest_io_index = 0;
+            stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
+            stream->ios_in_progress = 1;
+            stream->ios[0].buffer_index = oldest_buffer_index;
+            stream->seq_blocknum = next_blocknum + 1;
+        }
+        if (next_blocknum == InvalidBlockNumber)
+        {
+            /* Next call hits end of stream and can't pin anything more. */
+            stream->distance = 0;
+            stream->pending_read_nblocks = 0;
+        }
+        else
+        {
+            /* Set up the pending read. */
+            stream->pending_read_blocknum = next_blocknum;
+        }
+        return buffer;
+    }
+#endif
+
+    if (unlikely(stream->pinned_buffers == 0))
+    {
+        Assert(stream->oldest_buffer_index == stream->next_buffer_index);
+
+        /* End of stream reached? */
+        if (stream->distance == 0)
+            return InvalidBuffer;
+
+        /*
+         * The usual order of operations is that we look ahead at the bottom
+         * of this function after potentially finishing an I/O and making
+         * space for more, but if we're just starting up we'll need to crank
+         * the handle to get started.
+         */
+        read_stream_look_ahead(stream, true);
+
+        /* End of stream reached? */
+        if (stream->pinned_buffers == 0)
+        {
+            Assert(stream->distance == 0);
+            return InvalidBuffer;
+        }
+    }
+
+    /* Grab the oldest pinned buffer and associated per-buffer data. */
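+    /*
+     * If the oldest in-progress I/O covers this buffer, it must be waited
+     * for below before the buffer can be handed out.
+     */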
+    Assert(stream->pinned_buffers > 0);
+    oldest_buffer_index = stream->oldest_buffer_index;
+    Assert(oldest_buffer_index >= 0 &&
+           oldest_buffer_index < stream->queue_size);
+    buffer = stream->buffers[oldest_buffer_index];
+    if (per_buffer_data)
+        *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
+
+    Assert(BufferIsValid(buffer));
+
+    /* Do we have to wait for an associated I/O first? */
+    if (stream->ios_in_progress > 0 &&
+        stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
+    {
+        int16       io_index = stream->oldest_io_index;
+        int16       distance;
+
+        /* Sanity check that we still agree on the buffers. */
+        Assert(stream->ios[io_index].op.buffers ==
+               &stream->buffers[oldest_buffer_index]);
+
+        WaitReadBuffers(&stream->ios[io_index].op);
+
+        Assert(stream->ios_in_progress > 0);
+        stream->ios_in_progress--;
+        if (++stream->oldest_io_index == stream->max_ios)
+            stream->oldest_io_index = 0;
+
+        if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
+        {
+            /* Distance ramps up fast (behavior C). */
+            distance = stream->distance * 2;
+            distance = Min(distance, stream->max_pinned_buffers);
+            stream->distance = distance;
+        }
+        else
+        {
+            /* No advice; move towards io_combine_limit (behavior B). */
+            if (stream->distance > io_combine_limit)
+            {
+                stream->distance--;
+            }
+            else
+            {
+                distance = stream->distance * 2;
+                distance = Min(distance, io_combine_limit);
+                distance = Min(distance, stream->max_pinned_buffers);
+                stream->distance = distance;
+            }
+        }
+    }
+
+#ifdef CLOBBER_FREED_MEMORY
+    /* Clobber old buffer and per-buffer data for debugging purposes. */
+    stream->buffers[oldest_buffer_index] = InvalidBuffer;
+
+    /*
+     * The caller will get access to the per-buffer data, until the next
+     * call. We wipe the one before, which is never occupied because
+     * queue_size allowed one extra element. This will hopefully trip up
+     * client code that is holding a dangling pointer to it.
+     */
+    if (stream->per_buffer_data)
+        wipe_mem(get_per_buffer_data(stream,
+                                     oldest_buffer_index == 0 ?
+                                     stream->queue_size - 1 :
+                                     oldest_buffer_index - 1),
+                 stream->per_buffer_data_size);
+#endif
+
+    /* Pin transferred to caller. */
+    Assert(stream->pinned_buffers > 0);
+    stream->pinned_buffers--;
+
+    /* Advance oldest buffer, with wrap-around. */
+    stream->oldest_buffer_index++;
+    if (stream->oldest_buffer_index == stream->queue_size)
+        stream->oldest_buffer_index = 0;
+
+    /* Prepare for the next call. */
+    read_stream_look_ahead(stream, false);
+
+#ifndef READ_STREAM_DISABLE_FAST_PATH
+    /* See if we can take the fast path for all-cached scans next time. */
+    if (stream->ios_in_progress == 0 &&
+        stream->pinned_buffers == 1 &&
+        stream->distance == 1 &&
+        stream->pending_read_nblocks == 1 &&
+        stream->per_buffer_data_size == 0)
+    {
+        stream->fast_path = true;
+    }
+    else
+    {
+        stream->fast_path = false;
+    }
+#endif
+
+    return buffer;
+}
+
+/*
+ * Reset a read stream by releasing any queued up buffers, allowing the stream
+ * to be used again for different blocks. This can be used to clear an
+ * end-of-stream condition and start again, or to throw away blocks that were
+ * speculatively read and read some different blocks instead.
+ */
+void
+read_stream_reset(ReadStream *stream)
+{
+    Buffer      buffer;
+
+    /* Stop looking ahead. */
+    stream->distance = 0;
+
+    /* Unpin anything that wasn't consumed. */
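+    /*
+     * Draining through read_stream_next_buffer() also waits for any I/Os
+     * still in progress, so the assertions below must hold.
+     */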
+    while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
+        ReleaseBuffer(buffer);
+
+    Assert(stream->pinned_buffers == 0);
+    Assert(stream->ios_in_progress == 0);
+
+    /* Start off assuming data is cached. */
+    stream->distance = 1;
+}
+
+/*
+ * Release and free a read stream.
+ */
+void
+read_stream_end(ReadStream *stream)
+{
+    read_stream_reset(stream);
+    pfree(stream);
+}
diff --git a/src/backend/storage/meson.build b/src/backend/storage/meson.build
index 40345bdca27..739d13293fb 100644
--- a/src/backend/storage/meson.build
+++ b/src/backend/storage/meson.build
@@ -1,5 +1,6 @@
 # Copyright (c) 2022-2024, PostgreSQL Global Development Group
 
+subdir('aio')
 subdir('buffer')
 subdir('file')
 subdir('freespace')
diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h
new file mode 100644
index 00000000000..fae09d2b4cc
--- /dev/null
+++ b/src/include/storage/read_stream.h
@@ -0,0 +1,63 @@
+/*-------------------------------------------------------------------------
+ *
+ * read_stream.h
+ *    Mechanism for accessing buffered relation data with look-ahead
+ *
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/read_stream.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef READ_STREAM_H
+#define READ_STREAM_H
+
+#include "storage/bufmgr.h"
+
+/* Default tuning, reasonable for many users. */
+#define READ_STREAM_DEFAULT 0x00
+
+/*
+ * I/O streams that are performing maintenance work on behalf of potentially
+ * many users, and thus should be governed by maintenance_io_concurrency
+ * instead of effective_io_concurrency. For example, VACUUM or CREATE INDEX.
+ */
+#define READ_STREAM_MAINTENANCE 0x01
+
+/*
+ * We usually avoid issuing prefetch advice automatically when sequential
+ * access is detected, but this flag explicitly disables it, for cases that
+ * might not be correctly detected. Explicit advice is known to perform worse
+ * than letting the kernel (at least Linux) detect sequential access.
+ */
+#define READ_STREAM_SEQUENTIAL 0x02
+
+/*
+ * We usually ramp up from smaller reads to larger ones, to support users who
+ * don't know if it's worth reading lots of buffers yet. This flag disables
+ * that, declaring ahead of time that we'll be reading all available buffers.
+ */
+#define READ_STREAM_FULL 0x04
+
+struct ReadStream;
+typedef struct ReadStream ReadStream;
+
+/* Callback that returns the next block number to read. */
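+/*
+ * A minimal sketch of such a callback (my_read_state, next and end are
+ * hypothetical, caller-defined names):
+ *
+ *     static BlockNumber
+ *     my_next_block(ReadStream *stream, void *callback_private_data,
+ *                   void *per_buffer_data)
+ *     {
+ *         struct my_read_state *s = callback_private_data;
+ *
+ *         if (s->next < s->end)
+ *             return s->next++;
+ *         return InvalidBlockNumber;
+ *     }
+ */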
+typedef BlockNumber (*ReadStreamBlockNumberCB) (ReadStream *stream,
+                                                void *callback_private_data,
+                                                void *per_buffer_data);
+
+extern ReadStream *read_stream_begin_relation(int flags,
+                                              BufferAccessStrategy strategy,
+                                              Relation rel,
+                                              ForkNumber forknum,
+                                              ReadStreamBlockNumberCB callback,
+                                              void *callback_private_data,
+                                              size_t per_buffer_data_size);
+extern Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_data);
+extern void read_stream_reset(ReadStream *stream);
+extern void read_stream_end(ReadStream *stream);
+
+#endif                          /* READ_STREAM_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 04484d41601..8bc8dd6f1c6 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1215,6 +1215,7 @@ InjectionPointCacheEntry
 InjectionPointEntry
 InjectionPointSharedState
 InlineCodeBlock
+InProgressIO
 InsertStmt
 Instrumentation
 Int128AggState
@@ -2295,6 +2296,7 @@ ReadExtraTocPtrType
 ReadFunc
 ReadLocalXLogPageNoWaitPrivate
 ReadReplicationSlotCmd
+ReadStream
 ReassignOwnedStmt
 RecheckForeignScan_function
 RecordCacheArrayEntry