diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c index ee68ac92bbe..ecf9589ec41 100644 --- a/src/backend/access/gin/ginxlog.c +++ b/src/backend/access/gin/ginxlog.c @@ -121,22 +121,49 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; + /* first, forget any incomplete split this insertion completes */ + if (data->isData) + { + Assert(data->isDelete == FALSE); + if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber) + { + PostingItem *pitem; + + pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert)); + forgetIncompleteSplit(data->node, + PostingItemGetBlockNumber(pitem), + data->updateBlkno); + } + } + else + { + if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber) + { + IndexTuple itup; + + itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert)); + forgetIncompleteSplit(data->node, + GinItemPointerGetBlockNumber(&itup->t_tid), + data->updateBlkno); + } + } + /* nothing else to do if page was backed up */ if (record->xl_info & XLR_BKP_BLOCK_1) return; reln = XLogOpenRelation(data->node); buffer = XLogReadBuffer(reln, data->blkno, false); - Assert(BufferIsValid(buffer)); + if (!BufferIsValid(buffer)) + return; page = (Page) BufferGetPage(buffer); - if (data->isData) + if (!XLByteLE(lsn, PageGetLSN(page))) { - Assert(data->isDelete == FALSE); - Assert(GinPageIsData(page)); - - if ( ! XLByteLE(lsn, PageGetLSN(page)) ) + if (data->isData) { + Assert(GinPageIsData(page)); + if (data->isLeaf) { OffsetNumber i; @@ -166,22 +193,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) GinDataPageAddItem(page, pitem, data->offset); } } - - if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber) + else { - PostingItem *pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert)); - forgetIncompleteSplit(data->node, PostingItemGetBlockNumber(pitem), data->updateBlkno); - } + IndexTuple itup; - } - else - { - IndexTuple itup; + Assert(!GinPageIsData(page)); - Assert(!GinPageIsData(page)); - - if ( ! XLByteLE(lsn, PageGetLSN(page)) ) - { if (data->updateBlkno != InvalidBlockNumber) { /* update link to right page after split */ @@ -205,20 +222,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) data->node.spcNode, data->node.dbNode, data->node.relNode); } - if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber) - { - itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert)); - forgetIncompleteSplit(data->node, GinItemPointerGetBlockNumber(&itup->t_tid), data->updateBlkno); - } - } - - if ( ! XLByteLE(lsn, PageGetLSN(page)) ) - { PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); } + UnlockReleaseBuffer(buffer); } @@ -240,7 +249,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) if (data->isData) flags |= GIN_DATA; - lbuffer = XLogReadBuffer(reln, data->lblkno, data->isRootSplit); + lbuffer = XLogReadBuffer(reln, data->lblkno, true); Assert(BufferIsValid(lbuffer)); lpage = (Page) BufferGetPage(lbuffer); GinInitBuffer(lbuffer, flags); @@ -317,7 +326,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) if (data->isRootSplit) { - Buffer rootBuf = XLogReadBuffer(reln, data->rootBlkno, false); + Buffer rootBuf = XLogReadBuffer(reln, data->rootBlkno, true); Page rootPage = BufferGetPage(rootBuf); GinInitBuffer(rootBuf, flags & ~GIN_LEAF); @@ -354,46 +363,51 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; - /* nothing else to do if page was backed up (and no info to do it with) */ + /* nothing to do if page was backed up (and no info to do it with) */ if (record->xl_info & XLR_BKP_BLOCK_1) return; reln = XLogOpenRelation(data->node); buffer = XLogReadBuffer(reln, data->blkno, false); - Assert(BufferIsValid(buffer)); + if (!BufferIsValid(buffer)) + return; page = (Page) BufferGetPage(buffer); - if (GinPageIsData(page)) + if (!XLByteLE(lsn, PageGetLSN(page))) { - memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage), - GinSizeOfItem(page) *data->nitem); - GinPageGetOpaque(page)->maxoff = data->nitem; - } - else - { - OffsetNumber i, - *tod; - IndexTuple itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage)); - - tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page)); - for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++) - tod[i - 1] = i; - - PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page)); - - for (i = 0; i < data->nitem; i++) + if (GinPageIsData(page)) { - if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, LP_USED) == InvalidOffsetNumber) - elog(ERROR, "failed to add item to index page in %u/%u/%u", - data->node.spcNode, data->node.dbNode, data->node.relNode); - itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup))); + memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage), + GinSizeOfItem(page) *data->nitem); + GinPageGetOpaque(page)->maxoff = data->nitem; } + else + { + OffsetNumber i, + *tod; + IndexTuple itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage)); + + tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page)); + for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++) + tod[i - 1] = i; + + PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page)); + + for (i = 0; i < data->nitem; i++) + { + if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, LP_USED) == InvalidOffsetNumber) + elog(ERROR, "failed to add item to index page in %u/%u/%u", + data->node.spcNode, data->node.dbNode, data->node.relNode); + itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup))); + } + } + + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + + MarkBufferDirty(buffer); } - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - - MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); } @@ -410,38 +424,56 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record) if (!(record->xl_info & XLR_BKP_BLOCK_1)) { buffer = XLogReadBuffer(reln, data->blkno, false); - page = BufferGetPage(buffer); - Assert(GinPageIsData(page)); - GinPageGetOpaque(page)->flags = GIN_DELETED; - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + if (BufferIsValid(buffer)) + { + page = BufferGetPage(buffer); + if (!XLByteLE(lsn, PageGetLSN(page))) + { + Assert(GinPageIsData(page)); + GinPageGetOpaque(page)->flags = GIN_DELETED; + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + MarkBufferDirty(buffer); + } + UnlockReleaseBuffer(buffer); + } } if (!(record->xl_info & XLR_BKP_BLOCK_2)) { buffer = XLogReadBuffer(reln, data->parentBlkno, false); - page = BufferGetPage(buffer); - Assert(GinPageIsData(page)); - Assert(!GinPageIsLeaf(page)); - PageDeletePostingItem(page, data->parentOffset); - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + if (BufferIsValid(buffer)) + { + page = BufferGetPage(buffer); + if (!XLByteLE(lsn, PageGetLSN(page))) + { + Assert(GinPageIsData(page)); + Assert(!GinPageIsLeaf(page)); + PageDeletePostingItem(page, data->parentOffset); + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + MarkBufferDirty(buffer); + } + UnlockReleaseBuffer(buffer); + } } if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber) { buffer = XLogReadBuffer(reln, data->leftBlkno, false); - page = BufferGetPage(buffer); - Assert(GinPageIsData(page)); - GinPageGetOpaque(page)->rightlink = data->rightLink; - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + if (BufferIsValid(buffer)) + { + page = BufferGetPage(buffer); + if (!XLByteLE(lsn, PageGetLSN(page))) + { + Assert(GinPageIsData(page)); + GinPageGetOpaque(page)->rightlink = data->rightLink; + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + MarkBufferDirty(buffer); + } + UnlockReleaseBuffer(buffer); + } } } @@ -559,6 +591,13 @@ ginContinueSplit(ginIncompleteSplit *split) buffer = XLogReadBuffer(reln, split->leftBlkno, false); + /* + * Failure should be impossible here, because we wrote the page earlier. + */ + if (!BufferIsValid(buffer)) + elog(PANIC, "ginContinueSplit: left block %u not found", + split->leftBlkno); + if (split->rootBlkno == GIN_ROOT_BLKNO) { prepareEntryScan(&btree, reln, (Datum) 0, NULL);