diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 49ec63658f2..7e9b8970204 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.113 2001/03/25 23:23:58 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.114 2001/05/12 19:58:27 tgl Exp $ * * * INTERFACE ROUTINES @@ -487,7 +487,7 @@ heapgettup(Relation relation, return; } - *buffer = ReleaseAndReadBuffer(*buffer, relation, page); + *buffer = ReleaseAndReadBuffer(*buffer, relation, page, false); if (!BufferIsValid(*buffer)) elog(ERROR, "heapgettup: failed ReadBuffer"); diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c index b4780c208e0..7278ecd2dea 100644 --- a/src/backend/access/heap/hio.c +++ b/src/backend/access/heap/hio.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Id: hio.c,v 1.37 2001/03/22 06:16:07 momjian Exp $ + * $Id: hio.c,v 1.38 2001/05/12 19:58:27 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -66,7 +66,7 @@ RelationPutHeapTuple(Relation relation, /* * RelationGetBufferForTuple * - * Returns (locked) buffer with free space >= given len. + * Returns exclusive-locked buffer with free space >= given len. * * Note that we use LockPage to lock relation for extension. We can * do this as long as in all other places we use page-level locking @@ -75,14 +75,14 @@ RelationPutHeapTuple(Relation relation, * * ELOG(ERROR) is allowed here, so this routine *must* be called * before any (unlogged) changes are made in buffer pool. - * */ Buffer RelationGetBufferForTuple(Relation relation, Size len) { - Buffer buffer; + Buffer buffer = InvalidBuffer; Page pageHeader; - BlockNumber lastblock; + BlockNumber lastblock, + oldnblocks; len = MAXALIGN(len); /* be conservative */ @@ -93,59 +93,102 @@ RelationGetBufferForTuple(Relation relation, Size len) elog(ERROR, "Tuple is too big: size %lu, max size %ld", (unsigned long) len, MaxTupleSize); + /* + * First, use relcache's record of table length to guess where the + * last page is, and try to put the tuple there. This cached value + * may be out of date, in which case we'll be inserting into a non-last + * page, but that should be OK. Note that in a newly created relcache + * entry, rd_nblocks may be zero; if so, we'll set it correctly below. + */ + if (relation->rd_nblocks > 0) + { + lastblock = relation->rd_nblocks - 1; + buffer = ReadBuffer(relation, lastblock); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + pageHeader = (Page) BufferGetPage(buffer); + if (len <= PageGetFreeSpace(pageHeader)) + return buffer; + /* + * Doesn't fit, so we'll have to try someplace else. + */ + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + /* buffer release will happen below... */ + } + + /* + * Before extending relation, make sure no one else has done + * so more recently than our last rd_nblocks update. (If we + * blindly extend the relation here, then probably most of the + * page the other guy added will end up going to waste.) + * + * We have to use a lock to ensure no one else is extending the + * rel at the same time, else we will both try to initialize the + * same new page. + */ if (!relation->rd_myxactonly) LockPage(relation, 0, ExclusiveLock); + oldnblocks = relation->rd_nblocks; /* - * XXX This does an lseek - VERY expensive - but at the moment it is + * XXX This does an lseek - rather expensive - but at the moment it is * the only way to accurately determine how many blocks are in a - * relation. A good optimization would be to get this to actually - * work properly. + * relation. Is it worth keeping an accurate file length in shared + * memory someplace, rather than relying on the kernel to do it for us? */ - lastblock = RelationGetNumberOfBlocks(relation); + relation->rd_nblocks = RelationGetNumberOfBlocks(relation); - /* - * Get the last existing page --- may need to create the first one if - * this is a virgin relation. - */ - if (lastblock == 0) + if (relation->rd_nblocks > oldnblocks) { - buffer = ReadBuffer(relation, P_NEW); + /* + * Someone else has indeed extended the relation recently. + * Try to fit our tuple into the new last page. + */ + lastblock = relation->rd_nblocks - 1; + buffer = ReleaseAndReadBuffer(buffer, relation, lastblock, false); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); pageHeader = (Page) BufferGetPage(buffer); - Assert(PageIsNew((PageHeader) pageHeader)); - PageInit(pageHeader, BufferGetPageSize(buffer), 0); - } - else - { - buffer = ReadBuffer(relation, lastblock - 1); - LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); - pageHeader = (Page) BufferGetPage(buffer); - } - - /* - * Is there room on the last existing page? - */ - if (len > PageGetFreeSpace(pageHeader)) - { - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - buffer = ReleaseAndReadBuffer(buffer, relation, P_NEW); - LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); - pageHeader = (Page) BufferGetPage(buffer); - Assert(PageIsNew((PageHeader) pageHeader)); - PageInit(pageHeader, BufferGetPageSize(buffer), 0); - - if (len > PageGetFreeSpace(pageHeader)) + if (len <= PageGetFreeSpace(pageHeader)) { - /* We should not get here given the test at the top */ - elog(STOP, "Tuple is too big: size %lu", - (unsigned long) len); + /* OK, we don't need to extend again. */ + if (!relation->rd_myxactonly) + UnlockPage(relation, 0, ExclusiveLock); + return buffer; } + /* + * Doesn't fit, so we'll have to extend the relation (again). + */ + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + /* buffer release will happen below... */ } + /* + * Extend the relation by one page and update rd_nblocks for next time. + */ + lastblock = relation->rd_nblocks; + buffer = ReleaseAndReadBuffer(buffer, relation, lastblock, true); + relation->rd_nblocks = lastblock + 1; + + /* + * We need to initialize the empty new page. + */ + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + pageHeader = (Page) BufferGetPage(buffer); + Assert(PageIsNew((PageHeader) pageHeader)); + PageInit(pageHeader, BufferGetPageSize(buffer), 0); + + /* + * Release the file-extension lock; it's now OK for someone else + * to extend the relation some more. + */ if (!relation->rd_myxactonly) UnlockPage(relation, 0, ExclusiveLock); - return (buffer); + if (len > PageGetFreeSpace(pageHeader)) + { + /* We should not get here given the test at the top */ + elog(STOP, "Tuple is too big: size %lu", + (unsigned long) len); + } + return buffer; } diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index b8d3e51b5d5..d5ff24df022 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.110 2001/05/10 20:38:49 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.111 2001/05/12 19:58:27 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -88,10 +88,10 @@ extern void AbortBufferIO(void); */ #define BUFFER_IS_BROKEN(buf) ((buf->flags & BM_IO_ERROR) && !(buf->flags & BM_DIRTY)) -static Buffer ReadBufferWithBufferLock(Relation relation, BlockNumber blockNum, - bool bufferLockHeld); +static Buffer ReadBufferInternal(Relation reln, BlockNumber blockNum, + bool isExtend, bool bufferLockHeld); static BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum, - bool *foundPtr, bool bufferLockHeld); + bool *foundPtr); static int ReleaseBufferWithBufferLock(Buffer buffer); static int BufferReplace(BufferDesc *bufHdr); void PrintBufferDescs(void); @@ -121,7 +121,7 @@ RelationGetBufferWithBuffer(Relation relation, SpinRelease(BufMgrLock); return buffer; } - return ReadBufferWithBufferLock(relation, blockNumber, true); + return ReadBufferInternal(relation, blockNumber, false, true); } else { @@ -131,7 +131,7 @@ RelationGetBufferWithBuffer(Relation relation, return buffer; } } - return ReadBuffer(relation, blockNumber); + return ReadBufferInternal(relation, blockNumber, false, false); } /* @@ -152,38 +152,44 @@ RelationGetBufferWithBuffer(Relation relation, /* * ReadBuffer - * */ Buffer ReadBuffer(Relation reln, BlockNumber blockNum) { - return ReadBufferWithBufferLock(reln, blockNum, false); + return ReadBufferInternal(reln, blockNum, false, false); } /* - * ReadBufferWithBufferLock -- does the work of - * ReadBuffer() but with the possibility that - * the buffer lock has already been held. this - * is yet another effort to reduce the number of - * semops in the system. + * ReadBufferInternal -- internal version of ReadBuffer with more options + * + * isExtend: if true, assume that we are extending the file and the caller + * is passing the current EOF block number (ie, caller already called + * smgrnblocks()). + * + * bufferLockHeld: if true, caller already acquired the bufmgr spinlock. + * (This is assumed never to be true if dealing with a local buffer!) */ static Buffer -ReadBufferWithBufferLock(Relation reln, - BlockNumber blockNum, - bool bufferLockHeld) +ReadBufferInternal(Relation reln, BlockNumber blockNum, + bool isExtend, bool bufferLockHeld) { BufferDesc *bufHdr; int status; bool found; - bool extend; /* extending the file by one block */ bool isLocalBuf; - extend = (blockNum == P_NEW); isLocalBuf = reln->rd_myxactonly; if (isLocalBuf) { ReadLocalBufferCount++; + /* Substitute proper block number if caller asked for P_NEW */ + if (blockNum == P_NEW) + { + blockNum = reln->rd_nblocks; + reln->rd_nblocks++; + isExtend = true; + } bufHdr = LocalBufferAlloc(reln, blockNum, &found); if (found) LocalBufferHitCount++; @@ -191,16 +197,25 @@ ReadBufferWithBufferLock(Relation reln, else { ReadBufferCount++; - + /* Substitute proper block number if caller asked for P_NEW */ + if (blockNum == P_NEW) + { + blockNum = smgrnblocks(DEFAULT_SMGR, reln); + isExtend = true; + } /* * lookup the buffer. IO_IN_PROGRESS is set if the requested * block is not currently in memory. */ - bufHdr = BufferAlloc(reln, blockNum, &found, bufferLockHeld); + if (!bufferLockHeld) + SpinAcquire(BufMgrLock); + bufHdr = BufferAlloc(reln, blockNum, &found); if (found) BufferHitCount++; } + /* At this point we do NOT hold the bufmgr spinlock. */ + if (!bufHdr) return InvalidBuffer; @@ -208,11 +223,11 @@ ReadBufferWithBufferLock(Relation reln, if (found) { /* - * Could see found && extend if a buffer was already created for + * Could have found && isExtend if a buffer was already created for * the next page position, but then smgrextend failed to write * the page. Must fall through and try to extend file again. */ - if (!extend) + if (!isExtend) return BufferDescriptorGetBuffer(bufHdr); } @@ -220,16 +235,16 @@ ReadBufferWithBufferLock(Relation reln, * if we have gotten to this point, the reln pointer must be ok and * the relation file must be open. */ - if (extend) + if (isExtend) { /* new buffers are zero-filled */ MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ); - status = smgrextend(DEFAULT_SMGR, reln, bufHdr->tag.blockNum, + status = smgrextend(DEFAULT_SMGR, reln, blockNum, (char *) MAKE_PTR(bufHdr->data)); } else { - status = smgrread(DEFAULT_SMGR, reln, bufHdr->tag.blockNum, + status = smgrread(DEFAULT_SMGR, reln, blockNum, (char *) MAKE_PTR(bufHdr->data)); } @@ -290,13 +305,13 @@ ReadBufferWithBufferLock(Relation reln, * * Returns: descriptor for buffer * - * When this routine returns, the BufMgrLock is guaranteed NOT be held. + * BufMgrLock must be held at entry. When this routine returns, + * the BufMgrLock is guaranteed NOT to be held. */ static BufferDesc * BufferAlloc(Relation reln, BlockNumber blockNum, - bool *foundPtr, - bool bufferLockHeld) + bool *foundPtr) { BufferDesc *buf, *buf2; @@ -305,16 +320,8 @@ BufferAlloc(Relation reln, /* create a new tag so we can lookup the buffer */ /* assume that the relation is already open */ - if (blockNum == P_NEW) - { - blockNum = smgrnblocks(DEFAULT_SMGR, reln); - } - INIT_BUFFERTAG(&newTag, reln, blockNum); - if (!bufferLockHeld) - SpinAcquire(BufMgrLock); - /* see if the block is in the buffer pool already */ buf = BufTableLookup(&newTag); if (buf != NULL) @@ -666,25 +673,34 @@ WriteNoReleaseBuffer(Buffer buffer) #undef ReleaseAndReadBuffer /* * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer() - * so that only one semop needs to be called. + * to save a spinlock release/acquire. * + * An additional frammish of this routine is that the caller may perform + * file extension (as if blockNum = P_NEW) by passing the actual current + * EOF block number as blockNum and setting isExtend true. This hack + * allows us to avoid calling smgrnblocks() again when the caller has + * already done it. + * + * Note: it is OK to pass buffer = InvalidBuffer, indicating that no old + * buffer actually needs to be released. This case is the same as ReadBuffer + * except for the isExtend option. */ Buffer ReleaseAndReadBuffer(Buffer buffer, Relation relation, - BlockNumber blockNum) + BlockNumber blockNum, + bool isExtend) { BufferDesc *bufHdr; - Buffer retbuf; - if (BufferIsLocal(buffer)) + if (BufferIsValid(buffer)) { - Assert(LocalRefCount[-buffer - 1] > 0); - LocalRefCount[-buffer - 1]--; - } - else - { - if (BufferIsValid(buffer)) + if (BufferIsLocal(buffer)) + { + Assert(LocalRefCount[-buffer - 1] > 0); + LocalRefCount[-buffer - 1]--; + } + else { bufHdr = &BufferDescriptors[buffer - 1]; Assert(PrivateRefCount[buffer - 1] > 0); @@ -701,13 +717,14 @@ ReleaseAndReadBuffer(Buffer buffer, AddBufferToFreelist(bufHdr); bufHdr->flags |= BM_FREE; } - retbuf = ReadBufferWithBufferLock(relation, blockNum, true); - return retbuf; + return ReadBufferInternal(relation, blockNum, + isExtend, true); } } } - return ReadBuffer(relation, blockNum); + return ReadBufferInternal(relation, blockNum, + isExtend, false); } /* @@ -1735,13 +1752,14 @@ ReleaseAndReadBuffer_Debug(char *file, int line, Buffer buffer, Relation relation, - BlockNumber blockNum) + BlockNumber blockNum, + bool isExtend) { bool bufferValid; Buffer b; bufferValid = BufferIsValid(buffer); - b = ReleaseAndReadBuffer(buffer, relation, blockNum); + b = ReleaseAndReadBuffer(buffer, relation, blockNum, isExtend); if (ShowPinTrace && bufferValid && BufferIsLocal(buffer) && is_userbuffer(buffer)) { diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 6e3cd756411..4b4a31066b2 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -16,7 +16,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.40 2001/03/22 03:59:44 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.41 2001/05/12 19:58:27 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -54,12 +54,6 @@ LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr) int i; BufferDesc *bufHdr = (BufferDesc *) NULL; - if (blockNum == P_NEW) - { - blockNum = reln->rd_nblocks; - reln->rd_nblocks++; - } - /* a low tech search for now -- not optimized for scans */ for (i = 0; i < NLocBuffer; i++) { diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index fc76f9fe4c5..d45c8888c1d 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: bufmgr.h,v 1.50 2001/03/22 04:01:05 momjian Exp $ + * $Id: bufmgr.h,v 1.51 2001/05/12 19:58:28 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -164,7 +164,7 @@ extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum); extern int WriteBuffer(Buffer buffer); extern int WriteNoReleaseBuffer(Buffer buffer); extern Buffer ReleaseAndReadBuffer(Buffer buffer, Relation relation, - BlockNumber blockNum); + BlockNumber blockNum, bool isExtend); extern int FlushBuffer(Buffer buffer, bool sync, bool release); extern void InitBufferPool(void);