mirror of
https://github.com/postgres/postgres.git
synced 2025-07-24 14:22:24 +03:00
Add GUC option to control maximum active replication origins.
This commit introduces a new GUC option max_active_replication_origins to control the maximum number of active replication origins. Previously, this was controlled by 'max_replication_slots'. Having a separate GUC option provides better flexibility for setting up subscribers, as they may not require replication slots (for cascading replication) but always require replication origins.

Author: Euler Taveira <euler@eulerto.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Peter Eisentraut <peter@eisentraut.org>
Reviewed-by: vignesh C <vignesh21@gmail.com>
Discussion: https://postgr.es/m/b81db436-8262-4575-b7c4-bc0c1551000b@app.fastmail.com
This commit is contained in:
@ -31,7 +31,7 @@
|
||||
#include "postmaster/bgworker.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "replication/logicallauncher.h"
|
||||
#include "replication/slot.h"
|
||||
#include "replication/origin.h"
|
||||
#include "replication/walreceiver.h"
|
||||
#include "replication/worker_internal.h"
|
||||
#include "storage/ipc.h"
|
||||
@ -325,10 +325,10 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
|
||||
subname)));
|
||||
|
||||
/* Report this after the initial starting message for consistency. */
|
||||
if (max_replication_slots == 0)
|
||||
if (max_active_replication_origins == 0)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
|
||||
errmsg("cannot start logical replication workers when \"max_replication_slots\"=0")));
|
||||
errmsg("cannot start logical replication workers when \"max_active_replication_origins\"=0")));
|
||||
|
||||
/*
|
||||
* We need to do the modification of the shared memory under lock so that
|
||||
|
@ -90,6 +90,7 @@
|
||||
#include "storage/lmgr.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/fmgroids.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/pg_lsn.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/snapmgr.h"
|
||||
@ -99,6 +100,9 @@
|
||||
#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
|
||||
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
|
||||
|
||||
/* GUC variables */
|
||||
int max_active_replication_origins = 10;
|
||||
|
||||
/*
|
||||
* Replay progress of a single remote node.
|
||||
*/
|
||||
@ -151,7 +155,7 @@ typedef struct ReplicationStateCtl
|
||||
{
|
||||
/* Tranche to use for per-origin LWLocks */
|
||||
int tranche_id;
|
||||
/* Array of length max_replication_slots */
|
||||
/* Array of length max_active_replication_origins */
|
||||
ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
|
||||
} ReplicationStateCtl;
|
||||
|
||||
@ -162,10 +166,7 @@ TimestampTz replorigin_session_origin_timestamp = 0;
|
||||
|
||||
/*
|
||||
* Base address into a shared memory array of replication states of size
|
||||
* max_replication_slots.
|
||||
*
|
||||
* XXX: Should we use a separate variable to size this rather than
|
||||
* max_replication_slots?
|
||||
* max_active_replication_origins.
|
||||
*/
|
||||
static ReplicationState *replication_states;
|
||||
|
||||
@ -186,12 +187,12 @@ static ReplicationState *session_replication_state = NULL;
|
||||
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
|
||||
|
||||
static void
|
||||
replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
|
||||
replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
|
||||
{
|
||||
if (check_slots && max_replication_slots == 0)
|
||||
if (check_origins && max_active_replication_origins == 0)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("cannot query or manipulate replication origin when \"max_replication_slots\" is 0")));
|
||||
errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
|
||||
|
||||
if (!recoveryOK && RecoveryInProgress())
|
||||
ereport(ERROR,
|
||||
@ -352,7 +353,7 @@ replorigin_state_clear(RepOriginId roident, bool nowait)
|
||||
restart:
|
||||
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
|
||||
|
||||
for (i = 0; i < max_replication_slots; i++)
|
||||
for (i = 0; i < max_active_replication_origins; i++)
|
||||
{
|
||||
ReplicationState *state = &replication_states[i];
|
||||
|
||||
@ -511,18 +512,13 @@ ReplicationOriginShmemSize(void)
|
||||
{
|
||||
Size size = 0;
|
||||
|
||||
/*
|
||||
* XXX: max_replication_slots is arguably the wrong thing to use, as here
|
||||
* we keep the replay state of *remote* transactions. But for now it seems
|
||||
* sufficient to reuse it, rather than introduce a separate GUC.
|
||||
*/
|
||||
if (max_replication_slots == 0)
|
||||
if (max_active_replication_origins == 0)
|
||||
return size;
|
||||
|
||||
size = add_size(size, offsetof(ReplicationStateCtl, states));
|
||||
|
||||
size = add_size(size,
|
||||
mul_size(max_replication_slots, sizeof(ReplicationState)));
|
||||
mul_size(max_active_replication_origins, sizeof(ReplicationState)));
|
||||
return size;
|
||||
}
|
||||
|
||||
@ -531,7 +527,7 @@ ReplicationOriginShmemInit(void)
|
||||
{
|
||||
bool found;
|
||||
|
||||
if (max_replication_slots == 0)
|
||||
if (max_active_replication_origins == 0)
|
||||
return;
|
||||
|
||||
replication_states_ctl = (ReplicationStateCtl *)
|
||||
@ -548,7 +544,7 @@ ReplicationOriginShmemInit(void)
|
||||
|
||||
replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
|
||||
|
||||
for (i = 0; i < max_replication_slots; i++)
|
||||
for (i = 0; i < max_active_replication_origins; i++)
|
||||
{
|
||||
LWLockInitialize(&replication_states[i].lock,
|
||||
replication_states_ctl->tranche_id);
|
||||
@ -570,7 +566,7 @@ ReplicationOriginShmemInit(void)
|
||||
*
|
||||
* So it's just the magic, followed by the statically sized
|
||||
* ReplicationStateOnDisk structs. Note that the maximum number of
|
||||
* ReplicationState is determined by max_replication_slots.
|
||||
* ReplicationState is determined by max_active_replication_origins.
|
||||
* ---------------------------------------------------------------------------
|
||||
*/
|
||||
void
|
||||
@ -583,7 +579,7 @@ CheckPointReplicationOrigin(void)
|
||||
uint32 magic = REPLICATION_STATE_MAGIC;
|
||||
pg_crc32c crc;
|
||||
|
||||
if (max_replication_slots == 0)
|
||||
if (max_active_replication_origins == 0)
|
||||
return;
|
||||
|
||||
INIT_CRC32C(crc);
|
||||
@ -625,7 +621,7 @@ CheckPointReplicationOrigin(void)
|
||||
LWLockAcquire(ReplicationOriginLock, LW_SHARED);
|
||||
|
||||
/* write actual data */
|
||||
for (i = 0; i < max_replication_slots; i++)
|
||||
for (i = 0; i < max_active_replication_origins; i++)
|
||||
{
|
||||
ReplicationStateOnDisk disk_state;
|
||||
ReplicationState *curstate = &replication_states[i];
|
||||
@ -718,7 +714,7 @@ StartupReplicationOrigin(void)
|
||||
already_started = true;
|
||||
#endif
|
||||
|
||||
if (max_replication_slots == 0)
|
||||
if (max_active_replication_origins == 0)
|
||||
return;
|
||||
|
||||
INIT_CRC32C(crc);
|
||||
@ -728,8 +724,8 @@ StartupReplicationOrigin(void)
|
||||
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
|
||||
|
||||
/*
|
||||
* might have had max_replication_slots == 0 last run, or we just brought
|
||||
* up a standby.
|
||||
* might have had max_active_replication_origins == 0 last run, or we just
|
||||
* brought up a standby.
|
||||
*/
|
||||
if (fd < 0 && errno == ENOENT)
|
||||
return;
|
||||
@ -796,10 +792,10 @@ StartupReplicationOrigin(void)
|
||||
|
||||
COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
|
||||
|
||||
if (last_state == max_replication_slots)
|
||||
if (last_state == max_active_replication_origins)
|
||||
ereport(PANIC,
|
||||
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
|
||||
errmsg("could not find free replication state, increase \"max_replication_slots\"")));
|
||||
errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
|
||||
|
||||
/* copy data to shared memory */
|
||||
replication_states[last_state].roident = disk_state.roident;
|
||||
@ -852,7 +848,7 @@ replorigin_redo(XLogReaderState *record)
|
||||
|
||||
xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
|
||||
|
||||
for (i = 0; i < max_replication_slots; i++)
|
||||
for (i = 0; i < max_active_replication_origins; i++)
|
||||
{
|
||||
ReplicationState *state = &replication_states[i];
|
||||
|
||||
@ -917,7 +913,7 @@ replorigin_advance(RepOriginId node,
|
||||
* Search for either an existing slot for the origin, or a free one we can
|
||||
* use.
|
||||
*/
|
||||
for (i = 0; i < max_replication_slots; i++)
|
||||
for (i = 0; i < max_active_replication_origins; i++)
|
||||
{
|
||||
ReplicationState *curstate = &replication_states[i];
|
||||
|
||||
@ -958,7 +954,7 @@ replorigin_advance(RepOriginId node,
|
||||
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
|
||||
errmsg("could not find free replication state slot for replication origin with ID %d",
|
||||
node),
|
||||
errhint("Increase \"max_replication_slots\" and try again.")));
|
||||
errhint("Increase \"max_active_replication_origins\" and try again.")));
|
||||
|
||||
if (replication_state == NULL)
|
||||
{
|
||||
@ -1024,7 +1020,7 @@ replorigin_get_progress(RepOriginId node, bool flush)
|
||||
/* prevent slots from being concurrently dropped */
|
||||
LWLockAcquire(ReplicationOriginLock, LW_SHARED);
|
||||
|
||||
for (i = 0; i < max_replication_slots; i++)
|
||||
for (i = 0; i < max_active_replication_origins; i++)
|
||||
{
|
||||
ReplicationState *state;
|
||||
|
||||
@ -1110,7 +1106,7 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
|
||||
registered_cleanup = true;
|
||||
}
|
||||
|
||||
Assert(max_replication_slots > 0);
|
||||
Assert(max_active_replication_origins > 0);
|
||||
|
||||
if (session_replication_state != NULL)
|
||||
ereport(ERROR,
|
||||
@ -1124,7 +1120,7 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
|
||||
* Search for either an existing slot for the origin, or a free one we can
|
||||
* use.
|
||||
*/
|
||||
for (i = 0; i < max_replication_slots; i++)
|
||||
for (i = 0; i < max_active_replication_origins; i++)
|
||||
{
|
||||
ReplicationState *curstate = &replication_states[i];
|
||||
|
||||
@ -1159,7 +1155,7 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
|
||||
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
|
||||
errmsg("could not find free replication state slot for replication origin with ID %d",
|
||||
node),
|
||||
errhint("Increase \"max_replication_slots\" and try again.")));
|
||||
errhint("Increase \"max_active_replication_origins\" and try again.")));
|
||||
else if (session_replication_state == NULL)
|
||||
{
|
||||
/* initialize new slot */
|
||||
@ -1195,7 +1191,7 @@ replorigin_session_reset(void)
|
||||
{
|
||||
ConditionVariable *cv;
|
||||
|
||||
Assert(max_replication_slots != 0);
|
||||
Assert(max_active_replication_origins != 0);
|
||||
|
||||
if (session_replication_state == NULL)
|
||||
ereport(ERROR,
|
||||
@ -1536,7 +1532,7 @@ pg_show_replication_origin_status(PG_FUNCTION_ARGS)
|
||||
* filled. Note that we do not take any locks, so slightly corrupted/out
|
||||
* of date values are a possibility.
|
||||
*/
|
||||
for (i = 0; i < max_replication_slots; i++)
|
||||
for (i = 0; i < max_active_replication_origins; i++)
|
||||
{
|
||||
ReplicationState *state;
|
||||
Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
|
||||
|
Reference in New Issue
Block a user