1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-07 00:36:50 +03:00

Re-implement deadlock detection and resolution, per design notes posted

to pghackers on 18-Jan-01.
This commit is contained in:
Tom Lane
2001-01-25 03:31:16 +00:00
parent 40203e4f3e
commit a05eae029a
7 changed files with 1017 additions and 574 deletions

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.96 2001/01/24 19:43:08 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.97 2001/01/25 03:31:16 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -18,7 +18,7 @@
*
*
* Interface (a):
* ProcSleep(), ProcWakeup(), ProcWakeupNext(),
* ProcSleep(), ProcWakeup(),
* ProcQueueAlloc() -- create a shm queue for sleeping processes
* ProcQueueInit() -- create a queue without allocing memory
*
@ -47,8 +47,6 @@
* shared among backends (we keep a few sets of semaphores around).
* This is so that we can support more backends. (system-wide semaphore
* sets run out pretty fast.) -ay 4/95
*
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.96 2001/01/24 19:43:08 momjian Exp $
*/
#include "postgres.h"
@ -257,7 +255,7 @@ InitProcess(void)
}
SHMQueueElemInit(&(MyProc->links));
MyProc->errType = NO_ERROR;
MyProc->errType = STATUS_OK;
MyProc->pid = MyProcPid;
MyProc->databaseId = MyDatabaseId;
MyProc->xid = InvalidTransactionId;
@ -284,7 +282,16 @@ InitProcess(void)
(location != MAKE_OFFSET(MyProc)))
elog(STOP, "InitProcess: ShmemPID table broken");
/*
* Arrange to clean up at backend exit.
*/
on_shmem_exit(ProcKill, 0);
/*
* Now that we have a PROC, we could try to acquire locks,
* so initialize the deadlock checker.
*/
InitDeadLockChecking();
}
/*
@ -304,50 +311,6 @@ ZeroProcSemaphore(PROC *proc)
}
}
/*
* Remove a proc from the wait-queue it is on
* (caller must know it is on one).
* Locktable lock must be held by caller.
*
* NB: this does not remove the process' holder object, nor the lock object,
* even though their counts might now have gone to zero. That will happen
* during a subsequent LockReleaseAll call, which we expect will happen
* during transaction cleanup. (Removal of a proc from its wait queue by
* this routine can only happen if we are aborting the transaction.)
*/
static void
RemoveFromWaitQueue(PROC *proc)
{
LOCK *waitLock = proc->waitLock;
LOCKMODE lockmode = proc->waitLockMode;
/* Make sure proc is waiting */
Assert(proc->links.next != INVALID_OFFSET);
Assert(waitLock);
Assert(waitLock->waitProcs.size > 0);
/* Remove proc from lock's wait queue */
SHMQueueDelete(&(proc->links));
waitLock->waitProcs.size--;
/* Undo increments of request counts by waiting process */
Assert(waitLock->nRequested > 0);
Assert(waitLock->nRequested > proc->waitLock->nGranted);
waitLock->nRequested--;
Assert(waitLock->requested[lockmode] > 0);
waitLock->requested[lockmode]--;
/* don't forget to clear waitMask bit if appropriate */
if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
waitLock->waitMask &= ~(1 << lockmode);
/* Clean up the proc's own state */
proc->waitLock = NULL;
proc->waitHolder = NULL;
/* See if any other waiters for the lock can be woken up now */
ProcLockWakeup(LOCK_LOCKMETHOD(*waitLock), waitLock);
}
/*
* Cancel any pending wait for lock, when aborting a transaction.
*
@ -529,34 +492,34 @@ ProcQueueInit(PROC_QUEUE *queue)
/*
* ProcSleep -- put a process to sleep
*
* P() on the semaphore should put us to sleep. The process
* semaphore is normally zero, so when we try to acquire it, we sleep.
* Caller must have set MyProc->heldLocks to reflect locks already held
* on the lockable object by this process (under all XIDs).
*
* Locktable's spinlock must be held at entry, and will be held
* at exit.
*
* Result is NO_ERROR if we acquired the lock, STATUS_ERROR if not (deadlock).
* Result: STATUS_OK if we acquired the lock, STATUS_ERROR if not (deadlock).
*
* ASSUME: that no one will fiddle with the queue until after
* we release the spin lock.
*
* NOTES: The process queue is now a priority queue for locking.
*
* P() on the semaphore should put us to sleep. The process
* semaphore is normally zero, so when we try to acquire it, we sleep.
*/
int
ProcSleep(LOCKMETHODCTL *lockctl,
ProcSleep(LOCKMETHODTABLE *lockMethodTable,
LOCKMODE lockmode,
LOCK *lock,
HOLDER *holder)
{
PROC_QUEUE *waitQueue = &(lock->waitProcs);
LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
SPINLOCK spinlock = lockctl->masterLock;
int myMask = (1 << lockmode);
int waitMask = lock->waitMask;
PROC_QUEUE *waitQueue = &(lock->waitProcs);
int myHeldLocks = MyProc->heldLocks;
PROC *proc;
int i;
int aheadGranted[MAX_LOCKMODES];
bool selfConflict = (lockctl->conflictTab[lockmode] & myMask),
prevSame = false;
#ifndef __BEOS__
struct itimerval timeval,
dummy;
@ -564,64 +527,63 @@ ProcSleep(LOCKMETHODCTL *lockctl,
bigtime_t time_interval;
#endif
proc = (PROC *) MAKE_PTR(waitQueue->links.next);
/* if we don't conflict with any waiter - be first in queue */
if (!(lockctl->conflictTab[lockmode] & waitMask))
goto ins;
/* otherwise, determine where we should go into the queue */
for (i = 1; i < MAX_LOCKMODES; i++)
aheadGranted[i] = lock->granted[i];
(aheadGranted[lockmode])++;
for (i = 0; i < waitQueue->size; i++)
/* ----------------------
* Determine where to add myself in the wait queue.
*
* Normally I should go at the end of the queue. However, if I already
* hold locks that conflict with the request of any previous waiter,
* put myself in the queue just in front of the first such waiter.
* This is not a necessary step, since deadlock detection would move
* me to before that waiter anyway; but it's relatively cheap to detect
* such a conflict immediately, and avoid delaying till deadlock timeout.
*
* Special case: if I find I should go in front of the first waiter,
* and I do not conflict with already-held locks, then just grant myself
* the requested lock immediately.
* ----------------------
*/
if (myHeldLocks != 0)
{
LOCKMODE procWaitMode = proc->waitLockMode;
/* must I wait for him ? */
if (lockctl->conflictTab[lockmode] & proc->heldLocks)
proc = (PROC *) MAKE_PTR(waitQueue->links.next);
for (i = 0; i < waitQueue->size; i++)
{
/* is he waiting for me ? */
if (lockctl->conflictTab[procWaitMode] & MyProc->heldLocks)
/* Must he wait for me? */
if (lockctl->conflictTab[proc->waitLockMode] & myHeldLocks)
{
/* Yes, report deadlock failure */
MyProc->errType = STATUS_ERROR;
return STATUS_ERROR;
}
/* I must go after him in queue - so continue loop */
}
/* if he waits for me, go before him in queue */
else if (lockctl->conflictTab[procWaitMode] & MyProc->heldLocks)
break;
/* if conflicting locks requested */
else if (lockctl->conflictTab[procWaitMode] & myMask)
{
/*
* If I request non self-conflicting lock and there are others
* requesting the same lock just before this guy - stop here.
*/
if (!selfConflict && prevSame)
/* Must I wait for him ? */
if (lockctl->conflictTab[lockmode] & proc->heldLocks)
{
/* Yes, can report deadlock failure immediately */
MyProc->errType = STATUS_ERROR;
return STATUS_ERROR;
}
if (i == 0)
{
/* I must go before first waiter. Check special case. */
if (LockCheckConflicts(lockMethodTable,
lockmode,
lock,
holder,
MyProc,
NULL) == STATUS_OK)
{
/* Skip the wait and just grant myself the lock. */
GrantLock(lock, holder, lockmode);
return STATUS_OK;
}
}
/* Break out of loop to put myself before him */
break;
}
proc = (PROC *) MAKE_PTR(proc->links.next);
}
/*
* Last attempt to not move any further to the back of the queue:
* if we don't conflict with remaining waiters, stop here.
*/
else if (!(lockctl->conflictTab[lockmode] & waitMask))
break;
/* Move past this guy, and update state accordingly */
prevSame = (procWaitMode == lockmode);
(aheadGranted[procWaitMode])++;
if (aheadGranted[procWaitMode] == lock->requested[procWaitMode])
waitMask &= ~(1 << procWaitMode);
proc = (PROC *) MAKE_PTR(proc->links.next);
}
else
{
/* I hold no locks, so I can't push in front of anyone. */
proc = (PROC *) &(waitQueue->links);
}
ins:;
/* -------------------
* Insert self into queue, ahead of the given proc (or at tail of queue).
* -------------------
@ -629,15 +591,14 @@ ins:;
SHMQueueInsertBefore(&(proc->links), &(MyProc->links));
waitQueue->size++;
lock->waitMask |= myMask;
lock->waitMask |= (1 << lockmode);
/* Set up wait information in PROC object, too */
MyProc->waitLock = lock;
MyProc->waitHolder = holder;
MyProc->waitLockMode = lockmode;
/* We assume the caller set up MyProc->heldLocks */
MyProc->errType = NO_ERROR; /* initialize result for success */
MyProc->errType = STATUS_OK; /* initialize result for success */
/* mark that we are waiting for a lock */
waitingForLock = true;
@ -662,7 +623,7 @@ ins:;
* By delaying the check until we've waited for a bit, we can avoid
* running the rather expensive deadlock-check code in most cases.
*
* Need to zero out struct to set the interval and the micro seconds fields
* Need to zero out struct to set the interval and the microseconds fields
* to 0.
* --------------
*/
@ -768,89 +729,59 @@ ProcWakeup(PROC *proc, int errType)
/*
* ProcLockWakeup -- routine for waking up processes when a lock is
* released.
* released (or a prior waiter is aborted). Scan all waiters
* for lock, waken any that are no longer blocked.
*/
int
ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock)
void
ProcLockWakeup(LOCKMETHODTABLE *lockMethodTable, LOCK *lock)
{
PROC_QUEUE *queue = &(lock->waitProcs);
LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
PROC_QUEUE *waitQueue = &(lock->waitProcs);
int queue_size = waitQueue->size;
PROC *proc;
int awoken = 0;
LOCKMODE last_lockmode = 0;
int queue_size = queue->size;
int conflictMask = 0;
Assert(queue_size >= 0);
if (!queue_size)
return STATUS_NOT_FOUND;
if (queue_size == 0)
return;
proc = (PROC *) MAKE_PTR(queue->links.next);
proc = (PROC *) MAKE_PTR(waitQueue->links.next);
while (queue_size-- > 0)
{
if (proc->waitLockMode == last_lockmode)
LOCKMODE lockmode = proc->waitLockMode;
/*
* Waken if (a) doesn't conflict with requests of earlier waiters,
* and (b) doesn't conflict with already-held locks.
*/
if (((1 << lockmode) & conflictMask) == 0 &&
LockCheckConflicts(lockMethodTable,
lockmode,
lock,
proc->waitHolder,
proc,
NULL) == STATUS_OK)
{
/* OK to waken */
GrantLock(lock, proc->waitHolder, lockmode);
proc = ProcWakeup(proc, STATUS_OK);
/*
* This proc will conflict as the previous one did, don't even
* try.
* ProcWakeup removes proc from the lock's waiting process queue
* and returns the next proc in chain; don't use proc's next-link,
* because it's been cleared.
*/
goto nextProc;
}
/*
* Does this proc conflict with locks held by others ?
*/
if (LockResolveConflicts(lockmethod,
proc->waitLockMode,
lock,
proc->waitHolder,
proc,
NULL) != STATUS_OK)
else
{
/* Yes. Quit if we already awoke at least one process. */
if (awoken != 0)
break;
/* Otherwise, see if any later waiters can be awoken. */
last_lockmode = proc->waitLockMode;
goto nextProc;
/* Cannot wake this guy. Add his request to conflict mask. */
conflictMask |= lockctl->conflictTab[lockmode];
proc = (PROC *) MAKE_PTR(proc->links.next);
}
/*
* OK to wake up this sleeping process.
*/
GrantLock(lock, proc->waitHolder, proc->waitLockMode);
proc = ProcWakeup(proc, NO_ERROR);
awoken++;
/*
* ProcWakeup removes proc from the lock's waiting process queue
* and returns the next proc in chain; don't use proc's next-link,
* because it's been cleared.
*/
continue;
nextProc:
proc = (PROC *) MAKE_PTR(proc->links.next);
}
Assert(queue->size >= 0);
if (awoken)
return STATUS_OK;
else
{
/* Something is still blocking us. May have deadlocked. */
#ifdef LOCK_DEBUG
if (lock->tag.lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
{
elog(DEBUG, "ProcLockWakeup: lock(%lx) can't wake up any process",
MAKE_OFFSET(lock));
if (Debug_deadlocks)
DumpAllLocks();
}
#endif
return STATUS_NOT_FOUND;
}
Assert(waitQueue->size >= 0);
}
/* --------------------
@ -900,7 +831,7 @@ HandleDeadLock(SIGNAL_ARGS)
DumpAllLocks();
#endif
if (!DeadLockCheck(MyProc, MyProc->waitLock))
if (!DeadLockCheck(MyProc))
{
/* No deadlock, so keep waiting */
UnlockLockTable();