1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-09 06:21:09 +03:00

bufmgr: Allow some buffer state modifications while holding header lock

Until now BufferDesc.state was not allowed to be modified while the buffer
header spinlock was held. This meant that operations like unpinning buffers
needed to use a CAS loop, waiting for the buffer header spinlock to be
released before updating.

The benefit of that restriction is that it allowed us to unlock the buffer
header spinlock with just a write barrier and an unlocked write (instead of a
full atomic operation). That was important to avoid regressions in
48354581a4. However, since then the hottest buffer header spinlock uses have
been replaced with atomic operations (in particular, the most common use of
PinBuffer_Locked(), in GetVictimBuffer() (formerly in BufferAlloc()), has been
removed in 5e89985928).

This change will make it possible, in a subsequent commit, to release buffer
pins with a single atomic-sub operation. That was previously impossible: as
long as state modifications were forbidden while the buffer header spinlock
was held, an atomic-sub could not have provided a race-free check for whether
the buffer header lock was held.

Using atomic-sub to unpin buffers is a nice scalability win, however it is not
the primary motivation for this change (although it would be sufficient). The
primary motivation is that we would like to merge the buffer content lock into
BufferDesc.state, which will result in more frequent changes of the state
variable, which in some situations can cause a performance regression, due to
an increased CAS failure rate when unpinning buffers.  The regression entirely
vanishes when using atomic-sub.

Naively implementing this would require putting CAS loops in every place that
modifies the buffer state while holding the buffer header lock. To avoid
that, introduce UnlockBufHdrExt(), which can set/unset flags and adjust the
refcount together with releasing the lock.

Reviewed-by: Robert Haas <robertmhaas@gmail.com>
Reviewed-by: Matthias van de Meent <boekewurm+postgres@gmail.com>
Discussion: https://postgr.es/m/fvfmkr5kk4nyex56ejgxj3uzi63isfxovp2biecb4bspbjrze7@az2pljabhnff
This commit is contained in:
Andres Freund
2025-11-06 16:42:10 -05:00
parent 448b6a4173
commit c75ebc657f
6 changed files with 224 additions and 119 deletions

View File

@@ -220,7 +220,7 @@ pg_buffercache_pages(PG_FUNCTION_ARGS)
else else
fctx->record[i].isvalid = false; fctx->record[i].isvalid = false;
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdr(bufHdr);
} }
} }
@@ -460,7 +460,6 @@ pg_buffercache_numa_pages(PG_FUNCTION_ARGS)
{ {
char *buffptr = (char *) BufferGetBlock(i + 1); char *buffptr = (char *) BufferGetBlock(i + 1);
BufferDesc *bufHdr; BufferDesc *bufHdr;
uint32 buf_state;
uint32 bufferid; uint32 bufferid;
int32 page_num; int32 page_num;
char *startptr_buff, char *startptr_buff,
@@ -471,9 +470,9 @@ pg_buffercache_numa_pages(PG_FUNCTION_ARGS)
bufHdr = GetBufferDescriptor(i); bufHdr = GetBufferDescriptor(i);
/* Lock each buffer header before inspecting. */ /* Lock each buffer header before inspecting. */
buf_state = LockBufHdr(bufHdr); LockBufHdr(bufHdr);
bufferid = BufferDescriptorGetBuffer(bufHdr); bufferid = BufferDescriptorGetBuffer(bufHdr);
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdr(bufHdr);
/* start of the first page of this buffer */ /* start of the first page of this buffer */
startptr_buff = (char *) TYPEALIGN_DOWN(os_page_size, buffptr); startptr_buff = (char *) TYPEALIGN_DOWN(os_page_size, buffptr);

View File

@@ -730,7 +730,7 @@ apw_dump_now(bool is_bgworker, bool dump_unlogged)
++num_blocks; ++num_blocks;
} }
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdr(bufHdr);
} }
snprintf(transient_dump_file_path, MAXPGPATH, "%s.tmp", AUTOPREWARM_FILE); snprintf(transient_dump_file_path, MAXPGPATH, "%s.tmp", AUTOPREWARM_FILE);

View File

