|
|
|
|
@@ -82,11 +82,11 @@ typedef struct ReplicationSlotOnDisk
|
|
|
|
|
} ReplicationSlotOnDisk;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Struct for the configuration of standby_slot_names.
|
|
|
|
|
* Struct for the configuration of synchronized_standby_slots.
|
|
|
|
|
*
|
|
|
|
|
* Note: this must be a flat representation that can be held in a single chunk
|
|
|
|
|
* of guc_malloc'd memory, so that it can be stored as the "extra" data for the
|
|
|
|
|
* standby_slot_names GUC.
|
|
|
|
|
* synchronized_standby_slots GUC.
|
|
|
|
|
*/
|
|
|
|
|
typedef struct
|
|
|
|
|
{
|
|
|
|
|
@@ -97,7 +97,7 @@ typedef struct
|
|
|
|
|
* slot_names contains 'nslotnames' consecutive null-terminated C strings.
|
|
|
|
|
*/
|
|
|
|
|
char slot_names[FLEXIBLE_ARRAY_MEMBER];
|
|
|
|
|
} StandbySlotNamesConfigData;
|
|
|
|
|
} SyncStandbySlotsConfigData;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Lookup table for slot invalidation causes.
|
|
|
|
|
@@ -145,14 +145,14 @@ int max_replication_slots = 10; /* the maximum number of replication
|
|
|
|
|
* This GUC lists streaming replication standby server slot names that
|
|
|
|
|
* logical WAL sender processes will wait for.
|
|
|
|
|
*/
|
|
|
|
|
char *standby_slot_names;
|
|
|
|
|
char *synchronized_standby_slots;
|
|
|
|
|
|
|
|
|
|
/* This is the parsed and cached configuration for standby_slot_names */
|
|
|
|
|
static StandbySlotNamesConfigData *standby_slot_names_config;
|
|
|
|
|
/* This is the parsed and cached configuration for synchronized_standby_slots */
|
|
|
|
|
static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Oldest LSN that has been confirmed to be flushed to the standbys
|
|
|
|
|
* corresponding to the physical slots specified in the standby_slot_names GUC.
|
|
|
|
|
* corresponding to the physical slots specified in the synchronized_standby_slots GUC.
|
|
|
|
|
*/
|
|
|
|
|
static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
|
|
|
|
|
|
|
|
|
|
@@ -2412,12 +2412,12 @@ GetSlotInvalidationCause(const char *invalidation_reason)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* A helper function to validate slots specified in GUC standby_slot_names.
|
|
|
|
|
* A helper function to validate slots specified in GUC synchronized_standby_slots.
|
|
|
|
|
*
|
|
|
|
|
* The rawname will be parsed, and the result will be saved into *elemlist.
|
|
|
|
|
*/
|
|
|
|
|
static bool
|
|
|
|
|
validate_standby_slots(char *rawname, List **elemlist)
|
|
|
|
|
validate_sync_standby_slots(char *rawname, List **elemlist)
|
|
|
|
|
{
|
|
|
|
|
bool ok;
|
|
|
|
|
|
|
|
|
|
@@ -2472,17 +2472,17 @@ validate_standby_slots(char *rawname, List **elemlist)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* GUC check_hook for standby_slot_names
|
|
|
|
|
* GUC check_hook for synchronized_standby_slots
|
|
|
|
|
*/
|
|
|
|
|
bool
|
|
|
|
|
check_standby_slot_names(char **newval, void **extra, GucSource source)
|
|
|
|
|
check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
|
|
|
|
|
{
|
|
|
|
|
char *rawname;
|
|
|
|
|
char *ptr;
|
|
|
|
|
List *elemlist;
|
|
|
|
|
int size;
|
|
|
|
|
bool ok;
|
|
|
|
|
StandbySlotNamesConfigData *config;
|
|
|
|
|
SyncStandbySlotsConfigData *config;
|
|
|
|
|
|
|
|
|
|
if ((*newval)[0] == '\0')
|
|
|
|
|
return true;
|
|
|
|
|
@@ -2491,7 +2491,7 @@ check_standby_slot_names(char **newval, void **extra, GucSource source)
|
|
|
|
|
rawname = pstrdup(*newval);
|
|
|
|
|
|
|
|
|
|
/* Now verify if the specified slots exist and have correct type */
|
|
|
|
|
ok = validate_standby_slots(rawname, &elemlist);
|
|
|
|
|
ok = validate_sync_standby_slots(rawname, &elemlist);
|
|
|
|
|
|
|
|
|
|
if (!ok || elemlist == NIL)
|
|
|
|
|
{
|
|
|
|
|
@@ -2500,15 +2500,15 @@ check_standby_slot_names(char **newval, void **extra, GucSource source)
|
|
|
|
|
return ok;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Compute the size required for the StandbySlotNamesConfigData struct */
|
|
|
|
|
size = offsetof(StandbySlotNamesConfigData, slot_names);
|
|
|
|
|
/* Compute the size required for the SyncStandbySlotsConfigData struct */
|
|
|
|
|
size = offsetof(SyncStandbySlotsConfigData, slot_names);
|
|
|
|
|
foreach_ptr(char, slot_name, elemlist)
|
|
|
|
|
size += strlen(slot_name) + 1;
|
|
|
|
|
|
|
|
|
|
/* GUC extra value must be guc_malloc'd, not palloc'd */
|
|
|
|
|
config = (StandbySlotNamesConfigData *) guc_malloc(LOG, size);
|
|
|
|
|
config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
|
|
|
|
|
|
|
|
|
|
/* Transform the data into StandbySlotNamesConfigData */
|
|
|
|
|
/* Transform the data into SyncStandbySlotsConfigData */
|
|
|
|
|
config->nslotnames = list_length(elemlist);
|
|
|
|
|
|
|
|
|
|
ptr = config->slot_names;
|
|
|
|
|
@@ -2526,10 +2526,10 @@ check_standby_slot_names(char **newval, void **extra, GucSource source)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* GUC assign_hook for standby_slot_names
|
|
|
|
|
* GUC assign_hook for synchronized_standby_slots
|
|
|
|
|
*/
|
|
|
|
|
void
|
|
|
|
|
assign_standby_slot_names(const char *newval, void *extra)
|
|
|
|
|
assign_synchronized_standby_slots(const char *newval, void *extra)
|
|
|
|
|
{
|
|
|
|
|
/*
|
|
|
|
|
* The standby slots may have changed, so we must recompute the oldest
|
|
|
|
|
@@ -2537,19 +2537,19 @@ assign_standby_slot_names(const char *newval, void *extra)
|
|
|
|
|
*/
|
|
|
|
|
ss_oldest_flush_lsn = InvalidXLogRecPtr;
|
|
|
|
|
|
|
|
|
|
standby_slot_names_config = (StandbySlotNamesConfigData *) extra;
|
|
|
|
|
synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Check if the passed slot_name is specified in the standby_slot_names GUC.
|
|
|
|
|
* Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
|
|
|
|
|
*/
|
|
|
|
|
bool
|
|
|
|
|
SlotExistsInStandbySlotNames(const char *slot_name)
|
|
|
|
|
SlotExistsInSyncStandbySlots(const char *slot_name)
|
|
|
|
|
{
|
|
|
|
|
const char *standby_slot_name;
|
|
|
|
|
|
|
|
|
|
/* Return false if there is no value in standby_slot_names */
|
|
|
|
|
if (standby_slot_names_config == NULL)
|
|
|
|
|
/* Return false if there is no value in synchronized_standby_slots */
|
|
|
|
|
if (synchronized_standby_slots_config == NULL)
|
|
|
|
|
return false;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@@ -2557,8 +2557,8 @@ SlotExistsInStandbySlotNames(const char *slot_name)
|
|
|
|
|
* shouldn't hurt but if that turns out not to be true then we can cache
|
|
|
|
|
* this information for each WalSender as well.
|
|
|
|
|
*/
|
|
|
|
|
standby_slot_name = standby_slot_names_config->slot_names;
|
|
|
|
|
for (int i = 0; i < standby_slot_names_config->nslotnames; i++)
|
|
|
|
|
standby_slot_name = synchronized_standby_slots_config->slot_names;
|
|
|
|
|
for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
|
|
|
|
|
{
|
|
|
|
|
if (strcmp(standby_slot_name, slot_name) == 0)
|
|
|
|
|
return true;
|
|
|
|
|
@@ -2570,7 +2570,7 @@ SlotExistsInStandbySlotNames(const char *slot_name)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Return true if the slots specified in standby_slot_names have caught up to
|
|
|
|
|
* Return true if the slots specified in synchronized_standby_slots have caught up to
|
|
|
|
|
* the given WAL location, false otherwise.
|
|
|
|
|
*
|
|
|
|
|
* The elevel parameter specifies the error level used for logging messages
|
|
|
|
|
@@ -2585,9 +2585,9 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Don't need to wait for the standbys to catch up if there is no value in
|
|
|
|
|
* standby_slot_names.
|
|
|
|
|
* synchronized_standby_slots.
|
|
|
|
|
*/
|
|
|
|
|
if (standby_slot_names_config == NULL)
|
|
|
|
|
if (synchronized_standby_slots_config == NULL)
|
|
|
|
|
return true;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@@ -2611,8 +2611,8 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
|
|
|
|
|
*/
|
|
|
|
|
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
|
|
|
|
|
|
|
|
|
name = standby_slot_names_config->slot_names;
|
|
|
|
|
for (int i = 0; i < standby_slot_names_config->nslotnames; i++)
|
|
|
|
|
name = synchronized_standby_slots_config->slot_names;
|
|
|
|
|
for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
|
|
|
|
|
{
|
|
|
|
|
XLogRecPtr restart_lsn;
|
|
|
|
|
bool invalidated;
|
|
|
|
|
@@ -2624,43 +2624,44 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
|
|
|
|
|
if (!slot)
|
|
|
|
|
{
|
|
|
|
|
/*
|
|
|
|
|
* If a slot name provided in standby_slot_names does not exist,
|
|
|
|
|
* report a message and exit the loop. A user can specify a slot
|
|
|
|
|
* name that does not exist just before the server startup. The
|
|
|
|
|
* GUC check_hook(validate_standby_slots) cannot validate such a
|
|
|
|
|
* slot during startup as the ReplicationSlotCtl shared memory is
|
|
|
|
|
* not initialized at that time. It is also possible for a user to
|
|
|
|
|
* drop the slot in standby_slot_names afterwards.
|
|
|
|
|
* If a slot name provided in synchronized_standby_slots does not
|
|
|
|
|
* exist, report a message and exit the loop. A user can specify a
|
|
|
|
|
* slot name that does not exist just before the server startup.
|
|
|
|
|
* The GUC check_hook(validate_sync_standby_slots) cannot validate
|
|
|
|
|
* such a slot during startup as the ReplicationSlotCtl shared
|
|
|
|
|
* memory is not initialized at that time. It is also possible for
|
|
|
|
|
* a user to drop the slot in synchronized_standby_slots
|
|
|
|
|
* afterwards.
|
|
|
|
|
*/
|
|
|
|
|
ereport(elevel,
|
|
|
|
|
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
|
|
|
errmsg("replication slot \"%s\" specified in parameter %s does not exist",
|
|
|
|
|
name, "standby_slot_names"),
|
|
|
|
|
name, "synchronized_standby_slots"),
|
|
|
|
|
errdetail("Logical replication is waiting on the standby associated with \"%s\".",
|
|
|
|
|
name),
|
|
|
|
|
errhint("Consider creating the slot \"%s\" or amend parameter %s.",
|
|
|
|
|
name, "standby_slot_names"));
|
|
|
|
|
name, "synchronized_standby_slots"));
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (SlotIsLogical(slot))
|
|
|
|
|
{
|
|
|
|
|
/*
|
|
|
|
|
* If a logical slot name is provided in standby_slot_names,
|
|
|
|
|
* report a message and exit the loop. Similar to the non-existent
|
|
|
|
|
* case, a user can specify a logical slot name in
|
|
|
|
|
* standby_slot_names before the server startup, or drop an
|
|
|
|
|
* existing physical slot and recreate a logical slot with the
|
|
|
|
|
* same name.
|
|
|
|
|
* If a logical slot name is provided in
|
|
|
|
|
* synchronized_standby_slots, report a message and exit the loop.
|
|
|
|
|
* Similar to the non-existent case, a user can specify a logical
|
|
|
|
|
* slot name in synchronized_standby_slots before the server
|
|
|
|
|
* startup, or drop an existing physical slot and recreate a
|
|
|
|
|
* logical slot with the same name.
|
|
|
|
|
*/
|
|
|
|
|
ereport(elevel,
|
|
|
|
|
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
|
|
|
errmsg("cannot have logical replication slot \"%s\" in parameter %s",
|
|
|
|
|
name, "standby_slot_names"),
|
|
|
|
|
name, "synchronized_standby_slots"),
|
|
|
|
|
errdetail("Logical replication is waiting for correction on \"%s\".",
|
|
|
|
|
name),
|
|
|
|
|
errhint("Consider removing logical slot \"%s\" from parameter %s.",
|
|
|
|
|
name, "standby_slot_names"));
|
|
|
|
|
name, "synchronized_standby_slots"));
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -2676,11 +2677,11 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
|
|
|
|
|
ereport(elevel,
|
|
|
|
|
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
|
|
|
errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
|
|
|
|
|
name, "standby_slot_names"),
|
|
|
|
|
name, "synchronized_standby_slots"),
|
|
|
|
|
errdetail("Logical replication is waiting on the standby associated with \"%s\".",
|
|
|
|
|
name),
|
|
|
|
|
errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
|
|
|
|
|
name, "standby_slot_names"));
|
|
|
|
|
name, "synchronized_standby_slots"));
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -2691,11 +2692,11 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
|
|
|
|
|
ereport(elevel,
|
|
|
|
|
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
|
|
|
errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
|
|
|
|
|
name, "standby_slot_names"),
|
|
|
|
|
name, "synchronized_standby_slots"),
|
|
|
|
|
errdetail("Logical replication is waiting on the standby associated with \"%s\".",
|
|
|
|
|
name),
|
|
|
|
|
errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
|
|
|
|
|
name, "standby_slot_names"));
|
|
|
|
|
name, "synchronized_standby_slots"));
|
|
|
|
|
|
|
|
|
|
/* Continue if the current slot hasn't caught up. */
|
|
|
|
|
break;
|
|
|
|
|
@@ -2718,7 +2719,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
|
|
|
|
|
* Return false if not all the standbys have caught up to the specified
|
|
|
|
|
* WAL location.
|
|
|
|
|
*/
|
|
|
|
|
if (caught_up_slot_num != standby_slot_names_config->nslotnames)
|
|
|
|
|
if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
|
|
|
|
|
return false;
|
|
|
|
|
|
|
|
|
|
/* The ss_oldest_flush_lsn must not retreat. */
|
|
|
|
|
@@ -2734,7 +2735,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
|
|
|
|
|
* Wait for physical standbys to confirm receiving the given lsn.
|
|
|
|
|
*
|
|
|
|
|
* Used by logical decoding SQL functions. It waits for physical standbys
|
|
|
|
|
* corresponding to the physical slots specified in the standby_slot_names GUC.
|
|
|
|
|
* corresponding to the physical slots specified in the synchronized_standby_slots GUC.
|
|
|
|
|
*/
|
|
|
|
|
void
|
|
|
|
|
WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
|
|
|
|
|
@@ -2742,9 +2743,9 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
|
|
|
|
|
/*
|
|
|
|
|
* Don't need to wait for the standby to catch up if the current acquired
|
|
|
|
|
* slot is not a logical failover slot, or there is no value in
|
|
|
|
|
* standby_slot_names.
|
|
|
|
|
* synchronized_standby_slots.
|
|
|
|
|
*/
|
|
|
|
|
if (!MyReplicationSlot->data.failover || !standby_slot_names_config)
|
|
|
|
|
if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
|
|
|
|
|
@@ -2764,9 +2765,9 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Wait for the slots in the standby_slot_names to catch up, but use a
|
|
|
|
|
* timeout (1s) so we can also check if the standby_slot_names has
|
|
|
|
|
* been changed.
|
|
|
|
|
* Wait for the slots in the synchronized_standby_slots to catch up,
|
|
|
|
|
* but use a timeout (1s) so we can also check if the
|
|
|
|
|
* synchronized_standby_slots has been changed.
|
|
|
|
|
*/
|
|
|
|
|
ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
|
|
|
|
|
WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
|
|
|
|
|
|