1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-27 12:41:57 +03:00

Allow setting failover property in the replication command.

This commit implements a new replication command called
ALTER_REPLICATION_SLOT and a corresponding walreceiver API function named
walrcv_alter_slot. Additionally, the CREATE_REPLICATION_SLOT command has
been extended to support the failover option.

These new additions allow the modification of the failover property of a
replication slot on the publisher. A subsequent commit will make use of
these commands in subscription commands and will add the tests as well to
cover the functionality added/changed by this commit.

Author: Hou Zhijie, Shveta Malik
Reviewed-by: Peter Smith, Bertrand Drouvot, Dilip Kumar, Masahiko Sawada, Nisha Moond, Kuroda, Hayato, Amit Kapila
Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
This commit is contained in:
Amit Kapila
2024-01-29 09:10:00 +05:30
parent 08e6344fd6
commit 7329240437
13 changed files with 230 additions and 11 deletions

View File

@ -73,8 +73,11 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
const char *slotname,
bool temporary,
bool two_phase,
bool failover,
CRSSnapshotAction snapshot_action,
XLogRecPtr *lsn);
static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
bool failover);
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
const char *query,
@ -95,6 +98,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
.walrcv_receive = libpqrcv_receive,
.walrcv_send = libpqrcv_send,
.walrcv_create_slot = libpqrcv_create_slot,
.walrcv_alter_slot = libpqrcv_alter_slot,
.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
.walrcv_exec = libpqrcv_exec,
.walrcv_disconnect = libpqrcv_disconnect
@ -938,8 +942,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
*/
static char *
libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
bool temporary, bool two_phase, CRSSnapshotAction snapshot_action,
XLogRecPtr *lsn)
bool temporary, bool two_phase, bool failover,
CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
{
PGresult *res;
StringInfoData cmd;
@ -969,6 +973,15 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
appendStringInfoChar(&cmd, ' ');
}
if (failover)
{
appendStringInfoString(&cmd, "FAILOVER");
if (use_new_options_syntax)
appendStringInfoString(&cmd, ", ");
else
appendStringInfoChar(&cmd, ' ');
}
if (use_new_options_syntax)
{
switch (snapshot_action)
@ -1037,6 +1050,33 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
return snapshot;
}
/*
* Change the definition of the replication slot.
*/
static void
libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
bool failover)
{
StringInfoData cmd;
PGresult *res;
initStringInfo(&cmd);
appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )",
quote_identifier(slotname),
failover ? "true" : "false");
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
pfree(cmd.data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not alter replication slot \"%s\": %s",
slotname, pchomp(PQerrorMessage(conn->streamConn)))));
PQclear(res);
}
/*
* Return PID of remote backend process.
*/

View File

@ -1430,6 +1430,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
*/
walrcv_create_slot(LogRepWorkerWalRcvConn,
slotname, false /* permanent */ , false /* two_phase */ ,
false,
CRS_USE_SNAPSHOT, origin_startpos);
/*

View File

@ -64,6 +64,7 @@ Node *replication_parse_result;
%token K_START_REPLICATION
%token K_CREATE_REPLICATION_SLOT
%token K_DROP_REPLICATION_SLOT
%token K_ALTER_REPLICATION_SLOT
%token K_TIMELINE_HISTORY
%token K_WAIT
%token K_TIMELINE
@ -80,8 +81,9 @@ Node *replication_parse_result;
%type <node> command
%type <node> base_backup start_replication start_logical_replication
create_replication_slot drop_replication_slot identify_system
read_replication_slot timeline_history show upload_manifest
create_replication_slot drop_replication_slot
alter_replication_slot identify_system read_replication_slot
timeline_history show upload_manifest
%type <list> generic_option_list
%type <defelt> generic_option
%type <uintval> opt_timeline
@ -112,6 +114,7 @@ command:
| start_logical_replication
| create_replication_slot
| drop_replication_slot
| alter_replication_slot
| read_replication_slot
| timeline_history
| show
@ -259,6 +262,18 @@ drop_replication_slot:
}
;
/* ALTER_REPLICATION_SLOT slot (options) */
alter_replication_slot:
K_ALTER_REPLICATION_SLOT IDENT '(' generic_option_list ')'
{
AlterReplicationSlotCmd *cmd;
cmd = makeNode(AlterReplicationSlotCmd);
cmd->slotname = $2;
cmd->options = $4;
$$ = (Node *) cmd;
}
;
/*
* START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
*/
@ -410,6 +425,7 @@ ident_or_keyword:
| K_START_REPLICATION { $$ = "start_replication"; }
| K_CREATE_REPLICATION_SLOT { $$ = "create_replication_slot"; }
| K_DROP_REPLICATION_SLOT { $$ = "drop_replication_slot"; }
| K_ALTER_REPLICATION_SLOT { $$ = "alter_replication_slot"; }
| K_TIMELINE_HISTORY { $$ = "timeline_history"; }
| K_WAIT { $$ = "wait"; }
| K_TIMELINE { $$ = "timeline"; }

