1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-25 13:17:41 +03:00

postgres_fdw: Add functions to discard cached connections.

This commit introduces two new functions postgres_fdw_disconnect()
and postgres_fdw_disconnect_all(). The former function discards
the cached connections to the specified foreign server. The latter discards
all the cached connections. If the connection is used in the current
transaction, it's not closed and a warning message is emitted.

For example, these functions are useful when users want to explicitly
close the foreign server connections that are no longer necessary and
then to prevent them from eating up the foreign servers connections
capacity.

Author: Bharath Rupireddy, tweaked a bit by Fujii Masao
Reviewed-by: Alexey Kondratov, Zhijie Hou, Zhihong Yu, Fujii Masao
Discussion: https://postgr.es/m/CALj2ACVvrp5=AVp2PupEm+nAC8S4buqR3fJMmaCoc7ftT0aD2A@mail.gmail.com
This commit is contained in:
Fujii Masao
2021-01-26 03:54:46 +09:00
parent ee895a655c
commit 411ae64997
5 changed files with 505 additions and 13 deletions

View File

@@ -80,6 +80,8 @@ static bool xact_got_connection = false;
* SQL functions
*/
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
/* prototypes of private functions */
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
@@ -102,6 +104,7 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
PGresult **result);
static bool UserMappingPasswordRequired(UserMapping *user);
static bool disconnect_cached_connections(Oid serverid);
/*
* Get a PGconn which can be used to execute queries on the remote PostgreSQL
@@ -1428,8 +1431,8 @@ postgres_fdw_get_connections(PG_FUNCTION_ARGS)
* Even though the server is dropped in the current transaction, the
* cache can still have associated active connection entry, say we
* call such connections dangling. Since we can not fetch the server
* name from system catalogs for dangling connections, instead we
* show NULL value for server name in output.
* name from system catalogs for dangling connections, instead we show
* NULL value for server name in output.
*
* We could have done better by storing the server name in the cache
* entry instead of server oid so that it could be used in the output.
@@ -1447,7 +1450,7 @@ postgres_fdw_get_connections(PG_FUNCTION_ARGS)
/*
* If the server has been dropped in the current explicit
* transaction, then this entry would have been invalidated in
* pgfdw_inval_callback at the end of drop sever command. Note
* pgfdw_inval_callback at the end of drop server command. Note
* that this connection would not have been closed in
* pgfdw_inval_callback because it is still being used in the
* current explicit transaction. So, assert that here.
@@ -1470,3 +1473,129 @@ postgres_fdw_get_connections(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
/*
* Disconnect the specified cached connections.
*
* This function discards the open connections that are established by
* postgres_fdw from the local session to the foreign server with
* the given name. Note that there can be multiple connections to
* the given server using different user mappings. If the connections
* are used in the current local transaction, they are not disconnected
* and warning messages are reported. This function returns true
* if it disconnects at least one connection, otherwise false. If no
* foreign server with the given name is found, an error is reported.
*/
Datum
postgres_fdw_disconnect(PG_FUNCTION_ARGS)
{
ForeignServer *server;
char *servername;
servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
server = GetForeignServerByName(servername, false);
PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
}
/*
* Disconnect all the cached connections.
*
* This function discards all the open connections that are established by
* postgres_fdw from the local session to the foreign servers.
* If the connections are used in the current local transaction, they are
* not disconnected and warning messages are reported. This function
* returns true if it disconnects at least one connection, otherwise false.
*/
Datum
postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
{
PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
}
/*
* Workhorse to disconnect cached connections.
*
* This function scans all the connection cache entries and disconnects
* the open connections whose foreign server OID matches with
* the specified one. If InvalidOid is specified, it disconnects all
* the cached connections.
*
* This function emits a warning for each connection that's used in
* the current transaction and doesn't close it. It returns true if
* it disconnects at least one connection, otherwise false.
*
* Note that this function disconnects even the connections that are
* established by other users in the same local session using different
* user mappings. This leads even non-superuser to be able to close
* the connections established by superusers in the same local session.
*
* XXX As of now we don't see any security risk doing this. But we should
* set some restrictions on that, for example, prevent non-superuser
* from closing the connections established by superusers even
* in the same session?
*/
static bool
disconnect_cached_connections(Oid serverid)
{
HASH_SEQ_STATUS scan;
ConnCacheEntry *entry;
bool all = !OidIsValid(serverid);
bool result = false;
/*
* Connection cache hashtable has not been initialized yet in this
* session, so return false.
*/
if (!ConnectionHash)
return false;
hash_seq_init(&scan, ConnectionHash);
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
{
/* Ignore cache entry if no open connection right now. */
if (!entry->conn)
continue;
if (all || entry->serverid == serverid)
{
/*
* Emit a warning because the connection to close is used in the
* current transaction and cannot be disconnected right now.
*/
if (entry->xact_depth > 0)
{
ForeignServer *server;
server = GetForeignServerExtended(entry->serverid,
FSV_MISSING_OK);
if (!server)
{
/*
* If the foreign server was dropped while its connection
* was used in the current transaction, the connection
* must have been marked as invalid by
* pgfdw_inval_callback at the end of DROP SERVER command.
*/
Assert(entry->invalidated);
ereport(WARNING,
(errmsg("cannot close dropped server connection because it is still in use")));
}
else
ereport(WARNING,
(errmsg("cannot close connection for server \"%s\" because it is still in use",
server->servername)));
}
else
{
elog(DEBUG3, "discarding connection %p", entry->conn);
disconnect_pg_server(entry);
result = true;
}
}
}
return result;
}

