1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-25 13:17:41 +03:00

bufmgr: Implement AIO read support

This commit implements the infrastructure to perform asynchronous reads into
the buffer pool.

To do so, it:

- Adds readv AIO callbacks for shared and local buffers

  It may be worth calling out that shared buffer completions may be run in a
  different backend than where the IO started.

- Adds an AIO wait reference to BufferDesc, to allow backends to wait for
  in-progress asynchronous IOs

- Adapts StartBufferIO(), WaitIO(), TerminateBufferIO(), and their localbuf.c
  equivalents, to be able to deal with AIO

- Moves the code to handle BM_PIN_COUNT_WAITER into a helper function, as it
  now also needs to be called on IO completion

As of this commit, nothing issues AIO on shared/local buffers. A future commit
will update StartReadBuffers() to do so.

Buffer reads executed through this infrastructure will report invalid page /
checksum errors / warnings differently than before:

In the error case the error message will cover all the blocks that were
included in the read, rather than just reporting the first invalid
block. If more than one block is invalid, the error will include information
about the range of the read, the first invalid block and the number of invalid
pages, with a HINT towards the server log for per-block details.

For the warning case (i.e. zero_damaged_pages) we would previously emit one
warning message for each buffer in a multi-block read. Now there is only a
single warning message for the entire read, again referring to the server log
for more details in case of multiple checksum failures within a single larger
read.

Reviewed-by: Noah Misch <noah@leadboat.com>
Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Nazir Bilal Yavuz <byavuz81@gmail.com>
Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt
Discussion: https://postgr.es/m/20210223100344.llw5an2aklengrmn@alap3.anarazel.de
Discussion: https://postgr.es/m/stj36ea6yyhoxtqkhpieia2z4krnam7qyetc57rfezgk4zgapf@gcnactj4z56m
This commit is contained in:
Andres Freund
2025-03-30 17:28:03 -04:00
parent ef64fe26ba
commit 047cba7fa0
10 changed files with 885 additions and 66 deletions

View File

@@ -18,6 +18,7 @@
#include "miscadmin.h"
#include "storage/aio.h"
#include "storage/aio_internal.h"
#include "storage/bufmgr.h"
#include "storage/md.h"
@@ -40,6 +41,10 @@ static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb),
CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb),
CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_READV, aio_shared_buffer_readv_cb),
CALLBACK_ENTRY(PGAIO_HCB_LOCAL_BUFFER_READV, aio_local_buffer_readv_cb),
#undef CALLBACK_ENTRY
};

View File

@@ -147,9 +147,12 @@ in the buffer. It is used per the rules above.
* The BM_IO_IN_PROGRESS flag acts as a kind of lock, used to wait for I/O on a
buffer to complete (and in releases before 14, it was accompanied by a
per-buffer LWLock). The process doing a read or write sets the flag for the
duration, and processes that need to wait for it to be cleared sleep on a
condition variable.
per-buffer LWLock). The process starting a read or write sets the flag. When
the I/O is completed, be it by the process that initiated the I/O or by
another process, the flag is removed and the Buffer's condition variable is
signalled. Processes that need to wait for the I/O to complete can wait for
asynchronous I/O by using BufferDesc->io_wref and for BM_IO_IN_PROGRESS to be
unset by sleeping on the buffer's condition variable.
Normal Buffer Replacement Strategy

View File

@@ -14,6 +14,7 @@
*/
#include "postgres.h"
#include "storage/aio.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
@@ -125,6 +126,8 @@ BufferManagerShmemInit(void)
buf->buf_id = i;
pgaio_wref_clear(&buf->io_wref);
/*
* Initially link all the buffers together as unused. Subsequent
* management of this list is done by freelist.c.

View File

@@ -48,6 +48,7 @@
#include "pg_trace.h"
#include "pgstat.h"
#include "postmaster/bgwriter.h"
#include "storage/aio.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
@@ -519,7 +520,8 @@ static int SyncOneBuffer(int buf_id, bool skip_recently_used,
static void WaitIO(BufferDesc *buf);
static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
uint32 set_flag_bits, bool forget_owner);
uint32 set_flag_bits, bool forget_owner,
bool release_aio);
static void AbortBufferIO(Buffer buffer);
static void shared_buffer_write_error_callback(void *arg);
static void local_buffer_write_error_callback(void *arg);
@@ -1041,7 +1043,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
{
/* Simple case for non-shared buffers. */
bufHdr = GetLocalBufferDescriptor(-buffer - 1);
need_to_zero = StartLocalBufferIO(bufHdr, true);
need_to_zero = StartLocalBufferIO(bufHdr, true, false);
}
else
{
@@ -1077,9 +1079,9 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
/* Set BM_VALID, terminate IO, and wake up any waiters */
if (isLocalBuf)
TerminateLocalBufferIO(bufHdr, false, BM_VALID);
TerminateLocalBufferIO(bufHdr, false, BM_VALID, false);
else
TerminateBufferIO(bufHdr, false, BM_VALID, true);
TerminateBufferIO(bufHdr, false, BM_VALID, true, false);
}
else if (!isLocalBuf)
{
@@ -1454,7 +1456,8 @@ static inline bool
WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
{
if (BufferIsLocal(buffer))
return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1), true);
return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1),
true, nowait);
else
return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
}
@@ -1619,9 +1622,9 @@ WaitReadBuffers(ReadBuffersOperation *operation)
/* Set BM_VALID, terminate IO, and wake up any waiters */
if (persistence == RELPERSISTENCE_TEMP)
TerminateLocalBufferIO(bufHdr, false, BM_VALID);
TerminateLocalBufferIO(bufHdr, false, BM_VALID, false);
else
TerminateBufferIO(bufHdr, false, BM_VALID, true);
TerminateBufferIO(bufHdr, false, BM_VALID, true, false);
/* Report I/Os as completing individually. */
TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
@@ -1883,13 +1886,14 @@ retry:
}
/*
* We assume the only reason for it to be pinned is that someone else is
* flushing the page out. Wait for them to finish. (This could be an
* infinite loop if the refcount is messed up... it would be nice to time
* out after awhile, but there seems no way to be sure how many loops may
* be needed. Note that if the other guy has pinned the buffer but not
* yet done StartBufferIO, WaitIO will fall through and we'll effectively
* be busy-looping here.)
* We assume the reason for it to be pinned is that either we were
* asynchronously reading the page in before erroring out or someone else
* is flushing the page out. Wait for the IO to finish. (This could be
* an infinite loop if the refcount is messed up... it would be nice to
* time out after awhile, but there seems no way to be sure how many loops
* may be needed. Note that if the other guy has pinned the buffer but
* not yet done StartBufferIO, WaitIO will fall through and we'll
* effectively be busy-looping here.)
*/
if (BUF_STATE_GET_REFCOUNT(buf_state) != 0)
{
@@ -2529,7 +2533,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
if (lock)
LWLockAcquire(BufferDescriptorGetContentLock(buf_hdr), LW_EXCLUSIVE);
TerminateBufferIO(buf_hdr, false, BM_VALID, true);
TerminateBufferIO(buf_hdr, false, BM_VALID, true, false);
}
pgBufferUsage.shared_blks_written += extend_by;
@@ -2875,6 +2879,44 @@ PinBuffer_Locked(BufferDesc *buf)
ResourceOwnerRememberBuffer(CurrentResourceOwner, b);
}
/*
 * Wake the backend, if any, that is blocked waiting for a cleanup lock on
 * this buffer and has advertised that via BM_PIN_COUNT_WAITER.
 *
 * See LockBufferForCleanup().
 *
 * Callers are expected to invoke this right after dropping a pin from the
 * BufferDesc itself (as opposed to merely decrementing the backend-local
 * pin count for the buffer).
 */
