1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-24 01:29:19 +03:00

Redefine backend ID to be an index into the proc array

Previously, backend ID was an index into the ProcState array, in the
shared cache invalidation manager (sinvaladt.c). The entry in the
ProcState array was reserved at backend startup by scanning the array
for a free entry, and that was also when the backend got its backend
ID. Things become slightly simpler if we redefine backend ID to be the
index into the PGPROC array, and directly use it also as an index to
the ProcState array. This uses a little more memory, as we reserve a
few extra slots in the ProcState array for aux processes that don't
need them, but the simplicity is worth it.

Aux processes now also have a backend ID. This simplifies the
reservation of BackendStatusArray and ProcSignal slots.

You can now convert a backend ID into an index into the PGPROC array
simply by subtracting 1. We still use 0-based "pgprocnos" in various
places, for indexes into the PGPROC array, but the only difference now
is that backend IDs start at 1 while pgprocnos start at 0. (The next
commmit will get rid of the term "backend ID" altogether and make
everything 0-based.)

There is still a 'backendId' field in PGPROC, now part of 'vxid' which
encapsulates the backend ID and local transaction ID together. It's
needed for prepared xacts. For regular backends, the backendId is
always equal to pgprocno + 1, but for prepared xact PGPROC entries,
it's the ID of the original backend that processed the transaction.

Reviewed-by: Andres Freund, Reid Thompson
Discussion: https://www.postgresql.org/message-id/8171f1aa-496f-46a6-afc3-c46fe7a9b407@iki.fi
This commit is contained in:
Heikki Linnakangas
2024-03-03 19:37:28 +02:00
parent 30b8d6e4ce
commit ab355e3a88
28 changed files with 282 additions and 322 deletions

View File

@@ -151,7 +151,6 @@ typedef struct GlobalTransactionData
{ {
GlobalTransaction next; /* list link for free list */ GlobalTransaction next; /* list link for free list */
int pgprocno; /* ID of associated dummy PGPROC */ int pgprocno; /* ID of associated dummy PGPROC */
BackendId dummyBackendId; /* similar to backend id for backends */
TimestampTz prepared_at; /* time of preparation */ TimestampTz prepared_at; /* time of preparation */
/* /*
@@ -285,20 +284,6 @@ TwoPhaseShmemInit(void)
/* associate it with a PGPROC assigned by InitProcGlobal */ /* associate it with a PGPROC assigned by InitProcGlobal */
gxacts[i].pgprocno = GetNumberFromPGProc(&PreparedXactProcs[i]); gxacts[i].pgprocno = GetNumberFromPGProc(&PreparedXactProcs[i]);
/*
* Assign a unique ID for each dummy proc, so that the range of
* dummy backend IDs immediately follows the range of normal
* backend IDs. We don't dare to assign a real backend ID to dummy
* procs, because prepared transactions don't take part in cache
* invalidation like a real backend ID would imply, but having a
* unique ID for them is nevertheless handy. This arrangement
* allows you to allocate an array of size (MaxBackends +
* max_prepared_xacts + 1), and have a slot for every backend and
* prepared transaction. Currently multixact.c uses that
* technique.
*/
gxacts[i].dummyBackendId = MaxBackends + 1 + i;
} }
} }
else else
@@ -457,24 +442,24 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
Assert(gxact != NULL); Assert(gxact != NULL);
proc = &ProcGlobal->allProcs[gxact->pgprocno]; proc = GetPGProcByNumber(gxact->pgprocno);
/* Initialize the PGPROC entry */ /* Initialize the PGPROC entry */
MemSet(proc, 0, sizeof(PGPROC)); MemSet(proc, 0, sizeof(PGPROC));
dlist_node_init(&proc->links); dlist_node_init(&proc->links);
proc->waitStatus = PROC_WAIT_STATUS_OK; proc->waitStatus = PROC_WAIT_STATUS_OK;
if (LocalTransactionIdIsValid(MyProc->lxid)) if (LocalTransactionIdIsValid(MyProc->vxid.lxid))
{ {
/* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */ /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
proc->lxid = MyProc->lxid; proc->vxid.lxid = MyProc->vxid.lxid;
proc->backendId = MyBackendId; proc->vxid.backendId = MyBackendId;
} }
else else
{ {
Assert(AmStartupProcess() || !IsPostmasterEnvironment); Assert(AmStartupProcess() || !IsPostmasterEnvironment);
/* GetLockConflicts() uses this to specify a wait on the XID */ /* GetLockConflicts() uses this to specify a wait on the XID */
proc->lxid = xid; proc->vxid.lxid = xid;
proc->backendId = InvalidBackendId; proc->vxid.backendId = InvalidBackendId;
} }
proc->xid = xid; proc->xid = xid;
Assert(proc->xmin == InvalidTransactionId); Assert(proc->xmin == InvalidTransactionId);
@@ -522,7 +507,7 @@ static void
GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts, GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
TransactionId *children) TransactionId *children)
{ {
PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno]; PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
/* We need no extra lock since the GXACT isn't valid yet */ /* We need no extra lock since the GXACT isn't valid yet */
if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS) if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
@@ -559,7 +544,7 @@ MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
* Put it into the global ProcArray so TransactionIdIsInProgress considers * Put it into the global ProcArray so TransactionIdIsInProgress considers
* the XID as still running. * the XID as still running.
*/ */
ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]); ProcArrayAdd(GetPGProcByNumber(gxact->pgprocno));
} }
/* /*
@@ -583,7 +568,7 @@ LockGXact(const char *gid, Oid user)
for (i = 0; i < TwoPhaseState->numPrepXacts; i++) for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{ {
GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno]; PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
/* Ignore not-yet-valid GIDs */ /* Ignore not-yet-valid GIDs */
if (!gxact->valid) if (!gxact->valid)
@@ -884,7 +869,7 @@ TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
if (!gxact->valid) if (!gxact->valid)
continue; continue;
proc = &ProcGlobal->allProcs[gxact->pgprocno]; proc = GetPGProcByNumber(gxact->pgprocno);
GET_VXID_FROM_PGPROC(proc_vxid, *proc); GET_VXID_FROM_PGPROC(proc_vxid, *proc);
if (VirtualTransactionIdEquals(vxid, proc_vxid)) if (VirtualTransactionIdEquals(vxid, proc_vxid))
{ {
@@ -919,7 +904,7 @@ TwoPhaseGetDummyBackendId(TransactionId xid, bool lock_held)
{ {
GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held); GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
return gxact->dummyBackendId; return gxact->pgprocno + 1;
} }
/* /*

View File

@@ -600,9 +600,9 @@ GetStableLatestTransactionId(void)
static LocalTransactionId lxid = InvalidLocalTransactionId; static LocalTransactionId lxid = InvalidLocalTransactionId;
static TransactionId stablexid = InvalidTransactionId; static TransactionId stablexid = InvalidTransactionId;
if (lxid != MyProc->lxid) if (lxid != MyProc->vxid.lxid)
{ {
lxid = MyProc->lxid; lxid = MyProc->vxid.lxid;
stablexid = GetTopTransactionIdIfAny(); stablexid = GetTopTransactionIdIfAny();
if (!TransactionIdIsValid(stablexid)) if (!TransactionIdIsValid(stablexid))
stablexid = ReadNextTransactionId(); stablexid = ReadNextTransactionId();
@@ -2099,8 +2099,8 @@ StartTransaction(void)
* Advertise it in the proc array. We assume assignment of * Advertise it in the proc array. We assume assignment of
* localTransactionId is atomic, and the backendId should be set already. * localTransactionId is atomic, and the backendId should be set already.
*/ */
Assert(MyProc->backendId == vxid.backendId); Assert(MyProc->vxid.backendId == vxid.backendId);
MyProc->lxid = vxid.localTransactionId; MyProc->vxid.lxid = vxid.localTransactionId;
TRACE_POSTGRESQL_TRANSACTION_START(vxid.localTransactionId); TRACE_POSTGRESQL_TRANSACTION_START(vxid.localTransactionId);
@@ -2289,7 +2289,7 @@ CommitTransaction(void)
ParallelWorkerReportLastRecEnd(XactLastRecEnd); ParallelWorkerReportLastRecEnd(XactLastRecEnd);
} }
TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid); TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->vxid.lxid);
/* /*
* Let others know about no transaction in progress by me. Note that this * Let others know about no transaction in progress by me. Note that this
@@ -2840,7 +2840,7 @@ AbortTransaction(void)
XLogSetAsyncXactLSN(XactLastRecEnd); XLogSetAsyncXactLSN(XactLastRecEnd);
} }
TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid); TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->vxid.lxid);
/* /*
* Let others know about no transaction in progress by me. Note that this * Let others know about no transaction in progress by me. Note that this

View File

@@ -49,7 +49,7 @@
#include "parser/parse_func.h" #include "parser/parse_func.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "storage/sinvaladt.h" #include "storage/procarray.h"
#include "utils/acl.h" #include "utils/acl.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/catcache.h" #include "utils/catcache.h"

View File

@@ -1077,7 +1077,7 @@ setval3_oid(PG_FUNCTION_ARGS)
static Relation static Relation
lock_and_open_sequence(SeqTable seq) lock_and_open_sequence(SeqTable seq)
{ {
LocalTransactionId thislxid = MyProc->lxid; LocalTransactionId thislxid = MyProc->vxid.lxid;
/* Get the lock if not already held in this xact */ /* Get the lock if not already held in this xact */
if (seq->lxid != thislxid) if (seq->lxid != thislxid)

