1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-13 07:41:39 +03:00
Files
postgres/src/backend/access/gist/gistxlog.c
Teodor Sigaev 22f519c92a Fix bug introduced by microvacuum for GiST
Commit 013ebc0a7b introduced microvacuum for
GiST. Deletion of tuples marked LP_DEAD uses PageIndexMultiDelete, while the
recovery code used PageIndexTupleDelete in a loop. This caused a difference
in the offset numbers of the tuples to delete. This patch makes GiST use
PageIndexMultiDelete everywhere except gistplacetopage(), where only one tuple
is deleted at a time. That also slightly improves performance, because
PageIndexMultiDelete is more efficient.

Patch changes WAL format, so bump wal page magic.

Bug report from Jeff Janes
Diagnosis and patch by Anastasia Lubennikova and me
2015-09-17 14:22:37 +03:00

423 lines
10 KiB
C

/*-------------------------------------------------------------------------
*
* gistxlog.c
* WAL replay logic for GiST.
*
*
* Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/gist/gistxlog.c
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/gist_private.h"
#include "access/xloginsert.h"
#include "access/xlogutils.h"
#include "utils/memutils.h"
/*
 * Working memory for replay operations.  Created in gist_xlog_startup(),
 * made current and then reset for every record in gist_redo(), and deleted
 * in gist_xlog_cleanup().
 */
static MemoryContext opCtx; /* working memory for operations */
/*
 * Replay the clearing of the F_FOLLOW_RIGHT flag on a child page.
 *
 * 'block_id' identifies the child page's block reference within 'record'.
 *
 * The flag change (and the accompanying NSN update) is not part of a
 * full-page image, so it has to be applied even when the page was restored
 * from one.  Restoring the image and fixing the flag are both done here,
 * under a single buffer lock, so that concurrent Hot Standby queries never
 * see the intermediate state with the wrong flag value.  (Nothing else ever
 * needs to be done to the child page in the current WAL action.)
 */
static void
gistRedoClearFollowRight(XLogReaderState *record, uint8 block_id)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	Buffer		buffer;
	XLogRedoAction action = XLogReadBufferForRedo(record, block_id, &buffer);

	/*
	 * BLK_RESTORED is handled like BLK_NEEDS_REDO: the restored image does
	 * not carry the new NSN or the cleared flag.
	 */
	if (action == BLK_RESTORED || action == BLK_NEEDS_REDO)
	{
		Page		page = BufferGetPage(buffer);

		GistPageSetNSN(page, lsn);
		GistClearFollowRight(page);

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}

	/* The buffer is only valid (and locked) if the page could be read. */
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);
}
/*
 * Redo any page update (except a page split).
 *
 * Block reference 0 is the page being updated.  Its block data consists of
 * an array of xldata->ntodelete offset numbers to delete, followed by the
 * index tuples to append, packed back to back.  Block reference 1, when
 * present, is a left child page whose follow-right flag must be cleared.
 */
static void
gistRedoPageUpdateRecord(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record);
	Buffer		buffer;
	Page		page;

	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
	{
		Size		datalen;
		char	   *begin = XLogRecGetBlockData(record, 0, &datalen);
		char	   *data = begin;
		int			ninserted = 0;

		page = (Page) BufferGetPage(buffer);

		/* Step 1: remove the old tuples listed at the start of the data */
		if (xldata->ntodelete > 0)
		{
			OffsetNumber *todelete = (OffsetNumber *) data;

			PageIndexMultiDelete(page, todelete, xldata->ntodelete);
			if (GistPageIsLeaf(page))
				GistMarkTuplesDeleted(page);

			/* skip past the offset array to reach the tuples */
			data += sizeof(OffsetNumber) * xldata->ntodelete;
		}

		/* Step 2: append whatever tuples remain in the block data */
		if (data - begin < datalen)
		{
			OffsetNumber off;

			/* new tuples go after the last existing item, if any */
			if (PageIsEmpty(page))
				off = FirstOffsetNumber;
			else
				off = OffsetNumberNext(PageGetMaxOffsetNumber(page));

			for (; data - begin < datalen; off++, ninserted++)
			{
				IndexTuple	itup = (IndexTuple) data;
				Size		sz = IndexTupleSize(itup);

				data += sz;

				if (PageAddItem(page, (Item) itup, sz, off, false, false) == InvalidOffsetNumber)
					elog(ERROR, "failed to add item to GiST index page, size %d bytes",
						 (int) sz);
			}
		}

		Assert(ninserted == xldata->ntoinsert);

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}

	/*
	 * Fix follow-right data on the left child page.
	 *
	 * This has to happen while the lock on the target page is still held.
	 * The child page is replayed even if the target page no longer exists.
	 */
	if (XLogRecHasBlockRef(record, 1))
		gistRedoClearFollowRight(record, 1);

	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);
}
/*
 * Decode the per-page block data of a page-split record.
 *
 * The data at 'begin' (of length 'len') starts with an int holding the
 * number of tuples, followed by that many index tuples packed back to back.
 * The count is stored in *n.
 *
 * Returns a palloc'd array of *n IndexTuple pointers.  The pointers refer
 * directly into the buffer at 'begin'; the tuples themselves are not copied.
 */
