1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-10 17:42:29 +03:00

Implement LockBufferForCleanup(), which will allow concurrent VACUUM

to wait until it's safe to remove tuples and compact free space in a
shared buffer page.  Miscellaneous small code cleanups in bufmgr, too.
This commit is contained in:
Tom Lane
2001-07-06 21:04:26 +00:00
parent 1e9e5defc2
commit 55432fedd2
11 changed files with 418 additions and 254 deletions

View File

@@ -0,0 +1,100 @@
$Header: /cvsroot/pgsql/src/backend/storage/buffer/README,v 1.1 2001/07/06 21:04:25 tgl Exp $
Notes about shared buffer access rules
--------------------------------------
There are two separate access control mechanisms for shared disk buffers:
reference counts (a/k/a pin counts) and buffer locks. (Actually, there's
a third level of access control: one must hold the appropriate kind of
lock on a relation before one can legally access any page belonging to
the relation. Relation-level locks are not discussed here.)
Pins: one must "hold a pin on" a buffer (increment its reference count)
before being allowed to do anything at all with it. An unpinned buffer is
subject to being reclaimed and reused for a different page at any instant,
so touching it is unsafe. Typically a pin is acquired via ReadBuffer and
released via WriteBuffer (if one modified the page) or ReleaseBuffer (if not).
It is OK and indeed common for a single backend to pin a page more than
once concurrently; the buffer manager handles this efficiently. It is
considered OK to hold a pin for long intervals --- for example, sequential
scans hold a pin on the current page until done processing all the tuples
on the page, which could be quite a while if the scan is the outer scan of
a join. Similarly, btree index scans hold a pin on the current index page.
This is OK because normal operations never wait for a page's pin count to
drop to zero. (Anything that might need to do such a wait is instead
handled by waiting to obtain the relation-level lock, which is why you'd
better hold one first.) Pins may not be held across transaction
boundaries, however.
Buffer locks: there are two kinds of buffer locks, shared and exclusive,
which act just as you'd expect: multiple backends can hold shared locks on
the same buffer, but an exclusive lock prevents anyone else from holding
either shared or exclusive lock. (These can alternatively be called READ
and WRITE locks.) These locks are short-term: they should not be held for
long. They are implemented as per-buffer spinlocks, so another backend
trying to acquire a competing lock will spin as long as you hold yours!
Buffer locks are acquired and released by LockBuffer(). It will *not* work
for a single backend to try to acquire multiple locks on the same buffer.
One must pin a buffer before trying to lock it.
Buffer access rules:
1. To scan a page for tuples, one must hold a pin and either shared or
exclusive lock. To examine the commit status (XIDs and status bits) of
a tuple in a shared buffer, one must likewise hold a pin and either shared
or exclusive lock.
2. Once one has determined that a tuple is interesting (visible to the
current transaction) one may drop the buffer lock, yet continue to access
the tuple's data for as long as one holds the buffer pin. This is what is
typically done by heap scans, since the tuple returned by heap_fetch
contains a pointer to tuple data in the shared buffer. Therefore the
tuple cannot go away while the pin is held (see rule #5). Its state could
change, but that is assumed not to matter after the initial determination
of visibility is made.
3. To add a tuple or change the xmin/xmax fields of an existing tuple,
one must hold a pin and an exclusive lock on the containing buffer.
This ensures that no one else might see a partially-updated state of the
tuple.
4. It is considered OK to update tuple commit status bits (ie, OR the
values HEAP_XMIN_COMMITTED, HEAP_XMIN_INVALID, HEAP_XMAX_COMMITTED, or
HEAP_XMAX_INVALID into t_infomask) while holding only a shared lock and
pin on a buffer. This is OK because another backend looking at the tuple
at about the same time would OR the same bits into the field, so there
is little or no risk of conflicting update; what's more, if there did
manage to be a conflict it would merely mean that one bit-update would
be lost and need to be done again later. These four bits are only hints
(they cache the results of transaction status lookups in pg_log), so no
great harm is done if they get reset to zero by conflicting updates.
5. To physically remove a tuple or compact free space on a page, one
must hold a pin and an exclusive lock, *and* observe while holding the
exclusive lock that the buffer's shared reference count is one (ie,
no other backend holds a pin). If these conditions are met then no other
backend can perform a page scan until the exclusive lock is dropped, and
no other backend can be holding a reference to an existing tuple that it
might expect to examine again. Note that another backend might pin the
buffer (increment the refcount) while one is performing the cleanup, but
it won't be able to actually examine the page until it acquires shared
or exclusive lock.
As of 7.1, the only operation that removes tuples or compacts free space is
(oldstyle) VACUUM. It does not have to implement rule #5 directly, because
it instead acquires exclusive lock at the relation level, which ensures
indirectly that no one else is accessing pages of the relation at all.
To implement concurrent VACUUM we will need to make it obey rule #5 fully.
To do this, we'll create a new buffer manager operation
LockBufferForCleanup() that gets an exclusive lock and then checks to see
if the shared pin count is currently 1. If not, it releases the exclusive
lock (but not the caller's pin) and waits until signaled by another backend,
whereupon it tries again. The signal will occur when UnpinBuffer
decrements the shared pin count to 1. As indicated above, this operation
might have to wait a good while before it acquires lock, but that shouldn't
matter much for concurrent VACUUM. The current implementation only
supports a single waiter for pin-count-1 on any particular shared buffer.
This is enough for VACUUM's use, since we don't allow multiple VACUUMs
concurrently on a single relation anyway.

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/buf_init.c,v 1.42 2001/03/22 03:59:44 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/buf_init.c,v 1.43 2001/07/06 21:04:25 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -63,7 +63,6 @@ long *PrivateRefCount; /* also used in freelist.c */
bits8 *BufferLocks; /* flag bits showing locks I have set */
BufferTag *BufferTagLastDirtied; /* tag buffer had when last
* dirtied by me */
BufferBlindId *BufferBlindLastDirtied;
bool *BufferDirtiedByMe; /* T if buf has been dirtied in cur xact */
@@ -237,7 +236,6 @@ InitBufferPoolAccess(void)
PrivateRefCount = (long *) calloc(NBuffers, sizeof(long));
BufferLocks = (bits8 *) calloc(NBuffers, sizeof(bits8));
BufferTagLastDirtied = (BufferTag *) calloc(NBuffers, sizeof(BufferTag));
BufferBlindLastDirtied = (BufferBlindId *) calloc(NBuffers, sizeof(BufferBlindId));
BufferDirtiedByMe = (bool *) calloc(NBuffers, sizeof(bool));
/*

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.115 2001/07/02 18:47:18 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.116 2001/07/06 21:04:25 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -46,14 +46,12 @@
#include <math.h>
#include <signal.h>
#include "executor/execdebug.h"
#include "miscadmin.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/s_lock.h"
#include "storage/proc.h"
#include "storage/smgr.h"
#include "utils/relcache.h"
#include "catalog/pg_database.h"
#include "pgstat.h"
@@ -254,7 +252,7 @@ ReadBufferInternal(Relation reln, BlockNumber blockNum,
if (!BufTableDelete(bufHdr))
{
SpinRelease(BufMgrLock);
elog(FATAL, "BufRead: buffer table broken after IO error\n");
elog(FATAL, "BufRead: buffer table broken after IO error");
}
/* remember that BufferAlloc() pinned the buffer */
UnpinBuffer(bufHdr);
@@ -426,33 +424,27 @@ BufferAlloc(Relation reln,
if (smok == FALSE)
{
elog(NOTICE, "BufferAlloc: cannot write block %u for %s/%s",
buf->tag.blockNum, buf->blind.dbname, buf->blind.relname);
elog(NOTICE, "BufferAlloc: cannot write block %u for %u/%u",
buf->tag.blockNum,
buf->tag.rnode.tblNode, buf->tag.rnode.relNode);
inProgress = FALSE;
buf->flags |= BM_IO_ERROR;
buf->flags &= ~BM_IO_IN_PROGRESS;
TerminateBufferIO(buf);
PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
Assert(buf->refcount > 0);
buf->refcount--;
if (buf->refcount == 0)
{
AddBufferToFreelist(buf);
buf->flags |= BM_FREE;
}
UnpinBuffer(buf);
buf = (BufferDesc *) NULL;
}
else
{
/*
* BM_JUST_DIRTIED cleared by BufferReplace and shouldn't
* be setted by anyone. - vadim 01/17/97
*/
if (buf->flags & BM_JUST_DIRTIED)
{
elog(STOP, "BufferAlloc: content of block %u (%s) changed while flushing",
buf->tag.blockNum, buf->blind.relname);
elog(STOP, "BufferAlloc: content of block %u (%u/%u) changed while flushing",
buf->tag.blockNum,
buf->tag.rnode.tblNode, buf->tag.rnode.relNode);
}
else
buf->flags &= ~BM_DIRTY;
@@ -475,8 +467,7 @@ BufferAlloc(Relation reln,
inProgress = FALSE;
buf->flags &= ~BM_IO_IN_PROGRESS;
TerminateBufferIO(buf);
PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
buf->refcount--;
UnpinBuffer(buf);
buf = (BufferDesc *) NULL;
}
@@ -501,15 +492,8 @@ BufferAlloc(Relation reln,
{
buf->flags &= ~BM_IO_IN_PROGRESS;
TerminateBufferIO(buf);
/* give up the buffer since we don't need it any more */
PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
Assert(buf->refcount > 0);
buf->refcount--;
if (buf->refcount == 0)
{
AddBufferToFreelist(buf);
buf->flags |= BM_FREE;
}
/* give up old buffer since we don't need it any more */
UnpinBuffer(buf);
}
PinBuffer(buf2);
@@ -551,18 +535,15 @@ BufferAlloc(Relation reln,
if (!BufTableDelete(buf))
{
SpinRelease(BufMgrLock);
elog(FATAL, "buffer wasn't in the buffer table\n");
elog(FATAL, "buffer wasn't in the buffer table");
}
/* record the database name and relation name for this buffer */
strcpy(buf->blind.dbname, (DatabaseName) ? DatabaseName : "Recovery");
strcpy(buf->blind.relname, RelationGetPhysicalRelationName(reln));
INIT_BUFFERTAG(&(buf->tag), reln, blockNum);
if (!BufTableInsert(buf))
{
SpinRelease(BufMgrLock);
elog(FATAL, "Buffer in lookup table twice \n");
elog(FATAL, "Buffer in lookup table twice");
}
/*
@@ -704,14 +685,7 @@ ReleaseAndReadBuffer(Buffer buffer,
else
{
SpinAcquire(BufMgrLock);
PrivateRefCount[buffer - 1] = 0;
Assert(bufHdr->refcount > 0);
bufHdr->refcount--;
if (bufHdr->refcount == 0)
{
AddBufferToFreelist(bufHdr);
bufHdr->flags |= BM_FREE;
}
UnpinBuffer(bufHdr);
return ReadBufferInternal(relation, blockNum, true);
}
}
@@ -831,8 +805,9 @@ BufferSync()
}
if (status == SM_FAIL) /* disk failure ?! */
elog(STOP, "BufferSync: cannot write %u for %s",
bufHdr->tag.blockNum, bufHdr->blind.relname);
elog(STOP, "BufferSync: cannot write %u for %u/%u",
bufHdr->tag.blockNum,
bufHdr->tag.rnode.tblNode, bufHdr->tag.rnode.relNode);
/*
* Note that it's safe to change cntxDirty here because of we
@@ -956,16 +931,11 @@ ResetBufferPool(bool isCommit)
{
BufferDesc *buf = &BufferDescriptors[i];
PrivateRefCount[i] = 1; /* make sure we release shared pin */
SpinAcquire(BufMgrLock);
PrivateRefCount[i] = 0;
Assert(buf->refcount > 0);
buf->refcount--;
if (buf->refcount == 0)
{
AddBufferToFreelist(buf);
buf->flags |= BM_FREE;
}
UnpinBuffer(buf);
SpinRelease(BufMgrLock);
Assert(PrivateRefCount[i] == 0);
}
}
@@ -975,32 +945,31 @@ ResetBufferPool(bool isCommit)
smgrabort();
}
/* -----------------------------------------------
* BufferPoolCheckLeak
/*
* BufferPoolCheckLeak
*
* check if there is buffer leak
*
* -----------------------------------------------
*/
int
BufferPoolCheckLeak()
bool
BufferPoolCheckLeak(void)
{
int i;
int result = 0;
bool result = false;
for (i = 1; i <= NBuffers; i++)
for (i = 0; i < NBuffers; i++)
{
if (PrivateRefCount[i - 1] != 0)
if (PrivateRefCount[i] != 0)
{
BufferDesc *buf = &(BufferDescriptors[i - 1]);
BufferDesc *buf = &(BufferDescriptors[i]);
elog(NOTICE,
"Buffer Leak: [%03d] (freeNext=%d, freePrev=%d, \
relname=%s, blockNum=%d, flags=0x%x, refcount=%d %ld)",
i - 1, buf->freeNext, buf->freePrev,
buf->blind.relname, buf->tag.blockNum, buf->flags,
buf->refcount, PrivateRefCount[i - 1]);
result = 1;
rel=%u/%u, blockNum=%u, flags=0x%x, refcount=%d %ld)",
i, buf->freeNext, buf->freePrev,
buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
buf->tag.blockNum, buf->flags,
buf->refcount, PrivateRefCount[i]);
result = true;
}
}
return result;
@@ -1389,10 +1358,11 @@ PrintBufferDescs()
SpinAcquire(BufMgrLock);
for (i = 0; i < NBuffers; ++i, ++buf)
{
elog(DEBUG, "[%02d] (freeNext=%d, freePrev=%d, relname=%s, \
blockNum=%d, flags=0x%x, refcount=%d %ld)",
elog(DEBUG, "[%02d] (freeNext=%d, freePrev=%d, rel=%u/%u, \
blockNum=%u, flags=0x%x, refcount=%d %ld)",
i, buf->freeNext, buf->freePrev,
buf->blind.relname, buf->tag.blockNum, buf->flags,
buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
buf->tag.blockNum, buf->flags,
buf->refcount, PrivateRefCount[i]);
}
SpinRelease(BufMgrLock);
@@ -1402,8 +1372,9 @@ blockNum=%d, flags=0x%x, refcount=%d %ld)",
/* interactive backend */
for (i = 0; i < NBuffers; ++i, ++buf)
{
printf("[%-2d] (%s, %d) flags=0x%x, refcnt=%d %ld)\n",
i, buf->blind.relname, buf->tag.blockNum,
printf("[%-2d] (%u/%u, %u) flags=0x%x, refcnt=%d %ld)\n",
i, buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
buf->tag.blockNum,
buf->flags, buf->refcount, PrivateRefCount[i]);
}
}
@@ -1419,9 +1390,10 @@ PrintPinnedBufs()
for (i = 0; i < NBuffers; ++i, ++buf)
{
if (PrivateRefCount[i] > 0)
elog(NOTICE, "[%02d] (freeNext=%d, freePrev=%d, relname=%s, \
blockNum=%d, flags=0x%x, refcount=%d %ld)\n",
i, buf->freeNext, buf->freePrev, buf->blind.relname,
elog(NOTICE, "[%02d] (freeNext=%d, freePrev=%d, rel=%u/%u, \
blockNum=%u, flags=0x%x, refcount=%d %ld)",
i, buf->freeNext, buf->freePrev,
buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
buf->tag.blockNum, buf->flags,
buf->refcount, PrivateRefCount[i]);
}
@@ -1581,8 +1553,10 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
(char *) MAKE_PTR(bufHdr->data));
if (status == SM_FAIL) /* disk failure ?! */
elog(STOP, "FlushRelationBuffers: cannot write %u for %s",
bufHdr->tag.blockNum, bufHdr->blind.relname);
elog(STOP, "FlushRelationBuffers: cannot write %u for %u/%u",
bufHdr->tag.blockNum,
bufHdr->tag.rnode.tblNode,
bufHdr->tag.rnode.relNode);
BufferFlushCount++;
@@ -1624,7 +1598,6 @@ FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
/*
* ReleaseBuffer -- remove the pin on a buffer without
* marking it dirty.
*
*/
int
ReleaseBuffer(Buffer buffer)
@@ -1649,14 +1622,7 @@ ReleaseBuffer(Buffer buffer)
else
{
SpinAcquire(BufMgrLock);
PrivateRefCount[buffer - 1] = 0;
Assert(bufHdr->refcount > 0);
bufHdr->refcount--;
if (bufHdr->refcount == 0)
{
AddBufferToFreelist(bufHdr);
bufHdr->flags |= BM_FREE;
}
UnpinBuffer(bufHdr);
SpinRelease(BufMgrLock);
}
@@ -1665,7 +1631,7 @@ ReleaseBuffer(Buffer buffer)
/*
* ReleaseBufferWithBufferLock
* Same as ReleaseBuffer except we hold the lock
* Same as ReleaseBuffer except we hold the bufmgr lock
*/
static int
ReleaseBufferWithBufferLock(Buffer buffer)
@@ -1688,16 +1654,7 @@ ReleaseBufferWithBufferLock(Buffer buffer)
if (PrivateRefCount[buffer - 1] > 1)
PrivateRefCount[buffer - 1]--;
else
{
PrivateRefCount[buffer - 1] = 0;
Assert(bufHdr->refcount > 0);
bufHdr->refcount--;
if (bufHdr->refcount == 0)
{
AddBufferToFreelist(bufHdr);
bufHdr->flags |= BM_FREE;
}
}
UnpinBuffer(bufHdr);
return STATUS_OK;
}
@@ -1712,9 +1669,11 @@ IncrBufferRefCount_Debug(char *file, int line, Buffer buffer)
{
BufferDesc *buf = &BufferDescriptors[buffer - 1];
fprintf(stderr, "PIN(Incr) %d relname = %s, blockNum = %d, \
fprintf(stderr, "PIN(Incr) %d rel = %u/%u, blockNum = %u, \
refcount = %ld, file: %s, line: %d\n",
buffer, buf->blind.relname, buf->tag.blockNum,
buffer,
buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
buf->tag.blockNum,
PrivateRefCount[buffer - 1], file, line);
}
}
@@ -1730,9 +1689,11 @@ ReleaseBuffer_Debug(char *file, int line, Buffer buffer)
{
BufferDesc *buf = &BufferDescriptors[buffer - 1];
fprintf(stderr, "UNPIN(Rel) %d relname = %s, blockNum = %d, \
fprintf(stderr, "UNPIN(Rel) %d rel = %u/%u, blockNum = %u, \
refcount = %ld, file: %s, line: %d\n",
buffer, buf->blind.relname, buf->tag.blockNum,
buffer,
buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
buf->tag.blockNum,
PrivateRefCount[buffer - 1], file, line);
}
}
@@ -1757,18 +1718,22 @@ ReleaseAndReadBuffer_Debug(char *file,
{
BufferDesc *buf = &BufferDescriptors[buffer - 1];
fprintf(stderr, "UNPIN(Rel&Rd) %d relname = %s, blockNum = %d, \
fprintf(stderr, "UNPIN(Rel&Rd) %d rel = %u/%u, blockNum = %u, \
refcount = %ld, file: %s, line: %d\n",
buffer, buf->blind.relname, buf->tag.blockNum,
buffer,
buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
buf->tag.blockNum,
PrivateRefCount[buffer - 1], file, line);
}
if (ShowPinTrace && BufferIsLocal(buffer) && is_userbuffer(buffer))
{
BufferDesc *buf = &BufferDescriptors[b - 1];
fprintf(stderr, "PIN(Rel&Rd) %d relname = %s, blockNum = %d, \
fprintf(stderr, "PIN(Rel&Rd) %d rel = %u/%u, blockNum = %u, \
refcount = %ld, file: %s, line: %d\n",
b, buf->blind.relname, buf->tag.blockNum,
b,
buf->tag.rnode.tblNode, buf->tag.rnode.relNode,
buf->tag.blockNum,
PrivateRefCount[b - 1], file, line);
}
return b;
@@ -1784,6 +1749,7 @@ refcount = %ld, file: %s, line: %d\n",
* and die if there's anything fishy.
*/
void
_bm_trace(Oid dbId, Oid relId, int blkNo, int bufNo, int allocType)
{
long start,
@@ -1835,6 +1801,7 @@ okay:
*CurTraceBuf = (start + 1) % BMT_LIMIT;
}
void
_bm_die(Oid dbId, Oid relId, int blkNo, int bufNo,
int allocType, long start, long cur)
{
@@ -1860,7 +1827,7 @@ _bm_die(Oid dbId, Oid relId, int blkNo, int bufNo,
tb = &TraceBuf[i];
if (tb->bmt_op != BMT_NOTUSED)
{
fprintf(fp, " [%3d]%spid %d buf %2d for <%d,%u,%d> ",
fprintf(fp, " [%3d]%spid %d buf %2d for <%u,%u,%u> ",
i, (i == cur ? " ---> " : "\t"),
tb->bmt_pid, tb->bmt_buf,
tb->bmt_dbid, tb->bmt_relid, tb->bmt_blkno);
@@ -1967,7 +1934,9 @@ UnlockBuffers(void)
for (i = 0; i < NBuffers; i++)
{
if (BufferLocks[i] == 0)
bits8 buflocks = BufferLocks[i];
if (buflocks == 0)
continue;
Assert(BufferIsValid(i + 1));
@@ -1977,14 +1946,13 @@ UnlockBuffers(void)
S_LOCK(&(buf->cntx_lock));
if (BufferLocks[i] & BL_R_LOCK)
if (buflocks & BL_R_LOCK)
{
Assert(buf->r_locks > 0);
(buf->r_locks)--;
}
if (BufferLocks[i] & BL_RI_LOCK)
if (buflocks & BL_RI_LOCK)
{
/*
* Someone else could remove our RI lock when acquiring W
* lock. This is possible if we came here from elog(ERROR)
@@ -1993,7 +1961,7 @@ UnlockBuffers(void)
*/
buf->ri_lock = false;
}
if (BufferLocks[i] & BL_W_LOCK)
if (buflocks & BL_W_LOCK)
{
Assert(buf->w_lock);
buf->w_lock = false;
@@ -2001,6 +1969,20 @@ UnlockBuffers(void)
S_UNLOCK(&(buf->cntx_lock));
if (buflocks & BL_PIN_COUNT_LOCK)
{
SpinAcquire(BufMgrLock);
/*
* Don't complain if flag bit not set; it could have been reset
* but we got a cancel/die interrupt before getting the signal.
*/
if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
buf->wait_backend_id == MyBackendId)
buf->flags &= ~BM_PIN_COUNT_WAITER;
SpinRelease(BufMgrLock);
ProcCancelWaitForSignal();
}
BufferLocks[i] = 0;
RESUME_INTERRUPTS();
@@ -2126,6 +2108,77 @@ LockBuffer(Buffer buffer, int mode)
RESUME_INTERRUPTS();
}
/*
* LockBufferForCleanup - lock a buffer in preparation for deleting items
*
* Items may be deleted from a disk page only when the caller (a) holds an
* exclusive lock on the buffer and (b) has observed that no other backend
* holds a pin on the buffer. If there is a pin, then the other backend
* might have a pointer into the buffer (for example, a heapscan reference
* to an item --- see README for more details). It's OK if a pin is added
* after the cleanup starts, however; the newly-arrived backend will be
* unable to look at the page until we release the exclusive lock.
*
* To implement this protocol, a would-be deleter must pin the buffer and
* then call LockBufferForCleanup(). LockBufferForCleanup() is similar to
* LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), except that it loops until
* it has successfully observed pin count = 1.
*/
void
LockBufferForCleanup(Buffer buffer)
{
BufferDesc *bufHdr;
bits8 *buflock;
Assert(BufferIsValid(buffer));
if (BufferIsLocal(buffer))
{
/* There should be exactly one pin */
if (LocalRefCount[-buffer - 1] != 1)
elog(ERROR, "LockBufferForCleanup: wrong local pin count");
/* Nobody else to wait for */
return;
}
/* There should be exactly one local pin */
if (PrivateRefCount[buffer - 1] != 1)
elog(ERROR, "LockBufferForCleanup: wrong local pin count");
bufHdr = &BufferDescriptors[buffer - 1];
buflock = &(BufferLocks[buffer - 1]);
for (;;)
{
/* Try to acquire lock */
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
SpinAcquire(BufMgrLock);
Assert(bufHdr->refcount > 0);
if (bufHdr->refcount == 1)
{
/* Successfully acquired exclusive lock with pincount 1 */
SpinRelease(BufMgrLock);
return;
}
/* Failed, so mark myself as waiting for pincount 1 */
if (bufHdr->flags & BM_PIN_COUNT_WAITER)
{
SpinRelease(BufMgrLock);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
elog(ERROR, "Multiple backends attempting to wait for pincount 1");
}
bufHdr->wait_backend_id = MyBackendId;
bufHdr->flags |= BM_PIN_COUNT_WAITER;
*buflock |= BL_PIN_COUNT_LOCK;
SpinRelease(BufMgrLock);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
/* Wait to be signaled by UnpinBuffer() */
ProcWaitForSignal();
*buflock &= ~BL_PIN_COUNT_LOCK;
/* Loop back and try again */
}
}
/*
* Functions for IO error handling
*
@@ -2240,8 +2293,9 @@ AbortBufferIO(void)
/* Issue notice if this is not the first failure... */
if (buf->flags & BM_IO_ERROR)
{
elog(NOTICE, "write error may be permanent: cannot write block %u for %s/%s",
buf->tag.blockNum, buf->blind.dbname, buf->blind.relname);
elog(NOTICE, "write error may be permanent: cannot write block %u for %u/%u",
buf->tag.blockNum,
buf->tag.rnode.tblNode, buf->tag.rnode.relNode);
}
buf->flags |= BM_DIRTY;
}
@@ -2252,59 +2306,6 @@ AbortBufferIO(void)
}
}
/*
* Cleanup buffer or mark it for cleanup. Buffer may be cleaned
* up if it's pinned only once.
*
* NOTE: buffer must be excl locked.
*/
void
MarkBufferForCleanup(Buffer buffer, void (*CleanupFunc) (Buffer))
{
BufferDesc *bufHdr = &BufferDescriptors[buffer - 1];
Assert(PrivateRefCount[buffer - 1] > 0);
if (PrivateRefCount[buffer - 1] > 1)
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
PrivateRefCount[buffer - 1]--;
SpinAcquire(BufMgrLock);
Assert(bufHdr->refcount > 0);
bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
bufHdr->CleanupFunc = CleanupFunc;
SpinRelease(BufMgrLock);
return;
}
SpinAcquire(BufMgrLock);
Assert(bufHdr->refcount > 0);
if (bufHdr->refcount == 1)
{
SpinRelease(BufMgrLock);
CleanupFunc(buffer);
CleanupFunc = NULL;
}
else
SpinRelease(BufMgrLock);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
SpinAcquire(BufMgrLock);
PrivateRefCount[buffer - 1] = 0;
Assert(bufHdr->refcount > 0);
bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
bufHdr->CleanupFunc = CleanupFunc;
bufHdr->refcount--;
if (bufHdr->refcount == 0)
{
AddBufferToFreelist(bufHdr);
bufHdr->flags |= BM_FREE;
}
SpinRelease(BufMgrLock);
return;
}
RelFileNode
BufferGetFileNode(Buffer buffer)
{

View File

@@ -9,7 +9,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/freelist.c,v 1.23 2001/01/24 19:43:06 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/freelist.c,v 1.24 2001/07/06 21:04:26 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -29,14 +29,14 @@
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/proc.h"
static BufferDesc *SharedFreeList;
/* only actually used in debugging. The lock
* should be acquired before calling the freelist manager.
/*
* State-checking macros
*/
extern SPINLOCK BufMgrLock;
#define IsInQueue(bf) \
( \
@@ -45,7 +45,7 @@ extern SPINLOCK BufMgrLock;
AssertMacro((bf->flags & BM_FREE)) \
)
#define NotInQueue(bf) \
#define IsNotInQueue(bf) \
( \
AssertMacro((bf->freeNext == INVALID_DESCRIPTOR)), \
AssertMacro((bf->freePrev == INVALID_DESCRIPTOR)), \
@@ -61,14 +61,14 @@ extern SPINLOCK BufMgrLock;
* the manner in which buffers are added to the freelist queue.
* Currently, they are added on an LRU basis.
*/
void
static void
AddBufferToFreelist(BufferDesc *bf)
{
#ifdef BMTRACE
_bm_trace(bf->tag.relId.dbId, bf->tag.relId.relId, bf->tag.blockNum,
BufferDescriptorGetBuffer(bf), BMT_DEALLOC);
#endif /* BMTRACE */
NotInQueue(bf);
IsNotInQueue(bf);
/* change bf so it points to inFrontOfNew and its successor */
bf->freePrev = SharedFreeList->freePrev;
@@ -83,13 +83,14 @@ AddBufferToFreelist(BufferDesc *bf)
/*
* PinBuffer -- make buffer unavailable for replacement.
*
* This should be applied only to shared buffers, never local ones.
* Bufmgr lock must be held by caller.
*/
void
PinBuffer(BufferDesc *buf)
{
long b;
/* Assert (buf->refcount < 25); */
int b = BufferDescriptorGetBuffer(buf) - 1;
if (buf->refcount == 0)
{
@@ -104,13 +105,12 @@ PinBuffer(BufferDesc *buf)
buf->flags &= ~BM_FREE;
}
else
NotInQueue(buf);
IsNotInQueue(buf);
b = BufferDescriptorGetBuffer(buf) - 1;
Assert(PrivateRefCount[b] >= 0);
if (PrivateRefCount[b] == 0)
buf->refcount++;
PrivateRefCount[b]++;
Assert(PrivateRefCount[b] > 0);
}
#ifdef NOT_USED
@@ -135,24 +135,35 @@ refcount = %ld, file: %s, line: %d\n",
/*
* UnpinBuffer -- make buffer available for replacement.
*
* This should be applied only to shared buffers, never local ones.
* Bufmgr lock must be held by caller.
*/
void
UnpinBuffer(BufferDesc *buf)
{
long b = BufferDescriptorGetBuffer(buf) - 1;
int b = BufferDescriptorGetBuffer(buf) - 1;
IsNotInQueue(buf);
Assert(buf->refcount > 0);
Assert(PrivateRefCount[b] > 0);
PrivateRefCount[b]--;
if (PrivateRefCount[b] == 0)
buf->refcount--;
NotInQueue(buf);
if (buf->refcount == 0)
{
/* buffer is now unpinned */
AddBufferToFreelist(buf);
buf->flags |= BM_FREE;
}
else if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
buf->refcount == 1)
{
/* we just released the last pin other than the waiter's */
buf->flags &= ~BM_PIN_COUNT_WAITER;
ProcSendSignal(buf->wait_backend_id);
}
else
{
/* do nothing */
@@ -179,18 +190,16 @@ refcount = %ld, file: %s, line: %d\n",
/*
* GetFreeBuffer() -- get the 'next' buffer from the freelist.
*
*/
BufferDesc *
GetFreeBuffer()
GetFreeBuffer(void)
{
BufferDesc *buf;
if (Free_List_Descriptor == SharedFreeList->freeNext)
{
/* queue is empty. All buffers in the buffer pool are pinned. */
elog(ERROR, "out of free buffers: time to abort !\n");
elog(ERROR, "out of free buffers: time to abort!");
return NULL;
}
buf = &(BufferDescriptors[SharedFreeList->freeNext]);
@@ -220,7 +229,7 @@ InitFreeList(bool init)
if (init)
{
/* we only do this once, normally the postmaster */
/* we only do this once, normally in the postmaster */
SharedFreeList->data = INVALID_OFFSET;
SharedFreeList->flags = 0;
SharedFreeList->flags &= ~(BM_VALID | BM_DELETED | BM_FREE);
@@ -249,37 +258,23 @@ DBG_FreeListCheck(int nfree)
buf = &(BufferDescriptors[SharedFreeList->freeNext]);
for (i = 0; i < nfree; i++, buf = &(BufferDescriptors[buf->freeNext]))
{
if (!(buf->flags & (BM_FREE)))
{
if (buf != SharedFreeList)
{
printf("\tfree list corrupted: %d flags %x\n",
buf->buf_id, buf->flags);
}
else
{
printf("\tfree list corrupted: too short -- %d not %d\n",
i, nfree);
}
}
if ((BufferDescriptors[buf->freeNext].freePrev != buf->buf_id) ||
(BufferDescriptors[buf->freePrev].freeNext != buf->buf_id))
{
printf("\tfree list links corrupted: %d %ld %ld\n",
buf->buf_id, buf->freePrev, buf->freeNext);
}
}
if (buf != SharedFreeList)
{
printf("\tfree list corrupted: %d-th buffer is %d\n",
nfree, buf->buf_id);
}
}
#endif