1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-13 07:41:39 +03:00
* new split algorithm (as proposed in http://archives.postgresql.org/pgsql-hackers/2006-06/msg00254.php)
  * possible call pickSplit() for second and below columns
  * add spl_(l|r)datum_exists to GIST_SPLITVEC -
    pickSplit should check its values to use already defined
    spl_(l|r)datum for splitting. pickSplit should set
    spl_(l|r)datum_exists to 'false' (if they was 'true') to
    signal to caller about using spl_(l|r)datum.
  * support for old pickSplit(): not very optimal
    but correct split
* remove 'bytes' field from GISTENTRY: in any case size of
  value is defined by it's type.
* split GIST_SPLITVEC to two structures: one for using in picksplit
  and second - for internal use.
* some code refactoring
* support of subsplit to rtree opclasses

TODO: add support of subsplit to contrib modules
This commit is contained in:
Teodor Sigaev
2006-06-28 12:00:14 +00:00
parent a1dc5c60bc
commit 1f7ef548ec
23 changed files with 266 additions and 645 deletions

View File

@ -4,7 +4,7 @@
# Makefile for access/gist
#
# IDENTIFICATION
# $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.15 2005/07/01 19:19:02 tgl Exp $
# $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.16 2006/06/28 12:00:13 teodor Exp $
#
#-------------------------------------------------------------------------
@ -13,7 +13,7 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = gist.o gistutil.o gistxlog.o gistvacuum.o gistget.o gistscan.o \
gistproc.o
gistproc.o gistsplit.o
all: SUBSYS.o

View File

@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.138 2006/05/29 12:50:06 teodor Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.139 2006/06/28 12:00:14 teodor Exp $
*
*-------------------------------------------------------------------------
*/
@ -187,7 +187,7 @@ gistbuildCallback(Relation index,
/* form an index tuple and point it at the heap tuple */
itup = gistFormTuple(&buildstate->giststate, index,
values, NULL /* size is currently bogus */, isnull);
values, isnull, true /* size is currently bogus */);
itup->t_tid = htup->t_self;
/*
@ -233,7 +233,7 @@ gistinsert(PG_FUNCTION_ARGS)
initGISTstate(&giststate, r);
itup = gistFormTuple(&giststate, r,
values, NULL /* size is currently bogus */, isnull);
values, isnull, true /* size is currently bogus */);
itup->t_tid = *ht_ctid;
gistdoinsert(r, itup, &giststate);
@ -899,150 +899,6 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate)
gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1);
}
/*
* simple split page
*/
static void
gistSplitHalf(GIST_SPLITVEC *v, int len) {
int i;
v->spl_nright = v->spl_nleft = 0;
v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
v->spl_right= (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
for(i = 1; i <= len; i++)
if ( i<len/2 )
v->spl_right[ v->spl_nright++ ] = i;
else
v->spl_left[ v->spl_nleft++ ] = i;
}
/*
* if it was invalid tuple then we need special processing.
* We move all invalid tuples on right page.
*
* if there is no place on left page, gistSplit will be called one more
* time for left page.
*
* Normally, we never exec this code, but after crash replay it's possible
* to get 'invalid' tuples (probability is low enough)
*/
static void
gistSplitByInvalid(GISTSTATE *giststate, GIST_SPLITVEC *v, IndexTuple *itup, int len) {
int i;
static OffsetNumber offInvTuples[ MaxOffsetNumber ];
int nOffInvTuples = 0;
for (i = 1; i <= len; i++)
if ( GistTupleIsInvalid(itup[i - 1]) )
offInvTuples[ nOffInvTuples++ ] = i;
if ( nOffInvTuples == len ) {
/* corner case, all tuples are invalid */
v->spl_rightvalid= v->spl_leftvalid = false;
gistSplitHalf( v, len );
} else {
GistSplitVec gsvp;
v->spl_right = offInvTuples;
v->spl_nright = nOffInvTuples;
v->spl_rightvalid = false;
v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
v->spl_nleft = 0;
for(i = 1; i <= len; i++)
if ( !GistTupleIsInvalid(itup[i - 1]) )
v->spl_left[ v->spl_nleft++ ] = i;
v->spl_leftvalid = true;
gsvp.idgrp = NULL;
gsvp.attrsize = v->spl_lattrsize;
gsvp.attr = v->spl_lattr;
gsvp.len = v->spl_nleft;
gsvp.entries = v->spl_left;
gsvp.isnull = v->spl_lisnull;
gistunionsubkeyvec(giststate, itup, &gsvp, 0);
}
}
/*
* trys to split page by attno key, in a case of null
* values move its to separate page.
*/
static void
gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *giststate,
GIST_SPLITVEC *v, GistEntryVector *entryvec, int attno) {
int i;
static OffsetNumber offNullTuples[ MaxOffsetNumber ];
int nOffNullTuples = 0;
for (i = 1; i <= len; i++) {
Datum datum;
bool IsNull;
if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1])) {
gistSplitByInvalid(giststate, v, itup, len);
return;
}
datum = index_getattr(itup[i - 1], attno+1, giststate->tupdesc, &IsNull);
gistdentryinit(giststate, attno, &(entryvec->vector[i]),
datum, r, page, i,
ATTSIZE(datum, giststate->tupdesc, attno+1, IsNull),
FALSE, IsNull);
if ( IsNull )
offNullTuples[ nOffNullTuples++ ] = i;
}
v->spl_leftvalid = v->spl_rightvalid = true;
if ( nOffNullTuples == len ) {
/*
* Corner case: All keys in attno column are null, we should try to
* by keys in next column. It all keys in all columns
* are NULL just split page half by half
*/
v->spl_risnull[attno] = v->spl_lisnull[attno] = TRUE;
if ( attno+1 == r->rd_att->natts )
gistSplitHalf( v, len );
else
gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno+1);
} else if ( nOffNullTuples > 0 ) {
int j=0;
/*
* We don't want to mix NULLs and not-NULLs keys
* on one page, so move nulls to right page
*/
v->spl_right = offNullTuples;
v->spl_nright = nOffNullTuples;
v->spl_risnull[attno] = TRUE;
v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
v->spl_nleft = 0;
for(i = 1; i <= len; i++)
if ( j<v->spl_nright && offNullTuples[j] == i )
j++;
else
v->spl_left[ v->spl_nleft++ ] = i;
v->spl_idgrp = NULL;
gistunionsubkey(giststate, itup, v, 0);
} else {
/*
* all keys are not-null
*/
if ( gistUserPicksplit(r, entryvec, attno, v, itup, len, giststate) && attno+1 != r->rd_att->natts )
/*
* Splitting on attno column is not optimized: unions of left and right
* page are the same, we will try to split page by
* following columns
*/
gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno+1);
}
}
/*
* gistSplit -- split a page in the tree and fill struct
* used for XLOG and real writes buffers. Function is recursive, ie
@ -1057,7 +913,7 @@ gistSplit(Relation r,
{
IndexTuple *lvectup,
*rvectup;
GIST_SPLITVEC v;
GistSplitVector v;
GistEntryVector *entryvec;
int i;
SplitedPageLayout *res = NULL;
@ -1066,6 +922,8 @@ gistSplit(Relation r,
entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
entryvec->n = len + 1;
memset( v.spl_lisnull, TRUE, sizeof(bool) * giststate->tupdesc->natts );
memset( v.spl_risnull, TRUE, sizeof(bool) * giststate->tupdesc->natts );
gistSplitByKey(r, page, itup, len, giststate,
&v, entryvec, 0);
@ -1073,31 +931,31 @@ gistSplit(Relation r,
lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
for (i = 0; i < v.spl_nleft; i++)
lvectup[i] = itup[v.spl_left[i] - 1];
for (i = 0; i < v.splitVector.spl_nleft; i++)
lvectup[i] = itup[v.splitVector.spl_left[i] - 1];
for (i = 0; i < v.spl_nright; i++)
rvectup[i] = itup[v.spl_right[i] - 1];
for (i = 0; i < v.splitVector.spl_nright; i++)
rvectup[i] = itup[v.splitVector.spl_right[i] - 1];
/* finalyze splitting (may need another split) */
if (!gistfitpage(rvectup, v.spl_nright))
if (!gistfitpage(rvectup, v.splitVector.spl_nright))
{
res = gistSplit(r, page, rvectup, v.spl_nright, giststate);
res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
}
else
{
ROTATEDIST(res);
res->block.num = v.spl_nright;
res->list = gistfillitupvec(rvectup, v.spl_nright, &( res->lenlist ) );
res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull)
res->block.num = v.splitVector.spl_nright;
res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &( res->lenlist ) );
res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false)
: gist_form_invalid_tuple(GIST_ROOT_BLKNO);
}
if (!gistfitpage(lvectup, v.spl_nleft))
if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
{
SplitedPageLayout *resptr, *subres;
resptr = subres = gistSplit(r, page, lvectup, v.spl_nleft, giststate);
resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);
/* install on list's tail */
while( resptr->next )
@ -1109,9 +967,9 @@ gistSplit(Relation r,
else
{
ROTATEDIST(res);
res->block.num = v.spl_nleft;
res->list = gistfillitupvec(lvectup, v.spl_nleft, &( res->lenlist ) );
res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull)
res->block.num = v.splitVector.spl_nleft;
res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &( res->lenlist ) );
res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false)
: gist_form_invalid_tuple(GIST_ROOT_BLKNO);
}

