mirror of
https://github.com/postgres/postgres.git
synced 2025-05-28 05:21:27 +03:00
522 lines
14 KiB
C
522 lines
14 KiB
C
/*-------------------------------------------------------------------------
 *
 * sinvaladt.c
 *	  POSTGRES shared cache invalidation segment definitions.
 *
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.69 2008/03/18 12:36:43 alvherre Exp $
 *
 *-------------------------------------------------------------------------
 */
|
|
#include "postgres.h"
|
|
|
|
#include "miscadmin.h"
|
|
#include "storage/backendid.h"
|
|
#include "storage/ipc.h"
|
|
#include "storage/lwlock.h"
|
|
#include "storage/pmsignal.h"
|
|
#include "storage/proc.h"
|
|
#include "storage/shmem.h"
|
|
#include "storage/sinvaladt.h"
|
|
|
|
|
|
/*
 * Conceptually, the shared cache invalidation messages are stored in an
 * infinite array, where maxMsgNum is the next array subscript to store a
 * submitted message in, minMsgNum is the smallest array subscript containing a
 * message not yet read by all backends, and we always have maxMsgNum >=
 * minMsgNum.  (They are equal when there are no messages pending.)  For each
 * active backend, there is a nextMsgNum pointer indicating the next message it
 * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
 * backend.
 *
 * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
 * entries.  We translate MsgNum values into circular-buffer indexes by
 * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
 * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
 * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
 * in the buffer.  If the buffer does overflow, we reset it to empty and
 * force each backend to "reset", i.e., discard all its invalidatable state.
 *
 * We would have problems if the MsgNum values overflow an integer, so
 * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
 * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
 * large so that we don't need to do this often.  It must be a multiple of
 * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
 * to be moved when we do it.
 */
|
|
|
|
|
|
/*
 * Configurable parameters.
 *
 * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
 * Must be a power of 2 for speed (message numbers are mapped to buffer
 * slots with "% MAXNUMMESSAGES").
 *
 * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
 * Must be a multiple of MAXNUMMESSAGES.  Should be large.
 */

#define MAXNUMMESSAGES 4096
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 4096)

/*
 * Subtracting MSGNUMWRAPAROUND from a message number must leave its
 * "% MAXNUMMESSAGES" buffer slot unchanged, otherwise buffered messages
 * would be looked up in the wrong slot after a wraparound adjustment.
 * Reject a misconfiguration at compile time instead of corrupting the
 * queue at run time.
 */
#if (MSGNUMWRAPAROUND % MAXNUMMESSAGES) != 0
#error "MSGNUMWRAPAROUND must be a multiple of MAXNUMMESSAGES"
#endif
|
|
|
|
/*
 * Per-backend state in shared invalidation structure.
 *
 * One entry exists per backend slot.  A slot that no backend currently
 * occupies is recognizable by a negative nextMsgNum.
 */
