1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-10 17:42:29 +03:00

Implement genuine serializable isolation level.

Until now, our Serializable mode has in fact been what's called Snapshot
Isolation, which allows some anomalies that could not occur in any
serialized ordering of the transactions. This patch fixes that using a
method called Serializable Snapshot Isolation, based on research papers by
Michael J. Cahill (see README-SSI for full references). In Serializable
Snapshot Isolation, transactions run like they do in Snapshot Isolation,
but a predicate lock manager observes the reads and writes performed and
aborts transactions if it detects that an anomaly might occur. This method
produces some false positives, ie. it sometimes aborts transactions even
though there is no anomaly.

To track reads we implement predicate locking, see storage/lmgr/predicate.c.
Whenever a tuple is read, a predicate lock is acquired on the tuple. Shared
memory is finite, so when a transaction takes many tuple-level locks on a
page, the locks are promoted to a single page-level lock, and further to a
single relation level lock if necessary. To lock key values with no matching
tuple, a sequential scan always takes a relation-level lock, and an index
scan acquires a page-level lock that covers the search key, whether or not
there are any matching keys at the moment.

A predicate lock doesn't conflict with any regular locks or with another
predicate locks in the normal sense. They're only used by the predicate lock
manager to detect the danger of anomalies. Only serializable transactions
participate in predicate locking, so there should be no extra overhead for
for other transactions.

Predicate locks can't be released at commit, but must be remembered until
all the transactions that overlapped with it have completed. That means that
we need to remember an unbounded amount of predicate locks, so we apply a
lossy but conservative method of tracking locks for committed transactions.
If we run short of shared memory, we overflow to a new "pg_serial" SLRU
pool.

We don't currently allow Serializable transactions in Hot Standby mode.
That would be hard, because even read-only transactions can cause anomalies
that wouldn't otherwise occur.

Serializable isolation mode now means the new fully serializable level.
Repeatable Read gives you the old Snapshot Isolation level that we have
always had.

Kevin Grittner and Dan Ports, reviewed by Jeff Davis, Heikki Linnakangas and
Anssi Kääriäinen
This commit is contained in:
Heikki Linnakangas
2011-02-07 23:46:51 +02:00
parent c18f51da17
commit dafaa3efb7
90 changed files with 14995 additions and 271 deletions

View File

