1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-02 09:02:37 +03:00

Use ProcNumbers instead of direct Latch pointers to address other procs

This is in preparation for replacing Latches with a new abstraction.
That's still work in progress, but this seems a little tidier anyway,
so let's get this refactoring out of the way already.

Discussion: https://www.postgresql.org/message-id/391abe21-413e-4d91-a650-b663af49500c%40iki.fi
This commit is contained in:
Heikki Linnakangas
2024-11-01 13:47:20 +02:00
parent e819bbb7c8
commit a9c546a5a3
11 changed files with 72 additions and 55 deletions

View File

@ -2671,8 +2671,14 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
wakeup = true; wakeup = true;
} }
if (wakeup && ProcGlobal->walwriterLatch) if (wakeup)
SetLatch(ProcGlobal->walwriterLatch); {
volatile PROC_HDR *procglobal = ProcGlobal;
ProcNumber walwriterProc = procglobal->walwriterProc;
if (walwriterProc != INVALID_PROC_NUMBER)
SetLatch(&GetPGProcByNumber(walwriterProc)->procLatch);
}
} }
/* /*

View File

@ -112,7 +112,7 @@ addLSNWaiter(XLogRecPtr lsn)
Assert(!procInfo->inHeap); Assert(!procInfo->inHeap);
procInfo->latch = MyLatch; procInfo->procno = MyProcNumber;
procInfo->waitLSN = lsn; procInfo->waitLSN = lsn;
pairingheap_add(&waitLSNState->waitersHeap, &procInfo->phNode); pairingheap_add(&waitLSNState->waitersHeap, &procInfo->phNode);
@ -154,16 +154,17 @@ void
WaitLSNSetLatches(XLogRecPtr currentLSN) WaitLSNSetLatches(XLogRecPtr currentLSN)
{ {
int i; int i;
Latch **wakeUpProcLatches; ProcNumber *wakeUpProcs;
int numWakeUpProcs = 0; int numWakeUpProcs = 0;
wakeUpProcLatches = palloc(sizeof(Latch *) * MaxBackends); wakeUpProcs = palloc(sizeof(ProcNumber) * MaxBackends);
LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
/* /*
* Iterate the pairing heap of waiting processes till we find LSN not yet * Iterate the pairing heap of waiting processes till we find LSN not yet
* replayed. Record the process latches to set them later. * replayed. Record the process numbers to wake up, but to avoid holding
* the lock for too long, send the wakeups only after releasing the lock.
*/ */
while (!pairingheap_is_empty(&waitLSNState->waitersHeap)) while (!pairingheap_is_empty(&waitLSNState->waitersHeap))
{ {
@ -174,7 +175,7 @@ WaitLSNSetLatches(XLogRecPtr currentLSN)
procInfo->waitLSN > currentLSN) procInfo->waitLSN > currentLSN)
break; break;
wakeUpProcLatches[numWakeUpProcs++] = procInfo->latch; wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
(void) pairingheap_remove_first(&waitLSNState->waitersHeap); (void) pairingheap_remove_first(&waitLSNState->waitersHeap);
procInfo->inHeap = false; procInfo->inHeap = false;
} }
@ -191,9 +192,9 @@ WaitLSNSetLatches(XLogRecPtr currentLSN)
*/ */
for (i = 0; i < numWakeUpProcs; i++) for (i = 0; i < numWakeUpProcs; i++)
{ {
SetLatch(wakeUpProcLatches[i]); SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch);
} }
pfree(wakeUpProcLatches); pfree(wakeUpProcs);
} }
/* /*

View File

@ -324,10 +324,10 @@ CheckpointerMain(char *startup_data, size_t startup_data_len)
UpdateSharedMemoryConfig(); UpdateSharedMemoryConfig();
/* /*
* Advertise our latch that backends can use to wake us up while we're * Advertise our proc number that backends can use to wake us up while
* sleeping. * we're sleeping.
*/ */
ProcGlobal->checkpointerLatch = &MyProc->procLatch; ProcGlobal->checkpointerProc = MyProcNumber;
/* /*
* Loop forever * Loop forever
@ -1139,8 +1139,14 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
LWLockRelease(CheckpointerCommLock); LWLockRelease(CheckpointerCommLock);
/* ... but not till after we release the lock */ /* ... but not till after we release the lock */
if (too_full && ProcGlobal->checkpointerLatch) if (too_full)
SetLatch(ProcGlobal->checkpointerLatch); {
volatile PROC_HDR *procglobal = ProcGlobal;
ProcNumber checkpointerProc = procglobal->checkpointerProc;
if (checkpointerProc != INVALID_PROC_NUMBER)
SetLatch(&GetPGProcByNumber(checkpointerProc)->procLatch);
}
return true; return true;
} }

View File

