diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 8eb9194506c..2969351e9a9 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -60,6 +60,7 @@ typedef struct ConnCacheEntry
bool have_error; /* have any subxacts aborted in this xact? */
bool changing_xact_state; /* xact state change in process */
bool parallel_commit; /* do we commit (sub)xacts in parallel? */
+ bool parallel_abort; /* do we abort (sub)xacts in parallel? */
bool invalidated; /* true if reconnect is pending */
bool keep_connections; /* setting value of keep_connections
* server option */
@@ -81,6 +82,25 @@ static unsigned int prep_stmt_number = 0;
/* tracks whether any work is needed in callback functions */
static bool xact_got_connection = false;
+/*
+ * Milliseconds to wait to cancel an in-progress query or execute a cleanup
+ * query; if it takes longer than 30 seconds to do these, we assume the
+ * connection is dead.
+ */
+#define CONNECTION_CLEANUP_TIMEOUT 30000
+
+/* Macro for constructing abort command to be sent */
+#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
+ do { \
+ if (toplevel) \
+ snprintf((sql), sizeof(sql), \
+ "ABORT TRANSACTION"); \
+ else \
+ snprintf((sql), sizeof(sql), \
+ "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
+ (entry)->xact_depth, (entry)->xact_depth); \
+ } while(0)
+
/*
* SQL functions
*/
@@ -107,14 +127,28 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_cancel_query(PGconn *conn);
+static bool pgfdw_cancel_query_begin(PGconn *conn);
+static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
+ bool consume_input);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool ignore_errors);
+static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
+static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
+ TimestampTz endtime,
+ bool consume_input,
+ bool ignore_errors);
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
PGresult **result, bool *timed_out);
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
+static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
+ List **pending_entries,
+ List **cancel_requested);
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
int curlevel);
+static void pgfdw_finish_abort_cleanup(List *pending_entries,
+ List *cancel_requested,
+ bool toplevel);
static bool UserMappingPasswordRequired(UserMapping *user);
static bool disconnect_cached_connections(Oid serverid);
@@ -320,8 +354,8 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
*
* By default, all the connections to any foreign servers are kept open.
*
- * Also determine whether to commit (sub)transactions opened on the remote
- * server in parallel at (sub)transaction end, which is disabled by
+ * Also determine whether to commit/abort (sub)transactions opened on the
+ * remote server in parallel at (sub)transaction end, which is disabled by
* default.
*
* Note: it's enough to determine these only when making a new connection
@@ -330,6 +364,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
*/
entry->keep_connections = true;
entry->parallel_commit = false;
+ entry->parallel_abort = false;
foreach(lc, server->options)
{
DefElem *def = (DefElem *) lfirst(lc);
@@ -338,6 +373,8 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
entry->keep_connections = defGetBoolean(def);
else if (strcmp(def->defname, "parallel_commit") == 0)
entry->parallel_commit = defGetBoolean(def);
+ else if (strcmp(def->defname, "parallel_abort") == 0)
+ entry->parallel_abort = defGetBoolean(def);
}
/* Now try to make the connection */
@@ -892,6 +929,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
HASH_SEQ_STATUS scan;
ConnCacheEntry *entry;
List *pending_entries = NIL;
+ List *cancel_requested = NIL;
/* Quick exit if no connections were touched in this transaction. */
if (!xact_got_connection)
@@ -985,7 +1023,15 @@ pgfdw_xact_callback(XactEvent event, void *arg)
case XACT_EVENT_PARALLEL_ABORT:
case XACT_EVENT_ABORT:
/* Rollback all remote transactions during abort */
- pgfdw_abort_cleanup(entry, true);
+ if (entry->parallel_abort)
+ {
+ if (pgfdw_abort_cleanup_begin(entry, true,
+ &pending_entries,
+ &cancel_requested))
+ continue;
+ }
+ else
+ pgfdw_abort_cleanup(entry, true);
break;
}
}
@@ -995,11 +1041,21 @@ pgfdw_xact_callback(XactEvent event, void *arg)
}
/* If there are any pending connections, finish cleaning them up */
- if (pending_entries)
+ if (pending_entries || cancel_requested)
{
- Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
- event == XACT_EVENT_PRE_COMMIT);
- pgfdw_finish_pre_commit_cleanup(pending_entries);
+ if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
+ event == XACT_EVENT_PRE_COMMIT)
+ {
+ Assert(cancel_requested == NIL);
+ pgfdw_finish_pre_commit_cleanup(pending_entries);
+ }
+ else
+ {
+ Assert(event == XACT_EVENT_PARALLEL_ABORT ||
+ event == XACT_EVENT_ABORT);
+ pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
+ true);
+ }
}
/*
@@ -1024,6 +1080,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
ConnCacheEntry *entry;
int curlevel;
List *pending_entries = NIL;
+ List *cancel_requested = NIL;
/* Nothing to do at subxact start, nor after commit. */
if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
@@ -1078,7 +1135,15 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
else
{
/* Rollback all remote subtransactions during abort */
- pgfdw_abort_cleanup(entry, false);
+ if (entry->parallel_abort)
+ {
+ if (pgfdw_abort_cleanup_begin(entry, false,
+ &pending_entries,
+ &cancel_requested))
+ continue;
+ }
+ else
+ pgfdw_abort_cleanup(entry, false);
}
/* OK, we're outta that level of subtransaction */
@@ -1086,10 +1151,19 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
}
/* If there are any pending connections, finish cleaning them up */
- if (pending_entries)
+ if (pending_entries || cancel_requested)
{
- Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB);
- pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
+ if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
+ {
+ Assert(cancel_requested == NIL);
+ pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
+ }
+ else
+ {
+ Assert(event == SUBXACT_EVENT_ABORT_SUB);
+ pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
+ false);
+ }
}
}
@@ -1233,17 +1307,25 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
static bool
pgfdw_cancel_query(PGconn *conn)
{
- PGcancel *cancel;
- char errbuf[256];
- PGresult *result = NULL;
TimestampTz endtime;
- bool timed_out;
/*
* If it takes too long to cancel the query and discard the result, assume
* the connection is dead.
*/
- endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+
+ if (!pgfdw_cancel_query_begin(conn))
+ return false;
+ return pgfdw_cancel_query_end(conn, endtime, false);
+}
+
+static bool
+pgfdw_cancel_query_begin(PGconn *conn)
+{
+ PGcancel *cancel;
+ char errbuf[256];
/*
* Issue cancel request. Unfortunately, there's no good way to limit the
@@ -1263,6 +1345,30 @@ pgfdw_cancel_query(PGconn *conn)
PQfreeCancel(cancel);
}
+ return true;
+}
+
+static bool
+pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
+{
+ PGresult *result = NULL;
+ bool timed_out;
+
+ /*
+ * If requested, consume whatever data is available from the socket. (Note
+ * that if all data is available, this allows pgfdw_get_cleanup_result to
+ * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
+ * which would be large compared to the overhead of PQconsumeInput.)
+ */
+ if (consume_input && !PQconsumeInput(conn))
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not get result of cancel request: %s",
+ pchomp(PQerrorMessage(conn)))));
+ return false;
+ }
+
/* Get and discard the result of the query. */
if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
{
@@ -1297,9 +1403,7 @@ pgfdw_cancel_query(PGconn *conn)
static bool
pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
{
- PGresult *result = NULL;
TimestampTz endtime;
- bool timed_out;
/*
* If it takes too long to execute a cleanup query, assume the connection
@@ -1307,8 +1411,18 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
* place (e.g. statement timeout, user cancel), so the timeout shouldn't
* be too long.
*/
- endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+ if (!pgfdw_exec_cleanup_query_begin(conn, query))
+ return false;
+ return pgfdw_exec_cleanup_query_end(conn, query, endtime,
+ false, ignore_errors);
+}
+
+static bool
+pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
+{
/*
* Submit a query. Since we don't use non-blocking mode, this also can
* block. But its risk is relatively small, so we ignore that for now.
@@ -1319,6 +1433,29 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
return false;
}
+ return true;
+}
+
+static bool
+pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
+ TimestampTz endtime, bool consume_input,
+ bool ignore_errors)
+{
+ PGresult *result = NULL;
+ bool timed_out;
+
+ /*
+ * If requested, consume whatever data is available from the socket. (Note
+ * that if all data is available, this allows pgfdw_get_cleanup_result to
+ * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
+ * which would be large compared to the overhead of PQconsumeInput.)
+ */
+ if (consume_input && !PQconsumeInput(conn))
+ {
+ pgfdw_report_error(WARNING, NULL, conn, false, query);
+ return false;
+ }
+
/* Get the result of the query. */
if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
{
@@ -1474,12 +1611,7 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
!pgfdw_cancel_query(entry->conn))
return; /* Unable to cancel running query */
- if (toplevel)
- snprintf(sql, sizeof(sql), "ABORT TRANSACTION");
- else
- snprintf(sql, sizeof(sql),
- "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
- entry->xact_depth, entry->xact_depth);
+ CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
return; /* Unable to abort remote (sub)transaction */
@@ -1508,6 +1640,65 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
entry->changing_xact_state = false;
}
+/*
+ * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
+ * don't wait for the result.
+ *
+ * Returns true if the abort command or cancel request is successfully issued,
+ * false otherwise. If the abort command is successfully issued, the given
+ * connection cache entry is appended to *pending_entries. Othewise, if the
+ * cancel request is successfully issued, it is appended to *cancel_requested.
+ */
+static bool
+pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
+ List **pending_entries, List **cancel_requested)
+{
+ /*
+ * Don't try to clean up the connection if we're already in error
+ * recursion trouble.
+ */
+ if (in_error_recursion_trouble())
+ entry->changing_xact_state = true;
+
+ /*
+ * If connection is already unsalvageable, don't touch it further.
+ */
+ if (entry->changing_xact_state)
+ return false;
+
+ /*
+ * Mark this connection as in the process of changing transaction state.
+ */
+ entry->changing_xact_state = true;
+
+ /* Assume we might have lost track of prepared statements */
+ entry->have_error = true;
+
+ /*
+ * If a command has been submitted to the remote server by using an
+ * asynchronous execution function, the command might not have yet
+ * completed. Check to see if a command is still being processed by the
+ * remote server, and if so, request cancellation of the command.
+ */
+ if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+ {
+ if (!pgfdw_cancel_query_begin(entry->conn))
+ return false; /* Unable to cancel running query */
+ *cancel_requested = lappend(*cancel_requested, entry);
+ }
+ else
+ {
+ char sql[100];
+
+ CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
+ if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
+ return false; /* Unable to abort remote transaction */
+ *pending_entries = lappend(*pending_entries, entry);
+ }
+
+ return true;
+}
+
/*
* Finish pre-commit cleanup of connections on each of which we've sent a
* COMMIT command to the remote server.
@@ -1616,6 +1807,168 @@ pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
}
}
+/*
+ * Finish abort cleanup of connections on each of which we've sent an abort
+ * command or cancel request to the remote server.
+ */
+static void
+pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
+ bool toplevel)
+{
+ List *pending_deallocs = NIL;
+ ListCell *lc;
+
+ /*
+ * For each of the pending cancel requests (if any), get and discard the
+ * result of the query, and submit an abort command to the remote server.
+ */
+ if (cancel_requested)
+ {
+ foreach(lc, cancel_requested)
+ {
+ ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
+ TimestampTz endtime;
+ char sql[100];
+
+ Assert(entry->changing_xact_state);
+
+ /*
+ * Set end time. You might think we should do this before issuing
+ * cancel request like in normal mode, but that is problematic,
+ * because if, for example, it took longer than 30 seconds to
+ * process the first few entries in the cancel_requested list, it
+ * would cause a timeout error when processing each of the
+ * remaining entries in the list, leading to slamming that entry's
+ * connection shut.
+ */
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+
+ if (!pgfdw_cancel_query_end(entry->conn, endtime, true))
+ {
+ /* Unable to cancel running query */
+ pgfdw_reset_xact_state(entry, toplevel);
+ continue;
+ }
+
+ /* Send an abort command in parallel if needed */
+ CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
+ if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
+ {
+ /* Unable to abort remote (sub)transaction */
+ pgfdw_reset_xact_state(entry, toplevel);
+ }
+ else
+ pending_entries = lappend(pending_entries, entry);
+ }
+ }
+
+ /* No further work if no pending entries */
+ if (!pending_entries)
+ return;
+
+ /*
+ * Get the result of the abort command for each of the pending entries
+ */
+ foreach(lc, pending_entries)
+ {
+ ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
+ TimestampTz endtime;
+ char sql[100];
+
+ Assert(entry->changing_xact_state);
+
+ /*
+ * Set end time. We do this now, not before issuing the command like
+ * in normal mode, for the same reason as for the cancel_requested
+ * entries.
+ */
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+
+ CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
+ if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
+ true, false))
+ {
+ /* Unable to abort remote (sub)transaction */
+ pgfdw_reset_xact_state(entry, toplevel);
+ continue;
+ }
+
+ if (toplevel)
+ {
+ /* Do a DEALLOCATE ALL in parallel if needed */
+ if (entry->have_prep_stmt && entry->have_error)
+ {
+ if (!pgfdw_exec_cleanup_query_begin(entry->conn,
+ "DEALLOCATE ALL"))
+ {
+ /* Trouble clearing prepared statements */
+ pgfdw_reset_xact_state(entry, toplevel);
+ }
+ else
+ pending_deallocs = lappend(pending_deallocs, entry);
+ continue;
+ }
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
+ }
+
+ /* Reset the per-connection state if needed */
+ if (entry->state.pendingAreq)
+ memset(&entry->state, 0, sizeof(entry->state));
+
+ /* We're done with this entry; unset the changing_xact_state flag */
+ entry->changing_xact_state = false;
+ pgfdw_reset_xact_state(entry, toplevel);
+ }
+
+ /* No further work if no pending entries */
+ if (!pending_deallocs)
+ return;
+ Assert(toplevel);
+
+ /*
+ * Get the result of the DEALLOCATE command for each of the pending
+ * entries
+ */
+ foreach(lc, pending_deallocs)
+ {
+ ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
+ TimestampTz endtime;
+
+ Assert(entry->changing_xact_state);
+ Assert(entry->have_prep_stmt);
+ Assert(entry->have_error);
+
+ /*
+ * Set end time. We do this now, not before issuing the command like
+ * in normal mode, for the same reason as for the cancel_requested
+ * entries.
+ */
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+ CONNECTION_CLEANUP_TIMEOUT);
+
+ if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
+ endtime, true, true))
+ {
+ /* Trouble clearing prepared statements */
+ pgfdw_reset_xact_state(entry, toplevel);
+ continue;
+ }
+ entry->have_prep_stmt = false;
+ entry->have_error = false;
+
+ /* Reset the per-connection state if needed */
+ if (entry->state.pendingAreq)
+ memset(&entry->state, 0, sizeof(entry->state));
+
+ /* We're done with this entry; unset the changing_xact_state flag */
+ entry->changing_xact_state = false;
+ pgfdw_reset_xact_state(entry, toplevel);
+ }
+}
+
/*
* List active foreign server connections.
*
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 04a3ef450cf..8f6a04f71b6 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -11683,10 +11683,12 @@ SELECT count(*) FROM remote_application_name
DROP FOREIGN TABLE remote_application_name;
DROP VIEW my_application_name;
-- ===================================================================
--- test parallel commit
+-- test parallel commit and parallel abort
-- ===================================================================
ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true');
+ALTER SERVER loopback OPTIONS (ADD parallel_abort 'true');
ALTER SERVER loopback2 OPTIONS (ADD parallel_commit 'true');
+ALTER SERVER loopback2 OPTIONS (ADD parallel_abort 'true');
CREATE TABLE ploc1 (f1 int, f2 text);
CREATE FOREIGN TABLE prem1 (f1 int, f2 text)
SERVER loopback OPTIONS (table_name 'ploc1');
@@ -11756,8 +11758,57 @@ SELECT * FROM prem2;
204 | quxqux
(3 rows)
+BEGIN;
+INSERT INTO prem1 VALUES (105, 'test1');
+INSERT INTO prem2 VALUES (205, 'test2');
+ABORT;
+SELECT * FROM prem1;
+ f1 | f2
+-----+--------
+ 101 | foo
+ 102 | foofoo
+ 104 | bazbaz
+(3 rows)
+
+SELECT * FROM prem2;
+ f1 | f2
+-----+--------
+ 201 | bar
+ 202 | barbar
+ 204 | quxqux
+(3 rows)
+
+-- This tests executing DEALLOCATE ALL against foreign servers in parallel
+-- during post-abort
+BEGIN;
+SAVEPOINT s;
+INSERT INTO prem1 VALUES (105, 'test1');
+INSERT INTO prem2 VALUES (205, 'test2');
+ROLLBACK TO SAVEPOINT s;
+RELEASE SAVEPOINT s;
+INSERT INTO prem1 VALUES (105, 'test1');
+INSERT INTO prem2 VALUES (205, 'test2');
+ABORT;
+SELECT * FROM prem1;
+ f1 | f2
+-----+--------
+ 101 | foo
+ 102 | foofoo
+ 104 | bazbaz
+(3 rows)
+
+SELECT * FROM prem2;
+ f1 | f2
+-----+--------
+ 201 | bar
+ 202 | barbar
+ 204 | quxqux
+(3 rows)
+
ALTER SERVER loopback OPTIONS (DROP parallel_commit);
+ALTER SERVER loopback OPTIONS (DROP parallel_abort);
ALTER SERVER loopback2 OPTIONS (DROP parallel_commit);
+ALTER SERVER loopback2 OPTIONS (DROP parallel_abort);
-- ===================================================================
-- test for ANALYZE sampling
-- ===================================================================
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index d530f7d0860..4229d2048c3 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -125,6 +125,7 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
strcmp(def->defname, "truncatable") == 0 ||
strcmp(def->defname, "async_capable") == 0 ||
strcmp(def->defname, "parallel_commit") == 0 ||
+ strcmp(def->defname, "parallel_abort") == 0 ||
strcmp(def->defname, "keep_connections") == 0)
{
/* these accept only boolean values */
@@ -271,6 +272,7 @@ InitPgFdwOptions(void)
{"async_capable", ForeignServerRelationId, false},
{"async_capable", ForeignTableRelationId, false},
{"parallel_commit", ForeignServerRelationId, false},
+ {"parallel_abort", ForeignServerRelationId, false},
{"keep_connections", ForeignServerRelationId, false},
{"password_required", UserMappingRelationId, false},
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 4f3088c03ea..5bd69339dfc 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -3899,10 +3899,12 @@ DROP FOREIGN TABLE remote_application_name;
DROP VIEW my_application_name;
-- ===================================================================
--- test parallel commit
+-- test parallel commit and parallel abort
-- ===================================================================
ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true');
+ALTER SERVER loopback OPTIONS (ADD parallel_abort 'true');
ALTER SERVER loopback2 OPTIONS (ADD parallel_commit 'true');
+ALTER SERVER loopback2 OPTIONS (ADD parallel_abort 'true');
CREATE TABLE ploc1 (f1 int, f2 text);
CREATE FOREIGN TABLE prem1 (f1 int, f2 text)
@@ -3941,8 +3943,31 @@ COMMIT;
SELECT * FROM prem1;
SELECT * FROM prem2;
+BEGIN;
+INSERT INTO prem1 VALUES (105, 'test1');
+INSERT INTO prem2 VALUES (205, 'test2');
+ABORT;
+SELECT * FROM prem1;
+SELECT * FROM prem2;
+
+-- This tests executing DEALLOCATE ALL against foreign servers in parallel
+-- during post-abort
+BEGIN;
+SAVEPOINT s;
+INSERT INTO prem1 VALUES (105, 'test1');
+INSERT INTO prem2 VALUES (205, 'test2');
+ROLLBACK TO SAVEPOINT s;
+RELEASE SAVEPOINT s;
+INSERT INTO prem1 VALUES (105, 'test1');
+INSERT INTO prem2 VALUES (205, 'test2');
+ABORT;
+SELECT * FROM prem1;
+SELECT * FROM prem2;
+
ALTER SERVER loopback OPTIONS (DROP parallel_commit);
+ALTER SERVER loopback OPTIONS (DROP parallel_abort);
ALTER SERVER loopback2 OPTIONS (DROP parallel_commit);
+ALTER SERVER loopback2 OPTIONS (DROP parallel_abort);
-- ===================================================================
-- test for ANALYZE sampling
diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml
index d43ea71407f..9e66987cf7f 100644
--- a/doc/src/sgml/postgres-fdw.sgml
+++ b/doc/src/sgml/postgres-fdw.sgml
@@ -510,12 +510,13 @@ OPTIONS (ADD password_required 'false');
corresponding remote transactions, and subtransactions are managed by
creating corresponding remote subtransactions. When multiple remote
transactions are involved in the current local transaction, by default
- postgres_fdw commits those remote transactions
- serially when the local transaction is committed. When multiple remote
- subtransactions are involved in the current local subtransaction, by
- default postgres_fdw commits those remote
- subtransactions serially when the local subtransaction is committed.
- Performance can be improved with the following option:
+ postgres_fdw commits or aborts those remote
+ transactions serially when the local transaction is committed or aborted.
+ When multiple remote subtransactions are involved in the current local
+ subtransaction, by default postgres_fdw commits or
+ aborts those remote subtransactions serially when the local subtransaction
+ is committed or abortd.
+ Performance can be improved with the following options:
@@ -531,24 +532,38 @@ OPTIONS (ADD password_required 'false');
specified for foreign servers, not per-table. The default is
false.
+
+
+
+ parallel_abort (boolean)
+
- If multiple foreign servers with this option enabled are involved in a
- local transaction, multiple remote transactions on those foreign
- servers are committed in parallel across those foreign servers when
- the local transaction is committed.
-
-
-
- When this option is enabled, a foreign server with many remote
- transactions may see a negative performance impact when the local
- transaction is committed.
+ This option controls whether postgres_fdw aborts
+ in parallel remote transactions opened on a foreign server in a local
+ transaction when the local transaction is aborted. This setting also
+ applies to remote and local subtransactions. This option can only be
+ specified for foreign servers, not per-table. The default is
+ false.
+
+ If multiple foreign servers with these options enabled are involved in a
+ local transaction, multiple remote transactions on those foreign servers
+ are committed or aborted in parallel across those foreign servers when
+ the local transaction is committed or aborted.
+
+
+
+ When these options are enabled, a foreign server with many remote
+ transactions may see a negative performance impact when the local
+ transaction is committed or aborted.
+
+