diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index f339a9cfcd1..6082af2fffe 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.146.2.3 2008/06/11 08:42:35 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.146.2.4 2010/08/29 19:33:43 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -60,9 +60,8 @@ static OffsetNumber _bt_findsplitloc(Relation rel, Page page, static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright, int leftfree, int rightfree, bool newitemonleft, Size firstrightitemsz); -static void _bt_pgaddtup(Relation rel, Page page, - Size itemsize, IndexTuple itup, - OffsetNumber itup_off, const char *where); +static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup, + OffsetNumber itup_off); static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey); static void _bt_vacuum_one_page(Relation rel, Buffer buffer); @@ -593,7 +592,9 @@ _bt_insertonpg(Relation rel, /* Do the update. No ereport(ERROR) until changes are logged */ START_CRIT_SECTION(); - _bt_pgaddtup(rel, page, itemsz, itup, newitemoff, "page"); + if (!_bt_pgaddtup(page, itemsz, itup, newitemoff)) + elog(PANIC, "failed to add new item to block %u in index \"%s\"", + itup_blkno, RelationGetRelationName(rel)); MarkBufferDirty(buf); @@ -719,6 +720,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, Page origpage; Page leftpage, rightpage; + BlockNumber origpagenumber, + rightpagenumber; BTPageOpaque ropaque, lopaque, oopaque; @@ -735,11 +738,27 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, OffsetNumber maxoff; OffsetNumber i; + /* Acquire a new page to split into */ rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); + + /* + * origpage is the original page to be split. leftpage is a temporary + * buffer that receives the left-sibling data, which will be copied back + * into origpage on success. rightpage is the new page that receives + * the right-sibling data. If we fail before reaching the critical + * section, origpage hasn't been modified and leftpage is only workspace. + * In principle we shouldn't need to worry about rightpage either, + * because it hasn't been linked into the btree page structure; but to + * avoid leaving possibly-confusing junk behind, we are careful to rewrite + * rightpage as zeroes before throwing any error. + */ origpage = BufferGetPage(buf); leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData)); rightpage = BufferGetPage(rbuf); + origpagenumber = BufferGetBlockNumber(buf); + rightpagenumber = BufferGetBlockNumber(rbuf); + _bt_pageinit(leftpage, BufferGetPageSize(buf)); /* rightpage was already initialized by _bt_getbuf */ @@ -754,8 +773,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE); ropaque->btpo_flags = lopaque->btpo_flags; lopaque->btpo_prev = oopaque->btpo_prev; - lopaque->btpo_next = BufferGetBlockNumber(rbuf); - ropaque->btpo_prev = BufferGetBlockNumber(buf); + lopaque->btpo_next = rightpagenumber; + ropaque->btpo_prev = origpagenumber; ropaque->btpo_next = oopaque->btpo_next; lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level; /* Since we already have write-lock on both pages, ok to read cycleid */ @@ -778,9 +797,12 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, item = (IndexTuple) PageGetItem(origpage, itemid); if (PageAddItem(rightpage, (Item) item, itemsz, rightoff, LP_USED) == InvalidOffsetNumber) - elog(PANIC, "failed to add hikey to the right sibling" + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add hikey to the right sibling" " while splitting block %u of index \"%s\"", - BufferGetBlockNumber(buf), RelationGetRelationName(rel)); + origpagenumber, RelationGetRelationName(rel)); + } rightoff = OffsetNumberNext(rightoff); } @@ -805,9 +827,12 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, } if (PageAddItem(leftpage, (Item) item, itemsz, leftoff, LP_USED) == InvalidOffsetNumber) - elog(PANIC, "failed to add hikey to the left sibling" + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add hikey to the left sibling" " while splitting block %u of index \"%s\"", - BufferGetBlockNumber(buf), RelationGetRelationName(rel)); + origpagenumber, RelationGetRelationName(rel)); + } leftoff = OffsetNumberNext(leftoff); /* @@ -826,18 +851,28 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, { if (newitemonleft) { - _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff, - "left sibling"); + if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add new item to the left sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } itup_off = leftoff; - itup_blkno = BufferGetBlockNumber(buf); + itup_blkno = origpagenumber; leftoff = OffsetNumberNext(leftoff); } else { - _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff, - "right sibling"); + if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add new item to the right sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } itup_off = rightoff; - itup_blkno = BufferGetBlockNumber(rbuf); + itup_blkno = rightpagenumber; rightoff = OffsetNumberNext(rightoff); } } @@ -845,14 +880,24 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, /* decide which page to put it on */ if (i < firstright) { - _bt_pgaddtup(rel, leftpage, itemsz, item, leftoff, - "left sibling"); + if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add old item to the left sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } leftoff = OffsetNumberNext(leftoff); } else { - _bt_pgaddtup(rel, rightpage, itemsz, item, rightoff, - "right sibling"); + if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add old item to the right sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } rightoff = OffsetNumberNext(rightoff); } } @@ -862,18 +907,28 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, { if (newitemonleft) { - _bt_pgaddtup(rel, leftpage, newitemsz, newitem, leftoff, - "left sibling"); + if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add new item to the left sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } itup_off = leftoff; - itup_blkno = BufferGetBlockNumber(buf); + itup_blkno = origpagenumber; leftoff = OffsetNumberNext(leftoff); } else { - _bt_pgaddtup(rel, rightpage, newitemsz, newitem, rightoff, - "right sibling"); + if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff)) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "failed to add new item to the right sibling" + " while splitting block %u of index \"%s\"", + origpagenumber, RelationGetRelationName(rel)); + } itup_off = rightoff; - itup_blkno = BufferGetBlockNumber(rbuf); + itup_blkno = rightpagenumber; rightoff = OffsetNumberNext(rightoff); } } @@ -886,16 +941,19 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * neighbors. */ - if (!P_RIGHTMOST(ropaque)) + if (!P_RIGHTMOST(oopaque)) { - sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE); + sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE); spage = BufferGetPage(sbuf); sopaque = (BTPageOpaque) PageGetSpecialPointer(spage); - if (sopaque->btpo_prev != ropaque->btpo_prev) - elog(PANIC, "right sibling's left-link doesn't match: " + if (sopaque->btpo_prev != origpagenumber) + { + memset(rightpage, 0, BufferGetPageSize(rbuf)); + elog(ERROR, "right sibling's left-link doesn't match: " "block %u links to %u instead of expected %u in index \"%s\"", - ropaque->btpo_next, sopaque->btpo_prev, ropaque->btpo_prev, + oopaque->btpo_next, sopaque->btpo_prev, origpagenumber, RelationGetRelationName(rel)); + } /* * Check to see if we can set the SPLIT_END flag in the right-hand @@ -920,8 +978,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * * NO EREPORT(ERROR) till right sibling is updated. We can get away with * not starting the critical section till here because we haven't been - * scribbling on the original page yet, and we don't care about the new - * sibling until it's linked into the btree. + * scribbling on the original page yet; see comments above. */ START_CRIT_SECTION(); @@ -930,7 +987,7 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, if (!P_RIGHTMOST(ropaque)) { - sopaque->btpo_prev = BufferGetBlockNumber(rbuf); + sopaque->btpo_prev = rightpagenumber; MarkBufferDirty(sbuf); } @@ -945,9 +1002,9 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, xlrec.target.node = rel->rd_node; ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off); if (newitemonleft) - xlrec.otherblk = BufferGetBlockNumber(rbuf); + xlrec.otherblk = rightpagenumber; else - xlrec.otherblk = BufferGetBlockNumber(buf); + xlrec.otherblk = origpagenumber; xlrec.leftblk = lopaque->btpo_prev; xlrec.rightblk = ropaque->btpo_next; xlrec.level = lopaque->btpo.level; @@ -1014,7 +1071,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright, * (in the page management code) that the center of a page always be * clean, and the most efficient way to guarantee this is just to compact * the data by reinserting it into a new left page. (XXX the latter - * comment is probably obsolete.) + * comment is probably obsolete; but in any case it's good to not scribble + * on the original page until we enter the critical section.) * * It's a bit weird that we don't fill in the left page till after writing * the XLOG entry, but not really worth changing. Note that we use the @@ -1648,13 +1706,11 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) * we insert the tuples in order, so that the given itup_off does * represent the final position of the tuple! */ -static void -_bt_pgaddtup(Relation rel, - Page page, +static bool +_bt_pgaddtup(Page page, Size itemsize, IndexTuple itup, - OffsetNumber itup_off, - const char *where) + OffsetNumber itup_off) { BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page); IndexTupleData trunctuple; @@ -1669,8 +1725,9 @@ _bt_pgaddtup(Relation rel, if (PageAddItem(page, (Item) itup, itemsize, itup_off, LP_USED) == InvalidOffsetNumber) - elog(PANIC, "failed to add item to the %s in index \"%s\"", - where, RelationGetRelationName(rel)); + return false; + + return true; } /* diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c index 6bf5fb70588..911db47d510 100644 --- a/src/backend/access/nbtree/nbtpage.c +++ b/src/backend/access/nbtree/nbtpage.c @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtpage.c,v 1.101.2.1 2007/12/31 04:52:20 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtpage.c,v 1.101.2.2 2010/08/29 19:33:43 tgl Exp $ * * NOTES * Postgres btree pages look like ordinary relation pages. The opaque @@ -1027,6 +1027,13 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack, bool vacuum_full) */ rightsib = opaque->btpo_next; rbuf = _bt_getbuf(rel, rightsib, BT_WRITE); + page = BufferGetPage(rbuf); + opaque = (BTPageOpaque) PageGetSpecialPointer(page); + if (opaque->btpo_prev != target) + elog(ERROR, "right sibling's left-link doesn't match: " + "block %u links to %u instead of expected %u in index \"%s\"", + rightsib, opaque->btpo_prev, target, + RelationGetRelationName(rel)); /* * Next find and write-lock the current parent of the target page. This is @@ -1104,6 +1111,38 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack, bool vacuum_full) } } + /* + * Check that the parent-page index items we're about to delete/overwrite + * contain what we expect. This can fail if the index has become + * corrupt for some reason. We want to throw any error before entering + * the critical section --- otherwise it'd be a PANIC. + * + * The test on the target item is just an Assert because _bt_getstackbuf + * should have guaranteed it has the expected contents. The test on the + * next-child downlink is known to sometimes fail in the field, though. + */ + page = BufferGetPage(pbuf); + opaque = (BTPageOpaque) PageGetSpecialPointer(page); + +#ifdef USE_ASSERT_CHECKING + itemid = PageGetItemId(page, poffset); + itup = (IndexTuple) PageGetItem(page, itemid); + Assert(ItemPointerGetBlockNumber(&(itup->t_tid)) == target); +#endif + + if (!parent_half_dead) + { + OffsetNumber nextoffset; + + nextoffset = OffsetNumberNext(poffset); + itemid = PageGetItemId(page, nextoffset); + itup = (IndexTuple) PageGetItem(page, itemid); + if (ItemPointerGetBlockNumber(&(itup->t_tid)) != rightsib) + elog(ERROR, "right sibling %u of block %u is not next child %u of block %u in index \"%s\"", + rightsib, target, ItemPointerGetBlockNumber(&(itup->t_tid)), + parent, RelationGetRelationName(rel)); + } + /* * Here we begin doing the deletion. */ @@ -1117,8 +1156,6 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack, bool vacuum_full) * to copy the right sibling's downlink over the target downlink, and then * delete the following item. */ - page = BufferGetPage(pbuf); - opaque = (BTPageOpaque) PageGetSpecialPointer(page); if (parent_half_dead) { PageIndexTupleDelete(page, poffset); @@ -1130,23 +1167,16 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack, bool vacuum_full) itemid = PageGetItemId(page, poffset); itup = (IndexTuple) PageGetItem(page, itemid); - Assert(ItemPointerGetBlockNumber(&(itup->t_tid)) == target); ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY); nextoffset = OffsetNumberNext(poffset); - /* This part is just for double-checking */ - itemid = PageGetItemId(page, nextoffset); - itup = (IndexTuple) PageGetItem(page, itemid); - if (ItemPointerGetBlockNumber(&(itup->t_tid)) != rightsib) - elog(PANIC, "right sibling %u of block %u is not next child of %u in index \"%s\"", - rightsib, target, BufferGetBlockNumber(pbuf), - RelationGetRelationName(rel)); PageIndexTupleDelete(page, nextoffset); } /* * Update siblings' side-links. Note the target page's side-links will - * continue to point to the siblings. + * continue to point to the siblings. Asserts here are just rechecking + * things we already verified above. */ if (BufferIsValid(lbuf)) {