1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-03 20:02:46 +03:00

Support INCLUDE'd columns in SP-GiST.

Not much to say here: does what it says on the tin.
We steal a previously-always-zero bit from the nextOffset
field of leaf index tuples in order to track whether there
is a nulls bitmap.  Otherwise it works about like included
columns in other index types.

Pavel Borisov, reviewed by Andrey Borodin and Anastasia Lubennikova,
and rather heavily editorialized on by me

Discussion: https://postgr.es/m/CALT9ZEFi-vMp4faht9f9Junb1nO3NOSjhpxTmbm1UGLMsLqiEQ@mail.gmail.com
This commit is contained in:
Tom Lane
2021-04-05 18:41:09 -04:00
parent 49f49defe7
commit 09c1c6ab4b
21 changed files with 630 additions and 232 deletions

View File

@ -56,7 +56,7 @@ list and there is no free space on page, then SP-GiST creates a new inner
tuple and distributes leaf tuples into a set of lists on, perhaps, several
pages.
Inner tuple consists of:
An inner tuple consists of:
optional prefix value - all successors must be consistent with it.
Example:
@ -67,14 +67,26 @@ Inner tuple consists of:
list of nodes, where node is a (label, pointer) pair.
Example of a label: a single character for radix tree
Leaf tuple consists of:
A leaf tuple consists of:
a leaf value
Example:
radix tree - the rest of string (postfix)
quad and k-d tree - the point itself
ItemPointer to the heap
ItemPointer to the corresponding heap tuple
nextOffset number of next leaf tuple in a chain on a leaf page
optional nulls bitmask
optional INCLUDE-column values
For compatibility with pre-v14 indexes, a leaf tuple has a nulls bitmask
only if there are null values (among the leaf value and the INCLUDE values)
*and* there is at least one INCLUDE column. The null-ness of the leaf
value can be inferred from whether the tuple is on a "nulls page" (see below)
so it is not necessary to represent it explicitly. But we include it anyway
in a bitmask used with INCLUDE values, so that standard tuple deconstruction
code can be used.
NULLS HANDLING

View File

