1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-30 11:03:19 +03:00

Make btree index structure adjustments and WAL logging changes needed to

support btree compaction, as per proposal of a few days ago.  btree index
pages no longer store parent links, instead they have a level indicator
(counting up from zero for leaf pages).  The FixBTree recovery logic is
removed, and replaced by code that detects missing parent-level insertions
during WAL replay.  Also, generate appropriate WAL entries when updating
btree metapage and when building a btree index from scratch.  I believe
btree indexes are now completely WAL-legal for the first time.
initdb forced due to index and WAL changes.
This commit is contained in:
Tom Lane
2003-02-21 00:06:22 +00:00
parent 4df0f1d26f
commit 70508ba7ae
13 changed files with 2179 additions and 1875 deletions

View File

@ -12,21 +12,17 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.94 2002/11/15 01:26:08 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.95 2003/02/21 00:06:21 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/nbtree.h"
#include "catalog/index.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "storage/sinval.h"
#include "access/xlogutils.h"
/* Working state for btbuild and its callback */
@ -817,396 +813,3 @@ _bt_restscan(IndexScanDesc scan)
ItemPointerSet(current, blkno, offnum);
}
}
static void
_bt_restore_page(Page page, char *from, int len)
{
BTItemData btdata;
Size itemsz;
char *end = from + len;
for (; from < end;)
{
memcpy(&btdata, from, sizeof(BTItemData));
itemsz = IndexTupleDSize(btdata.bti_itup) +
(sizeof(BTItemData) - sizeof(IndexTupleData));
itemsz = MAXALIGN(itemsz);
if (PageAddItem(page, (Item) from, itemsz,
FirstOffsetNumber, LP_USED) == InvalidOffsetNumber)
elog(PANIC, "_bt_restore_page: can't add item to page");
from += itemsz;
}
}
static void
btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_delete *xlrec;
Relation reln;
Buffer buffer;
Page page;
if (!redo || (record->xl_info & XLR_BKP_BLOCK_1))
return;
xlrec = (xl_btree_delete *) XLogRecGetData(record);
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(false, reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
if (!BufferIsValid(buffer))
elog(PANIC, "btree_delete_redo: block unfound");
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(PANIC, "btree_delete_redo: uninitialized page");
if (XLByteLE(lsn, PageGetLSN(page)))
{
UnlockAndReleaseBuffer(buffer);
return;
}
PageIndexTupleDelete(page, ItemPointerGetOffsetNumber(&(xlrec->target.tid)));
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
return;
}
static void
btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_insert *xlrec;
Relation reln;
Buffer buffer;
Page page;
BTPageOpaque pageop;
if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
return;
xlrec = (xl_btree_insert *) XLogRecGetData(record);
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(false, reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
if (!BufferIsValid(buffer))
elog(PANIC, "btree_insert_%sdo: block unfound", (redo) ? "re" : "un");
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(PANIC, "btree_insert_%sdo: uninitialized page", (redo) ? "re" : "un");
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (redo)
{
if (XLByteLE(lsn, PageGetLSN(page)))
{
UnlockAndReleaseBuffer(buffer);
return;
}
if (PageAddItem(page, (Item) ((char *) xlrec + SizeOfBtreeInsert),
record->xl_len - SizeOfBtreeInsert,
ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
LP_USED) == InvalidOffsetNumber)
elog(PANIC, "btree_insert_redo: failed to add item");
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
}
else
{
if (XLByteLT(PageGetLSN(page), lsn))
elog(PANIC, "btree_insert_undo: bad page LSN");
if (!P_ISLEAF(pageop))
{
UnlockAndReleaseBuffer(buffer);
return;
}
elog(PANIC, "btree_insert_undo: unimplemented");
}
return;
}
static void
btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
Relation reln;
BlockNumber blkno;
Buffer buffer;
Page page;
BTPageOpaque pageop;
char *op = (redo) ? "redo" : "undo";
bool isleaf = (record->xl_info & XLOG_BTREE_LEAF);
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
/* Left (original) sibling */
blkno = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
BlockIdGetBlockNumber(&(xlrec->otherblk));
buffer = XLogReadBuffer(false, reln, blkno);
if (!BufferIsValid(buffer))
elog(PANIC, "btree_split_%s: lost left sibling", op);
page = (Page) BufferGetPage(buffer);
if (redo)
_bt_pageinit(page, BufferGetPageSize(buffer));
else if (PageIsNew((PageHeader) page))
elog(PANIC, "btree_split_undo: uninitialized left sibling");
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (redo)
{
pageop->btpo_parent = BlockIdGetBlockNumber(&(xlrec->parentblk));
pageop->btpo_prev = BlockIdGetBlockNumber(&(xlrec->leftblk));
if (onleft)
pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->otherblk));
else
pageop->btpo_next = ItemPointerGetBlockNumber(&(xlrec->target.tid));
pageop->btpo_flags = (isleaf) ? BTP_LEAF : 0;
_bt_restore_page(page, (char *) xlrec + SizeOfBtreeSplit, xlrec->leftlen);
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
}
else
/* undo */
{
if (XLByteLT(PageGetLSN(page), lsn))
elog(PANIC, "btree_split_undo: bad left sibling LSN");
elog(PANIC, "btree_split_undo: unimplemented");
}
/* Right (new) sibling */
blkno = (onleft) ? BlockIdGetBlockNumber(&(xlrec->otherblk)) :
ItemPointerGetBlockNumber(&(xlrec->target.tid));
buffer = XLogReadBuffer((redo) ? true : false, reln, blkno);
if (!BufferIsValid(buffer))
elog(PANIC, "btree_split_%s: lost right sibling", op);
page = (Page) BufferGetPage(buffer);
if (redo)
_bt_pageinit(page, BufferGetPageSize(buffer));
else if (PageIsNew((PageHeader) page))
elog(PANIC, "btree_split_undo: uninitialized right sibling");
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (redo)
{
pageop->btpo_parent = BlockIdGetBlockNumber(&(xlrec->parentblk));
pageop->btpo_prev = (onleft) ?
ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
BlockIdGetBlockNumber(&(xlrec->otherblk));
pageop->btpo_next = BlockIdGetBlockNumber(&(xlrec->rightblk));
pageop->btpo_flags = (isleaf) ? BTP_LEAF : 0;
_bt_restore_page(page,
(char *) xlrec + SizeOfBtreeSplit + xlrec->leftlen,
record->xl_len - SizeOfBtreeSplit - xlrec->leftlen);
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
}
else
/* undo */
{
if (XLByteLT(PageGetLSN(page), lsn))
elog(PANIC, "btree_split_undo: bad right sibling LSN");
elog(PANIC, "btree_split_undo: unimplemented");
}
if (!redo || (record->xl_info & XLR_BKP_BLOCK_1))
return;
/* Right (next) page */
blkno = BlockIdGetBlockNumber(&(xlrec->rightblk));
if (blkno == P_NONE)
return;
buffer = XLogReadBuffer(false, reln, blkno);
if (!BufferIsValid(buffer))
elog(PANIC, "btree_split_redo: lost next right page");
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(PANIC, "btree_split_redo: uninitialized next right page");
if (XLByteLE(lsn, PageGetLSN(page)))
{
UnlockAndReleaseBuffer(buffer);
return;
}
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_prev = (onleft) ?
BlockIdGetBlockNumber(&(xlrec->otherblk)) :
ItemPointerGetBlockNumber(&(xlrec->target.tid));
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
}
static void
btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
Relation reln;
Buffer buffer;
Page page;
BTPageOpaque pageop;
Buffer metabuf;
Page metapg;
BTMetaPageData md;
if (!redo)
return;
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(true, reln, BlockIdGetBlockNumber(&(xlrec->rootblk)));
if (!BufferIsValid(buffer))
elog(PANIC, "btree_newroot_redo: no root page");
metabuf = XLogReadBuffer(false, reln, BTREE_METAPAGE);
if (!BufferIsValid(buffer))
elog(PANIC, "btree_newroot_redo: no metapage");
page = (Page) BufferGetPage(buffer);
_bt_pageinit(page, BufferGetPageSize(buffer));
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_flags |= BTP_ROOT;
pageop->btpo_prev = pageop->btpo_next = P_NONE;
pageop->btpo_parent = BTREE_METAPAGE;
if (record->xl_info & XLOG_BTREE_LEAF)
pageop->btpo_flags |= BTP_LEAF;
if (record->xl_len > SizeOfBtreeNewroot)
_bt_restore_page(page,
(char *) xlrec + SizeOfBtreeNewroot,
record->xl_len - SizeOfBtreeNewroot);
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
metapg = BufferGetPage(metabuf);
_bt_pageinit(metapg, BufferGetPageSize(metabuf));
md.btm_magic = BTREE_MAGIC;
md.btm_version = BTREE_VERSION;
md.btm_root = BlockIdGetBlockNumber(&(xlrec->rootblk));
md.btm_level = xlrec->level;
memcpy((char *) BTPageGetMeta(metapg), (char *) &md, sizeof(md));
pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
pageop->btpo_flags = BTP_META;
PageSetLSN(metapg, lsn);
PageSetSUI(metapg, ThisStartUpID);
UnlockAndWriteBuffer(metabuf);
}
void
btree_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
info &= ~XLOG_BTREE_LEAF;
if (info == XLOG_BTREE_DELETE)
btree_xlog_delete(true, lsn, record);
else if (info == XLOG_BTREE_INSERT)
btree_xlog_insert(true, lsn, record);
else if (info == XLOG_BTREE_SPLIT)
btree_xlog_split(true, false, lsn, record); /* new item on the right */
else if (info == XLOG_BTREE_SPLEFT)
btree_xlog_split(true, true, lsn, record); /* new item on the left */
else if (info == XLOG_BTREE_NEWROOT)
btree_xlog_newroot(true, lsn, record);
else
elog(PANIC, "btree_redo: unknown op code %u", info);
}
void
btree_undo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
info &= ~XLOG_BTREE_LEAF;
if (info == XLOG_BTREE_DELETE)
btree_xlog_delete(false, lsn, record);
else if (info == XLOG_BTREE_INSERT)
btree_xlog_insert(false, lsn, record);
else if (info == XLOG_BTREE_SPLIT)
btree_xlog_split(false, false, lsn, record); /* new item on the right */
else if (info == XLOG_BTREE_SPLEFT)
btree_xlog_split(false, true, lsn, record); /* new item on the left */
else if (info == XLOG_BTREE_NEWROOT)
btree_xlog_newroot(false, lsn, record);
else
elog(PANIC, "btree_undo: unknown op code %u", info);
}
static void
out_target(char *buf, xl_btreetid *target)
{
sprintf(buf + strlen(buf), "node %u/%u; tid %u/%u",
target->node.tblNode, target->node.relNode,
ItemPointerGetBlockNumber(&(target->tid)),
ItemPointerGetOffsetNumber(&(target->tid)));
}
void
btree_desc(char *buf, uint8 xl_info, char *rec)
{
uint8 info = xl_info & ~XLR_INFO_MASK;
info &= ~XLOG_BTREE_LEAF;
if (info == XLOG_BTREE_INSERT)
{
xl_btree_insert *xlrec = (xl_btree_insert *) rec;
strcat(buf, "insert: ");
out_target(buf, &(xlrec->target));
}
else if (info == XLOG_BTREE_DELETE)
{
xl_btree_delete *xlrec = (xl_btree_delete *) rec;
strcat(buf, "delete: ");
out_target(buf, &(xlrec->target));
}
else if (info == XLOG_BTREE_SPLIT || info == XLOG_BTREE_SPLEFT)
{
xl_btree_split *xlrec = (xl_btree_split *) rec;
sprintf(buf + strlen(buf), "split(%s): ",
(info == XLOG_BTREE_SPLIT) ? "right" : "left");
out_target(buf, &(xlrec->target));
sprintf(buf + strlen(buf), "; oth %u; rgh %u",
BlockIdGetBlockNumber(&xlrec->otherblk),
BlockIdGetBlockNumber(&xlrec->rightblk));
}
else if (info == XLOG_BTREE_NEWROOT)
{
xl_btree_newroot *xlrec = (xl_btree_newroot *) rec;
sprintf(buf + strlen(buf), "root: node %u/%u; blk %u",
xlrec->node.tblNode, xlrec->node.relNode,
BlockIdGetBlockNumber(&xlrec->rootblk));
}
else
strcat(buf, "UNKNOWN");
}