mirror of
https://github.com/postgres/postgres.git
synced 2025-04-20 00:42:27 +03:00
misleadingly-named WriteBuffer routine, and instead require routines that change buffer pages to call MarkBufferDirty (which does exactly what it says). We also require that they do so before calling XLogInsert; this takes care of the synchronization requirement documented in SyncOneBuffer. Note that because bufmgr takes the buffer content lock (in shared mode) while writing out any buffer, it doesn't matter whether MarkBufferDirty is executed before the buffer content change is complete, so long as the content change is completed before releasing exclusive lock on the buffer. So it's OK to set the dirtybit before we fill in the LSN. This eliminates the former kluge of needing to set the dirtybit in LockBuffer. Aside from making the code more transparent, we can also add some new debugging assertions, in particular that the caller of MarkBufferDirty must hold the buffer content lock, not merely a pin.
876 lines
22 KiB
C
876 lines
22 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* gistxlog.c
|
|
* WAL replay logic for GiST.
|
|
*
|
|
*
|
|
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
|
|
* Portions Copyright (c) 1994, Regents of the University of California
|
|
*
|
|
* IDENTIFICATION
|
|
* $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.14 2006/03/31 23:32:05 tgl Exp $
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
#include "postgres.h"
|
|
|
|
#include "access/genam.h"
|
|
#include "access/gist_private.h"
|
|
#include "access/gistscan.h"
|
|
#include "access/heapam.h"
|
|
#include "catalog/index.h"
|
|
#include "commands/vacuum.h"
|
|
#include "miscadmin.h"
|
|
#include "utils/memutils.h"
|
|
|
|
|
|
typedef struct
|
|
{
|
|
gistxlogPageUpdate *data;
|
|
int len;
|
|
IndexTuple *itup;
|
|
OffsetNumber *todelete;
|
|
} PageUpdateRecord;
|
|
|
|
typedef struct
|
|
{
|
|
gistxlogPage *header;
|
|
IndexTuple *itup;
|
|
} NewPage;
|
|
|
|
typedef struct
|
|
{
|
|
gistxlogPageSplit *data;
|
|
NewPage *page;
|
|
} PageSplitRecord;
|
|
|
|
/* track for incomplete inserts, idea was taken from nbtxlog.c */
|
|
|
|
typedef struct gistIncompleteInsert
|
|
{
|
|
RelFileNode node;
|
|
BlockNumber origblkno; /* for splits */
|
|
ItemPointerData key;
|
|
int lenblk;
|
|
BlockNumber *blkno;
|
|
XLogRecPtr lsn;
|
|
BlockNumber *path;
|
|
int pathlen;
|
|
} gistIncompleteInsert;
|
|
|
|
|
|
static MemoryContext opCtx; /* working memory for operations */
|
|
static MemoryContext insertCtx; /* holds incomplete_inserts list */
|
|
static List *incomplete_inserts;
|
|
|
|
|
|
/* True when two item pointers name the same block and the same offset. */
#define ItemPointerEQ(a, b) \
	( ItemPointerGetBlockNumber(a) == ItemPointerGetBlockNumber(b) && \
	  ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) )
