mirror of
https://github.com/postgres/postgres.git
synced 2025-06-14 18:42:34 +03:00
Add two-phase option in pg_createsubscriber.
This patch introduces the '--enable-two-phase' option to the 'pg_createsubscriber' utility, allowing users to enable two-phase commit for all subscriptions during their creation. Note that even without this option users can enable the two_phase option for the subscriptions created by pg_createsubscriber. However, it requires the subscription to be disabled first which could be inconvenient for users. When two-phase commit is enabled, prepared transactions are sent to the subscriber at the time of 'PREPARE TRANSACTION', and they are processed as two-phase transactions on the subscriber as well. If disabled, prepared transactions are sent only when committed and are processed immediately by the subscriber. Author: Shubham Khanna <khannashubham1197@gmail.com> Reviewed-by: vignesh C <vignesh21@gmail.com> Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com> Reviewed-by: Peter Smith <smithpb2250@gmail.com> Reviewed-by: Ajin Cherian <itsajin@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Discussion: https://postgr.es/m/CAHv8RjLPdFP=kA5LNSmWZ=+GMXmO+LczvV6p9HJjsXxZz10KGA@mail.gmail.com
This commit is contained in:
@ -165,6 +165,19 @@ PostgreSQL documentation
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry>
|
||||
<term><option>-T</option></term>
|
||||
<term><option>--enable-two-phase</option></term>
|
||||
<listitem>
|
||||
<para>
|
||||
Enables <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>
|
||||
commit for the subscription. When multiple databases are specified, this
|
||||
option applies uniformly to all subscriptions created on those databases.
|
||||
The default is <literal>false</literal>.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry>
|
||||
<term><option>-U <replaceable class="parameter">username</replaceable></option></term>
|
||||
<term><option>--subscriber-username=<replaceable class="parameter">username</replaceable></option></term>
|
||||
@ -300,7 +313,9 @@ PostgreSQL documentation
|
||||
greater than or equal to the number of specified databases. The target
|
||||
server must have <xref linkend="guc-max-worker-processes"/> configured to a
|
||||
value greater than the number of specified databases. The target server
|
||||
must accept local connections.
|
||||
must accept local connections. If you are planning to use the
|
||||
<option>--enable-two-phase</option> switch then you will also need to set
|
||||
the <xref linkend="guc-max-prepared-transactions"/> appropriately.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
@ -360,6 +375,7 @@ PostgreSQL documentation
|
||||
</para>
|
||||
|
||||
<para>
|
||||
Unless the <option>--enable-two-phase</option> switch is specified,
|
||||
<application>pg_createsubscriber</application> sets up logical
|
||||
replication with two-phase commit disabled. This means that any
|
||||
prepared transactions will be replicated at the time
|
||||
|
@ -38,6 +38,7 @@ struct CreateSubscriberOptions
|
||||
char *socket_dir; /* directory for Unix-domain socket, if any */
|
||||
char *sub_port; /* subscriber port number */
|
||||
const char *sub_username; /* subscriber username */
|
||||
bool two_phase; /* enable-two-phase option */
|
||||
SimpleStringList database_names; /* list of database names */
|
||||
SimpleStringList pub_names; /* list of publication names */
|
||||
SimpleStringList sub_names; /* list of subscription names */
|
||||
@ -45,6 +46,7 @@ struct CreateSubscriberOptions
|
||||
int recovery_timeout; /* stop recovery after this time */
|
||||
};
|
||||
|
||||
/* per-database publication/subscription info */
|
||||
struct LogicalRepInfo
|
||||
{
|
||||
char *dbname; /* database name */
|
||||
@ -58,6 +60,16 @@ struct LogicalRepInfo
|
||||
bool made_publication; /* publication was created */
|
||||
};
|
||||
|
||||
/*
|
||||
* Information shared across all the databases (or publications and
|
||||
* subscriptions).
|
||||
*/
|
||||
struct LogicalRepInfos
|
||||
{
|
||||
struct LogicalRepInfo *dbinfo;
|
||||
bool two_phase; /* enable-two-phase option */
|
||||
};
|
||||
|
||||
static void cleanup_objects_atexit(void);
|
||||
static void usage();
|
||||
static char *get_base_conninfo(const char *conninfo, char **dbname);
|
||||
@ -117,7 +129,7 @@ static bool dry_run = false;
|
||||
|
||||
static bool success = false;
|
||||
|
||||
static struct LogicalRepInfo *dbinfo;
|
||||
static struct LogicalRepInfos dbinfos;
|
||||
static int num_dbs = 0; /* number of specified databases */
|
||||
static int num_pubs = 0; /* number of specified publications */
|
||||
static int num_subs = 0; /* number of specified subscriptions */
|
||||
@ -172,17 +184,17 @@ cleanup_objects_atexit(void)
|
||||
|
||||
for (int i = 0; i < num_dbs; i++)
|
||||
{
|
||||
if (dbinfo[i].made_publication || dbinfo[i].made_replslot)
|
||||
if (dbinfos.dbinfo[i].made_publication || dbinfos.dbinfo[i].made_replslot)
|
||||
{
|
||||
PGconn *conn;
|
||||
|
||||
conn = connect_database(dbinfo[i].pubconninfo, false);
|
||||
conn = connect_database(dbinfos.dbinfo[i].pubconninfo, false);
|
||||
if (conn != NULL)
|
||||
{
|
||||
if (dbinfo[i].made_publication)
|
||||
drop_publication(conn, &dbinfo[i]);
|
||||
if (dbinfo[i].made_replslot)
|
||||
drop_replication_slot(conn, &dbinfo[i], dbinfo[i].replslotname);
|
||||
if (dbinfos.dbinfo[i].made_publication)
|
||||
drop_publication(conn, &dbinfos.dbinfo[i]);
|
||||
if (dbinfos.dbinfo[i].made_replslot)
|
||||
drop_replication_slot(conn, &dbinfos.dbinfo[i], dbinfos.dbinfo[i].replslotname);
|
||||
disconnect_database(conn, false);
|
||||
}
|
||||
else
|
||||
@ -192,16 +204,18 @@ cleanup_objects_atexit(void)
|
||||
* that some objects were left on primary and should be
|
||||
* removed before trying again.
|
||||
*/
|
||||
if (dbinfo[i].made_publication)
|
||||
if (dbinfos.dbinfo[i].made_publication)
|
||||
{
|
||||
pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
|
||||
dbinfo[i].pubname, dbinfo[i].dbname);
|
||||
dbinfos.dbinfo[i].pubname,
|
||||
dbinfos.dbinfo[i].dbname);
|
||||
pg_log_warning_hint("Drop this publication before trying again.");
|
||||
}
|
||||
if (dbinfo[i].made_replslot)
|
||||
if (dbinfos.dbinfo[i].made_replslot)
|
||||
{
|
||||
pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
|
||||
dbinfo[i].replslotname, dbinfo[i].dbname);
|
||||
dbinfos.dbinfo[i].replslotname,
|
||||
dbinfos.dbinfo[i].dbname);
|
||||
pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
|
||||
}
|
||||
}
|
||||
@ -227,6 +241,7 @@ usage(void)
|
||||
printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
|
||||
printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
|
||||
printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
|
||||
printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
|
||||
printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
|
||||
printf(_(" -v, --verbose output verbose messages\n"));
|
||||
printf(_(" --config-file=FILENAME use specified main server configuration\n"
|
||||
@ -479,9 +494,10 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
|
||||
dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
|
||||
dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
|
||||
dbinfo[i].pubconninfo);
|
||||
pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
|
||||
pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
|
||||
dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
|
||||
dbinfo[i].subconninfo);
|
||||
dbinfo[i].subconninfo,
|
||||
dbinfos.two_phase ? "true" : "false");
|
||||
|
||||
if (num_pubs > 0)
|
||||
pubcell = pubcell->next;
|
||||
@ -938,11 +954,12 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
|
||||
failed = true;
|
||||
}
|
||||
|
||||
if (max_prepared_transactions != 0)
|
||||
if (max_prepared_transactions != 0 && !dbinfos.two_phase)
|
||||
{
|
||||
pg_log_warning("two_phase option will not be enabled for replication slots");
|
||||
pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
|
||||
"Prepared transactions will be replicated at COMMIT PREPARED.");
|
||||
pg_log_warning_hint("You can use --enable-two-phase switch to enable two_phase.");
|
||||
}
|
||||
|
||||
/*
|
||||
@ -1345,8 +1362,9 @@ create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
|
||||
slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
|
||||
|
||||
appendPQExpBuffer(str,
|
||||
"SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)",
|
||||
slot_name_esc);
|
||||
"SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
|
||||
slot_name_esc,
|
||||
dbinfos.two_phase ? "true" : "false");
|
||||
|
||||
PQfreemem(slot_name_esc);
|
||||
|
||||
@ -1722,8 +1740,9 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
|
||||
appendPQExpBuffer(str,
|
||||
"CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
|
||||
"WITH (create_slot = false, enabled = false, "
|
||||
"slot_name = %s, copy_data = false)",
|
||||
subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
|
||||
"slot_name = %s, copy_data = false, two_phase = %s)",
|
||||
subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
|
||||
dbinfos.two_phase ? "true" : "false");
|
||||
|
||||
PQfreemem(pubname_esc);
|
||||
PQfreemem(subname_esc);
|
||||
@ -1895,6 +1914,7 @@ main(int argc, char **argv)
|
||||
{"publisher-server", required_argument, NULL, 'P'},
|
||||
{"socketdir", required_argument, NULL, 's'},
|
||||
{"recovery-timeout", required_argument, NULL, 't'},
|
||||
{"enable-two-phase", no_argument, NULL, 'T'},
|
||||
{"subscriber-username", required_argument, NULL, 'U'},
|
||||
{"verbose", no_argument, NULL, 'v'},
|
||||
{"version", no_argument, NULL, 'V'},
|
||||
@ -1950,6 +1970,7 @@ main(int argc, char **argv)
|
||||
opt.socket_dir = NULL;
|
||||
opt.sub_port = DEFAULT_SUB_PORT;
|
||||
opt.sub_username = NULL;
|
||||
opt.two_phase = false;
|
||||
opt.database_names = (SimpleStringList)
|
||||
{
|
||||
0
|
||||
@ -1972,7 +1993,7 @@ main(int argc, char **argv)
|
||||
|
||||
get_restricted_token();
|
||||
|
||||
while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
|
||||
while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:TU:v",
|
||||
long_options, &option_index)) != -1)
|
||||
{
|
||||
switch (c)
|
||||
@ -2009,6 +2030,9 @@ main(int argc, char **argv)
|
||||
case 't':
|
||||
opt.recovery_timeout = atoi(optarg);
|
||||
break;
|
||||
case 'T':
|
||||
opt.two_phase = true;
|
||||
break;
|
||||
case 'U':
|
||||
opt.sub_username = pg_strdup(optarg);
|
||||
break;
|
||||
@ -2170,12 +2194,14 @@ main(int argc, char **argv)
|
||||
/* Rudimentary check for a data directory */
|
||||
check_data_directory(subscriber_dir);
|
||||
|
||||
dbinfos.two_phase = opt.two_phase;
|
||||
|
||||
/*
|
||||
* Store database information for publisher and subscriber. It should be
|
||||
* called before atexit() because its return is used in the
|
||||
* cleanup_objects_atexit().
|
||||
*/
|
||||
dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
|
||||
dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
|
||||
|
||||
/* Register a function to clean up objects in case of failure */
|
||||
atexit(cleanup_objects_atexit);
|
||||
@ -2184,7 +2210,7 @@ main(int argc, char **argv)
|
||||
* Check if the subscriber data directory has the same system identifier
|
||||
* than the publisher data directory.
|
||||
*/
|
||||
pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo);
|
||||
pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
|
||||
sub_sysid = get_standby_sysid(subscriber_dir);
|
||||
if (pub_sysid != sub_sysid)
|
||||
pg_fatal("subscriber data directory is not a copy of the source database cluster");
|
||||
@ -2214,10 +2240,10 @@ main(int argc, char **argv)
|
||||
start_standby_server(&opt, true, false);
|
||||
|
||||
/* Check if the standby server is ready for logical replication */
|
||||
check_subscriber(dbinfo);
|
||||
check_subscriber(dbinfos.dbinfo);
|
||||
|
||||
/* Check if the primary server is ready for logical replication */
|
||||
check_publisher(dbinfo);
|
||||
check_publisher(dbinfos.dbinfo);
|
||||
|
||||
/*
|
||||
* Stop the target server. The recovery process requires that the server
|
||||
@ -2230,10 +2256,10 @@ main(int argc, char **argv)
|
||||
stop_standby_server(subscriber_dir);
|
||||
|
||||
/* Create the required objects for each database on publisher */
|
||||
consistent_lsn = setup_publisher(dbinfo);
|
||||
consistent_lsn = setup_publisher(dbinfos.dbinfo);
|
||||
|
||||
/* Write the required recovery parameters */
|
||||
setup_recovery(dbinfo, subscriber_dir, consistent_lsn);
|
||||
setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
|
||||
|
||||
/*
|
||||
* Start subscriber so the recovery parameters will take effect. Wait
|
||||
@ -2244,7 +2270,7 @@ main(int argc, char **argv)
|
||||
start_standby_server(&opt, true, true);
|
||||
|
||||
/* Waiting the subscriber to be promoted */
|
||||
wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
|
||||
wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
|
||||
|
||||
/*
|
||||
* Create the subscription for each database on subscriber. It does not
|
||||
@ -2252,13 +2278,13 @@ main(int argc, char **argv)
|
||||
* point to the LSN reported by setup_publisher(). It also cleans up
|
||||
* publications created by this tool and replication to the standby.
|
||||
*/
|
||||
setup_subscriber(dbinfo, consistent_lsn);
|
||||
setup_subscriber(dbinfos.dbinfo, consistent_lsn);
|
||||
|
||||
/* Remove primary_slot_name if it exists on primary */
|
||||
drop_primary_replication_slot(dbinfo, primary_slot_name);
|
||||
drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
|
||||
|
||||
/* Remove failover replication slots if they exist on subscriber */
|
||||
drop_failover_replication_slots(dbinfo);
|
||||
drop_failover_replication_slots(dbinfos.dbinfo);
|
||||
|
||||
/* Stop the subscriber */
|
||||
pg_log_info("stopping the subscriber");
|
||||
|
@ -373,6 +373,7 @@ command_ok(
|
||||
|
||||
# Run pg_createsubscriber on node S. --verbose is used twice
|
||||
# to show more information.
|
||||
# In passing, also test the --enable-two-phase option
|
||||
command_ok(
|
||||
[
|
||||
'pg_createsubscriber',
|
||||
@ -388,6 +389,7 @@ command_ok(
|
||||
'--replication-slot' => 'replslot2',
|
||||
'--database' => $db1,
|
||||
'--database' => $db2,
|
||||
'--enable-two-phase'
|
||||
],
|
||||
'run pg_createsubscriber on node S');
|
||||
|
||||
@ -406,6 +408,15 @@ $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
|
||||
# Start subscriber
|
||||
$node_s->start;
|
||||
|
||||
# Verify that all subtwophase states are pending or enabled,
|
||||
# e.g. there are no subscriptions where subtwophase is disabled ('d')
|
||||
is( $node_s->safe_psql(
|
||||
'postgres',
|
||||
"SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate = 'd'"
|
||||
),
|
||||
't',
|
||||
'subscriptions are created with the two-phase option enabled');
|
||||
|
||||
# Confirm the pre-existing subscription has been removed
|
||||
$result = $node_s->safe_psql(
|
||||
'postgres', qq(
|
||||
|
Reference in New Issue
Block a user