mirror of
https://github.com/postgres/postgres.git
synced 2025-09-03 15:22:11 +03:00
1. full functional WAL for GiST
2. improve vacuum for gist - use FSM - full vacuum: - reforms parent tuple if it's needed ( tuples was deleted on child page or parent tuple remains invalid after crash recovery ) - truncate index file if possible 3. fixes bugs and mistakes
This commit is contained in:
@@ -8,7 +8,7 @@
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.1 2005/06/14 11:45:13 teodor Exp $
|
||||
* $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.2 2005/06/20 10:29:36 teodor Exp $
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
@@ -22,11 +22,13 @@
|
||||
#include "miscadmin.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
|
||||
typedef struct {
|
||||
gistxlogEntryUpdate *data;
|
||||
int len;
|
||||
IndexTuple *itup;
|
||||
BlockNumber *path;
|
||||
OffsetNumber *todelete;
|
||||
} EntryUpdateRecord;
|
||||
|
||||
typedef struct {
|
||||
@@ -44,6 +46,7 @@ typedef struct {
|
||||
NewPage *page;
|
||||
IndexTuple *itup;
|
||||
BlockNumber *path;
|
||||
OffsetNumber *todelete;
|
||||
} PageSplitRecord;
|
||||
|
||||
/* track for incomplete inserts, idea was taken from nbtxlog.c */
|
||||
@@ -55,6 +58,7 @@ typedef struct gistIncompleteInsert {
|
||||
BlockNumber *blkno;
|
||||
int pathlen;
|
||||
BlockNumber *path;
|
||||
XLogRecPtr lsn;
|
||||
} gistIncompleteInsert;
|
||||
|
||||
|
||||
@@ -65,12 +69,12 @@ static List *incomplete_inserts;
|
||||
|
||||
#define ItemPointerEQ( a, b ) \
|
||||
( \
|
||||
ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(a) && \
|
||||
ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
|
||||
ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \
|
||||
)
|
||||
|
||||
static void
|
||||
pushIncompleteInsert(RelFileNode node, ItemPointerData key,
|
||||
pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
|
||||
BlockNumber *blkno, int lenblk,
|
||||
BlockNumber *path, int pathlen,
|
||||
PageSplitRecord *xlinfo /* to extract blkno info */ ) {
|
||||
@@ -79,6 +83,7 @@ pushIncompleteInsert(RelFileNode node, ItemPointerData key,
|
||||
|
||||
ninsert->node = node;
|
||||
ninsert->key = key;
|
||||
ninsert->lsn = lsn;
|
||||
|
||||
if ( lenblk && blkno ) {
|
||||
ninsert->lenblk = lenblk;
|
||||
@@ -95,7 +100,7 @@ pushIncompleteInsert(RelFileNode node, ItemPointerData key,
|
||||
}
|
||||
Assert( ninsert->lenblk>0 );
|
||||
|
||||
if ( path && ninsert->pathlen ) {
|
||||
if ( path && pathlen ) {
|
||||
ninsert->pathlen = pathlen;
|
||||
ninsert->path = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->pathlen );
|
||||
memcpy(ninsert->path, path, sizeof(BlockNumber)*ninsert->pathlen);
|
||||
@@ -135,11 +140,17 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) {
|
||||
decoded->data = (gistxlogEntryUpdate*)begin;
|
||||
|
||||
if ( decoded->data->pathlen ) {
|
||||
addpath = sizeof(BlockNumber) * decoded->data->pathlen;
|
||||
addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen );
|
||||
decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate ));
|
||||
} else
|
||||
decoded->path = NULL;
|
||||
|
||||
if ( decoded->data->ntodelete ) {
|
||||
decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogEntryUpdate ) + addpath);
|
||||
addpath += MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete );
|
||||
} else
|
||||
decoded->todelete = NULL;
|
||||
|
||||
decoded->len=0;
|
||||
ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath;
|
||||
while( ptr - begin < record->xl_len ) {
|
||||
@@ -157,7 +168,9 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* redo any page update (except page split)
|
||||
*/
|
||||
static void
|
||||
gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
|
||||
EntryUpdateRecord xlrec;
|
||||
@@ -191,19 +204,39 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
|
||||
}
|
||||
}
|
||||
|
||||
if ( isnewroot )
|
||||
GISTInitBuffer(buffer, 0);
|
||||
else if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber )
|
||||
PageIndexTupleDelete(page, xlrec.data->todeleteoffnum);
|
||||
if ( xlrec.data->isemptypage ) {
|
||||
while( !PageIsEmpty(page) )
|
||||
PageIndexTupleDelete( page, FirstOffsetNumber );
|
||||
|
||||
if ( xlrec.data->blkno == GIST_ROOT_BLKNO )
|
||||
GistPageSetLeaf( page );
|
||||
else
|
||||
GistPageSetDeleted( page );
|
||||
} else {
|
||||
if ( isnewroot )
|
||||
GISTInitBuffer(buffer, 0);
|
||||
else if ( xlrec.data->ntodelete ) {
|
||||
int i;
|
||||
for(i=0; i < xlrec.data->ntodelete ; i++)
|
||||
PageIndexTupleDelete(page, xlrec.todelete[i]);
|
||||
if ( GistPageIsLeaf(page) )
|
||||
GistMarkTuplesDeleted(page);
|
||||
}
|
||||
|
||||
/* add tuples */
|
||||
if ( xlrec.len > 0 ) {
|
||||
OffsetNumber off = (PageIsEmpty(page)) ?
|
||||
FirstOffsetNumber
|
||||
:
|
||||
OffsetNumberNext(PageGetMaxOffsetNumber(page));
|
||||
/* add tuples */
|
||||
if ( xlrec.len > 0 ) {
|
||||
OffsetNumber off = (PageIsEmpty(page)) ?
|
||||
FirstOffsetNumber
|
||||
:
|
||||
OffsetNumberNext(PageGetMaxOffsetNumber(page));
|
||||
|
||||
gistfillbuffer(reln, page, xlrec.itup, xlrec.len, off);
|
||||
gistfillbuffer(reln, page, xlrec.itup, xlrec.len, off);
|
||||
}
|
||||
|
||||
/* special case: leafpage, nothing to insert, nothing to delete, then
|
||||
vacuum marks page */
|
||||
if ( GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0 )
|
||||
GistClearTuplesDeleted(page);
|
||||
}
|
||||
|
||||
PageSetLSN(page, lsn);
|
||||
@@ -216,7 +249,7 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
|
||||
forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
|
||||
|
||||
if ( !isnewroot && xlrec.data->blkno!=GIST_ROOT_BLKNO )
|
||||
pushIncompleteInsert(xlrec.data->node, xlrec.data->key,
|
||||
pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
|
||||
&(xlrec.data->blkno), 1,
|
||||
xlrec.path, xlrec.data->pathlen,
|
||||
NULL);
|
||||
@@ -233,11 +266,17 @@ decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) {
|
||||
decoded->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * decoded->data->nitup );
|
||||
|
||||
if ( decoded->data->pathlen ) {
|
||||
addpath = sizeof(BlockNumber) * decoded->data->pathlen;
|
||||
decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate ));
|
||||
addpath = MAXALIGN( sizeof(BlockNumber) * decoded->data->pathlen );
|
||||
decoded->path = (BlockNumber*)(begin+sizeof( gistxlogPageSplit ));
|
||||
} else
|
||||
decoded->path = NULL;
|
||||
|
||||
if ( decoded->data->ntodelete ) {
|
||||
decoded->todelete = (OffsetNumber*)(begin + sizeof( gistxlogPageSplit ) + addpath);
|
||||
addpath += MAXALIGN( sizeof(OffsetNumber) * decoded->data->ntodelete );
|
||||
} else
|
||||
decoded->todelete = NULL;
|
||||
|
||||
ptr=begin+sizeof( gistxlogPageSplit ) + addpath;
|
||||
for(i=0;i<decoded->data->nitup;i++) {
|
||||
Assert( ptr - begin < record->xl_len );
|
||||
@@ -285,19 +324,23 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
|
||||
return;
|
||||
}
|
||||
|
||||
if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber )
|
||||
PageIndexTupleDelete(page, xlrec.data->todeleteoffnum);
|
||||
if ( xlrec.data->ntodelete ) {
|
||||
int i;
|
||||
for(i=0; i < xlrec.data->ntodelete ; i++)
|
||||
PageIndexTupleDelete(page, xlrec.todelete[i]);
|
||||
}
|
||||
|
||||
itup = gistextractbuffer(buffer, &len);
|
||||
itup = gistjoinvector(itup, &len, xlrec.itup, xlrec.data->nitup);
|
||||
institup = (IndexTuple*)palloc( sizeof(IndexTuple) * len );
|
||||
opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
|
||||
|
||||
/* read and fill all pages */
|
||||
for(i=0;i<xlrec.data->npage;i++) {
|
||||
int j;
|
||||
NewPage *newpage = xlrec.page + i;
|
||||
|
||||
/* prepare itup vector */
|
||||
/* prepare itup vector per page */
|
||||
for(j=0;j<newpage->header->num;j++)
|
||||
institup[j] = itup[ newpage->offnum[j] - 1 ];
|
||||
|
||||
@@ -311,9 +354,9 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
|
||||
if (!BufferIsValid(newpage->buffer))
|
||||
elog(PANIC, "gistRedoPageSplitRecord: lost page");
|
||||
newpage->page = (Page) BufferGetPage(newpage->buffer);
|
||||
if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(newpage->page))) {
|
||||
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
|
||||
ReleaseBuffer(buffer);
|
||||
if (!PageIsNew((PageHeader) newpage->page) && XLByteLE(lsn, PageGetLSN(newpage->page))) {
|
||||
LockBuffer(newpage->buffer, BUFFER_LOCK_UNLOCK);
|
||||
ReleaseBuffer(newpage->buffer);
|
||||
newpage->is_ok=true;
|
||||
continue; /* good page */
|
||||
} else {
|
||||
@@ -350,7 +393,7 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
|
||||
if ( incomplete_inserts != NIL )
|
||||
forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
|
||||
|
||||
pushIncompleteInsert(xlrec.data->node, xlrec.data->key,
|
||||
pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
|
||||
NULL, 0,
|
||||
xlrec.path, xlrec.data->pathlen,
|
||||
&xlrec);
|
||||
@@ -386,6 +429,21 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) {
|
||||
WriteBuffer(buffer);
|
||||
}
|
||||
|
||||
static void
|
||||
gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record) {
|
||||
char *begin = XLogRecGetData(record), *ptr;
|
||||
gistxlogInsertComplete *xlrec;
|
||||
|
||||
xlrec = (gistxlogInsertComplete*)begin;
|
||||
|
||||
ptr = begin + sizeof( gistxlogInsertComplete );
|
||||
while( ptr - begin < record->xl_len ) {
|
||||
Assert( record->xl_len - (ptr - begin) >= sizeof(ItemPointerData) );
|
||||
forgetIncompleteInsert( xlrec->node, *((ItemPointerData*)ptr) );
|
||||
ptr += sizeof(ItemPointerData);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
gist_redo(XLogRecPtr lsn, XLogRecord *record)
|
||||
{
|
||||
@@ -408,8 +466,7 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
|
||||
gistRedoCreateIndex(lsn, record);
|
||||
break;
|
||||
case XLOG_GIST_INSERT_COMPLETE:
|
||||
forgetIncompleteInsert( ((gistxlogInsertComplete*)XLogRecGetData(record))->node,
|
||||
((gistxlogInsertComplete*)XLogRecGetData(record))->key );
|
||||
gistRedoCompleteInsert(lsn, record);
|
||||
break;
|
||||
default:
|
||||
elog(PANIC, "gist_redo: unknown op code %u", info);
|
||||
@@ -431,16 +488,16 @@ out_target(char *buf, RelFileNode node, ItemPointerData key)
|
||||
static void
|
||||
out_gistxlogEntryUpdate(char *buf, gistxlogEntryUpdate *xlrec) {
|
||||
out_target(buf, xlrec->node, xlrec->key);
|
||||
sprintf(buf + strlen(buf), "; block number %u; update offset %u;",
|
||||
xlrec->blkno, xlrec->todeleteoffnum);
|
||||
sprintf(buf + strlen(buf), "; block number %u",
|
||||
xlrec->blkno);
|
||||
}
|
||||
|
||||
static void
|
||||
out_gistxlogPageSplit(char *buf, gistxlogPageSplit *xlrec) {
|
||||
strcat(buf, "page_split: ");
|
||||
out_target(buf, xlrec->node, xlrec->key);
|
||||
sprintf(buf + strlen(buf), "; block number %u; update offset %u; add %d tuples; split to %d pages",
|
||||
xlrec->origblkno, xlrec->todeleteoffnum,
|
||||
sprintf(buf + strlen(buf), "; block number %u; add %d tuples; split to %d pages",
|
||||
xlrec->origblkno,
|
||||
xlrec->nitup, xlrec->npage);
|
||||
}
|
||||
|
||||
@@ -472,135 +529,172 @@ gist_desc(char *buf, uint8 xl_info, char *rec)
|
||||
((RelFileNode*)rec)->relNode);
|
||||
break;
|
||||
case XLOG_GIST_INSERT_COMPLETE:
|
||||
strcat(buf, "insert_complete: ");
|
||||
out_target(buf, ((gistxlogInsertComplete*)rec)->node, ((gistxlogInsertComplete*)rec)->key);
|
||||
sprintf(buf + strlen(buf), "complete_insert: rel %u/%u/%u",
|
||||
((gistxlogInsertComplete*)rec)->node.spcNode,
|
||||
((gistxlogInsertComplete*)rec)->node.dbNode,
|
||||
((gistxlogInsertComplete*)rec)->node.relNode);
|
||||
default:
|
||||
elog(PANIC, "gist_desc: unknown op code %u", info);
|
||||
}
|
||||
}
|
||||
|
||||
IndexTuple
|
||||
gist_form_invalid_tuple(BlockNumber blkno) {
|
||||
/* we don't alloc space for null's bitmap, this is invalid tuple,
|
||||
be carefull in read and write code */
|
||||
Size size = IndexInfoFindDataOffset(0);
|
||||
IndexTuple tuple=(IndexTuple)palloc0( size );
|
||||
|
||||
tuple->t_info |= size;
|
||||
|
||||
ItemPointerSetBlockNumber(&(tuple->t_tid), blkno);
|
||||
GistTupleSetInvalid( tuple );
|
||||
|
||||
return tuple;
|
||||
}
|
||||
|
||||
#ifdef GIST_INCOMPLETE_INSERT
|
||||
static void
|
||||
gistContinueInsert(gistIncompleteInsert *insert) {
|
||||
GISTSTATE giststate;
|
||||
GISTInsertState state;
|
||||
int i;
|
||||
IndexTuple *itup;
|
||||
int i, lenitup;
|
||||
MemoryContext oldCxt;
|
||||
Relation index;
|
||||
|
||||
oldCxt = MemoryContextSwitchTo(opCtx);
|
||||
|
||||
state.r = XLogOpenRelation(insert->node);
|
||||
if (!RelationIsValid(state.r))
|
||||
index = XLogOpenRelation(insert->node);
|
||||
if (!RelationIsValid(index))
|
||||
return;
|
||||
|
||||
initGISTstate(&giststate, state.r);
|
||||
elog(LOG,"Detected incomplete insert into GiST index %u/%u/%u; It's desirable to vacuum or reindex index",
|
||||
insert->node.spcNode, insert->node.dbNode, insert->node.relNode);
|
||||
|
||||
state.needInsertComplete=false;
|
||||
ItemPointerSetInvalid( &(state.key) );
|
||||
state.path=NULL;
|
||||
state.pathlen=0;
|
||||
state.xlog_mode = true;
|
||||
/* needed vector itup never will be more than initial lenblkno+2,
|
||||
because during this processing Indextuple can be only smaller */
|
||||
lenitup = insert->lenblk;
|
||||
itup = (IndexTuple*)palloc(sizeof(IndexTuple)*(lenitup+2 /*guarantee root split*/));
|
||||
|
||||
/* form union tuples */
|
||||
state.itup = (IndexTuple*)palloc(sizeof(IndexTuple)*insert->lenblk);
|
||||
state.ituplen = insert->lenblk;
|
||||
for(i=0;i<insert->lenblk;i++) {
|
||||
int len=0;
|
||||
IndexTuple *itup;
|
||||
Buffer buffer;
|
||||
Page page;
|
||||
for(i=0;i<insert->lenblk;i++)
|
||||
itup[i] = gist_form_invalid_tuple( insert->blkno[i] );
|
||||
|
||||
buffer = XLogReadBuffer(false, state.r, insert->blkno[i]);
|
||||
if (!BufferIsValid(buffer))
|
||||
elog(PANIC, "gistContinueInsert: block unfound");
|
||||
page = (Page) BufferGetPage(buffer);
|
||||
if ( PageIsNew((PageHeader)page) )
|
||||
elog(PANIC, "gistContinueInsert: uninitialized page");
|
||||
|
||||
itup = gistextractbuffer(buffer, &len);
|
||||
state.itup[i] = gistunion(state.r, itup, len, &giststate);
|
||||
|
||||
ItemPointerSet( &(state.itup[i]->t_tid), insert->blkno[i], FirstOffsetNumber );
|
||||
|
||||
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
|
||||
ReleaseBuffer(buffer);
|
||||
}
|
||||
|
||||
if ( insert->pathlen==0 ) {
|
||||
if ( insert->pathlen==0 ) {
|
||||
/*it was split root, so we should only make new root*/
|
||||
gistnewroot(state.r, state.itup, state.ituplen, &(state.key), true);
|
||||
MemoryContextSwitchTo(oldCxt);
|
||||
MemoryContextReset(opCtx);
|
||||
return;
|
||||
}
|
||||
Buffer buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO);
|
||||
Page page;
|
||||
|
||||
/* form stack */
|
||||
state.stack=NULL;
|
||||
for(i=0;i<insert->pathlen;i++) {
|
||||
int j,len=0;
|
||||
IndexTuple *itup;
|
||||
GISTInsertStack *top = (GISTInsertStack*)palloc( sizeof(GISTInsertStack) );
|
||||
if (!BufferIsValid(buffer))
|
||||
elog(PANIC, "gistContinueInsert: root block unfound");
|
||||
|
||||
top->blkno = insert->path[i];
|
||||
top->buffer = XLogReadBuffer(false, state.r, top->blkno);
|
||||
if (!BufferIsValid(top->buffer))
|
||||
elog(PANIC, "gistContinueInsert: block unfound");
|
||||
top->page = (Page) BufferGetPage(top->buffer);
|
||||
if ( PageIsNew((PageHeader)(top->page)) )
|
||||
elog(PANIC, "gistContinueInsert: uninitialized page");
|
||||
GISTInitBuffer(buffer, 0);
|
||||
page = BufferGetPage(buffer);
|
||||
gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber);
|
||||
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
|
||||
WriteBuffer(buffer);
|
||||
} else {
|
||||
Buffer *buffers;
|
||||
Page *pages;
|
||||
int numbuffer;
|
||||
|
||||
buffers= (Buffer*) palloc( sizeof(Buffer) * (insert->lenblk+2/*guarantee root split*/) );
|
||||
pages = (Page*) palloc( sizeof(Page ) * (insert->lenblk+2/*guarantee root split*/) );
|
||||
|
||||
top->todelete = false;
|
||||
for(i=0;i<insert->pathlen;i++) {
|
||||
int j, k, pituplen=0, childfound=0;
|
||||
|
||||
numbuffer=1;
|
||||
buffers[numbuffer-1] = XLogReadBuffer(false, index, insert->path[i]);
|
||||
if (!BufferIsValid(buffers[numbuffer-1]))
|
||||
elog(PANIC, "gistContinueInsert: block %u unfound", insert->path[i]);
|
||||
pages[numbuffer-1] = BufferGetPage( buffers[numbuffer-1] );
|
||||
if ( PageIsNew((PageHeader)(pages[numbuffer-1])) )
|
||||
elog(PANIC, "gistContinueInsert: uninitialized page");
|
||||
|
||||
/* find childoffnum */
|
||||
itup = gistextractbuffer(top->buffer, &len);
|
||||
top->childoffnum=InvalidOffsetNumber;
|
||||
for(j=0;j<len && top->childoffnum==InvalidOffsetNumber;j++) {
|
||||
BlockNumber blkno = ItemPointerGetBlockNumber( &(itup[j]->t_tid) );
|
||||
pituplen = PageGetMaxOffsetNumber(pages[numbuffer-1]);
|
||||
|
||||
if ( i==0 ) {
|
||||
int k;
|
||||
for(k=0;k<insert->lenblk;k++)
|
||||
if ( insert->blkno[k] == blkno ) {
|
||||
top->childoffnum = j+1;
|
||||
/* remove old IndexTuples */
|
||||
for(j=0;j<pituplen && childfound<lenitup;j++) {
|
||||
BlockNumber blkno;
|
||||
ItemId iid = PageGetItemId(pages[numbuffer-1], j+FirstOffsetNumber);
|
||||
IndexTuple idxtup = (IndexTuple) PageGetItem(pages[numbuffer-1], iid);
|
||||
|
||||
blkno = ItemPointerGetBlockNumber( &(idxtup->t_tid) );
|
||||
|
||||
for(k=0;k<lenitup;k++)
|
||||
if ( ItemPointerGetBlockNumber( &(itup[k]->t_tid) ) == blkno ) {
|
||||
PageIndexTupleDelete(pages[numbuffer-1], j+FirstOffsetNumber);
|
||||
j--; pituplen--;
|
||||
childfound++;
|
||||
break;
|
||||
}
|
||||
} else if ( insert->path[i-1]==blkno )
|
||||
top->childoffnum = j+1;
|
||||
}
|
||||
}
|
||||
|
||||
if ( top->childoffnum==InvalidOffsetNumber ) {
|
||||
elog(WARNING, "gistContinueInsert: unknown parent, REINDEX GiST Indexes");
|
||||
return;
|
||||
}
|
||||
if ( gistnospace(pages[numbuffer-1], itup, lenitup) ) {
|
||||
/* no space left on page, so we should split */
|
||||
buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW);
|
||||
if (!BufferIsValid(buffers[numbuffer]))
|
||||
elog(PANIC, "gistContinueInsert: can't create new block");
|
||||
GISTInitBuffer(buffers[numbuffer], 0);
|
||||
pages[numbuffer] = BufferGetPage( buffers[numbuffer] );
|
||||
gistfillbuffer( index, pages[numbuffer], itup, lenitup, FirstOffsetNumber );
|
||||
numbuffer++;
|
||||
|
||||
if ( i==0 )
|
||||
PageIndexTupleDelete(top->page, top->childoffnum);
|
||||
|
||||
/* install item on right place in stack */
|
||||
top->parent=NULL;
|
||||
if ( state.stack ) {
|
||||
GISTInsertStack *ptr = state.stack;
|
||||
while( ptr->parent )
|
||||
ptr = ptr->parent;
|
||||
ptr->parent=top;
|
||||
} else
|
||||
state.stack = top;
|
||||
if ( BufferGetBlockNumber( buffers[0] ) == GIST_ROOT_BLKNO ) {
|
||||
IndexTuple *parentitup;
|
||||
|
||||
parentitup = gistextractbuffer(buffers[numbuffer-1], &pituplen);
|
||||
|
||||
/* we split root, just copy tuples from old root to new page */
|
||||
if ( i+1 != insert->pathlen )
|
||||
elog(PANIC,"gistContinueInsert: can't restore index '%s'",
|
||||
RelationGetRelationName( index ));
|
||||
|
||||
/* fill new page */
|
||||
buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW);
|
||||
if (!BufferIsValid(buffers[numbuffer]))
|
||||
elog(PANIC, "gistContinueInsert: can't create new block");
|
||||
GISTInitBuffer(buffers[numbuffer], 0);
|
||||
pages[numbuffer] = BufferGetPage( buffers[numbuffer] );
|
||||
gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber);
|
||||
numbuffer++;
|
||||
|
||||
/* fill root page */
|
||||
GISTInitBuffer(buffers[0], 0);
|
||||
for(j=1;j<numbuffer;j++) {
|
||||
IndexTuple tuple = gist_form_invalid_tuple( BufferGetBlockNumber( buffers[j] ) );
|
||||
if ( InvalidOffsetNumber == PageAddItem(pages[0],
|
||||
(Item)tuple,
|
||||
IndexTupleSize( tuple ),
|
||||
(OffsetNumber)j,
|
||||
LP_USED) )
|
||||
elog( PANIC,"gistContinueInsert: can't restore index '%s'",
|
||||
RelationGetRelationName( index ));
|
||||
}
|
||||
}
|
||||
} else
|
||||
gistfillbuffer( index, pages[numbuffer-1], itup, lenitup,
|
||||
(PageIsEmpty(pages[numbuffer-1])) ?
|
||||
FirstOffsetNumber : OffsetNumberNext(PageGetMaxOffsetNumber(pages[numbuffer-1])) );
|
||||
|
||||
lenitup=numbuffer;
|
||||
for(j=0;j<numbuffer;j++) {
|
||||
itup[j]=gist_form_invalid_tuple( BufferGetBlockNumber( buffers[j] ) );
|
||||
PageSetLSN(pages[j], insert->lsn);
|
||||
PageSetTLI(pages[j], ThisTimeLineID);
|
||||
LockBuffer(buffers[j], BUFFER_LOCK_UNLOCK);
|
||||
WriteBuffer( buffers[j] );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Good. Now we can continue insert */
|
||||
|
||||
gistmakedeal(&state, &giststate);
|
||||
|
||||
MemoryContextSwitchTo(oldCxt);
|
||||
MemoryContextReset(opCtx);
|
||||
}
|
||||
#endif
|
||||
|
||||
void
|
||||
gist_xlog_startup(void) {
|
||||
incomplete_inserts=NIL;
|
||||
insertCtx = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"GiST insert in xlog temporary context",
|
||||
"GiST recovery temporary context",
|
||||
ALLOCSET_DEFAULT_MINSIZE,
|
||||
ALLOCSET_DEFAULT_INITSIZE,
|
||||
ALLOCSET_DEFAULT_MAXSIZE);
|
||||
@@ -613,16 +707,194 @@ gist_xlog_cleanup(void) {
|
||||
|
||||
foreach(l, incomplete_inserts) {
|
||||
gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l);
|
||||
char buf[1024];
|
||||
|
||||
*buf='\0';
|
||||
out_target(buf, insert->node, insert->key);
|
||||
elog(LOG,"Incomplete insert: %s; It's needed to reindex", buf);
|
||||
#ifdef GIST_INCOMPLETE_INSERT
|
||||
gistContinueInsert(insert);
|
||||
#endif
|
||||
}
|
||||
MemoryContextDelete(opCtx);
|
||||
MemoryContextDelete(insertCtx);
|
||||
}
|
||||
|
||||
|
||||
XLogRecData *
|
||||
formSplitRdata(RelFileNode node, BlockNumber blkno,
|
||||
OffsetNumber *todelete, int ntodelete,
|
||||
IndexTuple *itup, int ituplen, ItemPointer key,
|
||||
BlockNumber *path, int pathlen, SplitedPageLayout *dist ) {
|
||||
|
||||
XLogRecData *rdata;
|
||||
gistxlogPageSplit *xlrec = (gistxlogPageSplit*)palloc(sizeof(gistxlogPageSplit));
|
||||
SplitedPageLayout *ptr;
|
||||
int npage = 0, cur=1, i;
|
||||
|
||||
ptr=dist;
|
||||
while( ptr ) {
|
||||
npage++;
|
||||
ptr=ptr->next;
|
||||
}
|
||||
|
||||
rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + ituplen + 3));
|
||||
|
||||
xlrec->node = node;
|
||||
xlrec->origblkno = blkno;
|
||||
xlrec->npage = (uint16)npage;
|
||||
xlrec->nitup = (uint16)ituplen;
|
||||
xlrec->ntodelete = (uint16)ntodelete;
|
||||
xlrec->pathlen = (uint16)pathlen;
|
||||
if ( key )
|
||||
xlrec->key = *key;
|
||||
else
|
||||
ItemPointerSetInvalid( &(xlrec->key) );
|
||||
|
||||
rdata[0].buffer = InvalidBuffer;
|
||||
rdata[0].data = (char *) xlrec;
|
||||
rdata[0].len = sizeof( gistxlogPageSplit );
|
||||
rdata[0].next = NULL;
|
||||
|
||||
if ( pathlen ) {
|
||||
rdata[cur-1].next = &(rdata[cur]);
|
||||
rdata[cur].buffer = InvalidBuffer;
|
||||
rdata[cur].data = (char*)path;
|
||||
rdata[cur].len = MAXALIGN(sizeof(BlockNumber)*pathlen);
|
||||
rdata[cur].next = NULL;
|
||||
cur++;
|
||||
}
|
||||
|
||||
if ( ntodelete ) {
|
||||
rdata[cur-1].next = &(rdata[cur]);
|
||||
rdata[cur].buffer = InvalidBuffer;
|
||||
rdata[cur].data = (char*)todelete;
|
||||
rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ntodelete);
|
||||
rdata[cur].next = NULL;
|
||||
cur++;
|
||||
}
|
||||
|
||||
/* new tuples */
|
||||
for(i=0;i<ituplen;i++) {
|
||||
rdata[cur].buffer = InvalidBuffer;
|
||||
rdata[cur].data = (char*)(itup[i]);
|
||||
rdata[cur].len = IndexTupleSize(itup[i]);
|
||||
rdata[cur].next = NULL;
|
||||
rdata[cur-1].next = &(rdata[cur]);
|
||||
cur++;
|
||||
}
|
||||
|
||||
ptr=dist;
|
||||
while(ptr) {
|
||||
rdata[cur].buffer = InvalidBuffer;
|
||||
rdata[cur].data = (char*)&(ptr->block);
|
||||
rdata[cur].len = sizeof(gistxlogPage);
|
||||
rdata[cur-1].next = &(rdata[cur]);
|
||||
cur++;
|
||||
|
||||
rdata[cur].buffer = InvalidBuffer;
|
||||
rdata[cur].data = (char*)(ptr->list);
|
||||
rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ptr->block.num);
|
||||
if ( rdata[cur].len > sizeof(OffsetNumber)*ptr->block.num )
|
||||
rdata[cur].data = repalloc( rdata[cur].data, rdata[cur].len );
|
||||
rdata[cur-1].next = &(rdata[cur]);
|
||||
rdata[cur].next=NULL;
|
||||
cur++;
|
||||
ptr=ptr->next;
|
||||
}
|
||||
|
||||
return rdata;
|
||||
}
|
||||
|
||||
|
||||
XLogRecData *
|
||||
formUpdateRdata(RelFileNode node, BlockNumber blkno,
|
||||
OffsetNumber *todelete, int ntodelete, bool emptypage,
|
||||
IndexTuple *itup, int ituplen, ItemPointer key,
|
||||
BlockNumber *path, int pathlen) {
|
||||
XLogRecData *rdata;
|
||||
gistxlogEntryUpdate *xlrec = (gistxlogEntryUpdate*)palloc(sizeof(gistxlogEntryUpdate));
|
||||
|
||||
xlrec->node = node;
|
||||
xlrec->blkno = blkno;
|
||||
if ( key )
|
||||
xlrec->key = *key;
|
||||
else
|
||||
ItemPointerSetInvalid( &(xlrec->key) );
|
||||
|
||||
if ( emptypage ) {
|
||||
xlrec->isemptypage = true;
|
||||
xlrec->ntodelete = 0;
|
||||
xlrec->pathlen = 0;
|
||||
|
||||
rdata = (XLogRecData*)palloc( sizeof(XLogRecData) );
|
||||
rdata->buffer = InvalidBuffer;
|
||||
rdata->data = (char*)xlrec;
|
||||
rdata->len = sizeof(gistxlogEntryUpdate);
|
||||
rdata->next = NULL;
|
||||
} else {
|
||||
int cur=1,i;
|
||||
|
||||
xlrec->isemptypage = false;
|
||||
xlrec->ntodelete = ntodelete;
|
||||
xlrec->pathlen = pathlen;
|
||||
|
||||
rdata = (XLogRecData*) palloc( sizeof(XLogRecData) * ( 3 + ituplen ) );
|
||||
|
||||
rdata->buffer = InvalidBuffer;
|
||||
rdata->data = (char*)xlrec;
|
||||
rdata->len = sizeof(gistxlogEntryUpdate);
|
||||
rdata->next = NULL;
|
||||
|
||||
if ( pathlen ) {
|
||||
rdata[cur-1].next = &(rdata[cur]);
|
||||
rdata[cur].buffer = InvalidBuffer;
|
||||
rdata[cur].data = (char*)path;
|
||||
rdata[cur].len = MAXALIGN(sizeof(BlockNumber)*pathlen);
|
||||
rdata[cur].next = NULL;
|
||||
cur++;
|
||||
}
|
||||
|
||||
if ( ntodelete ) {
|
||||
rdata[cur-1].next = &(rdata[cur]);
|
||||
rdata[cur].buffer = InvalidBuffer;
|
||||
rdata[cur].data = (char*)todelete;
|
||||
rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ntodelete);
|
||||
rdata[cur].next = NULL;
|
||||
cur++;
|
||||
}
|
||||
|
||||
/* new tuples */
|
||||
for(i=0;i<ituplen;i++) {
|
||||
rdata[cur].buffer = InvalidBuffer;
|
||||
rdata[cur].data = (char*)(itup[i]);
|
||||
rdata[cur].len = IndexTupleSize(itup[i]);
|
||||
rdata[cur].next = NULL;
|
||||
rdata[cur-1].next = &(rdata[cur]);
|
||||
cur++;
|
||||
}
|
||||
}
|
||||
|
||||
return rdata;
|
||||
}
|
||||
|
||||
XLogRecPtr
|
||||
gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len) {
|
||||
gistxlogInsertComplete xlrec;
|
||||
XLogRecData rdata[2];
|
||||
XLogRecPtr recptr;
|
||||
|
||||
Assert(len>0);
|
||||
xlrec.node = node;
|
||||
|
||||
rdata[0].buffer = InvalidBuffer;
|
||||
rdata[0].data = (char *) &xlrec;
|
||||
rdata[0].len = sizeof( gistxlogInsertComplete );
|
||||
rdata[0].next = &(rdata[1]);
|
||||
|
||||
rdata[1].buffer = InvalidBuffer;
|
||||
rdata[1].data = (char *) keys;
|
||||
rdata[1].len = sizeof( ItemPointerData ) * len;
|
||||
rdata[1].next = NULL;
|
||||
|
||||
START_CRIT_SECTION();
|
||||
|
||||
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata);
|
||||
|
||||
END_CRIT_SECTION();
|
||||
|
||||
return recptr;
|
||||
}
|
||||
|
Reference in New Issue
Block a user