1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-17 06:41:09 +03:00
Files
postgres/src/backend/access/gist/gistsplit.c
Tom Lane 932ab7462b Repair bugs in GiST page splitting code for multi-column indexes.
When considering a non-last column in a multi-column GiST index,
gistsplit.c tries to improve on the split chosen by the opclass-specific
pickSplit function by considering penalties for the next column.  However,
there were two bugs in this code: it failed to recompute the union keys for
the leftmost index columns, even though these might well change after
reassigning tuples; and it included the old union keys in the recomputation
for the columns it did recompute, so that those keys couldn't get smaller
even if they should.  The first problem could result in an invalid index
in which searches wouldn't find index entries that are in fact present;
the second would make the index less efficient to search.

Both of these errors were caused by misuse of gistMakeUnionItVec, whose
API was designed in a way that just begged such errors to be made.  There
is no situation in which it's safe or useful to compute the union keys for
a subset of the index columns, and there is no caller that wants any
previous union keys to be included in the computation; so the undocumented
choice to treat the union keys as in/out rather than pure output parameters
is a waste of code as well as being dangerous.

Hence, rather than just making a minimal patch, I've changed the API of
gistMakeUnionItVec to remove the "startkey" parameter (it now always
processes all index columns) and treat the attr/isnull arrays as purely
output parameters.

In passing, also get rid of a couple of unnecessary and dangerous uses
of static variables in gistutil.c.  It's remarkable that the one in
gistMakeUnionKey hasn't given us portability troubles before now, because
in addition to posing a re-entrancy hazard, it was unsafely assuming that
a static char[] array would have at least Datum alignment.

Per investigation of a trouble report from Tomas Vondra.  (There are also
some bugs in contrib/btree_gist to be fixed, but that seems like material
for a separate patch.)  Back-patch to all supported branches.
2013-02-07 17:44:21 -05:00

685 lines
18 KiB
C

