1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-10 17:42:29 +03:00

redo/undo support functions and cleanups.

This commit is contained in:
Vadim B. Mikheev
2000-10-20 11:01:21 +00:00
parent e18a862d46
commit b58c0411ba
15 changed files with 726 additions and 167 deletions

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.88 2000/10/13 12:05:20 vadim Exp $
* $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.89 2000/10/20 11:01:02 vadim Exp $
*
*
* INTERFACE ROUTINES
@@ -87,7 +87,16 @@
#include "utils/relcache.h"
#ifdef XLOG /* comments are in heap_update */
#include "access/xlogutils.h"
void heap_redo(XLogRecPtr lsn, XLogRecord *record);
void heap_undo(XLogRecPtr lsn, XLogRecord *record);
static xl_heaptid _locked_tuple_;
static void _heap_unlock_tuple(void *data);
static void HeapPageCleanup(Buffer buffer);
#endif
@@ -1380,6 +1389,8 @@ heap_insert(Relation relation, HeapTuple tup)
/* XLOG stuff */
{
xl_heap_insert xlrec;
XLogRecPtr recptr;
xlrec.target.node = relation->rd_node;
xlrec.target.cid = GetCurrentCommandId();
xlrec.target.tid = tup->t_self;
@@ -1388,8 +1399,8 @@ heap_insert(Relation relation, HeapTuple tup)
xlrec.t_hoff = tup->t_data->t_hoff;
xlrec.mask = tup->t_data->t_infomask;
XLogRecPtr recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INSERT,
(char*) xlrec, SizeOfHeapInsert,
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INSERT,
(char*) &xlrec, SizeOfHeapInsert,
(char*) tup->t_data + offsetof(HeapTupleHeaderData, t_bits),
tup->t_len - offsetof(HeapTupleHeaderData, t_bits));
@@ -1493,11 +1504,13 @@ l1:
/* XLOG stuff */
{
xl_heap_delete xlrec;
XLogRecPtr recptr;
xlrec.target.node = relation->rd_node;
xlrec.target.cid = GetCurrentCommandId();
xlrec.target.tid = tp.t_self;
XLogRecPtr recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE,
(char*) xlrec, SizeOfHeapDelete, NULL, 0);
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE,
(char*) &xlrec, SizeOfHeapDelete, NULL, 0);
PageSetLSN(dp, recptr);
PageSetSUI(dp, ThisStartUpID);
@@ -1692,16 +1705,19 @@ l2:
/* XLOG stuff */
{
xl_heap_update xlrec;
XLogRecPtr recptr;
xlrec.target.node = relation->rd_node;
xlrec.target.cid = GetCurrentCommandId();
xlrec.target.tid = oldtup.t_self;
xlrec.newtid.tid = newtup->t_self;
xlrec.newtid = newtup->t_self;
xlrec.t_natts = newtup->t_data->t_natts;
xlrec.t_oid = newtup->t_data->t_oid;
xlrec.t_hoff = newtup->t_data->t_hoff;
xlrec.mask = newtup->t_data->t_infomask;
XLogRecPtr recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_UPDATE,
(char*) xlrec, SizeOfHeapUpdate,
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_UPDATE,
(char*) &xlrec, SizeOfHeapUpdate,
(char*) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits),
newtup->t_len - offsetof(HeapTupleHeaderData, t_bits));
@@ -2000,51 +2016,26 @@ heap_restrpos(HeapScanDesc scan)
}
#ifdef XLOG
void heap_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
if (info == XLOG_HEAP_INSERT)
heap_xlog_insert(true, lsn, record);
else if (info == XLOG_HEAP_DELETE)
heap_xlog_delete(true, lsn, record);
else if (info == XLOG_HEAP_UPDATE)
heap_xlog_update(true, lsn, record);
else if (info == XLOG_HEAP_MOVE)
heap_xlog_move(true, lsn, record);
else
elog(STOP, "heap_redo: unknown op code %u", info);
}
void heap_undo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
if (info == XLOG_HEAP_INSERT)
heap_xlog_insert(false, lsn, record);
else if (info == XLOG_HEAP_DELETE)
heap_xlog_delete(false, lsn, record);
else if (info == XLOG_HEAP_UPDATE)
heap_xlog_update(false, lsn, record);
else if (info == XLOG_HEAP_MOVE)
heap_xlog_move(false, lsn, record);
else
elog(STOP, "heap_undo: unknown op code %u", info);
}
void heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
static void
heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
xl_heap_delete *xlrec = (xl_heap_delete*) XLogRecGetData(record);
Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
Buffer buffer;
Page page;
OffsetNumber offnum;
ItemId lp;
HeapTupleHeader htup;
if (!RelationIsValid(reln))
return;
Buffer buffer = XLogReadBuffer(false, reln,
buffer = XLogReadBuffer(false, reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
if (!BufferIsValid(buffer))
return;
Page page = (Page) BufferGetPage(buffer);
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
{
PageInit(page, BufferGetPageSize(buffer), 0);
@@ -2065,8 +2056,8 @@ void heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
else if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied ?! */
elog(STOP, "heap_delete_undo: bad page LSN");
OffsetNumber offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
ItemId lp = PageGetItemId(page, offnum);
offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
lp = PageGetItemId(page, offnum);
if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp))
{
@@ -2084,7 +2075,7 @@ void heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
UnlockAndReleaseBuffer(buffer);
return;
}
HeapTupleHeader htup = (HeapTupleHeader) PageGetItem(page, lp);
htup = (HeapTupleHeader) PageGetItem(page, lp);
if (redo)
{
@@ -2115,19 +2106,25 @@ void heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
}
void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
static void
heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
xl_heap_insert *xlrec = (xl_heap_insert*) XLogRecGetData(record);
Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
Buffer buffer;
Page page;
OffsetNumber offnum;
ItemId lp;
HeapTupleHeader htup;
if (!RelationIsValid(reln))
return;
Buffer buffer = XLogReadBuffer((redo) ? true : false, reln,
buffer = XLogReadBuffer((redo) ? true : false, reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
if (!BufferIsValid(buffer))
return;
Page page = (Page) BufferGetPage(buffer);
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
{
PageInit(page, BufferGetPageSize(buffer), 0);
@@ -2142,16 +2139,16 @@ void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
if (redo)
{
char tbuf[MaxTupleSize];
HeapTupleHeader htup = (HeapTupleHeader) tbuf;
uint32 newlen = record->xl_len - SizeOfHeapInsert;
if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
{
UnlockAndReleaseBuffer(buffer);
return;
}
char tbuf[MaxTupleSize];
HeapTupleHeader htup = (HeapTupleHeader) tbuf;
uint32 newlen = record->xl_len - SizeOfHeapInsert;
memcpy(tbuf + offsetof(HeapTupleHeaderData, t_bits),
(char*)xlrec + SizeOfHeapInsert, newlen);
newlen += offsetof(HeapTupleHeaderData, t_bits);
@@ -2162,10 +2159,9 @@ void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
htup->t_cmin = xlrec->target.cid;
htup->t_infomask = HEAP_XMAX_INVALID | HEAP_XMIN_COMMITTED | xlrec->mask;
PageManagerModeSet(OverwritePageManagerMode);
OffsetNumber offnum = PageAddItem(page, htup, newlen,
ItemPointerGetOffsetNumber(&(xlrec->target.tid)), LP_USED);
PageManagerModeSet(ShufflePageManagerMode);
offnum = PageAddItem(page, (Item)htup, newlen,
ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
LP_USED | OverwritePageMode);
if (offnum == InvalidOffsetNumber)
elog(STOP, "heap_insert_redo: failed to add tuple");
PageSetLSN(page, lsn);
@@ -2178,8 +2174,8 @@ void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied ?! */
elog(STOP, "heap_insert_undo: bad page LSN");
OffsetNumber offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
ItemId lp = PageGetItemId(page, offnum);
offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
lp = PageGetItemId(page, offnum);
if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp))
{
@@ -2195,11 +2191,11 @@ void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
UnlockAndReleaseBuffer(buffer);
return;
}
HeapTupleHeader htup = (HeapTupleHeader) PageGetItem(page, lp);
htup = (HeapTupleHeader) PageGetItem(page, lp);
/* is it our tuple ? */
if (PageGetSUI(page) != ThisStartUpID ||
htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid)
Assert(PageGetSUI(page) == ThisStartUpID);
if (htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid)
{
if (!InRecovery)
elog(STOP, "heap_insert_undo: invalid target tuple in rollback");
@@ -2207,33 +2203,25 @@ void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
return;
}
if (InRecovery || BufferIsUpdatable(buffer))
{
lp->lp_flags &= ~LP_USED;
PageRepairFragmentation(page);
UnlockAndWriteBuffer(buffer);
}
else /* we can't delete tuple right now */
{
lp->lp_flags |= LP_DELETE; /* mark for deletion */
MarkBufferForCleanup(buffer, HeapPageCleanup);
}
lp->lp_flags |= LP_DELETE; /* mark for deletion */
MarkBufferForCleanup(buffer, HeapPageCleanup);
}
void heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record)
static void
heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
xl_heap_update *xlrec = (xl_heap_update*) XLogRecGetData(record);
Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
Buffer buffer;
Page page;
OffsetNumber offnum;
ItemId lp;
HeapTupleHeader htup;
if (!RelationIsValid(reln))
return;
/*
* Currently UPDATE is DELETE + INSERT and so code below are near
* exact sum of code in heap_xlog_delete & heap_xlog_insert. We could
@@ -2339,15 +2327,15 @@ newt:;
if (redo)
{
char tbuf[MaxTupleSize];
uint32 newlen = record->xl_len - SizeOfHeapUpdate;
if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
{
UnlockAndReleaseBuffer(buffer);
return;
}
char tbuf[MaxTupleSize];
uint32 newlen = record->xl_len - SizeOfHeapUpdate;
htup = (HeapTupleHeader) tbuf;
memcpy(tbuf + offsetof(HeapTupleHeaderData, t_bits),
(char*)xlrec + SizeOfHeapUpdate, newlen);
@@ -2359,10 +2347,9 @@ newt:;
htup->t_cmin = xlrec->target.cid;
htup->t_infomask = HEAP_XMAX_INVALID | HEAP_XMIN_COMMITTED | xlrec->mask;
PageManagerModeSet(OverwritePageManagerMode);
OffsetNumber offnum = PageAddItem(page, htup, newlen,
ItemPointerGetOffsetNumber(&(xlrec->newtid)), LP_USED);
PageManagerModeSet(ShufflePageManagerMode);
offnum = PageAddItem(page, (Item)htup, newlen,
ItemPointerGetOffsetNumber(&(xlrec->newtid)),
LP_USED | OverwritePageMode);
if (offnum == InvalidOffsetNumber)
elog(STOP, "heap_update_redo: failed to add tuple");
PageSetLSN(page, lsn);
@@ -2395,8 +2382,8 @@ newt:;
htup = (HeapTupleHeader) PageGetItem(page, lp);
/* is it our tuple ? */
if (PageGetSUI(page) != ThisStartUpID ||
htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid)
Assert(PageGetSUI(page) == ThisStartUpID);
if (htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid)
{
if (!InRecovery)
elog(STOP, "heap_update_undo: invalid new tuple in rollback");
@@ -2404,19 +2391,93 @@ newt:;
return;
}
if (InRecovery || BufferIsUpdatable(buffer))
{
lp->lp_flags &= ~LP_USED;
PageRepairFragmentation(page);
UnlockAndWriteBuffer(buffer);
}
else /* we can't delete tuple right now */
{
lp->lp_flags |= LP_DELETE; /* mark for deletion */
MarkBufferForCleanup(buffer, PageCleanup);
}
lp->lp_flags |= LP_DELETE; /* mark for deletion */
MarkBufferForCleanup(buffer, HeapPageCleanup);
}
static void
_heap_unlock_tuple(void *data)
{
xl_heaptid *xltid = (xl_heaptid*) data;
Relation reln = XLogOpenRelation(false, RM_HEAP_ID, xltid->node);
Buffer buffer;
Page page;
OffsetNumber offnum;
ItemId lp;
HeapTupleHeader htup;
if (!RelationIsValid(reln))
elog(STOP, "_heap_unlock_tuple: can't open relation");
buffer = XLogReadBuffer(false, reln,
ItemPointerGetBlockNumber(&(xltid->tid)));
if (!BufferIsValid(buffer))
elog(STOP, "_heap_unlock_tuple: can't read buffer");
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(STOP, "_heap_unlock_tuple: uninitialized page");
offnum = ItemPointerGetOffsetNumber(&(xltid->tid));
if (offnum > PageGetMaxOffsetNumber(page))
elog(STOP, "_heap_unlock_tuple: invalid itemid");
lp = PageGetItemId(page, offnum);
if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp))
elog(STOP, "_heap_unlock_tuple: unused/deleted tuple in rollback");
htup = (HeapTupleHeader) PageGetItem(page, lp);
if (htup->t_xmax != GetCurrentTransactionId() ||
htup->t_cmax != GetCurrentCommandId())
elog(STOP, "_heap_unlock_tuple: invalid xmax/cmax in rollback");
htup->t_infomask &= ~HEAP_XMAX_UNLOGGED;
htup->t_infomask |= HEAP_XMAX_INVALID;
UnlockAndWriteBuffer(buffer);
return;
}
void heap_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
if (info == XLOG_HEAP_INSERT)
heap_xlog_insert(true, lsn, record);
else if (info == XLOG_HEAP_DELETE)
heap_xlog_delete(true, lsn, record);
else if (info == XLOG_HEAP_UPDATE)
heap_xlog_update(true, lsn, record);
#ifdef NOT_USED
else if (info == XLOG_HEAP_MOVE)
heap_xlog_move(true, lsn, record);
#endif
else
elog(STOP, "heap_redo: unknown op code %u", info);
}
void heap_undo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
if (info == XLOG_HEAP_INSERT)
heap_xlog_insert(false, lsn, record);
else if (info == XLOG_HEAP_DELETE)
heap_xlog_delete(false, lsn, record);
else if (info == XLOG_HEAP_UPDATE)
heap_xlog_update(false, lsn, record);
#ifdef NOT_USED
else if (info == XLOG_HEAP_MOVE)
heap_xlog_move(false, lsn, record);
#endif
else
elog(STOP, "heap_undo: unknown op code %u", info);
}
static void
HeapPageCleanup(Buffer buffer)
{
Page page = (Page) BufferGetPage(buffer);
PageRepairFragmentation(page);
}
#endif /* XLOG */