View File

@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.57 2006/05/24 11:01:39 teodor Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.58 2006/06/28 12:00:14 teodor Exp $
*
*-------------------------------------------------------------------------
*/
@ -391,7 +391,6 @@ gistindex_keytest(IndexTuple tuple,
gistdentryinit(giststate, key->sk_attno - 1, &de,
datum, r, p, offset,
IndexTupleSize(tuple) - sizeof(IndexTupleData),
FALSE, isNull);
/*

View File

@ -10,7 +10,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistproc.c,v 1.5 2006/03/05 15:58:20 momjian Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistproc.c,v 1.6 2006/06/28 12:00:14 teodor Exp $
*
*-------------------------------------------------------------------------
*/
@ -112,6 +112,17 @@ gist_box_consistent(PG_FUNCTION_ARGS)
strategy));
}
static void
adjustBox( BOX *b, BOX *addon ) {
if (b->high.x < addon->high.x)
b->high.x = addon->high.x;
if (b->low.x > addon->low.x)
b->low.x = addon->low.x;
if (b->high.y < addon->high.y)
b->high.y = addon->high.y;
if (b->low.y > addon->low.y)
b->low.y = addon->low.y;
}
/*
* The GiST Union method for boxes
@ -136,14 +147,7 @@ gist_box_union(PG_FUNCTION_ARGS)
for (i = 1; i < numranges; i++)
{
cur = DatumGetBoxP(entryvec->vector[i].key);
if (pageunion->high.x < cur->high.x)
pageunion->high.x = cur->high.x;
if (pageunion->low.x > cur->low.x)
pageunion->low.x = cur->low.x;
if (pageunion->high.y < cur->high.y)
pageunion->high.y = cur->high.y;
if (pageunion->low.y > cur->low.y)
pageunion->low.y = cur->low.y;
adjustBox( pageunion, cur );
}
*sizep = sizeof(BOX);
@ -206,6 +210,74 @@ compare_KB(const void *a, const void *b)
return (sa > sb) ? 1 : -1;
}
static void
chooseLR( GIST_SPLITVEC *v,
OffsetNumber *list1, int nlist1, BOX *union1,
OffsetNumber *list2, int nlist2, BOX *union2 )
{
bool firstToLeft = true;
if ( v->spl_ldatum_exists || v->spl_rdatum_exists ) {
if ( v->spl_ldatum_exists && v->spl_rdatum_exists ) {
BOX LRl = *union1, LRr = *union2;
BOX RLl = *union2, RLr = *union1;
double sizeLR, sizeRL;
adjustBox( &LRl, DatumGetBoxP( v->spl_ldatum ) );
adjustBox( &LRr, DatumGetBoxP( v->spl_rdatum ) );
adjustBox( &RLl, DatumGetBoxP( v->spl_ldatum ) );
adjustBox( &RLr, DatumGetBoxP( v->spl_rdatum ) );
sizeLR = size_box( DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&LRl), BoxPGetDatum(&LRr)) );
sizeRL = size_box( DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&RLl), BoxPGetDatum(&RLr)) );
if ( sizeLR > sizeRL )
firstToLeft = false;
} else {
float p1, p2;
GISTENTRY oldUnion, addon;
gistentryinit(oldUnion, ( v->spl_ldatum_exists ) ? v->spl_ldatum : v->spl_rdatum,
NULL, NULL, InvalidOffsetNumber, FALSE);
gistentryinit(addon, BoxPGetDatum(union1), NULL, NULL, InvalidOffsetNumber, FALSE);
DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&union1), PointerGetDatum(&p1));
gistentryinit(addon, BoxPGetDatum(union2), NULL, NULL, InvalidOffsetNumber, FALSE);
DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&union2), PointerGetDatum(&p2));
if ( (v->spl_ldatum_exists && p1 > p2) || (v->spl_rdatum_exists && p1 < p2) )
firstToLeft = false;
}
}
if ( firstToLeft ) {
v->spl_left = list1;
v->spl_right = list2;
v->spl_nleft = nlist1;
v->spl_nright = nlist2;
if ( v->spl_ldatum_exists )
adjustBox(union1, DatumGetBoxP( v->spl_ldatum ) );
v->spl_ldatum = BoxPGetDatum(union1);
if ( v->spl_rdatum_exists )
adjustBox(union2, DatumGetBoxP( v->spl_rdatum ) );
v->spl_rdatum = BoxPGetDatum(union2);
} else {
v->spl_left = list2;
v->spl_right = list1;
v->spl_nleft = nlist2;
v->spl_nright = nlist1;
if ( v->spl_ldatum_exists )
adjustBox(union2, DatumGetBoxP( v->spl_ldatum ) );
v->spl_ldatum = BoxPGetDatum(union2);
if ( v->spl_rdatum_exists )
adjustBox(union1, DatumGetBoxP( v->spl_rdatum ) );
v->spl_rdatum = BoxPGetDatum(union1);
}
v->spl_ldatum_exists = v->spl_rdatum_exists = false;
}
/*
* The GiST PickSplit method
*
@ -255,14 +327,7 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
))
allisequal = false;
if (pageunion.high.x < cur->high.x)
pageunion.high.x = cur->high.x;
if (pageunion.low.x > cur->low.x)
pageunion.low.x = cur->low.x;
if (pageunion.high.y < cur->high.y)
pageunion.high.y = cur->high.y;
if (pageunion.low.y > cur->low.y)
pageunion.low.y = cur->low.y;
adjustBox( &pageunion, cur );
}
nbytes = (maxoff + 2) * sizeof(OffsetNumber);
@ -294,9 +359,17 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
v->spl_nright++;
}
}
if ( v->spl_ldatum_exists )
adjustBox( unionL, DatumGetBoxP( v->spl_ldatum ) );
v->spl_ldatum = BoxPGetDatum(unionL);
if ( v->spl_rdatum_exists )
adjustBox( unionR, DatumGetBoxP( v->spl_rdatum ) );
v->spl_rdatum = BoxPGetDatum(unionR);
v->spl_ldatum_exists = v->spl_rdatum_exists = false;
PG_RETURN_POINTER(v);
}
}
@ -399,23 +472,13 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
}
if (direction == 'x')
{
v->spl_left = listL;
v->spl_right = listR;
v->spl_nleft = posL;
v->spl_nright = posR;
v->spl_ldatum = BoxPGetDatum(unionL);
v->spl_rdatum = BoxPGetDatum(unionR);
}
else
{
v->spl_left = listB;
v->spl_right = listT;
v->spl_nleft = posB;
v->spl_nright = posT;
v->spl_ldatum = BoxPGetDatum(unionB);
v->spl_rdatum = BoxPGetDatum(unionT);
}
chooseLR( v,
listL, posL, unionL,
listR, posR, unionR );
else
chooseLR( v,
listB, posB, unionB,
listT, posT, unionT );
PG_RETURN_POINTER(v);
}
@ -629,14 +692,14 @@ gist_poly_compress(PG_FUNCTION_ARGS)
memcpy((void *) r, (void *) &(in->boundbox), sizeof(BOX));
gistentryinit(*retval, PointerGetDatum(r),
entry->rel, entry->page,
entry->offset, sizeof(BOX), FALSE);
entry->offset, FALSE);
}
else
{
gistentryinit(*retval, (Datum) 0,
entry->rel, entry->page,
entry->offset, 0, FALSE);
entry->offset, FALSE);
}
}
else
@ -700,14 +763,14 @@ gist_circle_compress(PG_FUNCTION_ARGS)
r->low.y = in->center.y - in->radius;
gistentryinit(*retval, PointerGetDatum(r),
entry->rel, entry->page,
entry->offset, sizeof(BOX), FALSE);
entry->offset, FALSE);
}
else
{
gistentryinit(*retval, (Datum) 0,
entry->rel, entry->page,
entry->offset, 0, FALSE);
entry->offset, FALSE);
}
}
else

View File

@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.15 2006/05/29 12:50:06 teodor Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.16 2006/06/28 12:00:14 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
@ -21,23 +21,12 @@
#include "miscadmin.h"
#include "storage/freespace.h"
/* group flags ( in gistadjsubkey ) */
#define LEFT_ADDED 0x01
#define RIGHT_ADDED 0x02
#define BOTH_ADDED ( LEFT_ADDED | RIGHT_ADDED )
static float gistpenalty(GISTSTATE *giststate, int attno,
GISTENTRY *key1, bool isNull1,
GISTENTRY *key2, bool isNull2);
/*
* static *S used for temrorary storage (saves stack and palloc() call)
*/
static int attrsizeS[INDEX_MAX_KEYS];
static Datum attrS[INDEX_MAX_KEYS];
static bool isnullS[INDEX_MAX_KEYS];
static Datum attrS[INDEX_MAX_KEYS];
static bool isnullS[INDEX_MAX_KEYS];
/*
* Write itup vector to page, has no control of free space
@ -156,24 +145,31 @@ gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) {
* invalid tuple. Resulting Datums aren't compressed.
*/
static bool
bool
gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey,
Datum *attr, bool *isnull, int *attrsize ) {
Datum *attr, bool *isnull ) {
int i;
GistEntryVector *evec;
int attrsize;
evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
evec = (GistEntryVector *) palloc( ( len + 2 ) * sizeof(GISTENTRY) + GEVHDRSZ);
for (i = startkey; i < giststate->tupdesc->natts; i++) {
int j;
evec->n = 0;
if ( !isnull[i] ) {
gistentryinit( evec->vector[evec->n], attr[i],
NULL, NULL, (OffsetNumber) 0,
FALSE);
evec->n++;
}
for (j = 0; j < len; j++) {
Datum datum;
bool IsNull;
if (GistTupleIsInvalid(itvec[j]))
if (GistTupleIsInvalid(itvec[j]))
return FALSE; /* signals that union with invalid tuple => result is invalid */
datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
@ -184,7 +180,6 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke
evec->vector + evec->n,
datum,
NULL, NULL, (OffsetNumber) 0,
ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
FALSE, IsNull);
evec->n++;
}
@ -192,7 +187,6 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke
/* If this tuple vector was all NULLs, the union is NULL */
if ( evec->n == 0 ) {
attr[i] = (Datum) 0;
attrsize[i] = (Datum) 0;
isnull[i] = TRUE;
} else {
if (evec->n == 1) {
@ -203,7 +197,7 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke
/* Make union and store in attr array */
attr[i] = FunctionCall2(&giststate->unionFn[i],
PointerGetDatum(evec),
PointerGetDatum(attrsize + i));
PointerGetDatum(&attrsize));
isnull[i] = FALSE;
}
@ -219,20 +213,24 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke
IndexTuple
gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
{
if ( !gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS, attrsizeS) )
memset(isnullS, TRUE, sizeof(bool) * giststate->tupdesc->natts);
if ( !gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS ) )
return gist_form_invalid_tuple(InvalidBlockNumber);
return gistFormTuple(giststate, r, attrS, attrsizeS, isnullS);
return gistFormTuple(giststate, r, attrS, isnullS, false);
}
/*
* makes union of two key
*/
static void
void
gistMakeUnionKey( GISTSTATE *giststate, int attno,
GISTENTRY *entry1, bool isnull1,
GISTENTRY *entry2, bool isnull2,
Datum *dst, int *dstsize, bool *dstisnull ) {
Datum *dst, bool *dstisnull ) {
int dstsize;
static char storage[ 2 * sizeof(GISTENTRY) + GEVHDRSZ ];
GistEntryVector *evec = (GistEntryVector*)storage;
@ -242,7 +240,6 @@ gistMakeUnionKey( GISTSTATE *giststate, int attno,
if ( isnull1 && isnull2 ) {
*dstisnull = TRUE;
*dst = (Datum)0;
*dstsize = 0;
} else {
if ( isnull1 == FALSE && isnull2 == FALSE ) {
evec->vector[0] = *entry1;
@ -258,11 +255,11 @@ gistMakeUnionKey( GISTSTATE *giststate, int attno,
*dstisnull = FALSE;
*dst = FunctionCall2(&giststate->unionFn[attno],
PointerGetDatum(evec),
PointerGetDatum(dstsize));
PointerGetDatum(&dstsize));
}
}
static bool
bool
gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b) {
bool result;
@ -272,6 +269,25 @@ gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b) {
return result;
}
/*
* Decompress all keys in tuple
*/
void
gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
OffsetNumber o, GISTENTRY *attdata, bool *isnull)
{
int i;
for (i = 0; i < r->rd_att->natts; i++)
{
Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
gistdentryinit(giststate, i, &attdata[i],
datum, r, p, o,
FALSE, isnull[i]);
}
}
/*
* Forms union of oldtup and addtup, if union == oldtup then return NULL
*/
@ -299,7 +315,7 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
gistMakeUnionKey( giststate, i,
oldentries + i, oldisnull[i],
addentries + i, addisnull[i],
attrS + i, attrsizeS + i, isnullS + i );
attrS + i, isnullS + i );
if ( neednew )
/* we already need new key, so we can skip check */
@ -318,244 +334,13 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
if (neednew)
{
/* need to update key */
newtup = gistFormTuple(giststate, r, attrS, attrsizeS, isnullS);
newtup = gistFormTuple(giststate, r, attrS, isnullS, false);
newtup->t_tid = oldtup->t_tid;
}
return newtup;
}
/*
* Forms unions of subkeys after page split, but
* uses only tuples aren't in groups of equalent tuples
*/
void
gistunionsubkeyvec(GISTSTATE *giststate, IndexTuple *itvec,
GistSplitVec *gsvp, int startkey) {
IndexTuple *cleanedItVec;
int i, cleanedLen=0;
cleanedItVec = (IndexTuple*)palloc(sizeof(IndexTuple) * gsvp->len);
for(i=0;i<gsvp->len;i++) {
if ( gsvp->idgrp && gsvp->idgrp[gsvp->entries[i]])
continue;
cleanedItVec[cleanedLen++] = itvec[gsvp->entries[i] - 1];
}
gistMakeUnionItVec(giststate, cleanedItVec, cleanedLen, startkey,
gsvp->attr, gsvp->isnull, gsvp->attrsize);
pfree( cleanedItVec );
}
/*
* unions subkeys for after user picksplit over attno-1 column
*/
void
gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl, int attno)
{
GistSplitVec gsvp;
gsvp.idgrp = spl->spl_idgrp;
gsvp.attrsize = spl->spl_lattrsize;
gsvp.attr = spl->spl_lattr;
gsvp.len = spl->spl_nleft;
gsvp.entries = spl->spl_left;
gsvp.isnull = spl->spl_lisnull;
gistunionsubkeyvec(giststate, itvec, &gsvp, attno);
gsvp.attrsize = spl->spl_rattrsize;
gsvp.attr = spl->spl_rattr;
gsvp.len = spl->spl_nright;
gsvp.entries = spl->spl_right;
gsvp.isnull = spl->spl_risnull;
gistunionsubkeyvec(giststate, itvec, &gsvp, attno);
}
/*
* find group in vector with equal value
*/
static int
gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl, int attno)
{
int i;
int curid = 1;
/*
* attno key is always not null (see gistSplitByKey), so we may not check for
* nulls
*/
for (i = 0; i < spl->spl_nleft; i++)
{
int j;
int len;
if (spl->spl_idgrp[spl->spl_left[i]])
continue;
len = 0;
/* find all equal value in right part */
for (j = 0; j < spl->spl_nright; j++)
{
if (spl->spl_idgrp[spl->spl_right[j]])
continue;
if (gistKeyIsEQ(giststate, attno, valvec[spl->spl_left[i]].key, valvec[spl->spl_right[j]].key))
{
spl->spl_idgrp[spl->spl_right[j]] = curid;
len++;
}
}
/* find all other equal value in left part */
if (len)
{
/* add current val to list of equal values */
spl->spl_idgrp[spl->spl_left[i]] = curid;
/* searching .. */
for (j = i + 1; j < spl->spl_nleft; j++)
{
if (spl->spl_idgrp[spl->spl_left[j]])
continue;
if (gistKeyIsEQ(giststate, attno, valvec[spl->spl_left[i]].key, valvec[spl->spl_left[j]].key))
{
spl->spl_idgrp[spl->spl_left[j]] = curid;
len++;
}
}
spl->spl_ngrp[curid] = len + 1;
curid++;
}
}
return curid;
}
/*
* Insert equivalent tuples to left or right page with minimum
* penalty
*/
static void
gistadjsubkey(Relation r,
IndexTuple *itup, /* contains compressed entry */
int len,
GIST_SPLITVEC *v,
GISTSTATE *giststate,
int attno)
{
int curlen;
OffsetNumber *curwpos;
GISTENTRY entry,
identry[INDEX_MAX_KEYS];
float lpenalty = 0,
rpenalty = 0;
bool isnull[INDEX_MAX_KEYS];
int i,
j;
/* clear vectors */
curlen = v->spl_nleft;
curwpos = v->spl_left;
for (i = 0; i < v->spl_nleft; i++)
{
if (v->spl_idgrp[v->spl_left[i]] == 0)
{
*curwpos = v->spl_left[i];
curwpos++;
}
else
curlen--;
}
v->spl_nleft = curlen;
curlen = v->spl_nright;
curwpos = v->spl_right;
for (i = 0; i < v->spl_nright; i++)
{
if (v->spl_idgrp[v->spl_right[i]] == 0)
{
*curwpos = v->spl_right[i];
curwpos++;
}
else
curlen--;
}
v->spl_nright = curlen;
/* add equivalent tuple */
for (i = 0; i < len; i++)
{
if (v->spl_idgrp[i + 1] == 0) /* already inserted */
continue;
gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0,
identry, isnull);
v->spl_ngrp[v->spl_idgrp[i + 1]]--;
if (v->spl_ngrp[v->spl_idgrp[i + 1]] == 0 &&
(v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED)
{
/* force last in group */
rpenalty = 1.0;
lpenalty = (v->spl_grpflag[v->spl_idgrp[i + 1]] & LEFT_ADDED) ? 2.0 : 0.0;
}
else
{
/* where? */
for (j = attno+1; j < r->rd_att->natts; j++)
{
gistentryinit(entry, v->spl_lattr[j], r, NULL,
(OffsetNumber) 0, v->spl_lattrsize[j], FALSE);
lpenalty = gistpenalty(giststate, j, &entry, v->spl_lisnull[j],
&identry[j], isnull[j]);
gistentryinit(entry, v->spl_rattr[j], r, NULL,
(OffsetNumber) 0, v->spl_rattrsize[j], FALSE);
rpenalty = gistpenalty(giststate, j, &entry, v->spl_risnull[j],
&identry[j], isnull[j]);
if (lpenalty != rpenalty)
break;
}
}
/*
* add XXX: refactor this to avoid duplicating code
*/
if (lpenalty < rpenalty)
{
v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED;
v->spl_left[v->spl_nleft++] = i + 1;
for (j = attno+1; j < r->rd_att->natts; j++)
{
gistentryinit(entry, v->spl_lattr[j], r, NULL,
(OffsetNumber) 0, v->spl_lattrsize[j], FALSE);
gistMakeUnionKey( giststate, j,
&entry, v->spl_lisnull[j],
identry + j, isnull[j],
v->spl_lattr + j, v->spl_lattrsize + j, v->spl_lisnull + j );
}
}
else
{
v->spl_grpflag[v->spl_idgrp[i + 1]] |= RIGHT_ADDED;
v->spl_right[v->spl_nright++] = i + 1;
for (j = attno+1; j < r->rd_att->natts; j++)
{
gistentryinit(entry, v->spl_rattr[j], r, NULL,
(OffsetNumber) 0, v->spl_rattrsize[j], FALSE);
gistMakeUnionKey( giststate, j,
&entry, v->spl_risnull[j],
identry + j, isnull[j],
v->spl_rattr + j, v->spl_rattrsize + j, v->spl_risnull + j );
}
}
}
}
/*
* find entry with lowest penalty
*/
@ -580,6 +365,9 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
it, NULL, (OffsetNumber) 0,
identry, isnull);
Assert( maxoff >= FirstOffsetNumber );
Assert( !GistPageIsLeaf(p) );
for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i))
{
int j;
@ -602,7 +390,6 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull);
gistdentryinit(giststate, j, &entry, datum, r, p, i,
ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull),
FALSE, IsNull);
usize = gistpenalty(giststate, j, &entry, IsNull,
&identry[j], isnull[j]);
@ -637,23 +424,23 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
void
gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
Datum k, Relation r, Page pg, OffsetNumber o,
int b, bool l, bool isNull)
bool l, bool isNull)
{
if (b && !isNull)
if (!isNull)
{
GISTENTRY *dep;
gistentryinit(*e, k, r, pg, o, b, l);
gistentryinit(*e, k, r, pg, o, l);
dep = (GISTENTRY *)
DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey],
PointerGetDatum(e)));
/* decompressFn may just return the given pointer */
if (dep != e)
gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
dep->bytes, dep->leafkey);
dep->leafkey);
}
else
gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
gistentryinit(*e, (Datum) 0, r, pg, o, l);
}
@ -663,28 +450,28 @@ gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
void
gistcentryinit(GISTSTATE *giststate, int nkey,
GISTENTRY *e, Datum k, Relation r,
Page pg, OffsetNumber o, int b, bool l, bool isNull)
Page pg, OffsetNumber o, bool l, bool isNull)
{
if (!isNull)
{
GISTENTRY *cep;
gistentryinit(*e, k, r, pg, o, b, l);
gistentryinit(*e, k, r, pg, o, l);
cep = (GISTENTRY *)
DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey],
PointerGetDatum(e)));
/* compressFn may just return the given pointer */
if (cep != e)
gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset,
cep->bytes, cep->leafkey);
cep->leafkey);
}
else
gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
gistentryinit(*e, (Datum) 0, r, pg, o, l);
}
IndexTuple
gistFormTuple(GISTSTATE *giststate, Relation r,
Datum attdata[], int datumsize[], bool isnull[])
Datum attdata[], bool isnull[], bool newValues)
{
GISTENTRY centry[INDEX_MAX_KEYS];
Datum compatt[INDEX_MAX_KEYS];
@ -699,8 +486,7 @@ gistFormTuple(GISTSTATE *giststate, Relation r,
{
gistcentryinit(giststate, i, &centry[i], attdata[i],
r, NULL, (OffsetNumber) 0,
(datumsize) ? datumsize[i] : -1,
(datumsize) ? FALSE : TRUE,
newValues,
FALSE);
compatt[i] = centry[i].key;
}
@ -711,24 +497,7 @@ gistFormTuple(GISTSTATE *giststate, Relation r,
return res;
}
void
gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
OffsetNumber o, GISTENTRY *attdata, bool *isnull)
{
int i;
for (i = 0; i < r->rd_att->natts; i++)
{
Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
gistdentryinit(giststate, i, &attdata[i],
datum, r, p, o,
ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]),
FALSE, isnull[i]);
}
}
static float
float
gistpenalty(GISTSTATE *giststate, int attno,
GISTENTRY *orig, bool isNullOrig,
GISTENTRY *add, bool isNullAdd)
@ -748,74 +517,6 @@ gistpenalty(GISTSTATE *giststate, int attno,
return penalty;
}
/*
* Calls user picksplit method for attno columns to split vector to
* two vectors. May use attno+n columns data to
* get better split.
* Returns TRUE if left and right unions of attno columns are the same,
* so caller may find better split
*/
bool
gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GIST_SPLITVEC *v,
IndexTuple *itup, int len, GISTSTATE *giststate)
{
/*
* now let the user-defined picksplit function set up the split vector; in
* entryvec have no null value!!
*/
FunctionCall2(&giststate->picksplitFn[attno],
PointerGetDatum(entryvec),
PointerGetDatum(v));
/* compatibility with old code */
if (v->spl_left[v->spl_nleft - 1] == InvalidOffsetNumber)
v->spl_left[v->spl_nleft - 1] = (OffsetNumber) (entryvec->n - 1);
if (v->spl_right[v->spl_nright - 1] == InvalidOffsetNumber)
v->spl_right[v->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1);
v->spl_lattr[attno] = v->spl_ldatum;
v->spl_rattr[attno] = v->spl_rdatum;
v->spl_lisnull[attno] = false;
v->spl_risnull[attno] = false;
/*
* if index is multikey, then we must to try get smaller bounding box for
* subkey(s)
*/
if (giststate->tupdesc->natts > 1 && attno+1 != giststate->tupdesc->natts)
{
if ( gistKeyIsEQ(giststate, attno, v->spl_ldatum, v->spl_rdatum) ) {
/*
* Left and right key's unions are equial, so
* we can get better split by following columns. Note,
* uninons for attno columns are already done.
*/
return true;
} else {
int MaxGrpId;
v->spl_idgrp = (int *) palloc0(sizeof(int) * entryvec->n);
v->spl_grpflag = (char *) palloc0(sizeof(char) * entryvec->n);
v->spl_ngrp = (int *) palloc(sizeof(int) * entryvec->n);
MaxGrpId = gistfindgroup(giststate, entryvec->vector, v, attno);
/* form union of sub keys for each page (l,p) */
gistunionsubkey(giststate, itup, v, attno + 1);
/*
* if possible, we insert equivalent tuples with control by penalty
* for a subkey(s)
*/
if (MaxGrpId > 1)
gistadjsubkey(r, itup, len, v, giststate, attno);
}
}
return false;
}
/*
* Initialize a new index page
*/