diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 1b691fb05e0..9818d2750f8 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -485,7 +485,7 @@ pgfdw_exec_query(PGconn *conn, const char *query) * * This function offers quick responsiveness by checking for any interruptions. * - * This function emulates the PQexec()'s behavior of returning the last result + * This function emulates PQexec()'s behavior of returning the last result * when there are many. * * Caller is responsible for the error handling on the result. @@ -493,40 +493,50 @@ pgfdw_exec_query(PGconn *conn, const char *query) PGresult * pgfdw_get_result(PGconn *conn, const char *query) { - PGresult *last_res = NULL; + PGresult *volatile last_res = NULL; - for (;;) + /* In what follows, do not leak any PGresults on an error. */ + PG_TRY(); { - PGresult *res; - - while (PQisBusy(conn)) + for (;;) { - int wc; + PGresult *res; - /* Sleep until there's something to do */ - wc = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_SOCKET_READABLE, - PQsocket(conn), - -1L, PG_WAIT_EXTENSION); - ResetLatch(MyLatch); - - CHECK_FOR_INTERRUPTS(); - - /* Data available in socket */ - if (wc & WL_SOCKET_READABLE) + while (PQisBusy(conn)) { - if (!PQconsumeInput(conn)) - pgfdw_report_error(ERROR, NULL, conn, false, query); + int wc; + + /* Sleep until there's something to do */ + wc = WaitLatchOrSocket(MyLatch, + WL_LATCH_SET | WL_SOCKET_READABLE, + PQsocket(conn), + -1L, PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + /* Data available in socket? */ + if (wc & WL_SOCKET_READABLE) + { + if (!PQconsumeInput(conn)) + pgfdw_report_error(ERROR, NULL, conn, false, query); + } } + + res = PQgetResult(conn); + if (res == NULL) + break; /* query is complete */ + + PQclear(last_res); + last_res = res; } - - res = PQgetResult(conn); - if (res == NULL) - break; /* query is complete */ - - PQclear(last_res); - last_res = res; } + PG_CATCH(); + { + PQclear(last_res); + PG_RE_THROW(); + } + PG_END_TRY(); return last_res; } @@ -1006,6 +1016,7 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors) pgfdw_report_error(WARNING, result, conn, true, query); return ignore_errors; } + PQclear(result); return true; } @@ -1028,56 +1039,75 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors) static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result) { - PGresult *last_res = NULL; + volatile bool timed_out = false; + PGresult *volatile last_res = NULL; - for (;;) + /* In what follows, do not leak any PGresults on an error. */ + PG_TRY(); { - PGresult *res; - - while (PQisBusy(conn)) + for (;;) { - int wc; - TimestampTz now = GetCurrentTimestamp(); - long secs; - int microsecs; - long cur_timeout; + PGresult *res; - /* If timeout has expired, give up, else get sleep time. */ - if (now >= endtime) - return true; - TimestampDifference(now, endtime, &secs, µsecs); - - /* To protect against clock skew, limit sleep to one minute. */ - cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs); - - /* Sleep until there's something to do */ - wc = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_SOCKET_READABLE | WL_TIMEOUT, - PQsocket(conn), - cur_timeout, PG_WAIT_EXTENSION); - ResetLatch(MyLatch); - - CHECK_FOR_INTERRUPTS(); - - /* Data available in socket */ - if (wc & WL_SOCKET_READABLE) + while (PQisBusy(conn)) { - if (!PQconsumeInput(conn)) + int wc; + TimestampTz now = GetCurrentTimestamp(); + long secs; + int microsecs; + long cur_timeout; + + /* If timeout has expired, give up, else get sleep time. */ + if (now >= endtime) { - *result = NULL; - return false; + timed_out = true; + goto exit; + } + TimestampDifference(now, endtime, &secs, µsecs); + + /* To protect against clock skew, limit sleep to one minute. */ + cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs); + + /* Sleep until there's something to do */ + wc = WaitLatchOrSocket(MyLatch, + WL_LATCH_SET | WL_SOCKET_READABLE | WL_TIMEOUT, + PQsocket(conn), + cur_timeout, PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + /* Data available in socket? */ + if (wc & WL_SOCKET_READABLE) + { + if (!PQconsumeInput(conn)) + { + /* connection trouble; treat the same as a timeout */ + timed_out = true; + goto exit; + } } } + + res = PQgetResult(conn); + if (res == NULL) + break; /* query is complete */ + + PQclear(last_res); + last_res = res; } - - res = PQgetResult(conn); - if (res == NULL) - break; /* query is complete */ - - PQclear(last_res); - last_res = res; +exit: ; } + PG_CATCH(); + { + PQclear(last_res); + PG_RE_THROW(); + } + PG_END_TRY(); - *result = last_res; - return false; + if (timed_out) + PQclear(last_res); + else + *result = last_res; + return timed_out; } diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 7509b4fe60a..f6fa0e4c16a 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -591,13 +591,19 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); } + + /* Consume whatever data is available from the socket */ if (PQconsumeInput(streamConn) == 0) - return NULL; /* trouble */ + { + /* trouble; drop whatever we had and return NULL */ + PQclear(lastResult); + return NULL; + } } /* - * Emulate the PQexec()'s behavior of returning the last result when - * there are many. We are fine with returning just last error message. + * Emulate PQexec()'s behavior of returning the last result when there + * are many. We are fine with returning just last error message. */ result = PQgetResult(streamConn); if (result == NULL)