View File

@@ -799,7 +799,7 @@ init_sql_fcache(FunctionCallInfo fcinfo, Oid collation, bool lazyEvalOK)
lazyEvalOK); lazyEvalOK);
/* Mark fcache with time of creation to show it's valid */ /* Mark fcache with time of creation to show it's valid */
fcache->lxid = MyProc->lxid; fcache->lxid = MyProc->vxid.lxid;
fcache->subxid = GetCurrentSubTransactionId(); fcache->subxid = GetCurrentSubTransactionId();
ReleaseSysCache(procedureTuple); ReleaseSysCache(procedureTuple);
@@ -1081,7 +1081,7 @@ fmgr_sql(PG_FUNCTION_ARGS)
if (fcache != NULL) if (fcache != NULL)
{ {
if (fcache->lxid != MyProc->lxid || if (fcache->lxid != MyProc->vxid.lxid ||
!SubTransactionIsActive(fcache->subxid)) !SubTransactionIsActive(fcache->subxid))
{ {
/* It's stale; unlink and delete */ /* It's stale; unlink and delete */

View File

@@ -107,17 +107,7 @@ AuxiliaryProcessMain(AuxProcType auxtype)
BaseInit(); BaseInit();
/* ProcSignalInit();
* Assign the ProcSignalSlot for an auxiliary process. Since it doesn't
* have a BackendId, the slot is statically allocated based on the
* auxiliary process type (MyAuxProcType). Backends use slots indexed in
* the range from 1 to MaxBackends (inclusive), so we use MaxBackends +
* AuxProcType + 1 as the index of the slot for an auxiliary process.
*
* This will need rethinking if we ever want more than one of a particular
* auxiliary process type.
*/
ProcSignalInit(MaxBackends + MyAuxProcType + 1);
/* /*
* Auxiliary processes don't run transactions, but they may need a * Auxiliary processes don't run transactions, but they may need a

View File

@@ -701,7 +701,7 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
Assert(proc->subxidStatus.count == 0); Assert(proc->subxidStatus.count == 0);
Assert(!proc->subxidStatus.overflowed); Assert(!proc->subxidStatus.overflowed);
proc->lxid = InvalidLocalTransactionId; proc->vxid.lxid = InvalidLocalTransactionId;
proc->xmin = InvalidTransactionId; proc->xmin = InvalidTransactionId;
/* be sure this is cleared in abort */ /* be sure this is cleared in abort */
@@ -743,7 +743,7 @@ ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid)
ProcGlobal->xids[pgxactoff] = InvalidTransactionId; ProcGlobal->xids[pgxactoff] = InvalidTransactionId;
proc->xid = InvalidTransactionId; proc->xid = InvalidTransactionId;
proc->lxid = InvalidLocalTransactionId; proc->vxid.lxid = InvalidLocalTransactionId;
proc->xmin = InvalidTransactionId; proc->xmin = InvalidTransactionId;
/* be sure this is cleared in abort */ /* be sure this is cleared in abort */
@@ -930,7 +930,7 @@ ProcArrayClearTransaction(PGPROC *proc)
ProcGlobal->xids[pgxactoff] = InvalidTransactionId; ProcGlobal->xids[pgxactoff] = InvalidTransactionId;
proc->xid = InvalidTransactionId; proc->xid = InvalidTransactionId;
proc->lxid = InvalidLocalTransactionId; proc->vxid.lxid = InvalidLocalTransactionId;
proc->xmin = InvalidTransactionId; proc->xmin = InvalidTransactionId;
proc->recoveryConflictPending = false; proc->recoveryConflictPending = false;
@@ -2536,6 +2536,11 @@ ProcArrayInstallImportedXmin(TransactionId xmin,
/* Get lock so source xact can't end while we're doing this */ /* Get lock so source xact can't end while we're doing this */
LWLockAcquire(ProcArrayLock, LW_SHARED); LWLockAcquire(ProcArrayLock, LW_SHARED);
/*
* Find the PGPROC entry of the source transaction. (This could use
* GetPGProcByBackendId(), unless it's a prepared xact. But this isn't
* performance critical.)
*/
for (index = 0; index < arrayP->numProcs; index++) for (index = 0; index < arrayP->numProcs; index++)
{ {
int pgprocno = arrayP->pgprocnos[index]; int pgprocno = arrayP->pgprocnos[index];
@@ -2548,9 +2553,9 @@ ProcArrayInstallImportedXmin(TransactionId xmin,
continue; continue;
/* We are only interested in the specific virtual transaction. */ /* We are only interested in the specific virtual transaction. */
if (proc->backendId != sourcevxid->backendId) if (proc->vxid.backendId != sourcevxid->backendId)
continue; continue;
if (proc->lxid != sourcevxid->localTransactionId) if (proc->vxid.lxid != sourcevxid->localTransactionId)
continue; continue;
/* /*
@@ -3099,6 +3104,64 @@ HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids, int type)
return result; return result;
} }
/*
* BackendIdGetProc -- get a backend's PGPROC given its backend ID
*
* The result may be out of date arbitrarily quickly, so the caller
* must be careful about how this information is used. NULL is
* returned if the backend is not active.
*/
PGPROC *
BackendIdGetProc(int backendID)
{
PGPROC *result;
if (backendID < 1 || backendID > ProcGlobal->allProcCount)
return NULL;
result = GetPGProcByBackendId(backendID);
if (result->pid == 0)
return NULL;
return result;
}
/*
* BackendIdGetTransactionIds -- get a backend's transaction status
*
* Get the xid, xmin, nsubxid and overflow status of the backend. The
* result may be out of date arbitrarily quickly, so the caller must be
* careful about how this information is used.
*/
void
BackendIdGetTransactionIds(int backendID, TransactionId *xid,
TransactionId *xmin, int *nsubxid, bool *overflowed)
{
PGPROC *proc;
*xid = InvalidTransactionId;
*xmin = InvalidTransactionId;
*nsubxid = 0;
*overflowed = false;
if (backendID < 1 || backendID > ProcGlobal->allProcCount)
return;
proc = GetPGProcByBackendId(backendID);
/* Need to lock out additions/removals of backends */
LWLockAcquire(ProcArrayLock, LW_SHARED);
if (proc->pid != 0)
{
*xid = proc->xid;
*xmin = proc->xmin;
*nsubxid = proc->subxidStatus.count;
*overflowed = proc->subxidStatus.overflowed;
}
LWLockRelease(ProcArrayLock);
}
/* /*
* BackendPidGetProc -- get a backend's PGPROC given its PID * BackendPidGetProc -- get a backend's PGPROC given its PID
* *

View File

@@ -87,7 +87,7 @@ typedef struct
* possible auxiliary process type. (This scheme assumes there is not * possible auxiliary process type. (This scheme assumes there is not
* more than one of any auxiliary process type at a time.) * more than one of any auxiliary process type at a time.)
*/ */
#define NumProcSignalSlots (MaxBackends + NUM_AUXPROCTYPES) #define NumProcSignalSlots (MaxBackends + NUM_AUXILIARY_PROCS)
/* Check whether the relevant type bit is set in the flags. */ /* Check whether the relevant type bit is set in the flags. */
#define BARRIER_SHOULD_CHECK(flags, type) \ #define BARRIER_SHOULD_CHECK(flags, type) \
@@ -154,24 +154,23 @@ ProcSignalShmemInit(void)
/* /*
* ProcSignalInit * ProcSignalInit
* Register the current process in the ProcSignal array * Register the current process in the ProcSignal array
*
* The passed index should be my BackendId if the process has one,
* or MaxBackends + aux process type if not.
*/ */
void void
ProcSignalInit(int pss_idx) ProcSignalInit(void)
{ {
ProcSignalSlot *slot; ProcSignalSlot *slot;
uint64 barrier_generation; uint64 barrier_generation;
Assert(pss_idx >= 1 && pss_idx <= NumProcSignalSlots); if (MyBackendId <= 0)
elog(ERROR, "MyBackendId not set");
slot = &ProcSignal->psh_slot[pss_idx - 1]; if (MyBackendId > NumProcSignalSlots)
elog(ERROR, "unexpected MyBackendId %d in ProcSignalInit (max %d)", MyBackendId, NumProcSignalSlots);
slot = &ProcSignal->psh_slot[MyBackendId - 1];
/* sanity check */ /* sanity check */
if (slot->pss_pid != 0) if (slot->pss_pid != 0)
elog(LOG, "process %d taking over ProcSignal slot %d, but it's not empty", elog(LOG, "process %d taking over ProcSignal slot %d, but it's not empty",
MyProcPid, pss_idx); MyProcPid, MyBackendId - 1);
/* Clear out any leftover signal reasons */ /* Clear out any leftover signal reasons */
MemSet(slot->pss_signalFlags, 0, NUM_PROCSIGNALS * sizeof(sig_atomic_t)); MemSet(slot->pss_signalFlags, 0, NUM_PROCSIGNALS * sizeof(sig_atomic_t));
@@ -200,7 +199,7 @@ ProcSignalInit(int pss_idx)
MyProcSignalSlot = slot; MyProcSignalSlot = slot;
/* Set up to release the slot on process exit */ /* Set up to release the slot on process exit */
on_shmem_exit(CleanupProcSignalState, Int32GetDatum(pss_idx)); on_shmem_exit(CleanupProcSignalState, (Datum) 0);
} }
/* /*
@@ -212,11 +211,7 @@ ProcSignalInit(int pss_idx)
static void static void
CleanupProcSignalState(int status, Datum arg) CleanupProcSignalState(int status, Datum arg)
{ {
int pss_idx = DatumGetInt32(arg); ProcSignalSlot *slot = MyProcSignalSlot;
ProcSignalSlot *slot;
slot = &ProcSignal->psh_slot[pss_idx - 1];
Assert(slot == MyProcSignalSlot);
/* /*
* Clear MyProcSignalSlot, so that a SIGUSR1 received after this point * Clear MyProcSignalSlot, so that a SIGUSR1 received after this point
@@ -233,7 +228,7 @@ CleanupProcSignalState(int status, Datum arg)
* infinite loop trying to exit * infinite loop trying to exit
*/ */
elog(LOG, "process %d releasing ProcSignal slot %d, but it contains %d", elog(LOG, "process %d releasing ProcSignal slot %d, but it contains %d",
MyProcPid, pss_idx, (int) slot->pss_pid); MyProcPid, (int) (slot - ProcSignal->psh_slot), (int) slot->pss_pid);
return; /* XXX better to zero the slot anyway? */ return; /* XXX better to zero the slot anyway? */
} }

View File

@@ -139,7 +139,6 @@ typedef struct ProcState
{ {
/* procPid is zero in an inactive ProcState array entry. */ /* procPid is zero in an inactive ProcState array entry. */
pid_t procPid; /* PID of backend, for signaling */ pid_t procPid; /* PID of backend, for signaling */
PGPROC *proc; /* PGPROC of backend */
/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */ /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
int nextMsgNum; /* next message number to read */ int nextMsgNum; /* next message number to read */
bool resetState; /* backend needs to reset its state */ bool resetState; /* backend needs to reset its state */
@@ -172,8 +171,6 @@ typedef struct SISeg
int minMsgNum; /* oldest message still needed */ int minMsgNum; /* oldest message still needed */
int maxMsgNum; /* next message number to be assigned */ int maxMsgNum; /* next message number to be assigned */
int nextThreshold; /* # of messages to call SICleanupQueue */ int nextThreshold; /* # of messages to call SICleanupQueue */
int lastBackend; /* index of last active procState entry, +1 */
int maxBackends; /* size of procState array */
slock_t msgnumLock; /* spinlock protecting maxMsgNum */ slock_t msgnumLock; /* spinlock protecting maxMsgNum */
@@ -183,11 +180,29 @@ typedef struct SISeg
SharedInvalidationMessage buffer[MAXNUMMESSAGES]; SharedInvalidationMessage buffer[MAXNUMMESSAGES];
/* /*
* Per-backend invalidation state info (has MaxBackends entries). * Per-backend invalidation state info.
*
* 'procState' has NumProcStateSlots entries, and is indexed by pgprocno.
* 'numProcs' is the number of slots currently in use, and 'pgprocnos' is
* a dense array of their indexes, to speed up scanning all in-use slots.
*
* 'pgprocnos' is largely redundant with ProcArrayStruct->pgprocnos, but
* having our separate copy avoids contention on ProcArrayLock, and allows
* us to track only the processes that participate in shared cache
* invalidations.
*/ */
int numProcs;
int *pgprocnos;
ProcState procState[FLEXIBLE_ARRAY_MEMBER]; ProcState procState[FLEXIBLE_ARRAY_MEMBER];
} SISeg; } SISeg;
/*
* We reserve a slot for each possible BackendId, plus one for each
* possible auxiliary process type. (This scheme assumes there is not
* more than one of any auxiliary process type at a time.)
*/
#define NumProcStateSlots (MaxBackends + NUM_AUXILIARY_PROCS)
static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */ static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */
@@ -205,16 +220,8 @@ SInvalShmemSize(void)
Size size; Size size;
size = offsetof(SISeg, procState); size = offsetof(SISeg, procState);
size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots)); /* procState */
/* size = add_size(size, mul_size(sizeof(int), NumProcStateSlots)); /* pgprocnos */
* In Hot Standby mode, the startup process requests a procState array
* slot using InitRecoveryTransactionEnvironment(). Even though
* MaxBackends doesn't account for the startup process, it is guaranteed
* to get a free slot. This is because the autovacuum launcher and worker
* processes, which are included in MaxBackends, are not started in Hot
* Standby mode.
*/
size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
return size; return size;
} }
@@ -239,23 +246,22 @@ CreateSharedInvalidationState(void)
shmInvalBuffer->minMsgNum = 0; shmInvalBuffer->minMsgNum = 0;
shmInvalBuffer->maxMsgNum = 0; shmInvalBuffer->maxMsgNum = 0;
shmInvalBuffer->nextThreshold = CLEANUP_MIN; shmInvalBuffer->nextThreshold = CLEANUP_MIN;
shmInvalBuffer->lastBackend = 0;
shmInvalBuffer->maxBackends = MaxBackends;
SpinLockInit(&shmInvalBuffer->msgnumLock); SpinLockInit(&shmInvalBuffer->msgnumLock);
/* The buffer[] array is initially all unused, so we need not fill it */ /* The buffer[] array is initially all unused, so we need not fill it */
/* Mark all backends inactive, and initialize nextLXID */ /* Mark all backends inactive, and initialize nextLXID */
for (i = 0; i < shmInvalBuffer->maxBackends; i++) for (i = 0; i < NumProcStateSlots; i++)
{ {
shmInvalBuffer->procState[i].procPid = 0; /* inactive */ shmInvalBuffer->procState[i].procPid = 0; /* inactive */
shmInvalBuffer->procState[i].proc = NULL;
shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */ shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
shmInvalBuffer->procState[i].resetState = false; shmInvalBuffer->procState[i].resetState = false;
shmInvalBuffer->procState[i].signaled = false; shmInvalBuffer->procState[i].signaled = false;
shmInvalBuffer->procState[i].hasMessages = false; shmInvalBuffer->procState[i].hasMessages = false;
shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId; shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
} }
shmInvalBuffer->numProcs = 0;
shmInvalBuffer->pgprocnos = (int *) &shmInvalBuffer->procState[i];
} }
/* /*
@@ -265,59 +271,41 @@ CreateSharedInvalidationState(void)
void void
SharedInvalBackendInit(bool sendOnly) SharedInvalBackendInit(bool sendOnly)
{ {
int index; ProcState *stateP;
ProcState *stateP = NULL; pid_t oldPid;
SISeg *segP = shmInvalBuffer; SISeg *segP = shmInvalBuffer;
int pgprocno;
if (MyBackendId <= 0)
elog(ERROR, "MyBackendId not set");
if (MyBackendId > NumProcStateSlots)
elog(PANIC, "unexpected MyBackendId %d in SharedInvalBackendInit (max %d)",
MyBackendId, NumProcStateSlots);
pgprocno = MyBackendId - 1;
stateP = &segP->procState[pgprocno];
/* /*
* This can run in parallel with read operations, but not with write * This can run in parallel with read operations, but not with write
* operations, since SIInsertDataEntries relies on lastBackend to set * operations, since SIInsertDataEntries relies on the pgprocnos array to
* hasMessages appropriately. * set hasMessages appropriately.
*/ */
LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
/* Look for a free entry in the procState array */ oldPid = stateP->procPid;
for (index = 0; index < segP->lastBackend; index++) if (oldPid != 0)
{ {
if (segP->procState[index].procPid == 0) /* inactive slot? */
{
stateP = &segP->procState[index];
break;
}
}
if (stateP == NULL)
{
if (segP->lastBackend < segP->maxBackends)
{
stateP = &segP->procState[segP->lastBackend];
Assert(stateP->procPid == 0);
segP->lastBackend++;
}
else
{
/*
* out of procState slots: MaxBackends exceeded -- report normally
*/
MyBackendId = InvalidBackendId;
LWLockRelease(SInvalWriteLock); LWLockRelease(SInvalWriteLock);
ereport(FATAL, elog(ERROR, "sinval slot for backend %d is already in use by process %d",
(errcode(ERRCODE_TOO_MANY_CONNECTIONS), MyBackendId, (int) oldPid);
errmsg("sorry, too many clients already")));
}
} }
MyBackendId = (stateP - &segP->procState[0]) + 1; shmInvalBuffer->pgprocnos[shmInvalBuffer->numProcs++] = pgprocno;
/* Advertise assigned backend ID in MyProc */
MyProc->backendId = MyBackendId;
/* Fetch next local transaction ID into local memory */ /* Fetch next local transaction ID into local memory */
nextLocalTransactionId = stateP->nextLXID; nextLocalTransactionId = stateP->nextLXID;
/* mark myself active, with all extant messages already read */ /* mark myself active, with all extant messages already read */
stateP->procPid = MyProcPid; stateP->procPid = MyProcPid;
stateP->proc = MyProc;
stateP->nextMsgNum = segP->maxMsgNum; stateP->nextMsgNum = segP->maxMsgNum;
stateP->resetState = false; stateP->resetState = false;
stateP->signaled = false; stateP->signaled = false;
@@ -328,8 +316,6 @@ SharedInvalBackendInit(bool sendOnly)
/* register exit routine to mark my entry inactive at exit */ /* register exit routine to mark my entry inactive at exit */
on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP)); on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
elog(DEBUG4, "my backend ID is %d", MyBackendId);
} }
/* /*
@@ -345,96 +331,36 @@ CleanupInvalidationState(int status, Datum arg)
{ {
SISeg *segP = (SISeg *) DatumGetPointer(arg); SISeg *segP = (SISeg *) DatumGetPointer(arg);
ProcState *stateP; ProcState *stateP;
int pgprocno = MyBackendId - 1;
int i; int i;
Assert(PointerIsValid(segP)); Assert(PointerIsValid(segP));
LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
stateP = &segP->procState[MyBackendId - 1]; stateP = &segP->procState[pgprocno];
/* Update next local transaction ID for next holder of this backendID */ /* Update next local transaction ID for next holder of this backendID */
stateP->nextLXID = nextLocalTransactionId; stateP->nextLXID = nextLocalTransactionId;
/* Mark myself inactive */ /* Mark myself inactive */
stateP->procPid = 0; stateP->procPid = 0;
stateP->proc = NULL;
stateP->nextMsgNum = 0; stateP->nextMsgNum = 0;
stateP->resetState = false; stateP->resetState = false;
stateP->signaled = false; stateP->signaled = false;
/* Recompute index of last active backend */ for (i = segP->numProcs - 1; i >= 0; i--)
for (i = segP->lastBackend; i > 0; i--)
{ {
if (segP->procState[i - 1].procPid != 0) if (segP->pgprocnos[i] == pgprocno)
{
if (i != segP->numProcs - 1)
segP->pgprocnos[i] = segP->pgprocnos[segP->numProcs - 1];
break; break;
} }
segP->lastBackend = i;
LWLockRelease(SInvalWriteLock);
}
/*
* BackendIdGetProc
* Get the PGPROC structure for a backend, given the backend ID.
* The result may be out of date arbitrarily quickly, so the caller
* must be careful about how this information is used. NULL is
* returned if the backend is not active.
*/
PGPROC *
BackendIdGetProc(int backendID)
{
PGPROC *result = NULL;
SISeg *segP = shmInvalBuffer;
/* Need to lock out additions/removals of backends */
LWLockAcquire(SInvalWriteLock, LW_SHARED);
if (backendID > 0 && backendID <= segP->lastBackend)
{
ProcState *stateP = &segP->procState[backendID - 1];
result = stateP->proc;
}
LWLockRelease(SInvalWriteLock);
return result;
}
/*
* BackendIdGetTransactionIds
* Get the xid, xmin, nsubxid and overflow status of the backend. The
* result may be out of date arbitrarily quickly, so the caller must be
* careful about how this information is used.
*/
void
BackendIdGetTransactionIds(int backendID, TransactionId *xid,
TransactionId *xmin, int *nsubxid, bool *overflowed)
{
SISeg *segP = shmInvalBuffer;
*xid = InvalidTransactionId;
*xmin = InvalidTransactionId;
*nsubxid = 0;
*overflowed = false;
/* Need to lock out additions/removals of backends */
LWLockAcquire(SInvalWriteLock, LW_SHARED);
if (backendID > 0 && backendID <= segP->lastBackend)
{
ProcState *stateP = &segP->procState[backendID - 1];
PGPROC *proc = stateP->proc;
if (proc != NULL)
{
*xid = proc->xid;
*xmin = proc->xmin;
*nsubxid = proc->subxidStatus.count;
*overflowed = proc->subxidStatus.overflowed;
}
} }
if (i < 0)
elog(PANIC, "could not find entry in sinval array");
segP->numProcs--;
LWLockRelease(SInvalWriteLock); LWLockRelease(SInvalWriteLock);
} }
@@ -507,9 +433,9 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
* these (unlocked) changes will be committed to memory before we exit * these (unlocked) changes will be committed to memory before we exit
* the function. * the function.
*/ */
for (i = 0; i < segP->lastBackend; i++) for (i = 0; i < segP->numProcs; i++)
{ {
ProcState *stateP = &segP->procState[i]; ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
stateP->hasMessages = true; stateP->hasMessages = true;
} }
@@ -677,13 +603,14 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
minsig = min - SIG_THRESHOLD; minsig = min - SIG_THRESHOLD;
lowbound = min - MAXNUMMESSAGES + minFree; lowbound = min - MAXNUMMESSAGES + minFree;
for (i = 0; i < segP->lastBackend; i++) for (i = 0; i < segP->numProcs; i++)
{ {
ProcState *stateP = &segP->procState[i]; ProcState *stateP = &segP->procState[segP->pgprocnos[i]];
int n = stateP->nextMsgNum; int n = stateP->nextMsgNum;
/* Ignore if inactive or already in reset state */ /* Ignore if already in reset state */
if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly) Assert(stateP->procPid != 0);
if (stateP->resetState || stateP->sendOnly)
continue; continue;
/* /*
@@ -719,11 +646,8 @@ SICleanupQueue(bool callerHasWriteLock, int minFree)
{ {
segP->minMsgNum -= MSGNUMWRAPAROUND; segP->minMsgNum -= MSGNUMWRAPAROUND;
segP->maxMsgNum -= MSGNUMWRAPAROUND; segP->maxMsgNum -= MSGNUMWRAPAROUND;
for (i = 0; i < segP->lastBackend; i++) for (i = 0; i < segP->numProcs; i++)
{ segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
/* we don't bother skipping inactive entries here */
segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
}
} }
/* /*

View File

@@ -137,6 +137,7 @@ InitRecoveryTransactionEnvironment(void)
* are held by vxids and row level locks are held by xids. All queries * are held by vxids and row level locks are held by xids. All queries
* hold AccessShareLocks so never block while we write or lock new rows. * hold AccessShareLocks so never block while we write or lock new rows.
*/ */
MyProc->vxid.backendId = MyBackendId;
vxid.backendId = MyBackendId; vxid.backendId = MyBackendId;
vxid.localTransactionId = GetNextLocalTransactionId(); vxid.localTransactionId = GetNextLocalTransactionId();
VirtualXactLockTableInsert(vxid); VirtualXactLockTableInsert(vxid);

