diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index e71b6e21123..05ac7c5f7f8 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6280,7 +6280,15 @@ StartupXLOG(void)
 	 * Wake up all waiters for replay LSN. They need to report an error that
 	 * recovery was ended before reaching the target LSN.
 	 */
-	WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, InvalidXLogRecPtr);
+	WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_REPLAY, InvalidXLogRecPtr);
+
+	/*
+	 * Likewise wake up waiters of the other standby wait types (write and
+	 * flush): they also require recovery to be in progress, so they must
+	 * notice that it has ended rather than keep sleeping.
+	 */
+	WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_WRITE, InvalidXLogRecPtr);
+	WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_FLUSH, InvalidXLogRecPtr);
 
 	/*
 	 * Shutdown the recovery environment. This must occur after
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index a21ac48c9fe..0b5f871abe7 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1856,8 +1856,8 @@ PerformWalRecovery(void)
 			 */
 			if (waitLSNState &&
 				(XLogRecoveryCtl->lastReplayedEndRecPtr >=
-				 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY])))
-				WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, XLogRecoveryCtl->lastReplayedEndRecPtr);
+				 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_STANDBY_REPLAY])))
+				WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_REPLAY, XLogRecoveryCtl->lastReplayedEndRecPtr);
 
 			/* Else, try to fetch the next WAL record */
 			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c
index 6c2bda763e2..5020ae1e52d 100644
--- a/src/backend/access/transam/xlogwait.c
+++ b/src/backend/access/transam/xlogwait.c
@@ -12,25 +12,30 @@
  * This file implements waiting for WAL operations to reach specific LSNs
  * on both physical standby and primary servers. The core idea is simple:
  * every process that wants to wait publishes the LSN it needs to the
- * shared memory, and the appropriate process (startup on standby, or
- * WAL writer/backend on primary) wakes it once that LSN has been reached.
+ * shared memory, and the appropriate process (startup on standby, + * walreceiver on standby, or WAL writer/backend on primary) wakes it + * once that LSN has been reached. * * The shared memory used by this module comprises a procInfos * per-backend array with the information of the awaited LSN for each * of the backend processes. The elements of that array are organized - * into a pairing heap waitersHeap, which allows for very fast finding - * of the least awaited LSN. + * into pairing heaps (waitersHeap), one for each WaitLSNType, which + * allows for very fast finding of the least awaited LSN for each type. * - * In addition, the least-awaited LSN is cached as minWaitedLSN. The - * waiter process publishes information about itself to the shared - * memory and waits on the latch until it is woken up by the appropriate - * process, standby is promoted, or the postmaster dies. Then, it cleans - * information about itself in the shared memory. + * In addition, the least-awaited LSN for each type is cached in the + * minWaitedLSN array. The waiter process publishes information about + * itself to the shared memory and waits on the latch until it is woken + * up by the appropriate process, standby is promoted, or the postmaster + * dies. Then, it cleans information about itself in the shared memory. * - * On standby servers: After replaying a WAL record, the startup process - * first performs a fast path check minWaitedLSN > replayLSN. If this - * check is negative, it checks waitersHeap and wakes up the backend - * whose awaited LSNs are reached. + * On standby servers: + * - After replaying a WAL record, the startup process performs a fast + * path check minWaitedLSN[REPLAY] > replayLSN. If this check is + * negative, it checks waitersHeap[REPLAY] and wakes up the backends + * whose awaited LSNs are reached. 
+ * - After receiving WAL, the walreceiver process performs similar checks
+ *   against the flush and write LSNs, waking up waiters in the FLUSH
+ *   and WRITE heaps, respectively.
  *
  * On primary servers: After flushing WAL, the WAL writer or backend
  * process performs a similar check against the flush LSN and wakes up
@@ -49,6 +54,7 @@
 #include "access/xlogwait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/walreceiver.h"
 #include "storage/latch.h"
 #include "storage/proc.h"
 #include "storage/shmem.h"
@@ -62,6 +68,59 @@ static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
 
 struct WaitLSNState *waitLSNState = NULL;
 
+/*
+ * Wait event for each WaitLSNType, used with WaitLatch() to report
+ * the wait in pg_stat_activity.
+ *
+ * The table is indexed by WaitLSNType. Both flush types (standby and
+ * primary) map to WAIT_EVENT_WAIT_FOR_WAL_FLUSH.
+ */
+static const uint32 WaitLSNWaitEvents[] = {
+	[WAIT_LSN_TYPE_STANDBY_REPLAY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
+	[WAIT_LSN_TYPE_STANDBY_WRITE] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
+	[WAIT_LSN_TYPE_STANDBY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
+	[WAIT_LSN_TYPE_PRIMARY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
+};
+
+StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
+				 "WaitLSNWaitEvents must match WaitLSNType enum");
+
+/*
+ * Get the current LSN for the specified wait type.
+ *
+ * Dispatches to the accessor that matches lsnType: GetXLogReplayRecPtr()
+ * for standby replay, GetWalRcvWriteRecPtr() and GetWalRcvFlushRecPtr()
+ * for standby write and flush, and GetFlushRecPtr() for primary flush.
+ * An unrecognized lsnType raises an ERROR.
+ */
+XLogRecPtr
+GetCurrentLSNForWaitType(WaitLSNType lsnType)
+{
+	Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
+
+	switch (lsnType)
+	{
+		case WAIT_LSN_TYPE_STANDBY_REPLAY:
+			return GetXLogReplayRecPtr(NULL);
+
+		case WAIT_LSN_TYPE_STANDBY_WRITE:
+			return GetWalRcvWriteRecPtr();
+
+		case WAIT_LSN_TYPE_STANDBY_FLUSH:
+			return GetWalRcvFlushRecPtr(NULL, NULL);
+
+		case WAIT_LSN_TYPE_PRIMARY_FLUSH:
+			return GetFlushRecPtr(NULL);
+	}
+
+	/*
+	 * Not reached for valid types. The switch has no default case so that
+	 * compilers can warn about a WaitLSNType value that is not handled.
+	 */
+	elog(ERROR, "invalid LSN wait type: %d", (int) lsnType);
+	pg_unreachable();
+}
+
 /* Report the amount of shared memory space needed for WaitLSNState.
*/
 Size
 WaitLSNShmemSize(void)
@@ -302,6 +349,29 @@ WaitLSNCleanup(void)
 	}
 }
 
