diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index b4b996236e4..380d0b1c35c 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -146,6 +146,29 @@ PostgreSQL documentation
+
+
+
+
+
+ Remove all objects of the specified type from specified databases on the
+ target server.
+
+
+ publications: The "all tables" publications established for this
+ subscriber are always removed; specifying this object type causes all
+ other publications replicated from the source server to be dropped as
+ well.
+
+
+ The objects selected to be dropped are individually logged and do show
+ up in a --dry-run. There is no opportunity to affect or stop the
+ dropping of the selected objects so consider taking a backup of them
+ using pg_dump.
+
+
+
+
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 6baf92e8024..d067eb44e6c 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -29,6 +29,7 @@
#include "getopt_long.h"
#define DEFAULT_SUB_PORT "50432"
+#define OBJECTTYPE_PUBLICATIONS 0x0001
/* Command-line options */
struct CreateSubscriberOptions
@@ -44,6 +45,7 @@ struct CreateSubscriberOptions
SimpleStringList sub_names; /* list of subscription names */
SimpleStringList replslot_names; /* list of replication slot names */
int recovery_timeout; /* stop recovery after this time */
+ SimpleStringList objecttypes_to_remove; /* list of object types to remove */
};
/* per-database publication/subscription info */
@@ -68,6 +70,8 @@ struct LogicalRepInfos
{
struct LogicalRepInfo *dbinfo;
bool two_phase; /* enable-two-phase option */
+ bits32 objecttypes_to_remove; /* flags indicating which object types
+ * to remove on subscriber */
};
static void cleanup_objects_atexit(void);
@@ -109,7 +113,9 @@ static void stop_standby_server(const char *datadir);
static void wait_for_end_recovery(const char *conninfo,
const struct CreateSubscriberOptions *opt);
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
-static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static void drop_publication(PGconn *conn, const char *pubname,
+ const char *dbname, bool *made_publication);
+static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
const char *lsn);
@@ -194,7 +200,8 @@ cleanup_objects_atexit(void)
if (conn != NULL)
{
if (dbinfo->made_publication)
- drop_publication(conn, dbinfo);
+ drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
+ &dbinfo->made_publication);
if (dbinfo->made_replslot)
drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
disconnect_database(conn, false);
@@ -241,6 +248,8 @@ usage(void)
printf(_(" -n, --dry-run dry run, just show what would be done\n"));
printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
+ printf(_(" -R, --remove=OBJECTTYPE remove all objects of the specified type from specified\n"
+ " databases on the subscriber; accepts: publications\n"));
printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
@@ -1193,12 +1202,8 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
*/
check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
- /*
- * Since the publication was created before the consistent LSN, it is
- * available on the subscriber when the physical replica is promoted.
- * Remove publications from the subscriber because it has no use.
- */
- drop_publication(conn, &dbinfo[i]);
+ /* Check and drop the required publications in the given database. */
+ check_and_drop_publications(conn, &dbinfo[i]);
create_subscription(conn, &dbinfo[i]);
@@ -1663,10 +1668,11 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
}
/*
- * Remove publication if it couldn't finish all steps.
+ * Drop the specified publication in the given database.
*/
static void
-drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
+drop_publication(PGconn *conn, const char *pubname, const char *dbname,
+ bool *made_publication)
{
PQExpBuffer str = createPQExpBuffer();
PGresult *res;
@@ -1674,10 +1680,10 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
Assert(conn != NULL);
- pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
+ pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
pg_log_info("dropping publication \"%s\" in database \"%s\"",
- dbinfo->pubname, dbinfo->dbname);
+ pubname, dbname);
appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
@@ -1691,8 +1697,8 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
- dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
- dbinfo->made_publication = false; /* don't try again. */
+ pubname, dbname, PQresultErrorMessage(res));
+ *made_publication = false; /* don't try again. */
/*
* Don't disconnect and exit here. This routine is used by primary
@@ -1708,6 +1714,55 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
destroyPQExpBuffer(str);
}
+/*
+ * Retrieve and drop the publications.
+ *
+ * Since the publications were created before the consistent LSN, they
+ * remain on the subscriber even after the physical replica is
+ * promoted. Remove these publications from the subscriber because
+ * they have no use. Additionally, if requested, drop all pre-existing
+ * publications.
+ */
+static void
+check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
+{
+ PGresult *res;
+ bool drop_all_pubs = dbinfos.objecttypes_to_remove & OBJECTTYPE_PUBLICATIONS;
+
+ Assert(conn != NULL);
+
+ if (drop_all_pubs)
+ {
+ pg_log_info("dropping all existing publications in database \"%s\"",
+ dbinfo->dbname);
+
+ /* Fetch all publication names */
+ res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not obtain publication information: %s",
+ PQresultErrorMessage(res));
+ PQclear(res);
+ disconnect_database(conn, true);
+ }
+
+ /* Drop each publication */
+ for (int i = 0; i < PQntuples(res); i++)
+ drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
+ &dbinfo->made_publication);
+
+ PQclear(res);
+ }
+
+ /*
+ * In dry-run mode, we don't create publications, but we still try to drop
+ * those to provide necessary information to the user.
+ */
+ if (!drop_all_pubs || dry_run)
+ drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
+ &dbinfo->made_publication);
+}
+
/*
* Create a subscription with some predefined options.
*
@@ -1914,6 +1969,7 @@ main(int argc, char **argv)
{"dry-run", no_argument, NULL, 'n'},
{"subscriber-port", required_argument, NULL, 'p'},
{"publisher-server", required_argument, NULL, 'P'},
+ {"remove", required_argument, NULL, 'R'},
{"socketdir", required_argument, NULL, 's'},
{"recovery-timeout", required_argument, NULL, 't'},
{"enable-two-phase", no_argument, NULL, 'T'},
@@ -1995,7 +2051,7 @@ main(int argc, char **argv)
get_restricted_token();
- while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:TU:v",
+ while ((c = getopt_long(argc, argv, "d:D:np:P:R:s:t:TU:v",
long_options, &option_index)) != -1)
{
switch (c)
@@ -2025,6 +2081,12 @@ main(int argc, char **argv)
case 'P':
opt.pub_conninfo_str = pg_strdup(optarg);
break;
+ case 'R':
+ if (!simple_string_list_member(&opt.objecttypes_to_remove, optarg))
+ simple_string_list_append(&opt.objecttypes_to_remove, optarg);
+ else
+ pg_fatal("object type \"%s\" is specified more than once for --remove", optarg);
+ break;
case 's':
opt.socket_dir = pg_strdup(optarg);
canonicalize_path(opt.socket_dir);
@@ -2189,6 +2251,19 @@ main(int argc, char **argv)
exit(1);
}
+ /* Verify the object types specified for removal from the subscriber */
+ for (SimpleStringListCell *cell = opt.objecttypes_to_remove.head; cell; cell = cell->next)
+ {
+ if (pg_strcasecmp(cell->val, "publications") == 0)
+ dbinfos.objecttypes_to_remove |= OBJECTTYPE_PUBLICATIONS;
+ else
+ {
+ pg_log_error("invalid object type \"%s\" specified for --remove", cell->val);
+ pg_log_error_hint("The valid option is: \"publications\"");
+ exit(1);
+ }
+ }
+
/* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index c35fa108ce3..2c9bd5bdb9e 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -329,6 +329,21 @@ $node_p->safe_psql($db1,
"CREATE SUBSCRIPTION $dummy_sub CONNECTION 'dbname=dummy' PUBLICATION pub_dummy WITH (connect=false)"
);
$node_p->wait_for_replay_catchup($node_s);
+
+# Create user-defined publications, wait for streaming replication to sync them
+# to the standby, then verify that '--remove'
+# removes them.
+$node_p->safe_psql(
+ $db1, qq(
+ CREATE PUBLICATION test_pub1 FOR ALL TABLES;
+ CREATE PUBLICATION test_pub2 FOR ALL TABLES;
+));
+
+$node_p->wait_for_replay_catchup($node_s);
+
+ok($node_s->safe_psql($db1, "SELECT COUNT(*) = 2 FROM pg_publication"),
+ 'two pre-existing publications on subscriber');
+
$node_s->stop;
# dry run mode on node S
@@ -373,7 +388,8 @@ command_ok(
# Run pg_createsubscriber on node S. --verbose is used twice
# to show more information.
-# In passing, also test the --enable-two-phase option
+# In passing, also test the --enable-two-phase option and
+# --remove option
command_ok(
[
'pg_createsubscriber',
@@ -389,7 +405,8 @@ command_ok(
'--replication-slot' => 'replslot2',
'--database' => $db1,
'--database' => $db2,
- '--enable-two-phase'
+ '--enable-two-phase',
+ '--remove' => 'publications',
],
'run pg_createsubscriber on node S');
@@ -408,6 +425,10 @@ $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
# Start subscriber
$node_s->start;
+# Confirm publications are removed from the subscriber node
+is($node_s->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication;"),
+ '0', 'all publications on subscriber have been removed');
+
# Verify that all subtwophase states are pending or enabled,
# e.g. there are no subscriptions where subtwophase is disabled ('d')
is( $node_s->safe_psql(