diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index d9d704461b7..f0db0a66b5c 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -24,7 +24,53 @@ * IDENTIFICATION * src/backend/storage/lmgr/lwlock.c * - *------------------------------------------------------------------------- + * NOTES: + * + * This used to be a pretty straight forward reader-writer lock + * implementation, in which the internal state was protected by a + * spinlock. Unfortunately the overhead of taking the spinlock proved to be + * too high for workloads/locks that were taken in shared mode very + * frequently. Often we were spinning in the (obviously exclusive) spinlock, + * while trying to acquire a shared lock that was actually free. + * + * Thus a new implementation was devised that provides wait-free shared lock + * acquisition for locks that aren't exclusively locked. + * + * The basic idea is to have a single atomic variable 'lockcount' instead of + * the formerly separate shared and exclusive counters and to use atomic + * operations to acquire the lock. That's fairly easy to do for plain + * rw-spinlocks, but a lot harder for something like LWLocks that want to wait + * in the OS. + * + * For lock acquisition we use an atomic compare-and-exchange on the lockcount + * variable. For exclusive lock we swap in a sentinel value + * (LW_VAL_EXCLUSIVE), for shared locks we count the number of holders. + * + * To release the lock we use an atomic decrement to release the lock. If the + * new value is zero (we get that atomically), we know we can/have to release + * waiters. + * + * Obviously it is important that the sentinel value for exclusive locks + * doesn't conflict with the maximum number of possible share lockers - + * luckily MAX_BACKENDS makes that easily possible. + * + * + * The attentive reader might have noticed that naively doing the above has a + * glaring race condition: We try to lock using the atomic operations and + * notice that we have to wait. Unfortunately by the time we have finished + * queuing, the former locker very well might have already finished it's + * work. That's problematic because we're now stuck waiting inside the OS. + + * To mitigate those races we use a two phased attempt at locking: + * Phase 1: Try to do it atomically, if we succeed, nice + * Phase 2: Add ourselves to the waitqueue of the lock + * Phase 3: Try to grab the lock again, if we succeed, remove ourselves from + * the queue + * Phase 4: Sleep till wake-up, goto Phase 1 + * + * This protects us against the problem from above as nobody can release too + * quick, before we're queued, since after Phase 2 we're already queued. + * ------------------------------------------------------------------------- */ #include "postgres.h" @@ -35,8 +81,8 @@ #include "commands/async.h" #include "miscadmin.h" #include "pg_trace.h" +#include "postmaster/postmaster.h" #include "replication/slot.h" -#include "storage/barrier.h" #include "storage/ipc.h" #include "storage/predicate.h" #include "storage/proc.h" @@ -51,6 +97,16 @@ /* We use the ShmemLock spinlock to protect LWLockAssign */ extern slock_t *ShmemLock; +#define LW_FLAG_HAS_WAITERS ((uint32) 1 << 30) +#define LW_FLAG_RELEASE_OK ((uint32) 1 << 29) + +#define LW_VAL_EXCLUSIVE ((uint32) 1 << 24) +#define LW_VAL_SHARED 1 + +#define LW_LOCK_MASK ((uint32) ((1 << 25)-1)) +/* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. 
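As a rough illustration of the scheme described in these NOTES, here is a minimal stand-alone model of the lock word and the compare-and-exchange acquisition, written against C11 <stdatomic.h> rather than port/atomics.h; the wait queue and flag bits are left out, and all names (lockcount, EXCLUSIVE_VAL, try_lock_*) are invented for the sketch:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define EXCLUSIVE_VAL	((uint32_t) 1 << 24)	/* sentinel, above any share count */

static _Atomic uint32_t lockcount;

/* wait-free as long as no exclusive holder shows up: just CAS the counter */
static bool
try_lock_shared(void)
{
	uint32_t	expected = atomic_load(&lockcount);

	while ((expected & EXCLUSIVE_VAL) == 0)
	{
		/* on failure, expected is reloaded and we retry */
		if (atomic_compare_exchange_weak(&lockcount, &expected, expected + 1))
			return true;
	}
	return false;				/* held exclusively - a real caller would queue */
}

/* exclusive acquisition only succeeds on a completely unlocked word */
static bool
try_lock_exclusive(void)
{
	uint32_t	expected = 0;

	return atomic_compare_exchange_strong(&lockcount, &expected, EXCLUSIVE_VAL);
}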
*/ +#define LW_SHARED_MASK ((uint32)(1 << 23)) + /* * This is indexed by tranche ID and stores metadata for all tranches known * to the current backend. @@ -81,8 +137,15 @@ static LWLockTranche MainLWLockTranche; */ #define MAX_SIMUL_LWLOCKS 200 +/* struct representing the LWLocks we're holding */ +typedef struct LWLockHandle +{ + LWLock *lock; + LWLockMode mode; +} LWLockHandle; + static int num_held_lwlocks = 0; -static LWLock *held_lwlocks[MAX_SIMUL_LWLOCKS]; +static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS]; static int lock_addin_request = 0; static bool lock_addin_request_allowed = true; @@ -103,6 +166,7 @@ typedef struct lwlock_stats int sh_acquire_count; int ex_acquire_count; int block_count; + int dequeue_self_count; int spin_delay_count; } lwlock_stats; @@ -114,24 +178,42 @@ static lwlock_stats lwlock_stats_dummy; bool Trace_lwlocks = false; inline static void -PRINT_LWDEBUG(const char *where, const LWLock *lock) +PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode) { + /* hide statement & context here, otherwise the log is just too verbose */ if (Trace_lwlocks) - elog(LOG, "%s(%s %d): excl %d shared %d rOK %d", - where, T_NAME(lock), T_ID(lock), - (int) lock->exclusive, lock->shared, - (int) lock->releaseOK); + { + uint32 state = pg_atomic_read_u32(&lock->state); + ereport(LOG, + (errhidestmt(true), + errhidecontext(true), + errmsg("%d: %s(%s %d): excl %u shared %u haswaiters %u waiters %u rOK %d", + MyProcPid, + where, T_NAME(lock), T_ID(lock), + !!(state & LW_VAL_EXCLUSIVE), + state & LW_SHARED_MASK, + !!(state & LW_FLAG_HAS_WAITERS), + pg_atomic_read_u32(&lock->nwaiters), + !!(state & LW_FLAG_RELEASE_OK)))); + } } inline static void -LOG_LWDEBUG(const char *where, const char *name, int index, const char *msg) +LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg) { + /* hide statement & context here, otherwise the log is just too verbose */ if (Trace_lwlocks) - elog(LOG, "%s(%s %d): %s", where, name, index, msg); + { + ereport(LOG, + (errhidestmt(true), + errhidecontext(true), + errmsg("%s(%s %d): %s", where, T_NAME(lock), T_ID(lock), msg))); + } } + #else /* not LOCK_DEBUG */ -#define PRINT_LWDEBUG(a,b) -#define LOG_LWDEBUG(a,b,c,d) +#define PRINT_LWDEBUG(a,b,c) ((void)0) +#define LOG_LWDEBUG(a,b,c) ((void)0) #endif /* LOCK_DEBUG */ #ifdef LWLOCK_STATS @@ -192,11 +274,11 @@ print_lwlock_stats(int code, Datum arg) while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL) { fprintf(stderr, - "PID %d lwlock %s %d: shacq %u exacq %u blk %u spindelay %u\n", + "PID %d lwlock %s %d: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n", MyProcPid, LWLockTrancheArray[lwstats->key.tranche]->name, lwstats->key.instance, lwstats->sh_acquire_count, lwstats->ex_acquire_count, lwstats->block_count, - lwstats->spin_delay_count); + lwstats->spin_delay_count, lwstats->dequeue_self_count); } LWLockRelease(&MainLWLockArray[0].lock); @@ -226,6 +308,7 @@ get_lwlock_stats_entry(LWLock *lock) lwstats->sh_acquire_count = 0; lwstats->ex_acquire_count = 0; lwstats->block_count = 0; + lwstats->dequeue_self_count = 0; lwstats->spin_delay_count = 0; } return lwstats; @@ -336,6 +419,9 @@ LWLockShmemSize(void) void CreateLWLocks(void) { + StaticAssertExpr(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS, + "MAX_BACKENDS too big for lwlock.c"); + if (!IsUnderPostmaster) { int numLocks = NumLWLocks(); @@ -475,13 +561,330 @@ void LWLockInitialize(LWLock *lock, int tranche_id) { SpinLockInit(&lock->mutex); - lock->releaseOK = true; - lock->exclusive = 0; - lock->shared = 0; + 
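For orientation, this is how those definitions carve up the 32-bit state word, restated in one place with a few example values (nothing here goes beyond the #defines above):

/*
 * bit 30      LW_FLAG_HAS_WAITERS  (0x40000000)  somebody is queued
 * bit 29      LW_FLAG_RELEASE_OK   (0x20000000)  releaser may wake waiters
 * bit 24      LW_VAL_EXCLUSIVE     (0x01000000)  exclusive-holder sentinel
 * bits 0-23   number of shared holders, each adding LW_VAL_SHARED == 1
 *
 * LW_LOCK_MASK (0x01FFFFFF) covers the sentinel plus the share count, so
 * "(state & LW_LOCK_MASK) == 0" means nobody holds the lock in any mode.
 * MAX_BACKENDS is at most 2^23 - 1, so the share count can never bleed into
 * the sentinel bit.
 *
 * Example states:
 *   0x20000000   free, RELEASE_OK set (the value LWLockInitialize uses)
 *   0x20000003   three shared holders
 *   0x61000000   held exclusively, with backends queued
 */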
pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK); +#ifdef LOCK_DEBUG + pg_atomic_init_u32(&lock->nwaiters, 0); +#endif lock->tranche = tranche_id; dlist_init(&lock->waiters); } +/* + * Internal function that tries to atomically acquire the lwlock in the passed + * in mode. + * + * This function will not block waiting for a lock to become free - that's the + * callers job. + * + * Returns true if the lock isn't free and we need to wait. + */ +static bool +LWLockAttemptLock(LWLock* lock, LWLockMode mode) +{ + AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED); + + /* loop until we've determined whether we could acquire the lock or not */ + while (true) + { + uint32 old_state; + uint32 expected_state; + uint32 desired_state; + bool lock_free; + + old_state = pg_atomic_read_u32(&lock->state); + expected_state = old_state; + desired_state = expected_state; + + if (mode == LW_EXCLUSIVE) + { + lock_free = (expected_state & LW_LOCK_MASK) == 0; + if (lock_free) + desired_state += LW_VAL_EXCLUSIVE; + } + else + { + lock_free = (expected_state & LW_VAL_EXCLUSIVE) == 0; + if (lock_free) + desired_state += LW_VAL_SHARED; + } + + /* + * Attempt to swap in the state we are expecting. If we didn't see + * lock to be free, that's just the old value. If we saw it as free, + * we'll attempt to mark it acquired. The reason that we always swap + * in the value is that this doubles as a memory barrier. We could try + * to be smarter and only swap in values if we saw the lock as free, + * but benchmark haven't shown it as beneficial so far. + * + * Retry if the value changed since we last looked at it. + */ + if (pg_atomic_compare_exchange_u32(&lock->state, + &expected_state, desired_state)) + { + if (lock_free) + { + /* Great! Got the lock. */ +#ifdef LOCK_DEBUG + if (mode == LW_EXCLUSIVE) + lock->owner = MyProc; +#endif + return false; + } + else + return true; /* someobdy else has the lock */ + } + } + pg_unreachable(); +} + +/* + * Wakeup all the lockers that currently have a chance to acquire the lock. + */ +static void +LWLockWakeup(LWLock *lock) +{ + bool new_release_ok; + bool wokeup_somebody = false; + dlist_head wakeup; + dlist_mutable_iter iter; +#ifdef LWLOCK_STATS + lwlock_stats *lwstats; + + lwstats = get_lwlock_stats_entry(lock); +#endif + + dlist_init(&wakeup); + + new_release_ok = true; + + /* Acquire mutex. Time spent holding mutex should be short! */ +#ifdef LWLOCK_STATS + lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex); +#else + SpinLockAcquire(&lock->mutex); +#endif + + dlist_foreach_modify(iter, &lock->waiters) + { + PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur); + + if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE) + continue; + + dlist_delete(&waiter->lwWaitLink); + dlist_push_tail(&wakeup, &waiter->lwWaitLink); + + if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE) + { + /* + * Prevent additional wakeups until retryer gets to run. Backends + * that are just waiting for the lock to become free don't retry + * automatically. + */ + new_release_ok = false; + /* + * Don't wakeup (further) exclusive locks. + */ + wokeup_somebody = true; + } + + /* + * Once we've woken up an exclusive lock, there's no point in waking + * up anybody else. 
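The wakeup policy implemented by the loop above can be boiled down to a small stand-alone model (an array instead of the dlist, an enum instead of PGPROC; the type and function names are invented): LW_WAIT_UNTIL_FREE and shared waiters are woken in a batch, an exclusive waiter is only woken if nothing but LW_WAIT_UNTIL_FREE waiters precede it, and waking it ends the batch.

typedef enum {WAIT_SHARED, WAIT_EXCLUSIVE, WAIT_UNTIL_FREE} wait_mode;

/* how many of the queued waiters get woken; skipped ones stay queued */
static int
model_wakeup(const wait_mode *queue, int nwaiters)
{
	bool		wokeup_somebody = false;
	int			nwoken = 0;
	int			i;

	for (i = 0; i < nwaiters; i++)
	{
		/* once a shared waiter was woken, exclusive waiters are passed over */
		if (wokeup_somebody && queue[i] == WAIT_EXCLUSIVE)
			continue;

		nwoken++;

		if (queue[i] != WAIT_UNTIL_FREE)
			wokeup_somebody = true;

		/* waking an exclusive waiter ends the batch */
		if (queue[i] == WAIT_EXCLUSIVE)
			break;
	}
	return nwoken;
}

/*
 * {WAIT_EXCLUSIVE, WAIT_SHARED}               -> only the first entry is woken
 * {WAIT_SHARED, WAIT_EXCLUSIVE, WAIT_SHARED}  -> both shared entries are woken,
 *                                               the exclusive one stays queued
 */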
+ */ + if(waiter->lwWaitMode == LW_EXCLUSIVE) + break; + } + + Assert(dlist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS); + + /* Unset both flags at once if required */ + if (!new_release_ok && dlist_is_empty(&wakeup)) + pg_atomic_fetch_and_u32(&lock->state, + ~(LW_FLAG_RELEASE_OK | LW_FLAG_HAS_WAITERS)); + else if (!new_release_ok) + pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_RELEASE_OK); + else if (dlist_is_empty(&wakeup)) + pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS); + else if (new_release_ok) + pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK); + + /* We are done updating the shared state of the lock queue. */ + SpinLockRelease(&lock->mutex); + + /* Awaken any waiters I removed from the queue. */ + dlist_foreach_modify(iter, &wakeup) + { + PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur); + + LOG_LWDEBUG("LWLockRelease", lock, "release waiter"); + dlist_delete(&waiter->lwWaitLink); + /* + * Guarantee that lwWaiting being unset only becomes visible once the + * unlink from the link has completed. Otherwise the target backend + * could be woken up for other reason and enqueue for a new lock - if + * that happens before the list unlink happens, the list would end up + * being corrupted. + * + * The barrier pairs with the SpinLockAcquire() when enqueing for + * another lock. + */ + pg_write_barrier(); + waiter->lwWaiting = false; + PGSemaphoreUnlock(&waiter->sem); + } +} + +/* + * Add ourselves to the end of the queue. + * + * NB: Mode can be LW_WAIT_UNTIL_FREE here! + */ +static void +LWLockQueueSelf(LWLock *lock, LWLockMode mode) +{ +#ifdef LWLOCK_STATS + lwlock_stats *lwstats; + + lwstats = get_lwlock_stats_entry(lock); +#endif + + /* + * If we don't have a PGPROC structure, there's no way to wait. This + * should never occur, since MyProc should only be null during shared + * memory initialization. + */ + if (MyProc == NULL) + elog(PANIC, "cannot wait without a PGPROC structure"); + + if (MyProc->lwWaiting) + elog(PANIC, "queueing for lock while waiting on another one"); + +#ifdef LWLOCK_STATS + lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex); +#else + SpinLockAcquire(&lock->mutex); +#endif + + /* setting the flag is protected by the spinlock */ + pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS); + + MyProc->lwWaiting = true; + MyProc->lwWaitMode = mode; + + /* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */ + if (mode == LW_WAIT_UNTIL_FREE) + dlist_push_head(&lock->waiters, &MyProc->lwWaitLink); + else + dlist_push_tail(&lock->waiters, &MyProc->lwWaitLink); + + /* Can release the mutex now */ + SpinLockRelease(&lock->mutex); + +#ifdef LOCK_DEBUG + pg_atomic_fetch_add_u32(&lock->nwaiters, 1); +#endif + +} + +/* + * Remove ourselves from the waitlist. + * + * This is used if we queued ourselves because we thought we needed to sleep + * but, after further checking, we discovered that we don't actually need to + * do so. Returns false if somebody else already has woken us up, otherwise + * returns true. 
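Condensed from LWLockWakeup() above and the sleep loops in the acquire paths below, the lwWaiting handshake that this barrier protects looks like the following (sketch only; stats, tracing and error handling omitted):

/* waking side, per removed waiter */
dlist_delete(&waiter->lwWaitLink);	/* unlink from the wakeup list first...  */
pg_write_barrier();					/* ...and only then let the waiter       */
waiter->lwWaiting = false;			/* consider itself dequeued              */
PGSemaphoreUnlock(&waiter->sem);

/* sleeping side */
for (;;)
{
	PGSemaphoreLock(&proc->sem, false);
	if (!proc->lwWaiting)
		break;					/* our wakeup; now safe to queue elsewhere */
	extraWaits++;				/* absorbed a post meant for something else */
}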
+ */ +static void +LWLockDequeueSelf(LWLock *lock) +{ + bool found = false; + dlist_mutable_iter iter; + +#ifdef LWLOCK_STATS + lwlock_stats *lwstats; + + lwstats = get_lwlock_stats_entry(lock); + + lwstats->dequeue_self_count++; +#endif + +#ifdef LWLOCK_STATS + lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex); +#else + SpinLockAcquire(&lock->mutex); +#endif + + /* + * Can't just remove ourselves from the list, but we need to iterate over + * all entries as somebody else could have unqueued us. + */ + dlist_foreach_modify(iter, &lock->waiters) + { + PGPROC *proc = dlist_container(PGPROC, lwWaitLink, iter.cur); + if (proc == MyProc) + { + found = true; + dlist_delete(&proc->lwWaitLink); + break; + } + } + + if (dlist_is_empty(&lock->waiters) && + (pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0) + { + pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS); + } + + SpinLockRelease(&lock->mutex); + + /* clear waiting state again, nice for debugging */ + if (found) + MyProc->lwWaiting = false; + else + { + int extraWaits = 0; + + /* + * Somebody else dequeued us and has or will wake us up. Deal with the + * superflous absorption of a wakeup. + */ + + /* + * Reset releaseOk if somebody woke us before we removed ourselves - + * they'll have set it to false. + */ + pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK); + + /* + * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would + * get reset at some inconvenient point later. Most of the time this + * will immediately return. + */ + for (;;) + { + /* "false" means cannot accept cancel/die interrupt here. */ + PGSemaphoreLock(&MyProc->sem, false); + if (!MyProc->lwWaiting) + break; + extraWaits++; + } + + /* + * Fix the process wait semaphore's count for any absorbed wakeups. + */ + while (extraWaits-- > 0) + PGSemaphoreUnlock(&MyProc->sem); + } + +#ifdef LOCK_DEBUG + { + /* not waiting anymore */ + uint32 nwaiters = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1); + Assert(nwaiters < MAX_BACKENDS); + } +#endif +} /* * LWLockAcquire - acquire a lightweight lock in the specified mode @@ -513,18 +916,19 @@ static inline bool LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val) { PGPROC *proc = MyProc; - bool retry = false; bool result = true; int extraWaits = 0; #ifdef LWLOCK_STATS lwlock_stats *lwstats; + + lwstats = get_lwlock_stats_entry(lock); #endif - PRINT_LWDEBUG("LWLockAcquire", lock); + AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE); + + PRINT_LWDEBUG("LWLockAcquire", lock, mode); #ifdef LWLOCK_STATS - lwstats = get_lwlock_stats_entry(lock); - /* Count lock acquisition attempts */ if (mode == LW_EXCLUSIVE) lwstats->ex_acquire_count++; @@ -570,58 +974,44 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val) { bool mustwait; - /* Acquire mutex. Time spent holding mutex should be short! */ -#ifdef LWLOCK_STATS - lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex); -#else - SpinLockAcquire(&lock->mutex); -#endif - - /* If retrying, allow LWLockRelease to release waiters again */ - if (retry) - lock->releaseOK = true; - - /* If I can get the lock, do so quickly. */ - if (mode == LW_EXCLUSIVE) - { - if (lock->exclusive == 0 && lock->shared == 0) - { - lock->exclusive++; - mustwait = false; - } - else - mustwait = true; - } - else - { - if (lock->exclusive == 0) - { - lock->shared++; - mustwait = false; - } - else - mustwait = true; - } + /* + * Try to grab the lock the first time, we're not in the waitqueue + * yet/anymore. 
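A concrete timeline for the "not found" branch of LWLockDequeueSelf() above, with the semaphore count starting at zero (illustrative only):

/*
 * 1. We queued ourselves, then managed to acquire the lock on the retry.
 * 2. Before we could dequeue, the previous holder released, unlinked us and
 *    did PGSemaphoreUnlock() - our semaphore count is now 1.
 * 3. The list scan doesn't find us (found == false), so that pending post
 *    has to be consumed: PGSemaphoreLock() takes the count back to 0, we see
 *    lwWaiting == false and leave with extraWaits == 0.
 * 4. If an unrelated post arrived first (the semaphore is shared with
 *    ProcWaitForSignal()), lwWaiting is still true after consuming it, so
 *    extraWaits is bumped and the final PGSemaphoreUnlock() loop re-credits
 *    exactly those absorbed posts.
 */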
+ */ + mustwait = LWLockAttemptLock(lock, mode); if (!mustwait) + { + /* XXX: remove before commit? */ + LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock"); break; /* got the lock */ + } /* - * Add myself to wait queue. - * - * If we don't have a PGPROC structure, there's no way to wait. This - * should never occur, since MyProc should only be null during shared - * memory initialization. + * Ok, at this point we couldn't grab the lock on the first try. We + * cannot simply queue ourselves to the end of the list and wait to be + * woken up because by now the lock could long have been released. + * Instead add us to the queue and try to grab the lock again. If we + * succeed we need to revert the queuing and be happy, otherwise we + * recheck the lock. If we still couldn't grab it, we know that the + * other lock will see our queue entries when releasing since they + * existed before we checked for the lock. */ - if (proc == NULL) - elog(PANIC, "cannot wait without a PGPROC structure"); - proc->lwWaiting = true; - proc->lwWaitMode = mode; - dlist_push_head(&lock->waiters, &proc->lwWaitLink); + /* add to the queue */ + LWLockQueueSelf(lock, mode); - /* Can release the mutex now */ - SpinLockRelease(&lock->mutex); + /* we're now guaranteed to be woken up if necessary */ + mustwait = LWLockAttemptLock(lock, mode); + + /* ok, grabbed the lock the second time round, need to undo queueing */ + if (!mustwait) + { + LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue"); + + LWLockDequeueSelf(lock); + break; + } /* * Wait until awakened. @@ -635,7 +1025,7 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val) * so that the lock manager or signal manager will see the received * signal when it next waits. */ - LOG_LWDEBUG("LWLockAcquire", T_NAME(lock), T_ID(lock), "waiting"); + LOG_LWDEBUG("LWLockAcquire", lock, "waiting"); #ifdef LWLOCK_STATS lwstats->block_count++; @@ -652,12 +1042,22 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val) extraWaits++; } + /* Retrying, allow LWLockRelease to release waiters again. */ + pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK); + +#ifdef LOCK_DEBUG + { + /* not waiting anymore */ + uint32 nwaiters = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1); + Assert(nwaiters < MAX_BACKENDS); + } +#endif + TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), mode); - LOG_LWDEBUG("LWLockAcquire", T_NAME(lock), T_ID(lock), "awakened"); + LOG_LWDEBUG("LWLockAcquire", lock, "awakened"); /* Now loop back and try to acquire lock again. */ - retry = true; result = false; } @@ -665,13 +1065,11 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val) if (valptr) *valptr = val; - /* We are done updating shared state of the lock itself. */ - SpinLockRelease(&lock->mutex); - TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), mode); /* Add lock to list of locks held by this backend */ - held_lwlocks[num_held_lwlocks++] = lock; + held_lwlocks[num_held_lwlocks].lock = lock; + held_lwlocks[num_held_lwlocks++].mode = mode; /* * Fix the process wait semaphore's count for any absorbed wakeups. 
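Stripped of stats, tracing, extraWaits bookkeeping and the valptr handling, the loop above is the queue-then-recheck protocol from the file header; a condensed sketch of its shape (not the literal function body):

for (;;)
{
	if (!LWLockAttemptLock(lock, mode))
		break;						/* phase 1: got the lock outright */

	LWLockQueueSelf(lock, mode);	/* phase 2: a wakeup is now guaranteed */

	if (!LWLockAttemptLock(lock, mode))
	{
		LWLockDequeueSelf(lock);	/* phase 3: got it after all, undo queueing */
		break;
	}

	do								/* phase 4: sleep, then start over */
	{
		PGSemaphoreLock(&proc->sem, false);
	} while (proc->lwWaiting);

	pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
	result = false;					/* we had to block at least once */
}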
@@ -694,7 +1092,9 @@ LWLockConditionalAcquire(LWLock *lock, LWLockMode mode) { bool mustwait; - PRINT_LWDEBUG("LWLockConditionalAcquire", lock); + AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE); + + PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode); /* Ensure we will have room to remember the lock */ if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS) @@ -707,50 +1107,24 @@ LWLockConditionalAcquire(LWLock *lock, LWLockMode mode) */ HOLD_INTERRUPTS(); - /* Acquire mutex. Time spent holding mutex should be short! */ - SpinLockAcquire(&lock->mutex); - - /* If I can get the lock, do so quickly. */ - if (mode == LW_EXCLUSIVE) - { - if (lock->exclusive == 0 && lock->shared == 0) - { - lock->exclusive++; - mustwait = false; - } - else - mustwait = true; - } - else - { - if (lock->exclusive == 0) - { - lock->shared++; - mustwait = false; - } - else - mustwait = true; - } - - /* We are done updating shared state of the lock itself. */ - SpinLockRelease(&lock->mutex); + /* Check for the lock */ + mustwait = LWLockAttemptLock(lock, mode); if (mustwait) { /* Failed to get lock, so release interrupt holdoff */ RESUME_INTERRUPTS(); - LOG_LWDEBUG("LWLockConditionalAcquire", - T_NAME(lock), T_ID(lock), "failed"); - TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), - T_ID(lock), mode); + + LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed"); + TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), T_ID(lock), mode); } else { /* Add lock to list of locks held by this backend */ - held_lwlocks[num_held_lwlocks++] = lock; + held_lwlocks[num_held_lwlocks].lock = lock; + held_lwlocks[num_held_lwlocks++].mode = mode; TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), T_ID(lock), mode); } - return !mustwait; } @@ -776,14 +1150,14 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode) int extraWaits = 0; #ifdef LWLOCK_STATS lwlock_stats *lwstats; -#endif - PRINT_LWDEBUG("LWLockAcquireOrWait", lock); - -#ifdef LWLOCK_STATS lwstats = get_lwlock_stats_entry(lock); #endif + Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE); + + PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode); + /* Ensure we will have room to remember the lock */ if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS) elog(ERROR, "too many LWLocks taken"); @@ -795,81 +1169,63 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode) */ HOLD_INTERRUPTS(); - /* Acquire mutex. Time spent holding mutex should be short! */ - SpinLockAcquire(&lock->mutex); - - /* If I can get the lock, do so quickly. */ - if (mode == LW_EXCLUSIVE) - { - if (lock->exclusive == 0 && lock->shared == 0) - { - lock->exclusive++; - mustwait = false; - } - else - mustwait = true; - } - else - { - if (lock->exclusive == 0) - { - lock->shared++; - mustwait = false; - } - else - mustwait = true; - } + /* + * NB: We're using nearly the same twice-in-a-row lock acquisition + * protocol as LWLockAcquire(). Check its comments for details. + */ + mustwait = LWLockAttemptLock(lock, mode); if (mustwait) { - /* - * Add myself to wait queue. - * - * If we don't have a PGPROC structure, there's no way to wait. This - * should never occur, since MyProc should only be null during shared - * memory initialization. - */ - if (proc == NULL) - elog(PANIC, "cannot wait without a PGPROC structure"); + LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE); - proc->lwWaiting = true; - proc->lwWaitMode = LW_WAIT_UNTIL_FREE; - dlist_push_head(&lock->waiters, &proc->lwWaitLink); + mustwait = LWLockAttemptLock(lock, mode); - /* Can release the mutex now */ - SpinLockRelease(&lock->mutex); - - /* - * Wait until awakened. 
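The conditional variant rewritten above is the building block for opportunistic try-lock callers; a typical (hypothetical) caller looks like this - do_cleanup_work() and remember_cleanup_is_pending() are placeholders, not PostgreSQL functions:

if (LWLockConditionalAcquire(lock, LW_EXCLUSIVE))
{
	/* uncontended: do the work right away */
	do_cleanup_work();
	LWLockRelease(lock);
}
else
{
	/* contended: rather than sleeping on the lock, defer the work */
	remember_cleanup_is_pending();
}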
Like in LWLockAcquire, be prepared for bogus - * wakups, because we share the semaphore with ProcWaitForSignal. - */ - LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(lock), T_ID(lock), - "waiting"); + if (mustwait) + { + /* + * Wait until awakened. Like in LWLockAcquire, be prepared for bogus + * wakups, because we share the semaphore with ProcWaitForSignal. + */ + LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting"); #ifdef LWLOCK_STATS - lwstats->block_count++; + lwstats->block_count++; #endif + TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(l), T_ID(l), mode); - TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), T_ID(lock), mode); + for (;;) + { + /* "false" means cannot accept cancel/die interrupt here. */ + PGSemaphoreLock(&proc->sem, false); + if (!proc->lwWaiting) + break; + extraWaits++; + } - for (;;) - { - /* "false" means cannot accept cancel/die interrupt here. */ - PGSemaphoreLock(&proc->sem, false); - if (!proc->lwWaiting) - break; - extraWaits++; +#ifdef LOCK_DEBUG + { + /* not waiting anymore */ + uint32 nwaiters = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1); + Assert(nwaiters < MAX_BACKENDS); + } +#endif + TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), mode); + + LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened"); } + else + { + LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue"); - TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), mode); - - LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(lock), T_ID(lock), - "awakened"); - } - else - { - /* We are done updating shared state of the lock itself. */ - SpinLockRelease(&lock->mutex); + /* + * Got lock in the second attempt, undo queueing. We need to + * treat this as having successfully acquired the lock, otherwise + * we'd not necessarily wake up people we've prevented from + * acquiring the lock. + */ + LWLockDequeueSelf(lock); + } } /* @@ -882,16 +1238,17 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode) { /* Failed to get lock, so release interrupt holdoff */ RESUME_INTERRUPTS(); - LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(lock), T_ID(lock), "failed"); + LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed"); TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), T_ID(lock), mode); } else { + LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded"); /* Add lock to list of locks held by this backend */ - held_lwlocks[num_held_lwlocks++] = lock; - TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), T_ID(lock), - mode); + held_lwlocks[num_held_lwlocks].lock = lock; + held_lwlocks[num_held_lwlocks++].mode = mode; + TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), T_ID(lock), mode); } return !mustwait; @@ -923,13 +1280,11 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval) bool result = false; #ifdef LWLOCK_STATS lwlock_stats *lwstats; + + lwstats = get_lwlock_stats_entry(lock); #endif - PRINT_LWDEBUG("LWLockWaitForVar", lock); - -#ifdef LWLOCK_STATS - lwstats = get_lwlock_stats_entry(lock); -#endif /* LWLOCK_STATS */ + PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE); /* * Quick test first to see if it the slot is free right now. @@ -938,7 +1293,7 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval) * barrier here as far as the current usage is concerned. But that might * not be safe in general. 
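LWLockAcquireOrWait(), by contrast, queues as LW_WAIT_UNTIL_FREE and sleeps at most once without retrying. Its intended caller pattern - roughly how xlog.c uses it for group WAL flushes, paraphrased here with placeholder helpers flush_up_to()/flushed_up_to() and a placeholder target_lsn - is:

for (;;)
{
	if (flushed_up_to() >= target_lsn)
		break;					/* somebody else already flushed far enough */

	if (LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
	{
		/* got the lock: we do the flush ourselves */
		flush_up_to(target_lsn);
		LWLockRelease(WALWriteLock);
		break;
	}

	/*
	 * Lock became free while we slept: whoever held it was flushing, so loop
	 * around and check whether their flush already covered us.
	 */
}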
*/ - if (lock->exclusive == 0) + if ((pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) == 0) return true; /* @@ -956,21 +1311,24 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval) bool mustwait; uint64 value; - /* Acquire mutex. Time spent holding mutex should be short! */ + mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0; + + if (mustwait) + { + /* + * Perform comparison using spinlock as we can't rely on atomic 64 + * bit reads/stores. + */ #ifdef LWLOCK_STATS - lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex); + lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex); #else - SpinLockAcquire(&lock->mutex); + SpinLockAcquire(&lock->mutex); #endif - /* Is the lock now free, and if not, does the value match? */ - if (lock->exclusive == 0) - { - result = true; - mustwait = false; - } - else - { + /* + * XXX: We can significantly optimize this on platforms with 64bit + * atomics. + */ value = *valptr; if (value != oldval) { @@ -980,27 +1338,42 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval) } else mustwait = true; + SpinLockRelease(&lock->mutex); } + else + mustwait = false; if (!mustwait) break; /* the lock was free or value didn't match */ /* - * Add myself to wait queue. + * Add myself to wait queue. Note that this is racy, somebody else + * could wakeup before we're finished queuing. + * NB: We're using nearly the same twice-in-a-row lock acquisition + * protocol as LWLockAcquire(). Check its comments for details. */ - proc->lwWaiting = true; - proc->lwWaitMode = LW_WAIT_UNTIL_FREE; - /* waiters are added to the front of the queue */ - dlist_push_head(&lock->waiters, &proc->lwWaitLink); + LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE); /* - * Set releaseOK, to make sure we get woken up as soon as the lock is - * released. + * Set RELEASE_OK flag, to make sure we get woken up as soon as the + * lock is released. */ - lock->releaseOK = true; + pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK); - /* Can release the mutex now */ - SpinLockRelease(&lock->mutex); + /* + * We're now guaranteed to be woken up if necessary. Recheck the + * lock's state. + */ + mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0; + + /* Ok, lock is free after we queued ourselves. Undo queueing. */ + if (!mustwait) + { + LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue"); + + LWLockDequeueSelf(lock); + break; + } /* * Wait until awakened. @@ -1014,7 +1387,7 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval) * so that the lock manager or signal manager will see the received * signal when it next waits. */ - LOG_LWDEBUG("LWLockWaitForVar", T_NAME(lock), T_ID(lock), "waiting"); + LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting"); #ifdef LWLOCK_STATS lwstats->block_count++; @@ -1032,17 +1405,22 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval) extraWaits++; } +#ifdef LOCK_DEBUG + { + /* not waiting anymore */ + uint32 nwaiters = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1); + Assert(nwaiters < MAX_BACKENDS); + } +#endif + TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), LW_EXCLUSIVE); - LOG_LWDEBUG("LWLockWaitForVar", T_NAME(lock), T_ID(lock), "awakened"); + LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened"); /* Now loop back and check the status of the lock again. */ } - /* We are done updating shared state of the lock itself. 
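From a caller's perspective, the valptr protocol of LWLockWaitForVar()/LWLockUpdateVar() looks like the sketch below, modelled loosely on how WAL insertion uses it; insert_lock, insertingAt, upto and the helper functions are placeholders:

/* writer: holds the lock exclusively and periodically publishes progress */
LWLockAcquire(insert_lock, LW_EXCLUSIVE);
while (more_data_to_copy())
{
	copy_some_data();
	LWLockUpdateVar(insert_lock, &insertingAt, current_position());
}
LWLockRelease(insert_lock);

/* reader: wait until the writer is done, or has published enough progress */
uint64		progress = 0;

while (!LWLockWaitForVar(insert_lock, &insertingAt, progress, &progress))
{
	/*
	 * Still locked, but insertingAt moved; 'progress' now holds the new
	 * value, which may already be far enough for our purposes.
	 */
	if (progress >= upto)
		break;
}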
*/ - SpinLockRelease(&lock->mutex); - TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), LW_EXCLUSIVE); /* @@ -1075,14 +1453,24 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val) { dlist_head wakeup; dlist_mutable_iter iter; +#ifdef LWLOCK_STATS + lwlock_stats *lwstats; + + lwstats = get_lwlock_stats_entry(lock); +#endif + + PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE); dlist_init(&wakeup); /* Acquire mutex. Time spent holding mutex should be short! */ +#ifdef LWLOCK_STATS + lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex); +#else SpinLockAcquire(&lock->mutex); +#endif - /* we should hold the lock */ - Assert(lock->exclusive == 1); + Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE); /* Update the lock's value */ *valptr = val; @@ -1112,7 +1500,7 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val) { PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur); dlist_delete(&waiter->lwWaitLink); - /* check comment in LWLockRelease() about this barrier */ + /* check comment in LWLockWakeup() about this barrier */ pg_write_barrier(); waiter->lwWaiting = false; PGSemaphoreUnlock(&waiter->sem); @@ -1126,22 +1514,22 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val) void LWLockRelease(LWLock *lock) { - dlist_head wakeup; - dlist_mutable_iter iter; + LWLockMode mode; + uint32 oldstate; + bool check_waiters; int i; - dlist_init(&wakeup); - - PRINT_LWDEBUG("LWLockRelease", lock); - /* * Remove lock from list of locks held. Usually, but not always, it will * be the latest-acquired lock; so search array backwards. */ for (i = num_held_lwlocks; --i >= 0;) { - if (lock == held_lwlocks[i]) + if (lock == held_lwlocks[i].lock) + { + mode = held_lwlocks[i].mode; break; + } } if (i < 0) elog(ERROR, "lock %s %d is not held", T_NAME(lock), T_ID(lock)); @@ -1149,88 +1537,45 @@ LWLockRelease(LWLock *lock) for (; i < num_held_lwlocks; i++) held_lwlocks[i] = held_lwlocks[i + 1]; - /* Acquire mutex. Time spent holding mutex should be short! */ - SpinLockAcquire(&lock->mutex); - - /* Release my hold on lock */ - if (lock->exclusive > 0) - lock->exclusive--; - else - { - Assert(lock->shared > 0); - lock->shared--; - } + PRINT_LWDEBUG("LWLockRelease", lock, mode); /* - * See if I need to awaken any waiters. If I released a non-last shared - * hold, there cannot be anything to do. Also, do not awaken any waiters - * if someone has already awakened waiters that haven't yet acquired the - * lock. + * Release my hold on lock, after that it can immediately be acquired by + * others, even if we still have to wakeup other waiters. */ - if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK) + if (mode == LW_EXCLUSIVE) + oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE); + else + oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED); + + /* nobody else can have that kind of lock */ + Assert(!(oldstate & LW_VAL_EXCLUSIVE)); + + + /* + * We're still waiting for backends to get scheduled, don't wake them up + * again. + */ + if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) == + (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) && + (oldstate & LW_LOCK_MASK) == 0) + check_waiters = true; + else + check_waiters = false; + + /* + * As waking up waiters requires the spinlock to be acquired, only do so + * if necessary. + */ + if (check_waiters) { - /* - * Remove the to-be-awakened PGPROCs from the queue. 
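To make the new release fast path concrete, two worked examples using the bit values from the top of the file (this only restates what the code above does):

/*
 * a) Last shared holder releases while backends are queued:
 *      before: 0x60000001 (HAS_WAITERS | RELEASE_OK | one shared holder)
 *      pg_atomic_sub_fetch_u32(..., LW_VAL_SHARED) returns 0x60000000
 *      -> (result & LW_LOCK_MASK) == 0 and both flags are set, so
 *         check_waiters is true and we go through LWLockWakeup().
 *
 * b) One of three shared holders releases:
 *      before: 0x60000003 -> sub_fetch returns 0x60000002
 *      -> the LW_LOCK_MASK part is non-zero, check_waiters stays false and
 *         the release never touches the spinlock at all.
 */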
- */ - bool releaseOK = true; - bool wokeup_somebody = false; - - dlist_foreach_modify(iter, &lock->waiters) - { - PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur); - - if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE) - continue; - - dlist_delete(&waiter->lwWaitLink); - dlist_push_tail(&wakeup, &waiter->lwWaitLink); - - /* - * Prevent additional wakeups until retryer gets to - * run. Backends that are just waiting for the lock to become - * free don't retry automatically. - */ - if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE) - { - releaseOK = false; - wokeup_somebody = true; - } - - if(waiter->lwWaitMode == LW_EXCLUSIVE) - break; - } - lock->releaseOK = releaseOK; + /* XXX: remove before commit? */ + LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters"); + LWLockWakeup(lock); } - /* We are done updating shared state of the lock itself. */ - SpinLockRelease(&lock->mutex); - TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock), T_ID(lock)); - /* - * Awaken any waiters I removed from the queue. - */ - dlist_foreach_modify(iter, &wakeup) - { - PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur); - LOG_LWDEBUG("LWLockRelease", T_NAME(lock), T_ID(lock), - "release waiter"); - dlist_delete(&waiter->lwWaitLink); - /* - * Guarantee that lwWaiting being unset only becomes visible once the - * unlink from the link has completed. Otherwise the target backend - * could be woken up for other reason and enqueue for a new lock - if - * that happens before the list unlink happens, the list would end up - * being corrupted. - * - * The barrier pairs with the SpinLockAcquire() when enqueing for - * another lock. - */ - pg_write_barrier(); - waiter->lwWaiting = false; - PGSemaphoreUnlock(&waiter->sem); - } - /* * Now okay to allow cancel/die interrupts. */ @@ -1254,7 +1599,7 @@ LWLockReleaseAll(void) { HOLD_INTERRUPTS(); /* match the upcoming RESUME_INTERRUPTS */ - LWLockRelease(held_lwlocks[num_held_lwlocks - 1]); + LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock); } } @@ -1262,8 +1607,8 @@ LWLockReleaseAll(void) /* * LWLockHeldByMe - test whether my process currently holds a lock * - * This is meant as debug support only. We do not distinguish whether the - * lock is held shared or exclusive. + * This is meant as debug support only. We currently do not distinguish + * whether the lock is held shared or exclusive. 
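Since held_lwlocks[] now records the acquisition mode, a mode-aware check would be trivial to add should it ever be needed; a purely hypothetical sketch, not part of this patch:

bool
LWLockHeldByMeInMode(LWLock *l, LWLockMode mode)
{
	int			i;

	for (i = 0; i < num_held_lwlocks; i++)
	{
		if (held_lwlocks[i].lock == l && held_lwlocks[i].mode == mode)
			return true;
	}
	return false;
}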
 */
 bool
 LWLockHeldByMe(LWLock *l)
 {
 	int			i;
 	for (i = 0; i < num_held_lwlocks; i++)
 	{
-		if (held_lwlocks[i] == l)
+		if (held_lwlocks[i].lock == l)
 			return true;
 	}
 	return false;
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index c84970a7add..f15a9517144 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -16,6 +16,7 @@
 #include "lib/ilist.h"
 #include "storage/s_lock.h"
+#include "port/atomics.h"
 
 struct PGPROC;
 
@@ -47,11 +48,16 @@ typedef struct LWLockTranche
 typedef struct LWLock
 {
 	slock_t		mutex;			/* Protects LWLock and queue of PGPROCs */
-	bool		releaseOK;		/* T if ok to release waiters */
-	char		exclusive;		/* # of exclusive holders (0 or 1) */
-	int			shared;			/* # of shared holders (0..MaxBackends) */
-	int			tranche;		/* tranche ID */
+	uint16		tranche;		/* tranche ID */
+
+	pg_atomic_uint32 state;		/* state of exclusive/nonexclusive lockers */
+#ifdef LOCK_DEBUG
+	pg_atomic_uint32 nwaiters;	/* number of waiters */
+#endif
 	dlist_head	waiters;		/* list of waiting PGPROCs */
+#ifdef LOCK_DEBUG
+	struct PGPROC *owner;		/* last exclusive owner of the lock */
+#endif
 } LWLock;
 
 /*
@@ -66,11 +72,11 @@ typedef struct LWLock
  * (Of course, we have to also ensure that the array start address is suitably
  * aligned.)
  *
- * Even on a 32-bit platform, an lwlock will be more than 16 bytes, because
- * it contains 2 integers and 2 pointers, plus other stuff.  It should fit
- * into 32 bytes, though, unless slock_t is really big.  On a 64-bit platform,
- * it should fit into 32 bytes unless slock_t is larger than 4 bytes.  We
- * allow for that just in case.
+ * On 32-bit platforms an LWLock will these days fit into 16 bytes, but since
+ * that wasn't always the case, and cramming more lwlocks into a cacheline
+ * might be detrimental performance-wise, we still use 32-byte alignment
+ * there.  So, on both 32- and 64-bit platforms, it should fit into 32 bytes
+ * unless slock_t is really big.  We allow for that just in case.
  */
 #define LWLOCK_PADDED_SIZE	(sizeof(LWLock) <= 32 ? 32 : 64)
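A quick stand-alone way to sanity-check the size claim in that comment, using a stand-in struct that mirrors the new member layout (assumes an LP64 build without LOCK_DEBUG; the names and the printed number are illustrative, not guaranteed):

#include <stdio.h>

typedef struct
{
	char		mutex;			/* stands in for slock_t */
	unsigned short tranche;
	unsigned int state;			/* stands in for pg_atomic_uint32 */
	void	   *head;			/* dlist_head is two pointers */
	void	   *tail;
} FakeLWLock;

int
main(void)
{
	/* typically prints 24 on LP64 - comfortably under the 32-byte padding */
	printf("%zu\n", sizeof(FakeLWLock));
	return 0;
}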