View File

@@ -12,7 +12,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.65 2000/10/13 12:05:20 vadim Exp $
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.66 2000/10/20 11:01:03 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -1261,44 +1261,40 @@ _bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert,
}
lp = PageGetItemId(page, offno);
if (ItemIdDeleted(lp)) /* marked for deletion */
{
if (!InRecovery)
elog(STOP, "btree_%s_undo: deleted target tuple in rollback",
(insert) ? "insert" : "split");
}
else if (InRecovery) /* check heap tuple */
{
int result;
CommandId cid;
RelFileNode hnode;
Size hsize = (insert) ? SizeOfBtreeInsert : SizeOfBtreeSplit;
memcpy(&cid, (char*)xlrec + hsize, sizeof(CommandId));
memcpy(&hnode, (char*)xlrec + hsize + sizeof(CommandId), sizeof(RelFileNode));
result = XLogIsOwnerOfTuple(hnode, &(btitem->bti_itup.t_tid),
record->xl_xid, cid);
if (result < 0) /* not owner */
{
UnlockAndReleaseBuffer(buffer);
return;
}
}
else if (! BufferIsUpdatable(buffer)) /* normal rollback */
if (InRecovery) /* check heap tuple */
{
lp->lp_flags |= LP_DELETE;
MarkBufferForCleanup(buffer, IndexPageCleanup);
if (!ItemIdDeleted(lp))
{
int result;
CommandId cid;
RelFileNode hnode;
Size hsize = (insert) ? SizeOfBtreeInsert : SizeOfBtreeSplit;
memcpy(&cid, (char*)xlrec + hsize, sizeof(CommandId));
memcpy(&hnode, (char*)xlrec + hsize + sizeof(CommandId), sizeof(RelFileNode));
result = XLogIsOwnerOfTuple(hnode, &(btitem->bti_itup.t_tid),
record->xl_xid, cid);
if (result < 0) /* not owner */
{
UnlockAndReleaseBuffer(buffer);
return;
}
}
PageIndexTupleDelete(page, offno);
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_flags |= BTP_REORDER;
UnlockAndWriteBuffer(buffer);
return;
}
PageIndexTupleDelete(page, offno);
if (InRecovery)
{
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_flags |= BTP_REORDER;
}
UnlockAndWriteBuffer(buffer);
/* normal rollback */
if (ItemIdDeleted(lp)) /* marked for deletion ?! */
elog(STOP, "btree_%s_undo: deleted target tuple in rollback",
(insert) ? "insert" : "split");
lp->lp_flags |= LP_DELETE;
MarkBufferForCleanup(buffer, IndexPageCleanup);
return;
}

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.72 2000/10/11 21:28:17 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.73 2000/10/20 11:01:04 vadim Exp $
*
* NOTES
* Transaction aborts can now occur two ways:
@@ -154,6 +154,8 @@
*/
#include "postgres.h"
#include <sys/time.h>
#include "access/nbtree.h"
#include "catalog/heap.h"
#include "catalog/index.h"
@@ -215,6 +217,19 @@ TransactionState CurrentTransactionState = &CurrentTransactionStateData;
int DefaultXactIsoLevel = XACT_READ_COMMITTED;
int XactIsoLevel;
#ifdef XLOG
#include "access/xlogutils.h"
int CommitDelay;
void xact_redo(XLogRecPtr lsn, XLogRecord *record);
void xact_undo(XLogRecPtr lsn, XLogRecord *record);
static void (*_RollbackFunc)(void*) = NULL;
static void *_RollbackData = NULL;
#endif
/* ----------------
* info returned when the system is disabled
*
@@ -676,6 +691,28 @@ RecordTransactionCommit()
*/
TransactionIdCommit(xid);
#ifdef XLOG
{
xl_xact_commit xlrec;
struct timeval delay;
XLogRecPtr recptr;
xlrec.xtime = time(NULL);
/*
* MUST SAVE ARRAY OF RELFILENODE-s TO DROP
*/
recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT,
(char*) &xlrec, SizeOfXactCommit, NULL, 0);
/*
* Sleep before commit! So we can flush more than one
* commit records per single fsync.
*/
delay.tv_sec = 0;
delay.tv_usec = CommitDelay;
(void) select(0, NULL, NULL, NULL, &delay);
}
#endif
/*
* Now write the log info to the disk too.
*/
@@ -785,6 +822,18 @@ RecordTransactionAbort()
if (SharedBufferChanged && !TransactionIdDidCommit(xid))
TransactionIdAbort(xid);
#ifdef XLOG
if (SharedBufferChanged)
{
xl_xact_abort xlrec;
XLogRecPtr recptr;
xlrec.xtime = time(NULL);
recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT,
(char*) &xlrec, SizeOfXactAbort, NULL, 0);
}
#endif
/*
* Tell bufmgr and smgr to release resources.
*/
@@ -1123,10 +1172,13 @@ AbortTransaction()
AtEOXact_SPI();
AtEOXact_nbtree();
AtAbort_Cache();
AtAbort_Locks();
AtAbort_Memory();
AtEOXact_Files();
/* Here we'll rollback xaction changes */
AtAbort_Locks();
SharedBufferChanged = false; /* safest place to do it */
/* ----------------
@@ -1663,3 +1715,54 @@ IsTransactionBlock()
return false;
}
#ifdef XLOG
void
xact_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit*) XLogRecGetData(record);
XLogMarkCommitted(record->xl_xid);
/* MUST REMOVE FILES OF ALL DROPPED RELATIONS */
}
else if (info == XLOG_XACT_ABORT)
{
XLogMarkAborted(record->xl_xid);
}
else
elog(STOP, "xact_redo: unknown op code %u", info);
}
void
xact_undo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
if (info == XLOG_XACT_COMMIT) /* shouldn't be called by XLOG */
elog(STOP, "xact_undo: can't undo committed xaction");
else if (info != XLOG_XACT_ABORT)
elog(STOP, "xact_redo: unknown op code %u", info);
}
void
XactPushRollback(void (*func) (void *), void* data)
{
if (_RollbackFunc != NULL)
elog(STOP, "XactPushRollback: already installed");
_RollbackFunc = func;
_RollbackData = data;
}
void
XactPopRollback(void)
{
_RollbackFunc = NULL;
}
#endif

