/*-------------------------------------------------------------------------
 *
 * sinvaladt.c
 *    POSTGRES shared cache invalidation segment definitions.
 *
 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.45 2002/03/02 23:35:57 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/sinvaladt.h"

/* Pointer to the shared invalidation segment; set by SIBufferInit() */
SISeg      *shmInvalBuffer;

static void CleanupInvalidationState(int status, Datum arg);
static void SISetProcStateInvalid(SISeg *segP);


/*
 * SInvalShmemSize --- return shared-memory space needed
 *
 * maxBackends is the number of ProcState slots the segment must hold.
 * Returns the size in bytes of an SISeg with that many slots.
 */
int
SInvalShmemSize(int maxBackends)
{
    /*
     * Figure space needed.  Note sizeof(SISeg) includes the first
     * ProcState entry.
     */
    return sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1);
}

/*
 * SIBufferInit
 *      Create and initialize a new SI message buffer
 *
 * Allocates the segment, stores its address in shmInvalBuffer, zeroes the
 * message counters and marks every one of the maxBackends ProcState slots
 * inactive.  Fails with elog(FATAL) if the allocation does not succeed.
 */
void
SIBufferInit(int maxBackends)
{
    int         segSize;
    SISeg      *segP;
    int         i;

    /* Allocate space in shared memory */
    segSize = SInvalShmemSize(maxBackends);
    shmInvalBuffer = segP = (SISeg *) ShmemAlloc(segSize);

    /*
     * Do not touch the segment unless we actually got one; otherwise the
     * stores below would dereference a null pointer.
     *
     * NOTE(review): assumes ShmemAlloc reports exhaustion by returning NULL
     * rather than raising an error itself --- confirm against shmem.c.
     */
    if (segP == NULL)
        elog(FATAL, "SIBufferInit: out of shared memory");

    /* Clear message counters, save size of procState array */
    segP->minMsgNum = 0;
    segP->maxMsgNum = 0;
    segP->lastBackend = 0;
    segP->maxBackends = maxBackends;

    /* The buffer[] array is initially all unused, so we need not fill it */

    /* Mark all backends inactive */
    for (i = 0; i < maxBackends; i++)
    {
        segP->procState[i].nextMsgNum = -1;     /* inactive */
        segP->procState[i].resetState = false;
        segP->procState[i].procStruct = INVALID_OFFSET;
    }
}

/*
 * SIBackendInit
 *      Initialize a new backend to operate on the sinval buffer
 *
 * On success MyBackendId is set to the (1-based) index of the claimed
 * procState slot and an exit callback is registered to release it.  On
 * failure MyBackendId is set to InvalidBackendId.
 *
 * Returns:
 *     >0   A-OK
 *      0   Failed to find a free procState slot (ie, MaxBackends exceeded)
 *     <0   Some other failure (not currently used)
 *
 * NB: this routine, and all following ones, must be executed with the
 * SInvalLock lock held, since there may be multiple backends trying
 * to access the buffer.
 */
int
SIBackendInit(SISeg *segP)
{
    int         index;
    ProcState  *stateP = NULL;

    /* Look for a free entry in the procState array */
    for (index = 0; index < segP->lastBackend; index++)
    {
        if (segP->procState[index].nextMsgNum < 0)      /* inactive slot? */
        {
            stateP = &segP->procState[index];
            break;
        }
    }

    if (stateP == NULL)
    {
        /* No hole among the slots in use; extend the used range if we can */
        if (segP->lastBackend < segP->maxBackends)
        {
            stateP = &segP->procState[segP->lastBackend];
            Assert(stateP->nextMsgNum < 0);
            segP->lastBackend++;
        }
        else
        {
            /* out of procState slots */
            MyBackendId = InvalidBackendId;
            return 0;
        }
    }

    /* Backend IDs are slot index plus one */
    MyBackendId = (stateP - &segP->procState[0]) + 1;

#ifdef INVALIDDEBUG
    elog(DEBUG1, "SIBackendInit: backend id %d", MyBackendId);
#endif   /* INVALIDDEBUG */

    /* mark myself active, with all extant messages already read */
    stateP->nextMsgNum = segP->maxMsgNum;
    stateP->resetState = false;
    stateP->procStruct = MAKE_OFFSET(MyProc);

    /* register exit routine to mark my entry inactive at exit */
    on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));

    return 1;
}

/*
 * CleanupInvalidationState
 *      Mark the current backend as no longer active.
 *
 * This function is called via on_shmem_exit() during backend shutdown,
 * so the caller has NOT acquired the lock for us.
 *
 * status is not used here; arg is really of type "SISeg*".
 */
static void
CleanupInvalidationState(int status, Datum arg)
{
    SISeg      *segP = (SISeg *) DatumGetPointer(arg);
    int         i;

    Assert(PointerIsValid(segP));

    LWLockAcquire(SInvalLock, LW_EXCLUSIVE);

    /* Mark myself inactive */
    segP->procState[MyBackendId - 1].nextMsgNum = -1;
    segP->procState[MyBackendId - 1].resetState = false;
    segP->procState[MyBackendId - 1].procStruct = INVALID_OFFSET;

    /*
     * Recompute index of last active backend: scan downward from the old
     * value until we hit a slot that is still in use (or run out of slots).
     */
    for (i = segP->lastBackend; i > 0; i--)
    {
        if (segP->procState[i - 1].nextMsgNum >= 0)
            break;
    }
    segP->lastBackend = i;

    LWLockRelease(SInvalLock);
}

