From d84a6a5516eda38c52c3a864f95d13c43a6eae3f Mon Sep 17 00:00:00 2001 From: Teodor Sigaev Date: Mon, 29 Oct 2007 19:27:21 +0000 Subject: [PATCH] - Add check of already changed page while replay WAL. This touches only ginRedoInsert(), because other ginRedo* functions rewrite whole page or make changes which could be applied several times without consistent's loss - Remove check of identifying of corresponding split record: it's possible that replaying of WAL starts after actual page split, but before removing of that split from incomplete splits list. In this case, that check cause FATAL error. Per stress test which reproduces bug reported by Craig McElroy --- src/backend/access/gin/ginxlog.c | 120 ++++++++++++++++--------------- 1 file changed, 64 insertions(+), 56 deletions(-) diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c index 36cefa22abf..ee68ac92bbe 100644 --- a/src/backend/access/gin/ginxlog.c +++ b/src/backend/access/gin/ginxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gin/ginxlog.c,v 1.5.2.1 2007/06/04 15:59:20 teodor Exp $ + * $PostgreSQL: pgsql/src/backend/access/gin/ginxlog.c,v 1.5.2.2 2007/10/29 19:27:21 teodor Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -53,7 +53,6 @@ static void forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updateBlkno) { ListCell *l; - bool found = false; foreach(l, incomplete_splits) { @@ -62,16 +61,9 @@ forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updat if (RelFileNodeEquals(node, split->node) && leftBlkno == split->leftBlkno && updateBlkno == split->rightBlkno) { incomplete_splits = list_delete_ptr(incomplete_splits, split); - found = true; break; } } - - if (!found) - { - elog(ERROR, "failed to identify corresponding split record for %u/%u/%u", - node.relNode, leftBlkno, updateBlkno); - } } static void @@ -129,7 +121,7 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; - /* nothing else to do if page was backed up (and no info to do it with) */ + /* nothing else to do if page was backed up */ if (record->xl_info & XLR_BKP_BLOCK_1) return; @@ -143,37 +135,44 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) Assert(data->isDelete == FALSE); Assert(GinPageIsData(page)); - if (data->isLeaf) + if ( ! XLByteLE(lsn, PageGetLSN(page)) ) { - OffsetNumber i; - ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogInsert)); - - Assert(GinPageIsLeaf(page)); - Assert(data->updateBlkno == InvalidBlockNumber); - - for (i = 0; i < data->nitem; i++) - GinDataPageAddItem(page, items + i, data->offset + i); - } - else - { - PostingItem *pitem; - - Assert(!GinPageIsLeaf(page)); - - if (data->updateBlkno != InvalidBlockNumber) + if (data->isLeaf) { - /* update link to right page after split */ - pitem = (PostingItem *) GinDataPageGetItem(page, data->offset); - PostingItemSetBlockNumber(pitem, data->updateBlkno); + OffsetNumber i; + ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogInsert)); + + Assert(GinPageIsLeaf(page)); + Assert(data->updateBlkno == InvalidBlockNumber); + + for (i = 0; i < data->nitem; i++) + GinDataPageAddItem(page, items + i, data->offset + i); } + else + { + PostingItem *pitem; - pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert)); + Assert(!GinPageIsLeaf(page)); - GinDataPageAddItem(page, pitem, data->offset); + if (data->updateBlkno != InvalidBlockNumber) + { + /* update link to right page after split */ + pitem = (PostingItem *) GinDataPageGetItem(page, data->offset); + PostingItemSetBlockNumber(pitem, data->updateBlkno); + } - if (data->updateBlkno != InvalidBlockNumber) - forgetIncompleteSplit(data->node, PostingItemGetBlockNumber(pitem), data->updateBlkno); + pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert)); + + GinDataPageAddItem(page, pitem, data->offset); + } } + + if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber) + { + PostingItem *pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert)); + forgetIncompleteSplit(data->node, PostingItemGetBlockNumber(pitem), data->updateBlkno); + } + } else { @@ -181,36 +180,45 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) Assert(!GinPageIsData(page)); - if (data->updateBlkno != InvalidBlockNumber) + if ( ! XLByteLE(lsn, PageGetLSN(page)) ) { - /* update link to right page after split */ - Assert(!GinPageIsLeaf(page)); - Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page)); - itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset)); - ItemPointerSet(&itup->t_tid, data->updateBlkno, InvalidOffsetNumber); + if (data->updateBlkno != InvalidBlockNumber) + { + /* update link to right page after split */ + Assert(!GinPageIsLeaf(page)); + Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page)); + itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset)); + ItemPointerSet(&itup->t_tid, data->updateBlkno, InvalidOffsetNumber); + } + + if (data->isDelete) + { + Assert(GinPageIsLeaf(page)); + Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page)); + PageIndexTupleDelete(page, data->offset); + } + + itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert)); + + if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, LP_USED) == InvalidOffsetNumber) + elog(ERROR, "failed to add item to index page in %u/%u/%u", + data->node.spcNode, data->node.dbNode, data->node.relNode); } - if (data->isDelete) - { - Assert(GinPageIsLeaf(page)); - Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page)); - PageIndexTupleDelete(page, data->offset); - } - - itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert)); - - if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, LP_USED) == InvalidOffsetNumber) - elog(ERROR, "failed to add item to index page in %u/%u/%u", - data->node.spcNode, data->node.dbNode, data->node.relNode); - if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber) + { + itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert)); forgetIncompleteSplit(data->node, GinItemPointerGetBlockNumber(&itup->t_tid), data->updateBlkno); + } } - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); + if ( ! XLByteLE(lsn, PageGetLSN(page)) ) + { + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); + MarkBufferDirty(buffer); + } UnlockReleaseBuffer(buffer); }