1
0
mirror of https://github.com/postgres/postgres.git synced 2025-09-02 04:21:28 +03:00

Refactor per-page logic common to all redo routines to a new function.

Every redo routine uses the same idiom to determine what to do to a page:
check if there's a backup block for it, and if not read, the buffer if the
block exists, and check its LSN. Refactor that into a common function,
XLogReadBufferForRedo, making all the redo routines shorter and more
readable.

This has no user-visible effect, and makes no changes to the WAL format.

Reviewed by Andres Freund, Alvaro Herrera, Michael Paquier.
This commit is contained in:
Heikki Linnakangas
2014-08-13 15:39:08 +03:00
parent 26f8b99b24
commit f8f4227976
8 changed files with 1430 additions and 1739 deletions

View File

@@ -116,27 +116,25 @@ _bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn,
*/
static void
_bt_clear_incomplete_split(XLogRecPtr lsn, XLogRecord *record,
int block_index,
RelFileNode rnode, BlockNumber cblock)
{
Buffer buf;
buf = XLogReadBuffer(rnode, cblock, false);
if (BufferIsValid(buf))
if (XLogReadBufferForRedo(lsn, record, block_index, rnode, cblock, &buf)
== BLK_NEEDS_REDO)
{
Page page = (Page) BufferGetPage(buf);
BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (lsn > PageGetLSN(page))
{
BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
Assert((pageop->btpo_flags & BTP_INCOMPLETE_SPLIT) != 0);
pageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
Assert((pageop->btpo_flags & BTP_INCOMPLETE_SPLIT) != 0);
pageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
PageSetLSN(page, lsn);
MarkBufferDirty(buf);
}
UnlockReleaseBuffer(buf);
PageSetLSN(page, lsn);
MarkBufferDirty(buf);
}
if (BufferIsValid(buf))
UnlockReleaseBuffer(buf);
}
static void
@@ -184,39 +182,28 @@ btree_xlog_insert(bool isleaf, bool ismeta,
*/
if (!isleaf)
{
if (record->xl_info & XLR_BKP_BLOCK(0))
(void) RestoreBackupBlock(lsn, record, 0, false, false);
else
_bt_clear_incomplete_split(lsn, record, xlrec->target.node, cblkno);
_bt_clear_incomplete_split(lsn, record, 0, xlrec->target.node, cblkno);
main_blk_index = 1;
}
else
main_blk_index = 0;
if (record->xl_info & XLR_BKP_BLOCK(main_blk_index))
(void) RestoreBackupBlock(lsn, record, main_blk_index, false, false);
else
if (XLogReadBufferForRedo(lsn, record, main_blk_index, xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
&buffer) == BLK_NEEDS_REDO)
{
buffer = XLogReadBuffer(xlrec->target.node,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
false);
if (BufferIsValid(buffer))
{
page = (Page) BufferGetPage(buffer);
page = BufferGetPage(buffer);
if (lsn > PageGetLSN(page))
{
if (PageAddItem(page, (Item) datapos, datalen,
ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
false, false) == InvalidOffsetNumber)
elog(PANIC, "btree_insert_redo: failed to add item");
if (PageAddItem(page, (Item) datapos, datalen,
ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
false, false) == InvalidOffsetNumber)
elog(PANIC, "btree_insert_redo: failed to add item");
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer);
}
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
/*
* Note: in normal operation, we'd update the metapage while still holding
@@ -299,12 +286,7 @@ btree_xlog_split(bool onleft, bool isroot,
* before locking the other pages)
*/
if (!isleaf)
{
if (record->xl_info & XLR_BKP_BLOCK(1))
(void) RestoreBackupBlock(lsn, record, 1, false, false);
else
_bt_clear_incomplete_split(lsn, record, xlrec->node, cblkno);
}
_bt_clear_incomplete_split(lsn, record, 1, xlrec->node, cblkno);
/* Reconstruct right (new) sibling page from scratch */
rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true);
@@ -340,87 +322,77 @@ btree_xlog_split(bool onleft, bool isroot,
/* don't release the buffer yet; we touch right page's first item below */
/* Now reconstruct left (original) sibling page */
if (record->xl_info & XLR_BKP_BLOCK(0))
lbuf = RestoreBackupBlock(lsn, record, 0, false, true);
else
if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->leftsib,
&lbuf) == BLK_NEEDS_REDO)
{
lbuf = XLogReadBuffer(xlrec->node, xlrec->leftsib, false);
/*
* To retain the same physical order of the tuples that they had, we
* initialize a temporary empty page for the left page and add all the
* items to that in item number order. This mirrors how _bt_split()
* works. It's not strictly required to retain the same physical
* order, as long as the items are in the correct item number order,
* but it helps debugging. See also _bt_restore_page(), which does
* the same for the right page.
*/
Page lpage = (Page) BufferGetPage(lbuf);
BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
OffsetNumber off;
Page newlpage;
OffsetNumber leftoff;
if (BufferIsValid(lbuf))
newlpage = PageGetTempPageCopySpecial(lpage);
/* Set high key */
leftoff = P_HIKEY;
if (PageAddItem(newlpage, left_hikey, left_hikeysz,
P_HIKEY, false, false) == InvalidOffsetNumber)
elog(PANIC, "failed to add high key to left page after split");
leftoff = OffsetNumberNext(leftoff);
for (off = P_FIRSTDATAKEY(lopaque); off < xlrec->firstright; off++)
{
/*
* To retain the same physical order of the tuples that they had,
* we initialize a temporary empty page for the left page and add
* all the items to that in item number order. This mirrors how
* _bt_split() works. It's not strictly required to retain the
* same physical order, as long as the items are in the correct
* item number order, but it helps debugging. See also
* _bt_restore_page(), which does the same for the right page.
*/
Page lpage = (Page) BufferGetPage(lbuf);
BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
ItemId itemid;
Size itemsz;
Item item;
if (lsn > PageGetLSN(lpage))
/* add the new item if it was inserted on left page */
if (onleft && off == newitemoff)
{
OffsetNumber off;
Page newlpage;
OffsetNumber leftoff;
newlpage = PageGetTempPageCopySpecial(lpage);
/* Set high key */
leftoff = P_HIKEY;
if (PageAddItem(newlpage, left_hikey, left_hikeysz,
P_HIKEY, false, false) == InvalidOffsetNumber)
elog(PANIC, "failed to add high key to left page after split");
if (PageAddItem(newlpage, newitem, newitemsz, leftoff,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add new item to left page after split");
leftoff = OffsetNumberNext(leftoff);
for (off = P_FIRSTDATAKEY(lopaque); off < xlrec->firstright; off++)
{
ItemId itemid;
Size itemsz;
Item item;
/* add the new item if it was inserted on left page */
if (onleft && off == newitemoff)
{
if (PageAddItem(newlpage, newitem, newitemsz, leftoff,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add new item to left page after split");
leftoff = OffsetNumberNext(leftoff);
}
itemid = PageGetItemId(lpage, off);
itemsz = ItemIdGetLength(itemid);
item = PageGetItem(lpage, itemid);
if (PageAddItem(newlpage, item, itemsz, leftoff,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add old item to left page after split");
leftoff = OffsetNumberNext(leftoff);
}
/* cope with possibility that newitem goes at the end */
if (onleft && off == newitemoff)
{
if (PageAddItem(newlpage, newitem, newitemsz, leftoff,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add new item to left page after split");
leftoff = OffsetNumberNext(leftoff);
}
PageRestoreTempPage(newlpage, lpage);
/* Fix opaque fields */
lopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
if (isleaf)
lopaque->btpo_flags |= BTP_LEAF;
lopaque->btpo_next = xlrec->rightsib;
lopaque->btpo_cycleid = 0;
PageSetLSN(lpage, lsn);
MarkBufferDirty(lbuf);
}
itemid = PageGetItemId(lpage, off);
itemsz = ItemIdGetLength(itemid);
item = PageGetItem(lpage, itemid);
if (PageAddItem(newlpage, item, itemsz, leftoff,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add old item to left page after split");
leftoff = OffsetNumberNext(leftoff);
}
/* cope with possibility that newitem goes at the end */
if (onleft && off == newitemoff)
{
if (PageAddItem(newlpage, newitem, newitemsz, leftoff,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add new item to left page after split");
leftoff = OffsetNumberNext(leftoff);
}
PageRestoreTempPage(newlpage, lpage);
/* Fix opaque fields */
lopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
if (isleaf)
lopaque->btpo_flags |= BTP_LEAF;
lopaque->btpo_next = xlrec->rightsib;
lopaque->btpo_cycleid = 0;
PageSetLSN(lpage, lsn);
MarkBufferDirty(lbuf);
}
/* We no longer need the buffers */
@@ -443,31 +415,21 @@ btree_xlog_split(bool onleft, bool isroot,
* whether this was a leaf or internal page.
*/
int rnext_index = isleaf ? 1 : 2;
Buffer buffer;
if (record->xl_info & XLR_BKP_BLOCK(rnext_index))
(void) RestoreBackupBlock(lsn, record, rnext_index, false, false);
else
if (XLogReadBufferForRedo(lsn, record, rnext_index, xlrec->node,
xlrec->rnext, &buffer) == BLK_NEEDS_REDO)
{
Buffer buffer;
Page page = (Page) BufferGetPage(buffer);
BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
buffer = XLogReadBuffer(xlrec->node, xlrec->rnext, false);
pageop->btpo_prev = xlrec->rightsib;
if (BufferIsValid(buffer))
{
Page page = (Page) BufferGetPage(buffer);
if (lsn > PageGetLSN(page))
{
BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_prev = xlrec->rightsib;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer);
}
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
}
}
@@ -529,54 +491,41 @@ btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record)
}
}
/*
* If we have a full-page image, restore it (using a cleanup lock) and
* we're done.
*/
if (record->xl_info & XLR_BKP_BLOCK(0))
{
(void) RestoreBackupBlock(lsn, record, 0, true, false);
return;
}
/*
* Like in btvacuumpage(), we need to take a cleanup lock on every leaf
* page. See nbtree/README for details.
*/
buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
if (!BufferIsValid(buffer))
return;
LockBufferForCleanup(buffer);
page = (Page) BufferGetPage(buffer);
if (lsn <= PageGetLSN(page))
if (XLogReadBufferForRedoExtended(lsn, record, 0,
xlrec->node, MAIN_FORKNUM, xlrec->block,
RBM_NORMAL, true, &buffer)
== BLK_NEEDS_REDO)
{
page = (Page) BufferGetPage(buffer);
if (record->xl_len > SizeOfBtreeVacuum)
{
OffsetNumber *unused;
OffsetNumber *unend;
unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeVacuum);
unend = (OffsetNumber *) ((char *) xlrec + record->xl_len);
if ((unend - unused) > 0)
PageIndexMultiDelete(page, unused, unend - unused);
}
/*
* Mark the page as not containing any LP_DEAD items --- see comments
* in _bt_delitems_vacuum().
*/
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
return;
}
if (record->xl_len > SizeOfBtreeVacuum)
{
OffsetNumber *unused;
OffsetNumber *unend;
unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeVacuum);
unend = (OffsetNumber *) ((char *) xlrec + record->xl_len);
if ((unend - unused) > 0)
PageIndexMultiDelete(page, unused, unend - unused);
}
/*
* Mark the page as not containing any LP_DEAD items --- see comments in
* _bt_delitems_vacuum().
*/
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
/*
@@ -752,47 +701,36 @@ btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
ResolveRecoveryConflictWithSnapshot(latestRemovedXid, xlrec->node);
}
/* If we have a full-page image, restore it and we're done */
if (record->xl_info & XLR_BKP_BLOCK(0))
{
(void) RestoreBackupBlock(lsn, record, 0, false, false);
return;
}
/*
* We don't need to take a cleanup lock to apply these changes. See
* nbtree/README for details.
*/
buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
if (lsn <= PageGetLSN(page))
if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, xlrec->block,
&buffer) == BLK_NEEDS_REDO)
{
page = (Page) BufferGetPage(buffer);
if (record->xl_len > SizeOfBtreeDelete)
{
OffsetNumber *unused;
unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);
PageIndexMultiDelete(page, unused, xlrec->nitems);
}
/*
* Mark the page as not containing any LP_DEAD items --- see comments
* in _bt_delitems_delete().
*/
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
return;
}
if (record->xl_len > SizeOfBtreeDelete)
{
OffsetNumber *unused;
unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);
PageIndexMultiDelete(page, unused, xlrec->nitems);
}
/*
* Mark the page as not containing any LP_DEAD items --- see comments in
* _bt_delitems_delete().
*/
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
static void
@@ -816,42 +754,36 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogRecPtr lsn, XLogRecord *record)
*/
/* parent page */
if (record->xl_info & XLR_BKP_BLOCK(0))
(void) RestoreBackupBlock(lsn, record, 0, false, false);
else
if (XLogReadBufferForRedo(lsn, record, 0, xlrec->target.node, parent,
&buffer) == BLK_NEEDS_REDO)
{
buffer = XLogReadBuffer(xlrec->target.node, parent, false);
if (BufferIsValid(buffer))
{
page = (Page) BufferGetPage(buffer);
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (lsn > PageGetLSN(page))
{
OffsetNumber poffset;
ItemId itemid;
IndexTuple itup;
OffsetNumber nextoffset;
BlockNumber rightsib;
OffsetNumber poffset;
ItemId itemid;
IndexTuple itup;
OffsetNumber nextoffset;
BlockNumber rightsib;
poffset = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
page = (Page) BufferGetPage(buffer);
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
nextoffset = OffsetNumberNext(poffset);
itemid = PageGetItemId(page, nextoffset);
itup = (IndexTuple) PageGetItem(page, itemid);
rightsib = ItemPointerGetBlockNumber(&itup->t_tid);
poffset = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
itemid = PageGetItemId(page, poffset);
itup = (IndexTuple) PageGetItem(page, itemid);
ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);
nextoffset = OffsetNumberNext(poffset);
PageIndexTupleDelete(page, nextoffset);
nextoffset = OffsetNumberNext(poffset);
itemid = PageGetItemId(page, nextoffset);
itup = (IndexTuple) PageGetItem(page, itemid);
rightsib = ItemPointerGetBlockNumber(&itup->t_tid);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer);
}
itemid = PageGetItemId(page, poffset);
itup = (IndexTuple) PageGetItem(page, itemid);
ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);
nextoffset = OffsetNumberNext(poffset);
PageIndexTupleDelete(page, nextoffset);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
/* Rewrite the leaf page as a halfdead page */
buffer = XLogReadBuffer(xlrec->target.node, xlrec->leafblk, true);
@@ -911,56 +843,34 @@ btree_xlog_unlink_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
*/
/* Fix left-link of right sibling */
if (record->xl_info & XLR_BKP_BLOCK(0))
(void) RestoreBackupBlock(lsn, record, 0, false, false);
else
if (XLogReadBufferForRedo(lsn, record, 0, xlrec->node, rightsib, &buffer)
== BLK_NEEDS_REDO)
{
buffer = XLogReadBuffer(xlrec->node, rightsib, false);
if (BufferIsValid(buffer))
{
page = (Page) BufferGetPage(buffer);
if (lsn <= PageGetLSN(page))
{
UnlockReleaseBuffer(buffer);
}
else
{
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_prev = leftsib;
page = (Page) BufferGetPage(buffer);
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_prev = leftsib;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
}
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
/* Fix right-link of left sibling, if any */
if (record->xl_info & XLR_BKP_BLOCK(1))
(void) RestoreBackupBlock(lsn, record, 1, false, false);
else
if (leftsib != P_NONE)
{
if (leftsib != P_NONE)
if (XLogReadBufferForRedo(lsn, record, 1, xlrec->node, leftsib, &buffer)
== BLK_NEEDS_REDO)
{
buffer = XLogReadBuffer(xlrec->node, leftsib, false);
if (BufferIsValid(buffer))
{
page = (Page) BufferGetPage(buffer);
if (lsn <= PageGetLSN(page))
{
UnlockReleaseBuffer(buffer);
}
else
{
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_next = rightsib;
page = (Page) BufferGetPage(buffer);
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_next = rightsib;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
}
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
}
/* Rewrite target page as empty deleted page */
@@ -1071,10 +981,7 @@ btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
/* Clear the incomplete-split flag in left child */
if (record->xl_info & XLR_BKP_BLOCK(0))
(void) RestoreBackupBlock(lsn, record, 0, false, false);
else
_bt_clear_incomplete_split(lsn, record, xlrec->node, cblkno);
_bt_clear_incomplete_split(lsn, record, 0, xlrec->node, cblkno);
}
PageSetLSN(page, lsn);