1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-13 07:41:39 +03:00

postgres_fdw: Add "used_in_xact" column to postgres_fdw_get_connections().

This commit extends the postgres_fdw_get_connections() function to
include a new used_in_xact column, indicating whether each connection
is used in the current transaction.

This addition is particularly useful for the upcoming feature that
will check if connections are closed. By using those information,
users can verify if postgres_fdw connections used in a transaction
remain open. If any connection is closed, the transaction cannot
be committed successfully. In this case users can roll back it
immediately without waiting for transaction end.

The SQL API for postgres_fdw_get_connections() is updated by
this commit and may change in the future. To handle compatibility
with older SQL declarations, an API versioning system is introduced,
allowing the function to behave differently based on the API version.

Author: Hayato Kuroda
Reviewed-by: Fujii Masao
Discussion: https://postgr.es/m/be9382f7-5072-4760-8b3f-31d6dffa8d62@oss.nttdata.com
This commit is contained in:
Fujii Masao
2024-07-26 22:15:51 +09:00
parent 5687f8c0dd
commit c297a47c5f
7 changed files with 112 additions and 20 deletions

View File

@ -14,7 +14,7 @@ PG_CPPFLAGS = -I$(libpq_srcdir)
SHLIB_LINK_INTERNAL = $(libpq) SHLIB_LINK_INTERNAL = $(libpq)
EXTENSION = postgres_fdw EXTENSION = postgres_fdw
DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1--1.2.sql
REGRESS = postgres_fdw query_cancel REGRESS = postgres_fdw query_cancel

View File

@ -107,10 +107,20 @@ static uint32 pgfdw_we_get_result = 0;
(entry)->xact_depth, (entry)->xact_depth); \ (entry)->xact_depth, (entry)->xact_depth); \
} while(0) } while(0)
/*
* Extension version number, for supporting older extension versions' objects
*/
enum pgfdwVersion
{
PGFDW_V1_1 = 0,
PGFDW_V1_2,
};
/* /*
* SQL functions * SQL functions
*/ */
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections); PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect); PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all); PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
@ -159,6 +169,8 @@ static void pgfdw_security_check(const char **keywords, const char **values,
UserMapping *user, PGconn *conn); UserMapping *user, PGconn *conn);
static bool UserMappingPasswordRequired(UserMapping *user); static bool UserMappingPasswordRequired(UserMapping *user);
static bool disconnect_cached_connections(Oid serverid); static bool disconnect_cached_connections(Oid serverid);
static void postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
enum pgfdwVersion api_version);
/* /*
* Get a PGconn which can be used to execute queries on the remote PostgreSQL * Get a PGconn which can be used to execute queries on the remote PostgreSQL
@ -1977,23 +1989,34 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
} }
} }
/* Number of output arguments (columns) for various API versions */
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1 2
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2 3
#define POSTGRES_FDW_GET_CONNECTIONS_COLS 3 /* maximum of above */
/* /*
* List active foreign server connections. * Internal function used by postgres_fdw_get_connections variants.
*
* For API version 1.1, this function returns a set of records with
* the following values:
* *
* This function takes no input parameter and returns setof record made of
* following values:
* - server_name - server name of active connection. In case the foreign server * - server_name - server name of active connection. In case the foreign server
* is dropped but still the connection is active, then the server name will * is dropped but still the connection is active, then the server name will
* be NULL in output. * be NULL in output.
* - valid - true/false representing whether the connection is valid or not. * - valid - true/false representing whether the connection is valid or not.
* Note that the connections can get invalidated in pgfdw_inval_callback. * Note that connections can become invalid in pgfdw_inval_callback.
*
* For API version 1.2 and later, this function returns the following
* additional value along with the two values from version 1.1:
*
* - used_in_xact - true if the connection is used in the current transaction.
* *
* No records are returned when there are no cached connections at all. * No records are returned when there are no cached connections at all.
*/ */
Datum static void
postgres_fdw_get_connections(PG_FUNCTION_ARGS) postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
enum pgfdwVersion api_version)
{ {
#define POSTGRES_FDW_GET_CONNECTIONS_COLS 2
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
HASH_SEQ_STATUS scan; HASH_SEQ_STATUS scan;
ConnCacheEntry *entry; ConnCacheEntry *entry;
@ -2002,7 +2025,22 @@ postgres_fdw_get_connections(PG_FUNCTION_ARGS)
/* If cache doesn't exist, we return no records */ /* If cache doesn't exist, we return no records */
if (!ConnectionHash) if (!ConnectionHash)
PG_RETURN_VOID(); return;
/* Check we have the expected number of output arguments */
switch (rsinfo->setDesc->natts)
{
case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1:
if (api_version != PGFDW_V1_1)
elog(ERROR, "incorrect number of output arguments");
break;
case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2:
if (api_version != PGFDW_V1_2)
elog(ERROR, "incorrect number of output arguments");
break;
default:
elog(ERROR, "incorrect number of output arguments");
}
hash_seq_init(&scan, ConnectionHash); hash_seq_init(&scan, ConnectionHash);
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
@ -2061,8 +2099,36 @@ postgres_fdw_get_connections(PG_FUNCTION_ARGS)
values[1] = BoolGetDatum(!entry->invalidated); values[1] = BoolGetDatum(!entry->invalidated);
if (api_version >= PGFDW_V1_2)
{
/* Is this connection used in the current transaction? */
values[2] = BoolGetDatum(entry->xact_depth > 0);
}
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
} }
}
/*
* List active foreign server connections.
*
* The SQL API of this function has changed multiple times, and will likely
* do so again in future. To support the case where a newer version of this
* loadable module is being used with an old SQL declaration of the function,
* we continue to support the older API versions.
*/
Datum
postgres_fdw_get_connections_1_2(PG_FUNCTION_ARGS)
{
postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_2);
PG_RETURN_VOID();
}
Datum
postgres_fdw_get_connections(PG_FUNCTION_ARGS)
{
postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_1);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }

View File

@ -10464,10 +10464,10 @@ drop cascades to foreign table ft7
-- should be output as invalid connections. Also the server name for -- should be output as invalid connections. Also the server name for
-- loopback3 should be NULL because the server was dropped. -- loopback3 should be NULL because the server was dropped.
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
server_name | valid server_name | valid | used_in_xact
-------------+------- -------------+-------+--------------
loopback | f loopback | f | t
| f | f | t
(2 rows) (2 rows)
-- The invalid connections get closed in pgfdw_xact_callback during commit. -- The invalid connections get closed in pgfdw_xact_callback during commit.

View File

@ -26,6 +26,7 @@ install_data(
'postgres_fdw.control', 'postgres_fdw.control',
'postgres_fdw--1.0.sql', 'postgres_fdw--1.0.sql',
'postgres_fdw--1.0--1.1.sql', 'postgres_fdw--1.0--1.1.sql',
'postgres_fdw--1.1--1.2.sql',
kwargs: contrib_data_args, kwargs: contrib_data_args,
) )

View File

@ -0,0 +1,16 @@
/* contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql */
-- complain if script is sourced in psql, rather than via ALTER EXTENSION
\echo Use "ALTER EXTENSION postgres_fdw UPDATE TO '1.2'" to load this file. \quit
/* First we have to remove it from the extension */
ALTER EXTENSION postgres_fdw DROP FUNCTION postgres_fdw_get_connections ();
/* Then we can drop it */
DROP FUNCTION postgres_fdw_get_connections ();
CREATE FUNCTION postgres_fdw_get_connections (OUT server_name text,
OUT valid boolean, OUT used_in_xact boolean)
RETURNS SETOF record
AS 'MODULE_PATHNAME', 'postgres_fdw_get_connections_1_2'
LANGUAGE C STRICT PARALLEL RESTRICTED;

View File

@ -1,5 +1,5 @@
# postgres_fdw extension # postgres_fdw extension
comment = 'foreign-data wrapper for remote PostgreSQL servers' comment = 'foreign-data wrapper for remote PostgreSQL servers'
default_version = '1.1' default_version = '1.2'
module_pathname = '$libdir/postgres_fdw' module_pathname = '$libdir/postgres_fdw'
relocatable = true relocatable = true

View File

@ -777,7 +777,9 @@ OPTIONS (ADD password_required 'false');
<variablelist> <variablelist>
<varlistentry> <varlistentry>
<term><function>postgres_fdw_get_connections(OUT server_name text, OUT valid boolean) returns setof record</function></term> <term><function>postgres_fdw_get_connections(OUT server_name text,
OUT valid boolean, OUT used_in_xact boolean)
returns setof record</function></term>
<listitem> <listitem>
<para> <para>
This function returns information about all open connections postgres_fdw This function returns information about all open connections postgres_fdw
@ -785,11 +787,11 @@ OPTIONS (ADD password_required 'false');
no open connections, no records are returned. no open connections, no records are returned.
Example usage of the function: Example usage of the function:
<screen> <screen>
postgres=# SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; postgres=*# SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
server_name | valid server_name | valid | used_in_xact
-------------+------- -------------+-------+--------------
loopback1 | t loopback1 | t | t
loopback2 | f loopback2 | f | t
</screen> </screen>
The output columns are described in The output columns are described in
<xref linkend="postgres-fdw-get-connections-columns"/>. <xref linkend="postgres-fdw-get-connections-columns"/>.
@ -827,6 +829,13 @@ postgres=# SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
the transaction. True is returned otherwise. the transaction. True is returned otherwise.
</entry> </entry>
</row> </row>
<row>
<entry><structfield>used_in_xact</structfield></entry>
<entry><type>boolean</type></entry>
<entry>
True if this connection is used in the current transaction.
</entry>
</row>
</tbody> </tbody>
</tgroup> </tgroup>
</table> </table>