1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-06 07:49:08 +03:00
Files
postgres/src/backend/access/gin/ginbtree.c
Daniel Gustafsson 950d4a2cb1 Fix typos and duplicate words
This fixes various typos, duplicated words, and tiny bits of whitespace
mainly in code comments but also in docs.

Author: Daniel Gustafsson <daniel@yesql.se>
Author: Heikki Linnakangas <hlinnaka@iki.fi>
Author: Alexander Lakhin <exclusion@gmail.com>
Author: David Rowley <dgrowleyml@gmail.com>
Author: Nazir Bilal Yavuz <byavuz81@gmail.com>
Discussion: https://postgr.es/m/3F577953-A29E-4722-98AD-2DA9EFF2CBB8@yesql.se
2024-04-18 21:28:07 +02:00

836 lines
23 KiB
C

/*-------------------------------------------------------------------------
*
* ginbtree.c
* page utilities routines for the postgres inverted index access method.
*
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/access/gin/ginbtree.c
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/gin_private.h"
#include "access/ginxlog.h"
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "storage/predicate.h"
#include "utils/injection_point.h"
#include "utils/memutils.h"
#include "utils/rel.h"
static void ginFindParents(GinBtree btree, GinBtreeStack *stack);
static bool ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
void *insertdata, BlockNumber updateblkno,
Buffer childbuf, GinStatsData *buildStats);
static void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
bool freestack, GinStatsData *buildStats);
static void ginFinishOldSplit(GinBtree btree, GinBtreeStack *stack,
GinStatsData *buildStats, int access);
/*
* Lock buffer by needed method for search.
*/
int
ginTraverseLock(Buffer buffer, bool searchMode)
{
Page page;
int access = GIN_SHARE;
LockBuffer(buffer, GIN_SHARE);
page = BufferGetPage(buffer);
if (GinPageIsLeaf(page))
{
if (searchMode == false)
{
/* we should relock our page */
LockBuffer(buffer, GIN_UNLOCK);
LockBuffer(buffer, GIN_EXCLUSIVE);
/* But root can become non-leaf during relock */
if (!GinPageIsLeaf(page))
{
/* restore old lock type (very rare) */
LockBuffer(buffer, GIN_UNLOCK);
LockBuffer(buffer, GIN_SHARE);
}
else
access = GIN_EXCLUSIVE;
}
}
return access;
}
/*
* Descend the tree to the leaf page that contains or would contain the key
* we're searching for. The key should already be filled in 'btree', in
* tree-type specific manner. If btree->fullScan is true, descends to the
* leftmost leaf page.
*
* If 'searchmode' is false, on return stack->buffer is exclusively locked,
* and the stack represents the full path to the root. Otherwise stack->buffer
* is share-locked, and stack->parent is NULL.
*
* If 'rootConflictCheck' is true, tree root is checked for serialization
* conflict.
*/
GinBtreeStack *
ginFindLeafPage(GinBtree btree, bool searchMode,
bool rootConflictCheck)
{
GinBtreeStack *stack;
stack = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
stack->blkno = btree->rootBlkno;
stack->buffer = ReadBuffer(btree->index, btree->rootBlkno);
stack->parent = NULL;
stack->predictNumber = 1;
if (rootConflictCheck)
CheckForSerializableConflictIn(btree->index, NULL, btree->rootBlkno);
for (;;)
{
Page page;
BlockNumber child;
int access;
stack->off = InvalidOffsetNumber;
page = BufferGetPage(stack->buffer);
access = ginTraverseLock(stack->buffer, searchMode);
/*
* If we're going to modify the tree, finish any incomplete splits we
* encounter on the way.
*/
if (!searchMode && GinPageIsIncompleteSplit(page))
ginFinishOldSplit(btree, stack, NULL, access);
/*
* ok, page is correctly locked, we should check to move right ..,
* root never has a right link, so small optimization
*/
while (btree->fullScan == false && stack->blkno != btree->rootBlkno &&
btree->isMoveRight(btree, page))
{
BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
if (rightlink == InvalidBlockNumber)
/* rightmost page */
break;
stack->buffer = ginStepRight(stack->buffer, btree->index, access);
stack->blkno = rightlink;
page = BufferGetPage(stack->buffer);
if (!searchMode && GinPageIsIncompleteSplit(page))
ginFinishOldSplit(btree, stack, NULL, access);
}
if (GinPageIsLeaf(page)) /* we found, return locked page */
return stack;
/* now we have correct buffer, try to find child */
child = btree->findChildPage(btree, stack);
LockBuffer(stack->buffer, GIN_UNLOCK);
Assert(child != InvalidBlockNumber);
Assert(stack->blkno != child);
if (searchMode)
{
/* in search mode we may forget path to leaf */
stack->blkno = child;
stack->buffer = ReleaseAndReadBuffer(stack->buffer, btree->index, stack->blkno);
}
else
{
GinBtreeStack *ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
ptr->parent = stack;
stack = ptr;
stack->blkno = child;
stack->buffer = ReadBuffer(btree->index, stack->blkno);
stack->predictNumber = 1;
}
}
}
/*
* Step right from current page.
*
* The next page is locked first, before releasing the current page. This is
* crucial to prevent concurrent VACUUM from deleting a page that we are about
* to step to. (The lock-coupling isn't strictly necessary when we are
* traversing the tree to find an insert location, because page deletion grabs
* a cleanup lock on the root to prevent any concurrent inserts. See Page
* deletion section in the README. But there's no harm in doing it always.)
*/
Buffer
ginStepRight(Buffer buffer, Relation index, int lockmode)
{
Buffer nextbuffer;
Page page = BufferGetPage(buffer);
bool isLeaf = GinPageIsLeaf(page);
bool isData = GinPageIsData(page);
BlockNumber blkno = GinPageGetOpaque(page)->rightlink;
nextbuffer = ReadBuffer(index, blkno);
LockBuffer(nextbuffer, lockmode);
UnlockReleaseBuffer(buffer);
/* Sanity check that the page we stepped to is of similar kind. */
page = BufferGetPage(nextbuffer);
if (isLeaf != GinPageIsLeaf(page) || isData != GinPageIsData(page))
elog(ERROR, "right sibling of GIN page is of different type");
return nextbuffer;
}
void
freeGinBtreeStack(GinBtreeStack *stack)
{
while (stack)
{
GinBtreeStack *tmp = stack->parent;
if (stack->buffer != InvalidBuffer)
ReleaseBuffer(stack->buffer);
pfree(stack);
stack = tmp;
}
}
/*
* Try to find parent for current stack position. Returns correct parent and
* child's offset in stack->parent. The root page is never released, to
* prevent conflict with vacuum process.
*/
static void
ginFindParents(GinBtree btree, GinBtreeStack *stack)
{
Page page;
Buffer buffer;
BlockNumber blkno,
leftmostBlkno;
OffsetNumber offset;
GinBtreeStack *root;
GinBtreeStack *ptr;
/*
* Unwind the stack all the way up to the root, leaving only the root
* item.
*
* Be careful not to release the pin on the root page! The pin on root
* page is required to lock out concurrent vacuums on the tree.
*/
root = stack->parent;
while (root->parent)
{
ReleaseBuffer(root->buffer);
root = root->parent;
}
Assert(root->blkno == btree->rootBlkno);
Assert(BufferGetBlockNumber(root->buffer) == btree->rootBlkno);
root->off = InvalidOffsetNumber;
blkno = root->blkno;
buffer = root->buffer;
ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
for (;;)
{
LockBuffer(buffer, GIN_EXCLUSIVE);
page = BufferGetPage(buffer);
if (GinPageIsLeaf(page))
elog(ERROR, "Lost path");
if (GinPageIsIncompleteSplit(page))
{
Assert(blkno != btree->rootBlkno);
ptr->blkno = blkno;
ptr->buffer = buffer;
/*
* parent may be wrong, but if so, the ginFinishSplit call will
* recurse to call ginFindParents again to fix it.
*/
ptr->parent = root;
ptr->off = InvalidOffsetNumber;
ginFinishOldSplit(btree, ptr, NULL, GIN_EXCLUSIVE);
}
leftmostBlkno = btree->getLeftMostChild(btree, page);
while ((offset = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) == InvalidOffsetNumber)
{
blkno = GinPageGetOpaque(page)->rightlink;
if (blkno == InvalidBlockNumber)
{
/* Link not present in this level */
LockBuffer(buffer, GIN_UNLOCK);
/* Do not release pin on the root buffer */
if (buffer != root->buffer)
ReleaseBuffer(buffer);
break;
}
buffer = ginStepRight(buffer, btree->index, GIN_EXCLUSIVE);
page = BufferGetPage(buffer);
/* finish any incomplete splits, as above */
if (GinPageIsIncompleteSplit(page))
{
Assert(blkno != btree->rootBlkno);
ptr->blkno = blkno;
ptr->buffer = buffer;
ptr->parent = root;
ptr->off = InvalidOffsetNumber;
ginFinishOldSplit(btree, ptr, NULL, GIN_EXCLUSIVE);
}
}
if (blkno != InvalidBlockNumber)
{
ptr->blkno = blkno;
ptr->buffer = buffer;
ptr->parent = root; /* it may be wrong, but in next call we will
* correct */
ptr->off = offset;
stack->parent = ptr;
return;
}
/* Descend down to next level */
blkno = leftmostBlkno;
buffer = ReadBuffer(btree->index, blkno);
}
}
/*
* Insert a new item to a page.
*
* Returns true if the insertion was finished. On false, the page was split and
* the parent needs to be updated. (A root split returns true as it doesn't
* need any further action by the caller to complete.)
*
* When inserting a downlink to an internal page, 'childbuf' contains the
* child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
* atomically with the insert. Also, the existing item at offset stack->off
* in the target page is updated to point to updateblkno.
*
* stack->buffer is locked on entry, and is kept locked.
* Likewise for childbuf, if given.
*/
static bool
ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
void *insertdata, BlockNumber updateblkno,
Buffer childbuf, GinStatsData *buildStats)
{
Page page = BufferGetPage(stack->buffer);
bool result;
GinPlaceToPageRC rc;
uint16 xlflags = 0;
Page childpage = NULL;
Page newlpage = NULL,
newrpage = NULL;
void *ptp_workspace = NULL;
MemoryContext tmpCxt;
MemoryContext oldCxt;
/*
* We do all the work of this function and its subfunctions in a temporary
* memory context. This avoids leakages and simplifies APIs, since some
* subfunctions allocate storage that has to survive until we've finished
* the WAL insertion.
*/
tmpCxt = AllocSetContextCreate(CurrentMemoryContext,
"ginPlaceToPage temporary context",
ALLOCSET_DEFAULT_SIZES);
oldCxt = MemoryContextSwitchTo(tmpCxt);
if (GinPageIsData(page))
xlflags |= GIN_INSERT_ISDATA;
if (GinPageIsLeaf(page))
{
xlflags |= GIN_INSERT_ISLEAF;
Assert(!BufferIsValid(childbuf));
Assert(updateblkno == InvalidBlockNumber);
}
else
{
Assert(BufferIsValid(childbuf));
Assert(updateblkno != InvalidBlockNumber);
childpage = BufferGetPage(childbuf);
}
/*
* See if the incoming tuple will fit on the page. beginPlaceToPage will
* decide if the page needs to be split, and will compute the split
* contents if so. See comments for beginPlaceToPage and execPlaceToPage
* functions for more details of the API here.
*/
rc = btree->beginPlaceToPage(btree, stack->buffer, stack,
insertdata, updateblkno,
&ptp_workspace,
&newlpage, &newrpage);
if (rc == GPTP_NO_WORK)
{
/* Nothing to do */
result = true;
}
else if (rc == GPTP_INSERT)
{
/* It will fit, perform the insertion */
START_CRIT_SECTION();
if (RelationNeedsWAL(btree->index) && !btree->isBuild)
XLogBeginInsert();
/*
* Perform the page update, dirty and register stack->buffer, and
* register any extra WAL data.
*/
btree->execPlaceToPage(btree, stack->buffer, stack,
insertdata, updateblkno, ptp_workspace);
/* An insert to an internal page finishes the split of the child. */
if (BufferIsValid(childbuf))
{
GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
MarkBufferDirty(childbuf);
if (RelationNeedsWAL(btree->index) && !btree->isBuild)
XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
}
if (RelationNeedsWAL(btree->index) && !btree->isBuild)
{
XLogRecPtr recptr;
ginxlogInsert xlrec;
BlockIdData childblknos[2];
xlrec.flags = xlflags;
XLogRegisterData((char *) &xlrec, sizeof(ginxlogInsert));
/*
* Log information about child if this was an insertion of a
* downlink.
*/
if (BufferIsValid(childbuf))
{
BlockIdSet(&childblknos[0], BufferGetBlockNumber(childbuf));
BlockIdSet(&childblknos[1], GinPageGetOpaque(childpage)->rightlink);
XLogRegisterData((char *) childblknos,
sizeof(BlockIdData) * 2);
}
recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT);
PageSetLSN(page, recptr);
if (BufferIsValid(childbuf))
PageSetLSN(childpage, recptr);
}
END_CRIT_SECTION();
/* Insertion is complete. */
result = true;
}
else if (rc == GPTP_SPLIT)
{
/*
* Didn't fit, need to split. The split has been computed in newlpage
* and newrpage, which are pointers to palloc'd pages, not associated
* with buffers. stack->buffer is not touched yet.
*/
Buffer rbuffer;
BlockNumber savedRightLink;
ginxlogSplit data;
Buffer lbuffer = InvalidBuffer;
Page newrootpg = NULL;
/* Get a new index page to become the right page */
rbuffer = GinNewBuffer(btree->index);
/* During index build, count the new page */
if (buildStats)
{
if (btree->isData)
buildStats->nDataPages++;
else
buildStats->nEntryPages++;
}
savedRightLink = GinPageGetOpaque(page)->rightlink;
/* Begin setting up WAL record */
data.locator = btree->index->rd_locator;
data.flags = xlflags;
if (BufferIsValid(childbuf))
{
data.leftChildBlkno = BufferGetBlockNumber(childbuf);
data.rightChildBlkno = GinPageGetOpaque(childpage)->rightlink;
}
else
data.leftChildBlkno = data.rightChildBlkno = InvalidBlockNumber;
if (stack->parent == NULL)
{
/*
* splitting the root, so we need to allocate new left page and
* place pointers to left and right page on root page.
*/
lbuffer = GinNewBuffer(btree->index);
/* During index build, count the new left page */
if (buildStats)
{
if (btree->isData)
buildStats->nDataPages++;
else
buildStats->nEntryPages++;
}
data.rrlink = InvalidBlockNumber;
data.flags |= GIN_SPLIT_ROOT;
GinPageGetOpaque(newrpage)->rightlink = InvalidBlockNumber;
GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
/*
* Construct a new root page containing downlinks to the new left
* and right pages. (Do this in a temporary copy rather than
* overwriting the original page directly, since we're not in the
* critical section yet.)
*/
newrootpg = PageGetTempPage(newrpage);
GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~(GIN_LEAF | GIN_COMPRESSED), BLCKSZ);
btree->fillRoot(btree, newrootpg,
BufferGetBlockNumber(lbuffer), newlpage,
BufferGetBlockNumber(rbuffer), newrpage);
if (GinPageIsLeaf(BufferGetPage(stack->buffer)))
{
PredicateLockPageSplit(btree->index,
BufferGetBlockNumber(stack->buffer),
BufferGetBlockNumber(lbuffer));
PredicateLockPageSplit(btree->index,
BufferGetBlockNumber(stack->buffer),
BufferGetBlockNumber(rbuffer));
}
}
else
{
/* splitting a non-root page */
data.rrlink = savedRightLink;
GinPageGetOpaque(newrpage)->rightlink = savedRightLink;
GinPageGetOpaque(newlpage)->flags |= GIN_INCOMPLETE_SPLIT;
GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
if (GinPageIsLeaf(BufferGetPage(stack->buffer)))
{
PredicateLockPageSplit(btree->index,
BufferGetBlockNumber(stack->buffer),
BufferGetBlockNumber(rbuffer));
}
}
/*
* OK, we have the new contents of the left page in a temporary copy
* now (newlpage), and likewise for the new contents of the
* newly-allocated right block. The original page is still unchanged.
*
* If this is a root split, we also have a temporary page containing
* the new contents of the root.
*/
START_CRIT_SECTION();
MarkBufferDirty(rbuffer);
MarkBufferDirty(stack->buffer);
/*
* Restore the temporary copies over the real buffers.
*/
if (stack->parent == NULL)
{
/* Splitting the root, three pages to update */
MarkBufferDirty(lbuffer);
memcpy(page, newrootpg, BLCKSZ);
memcpy(BufferGetPage(lbuffer), newlpage, BLCKSZ);
memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
}
else
{
/* Normal split, only two pages to update */
memcpy(page, newlpage, BLCKSZ);
memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
}
/* We also clear childbuf's INCOMPLETE_SPLIT flag, if passed */
if (BufferIsValid(childbuf))
{
GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
MarkBufferDirty(childbuf);
}
/* write WAL record */
if (RelationNeedsWAL(btree->index) && !btree->isBuild)
{
XLogRecPtr recptr;
XLogBeginInsert();
/*
* We just take full page images of all the split pages. Splits
* are uncommon enough that it's not worth complicating the code
* to be more efficient.
*/
if (stack->parent == NULL)
{
XLogRegisterBuffer(0, lbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
XLogRegisterBuffer(2, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
}
else
{
XLogRegisterBuffer(0, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
}
if (BufferIsValid(childbuf))
XLogRegisterBuffer(3, childbuf, REGBUF_STANDARD);
XLogRegisterData((char *) &data, sizeof(ginxlogSplit));
recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT);
PageSetLSN(page, recptr);
PageSetLSN(BufferGetPage(rbuffer), recptr);
if (stack->parent == NULL)
PageSetLSN(BufferGetPage(lbuffer), recptr);
if (BufferIsValid(childbuf))
PageSetLSN(childpage, recptr);
}
END_CRIT_SECTION();
/*
* We can release the locks/pins on the new pages now, but keep
* stack->buffer locked. childbuf doesn't get unlocked either.
*/
UnlockReleaseBuffer(rbuffer);
if (stack->parent == NULL)
UnlockReleaseBuffer(lbuffer);
/*
* If we split the root, we're done. Otherwise the split is not
* complete until the downlink for the new page has been inserted to
* the parent.
*/
result = (stack->parent == NULL);
}
else
{
elog(ERROR, "invalid return code from GIN beginPlaceToPage method: %d", rc);
result = false; /* keep compiler quiet */
}
/* Clean up temp context */
MemoryContextSwitchTo(oldCxt);
MemoryContextDelete(tmpCxt);
return result;
}
/*
* Finish a split by inserting the downlink for the new page to parent.
*
* On entry, stack->buffer is exclusively locked.
*
* If freestack is true, all the buffers are released and unlocked as we
* crawl up the tree, and 'stack' is freed. Otherwise stack->buffer is kept
* locked, and stack is unmodified, except for possibly moving right to find
* the correct parent of page.
*/
static void
ginFinishSplit(GinBtree btree, GinBtreeStack *stack, bool freestack,
GinStatsData *buildStats)
{
Page page;
bool done;
bool first = true;
/* this loop crawls up the stack until the insertion is complete */
do
{
GinBtreeStack *parent = stack->parent;
void *insertdata;
BlockNumber updateblkno;
#ifdef USE_INJECTION_POINTS
if (GinPageIsLeaf(BufferGetPage(stack->buffer)))
INJECTION_POINT("gin-leave-leaf-split-incomplete");
else
INJECTION_POINT("gin-leave-internal-split-incomplete");
#endif
/* search parent to lock */
LockBuffer(parent->buffer, GIN_EXCLUSIVE);
/*
* If the parent page was incompletely split, finish that split first,
* then continue with the current one.
*
* Note: we have to finish *all* incomplete splits we encounter, even
* if we have to move right. Otherwise we might choose as the target a
* page that has no downlink in the parent, and splitting it further
* would fail.
*/
if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
ginFinishOldSplit(btree, parent, buildStats, GIN_EXCLUSIVE);
/* move right if it's needed */
page = BufferGetPage(parent->buffer);
while ((parent->off = btree->findChildPtr(btree, page, stack->blkno, parent->off)) == InvalidOffsetNumber)
{
if (GinPageRightMost(page))
{
/*
* rightmost page, but we don't find parent, we should use
* plain search...
*/
LockBuffer(parent->buffer, GIN_UNLOCK);
ginFindParents(btree, stack);
parent = stack->parent;
Assert(parent != NULL);
break;
}
parent->buffer = ginStepRight(parent->buffer, btree->index, GIN_EXCLUSIVE);
parent->blkno = BufferGetBlockNumber(parent->buffer);
page = BufferGetPage(parent->buffer);
if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
ginFinishOldSplit(btree, parent, buildStats, GIN_EXCLUSIVE);
}
/* insert the downlink */
insertdata = btree->prepareDownlink(btree, stack->buffer);
updateblkno = GinPageGetOpaque(BufferGetPage(stack->buffer))->rightlink;
done = ginPlaceToPage(btree, parent,
insertdata, updateblkno,
stack->buffer, buildStats);
pfree(insertdata);
/*
* If the caller requested to free the stack, unlock and release the
* child buffer now. Otherwise keep it pinned and locked, but if we
* have to recurse up the tree, we can unlock the upper pages, only
* keeping the page at the bottom of the stack locked.
*/
if (!first || freestack)
LockBuffer(stack->buffer, GIN_UNLOCK);
if (freestack)
{
ReleaseBuffer(stack->buffer);
pfree(stack);
}
stack = parent;
first = false;
} while (!done);
/* unlock the parent */
LockBuffer(stack->buffer, GIN_UNLOCK);
if (freestack)
freeGinBtreeStack(stack);
}
/*
* An entry point to ginFinishSplit() that is used when we stumble upon an
* existing incompletely split page in the tree, as opposed to completing a
* split that we just made ourselves. The difference is that stack->buffer may
* be merely share-locked on entry, and will be upgraded to exclusive mode.
*
* Note: Upgrading the lock momentarily releases it. Doing that in a scan
* would not be OK, because a concurrent VACUUM might delete the page while
* we're not holding the lock. It's OK in an insert, though, because VACUUM
* has a different mechanism that prevents it from running concurrently with
* inserts. (Namely, it holds a cleanup lock on the root.)
*/
static void
ginFinishOldSplit(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats, int access)
{
INJECTION_POINT("gin-finish-incomplete-split");
elog(DEBUG1, "finishing incomplete split of block %u in gin index \"%s\"",
stack->blkno, RelationGetRelationName(btree->index));
if (access == GIN_SHARE)
{
LockBuffer(stack->buffer, GIN_UNLOCK);
LockBuffer(stack->buffer, GIN_EXCLUSIVE);
if (!GinPageIsIncompleteSplit(BufferGetPage(stack->buffer)))
{
/*
* Someone else already completed the split while we were not
* holding the lock.
*/
return;
}
}
ginFinishSplit(btree, stack, false, buildStats);
}
/*
* Insert a value to tree described by stack.
*
* The value to be inserted is given in 'insertdata'. Its format depends
* on whether this is an entry or data tree, ginInsertValue just passes it
* through to the tree-specific callback function.
*
* During an index build, buildStats is non-null and the counters it contains
* are incremented as needed.
*
* NB: the passed-in stack is freed, as though by freeGinBtreeStack.
*/
void
ginInsertValue(GinBtree btree, GinBtreeStack *stack, void *insertdata,
GinStatsData *buildStats)
{
bool done;
/* If the leaf page was incompletely split, finish the split first */
if (GinPageIsIncompleteSplit(BufferGetPage(stack->buffer)))
ginFinishOldSplit(btree, stack, buildStats, GIN_EXCLUSIVE);
done = ginPlaceToPage(btree, stack,
insertdata, InvalidBlockNumber,
InvalidBuffer, buildStats);
if (done)
{
LockBuffer(stack->buffer, GIN_UNLOCK);
freeGinBtreeStack(stack);
}
else
ginFinishSplit(btree, stack, true, buildStats);
}