diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 4fea8ebf482..6a18e334809 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -531,6 +531,8 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr,
 									  BlockNumber blockNum,
 									  BufferAccessStrategy strategy,
 									  bool *foundPtr, IOContext io_context);
+static bool AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks);
+static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete);
 static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
@@ -1231,10 +1233,14 @@ ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence,
 		return buffer;
 	}
 
+	/*
+	 * Signal that we are going to immediately wait. If we're immediately
+	 * waiting, there is no benefit in actually executing the IO
+	 * asynchronously, it would just add dispatch overhead.
+	 */
+	flags = READ_BUFFERS_SYNCHRONOUSLY;
 	if (mode == RBM_ZERO_ON_ERROR)
-		flags = READ_BUFFERS_ZERO_ON_ERROR;
-	else
-		flags = 0;
+		flags |= READ_BUFFERS_ZERO_ON_ERROR;
 	operation.smgr = smgr;
 	operation.rel = rel;
 	operation.persistence = persistence;
@@ -1259,6 +1265,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 {
 	int			actual_nblocks = *nblocks;
 	int			maxcombine = 0;
+	bool		did_start_io;
 
 	Assert(*nblocks == 1 || allow_forwarding);
 	Assert(*nblocks > 0);
@@ -1326,6 +1333,20 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 			if (i == 0)
 			{
 				*nblocks = 1;
+
+#ifdef USE_ASSERT_CHECKING
+
+				/*
+				 * Initialize enough of ReadBuffersOperation to make
+				 * CheckReadBuffersOperation() work. Outside of assertions
+				 * that's not necessary when no IO is issued.
+				 */
+				operation->buffers = buffers;
+				operation->blocknum = blockNum;
+				operation->nblocks = 1;
+				operation->nblocks_done = 1;
+				CheckReadBuffersOperation(operation, true);
+#endif
 				return false;
 			}
 
@@ -1368,25 +1389,74 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 	operation->blocknum = blockNum;
 	operation->flags = flags;
 	operation->nblocks = actual_nblocks;
+	operation->nblocks_done = 0;
+	pgaio_wref_clear(&operation->io_wref);
 
-	if (flags & READ_BUFFERS_ISSUE_ADVICE)
+	/*
+	 * When using AIO, start the IO in the background. If not, issue prefetch
+	 * requests if desired by the caller.
+	 *
+	 * The reason we have a dedicated path for IOMETHOD_SYNC here is to
+	 * de-risk the introduction of AIO somewhat. It's a large architectural
+	 * change, with lots of chances for unanticipated performance effects.
+	 *
+	 * Use of IOMETHOD_SYNC already leads to not actually performing IO
+	 * asynchronously, but without the check here we'd execute IO earlier than
+	 * we used to. Eventually this IOMETHOD_SYNC specific path should go away.
+	 */
+	if (io_method != IOMETHOD_SYNC)
 	{
 		/*
-		 * In theory we should only do this if PinBufferForBlock() had to
-		 * allocate new buffers above. That way, if two calls to
-		 * StartReadBuffers() were made for the same blocks before
-		 * WaitReadBuffers(), only the first would issue the advice. That'd be
-		 * a better simulation of true asynchronous I/O, which would only
-		 * start the I/O once, but isn't done here for simplicity.
+		 * Try to start IO asynchronously. It's possible that no IO needs to
+		 * be started, if another backend already performed the IO.
+		 *
+		 * Note that if an IO is started, it might not cover the entire
+		 * requested range, e.g. because an intermediary block has been read
+		 * in by another backend. In that case any "trailing" buffers we
+		 * already pinned above will be "forwarded" by read_stream.c to the
+		 * next call to StartReadBuffers().
+		 *
+		 * This is signalled to the caller by decrementing *nblocks *and*
+		 * reducing operation->nblocks. The latter is done here, but not below
+		 * WaitReadBuffers(), as in WaitReadBuffers() we can't "shorten" the
+		 * overall read size anymore, we need to retry until done in its
+		 * entirety or until failed.
 		 */
-		smgrprefetch(operation->smgr,
-					 operation->forknum,
-					 blockNum,
-					 actual_nblocks);
+		did_start_io = AsyncReadBuffers(operation, nblocks);
+
+		operation->nblocks = *nblocks;
+	}
+	else
+	{
+		operation->flags |= READ_BUFFERS_SYNCHRONOUSLY;
+
+		if (flags & READ_BUFFERS_ISSUE_ADVICE)
+		{
+			/*
+			 * In theory we should only do this if PinBufferForBlock() had to
+			 * allocate new buffers above. That way, if two calls to
+			 * StartReadBuffers() were made for the same blocks before
+			 * WaitReadBuffers(), only the first would issue the advice.
+			 * That'd be a better simulation of true asynchronous I/O, which
+			 * would only start the I/O once, but isn't done here for
+			 * simplicity.
+			 */
+			smgrprefetch(operation->smgr,
+						 operation->forknum,
+						 blockNum,
+						 actual_nblocks);
+		}
+
+		/*
+		 * Indicate that WaitReadBuffers() should be called. WaitReadBuffers()
+		 * will initiate the necessary IO.
+		 */
+		did_start_io = true;
 	}
 
-	/* Indicate that WaitReadBuffers() should be called. */
-	return true;
+	CheckReadBuffersOperation(operation, !did_start_io);
+
+	return did_start_io;
 }
 
 /*
@@ -1452,8 +1522,35 @@ StartReadBuffer(ReadBuffersOperation *operation,
 	return result;
 }
 
+/*
+ * Perform sanity checks on the ReadBuffersOperation.
+ */
+static void
+CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete)
+{
+#ifdef USE_ASSERT_CHECKING
+	Assert(operation->nblocks_done <= operation->nblocks);
+	Assert(!is_complete || operation->nblocks == operation->nblocks_done);
+
+	for (int i = 0; i < operation->nblocks; i++)
+	{
+		Buffer		buffer = operation->buffers[i];
+		BufferDesc *buf_hdr = BufferIsLocal(buffer) ?
+			GetLocalBufferDescriptor(-buffer - 1) :
+			GetBufferDescriptor(buffer - 1);
+
+		Assert(BufferGetBlockNumber(buffer) == operation->blocknum + i);
+		Assert(pg_atomic_read_u32(&buf_hdr->state) & BM_TAG_VALID);
+
+		if (i < operation->nblocks_done)
+			Assert(pg_atomic_read_u32(&buf_hdr->state) & BM_VALID);
+	}
+#endif
+}
+
+/* helper for ReadBuffersCanStartIO(), to avoid repetition */
 static inline bool
-WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
+ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait)
 {
 	if (BufferIsLocal(buffer))
 		return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1),
@@ -1462,28 +1559,85 @@ WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
 		return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
 }
 
+/*
+ * Helper for AsyncReadBuffers that tries to get the buffer ready for IO.
+ */
+static inline bool
+ReadBuffersCanStartIO(Buffer buffer, bool nowait)
+{
+	/*
+	 * If this backend currently has staged IO, we need to submit the pending
+	 * IO before waiting for the right to issue IO, to avoid the potential for
+	 * deadlocks (and, more commonly, unnecessary delays for other backends).
+	 */
+	if (!nowait && pgaio_have_staged())
+	{
+		if (ReadBuffersCanStartIOOnce(buffer, true))
+			return true;
+
+		/*
+		 * Unfortunately StartBufferIO() returning false doesn't allow us to
+		 * distinguish between the buffer already being valid and IO already
+		 * being in progress. Since IO already being in progress is quite
+		 * rare, this approach seems fine.
+		 */
+		pgaio_submit_staged();
+	}
+
+	return ReadBuffersCanStartIOOnce(buffer, nowait);
+}
+
+/*
+ * Helper for WaitReadBuffers() that processes the results of a readv
+ * operation, raising an error if necessary.
+ */
+static void
+ProcessReadBuffersResult(ReadBuffersOperation *operation)
+{
+	PgAioReturn *aio_ret = &operation->io_return;
+	PgAioResultStatus rs = aio_ret->result.status;
+	int			newly_read_blocks = 0;
+
+	Assert(pgaio_wref_valid(&operation->io_wref));
+	Assert(rs != PGAIO_RS_UNKNOWN);
+
+	/*
+	 * SMGR reports the number of blocks successfully read as the result of
+	 * the IO operation. Thus we can simply add that to ->nblocks_done.
+	 */
+
+	if (likely(rs != PGAIO_RS_ERROR))
+		newly_read_blocks = aio_ret->result.result;
+
+	if (rs == PGAIO_RS_ERROR || rs == PGAIO_RS_WARNING)
+		pgaio_result_report(aio_ret->result, &aio_ret->target_data,
+							rs == PGAIO_RS_ERROR ? ERROR : WARNING);
+	else if (rs == PGAIO_RS_PARTIAL)
+	{
+		/*
+		 * We'll retry, so we just emit a debug message to the server log (or
+		 * not even that in prod scenarios).
+		 */
+		pgaio_result_report(aio_ret->result, &aio_ret->target_data, DEBUG1);
+		elog(DEBUG3, "partial read, will retry");
+	}
+
+	Assert(newly_read_blocks > 0);
+	Assert(newly_read_blocks <= MAX_IO_COMBINE_LIMIT);
+
+	operation->nblocks_done += newly_read_blocks;
+
+	Assert(operation->nblocks_done <= operation->nblocks);
+}
+
 void
 WaitReadBuffers(ReadBuffersOperation *operation)
 {
-	Buffer	   *buffers;
-	int			nblocks;
-	BlockNumber blocknum;
-	ForkNumber	forknum;
+	PgAioReturn *aio_ret = &operation->io_return;
 	IOContext	io_context;
 	IOObject	io_object;
-	char		persistence;
 
-	/* Find the range of the physical read we need to perform. */
-	nblocks = operation->nblocks;
-	buffers = &operation->buffers[0];
-	blocknum = operation->blocknum;
-	forknum = operation->forknum;
-	persistence = operation->persistence;
-
-	Assert(nblocks > 0);
-	Assert(nblocks <= MAX_IO_COMBINE_LIMIT);
-
-	if (persistence == RELPERSISTENCE_TEMP)
+	if (operation->persistence == RELPERSISTENCE_TEMP)
 	{
 		io_context = IOCONTEXT_NORMAL;
 		io_object = IOOBJECT_TEMP_RELATION;
@@ -1494,155 +1648,338 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 		io_object = IOOBJECT_RELATION;
 	}
 
-	for (int i = 0; i < nblocks; ++i)
+	/*
+	 * If we get here without an IO operation having been issued, the
+	 * io_method == IOMETHOD_SYNC path must have been used. Otherwise the
+	 * caller should not have called WaitReadBuffers().
+	 *
+	 * In the case of IOMETHOD_SYNC, we start - as we used to before the
+	 * introduction of AIO - the IO in WaitReadBuffers(). This is done as part
+	 * of the retry logic below, no extra code is required.
+	 *
+	 * This path is expected to eventually go away.
+	 */
+	if (!pgaio_wref_valid(&operation->io_wref) && io_method != IOMETHOD_SYNC)
+		elog(ERROR, "waiting for read operation that didn't read");
+
+	/*
+	 * To handle partial reads, and IOMETHOD_SYNC, we re-issue IO until we're
+	 * done. We may need multiple retries, not just because we could get
+	 * multiple partial reads, but also because some of the remaining
+	 * to-be-read buffers may have been read in by other backends, limiting
+	 * the IO size.
+	 */
+	while (true)
 	{
-		int			io_buffers_len;
-		Buffer		io_buffers[MAX_IO_COMBINE_LIMIT];
-		void	   *io_pages[MAX_IO_COMBINE_LIMIT];
-		instr_time	io_start;
-		BlockNumber io_first_block;
+		int			ignored_nblocks_progress;
+
+		CheckReadBuffersOperation(operation, false);
 
 		/*
-		 * Skip this block if someone else has already completed it. If an
-		 * I/O is already in progress in another backend, this will wait for
-		 * the outcome: either done, or something went wrong and we will
-		 * retry.
+		 * If there is an IO associated with the operation, we may need to
+		 * wait for it.
 		 */
-		if (!WaitReadBuffersCanStartIO(buffers[i], false))
+		if (pgaio_wref_valid(&operation->io_wref))
 		{
 			/*
-			 * Report and track this as a 'hit' for this backend, even though
-			 * it must have started out as a miss in PinBufferForBlock(). The
-			 * other backend will track this as a 'read'.
+			 * Track the time spent waiting for the IO to complete. As
+			 * tracking a wait even if we don't actually need to wait
+			 *
+			 * a) is not cheap, due to the timestamping overhead
+			 *
+			 * b) reports some time as waiting, even if we never waited
+			 *
+			 * we first check if we already know the IO is complete.
 			 */
-			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i,
-											  operation->smgr->smgr_rlocator.locator.spcOid,
-											  operation->smgr->smgr_rlocator.locator.dbOid,
-											  operation->smgr->smgr_rlocator.locator.relNumber,
-											  operation->smgr->smgr_rlocator.backend,
-											  true);
+			if (aio_ret->result.status == PGAIO_RS_UNKNOWN &&
+				!pgaio_wref_check_done(&operation->io_wref))
+			{
+				instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
 
-			if (persistence == RELPERSISTENCE_TEMP)
-				pgBufferUsage.local_blks_hit += 1;
+				pgaio_wref_wait(&operation->io_wref);
+
+				/*
+				 * The IO operation itself was already counted earlier, in
+				 * AsyncReadBuffers(), this just accounts for the wait time.
+				 */
+				pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
+										io_start, 0, 0);
+			}
 			else
-				pgBufferUsage.shared_blks_hit += 1;
+			{
+				Assert(pgaio_wref_check_done(&operation->io_wref));
+			}
 
-			if (operation->rel)
-				pgstat_count_buffer_hit(operation->rel);
-
-			pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
-
-			if (VacuumCostActive)
-				VacuumCostBalance += VacuumCostPageHit;
-
-			continue;
+			/*
+			 * We now are sure the IO completed. Check the results. This
+			 * includes reporting on errors if there were any.
+			 */
+			ProcessReadBuffersResult(operation);
 		}
 
