1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-28 11:55:03 +03:00
Files
postgres/src/backend/access/gin/gindatapage.c
Heikki Linnakangas 8fd04cb328 Fix full-page writes of internal GIN pages.
Insertion to a non-leaf GIN page didn't make a full-page image of the page,
which is wrong. The code used to do it correctly, but was changed (commit
853d1c3103) because the redo-routine didn't
track incomplete splits correctly when the page was restored from a full
page image. Of course, that was not the right way to fix it; the redo routine
should've been fixed instead. The redo-routine was surreptitiously fixed
in 2010 (commit 4016bdef8a), so all we need
to do now is revert the code that creates the record to its original form.

This doesn't change the format of the WAL record.

Backpatch to all supported versions.
2013-12-03 22:13:16 +02:00

679 lines
16 KiB
C

/*-------------------------------------------------------------------------
*
* gindatapage.c
* page utility routines for the postgres inverted index access method.
*
*
* Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/gin/gindatapage.c
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/gin_private.h"
#include "utils/rel.h"
/*
 * Three-way comparison of two item pointers.
 *
 * The block number is the major sort key and the offset number the minor
 * one.  Returns 0 when both parts are equal, 1 when a sorts after b and
 * -1 when a sorts before b.
 */
int
ginCompareItemPointers(ItemPointer a, ItemPointer b)
{
	BlockNumber blkA = GinItemPointerGetBlockNumber(a);
	BlockNumber blkB = GinItemPointerGetBlockNumber(b);
	OffsetNumber offA;
	OffsetNumber offB;

	/* different blocks: the block numbers alone decide the order */
	if (blkA != blkB)
		return (blkA > blkB) ? 1 : -1;

	/* same block: fall back to the offsets */
	offA = GinItemPointerGetOffsetNumber(a);
	offB = GinItemPointerGetOffsetNumber(b);

	if (offA != offB)
		return (offA > offB) ? 1 : -1;

	return 0;
}
/*
 * Merge two ordered arrays of item pointers into dst.
 *
 * An item pointer that is present in both a and b is written only once.
 * Returns the number of items stored at dst.  The caller must guarantee
 * that dst has enough room (the result never exceeds na + nb entries).
 */
uint32
ginMergeItemPointers(ItemPointerData *dst,
					 ItemPointerData *a, uint32 na,
					 ItemPointerData *b, uint32 nb)
{
	ItemPointerData *out = dst;
	ItemPointerData *curA = a;
	ItemPointerData *curB = b;
	ItemPointerData *endA = a + na;
	ItemPointerData *endB = b + nb;

	/* walk both inputs in lockstep while each still has items */
	while (curA < endA && curB < endB)
	{
		int			order = ginCompareItemPointers(curA, curB);

		if (order < 0)
			*out++ = *curA++;
		else if (order > 0)
			*out++ = *curB++;
		else
		{
			/* identical items: emit b's copy and step over a's */
			*out++ = *curB++;
			curA++;
		}
	}

	/* at most one of the inputs has a tail left; copy it verbatim */
	while (curA < endA)
		*out++ = *curA++;
	while (curB < endB)
		*out++ = *curB++;

	return out - dst;
}
/*
 * Decide whether the search has to follow the right link of this page.
 *
 * The item currently being inserted (btree->items[btree->curitem]) is
 * compared with the page's right bound; we move right only when the item
 * is greater than the bound.  The rightmost page never moves right.
 */
static bool
dataIsMoveRight(GinBtree btree, Page page)
{
	ItemPointer rightBound = GinDataPageGetRightBound(page);

	if (GinPageRightMost(page))
		return FALSE;

	if (ginCompareItemPointers(btree->items + btree->curitem, rightBound) > 0)
		return TRUE;

	return FALSE;
}
/*
 * Find the PostingItem to descend through on a non-leaf data page.
 *
 * The page is assumed to be the correct one already, i.e. the searched
 * item pointer belongs somewhere below it.  Sets stack->off to the offset
 * of the chosen PostingItem and returns the block number of that child.
 *
 * In fullScan mode no search is done: the leftmost child is returned and
 * stack->predictNumber is multiplied by the number of items on the page.
 */
