
Get rid of WALBufMappingLock

Allow multiple backends to initialize WAL buffers concurrently.  This way
`MemSet((char *) NewPage, 0, XLOG_BLCKSZ);` can run in parallel without
taking a single LWLock in exclusive mode.

The new algorithm works as follows:
 * reserve a page for initialization using XLogCtl->InitializeReserved,
 * ensure the page is written out,
 * once the page is initialized, try to advance XLogCtl->InitializedUpTo and
   signal waiters using the XLogCtl->InitializedUpToCondVar condition
   variable,
 * repeat the previous steps until we have reserved initialization up to the
   target WAL position,
 * wait until concurrent initialization finishes using
   XLogCtl->InitializedUpToCondVar.

Now multiple backends can concurrently reserve pages, initialize them, and
advance XLogCtl->InitializedUpTo to point to the latest initialized page.
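To make the protocol concrete, here is a minimal sketch of the same
reserve/initialize/advance loop written with C11 atomics. It is illustrative
only: BLCKSZ, NBUFFERS, and advance_buffers() are invented stand-ins, the
buffer ring is assumed to start at byte 0, busy-waiting replaces the
ConditionVariable, and the opportunistic mode and the write-out of dirty
pages are omitted.

    /* Minimal model of the lock-free WAL buffer initialization protocol.
     * Illustrative stand-in for AdvanceXLInsertBuffer(); not PostgreSQL code. */
    #include <stdatomic.h>
    #include <stdint.h>
    #include <string.h>

    #define BLCKSZ   8192               /* stand-in for XLOG_BLCKSZ */
    #define NBUFFERS 16                 /* stand-in for XLOGbuffers */

    static _Atomic uint64_t InitializeReserved; /* next byte to reserve */
    static _Atomic uint64_t InitializedUpTo;    /* pages below this are ready */
    static _Atomic uint64_t xlblocks[NBUFFERS]; /* end pointer of each slot */
    static char pages[NBUFFERS][BLCKSZ];

    static void
    advance_buffers(uint64_t upto)
    {
        uint64_t reserved = atomic_load(&InitializeReserved);

        while (upto >= reserved)
        {
            /* Step 1: reserve one page; on CAS failure another process took
             * it and 'reserved' has been reloaded, so just retry. */
            if (!atomic_compare_exchange_strong(&InitializeReserved,
                                                &reserved, reserved + BLCKSZ))
                continue;

            /* Wait until the slot's previous page (one ring behind) has been
             * initialized; the real code also waits for it to be written out,
             * and sleeps on a condition variable instead of spinning. */
            uint64_t ring = (uint64_t) BLCKSZ * NBUFFERS;
            uint64_t old_end = (reserved + BLCKSZ > ring)
                ? reserved + BLCKSZ - ring : 0;
            while (atomic_load(&InitializedUpTo) < old_end)
                /* spin */;

            /* Step 2: initialize the reserved page. */
            int idx = (int) ((reserved / BLCKSZ) % NBUFFERS);
            memset(pages[idx], 0, BLCKSZ);
            atomic_store(&xlblocks[idx], reserved + BLCKSZ);

            /* Step 3: advance InitializedUpTo over every page that is already
             * initialized, starting with our own. */
            uint64_t begin = reserved;
            uint64_t end = reserved + BLCKSZ;
            while (atomic_compare_exchange_strong(&InitializedUpTo, &begin, end))
            {
                begin = end;
                end = begin + BLCKSZ;
                idx = (int) ((begin / BLCKSZ) % NBUFFERS);
                if (atomic_load(&xlblocks[idx]) != end)
                    break;      /* next page not ready; its owner advances */
            }

            reserved = atomic_load(&InitializeReserved);
        }

        /* Step 4: wait until concurrent initializers cover what we need. */
        while (atomic_load(&InitializedUpTo) <= upto)
            /* spin */;
    }

Only the backend whose compare-and-swap on InitializedUpTo succeeds extends
it, and it keeps extending over neighboring pages whose initializers finished
earlier, so no initialized page is ever left unpublished.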

Author: Yura Sokolov <y.sokolov@postgrespro.ru>
Co-authored-by: Alexander Korotkov <aekorotkov@gmail.com>
Reviewed-by: Pavel Borisov <pashkin.elfe@gmail.com>
Reviewed-by: Tomas Vondra <tomas@vondra.me>
Tested-by: Michael Paquier <michael@paquier.xyz>
Committed by Alexander Korotkov on 2025-04-02 12:44:24 +03:00
commit bc22dc0e0d, parent b53b88109f
3 changed files with 189 additions and 49 deletions

src/backend/access/transam/xlog.c

@@ -302,11 +302,6 @@ static bool doPageWrites;
  * so it's a plain spinlock.  The other locks are held longer (potentially
  * over I/O operations), so we use LWLocks for them.  These locks are:
  *
- * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
- * It is only held while initializing and changing the mapping.  If the
- * contents of the buffer being replaced haven't been written yet, the mapping
- * lock is released while the write is done, and reacquired afterwards.
- *
  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
  * XLogFlush).
  *
@@ -473,21 +468,37 @@ typedef struct XLogCtlData
 	pg_atomic_uint64 logFlushResult;	/* last byte + 1 flushed */
 
 	/*
-	 * Latest initialized page in the cache (last byte position + 1).
+	 * First initialized page in the cache (first byte position).
+	 */
+	XLogRecPtr	InitializedFrom;
+
+	/*
+	 * Latest page reserved for initialization in the cache (last byte
+	 * position + 1).
 	 *
-	 * To change the identity of a buffer (and InitializedUpTo), you need to
-	 * hold WALBufMappingLock.  To change the identity of a buffer that's
+	 * To change the identity of a buffer, you need to advance
+	 * InitializeReserved first.  To change the identity of a buffer that's
 	 * still dirty, the old page needs to be written out first, and for that
 	 * you need WALWriteLock, and you need to ensure that there are no
 	 * in-progress insertions to the page by calling
 	 * WaitXLogInsertionsToFinish().
 	 */
-	XLogRecPtr	InitializedUpTo;
+	pg_atomic_uint64 InitializeReserved;
+
+	/*
+	 * Latest initialized page in the cache (last byte position + 1).
+	 *
+	 * InitializedUpTo is updated after the buffer initialization.  After
+	 * the update, waiters are notified using InitializedUpToCondVar.
+	 */
+	pg_atomic_uint64 InitializedUpTo;
+	ConditionVariable InitializedUpToCondVar;
 
 	/*
 	 * These values do not change after startup, although the pointed-to pages
-	 * and xlblocks values certainly do.  xlblocks values are protected by
-	 * WALBufMappingLock.
+	 * and xlblocks values certainly do.  xlblocks values are changed
+	 * lock-free according to the check for the xlog write position and are
+	 * accompanied by changes of InitializeReserved and InitializedUpTo.
 	 */
 	char	   *pages;			/* buffers for unwritten XLOG pages */
 	pg_atomic_uint64 *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
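As a reading aid for the struct changes above, the following sketch spells
out how the three counters relate. The field names mirror the patch, but the
snapshot type and page_is_ready() are hypothetical, written only to state the
invariant.

    #include <assert.h>
    #include <stdbool.h>
    #include <stdint.h>

    typedef uint64_t XLogRecPtr;

    /* Hypothetical snapshot of the three counters, for illustration only. */
    typedef struct WalInitSnapshot
    {
        XLogRecPtr InitializedFrom;     /* first byte the buffer ring ever
                                         * covered; fixed after startup */
        XLogRecPtr InitializedUpTo;     /* pages below this are fully
                                         * initialized and usable */
        XLogRecPtr InitializeReserved;  /* pages below this are claimed by
                                         * some backend for initialization */
    } WalInitSnapshot;

    /* True iff the page containing 'ptr' is initialized and writable.
     * Pages in [InitializedUpTo, InitializeReserved) are claimed but may
     * still be getting zeroed by their initializer. */
    static bool
    page_is_ready(const WalInitSnapshot *s, XLogRecPtr ptr)
    {
        assert(s->InitializedFrom <= s->InitializedUpTo);
        assert(s->InitializedUpTo <= s->InitializeReserved);
        return ptr < s->InitializedUpTo;
    }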
@@ -810,9 +821,9 @@ XLogInsertRecord(XLogRecData *rdata,
 	 * fullPageWrites from changing until the insertion is finished.
 	 *
 	 * Step 2 can usually be done completely in parallel. If the required WAL
-	 * page is not initialized yet, you have to grab WALBufMappingLock to
-	 * initialize it, but the WAL writer tries to do that ahead of insertions
-	 * to avoid that from happening in the critical path.
+	 * page is not initialized yet, you have to go through AdvanceXLInsertBuffer,
+	 * which will ensure it is initialized.  But the WAL writer tries to do that
+	 * ahead of insertions to avoid that from happening in the critical path.
 	 *
 	 *----------
 	 */
@@ -1991,32 +2002,79 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 	XLogRecPtr	NewPageEndPtr = InvalidXLogRecPtr;
 	XLogRecPtr	NewPageBeginPtr;
 	XLogPageHeader NewPage;
+	XLogRecPtr	ReservedPtr;
 	int			npages pg_attribute_unused() = 0;
 
-	LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
-
 	/*
-	 * Now that we have the lock, check if someone initialized the page
-	 * already.
+	 * We must run the loop below inside a critical section, as we expect
+	 * XLogCtl->InitializedUpTo to eventually keep up.  Most of the callers
+	 * already run inside a critical section, except for the WAL writer,
+	 * which passes 'opportunistic == true'; in that case we don't perform
+	 * operations that could error out.
+	 *
+	 * Start an explicit critical section anyway though.
 	 */
-	while (upto >= XLogCtl->InitializedUpTo || opportunistic)
+	Assert(CritSectionCount > 0 || opportunistic);
+	START_CRIT_SECTION();
+
+	/*--
+	 * Loop till all the pages in the WAL buffer before 'upto' are reserved
+	 * for initialization.  Multiple processes can run this loop in parallel,
+	 * initializing different buffers as follows:
+	 *
+	 * 1. Reserve a page for initialization using XLogCtl->InitializeReserved.
+	 * 2. Initialize the reserved page.
+	 * 3. Attempt to advance XLogCtl->InitializedUpTo.
+	 */
+	ReservedPtr = pg_atomic_read_u64(&XLogCtl->InitializeReserved);
+	while (upto >= ReservedPtr || opportunistic)
 	{
-		nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
+		Assert(ReservedPtr % XLOG_BLCKSZ == 0);
 
 		/*
-		 * Get ending-offset of the buffer page we need to replace (this may
-		 * be zero if the buffer hasn't been used yet).  Fall through if it's
-		 * already written out.
+		 * Get the ending-offset of the buffer page we need to replace.
+		 *
+		 * We don't look it up in xlblocks, but rather calculate the position
+		 * we must wait to be written out.  If it was written, xlblocks will
+		 * have this position (or be uninitialized).
 		 */
-		OldPageRqstPtr = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]);
+		if (ReservedPtr + XLOG_BLCKSZ > XLogCtl->InitializedFrom + XLOG_BLCKSZ * XLOGbuffers)
+			OldPageRqstPtr = ReservedPtr + XLOG_BLCKSZ - (XLogRecPtr) XLOG_BLCKSZ * XLOGbuffers;
+		else
+			OldPageRqstPtr = InvalidXLogRecPtr;
+
+		if (LogwrtResult.Write < OldPageRqstPtr && opportunistic)
+		{
+			/*
+			 * If we just want to pre-initialize as much as we can without
+			 * flushing, give up now.
+			 */
+			upto = ReservedPtr - 1;
+			break;
+		}
+
+		/*
+		 * Attempt to reserve the page for initialization.  Failure means that
+		 * this page got reserved by another process.
+		 */
+		if (!pg_atomic_compare_exchange_u64(&XLogCtl->InitializeReserved,
+											&ReservedPtr,
+											ReservedPtr + XLOG_BLCKSZ))
+			continue;
+
+		/*
+		 * Wait till the page gets correctly initialized up to OldPageRqstPtr.
+		 */
+		nextidx = XLogRecPtrToBufIdx(ReservedPtr);
+		while (pg_atomic_read_u64(&XLogCtl->InitializedUpTo) < OldPageRqstPtr)
+			ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
+		ConditionVariableCancelSleep();
+		Assert(pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) == OldPageRqstPtr);
+
+		/* Fall through if it's already written out. */
 		if (LogwrtResult.Write < OldPageRqstPtr)
 		{
-			/*
-			 * Nope, got work to do. If we just want to pre-initialize as much
-			 * as we can without flushing, give up now.
-			 */
-			if (opportunistic)
-				break;
+			/* Nope, got work to do. */
 
 			/* Advance shared memory write request position */
 			SpinLockAcquire(&XLogCtl->info_lck);
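The OldPageRqstPtr computation above is the subtle part of this hunk: instead
of reading xlblocks[] (which a concurrent initializer may be rewriting), it
derives the end pointer of the page that previously occupied the same buffer
slot, exactly one ring of XLOGbuffers pages behind. A self-contained sketch
with a hypothetical helper name and a worked example:

    #include <stdint.h>
    #include <stdio.h>

    typedef uint64_t XLogRecPtr;
    #define InvalidXLogRecPtr 0

    /* Sketch of the OldPageRqstPtr derivation (hypothetical helper name).
     * The slot reused for the page starting at reserved_ptr previously held
     * the page ending exactly one ring (xlog_buffers * blcksz bytes) earlier;
     * that end position is what must be written out before reuse. */
    static XLogRecPtr
    old_page_rqst_ptr(XLogRecPtr reserved_ptr, XLogRecPtr initialized_from,
                      int xlog_buffers, int blcksz)
    {
        XLogRecPtr ring = (XLogRecPtr) blcksz * xlog_buffers;

        /* First trip around the ring: the slot never held a page before. */
        if (reserved_ptr + blcksz <= initialized_from + ring)
            return InvalidXLogRecPtr;

        return reserved_ptr + blcksz - ring;
    }

    int
    main(void)
    {
        /* Worked example: 4 buffers of 8192 bytes, ring starting at 0.
         * Reserving the page at 32768 reuses slot 0, whose previous page
         * ended at byte 8192 -- so 8192 must be written out first. */
        printf("%llu\n",
               (unsigned long long) old_page_rqst_ptr(32768, 0, 4, 8192));
        return 0;
    }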
@@ -2031,14 +2089,6 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 			RefreshXLogWriteResult(LogwrtResult);
 			if (LogwrtResult.Write < OldPageRqstPtr)
 			{
-				/*
-				 * Must acquire write lock. Release WALBufMappingLock first,
-				 * to make sure that all insertions that we need to wait for
-				 * can finish (up to this same position). Otherwise we risk
-				 * deadlock.
-				 */
-				LWLockRelease(WALBufMappingLock);
-
 				WaitXLogInsertionsToFinish(OldPageRqstPtr);
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
@@ -2060,9 +2110,6 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 				pgWalUsage.wal_buffers_full++;
 				TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
 			}
-			/* Re-acquire WALBufMappingLock and retry */
-			LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
-			continue;
 		}
 	}
@@ -2070,11 +2117,9 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 		 * Now the next buffer slot is free and we can set it up to be the
 		 * next output page.
 		 */
-		NewPageBeginPtr = XLogCtl->InitializedUpTo;
+		NewPageBeginPtr = ReservedPtr;
 		NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
 
-		Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
-
 		NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
 
 		/*
@@ -2138,12 +2183,100 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 		 */
 		pg_write_barrier();
 
+		/*-----
+		 * Update the value of XLogCtl->xlblocks[nextidx] and try to advance
+		 * XLogCtl->InitializedUpTo in a lock-less manner.
+		 *
+		 * First, let's provide a formal proof of the algorithm.  Let there
+		 * be 'n' processes with the following variables in shared memory:
+		 *	f - an array of 'n' boolean flags,
+		 *	v - atomic integer variable.
+		 *
+		 * Also, let
+		 *	i - the number of a process,
+		 *	j - a local integer variable,
+		 *	CAS(var, oldval, newval) - compare-and-swap atomic operation
+		 *							   returning true on success,
+		 *	write_barrier()/read_barrier() - memory barriers.
+		 *
+		 * The pseudocode for each process is the following.
+		 *
+		 *	j := i
+		 *	f[i] := true
+		 *	write_barrier()
+		 *	while CAS(v, j, j + 1):
+		 *		j := j + 1
+		 *		read_barrier()
+		 *		if not f[j]:
+		 *			break
+		 *
+		 * Let's prove that v eventually reaches the value of n.
+		 * 1. Prove by contradiction.  Assume v doesn't reach n and gets
+		 *	  stuck at k, where k < n.
+		 * 2. Process k attempts CAS(v, k, k + 1).  If, as we assumed, v
+		 *	  gets stuck at k, then this CAS operation must fail.  Therefore,
+		 *	  v < k when process k attempts CAS(v, k, k + 1).
+		 * 3. If, as we assumed, v gets stuck at k, then the value k of v
+		 *	  must be achieved by some process m, where m < k.  The process
+		 *	  m must observe f[k] == false.  Otherwise, it will later attempt
+		 *	  CAS(v, k, k + 1) with success.
+		 * 4. Therefore, the corresponding read_barrier() (while j == k) on
+		 *	  process m happened before the write_barrier() of process k.
+		 *	  But then process k attempts CAS(v, k, k + 1) after process m
+		 *	  successfully incremented v to k, and that CAS operation must
+		 *	  succeed.  That leads to a contradiction.  So, there is no such
+		 *	  k (k < n) where v gets stuck.  Q.E.D.
+		 *
+		 * To apply this proof to the code below, we assume
+		 * XLogCtl->InitializedUpTo will play the role of v with XLOG_BLCKSZ
+		 * granularity.  We also assume setting XLogCtl->xlblocks[nextidx] to
+		 * NewPageEndPtr to play the role of setting f[i] to true.  Also, note
+		 * that processes can't concurrently map different xlog locations to
+		 * the same nextidx because we previously requested that
+		 * XLogCtl->InitializedUpTo >= OldPageRqstPtr.  So, an xlog buffer can
+		 * be taken for initialization only once the previous initialization
+		 * takes effect on XLogCtl->InitializedUpTo.
+		 */
+
 		pg_atomic_write_u64(&XLogCtl->xlblocks[nextidx], NewPageEndPtr);
-		XLogCtl->InitializedUpTo = NewPageEndPtr;
+
+		pg_write_barrier();
+
+		while (pg_atomic_compare_exchange_u64(&XLogCtl->InitializedUpTo, &NewPageBeginPtr, NewPageEndPtr))
+		{
+			NewPageBeginPtr = NewPageEndPtr;
+			NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
+			nextidx = XLogRecPtrToBufIdx(NewPageBeginPtr);
+
+			pg_read_barrier();
+
+			if (pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) != NewPageEndPtr)
+			{
+				/*
+				 * The page at nextidx wasn't initialized yet, so we can't
+				 * move InitializedUpTo further.  It will be moved by the
+				 * backend that initializes nextidx.
+				 */
+				ConditionVariableBroadcast(&XLogCtl->InitializedUpToCondVar);
+				break;
+			}
+		}
 
 		npages++;
 	}
-	LWLockRelease(WALBufMappingLock);
+
+	END_CRIT_SECTION();
+
+	/*
+	 * All the pages in the WAL buffer before 'upto' were reserved for
+	 * initialization.  However, some pages might be reserved by concurrent
+	 * processes.  Wait till they finish initialization.
+	 */
+	while (upto >= pg_atomic_read_u64(&XLogCtl->InitializedUpTo))
+		ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
+	ConditionVariableCancelSleep();
+
+	pg_read_barrier();
 
 #ifdef WAL_DEBUG
 	if (XLOG_DEBUG && npages > 0)
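The proof in the comment above can be exercised directly. The standalone
program below (an illustration, not part of the patch) runs the pseudocode
with N_THREADS workers, letting C11 sequentially consistent atomics stand in
for the explicit write_barrier()/read_barrier() calls; per the proof, v
reaches N_THREADS under any interleaving. A j == N_THREADS bounds check is
added so the highest-numbered process doesn't read past the flag array.

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdio.h>

    #define N_THREADS 8

    static atomic_bool f[N_THREADS];    /* "f" from the proof */
    static atomic_int  v;               /* "v" from the proof */

    static void *
    worker(void *arg)
    {
        int i = (int) (long) arg;
        int j = i;

        /* "f[i] := true; write_barrier()" -- seq_cst implies the barrier */
        atomic_store(&f[i], true);

        /* "while CAS(v, j, j + 1): ..." -- on failure, C11 CAS reloads j */
        while (atomic_compare_exchange_strong(&v, &j, j + 1))
        {
            j = j + 1;
            /* bounds check added for C; then "if not f[j]: break" */
            if (j == N_THREADS || !atomic_load(&f[j]))
                break;
        }
        return NULL;
    }

    int
    main(void)
    {
        pthread_t th[N_THREADS];

        for (long i = 0; i < N_THREADS; i++)
            pthread_create(&th[i], NULL, worker, (void *) i);
        for (int i = 0; i < N_THREADS; i++)
            pthread_join(th[i], NULL);

        /* Per the proof, v cannot get stuck below N_THREADS. */
        printf("v = %d (expected %d)\n", atomic_load(&v), N_THREADS);
        return 0;
    }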
@@ -5071,6 +5204,10 @@ XLOGShmemInit(void)
 	pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
+
+	pg_atomic_init_u64(&XLogCtl->InitializeReserved, InvalidXLogRecPtr);
+	pg_atomic_init_u64(&XLogCtl->InitializedUpTo, InvalidXLogRecPtr);
+	ConditionVariableInit(&XLogCtl->InitializedUpToCondVar);
 }
 
 /*
@@ -6090,7 +6227,8 @@ StartupXLOG(void)
 		memset(page + len, 0, XLOG_BLCKSZ - len);
 
 		pg_atomic_write_u64(&XLogCtl->xlblocks[firstIdx], endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
-		XLogCtl->InitializedUpTo = endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ;
+		pg_atomic_write_u64(&XLogCtl->InitializedUpTo, endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
+		XLogCtl->InitializedFrom = endOfRecoveryInfo->lastPageBeginPtr;
 	}
 	else
 	{
@@ -6099,8 +6237,10 @@ StartupXLOG(void)
 		 * let the first attempt to insert a log record to initialize the next
 		 * buffer.
 		 */
-		XLogCtl->InitializedUpTo = EndOfLog;
+		pg_atomic_write_u64(&XLogCtl->InitializedUpTo, EndOfLog);
+		XLogCtl->InitializedFrom = EndOfLog;
 	}
+	pg_atomic_write_u64(&XLogCtl->InitializeReserved, pg_atomic_read_u64(&XLogCtl->InitializedUpTo));
 
 	/*
 	 * Update local and shared status.  This is OK to do without any locks

src/backend/utils/activity/wait_event_names.txt

@@ -156,6 +156,7 @@ REPLICATION_SLOT_DROP	"Waiting for a replication slot to become inactive so it c
 RESTORE_COMMAND	"Waiting for <xref linkend="guc-restore-command"/> to complete."
 SAFE_SNAPSHOT	"Waiting to obtain a valid snapshot for a <literal>READ ONLY DEFERRABLE</literal> transaction."
 SYNC_REP	"Waiting for confirmation from a remote server during synchronous replication."
+WAL_BUFFER_INIT	"Waiting on WAL buffer to be initialized."
 WAL_RECEIVER_EXIT	"Waiting for the WAL receiver to exit."
 WAL_RECEIVER_WAIT_START	"Waiting for startup process to send initial data for streaming replication."
 WAL_SUMMARY_READY	"Waiting for a new WAL summary to be generated."
@@ -314,7 +315,6 @@ XidGen	"Waiting to allocate a new transaction ID."
 ProcArray	"Waiting to access the shared per-process data structures (typically, to get a snapshot or report a session's transaction ID)."
 SInvalRead	"Waiting to retrieve messages from the shared catalog invalidation queue."
 SInvalWrite	"Waiting to add a message to the shared catalog invalidation queue."
-WALBufMapping	"Waiting to replace a page in WAL buffers."
 WALWrite	"Waiting for WAL buffers to be written to disk."
 ControlFile	"Waiting to read or update the <filename>pg_control</filename> file or create a new WAL file."
 MultiXactGen	"Waiting to read or update shared multixact state."

src/include/storage/lwlocklist.h

@@ -37,7 +37,7 @@ PG_LWLOCK(3, XidGen)
 PG_LWLOCK(4, ProcArray)
 PG_LWLOCK(5, SInvalRead)
 PG_LWLOCK(6, SInvalWrite)
-PG_LWLOCK(7, WALBufMapping)
+/* 7 was WALBufMapping */
 PG_LWLOCK(8, WALWrite)
 PG_LWLOCK(9, ControlFile)
 /* 10 was CheckpointLock */