1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-30 11:03:19 +03:00

Indexes with INCLUDE columns and their support in B-tree

This patch introduces INCLUDE clause to index definition.  This clause
specifies a list of columns which will be included as a non-key part in
the index.  The INCLUDE columns exist solely to allow more queries to
benefit from index-only scans.  Also, such columns don't need to have
appropriate operator classes.  Expressions are not supported as INCLUDE
columns since they cannot be used in index-only scans.

Index access methods supporting INCLUDE are indicated by amcaninclude flag
in IndexAmRoutine.  For now, only B-tree indexes support INCLUDE clause.

In B-tree indexes INCLUDE columns are truncated from pivot index tuples
(tuples located in non-leaf pages and high keys).  Therefore, B-tree indexes
now might have variable number of attributes.  This patch also provides
generic facility to support that: pivot tuples contain number of their
attributes in t_tid.ip_posid.  Free 13th bit of t_info is used for indicating
that.  This facility will simplify further support of index suffix truncation.
The changes above are backward-compatible; pg_upgrade doesn't need special
handling of B-tree indexes for them.

Bump catalog version

Author: Anastasia Lubennikova with contribution by Alexander Korotkov and me
Reviewed by: Peter Geoghegan, Tomas Vondra, Antonin Houska, Jeff Janes,
			 David Rowley, Alexander Korotkov
Discussion: https://www.postgresql.org/message-id/flat/56168952.4010101@postgrespro.ru
This commit is contained in:
Teodor Sigaev
2018-04-07 23:00:39 +03:00
parent 01bb85169a
commit 8224de4f42
89 changed files with 2112 additions and 467 deletions

View File

@ -97,6 +97,7 @@ brinhandler(PG_FUNCTION_ARGS)
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
amroutine->amcanparallel = false;
amroutine->amcaninclude = false;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = brinbuild;

View File

@ -19,6 +19,7 @@
#include "access/heapam.h"
#include "access/itup.h"
#include "access/tuptoaster.h"
#include "utils/rel.h"
/* ----------------------------------------------------------------
@ -445,3 +446,33 @@ CopyIndexTuple(IndexTuple source)
memcpy(result, source, size);
return result;
}
/*
 * index_truncate_tuple
 *
 * Truncate trailing attributes from the given index tuple, leaving it with
 * new_indnatts attributes.
 *
 * tupleDescriptor describes olditup.  new_indnatts must be greater than zero
 * and strictly smaller than tupleDescriptor->natts (both are asserted).
 *
 * Returns the tuple built by index_form_tuple() from the leading
 * new_indnatts attributes of olditup; olditup's t_tid is copied into it.
 */
IndexTuple
index_truncate_tuple(TupleDesc tupleDescriptor, IndexTuple olditup,
					 int new_indnatts)
{
	TupleDesc	itupdesc = CreateTupleDescCopyConstr(tupleDescriptor);
	Datum		values[INDEX_MAX_KEYS];
	bool		isnull[INDEX_MAX_KEYS];
	IndexTuple	newitup;

	/*
	 * Read tupleDescriptor->natts directly instead of caching it in a local:
	 * the value is needed only by these assertions, and a local would be an
	 * unused variable in builds where Assert() compiles to nothing.
	 */
	Assert(tupleDescriptor->natts <= INDEX_MAX_KEYS);
	Assert(new_indnatts > 0);
	Assert(new_indnatts < tupleDescriptor->natts);

	index_deform_tuple(olditup, tupleDescriptor, values, isnull);

	/*
	 * Form the new tuple from a scratch copy of the descriptor whose
	 * attribute count has been reduced, so only the leading new_indnatts
	 * entries of values[]/isnull[] are used.
	 */
	itupdesc->natts = new_indnatts;
	newitup = index_form_tuple(itupdesc, values, isnull);
	newitup->t_tid = olditup->t_tid;

	FreeTupleDesc(itupdesc);

	/* dropping attributes must never make the tuple larger */
	Assert(IndexTupleSize(newitup) <= IndexTupleSize(olditup));

	return newitup;
}

View File

@ -52,6 +52,7 @@ ginhandler(PG_FUNCTION_ARGS)
amroutine->amclusterable = false;
amroutine->ampredlocks = true;
amroutine->amcanparallel = false;
amroutine->amcaninclude = false;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = ginbuild;

View File

@ -74,6 +74,7 @@ gisthandler(PG_FUNCTION_ARGS)
amroutine->amclusterable = true;
amroutine->ampredlocks = true;
amroutine->amcanparallel = false;
amroutine->amcaninclude = false;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = gistbuild;

View File

@ -70,6 +70,7 @@ hashhandler(PG_FUNCTION_ARGS)
amroutine->amclusterable = false;
amroutine->ampredlocks = true;
amroutine->amcanparallel = false;
amroutine->amcaninclude = false;
amroutine->amkeytype = INT4OID;
amroutine->ambuild = hashbuild;

View File

