1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-06 07:49:08 +03:00

Overdue code review for transaction-level advisory locks patch.

Commit 62c7bd31c8 had assorted problems, most
visibly that it broke PREPARE TRANSACTION in the presence of session-level
advisory locks (which should be ignored by PREPARE), as per a recent
complaint from Stephen Rees.  More abstractly, the patch made the
LockMethodData.transactional flag not merely useless but outright
dangerous, because in point of fact that flag no longer tells you anything
at all about whether a lock is held transactionally.  This fix therefore
removes that flag altogether.  We now rely entirely on the convention
already in use in lock.c that transactional lock holds must be owned by
some ResourceOwner, while session holds are never so owned.  Setting the
locallock struct's owner link to NULL thus denotes a session hold, and
there is no redundant marker for that.

PREPARE TRANSACTION now works again when there are session-level advisory
locks, and it is also able to transfer transactional advisory locks to the
prepared transaction, but for implementation reasons it throws an error if
we hold both types of lock on a single lockable object.  Perhaps it will be
worth improving that someday.

Assorted other minor cleanup and documentation editing, as well.

Back-patch to 9.1, except that in the 9.1 branch I did not remove the
LockMethodData.transactional flag for fear of causing an ABI break for
any external code that might be examining those structs.
This commit is contained in:
Tom Lane
2012-05-04 17:43:27 -04:00
parent 1715ff1128
commit 71b9549d05
6 changed files with 266 additions and 208 deletions

View File

