1
0
mirror of https://github.com/postgres/postgres.git synced 2025-08-31 17:02:12 +03:00

Split the shared-memory array of PGPROC pointers out of the sinval

communication structure, and make it its own module with its own lock.
This should reduce contention at least a little, and it definitely makes
the code seem cleaner.  Per my recent proposal.
This commit is contained in:
Tom Lane
2005-05-19 21:35:48 +00:00
parent 6910032a56
commit ee3b71f6bc
30 changed files with 923 additions and 860 deletions

View File

@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/ipc/sinval.c,v 1.75 2004/12/31 22:00:56 pgsql Exp $
* $PostgreSQL: pgsql/src/backend/storage/ipc/sinval.c,v 1.76 2005/05/19 21:35:46 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -16,41 +16,17 @@
#include <signal.h>
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/xact.h"
#include "commands/async.h"
#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/sinval.h"
#include "storage/sinvaladt.h"
#include "utils/inval.h"
#include "utils/tqual.h"
#include "miscadmin.h"
#ifdef XIDCACHE_DEBUG
/* counters for XidCache measurement */
static long xc_by_recent_xmin = 0;
static long xc_by_main_xid = 0;
static long xc_by_child_xid = 0;
static long xc_slow_answer = 0;
#define xc_by_recent_xmin_inc() (xc_by_recent_xmin++)
#define xc_by_main_xid_inc() (xc_by_main_xid++)
#define xc_by_child_xid_inc() (xc_by_child_xid++)
#define xc_slow_answer_inc() (xc_slow_answer++)
static void DisplayXidCache(int code, Datum arg);
#else /* !XIDCACHE_DEBUG */
#define xc_by_recent_xmin_inc() ((void) 0)
#define xc_by_main_xid_inc() ((void) 0)
#define xc_by_child_xid_inc() ((void) 0)
#define xc_slow_answer_inc() ((void) 0)
#endif /* XIDCACHE_DEBUG */
/*
* Because backends sitting idle will not be reading sinval events, we
* need a way to give an idle backend a swift kick in the rear and make
@@ -103,10 +79,6 @@ InitBackendSharedInvalidationState(void)
ereport(FATAL,
(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
errmsg("sorry, too many clients already")));
#ifdef XIDCACHE_DEBUG
on_proc_exit(DisplayXidCache, (Datum) 0);
#endif /* XIDCACHE_DEBUG */
}
/*
@@ -161,12 +133,6 @@ ReceiveSharedInvalidMessages(
* this is not exactly the normal (read-only) interpretation of a
* shared lock! Look closely at the interactions before allowing
* SInvalLock to be grabbed in shared mode for any other reason!
*
* The routines later in this file that use shared mode are okay with
* this, because they aren't looking at the ProcState fields
* associated with SI message transfer; they only use the
* ProcState array as an easy way to find all the PGPROC
* structures.
*/
LWLockAcquire(SInvalLock, LW_SHARED);
getResult = SIGetDataEntry(shmInvalBuffer, MyBackendId, &data);
@@ -391,725 +357,3 @@ ProcessCatchupEvent(void)
if (notify_enabled)
EnableNotifyInterrupt();
}
/****************************************************************************/
/* Functions that need to scan the PGPROC structures of all running backends. */
/* It's a bit strange to keep these in sinval.c, since they don't have any */
/* direct relationship to shared-cache invalidation. But the procState */
/* array in the SI segment is the only place in the system where we have */
/* an array of per-backend data, so it is the most convenient place to keep */
/* pointers to the backends' PGPROC structures. We used to implement these */
/* functions with a slow, ugly search through the ShmemIndex hash table --- */
/* now they are simple loops over the SI ProcState array. */
/****************************************************************************/
/*
* DatabaseHasActiveBackends -- are there any backends running in the given DB
*
* If 'ignoreMyself' is TRUE, ignore this particular backend while checking
* for backends in the target database.
*
* This function is used to interlock DROP DATABASE against there being
* any active backends in the target DB --- dropping the DB while active
* backends remain would be a Bad Thing. Note that we cannot detect here
* the possibility of a newly-started backend that is trying to connect
* to the doomed database, so additional interlocking is needed during
* backend startup.
*/
bool
DatabaseHasActiveBackends(Oid databaseId, bool ignoreMyself)
{
bool result = false;
SISeg *segP = shmInvalBuffer;
ProcState *stateP = segP->procState;
int index;
LWLockAcquire(SInvalLock, LW_SHARED);
for (index = 0; index < segP->lastBackend; index++)
{
SHMEM_OFFSET pOffset = stateP[index].procStruct;
if (pOffset != INVALID_OFFSET)
{
PGPROC *proc = (PGPROC *) MAKE_PTR(pOffset);
if (proc->databaseId == databaseId)
{
if (ignoreMyself && proc == MyProc)
continue;
result = true;
break;
}
}
}
LWLockRelease(SInvalLock);
return result;
}
/*
* IsBackendPid -- is a given pid a running backend
*/
bool
IsBackendPid(int pid)
{
bool result = false;
SISeg *segP = shmInvalBuffer;
ProcState *stateP = segP->procState;
int index;
LWLockAcquire(SInvalLock, LW_SHARED);
for (index = 0; index < segP->lastBackend; index++)
{
SHMEM_OFFSET pOffset = stateP[index].procStruct;
if (pOffset != INVALID_OFFSET)
{
PGPROC *proc = (PGPROC *) MAKE_PTR(pOffset);
if (proc->pid == pid)
{
result = true;
break;
}
}
}
LWLockRelease(SInvalLock);
return result;
}
/*
* TransactionIdIsInProgress -- is given transaction running in some backend
*
* There are three possibilities for finding a running transaction:
*
* 1. the given Xid is a main transaction Id. We will find this out cheaply
* by looking at the PGPROC struct for each backend.
*
* 2. the given Xid is one of the cached subxact Xids in the PGPROC array.
* We can find this out cheaply too.
*
* 3. Search the SubTrans tree to find the Xid's topmost parent, and then
* see if that is running according to PGPROC. This is the slowest, but
* sadly it has to be done always if the other two failed, unless we see
* that the cached subxact sets are complete (none have overflowed).
*
* SInvalLock has to be held while we do 1 and 2. If we save the top Xids
* while doing 1, we can release the SInvalLock while we do 3. This buys back
* some concurrency (we can't retrieve the main Xids from PGPROC again anyway;
* see GetNewTransactionId).
*/
bool
TransactionIdIsInProgress(TransactionId xid)
{
bool result = false;
SISeg *segP = shmInvalBuffer;
ProcState *stateP = segP->procState;
int i,
j;
int nxids = 0;
TransactionId *xids;
TransactionId topxid;
bool locked;
/*
* Don't bother checking a transaction older than RecentXmin; it
* could not possibly still be running.
*/
if (TransactionIdPrecedes(xid, RecentXmin))
{
xc_by_recent_xmin_inc();
return false;
}
/* Get workspace to remember main XIDs in */
xids = (TransactionId *) palloc(sizeof(TransactionId) * segP->maxBackends);
LWLockAcquire(SInvalLock, LW_SHARED);
locked = true;
for (i = 0; i < segP->lastBackend; i++)
{
SHMEM_OFFSET pOffset = stateP[i].procStruct;
if (pOffset != INVALID_OFFSET)
{
PGPROC *proc = (PGPROC *) MAKE_PTR(pOffset);
/* Fetch xid just once - see GetNewTransactionId */
TransactionId pxid = proc->xid;
if (!TransactionIdIsValid(pxid))
continue;
/*
* Step 1: check the main Xid
*/
if (TransactionIdEquals(pxid, xid))
{
xc_by_main_xid_inc();
result = true;
goto result_known;
}
/*
* We can ignore main Xids that are younger than the target
* Xid, since the target could not possibly be their child.
*/
if (TransactionIdPrecedes(xid, pxid))
continue;
/*
* Step 2: check the cached child-Xids arrays
*/
for (j = proc->subxids.nxids - 1; j >= 0; j--)
{
/* Fetch xid just once - see GetNewTransactionId */
TransactionId cxid = proc->subxids.xids[j];
if (TransactionIdEquals(cxid, xid))
{
xc_by_child_xid_inc();
result = true;
goto result_known;
}
}
/*
* Save the main Xid for step 3. We only need to remember
* main Xids that have uncached children. (Note: there is no
* race condition here because the overflowed flag cannot be
* cleared, only set, while we hold SInvalLock. So we can't
* miss an Xid that we need to worry about.)
*/
if (proc->subxids.overflowed)
xids[nxids++] = pxid;
}
}
LWLockRelease(SInvalLock);
locked = false;
/*
* If none of the relevant caches overflowed, we know the Xid is not
* running without looking at pg_subtrans.
*/
if (nxids == 0)
goto result_known;
/*
* Step 3: have to check pg_subtrans.
*
* At this point, we know it's either a subtransaction of one of the Xids
* in xids[], or it's not running. If it's an already-failed
* subtransaction, we want to say "not running" even though its parent
* may still be running. So first, check pg_clog to see if it's been
* aborted.
*/
xc_slow_answer_inc();
if (TransactionIdDidAbort(xid))
goto result_known;
/*
* It isn't aborted, so check whether the transaction tree it belongs
* to is still running (or, more precisely, whether it was running
* when this routine started -- note that we already released
* SInvalLock).
*/
topxid = SubTransGetTopmostTransaction(xid);
Assert(TransactionIdIsValid(topxid));
if (!TransactionIdEquals(topxid, xid))
{
for (i = 0; i < nxids; i++)
{
if (TransactionIdEquals(xids[i], topxid))
{
result = true;
break;
}
}
}
result_known:
if (locked)
LWLockRelease(SInvalLock);
pfree(xids);
return result;
}
/*
* GetOldestXmin -- returns oldest transaction that was running
* when any current transaction was started.
*
* If allDbs is TRUE then all backends are considered; if allDbs is FALSE
* then only backends running in my own database are considered.
*
* This is used by VACUUM to decide which deleted tuples must be preserved
* in a table. allDbs = TRUE is needed for shared relations, but allDbs =
* FALSE is sufficient for non-shared relations, since only backends in my
* own database could ever see the tuples in them.
*
* This is also used to determine where to truncate pg_subtrans. allDbs
* must be TRUE for that case.
*
* Note: we include the currently running xids in the set of considered xids.
* This ensures that if a just-started xact has not yet set its snapshot,
* when it does set the snapshot it cannot set xmin less than what we compute.
*/
TransactionId
GetOldestXmin(bool allDbs)
{
SISeg *segP = shmInvalBuffer;
ProcState *stateP = segP->procState;
TransactionId result;
int index;
/*
* Normally we start the min() calculation with our own XID. But if
* called by checkpointer, we will not be inside a transaction, so use
* next XID as starting point for min() calculation. (Note that if
* there are no xacts running at all, that will be the subtrans
* truncation point!)
*/
if (IsTransactionState())
result = GetTopTransactionId();
else
result = ReadNewTransactionId();
LWLockAcquire(SInvalLock, LW_SHARED);
for (index = 0; index < segP->lastBackend; index++)
{
SHMEM_OFFSET pOffset = stateP[index].procStruct;
if (pOffset != INVALID_OFFSET)
{
PGPROC *proc = (PGPROC *) MAKE_PTR(pOffset);
if (allDbs || proc->databaseId == MyDatabaseId)
{
/* Fetch xid just once - see GetNewTransactionId */
TransactionId xid = proc->xid;
if (TransactionIdIsNormal(xid))
{
if (TransactionIdPrecedes(xid, result))
result = xid;
xid = proc->xmin;
if (TransactionIdIsNormal(xid))
if (TransactionIdPrecedes(xid, result))
result = xid;
}
}
}
}
LWLockRelease(SInvalLock);
return result;
}
/*----------
* GetSnapshotData -- returns information about running transactions.
*
* The returned snapshot includes xmin (lowest still-running xact ID),
* xmax (next xact ID to be assigned), and a list of running xact IDs
* in the range xmin <= xid < xmax. It is used as follows:
* All xact IDs < xmin are considered finished.
* All xact IDs >= xmax are considered still running.
* For an xact ID xmin <= xid < xmax, consult list to see whether
* it is considered running or not.
* This ensures that the set of transactions seen as "running" by the
* current xact will not change after it takes the snapshot.
*
* Note that only top-level XIDs are included in the snapshot. We can
* still apply the xmin and xmax limits to subtransaction XIDs, but we
* need to work a bit harder to see if XIDs in [xmin..xmax) are running.
*
* We also update the following backend-global variables:
* TransactionXmin: the oldest xmin of any snapshot in use in the
* current transaction (this is the same as MyProc->xmin). This
* is just the xmin computed for the first, serializable snapshot.
* RecentXmin: the xmin computed for the most recent snapshot. XIDs
* older than this are known not running any more.
* RecentGlobalXmin: the global xmin (oldest TransactionXmin across all
* running transactions). This is the same computation done by
* GetOldestXmin(TRUE).
*----------
*/
Snapshot
GetSnapshotData(Snapshot snapshot, bool serializable)
{
SISeg *segP = shmInvalBuffer;
ProcState *stateP = segP->procState;
TransactionId xmin;
TransactionId xmax;
TransactionId globalxmin;
int index;
int count = 0;
Assert(snapshot != NULL);
/* Serializable snapshot must be computed before any other... */
Assert(serializable ?
!TransactionIdIsValid(MyProc->xmin) :
TransactionIdIsValid(MyProc->xmin));
/*
* Allocating space for MaxBackends xids is usually overkill;
* lastBackend would be sufficient. But it seems better to do the
* malloc while not holding the lock, so we can't look at lastBackend.
*
* This does open a possibility for avoiding repeated malloc/free: since
* MaxBackends does not change at runtime, we can simply reuse the
* previous xip array if any. (This relies on the fact that all
* callers pass static SnapshotData structs.)
*/
if (snapshot->xip == NULL)
{
/*
* First call for this snapshot
*/
snapshot->xip = (TransactionId *)
malloc(MaxBackends * sizeof(TransactionId));
if (snapshot->xip == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
}
globalxmin = xmin = GetTopTransactionId();
/*
* If we are going to set MyProc->xmin then we'd better get exclusive
* lock; if not, this is a read-only operation so it can be shared.
*/
LWLockAcquire(SInvalLock, serializable ? LW_EXCLUSIVE : LW_SHARED);
/*--------------------
* Unfortunately, we have to call ReadNewTransactionId() after acquiring
* SInvalLock above. It's not good because ReadNewTransactionId() does
* LWLockAcquire(XidGenLock), but *necessary*. We need to be sure that
* no transactions exit the set of currently-running transactions
* between the time we fetch xmax and the time we finish building our
* snapshot. Otherwise we could have a situation like this:
*
* 1. Tx Old is running (in Read Committed mode).
* 2. Tx S reads new transaction ID into xmax, then
* is swapped out before acquiring SInvalLock.
* 3. Tx New gets new transaction ID (>= S' xmax),
* makes changes and commits.
* 4. Tx Old changes some row R changed by Tx New and commits.
* 5. Tx S finishes getting its snapshot data. It sees Tx Old as
* done, but sees Tx New as still running (since New >= xmax).
*
* Now S will see R changed by both Tx Old and Tx New, *but* does not
* see other changes made by Tx New. If S is supposed to be in
* Serializable mode, this is wrong.
*
* By locking SInvalLock before we read xmax, we ensure that TX Old
* cannot exit the set of running transactions seen by Tx S. Therefore
* both Old and New will be seen as still running => no inconsistency.
*--------------------
*/
xmax = ReadNewTransactionId();
for (index = 0; index < segP->lastBackend; index++)
{
SHMEM_OFFSET pOffset = stateP[index].procStruct;
if (pOffset != INVALID_OFFSET)
{
PGPROC *proc = (PGPROC *) MAKE_PTR(pOffset);
/* Fetch xid just once - see GetNewTransactionId */
TransactionId xid = proc->xid;
/*
* Ignore my own proc (dealt with my xid above), procs not
* running a transaction, and xacts started since we read the
* next transaction ID. There's no need to store XIDs above
* what we got from ReadNewTransactionId, since we'll treat
* them as running anyway. We also assume that such xacts
* can't compute an xmin older than ours, so they needn't be
* considered in computing globalxmin.
*/
if (proc == MyProc ||
!TransactionIdIsNormal(xid) ||
TransactionIdFollowsOrEquals(xid, xmax))
continue;
if (TransactionIdPrecedes(xid, xmin))
xmin = xid;
snapshot->xip[count] = xid;
count++;
/* Update globalxmin to be the smallest valid xmin */
xid = proc->xmin;
if (TransactionIdIsNormal(xid))
if (TransactionIdPrecedes(xid, globalxmin))
globalxmin = xid;
}
}
if (serializable)
MyProc->xmin = TransactionXmin = xmin;
LWLockRelease(SInvalLock);
/*
* Update globalxmin to include actual process xids. This is a
* slightly different way of computing it than GetOldestXmin uses, but
* should give the same result.
*/
if (TransactionIdPrecedes(xmin, globalxmin))
globalxmin = xmin;
/* Update global variables too */
RecentGlobalXmin = globalxmin;
RecentXmin = xmin;
snapshot->xmin = xmin;
snapshot->xmax = xmax;
snapshot->xcnt = count;
snapshot->curcid = GetCurrentCommandId();
return snapshot;
}
/*
* CountActiveBackends --- count backends (other than myself) that are in
* active transactions. This is used as a heuristic to decide if
* a pre-XLOG-flush delay is worthwhile during commit.
*
* An active transaction is something that has written at least one XLOG
* record; read-only transactions don't count. Also, do not count backends
* that are blocked waiting for locks, since they are not going to get to
* run until someone else commits.
*/
int
CountActiveBackends(void)
{
SISeg *segP = shmInvalBuffer;
ProcState *stateP = segP->procState;
int count = 0;
int index;
/*
* Note: for speed, we don't acquire SInvalLock. This is a little bit
* bogus, but since we are only testing xrecoff for zero or nonzero,
* it should be OK. The result is only used for heuristic purposes
* anyway...
*/
for (index = 0; index < segP->lastBackend; index++)
{
SHMEM_OFFSET pOffset = stateP[index].procStruct;
if (pOffset != INVALID_OFFSET)
{
PGPROC *proc = (PGPROC *) MAKE_PTR(pOffset);
if (proc == MyProc)
continue; /* do not count myself */
if (proc->logRec.xrecoff == 0)
continue; /* do not count if not in a transaction */
if (proc->waitLock != NULL)
continue; /* do not count if blocked on a lock */
count++;
}
}
return count;
}
#ifdef NOT_USED
/*
* GetUndoRecPtr -- returns oldest PGPROC->logRec.
*/
XLogRecPtr
GetUndoRecPtr(void)
{
SISeg *segP = shmInvalBuffer;
ProcState *stateP = segP->procState;
XLogRecPtr urec = {0, 0};
XLogRecPtr tempr;
int index;
LWLockAcquire(SInvalLock, LW_SHARED);
for (index = 0; index < segP->lastBackend; index++)
{
SHMEM_OFFSET pOffset = stateP[index].procStruct;
if (pOffset != INVALID_OFFSET)
{
PGPROC *proc = (PGPROC *) MAKE_PTR(pOffset);
tempr = proc->logRec;
if (tempr.xrecoff == 0)
continue;
if (urec.xrecoff != 0 && XLByteLT(urec, tempr))
continue;
urec = tempr;
}
}
LWLockRelease(SInvalLock);
return (urec);
}
#endif /* NOT_USED */
/*
* BackendIdGetProc - given a BackendId, find its PGPROC structure
*
* This is a trivial lookup in the ProcState array. We assume that the caller
* knows that the backend isn't going to go away, so we do not bother with
* locking.
*/
struct PGPROC *
BackendIdGetProc(BackendId procId)
{
SISeg *segP = shmInvalBuffer;
if (procId > 0 && procId <= segP->lastBackend)
{
ProcState *stateP = &segP->procState[procId - 1];
SHMEM_OFFSET pOffset = stateP->procStruct;
if (pOffset != INVALID_OFFSET)
{
PGPROC *proc = (PGPROC *) MAKE_PTR(pOffset);
return proc;
}
}
return NULL;
}
/*
* CountEmptyBackendSlots - count empty slots in backend process table
*
* We don't actually need to count, since sinvaladt.c maintains a
* freeBackends counter in the SI segment.
*
* Acquiring the lock here is almost certainly overkill, but just in
* case fetching an int is not atomic on your machine ...
*/
int
CountEmptyBackendSlots(void)
{
int count;
LWLockAcquire(SInvalLock, LW_SHARED);
count = shmInvalBuffer->freeBackends;
LWLockRelease(SInvalLock);
return count;
}
#define XidCacheRemove(i) \
do { \
MyProc->subxids.xids[i] = MyProc->subxids.xids[MyProc->subxids.nxids - 1]; \
MyProc->subxids.nxids--; \
} while (0)
/*
* XidCacheRemoveRunningXids
*
* Remove a bunch of TransactionIds from the list of known-running
* subtransactions for my backend. Both the specified xid and those in
* the xids[] array (of length nxids) are removed from the subxids cache.
*/
void
XidCacheRemoveRunningXids(TransactionId xid, int nxids, TransactionId *xids)
{
int i,
j;
Assert(!TransactionIdEquals(xid, InvalidTransactionId));
/*
* We must hold SInvalLock exclusively in order to remove transactions
* from the PGPROC array. (See notes in GetSnapshotData.) It's
* possible this could be relaxed since we know this routine is only
* used to abort subtransactions, but pending closer analysis we'd
* best be conservative.
*/
LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
/*
* Under normal circumstances xid and xids[] will be in increasing
* order, as will be the entries in subxids. Scan backwards to avoid
* O(N^2) behavior when removing a lot of xids.
*/
for (i = nxids - 1; i >= 0; i--)
{
TransactionId anxid = xids[i];
for (j = MyProc->subxids.nxids - 1; j >= 0; j--)
{
if (TransactionIdEquals(MyProc->subxids.xids[j], anxid))
{
XidCacheRemove(j);
break;
}
}
/*
* Ordinarily we should have found it, unless the cache has overflowed.
* However it's also possible for this routine to be invoked multiple
* times for the same subtransaction, in case of an error during
* AbortSubTransaction. So instead of Assert, emit a debug warning.
*/
if (j < 0 && !MyProc->subxids.overflowed)
elog(WARNING, "did not find subXID %u in MyProc", anxid);
}
for (j = MyProc->subxids.nxids - 1; j >= 0; j--)
{
if (TransactionIdEquals(MyProc->subxids.xids[j], xid))
{
XidCacheRemove(j);
break;
}
}
/* Ordinarily we should have found it, unless the cache has overflowed */
if (j < 0 && !MyProc->subxids.overflowed)
elog(WARNING, "did not find subXID %u in MyProc", xid);
LWLockRelease(SInvalLock);
}
#ifdef XIDCACHE_DEBUG
/*
* on_proc_exit hook to print stats about effectiveness of XID cache
*/
static void
DisplayXidCache(int code, Datum arg)
{
fprintf(stderr,
"XidCache: xmin: %ld, mainxid: %ld, childxid: %ld, slow: %ld\n",
xc_by_recent_xmin,
xc_by_main_xid,
xc_by_child_xid,
xc_slow_answer);
}
#endif /* XIDCACHE_DEBUG */