static void
WakePinCountWaiter(BufferDesc *buf)
{
	bool		wake_waiter;
	int			waiter_pgprocno = 0;
	uint32		state;

	/*
	 * The waiter flag has to be re-checked with the buffer header lock held:
	 * another backend could have unpinned this buffer, and already woken up
	 * the waiter.
	 *
	 * The buffer cannot be replaced after the caller's unpin, as the waiter
	 * itself holds a pin. The waiter removes BM_PIN_COUNT_WAITER on its own
	 * if it stops waiting for a reason other than being woken up here.
	 */
	state = LockBufHdr(buf);

	wake_waiter = (state & BM_PIN_COUNT_WAITER) != 0 &&
		BUF_STATE_GET_REFCOUNT(state) == 1;

	if (wake_waiter)
	{
		/* the only pin left is the waiter's own */
		waiter_pgprocno = buf->wait_backend_pgprocno;
		state &= ~BM_PIN_COUNT_WAITER;
	}

	UnlockBufHdr(buf, state);

	/* signal only after the header lock has been released */
	if (wake_waiter)
		ProcSendSignal(waiter_pgprocno);
}
/*
* UnpinBuffer -- make buffer available for replacement.
*
@@ -2943,29 +2985,8 @@ UnpinBufferNoOwner(BufferDesc *buf)
/* Support LockBufferForCleanup() */
if (buf_state & BM_PIN_COUNT_WAITER)
{
/*
* Acquire the buffer header lock, re-check that there's a waiter.
* Another backend could have unpinned this buffer, and already
* woken up the waiter. There's no danger of the buffer being
* replaced after we unpinned it above, as it's pinned by the
* waiter.
*/
buf_state = LockBufHdr(buf);
WakePinCountWaiter(buf);
if ((buf_state & BM_PIN_COUNT_WAITER) &&
BUF_STATE_GET_REFCOUNT(buf_state) == 1)
{
/* we just released the last pin other than the waiter's */
int wait_backend_pgprocno = buf->wait_backend_pgprocno;
buf_state &= ~BM_PIN_COUNT_WAITER;
UnlockBufHdr(buf, buf_state);
ProcSendSignal(wait_backend_pgprocno);
}
else
UnlockBufHdr(buf, buf_state);
}
ForgetPrivateRefCountEntry(ref);
}
}
@@ -3986,7 +4007,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
* Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
* end the BM_IO_IN_PROGRESS state.
*/
TerminateBufferIO(buf, true, 0, true);
TerminateBufferIO(buf, true, 0, true, false);
TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(BufTagGetForkNum(&buf->tag),
buf->tag.blockNum,
@@ -5256,6 +5277,13 @@ LockBufferForCleanup(Buffer buffer)
CheckBufferIsPinnedOnce(buffer);
/*
* We do not yet need to be worried about in-progress AIOs holding a pin,
* as we, so far, only support doing reads via AIO and this function can
* only be called once the buffer is valid (i.e. no read can be in
* flight).
*/
/* Nobody else to wait for */
if (BufferIsLocal(buffer))
return;
@@ -5413,6 +5441,8 @@ ConditionalLockBufferForCleanup(Buffer buffer)
Assert(BufferIsValid(buffer));
/* see AIO related comment in LockBufferForCleanup() */
if (BufferIsLocal(buffer))
{
refcount = LocalRefCount[-buffer - 1];
@@ -5468,6 +5498,8 @@ IsBufferCleanupOK(Buffer buffer)
Assert(BufferIsValid(buffer));
/* see AIO related comment in LockBufferForCleanup() */
if (BufferIsLocal(buffer))
{
/* There should be exactly one pin */
@@ -5520,6 +5552,7 @@ WaitIO(BufferDesc *buf)
for (;;)
{
uint32 buf_state;
PgAioWaitRef iow;
/*
* It may not be necessary to acquire the spinlock to check the flag
@@ -5527,10 +5560,40 @@ WaitIO(BufferDesc *buf)
* play it safe.
*/
buf_state = LockBufHdr(buf);
/*
* Copy the wait reference while holding the spinlock. This protects
* against a concurrent TerminateBufferIO() in another backend from
* clearing the wref while it's being read.
*/
iow = buf->io_wref;
UnlockBufHdr(buf, buf_state);
/* no IO in progress, we don't need to wait */
if (!(buf_state & BM_IO_IN_PROGRESS))
break;
/*
* The buffer has asynchronous IO in progress, wait for it to
* complete.
*/
if (pgaio_wref_valid(&iow))
{
pgaio_wref_wait(&iow);
/*
* The AIO subsystem internally uses condition variables and thus
* might remove this backend from the BufferDesc's CV. While that
* wouldn't cause a correctness issue (the first CV sleep just
* immediately returns if not already registered), it seems worth
* avoiding unnecessary loop iterations, given that we take care
* to do so at the start of the function.
*/
ConditionVariablePrepareToSleep(cv);
continue;
}
/* wait on BufferDesc->cv, e.g. for concurrent synchronous IO */
ConditionVariableSleep(cv, WAIT_EVENT_BUFFER_IO);
}
ConditionVariableCancelSleep();
@@ -5539,13 +5602,12 @@ WaitIO(BufferDesc *buf)
/*
* StartBufferIO: begin I/O on this buffer
* (Assumptions)
* My process is executing no IO
* My process is executing no IO on this buffer
* The buffer is Pinned
*
* In some scenarios there are race conditions in which multiple backends
* could attempt the same I/O operation concurrently. If someone else
* has already started I/O on this buffer then we will block on the
* I/O condition variable until he's done.
* In some scenarios multiple backends could attempt the same I/O operation
* concurrently. If someone else has already started I/O on this buffer then
* we will wait for completion of the IO using WaitIO().
*
* Input operations are only attempted on buffers that are not BM_VALID,
* and output operations only on buffers that are BM_VALID and BM_DIRTY,
@@ -5581,9 +5643,9 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
/* Once we get here, there is definitely no I/O active on this buffer */
/* Check if someone else already did the I/O */
if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
{
/* someone else already did the I/O */
UnlockBufHdr(buf, buf_state);
return false;
}
@@ -5619,7 +5681,7 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
*/
static void
TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
bool forget_owner)
bool forget_owner, bool release_aio)
{
uint32 buf_state;
@@ -5634,6 +5696,14 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
if (clear_dirty && !(buf_state & BM_JUST_DIRTIED))
buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
if (release_aio)
{
/* release ownership by the AIO subsystem */
Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
buf_state -= BUF_REFCOUNT_ONE;
pgaio_wref_clear(&buf->io_wref);
}
buf_state |= set_flag_bits;
UnlockBufHdr(buf, buf_state);
@@ -5642,6 +5712,17 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
BufferDescriptorGetBuffer(buf));
ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf));
/*
* Support LockBufferForCleanup()
*
* We may have just released the last pin other than the waiter's. In most
* cases, this backend holds another pin on the buffer. But, if, for
* example, this backend is completing an IO issued by another backend, it
* may be time to wake the waiter.
*/
if (release_aio && (buf_state & BM_PIN_COUNT_WAITER))
WakePinCountWaiter(buf);
}
/*
@@ -5690,7 +5771,7 @@ AbortBufferIO(Buffer buffer)
}
}
TerminateBufferIO(buf_hdr, false, BM_IO_ERROR, false);
TerminateBufferIO(buf_hdr, false, BM_IO_ERROR, false, false);
}
/*
@@ -6141,3 +6222,671 @@ EvictUnpinnedBuffer(Buffer buf)
return result;
}
/*
* Generic implementation of the AIO handle staging callback for readv/writev
* on local/shared buffers.
*
* Each readv/writev can target multiple buffers. The buffers have already
* been registered with the IO handle.
*
* To make the IO ready for execution ("staging"), we need to ensure that the
* targeted buffers are in an appropriate state while the IO is ongoing. For
* that the AIO subsystem needs to have its own buffer pin, otherwise an error
* in this backend could lead to this backend's buffer pin being released as
* part of error handling, which in turn could lead to the buffer being
* replaced while IO is ongoing.
*
* Arguments:
*   ioh      - the IO handle being staged; its handle data is the array of
*              target buffers.
*   is_write - true for writev, false for readv. Selects which buffer state
*              is asserted and, for shared buffers, whether the content lock
*              is disowned.
*   is_temp  - true if the targets are local buffers, false if they are
*              shared buffers.
*/
static pg_attribute_always_inline void
buffer_stage_common(PgAioHandle *ioh, bool is_write, bool is_temp)
{
uint64 *io_data;
uint8 handle_data_len;
PgAioWaitRef io_ref;
BufferTag first PG_USED_FOR_ASSERTS_ONLY = {0};
io_data = pgaio_io_get_handle_data(ioh, &handle_data_len);
/* the same wait reference is stored into every targeted buffer below */
pgaio_io_get_wref(ioh, &io_ref);
/* iterate over all buffers affected by the vectored readv/writev */
for (int i = 0; i < handle_data_len; i++)
{
Buffer buffer = (Buffer) io_data[i];
BufferDesc *buf_hdr = is_temp ?
GetLocalBufferDescriptor(-buffer - 1)
: GetBufferDescriptor(buffer - 1);
uint32 buf_state;
/*
* Check that all the buffers are actually ones that could conceivably
* be done in one IO, i.e. are sequential. This is the last
* buffer-aware code before IO is actually executed and confusion
* about which buffers are targeted by IO can be hard to debug, making
* it worth doing extra-paranoid checks.
*/
if (i == 0)
first = buf_hdr->tag;
else
{
Assert(buf_hdr->tag.relNumber == first.relNumber);
Assert(buf_hdr->tag.blockNum == first.blockNum + i);
}
/*
* Local buffers have their state read without taking the header lock,
* shared buffers are locked for the duration of the update below.
*/
if (is_temp)
buf_state = pg_atomic_read_u32(&buf_hdr->state);
else
buf_state = LockBufHdr(buf_hdr);
/* verify the buffer is in the expected state */
Assert(buf_state & BM_TAG_VALID);
if (is_write)
{
Assert(buf_state & BM_VALID);
Assert(buf_state & BM_DIRTY);
}
else
{
Assert(!(buf_state & BM_VALID));
Assert(!(buf_state & BM_DIRTY));
}
/* temp buffers don't use BM_IO_IN_PROGRESS */
if (!is_temp)
Assert(buf_state & BM_IO_IN_PROGRESS);
/* the issuing backend must still hold a pin of its own */
Assert(BUF_STATE_GET_REFCOUNT(buf_state) >= 1);
/*
* Reflect that the buffer is now owned by the AIO subsystem.
*
* For local buffers: This can't be done just via LocalRefCount, as
* one might initially think, as this backend could error out while
* AIO is still in progress, releasing all the pins by the backend
* itself.
*
* This pin is released again in TerminateBufferIO().
*/
buf_state += BUF_REFCOUNT_ONE;
/* lets other backends wait for this IO via the buffer descriptor */
buf_hdr->io_wref = io_ref;
if (is_temp)
pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
else
UnlockBufHdr(buf_hdr, buf_state);
/*
* Ensure the content lock that prevents buffer modifications while
* the buffer is being written out is not released early due to an
* error.
*/
if (is_write && !is_temp)
{
LWLock *content_lock;
content_lock = BufferDescriptorGetContentLock(buf_hdr);
Assert(LWLockHeldByMe(content_lock));
/*
* Lock is now owned by AIO subsystem.
*/
LWLockDisown(content_lock);
}
/*
* Stop tracking this buffer via the resowner - the AIO system now
* keeps track.
*/
if (!is_temp)
ResourceOwnerForgetBufferIO(CurrentResourceOwner, buffer);
}
}
/*
 * Unpack the readv error details that buffer_readv_encode_error() stored in
 * result.error_data.
 *
 * Layout, starting at the least significant bit: one bit for "zeroed_any",
 * one bit for "ignored_any", then three fields of READV_COUNT_BITS bits each
 * holding the zeroed-or-error count, the checksum failure count and the
 * first affected offset.
 *
 * NB: The READV_COUNT_* macros defined in here deliberately stay defined
 * after this function; buffer_readv_encode_error() uses and #undef's them.
 */
