mirror of https://github.com/postgres/postgres.git synced 2025-07-27 12:41:57 +03:00

Add a new slot sync worker to synchronize logical slots.

By enabling slot synchronization, all the failover logical replication
slots on the primary (assuming configurations are appropriate) are
automatically created on the physical standbys and are synced
periodically. The slot sync worker on the standby server pings the primary
server at regular intervals to get the necessary failover logical slots
information and create/update the slots locally. The slots that no longer
require synchronization are automatically dropped by the worker.

The nap time of the worker is tuned according to the activity on the
primary. The slot sync worker waits for some time before the next
synchronization, with the duration varying based on whether any slots were
updated during the last cycle.

A new parameter sync_replication_slots enables or disables this new
process.
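
The standby-side settings that this commit's validation code checks for can be sketched as a postgresql.conf fragment; the host and slot names below are illustrative, not from the commit:

```
# On the physical standby (illustrative values)
sync_replication_slots = on        # start the slot sync worker
hot_standby_feedback = on          # required so the primary retains needed rows
primary_slot_name = 'sb1_slot'     # physical slot that must exist on the primary
primary_conninfo = 'host=primary port=5432 dbname=postgres'  # dbname is required
# wal_level must also be >= logical
```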

On promotion, the slot sync worker is shut down by the startup process to
drop any temporary slots acquired by the slot sync worker and to prevent
the worker from trying to fetch the failover slots.

Functionality to allow logical walsenders to wait for physical standbys will
be added in a subsequent commit.

Author: Shveta Malik, Hou Zhijie based on design inputs by Masahiko Sawada and Amit Kapila
Reviewed-by: Masahiko Sawada, Bertrand Drouvot, Peter Smith, Dilip Kumar, Ajin Cherian, Nisha Moond, Kuroda Hayato, Amit Kapila
Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
This commit is contained in:
Amit Kapila
2024-02-22 15:25:15 +05:30
parent 3d47b75546
commit 93db6cbda0
19 changed files with 989 additions and 99 deletions

View File

@ -6,6 +6,9 @@
* loaded as a dynamic module to avoid linking the main server binary with
* libpq.
*
* Apart from walreceiver, the libpq-specific routines are now being used by
* logical replication workers and slot synchronization.
*
* Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
*
*

View File

@ -10,18 +10,25 @@
*
* This file contains the code for slot synchronization on a physical standby
* to fetch logical failover slots information from the primary server, create
* the slots on the standby and synchronize them. This is done by a call to SQL
* function pg_sync_replication_slots.
* the slots on the standby and synchronize them periodically.
*
* If on physical standby, the WAL corresponding to the remote's restart_lsn
* is not available or the remote's catalog_xmin precedes the oldest xid for which
* it is guaranteed that rows wouldn't have been removed then we cannot create
* the local standby slot because that would mean moving the local slot
* Slot synchronization can be performed either automatically by enabling slot
* sync worker or manually by calling SQL function pg_sync_replication_slots().
*
* If the WAL corresponding to the remote's restart_lsn is not available on the
* physical standby or the remote's catalog_xmin precedes the oldest xid for
* which it is guaranteed that rows wouldn't have been removed then we cannot
* create the local standby slot because that would mean moving the local slot
* backward and decoding won't be possible via such a slot. In this case, the
* slot will be marked as RS_TEMPORARY. Once the primary server catches up,
* the slot will be marked as RS_PERSISTENT (which means sync-ready) after
* which we can call pg_sync_replication_slots() periodically to perform
* syncs.
* which slot sync worker can perform the sync periodically or user can call
* pg_sync_replication_slots() periodically to perform the syncs.
*
* The slot sync worker waits for some time before the next synchronization,
* with the duration varying based on whether any slots were updated during
* the last cycle. Refer to the comments above wait_for_slot_activity() for
* more details.
*
* Any standby synchronized slots will be dropped if they no longer need
* to be synchronized. See comment atop drop_local_obsolete_slots() for more
@ -31,28 +38,84 @@
#include "postgres.h"
#include <time.h>
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "catalog/pg_database.h"
#include "commands/dbcommands.h"
#include "libpq/pqsignal.h"
#include "pgstat.h"
#include "postmaster/fork_process.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
#include "replication/slot.h"
#include "replication/slotsync.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
/* Struct for sharing information to control slot synchronization. */
/*
* Struct for sharing information to control slot synchronization.
*
* The slot sync worker's pid is needed by the startup process to shut it
* down during promotion. The startup process shuts down the slot sync worker
* and also sets stopSignaled=true to handle the race condition when the
* postmaster has not noticed the promotion yet and thus may end up restarting
* the slot sync worker. If stopSignaled is set, the worker will exit in such a
* case. Note that we don't need to reset this variable as after promotion the
* slot sync worker won't be restarted because the pmState changes to PM_RUN from
* PM_HOT_STANDBY and we don't support demoting primary without restarting the
* server. See MaybeStartSlotSyncWorker.
*
* The 'syncing' flag is needed to prevent concurrent slot syncs to avoid slot
* overwrites.
*
* The 'last_start_time' is needed by postmaster to start the slot sync worker
once per SLOTSYNC_RESTART_INTERVAL_SEC. In cases where an immediate restart
* is expected (e.g., slot sync GUCs change), slot sync worker will reset
* last_start_time before exiting, so that postmaster can start the worker
* without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
*
* All the fields except 'syncing' are used only by slotsync worker.
* 'syncing' is used both by worker and SQL function pg_sync_replication_slots.
*/
typedef struct SlotSyncCtxStruct
{
/* prevents concurrent slot syncs to avoid slot overwrites */
pid_t pid;
bool stopSignaled;
bool syncing;
time_t last_start_time;
slock_t mutex;
} SlotSyncCtxStruct;
SlotSyncCtxStruct *SlotSyncCtx = NULL;
/* GUC variable */
bool sync_replication_slots = false;
/*
* The sleep time (ms) between slot-sync cycles varies dynamically
* (within a MIN/MAX range) according to slot activity. See
* wait_for_slot_activity() for details.
*/
#define MIN_WORKER_NAPTIME_MS 200
#define MAX_WORKER_NAPTIME_MS 30000 /* 30s */
static long sleep_ms = MIN_WORKER_NAPTIME_MS;
/* The restart interval for slot sync work used by postmaster */
#define SLOTSYNC_RESTART_INTERVAL_SEC 10
/* Flag to tell if we are in a slot sync worker process */
static bool am_slotsync_worker = false;
/*
* Flag to tell if we are syncing replication slots. Unlike the 'syncing' flag
* in SlotSyncCtxStruct, this flag is true only if the current process is
@ -79,6 +142,13 @@ typedef struct RemoteSlot
ReplicationSlotInvalidationCause invalidated;
} RemoteSlot;
#ifdef EXEC_BACKEND
static pid_t slotsyncworker_forkexec(void);
#endif
NON_EXEC_STATIC void ReplSlotSyncWorkerMain(int argc, char *argv[]) pg_attribute_noreturn();
static void slotsync_failure_callback(int code, Datum arg);
/*
* If necessary, update the local synced slot's metadata based on the data
* from the remote slot.
@ -343,8 +413,11 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
* If the remote restart_lsn and catalog_xmin have caught up with the
* local ones, then update the LSNs and persist the local synced slot for
* future synchronization; otherwise, do nothing.
*
* Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise
* false.
*/
static void
static bool
update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlot *slot = MyReplicationSlot;
@ -375,7 +448,7 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
remote_slot->catalog_xmin,
LSN_FORMAT_ARGS(slot->data.restart_lsn),
slot->data.catalog_xmin));
return;
return false;
}
/* First time slot update, the function must return true */
@ -387,6 +460,8 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
ereport(LOG,
errmsg("newly created slot \"%s\" is sync-ready now",
remote_slot->name));
return true;
}
/*
@ -399,12 +474,15 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
* the remote_slot catches up with locally reserved position and local slot is
* updated. The slot is then persisted and is considered as sync-ready for
* periodic syncs.
*
* Returns TRUE if the local slot is updated.
*/
static void
static bool
synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlot *slot;
XLogRecPtr latestFlushPtr;
bool slot_updated = false;
/*
* Make sure that concerned WAL is received and flushed before syncing
@ -412,12 +490,17 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
*/
latestFlushPtr = GetStandbyFlushRecPtr(NULL);
if (remote_slot->confirmed_lsn > latestFlushPtr)
elog(ERROR,
"skipping slot synchronization as the received slot sync"
" LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
remote_slot->name,
LSN_FORMAT_ARGS(latestFlushPtr));
{
ereport(am_slotsync_worker ? LOG : ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("skipping slot synchronization as the received slot sync"
" LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
remote_slot->name,
LSN_FORMAT_ARGS(latestFlushPtr)));
return false;
}
/* Search for the named slot */
if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
@ -465,19 +548,22 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
/* Make sure the invalidated state persists across server restart */
ReplicationSlotMarkDirty();
ReplicationSlotSave();
slot_updated = true;
}
/* Skip the sync of an invalidated slot */
if (slot->data.invalidated != RS_INVAL_NONE)
{
ReplicationSlotRelease();
return;
return slot_updated;
}
/* Slot not ready yet, let's attempt to make it sync-ready now. */
if (slot->data.persistency == RS_TEMPORARY)
{
update_and_persist_local_synced_slot(remote_slot, remote_dbid);
slot_updated = update_and_persist_local_synced_slot(remote_slot,
remote_dbid);
}
/* Slot ready for sync, so sync it. */
@ -500,6 +586,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlotMarkDirty();
ReplicationSlotSave();
slot_updated = true;
}
}
}
@ -511,7 +599,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
/* Skip creating the local slot if remote_slot is invalidated already */
if (remote_slot->invalidated != RS_INVAL_NONE)
return;
return false;
/*
* We create temporary slots instead of ephemeral slots here because
@ -548,9 +636,13 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
LWLockRelease(ProcArrayLock);
update_and_persist_local_synced_slot(remote_slot, remote_dbid);
slot_updated = true;
}
ReplicationSlotRelease();
return slot_updated;
}
/*
@ -558,8 +650,10 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
*
* Gets the failover logical slots info from the primary server and updates
* the slots locally. Creates the slots if not present on the standby.
*
* Returns TRUE if any of the slots gets updated in this sync-cycle.
*/
static void
static bool
synchronize_slots(WalReceiverConn *wrconn)
{
#define SLOTSYNC_COLUMN_COUNT 9
@ -569,6 +663,8 @@ synchronize_slots(WalReceiverConn *wrconn)
WalRcvExecResult *res;
TupleTableSlot *tupslot;
List *remote_slot_list = NIL;
bool some_slot_updated = false;
bool started_tx = false;
const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
" restart_lsn, catalog_xmin, two_phase, failover,"
" database, conflict_reason"
@ -589,9 +685,15 @@ synchronize_slots(WalReceiverConn *wrconn)
syncing_slots = true;
/* The syscache access in walrcv_exec() needs a transaction env. */
if (!IsTransactionState())
{
StartTransactionCommand();
started_tx = true;
}
/* Execute the query */
res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
errmsg("could not fetch failover logical slots info from the primary server: %s",
@ -686,7 +788,7 @@ synchronize_slots(WalReceiverConn *wrconn)
*/
LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
synchronize_one_slot(remote_slot, remote_dbid);
some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
}
@ -696,11 +798,16 @@ synchronize_slots(WalReceiverConn *wrconn)
walrcv_clear_result(res);
if (started_tx)
CommitTransactionCommand();
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->syncing = false;
SpinLockRelease(&SlotSyncCtx->mutex);
syncing_slots = false;
return some_slot_updated;
}
/*
@ -720,6 +827,7 @@ validate_remote_info(WalReceiverConn *wrconn)
TupleTableSlot *tupslot;
bool remote_in_recovery;
bool primary_slot_valid;
bool started_tx = false;
initStringInfo(&cmd);
appendStringInfo(&cmd,
@ -728,6 +836,13 @@ validate_remote_info(WalReceiverConn *wrconn)
" WHERE slot_type='physical' AND slot_name=%s",
quote_literal_cstr(PrimarySlotName));
/* The syscache access in walrcv_exec() needs a transaction env. */
if (!IsTransactionState())
{
StartTransactionCommand();
started_tx = true;
}
res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow);
pfree(cmd.data);
@ -763,58 +878,21 @@ validate_remote_info(WalReceiverConn *wrconn)
ExecClearTuple(tupslot);
walrcv_clear_result(res);
if (started_tx)
CommitTransactionCommand();
}
/*
* Check all necessary GUCs for slot synchronization are set
* appropriately, otherwise, raise ERROR.
* Checks if dbname is specified in 'primary_conninfo'.
*
* Error out if not specified; otherwise, return it.
*/
void
ValidateSlotSyncParams(void)
char *
CheckAndGetDbnameFromConninfo(void)
{
char *dbname;
/*
* A physical replication slot(primary_slot_name) is required on the
* primary to ensure that the rows needed by the standby are not removed
* after restarting, so that the synchronized slot on the standby will not
* be invalidated.
*/
if (PrimarySlotName == NULL || *PrimarySlotName == '\0')
ereport(ERROR,
/* translator: %s is a GUC variable name */
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("slot synchronization requires %s to be defined", "primary_slot_name"));
/*
* hot_standby_feedback must be enabled to cooperate with the physical
* replication slot, which allows informing the primary about the xmin and
* catalog_xmin values on the standby.
*/
if (!hot_standby_feedback)
ereport(ERROR,
/* translator: %s is a GUC variable name */
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("slot synchronization requires %s to be enabled",
"hot_standby_feedback"));
/* Logical slot sync/creation requires wal_level >= logical. */
if (wal_level < WAL_LEVEL_LOGICAL)
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("slot synchronization requires wal_level >= \"logical\""));
/*
* The primary_conninfo is required to make connection to primary for
* getting slots information.
*/
if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0')
ereport(ERROR,
/* translator: %s is a GUC variable name */
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("slot synchronization requires %s to be defined",
"primary_conninfo"));
/*
* The slot synchronization needs a database connection for walrcv_exec to
* work.
@ -829,10 +907,523 @@ ValidateSlotSyncParams(void)
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("slot synchronization requires dbname to be specified in %s",
"primary_conninfo"));
return dbname;
}
/*
* Is current process syncing replication slots ?
* Return true if all necessary GUCs for slot synchronization are set
* appropriately, otherwise, return false.
*/
bool
ValidateSlotSyncParams(int elevel)
{
/*
* Logical slot sync/creation requires wal_level >= logical.
*
* Since altering wal_level requires a server restart, error out in this
* case regardless of the elevel provided by the caller.
*/
if (wal_level < WAL_LEVEL_LOGICAL)
{
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("slot synchronization requires wal_level >= \"logical\""));
return false;
}
/*
* A physical replication slot (primary_slot_name) is required on the
* primary to ensure that the rows needed by the standby are not removed
* after restarting, so that the synchronized slot on the standby will not
* be invalidated.
*/
if (PrimarySlotName == NULL || *PrimarySlotName == '\0')
{
ereport(elevel,
/* translator: %s is a GUC variable name */
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("slot synchronization requires %s to be defined", "primary_slot_name"));
return false;
}
/*
* hot_standby_feedback must be enabled to cooperate with the physical
* replication slot, which allows informing the primary about the xmin and
* catalog_xmin values on the standby.
*/
if (!hot_standby_feedback)
{
ereport(elevel,
/* translator: %s is a GUC variable name */
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("slot synchronization requires %s to be enabled",
"hot_standby_feedback"));
return false;
}
/*
* The primary_conninfo is required to make connection to primary for
* getting slots information.
*/
if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0')
{
ereport(elevel,
/* translator: %s is a GUC variable name */
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("slot synchronization requires %s to be defined",
"primary_conninfo"));
return false;
}
return true;
}
/*
* Re-read the config file.
*
* Exit if any of the slot sync GUCs have changed. The postmaster will
* restart it.
*/
static void
slotsync_reread_config(void)
{
char *old_primary_conninfo = pstrdup(PrimaryConnInfo);
char *old_primary_slotname = pstrdup(PrimarySlotName);
bool old_sync_replication_slots = sync_replication_slots;
bool old_hot_standby_feedback = hot_standby_feedback;
bool conninfo_changed;
bool primary_slotname_changed;
Assert(sync_replication_slots);
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0;
primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0;
pfree(old_primary_conninfo);
pfree(old_primary_slotname);
if (old_sync_replication_slots != sync_replication_slots)
{
ereport(LOG,
/* translator: %s is a GUC variable name */
errmsg("slot sync worker will shut down because %s is disabled", "sync_replication_slots"));
proc_exit(0);
}
if (conninfo_changed ||
primary_slotname_changed ||
(old_hot_standby_feedback != hot_standby_feedback))
{
ereport(LOG,
errmsg("slot sync worker will restart because of a parameter change"));
/*
* Reset the last-start time for this worker so that the postmaster
* can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
*/
SlotSyncCtx->last_start_time = 0;
proc_exit(0);
}
}
/*
* Interrupt handler for main loop of slot sync worker.
*/
static void
ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
{
CHECK_FOR_INTERRUPTS();
if (ShutdownRequestPending)
{
ereport(LOG,
errmsg("slot sync worker is shutting down on receiving SIGINT"));
proc_exit(0);
}
if (ConfigReloadPending)
slotsync_reread_config();
}
/*
* Cleanup function for slotsync worker.
*
* Called on slotsync worker exit.
*/
static void
slotsync_worker_onexit(int code, Datum arg)
{
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->pid = InvalidPid;
SpinLockRelease(&SlotSyncCtx->mutex);
}
/*
* Sleep for long enough that we believe it's likely that the slots on primary
* get updated.
*
* If there is no slot activity the wait time between sync-cycles will double
* (to a maximum of 30s). If there is some slot activity the wait time between
* sync-cycles is reset to the minimum (200ms).
*/
static void
wait_for_slot_activity(bool some_slot_updated)
{
int rc;
if (!some_slot_updated)
{
/*
* No slots were updated, so double the sleep time, but not beyond the
* maximum allowable value.
*/
sleep_ms = Min(sleep_ms * 2, MAX_WORKER_NAPTIME_MS);
}
else
{
/*
* Some slots were updated since the last sleep, so reset the sleep
* time.
*/
sleep_ms = MIN_WORKER_NAPTIME_MS;
}
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
sleep_ms,
WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
if (rc & WL_LATCH_SET)
ResetLatch(MyLatch);
}
/*
* The main loop of our worker process.
*
* It connects to the primary server, fetches logical failover slots
* information periodically in order to create and sync the slots.
*/
NON_EXEC_STATIC void
ReplSlotSyncWorkerMain(int argc, char *argv[])
{
WalReceiverConn *wrconn = NULL;
char *dbname;
char *err;
sigjmp_buf local_sigjmp_buf;
StringInfoData app_name;
am_slotsync_worker = true;
MyBackendType = B_SLOTSYNC_WORKER;
init_ps_display(NULL);
SetProcessingMode(InitProcessing);
/*
* Create a per-backend PGPROC struct in shared memory. We must do this
* before we access any shared memory.
*/
InitProcess();
/*
* Early initialization.
*/
BaseInit();
Assert(SlotSyncCtx != NULL);
SpinLockAcquire(&SlotSyncCtx->mutex);
Assert(SlotSyncCtx->pid == InvalidPid);
/*
* Startup process signaled the slot sync worker to stop, so if meanwhile
* postmaster ended up starting the worker again, exit.
*/
if (SlotSyncCtx->stopSignaled)
{
SpinLockRelease(&SlotSyncCtx->mutex);
proc_exit(0);
}
/* Advertise our PID so that the startup process can kill us on promotion */
SlotSyncCtx->pid = MyProcPid;
SpinLockRelease(&SlotSyncCtx->mutex);
ereport(LOG, errmsg("slot sync worker started"));
/* Register it as soon as SlotSyncCtx->pid is initialized. */
before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
/* Setup signal handling */
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGINT, SignalHandlerForShutdownRequest);
pqsignal(SIGTERM, die);
pqsignal(SIGFPE, FloatExceptionHandler);
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
pqsignal(SIGUSR2, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGCHLD, SIG_DFL);
/*
* Establishes SIGALRM handler and initialize timeout module. It is needed
* by InitPostgres to register different timeouts.
*/
InitializeTimeouts();
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
/*
* If an exception is encountered, processing resumes here.
*
* We just need to clean up, report the error, and go away.
*
* If we do not have this handling here, then since this worker process
* operates at the bottom of the exception stack, ERRORs turn into FATALs.
* Therefore, we create our own exception handler to catch ERRORs.
*/
if (sigsetjmp(local_sigjmp_buf, 1) != 0)
{
/* since not using PG_TRY, must reset error stack by hand */
error_context_stack = NULL;
/* Prevents interrupts while cleaning up */
HOLD_INTERRUPTS();
/* Report the error to the server log */
EmitErrorReport();
/*
* We can now go away. Note that because we called InitProcess, a
* callback was registered to do ProcKill, which will clean up
* necessary state.
*/
proc_exit(0);
}
/* We can now handle ereport(ERROR) */
PG_exception_stack = &local_sigjmp_buf;
/*
* Unblock signals (they were blocked when the postmaster forked us)
*/
sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
dbname = CheckAndGetDbnameFromConninfo();
/*
* Connect to the database specified by the user in primary_conninfo. We
* need a database connection for walrcv_exec to work which we use to
* fetch slot information from the remote node. See comments atop
* libpqrcv_exec.
*
* We do not specify a specific user here since the slot sync worker will
* operate as a superuser. This is safe because the slot sync worker does
* not interact with user tables, eliminating the risk of executing
* arbitrary code within triggers.
*/
InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL);
SetProcessingMode(NormalProcessing);
initStringInfo(&app_name);
if (cluster_name[0])
appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsync worker");
else
appendStringInfo(&app_name, "%s", "slotsync worker");
/*
* Establish the connection to the primary server for slot
* synchronization.
*/
wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
app_name.data, &err);
pfree(app_name.data);
if (!wrconn)
ereport(ERROR,
errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to the primary server: %s", err));
/*
* Register the failure callback once we have the connection.
*
* XXX: This can be combined with previous such cleanup registration of
* slotsync_worker_onexit() but that will need the connection to be made
* global and we want to avoid introducing global for this purpose.
*/
before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn));
/*
* Using the specified primary server connection, check that we are not a
* cascading standby and slot configured in 'primary_slot_name' exists on
* the primary server.
*/
validate_remote_info(wrconn);
/* Main loop to synchronize slots */
for (;;)
{
bool some_slot_updated = false;
ProcessSlotSyncInterrupts(wrconn);
some_slot_updated = synchronize_slots(wrconn);
wait_for_slot_activity(some_slot_updated);
}
/*
* The slot sync worker can't get here because it will only stop when it
* receives a SIGINT from the startup process, or when there is an error.
*/
Assert(false);
}
/*
* Main entry point for slot sync worker process, to be called from the
* postmaster.
*/
int
StartSlotSyncWorker(void)
{
pid_t pid;
#ifdef EXEC_BACKEND
switch ((pid = slotsyncworker_forkexec()))
{
#else
switch ((pid = fork_process()))
{
case 0:
/* in postmaster child ... */
InitPostmasterChild();
/* Close the postmaster's sockets */
ClosePostmasterPorts(false);
ReplSlotSyncWorkerMain(0, NULL);
break;
#endif
case -1:
ereport(LOG,
(errmsg("could not fork slot sync worker process: %m")));
return 0;
default:
return (int) pid;
}
/* shouldn't get here */
return 0;
}
#ifdef EXEC_BACKEND
/*
* The forkexec routine for the slot sync worker process.
*
* Format up the arglist, then fork and exec.
*/
static pid_t
slotsyncworker_forkexec(void)
{
char *av[10];
int ac = 0;
av[ac++] = "postgres";
av[ac++] = "--forkssworker";
av[ac++] = NULL; /* filled in by postmaster_forkexec */
av[ac] = NULL;
Assert(ac < lengthof(av));
return postmaster_forkexec(ac, av);
}
#endif
/*
* Shut down the slot sync worker.
*/
void
ShutDownSlotSync(void)
{
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->stopSignaled = true;
if (SlotSyncCtx->pid == InvalidPid)
{
SpinLockRelease(&SlotSyncCtx->mutex);
return;
}
SpinLockRelease(&SlotSyncCtx->mutex);
kill(SlotSyncCtx->pid, SIGINT);
/* Wait for it to die */
for (;;)
{
int rc;
/* Wait a bit, we don't expect to have to wait long */
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
10L, WAIT_EVENT_REPLICATION_SLOTSYNC_SHUTDOWN);
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
SpinLockAcquire(&SlotSyncCtx->mutex);
/* Is it gone? */
if (SlotSyncCtx->pid == InvalidPid)
break;
SpinLockRelease(&SlotSyncCtx->mutex);
}
SpinLockRelease(&SlotSyncCtx->mutex);
}
/*
* SlotSyncWorkerCanRestart
*
* Returns true if enough time (SLOTSYNC_RESTART_INTERVAL_SEC) has passed
* since it was launched last. Otherwise returns false.
*
* This is a safety valve to protect against continuous respawn attempts if the
* worker is dying immediately at launch. Note that since we will retry to
* launch the worker from the postmaster main loop, we will get another
* chance later.
*/
bool
SlotSyncWorkerCanRestart(void)
{
time_t curtime = time(NULL);
/* Return false if too soon since last start. */
if ((unsigned int) (curtime - SlotSyncCtx->last_start_time) <
(unsigned int) SLOTSYNC_RESTART_INTERVAL_SEC)
return false;
SlotSyncCtx->last_start_time = curtime;
return true;
}
/*
* Is current process syncing replication slots?
*
* Could be either backend executing SQL function or slot sync worker.
*/
bool
IsSyncingReplicationSlots(void)
@ -840,6 +1431,15 @@ IsSyncingReplicationSlots(void)
return syncing_slots;
}
/*
* Is current process a slot sync worker?
*/
bool
IsLogicalSlotSyncWorker(void)
{
return am_slotsync_worker;
}
/*
* Amount of shared memory required for slot synchronization.
*/
@ -855,14 +1455,16 @@ SlotSyncShmemSize(void)
void
SlotSyncShmemInit(void)
{
Size size = SlotSyncShmemSize();
bool found;
SlotSyncCtx = (SlotSyncCtxStruct *)
ShmemInitStruct("Slot Sync Data", SlotSyncShmemSize(), &found);
ShmemInitStruct("Slot Sync Data", size, &found);
if (!found)
{
SlotSyncCtx->syncing = false;
memset(SlotSyncCtx, 0, size);
SlotSyncCtx->pid = InvalidPid;
SpinLockInit(&SlotSyncCtx->mutex);
}
}

View File

@ -1252,6 +1252,20 @@ restart:
* concurrently being dropped by a backend connected to another DB.
*
* That's fairly unlikely in practice, so we'll just bail out.
*
* The slot sync worker holds a shared lock on the database before
* operating on synced logical slots to avoid conflict with the drop
* happening here. The persistent synced slots are thus safe but there
* is a possibility that the slot sync worker has created a temporary
* slot (which stays active even on release) and we are trying to drop
* that here. In practice, the chances of hitting this scenario are
* less as during slot synchronization, the temporary slot is
* immediately converted to persistent and thus is safe due to the
* shared lock taken on the database. So, we'll just bail out in such
* a case.
*
* XXX: We can consider shutting down the slot sync worker before
* trying to drop synced temporary slots here.
*/
if (active_pid)
ereport(ERROR,

View File

@ -960,10 +960,12 @@ pg_sync_replication_slots(PG_FUNCTION_ARGS)
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("replication slots can only be synchronized to a standby server"));
ValidateSlotSyncParams(ERROR);
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
ValidateSlotSyncParams();
(void) CheckAndGetDbnameFromConninfo();
initStringInfo(&app_name);
if (cluster_name[0])

View File

@ -3389,7 +3389,7 @@ WalSndDone(WalSndSendDataCallback send_data)
* This should only be called when in recovery.
*
* This is called either by cascading walsender to find WAL position to be sent
* to a cascaded standby or by slot synchronization function to validate remote
* to a cascaded standby or by slot synchronization operation to validate remote
* slot's lsn before syncing it locally.
*
* As a side-effect, *tli is updated to the TLI of the last