+/*
+ * Does waiting on this LSN type only make sense while recovery is in
+ * progress? True for the standby wait types (replay, write, flush);
+ * false for the primary flush wait type.
+ */
+static inline bool
+WaitLSNTypeRequiresRecovery(WaitLSNType t)
+{
+	switch (t)
+	{
+		case WAIT_LSN_TYPE_STANDBY_REPLAY:
+		case WAIT_LSN_TYPE_STANDBY_WRITE:
+		case WAIT_LSN_TYPE_STANDBY_FLUSH:
+			return true;
+
+		case WAIT_LSN_TYPE_PRIMARY_FLUSH:
+			break;
+	}
+
+	/* Primary flush, or any value outside the enum */
+	return false;
+}
+
 /*
  * Wait using MyLatch till the given LSN is reached, the replica gets
  * promoted, or the postmaster dies.
@@ -341,13 +411,11 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
 		int			rc;
 		long		delay_ms = -1;
 
-		if (lsnType == WAIT_LSN_TYPE_REPLAY)
-			currentLSN = GetXLogReplayRecPtr(NULL);
-		else
-			currentLSN = GetFlushRecPtr(NULL);
+		/* Get current LSN for the wait type */
+		currentLSN = GetCurrentLSNForWaitType(lsnType);
 
 		/* Check that recovery is still in-progress */
-		if (lsnType == WAIT_LSN_TYPE_REPLAY && !RecoveryInProgress())
+		if (WaitLSNTypeRequiresRecovery(lsnType) && !RecoveryInProgress())
 		{
 			/*
 			 * Recovery was ended, but check if target LSN was already
@@ -376,7 +444,7 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
 		CHECK_FOR_INTERRUPTS();
 
 		rc = WaitLatch(MyLatch, wake_events, delay_ms,
-					   (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
+					   WaitLSNWaitEvents[lsnType]);
 
 		/*
 		 * Emergency bailout if postmaster has died. This is to avoid the
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
index e4509fffe06..57d2dec07f1 100644
--- a/src/backend/commands/wait.c
+++ b/src/backend/commands/wait.c
@@ -140,7 +140,7 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 	 */
 	Assert(MyProc->xmin == InvalidTransactionId);
 
-	waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout);
+	waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_STANDBY_REPLAY, lsn, timeout);
 
 	/*
 	 * Process the result of WaitForLSN().
Throw appropriate error if needed. diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 43d870dbcf1..3299de23bb3 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -89,8 +89,9 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server." SSL_OPEN_SERVER "Waiting for SSL while attempting connection." WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby." -WAIT_FOR_WAL_FLUSH "Waiting for WAL flush to reach a target LSN on a primary." +WAIT_FOR_WAL_FLUSH "Waiting for WAL flush to reach a target LSN on a primary or standby." WAIT_FOR_WAL_REPLAY "Waiting for WAL replay to reach a target LSN on a standby." +WAIT_FOR_WAL_WRITE "Waiting for WAL write to reach a target LSN on a standby." WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process." WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process." diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h index b5fd3e74f1c..d12531d32b8 100644 --- a/src/include/access/xlogwait.h +++ b/src/include/access/xlogwait.h @@ -1,7 +1,7 @@ /*------------------------------------------------------------------------- * * xlogwait.h - * Declarations for LSN replay waiting routines. + * Declarations for WAL flush, write, and replay waiting routines. 
*
 * Copyright (c) 2025-2026, PostgreSQL Global Development Group
 *
@@ -35,11 +35,16 @@ typedef enum
  */
 typedef enum WaitLSNType
 {
-	WAIT_LSN_TYPE_REPLAY,		/* Waiting for replay on standby */
-	WAIT_LSN_TYPE_FLUSH,		/* Waiting for flush on primary */
+	/* Standby wait types (walreceiver/startup wakes) */
+	WAIT_LSN_TYPE_STANDBY_REPLAY,	/* WAL replay by the startup process */
+	WAIT_LSN_TYPE_STANDBY_WRITE,	/* WAL write by the walreceiver */
+	WAIT_LSN_TYPE_STANDBY_FLUSH,	/* WAL flush by the walreceiver */
+
+	/* Primary wait types (WAL writer/backends wake) */
+	WAIT_LSN_TYPE_PRIMARY_FLUSH,	/* WAL flush on primary; keep last, see WAIT_LSN_TYPE_COUNT */
 } WaitLSNType;
 
-#define WAIT_LSN_TYPE_COUNT (WAIT_LSN_TYPE_FLUSH + 1)
+#define WAIT_LSN_TYPE_COUNT (WAIT_LSN_TYPE_PRIMARY_FLUSH + 1)
 
 /*
  * WaitLSNProcInfo - the shared memory structure representing information
@@ -97,6 +102,7 @@ extern PGDLLIMPORT WaitLSNState *waitLSNState;
 
 extern Size WaitLSNShmemSize(void);
 extern void WaitLSNShmemInit(void);
+extern XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType);
 extern void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN);
 extern void WaitLSNCleanup(void);
 extern WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN,