diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index b647a81fc86..a400ba0e40c 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -759,7 +759,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, /* Try to connect to the publisher. */ must_use_password = !superuser_arg(owner) && opts.passwordrequired; - wrconn = walrcv_connect(conninfo, true, must_use_password, + wrconn = walrcv_connect(conninfo, true, true, must_use_password, stmt->subname, &err); if (!wrconn) ereport(ERROR, @@ -910,7 +910,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, /* Try to connect to the publisher. */ must_use_password = sub->passwordrequired && !sub->ownersuperuser; - wrconn = walrcv_connect(sub->conninfo, true, must_use_password, + wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password, sub->name, &err); if (!wrconn) ereport(ERROR, @@ -1537,7 +1537,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Try to connect to the publisher. */ must_use_password = sub->passwordrequired && !sub->ownersuperuser; - wrconn = walrcv_connect(sub->conninfo, true, must_use_password, + wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password, sub->name, &err); if (!wrconn) ereport(ERROR, @@ -1788,7 +1788,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) */ load_file("libpqwalreceiver", false); - wrconn = walrcv_connect(conninfo, true, must_use_password, + wrconn = walrcv_connect(conninfo, true, true, must_use_password, subname, &err); if (wrconn == NULL) { diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 2439733b55b..9270d7b855b 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -48,7 +48,8 @@ struct WalReceiverConn /* Prototypes for interface functions */ static WalReceiverConn *libpqrcv_connect(const char *conninfo, - bool logical, bool must_use_password, + bool replication, bool logical, + bool must_use_password, const char *appname, char **err); static void libpqrcv_check_conninfo(const char *conninfo, bool must_use_password); @@ -57,6 +58,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); static char *libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli); +static char *libpqrcv_get_dbname_from_conninfo(const char *conninfo); static int libpqrcv_server_version(WalReceiverConn *conn); static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, @@ -99,6 +101,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_send = libpqrcv_send, .walrcv_create_slot = libpqrcv_create_slot, .walrcv_alter_slot = libpqrcv_alter_slot, + .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, .walrcv_disconnect = libpqrcv_disconnect @@ -121,7 +124,11 @@ _PG_init(void) } /* - * Establish the connection to the primary server for XLOG streaming + * Establish the connection to the primary server. + * + * This function can be used for both replication and regular connections. + * If it is a replication connection, it could be either logical or physical + * based on input argument 'logical'. * * If an error occurs, this function will normally return NULL and set *err * to a palloc'ed error message. However, if must_use_password is true and @@ -132,8 +139,8 @@ _PG_init(void) * case. */ static WalReceiverConn * -libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password, - const char *appname, char **err) +libpqrcv_connect(const char *conninfo, bool replication, bool logical, + bool must_use_password, const char *appname, char **err) { WalReceiverConn *conn; PostgresPollingStatusType status; @@ -156,36 +163,46 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password, */ keys[i] = "dbname"; vals[i] = conninfo; - keys[++i] = "replication"; - vals[i] = logical ? "database" : "true"; - if (!logical) + + /* We can not have logical without replication */ + Assert(replication || !logical); + + if (replication) { - /* - * The database name is ignored by the server in replication mode, but - * specify "replication" for .pgpass lookup. - */ - keys[++i] = "dbname"; - vals[i] = "replication"; + keys[++i] = "replication"; + vals[i] = logical ? "database" : "true"; + + if (logical) + { + /* Tell the publisher to translate to our encoding */ + keys[++i] = "client_encoding"; + vals[i] = GetDatabaseEncodingName(); + + /* + * Force assorted GUC parameters to settings that ensure that the + * publisher will output data values in a form that is unambiguous + * to the subscriber. (We don't want to modify the subscriber's + * GUC settings, since that might surprise user-defined code + * running in the subscriber, such as triggers.) This should + * match what pg_dump does. + */ + keys[++i] = "options"; + vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3"; + } + else + { + /* + * The database name is ignored by the server in replication mode, + * but specify "replication" for .pgpass lookup. + */ + keys[++i] = "dbname"; + vals[i] = "replication"; + } } + keys[++i] = "fallback_application_name"; vals[i] = appname; - if (logical) - { - /* Tell the publisher to translate to our encoding */ - keys[++i] = "client_encoding"; - vals[i] = GetDatabaseEncodingName(); - /* - * Force assorted GUC parameters to settings that ensure that the - * publisher will output data values in a form that is unambiguous to - * the subscriber. (We don't want to modify the subscriber's GUC - * settings, since that might surprise user-defined code running in - * the subscriber, such as triggers.) This should match what pg_dump - * does. - */ - keys[++i] = "options"; - vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3"; - } keys[++i] = NULL; vals[i] = NULL; @@ -471,6 +488,50 @@ libpqrcv_server_version(WalReceiverConn *conn) return PQserverVersion(conn->streamConn); } +/* + * Get database name from the primary server's conninfo. + * + * If dbname is not found in connInfo, return NULL value. + */ +static char * +libpqrcv_get_dbname_from_conninfo(const char *connInfo) +{ + PQconninfoOption *opts; + char *dbname = NULL; + char *err = NULL; + + opts = PQconninfoParse(connInfo, &err); + if (opts == NULL) + { + /* The error string is malloc'd, so we must free it explicitly */ + char *errcopy = err ? pstrdup(err) : "out of memory"; + + PQfreemem(err); + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid connection string syntax: %s", errcopy))); + } + + for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt) + { + /* + * If multiple dbnames are specified, then the last one will be + * returned + */ + if (strcmp(opt->keyword, "dbname") == 0 && opt->val && + *opt->val) + { + if (dbname) + pfree(dbname); + + dbname = pstrdup(opt->val); + } + } + + PQconninfoFree(opts); + return dbname; +} + /* * Start streaming WAL data from given streaming options. * diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 5acab3f3e23..ee066290881 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1329,7 +1329,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * so that synchronous replication can distinguish them. */ LogRepWorkerWalRcvConn = - walrcv_connect(MySubscription->conninfo, true, + walrcv_connect(MySubscription->conninfo, true, true, must_use_password, slotname, &err); if (LogRepWorkerWalRcvConn == NULL) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 32ff4c03364..9dd2446fbfd 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4519,7 +4519,7 @@ run_apply_worker() !MySubscription->ownersuperuser; LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, - must_use_password, + true, must_use_password, MySubscription->name, &err); if (LogRepWorkerWalRcvConn == NULL) diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index e29a6196a3e..b80447d15f1 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -296,7 +296,7 @@ WalReceiverMain(void) sigprocmask(SIG_SETMASK, &UnBlockSig, NULL); /* Establish the connection to the primary for XLOG streaming */ - wrconn = walrcv_connect(conninfo, false, false, + wrconn = walrcv_connect(conninfo, true, false, false, cluster_name[0] ? cluster_name : "walreceiver", &err); if (!wrconn) diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index f566a99ba16..b906bb5ce83 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -228,8 +228,10 @@ typedef struct WalRcvExecResult /* * walrcv_connect_fn * - * Establish connection to a cluster. 'logical' is true if the - * connection is logical, and false if the connection is physical. + * Establish connection to a cluster. 'replication' is true if the + * connection is a replication connection, and false if it is a + * regular connection. If it is a replication connection, it could + * be either logical or physical based on input argument 'logical'. * 'appname' is a name associated to the connection, to use for example * with fallback_application_name or application_name. Returns the * details about the connection established, as defined by @@ -237,6 +239,7 @@ typedef struct WalRcvExecResult * returned with 'err' including the error generated. */ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, + bool replication, bool logical, bool must_use_password, const char *appname, @@ -279,6 +282,13 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn, typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn, TimeLineID *primary_tli); +/* + * walrcv_get_dbname_from_conninfo_fn + * + * Returns the database name from the primary_conninfo + */ +typedef char *(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo); + /* * walrcv_server_version_fn * @@ -403,6 +413,7 @@ typedef struct WalReceiverFunctionsType walrcv_get_conninfo_fn walrcv_get_conninfo; walrcv_get_senderinfo_fn walrcv_get_senderinfo; walrcv_identify_system_fn walrcv_identify_system; + walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo; walrcv_server_version_fn walrcv_server_version; walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile; walrcv_startstreaming_fn walrcv_startstreaming; @@ -418,8 +429,8 @@ typedef struct WalReceiverFunctionsType extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; -#define walrcv_connect(conninfo, logical, must_use_password, appname, err) \ - WalReceiverFunctions->walrcv_connect(conninfo, logical, must_use_password, appname, err) +#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err) \ + WalReceiverFunctions->walrcv_connect(conninfo, replication, logical, must_use_password, appname, err) #define walrcv_check_conninfo(conninfo, must_use_password) \ WalReceiverFunctions->walrcv_check_conninfo(conninfo, must_use_password) #define walrcv_get_conninfo(conn) \ @@ -428,6 +439,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port) #define walrcv_identify_system(conn, primary_tli) \ WalReceiverFunctions->walrcv_identify_system(conn, primary_tli) +#define walrcv_get_dbname_from_conninfo(conninfo) \ + WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo) #define walrcv_server_version(conn) \ WalReceiverFunctions->walrcv_server_version(conn) #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \