1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-07 00:36:50 +03:00
This commit is contained in:
Vadim B. Mikheev
2000-10-13 02:03:02 +00:00
parent 0b33ace678
commit 25a26a7ab8
5 changed files with 703 additions and 40 deletions

View File

@ -12,7 +12,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.63 2000/08/10 02:33:20 inoue Exp $
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.64 2000/10/13 02:03:00 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@ -730,3 +730,583 @@ _bt_restscan(IndexScanDesc scan)
so->btso_curbuf = buf;
}
}
#ifdef XLOG
void btree_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
if (info == XLOG_BTREE_DELETE)
btree_xlog_delete(true, lsn, record);
else if (info == XLOG_BTREE_INSERT)
btree_xlog_insert(true, lsn, record);
else if (info == XLOG_BTREE_SPLIT)
btree_xlog_split(true, false, lsn, record); /* new item on the right */
else if (info == XLOG_BTREE_SPLEFT)
btree_xlog_split(true, true, lsn, record); /* new item on the left */
else if (info == XLOG_BTREE_NEWROOT)
btree_xlog_newroot(true, lsn, record);
else
elog(STOP, "btree_redo: unknown op code %u", info);
}
void btree_undo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
if (info == XLOG_BTREE_DELETE)
btree_xlog_delete(false, lsn, record);
else if (info == XLOG_BTREE_INSERT)
btree_xlog_insert(false, lsn, record);
else if (info == XLOG_BTREE_SPLIT)
btree_xlog_split(false, false, lsn, record);/* new item on the right */
else if (info == XLOG_BTREE_SPLEFT)
btree_xlog_split(false, true, lsn, record); /* new item on the left */
else if (info == XLOG_BTREE_NEWROOT)
btree_xlog_newroot(false, lsn, record);
else
elog(STOP, "btree_undo: unknown op code %u", info);
}
static void btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_delete *xlrec;
Relation *reln;
Buffer buffer;
Page page;
if (!redo)
return;
xlrec = (xl_btree_delete*) XLogRecGetData(record);
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(false, reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
if (!BufferIsValid(buffer))
elog(STOP, "btree_delete_redo: block unfound");
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(STOP, "btree_delete_redo: uninitialized page");
PageIndexTupleDelete(page, ItemPointerGetOffsetNumber(&(xlrec->target.tid)));
return;
}
static void btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_insert *xlrec;
Relation *reln;
Buffer buffer;
Page page;
BTPageOpaque pageop;
xlrec = (xl_btree_insert*) XLogRecGetData(record);
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer((redo) ? true : false, reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(STOP, "btree_insert_%s: uninitialized page",
(redo) ? "redo" : "undo");
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (redo)
{
if (XLByteLE(lsn, PageGetLSN(page)))
UnlockAndReleaseBuffer(buffer);
else
{
Size hsize = SizeOfBtreeInsert;
RelFileNode hnode;
if (P_ISLEAF(pageop))
{
hsize += (sizeof(CommandId) + sizeof(RelFileNode));
memcpy(&hnode, (char*)xlrec + SizeOfBtreeInsert +
sizeof(CommandId), sizeof(RelFileNode));
}
if (! _bt_add_item(page,
ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
(char*)xlrec + hsize,
record->xl_len - hsize,
&hnode))
elog(STOP, "btree_insert_redo: failed to add item");
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
}
}
else
{
BTItemData btdata;
if (XLByteLT(PageGetLSN(page), lsn))
elog(STOP, "btree_insert_undo: bad page LSN");
if (! P_ISLEAF(pageop))
{
UnlockAndReleaseBuffer(buffer);
return;
}
memcpy(&btdata, (char*)xlrec + SizeOfBtreeInsert +
sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData));
_bt_del_item(reln, buffer, &btdata, true, lsn, record);
}
return;
}
static void
btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_split *xlrec;
Relation *reln;
BlockNumber blkno;
BlockNumber parent;
Buffer buffer;
Page page;
BTPageOpaque pageop;
char *op = (redo) ? "redo" : "undo";
bool isleaf;
xlrec = (xl_btree_split*) XLogRecGetData(record);
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
/* Left (original) sibling */
blkno = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
BlockIdGetBlockNumber(xlrec->otherblk);
buffer = XLogReadBuffer(false, reln, blkno);
if (!BufferIsValid(buffer))
elog(STOP, "btree_split_%s: lost left sibling", op);
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(STOP, "btree_split_%s: uninitialized left sibling", op);
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
isleaf = P_ISLEAF(pageop);
parent = pageop->btpo_parent;
if (redo)
{
if (XLByteLE(lsn, PageGetLSN(page)))
UnlockAndReleaseBuffer(buffer);
else
{
/* Delete items related to new right sibling */
_bt_thin_left_page(page, record);
if (onleft)
{
BTItemData btdata;
Size hsize = SizeOfBtreeSplit;
Size itemsz;
RelFileNode hnode;
pageop->btpo_next = BlockIdGetBlockNumber(xlrec->otherblk);
if (isleaf)
{
hsize += (sizeof(CommandId) + sizeof(RelFileNode));
memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit +
sizeof(CommandId), sizeof(RelFileNode));
}
memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
itemsz = IndexTupleDSize(btdata.bti_itup) +
(sizeof(BTItemData) - sizeof(IndexTupleData));
if (! _bt_add_item(page,
ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
(char*)xlrec + hsize,
itemsz,
&hnode))
elog(STOP, "btree_split_redo: failed to add item");
}
else
pageop->btpo_next = ItemPointerGetBlockNumber(&(xlrec->target.tid));
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
}
}
else /* undo */
{
if (XLByteLT(PageGetLSN(page), lsn))
elog(STOP, "btree_split_undo: bad left sibling LSN");
if (! isleaf || ! onleft)
UnlockAndReleaseBuffer(buffer);
else
{
BTItemData btdata;
memcpy(&btdata, (char*)xlrec + SizeOfBtreeSplit +
sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData));
_bt_del_item(reln, buffer, &btdata, false, lsn, record);
}
}
/* Right (new) sibling */
blkno = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) :
ItemPointerGetBlockNumber(&(xlrec->target.tid));
buffer = XLogReadBuffer((redo) ? true : false, reln, blkno);
if (!BufferIsValid(buffer))
elog(STOP, "btree_split_%s: lost right sibling", op);
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
{
if (!redo)
elog(STOP, "btree_split_undo: uninitialized right sibling");
PageInit(page, BufferGetPageSize(buffer), 0);
}
if (redo)
{
if (XLByteLE(lsn, PageGetLSN(page)))
UnlockAndReleaseBuffer(buffer);
else
{
Size hsize = SizeOfBtreeSplit;
BTItemData btdata;
Size itemsz;
_bt_pageinit(page, BufferGetPageSize(buffer));
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (isleaf)
{
pageop->btpo_flags |= BTP_LEAF;
hsize += (sizeof(CommandId) + sizeof(RelFileNode));
}
if (onleft) /* skip target item */
{
memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
itemsz = IndexTupleDSize(btdata.bti_itup) +
(sizeof(BTItemData) - sizeof(IndexTupleData));
hsize += itemsz;
}
for (char* item = (char*)xlrec + hsize;
item < (char*)record + record->xl_len; )
{
memcpy(&btdata, item, sizeof(BTItemData));
itemsz = IndexTupleDSize(btdata.bti_itup) +
(sizeof(BTItemData) - sizeof(IndexTupleData));
itemsz = MAXALIGN(itemsz);
if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber,
LP_USED) == InvalidOffsetNumber)
elog(STOP, "btree_split_redo: can't add item to right sibling");
item += itemsz;
}
pageop->btpo_prev = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
BlockIdGetBlockNumber(xlrec->otherblk);
pageop->btpo_next = BlockIdGetBlockNumber(xlrec->rightblk);
pageop->btpo_parent = parent;
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
}
}
else /* undo */
{
if (XLByteLT(PageGetLSN(page), lsn))
elog(STOP, "btree_split_undo: bad right sibling LSN");
if (! isleaf || onleft)
UnlockAndReleaseBuffer(buffer);
else
{
char tbuf[BLCKSZ];
int cnt;
char *item;
Size itemsz;
item = (char*)xlrec + SizeOfBtreeSplit +
sizeof(CommandId) + sizeof(RelFileNode);
for (cnt = 0; item < (char*)record + record->xl_len; )
{
BTItem btitem = (BTItem)
(tbuf + cnt * (MAXALIGN(sizeof(BTItemData))));
memcpy(btitem, item, sizeof(BTItemData));
itemsz = IndexTupleDSize(btitem->bti_itup) +
(sizeof(BTItemData) - sizeof(IndexTupleData));
itemsz = MAXALIGN(itemsz);
item += itemsz;
cnt++;
}
cnt -= ItemPointerGetOffsetNumber(&(xlrec->target.tid));
if (cnt < 0)
elog(STOP, "btree_split_undo: target item unfound in right sibling");
item = tbuf + cnt * (MAXALIGN(sizeof(BTItemData)));
_bt_del_item(reln, buffer, (BTItem)item, false, lsn, record);
}
}
/* Right (next) page */
blkno = BlockIdGetBlockNumber(xlrec->rightblk);
buffer = XLogReadBuffer(false, reln, blkno);
if (!BufferIsValid(buffer))
elog(STOP, "btree_split_%s: lost next right page", op);
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(STOP, "btree_split_%s: uninitialized next right page", op);
if (redo)
{
if (XLByteLE(lsn, PageGetLSN(page)))
UnlockAndReleaseBuffer(buffer);
else
{
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_prev = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) :
ItemPointerGetBlockNumber(&(xlrec->target.tid));
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
}
}
else /* undo */
{
if (XLByteLT(PageGetLSN(page), lsn))
elog(STOP, "btree_split_undo: bad next right page LSN");
UnlockAndReleaseBuffer(buffer);
}
}
static void btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_newroot *xlrec;
Relation *reln;
Buffer buffer;
Page page;
Buffer metabuf;
Page metapg;
if (!redo)
return;
xlrec = (xl_btree_newroot*) XLogRecGetData(record);
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(true, reln, BlockIdGetBlockNumber(&(xlrec->rootblk)));
if (!BufferIsValid(buffer))
elog(STOP, "btree_newroot_redo: no root page");
metabuf = XLogReadBuffer(false, reln, BTREE_METAPAGE);
if (!BufferIsValid(buffer))
elog(STOP, "btree_newroot_redo: no metapage");
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page) || XLByteLT(PageGetLSN(page), lsn))
{
_bt_pageinit(page, BufferGetPageSize(buffer));
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_flags |= BTP_ROOT;
pageop->btpo_prev = pageop->btpo_next = P_NONE;
pageop->btpo_parent = BTREE_METAPAGE;
if (record->xl_len == SizeOfBtreeNewroot) /* no childs */
pageop->btpo_flags |= BTP_LEAF;
else
{
BTItemData btdata;
Size itemsz;
for (char* item = (char*)xlrec + SizeOfBtreeNewroot;
item < (char*)record + record->xl_len; )
{
memcpy(&btdata, item, sizeof(BTItemData));
itemsz = IndexTupleDSize(btdata.bti_itup) +
(sizeof(BTItemData) - sizeof(IndexTupleData));
itemsz = MAXALIGN(itemsz);
if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber,
LP_USED) == InvalidOffsetNumber)
elog(STOP, "btree_newroot_redo: can't add item");
item += itemsz;
}
}
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
}
else
UnlockAndReleaseBuffer(buffer);
metapg = BufferGetPage(metabuf);
if (PageIsNew((PageHeader) metapg))
{
BTMetaPageData md;
_bt_pageinit(metapg, BufferGetPageSize(metabuf));
md.btm_magic = BTREE_MAGIC;
md.btm_version = BTREE_VERSION;
md.btm_root = P_NONE;
md.btm_level = 0;
memcpy((char *) BTPageGetMeta(pg), (char *) &md, sizeof(md));
}
if (XLByteLT(PageGetLSN(metapg), lsn))
{
BTMetaPageData *metad = BTPageGetMeta(metapg);
metad->btm_root = BlockIdGetBlockNumber(&(xlrec->rootblk));
(metad->btm_level)++;
PageSetLSN(metapg, lsn);
PageSetSUI(metapg, ThisStartUpID);
UnlockAndWriteBuffer(metabuf);
}
else
UnlockAndReleaseBuffer(metabuf);
return;
}
/*
* UNDO insertion on *leaf* page:
* - find inserted tuple;
* - delete it if heap tuple was inserted by the same xaction
*/
static void
_bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert,
XLogRecPtr lsn, XLogRecord *record)
{
char *xlrec = (char*) XLogRecGetData(record);
Page page = (Page) BufferGetPage(buffer);
BTPageOpaque pageop;
BlockNumber blkno;
OffsetNumber offno;
ItemId lp;
for ( ; ; )
{
offno = _bt_find_btitem(page, btitem);
if (offno != InvalidOffsetNumber)
break;
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (P_RIGHTMOST(pageop))
break;
blkno = pageop->btpo_next;
UnlockAndReleaseBuffer(buffer);
buffer = XLogReadBuffer(false, reln, blkno);
if (!BufferIsValid(buffer))
elog(STOP, "btree_%s_undo: lost right sibling",
(insert) ? "insert" : "split");
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(STOP, "btree_%s_undo: uninitialized right sibling",
(insert) ? "insert" : "split");
if (XLByteLT(PageGetLSN(page), lsn))
break;
}
if (offno == InvalidOffsetNumber) /* not found */
{
if (!InRecovery)
elog(STOP, "btree_%s_undo: lost target tuple in rollback",
(insert) ? "insert" : "split");
UnlockAndReleaseBuffer(buffer);
return;
}
lp = PageGetItemId(page, offno);
if (ItemIdDeleted(lp)) /* marked for deletion */
{
if (!InRecovery)
elog(STOP, "btree_%s_undo: deleted target tuple in rollback",
(insert) ? "insert" : "split");
}
else if (InRecovery) /* check heap tuple */
{
int result;
CommandId cid;
RelFileNode hnode;
Size hsize = (insert) ? SizeOfBtreeInsert : SizeOfBtreeSplit;
memcpy(&cid, (char*)xlrec + hsize, sizeof(CommandId));
memcpy(&hnode, (char*)xlrec + hsize + sizeof(CommandId), sizeof(RelFileNode));
result = XLogCheckHeapTuple(hnode, &(btitem->bti_itup.t_tid),
record->xl_xid, cid);
if (result <= 0) /* no tuple or not owner */
{
UnlockAndReleaseBuffer(buffer);
return;
}
}
else if (! BufferIsUpdatable(buffer)) /* normal rollback */
{
lp->lp_flags |= LP_DELETE;
MarkBufferForCleanup(buffer, IndexPageCleanup);
return;
}
PageIndexTupleDelete(page, offno);
if (InRecovery)
{
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_flags |= BTP_REORDER;
}
UnlockAndWriteBuffer(buffer);
return;
}
static bool
_bt_add_item(Page page, OffsetNumber offno,
char* item, Size size, RelFileNode* hnode)
{
BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (offno > PageGetMaxOffsetNumber(page) + 1)
{
if (! (pageop->btpo_flags & BTP_REORDER))
{
elog(NOTICE, "btree_add_item: BTP_REORDER flag was expected");
pageop->btpo_flags |= BTP_REORDER;
}
offno = PageGetMaxOffsetNumber(page) + 1;
}
if (PageAddItem(page, (Item) item, size, offno,
LP_USED) == InvalidOffsetNumber)
{
/* ops, not enough space - try to deleted dead tuples */
bool result;
if (! P_ISLEAF(pageop))
return(false);
result = _bt_cleanup_page(page, hnode);
if (!result || PageAddItem(page, (Item) item, size, offno,
LP_USED) == InvalidOffsetNumber)
return(false);
}
return(true);
}
#endif