View File

@@ -3625,8 +3625,8 @@ GetLockStatusData(void)
proc->fpRelId[f]); proc->fpRelId[f]);
instance->holdMask = lockbits << FAST_PATH_LOCKNUMBER_OFFSET; instance->holdMask = lockbits << FAST_PATH_LOCKNUMBER_OFFSET;
instance->waitLockMode = NoLock; instance->waitLockMode = NoLock;
instance->backend = proc->backendId; instance->vxid.backendId = proc->vxid.backendId;
instance->lxid = proc->lxid; instance->vxid.localTransactionId = proc->vxid.lxid;
instance->pid = proc->pid; instance->pid = proc->pid;
instance->leaderPid = proc->pid; instance->leaderPid = proc->pid;
instance->fastpath = true; instance->fastpath = true;
@@ -3652,15 +3652,15 @@ GetLockStatusData(void)
repalloc(data->locks, sizeof(LockInstanceData) * els); repalloc(data->locks, sizeof(LockInstanceData) * els);
} }
vxid.backendId = proc->backendId; vxid.backendId = proc->vxid.backendId;
vxid.localTransactionId = proc->fpLocalTransactionId; vxid.localTransactionId = proc->fpLocalTransactionId;
instance = &data->locks[el]; instance = &data->locks[el];
SET_LOCKTAG_VIRTUALTRANSACTION(instance->locktag, vxid); SET_LOCKTAG_VIRTUALTRANSACTION(instance->locktag, vxid);
instance->holdMask = LOCKBIT_ON(ExclusiveLock); instance->holdMask = LOCKBIT_ON(ExclusiveLock);
instance->waitLockMode = NoLock; instance->waitLockMode = NoLock;
instance->backend = proc->backendId; instance->vxid.backendId = proc->vxid.backendId;
instance->lxid = proc->lxid; instance->vxid.localTransactionId = proc->vxid.lxid;
instance->pid = proc->pid; instance->pid = proc->pid;
instance->leaderPid = proc->pid; instance->leaderPid = proc->pid;
instance->fastpath = true; instance->fastpath = true;
@@ -3712,8 +3712,8 @@ GetLockStatusData(void)
instance->waitLockMode = proc->waitLockMode; instance->waitLockMode = proc->waitLockMode;
else else
instance->waitLockMode = NoLock; instance->waitLockMode = NoLock;
instance->backend = proc->backendId; instance->vxid.backendId = proc->vxid.backendId;
instance->lxid = proc->lxid; instance->vxid.localTransactionId = proc->vxid.lxid;
instance->pid = proc->pid; instance->pid = proc->pid;
instance->leaderPid = proclock->groupLeader->pid; instance->leaderPid = proclock->groupLeader->pid;
instance->fastpath = false; instance->fastpath = false;
@@ -3888,8 +3888,8 @@ GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data)
instance->waitLockMode = proc->waitLockMode; instance->waitLockMode = proc->waitLockMode;
else else
instance->waitLockMode = NoLock; instance->waitLockMode = NoLock;
instance->backend = proc->backendId; instance->vxid.backendId = proc->vxid.backendId;
instance->lxid = proc->lxid; instance->vxid.localTransactionId = proc->vxid.lxid;
instance->pid = proc->pid; instance->pid = proc->pid;
instance->leaderPid = proclock->groupLeader->pid; instance->leaderPid = proclock->groupLeader->pid;
instance->fastpath = false; instance->fastpath = false;
@@ -4374,8 +4374,8 @@ lock_twophase_postabort(TransactionId xid, uint16 info,
* lockers, as we haven't advertised this vxid via the ProcArray yet. * lockers, as we haven't advertised this vxid via the ProcArray yet.
* *
* Since MyProc->fpLocalTransactionId will normally contain the same data * Since MyProc->fpLocalTransactionId will normally contain the same data
* as MyProc->lxid, you might wonder if we really need both. The * as MyProc->vxid.lxid, you might wonder if we really need both. The
* difference is that MyProc->lxid is set and cleared unlocked, and * difference is that MyProc->vxid.lxid is set and cleared unlocked, and
* examined by procarray.c, while fpLocalTransactionId is protected by * examined by procarray.c, while fpLocalTransactionId is protected by
* fpInfoLock and is used only by the locking subsystem. Doing it this * fpInfoLock and is used only by the locking subsystem. Doing it this
* way makes it easier to verify that there are no funny race conditions. * way makes it easier to verify that there are no funny race conditions.
@@ -4391,7 +4391,7 @@ VirtualXactLockTableInsert(VirtualTransactionId vxid)
LWLockAcquire(&MyProc->fpInfoLock, LW_EXCLUSIVE); LWLockAcquire(&MyProc->fpInfoLock, LW_EXCLUSIVE);
Assert(MyProc->backendId == vxid.backendId); Assert(MyProc->vxid.backendId == vxid.backendId);
Assert(MyProc->fpLocalTransactionId == InvalidLocalTransactionId); Assert(MyProc->fpLocalTransactionId == InvalidLocalTransactionId);
Assert(MyProc->fpVXIDLock == false); Assert(MyProc->fpVXIDLock == false);
@@ -4413,7 +4413,7 @@ VirtualXactLockTableCleanup(void)
bool fastpath; bool fastpath;
LocalTransactionId lxid; LocalTransactionId lxid;
Assert(MyProc->backendId != InvalidBackendId); Assert(MyProc->vxid.backendId != InvalidBackendId);
/* /*
* Clean up shared memory state. * Clean up shared memory state.
@@ -4541,7 +4541,7 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
*/ */
LWLockAcquire(&proc->fpInfoLock, LW_EXCLUSIVE); LWLockAcquire(&proc->fpInfoLock, LW_EXCLUSIVE);
if (proc->backendId != vxid.backendId if (proc->vxid.backendId != vxid.backendId
|| proc->fpLocalTransactionId != vxid.localTransactionId) || proc->fpLocalTransactionId != vxid.localTransactionId)
{ {
/* VXID ended */ /* VXID ended */

View File

@@ -242,25 +242,25 @@ InitProcGlobal(void)
if (i < MaxConnections) if (i < MaxConnections)
{ {
/* PGPROC for normal backend, add to freeProcs list */ /* PGPROC for normal backend, add to freeProcs list */
dlist_push_head(&ProcGlobal->freeProcs, &proc->links); dlist_push_tail(&ProcGlobal->freeProcs, &proc->links);
proc->procgloballist = &ProcGlobal->freeProcs; proc->procgloballist = &ProcGlobal->freeProcs;
} }
else if (i < MaxConnections + autovacuum_max_workers + 1) else if (i < MaxConnections + autovacuum_max_workers + 1)
{ {
/* PGPROC for AV launcher/worker, add to autovacFreeProcs list */ /* PGPROC for AV launcher/worker, add to autovacFreeProcs list */
dlist_push_head(&ProcGlobal->autovacFreeProcs, &proc->links); dlist_push_tail(&ProcGlobal->autovacFreeProcs, &proc->links);
proc->procgloballist = &ProcGlobal->autovacFreeProcs; proc->procgloballist = &ProcGlobal->autovacFreeProcs;
} }
else if (i < MaxConnections + autovacuum_max_workers + 1 + max_worker_processes) else if (i < MaxConnections + autovacuum_max_workers + 1 + max_worker_processes)
{ {
/* PGPROC for bgworker, add to bgworkerFreeProcs list */ /* PGPROC for bgworker, add to bgworkerFreeProcs list */
dlist_push_head(&ProcGlobal->bgworkerFreeProcs, &proc->links); dlist_push_tail(&ProcGlobal->bgworkerFreeProcs, &proc->links);
proc->procgloballist = &ProcGlobal->bgworkerFreeProcs; proc->procgloballist = &ProcGlobal->bgworkerFreeProcs;
} }
else if (i < MaxBackends) else if (i < MaxBackends)
{ {
/* PGPROC for walsender, add to walsenderFreeProcs list */ /* PGPROC for walsender, add to walsenderFreeProcs list */
dlist_push_head(&ProcGlobal->walsenderFreeProcs, &proc->links); dlist_push_tail(&ProcGlobal->walsenderFreeProcs, &proc->links);
proc->procgloballist = &ProcGlobal->walsenderFreeProcs; proc->procgloballist = &ProcGlobal->walsenderFreeProcs;
} }
@@ -355,6 +355,7 @@ InitProcess(void)
errmsg("sorry, too many clients already"))); errmsg("sorry, too many clients already")));
} }
MyProcNumber = GetNumberFromPGProc(MyProc); MyProcNumber = GetNumberFromPGProc(MyProc);
MyBackendId = GetBackendIdFromPGProc(MyProc);
/* /*
* Cross-check that the PGPROC is of the type we expect; if this were not * Cross-check that the PGPROC is of the type we expect; if this were not
@@ -381,14 +382,14 @@ InitProcess(void)
*/ */
dlist_node_init(&MyProc->links); dlist_node_init(&MyProc->links);
MyProc->waitStatus = PROC_WAIT_STATUS_OK; MyProc->waitStatus = PROC_WAIT_STATUS_OK;
MyProc->lxid = InvalidLocalTransactionId;
MyProc->fpVXIDLock = false; MyProc->fpVXIDLock = false;
MyProc->fpLocalTransactionId = InvalidLocalTransactionId; MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
MyProc->xid = InvalidTransactionId; MyProc->xid = InvalidTransactionId;
MyProc->xmin = InvalidTransactionId; MyProc->xmin = InvalidTransactionId;
MyProc->pid = MyProcPid; MyProc->pid = MyProcPid;
/* backendId, databaseId and roleId will be filled in later */ MyProc->vxid.backendId = MyBackendId;
MyProc->backendId = InvalidBackendId; MyProc->vxid.lxid = InvalidLocalTransactionId;
/* databaseId and roleId will be filled in later */
MyProc->databaseId = InvalidOid; MyProc->databaseId = InvalidOid;
MyProc->roleId = InvalidOid; MyProc->roleId = InvalidOid;
MyProc->tempNamespaceId = InvalidOid; MyProc->tempNamespaceId = InvalidOid;
@@ -568,11 +569,11 @@ InitAuxiliaryProcess(void)
/* use volatile pointer to prevent code rearrangement */ /* use volatile pointer to prevent code rearrangement */
((volatile PGPROC *) auxproc)->pid = MyProcPid; ((volatile PGPROC *) auxproc)->pid = MyProcPid;
MyProc = auxproc;
SpinLockRelease(ProcStructLock); SpinLockRelease(ProcStructLock);
MyProc = auxproc;
MyProcNumber = GetNumberFromPGProc(MyProc); MyProcNumber = GetNumberFromPGProc(MyProc);
MyBackendId = GetBackendIdFromPGProc(MyProc);
/* /*
* Initialize all fields of MyProc, except for those previously * Initialize all fields of MyProc, except for those previously
@@ -580,12 +581,12 @@ InitAuxiliaryProcess(void)
*/ */
dlist_node_init(&MyProc->links); dlist_node_init(&MyProc->links);
MyProc->waitStatus = PROC_WAIT_STATUS_OK; MyProc->waitStatus = PROC_WAIT_STATUS_OK;
MyProc->lxid = InvalidLocalTransactionId;
MyProc->fpVXIDLock = false; MyProc->fpVXIDLock = false;
MyProc->fpLocalTransactionId = InvalidLocalTransactionId; MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
MyProc->xid = InvalidTransactionId; MyProc->xid = InvalidTransactionId;
MyProc->xmin = InvalidTransactionId; MyProc->xmin = InvalidTransactionId;
MyProc->backendId = InvalidBackendId; MyProc->vxid.backendId = InvalidBackendId;
MyProc->vxid.lxid = InvalidLocalTransactionId;
MyProc->databaseId = InvalidOid; MyProc->databaseId = InvalidOid;
MyProc->roleId = InvalidOid; MyProc->roleId = InvalidOid;
MyProc->tempNamespaceId = InvalidOid; MyProc->tempNamespaceId = InvalidOid;
@@ -916,8 +917,14 @@ ProcKill(int code, Datum arg)
proc = MyProc; proc = MyProc;
MyProc = NULL; MyProc = NULL;
MyProcNumber = INVALID_PGPROCNO; MyProcNumber = INVALID_PGPROCNO;
MyBackendId = InvalidBackendId;
DisownLatch(&proc->procLatch); DisownLatch(&proc->procLatch);
/* Mark the proc no longer in use */
proc->pid = 0;
proc->vxid.backendId = InvalidBackendId;
proc->vxid.lxid = InvalidTransactionId;
procgloballist = proc->procgloballist; procgloballist = proc->procgloballist;
SpinLockAcquire(ProcStructLock); SpinLockAcquire(ProcStructLock);
@@ -992,12 +999,15 @@ AuxiliaryProcKill(int code, Datum arg)
proc = MyProc; proc = MyProc;
MyProc = NULL; MyProc = NULL;
MyProcNumber = INVALID_PGPROCNO; MyProcNumber = INVALID_PGPROCNO;
MyBackendId = InvalidBackendId;
DisownLatch(&proc->procLatch); DisownLatch(&proc->procLatch);
SpinLockAcquire(ProcStructLock); SpinLockAcquire(ProcStructLock);
/* Mark auxiliary proc no longer in use */ /* Mark auxiliary proc no longer in use */
proc->pid = 0; proc->pid = 0;
proc->vxid.backendId = InvalidBackendId;
proc->vxid.lxid = InvalidTransactionId;
/* Update shared estimate of spins_per_delay */ /* Update shared estimate of spins_per_delay */
ProcGlobal->spins_per_delay = update_spins_per_delay(ProcGlobal->spins_per_delay); ProcGlobal->spins_per_delay = update_spins_per_delay(ProcGlobal->spins_per_delay);