static IndexTuple *
decodePageSplitRecord(char *begin, int len, int *n)
{
	char	   *ptr = begin + sizeof(int);
	IndexTuple *tuples;
	int			i;

	/* the tuple count may be unaligned in the record, so copy it out */
	memcpy(n, begin, sizeof(int));

	tuples = palloc(*n * sizeof(IndexTuple));

	i = 0;
	while (i < *n)
	{
		Assert(ptr - begin < len);
		tuples[i++] = (IndexTuple) ptr;
		ptr += IndexTupleSize((IndexTuple) ptr);
	}

	/* the tuples must account for exactly the rest of the data */
	Assert(ptr - begin == len);

	return tuples;
}
/*
 * Redo a page split.
 *
 * Block references 1..npage are the pages produced by the split, each
 * re-initialized from scratch and filled with the tuples carried in its
 * block data.  Block reference 0, when present, is a left child page whose
 * follow-right flag must be cleared.
 */
static void
gistRedoPageSplitRecord(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
	Buffer		firstbuffer = InvalidBuffer;
	bool		isrootsplit = false;
	int			i;

	/*
	 * The lock on the first-listed page must be held throughout the action,
	 * including while the left child page (if any) is updated.  The other
	 * pages can be unlocked as soon as they have been written, because
	 * concurrent queries cannot reach them without first visiting the
	 * first-listed page.
	 */
	for (i = 0; i < xldata->npage; i++)
	{
		int			blockid = i + 1;	/* split pages start at block ref 1 */
		bool		hasnext = (i < xldata->npage - 1);
		bool		isroot;
		BlockNumber blkno;
		BlockNumber rightlink;
		Buffer		buffer;
		Page		page;
		char	   *data;
		Size		datalen;
		IndexTuple *tuples;
		int			num;

		XLogRecGetBlockTag(record, blockid, NULL, NULL, &blkno);
		isroot = (blkno == GIST_ROOT_BLKNO);
		if (isroot)
		{
			/* the root can only be the first-listed page */
			Assert(i == 0);
			isrootsplit = true;
		}

		buffer = XLogInitBufferForRedo(record, blockid);
		page = (Page) BufferGetPage(buffer);
		data = XLogRecGetBlockData(record, blockid, &datalen);
		tuples = decodePageSplitRecord(data, datalen, &num);

		/* wipe the page; only non-root pages inherit the leaf flag */
		GISTInitBuffer(buffer, (xldata->origleaf && !isroot) ? F_LEAF : 0);

		/* and fill it with this page's share of the tuples */
		gistfillbuffer(page, tuples, num, FirstOffsetNumber);

		/*
		 * Chain the pages together: the root has no right sibling, every
		 * other page points at the next page in the record, and the last
		 * page takes over the original page's right link.
		 */
		if (isroot)
			rightlink = InvalidBlockNumber;
		else if (hasnext)
			XLogRecGetBlockTag(record, blockid + 1, NULL, NULL, &rightlink);
		else
			rightlink = xldata->origrlink;
		GistPageGetOpaque(page)->rightlink = rightlink;

		GistPageSetNSN(page, xldata->orignsn);

		/*
		 * A root page always has isrootsplit set by now, so it falls into
		 * the "clear" branch along with the last page of any split.
		 */
		if (hasnext && !isrootsplit && xldata->markfollowright)
			GistMarkFollowRight(page);
		else
			GistClearFollowRight(page);

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);

		/* keep the first page locked; release all the others right away */
		if (i == 0)
			firstbuffer = buffer;
		else
			UnlockReleaseBuffer(buffer);
	}

	/* Fix follow-right data on the left child page, if any */
	if (XLogRecHasBlockRef(record, 0))
		gistRedoClearFollowRight(record, 0);

	/* Finally, release the lock on the first page */
	UnlockReleaseBuffer(firstbuffer);
}
/*
 * Redo index creation: re-initialize block reference 0, which must be the
 * GiST root block, as an empty leaf page.
 */
static void
gistRedoCreateIndex(XLogReaderState *record)
{
	Buffer		buffer = XLogInitBufferForRedo(record, 0);
	Page		page;

	Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);

	page = (Page) BufferGetPage(buffer);
	GISTInitBuffer(buffer, F_LEAF);

	PageSetLSN(page, record->EndRecPtr);
	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);
}
/*
 * Replay one GiST WAL record by dispatching on its op code.
 *
 * The work is done inside opCtx, which is reset once the record has been
 * processed.  An unrecognized op code raises PANIC.
 *
 * No conflict processing is performed here.
 * NOTE(review): the original comment justified that by saying GiST does not
 * remove killed tuples outside VACUUM.  The page-update replay in this file
 * now performs multi-tuple deletes on behalf of microvacuum, so it looks
 * like that assumption may no longer hold -- confirm whether Hot Standby
 * conflict handling is needed.
 */