static BlockNumber
dataLocateItem(GinBtree btree, GinBtreeStack *stack)
{
	Page		page = BufferGetPage(stack->buffer);
	OffsetNumber lower,
				upper,
				lastoff;
	PostingItem *child = NULL;
	int			cmp;

	Assert(!GinPageIsLeaf(page));
	Assert(GinPageIsData(page));

	if (btree->fullScan)
	{
		stack->off = FirstOffsetNumber;
		stack->predictNumber *= GinPageGetOpaque(page)->maxoff;
		return btree->getLeftMostPage(btree, page);
	}

	lastoff = GinPageGetOpaque(page)->maxoff;
	lower = FirstOffsetNumber;
	Assert(lastoff >= lower);

	/* binary search over the half-open range [lower, upper) */
	upper = lastoff + 1;

	while (upper > lower)
	{
		OffsetNumber mid = lower + ((upper - lower) / 2);

		child = (PostingItem *) GinDataPageGetItem(page, mid);

		/*
		 * The last item acts as right infinity: the page was already chosen
		 * correctly with the help of dataIsMoveRight, so its key is not
		 * compared.
		 */
		if (mid == lastoff)
			cmp = -1;
		else
			cmp = ginCompareItemPointers(btree->items + btree->curitem, &(child->key));

		if (cmp == 0)
		{
			stack->off = mid;
			return PostingItemGetBlockNumber(child);
		}

		if (cmp > 0)
			lower = mid + 1;
		else
			upper = mid;
	}

	/* no exact match: upper is the first item with a key above ours */
	Assert(upper >= FirstOffsetNumber && upper <= lastoff);

	stack->off = upper;
	child = (PostingItem *) GinDataPageGetItem(page, upper);
	return PostingItemGetBlockNumber(child);
}
/*
 * Locate the position of the current item pointer on a leaf data page.
 *
 * The page is expected to be the correct one.  stack->off is set either to
 * the offset of the matching item or to the offset where the item would
 * have to be inserted.  Returns true when the item is already on the page
 * (and unconditionally in fullScan mode, with stack->off at the first
 * item).
 */
static bool
dataLocateLeafItem(GinBtree btree, GinBtreeStack *stack)
{
	Page		page = BufferGetPage(stack->buffer);
	OffsetNumber lower,
				upper;
	int			cmp;

	Assert(GinPageIsLeaf(page));
	Assert(GinPageIsData(page));

	if (btree->fullScan)
	{
		stack->off = FirstOffsetNumber;
		return TRUE;
	}

	lower = FirstOffsetNumber;
	upper = GinPageGetOpaque(page)->maxoff;

	/* empty page: nothing to find, insert at the first slot */
	if (upper < lower)
	{
		stack->off = FirstOffsetNumber;
		return false;
	}

	/* binary search over the half-open range [lower, upper) */
	upper++;

	while (upper > lower)
	{
		OffsetNumber mid = lower + ((upper - lower) / 2);

		cmp = ginCompareItemPointers(btree->items + btree->curitem,
									 (ItemPointer) GinDataPageGetItem(page, mid));

		if (cmp == 0)
		{
			stack->off = mid;
			return true;
		}

		if (cmp > 0)
			lower = mid + 1;
		else
			upper = mid;
	}

	/* not found: upper is the insertion position */
	stack->off = upper;
	return false;
}
/*
 * Find the PostingItem on a non-leaf page that points to child blkno.
 *
 * storedOff is the offset at which the link was seen earlier.  The search
 * starts there and proceeds to the right, then falls back to the items
 * left of storedOff.  Returns the offset of the matching PostingItem, or
 * InvalidOffsetNumber if no item on the page points to blkno.
 */