View File

@@ -19,6 +19,7 @@
#include "port/atomics.h" /* for memory barriers */ #include "port/atomics.h" /* for memory barriers */
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/proc.h" /* for MyProc */ #include "storage/proc.h" /* for MyProc */
#include "storage/procarray.h"
#include "storage/sinvaladt.h" #include "storage/sinvaladt.h"
#include "utils/ascii.h" #include "utils/ascii.h"
#include "utils/backend_status.h" #include "utils/backend_status.h"
@@ -29,13 +30,12 @@
/* ---------- /* ----------
* Total number of backends including auxiliary * Total number of backends including auxiliary
* *
* We reserve a slot for each possible BackendId, plus one for each * We reserve a slot for each possible PGPROC entry, including aux processes.
* possible auxiliary process type. (This scheme assumes there is not * (But not including PGPROC entries reserved for prepared xacts; they are not
* more than one of any auxiliary process type at a time.) MaxBackends * real processes.)
* includes autovacuum workers and background workers as well.
* ---------- * ----------
*/ */
#define NumBackendStatSlots (MaxBackends + NUM_AUXPROCTYPES) #define NumBackendStatSlots (MaxBackends + NUM_AUXILIARY_PROCS)
/* ---------- /* ----------
@@ -238,10 +238,9 @@ CreateSharedBackendStatus(void)
/* /*
* Initialize pgstats backend activity state, and set up our on-proc-exit * Initialize pgstats backend activity state, and set up our on-proc-exit
* hook. Called from InitPostgres and AuxiliaryProcessMain. For auxiliary * hook. Called from InitPostgres and AuxiliaryProcessMain. MyBackendId must
* process, MyBackendId is invalid. Otherwise, MyBackendId must be set, but we * be set, but we must not have started any transaction yet (since the exit
* must not have started any transaction yet (since the exit hook must run * hook must run after the last transaction exit).
* after the last transaction exit).
* *
* NOTE: MyDatabaseId isn't set yet; so the shutdown hook has to be careful. * NOTE: MyDatabaseId isn't set yet; so the shutdown hook has to be careful.
*/ */
@@ -249,26 +248,9 @@ void
pgstat_beinit(void) pgstat_beinit(void)
{ {
/* Initialize MyBEEntry */ /* Initialize MyBEEntry */
if (MyBackendId != InvalidBackendId) Assert(MyBackendId != InvalidBackendId);
{ Assert(MyBackendId >= 1 && MyBackendId <= NumBackendStatSlots);
Assert(MyBackendId >= 1 && MyBackendId <= MaxBackends);
MyBEEntry = &BackendStatusArray[MyBackendId - 1]; MyBEEntry = &BackendStatusArray[MyBackendId - 1];
}
else
{
/* Must be an auxiliary process */
Assert(MyAuxProcType != NotAnAuxProcess);
/*
* Assign the MyBEEntry for an auxiliary process. Since it doesn't
* have a BackendId, the slot is statically allocated based on the
* auxiliary process type (MyAuxProcType). Backends use slots indexed
* in the range from 0 to MaxBackends (exclusive), so we use
* MaxBackends + AuxProcType as the index of the slot for an auxiliary
* process.
*/
MyBEEntry = &BackendStatusArray[MaxBackends + MyAuxProcType];
}
/* Set up a process-exit hook to clean up */ /* Set up a process-exit hook to clean up */
on_shmem_exit(pgstat_beshutdown_hook, 0); on_shmem_exit(pgstat_beshutdown_hook, 0);
@@ -281,12 +263,12 @@ pgstat_beinit(void)
* Initialize this backend's entry in the PgBackendStatus array. * Initialize this backend's entry in the PgBackendStatus array.
* Called from InitPostgres. * Called from InitPostgres.
* *
* Apart from auxiliary processes, MyBackendId, MyDatabaseId, * Apart from auxiliary processes, MyDatabaseId, session userid, and
* session userid, and application_name must be set for a * application_name must already be set (hence, this cannot be combined
* backend (hence, this cannot be combined with pgstat_beinit). * with pgstat_beinit). Note also that we must be inside a transaction
* Note also that we must be inside a transaction if this isn't an aux * if this isn't an aux process, as we may need to do encoding conversion
* process, as we may need to do encoding conversion on some strings. * on some strings.
* ---------- *----------
*/ */
void void
pgstat_bestart(void) pgstat_bestart(void)

View File

@@ -353,7 +353,7 @@ pg_lock_status(PG_FUNCTION_ARGS)
break; break;
} }
values[10] = VXIDGetDatum(instance->backend, instance->lxid); values[10] = VXIDGetDatum(instance->vxid.backendId, instance->vxid.localTransactionId);
if (instance->pid != 0) if (instance->pid != 0)
values[11] = Int32GetDatum(instance->pid); values[11] = Int32GetDatum(instance->pid);
else else

