diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index e35ac9910ca..f50c6883068 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -664,7 +664,8 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid) SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid); - LockAcquireExtended(&locktag, AccessExclusiveLock, true, false, false); + (void) LockAcquireExtended(&locktag, AccessExclusiveLock, true, false, + false, NULL); } static void diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c index 9a2f94beaf7..0b9f105b531 100644 --- a/src/backend/storage/lmgr/lmgr.c +++ b/src/backend/storage/lmgr/lmgr.c @@ -105,11 +105,12 @@ void LockRelationOid(Oid relid, LOCKMODE lockmode) { LOCKTAG tag; + LOCALLOCK *locallock; LockAcquireResult res; SetLocktagRelationOid(&tag, relid); - res = LockAcquire(&tag, lockmode, false, false); + res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock); /* * Now that we have the lock, check for invalidation messages, so that we @@ -120,9 +121,18 @@ LockRelationOid(Oid relid, LOCKMODE lockmode) * relcache entry in an undesirable way. (In the case where our own xact * modifies the rel, the relcache update happens via * CommandCounterIncrement, not here.) + * + * However, in corner cases where code acts on tables (usually catalogs) + * recursively, we might get here while still processing invalidation + * messages in some outer execution of this function or a sibling. The + * "cleared" status of the lock tells us whether we really are done + * absorbing relevant inval messages. */ - if (res != LOCKACQUIRE_ALREADY_HELD) + if (res != LOCKACQUIRE_ALREADY_CLEAR) + { AcceptInvalidationMessages(); + MarkLockClear(locallock); + } } /* @@ -138,11 +148,12 @@ bool ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode) { LOCKTAG tag; + LOCALLOCK *locallock; LockAcquireResult res; SetLocktagRelationOid(&tag, relid); - res = LockAcquire(&tag, lockmode, false, true); + res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock); if (res == LOCKACQUIRE_NOT_AVAIL) return false; @@ -151,8 +162,11 @@ ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode) * Now that we have the lock, check for invalidation messages; see notes * in LockRelationOid. */ - if (res != LOCKACQUIRE_ALREADY_HELD) + if (res != LOCKACQUIRE_ALREADY_CLEAR) + { AcceptInvalidationMessages(); + MarkLockClear(locallock); + } return true; } @@ -199,20 +213,24 @@ void LockRelation(Relation relation, LOCKMODE lockmode) { LOCKTAG tag; + LOCALLOCK *locallock; LockAcquireResult res; SET_LOCKTAG_RELATION(tag, relation->rd_lockInfo.lockRelId.dbId, relation->rd_lockInfo.lockRelId.relId); - res = LockAcquire(&tag, lockmode, false, false); + res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock); /* * Now that we have the lock, check for invalidation messages; see notes * in LockRelationOid. */ - if (res != LOCKACQUIRE_ALREADY_HELD) + if (res != LOCKACQUIRE_ALREADY_CLEAR) + { AcceptInvalidationMessages(); + MarkLockClear(locallock); + } } /* @@ -226,13 +244,14 @@ bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode) { LOCKTAG tag; + LOCALLOCK *locallock; LockAcquireResult res; SET_LOCKTAG_RELATION(tag, relation->rd_lockInfo.lockRelId.dbId, relation->rd_lockInfo.lockRelId.relId); - res = LockAcquire(&tag, lockmode, false, true); + res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock); if (res == LOCKACQUIRE_NOT_AVAIL) return false; @@ -241,8 +260,11 @@ ConditionalLockRelation(Relation relation, LOCKMODE lockmode) * Now that we have the lock, check for invalidation messages; see notes * in LockRelationOid. */ - if (res != LOCKACQUIRE_ALREADY_HELD) + if (res != LOCKACQUIRE_ALREADY_CLEAR) + { AcceptInvalidationMessages(); + MarkLockClear(locallock); + } return true; } diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 4f0ad27e6d1..f7e947af936 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -669,6 +669,7 @@ LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock) * LOCKACQUIRE_NOT_AVAIL lock not available, and dontWait=true * LOCKACQUIRE_OK lock successfully acquired * LOCKACQUIRE_ALREADY_HELD incremented count for lock already held + * LOCKACQUIRE_ALREADY_CLEAR incremented count for lock already clear * * In the normal case where dontWait=false and the caller doesn't need to * distinguish a freshly acquired lock from one already taken earlier in @@ -685,7 +686,8 @@ LockAcquire(const LOCKTAG *locktag, bool sessionLock, bool dontWait) { - return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true); + return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, + true, NULL); } /* @@ -696,13 +698,17 @@ LockAcquire(const LOCKTAG *locktag, * caller to note that the lock table is full and then begin taking * extreme action to reduce the number of other lock holders before * retrying the action. + * + * If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK + * table entry if a lock is successfully acquired, or NULL if not. */ LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock, bool dontWait, - bool reportMemoryError) + bool reportMemoryError, + LOCALLOCK **locallockp) { LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid; LockMethod lockMethodTable; @@ -769,6 +775,7 @@ LockAcquireExtended(const LOCKTAG *locktag, locallock->numLockOwners = 0; locallock->maxLockOwners = 8; locallock->holdsStrongLockCount = FALSE; + locallock->lockCleared = false; locallock->lockOwners = NULL; /* in case next line fails */ locallock->lockOwners = (LOCALLOCKOWNER *) MemoryContextAlloc(TopMemoryContext, @@ -789,13 +796,22 @@ LockAcquireExtended(const LOCKTAG *locktag, } hashcode = locallock->hashcode; + if (locallockp) + *locallockp = locallock; + /* * If we already hold the lock, we can just increase the count locally. + * + * If lockCleared is already set, caller need not worry about absorbing + * sinval messages related to the lock's object. */ if (locallock->nLocks > 0) { GrantLockLocal(locallock, owner); - return LOCKACQUIRE_ALREADY_HELD; + if (locallock->lockCleared) + return LOCKACQUIRE_ALREADY_CLEAR; + else + return LOCKACQUIRE_ALREADY_HELD; } /* @@ -877,6 +893,10 @@ LockAcquireExtended(const LOCKTAG *locktag, hashcode)) { AbortStrongLockAcquire(); + if (locallock->nLocks == 0) + RemoveLocalLock(locallock); + if (locallockp) + *locallockp = NULL; if (reportMemoryError) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -911,6 +931,10 @@ LockAcquireExtended(const LOCKTAG *locktag, { AbortStrongLockAcquire(); LWLockRelease(partitionLock); + if (locallock->nLocks == 0) + RemoveLocalLock(locallock); + if (locallockp) + *locallockp = NULL; if (reportMemoryError) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -976,6 +1000,8 @@ LockAcquireExtended(const LOCKTAG *locktag, LWLockRelease(partitionLock); if (locallock->nLocks == 0) RemoveLocalLock(locallock); + if (locallockp) + *locallockp = NULL; return LOCKACQUIRE_NOT_AVAIL; } @@ -1645,6 +1671,20 @@ GrantAwaitedLock(void) GrantLockLocal(awaitedLock, awaitedOwner); } +/* + * MarkLockClear -- mark an acquired lock as "clear" + * + * This means that we know we have absorbed all sinval messages that other + * sessions generated before we acquired this lock, and so we can confidently + * assume we know about any catalog changes protected by this lock. + */ +void +MarkLockClear(LOCALLOCK *locallock) +{ + Assert(locallock->nLocks > 0); + locallock->lockCleared = true; +} + /* * WaitOnLock -- wait to acquire a lock * @@ -1912,6 +1952,15 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock) if (locallock->nLocks > 0) return TRUE; + /* + * At this point we can no longer suppose we are clear of invalidation + * messages related to this lock. Although we'll delete the LOCALLOCK + * object before any intentional return from this routine, it seems worth + * the trouble to explicitly reset lockCleared right now, just in case + * some error prevents us from deleting the LOCALLOCK. + */ + locallock->lockCleared = false; + /* Attempt fast release of any lock eligible for the fast path. */ if (EligibleForRelationFastPath(locktag, lockmode) && FastPathLocalUseCount > 0) diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index efa75ecca9a..cc9d8f7136a 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -412,6 +412,7 @@ typedef struct LOCALLOCK int numLockOwners; /* # of relevant ResourceOwners */ int maxLockOwners; /* allocated size of array */ bool holdsStrongLockCount; /* bumped FastPathStrongRelationLocks */ + bool lockCleared; /* we read all sinval msgs for lock */ LOCALLOCKOWNER *lockOwners; /* dynamically resizable array */ } LOCALLOCK; @@ -473,7 +474,8 @@ typedef enum { LOCKACQUIRE_NOT_AVAIL, /* lock not available, and dontWait=true */ LOCKACQUIRE_OK, /* lock successfully acquired */ - LOCKACQUIRE_ALREADY_HELD /* incremented count for lock already held */ + LOCKACQUIRE_ALREADY_HELD, /* incremented count for lock already held */ + LOCKACQUIRE_ALREADY_CLEAR /* incremented count for lock already clear */ } LockAcquireResult; /* Deadlock states identified by DeadLockCheck() */ @@ -529,8 +531,10 @@ extern LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock, bool dontWait, - bool report_memory_error); + bool reportMemoryError, + LOCALLOCK **locallockp); extern void AbortStrongLockAcquire(void); +extern void MarkLockClear(LOCALLOCK *locallock); extern bool LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock); extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);