diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml index 4a42999b183..f5be638867a 100644 --- a/doc/src/sgml/ref/allfiles.sgml +++ b/doc/src/sgml/ref/allfiles.sgml @@ -205,6 +205,7 @@ Complete list of usable sgml source files in this directory. + diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml new file mode 100644 index 00000000000..1132b7a69ac --- /dev/null +++ b/doc/src/sgml/ref/pg_createsubscriber.sgml @@ -0,0 +1,523 @@ + + + + + pg_createsubscriber + + + + pg_createsubscriber + 1 + Application + + + + pg_createsubscriber + convert a physical replica into a new logical replica + + + + + pg_createsubscriber + option + + + + + + dbname + + + + + datadir + + + + + connstr + + + + + + Description + + + pg_createsubscriber creates a new logical + replica from a physical standby server. All tables in the specified + database are included in the logical replication setup. A pair of + publication and subscription objects are created for each database. It + must be run at the target server. + + + + After a successful run, the state of the target server is analogous to a + fresh logical replication setup. The main difference between the logical + replication setup and pg_createsubscriber is the + initial data copy. It does only the synchronization phase, which ensures + each table is brought up to a synchronized state. + + + + The pg_createsubscriber targets large database + systems because in logical replication setup, most of the time is spent + doing the initial data copy. Furthermore, a side effect of this long time + spent synchronizing data is usually a large amount of changes to be applied + (that were produced during the initial data copy), which increases even + more the time when the logical replica will be available. For smaller + databases, initial data + synchronization is recommended. + + + + + Options + + + pg_createsubscriber accepts the following + command-line arguments: + + + + + + + + The database name to create the subscription. Multiple databases can + be selected by writing multiple switches. + + + + + + + + + + The target directory that contains a cluster directory from a physical + replica. + + + + + + + + + + Do everything except actually modifying the target directory. + + + + + + + + + + The port number on which the target server is listening for + connections. Defaults to running the target server on port 50432 to + avoid unintended client connections. + + + + + + + + + + The connection string to the publisher. For details see . + + + + + + + + + + The directory to use for postmaster sockets on target server. The + default is current directory. + + + + + + + + + + The maximum number of seconds to wait for recovery to end. Setting to + 0 disables. The default is 0. + + + + + + + + + + The user name to connect as on target server. Defaults to the current + operating system user name. + + + + + + + + + + Enables verbose mode. This will cause + pg_createsubscriber to output progress + messages and detailed information about each step to standard error. + Repeating the option causes additional debug-level messages to appear + on standard error. + + + + + + + + + Use the specified main server configuration file for the target data + directory. The pg_createsubscriber uses + internally the pg_ctl command to start and + stop the target server. It allows you to specify the actual + postgresql.conf configuration file if it is stored + outside the data directory. + + + + + + + + + The publication name to set up the logical replication. Multiple + publications can be specified by writing multiple + switches. The number of publication + names must match the number of specified databases, otherwise an error + is reported. The order of the multiple publication name switches must + match the order of database switches. If this option is not specified, + a generated name is assigned to the publication name. + + + + + + + + + The subscription name to set up the logical replication. Multiple + subscriptions can be specified by writing multiple + switches. The number of subscription + names must match the number of specified databases, otherwise an error + is reported. The order of the multiple subscription name switches must + match the order of database switches. If this option is not specified, + a generated name is assigned to the subscription name. + + + + + + + + + The replication slot name to set up the logical replication. Multiple + replication slots can be specified by writing multiple + switches. The number of + replication slot names must match the number of specified databases, + otherwise an error is reported. The order of the multiple replication + slot name switches must match the order of database switches. If this + option is not specified, the subscription name is assigned to the + replication slot name. + + + + + + + + + + Show help about pg_createsubscriber command + line arguments, and exit. + + + + + + + + + + Print the pg_createsubscriber version and exit. + + + + + + + + + Notes + + + Prerequisites + + + There are some prerequisites for + pg_createsubscriber to convert the target server + into a logical replica. If these are not met, an error will be reported. + The source and target servers must have the same major version as the + pg_createsubscriber. The given target data + directory must have the same system identifier as the source data + directory. The given database user for the target data directory must have + privileges for creating subscriptions and using pg_replication_origin_advance(). + + + + The target server must be used as a physical standby. The target server + must have and configured to a value + greater than or equal to the number of specified databases. The target + server must have configured to a + value greater than the number of specified databases. The target server + must accept local connections. + + + + The source server must accept connections from the target server. The + source server must not be in recovery. The source server must have as logical. The source server + must have configured to a value + greater than or equal to the number of specified databases plus existing + replication slots. The source server must have configured to a value greater than or equal + to the number of specified databases and existing WAL sender processes. + + + + + Warnings + + + If pg_createsubscriber fails after the target + server was promoted, then the data directory is likely not in a state that + can be recovered. In such case, creating a new standby server is + recommended. + + + + pg_createsubscriber usually starts the target + server with different connection settings during transformation. Hence, + connections to the target server should fail. + + + + During the recovery process, if the target server disconnects from the + source server, pg_createsubscriber will check a + few times if the connection has been reestablished to stream the required + WAL. After a few attempts, it terminates with an error. + + + + Since DDL commands are not replicated by logical replication, avoid + executing DDL commands that change the database schema while running + pg_createsubscriber. If the target server has + already been converted to logical replica, the DDL commands might not be + replicated, which might cause an error. + + + + If pg_createsubscriber fails while processing, + objects (publications, replication slots) created on the source server are + removed. The removal might fail if the target server cannot connect to + the source server. In such a case, a warning message will inform the + objects left. If the target server is running, it will be stopped. + + + + If the replication is using , it + will be removed from the source server after the logical replication + setup. + + + + If the target server is a synchronous replica, transaction commits on the + primary might wait for replication while running + pg_createsubscriber. + + + + pg_createsubscriber changes the system + identifier using pg_resetwal. It would avoid + situations in which the target server might use WAL files from the source + server. If the target server has a standby, replication will break and a + fresh standby should be created. + + + + + How It Works + + + The basic idea is to have a replication start point from the source server + and set up a logical replication to start from this point: + + + + + + Start the target server with the specified command-line options. If the + target server is already running, + pg_createsubscriber will terminate with an + error. + + + + + + Check if the target server can be converted. There are also a few + checks on the source server. If any of the prerequisites are not met, + pg_createsubscriber will terminate with an + error. + + + + + + Create a publication and replication slot for each specified database on + the source server. Each publication is created using FOR ALL + TABLES. If option is + not specified, it has the following name pattern: + pg_createsubscriber_%u_%x (parameter: + database oid, random int). + If is not specified, the + replication slot has the following name pattern: + pg_createsubscriber_%u_%x (parameters: + database oid, random int). + These replication slots will be used by the subscriptions in a future + step. The last replication slot LSN is used as a stopping point in the + parameter and by the + subscriptions as a replication start point. It guarantees that no + transaction will be lost. + + + + + + Write recovery parameters into the target data directory and restart the + target server. It specifies an LSN () of the write-ahead log location up + to which recovery will proceed. It also specifies + promote as the action that the server should take + once the recovery target is reached. Additional recovery parameters + are added to avoid unexpected behavior during the recovery process such + as end of the recovery as soon as a consistent state is reached (WAL + should be applied until the replication start location) and multiple + recovery targets that can cause a failure. This step finishes once the + server ends standby mode and is accepting read-write transactions. If + option is set, + pg_createsubscriber terminates if recovery + does not end until the given number of seconds. + + + + + + Create a subscription for each specified database on the target server. + If is not specified, the subscription + has the following name pattern: + pg_createsubscriber_%u_%x (parameters: + database oid, random int). + It does not copy existing data from the source server. It does not + create a replication slot. Instead, it uses the replication slot that + was created in a previous step. The subscription is created but it is + not enabled yet. The reason is the replication progress must be set to + the replication start point before starting the replication. + + + + + + Drop publications on the target server that were replicated because they + were created before the replication start location. It has no use on + the subscriber. + + + + + + Set the replication progress to the replication start point for each + subscription. When the target server starts the recovery process, it + catches up to the replication start point. This is the exact LSN to be + used as a initial replication location for each subscription. The + replication origin name is obtained since the subscription was created. + The replication origin name and the replication start point are used in + pg_replication_origin_advance() + to set up the initial replication location. + + + + + + Enable the subscription for each specified database on the target server. + The subscription starts applying transactions from the replication start + point. + + + + + + If the standby server was using , + it has no use from now on so drop it. + + + + + + Update the system identifier on the target server. The + is run to modify the system identifier. + The target server is stopped as a pg_resetwal requirement. + + + + + + + + Examples + + + To create a logical replica for databases hr and + finance from a physical replica at + foo: + +$ pg_createsubscriber -D /usr/local/pgsql/data -P "host=foo" -d hr -d finance + + + + + + See Also + + + + + + diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml index aa94f6adf6a..ff85ace83fc 100644 --- a/doc/src/sgml/reference.sgml +++ b/doc/src/sgml/reference.sgml @@ -282,6 +282,7 @@ &pgarchivecleanup; &pgChecksums; &pgControldata; + &pgCreateSubscriber; &pgCtl; &pgResetwal; &pgRewind; diff --git a/src/bin/pg_basebackup/.gitignore b/src/bin/pg_basebackup/.gitignore index 26048bdbd84..14d5de6c01e 100644 --- a/src/bin/pg_basebackup/.gitignore +++ b/src/bin/pg_basebackup/.gitignore @@ -1,4 +1,5 @@ /pg_basebackup +/pg_createsubscriber /pg_receivewal /pg_recvlogical diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile index abfb6440ec8..26c53e473f5 100644 --- a/src/bin/pg_basebackup/Makefile +++ b/src/bin/pg_basebackup/Makefile @@ -44,11 +44,14 @@ BBOBJS = \ bbstreamer_tar.o \ bbstreamer_zstd.o -all: pg_basebackup pg_receivewal pg_recvlogical +all: pg_basebackup pg_createsubscriber pg_receivewal pg_recvlogical pg_basebackup: $(BBOBJS) $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils $(CC) $(CFLAGS) $(BBOBJS) $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) +pg_createsubscriber: pg_createsubscriber.o $(WIN32RES) | submake-libpq submake-libpgport submake-libpgfeutils + $(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) + pg_receivewal: pg_receivewal.o $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils $(CC) $(CFLAGS) pg_receivewal.o $(OBJS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) @@ -57,6 +60,7 @@ pg_recvlogical: pg_recvlogical.o $(OBJS) | submake-libpq submake-libpgport subma install: all installdirs $(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)' + $(INSTALL_PROGRAM) pg_createsubscriber$(X) '$(DESTDIR)$(bindir)/pg_createsubscriber$(X)' $(INSTALL_PROGRAM) pg_receivewal$(X) '$(DESTDIR)$(bindir)/pg_receivewal$(X)' $(INSTALL_PROGRAM) pg_recvlogical$(X) '$(DESTDIR)$(bindir)/pg_recvlogical$(X)' @@ -65,12 +69,13 @@ installdirs: uninstall: rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)' + rm -f '$(DESTDIR)$(bindir)/pg_createsubscriber$(X)' rm -f '$(DESTDIR)$(bindir)/pg_receivewal$(X)' rm -f '$(DESTDIR)$(bindir)/pg_recvlogical$(X)' clean distclean: - rm -f pg_basebackup$(X) pg_receivewal$(X) pg_recvlogical$(X) \ - $(BBOBJS) pg_receivewal.o pg_recvlogical.o \ + rm -f pg_basebackup$(X) pg_createsubscriber$(X) pg_receivewal$(X) pg_recvlogical$(X) \ + $(BBOBJS) pg_createsubscriber.o pg_receivewal.o pg_recvlogical.o \ $(OBJS) rm -rf tmp_check diff --git a/src/bin/pg_basebackup/meson.build b/src/bin/pg_basebackup/meson.build index f7e60e6670a..c00acd5e118 100644 --- a/src/bin/pg_basebackup/meson.build +++ b/src/bin/pg_basebackup/meson.build @@ -38,6 +38,24 @@ pg_basebackup = executable('pg_basebackup', bin_targets += pg_basebackup +pg_createsubscriber_sources = files( + 'pg_createsubscriber.c' +) + +if host_system == 'windows' + pg_createsubscriber_sources += rc_bin_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'pg_createsubscriber', + '--FILEDESC', 'pg_createsubscriber - create a new logical replica from a standby server',]) +endif + +pg_createsubscriber = executable('pg_createsubscriber', + pg_createsubscriber_sources, + dependencies: [frontend_code, libpq], + kwargs: default_bin_args, +) +bin_targets += pg_createsubscriber + + pg_receivewal_sources = files( 'pg_receivewal.c', ) @@ -89,6 +107,7 @@ tests += { 't/011_in_place_tablespace.pl', 't/020_pg_receivewal.pl', 't/030_pg_recvlogical.pl', + 't/040_pg_createsubscriber.pl', ], }, } diff --git a/src/bin/pg_basebackup/nls.mk b/src/bin/pg_basebackup/nls.mk index fc475003e8e..7870cea71ce 100644 --- a/src/bin/pg_basebackup/nls.mk +++ b/src/bin/pg_basebackup/nls.mk @@ -8,6 +8,7 @@ GETTEXT_FILES = $(FRONTEND_COMMON_GETTEXT_FILES) \ bbstreamer_tar.c \ bbstreamer_zstd.c \ pg_basebackup.c \ + pg_createsubscriber.c \ pg_receivewal.c \ pg_recvlogical.c \ receivelog.c \ diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c new file mode 100644 index 00000000000..b8f82693405 --- /dev/null +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -0,0 +1,2141 @@ +/*------------------------------------------------------------------------- + * + * pg_createsubscriber.c + * Create a new logical replica from a standby server + * + * Copyright (C) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/pg_createsubscriber.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include +#include +#include + +#include "catalog/pg_authid_d.h" +#include "common/connect.h" +#include "common/controldata_utils.h" +#include "common/file_perm.h" +#include "common/logging.h" +#include "common/pg_prng.h" +#include "common/restricted_token.h" +#include "fe_utils/recovery_gen.h" +#include "fe_utils/simple_list.h" +#include "getopt_long.h" + +#define DEFAULT_SUB_PORT "50432" + +/* Command-line options */ +struct CreateSubscriberOptions +{ + char *config_file; /* configuration file */ + char *pub_conninfo_str; /* publisher connection string */ + char *socket_dir; /* directory for Unix-domain socket, if any */ + char *sub_port; /* subscriber port number */ + const char *sub_username; /* subscriber username */ + SimpleStringList database_names; /* list of database names */ + SimpleStringList pub_names; /* list of publication names */ + SimpleStringList sub_names; /* list of subscription names */ + SimpleStringList replslot_names; /* list of replication slot names */ + int recovery_timeout; /* stop recovery after this time */ +}; + +struct LogicalRepInfo +{ + Oid oid; /* database OID */ + char *dbname; /* database name */ + char *pubconninfo; /* publisher connection string */ + char *subconninfo; /* subscriber connection string */ + char *pubname; /* publication name */ + char *subname; /* subscription name */ + char *replslotname; /* replication slot name */ + + bool made_replslot; /* replication slot was created */ + bool made_publication; /* publication was created */ +}; + +static void cleanup_objects_atexit(void); +static void usage(); +static char *get_base_conninfo(const char *conninfo, char **dbname); +static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt); +static char *get_exec_path(const char *argv0, const char *progname); +static void check_data_directory(const char *datadir); +static char *concat_conninfo_dbname(const char *conninfo, const char *dbname); +static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt, + const char *pub_base_conninfo, + const char *sub_base_conninfo); +static PGconn *connect_database(const char *conninfo, bool exit_on_error); +static void disconnect_database(PGconn *conn, bool exit_on_error); +static uint64 get_primary_sysid(const char *conninfo); +static uint64 get_standby_sysid(const char *datadir); +static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt); +static bool server_is_in_recovery(PGconn *conn); +static char *generate_object_name(PGconn *conn); +static void check_publisher(const struct LogicalRepInfo *dbinfo); +static char *setup_publisher(struct LogicalRepInfo *dbinfo); +static void check_subscriber(const struct LogicalRepInfo *dbinfo); +static void setup_subscriber(struct LogicalRepInfo *dbinfo, + const char *consistent_lsn); +static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, + const char *lsn); +static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, + const char *slotname); +static char *create_logical_replication_slot(PGconn *conn, + struct LogicalRepInfo *dbinfo); +static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, + const char *slot_name); +static void pg_ctl_status(const char *pg_ctl_cmd, int rc); +static void start_standby_server(const struct CreateSubscriberOptions *opt, + bool restricted_access); +static void stop_standby_server(const char *datadir); +static void wait_for_end_recovery(const char *conninfo, + const struct CreateSubscriberOptions *opt); +static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo); +static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo); +static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo); +static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, + const char *lsn); +static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo); + +#define USEC_PER_SEC 1000000 +#define WAIT_INTERVAL 1 /* 1 second */ + +static const char *progname; + +static char *primary_slot_name = NULL; +static bool dry_run = false; + +static bool success = false; + +static struct LogicalRepInfo *dbinfo; +static int num_dbs = 0; /* number of specified databases */ +static int num_pubs = 0; /* number of specified publications */ +static int num_subs = 0; /* number of specified subscriptions */ +static int num_replslots = 0; /* number of specified replication slots */ + +static pg_prng_state prng_state; + +static char *pg_ctl_path = NULL; +static char *pg_resetwal_path = NULL; + +/* standby / subscriber data directory */ +static char *subscriber_dir = NULL; + +static bool recovery_ended = false; +static bool standby_running = false; + +enum WaitPMResult +{ + POSTMASTER_READY, + POSTMASTER_STILL_STARTING +}; + + +/* + * Cleanup objects that were created by pg_createsubscriber if there is an + * error. + * + * Publications and replication slots are created on primary. Depending on the + * step it failed, it should remove the already created objects if it is + * possible (sometimes it won't work due to a connection issue). + * There is no cleanup on the target server. The steps on the target server are + * executed *after* promotion, hence, at this point, a failure means recreate + * the physical replica and start again. + */ +static void +cleanup_objects_atexit(void) +{ + if (success) + return; + + /* + * If the server is promoted, there is no way to use the current setup + * again. Warn the user that a new replication setup should be done before + * trying again. + */ + if (recovery_ended) + { + pg_log_warning("failed after the end of recovery"); + pg_log_warning_hint("The target server cannot be used as a physical replica anymore. " + "You must recreate the physical replica before continuing."); + } + + for (int i = 0; i < num_dbs; i++) + { + if (dbinfo[i].made_publication || dbinfo[i].made_replslot) + { + PGconn *conn; + + conn = connect_database(dbinfo[i].pubconninfo, false); + if (conn != NULL) + { + if (dbinfo[i].made_publication) + drop_publication(conn, &dbinfo[i]); + if (dbinfo[i].made_replslot) + drop_replication_slot(conn, &dbinfo[i], dbinfo[i].replslotname); + disconnect_database(conn, false); + } + else + { + /* + * If a connection could not be established, inform the user + * that some objects were left on primary and should be + * removed before trying again. + */ + if (dbinfo[i].made_publication) + { + pg_log_warning("publication \"%s\" in database \"%s\" on primary might be left behind", + dbinfo[i].pubname, dbinfo[i].dbname); + pg_log_warning_hint("Consider dropping this publication before trying again."); + } + if (dbinfo[i].made_replslot) + { + pg_log_warning("replication slot \"%s\" in database \"%s\" on primary might be left behind", + dbinfo[i].replslotname, dbinfo[i].dbname); + pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files."); + } + } + } + } + + if (standby_running) + stop_standby_server(subscriber_dir); +} + +static void +usage(void) +{ + printf(_("%s creates a new logical replica from a standby server.\n\n"), + progname); + printf(_("Usage:\n")); + printf(_(" %s [OPTION]...\n"), progname); + printf(_("\nOptions:\n")); + printf(_(" -d, --database=DBNAME database to create a subscription\n")); + printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n")); + printf(_(" -n, --dry-run dry run, just show what would be done\n")); + printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT); + printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n")); + printf(_(" -s, --socket-directory=DIR socket directory to use (default current directory)\n")); + printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n")); + printf(_(" -U, --subscriber-username=NAME subscriber username\n")); + printf(_(" -v, --verbose output verbose messages\n")); + printf(_(" --config-file=FILENAME use specified main server configuration\n" + " file when running target cluster\n")); + printf(_(" --publication=NAME publication name\n")); + printf(_(" --replication-slot=NAME replication slot name\n")); + printf(_(" --subscription=NAME subscription name\n")); + printf(_(" -V, --version output version information, then exit\n")); + printf(_(" -?, --help show this help, then exit\n")); + printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT); + printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL); +} + +/* + * Validate a connection string. Returns a base connection string that is a + * connection string without a database name. + * + * Since we might process multiple databases, each database name will be + * appended to this base connection string to provide a final connection + * string. If the second argument (dbname) is not null, returns dbname if the + * provided connection string contains it. If option --database is not + * provided, uses dbname as the only database to setup the logical replica. + * + * It is the caller's responsibility to free the returned connection string and + * dbname. + */ +static char * +get_base_conninfo(const char *conninfo, char **dbname) +{ + PQExpBuffer buf = createPQExpBuffer(); + PQconninfoOption *conn_opts = NULL; + PQconninfoOption *conn_opt; + char *errmsg = NULL; + char *ret; + int i; + + conn_opts = PQconninfoParse(conninfo, &errmsg); + if (conn_opts == NULL) + { + pg_log_error("could not parse connection string: %s", errmsg); + return NULL; + } + + i = 0; + for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++) + { + if (strcmp(conn_opt->keyword, "dbname") == 0 && conn_opt->val != NULL) + { + if (dbname) + *dbname = pg_strdup(conn_opt->val); + continue; + } + + if (conn_opt->val != NULL && conn_opt->val[0] != '\0') + { + if (i > 0) + appendPQExpBufferChar(buf, ' '); + appendPQExpBuffer(buf, "%s=%s", conn_opt->keyword, conn_opt->val); + i++; + } + } + + ret = pg_strdup(buf->data); + + destroyPQExpBuffer(buf); + PQconninfoFree(conn_opts); + + return ret; +} + +/* + * Build a subscriber connection string. Only a few parameters are supported + * since it starts a server with restricted access. + */ +static char * +get_sub_conninfo(const struct CreateSubscriberOptions *opt) +{ + PQExpBuffer buf = createPQExpBuffer(); + char *ret; + + appendPQExpBuffer(buf, "port=%s", opt->sub_port); +#if !defined(WIN32) + appendPQExpBuffer(buf, " host=%s", opt->socket_dir); +#endif + if (opt->sub_username != NULL) + appendPQExpBuffer(buf, " user=%s", opt->sub_username); + appendPQExpBuffer(buf, " fallback_application_name=%s", progname); + + ret = pg_strdup(buf->data); + + destroyPQExpBuffer(buf); + + return ret; +} + +/* + * Verify if a PostgreSQL binary (progname) is available in the same directory as + * pg_createsubscriber and it has the same version. It returns the absolute + * path of the progname. + */ +static char * +get_exec_path(const char *argv0, const char *progname) +{ + char *versionstr; + char *exec_path; + int ret; + + versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION); + exec_path = pg_malloc(MAXPGPATH); + ret = find_other_exec(argv0, progname, versionstr, exec_path); + + if (ret < 0) + { + char full_path[MAXPGPATH]; + + if (find_my_exec(argv0, full_path) < 0) + strlcpy(full_path, progname, sizeof(full_path)); + + if (ret == -1) + pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"", + progname, "pg_createsubscriber", full_path); + else + pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s", + progname, full_path, "pg_createsubscriber"); + } + + pg_log_debug("%s path is: %s", progname, exec_path); + + return exec_path; +} + +/* + * Is it a cluster directory? These are preliminary checks. It is far from + * making an accurate check. If it is not a clone from the publisher, it will + * eventually fail in a future step. + */ +static void +check_data_directory(const char *datadir) +{ + struct stat statbuf; + char versionfile[MAXPGPATH]; + + pg_log_info("checking if directory \"%s\" is a cluster data directory", + datadir); + + if (stat(datadir, &statbuf) != 0) + { + if (errno == ENOENT) + pg_fatal("data directory \"%s\" does not exist", datadir); + else + pg_fatal("could not access directory \"%s\": %s", datadir, + strerror(errno)); + } + + snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir); + if (stat(versionfile, &statbuf) != 0 && errno == ENOENT) + { + pg_fatal("directory \"%s\" is not a database cluster directory", + datadir); + } +} + +/* + * Append database name into a base connection string. + * + * dbname is the only parameter that changes so it is not included in the base + * connection string. This function concatenates dbname to build a "real" + * connection string. + */ +static char * +concat_conninfo_dbname(const char *conninfo, const char *dbname) +{ + PQExpBuffer buf = createPQExpBuffer(); + char *ret; + + Assert(conninfo != NULL); + + appendPQExpBufferStr(buf, conninfo); + appendPQExpBuffer(buf, " dbname=%s", dbname); + + ret = pg_strdup(buf->data); + destroyPQExpBuffer(buf); + + return ret; +} + +/* + * Store publication and subscription information. + * + * If publication, replication slot and subscription names were specified, + * store it here. Otherwise, a generated name will be assigned to the object in + * setup_publisher(). + */ +static struct LogicalRepInfo * +store_pub_sub_info(const struct CreateSubscriberOptions *opt, + const char *pub_base_conninfo, + const char *sub_base_conninfo) +{ + struct LogicalRepInfo *dbinfo; + SimpleStringListCell *pubcell = NULL; + SimpleStringListCell *subcell = NULL; + SimpleStringListCell *replslotcell = NULL; + int i = 0; + + dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs); + + if (num_pubs > 0) + pubcell = opt->pub_names.head; + if (num_subs > 0) + subcell = opt->sub_names.head; + if (num_replslots > 0) + replslotcell = opt->replslot_names.head; + + for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next) + { + char *conninfo; + + /* Fill publisher attributes */ + conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val); + dbinfo[i].pubconninfo = conninfo; + dbinfo[i].dbname = cell->val; + if (num_pubs > 0) + dbinfo[i].pubname = pubcell->val; + else + dbinfo[i].pubname = NULL; + if (num_replslots > 0) + dbinfo[i].replslotname = replslotcell->val; + else + dbinfo[i].replslotname = NULL; + dbinfo[i].made_replslot = false; + dbinfo[i].made_publication = false; + /* Fill subscriber attributes */ + conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val); + dbinfo[i].subconninfo = conninfo; + if (num_subs > 0) + dbinfo[i].subname = subcell->val; + else + dbinfo[i].subname = NULL; + /* Other fields will be filled later */ + + pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i, + dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)", + dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)", + dbinfo[i].pubconninfo); + pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i, + dbinfo[i].subname ? dbinfo[i].subname : "(auto)", + dbinfo[i].subconninfo); + + if (num_pubs > 0) + pubcell = pubcell->next; + if (num_subs > 0) + subcell = subcell->next; + if (num_replslots > 0) + replslotcell = replslotcell->next; + + i++; + } + + return dbinfo; +} + +/* + * Open a new connection. If exit_on_error is true, it has an undesired + * condition and it should exit immediately. + */ +static PGconn * +connect_database(const char *conninfo, bool exit_on_error) +{ + PGconn *conn; + PGresult *res; + + conn = PQconnectdb(conninfo); + if (PQstatus(conn) != CONNECTION_OK) + { + pg_log_error("connection to database failed: %s", + PQerrorMessage(conn)); + if (exit_on_error) + exit(1); + + return NULL; + } + + /* Secure search_path */ + res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not clear search_path: %s", + PQresultErrorMessage(res)); + if (exit_on_error) + exit(1); + + return NULL; + } + PQclear(res); + + return conn; +} + +/* + * Close the connection. If exit_on_error is true, it has an undesired + * condition and it should exit immediately. + */ +static void +disconnect_database(PGconn *conn, bool exit_on_error) +{ + Assert(conn != NULL); + + PQfinish(conn); + + if (exit_on_error) + exit(1); +} + +/* + * Obtain the system identifier using the provided connection. It will be used + * to compare if a data directory is a clone of another one. + */ +static uint64 +get_primary_sysid(const char *conninfo) +{ + PGconn *conn; + PGresult *res; + uint64 sysid; + + pg_log_info("getting system identifier from publisher"); + + conn = connect_database(conninfo, true); + + res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not get system identifier: %s", + PQresultErrorMessage(res)); + disconnect_database(conn, true); + } + if (PQntuples(res) != 1) + { + pg_log_error("could not get system identifier: got %d rows, expected %d row", + PQntuples(res), 1); + disconnect_database(conn, true); + } + + sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10); + + pg_log_info("system identifier is %llu on publisher", + (unsigned long long) sysid); + + PQclear(res); + disconnect_database(conn, false); + + return sysid; +} + +/* + * Obtain the system identifier from control file. It will be used to compare + * if a data directory is a clone of another one. This routine is used locally + * and avoids a connection. + */ +static uint64 +get_standby_sysid(const char *datadir) +{ + ControlFileData *cf; + bool crc_ok; + uint64 sysid; + + pg_log_info("getting system identifier from subscriber"); + + cf = get_controlfile(datadir, &crc_ok); + if (!crc_ok) + pg_fatal("control file appears to be corrupt"); + + sysid = cf->system_identifier; + + pg_log_info("system identifier is %llu on subscriber", + (unsigned long long) sysid); + + pg_free(cf); + + return sysid; +} + +/* + * Modify the system identifier. Since a standby server preserves the system + * identifier, it makes sense to change it to avoid situations in which WAL + * files from one of the systems might be used in the other one. + */ +static void +modify_subscriber_sysid(const struct CreateSubscriberOptions *opt) +{ + ControlFileData *cf; + bool crc_ok; + struct timeval tv; + + char *cmd_str; + + pg_log_info("modifying system identifier of subscriber"); + + cf = get_controlfile(subscriber_dir, &crc_ok); + if (!crc_ok) + pg_fatal("control file appears to be corrupt"); + + /* + * Select a new system identifier. + * + * XXX this code was extracted from BootStrapXLOG(). + */ + gettimeofday(&tv, NULL); + cf->system_identifier = ((uint64) tv.tv_sec) << 32; + cf->system_identifier |= ((uint64) tv.tv_usec) << 12; + cf->system_identifier |= getpid() & 0xFFF; + + if (!dry_run) + update_controlfile(subscriber_dir, cf, true); + + pg_log_info("system identifier is %llu on subscriber", + (unsigned long long) cf->system_identifier); + + pg_log_info("running pg_resetwal on the subscriber"); + + cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path, + subscriber_dir, DEVNULL); + + pg_log_debug("pg_resetwal command is: %s", cmd_str); + + if (!dry_run) + { + int rc = system(cmd_str); + + if (rc == 0) + pg_log_info("subscriber successfully changed the system identifier"); + else + pg_fatal("subscriber failed to change system identifier: exit code: %d", rc); + } + + pg_free(cf); +} + +/* + * Generate an object name using a prefix, database oid and a random integer. + * It is used in case the user does not specify an object name (publication, + * subscription, replication slot). + */ +static char * +generate_object_name(PGconn *conn) +{ + PGresult *res; + Oid oid; + uint32 rand; + char *objname; + + res = PQexec(conn, + "SELECT oid FROM pg_catalog.pg_database " + "WHERE datname = pg_catalog.current_database()"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not obtain database OID: %s", + PQresultErrorMessage(res)); + disconnect_database(conn, true); + } + + if (PQntuples(res) != 1) + { + pg_log_error("could not obtain database OID: got %d rows, expected %d rows", + PQntuples(res), 1); + disconnect_database(conn, true); + } + + /* Database OID */ + oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10); + + PQclear(res); + + /* Random unsigned integer */ + rand = pg_prng_uint32(&prng_state); + + /* + * Build the object name. The name must not exceed NAMEDATALEN - 1. This + * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 + + * '\0'). + */ + objname = psprintf("pg_createsubscriber_%u_%x", oid, rand); + + return objname; +} + +/* + * Create the publications and replication slots in preparation for logical + * replication. Returns the LSN from latest replication slot. It will be the + * replication start point that is used to adjust the subscriptions (see + * set_replication_progress). + */ +static char * +setup_publisher(struct LogicalRepInfo *dbinfo) +{ + char *lsn = NULL; + + pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL))); + + for (int i = 0; i < num_dbs; i++) + { + PGconn *conn; + char *genname = NULL; + + conn = connect_database(dbinfo[i].pubconninfo, true); + + /* + * If an object name was not specified as command-line options, assign + * a generated object name. The replication slot has a different rule. + * The subscription name is assigned to the replication slot name if + * no replication slot is specified. It follows the same rule as + * CREATE SUBSCRIPTION. + */ + if (num_pubs == 0 || num_subs == 0 || num_replslots == 0) + genname = generate_object_name(conn); + if (num_pubs == 0) + dbinfo[i].pubname = pg_strdup(genname); + if (num_subs == 0) + dbinfo[i].subname = pg_strdup(genname); + if (num_replslots == 0) + dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname); + + /* + * Create publication on publisher. This step should be executed + * *before* promoting the subscriber to avoid any transactions between + * consistent LSN and the new publication rows (such transactions + * wouldn't see the new publication rows resulting in an error). + */ + create_publication(conn, &dbinfo[i]); + + /* Create replication slot on publisher */ + if (lsn) + pg_free(lsn); + lsn = create_logical_replication_slot(conn, &dbinfo[i]); + if (lsn != NULL || dry_run) + pg_log_info("create replication slot \"%s\" on publisher", + dbinfo[i].replslotname); + else + exit(1); + + disconnect_database(conn, false); + } + + return lsn; +} + +/* + * Is recovery still in progress? + */ +static bool +server_is_in_recovery(PGconn *conn) +{ + PGresult *res; + int ret; + + res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()"); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not obtain recovery progress: %s", + PQresultErrorMessage(res)); + disconnect_database(conn, true); + } + + + ret = strcmp("t", PQgetvalue(res, 0, 0)); + + PQclear(res); + + return ret == 0; +} + +/* + * Is the primary server ready for logical replication? + * + * XXX Does it not allow a synchronous replica? + */ +static void +check_publisher(const struct LogicalRepInfo *dbinfo) +{ + PGconn *conn; + PGresult *res; + bool failed = false; + + char *wal_level; + int max_repslots; + int cur_repslots; + int max_walsenders; + int cur_walsenders; + + pg_log_info("checking settings on publisher"); + + conn = connect_database(dbinfo[0].pubconninfo, true); + + /* + * If the primary server is in recovery (i.e. cascading replication), + * objects (publication) cannot be created because it is read only. + */ + if (server_is_in_recovery(conn)) + { + pg_log_error("primary server cannot be in recovery"); + disconnect_database(conn, true); + } + + /*------------------------------------------------------------------------ + * Logical replication requires a few parameters to be set on publisher. + * Since these parameters are not a requirement for physical replication, + * we should check it to make sure it won't fail. + * + * - wal_level = logical + * - max_replication_slots >= current + number of dbs to be converted + * - max_wal_senders >= current + number of dbs to be converted + * ----------------------------------------------------------------------- + */ + res = PQexec(conn, + "WITH wl AS " + "(SELECT setting AS wallevel FROM pg_catalog.pg_settings " + "WHERE name = 'wal_level'), " + "total_mrs AS " + "(SELECT setting AS tmrs FROM pg_catalog.pg_settings " + "WHERE name = 'max_replication_slots'), " + "cur_mrs AS " + "(SELECT count(*) AS cmrs " + "FROM pg_catalog.pg_replication_slots), " + "total_mws AS " + "(SELECT setting AS tmws FROM pg_catalog.pg_settings " + "WHERE name = 'max_wal_senders'), " + "cur_mws AS " + "(SELECT count(*) AS cmws FROM pg_catalog.pg_stat_activity " + "WHERE backend_type = 'walsender') " + "SELECT wallevel, tmrs, cmrs, tmws, cmws " + "FROM wl, total_mrs, cur_mrs, total_mws, cur_mws"); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not obtain publisher settings: %s", + PQresultErrorMessage(res)); + disconnect_database(conn, true); + } + + wal_level = pg_strdup(PQgetvalue(res, 0, 0)); + max_repslots = atoi(PQgetvalue(res, 0, 1)); + cur_repslots = atoi(PQgetvalue(res, 0, 2)); + max_walsenders = atoi(PQgetvalue(res, 0, 3)); + cur_walsenders = atoi(PQgetvalue(res, 0, 4)); + + PQclear(res); + + pg_log_debug("publisher: wal_level: %s", wal_level); + pg_log_debug("publisher: max_replication_slots: %d", max_repslots); + pg_log_debug("publisher: current replication slots: %d", cur_repslots); + pg_log_debug("publisher: max_wal_senders: %d", max_walsenders); + pg_log_debug("publisher: current wal senders: %d", cur_walsenders); + + /* + * If standby sets primary_slot_name, check if this replication slot is in + * use on primary for WAL retention purposes. This replication slot has no + * use after the transformation, hence, it will be removed at the end of + * this process. + */ + if (primary_slot_name) + { + PQExpBuffer str = createPQExpBuffer(); + char *psn_esc = PQescapeLiteral(conn, primary_slot_name, strlen(primary_slot_name)); + + appendPQExpBuffer(str, + "SELECT 1 FROM pg_catalog.pg_replication_slots " + "WHERE active AND slot_name = %s", + psn_esc); + + pg_free(psn_esc); + + pg_log_debug("command is: %s", str->data); + + res = PQexec(conn, str->data); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not obtain replication slot information: %s", + PQresultErrorMessage(res)); + disconnect_database(conn, true); + } + + if (PQntuples(res) != 1) + { + pg_log_error("could not obtain replication slot information: got %d rows, expected %d row", + PQntuples(res), 1); + disconnect_database(conn, true); + } + else + pg_log_info("primary has replication slot \"%s\"", + primary_slot_name); + + PQclear(res); + } + + disconnect_database(conn, false); + + if (strcmp(wal_level, "logical") != 0) + { + pg_log_error("publisher requires wal_level >= logical"); + failed = true; + } + + if (max_repslots - cur_repslots < num_dbs) + { + pg_log_error("publisher requires %d replication slots, but only %d remain", + num_dbs, max_repslots - cur_repslots); + pg_log_error_hint("Consider increasing max_replication_slots to at least %d.", + cur_repslots + num_dbs); + failed = true; + } + + if (max_walsenders - cur_walsenders < num_dbs) + { + pg_log_error("publisher requires %d wal sender processes, but only %d remain", + num_dbs, max_walsenders - cur_walsenders); + pg_log_error_hint("Consider increasing max_wal_senders to at least %d.", + cur_walsenders + num_dbs); + failed = true; + } + + if (failed) + exit(1); +} + +/* + * Is the standby server ready for logical replication? + * + * XXX Does it not allow a time-delayed replica? + * + * XXX In a cascaded replication scenario (P -> S -> C), if the target server + * is S, it cannot detect there is a replica (server C) because server S starts + * accepting only local connections and server C cannot connect to it. Hence, + * there is not a reliable way to provide a suitable error saying the server C + * will be broken at the end of this process (due to pg_resetwal). + */ +static void +check_subscriber(const struct LogicalRepInfo *dbinfo) +{ + PGconn *conn; + PGresult *res; + bool failed = false; + + int max_lrworkers; + int max_repslots; + int max_wprocs; + + pg_log_info("checking settings on subscriber"); + + conn = connect_database(dbinfo[0].subconninfo, true); + + /* The target server must be a standby */ + if (!server_is_in_recovery(conn)) + { + pg_log_error("target server must be a standby"); + disconnect_database(conn, true); + } + + /*------------------------------------------------------------------------ + * Logical replication requires a few parameters to be set on subscriber. + * Since these parameters are not a requirement for physical replication, + * we should check it to make sure it won't fail. + * + * - max_replication_slots >= number of dbs to be converted + * - max_logical_replication_workers >= number of dbs to be converted + * - max_worker_processes >= 1 + number of dbs to be converted + *------------------------------------------------------------------------ + */ + res = PQexec(conn, + "SELECT setting FROM pg_catalog.pg_settings WHERE name IN (" + "'max_logical_replication_workers', " + "'max_replication_slots', " + "'max_worker_processes', " + "'primary_slot_name') " + "ORDER BY name"); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not obtain subscriber settings: %s", + PQresultErrorMessage(res)); + disconnect_database(conn, true); + } + + max_lrworkers = atoi(PQgetvalue(res, 0, 0)); + max_repslots = atoi(PQgetvalue(res, 1, 0)); + max_wprocs = atoi(PQgetvalue(res, 2, 0)); + if (strcmp(PQgetvalue(res, 3, 0), "") != 0) + primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0)); + + pg_log_debug("subscriber: max_logical_replication_workers: %d", + max_lrworkers); + pg_log_debug("subscriber: max_replication_slots: %d", max_repslots); + pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs); + if (primary_slot_name) + pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name); + + PQclear(res); + + disconnect_database(conn, false); + + if (max_repslots < num_dbs) + { + pg_log_error("subscriber requires %d replication slots, but only %d remain", + num_dbs, max_repslots); + pg_log_error_hint("Consider increasing max_replication_slots to at least %d.", + num_dbs); + failed = true; + } + + if (max_lrworkers < num_dbs) + { + pg_log_error("subscriber requires %d logical replication workers, but only %d remain", + num_dbs, max_lrworkers); + pg_log_error_hint("Consider increasing max_logical_replication_workers to at least %d.", + num_dbs); + failed = true; + } + + if (max_wprocs < num_dbs + 1) + { + pg_log_error("subscriber requires %d worker processes, but only %d remain", + num_dbs + 1, max_wprocs); + pg_log_error_hint("Consider increasing max_worker_processes to at least %d.", + num_dbs + 1); + failed = true; + } + + if (failed) + exit(1); +} + +/* + * Create the subscriptions, adjust the initial location for logical + * replication and enable the subscriptions. That's the last step for logical + * replication setup. + */ +static void +setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn) +{ + for (int i = 0; i < num_dbs; i++) + { + PGconn *conn; + + /* Connect to subscriber. */ + conn = connect_database(dbinfo[i].subconninfo, true); + + /* + * Since the publication was created before the consistent LSN, it is + * available on the subscriber when the physical replica is promoted. + * Remove publications from the subscriber because it has no use. + */ + drop_publication(conn, &dbinfo[i]); + + create_subscription(conn, &dbinfo[i]); + + /* Set the replication progress to the correct LSN */ + set_replication_progress(conn, &dbinfo[i], consistent_lsn); + + /* Enable subscription */ + enable_subscription(conn, &dbinfo[i]); + + disconnect_database(conn, false); + } +} + +/* + * Write the required recovery parameters. + */ +static void +setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn) +{ + PGconn *conn; + PQExpBuffer recoveryconfcontents; + + /* + * Despite of the recovery parameters will be written to the subscriber, + * use a publisher connection. The primary_conninfo is generated using the + * connection settings. + */ + conn = connect_database(dbinfo[0].pubconninfo, true); + + /* + * Write recovery parameters. + * + * The subscriber is not running yet. In dry run mode, the recovery + * parameters *won't* be written. An invalid LSN is used for printing + * purposes. Additional recovery parameters are added here. It avoids + * unexpected behavior such as end of recovery as soon as a consistent + * state is reached (recovery_target) and failure due to multiple recovery + * targets (name, time, xid, LSN). + */ + recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL); + appendPQExpBuffer(recoveryconfcontents, "recovery_target = ''\n"); + appendPQExpBuffer(recoveryconfcontents, + "recovery_target_timeline = 'latest'\n"); + appendPQExpBuffer(recoveryconfcontents, + "recovery_target_inclusive = true\n"); + appendPQExpBuffer(recoveryconfcontents, + "recovery_target_action = promote\n"); + appendPQExpBuffer(recoveryconfcontents, "recovery_target_name = ''\n"); + appendPQExpBuffer(recoveryconfcontents, "recovery_target_time = ''\n"); + appendPQExpBuffer(recoveryconfcontents, "recovery_target_xid = ''\n"); + + if (dry_run) + { + appendPQExpBuffer(recoveryconfcontents, "# dry run mode"); + appendPQExpBuffer(recoveryconfcontents, + "recovery_target_lsn = '%X/%X'\n", + LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr)); + } + else + { + appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n", + lsn); + WriteRecoveryConfig(conn, datadir, recoveryconfcontents); + } + disconnect_database(conn, false); + + pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data); +} + +/* + * Drop physical replication slot on primary if the standby was using it. After + * the transformation, it has no use. + * + * XXX we might not fail here. Instead, we provide a warning so the user + * eventually drops this replication slot later. + */ +static void +drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname) +{ + PGconn *conn; + + /* Replication slot does not exist, do nothing */ + if (!primary_slot_name) + return; + + conn = connect_database(dbinfo[0].pubconninfo, false); + if (conn != NULL) + { + drop_replication_slot(conn, &dbinfo[0], slotname); + disconnect_database(conn, false); + } + else + { + pg_log_warning("could not drop replication slot \"%s\" on primary", + slotname); + pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files."); + } +} + +/* + * Create a logical replication slot and returns a LSN. + * + * CreateReplicationSlot() is not used because it does not provide the one-row + * result set that contains the LSN. + */ +static char * +create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo) +{ + PQExpBuffer str = createPQExpBuffer(); + PGresult *res = NULL; + const char *slot_name = dbinfo->replslotname; + char *slot_name_esc; + char *lsn = NULL; + + Assert(conn != NULL); + + pg_log_info("creating the replication slot \"%s\" on database \"%s\"", + slot_name, dbinfo->dbname); + + slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name)); + + appendPQExpBuffer(str, + "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)", + slot_name_esc); + + pg_free(slot_name_esc); + + pg_log_debug("command is: %s", str->data); + + if (!dry_run) + { + res = PQexec(conn, str->data); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not create replication slot \"%s\" on database \"%s\": %s", + slot_name, dbinfo->dbname, + PQresultErrorMessage(res)); + return NULL; + } + + lsn = pg_strdup(PQgetvalue(res, 0, 0)); + PQclear(res); + } + + /* For cleanup purposes */ + dbinfo->made_replslot = true; + + destroyPQExpBuffer(str); + + return lsn; +} + +static void +drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, + const char *slot_name) +{ + PQExpBuffer str = createPQExpBuffer(); + char *slot_name_esc; + PGresult *res; + + Assert(conn != NULL); + + pg_log_info("dropping the replication slot \"%s\" on database \"%s\"", + slot_name, dbinfo->dbname); + + slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name)); + + appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc); + + pg_free(slot_name_esc); + + pg_log_debug("command is: %s", str->data); + + if (!dry_run) + { + res = PQexec(conn, str->data); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not drop replication slot \"%s\" on database \"%s\": %s", + slot_name, dbinfo->dbname, PQresultErrorMessage(res)); + dbinfo->made_replslot = false; /* don't try again. */ + } + + PQclear(res); + } + + destroyPQExpBuffer(str); +} + +/* + * Reports a suitable message if pg_ctl fails. + */ +static void +pg_ctl_status(const char *pg_ctl_cmd, int rc) +{ + if (rc != 0) + { + if (WIFEXITED(rc)) + { + pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc)); + } + else if (WIFSIGNALED(rc)) + { +#if defined(WIN32) + pg_log_error("pg_ctl was terminated by exception 0x%X", + WTERMSIG(rc)); + pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value."); +#else + pg_log_error("pg_ctl was terminated by signal %d: %s", + WTERMSIG(rc), pg_strsignal(WTERMSIG(rc))); +#endif + } + else + { + pg_log_error("pg_ctl exited with unrecognized status %d", rc); + } + + pg_log_error_detail("The failed command was: %s", pg_ctl_cmd); + exit(1); + } +} + +static void +start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access) +{ + PQExpBuffer pg_ctl_cmd = createPQExpBuffer(); + int rc; + + appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D \"%s\" -s", + pg_ctl_path, subscriber_dir); + if (restricted_access) + { + appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port); +#if !defined(WIN32) + + /* + * An empty listen_addresses list means the server does not listen on + * any IP interfaces; only Unix-domain sockets can be used to connect + * to the server. Prevent external connections to minimize the chance + * of failure. + */ + appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700"); + if (opt->socket_dir) + appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'", + opt->socket_dir); + appendPQExpBufferChar(pg_ctl_cmd, '"'); +#endif + } + if (opt->config_file != NULL) + appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"", + opt->config_file); + pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data); + rc = system(pg_ctl_cmd->data); + pg_ctl_status(pg_ctl_cmd->data, rc); + standby_running = true; + destroyPQExpBuffer(pg_ctl_cmd); + pg_log_info("server was started"); +} + +static void +stop_standby_server(const char *datadir) +{ + char *pg_ctl_cmd; + int rc; + + pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path, + datadir); + pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd); + rc = system(pg_ctl_cmd); + pg_ctl_status(pg_ctl_cmd, rc); + standby_running = false; + pg_log_info("server was stopped"); +} + +/* + * Returns after the server finishes the recovery process. + * + * If recovery_timeout option is set, terminate abnormally without finishing + * the recovery process. By default, it waits forever. + */ +static void +wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt) +{ + PGconn *conn; + int status = POSTMASTER_STILL_STARTING; + int timer = 0; + int count = 0; /* number of consecutive connection attempts */ + +#define NUM_CONN_ATTEMPTS 10 + + pg_log_info("waiting for the target server to reach the consistent state"); + + conn = connect_database(conninfo, true); + + for (;;) + { + PGresult *res; + bool in_recovery = server_is_in_recovery(conn); + + /* + * Does the recovery process finish? In dry run mode, there is no + * recovery mode. Bail out as the recovery process has ended. + */ + if (!in_recovery || dry_run) + { + status = POSTMASTER_READY; + recovery_ended = true; + break; + } + + /* + * If it is still in recovery, make sure the target server is + * connected to the primary so it can receive the required WAL to + * finish the recovery process. If it is disconnected try + * NUM_CONN_ATTEMPTS in a row and bail out if not succeed. + */ + res = PQexec(conn, + "SELECT 1 FROM pg_catalog.pg_stat_wal_receiver"); + if (PQntuples(res) == 0) + { + if (++count > NUM_CONN_ATTEMPTS) + { + stop_standby_server(subscriber_dir); + pg_log_error("standby server disconnected from the primary"); + break; + } + } + else + count = 0; /* reset counter if it connects again */ + + PQclear(res); + + /* Bail out after recovery_timeout seconds if this option is set */ + if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout) + { + stop_standby_server(subscriber_dir); + pg_log_error("recovery timed out"); + disconnect_database(conn, true); + } + + /* Keep waiting */ + pg_usleep(WAIT_INTERVAL * USEC_PER_SEC); + + timer += WAIT_INTERVAL; + } + + disconnect_database(conn, false); + + if (status == POSTMASTER_STILL_STARTING) + pg_fatal("server did not end recovery"); + + pg_log_info("target server reached the consistent state"); + pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing."); +} + +/* + * Create a publication that includes all tables in the database. + */ +static void +create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo) +{ + PQExpBuffer str = createPQExpBuffer(); + PGresult *res; + char *ipubname_esc; + char *spubname_esc; + + Assert(conn != NULL); + + ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname)); + spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname)); + + /* Check if the publication already exists */ + appendPQExpBuffer(str, + "SELECT 1 FROM pg_catalog.pg_publication " + "WHERE pubname = %s", + spubname_esc); + res = PQexec(conn, str->data); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not obtain publication information: %s", + PQresultErrorMessage(res)); + disconnect_database(conn, true); + } + + if (PQntuples(res) == 1) + { + /* + * Unfortunately, if it reaches this code path, it will always fail + * (unless you decide to change the existing publication name). That's + * bad but it is very unlikely that the user will choose a name with + * pg_createsubscriber_ prefix followed by the exact database oid and + * a random number. + */ + pg_log_error("publication \"%s\" already exists", dbinfo->pubname); + pg_log_error_hint("Consider renaming this publication before continuing."); + disconnect_database(conn, true); + } + + PQclear(res); + resetPQExpBuffer(str); + + pg_log_info("creating publication \"%s\" on database \"%s\"", + dbinfo->pubname, dbinfo->dbname); + + appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES", + ipubname_esc); + + pg_log_debug("command is: %s", str->data); + + if (!dry_run) + { + res = PQexec(conn, str->data); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("could not create publication \"%s\" on database \"%s\": %s", + dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res)); + disconnect_database(conn, true); + } + PQclear(res); + } + + /* For cleanup purposes */ + dbinfo->made_publication = true; + + pg_free(ipubname_esc); + pg_free(spubname_esc); + destroyPQExpBuffer(str); +} + +/* + * Remove publication if it couldn't finish all steps. + */ +static void +drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo) +{ + PQExpBuffer str = createPQExpBuffer(); + PGresult *res; + char *pubname_esc; + + Assert(conn != NULL); + + pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname)); + + pg_log_info("dropping publication \"%s\" on database \"%s\"", + dbinfo->pubname, dbinfo->dbname); + + appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc); + + pg_free(pubname_esc); + + pg_log_debug("command is: %s", str->data); + + if (!dry_run) + { + res = PQexec(conn, str->data); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("could not drop publication \"%s\" on database \"%s\": %s", + dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res)); + dbinfo->made_publication = false; /* don't try again. */ + + /* + * Don't disconnect and exit here. This routine is used by primary + * (cleanup publication / replication slot due to an error) and + * subscriber (remove the replicated publications). In both cases, + * it can continue and provide instructions for the user to remove + * it later if cleanup fails. + */ + } + PQclear(res); + } + + destroyPQExpBuffer(str); +} + +/* + * Create a subscription with some predefined options. + * + * A replication slot was already created in a previous step. Let's use it. It + * is not required to copy data. The subscription will be created but it will + * not be enabled now. That's because the replication progress must be set and + * the replication origin name (one of the function arguments) contains the + * subscription OID in its name. Once the subscription is created, + * set_replication_progress() can obtain the chosen origin name and set up its + * initial location. + */ +static void +create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo) +{ + PQExpBuffer str = createPQExpBuffer(); + PGresult *res; + char *pubname_esc; + char *subname_esc; + char *pubconninfo_esc; + char *replslotname_esc; + + Assert(conn != NULL); + + pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname)); + subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname)); + pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo)); + replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname)); + + pg_log_info("creating subscription \"%s\" on database \"%s\"", + dbinfo->subname, dbinfo->dbname); + + appendPQExpBuffer(str, + "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s " + "WITH (create_slot = false, enabled = false, " + "slot_name = %s, copy_data = false)", + subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc); + + pg_free(pubname_esc); + pg_free(subname_esc); + pg_free(pubconninfo_esc); + pg_free(replslotname_esc); + + pg_log_debug("command is: %s", str->data); + + if (!dry_run) + { + res = PQexec(conn, str->data); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("could not create subscription \"%s\" on database \"%s\": %s", + dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res)); + disconnect_database(conn, true); + } + PQclear(res); + } + + destroyPQExpBuffer(str); +} + +/* + * Sets the replication progress to the consistent LSN. + * + * The subscriber caught up to the consistent LSN provided by the last + * replication slot that was created. The goal is to set up the initial + * location for the logical replication that is the exact LSN that the + * subscriber was promoted. Once the subscription is enabled it will start + * streaming from that location onwards. In dry run mode, the subscription OID + * and LSN are set to invalid values for printing purposes. + */ +static void +set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn) +{ + PQExpBuffer str = createPQExpBuffer(); + PGresult *res; + Oid suboid; + char *subname; + char *dbname; + char *originname; + char *lsnstr; + + Assert(conn != NULL); + + subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname)); + dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname)); + + appendPQExpBuffer(str, + "SELECT s.oid FROM pg_catalog.pg_subscription s " + "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) " + "WHERE s.subname = %s AND d.datname = %s", + subname, dbname); + + res = PQexec(conn, str->data); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not obtain subscription OID: %s", + PQresultErrorMessage(res)); + disconnect_database(conn, true); + } + + if (PQntuples(res) != 1 && !dry_run) + { + pg_log_error("could not obtain subscription OID: got %d rows, expected %d rows", + PQntuples(res), 1); + disconnect_database(conn, true); + } + + if (dry_run) + { + suboid = InvalidOid; + lsnstr = psprintf("%X/%X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr)); + } + else + { + suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10); + lsnstr = psprintf("%s", lsn); + } + + PQclear(res); + + /* + * The origin name is defined as pg_%u. %u is the subscription OID. See + * ApplyWorkerMain(). + */ + originname = psprintf("pg_%u", suboid); + + pg_log_info("setting the replication progress (node name \"%s\" ; LSN %s) on database \"%s\"", + originname, lsnstr, dbinfo->dbname); + + resetPQExpBuffer(str); + appendPQExpBuffer(str, + "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')", + originname, lsnstr); + + pg_log_debug("command is: %s", str->data); + + if (!dry_run) + { + res = PQexec(conn, str->data); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not set replication progress for the subscription \"%s\": %s", + dbinfo->subname, PQresultErrorMessage(res)); + disconnect_database(conn, true); + } + PQclear(res); + } + + pg_free(subname); + pg_free(dbname); + pg_free(originname); + pg_free(lsnstr); + destroyPQExpBuffer(str); +} + +/* + * Enables the subscription. + * + * The subscription was created in a previous step but it was disabled. After + * adjusting the initial logical replication location, enable the subscription. + */ +static void +enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo) +{ + PQExpBuffer str = createPQExpBuffer(); + PGresult *res; + char *subname; + + Assert(conn != NULL); + + subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname)); + + pg_log_info("enabling subscription \"%s\" on database \"%s\"", + dbinfo->subname, dbinfo->dbname); + + appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname); + + pg_log_debug("command is: %s", str->data); + + if (!dry_run) + { + res = PQexec(conn, str->data); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("could not enable subscription \"%s\": %s", + dbinfo->subname, PQresultErrorMessage(res)); + disconnect_database(conn, true); + } + + PQclear(res); + } + + pg_free(subname); + destroyPQExpBuffer(str); +} + +int +main(int argc, char **argv) +{ + static struct option long_options[] = + { + {"database", required_argument, NULL, 'd'}, + {"pgdata", required_argument, NULL, 'D'}, + {"dry-run", no_argument, NULL, 'n'}, + {"subscriber-port", required_argument, NULL, 'p'}, + {"publisher-server", required_argument, NULL, 'P'}, + {"socket-directory", required_argument, NULL, 's'}, + {"recovery-timeout", required_argument, NULL, 't'}, + {"subscriber-username", required_argument, NULL, 'U'}, + {"verbose", no_argument, NULL, 'v'}, + {"version", no_argument, NULL, 'V'}, + {"help", no_argument, NULL, '?'}, + {"config-file", required_argument, NULL, 1}, + {"publication", required_argument, NULL, 2}, + {"replication-slot", required_argument, NULL, 3}, + {"subscription", required_argument, NULL, 4}, + {NULL, 0, NULL, 0} + }; + + struct CreateSubscriberOptions opt = {0}; + + int c; + int option_index; + + char *pub_base_conninfo; + char *sub_base_conninfo; + char *dbname_conninfo = NULL; + + uint64 pub_sysid; + uint64 sub_sysid; + struct stat statbuf; + + char *consistent_lsn; + + char pidfile[MAXPGPATH]; + + pg_logging_init(argv[0]); + pg_logging_set_level(PG_LOG_WARNING); + progname = get_progname(argv[0]); + set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_createsubscriber")); + + if (argc > 1) + { + if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0) + { + usage(); + exit(0); + } + else if (strcmp(argv[1], "-V") == 0 + || strcmp(argv[1], "--version") == 0) + { + puts("pg_createsubscriber (PostgreSQL) " PG_VERSION); + exit(0); + } + } + + /* Default settings */ + subscriber_dir = NULL; + opt.config_file = NULL; + opt.pub_conninfo_str = NULL; + opt.socket_dir = NULL; + opt.sub_port = DEFAULT_SUB_PORT; + opt.sub_username = NULL; + opt.database_names = (SimpleStringList) + { + 0 + }; + opt.recovery_timeout = 0; + + /* + * Don't allow it to be run as root. It uses pg_ctl which does not allow + * it either. + */ +#ifndef WIN32 + if (geteuid() == 0) + { + pg_log_error("cannot be executed by \"root\""); + pg_log_error_hint("You must run %s as the PostgreSQL superuser.", + progname); + exit(1); + } +#endif + + get_restricted_token(); + + while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v", + long_options, &option_index)) != -1) + { + switch (c) + { + case 'd': + if (!simple_string_list_member(&opt.database_names, optarg)) + { + simple_string_list_append(&opt.database_names, optarg); + num_dbs++; + } + else + { + pg_log_error("duplicate database \"%s\"", optarg); + exit(1); + } + break; + case 'D': + subscriber_dir = pg_strdup(optarg); + canonicalize_path(subscriber_dir); + break; + case 'n': + dry_run = true; + break; + case 'p': + opt.sub_port = pg_strdup(optarg); + break; + case 'P': + opt.pub_conninfo_str = pg_strdup(optarg); + break; + case 's': + opt.socket_dir = pg_strdup(optarg); + canonicalize_path(opt.socket_dir); + break; + case 't': + opt.recovery_timeout = atoi(optarg); + break; + case 'U': + opt.sub_username = pg_strdup(optarg); + break; + case 'v': + pg_logging_increase_verbosity(); + break; + case 1: + opt.config_file = pg_strdup(optarg); + break; + case 2: + if (!simple_string_list_member(&opt.pub_names, optarg)) + { + simple_string_list_append(&opt.pub_names, optarg); + num_pubs++; + } + else + { + pg_log_error("duplicate publication \"%s\"", optarg); + exit(1); + } + break; + case 3: + if (!simple_string_list_member(&opt.replslot_names, optarg)) + { + simple_string_list_append(&opt.replslot_names, optarg); + num_replslots++; + } + else + { + pg_log_error("duplicate replication slot \"%s\"", optarg); + exit(1); + } + break; + case 4: + if (!simple_string_list_member(&opt.sub_names, optarg)) + { + simple_string_list_append(&opt.sub_names, optarg); + num_subs++; + } + else + { + pg_log_error("duplicate subscription \"%s\"", optarg); + exit(1); + } + break; + default: + /* getopt_long already emitted a complaint */ + pg_log_error_hint("Try \"%s --help\" for more information.", progname); + exit(1); + } + } + + /* Any non-option arguments? */ + if (optind < argc) + { + pg_log_error("too many command-line arguments (first is \"%s\")", + argv[optind]); + pg_log_error_hint("Try \"%s --help\" for more information.", progname); + exit(1); + } + + /* Required arguments */ + if (subscriber_dir == NULL) + { + pg_log_error("no subscriber data directory specified"); + pg_log_error_hint("Try \"%s --help\" for more information.", progname); + exit(1); + } + + /* If socket directory is not provided, use the current directory */ + if (opt.socket_dir == NULL) + { + char cwd[MAXPGPATH]; + + if (!getcwd(cwd, MAXPGPATH)) + pg_fatal("could not determine current directory"); + opt.socket_dir = pg_strdup(cwd); + canonicalize_path(opt.socket_dir); + } + + /* + * Parse connection string. Build a base connection string that might be + * reused by multiple databases. + */ + if (opt.pub_conninfo_str == NULL) + { + /* + * TODO use primary_conninfo (if available) from subscriber and + * extract publisher connection string. Assume that there are + * identical entries for physical and logical replication. If there is + * not, we would fail anyway. + */ + pg_log_error("no publisher connection string specified"); + pg_log_error_hint("Try \"%s --help\" for more information.", progname); + exit(1); + } + pg_log_info("validating connection string on publisher"); + pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str, + &dbname_conninfo); + if (pub_base_conninfo == NULL) + exit(1); + + pg_log_info("validating connection string on subscriber"); + sub_base_conninfo = get_sub_conninfo(&opt); + + if (opt.database_names.head == NULL) + { + pg_log_info("no database was specified"); + + /* + * If --database option is not provided, try to obtain the dbname from + * the publisher conninfo. If dbname parameter is not available, error + * out. + */ + if (dbname_conninfo) + { + simple_string_list_append(&opt.database_names, dbname_conninfo); + num_dbs++; + + pg_log_info("database \"%s\" was extracted from the publisher connection string", + dbname_conninfo); + } + else + { + pg_log_error("no database name specified"); + pg_log_error_hint("Try \"%s --help\" for more information.", + progname); + exit(1); + } + } + + /* Number of object names must match number of databases */ + if (num_pubs > 0 && num_pubs != num_dbs) + { + pg_log_error("wrong number of publication names"); + pg_log_error_hint("Number of publication names (%d) must match number of database names (%d).", + num_pubs, num_dbs); + exit(1); + } + if (num_subs > 0 && num_subs != num_dbs) + { + pg_log_error("wrong number of subscription names"); + pg_log_error_hint("Number of subscription names (%d) must match number of database names (%d).", + num_subs, num_dbs); + exit(1); + } + if (num_replslots > 0 && num_replslots != num_dbs) + { + pg_log_error("wrong number of replication slot names"); + pg_log_error_hint("Number of replication slot names (%d) must match number of database names (%d).", + num_replslots, num_dbs); + exit(1); + } + + /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */ + pg_ctl_path = get_exec_path(argv[0], "pg_ctl"); + pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal"); + + /* Rudimentary check for a data directory */ + check_data_directory(subscriber_dir); + + /* + * Store database information for publisher and subscriber. It should be + * called before atexit() because its return is used in the + * cleanup_objects_atexit(). + */ + dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo); + + /* Register a function to clean up objects in case of failure */ + atexit(cleanup_objects_atexit); + + /* + * Check if the subscriber data directory has the same system identifier + * than the publisher data directory. + */ + pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo); + sub_sysid = get_standby_sysid(subscriber_dir); + if (pub_sysid != sub_sysid) + pg_fatal("subscriber data directory is not a copy of the source database cluster"); + + /* Subscriber PID file */ + snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir); + + /* + * The standby server must not be running. If the server is started under + * service manager and pg_createsubscriber stops it, the service manager + * might react to this action and start the server again. Therefore, + * refuse to proceed if the server is running to avoid possible failures. + */ + if (stat(pidfile, &statbuf) == 0) + { + pg_log_error("standby is up and running"); + pg_log_error_hint("Stop the standby and try again."); + exit(1); + } + + /* + * Start a short-lived standby server with temporary parameters (provided + * by command-line options). The goal is to avoid connections during the + * transformation steps. + */ + pg_log_info("starting the standby with command-line options"); + start_standby_server(&opt, true); + + /* Check if the standby server is ready for logical replication */ + check_subscriber(dbinfo); + + /* + * Check if the primary server is ready for logical replication. This + * routine checks if a replication slot is in use on primary so it relies + * on check_subscriber() to obtain the primary_slot_name. That's why it is + * called after it. + */ + check_publisher(dbinfo); + + /* + * Stop the target server. The recovery process requires that the server + * reaches a consistent state before targeting the recovery stop point. + * Make sure a consistent state is reached (stop the target server + * guarantees it) *before* creating the replication slots in + * setup_publisher(). + */ + pg_log_info("stopping the subscriber"); + stop_standby_server(subscriber_dir); + + /* + * Create the required objects for each database on publisher. This step + * is here mainly because if we stop the standby we cannot verify if the + * primary slot is in use. We could use an extra connection for it but it + * doesn't seem worth. + */ + consistent_lsn = setup_publisher(dbinfo); + + /* Write the required recovery parameters */ + setup_recovery(dbinfo, subscriber_dir, consistent_lsn); + + /* + * Start subscriber so the recovery parameters will take effect. Wait + * until accepting connections. + */ + pg_log_info("starting the subscriber"); + start_standby_server(&opt, true); + + /* Waiting the subscriber to be promoted */ + wait_for_end_recovery(dbinfo[0].subconninfo, &opt); + + /* + * Create the subscription for each database on subscriber. It does not + * enable it immediately because it needs to adjust the replication start + * point to the LSN reported by setup_publisher(). It also cleans up + * publications created by this tool and replication to the standby. + */ + setup_subscriber(dbinfo, consistent_lsn); + + /* Remove primary_slot_name if it exists on primary */ + drop_primary_replication_slot(dbinfo, primary_slot_name); + + /* Stop the subscriber */ + pg_log_info("stopping the subscriber"); + stop_standby_server(subscriber_dir); + + /* Change system identifier from subscriber */ + modify_subscriber_sysid(&opt); + + success = true; + + pg_log_info("Done!"); + + return 0; +} diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl new file mode 100644 index 00000000000..63ae6fdfc67 --- /dev/null +++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl @@ -0,0 +1,364 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +# +# Test using a standby server as the subscriber. + +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +program_help_ok('pg_createsubscriber'); +program_version_ok('pg_createsubscriber'); +program_options_handling_ok('pg_createsubscriber'); + +my $datadir = PostgreSQL::Test::Utils::tempdir; + +# +# Test mandatory options +command_fails(['pg_createsubscriber'], + 'no subscriber data directory specified'); +command_fails( + [ 'pg_createsubscriber', '--pgdata', $datadir ], + 'no publisher connection string specified'); +command_fails( + [ + 'pg_createsubscriber', '--verbose', + '--pgdata', $datadir, + '--publisher-server', 'port=5432' + ], + 'no database name specified'); +command_fails( + [ + 'pg_createsubscriber', '--verbose', + '--pgdata', $datadir, + '--publisher-server', 'port=5432', + '--database', 'pg1', + '--database', 'pg1' + ], + 'duplicate database name'); +command_fails( + [ + 'pg_createsubscriber', '--verbose', + '--pgdata', $datadir, + '--publisher-server', 'port=5432', + '--publication', 'foo1', + '--publication', 'foo1', + '--database', 'pg1', + '--database', 'pg2' + ], + 'duplicate publication name'); +command_fails( + [ + 'pg_createsubscriber', '--verbose', + '--pgdata', $datadir, + '--publisher-server', 'port=5432', + '--publication', 'foo1', + '--database', 'pg1', + '--database', 'pg2' + ], + 'wrong number of publication names'); +command_fails( + [ + 'pg_createsubscriber', '--verbose', + '--pgdata', $datadir, + '--publisher-server', 'port=5432', + '--publication', 'foo1', + '--publication', 'foo2', + '--subscription', 'bar1', + '--database', 'pg1', + '--database', 'pg2' + ], + 'wrong number of subscription names'); +command_fails( + [ + 'pg_createsubscriber', '--verbose', + '--pgdata', $datadir, + '--publisher-server', 'port=5432', + '--publication', 'foo1', + '--publication', 'foo2', + '--subscription', 'bar1', + '--subscription', 'bar2', + '--replication-slot', 'baz1', + '--database', 'pg1', + '--database', 'pg2' + ], + 'wrong number of replication slot names'); + +# Set up node P as primary +my $node_p = PostgreSQL::Test::Cluster->new('node_p'); +$node_p->init(allows_streaming => 'logical'); +$node_p->start; + +# Set up node F as about-to-fail node +# Force it to initialize a new cluster instead of copying a +# previously initdb'd cluster. New cluster has a different system identifier so +# we can test if the target cluster is a copy of the source cluster. +my $node_f = PostgreSQL::Test::Cluster->new('node_f'); +$node_f->init(force_initdb => 1, allows_streaming => 'logical'); + +# On node P +# - create databases +# - create test tables +# - insert a row +# - create a physical replication slot +$node_p->safe_psql( + 'postgres', q( + CREATE DATABASE pg1; + CREATE DATABASE pg2; +)); +$node_p->safe_psql('pg1', 'CREATE TABLE tbl1 (a text)'); +$node_p->safe_psql('pg1', "INSERT INTO tbl1 VALUES('first row')"); +$node_p->safe_psql('pg2', 'CREATE TABLE tbl2 (a text)'); +my $slotname = 'physical_slot'; +$node_p->safe_psql('pg2', + "SELECT pg_create_physical_replication_slot('$slotname')"); + +# Set up node S as standby linking to node P +$node_p->backup('backup_1'); +my $node_s = PostgreSQL::Test::Cluster->new('node_s'); +$node_s->init_from_backup($node_p, 'backup_1', has_streaming => 1); +$node_s->append_conf( + 'postgresql.conf', qq[ +primary_slot_name = '$slotname' +]); +$node_s->set_standby_mode(); +$node_s->start; + +# Set up node T as standby linking to node P then promote it +my $node_t = PostgreSQL::Test::Cluster->new('node_t'); +$node_t->init_from_backup($node_p, 'backup_1', has_streaming => 1); +$node_t->set_standby_mode(); +$node_t->start; +$node_t->promote; +$node_t->stop; + +# Run pg_createsubscriber on a promoted server +command_fails( + [ + 'pg_createsubscriber', '--verbose', + '--dry-run', '--pgdata', + $node_t->data_dir, '--publisher-server', + $node_p->connstr('pg1'), + '--socket-directory', $node_t->host, + '--subscriber-port', $node_t->port, + '--database', 'pg1', + '--database', 'pg2' + ], + 'target server is not in recovery'); + +# Run pg_createsubscriber when standby is running +command_fails( + [ + 'pg_createsubscriber', '--verbose', + '--dry-run', '--pgdata', + $node_s->data_dir, '--publisher-server', + $node_p->connstr('pg1'), + '--socket-directory', $node_s->host, + '--subscriber-port', $node_s->port, + '--database', 'pg1', + '--database', 'pg2' + ], + 'standby is up and running'); + +# Run pg_createsubscriber on about-to-fail node F +command_fails( + [ + 'pg_createsubscriber', '--verbose', + '--pgdata', $node_f->data_dir, + '--publisher-server', $node_p->connstr('pg1'), + '--socket-directory', $node_f->host, + '--subscriber-port', $node_f->port, + '--database', 'pg1', + '--database', 'pg2' + ], + 'subscriber data directory is not a copy of the source database cluster'); + +# Set up node C as standby linking to node S +$node_s->backup('backup_2'); +my $node_c = PostgreSQL::Test::Cluster->new('node_c'); +$node_c->init_from_backup($node_s, 'backup_2', has_streaming => 1); +$node_c->adjust_conf('postgresql.conf', 'primary_slot_name', undef); +$node_c->set_standby_mode(); + +# Run pg_createsubscriber on node C (P -> S -> C) +command_fails( + [ + 'pg_createsubscriber', '--verbose', + '--dry-run', '--pgdata', + $node_c->data_dir, '--publisher-server', + $node_s->connstr('pg1'), + '--socket-directory', $node_c->host, + '--subscriber-port', $node_c->port, + '--database', 'pg1', + '--database', 'pg2' + ], + 'primary server is in recovery'); + +# Insert another row on node P and wait node S to catch up +$node_p->safe_psql('pg1', "INSERT INTO tbl1 VALUES('second row')"); +$node_p->wait_for_replay_catchup($node_s); + +# Check some unmet conditions on node P +$node_p->append_conf('postgresql.conf', q{ +wal_level = replica +max_replication_slots = 1 +max_wal_senders = 1 +max_worker_processes = 2 +}); +$node_p->restart; +$node_s->stop; +command_fails( + [ + 'pg_createsubscriber', '--verbose', + '--dry-run', '--pgdata', + $node_s->data_dir, '--publisher-server', + $node_p->connstr('pg1'), + '--socket-directory', $node_s->host, + '--subscriber-port', $node_s->port, + '--database', 'pg1', + '--database', 'pg2' + ], + 'primary contains unmet conditions on node P'); +# Restore default settings here but only apply it after testing standby. Some +# standby settings should not be a lower setting than on the primary. +$node_p->append_conf('postgresql.conf', q{ +wal_level = logical +max_replication_slots = 10 +max_wal_senders = 10 +max_worker_processes = 8 +}); + +# Check some unmet conditions on node S +$node_s->append_conf('postgresql.conf', q{ +max_replication_slots = 1 +max_logical_replication_workers = 1 +max_worker_processes = 2 +}); +command_fails( + [ + 'pg_createsubscriber', '--verbose', + '--dry-run', '--pgdata', + $node_s->data_dir, '--publisher-server', + $node_p->connstr('pg1'), + '--socket-directory', $node_s->host, + '--subscriber-port', $node_s->port, + '--database', 'pg1', + '--database', 'pg2' + ], + 'standby contains unmet conditions on node S'); +$node_s->append_conf('postgresql.conf', q{ +max_replication_slots = 10 +max_logical_replication_workers = 4 +max_worker_processes = 8 +}); +# Restore default settings on both servers +$node_p->restart; + +# dry run mode on node S +command_ok( + [ + 'pg_createsubscriber', '--verbose', + '--dry-run', '--pgdata', + $node_s->data_dir, '--publisher-server', + $node_p->connstr('pg1'), + '--socket-directory', $node_s->host, + '--subscriber-port', $node_s->port, + '--publication', 'pub1', + '--publication', 'pub2', + '--subscription', 'sub1', + '--subscription', 'sub2', + '--database', 'pg1', + '--database', 'pg2' + ], + 'run pg_createsubscriber --dry-run on node S'); + +# Check if node S is still a standby +$node_s->start; +is($node_s->safe_psql('postgres', 'SELECT pg_catalog.pg_is_in_recovery()'), + 't', 'standby is in recovery'); +$node_s->stop; + +# pg_createsubscriber can run without --databases option +command_ok( + [ + 'pg_createsubscriber', '--verbose', + '--dry-run', '--pgdata', + $node_s->data_dir, '--publisher-server', + $node_p->connstr('pg1'), + '--socket-directory', $node_s->host, + '--subscriber-port', $node_s->port, + '--replication-slot', 'replslot1' + ], + 'run pg_createsubscriber without --databases'); + +# Run pg_createsubscriber on node S +command_ok( + [ + 'pg_createsubscriber', '--verbose', + '--verbose', '--pgdata', + $node_s->data_dir, '--publisher-server', + $node_p->connstr('pg1'), + '--socket-directory', $node_s->host, + '--subscriber-port', $node_s->port, + '--publication', 'pub1', + '--publication', 'Pub2', + '--replication-slot', 'replslot1', + '--replication-slot', 'replslot2', + '--database', 'pg1', + '--database', 'pg2' + ], + 'run pg_createsubscriber on node S'); + +# Confirm the physical replication slot has been removed +my $result = $node_p->safe_psql('pg1', + "SELECT count(*) FROM pg_replication_slots WHERE slot_name = '$slotname'" +); +is($result, qq(0), + 'the physical replication slot used as primary_slot_name has been removed' +); + +# Insert rows on P +$node_p->safe_psql('pg1', "INSERT INTO tbl1 VALUES('third row')"); +$node_p->safe_psql('pg2', "INSERT INTO tbl2 VALUES('row 1')"); + +# Start subscriber +$node_s->start; + +# Get subscription names +$result = $node_s->safe_psql( + 'postgres', qq( + SELECT subname FROM pg_subscription WHERE subname ~ '^pg_createsubscriber_' +)); +my @subnames = split("\n", $result); + +# Wait subscriber to catch up +$node_s->wait_for_subscription_sync($node_p, $subnames[0]); +$node_s->wait_for_subscription_sync($node_p, $subnames[1]); + +# Check result on database pg1 +$result = $node_s->safe_psql('pg1', 'SELECT * FROM tbl1'); +is( $result, qq(first row +second row +third row), + 'logical replication works on database pg1'); + +# Check result on database pg2 +$result = $node_s->safe_psql('pg2', 'SELECT * FROM tbl2'); +is($result, qq(row 1), 'logical replication works on database pg2'); + +# Different system identifier? +my $sysid_p = $node_p->safe_psql('postgres', + 'SELECT system_identifier FROM pg_control_system()'); +my $sysid_s = $node_s->safe_psql('postgres', + 'SELECT system_identifier FROM pg_control_system()'); +ok($sysid_p != $sysid_s, 'system identifier was changed'); + +# clean up +$node_p->teardown_node; +$node_s->teardown_node; +$node_t->teardown_node; +$node_f->teardown_node; + +done_testing();