diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index dc17ceab111..8f23e16992a 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.76 2001/01/24 19:42:48 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.77 2001/01/26 01:24:31 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -34,7 +34,9 @@ typedef struct int best_delta; /* best size delta so far */ } FindSplitData; -void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf); +Buffer _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release); + +static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf); static TransactionId _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel, Buffer buf, @@ -44,6 +46,8 @@ static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf, int keysz, ScanKey scankey, BTItem btitem, OffsetNumber afteritem); +static void _bt_insertuple(Relation rel, Buffer buf, + Size itemsz, BTItem btitem, OffsetNumber newitemoff); static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, OffsetNumber newitemoff, Size newitemsz, BTItem newitem, bool newitemonleft, @@ -456,9 +460,14 @@ _bt_insertonpg(Relation rel, if (is_root) { + Buffer rootbuf; + Assert(stack == (BTStack) NULL); /* create a new root node and release the split buffers */ - _bt_newroot(rel, buf, rbuf); + rootbuf = _bt_newroot(rel, buf, rbuf); + _bt_wrtbuf(rel, rootbuf); + _bt_wrtbuf(rel, rbuf); + _bt_wrtbuf(rel, buf); } else { @@ -519,52 +528,11 @@ _bt_insertonpg(Relation rel, } else { - START_CRIT_SECTION(); - _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page"); itup_off = newitemoff; itup_blkno = BufferGetBlockNumber(buf); - /* XLOG stuff */ - { - xl_btree_insert xlrec; - uint8 flag = XLOG_BTREE_INSERT; - XLogRecPtr recptr; - XLogRecData rdata[2]; - BTItemData truncitem; - xlrec.target.node = rel->rd_node; - ItemPointerSet(&(xlrec.target.tid), BufferGetBlockNumber(buf), newitemoff); - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char*)&xlrec; - rdata[0].len = SizeOfBtreeInsert; - rdata[0].next = &(rdata[1]); + _bt_insertuple(rel, buf, itemsz, btitem, newitemoff); - /* Read comments in _bt_pgaddtup */ - if (!(P_ISLEAF(lpageop)) && newitemoff == P_FIRSTDATAKEY(lpageop)) - { - truncitem = *btitem; - truncitem.bti_itup.t_info = sizeof(BTItemData); - rdata[1].data = (char*)&truncitem; - rdata[1].len = sizeof(BTItemData); - } - else - { - rdata[1].data = (char*)btitem; - rdata[1].len = IndexTupleDSize(btitem->bti_itup) + - (sizeof(BTItemData) - sizeof(IndexTupleData)); - } - rdata[1].buffer = buf; - rdata[1].next = NULL; - - if (P_ISLEAF(lpageop)) - flag |= XLOG_BTREE_LEAF; - - recptr = XLogInsert(RM_BTREE_ID, flag, rdata); - - PageSetLSN(page, recptr); - PageSetSUI(page, ThisStartUpID); - } - - END_CRIT_SECTION(); /* Write out the updated page and release pin/lock */ _bt_wrtbuf(rel, buf); } @@ -576,6 +544,57 @@ _bt_insertonpg(Relation rel, return res; } +static void +_bt_insertuple(Relation rel, Buffer buf, + Size itemsz, BTItem btitem, OffsetNumber newitemoff) +{ + Page page = BufferGetPage(buf); + BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); + + START_CRIT_SECTION(); + _bt_pgaddtup(rel, page, itemsz, btitem, newitemoff, "page"); + /* XLOG stuff */ + { + xl_btree_insert xlrec; + uint8 flag = XLOG_BTREE_INSERT; + XLogRecPtr recptr; + XLogRecData rdata[2]; + BTItemData truncitem; + xlrec.target.node = rel->rd_node; + ItemPointerSet(&(xlrec.target.tid), BufferGetBlockNumber(buf), newitemoff); + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char*)&xlrec; + rdata[0].len = SizeOfBtreeInsert; + rdata[0].next = &(rdata[1]); + + /* Read comments in _bt_pgaddtup */ + if (!(P_ISLEAF(pageop)) && newitemoff == P_FIRSTDATAKEY(pageop)) + { + truncitem = *btitem; + truncitem.bti_itup.t_info = sizeof(BTItemData); + rdata[1].data = (char*)&truncitem; + rdata[1].len = sizeof(BTItemData); + } + else + { + rdata[1].data = (char*)btitem; + rdata[1].len = IndexTupleDSize(btitem->bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + } + rdata[1].buffer = buf; + rdata[1].next = NULL; + if (P_ISLEAF(pageop)) + flag |= XLOG_BTREE_LEAF; + + recptr = XLogInsert(RM_BTREE_ID, flag, rdata); + + PageSetLSN(page, recptr); + PageSetSUI(page, ThisStartUpID); + } + + END_CRIT_SECTION(); +} + /* * _bt_split() -- split a page in the btree. * @@ -1130,11 +1149,12 @@ _bt_getstackbuf(Relation rel, BTStack stack) * graph. * * On entry, lbuf (the old root) and rbuf (its new peer) are write- - * locked. On exit, a new root page exists with entries for the - * two new children. The new root page is neither pinned nor locked, and - * we have also written out lbuf and rbuf and dropped their pins/locks. + * locked. On exit, a new root page exists with entries for the + * two new children, metapage is updated and unlocked/unpinned. + * The new root buffer is returned to caller which has to unlock/unpin + * lbuf, rbuf & rootbuf. */ -void +static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) { Buffer rootbuf; @@ -1257,13 +1277,156 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) } END_CRIT_SECTION(); - /* write and let go of the new root buffer */ - _bt_wrtbuf(rel, rootbuf); + /* write and let go of metapage buffer */ _bt_wrtbuf(rel, metabuf); - /* update and release new sibling, and finally the old root */ - _bt_wrtbuf(rel, rbuf); - _bt_wrtbuf(rel, lbuf); + return(rootbuf); +} + +/* + * In the event old root page was splitted but no new one was created we + * build required parent levels keeping write lock on old root page. + * Note: it's assumed that old root page' btpo_parent points to meta page, + * ie not to parent page. On exit, new root page buffer is write locked. + * If "release" is TRUE then oldrootbuf will be released immediately + * after upper level is builded. + */ +Buffer +_bt_fixroot(Relation rel, Buffer oldrootbuf, bool release) +{ + Buffer rootbuf; + BlockNumber rootblk; + Page rootpage; + XLogRecPtr rootLSN; + Page oldrootpage = BufferGetPage(oldrootbuf); + BTPageOpaque oldrootopaque = (BTPageOpaque) + PageGetSpecialPointer(oldrootpage); + Buffer buf, leftbuf, rightbuf; + Page page, leftpage, rightpage; + BTPageOpaque opaque, leftopaque, rightopaque; + OffsetNumber newitemoff; + BTItem btitem, ritem; + Size itemsz; + + if (! P_LEFTMOST(oldrootopaque) || P_RIGHTMOST(oldrootopaque)) + elog(ERROR, "bt_fixroot: not valid old root page"); + + /* Read right neighbor and create new root page*/ + leftbuf = _bt_getbuf(rel, oldrootopaque->btpo_next, BT_WRITE); + leftpage = BufferGetPage(leftbuf); + leftopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage); + rootbuf = _bt_newroot(rel, oldrootbuf, leftbuf); + rootpage = BufferGetPage(rootbuf); + rootLSN = PageGetLSN(rootpage); + rootblk = BufferGetBlockNumber(rootbuf); + + /* + * Update LSN & StartUpID of old root buffer and its neighbor to + * ensure that they will be written on disk after logging new + * root creation. Unfortunately, for the moment (?) we do not + * log this operation and so possibly break our rule to log entire + * page content of first after checkpoint modification. + */ + HOLD_INTERRUPTS(); + oldrootopaque->btpo_parent = rootblk; + leftopaque->btpo_parent = rootblk; + PageSetLSN(oldrootpage, rootLSN); + PageSetSUI(oldrootpage, ThisStartUpID); + PageSetLSN(leftpage, rootLSN); + PageSetSUI(leftpage, ThisStartUpID); + RESUME_INTERRUPTS(); + + /* parent page where to insert pointers */ + buf = rootbuf; + page = BufferGetPage(buf); + opaque = (BTPageOpaque) PageGetSpecialPointer(page); + + /* + * Now read other pages (if any) on level and add them to new root. + * If concurrent process will split one of pages on this level then it + * will notice either btpo_parent == metablock or btpo_parent == rootblk. + * In first case it will give up its locks and try to lock leftmost page + * buffer (oldrootbuf) to fix root - ie it will wait for us and let us + * continue. In second case it will try to lock rootbuf keeping its locks + * on buffers we already passed, also waiting for us. If we'll have to + * unlock rootbuf (split it) and that process will have to split page + * of new level we created (level of rootbuf) then it will wait while + * we create upper level. Etc. + */ + while(! P_RIGHTMOST(leftopaque)) + { + rightbuf = _bt_getbuf(rel, leftopaque->btpo_next, BT_WRITE); + rightpage = BufferGetPage(rightbuf); + rightopaque = (BTPageOpaque) PageGetSpecialPointer(rightpage); + + /* Update LSN & StartUpID (see comments above) */ + HOLD_INTERRUPTS(); + rightopaque->btpo_parent = rootblk; + if (XLByteLT(PageGetLSN(rightpage), rootLSN)) + PageSetLSN(rightpage, rootLSN); + PageSetSUI(rightpage, ThisStartUpID); + RESUME_INTERRUPTS(); + + ritem = (BTItem) PageGetItem(leftpage, PageGetItemId(leftpage, P_HIKEY)); + btitem = _bt_formitem(&(ritem->bti_itup)); + ItemPointerSet(&(btitem->bti_itup.t_tid), leftopaque->btpo_next, P_HIKEY); + itemsz = IndexTupleDSize(btitem->bti_itup) + + (sizeof(BTItemData) - sizeof(IndexTupleData)); + itemsz = MAXALIGN(itemsz); + + newitemoff = OffsetNumberNext(PageGetMaxOffsetNumber(page)); + + if (PageGetFreeSpace(page) < itemsz) + { + Buffer newbuf; + OffsetNumber firstright; + OffsetNumber itup_off; + BlockNumber itup_blkno; + bool newitemonleft; + + firstright = _bt_findsplitloc(rel, page, + newitemoff, itemsz, &newitemonleft); + newbuf = _bt_split(rel, buf, firstright, + newitemoff, itemsz, btitem, newitemonleft, + &itup_off, &itup_blkno); + /* Keep lock on new "root" buffer ! */ + if (buf != rootbuf) + _bt_relbuf(rel, buf, BT_WRITE); + buf = newbuf; + page = BufferGetPage(buf); + opaque = (BTPageOpaque) PageGetSpecialPointer(page); + } + else + _bt_insertuple(rel, buf, itemsz, btitem, newitemoff); + + /* give up left buffer */ + _bt_relbuf(rel, leftbuf, BT_WRITE); + leftbuf = rightbuf; + leftpage = rightpage; + leftopaque = rightopaque; + } + + /* give up rightmost page buffer */ + _bt_relbuf(rel, leftbuf, BT_WRITE); + + /* + * Here we hold locks on old root buffer, new root buffer we've + * created with _bt_newroot() - rootbuf, - and buf we've used + * for last insert ops - buf. If rootbuf != buf then we have to + * create at least one more level. And if "release" is TRUE + * (ie we've already created some levels) then we give up + * oldrootbuf. + */ + if (release) + _bt_relbuf(rel, oldrootbuf, BT_WRITE); + + if (rootbuf != buf) + { + _bt_relbuf(rel, buf, BT_WRITE); + return(_bt_fixroot(rel, rootbuf, true)); + } + + return(rootbuf); } /* diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c index ad9d69e13d3..0f68a066dc7 100644 --- a/src/backend/access/nbtree/nbtpage.c +++ b/src/backend/access/nbtree/nbtpage.c @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.47 2001/01/24 19:42:48 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.48 2001/01/26 01:24:31 vadim Exp $ * * NOTES * Postgres btree pages look like ordinary relation pages. The opaque @@ -28,6 +28,8 @@ #include "miscadmin.h" #include "storage/lmgr.h" +extern bool FixBTree; /* comments in nbtree.c */ +extern Buffer _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release); /* * We use high-concurrency locking on btrees. There are two cases in @@ -237,7 +239,58 @@ _bt_getroot(Relation rel, int access) if (! P_ISROOT(rootopaque)) { - /* it happened, try again */ + /* + * It happened, but if root page splitter failed to create + * new root page then we'll go in loop trying to call + * _bt_getroot again and again. + */ + if (FixBTree) + { + Buffer newrootbuf; + +check_parent:; + if (rootopaque->btpo_parent == BTREE_METAPAGE) /* unupdated! */ + { + LockBuffer(rootbuf, BUFFER_LOCK_UNLOCK); + LockBuffer(rootbuf, BT_WRITE); + + /* handle concurrent fix of root page */ + if (rootopaque->btpo_parent == BTREE_METAPAGE) /* unupdated! */ + { + newrootbuf = _bt_fixroot(rel, rootbuf, true); + LockBuffer(newrootbuf, BUFFER_LOCK_UNLOCK); + LockBuffer(newrootbuf, BT_READ); + rootbuf = newrootbuf; + rootpage = BufferGetPage(rootbuf); + rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage); + /* New root might be splitted while changing lock */ + if (P_ISROOT(rootopaque)) + return(rootbuf); + /* rootbuf is read locked */ + goto check_parent; + } + else /* someone else already fixed root */ + { + LockBuffer(rootbuf, BUFFER_LOCK_UNLOCK); + LockBuffer(rootbuf, BT_READ); + } + } + /* + * Ok, here we have old root page with btpo_parent pointing + * to upper level - check parent page because of there is + * good chance that parent is root page. + */ + newrootbuf = _bt_getbuf(rel, rootopaque->btpo_parent, BT_READ); + _bt_relbuf(rel, rootbuf, BT_READ); + rootbuf = newrootbuf; + rootpage = BufferGetPage(rootbuf); + rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage); + if (P_ISROOT(rootopaque)) + return(rootbuf); + /* no luck -:( */ + } + + /* try again */ _bt_relbuf(rel, rootbuf, BT_READ); return _bt_getroot(rel, access); } diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index 7b2f7fa7d98..8685975edf5 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -12,7 +12,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.75 2001/01/24 19:42:48 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.76 2001/01/26 01:24:31 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -26,13 +26,18 @@ #include "executor/executor.h" #include "miscadmin.h" #include "storage/sinval.h" - +#include "access/xlogutils.h" bool BuildingBtree = false; /* see comment in btbuild() */ -bool FastBuild = true; /* use sort/build instead of insertion - * build */ +bool FastBuild = true; /* use sort/build instead */ + /* of insertion build */ -#include "access/xlogutils.h" + +/* + * TEMPORARY FLAG FOR TESTING NEW FIX TREE + * CODE WITHOUT AFFECTING ANYONE ELSE + */ +bool FixBTree = false; static void _bt_restscan(IndexScanDesc scan);