@ -8023,7 +8023,6 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *
TupleDesc desc = RelationGetDescr(relation);
Oid replidindex;
Relation idx_rel;
TupleDesc idx_desc;
char replident = relation->rd_rel->relreplident;
HeapTuple key_tuple = NULL;
bool nulls[MaxHeapAttributeNumber];
@ -8066,7 +8065,6 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *
}
idx_rel = RelationIdGetRelation(replidindex);
idx_desc = RelationGetDescr(idx_rel);
/* deform tuple, so we have fast access to columns */
heap_deform_tuple(tp, desc, values, nulls);
@ -8078,7 +8076,7 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *
* Now set all columns contained in the index to NOT NULL, they cannot
* currently be NULL.
*/
for (natt = 0; natt < idx_desc->natts; natt++)
for (natt = 0; natt < IndexRelationGetNumberOfKeyAttributes(idx_rel); natt++)
{
int attno = idx_rel->rd_index->indkey.values[natt];

View File

@ -158,7 +158,8 @@ IndexScanEnd(IndexScanDesc scan)
*
* Construct a string describing the contents of an index entry, in the
* form "(key_name, ...)=(key_value, ...)". This is currently used
* for building unique-constraint and exclusion-constraint error messages.
* for building unique-constraint and exclusion-constraint error messages,
* so only key columns of the index are checked and printed.
*
* Note that if the user does not have permissions to view all of the
* columns involved then a NULL is returned. Returning a partial key seems
@ -180,13 +181,15 @@ BuildIndexValueDescription(Relation indexRelation,
StringInfoData buf;
Form_pg_index idxrec;
HeapTuple ht_idx;
int natts = indexRelation->rd_rel->relnatts;
int indnkeyatts;
int i;
int keyno;
Oid indexrelid = RelationGetRelid(indexRelation);
Oid indrelid;
AclResult aclresult;
indnkeyatts = IndexRelationGetNumberOfKeyAttributes(indexRelation);
/*
* Check permissions- if the user does not have access to view all of the
* key columns then return NULL to avoid leaking data.
@ -224,7 +227,7 @@ BuildIndexValueDescription(Relation indexRelation,
* No table-level access, so step through the columns in the index and
* make sure the user has SELECT rights on all of them.
*/
for (keyno = 0; keyno < idxrec->indnatts; keyno++)
for (keyno = 0; keyno < idxrec->indnkeyatts; keyno++)
{
AttrNumber attnum = idxrec->indkey.values[keyno];
@ -250,7 +253,7 @@ BuildIndexValueDescription(Relation indexRelation,
appendStringInfo(&buf, "(%s)=(",
pg_get_indexdef_columns(indexrelid, true));
for (i = 0; i < natts; i++)
for (i = 0; i < indnkeyatts; i++)
{
char *val;
@ -368,7 +371,7 @@ systable_beginscan(Relation heapRelation,
{
int j;
for (j = 0; j < irel->rd_index->indnatts; j++)
for (j = 0; j < IndexRelationGetNumberOfAttributes(irel); j++)
{
if (key[i].sk_attno == irel->rd_index->indkey.values[j])
{
@ -376,7 +379,7 @@ systable_beginscan(Relation heapRelation,
break;
}
}
if (j == irel->rd_index->indnatts)
if (j == IndexRelationGetNumberOfAttributes(irel))
elog(ERROR, "column is not in index");
}
@ -570,7 +573,7 @@ systable_beginscan_ordered(Relation heapRelation,
{
int j;
for (j = 0; j < indexRelation->rd_index->indnatts; j++)
for (j = 0; j < IndexRelationGetNumberOfAttributes(indexRelation); j++)
{
if (key[i].sk_attno == indexRelation->rd_index->indkey.values[j])
{
@ -578,7 +581,7 @@ systable_beginscan_ordered(Relation heapRelation,
break;
}
}
if (j == indexRelation->rd_index->indnatts)
if (j == IndexRelationGetNumberOfAttributes(indexRelation))
elog(ERROR, "column is not in index");
}

View File

@ -590,6 +590,23 @@ original search scankey is consulted as each index entry is sequentially
scanned to decide whether to return the entry and whether the scan can
stop (see _bt_checkkeys()).
We use the term "pivot" index tuples to distinguish tuples which don't point
to heap tuples, but are rather used for tree navigation. Pivot tuples include
all tuples on non-leaf pages and high keys on leaf pages. Note that pivot
index tuples are only used to represent which part of the key space belongs
on each page, and can have attribute values copied from non-pivot tuples
that were deleted and killed by VACUUM some time ago. In principle, we could
truncate away attributes that are not needed for a page high key during a leaf
page split, provided that the remaining attributes distinguish the last index
tuple on the post-split left page as belonging on the left page, and the first
index tuple on the post-split right page as belonging on the right page. This
optimization is sometimes called suffix truncation, and may appear in a future
release. Since the high key is subsequently reused as the downlink in the
parent page for the new right page, suffix truncation can increase index
fan-out considerably by keeping pivot tuples short. INCLUDE indexes similarly
truncate away non-key attributes at the time of a leaf page split,
increasing fan-out.
Notes About Data Representation
-------------------------------

View File

@ -82,7 +82,7 @@ static void _bt_checksplitloc(FindSplitData *state,
int dataitemstoleft, Size firstoldonrightsz);
static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
OffsetNumber itup_off);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
static bool _bt_isequal(Relation idxrel, Page page, OffsetNumber offnum,
int keysz, ScanKey scankey);
static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
@ -109,13 +109,16 @@ _bt_doinsert(Relation rel, IndexTuple itup,
IndexUniqueCheck checkUnique, Relation heapRel)
{
bool is_unique = false;
int natts = rel->rd_rel->relnatts;
int indnkeyatts;
ScanKey itup_scankey;
BTStack stack = NULL;
Buffer buf;
OffsetNumber offset;
bool fastpath;
indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
Assert(indnkeyatts != 0);
/* we need an insertion scan key to do our search, so build one */
itup_scankey = _bt_mkscankey(rel, itup);
@ -173,12 +176,12 @@ top:
* page.
*/
if (P_ISLEAF(lpageop) && P_RIGHTMOST(lpageop) &&
!P_INCOMPLETE_SPLIT(lpageop) &&
!P_IGNORE(lpageop) &&
(PageGetFreeSpace(page) > itemsz) &&
PageGetMaxOffsetNumber(page) >= P_FIRSTDATAKEY(lpageop) &&
_bt_compare(rel, natts, itup_scankey, page,
P_FIRSTDATAKEY(lpageop)) > 0)
!P_INCOMPLETE_SPLIT(lpageop) &&
!P_IGNORE(lpageop) &&
(PageGetFreeSpace(page) > itemsz) &&
PageGetMaxOffsetNumber(page) >= P_FIRSTDATAKEY(lpageop) &&
_bt_compare(rel, indnkeyatts, itup_scankey, page,
P_FIRSTDATAKEY(lpageop)) > 0)
{
fastpath = true;
}
@ -209,7 +212,7 @@ top:
if (!fastpath)
{
/* find the first page containing this key */
stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE,
stack = _bt_search(rel, indnkeyatts, itup_scankey, false, &buf, BT_WRITE,
NULL);
/* trade in our read lock for a write lock */
@ -223,7 +226,7 @@ top:
* need to move right in the tree. See Lehman and Yao for an
* excruciatingly precise description.
*/
buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
buf = _bt_moveright(rel, buf, indnkeyatts, itup_scankey, false,
true, stack, BT_WRITE, NULL);
}
@ -253,7 +256,7 @@ top:
TransactionId xwait;
uint32 speculativeToken;
offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
offset = _bt_binsrch(rel, buf, indnkeyatts, itup_scankey, false);
xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
checkUnique, &is_unique, &speculativeToken);
@ -287,10 +290,12 @@ top:
* actual location of the insert is hard to predict because of the
* random search used to prevent O(N^2) performance when there are
* many duplicate entries, we can just use the "first valid" page.
* This reasoning also applies to INCLUDE indexes, whose extra
* attributes are not considered part of the key space.
*/
CheckForSerializableConflictIn(rel, NULL, buf);
/* do the insertion */
_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
_bt_findinsertloc(rel, &buf, &offset, indnkeyatts, itup_scankey, itup,
stack, heapRel);
_bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
}
@ -333,8 +338,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
IndexUniqueCheck checkUnique, bool *is_unique,
uint32 *speculativeToken)
{
TupleDesc itupdesc = RelationGetDescr(rel);
int natts = rel->rd_rel->relnatts;
int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
SnapshotData SnapshotDirty;
OffsetNumber maxoff;
Page page;
@ -393,7 +397,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
* in real comparison, but only for ordering/finding items on
* pages. - vadim 03/24/97
*/
if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
if (!_bt_isequal(rel, page, offset, indnkeyatts, itup_scankey))
break; /* we're past all the equal tuples */
/* okay, we gotta fetch the heap tuple ... */
@ -557,8 +561,8 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
/* If scankey == hikey we gotta check the next page too */
if (P_RIGHTMOST(opaque))
break;
if (!_bt_isequal(itupdesc, page, P_HIKEY,
natts, itup_scankey))
if (!_bt_isequal(rel, page, P_HIKEY,
indnkeyatts, itup_scankey))
break;
/* Advance to next non-dead page --- there must be one */
for (;;)
@ -1087,6 +1091,9 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
OffsetNumber maxoff;
OffsetNumber i;
bool isleaf;
IndexTuple lefthikey;
int indnatts = IndexRelationGetNumberOfAttributes(rel);
int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
/* Acquire a new page to split into */
rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
@ -1186,7 +1193,23 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
itemsz = ItemIdGetLength(itemid);
item = (IndexTuple) PageGetItem(origpage, itemid);
}
if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
/*
* We must truncate included attributes of the "high key" item before
* inserting it onto the leaf page. This is the only point in the insertion
* process where we perform truncation. All other functions work with
* this high key and do not change it.
*/
if (indnatts != indnkeyatts && isleaf)
{
lefthikey = _bt_truncate_tuple(rel, item);
itemsz = IndexTupleSize(lefthikey);
itemsz = MAXALIGN(itemsz);
}
else
lefthikey = item;
if (PageAddItem(leftpage, (Item) lefthikey, itemsz, leftoff,
false, false) == InvalidOffsetNumber)
{
memset(rightpage, 0, BufferGetPageSize(rbuf));
@ -1375,6 +1398,7 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
xl_btree_split xlrec;
uint8 xlinfo;
XLogRecPtr recptr;
bool loglhikey = false;
xlrec.level = ropaque->btpo.level;
xlrec.firstright = firstright;
@ -1404,18 +1428,20 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
XLogRegisterBufData(0, (char *) newitem, MAXALIGN(newitemsz));
/* Log left page */
if (!isleaf)
if (!isleaf || indnatts != indnkeyatts)
{
/*
* We must also log the left page's high key, because the right
* page's leftmost key is suppressed on non-leaf levels. Show it
* as belonging to the left page buffer, so that it is not stored
* if XLogInsert decides it needs a full-page image of the left
* page.
* We must also log the left page's high key. There are two
* reasons for that: right page's leftmost key is suppressed on
* non-leaf levels and in covering indexes included columns are
* truncated from high keys. Show it as belonging to the left
* page buffer, so that it is not stored if XLogInsert decides it
* needs a full-page image of the left page.
*/
itemid = PageGetItemId(origpage, P_HIKEY);
item = (IndexTuple) PageGetItem(origpage, itemid);
XLogRegisterBufData(0, (char *) item, MAXALIGN(IndexTupleSize(item)));
loglhikey = true;
}
/*
@ -1434,7 +1460,9 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
(char *) rightpage + ((PageHeader) rightpage)->pd_upper,
((PageHeader) rightpage)->pd_special - ((PageHeader) rightpage)->pd_upper);
xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
xlinfo = newitemonleft ?
(loglhikey ? XLOG_BTREE_SPLIT_L_HIGHKEY : XLOG_BTREE_SPLIT_L) :
(loglhikey ? XLOG_BTREE_SPLIT_R_HIGHKEY : XLOG_BTREE_SPLIT_R);
recptr = XLogInsert(RM_BTREE_ID, xlinfo);
PageSetLSN(origpage, recptr);
@ -1664,7 +1692,12 @@ _bt_checksplitloc(FindSplitData *state,
/*
* The first item on the right page becomes the high key of the left page;
* therefore it counts against left space as well as right space.
* therefore it counts against left space as well as right space. When
* index has included attributes, then those attributes of left page high
* key will be truncated, leaving that page with slightly more free space.
* However, that shouldn't affect our ability to find a valid split
* location, because a split location should exist even without high
* key truncation.
*/
leftfree -= firstrightitemsz;
@ -1787,18 +1820,18 @@ _bt_insert_parent(Relation rel,
stack = &fakestack;
stack->bts_blkno = BufferGetBlockNumber(pbuf);
stack->bts_offset = InvalidOffsetNumber;
/* bts_btentry will be initialized below */
stack->bts_btentry = InvalidBlockNumber;
stack->bts_parent = NULL;
_bt_relbuf(rel, pbuf);
}
/* get high key from left page == lowest key on new right page */
/* get high key from left page == lower bound for new right page */
ritem = (IndexTuple) PageGetItem(page,
PageGetItemId(page, P_HIKEY));
/* form an index tuple that points at the new right page */
new_item = CopyIndexTuple(ritem);
ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY);
BTreeInnerTupleSetDownLink(new_item, rbknum);
/*
* Find the parent buffer and get the parent page.
@ -1807,7 +1840,7 @@ _bt_insert_parent(Relation rel,
* want to find parent pointing to where we are, right ? - vadim
* 05/27/97
*/
ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
stack->bts_btentry = bknum;
pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
/*
@ -1962,7 +1995,8 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access)
{
itemid = PageGetItemId(page, offnum);
item = (IndexTuple) PageGetItem(page, itemid);
if (BTEntrySame(item, &stack->bts_btentry))
if (BTreeInnerTupleGetDownLink(item) == stack->bts_btentry)
{
/* Return accurate pointer to where link is now */
stack->bts_blkno = blkno;
@ -1977,7 +2011,8 @@ _bt_getstackbuf(Relation rel, BTStack stack, int access)
{
itemid = PageGetItemId(page, offnum);
item = (IndexTuple) PageGetItem(page, itemid);
if (BTEntrySame(item, &stack->bts_btentry))
if (BTreeInnerTupleGetDownLink(item) == stack->bts_btentry)
{
/* Return accurate pointer to where link is now */
stack->bts_blkno = blkno;
@ -2067,7 +2102,8 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
left_item_sz = sizeof(IndexTupleData);
left_item = (IndexTuple) palloc(left_item_sz);
left_item->t_info = left_item_sz;
ItemPointerSet(&(left_item->t_tid), lbkno, P_HIKEY);
BTreeInnerTupleSetDownLink(left_item, lbkno);
BTreeTupSetNAtts(left_item, 0);
/*
* Create downlink item for right page. The key for it is obtained from
@ -2077,7 +2113,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
right_item_sz = ItemIdGetLength(itemid);
item = (IndexTuple) PageGetItem(lpage, itemid);
right_item = CopyIndexTuple(item);
ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);
BTreeInnerTupleSetDownLink(right_item, rbkno);
/* NO EREPORT(ERROR) from here till newroot op is logged */
START_CRIT_SECTION();
@ -2208,6 +2244,7 @@ _bt_pgaddtup(Page page,
{
trunctuple = *itup;
trunctuple.t_info = sizeof(IndexTupleData);
BTreeTupSetNAtts(&trunctuple, 0);
itup = &trunctuple;
itemsize = sizeof(IndexTupleData);
}
@ -2226,9 +2263,10 @@ _bt_pgaddtup(Page page,
* Rule is simple: NOT_NULL not equal NULL, NULL not equal NULL too.
*/
static bool
_bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
_bt_isequal(Relation idxrel, Page page, OffsetNumber offnum,
int keysz, ScanKey scankey)
{
TupleDesc itupdesc = RelationGetDescr(idxrel);
IndexTuple itup;
int i;
@ -2237,6 +2275,17 @@ _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
/*
* Index tuple shouldn't be truncated. Although we technically could
* compare a truncated tuple as well, this function should only be called
* for regular non-truncated leaf tuples and the P_HIKEY tuple on
* the rightmost leaf page.
*/
Assert((P_RIGHTMOST((BTPageOpaque) PageGetSpecialPointer(page)) ||
offnum != P_HIKEY)
? BTreeTupGetNAtts(itup, idxrel) == itupdesc->natts
: true);
for (i = 1; i <= keysz; i++)
{
AttrNumber attno;

View File

@ -1143,7 +1143,7 @@ _bt_lock_branch_parent(Relation rel, BlockNumber child, BTStack stack,
* Locate the downlink of "child" in the parent (updating the stack entry
* if needed)
*/
ItemPointerSet(&(stack->bts_btentry.t_tid), child, P_HIKEY);
stack->bts_btentry = child;
pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
if (pbuf == InvalidBuffer)
elog(ERROR, "failed to re-find parent key in index \"%s\" for deletion target page %u",
@ -1414,8 +1414,9 @@ _bt_pagedel(Relation rel, Buffer buf)
/* we need an insertion scan key for the search, so build one */
itup_scankey = _bt_mkscankey(rel, targetkey);
/* find the leftmost leaf page containing this key */
stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey,
false, &lbuf, BT_READ, NULL);
stack = _bt_search(rel,
IndexRelationGetNumberOfKeyAttributes(rel),
itup_scankey, false, &lbuf, BT_READ, NULL);
/* don't need a pin on the page */
_bt_relbuf(rel, lbuf);
@ -1551,15 +1552,15 @@ _bt_mark_page_halfdead(Relation rel, Buffer leafbuf, BTStack stack)
#ifdef USE_ASSERT_CHECKING
itemid = PageGetItemId(page, topoff);
itup = (IndexTuple) PageGetItem(page, itemid);
Assert(ItemPointerGetBlockNumber(&(itup->t_tid)) == target);
Assert(BTreeInnerTupleGetDownLink(itup) == target);
#endif
nextoffset = OffsetNumberNext(topoff);
itemid = PageGetItemId(page, nextoffset);
itup = (IndexTuple) PageGetItem(page, itemid);
if (ItemPointerGetBlockNumber(&(itup->t_tid)) != rightsib)
if (BTreeInnerTupleGetDownLink(itup) != rightsib)
elog(ERROR, "right sibling %u of block %u is not next child %u of block %u in index \"%s\"",
rightsib, target, ItemPointerGetBlockNumber(&(itup->t_tid)),
rightsib, target, BTreeInnerTupleGetDownLink(itup),
BufferGetBlockNumber(topparent), RelationGetRelationName(rel));
/*
@ -1582,7 +1583,7 @@ _bt_mark_page_halfdead(Relation rel, Buffer leafbuf, BTStack stack)
itemid = PageGetItemId(page, topoff);
itup = (IndexTuple) PageGetItem(page, itemid);
ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);
BTreeInnerTupleSetDownLink(itup, rightsib);
nextoffset = OffsetNumberNext(topoff);
PageIndexTupleDelete(page, nextoffset);
@ -1601,7 +1602,7 @@ _bt_mark_page_halfdead(Relation rel, Buffer leafbuf, BTStack stack)
MemSet(&trunctuple, 0, sizeof(IndexTupleData));
trunctuple.t_info = sizeof(IndexTupleData);
if (target != leafblkno)
ItemPointerSet(&trunctuple.t_tid, target, P_HIKEY);
ItemPointerSetBlockNumber(&trunctuple.t_tid, target);
else
ItemPointerSetInvalid(&trunctuple.t_tid);
if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
@ -1713,7 +1714,7 @@ _bt_unlink_halfdead_page(Relation rel, Buffer leafbuf, bool *rightsib_empty)
*/
if (ItemPointerIsValid(leafhikey))
{
target = ItemPointerGetBlockNumber(leafhikey);
target = ItemPointerGetBlockNumberNoCheck(leafhikey);
Assert(target != leafblkno);
/* fetch the block number of the topmost parent's left sibling */
@ -1829,7 +1830,7 @@ _bt_unlink_halfdead_page(Relation rel, Buffer leafbuf, bool *rightsib_empty)
/* remember the next non-leaf child down in the branch. */
itemid = PageGetItemId(page, P_FIRSTDATAKEY(opaque));
nextchild = ItemPointerGetBlockNumber(&((IndexTuple) PageGetItem(page, itemid))->t_tid);
nextchild = BTreeInnerTupleGetDownLink((IndexTuple) PageGetItem(page, itemid));
if (nextchild == leafblkno)
nextchild = InvalidBlockNumber;
}
@ -1920,7 +1921,7 @@ _bt_unlink_halfdead_page(Relation rel, Buffer leafbuf, bool *rightsib_empty)
if (nextchild == InvalidBlockNumber)
ItemPointerSetInvalid(leafhikey);
else
ItemPointerSet(leafhikey, nextchild, P_HIKEY);
ItemPointerSetBlockNumber(leafhikey, nextchild);
}
/*

View File

@ -121,6 +121,7 @@ bthandler(PG_FUNCTION_ARGS)
amroutine->amclusterable = true;
amroutine->ampredlocks = true;
amroutine->amcanparallel = true;
amroutine->amcaninclude = true;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = btbuild;

View File

@ -147,7 +147,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
offnum = _bt_binsrch(rel, *bufP, keysz, scankey, nextkey);
itemid = PageGetItemId(page, offnum);
itup = (IndexTuple) PageGetItem(page, itemid);
blkno = ItemPointerGetBlockNumber(&(itup->t_tid));
blkno = BTreeInnerTupleGetDownLink(itup);
par_blkno = BufferGetBlockNumber(*bufP);
/*
@ -163,7 +163,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
new_stack = (BTStack) palloc(sizeof(BTStackData));
new_stack->bts_blkno = par_blkno;
new_stack->bts_offset = offnum;
memcpy(&new_stack->bts_btentry, itup, sizeof(IndexTupleData));
new_stack->bts_btentry = blkno;
new_stack->bts_parent = stack_in;
/* drop the read lock on the parent page, acquire one on the child */
@ -436,6 +436,15 @@ _bt_compare(Relation rel,
IndexTuple itup;
int i;
/*
* Check tuple has correct number of attributes.
*/
if (unlikely(!_bt_check_natts(rel, page, offnum)))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("tuple has wrong number of attributes in index \"%s\"",
RelationGetRelationName(rel))));
/*
* Force result ">" if target item is first data item on an internal page
* --- see NOTE above.
@ -1833,7 +1842,7 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
offnum = P_FIRSTDATAKEY(opaque);
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
blkno = ItemPointerGetBlockNumber(&(itup->t_tid));
blkno = BTreeInnerTupleGetDownLink(itup);
buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
page = BufferGetPage(buf);
@ -1959,3 +1968,51 @@ _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir)
so->numKilled = 0; /* just paranoia */
so->markItemIndex = -1; /* ditto */
}
/*
 * _bt_check_natts() -- check the attribute count of one index tuple.
 *
 * Returns true if the tuple stored at offnum on the given page has the
 * number of attributes expected for its position on the page, and false
 * otherwise.
 */
bool
_bt_check_natts(Relation index, Page page, OffsetNumber offnum)
{
	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
	IndexTuple	itup = (IndexTuple) PageGetItem(page,
												PageGetItemId(page, offnum));
	int16		natts = IndexRelationGetNumberOfAttributes(index);
	int16		nkeyatts = IndexRelationGetNumberOfKeyAttributes(index);

	/*
	 * The mask reserved for the per-tuple attribute count has to be wide
	 * enough to represent the maximum number of index keys.
	 */
	StaticAssertStmt(BT_N_KEYS_OFFSET_MASK >= INDEX_MAX_KEYS,
					 "BT_N_KEYS_OFFSET_MASK can't fit INDEX_MAX_KEYS");

	if (P_ISLEAF(opaque))
	{
		/* Regular leaf tuples carry every attribute of the index */
		if (offnum >= P_FIRSTDATAKEY(opaque))
			return (BTreeTupGetNAtts(itup, index) == natts);

		/* Otherwise this is the leaf page's high key: key attributes only */
		return (BTreeTupGetNAtts(itup, index) == nkeyatts);
	}

	if (offnum == P_FIRSTDATAKEY(opaque))
	{
		/*
		 * The leftmost tuple of a non-leaf page has no attributes at all,
		 * or, in pg_upgraded indexes, doesn't have INDEX_ALT_TID_MASK set.
		 */
		return (BTreeTupGetNAtts(itup, index) == 0 ||
				((itup->t_info & INDEX_ALT_TID_MASK) == 0));
	}

	/* Every other tuple on a non-leaf page holds only the key attributes */
	return (BTreeTupGetNAtts(itup, index) == nkeyatts);
}

View File

@ -752,6 +752,7 @@ _bt_sortaddtup(Page page,
{
trunctuple = *itup;
trunctuple.t_info = sizeof(IndexTupleData);
BTreeTupSetNAtts(&trunctuple, 0);
itup = &trunctuple;
itemsize = sizeof(IndexTupleData);
}
@ -802,6 +803,9 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
OffsetNumber last_off;
Size pgspc;
Size itupsz;
BTPageOpaque pageop;
int indnatts = IndexRelationGetNumberOfAttributes(wstate->index);
int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(wstate->index);
/*
* This is a handy place to check for cancel interrupts during the btree
@ -856,6 +860,8 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
ItemId ii;
ItemId hii;
IndexTuple oitup;
IndexTuple keytup;
BTPageOpaque opageop = (BTPageOpaque) PageGetSpecialPointer(opage);
/* Create new page of same level */
npage = _bt_blnewpage(state->btps_level);
@ -883,6 +889,29 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
ItemIdSetUnused(ii); /* redundant */
((PageHeader) opage)->pd_lower -= sizeof(ItemIdData);
if (indnkeyatts != indnatts && P_ISLEAF(opageop))
{
/*
* We truncate included attributes of high key here. Subsequent
* insertions assume that hikey is already truncated, and so they
* need not worry about it, when copying the high key into the
* parent page as a downlink.
*
* The code above has just rearranged item pointers, but it
* didn't save any space. In order to save space on the page we
* have to truly shift index tuples on the page. But that's not
* so bad for performance, because we are operating on pd_upper and
* don't have to shift much tuple memory. Shifting ItemIds is
* rather cheap, because they are small.
*/
keytup = _bt_truncate_tuple(wstate->index, oitup);
/* delete "wrong" high key, insert keytup as P_HIKEY. */
PageIndexTupleDelete(opage, P_HIKEY);
_bt_sortaddtup(opage, IndexTupleSize(keytup), keytup, P_HIKEY);
}
/*
* Link the old page into its parent, using its minimum key. If we
* don't have a parent, we have to create one; this adds a new btree
@ -892,15 +921,18 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
state->btps_next = _bt_pagestate(wstate, state->btps_level + 1);
Assert(state->btps_minkey != NULL);
ItemPointerSet(&(state->btps_minkey->t_tid), oblkno, P_HIKEY);
BTreeInnerTupleSetDownLink(state->btps_minkey, oblkno);
_bt_buildadd(wstate, state->btps_next, state->btps_minkey);
pfree(state->btps_minkey);
/*
* Save a copy of the minimum key for the new page. We have to copy
* it off the old page, not the new one, in case we are not at leaf
* level.
* level. Despite oitup is already initialized, it's important to get
* high key from the page, since we could have replaced it with
* truncated copy. See comment above.
*/
oitup = (IndexTuple) PageGetItem(opage, PageGetItemId(opage, P_HIKEY));
state->btps_minkey = CopyIndexTuple(oitup);
/*
@ -927,6 +959,8 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
last_off = P_FIRSTKEY;
}
pageop = (BTPageOpaque) PageGetSpecialPointer(npage);
/*
* If the new item is the first for its page, stash a copy for later. Note
* this will only happen for the first item on a level; on later pages,
@ -936,7 +970,15 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
if (last_off == P_HIKEY)
{
Assert(state->btps_minkey == NULL);
state->btps_minkey = CopyIndexTuple(itup);
/*
* Truncate included attributes of the tuple that we're going to
* insert into the parent page as a downlink
*/
if (indnkeyatts != indnatts && P_ISLEAF(pageop))
state->btps_minkey = _bt_truncate_tuple(wstate->index, itup);
else
state->btps_minkey = CopyIndexTuple(itup);
}
/*
@ -989,7 +1031,7 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
else
{
Assert(s->btps_minkey != NULL);
ItemPointerSet(&(s->btps_minkey->t_tid), blkno, P_HIKEY);
BTreeInnerTupleSetDownLink(s->btps_minkey, blkno);
_bt_buildadd(wstate, s->btps_next, s->btps_minkey);
pfree(s->btps_minkey);
s->btps_minkey = NULL;
@ -1029,7 +1071,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
bool load1;
TupleDesc tupdes = RelationGetDescr(wstate->index);
int i,
keysz = RelationGetNumberOfAttributes(wstate->index);
keysz = IndexRelationGetNumberOfKeyAttributes(wstate->index);
ScanKey indexScanKey = NULL;
SortSupport sortKeys;

View File

@ -63,17 +63,28 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
{
ScanKey skey;
TupleDesc itupdesc;
int natts;
int indnatts PG_USED_FOR_ASSERTS_ONLY;
int indnkeyatts;
int16 *indoption;
int i;
itupdesc = RelationGetDescr(rel);
natts = RelationGetNumberOfAttributes(rel);
indnatts = IndexRelationGetNumberOfAttributes(rel);
indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
indoption = rel->rd_indoption;
skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
Assert(indnkeyatts != 0);
Assert(indnkeyatts <= indnatts);
Assert(BTreeTupGetNAtts(itup, rel) == indnatts ||
BTreeTupGetNAtts(itup, rel) == indnkeyatts);
for (i = 0; i < natts; i++)
/*
* We'll execute search using ScanKey constructed on key columns. Non key
* (included) columns must be omitted.
*/
skey = (ScanKey) palloc(indnkeyatts * sizeof(ScanKeyData));
for (i = 0; i < indnkeyatts; i++)
{
FmgrInfo *procinfo;
Datum arg;
@ -115,16 +126,16 @@ ScanKey
_bt_mkscankey_nodata(Relation rel)
{
ScanKey skey;
int natts;
int indnkeyatts;
int16 *indoption;
int i;
natts = RelationGetNumberOfAttributes(rel);
indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
indoption = rel->rd_indoption;
skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
skey = (ScanKey) palloc(indnkeyatts * sizeof(ScanKeyData));
for (i = 0; i < natts; i++)
for (i = 0; i < indnkeyatts; i++)
{
FmgrInfo *procinfo;
int flags;
@ -2069,3 +2080,30 @@ btproperty(Oid index_oid, int attno,
return false; /* punt to generic code */
}
}
/*
 * _bt_truncate_tuple() -- strip the non-key (INCLUDE) attributes from an
 *		index tuple.
 *
 * Given an ordinary B-tree leaf index tuple, produce (via
 * index_truncate_tuple()) a pivot tuple that can serve as a high key or as a
 * non-leaf page tuple with a downlink.  Note that the offset part of the
 * result's t_tid is overwritten so that it represents the number of
 * attributes present in the tuple.
 */
IndexTuple
_bt_truncate_tuple(Relation idxrel, IndexTuple olditup)
{
	int			keycount;
	IndexTuple	pivot;

	/*
	 * Only regular leaf index tuples, i.e. ones that still carry both the
	 * key and the non-key attributes, are expected to be truncated here.
	 */
	Assert(IndexRelationGetNumberOfAttributes(idxrel) == BTreeTupGetNAtts(olditup, idxrel));

	keycount = IndexRelationGetNumberOfKeyAttributes(idxrel);
	pivot = index_truncate_tuple(RelationGetDescr(idxrel), olditup, keycount);

	/* Record the number of attributes kept in the truncated tuple. */
	BTreeTupSetNAtts(pivot, keycount);

	return pivot;
}

View File

@ -202,7 +202,7 @@ btree_xlog_insert(bool isleaf, bool ismeta, XLogReaderState *record)
}
static void
btree_xlog_split(bool onleft, XLogReaderState *record)
btree_xlog_split(bool onleft, bool lhighkey, XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
@ -248,11 +248,14 @@ btree_xlog_split(bool onleft, XLogReaderState *record)
_bt_restore_page(rpage, datapos, datalen);
/* Non-leaf page should always have its high key logged. */
Assert(isleaf || lhighkey);
/*
* On leaf level, the high key of the left page is equal to the first key
* on the right page.
* When the high key isn't present in the WAL record, then we assume it to
* be equal to the first key on the right page.
*/
if (isleaf)
if (!lhighkey)
{
ItemId hiItemId = PageGetItemId(rpage, P_FIRSTDATAKEY(ropaque));
@ -296,13 +299,14 @@ btree_xlog_split(bool onleft, XLogReaderState *record)
}
/* Extract left hikey and its size (assuming 16-bit alignment) */
if (!isleaf)
if (lhighkey)
{
left_hikey = (IndexTuple) datapos;
left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
datapos += left_hikeysz;
datalen -= left_hikeysz;
}
Assert(datalen == 0);
newlpage = PageGetTempPageCopySpecial(lpage);
@ -616,7 +620,7 @@ btree_xlog_delete_get_latestRemovedXid(XLogReaderState *record)
* heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer.
* Note that we are not looking at tuple data here, just headers.
*/
hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
hoffnum = ItemPointerGetOffsetNumberNoCheck(&(itup->t_tid));
hitemid = PageGetItemId(hpage, hoffnum);
/*
@ -764,11 +768,11 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
nextoffset = OffsetNumberNext(poffset);
itemid = PageGetItemId(page, nextoffset);
itup = (IndexTuple) PageGetItem(page, itemid);
rightsib = ItemPointerGetBlockNumber(&itup->t_tid);
rightsib = BTreeInnerTupleGetDownLink(itup);
itemid = PageGetItemId(page, poffset);
itup = (IndexTuple) PageGetItem(page, itemid);
ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);
BTreeInnerTupleSetDownLink(itup, rightsib);
nextoffset = OffsetNumberNext(poffset);
PageIndexTupleDelete(page, nextoffset);
@ -798,7 +802,7 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
MemSet(&trunctuple, 0, sizeof(IndexTupleData));
trunctuple.t_info = sizeof(IndexTupleData);
if (xlrec->topparent != InvalidBlockNumber)
ItemPointerSet(&trunctuple.t_tid, xlrec->topparent, P_HIKEY);
ItemPointerSetBlockNumber(&trunctuple.t_tid, xlrec->topparent);
else
ItemPointerSetInvalid(&trunctuple.t_tid);
if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
@ -908,7 +912,7 @@ btree_xlog_unlink_page(uint8 info, XLogReaderState *record)
MemSet(&trunctuple, 0, sizeof(IndexTupleData));
trunctuple.t_info = sizeof(IndexTupleData);
if (xlrec->topparent != InvalidBlockNumber)
ItemPointerSet(&trunctuple.t_tid, xlrec->topparent, P_HIKEY);
ItemPointerSetBlockNumber(&trunctuple.t_tid, xlrec->topparent);
else
ItemPointerSetInvalid(&trunctuple.t_tid);
if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY,
@ -1004,10 +1008,16 @@ btree_redo(XLogReaderState *record)
btree_xlog_insert(false, true, record);
break;
case XLOG_BTREE_SPLIT_L:
btree_xlog_split(true, record);
btree_xlog_split(true, false, record);
break;
case XLOG_BTREE_SPLIT_L_HIGHKEY:
btree_xlog_split(true, true, record);
break;
case XLOG_BTREE_SPLIT_R:
btree_xlog_split(false, record);
btree_xlog_split(false, false, record);
break;
case XLOG_BTREE_SPLIT_R_HIGHKEY:
btree_xlog_split(false, true, record);
break;
case XLOG_BTREE_VACUUM:
btree_xlog_vacuum(record);

View File

@ -35,6 +35,8 @@ btree_desc(StringInfo buf, XLogReaderState *record)
}
case XLOG_BTREE_SPLIT_L:
case XLOG_BTREE_SPLIT_R:
case XLOG_BTREE_SPLIT_L_HIGHKEY:
case XLOG_BTREE_SPLIT_R_HIGHKEY:
{
xl_btree_split *xlrec = (xl_btree_split *) rec;
@ -119,6 +121,12 @@ btree_identify(uint8 info)
case XLOG_BTREE_SPLIT_R:
id = "SPLIT_R";
break;
case XLOG_BTREE_SPLIT_L_HIGHKEY:
id = "SPLIT_L_HIGHKEY";
break;
case XLOG_BTREE_SPLIT_R_HIGHKEY:
id = "SPLIT_R_HIGHKEY";
break;
case XLOG_BTREE_VACUUM:
id = "VACUUM";
break;

View File

@ -50,6 +50,7 @@ spghandler(PG_FUNCTION_ARGS)
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
amroutine->amcanparallel = false;
amroutine->amcaninclude = false;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = spgbuild;