View File

@@ -17,7 +17,10 @@ DO $d$
OPTIONS (dbname '$$||current_database()||$$',
port '$$||current_setting('port')||$$'
)$$;
EXECUTE $$CREATE SERVER loopback4 FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (dbname '$$||current_database()||$$',
port '$$||current_setting('port')||$$'
)$$;
END;
$d$;
CREATE USER MAPPING FOR public SERVER testserver1
@@ -25,6 +28,7 @@ CREATE USER MAPPING FOR public SERVER testserver1
CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
CREATE USER MAPPING FOR public SERVER loopback3;
CREATE USER MAPPING FOR public SERVER loopback4;
-- ===================================================================
-- create objects used through FDW loopback server
-- ===================================================================
@@ -140,6 +144,11 @@ CREATE FOREIGN TABLE ft7 (
c2 int NOT NULL,
c3 text
) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4');
CREATE FOREIGN TABLE ft8 (
c1 int NOT NULL,
c2 int NOT NULL,
c3 text
) SERVER loopback4 OPTIONS (schema_name 'S 1', table_name 'T 4');
-- ===================================================================
-- tests for validator
-- ===================================================================
@@ -211,7 +220,8 @@ ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
public | ft5 | loopback | (schema_name 'S 1', table_name 'T 4') |
public | ft6 | loopback2 | (schema_name 'S 1', table_name 'T 4') |
public | ft7 | loopback3 | (schema_name 'S 1', table_name 'T 4') |
(6 rows)
public | ft8 | loopback4 | (schema_name 'S 1', table_name 'T 4') |
(7 rows)
-- Test that alteration of server options causes reconnection
-- Remote's errors might be non-English, so hide them to ensure stable results
@@ -9053,9 +9063,9 @@ ERROR: 08006
COMMIT;
-- Clean up
DROP PROCEDURE terminate_backend_and_wait(text);
-- ===================================================================
-- test connection invalidation cases
-- ===================================================================
-- =============================================================================
-- test connection invalidation cases and postgres_fdw_get_connections function
-- =============================================================================
-- This test case is for closing the connection in pgfdw_xact_callback
BEGIN;
-- List all the existing cached connections. Only loopback2 should be output.
@@ -9118,6 +9128,194 @@ SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
loopback2 | t
(1 row)
-- =======================================================================
-- test postgres_fdw_disconnect and postgres_fdw_disconnect_all functions
-- =======================================================================
-- Return true as all cached connections are closed.
SELECT postgres_fdw_disconnect_all();
postgres_fdw_disconnect_all
-----------------------------
t
(1 row)
-- Ensure to cache loopback connection.
SELECT 1 FROM ft1 LIMIT 1;
?column?
----------
1
(1 row)
BEGIN;
-- Ensure to cache loopback2 connection.
SELECT 1 FROM ft6 LIMIT 1;
?column?
----------
1
(1 row)
-- List all the existing cached connections. loopback and loopback2 should be
-- output.
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
server_name | valid
-------------+-------
loopback | t
loopback2 | t
(2 rows)
-- Issue a warning and return false as loopback2 connection is still in use and
-- can not be closed.
SELECT postgres_fdw_disconnect('loopback2');
WARNING: cannot close connection for server "loopback2" because it is still in use
postgres_fdw_disconnect
-------------------------
f
(1 row)
-- Close loopback connection, return true and issue a warning as loopback2
-- connection is still in use and can not be closed.
SELECT postgres_fdw_disconnect_all();
WARNING: cannot close connection for server "loopback2" because it is still in use
postgres_fdw_disconnect_all
-----------------------------
t
(1 row)
-- List all the existing cached connections. loopback2 should be output.
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
server_name | valid
-------------+-------
loopback2 | t
(1 row)
-- Ensure to cache loopback connection.
SELECT 1 FROM ft1 LIMIT 1;
?column?
----------
1
(1 row)
-- Ensure to cache loopback4 connection.
SELECT 1 FROM ft8 LIMIT 1;
?column?
----------
1
(1 row)
-- List all the existing cached connections. loopback, loopback2, loopback4
-- should be output.
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
server_name | valid
-------------+-------
loopback | t
loopback2 | t
loopback4 | t
(3 rows)
DROP SERVER loopback4 CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to user mapping for public on server loopback4
drop cascades to foreign table ft8
-- Return false as connections are still in use, warnings are issued.
SELECT postgres_fdw_disconnect_all();
WARNING: cannot close dropped server connection because it is still in use
WARNING: cannot close connection for server "loopback" because it is still in use
WARNING: cannot close connection for server "loopback2" because it is still in use
postgres_fdw_disconnect_all
-----------------------------
f
(1 row)
COMMIT;
-- Close loopback2 connection and return true.
SELECT postgres_fdw_disconnect('loopback2');
postgres_fdw_disconnect
-------------------------
t
(1 row)
-- List all the existing cached connections. loopback should be output.
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
server_name | valid
-------------+-------
loopback | t
(1 row)
-- Return false as loopback2 connectin is closed already.
SELECT postgres_fdw_disconnect('loopback2');
postgres_fdw_disconnect
-------------------------
f
(1 row)
-- Return an error as there is no foreign server with given name.
SELECT postgres_fdw_disconnect('unknownserver');
ERROR: server "unknownserver" does not exist
-- Close loopback connection and return true.
SELECT postgres_fdw_disconnect_all();
postgres_fdw_disconnect_all
-----------------------------
t
(1 row)
-- List all the existing cached connections. No connection exists, so NULL
-- should be output.
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
server_name | valid
-------------+-------
(0 rows)
-- =============================================================================
-- test case for having multiple cached connections for a foreign server
-- =============================================================================
CREATE ROLE multi_conn_user1 SUPERUSER;
CREATE ROLE multi_conn_user2 SUPERUSER;
CREATE USER MAPPING FOR multi_conn_user1 SERVER loopback;
CREATE USER MAPPING FOR multi_conn_user2 SERVER loopback;
-- Will cache loopback connection with user mapping for multi_conn_user1
SET ROLE multi_conn_user1;
SELECT 1 FROM ft1 LIMIT 1;
?column?
----------
1
(1 row)
RESET ROLE;
-- Will cache loopback connection with user mapping for multi_conn_user2
SET ROLE multi_conn_user2;
SELECT 1 FROM ft1 LIMIT 1;
?column?
----------
1
(1 row)
RESET ROLE;
-- Should output two connections for loopback server
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
server_name | valid
-------------+-------
loopback | t
loopback | t
(2 rows)
-- Close loopback connections and return true.
SELECT postgres_fdw_disconnect('loopback');
postgres_fdw_disconnect
-------------------------
t
(1 row)
-- List all the existing cached connections. No connection exists, so NULL
-- should be output.
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
server_name | valid
-------------+-------
(0 rows)
-- Clean up
DROP USER MAPPING FOR multi_conn_user1 SERVER loopback;
DROP USER MAPPING FOR multi_conn_user2 SERVER loopback;
DROP ROLE multi_conn_user1;
DROP ROLE multi_conn_user2;
-- ===================================================================
-- batch insert
-- ===================================================================

