1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-23 03:21:12 +03:00

Initial MVCC code.

New code for locking buffer' context.
This commit is contained in:
Vadim B. Mikheev
1998-12-15 12:47:01 +00:00
parent c5a27161a1
commit 3f7fbf85dc
65 changed files with 1391 additions and 1282 deletions

View File

@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.38 1998/11/27 19:51:36 vadim Exp $
* $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.39 1998/12/15 12:45:13 vadim Exp $
*
*
* INTERFACE ROUTINES
@ -89,15 +89,12 @@
#include <utils/inval.h>
#include <utils/memutils.h>
#ifndef HAVE_MEMMOVE
#include <regex/utils.h>
#else
#include <string.h>
#endif
static void doinsert(Relation relation, HeapTuple tup);
/* ----------------------------------------------------------------
* heap support routines
* ----------------------------------------------------------------
@ -214,7 +211,7 @@ static void
heapgettup(Relation relation,
HeapTuple tuple,
int dir,
Buffer *buf,
Buffer *buffer,
Snapshot snapshot,
int nkeys,
ScanKey key)
@ -255,7 +252,7 @@ heapgettup(Relation relation,
elog(DEBUG, "heapgettup(%s, tid=0x%x, dir=%d, ...)",
RelationGetRelationName(relation), tid, dir);
}
elog(DEBUG, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x", buf, nkeys, key);
elog(DEBUG, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x", buffer, nkeys, key);
elog(DEBUG, "heapgettup: relation(%c)=`%s', %p",
relation->rd_rel->relkind, &relation->rd_rel->relname,
@ -288,25 +285,26 @@ heapgettup(Relation relation,
/* assume it is a valid TID XXX */
if (ItemPointerIsValid(tid) == false)
{
*buf = InvalidBuffer;
*buffer = InvalidBuffer;
tuple->t_data = NULL;
return;
}
*buf = RelationGetBufferWithBuffer(relation,
*buffer = RelationGetBufferWithBuffer(relation,
ItemPointerGetBlockNumber(tid),
*buf);
*buffer);
#ifndef NO_BUFFERISVALID
if (!BufferIsValid(*buf))
if (!BufferIsValid(*buffer))
elog(ERROR, "heapgettup: failed ReadBuffer");
#endif
dp = (Page) BufferGetPage(*buf);
LockBuffer(*buffer, BUFFER_LOCK_SHARE);
dp = (Page) BufferGetPage(*buffer);
lineoff = ItemPointerGetOffsetNumber(tid);
lpp = PageGetItemId(dp, lineoff);
tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
tuple->t_len = ItemIdGetLength(lpp);
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
return;
}
@ -328,18 +326,18 @@ heapgettup(Relation relation,
}
if (page < 0)
{
*buf = InvalidBuffer;
*buffer = InvalidBuffer;
tuple->t_data = NULL;
return;
}
*buf = RelationGetBufferWithBuffer(relation, page, *buf);
#ifndef NO_BUFFERISVALID
if (!BufferIsValid(*buf))
*buffer = RelationGetBufferWithBuffer(relation, page, *buffer);
if (!BufferIsValid(*buffer))
elog(ERROR, "heapgettup: failed ReadBuffer");
#endif
dp = (Page) BufferGetPage(*buf);
LockBuffer(*buffer, BUFFER_LOCK_SHARE);
dp = (Page) BufferGetPage(*buffer);
lines = PageGetMaxOffsetNumber(dp);
if (tid == NULL)
{
@ -373,19 +371,19 @@ heapgettup(Relation relation,
if (page >= pages)
{
*buf = InvalidBuffer;
*buffer = InvalidBuffer;
tuple->t_data = NULL;
return;
}
/* page and lineoff now reference the physically next tid */
*buf = RelationGetBufferWithBuffer(relation, page, *buf);
#ifndef NO_BUFFERISVALID
if (!BufferIsValid(*buf))
*buffer = RelationGetBufferWithBuffer(relation, page, *buffer);
if (!BufferIsValid(*buffer))
elog(ERROR, "heapgettup: failed ReadBuffer");
#endif
dp = (Page) BufferGetPage(*buf);
LockBuffer(*buffer, BUFFER_LOCK_SHARE);
dp = (Page) BufferGetPage(*buffer);
lines = PageGetMaxOffsetNumber(dp);
}
@ -420,10 +418,13 @@ heapgettup(Relation relation,
* if current tuple qualifies, return it.
* ----------------
*/
HeapTupleSatisfies(tuple, relation, *buf, (PageHeader) dp,
HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp,
snapshot, nkeys, key);
if (tuple->t_data != NULL)
{
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
return;
}
}
/* ----------------
@ -448,6 +449,7 @@ heapgettup(Relation relation,
* this page and it's time to move to the next..
* ----------------
*/
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
page = nextpage(page, dir);
/* ----------------
@ -456,20 +458,19 @@ heapgettup(Relation relation,
*/
if (page < 0 || page >= pages)
{
if (BufferIsValid(*buf))
ReleaseBuffer(*buf);
*buf = InvalidBuffer;
if (BufferIsValid(*buffer))
ReleaseBuffer(*buffer);
*buffer = InvalidBuffer;
tuple->t_data = NULL;
return;
}
*buf = ReleaseAndReadBuffer(*buf, relation, page);
*buffer = ReleaseAndReadBuffer(*buffer, relation, page);
#ifndef NO_BUFFERISVALID
if (!BufferIsValid(*buf))
if (!BufferIsValid(*buffer))
elog(ERROR, "heapgettup: failed ReadBuffer");
#endif
dp = (Page) BufferGetPage(*buf);
LockBuffer(*buffer, BUFFER_LOCK_SHARE);
dp = (Page) BufferGetPage(*buffer);
lines = PageGetMaxOffsetNumber((Page) dp);
linesleft = lines - 1;
if (dir < 0)
@ -485,13 +486,6 @@ heapgettup(Relation relation,
}
}
static void
doinsert(Relation relation, HeapTuple tup)
{
RelationPutHeapTupleAtEnd(relation, tup);
return;
}
/* ----------------------------------------------------------------
* heap access method interface
@ -599,11 +593,7 @@ heap_beginscan(Relation relation,
if (RelationIsValid(relation) == false)
elog(ERROR, "heap_beginscan: !RelationIsValid(relation)");
/* ----------------
* set relation level read lock
* ----------------
*/
RelationSetLockForRead(relation);
LockRelation(relation, AccessShareLock);
/* XXX someday assert SelfTimeQual if relkind == RELKIND_UNCATALOGED */
if (relation->rd_rel->relkind == RELKIND_UNCATALOGED)
@ -707,13 +697,7 @@ heap_endscan(HeapScanDesc scan)
*/
RelationDecrementReferenceCount(scan->rs_rd);
/* ----------------
* Non 2-phase read locks on catalog relations
* ----------------
*/
if (IsSystemRelationName(RelationGetRelationName(scan->rs_rd)->data))
RelationUnsetLockForRead(scan->rs_rd);
UnlockRelation(scan->rs_rd, AccessShareLock);
pfree(scan); /* XXX */
}
@ -997,14 +981,6 @@ heap_fetch(Relation relation,
IncrHeapAccessStat(local_fetch);
IncrHeapAccessStat(global_fetch);
/*
* Note: This is collosally expensive - does two system calls per
* indexscan tuple fetch. Not good, and since we should be doing page
* level locking by the scanner anyway, it is commented out.
*/
/* RelationSetLockForTupleRead(relation, tid); */
/* ----------------
* get the buffer from the relation descriptor
* Note that this does a buffer pin.
@ -1013,13 +989,11 @@ heap_fetch(Relation relation,
buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
#ifndef NO_BUFFERISVALID
if (!BufferIsValid(buffer))
{
elog(ERROR, "heap_fetch: %s relation: ReadBuffer(%lx) failed",
&relation->rd_rel->relname, (long) tid);
}
#endif
LockBuffer(buffer, BUFFER_LOCK_SHARE);
/* ----------------
* get the item line pointer corresponding to the requested tid
@ -1047,6 +1021,8 @@ heap_fetch(Relation relation,
HeapTupleSatisfies(tuple, relation, buffer, dp,
snapshot, 0, (ScanKey) NULL);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
if (tuple->t_data == NULL)
{
ReleaseBuffer(buffer);
@ -1090,14 +1066,6 @@ heap_insert(Relation relation, HeapTuple tup)
IncrHeapAccessStat(local_insert);
IncrHeapAccessStat(global_insert);
/* ----------------
* set relation level write lock. If this is a "local" relation (not
* visible to others), we don't need to set a write lock.
* ----------------
*/
if (!relation->rd_islocal)
RelationSetLockForWrite(relation);
/* ----------------
* If the object id of this tuple has already been assigned, trust
* the caller. There are a couple of ways this can happen. At initial
@ -1122,228 +1090,178 @@ heap_insert(Relation relation, HeapTuple tup)
tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
doinsert(relation, tup);
RelationPutHeapTupleAtEnd(relation, tup);
if (IsSystemRelationName(RelationGetRelationName(relation)->data))
{
RelationUnsetLockForWrite(relation);
/* ----------------
* invalidate caches (only works for system relations)
* ----------------
*/
RelationInvalidateHeapTuple(relation, tup);
}
return tup->t_data->t_oid;
}
/* ----------------
* heap_delete - delete a tuple
*
* Must decide how to handle errors.
* ----------------
/*
* heap_delete - delete a tuple
*/
int
heap_delete(Relation relation, ItemPointer tid)
heap_delete(Relation relation, ItemPointer tid, ItemPointer ctid)
{
ItemId lp;
HeapTupleData tp;
PageHeader dp;
Buffer buf;
Buffer buffer;
int result;
/* ----------------
* increment access statistics
* ----------------
*/
/* increment access statistics */
IncrHeapAccessStat(local_delete);
IncrHeapAccessStat(global_delete);
/* ----------------
* sanity check
* ----------------
*/
Assert(ItemPointerIsValid(tid));
/* ----------------
* set relation level write lock
* ----------------
*/
RelationSetLockForWrite(relation);
buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
buf = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
#ifndef NO_BUFFERISVALID
if (!BufferIsValid(buf))
{ /* XXX L_SH better ??? */
if (!BufferIsValid(buffer))
elog(ERROR, "heap_delete: failed ReadBuffer");
}
#endif /* NO_BUFFERISVALID */
dp = (PageHeader) BufferGetPage(buf);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
dp = (PageHeader) BufferGetPage(buffer);
lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
/*
* Just like test against non-functional updates we try to catch
* non-functional delete attempts. - vadim 05/05/97
*/
tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
tp.t_len = ItemIdGetLength(lp);
tp.t_self = *tid;
if (TupleUpdatedByCurXactAndCmd(&tp))
l1:
result = HeapTupleSatisfiesUpdate(&tp);
if (result == HeapTupleInvisible)
{
/*
* Vadim says this is no longer needed 1998/6/15 elog(NOTICE,
* "Non-functional delete, tuple already deleted");
*/
if (IsSystemRelationName(RelationGetRelationName(relation)->data))
RelationUnsetLockForWrite(relation);
ReleaseBuffer(buf);
return 1;
}
/* ----------------
* check that we're deleteing a valid item
* ----------------
*/
HeapTupleSatisfies((&tp), relation, buf, dp,
false, 0, (ScanKey) NULL);
if (!(tp.t_data))
{
/* XXX call something else */
ReleaseBuffer(buf);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
elog(ERROR, "heap_delete: (am)invalid tid");
}
else if (result == HeapTupleBeingUpdated)
{
TransactionId xwait = tp.t_data->t_xmax;
/* ----------------
* get the tuple and lock tell the buffer manager we want
* exclusive access to the page
* ----------------
*/
/* sleep untill concurrent transaction ends */
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
XactLockTableWait(xwait);
/* ----------------
* store transaction information of xact deleting the tuple
* ----------------
*/
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
if (TransactionIdDidAbort(xwait))
goto l1;
/* concurrent xact committed */
Assert(tp.t_data->t_xmax == xwait);
if (!(tp.t_data->t_infomask & HEAP_XMAX_COMMITTED))
{
tp.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
SetBufferCommitInfoNeedsSave(buffer);
}
/* if tuple was marked for update but not updated... */
if (tp.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
result = HeapTupleMayBeUpdated;
else
result = HeapTupleUpdated;
}
if (result != HeapTupleMayBeUpdated)
{
Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
if (ctid != NULL)
*ctid = tp.t_data->t_ctid;
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return result;
}
/* store transaction information of xact deleting the tuple */
TransactionIdStore(GetCurrentTransactionId(), &(tp.t_data->t_xmax));
tp.t_data->t_cmax = GetCurrentCommandId();
tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID);
tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);
/* ----------------
* invalidate caches
* ----------------
*/
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
/* invalidate caches */
RelationInvalidateHeapTuple(relation, &tp);
WriteBuffer(buf);
if (IsSystemRelationName(RelationGetRelationName(relation)->data))
RelationUnsetLockForWrite(relation);
WriteBuffer(buffer);
return 0;
return HeapTupleMayBeUpdated;
}
/* ----------------
* heap_replace - replace a tuple
*
* Must decide how to handle errors.
*
* Fix arguments, work with indexes.
*
* 12/30/93 - modified the return value to be 1 when
* a non-functional update is detected. This
* prevents the calling routine from updating
* indices unnecessarily. -kw
*
* ----------------
/*
* heap_replace - replace a tuple
*/
int
heap_replace(Relation relation, ItemPointer otid, HeapTuple newtup)
heap_replace(Relation relation, ItemPointer otid, HeapTuple newtup,
ItemPointer ctid)
{
ItemId lp;
HeapTupleData oldtup;
Page dp;
PageHeader dp;
Buffer buffer;
int result;
/* ----------------
* increment access statistics
* ----------------
*/
/* increment access statistics */
IncrHeapAccessStat(local_replace);
IncrHeapAccessStat(global_replace);
/* ----------------
* sanity checks
* ----------------
*/
Assert(ItemPointerIsValid(otid));
/* ----------------
* set relation level write lock
* ----------------
*/
if (!relation->rd_islocal)
RelationSetLockForWrite(relation);
buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(otid));
if (!BufferIsValid(buffer))
elog(ERROR, "amreplace: failed ReadBuffer");
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
dp = (Page) BufferGetPage(buffer);
dp = (PageHeader) BufferGetPage(buffer);
lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(otid));
/* ----------------
* logically delete old item
* ----------------
*/
oldtup.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
oldtup.t_len = ItemIdGetLength(lp);
oldtup.t_self = *otid;
/* -----------------
* the following test should be able to catch all non-functional
* update attempts and shut out all ghost tuples.
* XXX In the future, Spyros may need to update the rule lock on a tuple
* more than once within the same command and same transaction.
* He will have to introduce a new flag to override the following check.
* -- Wei
*
* -----------------
*/
if (TupleUpdatedByCurXactAndCmd(&oldtup))
l2:
result = HeapTupleSatisfiesUpdate(&oldtup);
if (result == HeapTupleInvisible)
{
elog(NOTICE, "Non-functional update, only first update is performed");
if (IsSystemRelationName(RelationGetRelationName(relation)->data))
RelationUnsetLockForWrite(relation);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return 1;
elog(ERROR, "heap_replace: (am)invalid tid");
}
/* ----------------
* check that we're replacing a valid item -
*
* NOTE that this check must follow the non-functional update test
* above as it can happen that we try to 'replace' the same tuple
* twice in a single transaction. The second time around the
* tuple will fail the NowTimeQual. We don't want to abort the
* xact, we only want to flag the 'non-functional' NOTICE. -mer
* ----------------
*/
HeapTupleSatisfies((&oldtup),
relation,
buffer,
(PageHeader) dp,
false,
0,
(ScanKey) NULL);
if (!(oldtup.t_data))
else if (result == HeapTupleBeingUpdated)
{
TransactionId xwait = oldtup.t_data->t_xmax;
/* sleep untill concurrent transaction ends */
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
XactLockTableWait(xwait);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
if (TransactionIdDidAbort(xwait))
goto l2;
/* concurrent xact committed */
Assert(oldtup.t_data->t_xmax == xwait);
if (!(oldtup.t_data->t_infomask & HEAP_XMAX_COMMITTED))
{
oldtup.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
SetBufferCommitInfoNeedsSave(buffer);
}
/* if tuple was marked for update but not updated... */
if (oldtup.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
result = HeapTupleMayBeUpdated;
else
result = HeapTupleUpdated;
}
if (result != HeapTupleMayBeUpdated)
{
Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
if (ctid != NULL)
*ctid = oldtup.t_data->t_ctid;
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
elog(ERROR, "heap_replace: (am)invalid otid");
return result;
}
/* XXX order problems if not atomic assignment ??? */
@ -1354,42 +1272,122 @@ heap_replace(Relation relation, ItemPointer otid, HeapTuple newtup)
newtup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
newtup->t_data->t_infomask |= HEAP_XMAX_INVALID;
/* ----------------
* insert new item
* ----------------
*/
if ((unsigned) DOUBLEALIGN(newtup->t_len) <= PageGetFreeSpace((Page) dp))
RelationPutHeapTuple(relation, BufferGetBlockNumber(buffer), newtup);
else
{
/* ----------------
* new item won't fit on same page as old item, have to look
* for a new place to put it.
* ----------------
*/
doinsert(relation, newtup);
}
/* ----------------
* new item in place, now record transaction information
* ----------------
*/
/* logically delete old item */
TransactionIdStore(GetCurrentTransactionId(), &(oldtup.t_data->t_xmax));
oldtup.t_data->t_cmax = GetCurrentCommandId();
oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID);
oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);
/* ----------------
* invalidate caches
* ----------------
/* insert new item */
if ((unsigned) DOUBLEALIGN(newtup->t_len) <= PageGetFreeSpace((Page) dp))
RelationPutHeapTuple(relation, buffer, newtup);
else
{
/*
* New item won't fit on same page as old item, have to look
* for a new place to put it. Note that we have to unlock
* current buffer context - not good but RelationPutHeapTupleAtEnd
* uses extend lock.
*/
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
RelationPutHeapTupleAtEnd(relation, newtup);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
}
/*
* New item in place, now record address of new tuple in
* t_ctid of old one.
*/
oldtup.t_data->t_ctid = newtup->t_self;
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
/* invalidate caches */
RelationInvalidateHeapTuple(relation, &oldtup);
WriteBuffer(buffer);
if (IsSystemRelationName(RelationGetRelationName(relation)->data))
RelationUnsetLockForWrite(relation);
return HeapTupleMayBeUpdated;
}
return 0;
/*
* heap_mark4update - mark a tuple for update
*/
int
heap_mark4update(Relation relation, HeapTuple tuple, Buffer *buffer)
{
ItemPointer tid = &(tuple->t_self);
ItemId lp;
PageHeader dp;
int result;
/* increment access statistics */
IncrHeapAccessStat(local_mark4update);
IncrHeapAccessStat(global_mark4update);
*buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
if (!BufferIsValid(*buffer))
elog(ERROR, "heap_mark4update: failed ReadBuffer");
LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
dp = (PageHeader) BufferGetPage(*buffer);
lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
tuple->t_len = ItemIdGetLength(lp);
l3:
result = HeapTupleSatisfiesUpdate(tuple);
if (result == HeapTupleInvisible)
{
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(*buffer);
elog(ERROR, "heap_mark4update: (am)invalid tid");
}
else if (result == HeapTupleBeingUpdated)
{
TransactionId xwait = tuple->t_data->t_xmax;
/* sleep untill concurrent transaction ends */
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
XactLockTableWait(xwait);
LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
if (TransactionIdDidAbort(xwait))
goto l3;
/* concurrent xact committed */
Assert(tuple->t_data->t_xmax == xwait);
if (!(tuple->t_data->t_infomask & HEAP_XMAX_COMMITTED))
{
tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED;
SetBufferCommitInfoNeedsSave(*buffer);
}
/* if tuple was marked for update but not updated... */
if (tuple->t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
result = HeapTupleMayBeUpdated;
else
result = HeapTupleUpdated;
}
if (result != HeapTupleMayBeUpdated)
{
Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
return result;
}
/* store transaction information of xact marking the tuple */
TransactionIdStore(GetCurrentTransactionId(), &(tuple->t_data->t_xmax));
tuple->t_data->t_cmax = GetCurrentCommandId();
tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID);
tuple->t_data->t_infomask |= HEAP_MARKED_FOR_UPDATE;
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
WriteNoReleaseBuffer(*buffer);
return HeapTupleMayBeUpdated;
}
/* ----------------

View File

@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
* $Id: hio.c,v 1.14 1998/11/27 19:51:36 vadim Exp $
* $Id: hio.c,v 1.15 1998/12/15 12:45:14 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@ -15,6 +15,7 @@
#include <postgres.h>
#include <storage/bufpage.h>
#include <access/hio.h>
#include <access/heapam.h>
#include <storage/bufmgr.h>
#include <utils/memutils.h>
@ -29,19 +30,20 @@
* Probably needs to have an amdelunique to allow for
* internal index records to be deleted and reordered as needed.
* For the heap AM, this should never be needed.
*
* Note - we assume that caller hold BUFFER_LOCK_EXCLUSIVE on the buffer.
*
*/
void
RelationPutHeapTuple(Relation relation,
BlockNumber blockIndex,
Buffer buffer,
HeapTuple tuple)
{
Buffer buffer;
Page pageHeader;
BlockNumber numberOfBlocks;
OffsetNumber offnum;
unsigned int len;
ItemId itemId;
Item item;
Page pageHeader;
OffsetNumber offnum;
unsigned int len;
ItemId itemId;
Item item;
/* ----------------
* increment access statistics
@ -50,21 +52,6 @@ RelationPutHeapTuple(Relation relation,
IncrHeapAccessStat(local_RelationPutHeapTuple);
IncrHeapAccessStat(global_RelationPutHeapTuple);
Assert(RelationIsValid(relation));
Assert(HeapTupleIsValid(tuple));
numberOfBlocks = RelationGetNumberOfBlocks(relation);
Assert(blockIndex < numberOfBlocks);
buffer = ReadBuffer(relation, blockIndex);
#ifndef NO_BUFFERISVALID
if (!BufferIsValid(buffer))
{
elog(ERROR, "RelationPutHeapTuple: no buffer for %ld in %s",
blockIndex, &relation->rd_rel->relname);
}
#endif
pageHeader = (Page) BufferGetPage(buffer);
len = (unsigned) DOUBLEALIGN(tuple->t_len); /* be conservative */
Assert((int) len <= PageGetFreeSpace(pageHeader));
@ -75,11 +62,17 @@ RelationPutHeapTuple(Relation relation,
itemId = PageGetItemId((Page) pageHeader, offnum);
item = PageGetItem((Page) pageHeader, itemId);
ItemPointerSet(&((HeapTupleHeader) item)->t_ctid, blockIndex, offnum);
ItemPointerSet(&((HeapTupleHeader) item)->t_ctid,
BufferGetBlockNumber(buffer), offnum);
/*
* Let the caller do this!
*
WriteBuffer(buffer);
*/
/* return an accurate tuple */
ItemPointerSet(&tuple->t_self, blockIndex, offnum);
ItemPointerSet(&tuple->t_self, BufferGetBlockNumber(buffer), offnum);
}
/*
@ -99,6 +92,7 @@ RelationPutHeapTuple(Relation relation,
* RelationGetNumberOfBlocks to be useful.
*
* NOTE: This code presumes that we have a write lock on the relation.
* Not now - we use extend locking...
*
* Also note that this routine probably shouldn't have to exist, and does
* screw up the call graph rather badly, but we are wasting so much time and
@ -116,8 +110,8 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple)
ItemId itemId;
Item item;
Assert(RelationIsValid(relation));
Assert(HeapTupleIsValid(tuple));
if (!relation->rd_islocal)
LockRelation(relation, ExtendLock);
/*
* XXX This does an lseek - VERY expensive - but at the moment it is
@ -132,16 +126,18 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple)
{
buffer = ReadBuffer(relation, lastblock);
pageHeader = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) pageHeader))
{
buffer = ReleaseAndReadBuffer(buffer, relation, P_NEW);
pageHeader = (Page) BufferGetPage(buffer);
PageInit(pageHeader, BufferGetPageSize(buffer), 0);
}
/*
* There was IF instead of ASSERT here ?!
*/
Assert(PageIsNew((PageHeader) pageHeader));
buffer = ReleaseAndReadBuffer(buffer, relation, P_NEW);
pageHeader = (Page) BufferGetPage(buffer);
PageInit(pageHeader, BufferGetPageSize(buffer), 0);
}
else
buffer = ReadBuffer(relation, lastblock - 1);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
pageHeader = (Page) BufferGetPage(buffer);
len = (unsigned) DOUBLEALIGN(tuple->t_len); /* be conservative */
@ -152,7 +148,9 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple)
if (len > PageGetFreeSpace(pageHeader))
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
buffer = ReleaseAndReadBuffer(buffer, relation, P_NEW);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
pageHeader = (Page) BufferGetPage(buffer);
PageInit(pageHeader, BufferGetPageSize(buffer), 0);
@ -160,6 +158,9 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple)
elog(ERROR, "Tuple is too big: size %d", len);
}
if (!relation->rd_islocal)
UnlockRelation(relation, ExtendLock);
offnum = PageAddItem((Page) pageHeader, (Item) tuple->t_data,
tuple->t_len, InvalidOffsetNumber, LP_USED);
@ -173,5 +174,7 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple)
/* return an accurate tuple */
ItemPointerSet(&tuple->t_self, lastblock, offnum);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
}

