diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 4c332652e3c..b4fa89bce90 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -21,6 +21,7 @@ #include "miscadmin.h" #include "storage/latch.h" #include "utils/hsearch.h" +#include "utils/inval.h" #include "utils/memutils.h" #include "utils/syscache.h" @@ -49,11 +50,15 @@ typedef struct ConnCacheEntry { ConnCacheKey key; /* hash key (must be first) */ PGconn *conn; /* connection to foreign server, or NULL */ + /* Remaining fields are invalid when conn is NULL: */ int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 = * one level of subxact open, etc */ bool have_prep_stmt; /* have we prepared any stmts in this xact? */ bool have_error; /* have any subxacts aborted in this xact? */ bool changing_xact_state; /* xact state change in process */ + bool invalidated; /* true if reconnect is pending */ + uint32 server_hashvalue; /* hash value of foreign server OID */ + uint32 mapping_hashvalue; /* hash value of user mapping OID */ } ConnCacheEntry; /* @@ -70,6 +75,7 @@ static bool xact_got_connection = false; /* prototypes of private functions */ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); +static void disconnect_pg_server(ConnCacheEntry *entry); static void check_conn_params(const char **keywords, const char **values); static void configure_remote_session(PGconn *conn); static void do_sql_command(PGconn *conn, const char *sql); @@ -79,6 +85,7 @@ static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg); +static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue); static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry); static bool pgfdw_cancel_query(PGconn *conn); static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, @@ -96,13 +103,6 @@ static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, * will_prep_stmt must be true if caller intends to create any prepared * statements. Since those don't go away automatically at transaction end * (not even on error), we need this flag to cue manual cleanup. - * - * XXX Note that caching connections theoretically requires a mechanism to - * detect change of FDW objects to invalidate already established connections. - * We could manage that by watching for invalidation events on the relevant - * syscaches. For the moment, though, it's not clear that this would really - * be useful and not mere pedantry. We could not flush any active connections - * mid-transaction anyway. */ PGconn * GetConnection(ForeignServer *server, UserMapping *user, @@ -132,6 +132,10 @@ GetConnection(ForeignServer *server, UserMapping *user, */ RegisterXactCallback(pgfdw_xact_callback, NULL); RegisterSubXactCallback(pgfdw_subxact_callback, NULL); + CacheRegisterSyscacheCallback(FOREIGNSERVEROID, + pgfdw_inval_callback, (Datum) 0); + CacheRegisterSyscacheCallback(USERMAPPINGOID, + pgfdw_inval_callback, (Datum) 0); } /* Set flag that we did GetConnection during the current transaction */ @@ -147,17 +151,27 @@ GetConnection(ForeignServer *server, UserMapping *user, entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found); if (!found) { - /* initialize new hashtable entry (key is already filled in) */ + /* + * We need only clear "conn" here; remaining fields will be filled + * later when "conn" is set. + */ entry->conn = NULL; - entry->xact_depth = 0; - entry->have_prep_stmt = false; - entry->have_error = false; - entry->changing_xact_state = false; } /* Reject further use of connections which failed abort cleanup. */ pgfdw_reject_incomplete_xact_state_change(entry); + /* + * If the connection needs to be remade due to invalidation, disconnect as + * soon as we're out of all transactions. + */ + if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0) + { + elog(DEBUG3, "closing connection %p for option changes to take effect", + entry->conn); + disconnect_pg_server(entry); + } + /* * We don't check the health of cached connection here, because it would * require some overhead. Broken connection will be detected when the @@ -167,13 +181,36 @@ GetConnection(ForeignServer *server, UserMapping *user, /* * If cache entry doesn't have a connection, we have to establish a new * connection. (If connect_pg_server throws an error, the cache entry - * will be left in a valid empty state.) + * will remain in a valid empty state, ie conn == NULL.) */ if (entry->conn == NULL) { - entry->xact_depth = 0; /* just to be sure */ + Oid umoid; + + /* Reset all transient state fields, to be sure all are clean */ + entry->xact_depth = 0; entry->have_prep_stmt = false; entry->have_error = false; + entry->changing_xact_state = false; + entry->invalidated = false; + entry->server_hashvalue = + GetSysCacheHashValue1(FOREIGNSERVEROID, + ObjectIdGetDatum(server->serverid)); + /* Pre-9.6, UserMapping doesn't store its OID, so look it up again */ + umoid = GetSysCacheOid2(USERMAPPINGUSERSERVER, + ObjectIdGetDatum(user->userid), + ObjectIdGetDatum(user->serverid)); + if (!OidIsValid(umoid)) + { + /* Not found for the specific user -- try PUBLIC */ + umoid = GetSysCacheOid2(USERMAPPINGUSERSERVER, + ObjectIdGetDatum(InvalidOid), + ObjectIdGetDatum(user->serverid)); + } + entry->mapping_hashvalue = + GetSysCacheHashValue1(USERMAPPINGOID, ObjectIdGetDatum(umoid)); + + /* Now try to make the connection */ entry->conn = connect_pg_server(server, user); elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"", entry->conn, server->servername); @@ -286,6 +323,19 @@ connect_pg_server(ForeignServer *server, UserMapping *user) return conn; } +/* + * Disconnect any open connection for a connection cache entry. + */ +static void +disconnect_pg_server(ConnCacheEntry *entry) +{ + if (entry->conn != NULL) + { + PQfinish(entry->conn); + entry->conn = NULL; + } +} + /* * For non-superusers, insist that the connstr specify a password. This * prevents a password from being picked up from .pgpass, a service file, @@ -787,9 +837,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) entry->changing_xact_state) { elog(DEBUG3, "discarding connection %p", entry->conn); - PQfinish(entry->conn); - entry->conn = NULL; - entry->changing_xact_state = false; + disconnect_pg_server(entry); } } @@ -906,6 +954,47 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, } } +/* + * Connection invalidation callback function + * + * After a change to a pg_foreign_server or pg_user_mapping catalog entry, + * mark connections depending on that entry as needing to be remade. + * We can't immediately destroy them, since they might be in the midst of + * a transaction, but we'll remake them at the next opportunity. + * + * Although most cache invalidation callbacks blow away all the related stuff + * regardless of the given hashvalue, connections are expensive enough that + * it's worth trying to avoid that. + * + * NB: We could avoid unnecessary disconnection more strictly by examining + * individual option values, but it seems too much effort for the gain. + */ +static void +pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue) +{ + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + + Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID); + + /* ConnectionHash must exist already, if we're registered */ + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + /* Ignore invalid entries */ + if (entry->conn == NULL) + continue; + + /* hashvalue == 0 means a cache reset, must clear all state */ + if (hashvalue == 0 || + (cacheid == FOREIGNSERVEROID && + entry->server_hashvalue == hashvalue) || + (cacheid == USERMAPPINGOID && + entry->mapping_hashvalue == hashvalue)) + entry->invalidated = true; + } +} + /* * Raise an error if the given connection cache entry is marked as being * in the middle of an xact state change. This should be called at which no @@ -921,10 +1010,16 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry) { ForeignServer *server; - if (!entry->changing_xact_state) + /* nothing to do for inactive entries and entries of sane state */ + if (entry->conn == NULL || !entry->changing_xact_state) return; + /* make sure this entry is inactive */ + disconnect_pg_server(entry); + + /* find server name to be shown in the message below */ server = GetForeignServer(entry->key.serverid); + ereport(ERROR, (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("connection to server \"%s\" was lost", diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index e3ae021acef..74ab8c4e6ea 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -126,6 +126,43 @@ ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); public | ft2 | loopback | (schema_name 'S 1', table_name 'T 1') | (2 rows) +-- Test that alteration of server options causes reconnection +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work + c3 | c4 +-------+------------------------------ + 00001 | Fri Jan 02 00:00:00 1970 PST +(1 row) + +ALTER SERVER loopback OPTIONS (SET dbname 'no such database'); +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail +ERROR: could not connect to server "loopback" +DETAIL: FATAL: database "no such database" does not exist +DO $d$ + BEGIN + EXECUTE $$ALTER SERVER loopback + OPTIONS (SET dbname '$$||current_database()||$$')$$; + END; +$d$; +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again + c3 | c4 +-------+------------------------------ + 00001 | Fri Jan 02 00:00:00 1970 PST +(1 row) + +-- Test that alteration of user mapping options causes reconnection +ALTER USER MAPPING FOR CURRENT_USER SERVER loopback + OPTIONS (ADD user 'no such user'); +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail +ERROR: could not connect to server "loopback" +DETAIL: FATAL: role "no such user" does not exist +ALTER USER MAPPING FOR CURRENT_USER SERVER loopback + OPTIONS (DROP user); +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again + c3 | c4 +-------+------------------------------ + 00001 | Fri Jan 02 00:00:00 1970 PST +(1 row) + -- Now we should be able to run ANALYZE. -- To exercise multiple code paths, we use local stats on ft1 -- and remote-estimate mode on ft2. diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 27031f0e53f..b27bac65f0c 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -129,6 +129,26 @@ ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); \det+ +-- Test that alteration of server options causes reconnection +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work +ALTER SERVER loopback OPTIONS (SET dbname 'no such database'); +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail +DO $d$ + BEGIN + EXECUTE $$ALTER SERVER loopback + OPTIONS (SET dbname '$$||current_database()||$$')$$; + END; +$d$; +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again + +-- Test that alteration of user mapping options causes reconnection +ALTER USER MAPPING FOR CURRENT_USER SERVER loopback + OPTIONS (ADD user 'no such user'); +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail +ALTER USER MAPPING FOR CURRENT_USER SERVER loopback + OPTIONS (DROP user); +SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again + -- Now we should be able to run ANALYZE. -- To exercise multiple code paths, we use local stats on ft1 -- and remote-estimate mode on ft2.