static OffsetNumber
dataFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber storedOff)
{
	OffsetNumber lastoff = GinPageGetOpaque(page)->maxoff;
	OffsetNumber off;
	PostingItem *pitem;

	Assert(!GinPageIsLeaf(page));
	Assert(GinPageIsData(page));

	if (storedOff >= FirstOffsetNumber && storedOff <= lastoff)
	{
		/*
		 * If the page has not changed the link is still at storedOff.
		 * Otherwise we hope it moved to the right, which holds as long as
		 * there was no deletion.
		 */
		for (off = storedOff; off <= lastoff; off++)
		{
			pitem = (PostingItem *) GinDataPageGetItem(page, off);
			if (PostingItemGetBlockNumber(pitem) == blkno)
				return off;
		}

		/* everything from storedOff onwards has been checked already */
		lastoff = storedOff - 1;
	}

	/* last chance: scan the part of the page not looked at yet */
	for (off = FirstOffsetNumber; off <= lastoff; off++)
	{
		pitem = (PostingItem *) GinDataPageGetItem(page, off);
		if (PostingItemGetBlockNumber(pitem) == blkno)
			return off;
	}

	return InvalidOffsetNumber;
}
/*
 * Return the block number stored in the first PostingItem of a non-leaf
 * data page, i.e. its leftmost child.  The page must hold at least one
 * item.
 */
static BlockNumber
dataGetLeftMostPage(GinBtree btree, Page page)
{
	PostingItem *first;

	Assert(!GinPageIsLeaf(page));
	Assert(GinPageIsData(page));
	Assert(GinPageGetOpaque(page)->maxoff >= FirstOffsetNumber);

	first = (PostingItem *) GinDataPageGetItem(page, FirstOffsetNumber);

	return PostingItemGetBlockNumber(first);
}
/*
 * Add one item to a data page and bump the page's maxoff.
 *
 * data must point to a value of the kind the page stores (the number of
 * bytes copied is GinSizeOfDataPageItem(page)); per the original note that
 * is an ItemPointer on a leaf page and a PostingItem on a non-leaf page.
 *
 * With offset == InvalidOffsetNumber the item is appended after the last
 * one.  Otherwise it is stored at the given offset, and the items from that
 * offset onwards are shifted one slot to the right first.
 */
void
GinDataPageAddItem(Page page, void *data, OffsetNumber offset)
{
	OffsetNumber nitems = GinPageGetOpaque(page)->maxoff;
	char	   *slot;

	if (offset != InvalidOffsetNumber)
	{
		slot = GinDataPageGetItem(page, offset);

		/* nothing to shift when the target is just past the last item */
		if (offset != nitems + 1)
			memmove(slot + GinSizeOfDataPageItem(page),
					slot,
					(nitems - offset + 1) * GinSizeOfDataPageItem(page));
	}
	else
		slot = GinDataPageGetItem(page, nitems + 1);

	memcpy(slot, data, GinSizeOfDataPageItem(page));

	GinPageGetOpaque(page)->maxoff++;
}
/*
 * Remove the PostingItem at the given offset from a non-leaf page.
 *
 * The items to the right of it are moved one slot to the left and the
 * page's maxoff is decremented.
 */
void
GinPageDeletePostingItem(Page page, OffsetNumber offset)
{
	OffsetNumber lastoff = GinPageGetOpaque(page)->maxoff;

	Assert(!GinPageIsLeaf(page));
	Assert(offset >= FirstOffsetNumber && offset <= lastoff);

	/* removing the last item needs no data movement */
	if (offset != lastoff)
	{
		char	   *victim = GinDataPageGetItem(page, offset);
		char	   *follower = GinDataPageGetItem(page, offset + 1);

		memmove(victim, follower, sizeof(PostingItem) * (lastoff - offset));
	}

	GinPageGetOpaque(page)->maxoff--;
}
/*
 * Check whether the page in buf has room for what is about to be placed.
 *
 * - Non-leaf page: room for one PostingItem.
 * - Rightmost leaf page with off past the last item: room for all of the
 *   remaining item pointers (nitem - curitem) at once.
 * - Any other leaf insert: room for a single item pointer.
 *
 * Deletion is not supported here (item pointers are never deleted this
 * way), hence the assertion on btree->isDelete.
 */