static inline void
buffer_readv_decode_error(PgAioResult result,
						  bool *zeroed_any,
						  bool *ignored_any,
						  uint8 *zeroed_or_error_count,
						  uint8 *checkfail_count,
						  uint8 *first_off)
{
	const uint32 packed = result.error_data;

	/* see static asserts in buffer_readv_encode_error */
#define READV_COUNT_BITS	7
#define READV_COUNT_MASK	((1 << READV_COUNT_BITS) - 1)

	/* the two flag bits */
	*zeroed_any = (packed & 1) != 0;
	*ignored_any = ((packed >> 1) & 1) != 0;

	/* the three equally sized numeric fields following the flags */
	*zeroed_or_error_count =
		(packed >> 2) & READV_COUNT_MASK;
	*checkfail_count =
		(packed >> (2 + READV_COUNT_BITS)) & READV_COUNT_MASK;
	*first_off =
		(packed >> (2 + 2 * READV_COUNT_BITS)) & READV_COUNT_MASK;
}
/*
 * Pack the outcome of page verification into *result, for use by
 * buffer_readv_complete().
 *
 * result->error_data is laid out as follows, least significant bit first:
 * - 1 bit: whether any page was zeroed
 * - 1 bit: whether any checksum failure was ignored
 * - READV_COUNT_BITS bits: number of errored pages or, if none errored,
 *   number of zeroed pages
 * - READV_COUNT_BITS bits: number of checksum failures
 * - READV_COUNT_BITS bits: offset of the first errored page; if none
 *   errored, of the first zeroed page; if none was zeroed either, of the
 *   first page with an ignored checksum failure
 *
 * result->id is set to the local or shared buffer readv callback depending
 * on is_temp, result->status to PGAIO_RS_ERROR if error_count > 0 and to
 * PGAIO_RS_WARNING otherwise.
 *
 * NOTE(review): the first static assertion accepts PG_IOV_MAX being equal to
 * 1 << READV_COUNT_BITS. Offsets fit in that case, but a count of exactly
 * that value would need one more bit - confirm that counts cannot get that
 * large.
 */