@@ -57,6 +57,7 @@
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "storage/standby.h"
@@ -261,20 +262,20 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
{
if (ItemIdIsNormal(lpp))
{
HeapTupleData loctup;
bool valid;
loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
loctup.t_len = ItemIdGetLength(lpp);
ItemPointerSet(&(loctup.t_self), page, lineoff);
if (all_visible)
valid = true;
else
{
HeapTupleData loctup;
loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
loctup.t_len = ItemIdGetLength(lpp);
ItemPointerSet(&(loctup.t_self), page, lineoff);
valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
}
CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup, buffer);
if (valid)
scan->rs_vistuples[ntup++] = lineoff;
}
@@ -468,12 +469,16 @@ heapgettup(HeapScanDesc scan,
snapshot,
scan->rs_cbuf);
CheckForSerializableConflictOut(valid, scan->rs_rd, tuple, scan->rs_cbuf);
if (valid && key != NULL)
HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
nkeys, key, valid);
if (valid)
{
if (!scan->rs_relpredicatelocked)
PredicateLockTuple(scan->rs_rd, tuple);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
return;
}
@@ -741,12 +746,16 @@ heapgettup_pagemode(HeapScanDesc scan,
nkeys, key, valid);
if (valid)
{
if (!scan->rs_relpredicatelocked)
PredicateLockTuple(scan->rs_rd, tuple);
scan->rs_cindex = lineindex;
return;
}
}
else
{
if (!scan->rs_relpredicatelocked)
PredicateLockTuple(scan->rs_rd, tuple);
scan->rs_cindex = lineindex;
return;
}
@@ -1213,6 +1222,7 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot,
scan->rs_strategy = NULL; /* set in initscan */
scan->rs_allow_strat = allow_strat;
scan->rs_allow_sync = allow_sync;
scan->rs_relpredicatelocked = false;
/*
* we can use page-at-a-time mode if it's an MVCC-safe snapshot
@@ -1459,8 +1469,13 @@ heap_fetch(Relation relation,
*/
valid = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer);
if (valid)
PredicateLockTuple(relation, tuple);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
CheckForSerializableConflictOut(valid, relation, tuple, buffer);
if (valid)
{
/*
@@ -1506,13 +1521,15 @@ heap_fetch(Relation relation,
* heap_fetch, we do not report any pgstats count; caller may do so if wanted.
*/
bool
heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
bool *all_dead)
heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
Snapshot snapshot, bool *all_dead)
{
Page dp = (Page) BufferGetPage(buffer);
TransactionId prev_xmax = InvalidTransactionId;
OffsetNumber offnum;
bool at_chain_start;
bool valid;
bool match_found;
if (all_dead)
*all_dead = true;
@@ -1522,6 +1539,7 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
Assert(ItemPointerGetBlockNumber(tid) == BufferGetBlockNumber(buffer));
offnum = ItemPointerGetOffsetNumber(tid);
at_chain_start = true;
match_found = false;
/* Scan through possible multiple members of HOT-chain */
for (;;)
@@ -1552,6 +1570,8 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
heapTuple.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
heapTuple.t_len = ItemIdGetLength(lp);
heapTuple.t_tableOid = relation->rd_id;
heapTuple.t_self = *tid;
/*
* Shouldn't see a HEAP_ONLY tuple at chain start.
@@ -1569,12 +1589,18 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
break;
/* If it's visible per the snapshot, we must return it */
if (HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer))
valid = HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer);
CheckForSerializableConflictOut(valid, relation, &heapTuple, buffer);
if (valid)
{
ItemPointerSetOffsetNumber(tid, offnum);
PredicateLockTuple(relation, &heapTuple);
if (all_dead)
*all_dead = false;
return true;
if (IsolationIsSerializable())
match_found = true;
else
return true;
}
/*
@@ -1603,7 +1629,7 @@ heap_hot_search_buffer(ItemPointer tid, Buffer buffer, Snapshot snapshot,
break; /* end of chain */
}
return false;
return match_found;
}
/*
@@ -1622,7 +1648,7 @@ heap_hot_search(ItemPointer tid, Relation relation, Snapshot snapshot,
buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
LockBuffer(buffer, BUFFER_LOCK_SHARE);
result = heap_hot_search_buffer(tid, buffer, snapshot, all_dead);
result = heap_hot_search_buffer(tid, relation, buffer, snapshot, all_dead);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return result;
@@ -1729,6 +1755,7 @@ heap_get_latest_tid(Relation relation,
* result candidate.
*/
valid = HeapTupleSatisfiesVisibility(&tp, snapshot, buffer);
CheckForSerializableConflictOut(valid, relation, &tp, buffer);
if (valid)
*tid = ctid;
@@ -1893,6 +1920,13 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
InvalidBuffer, options, bistate);
/*
* We're about to do the actual insert -- check for conflict at the
* relation or buffer level first, to avoid possibly having to roll
* back work we've just done.
*/
CheckForSerializableConflictIn(relation, NULL, buffer);
/* NO EREPORT(ERROR) from here till changes are logged */
START_CRIT_SECTION();
@@ -2193,6 +2227,12 @@ l1:
return result;
}
/*
* We're about to do the actual delete -- check for conflict first,
* to avoid possibly having to roll back work we've just done.
*/
CheckForSerializableConflictIn(relation, &tp, buffer);
/* replace cid with a combo cid if necessary */
HeapTupleHeaderAdjustCmax(tp.t_data, &cid, &iscombo);
@@ -2546,6 +2586,12 @@ l2:
return result;
}
/*
* We're about to do the actual update -- check for conflict first,
* to avoid possibly having to roll back work we've just done.
*/
CheckForSerializableConflictIn(relation, &oldtup, buffer);
/* Fill in OID and transaction status data for newtup */
if (relation->rd_rel->relhasoids)
{
@@ -2690,6 +2736,16 @@ l2:
heaptup = newtup;
}
/*
* We're about to create the new tuple -- check for conflict first,
* to avoid possibly having to roll back work we've just done.
*
* NOTE: For a tuple insert, we only need to check for table locks, since
* predicate locking at the index level will cover ranges for anything
* except a table scan. Therefore, only provide the relation.
*/
CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);
/*
* At this point newbuf and buffer are both pinned and locked, and newbuf
* has enough space for the new tuple. If they are the same buffer, only
@@ -2799,6 +2855,12 @@ l2:
END_CRIT_SECTION();
/*
* Any existing SIREAD locks on the old tuple must be linked to the new
* tuple for conflict detection purposes.
*/
PredicateLockTupleRowVersionLink(relation, &oldtup, newtup);
if (newbuf != buffer)
LockBuffer(newbuf, BUFFER_LOCK_UNLOCK);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