@ -220,7 +220,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
SpGistBlockIsRoot(current->blkno))
{
/* Tuple is not part of a chain */
leafTuple->nextOffset = InvalidOffsetNumber;
SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
current->offnum = SpGistPageAddNewItem(state, current->page,
(Item) leafTuple, leafTuple->size,
NULL, false);
@ -253,7 +253,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
PageGetItemId(current->page, current->offnum));
if (head->tupstate == SPGIST_LIVE)
{
leafTuple->nextOffset = head->nextOffset;
SGLT_SET_NEXTOFFSET(leafTuple, SGLT_GET_NEXTOFFSET(head));
offnum = SpGistPageAddNewItem(state, current->page,
(Item) leafTuple, leafTuple->size,
NULL, false);
@ -264,14 +264,14 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
*/
head = (SpGistLeafTuple) PageGetItem(current->page,
PageGetItemId(current->page, current->offnum));
head->nextOffset = offnum;
SGLT_SET_NEXTOFFSET(head, offnum);
xlrec.offnumLeaf = offnum;
xlrec.offnumHeadLeaf = current->offnum;
}
else if (head->tupstate == SPGIST_DEAD)
{
leafTuple->nextOffset = InvalidOffsetNumber;
SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
PageIndexTupleDelete(current->page, current->offnum);
if (PageAddItem(current->page,
(Item) leafTuple, leafTuple->size,
@ -362,13 +362,13 @@ checkSplitConditions(Relation index, SpGistState *state,
{
/* We could see a DEAD tuple as first/only chain item */
Assert(i == current->offnum);
Assert(it->nextOffset == InvalidOffsetNumber);
Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
/* Don't count it in result, because it won't go to other page */
}
else
elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
i = it->nextOffset;
i = SGLT_GET_NEXTOFFSET(it);
}
*nToSplit = n;
@ -437,7 +437,7 @@ moveLeafs(Relation index, SpGistState *state,
{
/* We could see a DEAD tuple as first/only chain item */
Assert(i == current->offnum);
Assert(it->nextOffset == InvalidOffsetNumber);
Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
/* We don't want to move it, so don't count it in size */
toDelete[nDelete] = i;
nDelete++;
@ -446,7 +446,7 @@ moveLeafs(Relation index, SpGistState *state,
else
elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
i = it->nextOffset;
i = SGLT_GET_NEXTOFFSET(it);
}
/* Find a leaf page that will hold them */
@ -475,7 +475,7 @@ moveLeafs(Relation index, SpGistState *state,
* don't care). We're modifying the tuple on the source page
* here, but it's okay since we're about to delete it.
*/
it->nextOffset = r;
SGLT_SET_NEXTOFFSET(it, r);
r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
&startOffset, false);
@ -490,7 +490,7 @@ moveLeafs(Relation index, SpGistState *state,
}
/* add the new tuple as well */
newLeafTuple->nextOffset = r;
SGLT_SET_NEXTOFFSET(newLeafTuple, r);
r = SpGistPageAddNewItem(state, npage,
(Item) newLeafTuple, newLeafTuple->size,
&startOffset, false);
@ -690,14 +690,16 @@ doPickSplit(Relation index, SpGistState *state,
*nodes;
Buffer newInnerBuffer,
newLeafBuffer;
ItemPointerData *heapPtrs;
uint8 *leafPageSelect;
int *leafSizes;
OffsetNumber *toDelete;
OffsetNumber *toInsert;
OffsetNumber redirectTuplePos = InvalidOffsetNumber;
OffsetNumber startOffsets[2];
SpGistLeafTuple *oldLeafs;
SpGistLeafTuple *newLeafs;
Datum leafDatums[INDEX_MAX_KEYS];
bool leafIsnulls[INDEX_MAX_KEYS];
int spaceToDelete;
int currentFreeSpace;
int totalLeafSizes;
@ -718,9 +720,9 @@ doPickSplit(Relation index, SpGistState *state,
max = PageGetMaxOffsetNumber(current->page);
n = max + 1;
in.datums = (Datum *) palloc(sizeof(Datum) * n);
heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n);
toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
oldLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
@ -752,7 +754,7 @@ doPickSplit(Relation index, SpGistState *state,
{
in.datums[nToInsert] =
isNulls ? (Datum) 0 : SGLTDATUM(it, state);
heapPtrs[nToInsert] = it->heapPtr;
oldLeafs[nToInsert] = it;
nToInsert++;
toDelete[nToDelete] = i;
nToDelete++;
@ -778,7 +780,7 @@ doPickSplit(Relation index, SpGistState *state,
{
in.datums[nToInsert] =
isNulls ? (Datum) 0 : SGLTDATUM(it, state);
heapPtrs[nToInsert] = it->heapPtr;
oldLeafs[nToInsert] = it;
nToInsert++;
toDelete[nToDelete] = i;
nToDelete++;
@ -790,7 +792,7 @@ doPickSplit(Relation index, SpGistState *state,
{
/* We could see a DEAD tuple as first/only chain item */
Assert(i == current->offnum);
Assert(it->nextOffset == InvalidOffsetNumber);
Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
toDelete[nToDelete] = i;
nToDelete++;
/* replacing it with redirect will save no space */
@ -798,7 +800,7 @@ doPickSplit(Relation index, SpGistState *state,
else
elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
i = it->nextOffset;
i = SGLT_GET_NEXTOFFSET(it);
}
}
in.nTuples = nToInsert;
@ -811,7 +813,7 @@ doPickSplit(Relation index, SpGistState *state,
*/
in.datums[in.nTuples] =
isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state);
heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
oldLeafs[in.nTuples] = newLeafTuple;
in.nTuples++;
memset(&out, 0, sizeof(out));
@ -833,9 +835,19 @@ doPickSplit(Relation index, SpGistState *state,
totalLeafSizes = 0;
for (i = 0; i < in.nTuples; i++)
{
newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
out.leafTupleDatums[i],
false);
if (state->leafTupDesc->natts > 1)
spgDeformLeafTuple(oldLeafs[i],
state->leafTupDesc,
leafDatums,
leafIsnulls,
isNulls);
leafDatums[spgKeyColumn] = out.leafTupleDatums[i];
leafIsnulls[spgKeyColumn] = false;
newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
leafDatums,
leafIsnulls);
totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
}
}
@ -856,9 +868,22 @@ doPickSplit(Relation index, SpGistState *state,
totalLeafSizes = 0;
for (i = 0; i < in.nTuples; i++)
{
newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
(Datum) 0,
true);
if (state->leafTupDesc->natts > 1)
spgDeformLeafTuple(oldLeafs[i],
state->leafTupDesc,
leafDatums,
leafIsnulls,
isNulls);
/*
* Nulls tree can contain only null key values.
*/
leafDatums[spgKeyColumn] = (Datum) 0;
leafIsnulls[spgKeyColumn] = true;
newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
leafDatums,
leafIsnulls);
totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
}
}
@ -1192,10 +1217,10 @@ doPickSplit(Relation index, SpGistState *state,
if (ItemPointerIsValid(&nodes[n]->t_tid))
{
Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
}
else
it->nextOffset = InvalidOffsetNumber;
SGLT_SET_NEXTOFFSET(it, InvalidOffsetNumber);
/* Insert it on page */
newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
@ -1885,10 +1910,12 @@ spgSplitNodeAction(Relation index, SpGistState *state,
*/
bool
spgdoinsert(Relation index, SpGistState *state,
ItemPointer heapPtr, Datum datum, bool isnull)
ItemPointer heapPtr, Datum *datums, bool *isnulls)
{
TupleDesc leafDescriptor = state->leafTupDesc;
bool isnull = isnulls[spgKeyColumn];
int level = 0;
Datum leafDatum;
Datum leafDatums[INDEX_MAX_KEYS];
int leafSize;
SPPageDesc current,
parent;
@ -1905,8 +1932,8 @@ spgdoinsert(Relation index, SpGistState *state,
* Prepare the leaf datum to insert.
*
* If an optional "compress" method is provided, then call it to form the
* leaf datum from the input datum. Otherwise store the input datum as
* is. Since we don't use index_form_tuple in this AM, we have to make
* leaf key datum from the input datum. Otherwise, store the input datum
* as is. Since we don't use index_form_tuple in this AM, we have to make
* sure value to be inserted is not toasted; FormIndexDatum doesn't
* guarantee that. But we assume the "compress" method to return an
* untoasted value.
@ -1918,32 +1945,43 @@ spgdoinsert(Relation index, SpGistState *state,
FmgrInfo *compressProcinfo = NULL;
compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
leafDatum = FunctionCall1Coll(compressProcinfo,
index->rd_indcollation[0],
datum);
leafDatums[spgKeyColumn] =
FunctionCall1Coll(compressProcinfo,
index->rd_indcollation[spgKeyColumn],
datums[spgKeyColumn]);
}
else
{
Assert(state->attLeafType.type == state->attType.type);
if (state->attType.attlen == -1)
leafDatum = PointerGetDatum(PG_DETOAST_DATUM(datum));
leafDatums[spgKeyColumn] =
PointerGetDatum(PG_DETOAST_DATUM(datums[spgKeyColumn]));
else
leafDatum = datum;
leafDatums[spgKeyColumn] = datums[spgKeyColumn];
}
}
else
leafDatum = (Datum) 0;
leafDatums[spgKeyColumn] = (Datum) 0;
/* Likewise, ensure that any INCLUDE values are not toasted */
for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++)
{
if (!isnulls[i])
{
if (TupleDescAttr(leafDescriptor, i)->attlen == -1)
leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i]));
else
leafDatums[i] = datums[i];
}
else
leafDatums[i] = (Datum) 0;
}
/*
* Compute space needed for a leaf tuple containing the given datum. This
* must match spgFormLeafTuple.
* Compute space needed for a leaf tuple containing the given data.
*/
leafSize = SGLTHDRSZ;
if (!isnull)
leafSize += SpGistGetLeafTypeSize(&state->attLeafType, leafDatum);
if (leafSize < SGDTSIZE)
leafSize = SGDTSIZE;
leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls);
/* Account for an item pointer, too */
leafSize += sizeof(ItemIdData);
@ -2048,7 +2086,7 @@ spgdoinsert(Relation index, SpGistState *state,
int nToSplit,
sizeToSplit;
leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls);
if (leafTuple->size + sizeof(ItemIdData) <=
SpGistPageGetFreeSpace(current.page, 1))
{
@ -2110,8 +2148,8 @@ spgdoinsert(Relation index, SpGistState *state,
innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
PageGetItemId(current.page, current.offnum));
in.datum = datum;
in.leafDatum = leafDatum;
in.datum = datums[spgKeyColumn];
in.leafDatum = leafDatums[spgKeyColumn];
in.level = level;
in.allTheSame = innerTuple->allTheSame;
in.hasPrefix = (innerTuple->prefixSize > 0);
@ -2160,9 +2198,9 @@ spgdoinsert(Relation index, SpGistState *state,
/* Replace leafDatum and recompute leafSize */
if (!isnull)
{
leafDatum = out.result.matchNode.restDatum;
leafSize = SGLTHDRSZ +
SpGistGetLeafTypeSize(&state->attLeafType, leafDatum);
leafDatums[spgKeyColumn] = out.result.matchNode.restDatum;
leafSize = SpGistGetLeafTupleSize(leafDescriptor,
leafDatums, isnulls);
leafSize += sizeof(ItemIdData);
}

View File

@ -56,7 +56,7 @@ spgistBuildCallback(Relation index, ItemPointer tid, Datum *values,
* any temp data when retrying.
*/
while (!spgdoinsert(index, &buildstate->spgstate, tid,
*values, *isnull))
values, isnull))
{
MemoryContextReset(buildstate->tmpCtx);
}
@ -227,7 +227,7 @@ spginsert(Relation index, Datum *values, bool *isnull,
* to avoid cumulative memory consumption. That means we also have to
* redo initSpGistState(), but it's cheap enough not to matter.
*/
while (!spgdoinsert(index, &spgstate, ht_ctid, *values, *isnull))
while (!spgdoinsert(index, &spgstate, ht_ctid, values, isnull))
{
MemoryContextReset(insertCtx);
initSpGistState(&spgstate, index);

View File

@ -27,7 +27,8 @@
#include "utils/rel.h"
typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr,
Datum leafValue, bool isNull, bool recheck,
Datum leafValue, bool isNull,
SpGistLeafTuple leafTuple, bool recheck,
bool recheckDistances, double *distances);
/*
@ -88,6 +89,9 @@ spgFreeSearchItem(SpGistScanOpaque so, SpGistSearchItem *item)
DatumGetPointer(item->value) != NULL)
pfree(DatumGetPointer(item->value));
if (item->leafTuple)
pfree(item->leafTuple);
if (item->traversalValue)
pfree(item->traversalValue);
@ -133,6 +137,7 @@ spgAddStartItem(SpGistScanOpaque so, bool isnull)
startEntry->isLeaf = false;
startEntry->level = 0;
startEntry->value = (Datum) 0;
startEntry->leafTuple = NULL;
startEntry->traversalValue = NULL;
startEntry->recheck = false;
startEntry->recheckDistances = false;
@ -299,7 +304,6 @@ spgbeginscan(Relation rel, int keysz, int orderbysz)
{
IndexScanDesc scan;
SpGistScanOpaque so;
TupleDesc outTupDesc;
int i;
scan = RelationGetIndexScan(rel, keysz, orderbysz);
@ -319,20 +323,13 @@ spgbeginscan(Relation rel, int keysz, int orderbysz)
ALLOCSET_DEFAULT_SIZES);
/*
* Set up indexTupDesc and xs_hitupdesc in case it's an index-only scan.
* Set up reconTupDesc and xs_hitupdesc in case it's an index-only scan,
* making sure that the key column is shown as being of type attType.
* (It's rather annoying to do this work when it might be wasted, but for
* most opclasses we can re-use the index reldesc instead of making one.)
*/
if (so->state.attType.type ==
TupleDescAttr(RelationGetDescr(rel), 0)->atttypid)
outTupDesc = RelationGetDescr(rel);
else
{
outTupDesc = CreateTemplateTupleDesc(1);
TupleDescInitEntry(outTupDesc, 1, NULL,
so->state.attType.type, -1, 0);
}
so->indexTupDesc = scan->xs_hitupdesc = outTupDesc;
so->reconTupDesc = scan->xs_hitupdesc =
getSpGistTupleDesc(rel, &so->state.attType);
/* Allocate various arrays needed for order-by scans */
if (scan->numberOfOrderBys > 0)
@ -435,6 +432,10 @@ spgendscan(IndexScanDesc scan)
if (so->keyData)
pfree(so->keyData);
if (so->state.leafTupDesc &&
so->state.leafTupDesc != RelationGetDescr(so->state.index))
FreeTupleDesc(so->state.leafTupDesc);
if (so->state.deadTupleStorage)
pfree(so->state.deadTupleStorage);
@ -455,14 +456,14 @@ spgendscan(IndexScanDesc scan)
* Leaf SpGistSearchItem constructor, called in queue context
*/
static SpGistSearchItem *
spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointer heapPtr,
spgNewHeapItem(SpGistScanOpaque so, int level, SpGistLeafTuple leafTuple,
Datum leafValue, bool recheck, bool recheckDistances,
bool isnull, double *distances)
{
SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances);
item->level = level;
item->heapPtr = *heapPtr;
item->heapPtr = leafTuple->heapPtr;
/*
* If we need the reconstructed value, copy it to queue cxt out of tmp
@ -471,11 +472,28 @@ spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointer heapPtr,
* the wrong type. The correct leafValue type is attType not leafType.
*/
if (so->want_itup)
{
item->value = isnull ? (Datum) 0 :
datumCopy(leafValue, so->state.attType.attbyval,
so->state.attType.attlen);
/*
* If we're going to need to reconstruct INCLUDE attributes, store the
* whole leaf tuple so we can get the INCLUDE attributes out of it.
*/
if (so->state.leafTupDesc->natts > 1)
{
item->leafTuple = palloc(leafTuple->size);
memcpy(item->leafTuple, leafTuple, leafTuple->size);
}
else
item->leafTuple = NULL;
}
else
{
item->value = (Datum) 0;
item->leafTuple = NULL;
}
item->traversalValue = NULL;
item->isLeaf = true;
item->recheck = recheck;
@ -555,7 +573,7 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
/* the scan is ordered -> add the item to the queue */
MemoryContext oldCxt = MemoryContextSwitchTo(so->traversalCxt);
SpGistSearchItem *heapItem = spgNewHeapItem(so, item->level,
&leafTuple->heapPtr,
leafTuple,
leafValue,
recheck,
recheckDistances,
@ -571,7 +589,7 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
/* non-ordered scan, so report the item right away */
Assert(!recheckDistances);
storeRes(so, &leafTuple->heapPtr, leafValue, isnull,
recheck, false, NULL);
leafTuple, recheck, false, NULL);
*reportedSome = true;
}
}
@ -624,6 +642,8 @@ spgMakeInnerItem(SpGistScanOpaque so,
so->state.attLeafType.attlen)
: (Datum) 0;
item->leafTuple = NULL;
/*
* Elements of out.traversalValues should be allocated in
* in.traversalMemoryContext, which is actually a long lived context of
@ -765,7 +785,7 @@ spgTestLeafTuple(SpGistScanOpaque so,
/* dead tuple should be first in chain */
Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr));
/* No live entries on this page */
Assert(leafTuple->nextOffset == InvalidOffsetNumber);
Assert(SGLT_GET_NEXTOFFSET(leafTuple) == InvalidOffsetNumber);
return SpGistBreakOffsetNumber;
}
}
@ -779,7 +799,7 @@ spgTestLeafTuple(SpGistScanOpaque so,
spgLeafTest(so, item, leafTuple, isnull, reportedSome, storeRes);
return leafTuple->nextOffset;
return SGLT_GET_NEXTOFFSET(leafTuple);
}
/*
@ -812,7 +832,8 @@ redirect:
/* We store heap items in the queue only in case of ordered search */
Assert(so->numberOfNonNullOrderBys > 0);
storeRes(so, &item->heapPtr, item->value, item->isNull,
item->recheck, item->recheckDistances, item->distances);
item->leafTuple, item->recheck,
item->recheckDistances, item->distances);
reportedSome = true;
}
else
@ -905,8 +926,9 @@ redirect:
/* storeRes subroutine for getbitmap case */
static void
storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr,
Datum leafValue, bool isnull, bool recheck, bool recheckDistances,
double *distances)
Datum leafValue, bool isnull,
SpGistLeafTuple leafTuple, bool recheck,
bool recheckDistances, double *distances)
{
Assert(!recheckDistances && !distances);
tbm_add_tuples(so->tbm, heapPtr, 1, recheck);
@ -932,8 +954,9 @@ spggetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
/* storeRes subroutine for gettuple case */
static void
storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
Datum leafValue, bool isnull, bool recheck, bool recheckDistances,
double *nonNullDistances)
Datum leafValue, bool isnull,
SpGistLeafTuple leafTuple, bool recheck,
bool recheckDistances, double *nonNullDistances)
{
Assert(so->nPtrs < MaxIndexTuplesPerPage);
so->heapPtrs[so->nPtrs] = *heapPtr;
@ -978,9 +1001,20 @@ storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
* Reconstruct index data. We have to copy the datum out of the temp
* context anyway, so we may as well create the tuple here.
*/
so->reconTups[so->nPtrs] = heap_form_tuple(so->indexTupDesc,
&leafValue,
&isnull);
Datum leafDatums[INDEX_MAX_KEYS];
bool leafIsnulls[INDEX_MAX_KEYS];
/* We only need to deform the old tuple if it has INCLUDE attributes */
if (so->state.leafTupDesc->natts > 1)
spgDeformLeafTuple(leafTuple, so->state.leafTupDesc,
leafDatums, leafIsnulls, isnull);
leafDatums[spgKeyColumn] = leafValue;
leafIsnulls[spgKeyColumn] = isnull;
so->reconTups[so->nPtrs] = heap_form_tuple(so->reconTupDesc,
leafDatums,
leafIsnulls);
}
so->nPtrs++;
}
@ -1048,6 +1082,10 @@ spgcanreturn(Relation index, int attno)
{
SpGistCache *cache;
/* INCLUDE attributes can always be fetched for index-only scans */
if (attno > 1)
return true;
/* We can do it if the opclass config function says so */
cache = spgGetCache(index);

View File

@ -19,6 +19,7 @@
#include "access/htup_details.h"
#include "access/reloptions.h"
#include "access/spgist_private.h"
#include "access/toast_compression.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_amop.h"
@ -58,7 +59,7 @@ spghandler(PG_FUNCTION_ARGS)
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
amroutine->amcanparallel = false;
amroutine->amcaninclude = false;
amroutine->amcaninclude = true;
amroutine->amusemaintenanceworkmem = false;
amroutine->amparallelvacuumoptions =
VACUUM_OPTION_PARALLEL_BULKDEL | VACUUM_OPTION_PARALLEL_COND_CLEANUP;
@ -154,8 +155,19 @@ GetIndexInputType(Relation index, AttrNumber indexcol)
/*
 * Fill in *desc with the properties of the data type "type".
 *
 * Records the type OID itself plus the type's length, pass-by-value flag,
 * storage strategy and alignment, all taken from its pg_type syscache entry.
 * Raises an error if no such entry can be found.
 */
static void
fillTypeDesc(SpGistTypeDesc *desc, Oid type)
{
	HeapTuple	tp;
	Form_pg_type typtup;

	desc->type = type;

	/*
	 * No separate get_typlenbyval() call is made: attlen and attbyval are
	 * read from the same pg_type row as attstorage and attalign, so an
	 * earlier call would only have its results overwritten here.
	 */
	tp = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type));
	if (!HeapTupleIsValid(tp))
		elog(ERROR, "cache lookup failed for type %u", type);
	typtup = (Form_pg_type) GETSTRUCT(tp);
	desc->attlen = typtup->typlen;
	desc->attbyval = typtup->typbyval;
	desc->attstorage = typtup->typstorage;
	desc->attalign = typtup->typalign;

	/* Everything needed has been copied out, so the cache entry can go */
	ReleaseSysCache(tp);
}
/*
@ -178,22 +190,23 @@ spgGetCache(Relation index)
cache = MemoryContextAllocZero(index->rd_indexcxt,
sizeof(SpGistCache));
/* SPGiST doesn't support multi-column indexes */
Assert(index->rd_att->natts == 1);
/* SPGiST must have one key column and can also have INCLUDE columns */
Assert(IndexRelationGetNumberOfKeyAttributes(index) == 1);
Assert(IndexRelationGetNumberOfAttributes(index) <= INDEX_MAX_KEYS);
/*
* Get the actual (well, nominal) data type of the column being
* indexed. We pass this to the opclass config function so that
* polymorphic opclasses are possible.
* Get the actual (well, nominal) data type of the key column. We
* pass this to the opclass config function so that polymorphic
* opclasses are possible.
*/
atttype = GetIndexInputType(index, 1);
atttype = GetIndexInputType(index, spgKeyColumn + 1);
/* Call the config function to get config info for the opclass */
in.attType = atttype;
procinfo = index_getprocinfo(index, 1, SPGIST_CONFIG_PROC);
FunctionCall2Coll(procinfo,
index->rd_indcollation[0],
index->rd_indcollation[spgKeyColumn],
PointerGetDatum(&in),
PointerGetDatum(&cache->config));
@ -206,7 +219,7 @@ spgGetCache(Relation index)
*/
if (!OidIsValid(cache->config.leafType))
cache->config.leafType =
TupleDescAttr(RelationGetDescr(index), 0)->atttypid;
TupleDescAttr(RelationGetDescr(index), spgKeyColumn)->atttypid;
/* Get the information we need about each relevant datatype */
fillTypeDesc(&cache->attType, atttype);
@ -254,12 +267,60 @@ spgGetCache(Relation index)
return cache;
}
/*
 * Build the tuple descriptor to use for leaf tuples, or for the tuples
 * returned by an index-only scan.
 *
 * The relcache's descriptor is always fine as far as INCLUDE columns go,
 * and in many cases it can be used unchanged.  The key column's entry,
 * however, must describe leafType when the result is meant for leaf tuples
 * and attType when it is meant for index-only-scan results.  The relcache
 * entry *should* show leafType, but that might not hold for legacy
 * user-defined opclasses, because before v14 they were not allowed to
 * declare their true storage type in CREATE OPCLASS.  attType, too, can
 * differ from what the relcache shows.
 *
 * The result is the relcache's own descriptor when its key column already
 * has the requested type; otherwise it is a palloc'd copy whose key column
 * has been adjusted to match.  Insisting that the caller pass a whole
 * SpGistTypeDesc, not just a type OID, lets us avoid catalog lookups here.
 */
