/*-------------------------------------------------------------------------
 *
 * sinvaladt.c
 *      POSTGRES shared cache invalidation data manager.
 *
 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *      src/backend/storage/ipc/sinvaladt.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <signal.h>
#include <unistd.h>

#include "access/transam.h"
#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
/*
 * Conceptually, the shared cache invalidation messages are stored in an
 * infinite array, where maxMsgNum is the next array subscript to store a
 * submitted message in, minMsgNum is the smallest array subscript containing
 * a message not yet read by all backends, and we always have maxMsgNum >=
 * minMsgNum.  (They are equal when there are no messages pending.)  For each
 * active backend, there is a nextMsgNum pointer indicating the next message it
 * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
 * backend.
 *
 * (In the current implementation, minMsgNum is a lower bound for the
 * per-process nextMsgNum values, but it isn't rigorously kept equal to the
 * smallest nextMsgNum --- it may lag behind.  We only update it when
 * SICleanupQueue is called, and we try not to do that often.)
 *
 * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
 * entries.  We translate MsgNum values into circular-buffer indexes by
 * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
 * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
 * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
 * in the buffer.  If the buffer does overflow, we recover by setting the
 * "reset" flag for each backend that has fallen too far behind.  A backend
 * that is in "reset" state is ignored while determining minMsgNum.  When
 * it does finally attempt to receive inval messages, it must discard all
 * its invalidatable state, since it won't know what it missed.
 *
 * To reduce the probability of needing resets, we send a "catchup" interrupt
 * to any backend that seems to be falling unreasonably far behind.  The
 * normal behavior is that at most one such interrupt is in flight at a time;
 * when a backend completes processing a catchup interrupt, it executes
 * SICleanupQueue, which will signal the next-furthest-behind backend if
 * needed.  This avoids undue contention from multiple backends all trying
 * to catch up at once.  However, the furthest-back backend might be stuck
 * in a state where it can't catch up.  Eventually it will get reset, so it
 * won't cause any more problems for anyone but itself.  But we don't want
 * to find that a bunch of other backends are now too close to the reset
 * threshold to be saved.  So SICleanupQueue is designed to occasionally
 * send extra catchup interrupts as the queue gets fuller, to backends that
 * are far behind and haven't gotten one yet.  As long as there aren't a lot
 * of "stuck" backends, we won't need a lot of extra interrupts, since ones
 * that aren't stuck will propagate their interrupts to the next guy.
 *
 * We would have problems if the MsgNum values overflow an integer, so
 * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
 * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
 * large so that we don't need to do this often.  It must be a multiple of
 * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
 * to be moved when we do it.
 *
 * Access to the shared sinval array is protected by two locks, SInvalReadLock
 * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
 * authorizes them to modify their own ProcState but not to modify or even
 * look at anyone else's.  When we need to perform array-wide updates,
 * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
 * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
 * mode) to serialize adding messages to the queue.  Note that a writer
 * can operate in parallel with one or more readers, because the writer
 * has no need to touch anyone's ProcState, except in the infrequent cases
 * when SICleanupQueue is needed.  The only point of overlap is that
 * the writer wants to change maxMsgNum while readers need to read it.
 * We deal with that by having a spinlock that readers must take for just
 * long enough to read maxMsgNum, while writers take it for just long enough
 * to write maxMsgNum.  (The exact rule is that you need the spinlock to
 * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
 * spinlock to write maxMsgNum unless you are holding both locks.)
 *
 * Note: since maxMsgNum is an int and hence presumably atomically readable/
 * writable, the spinlock might seem unnecessary.  The reason it is needed
 * is to provide a memory barrier: we need to be sure that messages written
 * to the array are actually there before maxMsgNum is increased, and that
 * readers will see that data after fetching maxMsgNum.  Multiprocessors
 * that have weak memory-ordering guarantees can fail without the memory
 * barrier instructions that are included in the spinlock sequences.
 */
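
/*
 * A minimal sketch of the locking protocol just described, showing only the
 * lock acquisitions used by the routines below (hypothetical skeleton, for
 * orientation; the real call sites appear later in this file):
 */
#if 0
    /* reader (SIGetDataEntries): may touch only its own ProcState */
    LWLockAcquire(SInvalReadLock, LW_SHARED);

    /* writer (SIInsertDataEntries): serializes appending to the queue */
    LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

    /* array-wide maintenance (SICleanupQueue): locks out writers and readers */
    LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
#endif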

/*
 * Configurable parameters.
 *
 * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
 * Must be a power of 2 for speed.
 *
 * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
 * Must be a multiple of MAXNUMMESSAGES.  Should be large.
 *
 * CLEANUP_MIN: the minimum number of messages that must be in the buffer
 * before we bother to call SICleanupQueue.
 *
 * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
 * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
 *
 * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
 * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
 *
 * WRITE_QUANTUM: the max number of messages to push into the buffer per
 * iteration of SIInsertDataEntries.  Noncritical but should be less than
 * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
 * per iteration.
 */

#define MAXNUMMESSAGES 4096
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
#define WRITE_QUANTUM 64
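
/*
 * A minimal sketch (hypothetical helpers, not used anywhere in this file) of
 * the message-number arithmetic described in the comment at the top of the
 * file: a MsgNum maps to a buffer slot by a power-of-2 modulus, and the
 * buffer has room for nNew more messages as long as maxMsgNum does not
 * outrun minMsgNum by more than MAXNUMMESSAGES.
 */
#if 0
static int
SIMsgNumToSlot(int msgNum)
{
    /* MAXNUMMESSAGES is a power of 2, so this reduces to a bit mask */
    return msgNum % MAXNUMMESSAGES;
}

static bool
SIBufferHasRoom(int minMsgNum, int maxMsgNum, int nNew)
{
    return (maxMsgNum - minMsgNum) + nNew <= MAXNUMMESSAGES;
}
#endif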

/* Per-backend state in shared invalidation structure */
typedef struct ProcState
{
    /* procPid is zero in an inactive ProcState array entry. */
    pid_t       procPid;        /* PID of backend, for signaling */
    PGPROC     *proc;           /* PGPROC of backend */
    /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
    int         nextMsgNum;     /* next message number to read */
    bool        resetState;     /* backend needs to reset its state */
    bool        signaled;       /* backend has been sent catchup signal */
    bool        hasMessages;    /* backend has unread messages */

    /*
     * Backend only sends invalidations, never receives them.  This only
     * makes sense for the Startup process during recovery because it doesn't
     * maintain a relcache, yet it fires inval messages to allow query
     * backends to see schema changes.
     */
    bool        sendOnly;       /* backend only sends, never receives */

    /*
     * Next LocalTransactionId to use for each idle backend slot.  We keep
     * this here because it is indexed by BackendId and it is convenient to
     * copy the value to and from local memory when MyBackendId is set.  It's
     * meaningless in an active ProcState entry.
     */
    LocalTransactionId nextLXID;
} ProcState;

/* Shared cache invalidation memory segment */
typedef struct SISeg
{
    /*
     * General state information
     */
    int         minMsgNum;      /* oldest message still needed */
    int         maxMsgNum;      /* next message number to be assigned */
    int         nextThreshold;  /* # of messages to call SICleanupQueue */
    int         lastBackend;    /* index of last active procState entry, +1 */
    int         maxBackends;    /* size of procState array */

    slock_t     msgnumLock;     /* spinlock protecting maxMsgNum */

    /*
     * Circular buffer holding shared-inval messages
     */
    SharedInvalidationMessage buffer[MAXNUMMESSAGES];

    /*
     * Per-backend invalidation state info (has MaxBackends entries).
     */
    ProcState   procState[FLEXIBLE_ARRAY_MEMBER];
} SISeg;
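
/*
 * A minimal sketch (hypothetical helper, not used in this file) of the
 * maxMsgNum access rule stated above: a reader that is not holding
 * SInvalWriteLock takes msgnumLock just long enough to fetch maxMsgNum,
 * which also supplies the memory barrier needed on weakly-ordered machines.
 */
#if 0
static int
SIReadMaxMsgNum(SISeg *segP)
{
    int         max;

    SpinLockAcquire(&segP->msgnumLock);
    max = segP->maxMsgNum;
    SpinLockRelease(&segP->msgnumLock);
    return max;
}
#endif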

static SISeg *shmInvalBuffer;   /* pointer to the shared inval buffer */


static LocalTransactionId nextLocalTransactionId;

static void CleanupInvalidationState(int status, Datum arg);


/*
 * SInvalShmemSize --- return shared-memory space needed
 */
Size
SInvalShmemSize(void)
{
    Size        size;

    size = offsetof(SISeg, procState);
    size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));

    return size;
}

/*
 * CreateSharedInvalidationState
 *      Create and initialize the SI message buffer
 */
void
CreateSharedInvalidationState(void)
{
    int         i;
    bool        found;

    /* Allocate space in shared memory */
    shmInvalBuffer = (SISeg *)
        ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
    if (found)
        return;

    /* Clear message counters, save size of procState array, init spinlock */
    shmInvalBuffer->minMsgNum = 0;
    shmInvalBuffer->maxMsgNum = 0;
    shmInvalBuffer->nextThreshold = CLEANUP_MIN;
    shmInvalBuffer->lastBackend = 0;
    shmInvalBuffer->maxBackends = MaxBackends;
    SpinLockInit(&shmInvalBuffer->msgnumLock);

    /* The buffer[] array is initially all unused, so we need not fill it */

    /* Mark all backends inactive, and initialize nextLXID */
    for (i = 0; i < shmInvalBuffer->maxBackends; i++)
    {
        shmInvalBuffer->procState[i].procPid = 0;   /* inactive */
        shmInvalBuffer->procState[i].proc = NULL;
        shmInvalBuffer->procState[i].nextMsgNum = 0;    /* meaningless */
        shmInvalBuffer->procState[i].resetState = false;
        shmInvalBuffer->procState[i].signaled = false;
        shmInvalBuffer->procState[i].hasMessages = false;
        shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
    }
}

/*
 * SharedInvalBackendInit
 *      Initialize a new backend to operate on the sinval buffer
 */
void
SharedInvalBackendInit(bool sendOnly)
{
    int         index;
    ProcState  *stateP = NULL;
    SISeg      *segP = shmInvalBuffer;

    /*
     * This can run in parallel with read operations, but not with write
     * operations, since SIInsertDataEntries relies on lastBackend to set
     * hasMessages appropriately.
     */
    LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

    /* Look for a free entry in the procState array */
    for (index = 0; index < segP->lastBackend; index++)
    {
        if (segP->procState[index].procPid == 0)    /* inactive slot? */
        {
            stateP = &segP->procState[index];
            break;
        }
    }

    if (stateP == NULL)
    {
        if (segP->lastBackend < segP->maxBackends)
        {
            stateP = &segP->procState[segP->lastBackend];
            Assert(stateP->procPid == 0);
            segP->lastBackend++;
        }
        else
        {
            /*
             * out of procState slots: MaxBackends exceeded -- report normally
             */
            MyBackendId = InvalidBackendId;
            LWLockRelease(SInvalWriteLock);
            ereport(FATAL,
                    (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
                     errmsg("sorry, too many clients already")));
        }
    }

    MyBackendId = (stateP - &segP->procState[0]) + 1;
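    /* (backend IDs are 1-based: procState[k] corresponds to backend ID k+1) */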

    /* Advertise assigned backend ID in MyProc */
    MyProc->backendId = MyBackendId;

    /* Fetch next local transaction ID into local memory */
    nextLocalTransactionId = stateP->nextLXID;

    /* mark myself active, with all extant messages already read */
    stateP->procPid = MyProcPid;
    stateP->proc = MyProc;
    stateP->nextMsgNum = segP->maxMsgNum;
    stateP->resetState = false;
    stateP->signaled = false;
    stateP->hasMessages = false;
    stateP->sendOnly = sendOnly;

    LWLockRelease(SInvalWriteLock);

    /* register exit routine to mark my entry inactive at exit */
    on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));

    elog(DEBUG4, "my backend ID is %d", MyBackendId);
}

/*
 * CleanupInvalidationState
 *      Mark the current backend as no longer active.
 *
 * This function is called via on_shmem_exit() during backend shutdown.
 *
 * arg is really of type "SISeg*".
 */
static void
CleanupInvalidationState(int status, Datum arg)
{
    SISeg      *segP = (SISeg *) DatumGetPointer(arg);
    ProcState  *stateP;
    int         i;

    Assert(PointerIsValid(segP));

    LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

    stateP = &segP->procState[MyBackendId - 1];

    /* Update next local transaction ID for next holder of this backendID */
    stateP->nextLXID = nextLocalTransactionId;

    /* Mark myself inactive */
    stateP->procPid = 0;
    stateP->proc = NULL;
    stateP->nextMsgNum = 0;
    stateP->resetState = false;
    stateP->signaled = false;

    /* Recompute index of last active backend */
    for (i = segP->lastBackend; i > 0; i--)
    {
        if (segP->procState[i - 1].procPid != 0)
            break;
    }
    segP->lastBackend = i;

    LWLockRelease(SInvalWriteLock);
}

/*
 * BackendIdGetProc
 *      Get the PGPROC structure for a backend, given the backend ID.
 *      The result may be out of date arbitrarily quickly, so the caller
 *      must be careful about how this information is used.  NULL is
 *      returned if the backend is not active.
 */
PGPROC *
BackendIdGetProc(int backendID)
{
    PGPROC     *result = NULL;
    SISeg      *segP = shmInvalBuffer;

    /* Need to lock out additions/removals of backends */
    LWLockAcquire(SInvalWriteLock, LW_SHARED);

    if (backendID > 0 && backendID <= segP->lastBackend)
    {
        ProcState  *stateP = &segP->procState[backendID - 1];

        result = stateP->proc;
    }

    LWLockRelease(SInvalWriteLock);

    return result;
}

/*
 * BackendIdGetTransactionIds
 *      Get the xid and xmin of the backend.  The result may be out of date
 *      arbitrarily quickly, so the caller must be careful about how this
 *      information is used.
 */
void
BackendIdGetTransactionIds(int backendID, TransactionId *xid, TransactionId *xmin)
{
    SISeg      *segP = shmInvalBuffer;

    *xid = InvalidTransactionId;
    *xmin = InvalidTransactionId;

    /* Need to lock out additions/removals of backends */
    LWLockAcquire(SInvalWriteLock, LW_SHARED);

    if (backendID > 0 && backendID <= segP->lastBackend)
    {
        ProcState  *stateP = &segP->procState[backendID - 1];
        PGPROC     *proc = stateP->proc;

        if (proc != NULL)
        {
            PGXACT     *xact = &ProcGlobal->allPgXact[proc->pgprocno];

            *xid = xact->xid;
            *xmin = xact->xmin;
        }
    }

    LWLockRelease(SInvalWriteLock);
}

/*
 * SIInsertDataEntries
 *      Add new invalidation message(s) to the buffer.
 */
void
SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
{
    SISeg      *segP = shmInvalBuffer;

    /*
     * N can be arbitrarily large.  We divide the work into groups of no more
     * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
     * an unreasonably long time.  (This is not so much because we care about
     * letting in other writers, as that some just-caught-up backend might be
     * trying to do SICleanupQueue to pass on its signal, and we don't want it
     * to have to wait a long time.)  Also, we need to consider calling
     * SICleanupQueue every so often.
     */
    while (n > 0)
    {
        int         nthistime = Min(n, WRITE_QUANTUM);
        int         numMsgs;
        int         max;
        int         i;

        n -= nthistime;

        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

        /*
         * If the buffer is full, we *must* acquire some space.  Clean the
         * queue and reset anyone who is preventing space from being freed.
         * Otherwise, clean the queue only when it's exceeded the next
         * fullness threshold.  We have to loop and recheck the buffer state
         * after any call of SICleanupQueue.
         */
        for (;;)
        {
            numMsgs = segP->maxMsgNum - segP->minMsgNum;
            if (numMsgs + nthistime > MAXNUMMESSAGES ||
                numMsgs >= segP->nextThreshold)
                SICleanupQueue(true, nthistime);
            else
                break;
        }

        /*
         * Insert new message(s) into proper slot of circular buffer
         */
        max = segP->maxMsgNum;
        while (nthistime-- > 0)
        {
            segP->buffer[max % MAXNUMMESSAGES] = *data++;
            max++;
        }

        /* Update current value of maxMsgNum using spinlock */
        SpinLockAcquire(&segP->msgnumLock);
        segP->maxMsgNum = max;
        SpinLockRelease(&segP->msgnumLock);

        /*
         * Now that the maxMsgNum change is globally visible, we give everyone
         * a swift kick to make sure they read the newly added messages.
         * Releasing SInvalWriteLock will enforce a full memory barrier, so
         * these (unlocked) changes will be committed to memory before we exit
         * the function.
         */
        for (i = 0; i < segP->lastBackend; i++)
        {
            ProcState  *stateP = &segP->procState[i];

            stateP->hasMessages = true;
        }

        LWLockRelease(SInvalWriteLock);
    }
}

/*
 * SIGetDataEntries
 *      get next SI message(s) for current backend, if there are any
 *
 * Possible return values:
 *      0:   no SI message available
 *      n>0: next n SI messages have been extracted into data[]
 *      -1:  SI reset message extracted
 *
 * If the return value is less than the array size "datasize", the caller
 * can assume that there are no more SI messages after the one(s) returned.
 * Otherwise, another call is needed to collect more messages.
 *
 * NB: this can run in parallel with other instances of SIGetDataEntries
 * executing on behalf of other backends, since each instance will modify only
 * fields of its own backend's ProcState, and no instance will look at fields
 * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
 * in shared mode.  Note that this is not exactly the normal (read-only)
 * interpretation of a shared lock!  Look closely at the interactions before
 * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
 *
 * NB: this can also run in parallel with SIInsertDataEntries.  It is not
 * guaranteed that we will return any messages added after the routine is
 * entered.
 *
 * Note: we assume that "datasize" is not so large that it might be important
 * to break our hold on SInvalReadLock into segments.
 */
int
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
    SISeg      *segP;
    ProcState  *stateP;
    int         max;
    int         n;

    segP = shmInvalBuffer;
    stateP = &segP->procState[MyBackendId - 1];

    /*
     * Before starting to take locks, do a quick, unlocked test to see whether
     * there can possibly be anything to read.  On a multiprocessor system,
     * it's possible that this load could migrate backwards and occur before
     * we actually enter this function, so we might miss a sinval message that
     * was just added by some other processor.  But they can't migrate
     * backwards over a preceding lock acquisition, so it should be OK.  If we
     * haven't acquired a lock preventing further relevant invalidations, any
     * such occurrence is not much different than if the invalidation had
     * arrived slightly later in the first place.
     */
    if (!stateP->hasMessages)
        return 0;

    LWLockAcquire(SInvalReadLock, LW_SHARED);

    /*
     * We must reset hasMessages before determining how many messages we're
     * going to read.  That way, if new messages arrive after we have
     * determined how many we're reading, the flag will get reset and we'll
     * notice those messages part-way through.
     *
     * Note that, if we don't end up reading all of the messages, we had
     * better be certain to reset this flag before exiting!
     */
    stateP->hasMessages = false;

    /* Fetch current value of maxMsgNum using spinlock */
    SpinLockAcquire(&segP->msgnumLock);
    max = segP->maxMsgNum;
    SpinLockRelease(&segP->msgnumLock);

    if (stateP->resetState)
    {
        /*
         * Force reset.  We can say we have dealt with any messages added
         * since the reset, as well; and that means we should clear the
         * signaled flag, too.
         */
        stateP->nextMsgNum = max;
        stateP->resetState = false;
        stateP->signaled = false;
        LWLockRelease(SInvalReadLock);
        return -1;
    }

    /*
     * Retrieve messages and advance backend's counter, until data array is
     * full or there are no more messages.
     *
     * There may be other backends that haven't read the message(s), so we
     * cannot delete them here.  SICleanupQueue() will eventually remove them
     * from the queue.
     */
    n = 0;
    while (n < datasize && stateP->nextMsgNum < max)
    {
        data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
        stateP->nextMsgNum++;
    }

    /*
     * If we have caught up completely, reset our "signaled" flag so that
     * we'll get another signal if we fall behind again.
     *
     * If we haven't caught up completely, reset the hasMessages flag so that
     * we see the remaining messages next time.
     */
    if (stateP->nextMsgNum >= max)
        stateP->signaled = false;
    else
        stateP->hasMessages = true;

    LWLockRelease(SInvalReadLock);
    return n;
}
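
/*
 * A minimal sketch (hypothetical, not part of this file) of a reader loop
 * that honors the return-value contract above, assuming the
 * InvalidateSystemCaches() reset routine provided by the inval layer: keep
 * calling until fewer than "datasize" messages come back, and treat -1 as an
 * instruction to discard all invalidatable state.
 */
#if 0
static void
ConsumeSinvalMessages(void)
{
    SharedInvalidationMessage messages[32];
    int         n;

    do
    {
        n = SIGetDataEntries(messages, lengthof(messages));
        if (n < 0)
        {
            /* we fell too far behind and were reset: rebuild everything */
            InvalidateSystemCaches();
            break;
        }
        /* ... process messages[0 .. n-1] here ... */
    } while (n == lengthof(messages));
}
#endif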

/*
 * SICleanupQueue
 *      Remove messages that have been consumed by all active backends
 *
 * callerHasWriteLock is true if caller is holding SInvalWriteLock.
 * minFree is the minimum number of message slots to make free.
 *
 * Possible side effects of this routine include marking one or more
 * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
 * to some backend that seems to be getting too far behind.  We signal at
 * most one backend at a time, for reasons explained at the top of the file.
 *
 * Caution: because we transiently release write lock when we have to signal
 * some other backend, it is NOT guaranteed that there are still minFree
 * free message slots at exit.  Caller must recheck and perhaps retry.
 */
void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
    SISeg      *segP = shmInvalBuffer;
    int         min,
                minsig,
                lowbound,
                numMsgs,
                i;
    ProcState  *needSig = NULL;

    /* Lock out all writers and readers */
    if (!callerHasWriteLock)
        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

    /*
     * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
     * furthest-back backend that needs signaling (if any), and reset any
     * backends that are too far back.  Note that because we ignore sendOnly
     * backends here it is possible for them to keep sending messages without
     * a problem even when they are the only active backend.
     */
    min = segP->maxMsgNum;
    minsig = min - SIG_THRESHOLD;
    lowbound = min - MAXNUMMESSAGES + minFree;
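
    /*
     * With the default MAXNUMMESSAGES = 4096 and SIG_THRESHOLD = 2048, a
     * backend whose nextMsgNum is more than 2048 behind maxMsgNum is a
     * candidate for a catchup signal, and one more than 4096 - minFree
     * behind must be reset to guarantee the requested free space.
     */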

    for (i = 0; i < segP->lastBackend; i++)
    {
        ProcState  *stateP = &segP->procState[i];
        int         n = stateP->nextMsgNum;

        /* Ignore if inactive or already in reset state */
        if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
            continue;

        /*
         * If we must free some space and this backend is preventing it, force
         * him into reset state and then ignore until he catches up.
         */
        if (n < lowbound)
        {
            stateP->resetState = true;
            /* no point in signaling him ... */
            continue;
        }

        /* Track the global minimum nextMsgNum */
        if (n < min)
            min = n;

        /* Also see who's furthest back of the unsignaled backends */
        if (n < minsig && !stateP->signaled)
        {
            minsig = n;
            needSig = stateP;
        }
    }
    segP->minMsgNum = min;

    /*
     * When minMsgNum gets really large, decrement all message counters so as
     * to forestall overflow of the counters.  This happens seldom enough that
     * folding it into the previous loop would be a loser.
     */
    if (min >= MSGNUMWRAPAROUND)
    {
        segP->minMsgNum -= MSGNUMWRAPAROUND;
        segP->maxMsgNum -= MSGNUMWRAPAROUND;
        for (i = 0; i < segP->lastBackend; i++)
        {
            /* we don't bother skipping inactive entries here */
            segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
        }
    }

    /*
     * Determine how many messages are still in the queue, and set the
     * threshold at which we should repeat SICleanupQueue().
     */
    numMsgs = segP->maxMsgNum - segP->minMsgNum;
    if (numMsgs < CLEANUP_MIN)
        segP->nextThreshold = CLEANUP_MIN;
    else
        segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
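
    /*
     * For example, with the default CLEANUP_QUANTUM of 256, numMsgs = 2500
     * rounds the threshold up to (2500 / 256 + 1) * 256 = 2560.
     */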

    /*
     * Lastly, signal anyone who needs a catchup interrupt.  Since
     * SendProcSignal() might not be fast, we don't want to hold locks while
     * executing it.
     */
    if (needSig)
    {
        pid_t       his_pid = needSig->procPid;
        BackendId   his_backendId = (needSig - &segP->procState[0]) + 1;

        needSig->signaled = true;
        LWLockRelease(SInvalReadLock);
        LWLockRelease(SInvalWriteLock);
        elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
        SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
        if (callerHasWriteLock)
            LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    }
    else
    {
        LWLockRelease(SInvalReadLock);
        if (!callerHasWriteLock)
            LWLockRelease(SInvalWriteLock);
    }
}

/*
 * GetNextLocalTransactionId --- allocate a new LocalTransactionId
 *
 * We split VirtualTransactionIds into two parts so that it is possible
 * to allocate a new one without any contention for shared memory, except
 * for a bit of additional overhead during backend startup/shutdown.
 * The high-order part of a VirtualTransactionId is a BackendId, and the
 * low-order part is a LocalTransactionId, which we assign from a local
 * counter.  To avoid the risk of a VirtualTransactionId being reused
 * within a short interval, successive procs occupying the same backend ID
 * slot should use a consecutive sequence of local IDs, which is implemented
 * by copying nextLocalTransactionId as seen above.
 */
LocalTransactionId
GetNextLocalTransactionId(void)
{
    LocalTransactionId result;

    /* loop to avoid returning InvalidLocalTransactionId at wraparound */
    do
    {
        result = nextLocalTransactionId++;
    } while (!LocalTransactionIdIsValid(result));

    return result;
}
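
/*
 * A minimal sketch (hypothetical, for exposition) of how the two halves of a
 * virtual transaction ID fit together, assuming the usual PGPROC fields: the
 * backend ID assigned in SharedInvalBackendInit plus a local ID drawn from
 * this counter identify a transaction without touching shared memory.
 */
#if 0
    MyProc->lxid = GetNextLocalTransactionId();
    /* the pair (MyProc->backendId, MyProc->lxid) is the virtual xact ID */
#endif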