View File

@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/heap/Attic/stats.c,v 1.13 1997/09/08 02:20:31 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/access/heap/Attic/stats.c,v 1.14 1998/12/15 12:45:15 vadim Exp $
*
* NOTES
* initam should be moved someplace else.
@ -73,6 +73,7 @@ InitHeapAccessStatistics()
stats->global_insert = 0;
stats->global_delete = 0;
stats->global_replace = 0;
stats->global_mark4update = 0;
stats->global_markpos = 0;
stats->global_restrpos = 0;
stats->global_BufferGetRelation = 0;
@ -94,6 +95,7 @@ InitHeapAccessStatistics()
stats->local_insert = 0;
stats->local_delete = 0;
stats->local_replace = 0;
stats->local_mark4update = 0;
stats->local_markpos = 0;
stats->local_restrpos = 0;
stats->local_BufferGetRelation = 0;
@ -157,6 +159,7 @@ ResetHeapAccessStatistics()
stats->local_insert = 0;
stats->local_delete = 0;
stats->local_replace = 0;
stats->local_mark4update = 0;
stats->local_markpos = 0;
stats->local_restrpos = 0;
stats->local_BufferGetRelation = 0;
@ -274,6 +277,9 @@ PrintHeapAccessStatistics(HeapAccessStatistics stats)
printf("local/global_replace: %6d/%6d\n",
stats->local_replace, stats->global_replace);
printf("local/global_mark4update: %6d/%6d\n",
stats->local_mark4update, stats->global_mark4update);
printf("local/global_markpos: %6d/%6d\n",
stats->local_markpos, stats->global_markpos);