1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-01 21:31:19 +03:00

Fix duplicates handling.

This commit is contained in:
Vadim B. Mikheev
1997-05-30 18:35:40 +00:00
parent 43b6f1e678
commit 3f5834fb8c
4 changed files with 531 additions and 109 deletions

View File

@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.12 1997/04/16 01:48:11 vadim Exp $
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.13 1997/05/30 18:35:31 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -28,13 +28,14 @@
#endif
static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem afteritem);
static Buffer _bt_split(Relation rel, Buffer buf);
static Buffer _bt_split(Relation rel, Buffer buf, BTItem hiRightItem);
static OffsetNumber _bt_findsplitloc(Relation rel, Page page, OffsetNumber start, OffsetNumber maxoff, Size llimit);
static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
static OffsetNumber _bt_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, BTItem btitem, BTItem afteritem);
static bool _bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem);
static void _bt_updateitem(Relation rel, Size keysz, Buffer buf, BTItem oldItem, BTItem newItem);
static bool _bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey);
static InsertIndexResult _bt_shift (Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem hikey);
/*
* _bt_doinsert() -- Handle insertion of a single btitem in the tree.
@@ -225,31 +226,152 @@ _bt_insertonpg(Relation rel,
Buffer rbuf;
Buffer pbuf;
Page rpage;
ScanKey newskey;
BTItem ritem;
BTPageOpaque lpageop;
BTPageOpaque rpageop;
BlockNumber rbknum, itup_blkno;
OffsetNumber itup_off;
int itemsz;
InsertIndexResult newres;
BTItem new_item = (BTItem) NULL;
BTItem lowLeftItem;
OffsetNumber leftmost_offset;
Page ppage;
BTPageOpaque ppageop;
BlockNumber bknum;
page = BufferGetPage(buf);
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
itemsz = IndexTupleDSize(btitem->bti_itup)
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
itemsz = DOUBLEALIGN(itemsz); /* be safe, PageAddItem will do this
but we need to be consistent */
/*
* If we have to insert item on the leftmost page which is the first
* page in the chain of duplicates then:
* 1. if scankey == hikey (i.e. - new duplicate item) then
* insert it here;
* 2. if scankey < hikey then we grab new page, copy current page
* content there and insert new item on the current page.
*/
if ( lpageop->btpo_flags & BTP_CHAIN )
{
OffsetNumber maxoff = PageGetMaxOffsetNumber (page);
ItemId hitemid;
BTItem hitem;
Assert ( !P_RIGHTMOST(lpageop) );
hitemid = PageGetItemId(page, P_HIKEY);
hitem = (BTItem) PageGetItem(page, hitemid);
if ( maxoff > P_HIKEY &&
!_bt_itemcmp (rel, keysz, hitem,
(BTItem) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY)),
BTEqualStrategyNumber) )
elog (FATAL, "btree: bad key on the page in the chain of duplicates");
if ( !_bt_skeycmp (rel, keysz, scankey, page, hitemid,
BTEqualStrategyNumber) )
{
if ( !P_LEFTMOST(lpageop) )
elog (FATAL, "btree: attempt to insert bad key on the non-leftmost page in the chain of duplicates");
if ( !_bt_skeycmp (rel, keysz, scankey, page, hitemid,
BTLessStrategyNumber) )
elog (FATAL, "btree: attempt to insert higher key on the leftmost page in the chain of duplicates");
return (_bt_shift(rel, buf, stack, keysz, scankey, btitem, hitem));
}
}
if (PageGetFreeSpace(page) < itemsz) {
if (PageGetFreeSpace(page) < itemsz)
{
BlockNumber bknum = BufferGetBlockNumber(buf);
BTItem lowLeftItem;
BTItem hiRightItem = NULL;
/*
* If we have to split leaf page in the chain of duplicates
* then we try to move righter to avoid splitting.
*/
if ( ( lpageop->btpo_flags & BTP_CHAIN ) &&
( lpageop->btpo_flags & BTP_LEAF ) )
{
bool use_left = true;
for ( ; ; )
{
bool keys_equal = false;
rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
rpage = BufferGetPage(rbuf);
rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
if ( P_RIGHTMOST (rpageop) )
{
Assert ( !( rpageop->btpo_flags & BTP_CHAIN ) );
use_left = false;
break;
}
/*
* If we have the same hikey here then it's
* yet another page in chain and we may move
* even righter.
*/
if ( _bt_skeycmp (rel, keysz, scankey, rpage,
PageGetItemId(rpage, P_HIKEY),
BTEqualStrategyNumber) )
{
if ( !( rpageop->btpo_flags & BTP_CHAIN ) )
elog (FATAL, "btree: lost page in the chain of duplicates");
keys_equal = true;
}
else if ( _bt_skeycmp (rel, keysz, scankey, rpage,
PageGetItemId(rpage, P_HIKEY),
BTGreaterStrategyNumber) )
elog (FATAL, "btree: hikey is out of order");
/*
* If hikey > scankey and BTP_CHAIN is ON
* then it's first page of the chain of higher keys:
* our left sibling hikey was lying! We can't add new
* item here, but we can turn BTP_CHAIN off on our
* left page and overwrite its hikey.
*/
if ( !keys_equal && ( rpageop->btpo_flags & BTP_CHAIN ) )
{
BTItem tmp;
lpageop->btpo_flags &= ~BTP_CHAIN;
tmp = (BTItem) PageGetItem(rpage,
PageGetItemId(rpage, P_HIKEY));
hiRightItem = _bt_formitem(&(tmp->bti_itup));
break;
}
/*
* if there is room here or hikey > scankey (so it's our
* last page in the chain and we can't move righter)
* we have to use this page .
*/
if ( PageGetFreeSpace (rpage) > itemsz || !keys_equal )
{
use_left = false;
break;
}
/* try to move righter */
_bt_relbuf(rel, buf, BT_WRITE);
buf = rbuf;
page = rpage;
lpageop = rpageop;
}
if ( !use_left ) /* insert on the right page */
{
_bt_relbuf(rel, buf, BT_WRITE);
return ( _bt_insertonpg(rel, rbuf, stack, keysz,
scankey, btitem, afteritem) );
}
_bt_relbuf(rel, rbuf, BT_WRITE);
bknum = BufferGetBlockNumber(buf);
}
/* split the buffer into left and right halves */
rbuf = _bt_split(rel, buf);
rbuf = _bt_split(rel, buf, hiRightItem);
if ( hiRightItem != (BTItem) NULL )
pfree (hiRightItem);
/* which new page (left half or right half) gets the tuple? */
if (_bt_goesonpg(rel, buf, keysz, scankey, afteritem)) {
@@ -264,6 +386,14 @@ _bt_insertonpg(Relation rel,
itup_blkno = BufferGetBlockNumber(rbuf);
}
lowLeftItem = (BTItem) PageGetItem(page,
PageGetItemId(page, P_FIRSTKEY));
if ( _bt_itemcmp (rel, keysz, lowLeftItem,
(BTItem) PageGetItem(page, PageGetItemId(page, P_HIKEY)),
BTEqualStrategyNumber) )
lpageop->btpo_flags |= BTP_CHAIN;
/*
* By here,
*
@@ -287,6 +417,11 @@ _bt_insertonpg(Relation rel,
_bt_relbuf(rel, rbuf, BT_WRITE);
} else {
ScanKey newskey;
InsertIndexResult newres;
BTItem new_item;
OffsetNumber upditem_offset = P_HIKEY;
bool do_update = false;
/* form a index tuple that points at the new right page */
rbknum = BufferGetBlockNumber(rbuf);
@@ -294,27 +429,43 @@ _bt_insertonpg(Relation rel,
rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
/*
* By convention, the first entry (0) on every
* By convention, the first entry (1) on every
* non-rightmost page is the high key for that page. In
* order to get the lowest key on the new right page, we
* actually look at its second (1) entry.
* actually look at its second (2) entry.
*/
if (! P_RIGHTMOST(rpageop)) {
if (! P_RIGHTMOST(rpageop))
{
ritem = (BTItem) PageGetItem(rpage,
PageGetItemId(rpage, P_FIRSTKEY));
} else {
if ( _bt_itemcmp (rel, keysz, ritem,
(BTItem) PageGetItem(rpage,
PageGetItemId(rpage, P_HIKEY)),
BTEqualStrategyNumber) )
rpageop->btpo_flags |= BTP_CHAIN;
}
else
ritem = (BTItem) PageGetItem(rpage,
PageGetItemId(rpage, P_HIKEY));
}
/* get a unique btitem for this key */
new_item = _bt_formitem(&(ritem->bti_itup));
ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY);
/* find the parent buffer */
/*
* Find the parent buffer and get the parent page.
*
* Oops - if we were moved right then we need to
* change stack item! We want to find parent pointing to
* where we are, right ? - vadim 05/27/97
*/
ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid),
bknum, P_HIKEY);
pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
ppage = BufferGetPage(pbuf);
ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
/*
* If the key of new_item is < than the key of the item
@@ -330,29 +481,59 @@ _bt_insertonpg(Relation rel,
* key spills over to our new right page, we get an
* inconsistency if we don't update the left key in the
* parent page.
*
* Also, new duplicates handling code require us to update
* parent item if some smaller items left on the left page
* (which is possible in splitting leftmost page) and
* current parent item == new_item. - vadim 05/27/97
*/
if (_bt_itemcmp(rel, keysz, stack->bts_btitem, new_item,
BTGreaterStrategyNumber)) {
ppageop = (BTPageOpaque) PageGetSpecialPointer(page);
Assert (P_LEFTMOST(ppageop));
lowLeftItem =
(BTItem) PageGetItem(page,
PageGetItemId(page, P_FIRSTKEY));
/* this method does not work--_bt_updateitem tries to */
/* overwrite an entry with another entry that might be */
/* bigger. if lowLeftItem is bigger, it corrupts the */
/* parent page. instead, we have to delete the original */
/* leftmost item from the parent, and insert the new one */
/* with a regular _bt_insertonpg (it could cause a split */
/* because it's bigger than what was there before). */
/* --djm 8/21/96 */
if ( _bt_itemcmp (rel, keysz, stack->bts_btitem, new_item,
BTGreaterStrategyNumber) ||
( _bt_itemcmp(rel, keysz, stack->bts_btitem,
new_item, BTEqualStrategyNumber) &&
_bt_itemcmp(rel, keysz, lowLeftItem,
new_item, BTLessStrategyNumber) ) )
{
do_update = true;
/*
* but it works for items with the same size and so why don't
* use it for them ? - vadim 12/05/96
* figure out which key is leftmost (if the parent page
* is rightmost, too, it must be the root)
*/
if(P_RIGHTMOST(ppageop))
upditem_offset = P_HIKEY;
else
upditem_offset = P_FIRSTKEY;
if ( !P_LEFTMOST(lpageop) ||
stack->bts_offset != upditem_offset )
elog (FATAL, "btree: items are out of order");
}
/*
* There was bug caused by deletion all minimum keys (K1) from
* an index page and insertion there (up to page splitting)
* higher duplicate keys (K2): after it parent item for left
* page contained K1 and the next item (for new right page) - K2,
* - and scan for the key = K2 lost items on the left page.
* So, we have to update parent item if its key < minimum
* key on the left and minimum keys on the left and on the right
* are equal. It would be nice to update hikey on the previous
* page of the left one too, but we may get deadlock here
* (read comments in _bt_split), so we leave previous page
* hikey _inconsistent_, but there should to be BTP_CHAIN flag
* on it, which privents _bt_moveright from dangerous movings
* from there. - vadim 05/27/97
*/
else if ( _bt_itemcmp (rel, keysz, stack->bts_btitem,
lowLeftItem, BTLessStrategyNumber) &&
_bt_itemcmp (rel, keysz, new_item,
lowLeftItem, BTEqualStrategyNumber) )
{
do_update = true;
upditem_offset = stack->bts_offset;
}
if ( do_update )
{
/* Try to update in place. */
if ( DOUBLEALIGN (IndexTupleDSize (lowLeftItem->bti_itup)) ==
DOUBLEALIGN (IndexTupleDSize (stack->bts_btitem->bti_itup)) )
{
@@ -363,33 +544,16 @@ _bt_insertonpg(Relation rel,
}
else
{
/* get the parent page */
ppage = BufferGetPage(pbuf);
ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
/*
* figure out which key is leftmost (if the parent page
* is rightmost, too, it must be the root)
*/
if(P_RIGHTMOST(ppageop)) {
leftmost_offset = P_HIKEY;
} else {
leftmost_offset = P_FIRSTKEY;
}
PageIndexTupleDelete(ppage, leftmost_offset);
PageIndexTupleDelete(ppage, upditem_offset);
/*
* don't write anything out yet--we still have the write
* lock, and now we call another _bt_insertonpg to
* insert the correct leftmost key
* insert the correct key.
* First, make a new item, using the tuple data from
* lowLeftItem. Point it to the left child.
* Update it on the stack at the same time.
*/
/*
* make a new leftmost item, using the tuple data from
* lowLeftItem. point it to the left child.
* update it on the stack at the same time.
*/
bknum = BufferGetBlockNumber(buf);
pfree(stack->bts_btitem);
stack->bts_btitem = _bt_formitem(&(lowLeftItem->bti_itup));
ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid),
@@ -400,9 +564,10 @@ _bt_insertonpg(Relation rel,
_bt_relbuf(rel, rbuf, BT_WRITE);
/*
* a regular _bt_binsrch should find the right place to
* put the new entry, since it should be lower than any
* other key on the page, therefore set afteritem to NULL
* A regular _bt_binsrch should find the right place to
* put the new entry, since it should be either lower
* than any other key on the page or unique.
* Therefore set afteritem to NULL.
*/
newskey = _bt_mkscankey(rel, &(stack->bts_btitem->bti_itup));
newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
@@ -458,7 +623,7 @@ _bt_insertonpg(Relation rel,
* pin and lock on buf are maintained.
*/
static Buffer
_bt_split(Relation rel, Buffer buf)
_bt_split(Relation rel, Buffer buf, BTItem hiRightItem)
{
Buffer rbuf;
Page origpage;
@@ -492,6 +657,7 @@ _bt_split(Relation rel, Buffer buf)
/* if we're splitting this page, it won't be the root when we're done */
oopaque->btpo_flags &= ~BTP_ROOT;
oopaque->btpo_flags &= ~BTP_CHAIN;
lopaque->btpo_flags = ropaque->btpo_flags = oopaque->btpo_flags;
lopaque->btpo_prev = oopaque->btpo_prev;
ropaque->btpo_prev = BufferGetBlockNumber(buf);
@@ -516,10 +682,23 @@ _bt_split(Relation rel, Buffer buf)
/* splitting a non-rightmost page, start at the first data item */
start = P_FIRSTKEY;
/* copy the original high key to the new page */
itemid = PageGetItemId(origpage, P_HIKEY);
itemsz = ItemIdGetLength(itemid);
item = (BTItem) PageGetItem(origpage, itemid);
/*
* Copy the original high key to the new page if high key
* was not passed by caller.
*/
if ( hiRightItem == NULL )
{
itemid = PageGetItemId(origpage, P_HIKEY);
itemsz = ItemIdGetLength(itemid);
item = (BTItem) PageGetItem(origpage, itemid);
}
else
{
item = hiRightItem;
itemsz = IndexTupleDSize(hiRightItem->bti_itup)
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
itemsz = DOUBLEALIGN(itemsz);
}
(void) PageAddItem(rightpage, (Item) item, itemsz, P_HIKEY, LP_USED);
rightoff = P_FIRSTKEY;
} else {
@@ -744,7 +923,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
itemsz = ItemIdGetLength(itemid);
item = (BTItem) PageGetItem(lpage, itemid);
new_item = _bt_formitem(&(item->bti_itup));
ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_FIRSTKEY);
ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY);
/*
* insert the left page pointer into the new root page. the root
@@ -1098,3 +1277,137 @@ _bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum,
/* by here, the keys are equal */
return (true);
}
/*
* _bt_shift - insert btitem on the passed page after shifting page
* to the right in the tree.
*
* NOTE: tested for shifting leftmost page only, having btitem < hikey.
*/
static InsertIndexResult
_bt_shift (Relation rel, Buffer buf, BTStack stack, int keysz,
ScanKey scankey, BTItem btitem, BTItem hikey)
{
InsertIndexResult res;
int itemsz;
Page page;
BlockNumber bknum;
BTPageOpaque pageop;
Buffer rbuf;
Page rpage;
BTPageOpaque rpageop;
Buffer pbuf;
Page ppage;
BTPageOpaque ppageop;
Buffer nbuf;
Page npage;
BTPageOpaque npageop;
BlockNumber nbknum;
BTItem nitem;
OffsetNumber afteroff;
btitem = _bt_formitem(&(btitem->bti_itup));
hikey = _bt_formitem(&(hikey->bti_itup));
page = BufferGetPage(buf);
/* grab new page */
nbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
nbknum = BufferGetBlockNumber(nbuf);
npage = BufferGetPage(nbuf);
_bt_pageinit(npage, BufferGetPageSize(nbuf));
npageop = (BTPageOpaque) PageGetSpecialPointer(npage);
/* copy content of the passed page */
memmove ((char *) npage, (char *) page, BufferGetPageSize(buf));
/* re-init old (passed) page */
_bt_pageinit(page, BufferGetPageSize(buf));
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
/* init old page opaque */
pageop->btpo_flags = npageop->btpo_flags; /* restore flags */
pageop->btpo_flags &= ~BTP_CHAIN;
if ( _bt_itemcmp (rel, keysz, hikey, btitem, BTEqualStrategyNumber) )
pageop->btpo_flags |= BTP_CHAIN;
pageop->btpo_prev = npageop->btpo_prev; /* restore prev */
pageop->btpo_next = nbknum; /* next points to the new page */
/* init shifted page opaque */
npageop->btpo_prev = bknum = BufferGetBlockNumber(buf);
/* shifted page is ok, populate old page */
/* add passed hikey */
itemsz = IndexTupleDSize(hikey->bti_itup)
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
itemsz = DOUBLEALIGN(itemsz);
(void) PageAddItem(page, (Item) hikey, itemsz, P_HIKEY, LP_USED);
pfree (hikey);
/* add btitem */
itemsz = IndexTupleDSize(btitem->bti_itup)
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
itemsz = DOUBLEALIGN(itemsz);
(void) PageAddItem(page, (Item) btitem, itemsz, P_FIRSTKEY, LP_USED);
pfree (btitem);
nitem = (BTItem) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY));
btitem = _bt_formitem(&(nitem->bti_itup));
ItemPointerSet(&(btitem->bti_itup.t_tid), bknum, P_HIKEY);
/* ok, write them out */
_bt_wrtnorelbuf(rel, nbuf);
_bt_wrtnorelbuf(rel, buf);
/* fix btpo_prev on right sibling of old page */
if ( !P_RIGHTMOST (npageop) )
{
rbuf = _bt_getbuf(rel, npageop->btpo_next, BT_WRITE);
rpage = BufferGetPage(rbuf);
rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
rpageop->btpo_prev = nbknum;
_bt_wrtbuf(rel, rbuf);
}
/* get parent pointing to the old page */
ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid),
bknum, P_HIKEY);
pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
ppage = BufferGetPage(pbuf);
ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
_bt_relbuf(rel, nbuf, BT_WRITE);
_bt_relbuf(rel, buf, BT_WRITE);
/* re-set parent' pointer - we shifted our page to the right ! */
nitem = (BTItem) PageGetItem (ppage,
PageGetItemId (ppage, stack->bts_offset));
ItemPointerSet(&(nitem->bti_itup.t_tid), nbknum, P_HIKEY);
ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), nbknum, P_HIKEY);
_bt_wrtnorelbuf(rel, pbuf);
/*
* Now we want insert into the parent pointer to our old page. It has to
* be inserted before the pointer to new page. You may get problems here
* (in the _bt_goesonpg and/or _bt_pgaddtup), but may be not - I don't
* know. It works if old page is leftmost (nitem is NULL) and
* btitem < hikey and it's all what we need currently. - vadim 05/30/97
*/
nitem = NULL;
afteroff = P_FIRSTKEY;
if ( !P_RIGHTMOST (ppageop) )
afteroff = OffsetNumberNext (afteroff);
if ( stack->bts_offset >= afteroff )
{
afteroff = OffsetNumberPrev (stack->bts_offset);
nitem = (BTItem) PageGetItem (ppage, PageGetItemId (ppage, afteroff));
nitem = _bt_formitem(&(nitem->bti_itup));
}
res = _bt_insertonpg(rel, pbuf, stack->bts_parent,
keysz, scankey, btitem, nitem);
pfree (btitem);
ItemPointerSet(&(res->pointerData), nbknum, P_HIKEY);
return (res);
}