bufmgr: Use AIO in StartReadBuffers()
This finally introduces the first actual use of AIO: StartReadBuffers() now uses the AIO routines to issue IO.

As the implementation of StartReadBuffers() is also used by the functions for reading individual blocks (StartReadBuffer() and, through that, ReadBufferExtended()), all buffered read IO now passes through the AIO paths. However, as those are synchronous reads, actually performing the IO asynchronously would rarely be beneficial. Instead, such IOs are flagged to always be executed synchronously. This way we don't have to duplicate a fair bit of code.

When io_method=sync is used, the IO patterns generated after this change are the same as before, i.e. actual reads are only issued in WaitReadBuffers() and StartReadBuffers() may issue prefetch requests. This allows bypassing most of the actual asynchronicity, which is important to make a change as big as this less risky.

One thing worth calling out: if IO is actually executed asynchronously, the precise meaning of what track_io_timing measures has changed. Previously it tracked the time for each IO, but that does not make sense when multiple IOs are executed concurrently. Now it only measures the time actually spent waiting for IO. A subsequent commit will adjust the docs for this.

While AIO is now actually used, the logic in read_stream.c will often prevent using sufficiently many concurrent IOs. That will be addressed in the next commit.

Reviewed-by: Noah Misch <noah@leadboat.com>
Reviewed-by: Nazir Bilal Yavuz <byavuz81@gmail.com>
Co-authored-by: Andres Freund <andres@anarazel.de>
Co-authored-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt
Discussion: https://postgr.es/m/20210223100344.llw5an2aklengrmn@alap3.anarazel.de
Discussion: https://postgr.es/m/stj36ea6yyhoxtqkhpieia2z4krnam7qyetc57rfezgk4zgapf@gcnactj4z56m
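For orientation, the caller-facing pattern looks roughly like the following sketch, loosely modeled on ReadBuffer_common() in the diff below. The field initialization, fork choice, and block count are illustrative assumptions, not code taken from the patch:

    /*
     * Hypothetical caller: read up to four consecutive blocks of the main
     * fork. StartReadBuffers() may start the IO asynchronously and may
     * shrink *nblocks; WaitReadBuffers() blocks until every returned buffer
     * is valid.
     */
    ReadBuffersOperation operation = {0};
    Buffer      buffers[MAX_IO_COMBINE_LIMIT];
    BlockNumber blocknum = 0;       /* assumed starting block */
    int         nblocks = 4;

    operation.rel = rel;
    operation.smgr = RelationGetSmgr(rel);
    operation.persistence = rel->rd_rel->relpersistence;
    operation.forknum = MAIN_FORKNUM;
    operation.strategy = NULL;

    if (StartReadBuffers(&operation, buffers, blocknum, &nblocks, 0))
        WaitReadBuffers(&operation);

    /* buffers[0 .. nblocks - 1] are now valid and pinned */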
Parent: 047cba7fa0
Commit: 12ce89fd07
@@ -531,6 +531,8 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr,
BlockNumber blockNum,
BufferAccessStrategy strategy,
bool *foundPtr, IOContext io_context);
static bool AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks);
static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete);
static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
IOObject io_object, IOContext io_context);
@@ -1231,10 +1233,14 @@ ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence,
return buffer;
}

/*
* Signal that we are going to immediately wait. If we're immediately
* waiting, there is no benefit in actually executing the IO
* asynchronously, it would just add dispatch overhead.
*/
flags = READ_BUFFERS_SYNCHRONOUSLY;
if (mode == RBM_ZERO_ON_ERROR)
flags = READ_BUFFERS_ZERO_ON_ERROR;
else
flags = 0;
flags |= READ_BUFFERS_ZERO_ON_ERROR;
operation.smgr = smgr;
operation.rel = rel;
operation.persistence = persistence;
@@ -1259,6 +1265,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
{
int actual_nblocks = *nblocks;
int maxcombine = 0;
bool did_start_io;

Assert(*nblocks == 1 || allow_forwarding);
Assert(*nblocks > 0);
@@ -1326,6 +1333,20 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
if (i == 0)
{
*nblocks = 1;

#ifdef USE_ASSERT_CHECKING

/*
* Initialize enough of ReadBuffersOperation to make
* CheckReadBuffersOperation() work. Outside of assertions
* that's not necessary when no IO is issued.
*/
operation->buffers = buffers;
operation->blocknum = blockNum;
operation->nblocks = 1;
operation->nblocks_done = 1;
CheckReadBuffersOperation(operation, true);
#endif
return false;
}

@@ -1368,6 +1389,46 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
operation->blocknum = blockNum;
operation->flags = flags;
operation->nblocks = actual_nblocks;
operation->nblocks_done = 0;
pgaio_wref_clear(&operation->io_wref);

/*
* When using AIO, start the IO in the background. If not, issue prefetch
* requests if desired by the caller.
*
* The reason we have a dedicated path for IOMETHOD_SYNC here is to
* de-risk the introduction of AIO somewhat. It's a large architectural
* change, with lots of chances for unanticipated performance effects.
*
* Use of IOMETHOD_SYNC already leads to not actually performing IO
* asynchronously, but without the check here we'd execute IO earlier than
* we used to. Eventually this IOMETHOD_SYNC specific path should go away.
*/
if (io_method != IOMETHOD_SYNC)
{
/*
* Try to start IO asynchronously. It's possible that no IO needs to
* be started, if another backend already performed the IO.
*
* Note that if an IO is started, it might not cover the entire
* requested range, e.g. because an intermediary block has been read
* in by another backend. In that case any "trailing" buffers we
* already pinned above will be "forwarded" by read_stream.c to the
* next call to StartReadBuffers().
*
* This is signalled to the caller by decrementing *nblocks *and*
* reducing operation->nblocks. The latter is done here, but not below
* WaitReadBuffers(), as in WaitReadBuffers() we can't "shorten" the
* overall read size anymore, we need to retry until done in its
* entirety or until failed.
*/
did_start_io = AsyncReadBuffers(operation, nblocks);

operation->nblocks = *nblocks;
}
else
{
operation->flags |= READ_BUFFERS_SYNCHRONOUSLY;

if (flags & READ_BUFFERS_ISSUE_ADVICE)
{
@@ -1375,9 +1436,10 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
* In theory we should only do this if PinBufferForBlock() had to
* allocate new buffers above. That way, if two calls to
* StartReadBuffers() were made for the same blocks before
* WaitReadBuffers(), only the first would issue the advice. That'd be
* a better simulation of true asynchronous I/O, which would only
* start the I/O once, but isn't done here for simplicity.
* WaitReadBuffers(), only the first would issue the advice.
* That'd be a better simulation of true asynchronous I/O, which
* would only start the I/O once, but isn't done here for
* simplicity.
*/
smgrprefetch(operation->smgr,
operation->forknum,
@@ -1385,8 +1447,16 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
actual_nblocks);
}

/* Indicate that WaitReadBuffers() should be called. */
return true;
/*
* Indicate that WaitReadBuffers() should be called. WaitReadBuffers()
* will initiate the necessary IO.
*/
did_start_io = true;
}

CheckReadBuffersOperation(operation, !did_start_io);

return did_start_io;
}

/*
@@ -1452,8 +1522,35 @@ StartReadBuffer(ReadBuffersOperation *operation,
return result;
}

/*
* Perform sanity checks on the ReadBuffersOperation.
*/
static void
CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete)
{
#ifdef USE_ASSERT_CHECKING
Assert(operation->nblocks_done <= operation->nblocks);
Assert(!is_complete || operation->nblocks == operation->nblocks_done);

for (int i = 0; i < operation->nblocks; i++)
{
Buffer buffer = operation->buffers[i];
BufferDesc *buf_hdr = BufferIsLocal(buffer) ?
GetLocalBufferDescriptor(-buffer - 1) :
GetBufferDescriptor(buffer - 1);

Assert(BufferGetBlockNumber(buffer) == operation->blocknum + i);
Assert(pg_atomic_read_u32(&buf_hdr->state) & BM_TAG_VALID);

if (i < operation->nblocks_done)
Assert(pg_atomic_read_u32(&buf_hdr->state) & BM_VALID);
}
#endif
}

/* helper for ReadBuffersCanStartIO(), to avoid repetition */
static inline bool
WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait)
{
if (BufferIsLocal(buffer))
return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1),
@@ -1462,28 +1559,85 @@ WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
}

/*
* Helper for AsyncReadBuffers that tries to get the buffer ready for IO.
*/
static inline bool
ReadBuffersCanStartIO(Buffer buffer, bool nowait)
{
/*
* If this backend currently has staged IO, we need to submit the pending
* IO before waiting for the right to issue IO, to avoid the potential for
* deadlocks (and, more commonly, unnecessary delays for other backends).
*/
if (!nowait && pgaio_have_staged())
{
if (ReadBuffersCanStartIOOnce(buffer, true))
return true;

/*
* Unfortunately StartBufferIO() returning false doesn't allow to
* distinguish between the buffer already being valid and IO already
* being in progress. Since IO already being in progress is quite
* rare, this approach seems fine.
*/
pgaio_submit_staged();
}

return ReadBuffersCanStartIOOnce(buffer, nowait);
}

/*
* Helper for WaitReadBuffers() that processes the results of a readv
* operation, raising an error if necessary.
*/
static void
ProcessReadBuffersResult(ReadBuffersOperation *operation)
{
PgAioReturn *aio_ret = &operation->io_return;
PgAioResultStatus rs = aio_ret->result.status;
int newly_read_blocks = 0;

Assert(pgaio_wref_valid(&operation->io_wref));
Assert(aio_ret->result.status != PGAIO_RS_UNKNOWN);

/*
* SMGR reports the number of blocks successfully read as the result of
* the IO operation. Thus we can simply add that to ->nblocks_done.
*/

if (likely(rs != PGAIO_RS_ERROR))
newly_read_blocks = aio_ret->result.result;

if (rs == PGAIO_RS_ERROR || rs == PGAIO_RS_WARNING)
pgaio_result_report(aio_ret->result, &aio_ret->target_data,
rs == PGAIO_RS_ERROR ? ERROR : WARNING);
else if (aio_ret->result.status == PGAIO_RS_PARTIAL)
{
/*
* We'll retry, so we just emit a debug message to the server log (or
* not even that in prod scenarios).
*/
pgaio_result_report(aio_ret->result, &aio_ret->target_data, DEBUG1);
elog(DEBUG3, "partial read, will retry");
}

Assert(newly_read_blocks > 0);
Assert(newly_read_blocks <= MAX_IO_COMBINE_LIMIT);

operation->nblocks_done += newly_read_blocks;

Assert(operation->nblocks_done <= operation->nblocks);
}

void
WaitReadBuffers(ReadBuffersOperation *operation)
{
Buffer *buffers;
int nblocks;
BlockNumber blocknum;
ForkNumber forknum;
PgAioReturn *aio_ret = &operation->io_return;
IOContext io_context;
IOObject io_object;
char persistence;

/* Find the range of the physical read we need to perform. */
nblocks = operation->nblocks;
buffers = &operation->buffers[0];
blocknum = operation->blocknum;
forknum = operation->forknum;
persistence = operation->persistence;

Assert(nblocks > 0);
Assert(nblocks <= MAX_IO_COMBINE_LIMIT);

if (persistence == RELPERSISTENCE_TEMP)
if (operation->persistence == RELPERSISTENCE_TEMP)
{
io_context = IOCONTEXT_NORMAL;
io_object = IOOBJECT_TEMP_RELATION;
@@ -1494,28 +1648,241 @@ WaitReadBuffers(ReadBuffersOperation *operation)
io_object = IOOBJECT_RELATION;
}

for (int i = 0; i < nblocks; ++i)
{
int io_buffers_len;
Buffer io_buffers[MAX_IO_COMBINE_LIMIT];
void *io_pages[MAX_IO_COMBINE_LIMIT];
instr_time io_start;
BlockNumber io_first_block;
/*
* If we get here without an IO operation having been issued, the
* io_method == IOMETHOD_SYNC path must have been used. Otherwise the
* caller should not have called WaitReadBuffers().
*
* In the case of IOMETHOD_SYNC, we start - as we used to before the
* introducing of AIO - the IO in WaitReadBuffers(). This is done as part
* of the retry logic below, no extra code is required.
*
* This path is expected to eventually go away.
*/
if (!pgaio_wref_valid(&operation->io_wref) && io_method != IOMETHOD_SYNC)
elog(ERROR, "waiting for read operation that didn't read");

/*
* Skip this block if someone else has already completed it. If an
* I/O is already in progress in another backend, this will wait for
* the outcome: either done, or something went wrong and we will
* retry.
* To handle partial reads, and IOMETHOD_SYNC, we re-issue IO until we're
* done. We may need multiple retries, not just because we could get
* multiple partial reads, but also because some of the remaining
* to-be-read buffers may have been read in by other backends, limiting
* the IO size.
*/
if (!WaitReadBuffersCanStartIO(buffers[i], false))
while (true)
{
int ignored_nblocks_progress;

CheckReadBuffersOperation(operation, false);

/*
* If there is an IO associated with the operation, we may need to
* wait for it.
*/
if (pgaio_wref_valid(&operation->io_wref))
{
/*
* Report and track this as a 'hit' for this backend, even though
* it must have started out as a miss in PinBufferForBlock(). The
* other backend will track this as a 'read'.
* Track the time spent waiting for the IO to complete. As
* tracking a wait even if we don't actually need to wait
*
* a) is not cheap, due to the timestamping overhead
*
* b) reports some time as waiting, even if we never waited
*
* we first check if we already know the IO is complete.
*/
TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i,
if (aio_ret->result.status == PGAIO_RS_UNKNOWN &&
!pgaio_wref_check_done(&operation->io_wref))
{
instr_time io_start = pgstat_prepare_io_time(track_io_timing);

pgaio_wref_wait(&operation->io_wref);

/*
* The IO operation itself was already counted earlier, in
* AsyncReadBuffers(), this just accounts for the wait time.
*/
pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
io_start, 0, 0);
}
else
{
Assert(pgaio_wref_check_done(&operation->io_wref));
}

/*
* We now are sure the IO completed. Check the results. This
* includes reporting on errors if there were any.
*/
ProcessReadBuffersResult(operation);
}

/*
* Most of the time, the one IO we already started, will read in
* everything. But we need to deal with partial reads and buffers not
* needing IO anymore.
*/
if (operation->nblocks_done == operation->nblocks)
break;

CHECK_FOR_INTERRUPTS();

/*
* This may only complete the IO partially, either because some
* buffers were already valid, or because of a partial read.
*
* NB: In contrast to after the AsyncReadBuffers() call in
* StartReadBuffers(), we do *not* reduce
* ReadBuffersOperation->nblocks here, callers expect the full
* operation to be completed at this point (as more operations may
* have been queued).
*/
AsyncReadBuffers(operation, &ignored_nblocks_progress);
}

CheckReadBuffersOperation(operation, true);

/* NB: READ_DONE tracepoint was already executed in completion callback */
}

/*
* Initiate IO for the ReadBuffersOperation
*
* This function only starts a single IO at a time. The size of the IO may be
* limited to below the to-be-read blocks, if one of the buffers has
* concurrently been read in. If the first to-be-read buffer is already valid,
* no IO will be issued.
*
* To support retries after partial reads, the first operation->nblocks_done
* buffers are skipped.
*
* On return *nblocks_progress is updated to reflect the number of buffers
* affected by the call. If the first buffer is valid, *nblocks_progress is
* set to 1 and operation->nblocks_done is incremented.
*
* Returns true if IO was initiated, false if no IO was necessary.
*/
static bool
AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
{
Buffer *buffers = &operation->buffers[0];
int flags = operation->flags;
BlockNumber blocknum = operation->blocknum;
ForkNumber forknum = operation->forknum;
char persistence = operation->persistence;
int16 nblocks_done = operation->nblocks_done;
Buffer *io_buffers = &operation->buffers[nblocks_done];
int io_buffers_len = 0;
PgAioHandle *ioh;
uint32 ioh_flags = 0;
void *io_pages[MAX_IO_COMBINE_LIMIT];
IOContext io_context;
IOObject io_object;
bool did_start_io;

/*
* When this IO is executed synchronously, either because the caller will
* immediately block waiting for the IO or because IOMETHOD_SYNC is used,
* the AIO subsystem needs to know.
*/
if (flags & READ_BUFFERS_SYNCHRONOUSLY)
ioh_flags |= PGAIO_HF_SYNCHRONOUS;

if (persistence == RELPERSISTENCE_TEMP)
{
io_context = IOCONTEXT_NORMAL;
io_object = IOOBJECT_TEMP_RELATION;
ioh_flags |= PGAIO_HF_REFERENCES_LOCAL;
}
else
{
io_context = IOContextForStrategy(operation->strategy);
io_object = IOOBJECT_RELATION;
}

/*
* If zero_damaged_pages is enabled, add the READ_BUFFERS_ZERO_ON_ERROR
* flag. The reason for that is that, hopefully, zero_damaged_pages isn't
* set globally, but on a per-session basis. The completion callback,
* which may be run in other processes, e.g. in IO workers, may have a
* different value of the zero_damaged_pages GUC.
*
* XXX: We probably should eventually use a different flag for
* zero_damaged_pages, so we can report different log levels / error codes
* for zero_damaged_pages and ZERO_ON_ERROR.
*/
if (zero_damaged_pages)
flags |= READ_BUFFERS_ZERO_ON_ERROR;

/*
* For the same reason as with zero_damaged_pages we need to use this
* backend's ignore_checksum_failure value.
*/
if (ignore_checksum_failure)
flags |= READ_BUFFERS_IGNORE_CHECKSUM_FAILURES;


/*
* To be allowed to report stats in the local completion callback we need
* to prepare to report stats now. This ensures we can safely report the
* checksum failure even in a critical section.
*/
pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid);

/*
* Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire()
* might block, which we don't want after setting IO_IN_PROGRESS.
*
* If we need to wait for IO before we can get a handle, submit
* already-staged IO first, so that other backends don't need to wait.
* There wouldn't be a deadlock risk, as pgaio_io_acquire() just needs to
* wait for already submitted IO, which doesn't require additional locks,
* but it could still cause undesirable waits.
*
* A secondary benefit is that this would allow us to measure the time in
* pgaio_io_acquire() without causing undue timer overhead in the common,
* non-blocking, case. However, currently the pgstats infrastructure
* doesn't really allow that, as it a) asserts that an operation can't
* have time without operations b) doesn't have an API to report
* "accumulated" time.
*/
ioh = pgaio_io_acquire_nb(CurrentResourceOwner, &operation->io_return);
if (unlikely(!ioh))
{
pgaio_submit_staged();

ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return);
}

/*
* Check if we can start IO on the first to-be-read buffer.
*
* If an I/O is already in progress in another backend, we want to wait
* for the outcome: either done, or something went wrong and we will
* retry.
*/
if (!ReadBuffersCanStartIO(buffers[nblocks_done], false))
{
/*
* Someone else has already completed this block, we're done.
*
* When IO is necessary, ->nblocks_done is updated in
* ProcessReadBuffersResult(), but that is not called if no IO is
* necessary. Thus update here.
*/
operation->nblocks_done += 1;
*nblocks_progress = 1;

pgaio_io_release(ioh);
pgaio_wref_clear(&operation->io_wref);
did_start_io = false;

/*
* Report and track this as a 'hit' for this backend, even though it
* must have started out as a miss in PinBufferForBlock(). The other
* backend will track this as a 'read'.
*/
TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + operation->nblocks_done,
operation->smgr->smgr_rlocator.locator.spcOid,
operation->smgr->smgr_rlocator.locator.dbOid,
operation->smgr->smgr_rlocator.locator.relNumber,
@@ -1534,115 +1901,82 @@ WaitReadBuffers(ReadBuffersOperation *operation)

if (VacuumCostActive)
VacuumCostBalance += VacuumCostPageHit;

continue;
}
else
{
instr_time io_start;

/* We found a buffer that we need to read in. */
io_buffers[0] = buffers[i];
io_pages[0] = BufferGetBlock(buffers[i]);
io_first_block = blocknum + i;
Assert(io_buffers[0] == buffers[nblocks_done]);
io_pages[0] = BufferGetBlock(buffers[nblocks_done]);
io_buffers_len = 1;

/*
* How many neighboring-on-disk blocks can we scatter-read into other
* buffers at the same time? In this case we don't wait if we see an
* I/O already in progress. We already hold BM_IO_IN_PROGRESS for the
* I/O already in progress. We already set BM_IO_IN_PROGRESS for the
* head block, so we should get on with that I/O as soon as possible.
* We'll come back to this block again, above.
*/
while ((i + 1) < nblocks &&
WaitReadBuffersCanStartIO(buffers[i + 1], true))
for (int i = nblocks_done + 1; i < operation->nblocks; i++)
{
if (!ReadBuffersCanStartIO(buffers[i], true))
break;
/* Must be consecutive block numbers. */
Assert(BufferGetBlockNumber(buffers[i + 1]) ==
BufferGetBlockNumber(buffers[i]) + 1);
Assert(BufferGetBlockNumber(buffers[i - 1]) ==
BufferGetBlockNumber(buffers[i]) - 1);
Assert(io_buffers[io_buffers_len] == buffers[i]);

io_buffers[io_buffers_len] = buffers[++i];
io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
}

/* get a reference to wait for in WaitReadBuffers() */
pgaio_io_get_wref(ioh, &operation->io_wref);

/* provide the list of buffers to the completion callbacks */
pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);

pgaio_io_register_callbacks(ioh,
persistence == RELPERSISTENCE_TEMP ?
PGAIO_HCB_LOCAL_BUFFER_READV :
PGAIO_HCB_SHARED_BUFFER_READV,
flags);

pgaio_io_set_flag(ioh, ioh_flags);

/* ---
* Even though we're trying to issue IO asynchronously, track the time
* in smgrstartreadv():
* - if io_method == IOMETHOD_SYNC, we will always perform the IO
* immediately
* - the io method might not support the IO (e.g. worker IO for a temp
* table)
* ---
*/
io_start = pgstat_prepare_io_time(track_io_timing);
smgrreadv(operation->smgr, forknum, io_first_block, io_pages, io_buffers_len);
pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start,
1, io_buffers_len * BLCKSZ);

/* Verify each block we read, and terminate the I/O. */
for (int j = 0; j < io_buffers_len; ++j)
{
BufferDesc *bufHdr;
Block bufBlock;
int piv_flags;
bool verified;
bool checksum_failure;

if (persistence == RELPERSISTENCE_TEMP)
{
bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1);
bufBlock = LocalBufHdrGetBlock(bufHdr);
}
else
{
bufHdr = GetBufferDescriptor(io_buffers[j] - 1);
bufBlock = BufHdrGetBlock(bufHdr);
}

/* check for garbage data */
piv_flags = PIV_LOG_WARNING;
if (ignore_checksum_failure)
piv_flags |= PIV_IGNORE_CHECKSUM_FAILURE;
verified = PageIsVerified((Page) bufBlock, io_first_block + j,
piv_flags, &checksum_failure);
if (checksum_failure)
{
RelFileLocatorBackend rloc = operation->smgr->smgr_rlocator;

pgstat_prepare_report_checksum_failure(rloc.locator.dbOid);
pgstat_report_checksum_failures_in_db(rloc.locator.dbOid, 1);
}

if (!verified)
{
if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages)
{
ereport(WARNING,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("invalid page in block %u of relation %s; zeroing out page",
io_first_block + j,
relpath(operation->smgr->smgr_rlocator, forknum).str)));
memset(bufBlock, 0, BLCKSZ);
}
else
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("invalid page in block %u of relation %s",
io_first_block + j,
relpath(operation->smgr->smgr_rlocator, forknum).str)));
}

/* Set BM_VALID, terminate IO, and wake up any waiters */
if (persistence == RELPERSISTENCE_TEMP)
TerminateLocalBufferIO(bufHdr, false, BM_VALID, false);
else
TerminateBufferIO(bufHdr, false, BM_VALID, true, false);

/* Report I/Os as completing individually. */
TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
operation->smgr->smgr_rlocator.locator.spcOid,
operation->smgr->smgr_rlocator.locator.dbOid,
operation->smgr->smgr_rlocator.locator.relNumber,
operation->smgr->smgr_rlocator.backend,
false);
}
smgrstartreadv(ioh, operation->smgr, forknum,
blocknum + nblocks_done,
io_pages, io_buffers_len);
pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
io_start, 1, io_buffers_len * BLCKSZ);

if (persistence == RELPERSISTENCE_TEMP)
pgBufferUsage.local_blks_read += io_buffers_len;
else
pgBufferUsage.shared_blks_read += io_buffers_len;

/*
* Track vacuum cost when issuing IO, not after waiting for it.
* Otherwise we could end up issuing a lot of IO in a short timespan,
* despite a low cost limit.
*/
if (VacuumCostActive)
VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;

*nblocks_progress = io_buffers_len;
did_start_io = true;
}

return did_start_io;
}

/*
@@ -114,6 +114,9 @@ typedef struct BufferManagerRelation
#define READ_BUFFERS_ISSUE_ADVICE (1 << 1)
/* Don't treat page as invalid due to checksum failures. */
#define READ_BUFFERS_IGNORE_CHECKSUM_FAILURES (1 << 2)
/* IO will immediately be waited for */
#define READ_BUFFERS_SYNCHRONOUSLY (1 << 3)


struct ReadBuffersOperation
{
@@ -133,6 +136,9 @@ struct ReadBuffersOperation
BlockNumber blocknum;
int flags;
int16 nblocks;
int16 nblocks_done;
PgAioWaitRef io_wref;
PgAioReturn io_return;
};

typedef struct ReadBuffersOperation ReadBuffersOperation;
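To make the retry behavior of the reworked WaitReadBuffers() easier to follow, here is a small stand-alone model of the loop shown above. This is not PostgreSQL code; the types and functions are invented stand-ins for ReadBuffersOperation, AsyncReadBuffers() and WaitReadBuffers(), and the simulated partial reads merely illustrate why the loop may need several iterations:

    #include <stdbool.h>
    #include <stdio.h>

    /* invented stand-in for ReadBuffersOperation */
    typedef struct ToyOperation
    {
        int         nblocks;        /* blocks requested by the caller */
        int         nblocks_done;   /* blocks already known to be valid */
    } ToyOperation;

    /*
     * Invented stand-in for AsyncReadBuffers(): pretend each attempt
     * completes at most two of the remaining blocks, i.e. a partial read.
     */
    static bool
    toy_async_read_buffers(ToyOperation *op, int *nblocks_progress)
    {
        int         remaining = op->nblocks - op->nblocks_done;

        *nblocks_progress = remaining > 2 ? 2 : remaining;
        op->nblocks_done += *nblocks_progress;
        return *nblocks_progress > 0;
    }

    /* Invented stand-in for WaitReadBuffers(): retry until the operation is done. */
    static void
    toy_wait_read_buffers(ToyOperation *op)
    {
        while (op->nblocks_done < op->nblocks)
        {
            int         progress;

            toy_async_read_buffers(op, &progress);
            printf("retry loop: %d of %d blocks done\n",
                   op->nblocks_done, op->nblocks);
        }
    }

    int
    main(void)
    {
        ToyOperation op = {.nblocks = 5, .nblocks_done = 0};

        toy_wait_read_buffers(&op);
        return 0;
    }

In the real code each iteration additionally waits for the in-flight IO via pgaio_wref_wait() and folds the result into nblocks_done through ProcessReadBuffersResult().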