1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-06 07:49:08 +03:00

Clean up lockmanager data structures some more, in preparation for planned

rewrite of deadlock checking.  Lock holder objects are now reachable from
the associated LOCK as well as from the owning PROC.  This makes it
practical to find all the processes holding a lock, as well as all those
waiting on the lock.  Also, clean up some of the grottier aspects of the
SHMQueue API, and cause the waitProcs list to be stored in the intuitive
direction instead of the nonintuitive one.  (Bet you didn't know that
the code followed the 'prev' link to get to the next waiting process,
instead of the 'next' link.  It doesn't do that anymore.)
This commit is contained in:
Tom Lane
2001-01-22 22:30:06 +00:00
parent 56f5f2bf82
commit e84c429062
7 changed files with 503 additions and 317 deletions

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.78 2001/01/16 06:11:34 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.79 2001/01/22 22:30:06 tgl Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
@@ -127,10 +127,10 @@ HOLDER_PRINT(const char * where, const HOLDER * holderP)
|| (Trace_lock_table && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table))
)
elog(DEBUG,
"%s: holder(%lx) lock(%lx) tbl(%d) pid(%d) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
"%s: holder(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
where, MAKE_OFFSET(holderP), holderP->tag.lock,
HOLDER_LOCKMETHOD(*(holderP)),
holderP->tag.pid, holderP->tag.xid,
holderP->tag.proc, holderP->tag.xid,
holderP->holding[1], holderP->holding[2], holderP->holding[3],
holderP->holding[4], holderP->holding[5], holderP->holding[6],
holderP->holding[7], holderP->nHolding);
@@ -455,8 +455,7 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
* tag.objId block id lock id2
* or xact id
* tag.offnum 0 lock id1
* xid.pid backend pid backend pid
* xid.xid xid or 0 0
* holder.xid xid or 0 0
* persistence transaction user or backend
* or backend
*
@@ -526,11 +525,12 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
{
lock->grantMask = 0;
lock->waitMask = 0;
SHMQueueInit(&(lock->lockHolders));
ProcQueueInit(&(lock->waitProcs));
lock->nRequested = 0;
lock->nGranted = 0;
MemSet((char *) lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
MemSet((char *) lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
ProcQueueInit(&(lock->waitProcs));
LOCK_PRINT("LockAcquire: new", lock, lockmode);
}
else
@@ -547,7 +547,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
*/
MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */
holdertag.lock = MAKE_OFFSET(lock);
holdertag.pid = MyProcPid;
holdertag.proc = MAKE_OFFSET(MyProc);
TransactionIdStore(xid, &holdertag.xid);
/*
@@ -570,7 +570,9 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
{
holder->nHolding = 0;
MemSet((char *) holder->holding, 0, sizeof(int) * MAX_LOCKMODES);
ProcAddLock(&holder->queue);
/* Add holder to appropriate lists */
SHMQueueInsertBefore(&lock->lockHolders, &holder->lockLink);
SHMQueueInsertBefore(&MyProc->procHolders, &holder->procLink);
HOLDER_PRINT("LockAcquire: new", holder);
}
else
@@ -693,7 +695,8 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
{
if (holder->nHolding == 0)
{
SHMQueueDelete(&holder->queue);
SHMQueueDelete(&holder->lockLink);
SHMQueueDelete(&holder->procLink);
holder = (HOLDER *) hash_search(holderTable,
(Pointer) holder,
HASH_REMOVE, &found);
@@ -862,33 +865,17 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
static void
LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding)
{
HOLDER *holder = NULL;
HOLDER *nextHolder = NULL;
SHM_QUEUE *holderQueue = &(proc->holderQueue);
SHMEM_OFFSET end = MAKE_OFFSET(holderQueue);
SHM_QUEUE *procHolders = &(proc->procHolders);
HOLDER *holder;
int i;
MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
if (SHMQueueEmpty(holderQueue))
return;
holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
offsetof(HOLDER, procLink));
SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
do
while (holder)
{
/* ---------------------------
* XXX Here we assume the shared memory queue is circular and
* that we know its internal structure. Should have some sort of
* macros to allow one to walk it. mer 20 July 1991
* ---------------------------
*/
if (holder->queue.next == end)
nextHolder = NULL;
else
SHMQueueFirst(&holder->queue,
(Pointer *) &nextHolder, &nextHolder->queue);
if (lockOffset == holder->tag.lock)
{
for (i = 1; i < MAX_LOCKMODES; i++)
@@ -897,8 +884,9 @@ LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding)
}
}
holder = nextHolder;
} while (holder);
holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
offsetof(HOLDER, procLink));
}
}
/*
@@ -1080,7 +1068,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
*/
MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */
holdertag.lock = MAKE_OFFSET(lock);
holdertag.pid = MyProcPid;
holdertag.proc = MAKE_OFFSET(MyProc);
TransactionIdStore(xid, &holdertag.xid);
holderTable = lockMethodTable->holderHash;
@@ -1160,7 +1148,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
Assert(lock->nGranted <= lock->nRequested);
if (!lock->nRequested)
if (lock->nRequested == 0)
{
/* ------------------
* if there's no one waiting in the queue,
@@ -1189,15 +1177,11 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
* If this was my last hold on this lock, delete my entry in the holder
* table.
*/
if (!holder->nHolding)
if (holder->nHolding == 0)
{
if (holder->queue.prev == INVALID_OFFSET)
elog(NOTICE, "LockRelease: holder.prev == INVALID_OFFSET");
if (holder->queue.next == INVALID_OFFSET)
elog(NOTICE, "LockRelease: holder.next == INVALID_OFFSET");
if (holder->queue.next != INVALID_OFFSET)
SHMQueueDelete(&holder->queue);
HOLDER_PRINT("LockRelease: deleting", holder);
SHMQueueDelete(&holder->lockLink);
SHMQueueDelete(&holder->procLink);
holder = (HOLDER *) hash_search(holderTable, (Pointer) &holder,
HASH_REMOVE_SAVED, &found);
if (!holder || !found)
@@ -1220,7 +1204,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
}
/*
* LockReleaseAll -- Release all locks in a process's lock queue.
* LockReleaseAll -- Release all locks in a process's lock list.
*
* Well, not really *all* locks.
*
@@ -1234,22 +1218,20 @@ bool
LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
bool allxids, TransactionId xid)
{
HOLDER *holder = NULL;
HOLDER *nextHolder = NULL;
SHM_QUEUE *holderQueue = &(proc->holderQueue);
SHMEM_OFFSET end = MAKE_OFFSET(holderQueue);
SHM_QUEUE *procHolders = &(proc->procHolders);
HOLDER *holder;
HOLDER *nextHolder;
SPINLOCK masterLock;
LOCKMETHODTABLE *lockMethodTable;
int i,
numLockModes;
LOCK *lock;
bool found;
int nleft;
#ifdef LOCK_DEBUG
if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
elog(DEBUG, "LockReleaseAll: lockmethod=%d, pid=%d",
lockmethod, MyProcPid);
lockmethod, proc->pid);
#endif
Assert(lockmethod < NumLockMethods);
@@ -1260,51 +1242,33 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
return FALSE;
}
if (SHMQueueEmpty(holderQueue))
return TRUE;
numLockModes = lockMethodTable->ctl->numLockModes;
masterLock = lockMethodTable->ctl->masterLock;
SpinAcquire(masterLock);
SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
offsetof(HOLDER, procLink));
nleft = 0;
do
while (holder)
{
bool wakeupNeeded = false;
/* ---------------------------
* XXX Here we assume the shared memory queue is circular and
* that we know its internal structure. Should have some sort of
* macros to allow one to walk it. mer 20 July 1991
* ---------------------------
*/
if (holder->queue.next == end)
nextHolder = NULL;
else
SHMQueueFirst(&holder->queue,
(Pointer *) &nextHolder, &nextHolder->queue);
/* Get link first, since we may unlink/delete this holder */
nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
offsetof(HOLDER, procLink));
Assert(holder->tag.pid == proc->pid);
Assert(holder->tag.proc == MAKE_OFFSET(proc));
lock = (LOCK *) MAKE_PTR(holder->tag.lock);
/* Ignore items that are not of the lockmethod to be removed */
if (LOCK_LOCKMETHOD(*lock) != lockmethod)
{
nleft++;
goto next_item;
}
/* If not allxids, ignore items that are of the wrong xid */
if (!allxids && xid != holder->tag.xid)
{
nleft++;
goto next_item;
}
HOLDER_PRINT("LockReleaseAll", holder);
LOCK_PRINT("LockReleaseAll", lock, 0);
@@ -1364,9 +1328,10 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
HOLDER_PRINT("LockReleaseAll: deleting", holder);
/*
* Remove the holder entry from the process' lock queue
* Remove the holder entry from the linked lists
*/
SHMQueueDelete(&holder->queue);
SHMQueueDelete(&holder->lockLink);
SHMQueueDelete(&holder->procLink);
/*
* remove the holder entry from the hashtable
@@ -1406,18 +1371,6 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
next_item:
holder = nextHolder;
} while (holder);
/*
* Reinitialize the queue only if nothing has been left in.
*/
if (nleft == 0)
{
#ifdef LOCK_DEBUG
if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
elog(DEBUG, "LockReleaseAll: reinitializing holderQueue");
#endif
SHMQueueInit(holderQueue);
}
SpinRelease(masterLock);
@@ -1476,12 +1429,11 @@ LockShmemSize(int maxBackends)
bool
DeadLockCheck(PROC *thisProc, LOCK *findlock)
{
HOLDER *holder = NULL;
HOLDER *nextHolder = NULL;
PROC *waitProc;
PROC_QUEUE *waitQueue;
SHM_QUEUE *holderQueue = &(thisProc->holderQueue);
SHMEM_OFFSET end = MAKE_OFFSET(holderQueue);
SHM_QUEUE *procHolders = &(thisProc->procHolders);
HOLDER *holder;
HOLDER *nextHolder;
LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
LOCK *lock;
int i,
@@ -1501,26 +1453,16 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
/*
* Scan over all the locks held/awaited by thisProc.
*/
if (SHMQueueEmpty(holderQueue))
return false;
holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
offsetof(HOLDER, procLink));
SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
do
while (holder)
{
/* ---------------------------
* XXX Here we assume the shared memory queue is circular and
* that we know its internal structure. Should have some sort of
* macros to allow one to walk it. mer 20 July 1991
* ---------------------------
*/
if (holder->queue.next == end)
nextHolder = NULL;
else
SHMQueueFirst(&holder->queue,
(Pointer *) &nextHolder, &nextHolder->queue);
/* Get link first, since we may unlink/delete this holder */
nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
offsetof(HOLDER, procLink));
Assert(holder->tag.pid == thisProc->pid);
Assert(holder->tag.proc == MAKE_OFFSET(thisProc));
lock = (LOCK *) MAKE_PTR(holder->tag.lock);
@@ -1532,7 +1474,7 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
LOCK_PRINT("DeadLockCheck", lock, 0);
/*
* waitLock is always in holderQueue of waiting proc, if !first_run
* waitLock is always in procHolders of waiting proc, if !first_run
* then upper caller will handle waitProcs queue of waitLock.
*/
if (thisProc->waitLock == lock && !first_run)
@@ -1555,13 +1497,13 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
}
/*
* Else - get the next lock from thisProc's holderQueue
* Else - get the next lock from thisProc's procHolders
*/
goto nxtl;
}
waitQueue = &(lock->waitProcs);
waitProc = (PROC *) MAKE_PTR(waitQueue->links.prev);
waitProc = (PROC *) MAKE_PTR(waitQueue->links.next);
/*
* Inner loop scans over all processes waiting for this lock.
@@ -1589,7 +1531,7 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
/* and he blocked by me -> deadlock */
if (lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks)
return true;
/* we shouldn't look at holderQueue of our blockers */
/* we shouldn't look at procHolders of our blockers */
goto nextWaitProc;
}
@@ -1600,7 +1542,7 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
* implicitly). Note that we don't do like test if
* !first_run (when thisProc is holder and non-waiter on
* lock) and so we call DeadLockCheck below for every
* waitProc in thisProc->holderQueue, even for waitProc-s
* waitProc in thisProc->procHolders, even for waitProc-s
* un-blocked by thisProc. Should we? This could save us
* some time...
*/
@@ -1618,7 +1560,7 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
goto nextWaitProc;
}
/* Recursively check this process's holderQueue. */
/* Recursively check this process's procHolders. */
Assert(nprocs < MAXBACKENDS);
checked_procs[nprocs++] = waitProc;
@@ -1699,12 +1641,12 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
}
nextWaitProc:
waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
waitProc = (PROC *) MAKE_PTR(waitProc->links.next);
}
nxtl:
holder = nextHolder;
} while (holder);
}
/* if we got here, no deadlock */
return false;
@@ -1712,18 +1654,17 @@ nxtl:
#ifdef LOCK_DEBUG
/*
* Dump all locks in the proc->holderQueue. Must have already acquired
* the masterLock.
* Dump all locks in the proc->procHolders list.
*
* Must have already acquired the masterLock.
*/
void
DumpLocks(void)
{
SHMEM_OFFSET location;
PROC *proc;
SHM_QUEUE *holderQueue;
HOLDER *holder = NULL;
HOLDER *nextHolder = NULL;
SHMEM_OFFSET end;
SHM_QUEUE *procHolders;
HOLDER *holder;
LOCK *lock;
int lockmethod = DEFAULT_LOCKMETHOD;
LOCKMETHODTABLE *lockMethodTable;
@@ -1734,8 +1675,7 @@ DumpLocks(void)
proc = (PROC *) MAKE_PTR(location);
if (proc != MyProc)
return;
holderQueue = &proc->holderQueue;
end = MAKE_OFFSET(holderQueue);
procHolders = &proc->procHolders;
Assert(lockmethod < NumLockMethods);
lockMethodTable = LockMethodTable[lockmethod];
@@ -1745,34 +1685,21 @@ DumpLocks(void)
if (proc->waitLock)
LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
if (SHMQueueEmpty(holderQueue))
return;
holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
offsetof(HOLDER, procLink));
SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
do
while (holder)
{
/* ---------------------------
* XXX Here we assume the shared memory queue is circular and
* that we know its internal structure. Should have some sort of
* macros to allow one to walk it. mer 20 July 1991
* ---------------------------
*/
if (holder->queue.next == end)
nextHolder = NULL;
else
SHMQueueFirst(&holder->queue,
(Pointer *) &nextHolder, &nextHolder->queue);
Assert(holder->tag.pid == proc->pid);
Assert(holder->tag.proc == MAKE_OFFSET(proc));
lock = (LOCK *) MAKE_PTR(holder->tag.lock);
HOLDER_PRINT("DumpLocks", holder);
LOCK_PRINT("DumpLocks", lock, 0);
holder = nextHolder;
} while (holder);
holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
offsetof(HOLDER, procLink));
}
}
/*