typedef struct ProcState
{
	int			nextMsgNum;		/* number of the next message this backend
								 * must read; -1 marks an inactive entry */
	bool		resetState;		/* set when the backend must discard all of
								 * its invalidatable state */
} ProcState;
|
|
|
|
/* Shared cache invalidation memory segment */
|
|
typedef struct SISeg
|
|
{
|
|
/*
|
|
* General state information
|
|
*/
|
|
int minMsgNum; /* oldest message still needed */
|
|
int maxMsgNum; /* next message number to be assigned */
|
|
int lastBackend; /* index of last active procState entry, +1 */
|
|
int maxBackends; /* size of procState array */
|
|
int freeBackends; /* number of empty procState slots */
|
|
|
|
/*
|
|
* Next LocalTransactionId to use for each idle backend slot. We keep
|
|
* this here because it is indexed by BackendId and it is convenient to
|
|
* copy the value to and from local memory when MyBackendId is set.
|
|
*/
|
|
LocalTransactionId *nextLXID; /* array of maxBackends entries */
|
|
|
|
/*
|
|
* Circular buffer holding shared-inval messages
|
|
*/
|
|
SharedInvalidationMessage buffer[MAXNUMMESSAGES];
|
|
|
|
/*
|
|
* Per-backend state info.
|
|
*
|
|
* We declare procState as 1 entry because C wants a fixed-size array, but
|
|
* actually it is maxBackends entries long.
|
|
*/
|
|
ProcState procState[1]; /* reflects the invalidation state */
|
|
} SISeg;
|
|
|
|
static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */
|
|
|
|
|
|
static LocalTransactionId nextLocalTransactionId;
|
|
|
|
static void CleanupInvalidationState(int status, Datum arg);
|
|
static void SISetProcStateInvalid(SISeg *segP);
|
|
|
|
|
|
/*
|
|
* SInvalShmemSize --- return shared-memory space needed
|
|
*/
|
|
Size
|
|
SInvalShmemSize(void)
|
|
{
|
|
Size size;
|
|
|
|
size = offsetof(SISeg, procState);
|
|
size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
|
|
|
|
size = add_size(size, mul_size(sizeof(LocalTransactionId), MaxBackends));
|
|
|
|
return size;
|
|
}
|
|
|
|
/*
|
|
* SharedInvalBufferInit
|
|
* Create and initialize the SI message buffer
|
|
*/
|
|
void
|
|
CreateSharedInvalidationState(void)
|
|
{
|
|
Size size;
|
|
int i;
|
|
bool found;
|
|
|
|
/* Allocate space in shared memory */
|
|
size = offsetof(SISeg, procState);
|
|
size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
|
|
|
|
shmInvalBuffer = (SISeg *)
|
|
ShmemInitStruct("shmInvalBuffer", size, &found);
|
|
if (found)
|
|
return;
|
|
|
|
shmInvalBuffer->nextLXID = ShmemAlloc(sizeof(LocalTransactionId) * MaxBackends);
|
|
|
|
/* Clear message counters, save size of procState array */
|
|
shmInvalBuffer->minMsgNum = 0;
|
|
shmInvalBuffer->maxMsgNum = 0;
|
|
shmInvalBuffer->lastBackend = 0;
|
|
shmInvalBuffer->maxBackends = MaxBackends;
|
|
shmInvalBuffer->freeBackends = MaxBackends;
|
|
|
|
/* The buffer[] array is initially all unused, so we need not fill it */
|
|
|
|
/* Mark all backends inactive, and initialize nextLXID */
|
|
for (i = 0; i < shmInvalBuffer->maxBackends; i++)
|
|
{
|
|
shmInvalBuffer->procState[i].nextMsgNum = -1; /* inactive */
|
|
shmInvalBuffer->procState[i].resetState = false;
|
|
shmInvalBuffer->nextLXID[i] = InvalidLocalTransactionId;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* SharedInvalBackendInit
|
|
* Initialize a new backend to operate on the sinval buffer
|
|
*/
|
|
void
|
|
SharedInvalBackendInit(void)
|
|
{
|
|
int index;
|
|
ProcState *stateP = NULL;
|
|
SISeg *segP = shmInvalBuffer;
|
|
|
|
LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
|
|
|
|
/* Look for a free entry in the procState array */
|
|
for (index = 0; index < segP->lastBackend; index++)
|
|
{
|
|
if (segP->procState[index].nextMsgNum < 0) /* inactive slot? */
|
|
{
|
|
stateP = &segP->procState[index];
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (stateP == NULL)
|
|
{
|
|
if (segP->lastBackend < segP->maxBackends)
|
|
{
|
|
stateP = &segP->procState[segP->lastBackend];
|
|
Assert(stateP->nextMsgNum < 0);
|
|
segP->lastBackend++;
|
|
}
|
|
else
|
|
{
|
|
/*
|
|
* out of procState slots: MaxBackends exceeded -- report normally
|
|
*/
|
|
MyBackendId = InvalidBackendId;
|
|
LWLockRelease(SInvalLock);
|
|
ereport(FATAL,
|
|
(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
|
|
errmsg("sorry, too many clients already")));
|
|
}
|
|
}
|
|
|
|
MyBackendId = (stateP - &segP->procState[0]) + 1;
|
|
|
|
#ifdef INVALIDDEBUG
|
|
elog(DEBUG2, "my backend id is %d", MyBackendId);
|
|
#endif /* INVALIDDEBUG */
|
|
|
|
/* Advertise assigned backend ID in MyProc */
|
|
MyProc->backendId = MyBackendId;
|
|
|
|
/* Reduce free slot count */
|
|
segP->freeBackends--;
|
|
|
|
/* Fetch next local transaction ID into local memory */
|
|
nextLocalTransactionId = segP->nextLXID[MyBackendId - 1];
|
|
|
|
/* mark myself active, with all extant messages already read */
|
|
stateP->nextMsgNum = segP->maxMsgNum;
|
|
stateP->resetState = false;
|
|
|
|
LWLockRelease(SInvalLock);
|
|
|
|
/* register exit routine to mark my entry inactive at exit */
|
|
on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
|
|
}
|
|
|
|
/*
|
|
* CleanupInvalidationState
|
|
* Mark the current backend as no longer active.
|
|
*
|
|
* This function is called via on_shmem_exit() during backend shutdown,
|
|
* so the caller has NOT acquired the lock for us.
|
|
*
|
|
* arg is really of type "SISeg*".
|
|
*/
|
|
static void
|
|
CleanupInvalidationState(int status, Datum arg)
|
|
{
|
|
SISeg *segP = (SISeg *) DatumGetPointer(arg);
|
|
int i;
|
|
|
|
Assert(PointerIsValid(segP));
|
|
|
|
LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
|
|
|
|
/* Update next local transaction ID for next holder of this backendID */
|
|
segP->nextLXID[MyBackendId - 1] = nextLocalTransactionId;
|
|
|
|
/* Mark myself inactive */
|
|
segP->procState[MyBackendId - 1].nextMsgNum = -1;
|
|
segP->procState[MyBackendId - 1].resetState = false;
|
|
|
|
/* Recompute index of last active backend */
|
|
for (i = segP->lastBackend; i > 0; i--)
|
|
{
|
|
if (segP->procState[i - 1].nextMsgNum >= 0)
|
|
break;
|
|
}
|
|
segP->lastBackend = i;
|
|
|
|
/* Adjust free slot count */
|
|
segP->freeBackends++;
|
|
|
|
LWLockRelease(SInvalLock);
|
|
}
|
|
|
|
/*
|
|
* SIInsertDataEntry
|
|
* Add a new invalidation message to the buffer.
|
|
*
|
|
* If we are unable to insert the message because the buffer is full,
|
|
* then clear the buffer and assert the "reset" flag to each backend.
|
|
* This will cause all the backends to discard *all* invalidatable state.
|
|
*
|
|
* Returns true for normal successful insertion, false if had to reset.
|
|
*/
|
|
bool
|
|
SIInsertDataEntry(SharedInvalidationMessage *data)
|
|
{
|
|
int numMsgs;
|
|
bool signal_postmaster = false;
|
|
SISeg *segP;
|
|
|
|
LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
|
|
|
|
segP = shmInvalBuffer;
|
|
numMsgs = segP->maxMsgNum - segP->minMsgNum;
|
|
|
|
/* Is the buffer full? */
|
|
if (numMsgs >= MAXNUMMESSAGES)
|
|
{
|
|
/*
|
|
* Don't panic just yet: slowest backend might have consumed some
|
|
* messages but not yet have done SIDelExpiredDataEntries() to advance
|
|
* minMsgNum. So, make sure minMsgNum is up-to-date.
|
|
*/
|
|
SIDelExpiredDataEntries(true);
|
|
numMsgs = segP->maxMsgNum - segP->minMsgNum;
|
|
if (numMsgs >= MAXNUMMESSAGES)
|
|
{
|
|
/* Yup, it's definitely full, no choice but to reset */
|
|
SISetProcStateInvalid(segP);
|
|
LWLockRelease(SInvalLock);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Try to prevent table overflow. When the table is 70% full send a
|
|
* WAKEN_CHILDREN request to the postmaster. The postmaster will send a
|
|
* SIGUSR1 signal to all the backends, which will cause sinval.c to read
|
|
* any pending SI entries.
|
|
*
|
|
* This should never happen if all the backends are actively executing
|
|
* queries, but if a backend is sitting idle then it won't be starting
|
|
* transactions and so won't be reading SI entries.
|
|
*/
|
|
if (numMsgs == (MAXNUMMESSAGES * 70 / 100) && IsUnderPostmaster)
|
|
signal_postmaster = true;
|
|
|
|
/*
|
|
* Insert new message into proper slot of circular buffer
|
|
*/
|
|
segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
|
|
segP->maxMsgNum++;
|
|
|
|
LWLockRelease(SInvalLock);
|
|
|
|
if (signal_postmaster)
|
|
{
|
|
elog(DEBUG4, "SI table is 70%% full, signaling postmaster");
|
|
SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN);
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
/*
|
|
* SISetProcStateInvalid
|
|
* Flush pending messages from buffer, assert reset flag for each backend
|
|
*
|
|
* This is used only to recover from SI buffer overflow.
|
|
*/
|
|
static void
|
|
SISetProcStateInvalid(SISeg *segP)
|
|
{
|
|
int i;
|
|
|
|
segP->minMsgNum = 0;
|
|
segP->maxMsgNum = 0;
|
|
|
|
for (i = 0; i < segP->lastBackend; i++)
|
|
{
|
|
if (segP->procState[i].nextMsgNum >= 0) /* active backend? */
|
|
{
|
|
segP->procState[i].resetState = true;
|
|
segP->procState[i].nextMsgNum = 0;
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
* SIGetDataEntry
|
|
* get next SI message for specified backend, if there is one
|
|
*
|
|
* Possible return values:
|
|
* 0: no SI message available
|
|
* 1: next SI message has been extracted into *data
|
|
* (there may be more messages available after this one!)
|
|
* -1: SI reset message extracted
|
|
*
|
|
* NB: this can run in parallel with other instances of SIGetDataEntry
|
|
* executing on behalf of other backends, since each instance will modify only
|
|
* fields of its own backend's ProcState, and no instance will look at fields
|
|
* of other backends' ProcStates. We express this by grabbing SInvalLock in
|
|
* shared mode. Note that this is not exactly the normal (read-only)
|
|
* interpretation of a shared lock! Look closely at the interactions before
|
|
* allowing SInvalLock to be grabbed in shared mode for any other reason!
|
|
*/
|
|
int
|
|
SIGetDataEntry(int backendId, SharedInvalidationMessage *data)
|
|
{
|
|
ProcState *stateP;
|
|
SISeg *segP;
|
|
|
|
LWLockAcquire(SInvalLock, LW_SHARED);
|
|
|
|
segP = shmInvalBuffer;
|
|
stateP = &segP->procState[backendId - 1];
|
|
|
|
if (stateP->resetState)
|
|
{
|
|
/*
|
|
* Force reset. We can say we have dealt with any messages added
|
|
* since the reset, as well...
|
|
*/
|
|
stateP->resetState = false;
|
|
stateP->nextMsgNum = segP->maxMsgNum;
|
|
LWLockRelease(SInvalLock);
|
|
return -1;
|
|
}
|
|
|
|
if (stateP->nextMsgNum >= segP->maxMsgNum)
|
|
{
|
|
LWLockRelease(SInvalLock);
|
|
return 0; /* nothing to read */
|
|
}
|
|
|
|
/*
|
|
* Retrieve message and advance my counter.
|
|
*/
|
|
*data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
|
|
stateP->nextMsgNum++;
|
|
|
|
/*
|
|
* There may be other backends that haven't read the message, so we cannot
|
|
* delete it here. SIDelExpiredDataEntries() should be called to remove
|
|
* dead messages.
|
|
*/
|
|
|
|
LWLockRelease(SInvalLock);
|
|
return 1; /* got a message */
|
|
}
|
|
|
|
/*
|
|
* SIDelExpiredDataEntries
|
|
* Remove messages that have been consumed by all active backends
|
|
*/
|
|
void
|
|
SIDelExpiredDataEntries(bool locked)
|
|
{
|
|
SISeg *segP = shmInvalBuffer;
|
|
int min,
|
|
i,
|
|
h;
|
|
|
|
if (!locked)
|
|
LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
|
|
|
|
min = segP->maxMsgNum;
|
|
if (min == segP->minMsgNum)
|
|
{
|
|
if (!locked)
|
|
LWLockRelease(SInvalLock);
|
|
return; /* fast path if no messages exist */
|
|
}
|
|
|
|
/* Recompute minMsgNum = minimum of all backends' nextMsgNum */
|
|
|
|
for (i = 0; i < segP->lastBackend; i++)
|
|
{
|
|
h = segP->procState[i].nextMsgNum;
|
|
if (h >= 0)
|
|
{ /* backend active */
|
|
if (h < min)
|
|
min = h;
|
|
}
|
|
}
|
|
segP->minMsgNum = min;
|
|
|
|
/*
|
|
* When minMsgNum gets really large, decrement all message counters so as
|
|
* to forestall overflow of the counters.
|
|
*/
|
|
if (min >= MSGNUMWRAPAROUND)
|
|
{
|
|
segP->minMsgNum -= MSGNUMWRAPAROUND;
|
|
segP->maxMsgNum -= MSGNUMWRAPAROUND;
|
|
for (i = 0; i < segP->lastBackend; i++)
|
|
{
|
|
if (segP->procState[i].nextMsgNum >= 0)
|
|
segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
|
|
}
|
|
}
|
|
|
|
if (!locked)
|
|
LWLockRelease(SInvalLock);
|
|
}
|
|
|
|
|
|
/*
|
|
* GetNextLocalTransactionId --- allocate a new LocalTransactionId
|
|
*
|
|
* We split VirtualTransactionIds into two parts so that it is possible
|
|
* to allocate a new one without any contention for shared memory, except
|
|
* for a bit of additional overhead during backend startup/shutdown.
|
|
* The high-order part of a VirtualTransactionId is a BackendId, and the
|
|
* low-order part is a LocalTransactionId, which we assign from a local
|
|
* counter. To avoid the risk of a VirtualTransactionId being reused
|
|
* within a short interval, successive procs occupying the same backend ID
|
|
* slot should use a consecutive sequence of local IDs, which is implemented
|
|
* by copying nextLocalTransactionId as seen above.
|
|
*/
|
|
LocalTransactionId
|
|
GetNextLocalTransactionId(void)
|
|
{
|
|
LocalTransactionId result;
|
|
|
|
/* loop to avoid returning InvalidLocalTransactionId at wraparound */
|
|
do
|
|
{
|
|
result = nextLocalTransactionId++;
|
|
} while (!LocalTransactionIdIsValid(result));
|
|
|
|
return result;
|
|
}
|