1
0
mirror of https://github.com/postgres/postgres.git synced 2026-01-26 09:41:40 +03:00

Add WALRCV_CONNECTING state to the WAL receiver

Previously, a WAL receiver freshly started would set its state to
WALRCV_STREAMING immediately at startup, before actually establishing a
replication connection.

This commit introduces a new state called WALRCV_CONNECTING, which is
the state used when the WAL receiver freshly starts, or when a restart
is requested, with a switch to WALRCV_STREAMING once the connection to
the upstream server has been established with COPY_BOTH, meaning that
the WAL receiver is ready to stream changes.  This change is useful for
monitoring purposes, especially in environments with a high latency
where a connection could take some time to be established, giving some
room between the [re]start phase and the streaming activity.

From the point of view of the startup process, that flips the shared
memory state of the WAL receiver when it needs to be stopped, the
existing WALRCV_STREAMING and the new WALRCV_CONNECTING states have the
same semantics: the WAL receiver has started and it can be stopped.

Based on an initial suggestion from Noah Misch, with some input from me
about the design.

Author: Xuneng Zhou <xunengzhou@gmail.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Reviewed-by: Michael Paquier <michael@paquier.xyz>
Reviewed-by: Rahila Syed <rahilasyed90@gmail.com>
Discussion: https://postgr.es/m/CABPTF7VQ5tGOSG5TS-Cg+Fb8gLCGFzxJ_eX4qg+WZ3ZPt=FtwQ@mail.gmail.com
This commit is contained in:
Michael Paquier
2026-01-23 14:17:28 +09:00
parent f9a468c664
commit a36164e746
4 changed files with 27 additions and 4 deletions

View File

@@ -1751,6 +1751,12 @@ description | Waiting for a newly initialized WAL file to reach durable storage
but is not yet initialized.
</para>
</listitem>
<listitem>
<para>
<literal>connecting</literal>: WAL receiver is connecting to the
upstream server, replication has not yet started.
</para>
</listitem>
<listitem>
<para>
<literal>stopping</literal>: WAL receiver has been requested to

View File

@@ -205,6 +205,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
/* The usual case */
break;
case WALRCV_CONNECTING:
case WALRCV_WAITING:
case WALRCV_STREAMING:
case WALRCV_RESTARTING:
@@ -215,7 +216,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
}
/* Advertise our PID so that the startup process can kill us */
walrcv->pid = MyProcPid;
walrcv->walRcvState = WALRCV_STREAMING;
walrcv->walRcvState = WALRCV_CONNECTING;
/* Fetch information required to start streaming */
walrcv->ready_to_display = false;
@@ -395,6 +396,17 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
LSN_FORMAT_ARGS(startpoint), startpointTLI));
first_stream = false;
/*
* Switch to STREAMING after a successful connection if current
* state is CONNECTING. This switch happens after an initial
* startup, or after a restart as determined by
* WalRcvWaitForStartPosition().
*/
SpinLockAcquire(&walrcv->mutex);
if (walrcv->walRcvState == WALRCV_CONNECTING)
walrcv->walRcvState = WALRCV_STREAMING;
SpinLockRelease(&walrcv->mutex);
/* Initialize LogstreamResult and buffers for processing messages */
LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
initStringInfo(&reply_message);
@@ -650,7 +662,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
SpinLockAcquire(&walrcv->mutex);
state = walrcv->walRcvState;
if (state != WALRCV_STREAMING)
if (state != WALRCV_STREAMING && state != WALRCV_CONNECTING)
{
SpinLockRelease(&walrcv->mutex);
if (state == WALRCV_STOPPING)
@@ -689,7 +701,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
*/
*startpoint = walrcv->receiveStart;
*startpointTLI = walrcv->receiveStartTLI;
walrcv->walRcvState = WALRCV_STREAMING;
walrcv->walRcvState = WALRCV_CONNECTING;
SpinLockRelease(&walrcv->mutex);
break;
}
@@ -792,6 +804,7 @@ WalRcvDie(int code, Datum arg)
/* Mark ourselves inactive in shared memory */
SpinLockAcquire(&walrcv->mutex);
Assert(walrcv->walRcvState == WALRCV_STREAMING ||
walrcv->walRcvState == WALRCV_CONNECTING ||
walrcv->walRcvState == WALRCV_RESTARTING ||
walrcv->walRcvState == WALRCV_STARTING ||
walrcv->walRcvState == WALRCV_WAITING ||
@@ -1391,6 +1404,8 @@ WalRcvGetStateString(WalRcvState state)
return "stopped";
case WALRCV_STARTING:
return "starting";
case WALRCV_CONNECTING:
return "connecting";
case WALRCV_STREAMING:
return "streaming";
case WALRCV_WAITING:

View File

@@ -179,7 +179,7 @@ WalRcvStreaming(void)
}
if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
state == WALRCV_RESTARTING)
state == WALRCV_CONNECTING || state == WALRCV_RESTARTING)
return true;
else
return false;
@@ -211,6 +211,7 @@ ShutdownWalRcv(void)
stopped = true;
break;
case WALRCV_CONNECTING:
case WALRCV_STREAMING:
case WALRCV_WAITING:
case WALRCV_RESTARTING:

View File

@@ -47,6 +47,7 @@ typedef enum
WALRCV_STOPPED, /* stopped and mustn't start up again */
WALRCV_STARTING, /* launched, but the process hasn't
* initialized yet */
WALRCV_CONNECTING, /* connecting to upstream server */
WALRCV_STREAMING, /* walreceiver is streaming */
WALRCV_WAITING, /* stopped streaming, waiting for orders */
WALRCV_RESTARTING, /* asked to restart streaming */