static bool
dataIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
{
	Page		page = BufferGetPage(buf);

	Assert(GinPageIsData(page));
	Assert(!btree->isDelete);

	if (!GinPageIsLeaf(page))
	{
		if (sizeof(PostingItem) <= GinDataPageGetFreeSpace(page))
			return true;
		return false;
	}

	if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff)
	{
		/* appending at the very end: all pending items must fit */
		if ((btree->nitem - btree->curitem) * sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page))
			return true;
		return false;
	}

	if (sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page))
		return true;

	return false;
}
/*
 * Finish the bookkeeping of a preceding child split before inserting.
 *
 * If page is a non-leaf page and btree->rightblkno is set, the PostingItem
 * at off is redirected to btree->rightblkno (the new right page of the
 * split) and that block number is returned.  Otherwise InvalidBlockNumber
 * is returned.  btree->rightblkno is reset in either case.
 */
static BlockNumber
dataPrepareData(GinBtree btree, Page page, OffsetNumber off)
{
	BlockNumber updated = InvalidBlockNumber;

	Assert(GinPageIsData(page));

	if (!GinPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber)
	{
		PostingItem *pitem = (PostingItem *) GinDataPageGetItem(page, off);

		updated = btree->rightblkno;
		PostingItemSetBlockNumber(pitem, updated);
	}

	btree->rightblkno = InvalidBlockNumber;

	return updated;
}
/*
 * Place the pending key(s) on the page in buf and build the WAL record.
 *
 * On a non-leaf page btree->pitem is inserted at off.  On a leaf page the
 * current item pointer is inserted at off; when the page is the rightmost
 * one and off lies past its last item (usually during index creation), all
 * remaining item pointers are appended in one go.
 *
 * *prdata is set to a chain of three XLogRecData entries kept in static
 * storage: the buffer reference, the ginxlogInsert header and the inserted
 * item(s).
 */
static void
dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, XLogRecData **prdata)
{
	Page		page = BufferGetPage(buf);
	int			itemsz = GinSizeOfDataPageItem(page);

	/* static, because the caller uses them after we have returned */
	static XLogRecData rdata[3];
	static ginxlogInsert data;

	*prdata = rdata;
	Assert(GinPageIsData(page));

	data.updateBlkno = dataPrepareData(btree, page, off);

	data.node = btree->index->rd_node;
	data.blkno = BufferGetBlockNumber(buf);
	data.offset = off;
	data.nitem = 1;
	data.isDelete = FALSE;
	data.isData = TRUE;
	data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;

	/*
	 * Incomplete-split tracking needs the updateBlkno information and the
	 * inserted item even if a full page image of the page is made, so the
	 * buffer reference lives in an XLogRecData entry of its own.
	 */
	rdata[0].buffer = buf;
	rdata[0].buffer_std = FALSE;
	rdata[0].data = NULL;
	rdata[0].len = 0;
	rdata[0].next = &rdata[1];

	/* record header */
	rdata[1].buffer = InvalidBuffer;
	rdata[1].data = (char *) &data;
	rdata[1].len = sizeof(ginxlogInsert);
	rdata[1].next = &rdata[2];

	/* payload; must be captured before curitem is advanced below */
	rdata[2].buffer = InvalidBuffer;
	if (GinPageIsLeaf(page))
		rdata[2].data = (char *) (btree->items + btree->curitem);
	else
		rdata[2].data = (char *) &(btree->pitem);
	rdata[2].len = itemsz;
	rdata[2].next = NULL;

	if (!GinPageIsLeaf(page))
	{
		GinDataPageAddItem(page, &(btree->pitem), off);
		return;
	}

	if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff)
	{
		/* usually index creation: append every remaining item pointer */
		uint32		firstItem = btree->curitem;

		for (; btree->curitem < btree->nitem; btree->curitem++, off++)
			GinDataPageAddItem(page, btree->items + btree->curitem, off);

		data.nitem = btree->curitem - firstItem;
		rdata[2].len = itemsz * data.nitem;
	}
	else
	{
		GinDataPageAddItem(page, btree->items + btree->curitem, off);
		btree->curitem++;
	}
}
/*
 * Split a data page in two and fill in the WAL record for the split.
 *
 * The work is done on a temporary copy of lbuf's page (obtained with
 * PageGetTempPageCopy), which is returned as the new left page; per the
 * original note the page in lbuf itself is left untouched.  The page in
 * rbuf is initialized here and receives the right half.
 *
 * On a rightmost leaf page with off past the last item, as many pending
 * item pointers as the loop condition allows are added before splitting.
 * Otherwise exactly one item (the current item pointer on a leaf,
 * btree->pitem on a non-leaf page) is inserted at off.
 *
 * When btree->isBuild is set and the page is the rightmost one, the left
 * page is filled completely; otherwise the items are divided evenly.
 *
 * Side effects: btree->pitem is set to describe the new left page (block
 * number of lbuf, key = last item of the left page), btree->rightblkno is
 * set to rbuf's block number, and btree->curitem is advanced past the
 * consumed items.  *prdata is set to a chain of two XLogRecData entries in
 * static storage: the ginxlogSplit header and the combined item array.
 */
