mirror of
https://github.com/postgres/postgres.git
synced 2025-09-05 02:22:28 +03:00
Enable parallel query with SERIALIZABLE isolation.
Previously, the SERIALIZABLE isolation level prevented parallel query
from being used. Allow the two features to be used together by
sharing the leader's SERIALIZABLEXACT with parallel workers.
An extra per-SERIALIZABLEXACT LWLock is introduced to make it safe to
share, and new logic is introduced to coordinate the early release
of the SERIALIZABLEXACT required for the SXACT_FLAG_RO_SAFE
optimization, as follows:
The first backend to observe the SXACT_FLAG_RO_SAFE flag (set by
some other transaction) will 'partially release' the SERIALIZABLEXACT,
meaning that the conflicts and locks it holds are released, but the
SERIALIZABLEXACT itself will remain active because other backends
might still have a pointer to it.
Whenever any backend notices the SXACT_FLAG_RO_SAFE flag, it clears
its own MySerializableXact variable and frees local resources so that
it can skip SSI checks for the rest of the transaction. In the
special case of the leader process, it transfers the SERIALIZABLEXACT
to a new variable SavedSerializableXact, so that it can be completely
released at the end of the transaction after all workers have exited.
Remove the serializable_okay flag added to CreateParallelContext() by
commit 9da0cc35
, because it's now redundant.
Author: Thomas Munro
Reviewed-by: Haribabu Kommi, Robert Haas, Masahiko Sawada, Kevin Grittner
Discussion: https://postgr.es/m/CAEepm=0gXGYhtrVDWOTHS8SQQy_=S9xo+8oCxGLWZAOoeJ=yzQ@mail.gmail.com
This commit is contained in:
@@ -521,6 +521,7 @@ RegisterLWLockTranches(void)
|
||||
LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
|
||||
LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append");
|
||||
LWLockRegisterTranche(LWTRANCHE_PARALLEL_HASH_JOIN, "parallel_hash_join");
|
||||
LWLockRegisterTranche(LWTRANCHE_SXACT, "serializable_xact");
|
||||
|
||||
/* Register named tranches. */
|
||||
for (i = 0; i < NamedLWLockTrancheRequests; i++)
|
||||
|
@@ -97,7 +97,9 @@
|
||||
* - All transactions share this single lock (with no partitioning).
|
||||
* - There is never a need for a process other than the one running
|
||||
* an active transaction to walk the list of locks held by that
|
||||
* transaction.
|
||||
* transaction, except parallel query workers sharing the leader's
|
||||
* transaction. In the parallel case, an extra per-sxact lock is
|
||||
* taken; see below.
|
||||
* - It is relatively infrequent that another process needs to
|
||||
* modify the list for a transaction, but it does happen for such
|
||||
* things as index page splits for pages with predicate locks and
|
||||
@@ -116,6 +118,12 @@
|
||||
* than its own active transaction must acquire an exclusive
|
||||
* lock.
|
||||
*
|
||||
* SERIALIZABLEXACT's member 'predicateLockListLock'
|
||||
* - Protects the linked list of locks held by a transaction. Only
|
||||
* needed for parallel mode, where multiple backends share the
|
||||
* same SERIALIZABLEXACT object. Not needed if
|
||||
* SerializablePredicateLockListLock is held exclusively.
|
||||
*
|
||||
* PredicateLockHashPartitionLock(hashcode)
|
||||
* - The same lock protects a target, all locks on that target, and
|
||||
* the linked list of locks on the target.
|
||||
@@ -162,7 +170,7 @@
|
||||
* PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
|
||||
* BlockNumber newblkno)
|
||||
* TransferPredicateLocksToHeapRelation(Relation relation)
|
||||
* ReleasePredicateLocks(bool isCommit)
|
||||
* ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
|
||||
*
|
||||
* conflict detection (may also trigger rollback)
|
||||
* CheckForSerializableConflictOut(bool visible, Relation relation,
|
||||
@@ -187,6 +195,7 @@
|
||||
|
||||
#include "access/heapam.h"
|
||||
#include "access/htup_details.h"
|
||||
#include "access/parallel.h"
|
||||
#include "access/slru.h"
|
||||
#include "access/subtrans.h"
|
||||
#include "access/transam.h"
|
||||
@@ -279,6 +288,7 @@
|
||||
#define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0)
|
||||
#define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
|
||||
#define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
|
||||
#define SxactIsPartiallyReleased(sxact) (((sxact)->flags & SXACT_FLAG_PARTIALLY_RELEASED) != 0)
|
||||
|
||||
/*
|
||||
* Compute the hash code associated with a PREDICATELOCKTARGETTAG.
|
||||
@@ -409,6 +419,15 @@ static HTAB *LocalPredicateLockHash = NULL;
|
||||
static SERIALIZABLEXACT *MySerializableXact = InvalidSerializableXact;
|
||||
static bool MyXactDidWrite = false;
|
||||
|
||||
/*
|
||||
* The SXACT_FLAG_RO_UNSAFE optimization might lead us to release
|
||||
* MySerializableXact early. If that happens in a parallel query, the leader
|
||||
* needs to defer the destruction of the SERIALIZABLEXACT until end of
|
||||
* transaction, because the workers still have a reference to it. In that
|
||||
* case, the leader stores it here.
|
||||
*/
|
||||
static SERIALIZABLEXACT *SavedSerializableXact = InvalidSerializableXact;
|
||||
|
||||
/* local functions */
|
||||
|
||||
static SERIALIZABLEXACT *CreatePredXact(void);
|
||||
@@ -465,6 +484,8 @@ static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag);
|
||||
static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
|
||||
static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
|
||||
SERIALIZABLEXACT *writer);
|
||||
static void CreateLocalPredicateLockHash(void);
|
||||
static void ReleasePredicateLocksLocal(void);
|
||||
|
||||
|
||||
/*------------------------------------------------------------------------*/
|
||||
@@ -521,7 +542,7 @@ SerializationNeededForRead(Relation relation, Snapshot snapshot)
|
||||
*/
|
||||
if (SxactIsROSafe(MySerializableXact))
|
||||
{
|
||||
ReleasePredicateLocks(false);
|
||||
ReleasePredicateLocks(false, true);
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -1168,6 +1189,8 @@ InitPredicateLocks(void)
|
||||
memset(PredXact->element, 0, requestSize);
|
||||
for (i = 0; i < max_table_size; i++)
|
||||
{
|
||||
LWLockInitialize(&PredXact->element[i].sxact.predicateLockListLock,
|
||||
LWTRANCHE_SXACT);
|
||||
SHMQueueInsertBefore(&(PredXact->availableList),
|
||||
&(PredXact->element[i].link));
|
||||
}
|
||||
@@ -1513,14 +1536,14 @@ GetSafeSnapshot(Snapshot origSnapshot)
|
||||
ereport(DEBUG2,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
errmsg("deferrable snapshot was unsafe; trying a new one")));
|
||||
ReleasePredicateLocks(false);
|
||||
ReleasePredicateLocks(false, false);
|
||||
}
|
||||
|
||||
/*
|
||||
* Now we have a safe snapshot, so we don't need to do any further checks.
|
||||
*/
|
||||
Assert(SxactIsROSafe(MySerializableXact));
|
||||
ReleasePredicateLocks(false);
|
||||
ReleasePredicateLocks(false, true);
|
||||
|
||||
return snapshot;
|
||||
}
|
||||
@@ -1633,6 +1656,17 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
|
||||
{
|
||||
Assert(IsolationIsSerializable());
|
||||
|
||||
/*
|
||||
* If this is called by parallel.c in a parallel worker, we don't want to
|
||||
* create a SERIALIZABLEXACT just yet because the leader's
|
||||
* SERIALIZABLEXACT will be installed with AttachSerializableXact(). We
|
||||
* also don't want to reject SERIALIZABLE READ ONLY DEFERRABLE in this
|
||||
* case, because the leader has already determined that the snapshot it
|
||||
* has passed us is safe. So there is nothing for us to do.
|
||||
*/
|
||||
if (IsParallelWorker())
|
||||
return;
|
||||
|
||||
/*
|
||||
* We do not allow SERIALIZABLE READ ONLY DEFERRABLE transactions to
|
||||
* import snapshots, since there's no way to wait for a safe snapshot when
|
||||
@@ -1666,7 +1700,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
|
||||
VirtualTransactionId vxid;
|
||||
SERIALIZABLEXACT *sxact,
|
||||
*othersxact;
|
||||
HASHCTL hash_ctl;
|
||||
|
||||
/* We only do this for serializable transactions. Once. */
|
||||
Assert(MySerializableXact == InvalidSerializableXact);
|
||||
@@ -1813,6 +1846,16 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
|
||||
|
||||
LWLockRelease(SerializableXactHashLock);
|
||||
|
||||
CreateLocalPredicateLockHash();
|
||||
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
static void
|
||||
CreateLocalPredicateLockHash(void)
|
||||
{
|
||||
HASHCTL hash_ctl;
|
||||
|
||||
/* Initialize the backend-local hash table of parent locks */
|
||||
Assert(LocalPredicateLockHash == NULL);
|
||||
MemSet(&hash_ctl, 0, sizeof(hash_ctl));
|
||||
@@ -1822,8 +1865,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
|
||||
max_predicate_locks_per_xact,
|
||||
&hash_ctl,
|
||||
HASH_ELEM | HASH_BLOBS);
|
||||
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -2078,7 +2119,9 @@ RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash)
|
||||
* This implementation is assuming that the usage of each target tag field
|
||||
* is uniform. No need to make this hard if we don't have to.
|
||||
*
|
||||
* We aren't acquiring lightweight locks for the predicate lock or lock
|
||||
* We acquire an LWLock in the case of parallel mode, because worker
|
||||
* backends have access to the leader's SERIALIZABLEXACT. Otherwise,
|
||||
* we aren't acquiring LWLocks for the predicate lock or lock
|
||||
* target structures associated with this transaction unless we're going
|
||||
* to modify them, because no other process is permitted to modify our
|
||||
* locks.
|
||||
@@ -2091,6 +2134,8 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
|
||||
|
||||
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
|
||||
sxact = MySerializableXact;
|
||||
if (IsInParallelMode())
|
||||
LWLockAcquire(&sxact->predicateLockListLock, LW_EXCLUSIVE);
|
||||
predlock = (PREDICATELOCK *)
|
||||
SHMQueueNext(&(sxact->predicateLocks),
|
||||
&(sxact->predicateLocks),
|
||||
@@ -2144,6 +2189,8 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
|
||||
|
||||
predlock = nextpredlock;
|
||||
}
|
||||
if (IsInParallelMode())
|
||||
LWLockRelease(&sxact->predicateLockListLock);
|
||||
LWLockRelease(SerializablePredicateLockListLock);
|
||||
}
|
||||
|
||||
@@ -2342,6 +2389,8 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
|
||||
partitionLock = PredicateLockHashPartitionLock(targettaghash);
|
||||
|
||||
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
|
||||
if (IsInParallelMode())
|
||||
LWLockAcquire(&sxact->predicateLockListLock, LW_EXCLUSIVE);
|
||||
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
|
||||
|
||||
/* Make sure that the target is represented. */
|
||||
@@ -2379,6 +2428,8 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
|
||||
}
|
||||
|
||||
LWLockRelease(partitionLock);
|
||||
if (IsInParallelMode())
|
||||
LWLockRelease(&sxact->predicateLockListLock);
|
||||
LWLockRelease(SerializablePredicateLockListLock);
|
||||
}
|
||||
|
||||
@@ -2566,7 +2617,8 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
|
||||
PREDICATELOCK *nextpredlock;
|
||||
bool found;
|
||||
|
||||
Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
|
||||
Assert(LWLockHeldByMeInMode(SerializablePredicateLockListLock,
|
||||
LW_EXCLUSIVE));
|
||||
Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash)));
|
||||
|
||||
predlock = (PREDICATELOCK *)
|
||||
@@ -2626,7 +2678,7 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
|
||||
* covers it, or if we are absolutely certain that no one will need to
|
||||
* refer to that lock in the future.
|
||||
*
|
||||
* Caller must hold SerializablePredicateLockListLock.
|
||||
* Caller must hold SerializablePredicateLockListLock exclusively.
|
||||
*/
|
||||
static bool
|
||||
TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
|
||||
@@ -2641,7 +2693,8 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
|
||||
bool found;
|
||||
bool outOfShmem = false;
|
||||
|
||||
Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
|
||||
Assert(LWLockHeldByMeInMode(SerializablePredicateLockListLock,
|
||||
LW_EXCLUSIVE));
|
||||
|
||||
oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
|
||||
newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag);
|
||||
@@ -3217,9 +3270,17 @@ SetNewSxactGlobalXmin(void)
|
||||
* If this transaction is committing and is holding any predicate locks,
|
||||
* it must be added to a list of completed serializable transactions still
|
||||
* holding locks.
|
||||
*
|
||||
* If isReadOnlySafe is true, then predicate locks are being released before
|
||||
* the end of the transaction because MySerializableXact has been determined
|
||||
* to be RO_SAFE. In non-parallel mode we can release it completely, but it
|
||||
* in parallel mode we partially release the SERIALIZABLEXACT and keep it
|
||||
* around until the end of the transaction, allowing each backend to clear its
|
||||
* MySerializableXact variable and benefit from the optimization in its own
|
||||
* time.
|
||||
*/
|
||||
void
|
||||
ReleasePredicateLocks(bool isCommit)
|
||||
ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
|
||||
{
|
||||
bool needToClear;
|
||||
RWConflict conflict,
|
||||
@@ -3238,6 +3299,44 @@ ReleasePredicateLocks(bool isCommit)
|
||||
*/
|
||||
bool topLevelIsDeclaredReadOnly;
|
||||
|
||||
/* We can't be both committing and releasing early due to RO_SAFE. */
|
||||
Assert(!(isCommit && isReadOnlySafe));
|
||||
|
||||
/* Are we at the end of a transaction, that is, a commit or abort? */
|
||||
if (!isReadOnlySafe)
|
||||
{
|
||||
/*
|
||||
* Parallel workers mustn't release predicate locks at the end of
|
||||
* their transaction. The leader will do that at the end of its
|
||||
* transaction.
|
||||
*/
|
||||
if (IsParallelWorker())
|
||||
{
|
||||
ReleasePredicateLocksLocal();
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* By the time the leader in a parallel query reaches end of
|
||||
* transaction, it has waited for all workers to exit.
|
||||
*/
|
||||
Assert(!ParallelContextActive());
|
||||
|
||||
/*
|
||||
* If the leader in a parallel query earlier stashed a partially
|
||||
* released SERIALIZABLEXACT for final clean-up at end of transaction
|
||||
* (because workers might still have been accessing it), then it's
|
||||
* time to restore it.
|
||||
*/
|
||||
if (SavedSerializableXact != InvalidSerializableXact)
|
||||
{
|
||||
Assert(MySerializableXact == InvalidSerializableXact);
|
||||
MySerializableXact = SavedSerializableXact;
|
||||
SavedSerializableXact = InvalidSerializableXact;
|
||||
Assert(SxactIsPartiallyReleased(MySerializableXact));
|
||||
}
|
||||
}
|
||||
|
||||
if (MySerializableXact == InvalidSerializableXact)
|
||||
{
|
||||
Assert(LocalPredicateLockHash == NULL);
|
||||
@@ -3246,10 +3345,51 @@ ReleasePredicateLocks(bool isCommit)
|
||||
|
||||
LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
|
||||
|
||||
/*
|
||||
* If the transaction is committing, but it has been partially released
|
||||
* already, then treat this as a roll back. It was marked as rolled back.
|
||||
*/
|
||||
if (isCommit && SxactIsPartiallyReleased(MySerializableXact))
|
||||
isCommit = false;
|
||||
|
||||
/*
|
||||
* If we're called in the middle of a transaction because we discovered
|
||||
* that the SXACT_FLAG_RO_SAFE flag was set, then we'll partially release
|
||||
* it (that is, release the predicate locks and conflicts, but not the
|
||||
* SERIALIZABLEXACT itself) if we're the first backend to have noticed.
|
||||
*/
|
||||
if (isReadOnlySafe && IsInParallelMode())
|
||||
{
|
||||
/*
|
||||
* The leader needs to stash a pointer to it, so that it can
|
||||
* completely release it at end-of-transaction.
|
||||
*/
|
||||
if (!IsParallelWorker())
|
||||
SavedSerializableXact = MySerializableXact;
|
||||
|
||||
/*
|
||||
* The first backend to reach this condition will partially release
|
||||
* the SERIALIZABLEXACT. All others will just clear their
|
||||
* backend-local state so that they stop doing SSI checks for the rest
|
||||
* of the transaction.
|
||||
*/
|
||||
if (SxactIsPartiallyReleased(MySerializableXact))
|
||||
{
|
||||
LWLockRelease(SerializableXactHashLock);
|
||||
ReleasePredicateLocksLocal();
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
MySerializableXact->flags |= SXACT_FLAG_PARTIALLY_RELEASED;
|
||||
/* ... and proceed to perform the partial release below. */
|
||||
}
|
||||
}
|
||||
Assert(!isCommit || SxactIsPrepared(MySerializableXact));
|
||||
Assert(!isCommit || !SxactIsDoomed(MySerializableXact));
|
||||
Assert(!SxactIsCommitted(MySerializableXact));
|
||||
Assert(!SxactIsRolledBack(MySerializableXact));
|
||||
Assert(SxactIsPartiallyReleased(MySerializableXact)
|
||||
|| !SxactIsRolledBack(MySerializableXact));
|
||||
|
||||
/* may not be serializable during COMMIT/ROLLBACK PREPARED */
|
||||
Assert(MySerializableXact->pid == 0 || IsolationIsSerializable());
|
||||
@@ -3273,8 +3413,8 @@ ReleasePredicateLocks(bool isCommit)
|
||||
MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
|
||||
|
||||
/*
|
||||
* If it's not a commit it's a rollback, and we can clear our locks
|
||||
* immediately.
|
||||
* If it's not a commit it's either a rollback or a read-only transaction
|
||||
* flagged SXACT_FLAG_RO_SAFE, and we can clear our locks immediately.
|
||||
*/
|
||||
if (isCommit)
|
||||
{
|
||||
@@ -3298,7 +3438,8 @@ ReleasePredicateLocks(bool isCommit)
|
||||
* cleanup. This means it should not be considered when calculating
|
||||
* SxactGlobalXmin.
|
||||
*/
|
||||
MySerializableXact->flags |= SXACT_FLAG_DOOMED;
|
||||
if (!isReadOnlySafe)
|
||||
MySerializableXact->flags |= SXACT_FLAG_DOOMED;
|
||||
MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK;
|
||||
|
||||
/*
|
||||
@@ -3494,7 +3635,8 @@ ReleasePredicateLocks(bool isCommit)
|
||||
* was launched.
|
||||
*/
|
||||
needToClear = false;
|
||||
if (TransactionIdEquals(MySerializableXact->xmin, PredXact->SxactGlobalXmin))
|
||||
if (!isReadOnlySafe &&
|
||||
TransactionIdEquals(MySerializableXact->xmin, PredXact->SxactGlobalXmin))
|
||||
{
|
||||
Assert(PredXact->SxactGlobalXminCount > 0);
|
||||
if (--(PredXact->SxactGlobalXminCount) == 0)
|
||||
@@ -3513,14 +3655,28 @@ ReleasePredicateLocks(bool isCommit)
|
||||
SHMQueueInsertBefore(FinishedSerializableTransactions,
|
||||
&MySerializableXact->finishedLink);
|
||||
|
||||
/*
|
||||
* If we're releasing a RO_SAFE transaction in parallel mode, we'll only
|
||||
* partially release it. That's necessary because other backends may have
|
||||
* a reference to it. The leader will release the SERIALIZABLEXACT itself
|
||||
* at the end of the transaction after workers have stopped running.
|
||||
*/
|
||||
if (!isCommit)
|
||||
ReleaseOneSerializableXact(MySerializableXact, false, false);
|
||||
ReleaseOneSerializableXact(MySerializableXact,
|
||||
isReadOnlySafe && IsInParallelMode(),
|
||||
false);
|
||||
|
||||
LWLockRelease(SerializableFinishedListLock);
|
||||
|
||||
if (needToClear)
|
||||
ClearOldPredicateLocks();
|
||||
|
||||
ReleasePredicateLocksLocal();
|
||||
}
|
||||
|
||||
static void
|
||||
ReleasePredicateLocksLocal(void)
|
||||
{
|
||||
MySerializableXact = InvalidSerializableXact;
|
||||
MyXactDidWrite = false;
|
||||
|
||||
@@ -3712,6 +3868,8 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
|
||||
* them to OldCommittedSxact if summarize is true)
|
||||
*/
|
||||
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
|
||||
if (IsInParallelMode())
|
||||
LWLockAcquire(&sxact->predicateLockListLock, LW_EXCLUSIVE);
|
||||
predlock = (PREDICATELOCK *)
|
||||
SHMQueueNext(&(sxact->predicateLocks),
|
||||
&(sxact->predicateLocks),
|
||||
@@ -3791,6 +3949,8 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
|
||||
*/
|
||||
SHMQueueInit(&sxact->predicateLocks);
|
||||
|
||||
if (IsInParallelMode())
|
||||
LWLockRelease(&sxact->predicateLockListLock);
|
||||
LWLockRelease(SerializablePredicateLockListLock);
|
||||
|
||||
sxidtag.xid = sxact->topXid;
|
||||
@@ -4213,6 +4373,8 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
|
||||
PREDICATELOCK *rmpredlock;
|
||||
|
||||
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
|
||||
if (IsInParallelMode())
|
||||
LWLockAcquire(&MySerializableXact->predicateLockListLock, LW_EXCLUSIVE);
|
||||
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
|
||||
LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
|
||||
|
||||
@@ -4247,6 +4409,8 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
|
||||
|
||||
LWLockRelease(SerializableXactHashLock);
|
||||
LWLockRelease(partitionLock);
|
||||
if (IsInParallelMode())
|
||||
LWLockRelease(&MySerializableXact->predicateLockListLock);
|
||||
LWLockRelease(SerializablePredicateLockListLock);
|
||||
|
||||
if (rmpredlock != NULL)
|
||||
@@ -4677,6 +4841,7 @@ PreCommit_CheckForSerializationFailure(void)
|
||||
/* Check if someone else has already decided that we need to die */
|
||||
if (SxactIsDoomed(MySerializableXact))
|
||||
{
|
||||
Assert(!SxactIsPartiallyReleased(MySerializableXact));
|
||||
LWLockRelease(SerializableXactHashLock);
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
|
||||
@@ -4795,6 +4960,13 @@ AtPrepare_PredicateLocks(void)
|
||||
*/
|
||||
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
|
||||
|
||||
/*
|
||||
* No need to take sxact->predicateLockListLock in parallel mode because
|
||||
* there cannot be any parallel workers running while we are preparing a
|
||||
* transaction.
|
||||
*/
|
||||
Assert(!IsParallelWorker() && !ParallelContextActive());
|
||||
|
||||
predlock = (PREDICATELOCK *)
|
||||
SHMQueueNext(&(sxact->predicateLocks),
|
||||
&(sxact->predicateLocks),
|
||||
@@ -4867,7 +5039,7 @@ PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
|
||||
MySerializableXact = sxid->myXact;
|
||||
MyXactDidWrite = true; /* conservatively assume that we wrote
|
||||
* something */
|
||||
ReleasePredicateLocks(isCommit);
|
||||
ReleasePredicateLocks(isCommit, false);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -5003,3 +5175,28 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
|
||||
CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Prepare to share the current SERIALIZABLEXACT with parallel workers.
|
||||
* Return a handle object that can be used by AttachSerializableXact() in a
|
||||
* parallel worker.
|
||||
*/
|
||||
SerializableXactHandle
|
||||
ShareSerializableXact(void)
|
||||
{
|
||||
return MySerializableXact;
|
||||
}
|
||||
|
||||
/*
|
||||
* Allow parallel workers to import the leader's SERIALIZABLEXACT.
|
||||
*/
|
||||
void
|
||||
AttachSerializableXact(SerializableXactHandle handle)
|
||||
{
|
||||
|
||||
Assert(MySerializableXact == InvalidSerializableXact);
|
||||
|
||||
MySerializableXact = (SERIALIZABLEXACT *) handle;
|
||||
if (MySerializableXact != InvalidSerializableXact)
|
||||
CreateLocalPredicateLockHash();
|
||||
}
|
||||
|
Reference in New Issue
Block a user