Mirror of https://github.com/postgres/postgres.git
postgres/src/backend/access/nbtree/nbtxlog.c
commit 426ea61117
nbtree: Make BTMaxItemSize into object-like macro.
Make nbtree's "1/3 of a page limit" BTMaxItemSize function-like macro
(which accepts a "page" argument) into an object-like macro that can be
used from code that doesn't have convenient access to an nbtree page.

Preparation for an upcoming patch that adds skip scan to nbtree.
Parallel index scans that use skip scan will serialize datums (not just
SAOP array subscripts) when scheduling primitive scans.  BTMaxItemSize
will be used by btestimateparallelscan to determine how much DSM to
request.

Author: Peter Geoghegan <pg@bowt.ie>
Discussion: https://postgr.es/m/CAH2-Wz=H_RG5weNGeUG_TkK87tRBnH9mGCQj6WpM4V4FNWKv2g@mail.gmail.com
2025-03-11 10:35:56 -04:00
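
For reference, the change turns BTMaxItemSize from a macro that computes the
limit from a passed-in page into a constant expression over BLCKSZ. A
simplified before/after sketch (illustrative only; the authoritative
definition lives in src/include/access/nbtree.h):

    /* before: function-like, requires access to an nbtree page */
    #define BTMaxItemSize(page) \
        (MAXALIGN_DOWN((PageGetPageSize(page) - \
                        MAXALIGN(SizeOfPageHeaderData + 3 * sizeof(ItemIdData)) - \
                        MAXALIGN(sizeof(BTPageOpaqueData))) / 3))

    /* after: object-like, usable anywhere (nbtree pages are always BLCKSZ) */
    #define BTMaxItemSize \
        (MAXALIGN_DOWN((BLCKSZ - \
                        MAXALIGN(SizeOfPageHeaderData + 3 * sizeof(ItemIdData)) - \
                        MAXALIGN(sizeof(BTPageOpaqueData))) / 3))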

/*-------------------------------------------------------------------------
*
* nbtxlog.c
* WAL replay logic for btrees.
*
*
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/nbtree/nbtxlog.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/bufmask.h"
#include "access/nbtree.h"
#include "access/nbtxlog.h"
#include "access/transam.h"
#include "access/xlogutils.h"
#include "storage/standby.h"
#include "utils/memutils.h"
static MemoryContext opCtx; /* working memory for operations */
/*
* _bt_restore_page -- re-enter all the index tuples on a page
*
* The page is freshly init'd, and *from (length len) is a copy of what
* had been its upper part (pd_upper to pd_special). We assume that the
* tuples had been added to the page in item-number order, and therefore
* the one with highest item number appears first (lowest on the page).
*/
static void
_bt_restore_page(Page page, char *from, int len)
{
IndexTupleData itupdata;
Size itemsz;
char *end = from + len;
Item items[MaxIndexTuplesPerPage];
uint16 itemsizes[MaxIndexTuplesPerPage];
int i;
int nitems;
/*
* To get the items back in the original order, we add them to the page in
* reverse. To figure out where one tuple ends and another begins, we
* have to scan them in forward order first.
*/
i = 0;
while (from < end)
{
/*
* As we step through the items, 'from' won't always be properly
* aligned, so we need to use memcpy(). Further, we use Item (which
* is just a char*) here for our items array for the same reason;
* wouldn't want the compiler or anyone thinking that an item is
* aligned when it isn't.
*/
memcpy(&itupdata, from, sizeof(IndexTupleData));
itemsz = IndexTupleSize(&itupdata);
itemsz = MAXALIGN(itemsz);
items[i] = (Item) from;
itemsizes[i] = itemsz;
i++;
from += itemsz;
}
nitems = i;
for (i = nitems - 1; i >= 0; i--)
{
if (PageAddItem(page, items[i], itemsizes[i], nitems - i,
false, false) == InvalidOffsetNumber)
elog(PANIC, "_bt_restore_page: cannot add item to page");
}
}
static void
_bt_restore_meta(XLogReaderState *record, uint8 block_id)
{
XLogRecPtr lsn = record->EndRecPtr;
Buffer metabuf;
Page metapg;
BTMetaPageData *md;
BTPageOpaque pageop;
xl_btree_metadata *xlrec;
char *ptr;
Size len;
metabuf = XLogInitBufferForRedo(record, block_id);
ptr = XLogRecGetBlockData(record, block_id, &len);
Assert(len == sizeof(xl_btree_metadata));
Assert(BufferGetBlockNumber(metabuf) == BTREE_METAPAGE);
xlrec = (xl_btree_metadata *) ptr;
metapg = BufferGetPage(metabuf);
_bt_pageinit(metapg, BufferGetPageSize(metabuf));
md = BTPageGetMeta(metapg);
md->btm_magic = BTREE_MAGIC;
md->btm_version = xlrec->version;
md->btm_root = xlrec->root;
md->btm_level = xlrec->level;
md->btm_fastroot = xlrec->fastroot;
md->btm_fastlevel = xlrec->fastlevel;
/* Cannot log BTREE_MIN_VERSION index metapage without upgrade */
Assert(md->btm_version >= BTREE_NOVAC_VERSION);
md->btm_last_cleanup_num_delpages = xlrec->last_cleanup_num_delpages;
md->btm_last_cleanup_num_heap_tuples = -1.0;
md->btm_allequalimage = xlrec->allequalimage;
pageop = BTPageGetOpaque(metapg);
pageop->btpo_flags = BTP_META;
/*
* Set pd_lower just past the end of the metadata. This is essential,
* because without doing so, metadata will be lost if xlog.c compresses
* the page.
*/
((PageHeader) metapg)->pd_lower =
((char *) md + sizeof(BTMetaPageData)) - (char *) metapg;
PageSetLSN(metapg, lsn);
MarkBufferDirty(metabuf);
UnlockReleaseBuffer(metabuf);
}
/*
* _bt_clear_incomplete_split -- clear INCOMPLETE_SPLIT flag on a page
*
* This is a common subroutine of the redo functions of all the WAL record
* types that can insert a downlink: insert, split, and newroot.
*/
static void
_bt_clear_incomplete_split(XLogReaderState *record, uint8 block_id)
{
XLogRecPtr lsn = record->EndRecPtr;
Buffer buf;
if (XLogReadBufferForRedo(record, block_id, &buf) == BLK_NEEDS_REDO)
{
Page page = (Page) BufferGetPage(buf);
BTPageOpaque pageop = BTPageGetOpaque(page);
Assert(P_INCOMPLETE_SPLIT(pageop));
pageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
PageSetLSN(page, lsn);
MarkBufferDirty(buf);
}
if (BufferIsValid(buf))
UnlockReleaseBuffer(buf);
}
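
/*
 * btree_xlog_insert -- replay insertion of a single index tuple
 *
 * Covers the _LEAF, _UPPER, _META, and _POST record variants: insertions
 * into internal pages (!isleaf) also finish an incomplete split one level
 * down, ismeta additionally restores the metapage, and posting replays a
 * posting list split.
 */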
static void
btree_xlog_insert(bool isleaf, bool ismeta, bool posting,
XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
Buffer buffer;
Page page;
/*
* Insertion to an internal page finishes an incomplete split at the child
* level. Clear the incomplete-split flag in the child. Note: during
* normal operation, the child and parent pages are locked at the same
* time (the locks are coupled), so that clearing the flag and inserting
* the downlink appear atomic to other backends. We don't bother with
* that during replay, because readers don't care about the
* incomplete-split flag and there cannot be updates happening.
*/
if (!isleaf)
_bt_clear_incomplete_split(record, 1);
if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
Size datalen;
char *datapos = XLogRecGetBlockData(record, 0, &datalen);
page = BufferGetPage(buffer);
if (!posting)
{
/* Simple retail insertion */
if (PageAddItem(page, (Item) datapos, datalen, xlrec->offnum,
false, false) == InvalidOffsetNumber)
elog(PANIC, "failed to add new item");
}
else
{
ItemId itemid;
IndexTuple oposting,
newitem,
nposting;
uint16 postingoff;
/*
* A posting list split occurred during leaf page insertion. WAL
* record data will start with an offset number representing the
* point in an existing posting list that a split occurs at.
*
* Use _bt_swap_posting() to repeat posting list split steps from
* primary. Note that newitem from WAL record is 'orignewitem',
* not the final version of newitem that is actually inserted on
* page.
*/
postingoff = *((uint16 *) datapos);
datapos += sizeof(uint16);
datalen -= sizeof(uint16);
itemid = PageGetItemId(page, OffsetNumberPrev(xlrec->offnum));
oposting = (IndexTuple) PageGetItem(page, itemid);
/* Use mutable, aligned newitem copy in _bt_swap_posting() */
Assert(isleaf && postingoff > 0);
newitem = CopyIndexTuple((IndexTuple) datapos);
nposting = _bt_swap_posting(newitem, oposting, postingoff);
/* Replace existing posting list with post-split version */
memcpy(oposting, nposting, MAXALIGN(IndexTupleSize(nposting)));
/* Insert "final" new item (not orignewitem from WAL stream) */
Assert(IndexTupleSize(newitem) == datalen);
if (PageAddItem(page, (Item) newitem, datalen, xlrec->offnum,
false, false) == InvalidOffsetNumber)
elog(PANIC, "failed to add posting split new item");
}
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
/*
* Note: in normal operation, we'd update the metapage while still holding
* lock on the page we inserted into. But during replay it's not
* necessary to hold that lock, since no other index updates can be
* happening concurrently, and readers will cope fine with following an
* obsolete link from the metapage.
*/
if (ismeta)
_bt_restore_meta(record, 2);
}
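
/*
 * btree_xlog_split -- replay an nbtree page split
 *
 * newitemonleft is true for XLOG_BTREE_SPLIT_L records (the new item was
 * added to the original, left half) and false for XLOG_BTREE_SPLIT_R.
 */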
static void
btree_xlog_split(bool newitemonleft, XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
bool isleaf = (xlrec->level == 0);
Buffer buf;
Buffer rbuf;
Page rpage;
BTPageOpaque ropaque;
char *datapos;
Size datalen;
BlockNumber origpagenumber;
BlockNumber rightpagenumber;
BlockNumber spagenumber;
XLogRecGetBlockTag(record, 0, NULL, NULL, &origpagenumber);
XLogRecGetBlockTag(record, 1, NULL, NULL, &rightpagenumber);
if (!XLogRecGetBlockTagExtended(record, 2, NULL, NULL, &spagenumber, NULL))
spagenumber = P_NONE;
/*
* Clear the incomplete split flag on the appropriate child page one level
* down when origpage/buf is an internal page (there must have been
* cascading page splits during original execution in the event of an
* internal page split). This is like the corresponding btree_xlog_insert
* call for internal pages. We're not clearing the incomplete split flag
* for the current page split here (you can think of this as part of the
* insert of newitem that the page split action needs to perform in
* passing).
*
* Like in btree_xlog_insert, this can be done before locking other pages.
* We never need to couple cross-level locks in REDO routines.
*/
if (!isleaf)
_bt_clear_incomplete_split(record, 3);
/* Reconstruct right (new) sibling page from scratch */
rbuf = XLogInitBufferForRedo(record, 1);
datapos = XLogRecGetBlockData(record, 1, &datalen);
rpage = (Page) BufferGetPage(rbuf);
_bt_pageinit(rpage, BufferGetPageSize(rbuf));
ropaque = BTPageGetOpaque(rpage);
ropaque->btpo_prev = origpagenumber;
ropaque->btpo_next = spagenumber;
ropaque->btpo_level = xlrec->level;
ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
ropaque->btpo_cycleid = 0;
_bt_restore_page(rpage, datapos, datalen);
PageSetLSN(rpage, lsn);
MarkBufferDirty(rbuf);
/* Now reconstruct original page (left half of split) */
if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO)
{
/*
* To retain the same physical order that the tuples had, we
* initialize a temporary empty page for the left page and add all the
* items to that in item number order. This mirrors how _bt_split()
* works. Retaining the same physical order makes WAL consistency
* checking possible. See also _bt_restore_page(), which does the
* same for the right page.
*/
Page origpage = (Page) BufferGetPage(buf);
BTPageOpaque oopaque = BTPageGetOpaque(origpage);
OffsetNumber off;
IndexTuple newitem = NULL,
left_hikey = NULL,
nposting = NULL;
Size newitemsz = 0,
left_hikeysz = 0;
Page leftpage;
OffsetNumber leftoff,
replacepostingoff = InvalidOffsetNumber;
datapos = XLogRecGetBlockData(record, 0, &datalen);
if (newitemonleft || xlrec->postingoff != 0)
{
newitem = (IndexTuple) datapos;
newitemsz = MAXALIGN(IndexTupleSize(newitem));
datapos += newitemsz;
datalen -= newitemsz;
if (xlrec->postingoff != 0)
{
ItemId itemid;
IndexTuple oposting;
/* Posting list must be at offset number before new item's */
replacepostingoff = OffsetNumberPrev(xlrec->newitemoff);
/* Use mutable, aligned newitem copy in _bt_swap_posting() */
newitem = CopyIndexTuple(newitem);
itemid = PageGetItemId(origpage, replacepostingoff);
oposting = (IndexTuple) PageGetItem(origpage, itemid);
nposting = _bt_swap_posting(newitem, oposting,
xlrec->postingoff);
}
}
/*
* Extract left hikey and its size. We assume that 16-bit alignment
* is enough to apply IndexTupleSize (since it's fetching from a
* uint16 field).
*/
left_hikey = (IndexTuple) datapos;
left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
datapos += left_hikeysz;
datalen -= left_hikeysz;
Assert(datalen == 0);
leftpage = PageGetTempPageCopySpecial(origpage);
/* Add high key tuple from WAL record to temp page */
leftoff = P_HIKEY;
if (PageAddItem(leftpage, (Item) left_hikey, left_hikeysz, P_HIKEY,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add high key to left page after split");
leftoff = OffsetNumberNext(leftoff);
for (off = P_FIRSTDATAKEY(oopaque); off < xlrec->firstrightoff; off++)
{
ItemId itemid;
Size itemsz;
IndexTuple item;
/* Add replacement posting list when required */
if (off == replacepostingoff)
{
Assert(newitemonleft ||
xlrec->firstrightoff == xlrec->newitemoff);
if (PageAddItem(leftpage, (Item) nposting,
MAXALIGN(IndexTupleSize(nposting)), leftoff,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add new posting list item to left page after split");
leftoff = OffsetNumberNext(leftoff);
continue; /* don't insert oposting */
}
/* add the new item if it was inserted on left page */
else if (newitemonleft && off == xlrec->newitemoff)
{
if (PageAddItem(leftpage, (Item) newitem, newitemsz, leftoff,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add new item to left page after split");
leftoff = OffsetNumberNext(leftoff);
}
itemid = PageGetItemId(origpage, off);
itemsz = ItemIdGetLength(itemid);
item = (IndexTuple) PageGetItem(origpage, itemid);
if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add old item to left page after split");
leftoff = OffsetNumberNext(leftoff);
}
/* cope with possibility that newitem goes at the end */
if (newitemonleft && off == xlrec->newitemoff)
{
if (PageAddItem(leftpage, (Item) newitem, newitemsz, leftoff,
false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add new item to left page after split");
leftoff = OffsetNumberNext(leftoff);
}
PageRestoreTempPage(leftpage, origpage);
/* Fix opaque fields */
oopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
if (isleaf)
oopaque->btpo_flags |= BTP_LEAF;
oopaque->btpo_next = rightpagenumber;
oopaque->btpo_cycleid = 0;
PageSetLSN(origpage, lsn);
MarkBufferDirty(buf);
}
/* Fix left-link of the page to the right of the new right sibling */
if (spagenumber != P_NONE)
{
Buffer sbuf;
if (XLogReadBufferForRedo(record, 2, &sbuf) == BLK_NEEDS_REDO)
{
Page spage = (Page) BufferGetPage(sbuf);
BTPageOpaque spageop = BTPageGetOpaque(spage);
spageop->btpo_prev = rightpagenumber;
PageSetLSN(spage, lsn);
MarkBufferDirty(sbuf);
}
if (BufferIsValid(sbuf))
UnlockReleaseBuffer(sbuf);
}
/*
* Finally, release the remaining buffers. sbuf, rbuf, and buf must be
* released together, so that readers cannot observe inconsistencies.
*/
UnlockReleaseBuffer(rbuf);
if (BufferIsValid(buf))
UnlockReleaseBuffer(buf);
}
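
/*
 * btree_xlog_dedup -- replay a leaf page deduplication pass
 *
 * Rebuilds the page on a temporary copy, forming posting list tuples from
 * the intervals stored in the WAL record using the same _bt_dedup_*
 * routines that the primary used.
 */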
static void
btree_xlog_dedup(XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
xl_btree_dedup *xlrec = (xl_btree_dedup *) XLogRecGetData(record);
Buffer buf;
if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO)
{
char *ptr = XLogRecGetBlockData(record, 0, NULL);
Page page = (Page) BufferGetPage(buf);
BTPageOpaque opaque = BTPageGetOpaque(page);
OffsetNumber offnum,
minoff,
maxoff;
BTDedupState state;
BTDedupInterval *intervals;
Page newpage;
state = (BTDedupState) palloc(sizeof(BTDedupStateData));
state->deduplicate = true; /* unused */
state->nmaxitems = 0; /* unused */
/* Conservatively use larger maxpostingsize than primary */
state->maxpostingsize = BTMaxItemSize;
state->base = NULL;
state->baseoff = InvalidOffsetNumber;
state->basetupsize = 0;
state->htids = palloc(state->maxpostingsize);
state->nhtids = 0;
state->nitems = 0;
state->phystupsize = 0;
state->nintervals = 0;
minoff = P_FIRSTDATAKEY(opaque);
maxoff = PageGetMaxOffsetNumber(page);
newpage = PageGetTempPageCopySpecial(page);
if (!P_RIGHTMOST(opaque))
{
ItemId itemid = PageGetItemId(page, P_HIKEY);
Size itemsz = ItemIdGetLength(itemid);
IndexTuple item = (IndexTuple) PageGetItem(page, itemid);
if (PageAddItem(newpage, (Item) item, itemsz, P_HIKEY,
false, false) == InvalidOffsetNumber)
elog(ERROR, "deduplication failed to add highkey");
}
intervals = (BTDedupInterval *) ptr;
for (offnum = minoff;
offnum <= maxoff;
offnum = OffsetNumberNext(offnum))
{
ItemId itemid = PageGetItemId(page, offnum);
IndexTuple itup = (IndexTuple) PageGetItem(page, itemid);
if (offnum == minoff)
_bt_dedup_start_pending(state, itup, offnum);
else if (state->nintervals < xlrec->nintervals &&
state->baseoff == intervals[state->nintervals].baseoff &&
state->nitems < intervals[state->nintervals].nitems)
{
if (!_bt_dedup_save_htid(state, itup))
elog(ERROR, "deduplication failed to add heap tid to pending posting list");
}
else
{
_bt_dedup_finish_pending(newpage, state);
_bt_dedup_start_pending(state, itup, offnum);
}
}
_bt_dedup_finish_pending(newpage, state);
Assert(state->nintervals == xlrec->nintervals);
Assert(memcmp(state->intervals, intervals,
state->nintervals * sizeof(BTDedupInterval)) == 0);
if (P_HAS_GARBAGE(opaque))
{
BTPageOpaque nopaque = BTPageGetOpaque(newpage);
nopaque->btpo_flags &= ~BTP_HAS_GARBAGE;
}
PageRestoreTempPage(newpage, page);
PageSetLSN(page, lsn);
MarkBufferDirty(buf);
}
if (BufferIsValid(buf))
UnlockReleaseBuffer(buf);
}
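
/*
 * btree_xlog_updates -- apply posting list updates for VACUUM or simple
 * deletion replay
 *
 * Each xl_btree_update in the array describes heap TIDs to remove from the
 * posting list tuple at the corresponding updatedoffsets[] entry.
 */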
static void
btree_xlog_updates(Page page, OffsetNumber *updatedoffsets,
xl_btree_update *updates, int nupdated)
{
BTVacuumPosting vacposting;
IndexTuple origtuple;
ItemId itemid;
Size itemsz;
for (int i = 0; i < nupdated; i++)
{
itemid = PageGetItemId(page, updatedoffsets[i]);
origtuple = (IndexTuple) PageGetItem(page, itemid);
vacposting = palloc(offsetof(BTVacuumPostingData, deletetids) +
updates->ndeletedtids * sizeof(uint16));
vacposting->updatedoffset = updatedoffsets[i];
vacposting->itup = origtuple;
vacposting->ndeletedtids = updates->ndeletedtids;
memcpy(vacposting->deletetids,
(char *) updates + SizeOfBtreeUpdate,
updates->ndeletedtids * sizeof(uint16));
_bt_update_posting(vacposting);
/* Overwrite updated version of tuple */
itemsz = MAXALIGN(IndexTupleSize(vacposting->itup));
if (!PageIndexTupleOverwrite(page, updatedoffsets[i],
(Item) vacposting->itup, itemsz))
elog(PANIC, "failed to update partially dead item");
pfree(vacposting->itup);
pfree(vacposting);
/* advance to next xl_btree_update from array */
updates = (xl_btree_update *)
((char *) updates + SizeOfBtreeUpdate +
updates->ndeletedtids * sizeof(uint16));
}
}
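
/*
 * btree_xlog_vacuum -- replay VACUUM's tuple deletes/updates on a leaf page
 */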
static void
btree_xlog_vacuum(XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record);
Buffer buffer;
Page page;
BTPageOpaque opaque;
/*
* We need to take a cleanup lock here, just like btvacuumpage(). However,
* it isn't necessary to exhaustively get a cleanup lock on every block in
* the index during recovery (just getting a cleanup lock on pages with
* items to kill suffices). See nbtree/README for details.
*/
if (XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer)
== BLK_NEEDS_REDO)
{
char *ptr = XLogRecGetBlockData(record, 0, NULL);
page = (Page) BufferGetPage(buffer);
if (xlrec->nupdated > 0)
{
OffsetNumber *updatedoffsets;
xl_btree_update *updates;
updatedoffsets = (OffsetNumber *)
(ptr + xlrec->ndeleted * sizeof(OffsetNumber));
updates = (xl_btree_update *) ((char *) updatedoffsets +
xlrec->nupdated *
sizeof(OffsetNumber));
btree_xlog_updates(page, updatedoffsets, updates, xlrec->nupdated);
}
if (xlrec->ndeleted > 0)
PageIndexMultiDelete(page, (OffsetNumber *) ptr, xlrec->ndeleted);
/*
* Clear the vacuum cycle ID, and mark the page as not containing any
* LP_DEAD items
*/
opaque = BTPageGetOpaque(page);
opaque->btpo_cycleid = 0;
opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
}
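
/*
 * btree_xlog_delete -- replay simple deletion of leaf page index tuples
 *
 * Unlike btree_xlog_vacuum, this may generate recovery conflicts with
 * standby queries, but it does not require a cleanup lock.
 */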
static void
btree_xlog_delete(XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
Buffer buffer;
Page page;
BTPageOpaque opaque;
/*
* If we have any conflict processing to do, it must happen before we
* update the page
*/
if (InHotStandby)
{
RelFileLocator rlocator;
XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL);
ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
xlrec->isCatalogRel,
rlocator);
}
/*
* We don't need to take a cleanup lock to apply these changes. See
* nbtree/README for details.
*/
if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
{
char *ptr = XLogRecGetBlockData(record, 0, NULL);
page = (Page) BufferGetPage(buffer);
if (xlrec->nupdated > 0)
{
OffsetNumber *updatedoffsets;
xl_btree_update *updates;
updatedoffsets = (OffsetNumber *)
(ptr + xlrec->ndeleted * sizeof(OffsetNumber));
updates = (xl_btree_update *) ((char *) updatedoffsets +
xlrec->nupdated *
sizeof(OffsetNumber));
btree_xlog_updates(page, updatedoffsets, updates, xlrec->nupdated);
}
if (xlrec->ndeleted > 0)
PageIndexMultiDelete(page, (OffsetNumber *) ptr, xlrec->ndeleted);
/*
* Do *not* clear the vacuum cycle ID, but do mark the page as not
* containing any LP_DEAD items
*/
opaque = BTPageGetOpaque(page);
opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
}
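
/*
 * btree_xlog_mark_page_halfdead -- replay first phase of page deletion
 *
 * Removes the downlink to the to-be-deleted subtree from the parent page,
 * and rewrites the leaf page as half-dead.
 */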
static void
btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
xl_btree_mark_page_halfdead *xlrec = (xl_btree_mark_page_halfdead *) XLogRecGetData(record);
Buffer buffer;
Page page;
BTPageOpaque pageop;
IndexTupleData trunctuple;
/*
* In normal operation, we would lock all the pages this WAL record
* touches before changing any of them. In WAL replay, it should be okay
* to lock just one page at a time, since no concurrent index updates can
* be happening, and readers should not care whether they arrive at the
* target page or not (since it's surely empty).
*/
/* to-be-deleted subtree's parent page */
if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
{
OffsetNumber poffset;
ItemId itemid;
IndexTuple itup;
OffsetNumber nextoffset;
BlockNumber rightsib;
page = (Page) BufferGetPage(buffer);
pageop = BTPageGetOpaque(page);
poffset = xlrec->poffset;
nextoffset = OffsetNumberNext(poffset);
itemid = PageGetItemId(page, nextoffset);
itup = (IndexTuple) PageGetItem(page, itemid);
rightsib = BTreeTupleGetDownLink(itup);
itemid = PageGetItemId(page, poffset);
itup = (IndexTuple) PageGetItem(page, itemid);
BTreeTupleSetDownLink(itup, rightsib);
nextoffset = OffsetNumberNext(poffset);
PageIndexTupleDelete(page, nextoffset);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
/*
* Don't need to couple cross-level locks in REDO routines, so release
* lock on internal page immediately
*/
if (BufferIsValid(buffer))
UnlockReleaseBuffer(buffer);
/* Rewrite the leaf page as a halfdead page */
buffer = XLogInitBufferForRedo(record, 0);
page = (Page) BufferGetPage(buffer);
_bt_pageinit(page, BufferGetPageSize(buffer));
pageop = BTPageGetOpaque(page);
pageop->btpo_prev = xlrec->leftblk;
pageop->btpo_next = xlrec->rightblk;
pageop->btpo_level = 0;
pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
pageop->btpo_cycleid = 0;
/*
* Construct a dummy high key item that points to top parent page (value
* is InvalidBlockNumber when the top parent page is the leaf page itself)
*/
MemSet(&trunctuple, 0, sizeof(IndexTupleData));
trunctuple.t_info = sizeof(IndexTupleData);
BTreeTupleSetTopParent(&trunctuple, xlrec->topparent);
if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
false, false) == InvalidOffsetNumber)
elog(ERROR, "could not add dummy high key to half-dead page");
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
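
/*
 * btree_xlog_unlink_page -- replay second phase of page deletion
 *
 * Unlinks the target page from its siblings and rewrites it as an empty
 * deleted page; may also reinitialize the half-dead leaf page and restore
 * the metapage.
 */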
static void
btree_xlog_unlink_page(uint8 info, XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
xl_btree_unlink_page *xlrec = (xl_btree_unlink_page *) XLogRecGetData(record);
BlockNumber leftsib;
BlockNumber rightsib;
uint32 level;
bool isleaf;
FullTransactionId safexid;
Buffer leftbuf;
Buffer target;
Buffer rightbuf;
Page page;
BTPageOpaque pageop;
leftsib = xlrec->leftsib;
rightsib = xlrec->rightsib;
level = xlrec->level;
isleaf = (level == 0);
safexid = xlrec->safexid;
/* No leaftopparent for level 0 (leaf page) or level 1 target */
Assert(!BlockNumberIsValid(xlrec->leaftopparent) || level > 1);
/*
* In normal operation, we would lock all the pages this WAL record
* touches before changing any of them. In WAL replay, we at least lock
* the pages in the same standard left-to-right order (leftsib, target,
* rightsib), and don't release the sibling locks until the target is
* marked deleted.
*/
/* Fix right-link of left sibling, if any */
if (leftsib != P_NONE)
{
if (XLogReadBufferForRedo(record, 1, &leftbuf) == BLK_NEEDS_REDO)
{
page = (Page) BufferGetPage(leftbuf);
pageop = BTPageGetOpaque(page);
pageop->btpo_next = rightsib;
PageSetLSN(page, lsn);
MarkBufferDirty(leftbuf);
}
}
else
leftbuf = InvalidBuffer;
/* Rewrite target page as empty deleted page */
target = XLogInitBufferForRedo(record, 0);
page = (Page) BufferGetPage(target);
_bt_pageinit(page, BufferGetPageSize(target));
pageop = BTPageGetOpaque(page);
pageop->btpo_prev = leftsib;
pageop->btpo_next = rightsib;
pageop->btpo_level = level;
BTPageSetDeleted(page, safexid);
if (isleaf)
pageop->btpo_flags |= BTP_LEAF;
pageop->btpo_cycleid = 0;
PageSetLSN(page, lsn);
MarkBufferDirty(target);
/* Fix left-link of right sibling */
if (XLogReadBufferForRedo(record, 2, &rightbuf) == BLK_NEEDS_REDO)
{
page = (Page) BufferGetPage(rightbuf);
pageop = BTPageGetOpaque(page);
pageop->btpo_prev = leftsib;
PageSetLSN(page, lsn);
MarkBufferDirty(rightbuf);
}
/* Release siblings */
if (BufferIsValid(leftbuf))
UnlockReleaseBuffer(leftbuf);
if (BufferIsValid(rightbuf))
UnlockReleaseBuffer(rightbuf);
/* Release target */
UnlockReleaseBuffer(target);
/*
* If we deleted a parent of the targeted leaf page, instead of the leaf
* itself, update the leaf to point to the next remaining child in the
* to-be-deleted subtree
*/
if (XLogRecHasBlockRef(record, 3))
{
/*
* There is no real data on the page, so we just re-create it from
* scratch using the information from the WAL record.
*
* Note that we don't end up here when the target page is also the
* leafbuf page. There is no need to add a dummy hikey item with a
* top parent link when deleting leafbuf because it's the last page
* we'll delete in the subtree undergoing deletion.
*/
Buffer leafbuf;
IndexTupleData trunctuple;
Assert(!isleaf);
leafbuf = XLogInitBufferForRedo(record, 3);
page = (Page) BufferGetPage(leafbuf);
_bt_pageinit(page, BufferGetPageSize(leafbuf));
pageop = BTPageGetOpaque(page);
pageop->btpo_flags = BTP_HALF_DEAD | BTP_LEAF;
pageop->btpo_prev = xlrec->leafleftsib;
pageop->btpo_next = xlrec->leafrightsib;
pageop->btpo_level = 0;
pageop->btpo_cycleid = 0;
/* Add a dummy hikey item */
MemSet(&trunctuple, 0, sizeof(IndexTupleData));
trunctuple.t_info = sizeof(IndexTupleData);
BTreeTupleSetTopParent(&trunctuple, xlrec->leaftopparent);
if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
false, false) == InvalidOffsetNumber)
elog(ERROR, "could not add dummy high key to half-dead page");
PageSetLSN(page, lsn);
MarkBufferDirty(leafbuf);
UnlockReleaseBuffer(leafbuf);
}
/* Update metapage if needed */
if (info == XLOG_BTREE_UNLINK_PAGE_META)
_bt_restore_meta(record, 4);
}
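
/*
 * btree_xlog_newroot -- replay creation of a new root page
 */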
static void
btree_xlog_newroot(XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
Buffer buffer;
Page page;
BTPageOpaque pageop;
char *ptr;
Size len;
buffer = XLogInitBufferForRedo(record, 0);
page = (Page) BufferGetPage(buffer);
_bt_pageinit(page, BufferGetPageSize(buffer));
pageop = BTPageGetOpaque(page);
pageop->btpo_flags = BTP_ROOT;
pageop->btpo_prev = pageop->btpo_next = P_NONE;
pageop->btpo_level = xlrec->level;
if (xlrec->level == 0)
pageop->btpo_flags |= BTP_LEAF;
pageop->btpo_cycleid = 0;
if (xlrec->level > 0)
{
ptr = XLogRecGetBlockData(record, 0, &len);
_bt_restore_page(page, ptr, len);
/* Clear the incomplete-split flag in left child */
_bt_clear_incomplete_split(record, 1);
}
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
_bt_restore_meta(record, 2);
}
/*
* In general VACUUM must defer recycling as a way of avoiding certain race
* conditions. Deleted pages contain a safexid value that is used by VACUUM
* to determine whether or not it's safe to place a page that was deleted by
* VACUUM earlier into the FSM now. See nbtree/README.
*
* As far as any backend operating during original execution is concerned, the
* FSM is a cache of recycle-safe pages; the mere presence of the page in the
* FSM indicates that the page must already be safe to recycle (actually,
* _bt_allocbuf() verifies it's safe using BTPageIsRecyclable(), but that's
* just because it would be unwise to completely trust the FSM, given its
* current limitations).
*
* This isn't sufficient to prevent similar concurrent recycling race
* conditions during Hot Standby, though. For that we need to log a
* xl_btree_reuse_page record at the point that a page is actually recycled
* and reused for an entirely unrelated page inside _bt_split(). These
* records include the same safexid value from the original deleted page,
* stored in the record's snapshotConflictHorizon field.
*
* The GlobalVisCheckRemovableFullXid() test in BTPageIsRecyclable() is used
* to determine if it's safe to recycle a page. This mirrors our own test:
* the PGPROC->xmin > limitXmin test inside GetConflictingVirtualXIDs().
* Consequently, one XID value achieves the same exclusion effect on primary
* and standby.
*/
static void
btree_xlog_reuse_page(XLogReaderState *record)
{
xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);
if (InHotStandby)
ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon,
xlrec->isCatalogRel,
xlrec->locator);
}
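
/*
 * btree_redo -- dispatch an nbtree WAL record to the appropriate redo
 * routine
 *
 * Each routine runs in opCtx, which is reset once the record is replayed.
 */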
void
btree_redo(XLogReaderState *record)
{
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
MemoryContext oldCtx;
oldCtx = MemoryContextSwitchTo(opCtx);
switch (info)
{
case XLOG_BTREE_INSERT_LEAF:
btree_xlog_insert(true, false, false, record);
break;
case XLOG_BTREE_INSERT_UPPER:
btree_xlog_insert(false, false, false, record);
break;
case XLOG_BTREE_INSERT_META:
btree_xlog_insert(false, true, false, record);
break;
case XLOG_BTREE_SPLIT_L:
btree_xlog_split(true, record);
break;
case XLOG_BTREE_SPLIT_R:
btree_xlog_split(false, record);
break;
case XLOG_BTREE_INSERT_POST:
btree_xlog_insert(true, false, true, record);
break;
case XLOG_BTREE_DEDUP:
btree_xlog_dedup(record);
break;
case XLOG_BTREE_VACUUM:
btree_xlog_vacuum(record);
break;
case XLOG_BTREE_DELETE:
btree_xlog_delete(record);
break;
case XLOG_BTREE_MARK_PAGE_HALFDEAD:
btree_xlog_mark_page_halfdead(info, record);
break;
case XLOG_BTREE_UNLINK_PAGE:
case XLOG_BTREE_UNLINK_PAGE_META:
btree_xlog_unlink_page(info, record);
break;
case XLOG_BTREE_NEWROOT:
btree_xlog_newroot(record);
break;
case XLOG_BTREE_REUSE_PAGE:
btree_xlog_reuse_page(record);
break;
case XLOG_BTREE_META_CLEANUP:
_bt_restore_meta(record, 0);
break;
default:
elog(PANIC, "btree_redo: unknown op code %u", info);
}
MemoryContextSwitchTo(oldCtx);
MemoryContextReset(opCtx);
}
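
/*
 * btree_xlog_startup/btree_xlog_cleanup -- create and destroy opCtx, the
 * temporary working context used while replaying nbtree records
 */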
void
btree_xlog_startup(void)
{
opCtx = AllocSetContextCreate(CurrentMemoryContext,
"Btree recovery temporary context",
ALLOCSET_DEFAULT_SIZES);
}
void
btree_xlog_cleanup(void)
{
MemoryContextDelete(opCtx);
opCtx = NULL;
}
/*
* Mask a btree page before performing consistency checks on it.
*/
void
btree_mask(char *pagedata, BlockNumber blkno)
{
Page page = (Page) pagedata;
BTPageOpaque maskopaq;
mask_page_lsn_and_checksum(page);
mask_page_hint_bits(page);
mask_unused_space(page);
maskopaq = BTPageGetOpaque(page);
if (P_ISLEAF(maskopaq))
{
/*
* In btree leaf pages, it is possible to modify the LP_FLAGS without
* emitting any WAL record. Hence, mask the line pointer flags. See
* _bt_killitems(), _bt_check_unique() for details.
*/
mask_lp_flags(page);
}
/*
* BTP_HAS_GARBAGE is just an un-logged hint bit. So, mask it. See
* _bt_delete_or_dedup_one_page(), _bt_killitems(), and _bt_check_unique()
* for details.
*/
maskopaq->btpo_flags &= ~BTP_HAS_GARBAGE;
/*
* During replay of a btree page split, we don't set the BTP_SPLIT_END
* flag of the right sibling and initialize the cycle_id to 0 for the same
* page. See btree_xlog_split() for details.
*/
maskopaq->btpo_flags &= ~BTP_SPLIT_END;
maskopaq->btpo_cycleid = 0;
}