View File

@@ -64,9 +64,11 @@
#include "access/relscan.h"
#include "access/transam.h"
#include "access/xact.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "utils/relcache.h"
#include "utils/snapmgr.h"
#include "utils/tqual.h"
@@ -192,6 +194,11 @@ index_insert(Relation indexRelation,
RELATION_CHECKS;
GET_REL_PROCEDURE(aminsert);
if (!(indexRelation->rd_am->ampredlocks))
CheckForSerializableConflictIn(indexRelation,
(HeapTuple) NULL,
InvalidBuffer);
/*
* have the am's insert proc do all the work.
*/
@@ -266,6 +273,9 @@ index_beginscan_internal(Relation indexRelation,
RELATION_CHECKS;
GET_REL_PROCEDURE(ambeginscan);
if (!(indexRelation->rd_am->ampredlocks))
PredicateLockRelation(indexRelation);
/*
* We hold a reference count to the relcache entry throughout the scan.
*/
@@ -523,6 +533,7 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
{
ItemId lp;
ItemPointer ctid;
bool valid;
/* check for bogus TID */
if (offnum < FirstOffsetNumber ||
@@ -577,8 +588,13 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
break;
/* If it's visible per the snapshot, we must return it */
if (HeapTupleSatisfiesVisibility(heapTuple, scan->xs_snapshot,
scan->xs_cbuf))
valid = HeapTupleSatisfiesVisibility(heapTuple, scan->xs_snapshot,
scan->xs_cbuf);
CheckForSerializableConflictOut(valid, scan->heapRelation,
heapTuple, scan->xs_cbuf);
if (valid)
{
/*
* If the snapshot is MVCC, we know that it could accept at
@@ -586,7 +602,8 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
* any more members. Otherwise, check for continuation of the
* HOT-chain, and set state for next time.
*/
if (IsMVCCSnapshot(scan->xs_snapshot))
if (IsMVCCSnapshot(scan->xs_snapshot)
&& !IsolationIsSerializable())
scan->xs_next_hot = InvalidOffsetNumber;
else if (HeapTupleIsHotUpdated(heapTuple))
{
@@ -598,6 +615,8 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
else
scan->xs_next_hot = InvalidOffsetNumber;
PredicateLockTuple(scan->heapRelation, heapTuple);
LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
pgstat_count_heap_fetch(scan->indexRelation);

View File

@@ -21,6 +21,7 @@
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "utils/inval.h"
#include "utils/tqual.h"
@@ -174,6 +175,14 @@ top:
if (checkUnique != UNIQUE_CHECK_EXISTING)
{
/*
* The only conflict predicate locking cares about for indexes is when
* an index tuple insert conflicts with an existing lock. Since the
* actual location of the insert is hard to predict because of the
* random search used to prevent O(N^2) performance when there are many
* duplicate entries, we can just use the "first valid" page.
*/
CheckForSerializableConflictIn(rel, NULL, buf);
/* do the insertion */
_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup, heapRel);
_bt_insertonpg(rel, buf, stack, itup, offset, false);
@@ -696,6 +705,9 @@ _bt_insertonpg(Relation rel,
/* split the buffer into left and right halves */
rbuf = _bt_split(rel, buf, firstright,
newitemoff, itemsz, itup, newitemonleft);
PredicateLockPageSplit(rel,
BufferGetBlockNumber(buf),
BufferGetBlockNumber(rbuf));
/*----------
* By here,

View File

@@ -29,6 +29,7 @@
#include "storage/freespace.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "utils/inval.h"
#include "utils/snapmgr.h"
@@ -1183,6 +1184,12 @@ _bt_pagedel(Relation rel, Buffer buf, BTStack stack)
rightsib, opaque->btpo_prev, target,
RelationGetRelationName(rel));
/*
* Any insert which would have gone on the target block will now go to the
* right sibling block.
*/
PredicateLockPageCombine(rel, target, rightsib);
/*
* Next find and write-lock the current parent of the target page. This is
* essentially the same as the corresponding step of splitting.

View File

@@ -29,6 +29,7 @@
#include "storage/indexfsm.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
@@ -822,6 +823,7 @@ restart:
if (_bt_page_recyclable(page))
{
/* Okay to recycle this page */
Assert(!PageIsPredicateLocked(rel, blkno));
RecordFreeIndexPage(rel, blkno);
vstate->totFreePages++;
stats->pages_deleted++;

View File

@@ -21,6 +21,7 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "storage/predicate.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
@@ -63,7 +64,10 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
/* If index is empty and access = BT_READ, no root page is created. */
if (!BufferIsValid(*bufP))
{
PredicateLockRelation(rel); /* Nothing finer to lock exists. */
return (BTStack) NULL;
}
/* Loop iterates once per level descended in the tree */
for (;;)
@@ -88,7 +92,11 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
page = BufferGetPage(*bufP);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (P_ISLEAF(opaque))
{
if (access == BT_READ)
PredicateLockPage(rel, BufferGetBlockNumber(*bufP));
break;
}
/*
* Find the appropriate item on the internal page, and get the child
@@ -1142,6 +1150,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!P_IGNORE(opaque))
{
PredicateLockPage(rel, blkno);
/* see if there are any matches on this page */
/* note that this will clear moreRight if we can stop */
if (_bt_readpage(scan, dir, P_FIRSTDATAKEY(opaque)))
@@ -1189,6 +1198,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!P_IGNORE(opaque))
{
PredicateLockPage(rel, BufferGetBlockNumber(so->currPos.buf));
/* see if there are any matches on this page */
/* note that this will clear moreLeft if we can stop */
if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page)))
@@ -1352,6 +1362,7 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
if (!BufferIsValid(buf))
{
/* empty index... */
PredicateLockRelation(rel); /* Nothing finer to lock exists. */
return InvalidBuffer;
}
@@ -1431,10 +1442,12 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
if (!BufferIsValid(buf))
{
/* empty index... */
PredicateLockRelation(rel); /* Nothing finer to lock exists. */
so->currPos.buf = InvalidBuffer;
return false;
}
PredicateLockPage(rel, BufferGetBlockNumber(buf));
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
Assert(P_ISLEAF(opaque));

View File

@@ -57,6 +57,7 @@
#include "pgstat.h"
#include "replication/walsender.h"
#include "storage/fd.h"
#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
#include "storage/smgr.h"
@@ -1357,6 +1358,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
else
ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
PredicateLockTwoPhaseFinish(xid, isCommit);
/* Count the prepared xact as committed or aborted */
AtEOXact_PgStat(isCommit);

View File

@@ -18,12 +18,14 @@
#include "access/twophase_rmgr.h"
#include "pgstat.h"
#include "storage/lock.h"
#include "storage/predicate.h"
const TwoPhaseCallback twophase_recover_callbacks[TWOPHASE_RM_MAX_ID + 1] =
{
NULL, /* END ID */
lock_twophase_recover, /* Lock */
predicatelock_twophase_recover, /* PredicateLock */
NULL, /* pgstat */
multixact_twophase_recover /* MultiXact */
};
@@ -32,6 +34,7 @@ const TwoPhaseCallback twophase_postcommit_callbacks[TWOPHASE_RM_MAX_ID + 1] =
{
NULL, /* END ID */
lock_twophase_postcommit, /* Lock */
NULL, /* PredicateLock */
pgstat_twophase_postcommit, /* pgstat */
multixact_twophase_postcommit /* MultiXact */
};
@@ -40,6 +43,7 @@ const TwoPhaseCallback twophase_postabort_callbacks[TWOPHASE_RM_MAX_ID + 1] =
{
NULL, /* END ID */
lock_twophase_postabort, /* Lock */
NULL, /* PredicateLock */
pgstat_twophase_postabort, /* pgstat */
multixact_twophase_postabort /* MultiXact */
};
@@ -48,6 +52,7 @@ const TwoPhaseCallback twophase_standby_recover_callbacks[TWOPHASE_RM_MAX_ID + 1
{
NULL, /* END ID */
lock_twophase_standby_recover, /* Lock */
NULL, /* PredicateLock */
NULL, /* pgstat */
NULL /* MultiXact */
};

View File

@@ -21,6 +21,7 @@
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
#include "storage/pmsignal.h"
#include "storage/predicate.h"
#include "storage/proc.h"
#include "utils/builtins.h"
#include "utils/syscache.h"
@@ -161,6 +162,10 @@ GetNewTransactionId(bool isSubXact)
ExtendCLOG(xid);
ExtendSUBTRANS(xid);
/* If it's top level, the predicate locking system also needs to know. */
if (!isSubXact)
RegisterPredicateLockingXid(xid);
/*
* Now advance the nextXid counter. This must not happen until after we
* have successfully completed ExtendCLOG() --- if that routine fails, we

View File

@@ -40,6 +40,7 @@
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
#include "storage/smgr.h"
@@ -63,6 +64,9 @@ int XactIsoLevel;
bool DefaultXactReadOnly = false;
bool XactReadOnly;
bool DefaultXactDeferrable = false;
bool XactDeferrable;
bool XactSyncCommit = true;
int CommitDelay = 0; /* precommit delay in microseconds */
@@ -1640,6 +1644,7 @@ StartTransaction(void)
s->startedInRecovery = false;
XactReadOnly = DefaultXactReadOnly;
}
XactDeferrable = DefaultXactDeferrable;
XactIsoLevel = DefaultXactIsoLevel;
forceSyncCommit = false;
MyXactAccessedTempRel = false;
@@ -1786,6 +1791,13 @@ CommitTransaction(void)
/* close large objects before lower-level cleanup */
AtEOXact_LargeObject(true);
/*
* Mark serializable transaction as complete for predicate locking
* purposes. This should be done as late as we can put it and still
* allow errors to be raised for failure patterns found at commit.
*/
PreCommit_CheckForSerializationFailure();
/*
* Insert notifications sent by NOTIFY commands into the queue. This
* should be late in the pre-commit sequence to minimize time spent
@@ -1980,6 +1992,13 @@ PrepareTransaction(void)
/* close large objects before lower-level cleanup */
AtEOXact_LargeObject(true);
/*
* Mark serializable transaction as complete for predicate locking
* purposes. This should be done as late as we can put it and still
* allow errors to be raised for failure patterns found at commit.
*/
PreCommit_CheckForSerializationFailure();
/* NOTIFY will be handled below */
/*
@@ -2044,6 +2063,7 @@ PrepareTransaction(void)
AtPrepare_Notify();
AtPrepare_Locks();
AtPrepare_PredicateLocks();
AtPrepare_PgStat();
AtPrepare_MultiXact();
AtPrepare_RelationMap();
@@ -2103,6 +2123,7 @@ PrepareTransaction(void)
PostPrepare_MultiXact(xid);
PostPrepare_Locks(xid);
PostPrepare_PredicateLocks(xid);
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_LOCKS,