
Mark index entries "killed" when they are no longer visible to any
transaction, so as to avoid returning them out of the index AM.  Saves
repeated heap_fetch operations on frequently-updated rows.  Also detect
queries on unique keys (equality to all columns of a unique index), and
don't bother continuing scan once we have found first match.

Killing is implemented in the btree and hash AMs, but not yet in rtree
or gist, because there isn't an equally convenient place to do it in
those AMs (the outer amgetnext routine can't do it without re-pinning
the index page).

Did some small cleanup on APIs of HeapTupleSatisfies, heap_fetch, and
index_insert to make this a little easier.
Tom Lane
2002-05-24 18:57:57 +00:00
parent 2f2d05763d
commit 3f4d488022
30 changed files with 498 additions and 273 deletions
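In outline, the kill optimization has two halves. A condensed sketch follows, paraphrased from the indexam.c and nbtree.c/hash.c hunks below; declarations, the surrounding retry loop, and some error handling are omitted:

/*
 * Executor side, in index_getnext: when the heap tuple behind an index
 * entry fails our snapshot, check whether it is dead to *all*
 * transactions; if so, ask the index AM to stop returning it.
 */
if (!heap_fetch(scan->heapRelation, scan->xs_snapshot,
				heapTuple, &scan->xs_cbuf, true /* keep_buf */,
				&scan->xs_pgstat_info))
{
	LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE);
	if (HeapTupleSatisfiesVacuum(heapTuple->t_data, RecentGlobalXmin) ==
		HEAPTUPLE_DEAD)
		scan->kill_prior_tuple = true;	/* acted on at the next amgettuple call */
	LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
	ReleaseBuffer(scan->xs_cbuf);
	scan->xs_cbuf = InvalidBuffer;
}

/*
 * AM side, in btgettuple (hashgettuple is analogous): mark the
 * previously-returned item with LP_DELETE.  Like a commit-hint-bit
 * update, the buffer is dirtied but no WAL record is written.
 */
if (scan->kill_prior_tuple)
{
	offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
	page = BufferGetPage(so->btso_curbuf);
	PageGetItemId(page, offnum)->lp_flags |= LP_DELETE;
	SetBufferCommitInfoNeedsSave(so->btso_curbuf);
}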

src/backend/access/gist/gist.c

@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.92 2002/05/20 23:51:40 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/gist/gist.c,v 1.93 2002/05/24 18:57:55 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -294,9 +294,9 @@ gistinsert(PG_FUNCTION_ARGS)
Datum *datum = (Datum *) PG_GETARG_POINTER(1);
char *nulls = (char *) PG_GETARG_POINTER(2);
ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
#ifdef NOT_USED
Relation heapRel = (Relation) PG_GETARG_POINTER(4);
bool checkUnique = PG_GETARG_BOOL(5);
#endif
InsertIndexResult res;
IndexTuple itup;
@@ -1607,6 +1607,8 @@ gistbulkdelete(PG_FUNCTION_ARGS)
/* walk through the entire index */
iscan = index_beginscan(NULL, rel, SnapshotAny, 0, (ScanKey) NULL);
/* including killed tuples */
iscan->ignore_killed_tuples = false;
while (index_getnext_indexitem(iscan, ForwardScanDirection))
{

src/backend/access/hash/hash.c

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hash.c,v 1.57 2002/05/20 23:51:41 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/hash/hash.c,v 1.58 2002/05/24 18:57:55 tgl Exp $
*
* NOTES
* This file contains only the public interface routines.
@@ -166,8 +166,8 @@ hashinsert(PG_FUNCTION_ARGS)
ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
#ifdef NOT_USED
Relation heapRel = (Relation) PG_GETARG_POINTER(4);
bool checkUnique = PG_GETARG_BOOL(5);
#endif
InsertIndexResult res;
HashItem hitem;
IndexTuple itup;
@@ -210,6 +210,9 @@ hashgettuple(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
HashScanOpaque so = (HashScanOpaque) scan->opaque;
Page page;
OffsetNumber offnum;
bool res;
/*
@@ -217,12 +220,49 @@ hashgettuple(PG_FUNCTION_ARGS)
* the appropriate direction. If we haven't done so yet, we call a
* routine to get the first item in the scan.
*/
if (ItemPointerIsValid(&(scan->currentItemData)))
{
/*
* Check to see if we should kill the previously-fetched tuple.
*/
if (scan->kill_prior_tuple)
{
/*
* Yes, so mark it by setting the LP_DELETE bit in the item flags.
*/
offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
page = BufferGetPage(so->hashso_curbuf);
PageGetItemId(page, offnum)->lp_flags |= LP_DELETE;
/*
* Since this can be redone later if needed, it's treated the
* same as a commit-hint-bit status update for heap tuples:
* we mark the buffer dirty but don't make a WAL log entry.
*/
SetBufferCommitInfoNeedsSave(so->hashso_curbuf);
}
/*
* Now continue the scan.
*/
res = _hash_next(scan, dir);
}
else
res = _hash_first(scan, dir);
/*
* Skip killed tuples if asked to.
*/
if (scan->ignore_killed_tuples)
{
while (res)
{
offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
page = BufferGetPage(so->hashso_curbuf);
if (!ItemIdDeleted(PageGetItemId(page, offnum)))
break;
res = _hash_next(scan, dir);
}
}
PG_RETURN_BOOL(res);
}
@@ -418,6 +458,8 @@ hashbulkdelete(PG_FUNCTION_ARGS)
/* walk through the entire index */
iscan = index_beginscan(NULL, rel, SnapshotAny, 0, (ScanKey) NULL);
/* including killed tuples */
iscan->ignore_killed_tuples = false;
while (index_getnext_indexitem(iscan, ForwardScanDirection))
{

src/backend/access/hash/hashscan.c

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashscan.c,v 1.26 2002/05/20 23:51:41 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashscan.c,v 1.27 2002/05/24 18:57:55 tgl Exp $
*
* NOTES
* Because we can be doing an index scan on a relation while we
@@ -32,8 +32,6 @@
#include "access/hash.h"
static void _hash_scandel(IndexScanDesc scan, BlockNumber blkno, OffsetNumber offno);
static bool _hash_scantouched(IndexScanDesc scan, BlockNumber blkno, OffsetNumber offno);
typedef struct HashScanListData
{
@@ -46,6 +44,10 @@ typedef HashScanListData *HashScanList;
static HashScanList HashScans = (HashScanList) NULL;
static void _hash_scandel(IndexScanDesc scan,
BlockNumber blkno, OffsetNumber offno);
/*
* AtEOXact_hash() --- clean up hash subsystem at xact abort or commit.
*
@@ -129,63 +131,51 @@ static void
_hash_scandel(IndexScanDesc scan, BlockNumber blkno, OffsetNumber offno)
{
ItemPointer current;
ItemPointer mark;
Buffer buf;
Buffer metabuf;
HashScanOpaque so;
if (!_hash_scantouched(scan, blkno, offno))
return;
metabuf = _hash_getbuf(scan->indexRelation, HASH_METAPAGE, HASH_READ);
so = (HashScanOpaque) scan->opaque;
buf = so->hashso_curbuf;
current = &(scan->currentItemData);
mark = &(scan->currentMarkData);
if (ItemPointerIsValid(current)
&& ItemPointerGetBlockNumber(current) == blkno
&& ItemPointerGetOffsetNumber(current) >= offno)
{
metabuf = _hash_getbuf(scan->indexRelation, HASH_METAPAGE, HASH_READ);
buf = so->hashso_curbuf;
_hash_step(scan, &buf, BackwardScanDirection, metabuf);
so->hashso_curbuf = buf;
}
current = &(scan->currentMarkData);
if (ItemPointerIsValid(current)
&& ItemPointerGetBlockNumber(current) == blkno
&& ItemPointerGetOffsetNumber(current) >= offno)
if (ItemPointerIsValid(mark)
&& ItemPointerGetBlockNumber(mark) == blkno
&& ItemPointerGetOffsetNumber(mark) >= offno)
{
ItemPointerData tmp;
/*
* The idea here is to exchange the current and mark positions,
* then step backwards (affecting current), then exchange again.
*/
ItemPointerData tmpitem;
Buffer tmpbuf;
tmp = *current;
*current = scan->currentItemData;
scan->currentItemData = tmp;
tmpitem = *mark;
*mark = *current;
*current = tmpitem;
tmpbuf = so->hashso_mrkbuf;
so->hashso_mrkbuf = so->hashso_curbuf;
so->hashso_curbuf = tmpbuf;
metabuf = _hash_getbuf(scan->indexRelation, HASH_METAPAGE, HASH_READ);
buf = so->hashso_curbuf;
_hash_step(scan, &buf, BackwardScanDirection, metabuf);
so->hashso_mrkbuf = buf;
tmp = *current;
*current = scan->currentItemData;
scan->currentItemData = tmp;
tmpitem = *mark;
*mark = *current;
*current = tmpitem;
tmpbuf = so->hashso_mrkbuf;
so->hashso_mrkbuf = so->hashso_curbuf;
so->hashso_curbuf = tmpbuf;
}
}
static bool
_hash_scantouched(IndexScanDesc scan,
BlockNumber blkno,
OffsetNumber offno)
{
ItemPointer current;
current = &(scan->currentItemData);
if (ItemPointerIsValid(current)
&& ItemPointerGetBlockNumber(current) == blkno
&& ItemPointerGetOffsetNumber(current) >= offno)
return true;
current = &(scan->currentMarkData);
if (ItemPointerIsValid(current)
&& ItemPointerGetBlockNumber(current) == blkno
&& ItemPointerGetOffsetNumber(current) >= offno)
return true;
return false;
}

src/backend/access/hash/hashsearch.c

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashsearch.c,v 1.28 2002/05/20 23:51:41 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashsearch.c,v 1.29 2002/05/24 18:57:55 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -54,10 +54,10 @@ _hash_search(Relation rel,
* _hash_next() -- Get the next item in a scan.
*
* On entry, we have a valid currentItemData in the scan, and a
* read lock on the page that contains that item. We do not have
* the page pinned. We return the next item in the scan. On
* exit, we have the page containing the next item locked but not
* pinned.
* pin and read lock on the page that contains that item.
* We find the next item in the scan, if any.
* On success exit, we have the page containing the next item
* pinned and locked.
*/
bool
_hash_next(IndexScanDesc scan, ScanDirection dir)
@@ -74,25 +74,12 @@ _hash_next(IndexScanDesc scan, ScanDirection dir)
rel = scan->indexRelation;
so = (HashScanOpaque) scan->opaque;
current = &(scan->currentItemData);
metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
/*
* XXX 10 may 91: somewhere there's a bug in our management of the
* cached buffer for this scan. wei discovered it. the following is
* a workaround so he can work until i figure out what's going on.
*/
if (!BufferIsValid(so->hashso_curbuf))
{
so->hashso_curbuf = _hash_getbuf(rel,
ItemPointerGetBlockNumber(current),
HASH_READ);
}
/* we still have the buffer pinned and locked */
buf = so->hashso_curbuf;
Assert(BufferIsValid(buf));
metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
/*
* step to next valid tuple. note that _hash_step releases our lock

src/backend/access/heap/heapam.c

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.135 2002/05/21 22:05:53 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.136 2002/05/24 18:57:55 tgl Exp $
*
*
* INTERFACE ROUTINES
@@ -306,6 +306,8 @@ heapgettup(Relation relation,
{
if (ItemIdIsUsed(lpp))
{
bool valid;
tuple->t_datamcxt = NULL;
tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
tuple->t_len = ItemIdGetLength(lpp);
@@ -315,8 +317,8 @@ heapgettup(Relation relation,
* if current tuple qualifies, return it.
*/
HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp,
snapshot, nkeys, key);
if (tuple->t_data != NULL)
snapshot, nkeys, key, valid);
if (valid)
{
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
return;
@@ -864,32 +866,37 @@ heap_getnext(HeapScanDesc scan, ScanDirection direction)
return ((scan->rs_ctup.t_data == NULL) ? NULL : &(scan->rs_ctup));
}
/* ----------------
* heap_fetch - retrieve tuple with given tid
/*
* heap_fetch - retrieve tuple with given tid
*
* On entry, tuple->t_self is the TID to fetch.
* On entry, tuple->t_self is the TID to fetch. We pin the buffer holding
* the tuple, fill in the remaining fields of *tuple, and check the tuple
* against the specified snapshot.
*
* If successful (ie, tuple found and passes snapshot time qual),
* then the rest of *tuple is filled in, and *userbuf is set to the
* buffer holding the tuple. A pin is obtained on the buffer; the
* caller must BufferRelease the buffer when done with the tuple.
* If successful (tuple passes snapshot time qual), then *userbuf is set to
* the buffer holding the tuple and TRUE is returned. The caller must
* unpin the buffer when done with the tuple.
*
* If not successful, tuple->t_data is set to NULL and *userbuf is set to
* InvalidBuffer.
* ----------------
* If the tuple fails the time qual check, then FALSE will be returned.
* When the caller specifies keep_buf = true, we retain the pin on the
* buffer and return it in *userbuf (so the caller can still access the
* tuple); when keep_buf = false, the pin is released and *userbuf is set
* to InvalidBuffer.
*/
void
bool
heap_fetch(Relation relation,
Snapshot snapshot,
HeapTuple tuple,
Buffer *userbuf,
bool keep_buf,
PgStat_Info *pgstat_info)
{
ItemPointer tid = &(tuple->t_self);
ItemId lp;
Buffer buffer;
PageHeader dp;
ItemPointer tid = &(tuple->t_self);
OffsetNumber offnum;
bool valid;
/*
* increment access statistics
@@ -901,14 +908,16 @@ heap_fetch(Relation relation,
* get the buffer from the relation descriptor. Note that this does a
* buffer pin.
*/
buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
if (!BufferIsValid(buffer))
elog(ERROR, "heap_fetch: %s relation: ReadBuffer(%ld) failed",
elog(ERROR, "heap_fetch: ReadBuffer(%s, %lu) failed",
RelationGetRelationName(relation),
(long) ItemPointerGetBlockNumber(tid));
(unsigned long) ItemPointerGetBlockNumber(tid));
/*
* Need share lock on buffer to examine tuple commit status.
*/
LockBuffer(buffer, BUFFER_LOCK_SHARE);
/*
@@ -921,38 +930,34 @@ heap_fetch(Relation relation,
/*
* more sanity checks
*/
if (!ItemIdIsUsed(lp))
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
*userbuf = InvalidBuffer;
tuple->t_datamcxt = NULL;
tuple->t_data = NULL;
return;
elog(ERROR, "heap_fetch: invalid tuple id (%s, %lu, %u)",
RelationGetRelationName(relation),
(unsigned long) ItemPointerGetBlockNumber(tid),
offnum);
}
/*
* fill in *tuple fields
*/
tuple->t_datamcxt = NULL;
tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
tuple->t_len = ItemIdGetLength(lp);
tuple->t_tableOid = relation->rd_id;
/*
* check time qualification of tid
* check time qualification of tuple, then release lock
*/
HeapTupleSatisfies(tuple, relation, buffer, dp,
snapshot, 0, (ScanKey) NULL);
snapshot, 0, (ScanKey) NULL, valid);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
if (tuple->t_data == NULL)
{
/* Tuple failed time check, so we can release now. */
ReleaseBuffer(buffer);
*userbuf = InvalidBuffer;
}
else
if (valid)
{
/*
* All checks passed, so return the tuple as valid. Caller is now
@@ -968,13 +973,28 @@ heap_fetch(Relation relation,
pgstat_count_heap_fetch(pgstat_info);
else
pgstat_count_heap_fetch(&relation->pgstat_info);
return true;
}
/* Tuple failed time qual, but maybe caller wants to see it anyway. */
if (keep_buf)
{
*userbuf = buffer;
return false;
}
/* Okay to release pin on buffer. */
ReleaseBuffer(buffer);
*userbuf = InvalidBuffer;
return false;
}
/* ----------------
/*
* heap_get_latest_tid - get the latest tid of a specified tuple
*
* ----------------
*/
ItemPointer
heap_get_latest_tid(Relation relation,
@@ -989,7 +1009,8 @@ heap_get_latest_tid(Relation relation,
HeapTupleHeader t_data;
ItemPointerData ctid;
bool invalidBlock,
linkend;
linkend,
valid;
/*
* get the buffer from the relation descriptor Note that this does a
@@ -1038,7 +1059,7 @@ heap_get_latest_tid(Relation relation,
*/
HeapTupleSatisfies(&tp, relation, buffer, dp,
snapshot, 0, (ScanKey) NULL);
snapshot, 0, (ScanKey) NULL, valid);
linkend = true;
if ((t_data->t_infomask & HEAP_XMIN_COMMITTED) != 0 &&
@@ -1048,7 +1069,7 @@ heap_get_latest_tid(Relation relation,
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
if (tp.t_data == NULL)
if (!valid)
{
if (linkend)
return NULL;
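The reworked heap_fetch contract above (boolean result plus a keep_buf flag) makes the common case simple. A minimal, hypothetical caller might look like this; rel, snapshot and tid stand in for whatever the caller already has:

HeapTupleData tup;
Buffer		buf;

tup.t_self = tid;				/* TID we want to examine */
if (heap_fetch(rel, snapshot, &tup, &buf, false, NULL))
{
	/* visible: tup is filled in and buf is pinned; we must release it */
	/* ... use tup ... */
	ReleaseBuffer(buf);
}
else
{
	/* failed the time qual; with keep_buf = false, buf is InvalidBuffer */
}

Passing keep_buf = true (as index_getnext and _bt_check_unique now do) keeps the pin even on failure, so the caller can re-lock the buffer and examine the dead tuple before releasing it.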

src/backend/access/heap/tuptoaster.c

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/heap/tuptoaster.c,v 1.30 2002/05/21 22:05:53 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/heap/tuptoaster.c,v 1.31 2002/05/24 18:57:55 tgl Exp $
*
*
* INTERFACE ROUTINES
@@ -923,7 +923,7 @@ toast_save_datum(Relation rel, Datum value)
*/
idxres = index_insert(toastidx, t_values, t_nulls,
&(toasttup->t_self),
toastrel);
toastrel, toastidx->rd_uniqueindex);
if (idxres == NULL)
elog(ERROR, "Failed to insert index entry for TOAST tuple");

src/backend/access/index/genam.c

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/index/genam.c,v 1.33 2002/05/20 23:51:41 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/index/genam.c,v 1.34 2002/05/24 18:57:55 tgl Exp $
*
* NOTES
* many of the old access method routines have been turned into
@@ -89,6 +89,11 @@ RelationGetIndexScan(Relation indexRelation,
else
scan->keyData = NULL;
scan->kill_prior_tuple = false;
scan->ignore_killed_tuples = true; /* default setting */
scan->keys_are_unique = false; /* may be set by amrescan */
scan->got_tuple = false;
scan->opaque = NULL;
ItemPointerSetInvalid(&scan->currentItemData);
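The new ignore_killed_tuples flag defaults to true, so ordinary index scans silently skip entries hinted with LP_DELETE. Scans that must see every entry, such as the AM bulkdelete routines touched by this commit, flip it after index_beginscan: killed entries are only hinted, not removed, so a bulk-delete pass still has to visit them to delete them physically. A sketch of that usage (mirroring hashbulkdelete above):

iscan = index_beginscan(NULL, rel, SnapshotAny, 0, (ScanKey) NULL);
iscan->ignore_killed_tuples = false;	/* override the default set here */
while (index_getnext_indexitem(iscan, ForwardScanDirection))
{
	/* heap TID of the entry is in iscan->xs_ctup.t_self */
}
index_endscan(iscan);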

src/backend/access/index/indexam.c

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/index/indexam.c,v 1.58 2002/05/20 23:51:41 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/index/indexam.c,v 1.59 2002/05/24 18:57:55 tgl Exp $
*
* INTERFACE ROUTINES
* index_open - open an index relation by relation OID
@@ -204,7 +204,8 @@ index_insert(Relation indexRelation,
Datum *datums,
char *nulls,
ItemPointer heap_t_ctid,
Relation heapRelation)
Relation heapRelation,
bool check_uniqueness)
{
RegProcedure procedure;
InsertIndexResult specificResult;
@@ -216,12 +217,13 @@ index_insert(Relation indexRelation,
* have the am's insert proc do all the work.
*/
specificResult = (InsertIndexResult)
DatumGetPointer(OidFunctionCall5(procedure,
DatumGetPointer(OidFunctionCall6(procedure,
PointerGetDatum(indexRelation),
PointerGetDatum(datums),
PointerGetDatum(nulls),
PointerGetDatum(heap_t_ctid),
PointerGetDatum(heapRelation)));
PointerGetDatum(heapRelation),
BoolGetDatum(check_uniqueness)));
/* must be pfree'ed */
return specificResult;
@@ -303,6 +305,10 @@ index_rescan(IndexScanDesc scan, ScanKey key)
SCAN_CHECKS;
GET_SCAN_PROCEDURE(rescan, amrescan);
scan->kill_prior_tuple = false; /* for safety */
scan->keys_are_unique = false; /* may be set by amrescan */
scan->got_tuple = false;
OidFunctionCall2(procedure,
PointerGetDatum(scan),
PointerGetDatum(key));
@@ -369,6 +375,9 @@ index_restrpos(IndexScanDesc scan)
SCAN_CHECKS;
GET_SCAN_PROCEDURE(restrpos, amrestrpos);
scan->kill_prior_tuple = false; /* for safety */
scan->got_tuple = false;
OidFunctionCall1(procedure, PointerGetDatum(scan));
}
@@ -385,7 +394,7 @@ index_restrpos(IndexScanDesc scan)
HeapTuple
index_getnext(IndexScanDesc scan, ScanDirection direction)
{
bool found;
HeapTuple heapTuple = &scan->xs_ctup;
SCAN_CHECKS;
@@ -396,8 +405,21 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
scan->xs_cbuf = InvalidBuffer;
}
/* just make sure this is false... */
scan->kill_prior_tuple = false;
/*
* Can skip entering the index AM if we already got a tuple
* and it must be unique.
*/
if (scan->keys_are_unique && scan->got_tuple)
return NULL;
for (;;)
{
bool found;
uint16 sv_infomask;
pgstat_count_index_scan(&scan->xs_pgstat_info);
/*
@@ -407,32 +429,62 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
found = DatumGetBool(FunctionCall2(&scan->fn_getnext,
PointerGetDatum(scan),
Int32GetDatum(direction)));
/* Reset kill flag immediately for safety */
scan->kill_prior_tuple = false;
if (!found)
return NULL; /* failure exit */
/*
* Fetch the heap tuple and see if it matches the snapshot.
*/
heap_fetch(scan->heapRelation, scan->xs_snapshot,
&scan->xs_ctup, &scan->xs_cbuf,
&scan->xs_pgstat_info);
if (scan->xs_ctup.t_data != NULL)
if (heap_fetch(scan->heapRelation, scan->xs_snapshot,
heapTuple, &scan->xs_cbuf, true,
&scan->xs_pgstat_info))
break;
/*
* XXX here, consider whether we can kill the index tuple.
* If we can't see it, maybe no one else can either. Check to see
* if the tuple is dead to all transactions. If so, signal the
* index AM to not return it on future indexscans.
*
* We told heap_fetch to keep a pin on the buffer, so we can
* re-access the tuple here. But we must re-lock the buffer first.
* Also, it's just barely possible for an update of hint bits to
* occur here.
*/
LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE);
sv_infomask = heapTuple->t_data->t_infomask;
if (HeapTupleSatisfiesVacuum(heapTuple->t_data, RecentGlobalXmin) ==
HEAPTUPLE_DEAD)
scan->kill_prior_tuple = true;
if (sv_infomask != heapTuple->t_data->t_infomask)
SetBufferCommitInfoNeedsSave(scan->xs_cbuf);
LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(scan->xs_cbuf);
scan->xs_cbuf = InvalidBuffer;
}
/* Success exit */
scan->got_tuple = true;
pgstat_count_index_getnext(&scan->xs_pgstat_info);
return &scan->xs_ctup;
return heapTuple;
}
/* ----------------
* index_getnext_indexitem - get the next index tuple from a scan
*
* Finds the next index tuple satisfying the scan keys. Note that no
* time qual (snapshot) check is done; indeed the heap tuple is not accessed.
* Finds the next index tuple satisfying the scan keys. Note that the
* corresponding heap tuple is not accessed, and thus no time qual (snapshot)
* check is done, other than the index AM's internal check for killed tuples
* (which most callers of this routine will probably want to suppress by
* setting scan->ignore_killed_tuples = false).
*
* On success (TRUE return), the found index TID is in scan->currentItemData,
* and its heap TID is in scan->xs_ctup.t_self. scan->xs_cbuf is untouched.
* ----------------
@@ -445,6 +497,9 @@ index_getnext_indexitem(IndexScanDesc scan,
SCAN_CHECKS;
/* just make sure this is false... */
scan->kill_prior_tuple = false;
/*
* have the am's gettuple proc do all the work. index_beginscan
* already set up fn_getnext.
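index_insert now takes the uniqueness flag explicitly instead of the btree code peeking at rel->rd_uniqueindex itself. A hypothetical caller (indexRel, heapRel, datums and nulls are placeholders) passes the index's own flag, as toast_save_datum above now does:

res = index_insert(indexRel, datums, nulls,
				   &(heapTuple->t_self),		/* TID of the heap tuple just inserted */
				   heapRel,
				   indexRel->rd_uniqueindex);	/* check_uniqueness */
if (res)
	pfree(res);								/* result must be pfree'd */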

src/backend/access/nbtree/nbtinsert.c

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.90 2002/03/06 06:09:17 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.91 2002/05/24 18:57:55 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -176,7 +176,6 @@ _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel,
Page page;
BTPageOpaque opaque;
Buffer nbuf = InvalidBuffer;
bool chtup = true;
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -194,70 +193,85 @@ _bt_check_unique(Relation rel, BTItem btitem, Relation heapRel,
for (;;)
{
HeapTupleData htup;
Buffer buffer;
Buffer hbuffer;
ItemId curitemid;
BTItem cbti;
BlockNumber nblkno;
/*
* _bt_compare returns 0 for (1,NULL) and (1,NULL) - this's how we
* handling NULLs - and so we must not use _bt_compare in real
* comparison, but only for ordering/finding items on pages. -
* vadim 03/24/97
*
* make sure the offset points to an actual key before trying to
* compare it...
*/
if (offset <= maxoff)
{
/*
* _bt_compare returns 0 for (1,NULL) and (1,NULL) - this's how we
* handling NULLs - and so we must not use _bt_compare in real
* comparison, but only for ordering/finding items on pages. -
* vadim 03/24/97
*/
if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
break; /* we're past all the equal tuples */
curitemid = PageGetItemId(page, offset);
/*
* Have to check is inserted heap tuple deleted one (i.e. just
* moved to another place by vacuum)! We only need to do this
* once, but don't want to do it at all unless we see equal
* tuples, so as not to slow down unequal case.
* We can skip the heap fetch if the item is marked killed.
*/
if (chtup)
if (!ItemIdDeleted(curitemid))
{
htup.t_self = btitem->bti_itup.t_tid;
heap_fetch(heapRel, SnapshotDirty, &htup, &buffer, NULL);
if (htup.t_data == NULL) /* YES! */
break;
/* Live tuple is being inserted, so continue checking */
ReleaseBuffer(buffer);
chtup = false;
}
cbti = (BTItem) PageGetItem(page, PageGetItemId(page, offset));
htup.t_self = cbti->bti_itup.t_tid;
heap_fetch(heapRel, SnapshotDirty, &htup, &buffer, NULL);
if (htup.t_data != NULL) /* it is a duplicate */
{
TransactionId xwait =
(TransactionIdIsValid(SnapshotDirty->xmin)) ?
SnapshotDirty->xmin : SnapshotDirty->xmax;
/*
* If this tuple is being updated by other transaction
* then we have to wait for its commit/abort.
*/
ReleaseBuffer(buffer);
if (TransactionIdIsValid(xwait))
cbti = (BTItem) PageGetItem(page, curitemid);
htup.t_self = cbti->bti_itup.t_tid;
if (heap_fetch(heapRel, SnapshotDirty, &htup, &hbuffer,
true, NULL))
{
if (nbuf != InvalidBuffer)
_bt_relbuf(rel, nbuf);
/* Tell _bt_doinsert to wait... */
return xwait;
}
/* it is a duplicate */
TransactionId xwait =
(TransactionIdIsValid(SnapshotDirty->xmin)) ?
SnapshotDirty->xmin : SnapshotDirty->xmax;
/*
* Otherwise we have a definite conflict.
*/
elog(ERROR, "Cannot insert a duplicate key into unique index %s",
RelationGetRelationName(rel));
ReleaseBuffer(hbuffer);
/*
* If this tuple is being updated by other transaction
* then we have to wait for its commit/abort.
*/
if (TransactionIdIsValid(xwait))
{
if (nbuf != InvalidBuffer)
_bt_relbuf(rel, nbuf);
/* Tell _bt_doinsert to wait... */
return xwait;
}
/*
* Otherwise we have a definite conflict.
*/
elog(ERROR, "Cannot insert a duplicate key into unique index %s",
RelationGetRelationName(rel));
}
else
{
/*
* Hmm, if we can't see the tuple, maybe it can be
* marked killed. This logic should match index_getnext
* and btgettuple.
*/
uint16 sv_infomask;
LockBuffer(hbuffer, BUFFER_LOCK_SHARE);
sv_infomask = htup.t_data->t_infomask;
if (HeapTupleSatisfiesVacuum(htup.t_data,
RecentGlobalXmin) ==
HEAPTUPLE_DEAD)
{
curitemid->lp_flags |= LP_DELETE;
SetBufferCommitInfoNeedsSave(buf);
}
if (sv_infomask != htup.t_data->t_infomask)
SetBufferCommitInfoNeedsSave(hbuffer);
LockBuffer(hbuffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(hbuffer);
}
}
/* htup null so no buffer to release */
}
/*

src/backend/access/nbtree/nbtree.c

@@ -12,7 +12,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.89 2002/05/20 23:51:41 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.90 2002/05/24 18:57:55 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -271,6 +271,7 @@ btinsert(PG_FUNCTION_ARGS)
char *nulls = (char *) PG_GETARG_POINTER(2);
ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
Relation heapRel = (Relation) PG_GETARG_POINTER(4);
bool checkUnique = PG_GETARG_BOOL(5);
InsertIndexResult res;
BTItem btitem;
IndexTuple itup;
@@ -280,7 +281,7 @@ btinsert(PG_FUNCTION_ARGS)
itup->t_tid = *ht_ctid;
btitem = _bt_formitem(itup);
res = _bt_doinsert(rel, btitem, rel->rd_uniqueindex, heapRel);
res = _bt_doinsert(rel, btitem, checkUnique, heapRel);
pfree(btitem);
pfree(itup);
@@ -296,14 +297,16 @@ btgettuple(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
bool res;
BTScanOpaque so = (BTScanOpaque) scan->opaque;
Page page;
OffsetNumber offnum;
bool res;
/*
* If we've already initialized this scan, we can just advance it in
* the appropriate direction. If we haven't done so yet, we call a
* routine to get the first item in the scan.
*/
if (ItemPointerIsValid(&(scan->currentItemData)))
{
/*
@@ -312,11 +315,47 @@ btgettuple(PG_FUNCTION_ARGS)
* buffer, too.
*/
_bt_restscan(scan);
/*
* Check to see if we should kill the previously-fetched tuple.
*/
if (scan->kill_prior_tuple)
{
/*
* Yes, so mark it by setting the LP_DELETE bit in the item flags.
*/
offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
page = BufferGetPage(so->btso_curbuf);
PageGetItemId(page, offnum)->lp_flags |= LP_DELETE;
/*
* Since this can be redone later if needed, it's treated the
* same as a commit-hint-bit status update for heap tuples:
* we mark the buffer dirty but don't make a WAL log entry.
*/
SetBufferCommitInfoNeedsSave(so->btso_curbuf);
}
/*
* Now continue the scan.
*/
res = _bt_next(scan, dir);
}
else
res = _bt_first(scan, dir);
/*
* Skip killed tuples if asked to.
*/
if (scan->ignore_killed_tuples)
{
while (res)
{
offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
page = BufferGetPage(so->btso_curbuf);
if (!ItemIdDeleted(PageGetItemId(page, offnum)))
break;
res = _bt_next(scan, dir);
}
}
/*
* Save heap TID to use it in _bt_restscan. Then release the read
* lock on the buffer so that we aren't blocking other backends.

src/backend/access/nbtree/nbtsearch.c

@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.70 2002/05/20 23:51:41 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.71 2002/05/24 18:57:55 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -425,7 +425,8 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
bool
_bt_first(IndexScanDesc scan, ScanDirection dir)
{
Relation rel;
Relation rel = scan->indexRelation;
BTScanOpaque so = (BTScanOpaque) scan->opaque;
Buffer buf;
Page page;
BTStack stack;
@@ -437,7 +438,6 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
StrategyNumber strat;
bool res;
int32 result;
BTScanOpaque so;
bool scanFromEnd;
bool continuescan;
ScanKey scankeys = NULL;
@@ -447,14 +447,11 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
j;
StrategyNumber strat_total;
rel = scan->indexRelation;
so = (BTScanOpaque) scan->opaque;
/*
* Order the scan keys in our canonical fashion and eliminate any
* redundant keys.
*/
_bt_orderkeys(rel, so);
_bt_orderkeys(scan);
/*
* Quit now if _bt_orderkeys() discovered that the scan keys can never

src/backend/access/nbtree/nbtutils.c

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtutils.c,v 1.48 2002/05/20 23:51:41 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtutils.c,v 1.49 2002/05/24 18:57:55 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -22,6 +22,9 @@
#include "executor/execdebug.h"
static int _bt_getstrategynumber(RegProcedure sk_procedure, StrategyMap map);
/*
* _bt_mkscankey
* Build a scan key that contains comparison data from itup
@@ -174,6 +177,11 @@ _bt_formitem(IndexTuple itup)
* attribute, which can be seen to be correct by considering the above
* example.
*
* Furthermore, we detect the case where the index is unique and we have
* equality quals for all columns. In this case there can be at most one
* (visible) matching tuple. index_getnext uses this to avoid uselessly
* continuing the scan after finding one match.
*
* The initial ordering of the keys is expected to be by attribute already
* (see group_clauses_by_indexkey() in indxpath.c). The task here is to
* standardize the appearance of multiple keys for the same attribute.
@@ -191,8 +199,10 @@ _bt_formitem(IndexTuple itup)
*----------
*/
void
_bt_orderkeys(Relation relation, BTScanOpaque so)
_bt_orderkeys(IndexScanDesc scan)
{
Relation relation = scan->indexRelation;
BTScanOpaque so = (BTScanOpaque) scan->opaque;
ScanKeyData xform[BTMaxStrategyNumber];
bool init[BTMaxStrategyNumber];
int numberOfKeys = so->numberOfKeys;
@@ -208,6 +218,7 @@ _bt_orderkeys(Relation relation, BTScanOpaque so)
so->qual_ok = true;
so->numberOfRequiredKeys = 0;
scan->keys_are_unique = false;
if (numberOfKeys < 1)
return; /* done if qual-less scan */
@@ -228,6 +239,17 @@ _bt_orderkeys(Relation relation, BTScanOpaque so)
*/
if (cur->sk_flags & SK_ISNULL)
so->qual_ok = false;
else if (relation->rd_index->indisunique &&
relation->rd_rel->relnatts == 1)
{
/* it's a unique index, do we have an equality qual? */
map = IndexStrategyGetStrategyMap(RelationGetIndexStrategy(relation),
BTMaxStrategyNumber,
1);
j = _bt_getstrategynumber(cur->sk_procedure, map);
if (j == (BTEqualStrategyNumber - 1))
scan->keys_are_unique = true;
}
so->numberOfRequiredKeys = 1;
return;
}
@@ -390,17 +412,8 @@ _bt_orderkeys(Relation relation, BTScanOpaque so)
MemSet(init, 0, sizeof(init));
}
/*
* OK, figure out which strategy this key corresponds to
*/
for (j = BTMaxStrategyNumber; --j >= 0;)
{
if (cur->sk_procedure == map->entry[j].sk_procedure)
break;
}
if (j < 0)
elog(ERROR, "_bt_orderkeys: unable to identify operator %u",
cur->sk_procedure);
/* figure out which strategy this key's operator corresponds to */
j = _bt_getstrategynumber(cur->sk_procedure, map);
/* have we seen one of these before? */
if (init[j])
@@ -424,6 +437,34 @@ _bt_orderkeys(Relation relation, BTScanOpaque so)
}
so->numberOfKeys = new_numberOfKeys;
/*
* If unique index and we have equality keys for all columns,
* set keys_are_unique flag for higher levels.
*/
if (allEqualSoFar && relation->rd_index->indisunique &&
relation->rd_rel->relnatts == new_numberOfKeys)
scan->keys_are_unique = true;
}
/*
* Determine which btree strategy an operator procedure matches.
*
* Result is strategy number minus 1.
*/
static int
_bt_getstrategynumber(RegProcedure sk_procedure, StrategyMap map)
{
int j;
for (j = BTMaxStrategyNumber; --j >= 0;)
{
if (sk_procedure == map->entry[j].sk_procedure)
return j;
}
elog(ERROR, "_bt_getstrategynumber: unable to identify operator %u",
sk_procedure);
return -1; /* keep compiler quiet */
}
/*

src/backend/access/rtree/rtree.c

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/rtree/Attic/rtree.c,v 1.71 2002/05/20 23:51:41 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/rtree/Attic/rtree.c,v 1.72 2002/05/24 18:57:55 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -223,9 +223,9 @@ rtinsert(PG_FUNCTION_ARGS)
Datum *datum = (Datum *) PG_GETARG_POINTER(1);
char *nulls = (char *) PG_GETARG_POINTER(2);
ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
#ifdef NOT_USED
Relation heapRel = (Relation) PG_GETARG_POINTER(4);
bool checkUnique = PG_GETARG_BOOL(5);
#endif
InsertIndexResult res;
IndexTuple itup;
@@ -1206,6 +1206,8 @@ rtbulkdelete(PG_FUNCTION_ARGS)
/* walk through the entire index */
iscan = index_beginscan(NULL, rel, SnapshotAny, 0, (ScanKey) NULL);
/* including killed tuples */
iscan->ignore_killed_tuples = false;
while (index_getnext_indexitem(iscan, ForwardScanDirection))
{