TupleDesc
getSpGistTupleDesc(Relation index, SpGistTypeDesc *keyType)
{
	TupleDesc	relDesc = RelationGetDescr(index);
	TupleDesc	adjusted;
	Form_pg_attribute keyAtt;
	int			colno;

	/* Fast path: the relcache descriptor already has the right key type */
	if (TupleDescAttr(relDesc, spgKeyColumn)->atttypid == keyType->type)
		return relDesc;

	/* Otherwise make a copy and modify that */
	adjusted = CreateTupleDescCopy(relDesc);
	keyAtt = TupleDescAttr(adjusted, spgKeyColumn);

	/* Only the type-dependent fields of the key column need to change */
	keyAtt->atttypid = keyType->type;
	keyAtt->atttypmod = -1;
	keyAtt->attlen = keyType->attlen;
	keyAtt->attbyval = keyType->attbyval;
	keyAtt->attstorage = keyType->attstorage;
	keyAtt->attalign = keyType->attalign;

	/* We shouldn't need to bother with making these two valid */
	keyAtt->attcollation = InvalidOid;
	keyAtt->attcompression = InvalidCompressionMethod;

	/* typlen may have changed, so reset the cached offsets that follow */
	colno = spgFirstIncludeColumn;
	while (colno < adjusted->natts)
	{
		TupleDescAttr(adjusted, colno)->attcacheoff = -1;
		colno++;
	}

	return adjusted;
}
/* Initialize SpGistState for working with the given index */
void
initSpGistState(SpGistState *state, Relation index)
{
SpGistCache *cache;
state->index = index;
/* Get cached static information about index */
cache = spgGetCache(index);
@ -269,6 +330,9 @@ initSpGistState(SpGistState *state, Relation index)
state->attPrefixType = cache->attPrefixType;
state->attLabelType = cache->attLabelType;
/* Ensure we have a valid descriptor for leaf tuples */
state->leafTupDesc = getSpGistTupleDesc(state->index, &state->attLeafType);
/* Make workspace for constructing dead tuples */
state->deadTupleStorage = palloc0(SGDTSIZE);
@ -696,24 +760,6 @@ SpGistGetInnerTypeSize(SpGistTypeDesc *att, Datum datum)
return MAXALIGN(size);
}
/*
 * Report the space needed to store a non-null datum of the indicated type
 * in a leaf tuple: the type's usual storage size, rounded up to a MAXALIGN
 * boundary.
 */