View File

@@ -148,19 +148,11 @@ pg_log_backend_memory_contexts(PG_FUNCTION_ARGS)
PGPROC *proc; PGPROC *proc;
BackendId backendId = InvalidBackendId; BackendId backendId = InvalidBackendId;
proc = BackendPidGetProc(pid);
/* /*
* See if the process with given pid is a backend or an auxiliary process. * See if the process with given pid is a backend or an auxiliary process.
*
* If the given process is a backend, use its backend id in
* SendProcSignal() later to speed up the operation. Otherwise, don't do
* that because auxiliary processes (except the startup process) don't
* have a valid backend id.
*/ */
if (proc != NULL) proc = BackendPidGetProc(pid);
backendId = proc->backendId; if (proc == NULL)
else
proc = AuxiliaryPidGetProc(pid); proc = AuxiliaryPidGetProc(pid);
/* /*
@@ -183,6 +175,8 @@ pg_log_backend_memory_contexts(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(false); PG_RETURN_BOOL(false);
} }
if (proc != NULL)
backendId = GetBackendIdFromPGProc(proc);
if (SendProcSignal(pid, PROCSIG_LOG_MEMORY_CONTEXT, backendId) < 0) if (SendProcSignal(pid, PROCSIG_LOG_MEMORY_CONTEXT, backendId) < 0)
{ {
/* Again, just a warning to allow loops */ /* Again, just a warning to allow loops */

