
Further work on connecting the free space map (which is still just a
stub) into the rest of the system.  Adopt a cleaner approach to preventing
deadlock in concurrent heap_updates: allow RelationGetBufferForTuple to
select any page of the rel, and put the onus on it to lock both buffers
in a consistent order.  Remove no-longer-needed isExtend hack from
API of ReleaseAndReadBuffer.
Tom Lane
2001-06-29 21:08:25 +00:00
parent 0eab92c0e6
commit af5ced9cfd
12 changed files with 381 additions and 233 deletions
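
At the center of the patch is a signature change for RelationGetBufferForTuple;
both forms appear in the hio.c hunks below.  As a quick orientation (prototypes
reconstructed from the diff, not copied from a header file):

/* Before: caller supplied a minimum block number, forcing the new page to
 * lie beyond the old tuple's page ("lock the higher-numbered page first"). */
Buffer RelationGetBufferForTuple(Relation relation, Size len,
                                 BlockNumber minblocknum);

/* After: caller passes the already-pinned buffer holding the old tuple
 * (or InvalidBuffer), and the routine itself locks both pages in
 * increasing block-number order. */
Buffer RelationGetBufferForTuple(Relation relation, Size len,
                                 Buffer otherBuffer);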

src/backend/access/heap/heapam.c

@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.120 2001/06/27 23:31:38 tgl Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.121 2001/06/29 21:08:23 tgl Exp $
  *
  *
  * INTERFACE ROUTINES
@@ -202,8 +202,7 @@ heapgettup(Relation relation,
         *buffer = ReleaseAndReadBuffer(*buffer,
                                        relation,
-                                       ItemPointerGetBlockNumber(tid),
-                                       false);
+                                       ItemPointerGetBlockNumber(tid));
         if (!BufferIsValid(*buffer))
             elog(ERROR, "heapgettup: failed ReadBuffer");
@@ -238,8 +237,7 @@ heapgettup(Relation relation,
             *buffer = ReleaseAndReadBuffer(*buffer,
                                            relation,
-                                           page,
-                                           false);
+                                           page);
             if (!BufferIsValid(*buffer))
                 elog(ERROR, "heapgettup: failed ReadBuffer");
@@ -280,8 +278,7 @@ heapgettup(Relation relation,
             *buffer = ReleaseAndReadBuffer(*buffer,
                                            relation,
-                                           page,
-                                           false);
+                                           page);
             if (!BufferIsValid(*buffer))
                 elog(ERROR, "heapgettup: failed ReadBuffer");
@@ -374,8 +371,7 @@ heapgettup(Relation relation,
             *buffer = ReleaseAndReadBuffer(*buffer,
                                            relation,
-                                           page,
-                                           false);
+                                           page);
             if (!BufferIsValid(*buffer))
                 elog(ERROR, "heapgettup: failed ReadBuffer");
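
The repeated mechanical change in heapgettup above is the other cleanup named
in the commit message: ReleaseAndReadBuffer loses its isExtend flag.  The
prototypes below are inferred from the call sites in this diff (only the
isExtend name is attested, by the commit message; the rest is reconstruction):

/* Before: four arguments, the last being the no-longer-needed isExtend hack */
Buffer ReleaseAndReadBuffer(Buffer buffer, Relation relation,
                            BlockNumber blockNum, bool isExtend);

/* After: plain release-then-read of the requested block */
Buffer ReleaseAndReadBuffer(Buffer buffer, Relation relation,
                            BlockNumber blockNum);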
@@ -1088,8 +1084,8 @@ heap_insert(Relation relation, HeapTuple tup)
         heap_tuple_toast_attrs(relation, tup, NULL);
 #endif
 
-    /* Find buffer for this tuple */
-    buffer = RelationGetBufferForTuple(relation, tup->t_len, 0);
+    /* Find buffer to insert this tuple into */
+    buffer = RelationGetBufferForTuple(relation, tup->t_len, InvalidBuffer);
 
     /* NO ELOG(ERROR) from here till changes are logged */
     START_CRIT_SECTION();
@@ -1501,18 +1497,16 @@ l2:
     * buffer locks on both old and new pages.  To avoid deadlock against
     * some other backend trying to get the same two locks in the other
     * order, we must be consistent about the order we get the locks in.
-    * We use the rule "lock the higher-numbered page of the relation
+    * We use the rule "lock the lower-numbered page of the relation
     * first".  To implement this, we must do RelationGetBufferForTuple
-    * while not holding the lock on the old page, and we must tell it
-    * to give us a page beyond the old page.
+    * while not holding the lock on the old page, and we must rely on it
+    * to get the locks on both pages in the correct order.
     */
    if (newtupsize > pagefree)
    {
        /* Assume there's no chance to put newtup on same page. */
        newbuf = RelationGetBufferForTuple(relation, newtup->t_len,
-                                          BufferGetBlockNumber(buffer) + 1);
-       /* Now reacquire lock on old tuple's page. */
-       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+                                          buffer);
    }
    else
    {
@@ -1529,8 +1523,7 @@ l2:
         */
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
        newbuf = RelationGetBufferForTuple(relation, newtup->t_len,
-                                          BufferGetBlockNumber(buffer) + 1);
-       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+                                          buffer);
    }
    else
    {
@@ -1550,7 +1543,8 @@ l2:
 
    /*
     * At this point newbuf and buffer are both pinned and locked,
-    * and newbuf has enough space for the new tuple.
+    * and newbuf has enough space for the new tuple.  If they are
+    * the same buffer, only one pin is held.
     */
 
    /* NO ELOG(ERROR) from here till changes are logged */
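
The "only one pin is held" caveat matters when heap_update later releases its
locks and pins.  A minimal sketch of the cleanup discipline that comment
implies (the statements below are hypothetical, not lines from this diff;
WriteBuffer marks a buffer dirty and drops one pin):

LockBuffer(newbuf, BUFFER_LOCK_UNLOCK);
if (newbuf != buffer)
{
    /* distinct pages: a second lock and a second pin to give back */
    LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
    WriteBuffer(newbuf);
}
WriteBuffer(buffer);    /* in the same-buffer case this drops the single pin */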

src/backend/access/heap/hio.c

@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *    $Id: hio.c,v 1.40 2001/06/27 23:31:38 tgl Exp $
+ *    $Id: hio.c,v 1.41 2001/06/29 21:08:23 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -17,6 +17,8 @@
 #include "access/heapam.h"
 #include "access/hio.h"
+#include "storage/freespace.h"
+
 
 /*
  * RelationPutHeapTuple - place tuple at specified page
@@ -66,16 +68,29 @@ RelationPutHeapTuple(Relation relation,
 /*
  * RelationGetBufferForTuple
  *
- * Returns exclusive-locked buffer with free space >= given len,
- * being careful to select only a page at or beyond minblocknum
- * in the relation.
+ * Returns pinned and exclusive-locked buffer of a page in given relation
+ * with free space >= given len.
  *
- * The minblocknum parameter is needed to prevent deadlock between
- * concurrent heap_update operations; see heap_update for details.
- * Pass zero if you don't particularly care which page you get.
+ * If otherBuffer is not InvalidBuffer, then it references a previously
+ * pinned buffer of another page in the same relation; on return, this
+ * buffer will also be exclusive-locked.  (This case is used by heap_update;
+ * the otherBuffer contains the tuple being updated.)
+ *
+ * The reason for passing otherBuffer is that if two backends are doing
+ * concurrent heap_update operations, a deadlock could occur if they try
+ * to lock the same two buffers in opposite orders.  To ensure that this
+ * can't happen, we impose the rule that buffers of a relation must be
+ * locked in increasing page number order.  This is most conveniently done
+ * by having RelationGetBufferForTuple lock them both, with suitable care
+ * for ordering.
+ *
+ * NOTE: it is unlikely, but not quite impossible, for otherBuffer to be the
+ * same buffer we select for insertion of the new tuple (this could only
+ * happen if space is freed in that page after heap_update finds there's not
+ * enough there).  In that case, the page will be pinned and locked only once.
  *
- * Note that we use LockPage to lock relation for extension.  We can
- * do this as long as in all other places we use page-level locking
+ * Note that we use LockPage(rel, 0) to lock relation for extension.
+ * We can do this as long as in all other places we use page-level locking
  * for indices only. Alternatively, we could define pseudo-table as
  * we do for transactions with XactLockTable.
  *
@@ -84,12 +99,13 @@ RelationPutHeapTuple(Relation relation,
  */
 Buffer
 RelationGetBufferForTuple(Relation relation, Size len,
-                          BlockNumber minblocknum)
+                          Buffer otherBuffer)
 {
     Buffer      buffer = InvalidBuffer;
     Page        pageHeader;
-    BlockNumber lastblock,
-                oldnblocks;
+    Size        pageFreeSpace;
+    BlockNumber targetBlock,
+                otherBlock;
 
     len = MAXALIGN(len);        /* be conservative */
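
The ordering rule described in the comment above is the standard
resource-ordering cure for deadlock: if every backend takes the two buffer
locks in the same global order, the circular wait cannot form.  Distilled
into a hypothetical helper (lock_two_buffers_in_order is not part of this
patch; LockBuffer and BufferGetBlockNumber are the real bufmgr calls used
throughout the diff):

/* Hedged sketch: assumes both buffers are already pinned. */
static void
lock_two_buffers_in_order(Buffer a, Buffer b)
{
    if (a == b)
        LockBuffer(a, BUFFER_LOCK_EXCLUSIVE);   /* same page: one lock only */
    else if (BufferGetBlockNumber(a) < BufferGetBlockNumber(b))
    {
        LockBuffer(a, BUFFER_LOCK_EXCLUSIVE);   /* lower-numbered page first */
        LockBuffer(b, BUFFER_LOCK_EXCLUSIVE);
    }
    else
    {
        LockBuffer(b, BUFFER_LOCK_EXCLUSIVE);
        LockBuffer(a, BUFFER_LOCK_EXCLUSIVE);
    }
}

The three-way branch in the new RelationGetBufferForTuple body below follows
exactly this shape.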
@@ -100,36 +116,118 @@ RelationGetBufferForTuple(Relation relation, Size len,
         elog(ERROR, "Tuple is too big: size %lu, max size %ld",
              (unsigned long) len, MaxTupleSize);
 
+    if (otherBuffer != InvalidBuffer)
+        otherBlock = BufferGetBlockNumber(otherBuffer);
+    else
+        otherBlock = InvalidBlockNumber;    /* just to keep compiler quiet */
+
     /*
-     * First, use relcache's record of table length to guess where the
-     * last page is, and try to put the tuple there.  This cached value
-     * may be out of date, in which case we'll be inserting into a non-last
-     * page, but that should be OK.  Note that in a newly created relcache
-     * entry, rd_nblocks may be zero; if so, we'll set it correctly below.
+     * We first try to put the tuple on the same page we last inserted a
+     * tuple on, as cached in the relcache entry.  If that doesn't work,
+     * we ask the shared Free Space Map to locate a suitable page.  Since
+     * the FSM's info might be out of date, we have to be prepared to loop
+     * around and retry multiple times.  (To insure this isn't an infinite
+     * loop, we must update the FSM with the correct amount of free space on
+     * each page that proves not to be suitable.)  If the FSM has no record of
+     * a page with enough free space, we give up and extend the relation.
     */
-    if (relation->rd_nblocks > 0)
+    targetBlock = relation->rd_targblock;
+
+    if (targetBlock == InvalidBlockNumber)
     {
-        lastblock = relation->rd_nblocks - 1;
-        if (lastblock >= minblocknum)
+        /*
+         * We have no cached target page, so ask the FSM for an initial
+         * target.
+         */
+        targetBlock = GetPageWithFreeSpace(&relation->rd_node, len);
+
+        /*
+         * If the FSM knows nothing of the rel, try the last page before
+         * we give up and extend.  This avoids one-tuple-per-page syndrome
+         * during bootstrapping or in a recently-started system.
+         */
+        if (targetBlock == InvalidBlockNumber)
         {
-            buffer = ReadBuffer(relation, lastblock);
-            LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-            pageHeader = (Page) BufferGetPage(buffer);
-            if (len <= PageGetFreeSpace(pageHeader))
-                return buffer;
-            /*
-             * Doesn't fit, so we'll have to try someplace else.
-             */
-            LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-            /* buffer release will happen below... */
+            BlockNumber nblocks = RelationGetNumberOfBlocks(relation);
+
+            if (nblocks > 0)
+                targetBlock = nblocks - 1;
         }
     }
 
+    while (targetBlock != InvalidBlockNumber)
+    {
+        /*
+         * Read and exclusive-lock the target block, as well as the
+         * other block if one was given, taking suitable care with
+         * lock ordering and the possibility they are the same block.
+         */
+        if (otherBuffer == InvalidBuffer)
+        {
+            /* easy case */
+            buffer = ReadBuffer(relation, targetBlock);
+            LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+        }
+        else if (otherBlock == targetBlock)
+        {
+            /* also easy case */
+            buffer = otherBuffer;
+            LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+        }
+        else if (otherBlock < targetBlock)
+        {
+            /* lock other buffer first */
+            buffer = ReadBuffer(relation, targetBlock);
+            LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
+            LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+        }
+        else
+        {
+            /* lock target buffer first */
+            buffer = ReadBuffer(relation, targetBlock);
+            LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+            LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
+        }
+
+        /*
+         * Now we can check to see if there's enough free space here.
+         * If so, we're done.
+         */
+        pageHeader = (Page) BufferGetPage(buffer);
+        pageFreeSpace = PageGetFreeSpace(pageHeader);
+        if (len <= pageFreeSpace)
+        {
+            /* use this page as future insert target, too */
+            relation->rd_targblock = targetBlock;
+            return buffer;
+        }
+
+        /*
+         * Not enough space, so we must give up our page locks and
+         * pin (if any) and prepare to look elsewhere.  We don't care
+         * which order we unlock the two buffers in, so this can be
+         * slightly simpler than the code above.
+         */
+        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+        if (otherBuffer == InvalidBuffer)
+        {
+            ReleaseBuffer(buffer);
+        }
+        else if (otherBlock != targetBlock)
+        {
+            LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
+            ReleaseBuffer(buffer);
+        }
+
+        /*
+         * Update FSM as to condition of this page, and ask for another
+         * page to try.
+         */
+        targetBlock = RecordAndGetPageWithFreeSpace(&relation->rd_node,
+                                                    targetBlock,
+                                                    pageFreeSpace,
+                                                    len);
+    }
 
     /*
-     * Before extending relation, make sure no one else has done
-     * so more recently than our last rd_nblocks update.  (If we
-     * blindly extend the relation here, then probably most of the
-     * page the other guy added will end up going to waste.)
+     * Have to extend the relation.
      *
     * We have to use a lock to ensure no one else is extending the
     * rel at the same time, else we will both try to initialize the
@@ -138,51 +236,27 @@ RelationGetBufferForTuple(Relation relation, Size len,
     if (!relation->rd_myxactonly)
         LockPage(relation, 0, ExclusiveLock);
 
-    oldnblocks = relation->rd_nblocks;
-
-    /*
-     * XXX This does an lseek - rather expensive - but at the moment it is
-     * the only way to accurately determine how many blocks are in a
-     * relation.  Is it worth keeping an accurate file length in shared
-     * memory someplace, rather than relying on the kernel to do it for us?
-     */
-    relation->rd_nblocks = RelationGetNumberOfBlocks(relation);
-
-    if (relation->rd_nblocks > oldnblocks)
-    {
-        /*
-         * Someone else has indeed extended the relation recently.
-         * Try to fit our tuple into the new last page.
-         */
-        lastblock = relation->rd_nblocks - 1;
-        if (lastblock >= minblocknum)
-        {
-            buffer = ReleaseAndReadBuffer(buffer, relation, lastblock, false);
-            LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-            pageHeader = (Page) BufferGetPage(buffer);
-            if (len <= PageGetFreeSpace(pageHeader))
-            {
-                /* OK, we don't need to extend again. */
-                if (!relation->rd_myxactonly)
-                    UnlockPage(relation, 0, ExclusiveLock);
-                return buffer;
-            }
-            /*
-             * Doesn't fit, so we'll have to extend the relation (again).
-             */
-            LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-            /* buffer release will happen below... */
-        }
-    }
+    buffer = ReadBuffer(relation, P_NEW);
 
     /*
-     * Extend the relation by one page and update rd_nblocks for next time.
-     *
-     * Note: at this point minblocknum is ignored; we won't extend by more
-     * than one block...
+     * Release the file-extension lock; it's now OK for someone else
+     * to extend the relation some more.
     */
-    lastblock = relation->rd_nblocks;
-    buffer = ReleaseAndReadBuffer(buffer, relation, lastblock, true);
-    relation->rd_nblocks = lastblock + 1;
-
+    if (!relation->rd_myxactonly)
+        UnlockPage(relation, 0, ExclusiveLock);
+
+    /*
+     * We can be certain that locking the otherBuffer first is OK,
+     * since it must have a lower page number.
+     */
+    if (otherBuffer != InvalidBuffer)
+        LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
 
     /*
     * We need to initialize the empty new page.
@@ -192,19 +266,22 @@ RelationGetBufferForTuple(Relation relation, Size len,
     Assert(PageIsNew((PageHeader) pageHeader));
     PageInit(pageHeader, BufferGetPageSize(buffer), 0);
 
-    /*
-     * Release the file-extension lock; it's now OK for someone else
-     * to extend the relation some more.
-     */
-    if (!relation->rd_myxactonly)
-        UnlockPage(relation, 0, ExclusiveLock);
-
     if (len > PageGetFreeSpace(pageHeader))
     {
         /* We should not get here given the test at the top */
-        elog(STOP, "Tuple is too big: size %lu",
-             (unsigned long) len);
+        elog(STOP, "Tuple is too big: size %lu", (unsigned long) len);
     }
 
+    /*
+     * Remember the new page as our target for future insertions.
+     *
+     * XXX should we enter the new page into the free space map immediately,
+     * or just keep it for this backend's exclusive use in the short run
+     * (until VACUUM sees it)?  Seems to depend on whether you expect the
+     * current backend to make more insertions or not, which is probably a
+     * good bet most of the time.  So for now, don't add it to FSM yet.
+     */
+    relation->rd_targblock = BufferGetBlockNumber(buffer);
+
     return buffer;
 }
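
For reference, the two free-space-map entry points used in the retry loop
above have the following shapes, inferred from the call sites (the FSM is
still a stub per the commit message, so treat these as implied prototypes
rather than a settled API):

/* Return a block believed to have >= spaceNeeded free bytes,
 * or InvalidBlockNumber if none is known. */
BlockNumber GetPageWithFreeSpace(RelFileNode *rel, Size spaceNeeded);

/* Additionally record that oldPage actually has oldSpaceAvail free bytes;
 * correcting stale FSM info this way is what keeps the retry loop finite. */
BlockNumber RecordAndGetPageWithFreeSpace(RelFileNode *rel,
                                          BlockNumber oldPage,
                                          Size oldSpaceAvail,
                                          Size spaceNeeded);

Putting the two files together, the call pattern the new API expects looks
like this (a condensed sketch of the heapam.c hunks above, with error
handling and WAL details elided):

/* heap_insert: no second page involved */
buffer = RelationGetBufferForTuple(relation, tup->t_len, InvalidBuffer);

/* heap_update, when the replacement tuple does not fit on the old page:
 * drop the old page's lock (keeping the pin), then let
 * RelationGetBufferForTuple acquire both locks in block-number order. */
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
newbuf = RelationGetBufferForTuple(relation, newtup->t_len, buffer);
/* on return: both pages exclusive-locked; if newbuf == buffer, one pin */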