@@ -1990,6 +1990,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
Buffer victim_buffer; Buffer victim_buffer;
BufferDesc *victim_buf_hdr; BufferDesc *victim_buf_hdr;
uint32 victim_buf_state; uint32 victim_buf_state;
uint32 set_bits = 0;
/* Make sure we will have room to remember the buffer pin */ /* Make sure we will have room to remember the buffer pin */
ResourceOwnerEnlarge(CurrentResourceOwner); ResourceOwnerEnlarge(CurrentResourceOwner);
@@ -2116,11 +2117,12 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
* checkpoints, except for their "init" forks, which need to be treated * checkpoints, except for their "init" forks, which need to be treated
* just like permanent relations. * just like permanent relations.
*/ */
victim_buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE; set_bits |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
if (relpersistence == RELPERSISTENCE_PERMANENT || forkNum == INIT_FORKNUM) if (relpersistence == RELPERSISTENCE_PERMANENT || forkNum == INIT_FORKNUM)
victim_buf_state |= BM_PERMANENT; set_bits |= BM_PERMANENT;
UnlockBufHdr(victim_buf_hdr, victim_buf_state); UnlockBufHdrExt(victim_buf_hdr, victim_buf_state,
set_bits, 0, 0);
LWLockRelease(newPartitionLock); LWLockRelease(newPartitionLock);
@@ -2160,9 +2162,7 @@ InvalidateBuffer(BufferDesc *buf)
/* Save the original buffer tag before dropping the spinlock */ /* Save the original buffer tag before dropping the spinlock */
oldTag = buf->tag; oldTag = buf->tag;
buf_state = pg_atomic_read_u32(&buf->state); UnlockBufHdr(buf);
Assert(buf_state & BM_LOCKED);
UnlockBufHdr(buf, buf_state);
/* /*
* Need to compute the old tag's hashcode and partition lock ID. XXX is it * Need to compute the old tag's hashcode and partition lock ID. XXX is it
@@ -2186,7 +2186,7 @@ retry:
/* If it's changed while we were waiting for lock, do nothing */ /* If it's changed while we were waiting for lock, do nothing */
if (!BufferTagsEqual(&buf->tag, &oldTag)) if (!BufferTagsEqual(&buf->tag, &oldTag))
{ {
UnlockBufHdr(buf, buf_state); UnlockBufHdr(buf);
LWLockRelease(oldPartitionLock); LWLockRelease(oldPartitionLock);
return; return;
} }
@@ -2203,7 +2203,7 @@ retry:
*/ */
if (BUF_STATE_GET_REFCOUNT(buf_state) != 0) if (BUF_STATE_GET_REFCOUNT(buf_state) != 0)
{ {
UnlockBufHdr(buf, buf_state); UnlockBufHdr(buf);
LWLockRelease(oldPartitionLock); LWLockRelease(oldPartitionLock);
/* safety check: should definitely not be our *own* pin */ /* safety check: should definitely not be our *own* pin */
if (GetPrivateRefCount(BufferDescriptorGetBuffer(buf)) > 0) if (GetPrivateRefCount(BufferDescriptorGetBuffer(buf)) > 0)
@@ -2218,8 +2218,11 @@ retry:
*/ */
oldFlags = buf_state & BUF_FLAG_MASK; oldFlags = buf_state & BUF_FLAG_MASK;
ClearBufferTag(&buf->tag); ClearBufferTag(&buf->tag);
buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
UnlockBufHdr(buf, buf_state); UnlockBufHdrExt(buf, buf_state,
0,
BUF_FLAG_MASK | BUF_USAGECOUNT_MASK,
0);
/* /*
* Remove the buffer from the lookup hashtable, if it was in there. * Remove the buffer from the lookup hashtable, if it was in there.
@@ -2279,7 +2282,7 @@ InvalidateVictimBuffer(BufferDesc *buf_hdr)
{ {
Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
UnlockBufHdr(buf_hdr, buf_state); UnlockBufHdr(buf_hdr);
LWLockRelease(partition_lock); LWLockRelease(partition_lock);
return false; return false;
@@ -2293,8 +2296,10 @@ InvalidateVictimBuffer(BufferDesc *buf_hdr)
* tag (see e.g. FlushDatabaseBuffers()). * tag (see e.g. FlushDatabaseBuffers()).
*/ */
ClearBufferTag(&buf_hdr->tag); ClearBufferTag(&buf_hdr->tag);
buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK); UnlockBufHdrExt(buf_hdr, buf_state,
UnlockBufHdr(buf_hdr, buf_state); 0,
BUF_FLAG_MASK | BUF_USAGECOUNT_MASK,
0);
Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
@@ -2303,6 +2308,7 @@ InvalidateVictimBuffer(BufferDesc *buf_hdr)
LWLockRelease(partition_lock); LWLockRelease(partition_lock);
buf_state = pg_atomic_read_u32(&buf_hdr->state);
Assert(!(buf_state & (BM_DIRTY | BM_VALID | BM_TAG_VALID))); Assert(!(buf_state & (BM_DIRTY | BM_VALID | BM_TAG_VALID)));
Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
Assert(BUF_STATE_GET_REFCOUNT(pg_atomic_read_u32(&buf_hdr->state)) > 0); Assert(BUF_STATE_GET_REFCOUNT(pg_atomic_read_u32(&buf_hdr->state)) > 0);
@@ -2393,7 +2399,7 @@ again:
/* Read the LSN while holding buffer header lock */ /* Read the LSN while holding buffer header lock */
buf_state = LockBufHdr(buf_hdr); buf_state = LockBufHdr(buf_hdr);
lsn = BufferGetLSN(buf_hdr); lsn = BufferGetLSN(buf_hdr);
UnlockBufHdr(buf_hdr, buf_state); UnlockBufHdr(buf_hdr);
if (XLogNeedsFlush(lsn) if (XLogNeedsFlush(lsn)
&& StrategyRejectBuffer(strategy, buf_hdr, from_ring)) && StrategyRejectBuffer(strategy, buf_hdr, from_ring))
@@ -2739,15 +2745,13 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
*/ */
do do
{ {
uint32 buf_state = LockBufHdr(existing_hdr); pg_atomic_fetch_and_u32(&existing_hdr->state, ~BM_VALID);
buf_state &= ~BM_VALID;
UnlockBufHdr(existing_hdr, buf_state);
} while (!StartBufferIO(existing_hdr, true, false)); } while (!StartBufferIO(existing_hdr, true, false));
} }
else else
{ {
uint32 buf_state; uint32 buf_state;
uint32 set_bits = 0;
buf_state = LockBufHdr(victim_buf_hdr); buf_state = LockBufHdr(victim_buf_hdr);
@@ -2757,11 +2761,13 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
victim_buf_hdr->tag = tag; victim_buf_hdr->tag = tag;
buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE; set_bits |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
if (bmr.relpersistence == RELPERSISTENCE_PERMANENT || fork == INIT_FORKNUM) if (bmr.relpersistence == RELPERSISTENCE_PERMANENT || fork == INIT_FORKNUM)
buf_state |= BM_PERMANENT; set_bits |= BM_PERMANENT;
UnlockBufHdr(victim_buf_hdr, buf_state); UnlockBufHdrExt(victim_buf_hdr, buf_state,
set_bits, 0,
0);
LWLockRelease(partition_lock); LWLockRelease(partition_lock);
@@ -2954,6 +2960,10 @@ MarkBufferDirty(Buffer buffer)
Assert(BufferIsPinned(buffer)); Assert(BufferIsPinned(buffer));
Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE)); Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE));
/*
* NB: We have to wait for the buffer header spinlock to be not held, as
* TerminateBufferIO() relies on the spinlock.
*/
old_buf_state = pg_atomic_read_u32(&bufHdr->state); old_buf_state = pg_atomic_read_u32(&bufHdr->state);
for (;;) for (;;)
{ {
@@ -3078,6 +3088,10 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy,
if (unlikely(skip_if_not_valid && !(old_buf_state & BM_VALID))) if (unlikely(skip_if_not_valid && !(old_buf_state & BM_VALID)))
return false; return false;
/*
* We're not allowed to increase the refcount while the buffer
* header spinlock is held. Wait for the lock to be released.
*/
if (old_buf_state & BM_LOCKED) if (old_buf_state & BM_LOCKED)
old_buf_state = WaitBufHdrUnlocked(buf); old_buf_state = WaitBufHdrUnlocked(buf);
@@ -3164,7 +3178,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy,
static void static void
PinBuffer_Locked(BufferDesc *buf) PinBuffer_Locked(BufferDesc *buf)
{ {
uint32 buf_state; uint32 old_buf_state;
/* /*
* As explained, We don't expect any preexisting pins. That allows us to * As explained, We don't expect any preexisting pins. That allows us to
@@ -3176,10 +3190,10 @@ PinBuffer_Locked(BufferDesc *buf)
* Since we hold the buffer spinlock, we can update the buffer state and * Since we hold the buffer spinlock, we can update the buffer state and
* release the lock in one operation. * release the lock in one operation.
*/ */
buf_state = pg_atomic_read_u32(&buf->state); old_buf_state = pg_atomic_read_u32(&buf->state);
Assert(buf_state & BM_LOCKED);
buf_state += BUF_REFCOUNT_ONE; UnlockBufHdrExt(buf, old_buf_state,
UnlockBufHdr(buf, buf_state); 0, 0, 1);
TrackNewBufferPin(BufferDescriptorGetBuffer(buf)); TrackNewBufferPin(BufferDescriptorGetBuffer(buf));
} }
@@ -3214,12 +3228,13 @@ WakePinCountWaiter(BufferDesc *buf)
/* we just released the last pin other than the waiter's */ /* we just released the last pin other than the waiter's */
int wait_backend_pgprocno = buf->wait_backend_pgprocno; int wait_backend_pgprocno = buf->wait_backend_pgprocno;
buf_state &= ~BM_PIN_COUNT_WAITER; UnlockBufHdrExt(buf, buf_state,
UnlockBufHdr(buf, buf_state); 0, BM_PIN_COUNT_WAITER,
0);
ProcSendSignal(wait_backend_pgprocno); ProcSendSignal(wait_backend_pgprocno);
} }
else else
UnlockBufHdr(buf, buf_state); UnlockBufHdr(buf);
} }
/* /*
@@ -3275,6 +3290,10 @@ UnpinBufferNoOwner(BufferDesc *buf)
* *
* Since buffer spinlock holder can update status using just write, * Since buffer spinlock holder can update status using just write,
* it's not safe to use atomic decrement here; thus use a CAS loop. * it's not safe to use atomic decrement here; thus use a CAS loop.
*
* TODO: The above requirement does not hold anymore, in a future
* commit this will be rewritten to release the pin in a single atomic
* operation.
*/ */
old_buf_state = pg_atomic_read_u32(&buf->state); old_buf_state = pg_atomic_read_u32(&buf->state);
for (;;) for (;;)
@@ -3388,6 +3407,7 @@ BufferSync(int flags)
for (buf_id = 0; buf_id < NBuffers; buf_id++) for (buf_id = 0; buf_id < NBuffers; buf_id++)
{ {
BufferDesc *bufHdr = GetBufferDescriptor(buf_id); BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
uint32 set_bits = 0;
/* /*
* Header spinlock is enough to examine BM_DIRTY, see comment in * Header spinlock is enough to examine BM_DIRTY, see comment in
@@ -3399,7 +3419,7 @@ BufferSync(int flags)
{ {
CkptSortItem *item; CkptSortItem *item;
buf_state |= BM_CHECKPOINT_NEEDED; set_bits = BM_CHECKPOINT_NEEDED;
item = &CkptBufferIds[num_to_scan++]; item = &CkptBufferIds[num_to_scan++];
item->buf_id = buf_id; item->buf_id = buf_id;
@@ -3409,7 +3429,9 @@ BufferSync(int flags)
item->blockNum = bufHdr->tag.blockNum; item->blockNum = bufHdr->tag.blockNum;
} }
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdrExt(bufHdr, buf_state,
set_bits, 0,
0);
/* Check for barrier events in case NBuffers is large. */ /* Check for barrier events in case NBuffers is large. */
if (ProcSignalBarrierPending) if (ProcSignalBarrierPending)
@@ -3948,14 +3970,14 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
else if (skip_recently_used) else if (skip_recently_used)
{ {
/* Caller told us not to write recently-used buffers */ /* Caller told us not to write recently-used buffers */
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdr(bufHdr);
return result; return result;
} }
if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY)) if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
{ {
/* It's clean, so nothing to do */ /* It's clean, so nothing to do */
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdr(bufHdr);
return result; return result;
} }
@@ -4324,8 +4346,9 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
recptr = BufferGetLSN(buf); recptr = BufferGetLSN(buf);
/* To check if block content changes while flushing. - vadim 01/17/97 */ /* To check if block content changes while flushing. - vadim 01/17/97 */
buf_state &= ~BM_JUST_DIRTIED; UnlockBufHdrExt(buf, buf_state,
UnlockBufHdr(buf, buf_state); 0, BM_JUST_DIRTIED,
0);
/* /*
* Force XLOG flush up to buffer's LSN. This implements the basic WAL * Force XLOG flush up to buffer's LSN. This implements the basic WAL
@@ -4501,7 +4524,6 @@ BufferGetLSNAtomic(Buffer buffer)
char *page = BufferGetPage(buffer); char *page = BufferGetPage(buffer);
BufferDesc *bufHdr; BufferDesc *bufHdr;
XLogRecPtr lsn; XLogRecPtr lsn;
uint32 buf_state;
/* /*
* If we don't need locking for correctness, fastpath out. * If we don't need locking for correctness, fastpath out.
@@ -4514,9 +4536,9 @@ BufferGetLSNAtomic(Buffer buffer)
Assert(BufferIsPinned(buffer)); Assert(BufferIsPinned(buffer));
bufHdr = GetBufferDescriptor(buffer - 1); bufHdr = GetBufferDescriptor(buffer - 1);
buf_state = LockBufHdr(bufHdr); LockBufHdr(bufHdr);
lsn = PageGetLSN(page); lsn = PageGetLSN(page);
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdr(bufHdr);
return lsn; return lsn;
} }
@@ -4617,7 +4639,6 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum,
for (i = 0; i < NBuffers; i++) for (i = 0; i < NBuffers; i++)
{ {
BufferDesc *bufHdr = GetBufferDescriptor(i); BufferDesc *bufHdr = GetBufferDescriptor(i);
uint32 buf_state;
/* /*
* We can make this a tad faster by prechecking the buffer tag before * We can make this a tad faster by prechecking the buffer tag before
@@ -4638,7 +4659,7 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum,
if (!BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator.locator)) if (!BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator.locator))
continue; continue;
buf_state = LockBufHdr(bufHdr); LockBufHdr(bufHdr);
for (j = 0; j < nforks; j++) for (j = 0; j < nforks; j++)
{ {
@@ -4651,7 +4672,7 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum,
} }
} }
if (j >= nforks) if (j >= nforks)
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdr(bufHdr);
} }
} }
@@ -4780,7 +4801,6 @@ DropRelationsAllBuffers(SMgrRelation *smgr_reln, int nlocators)
{ {
RelFileLocator *rlocator = NULL; RelFileLocator *rlocator = NULL;
BufferDesc *bufHdr = GetBufferDescriptor(i); BufferDesc *bufHdr = GetBufferDescriptor(i);
uint32 buf_state;
/* /*
* As in DropRelationBuffers, an unlocked precheck should be safe and * As in DropRelationBuffers, an unlocked precheck should be safe and
@@ -4814,11 +4834,11 @@ DropRelationsAllBuffers(SMgrRelation *smgr_reln, int nlocators)
if (rlocator == NULL) if (rlocator == NULL)
continue; continue;
buf_state = LockBufHdr(bufHdr); LockBufHdr(bufHdr);
if (BufTagMatchesRelFileLocator(&bufHdr->tag, rlocator)) if (BufTagMatchesRelFileLocator(&bufHdr->tag, rlocator))
InvalidateBuffer(bufHdr); /* releases spinlock */ InvalidateBuffer(bufHdr); /* releases spinlock */
else else
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdr(bufHdr);
} }
pfree(locators); pfree(locators);
@@ -4848,7 +4868,6 @@ FindAndDropRelationBuffers(RelFileLocator rlocator, ForkNumber forkNum,
LWLock *bufPartitionLock; /* buffer partition lock for it */ LWLock *bufPartitionLock; /* buffer partition lock for it */
int buf_id; int buf_id;
BufferDesc *bufHdr; BufferDesc *bufHdr;
uint32 buf_state;
/* create a tag so we can lookup the buffer */ /* create a tag so we can lookup the buffer */
InitBufferTag(&bufTag, &rlocator, forkNum, curBlock); InitBufferTag(&bufTag, &rlocator, forkNum, curBlock);
@@ -4873,14 +4892,14 @@ FindAndDropRelationBuffers(RelFileLocator rlocator, ForkNumber forkNum,
* evicted by some other backend loading blocks for a different * evicted by some other backend loading blocks for a different
* relation after we release lock on the BufMapping table. * relation after we release lock on the BufMapping table.
*/ */
buf_state = LockBufHdr(bufHdr); LockBufHdr(bufHdr);
if (BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator) && if (BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator) &&
BufTagGetForkNum(&bufHdr->tag) == forkNum && BufTagGetForkNum(&bufHdr->tag) == forkNum &&
bufHdr->tag.blockNum >= firstDelBlock) bufHdr->tag.blockNum >= firstDelBlock)
InvalidateBuffer(bufHdr); /* releases spinlock */ InvalidateBuffer(bufHdr); /* releases spinlock */
else else
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdr(bufHdr);
} }
} }
@@ -4908,7 +4927,6 @@ DropDatabaseBuffers(Oid dbid)
for (i = 0; i < NBuffers; i++) for (i = 0; i < NBuffers; i++)
{ {
BufferDesc *bufHdr = GetBufferDescriptor(i); BufferDesc *bufHdr = GetBufferDescriptor(i);
uint32 buf_state;
/* /*
* As in DropRelationBuffers, an unlocked precheck should be safe and * As in DropRelationBuffers, an unlocked precheck should be safe and
@@ -4917,11 +4935,11 @@ DropDatabaseBuffers(Oid dbid)
if (bufHdr->tag.dbOid != dbid) if (bufHdr->tag.dbOid != dbid)
continue; continue;
buf_state = LockBufHdr(bufHdr); LockBufHdr(bufHdr);
if (bufHdr->tag.dbOid == dbid) if (bufHdr->tag.dbOid == dbid)
InvalidateBuffer(bufHdr); /* releases spinlock */ InvalidateBuffer(bufHdr); /* releases spinlock */
else else
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdr(bufHdr);
} }
} }
@@ -5018,7 +5036,7 @@ FlushRelationBuffers(Relation rel)
UnpinBuffer(bufHdr); UnpinBuffer(bufHdr);
} }
else else
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdr(bufHdr);
} }
} }
@@ -5113,7 +5131,7 @@ FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels)
UnpinBuffer(bufHdr); UnpinBuffer(bufHdr);
} }
else else
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdr(bufHdr);
} }
pfree(srels); pfree(srels);
@@ -5339,7 +5357,7 @@ FlushDatabaseBuffers(Oid dbid)
UnpinBuffer(bufHdr); UnpinBuffer(bufHdr);
} }
else else
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdr(bufHdr);
} }
} }
@@ -5549,8 +5567,9 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
} }
buf_state |= BM_DIRTY | BM_JUST_DIRTIED; UnlockBufHdrExt(bufHdr, buf_state,
UnlockBufHdr(bufHdr, buf_state); BM_DIRTY | BM_JUST_DIRTIED,
0, 0);
if (delayChkptFlags) if (delayChkptFlags)
MyProc->delayChkptFlags &= ~DELAY_CHKPT_START; MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
@@ -5581,6 +5600,7 @@ UnlockBuffers(void)
if (buf) if (buf)
{ {
uint32 buf_state; uint32 buf_state;
uint32 unset_bits = 0;
buf_state = LockBufHdr(buf); buf_state = LockBufHdr(buf);
@@ -5590,9 +5610,11 @@ UnlockBuffers(void)
*/ */
if ((buf_state & BM_PIN_COUNT_WAITER) != 0 && if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
buf->wait_backend_pgprocno == MyProcNumber) buf->wait_backend_pgprocno == MyProcNumber)
buf_state &= ~BM_PIN_COUNT_WAITER; unset_bits = BM_PIN_COUNT_WAITER;
UnlockBufHdr(buf, buf_state); UnlockBufHdrExt(buf, buf_state,
0, unset_bits,
0);
PinCountWaitBuf = NULL; PinCountWaitBuf = NULL;
} }
@@ -5710,6 +5732,7 @@ LockBufferForCleanup(Buffer buffer)
for (;;) for (;;)
{ {
uint32 buf_state; uint32 buf_state;
uint32 unset_bits = 0;
/* Try to acquire lock */ /* Try to acquire lock */
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
@@ -5719,7 +5742,7 @@ LockBufferForCleanup(Buffer buffer)
if (BUF_STATE_GET_REFCOUNT(buf_state) == 1) if (BUF_STATE_GET_REFCOUNT(buf_state) == 1)
{ {
/* Successfully acquired exclusive lock with pincount 1 */ /* Successfully acquired exclusive lock with pincount 1 */
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdr(bufHdr);
/* /*
* Emit the log message if recovery conflict on buffer pin was * Emit the log message if recovery conflict on buffer pin was
@@ -5742,14 +5765,15 @@ LockBufferForCleanup(Buffer buffer)
/* Failed, so mark myself as waiting for pincount 1 */ /* Failed, so mark myself as waiting for pincount 1 */
if (buf_state & BM_PIN_COUNT_WAITER) if (buf_state & BM_PIN_COUNT_WAITER)
{ {
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdr(bufHdr);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK); LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
elog(ERROR, "multiple backends attempting to wait for pincount 1"); elog(ERROR, "multiple backends attempting to wait for pincount 1");
} }
bufHdr->wait_backend_pgprocno = MyProcNumber; bufHdr->wait_backend_pgprocno = MyProcNumber;
PinCountWaitBuf = bufHdr; PinCountWaitBuf = bufHdr;
buf_state |= BM_PIN_COUNT_WAITER; UnlockBufHdrExt(bufHdr, buf_state,
UnlockBufHdr(bufHdr, buf_state); BM_PIN_COUNT_WAITER, 0,
0);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK); LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
/* Wait to be signaled by UnpinBuffer() */ /* Wait to be signaled by UnpinBuffer() */
@@ -5811,8 +5835,11 @@ LockBufferForCleanup(Buffer buffer)
buf_state = LockBufHdr(bufHdr); buf_state = LockBufHdr(bufHdr);
if ((buf_state & BM_PIN_COUNT_WAITER) != 0 && if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
bufHdr->wait_backend_pgprocno == MyProcNumber) bufHdr->wait_backend_pgprocno == MyProcNumber)
buf_state &= ~BM_PIN_COUNT_WAITER; unset_bits |= BM_PIN_COUNT_WAITER;
UnlockBufHdr(bufHdr, buf_state);
UnlockBufHdrExt(bufHdr, buf_state,
0, unset_bits,
0);
PinCountWaitBuf = NULL; PinCountWaitBuf = NULL;
/* Loop back and try again */ /* Loop back and try again */
@@ -5889,12 +5916,12 @@ ConditionalLockBufferForCleanup(Buffer buffer)
if (refcount == 1) if (refcount == 1)
{ {
/* Successfully acquired exclusive lock with pincount 1 */ /* Successfully acquired exclusive lock with pincount 1 */
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdr(bufHdr);
return true; return true;
} }
/* Failed, so release the lock */ /* Failed, so release the lock */
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdr(bufHdr);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK); LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
return false; return false;
} }
@@ -5941,11 +5968,11 @@ IsBufferCleanupOK(Buffer buffer)
if (BUF_STATE_GET_REFCOUNT(buf_state) == 1) if (BUF_STATE_GET_REFCOUNT(buf_state) == 1)
{ {
/* pincount is OK. */ /* pincount is OK. */
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdr(bufHdr);
return true; return true;
} }
UnlockBufHdr(bufHdr, buf_state); UnlockBufHdr(bufHdr);
return false; return false;
} }
@@ -5983,7 +6010,7 @@ WaitIO(BufferDesc *buf)
* clearing the wref while it's being read. * clearing the wref while it's being read.
*/ */
iow = buf->io_wref; iow = buf->io_wref;
UnlockBufHdr(buf, buf_state); UnlockBufHdr(buf);
/* no IO in progress, we don't need to wait */ /* no IO in progress, we don't need to wait */
if (!(buf_state & BM_IO_IN_PROGRESS)) if (!(buf_state & BM_IO_IN_PROGRESS))
@@ -6051,7 +6078,7 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
if (!(buf_state & BM_IO_IN_PROGRESS)) if (!(buf_state & BM_IO_IN_PROGRESS))
break; break;
UnlockBufHdr(buf, buf_state); UnlockBufHdr(buf);
if (nowait) if (nowait)
return false; return false;
WaitIO(buf); WaitIO(buf);
@@ -6062,12 +6089,13 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
/* Check if someone else already did the I/O */ /* Check if someone else already did the I/O */
if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY)) if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
{ {
UnlockBufHdr(buf, buf_state); UnlockBufHdr(buf);
return false; return false;
} }
buf_state |= BM_IO_IN_PROGRESS; UnlockBufHdrExt(buf, buf_state,
UnlockBufHdr(buf, buf_state); BM_IO_IN_PROGRESS, 0,
0);
ResourceOwnerRememberBufferIO(CurrentResourceOwner, ResourceOwnerRememberBufferIO(CurrentResourceOwner,
BufferDescriptorGetBuffer(buf)); BufferDescriptorGetBuffer(buf));
@@ -6100,28 +6128,31 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits,
bool forget_owner, bool release_aio) bool forget_owner, bool release_aio)
{ {
uint32 buf_state; uint32 buf_state;
uint32 unset_flag_bits = 0;
int refcount_change = 0;
buf_state = LockBufHdr(buf); buf_state = LockBufHdr(buf);
Assert(buf_state & BM_IO_IN_PROGRESS); Assert(buf_state & BM_IO_IN_PROGRESS);
buf_state &= ~BM_IO_IN_PROGRESS; unset_flag_bits |= BM_IO_IN_PROGRESS;
/* Clear earlier errors, if this IO failed, it'll be marked again */ /* Clear earlier errors, if this IO failed, it'll be marked again */
buf_state &= ~BM_IO_ERROR; unset_flag_bits |= BM_IO_ERROR;
if (clear_dirty && !(buf_state & BM_JUST_DIRTIED)) if (clear_dirty && !(buf_state & BM_JUST_DIRTIED))
buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); unset_flag_bits |= BM_DIRTY | BM_CHECKPOINT_NEEDED;
if (release_aio) if (release_aio)
{ {
/* release ownership by the AIO subsystem */ /* release ownership by the AIO subsystem */
Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
buf_state -= BUF_REFCOUNT_ONE; refcount_change = -1;
pgaio_wref_clear(&buf->io_wref); pgaio_wref_clear(&buf->io_wref);
} }
buf_state |= set_flag_bits; buf_state = UnlockBufHdrExt(buf, buf_state,
UnlockBufHdr(buf, buf_state); set_flag_bits, unset_flag_bits,
refcount_change);
if (forget_owner) if (forget_owner)
ResourceOwnerForgetBufferIO(CurrentResourceOwner, ResourceOwnerForgetBufferIO(CurrentResourceOwner,
@@ -6166,12 +6197,12 @@ AbortBufferIO(Buffer buffer)
if (!(buf_state & BM_VALID)) if (!(buf_state & BM_VALID))
{ {
Assert(!(buf_state & BM_DIRTY)); Assert(!(buf_state & BM_DIRTY));
UnlockBufHdr(buf_hdr, buf_state); UnlockBufHdr(buf_hdr);
} }
else else
{ {
Assert(buf_state & BM_DIRTY); Assert(buf_state & BM_DIRTY);
UnlockBufHdr(buf_hdr, buf_state); UnlockBufHdr(buf_hdr);
/* Issue notice if this is not the first failure... */ /* Issue notice if this is not the first failure... */
if (buf_state & BM_IO_ERROR) if (buf_state & BM_IO_ERROR)
@@ -6593,14 +6624,14 @@ EvictUnpinnedBufferInternal(BufferDesc *desc, bool *buffer_flushed)
if ((buf_state & BM_VALID) == 0) if ((buf_state & BM_VALID) == 0)
{ {
UnlockBufHdr(desc, buf_state); UnlockBufHdr(desc);
return false; return false;
} }
/* Check that it's not pinned already. */ /* Check that it's not pinned already. */
if (BUF_STATE_GET_REFCOUNT(buf_state) > 0) if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
{ {
UnlockBufHdr(desc, buf_state); UnlockBufHdr(desc);
return false; return false;
} }
@@ -6754,7 +6785,7 @@ EvictRelUnpinnedBuffers(Relation rel, int32 *buffers_evicted,
if ((buf_state & BM_VALID) == 0 || if ((buf_state & BM_VALID) == 0 ||
!BufTagMatchesRelFileLocator(&desc->tag, &rel->rd_locator)) !BufTagMatchesRelFileLocator(&desc->tag, &rel->rd_locator))
{ {
UnlockBufHdr(desc, buf_state); UnlockBufHdr(desc);
continue; continue;
} }
@@ -6852,13 +6883,15 @@ buffer_stage_common(PgAioHandle *ioh, bool is_write, bool is_temp)
* *
* This pin is released again in TerminateBufferIO(). * This pin is released again in TerminateBufferIO().
*/ */
buf_state += BUF_REFCOUNT_ONE;
buf_hdr->io_wref = io_ref; buf_hdr->io_wref = io_ref;
if (is_temp) if (is_temp)
{
buf_state += BUF_REFCOUNT_ONE;
pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state); pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
}
else else
UnlockBufHdr(buf_hdr, buf_state); UnlockBufHdrExt(buf_hdr, buf_state, 0, 0, 1);
/* /*
* Ensure the content lock that prevents buffer modifications while * Ensure the content lock that prevents buffer modifications while

View File

@@ -266,6 +266,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
break; break;
} }
/* See equivalent code in PinBuffer() */
if (unlikely(local_buf_state & BM_LOCKED)) if (unlikely(local_buf_state & BM_LOCKED))
{ {
old_buf_state = WaitBufHdrUnlocked(buf); old_buf_state = WaitBufHdrUnlocked(buf);
@@ -700,6 +701,7 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state)
|| BUF_STATE_GET_USAGECOUNT(local_buf_state) > 1) || BUF_STATE_GET_USAGECOUNT(local_buf_state) > 1)
break; break;
/* See equivalent code in PinBuffer() */
if (unlikely(local_buf_state & BM_LOCKED)) if (unlikely(local_buf_state & BM_LOCKED))
{ {
old_buf_state = WaitBufHdrUnlocked(buf); old_buf_state = WaitBufHdrUnlocked(buf);

View File

@@ -211,28 +211,36 @@ BufMappingPartitionLockByIndex(uint32 index)
/* /*
* BufferDesc -- shared descriptor/state data for a single shared buffer. * BufferDesc -- shared descriptor/state data for a single shared buffer.
* *
* Note: Buffer header lock (BM_LOCKED flag) must be held to examine or change * The state of the buffer is controlled by the, drumroll, state variable. It
* tag, state or wait_backend_pgprocno fields. In general, buffer header lock * only may be modified using atomic operations. The state variable combines
* is a spinlock which is combined with flags, refcount and usagecount into * various flags, the buffer's refcount and usage count. See comment above
single atomic variable. This layout allows us to * BUF_REFCOUNT_BITS for details about the division. This layout allows us to
* single atomic operation, without actually acquiring and releasing spinlock; * do some operations in a single atomic operation, without actually acquiring
* for instance, increase or decrease refcount. buf_id field never changes * and releasing the spinlock; for instance, increasing or decreasing the
* after initialization, so does not need locking. The LWLock can take care * refcount.
* of itself. The buffer header lock is *not* used to control access to the
* data in the buffer!
* *
* It's assumed that nobody changes the state field while buffer header lock * One of the aforementioned flags is BM_LOCKED, used to implement the buffer
* is held. Thus buffer header lock holder can do complex updates of the * header lock. See the following paragraphs, as well as the documentation for
* state variable in single write, simultaneously with lock release (cleaning * individual fields, for more details.
* BM_LOCKED flag). On the other hand, updating of state without holding
* buffer header lock is restricted to CAS, which ensures that BM_LOCKED flag
* is not set. Atomic increment/decrement, OR/AND etc. are not allowed.
* *
* An exception is that if we have the buffer pinned, its tag can't change * The identity of the buffer (BufferDesc.tag) can only be changed by the
* underneath us, so we can examine the tag without locking the buffer header. * backend holding the buffer header lock.
* Also, in places we do one-time reads of the flags without bothering to *
* lock the buffer header; this is generally for situations where we don't * If the lock is held by another backend, neither additional buffer pins may
* expect the flag bit being tested to be changing. * be established (we would like to relax this eventually), nor can flags be
* set/cleared. These operations either need to acquire the buffer header
* spinlock, or need to use a CAS loop, waiting for the lock to be released if
* it is held. However, existing buffer pins may be released while the buffer
* header spinlock is held, using an atomic subtraction.
*
* The LWLock can take care of itself. The buffer header lock is *not* used
* to control access to the data in the buffer!
*
* If we have the buffer pinned, its tag can't change underneath us, so we can
* examine the tag without locking the buffer header. Also, in places we do
* one-time reads of the flags without bothering to lock the buffer header;
* this is generally for situations where we don't expect the flag bit being
* tested to be changing.
* *
* We can't physically remove items from a disk page if another backend has * We can't physically remove items from a disk page if another backend has
* the buffer pinned. Hence, a backend may need to wait for all other pins * the buffer pinned. Hence, a backend may need to wait for all other pins
@@ -256,13 +264,29 @@ BufMappingPartitionLockByIndex(uint32 index)
*/ */
typedef struct BufferDesc typedef struct BufferDesc
{ {
BufferTag tag; /* ID of page contained in buffer */ /*
int buf_id; /* buffer's index number (from 0) */ * ID of page contained in buffer. The buffer header spinlock needs to be
* held to modify this field.
*/
BufferTag tag;
/* state of the tag, containing flags, refcount and usagecount */ /*
* Buffer's index number (from 0). The field never changes after
* initialization, so does not need locking.
*/
int buf_id;
/*
* State of the buffer, containing flags, refcount and usagecount. See
* BUF_* and BM_* defines at the top of this file.
*/
pg_atomic_uint32 state; pg_atomic_uint32 state;
int wait_backend_pgprocno; /* backend of pin-count waiter */ /*
* Backend of pin-count waiter. The buffer header spinlock needs to be
* held to modify this field.
*/
int wait_backend_pgprocno;
PgAioWaitRef io_wref; /* set iff AIO is in progress */ PgAioWaitRef io_wref; /* set iff AIO is in progress */
LWLock content_lock; /* to lock access to buffer contents */ LWLock content_lock; /* to lock access to buffer contents */
@@ -364,11 +388,52 @@ BufferDescriptorGetContentLock(const BufferDesc *bdesc)
*/ */
extern uint32 LockBufHdr(BufferDesc *desc); extern uint32 LockBufHdr(BufferDesc *desc);
/*
 * Unlock the buffer header.
 *
 * This can only be used if the caller did not modify BufferDesc.state. To
 * set/unset flag bits or change the refcount use UnlockBufHdrExt().
 */
static inline void
UnlockBufHdr(BufferDesc *desc)
{
	/* it is only valid to release a lock that is actually held */
	Assert(pg_atomic_read_u32(&desc->state) & BM_LOCKED);

	/*
	 * Use an atomic read-modify-write, rather than a plain write, so the
	 * release is race-free against concurrent atomic updates of the state
	 * (e.g. releasing pins) that are now permitted while the buffer header
	 * lock is held.
	 */
	pg_atomic_fetch_sub_u32(&desc->state, BM_LOCKED);
}
/*
 * Release the buffer header spinlock and, as part of the same atomic
 * operation, apply the requested state changes: OR in set_bits, clear
 * unset_bits and adjust the refcount by refcount_change.
 *
 * This cannot be used to adjust the usagecount, as that would need to be
 * capped at BM_MAX_USAGE_COUNT, which a blind add cannot guarantee.
 *
 * Returns the state as observed just before the lock was released (i.e.
 * with BM_LOCKED still set and before set_bits/unset_bits/refcount_change
 * were applied).
 */
static inline uint32
UnlockBufHdrExt(BufferDesc *desc, uint32 old_buf_state,
				uint32 set_bits, uint32 unset_bits,
				int refcount_change)
{
	uint32		expected = old_buf_state;

	/*
	 * Loop until the CAS succeeds. Existing pins may be released while we
	 * hold the header lock, so a single attempt does not suffice; on
	 * failure pg_atomic_compare_exchange_u32() refreshes "expected" with
	 * the current value of the state variable.
	 */
	for (;;)
	{
		uint32		desired;

		/* we must be the one holding the buffer header lock */
		Assert(expected & BM_LOCKED);

		desired = (expected | set_bits) & ~(unset_bits | BM_LOCKED);
		if (refcount_change != 0)
			desired += BUF_REFCOUNT_ONE * refcount_change;

		if (pg_atomic_compare_exchange_u32(&desc->state, &expected,
										   desired))
			return expected;
	}
}
extern uint32 WaitBufHdrUnlocked(BufferDesc *buf); extern uint32 WaitBufHdrUnlocked(BufferDesc *buf);

View File

@@ -310,6 +310,7 @@ create_toy_buffer(Relation rel, BlockNumber blkno)
BufferDesc *buf_hdr; BufferDesc *buf_hdr;
uint32 buf_state; uint32 buf_state;
bool was_pinned = false; bool was_pinned = false;
uint32 unset_bits = 0;
/* place buffer in shared buffers without erroring out */ /* place buffer in shared buffers without erroring out */
buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO_AND_LOCK, NULL); buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_ZERO_AND_LOCK, NULL);
@@ -334,12 +335,17 @@ create_toy_buffer(Relation rel, BlockNumber blkno)
if (BUF_STATE_GET_REFCOUNT(buf_state) > 1) if (BUF_STATE_GET_REFCOUNT(buf_state) > 1)
was_pinned = true; was_pinned = true;
else else
buf_state &= ~(BM_VALID | BM_DIRTY); unset_bits |= BM_VALID | BM_DIRTY;
if (RelationUsesLocalBuffers(rel)) if (RelationUsesLocalBuffers(rel))
{
buf_state &= ~unset_bits;
pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state); pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
}
else else
UnlockBufHdr(buf_hdr, buf_state); {
UnlockBufHdrExt(buf_hdr, buf_state, 0, unset_bits, 0);
}
if (was_pinned) if (was_pinned)
elog(ERROR, "toy buffer %d was already pinned", elog(ERROR, "toy buffer %d was already pinned",