|
|
|
|
|
|
static void
|
|
pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
|
|
BlockNumber *blkno, int lenblk,
|
|
PageSplitRecord *xlinfo /* to extract blkno info */ )
|
|
{
|
|
MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
|
|
gistIncompleteInsert *ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert));
|
|
|
|
ninsert->node = node;
|
|
ninsert->key = key;
|
|
ninsert->lsn = lsn;
|
|
|
|
if (lenblk && blkno)
|
|
{
|
|
ninsert->lenblk = lenblk;
|
|
ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
|
|
memcpy(ninsert->blkno, blkno, sizeof(BlockNumber) * ninsert->lenblk);
|
|
ninsert->origblkno = *blkno;
|
|
}
|
|
else
|
|
{
|
|
int i;
|
|
|
|
Assert(xlinfo);
|
|
ninsert->lenblk = xlinfo->data->npage;
|
|
ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
|
|
for (i = 0; i < ninsert->lenblk; i++)
|
|
ninsert->blkno[i] = xlinfo->page[i].header->blkno;
|
|
ninsert->origblkno = xlinfo->data->origblkno;
|
|
}
|
|
Assert(ninsert->lenblk > 0);
|
|
|
|
/*
|
|
* Stick the new incomplete insert onto the front of the list, not the
|
|
* back. This is so that gist_xlog_cleanup will process incompletions
|
|
* in last-in-first-out order.
|
|
*/
|
|
incomplete_inserts = lcons(ninsert, incomplete_inserts);
|
|
|
|
MemoryContextSwitchTo(oldCxt);
|
|
}
|
|
|
|
static void
|
|
forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
|
|
{
|
|
ListCell *l;
|
|
|
|
foreach(l, incomplete_inserts)
|
|
{
|
|
gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
|
|
|
|
if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key)))
|
|
{
|
|
/* found */
|
|
incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
|
|
pfree(insert->blkno);
|
|
pfree(insert);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record)
|
|
{
|
|
char *begin = XLogRecGetData(record),
|
|
*ptr;
|
|
int i = 0,
|
|
addpath = 0;
|
|
|
|
decoded->data = (gistxlogPageUpdate *) begin;
|
|
|
|
if (decoded->data->ntodelete)
|
|
{
|
|
decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogPageUpdate) + addpath);
|
|
addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete);
|
|
}
|
|
else
|
|
decoded->todelete = NULL;
|
|
|
|
decoded->len = 0;
|
|
ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
|
|
while (ptr - begin < record->xl_len)
|
|
{
|
|
decoded->len++;
|
|
ptr += IndexTupleSize((IndexTuple) ptr);
|
|
}
|
|
|
|
decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len);
|
|
|
|
ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
|
|
while (ptr - begin < record->xl_len)
|
|
{
|
|
decoded->itup[i] = (IndexTuple) ptr;
|
|
ptr += IndexTupleSize(decoded->itup[i]);
|
|
i++;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* redo any page update (except page split)
|
|
*/
|
|
static void
|
|
gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
|
|
{
|
|
PageUpdateRecord xlrec;
|
|
Relation reln;
|
|
Buffer buffer;
|
|
Page page;
|
|
|
|
/* nothing to do if whole page was backed up (and no info to do it with) */
|
|
if (record->xl_info & XLR_BKP_BLOCK_1)
|
|
return;
|
|
|
|
decodePageUpdateRecord(&xlrec, record);
|
|
|
|
reln = XLogOpenRelation(xlrec.data->node);
|
|
buffer = XLogReadBuffer(reln, xlrec.data->blkno, false);
|
|
if (!BufferIsValid(buffer))
|
|
return;
|
|
page = (Page) BufferGetPage(buffer);
|
|
|
|
if (XLByteLE(lsn, PageGetLSN(page)))
|
|
{
|
|
UnlockReleaseBuffer(buffer);
|
|
return;
|
|
}
|
|
|
|
if (xlrec.data->isemptypage)
|
|
{
|
|
while (!PageIsEmpty(page))
|
|
PageIndexTupleDelete(page, FirstOffsetNumber);
|
|
|
|
if (xlrec.data->blkno == GIST_ROOT_BLKNO)
|
|
GistPageSetLeaf(page);
|
|
else
|
|
GistPageSetDeleted(page);
|
|
}
|
|
else
|
|
{
|
|
if (isnewroot)
|
|
GISTInitBuffer(buffer, 0);
|
|
else if (xlrec.data->ntodelete)
|
|
{
|
|
int i;
|
|
|
|
for (i = 0; i < xlrec.data->ntodelete; i++)
|
|
PageIndexTupleDelete(page, xlrec.todelete[i]);
|
|
if (GistPageIsLeaf(page))
|
|
GistMarkTuplesDeleted(page);
|
|
}
|
|
|
|
/* add tuples */
|
|
if (xlrec.len > 0)
|
|
gistfillbuffer(reln, page, xlrec.itup, xlrec.len, InvalidOffsetNumber);
|
|
|
|
/*
|
|
* special case: leafpage, nothing to insert, nothing to delete, then
|
|
* vacuum marks page
|
|
*/
|
|
if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0)
|
|
GistClearTuplesDeleted(page);
|
|
}
|
|
|
|
GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
|
|
PageSetLSN(page, lsn);
|
|
PageSetTLI(page, ThisTimeLineID);
|
|
MarkBufferDirty(buffer);
|
|
UnlockReleaseBuffer(buffer);
|
|
|
|
if (ItemPointerIsValid(&(xlrec.data->key)))
|
|
{
|
|
if (incomplete_inserts != NIL)
|
|
forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
|
|
|
|
if (!isnewroot && xlrec.data->blkno != GIST_ROOT_BLKNO)
|
|
pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
|
|
&(xlrec.data->blkno), 1,
|
|
NULL);
|
|
}
|
|
}
|
|
|
|
static void
|
|
decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
|
|
{
|
|
char *begin = XLogRecGetData(record),
|
|
*ptr;
|
|
int j,
|
|
i = 0;
|
|
|
|
decoded->data = (gistxlogPageSplit *) begin;
|
|
decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage);
|
|
|
|
ptr = begin + sizeof(gistxlogPageSplit);
|
|
for (i = 0; i < decoded->data->npage; i++)
|
|
{
|
|
Assert(ptr - begin < record->xl_len);
|
|
decoded->page[i].header = (gistxlogPage *) ptr;
|
|
ptr += sizeof(gistxlogPage);
|
|
|
|
decoded->page[i].itup = (IndexTuple *)
|
|
palloc(sizeof(IndexTuple) * decoded->page[i].header->num);
|
|
j = 0;
|
|
while (j < decoded->page[i].header->num)
|
|
{
|
|
Assert(ptr - begin < record->xl_len);
|
|
decoded->page[i].itup[j] = (IndexTuple) ptr;
|
|
ptr += IndexTupleSize((IndexTuple) ptr);
|
|
j++;
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
|
|
{
|
|
PageSplitRecord xlrec;
|
|
Relation reln;
|
|
Buffer buffer;
|
|
Page page;
|
|
int i;
|
|
int flags;
|
|
|
|
decodePageSplitRecord(&xlrec, record);
|
|
reln = XLogOpenRelation(xlrec.data->node);
|
|
flags = xlrec.data->origleaf ? F_LEAF : 0;
|
|
|
|
/* loop around all pages */
|
|
for (i = 0; i < xlrec.data->npage; i++)
|
|
{
|
|
NewPage *newpage = xlrec.page + i;
|
|
|
|
buffer = XLogReadBuffer(reln, newpage->header->blkno, true);
|
|
Assert(BufferIsValid(buffer));
|
|
page = (Page) BufferGetPage(buffer);
|
|
|
|
/* ok, clear buffer */
|
|
GISTInitBuffer(buffer, flags);
|
|
|
|
/* and fill it */
|
|
gistfillbuffer(reln, page, newpage->itup, newpage->header->num, FirstOffsetNumber);
|
|
|
|
PageSetLSN(page, lsn);
|
|
PageSetTLI(page, ThisTimeLineID);
|
|
MarkBufferDirty(buffer);
|
|
UnlockReleaseBuffer(buffer);
|
|
}
|
|
|
|
if (ItemPointerIsValid(&(xlrec.data->key)))
|
|
{
|
|
if (incomplete_inserts != NIL)
|
|
forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
|
|
|
|
pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
|
|
NULL, 0,
|
|
&xlrec);
|
|
}
|
|
}
|
|
|
|
static void
|
|
gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
|
|
{
|
|
RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
|
|
Relation reln;
|
|
Buffer buffer;
|
|
Page page;
|
|
|
|
reln = XLogOpenRelation(*node);
|
|
buffer = XLogReadBuffer(reln, GIST_ROOT_BLKNO, true);
|
|
Assert(BufferIsValid(buffer));
|
|
page = (Page) BufferGetPage(buffer);
|
|
|
|
GISTInitBuffer(buffer, F_LEAF);
|
|
|
|
PageSetLSN(page, lsn);
|
|
PageSetTLI(page, ThisTimeLineID);
|
|
|
|
MarkBufferDirty(buffer);
|
|
UnlockReleaseBuffer(buffer);
|
|
}
|
|
|
|
static void
|
|
gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record)
|
|
{
|
|
char *begin = XLogRecGetData(record),
|
|
*ptr;
|
|
gistxlogInsertComplete *xlrec;
|
|
|
|
xlrec = (gistxlogInsertComplete *) begin;
|
|
|
|
ptr = begin + sizeof(gistxlogInsertComplete);
|
|
while (ptr - begin < record->xl_len)
|
|
{
|
|
Assert(record->xl_len - (ptr - begin) >= sizeof(ItemPointerData));
|
|
forgetIncompleteInsert(xlrec->node, *((ItemPointerData *) ptr));
|
|
ptr += sizeof(ItemPointerData);
|
|
}
|
|
}
|
|
|
|
void
|
|
gist_redo(XLogRecPtr lsn, XLogRecord *record)
|
|
{
|
|
uint8 info = record->xl_info & ~XLR_INFO_MASK;
|
|
|
|
MemoryContext oldCxt;
|
|
|
|
oldCxt = MemoryContextSwitchTo(opCtx);
|
|
switch (info)
|
|
{
|
|
case XLOG_GIST_PAGE_UPDATE:
|
|
gistRedoPageUpdateRecord(lsn, record, false);
|
|
break;
|
|
case XLOG_GIST_NEW_ROOT:
|
|
gistRedoPageUpdateRecord(lsn, record, true);
|
|
break;
|
|
case XLOG_GIST_PAGE_SPLIT:
|
|
gistRedoPageSplitRecord(lsn, record);
|
|
break;
|
|
case XLOG_GIST_CREATE_INDEX:
|
|
gistRedoCreateIndex(lsn, record);
|
|
break;
|
|
case XLOG_GIST_INSERT_COMPLETE:
|
|
gistRedoCompleteInsert(lsn, record);
|
|
break;
|
|
default:
|
|
elog(PANIC, "gist_redo: unknown op code %u", info);
|
|
}
|
|
|
|
MemoryContextSwitchTo(oldCxt);
|
|
MemoryContextReset(opCtx);
|
|
}
|
|
|
|
static void
|
|
out_target(StringInfo buf, RelFileNode node, ItemPointerData key)
|
|
{
|
|
appendStringInfo(buf, "rel %u/%u/%u; tid %u/%u",
|
|
node.spcNode, node.dbNode, node.relNode,
|
|
ItemPointerGetBlockNumber(&key),
|
|
ItemPointerGetOffsetNumber(&key));
|
|
}
|
|
|
|
static void
|
|
out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
|
|
{
|
|
out_target(buf, xlrec->node, xlrec->key);
|
|
appendStringInfo(buf, "; block number %u", xlrec->blkno);
|
|
}
|
|
|
|
static void
|
|
out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
|
|
{
|
|
appendStringInfo(buf, "page_split: ");
|
|
out_target(buf, xlrec->node, xlrec->key);
|
|
appendStringInfo(buf, "; block number %u splits to %d pages",
|
|
xlrec->origblkno, xlrec->npage);
|
|
}
|
|
|
|
void
|
|
gist_desc(StringInfo buf, uint8 xl_info, char *rec)
|
|
{
|
|
uint8 info = xl_info & ~XLR_INFO_MASK;
|
|
|
|
switch (info)
|
|
{
|
|
case XLOG_GIST_PAGE_UPDATE:
|
|
appendStringInfo(buf, "page_update: ");
|
|
out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec);
|
|
break;
|
|
case XLOG_GIST_NEW_ROOT:
|
|
appendStringInfo(buf, "new_root: ");
|
|
out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key);
|
|
break;
|
|
case XLOG_GIST_PAGE_SPLIT:
|
|
out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
|
|
break;
|
|
case XLOG_GIST_CREATE_INDEX:
|
|
appendStringInfo(buf, "create_index: rel %u/%u/%u",
|
|
((RelFileNode *) rec)->spcNode,
|
|
((RelFileNode *) rec)->dbNode,
|
|
((RelFileNode *) rec)->relNode);
|
|
break;
|
|
case XLOG_GIST_INSERT_COMPLETE:
|
|
appendStringInfo(buf, "complete_insert: rel %u/%u/%u",
|
|
((gistxlogInsertComplete *) rec)->node.spcNode,
|
|
((gistxlogInsertComplete *) rec)->node.dbNode,
|
|
((gistxlogInsertComplete *) rec)->node.relNode);
|
|
break;
|
|
default:
|
|
appendStringInfo(buf, "unknown gist op code %u", info);
|
|
break;
|
|
}
|
|
}
|
|
|
|
IndexTuple
|
|
gist_form_invalid_tuple(BlockNumber blkno)
|
|
{
|
|
/*
|
|
* we don't alloc space for null's bitmap, this is invalid tuple, be
|
|
* carefull in read and write code
|
|
*/
|
|
Size size = IndexInfoFindDataOffset(0);
|
|
IndexTuple tuple = (IndexTuple) palloc0(size);
|
|
|
|
tuple->t_info |= size;
|
|
|
|
ItemPointerSetBlockNumber(&(tuple->t_tid), blkno);
|
|
GistTupleSetInvalid(tuple);
|
|
|
|
return tuple;
|
|
}
|
|
|
|
|
|
static void
|
|
gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
|
|
{
|
|
GISTInsertStack *top;
|
|
|
|
insert->pathlen = 0;
|
|
insert->path = NULL;
|
|
|
|
if ((top = gistFindPath(index, insert->origblkno)) != NULL)
|
|
{
|
|
int i;
|
|
GISTInsertStack *ptr;
|
|
|
|
for (ptr = top; ptr; ptr = ptr->parent)
|
|
insert->pathlen++;
|
|
|
|
insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen);
|
|
|
|
i = 0;
|
|
for (ptr = top; ptr; ptr = ptr->parent)
|
|
insert->path[i++] = ptr->blkno;
|
|
}
|
|
else
|
|
elog(LOG, "lost parent for block %u", insert->origblkno);
|
|
}
|
|
|
|
/*
|
|
* Continue insert after crash. In normal situations, there aren't any
|
|
* incomplete inserts, but if a crash occurs partway through an insertion
|
|
* sequence, we'll need to finish making the index valid at the end of WAL
|
|
* replay.
|
|
*
|
|
* Note that we assume the index is now in a valid state, except for the
|
|
* unfinished insertion. In particular it's safe to invoke gistFindPath();
|
|
* there shouldn't be any garbage pages for it to run into.
|
|
*
|
|
* Although stored LSN in gistIncompleteInsert is a LSN of child page,
|
|
* we can compare it with LSN of parent, because parent is always locked
|
|
* while we change child page (look at gistmakedeal). So if parent's LSN is
|
|
* less than stored lsn then changes in parent aren't done yet.
|
|
*/
|
|
static void
|
|
gistContinueInsert(gistIncompleteInsert *insert)
|
|
{
|
|
IndexTuple *itup;
|
|
int i,
|
|
lenitup;
|
|
Relation index;
|
|
|
|
index = XLogOpenRelation(insert->node);
|
|
|
|
/*
|
|
* needed vector itup never will be more than initial lenblkno+2, because
|
|
* during this processing Indextuple can be only smaller
|
|
*/
|
|
lenitup = insert->lenblk;
|
|
itup = (IndexTuple *) palloc(sizeof(IndexTuple) * (lenitup + 2 /* guarantee root split */ ));
|
|
|
|
for (i = 0; i < insert->lenblk; i++)
|
|
itup[i] = gist_form_invalid_tuple(insert->blkno[i]);
|
|
|
|
if (insert->origblkno == GIST_ROOT_BLKNO)
|
|
{
|
|
/*
|
|
* it was split root, so we should only make new root. it can't be
|
|
* simple insert into root, look at call pushIncompleteInsert in
|
|
* gistRedoPageSplitRecord
|
|
*/
|
|
Buffer buffer = XLogReadBuffer(index, GIST_ROOT_BLKNO, true);
|
|
Page page;
|
|
|
|
Assert(BufferIsValid(buffer));
|
|
page = BufferGetPage(buffer);
|
|
|
|
GISTInitBuffer(buffer, 0);
|
|
gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber);
|
|
|
|
PageSetLSN(page, insert->lsn);
|
|
PageSetTLI(page, ThisTimeLineID);
|
|
|
|
MarkBufferDirty(buffer);
|
|
UnlockReleaseBuffer(buffer);
|
|
|
|
/*
|
|
* XXX fall out to avoid making LOG message at bottom of routine.
|
|
* I think the logic for when to emit that message is all wrong...
|
|
*/
|
|
return;
|
|
}
|
|
else
|
|
{
|
|
Buffer *buffers;
|
|
Page *pages;
|
|
int numbuffer;
|
|
|
|
/* construct path */
|
|
gistxlogFindPath(index, insert);
|
|
|
|
Assert(insert->pathlen > 0);
|
|
|
|
buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ ));
|
|
pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ ));
|
|
|
|
for (i = 0; i < insert->pathlen; i++)
|
|
{
|
|
int j,
|
|
k,
|
|
pituplen = 0,
|
|
childfound = 0;
|
|
|
|
numbuffer = 1;
|
|
buffers[numbuffer - 1] = ReadBuffer(index, insert->path[i]);
|
|
LockBuffer(buffers[numbuffer - 1], GIST_EXCLUSIVE);
|
|
pages[numbuffer - 1] = BufferGetPage(buffers[numbuffer - 1]);
|
|
|
|
if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer - 1])))
|
|
{
|
|
UnlockReleaseBuffer(buffers[numbuffer - 1]);
|
|
return;
|
|
}
|
|
|
|
pituplen = PageGetMaxOffsetNumber(pages[numbuffer - 1]);
|
|
|
|
/* remove old IndexTuples */
|
|
for (j = 0; j < pituplen && childfound < lenitup; j++)
|
|
{
|
|
BlockNumber blkno;
|
|
ItemId iid = PageGetItemId(pages[numbuffer - 1], j + FirstOffsetNumber);
|
|
IndexTuple idxtup = (IndexTuple) PageGetItem(pages[numbuffer - 1], iid);
|
|
|
|
blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid));
|
|
|
|
for (k = 0; k < lenitup; k++)
|
|
if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno)
|
|
{
|
|
PageIndexTupleDelete(pages[numbuffer - 1], j + FirstOffsetNumber);
|
|
j--;
|
|
pituplen--;
|
|
childfound++;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (gistnospace(pages[numbuffer - 1], itup, lenitup))
|
|
{
|
|
/* no space left on page, so we must split */
|
|
buffers[numbuffer] = ReadBuffer(index, P_NEW);
|
|
LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
|
|
GISTInitBuffer(buffers[numbuffer], 0);
|
|
pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
|
|
gistfillbuffer(index, pages[numbuffer], itup, lenitup, FirstOffsetNumber);
|
|
numbuffer++;
|
|
|
|
if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO)
|
|
{
|
|
IndexTuple *parentitup;
|
|
|
|
/*
|
|
* we split root, just copy tuples from old root to new
|
|
* page
|
|
*/
|
|
parentitup = gistextractbuffer(buffers[numbuffer - 1],
|
|
&pituplen);
|
|
|
|
/* sanity check */
|
|
if (i + 1 != insert->pathlen)
|
|
elog(PANIC, "unexpected pathlen in index \"%s\"",
|
|
RelationGetRelationName(index));
|
|
|
|
/* fill new page */
|
|
buffers[numbuffer] = ReadBuffer(index, P_NEW);
|
|
LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
|
|
GISTInitBuffer(buffers[numbuffer], 0);
|
|
pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
|
|
gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber);
|
|
numbuffer++;
|
|
|
|
/* fill root page */
|
|
GISTInitBuffer(buffers[0], 0);
|
|
for (j = 1; j < numbuffer; j++)
|
|
{
|
|
IndexTuple tuple = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
|
|
|
|
if (PageAddItem(pages[0],
|
|
(Item) tuple,
|
|
IndexTupleSize(tuple),
|
|
(OffsetNumber) j,
|
|
LP_USED) == InvalidOffsetNumber)
|
|
elog(PANIC, "failed to add item to index page in \"%s\"",
|
|
RelationGetRelationName(index));
|
|
}
|
|
}
|
|
}
|
|
else
|
|
gistfillbuffer(index, pages[numbuffer - 1], itup, lenitup, InvalidOffsetNumber);
|
|
|
|
lenitup = numbuffer;
|
|
for (j = 0; j < numbuffer; j++)
|
|
{
|
|
itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
|
|
PageSetLSN(pages[j], insert->lsn);
|
|
PageSetTLI(pages[j], ThisTimeLineID);
|
|
GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber;
|
|
MarkBufferDirty(buffers[j]);
|
|
UnlockReleaseBuffer(buffers[j]);
|
|
}
|
|
}
|
|
}
|
|
|
|
ereport(LOG,
|
|
(errmsg("index %u/%u/%u needs VACUUM or REINDEX to finish crash recovery",
|
|
insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
|
|
errdetail("Incomplete insertion detected during crash replay.")));
|
|
}
|
|
|
|
void
|
|
gist_xlog_startup(void)
|
|
{
|
|
incomplete_inserts = NIL;
|
|
insertCtx = AllocSetContextCreate(CurrentMemoryContext,
|
|
"GiST recovery temporary context",
|
|
ALLOCSET_DEFAULT_MINSIZE,
|
|
ALLOCSET_DEFAULT_INITSIZE,
|
|
ALLOCSET_DEFAULT_MAXSIZE);
|
|
opCtx = createTempGistContext();
|
|
}
|
|
|
|
void
|
|
gist_xlog_cleanup(void)
|
|
{
|
|
ListCell *l;
|
|
MemoryContext oldCxt;
|
|
|
|
oldCxt = MemoryContextSwitchTo(opCtx);
|
|
foreach(l, incomplete_inserts)
|
|
{
|
|
gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
|
|
|
|
gistContinueInsert(insert);
|
|
MemoryContextReset(opCtx);
|
|
}
|
|
MemoryContextSwitchTo(oldCxt);
|
|
|
|
MemoryContextDelete(opCtx);
|
|
MemoryContextDelete(insertCtx);
|
|
}
|
|
|
|
|
|
XLogRecData *
|
|
formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
|
|
ItemPointer key, SplitedPageLayout *dist)
|
|
{
|
|
XLogRecData *rdata;
|
|
gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit));
|
|
SplitedPageLayout *ptr;
|
|
int npage = 0,
|
|
cur = 1;
|
|
|
|
ptr = dist;
|
|
while (ptr)
|
|
{
|
|
npage++;
|
|
ptr = ptr->next;
|
|
}
|
|
|
|
rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2));
|
|
|
|
xlrec->node = node;
|
|
xlrec->origblkno = blkno;
|
|
xlrec->origleaf = page_is_leaf;
|
|
xlrec->npage = (uint16) npage;
|
|
if (key)
|
|
xlrec->key = *key;
|
|
else
|
|
ItemPointerSetInvalid(&(xlrec->key));
|
|
|
|
rdata[0].buffer = InvalidBuffer;
|
|
rdata[0].data = (char *) xlrec;
|
|
rdata[0].len = sizeof(gistxlogPageSplit);
|
|
rdata[0].next = NULL;
|
|
|
|
ptr = dist;
|
|
while (ptr)
|
|
{
|
|
rdata[cur].buffer = InvalidBuffer;
|
|
rdata[cur].data = (char *) &(ptr->block);
|
|
rdata[cur].len = sizeof(gistxlogPage);
|
|
rdata[cur - 1].next = &(rdata[cur]);
|
|
cur++;
|
|
|
|
rdata[cur].buffer = InvalidBuffer;
|
|
rdata[cur].data = (char *) (ptr->list);
|
|
rdata[cur].len = ptr->lenlist;
|
|
rdata[cur - 1].next = &(rdata[cur]);
|
|
rdata[cur].next = NULL;
|
|
cur++;
|
|
ptr = ptr->next;
|
|
}
|
|
|
|
return rdata;
|
|
}
|
|
|
|
/*
|
|
* Construct the rdata array for an XLOG record describing a page update
|
|
* (deletion and/or insertion of tuples on a single index page).
|
|
*
|
|
* Note that both the todelete array and the tuples are marked as belonging
|
|
* to the target buffer; they need not be stored in XLOG if XLogInsert decides
|
|
* to log the whole buffer contents instead. Also, we take care that there's
|
|
* at least one rdata item referencing the buffer, even when ntodelete and
|
|
* ituplen are both zero; this ensures that XLogInsert knows about the buffer.
|
|
*/
|
|
XLogRecData *
|
|
formUpdateRdata(RelFileNode node, Buffer buffer,
|
|
OffsetNumber *todelete, int ntodelete, bool emptypage,
|
|
IndexTuple *itup, int ituplen, ItemPointer key)
|
|
{
|
|
XLogRecData *rdata;
|
|
gistxlogPageUpdate *xlrec;
|
|
int cur,
|
|
i;
|
|
|
|
/* ugly wart in API: emptypage causes us to ignore other inputs */
|
|
if (emptypage)
|
|
ntodelete = ituplen = 0;
|
|
|
|
rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (2 + ituplen));
|
|
xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
|
|
|
|
xlrec->node = node;
|
|
xlrec->blkno = BufferGetBlockNumber(buffer);
|
|
xlrec->ntodelete = ntodelete;
|
|
xlrec->isemptypage = emptypage;
|
|
if (key)
|
|
xlrec->key = *key;
|
|
else
|
|
ItemPointerSetInvalid(&(xlrec->key));
|
|
|
|
rdata[0].data = (char *) xlrec;
|
|
rdata[0].len = sizeof(gistxlogPageUpdate);
|
|
rdata[0].buffer = InvalidBuffer;
|
|
rdata[0].next = &(rdata[1]);
|
|
|
|
rdata[1].data = (char *) todelete;
|
|
rdata[1].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete);
|
|
rdata[1].buffer = buffer;
|
|
rdata[1].buffer_std = true;
|
|
rdata[1].next = NULL;
|
|
|
|
/* new tuples */
|
|
cur = 2;
|
|
for (i = 0; i < ituplen; i++)
|
|
{
|
|
rdata[cur - 1].next = &(rdata[cur]);
|
|
rdata[cur].data = (char *) (itup[i]);
|
|
rdata[cur].len = IndexTupleSize(itup[i]);
|
|
rdata[cur].buffer = buffer;
|
|
rdata[cur].buffer_std = true;
|
|
rdata[cur].next = NULL;
|
|
cur++;
|
|
}
|
|
|
|
return rdata;
|
|
}
|
|
|
|
XLogRecPtr
|
|
gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len)
|
|
{
|
|
gistxlogInsertComplete xlrec;
|
|
XLogRecData rdata[2];
|
|
XLogRecPtr recptr;
|
|
|
|
Assert(len > 0);
|
|
xlrec.node = node;
|
|
|
|
rdata[0].buffer = InvalidBuffer;
|
|
rdata[0].data = (char *) &xlrec;
|
|
rdata[0].len = sizeof(gistxlogInsertComplete);
|
|
rdata[0].next = &(rdata[1]);
|
|
|
|
rdata[1].buffer = InvalidBuffer;
|
|
rdata[1].data = (char *) keys;
|
|
rdata[1].len = sizeof(ItemPointerData) * len;
|
|
rdata[1].next = NULL;
|
|
|
|
START_CRIT_SECTION();
|
|
|
|
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata);
|
|
|
|
END_CRIT_SECTION();
|
|
|
|
return recptr;
|
|
}
|