mirror of
https://github.com/postgres/postgres.git
synced 2025-07-31 22:04:40 +03:00
bufmgr: Support multiple in-progress IOs by using resowner
A future patch will add support for extending relations by multiple blocks at once. To be concurrency safe, the buffers for those blocks need to be marked as BM_IO_IN_PROGRESS. Until now we only had infrastructure for recovering from an IO error for a single buffer. This commit extends that infrastructure to multiple buffers by using the resource owner infrastructure. This commit increases the size of the ResourceOwnerData struct, which appears to have a just about measurable overhead in very extreme workloads. Medium term we are planning to substantially shrink the size of ResourceOwnerData. Short term the increase is small enough to not worry about it for now. Reviewed-by: Melanie Plageman <melanieplageman@gmail.com> Discussion: https://postgr.es/m/20221029025420.eplyow6k7tgu6he3@awork3.anarazel.de Discussion: https://postgr.es/m/20221029200025.w7bvlgvamjfo6z44@awork3.anarazel.de
This commit is contained in:
@ -159,10 +159,6 @@ int checkpoint_flush_after = DEFAULT_CHECKPOINT_FLUSH_AFTER;
|
||||
int bgwriter_flush_after = DEFAULT_BGWRITER_FLUSH_AFTER;
|
||||
int backend_flush_after = DEFAULT_BACKEND_FLUSH_AFTER;
|
||||
|
||||
/* local state for StartBufferIO and related functions */
|
||||
static BufferDesc *InProgressBuf = NULL;
|
||||
static bool IsForInput;
|
||||
|
||||
/* local state for LockBufferForCleanup */
|
||||
static BufferDesc *PinCountWaitBuf = NULL;
|
||||
|
||||
@ -2705,7 +2701,6 @@ InitBufferPoolAccess(void)
|
||||
static void
|
||||
AtProcExit_Buffers(int code, Datum arg)
|
||||
{
|
||||
AbortBufferIO();
|
||||
UnlockBuffers();
|
||||
|
||||
CheckForBufferLeaks();
|
||||
@ -4647,7 +4642,7 @@ StartBufferIO(BufferDesc *buf, bool forInput)
|
||||
{
|
||||
uint32 buf_state;
|
||||
|
||||
Assert(!InProgressBuf);
|
||||
ResourceOwnerEnlargeBufferIOs(CurrentResourceOwner);
|
||||
|
||||
for (;;)
|
||||
{
|
||||
@ -4671,8 +4666,8 @@ StartBufferIO(BufferDesc *buf, bool forInput)
|
||||
buf_state |= BM_IO_IN_PROGRESS;
|
||||
UnlockBufHdr(buf, buf_state);
|
||||
|
||||
InProgressBuf = buf;
|
||||
IsForInput = forInput;
|
||||
ResourceOwnerRememberBufferIO(CurrentResourceOwner,
|
||||
BufferDescriptorGetBuffer(buf));
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -4698,8 +4693,6 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
|
||||
{
|
||||
uint32 buf_state;
|
||||
|
||||
Assert(buf == InProgressBuf);
|
||||
|
||||
buf_state = LockBufHdr(buf);
|
||||
|
||||
Assert(buf_state & BM_IO_IN_PROGRESS);
|
||||
@ -4711,13 +4704,14 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
|
||||
buf_state |= set_flag_bits;
|
||||
UnlockBufHdr(buf, buf_state);
|
||||
|
||||
InProgressBuf = NULL;
|
||||
ResourceOwnerForgetBufferIO(CurrentResourceOwner,
|
||||
BufferDescriptorGetBuffer(buf));
|
||||
|
||||
ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf));
|
||||
}
|
||||
|
||||
/*
|
||||
* AbortBufferIO: Clean up any active buffer I/O after an error.
|
||||
* AbortBufferIO: Clean up active buffer I/O after an error.
|
||||
*
|
||||
* All LWLocks we might have held have been released,
|
||||
* but we haven't yet released buffer pins, so the buffer is still pinned.
|
||||
@ -4726,46 +4720,42 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
|
||||
* possible the error condition wasn't related to the I/O.
|
||||
*/
|
||||
void
|
||||
AbortBufferIO(void)
|
||||
AbortBufferIO(Buffer buf)
|
||||
{
|
||||
BufferDesc *buf = InProgressBuf;
|
||||
BufferDesc *buf_hdr = GetBufferDescriptor(buf - 1);
|
||||
uint32 buf_state;
|
||||
|
||||
if (buf)
|
||||
buf_state = LockBufHdr(buf_hdr);
|
||||
Assert(buf_state & (BM_IO_IN_PROGRESS | BM_TAG_VALID));
|
||||
|
||||
if (!(buf_state & BM_VALID))
|
||||
{
|
||||
uint32 buf_state;
|
||||
|
||||
buf_state = LockBufHdr(buf);
|
||||
Assert(buf_state & BM_IO_IN_PROGRESS);
|
||||
if (IsForInput)
|
||||
{
|
||||
Assert(!(buf_state & BM_DIRTY));
|
||||
|
||||
/* We'd better not think buffer is valid yet */
|
||||
Assert(!(buf_state & BM_VALID));
|
||||
UnlockBufHdr(buf, buf_state);
|
||||
}
|
||||
else
|
||||
{
|
||||
Assert(buf_state & BM_DIRTY);
|
||||
UnlockBufHdr(buf, buf_state);
|
||||
/* Issue notice if this is not the first failure... */
|
||||
if (buf_state & BM_IO_ERROR)
|
||||
{
|
||||
/* Buffer is pinned, so we can read tag without spinlock */
|
||||
char *path;
|
||||
|
||||
path = relpathperm(BufTagGetRelFileLocator(&buf->tag),
|
||||
BufTagGetForkNum(&buf->tag));
|
||||
ereport(WARNING,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
errmsg("could not write block %u of %s",
|
||||
buf->tag.blockNum, path),
|
||||
errdetail("Multiple failures --- write error might be permanent.")));
|
||||
pfree(path);
|
||||
}
|
||||
}
|
||||
TerminateBufferIO(buf, false, BM_IO_ERROR);
|
||||
Assert(!(buf_state & BM_DIRTY));
|
||||
UnlockBufHdr(buf_hdr, buf_state);
|
||||
}
|
||||
else
|
||||
{
|
||||
Assert(!(buf_state & BM_DIRTY));
|
||||
UnlockBufHdr(buf_hdr, buf_state);
|
||||
|
||||
/* Issue notice if this is not the first failure... */
|
||||
if (buf_state & BM_IO_ERROR)
|
||||
{
|
||||
/* Buffer is pinned, so we can read tag without spinlock */
|
||||
char *path;
|
||||
|
||||
path = relpathperm(BufTagGetRelFileLocator(&buf_hdr->tag),
|
||||
BufTagGetForkNum(&buf_hdr->tag));
|
||||
ereport(WARNING,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
errmsg("could not write block %u of %s",
|
||||
buf_hdr->tag.blockNum, path),
|
||||
errdetail("Multiple failures --- write error might be permanent.")));
|
||||
pfree(path);
|
||||
}
|
||||
}
|
||||
|
||||
TerminateBufferIO(buf_hdr, false, BM_IO_ERROR);
|
||||
}
|
||||
|
||||
/*
|
||||
|
Reference in New Issue
Block a user