diff --git a/doc/src/sgml/gist.sgml b/doc/src/sgml/gist.sgml index b8cb2914670..f6e31092aa1 100644 --- a/doc/src/sgml/gist.sgml +++ b/doc/src/sgml/gist.sgml @@ -709,33 +709,4 @@ my_distance(PG_FUNCTION_ARGS) - - Crash Recovery - - - Usually, replay of the WAL log is sufficient to restore the integrity - of a GiST index following a database crash. However, there are some - corner cases in which the index state is not fully rebuilt. The index - will still be functionally correct, but there might be some performance - degradation. When this occurs, the index can be repaired by - VACUUMing its table, or by rebuilding the index using - REINDEX. In some cases a plain VACUUM is - not sufficient, and either VACUUM FULL or REINDEX - is needed. The need for one of these procedures is indicated by occurrence - of this log message during crash recovery: - -LOG: index NNN/NNN/NNN needs VACUUM or REINDEX to finish crash recovery - - or this log message during routine index insertions: - -LOG: index "FOO" needs VACUUM or REINDEX to finish crash recovery - - If a plain VACUUM finds itself unable to complete recovery - fully, it will return a notice: - -NOTICE: index "FOO" needs VACUUM FULL or REINDEX to finish crash recovery - - - - diff --git a/src/backend/access/gist/README b/src/backend/access/gist/README index 03069f02685..2d78dcb0dfa 100644 --- a/src/backend/access/gist/README +++ b/src/backend/access/gist/README @@ -108,43 +108,71 @@ Penalty is used for choosing a subtree to insert; method PickSplit is used for the node splitting algorithm; method Union is used for propagating changes upward to maintain the tree properties. -NOTICE: We modified original INSERT algorithm for performance reason. In -particularly, it is now a single-pass algorithm. +To insert a tuple, we first have to find a suitable leaf page to insert to. +The algorithm walks down the tree, starting from the root, along the path +of smallest Penalty. At each step: -Function findLeaf is used to identify subtree for insertion. Page, in which -insertion is proceeded, is locked as well as its parent page. Functions -findParent and findPath are used to find parent pages, which could be changed -because of concurrent access. Function pageSplit is recurrent and could split -page by more than 2 pages, which could be necessary if keys have different -lengths or more than one key are inserted (in such situation, user defined -function pickSplit cannot guarantee free space on page). +1. Has this page been split since we looked at the parent? If so, it's +possible that we should be inserting to the other half instead, so retreat +back to the parent. +2. If this is a leaf node, we've found our target node. +3. Otherwise use Penalty to pick a new target subtree. +4. Check the key representing the target subtree. If it doesn't already cover +the key we're inserting, replace it with the Union of the old downlink key +and the key being inserted. (Actually, we always call Union, and just skip +the replacement if the Unioned key is the same as the existing key) +5. Replacing the key in step 4 might cause the page to be split. In that case, +propagate the change upwards and restart the algorithm from the first parent +that didn't need to be split. +6. Walk down to the target subtree, and goto 1. -findLeaf(new-key) - push(stack, [root, 0]) //page, LSN - while(true) - ptr = top of stack - latch( ptr->page, S-mode ) - ptr->lsn = ptr->page->lsn - if ( exists ptr->parent AND ptr->parent->lsn < ptr->page->nsn ) - unlatch( ptr->page ) - pop stack - else if ( ptr->page is not leaf ) - push( stack, [get_best_child(ptr->page, new-key), 0] ) - unlatch( ptr->page ) - else - unlatch( ptr->page ) - latch( ptr->page, X-mode ) - if ( ptr->page is not leaf ) - //the only root page can become a non-leaf - unlatch( ptr->page ) - else if ( ptr->parent->lsn < ptr->page->nsn ) - unlatch( ptr->page ) - pop stack - else - return stack - end - end - end +This differs from the insertion algorithm in the original paper. In the +original paper, you first walk down the tree until you reach a leaf page, and +then you adjust the downlink in the parent, and propagating the adjustment up, +all the way up to the root in the worst case. But we adjust the downlinks to +cover the new key already when we walk down, so that when we reach the leaf +page, we don't need to update the parents anymore, except to insert the +downlinks if we have to split the page. This makes crash recovery simpler: +after inserting a key to the page, the tree is immediately self-consistent +without having to update the parents. Even if we split a page and crash before +inserting the downlink to the parent, the tree is self-consistent because the +right half of the split is accessible via the rightlink of the left page +(which replaced the original page). + +Note that the algorithm can walk up and down the tree before reaching a leaf +page, if internal pages need to split while adjusting the downlinks for the +new key. Eventually, you should reach the bottom, and proceed with the +insertion of the new tuple. + +Once we've found the target page to insert to, we check if there's room +for the new tuple. If there is, the tuple is inserted, and we're done. +If it doesn't fit, however, the page needs to be split. Note that it is +possible that a page needs to be split into more than two pages, if keys have +different lengths or more than one key is being inserted at a time (which can +happen when inserting downlinks for a page split that resulted in more than +two pages at the lower level). After splitting a page, the parent page needs +to be updated. The downlink for the new page needs to be inserted, and the +downlink for the old page, which became the left half of the split, needs to +be updated to only cover those tuples that stayed on the left page. Inserting +the downlink in the parent can again lead to a page split, recursing up to the +root page in the worst case. + +gistplacetopage is the workhorse function that performs one step of the +insertion. If the tuple fits, it inserts it to the given page, otherwise +it splits the page, and constructs the new downlink tuples for the split +pages. The caller must then call gistplacetopage() on the parent page to +insert the downlink tuples. The parent page that holds the downlink to +the child might have migrated as a result of concurrent splits of the +parent, gistfindCorrectParent() is used to find the parent page. + +Splitting the root page works slightly differently. At root split, +gistplacetopage() allocates the new child pages and replaces the old root +page with the new root containing downlinks to the new children, all in one +operation. + + +findPath is a subroutine of findParent, used when the correct parent page +can't be found by following the rightlinks at the parent level: findPath( stack item ) push stack, [root, 0, 0] // page, LSN, parent @@ -165,9 +193,13 @@ findPath( stack item ) pop stack end + +gistFindCorrectParent is used to re-find the parent of a page during +insertion. It might have migrated to the right since we traversed down the +tree because of page splits. + findParent( stack item ) parent = item->parent - latch( parent->page, X-mode ) if ( parent->page->lsn != parent->lsn ) while(true) search parent tuple on parent->page, if found the return @@ -181,9 +213,13 @@ findParent( stack item ) end newstack = findPath( item->parent ) replace part of stack to new one + latch( parent->page, X-mode ) return findParent( item ) end +pageSplit function decides how to distribute keys to the new pages after +page split: + pageSplit(page, allkeys) (lkeys, rkeys) = pickSplit( allkeys ) if ( page is root ) @@ -204,39 +240,44 @@ pageSplit(page, allkeys) return newkeys -placetopage(page, keysarray) - if ( no space left on page ) - keysarray = pageSplit(page, [ extract_keys(page), keysarray]) - last page in chain gets old NSN, - original and others - new NSN equals to LSN - if ( page is root ) - make new root with keysarray - end - else - put keysarray on page - if ( length of keysarray > 1 ) - keysarray = [ union(keysarray) ] - end - end -insert(new-key) - stack = findLeaf(new-key) - keysarray = [new-key] - ptr = top of stack - while(true) - findParent( ptr ) //findParent latches parent page - keysarray = placetopage(ptr->page, keysarray) - unlatch( ptr->page ) - pop stack; - ptr = top of stack - if (length of keysarray == 1) - newboundingkey = union(oldboundingkey, keysarray) - if (newboundingkey == oldboundingkey) - unlatch ptr->page - break loop - end - end - end +Concurrency control +------------------- +As a rule of thumb, if you need to hold a lock on multiple pages at the +same time, the locks should be acquired in the following order: child page +before parent, and left-to-right at the same level. Always acquiring the +locks in the same order avoids deadlocks. + +The search algorithm only looks at and locks one page at a time. Consequently +there's a race condition between a search and a page split. A page split +happens in two phases: 1. The page is split 2. The downlink is inserted to the +parent. If a search looks at the parent page between those steps, before the +downlink is inserted, it will still find the new right half by following the +rightlink on the left half. But it must not follow the rightlink if it saw the +downlink in the parent, or the page will be visited twice! + +A split initially marks the left page with the F_FOLLOW_RIGHT flag. If a scan +sees that flag set, it knows that the right page is missing the downlink, and +should be visited too. When split inserts the downlink to the parent, it +clears the F_FOLLOW_RIGHT flag in the child, and sets the NSN field in the +child page header to match the LSN of the insertion on the parent. If the +F_FOLLOW_RIGHT flag is not set, a scan compares the NSN on the child and the +LSN it saw in the parent. If NSN < LSN, the scan looked at the parent page +before the downlink was inserted, so it should follow the rightlink. Otherwise +the scan saw the downlink in the parent page, and will/did follow that as +usual. + +A scan can't normally see a page with the F_FOLLOW_RIGHT flag set, because +a page split keeps the child pages locked until the downlink has been inserted +to the parent and the flag cleared again. But if a crash happens in the middle +of a page split, before the downlinks are inserted into the parent, that will +leave a page with F_FOLLOW_RIGHT in the tree. Scans handle that just fine, +but we'll eventually want to fix that for performance reasons. And more +importantly, dealing with pages with missing downlink pointers in the parent +would complicate the insertion algorithm. So when an insertion sees a page +with F_FOLLOW_RIGHT set, it immediately tries to bring the split that +crashed in the middle to completion by adding the downlink in the parent. + Authors: Teodor Sigaev diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index b34830bb424..7cd144e2f09 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -31,6 +31,12 @@ typedef struct MemoryContext tmpCtx; } GISTBuildState; +/* A List of these is used represent a split-in-progress. */ +typedef struct +{ + Buffer buf; /* the split page "half" */ + IndexTuple downlink; /* downlink for this half. */ +} GISTPageSplitInfo; /* non-export function prototypes */ static void gistbuildCallback(Relation index, @@ -43,8 +49,13 @@ static void gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *GISTstate); -static void gistfindleaf(GISTInsertState *state, - GISTSTATE *giststate); +static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate); +static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack, + GISTSTATE *giststate, + IndexTuple *tuples, int ntup, OffsetNumber oldoffnum, + Buffer leftchild); +static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack, + GISTSTATE *giststate, List *splitinfo); #define ROTATEDIST(d) do { \ @@ -251,41 +262,52 @@ gistinsert(PG_FUNCTION_ARGS) /* - * Workhouse routine for doing insertion into a GiST index. Note that - * this routine assumes it is invoked in a short-lived memory context, - * so it does not bother releasing palloc'd allocations. + * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple + * at that offset is atomically removed along with inserting the new tuples. + * This is used to replace a tuple with a new one. + * + * If 'leftchildbuf' is valid, we're inserting the downlink for the page + * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'. + * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set. + * + * If there is not enough room on the page, it is split. All the split + * pages are kept pinned and locked and returned in *splitinfo, the caller + * is responsible for inserting the downlinks for them. However, if + * 'buffer' is the root page and it needs to be split, gistplacetopage() + * performs the split as one atomic operation, and *splitinfo is set to NIL. + * In that case, we continue to hold the root page locked, and the child + * pages are released; note that new tuple(s) are *not* on the root page + * but in one of the new child pages. */ -static void -gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate) -{ - GISTInsertState state; - - memset(&state, 0, sizeof(GISTInsertState)); - - state.itup = (IndexTuple *) palloc(sizeof(IndexTuple)); - state.itup[0] = (IndexTuple) palloc(IndexTupleSize(itup)); - memcpy(state.itup[0], itup, IndexTupleSize(itup)); - state.ituplen = 1; - state.freespace = freespace; - state.r = r; - state.key = itup->t_tid; - state.needInsertComplete = true; - - state.stack = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack)); - state.stack->blkno = GIST_ROOT_BLKNO; - - gistfindleaf(&state, giststate); - gistmakedeal(&state, giststate); -} - static bool -gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) +gistplacetopage(GISTInsertState *state, GISTSTATE *giststate, + Buffer buffer, + IndexTuple *itup, int ntup, OffsetNumber oldoffnum, + Buffer leftchildbuf, + List **splitinfo) { - bool is_splitted = false; - bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false; + Page page = BufferGetPage(buffer); + bool is_leaf = (GistPageIsLeaf(page)) ? true : false; + XLogRecPtr recptr; + int i; + bool is_split; /* - * if (!is_leaf) remove old key: This node's key has been modified, either + * Refuse to modify a page that's incompletely split. This should + * not happen because we finish any incomplete splits while we walk + * down the tree. However, it's remotely possible that another + * concurrent inserter splits a parent page, and errors out before + * completing the split. We will just throw an error in that case, + * and leave any split we had in progress unfinished too. The next + * insert that comes along will clean up the mess. + */ + if (GistFollowRight(page)) + elog(ERROR, "concurrent GiST page split was incomplete"); + + *splitinfo = NIL; + + /* + * if isupdate, remove old key: This node's key has been modified, either * because a child split occurred or because we needed to adjust our key * for an insert in a child node. Therefore, remove the old version of * this node's key. @@ -293,77 +315,134 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) * for WAL replay, in the non-split case we handle this by setting up a * one-element todelete array; in the split case, it's handled implicitly * because the tuple vector passed to gistSplit won't include this tuple. - * - * XXX: If we want to change fillfactors between node and leaf, fillfactor - * = (is_leaf ? state->leaf_fillfactor : state->node_fillfactor) */ - if (gistnospace(state->stack->page, state->itup, state->ituplen, - is_leaf ? InvalidOffsetNumber : state->stack->childoffnum, - state->freespace)) + is_split = gistnospace(page, itup, ntup, oldoffnum, state->freespace); + if (is_split) { /* no space for insertion */ IndexTuple *itvec; int tlen; SplitedPageLayout *dist = NULL, *ptr; - BlockNumber rrlink = InvalidBlockNumber; - GistNSN oldnsn; + BlockNumber oldrlink = InvalidBlockNumber; + GistNSN oldnsn = { 0, 0 }; + SplitedPageLayout rootpg; + BlockNumber blkno = BufferGetBlockNumber(buffer); + bool is_rootsplit; - is_splitted = true; + is_rootsplit = (blkno == GIST_ROOT_BLKNO); /* - * Form index tuples vector to split: remove old tuple if t's needed - * and add new tuples to vector + * Form index tuples vector to split. If we're replacing an old tuple, + * remove the old version from the vector. */ - itvec = gistextractpage(state->stack->page, &tlen); - if (!is_leaf) + itvec = gistextractpage(page, &tlen); + if (OffsetNumberIsValid(oldoffnum)) { /* on inner page we should remove old tuple */ - int pos = state->stack->childoffnum - FirstOffsetNumber; + int pos = oldoffnum - FirstOffsetNumber; tlen--; if (pos != tlen) memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos)); } - itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen); - dist = gistSplit(state->r, state->stack->page, itvec, tlen, giststate); + itvec = gistjoinvector(itvec, &tlen, itup, ntup); + dist = gistSplit(state->r, page, itvec, tlen, giststate); - state->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * tlen); - state->ituplen = 0; - - if (state->stack->blkno != GIST_ROOT_BLKNO) + /* + * Set up pages to work with. Allocate new buffers for all but the + * leftmost page. The original page becomes the new leftmost page, + * and is just replaced with the new contents. + * + * For a root-split, allocate new buffers for all child pages, the + * original page is overwritten with new root page containing + * downlinks to the new child pages. + */ + ptr = dist; + if (!is_rootsplit) { - /* - * if non-root split then we should not allocate new buffer, but - * we must create temporary page to operate - */ - dist->buffer = state->stack->buffer; - dist->page = PageGetTempPageCopySpecial(BufferGetPage(dist->buffer)); + /* save old rightlink and NSN */ + oldrlink = GistPageGetOpaque(page)->rightlink; + oldnsn = GistPageGetOpaque(page)->nsn; + + dist->buffer = buffer; + dist->block.blkno = BufferGetBlockNumber(buffer); + dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer)); /* clean all flags except F_LEAF */ GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0; + + ptr = ptr->next; + } + for (; ptr; ptr = ptr->next) + { + /* Allocate new page */ + ptr->buffer = gistNewBuffer(state->r); + GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0); + ptr->page = BufferGetPage(ptr->buffer); + ptr->block.blkno = BufferGetBlockNumber(ptr->buffer); } - /* make new pages and fills them */ + /* + * Now that we know whick blocks the new pages go to, set up downlink + * tuples to point to them. + */ for (ptr = dist; ptr; ptr = ptr->next) { - int i; - char *data; + ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno); + GistTupleSetValid(ptr->itup); + } - /* get new page */ - if (ptr->buffer == InvalidBuffer) + /* + * If this is a root split, we construct the new root page with the + * downlinks here directly, instead of requiring the caller to insert + * them. Add the new root page to the list along with the child pages. + */ + if (is_rootsplit) + { + IndexTuple *downlinks; + int ndownlinks = 0; + int i; + + rootpg.buffer = buffer; + rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer)); + GistPageGetOpaque(rootpg.page)->flags = 0; + + /* Prepare a vector of all the downlinks */ + for (ptr = dist; ptr; ptr = ptr->next) + ndownlinks++; + downlinks = palloc(sizeof(IndexTuple) * ndownlinks); + for (i = 0, ptr = dist; ptr; ptr = ptr->next) + downlinks[i++] = ptr->itup; + + rootpg.block.blkno = GIST_ROOT_BLKNO; + rootpg.block.num = ndownlinks; + rootpg.list = gistfillitupvec(downlinks, ndownlinks, + &(rootpg.lenlist)); + rootpg.itup = NULL; + + rootpg.next = dist; + dist = &rootpg; + } + else + { + /* Prepare split-info to be returned to caller */ + for (ptr = dist; ptr; ptr = ptr->next) { - ptr->buffer = gistNewBuffer(state->r); - GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0); - ptr->page = BufferGetPage(ptr->buffer); + GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo)); + si->buf = ptr->buffer; + si->downlink = ptr->itup; + *splitinfo = lappend(*splitinfo, si); } - ptr->block.blkno = BufferGetBlockNumber(ptr->buffer); + } - /* - * fill page, we can do it because all these pages are new (ie not - * linked in tree or masked by temp page - */ - data = (char *) (ptr->list); + /* + * Fill all pages. All the pages are new, ie. freshly allocated empty + * pages, or a temporary copy of the old page. + */ + for (ptr = dist; ptr; ptr = ptr->next) + { + char *data = (char *) (ptr->list); for (i = 0; i < ptr->block.num; i++) { if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber) @@ -371,276 +450,381 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) data += IndexTupleSize((IndexTuple) data); } - /* set up ItemPointer and remember it for parent */ - ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno); - state->itup[state->ituplen] = ptr->itup; - state->ituplen++; - } + /* Set up rightlinks */ + if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO) + GistPageGetOpaque(ptr->page)->rightlink = + ptr->next->block.blkno; + else + GistPageGetOpaque(ptr->page)->rightlink = oldrlink; - /* saves old rightlink */ - if (state->stack->blkno != GIST_ROOT_BLKNO) - rrlink = GistPageGetOpaque(dist->page)->rightlink; + if (ptr->next && !is_rootsplit) + GistMarkFollowRight(ptr->page); + else + GistClearFollowRight(ptr->page); + + /* + * Copy the NSN of the original page to all pages. The + * F_FOLLOW_RIGHT flags ensure that scans will follow the + * rightlinks until the downlinks are inserted. + */ + GistPageGetOpaque(ptr->page)->nsn = oldnsn; + } START_CRIT_SECTION(); /* - * must mark buffers dirty before XLogInsert, even though we'll still - * be changing their opaque fields below. set up right links. + * Must mark buffers dirty before XLogInsert, even though we'll still + * be changing their opaque fields below. */ for (ptr = dist; ptr; ptr = ptr->next) - { MarkBufferDirty(ptr->buffer); - GistPageGetOpaque(ptr->page)->rightlink = (ptr->next) ? - ptr->next->block.blkno : rrlink; - } + if (BufferIsValid(leftchildbuf)) + MarkBufferDirty(leftchildbuf); - /* restore splitted non-root page */ - if (state->stack->blkno != GIST_ROOT_BLKNO) - { - PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer)); - dist->page = BufferGetPage(dist->buffer); - } + /* + * The first page in the chain was a temporary working copy meant + * to replace the old page. Copy it over the old page. + */ + PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer)); + dist->page = BufferGetPage(dist->buffer); + /* Write the WAL record */ if (RelationNeedsWAL(state->r)) - { - XLogRecPtr recptr; - XLogRecData *rdata; - - rdata = formSplitRdata(state->r->rd_node, state->stack->blkno, - is_leaf, &(state->key), dist); - - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); - - for (ptr = dist; ptr; ptr = ptr->next) - { - PageSetLSN(ptr->page, recptr); - PageSetTLI(ptr->page, ThisTimeLineID); - } - } + recptr = gistXLogSplit(state->r->rd_node, blkno, is_leaf, + dist, oldrlink, oldnsn, leftchildbuf); else - { - for (ptr = dist; ptr; ptr = ptr->next) - { - PageSetLSN(ptr->page, GetXLogRecPtrForTemp()); - } - } - - /* set up NSN */ - oldnsn = GistPageGetOpaque(dist->page)->nsn; - if (state->stack->blkno == GIST_ROOT_BLKNO) - /* if root split we should put initial value */ - oldnsn = PageGetLSN(dist->page); + recptr = GetXLogRecPtrForTemp(); for (ptr = dist; ptr; ptr = ptr->next) { - /* only for last set oldnsn */ - GistPageGetOpaque(ptr->page)->nsn = (ptr->next) ? - PageGetLSN(ptr->page) : oldnsn; + PageSetLSN(ptr->page, recptr); + PageSetTLI(ptr->page, ThisTimeLineID); } /* - * release buffers, if it was a root split then release all buffers - * because we create all buffers + * Return the new child buffers to the caller. + * + * If this was a root split, we've already inserted the downlink + * pointers, in the form of a new root page. Therefore we can + * release all the new buffers, and keep just the root page locked. */ - ptr = (state->stack->blkno == GIST_ROOT_BLKNO) ? dist : dist->next; - for (; ptr; ptr = ptr->next) - UnlockReleaseBuffer(ptr->buffer); - - if (state->stack->blkno == GIST_ROOT_BLKNO) + if (is_rootsplit) { - gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key)); - state->needInsertComplete = false; + for (ptr = dist->next; ptr; ptr = ptr->next) + UnlockReleaseBuffer(ptr->buffer); } - - END_CRIT_SECTION(); } else { - /* enough space */ + /* + * Enough space. We also get here if ntuples==0. + */ START_CRIT_SECTION(); - if (!is_leaf) - PageIndexTupleDelete(state->stack->page, state->stack->childoffnum); - gistfillbuffer(state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber); + if (OffsetNumberIsValid(oldoffnum)) + PageIndexTupleDelete(page, oldoffnum); + gistfillbuffer(page, itup, ntup, InvalidOffsetNumber); - MarkBufferDirty(state->stack->buffer); + MarkBufferDirty(buffer); + + if (BufferIsValid(leftchildbuf)) + MarkBufferDirty(leftchildbuf); if (RelationNeedsWAL(state->r)) { - OffsetNumber noffs = 0, - offs[1]; - XLogRecPtr recptr; - XLogRecData *rdata; + OffsetNumber ndeloffs = 0, + deloffs[1]; - if (!is_leaf) + if (OffsetNumberIsValid(oldoffnum)) { - /* only on inner page we should delete previous version */ - offs[0] = state->stack->childoffnum; - noffs = 1; + deloffs[0] = oldoffnum; + ndeloffs = 1; } - rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer, - offs, noffs, - state->itup, state->ituplen, - &(state->key)); + recptr = gistXLogUpdate(state->r->rd_node, buffer, + deloffs, ndeloffs, itup, ntup, + leftchildbuf); - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); - PageSetLSN(state->stack->page, recptr); - PageSetTLI(state->stack->page, ThisTimeLineID); + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); } else - PageSetLSN(state->stack->page, GetXLogRecPtrForTemp()); - - if (state->stack->blkno == GIST_ROOT_BLKNO) - state->needInsertComplete = false; - - END_CRIT_SECTION(); - - if (state->ituplen > 1) - { /* previous is_splitted==true */ - - /* - * child was splited, so we must form union for insertion in - * parent - */ - IndexTuple newtup = gistunion(state->r, state->itup, state->ituplen, giststate); - - ItemPointerSetBlockNumber(&(newtup->t_tid), state->stack->blkno); - state->itup[0] = newtup; - state->ituplen = 1; - } - else if (is_leaf) { - /* - * itup[0] store key to adjust parent, we set it to valid to - * correct check by GistTupleIsInvalid macro in gistgetadjusted() - */ - ItemPointerSetBlockNumber(&(state->itup[0]->t_tid), state->stack->blkno); - GistTupleSetValid(state->itup[0]); + recptr = GetXLogRecPtrForTemp(); + PageSetLSN(page, recptr); } + + *splitinfo = NIL; } - return is_splitted; + + /* + * If we inserted the downlink for a child page, set NSN and clear + * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know + * to follow the rightlink if and only if they looked at the parent page + * before we inserted the downlink. + * + * Note that we do this *after* writing the WAL record. That means that + * the possible full page image in the WAL record does not include + * these changes, and they must be replayed even if the page is restored + * from the full page image. There's a chicken-and-egg problem: if we + * updated the child pages first, we wouldn't know the recptr of the WAL + * record we're about to write. + */ + if (BufferIsValid(leftchildbuf)) + { + Page leftpg = BufferGetPage(leftchildbuf); + + GistPageGetOpaque(leftpg)->nsn = recptr; + GistClearFollowRight(leftpg); + + PageSetLSN(leftpg, recptr); + PageSetTLI(leftpg, ThisTimeLineID); + } + + END_CRIT_SECTION(); + + return is_split; } /* - * returns stack of pages, all pages in stack are pinned, and - * leaf is X-locked + * Workhouse routine for doing insertion into a GiST index. Note that + * this routine assumes it is invoked in a short-lived memory context, + * so it does not bother releasing palloc'd allocations. */ - static void -gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) +gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate) { ItemId iid; IndexTuple idxtuple; - GISTPageOpaque opaque; + GISTInsertStack firststack; + GISTInsertStack *stack; + GISTInsertState state; + bool xlocked = false; + + memset(&state, 0, sizeof(GISTInsertState)); + state.freespace = freespace; + state.r = r; + + /* Start from the root */ + firststack.blkno = GIST_ROOT_BLKNO; + firststack.lsn.xrecoff = 0; + firststack.parent = NULL; + state.stack = stack = &firststack; /* - * walk down, We don't lock page for a long time, but so we should be - * ready to recheck path in a bad case... We remember, that page->lsn - * should never be invalid. + * Walk down along the path of smallest penalty, updating the parent + * pointers with the key we're inserting as we go. If we crash in the + * middle, the tree is consistent, although the possible parent updates + * were a waste. */ for (;;) { - if (XLogRecPtrIsInvalid(state->stack->lsn)) - state->stack->buffer = ReadBuffer(state->r, state->stack->blkno); - LockBuffer(state->stack->buffer, GIST_SHARE); - gistcheckpage(state->r, state->stack->buffer); + if (XLogRecPtrIsInvalid(stack->lsn)) + stack->buffer = ReadBuffer(state.r, stack->blkno); - state->stack->page = (Page) BufferGetPage(state->stack->buffer); - opaque = GistPageGetOpaque(state->stack->page); - - state->stack->lsn = PageGetLSN(state->stack->page); - Assert(!RelationNeedsWAL(state->r) || !XLogRecPtrIsInvalid(state->stack->lsn)); - - if (state->stack->blkno != GIST_ROOT_BLKNO && - XLByteLT(state->stack->parent->lsn, opaque->nsn)) + /* + * Be optimistic and grab shared lock first. Swap it for an + * exclusive lock later if we need to update the page. + */ + if (!xlocked) { - /* - * caused split non-root page is detected, go up to parent to - * choose best child - */ - UnlockReleaseBuffer(state->stack->buffer); - state->stack = state->stack->parent; + LockBuffer(stack->buffer, GIST_SHARE); + gistcheckpage(state.r, stack->buffer); + } + + stack->page = (Page) BufferGetPage(stack->buffer); + stack->lsn = PageGetLSN(stack->page); + Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn)); + + /* + * If this page was split but the downlink was never inserted to + * the parent because the inserting backend crashed before doing + * that, fix that now. + */ + if (GistFollowRight(stack->page)) + { + if (!xlocked) + { + LockBuffer(stack->buffer, GIST_UNLOCK); + LockBuffer(stack->buffer, GIST_EXCLUSIVE); + xlocked = true; + /* someone might've completed the split when we unlocked */ + if (!GistFollowRight(stack->page)) + continue; + } + gistfixsplit(&state, giststate); + + UnlockReleaseBuffer(stack->buffer); + xlocked = false; + state.stack = stack = stack->parent; continue; } - if (!GistPageIsLeaf(state->stack->page)) + if (stack->blkno != GIST_ROOT_BLKNO && + XLByteLT(stack->parent->lsn, + GistPageGetOpaque(stack->page)->nsn)) { /* - * This is an internal page, so continue to walk down the tree. We - * find the child node that has the minimum insertion penalty and - * recursively invoke ourselves to modify that node. Once the - * recursive call returns, we may need to adjust the parent node - * for two reasons: the child node split, or the key in this node - * needs to be adjusted for the newly inserted key below us. + * Concurrent split detected. There's no guarantee that the + * downlink for this page is consistent with the tuple we're + * inserting anymore, so go back to parent and rechoose the + * best child. */ - GISTInsertStack *item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack)); + UnlockReleaseBuffer(stack->buffer); + xlocked = false; + state.stack = stack = stack->parent; + continue; + } - state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate); + if (!GistPageIsLeaf(stack->page)) + { + /* + * This is an internal page so continue to walk down the tree. + * Find the child node that has the minimum insertion penalty. + */ + BlockNumber childblkno; + IndexTuple newtup; + GISTInsertStack *item; - iid = PageGetItemId(state->stack->page, state->stack->childoffnum); - idxtuple = (IndexTuple) PageGetItem(state->stack->page, iid); - item->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid)); - LockBuffer(state->stack->buffer, GIST_UNLOCK); + stack->childoffnum = gistchoose(state.r, stack->page, itup, giststate); + iid = PageGetItemId(stack->page, stack->childoffnum); + idxtuple = (IndexTuple) PageGetItem(stack->page, iid); + childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid)); - item->parent = state->stack; - item->child = NULL; - if (state->stack) - state->stack->child = item; - state->stack = item; + /* + * Check that it's not a leftover invalid tuple from pre-9.1 + */ + if (GistTupleIsInvalid(idxtuple)) + ereport(ERROR, + (errmsg("index \"%s\" contains an inner tuple marked as invalid", + RelationGetRelationName(r)), + errdetail("This is caused by an incomplete page split at crash recovery before upgrading to 9.1."), + errhint("Please REINDEX it."))); + + /* + * Check that the key representing the target child node is + * consistent with the key we're inserting. Update it if it's not. + */ + newtup = gistgetadjusted(state.r, idxtuple, itup, giststate); + if (newtup) + { + /* + * Swap shared lock for an exclusive one. Beware, the page + * may change while we unlock/lock the page... + */ + if (!xlocked) + { + LockBuffer(stack->buffer, GIST_UNLOCK); + LockBuffer(stack->buffer, GIST_EXCLUSIVE); + xlocked = true; + stack->page = (Page) BufferGetPage(stack->buffer); + + if (!XLByteEQ(PageGetLSN(stack->page), stack->lsn)) + { + /* the page was changed while we unlocked it, retry */ + continue; + } + } + /* + * Update the tuple. + * + * gistinserthere() might have to split the page to make the + * updated tuple fit. It will adjust the stack so that after + * the call, we'll be holding a lock on the page containing + * the tuple, which might have moved right. + * + * Except if this causes a root split, gistinserthere() + * returns 'true'. In that case, stack only holds the new + * root, and the child page was released. Have to start + * all over. + */ + if (gistinserttuples(&state, stack, giststate, &newtup, 1, + stack->childoffnum, InvalidBuffer)) + { + UnlockReleaseBuffer(stack->buffer); + xlocked = false; + state.stack = stack = stack->parent; + continue; + } + } + LockBuffer(stack->buffer, GIST_UNLOCK); + xlocked = false; + + /* descend to the chosen child */ + item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack)); + item->blkno = childblkno; + item->parent = stack; + state.stack = stack = item; } else { - /* be carefull, during unlock/lock page may be changed... */ - LockBuffer(state->stack->buffer, GIST_UNLOCK); - LockBuffer(state->stack->buffer, GIST_EXCLUSIVE); - state->stack->page = (Page) BufferGetPage(state->stack->buffer); - opaque = GistPageGetOpaque(state->stack->page); + /* + * Leaf page. Insert the new key. We've already updated all the + * parents on the way down, but we might have to split the page + * if it doesn't fit. gistinserthere() will take care of that. + */ - if (state->stack->blkno == GIST_ROOT_BLKNO) + /* + * Swap shared lock for an exclusive one. Be careful, the page + * may change while we unlock/lock the page... + */ + if (!xlocked) { - /* - * the only page can become inner instead of leaf is a root - * page, so for root we should recheck it - */ - if (!GistPageIsLeaf(state->stack->page)) + LockBuffer(stack->buffer, GIST_UNLOCK); + LockBuffer(stack->buffer, GIST_EXCLUSIVE); + xlocked = true; + stack->page = (Page) BufferGetPage(stack->buffer); + stack->lsn = PageGetLSN(stack->page); + + if (stack->blkno == GIST_ROOT_BLKNO) { /* - * very rarely situation: during unlock/lock index with - * number of pages = 1 was increased + * the only page that can become inner instead of leaf + * is the root page, so for root we should recheck it */ - LockBuffer(state->stack->buffer, GIST_UNLOCK); + if (!GistPageIsLeaf(stack->page)) + { + /* + * very rare situation: during unlock/lock index with + * number of pages = 1 was increased + */ + LockBuffer(stack->buffer, GIST_UNLOCK); + xlocked = false; + continue; + } + + /* + * we don't need to check root split, because checking + * leaf/inner is enough to recognize split for root + */ + } + else if (GistFollowRight(stack->page) || + XLByteLT(stack->parent->lsn, + GistPageGetOpaque(stack->page)->nsn)) + { + /* + * The page was split while we momentarily unlocked the + * page. Go back to parent. + */ + UnlockReleaseBuffer(stack->buffer); + xlocked = false; + state.stack = stack = stack->parent; continue; } - - /* - * we don't need to check root split, because checking - * leaf/inner is enough to recognize split for root - */ - - } - else if (XLByteLT(state->stack->parent->lsn, opaque->nsn)) - { - /* - * detecting split during unlock/lock, so we should find - * better child on parent - */ - - /* forget buffer */ - UnlockReleaseBuffer(state->stack->buffer); - - state->stack = state->stack->parent; - continue; } - state->stack->lsn = PageGetLSN(state->stack->page); + /* now state.stack->(page, buffer and blkno) points to leaf page */ - /* ok we found a leaf page and it X-locked */ + gistinserttuples(&state, stack, giststate, &itup, 1, + InvalidOffsetNumber, InvalidBuffer); + LockBuffer(stack->buffer, GIST_UNLOCK); + + /* Release any pins we might still hold before exiting */ + for (; stack; stack = stack->parent) + ReleaseBuffer(stack->buffer); break; } } - - /* now state->stack->(page, buffer and blkno) points to leaf page */ } /* @@ -648,7 +832,7 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) * * returns from the beginning of closest parent; * - * To prevent deadlocks, this should lock only one page simultaneously. + * To prevent deadlocks, this should lock only one page at a time. */ GISTInsertStack * gistFindPath(Relation r, BlockNumber child) @@ -683,6 +867,13 @@ gistFindPath(Relation r, BlockNumber child) top->lsn = PageGetLSN(page); + /* + * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a + * downlink. This should not normally happen.. + */ + if (GistFollowRight(page)) + elog(ERROR, "concurrent GiST page split was incomplete"); + if (top->parent && XLByteLT(top->parent->lsn, GistPageGetOpaque(page)->nsn) && GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ ) { @@ -711,8 +902,6 @@ gistFindPath(Relation r, BlockNumber child) ptr = top; while (ptr->parent) { - /* set child link */ - ptr->parent->child = ptr; /* move childoffnum.. */ if (ptr == top) { @@ -754,17 +943,16 @@ gistFindPath(Relation r, BlockNumber child) return NULL; } - /* - * Returns X-locked parent of stack page + * Updates the stack so that child->parent is the correct parent of the + * child. child->parent must be exclusively locked on entry, and will + * remain so at exit, but it might not be the same page anymore. */ - static void gistFindCorrectParent(Relation r, GISTInsertStack *child) { GISTInsertStack *parent = child->parent; - LockBuffer(parent->buffer, GIST_EXCLUSIVE); gistcheckpage(r, parent->buffer); parent->page = (Page) BufferGetPage(parent->buffer); @@ -836,83 +1024,231 @@ gistFindCorrectParent(Relation r, GISTInsertStack *child) /* install new chain of parents to stack */ child->parent = parent; - parent->child = child; /* make recursive call to normal processing */ + LockBuffer(child->parent->buffer, GIST_EXCLUSIVE); gistFindCorrectParent(r, child); } return; } -void -gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) +/* + * Form a downlink pointer for the page in 'buf'. + */ +static IndexTuple +gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate, + GISTInsertStack *stack) { - int is_splitted; - ItemId iid; - IndexTuple oldtup, - newtup; + Page page = BufferGetPage(buf); + OffsetNumber maxoff; + OffsetNumber offset; + IndexTuple downlink = NULL; - /* walk up */ - while (true) + maxoff = PageGetMaxOffsetNumber(page); + for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset)) { - /* - * After this call: 1. if child page was splited, then itup contains - * keys for each page 2. if child page wasn't splited, then itup - * contains additional for adjustment of current key - */ - - if (state->stack->parent) + IndexTuple ituple = (IndexTuple) + PageGetItem(page, PageGetItemId(page, offset)); + if (downlink == NULL) + downlink = CopyIndexTuple(ituple); + else { - /* - * X-lock parent page before proceed child, gistFindCorrectParent - * should find and lock it - */ - gistFindCorrectParent(state->r, state->stack); + IndexTuple newdownlink; + newdownlink = gistgetadjusted(rel, downlink, ituple, + giststate); + if (newdownlink) + downlink = newdownlink; } - is_splitted = gistplacetopage(state, giststate); - - /* parent locked above, so release child buffer */ - UnlockReleaseBuffer(state->stack->buffer); - - /* pop parent page from stack */ - state->stack = state->stack->parent; - - /* stack is void */ - if (!state->stack) - break; - - /* - * child did not split, so we can check is it needed to update parent - * tuple - */ - if (!is_splitted) - { - /* parent's tuple */ - iid = PageGetItemId(state->stack->page, state->stack->childoffnum); - oldtup = (IndexTuple) PageGetItem(state->stack->page, iid); - newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate); - - if (!newtup) - { /* not need to update key */ - LockBuffer(state->stack->buffer, GIST_UNLOCK); - break; - } - - state->itup[0] = newtup; - } - } /* while */ - - /* release all parent buffers */ - while (state->stack) - { - ReleaseBuffer(state->stack->buffer); - state->stack = state->stack->parent; } - /* say to xlog that insert is completed */ - if (state->needInsertComplete && RelationNeedsWAL(state->r)) - gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1); + /* + * If the page is completely empty, we can't form a meaningful + * downlink for it. But we have to insert a downlink for the page. + * Any key will do, as long as its consistent with the downlink of + * parent page, so that we can legally insert it to the parent. + * A minimal one that matches as few scans as possible would be best, + * to keep scans from doing useless work, but we don't know how to + * construct that. So we just use the downlink of the original page + * that was split - that's as far from optimal as it can get but will + * do.. + */ + if (!downlink) + { + ItemId iid; + + LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE); + gistFindCorrectParent(rel, stack); + iid = PageGetItemId(stack->parent->page, stack->parent->childoffnum); + downlink = (IndexTuple) PageGetItem(stack->parent->page, iid); + downlink = CopyIndexTuple(downlink); + LockBuffer(stack->parent->buffer, GIST_UNLOCK); + } + + ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf)); + GistTupleSetValid(downlink); + + return downlink; +} + + +/* + * Complete the incomplete split of state->stack->page. + */ +static void +gistfixsplit(GISTInsertState *state, GISTSTATE *giststate) +{ + GISTInsertStack *stack = state->stack; + Buffer buf; + Page page; + List *splitinfo = NIL; + + elog(LOG, "fixing incomplete split in index \"%s\", block %u", + RelationGetRelationName(state->r), stack->blkno); + + Assert(GistFollowRight(stack->page)); + Assert(OffsetNumberIsValid(stack->parent->childoffnum)); + + buf = stack->buffer; + + /* + * Read the chain of split pages, following the rightlinks. Construct + * a downlink tuple for each page. + */ + for (;;) + { + GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo)); + IndexTuple downlink; + + page = BufferGetPage(buf); + + /* Form the new downlink tuples to insert to parent */ + downlink = gistformdownlink(state->r, buf, giststate, stack); + + si->buf = buf; + si->downlink = downlink; + + splitinfo = lappend(splitinfo, si); + + if (GistFollowRight(page)) + { + /* lock next page */ + buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink); + LockBuffer(buf, GIST_EXCLUSIVE); + } + else + break; + } + + /* Insert the downlinks */ + gistfinishsplit(state, stack, giststate, splitinfo); +} + +/* + * Insert tuples to stack->buffer. If 'oldoffnum' is valid, the new tuples + * replace an old tuple at oldoffnum. The caller must hold an exclusive lock + * on the page. + * + * If leftchild is valid, we're inserting/updating the downlink for the + * page to the right of leftchild. We clear the F_FOLLOW_RIGHT flag and + * update NSN on leftchild, atomically with the insertion of the downlink. + * + * Returns 'true' if the page had to be split. On return, we will continue + * to hold an exclusive lock on state->stack->buffer, but if we had to split + * the page, it might not contain the tuple we just inserted/updated. + */ +static bool +gistinserttuples(GISTInsertState *state, GISTInsertStack *stack, + GISTSTATE *giststate, + IndexTuple *tuples, int ntup, OffsetNumber oldoffnum, + Buffer leftchild) +{ + List *splitinfo; + bool is_split; + + is_split = gistplacetopage(state, giststate, stack->buffer, + tuples, ntup, oldoffnum, + leftchild, + &splitinfo); + if (splitinfo) + gistfinishsplit(state, stack, giststate, splitinfo); + + return is_split; +} + +/* + * Finish an incomplete split by inserting/updating the downlinks in + * parent page. 'splitinfo' contains all the child pages, exclusively-locked, + * involved in the split, from left-to-right. + */ +static void +gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack, + GISTSTATE *giststate, List *splitinfo) +{ + ListCell *lc; + List *reversed; + GISTPageSplitInfo *right; + GISTPageSplitInfo *left; + IndexTuple tuples[2]; + + /* A split always contains at least two halves */ + Assert(list_length(splitinfo) >= 2); + + /* + * We need to insert downlinks for each new page, and update the + * downlink for the original (leftmost) page in the split. Begin at + * the rightmost page, inserting one downlink at a time until there's + * only two pages left. Finally insert the downlink for the last new + * page and update the downlink for the original page as one operation. + */ + + /* for convenience, create a copy of the list in reverse order */ + reversed = NIL; + foreach(lc, splitinfo) + { + reversed = lcons(lfirst(lc), reversed); + } + + LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE); + gistFindCorrectParent(state->r, stack); + + while(list_length(reversed) > 2) + { + right = (GISTPageSplitInfo *) linitial(reversed); + left = (GISTPageSplitInfo *) lsecond(reversed); + + if (gistinserttuples(state, stack->parent, giststate, + &right->downlink, 1, + InvalidOffsetNumber, + left->buf)) + { + /* + * If the parent page was split, need to relocate the original + * parent pointer. + */ + gistFindCorrectParent(state->r, stack); + } + UnlockReleaseBuffer(right->buf); + reversed = list_delete_first(reversed); + } + + right = (GISTPageSplitInfo *) linitial(reversed); + left = (GISTPageSplitInfo *) lsecond(reversed); + + /* + * Finally insert downlink for the remaining right page and update the + * downlink for the original page to not contain the tuples that were + * moved to the new pages. + */ + tuples[0] = left->downlink; + tuples[1] = right->downlink; + gistinserttuples(state, stack->parent, giststate, + tuples, 2, + stack->parent->childoffnum, + left->buf); + LockBuffer(stack->parent->buffer, GIST_UNLOCK); + UnlockReleaseBuffer(right->buf); + Assert(left->buf == stack->buffer); } /* @@ -963,8 +1299,7 @@ gistSplit(Relation r, ROTATEDIST(res); res->block.num = v.splitVector.spl_nright; res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist)); - res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false) - : gist_form_invalid_tuple(GIST_ROOT_BLKNO); + res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false); } if (!gistfitpage(lvectup, v.splitVector.spl_nleft)) @@ -986,50 +1321,12 @@ gistSplit(Relation r, ROTATEDIST(res); res->block.num = v.splitVector.spl_nleft; res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist)); - res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false) - : gist_form_invalid_tuple(GIST_ROOT_BLKNO); + res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false); } return res; } -/* - * buffer must be pinned and locked by caller - */ -void -gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key) -{ - Page page; - - Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO); - page = BufferGetPage(buffer); - - START_CRIT_SECTION(); - - GISTInitBuffer(buffer, 0); - gistfillbuffer(page, itup, len, FirstOffsetNumber); - - MarkBufferDirty(buffer); - - if (RelationNeedsWAL(r)) - { - XLogRecPtr recptr; - XLogRecData *rdata; - - rdata = formUpdateRdata(r->rd_node, buffer, - NULL, 0, - itup, len, key); - - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata); - PageSetLSN(page, recptr); - PageSetTLI(page, ThisTimeLineID); - } - else - PageSetLSN(page, GetXLogRecPtrForTemp()); - - END_CRIT_SECTION(); -} - /* * Fill a GISTSTATE with information about the index */ diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c index 56921cfee01..5061003a3bf 100644 --- a/src/backend/access/gist/gistget.c +++ b/src/backend/access/gist/gistget.c @@ -254,9 +254,15 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances, page = BufferGetPage(buffer); opaque = GistPageGetOpaque(page); - /* check if page split occurred since visit to parent */ + /* + * Check if we need to follow the rightlink. We need to follow it if the + * page was concurrently split since we visited the parent (in which case + * parentlsn < nsn), or if the the system crashed after a page split but + * before the downlink was inserted into the parent. + */ if (!XLogRecPtrIsInvalid(pageItem->data.parentlsn) && - XLByteLT(pageItem->data.parentlsn, opaque->nsn) && + (GistFollowRight(page) || + XLByteLT(pageItem->data.parentlsn, opaque->nsn)) && opaque->rightlink != InvalidBlockNumber /* sanity check */ ) { /* There was a page split, follow right link to add pages */ diff --git a/src/backend/access/gist/gistsplit.c b/src/backend/access/gist/gistsplit.c index 0ce317cdbc8..ac9a6578414 100644 --- a/src/backend/access/gist/gistsplit.c +++ b/src/backend/access/gist/gistsplit.c @@ -499,58 +499,6 @@ gistSplitHalf(GIST_SPLITVEC *v, int len) v->spl_left[v->spl_nleft++] = i; } -/* - * if it was invalid tuple then we need special processing. - * We move all invalid tuples on right page. - * - * if there is no place on left page, gistSplit will be called one more - * time for left page. - * - * Normally, we never exec this code, but after crash replay it's possible - * to get 'invalid' tuples (probability is low enough) - */ -static void -gistSplitByInvalid(GISTSTATE *giststate, GistSplitVector *v, IndexTuple *itup, int len) -{ - int i; - static OffsetNumber offInvTuples[MaxOffsetNumber]; - int nOffInvTuples = 0; - - for (i = 1; i <= len; i++) - if (GistTupleIsInvalid(itup[i - 1])) - offInvTuples[nOffInvTuples++] = i; - - if (nOffInvTuples == len) - { - /* corner case, all tuples are invalid */ - v->spl_rightvalid = v->spl_leftvalid = false; - gistSplitHalf(&v->splitVector, len); - } - else - { - GistSplitUnion gsvp; - - v->splitVector.spl_right = offInvTuples; - v->splitVector.spl_nright = nOffInvTuples; - v->spl_rightvalid = false; - - v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber)); - v->splitVector.spl_nleft = 0; - for (i = 1; i <= len; i++) - if (!GistTupleIsInvalid(itup[i - 1])) - v->splitVector.spl_left[v->splitVector.spl_nleft++] = i; - v->spl_leftvalid = true; - - gsvp.equiv = NULL; - gsvp.attr = v->spl_lattr; - gsvp.len = v->splitVector.spl_nleft; - gsvp.entries = v->splitVector.spl_left; - gsvp.isnull = v->spl_lisnull; - - gistunionsubkeyvec(giststate, itup, &gsvp, 0); - } -} - /* * trys to split page by attno key, in a case of null * values move its to separate page. @@ -568,12 +516,6 @@ gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *gist Datum datum; bool IsNull; - if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1])) - { - gistSplitByInvalid(giststate, v, itup, len); - return; - } - datum = index_getattr(itup[i - 1], attno + 1, giststate->tupdesc, &IsNull); gistdentryinit(giststate, attno, &(entryvec->vector[i]), datum, r, page, i, @@ -582,8 +524,6 @@ gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *gist offNullTuples[nOffNullTuples++] = i; } - v->spl_leftvalid = v->spl_rightvalid = true; - if (nOffNullTuples == len) { /* diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c index d488bbb4200..8a0d3568604 100644 --- a/src/backend/access/gist/gistutil.c +++ b/src/backend/access/gist/gistutil.c @@ -152,7 +152,7 @@ gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) * invalid tuple. Resulting Datums aren't compressed. */ -bool +void gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey, Datum *attr, bool *isnull) { @@ -180,10 +180,6 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke Datum datum; bool IsNull; - if (GistTupleIsInvalid(itvec[j])) - return FALSE; /* signals that union with invalid tuple => - * result is invalid */ - datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull); if (IsNull) continue; @@ -218,8 +214,6 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke isnull[i] = FALSE; } } - - return TRUE; } /* @@ -231,8 +225,7 @@ gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) { memset(isnullS, TRUE, sizeof(bool) * giststate->tupdesc->natts); - if (!gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS)) - return gist_form_invalid_tuple(InvalidBlockNumber); + gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS); return gistFormTuple(giststate, r, attrS, isnullS, false); } @@ -328,9 +321,6 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis IndexTuple newtup = NULL; int i; - if (GistTupleIsInvalid(oldtup) || GistTupleIsInvalid(addtup)) - return gist_form_invalid_tuple(ItemPointerGetBlockNumber(&(oldtup->t_tid))); - gistDeCompressAtt(giststate, r, oldtup, NULL, (OffsetNumber) 0, oldentries, oldisnull); @@ -401,14 +391,6 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ int j; IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); - if (!GistPageIsLeaf(p) && GistTupleIsInvalid(itup)) - { - ereport(LOG, - (errmsg("index \"%s\" needs VACUUM or REINDEX to finish crash recovery", - RelationGetRelationName(r)))); - continue; - } - sum_grow = 0; for (j = 0; j < r->rd_att->natts; j++) { @@ -521,7 +503,11 @@ gistFormTuple(GISTSTATE *giststate, Relation r, } res = index_form_tuple(giststate->tupdesc, compatt, isnull); - GistTupleSetValid(res); + /* + * The offset number on tuples on internal pages is unused. For historical + * reasons, it is set 0xffff. + */ + ItemPointerSetOffsetNumber( &(res->t_tid), 0xffff); return res; } diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c index e02e72d4313..86af414dced 100644 --- a/src/backend/access/gist/gistvacuum.c +++ b/src/backend/access/gist/gistvacuum.c @@ -26,13 +26,6 @@ #include "utils/memutils.h" -typedef struct GistBulkDeleteResult -{ - IndexBulkDeleteResult std; /* common state */ - bool needReindex; -} GistBulkDeleteResult; - - /* * VACUUM cleanup: update FSM */ @@ -40,7 +33,7 @@ Datum gistvacuumcleanup(PG_FUNCTION_ARGS) { IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0); - GistBulkDeleteResult *stats = (GistBulkDeleteResult *) PG_GETARG_POINTER(1); + IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1); Relation rel = info->index; BlockNumber npages, blkno; @@ -56,10 +49,10 @@ gistvacuumcleanup(PG_FUNCTION_ARGS) /* Set up all-zero stats if gistbulkdelete wasn't called */ if (stats == NULL) { - stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult)); + stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); /* use heap's tuple count */ - stats->std.num_index_tuples = info->num_heap_tuples; - stats->std.estimated_count = info->estimated_count; + stats->num_index_tuples = info->num_heap_tuples; + stats->estimated_count = info->estimated_count; /* * XXX the above is wrong if index is partial. Would it be OK to just @@ -67,11 +60,6 @@ gistvacuumcleanup(PG_FUNCTION_ARGS) */ } - if (stats->needReindex) - ereport(NOTICE, - (errmsg("index \"%s\" needs VACUUM FULL or REINDEX to finish crash recovery", - RelationGetRelationName(rel)))); - /* * Need lock unless it's local to this backend. */ @@ -112,10 +100,10 @@ gistvacuumcleanup(PG_FUNCTION_ARGS) IndexFreeSpaceMapVacuum(info->index); /* return statistics */ - stats->std.pages_free = totFreePages; + stats->pages_free = totFreePages; if (needLock) LockRelationForExtension(rel, ExclusiveLock); - stats->std.num_pages = RelationGetNumberOfBlocks(rel); + stats->num_pages = RelationGetNumberOfBlocks(rel); if (needLock) UnlockRelationForExtension(rel, ExclusiveLock); @@ -135,7 +123,7 @@ pushStackIfSplited(Page page, GistBDItem *stack) GISTPageOpaque opaque = GistPageGetOpaque(page); if (stack->blkno != GIST_ROOT_BLKNO && !XLogRecPtrIsInvalid(stack->parentlsn) && - XLByteLT(stack->parentlsn, opaque->nsn) && + (GistFollowRight(page) || XLByteLT(stack->parentlsn, opaque->nsn)) && opaque->rightlink != InvalidBlockNumber /* sanity check */ ) { /* split page detected, install right link to the stack */ @@ -162,7 +150,7 @@ Datum gistbulkdelete(PG_FUNCTION_ARGS) { IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0); - GistBulkDeleteResult *stats = (GistBulkDeleteResult *) PG_GETARG_POINTER(1); + IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1); IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2); void *callback_state = (void *) PG_GETARG_POINTER(3); Relation rel = info->index; @@ -171,10 +159,10 @@ gistbulkdelete(PG_FUNCTION_ARGS) /* first time through? */ if (stats == NULL) - stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult)); + stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); /* we'll re-count the tuples each time */ - stats->std.estimated_count = false; - stats->std.num_index_tuples = 0; + stats->estimated_count = false; + stats->num_index_tuples = 0; stack = (GistBDItem *) palloc0(sizeof(GistBDItem)); stack->blkno = GIST_ROOT_BLKNO; @@ -232,10 +220,10 @@ gistbulkdelete(PG_FUNCTION_ARGS) { todelete[ntodelete] = i - ntodelete; ntodelete++; - stats->std.tuples_removed += 1; + stats->tuples_removed += 1; } else - stats->std.num_index_tuples += 1; + stats->num_index_tuples += 1; } if (ntodelete) @@ -250,22 +238,13 @@ gistbulkdelete(PG_FUNCTION_ARGS) if (RelationNeedsWAL(rel)) { - XLogRecData *rdata; XLogRecPtr recptr; - gistxlogPageUpdate *xlinfo; - rdata = formUpdateRdata(rel->rd_node, buffer, + recptr = gistXLogUpdate(rel->rd_node, buffer, todelete, ntodelete, - NULL, 0, - NULL); - xlinfo = (gistxlogPageUpdate *) rdata->next->data; - - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); + NULL, 0, InvalidBuffer); PageSetLSN(page, recptr); PageSetTLI(page, ThisTimeLineID); - - pfree(xlinfo); - pfree(rdata); } else PageSetLSN(page, GetXLogRecPtrForTemp()); @@ -293,7 +272,11 @@ gistbulkdelete(PG_FUNCTION_ARGS) stack->next = ptr; if (GistTupleIsInvalid(idxtuple)) - stats->needReindex = true; + ereport(LOG, + (errmsg("index \"%s\" contains an inner tuple marked as invalid", + RelationGetRelationName(rel)), + errdetail("This is caused by an incomplete page split at crash recovery before upgrading to 9.1."), + errhint("Please REINDEX it."))); } } diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index a90303e547b..7d25728d8d8 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -20,15 +20,6 @@ #include "utils/memutils.h" #include "utils/rel.h" - -typedef struct -{ - gistxlogPageUpdate *data; - int len; - IndexTuple *itup; - OffsetNumber *todelete; -} PageUpdateRecord; - typedef struct { gistxlogPage *header; @@ -41,144 +32,37 @@ typedef struct NewPage *page; } PageSplitRecord; -/* track for incomplete inserts, idea was taken from nbtxlog.c */ - -typedef struct gistIncompleteInsert -{ - RelFileNode node; - BlockNumber origblkno; /* for splits */ - ItemPointerData key; - int lenblk; - BlockNumber *blkno; - XLogRecPtr lsn; - BlockNumber *path; - int pathlen; -} gistIncompleteInsert; - - static MemoryContext opCtx; /* working memory for operations */ -static MemoryContext insertCtx; /* holds incomplete_inserts list */ -static List *incomplete_inserts; - - -#define ItemPointerEQ(a, b) \ - ( ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \ - ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) ) - +/* + * Replay the clearing of F_FOLLOW_RIGHT flag. + */ static void -pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, - BlockNumber *blkno, int lenblk, - PageSplitRecord *xlinfo /* to extract blkno info */ ) +gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn, + BlockNumber leftblkno) { - MemoryContext oldCxt; - gistIncompleteInsert *ninsert; + Buffer buffer; - if (!ItemPointerIsValid(&key)) + buffer = XLogReadBuffer(node, leftblkno, false); + if (BufferIsValid(buffer)) + { + Page page = (Page) BufferGetPage(buffer); /* - * if key is null then we should not store insertion as incomplete, - * because it's a vacuum operation.. + * Note that we still update the page even if page LSN is equal to the + * LSN of this record, because the updated NSN is not included in the + * full page image. */ - return; - - oldCxt = MemoryContextSwitchTo(insertCtx); - ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert)); - - ninsert->node = node; - ninsert->key = key; - ninsert->lsn = lsn; - - if (lenblk && blkno) - { - ninsert->lenblk = lenblk; - ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk); - memcpy(ninsert->blkno, blkno, sizeof(BlockNumber) * ninsert->lenblk); - ninsert->origblkno = *blkno; - } - else - { - int i; - - Assert(xlinfo); - ninsert->lenblk = xlinfo->data->npage; - ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk); - for (i = 0; i < ninsert->lenblk; i++) - ninsert->blkno[i] = xlinfo->page[i].header->blkno; - ninsert->origblkno = xlinfo->data->origblkno; - } - Assert(ninsert->lenblk > 0); - - /* - * Stick the new incomplete insert onto the front of the list, not the - * back. This is so that gist_xlog_cleanup will process incompletions in - * last-in-first-out order. - */ - incomplete_inserts = lcons(ninsert, incomplete_inserts); - - MemoryContextSwitchTo(oldCxt); -} - -static void -forgetIncompleteInsert(RelFileNode node, ItemPointerData key) -{ - ListCell *l; - - if (!ItemPointerIsValid(&key)) - return; - - if (incomplete_inserts == NIL) - return; - - foreach(l, incomplete_inserts) - { - gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l); - - if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key))) + if (!XLByteLT(lsn, PageGetLSN(page))) { - /* found */ - incomplete_inserts = list_delete_ptr(incomplete_inserts, insert); - pfree(insert->blkno); - pfree(insert); - break; + GistPageGetOpaque(page)->nsn = lsn; + GistClearFollowRight(page); + + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + MarkBufferDirty(buffer); } - } -} - -static void -decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record) -{ - char *begin = XLogRecGetData(record), - *ptr; - int i = 0, - addpath = 0; - - decoded->data = (gistxlogPageUpdate *) begin; - - if (decoded->data->ntodelete) - { - decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogPageUpdate) + addpath); - addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete); - } - else - decoded->todelete = NULL; - - decoded->len = 0; - ptr = begin + sizeof(gistxlogPageUpdate) + addpath; - while (ptr - begin < record->xl_len) - { - decoded->len++; - ptr += IndexTupleSize((IndexTuple) ptr); - } - - decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len); - - ptr = begin + sizeof(gistxlogPageUpdate) + addpath; - while (ptr - begin < record->xl_len) - { - decoded->itup[i] = (IndexTuple) ptr; - ptr += IndexTupleSize(decoded->itup[i]); - i++; + UnlockReleaseBuffer(buffer); } } @@ -186,29 +70,22 @@ decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record) * redo any page update (except page split) */ static void -gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) +gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record) { - gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record); - PageUpdateRecord xlrec; + char *begin = XLogRecGetData(record); + gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin; Buffer buffer; Page page; + char *data; - /* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */ - forgetIncompleteInsert(xldata->node, xldata->key); + if (BlockNumberIsValid(xldata->leftchild)) + gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild); - if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO) - /* operation with root always finalizes insertion */ - pushIncompleteInsert(xldata->node, lsn, xldata->key, - &(xldata->blkno), 1, - NULL); - - /* nothing else to do if page was backed up (and no info to do it with) */ + /* nothing more to do if page was backed up (and no info to do it with) */ if (record->xl_info & XLR_BKP_BLOCK_1) return; - decodePageUpdateRecord(&xlrec, record); - - buffer = XLogReadBuffer(xlrec.data->node, xlrec.data->blkno, false); + buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); if (!BufferIsValid(buffer)) return; page = (Page) BufferGetPage(buffer); @@ -219,28 +96,49 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) return; } - if (isnewroot) - GISTInitBuffer(buffer, 0); - else if (xlrec.data->ntodelete) + data = begin + sizeof(gistxlogPageUpdate); + + /* Delete old tuples */ + if (xldata->ntodelete > 0) { int i; + OffsetNumber *todelete = (OffsetNumber *) data; + data += sizeof(OffsetNumber) * xldata->ntodelete; - for (i = 0; i < xlrec.data->ntodelete; i++) - PageIndexTupleDelete(page, xlrec.todelete[i]); + for (i = 0; i < xldata->ntodelete; i++) + PageIndexTupleDelete(page, todelete[i]); if (GistPageIsLeaf(page)) GistMarkTuplesDeleted(page); } /* add tuples */ - if (xlrec.len > 0) - gistfillbuffer(page, xlrec.itup, xlrec.len, InvalidOffsetNumber); + if (data - begin < record->xl_len) + { + OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber : + OffsetNumberNext(PageGetMaxOffsetNumber(page)); + while (data - begin < record->xl_len) + { + IndexTuple itup = (IndexTuple) data; + Size sz = IndexTupleSize(itup); + OffsetNumber l; + data += sz; - /* - * special case: leafpage, nothing to insert, nothing to delete, then - * vacuum marks page - */ - if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0) - GistClearTuplesDeleted(page); + l = PageAddItem(page, (Item) itup, sz, off, false, false); + if (l == InvalidOffsetNumber) + elog(ERROR, "failed to add item to GiST index page, size %d bytes", + (int) sz); + off++; + } + } + else + { + /* + * special case: leafpage, nothing to insert, nothing to delete, then + * vacuum marks page + */ + if (GistPageIsLeaf(page) && xldata->ntodelete == 0) + GistClearTuplesDeleted(page); + } if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO) @@ -315,41 +213,67 @@ decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) static void gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) { + gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record); PageSplitRecord xlrec; Buffer buffer; Page page; int i; - int flags; + bool isrootsplit = false; + if (BlockNumberIsValid(xldata->leftchild)) + gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild); decodePageSplitRecord(&xlrec, record); - flags = xlrec.data->origleaf ? F_LEAF : 0; /* loop around all pages */ for (i = 0; i < xlrec.data->npage; i++) { NewPage *newpage = xlrec.page + i; + int flags; + + if (newpage->header->blkno == GIST_ROOT_BLKNO) + { + Assert(i == 0); + isrootsplit = true; + } buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true); Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); /* ok, clear buffer */ + if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO) + flags = F_LEAF; + else + flags = 0; GISTInitBuffer(buffer, flags); /* and fill it */ gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber); + if (newpage->header->blkno == GIST_ROOT_BLKNO) + { + GistPageGetOpaque(page)->rightlink = InvalidBlockNumber; + GistPageGetOpaque(page)->nsn = xldata->orignsn; + GistClearFollowRight(page); + } + else + { + if (i < xlrec.data->npage - 1) + GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno; + else + GistPageGetOpaque(page)->rightlink = xldata->origrlink; + GistPageGetOpaque(page)->nsn = xldata->orignsn; + if (i < xlrec.data->npage - 1 && !isrootsplit) + GistMarkFollowRight(page); + else + GistClearFollowRight(page); + } + PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); MarkBufferDirty(buffer); UnlockReleaseBuffer(buffer); } - - forgetIncompleteInsert(xlrec.data->node, xlrec.data->key); - - pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key, - NULL, 0, - &xlrec); } static void @@ -372,24 +296,6 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) UnlockReleaseBuffer(buffer); } -static void -gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record) -{ - char *begin = XLogRecGetData(record), - *ptr; - gistxlogInsertComplete *xlrec; - - xlrec = (gistxlogInsertComplete *) begin; - - ptr = begin + sizeof(gistxlogInsertComplete); - while (ptr - begin < record->xl_len) - { - Assert(record->xl_len - (ptr - begin) >= sizeof(ItemPointerData)); - forgetIncompleteInsert(xlrec->node, *((ItemPointerData *) ptr)); - ptr += sizeof(ItemPointerData); - } -} - void gist_redo(XLogRecPtr lsn, XLogRecord *record) { @@ -401,30 +307,23 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) * implement a similar optimization we have in b-tree, and remove killed * tuples outside VACUUM, we'll need to handle that here. */ - RestoreBkpBlocks(lsn, record, false); oldCxt = MemoryContextSwitchTo(opCtx); switch (info) { case XLOG_GIST_PAGE_UPDATE: - gistRedoPageUpdateRecord(lsn, record, false); + gistRedoPageUpdateRecord(lsn, record); break; case XLOG_GIST_PAGE_DELETE: gistRedoPageDeleteRecord(lsn, record); break; - case XLOG_GIST_NEW_ROOT: - gistRedoPageUpdateRecord(lsn, record, true); - break; case XLOG_GIST_PAGE_SPLIT: gistRedoPageSplitRecord(lsn, record); break; case XLOG_GIST_CREATE_INDEX: gistRedoCreateIndex(lsn, record); break; - case XLOG_GIST_INSERT_COMPLETE: - gistRedoCompleteInsert(lsn, record); - break; default: elog(PANIC, "gist_redo: unknown op code %u", info); } @@ -434,20 +333,16 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) } static void -out_target(StringInfo buf, RelFileNode node, ItemPointerData key) +out_target(StringInfo buf, RelFileNode node) { appendStringInfo(buf, "rel %u/%u/%u", node.spcNode, node.dbNode, node.relNode); - if (ItemPointerIsValid(&key)) - appendStringInfo(buf, "; tid %u/%u", - ItemPointerGetBlockNumber(&key), - ItemPointerGetOffsetNumber(&key)); } static void out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec) { - out_target(buf, xlrec->node, xlrec->key); + out_target(buf, xlrec->node); appendStringInfo(buf, "; block number %u", xlrec->blkno); } @@ -463,7 +358,7 @@ static void out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec) { appendStringInfo(buf, "page_split: "); - out_target(buf, xlrec->node, xlrec->key); + out_target(buf, xlrec->node); appendStringInfo(buf, "; block number %u splits to %d pages", xlrec->origblkno, xlrec->npage); } @@ -482,10 +377,6 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec) case XLOG_GIST_PAGE_DELETE: out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec); break; - case XLOG_GIST_NEW_ROOT: - appendStringInfo(buf, "new_root: "); - out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key); - break; case XLOG_GIST_PAGE_SPLIT: out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec); break; @@ -495,415 +386,102 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec) ((RelFileNode *) rec)->dbNode, ((RelFileNode *) rec)->relNode); break; - case XLOG_GIST_INSERT_COMPLETE: - appendStringInfo(buf, "complete_insert: rel %u/%u/%u", - ((gistxlogInsertComplete *) rec)->node.spcNode, - ((gistxlogInsertComplete *) rec)->node.dbNode, - ((gistxlogInsertComplete *) rec)->node.relNode); - break; default: appendStringInfo(buf, "unknown gist op code %u", info); break; } } -IndexTuple -gist_form_invalid_tuple(BlockNumber blkno) -{ - /* - * we don't alloc space for null's bitmap, this is invalid tuple, be - * carefull in read and write code - */ - Size size = IndexInfoFindDataOffset(0); - IndexTuple tuple = (IndexTuple) palloc0(size); - - tuple->t_info |= size; - - ItemPointerSetBlockNumber(&(tuple->t_tid), blkno); - GistTupleSetInvalid(tuple); - - return tuple; -} - - -static void -gistxlogFindPath(Relation index, gistIncompleteInsert *insert) -{ - GISTInsertStack *top; - - insert->pathlen = 0; - insert->path = NULL; - - if ((top = gistFindPath(index, insert->origblkno)) != NULL) - { - int i; - GISTInsertStack *ptr; - - for (ptr = top; ptr; ptr = ptr->parent) - insert->pathlen++; - - insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen); - - i = 0; - for (ptr = top; ptr; ptr = ptr->parent) - insert->path[i++] = ptr->blkno; - } - else - elog(ERROR, "lost parent for block %u", insert->origblkno); -} - -static SplitedPageLayout * -gistMakePageLayout(Buffer *buffers, int nbuffers) -{ - SplitedPageLayout *res = NULL, - *resptr; - - while (nbuffers-- > 0) - { - Page page = BufferGetPage(buffers[nbuffers]); - IndexTuple *vec; - int veclen; - - resptr = (SplitedPageLayout *) palloc0(sizeof(SplitedPageLayout)); - - resptr->block.blkno = BufferGetBlockNumber(buffers[nbuffers]); - resptr->block.num = PageGetMaxOffsetNumber(page); - - vec = gistextractpage(page, &veclen); - resptr->list = gistfillitupvec(vec, veclen, &(resptr->lenlist)); - - resptr->next = res; - res = resptr; - } - - return res; -} - -/* - * Continue insert after crash. In normal situations, there aren't any - * incomplete inserts, but if a crash occurs partway through an insertion - * sequence, we'll need to finish making the index valid at the end of WAL - * replay. - * - * Note that we assume the index is now in a valid state, except for the - * unfinished insertion. In particular it's safe to invoke gistFindPath(); - * there shouldn't be any garbage pages for it to run into. - * - * To complete insert we can't use basic insertion algorithm because - * during insertion we can't call user-defined support functions of opclass. - * So, we insert 'invalid' tuples without real key and do it by separate algorithm. - * 'invalid' tuple should be updated by vacuum full. - */ -static void -gistContinueInsert(gistIncompleteInsert *insert) -{ - IndexTuple *itup; - int i, - lenitup; - Relation index; - - index = CreateFakeRelcacheEntry(insert->node); - - /* - * needed vector itup never will be more than initial lenblkno+2, because - * during this processing Indextuple can be only smaller - */ - lenitup = insert->lenblk; - itup = (IndexTuple *) palloc(sizeof(IndexTuple) * (lenitup + 2 /* guarantee root split */ )); - - for (i = 0; i < insert->lenblk; i++) - itup[i] = gist_form_invalid_tuple(insert->blkno[i]); - - /* - * any insertion of itup[] should make LOG message about - */ - - if (insert->origblkno == GIST_ROOT_BLKNO) - { - /* - * it was split root, so we should only make new root. it can't be - * simple insert into root, we should replace all content of root. - */ - Buffer buffer = XLogReadBuffer(insert->node, GIST_ROOT_BLKNO, true); - - gistnewroot(index, buffer, itup, lenitup, NULL); - UnlockReleaseBuffer(buffer); - } - else - { - Buffer *buffers; - Page *pages; - int numbuffer; - OffsetNumber *todelete; - - /* construct path */ - gistxlogFindPath(index, insert); - - Assert(insert->pathlen > 0); - - buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ )); - pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ )); - todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ )); - - for (i = 0; i < insert->pathlen; i++) - { - int j, - k, - pituplen = 0; - uint8 xlinfo; - XLogRecData *rdata; - XLogRecPtr recptr; - Buffer tempbuffer = InvalidBuffer; - int ntodelete = 0; - - numbuffer = 1; - buffers[0] = ReadBuffer(index, insert->path[i]); - LockBuffer(buffers[0], GIST_EXCLUSIVE); - - /* - * we check buffer, because we restored page earlier - */ - gistcheckpage(index, buffers[0]); - - pages[0] = BufferGetPage(buffers[0]); - Assert(!GistPageIsLeaf(pages[0])); - - pituplen = PageGetMaxOffsetNumber(pages[0]); - - /* find remove old IndexTuples to remove */ - for (j = 0; j < pituplen && ntodelete < lenitup; j++) - { - BlockNumber blkno; - ItemId iid = PageGetItemId(pages[0], j + FirstOffsetNumber); - IndexTuple idxtup = (IndexTuple) PageGetItem(pages[0], iid); - - blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid)); - - for (k = 0; k < lenitup; k++) - if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno) - { - todelete[ntodelete] = j + FirstOffsetNumber - ntodelete; - ntodelete++; - break; - } - } - - if (ntodelete == 0) - elog(PANIC, "gistContinueInsert: cannot find pointer to page(s)"); - - /* - * we check space with subtraction only first tuple to delete, - * hope, that wiil be enough space.... - */ - - if (gistnospace(pages[0], itup, lenitup, *todelete, 0)) - { - - /* no space left on page, so we must split */ - buffers[numbuffer] = ReadBuffer(index, P_NEW); - LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE); - GISTInitBuffer(buffers[numbuffer], 0); - pages[numbuffer] = BufferGetPage(buffers[numbuffer]); - gistfillbuffer(pages[numbuffer], itup, lenitup, FirstOffsetNumber); - numbuffer++; - - if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO) - { - Buffer tmp; - - /* - * we split root, just copy content from root to new page - */ - - /* sanity check */ - if (i + 1 != insert->pathlen) - elog(PANIC, "unexpected pathlen in index \"%s\"", - RelationGetRelationName(index)); - - /* fill new page, root will be changed later */ - tempbuffer = ReadBuffer(index, P_NEW); - LockBuffer(tempbuffer, GIST_EXCLUSIVE); - memcpy(BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer)); - - /* swap buffers[0] (was root) and temp buffer */ - tmp = buffers[0]; - buffers[0] = tempbuffer; - tempbuffer = tmp; /* now in tempbuffer GIST_ROOT_BLKNO, - * it is still unchanged */ - - pages[0] = BufferGetPage(buffers[0]); - } - - START_CRIT_SECTION(); - - for (j = 0; j < ntodelete; j++) - PageIndexTupleDelete(pages[0], todelete[j]); - - xlinfo = XLOG_GIST_PAGE_SPLIT; - rdata = formSplitRdata(index->rd_node, insert->path[i], - false, &(insert->key), - gistMakePageLayout(buffers, numbuffer)); - - } - else - { - START_CRIT_SECTION(); - - for (j = 0; j < ntodelete; j++) - PageIndexTupleDelete(pages[0], todelete[j]); - gistfillbuffer(pages[0], itup, lenitup, InvalidOffsetNumber); - - xlinfo = XLOG_GIST_PAGE_UPDATE; - rdata = formUpdateRdata(index->rd_node, buffers[0], - todelete, ntodelete, - itup, lenitup, &(insert->key)); - } - - /* - * use insert->key as mark for completion of insert (form*Rdata() - * above) for following possible replays - */ - - /* write pages, we should mark it dirty befor XLogInsert() */ - for (j = 0; j < numbuffer; j++) - { - GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber; - MarkBufferDirty(buffers[j]); - } - recptr = XLogInsert(RM_GIST_ID, xlinfo, rdata); - for (j = 0; j < numbuffer; j++) - { - PageSetLSN(pages[j], recptr); - PageSetTLI(pages[j], ThisTimeLineID); - } - - END_CRIT_SECTION(); - - lenitup = numbuffer; - for (j = 0; j < numbuffer; j++) - { - itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j])); - UnlockReleaseBuffer(buffers[j]); - } - - if (tempbuffer != InvalidBuffer) - { - /* - * it was a root split, so fill it by new values - */ - gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key)); - UnlockReleaseBuffer(tempbuffer); - } - } - } - - FreeFakeRelcacheEntry(index); - - ereport(LOG, - (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery", - insert->node.spcNode, insert->node.dbNode, insert->node.relNode), - errdetail("Incomplete insertion detected during crash replay."))); -} - void gist_xlog_startup(void) { - incomplete_inserts = NIL; - insertCtx = AllocSetContextCreate(CurrentMemoryContext, - "GiST recovery temporary context", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); opCtx = createTempGistContext(); } void gist_xlog_cleanup(void) { - ListCell *l; - MemoryContext oldCxt; - - oldCxt = MemoryContextSwitchTo(opCtx); - - foreach(l, incomplete_inserts) - { - gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l); - - gistContinueInsert(insert); - MemoryContextReset(opCtx); - } - MemoryContextSwitchTo(oldCxt); - MemoryContextDelete(opCtx); - MemoryContextDelete(insertCtx); -} - -bool -gist_safe_restartpoint(void) -{ - if (incomplete_inserts) - return false; - return true; -} - - -XLogRecData * -formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf, - ItemPointer key, SplitedPageLayout *dist) -{ - XLogRecData *rdata; - gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit)); - SplitedPageLayout *ptr; - int npage = 0, - cur = 1; - - ptr = dist; - while (ptr) - { - npage++; - ptr = ptr->next; - } - - rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2)); - - xlrec->node = node; - xlrec->origblkno = blkno; - xlrec->origleaf = page_is_leaf; - xlrec->npage = (uint16) npage; - if (key) - xlrec->key = *key; - else - ItemPointerSetInvalid(&(xlrec->key)); - - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) xlrec; - rdata[0].len = sizeof(gistxlogPageSplit); - rdata[0].next = NULL; - - ptr = dist; - while (ptr) - { - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char *) &(ptr->block); - rdata[cur].len = sizeof(gistxlogPage); - rdata[cur - 1].next = &(rdata[cur]); - cur++; - - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char *) (ptr->list); - rdata[cur].len = ptr->lenlist; - rdata[cur - 1].next = &(rdata[cur]); - rdata[cur].next = NULL; - cur++; - ptr = ptr->next; - } - - return rdata; } /* - * Construct the rdata array for an XLOG record describing a page update - * (deletion and/or insertion of tuples on a single index page). + * Write WAL record of a page split. + */ +XLogRecPtr +gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf, + SplitedPageLayout *dist, + BlockNumber origrlink, GistNSN orignsn, + Buffer leftchildbuf) +{ + XLogRecData *rdata; + gistxlogPageSplit xlrec; + SplitedPageLayout *ptr; + int npage = 0, + cur; + XLogRecPtr recptr; + + for (ptr = dist; ptr; ptr = ptr->next) + npage++; + + rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2)); + + xlrec.node = node; + xlrec.origblkno = blkno; + xlrec.origrlink = origrlink; + xlrec.orignsn = orignsn; + xlrec.origleaf = page_is_leaf; + xlrec.npage = (uint16) npage; + xlrec.leftchild = + BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber; + + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof(gistxlogPageSplit); + rdata[0].buffer = InvalidBuffer; + + cur = 1; + + /* + * Include a full page image of the child buf. (only necessary if a + * checkpoint happened since the child page was split) + */ + if (BufferIsValid(leftchildbuf)) + { + rdata[cur - 1].next = &(rdata[cur]); + rdata[cur].data = NULL; + rdata[cur].len = 0; + rdata[cur].buffer = leftchildbuf; + rdata[cur].buffer_std = true; + cur++; + } + + for (ptr = dist; ptr; ptr = ptr->next) + { + rdata[cur - 1].next = &(rdata[cur]); + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char *) &(ptr->block); + rdata[cur].len = sizeof(gistxlogPage); + cur++; + + rdata[cur - 1].next = &(rdata[cur]); + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char *) (ptr->list); + rdata[cur].len = ptr->lenlist; + cur++; + } + rdata[cur - 1].next = NULL; + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); + + pfree(rdata); + return recptr; +} + +/* + * Write XLOG record describing a page update. The update can include any + * number of deletions and/or insertions of tuples on a single index page. + * + * If this update inserts a downlink for a split page, also record that + * the F_FOLLOW_RIGHT flag on the child page is cleared and NSN set. * * Note that both the todelete array and the tuples are marked as belonging * to the target buffer; they need not be stored in XLOG if XLogInsert decides @@ -911,27 +489,26 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf, * at least one rdata item referencing the buffer, even when ntodelete and * ituplen are both zero; this ensures that XLogInsert knows about the buffer. */ -XLogRecData * -formUpdateRdata(RelFileNode node, Buffer buffer, - OffsetNumber *todelete, int ntodelete, - IndexTuple *itup, int ituplen, ItemPointer key) +XLogRecPtr +gistXLogUpdate(RelFileNode node, Buffer buffer, + OffsetNumber *todelete, int ntodelete, + IndexTuple *itup, int ituplen, + Buffer leftchildbuf) { XLogRecData *rdata; gistxlogPageUpdate *xlrec; int cur, i; + XLogRecPtr recptr; - rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen)); + rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (4 + ituplen)); xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate)); xlrec->node = node; xlrec->blkno = BufferGetBlockNumber(buffer); xlrec->ntodelete = ntodelete; - - if (key) - xlrec->key = *key; - else - ItemPointerSetInvalid(&(xlrec->key)); + xlrec->leftchild = + BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber; rdata[0].buffer = buffer; rdata[0].buffer_std = true; @@ -945,13 +522,13 @@ formUpdateRdata(RelFileNode node, Buffer buffer, rdata[1].next = &(rdata[2]); rdata[2].data = (char *) todelete; - rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete); + rdata[2].len = sizeof(OffsetNumber) * ntodelete; rdata[2].buffer = buffer; rdata[2].buffer_std = true; - rdata[2].next = NULL; + + cur = 3; /* new tuples */ - cur = 3; for (i = 0; i < ituplen; i++) { rdata[cur - 1].next = &(rdata[cur]); @@ -959,38 +536,26 @@ formUpdateRdata(RelFileNode node, Buffer buffer, rdata[cur].len = IndexTupleSize(itup[i]); rdata[cur].buffer = buffer; rdata[cur].buffer_std = true; - rdata[cur].next = NULL; cur++; } - return rdata; -} + /* + * Include a full page image of the child buf. (only necessary if + * a checkpoint happened since the child page was split) + */ + if (BufferIsValid(leftchildbuf)) + { + rdata[cur - 1].next = &(rdata[cur]); + rdata[cur].data = NULL; + rdata[cur].len = 0; + rdata[cur].buffer = leftchildbuf; + rdata[cur].buffer_std = true; + cur++; + } + rdata[cur - 1].next = NULL; -XLogRecPtr -gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len) -{ - gistxlogInsertComplete xlrec; - XLogRecData rdata[2]; - XLogRecPtr recptr; - - Assert(len > 0); - xlrec.node = node; - - rdata[0].buffer = InvalidBuffer; - rdata[0].data = (char *) &xlrec; - rdata[0].len = sizeof(gistxlogInsertComplete); - rdata[0].next = &(rdata[1]); - - rdata[1].buffer = InvalidBuffer; - rdata[1].data = (char *) keys; - rdata[1].len = sizeof(ItemPointerData) * len; - rdata[1].next = NULL; - - START_CRIT_SECTION(); - - recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata); - - END_CRIT_SECTION(); + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); + pfree(rdata); return recptr; } diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index c706e979567..f8f797c33d3 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -40,6 +40,6 @@ const RmgrData RmgrTable[RM_MAX_ID + 1] = { {"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint}, {"Hash", hash_redo, hash_desc, NULL, NULL, NULL}, {"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint}, - {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, gist_safe_restartpoint}, + {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, NULL}, {"Sequence", seq_redo, seq_desc, NULL, NULL, NULL} }; diff --git a/src/include/access/gist.h b/src/include/access/gist.h index 01fddb94af9..39132182041 100644 --- a/src/include/access/gist.h +++ b/src/include/access/gist.h @@ -58,9 +58,10 @@ /* * Page opaque data in a GiST index page. */ -#define F_LEAF (1 << 0) -#define F_DELETED (1 << 1) -#define F_TUPLES_DELETED (1 << 2) +#define F_LEAF (1 << 0) /* leaf page */ +#define F_DELETED (1 << 1) /* the page has been deleted */ +#define F_TUPLES_DELETED (1 << 2) /* some tuples on the page are dead */ +#define F_FOLLOW_RIGHT (1 << 3) /* page to the right has no downlink */ typedef XLogRecPtr GistNSN; @@ -132,6 +133,10 @@ typedef struct GISTENTRY #define GistMarkTuplesDeleted(page) ( GistPageGetOpaque(page)->flags |= F_TUPLES_DELETED) #define GistClearTuplesDeleted(page) ( GistPageGetOpaque(page)->flags &= ~F_TUPLES_DELETED) +#define GistFollowRight(page) ( GistPageGetOpaque(page)->flags & F_FOLLOW_RIGHT) +#define GistMarkFollowRight(page) ( GistPageGetOpaque(page)->flags |= F_FOLLOW_RIGHT) +#define GistClearFollowRight(page) ( GistPageGetOpaque(page)->flags &= ~F_FOLLOW_RIGHT) + /* * Vector of GISTENTRY structs; user-defined methods union and picksplit * take it as one of their arguments diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h index 058435cfe56..1bacb468ee6 100644 --- a/src/include/access/gist_private.h +++ b/src/include/access/gist_private.h @@ -132,9 +132,9 @@ typedef GISTScanOpaqueData *GISTScanOpaque; /* XLog stuff */ #define XLOG_GIST_PAGE_UPDATE 0x00 -#define XLOG_GIST_NEW_ROOT 0x20 +/* #define XLOG_GIST_NEW_ROOT 0x20 */ /* not used anymore */ #define XLOG_GIST_PAGE_SPLIT 0x30 -#define XLOG_GIST_INSERT_COMPLETE 0x40 +/* #define XLOG_GIST_INSERT_COMPLETE 0x40 */ /* not used anymore */ #define XLOG_GIST_CREATE_INDEX 0x50 #define XLOG_GIST_PAGE_DELETE 0x60 @@ -144,9 +144,10 @@ typedef struct gistxlogPageUpdate BlockNumber blkno; /* - * It used to identify completeness of insert. Sets to leaf itup + * If this operation completes a page split, by inserting a downlink for + * the split page, leftchild points to the left half of the split. */ - ItemPointerData key; + BlockNumber leftchild; /* number of deleted offsets */ uint16 ntodelete; @@ -160,11 +161,12 @@ typedef struct gistxlogPageSplit { RelFileNode node; BlockNumber origblkno; /* splitted page */ + BlockNumber origrlink; /* rightlink of the page before split */ + GistNSN orignsn; /* NSN of the page before split */ bool origleaf; /* was splitted page a leaf page? */ - uint16 npage; - /* see comments on gistxlogPageUpdate */ - ItemPointerData key; + BlockNumber leftchild; /* like in gistxlogPageUpdate */ + uint16 npage; /* # of pages in the split */ /* * follow: 1. gistxlogPage and array of IndexTupleData per page @@ -177,12 +179,6 @@ typedef struct gistxlogPage int num; /* number of index tuples following */ } gistxlogPage; -typedef struct gistxlogInsertComplete -{ - RelFileNode node; - /* follows ItemPointerData key to clean */ -} gistxlogInsertComplete; - typedef struct gistxlogPageDelete { RelFileNode node; @@ -206,7 +202,6 @@ typedef struct SplitedPageLayout * GISTInsertStack used for locking buffers and transfer arguments during * insertion */ - typedef struct GISTInsertStack { /* current page */ @@ -215,7 +210,7 @@ typedef struct GISTInsertStack Page page; /* - * log sequence number from page->lsn to recognize page update and + * log sequence number from page->lsn to recognize page update and * compare it with page's nsn to recognize page split */ GistNSN lsn; @@ -223,9 +218,8 @@ typedef struct GISTInsertStack /* child's offset */ OffsetNumber childoffnum; - /* pointer to parent and child */ + /* pointer to parent */ struct GISTInsertStack *parent; - struct GISTInsertStack *child; /* for gistFindPath */ struct GISTInsertStack *next; @@ -238,12 +232,10 @@ typedef struct GistSplitVector Datum spl_lattr[INDEX_MAX_KEYS]; /* Union of subkeys in * spl_left */ bool spl_lisnull[INDEX_MAX_KEYS]; - bool spl_leftvalid; Datum spl_rattr[INDEX_MAX_KEYS]; /* Union of subkeys in * spl_right */ bool spl_risnull[INDEX_MAX_KEYS]; - bool spl_rightvalid; bool *spl_equiv; /* equivalent tuples which can be freely * distributed between left and right pages */ @@ -252,28 +244,40 @@ typedef struct GistSplitVector typedef struct { Relation r; - IndexTuple *itup; /* in/out, points to compressed entry */ - int ituplen; /* length of itup */ Size freespace; /* free space to be left */ - GISTInsertStack *stack; - bool needInsertComplete; - /* pointer to heap tuple */ - ItemPointerData key; + GISTInsertStack *stack; } GISTInsertState; /* root page of a gist index */ #define GIST_ROOT_BLKNO 0 /* - * mark tuples on inner pages during recovery + * Before PostgreSQL 9.1, we used rely on so-called "invalid tuples" on inner + * pages to finish crash recovery of incomplete page splits. If a crash + * happened in the middle of a page split, so that the downlink pointers were + * not yet inserted, crash recovery inserted a special downlink pointer. The + * semantics of an invalid tuple was that it if you encounter one in a scan, + * it must always be followed, because we don't know if the tuples on the + * child page match or not. + * + * We no longer create such invalid tuples, we now mark the left-half of such + * an incomplete split with the F_FOLLOW_RIGHT flag instead, and finish the + * split properly the next time we need to insert on that page. To retain + * on-disk compatibility for the sake of pg_upgrade, we still store 0xffff as + * the offset number of all inner tuples. If we encounter any invalid tuples + * with 0xfffe during insertion, we throw an error, though scans still handle + * them. You should only encounter invalid tuples if you pg_upgrade a pre-9.1 + * gist index which already has invalid tuples in it because of a crash. That + * should be rare, and you are recommended to REINDEX anyway if you have any + * invalid tuples in an index, so throwing an error is as far as we go with + * supporting that. */ #define TUPLE_IS_VALID 0xffff #define TUPLE_IS_INVALID 0xfffe #define GistTupleIsInvalid(itup) ( ItemPointerGetOffsetNumber( &((itup)->t_tid) ) == TUPLE_IS_INVALID ) #define GistTupleSetValid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_VALID ) -#define GistTupleSetInvalid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_INVALID ) /* gist.c */ extern Datum gistbuild(PG_FUNCTION_ARGS); @@ -281,8 +285,6 @@ extern Datum gistinsert(PG_FUNCTION_ARGS); extern MemoryContext createTempGistContext(void); extern void initGISTstate(GISTSTATE *giststate, Relation index); extern void freeGISTstate(GISTSTATE *giststate); -extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate); -extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key); extern SplitedPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *giststate); @@ -294,18 +296,17 @@ extern void gist_redo(XLogRecPtr lsn, XLogRecord *record); extern void gist_desc(StringInfo buf, uint8 xl_info, char *rec); extern void gist_xlog_startup(void); extern void gist_xlog_cleanup(void); -extern bool gist_safe_restartpoint(void); -extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno); -extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer, - OffsetNumber *todelete, int ntodelete, - IndexTuple *itup, int ituplen, ItemPointer key); +extern XLogRecPtr gistXLogUpdate(RelFileNode node, Buffer buffer, + OffsetNumber *todelete, int ntodelete, + IndexTuple *itup, int ntup, + Buffer leftchild); -extern XLogRecData *formSplitRdata(RelFileNode node, - BlockNumber blkno, bool page_is_leaf, - ItemPointer key, SplitedPageLayout *dist); - -extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len); +extern XLogRecPtr gistXLogSplit(RelFileNode node, + BlockNumber blkno, bool page_is_leaf, + SplitedPageLayout *dist, + BlockNumber origrlink, GistNSN oldnsn, + Buffer leftchild); /* gistget.c */ extern Datum gistgettuple(PG_FUNCTION_ARGS); @@ -357,7 +358,7 @@ extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, extern float gistpenalty(GISTSTATE *giststate, int attno, GISTENTRY *key1, bool isNull1, GISTENTRY *key2, bool isNull2); -extern bool gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey, +extern void gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey, Datum *attr, bool *isnull); extern bool gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b); extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,