static inline void
buffer_readv_encode_error(PgAioResult *result,
						  bool is_temp,
						  bool zeroed_any,
						  bool ignored_any,
						  uint8 error_count,
						  uint8 zeroed_count,
						  uint8 checkfail_count,
						  uint8 first_error_off,
						  uint8 first_zeroed_off,
						  uint8 first_ignored_off)
{
	uint32		packed = 0;
	uint8		zeroed_or_error_count;
	uint8		first_off;

	StaticAssertStmt(PG_IOV_MAX <= 1 << READV_COUNT_BITS,
					 "PG_IOV_MAX is bigger than reserved space for error data");
	StaticAssertStmt((1 + 1 + 3 * READV_COUNT_BITS) <= PGAIO_RESULT_ERROR_BITS,
					 "PGAIO_RESULT_ERROR_BITS is insufficient for buffer_readv");

	/*
	 * There is room for just one count and one offset besides the checksum
	 * failure count. That suffices: an error is more interesting than a
	 * zeroed page, which in turn is more interesting than an ignored
	 * checksum failure.
	 */
	if (error_count > 0)
	{
		zeroed_or_error_count = error_count;
		first_off = first_error_off;
	}
	else
	{
		zeroed_or_error_count = zeroed_count;
		first_off = zeroed_count > 0 ? first_zeroed_off : first_ignored_off;
	}

	Assert(!zeroed_any || error_count == 0);

	/* assemble all fields, then store them in one go */
	packed |= (uint32) zeroed_any;
	packed |= ((uint32) ignored_any) << 1;
	packed |= ((uint32) zeroed_or_error_count) << 2;
	packed |= ((uint32) checkfail_count) << (2 + READV_COUNT_BITS);
	packed |= ((uint32) first_off) << (2 + 2 * READV_COUNT_BITS);
	result->error_data = packed;

	if (is_temp)
		result->id = PGAIO_HCB_LOCAL_BUFFER_READV;
	else
		result->id = PGAIO_HCB_SHARED_BUFFER_READV;

	result->status = error_count > 0 ? PGAIO_RS_ERROR : PGAIO_RS_WARNING;

	/*
	 * Decode what was just encoded and compare against the inputs; the
	 * encoding is intricate enough to make that worthwhile.
	 */
#ifdef USE_ASSERT_CHECKING
	{
		bool		dec_zeroed_any,
					dec_ignored_any;
		uint8		dec_zeroed_or_error_count,
					dec_checkfail_count,
					dec_first_off;

		buffer_readv_decode_error(*result,
								  &dec_zeroed_any, &dec_ignored_any,
								  &dec_zeroed_or_error_count,
								  &dec_checkfail_count,
								  &dec_first_off);
		Assert(zeroed_any == dec_zeroed_any);
		Assert(ignored_any == dec_ignored_any);
		Assert(zeroed_or_error_count == dec_zeroed_or_error_count);
		Assert(checkfail_count == dec_checkfail_count);
		Assert(first_off == dec_first_off);
	}
#endif

#undef READV_COUNT_BITS
#undef READV_COUNT_MASK
}
/*
* Helper for AIO readv completion callbacks, supporting both shared and temp
* buffers. Gets called once for each buffer in a multi-page read.
*
* Arguments:
*   td      - target data of the IO, used when logging verification problems
*   buf_off - offset of this buffer within the read
*   buffer  - the buffer being completed
*   flags   - READ_BUFFERS_* flags the read was started with
*   failed  - true if the underlying IO did not successfully read this
*             buffer; page verification is skipped in that case
*   is_temp - true for local buffers, false for shared buffers
*
* Output arguments (all reset to false first):
*   *buffer_invalid   - page failed verification and was not zeroed
*   *failed_checksum  - as reported by PageIsVerified()
*   *ignored_checksum - page passed verification although
*                       *failed_checksum was reported
*   *zeroed_buffer    - page failed verification and was zeroed
*/
static pg_attribute_always_inline void
buffer_readv_complete_one(PgAioTargetData *td, uint8 buf_off, Buffer buffer,
uint8 flags, bool failed, bool is_temp,
bool *buffer_invalid,
bool *failed_checksum,
bool *ignored_checksum,
bool *zeroed_buffer)
{
BufferDesc *buf_hdr = is_temp ?
GetLocalBufferDescriptor(-buffer - 1)
: GetBufferDescriptor(buffer - 1);
BufferTag tag = buf_hdr->tag;
char *bufdata = BufferGetBlock(buffer);
uint32 set_flag_bits;
int piv_flags;
/* check that the buffer is in the expected state for a read */
#ifdef USE_ASSERT_CHECKING
{
uint32 buf_state = pg_atomic_read_u32(&buf_hdr->state);
Assert(buf_state & BM_TAG_VALID);
Assert(!(buf_state & BM_VALID));
/* temp buffers don't use BM_IO_IN_PROGRESS */
if (!is_temp)
Assert(buf_state & BM_IO_IN_PROGRESS);
Assert(!(buf_state & BM_DIRTY));
}
#endif
*buffer_invalid = false;
*failed_checksum = false;
*ignored_checksum = false;
*zeroed_buffer = false;
/*
* We ask PageIsVerified() to only log the message about checksum errors,
* as the completion might be run in any backend (or IO workers). We will
* report checksum errors in buffer_readv_report().
*/
piv_flags = PIV_LOG_LOG;
/* the local zero_damaged_pages may differ from the definer's */
if (flags & READ_BUFFERS_IGNORE_CHECKSUM_FAILURES)
piv_flags |= PIV_IGNORE_CHECKSUM_FAILURE;
/* Check for garbage data. */
if (!failed)
{
PgAioResult result_one;
if (!PageIsVerified((Page) bufdata, tag.blockNum, piv_flags,
failed_checksum))
{
if (flags & READ_BUFFERS_ZERO_ON_ERROR)
{
memset(bufdata, 0, BLCKSZ);
*zeroed_buffer = true;
}
else
{
*buffer_invalid = true;
/* mark buffer as having failed */
failed = true;
}
}
else if (*failed_checksum)
*ignored_checksum = true;
/*
* Immediately log a message about the invalid page, but only to the
* server log. One reason to do so immediately is that this may be
* executed in a different backend than the one that originated the
* request. Another is that the originator might not process the
* query result immediately (because it is busy doing another part of
* query processing) or at all (e.g. if it was cancelled or errored
* out due to another IO also failing). The definer of the IO will
* emit an ERROR or WARNING when processing the IO's results.
*
* To avoid duplicating the code to emit these log messages, we reuse
* buffer_readv_report().
*/
if (*buffer_invalid || *failed_checksum || *zeroed_buffer)
{
/*
* Encode a result describing just this one page: each count is 0
* or 1 and every offset is this buffer's offset.
*/
buffer_readv_encode_error(&result_one, is_temp,
*zeroed_buffer,
*ignored_checksum,
*buffer_invalid,
*zeroed_buffer ? 1 : 0,
*failed_checksum ? 1 : 0,
buf_off, buf_off, buf_off);
pgaio_result_report(result_one, td, LOG_SERVER_ONLY);
}
}
/* Terminate I/O, setting BM_VALID on success and BM_IO_ERROR on failure. */
set_flag_bits = failed ? BM_IO_ERROR : BM_VALID;
if (is_temp)
TerminateLocalBufferIO(buf_hdr, false, set_flag_bits, true);
else
TerminateBufferIO(buf_hdr, false, set_flag_bits, false, true);
/*
* Call the BUFFER_READ_DONE tracepoint in the callback, even though the
* callback may not be executed in the same backend that called
* BUFFER_READ_START. The alternative would be to defer calling the
* tracepoint to a later point (e.g. the local completion callback for
* shared buffer reads), which seems even less helpful.
*/
TRACE_POSTGRESQL_BUFFER_READ_DONE(tag.forkNum,
tag.blockNum,
tag.spcOid,
tag.dbOid,
tag.relNumber,
is_temp ? MyProcNumber : INVALID_PROC_NUMBER,
false);
}
/*
 * Completion handling for one AIO read, which may span several blocks /
 * buffers.
 *
 * Each targeted buffer is handed to buffer_readv_complete_one(); the
 * per-buffer outcomes are then aggregated. If the lower-level read did not
 * fail outright but at least one page was invalid, zeroed or had an ignored
 * checksum failure, the returned result is replaced by an encoded
 * description of that (see buffer_readv_encode_error()). Otherwise
 * prior_result is returned unchanged.
 *
 * cb_data carries the READ_BUFFERS_* flags of the read. The implementation
 * is shared between shared and local buffers to reduce code duplication.
 */
