diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 824d301aad8..aa018b7747e 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -48,6 +48,7 @@ #include "funcapi.h" #include "lib/stringinfo.h" #include "libpq-fe.h" +#include "libpq/libpq-be-fe-helpers.h" #include "mb/pg_wchar.h" #include "miscadmin.h" #include "parser/scansup.h" @@ -59,6 +60,7 @@ #include "utils/memutils.h" #include "utils/rel.h" #include "utils/varlena.h" +#include "utils/wait_event.h" PG_MODULE_MAGIC; @@ -478,7 +480,7 @@ dblink_open(PG_FUNCTION_ARGS) /* If we are not in a transaction, start one */ if (PQtransactionStatus(conn) == PQTRANS_IDLE) { - res = PQexec(conn, "BEGIN"); + res = libpqsrv_exec(conn, "BEGIN", PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_COMMAND_OK) dblink_res_internalerror(conn, res, "begin error"); PQclear(res); @@ -497,7 +499,7 @@ dblink_open(PG_FUNCTION_ARGS) (rconn->openCursorCount)++; appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql); - res = PQexec(conn, buf.data); + res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { dblink_res_error(conn, conname, res, fail, @@ -566,7 +568,7 @@ dblink_close(PG_FUNCTION_ARGS) appendStringInfo(&buf, "CLOSE %s", curname); /* close the cursor */ - res = PQexec(conn, buf.data); + res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { dblink_res_error(conn, conname, res, fail, @@ -586,7 +588,7 @@ dblink_close(PG_FUNCTION_ARGS) { rconn->newXactForCursor = false; - res = PQexec(conn, "COMMIT"); + res = libpqsrv_exec(conn, "COMMIT", PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_COMMAND_OK) dblink_res_internalerror(conn, res, "commit error"); PQclear(res); @@ -668,7 +670,7 @@ dblink_fetch(PG_FUNCTION_ARGS) * PGresult will be long-lived even though we are still in a short-lived * memory context. */ - res = PQexec(conn, buf.data); + res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) @@ -816,7 +818,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) else { /* async result retrieval, do it the old way */ - PGresult *res = PQgetResult(conn); + PGresult *res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION); /* NULL means we're all done with the async results */ if (res) @@ -1130,7 +1132,8 @@ materializeQueryResult(FunctionCallInfo fcinfo, PQclear(sinfo.last_res); PQclear(sinfo.cur_res); /* and clear out any pending data in libpq */ - while ((res = PQgetResult(conn)) != NULL) + while ((res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION)) != + NULL) PQclear(res); PG_RE_THROW(); } @@ -1157,7 +1160,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql) { CHECK_FOR_INTERRUPTS(); - sinfo->cur_res = PQgetResult(conn); + sinfo->cur_res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION); if (!sinfo->cur_res) break; @@ -1485,7 +1488,7 @@ dblink_exec(PG_FUNCTION_ARGS) if (!conn) dblink_conn_not_avail(conname); - res = PQexec(conn, sql); + res = libpqsrv_exec(conn, sql, PG_WAIT_EXTENSION); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) @@ -2771,8 +2774,8 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res, /* * If we don't get a message from the PGresult, try the PGconn. This is - * needed because for connection-level failures, PQexec may just return - * NULL, not a PGresult at all. + * needed because for connection-level failures, PQgetResult may just + * return NULL, not a PGresult at all. */ if (message_primary == NULL) message_primary = pchomp(PQerrorMessage(conn)); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 405afe7727b..99c5f9f8784 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -622,12 +622,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, * Send a query and wait for the results by using the asynchronous libpq * functions and socket readiness events. * - * We must not use the regular blocking libpq functions like PQexec() - * since they are uninterruptible by signals on some platforms, such as - * Windows. - * - * The function is modeled on PQexec() in libpq, but only implements - * those parts that are in use in the walreceiver api. + * The function is modeled on libpqsrv_exec(), with the behavior difference + * being that it calls ProcessWalRcvInterrupts(). As an optimization, it + * skips try/catch, since all errors terminate the process. * * May return NULL, rather than an error result, on failure. */ diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h index 41e3bb4376a..a4b3e805b9d 100644 --- a/src/include/libpq/libpq-be-fe-helpers.h +++ b/src/include/libpq/libpq-be-fe-helpers.h @@ -49,6 +49,8 @@ static inline void libpqsrv_connect_prepare(void); static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info); +static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info); +static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info); /* @@ -239,4 +241,129 @@ libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info) PG_END_TRY(); } +/* + * PQexec() wrapper that processes interrupts. + * + * Unless PQsetnonblocking(conn, 1) is in effect, this can't process + * interrupts while pushing the query text to the server. Consider that + * setting if query strings can be long relative to TCP buffer size. + * + * This has the preconditions of PQsendQuery(), not those of PQexec(). Most + * notably, PQexec() would silently discard any prior query results. + */ +static inline PGresult * +libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info) +{ + if (!PQsendQuery(conn, query)) + return NULL; + return libpqsrv_get_result_last(conn, wait_event_info); +} + +/* + * PQexecParams() wrapper that processes interrupts. + * + * See notes at libpqsrv_exec(). + */ +static inline PGresult * +libpqsrv_exec_params(PGconn *conn, + const char *command, + int nParams, + const Oid *paramTypes, + const char *const *paramValues, + const int *paramLengths, + const int *paramFormats, + int resultFormat, + uint32 wait_event_info) +{ + if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues, + paramLengths, paramFormats, resultFormat)) + return NULL; + return libpqsrv_get_result_last(conn, wait_event_info); +} + +/* + * Like PQexec(), loop over PQgetResult() until it returns NULL or another + * terminal state. Return the last non-NULL result or the terminal state. + */ +static inline PGresult * +libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info) +{ + PGresult *volatile lastResult = NULL; + + /* In what follows, do not leak any PGresults on an error. */ + PG_TRY(); + { + for (;;) + { + /* Wait for, and collect, the next PGresult. */ + PGresult *result; + + result = libpqsrv_get_result(conn, wait_event_info); + if (result == NULL) + break; /* query is complete, or failure */ + + /* + * Emulate PQexec()'s behavior of returning the last result when + * there are many. + */ + PQclear(lastResult); + lastResult = result; + + if (PQresultStatus(lastResult) == PGRES_COPY_IN || + PQresultStatus(lastResult) == PGRES_COPY_OUT || + PQresultStatus(lastResult) == PGRES_COPY_BOTH || + PQstatus(conn) == CONNECTION_BAD) + break; + } + } + PG_CATCH(); + { + PQclear(lastResult); + PG_RE_THROW(); + } + PG_END_TRY(); + + return lastResult; +} + +/* + * Perform the equivalent of PQgetResult(), but watch for interrupts. + */ +static inline PGresult * +libpqsrv_get_result(PGconn *conn, uint32 wait_event_info) +{ + /* + * Collect data until PQgetResult is ready to get the result without + * blocking. + */ + while (PQisBusy(conn)) + { + int rc; + + rc = WaitLatchOrSocket(MyLatch, + WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | + WL_SOCKET_READABLE, + PQsocket(conn), + 0, + wait_event_info); + + /* Interrupted? */ + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + /* Consume whatever data is available from the socket */ + if (PQconsumeInput(conn) == 0) + { + /* trouble; expect PQgetResult() to return NULL */ + break; + } + } + + /* Now we can collect and return the next PGresult */ + return PQgetResult(conn); +} + #endif /* LIBPQ_BE_FE_HELPERS_H */