@@ -114,6 +114,50 @@ static const char *const lock_mode_names[] =
"AccessExclusiveLock"
};
#ifndef LOCK_DEBUG
static bool Dummy_trace = false;
#endif
static const LockMethodData default_lockmethod = {
AccessExclusiveLock, /* highest valid lock mode number */
LockConflicts,
lock_mode_names,
#ifdef LOCK_DEBUG
&Trace_locks
#else
&Dummy_trace
#endif
};
static const LockMethodData user_lockmethod = {
AccessExclusiveLock, /* highest valid lock mode number */
LockConflicts,
lock_mode_names,
#ifdef LOCK_DEBUG
&Trace_userlocks
#else
&Dummy_trace
#endif
};
/*
* map from lock method id to the lock table data structures
*/
static const LockMethod LockMethods[] = {
NULL,
&default_lockmethod,
&user_lockmethod
};
/* Record that's written to 2PC state file when a lock is persisted */
typedef struct TwoPhaseLockRecord
{
LOCKTAG locktag;
LOCKMODE lockmode;
} TwoPhaseLockRecord;
/*
* Count of the number of fast path lock slots we believe to be used. This
* might be higher than the real number if another backend has transferred
@@ -193,51 +237,6 @@ typedef struct
FastPathStrongRelationLockData *FastPathStrongRelationLocks;
#ifndef LOCK_DEBUG
static bool Dummy_trace = false;
#endif
static const LockMethodData default_lockmethod = {
AccessExclusiveLock, /* highest valid lock mode number */
true,
LockConflicts,
lock_mode_names,
#ifdef LOCK_DEBUG
&Trace_locks
#else
&Dummy_trace
#endif
};
static const LockMethodData user_lockmethod = {
AccessExclusiveLock, /* highest valid lock mode number */
true,
LockConflicts,
lock_mode_names,
#ifdef LOCK_DEBUG
&Trace_userlocks
#else
&Dummy_trace
#endif
};
/*
* map from lock method id to the lock table data structures
*/
static const LockMethod LockMethods[] = {
NULL,
&default_lockmethod,
&user_lockmethod
};
/* Record that's written to 2PC state file when a lock is persisted */
typedef struct TwoPhaseLockRecord
{
LOCKTAG locktag;
LOCKMODE lockmode;
} TwoPhaseLockRecord;
/*
* Pointers to hash tables containing lock state
@@ -342,7 +341,7 @@ static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
static void FinishStrongLockAcquire(void);
static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
static void ReleaseLockForOwner(LOCALLOCK *locallock, ResourceOwner owner);
static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock);
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
PROCLOCK *proclock, LockMethod lockMethodTable);
static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
@@ -621,11 +620,11 @@ LockAcquireExtended(const LOCKTAG *locktag,
lockMethodTable->lockModeNames[lockmode]);
#endif
/* Session locks are never transactional, else check table */
if (!sessionLock && lockMethodTable->transactional)
owner = CurrentResourceOwner;
else
/* Identify owner for lock */
if (sessionLock)
owner = NULL;
else
owner = CurrentResourceOwner;
/*
* Find or create a LOCALLOCK entry for this lock and lockmode
@@ -1650,11 +1649,11 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
ResourceOwner owner;
int i;
/* Session locks are never transactional, else check table */
if (!sessionLock && lockMethodTable->transactional)
owner = CurrentResourceOwner;
else
/* Identify owner for lock */
if (sessionLock)
owner = NULL;
else
owner = CurrentResourceOwner;
for (i = locallock->numLockOwners - 1; i >= 0; i--)
{
@@ -1779,31 +1778,6 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
return TRUE;
}
/*
* LockReleaseSession -- Release all session locks of the specified lock method
* that are held by the current process.
*/
void
LockReleaseSession(LOCKMETHODID lockmethodid)
{
HASH_SEQ_STATUS status;
LOCALLOCK *locallock;
if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
elog(ERROR, "unrecognized lock method: %d", lockmethodid);
hash_seq_init(&status, LockMethodLocalHash);
while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
{
/* Ignore items that are not of the specified lock method */
if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
continue;
ReleaseLockForOwner(locallock, NULL);
}
}
/*
* LockReleaseAll -- Release all locks of the specified lock method that
* are held by the current process.
@@ -2057,6 +2031,31 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
#endif
}
/*
* LockReleaseSession -- Release all session locks of the specified lock method
* that are held by the current process.
*/
void
LockReleaseSession(LOCKMETHODID lockmethodid)
{
HASH_SEQ_STATUS status;
LOCALLOCK *locallock;
if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
elog(ERROR, "unrecognized lock method: %d", lockmethodid);
hash_seq_init(&status, LockMethodLocalHash);
while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
{
/* Ignore items that are not of the specified lock method */
if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
continue;
ReleaseLockIfHeld(locallock, true);
}
}
/*
* LockReleaseCurrentOwner
* Release all locks belonging to CurrentResourceOwner
@@ -2071,25 +2070,37 @@ LockReleaseCurrentOwner(void)
while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
{
/* Ignore items that must be nontransactional */
if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
continue;
ReleaseLockForOwner(locallock, CurrentResourceOwner);
ReleaseLockIfHeld(locallock, false);
}
}
/*
* Subroutine to release a lock belonging to the 'owner' if found.
* 'owner' can be NULL to release a session lock.
* ReleaseLockIfHeld
* Release any session-level locks on this lockable object if sessionLock
* is true; else, release any locks held by CurrentResourceOwner.
*
* It is tempting to pass this a ResourceOwner pointer (or NULL for session
* locks), but without refactoring LockRelease() we cannot support releasing
* locks belonging to resource owners other than CurrentResourceOwner.
* If we were to refactor, it'd be a good idea to fix it so we don't have to
* do a hashtable lookup of the locallock, too. However, currently this
* function isn't used heavily enough to justify refactoring for its
* convenience.
*/
static void
ReleaseLockForOwner(LOCALLOCK *locallock, ResourceOwner owner)
ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
{
int i;
ResourceOwner owner;
LOCALLOCKOWNER *lockOwners;
int i;
/* Scan to see if there are any locks belonging to the owner */
/* Identify owner for lock (must match LockRelease!) */
if (sessionLock)
owner = NULL;
else
owner = CurrentResourceOwner;
/* Scan to see if there are any locks belonging to the target owner */
lockOwners = locallock->lockOwners;
for (i = locallock->numLockOwners - 1; i >= 0; i--)
{
@@ -2116,8 +2127,8 @@ ReleaseLockForOwner(LOCALLOCK *locallock, ResourceOwner owner)
locallock->nLocks = 1;
if (!LockRelease(&locallock->tag.lock,
locallock->tag.mode,
owner == NULL))
elog(WARNING, "ReleaseLockForOwner: failed??");
sessionLock))
elog(WARNING, "ReleaseLockIfHeld: failed??");
}
break;
}
@@ -2147,10 +2158,6 @@ LockReassignCurrentOwner(void)
int ic = -1;
int ip = -1;
/* Ignore items that must be nontransactional */
if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
continue;
/*
* Scan to see if there are any locks belonging to current owner or
* its parent
@@ -2729,13 +2736,13 @@ LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
* Do the preparatory work for a PREPARE: make 2PC state file records
* for all locks currently held.
*
* Non-transactional locks are ignored, as are VXID locks.
* Session-level locks are ignored, as are VXID locks.
*
* There are some special cases that we error out on: we can't be holding
* any session locks (should be OK since only VACUUM uses those) and we
* can't be holding any locks on temporary objects (since that would mess
* up the current backend if it tries to exit before the prepared xact is
* committed).
* There are some special cases that we error out on: we can't be holding any
* locks at both session and transaction level (since we must either keep or
* give away the PROCLOCK object), and we can't be holding any locks on
* temporary objects (since that would mess up the current backend if it tries
* to exit before the prepared xact is committed).
*/
void
AtPrepare_Locks(void)
@@ -2755,12 +2762,10 @@ AtPrepare_Locks(void)
{
TwoPhaseLockRecord record;
LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
bool haveSessionLock;
bool haveXactLock;
int i;
/* Ignore nontransactional locks */
if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
continue;
/*
* Ignore VXID locks. We don't want those to be held by prepared
* transactions, since they aren't meaningful after a restart.
@@ -2772,14 +2777,38 @@ AtPrepare_Locks(void)
if (locallock->nLocks <= 0)
continue;
/* Scan to verify there are no session locks */
/* Scan to see whether we hold it at session or transaction level */
haveSessionLock = haveXactLock = false;
for (i = locallock->numLockOwners - 1; i >= 0; i--)
{
/* elog not ereport since this should not happen */
if (lockOwners[i].owner == NULL)
elog(ERROR, "cannot PREPARE when session locks exist");
haveSessionLock = true;
else
haveXactLock = true;
}
/* Ignore it if we have only session lock */
if (!haveXactLock)
continue;
/*
* If we have both session- and transaction-level locks, fail. This
* should never happen with regular locks, since we only take those at
* session level in some special operations like VACUUM. It's
* possible to hit this with advisory locks, though.
*
* It would be nice if we could keep the session hold and give away
* the transactional hold to the prepared xact. However, that would
* require two PROCLOCK objects, and we cannot be sure that another
* PROCLOCK will be available when it comes time for PostPrepare_Locks
* to do the deed. So for now, we error out while we can still do so
* safely.
*/
if (haveSessionLock)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
/*
* If the local lock was taken via the fast-path, we need to move it
* to the primary lock table, or just get a pointer to the existing
@@ -2792,7 +2821,7 @@ AtPrepare_Locks(void)
}
/*
* Arrange not to release any strong lock count held by this lock
* Arrange to not release any strong lock count held by this lock
* entry. We must retain the count until the prepared transaction
* is committed or rolled back.
*/
@@ -2852,6 +2881,11 @@ PostPrepare_Locks(TransactionId xid)
while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
{
LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
bool haveSessionLock;
bool haveXactLock;
int i;
if (locallock->proclock == NULL || locallock->lock == NULL)
{
/*
@@ -2863,15 +2897,29 @@ PostPrepare_Locks(TransactionId xid)
continue;
}
/* Ignore nontransactional locks */
if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
continue;
/* Ignore VXID locks */
if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
continue;
/* We already checked there are no session locks */
/* Scan to see whether we hold it at session or transaction level */
haveSessionLock = haveXactLock = false;
for (i = locallock->numLockOwners - 1; i >= 0; i--)
{
if (lockOwners[i].owner == NULL)
haveSessionLock = true;
else
haveXactLock = true;
}
/* Ignore it if we have only session lock */
if (!haveXactLock)
continue;
/* This can't happen, because we already checked it */
if (haveSessionLock)
ereport(PANIC,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
/* Mark the proclock to show we need to release this lockmode */
if (locallock->nLocks > 0)
@@ -2912,10 +2960,6 @@ PostPrepare_Locks(TransactionId xid)
lock = proclock->tag.myLock;
/* Ignore nontransactional locks */
if (!LockMethods[LOCK_LOCKMETHOD(*lock)]->transactional)
goto next_item;
/* Ignore VXID locks */
if (lock->tag.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
goto next_item;
@@ -2927,10 +2971,11 @@ PostPrepare_Locks(TransactionId xid)
Assert(lock->nGranted <= lock->nRequested);
Assert((proclock->holdMask & ~lock->grantMask) == 0);
/*
* Since there were no session locks, we should be releasing all
* locks
*/
/* Ignore it if nothing to release (must be a session lock) */
if (proclock->releaseMask == 0)
goto next_item;
/* Else we should be releasing all locks */
if (proclock->releaseMask != proclock->holdMask)
elog(PANIC, "we seem to have dropped a bit somewhere");