diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c index 4f446aab7a4..1ecf4bdab9f 100644 --- a/src/backend/storage/ipc/sinvaladt.c +++ b/src/backend/storage/ipc/sinvaladt.c @@ -143,6 +143,7 @@ typedef struct ProcState int nextMsgNum; /* next message number to read */ bool resetState; /* backend needs to reset its state */ bool signaled; /* backend has been sent catchup signal */ + bool hasMessages; /* backend has unread messages */ /* * Backend only sends invalidations, never receives them. This only makes @@ -248,6 +249,7 @@ CreateSharedInvalidationState(void) shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */ shmInvalBuffer->procState[i].resetState = false; shmInvalBuffer->procState[i].signaled = false; + shmInvalBuffer->procState[i].hasMessages = false; shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId; } } @@ -264,11 +266,9 @@ SharedInvalBackendInit(bool sendOnly) SISeg *segP = shmInvalBuffer; /* - * This can run in parallel with read operations, and for that matter with - * write operations; but not in parallel with additions and removals of - * backends, nor in parallel with SICleanupQueue. It doesn't seem worth - * having a third lock, so we choose to use SInvalWriteLock to serialize - * additions/removals. + * This can run in parallel with read operations, but not with write + * operations, since SIInsertDataEntries relies on lastBackend to set + * hasMessages appropriately. */ LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); @@ -316,6 +316,7 @@ SharedInvalBackendInit(bool sendOnly) stateP->nextMsgNum = segP->maxMsgNum; stateP->resetState = false; stateP->signaled = false; + stateP->hasMessages = false; stateP->sendOnly = sendOnly; LWLockRelease(SInvalWriteLock); @@ -417,6 +418,7 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n) int nthistime = Min(n, WRITE_QUANTUM); int numMsgs; int max; + int i; n -= nthistime; @@ -459,6 +461,19 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n) SpinLockRelease(&vsegP->msgnumLock); } + /* + * Now that the maxMsgNum change is globally visible, we give + * everyone a swift kick to make sure they read the newly added + * messages. Releasing SInvalWriteLock will enforce a full memory + * barrier, so these (unlocked) changes will be committed to memory + * before we exit the function. + */ + for (i = 0; i < segP->lastBackend; i++) + { + ProcState *stateP = &segP->procState[i]; + stateP->hasMessages = TRUE; + } + LWLockRelease(SInvalWriteLock); } } @@ -499,11 +514,36 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize) int max; int n; - LWLockAcquire(SInvalReadLock, LW_SHARED); - segP = shmInvalBuffer; stateP = &segP->procState[MyBackendId - 1]; + /* + * Before starting to take locks, do a quick, unlocked test to see whether + * there can possibly be anything to read. On a multiprocessor system, + * it's possible that this load could migrate backwards and occur before we + * actually enter this function, so we might miss a sinval message that + * was just added by some other processor. But they can't migrate + * backwards over a preceding lock acquisition, so it should be OK. If + * we haven't acquired a lock preventing against further relevant + * invalidations, any such occurrence is not much different than if the + * invalidation had arrived slightly later in the first place. + */ + if (!stateP->hasMessages) + return 0; + + LWLockAcquire(SInvalReadLock, LW_SHARED); + + /* + * We must reset hasMessages before determining how many messages we're + * going to read. That way, if new messages arrive after we have + * determined how many we're reading, the flag will get reset and we'll + * notice those messages part-way through. + * + * Note that, if we don't end up reading all of the messages, we had + * better be certain to reset this flag before exiting! + */ + stateP->hasMessages = FALSE; + /* Fetch current value of maxMsgNum using spinlock */ { /* use volatile pointer to prevent code rearrangement */ @@ -544,10 +584,16 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize) } /* - * Reset our "signaled" flag whenever we have caught up completely. + * If we have caught up completely, reset our "signaled" flag so that + * we'll get another signal if we fall behind again. + * + * If we haven't catch up completely, reset the hasMessages flag so that + * we see the remaining messages next time. */ if (stateP->nextMsgNum >= max) stateP->signaled = false; + else + stateP->hasMessages = TRUE; LWLockRelease(SInvalReadLock); return n;