1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-28 23:42:10 +03:00

CREATE INDEX ... INCLUDING (column[, ...])

Indexes (only B-tree for now) can now contain "extra" columns that do
not participate in the index structure; they are just stored in leaf
tuples. This allows an index-only scan to use a single index instead
of two or more indexes.

Author: Anastasia Lubennikova with minor editorializing by me
Reviewers: David Rowley, Peter Geoghegan, Jeff Janes
This commit is contained in:
Teodor Sigaev
2016-04-08 19:31:49 +03:00
parent 339025c68f
commit 386e3d7609
68 changed files with 1321 additions and 256 deletions

View File

@ -92,6 +92,7 @@ brinhandler(PG_FUNCTION_ARGS)
amroutine->amstorage = true;
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
amroutine->amcaninclude = false;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = brinbuild;

View File

@ -19,6 +19,7 @@
#include "access/heapam.h"
#include "access/itup.h"
#include "access/tuptoaster.h"
#include "utils/rel.h"
/* ----------------------------------------------------------------
@ -441,3 +442,33 @@ CopyIndexTuple(IndexTuple source)
memcpy(result, source, size);
return result;
}
/*
 * index_truncate_tuple
 *		Form a new index tuple holding only the key attributes of olditup.
 *
 * idxrel is the index relation that olditup belongs to; olditup is the
 * tuple to truncate.  The old tuple is deformed with the index's full
 * tuple descriptor, and a new tuple is then formed from just the first
 * indnkeyatts values, dropping the trailing non-key (INCLUDING)
 * attributes.  The result carries the same t_tid as olditup.
 *
 * Must only be called for an index that really has non-key attributes
 * (asserted below).
 *
 * The truncated attribute count is set on a private copy of the tuple
 * descriptor.  The descriptor returned by RelationGetDescr() is never
 * modified, so an error raised while forming the new tuple cannot leave
 * the relation's own descriptor with a wrong number of attributes.
 */
