From a36164e7465fd1e10e94569e9c1c07656e38a9de Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Fri, 23 Jan 2026 14:17:28 +0900 Subject: [PATCH] Add WALRCV_CONNECTING state to the WAL receiver Previously, a WAL receiver freshly started would set its state to WALRCV_STREAMING immediately at startup, before actually establishing a replication connection. This commit introduces a new state called WALRCV_CONNECTING, which is the state used when the WAL receiver freshly starts, or when a restart is requested, with a switch to WALRCV_STREAMING once the connection to the upstream server has been established with COPY_BOTH, meaning that the WAL receiver is ready to stream changes. This change is useful for monitoring purposes, especially in environments with a high latency where a connection could take some time to be established, giving some room between the [re]start phase and the streaming activity. From the point of view of the startup process, that flips the shared memory state of the WAL receiver when it needs to be stopped, the existing WALRCV_STREAMING and the new WALRCV_CONNECTING states have the same semantics: the WAL receiver has started and it can be stopped. Based on an initial suggestion from Noah Misch, with some input from me about the design. Author: Xuneng Zhou Reviewed-by: Chao Li Reviewed-by: Michael Paquier Reviewed-by: Rahila Syed Discussion: https://postgr.es/m/CABPTF7VQ5tGOSG5TS-Cg+Fb8gLCGFzxJ_eX4qg+WZ3ZPt=FtwQ@mail.gmail.com --- doc/src/sgml/monitoring.sgml | 6 ++++++ src/backend/replication/walreceiver.c | 21 ++++++++++++++++++--- src/backend/replication/walreceiverfuncs.c | 3 ++- src/include/replication/walreceiver.h | 1 + 4 files changed, 27 insertions(+), 4 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 88450facebd..b77d189a500 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1751,6 +1751,12 @@ description | Waiting for a newly initialized WAL file to reach durable storage but is not yet initialized. + + + connecting: WAL receiver is connecting to the + upstream server, replication has not yet started. + + stopping: WAL receiver has been requested to diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index a41453530a1..6970af3f3ff 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -205,6 +205,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) /* The usual case */ break; + case WALRCV_CONNECTING: case WALRCV_WAITING: case WALRCV_STREAMING: case WALRCV_RESTARTING: @@ -215,7 +216,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) } /* Advertise our PID so that the startup process can kill us */ walrcv->pid = MyProcPid; - walrcv->walRcvState = WALRCV_STREAMING; + walrcv->walRcvState = WALRCV_CONNECTING; /* Fetch information required to start streaming */ walrcv->ready_to_display = false; @@ -395,6 +396,17 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) LSN_FORMAT_ARGS(startpoint), startpointTLI)); first_stream = false; + /* + * Switch to STREAMING after a successful connection if current + * state is CONNECTING. This switch happens after an initial + * startup, or after a restart as determined by + * WalRcvWaitForStartPosition(). + */ + SpinLockAcquire(&walrcv->mutex); + if (walrcv->walRcvState == WALRCV_CONNECTING) + walrcv->walRcvState = WALRCV_STREAMING; + SpinLockRelease(&walrcv->mutex); + /* Initialize LogstreamResult and buffers for processing messages */ LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL); initStringInfo(&reply_message); @@ -650,7 +662,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) SpinLockAcquire(&walrcv->mutex); state = walrcv->walRcvState; - if (state != WALRCV_STREAMING) + if (state != WALRCV_STREAMING && state != WALRCV_CONNECTING) { SpinLockRelease(&walrcv->mutex); if (state == WALRCV_STOPPING) @@ -689,7 +701,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) */ *startpoint = walrcv->receiveStart; *startpointTLI = walrcv->receiveStartTLI; - walrcv->walRcvState = WALRCV_STREAMING; + walrcv->walRcvState = WALRCV_CONNECTING; SpinLockRelease(&walrcv->mutex); break; } @@ -792,6 +804,7 @@ WalRcvDie(int code, Datum arg) /* Mark ourselves inactive in shared memory */ SpinLockAcquire(&walrcv->mutex); Assert(walrcv->walRcvState == WALRCV_STREAMING || + walrcv->walRcvState == WALRCV_CONNECTING || walrcv->walRcvState == WALRCV_RESTARTING || walrcv->walRcvState == WALRCV_STARTING || walrcv->walRcvState == WALRCV_WAITING || @@ -1391,6 +1404,8 @@ WalRcvGetStateString(WalRcvState state) return "stopped"; case WALRCV_STARTING: return "starting"; + case WALRCV_CONNECTING: + return "connecting"; case WALRCV_STREAMING: return "streaming"; case WALRCV_WAITING: diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index da8794cba7c..42e3e170bc0 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -179,7 +179,7 @@ WalRcvStreaming(void) } if (state == WALRCV_STREAMING || state == WALRCV_STARTING || - state == WALRCV_RESTARTING) + state == WALRCV_CONNECTING || state == WALRCV_RESTARTING) return true; else return false; @@ -211,6 +211,7 @@ ShutdownWalRcv(void) stopped = true; break; + case WALRCV_CONNECTING: case WALRCV_STREAMING: case WALRCV_WAITING: case WALRCV_RESTARTING: diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index f3ad00fb6f3..9b9bd916314 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -47,6 +47,7 @@ typedef enum WALRCV_STOPPED, /* stopped and mustn't start up again */ WALRCV_STARTING, /* launched, but the process hasn't * initialized yet */ + WALRCV_CONNECTING, /* connecting to upstream server */ WALRCV_STREAMING, /* walreceiver is streaming */ WALRCV_WAITING, /* stopped streaming, waiting for orders */ WALRCV_RESTARTING, /* asked to restart streaming */