1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-11 20:28:21 +03:00

Allow queries submitted by postgres_fdw to be canceled.

Back-patch of commits f039eaac71 and
1b812afb0e, which arranged (in 9.6+) to
make remote queries interruptible.  It was known at the time that the
same problem existed in the back-branches, but I did not back-patch
for lack of a user complaint.

Michael Paquier and Etsuro Fujita, adjusted for older branches by me.
Per gripe from Suraj Kharage.  This doesn't directly addresss Suraj's
gripe, but since the patch that will do so builds up on top of this
work, it seems best to back-patch this part first.

Discussion: http://postgr.es/m/CAF1DzPU8Kx+fMXEbFoP289xtm3bz3t+ZfxhmKavr98Bh-C0TqQ@mail.gmail.com
This commit is contained in:
Robert Haas
2017-05-06 22:21:37 -04:00
parent d617c7629c
commit cdf5a004bb
3 changed files with 194 additions and 42 deletions

View File

@ -17,6 +17,7 @@
#include "access/xact.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "storage/latch.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
@ -448,6 +449,78 @@ GetPrepStmtNumber(PGconn *conn)
return ++prep_stmt_number;
}
/*
* Submit a query and wait for the result.
*
* This function is interruptible by signals.
*
* Caller is responsible for the error handling on the result.
*/
PGresult *
pgfdw_exec_query(PGconn *conn, const char *query)
{
/*
* Submit a query. Since we don't use non-blocking mode, this also can
* block. But its risk is relatively small, so we ignore that for now.
*/
if (!PQsendQuery(conn, query))
pgfdw_report_error(ERROR, NULL, conn, false, query);
/* Wait for the result. */
return pgfdw_get_result(conn, query);
}
/*
* Wait for the result from a prior asynchronous execution function call.
*
* This function offers quick responsiveness by checking for any interruptions.
*
* This function emulates the PQexec()'s behavior of returning the last result
* when there are many.
*
* Caller is responsible for the error handling on the result.
*/
PGresult *
pgfdw_get_result(PGconn *conn, const char *query)
{
PGresult *last_res = NULL;
for (;;)
{
PGresult *res;
while (PQisBusy(conn))
{
int wc;
/* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE,
PQsocket(conn),
-1L);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket */
if (wc & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(conn))
pgfdw_report_error(ERROR, NULL, conn, false, query);
}
}
res = PQgetResult(conn);
if (res == NULL)
break; /* query is complete */
PQclear(last_res);
last_res = res;
}
return last_res;
}
/*
* Report an error we got from the remote server.
*
@ -599,6 +672,30 @@ pgfdw_xact_callback(XactEvent event, void *arg)
case XACT_EVENT_ABORT:
/* Assume we might have lost track of prepared statements */
entry->have_error = true;
/*
* If a command has been submitted to the remote server by
* using an asynchronous execution function, the command
* might not have yet completed. Check to see if a command
* is still being processed by the remote server, and if so,
* request cancellation of the command.
*/
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
{
PGcancel *cancel;
char errbuf[256];
if ((cancel = PQgetCancel(entry->conn)))
{
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
ereport(WARNING,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not send cancel request: %s",
errbuf)));
PQfreeCancel(cancel);
}
}
/* If we're aborting, abort all remote transactions too */
res = PQexec(entry->conn, "ABORT TRANSACTION");
/* Note: can't throw ERROR, it would be infinite loop */
@ -700,6 +797,30 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
{
/* Assume we might have lost track of prepared statements */
entry->have_error = true;
/*
* If a command has been submitted to the remote server by using an
* asynchronous execution function, the command might not have yet
* completed. Check to see if a command is still being processed by
* the remote server, and if so, request cancellation of the
* command.
*/
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
{
PGcancel *cancel;
char errbuf[256];
if ((cancel = PQgetCancel(entry->conn)))
{
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
ereport(WARNING,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not send cancel request: %s",
errbuf)));
PQfreeCancel(cancel);
}
}
/* Rollback all remote subtransactions during abort */
snprintf(sql, sizeof(sql),
"ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",