1
0
mirror of https://github.com/postgres/postgres.git synced 2025-04-20 00:42:27 +03:00

pg_recvlogical: Add --failover option.

This new option instructs pg_recvlogical to create the logical
replication slot with the failover option enabled. It can be used in
conjunction with the --create-slot option.

Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Michael Banck <mbanck@gmx.net>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Discussion: https://postgr.es/m/OSCPR01MB14966C54097FC83AF19F3516BF5AC2@OSCPR01MB14966.jpnprd01.prod.outlook.com
This commit is contained in:
Masahiko Sawada 2025-04-04 10:39:57 -07:00
parent 3556c89321
commit cf2655a902
7 changed files with 61 additions and 11 deletions

View File

@ -79,8 +79,8 @@ PostgreSQL documentation
</para> </para>
<para> <para>
The <option>--two-phase</option> can be specified with The <option>--two-phase</option> and <option>--falover</option> options
<option>--create-slot</option> to enable decoding of prepared transactions. can be specified with <option>--create-slot</option>.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
@ -165,6 +165,16 @@ PostgreSQL documentation
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><option>--failover</option></term>
<listitem>
<para>
Enables the slot to be synchronized to the standbys. This option may
only be specified with <option>--create-slot</option>.
</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term><option>-f <replaceable>filename</replaceable></option></term> <term><option>-f <replaceable>filename</replaceable></option></term>
<term><option>--file=<replaceable>filename</replaceable></option></term> <term><option>--file=<replaceable>filename</replaceable></option></term>

View File

@ -667,7 +667,8 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier,
if (temp_replication_slot || create_slot) if (temp_replication_slot || create_slot)
{ {
if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL, if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
temp_replication_slot, true, true, false, false)) temp_replication_slot, true, true, false,
false, false))
exit(1); exit(1);
if (verbose) if (verbose)

View File

@ -889,7 +889,7 @@ main(int argc, char **argv)
pg_log_info("creating replication slot \"%s\"", replication_slot); pg_log_info("creating replication slot \"%s\"", replication_slot);
if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false, if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
slot_exists_ok, false)) slot_exists_ok, false, false))
exit(1); exit(1);
exit(0); exit(0);
} }

View File

