1
0
mirror of https://github.com/postgres/postgres.git synced 2025-04-22 23:02:54 +03:00
1996-08-26 06:32:06 +00:00

960 lines
26 KiB
C

/*-------------------------------------------------------------------------
*
* rtree.c--
* interface routines for the postgres rtree indexed access method.
*
* Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/rtree/Attic/rtree.c,v 1.2 1996/08/26 06:29:10 scrappy Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "utils/elog.h"
#include "utils/palloc.h"
#include "utils/rel.h"
#include "utils/excid.h"
#include "access/heapam.h"
#include "access/genam.h"
#include "access/rtree.h"
#include "access/rtscan.h"
#include "access/funcindex.h"
#include "access/tupdesc.h"
#include "nodes/execnodes.h"
#include "nodes/plannodes.h"
#include "executor/executor.h"
#include "executor/tuptable.h"
#include "catalog/index.h"
typedef struct SPLITVEC {
OffsetNumber *spl_left;
int spl_nleft;
char *spl_ldatum;
OffsetNumber *spl_right;
int spl_nright;
char *spl_rdatum;
} SPLITVEC;
typedef struct RTSTATE {
func_ptr unionFn; /* union function */
func_ptr sizeFn; /* size function */
func_ptr interFn; /* intersection function */
} RTSTATE;
/* non-export function prototypes */
static InsertIndexResult rtdoinsert(Relation r, IndexTuple itup,
RTSTATE *rtstate);
static void rttighten(Relation r, RTSTACK *stk, char *datum, int att_size,
RTSTATE *rtstate);
static InsertIndexResult dosplit(Relation r, Buffer buffer, RTSTACK *stack,
IndexTuple itup, RTSTATE *rtstate);
static void rtintinsert(Relation r, RTSTACK *stk, IndexTuple ltup,
IndexTuple rtup, RTSTATE *rtstate);
static void rtnewroot(Relation r, IndexTuple lt, IndexTuple rt);
static void picksplit(Relation r, Page page, SPLITVEC *v, IndexTuple itup,
RTSTATE *rtstate);
static void RTInitBuffer(Buffer b, uint32 f);
static OffsetNumber choose(Relation r, Page p, IndexTuple it,
RTSTATE *rtstate);
static int nospace(Page p, IndexTuple it);
static void initRtstate(RTSTATE *rtstate, Relation index);
void
rtbuild(Relation heap,
Relation index,
int natts,
AttrNumber *attnum,
IndexStrategy istrat,
uint16 pcount,
Datum *params,
FuncIndexInfo *finfo,
PredInfo *predInfo)
{
HeapScanDesc scan;
Buffer buffer;
AttrNumber i;
HeapTuple htup;
IndexTuple itup;
TupleDesc hd, id;
InsertIndexResult res;
Datum *d;
bool *nulls;
int nb, nh, ni;
ExprContext *econtext;
TupleTable tupleTable;
TupleTableSlot *slot;
Oid hrelid, irelid;
Node *pred, *oldPred;
RTSTATE rtState;
initRtstate(&rtState, index);
/* rtrees only know how to do stupid locking now */
RelationSetLockForWrite(index);
pred = predInfo->pred;
oldPred = predInfo->oldPred;
/*
* We expect to be called exactly once for any index relation.
* If that's not the case, big trouble's what we have.
*/
if (oldPred == NULL && (nb = RelationGetNumberOfBlocks(index)) != 0)
elog(WARN, "%s already contains data", index->rd_rel->relname.data);
/* initialize the root page (if this is a new index) */
if (oldPred == NULL) {
buffer = ReadBuffer(index, P_NEW);
RTInitBuffer(buffer, F_LEAF);
WriteBuffer(buffer);
}
/* init the tuple descriptors and get set for a heap scan */
hd = RelationGetTupleDescriptor(heap);
id = RelationGetTupleDescriptor(index);
d = (Datum *)palloc(natts * sizeof (*d));
nulls = (bool *)palloc(natts * sizeof (*nulls));
/*
* If this is a predicate (partial) index, we will need to evaluate the
* predicate using ExecQual, which requires the current tuple to be in a
* slot of a TupleTable. In addition, ExecQual must have an ExprContext
* referring to that slot. Here, we initialize dummy TupleTable and
* ExprContext objects for this purpose. --Nels, Feb '92
*/
#ifndef OMIT_PARTIAL_INDEX
if (pred != NULL || oldPred != NULL) {
tupleTable = ExecCreateTupleTable(1);
slot = ExecAllocTableSlot(tupleTable);
econtext = makeNode(ExprContext);
FillDummyExprContext(econtext, slot, hd, buffer);
}
#endif /* OMIT_PARTIAL_INDEX */
scan = heap_beginscan(heap, 0, NowTimeQual, 0, (ScanKey) NULL);
htup = heap_getnext(scan, 0, &buffer);
/* count the tuples as we insert them */
nh = ni = 0;
for (; HeapTupleIsValid(htup); htup = heap_getnext(scan, 0, &buffer)) {
nh++;
/*
* If oldPred != NULL, this is an EXTEND INDEX command, so skip
* this tuple if it was already in the existing partial index
*/
if (oldPred != NULL) {
#ifndef OMIT_PARTIAL_INDEX
/*SetSlotContents(slot, htup); */
slot->val = htup;
if (ExecQual((List*)oldPred, econtext) == true) {
ni++;
continue;
}
#endif /* OMIT_PARTIAL_INDEX */
}
/* Skip this tuple if it doesn't satisfy the partial-index predicate */
if (pred != NULL) {
#ifndef OMIT_PARTIAL_INDEX
/*SetSlotContents(slot, htup); */
slot->val = htup;
if (ExecQual((List*)pred, econtext) == false)
continue;
#endif /* OMIT_PARTIAL_INDEX */
}
ni++;
/*
* For the current heap tuple, extract all the attributes
* we use in this index, and note which are null.
*/
for (i = 1; i <= natts; i++) {
int attoff;
bool attnull;
/*
* Offsets are from the start of the tuple, and are
* zero-based; indices are one-based. The next call
* returns i - 1. That's data hiding for you.
*/
attoff = AttrNumberGetAttrOffset(i);
/*
d[attoff] = HeapTupleGetAttributeValue(htup, buffer,
*/
d[attoff] = GetIndexValue(htup,
hd,
attoff,
attnum,
finfo,
&attnull,
buffer);
nulls[attoff] = (attnull ? 'n' : ' ');
}
/* form an index tuple and point it at the heap tuple */
itup = index_formtuple(id, &d[0], nulls);
itup->t_tid = htup->t_ctid;
/*
* Since we already have the index relation locked, we
* call rtdoinsert directly. Normal access method calls
* dispatch through rtinsert, which locks the relation
* for write. This is the right thing to do if you're
* inserting single tups, but not when you're initializing
* the whole index at once.
*/
res = rtdoinsert(index, itup, &rtState);
pfree(itup);
pfree(res);
}
/* okay, all heap tuples are indexed */
heap_endscan(scan);
RelationUnsetLockForWrite(index);
if (pred != NULL || oldPred != NULL) {
#ifndef OMIT_PARTIAL_INDEX
ExecDestroyTupleTable(tupleTable, true);
pfree(econtext);
#endif /* OMIT_PARTIAL_INDEX */
}
/*
* Since we just counted the tuples in the heap, we update its
* stats in pg_relation to guarantee that the planner takes
* advantage of the index we just created. UpdateStats() does a
* CommandCounterIncrement(), which flushes changed entries from
* the system relcache. The act of constructing an index changes
* these heap and index tuples in the system catalogs, so they
* need to be flushed. We close them to guarantee that they
* will be.
*/
hrelid = heap->rd_id;
irelid = index->rd_id;
heap_close(heap);
index_close(index);
UpdateStats(hrelid, nh, true);
UpdateStats(irelid, ni, false);
if (oldPred != NULL) {
if (ni == nh) pred = NULL;
UpdateIndexPredicate(irelid, oldPred, pred);
}
/* be tidy */
pfree(nulls);
pfree(d);
}
/*
* rtinsert -- wrapper for rtree tuple insertion.
*
* This is the public interface routine for tuple insertion in rtrees.
* It doesn't do any work; just locks the relation and passes the buck.
*/
InsertIndexResult
rtinsert(Relation r, Datum *datum, char *nulls, ItemPointer ht_ctid)
{
InsertIndexResult res;
IndexTuple itup;
RTSTATE rtState;
/* generate an index tuple */
itup = index_formtuple(RelationGetTupleDescriptor(r), datum, nulls);
itup->t_tid = *ht_ctid;
initRtstate(&rtState, r);
RelationSetLockForWrite(r);
res = rtdoinsert(r, itup, &rtState);
/* XXX two-phase locking -- don't unlock the relation until EOT */
return (res);
}
static InsertIndexResult
rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate)
{
Page page;
Buffer buffer;
BlockNumber blk;
IndexTuple which;
OffsetNumber l;
RTSTACK *stack;
InsertIndexResult res;
RTreePageOpaque opaque;
char *datum;
blk = P_ROOT;
buffer = InvalidBuffer;
stack = (RTSTACK *) NULL;
do {
/* let go of current buffer before getting next */
if (buffer != InvalidBuffer)
ReleaseBuffer(buffer);
/* get next buffer */
buffer = ReadBuffer(r, blk);
page = (Page) BufferGetPage(buffer);
opaque = (RTreePageOpaque) PageGetSpecialPointer(page);
if (!(opaque->flags & F_LEAF)) {
RTSTACK *n;
ItemId iid;
n = (RTSTACK *) palloc(sizeof(RTSTACK));
n->rts_parent = stack;
n->rts_blk = blk;
n->rts_child = choose(r, page, itup, rtstate);
stack = n;
iid = PageGetItemId(page, n->rts_child);
which = (IndexTuple) PageGetItem(page, iid);
blk = ItemPointerGetBlockNumber(&(which->t_tid));
}
} while (!(opaque->flags & F_LEAF));
if (nospace(page, itup)) {
/* need to do a split */
res = dosplit(r, buffer, stack, itup, rtstate);
freestack(stack);
WriteBuffer(buffer); /* don't forget to release buffer! */
return (res);
}
/* add the item and write the buffer */
if (PageIsEmpty(page)) {
l = PageAddItem(page, (Item) itup, IndexTupleSize(itup),
FirstOffsetNumber,
LP_USED);
} else {
l = PageAddItem(page, (Item) itup, IndexTupleSize(itup),
OffsetNumberNext(PageGetMaxOffsetNumber(page)),
LP_USED);
}
WriteBuffer(buffer);
datum = (((char *) itup) + sizeof(IndexTupleData));
/* now expand the page boundary in the parent to include the new child */
rttighten(r, stack, datum,
(IndexTupleSize(itup) - sizeof(IndexTupleData)), rtstate);
freestack(stack);
/* build and return an InsertIndexResult for this insertion */
res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
ItemPointerSet(&(res->pointerData), blk, l);
return (res);
}
static void
rttighten(Relation r,
RTSTACK *stk,
char *datum,
int att_size,
RTSTATE *rtstate)
{
char *oldud;
char *tdatum;
Page p;
float old_size, newd_size;
Buffer b;
if (stk == (RTSTACK *) NULL)
return;
b = ReadBuffer(r, stk->rts_blk);
p = BufferGetPage(b);
oldud = (char *) PageGetItem(p, PageGetItemId(p, stk->rts_child));
oldud += sizeof(IndexTupleData);
(*rtstate->sizeFn)(oldud, &old_size);
datum = (char *) (*rtstate->unionFn)(oldud, datum);
(*rtstate->sizeFn)(datum, &newd_size);
if (newd_size != old_size) {
TupleDesc td = RelationGetTupleDescriptor(r);
if (td->attrs[0]->attlen < 0) {
/*
* This is an internal page, so 'oldud' had better be a
* union (constant-length) key, too. (See comment below.)
*/
Assert(VARSIZE(datum) == VARSIZE(oldud));
memmove(oldud, datum, VARSIZE(datum));
} else {
memmove(oldud, datum, att_size);
}
WriteBuffer(b);
/*
* The user may be defining an index on variable-sized data (like
* polygons). If so, we need to get a constant-sized datum for
* insertion on the internal page. We do this by calling the union
* proc, which is guaranteed to return a rectangle.
*/
tdatum = (char *) (*rtstate->unionFn)(datum, datum);
rttighten(r, stk->rts_parent, tdatum, att_size, rtstate);
pfree(tdatum);
} else {
ReleaseBuffer(b);
}
pfree(datum);
}
/*
* dosplit -- split a page in the tree.
*
* This is the quadratic-cost split algorithm Guttman describes in
* his paper. The reason we chose it is that you can implement this
* with less information about the data types on which you're operating.
*/
static InsertIndexResult
dosplit(Relation r,
Buffer buffer,
RTSTACK *stack,
IndexTuple itup,
RTSTATE *rtstate)
{
Page p;
Buffer leftbuf, rightbuf;
Page left, right;
ItemId itemid;
IndexTuple item;
IndexTuple ltup, rtup;
OffsetNumber maxoff;
OffsetNumber i;
OffsetNumber leftoff, rightoff;
BlockNumber lbknum, rbknum;
BlockNumber bufblock;
RTreePageOpaque opaque;
int blank;
InsertIndexResult res;
char *isnull;
SPLITVEC v;
TupleDesc tupDesc;
isnull = (char *) palloc(r->rd_rel->relnatts);
for (blank = 0; blank < r->rd_rel->relnatts; blank++)
isnull[blank] = ' ';
p = (Page) BufferGetPage(buffer);
opaque = (RTreePageOpaque) PageGetSpecialPointer(p);
/*
* The root of the tree is the first block in the relation. If
* we're about to split the root, we need to do some hocus-pocus
* to enforce this guarantee.
*/
if (BufferGetBlockNumber(buffer) == P_ROOT) {
leftbuf = ReadBuffer(r, P_NEW);
RTInitBuffer(leftbuf, opaque->flags);
lbknum = BufferGetBlockNumber(leftbuf);
left = (Page) BufferGetPage(leftbuf);
} else {
leftbuf = buffer;
IncrBufferRefCount(buffer);
lbknum = BufferGetBlockNumber(buffer);
left = (Page) PageGetTempPage(p, sizeof(RTreePageOpaqueData));
}
rightbuf = ReadBuffer(r, P_NEW);
RTInitBuffer(rightbuf, opaque->flags);
rbknum = BufferGetBlockNumber(rightbuf);
right = (Page) BufferGetPage(rightbuf);
picksplit(r, p, &v, itup, rtstate);
leftoff = rightoff = FirstOffsetNumber;
maxoff = PageGetMaxOffsetNumber(p);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) {
itemid = PageGetItemId(p, i);
item = (IndexTuple) PageGetItem(p, itemid);
if (i == *(v.spl_left)) {
(void) PageAddItem(left, (Item) item, IndexTupleSize(item),
leftoff, LP_USED);
leftoff = OffsetNumberNext(leftoff);
v.spl_left++; /* advance in left split vector */
} else {
(void) PageAddItem(right, (Item) item, IndexTupleSize(item),
rightoff, LP_USED);
rightoff = OffsetNumberNext(rightoff);
v.spl_right++; /* advance in right split vector */
}
}
/* build an InsertIndexResult for this insertion */
res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
/* now insert the new index tuple */
if (*(v.spl_left) != FirstOffsetNumber) {
(void) PageAddItem(left, (Item) itup, IndexTupleSize(itup),
leftoff, LP_USED);
leftoff = OffsetNumberNext(leftoff);
ItemPointerSet(&(res->pointerData), lbknum, leftoff);
} else {
(void) PageAddItem(right, (Item) itup, IndexTupleSize(itup),
rightoff, LP_USED);
rightoff = OffsetNumberNext(rightoff);
ItemPointerSet(&(res->pointerData), rbknum, rightoff);
}
if ((bufblock = BufferGetBlockNumber(buffer)) != P_ROOT) {
PageRestoreTempPage(left, p);
}
WriteBuffer(leftbuf);
WriteBuffer(rightbuf);
/*
* Okay, the page is split. We have three things left to do:
*
* 1) Adjust any active scans on this index to cope with changes
* we introduced in its structure by splitting this page.
*
* 2) "Tighten" the bounding box of the pointer to the left
* page in the parent node in the tree, if any. Since we
* moved a bunch of stuff off the left page, we expect it
* to get smaller. This happens in the internal insertion
* routine.
*
* 3) Insert a pointer to the right page in the parent. This
* may cause the parent to split. If it does, we need to
* repeat steps one and two for each split node in the tree.
*/
/* adjust active scans */
rtadjscans(r, RTOP_SPLIT, bufblock, FirstOffsetNumber);
tupDesc = r->rd_att;
ltup = (IndexTuple) index_formtuple(tupDesc,
(Datum *) &(v.spl_ldatum), isnull);
rtup = (IndexTuple) index_formtuple(tupDesc,
(Datum *) &(v.spl_rdatum), isnull);
pfree(isnull);
/* set pointers to new child pages in the internal index tuples */
ItemPointerSet(&(ltup->t_tid), lbknum, 1);
ItemPointerSet(&(rtup->t_tid), rbknum, 1);
rtintinsert(r, stack, ltup, rtup, rtstate);
pfree(ltup);
pfree(rtup);
return (res);
}
static void
rtintinsert(Relation r,
RTSTACK *stk,
IndexTuple ltup,
IndexTuple rtup,
RTSTATE *rtstate)
{
IndexTuple old;
Buffer b;
Page p;
char *ldatum, *rdatum, *newdatum;
InsertIndexResult res;
if (stk == (RTSTACK *) NULL) {
rtnewroot(r, ltup, rtup);
return;
}
b = ReadBuffer(r, stk->rts_blk);
p = BufferGetPage(b);
old = (IndexTuple) PageGetItem(p, PageGetItemId(p, stk->rts_child));
/*
* This is a hack. Right now, we force rtree keys to be constant size.
* To fix this, need delete the old key and add both left and right
* for the two new pages. The insertion of left may force a split if
* the new left key is bigger than the old key.
*/
if (IndexTupleSize(old) != IndexTupleSize(ltup))
elog(WARN, "Variable-length rtree keys are not supported.");
/* install pointer to left child */
memmove(old, ltup,IndexTupleSize(ltup));
if (nospace(p, rtup)) {
newdatum = (((char *) ltup) + sizeof(IndexTupleData));
rttighten(r, stk->rts_parent, newdatum,
(IndexTupleSize(ltup) - sizeof(IndexTupleData)), rtstate);
res = dosplit(r, b, stk->rts_parent, rtup, rtstate);
WriteBuffer(b); /* don't forget to release buffer! - 01/31/94 */
pfree(res);
} else {
(void) PageAddItem(p, (Item) rtup, IndexTupleSize(rtup),
PageGetMaxOffsetNumber(p), LP_USED);
WriteBuffer(b);
ldatum = (((char *) ltup) + sizeof(IndexTupleData));
rdatum = (((char *) rtup) + sizeof(IndexTupleData));
newdatum = (char *) (*rtstate->unionFn)(ldatum, rdatum);
rttighten(r, stk->rts_parent, newdatum,
(IndexTupleSize(rtup) - sizeof(IndexTupleData)), rtstate);
pfree(newdatum);
}
}
static void
rtnewroot(Relation r, IndexTuple lt, IndexTuple rt)
{
Buffer b;
Page p;
b = ReadBuffer(r, P_ROOT);
RTInitBuffer(b, 0);
p = BufferGetPage(b);
(void) PageAddItem(p, (Item) lt, IndexTupleSize(lt),
FirstOffsetNumber, LP_USED);
(void) PageAddItem(p, (Item) rt, IndexTupleSize(rt),
OffsetNumberNext(FirstOffsetNumber), LP_USED);
WriteBuffer(b);
}
static void
picksplit(Relation r,
Page page,
SPLITVEC *v,
IndexTuple itup,
RTSTATE *rtstate)
{
OffsetNumber maxoff;
OffsetNumber i, j;
IndexTuple item_1, item_2;
char *datum_alpha, *datum_beta;
char *datum_l, *datum_r;
char *union_d, *union_dl, *union_dr;
char *inter_d;
bool firsttime;
float size_alpha, size_beta, size_union, size_inter;
float size_waste, waste;
float size_l, size_r;
int nbytes;
OffsetNumber seed_1 = 0, seed_2 = 0;
OffsetNumber *left, *right;
maxoff = PageGetMaxOffsetNumber(page);
nbytes = (maxoff + 2) * sizeof(OffsetNumber);
v->spl_left = (OffsetNumber *) palloc(nbytes);
v->spl_right = (OffsetNumber *) palloc(nbytes);
firsttime = true;
waste = 0.0;
for (i = FirstOffsetNumber; i < maxoff; i = OffsetNumberNext(i)) {
item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
for (j = OffsetNumberNext(i); j <= maxoff; j = OffsetNumberNext(j)) {
item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, j));
datum_beta = ((char *) item_2) + sizeof(IndexTupleData);
/* compute the wasted space by unioning these guys */
union_d = (char *)(rtstate->unionFn)(datum_alpha, datum_beta);
(rtstate->sizeFn)(union_d, &size_union);
inter_d = (char *)(rtstate->interFn)(datum_alpha, datum_beta);
(rtstate->sizeFn)(inter_d, &size_inter);
size_waste = size_union - size_inter;
pfree(union_d);
if (inter_d != (char *) NULL)
pfree(inter_d);
/*
* are these a more promising split that what we've
* already seen?
*/
if (size_waste > waste || firsttime) {
waste = size_waste;
seed_1 = i;
seed_2 = j;
firsttime = false;
}
}
}
left = v->spl_left;
v->spl_nleft = 0;
right = v->spl_right;
v->spl_nright = 0;
item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_1));
datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
datum_l = (char *)(*rtstate->unionFn)(datum_alpha, datum_alpha);
(*rtstate->sizeFn)(datum_l, &size_l);
item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_2));
datum_beta = ((char *) item_2) + sizeof(IndexTupleData);
datum_r = (char *)(*rtstate->unionFn)(datum_beta, datum_beta);
(*rtstate->sizeFn)(datum_r, &size_r);
/*
* Now split up the regions between the two seeds. An important
* property of this split algorithm is that the split vector v
* has the indices of items to be split in order in its left and
* right vectors. We exploit this property by doing a merge in
* the code that actually splits the page.
*
* For efficiency, we also place the new index tuple in this loop.
* This is handled at the very end, when we have placed all the
* existing tuples and i == maxoff + 1.
*/
maxoff = OffsetNumberNext(maxoff);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) {
/*
* If we've already decided where to place this item, just
* put it on the right list. Otherwise, we need to figure
* out which page needs the least enlargement in order to
* store the item.
*/
if (i == seed_1) {
*left++ = i;
v->spl_nleft++;
continue;
} else if (i == seed_2) {
*right++ = i;
v->spl_nright++;
continue;
}
/* okay, which page needs least enlargement? */
if (i == maxoff) {
item_1 = itup;
} else {
item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
}
datum_alpha = ((char *) item_1) + sizeof(IndexTupleData);
union_dl = (char *)(*rtstate->unionFn)(datum_l, datum_alpha);
union_dr = (char *)(*rtstate->unionFn)(datum_r, datum_alpha);
(*rtstate->sizeFn)(union_dl, &size_alpha);
(*rtstate->sizeFn)(union_dr, &size_beta);
/* pick which page to add it to */
if (size_alpha - size_l < size_beta - size_r) {
pfree(datum_l);
pfree(union_dr);
datum_l = union_dl;
size_l = size_alpha;
*left++ = i;
v->spl_nleft++;
} else {
pfree(datum_r);
pfree(union_dl);
datum_r = union_dr;
size_r = size_alpha;
*right++ = i;
v->spl_nright++;
}
}
*left = *right = FirstOffsetNumber; /* sentinel value, see dosplit() */
v->spl_ldatum = datum_l;
v->spl_rdatum = datum_r;
}
static void
RTInitBuffer(Buffer b, uint32 f)
{
RTreePageOpaque opaque;
Page page;
Size pageSize;
pageSize = BufferGetPageSize(b);
page = BufferGetPage(b);
memset(page, 0, (int) pageSize);
PageInit(page, pageSize, sizeof(RTreePageOpaqueData));
opaque = (RTreePageOpaque) PageGetSpecialPointer(page);
opaque->flags = f;
}
static OffsetNumber
choose(Relation r, Page p, IndexTuple it, RTSTATE *rtstate)
{
OffsetNumber maxoff;
OffsetNumber i;
char *ud, *id;
char *datum;
float usize, dsize;
OffsetNumber which;
float which_grow;
id = ((char *) it) + sizeof(IndexTupleData);
maxoff = PageGetMaxOffsetNumber(p);
which_grow = -1.0;
which = -1;
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) {
datum = (char *) PageGetItem(p, PageGetItemId(p, i));
datum += sizeof(IndexTupleData);
(*rtstate->sizeFn)(datum, &dsize);
ud = (char *) (*rtstate->unionFn)(datum, id);
(*rtstate->sizeFn)(ud, &usize);
pfree(ud);
if (which_grow < 0 || usize - dsize < which_grow) {
which = i;
which_grow = usize - dsize;
if (which_grow == 0)
break;
}
}
return (which);
}
static int
nospace(Page p, IndexTuple it)
{
return (PageGetFreeSpace(p) < IndexTupleSize(it));
}
void
freestack(RTSTACK *s)
{
RTSTACK *p;
while (s != (RTSTACK *) NULL) {
p = s->rts_parent;
pfree(s);
s = p;
}
}
char *
rtdelete(Relation r, ItemPointer tid)
{
BlockNumber blkno;
OffsetNumber offnum;
Buffer buf;
Page page;
/* must write-lock on delete */
RelationSetLockForWrite(r);
blkno = ItemPointerGetBlockNumber(tid);
offnum = ItemPointerGetOffsetNumber(tid);
/* adjust any scans that will be affected by this deletion */
rtadjscans(r, RTOP_DEL, blkno, offnum);
/* delete the index tuple */
buf = ReadBuffer(r, blkno);
page = BufferGetPage(buf);
PageIndexTupleDelete(page, offnum);
WriteBuffer(buf);
/* XXX -- two-phase locking, don't release the write lock */
return ((char *) NULL);
}
static void initRtstate(RTSTATE *rtstate, Relation index)
{
RegProcedure union_proc, size_proc, inter_proc;
func_ptr user_fn;
int pronargs;
union_proc = index_getprocid(index, 1, RT_UNION_PROC);
size_proc = index_getprocid(index, 1, RT_SIZE_PROC);
inter_proc = index_getprocid(index, 1, RT_INTER_PROC);
fmgr_info(union_proc, &user_fn, &pronargs);
rtstate->unionFn = user_fn;
fmgr_info(size_proc, &user_fn, &pronargs);
rtstate->sizeFn = user_fn;
fmgr_info(inter_proc, &user_fn, &pronargs);
rtstate->interFn = user_fn;
return;
}
#define RTDEBUG
#ifdef RTDEBUG
#include "utils/geo-decls.h"
void
_rtdump(Relation r)
{
Buffer buf;
Page page;
OffsetNumber offnum, maxoff;
BlockNumber blkno;
BlockNumber nblocks;
RTreePageOpaque po;
IndexTuple itup;
BlockNumber itblkno;
OffsetNumber itoffno;
char *datum;
char *itkey;
nblocks = RelationGetNumberOfBlocks(r);
for (blkno = 0; blkno < nblocks; blkno++) {
buf = ReadBuffer(r, blkno);
page = BufferGetPage(buf);
po = (RTreePageOpaque) PageGetSpecialPointer(page);
maxoff = PageGetMaxOffsetNumber(page);
printf("Page %d maxoff %d <%s>\n", blkno, maxoff,
(po->flags & F_LEAF ? "LEAF" : "INTERNAL"));
if (PageIsEmpty(page)) {
ReleaseBuffer(buf);
continue;
}
for (offnum = FirstOffsetNumber;
offnum <= maxoff;
offnum = OffsetNumberNext(offnum)) {
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
itblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
itoffno = ItemPointerGetOffsetNumber(&(itup->t_tid));
datum = ((char *) itup);
datum += sizeof(IndexTupleData);
itkey = (char *) box_out((BOX *) datum);
printf("\t[%d] size %d heap <%d,%d> key:%s\n",
offnum, IndexTupleSize(itup), itblkno, itoffno, itkey);
pfree(itkey);
}
ReleaseBuffer(buf);
}
}
#endif /* defined RTDEBUG */