View File

@@ -152,8 +152,8 @@ write_csvlog(ErrorData *edata)
/* Virtual transaction id */ /* Virtual transaction id */
/* keep VXID format in sync with lockfuncs.c */ /* keep VXID format in sync with lockfuncs.c */
if (MyProc != NULL && MyProc->backendId != InvalidBackendId) if (MyProc != NULL && MyProc->vxid.backendId != InvalidBackendId)
appendStringInfo(&buf, "%d/%u", MyProc->backendId, MyProc->lxid); appendStringInfo(&buf, "%d/%u", MyProc->vxid.backendId, MyProc->vxid.lxid);
appendStringInfoChar(&buf, ','); appendStringInfoChar(&buf, ',');
/* Transaction id */ /* Transaction id */

View File

@@ -3076,18 +3076,18 @@ log_status_format(StringInfo buf, const char *format, ErrorData *edata)
break; break;
case 'v': case 'v':
/* keep VXID format in sync with lockfuncs.c */ /* keep VXID format in sync with lockfuncs.c */
if (MyProc != NULL && MyProc->backendId != InvalidBackendId) if (MyProc != NULL && MyProc->vxid.backendId != InvalidBackendId)
{ {
if (padding != 0) if (padding != 0)
{ {
char strfbuf[128]; char strfbuf[128];
snprintf(strfbuf, sizeof(strfbuf) - 1, "%d/%u", snprintf(strfbuf, sizeof(strfbuf) - 1, "%d/%u",
MyProc->backendId, MyProc->lxid); MyProc->vxid.backendId, MyProc->vxid.lxid);
appendStringInfo(buf, "%*s", padding, strfbuf); appendStringInfo(buf, "%*s", padding, strfbuf);
} }
else else
appendStringInfo(buf, "%d/%u", MyProc->backendId, MyProc->lxid); appendStringInfo(buf, "%d/%u", MyProc->vxid.backendId, MyProc->vxid.lxid);
} }
else if (padding != 0) else if (padding != 0)
appendStringInfoSpaces(buf, appendStringInfoSpaces(buf,

View File

@@ -197,9 +197,9 @@ write_jsonlog(ErrorData *edata)
/* Virtual transaction id */ /* Virtual transaction id */
/* keep VXID format in sync with lockfuncs.c */ /* keep VXID format in sync with lockfuncs.c */
if (MyProc != NULL && MyProc->backendId != InvalidBackendId) if (MyProc != NULL && MyProc->vxid.backendId != InvalidBackendId)
appendJSONKeyValueFmt(&buf, "vxid", true, "%d/%u", MyProc->backendId, appendJSONKeyValueFmt(&buf, "vxid", true, "%d/%u",
MyProc->lxid); MyProc->vxid.backendId, MyProc->vxid.lxid);
/* Transaction id */ /* Transaction id */
appendJSONKeyValueFmt(&buf, "txid", false, "%u", appendJSONKeyValueFmt(&buf, "txid", false, "%u",

View File

@@ -742,18 +742,10 @@ InitPostgres(const char *in_dbname, Oid dboid,
/* /*
* Initialize my entry in the shared-invalidation manager's array of * Initialize my entry in the shared-invalidation manager's array of
* per-backend data. * per-backend data.
*
* Sets up MyBackendId, a unique backend identifier.
*/ */
MyBackendId = InvalidBackendId;
SharedInvalBackendInit(false); SharedInvalBackendInit(false);
if (MyBackendId > MaxBackends || MyBackendId <= 0) ProcSignalInit();
elog(FATAL, "bad backend ID: %d", MyBackendId);
/* Now that we have a BackendId, we can participate in ProcSignal */
ProcSignalInit(MyBackendId);
/* /*
* Also set up timeout handlers needed for backend operation. We need * Also set up timeout handlers needed for backend operation. We need

View File

@@ -1154,7 +1154,8 @@ ExportSnapshot(Snapshot snapshot)
* inside the transaction from 1. * inside the transaction from 1.
*/ */
snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%08X-%d", snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%08X-%d",
MyProc->backendId, MyProc->lxid, list_length(exportedSnapshots) + 1); MyProc->vxid.backendId, MyProc->vxid.lxid,
list_length(exportedSnapshots) + 1);
/* /*
* Copy the snapshot into TopTransactionContext, add it to the * Copy the snapshot into TopTransactionContext, add it to the
@@ -1181,7 +1182,7 @@ ExportSnapshot(Snapshot snapshot)
*/ */
initStringInfo(&buf); initStringInfo(&buf);
appendStringInfo(&buf, "vxid:%d/%u\n", MyProc->backendId, MyProc->lxid); appendStringInfo(&buf, "vxid:%d/%u\n", MyProc->vxid.backendId, MyProc->vxid.lxid);
appendStringInfo(&buf, "pid:%d\n", MyProcPid); appendStringInfo(&buf, "pid:%d\n", MyProcPid);
appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId); appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId);
appendStringInfo(&buf, "iso:%d\n", XactIsoLevel); appendStringInfo(&buf, "iso:%d\n", XactIsoLevel);

View File

@@ -454,8 +454,6 @@ typedef enum
WalWriterProcess, WalWriterProcess,
WalReceiverProcess, WalReceiverProcess,
WalSummarizerProcess, WalSummarizerProcess,
NUM_AUXPROCTYPES /* Must be last! */
} AuxProcType; } AuxProcType;
extern PGDLLIMPORT AuxProcType MyAuxProcType; extern PGDLLIMPORT AuxProcType MyAuxProcType;

