1
0
mirror of https://github.com/postgres/postgres.git synced 2025-04-29 13:56:47 +03:00

Use dlist/dclist instead of PROC_QUEUE / SHM_QUEUE for heavyweight locks

Part of a series to remove SHM_QUEUE. ilist.h style lists are more widely used
and have an easier to use interface.

As PROC_QUEUE is now unused, remove it.

Reviewed-by: Thomas Munro <thomas.munro@gmail.com> (in an older version)
Discussion: https://postgr.es/m/20221120055930.t6kl3tyivzhlrzu2@awork3.anarazel.de
Discussion: https://postgr.es/m/20200211042229.msv23badgqljrdg2@alap3.anarazel.de
This commit is contained in:
Andres Freund 2023-01-18 11:41:14 -08:00
parent 51384cc40c
commit 5764f611e1
7 changed files with 183 additions and 339 deletions

View File

@ -461,7 +461,7 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
/* Initialize the PGPROC entry */ /* Initialize the PGPROC entry */
MemSet(proc, 0, sizeof(PGPROC)); MemSet(proc, 0, sizeof(PGPROC));
proc->pgprocno = gxact->pgprocno; proc->pgprocno = gxact->pgprocno;
SHMQueueElemInit(&(proc->links)); dlist_node_init(&proc->links);
proc->waitStatus = PROC_WAIT_STATUS_OK; proc->waitStatus = PROC_WAIT_STATUS_OK;
if (LocalTransactionIdIsValid(MyProc->lxid)) if (LocalTransactionIdIsValid(MyProc->lxid))
{ {
@ -491,7 +491,7 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
proc->waitProcLock = NULL; proc->waitProcLock = NULL;
pg_atomic_init_u64(&proc->waitStart, 0); pg_atomic_init_u64(&proc->waitStart, 0);
for (i = 0; i < NUM_LOCK_PARTITIONS; i++) for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
SHMQueueInit(&(proc->myProcLocks[i])); dlist_init(&proc->myProcLocks[i]);
/* subxid data must be filled later by GXactLoadSubxactData */ /* subxid data must be filled later by GXactLoadSubxactData */
proc->subxidStatus.overflowed = false; proc->subxidStatus.overflowed = false;
proc->subxidStatus.count = 0; proc->subxidStatus.count = 0;

View File

@ -216,9 +216,6 @@ InitDeadLockChecking(void)
DeadLockState DeadLockState
DeadLockCheck(PGPROC *proc) DeadLockCheck(PGPROC *proc)
{ {
int i,
j;
/* Initialize to "no constraints" */ /* Initialize to "no constraints" */
nCurConstraints = 0; nCurConstraints = 0;
nPossibleConstraints = 0; nPossibleConstraints = 0;
@ -246,26 +243,23 @@ DeadLockCheck(PGPROC *proc)
} }
/* Apply any needed rearrangements of wait queues */ /* Apply any needed rearrangements of wait queues */
for (i = 0; i < nWaitOrders; i++) for (int i = 0; i < nWaitOrders; i++)
{ {
LOCK *lock = waitOrders[i].lock; LOCK *lock = waitOrders[i].lock;
PGPROC **procs = waitOrders[i].procs; PGPROC **procs = waitOrders[i].procs;
int nProcs = waitOrders[i].nProcs; int nProcs = waitOrders[i].nProcs;
PROC_QUEUE *waitQueue = &(lock->waitProcs); dclist_head *waitQueue = &lock->waitProcs;
Assert(nProcs == waitQueue->size); Assert(nProcs == dclist_count(waitQueue));
#ifdef DEBUG_DEADLOCK #ifdef DEBUG_DEADLOCK
PrintLockQueue(lock, "DeadLockCheck:"); PrintLockQueue(lock, "DeadLockCheck:");
#endif #endif
/* Reset the queue and re-add procs in the desired order */ /* Reset the queue and re-add procs in the desired order */
ProcQueueInit(waitQueue); dclist_init(waitQueue);
for (j = 0; j < nProcs; j++) for (int j = 0; j < nProcs; j++)
{ dclist_push_tail(waitQueue, &procs[j]->links);
SHMQueueInsertBefore(&(waitQueue->links), &(procs[j]->links));
waitQueue->size++;
}
#ifdef DEBUG_DEADLOCK #ifdef DEBUG_DEADLOCK
PrintLockQueue(lock, "rearranged to:"); PrintLockQueue(lock, "rearranged to:");
@ -544,11 +538,8 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
{ {
PGPROC *proc; PGPROC *proc;
LOCK *lock = checkProc->waitLock; LOCK *lock = checkProc->waitLock;
PROCLOCK *proclock; dlist_iter proclock_iter;
SHM_QUEUE *procLocks;
LockMethod lockMethodTable; LockMethod lockMethodTable;
PROC_QUEUE *waitQueue;
int queue_size;
int conflictMask; int conflictMask;
int i; int i;
int numLockModes, int numLockModes,
@ -571,13 +562,9 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
* Scan for procs that already hold conflicting locks. These are "hard" * Scan for procs that already hold conflicting locks. These are "hard"
* edges in the waits-for graph. * edges in the waits-for graph.
*/ */
procLocks = &(lock->procLocks); dlist_foreach(proclock_iter, &lock->procLocks)
proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
offsetof(PROCLOCK, lockLink));
while (proclock)
{ {
PROCLOCK *proclock = dlist_container(PROCLOCK, lockLink, proclock_iter.cur);
PGPROC *leader; PGPROC *leader;
proc = proclock->tag.myProc; proc = proclock->tag.myProc;
@ -636,9 +623,6 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
} }
} }
} }
proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
offsetof(PROCLOCK, lockLink));
} }
/* /*
@ -660,8 +644,7 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
{ {
/* Use the given hypothetical wait queue order */ /* Use the given hypothetical wait queue order */
PGPROC **procs = waitOrders[i].procs; PGPROC **procs = waitOrders[i].procs;
int queue_size = waitOrders[i].nProcs;
queue_size = waitOrders[i].nProcs;
for (i = 0; i < queue_size; i++) for (i = 0; i < queue_size; i++)
{ {
@ -711,9 +694,11 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
else else
{ {
PGPROC *lastGroupMember = NULL; PGPROC *lastGroupMember = NULL;
dlist_iter proc_iter;
dclist_head *waitQueue;
/* Use the true lock wait queue order */ /* Use the true lock wait queue order */
waitQueue = &(lock->waitProcs); waitQueue = &lock->waitProcs;
/* /*
* Find the last member of the lock group that is present in the wait * Find the last member of the lock group that is present in the wait
@ -726,13 +711,12 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
lastGroupMember = checkProc; lastGroupMember = checkProc;
else else
{ {
proc = (PGPROC *) waitQueue->links.next; dclist_foreach(proc_iter, waitQueue)
queue_size = waitQueue->size;
while (queue_size-- > 0)
{ {
proc = dlist_container(PGPROC, links, proc_iter.cur);
if (proc->lockGroupLeader == checkProcLeader) if (proc->lockGroupLeader == checkProcLeader)
lastGroupMember = proc; lastGroupMember = proc;
proc = (PGPROC *) proc->links.next;
} }
Assert(lastGroupMember != NULL); Assert(lastGroupMember != NULL);
} }
@ -740,12 +724,12 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
/* /*
* OK, now rescan (or scan) the queue to identify the soft conflicts. * OK, now rescan (or scan) the queue to identify the soft conflicts.
*/ */
queue_size = waitQueue->size; dclist_foreach(proc_iter, waitQueue)
proc = (PGPROC *) waitQueue->links.next;
while (queue_size-- > 0)
{ {
PGPROC *leader; PGPROC *leader;
proc = dlist_container(PGPROC, links, proc_iter.cur);
leader = proc->lockGroupLeader == NULL ? proc : leader = proc->lockGroupLeader == NULL ? proc :
proc->lockGroupLeader; proc->lockGroupLeader;
@ -779,8 +763,6 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
return true; return true;
} }
} }
proc = (PGPROC *) proc->links.next;
} }
} }
@ -832,8 +814,8 @@ ExpandConstraints(EDGE *constraints,
/* No, so allocate a new list */ /* No, so allocate a new list */
waitOrders[nWaitOrders].lock = lock; waitOrders[nWaitOrders].lock = lock;
waitOrders[nWaitOrders].procs = waitOrderProcs + nWaitOrderProcs; waitOrders[nWaitOrders].procs = waitOrderProcs + nWaitOrderProcs;
waitOrders[nWaitOrders].nProcs = lock->waitProcs.size; waitOrders[nWaitOrders].nProcs = dclist_count(&lock->waitProcs);
nWaitOrderProcs += lock->waitProcs.size; nWaitOrderProcs += dclist_count(&lock->waitProcs);
Assert(nWaitOrderProcs <= MaxBackends); Assert(nWaitOrderProcs <= MaxBackends);
/* /*
@ -880,8 +862,8 @@ TopoSort(LOCK *lock,
int nConstraints, int nConstraints,
PGPROC **ordering) /* output argument */ PGPROC **ordering) /* output argument */
{ {
PROC_QUEUE *waitQueue = &(lock->waitProcs); dclist_head *waitQueue = &lock->waitProcs;
int queue_size = waitQueue->size; int queue_size = dclist_count(waitQueue);
PGPROC *proc; PGPROC *proc;
int i, int i,
j, j,
@ -889,14 +871,16 @@ TopoSort(LOCK *lock,
k, k,
kk, kk,
last; last;
dlist_iter proc_iter;
/* First, fill topoProcs[] array with the procs in their current order */ /* First, fill topoProcs[] array with the procs in their current order */
proc = (PGPROC *) waitQueue->links.next; i = 0;
for (i = 0; i < queue_size; i++) dclist_foreach(proc_iter, waitQueue)
{ {
topoProcs[i] = proc; proc = dlist_container(PGPROC, links, proc_iter.cur);
proc = (PGPROC *) proc->links.next; topoProcs[i++] = proc;
} }
Assert(i == queue_size);
/* /*
* Scan the constraints, and for each proc in the array, generate a count * Scan the constraints, and for each proc in the array, generate a count
@ -1066,17 +1050,16 @@ TopoSort(LOCK *lock,
static void static void
PrintLockQueue(LOCK *lock, const char *info) PrintLockQueue(LOCK *lock, const char *info)
{ {
PROC_QUEUE *waitQueue = &(lock->waitProcs); dclist_head *waitQueue = &lock->waitProcs;
int queue_size = waitQueue->size; dlist_iter proc_iter;
PGPROC *proc;
int i;
printf("%s lock %p queue ", info, lock); printf("%s lock %p queue ", info, lock);
proc = (PGPROC *) waitQueue->links.next;
for (i = 0; i < queue_size; i++) dclist_foreach(proc_iter, waitQueue)
{ {
PGPROC *proc = dlist_container(PGPROC, links, proc_iter.cur);
printf(" %d", proc->pid); printf(" %d", proc->pid);
proc = (PGPROC *) proc->links.next;
} }
printf("\n"); printf("\n");
fflush(stdout); fflush(stdout);

View File

@ -345,7 +345,7 @@ LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
lock->granted[1], lock->granted[2], lock->granted[3], lock->granted[1], lock->granted[2], lock->granted[3],
lock->granted[4], lock->granted[5], lock->granted[6], lock->granted[4], lock->granted[5], lock->granted[6],
lock->granted[7], lock->nGranted, lock->granted[7], lock->nGranted,
lock->waitProcs.size, dclist_count(&lock->waitProcs),
LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]); LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]);
} }
@ -1058,8 +1058,8 @@ LockAcquireExtended(const LOCKTAG *locktag,
uint32 proclock_hashcode; uint32 proclock_hashcode;
proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode); proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
SHMQueueDelete(&proclock->lockLink); dlist_delete(&proclock->lockLink);
SHMQueueDelete(&proclock->procLink); dlist_delete(&proclock->procLink);
if (!hash_search_with_hash_value(LockMethodProcLockHash, if (!hash_search_with_hash_value(LockMethodProcLockHash,
(void *) &(proclock->tag), (void *) &(proclock->tag),
proclock_hashcode, proclock_hashcode,
@ -1194,8 +1194,8 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
{ {
lock->grantMask = 0; lock->grantMask = 0;
lock->waitMask = 0; lock->waitMask = 0;
SHMQueueInit(&(lock->procLocks)); dlist_init(&lock->procLocks);
ProcQueueInit(&(lock->waitProcs)); dclist_init(&lock->waitProcs);
lock->nRequested = 0; lock->nRequested = 0;
lock->nGranted = 0; lock->nGranted = 0;
MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES); MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
@ -1237,7 +1237,7 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
* of shared memory, because there won't be anything to cause * of shared memory, because there won't be anything to cause
* anyone to release the lock object later. * anyone to release the lock object later.
*/ */
Assert(SHMQueueEmpty(&(lock->procLocks))); Assert(dlist_is_empty(&(lock->procLocks)));
if (!hash_search_with_hash_value(LockMethodLockHash, if (!hash_search_with_hash_value(LockMethodLockHash,
(void *) &(lock->tag), (void *) &(lock->tag),
hashcode, hashcode,
@ -1270,9 +1270,8 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
proclock->holdMask = 0; proclock->holdMask = 0;
proclock->releaseMask = 0; proclock->releaseMask = 0;
/* Add proclock to appropriate lists */ /* Add proclock to appropriate lists */
SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink); dlist_push_tail(&lock->procLocks, &proclock->lockLink);
SHMQueueInsertBefore(&(proc->myProcLocks[partition]), dlist_push_tail(&proc->myProcLocks[partition], &proclock->procLink);
&proclock->procLink);
PROCLOCK_PRINT("LockAcquire: new", proclock); PROCLOCK_PRINT("LockAcquire: new", proclock);
} }
else else
@ -1427,9 +1426,8 @@ LockCheckConflicts(LockMethod lockMethodTable,
int conflictMask = lockMethodTable->conflictTab[lockmode]; int conflictMask = lockMethodTable->conflictTab[lockmode];
int conflictsRemaining[MAX_LOCKMODES]; int conflictsRemaining[MAX_LOCKMODES];
int totalConflictsRemaining = 0; int totalConflictsRemaining = 0;
dlist_iter proclock_iter;
int i; int i;
SHM_QUEUE *procLocks;
PROCLOCK *otherproclock;
/* /*
* first check for global conflicts: If no locks conflict with my request, * first check for global conflicts: If no locks conflict with my request,
@ -1501,11 +1499,11 @@ LockCheckConflicts(LockMethod lockMethodTable,
* shared memory state more complex (and larger) but it doesn't seem worth * shared memory state more complex (and larger) but it doesn't seem worth
* it. * it.
*/ */
procLocks = &(lock->procLocks); dlist_foreach(proclock_iter, &lock->procLocks)
otherproclock = (PROCLOCK *)
SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink));
while (otherproclock != NULL)
{ {
PROCLOCK *otherproclock =
dlist_container(PROCLOCK, lockLink, proclock_iter.cur);
if (proclock != otherproclock && if (proclock != otherproclock &&
proclock->groupLeader == otherproclock->groupLeader && proclock->groupLeader == otherproclock->groupLeader &&
(otherproclock->holdMask & conflictMask) != 0) (otherproclock->holdMask & conflictMask) != 0)
@ -1530,9 +1528,6 @@ LockCheckConflicts(LockMethod lockMethodTable,
return false; return false;
} }
} }
otherproclock = (PROCLOCK *)
SHMQueueNext(procLocks, &otherproclock->lockLink,
offsetof(PROCLOCK, lockLink));
} }
/* Nope, it's a real conflict. */ /* Nope, it's a real conflict. */
@ -1645,8 +1640,8 @@ CleanUpLock(LOCK *lock, PROCLOCK *proclock,
uint32 proclock_hashcode; uint32 proclock_hashcode;
PROCLOCK_PRINT("CleanUpLock: deleting", proclock); PROCLOCK_PRINT("CleanUpLock: deleting", proclock);
SHMQueueDelete(&proclock->lockLink); dlist_delete(&proclock->lockLink);
SHMQueueDelete(&proclock->procLink); dlist_delete(&proclock->procLink);
proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode); proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
if (!hash_search_with_hash_value(LockMethodProcLockHash, if (!hash_search_with_hash_value(LockMethodProcLockHash,
(void *) &(proclock->tag), (void *) &(proclock->tag),
@ -1663,7 +1658,7 @@ CleanUpLock(LOCK *lock, PROCLOCK *proclock,
* object. * object.
*/ */
LOCK_PRINT("CleanUpLock: deleting", lock, 0); LOCK_PRINT("CleanUpLock: deleting", lock, 0);
Assert(SHMQueueEmpty(&(lock->procLocks))); Assert(dlist_is_empty(&lock->procLocks));
if (!hash_search_with_hash_value(LockMethodLockHash, if (!hash_search_with_hash_value(LockMethodLockHash,
(void *) &(lock->tag), (void *) &(lock->tag),
hashcode, hashcode,
@ -1926,12 +1921,11 @@ RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
Assert(proc->waitStatus == PROC_WAIT_STATUS_WAITING); Assert(proc->waitStatus == PROC_WAIT_STATUS_WAITING);
Assert(proc->links.next != NULL); Assert(proc->links.next != NULL);
Assert(waitLock); Assert(waitLock);
Assert(waitLock->waitProcs.size > 0); Assert(!dclist_is_empty(&waitLock->waitProcs));
Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods)); Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
/* Remove proc from lock's wait queue */ /* Remove proc from lock's wait queue */
SHMQueueDelete(&(proc->links)); dclist_delete_from(&waitLock->waitProcs, &proc->links);
waitLock->waitProcs.size--;
/* Undo increments of request counts by waiting process */ /* Undo increments of request counts by waiting process */
Assert(waitLock->nRequested > 0); Assert(waitLock->nRequested > 0);
@ -2185,7 +2179,6 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
numLockModes; numLockModes;
LOCALLOCK *locallock; LOCALLOCK *locallock;
LOCK *lock; LOCK *lock;
PROCLOCK *proclock;
int partition; int partition;
bool have_fast_path_lwlock = false; bool have_fast_path_lwlock = false;
@ -2342,8 +2335,8 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++) for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
{ {
LWLock *partitionLock; LWLock *partitionLock;
SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]); dlist_head *procLocks = &MyProc->myProcLocks[partition];
PROCLOCK *nextplock; dlist_mutable_iter proclock_iter;
partitionLock = LockHashPartitionLockByIndex(partition); partitionLock = LockHashPartitionLockByIndex(partition);
@ -2366,24 +2359,16 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
* locallock situation, we lose that guarantee for fast-path locks. * locallock situation, we lose that guarantee for fast-path locks.
* This is not ideal. * This is not ideal.
*/ */
if (SHMQueueNext(procLocks, procLocks, if (dlist_is_empty(procLocks))
offsetof(PROCLOCK, procLink)) == NULL)
continue; /* needn't examine this partition */ continue; /* needn't examine this partition */
LWLockAcquire(partitionLock, LW_EXCLUSIVE); LWLockAcquire(partitionLock, LW_EXCLUSIVE);
for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, dlist_foreach_modify(proclock_iter, procLocks)
offsetof(PROCLOCK, procLink));
proclock;
proclock = nextplock)
{ {
PROCLOCK *proclock = dlist_container(PROCLOCK, procLink, proclock_iter.cur);
bool wakeupNeeded = false; bool wakeupNeeded = false;
/* Get link first, since we may unlink/delete this proclock */
nextplock = (PROCLOCK *)
SHMQueueNext(procLocks, &proclock->procLink,
offsetof(PROCLOCK, procLink));
Assert(proclock->tag.myProc == MyProc); Assert(proclock->tag.myProc == MyProc);
lock = proclock->tag.myLock; lock = proclock->tag.myLock;
@ -2918,7 +2903,7 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp)
LockMethod lockMethodTable; LockMethod lockMethodTable;
LOCK *lock; LOCK *lock;
LOCKMASK conflictMask; LOCKMASK conflictMask;
SHM_QUEUE *procLocks; dlist_iter proclock_iter;
PROCLOCK *proclock; PROCLOCK *proclock;
uint32 hashcode; uint32 hashcode;
LWLock *partitionLock; LWLock *partitionLock;
@ -3064,14 +3049,10 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp)
/* /*
* Examine each existing holder (or awaiter) of the lock. * Examine each existing holder (or awaiter) of the lock.
*/ */
dlist_foreach(proclock_iter, &lock->procLocks)
procLocks = &(lock->procLocks);
proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
offsetof(PROCLOCK, lockLink));
while (proclock)
{ {
proclock = dlist_container(PROCLOCK, lockLink, proclock_iter.cur);
if (conflictMask & proclock->holdMask) if (conflictMask & proclock->holdMask)
{ {
PGPROC *proc = proclock->tag.myProc; PGPROC *proc = proclock->tag.myProc;
@ -3097,9 +3078,6 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp)
/* else, xact already committed or aborted */ /* else, xact already committed or aborted */
} }
} }
proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
offsetof(PROCLOCK, lockLink));
} }
LWLockRelease(partitionLock); LWLockRelease(partitionLock);
@ -3498,8 +3476,8 @@ PostPrepare_Locks(TransactionId xid)
for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++) for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
{ {
LWLock *partitionLock; LWLock *partitionLock;
SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]); dlist_head *procLocks = &(MyProc->myProcLocks[partition]);
PROCLOCK *nextplock; dlist_mutable_iter proclock_iter;
partitionLock = LockHashPartitionLockByIndex(partition); partitionLock = LockHashPartitionLockByIndex(partition);
@ -3511,21 +3489,14 @@ PostPrepare_Locks(TransactionId xid)
* another backend is adding something to our lists now. For safety, * another backend is adding something to our lists now. For safety,
* though, we code this the same way as in LockReleaseAll. * though, we code this the same way as in LockReleaseAll.
*/ */
if (SHMQueueNext(procLocks, procLocks, if (dlist_is_empty(procLocks))
offsetof(PROCLOCK, procLink)) == NULL)
continue; /* needn't examine this partition */ continue; /* needn't examine this partition */
LWLockAcquire(partitionLock, LW_EXCLUSIVE); LWLockAcquire(partitionLock, LW_EXCLUSIVE);
for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, dlist_foreach_modify(proclock_iter, procLocks)
offsetof(PROCLOCK, procLink));
proclock;
proclock = nextplock)
{ {
/* Get link first, since we may unlink/relink this proclock */ proclock = dlist_container(PROCLOCK, procLink, proclock_iter.cur);
nextplock = (PROCLOCK *)
SHMQueueNext(procLocks, &proclock->procLink,
offsetof(PROCLOCK, procLink));
Assert(proclock->tag.myProc == MyProc); Assert(proclock->tag.myProc == MyProc);
@ -3563,7 +3534,7 @@ PostPrepare_Locks(TransactionId xid)
* same hash partition, cf proclock_hash(). So the partition lock * same hash partition, cf proclock_hash(). So the partition lock
* we already hold is sufficient for this. * we already hold is sufficient for this.
*/ */
SHMQueueDelete(&proclock->procLink); dlist_delete(&proclock->procLink);
/* /*
* Create the new hash key for the proclock. * Create the new hash key for the proclock.
@ -3589,8 +3560,7 @@ PostPrepare_Locks(TransactionId xid)
elog(PANIC, "duplicate entry found while reassigning a prepared transaction's locks"); elog(PANIC, "duplicate entry found while reassigning a prepared transaction's locks");
/* Re-link into the new proc's proclock list */ /* Re-link into the new proc's proclock list */
SHMQueueInsertBefore(&(newproc->myProcLocks[partition]), dlist_push_tail(&newproc->myProcLocks[partition], &proclock->procLink);
&proclock->procLink);
PROCLOCK_PRINT("PostPrepare_Locks: updated", proclock); PROCLOCK_PRINT("PostPrepare_Locks: updated", proclock);
} /* loop over PROCLOCKs within this partition */ } /* loop over PROCLOCKs within this partition */
@ -3919,12 +3889,10 @@ GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data)
{ {
LOCK *theLock = blocked_proc->waitLock; LOCK *theLock = blocked_proc->waitLock;
BlockedProcData *bproc; BlockedProcData *bproc;
SHM_QUEUE *procLocks; dlist_iter proclock_iter;
PROCLOCK *proclock; dlist_iter proc_iter;
PROC_QUEUE *waitQueue; dclist_head *waitQueue;
PGPROC *queued_proc;
int queue_size; int queue_size;
int i;
/* Nothing to do if this proc is not blocked */ /* Nothing to do if this proc is not blocked */
if (theLock == NULL) if (theLock == NULL)
@ -3942,11 +3910,10 @@ GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data)
*/ */
/* Collect all PROCLOCKs associated with theLock */ /* Collect all PROCLOCKs associated with theLock */
procLocks = &(theLock->procLocks); dlist_foreach(proclock_iter, &theLock->procLocks)
proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
offsetof(PROCLOCK, lockLink));
while (proclock)
{ {
PROCLOCK *proclock =
dlist_container(PROCLOCK, lockLink, proclock_iter.cur);
PGPROC *proc = proclock->tag.myProc; PGPROC *proc = proclock->tag.myProc;
LOCK *lock = proclock->tag.myLock; LOCK *lock = proclock->tag.myLock;
LockInstanceData *instance; LockInstanceData *instance;
@ -3971,14 +3938,11 @@ GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data)
instance->leaderPid = proclock->groupLeader->pid; instance->leaderPid = proclock->groupLeader->pid;
instance->fastpath = false; instance->fastpath = false;
data->nlocks++; data->nlocks++;
proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
offsetof(PROCLOCK, lockLink));
} }
/* Enlarge waiter_pids[] if it's too small to hold all wait queue PIDs */ /* Enlarge waiter_pids[] if it's too small to hold all wait queue PIDs */
waitQueue = &(theLock->waitProcs); waitQueue = &(theLock->waitProcs);
queue_size = waitQueue->size; queue_size = dclist_count(waitQueue);
if (queue_size > data->maxpids - data->npids) if (queue_size > data->maxpids - data->npids)
{ {
@ -3989,9 +3953,9 @@ GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data)
} }
/* Collect PIDs from the lock's wait queue, stopping at blocked_proc */ /* Collect PIDs from the lock's wait queue, stopping at blocked_proc */
queued_proc = (PGPROC *) waitQueue->links.next; dclist_foreach(proc_iter, waitQueue)
for (i = 0; i < queue_size; i++)
{ {
PGPROC *queued_proc = dlist_container(PGPROC, links, proc_iter.cur);
if (queued_proc == blocked_proc) if (queued_proc == blocked_proc)
break; break;
data->waiter_pids[data->npids++] = queued_proc->pid; data->waiter_pids[data->npids++] = queued_proc->pid;
@ -4113,9 +4077,6 @@ GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
void void
DumpLocks(PGPROC *proc) DumpLocks(PGPROC *proc)
{ {
SHM_QUEUE *procLocks;
PROCLOCK *proclock;
LOCK *lock;
int i; int i;
if (proc == NULL) if (proc == NULL)
@ -4126,23 +4087,17 @@ DumpLocks(PGPROC *proc)
for (i = 0; i < NUM_LOCK_PARTITIONS; i++) for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
{ {
procLocks = &(proc->myProcLocks[i]); dlist_head *procLocks = &proc->myProcLocks[i];
dlist_iter iter;
proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, dlist_foreach(iter, procLocks)
offsetof(PROCLOCK, procLink));
while (proclock)
{ {
PROCLOCK *proclock = dlist_container(PROCLOCK, procLink, iter.cur);
LOCK *lock = proclock->tag.myLock;
Assert(proclock->tag.myProc == proc); Assert(proclock->tag.myProc == proc);
lock = proclock->tag.myLock;
PROCLOCK_PRINT("DumpLocks", proclock); PROCLOCK_PRINT("DumpLocks", proclock);
LOCK_PRINT("DumpLocks", lock, 0); LOCK_PRINT("DumpLocks", lock, 0);
proclock = (PROCLOCK *)
SHMQueueNext(procLocks, &proclock->procLink,
offsetof(PROCLOCK, procLink));
} }
} }
} }
@ -4267,8 +4222,8 @@ lock_twophase_recover(TransactionId xid, uint16 info,
{ {
lock->grantMask = 0; lock->grantMask = 0;
lock->waitMask = 0; lock->waitMask = 0;
SHMQueueInit(&(lock->procLocks)); dlist_init(&lock->procLocks);
ProcQueueInit(&(lock->waitProcs)); dclist_init(&lock->waitProcs);
lock->nRequested = 0; lock->nRequested = 0;
lock->nGranted = 0; lock->nGranted = 0;
MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES); MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
@ -4310,7 +4265,7 @@ lock_twophase_recover(TransactionId xid, uint16 info,
* of shared memory, because there won't be anything to cause * of shared memory, because there won't be anything to cause
* anyone to release the lock object later. * anyone to release the lock object later.
*/ */
Assert(SHMQueueEmpty(&(lock->procLocks))); Assert(dlist_is_empty(&lock->procLocks));
if (!hash_search_with_hash_value(LockMethodLockHash, if (!hash_search_with_hash_value(LockMethodLockHash,
(void *) &(lock->tag), (void *) &(lock->tag),
hashcode, hashcode,
@ -4335,9 +4290,9 @@ lock_twophase_recover(TransactionId xid, uint16 info,
proclock->holdMask = 0; proclock->holdMask = 0;
proclock->releaseMask = 0; proclock->releaseMask = 0;
/* Add proclock to appropriate lists */ /* Add proclock to appropriate lists */
SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink); dlist_push_tail(&lock->procLocks, &proclock->lockLink);
SHMQueueInsertBefore(&(proc->myProcLocks[partition]), dlist_push_tail(&proc->myProcLocks[partition],
&proclock->procLink); &proclock->procLink);
PROCLOCK_PRINT("lock_twophase_recover: new", proclock); PROCLOCK_PRINT("lock_twophase_recover: new", proclock);
} }
else else

View File

@ -15,8 +15,6 @@
/* /*
* Interface (a): * Interface (a):
* ProcSleep(), ProcWakeup(), * ProcSleep(), ProcWakeup(),
* ProcQueueAlloc() -- create a shm queue for sleeping processes
* ProcQueueInit() -- create a queue without allocing memory
* *
* Waiting for a lock causes the backend to be put to sleep. Whoever releases * Waiting for a lock causes the backend to be put to sleep. Whoever releases
* the lock wakes the process up again (and gives it an error code so it knows * the lock wakes the process up again (and gives it an error code so it knows
@ -173,10 +171,10 @@ InitProcGlobal(void)
* Initialize the data structures. * Initialize the data structures.
*/ */
ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY; ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY;
ProcGlobal->freeProcs = NULL; dlist_init(&ProcGlobal->freeProcs);
ProcGlobal->autovacFreeProcs = NULL; dlist_init(&ProcGlobal->autovacFreeProcs);
ProcGlobal->bgworkerFreeProcs = NULL; dlist_init(&ProcGlobal->bgworkerFreeProcs);
ProcGlobal->walsenderFreeProcs = NULL; dlist_init(&ProcGlobal->walsenderFreeProcs);
ProcGlobal->startupBufferPinWaitBufId = -1; ProcGlobal->startupBufferPinWaitBufId = -1;
ProcGlobal->walwriterLatch = NULL; ProcGlobal->walwriterLatch = NULL;
ProcGlobal->checkpointerLatch = NULL; ProcGlobal->checkpointerLatch = NULL;
@ -214,6 +212,8 @@ InitProcGlobal(void)
for (i = 0; i < TotalProcs; i++) for (i = 0; i < TotalProcs; i++)
{ {
PGPROC *proc = &procs[i];
/* Common initialization for all PGPROCs, regardless of type. */ /* Common initialization for all PGPROCs, regardless of type. */
/* /*
@ -223,11 +223,11 @@ InitProcGlobal(void)
*/ */
if (i < MaxBackends + NUM_AUXILIARY_PROCS) if (i < MaxBackends + NUM_AUXILIARY_PROCS)
{ {
procs[i].sem = PGSemaphoreCreate(); proc->sem = PGSemaphoreCreate();
InitSharedLatch(&(procs[i].procLatch)); InitSharedLatch(&(proc->procLatch));
LWLockInitialize(&(procs[i].fpInfoLock), LWTRANCHE_LOCK_FASTPATH); LWLockInitialize(&(proc->fpInfoLock), LWTRANCHE_LOCK_FASTPATH);
} }
procs[i].pgprocno = i; proc->pgprocno = i;
/* /*
* Newly created PGPROCs for normal backends, autovacuum and bgworkers * Newly created PGPROCs for normal backends, autovacuum and bgworkers
@ -240,46 +240,42 @@ InitProcGlobal(void)
if (i < MaxConnections) if (i < MaxConnections)
{ {
/* PGPROC for normal backend, add to freeProcs list */ /* PGPROC for normal backend, add to freeProcs list */
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs; dlist_push_head(&ProcGlobal->freeProcs, &proc->links);
ProcGlobal->freeProcs = &procs[i]; proc->procgloballist = &ProcGlobal->freeProcs;
procs[i].procgloballist = &ProcGlobal->freeProcs;
} }
else if (i < MaxConnections + autovacuum_max_workers + 1) else if (i < MaxConnections + autovacuum_max_workers + 1)
{ {
/* PGPROC for AV launcher/worker, add to autovacFreeProcs list */ /* PGPROC for AV launcher/worker, add to autovacFreeProcs list */
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs; dlist_push_head(&ProcGlobal->autovacFreeProcs, &proc->links);
ProcGlobal->autovacFreeProcs = &procs[i]; proc->procgloballist = &ProcGlobal->autovacFreeProcs;
procs[i].procgloballist = &ProcGlobal->autovacFreeProcs;
} }
else if (i < MaxConnections + autovacuum_max_workers + 1 + max_worker_processes) else if (i < MaxConnections + autovacuum_max_workers + 1 + max_worker_processes)
{ {
/* PGPROC for bgworker, add to bgworkerFreeProcs list */ /* PGPROC for bgworker, add to bgworkerFreeProcs list */
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->bgworkerFreeProcs; dlist_push_head(&ProcGlobal->bgworkerFreeProcs, &proc->links);
ProcGlobal->bgworkerFreeProcs = &procs[i]; proc->procgloballist = &ProcGlobal->bgworkerFreeProcs;
procs[i].procgloballist = &ProcGlobal->bgworkerFreeProcs;
} }
else if (i < MaxBackends) else if (i < MaxBackends)
{ {
/* PGPROC for walsender, add to walsenderFreeProcs list */ /* PGPROC for walsender, add to walsenderFreeProcs list */
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->walsenderFreeProcs; dlist_push_head(&ProcGlobal->walsenderFreeProcs, &proc->links);
ProcGlobal->walsenderFreeProcs = &procs[i]; proc->procgloballist = &ProcGlobal->walsenderFreeProcs;
procs[i].procgloballist = &ProcGlobal->walsenderFreeProcs;
} }
/* Initialize myProcLocks[] shared memory queues. */ /* Initialize myProcLocks[] shared memory queues. */
for (j = 0; j < NUM_LOCK_PARTITIONS; j++) for (j = 0; j < NUM_LOCK_PARTITIONS; j++)
SHMQueueInit(&(procs[i].myProcLocks[j])); dlist_init(&(proc->myProcLocks[j]));
/* Initialize lockGroupMembers list. */ /* Initialize lockGroupMembers list. */
dlist_init(&procs[i].lockGroupMembers); dlist_init(&proc->lockGroupMembers);
/* /*
* Initialize the atomic variables, otherwise, it won't be safe to * Initialize the atomic variables, otherwise, it won't be safe to
* access them for backends that aren't currently in use. * access them for backends that aren't currently in use.
*/ */
pg_atomic_init_u32(&(procs[i].procArrayGroupNext), INVALID_PGPROCNO); pg_atomic_init_u32(&(proc->procArrayGroupNext), INVALID_PGPROCNO);
pg_atomic_init_u32(&(procs[i].clogGroupNext), INVALID_PGPROCNO); pg_atomic_init_u32(&(proc->clogGroupNext), INVALID_PGPROCNO);
pg_atomic_init_u64(&(procs[i].waitStart), 0); pg_atomic_init_u64(&(proc->waitStart), 0);
} }
/* /*
@ -300,7 +296,7 @@ InitProcGlobal(void)
void void
InitProcess(void) InitProcess(void)
{ {
PGPROC *volatile *procgloballist; dlist_head *procgloballist;
/* /*
* ProcGlobal should be set up already (if we are a backend, we inherit * ProcGlobal should be set up already (if we are a backend, we inherit
@ -333,11 +329,9 @@ InitProcess(void)
set_spins_per_delay(ProcGlobal->spins_per_delay); set_spins_per_delay(ProcGlobal->spins_per_delay);
MyProc = *procgloballist; if (!dlist_is_empty(procgloballist))
if (MyProc != NULL)
{ {
*procgloballist = (PGPROC *) MyProc->links.next; MyProc = (PGPROC*) dlist_pop_head_node(procgloballist);
SpinLockRelease(ProcStructLock); SpinLockRelease(ProcStructLock);
} }
else else
@ -378,7 +372,7 @@ InitProcess(void)
* Initialize all fields of MyProc, except for those previously * Initialize all fields of MyProc, except for those previously
* initialized by InitProcGlobal. * initialized by InitProcGlobal.
*/ */
SHMQueueElemInit(&(MyProc->links)); dlist_node_init(&MyProc->links);
MyProc->waitStatus = PROC_WAIT_STATUS_OK; MyProc->waitStatus = PROC_WAIT_STATUS_OK;
MyProc->lxid = InvalidLocalTransactionId; MyProc->lxid = InvalidLocalTransactionId;
MyProc->fpVXIDLock = false; MyProc->fpVXIDLock = false;
@ -408,7 +402,7 @@ InitProcess(void)
/* Last process should have released all locks. */ /* Last process should have released all locks. */
for (i = 0; i < NUM_LOCK_PARTITIONS; i++) for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i]))); Assert(dlist_is_empty(&(MyProc->myProcLocks[i])));
} }
#endif #endif
MyProc->recoveryConflictPending = false; MyProc->recoveryConflictPending = false;
@ -565,7 +559,7 @@ InitAuxiliaryProcess(void)
* Initialize all fields of MyProc, except for those previously * Initialize all fields of MyProc, except for those previously
* initialized by InitProcGlobal. * initialized by InitProcGlobal.
*/ */
SHMQueueElemInit(&(MyProc->links)); dlist_node_init(&MyProc->links);
MyProc->waitStatus = PROC_WAIT_STATUS_OK; MyProc->waitStatus = PROC_WAIT_STATUS_OK;
MyProc->lxid = InvalidLocalTransactionId; MyProc->lxid = InvalidLocalTransactionId;
MyProc->fpVXIDLock = false; MyProc->fpVXIDLock = false;
@ -590,7 +584,7 @@ InitAuxiliaryProcess(void)
/* Last process should have released all locks. */ /* Last process should have released all locks. */
for (i = 0; i < NUM_LOCK_PARTITIONS; i++) for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i]))); Assert(dlist_is_empty(&(MyProc->myProcLocks[i])));
} }
#endif #endif
@ -658,16 +652,15 @@ GetStartupBufferPinWaitBufId(void)
bool bool
HaveNFreeProcs(int n) HaveNFreeProcs(int n)
{ {
PGPROC *proc; dlist_iter iter;
SpinLockAcquire(ProcStructLock); SpinLockAcquire(ProcStructLock);
proc = ProcGlobal->freeProcs; dlist_foreach(iter, &ProcGlobal->freeProcs)
while (n > 0 && proc != NULL)
{ {
proc = (PGPROC *) proc->links.next;
n--; n--;
if (n == 0)
break;
} }
SpinLockRelease(ProcStructLock); SpinLockRelease(ProcStructLock);
@ -730,7 +723,7 @@ LockErrorCleanup(void)
partitionLock = LockHashPartitionLock(lockAwaited->hashcode); partitionLock = LockHashPartitionLock(lockAwaited->hashcode);
LWLockAcquire(partitionLock, LW_EXCLUSIVE); LWLockAcquire(partitionLock, LW_EXCLUSIVE);
if (MyProc->links.next != NULL) if (!dlist_node_is_detached(&MyProc->links))
{ {
/* We could not have been granted the lock yet */ /* We could not have been granted the lock yet */
RemoveFromWaitQueue(MyProc, lockAwaited->hashcode); RemoveFromWaitQueue(MyProc, lockAwaited->hashcode);
@ -803,7 +796,7 @@ static void
ProcKill(int code, Datum arg) ProcKill(int code, Datum arg)
{ {
PGPROC *proc; PGPROC *proc;
PGPROC *volatile *procgloballist; dlist_head *procgloballist;
Assert(MyProc != NULL); Assert(MyProc != NULL);
@ -816,7 +809,7 @@ ProcKill(int code, Datum arg)
/* Last process should have released all locks. */ /* Last process should have released all locks. */
for (i = 0; i < NUM_LOCK_PARTITIONS; i++) for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i]))); Assert(dlist_is_empty(&(MyProc->myProcLocks[i])));
} }
#endif #endif
@ -832,7 +825,7 @@ ProcKill(int code, Datum arg)
/* /*
* Detach from any lock group of which we are a member. If the leader * Detach from any lock group of which we are a member. If the leader
* exist before all other group members, its PGPROC will remain allocated * exits before all other group members, its PGPROC will remain allocated
* until the last group process exits; that process must return the * until the last group process exits; that process must return the
* leader's PGPROC to the appropriate list. * leader's PGPROC to the appropriate list.
*/ */
@ -853,8 +846,7 @@ ProcKill(int code, Datum arg)
/* Leader exited first; return its PGPROC. */ /* Leader exited first; return its PGPROC. */
SpinLockAcquire(ProcStructLock); SpinLockAcquire(ProcStructLock);
leader->links.next = (SHM_QUEUE *) *procgloballist; dlist_push_head(procgloballist, &leader->links);
*procgloballist = leader;
SpinLockRelease(ProcStructLock); SpinLockRelease(ProcStructLock);
} }
} }
@ -893,8 +885,7 @@ ProcKill(int code, Datum arg)
Assert(dlist_is_empty(&proc->lockGroupMembers)); Assert(dlist_is_empty(&proc->lockGroupMembers));
/* Return PGPROC structure (and semaphore) to appropriate freelist */ /* Return PGPROC structure (and semaphore) to appropriate freelist */
proc->links.next = (SHM_QUEUE *) *procgloballist; dlist_push_tail(procgloballist, &proc->links);
*procgloballist = proc;
} }
/* Update shared estimate of spins_per_delay */ /* Update shared estimate of spins_per_delay */
@ -986,44 +977,6 @@ AuxiliaryPidGetProc(int pid)
return result; return result;
} }
/*
* ProcQueue package: routines for putting processes to sleep
* and waking them up
*/
/*
* ProcQueueAlloc -- alloc/attach to a shared memory process queue
*
* Returns: a pointer to the queue
* Side Effects: Initializes the queue if it wasn't there before
*/
#ifdef NOT_USED
PROC_QUEUE *
ProcQueueAlloc(const char *name)
{
PROC_QUEUE *queue;
bool found;
queue = (PROC_QUEUE *)
ShmemInitStruct(name, sizeof(PROC_QUEUE), &found);
if (!found)
ProcQueueInit(queue);
return queue;
}
#endif
/*
* ProcQueueInit -- initialize a shared memory process queue
*/
void
ProcQueueInit(PROC_QUEUE *queue)
{
SHMQueueInit(&(queue->links));
queue->size = 0;
}
/* /*
* ProcSleep -- put a process to sleep on the specified lock * ProcSleep -- put a process to sleep on the specified lock
@ -1049,8 +1002,8 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
PROCLOCK *proclock = locallock->proclock; PROCLOCK *proclock = locallock->proclock;
uint32 hashcode = locallock->hashcode; uint32 hashcode = locallock->hashcode;
LWLock *partitionLock = LockHashPartitionLock(hashcode); LWLock *partitionLock = LockHashPartitionLock(hashcode);
PROC_QUEUE *waitQueue = &(lock->waitProcs); dclist_head *waitQueue = &lock->waitProcs;
SHM_QUEUE *waitQueuePos; PGPROC *insert_before = NULL;
LOCKMASK myHeldLocks = MyProc->heldLocks; LOCKMASK myHeldLocks = MyProc->heldLocks;
TimestampTz standbyWaitStart = 0; TimestampTz standbyWaitStart = 0;
bool early_deadlock = false; bool early_deadlock = false;
@ -1058,7 +1011,6 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
bool logged_recovery_conflict = false; bool logged_recovery_conflict = false;
ProcWaitStatus myWaitStatus; ProcWaitStatus myWaitStatus;
PGPROC *leader = MyProc->lockGroupLeader; PGPROC *leader = MyProc->lockGroupLeader;
int i;
/* /*
* If group locking is in use, locks held by members of my locking group * If group locking is in use, locks held by members of my locking group
@ -1072,18 +1024,16 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
*/ */
if (leader != NULL) if (leader != NULL)
{ {
SHM_QUEUE *procLocks = &(lock->procLocks); dlist_iter iter;
PROCLOCK *otherproclock;
otherproclock = (PROCLOCK *) dlist_foreach(iter, &lock->procLocks)
SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink));
while (otherproclock != NULL)
{ {
PROCLOCK *otherproclock;
otherproclock = dlist_container(PROCLOCK, lockLink, iter.cur);
if (otherproclock->groupLeader == leader) if (otherproclock->groupLeader == leader)
myHeldLocks |= otherproclock->holdMask; myHeldLocks |= otherproclock->holdMask;
otherproclock = (PROCLOCK *)
SHMQueueNext(procLocks, &otherproclock->lockLink,
offsetof(PROCLOCK, lockLink));
} }
} }
@ -1104,15 +1054,14 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
* we are only considering the part of the wait queue before my insertion * we are only considering the part of the wait queue before my insertion
* point. * point.
*/ */
if (myHeldLocks != 0 && waitQueue->size > 0) if (myHeldLocks != 0 && !dclist_is_empty(waitQueue))
{ {
LOCKMASK aheadRequests = 0; LOCKMASK aheadRequests = 0;
SHM_QUEUE *proc_node; dlist_iter iter;
proc_node = waitQueue->links.next; dclist_foreach(iter, waitQueue)
for (i = 0; i < waitQueue->size; i++)
{ {
PGPROC *proc = (PGPROC *) proc_node; PGPROC *proc = dlist_container(PGPROC, links, iter.cur);
/* /*
* If we're part of the same locking group as this waiter, its * If we're part of the same locking group as this waiter, its
@ -1120,10 +1069,8 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
* aheadRequests. * aheadRequests.
*/ */
if (leader != NULL && leader == proc->lockGroupLeader) if (leader != NULL && leader == proc->lockGroupLeader)
{
proc_node = proc->links.next;
continue; continue;
}
/* Must he wait for me? */ /* Must he wait for me? */
if (lockMethodTable->conflictTab[proc->waitLockMode] & myHeldLocks) if (lockMethodTable->conflictTab[proc->waitLockMode] & myHeldLocks)
{ {
@ -1151,31 +1098,23 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
GrantAwaitedLock(); GrantAwaitedLock();
return PROC_WAIT_STATUS_OK; return PROC_WAIT_STATUS_OK;
} }
/* Break out of loop to put myself before him */
/* Put myself into wait queue before conflicting process */
insert_before = proc;
break; break;
} }
/* Nope, so advance to next waiter */ /* Nope, so advance to next waiter */
aheadRequests |= LOCKBIT_ON(proc->waitLockMode); aheadRequests |= LOCKBIT_ON(proc->waitLockMode);
proc_node = proc->links.next;
} }
/*
* If we iterated through the whole queue, cur points to the waitQueue
* head, so we will insert at tail of queue as desired.
*/
waitQueuePos = proc_node;
}
else
{
/* I hold no locks, so I can't push in front of anyone. */
waitQueuePos = &waitQueue->links;
} }
/* /*
* Insert self into queue, at the position determined above. * Insert self into queue, at the position determined above.
*/ */
SHMQueueInsertBefore(waitQueuePos, &MyProc->links); if (insert_before)
waitQueue->size++; dclist_insert_before(waitQueue, &insert_before->links, &MyProc->links);
else
dclist_push_tail(waitQueue, &MyProc->links);
lock->waitMask |= LOCKBIT_ON(lockmode); lock->waitMask |= LOCKBIT_ON(lockmode);
@ -1453,7 +1392,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
long secs; long secs;
int usecs; int usecs;
long msecs; long msecs;
SHM_QUEUE *procLocks; dlist_iter proc_iter;
PROCLOCK *curproclock; PROCLOCK *curproclock;
bool first_holder = true, bool first_holder = true,
first_waiter = true; first_waiter = true;
@ -1483,12 +1422,11 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
LWLockAcquire(partitionLock, LW_SHARED); LWLockAcquire(partitionLock, LW_SHARED);
procLocks = &(lock->procLocks); dlist_foreach(proc_iter, &lock->procLocks)
curproclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
offsetof(PROCLOCK, lockLink));
while (curproclock)
{ {
curproclock =
dlist_container(PROCLOCK, lockLink, proc_iter.cur);
/* /*
* we are a waiter if myProc->waitProcLock == curproclock; we * we are a waiter if myProc->waitProcLock == curproclock; we
* are a holder if it is NULL or something different * are a holder if it is NULL or something different
@ -1519,10 +1457,6 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
lockHoldersNum++; lockHoldersNum++;
} }
curproclock = (PROCLOCK *) SHMQueueNext(procLocks,
&curproclock->lockLink,
offsetof(PROCLOCK, lockLink));
} }
LWLockRelease(partitionLock); LWLockRelease(partitionLock);
@ -1657,7 +1591,6 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
* ProcWakeup -- wake up a process by setting its latch. * ProcWakeup -- wake up a process by setting its latch.
* *
* Also remove the process from the wait queue and set its links invalid. * Also remove the process from the wait queue and set its links invalid.
* RETURN: the next process in the wait queue.
* *
* The appropriate lock partition lock must be held by caller. * The appropriate lock partition lock must be held by caller.
* *
@ -1666,23 +1599,16 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
* to twiddle the lock's request counts too --- see RemoveFromWaitQueue. * to twiddle the lock's request counts too --- see RemoveFromWaitQueue.
* Hence, in practice the waitStatus parameter must be PROC_WAIT_STATUS_OK. * Hence, in practice the waitStatus parameter must be PROC_WAIT_STATUS_OK.
*/ */
PGPROC * void
ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus) ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus)
{ {
PGPROC *retProc; if (dlist_node_is_detached(&proc->links))
return;
/* Proc should be sleeping ... */
if (proc->links.prev == NULL ||
proc->links.next == NULL)
return NULL;
Assert(proc->waitStatus == PROC_WAIT_STATUS_WAITING); Assert(proc->waitStatus == PROC_WAIT_STATUS_WAITING);
/* Save next process before we zap the list link */
retProc = (PGPROC *) proc->links.next;
/* Remove process from wait queue */ /* Remove process from wait queue */
SHMQueueDelete(&(proc->links)); dclist_delete_from_thoroughly(&proc->waitLock->waitProcs, &proc->links);
(proc->waitLock->waitProcs.size)--;
/* Clean up process' state and pass it the ok/fail signal */ /* Clean up process' state and pass it the ok/fail signal */
proc->waitLock = NULL; proc->waitLock = NULL;
@ -1692,8 +1618,6 @@ ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus)
/* And awaken it */ /* And awaken it */
SetLatch(&proc->procLatch); SetLatch(&proc->procLatch);
return retProc;
} }
/* /*
@ -1706,20 +1630,16 @@ ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus)
void void
ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock) ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock)
{ {
PROC_QUEUE *waitQueue = &(lock->waitProcs); dclist_head *waitQueue = &lock->waitProcs;
int queue_size = waitQueue->size;
PGPROC *proc;
LOCKMASK aheadRequests = 0; LOCKMASK aheadRequests = 0;
dlist_mutable_iter miter;
Assert(queue_size >= 0); if (dclist_is_empty(waitQueue))
if (queue_size == 0)
return; return;
proc = (PGPROC *) waitQueue->links.next; dclist_foreach_modify(miter, waitQueue)
while (queue_size-- > 0)
{ {
PGPROC *proc = dlist_container(PGPROC, links, miter.cur);
LOCKMODE lockmode = proc->waitLockMode; LOCKMODE lockmode = proc->waitLockMode;
/* /*
@ -1732,25 +1652,18 @@ ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock)
{ {
/* OK to waken */ /* OK to waken */
GrantLock(lock, proc->waitProcLock, lockmode); GrantLock(lock, proc->waitProcLock, lockmode);
proc = ProcWakeup(proc, PROC_WAIT_STATUS_OK); /* removes proc from the lock's waiting process queue */
ProcWakeup(proc, PROC_WAIT_STATUS_OK);
/*
* ProcWakeup removes proc from the lock's waiting process queue
* and returns the next proc in chain; don't use proc's next-link,
* because it's been cleared.
*/
} }
else else
{ {
/* /*
* Cannot wake this guy. Remember his request for later checks. * Lock conflicts: Don't wake, but remember requested mode for
* later checks.
*/ */
aheadRequests |= LOCKBIT_ON(lockmode); aheadRequests |= LOCKBIT_ON(lockmode);
proc = (PGPROC *) proc->links.next;
} }
} }
Assert(waitQueue->size >= 0);
} }
/* /*

View File

@ -18,6 +18,7 @@
#error "lock.h may not be included from frontend code" #error "lock.h may not be included from frontend code"
#endif #endif
#include "lib/ilist.h"
#include "storage/backendid.h" #include "storage/backendid.h"
#include "storage/lockdefs.h" #include "storage/lockdefs.h"
#include "storage/lwlock.h" #include "storage/lwlock.h"
@ -27,12 +28,6 @@
/* struct PGPROC is declared in proc.h, but must forward-reference it */ /* struct PGPROC is declared in proc.h, but must forward-reference it */
typedef struct PGPROC PGPROC; typedef struct PGPROC PGPROC;
typedef struct PROC_QUEUE
{
SHM_QUEUE links; /* head of list of PGPROC objects */
int size; /* number of entries in list */
} PROC_QUEUE;
/* GUC variables */ /* GUC variables */
extern PGDLLIMPORT int max_locks_per_xact; extern PGDLLIMPORT int max_locks_per_xact;
@ -318,8 +313,8 @@ typedef struct LOCK
/* data */ /* data */
LOCKMASK grantMask; /* bitmask for lock types already granted */ LOCKMASK grantMask; /* bitmask for lock types already granted */
LOCKMASK waitMask; /* bitmask for lock types awaited */ LOCKMASK waitMask; /* bitmask for lock types awaited */
SHM_QUEUE procLocks; /* list of PROCLOCK objects assoc. with lock */ dlist_head procLocks; /* list of PROCLOCK objects assoc. with lock */
PROC_QUEUE waitProcs; /* list of PGPROC objects waiting on lock */ dclist_head waitProcs; /* list of PGPROC objects waiting on lock */
int requested[MAX_LOCKMODES]; /* counts of requested locks */ int requested[MAX_LOCKMODES]; /* counts of requested locks */
int nRequested; /* total of requested[] array */ int nRequested; /* total of requested[] array */
int granted[MAX_LOCKMODES]; /* counts of granted locks */ int granted[MAX_LOCKMODES]; /* counts of granted locks */
@ -380,8 +375,8 @@ typedef struct PROCLOCK
PGPROC *groupLeader; /* proc's lock group leader, or proc itself */ PGPROC *groupLeader; /* proc's lock group leader, or proc itself */
LOCKMASK holdMask; /* bitmask for lock types currently held */ LOCKMASK holdMask; /* bitmask for lock types currently held */
LOCKMASK releaseMask; /* bitmask for lock types to be released */ LOCKMASK releaseMask; /* bitmask for lock types to be released */
SHM_QUEUE lockLink; /* list link in LOCK's list of proclocks */ dlist_node lockLink; /* list link in LOCK's list of proclocks */
SHM_QUEUE procLink; /* list link in PGPROC's list of proclocks */ dlist_node procLink; /* list link in PGPROC's list of proclocks */
} PROCLOCK; } PROCLOCK;
#define PROCLOCK_LOCKMETHOD(proclock) \ #define PROCLOCK_LOCKMETHOD(proclock) \