@ -208,10 +208,10 @@ WalWriterMain(char *startup_data, size_t startup_data_len)
SetWalWriterSleeping(false); SetWalWriterSleeping(false);
/* /*
* Advertise our latch that backends can use to wake us up while we're * Advertise our proc number that backends can use to wake us up while
* sleeping. * we're sleeping.
*/ */
ProcGlobal->walwriterLatch = &MyProc->procLatch; ProcGlobal->walwriterProc = MyProcNumber;
/* /*
* Loop forever * Loop forever

View File

@ -30,6 +30,7 @@
#include "pgstat.h" #include "pgstat.h"
#include "pqexpbuffer.h" #include "pqexpbuffer.h"
#include "replication/walreceiver.h" #include "replication/walreceiver.h"
#include "storage/latch.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/memutils.h" #include "utils/memutils.h"
#include "utils/pg_lsn.h" #include "utils/pg_lsn.h"

View File

@ -266,8 +266,8 @@ WalReceiverMain(char *startup_data, size_t startup_data_len)
walrcv->lastMsgSendTime = walrcv->lastMsgSendTime =
walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now; walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
/* Report the latch to use to awaken this process */ /* Report our proc number so that others can wake us up */
walrcv->latch = &MyProc->procLatch; walrcv->procno = MyProcNumber;
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
@ -819,8 +819,8 @@ WalRcvDie(int code, Datum arg)
Assert(walrcv->pid == MyProcPid); Assert(walrcv->pid == MyProcPid);
walrcv->walRcvState = WALRCV_STOPPED; walrcv->walRcvState = WALRCV_STOPPED;
walrcv->pid = 0; walrcv->pid = 0;
walrcv->procno = INVALID_PROC_NUMBER;
walrcv->ready_to_display = false; walrcv->ready_to_display = false;
walrcv->latch = NULL;
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
@ -1358,15 +1358,15 @@ WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
void void
WalRcvForceReply(void) WalRcvForceReply(void)
{ {
Latch *latch; ProcNumber procno;
WalRcv->force_reply = true; WalRcv->force_reply = true;
/* fetching the latch pointer might not be atomic, so use spinlock */ /* fetching the proc number is probably atomic, but don't rely on it */
SpinLockAcquire(&WalRcv->mutex); SpinLockAcquire(&WalRcv->mutex);
latch = WalRcv->latch; procno = WalRcv->procno;
SpinLockRelease(&WalRcv->mutex); SpinLockRelease(&WalRcv->mutex);
if (latch) if (procno != INVALID_PROC_NUMBER)
SetLatch(latch); SetLatch(&GetPGProcByNumber(procno)->procLatch);
} }
/* /*

View File

@ -27,6 +27,7 @@
#include "pgstat.h" #include "pgstat.h"
#include "replication/walreceiver.h" #include "replication/walreceiver.h"
#include "storage/pmsignal.h" #include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/shmem.h" #include "storage/shmem.h"
#include "utils/timestamp.h" #include "utils/timestamp.h"
@ -66,7 +67,7 @@ WalRcvShmemInit(void)
ConditionVariableInit(&WalRcv->walRcvStoppedCV); ConditionVariableInit(&WalRcv->walRcvStoppedCV);
SpinLockInit(&WalRcv->mutex); SpinLockInit(&WalRcv->mutex);
pg_atomic_init_u64(&WalRcv->writtenUpto, 0); pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
WalRcv->latch = NULL; WalRcv->procno = INVALID_PROC_NUMBER;
} }
} }
@ -248,7 +249,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
WalRcvData *walrcv = WalRcv; WalRcvData *walrcv = WalRcv;
bool launch = false; bool launch = false;
pg_time_t now = (pg_time_t) time(NULL); pg_time_t now = (pg_time_t) time(NULL);
Latch *latch; ProcNumber walrcv_proc;
/* /*
* We always start at the beginning of the segment. That prevents a broken * We always start at the beginning of the segment. That prevents a broken
@ -309,14 +310,14 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
walrcv->receiveStart = recptr; walrcv->receiveStart = recptr;
walrcv->receiveStartTLI = tli; walrcv->receiveStartTLI = tli;
latch = walrcv->latch; walrcv_proc = walrcv->procno;
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
if (launch) if (launch)
SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
else if (latch) else if (walrcv_proc != INVALID_PROC_NUMBER)
SetLatch(latch); SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
} }
/* /*

View File

@ -194,8 +194,8 @@ InitProcGlobal(void)
dlist_init(&ProcGlobal->bgworkerFreeProcs); dlist_init(&ProcGlobal->bgworkerFreeProcs);
dlist_init(&ProcGlobal->walsenderFreeProcs); dlist_init(&ProcGlobal->walsenderFreeProcs);
ProcGlobal->startupBufferPinWaitBufId = -1; ProcGlobal->startupBufferPinWaitBufId = -1;
ProcGlobal->walwriterLatch = NULL; ProcGlobal->walwriterProc = INVALID_PROC_NUMBER;
ProcGlobal->checkpointerLatch = NULL; ProcGlobal->checkpointerProc = INVALID_PROC_NUMBER;
pg_atomic_init_u32(&ProcGlobal->procArrayGroupFirst, INVALID_PROC_NUMBER); pg_atomic_init_u32(&ProcGlobal->procArrayGroupFirst, INVALID_PROC_NUMBER);
pg_atomic_init_u32(&ProcGlobal->clogGroupFirst, INVALID_PROC_NUMBER); pg_atomic_init_u32(&ProcGlobal->clogGroupFirst, INVALID_PROC_NUMBER);

View File

@ -15,7 +15,7 @@
#include "lib/pairingheap.h" #include "lib/pairingheap.h"
#include "postgres.h" #include "postgres.h"
#include "port/atomics.h" #include "port/atomics.h"
#include "storage/latch.h" #include "storage/procnumber.h"
#include "storage/spin.h" #include "storage/spin.h"
#include "tcop/dest.h" #include "tcop/dest.h"
@ -29,11 +29,8 @@ typedef struct WaitLSNProcInfo
/* LSN, which this process is waiting for */ /* LSN, which this process is waiting for */
XLogRecPtr waitLSN; XLogRecPtr waitLSN;
/* /* Process to wake up once the waitLSN is replayed */
* A pointer to the latch, which should be set once the waitLSN is ProcNumber procno;
* replayed.
*/
Latch *latch;
/* A pairing heap node for participation in waitLSNState->waitersHeap */ /* A pairing heap node for participation in waitLSNState->waitersHeap */
pairingheap_node phNode; pairingheap_node phNode;

