
Get rid of WALBufMappingLock

Allow multiple backends to initialize WAL buffers concurrently.  This way
`MemSet((char *) NewPage, 0, XLOG_BLCKSZ);` can run in parallel without
taking a single LWLock in exclusive mode.

The new algorithm works as follows:
 * reserve a page for initialization using XLogCtl->InitializeReserved,
 * ensure the page is written out,
 * once the page is initialized, try to advance XLogCtl->InitializedUpTo and
   signal waiters using the XLogCtl->InitializedUpToCondVar condition
   variable,
 * repeat the previous steps until initialization is reserved up to the
   target WAL position,
 * wait until concurrent initialization finishes using
   XLogCtl->InitializedUpToCondVar.

Now multiple backends can concurrently reserve pages, initialize them, and
advance XLogCtl->InitializedUpTo to point to the latest initialized page.
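
To make the steps above concrete, here is a minimal standalone sketch of the
same reserve/initialize/advance pattern. It uses C11 atomics and a pthread
condition variable as stand-ins for PostgreSQL's pg_atomic_* and
ConditionVariable primitives; all names (reserve_ptr, initialized_up_to,
page_bound, advance_buffers) are illustrative rather than taken from the
patch, and "the old page was written out" is modeled simply as "the watermark
has passed the slot's previous page".

```c
/*
 * Sketch only (assumed names, no PostgreSQL internals).  Build with:
 *     cc -std=c11 -pthread sketch.c
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define PAGE_SZ 8192
#define N_PAGES 16

static char pages[N_PAGES][PAGE_SZ];
static _Atomic uint64_t page_bound[N_PAGES];	/* analogue of xlblocks */
static _Atomic uint64_t reserve_ptr;	/* analogue of InitializeReserved */
static _Atomic uint64_t initialized_up_to;	/* analogue of InitializedUpTo */
static pthread_mutex_t cv_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cv = PTHREAD_COND_INITIALIZER;

/* Block until the watermark reaches 'target'. */
static void
wait_for_watermark(uint64_t target)
{
	pthread_mutex_lock(&cv_lock);
	while (atomic_load(&initialized_up_to) < target)
		pthread_cond_wait(&cv, &cv_lock);
	pthread_mutex_unlock(&cv_lock);
}

static void
advance_buffers(uint64_t upto)
{
	uint64_t	rsv = atomic_load(&reserve_ptr);

	while (upto >= rsv)
	{
		/* 1. Reserve one page; exactly one thread wins each page. */
		if (!atomic_compare_exchange_weak(&reserve_ptr, &rsv, rsv + PAGE_SZ))
			continue;			/* lost the race; rsv was reloaded */

		uint64_t	begin = rsv;
		uint64_t	end = begin + PAGE_SZ;
		int			idx = (begin / PAGE_SZ) % N_PAGES;

		/*
		 * The real code waits here for the slot's old contents to be
		 * written out; the sketch waits for the watermark to pass the
		 * slot's previous page before reusing it.
		 */
		if (begin >= (uint64_t) N_PAGES * PAGE_SZ)
			wait_for_watermark(end - (uint64_t) N_PAGES * PAGE_SZ);

		/* 2. Initialize the reserved page and publish its end position. */
		memset(pages[idx], 0, PAGE_SZ);
		atomic_store(&page_bound[idx], end);

		/* 3. Advance the watermark past every finished page. */
		while (atomic_compare_exchange_strong(&initialized_up_to,
											  &begin, end))
		{
			begin = end;
			end += PAGE_SZ;
			idx = (begin / PAGE_SZ) % N_PAGES;
			if (atomic_load(&page_bound[idx]) != end)
			{
				/* Next page unfinished; its initializer advances later. */
				pthread_mutex_lock(&cv_lock);
				pthread_cond_broadcast(&cv);
				pthread_mutex_unlock(&cv_lock);
				break;
			}
		}
		rsv = atomic_load(&reserve_ptr);
	}

	/* Wait for pages reserved by concurrent threads to be initialized. */
	wait_for_watermark(upto + 1);
}

static void *
worker(void *arg)
{
	advance_buffers(*(uint64_t *) arg);
	return NULL;
}

int
main(void)
{
	pthread_t	th[4];
	uint64_t	upto = 64 * PAGE_SZ - 1;	/* zero 64 pages cooperatively */

	for (int i = 0; i < 4; i++)
		pthread_create(&th[i], NULL, worker, &upto);
	for (int i = 0; i < 4; i++)
		pthread_join(th[i], NULL);
	printf("initialized up to %llu\n",
		   (unsigned long long) atomic_load(&initialized_up_to));
	return 0;
}
```

The property this mirrors from the patch: the CAS on reserve_ptr hands each
page to exactly one initializer, while the CAS loop on initialized_up_to lets
whichever thread finishes last carry the watermark forward and wake the
waiters.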

Author: Yura Sokolov <y.sokolov@postgrespro.ru>
Co-authored-by: Alexander Korotkov <aekorotkov@gmail.com>
Reviewed-by: Pavel Borisov <pashkin.elfe@gmail.com>
Alexander Korotkov
2025-02-17 04:19:01 +02:00
parent fbc0fe9a2e
commit 6a2275b895
3 changed files with 132 additions and 48 deletions

src/backend/access/transam/xlog.c

@@ -302,11 +302,6 @@ static bool doPageWrites;
  * so it's a plain spinlock. The other locks are held longer (potentially
  * over I/O operations), so we use LWLocks for them. These locks are:
  *
- * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
- * It is only held while initializing and changing the mapping. If the
- * contents of the buffer being replaced haven't been written yet, the mapping
- * lock is released while the write is done, and reacquired afterwards.
- *
  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
  * XLogFlush).
  *
@@ -473,21 +468,32 @@ typedef struct XLogCtlData
 	pg_atomic_uint64 logFlushResult;	/* last byte + 1 flushed */
 
 	/*
-	 * Latest initialized page in the cache (last byte position + 1).
+	 * Latest page reserved for initialization in the cache (last byte
+	 * position + 1).
 	 *
-	 * To change the identity of a buffer (and InitializedUpTo), you need to
-	 * hold WALBufMappingLock.  To change the identity of a buffer that's
+	 * To change the identity of a buffer, you need to advance
+	 * InitializeReserved first.  To change the identity of a buffer that's
 	 * still dirty, the old page needs to be written out first, and for that
 	 * you need WALWriteLock, and you need to ensure that there are no
 	 * in-progress insertions to the page by calling
 	 * WaitXLogInsertionsToFinish().
 	 */
-	XLogRecPtr	InitializedUpTo;
+	pg_atomic_uint64 InitializeReserved;
+
+	/*
+	 * Latest initialized page in the cache (last byte position + 1).
+	 *
+	 * InitializedUpTo is updated after the buffer initialization.  After
+	 * the update, waiters are notified using InitializedUpToCondVar.
+	 */
+	pg_atomic_uint64 InitializedUpTo;
+	ConditionVariable InitializedUpToCondVar;
 
 	/*
 	 * These values do not change after startup, although the pointed-to pages
-	 * and xlblocks values certainly do.  xlblocks values are protected by
-	 * WALBufMappingLock.
+	 * and xlblocks values certainly do.  xlblocks values are changed
+	 * lock-free according to the check for the xlog write position and are
+	 * accompanied by changes of InitializeReserved and InitializedUpTo.
 	 */
 	char	   *pages;			/* buffers for unwritten XLOG pages */
 	pg_atomic_uint64 *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
@@ -810,9 +816,9 @@ XLogInsertRecord(XLogRecData *rdata,
 	 * fullPageWrites from changing until the insertion is finished.
 	 *
 	 * Step 2 can usually be done completely in parallel. If the required WAL
-	 * page is not initialized yet, you have to grab WALBufMappingLock to
-	 * initialize it, but the WAL writer tries to do that ahead of insertions
-	 * to avoid that from happening in the critical path.
+	 * page is not initialized yet, you have to go through AdvanceXLInsertBuffer,
+	 * which will ensure it is initialized. But the WAL writer tries to do that
+	 * ahead of insertions to avoid that from happening in the critical path.
 	 *
 	 *----------
 	 */
@@ -1991,32 +1997,70 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 	XLogRecPtr	NewPageEndPtr = InvalidXLogRecPtr;
 	XLogRecPtr	NewPageBeginPtr;
 	XLogPageHeader NewPage;
+	XLogRecPtr	ReservedPtr;
 	int			npages pg_attribute_unused() = 0;
 
-	LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
-
 	/*
-	 * Now that we have the lock, check if someone initialized the page
-	 * already.
+	 * We must run the loop below inside a critical section, as we expect
+	 * XLogCtl->InitializedUpTo to eventually keep up.  Most of the callers
+	 * already run inside a critical section, except for the WAL writer,
+	 * which passes 'opportunistic == true', and therefore we don't perform
+	 * operations that could error out.
+	 *
+	 * Start an explicit critical section anyway though.
 	 */
-	while (upto >= XLogCtl->InitializedUpTo || opportunistic)
+	Assert(CritSectionCount > 0 || opportunistic);
+	START_CRIT_SECTION();
+
+	/*--
+	 * Loop till all the pages in the WAL buffer before 'upto' are reserved
+	 * for initialization.  Multiple processes can initialize different
+	 * buffers with this loop in parallel as follows.
+	 *
+	 * 1. Reserve a page for initialization using XLogCtl->InitializeReserved.
+	 * 2. Initialize the reserved page.
+	 * 3. Attempt to advance XLogCtl->InitializedUpTo.
+	 */
+	ReservedPtr = pg_atomic_read_u64(&XLogCtl->InitializeReserved);
+	while (upto >= ReservedPtr || opportunistic)
 	{
-		nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
+		Assert(ReservedPtr % XLOG_BLCKSZ == 0);
 
 		/*
-		 * Get ending-offset of the buffer page we need to replace (this may
-		 * be zero if the buffer hasn't been used yet).  Fall through if it's
-		 * already written out.
+		 * Get ending-offset of the buffer page we need to replace.
+		 *
+		 * We don't look up xlblocks, but rather calculate the position we
+		 * must wait to be written.  If it was written, xlblocks will have
+		 * this position (or be uninitialized).
 		 */
-		OldPageRqstPtr = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]);
-		if (LogwrtResult.Write < OldPageRqstPtr)
+		if (ReservedPtr + XLOG_BLCKSZ > XLOG_BLCKSZ * XLOGbuffers)
+			OldPageRqstPtr = ReservedPtr + XLOG_BLCKSZ - XLOG_BLCKSZ * XLOGbuffers;
+		else
+			OldPageRqstPtr = InvalidXLogRecPtr;
+
+		if (LogwrtResult.Write < OldPageRqstPtr && opportunistic)
 		{
 			/*
-			 * Nope, got work to do. If we just want to pre-initialize as much
-			 * as we can without flushing, give up now.
+			 * If we just want to pre-initialize as much as we can without
+			 * flushing, give up now.
 			 */
-			if (opportunistic)
-				break;
+			upto = ReservedPtr - 1;
+			break;
+		}
+
+		/*
+		 * Attempt to reserve the page for initialization.  Failure means
+		 * that this page got reserved by another process.
+		 */
+		if (!pg_atomic_compare_exchange_u64(&XLogCtl->InitializeReserved,
+											&ReservedPtr,
+											ReservedPtr + XLOG_BLCKSZ))
+			continue;
+
+		/* Fall through if it's already written out. */
+		if (LogwrtResult.Write < OldPageRqstPtr)
+		{
+			/* Nope, got work to do. */
 
 			/* Advance shared memory write request position */
 			SpinLockAcquire(&XLogCtl->info_lck);
@@ -2031,14 +2075,6 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 			RefreshXLogWriteResult(LogwrtResult);
 			if (LogwrtResult.Write < OldPageRqstPtr)
 			{
-				/*
-				 * Must acquire write lock. Release WALBufMappingLock first,
-				 * to make sure that all insertions that we need to wait for
-				 * can finish (up to this same position). Otherwise we risk
-				 * deadlock.
-				 */
-				LWLockRelease(WALBufMappingLock);
-
 				WaitXLogInsertionsToFinish(OldPageRqstPtr);
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
@@ -2060,9 +2096,6 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 					PendingWalStats.wal_buffers_full++;
 					TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
 				}
-				/* Re-acquire WALBufMappingLock and retry */
-				LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
-				continue;
 			}
 		}
@@ -2070,10 +2103,17 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 		 * Now the next buffer slot is free and we can set it up to be the
 		 * next output page.
 		 */
-		NewPageBeginPtr = XLogCtl->InitializedUpTo;
+		NewPageBeginPtr = ReservedPtr;
 		NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
+		nextidx = XLogRecPtrToBufIdx(ReservedPtr);
 
-		Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
+#ifdef USE_ASSERT_CHECKING
+		{
+			XLogRecPtr	storedBound = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]);
+
+			Assert(storedBound == OldPageRqstPtr || storedBound == InvalidXLogRecPtr);
+		}
+#endif
 
 		NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
@@ -2139,11 +2179,50 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 		pg_write_barrier();
 
 		pg_atomic_write_u64(&XLogCtl->xlblocks[nextidx], NewPageEndPtr);
-		XLogCtl->InitializedUpTo = NewPageEndPtr;
+
+		/*
+		 * Try to advance XLogCtl->InitializedUpTo.
+		 *
+		 * If the CAS operation failed, then some of the previous pages are
+		 * not initialized yet, and this backend gives up.
+		 *
+		 * Since the initializer of the next page might give up on advancing
+		 * InitializedUpTo, this backend has to attempt advancing until it
+		 * finds a page "in the past" or a concurrent backend succeeds at
+		 * advancing.  When we finish advancing XLogCtl->InitializedUpTo, we
+		 * notify all the waiters with XLogCtl->InitializedUpToCondVar.
+		 */
+		while (pg_atomic_compare_exchange_u64(&XLogCtl->InitializedUpTo, &NewPageBeginPtr, NewPageEndPtr))
+		{
+			NewPageBeginPtr = NewPageEndPtr;
+			NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
+			nextidx = XLogRecPtrToBufIdx(NewPageBeginPtr);
+
+			if (pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) != NewPageEndPtr)
+			{
+				/*
+				 * The page at nextidx wasn't initialized yet, so we can't
+				 * move InitializedUpTo further.  It will be moved by the
+				 * backend which will initialize nextidx.
+				 */
+				ConditionVariableBroadcast(&XLogCtl->InitializedUpToCondVar);
+				break;
+			}
+		}
 
 		npages++;
 	}
-	LWLockRelease(WALBufMappingLock);
+
+	END_CRIT_SECTION();
+
+	/*
+	 * All the pages in the WAL buffer before 'upto' were reserved for
+	 * initialization.  However, some pages might be reserved by concurrent
+	 * processes.  Wait till they finish initialization.
+	 */
+	while (upto >= pg_atomic_read_u64(&XLogCtl->InitializedUpTo))
+		ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
+	ConditionVariableCancelSleep();
 
 #ifdef WAL_DEBUG
 	if (XLOG_DEBUG && npages > 0)
@@ -5044,6 +5123,10 @@ XLOGShmemInit(void)
 	pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
+
+	pg_atomic_init_u64(&XLogCtl->InitializeReserved, InvalidXLogRecPtr);
+	pg_atomic_init_u64(&XLogCtl->InitializedUpTo, InvalidXLogRecPtr);
+	ConditionVariableInit(&XLogCtl->InitializedUpToCondVar);
 }
 
 /*
@@ -6063,7 +6146,7 @@ StartupXLOG(void)
 		memset(page + len, 0, XLOG_BLCKSZ - len);
 
 		pg_atomic_write_u64(&XLogCtl->xlblocks[firstIdx], endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
-		XLogCtl->InitializedUpTo = endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ;
+		pg_atomic_write_u64(&XLogCtl->InitializedUpTo, endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
 	}
 	else
 	{
@@ -6072,8 +6155,9 @@ StartupXLOG(void)
 		 * let the first attempt to insert a log record to initialize the next
 		 * buffer.
 		 */
-		XLogCtl->InitializedUpTo = EndOfLog;
+		pg_atomic_write_u64(&XLogCtl->InitializedUpTo, EndOfLog);
 	}
+	pg_atomic_write_u64(&XLogCtl->InitializeReserved, pg_atomic_read_u64(&XLogCtl->InitializedUpTo));
 
 	/*
 	 * Update local and shared status.  This is OK to do without any locks

src/backend/utils/activity/wait_event_names.txt

@@ -155,6 +155,7 @@ REPLICATION_SLOT_DROP	"Waiting for a replication slot to become inactive so it can be dropped."
 RESTORE_COMMAND	"Waiting for <xref linkend="guc-restore-command"/> to complete."
 SAFE_SNAPSHOT	"Waiting to obtain a valid snapshot for a <literal>READ ONLY DEFERRABLE</literal> transaction."
 SYNC_REP	"Waiting for confirmation from a remote server during synchronous replication."
+WAL_BUFFER_INIT	"Waiting on WAL buffer to be initialized."
 WAL_RECEIVER_EXIT	"Waiting for the WAL receiver to exit."
 WAL_RECEIVER_WAIT_START	"Waiting for startup process to send initial data for streaming replication."
 WAL_SUMMARY_READY	"Waiting for a new WAL summary to be generated."
@@ -310,7 +311,6 @@ XidGen	"Waiting to allocate a new transaction ID."
 ProcArray	"Waiting to access the shared per-process data structures (typically, to get a snapshot or report a session's transaction ID)."
 SInvalRead	"Waiting to retrieve messages from the shared catalog invalidation queue."
 SInvalWrite	"Waiting to add a message to the shared catalog invalidation queue."
-WALBufMapping	"Waiting to replace a page in WAL buffers."
 WALWrite	"Waiting for WAL buffers to be written to disk."
 ControlFile	"Waiting to read or update the <filename>pg_control</filename> file or create a new WAL file."
 MultiXactGen	"Waiting to read or update shared multixact state."

src/include/storage/lwlocklist.h

@@ -37,7 +37,7 @@ PG_LWLOCK(3, XidGen)
 PG_LWLOCK(4, ProcArray)
 PG_LWLOCK(5, SInvalRead)
 PG_LWLOCK(6, SInvalWrite)
-PG_LWLOCK(7, WALBufMapping)
+/* 7 was WALBufMapping */
 PG_LWLOCK(8, WALWrite)
 PG_LWLOCK(9, ControlFile)
 /* 10 was CheckpointLock */