View File

@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.17 2000/07/04 01:49:43 vadim Exp $
* $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.18 2000/10/20 11:01:04 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -40,6 +40,7 @@ char ControlFilePath[MAXPGPATH];
uint32 XLOGbuffers = 0;
XLogRecPtr MyLastRecPtr = {0, 0};
bool StopIfError = false;
bool InRecovery = false;
SPINLOCK ControlFileLockId;
SPINLOCK XidGenLockId;
@@ -163,17 +164,6 @@ typedef struct CheckPoint
#define NextBufIdx(curridx) \
((curridx == XLogCtl->XLogCacheBlck) ? 0 : (curridx + 1))
#define XLByteLT(left, right) \
(right.xlogid > left.xlogid || \
(right.xlogid == left.xlogid && right.xrecoff > left.xrecoff))
#define XLByteLE(left, right) \
(right.xlogid > left.xlogid || \
(right.xlogid == left.xlogid && right.xrecoff >= left.xrecoff))
#define XLByteEQ(left, right) \
(right.xlogid == left.xlogid && right.xrecoff == left.xrecoff)
#define InitXLBuffer(curridx) (\
XLogCtl->xlblocks[curridx].xrecoff = \
(XLogCtl->xlblocks[Insert->curridx].xrecoff == XLogFileSize) ? \