@ -42,6 +42,7 @@ typedef enum
static char *outfile = NULL; static char *outfile = NULL;
static int verbose = 0; static int verbose = 0;
static bool two_phase = false; static bool two_phase = false;
static bool failover = false;
static int noloop = 0; static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static int fsync_interval = 10 * 1000; /* 10 sec = default */ static int fsync_interval = 10 * 1000; /* 10 sec = default */
@ -89,6 +90,8 @@ usage(void)
printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n")); printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
printf(_("\nOptions:\n")); printf(_("\nOptions:\n"));
printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n")); printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
printf(_(" --failover enable replication slot synchronization to standby servers when\n"
" creating a slot\n"));
printf(_(" -f, --file=FILE receive log into this file, - for stdout\n")); printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
printf(_(" -F --fsync-interval=SECS\n" printf(_(" -F --fsync-interval=SECS\n"
" time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000)); " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
@ -695,6 +698,7 @@ main(int argc, char **argv)
{"file", required_argument, NULL, 'f'}, {"file", required_argument, NULL, 'f'},
{"fsync-interval", required_argument, NULL, 'F'}, {"fsync-interval", required_argument, NULL, 'F'},
{"no-loop", no_argument, NULL, 'n'}, {"no-loop", no_argument, NULL, 'n'},
{"failover", no_argument, NULL, 5},
{"verbose", no_argument, NULL, 'v'}, {"verbose", no_argument, NULL, 'v'},
{"two-phase", no_argument, NULL, 't'}, {"two-phase", no_argument, NULL, 't'},
{"version", no_argument, NULL, 'V'}, {"version", no_argument, NULL, 'V'},
@ -770,6 +774,9 @@ main(int argc, char **argv)
case 'v': case 'v':
verbose++; verbose++;
break; break;
case 5:
failover = true;
break;
/* connection options */ /* connection options */
case 'd': case 'd':
dbname = pg_strdup(optarg); dbname = pg_strdup(optarg);
@ -917,13 +924,23 @@ main(int argc, char **argv)
exit(1); exit(1);
} }
if (two_phase && !do_create_slot) if (!do_create_slot)
{
if (two_phase)
{ {
pg_log_error("--two-phase may only be specified with --create-slot"); pg_log_error("--two-phase may only be specified with --create-slot");
pg_log_error_hint("Try \"%s --help\" for more information.", progname); pg_log_error_hint("Try \"%s --help\" for more information.", progname);
exit(1); exit(1);
} }
if (failover)
{
pg_log_error("--failover may only be specified with --create-slot");
pg_log_error_hint("Try \"%s --help\" for more information.", progname);
exit(1);
}
}
/* /*
* Obtain a connection to server. Notably, if we need a password, we want * Obtain a connection to server. Notably, if we need a password, we want
* to collect it from the user immediately. * to collect it from the user immediately.
@ -984,7 +1001,8 @@ main(int argc, char **argv)
pg_log_info("creating replication slot \"%s\"", replication_slot); pg_log_info("creating replication slot \"%s\"", replication_slot);
if (!CreateReplicationSlot(conn, replication_slot, plugin, false, if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
false, false, slot_exists_ok, two_phase)) false, false, slot_exists_ok, two_phase,
failover))
exit(1); exit(1);
startpos = InvalidXLogRecPtr; startpos = InvalidXLogRecPtr;
} }

View File

@ -583,7 +583,7 @@ GetSlotInformation(PGconn *conn, const char *slot_name,
bool bool
CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
bool is_temporary, bool is_physical, bool reserve_wal, bool is_temporary, bool is_physical, bool reserve_wal,
bool slot_exists_ok, bool two_phase) bool slot_exists_ok, bool two_phase, bool failover)
{ {
PQExpBuffer query; PQExpBuffer query;
PGresult *res; PGresult *res;
@ -594,6 +594,7 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
Assert((is_physical && plugin == NULL) || Assert((is_physical && plugin == NULL) ||
(!is_physical && plugin != NULL)); (!is_physical && plugin != NULL));
Assert(!(two_phase && is_physical)); Assert(!(two_phase && is_physical));
Assert(!(failover && is_physical));
Assert(slot_name != NULL); Assert(slot_name != NULL);
/* Build base portion of query */ /* Build base portion of query */
@ -616,6 +617,10 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
} }
else else
{ {
if (failover && PQserverVersion(conn) >= 170000)
AppendPlainCommandOption(query, use_new_option_syntax,
"FAILOVER");
if (two_phase && PQserverVersion(conn) >= 150000) if (two_phase && PQserverVersion(conn) >= 150000)
AppendPlainCommandOption(query, use_new_option_syntax, AppendPlainCommandOption(query, use_new_option_syntax,
"TWO_PHASE"); "TWO_PHASE");

View File

@ -35,7 +35,8 @@ extern PGconn *GetConnection(void);
extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name, extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name,
const char *plugin, bool is_temporary, const char *plugin, bool is_temporary,
bool is_physical, bool reserve_wal, bool is_physical, bool reserve_wal,
bool slot_exists_ok, bool two_phase); bool slot_exists_ok, bool two_phase,
bool failover);
extern bool DropReplicationSlot(PGconn *conn, const char *slot_name); extern bool DropReplicationSlot(PGconn *conn, const char *slot_name);
extern bool RunIdentifySystem(PGconn *conn, char **sysid, extern bool RunIdentifySystem(PGconn *conn, char **sysid,
TimeLineID *starttli, TimeLineID *starttli,

View File

@ -135,4 +135,19 @@ $node->command_ok(
], ],
'drop could work without dbname'); 'drop could work without dbname');
# test with failover option enabled
$node->command_ok(
[
'pg_recvlogical',
'--slot' => 'test',
'--dbname' => $node->connstr('postgres'),
'--create-slot',
'--failover',
],
'slot with failover created');
my $result = $node->safe_psql('postgres',
"SELECT failover FROM pg_catalog.pg_replication_slots WHERE slot_name = 'test'");
is($result, 't', "failover is enabled for the new slot");
done_testing(); done_testing();