diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index ebcd936acb6..291f504d234 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29349,6 +29349,10 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
standby server. Temporary synced slots, if any, cannot be used for
logical decoding and must be dropped after promotion. See
for details.
+ Note that this function cannot be executed if
+
+ sync_replication_slots is enabled and the slotsync
+ worker is already running to perform the synchronization of slots.
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index cb39adcd0ea..578cfce896e 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -79,10 +79,11 @@
* and also sets stopSignaled=true to handle the race condition when the
* postmaster has not noticed the promotion yet and thus may end up restarting
* the slot sync worker. If stopSignaled is set, the worker will exit in such a
- * case. Note that we don't need to reset this variable as after promotion the
- * slot sync worker won't be restarted because the pmState changes to PM_RUN from
- * PM_HOT_STANDBY and we don't support demoting primary without restarting the
- * server. See MaybeStartSlotSyncWorker.
+ * case. The SQL function pg_sync_replication_slots() will also error out if
+ * this flag is set. Note that we don't need to reset this variable as after
+ * promotion the slot sync worker won't be restarted because the pmState
+ * changes to PM_RUN from PM_HOT_STANDBY and we don't support demoting
+ * primary without restarting the server. See MaybeStartSlotSyncWorker.
*
* The 'syncing' flag is needed to prevent concurrent slot syncs to avoid slot
* overwrites.
@@ -92,9 +93,6 @@
* is expected (e.g., slot sync GUCs change), slot sync worker will reset
* last_start_time before exiting, so that postmaster can start the worker
* without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
- *
- * All the fields except 'syncing' are used only by slotsync worker.
- * 'syncing' is used both by worker and SQL function pg_sync_replication_slots.
*/
typedef struct SlotSyncCtxStruct
{
@@ -807,20 +805,6 @@ synchronize_slots(WalReceiverConn *wrconn)
" FROM pg_catalog.pg_replication_slots"
" WHERE failover and NOT temporary";
- SpinLockAcquire(&SlotSyncCtx->mutex);
- if (SlotSyncCtx->syncing)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- ereport(ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot synchronize replication slots concurrently"));
- }
-
- SlotSyncCtx->syncing = true;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = true;
-
/* The syscache access in walrcv_exec() needs a transaction env. */
if (!IsTransactionState())
{
@@ -937,12 +921,6 @@ synchronize_slots(WalReceiverConn *wrconn)
if (started_tx)
CommitTransactionCommand();
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- syncing_slots = false;
-
return some_slot_updated;
}
@@ -1190,6 +1168,19 @@ ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
slotsync_reread_config();
}
+/*
+ * Connection cleanup function for slotsync worker.
+ *
+ * Called on slotsync worker exit.
+ */
+static void
+slotsync_worker_disconnect(int code, Datum arg)
+{
+ WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
+
+ walrcv_disconnect(wrconn);
+}
+
/*
* Cleanup function for slotsync worker.
*
@@ -1198,8 +1189,38 @@ ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
static void
slotsync_worker_onexit(int code, Datum arg)
{
+ /*
+ * We need to do slots cleanup here just like WalSndErrorCleanup() does.
+ *
+ * The startup process during promotion invokes ShutDownSlotSync() which
+ * waits for slot sync to finish and it does that by checking the
+ * 'syncing' flag. Thus the slot sync worker must be done with slots'
+ * release and cleanup to avoid any dangling temporary slots or active
+ * slots before it marks itself as finished syncing.
+ */
+
+ /* Make sure active replication slots are released */
+ if (MyReplicationSlot != NULL)
+ ReplicationSlotRelease();
+
+ /* Also cleanup the temporary slots. */
+ ReplicationSlotCleanup(false);
+
SpinLockAcquire(&SlotSyncCtx->mutex);
+
SlotSyncCtx->pid = InvalidPid;
+
+ /*
+ * If syncing_slots is true, it indicates that the process errored out
+ * without resetting the flag. So, we need to clean up shared memory and
+ * reset the flag here.
+ */
+ if (syncing_slots)
+ {
+ SlotSyncCtx->syncing = false;
+ syncing_slots = false;
+ }
+
SpinLockRelease(&SlotSyncCtx->mutex);
}
@@ -1242,6 +1263,64 @@ wait_for_slot_activity(bool some_slot_updated)
ResetLatch(MyLatch);
}
+/*
+ * Emit an error if a promotion or a concurrent sync call is in progress.
+ * Otherwise, advertise that a sync is in progress.
+ */
+static void
+check_and_set_sync_info(pid_t worker_pid)
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+
+ /* The worker pid must not be already assigned in SlotSyncCtx */
+ Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
+
+ /*
+ * Emit an error if startup process signaled the slot sync machinery to
+ * stop. See comments atop SlotSyncCtxStruct.
+ */
+ if (SlotSyncCtx->stopSignaled)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
+ }
+
+ if (SlotSyncCtx->syncing)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot synchronize replication slots concurrently"));
+ }
+
+ SlotSyncCtx->syncing = true;
+
+ /*
+ * Advertise the required PID so that the startup process can kill the
+ * slot sync worker on promotion.
+ */
+ SlotSyncCtx->pid = worker_pid;
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = true;
+}
+
+/*
+ * Reset syncing flag.
+ */
+static void
+reset_syncing_flag()
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ SlotSyncCtx->syncing = false;
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ syncing_slots = false;
+};
+
/*
* The main loop of our worker process.
*
@@ -1278,47 +1357,6 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
Assert(SlotSyncCtx != NULL);
- SpinLockAcquire(&SlotSyncCtx->mutex);
- Assert(SlotSyncCtx->pid == InvalidPid);
-
- /*
- * Startup process signaled the slot sync worker to stop, so if meanwhile
- * postmaster ended up starting the worker again, exit.
- */
- if (SlotSyncCtx->stopSignaled)
- {
- SpinLockRelease(&SlotSyncCtx->mutex);
- proc_exit(0);
- }
-
- /* Advertise our PID so that the startup process can kill us on promotion */
- SlotSyncCtx->pid = MyProcPid;
- SpinLockRelease(&SlotSyncCtx->mutex);
-
- ereport(LOG, errmsg("slot sync worker started"));
-
- /* Register it as soon as SlotSyncCtx->pid is initialized. */
- before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
-
- /* Setup signal handling */
- pqsignal(SIGHUP, SignalHandlerForConfigReload);
- pqsignal(SIGINT, SignalHandlerForShutdownRequest);
- pqsignal(SIGTERM, die);
- pqsignal(SIGFPE, FloatExceptionHandler);
- pqsignal(SIGUSR1, procsignal_sigusr1_handler);
- pqsignal(SIGUSR2, SIG_IGN);
- pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGCHLD, SIG_DFL);
-
- /*
- * Establishes SIGALRM handler and initialize timeout module. It is needed
- * by InitPostgres to register different timeouts.
- */
- InitializeTimeouts();
-
- /* Load the libpq-specific functions */
- load_file("libpqwalreceiver", false);
-
/*
* If an exception is encountered, processing resumes here.
*
@@ -1350,6 +1388,32 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
/* We can now handle ereport(ERROR) */
PG_exception_stack = &local_sigjmp_buf;
+ /* Setup signal handling */
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+ pqsignal(SIGTERM, die);
+ pqsignal(SIGFPE, FloatExceptionHandler);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, SIG_IGN);
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGCHLD, SIG_DFL);
+
+ check_and_set_sync_info(MyProcPid);
+
+ ereport(LOG, errmsg("slot sync worker started"));
+
+ /* Register it as soon as SlotSyncCtx->pid is initialized. */
+ before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
+
+ /*
+ * Establishes SIGALRM handler and initialize timeout module. It is needed
+ * by InitPostgres to register different timeouts.
+ */
+ InitializeTimeouts();
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+
/*
* Unblock signals (they were blocked when the postmaster forked us)
*/
@@ -1402,13 +1466,13 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
errmsg("could not connect to the primary server: %s", err));
/*
- * Register the failure callback once we have the connection.
+ * Register the disconnection callback.
*
- * XXX: This can be combined with previous such cleanup registration of
+ * XXX: This can be combined with previous cleanup registration of
* slotsync_worker_onexit() but that will need the connection to be made
* global and we want to avoid introducing global for this purpose.
*/
- before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn));
+ before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
/*
* Using the specified primary server connection, check that we are not a
@@ -1457,8 +1521,8 @@ update_synced_slots_inactive_since(void)
if (!StandbyMode)
return;
- /* The slot sync worker mustn't be running by now */
- Assert(SlotSyncCtx->pid == InvalidPid);
+ /* The slot sync worker or SQL function mustn't be running by now */
+ Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@@ -1471,6 +1535,9 @@ update_synced_slots_inactive_since(void)
{
Assert(SlotIsLogical(s));
+ /* The slot must not be acquired by any process */
+ Assert(s->active_pid == 0);
+
/* Use the same inactive_since time for all the slots. */
if (now == 0)
now = GetCurrentTimestamp();
@@ -1486,25 +1553,39 @@ update_synced_slots_inactive_since(void)
/*
* Shut down the slot sync worker.
+ *
+ * This function sends signal to shutdown slot sync worker, if required. It
+ * also waits till the slot sync worker has exited or
+ * pg_sync_replication_slots() has finished.
*/
void
ShutDownSlotSync(void)
{
+ pid_t worker_pid;
+
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->stopSignaled = true;
- if (SlotSyncCtx->pid == InvalidPid)
+ /*
+ * Return if neither the slot sync worker is running nor the function
+ * pg_sync_replication_slots() is executing.
+ */
+ if (!SlotSyncCtx->syncing)
{
SpinLockRelease(&SlotSyncCtx->mutex);
update_synced_slots_inactive_since();
return;
}
+
+ worker_pid = SlotSyncCtx->pid;
+
SpinLockRelease(&SlotSyncCtx->mutex);
- kill(SlotSyncCtx->pid, SIGINT);
+ if (worker_pid != InvalidPid)
+ kill(worker_pid, SIGINT);
- /* Wait for it to die */
+ /* Wait for slot sync to end */
for (;;)
{
int rc;
@@ -1522,8 +1603,8 @@ ShutDownSlotSync(void)
SpinLockAcquire(&SlotSyncCtx->mutex);
- /* Is it gone? */
- if (SlotSyncCtx->pid == InvalidPid)
+ /* Ensure that no process is syncing the slots. */
+ if (!SlotSyncCtx->syncing)
break;
SpinLockRelease(&SlotSyncCtx->mutex);
@@ -1601,26 +1682,37 @@ SlotSyncShmemInit(void)
}
/*
- * Error cleanup callback for slot synchronization.
+ * Error cleanup callback for slot sync SQL function.
*/
static void
slotsync_failure_callback(int code, Datum arg)
{
WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
- if (syncing_slots)
- {
- /*
- * If syncing_slots is true, it indicates that the process errored out
- * without resetting the flag. So, we need to clean up shared memory
- * and reset the flag here.
- */
- SpinLockAcquire(&SlotSyncCtx->mutex);
- SlotSyncCtx->syncing = false;
- SpinLockRelease(&SlotSyncCtx->mutex);
+ /*
+ * We need to do slots cleanup here just like WalSndErrorCleanup() does.
+ *
+ * The startup process during promotion invokes ShutDownSlotSync() which
+ * waits for slot sync to finish and it does that by checking the
+ * 'syncing' flag. Thus the SQL function must be done with slots' release
+ * and cleanup to avoid any dangling temporary slots or active slots
+ * before it marks itself as finished syncing.
+ */
- syncing_slots = false;
- }
+ /* Make sure active replication slots are released */
+ if (MyReplicationSlot != NULL)
+ ReplicationSlotRelease();
+
+ /* Also cleanup the synced temporary slots. */
+ ReplicationSlotCleanup(true);
+
+ /*
+ * The set syncing_slots indicates that the process errored out without
+ * resetting the flag. So, we need to clean up shared memory and reset the
+ * flag here.
+ */
+ if (syncing_slots)
+ reset_syncing_flag();
walrcv_disconnect(wrconn);
}
@@ -1634,9 +1726,17 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
+ check_and_set_sync_info(InvalidPid);
+
validate_remote_info(wrconn);
synchronize_slots(wrconn);
+
+ /* Cleanup the synced temporary slots */
+ ReplicationSlotCleanup(true);
+
+ /* We are done with sync, so reset sync flag */
+ reset_syncing_flag();
}
PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
}
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index cebf44bb0fe..aa4ea387da0 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -237,7 +237,7 @@ ReplicationSlotShmemExit(int code, Datum arg)
ReplicationSlotRelease();
/* Also cleanup all the temporary slots. */
- ReplicationSlotCleanup();
+ ReplicationSlotCleanup(false);
}
/*
@@ -736,10 +736,13 @@ ReplicationSlotRelease(void)
}
/*
- * Cleanup all temporary slots created in current session.
+ * Cleanup temporary slots created in current session.
+ *
+ * Cleanup only synced temporary slots if 'synced_only' is true, else
+ * cleanup all temporary slots.
*/
void
-ReplicationSlotCleanup(void)
+ReplicationSlotCleanup(bool synced_only)
{
int i;
@@ -755,7 +758,8 @@ restart:
continue;
SpinLockAcquire(&s->mutex);
- if (s->active_pid == MyProcPid)
+ if ((s->active_pid == MyProcPid &&
+ (!synced_only || s->data.synced)))
{
Assert(s->data.persistency == RS_TEMPORARY);
SpinLockRelease(&s->mutex);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9bf7c67f37d..c623b07cf02 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -336,7 +336,7 @@ WalSndErrorCleanup(void)
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
- ReplicationSlotCleanup();
+ ReplicationSlotCleanup(false);
replication_active = false;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 76f48b13d20..2dff28afcef 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -4410,7 +4410,7 @@ PostgresMain(const char *dbname, const char *username)
ReplicationSlotRelease();
/* We also want to cleanup temporary slots on error. */
- ReplicationSlotCleanup();
+ ReplicationSlotCleanup(false);
jit_reset_after_error();
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 7b937d1a0cf..1bc80960ef7 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -247,7 +247,7 @@ extern void ReplicationSlotAlter(const char *name, bool failover);
extern void ReplicationSlotAcquire(const char *name, bool nowait);
extern void ReplicationSlotRelease(void);
-extern void ReplicationSlotCleanup(void);
+extern void ReplicationSlotCleanup(bool synced_only);
extern void ReplicationSlotSave(void);
extern void ReplicationSlotMarkDirty(void);