From 04e706d4238f98a98e1c0b1a02db9d4280b96f04 Mon Sep 17 00:00:00 2001 From: Etsuro Fujita Date: Thu, 24 Feb 2022 14:30:00 +0900 Subject: [PATCH] postgres_fdw: Add support for parallel commit. postgres_fdw commits remote (sub)transactions opened on remote server(s) in a local (sub)transaction one by one when the local (sub)transaction commits. This patch allows it to commit the remote (sub)transactions in parallel to improve performance. This is enabled by the server option "parallel_commit". The default is false. Etsuro Fujita, reviewed by Fujii Masao and David Zhang. Discussion: http://postgr.es/m/CAPmGK17dAZCXvwnfpr1eTfknTGdt%3DhYTV9405Gt5SqPOX8K84w%40mail.gmail.com --- contrib/postgres_fdw/connection.c | 223 ++++++++++++++++-- .../postgres_fdw/expected/postgres_fdw.out | 78 +++++- contrib/postgres_fdw/option.c | 2 + contrib/postgres_fdw/sql/postgres_fdw.sql | 46 ++++ doc/src/sgml/postgres-fdw.sgml | 46 ++++ 5 files changed, 376 insertions(+), 19 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index f753c6e2324..8c64d42dda2 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -58,6 +58,7 @@ typedef struct ConnCacheEntry bool have_prep_stmt; /* have we prepared any stmts in this xact? */ bool have_error; /* have any subxacts aborted in this xact? */ bool changing_xact_state; /* xact state change in process */ + bool parallel_commit; /* do we commit (sub)xacts in parallel? 
*/ bool invalidated; /* true if reconnect is pending */ bool keep_connections; /* setting value of keep_connections * server option */ @@ -92,6 +93,9 @@ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); static void disconnect_pg_server(ConnCacheEntry *entry); static void check_conn_params(const char **keywords, const char **values, UserMapping *user); static void configure_remote_session(PGconn *conn); +static void do_sql_command_begin(PGconn *conn, const char *sql); +static void do_sql_command_end(PGconn *conn, const char *sql, + bool consume_input); static void begin_remote_xact(ConnCacheEntry *entry); static void pgfdw_xact_callback(XactEvent event, void *arg); static void pgfdw_subxact_callback(SubXactEvent event, @@ -100,6 +104,7 @@ static void pgfdw_subxact_callback(SubXactEvent event, void *arg); static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue); static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry); +static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel); static bool pgfdw_cancel_query(PGconn *conn); static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors); @@ -107,6 +112,9 @@ static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out); static void pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, bool toplevel); +static void pgfdw_finish_pre_commit_cleanup(List *pending_entries); +static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, + int curlevel); static bool UserMappingPasswordRequired(UserMapping *user); static bool disconnect_cached_connections(Oid serverid); @@ -316,14 +324,20 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) * is changed will be closed and re-made later. * * By default, all the connections to any foreign servers are kept open. 
+ * + * Also determine whether to commit (sub)transactions opened on the remote + * server in parallel at (sub)transaction end. */ entry->keep_connections = true; + entry->parallel_commit = false; foreach(lc, server->options) { DefElem *def = (DefElem *) lfirst(lc); if (strcmp(def->defname, "keep_connections") == 0) entry->keep_connections = defGetBoolean(def); + else if (strcmp(def->defname, "parallel_commit") == 0) + entry->parallel_commit = defGetBoolean(def); } /* Now try to make the connection */ @@ -622,10 +636,30 @@ configure_remote_session(PGconn *conn) */ void do_sql_command(PGconn *conn, const char *sql) +{ + do_sql_command_begin(conn, sql); + do_sql_command_end(conn, sql, false); +} + +static void +do_sql_command_begin(PGconn *conn, const char *sql) +{ + if (!PQsendQuery(conn, sql)) + pgfdw_report_error(ERROR, NULL, conn, false, sql); +} + +static void +do_sql_command_end(PGconn *conn, const char *sql, bool consume_input) { PGresult *res; - if (!PQsendQuery(conn, sql)) + /* + * If requested, consume whatever data is available from the socket. + * (Note that if all data is available, this allows pgfdw_get_result to + * call PQgetResult without forcing the overhead of WaitLatchOrSocket, + * which would be large compared to the overhead of PQconsumeInput.) + */ + if (consume_input && !PQconsumeInput(conn)) pgfdw_report_error(ERROR, NULL, conn, false, sql); res = pgfdw_get_result(conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) @@ -888,6 +922,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) { HASH_SEQ_STATUS scan; ConnCacheEntry *entry; + List *pending_entries = NIL; /* Quick exit if no connections were touched in this transaction. 
*/ if (!xact_got_connection) @@ -925,6 +960,12 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* Commit all remote transactions during pre-commit */ entry->changing_xact_state = true; + if (entry->parallel_commit) + { + do_sql_command_begin(entry->conn, "COMMIT TRANSACTION"); + pending_entries = lappend(pending_entries, entry); + continue; + } do_sql_command(entry->conn, "COMMIT TRANSACTION"); entry->changing_xact_state = false; @@ -981,23 +1022,15 @@ pgfdw_xact_callback(XactEvent event, void *arg) } /* Reset state to show we're out of a transaction */ - entry->xact_depth = 0; + pgfdw_reset_xact_state(entry, true); + } - /* - * If the connection isn't in a good idle state, it is marked as - * invalid or keep_connections option of its server is disabled, then - * discard it to recover. Next GetConnection will open a new - * connection. - */ - if (PQstatus(entry->conn) != CONNECTION_OK || - PQtransactionStatus(entry->conn) != PQTRANS_IDLE || - entry->changing_xact_state || - entry->invalidated || - !entry->keep_connections) - { - elog(DEBUG3, "discarding connection %p", entry->conn); - disconnect_pg_server(entry); - } + /* If there are any pending connections, finish cleaning them up */ + if (pending_entries) + { + Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT || + event == XACT_EVENT_PRE_COMMIT); + pgfdw_finish_pre_commit_cleanup(pending_entries); } /* @@ -1021,6 +1054,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, HASH_SEQ_STATUS scan; ConnCacheEntry *entry; int curlevel; + List *pending_entries = NIL; /* Nothing to do at subxact start, nor after commit. 
*/ if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB || @@ -1063,6 +1097,12 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, /* Commit all remote subtransactions during pre-commit */ snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel); entry->changing_xact_state = true; + if (entry->parallel_commit) + { + do_sql_command_begin(entry->conn, sql); + pending_entries = lappend(pending_entries, entry); + continue; + } do_sql_command(entry->conn, sql); entry->changing_xact_state = false; } @@ -1076,7 +1116,14 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, } /* OK, we're outta that level of subtransaction */ - entry->xact_depth--; + pgfdw_reset_xact_state(entry, false); + } + + /* If there are any pending connections, finish cleaning them up */ + if (pending_entries) + { + Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB); + pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel); } } @@ -1169,6 +1216,40 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry) server->servername))); } +/* + * Reset state to show we're out of a (sub)transaction. + */ +static void +pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel) +{ + if (toplevel) + { + /* Reset state to show we're out of a transaction */ + entry->xact_depth = 0; + + /* + * If the connection isn't in a good idle state, it is marked as + * invalid or keep_connections option of its server is disabled, then + * discard it to recover. Next GetConnection will open a new + * connection. 
+ */ + if (PQstatus(entry->conn) != CONNECTION_OK || + PQtransactionStatus(entry->conn) != PQTRANS_IDLE || + entry->changing_xact_state || + entry->invalidated || + !entry->keep_connections) + { + elog(DEBUG3, "discarding connection %p", entry->conn); + disconnect_pg_server(entry); + } + } + else + { + /* Reset state to show we're out of a subtransaction */ + entry->xact_depth--; + } +} + /* * Cancel the currently-in-progress query (whose query text we do not have) * and ignore the result. Returns true if we successfully cancel the query @@ -1456,6 +1537,112 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, bool toplevel) entry->changing_xact_state = false; } +/* + * Finish pre-commit cleanup of connections on each of which we've sent a + * COMMIT command to the remote server. + */ +static void +pgfdw_finish_pre_commit_cleanup(List *pending_entries) +{ + ConnCacheEntry *entry; + List *pending_deallocs = NIL; + ListCell *lc; + + Assert(pending_entries); + + /* + * Get the result of the COMMIT command for each of the pending entries + */ + foreach(lc, pending_entries) + { + entry = (ConnCacheEntry *) lfirst(lc); + + Assert(entry->changing_xact_state); + /* + * We might already have received the result on the socket, so pass + * consume_input=true to try to consume it first + */ + do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true); + entry->changing_xact_state = false; + + /* Do a DEALLOCATE ALL in parallel if needed */ + if (entry->have_prep_stmt && entry->have_error) + { + /* Ignore errors (see notes in pgfdw_xact_callback) */ + if (PQsendQuery(entry->conn, "DEALLOCATE ALL")) + { + pending_deallocs = lappend(pending_deallocs, entry); + continue; + } + } + entry->have_prep_stmt = false; + entry->have_error = false; + + pgfdw_reset_xact_state(entry, true); + } + + /* No further work if no pending entries */ + if (!pending_deallocs) + return; + + /* + * Get the result of the DEALLOCATE command for each of the pending + * entries + */ + foreach(lc, 
pending_deallocs) + { + PGresult *res; + + entry = (ConnCacheEntry *) lfirst(lc); + + /* Ignore errors (see notes in pgfdw_xact_callback) */ + while ((res = PQgetResult(entry->conn)) != NULL) + { + PQclear(res); + /* Stop if the connection is lost (else we'll loop infinitely) */ + if (PQstatus(entry->conn) == CONNECTION_BAD) + break; + } + entry->have_prep_stmt = false; + entry->have_error = false; + + pgfdw_reset_xact_state(entry, true); + } +} + +/* + * Finish pre-subcommit cleanup of connections on each of which we've sent a + * RELEASE command to the remote server. + */ +static void +pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel) +{ + ConnCacheEntry *entry; + char sql[100]; + ListCell *lc; + + Assert(pending_entries); + + /* + * Get the result of the RELEASE command for each of the pending entries + */ + snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel); + foreach(lc, pending_entries) + { + entry = (ConnCacheEntry *) lfirst(lc); + + Assert(entry->changing_xact_state); + /* + * We might already have received the result on the socket, so pass + * consume_input=true to try to consume it first + */ + do_sql_command_end(entry->conn, sql, true); + entry->changing_xact_state = false; + + pgfdw_reset_xact_state(entry, false); + } +} + /* * List active foreign server connections. 
* diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 057342083c6..f210f911880 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -9509,7 +9509,7 @@ DO $d$ END; $d$; ERROR: invalid option "password" -HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, sslsni, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, truncatable, fetch_size, batch_size, async_capable, keep_connections +HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, sslsni, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, truncatable, fetch_size, batch_size, async_capable, parallel_commit, keep_connections CONTEXT: SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')" PL/pgSQL function inline_code_block line 3 at EXECUTE -- If we add a password for our user mapping instead, we should get a different @@ -10933,3 +10933,79 @@ SELECT pg_terminate_backend(pid, 180000) FROM pg_stat_activity --Clean up RESET postgres_fdw.application_name; RESET debug_discard_caches; +-- =================================================================== +-- test parallel commit +-- 
=================================================================== +ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true'); +ALTER SERVER loopback2 OPTIONS (ADD parallel_commit 'true'); +CREATE TABLE ploc1 (f1 int, f2 text); +CREATE FOREIGN TABLE prem1 (f1 int, f2 text) + SERVER loopback OPTIONS (table_name 'ploc1'); +CREATE TABLE ploc2 (f1 int, f2 text); +CREATE FOREIGN TABLE prem2 (f1 int, f2 text) + SERVER loopback2 OPTIONS (table_name 'ploc2'); +BEGIN; +INSERT INTO prem1 VALUES (101, 'foo'); +INSERT INTO prem2 VALUES (201, 'bar'); +COMMIT; +SELECT * FROM prem1; + f1 | f2 +-----+----- + 101 | foo +(1 row) + +SELECT * FROM prem2; + f1 | f2 +-----+----- + 201 | bar +(1 row) + +BEGIN; +SAVEPOINT s; +INSERT INTO prem1 VALUES (102, 'foofoo'); +INSERT INTO prem2 VALUES (202, 'barbar'); +RELEASE SAVEPOINT s; +COMMIT; +SELECT * FROM prem1; + f1 | f2 +-----+-------- + 101 | foo + 102 | foofoo +(2 rows) + +SELECT * FROM prem2; + f1 | f2 +-----+-------- + 201 | bar + 202 | barbar +(2 rows) + +-- This tests executing DEALLOCATE ALL against foreign servers in parallel +-- during pre-commit +BEGIN; +SAVEPOINT s; +INSERT INTO prem1 VALUES (103, 'baz'); +INSERT INTO prem2 VALUES (203, 'qux'); +ROLLBACK TO SAVEPOINT s; +RELEASE SAVEPOINT s; +INSERT INTO prem1 VALUES (104, 'bazbaz'); +INSERT INTO prem2 VALUES (204, 'quxqux'); +COMMIT; +SELECT * FROM prem1; + f1 | f2 +-----+-------- + 101 | foo + 102 | foofoo + 104 | bazbaz +(3 rows) + +SELECT * FROM prem2; + f1 | f2 +-----+-------- + 201 | bar + 202 | barbar + 204 | quxqux +(3 rows) + +ALTER SERVER loopback OPTIONS (DROP parallel_commit); +ALTER SERVER loopback2 OPTIONS (DROP parallel_commit); diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c index 2c6b2894b96..572591a558d 100644 --- a/contrib/postgres_fdw/option.c +++ b/contrib/postgres_fdw/option.c @@ -121,6 +121,7 @@ postgres_fdw_validator(PG_FUNCTION_ARGS) strcmp(def->defname, "updatable") == 0 || strcmp(def->defname, "truncatable") == 0 || 
strcmp(def->defname, "async_capable") == 0 || + strcmp(def->defname, "parallel_commit") == 0 || strcmp(def->defname, "keep_connections") == 0) { /* these accept only boolean values */ @@ -249,6 +250,7 @@ InitPgFdwOptions(void) /* async_capable is available on both server and table */ {"async_capable", ForeignServerRelationId, false}, {"async_capable", ForeignTableRelationId, false}, + {"parallel_commit", ForeignServerRelationId, false}, {"keep_connections", ForeignServerRelationId, false}, {"password_required", UserMappingRelationId, false}, diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 6c9f579c41d..95b6b7192e6 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -3515,3 +3515,49 @@ SELECT pg_terminate_backend(pid, 180000) FROM pg_stat_activity --Clean up RESET postgres_fdw.application_name; RESET debug_discard_caches; + +-- =================================================================== +-- test parallel commit +-- =================================================================== +ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true'); +ALTER SERVER loopback2 OPTIONS (ADD parallel_commit 'true'); + +CREATE TABLE ploc1 (f1 int, f2 text); +CREATE FOREIGN TABLE prem1 (f1 int, f2 text) + SERVER loopback OPTIONS (table_name 'ploc1'); +CREATE TABLE ploc2 (f1 int, f2 text); +CREATE FOREIGN TABLE prem2 (f1 int, f2 text) + SERVER loopback2 OPTIONS (table_name 'ploc2'); + +BEGIN; +INSERT INTO prem1 VALUES (101, 'foo'); +INSERT INTO prem2 VALUES (201, 'bar'); +COMMIT; +SELECT * FROM prem1; +SELECT * FROM prem2; + +BEGIN; +SAVEPOINT s; +INSERT INTO prem1 VALUES (102, 'foofoo'); +INSERT INTO prem2 VALUES (202, 'barbar'); +RELEASE SAVEPOINT s; +COMMIT; +SELECT * FROM prem1; +SELECT * FROM prem2; + +-- This tests executing DEALLOCATE ALL against foreign servers in parallel +-- during pre-commit +BEGIN; +SAVEPOINT s; +INSERT INTO prem1 VALUES (103, 'baz'); 
+INSERT INTO prem2 VALUES (203, 'qux'); +ROLLBACK TO SAVEPOINT s; +RELEASE SAVEPOINT s; +INSERT INTO prem1 VALUES (104, 'bazbaz'); +INSERT INTO prem2 VALUES (204, 'quxqux'); +COMMIT; +SELECT * FROM prem1; +SELECT * FROM prem2; + +ALTER SERVER loopback OPTIONS (DROP parallel_commit); +ALTER SERVER loopback2 OPTIONS (DROP parallel_commit); diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml index dc57fe4b0d1..8ebf0dc3a05 100644 --- a/doc/src/sgml/postgres-fdw.sgml +++ b/doc/src/sgml/postgres-fdw.sgml @@ -456,6 +456,52 @@ OPTIONS (ADD password_required 'false'); + <sect3> + <title>Transaction Management Options</title> + + <para> + When multiple remote (sub)transactions are involved in a local + (sub)transaction, by default <filename>postgres_fdw</filename> commits + those remote (sub)transactions one by one when the local (sub)transaction + commits. + Performance can be improved with the following option: + </para> + + <variablelist> + + <varlistentry> + <term><literal>parallel_commit</literal> (<type>boolean</type>)</term> + <listitem> + <para> + This option controls whether <filename>postgres_fdw</filename> commits + remote (sub)transactions opened on a foreign server in a local + (sub)transaction in parallel when the local (sub)transaction commits. + This option can only be specified for foreign servers, not per-table. + The default is <literal>false</literal>. + </para> + + <para> + If multiple foreign servers with this option enabled are involved in + a local (sub)transaction, multiple remote (sub)transactions opened on + those foreign servers in the local (sub)transaction are committed in + parallel across those foreign servers when the local (sub)transaction + commits. + </para> + + <para> + For a foreign server with this option enabled, if many remote + (sub)transactions are opened on the foreign server in a local + (sub)transaction, this option might increase the remote server's load + when the local (sub)transaction commits, so be careful when using this + option. + </para> + </listitem> + </varlistentry> + + </variablelist> + + </sect3> + Updatability Options