+		/*
+		 * Most of the time, the one IO we already started, will read in
+		 * everything. But we need to deal with partial reads and buffers not
+		 * needing IO anymore.
+		 */
+		if (operation->nblocks_done == operation->nblocks)
+			break;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * This may only complete the IO partially, either because some
+		 * buffers were already valid, or because of a partial read.
+		 *
+		 * NB: In contrast to after the AsyncReadBuffers() call in
+		 * StartReadBuffers(), we do *not* reduce
+		 * ReadBuffersOperation->nblocks here, callers expect the full
+		 * operation to be completed at this point (as more operations may
+		 * have been queued).
+		 */
+		AsyncReadBuffers(operation, &ignored_nblocks_progress);
+	}
+
+	CheckReadBuffersOperation(operation, true);
+
+	/* NB: READ_DONE tracepoint was already executed in completion callback */
+}
+
+/*
+ * Initiate IO for the ReadBuffersOperation
+ *
+ * This function only starts a single IO at a time. The size of the IO may be
+ * limited to below the to-be-read blocks, if one of the buffers has
+ * concurrently been read in. If the first to-be-read buffer is already valid,
+ * no IO will be issued.
+ *
+ * To support retries after partial reads, the first operation->nblocks_done
+ * buffers are skipped.
+ *
+ * On return *nblocks_progress is updated to reflect the number of buffers
+ * affected by the call. If the first buffer is valid, *nblocks_progress is
+ * set to 1 and operation->nblocks_done is incremented.
+ *
+ * Returns true if IO was initiated, false if no IO was necessary.
+ */
+static bool
+AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
+{
+	Buffer	   *buffers = &operation->buffers[0];
+	int			flags = operation->flags;
+	BlockNumber blocknum = operation->blocknum;
+	ForkNumber	forknum = operation->forknum;
+	char		persistence = operation->persistence;
+	int16		nblocks_done = operation->nblocks_done;
+	Buffer	   *io_buffers = &operation->buffers[nblocks_done];
+	int			io_buffers_len = 0;
+	PgAioHandle *ioh;
+	uint32		ioh_flags = 0;
+	void	   *io_pages[MAX_IO_COMBINE_LIMIT];
+	IOContext	io_context;
+	IOObject	io_object;
+	bool		did_start_io;
+
+	/*
+	 * When this IO is executed synchronously, either because the caller will
+	 * immediately block waiting for the IO or because IOMETHOD_SYNC is used,
+	 * the AIO subsystem needs to know.
+	 */
+	if (flags & READ_BUFFERS_SYNCHRONOUSLY)
+		ioh_flags |= PGAIO_HF_SYNCHRONOUS;
+
+	if (persistence == RELPERSISTENCE_TEMP)
+	{
+		io_context = IOCONTEXT_NORMAL;
+		io_object = IOOBJECT_TEMP_RELATION;
+		ioh_flags |= PGAIO_HF_REFERENCES_LOCAL;
+	}
+	else
+	{
+		io_context = IOContextForStrategy(operation->strategy);
+		io_object = IOOBJECT_RELATION;
+	}
+
+	/*
+	 * If zero_damaged_pages is enabled, add the READ_BUFFERS_ZERO_ON_ERROR
+	 * flag. The reason for that is that, hopefully, zero_damaged_pages isn't
+	 * set globally, but on a per-session basis. The completion callback,
+	 * which may be run in other processes, e.g. in IO workers, may have a
+	 * different value of the zero_damaged_pages GUC.
+	 *
+	 * XXX: We probably should eventually use a different flag for
+	 * zero_damaged_pages, so we can report different log levels / error codes
+	 * for zero_damaged_pages and ZERO_ON_ERROR.
+	 */
+	if (zero_damaged_pages)
+		flags |= READ_BUFFERS_ZERO_ON_ERROR;
+
+	/*
+	 * For the same reason as with zero_damaged_pages we need to use this
+	 * backend's ignore_checksum_failure value.
+	 */
+	if (ignore_checksum_failure)
+		flags |= READ_BUFFERS_IGNORE_CHECKSUM_FAILURES;
+
+
+	/*
+	 * To be allowed to report stats in the local completion callback we need
+	 * to prepare to report stats now. This ensures we can safely report the
+	 * checksum failure even in a critical section.
+	 */
+	pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid);
+
+	/*
+	 * Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire()
+	 * might block, which we don't want after setting IO_IN_PROGRESS.
+	 *
+	 * If we need to wait for IO before we can get a handle, submit
+	 * already-staged IO first, so that other backends don't need to wait.
+	 * There wouldn't be a deadlock risk, as pgaio_io_acquire() just needs to
+	 * wait for already submitted IO, which doesn't require additional locks,
+	 * but it could still cause undesirable waits.
+	 *
+	 * A secondary benefit is that this would allow us to measure the time in
+	 * pgaio_io_acquire() without causing undue timer overhead in the common,
+	 * non-blocking, case. However, currently the pgstats infrastructure
+	 * doesn't really allow that, as it a) asserts that an operation can't
+	 * have time without operations b) doesn't have an API to report
+	 * "accumulated" time.
+	 */
+	ioh = pgaio_io_acquire_nb(CurrentResourceOwner, &operation->io_return);
+	if (unlikely(!ioh))
+	{
+		pgaio_submit_staged();
+
+		ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return);
+	}
+
+	/*
+	 * Check if we can start IO on the first to-be-read buffer.
+	 *
+	 * If an I/O is already in progress in another backend, we want to wait
+	 * for the outcome: either done, or something went wrong and we will
+	 * retry.
+	 */
+	if (!ReadBuffersCanStartIO(buffers[nblocks_done], false))
+	{
+		/*
+		 * Someone else has already completed this block, we're done.
+		 *
+		 * When IO is necessary, ->nblocks_done is updated in
+		 * ProcessReadBuffersResult(), but that is not called if no IO is
+		 * necessary. Thus update here.
+		 */
+		operation->nblocks_done += 1;
+		*nblocks_progress = 1;
+
+		pgaio_io_release(ioh);
+		pgaio_wref_clear(&operation->io_wref);
+		did_start_io = false;
+
+		/*
+		 * Report and track this as a 'hit' for this backend, even though it
+		 * must have started out as a miss in PinBufferForBlock(). The other
+		 * backend will track this as a 'read'.
+		 *
+		 * Use the local nblocks_done to identify the block that was found
+		 * valid; operation->nblocks_done was already advanced past it above.
+		 */
+		TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + nblocks_done,
+										  operation->smgr->smgr_rlocator.locator.spcOid,
+										  operation->smgr->smgr_rlocator.locator.dbOid,
+										  operation->smgr->smgr_rlocator.locator.relNumber,
+										  operation->smgr->smgr_rlocator.backend,
+										  true);
+
+		if (persistence == RELPERSISTENCE_TEMP)
+			pgBufferUsage.local_blks_hit += 1;
+		else
+			pgBufferUsage.shared_blks_hit += 1;
+
+		if (operation->rel)
+			pgstat_count_buffer_hit(operation->rel);
+
+		pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
+
+		if (VacuumCostActive)
+			VacuumCostBalance += VacuumCostPageHit;
+	}
+	else
+	{
+		instr_time	io_start;
+
 		/* We found a buffer that we need to read in. */
-		io_buffers[0] = buffers[i];
-		io_pages[0] = BufferGetBlock(buffers[i]);
-		io_first_block = blocknum + i;
+		Assert(io_buffers[0] == buffers[nblocks_done]);
+		io_pages[0] = BufferGetBlock(buffers[nblocks_done]);
 		io_buffers_len = 1;
 
 		/*
 		 * How many neighboring-on-disk blocks can we scatter-read into other
 		 * buffers at the same time? In this case we don't wait if we see an
-		 * I/O already in progress. We already hold BM_IO_IN_PROGRESS for the
+		 * I/O already in progress. We already set BM_IO_IN_PROGRESS for the
 		 * head block, so we should get on with that I/O as soon as possible.
-		 * We'll come back to this block again, above.
 		 */
-		while ((i + 1) < nblocks &&
-			   WaitReadBuffersCanStartIO(buffers[i + 1], true))
+		for (int i = nblocks_done + 1; i < operation->nblocks; i++)
 		{
+			if (!ReadBuffersCanStartIO(buffers[i], true))
+				break;
 			/* Must be consecutive block numbers. */
-			Assert(BufferGetBlockNumber(buffers[i + 1]) ==
-				   BufferGetBlockNumber(buffers[i]) + 1);
+			Assert(BufferGetBlockNumber(buffers[i - 1]) ==
+				   BufferGetBlockNumber(buffers[i]) - 1);
+			Assert(io_buffers[io_buffers_len] == buffers[i]);
 
-			io_buffers[io_buffers_len] = buffers[++i];
 			io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
 		}
 
+		/* get a reference to wait for in WaitReadBuffers() */
+		pgaio_io_get_wref(ioh, &operation->io_wref);
+
+		/* provide the list of buffers to the completion callbacks */
+		pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
+
+		pgaio_io_register_callbacks(ioh,
+									persistence == RELPERSISTENCE_TEMP ?
+									PGAIO_HCB_LOCAL_BUFFER_READV :
+									PGAIO_HCB_SHARED_BUFFER_READV,
+									flags);
+
+		pgaio_io_set_flag(ioh, ioh_flags);
+
+		/* ---
+		 * Even though we're trying to issue IO asynchronously, track the time
+		 * in smgrstartreadv():
+		 * - if io_method == IOMETHOD_SYNC, we will always perform the IO
+		 *   immediately
+		 * - the io method might not support the IO (e.g. worker IO for a temp
+		 *   table)
+		 * ---
+		 */
 		io_start = pgstat_prepare_io_time(track_io_timing);
-		smgrreadv(operation->smgr, forknum, io_first_block, io_pages, io_buffers_len);
-		pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start,
-								1, io_buffers_len * BLCKSZ);
-
-		/* Verify each block we read, and terminate the I/O. */
-		for (int j = 0; j < io_buffers_len; ++j)
-		{
-			BufferDesc *bufHdr;
-			Block		bufBlock;
-			int			piv_flags;
-			bool		verified;
-			bool		checksum_failure;
-
-			if (persistence == RELPERSISTENCE_TEMP)
-			{
-				bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1);
-				bufBlock = LocalBufHdrGetBlock(bufHdr);
-			}
-			else
-			{
-				bufHdr = GetBufferDescriptor(io_buffers[j] - 1);
-				bufBlock = BufHdrGetBlock(bufHdr);
-			}
-
-			/* check for garbage data */
-			piv_flags = PIV_LOG_WARNING;
-			if (ignore_checksum_failure)
-				piv_flags |= PIV_IGNORE_CHECKSUM_FAILURE;
-			verified = PageIsVerified((Page) bufBlock, io_first_block + j,
-									  piv_flags, &checksum_failure);
-			if (checksum_failure)
-			{
-				RelFileLocatorBackend rloc = operation->smgr->smgr_rlocator;
-
-				pgstat_prepare_report_checksum_failure(rloc.locator.dbOid);
-				pgstat_report_checksum_failures_in_db(rloc.locator.dbOid, 1);
-			}
-
-			if (!verified)
-			{
-				if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages)
-				{
-					ereport(WARNING,
-							(errcode(ERRCODE_DATA_CORRUPTED),
-							 errmsg("invalid page in block %u of relation %s; zeroing out page",
-									io_first_block + j,
-									relpath(operation->smgr->smgr_rlocator, forknum).str)));
-					memset(bufBlock, 0, BLCKSZ);
-				}
-				else
-					ereport(ERROR,
-							(errcode(ERRCODE_DATA_CORRUPTED),
-							 errmsg("invalid page in block %u of relation %s",
-									io_first_block + j,
-									relpath(operation->smgr->smgr_rlocator, forknum).str)));
-			}
-
-			/* Set BM_VALID, terminate IO, and wake up any waiters */
-			if (persistence == RELPERSISTENCE_TEMP)
-				TerminateLocalBufferIO(bufHdr, false, BM_VALID, false);
-			else
-				TerminateBufferIO(bufHdr, false, BM_VALID, true, false);
-
-			/* Report I/Os as completing individually. */
-			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
-											  operation->smgr->smgr_rlocator.locator.spcOid,
-											  operation->smgr->smgr_rlocator.locator.dbOid,
-											  operation->smgr->smgr_rlocator.locator.relNumber,
-											  operation->smgr->smgr_rlocator.backend,
-											  false);
-		}
+		smgrstartreadv(ioh, operation->smgr, forknum,
+					   blocknum + nblocks_done,
+					   io_pages, io_buffers_len);
+		pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
+								io_start, 1, io_buffers_len * BLCKSZ);
 
 		if (persistence == RELPERSISTENCE_TEMP)
 			pgBufferUsage.local_blks_read += io_buffers_len;
 		else
 			pgBufferUsage.shared_blks_read += io_buffers_len;
 
+		/*
+		 * Track vacuum cost when issuing IO, not after waiting for it.
+		 * Otherwise we could end up issuing a lot of IO in a short timespan,
+		 * despite a low cost limit.
+		 */
 		if (VacuumCostActive)
 			VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+
+		*nblocks_progress = io_buffers_len;
+		did_start_io = true;
 	}
+
+	return did_start_io;
 }
 
 /*
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 11f8508a90b..867ae9facb5 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -114,6 +114,9 @@ typedef struct BufferManagerRelation
 #define READ_BUFFERS_ISSUE_ADVICE (1 << 1)
 /* Don't treat page as invalid due to checksum failures. */
 #define READ_BUFFERS_IGNORE_CHECKSUM_FAILURES (1 << 2)
+/* IO will immediately be waited for */
+#define READ_BUFFERS_SYNCHRONOUSLY (1 << 3)
+
 
 struct ReadBuffersOperation
 {
@@ -133,6 +136,9 @@ struct ReadBuffersOperation
 	BlockNumber blocknum;
 	int			flags;
 	int16		nblocks;
+	int16		nblocks_done;
+	PgAioWaitRef io_wref;
+	PgAioReturn io_return;
 };
 
 typedef struct ReadBuffersOperation ReadBuffersOperation;