View File

@ -21,7 +21,6 @@
#include "replication/logicalproto.h" #include "replication/logicalproto.h"
#include "replication/walsender.h" #include "replication/walsender.h"
#include "storage/condition_variable.h" #include "storage/condition_variable.h"
#include "storage/latch.h"
#include "storage/spin.h" #include "storage/spin.h"
#include "utils/tuplestore.h" #include "utils/tuplestore.h"
@ -58,13 +57,24 @@ typedef enum
typedef struct typedef struct
{ {
/* /*
* PID of currently active walreceiver process, its current state and * Currently active walreceiver process's proc number and PID.
* start time (actually, the time at which it was requested to be *
* started). * The startup process uses the proc number to wake it up after telling it
* where to start streaming (after setting receiveStart and
* receiveStartTLI), and also to tell it to send apply feedback to the
* primary whenever specially marked commit records are applied.
*/ */
ProcNumber procno;
pid_t pid; pid_t pid;
/* Its current state */
WalRcvState walRcvState; WalRcvState walRcvState;
ConditionVariable walRcvStoppedCV; ConditionVariable walRcvStoppedCV;
/*
* Its start time (actually, the time at which it was requested to be
* started).
*/
pg_time_t startTime; pg_time_t startTime;
/* /*
@ -134,15 +144,6 @@ typedef struct
/* set true once conninfo is ready to display (obfuscated pwds etc) */ /* set true once conninfo is ready to display (obfuscated pwds etc) */
bool ready_to_display; bool ready_to_display;
/*
* Latch used by startup process to wake up walreceiver after telling it
* where to start streaming (after setting receiveStart and
* receiveStartTLI), and also to tell it to send apply feedback to the
* primary whenever specially marked commit records are applied. This is
* normally mapped to procLatch when walreceiver is running.
*/
Latch *latch;
slock_t mutex; /* locks shared variables shown above */ slock_t mutex; /* locks shared variables shown above */
/* /*

View File

@ -418,10 +418,14 @@ typedef struct PROC_HDR
pg_atomic_uint32 procArrayGroupFirst; pg_atomic_uint32 procArrayGroupFirst;
/* First pgproc waiting for group transaction status update */ /* First pgproc waiting for group transaction status update */
pg_atomic_uint32 clogGroupFirst; pg_atomic_uint32 clogGroupFirst;
/* WALWriter process's latch */
Latch *walwriterLatch; /*
/* Checkpointer process's latch */ * Current slot numbers of some auxiliary processes. There can be only one
Latch *checkpointerLatch; * of each of these running at a time.
*/
ProcNumber walwriterProc;
ProcNumber checkpointerProc;
/* Current shared estimate of appropriate spins_per_delay value */ /* Current shared estimate of appropriate spins_per_delay value */
int spins_per_delay; int spins_per_delay;
/* Buffer id of the buffer that Startup process waits for pin on, or -1 */ /* Buffer id of the buffer that Startup process waits for pin on, or -1 */