1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-16 06:01:02 +03:00

Add two-phase option in pg_createsubscriber.

This patch introduces the '--enable-two-phase' option to the
'pg_createsubscriber' utility, allowing users to enable two-phase commit
for all subscriptions during their creation.

Note that even without this option users can enable the two_phase option
for the subscriptions created by pg_createsubscriber. However, it requires
the subscription to be disabled first which could be inconvenient for
users.

When two-phase commit is enabled, prepared transactions are sent to the
subscriber at the time of 'PREPARE TRANSACTION', and they are processed as
two-phase transactions on the subscriber as well. If disabled, prepared
transactions are sent only when committed and are processed immediately by
the subscriber.

Author: Shubham Khanna <khannashubham1197@gmail.com>
Reviewed-by: vignesh C <vignesh21@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Peter Smith <smithpb2250@gmail.com>
Reviewed-by: Ajin Cherian <itsajin@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/CAHv8RjLPdFP=kA5LNSmWZ=+GMXmO+LczvV6p9HJjsXxZz10KGA@mail.gmail.com
This commit is contained in:
Amit Kapila
2025-02-26 11:12:50 +05:30
parent adc6032fa8
commit e117cfb2f6
3 changed files with 83 additions and 30 deletions

View File

@ -165,6 +165,19 @@ PostgreSQL documentation
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><option>-T</option></term>
<term><option>--enable-two-phase</option></term>
<listitem>
<para>
Enables <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>
commit for the subscription. When multiple databases are specified, this
option applies uniformly to all subscriptions created on those databases.
The default is <literal>false</literal>.
</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term><option>-U <replaceable class="parameter">username</replaceable></option></term> <term><option>-U <replaceable class="parameter">username</replaceable></option></term>
<term><option>--subscriber-username=<replaceable class="parameter">username</replaceable></option></term> <term><option>--subscriber-username=<replaceable class="parameter">username</replaceable></option></term>
@ -300,7 +313,9 @@ PostgreSQL documentation
greater than or equal to the number of specified databases. The target greater than or equal to the number of specified databases. The target
server must have <xref linkend="guc-max-worker-processes"/> configured to a server must have <xref linkend="guc-max-worker-processes"/> configured to a
value greater than the number of specified databases. The target server value greater than the number of specified databases. The target server
must accept local connections. must accept local connections. If you are planning to use the
<option>--enable-two-phase</option> switch then you will also need to set
the <xref linkend="guc-max-prepared-transactions"/> appropriately.
</para> </para>
<para> <para>
@ -360,6 +375,7 @@ PostgreSQL documentation
</para> </para>
<para> <para>
Unless the <option>--enable-two-phase</option> switch is specified,
<application>pg_createsubscriber</application> sets up logical <application>pg_createsubscriber</application> sets up logical
replication with two-phase commit disabled. This means that any replication with two-phase commit disabled. This means that any
prepared transactions will be replicated at the time prepared transactions will be replicated at the time

View File

