Mirror of https://github.com/postgres/postgres.git
bufmgr: Acquire and clean victim buffer separately
Previously we held buffer locks for two buffer mapping partitions at the same time to change the identity of buffers. Particularly for extending relations, needing to hold the extension lock while acquiring a victim buffer is painful. But it also creates a bottleneck for workloads that just involve reads.

Now we instead first acquire a victim buffer and write it out, if necessary. Then we remove that buffer from the old partition with just the old partition's lock held and insert it into the new partition with just that partition's lock held.

By separating out the victim buffer acquisition, future commits will be able to change relation extensions to scale better.

On my workstation, a micro-benchmark exercising buffered reads strenuously and under a lot of concurrency sees a >2x improvement.

Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Discussion: https://postgr.es/m/20221029025420.eplyow6k7tgu6he3@awork3.anarazel.de
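To make the locking change concrete, here is a minimal sketch in plain C with pthreads. The partition count matches PostgreSQL's NUM_BUFFER_PARTITIONS, but the types, hash function, and function names are invented for illustration; this is not the bufmgr.c code.

/*
 * Sketch of the two locking protocols described above. All names here
 * are hypothetical; the real logic lives in bufmgr.c.
 */
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

#define NUM_PARTITIONS 128      /* matches NUM_BUFFER_PARTITIONS */

typedef struct { uint32_t rel; uint32_t block; } BufTag;

static pthread_mutex_t part_lock[NUM_PARTITIONS];

static int
partition_of(const BufTag *tag)
{
    /* stand-in for the real buffer-tag hash */
    return (tag->rel * 2654435761u ^ tag->block) % NUM_PARTITIONS;
}

/* stand-ins for the buffer mapping hash table operations */
static void hash_delete(const BufTag *tag) { (void) tag; }
static void hash_insert(const BufTag *tag, int buf_id) { (void) tag; (void) buf_id; }

/*
 * Old scheme: hold both partition locks at once while the buffer changes
 * identity; unrelated lookups in either partition stall meanwhile.
 */
static void
rename_buffer_old(int buf_id, const BufTag *old_tag, const BufTag *new_tag)
{
    int     lo = partition_of(old_tag);
    int     hi = partition_of(new_tag);

    if (lo > hi) { int t = lo; lo = hi; hi = t; }   /* fixed lock order avoids deadlock */
    pthread_mutex_lock(&part_lock[lo]);
    if (hi != lo)
        pthread_mutex_lock(&part_lock[hi]);

    hash_delete(old_tag);
    hash_insert(new_tag, buf_id);

    if (hi != lo)
        pthread_mutex_unlock(&part_lock[hi]);
    pthread_mutex_unlock(&part_lock[lo]);
}

/*
 * New scheme: the victim buffer was already acquired and cleaned, so
 * each partition is locked on its own, briefly, and never both at once.
 */
static void
rename_buffer_new(int buf_id, const BufTag *old_tag, const BufTag *new_tag)
{
    pthread_mutex_lock(&part_lock[partition_of(old_tag)]);
    hash_delete(old_tag);
    pthread_mutex_unlock(&part_lock[partition_of(old_tag)]);

    pthread_mutex_lock(&part_lock[partition_of(new_tag)]);
    hash_insert(new_tag, buf_id);
    pthread_mutex_unlock(&part_lock[partition_of(new_tag)]);
}

int
main(void)
{
    BufTag  old_tag = {1, 10};
    BufTag  new_tag = {2, 20};

    for (int i = 0; i < NUM_PARTITIONS; i++)
        pthread_mutex_init(&part_lock[i], NULL);

    rename_buffer_old(0, &old_tag, &new_tag);
    rename_buffer_new(0, &old_tag, &new_tag);
    printf("moved buffer between partitions %d and %d\n",
           partition_of(&old_tag), partition_of(&new_tag));
    return 0;
}

The hunks below show the local-buffer half of the change, where the same split (acquire and clean a victim first, then install the new identity) is applied to temp-table buffers.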
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -45,13 +45,14 @@ BufferDesc *LocalBufferDescriptors = NULL;
 Block      *LocalBufferBlockPointers = NULL;
 int32      *LocalRefCount = NULL;
 
-static int  nextFreeLocalBuf = 0;
+static int  nextFreeLocalBufId = 0;
 
 static HTAB *LocalBufHash = NULL;
 
 
 static void InitLocalBuffers(void);
 static Block GetLocalBufferStorage(void);
+static Buffer GetLocalVictimBuffer(void);
 
 
 /*
@@ -113,10 +114,9 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
     BufferTag   newTag;         /* identity of requested block */
     LocalBufferLookupEnt *hresult;
     BufferDesc *bufHdr;
-    int         b;
-    int         trycounter;
+    Buffer      victim_buffer;
+    int         bufid;
     bool        found;
-    uint32      buf_state;
 
     InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
 
@@ -130,23 +130,51 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 
     if (hresult)
     {
-        b = hresult->id;
-        bufHdr = GetLocalBufferDescriptor(b);
+        bufid = hresult->id;
+        bufHdr = GetLocalBufferDescriptor(bufid);
         Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
-#ifdef LBDEBUG
-        fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n",
-                smgr->smgr_rlocator.locator.relNumber, forkNum, blockNum, -b - 1);
-#endif
 
         *foundPtr = PinLocalBuffer(bufHdr, true);
-        return bufHdr;
     }
+    else
+    {
+        uint32      buf_state;
+
+        victim_buffer = GetLocalVictimBuffer();
+        bufid = -victim_buffer - 1;
+        bufHdr = GetLocalBufferDescriptor(bufid);
+
+        hresult = (LocalBufferLookupEnt *)
+            hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
+        if (found)              /* shouldn't happen */
+            elog(ERROR, "local buffer hash table corrupted");
+        hresult->id = bufid;
+
+        /*
+         * it's all ours now.
+         */
+        bufHdr->tag = newTag;
+
+        buf_state = pg_atomic_read_u32(&bufHdr->state);
+        buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
+        buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
+        pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+
+        *foundPtr = false;
+    }
 
-#ifdef LBDEBUG
-    fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n",
-            smgr->smgr_rlocator.locator.relNumber, forkNum, blockNum,
-            -nextFreeLocalBuf - 1);
-#endif
+    return bufHdr;
+}
+
+static Buffer
+GetLocalVictimBuffer(void)
+{
+    int         victim_bufid;
+    int         trycounter;
+    uint32      buf_state;
+    BufferDesc *bufHdr;
+
+    ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
 
     /*
      * Need to get a new buffer. We use a clock sweep algorithm (essentially
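The `bufid = -victim_buffer - 1` above relies on PostgreSQL's convention that local (temp-table) buffers are referred to by negative Buffer numbers: local descriptor index i maps to Buffer -(i + 1), while shared buffers are positive. A self-contained round-trip check of that encoding (plain C, not the PostgreSQL headers):

#include <assert.h>
#include <stdio.h>

typedef int Buffer;

/* local buffer index -> negative Buffer number */
static Buffer local_buffer(int bufid) { return -bufid - 1; }
/* negative Buffer number -> local buffer index (inverse) */
static int    local_bufid(Buffer b)   { return -b - 1; }

int
main(void)
{
    for (int bufid = 0; bufid < 4; bufid++)
    {
        Buffer  b = local_buffer(bufid);

        assert(b < 0);                      /* local buffers are negative */
        assert(local_bufid(b) == bufid);    /* round-trips exactly */
        printf("bufid %d <-> Buffer %d\n", bufid, b);
    }
    return 0;
}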
@@ -155,14 +183,14 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
     trycounter = NLocBuffer;
     for (;;)
     {
-        b = nextFreeLocalBuf;
+        victim_bufid = nextFreeLocalBufId;
 
-        if (++nextFreeLocalBuf >= NLocBuffer)
-            nextFreeLocalBuf = 0;
+        if (++nextFreeLocalBufId >= NLocBuffer)
+            nextFreeLocalBufId = 0;
 
-        bufHdr = GetLocalBufferDescriptor(b);
+        bufHdr = GetLocalBufferDescriptor(victim_bufid);
 
-        if (LocalRefCount[b] == 0)
+        if (LocalRefCount[victim_bufid] == 0)
         {
             buf_state = pg_atomic_read_u32(&bufHdr->state);
 
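The loop being renamed here is a clock sweep; the usage-count handling sits in unchanged context lines and so is not visible in the hunk. Roughly, with invented names and plain ints standing in for the usage count packed into buf_state:

#include <stdio.h>

#define N_BUFFERS 8

/* toy per-buffer state (hypothetical; real counts live in buf_state) */
static int ref_count[N_BUFFERS];
static int usage_count[N_BUFFERS];
static int next_victim;                 /* the clock hand */

/*
 * Minimal clock-sweep victim search, shaped like GetLocalVictimBuffer():
 * recently used buffers get spared once per pass, and the trycounter
 * fuse detects the everything-is-pinned case.
 */
static int
clock_sweep_victim(void)
{
    int     trycounter = N_BUFFERS;

    for (;;)
    {
        int     b = next_victim;

        if (++next_victim >= N_BUFFERS)
            next_victim = 0;            /* wrap the clock hand around */

        if (ref_count[b] == 0)
        {
            if (usage_count[b] > 0)
            {
                usage_count[b]--;       /* recently used: spare it this pass */
                trycounter = N_BUFFERS; /* progress made, reset the fuse */
            }
            else
                return b;               /* unpinned and cold: our victim */
        }
        else if (--trycounter == 0)
            return -1;                  /* everything is pinned: give up */
    }
}

int
main(void)
{
    usage_count[0] = 2;                 /* buffer 0 looks recently used */
    ref_count[1] = 1;                   /* buffer 1 is pinned */
    printf("victim: %d\n", clock_sweep_victim());   /* picks buffer 2 */
    return 0;
}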
@@ -185,6 +213,15 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
                      errmsg("no empty local buffer available")));
     }
 
+    /*
+     * lazy memory allocation: allocate space on first use of a buffer.
+     */
+    if (LocalBufHdrGetBlock(bufHdr) == NULL)
+    {
+        /* Set pointer for use by BufferGetBlock() macro */
+        LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
+    }
+
     /*
      * this buffer is not referenced but it might still be dirty. if that's
      * the case, write it out before reusing it!
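The block moved here implements allocate-on-first-use: a backend that never touches temp tables never pays for local buffer memory. Schematically, with invented names and malloc standing in for the real allocator:

#include <stdio.h>
#include <stdlib.h>

#define BLCKSZ 8192             /* PostgreSQL's default block size */
#define N_LOCAL_BUFFERS 16

static char *local_block[N_LOCAL_BUFFERS];  /* all NULL until first use */

/*
 * Return buffer b's block, allocating it on first use. Sketch only: the
 * real GetLocalBufferStorage() carves blocks out of a dedicated memory
 * context in progressively larger chunks.
 */
static char *
get_block(int b)
{
    if (local_block[b] == NULL)
        local_block[b] = malloc(BLCKSZ);
    return local_block[b];
}

int
main(void)
{
    printf("block 3 at %p\n", (void *) get_block(3));
    return 0;
}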
@@ -216,48 +253,24 @@
     }
 
-    /*
-     * lazy memory allocation: allocate space on first use of a buffer.
-     */
-    if (LocalBufHdrGetBlock(bufHdr) == NULL)
-    {
-        /* Set pointer for use by BufferGetBlock() macro */
-        LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
-    }
-
     /*
-     * Update the hash table: remove old entry, if any, and make new one.
+     * Remove the victim buffer from the hashtable and mark as invalid.
      */
     if (buf_state & BM_TAG_VALID)
     {
+        LocalBufferLookupEnt *hresult;
+
         hresult = (LocalBufferLookupEnt *)
             hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
         if (!hresult)           /* shouldn't happen */
             elog(ERROR, "local buffer hash table corrupted");
         /* mark buffer invalid just in case hash insert fails */
         ClearBufferTag(&bufHdr->tag);
-        buf_state &= ~(BM_VALID | BM_TAG_VALID);
+        buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
         pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+        pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT);
     }
 
-    hresult = (LocalBufferLookupEnt *)
-        hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
-    if (found)                  /* shouldn't happen */
-        elog(ERROR, "local buffer hash table corrupted");
-    hresult->id = b;
-
-    /*
-     * it's all ours now.
-     */
-    bufHdr->tag = newTag;
-    buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR);
-    buf_state |= BM_TAG_VALID;
-    buf_state &= ~BUF_USAGECOUNT_MASK;
-    buf_state += BUF_USAGECOUNT_ONE;
-    pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
-
-    *foundPtr = false;
-    return bufHdr;
+    return BufferDescriptorGetBuffer(bufHdr);
 }
 
 /*
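The two state updates above (the full reset in GetLocalVictimBuffer(), then the re-initialization in LocalBufferAlloc()) operate on a single uint32 that packs the refcount, usage count, and flag bits together. Using the mask layout from src/include/storage/buf_internals.h as of this commit (refcount in the low 18 bits, a 4-bit usage count above it, flags in the top 10 bits), the bit arithmetic in isolation looks like this:

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* mask layout mirroring buf_internals.h: 18 refcount bits,
 * 4 usage-count bits, 10 flag bits */
#define BUF_REFCOUNT_MASK    ((1U << 18) - 1)
#define BUF_USAGECOUNT_MASK  0x003C0000U
#define BUF_USAGECOUNT_ONE   (1U << 18)
#define BUF_FLAG_MASK        0xFFC00000U

#define BM_DIRTY             (1U << 23)   /* needs writeback */
#define BM_VALID             (1U << 24)   /* data is valid */
#define BM_TAG_VALID         (1U << 25)   /* tag is assigned */

int
main(void)
{
    /* a victim buffer: tagged, valid, dirty, usage count 3, unpinned */
    uint32_t buf_state = BM_TAG_VALID | BM_VALID | BM_DIRTY |
        3 * BUF_USAGECOUNT_ONE;

    /* invalidation step from GetLocalVictimBuffer(): drop all flags and
     * the usage count, keeping only the (zero) refcount */
    buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
    assert(buf_state == 0);

    /* re-initialization step from LocalBufferAlloc(): new identity,
     * usage count 1 */
    buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
    printf("reinitialized state: 0x%08x\n", buf_state);
    return 0;
}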
@@ -424,7 +437,7 @@ InitLocalBuffers(void)
                 (errcode(ERRCODE_OUT_OF_MEMORY),
                  errmsg("out of memory")));
 
-    nextFreeLocalBuf = 0;
+    nextFreeLocalBufId = 0;
 
     /* initialize fields that need to start off nonzero */
     for (i = 0; i < nbufs; i++)