1
0
mirror of https://github.com/postgres/postgres.git synced 2025-09-03 15:22:11 +03:00

Make non-MVCC snapshots exempt from predicate locking. Scans with non-MVCC

snapshots, like in REINDEX, are basically non-transactional operations. The
DDL operation itself might participate in SSI, but there's separate
functions for that.

Kevin Grittner and Dan Ports, with some changes by me.
This commit is contained in:
Heikki Linnakangas
2011-06-15 11:43:05 +03:00
parent 80721b5182
commit ff4e078773
6 changed files with 151 additions and 100 deletions

View File

@@ -274,7 +274,8 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
else
valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup, buffer);
CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup,
buffer, snapshot);
if (valid)
scan->rs_vistuples[ntup++] = lineoff;
@@ -469,7 +470,8 @@ heapgettup(HeapScanDesc scan,
snapshot,
scan->rs_cbuf);
CheckForSerializableConflictOut(valid, scan->rs_rd, tuple, scan->rs_cbuf);
CheckForSerializableConflictOut(valid, scan->rs_rd, tuple,
scan->rs_cbuf, snapshot);
if (valid && key != NULL)
HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
@@ -478,7 +480,7 @@ heapgettup(HeapScanDesc scan,
if (valid)
{
if (!scan->rs_relpredicatelocked)
PredicateLockTuple(scan->rs_rd, tuple);
PredicateLockTuple(scan->rs_rd, tuple, snapshot);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
return;
}
@@ -747,7 +749,7 @@ heapgettup_pagemode(HeapScanDesc scan,
if (valid)
{
if (!scan->rs_relpredicatelocked)
PredicateLockTuple(scan->rs_rd, tuple);
PredicateLockTuple(scan->rs_rd, tuple, scan->rs_snapshot);
scan->rs_cindex = lineindex;
return;
}
@@ -755,7 +757,7 @@ heapgettup_pagemode(HeapScanDesc scan,
else
{
if (!scan->rs_relpredicatelocked)
PredicateLockTuple(scan->rs_rd, tuple);
PredicateLockTuple(scan->rs_rd, tuple, scan->rs_snapshot);
scan->rs_cindex = lineindex;
return;
}
@@ -1470,9 +1472,9 @@ heap_fetch(Relation relation,
valid = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer);
if (valid)
PredicateLockTuple(relation, tuple);
PredicateLockTuple(relation, tuple, snapshot);
CheckForSerializableConflictOut(valid, relation, tuple, buffer);
CheckForSerializableConflictOut(valid, relation, tuple, buffer, snapshot);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
@@ -1588,11 +1590,12 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
/* If it's visible per the snapshot, we must return it */
valid = HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer);
CheckForSerializableConflictOut(valid, relation, &heapTuple, buffer);
CheckForSerializableConflictOut(valid, relation, &heapTuple, buffer,
snapshot);
if (valid)
{
ItemPointerSetOffsetNumber(tid, offnum);
PredicateLockTuple(relation, &heapTuple);
PredicateLockTuple(relation, &heapTuple, snapshot);
if (all_dead)
*all_dead = false;
return true;
@@ -1750,7 +1753,7 @@ heap_get_latest_tid(Relation relation,
* result candidate.
*/
valid = HeapTupleSatisfiesVisibility(&tp, snapshot, buffer);
CheckForSerializableConflictOut(valid, relation, &tp, buffer);
CheckForSerializableConflictOut(valid, relation, &tp, buffer, snapshot);
if (valid)
*tid = ctid;

View File

@@ -126,7 +126,7 @@ do { \
} while(0)
static IndexScanDesc index_beginscan_internal(Relation indexRelation,
int nkeys, int norderbys);
int nkeys, int norderbys, Snapshot snapshot);
/* ----------------------------------------------------------------
@@ -234,7 +234,7 @@ index_beginscan(Relation heapRelation,
{
IndexScanDesc scan;
scan = index_beginscan_internal(indexRelation, nkeys, norderbys);
scan = index_beginscan_internal(indexRelation, nkeys, norderbys, snapshot);
/*
* Save additional parameters into the scandesc. Everything else was set
@@ -259,7 +259,7 @@ index_beginscan_bitmap(Relation indexRelation,
{
IndexScanDesc scan;
scan = index_beginscan_internal(indexRelation, nkeys, 0);
scan = index_beginscan_internal(indexRelation, nkeys, 0, snapshot);
/*
* Save additional parameters into the scandesc. Everything else was set
@@ -275,7 +275,7 @@ index_beginscan_bitmap(Relation indexRelation,
*/
static IndexScanDesc
index_beginscan_internal(Relation indexRelation,
int nkeys, int norderbys)
int nkeys, int norderbys, Snapshot snapshot)
{
IndexScanDesc scan;
FmgrInfo *procedure;
@@ -284,7 +284,7 @@ index_beginscan_internal(Relation indexRelation,
GET_REL_PROCEDURE(ambeginscan);
if (!(indexRelation->rd_am->ampredlocks))
PredicateLockRelation(indexRelation);
PredicateLockRelation(indexRelation, snapshot);
/*
* We hold a reference count to the relcache entry throughout the scan.
@@ -602,7 +602,8 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
scan->xs_cbuf);
CheckForSerializableConflictOut(valid, scan->heapRelation,
heapTuple, scan->xs_cbuf);
heapTuple, scan->xs_cbuf,
scan->xs_snapshot);
if (valid)
{
@@ -624,7 +625,7 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
else
scan->xs_next_hot = InvalidOffsetNumber;
PredicateLockTuple(scan->heapRelation, heapTuple);
PredicateLockTuple(scan->heapRelation, heapTuple, scan->xs_snapshot);
LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);

View File

@@ -64,10 +64,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
/* If index is empty and access = BT_READ, no root page is created. */
if (!BufferIsValid(*bufP))
{
PredicateLockRelation(rel); /* Nothing finer to lock exists. */
return (BTStack) NULL;
}
/* Loop iterates once per level descended in the tree */
for (;;)
@@ -92,11 +89,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
page = BufferGetPage(*bufP);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (P_ISLEAF(opaque))
{
if (access == BT_READ)
PredicateLockPage(rel, BufferGetBlockNumber(*bufP));
break;
}
/*
* Find the appropriate item on the internal page, and get the child
@@ -855,9 +848,16 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
if (!BufferIsValid(buf))
{
/* Only get here if index is completely empty */
/*
* We only get here if the index is completely empty.
* Lock relation because nothing finer to lock exists.
*/
PredicateLockRelation(rel, scan->xs_snapshot);
return false;
}
else
PredicateLockPage(rel, BufferGetBlockNumber(buf),
scan->xs_snapshot);
/* initialize moreLeft/moreRight appropriately for scan direction */
if (ScanDirectionIsForward(dir))
@@ -1153,7 +1153,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!P_IGNORE(opaque))
{
PredicateLockPage(rel, blkno);
PredicateLockPage(rel, blkno, scan->xs_snapshot);
/* see if there are any matches on this page */
/* note that this will clear moreRight if we can stop */
if (_bt_readpage(scan, dir, P_FIRSTDATAKEY(opaque)))
@@ -1201,7 +1201,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!P_IGNORE(opaque))
{
PredicateLockPage(rel, BufferGetBlockNumber(so->currPos.buf));
PredicateLockPage(rel, BufferGetBlockNumber(so->currPos.buf), scan->xs_snapshot);
/* see if there are any matches on this page */
/* note that this will clear moreLeft if we can stop */
if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page)))
@@ -1363,11 +1363,7 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
buf = _bt_gettrueroot(rel);
if (!BufferIsValid(buf))
{
/* empty index... */
PredicateLockRelation(rel); /* Nothing finer to lock exists. */
return InvalidBuffer;
}
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -1444,13 +1440,16 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
if (!BufferIsValid(buf))
{
/* empty index... */
PredicateLockRelation(rel); /* Nothing finer to lock exists. */
/*
* Empty index. Lock the whole relation, as nothing finer to lock
* exists.
*/
PredicateLockRelation(rel, scan->xs_snapshot);
so->currPos.buf = InvalidBuffer;
return false;
}
PredicateLockPage(rel, BufferGetBlockNumber(buf));
PredicateLockPage(rel, BufferGetBlockNumber(buf), scan->xs_snapshot);
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
Assert(P_ISLEAF(opaque));