diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 88450facebd..b77d189a500 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1751,6 +1751,12 @@ description | Waiting for a newly initialized WAL file to reach durable storage but is not yet initialized. + + + connecting: WAL receiver is connecting to the + upstream server, replication has not yet started. + + stopping: WAL receiver has been requested to diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index a41453530a1..6970af3f3ff 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -205,6 +205,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) /* The usual case */ break; + case WALRCV_CONNECTING: case WALRCV_WAITING: case WALRCV_STREAMING: case WALRCV_RESTARTING: @@ -215,7 +216,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) } /* Advertise our PID so that the startup process can kill us */ walrcv->pid = MyProcPid; - walrcv->walRcvState = WALRCV_STREAMING; + walrcv->walRcvState = WALRCV_CONNECTING; /* Fetch information required to start streaming */ walrcv->ready_to_display = false; @@ -395,6 +396,17 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) LSN_FORMAT_ARGS(startpoint), startpointTLI)); first_stream = false; + /* + * Switch to STREAMING after a successful connection if current + * state is CONNECTING. This switch happens after an initial + * startup, or after a restart as determined by + * WalRcvWaitForStartPosition(). + */ + SpinLockAcquire(&walrcv->mutex); + if (walrcv->walRcvState == WALRCV_CONNECTING) + walrcv->walRcvState = WALRCV_STREAMING; + SpinLockRelease(&walrcv->mutex); + /* Initialize LogstreamResult and buffers for processing messages */ LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL); initStringInfo(&reply_message); @@ -650,7 +662,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) SpinLockAcquire(&walrcv->mutex); state = walrcv->walRcvState; - if (state != WALRCV_STREAMING) + if (state != WALRCV_STREAMING && state != WALRCV_CONNECTING) { SpinLockRelease(&walrcv->mutex); if (state == WALRCV_STOPPING) @@ -689,7 +701,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) */ *startpoint = walrcv->receiveStart; *startpointTLI = walrcv->receiveStartTLI; - walrcv->walRcvState = WALRCV_STREAMING; + walrcv->walRcvState = WALRCV_CONNECTING; SpinLockRelease(&walrcv->mutex); break; } @@ -792,6 +804,7 @@ WalRcvDie(int code, Datum arg) /* Mark ourselves inactive in shared memory */ SpinLockAcquire(&walrcv->mutex); Assert(walrcv->walRcvState == WALRCV_STREAMING || + walrcv->walRcvState == WALRCV_CONNECTING || walrcv->walRcvState == WALRCV_RESTARTING || walrcv->walRcvState == WALRCV_STARTING || walrcv->walRcvState == WALRCV_WAITING || @@ -1391,6 +1404,8 @@ WalRcvGetStateString(WalRcvState state) return "stopped"; case WALRCV_STARTING: return "starting"; + case WALRCV_CONNECTING: + return "connecting"; case WALRCV_STREAMING: return "streaming"; case WALRCV_WAITING: diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index da8794cba7c..42e3e170bc0 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -179,7 +179,7 @@ WalRcvStreaming(void) } if (state == WALRCV_STREAMING || state == WALRCV_STARTING || - state == WALRCV_RESTARTING) + state == WALRCV_CONNECTING || state == WALRCV_RESTARTING) return true; else return false; @@ -211,6 +211,7 @@ ShutdownWalRcv(void) stopped = true; break; + case WALRCV_CONNECTING: case WALRCV_STREAMING: case WALRCV_WAITING: case WALRCV_RESTARTING: diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index f3ad00fb6f3..9b9bd916314 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -47,6 +47,7 @@ typedef enum WALRCV_STOPPED, /* stopped and mustn't start up again */ WALRCV_STARTING, /* launched, but the process hasn't * initialized yet */ + WALRCV_CONNECTING, /* connecting to upstream server */ WALRCV_STREAMING, /* walreceiver is streaming */ WALRCV_WAITING, /* stopped streaming, waiting for orders */ WALRCV_RESTARTING, /* asked to restart streaming */