static pg_attribute_always_inline PgAioResult
buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
					  uint8 cb_data, bool is_temp)
{
	PgAioResult result = prior_result;
	PgAioTargetData *td = pgaio_io_get_target_data(ioh);
	uint64	   *buffers;
	uint8		nbuffers;
	uint8		n_errors = 0;
	uint8		n_zeroed = 0;
	uint8		n_ignored = 0;
	uint8		n_checkfail = 0;
	uint8		first_error_off = 0;
	uint8		first_zeroed_off = 0;
	uint8		first_ignored_off = 0;
	bool		verification_issue;

	if (!is_temp)
		Assert(!td->smgr.is_temp);
	else
	{
		Assert(td->smgr.is_temp);
		Assert(pgaio_io_get_owner(ioh) == MyProcNumber);
	}

	/*
	 * Run the per-buffer completion function for every buffer that was
	 * targeted by this IO.
	 */
	buffers = pgaio_io_get_handle_data(ioh, &nbuffers);
	for (uint8 buf_off = 0; buf_off < nbuffers; buf_off++)
	{
		Buffer		buf = buffers[buf_off];
		bool		io_failed = true;
		bool		page_invalid = false;
		bool		checksum_failed = false;
		bool		checksum_ignored = false;
		bool		page_zeroed = false;

		Assert(BufferIsValid(buf));

		/*
		 * A failure of the whole I/O at a lower level means every buffer has
		 * to be marked as failed. With a partial read only the buffers past
		 * the successfully read ones are failed; the first few may be ok.
		 */
		if (prior_result.status != PGAIO_RS_ERROR &&
			prior_result.result > buf_off)
			io_failed = false;

		buffer_readv_complete_one(td, buf_off, buf, cb_data, io_failed, is_temp,
								  &page_invalid,
								  &checksum_failed,
								  &checksum_ignored,
								  &page_zeroed);

		/*
		 * Several pages of one IO can fail verification, so keep a tally
		 * per kind of problem and remember where each kind occurred first.
		 */
		if (page_invalid && !page_zeroed)
		{
			if (n_errors == 0)
				first_error_off = buf_off;
			n_errors++;
		}

		if (page_zeroed)
		{
			if (n_zeroed == 0)
				first_zeroed_off = buf_off;
			n_zeroed++;
		}

		if (checksum_ignored)
		{
			if (n_ignored == 0)
				first_ignored_off = buf_off;
			n_ignored++;
		}

		if (checksum_failed)
			n_checkfail++;
	}

	/*
	 * The smgr level read succeeded, at least partially, but page
	 * verification found problems: make the IO's result reflect that.
	 */
	verification_issue = n_errors > 0 || n_ignored > 0 || n_zeroed > 0;
	if (prior_result.status != PGAIO_RS_ERROR && verification_issue)
	{
		buffer_readv_encode_error(&result, is_temp,
								  n_zeroed > 0, n_ignored > 0,
								  n_errors, n_zeroed, n_checkfail,
								  first_error_off, first_zeroed_off,
								  first_ignored_off);
		pgaio_result_report(result, td, DEBUG1);
	}

	/*
	 * Checksum failure stats for shared relations are instead reported by
	 * shared_buffer_readv_complete_local().
	 */
	if (is_temp && n_checkfail > 0)
		pgstat_report_checksum_failures_in_db(td->smgr.rlocator.dbOid,
											  n_checkfail);

	return result;
}
/*
* AIO error reporting callback for aio_shared_buffer_readv_cb and
* aio_local_buffer_readv_cb.
*
* The error is encoded / decoded in buffer_readv_encode_error() /
* buffer_readv_decode_error().
*
* Arguments:
*   result - the IO's result, carrying the encoded error details
*   td     - target data of the IO, supplying relation, fork and block range
*   elevel - error level passed through to ereport()
*/
static void
buffer_readv_report(PgAioResult result, const PgAioTargetData *td,
int elevel)
{
int nblocks = td->smgr.nblocks;
BlockNumber first = td->smgr.blockNum;
BlockNumber last = first + nblocks - 1;
ProcNumber errProc =
td->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER;
RelPathStr rpath =
relpathbackend(td->smgr.rlocator, errProc, td->smgr.forkNum);
bool zeroed_any,
ignored_any;
uint8 zeroed_or_error_count,
checkfail_count,
first_off;
uint8 affected_count;
const char *msg_one,
*msg_mult,
*det_mult,
*hint_mult;
buffer_readv_decode_error(result, &zeroed_any, &ignored_any,
&zeroed_or_error_count,
&checkfail_count,
&first_off);
/*
* Treat a read that had both zeroed buffers *and* ignored checksums as a
* special case, it's too irregular to be emitted the same way as the
* other cases.
*/
if (zeroed_any && ignored_any)
{
/* NOTE(review): this assertion merely restates the enclosing condition */
Assert(zeroed_any && ignored_any);
Assert(nblocks > 1); /* same block can't be both zeroed and ignored */
Assert(result.status != PGAIO_RS_ERROR);
/* no errors here, so the shared count field holds the zeroed pages */
affected_count = zeroed_or_error_count;
ereport(elevel,
errcode(ERRCODE_DATA_CORRUPTED),
errmsg("zeroing %u page(s) and ignoring %u checksum failure(s) among blocks %u..%u of relation %s",
affected_count, checkfail_count, first, last, rpath.str),
affected_count > 1 ?
errdetail("Block %u held first zeroed page.",
first + first_off) : 0,
errhint("See server log for details about the other %u invalid block(s).",
affected_count + checkfail_count - 1));
return;
}
/*
* The other messages are highly repetitive. To avoid duplicating a long
* and complicated ereport(), gather the translated format strings
* separately and then do one common ereport.
*/
if (result.status == PGAIO_RS_ERROR)
{
Assert(!zeroed_any); /* can't have invalid pages when zeroing them */
affected_count = zeroed_or_error_count;
msg_one = _("invalid page in block %u of relation %s");
msg_mult = _("%u invalid pages among blocks %u..%u of relation %s");
det_mult = _("Block %u held first invalid page.");
hint_mult = _("See server log for the other %u invalid block(s).");
}
else if (zeroed_any && !ignored_any)
{
affected_count = zeroed_or_error_count;
msg_one = _("invalid page in block %u of relation %s; zeroing out page");
msg_mult = _("zeroing out %u invalid pages among blocks %u..%u of relation %s");
det_mult = _("Block %u held first zeroed page.");
hint_mult = _("See server log for the other %u zeroed block(s).");
}
else if (!zeroed_any && ignored_any)
{
affected_count = checkfail_count;
msg_one = _("ignoring checksum failure in block %u of relation %s");
msg_mult = _("ignoring %u checksum failures among blocks %u..%u of relation %s");
det_mult = _("Block %u held first ignored page.");
hint_mult = _("See server log for the other %u ignored block(s).");
}
else
pg_unreachable();
/*
* The format strings were already translated above, hence the *_internal
* variants. Detail and hint are only attached if more than one block is
* affected.
*/
ereport(elevel,
errcode(ERRCODE_DATA_CORRUPTED),
affected_count == 1 ?
errmsg_internal(msg_one, first + first_off, rpath.str) :
errmsg_internal(msg_mult, affected_count, first, last, rpath.str),
affected_count > 1 ? errdetail_internal(det_mult, first + first_off) : 0,
affected_count > 1 ? errhint_internal(hint_mult, affected_count - 1) : 0);
}
/*
 * AIO staging callback for reads into shared buffers. The callback data is
 * not needed for staging.
 */