View File

@ -167,8 +167,8 @@ typedef enum
struct PGPROC struct PGPROC
{ {
/* proc->links MUST BE FIRST IN STRUCT (see ProcSleep,ProcWakeup,etc) */ /* proc->links MUST BE FIRST IN STRUCT (see ProcSleep,ProcWakeup,etc) */
SHM_QUEUE links; /* list link if process is in a list */ dlist_node links; /* list link if process is in a list */
PGPROC **procgloballist; /* procglobal list that owns this PGPROC */ dlist_head *procgloballist; /* procglobal list that owns this PGPROC */
PGSemaphore sem; /* ONE semaphore to sleep on */ PGSemaphore sem; /* ONE semaphore to sleep on */
ProcWaitStatus waitStatus; ProcWaitStatus waitStatus;
@ -255,7 +255,7 @@ struct PGPROC
* linked into one of these lists, according to the partition number of * linked into one of these lists, according to the partition number of
* their lock. * their lock.
*/ */
SHM_QUEUE myProcLocks[NUM_LOCK_PARTITIONS]; dlist_head myProcLocks[NUM_LOCK_PARTITIONS];
XidCacheStatus subxidStatus; /* mirrored with XidCacheStatus subxidStatus; /* mirrored with
* ProcGlobal->subxidStates[i] */ * ProcGlobal->subxidStates[i] */
@ -385,13 +385,13 @@ typedef struct PROC_HDR
/* Length of allProcs array */ /* Length of allProcs array */
uint32 allProcCount; uint32 allProcCount;
/* Head of list of free PGPROC structures */ /* Head of list of free PGPROC structures */
PGPROC *freeProcs; dlist_head freeProcs;
/* Head of list of autovacuum's free PGPROC structures */ /* Head of list of autovacuum's free PGPROC structures */
PGPROC *autovacFreeProcs; dlist_head autovacFreeProcs;
/* Head of list of bgworker free PGPROC structures */ /* Head of list of bgworker free PGPROC structures */
PGPROC *bgworkerFreeProcs; dlist_head bgworkerFreeProcs;
/* Head of list of walsender free PGPROC structures */ /* Head of list of walsender free PGPROC structures */
PGPROC *walsenderFreeProcs; dlist_head walsenderFreeProcs;
/* First pgproc waiting for group XID clear */ /* First pgproc waiting for group XID clear */
pg_atomic_uint32 procArrayGroupFirst; pg_atomic_uint32 procArrayGroupFirst;
/* First pgproc waiting for group transaction status update */ /* First pgproc waiting for group transaction status update */
@ -448,9 +448,8 @@ extern int GetStartupBufferPinWaitBufId(void);
extern bool HaveNFreeProcs(int n); extern bool HaveNFreeProcs(int n);
extern void ProcReleaseLocks(bool isCommit); extern void ProcReleaseLocks(bool isCommit);
extern void ProcQueueInit(PROC_QUEUE *queue);
extern ProcWaitStatus ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable); extern ProcWaitStatus ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable);
extern PGPROC *ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus); extern void ProcWakeup(PGPROC *proc, ProcWaitStatus waitStatus);
extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock); extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock);
extern void CheckDeadLockAlert(void); extern void CheckDeadLockAlert(void);
extern bool IsWaitingForLock(void); extern bool IsWaitingForLock(void);

View File

@ -1846,7 +1846,6 @@ PROCESS_INFORMATION
PROCLOCK PROCLOCK
PROCLOCKTAG PROCLOCKTAG
PROC_HDR PROC_HDR
PROC_QUEUE
PSID PSID
PSID_AND_ATTRIBUTES PSID_AND_ATTRIBUTES
PSQL_COMP_CASE PSQL_COMP_CASE