diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 1ba813bbb9a..c35045faa1a 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1766,6 +1766,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
Waiting for confirmation from a remote server during synchronous
replication.
+
+ WalrcvExit
+ Waiting for the walreceiver to exit.
+
XactGroupUpdate
Waiting for the group leader to update transaction status at
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 68eefb97227..b1e2d94951d 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -4124,6 +4124,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_SYNC_REP:
event_name = "SyncRep";
break;
+ case WAIT_EVENT_WALRCV_EXIT:
+ event_name = "WalrcvExit";
+ break;
case WAIT_EVENT_XACT_GROUP_UPDATE:
event_name = "XactGroupUpdate";
break;
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index e5f8a06fea0..8532296f26c 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -207,6 +207,7 @@ WalReceiverMain(void)
case WALRCV_STOPPED:
SpinLockRelease(&walrcv->mutex);
+ ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
proc_exit(1);
break;
@@ -784,6 +785,8 @@ WalRcvDie(int code, Datum arg)
walrcv->latch = NULL;
SpinLockRelease(&walrcv->mutex);
+ ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
+
/* Terminate the connection gracefully. */
if (wrconn != NULL)
walrcv_disconnect(wrconn);
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 63e60478ea6..fff6c54c45d 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -23,6 +23,7 @@
#include
#include "access/xlog_internal.h"
+#include "pgstat.h"
#include "postmaster/startup.h"
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
@@ -62,6 +63,7 @@ WalRcvShmemInit(void)
/* First time through, so initialize */
MemSet(WalRcv, 0, WalRcvShmemSize());
WalRcv->walRcvState = WALRCV_STOPPED;
+ ConditionVariableInit(&WalRcv->walRcvStoppedCV);
SpinLockInit(&WalRcv->mutex);
pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
WalRcv->latch = NULL;
@@ -95,12 +97,18 @@ WalRcvRunning(void)
if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
{
+ bool stopped = false;
+
SpinLockAcquire(&walrcv->mutex);
-
if (walrcv->walRcvState == WALRCV_STARTING)
+ {
state = walrcv->walRcvState = WALRCV_STOPPED;
-
+ stopped = true;
+ }
SpinLockRelease(&walrcv->mutex);
+
+ if (stopped)
+ ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
}
}
@@ -140,12 +148,18 @@ WalRcvStreaming(void)
if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
{
+ bool stopped = false;
+
SpinLockAcquire(&walrcv->mutex);
-
if (walrcv->walRcvState == WALRCV_STARTING)
+ {
state = walrcv->walRcvState = WALRCV_STOPPED;
-
+ stopped = true;
+ }
SpinLockRelease(&walrcv->mutex);
+
+ if (stopped)
+ ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
}
}
@@ -165,6 +179,7 @@ ShutdownWalRcv(void)
{
WalRcvData *walrcv = WalRcv;
pid_t walrcvpid = 0;
+ bool stopped = false;
/*
* Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
@@ -178,6 +193,7 @@ ShutdownWalRcv(void)
break;
case WALRCV_STARTING:
walrcv->walRcvState = WALRCV_STOPPED;
+ stopped = true;
break;
case WALRCV_STREAMING:
@@ -191,6 +207,10 @@ ShutdownWalRcv(void)
}
SpinLockRelease(&walrcv->mutex);
+ /* Unnecessary but consistent. */
+ if (stopped)
+ ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
+
/*
* Signal walreceiver process if it was still running.
*/
@@ -201,16 +221,11 @@ ShutdownWalRcv(void)
* Wait for walreceiver to acknowledge its death by setting state to
* WALRCV_STOPPED.
*/
+ ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
while (WalRcvRunning())
- {
- /*
- * This possibly-long loop needs to handle interrupts of startup
- * process.
- */
- HandleStartupProcInterrupts();
-
- pg_usleep(100000); /* 100ms */
- }
+ ConditionVariableSleep(&walrcv->walRcvStoppedCV,
+ WAIT_EVENT_WALRCV_EXIT);
+ ConditionVariableCancelSleep();
}
/*
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f9166b86558..be43c048028 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -1009,6 +1009,7 @@ typedef enum
WAIT_EVENT_REPLICATION_SLOT_DROP,
WAIT_EVENT_SAFE_SNAPSHOT,
WAIT_EVENT_SYNC_REP,
+ WAIT_EVENT_WALRCV_EXIT,
WAIT_EVENT_XACT_GROUP_UPDATE
} WaitEventIPC;
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index a97a59a6a30..4fd7c25ea74 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -19,6 +19,7 @@
#include "port/atomics.h"
#include "replication/logicalproto.h"
#include "replication/walsender.h"
+#include "storage/condition_variable.h"
#include "storage/latch.h"
#include "storage/spin.h"
#include "utils/tuplestore.h"
@@ -62,6 +63,7 @@ typedef struct
*/
pid_t pid;
WalRcvState walRcvState;
+ ConditionVariable walRcvStoppedCV;
pg_time_t startTime;
/*