static void
shared_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data)
{
	buffer_stage_common(ioh,
						false,	/* is_write */
						false); /* is_temp */
}
/*
 * AIO completion callback for reads into shared buffers; defers to the
 * implementation shared with local buffers.
 */
static PgAioResult
shared_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
							 uint8 cb_data)
{
	return buffer_readv_complete(ioh, prior_result, cb_data,
								 false);	/* is_temp */
}
/*
 * Backend-local completion callback for reads into shared buffers.
 *
 * Its sole purpose is to report checksum failures to pgstat. That can only
 * be done safely by a backend that has previously called
 * pgstat_prepare_report_checksum_failure(), which can only be guaranteed
 * for the backend that started the IO - hence a local callback.
 *
 * The result is always passed through unmodified.
 */
static PgAioResult
shared_buffer_readv_complete_local(PgAioHandle *ioh, PgAioResult prior_result,
								   uint8 cb_data)
{
	/* only a non-OK result carries encoded details worth decoding */
	if (prior_result.status != PGAIO_RS_OK)
	{
		bool		any_zeroed,
					any_ignored;
		uint8		n_zeroed_or_error,
					n_checkfail,
					first_affected_off;

		buffer_readv_decode_error(prior_result,
								  &any_zeroed,
								  &any_ignored,
								  &n_zeroed_or_error,
								  &n_checkfail,
								  &first_affected_off);

		if (n_checkfail > 0)
		{
			PgAioTargetData *td = pgaio_io_get_target_data(ioh);

			pgstat_report_checksum_failures_in_db(td->smgr.rlocator.dbOid,
												  n_checkfail);
		}
	}

	return prior_result;
}
/*
 * Stage callback for readv IO on local buffers.
 *
 * Simply forwards the IO handle to buffer_stage_common(); cb_data is not
 * consulted here.
 */
