
Operate XLogCtl->log{Write,Flush}Result with atomics

This removes the need to hold both the info_lck spinlock and
WALWriteLock to update them.  We use stock atomic write instead, with
WALWriteLock held.  Readers can use atomic read, without any locking.

This allows for some code to be reordered: some places were a bit
contorted to avoid repeated spinlock acquisition, but that's no longer a
concern, so we can turn them into more natural coding.  Some further
changes are possible (maybe leading to performance wins), but in this
commit I did only rather minimal ones, to avoid increasing the blast radius.

Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Reviewed-by: Jeff Davis <pgsql@j-davis.com>
Reviewed-by: Andres Freund <andres@anarazel.de> (earlier versions)
Discussion: https://postgr.es/m/20200831182156.GA3983@alvherre.pgsql
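
For illustration, here is a minimal, self-contained sketch of the ordering
discipline this commit adopts, modeled with C11 stdatomic rather than
PostgreSQL's pg_atomic_* wrappers and pg_write_barrier()/pg_read_barrier();
the variable names mirror XLogCtl's fields, but the functions and the demo
values are hypothetical:

/* Model of the commit's store/load ordering (C11, not Postgres APIs). */
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

static _Atomic uint64_t logWriteResult; /* last byte + 1 written out */
static _Atomic uint64_t logFlushResult; /* last byte + 1 flushed */

/*
 * Writer side (in Postgres: XLogWrite, with WALWriteLock held):
 * store Write first, then a write barrier, then Flush.
 */
static void
publish(uint64_t write, uint64_t flush)
{
	assert(write >= flush);		/* invariant at every publication */
	atomic_store_explicit(&logWriteResult, write, memory_order_relaxed);
	atomic_thread_fence(memory_order_release);	/* role of pg_write_barrier() */
	atomic_store_explicit(&logFlushResult, flush, memory_order_relaxed);
}

/*
 * Reader side (in Postgres: RefreshXLogWriteResult, no lock): load in the
 * opposite order, Flush first, so Flush can never appear ahead of Write.
 */
static void
refresh(uint64_t *write, uint64_t *flush)
{
	*flush = atomic_load_explicit(&logFlushResult, memory_order_relaxed);
	atomic_thread_fence(memory_order_acquire);	/* role of pg_read_barrier() */
	*write = atomic_load_explicit(&logWriteResult, memory_order_relaxed);
	assert(*write >= *flush);	/* written WAL is always ahead of flushed WAL */
}

int
main(void)
{
	uint64_t	w, f;

	publish(8192, 4096);
	refresh(&w, &f);
	printf("write=%llu flush=%llu\n",
		   (unsigned long long) w, (unsigned long long) f);
	return 0;
}
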
commit ee1cbe806d
parent 6f132ed693
Author: Alvaro Herrera
Date:   2024-04-05 16:14:39 +02:00

src/backend/access/transam/xlog.c

@@ -292,12 +292,7 @@ static bool doPageWrites;
  * LogwrtRqst indicates a byte position that we need to write and/or fsync
  * the log up to (all records before that point must be written or fsynced).
  * The positions already written/fsynced are maintained in logWriteResult
- * and logFlushResult.
- *
- * To read XLogCtl->logWriteResult or ->logFlushResult, you must hold either
- * info_lck or WALWriteLock.  To update them, you need to hold both locks.
- * The point of this arrangement is that the value can be examined by code
- * that already holds WALWriteLock without needing to grab info_lck as well.
+ * and logFlushResult using atomic access.
  * In addition to the shared variable, each backend has a private copy of
  * both in LogwrtResult, which is updated when convenient.
  *
@@ -473,12 +468,9 @@ typedef struct XLogCtlData
 	pg_time_t	lastSegSwitchTime;
 	XLogRecPtr	lastSegSwitchLSN;
 
-	/*
-	 * Protected by info_lck and WALWriteLock (you must hold either lock to
-	 * read it, but both to update)
-	 */
-	XLogRecPtr	logWriteResult; /* last byte + 1 written out */
-	XLogRecPtr	logFlushResult; /* last byte + 1 flushed */
+	/* These are accessed using atomics -- info_lck not needed */
+	pg_atomic_uint64 logWriteResult;	/* last byte + 1 written out */
+	pg_atomic_uint64 logFlushResult;	/* last byte + 1 flushed */
 
 	/*
 	 * Latest initialized page in the cache (last byte position + 1).
@@ -616,11 +608,15 @@ static XLogwrtResult LogwrtResult = {0, 0};
 
 /*
  * Update local copy of shared XLogCtl->log{Write,Flush}Result
+ *
+ * It's critical that Flush always trails Write, so the order of the reads is
+ * important, as is the barrier.  See also XLogWrite.
  */
 #define RefreshXLogWriteResult(_target) \
 	do { \
-		_target.Write = XLogCtl->logWriteResult; \
-		_target.Flush = XLogCtl->logFlushResult; \
+		_target.Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult); \
+		pg_read_barrier(); \
+		_target.Write = pg_atomic_read_u64(&XLogCtl->logWriteResult); \
 	} while (0)
 
 /*
@@ -968,9 +964,8 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* advance global request to include new block(s) */
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
 			XLogCtl->LogwrtRqst.Write = EndPos;
-		/* update local result copy while I have the chance */
-		RefreshXLogWriteResult(LogwrtResult);
 		SpinLockRelease(&XLogCtl->info_lck);
+		RefreshXLogWriteResult(LogwrtResult);
 	}
 
 	/*
@@ -1989,17 +1984,17 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 			if (opportunistic)
 				break;
 
-			/* Before waiting, get info_lck and update LogwrtResult */
+			/* Advance shared memory write request position */
 			SpinLockAcquire(&XLogCtl->info_lck);
 			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
 				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
-			RefreshXLogWriteResult(LogwrtResult);
 			SpinLockRelease(&XLogCtl->info_lck);
 
 			/*
-			 * Now that we have an up-to-date LogwrtResult value, see if we
-			 * still need to write it or if someone else already did.
+			 * Acquire an up-to-date LogwrtResult value and see if we still
+			 * need to write it or if someone else already did.
 			 */
+			RefreshXLogWriteResult(LogwrtResult);
 			if (LogwrtResult.Write < OldPageRqstPtr)
 			{
 				/*
@@ -2556,16 +2551,35 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	 * 'result' values.  This is not absolutely essential, but it saves some
 	 * code in a couple of places.
 	 */
-	{
-		SpinLockAcquire(&XLogCtl->info_lck);
-		XLogCtl->logWriteResult = LogwrtResult.Write;
-		XLogCtl->logFlushResult = LogwrtResult.Flush;
-		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
-			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
-		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
-			XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
-		SpinLockRelease(&XLogCtl->info_lck);
-	}
+	SpinLockAcquire(&XLogCtl->info_lck);
+	if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
+		XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
+	if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
+		XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	/*
+	 * We write Write first, bar, then Flush.  When reading, the opposite must
+	 * be done (with a matching barrier in between), so that we always see a
+	 * Flush value that trails behind the Write value seen.
+	 */
+	pg_atomic_write_u64(&XLogCtl->logWriteResult, LogwrtResult.Write);
+	pg_write_barrier();
+	pg_atomic_write_u64(&XLogCtl->logFlushResult, LogwrtResult.Flush);
+
+#ifdef USE_ASSERT_CHECKING
+	{
+		XLogRecPtr	Flush;
+		XLogRecPtr	Write;
+
+		Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult);
+		pg_read_barrier();
+		Write = pg_atomic_read_u64(&XLogCtl->logWriteResult);
+
+		/* WAL written to disk is always ahead of WAL flushed */
+		Assert(Write >= Flush);
+	}
+#endif
 }
 
 /*
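
Why the Assert above can never fire: XLogWrite stores Write, issues
pg_write_barrier(), then stores Flush, while the readers (both
RefreshXLogWriteResult and the assert-checking block) load in the opposite
order with pg_read_barrier() in between.  Any Flush value a reader observes
was therefore stored after the corresponding Write value, so the Write it
subsequently loads is at least as new; and since Write >= Flush holds at
every publication, the pair read back can never show Flush ahead of Write.
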
@@ -2582,7 +2596,6 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	prevAsyncXactLSN;
 
 	SpinLockAcquire(&XLogCtl->info_lck);
-	RefreshXLogWriteResult(LogwrtResult);
 	sleeping = XLogCtl->WalWriterSleeping;
 	prevAsyncXactLSN = XLogCtl->asyncXactLSN;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
@@ -2608,6 +2621,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	{
 		int			flushblocks;
 
+		RefreshXLogWriteResult(LogwrtResult);
+
 		flushblocks =
 			WriteRqstPtr / XLOG_BLCKSZ - LogwrtResult.Flush / XLOG_BLCKSZ;
 
@@ -2790,14 +2805,8 @@ XLogFlush(XLogRecPtr record)
 	{
 		XLogRecPtr	insertpos;
 
-		/* read LogwrtResult and update local state */
-		SpinLockAcquire(&XLogCtl->info_lck);
-		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
-			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
-		RefreshXLogWriteResult(LogwrtResult);
-		SpinLockRelease(&XLogCtl->info_lck);
-
 		/* done already? */
+		RefreshXLogWriteResult(LogwrtResult);
 		if (record <= LogwrtResult.Flush)
 			break;
 
@@ -2805,6 +2814,10 @@ XLogFlush(XLogRecPtr record)
 		 * Before actually performing the write, wait for all in-flight
 		 * insertions to the pages we're about to write to finish.
 		 */
+		SpinLockAcquire(&XLogCtl->info_lck);
+		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
+			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
+		SpinLockRelease(&XLogCtl->info_lck);
 		insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
 
 		/*
@@ -2947,9 +2960,8 @@ XLogBackgroundFlush(void)
 	 */
 	insertTLI = XLogCtl->InsertTimeLineID;
 
-	/* read LogwrtResult and update local state */
+	/* read updated LogwrtRqst */
 	SpinLockAcquire(&XLogCtl->info_lck);
-	RefreshXLogWriteResult(LogwrtResult);
 	WriteRqst = XLogCtl->LogwrtRqst;
 	SpinLockRelease(&XLogCtl->info_lck);
 
@@ -2957,6 +2969,7 @@ XLogBackgroundFlush(void)
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
 
 	/* if we have already flushed that far, consider async commit records */
+	RefreshXLogWriteResult(LogwrtResult);
 	if (WriteRqst.Write <= LogwrtResult.Flush)
 	{
 		SpinLockAcquire(&XLogCtl->info_lck);
@@ -3125,9 +3138,7 @@ XLogNeedsFlush(XLogRecPtr record)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
 	RefreshXLogWriteResult(LogwrtResult);
-	SpinLockRelease(&XLogCtl->info_lck);
 
 	/* check again */
 	if (record <= LogwrtResult.Flush)
@@ -4940,6 +4951,8 @@ XLOGShmemInit(void)
 	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
 	SpinLockInit(&XLogCtl->info_lck);
+	pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
+	pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
 }
@@ -5961,11 +5974,13 @@ StartupXLOG(void)
 		XLogCtl->InitializedUpTo = EndOfLog;
 	}
 
+	/*
+	 * Update local and shared status.  This is OK to do without any locks
+	 * because no other process can be reading or writing WAL yet.
+	 */
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
-
-	XLogCtl->logWriteResult = LogwrtResult.Write;
-	XLogCtl->logFlushResult = LogwrtResult.Flush;
-
+	pg_atomic_write_u64(&XLogCtl->logWriteResult, EndOfLog);
+	pg_atomic_write_u64(&XLogCtl->logFlushResult, EndOfLog);
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
@@ -6410,9 +6425,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 {
 	Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
 
-	SpinLockAcquire(&XLogCtl->info_lck);
 	RefreshXLogWriteResult(LogwrtResult);
-	SpinLockRelease(&XLogCtl->info_lck);
 
 	/*
 	 * If we're writing and flushing WAL, the time line can't be changing, so
@@ -9326,9 +9339,7 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
 	RefreshXLogWriteResult(LogwrtResult);
-	SpinLockRelease(&XLogCtl->info_lck);
 
 	return LogwrtResult.Write;
 }