IndexTuple
index_truncate_tuple(Relation idxrel, IndexTuple olditup)
{
	TupleDesc	itupdesc = RelationGetDescr(idxrel);
	TupleDesc	keydesc;
	Datum		values[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	IndexTuple	newitup;
	int			indnatts = IndexRelationGetNumberOfAttributes(idxrel);
	int			indnkeyatts = IndexRelationGetNumberOfKeyAttributes(idxrel);

	Assert(indnatts <= INDEX_MAX_KEYS);
	Assert(indnkeyatts > 0);
	Assert(indnkeyatts < indnatts);

	/* Extract all attributes using the full, unmodified descriptor */
	index_deform_tuple(olditup, itupdesc, values, isnull);

	/* Work on a private descriptor that exposes only the key attributes */
	keydesc = CreateTupleDescCopy(itupdesc);
	keydesc->natts = indnkeyatts;

	/* form new tuple that will contain only key attributes */
	newitup = index_form_tuple(keydesc, values, isnull);
	newitup->t_tid = olditup->t_tid;

	FreeTupleDesc(keydesc);

	/* Dropping attributes must never make the tuple larger */
	Assert(IndexTupleSize(newitup) <= IndexTupleSize(olditup));

	return newitup;
}

View File

@ -47,6 +47,7 @@ ginhandler(PG_FUNCTION_ARGS)
amroutine->amstorage = true;
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
amroutine->amcaninclude = false;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = ginbuild;

View File

@ -69,6 +69,7 @@ gisthandler(PG_FUNCTION_ARGS)
amroutine->amstorage = true;
amroutine->amclusterable = true;
amroutine->ampredlocks = false;
amroutine->amcaninclude = false;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = gistbuild;

View File

@ -64,6 +64,7 @@ hashhandler(PG_FUNCTION_ARGS)
amroutine->amstorage = false;
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
amroutine->amcaninclude = false;
amroutine->amkeytype = INT4OID;
amroutine->ambuild = hashbuild;

View File

@ -174,13 +174,15 @@ BuildIndexValueDescription(Relation indexRelation,
StringInfoData buf;
Form_pg_index idxrec;
HeapTuple ht_idx;
int natts = indexRelation->rd_rel->relnatts;
int indnkeyatts;
int i;
int keyno;
Oid indexrelid = RelationGetRelid(indexRelation);
Oid indrelid;
AclResult aclresult;
indnkeyatts = IndexRelationGetNumberOfKeyAttributes(indexRelation);
/*
* Check permissions- if the user does not have access to view all of the
* key columns then return NULL to avoid leaking data.
@ -218,7 +220,7 @@ BuildIndexValueDescription(Relation indexRelation,
* No table-level access, so step through the columns in the index and
* make sure the user has SELECT rights on all of them.
*/
for (keyno = 0; keyno < idxrec->indnatts; keyno++)
for (keyno = 0; keyno < idxrec->indnkeyatts; keyno++)
{
AttrNumber attnum = idxrec->indkey.values[keyno];
@ -244,7 +246,7 @@ BuildIndexValueDescription(Relation indexRelation,
appendStringInfo(&buf, "(%s)=(",
pg_get_indexdef_columns(indexrelid, true));
for (i = 0; i < natts; i++)
for (i = 0; i < indnkeyatts; i++)
{
char *val;
@ -362,7 +364,7 @@ systable_beginscan(Relation heapRelation,
{
int j;
for (j = 0; j < irel->rd_index->indnatts; j++)
for (j = 0; j < IndexRelationGetNumberOfAttributes(irel); j++)
{
if (key[i].sk_attno == irel->rd_index->indkey.values[j])
{
@ -370,7 +372,7 @@ systable_beginscan(Relation heapRelation,
break;
}
}
if (j == irel->rd_index->indnatts)
if (j == IndexRelationGetNumberOfAttributes(irel))
elog(ERROR, "column is not in index");
}
@ -564,7 +566,7 @@ systable_beginscan_ordered(Relation heapRelation,
{
int j;
for (j = 0; j < indexRelation->rd_index->indnatts; j++)
for (j = 0; j < IndexRelationGetNumberOfAttributes(indexRelation); j++)
{
if (key[i].sk_attno == indexRelation->rd_index->indkey.values[j])
{
@ -572,7 +574,7 @@ systable_beginscan_ordered(Relation heapRelation,
break;
}
}
if (j == indexRelation->rd_index->indnatts)
if (j == IndexRelationGetNumberOfAttributes(indexRelation))
elog(ERROR, "column is not in index");
}

View File

@ -78,8 +78,6 @@ static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
static void _bt_checksplitloc(FindSplitData *state,
OffsetNumber firstoldonright, bool newitemonleft,
int dataitemstoleft, Size firstoldonrightsz);
static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
OffsetNumber itup_off);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
int keysz, ScanKey scankey);
static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
@ -108,18 +106,22 @@ _bt_doinsert(Relation rel, IndexTuple itup,
IndexUniqueCheck checkUnique, Relation heapRel)
{
bool is_unique = false;
int natts = rel->rd_rel->relnatts;
int indnkeyatts;
ScanKey itup_scankey;
BTStack stack;
Buffer buf;
OffsetNumber offset;
Assert(IndexRelationGetNumberOfAttributes(rel) != 0);
indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
Assert(indnkeyatts != 0);
/* we need an insertion scan key to do our search, so build one */
itup_scankey = _bt_mkscankey(rel, itup);
top:
/* find the first page containing this key */
stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
stack = _bt_search(rel, indnkeyatts, itup_scankey, false, &buf, BT_WRITE);
offset = InvalidOffsetNumber;
@ -134,7 +136,7 @@ top:
* move right in the tree. See Lehman and Yao for an excruciatingly
* precise description.
*/
buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
buf = _bt_moveright(rel, buf, indnkeyatts, itup_scankey, false,
true, stack, BT_WRITE);
/*
@ -163,7 +165,7 @@ top:
TransactionId xwait;
uint32 speculativeToken;
offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
offset = _bt_binsrch(rel, buf, indnkeyatts, itup_scankey, false);
xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
checkUnique, &is_unique, &speculativeToken);
@ -199,7 +201,7 @@ top:
*/
CheckForSerializableConflictIn(rel, NULL, buf);
/* do the insertion */
_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
_bt_findinsertloc(rel, &buf, &offset, indnkeyatts, itup_scankey, itup,
stack, heapRel);
_bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
}
@ -242,7 +244,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
uint32 *speculativeToken)
{
TupleDesc itupdesc = RelationGetDescr(rel);
int natts = rel->rd_rel->relnatts;
int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
SnapshotData SnapshotDirty;
OffsetNumber maxoff;
Page page;
@ -301,7 +303,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
* in real comparison, but only for ordering/finding items on
* pages. - vadim 03/24/97
*/
if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
if (!_bt_isequal(itupdesc, page, offset, indnkeyatts, itup_scankey))
break; /* we're past all the equal tuples */
/* okay, we gotta fetch the heap tuple ... */
@ -465,7 +467,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
if (P_RIGHTMOST(opaque))
break;
if (!_bt_isequal(itupdesc, page, P_HIKEY,
natts, itup_scankey))
indnkeyatts, itup_scankey))
break;
/* Advance to next non-dead page --- there must be one */
for (;;)
@ -980,6 +982,9 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
OffsetNumber i;
bool isroot;
bool isleaf;
IndexTuple lefthikey;
int indnatts = IndexRelationGetNumberOfAttributes(rel);
int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
/* Acquire a new page to split into */
rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
@ -1080,7 +1085,22 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
itemsz = ItemIdGetLength(itemid);
item = (IndexTuple) PageGetItem(origpage, itemid);
}
if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
/*
* We must truncate the "high key" item before inserting it onto the leaf page.
* This is the only point in the insertion process where we perform truncation.
* All other functions work with this high key and do not change it.
*/
if (indnatts != indnkeyatts && P_ISLEAF(lopaque))
{
lefthikey = index_truncate_tuple(rel, item);
itemsz = IndexTupleSize(lefthikey);
itemsz = MAXALIGN(itemsz);
}
else
lefthikey = item;
if (PageAddItem(leftpage, (Item) lefthikey, itemsz, leftoff,
false, false) == InvalidOffsetNumber)
{
memset(rightpage, 0, BufferGetPageSize(rbuf));
@ -1969,6 +1989,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
itemid = PageGetItemId(lpage, P_HIKEY);
right_item_sz = ItemIdGetLength(itemid);
item = (IndexTuple) PageGetItem(lpage, itemid);
right_item = CopyIndexTuple(item);
ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);
@ -2086,7 +2107,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
* we insert the tuples in order, so that the given itup_off does
* represent the final position of the tuple!
*/
static bool
bool
_bt_pgaddtup(Page page,
Size itemsize,
IndexTuple itup,

View File

@ -1254,8 +1254,9 @@ _bt_pagedel(Relation rel, Buffer buf)
/* we need an insertion scan key for the search, so build one */
itup_scankey = _bt_mkscankey(rel, targetkey);
/* find the leftmost leaf page containing this key */
stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey,
false, &lbuf, BT_READ);
stack = _bt_search(rel,
IndexRelationGetNumberOfKeyAttributes(rel),
itup_scankey, false, &lbuf, BT_READ);
/* don't need a pin on the page */
_bt_relbuf(rel, lbuf);

View File

@ -97,6 +97,7 @@ bthandler(PG_FUNCTION_ARGS)
amroutine->amstorage = false;
amroutine->amclusterable = true;
amroutine->ampredlocks = true;
amroutine->amcaninclude = true;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = btbuild;

View File

@ -431,6 +431,8 @@ _bt_compare(Relation rel,
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
Assert (keysz <= rel->rd_index->indnkeyatts);
/*
* The scan key is set up with the attribute number associated with each
* term in the key. It is important that, if the index is multi-key, the

View File

@ -456,6 +456,9 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
OffsetNumber last_off;
Size pgspc;
Size itupsz;
BTPageOpaque pageop;
int indnatts = IndexRelationGetNumberOfAttributes(wstate->index);
int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(wstate->index);
/*
* This is a handy place to check for cancel interrupts during the btree
@ -510,6 +513,8 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
ItemId ii;
ItemId hii;
IndexTuple oitup;
IndexTuple keytup;
BTPageOpaque opageop = (BTPageOpaque) PageGetSpecialPointer(opage);
/* Create new page of same level */
npage = _bt_blnewpage(state->btps_level);
@ -537,6 +542,28 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
ItemIdSetUnused(ii); /* redundant */
((PageHeader) opage)->pd_lower -= sizeof(ItemIdData);
if (indnkeyatts != indnatts && P_ISLEAF(opageop))
{
/*
* It's essential to truncate High key here.
* The purpose is not just to save more space on this particular page,
* but to keep whole b-tree structure consistent. Subsequent insertions
* assume that hikey is already truncated, and so they should not
* worry about it, when copying the high key into the parent page
* as a downlink.
* NOTE: This is not crucial for reliability at present,
* but it may become so in the future.
*/
keytup = index_truncate_tuple(wstate->index, oitup);
/* delete "wrong" high key, insert keytup as P_HIKEY. */
PageIndexTupleDelete(opage, P_HIKEY);
if (!_bt_pgaddtup(opage, IndexTupleSize(keytup), keytup, P_HIKEY))
elog(ERROR, "failed to rewrite compressed item in index \"%s\"",
RelationGetRelationName(wstate->index));
}
/*
* Link the old page into its parent, using its minimum key. If we
* don't have a parent, we have to create one; this adds a new btree
@ -554,8 +581,15 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
* Save a copy of the minimum key for the new page. We have to copy
* it off the old page, not the new one, in case we are not at leaf
* level.
* If the tuple contains non-key attributes, truncate them.
* We perform truncation only for leaf pages,
* because all tuples on inner pages will already have been
* truncated by the time we handle them.
*/
state->btps_minkey = CopyIndexTuple(oitup);
if (indnkeyatts != indnatts && P_ISLEAF(opageop))
state->btps_minkey = index_truncate_tuple(wstate->index, oitup);
else
state->btps_minkey = CopyIndexTuple(oitup);
/*
* Set the sibling links for both pages.
@ -581,6 +615,7 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
last_off = P_FIRSTKEY;
}
pageop = (BTPageOpaque) PageGetSpecialPointer(npage);
/*
* If the new item is the first for its page, stash a copy for later. Note
* this will only happen for the first item on a level; on later pages,
@ -590,7 +625,14 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
if (last_off == P_HIKEY)
{
Assert(state->btps_minkey == NULL);
state->btps_minkey = CopyIndexTuple(itup);
/*
* Truncate the tuple that we're going to insert
* into the parent page as a downlink
*/
if (indnkeyatts != indnatts && P_ISLEAF(pageop))
state->btps_minkey = index_truncate_tuple(wstate->index, itup);
else
state->btps_minkey = CopyIndexTuple(itup);
}
/*
@ -685,7 +727,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
load1;
TupleDesc tupdes = RelationGetDescr(wstate->index);
int i,
keysz = RelationGetNumberOfAttributes(wstate->index);
keysz = IndexRelationGetNumberOfKeyAttributes(wstate->index);
ScanKey indexScanKey = NULL;
SortSupport sortKeys;

View File

@ -63,17 +63,26 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
{
ScanKey skey;
TupleDesc itupdesc;
int natts;
int indnatts,
indnkeyatts;
int16 *indoption;
int i;
itupdesc = RelationGetDescr(rel);
natts = RelationGetNumberOfAttributes(rel);
indnatts = IndexRelationGetNumberOfAttributes(rel);
indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
indoption = rel->rd_indoption;
skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
Assert(indnkeyatts != 0);
Assert(indnkeyatts <= indnatts);
for (i = 0; i < natts; i++)
/*
* We'll execute search using ScanKey constructed on key columns.
* Non key (included) columns must be omitted.
*/
skey = (ScanKey) palloc(indnkeyatts * sizeof(ScanKeyData));
for (i = 0; i < indnkeyatts; i++)
{
FmgrInfo *procinfo;
Datum arg;
@ -115,16 +124,16 @@ ScanKey
_bt_mkscankey_nodata(Relation rel)
{
ScanKey skey;
int natts;
int indnkeyatts;
int16 *indoption;
int i;
natts = RelationGetNumberOfAttributes(rel);
indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
indoption = rel->rd_indoption;
skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
skey = (ScanKey) palloc(indnkeyatts * sizeof(ScanKeyData));
for (i = 0; i < natts; i++)
for (i = 0; i < indnkeyatts; i++)
{
FmgrInfo *procinfo;
int flags;

View File

@ -48,6 +48,7 @@ spghandler(PG_FUNCTION_ARGS)
amroutine->amstorage = false;
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
amroutine->amcaninclude = false;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = spgbuild;