static void
local_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data)
{
	/* this callback serves local buffers */
	const bool	for_local_buffer = true;

	/*
	 * NOTE(review): the middle argument presumably selects write staging,
	 * which would not apply to a read -- confirm against
	 * buffer_stage_common().
	 */
	buffer_stage_common(ioh, false, for_local_buffer);
}
/*
 * Completion callback for readv IO on local buffers.
 *
 * Hands the handle, the prior result and the callback data to
 * buffer_readv_complete() and returns whatever that produces.
 */
static PgAioResult
local_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
							uint8 cb_data)
{
	/* this callback serves local buffers */
	const bool	for_local_buffer = true;
	PgAioResult result;

	result = buffer_readv_complete(ioh, prior_result, cb_data,
								   for_local_buffer);

	return result;
}
/*
 * AIO callbacks for readv IO on shared buffers.
 *
 * The readv callbacks receive READ_BUFFERS_* flags as their callback data.
 */
const PgAioHandleCallbacks aio_shared_buffer_readv_cb = {
	.report = buffer_readv_report,

	/* checksum failures have to be reported from a local callback */
	.complete_local = shared_buffer_readv_complete_local,

	.complete_shared = shared_buffer_readv_complete,
	.stage = shared_buffer_readv_stage,
};
/*
 * AIO callbacks for readv IO on local buffers.
 *
 * The readv callbacks receive READ_BUFFERS_* flags as their callback data.
 */
const PgAioHandleCallbacks aio_local_buffer_readv_cb = {
	.report = buffer_readv_report,

	/*
	 * Unlike the shared_buffers case, completion is handled via
	 * complete_local, because only the issuing backend has access to the
	 * required data structures. This is important in case the IO completion
	 * may be consumed incidentally by another backend.
	 */
	.complete_local = local_buffer_readv_complete,

	.stage = local_buffer_readv_stage,
};

View File