static Page
dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRecData **prdata)
{
	char	   *ptr;
	OffsetNumber separator;
	ItemPointer bound;
	Page		lpage = PageGetTempPageCopy(BufferGetPage(lbuf));
	ItemPointerData oldbound = *GinDataPageGetRightBound(lpage);
	int			sizeofitem = GinSizeOfDataPageItem(lpage);
	OffsetNumber maxoff = GinPageGetOpaque(lpage)->maxoff;
	Page		rpage = BufferGetPage(rbuf);
	Size		pageSize = PageGetPageSize(lpage);
	Size		freeSpace;

	/* these must be static so they can be returned to caller */
	static ginxlogSplit data;
	static XLogRecData rdata[4];
	static char vector[2 * BLCKSZ];

	/* initialize the right page once now to learn how much an empty page holds */
	GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
	freeSpace = GinDataPageGetFreeSpace(rpage);

	*prdata = rdata;

	/*
	 * NOTE(review): the leaf case stores InvalidOffsetNumber in a block
	 * number field.  Left as is, since changing it would alter the contents
	 * of the WAL record -- confirm against the redo routine before touching.
	 */
	data.leftChildBlkno = (GinPageIsLeaf(lpage)) ?
		InvalidOffsetNumber : PostingItemGetBlockNumber(&(btree->pitem));
	data.updateBlkno = dataPrepareData(btree, lpage, off);

	/* gather the existing items in the scratch array */
	memcpy(vector, GinDataPageGetItem(lpage, FirstOffsetNumber),
		   maxoff * sizeofitem);

	if (GinPageIsLeaf(lpage) && GinPageRightMost(lpage) && off > GinPageGetOpaque(lpage)->maxoff)
	{
		/* appending at the end of the tree: take several pending items */
		while (btree->curitem < btree->nitem &&
			   maxoff * sizeof(ItemPointerData) < 2 * (freeSpace - sizeof(ItemPointerData)))
		{
			memcpy(vector + maxoff * sizeof(ItemPointerData),
				   btree->items + btree->curitem,
				   sizeof(ItemPointerData));
			maxoff++;
			btree->curitem++;
		}
	}
	else
	{
		/* insert a single item at position off, shifting the tail right */
		ptr = vector + (off - 1) * sizeofitem;
		if (maxoff + 1 - off != 0)
			memmove(ptr + sizeofitem, ptr, (maxoff - off + 1) * sizeofitem);
		if (GinPageIsLeaf(lpage))
		{
			memcpy(ptr, btree->items + btree->curitem, sizeofitem);
			btree->curitem++;
		}
		else
			memcpy(ptr, &(btree->pitem), sizeofitem);

		maxoff++;
	}

	/*
	 * We suppose that during index creation the table is scanned from
	 * beginning to end, so item pointers increase monotonically; in that
	 * case fill the left page completely instead of splitting in half.
	 */
	if (btree->isBuild && GinPageRightMost(lpage))
		separator = freeSpace / sizeofitem;
	else
		separator = maxoff / 2;

	/*
	 * Reinitialize both pages.  rpage goes first so that its flags (taken
	 * from lpage) are still available when lpage itself is reset.
	 */
	GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
	GinInitPage(lpage, GinPageGetOpaque(rpage)->flags, pageSize);

	/* distribute the items: [0, separator) left, the rest right */
	memcpy(GinDataPageGetItem(lpage, FirstOffsetNumber), vector, separator * sizeofitem);
	GinPageGetOpaque(lpage)->maxoff = separator;
	memcpy(GinDataPageGetItem(rpage, FirstOffsetNumber),
		   vector + separator * sizeofitem, (maxoff - separator) * sizeofitem);
	GinPageGetOpaque(rpage)->maxoff = maxoff - separator;

	/* btree->pitem now describes the left page: its block and its last key */
	PostingItemSetBlockNumber(&(btree->pitem), BufferGetBlockNumber(lbuf));
	if (GinPageIsLeaf(lpage))
		btree->pitem.key = *(ItemPointerData *) GinDataPageGetItem(lpage,
											GinPageGetOpaque(lpage)->maxoff);
	else
		btree->pitem.key = ((PostingItem *) GinDataPageGetItem(lpage,
									  GinPageGetOpaque(lpage)->maxoff))->key;
	btree->rightblkno = BufferGetBlockNumber(rbuf);

	/* set up right bound for left page */
	bound = GinDataPageGetRightBound(lpage);
	*bound = btree->pitem.key;

	/* set up right bound for right page */
	bound = GinDataPageGetRightBound(rpage);
	*bound = oldbound;

	data.node = btree->index->rd_node;
	data.rootBlkno = InvalidBlockNumber;
	data.lblkno = BufferGetBlockNumber(lbuf);
	data.rblkno = BufferGetBlockNumber(rbuf);
	data.separator = separator;
	data.nitem = maxoff;
	data.isData = TRUE;
	data.isLeaf = GinPageIsLeaf(lpage) ? TRUE : FALSE;
	data.isRootSplit = FALSE;
	data.rightbound = oldbound;

	/* WAL chain: record header followed by the complete item array */
	rdata[0].buffer = InvalidBuffer;
	rdata[0].data = (char *) &data;
	rdata[0].len = sizeof(ginxlogSplit);
	rdata[0].next = &rdata[1];

	rdata[1].buffer = InvalidBuffer;
	rdata[1].data = vector;
	rdata[1].len = MAXALIGN(maxoff * sizeofitem);
	rdata[1].next = NULL;

	return lpage;
}
/*
 * Fill a new root page with two PostingItems, one per child.
 *
 * Each item carries the child's block number and uses the child's right
 * bound as its key; the left child is appended first, then the right one.
 *
 * btree is not referenced here.  Per the original note this routine is
 * also called from ginxlog, so it must not start relying on btree.
 */