View File

@@ -1,6 +1,6 @@
/*-------------------------------------------------------------------------
*
* xlog.c
* xlogutils.c
*
*
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
@@ -9,12 +9,26 @@
*-------------------------------------------------------------------------
*/
#ifdef XLOG
#include "postgres.h"
#include "access/xlog.h"
#include "access/xact.h"
#include "storage/bufpage.h"
#include "storage/bufmgr.h"
#include "storage/smgr.h"
#include "access/htup.h"
#include "access/xlogutils.h"
#include "catalog/pg_database.h"
#ifdef XLOG
/*
* ---------------------------------------------------------------
*
* Index support functions
*
*----------------------------------------------------------------
*/
/*
* Check if specified heap tuple was inserted by given
@@ -59,7 +73,8 @@ XLogIsOwnerOfTuple(RelFileNode hnode, ItemPointer iptr,
htup = (HeapTupleHeader) PageGetItem(page, lp);
if (PageGetSUI(page) != ThisStartUpID || htup->t_xmin != xid || htup->t_cmin != cid)
Assert(PageGetSUI(page) == ThisStartUpID);
if (htup->t_xmin != xid || htup->t_cmin != cid)
{
UnlockAndReleaseBuffer(buffer);
return(-1);
@@ -70,6 +85,8 @@ XLogIsOwnerOfTuple(RelFileNode hnode, ItemPointer iptr,
}
/*
* MUST BE CALLED ONLY ON RECOVERY.
*
* Check if exists valid (inserted by not aborted xaction) heap tuple
* for given item pointer
*/
@@ -98,6 +115,14 @@ XLogIsValidTuple(RelFileNode hnode, ItemPointer iptr)
UnlockAndReleaseBuffer(buffer);
return(false);
}
if (PageGetSUI(page) != ThisStartUpID)
{
Assert(PageGetSUI(page) < ThisStartUpID);
UnlockAndReleaseBuffer(buffer);
return(true);
}
lp = PageGetItemId(page, ItemPointerGetOffsetNumber(iptr));
if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp))
{
@@ -107,7 +132,9 @@ XLogIsValidTuple(RelFileNode hnode, ItemPointer iptr)
htup = (HeapTupleHeader) PageGetItem(page, lp);
if (XLogIsAborted(PageGetSUI(page), htup->t_xmin))
/* MUST CHECK WASN'T TUPLE INSERTED IN PREV STARTUP */
if (XLogIsAborted(htup->t_xmin))
{
UnlockAndReleaseBuffer(buffer);
return(false);
@@ -117,4 +144,257 @@ XLogIsValidTuple(RelFileNode hnode, ItemPointer iptr)
return(true);
}
/*
* ---------------------------------------------------------------
*
* Transaction support functions for recovery
*
* On recovery we create tmp file to know what xactions were
* committed/aborted (2 bits per xaction).
*
*----------------------------------------------------------------
*/
bool
XLogIsAborted(TransactionId xid)
{
return(false);
}
bool
XLogIsCommitted(TransactionId xid)
{
return(true);
}
void
XLogMarkAborted(TransactionId xid)
{
return;
}
void
XLogMarkCommitted(TransactionId xid)
{
return;
}
/*
* ---------------------------------------------------------------
*
* Storage related support functions
*
*----------------------------------------------------------------
*/
Buffer
XLogReadBuffer(bool extend, Relation reln, BlockNumber blkno)
{
BlockNumber lastblock = RelationGetNumberOfBlocks(reln);
Buffer buffer;
if (blkno >= lastblock)
{
buffer = InvalidBuffer;
if (extend) /* we do this in recovery only - no locks */
{
Assert(InRecovery);
while (lastblock <= blkno)
{
buffer = ReadBuffer(reln, P_NEW);
lastblock++;
}
}
if (buffer != InvalidBuffer)
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
return(buffer);
}
buffer = ReadBuffer(reln, blkno);
if (buffer != InvalidBuffer)
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
return(buffer);
}
/*
* "Relation" cache
*/
typedef struct XLogRelDesc
{
RelationData reldata;
struct XLogRelDesc *lessRecently;
struct XLogRelDesc *moreRecently;
} XLogRelDesc;
typedef struct XLogRelCacheEntry
{
RelFileNode rnode;
XLogRelDesc *rdesc;
} XLogRelCacheEntry;
static HTAB *_xlrelcache;
static XLogRelDesc *_xlrelarr = NULL;
static Form_pg_class _xlpgcarr = NULL;
static int _xlast = 0;
static int _xlcnt = 0;
#define _XLOG_INITRELCACHESIZE 32
#define _XLOG_MAXRELCACHESIZE 512
void
XLogCloseRelationCache(void)
{
int i;
if (!_xlrelarr)
return;
for (i = 1; i < _xlast; i++)
{
Relation reln = &(_xlrelarr[i].reldata);
if (reln->rd_fd >= 0)
smgrclose(DEFAULT_SMGR, reln);
}
free(_xlrelarr);
free(_xlpgcarr);
hash_destroy(_xlrelcache);
_xlrelarr = NULL;
}
static void
_xl_init_rel_cache(void)
{
HASHCTL ctl;
_xlcnt = _XLOG_INITRELCACHESIZE;
_xlast = 0;
_xlrelarr = (XLogRelDesc*) malloc(sizeof(XLogRelDesc) * _xlcnt);
memset(_xlrelarr, 0, sizeof(XLogRelDesc) * _xlcnt);
_xlpgcarr = (Form_pg_class) malloc(sizeof(FormData_pg_class) * _xlcnt);
memset(_xlpgcarr, 0, sizeof(XLogRelDesc) * _xlcnt);
memset(&ctl, 0, (int) sizeof(ctl));
ctl.keysize = sizeof(RelFileNode);
ctl.datasize = sizeof(XLogRelDesc*);
ctl.hash = tag_hash;
_xlrelcache = hash_create(_XLOG_INITRELCACHESIZE, &ctl,
HASH_ELEM | HASH_FUNCTION);
}
static XLogRelDesc*
_xl_new_reldesc(void)
{
_xlast++;
if (_xlast < _xlcnt)
{
_xlrelarr[_xlast].reldata.rd_rel = &(_xlpgcarr[_xlast]);
return(&(_xlrelarr[_xlast]));
}
if ( 2 * _xlcnt <= _XLOG_MAXRELCACHESIZE)
{
_xlrelarr = (XLogRelDesc*) realloc(_xlrelarr,
2 * sizeof(XLogRelDesc) * _xlcnt);
memset(&(_xlrelarr[_xlcnt]), 0, sizeof(XLogRelDesc) * _xlcnt);
_xlpgcarr = (Form_pg_class) realloc(_xlpgcarr,
2 * sizeof(FormData_pg_class) * _xlcnt);
memset(&(_xlpgcarr[_xlcnt]), 0, sizeof(FormData_pg_class) * _xlcnt);
_xlcnt += _xlcnt;
_xlrelarr[_xlast].reldata.rd_rel = &(_xlpgcarr[_xlast]);
return(&(_xlrelarr[_xlast]));
}
else /* reuse */
{
XLogRelCacheEntry *hentry;
bool found;
XLogRelDesc *res = _xlrelarr[0].moreRecently;
Form_pg_class tpgc = res->reldata.rd_rel;
res->lessRecently->moreRecently = res->moreRecently;
res->moreRecently->lessRecently = res->lessRecently;
hentry = (XLogRelCacheEntry*) hash_search(_xlrelcache,
(char*)&(res->reldata.rd_node), HASH_REMOVE, &found);
if (hentry == NULL)
elog(STOP, "XLogOpenRelation: can't delete from cache");
if (!found)
elog(STOP, "XLogOpenRelation: file was not found in cache");
if (res->reldata.rd_fd >= 0)
smgrclose(DEFAULT_SMGR, &(res->reldata));
memset(res, 0, sizeof(XLogRelDesc));
memset(tpgc, 0, sizeof(FormData_pg_class));
res->reldata.rd_rel = tpgc;
_xlast--;
return(res);
}
}
Relation
XLogOpenRelation(bool redo, RmgrId rmid, RelFileNode rnode)
{
XLogRelDesc *res;
XLogRelCacheEntry *hentry;
bool found;
if (!_xlrelarr)
_xl_init_rel_cache();
hentry = (XLogRelCacheEntry*)
hash_search(_xlrelcache, (char*)&rnode, HASH_FIND, &found);
if (hentry == NULL)
elog(STOP, "XLogOpenRelation: error in cache");
if (found)
{
res = hentry->rdesc;
res->lessRecently->moreRecently = res->moreRecently;
res->moreRecently->lessRecently = res->lessRecently;
}
else
{
res = _xl_new_reldesc();
sprintf(RelationGetPhysicalRelationName(&(res->reldata)), "%u", rnode.relNode);
/* unexisting DB id */
res->reldata.rd_lockInfo.lockRelId.dbId = RecoveryDb;
res->reldata.rd_lockInfo.lockRelId.relId = rnode.relNode;
res->reldata.rd_node = rnode;
hentry = (XLogRelCacheEntry*)
hash_search(_xlrelcache, (char*)&rnode, HASH_ENTER, &found);
if (hentry == NULL)
elog(STOP, "XLogOpenRelation: can't insert into cache");
if (found)
elog(STOP, "XLogOpenRelation: file found on insert into cache");
hentry->rdesc = res;
res->reldata.rd_unlinked = true; /* look smgropen */
res->reldata.rd_fd = smgropen(DEFAULT_SMGR, &(res->reldata));
}
res->moreRecently = &(_xlrelarr[0]);
res->lessRecently = _xlrelarr[0].lessRecently;
_xlrelarr[0].lessRecently = res;
res->lessRecently->moreRecently = res;
if (res->reldata.rd_fd < 0) /* file doesn't exist */
return(NULL);
return(&(res->reldata));
}
#endif