@@ -18,6 +18,7 @@
#include "access/parallel.h"
#include "executor/instrument.h"
#include "pgstat.h"
#include "storage/aio.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
@@ -187,7 +188,7 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
* Try to start an I/O operation. There currently are no reasons for
* StartLocalBufferIO to return false, so we raise an error in that case.
*/
if (!StartLocalBufferIO(bufHdr, false))
if (!StartLocalBufferIO(bufHdr, false, false))
elog(ERROR, "failed to start write IO on local buffer");
/* Find smgr relation for buffer */
@@ -211,7 +212,7 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
IOOP_WRITE, io_start, 1, BLCKSZ);
/* Mark not-dirty */
TerminateLocalBufferIO(bufHdr, true, 0);
TerminateLocalBufferIO(bufHdr, true, 0, false);
pgBufferUsage.local_blks_written++;
}
@@ -430,7 +431,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
/* no need to loop for local buffers */
StartLocalBufferIO(existing_hdr, true);
StartLocalBufferIO(existing_hdr, true, false);
}
else
{
@@ -446,7 +447,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
hresult->id = victim_buf_id;
StartLocalBufferIO(victim_buf_hdr, true);
StartLocalBufferIO(victim_buf_hdr, true, false);
}
}
@@ -515,13 +516,31 @@ MarkLocalBufferDirty(Buffer buffer)
* Like StartBufferIO, but for local buffers
*/
bool
StartLocalBufferIO(BufferDesc *bufHdr, bool forInput)
StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
{
uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
uint32 buf_state;
/*
* With AIO the buffer could have IO in progress, e.g. when there are two
* scans of the same relation. Either wait for the other IO or return
* false.
*/
if (pgaio_wref_valid(&bufHdr->io_wref))
{
PgAioWaitRef iow = bufHdr->io_wref;
if (nowait)
return false;
pgaio_wref_wait(&iow);
}
/* Once we get here, there is definitely no I/O active on this buffer */
/* Check if someone else already did the I/O */
buf_state = pg_atomic_read_u32(&bufHdr->state);
if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
{
/* someone else already did the I/O */
return false;
}
@@ -536,7 +555,8 @@ StartLocalBufferIO(BufferDesc *bufHdr, bool forInput)
* Like TerminateBufferIO, but for local buffers
*/
void
TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits)
TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits,
bool release_aio)
{
/* Only need to adjust flags */
uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
@@ -549,12 +569,22 @@ TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bit
if (clear_dirty)
buf_state &= ~BM_DIRTY;
if (release_aio)
{
/* release pin held by IO subsystem, see also buffer_stage_common() */
Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
buf_state -= BUF_REFCOUNT_ONE;
pgaio_wref_clear(&bufHdr->io_wref);
}
buf_state |= set_flag_bits;
pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
/* local buffers don't track IO using resowners */
/* local buffers don't use the IO CV, as no other process can see buffer */
/* local buffers don't use BM_PIN_COUNT_WAITER, so no need to wake */
}
/*
@@ -575,6 +605,19 @@ InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
uint32 buf_state;
LocalBufferLookupEnt *hresult;
/*
* It's possible that we started IO on this buffer before e.g. aborting
* the transaction that created a table. We need to wait for that IO to
* complete before removing / reusing the buffer.
*/
if (pgaio_wref_valid(&bufHdr->io_wref))
{
PgAioWaitRef iow = bufHdr->io_wref;
pgaio_wref_wait(&iow);
Assert(!pgaio_wref_valid(&bufHdr->io_wref));
}
buf_state = pg_atomic_read_u32(&bufHdr->state);
/*
@@ -714,6 +757,8 @@ InitLocalBuffers(void)
*/
buf->buf_id = -i - 2;
pgaio_wref_clear(&buf->io_wref);
/*
* Intentionally do not initialize the buffer's atomic variable
* (besides zeroing the underlying memory above). That way we get

View File

@@ -78,8 +78,8 @@ PageInit(Page page, Size pageSize, Size specialSize)
* treat such a page as empty and without free space. Eventually, VACUUM
* will clean up such a page and make it usable.
*
* If flag PIV_LOG_WARNING is set, a WARNING is logged in the event of
* a checksum failure.
* If flag PIV_LOG_WARNING/PIV_LOG_LOG is set, a WARNING/LOG message is logged
* in the event of a checksum failure.
*
* If flag PIV_IGNORE_CHECKSUM_FAILURE is set, checksum failures will cause a
* message about the failure to be emitted, but will not cause
@@ -143,13 +143,13 @@ PageIsVerified(PageData *page, BlockNumber blkno, int flags, bool *checksum_fail
return true;
/*
* Throw a WARNING if the checksum fails, but only after we've checked for
* the all-zeroes case.
* Throw a WARNING/LOG, as instructed by PIV_LOG_*, if the checksum fails,
* but only after we've checked for the all-zeroes case.
*/
if (checksum_failure)
{
if ((flags & PIV_LOG_WARNING) != 0)
ereport(WARNING,
if ((flags & (PIV_LOG_WARNING | PIV_LOG_LOG)) != 0)
ereport(flags & PIV_LOG_WARNING ? WARNING : LOG,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("page verification failed, calculated checksum %u but expected %u",
checksum, p->pd_checksum)));

View File

@@ -194,9 +194,13 @@ typedef enum PgAioHandleCallbackID
PGAIO_HCB_INVALID = 0,
PGAIO_HCB_MD_READV,
PGAIO_HCB_SHARED_BUFFER_READV,
PGAIO_HCB_LOCAL_BUFFER_READV,
} PgAioHandleCallbackID;
#define PGAIO_HCB_MAX PGAIO_HCB_MD_READV
#define PGAIO_HCB_MAX PGAIO_HCB_LOCAL_BUFFER_READV
StaticAssertDecl(PGAIO_HCB_MAX <= (1 << PGAIO_RESULT_ID_BITS),
"PGAIO_HCB_MAX is too big for PGAIO_RESULT_ID_BITS");

View File

@@ -17,6 +17,7 @@
#include "pgstat.h"
#include "port/atomics.h"
#include "storage/aio_types.h"
#include "storage/buf.h"
#include "storage/bufmgr.h"
#include "storage/condition_variable.h"
@@ -264,6 +265,8 @@ typedef struct BufferDesc
int wait_backend_pgprocno; /* backend of pin-count waiter */
int freeNext; /* link in freelist chain */
PgAioWaitRef io_wref; /* set iff AIO is in progress */
LWLock content_lock; /* to lock access to buffer contents */
} BufferDesc;
@@ -472,8 +475,8 @@ extern BlockNumber ExtendBufferedRelLocal(BufferManagerRelation bmr,
uint32 *extended_by);
extern void MarkLocalBufferDirty(Buffer buffer);
extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty,
uint32 set_flag_bits);
extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput);
uint32 set_flag_bits, bool release_aio);
extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait);
extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln);
extern void DropRelationLocalBuffers(RelFileLocator rlocator,
ForkNumber forkNum,

View File

@@ -15,6 +15,7 @@
#define BUFMGR_H
#include "port/pg_iovec.h"
#include "storage/aio_types.h"
#include "storage/block.h"
#include "storage/buf.h"
#include "storage/bufpage.h"
@@ -111,6 +112,8 @@ typedef struct BufferManagerRelation
#define READ_BUFFERS_ZERO_ON_ERROR (1 << 0)
/* Call smgrprefetch() if I/O necessary. */
#define READ_BUFFERS_ISSUE_ADVICE (1 << 1)
/* Don't treat page as invalid due to checksum failures. */
#define READ_BUFFERS_IGNORE_CHECKSUM_FAILURES (1 << 2)
struct ReadBuffersOperation
{
@@ -170,6 +173,9 @@ extern PGDLLIMPORT int checkpoint_flush_after;
extern PGDLLIMPORT int backend_flush_after;
extern PGDLLIMPORT int bgwriter_flush_after;
extern const PgAioHandleCallbacks aio_shared_buffer_readv_cb;
extern const PgAioHandleCallbacks aio_local_buffer_readv_cb;
/* in buf_init.c */
extern PGDLLIMPORT char *BufferBlocks;

View File

@@ -467,6 +467,7 @@ do { \
/* flags for PageIsVerified() */
#define PIV_LOG_WARNING (1 << 0)
#define PIV_LOG_LOG (1 << 1)
#define PIV_IGNORE_CHECKSUM_FAILURE (1 << 2)
#define PageAddItem(page, item, size, offsetNumber, overwrite, is_heap) \