mirror of
https://github.com/postgres/postgres.git
synced 2025-06-19 04:21:08 +03:00
Allow setting failover property in the replication command.
This commit implements a new replication command called ALTER_REPLICATION_SLOT and a corresponding walreceiver API function named walrcv_alter_slot. Additionally, the CREATE_REPLICATION_SLOT command has been extended to support the failover option. These new additions allow the modification of the failover property of a replication slot on the publisher. A subsequent commit will make use of these commands in subscription commands and will add the tests as well to cover the functionality added/changed by this commit. Author: Hou Zhijie, Shveta Malik Reviewed-by: Peter Smith, Bertrand Drouvot, Dilip Kumar, Masahiko Sawada, Nisha Moond, Kuroda, Hayato, Amit Kapila Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
This commit is contained in:
@ -2060,6 +2060,16 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
|
|||||||
</para>
|
</para>
|
||||||
</listitem>
|
</listitem>
|
||||||
</varlistentry>
|
</varlistentry>
|
||||||
|
|
||||||
|
<varlistentry>
|
||||||
|
<term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
|
||||||
|
<listitem>
|
||||||
|
<para>
|
||||||
|
If true, the slot is enabled to be synced to the standbys.
|
||||||
|
The default is false.
|
||||||
|
</para>
|
||||||
|
</listitem>
|
||||||
|
</varlistentry>
|
||||||
</variablelist>
|
</variablelist>
|
||||||
|
|
||||||
<para>
|
<para>
|
||||||
@ -2124,6 +2134,46 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
|
|||||||
</listitem>
|
</listitem>
|
||||||
</varlistentry>
|
</varlistentry>
|
||||||
|
|
||||||
|
<varlistentry id="protocol-replication-alter-replication-slot" xreflabel="ALTER_REPLICATION_SLOT">
|
||||||
|
<term><literal>ALTER_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ( <replaceable class="parameter">option</replaceable> [, ...] )
|
||||||
|
<indexterm><primary>ALTER_REPLICATION_SLOT</primary></indexterm>
|
||||||
|
</term>
|
||||||
|
<listitem>
|
||||||
|
<para>
|
||||||
|
Change the definition of a replication slot.
|
||||||
|
See <xref linkend="streaming-replication-slots"/> for more about
|
||||||
|
replication slots. This command is currently only supported for logical
|
||||||
|
replication slots.
|
||||||
|
</para>
|
||||||
|
|
||||||
|
<variablelist>
|
||||||
|
<varlistentry>
|
||||||
|
<term><replaceable class="parameter">slot_name</replaceable></term>
|
||||||
|
<listitem>
|
||||||
|
<para>
|
||||||
|
The name of the slot to alter. Must be a valid replication slot
|
||||||
|
name (see <xref linkend="streaming-replication-slots-manipulation"/>).
|
||||||
|
</para>
|
||||||
|
</listitem>
|
||||||
|
</varlistentry>
|
||||||
|
</variablelist>
|
||||||
|
|
||||||
|
<para>The following option is supported:</para>
|
||||||
|
|
||||||
|
<variablelist>
|
||||||
|
<varlistentry>
|
||||||
|
<term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
|
||||||
|
<listitem>
|
||||||
|
<para>
|
||||||
|
If true, the slot is enabled to be synced to the standbys.
|
||||||
|
</para>
|
||||||
|
</listitem>
|
||||||
|
</varlistentry>
|
||||||
|
</variablelist>
|
||||||
|
|
||||||
|
</listitem>
|
||||||
|
</varlistentry>
|
||||||
|
|
||||||
<varlistentry id="protocol-replication-read-replication-slot">
|
<varlistentry id="protocol-replication-read-replication-slot">
|
||||||
<term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
|
<term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
|
||||||
<indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
|
<indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
|
||||||
|
@ -807,7 +807,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
|
|||||||
twophase_enabled = true;
|
twophase_enabled = true;
|
||||||
|
|
||||||
walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
|
walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
|
||||||
CRS_NOEXPORT_SNAPSHOT, NULL);
|
false, CRS_NOEXPORT_SNAPSHOT, NULL);
|
||||||
|
|
||||||
if (twophase_enabled)
|
if (twophase_enabled)
|
||||||
UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
|
UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
|
||||||
|
@ -73,8 +73,11 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
|
|||||||
const char *slotname,
|
const char *slotname,
|
||||||
bool temporary,
|
bool temporary,
|
||||||
bool two_phase,
|
bool two_phase,
|
||||||
|
bool failover,
|
||||||
CRSSnapshotAction snapshot_action,
|
CRSSnapshotAction snapshot_action,
|
||||||
XLogRecPtr *lsn);
|
XLogRecPtr *lsn);
|
||||||
|
static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
|
||||||
|
bool failover);
|
||||||
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
|
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
|
||||||
static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
|
static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
|
||||||
const char *query,
|
const char *query,
|
||||||
@ -95,6 +98,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
|
|||||||
.walrcv_receive = libpqrcv_receive,
|
.walrcv_receive = libpqrcv_receive,
|
||||||
.walrcv_send = libpqrcv_send,
|
.walrcv_send = libpqrcv_send,
|
||||||
.walrcv_create_slot = libpqrcv_create_slot,
|
.walrcv_create_slot = libpqrcv_create_slot,
|
||||||
|
.walrcv_alter_slot = libpqrcv_alter_slot,
|
||||||
.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
|
.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
|
||||||
.walrcv_exec = libpqrcv_exec,
|
.walrcv_exec = libpqrcv_exec,
|
||||||
.walrcv_disconnect = libpqrcv_disconnect
|
.walrcv_disconnect = libpqrcv_disconnect
|
||||||
@ -938,8 +942,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
|
|||||||
*/
|
*/
|
||||||
static char *
|
static char *
|
||||||
libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
|
libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
|
||||||
bool temporary, bool two_phase, CRSSnapshotAction snapshot_action,
|
bool temporary, bool two_phase, bool failover,
|
||||||
XLogRecPtr *lsn)
|
CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
|
||||||
{
|
{
|
||||||
PGresult *res;
|
PGresult *res;
|
||||||
StringInfoData cmd;
|
StringInfoData cmd;
|
||||||
@ -969,6 +973,15 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
|
|||||||
appendStringInfoChar(&cmd, ' ');
|
appendStringInfoChar(&cmd, ' ');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (failover)
|
||||||
|
{
|
||||||
|
appendStringInfoString(&cmd, "FAILOVER");
|
||||||
|
if (use_new_options_syntax)
|
||||||
|
appendStringInfoString(&cmd, ", ");
|
||||||
|
else
|
||||||
|
appendStringInfoChar(&cmd, ' ');
|
||||||
|
}
|
||||||
|
|
||||||
if (use_new_options_syntax)
|
if (use_new_options_syntax)
|
||||||
{
|
{
|
||||||
switch (snapshot_action)
|
switch (snapshot_action)
|
||||||
@ -1037,6 +1050,33 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
|
|||||||
return snapshot;
|
return snapshot;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Change the definition of the replication slot.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
|
||||||
|
bool failover)
|
||||||
|
{
|
||||||
|
StringInfoData cmd;
|
||||||
|
PGresult *res;
|
||||||
|
|
||||||
|
initStringInfo(&cmd);
|
||||||
|
appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )",
|
||||||
|
quote_identifier(slotname),
|
||||||
|
failover ? "true" : "false");
|
||||||
|
|
||||||
|
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
|
||||||
|
pfree(cmd.data);
|
||||||
|
|
||||||
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
||||||
|
errmsg("could not alter replication slot \"%s\": %s",
|
||||||
|
slotname, pchomp(PQerrorMessage(conn->streamConn)))));
|
||||||
|
|
||||||
|
PQclear(res);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Return PID of remote backend process.
|
* Return PID of remote backend process.
|
||||||
*/
|
*/
|
||||||
|
@ -1430,6 +1430,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
|
|||||||
*/
|
*/
|
||||||
walrcv_create_slot(LogRepWorkerWalRcvConn,
|
walrcv_create_slot(LogRepWorkerWalRcvConn,
|
||||||
slotname, false /* permanent */ , false /* two_phase */ ,
|
slotname, false /* permanent */ , false /* two_phase */ ,
|
||||||
|
false,
|
||||||
CRS_USE_SNAPSHOT, origin_startpos);
|
CRS_USE_SNAPSHOT, origin_startpos);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -64,6 +64,7 @@ Node *replication_parse_result;
|
|||||||
%token K_START_REPLICATION
|
%token K_START_REPLICATION
|
||||||
%token K_CREATE_REPLICATION_SLOT
|
%token K_CREATE_REPLICATION_SLOT
|
||||||
%token K_DROP_REPLICATION_SLOT
|
%token K_DROP_REPLICATION_SLOT
|
||||||
|
%token K_ALTER_REPLICATION_SLOT
|
||||||
%token K_TIMELINE_HISTORY
|
%token K_TIMELINE_HISTORY
|
||||||
%token K_WAIT
|
%token K_WAIT
|
||||||
%token K_TIMELINE
|
%token K_TIMELINE
|
||||||
@ -80,8 +81,9 @@ Node *replication_parse_result;
|
|||||||
|
|
||||||
%type <node> command
|
%type <node> command
|
||||||
%type <node> base_backup start_replication start_logical_replication
|
%type <node> base_backup start_replication start_logical_replication
|
||||||
create_replication_slot drop_replication_slot identify_system
|
create_replication_slot drop_replication_slot
|
||||||
read_replication_slot timeline_history show upload_manifest
|
alter_replication_slot identify_system read_replication_slot
|
||||||
|
timeline_history show upload_manifest
|
||||||
%type <list> generic_option_list
|
%type <list> generic_option_list
|
||||||
%type <defelt> generic_option
|
%type <defelt> generic_option
|
||||||
%type <uintval> opt_timeline
|
%type <uintval> opt_timeline
|
||||||
@ -112,6 +114,7 @@ command:
|
|||||||
| start_logical_replication
|
| start_logical_replication
|
||||||
| create_replication_slot
|
| create_replication_slot
|
||||||
| drop_replication_slot
|
| drop_replication_slot
|
||||||
|
| alter_replication_slot
|
||||||
| read_replication_slot
|
| read_replication_slot
|
||||||
| timeline_history
|
| timeline_history
|
||||||
| show
|
| show
|
||||||
@ -259,6 +262,18 @@ drop_replication_slot:
|
|||||||
}
|
}
|
||||||
;
|
;
|
||||||
|
|
||||||
|
/* ALTER_REPLICATION_SLOT slot (options) */
|
||||||
|
alter_replication_slot:
|
||||||
|
K_ALTER_REPLICATION_SLOT IDENT '(' generic_option_list ')'
|
||||||
|
{
|
||||||
|
AlterReplicationSlotCmd *cmd;
|
||||||
|
cmd = makeNode(AlterReplicationSlotCmd);
|
||||||
|
cmd->slotname = $2;
|
||||||
|
cmd->options = $4;
|
||||||
|
$$ = (Node *) cmd;
|
||||||
|
}
|
||||||
|
;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
|
* START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
|
||||||
*/
|
*/
|
||||||
@ -410,6 +425,7 @@ ident_or_keyword:
|
|||||||
| K_START_REPLICATION { $$ = "start_replication"; }
|
| K_START_REPLICATION { $$ = "start_replication"; }
|
||||||
| K_CREATE_REPLICATION_SLOT { $$ = "create_replication_slot"; }
|
| K_CREATE_REPLICATION_SLOT { $$ = "create_replication_slot"; }
|
||||||
| K_DROP_REPLICATION_SLOT { $$ = "drop_replication_slot"; }
|
| K_DROP_REPLICATION_SLOT { $$ = "drop_replication_slot"; }
|
||||||
|
| K_ALTER_REPLICATION_SLOT { $$ = "alter_replication_slot"; }
|
||||||
| K_TIMELINE_HISTORY { $$ = "timeline_history"; }
|
| K_TIMELINE_HISTORY { $$ = "timeline_history"; }
|
||||||
| K_WAIT { $$ = "wait"; }
|
| K_WAIT { $$ = "wait"; }
|
||||||
| K_TIMELINE { $$ = "timeline"; }
|
| K_TIMELINE { $$ = "timeline"; }
|
||||||
|
@ -125,6 +125,7 @@ TIMELINE { return K_TIMELINE; }
|
|||||||
START_REPLICATION { return K_START_REPLICATION; }
|
START_REPLICATION { return K_START_REPLICATION; }
|
||||||
CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; }
|
CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; }
|
||||||
DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; }
|
DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; }
|
||||||
|
ALTER_REPLICATION_SLOT { return K_ALTER_REPLICATION_SLOT; }
|
||||||
TIMELINE_HISTORY { return K_TIMELINE_HISTORY; }
|
TIMELINE_HISTORY { return K_TIMELINE_HISTORY; }
|
||||||
PHYSICAL { return K_PHYSICAL; }
|
PHYSICAL { return K_PHYSICAL; }
|
||||||
RESERVE_WAL { return K_RESERVE_WAL; }
|
RESERVE_WAL { return K_RESERVE_WAL; }
|
||||||
@ -302,6 +303,7 @@ replication_scanner_is_replication_command(void)
|
|||||||
case K_START_REPLICATION:
|
case K_START_REPLICATION:
|
||||||
case K_CREATE_REPLICATION_SLOT:
|
case K_CREATE_REPLICATION_SLOT:
|
||||||
case K_DROP_REPLICATION_SLOT:
|
case K_DROP_REPLICATION_SLOT:
|
||||||
|
case K_ALTER_REPLICATION_SLOT:
|
||||||
case K_READ_REPLICATION_SLOT:
|
case K_READ_REPLICATION_SLOT:
|
||||||
case K_TIMELINE_HISTORY:
|
case K_TIMELINE_HISTORY:
|
||||||
case K_UPLOAD_MANIFEST:
|
case K_UPLOAD_MANIFEST:
|
||||||
|
@ -683,6 +683,31 @@ ReplicationSlotDrop(const char *name, bool nowait)
|
|||||||
ReplicationSlotDropAcquired();
|
ReplicationSlotDropAcquired();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Change the definition of the slot identified by the specified name.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ReplicationSlotAlter(const char *name, bool failover)
|
||||||
|
{
|
||||||
|
Assert(MyReplicationSlot == NULL);
|
||||||
|
|
||||||
|
ReplicationSlotAcquire(name, false);
|
||||||
|
|
||||||
|
if (SlotIsPhysical(MyReplicationSlot))
|
||||||
|
ereport(ERROR,
|
||||||
|
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("cannot use %s with a physical replication slot",
|
||||||
|
"ALTER_REPLICATION_SLOT"));
|
||||||
|
|
||||||
|
SpinLockAcquire(&MyReplicationSlot->mutex);
|
||||||
|
MyReplicationSlot->data.failover = failover;
|
||||||
|
SpinLockRelease(&MyReplicationSlot->mutex);
|
||||||
|
|
||||||
|
ReplicationSlotMarkDirty();
|
||||||
|
ReplicationSlotSave();
|
||||||
|
ReplicationSlotRelease();
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Permanently drop the currently acquired replication slot.
|
* Permanently drop the currently acquired replication slot.
|
||||||
*/
|
*/
|
||||||
|
@ -387,7 +387,7 @@ WalReceiverMain(void)
|
|||||||
"pg_walreceiver_%lld",
|
"pg_walreceiver_%lld",
|
||||||
(long long int) walrcv_get_backend_pid(wrconn));
|
(long long int) walrcv_get_backend_pid(wrconn));
|
||||||
|
|
||||||
walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
|
walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
|
||||||
|
|
||||||
SpinLockAcquire(&walrcv->mutex);
|
SpinLockAcquire(&walrcv->mutex);
|
||||||
strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
|
strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
|
||||||
|
@ -1126,12 +1126,13 @@ static void
|
|||||||
parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
|
parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
|
||||||
bool *reserve_wal,
|
bool *reserve_wal,
|
||||||
CRSSnapshotAction *snapshot_action,
|
CRSSnapshotAction *snapshot_action,
|
||||||
bool *two_phase)
|
bool *two_phase, bool *failover)
|
||||||
{
|
{
|
||||||
ListCell *lc;
|
ListCell *lc;
|
||||||
bool snapshot_action_given = false;
|
bool snapshot_action_given = false;
|
||||||
bool reserve_wal_given = false;
|
bool reserve_wal_given = false;
|
||||||
bool two_phase_given = false;
|
bool two_phase_given = false;
|
||||||
|
bool failover_given = false;
|
||||||
|
|
||||||
/* Parse options */
|
/* Parse options */
|
||||||
foreach(lc, cmd->options)
|
foreach(lc, cmd->options)
|
||||||
@ -1181,6 +1182,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
|
|||||||
two_phase_given = true;
|
two_phase_given = true;
|
||||||
*two_phase = defGetBoolean(defel);
|
*two_phase = defGetBoolean(defel);
|
||||||
}
|
}
|
||||||
|
else if (strcmp(defel->defname, "failover") == 0)
|
||||||
|
{
|
||||||
|
if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL)
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||||
|
errmsg("conflicting or redundant options")));
|
||||||
|
failover_given = true;
|
||||||
|
*failover = defGetBoolean(defel);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
elog(ERROR, "unrecognized option: %s", defel->defname);
|
elog(ERROR, "unrecognized option: %s", defel->defname);
|
||||||
}
|
}
|
||||||
@ -1197,6 +1207,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
|
|||||||
char *slot_name;
|
char *slot_name;
|
||||||
bool reserve_wal = false;
|
bool reserve_wal = false;
|
||||||
bool two_phase = false;
|
bool two_phase = false;
|
||||||
|
bool failover = false;
|
||||||
CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
|
CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
|
||||||
DestReceiver *dest;
|
DestReceiver *dest;
|
||||||
TupOutputState *tstate;
|
TupOutputState *tstate;
|
||||||
@ -1206,7 +1217,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
|
|||||||
|
|
||||||
Assert(!MyReplicationSlot);
|
Assert(!MyReplicationSlot);
|
||||||
|
|
||||||
parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
|
parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
|
||||||
|
&failover);
|
||||||
|
|
||||||
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
|
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
|
||||||
{
|
{
|
||||||
@ -1243,7 +1255,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
|
|||||||
*/
|
*/
|
||||||
ReplicationSlotCreate(cmd->slotname, true,
|
ReplicationSlotCreate(cmd->slotname, true,
|
||||||
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
|
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
|
||||||
two_phase, false);
|
two_phase, failover);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Do options check early so that we can bail before calling the
|
* Do options check early so that we can bail before calling the
|
||||||
@ -1398,6 +1410,43 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
|
|||||||
ReplicationSlotDrop(cmd->slotname, !cmd->wait);
|
ReplicationSlotDrop(cmd->slotname, !cmd->wait);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Process extra options given to ALTER_REPLICATION_SLOT.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
|
||||||
|
{
|
||||||
|
bool failover_given = false;
|
||||||
|
|
||||||
|
/* Parse options */
|
||||||
|
foreach_ptr(DefElem, defel, cmd->options)
|
||||||
|
{
|
||||||
|
if (strcmp(defel->defname, "failover") == 0)
|
||||||
|
{
|
||||||
|
if (failover_given)
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||||
|
errmsg("conflicting or redundant options")));
|
||||||
|
failover_given = true;
|
||||||
|
*failover = defGetBoolean(defel);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
elog(ERROR, "unrecognized option: %s", defel->defname);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Change the definition of a replication slot.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
|
||||||
|
{
|
||||||
|
bool failover = false;
|
||||||
|
|
||||||
|
ParseAlterReplSlotOptions(cmd, &failover);
|
||||||
|
ReplicationSlotAlter(cmd->slotname, failover);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Load previously initiated logical slot and prepare for sending data (via
|
* Load previously initiated logical slot and prepare for sending data (via
|
||||||
* WalSndLoop).
|
* WalSndLoop).
|
||||||
@ -1971,6 +2020,13 @@ exec_replication_command(const char *cmd_string)
|
|||||||
EndReplicationCommand(cmdtag);
|
EndReplicationCommand(cmdtag);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case T_AlterReplicationSlotCmd:
|
||||||
|
cmdtag = "ALTER_REPLICATION_SLOT";
|
||||||
|
set_ps_display(cmdtag);
|
||||||
|
AlterReplicationSlot((AlterReplicationSlotCmd *) cmd_node);
|
||||||
|
EndReplicationCommand(cmdtag);
|
||||||
|
break;
|
||||||
|
|
||||||
case T_StartReplicationCmd:
|
case T_StartReplicationCmd:
|
||||||
{
|
{
|
||||||
StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
|
StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
|
||||||
|
@ -72,6 +72,18 @@ typedef struct DropReplicationSlotCmd
|
|||||||
} DropReplicationSlotCmd;
|
} DropReplicationSlotCmd;
|
||||||
|
|
||||||
|
|
||||||
|
/* ----------------------
|
||||||
|
* ALTER_REPLICATION_SLOT command
|
||||||
|
* ----------------------
|
||||||
|
*/
|
||||||
|
typedef struct AlterReplicationSlotCmd
|
||||||
|
{
|
||||||
|
NodeTag type;
|
||||||
|
char *slotname;
|
||||||
|
List *options;
|
||||||
|
} AlterReplicationSlotCmd;
|
||||||
|
|
||||||
|
|
||||||
/* ----------------------
|
/* ----------------------
|
||||||
* START_REPLICATION command
|
* START_REPLICATION command
|
||||||
* ----------------------
|
* ----------------------
|
||||||
|
@ -227,6 +227,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
|
|||||||
bool two_phase, bool failover);
|
bool two_phase, bool failover);
|
||||||
extern void ReplicationSlotPersist(void);
|
extern void ReplicationSlotPersist(void);
|
||||||
extern void ReplicationSlotDrop(const char *name, bool nowait);
|
extern void ReplicationSlotDrop(const char *name, bool nowait);
|
||||||
|
extern void ReplicationSlotAlter(const char *name, bool failover);
|
||||||
|
|
||||||
extern void ReplicationSlotAcquire(const char *name, bool nowait);
|
extern void ReplicationSlotAcquire(const char *name, bool nowait);
|
||||||
extern void ReplicationSlotRelease(void);
|
extern void ReplicationSlotRelease(void);
|
||||||
|
@ -355,9 +355,20 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
|
|||||||
const char *slotname,
|
const char *slotname,
|
||||||
bool temporary,
|
bool temporary,
|
||||||
bool two_phase,
|
bool two_phase,
|
||||||
|
bool failover,
|
||||||
CRSSnapshotAction snapshot_action,
|
CRSSnapshotAction snapshot_action,
|
||||||
XLogRecPtr *lsn);
|
XLogRecPtr *lsn);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* walrcv_alter_slot_fn
|
||||||
|
*
|
||||||
|
* Change the definition of a replication slot. Currently, it only supports
|
||||||
|
* changing the failover property of the slot.
|
||||||
|
*/
|
||||||
|
typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
|
||||||
|
const char *slotname,
|
||||||
|
bool failover);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* walrcv_get_backend_pid_fn
|
* walrcv_get_backend_pid_fn
|
||||||
*
|
*
|
||||||
@ -399,6 +410,7 @@ typedef struct WalReceiverFunctionsType
|
|||||||
walrcv_receive_fn walrcv_receive;
|
walrcv_receive_fn walrcv_receive;
|
||||||
walrcv_send_fn walrcv_send;
|
walrcv_send_fn walrcv_send;
|
||||||
walrcv_create_slot_fn walrcv_create_slot;
|
walrcv_create_slot_fn walrcv_create_slot;
|
||||||
|
walrcv_alter_slot_fn walrcv_alter_slot;
|
||||||
walrcv_get_backend_pid_fn walrcv_get_backend_pid;
|
walrcv_get_backend_pid_fn walrcv_get_backend_pid;
|
||||||
walrcv_exec_fn walrcv_exec;
|
walrcv_exec_fn walrcv_exec;
|
||||||
walrcv_disconnect_fn walrcv_disconnect;
|
walrcv_disconnect_fn walrcv_disconnect;
|
||||||
@ -428,8 +440,10 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
|
|||||||
WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
|
WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
|
||||||
#define walrcv_send(conn, buffer, nbytes) \
|
#define walrcv_send(conn, buffer, nbytes) \
|
||||||
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
|
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
|
||||||
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \
|
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \
|
||||||
WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
|
WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
|
||||||
|
#define walrcv_alter_slot(conn, slotname, failover) \
|
||||||
|
WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover)
|
||||||
#define walrcv_get_backend_pid(conn) \
|
#define walrcv_get_backend_pid(conn) \
|
||||||
WalReceiverFunctions->walrcv_get_backend_pid(conn)
|
WalReceiverFunctions->walrcv_get_backend_pid(conn)
|
||||||
#define walrcv_exec(conn, exec, nRetTypes, retTypes) \
|
#define walrcv_exec(conn, exec, nRetTypes, retTypes) \
|
||||||
|
@ -85,6 +85,7 @@ AlterOwnerStmt
|
|||||||
AlterPolicyStmt
|
AlterPolicyStmt
|
||||||
AlterPublicationAction
|
AlterPublicationAction
|
||||||
AlterPublicationStmt
|
AlterPublicationStmt
|
||||||
|
AlterReplicationSlotCmd
|
||||||
AlterRoleSetStmt
|
AlterRoleSetStmt
|
||||||
AlterRoleStmt
|
AlterRoleStmt
|
||||||
AlterSeqStmt
|
AlterSeqStmt
|
||||||
@ -3879,6 +3880,7 @@ varattrib_1b_e
|
|||||||
varattrib_4b
|
varattrib_4b
|
||||||
vbits
|
vbits
|
||||||
verifier_context
|
verifier_context
|
||||||
|
walrcv_alter_slot_fn
|
||||||
walrcv_check_conninfo_fn
|
walrcv_check_conninfo_fn
|
||||||
walrcv_connect_fn
|
walrcv_connect_fn
|
||||||
walrcv_create_slot_fn
|
walrcv_create_slot_fn
|
||||||
|
Reference in New Issue
Block a user