View File

@@ -8,3 +8,13 @@ CREATE FUNCTION postgres_fdw_get_connections (OUT server_name text,
RETURNS SETOF record
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT PARALLEL RESTRICTED;
CREATE FUNCTION postgres_fdw_disconnect (text)
RETURNS bool
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT PARALLEL RESTRICTED;
CREATE FUNCTION postgres_fdw_disconnect_all ()
RETURNS bool
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT PARALLEL RESTRICTED;

View File

@@ -19,7 +19,10 @@ DO $d$
OPTIONS (dbname '$$||current_database()||$$',
port '$$||current_setting('port')||$$'
)$$;
EXECUTE $$CREATE SERVER loopback4 FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (dbname '$$||current_database()||$$',
port '$$||current_setting('port')||$$'
)$$;
END;
$d$;
@@ -28,6 +31,7 @@ CREATE USER MAPPING FOR public SERVER testserver1
CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
CREATE USER MAPPING FOR public SERVER loopback3;
CREATE USER MAPPING FOR public SERVER loopback4;
-- ===================================================================
-- create objects used through FDW loopback server
@@ -154,6 +158,12 @@ CREATE FOREIGN TABLE ft7 (
c3 text
) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 4');
CREATE FOREIGN TABLE ft8 (
c1 int NOT NULL,
c2 int NOT NULL,
c3 text
) SERVER loopback4 OPTIONS (schema_name 'S 1', table_name 'T 4');
-- ===================================================================
-- tests for validator
-- ===================================================================
@@ -2710,9 +2720,9 @@ COMMIT;
-- Clean up
DROP PROCEDURE terminate_backend_and_wait(text);
-- ===================================================================
-- test connection invalidation cases
-- ===================================================================
-- =============================================================================
-- test connection invalidation cases and postgres_fdw_get_connections function
-- =============================================================================
-- This test case is for closing the connection in pgfdw_xact_callback
BEGIN;
-- List all the existing cached connections. Only loopback2 should be output.
@@ -2739,6 +2749,86 @@ COMMIT;
-- the above transaction.
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
-- =======================================================================
-- test postgres_fdw_disconnect and postgres_fdw_disconnect_all functions
-- =======================================================================
-- Return true as all cached connections are closed.
SELECT postgres_fdw_disconnect_all();
-- Ensure to cache loopback connection.
SELECT 1 FROM ft1 LIMIT 1;
BEGIN;
-- Ensure to cache loopback2 connection.
SELECT 1 FROM ft6 LIMIT 1;
-- List all the existing cached connections. loopback and loopback2 should be
-- output.
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
-- Issue a warning and return false as loopback2 connection is still in use and
-- can not be closed.
SELECT postgres_fdw_disconnect('loopback2');
-- Close loopback connection, return true and issue a warning as loopback2
-- connection is still in use and can not be closed.
SELECT postgres_fdw_disconnect_all();
-- List all the existing cached connections. loopback2 should be output.
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
-- Ensure to cache loopback connection.
SELECT 1 FROM ft1 LIMIT 1;
-- Ensure to cache loopback4 connection.
SELECT 1 FROM ft8 LIMIT 1;
-- List all the existing cached connections. loopback, loopback2, loopback4
-- should be output.
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
DROP SERVER loopback4 CASCADE;
-- Return false as connections are still in use, warnings are issued.
SELECT postgres_fdw_disconnect_all();
COMMIT;
-- Close loopback2 connection and return true.
SELECT postgres_fdw_disconnect('loopback2');
-- List all the existing cached connections. loopback should be output.
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
-- Return false as loopback2 connectin is closed already.
SELECT postgres_fdw_disconnect('loopback2');
-- Return an error as there is no foreign server with given name.
SELECT postgres_fdw_disconnect('unknownserver');
-- Close loopback connection and return true.
SELECT postgres_fdw_disconnect_all();
-- List all the existing cached connections. No connection exists, so NULL
-- should be output.
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
-- =============================================================================
-- test case for having multiple cached connections for a foreign server
-- =============================================================================
CREATE ROLE multi_conn_user1 SUPERUSER;
CREATE ROLE multi_conn_user2 SUPERUSER;
CREATE USER MAPPING FOR multi_conn_user1 SERVER loopback;
CREATE USER MAPPING FOR multi_conn_user2 SERVER loopback;
-- Will cache loopback connection with user mapping for multi_conn_user1
SET ROLE multi_conn_user1;
SELECT 1 FROM ft1 LIMIT 1;
RESET ROLE;
-- Will cache loopback connection with user mapping for multi_conn_user2
SET ROLE multi_conn_user2;
SELECT 1 FROM ft1 LIMIT 1;
RESET ROLE;
-- Should output two connections for loopback server
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
-- Close loopback connections and return true.
SELECT postgres_fdw_disconnect('loopback');
-- List all the existing cached connections. No connection exists, so NULL
-- should be output.
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
-- Clean up
DROP USER MAPPING FOR multi_conn_user1 SERVER loopback;
DROP USER MAPPING FOR multi_conn_user2 SERVER loopback;
DROP ROLE multi_conn_user1;
DROP ROLE multi_conn_user2;
-- ===================================================================
-- batch insert
-- ===================================================================