View File

@ -125,6 +125,7 @@ TIMELINE { return K_TIMELINE; }
START_REPLICATION { return K_START_REPLICATION; }
CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; }
DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; }
ALTER_REPLICATION_SLOT { return K_ALTER_REPLICATION_SLOT; }
TIMELINE_HISTORY { return K_TIMELINE_HISTORY; }
PHYSICAL { return K_PHYSICAL; }
RESERVE_WAL { return K_RESERVE_WAL; }
@ -302,6 +303,7 @@ replication_scanner_is_replication_command(void)
case K_START_REPLICATION:
case K_CREATE_REPLICATION_SLOT:
case K_DROP_REPLICATION_SLOT:
case K_ALTER_REPLICATION_SLOT:
case K_READ_REPLICATION_SLOT:
case K_TIMELINE_HISTORY:
case K_UPLOAD_MANIFEST:

View File

@ -683,6 +683,31 @@ ReplicationSlotDrop(const char *name, bool nowait)
ReplicationSlotDropAcquired();
}
/*
* Change the definition of the slot identified by the specified name.
*/
void
ReplicationSlotAlter(const char *name, bool failover)
{
Assert(MyReplicationSlot == NULL);
ReplicationSlotAcquire(name, false);
if (SlotIsPhysical(MyReplicationSlot))
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use %s with a physical replication slot",
"ALTER_REPLICATION_SLOT"));
SpinLockAcquire(&MyReplicationSlot->mutex);
MyReplicationSlot->data.failover = failover;
SpinLockRelease(&MyReplicationSlot->mutex);
ReplicationSlotMarkDirty();
ReplicationSlotSave();
ReplicationSlotRelease();
}
/*
* Permanently drop the currently acquired replication slot.
*/

View File

@ -387,7 +387,7 @@ WalReceiverMain(void)
"pg_walreceiver_%lld",
(long long int) walrcv_get_backend_pid(wrconn));
walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
SpinLockAcquire(&walrcv->mutex);
strlcpy(walrcv->slotname, slotname, NAMEDATALEN);

View File

@ -1126,12 +1126,13 @@ static void
parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
bool *reserve_wal,
CRSSnapshotAction *snapshot_action,
bool *two_phase)
bool *two_phase, bool *failover)
{
ListCell *lc;
bool snapshot_action_given = false;
bool reserve_wal_given = false;
bool two_phase_given = false;
bool failover_given = false;
/* Parse options */
foreach(lc, cmd->options)
@ -1181,6 +1182,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
two_phase_given = true;
*two_phase = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "failover") == 0)
{
if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
failover_given = true;
*failover = defGetBoolean(defel);
}
else
elog(ERROR, "unrecognized option: %s", defel->defname);
}
@ -1197,6 +1207,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
char *slot_name;
bool reserve_wal = false;
bool two_phase = false;
bool failover = false;
CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
DestReceiver *dest;
TupOutputState *tstate;
@ -1206,7 +1217,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Assert(!MyReplicationSlot);
parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
&failover);
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
{
@ -1243,7 +1255,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
*/
ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
two_phase, false);
two_phase, failover);
/*
* Do options check early so that we can bail before calling the
@ -1398,6 +1410,43 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
ReplicationSlotDrop(cmd->slotname, !cmd->wait);
}
/*
* Process extra options given to ALTER_REPLICATION_SLOT.
*/
static void
ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
{
bool failover_given = false;
/* Parse options */
foreach_ptr(DefElem, defel, cmd->options)
{
if (strcmp(defel->defname, "failover") == 0)
{
if (failover_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
failover_given = true;
*failover = defGetBoolean(defel);
}
else
elog(ERROR, "unrecognized option: %s", defel->defname);
}
}
/*
* Change the definition of a replication slot.
*/
static void
AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
{
bool failover = false;
ParseAlterReplSlotOptions(cmd, &failover);
ReplicationSlotAlter(cmd->slotname, failover);
}
/*
* Load previously initiated logical slot and prepare for sending data (via
* WalSndLoop).
@ -1971,6 +2020,13 @@ exec_replication_command(const char *cmd_string)
EndReplicationCommand(cmdtag);
break;
case T_AlterReplicationSlotCmd:
cmdtag = "ALTER_REPLICATION_SLOT";
set_ps_display(cmdtag);
AlterReplicationSlot((AlterReplicationSlotCmd *) cmd_node);
EndReplicationCommand(cmdtag);
break;
case T_StartReplicationCmd:
{
StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;