unsigned int
SpGistGetLeafTypeSize(SpGistTypeDesc *att, Datum datum)
{
	/* A positive attlen is the size; otherwise measure the datum itself */
	unsigned int rawSize = (att->attlen > 0) ? att->attlen : VARSIZE_ANY(datum);

	return MAXALIGN(rawSize);
}
/*
* Copy the given non-null datum to *target, in the inner-tuple case
*/
@ -734,42 +780,111 @@ memcpyInnerDatum(void *target, SpGistTypeDesc *att, Datum datum)
}
/*
* Copy the given non-null datum to *target, in the leaf-tuple case
* Compute space required for a leaf tuple holding the given data.
*
* This must match the size-calculation portion of spgFormLeafTuple.
*/
static void
memcpyLeafDatum(void *target, SpGistTypeDesc *att, Datum datum)
Size
SpGistGetLeafTupleSize(TupleDesc tupleDescriptor,
Datum *datums, bool *isnulls)
{
unsigned int size;
Size size;
Size data_size;
bool needs_null_mask = false;
int natts = tupleDescriptor->natts;
if (att->attbyval)
/*
* Decide whether we need a nulls bitmask.
*
* If there is only a key attribute (natts == 1), never use a bitmask, for
* compatibility with the pre-v14 layout of leaf tuples. Otherwise, we
* need one if any attribute is null.
*/
if (natts > 1)
{
store_att_byval(target, datum, att->attlen);
}
else
{
size = (att->attlen > 0) ? att->attlen : VARSIZE_ANY(datum);
memcpy(target, DatumGetPointer(datum), size);
for (int i = 0; i < natts; i++)
{
if (isnulls[i])
{
needs_null_mask = true;
break;
}
}
}
/*
* Calculate size of the data part; same as for heap tuples.
*/
data_size = heap_compute_data_size(tupleDescriptor, datums, isnulls);
/*
* Compute total size.
*/
size = SGLTHDRSZ(needs_null_mask);
size += data_size;
size = MAXALIGN(size);
/*
* Ensure that we can replace the tuple with a dead tuple later. This test
* is unnecessary when there are any non-null attributes, but be safe.
*/
if (size < SGDTSIZE)
size = SGDTSIZE;
return size;
}
/*
* Construct a leaf tuple containing the given heap TID and datum value
* Construct a leaf tuple containing the given heap TID and datum values
*/
SpGistLeafTuple
spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
Datum datum, bool isnull)
Datum *datums, bool *isnulls)
{
SpGistLeafTuple tup;
unsigned int size;
/* compute space needed (note result is already maxaligned) */
size = SGLTHDRSZ;
if (!isnull)
size += SpGistGetLeafTypeSize(&state->attLeafType, datum);
TupleDesc tupleDescriptor = state->leafTupDesc;
Size size;
Size hoff;
Size data_size;
bool needs_null_mask = false;
int natts = tupleDescriptor->natts;
char *tp; /* ptr to tuple data */
uint16 tupmask = 0; /* unused heap_fill_tuple output */
/*
* Ensure that we can replace the tuple with a dead tuple later. This
* test is unnecessary when !isnull, but let's be safe.
* Decide whether we need a nulls bitmask.
*
* If there is only a key attribute (natts == 1), never use a bitmask, for
* compatibility with the pre-v14 layout of leaf tuples. Otherwise, we
* need one if any attribute is null.
*/
if (natts > 1)
{
for (int i = 0; i < natts; i++)
{
if (isnulls[i])
{
needs_null_mask = true;
break;
}
}
}
/*
* Calculate size of the data part; same as for heap tuples.
*/
data_size = heap_compute_data_size(tupleDescriptor, datums, isnulls);
/*
* Compute total size.
*/
hoff = SGLTHDRSZ(needs_null_mask);
size = hoff + data_size;
size = MAXALIGN(size);
/*
* Ensure that we can replace the tuple with a dead tuple later. This test
* is unnecessary when there are any non-null attributes, but be safe.
*/
if (size < SGDTSIZE)
size = SGDTSIZE;
@ -778,10 +893,29 @@ spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
tup = (SpGistLeafTuple) palloc0(size);
tup->size = size;
tup->nextOffset = InvalidOffsetNumber;
SGLT_SET_NEXTOFFSET(tup, InvalidOffsetNumber);
tup->heapPtr = *heapPtr;
if (!isnull)
memcpyLeafDatum(SGLTDATAPTR(tup), &state->attLeafType, datum);
tp = (char *) tup + hoff;
if (needs_null_mask)
{
bits8 *bp; /* ptr to null bitmap in tuple */
/* Set nullmask presence bit in SpGistLeafTuple header */
SGLT_SET_HASNULLMASK(tup, true);
/* Fill the data area and null mask */
bp = (bits8 *) ((char *) tup + sizeof(SpGistLeafTupleData));
heap_fill_tuple(tupleDescriptor, datums, isnulls, tp, data_size,
&tupmask, bp);
}
else if (natts > 1 || !isnulls[spgKeyColumn])
{
/* Fill data area only */
heap_fill_tuple(tupleDescriptor, datums, isnulls, tp, data_size,
&tupmask, (bits8 *) NULL);
}
/* otherwise we have no data, nor a bitmap, to fill */
return tup;
}
@ -925,7 +1059,7 @@ spgFormDeadTuple(SpGistState *state, int tupstate,
tuple->tupstate = tupstate;
tuple->size = SGDTSIZE;
tuple->nextOffset = InvalidOffsetNumber;
SGLT_SET_NEXTOFFSET(tuple, InvalidOffsetNumber);
if (tupstate == SPGIST_REDIRECT)
{
@ -942,6 +1076,52 @@ spgFormDeadTuple(SpGistState *state, int tupstate,
return tuple;
}
/*
 * Break an SPGiST leaf tuple apart into per-column Datum/isnull values.
 *
 * datums[] and isnulls[] are output arrays supplied by the caller, who must
 * make them large enough (INDEX_MAX_KEYS entries should be enough).
 * keyColumnIsNull is the caller's statement of whether the key column is
 * null; in the general case it is cross-checked against the tuple contents
 * by an assertion.
 */
void
spgDeformLeafTuple(SpGistLeafTuple tup, TupleDesc tupleDescriptor,
				   Datum *datums, bool *isnulls, bool keyColumnIsNull)
{
	bool		maskPresent = SGLT_GET_HASNULLMASK(tup);

	if (tupleDescriptor->natts == 1 && keyColumnIsNull)
	{
		/*
		 * Trivial case: the key is the only attribute and it is null.  The
		 * tuple header should not have its nulls-mask bit set, which is why
		 * index_deform_tuple_internal cannot be used; the answer is simply
		 * NULL.
		 *
		 * The callers visible alongside this function only invoke it when
		 * there are INCLUDE attributes, but the case is covered anyway.
		 */
		Assert(!maskPresent);
		isnulls[spgKeyColumn] = true;
		datums[spgKeyColumn] = (Datum) 0;
	}
	else
	{
		/* Data follows the header, whose size depends on the mask bit */
		char	   *dataStart = (char *) tup + SGLTHDRSZ(maskPresent);

		/* The null bitmap, if any, sits right after the fixed header */
		bits8	   *nullBits = (bits8 *) ((char *) tup + sizeof(SpGistLeafTupleData));

		index_deform_tuple_internal(tupleDescriptor,
									datums, isnulls,
									dataStart, nullBits, maskPresent);

		/*
		 * The key column's null-ness as read from the tuple should agree
		 * with what the caller told us.
		 */
		Assert(keyColumnIsNull == isnulls[spgKeyColumn]);
	}
}
/*
* Extract the label datums of the nodes within innerTuple
*

View File

@ -168,23 +168,23 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
}
/* Form predecessor map, too */
if (lt->nextOffset != InvalidOffsetNumber)
if (SGLT_GET_NEXTOFFSET(lt) != InvalidOffsetNumber)
{
/* paranoia about corrupted chain links */
if (lt->nextOffset < FirstOffsetNumber ||
lt->nextOffset > max ||
predecessor[lt->nextOffset] != InvalidOffsetNumber)
if (SGLT_GET_NEXTOFFSET(lt) < FirstOffsetNumber ||
SGLT_GET_NEXTOFFSET(lt) > max ||
predecessor[SGLT_GET_NEXTOFFSET(lt)] != InvalidOffsetNumber)
elog(ERROR, "inconsistent tuple chain links in page %u of index \"%s\"",
BufferGetBlockNumber(buffer),
RelationGetRelationName(index));
predecessor[lt->nextOffset] = i;
predecessor[SGLT_GET_NEXTOFFSET(lt)] = i;
}
}
else if (lt->tupstate == SPGIST_REDIRECT)
{
SpGistDeadTuple dt = (SpGistDeadTuple) lt;
Assert(dt->nextOffset == InvalidOffsetNumber);
Assert(SGLT_GET_NEXTOFFSET(dt) == InvalidOffsetNumber);
Assert(ItemPointerIsValid(&dt->pointer));
/*
@ -201,7 +201,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
}
else
{
Assert(lt->nextOffset == InvalidOffsetNumber);
Assert(SGLT_GET_NEXTOFFSET(lt) == InvalidOffsetNumber);
}
}
@ -250,7 +250,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
prevLive = deletable[i] ? InvalidOffsetNumber : i;
/* scan down the chain ... */
j = head->nextOffset;
j = SGLT_GET_NEXTOFFSET(head);
while (j != InvalidOffsetNumber)
{
SpGistLeafTuple lt;
@ -301,7 +301,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
interveningDeletable = false;
}
j = lt->nextOffset;
j = SGLT_GET_NEXTOFFSET(lt);
}
if (prevLive == InvalidOffsetNumber)
@ -366,7 +366,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
lt = (SpGistLeafTuple) PageGetItem(page,
PageGetItemId(page, chainSrc[i]));
Assert(lt->tupstate == SPGIST_LIVE);
lt->nextOffset = chainDest[i];
SGLT_SET_NEXTOFFSET(lt, chainDest[i]);
}
MarkBufferDirty(buffer);

View File

@ -122,8 +122,8 @@ spgRedoAddLeaf(XLogReaderState *record)
head = (SpGistLeafTuple) PageGetItem(page,
PageGetItemId(page, xldata->offnumHeadLeaf));
Assert(head->nextOffset == leafTupleHdr.nextOffset);
head->nextOffset = xldata->offnumLeaf;
Assert(SGLT_GET_NEXTOFFSET(head) == SGLT_GET_NEXTOFFSET(&leafTupleHdr));
SGLT_SET_NEXTOFFSET(head, xldata->offnumLeaf);
}
}
else
@ -822,7 +822,7 @@ spgRedoVacuumLeaf(XLogReaderState *record)
lt = (SpGistLeafTuple) PageGetItem(page,
PageGetItemId(page, chainSrc[i]));
Assert(lt->tupstate == SPGIST_LIVE);
lt->nextOffset = chainDest[i];
SGLT_SET_NEXTOFFSET(lt, chainDest[i]);
}
PageSetLSN(page, lsn);