void
ginDataFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf)
{
	Page		rootPage = BufferGetPage(root);
	Page		leftPage = BufferGetPage(lbuf);
	Page		rightPage = BufferGetPage(rbuf);
	PostingItem leftItem;
	PostingItem rightItem;

	/* downlink to the left child */
	leftItem.key = *GinDataPageGetRightBound(leftPage);
	PostingItemSetBlockNumber(&leftItem, BufferGetBlockNumber(lbuf));
	GinDataPageAddItem(rootPage, &leftItem, InvalidOffsetNumber);

	/* downlink to the right child */
	rightItem.key = *GinDataPageGetRightBound(rightPage);
	PostingItemSetBlockNumber(&rightItem, BufferGetBlockNumber(rbuf));
	GinDataPageAddItem(rootPage, &rightItem, InvalidOffsetNumber);
}
/*
 * Initialize a GinBtree descriptor for working on a posting tree of the
 * given index: the structure is zeroed, the data-page callbacks defined in
 * this file are installed and all mode flags start out as FALSE.
 */
void
ginPrepareDataScan(GinBtree btree, Relation index)
{
	memset(btree, 0, sizeof(GinBtreeData));

	btree->index = index;
	btree->isData = TRUE;

	/* mode flags; callers switch on what they need afterwards */
	btree->searchMode = FALSE;
	btree->isDelete = FALSE;
	btree->fullScan = FALSE;
	btree->isBuild = FALSE;

	/* navigation callbacks */
	btree->findChildPage = dataLocateItem;
	btree->isMoveRight = dataIsMoveRight;
	btree->findItem = dataLocateLeafItem;
	btree->findChildPtr = dataFindChildPtr;
	btree->getLeftMostPage = dataGetLeftMostPage;

	/* modification callbacks */
	btree->isEnoughSpace = dataIsEnoughSpace;
	btree->placeToPage = dataPlaceToPage;
	btree->splitPage = dataSplitPage;
	btree->fillRoot = ginDataFillRoot;
}
/*
 * Allocate and set up a scan descriptor for the posting tree rooted at
 * rootBlkno.
 *
 * searchMode is copied into both btree.searchMode and btree.fullScan, so a
 * search-mode scan is always a full scan.  The returned structure is
 * palloc'd; its stack is prepared with ginPrepareFindLeafPage.
 */