@ -38,6 +38,7 @@ struct CreateSubscriberOptions
char *socket_dir; /* directory for Unix-domain socket, if any */ char *socket_dir; /* directory for Unix-domain socket, if any */
char *sub_port; /* subscriber port number */ char *sub_port; /* subscriber port number */
const char *sub_username; /* subscriber username */ const char *sub_username; /* subscriber username */
bool two_phase; /* enable-two-phase option */
SimpleStringList database_names; /* list of database names */ SimpleStringList database_names; /* list of database names */
SimpleStringList pub_names; /* list of publication names */ SimpleStringList pub_names; /* list of publication names */
SimpleStringList sub_names; /* list of subscription names */ SimpleStringList sub_names; /* list of subscription names */
@ -45,6 +46,7 @@ struct CreateSubscriberOptions
int recovery_timeout; /* stop recovery after this time */ int recovery_timeout; /* stop recovery after this time */
}; };
/* per-database publication/subscription info */
struct LogicalRepInfo struct LogicalRepInfo
{ {
char *dbname; /* database name */ char *dbname; /* database name */
@ -58,6 +60,16 @@ struct LogicalRepInfo
bool made_publication; /* publication was created */ bool made_publication; /* publication was created */
}; };
/*
* Information shared across all the databases (or publications and
* subscriptions).
*/
struct LogicalRepInfos
{
struct LogicalRepInfo *dbinfo;
bool two_phase; /* enable-two-phase option */
};
static void cleanup_objects_atexit(void); static void cleanup_objects_atexit(void);
static void usage(); static void usage();
static char *get_base_conninfo(const char *conninfo, char **dbname); static char *get_base_conninfo(const char *conninfo, char **dbname);
@ -117,7 +129,7 @@ static bool dry_run = false;
static bool success = false; static bool success = false;
static struct LogicalRepInfo *dbinfo; static struct LogicalRepInfos dbinfos;
static int num_dbs = 0; /* number of specified databases */ static int num_dbs = 0; /* number of specified databases */
static int num_pubs = 0; /* number of specified publications */ static int num_pubs = 0; /* number of specified publications */
static int num_subs = 0; /* number of specified subscriptions */ static int num_subs = 0; /* number of specified subscriptions */
@ -172,17 +184,17 @@ cleanup_objects_atexit(void)
for (int i = 0; i < num_dbs; i++) for (int i = 0; i < num_dbs; i++)
{ {
if (dbinfo[i].made_publication || dbinfo[i].made_replslot) if (dbinfos.dbinfo[i].made_publication || dbinfos.dbinfo[i].made_replslot)
{ {
PGconn *conn; PGconn *conn;
conn = connect_database(dbinfo[i].pubconninfo, false); conn = connect_database(dbinfos.dbinfo[i].pubconninfo, false);
if (conn != NULL) if (conn != NULL)
{ {
if (dbinfo[i].made_publication) if (dbinfos.dbinfo[i].made_publication)
drop_publication(conn, &dbinfo[i]); drop_publication(conn, &dbinfos.dbinfo[i]);
if (dbinfo[i].made_replslot) if (dbinfos.dbinfo[i].made_replslot)
drop_replication_slot(conn, &dbinfo[i], dbinfo[i].replslotname); drop_replication_slot(conn, &dbinfos.dbinfo[i], dbinfos.dbinfo[i].replslotname);
disconnect_database(conn, false); disconnect_database(conn, false);
} }
else else
@ -192,16 +204,18 @@ cleanup_objects_atexit(void)
* that some objects were left on primary and should be * that some objects were left on primary and should be
* removed before trying again. * removed before trying again.
*/ */
if (dbinfo[i].made_publication) if (dbinfos.dbinfo[i].made_publication)
{ {
pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind", pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
dbinfo[i].pubname, dbinfo[i].dbname); dbinfos.dbinfo[i].pubname,
dbinfos.dbinfo[i].dbname);
pg_log_warning_hint("Drop this publication before trying again."); pg_log_warning_hint("Drop this publication before trying again.");
} }
if (dbinfo[i].made_replslot) if (dbinfos.dbinfo[i].made_replslot)
{ {
pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind", pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
dbinfo[i].replslotname, dbinfo[i].dbname); dbinfos.dbinfo[i].replslotname,
dbinfos.dbinfo[i].dbname);
pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files."); pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
} }
} }
@ -227,6 +241,7 @@ usage(void)
printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n")); printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n")); printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n")); printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n")); printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
printf(_(" -v, --verbose output verbose messages\n")); printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" --config-file=FILENAME use specified main server configuration\n" printf(_(" --config-file=FILENAME use specified main server configuration\n"
@ -479,9 +494,10 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)", dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)", dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
dbinfo[i].pubconninfo); dbinfo[i].pubconninfo);
pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i, pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
dbinfo[i].subname ? dbinfo[i].subname : "(auto)", dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
dbinfo[i].subconninfo); dbinfo[i].subconninfo,
dbinfos.two_phase ? "true" : "false");
if (num_pubs > 0) if (num_pubs > 0)
pubcell = pubcell->next; pubcell = pubcell->next;
@ -938,11 +954,12 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
failed = true; failed = true;
} }
if (max_prepared_transactions != 0) if (max_prepared_transactions != 0 && !dbinfos.two_phase)
{ {
pg_log_warning("two_phase option will not be enabled for replication slots"); pg_log_warning("two_phase option will not be enabled for replication slots");
pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. " pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
"Prepared transactions will be replicated at COMMIT PREPARED."); "Prepared transactions will be replicated at COMMIT PREPARED.");
pg_log_warning_hint("You can use --enable-two-phase switch to enable two_phase.");
} }
/* /*
@ -1345,8 +1362,9 @@ create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name)); slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
appendPQExpBuffer(str, appendPQExpBuffer(str,
"SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)", "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
slot_name_esc); slot_name_esc,
dbinfos.two_phase ? "true" : "false");
PQfreemem(slot_name_esc); PQfreemem(slot_name_esc);
@ -1722,8 +1740,9 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
appendPQExpBuffer(str, appendPQExpBuffer(str,
"CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s " "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
"WITH (create_slot = false, enabled = false, " "WITH (create_slot = false, enabled = false, "
"slot_name = %s, copy_data = false)", "slot_name = %s, copy_data = false, two_phase = %s)",
subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc); subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
dbinfos.two_phase ? "true" : "false");
PQfreemem(pubname_esc); PQfreemem(pubname_esc);
PQfreemem(subname_esc); PQfreemem(subname_esc);
@ -1895,6 +1914,7 @@ main(int argc, char **argv)
{"publisher-server", required_argument, NULL, 'P'}, {"publisher-server", required_argument, NULL, 'P'},
{"socketdir", required_argument, NULL, 's'}, {"socketdir", required_argument, NULL, 's'},
{"recovery-timeout", required_argument, NULL, 't'}, {"recovery-timeout", required_argument, NULL, 't'},
{"enable-two-phase", no_argument, NULL, 'T'},
{"subscriber-username", required_argument, NULL, 'U'}, {"subscriber-username", required_argument, NULL, 'U'},
{"verbose", no_argument, NULL, 'v'}, {"verbose", no_argument, NULL, 'v'},
{"version", no_argument, NULL, 'V'}, {"version", no_argument, NULL, 'V'},
@ -1950,6 +1970,7 @@ main(int argc, char **argv)
opt.socket_dir = NULL; opt.socket_dir = NULL;
opt.sub_port = DEFAULT_SUB_PORT; opt.sub_port = DEFAULT_SUB_PORT;
opt.sub_username = NULL; opt.sub_username = NULL;
opt.two_phase = false;
opt.database_names = (SimpleStringList) opt.database_names = (SimpleStringList)
{ {
0 0
@ -1972,7 +1993,7 @@ main(int argc, char **argv)
get_restricted_token(); get_restricted_token();
while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v", while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:TU:v",
long_options, &option_index)) != -1) long_options, &option_index)) != -1)
{ {
switch (c) switch (c)
@ -2009,6 +2030,9 @@ main(int argc, char **argv)
case 't': case 't':
opt.recovery_timeout = atoi(optarg); opt.recovery_timeout = atoi(optarg);
break; break;
case 'T':
opt.two_phase = true;
break;
case 'U': case 'U':
opt.sub_username = pg_strdup(optarg); opt.sub_username = pg_strdup(optarg);
break; break;
@ -2170,12 +2194,14 @@ main(int argc, char **argv)
/* Rudimentary check for a data directory */ /* Rudimentary check for a data directory */
check_data_directory(subscriber_dir); check_data_directory(subscriber_dir);
dbinfos.two_phase = opt.two_phase;
/* /*
* Store database information for publisher and subscriber. It should be * Store database information for publisher and subscriber. It should be
* called before atexit() because its return is used in the * called before atexit() because its return is used in the
* cleanup_objects_atexit(). * cleanup_objects_atexit().
*/ */
dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo); dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
/* Register a function to clean up objects in case of failure */ /* Register a function to clean up objects in case of failure */
atexit(cleanup_objects_atexit); atexit(cleanup_objects_atexit);
@ -2184,7 +2210,7 @@ main(int argc, char **argv)
* Check if the subscriber data directory has the same system identifier * Check if the subscriber data directory has the same system identifier
* than the publisher data directory. * than the publisher data directory.
*/ */
pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo); pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
sub_sysid = get_standby_sysid(subscriber_dir); sub_sysid = get_standby_sysid(subscriber_dir);
if (pub_sysid != sub_sysid) if (pub_sysid != sub_sysid)
pg_fatal("subscriber data directory is not a copy of the source database cluster"); pg_fatal("subscriber data directory is not a copy of the source database cluster");
@ -2214,10 +2240,10 @@ main(int argc, char **argv)
start_standby_server(&opt, true, false); start_standby_server(&opt, true, false);
/* Check if the standby server is ready for logical replication */ /* Check if the standby server is ready for logical replication */
check_subscriber(dbinfo); check_subscriber(dbinfos.dbinfo);
/* Check if the primary server is ready for logical replication */ /* Check if the primary server is ready for logical replication */
check_publisher(dbinfo); check_publisher(dbinfos.dbinfo);
/* /*
* Stop the target server. The recovery process requires that the server * Stop the target server. The recovery process requires that the server
@ -2230,10 +2256,10 @@ main(int argc, char **argv)
stop_standby_server(subscriber_dir); stop_standby_server(subscriber_dir);
/* Create the required objects for each database on publisher */ /* Create the required objects for each database on publisher */
consistent_lsn = setup_publisher(dbinfo); consistent_lsn = setup_publisher(dbinfos.dbinfo);
/* Write the required recovery parameters */ /* Write the required recovery parameters */
setup_recovery(dbinfo, subscriber_dir, consistent_lsn); setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
/* /*
* Start subscriber so the recovery parameters will take effect. Wait * Start subscriber so the recovery parameters will take effect. Wait
@ -2244,7 +2270,7 @@ main(int argc, char **argv)
start_standby_server(&opt, true, true); start_standby_server(&opt, true, true);
/* Waiting the subscriber to be promoted */ /* Waiting the subscriber to be promoted */
wait_for_end_recovery(dbinfo[0].subconninfo, &opt); wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt);
/* /*
* Create the subscription for each database on subscriber. It does not * Create the subscription for each database on subscriber. It does not
@ -2252,13 +2278,13 @@ main(int argc, char **argv)
* point to the LSN reported by setup_publisher(). It also cleans up * point to the LSN reported by setup_publisher(). It also cleans up
* publications created by this tool and replication to the standby. * publications created by this tool and replication to the standby.
*/ */
setup_subscriber(dbinfo, consistent_lsn); setup_subscriber(dbinfos.dbinfo, consistent_lsn);
/* Remove primary_slot_name if it exists on primary */ /* Remove primary_slot_name if it exists on primary */
drop_primary_replication_slot(dbinfo, primary_slot_name); drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
/* Remove failover replication slots if they exist on subscriber */ /* Remove failover replication slots if they exist on subscriber */
drop_failover_replication_slots(dbinfo); drop_failover_replication_slots(dbinfos.dbinfo);
/* Stop the subscriber */ /* Stop the subscriber */
pg_log_info("stopping the subscriber"); pg_log_info("stopping the subscriber");