/*-------------------------------------------------------------------------
*
* gistsplit.c
* Split page algorithm
*
*
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistsplit.c,v 1.12 2010/01/02 16:57:34 momjian Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/gist_private.h"
#include "utils/rel.h"
typedef struct
{
OffsetNumber *entries;
int len;
Datum *attr;
bool *isnull;
bool *equiv;
} GistSplitUnion;
/*
* Form unions of subkeys after a page split, ignoring any tuples
* that are marked in gsvp->equiv[]
*/
static void
gistunionsubkeyvec(GISTSTATE *giststate, IndexTuple *itvec,
GistSplitUnion *gsvp)
{
IndexTuple *cleanedItVec;
int i,
cleanedLen = 0;
cleanedItVec = (IndexTuple *) palloc(sizeof(IndexTuple) * gsvp->len);
for (i = 0; i < gsvp->len; i++)
{
if (gsvp->equiv && gsvp->equiv[gsvp->entries[i]])
continue;
cleanedItVec[cleanedLen++] = itvec[gsvp->entries[i] - 1];
}
if (!gistMakeUnionItVec(giststate, cleanedItVec, cleanedLen,
gsvp->attr, gsvp->isnull))
elog(ERROR, "invalid GiST tuple not expected");
pfree(cleanedItVec);
}
/*
* Recompute unions of subkeys after a page split, ignoring any tuples
* that are marked in spl->spl_equiv[]
*/
static void
gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GistSplitVector *spl)
{
GistSplitUnion gsvp;
gsvp.equiv = spl->spl_equiv;
gsvp.entries = spl->splitVector.spl_left;
gsvp.len = spl->splitVector.spl_nleft;
gsvp.attr = spl->spl_lattr;
gsvp.isnull = spl->spl_lisnull;
gistunionsubkeyvec(giststate, itvec, &gsvp);
gsvp.entries = spl->splitVector.spl_right;
gsvp.len = spl->splitVector.spl_nright;
gsvp.attr = spl->spl_rattr;
gsvp.isnull = spl->spl_risnull;
gistunionsubkeyvec(giststate, itvec, &gsvp);
}
/*
* find group in vector with equivalent value
*/
static int
gistfindgroup(Relation r, GISTSTATE *giststate, GISTENTRY *valvec, GistSplitVector *spl, int attno)
{
int i;
GISTENTRY entry;
int len = 0;
/*
* attno key is always not null (see gistSplitByKey), so we may not check
* for nulls
*/
gistentryinit(entry, spl->splitVector.spl_rdatum, r, NULL, (OffsetNumber) 0, FALSE);
for (i = 0; i < spl->splitVector.spl_nleft; i++)
{
float penalty = gistpenalty(giststate, attno, &entry, false,
&valvec[spl->splitVector.spl_left[i]], false);
if (penalty == 0.0)
{
spl->spl_equiv[spl->splitVector.spl_left[i]] = true;
len++;
}
}
gistentryinit(entry, spl->splitVector.spl_ldatum, r, NULL, (OffsetNumber) 0, FALSE);
for (i = 0; i < spl->splitVector.spl_nright; i++)
{
float penalty = gistpenalty(giststate, attno, &entry, false,
&valvec[spl->splitVector.spl_right[i]], false);
if (penalty == 0.0)
{
spl->spl_equiv[spl->splitVector.spl_right[i]] = true;
len++;
}
}
return len;
}
static void
cleanupOffsets(OffsetNumber *a, int *len, bool *equiv, int *LenEquiv)
{
int curlen,
i;
OffsetNumber *curwpos;
curlen = *len;
curwpos = a;
for (i = 0; i < *len; i++)
{
if (equiv[a[i]] == FALSE)
{
*curwpos = a[i];
curwpos++;
}
else
{
/* corner case: we shouldn't make void array */
if (curlen == 1)
{
equiv[a[i]] = FALSE; /* mark item as non-equivalent */
i--; /* redo the same */
*LenEquiv -= 1;
continue;
}
else
curlen--;
}
}
*len = curlen;
}
static void
placeOne(Relation r, GISTSTATE *giststate, GistSplitVector *v, IndexTuple itup, OffsetNumber off, int attno)
{
GISTENTRY identry[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
bool toLeft = true;
gistDeCompressAtt(giststate, r, itup, NULL, (OffsetNumber) 0, identry, isnull);
for (; attno < giststate->tupdesc->natts; attno++)
{
float lpenalty,
rpenalty;
GISTENTRY entry;
gistentryinit(entry, v->spl_lattr[attno], r, NULL, 0, FALSE);
lpenalty = gistpenalty(giststate, attno, &entry, v->spl_lisnull[attno], identry + attno, isnull[attno]);
gistentryinit(entry, v->spl_rattr[attno], r, NULL, 0, FALSE);
rpenalty = gistpenalty(giststate, attno, &entry, v->spl_risnull[attno], identry + attno, isnull[attno]);
if (lpenalty != rpenalty)
{
if (lpenalty > rpenalty)
toLeft = false;
break;
}
}
if (toLeft)
v->splitVector.spl_left[v->splitVector.spl_nleft++] = off;
else
v->splitVector.spl_right[v->splitVector.spl_nright++] = off;
}
#define SWAPVAR( s, d, t ) \
do { \
(t) = (s); \
(s) = (d); \
(d) = (t); \
} while(0)
/*
* adjust left and right unions according to splits by previous
* split by firsts columns. This function is called only in case
* when pickSplit doesn't support subspplit.
*/
static void
supportSecondarySplit(Relation r, GISTSTATE *giststate, int attno, GIST_SPLITVEC *sv, Datum oldL, Datum oldR)
{
bool leaveOnLeft = true,
tmpBool;
GISTENTRY entryL,
entryR,
entrySL,
entrySR;
gistentryinit(entryL, oldL, r, NULL, 0, FALSE);
gistentryinit(entryR, oldR, r, NULL, 0, FALSE);
gistentryinit(entrySL, sv->spl_ldatum, r, NULL, 0, FALSE);
gistentryinit(entrySR, sv->spl_rdatum, r, NULL, 0, FALSE);
if (sv->spl_ldatum_exists && sv->spl_rdatum_exists)
{
float penalty1,
penalty2;
penalty1 = gistpenalty(giststate, attno, &entryL, false, &entrySL, false) +
gistpenalty(giststate, attno, &entryR, false, &entrySR, false);
penalty2 = gistpenalty(giststate, attno, &entryL, false, &entrySR, false) +
gistpenalty(giststate, attno, &entryR, false, &entrySL, false);
if (penalty1 > penalty2)
leaveOnLeft = false;
}
else
{
GISTENTRY *entry1 = (sv->spl_ldatum_exists) ? &entryL : &entryR;
float penalty1,
penalty2;
/*
* there is only one previously defined union, so we just choose swap
* or not by lowest penalty
*/
penalty1 = gistpenalty(giststate, attno, entry1, false, &entrySL, false);
penalty2 = gistpenalty(giststate, attno, entry1, false, &entrySR, false);
if (penalty1 < penalty2)
leaveOnLeft = (sv->spl_ldatum_exists) ? true : false;
else
leaveOnLeft = (sv->spl_rdatum_exists) ? true : false;
}
if (leaveOnLeft == false)
{
/*
* swap left and right
*/
OffsetNumber *off,
noff;
Datum datum;
SWAPVAR(sv->spl_left, sv->spl_right, off);
SWAPVAR(sv->spl_nleft, sv->spl_nright, noff);
SWAPVAR(sv->spl_ldatum, sv->spl_rdatum, datum);
gistentryinit(entrySL, sv->spl_ldatum, r, NULL, 0, FALSE);
gistentryinit(entrySR, sv->spl_rdatum, r, NULL, 0, FALSE);
}
if (sv->spl_ldatum_exists)
gistMakeUnionKey(giststate, attno, &entryL, false, &entrySL, false,
&sv->spl_ldatum, &tmpBool);
if (sv->spl_rdatum_exists)
gistMakeUnionKey(giststate, attno, &entryR, false, &entrySR, false,
&sv->spl_rdatum, &tmpBool);
sv->spl_ldatum_exists = sv->spl_rdatum_exists = false;
}
/*
* Trivial picksplit implementaion. Function called only
* if user-defined picksplit puts all keys to the one page.
* That is a bug of user-defined picksplit but we'd like
* to "fix" that.
*/
static void
genericPickSplit(GISTSTATE *giststate, GistEntryVector *entryvec, GIST_SPLITVEC *v, int attno)
{
OffsetNumber i,
maxoff;
int nbytes;
GistEntryVector *evec;
maxoff = entryvec->n - 1;
nbytes = (maxoff + 2) * sizeof(OffsetNumber);
v->spl_left = (OffsetNumber *) palloc(nbytes);
v->spl_right = (OffsetNumber *) palloc(nbytes);
v->spl_nleft = v->spl_nright = 0;
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
if (i <= (maxoff - FirstOffsetNumber + 1) / 2)
{
v->spl_left[v->spl_nleft] = i;
v->spl_nleft++;
}
else
{
v->spl_right[v->spl_nright] = i;
v->spl_nright++;
}
}
/*
* Form unions of each page
*/
evec = palloc(sizeof(GISTENTRY) * entryvec->n + GEVHDRSZ);
evec->n = v->spl_nleft;
memcpy(evec->vector, entryvec->vector + FirstOffsetNumber,
sizeof(GISTENTRY) * evec->n);
v->spl_ldatum = FunctionCall2(&giststate->unionFn[attno],
PointerGetDatum(evec),
PointerGetDatum(&nbytes));
evec->n = v->spl_nright;
memcpy(evec->vector, entryvec->vector + FirstOffsetNumber + v->spl_nleft,
sizeof(GISTENTRY) * evec->n);
v->spl_rdatum = FunctionCall2(&giststate->unionFn[attno],
PointerGetDatum(evec),
PointerGetDatum(&nbytes));
}
/*
* Calls user picksplit method for attno columns to split vector to
* two vectors. May use attno+n columns data to
* get better split.
* Returns TRUE and v->spl_equiv = NULL if left and right unions of attno columns are the same,
* so caller may find better split
* Returns TRUE and v->spl_equiv != NULL if there is tuples which may be freely moved
*/
static bool
gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GistSplitVector *v,
IndexTuple *itup, int len, GISTSTATE *giststate)
{
GIST_SPLITVEC *sv = &v->splitVector;
/*
* now let the user-defined picksplit function set up the split vector; in
* entryvec there is no null value!!
*/
sv->spl_ldatum_exists = (v->spl_lisnull[attno]) ? false : true;
sv->spl_rdatum_exists = (v->spl_risnull[attno]) ? false : true;
sv->spl_ldatum = v->spl_lattr[attno];
sv->spl_rdatum = v->spl_rattr[attno];
FunctionCall2(&giststate->picksplitFn[attno],
PointerGetDatum(entryvec),
PointerGetDatum(sv));
if (sv->spl_nleft == 0 || sv->spl_nright == 0)
{
ereport(DEBUG1,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("picksplit method for column %d of index \"%s\" failed",
attno + 1, RelationGetRelationName(r)),
errhint("The index is not optimal. To optimize it, contact a developer, or try to use the column as the second one in the CREATE INDEX command.")));
/*
* Reinit GIST_SPLITVEC. Although that fields are not used by
* genericPickSplit(), let us set up it for further processing
*/
sv->spl_ldatum_exists = (v->spl_lisnull[attno]) ? false : true;
sv->spl_rdatum_exists = (v->spl_risnull[attno]) ? false : true;
sv->spl_ldatum = v->spl_lattr[attno];
sv->spl_rdatum = v->spl_rattr[attno];
genericPickSplit(giststate, entryvec, sv, attno);
if (sv->spl_ldatum_exists || sv->spl_rdatum_exists)
supportSecondarySplit(r, giststate, attno, sv, v->spl_lattr[attno], v->spl_rattr[attno]);
}
else
{
/* compatibility with old code */
if (sv->spl_left[sv->spl_nleft - 1] == InvalidOffsetNumber)
sv->spl_left[sv->spl_nleft - 1] = (OffsetNumber) (entryvec->n - 1);
if (sv->spl_right[sv->spl_nright - 1] == InvalidOffsetNumber)
sv->spl_right[sv->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1);
if (sv->spl_ldatum_exists || sv->spl_rdatum_exists)
{
elog(LOG, "picksplit method for column %d of index \"%s\" doesn't support secondary split",
attno + 1, RelationGetRelationName(r));
supportSecondarySplit(r, giststate, attno, sv, v->spl_lattr[attno], v->spl_rattr[attno]);
}
}
v->spl_lattr[attno] = sv->spl_ldatum;
v->spl_rattr[attno] = sv->spl_rdatum;
v->spl_lisnull[attno] = false;
v->spl_risnull[attno] = false;
/*
* if index is multikey, then we must to try get smaller bounding box for
* subkey(s)
*/
v->spl_equiv = NULL;
if (giststate->tupdesc->natts > 1 && attno + 1 != giststate->tupdesc->natts)
{
if (gistKeyIsEQ(giststate, attno, sv->spl_ldatum, sv->spl_rdatum))
{
/*
* Left and right key's unions are equial, so we can get better
* split by following columns. Note, unions for attno columns are
* already done.
*/
return true;
}
else
{
int LenEquiv;
v->spl_equiv = (bool *) palloc0(sizeof(bool) * (entryvec->n + 1));
LenEquiv = gistfindgroup(r, giststate, entryvec->vector, v, attno);
/*
* if possible, we should distribute equivalent tuples
*/
if (LenEquiv == 0)
{
gistunionsubkey(giststate, itup, v);
}
else
{
cleanupOffsets(sv->spl_left, &sv->spl_nleft, v->spl_equiv, &LenEquiv);
cleanupOffsets(sv->spl_right, &sv->spl_nright, v->spl_equiv, &LenEquiv);
gistunionsubkey(giststate, itup, v);
if (LenEquiv == 1)
{
/*
* In case with one tuple we just choose left-right by
* penalty. It's simplify user-defined pickSplit
*/
OffsetNumber toMove = InvalidOffsetNumber;
for (toMove = FirstOffsetNumber; toMove < entryvec->n; toMove++)
if (v->spl_equiv[toMove])
break;
Assert(toMove < entryvec->n);
placeOne(r, giststate, v, itup[toMove - 1], toMove, attno + 1);
/*
* redo gistunionsubkey(): it will not degradate
* performance, because it's very rarely
*/
v->spl_equiv = NULL;
gistunionsubkey(giststate, itup, v);
return false;
}
else if (LenEquiv > 1)
return true;
}
}
}
return false;
}
/*
* simple split page
*/
static void
gistSplitHalf(GIST_SPLITVEC *v, int len)
{
int i;
v->spl_nright = v->spl_nleft = 0;
v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
v->spl_right = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
for (i = 1; i <= len; i++)
if (i < len / 2)
v->spl_right[v->spl_nright++] = i;
else
v->spl_left[v->spl_nleft++] = i;
}
/*
* if it was invalid tuple then we need special processing.
* We move all invalid tuples on right page.
*
* if there is no place on left page, gistSplit will be called one more
* time for left page.
*
* Normally, we never exec this code, but after crash replay it's possible
* to get 'invalid' tuples (probability is low enough)
*/
static void
gistSplitByInvalid(GISTSTATE *giststate, GistSplitVector *v, IndexTuple *itup, int len)
{
int i;
static OffsetNumber offInvTuples[MaxOffsetNumber];
int nOffInvTuples = 0;
for (i = 1; i <= len; i++)
if (GistTupleIsInvalid(itup[i - 1]))
offInvTuples[nOffInvTuples++] = i;
if (nOffInvTuples == len)
{
/* corner case, all tuples are invalid */
v->spl_rightvalid = v->spl_leftvalid = false;
gistSplitHalf(&v->splitVector, len);
}
else
{
GistSplitUnion gsvp;
v->splitVector.spl_right = offInvTuples;
v->splitVector.spl_nright = nOffInvTuples;
v->spl_rightvalid = false;
v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
v->splitVector.spl_nleft = 0;
for (i = 1; i <= len; i++)
if (!GistTupleIsInvalid(itup[i - 1]))
v->splitVector.spl_left[v->splitVector.spl_nleft++] = i;
v->spl_leftvalid = true;
gsvp.equiv = NULL;
gsvp.attr = v->spl_lattr;
gsvp.len = v->splitVector.spl_nleft;
gsvp.entries = v->splitVector.spl_left;
gsvp.isnull = v->spl_lisnull;
gistunionsubkeyvec(giststate, itup, &gsvp);
}
}
/*
* trys to split page by attno key, in a case of null
* values move its to separate page.
*/
void
gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *giststate,
GistSplitVector *v, GistEntryVector *entryvec, int attno)
{
int i;
static OffsetNumber offNullTuples[MaxOffsetNumber];
int nOffNullTuples = 0;
for (i = 1; i <= len; i++)
{
Datum datum;
bool IsNull;
if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
{
gistSplitByInvalid(giststate, v, itup, len);
return;
}
datum = index_getattr(itup[i - 1], attno + 1, giststate->tupdesc, &IsNull);
gistdentryinit(giststate, attno, &(entryvec->vector[i]),
datum, r, page, i,
FALSE, IsNull);
if (IsNull)
offNullTuples[nOffNullTuples++] = i;
}
v->spl_leftvalid = v->spl_rightvalid = true;
if (nOffNullTuples == len)
{
/*
* Corner case: All keys in attno column are null, we should try to
* split by keys in next column. It all keys in all columns are NULL
* just split page half by half
*/
v->spl_risnull[attno] = v->spl_lisnull[attno] = TRUE;
if (attno + 1 == r->rd_att->natts)
gistSplitHalf(&v->splitVector, len);
else
gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno + 1);
}
else if (nOffNullTuples > 0)
{
int j = 0;
/*
* We don't want to mix NULLs and not-NULLs keys on one page, so move
* nulls to right page
*/
v->splitVector.spl_right = offNullTuples;
v->splitVector.spl_nright = nOffNullTuples;
v->spl_risnull[attno] = TRUE;
v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
v->splitVector.spl_nleft = 0;
for (i = 1; i <= len; i++)
if (j < v->splitVector.spl_nright && offNullTuples[j] == i)
j++;
else
v->splitVector.spl_left[v->splitVector.spl_nleft++] = i;
v->spl_equiv = NULL;
gistunionsubkey(giststate, itup, v);
}
else
{
/*
* all keys are not-null
*/
entryvec->n = len + 1;
if (gistUserPicksplit(r, entryvec, attno, v, itup, len, giststate) && attno + 1 != r->rd_att->natts)
{
/*
* Splitting on attno column is not optimized: there is a tuples
* which can be freely left or right page, we will try to split
* page by following columns
*/
if (v->spl_equiv == NULL)
{
/*
* simple case: left and right keys for attno column are
* equial
*/
gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno + 1);
}
else
{
/* we should clean up vector from already distributed tuples */
IndexTuple *newitup = (IndexTuple *) palloc((len + 1) * sizeof(IndexTuple));
OffsetNumber *map = (OffsetNumber *) palloc((len + 1) * sizeof(IndexTuple));
int newlen = 0;
GIST_SPLITVEC backupSplit = v->splitVector;
for (i = 0; i < len; i++)
if (v->spl_equiv[i + 1])
{
map[newlen] = i + 1;
newitup[newlen++] = itup[i];
}
Assert(newlen > 0);
backupSplit.spl_left = (OffsetNumber *) palloc(sizeof(OffsetNumber) * len);
memcpy(backupSplit.spl_left, v->splitVector.spl_left, sizeof(OffsetNumber) * v->splitVector.spl_nleft);
backupSplit.spl_right = (OffsetNumber *) palloc(sizeof(OffsetNumber) * len);
memcpy(backupSplit.spl_right, v->splitVector.spl_right, sizeof(OffsetNumber) * v->splitVector.spl_nright);
gistSplitByKey(r, page, newitup, newlen, giststate, v, entryvec, attno + 1);
/* merge result of subsplit */
for (i = 0; i < v->splitVector.spl_nleft; i++)
backupSplit.spl_left[backupSplit.spl_nleft++] = map[v->splitVector.spl_left[i] - 1];
for (i = 0; i < v->splitVector.spl_nright; i++)
backupSplit.spl_right[backupSplit.spl_nright++] = map[v->splitVector.spl_right[i] - 1];
v->splitVector = backupSplit;
/* reunion left and right datums */
gistunionsubkey(giststate, itup, v);
}
}
}
}