View File

@@ -14,11 +14,15 @@
#ifndef BACKENDID_H #ifndef BACKENDID_H
#define BACKENDID_H #define BACKENDID_H
/* ---------------- /*
* -cim 8/17/90 * BackendId uniquely identifies an active backend or auxiliary process. It's
* ---------------- * assigned at backend startup after authentication. Note that a backend ID
* can be reused for a different backend immediately after a backend exits.
*
* Backend IDs are assigned starting from 1. For historical reasons, BackendId
* 0 is unused, but InvalidBackendId is defined as -1.
*/ */
typedef int BackendId; /* unique currently active backend identifier */ typedef int BackendId;
#define InvalidBackendId (-1) #define InvalidBackendId (-1)

View File

@@ -74,9 +74,9 @@ typedef struct
#define SetInvalidVirtualTransactionId(vxid) \ #define SetInvalidVirtualTransactionId(vxid) \
((vxid).backendId = InvalidBackendId, \ ((vxid).backendId = InvalidBackendId, \
(vxid).localTransactionId = InvalidLocalTransactionId) (vxid).localTransactionId = InvalidLocalTransactionId)
#define GET_VXID_FROM_PGPROC(vxid, proc) \ #define GET_VXID_FROM_PGPROC(vxid_dst, proc) \
((vxid).backendId = (proc).backendId, \ ((vxid_dst).backendId = (proc).vxid.backendId, \
(vxid).localTransactionId = (proc).lxid) (vxid_dst).localTransactionId = (proc).vxid.lxid)
/* MAX_LOCKMODES cannot be larger than the # of bits in LOCKMASK */ /* MAX_LOCKMODES cannot be larger than the # of bits in LOCKMASK */
#define MAX_LOCKMODES 10 #define MAX_LOCKMODES 10
@@ -454,8 +454,7 @@ typedef struct LockInstanceData
LOCKTAG locktag; /* tag for locked object */ LOCKTAG locktag; /* tag for locked object */
LOCKMASK holdMask; /* locks held by this PGPROC */ LOCKMASK holdMask; /* locks held by this PGPROC */
LOCKMODE waitLockMode; /* lock awaited by this PGPROC, if any */ LOCKMODE waitLockMode; /* lock awaited by this PGPROC, if any */
BackendId backend; /* backend ID of this PGPROC */ VirtualTransactionId vxid; /* virtual transaction ID of this PGPROC */
LocalTransactionId lxid; /* local transaction ID of this PGPROC */
TimestampTz waitStart; /* time at which this PGPROC started waiting TimestampTz waitStart; /* time at which this PGPROC started waiting
* for lock */ * for lock */
int pid; /* pid of this PGPROC */ int pid; /* pid of this PGPROC */