GinPostingTreeScan *
ginPrepareScanPostingTree(Relation index, BlockNumber rootBlkno, bool searchMode)
{
	GinPostingTreeScan *scan;

	scan = (GinPostingTreeScan *) palloc0(sizeof(GinPostingTreeScan));

	ginPrepareDataScan(&scan->btree, index);

	scan->btree.searchMode = searchMode;
	scan->btree.fullScan = searchMode;

	scan->stack = ginPrepareFindLeafPage(&scan->btree, rootBlkno);

	return scan;
}
/*
 * Insert an array of item pointers into the posting tree described by gdi.
 *
 * Each round descends to a leaf for the current item.  If the item is
 * already present it is skipped; otherwise ginInsertValue is called, which
 * may place more than one item.  Several descents may therefore be needed
 * (very rare, according to the original note).
 *
 * The root block number is taken from gdi->stack on entry, so gdi->stack
 * must be set when this is called.
 */
void
ginInsertItemPointers(GinPostingTreeScan *gdi,
					  ItemPointerData *items, uint32 nitem,
					  GinStatsData *buildStats)
{
	BlockNumber rootBlkno = gdi->stack->blkno;

	gdi->btree.items = items;
	gdi->btree.nitem = nitem;
	gdi->btree.curitem = 0;

	while (gdi->btree.curitem < gdi->btree.nitem)
	{
		/* every round after the first starts again from the root */
		if (!gdi->stack)
			gdi->stack = ginPrepareFindLeafPage(&gdi->btree, rootBlkno);

		gdi->stack = ginFindLeafPage(&gdi->btree, gdi->stack);

		if (!gdi->btree.findItem(&(gdi->btree), gdi->stack))
			ginInsertValue(&(gdi->btree), gdi->stack, buildStats);
		else
		{
			/*
			 * gdi->btree.items[gdi->btree.curitem] already exists in the
			 * index: skip it and release the leaf ourselves.
			 */
			gdi->btree.curitem++;
			LockBuffer(gdi->stack->buffer, GIN_UNLOCK);
			freeGinBtreeStack(gdi->stack);
		}

		/* the stack must not be reused on the next round */
		gdi->stack = NULL;
	}
}
/*
 * Start a posting-tree scan: descend to a leaf page with ginFindLeafPage,
 * store the resulting stack in gdi->stack and return that leaf's buffer.
 */
Buffer
ginScanBeginPostingTree(GinPostingTreeScan *gdi)
{
	Buffer		leafBuffer;

	gdi->stack = ginFindLeafPage(&gdi->btree, gdi->stack);
	leafBuffer = gdi->stack->buffer;

	return leafBuffer;
}