void
gist_redo(XLogReaderState *record)
{
	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
	MemoryContext oldCxt = MemoryContextSwitchTo(opCtx);

	if (info == XLOG_GIST_PAGE_UPDATE)
		gistRedoPageUpdateRecord(record);
	else if (info == XLOG_GIST_PAGE_SPLIT)
		gistRedoPageSplitRecord(record);
	else if (info == XLOG_GIST_CREATE_INDEX)
		gistRedoCreateIndex(record);
	else
		elog(PANIC, "gist_redo: unknown op code %u", info);

	MemoryContextSwitchTo(oldCxt);
	MemoryContextReset(opCtx);
}
/*
 * Set up the working memory context used by gist_redo().
 */
void
gist_xlog_startup(void)
{
	opCtx = createTempGistContext();
}
/*
 * Delete the working memory context used by gist_redo().
 */
void
gist_xlog_cleanup(void)
{
	MemoryContextDelete(opCtx);
}
/*
 * Write the WAL record for a page split.
 *
 * 'dist' is the list of pages produced by the split; each one is registered
 * as a block reference (numbered from 1) that will be re-initialized at
 * replay, carrying its tuple count followed by its tuples.  If
 * 'leftchildbuf' is valid it is registered as block reference 0.
 * 'origrlink', 'orignsn', 'page_is_leaf' and 'markfollowright' are copied
 * into the record header.  'node' and 'blkno' are not used here.
 *
 * Returns the value returned by XLogInsert().
 */
XLogRecPtr
gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
			  SplitedPageLayout *dist,
			  BlockNumber origrlink, GistNSN orignsn,
			  Buffer leftchildbuf, bool markfollowright)
{
	gistxlogPageSplit xlrec;
	SplitedPageLayout *ptr;
	int			npage;
	int			i;

	/* count the pages in the split layout */
	npage = 0;
	ptr = dist;
	while (ptr)
	{
		npage++;
		ptr = ptr->next;
	}

	xlrec.npage = (uint16) npage;
	xlrec.origleaf = page_is_leaf;
	xlrec.origrlink = origrlink;
	xlrec.orignsn = orignsn;
	xlrec.markfollowright = markfollowright;

	XLogBeginInsert();

	/*
	 * Include a full page image of the child buf.  (Only necessary if a
	 * checkpoint happened since the child page was split.)
	 */
	if (BufferIsValid(leftchildbuf))
		XLogRegisterBuffer(0, leftchildbuf, REGBUF_STANDARD);

	/*
	 * NOTE: A lot of data gets registered here.  The caller must already
	 * have called XLogEnsureRecordSpace() to prepare for that; it cannot be
	 * done at this point because we are already in a critical section.  If
	 * the number of buffer or data registrations below changes, the
	 * XLogEnsureRecordSpace() calls must be adjusted to match!
	 */
	XLogRegisterData((char *) &xlrec, sizeof(gistxlogPageSplit));

	for (ptr = dist, i = 1; ptr; ptr = ptr->next, i++)
	{
		XLogRegisterBuffer(i, ptr->buffer, REGBUF_WILL_INIT);
		/* per-page payload: tuple count, then the tuples themselves */
		XLogRegisterBufData(i, (char *) &(ptr->block.num), sizeof(int));
		XLogRegisterBufData(i, (char *) ptr->list, ptr->lenlist);
	}

	return XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT);
}
/*
 * Write the XLOG record describing a page update.  The update can include
 * any number of deletions and/or insertions of tuples on a single index
 * page.
 *
 * 'todelete'/'ntodelete' give the offsets to delete and 'itup'/'ituplen'
 * the tuples to insert.  If 'leftchildbuf' is valid, i.e. this update
 * inserts a downlink for a split page, the record also covers clearing the
 * F_FOLLOW_RIGHT flag and setting the NSN on that child page.  'node' is
 * not used here.
 *
 * Both the todelete array and the tuples are registered as belonging to the
 * target buffer, so they need not be stored in XLOG if XLogInsert decides to
 * log the whole buffer contents instead.
 *
 * Returns the value returned by XLogInsert().
 */
XLogRecPtr
gistXLogUpdate(RelFileNode node, Buffer buffer,
			   OffsetNumber *todelete, int ntodelete,
			   IndexTuple *itup, int ituplen,
			   Buffer leftchildbuf)
{
	gistxlogPageUpdate xlrec;
	int			i;

	xlrec.ntoinsert = ituplen;
	xlrec.ntodelete = ntodelete;

	XLogBeginInsert();
	XLogRegisterData((char *) &xlrec, sizeof(gistxlogPageUpdate));

	/* block 0: the updated page, with the offsets to delete first ... */
	XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
	XLogRegisterBufData(0, (char *) todelete, sizeof(OffsetNumber) * ntodelete);

	/* ... followed by the new tuples */
	i = 0;
	while (i < ituplen)
	{
		XLogRegisterBufData(0, (char *) (itup[i]), IndexTupleSize(itup[i]));
		i++;
	}

	/*
	 * Include a full page image of the child buf.  (Only necessary if a
	 * checkpoint happened since the child page was split.)
	 */
	if (BufferIsValid(leftchildbuf))
		XLogRegisterBuffer(1, leftchildbuf, REGBUF_STANDARD);

	return XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE);
}