/*
 * SIInsertDataEntry
 *      Add a new invalidation message to the buffer.
 *
 * If we are unable to insert the message because the buffer is full,
 * then clear the buffer and assert the "reset" flag to each backend.
 * This will cause all the backends to discard *all* invalidatable state.
 *
 * Returns true for normal successful insertion, false if had to reset.
 */
bool
SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data)
{
    int         numMsgs = segP->maxMsgNum - segP->minMsgNum;

    /* Is the buffer full? */
    if (numMsgs >= MAXNUMMESSAGES)
    {
        /*
         * Don't panic just yet: slowest backend might have consumed some
         * messages but not yet have done SIDelExpiredDataEntries() to
         * advance minMsgNum.  So, make sure minMsgNum is up-to-date.
         */
        SIDelExpiredDataEntries(segP);
        numMsgs = segP->maxMsgNum - segP->minMsgNum;
        if (numMsgs >= MAXNUMMESSAGES)
        {
            /* Yup, it's definitely full, no choice but to reset */
            SISetProcStateInvalid(segP);
            return false;
        }
    }

    /*
     * Try to prevent table overflow.  When the table is 70% full send a
     * WAKEN_CHILDREN request to the postmaster.  The postmaster will send
     * a SIGUSR2 signal (ordinarily a NOTIFY signal) to all the backends.
     * This will force idle backends to execute a transaction to look
     * through pg_listener for NOTIFY messages, and as a byproduct of the
     * transaction start they will read SI entries.
     *
     * This should never happen if all the backends are actively executing
     * queries, but if a backend is sitting idle then it won't be starting
     * transactions and so won't be reading SI entries.
     *
     * The test is for exact equality, so the request is issued only when
     * the count sits precisely on the 70% mark, not on every insertion
     * above it.
     *
     * dz - 27 Jan 1998
     */
    if (numMsgs == (MAXNUMMESSAGES * 70 / 100) &&
        IsUnderPostmaster)
    {
        elog(DEBUG3, "SIInsertDataEntry: table is 70%% full, signaling postmaster");
        SendPostmasterSignal(PMSIGNAL_WAKEN_CHILDREN);
    }

    /*
     * Insert new message into proper slot of circular buffer
     */
    segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
    segP->maxMsgNum++;

    return true;
}

/*
 * SISetProcStateInvalid
 *      Flush pending messages from buffer, assert reset flag for each backend
 *
 * Both message counters are returned to zero, and every active backend has
 * its read position rewound to zero and its resetState flag set.
 *
 * This is used only to recover from SI buffer overflow.
 */
static void
SISetProcStateInvalid(SISeg *segP)
{
    int         i;

    segP->minMsgNum = 0;
    segP->maxMsgNum = 0;

    for (i = 0; i < segP->lastBackend; i++)
    {
        if (segP->procState[i].nextMsgNum >= 0)     /* active backend? */
        {
            segP->procState[i].resetState = true;
            segP->procState[i].nextMsgNum = 0;
        }
    }
}

/*
 * SIGetDataEntry
 *      get next SI message for specified backend, if there is one
 *
 * backendId is the 1-based backend ID (as assigned by SIBackendInit).
 *
 * Possible return values:
 *      0: no SI message available
 *      1: next SI message has been extracted into *data
 *         (there may be more messages available after this one!)
 *     -1: SI reset message extracted
 *
 * NB: this can run in parallel with other instances of SIGetDataEntry
 * executing on behalf of other backends.  See comments in sinval.c in
 * ReceiveSharedInvalidMessages().
 */
int
SIGetDataEntry(SISeg *segP, int backendId,
               SharedInvalidationMessage *data)
{
    ProcState  *stateP = &segP->procState[backendId - 1];

    if (stateP->resetState)
    {
        /*
         * Force reset.  We can say we have dealt with any messages added
         * since the reset, as well...
         */
        stateP->resetState = false;
        stateP->nextMsgNum = segP->maxMsgNum;
        return -1;
    }

    if (stateP->nextMsgNum >= segP->maxMsgNum)
        return 0;               /* nothing to read */

    /*
     * Retrieve message and advance my counter.
     */
    *data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
    stateP->nextMsgNum++;

    /*
     * There may be other backends that haven't read the message, so we
     * cannot delete it here.  SIDelExpiredDataEntries() should be called
     * to remove dead messages.
     */
    return 1;                   /* got a message */
}

/*
 * SIDelExpiredDataEntries
 *      Remove messages that have been consumed by all active backends
 *
 * Nothing is physically erased: minMsgNum is simply advanced to the
 * smallest nextMsgNum of any active backend (or to maxMsgNum if there are
 * no active backends).
 */
void
SIDelExpiredDataEntries(SISeg *segP)
{
    int         min,
                i,
                h;

    min = segP->maxMsgNum;
    if (min == segP->minMsgNum)
        return;                 /* fast path if no messages exist */

    /* Recompute minMsgNum = minimum of all backends' nextMsgNum */
    for (i = 0; i < segP->lastBackend; i++)
    {
        h = segP->procState[i].nextMsgNum;
        if (h >= 0)
        {                       /* backend active */
            if (h < min)
                min = h;
        }
    }
    segP->minMsgNum = min;

    /*
     * When minMsgNum gets really large, decrement all message counters so
     * as to forestall overflow of the counters.  Inactive slots (negative
     * nextMsgNum) are left alone so they keep reading as inactive.
     */
    if (min >= MSGNUMWRAPAROUND)
    {
        segP->minMsgNum -= MSGNUMWRAPAROUND;
        segP->maxMsgNum -= MSGNUMWRAPAROUND;
        for (i = 0; i < segP->lastBackend; i++)
        {
            if (segP->procState[i].nextMsgNum >= 0)
                segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
        }
    }
}