mirror of
https://github.com/postgres/postgres.git
synced 2025-07-28 23:42:10 +03:00
Add support for asynchronous execution.
This implements asynchronous execution, which runs multiple parts of a non-parallel-aware Append concurrently rather than serially to improve performance when possible. Currently, the only node type that can be run concurrently is a ForeignScan that is an immediate child of such an Append. In the case where such ForeignScans access data on different remote servers, this would run those ForeignScans concurrently, and overlap the remote operations to be performed simultaneously, so it'll improve the performance especially when the operations involve time-consuming ones such as remote join and remote aggregation. We may extend this to other node types such as joins or aggregates over ForeignScans in the future. This also adds the support for postgres_fdw, which is enabled by the table-level/server-level option "async_capable". The default is false. Robert Haas, Kyotaro Horiguchi, Thomas Munro, and myself. This commit is mostly based on the patch proposed by Robert Haas, but also uses stuff from the patch proposed by Kyotaro Horiguchi and from the patch proposed by Thomas Munro. Reviewed by Kyotaro Horiguchi, Konstantin Knizhnik, Andrey Lepikhov, Movead Li, Thomas Munro, Justin Pryzby, and others. Discussion: https://postgr.es/m/CA%2BTgmoaXQEt4tZ03FtQhnzeDEMzBck%2BLrni0UWHVVgOTnA6C1w%40mail.gmail.com Discussion: https://postgr.es/m/CA%2BhUKGLBRyu0rHrDCMC4%3DRn3252gogyp1SjOgG8SEKKZv%3DFwfQ%40mail.gmail.com Discussion: https://postgr.es/m/20200228.170650.667613673625155850.horikyota.ntt%40gmail.com
This commit is contained in:
@ -62,6 +62,7 @@ typedef struct ConnCacheEntry
|
||||
Oid serverid; /* foreign server OID used to get server name */
|
||||
uint32 server_hashvalue; /* hash value of foreign server OID */
|
||||
uint32 mapping_hashvalue; /* hash value of user mapping OID */
|
||||
PgFdwConnState state; /* extra per-connection state */
|
||||
} ConnCacheEntry;
|
||||
|
||||
/*
|
||||
@ -115,9 +116,12 @@ static bool disconnect_cached_connections(Oid serverid);
|
||||
* will_prep_stmt must be true if caller intends to create any prepared
|
||||
* statements. Since those don't go away automatically at transaction end
|
||||
* (not even on error), we need this flag to cue manual cleanup.
|
||||
*
|
||||
* If state is not NULL, *state receives the per-connection state associated
|
||||
* with the PGconn.
|
||||
*/
|
||||
PGconn *
|
||||
GetConnection(UserMapping *user, bool will_prep_stmt)
|
||||
GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
|
||||
{
|
||||
bool found;
|
||||
bool retry = false;
|
||||
@ -196,6 +200,9 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
|
||||
*/
|
||||
PG_TRY();
|
||||
{
|
||||
/* Process a pending asynchronous request if any. */
|
||||
if (entry->state.pendingAreq)
|
||||
process_pending_request(entry->state.pendingAreq);
|
||||
/* Start a new transaction or subtransaction if needed. */
|
||||
begin_remote_xact(entry);
|
||||
}
|
||||
@ -264,6 +271,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
|
||||
/* Remember if caller will prepare statements */
|
||||
entry->have_prep_stmt |= will_prep_stmt;
|
||||
|
||||
/* If caller needs access to the per-connection state, return it. */
|
||||
if (state)
|
||||
*state = &entry->state;
|
||||
|
||||
return entry->conn;
|
||||
}
|
||||
|
||||
@ -291,6 +302,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
|
||||
entry->mapping_hashvalue =
|
||||
GetSysCacheHashValue1(USERMAPPINGOID,
|
||||
ObjectIdGetDatum(user->umid));
|
||||
memset(&entry->state, 0, sizeof(entry->state));
|
||||
|
||||
/* Now try to make the connection */
|
||||
entry->conn = connect_pg_server(server, user);
|
||||
@ -648,8 +660,12 @@ GetPrepStmtNumber(PGconn *conn)
|
||||
* Caller is responsible for the error handling on the result.
|
||||
*/
|
||||
PGresult *
|
||||
pgfdw_exec_query(PGconn *conn, const char *query)
|
||||
pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
|
||||
{
|
||||
/* First, process a pending asynchronous request, if any. */
|
||||
if (state && state->pendingAreq)
|
||||
process_pending_request(state->pendingAreq);
|
||||
|
||||
/*
|
||||
* Submit a query. Since we don't use non-blocking mode, this also can
|
||||
* block. But its risk is relatively small, so we ignore that for now.
|
||||
@ -940,6 +956,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
|
||||
{
|
||||
entry->have_prep_stmt = false;
|
||||
entry->have_error = false;
|
||||
/* Also reset per-connection state */
|
||||
memset(&entry->state, 0, sizeof(entry->state));
|
||||
}
|
||||
|
||||
/* Disarm changing_xact_state if it all worked. */
|
||||
@ -1172,6 +1190,10 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
|
||||
* Cancel the currently-in-progress query (whose query text we do not have)
|
||||
* and ignore the result. Returns true if we successfully cancel the query
|
||||
* and discard any pending result, and false if not.
|
||||
*
|
||||
* XXX: if the query was one sent by fetch_more_data_begin(), we could get the
|
||||
* query text from the pendingAreq saved in the per-connection state, then
|
||||
* report the query using it.
|
||||
*/
|
||||
static bool
|
||||
pgfdw_cancel_query(PGconn *conn)
|
||||
|
@ -8946,7 +8946,7 @@ DO $d$
|
||||
END;
|
||||
$d$;
|
||||
ERROR: invalid option "password"
|
||||
HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size
|
||||
HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size, async_capable
|
||||
CONTEXT: SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')"
|
||||
PL/pgSQL function inline_code_block line 3 at EXECUTE
|
||||
-- If we add a password for our user mapping instead, we should get a different
|
||||
@ -9437,3 +9437,510 @@ SELECT tableoid::regclass, * FROM batch_cp_upd_test;
|
||||
|
||||
-- Clean up
|
||||
DROP TABLE batch_table, batch_cp_upd_test CASCADE;
|
||||
-- ===================================================================
|
||||
-- test asynchronous execution
|
||||
-- ===================================================================
|
||||
ALTER SERVER loopback OPTIONS (DROP extensions);
|
||||
ALTER SERVER loopback OPTIONS (ADD async_capable 'true');
|
||||
ALTER SERVER loopback2 OPTIONS (ADD async_capable 'true');
|
||||
CREATE TABLE async_pt (a int, b int, c text) PARTITION BY RANGE (a);
|
||||
CREATE TABLE base_tbl1 (a int, b int, c text);
|
||||
CREATE TABLE base_tbl2 (a int, b int, c text);
|
||||
CREATE FOREIGN TABLE async_p1 PARTITION OF async_pt FOR VALUES FROM (1000) TO (2000)
|
||||
SERVER loopback OPTIONS (table_name 'base_tbl1');
|
||||
CREATE FOREIGN TABLE async_p2 PARTITION OF async_pt FOR VALUES FROM (2000) TO (3000)
|
||||
SERVER loopback2 OPTIONS (table_name 'base_tbl2');
|
||||
INSERT INTO async_p1 SELECT 1000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
|
||||
INSERT INTO async_p2 SELECT 2000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
|
||||
ANALYZE async_pt;
|
||||
-- simple queries
|
||||
CREATE TABLE result_tbl (a int, b int, c text);
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
|
||||
QUERY PLAN
|
||||
----------------------------------------------------------------------------------------
|
||||
Insert on public.result_tbl
|
||||
-> Append
|
||||
-> Async Foreign Scan on public.async_p1 async_pt_1
|
||||
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (((b % 100) = 0))
|
||||
-> Async Foreign Scan on public.async_p2 async_pt_2
|
||||
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (((b % 100) = 0))
|
||||
(8 rows)
|
||||
|
||||
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
|
||||
SELECT * FROM result_tbl ORDER BY a;
|
||||
a | b | c
|
||||
------+-----+------
|
||||
1000 | 0 | 0000
|
||||
1100 | 100 | 0100
|
||||
1200 | 200 | 0200
|
||||
1300 | 300 | 0300
|
||||
1400 | 400 | 0400
|
||||
1500 | 500 | 0500
|
||||
1600 | 600 | 0600
|
||||
1700 | 700 | 0700
|
||||
1800 | 800 | 0800
|
||||
1900 | 900 | 0900
|
||||
2000 | 0 | 0000
|
||||
2100 | 100 | 0100
|
||||
2200 | 200 | 0200
|
||||
2300 | 300 | 0300
|
||||
2400 | 400 | 0400
|
||||
2500 | 500 | 0500
|
||||
2600 | 600 | 0600
|
||||
2700 | 700 | 0700
|
||||
2800 | 800 | 0800
|
||||
2900 | 900 | 0900
|
||||
(20 rows)
|
||||
|
||||
DELETE FROM result_tbl;
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
|
||||
QUERY PLAN
|
||||
----------------------------------------------------------------
|
||||
Insert on public.result_tbl
|
||||
-> Append
|
||||
-> Async Foreign Scan on public.async_p1 async_pt_1
|
||||
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
|
||||
Filter: (async_pt_1.b === 505)
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl1
|
||||
-> Async Foreign Scan on public.async_p2 async_pt_2
|
||||
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
|
||||
Filter: (async_pt_2.b === 505)
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl2
|
||||
(10 rows)
|
||||
|
||||
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
|
||||
SELECT * FROM result_tbl ORDER BY a;
|
||||
a | b | c
|
||||
------+-----+------
|
||||
1505 | 505 | 0505
|
||||
2505 | 505 | 0505
|
||||
(2 rows)
|
||||
|
||||
DELETE FROM result_tbl;
|
||||
-- Check case where multiple partitions use the same connection
|
||||
CREATE TABLE base_tbl3 (a int, b int, c text);
|
||||
CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000)
|
||||
SERVER loopback2 OPTIONS (table_name 'base_tbl3');
|
||||
INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
|
||||
ANALYZE async_pt;
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
|
||||
QUERY PLAN
|
||||
----------------------------------------------------------------
|
||||
Insert on public.result_tbl
|
||||
-> Append
|
||||
-> Async Foreign Scan on public.async_p1 async_pt_1
|
||||
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
|
||||
Filter: (async_pt_1.b === 505)
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl1
|
||||
-> Async Foreign Scan on public.async_p2 async_pt_2
|
||||
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
|
||||
Filter: (async_pt_2.b === 505)
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl2
|
||||
-> Async Foreign Scan on public.async_p3 async_pt_3
|
||||
Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
|
||||
Filter: (async_pt_3.b === 505)
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl3
|
||||
(14 rows)
|
||||
|
||||
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
|
||||
SELECT * FROM result_tbl ORDER BY a;
|
||||
a | b | c
|
||||
------+-----+------
|
||||
1505 | 505 | 0505
|
||||
2505 | 505 | 0505
|
||||
3505 | 505 | 0505
|
||||
(3 rows)
|
||||
|
||||
DELETE FROM result_tbl;
|
||||
DROP FOREIGN TABLE async_p3;
|
||||
DROP TABLE base_tbl3;
|
||||
-- Check case where the partitioned table has local/remote partitions
|
||||
CREATE TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000);
|
||||
INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
|
||||
ANALYZE async_pt;
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
|
||||
QUERY PLAN
|
||||
----------------------------------------------------------------
|
||||
Insert on public.result_tbl
|
||||
-> Append
|
||||
-> Async Foreign Scan on public.async_p1 async_pt_1
|
||||
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
|
||||
Filter: (async_pt_1.b === 505)
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl1
|
||||
-> Async Foreign Scan on public.async_p2 async_pt_2
|
||||
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
|
||||
Filter: (async_pt_2.b === 505)
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl2
|
||||
-> Seq Scan on public.async_p3 async_pt_3
|
||||
Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
|
||||
Filter: (async_pt_3.b === 505)
|
||||
(13 rows)
|
||||
|
||||
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
|
||||
SELECT * FROM result_tbl ORDER BY a;
|
||||
a | b | c
|
||||
------+-----+------
|
||||
1505 | 505 | 0505
|
||||
2505 | 505 | 0505
|
||||
3505 | 505 | 0505
|
||||
(3 rows)
|
||||
|
||||
DELETE FROM result_tbl;
|
||||
-- partitionwise joins
|
||||
SET enable_partitionwise_join TO true;
|
||||
CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text);
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
|
||||
QUERY PLAN
|
||||
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||
Insert on public.join_tbl
|
||||
-> Append
|
||||
-> Async Foreign Scan
|
||||
Output: t1_1.a, t1_1.b, t1_1.c, t2_1.a, t2_1.b, t2_1.c
|
||||
Relations: (public.async_p1 t1_1) INNER JOIN (public.async_p1 t2_1)
|
||||
Remote SQL: SELECT r5.a, r5.b, r5.c, r8.a, r8.b, r8.c FROM (public.base_tbl1 r5 INNER JOIN public.base_tbl1 r8 ON (((r5.a = r8.a)) AND ((r5.b = r8.b)) AND (((r5.b % 100) = 0))))
|
||||
-> Async Foreign Scan
|
||||
Output: t1_2.a, t1_2.b, t1_2.c, t2_2.a, t2_2.b, t2_2.c
|
||||
Relations: (public.async_p2 t1_2) INNER JOIN (public.async_p2 t2_2)
|
||||
Remote SQL: SELECT r6.a, r6.b, r6.c, r9.a, r9.b, r9.c FROM (public.base_tbl2 r6 INNER JOIN public.base_tbl2 r9 ON (((r6.a = r9.a)) AND ((r6.b = r9.b)) AND (((r6.b % 100) = 0))))
|
||||
-> Hash Join
|
||||
Output: t1_3.a, t1_3.b, t1_3.c, t2_3.a, t2_3.b, t2_3.c
|
||||
Hash Cond: ((t2_3.a = t1_3.a) AND (t2_3.b = t1_3.b))
|
||||
-> Seq Scan on public.async_p3 t2_3
|
||||
Output: t2_3.a, t2_3.b, t2_3.c
|
||||
-> Hash
|
||||
Output: t1_3.a, t1_3.b, t1_3.c
|
||||
-> Seq Scan on public.async_p3 t1_3
|
||||
Output: t1_3.a, t1_3.b, t1_3.c
|
||||
Filter: ((t1_3.b % 100) = 0)
|
||||
(20 rows)
|
||||
|
||||
INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
|
||||
SELECT * FROM join_tbl ORDER BY a1;
|
||||
a1 | b1 | c1 | a2 | b2 | c2
|
||||
------+-----+------+------+-----+------
|
||||
1000 | 0 | 0000 | 1000 | 0 | 0000
|
||||
1100 | 100 | 0100 | 1100 | 100 | 0100
|
||||
1200 | 200 | 0200 | 1200 | 200 | 0200
|
||||
1300 | 300 | 0300 | 1300 | 300 | 0300
|
||||
1400 | 400 | 0400 | 1400 | 400 | 0400
|
||||
1500 | 500 | 0500 | 1500 | 500 | 0500
|
||||
1600 | 600 | 0600 | 1600 | 600 | 0600
|
||||
1700 | 700 | 0700 | 1700 | 700 | 0700
|
||||
1800 | 800 | 0800 | 1800 | 800 | 0800
|
||||
1900 | 900 | 0900 | 1900 | 900 | 0900
|
||||
2000 | 0 | 0000 | 2000 | 0 | 0000
|
||||
2100 | 100 | 0100 | 2100 | 100 | 0100
|
||||
2200 | 200 | 0200 | 2200 | 200 | 0200
|
||||
2300 | 300 | 0300 | 2300 | 300 | 0300
|
||||
2400 | 400 | 0400 | 2400 | 400 | 0400
|
||||
2500 | 500 | 0500 | 2500 | 500 | 0500
|
||||
2600 | 600 | 0600 | 2600 | 600 | 0600
|
||||
2700 | 700 | 0700 | 2700 | 700 | 0700
|
||||
2800 | 800 | 0800 | 2800 | 800 | 0800
|
||||
2900 | 900 | 0900 | 2900 | 900 | 0900
|
||||
3000 | 0 | 0000 | 3000 | 0 | 0000
|
||||
3100 | 100 | 0100 | 3100 | 100 | 0100
|
||||
3200 | 200 | 0200 | 3200 | 200 | 0200
|
||||
3300 | 300 | 0300 | 3300 | 300 | 0300
|
||||
3400 | 400 | 0400 | 3400 | 400 | 0400
|
||||
3500 | 500 | 0500 | 3500 | 500 | 0500
|
||||
3600 | 600 | 0600 | 3600 | 600 | 0600
|
||||
3700 | 700 | 0700 | 3700 | 700 | 0700
|
||||
3800 | 800 | 0800 | 3800 | 800 | 0800
|
||||
3900 | 900 | 0900 | 3900 | 900 | 0900
|
||||
(30 rows)
|
||||
|
||||
DELETE FROM join_tbl;
|
||||
RESET enable_partitionwise_join;
|
||||
-- Test interaction of async execution with plan-time partition pruning
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
SELECT * FROM async_pt WHERE a < 3000;
|
||||
QUERY PLAN
|
||||
-----------------------------------------------------------------------------
|
||||
Append
|
||||
-> Async Foreign Scan on public.async_p1 async_pt_1
|
||||
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 3000))
|
||||
-> Async Foreign Scan on public.async_p2 async_pt_2
|
||||
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < 3000))
|
||||
(7 rows)
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
SELECT * FROM async_pt WHERE a < 2000;
|
||||
QUERY PLAN
|
||||
-----------------------------------------------------------------------
|
||||
Foreign Scan on public.async_p1 async_pt
|
||||
Output: async_pt.a, async_pt.b, async_pt.c
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 2000))
|
||||
(3 rows)
|
||||
|
||||
-- Test interaction of async execution with run-time partition pruning
|
||||
SET plan_cache_mode TO force_generic_plan;
|
||||
PREPARE async_pt_query (int, int) AS
|
||||
INSERT INTO result_tbl SELECT * FROM async_pt WHERE a < $1 AND b === $2;
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
EXECUTE async_pt_query (3000, 505);
|
||||
QUERY PLAN
|
||||
------------------------------------------------------------------------------------------
|
||||
Insert on public.result_tbl
|
||||
-> Append
|
||||
Subplans Removed: 1
|
||||
-> Async Foreign Scan on public.async_p1 async_pt_1
|
||||
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
|
||||
Filter: (async_pt_1.b === $2)
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < $1::integer))
|
||||
-> Async Foreign Scan on public.async_p2 async_pt_2
|
||||
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
|
||||
Filter: (async_pt_2.b === $2)
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < $1::integer))
|
||||
(11 rows)
|
||||
|
||||
EXECUTE async_pt_query (3000, 505);
|
||||
SELECT * FROM result_tbl ORDER BY a;
|
||||
a | b | c
|
||||
------+-----+------
|
||||
1505 | 505 | 0505
|
||||
2505 | 505 | 0505
|
||||
(2 rows)
|
||||
|
||||
DELETE FROM result_tbl;
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
EXECUTE async_pt_query (2000, 505);
|
||||
QUERY PLAN
|
||||
------------------------------------------------------------------------------------------
|
||||
Insert on public.result_tbl
|
||||
-> Append
|
||||
Subplans Removed: 2
|
||||
-> Async Foreign Scan on public.async_p1 async_pt_1
|
||||
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
|
||||
Filter: (async_pt_1.b === $2)
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < $1::integer))
|
||||
(7 rows)
|
||||
|
||||
EXECUTE async_pt_query (2000, 505);
|
||||
SELECT * FROM result_tbl ORDER BY a;
|
||||
a | b | c
|
||||
------+-----+------
|
||||
1505 | 505 | 0505
|
||||
(1 row)
|
||||
|
||||
DELETE FROM result_tbl;
|
||||
RESET plan_cache_mode;
|
||||
CREATE TABLE local_tbl(a int, b int, c text);
|
||||
INSERT INTO local_tbl VALUES (1505, 505, 'foo'), (2505, 505, 'bar');
|
||||
ANALYZE local_tbl;
|
||||
CREATE INDEX base_tbl1_idx ON base_tbl1 (a);
|
||||
CREATE INDEX base_tbl2_idx ON base_tbl2 (a);
|
||||
CREATE INDEX async_p3_idx ON async_p3 (a);
|
||||
ANALYZE base_tbl1;
|
||||
ANALYZE base_tbl2;
|
||||
ANALYZE async_p3;
|
||||
ALTER FOREIGN TABLE async_p1 OPTIONS (use_remote_estimate 'true');
|
||||
ALTER FOREIGN TABLE async_p2 OPTIONS (use_remote_estimate 'true');
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
|
||||
QUERY PLAN
|
||||
------------------------------------------------------------------------------------------
|
||||
Nested Loop
|
||||
Output: local_tbl.a, local_tbl.b, local_tbl.c, async_pt.a, async_pt.b, async_pt.c
|
||||
-> Seq Scan on public.local_tbl
|
||||
Output: local_tbl.a, local_tbl.b, local_tbl.c
|
||||
Filter: (local_tbl.c = 'bar'::text)
|
||||
-> Append
|
||||
-> Async Foreign Scan on public.async_p1 async_pt_1
|
||||
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (($1::integer = a))
|
||||
-> Async Foreign Scan on public.async_p2 async_pt_2
|
||||
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (($1::integer = a))
|
||||
-> Seq Scan on public.async_p3 async_pt_3
|
||||
Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
|
||||
Filter: (local_tbl.a = async_pt_3.a)
|
||||
(15 rows)
|
||||
|
||||
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
|
||||
SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
|
||||
QUERY PLAN
|
||||
-------------------------------------------------------------------------------
|
||||
Nested Loop (actual rows=1 loops=1)
|
||||
-> Seq Scan on local_tbl (actual rows=1 loops=1)
|
||||
Filter: (c = 'bar'::text)
|
||||
Rows Removed by Filter: 1
|
||||
-> Append (actual rows=1 loops=1)
|
||||
-> Async Foreign Scan on async_p1 async_pt_1 (never executed)
|
||||
-> Async Foreign Scan on async_p2 async_pt_2 (actual rows=1 loops=1)
|
||||
-> Seq Scan on async_p3 async_pt_3 (never executed)
|
||||
Filter: (local_tbl.a = a)
|
||||
(9 rows)
|
||||
|
||||
SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
|
||||
a | b | c | a | b | c
|
||||
------+-----+-----+------+-----+------
|
||||
2505 | 505 | bar | 2505 | 505 | 0505
|
||||
(1 row)
|
||||
|
||||
ALTER FOREIGN TABLE async_p1 OPTIONS (DROP use_remote_estimate);
|
||||
ALTER FOREIGN TABLE async_p2 OPTIONS (DROP use_remote_estimate);
|
||||
DROP TABLE local_tbl;
|
||||
DROP INDEX base_tbl1_idx;
|
||||
DROP INDEX base_tbl2_idx;
|
||||
DROP INDEX async_p3_idx;
|
||||
-- Test that pending requests are processed properly
|
||||
SET enable_mergejoin TO false;
|
||||
SET enable_hashjoin TO false;
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
|
||||
QUERY PLAN
|
||||
----------------------------------------------------------------
|
||||
Nested Loop
|
||||
Output: t1.a, t1.b, t1.c, t2.a, t2.b, t2.c
|
||||
Join Filter: (t1.a = t2.a)
|
||||
-> Append
|
||||
-> Async Foreign Scan on public.async_p1 t1_1
|
||||
Output: t1_1.a, t1_1.b, t1_1.c
|
||||
Filter: (t1_1.b === 505)
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl1
|
||||
-> Async Foreign Scan on public.async_p2 t1_2
|
||||
Output: t1_2.a, t1_2.b, t1_2.c
|
||||
Filter: (t1_2.b === 505)
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl2
|
||||
-> Seq Scan on public.async_p3 t1_3
|
||||
Output: t1_3.a, t1_3.b, t1_3.c
|
||||
Filter: (t1_3.b === 505)
|
||||
-> Materialize
|
||||
Output: t2.a, t2.b, t2.c
|
||||
-> Foreign Scan on public.async_p2 t2
|
||||
Output: t2.a, t2.b, t2.c
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl2
|
||||
(20 rows)
|
||||
|
||||
SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
|
||||
a | b | c | a | b | c
|
||||
------+-----+------+------+-----+------
|
||||
2505 | 505 | 0505 | 2505 | 505 | 0505
|
||||
(1 row)
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
|
||||
QUERY PLAN
|
||||
----------------------------------------------------------------
|
||||
Limit
|
||||
Output: t1.a, t1.b, t1.c
|
||||
-> Append
|
||||
-> Async Foreign Scan on public.async_p1 t1_1
|
||||
Output: t1_1.a, t1_1.b, t1_1.c
|
||||
Filter: (t1_1.b === 505)
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl1
|
||||
-> Async Foreign Scan on public.async_p2 t1_2
|
||||
Output: t1_2.a, t1_2.b, t1_2.c
|
||||
Filter: (t1_2.b === 505)
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl2
|
||||
-> Seq Scan on public.async_p3 t1_3
|
||||
Output: t1_3.a, t1_3.b, t1_3.c
|
||||
Filter: (t1_3.b === 505)
|
||||
(14 rows)
|
||||
|
||||
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
|
||||
a | b | c
|
||||
------+-----+------
|
||||
3505 | 505 | 0505
|
||||
(1 row)
|
||||
|
||||
-- Check with foreign modify
|
||||
CREATE TABLE local_tbl (a int, b int, c text);
|
||||
INSERT INTO local_tbl VALUES (1505, 505, 'foo');
|
||||
CREATE TABLE base_tbl3 (a int, b int, c text);
|
||||
CREATE FOREIGN TABLE remote_tbl (a int, b int, c text)
|
||||
SERVER loopback OPTIONS (table_name 'base_tbl3');
|
||||
INSERT INTO remote_tbl VALUES (2505, 505, 'bar');
|
||||
CREATE TABLE base_tbl4 (a int, b int, c text);
|
||||
CREATE FOREIGN TABLE insert_tbl (a int, b int, c text)
|
||||
SERVER loopback OPTIONS (table_name 'base_tbl4');
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
|
||||
QUERY PLAN
|
||||
-------------------------------------------------------------------------
|
||||
Insert on public.insert_tbl
|
||||
Remote SQL: INSERT INTO public.base_tbl4(a, b, c) VALUES ($1, $2, $3)
|
||||
Batch Size: 1
|
||||
-> Append
|
||||
-> Seq Scan on public.local_tbl
|
||||
Output: local_tbl.a, local_tbl.b, local_tbl.c
|
||||
-> Async Foreign Scan on public.remote_tbl
|
||||
Output: remote_tbl.a, remote_tbl.b, remote_tbl.c
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl3
|
||||
(9 rows)
|
||||
|
||||
INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
|
||||
SELECT * FROM insert_tbl ORDER BY a;
|
||||
a | b | c
|
||||
------+-----+-----
|
||||
1505 | 505 | foo
|
||||
2505 | 505 | bar
|
||||
(2 rows)
|
||||
|
||||
-- Check with direct modify
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
|
||||
INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
|
||||
QUERY PLAN
|
||||
----------------------------------------------------------------------------------------
|
||||
Insert on public.join_tbl
|
||||
CTE t
|
||||
-> Update on public.remote_tbl
|
||||
Output: remote_tbl.a, remote_tbl.b, remote_tbl.c
|
||||
-> Foreign Update on public.remote_tbl
|
||||
Remote SQL: UPDATE public.base_tbl3 SET c = (c || c) RETURNING a, b, c
|
||||
-> Nested Loop Left Join
|
||||
Output: async_pt.a, async_pt.b, async_pt.c, t.a, t.b, t.c
|
||||
Join Filter: ((async_pt.a = t.a) AND (async_pt.b = t.b))
|
||||
-> Append
|
||||
-> Async Foreign Scan on public.async_p1 async_pt_1
|
||||
Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
|
||||
Filter: (async_pt_1.b === 505)
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl1
|
||||
-> Async Foreign Scan on public.async_p2 async_pt_2
|
||||
Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
|
||||
Filter: (async_pt_2.b === 505)
|
||||
Remote SQL: SELECT a, b, c FROM public.base_tbl2
|
||||
-> Seq Scan on public.async_p3 async_pt_3
|
||||
Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
|
||||
Filter: (async_pt_3.b === 505)
|
||||
-> CTE Scan on t
|
||||
Output: t.a, t.b, t.c
|
||||
(23 rows)
|
||||
|
||||
WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
|
||||
INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
|
||||
SELECT * FROM join_tbl ORDER BY a1;
|
||||
a1 | b1 | c1 | a2 | b2 | c2
|
||||
------+-----+------+------+-----+--------
|
||||
1505 | 505 | 0505 | | |
|
||||
2505 | 505 | 0505 | 2505 | 505 | barbar
|
||||
3505 | 505 | 0505 | | |
|
||||
(3 rows)
|
||||
|
||||
DELETE FROM join_tbl;
|
||||
RESET enable_mergejoin;
|
||||
RESET enable_hashjoin;
|
||||
-- Clean up
|
||||
DROP TABLE async_pt;
|
||||
DROP TABLE base_tbl1;
|
||||
DROP TABLE base_tbl2;
|
||||
DROP TABLE result_tbl;
|
||||
DROP TABLE local_tbl;
|
||||
DROP FOREIGN TABLE remote_tbl;
|
||||
DROP FOREIGN TABLE insert_tbl;
|
||||
DROP TABLE base_tbl3;
|
||||
DROP TABLE base_tbl4;
|
||||
DROP TABLE join_tbl;
|
||||
ALTER SERVER loopback OPTIONS (DROP async_capable);
|
||||
ALTER SERVER loopback2 OPTIONS (DROP async_capable);
|
||||
|
@ -107,7 +107,8 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
|
||||
* Validate option value, when we can do so without any context.
|
||||
*/
|
||||
if (strcmp(def->defname, "use_remote_estimate") == 0 ||
|
||||
strcmp(def->defname, "updatable") == 0)
|
||||
strcmp(def->defname, "updatable") == 0 ||
|
||||
strcmp(def->defname, "async_capable") == 0)
|
||||
{
|
||||
/* these accept only boolean values */
|
||||
(void) defGetBoolean(def);
|
||||
@ -217,6 +218,9 @@ InitPgFdwOptions(void)
|
||||
/* batch_size is available on both server and table */
|
||||
{"batch_size", ForeignServerRelationId, false},
|
||||
{"batch_size", ForeignTableRelationId, false},
|
||||
/* async_capable is available on both server and table */
|
||||
{"async_capable", ForeignServerRelationId, false},
|
||||
{"async_capable", ForeignTableRelationId, false},
|
||||
{"password_required", UserMappingRelationId, false},
|
||||
|
||||
/*
|
||||
|
@ -21,6 +21,7 @@
|
||||
#include "commands/defrem.h"
|
||||
#include "commands/explain.h"
|
||||
#include "commands/vacuum.h"
|
||||
#include "executor/execAsync.h"
|
||||
#include "foreign/fdwapi.h"
|
||||
#include "funcapi.h"
|
||||
#include "miscadmin.h"
|
||||
@ -37,6 +38,7 @@
|
||||
#include "optimizer/tlist.h"
|
||||
#include "parser/parsetree.h"
|
||||
#include "postgres_fdw.h"
|
||||
#include "storage/latch.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/float.h"
|
||||
#include "utils/guc.h"
|
||||
@ -143,6 +145,7 @@ typedef struct PgFdwScanState
|
||||
|
||||
/* for remote query execution */
|
||||
PGconn *conn; /* connection for the scan */
|
||||
PgFdwConnState *conn_state; /* extra per-connection state */
|
||||
unsigned int cursor_number; /* quasi-unique ID for my cursor */
|
||||
bool cursor_exists; /* have we created the cursor? */
|
||||
int numParams; /* number of parameters passed to query */
|
||||
@ -159,6 +162,9 @@ typedef struct PgFdwScanState
|
||||
int fetch_ct_2; /* Min(# of fetches done, 2) */
|
||||
bool eof_reached; /* true if last fetch reached EOF */
|
||||
|
||||
/* for asynchronous execution */
|
||||
bool async_capable; /* engage asynchronous-capable logic? */
|
||||
|
||||
/* working memory contexts */
|
||||
MemoryContext batch_cxt; /* context holding current batch of tuples */
|
||||
MemoryContext temp_cxt; /* context for per-tuple temporary data */
|
||||
@ -176,6 +182,7 @@ typedef struct PgFdwModifyState
|
||||
|
||||
/* for remote query execution */
|
||||
PGconn *conn; /* connection for the scan */
|
||||
PgFdwConnState *conn_state; /* extra per-connection state */
|
||||
char *p_name; /* name of prepared statement, if created */
|
||||
|
||||
/* extracted fdw_private data */
|
||||
@ -219,6 +226,7 @@ typedef struct PgFdwDirectModifyState
|
||||
|
||||
/* for remote query execution */
|
||||
PGconn *conn; /* connection for the update */
|
||||
PgFdwConnState *conn_state; /* extra per-connection state */
|
||||
int numParams; /* number of parameters passed to query */
|
||||
FmgrInfo *param_flinfo; /* output conversion functions for them */
|
||||
List *param_exprs; /* executable expressions for param values */
|
||||
@ -408,6 +416,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
|
||||
RelOptInfo *input_rel,
|
||||
RelOptInfo *output_rel,
|
||||
void *extra);
|
||||
static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
|
||||
static void postgresForeignAsyncRequest(AsyncRequest *areq);
|
||||
static void postgresForeignAsyncConfigureWait(AsyncRequest *areq);
|
||||
static void postgresForeignAsyncNotify(AsyncRequest *areq);
|
||||
|
||||
/*
|
||||
* Helper functions
|
||||
@ -437,7 +449,8 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
|
||||
void *arg);
|
||||
static void create_cursor(ForeignScanState *node);
|
||||
static void fetch_more_data(ForeignScanState *node);
|
||||
static void close_cursor(PGconn *conn, unsigned int cursor_number);
|
||||
static void close_cursor(PGconn *conn, unsigned int cursor_number,
|
||||
PgFdwConnState *conn_state);
|
||||
static PgFdwModifyState *create_foreign_modify(EState *estate,
|
||||
RangeTblEntry *rte,
|
||||
ResultRelInfo *resultRelInfo,
|
||||
@ -491,6 +504,8 @@ static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
|
||||
double *totaldeadrows);
|
||||
static void analyze_row_processor(PGresult *res, int row,
|
||||
PgFdwAnalyzeState *astate);
|
||||
static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
|
||||
static void fetch_more_data_begin(AsyncRequest *areq);
|
||||
static HeapTuple make_tuple_from_result_row(PGresult *res,
|
||||
int row,
|
||||
Relation rel,
|
||||
@ -583,6 +598,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
|
||||
/* Support functions for upper relation push-down */
|
||||
routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
|
||||
|
||||
/* Support functions for asynchronous execution */
|
||||
routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
|
||||
routine->ForeignAsyncRequest = postgresForeignAsyncRequest;
|
||||
routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
|
||||
routine->ForeignAsyncNotify = postgresForeignAsyncNotify;
|
||||
|
||||
PG_RETURN_POINTER(routine);
|
||||
}
|
||||
|
||||
@ -618,14 +639,15 @@ postgresGetForeignRelSize(PlannerInfo *root,
|
||||
|
||||
/*
|
||||
* Extract user-settable option values. Note that per-table settings of
|
||||
* use_remote_estimate and fetch_size override per-server settings of
|
||||
* them, respectively.
|
||||
* use_remote_estimate, fetch_size and async_capable override per-server
|
||||
* settings of them, respectively.
|
||||
*/
|
||||
fpinfo->use_remote_estimate = false;
|
||||
fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
|
||||
fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
|
||||
fpinfo->shippable_extensions = NIL;
|
||||
fpinfo->fetch_size = 100;
|
||||
fpinfo->async_capable = false;
|
||||
|
||||
apply_server_options(fpinfo);
|
||||
apply_table_options(fpinfo);
|
||||
@ -1459,7 +1481,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
|
||||
* Get connection to the foreign server. Connection manager will
|
||||
* establish new connection if necessary.
|
||||
*/
|
||||
fsstate->conn = GetConnection(user, false);
|
||||
fsstate->conn = GetConnection(user, false, &fsstate->conn_state);
|
||||
|
||||
/* Assign a unique ID for my cursor */
|
||||
fsstate->cursor_number = GetCursorNumber(fsstate->conn);
|
||||
@ -1510,6 +1532,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
|
||||
&fsstate->param_flinfo,
|
||||
&fsstate->param_exprs,
|
||||
&fsstate->param_values);
|
||||
|
||||
/* Set the async-capable flag */
|
||||
fsstate->async_capable = node->ss.ps.plan->async_capable;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -1524,8 +1549,10 @@ postgresIterateForeignScan(ForeignScanState *node)
|
||||
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
|
||||
|
||||
/*
|
||||
* If this is the first call after Begin or ReScan, we need to create the
|
||||
* cursor on the remote side.
|
||||
* In sync mode, if this is the first call after Begin or ReScan, we need
|
||||
* to create the cursor on the remote side. In async mode, we would have
|
||||
* already created the cursor before we get here, even if this is the
|
||||
* first call after Begin or ReScan.
|
||||
*/
|
||||
if (!fsstate->cursor_exists)
|
||||
create_cursor(node);
|
||||
@ -1535,6 +1562,9 @@ postgresIterateForeignScan(ForeignScanState *node)
|
||||
*/
|
||||
if (fsstate->next_tuple >= fsstate->num_tuples)
|
||||
{
|
||||
/* In async mode, just clear tuple slot. */
|
||||
if (fsstate->async_capable)
|
||||
return ExecClearTuple(slot);
|
||||
/* No point in another fetch if we already detected EOF, though. */
|
||||
if (!fsstate->eof_reached)
|
||||
fetch_more_data(node);
|
||||
@ -1596,7 +1626,7 @@ postgresReScanForeignScan(ForeignScanState *node)
|
||||
* We don't use a PG_TRY block here, so be careful not to throw error
|
||||
* without releasing the PGresult.
|
||||
*/
|
||||
res = pgfdw_exec_query(fsstate->conn, sql);
|
||||
res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
|
||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||
pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
|
||||
PQclear(res);
|
||||
@ -1624,7 +1654,8 @@ postgresEndForeignScan(ForeignScanState *node)
|
||||
|
||||
/* Close the cursor if open, to prevent accumulation of cursors */
|
||||
if (fsstate->cursor_exists)
|
||||
close_cursor(fsstate->conn, fsstate->cursor_number);
|
||||
close_cursor(fsstate->conn, fsstate->cursor_number,
|
||||
fsstate->conn_state);
|
||||
|
||||
/* Release remote connection */
|
||||
ReleaseConnection(fsstate->conn);
|
||||
@ -2501,7 +2532,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
|
||||
* Get connection to the foreign server. Connection manager will
|
||||
* establish new connection if necessary.
|
||||
*/
|
||||
dmstate->conn = GetConnection(user, false);
|
||||
dmstate->conn = GetConnection(user, false, &dmstate->conn_state);
|
||||
|
||||
/* Update the foreign-join-related fields. */
|
||||
if (fsplan->scan.scanrelid == 0)
|
||||
@ -2882,7 +2913,7 @@ estimate_path_cost_size(PlannerInfo *root,
|
||||
false, &retrieved_attrs, NULL);
|
||||
|
||||
/* Get the remote estimate */
|
||||
conn = GetConnection(fpinfo->user, false);
|
||||
conn = GetConnection(fpinfo->user, false, NULL);
|
||||
get_remote_estimate(sql.data, conn, &rows, &width,
|
||||
&startup_cost, &total_cost);
|
||||
ReleaseConnection(conn);
|
||||
@ -3328,7 +3359,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
|
||||
/*
|
||||
* Execute EXPLAIN remotely.
|
||||
*/
|
||||
res = pgfdw_exec_query(conn, sql);
|
||||
res = pgfdw_exec_query(conn, sql, NULL);
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, false, sql);
|
||||
|
||||
@ -3452,6 +3483,10 @@ create_cursor(ForeignScanState *node)
|
||||
StringInfoData buf;
|
||||
PGresult *res;
|
||||
|
||||
/* First, process a pending asynchronous request, if any. */
|
||||
if (fsstate->conn_state->pendingAreq)
|
||||
process_pending_request(fsstate->conn_state->pendingAreq);
|
||||
|
||||
/*
|
||||
* Construct array of query parameter values in text format. We do the
|
||||
* conversions in the short-lived per-tuple context, so as not to cause a
|
||||
@ -3532,17 +3567,38 @@ fetch_more_data(ForeignScanState *node)
|
||||
PG_TRY();
|
||||
{
|
||||
PGconn *conn = fsstate->conn;
|
||||
char sql[64];
|
||||
int numrows;
|
||||
int i;
|
||||
|
||||
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
|
||||
fsstate->fetch_size, fsstate->cursor_number);
|
||||
if (fsstate->async_capable)
|
||||
{
|
||||
Assert(fsstate->conn_state->pendingAreq);
|
||||
|
||||
res = pgfdw_exec_query(conn, sql);
|
||||
/* On error, report the original query, not the FETCH. */
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
|
||||
/*
|
||||
* The query was already sent by an earlier call to
|
||||
* fetch_more_data_begin. So now we just fetch the result.
|
||||
*/
|
||||
res = pgfdw_get_result(conn, fsstate->query);
|
||||
/* On error, report the original query, not the FETCH. */
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
|
||||
|
||||
/* Reset per-connection state */
|
||||
fsstate->conn_state->pendingAreq = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
char sql[64];
|
||||
|
||||
/* This is a regular synchronous fetch. */
|
||||
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
|
||||
fsstate->fetch_size, fsstate->cursor_number);
|
||||
|
||||
res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
|
||||
/* On error, report the original query, not the FETCH. */
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
|
||||
}
|
||||
|
||||
/* Convert the data into HeapTuples */
|
||||
numrows = PQntuples(res);
|
||||
@ -3634,7 +3690,8 @@ reset_transmission_modes(int nestlevel)
|
||||
* Utility routine to close a cursor.
|
||||
*/
|
||||
static void
|
||||
close_cursor(PGconn *conn, unsigned int cursor_number)
|
||||
close_cursor(PGconn *conn, unsigned int cursor_number,
|
||||
PgFdwConnState *conn_state)
|
||||
{
|
||||
char sql[64];
|
||||
PGresult *res;
|
||||
@ -3645,7 +3702,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
|
||||
* We don't use a PG_TRY block here, so be careful not to throw error
|
||||
* without releasing the PGresult.
|
||||
*/
|
||||
res = pgfdw_exec_query(conn, sql);
|
||||
res = pgfdw_exec_query(conn, sql, conn_state);
|
||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, true, sql);
|
||||
PQclear(res);
|
||||
@ -3694,7 +3751,7 @@ create_foreign_modify(EState *estate,
|
||||
user = GetUserMapping(userid, table->serverid);
|
||||
|
||||
/* Open connection; report that we'll create a prepared statement. */
|
||||
fmstate->conn = GetConnection(user, true);
|
||||
fmstate->conn = GetConnection(user, true, &fmstate->conn_state);
|
||||
fmstate->p_name = NULL; /* prepared statement not made yet */
|
||||
|
||||
/* Set up remote query information. */
|
||||
@ -3793,6 +3850,10 @@ execute_foreign_modify(EState *estate,
|
||||
operation == CMD_UPDATE ||
|
||||
operation == CMD_DELETE);
|
||||
|
||||
/* First, process a pending asynchronous request, if any. */
|
||||
if (fmstate->conn_state->pendingAreq)
|
||||
process_pending_request(fmstate->conn_state->pendingAreq);
|
||||
|
||||
/*
|
||||
* If the existing query was deparsed and prepared for a different number
|
||||
* of rows, rebuild it for the proper number.
|
||||
@ -3894,6 +3955,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
|
||||
char *p_name;
|
||||
PGresult *res;
|
||||
|
||||
/*
|
||||
* The caller would already have processed a pending asynchronous request
|
||||
* if any, so no need to do it here.
|
||||
*/
|
||||
|
||||
/* Construct name we'll use for the prepared statement. */
|
||||
snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
|
||||
GetPrepStmtNumber(fmstate->conn));
|
||||
@ -4079,7 +4145,7 @@ deallocate_query(PgFdwModifyState *fmstate)
|
||||
* We don't use a PG_TRY block here, so be careful not to throw error
|
||||
* without releasing the PGresult.
|
||||
*/
|
||||
res = pgfdw_exec_query(fmstate->conn, sql);
|
||||
res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
|
||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||
pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
|
||||
PQclear(res);
|
||||
@ -4227,6 +4293,10 @@ execute_dml_stmt(ForeignScanState *node)
|
||||
int numParams = dmstate->numParams;
|
||||
const char **values = dmstate->param_values;
|
||||
|
||||
/* First, process a pending asynchronous request, if any. */
|
||||
if (dmstate->conn_state->pendingAreq)
|
||||
process_pending_request(dmstate->conn_state->pendingAreq);
|
||||
|
||||
/*
|
||||
* Construct array of query parameter values in text format.
|
||||
*/
|
||||
@ -4628,7 +4698,7 @@ postgresAnalyzeForeignTable(Relation relation,
|
||||
*/
|
||||
table = GetForeignTable(RelationGetRelid(relation));
|
||||
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
|
||||
conn = GetConnection(user, false);
|
||||
conn = GetConnection(user, false, NULL);
|
||||
|
||||
/*
|
||||
* Construct command to get page count for relation.
|
||||
@ -4639,7 +4709,7 @@ postgresAnalyzeForeignTable(Relation relation,
|
||||
/* In what follows, do not risk leaking any PGresults. */
|
||||
PG_TRY();
|
||||
{
|
||||
res = pgfdw_exec_query(conn, sql.data);
|
||||
res = pgfdw_exec_query(conn, sql.data, NULL);
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, false, sql.data);
|
||||
|
||||
@ -4714,7 +4784,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
|
||||
table = GetForeignTable(RelationGetRelid(relation));
|
||||
server = GetForeignServer(table->serverid);
|
||||
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
|
||||
conn = GetConnection(user, false);
|
||||
conn = GetConnection(user, false, NULL);
|
||||
|
||||
/*
|
||||
* Construct cursor that retrieves whole rows from remote.
|
||||
@ -4731,7 +4801,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
|
||||
int fetch_size;
|
||||
ListCell *lc;
|
||||
|
||||
res = pgfdw_exec_query(conn, sql.data);
|
||||
res = pgfdw_exec_query(conn, sql.data, NULL);
|
||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, false, sql.data);
|
||||
PQclear(res);
|
||||
@ -4783,7 +4853,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
|
||||
*/
|
||||
|
||||
/* Fetch some rows */
|
||||
res = pgfdw_exec_query(conn, fetch_sql);
|
||||
res = pgfdw_exec_query(conn, fetch_sql, NULL);
|
||||
/* On error, report the original query, not the FETCH. */
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, false, sql.data);
|
||||
@ -4802,7 +4872,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
|
||||
}
|
||||
|
||||
/* Close the cursor, just to be tidy. */
|
||||
close_cursor(conn, cursor_number);
|
||||
close_cursor(conn, cursor_number, NULL);
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
@ -4942,7 +5012,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
|
||||
*/
|
||||
server = GetForeignServer(serverOid);
|
||||
mapping = GetUserMapping(GetUserId(), server->serverid);
|
||||
conn = GetConnection(mapping, false);
|
||||
conn = GetConnection(mapping, false, NULL);
|
||||
|
||||
/* Don't attempt to import collation if remote server hasn't got it */
|
||||
if (PQserverVersion(conn) < 90100)
|
||||
@ -4958,7 +5028,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
|
||||
appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
|
||||
deparseStringLiteral(&buf, stmt->remote_schema);
|
||||
|
||||
res = pgfdw_exec_query(conn, buf.data);
|
||||
res = pgfdw_exec_query(conn, buf.data, NULL);
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, false, buf.data);
|
||||
|
||||
@ -5070,7 +5140,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
|
||||
appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
|
||||
|
||||
/* Fetch the data */
|
||||
res = pgfdw_exec_query(conn, buf.data);
|
||||
res = pgfdw_exec_query(conn, buf.data, NULL);
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, false, buf.data);
|
||||
|
||||
@ -5530,6 +5600,8 @@ apply_server_options(PgFdwRelationInfo *fpinfo)
|
||||
ExtractExtensionList(defGetString(def), false);
|
||||
else if (strcmp(def->defname, "fetch_size") == 0)
|
||||
fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
|
||||
else if (strcmp(def->defname, "async_capable") == 0)
|
||||
fpinfo->async_capable = defGetBoolean(def);
|
||||
}
|
||||
}
|
||||
|
||||
@ -5551,6 +5623,8 @@ apply_table_options(PgFdwRelationInfo *fpinfo)
|
||||
fpinfo->use_remote_estimate = defGetBoolean(def);
|
||||
else if (strcmp(def->defname, "fetch_size") == 0)
|
||||
fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
|
||||
else if (strcmp(def->defname, "async_capable") == 0)
|
||||
fpinfo->async_capable = defGetBoolean(def);
|
||||
}
|
||||
}
|
||||
|
||||
@ -5585,6 +5659,7 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
|
||||
fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
|
||||
fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
|
||||
fpinfo->fetch_size = fpinfo_o->fetch_size;
|
||||
fpinfo->async_capable = fpinfo_o->async_capable;
|
||||
|
||||
/* Merge the table level options from either side of the join. */
|
||||
if (fpinfo_i)
|
||||
@ -5606,6 +5681,13 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
|
||||
* relation sizes.
|
||||
*/
|
||||
fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
|
||||
|
||||
/*
|
||||
* We'll prefer to consider this join async-capable if any table from
|
||||
* either side of the join is considered async-capable.
|
||||
*/
|
||||
fpinfo->async_capable = fpinfo_o->async_capable ||
|
||||
fpinfo_i->async_capable;
|
||||
}
|
||||
}
|
||||
|
||||
@ -6489,6 +6571,236 @@ add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel,
|
||||
add_path(final_rel, (Path *) final_path);
|
||||
}
|
||||
|
||||
/*
|
||||
* postgresIsForeignPathAsyncCapable
|
||||
* Check whether a given ForeignPath node is async-capable.
|
||||
*/
|
||||
static bool
|
||||
postgresIsForeignPathAsyncCapable(ForeignPath *path)
|
||||
{
|
||||
RelOptInfo *rel = ((Path *) path)->parent;
|
||||
PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
|
||||
|
||||
return fpinfo->async_capable;
|
||||
}
|
||||
|
||||
/*
|
||||
* postgresForeignAsyncRequest
|
||||
* Asynchronously request next tuple from a foreign PostgreSQL table.
|
||||
*/
|
||||
static void
|
||||
postgresForeignAsyncRequest(AsyncRequest *areq)
|
||||
{
|
||||
produce_tuple_asynchronously(areq, true);
|
||||
}
|
||||
|
||||
/*
|
||||
* postgresForeignAsyncConfigureWait
|
||||
* Configure a file descriptor event for which we wish to wait.
|
||||
*/
|
||||
static void
|
||||
postgresForeignAsyncConfigureWait(AsyncRequest *areq)
|
||||
{
|
||||
ForeignScanState *node = (ForeignScanState *) areq->requestee;
|
||||
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
|
||||
AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
|
||||
AppendState *requestor = (AppendState *) areq->requestor;
|
||||
WaitEventSet *set = requestor->as_eventset;
|
||||
|
||||
/* This should not be called unless callback_pending */
|
||||
Assert(areq->callback_pending);
|
||||
|
||||
/* The core code would have registered postmaster death event */
|
||||
Assert(GetNumRegisteredWaitEvents(set) >= 1);
|
||||
|
||||
/* Begin an asynchronous data fetch if not already done */
|
||||
if (!pendingAreq)
|
||||
fetch_more_data_begin(areq);
|
||||
else if (pendingAreq->requestor != areq->requestor)
|
||||
{
|
||||
/*
|
||||
* This is the case when the in-process request was made by another
|
||||
* Append. Note that it might be useless to process the request,
|
||||
* because the query might not need tuples from that Append anymore.
|
||||
* Skip the given request if there are any configured events other
|
||||
* than the postmaster death event; otherwise process the request,
|
||||
* then begin a fetch to configure the event below, because otherwise
|
||||
* we might end up with no configured events other than the postmaster
|
||||
* death event.
|
||||
*/
|
||||
if (GetNumRegisteredWaitEvents(set) > 1)
|
||||
return;
|
||||
process_pending_request(pendingAreq);
|
||||
fetch_more_data_begin(areq);
|
||||
}
|
||||
else if (pendingAreq->requestee != areq->requestee)
|
||||
{
|
||||
/*
|
||||
* This is the case when the in-process request was made by the same
|
||||
* parent but for a different child. Since we configure only the
|
||||
* event for the request made for that child, skip the given request.
|
||||
*/
|
||||
return;
|
||||
}
|
||||
else
|
||||
Assert(pendingAreq == areq);
|
||||
|
||||
AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn),
|
||||
NULL, areq);
|
||||
}
|
||||
|
||||
/*
|
||||
* postgresForeignAsyncNotify
|
||||
* Fetch some more tuples from a file descriptor that becomes ready,
|
||||
* requesting next tuple.
|
||||
*/
|
||||
static void
|
||||
postgresForeignAsyncNotify(AsyncRequest *areq)
|
||||
{
|
||||
ForeignScanState *node = (ForeignScanState *) areq->requestee;
|
||||
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
|
||||
|
||||
/* The request should be currently in-process */
|
||||
Assert(fsstate->conn_state->pendingAreq == areq);
|
||||
|
||||
/* The core code would have initialized the callback_pending flag */
|
||||
Assert(!areq->callback_pending);
|
||||
|
||||
/* On error, report the original query, not the FETCH. */
|
||||
if (!PQconsumeInput(fsstate->conn))
|
||||
pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
|
||||
|
||||
fetch_more_data(node);
|
||||
|
||||
produce_tuple_asynchronously(areq, true);
|
||||
}
|
||||
|
||||
/*
|
||||
* Asynchronously produce next tuple from a foreign PostgreSQL table.
|
||||
*/
|
||||
static void
|
||||
produce_tuple_asynchronously(AsyncRequest *areq, bool fetch)
|
||||
{
|
||||
ForeignScanState *node = (ForeignScanState *) areq->requestee;
|
||||
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
|
||||
AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
|
||||
TupleTableSlot *result;
|
||||
|
||||
/* This should not be called if the request is currently in-process */
|
||||
Assert(areq != pendingAreq);
|
||||
|
||||
/* Fetch some more tuples, if we've run out */
|
||||
if (fsstate->next_tuple >= fsstate->num_tuples)
|
||||
{
|
||||
/* No point in another fetch if we already detected EOF, though */
|
||||
if (!fsstate->eof_reached)
|
||||
{
|
||||
/* Mark the request as pending for a callback */
|
||||
ExecAsyncRequestPending(areq);
|
||||
/* Begin another fetch if requested and if no pending request */
|
||||
if (fetch && !pendingAreq)
|
||||
fetch_more_data_begin(areq);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* There's nothing more to do; just return a NULL pointer */
|
||||
result = NULL;
|
||||
/* Mark the request as complete */
|
||||
ExecAsyncRequestDone(areq, result);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/* Get a tuple from the ForeignScan node */
|
||||
result = ExecProcNode((PlanState *) node);
|
||||
if (!TupIsNull(result))
|
||||
{
|
||||
/* Mark the request as complete */
|
||||
ExecAsyncRequestDone(areq, result);
|
||||
return;
|
||||
}
|
||||
Assert(fsstate->next_tuple >= fsstate->num_tuples);
|
||||
|
||||
/* Fetch some more tuples, if we've not detected EOF yet */
|
||||
if (!fsstate->eof_reached)
|
||||
{
|
||||
/* Mark the request as pending for a callback */
|
||||
ExecAsyncRequestPending(areq);
|
||||
/* Begin another fetch if requested and if no pending request */
|
||||
if (fetch && !pendingAreq)
|
||||
fetch_more_data_begin(areq);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* There's nothing more to do; just return a NULL pointer */
|
||||
result = NULL;
|
||||
/* Mark the request as complete */
|
||||
ExecAsyncRequestDone(areq, result);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Begin an asynchronous data fetch.
|
||||
*
|
||||
* Note: fetch_more_data must be called to fetch the result.
|
||||
*/
|
||||
static void
|
||||
fetch_more_data_begin(AsyncRequest *areq)
|
||||
{
|
||||
ForeignScanState *node = (ForeignScanState *) areq->requestee;
|
||||
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
|
||||
char sql[64];
|
||||
|
||||
Assert(!fsstate->conn_state->pendingAreq);
|
||||
|
||||
/* Create the cursor synchronously. */
|
||||
if (!fsstate->cursor_exists)
|
||||
create_cursor(node);
|
||||
|
||||
/* We will send this query, but not wait for the response. */
|
||||
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
|
||||
fsstate->fetch_size, fsstate->cursor_number);
|
||||
|
||||
if (PQsendQuery(fsstate->conn, sql) < 0)
|
||||
pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
|
||||
|
||||
/* Remember that the request is in process */
|
||||
fsstate->conn_state->pendingAreq = areq;
|
||||
}
|
||||
|
||||
/*
|
||||
* Process a pending asynchronous request.
|
||||
*/
|
||||
void
|
||||
process_pending_request(AsyncRequest *areq)
|
||||
{
|
||||
ForeignScanState *node = (ForeignScanState *) areq->requestee;
|
||||
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
|
||||
EState *estate = node->ss.ps.state;
|
||||
MemoryContext oldcontext;
|
||||
|
||||
/* The request should be currently in-process */
|
||||
Assert(fsstate->conn_state->pendingAreq == areq);
|
||||
|
||||
oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
|
||||
|
||||
/* The request would have been pending for a callback */
|
||||
Assert(areq->callback_pending);
|
||||
|
||||
/* Unlike AsyncNotify, we unset callback_pending ourselves */
|
||||
areq->callback_pending = false;
|
||||
|
||||
fetch_more_data(node);
|
||||
|
||||
/* We need to send a new query afterwards; don't fetch */
|
||||
produce_tuple_asynchronously(areq, false);
|
||||
|
||||
/* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
|
||||
ExecAsyncResponse(areq);
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a tuple from the specified row of the PGresult.
|
||||
*
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include "foreign/foreign.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "libpq-fe.h"
|
||||
#include "nodes/execnodes.h"
|
||||
#include "nodes/pathnodes.h"
|
||||
#include "utils/relcache.h"
|
||||
|
||||
@ -78,6 +79,7 @@ typedef struct PgFdwRelationInfo
|
||||
Cost fdw_startup_cost;
|
||||
Cost fdw_tuple_cost;
|
||||
List *shippable_extensions; /* OIDs of shippable extensions */
|
||||
bool async_capable;
|
||||
|
||||
/* Cached catalog information. */
|
||||
ForeignTable *table;
|
||||
@ -124,17 +126,28 @@ typedef struct PgFdwRelationInfo
|
||||
int relation_index;
|
||||
} PgFdwRelationInfo;
|
||||
|
||||
/*
|
||||
* Extra control information relating to a connection.
|
||||
*/
|
||||
typedef struct PgFdwConnState
|
||||
{
|
||||
AsyncRequest *pendingAreq; /* pending async request */
|
||||
} PgFdwConnState;
|
||||
|
||||
/* in postgres_fdw.c */
|
||||
extern int set_transmission_modes(void);
|
||||
extern void reset_transmission_modes(int nestlevel);
|
||||
extern void process_pending_request(AsyncRequest *areq);
|
||||
|
||||
/* in connection.c */
|
||||
extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
|
||||
extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt,
|
||||
PgFdwConnState **state);
|
||||
extern void ReleaseConnection(PGconn *conn);
|
||||
extern unsigned int GetCursorNumber(PGconn *conn);
|
||||
extern unsigned int GetPrepStmtNumber(PGconn *conn);
|
||||
extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
|
||||
extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query);
|
||||
extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
|
||||
PgFdwConnState *state);
|
||||
extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
|
||||
bool clear, const char *sql);
|
||||
|
||||
|
@ -2928,3 +2928,198 @@ SELECT tableoid::regclass, * FROM batch_cp_upd_test;
|
||||
|
||||
-- Clean up
|
||||
DROP TABLE batch_table, batch_cp_upd_test CASCADE;
|
||||
|
||||
-- ===================================================================
|
||||
-- test asynchronous execution
|
||||
-- ===================================================================
|
||||
|
||||
ALTER SERVER loopback OPTIONS (DROP extensions);
|
||||
ALTER SERVER loopback OPTIONS (ADD async_capable 'true');
|
||||
ALTER SERVER loopback2 OPTIONS (ADD async_capable 'true');
|
||||
|
||||
CREATE TABLE async_pt (a int, b int, c text) PARTITION BY RANGE (a);
|
||||
CREATE TABLE base_tbl1 (a int, b int, c text);
|
||||
CREATE TABLE base_tbl2 (a int, b int, c text);
|
||||
CREATE FOREIGN TABLE async_p1 PARTITION OF async_pt FOR VALUES FROM (1000) TO (2000)
|
||||
SERVER loopback OPTIONS (table_name 'base_tbl1');
|
||||
CREATE FOREIGN TABLE async_p2 PARTITION OF async_pt FOR VALUES FROM (2000) TO (3000)
|
||||
SERVER loopback2 OPTIONS (table_name 'base_tbl2');
|
||||
INSERT INTO async_p1 SELECT 1000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
|
||||
INSERT INTO async_p2 SELECT 2000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
|
||||
ANALYZE async_pt;
|
||||
|
||||
-- simple queries
|
||||
CREATE TABLE result_tbl (a int, b int, c text);
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
|
||||
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
|
||||
|
||||
SELECT * FROM result_tbl ORDER BY a;
|
||||
DELETE FROM result_tbl;
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
|
||||
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
|
||||
|
||||
SELECT * FROM result_tbl ORDER BY a;
|
||||
DELETE FROM result_tbl;
|
||||
|
||||
-- Check case where multiple partitions use the same connection
|
||||
CREATE TABLE base_tbl3 (a int, b int, c text);
|
||||
CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000)
|
||||
SERVER loopback2 OPTIONS (table_name 'base_tbl3');
|
||||
INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
|
||||
ANALYZE async_pt;
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
|
||||
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
|
||||
|
||||
SELECT * FROM result_tbl ORDER BY a;
|
||||
DELETE FROM result_tbl;
|
||||
|
||||
DROP FOREIGN TABLE async_p3;
|
||||
DROP TABLE base_tbl3;
|
||||
|
||||
-- Check case where the partitioned table has local/remote partitions
|
||||
CREATE TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000);
|
||||
INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
|
||||
ANALYZE async_pt;
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
|
||||
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
|
||||
|
||||
SELECT * FROM result_tbl ORDER BY a;
|
||||
DELETE FROM result_tbl;
|
||||
|
||||
-- partitionwise joins
|
||||
SET enable_partitionwise_join TO true;
|
||||
|
||||
CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text);
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
|
||||
INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
|
||||
|
||||
SELECT * FROM join_tbl ORDER BY a1;
|
||||
DELETE FROM join_tbl;
|
||||
|
||||
RESET enable_partitionwise_join;
|
||||
|
||||
-- Test interaction of async execution with plan-time partition pruning
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
SELECT * FROM async_pt WHERE a < 3000;
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
SELECT * FROM async_pt WHERE a < 2000;
|
||||
|
||||
-- Test interaction of async execution with run-time partition pruning
|
||||
SET plan_cache_mode TO force_generic_plan;
|
||||
|
||||
PREPARE async_pt_query (int, int) AS
|
||||
INSERT INTO result_tbl SELECT * FROM async_pt WHERE a < $1 AND b === $2;
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
EXECUTE async_pt_query (3000, 505);
|
||||
EXECUTE async_pt_query (3000, 505);
|
||||
|
||||
SELECT * FROM result_tbl ORDER BY a;
|
||||
DELETE FROM result_tbl;
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
EXECUTE async_pt_query (2000, 505);
|
||||
EXECUTE async_pt_query (2000, 505);
|
||||
|
||||
SELECT * FROM result_tbl ORDER BY a;
|
||||
DELETE FROM result_tbl;
|
||||
|
||||
RESET plan_cache_mode;
|
||||
|
||||
CREATE TABLE local_tbl(a int, b int, c text);
|
||||
INSERT INTO local_tbl VALUES (1505, 505, 'foo'), (2505, 505, 'bar');
|
||||
ANALYZE local_tbl;
|
||||
|
||||
CREATE INDEX base_tbl1_idx ON base_tbl1 (a);
|
||||
CREATE INDEX base_tbl2_idx ON base_tbl2 (a);
|
||||
CREATE INDEX async_p3_idx ON async_p3 (a);
|
||||
ANALYZE base_tbl1;
|
||||
ANALYZE base_tbl2;
|
||||
ANALYZE async_p3;
|
||||
|
||||
ALTER FOREIGN TABLE async_p1 OPTIONS (use_remote_estimate 'true');
|
||||
ALTER FOREIGN TABLE async_p2 OPTIONS (use_remote_estimate 'true');
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
|
||||
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
|
||||
SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
|
||||
SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
|
||||
|
||||
ALTER FOREIGN TABLE async_p1 OPTIONS (DROP use_remote_estimate);
|
||||
ALTER FOREIGN TABLE async_p2 OPTIONS (DROP use_remote_estimate);
|
||||
|
||||
DROP TABLE local_tbl;
|
||||
DROP INDEX base_tbl1_idx;
|
||||
DROP INDEX base_tbl2_idx;
|
||||
DROP INDEX async_p3_idx;
|
||||
|
||||
-- Test that pending requests are processed properly
|
||||
SET enable_mergejoin TO false;
|
||||
SET enable_hashjoin TO false;
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
|
||||
SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
|
||||
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
|
||||
|
||||
-- Check with foreign modify
|
||||
CREATE TABLE local_tbl (a int, b int, c text);
|
||||
INSERT INTO local_tbl VALUES (1505, 505, 'foo');
|
||||
|
||||
CREATE TABLE base_tbl3 (a int, b int, c text);
|
||||
CREATE FOREIGN TABLE remote_tbl (a int, b int, c text)
|
||||
SERVER loopback OPTIONS (table_name 'base_tbl3');
|
||||
INSERT INTO remote_tbl VALUES (2505, 505, 'bar');
|
||||
|
||||
CREATE TABLE base_tbl4 (a int, b int, c text);
|
||||
CREATE FOREIGN TABLE insert_tbl (a int, b int, c text)
|
||||
SERVER loopback OPTIONS (table_name 'base_tbl4');
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
|
||||
INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
|
||||
|
||||
SELECT * FROM insert_tbl ORDER BY a;
|
||||
|
||||
-- Check with direct modify
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
|
||||
INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
|
||||
WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
|
||||
INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
|
||||
|
||||
SELECT * FROM join_tbl ORDER BY a1;
|
||||
DELETE FROM join_tbl;
|
||||
|
||||
RESET enable_mergejoin;
|
||||
RESET enable_hashjoin;
|
||||
|
||||
-- Clean up
|
||||
DROP TABLE async_pt;
|
||||
DROP TABLE base_tbl1;
|
||||
DROP TABLE base_tbl2;
|
||||
DROP TABLE result_tbl;
|
||||
DROP TABLE local_tbl;
|
||||
DROP FOREIGN TABLE remote_tbl;
|
||||
DROP FOREIGN TABLE insert_tbl;
|
||||
DROP TABLE base_tbl3;
|
||||
DROP TABLE base_tbl4;
|
||||
DROP TABLE join_tbl;
|
||||
|
||||
ALTER SERVER loopback OPTIONS (DROP async_capable);
|
||||
ALTER SERVER loopback2 OPTIONS (DROP async_capable);
|
||||
|
Reference in New Issue
Block a user