View File

@@ -186,16 +186,31 @@ struct PGPROC
* vacuum must not remove tuples deleted by * vacuum must not remove tuples deleted by
* xid >= xmin ! */ * xid >= xmin ! */
LocalTransactionId lxid; /* local id of top-level transaction currently
* being executed by this proc, if running;
* else InvalidLocalTransactionId */
int pid; /* Backend's process ID; 0 if prepared xact */ int pid; /* Backend's process ID; 0 if prepared xact */
int pgxactoff; /* offset into various ProcGlobal->arrays with int pgxactoff; /* offset into various ProcGlobal->arrays with
* data mirrored from this PGPROC */ * data mirrored from this PGPROC */
/*
* Currently running top-level transaction's virtual xid. Together these
* form a VirtualTransactionId, but we don't use that struct because this
* is not atomically assignable as whole, and we want to enforce code to
* consider both parts separately. See comments at VirtualTransactionId.
*/
struct
{
BackendId backendId; /* For regular backends, equal to
* GetBackendIdFromPGProc(proc). For prepared
* xacts, ID of the original backend that
* processed the transaction. For unused
* PGPROC entries, InvalidBackendID. */
LocalTransactionId lxid; /* local id of top-level transaction
* currently * being executed by this
* proc, if running; else
* InvalidLocaltransactionId */
} vxid;
/* These fields are zero while a backend is still starting up: */ /* These fields are zero while a backend is still starting up: */
BackendId backendId; /* This backend's backend ID (if assigned) */
Oid databaseId; /* OID of database this backend is using */ Oid databaseId; /* OID of database this backend is using */
Oid roleId; /* OID of role using this backend */ Oid roleId; /* OID of role using this backend */
@@ -406,9 +421,16 @@ extern PGDLLIMPORT PROC_HDR *ProcGlobal;
extern PGDLLIMPORT PGPROC *PreparedXactProcs; extern PGDLLIMPORT PGPROC *PreparedXactProcs;
/* Accessor for PGPROC given a pgprocno, and vice versa. */ /*
* Accessors for getting PGPROC given a pgprocno or BackendId, and vice versa.
*
* For historical reasons, some code uses 0-based "proc numbers", while other
* code uses 1-based backend IDs.
*/
#define GetPGProcByNumber(n) (&ProcGlobal->allProcs[(n)]) #define GetPGProcByNumber(n) (&ProcGlobal->allProcs[(n)])
#define GetNumberFromPGProc(proc) ((proc) - &ProcGlobal->allProcs[0]) #define GetNumberFromPGProc(proc) ((proc) - &ProcGlobal->allProcs[0])
#define GetPGProcByBackendId(n) (&ProcGlobal->allProcs[(n) - 1])
#define GetBackendIdFromPGProc(proc) (GetNumberFromPGProc(proc) + 1)
/* /*
* We set aside some extra PGPROC structures for auxiliary processes, * We set aside some extra PGPROC structures for auxiliary processes,

View File

@@ -64,6 +64,10 @@ extern VirtualTransactionId *GetVirtualXIDsDelayingChkpt(int *nvxids, int type);
extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids,
int nvxids, int type); int nvxids, int type);
extern PGPROC *BackendIdGetProc(int backendID);
extern void BackendIdGetTransactionIds(int backendID, TransactionId *xid,
TransactionId *xmin, int *nsubxid,
bool *overflowed);
extern PGPROC *BackendPidGetProc(int pid); extern PGPROC *BackendPidGetProc(int pid);
extern PGPROC *BackendPidGetProcWithLock(int pid); extern PGPROC *BackendPidGetProcWithLock(int pid);
extern int BackendXidGetPid(TransactionId xid); extern int BackendXidGetPid(TransactionId xid);

View File

@@ -62,7 +62,7 @@ typedef enum
extern Size ProcSignalShmemSize(void); extern Size ProcSignalShmemSize(void);
extern void ProcSignalShmemInit(void); extern void ProcSignalShmemInit(void);
extern void ProcSignalInit(int pss_idx); extern void ProcSignalInit(void);
extern int SendProcSignal(pid_t pid, ProcSignalReason reason, extern int SendProcSignal(pid_t pid, ProcSignalReason reason,
BackendId backendId); BackendId backendId);

View File

@@ -31,10 +31,6 @@
extern Size SInvalShmemSize(void); extern Size SInvalShmemSize(void);
extern void CreateSharedInvalidationState(void); extern void CreateSharedInvalidationState(void);
extern void SharedInvalBackendInit(bool sendOnly); extern void SharedInvalBackendInit(bool sendOnly);
extern PGPROC *BackendIdGetProc(int backendID);
extern void BackendIdGetTransactionIds(int backendID, TransactionId *xid,
TransactionId *xmin, int *nsubxid,
bool *overflowed);
extern void SIInsertDataEntries(const SharedInvalidationMessage *data, int n); extern void SIInsertDataEntries(const SharedInvalidationMessage *data, int n);
extern int SIGetDataEntries(SharedInvalidationMessage *data, int datasize); extern int SIGetDataEntries(SharedInvalidationMessage *data, int datasize);

View File

@@ -2211,7 +2211,7 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt)
paramLI = setup_param_list(estate, expr); paramLI = setup_param_list(estate, expr);
before_lxid = MyProc->lxid; before_lxid = MyProc->vxid.lxid;
/* /*
* If we have a procedure-lifespan resowner, use that to hold the refcount * If we have a procedure-lifespan resowner, use that to hold the refcount
@@ -2232,7 +2232,7 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt)
elog(ERROR, "SPI_execute_plan_extended failed executing query \"%s\": %s", elog(ERROR, "SPI_execute_plan_extended failed executing query \"%s\": %s",
expr->query, SPI_result_code_string(rc)); expr->query, SPI_result_code_string(rc));
after_lxid = MyProc->lxid; after_lxid = MyProc->vxid.lxid;
if (before_lxid != after_lxid) if (before_lxid != after_lxid)
{ {
@@ -6037,7 +6037,7 @@ exec_eval_simple_expr(PLpgSQL_execstate *estate,
int32 *rettypmod) int32 *rettypmod)
{ {
ExprContext *econtext = estate->eval_econtext; ExprContext *econtext = estate->eval_econtext;
LocalTransactionId curlxid = MyProc->lxid; LocalTransactionId curlxid = MyProc->vxid.lxid;
ParamListInfo paramLI; ParamListInfo paramLI;
void *save_setup_arg; void *save_setup_arg;
bool need_snapshot; bool need_snapshot;
@@ -7943,7 +7943,7 @@ get_cast_hashentry(PLpgSQL_execstate *estate,
* functions do; DO blocks have private simple_eval_estates, and private * functions do; DO blocks have private simple_eval_estates, and private
* cast hash tables to go with them.) * cast hash tables to go with them.)
*/ */
curlxid = MyProc->lxid; curlxid = MyProc->vxid.lxid;
if (cast_entry->cast_lxid != curlxid || cast_entry->cast_in_use) if (cast_entry->cast_lxid != curlxid || cast_entry->cast_in_use)
{ {
oldcontext = MemoryContextSwitchTo(estate->simple_eval_estate->es_query_cxt); oldcontext = MemoryContextSwitchTo(estate->simple_eval_estate->es_query_cxt);
@@ -8070,7 +8070,7 @@ exec_simple_check_plan(PLpgSQL_execstate *estate, PLpgSQL_expr *expr)
/* Remember that we have the refcount */ /* Remember that we have the refcount */
expr->expr_simple_plansource = plansource; expr->expr_simple_plansource = plansource;
expr->expr_simple_plan = cplan; expr->expr_simple_plan = cplan;
expr->expr_simple_plan_lxid = MyProc->lxid; expr->expr_simple_plan_lxid = MyProc->vxid.lxid;
/* Share the remaining work with the replan code path */ /* Share the remaining work with the replan code path */
exec_save_simple_expr(expr, cplan); exec_save_simple_expr(expr, cplan);