View File

@ -373,6 +373,7 @@ command_ok(
# Run pg_createsubscriber on node S. --verbose is used twice # Run pg_createsubscriber on node S. --verbose is used twice
# to show more information. # to show more information.
# In passing, also test the --enable-two-phase option
command_ok( command_ok(
[ [
'pg_createsubscriber', 'pg_createsubscriber',
@ -388,6 +389,7 @@ command_ok(
'--replication-slot' => 'replslot2', '--replication-slot' => 'replslot2',
'--database' => $db1, '--database' => $db1,
'--database' => $db2, '--database' => $db2,
'--enable-two-phase'
], ],
'run pg_createsubscriber on node S'); 'run pg_createsubscriber on node S');
@ -406,6 +408,15 @@ $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
# Start subscriber # Start subscriber
$node_s->start; $node_s->start;
# Verify that all subtwophase states are pending or enabled,
# e.g. there are no subscriptions where subtwophase is disabled ('d')
is( $node_s->safe_psql(
'postgres',
"SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate = 'd'"
),
't',
'subscriptions are created with the two-phase option enabled');
# Confirm the pre-existing subscription has been removed # Confirm the pre-existing subscription has been removed
$result = $node_s->safe_psql( $result = $node_s->safe_psql(
'postgres', qq( 'postgres', qq(