1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-17 17:02:08 +03:00

Invent ResourceOwner mechanism as per my recent proposal, and use it to

keep track of portal-related resources separately from transaction-related
resources.  This allows cursors to work in a somewhat sane fashion with
nested transactions.  For now, cursor behavior is non-subtransactional,
that is a cursor's state does not roll back if you abort a subtransaction
that fetched from the cursor.  We might want to change that later.
This commit is contained in:
Tom Lane
2004-07-17 03:32:14 +00:00
parent f4c069ca8f
commit fe548629c5
41 changed files with 2086 additions and 1192 deletions

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/buffer/bufmgr.c,v 1.172 2004/07/01 00:50:46 tgl Exp $
* $PostgreSQL: pgsql/src/backend/storage/buffer/bufmgr.c,v 1.173 2004/07/17 03:28:49 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -45,8 +45,8 @@
#include "storage/bufpage.h"
#include "storage/proc.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
#include "utils/relcache.h"
#include "utils/resowner.h"
#include "pgstat.h"
@ -65,13 +65,9 @@ long NDirectFileRead; /* some I/O's are direct file access.
* bypass bufmgr */
long NDirectFileWrite; /* e.g., I/O in psort and hashjoin. */
/* List of upper-level-transaction buffer refcount arrays */
static List *upperRefCounts = NIL;
static void PinBuffer(BufferDesc *buf);
static void UnpinBuffer(BufferDesc *buf);
static void BufferFixLeak(Buffer bufnum, int32 shouldBe, bool emitWarning);
static void PinBuffer(BufferDesc *buf, bool fixOwner);
static void UnpinBuffer(BufferDesc *buf, bool fixOwner);
static void WaitIO(BufferDesc *buf);
static void StartBufferIO(BufferDesc *buf, bool forInput);
static void TerminateBufferIO(BufferDesc *buf, int err_flag);
@ -103,6 +99,7 @@ static void write_buffer(Buffer buffer, bool unpin);
Buffer
ReadBuffer(Relation reln, BlockNumber blockNum)
{
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
return ReadBufferInternal(reln, blockNum, false);
}
@ -111,6 +108,8 @@ ReadBuffer(Relation reln, BlockNumber blockNum)
*
* bufferLockHeld: if true, caller already acquired the bufmgr lock.
* (This is assumed never to be true if dealing with a local buffer!)
*
* The caller must have done ResourceOwnerEnlargeBuffers(CurrentResourceOwner)
*/
static Buffer
ReadBufferInternal(Relation reln, BlockNumber blockNum,
@ -287,7 +286,7 @@ BufferAlloc(Relation reln,
*/
*foundPtr = TRUE;
PinBuffer(buf);
PinBuffer(buf, true);
if (!(buf->flags & BM_VALID))
{
@ -337,6 +336,9 @@ BufferAlloc(Relation reln,
buf->refcount = 1;
PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 1;
ResourceOwnerRememberBuffer(CurrentResourceOwner,
BufferDescriptorGetBuffer(buf));
if ((buf->flags & BM_VALID) &&
(buf->flags & BM_DIRTY || buf->cntxDirty))
{
@ -382,7 +384,7 @@ BufferAlloc(Relation reln,
* buffer we were planning to use.
*/
TerminateBufferIO(buf, 0);
UnpinBuffer(buf);
UnpinBuffer(buf, true);
buf = buf2;
@ -390,7 +392,7 @@ BufferAlloc(Relation reln,
*foundPtr = TRUE;
PinBuffer(buf);
PinBuffer(buf, true);
if (!(buf->flags & BM_VALID))
{
@ -425,7 +427,7 @@ BufferAlloc(Relation reln,
if (buf->refcount > 1 || buf->flags & BM_DIRTY || buf->cntxDirty)
{
TerminateBufferIO(buf, 0);
UnpinBuffer(buf);
UnpinBuffer(buf, true);
inProgress = FALSE;
buf = NULL;
}
@ -497,7 +499,7 @@ write_buffer(Buffer buffer, bool release)
bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
if (release)
UnpinBuffer(bufHdr);
UnpinBuffer(bufHdr, true);
LWLockRelease(BufMgrLock);
}
@ -561,6 +563,8 @@ ReleaseAndReadBuffer(Buffer buffer,
if (bufHdr->tag.blockNum == blockNum &&
RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node))
return buffer;
ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
/* owner now has a free slot, so no need for Enlarge() */
LocalRefCount[-buffer - 1]--;
}
else
@ -570,16 +574,20 @@ ReleaseAndReadBuffer(Buffer buffer,
if (bufHdr->tag.blockNum == blockNum &&
RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node))
return buffer;
ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
/* owner now has a free slot, so no need for Enlarge() */
if (PrivateRefCount[buffer - 1] > 1)
PrivateRefCount[buffer - 1]--;
else
{
LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
UnpinBuffer(bufHdr);
UnpinBuffer(bufHdr, false);
return ReadBufferInternal(relation, blockNum, true);
}
}
}
else
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
return ReadBufferInternal(relation, blockNum, false);
}
@ -589,9 +597,12 @@ ReleaseAndReadBuffer(Buffer buffer,
*
* This should be applied only to shared buffers, never local ones.
* Bufmgr lock must be held by caller.
*
* Most but not all callers want CurrentResourceOwner to be adjusted.
* Note that ResourceOwnerEnlargeBuffers must have been done already.
*/
static void
PinBuffer(BufferDesc *buf)
PinBuffer(BufferDesc *buf, bool fixOwner)
{
int b = BufferDescriptorGetBuffer(buf) - 1;
@ -599,6 +610,9 @@ PinBuffer(BufferDesc *buf)
buf->refcount++;
PrivateRefCount[b]++;
Assert(PrivateRefCount[b] > 0);
if (fixOwner)
ResourceOwnerRememberBuffer(CurrentResourceOwner,
BufferDescriptorGetBuffer(buf));
}
/*
@ -606,12 +620,18 @@ PinBuffer(BufferDesc *buf)
*
* This should be applied only to shared buffers, never local ones.
* Bufmgr lock must be held by caller.
*
* Most but not all callers want CurrentResourceOwner to be adjusted.
*/
static void
UnpinBuffer(BufferDesc *buf)
UnpinBuffer(BufferDesc *buf, bool fixOwner)
{
int b = BufferDescriptorGetBuffer(buf) - 1;
if (fixOwner)
ResourceOwnerForgetBuffer(CurrentResourceOwner,
BufferDescriptorGetBuffer(buf));
Assert(buf->refcount > 0);
Assert(PrivateRefCount[b] > 0);
PrivateRefCount[b]--;
@ -677,6 +697,9 @@ BufferSync(int percent, int maxpages)
if (maxpages > 0 && num_buffer_dirty > maxpages)
num_buffer_dirty = maxpages;
/* Make sure we can handle the pin inside the loop */
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
/*
* Loop over buffers to be written. Note the BufMgrLock is held at
* loop top, but is released and reacquired within FlushBuffer,
@ -724,13 +747,13 @@ BufferSync(int percent, int maxpages)
* buffer now and set IO state for it *before* acquiring shlock to
* avoid conflicts with FlushRelationBuffers.
*/
PinBuffer(bufHdr);
PinBuffer(bufHdr, true);
StartBufferIO(bufHdr, false);
FlushBuffer(bufHdr, NULL);
TerminateBufferIO(bufHdr, 0);
UnpinBuffer(bufHdr);
UnpinBuffer(bufHdr, true);
}
LWLockRelease(BufMgrLock);
@ -831,104 +854,34 @@ AtEOXact_Buffers(bool isCommit)
for (i = 0; i < NBuffers; i++)
{
if (PrivateRefCount[i] != 0)
BufferFixLeak(i, 0, isCommit);
{
BufferDesc *buf = &(BufferDescriptors[i]);
if (isCommit)
elog(WARNING,
"buffer refcount leak: [%03d] "
"(rel=%u/%u/%u, blockNum=%u, flags=0x%x, refcount=%u %d)",
i,
buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
buf->tag.rnode.relNode,
buf->tag.blockNum, buf->flags,
buf->refcount, PrivateRefCount[i]);
/*
* We don't worry about updating the ResourceOwner structures;
* resowner.c will clear them for itself.
*/
PrivateRefCount[i] = 1; /* make sure we release shared pin */
LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
UnpinBuffer(buf, false);
LWLockRelease(BufMgrLock);
Assert(PrivateRefCount[i] == 0);
}
}
AtEOXact_LocalBuffers(isCommit);
}
/*
* During subtransaction start, save buffer reference counts.
*/
void
AtSubStart_Buffers(void)
{
int32 *copyRefCounts;
Size rcSize;
MemoryContext old_cxt;
/* this is probably the active context already, but be safe */
old_cxt = MemoryContextSwitchTo(CurTransactionContext);
/*
* We need to copy the current state of PrivateRefCount[]. In the typical
* scenario, few if any of the entries will be nonzero, and we could save
* space by storing only the nonzero ones. However, copying the whole
* thing is lots simpler and faster both here and in AtEOSubXact_Buffers,
* so it seems best to waste the space.
*/
rcSize = NBuffers * sizeof(int32);
copyRefCounts = (int32 *) palloc(rcSize);
memcpy(copyRefCounts, PrivateRefCount, rcSize);
/* Attach to list */
upperRefCounts = lcons(copyRefCounts, upperRefCounts);
MemoryContextSwitchTo(old_cxt);
}
/*
* AtEOSubXact_Buffers
*
* At subtransaction end, we restore the saved counts. If committing, we
* complain if the refcounts don't match; if aborting, just restore silently.
*/
void
AtEOSubXact_Buffers(bool isCommit)
{
int32 *oldRefCounts;
int i;
oldRefCounts = (int32 *) linitial(upperRefCounts);
upperRefCounts = list_delete_first(upperRefCounts);
for (i = 0; i < NBuffers; i++)
{
if (PrivateRefCount[i] != oldRefCounts[i])
BufferFixLeak(i, oldRefCounts[i], isCommit);
}
pfree(oldRefCounts);
}
/*
* Fix a buffer refcount leak.
*
* The caller does not hold the BufMgrLock.
*/
static void
BufferFixLeak(Buffer bufnum, int32 shouldBe, bool emitWarning)
{
BufferDesc *buf = &(BufferDescriptors[bufnum]);
if (emitWarning)
elog(WARNING,
"buffer refcount leak: [%03d] (rel=%u/%u/%u, blockNum=%u, flags=0x%x, refcount=%u %d, should be=%d)",
bufnum,
buf->tag.rnode.spcNode, buf->tag.rnode.dbNode,
buf->tag.rnode.relNode,
buf->tag.blockNum, buf->flags,
buf->refcount, PrivateRefCount[bufnum], shouldBe);
/* If it's less, we're in a heap o' trouble */
if (PrivateRefCount[bufnum] <= shouldBe)
elog(FATAL, "buffer refcount was decreased by subtransaction");
if (shouldBe > 0)
{
/* We still keep the shared-memory pin */
PrivateRefCount[bufnum] = shouldBe;
}
else
{
PrivateRefCount[bufnum] = 1; /* make sure we release shared pin */
LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
UnpinBuffer(buf);
LWLockRelease(BufMgrLock);
Assert(PrivateRefCount[bufnum] == 0);
}
}
/*
* FlushBufferPool
*
@ -1172,9 +1125,15 @@ DropRelFileNodeBuffers(RelFileNode rnode, bool istemp,
if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
bufHdr->tag.blockNum >= firstDelBlock)
{
if (LocalRefCount[i] != 0)
elog(FATAL, "block %u of %u/%u/%u is still referenced (local %u)",
bufHdr->tag.blockNum,
bufHdr->tag.rnode.spcNode,
bufHdr->tag.rnode.dbNode,
bufHdr->tag.rnode.relNode,
LocalRefCount[i]);
bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
bufHdr->cntxDirty = false;
LocalRefCount[i] = 0;
bufHdr->tag.rnode.relNode = InvalidOid;
}
}
@ -1205,29 +1164,22 @@ recheck:
*/
goto recheck;
}
/*
* There should be no pin on the buffer.
*/
if (bufHdr->refcount != 0)
elog(FATAL, "block %u of %u/%u/%u is still referenced (private %d, global %u)",
bufHdr->tag.blockNum,
bufHdr->tag.rnode.spcNode,
bufHdr->tag.rnode.dbNode,
bufHdr->tag.rnode.relNode,
PrivateRefCount[i - 1], bufHdr->refcount);
/* Now we can do what we came for */
bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
bufHdr->cntxDirty = false;
/*
* Release any refcount we may have. If someone else has a
* pin on the buffer, we got trouble.
*/
if (bufHdr->refcount != 0)
{
/* the sole pin should be ours */
if (bufHdr->refcount != 1 || PrivateRefCount[i - 1] == 0)
elog(FATAL, "block %u of %u/%u/%u is still referenced (private %d, global %u)",
bufHdr->tag.blockNum,
bufHdr->tag.rnode.spcNode,
bufHdr->tag.rnode.dbNode,
bufHdr->tag.rnode.relNode,
PrivateRefCount[i - 1], bufHdr->refcount);
/* Make sure it will be released */
PrivateRefCount[i - 1] = 1;
UnpinBuffer(bufHdr);
}
/*
* And mark the buffer as no longer occupied by this rel.
*/
@ -1353,7 +1305,7 @@ PrintPinnedBufs(void)
for (i = 0; i < NBuffers; ++i, ++buf)
{
if (PrivateRefCount[i] > 0)
elog(WARNING,
elog(NOTICE,
"[%02d] (freeNext=%d, freePrev=%d, rel=%u/%u/%u, "
"blockNum=%u, flags=0x%x, refcount=%u %d)",
i, buf->freeNext, buf->freePrev,
@ -1456,6 +1408,9 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
return;
}
/* Make sure we can handle the pin inside the loop */
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
for (i = 0; i < NBuffers; i++)
@ -1466,7 +1421,7 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
if ((bufHdr->flags & BM_VALID) &&
(bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty))
{
PinBuffer(bufHdr);
PinBuffer(bufHdr, true);
/* Someone else might be flushing buffer */
if (bufHdr->flags & BM_IO_IN_PROGRESS)
WaitIO(bufHdr);
@ -1479,7 +1434,7 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
TerminateBufferIO(bufHdr, 0);
}
UnpinBuffer(bufHdr);
UnpinBuffer(bufHdr, true);
if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
elog(ERROR, "FlushRelationBuffers(\"%s\", %u): block %u was re-dirtied",
RelationGetRelationName(rel), firstDelBlock,
@ -1507,6 +1462,8 @@ ReleaseBuffer(Buffer buffer)
{
BufferDesc *bufHdr;
ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
if (BufferIsLocal(buffer))
{
Assert(LocalRefCount[-buffer - 1] > 0);
@ -1526,11 +1483,39 @@ ReleaseBuffer(Buffer buffer)
else
{
LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
UnpinBuffer(bufHdr);
UnpinBuffer(bufHdr, false);
LWLockRelease(BufMgrLock);
}
}
/*
* IncrBufferRefCount
* Increment the pin count on a buffer that we have *already* pinned
* at least once.
*
* This function cannot be used on a buffer we do not have pinned,
* because it doesn't change the shared buffer state. Therefore the
* Assert checks are for refcount > 0. Someone got this wrong once...
*/
void
IncrBufferRefCount(Buffer buffer)
{
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
ResourceOwnerRememberBuffer(CurrentResourceOwner, buffer);
if (BufferIsLocal(buffer))
{
Assert(buffer >= -NLocBuffer);
Assert(LocalRefCount[-buffer - 1] > 0);
LocalRefCount[-buffer - 1]++;
}
else
{
Assert(!BAD_BUFFER_ID(buffer));
Assert(PrivateRefCount[buffer - 1] > 0);
PrivateRefCount[buffer - 1]++;
}
}
#ifdef NOT_USED
void
IncrBufferRefCount_Debug(char *file, int line, Buffer buffer)