mirror of
https://github.com/postgres/postgres.git
synced 2025-07-27 12:41:57 +03:00
Reap the benefits of not having to avoid leaking PGresults.
Remove a bunch of PG_TRY constructs, de-volatilize related variables, remove some PQclear calls in error paths. Aside from making the code simpler and shorter, this should provide some marginal performance gains. For ease of review, I did not re-indent code within the removed PG_TRY constructs. That'll be done in a separate patch. Author: Tom Lane <tgl@sss.pgh.pa.us> Reviewed-by: Matheus Alcantara <matheusssilv97@gmail.com> Discussion: https://postgr.es/m/2976982.1748049023@sss.pgh.pa.us
This commit is contained in:
@ -101,8 +101,8 @@ static void materializeQueryResult(FunctionCallInfo fcinfo,
|
|||||||
const char *conname,
|
const char *conname,
|
||||||
const char *sql,
|
const char *sql,
|
||||||
bool fail);
|
bool fail);
|
||||||
static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
|
static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql);
|
||||||
static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
|
static void storeRow(storeInfo *sinfo, PGresult *res, bool first);
|
||||||
static remoteConn *getConnectionByName(const char *name);
|
static remoteConn *getConnectionByName(const char *name);
|
||||||
static HTAB *createConnHash(void);
|
static HTAB *createConnHash(void);
|
||||||
static remoteConn *createNewConnection(const char *name);
|
static remoteConn *createNewConnection(const char *name);
|
||||||
@ -169,14 +169,6 @@ typedef struct remoteConnHashEnt
|
|||||||
/* initial number of connection hashes */
|
/* initial number of connection hashes */
|
||||||
#define NUMCONN 16
|
#define NUMCONN 16
|
||||||
|
|
||||||
static char *
|
|
||||||
xpstrdup(const char *in)
|
|
||||||
{
|
|
||||||
if (in == NULL)
|
|
||||||
return NULL;
|
|
||||||
return pstrdup(in);
|
|
||||||
}
|
|
||||||
|
|
||||||
pg_noreturn static void
|
pg_noreturn static void
|
||||||
dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
|
dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
|
||||||
{
|
{
|
||||||
@ -870,17 +862,14 @@ static void
|
|||||||
materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
|
materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
|
||||||
{
|
{
|
||||||
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
||||||
|
TupleDesc tupdesc;
|
||||||
|
bool is_sql_cmd;
|
||||||
|
int ntuples;
|
||||||
|
int nfields;
|
||||||
|
|
||||||
/* prepTuplestoreResult must have been called previously */
|
/* prepTuplestoreResult must have been called previously */
|
||||||
Assert(rsinfo->returnMode == SFRM_Materialize);
|
Assert(rsinfo->returnMode == SFRM_Materialize);
|
||||||
|
|
||||||
PG_TRY();
|
|
||||||
{
|
|
||||||
TupleDesc tupdesc;
|
|
||||||
bool is_sql_cmd;
|
|
||||||
int ntuples;
|
|
||||||
int nfields;
|
|
||||||
|
|
||||||
if (PQresultStatus(res) == PGRES_COMMAND_OK)
|
if (PQresultStatus(res) == PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
is_sql_cmd = true;
|
is_sql_cmd = true;
|
||||||
@ -988,13 +977,8 @@ materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
|
|||||||
/* clean up GUC settings, if we changed any */
|
/* clean up GUC settings, if we changed any */
|
||||||
restoreLocalGucs(nestlevel);
|
restoreLocalGucs(nestlevel);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
PG_FINALLY();
|
|
||||||
{
|
|
||||||
/* be sure to release the libpq result */
|
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
}
|
|
||||||
PG_END_TRY();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -1013,16 +997,17 @@ materializeQueryResult(FunctionCallInfo fcinfo,
|
|||||||
bool fail)
|
bool fail)
|
||||||
{
|
{
|
||||||
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
||||||
PGresult *volatile res = NULL;
|
|
||||||
volatile storeInfo sinfo = {0};
|
|
||||||
|
|
||||||
/* prepTuplestoreResult must have been called previously */
|
/* prepTuplestoreResult must have been called previously */
|
||||||
Assert(rsinfo->returnMode == SFRM_Materialize);
|
Assert(rsinfo->returnMode == SFRM_Materialize);
|
||||||
|
|
||||||
sinfo.fcinfo = fcinfo;
|
/* Use a PG_TRY block to ensure we pump libpq dry of results */
|
||||||
|
|
||||||
PG_TRY();
|
PG_TRY();
|
||||||
{
|
{
|
||||||
|
storeInfo sinfo = {0};
|
||||||
|
PGresult *res;
|
||||||
|
|
||||||
|
sinfo.fcinfo = fcinfo;
|
||||||
/* Create short-lived memory context for data conversions */
|
/* Create short-lived memory context for data conversions */
|
||||||
sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
|
sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
|
||||||
"dblink temporary context",
|
"dblink temporary context",
|
||||||
@ -1035,14 +1020,7 @@ materializeQueryResult(FunctionCallInfo fcinfo,
|
|||||||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
|
(PQresultStatus(res) != PGRES_COMMAND_OK &&
|
||||||
PQresultStatus(res) != PGRES_TUPLES_OK))
|
PQresultStatus(res) != PGRES_TUPLES_OK))
|
||||||
{
|
{
|
||||||
/*
|
dblink_res_error(conn, conname, res, fail,
|
||||||
* dblink_res_error will clear the passed PGresult, so we need
|
|
||||||
* this ugly dance to avoid doing so twice during error exit
|
|
||||||
*/
|
|
||||||
PGresult *res1 = res;
|
|
||||||
|
|
||||||
res = NULL;
|
|
||||||
dblink_res_error(conn, conname, res1, fail,
|
|
||||||
"while executing query");
|
"while executing query");
|
||||||
/* if fail isn't set, we'll return an empty query result */
|
/* if fail isn't set, we'll return an empty query result */
|
||||||
}
|
}
|
||||||
@ -1081,7 +1059,6 @@ materializeQueryResult(FunctionCallInfo fcinfo,
|
|||||||
tuplestore_puttuple(tupstore, tuple);
|
tuplestore_puttuple(tupstore, tuple);
|
||||||
|
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
res = NULL;
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -1090,26 +1067,20 @@ materializeQueryResult(FunctionCallInfo fcinfo,
|
|||||||
Assert(rsinfo->setResult != NULL);
|
Assert(rsinfo->setResult != NULL);
|
||||||
|
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
res = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* clean up data conversion short-lived memory context */
|
/* clean up data conversion short-lived memory context */
|
||||||
if (sinfo.tmpcontext != NULL)
|
if (sinfo.tmpcontext != NULL)
|
||||||
MemoryContextDelete(sinfo.tmpcontext);
|
MemoryContextDelete(sinfo.tmpcontext);
|
||||||
sinfo.tmpcontext = NULL;
|
|
||||||
|
|
||||||
PQclear(sinfo.last_res);
|
PQclear(sinfo.last_res);
|
||||||
sinfo.last_res = NULL;
|
|
||||||
PQclear(sinfo.cur_res);
|
PQclear(sinfo.cur_res);
|
||||||
sinfo.cur_res = NULL;
|
|
||||||
}
|
}
|
||||||
PG_CATCH();
|
PG_CATCH();
|
||||||
{
|
{
|
||||||
/* be sure to release any libpq result we collected */
|
PGresult *res;
|
||||||
PQclear(res);
|
|
||||||
PQclear(sinfo.last_res);
|
/* be sure to clear out any pending data in libpq */
|
||||||
PQclear(sinfo.cur_res);
|
|
||||||
/* and clear out any pending data in libpq */
|
|
||||||
while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) !=
|
while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) !=
|
||||||
NULL)
|
NULL)
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
@ -1122,7 +1093,7 @@ materializeQueryResult(FunctionCallInfo fcinfo,
|
|||||||
* Execute query, and send any result rows to sinfo->tuplestore.
|
* Execute query, and send any result rows to sinfo->tuplestore.
|
||||||
*/
|
*/
|
||||||
static PGresult *
|
static PGresult *
|
||||||
storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
|
storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql)
|
||||||
{
|
{
|
||||||
bool first = true;
|
bool first = true;
|
||||||
int nestlevel = -1;
|
int nestlevel = -1;
|
||||||
@ -1190,7 +1161,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
|
|||||||
* (in this case the PGresult might contain either zero or one row).
|
* (in this case the PGresult might contain either zero or one row).
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
|
storeRow(storeInfo *sinfo, PGresult *res, bool first)
|
||||||
{
|
{
|
||||||
int nfields = PQnfields(res);
|
int nfields = PQnfields(res);
|
||||||
HeapTuple tuple;
|
HeapTuple tuple;
|
||||||
@ -2795,10 +2766,13 @@ dblink_connstr_check(const char *connstr)
|
|||||||
/*
|
/*
|
||||||
* Report an error received from the remote server
|
* Report an error received from the remote server
|
||||||
*
|
*
|
||||||
* res: the received error result (will be freed)
|
* res: the received error result
|
||||||
* fail: true for ERROR ereport, false for NOTICE
|
* fail: true for ERROR ereport, false for NOTICE
|
||||||
* fmt and following args: sprintf-style format and values for errcontext;
|
* fmt and following args: sprintf-style format and values for errcontext;
|
||||||
* the resulting string should be worded like "while <some action>"
|
* the resulting string should be worded like "while <some action>"
|
||||||
|
*
|
||||||
|
* If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
|
||||||
|
* in which case memory context cleanup will clear it eventually).
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
|
dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
|
||||||
@ -2806,15 +2780,11 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
|
|||||||
{
|
{
|
||||||
int level;
|
int level;
|
||||||
char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
|
char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
|
||||||
char *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
|
char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
|
||||||
char *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
|
char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
|
||||||
char *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
|
char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
|
||||||
char *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
|
char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
|
||||||
int sqlstate;
|
int sqlstate;
|
||||||
char *message_primary;
|
|
||||||
char *message_detail;
|
|
||||||
char *message_hint;
|
|
||||||
char *message_context;
|
|
||||||
va_list ap;
|
va_list ap;
|
||||||
char dblink_context_msg[512];
|
char dblink_context_msg[512];
|
||||||
|
|
||||||
@ -2832,11 +2802,6 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
|
|||||||
else
|
else
|
||||||
sqlstate = ERRCODE_CONNECTION_FAILURE;
|
sqlstate = ERRCODE_CONNECTION_FAILURE;
|
||||||
|
|
||||||
message_primary = xpstrdup(pg_diag_message_primary);
|
|
||||||
message_detail = xpstrdup(pg_diag_message_detail);
|
|
||||||
message_hint = xpstrdup(pg_diag_message_hint);
|
|
||||||
message_context = xpstrdup(pg_diag_context);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If we don't get a message from the PGresult, try the PGconn. This is
|
* If we don't get a message from the PGresult, try the PGconn. This is
|
||||||
* needed because for connection-level failures, PQgetResult may just
|
* needed because for connection-level failures, PQgetResult may just
|
||||||
@ -2845,14 +2810,6 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
|
|||||||
if (message_primary == NULL)
|
if (message_primary == NULL)
|
||||||
message_primary = pchomp(PQerrorMessage(conn));
|
message_primary = pchomp(PQerrorMessage(conn));
|
||||||
|
|
||||||
/*
|
|
||||||
* Now that we've copied all the data we need out of the PGresult, it's
|
|
||||||
* safe to free it. We must do this to avoid PGresult leakage. We're
|
|
||||||
* leaking all the strings too, but those are in palloc'd memory that will
|
|
||||||
* get cleaned up eventually.
|
|
||||||
*/
|
|
||||||
PQclear(res);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Format the basic errcontext string. Below, we'll add on something
|
* Format the basic errcontext string. Below, we'll add on something
|
||||||
* about the connection name. That's a violation of the translatability
|
* about the connection name. That's a violation of the translatability
|
||||||
@ -2877,6 +2834,7 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
|
|||||||
dblink_context_msg, conname)) :
|
dblink_context_msg, conname)) :
|
||||||
(errcontext("%s on unnamed dblink connection",
|
(errcontext("%s on unnamed dblink connection",
|
||||||
dblink_context_msg))));
|
dblink_context_msg))));
|
||||||
|
PQclear(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -815,7 +815,7 @@ static void
|
|||||||
do_sql_command_begin(PGconn *conn, const char *sql)
|
do_sql_command_begin(PGconn *conn, const char *sql)
|
||||||
{
|
{
|
||||||
if (!PQsendQuery(conn, sql))
|
if (!PQsendQuery(conn, sql))
|
||||||
pgfdw_report_error(ERROR, NULL, conn, false, sql);
|
pgfdw_report_error(ERROR, NULL, conn, sql);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@ -830,10 +830,10 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
|
|||||||
* would be large compared to the overhead of PQconsumeInput.)
|
* would be large compared to the overhead of PQconsumeInput.)
|
||||||
*/
|
*/
|
||||||
if (consume_input && !PQconsumeInput(conn))
|
if (consume_input && !PQconsumeInput(conn))
|
||||||
pgfdw_report_error(ERROR, NULL, conn, false, sql);
|
pgfdw_report_error(ERROR, NULL, conn, sql);
|
||||||
res = pgfdw_get_result(conn);
|
res = pgfdw_get_result(conn);
|
||||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
pgfdw_report_error(ERROR, res, conn, true, sql);
|
pgfdw_report_error(ERROR, res, conn, sql);
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -967,22 +967,21 @@ pgfdw_get_result(PGconn *conn)
|
|||||||
* Report an error we got from the remote server.
|
* Report an error we got from the remote server.
|
||||||
*
|
*
|
||||||
* elevel: error level to use (typically ERROR, but might be less)
|
* elevel: error level to use (typically ERROR, but might be less)
|
||||||
* res: PGresult containing the error
|
* res: PGresult containing the error (might be NULL)
|
||||||
* conn: connection we did the query on
|
* conn: connection we did the query on
|
||||||
* clear: if true, PQclear the result (otherwise caller will handle it)
|
|
||||||
* sql: NULL, or text of remote command we tried to execute
|
* sql: NULL, or text of remote command we tried to execute
|
||||||
*
|
*
|
||||||
|
* If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
|
||||||
|
* in which case memory context cleanup will clear it eventually).
|
||||||
|
*
|
||||||
* Note: callers that choose not to throw ERROR for a remote error are
|
* Note: callers that choose not to throw ERROR for a remote error are
|
||||||
* responsible for making sure that the associated ConnCacheEntry gets
|
* responsible for making sure that the associated ConnCacheEntry gets
|
||||||
* marked with have_error = true.
|
* marked with have_error = true.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
|
pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
|
||||||
bool clear, const char *sql)
|
const char *sql)
|
||||||
{
|
{
|
||||||
/* If requested, PGresult must be released before leaving this function. */
|
|
||||||
PG_TRY();
|
|
||||||
{
|
|
||||||
char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
|
char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
|
||||||
char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
|
char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
|
||||||
char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
|
char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
|
||||||
@ -1016,13 +1015,7 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
|
|||||||
message_hint ? errhint("%s", message_hint) : 0,
|
message_hint ? errhint("%s", message_hint) : 0,
|
||||||
message_context ? errcontext("%s", message_context) : 0,
|
message_context ? errcontext("%s", message_context) : 0,
|
||||||
sql ? errcontext("remote SQL command: %s", sql) : 0));
|
sql ? errcontext("remote SQL command: %s", sql) : 0));
|
||||||
}
|
|
||||||
PG_FINALLY();
|
|
||||||
{
|
|
||||||
if (clear)
|
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
}
|
|
||||||
PG_END_TRY();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -1545,7 +1538,7 @@ pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
|
|||||||
*/
|
*/
|
||||||
if (!PQsendQuery(conn, query))
|
if (!PQsendQuery(conn, query))
|
||||||
{
|
{
|
||||||
pgfdw_report_error(WARNING, NULL, conn, false, query);
|
pgfdw_report_error(WARNING, NULL, conn, query);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1570,7 +1563,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
|
|||||||
*/
|
*/
|
||||||
if (consume_input && !PQconsumeInput(conn))
|
if (consume_input && !PQconsumeInput(conn))
|
||||||
{
|
{
|
||||||
pgfdw_report_error(WARNING, NULL, conn, false, query);
|
pgfdw_report_error(WARNING, NULL, conn, query);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1582,7 +1575,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
|
|||||||
(errmsg("could not get query result due to timeout"),
|
(errmsg("could not get query result due to timeout"),
|
||||||
errcontext("remote SQL command: %s", query)));
|
errcontext("remote SQL command: %s", query)));
|
||||||
else
|
else
|
||||||
pgfdw_report_error(WARNING, NULL, conn, false, query);
|
pgfdw_report_error(WARNING, NULL, conn, query);
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -1590,7 +1583,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
|
|||||||
/* Issue a warning if not successful. */
|
/* Issue a warning if not successful. */
|
||||||
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||||
{
|
{
|
||||||
pgfdw_report_error(WARNING, result, conn, true, query);
|
pgfdw_report_error(WARNING, result, conn, query);
|
||||||
return ignore_errors;
|
return ignore_errors;
|
||||||
}
|
}
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
@ -1618,17 +1611,12 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
|
|||||||
PGresult **result,
|
PGresult **result,
|
||||||
bool *timed_out)
|
bool *timed_out)
|
||||||
{
|
{
|
||||||
volatile bool failed = false;
|
bool failed = false;
|
||||||
PGresult *volatile last_res = NULL;
|
PGresult *last_res = NULL;
|
||||||
|
int canceldelta = RETRY_CANCEL_TIMEOUT * 2;
|
||||||
|
|
||||||
*result = NULL;
|
*result = NULL;
|
||||||
*timed_out = false;
|
*timed_out = false;
|
||||||
|
|
||||||
/* In what follows, do not leak any PGresults on an error. */
|
|
||||||
PG_TRY();
|
|
||||||
{
|
|
||||||
int canceldelta = RETRY_CANCEL_TIMEOUT * 2;
|
|
||||||
|
|
||||||
for (;;)
|
for (;;)
|
||||||
{
|
{
|
||||||
PGresult *res;
|
PGresult *res;
|
||||||
@ -1706,15 +1694,7 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
|
|||||||
PQclear(last_res);
|
PQclear(last_res);
|
||||||
last_res = res;
|
last_res = res;
|
||||||
}
|
}
|
||||||
exit: ;
|
exit:
|
||||||
}
|
|
||||||
PG_CATCH();
|
|
||||||
{
|
|
||||||
PQclear(last_res);
|
|
||||||
PG_RE_THROW();
|
|
||||||
}
|
|
||||||
PG_END_TRY();
|
|
||||||
|
|
||||||
if (failed)
|
if (failed)
|
||||||
PQclear(last_res);
|
PQclear(last_res);
|
||||||
else
|
else
|
||||||
|
@ -1702,13 +1702,9 @@ postgresReScanForeignScan(ForeignScanState *node)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* We don't use a PG_TRY block here, so be careful not to throw error
|
|
||||||
* without releasing the PGresult.
|
|
||||||
*/
|
|
||||||
res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
|
res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
|
||||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
|
pgfdw_report_error(ERROR, res, fsstate->conn, sql);
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
|
|
||||||
/* Now force a fresh FETCH. */
|
/* Now force a fresh FETCH. */
|
||||||
@ -3608,11 +3604,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
|
|||||||
double *rows, int *width,
|
double *rows, int *width,
|
||||||
Cost *startup_cost, Cost *total_cost)
|
Cost *startup_cost, Cost *total_cost)
|
||||||
{
|
{
|
||||||
PGresult *volatile res = NULL;
|
PGresult *res;
|
||||||
|
|
||||||
/* PGresult must be released before leaving this function. */
|
|
||||||
PG_TRY();
|
|
||||||
{
|
|
||||||
char *line;
|
char *line;
|
||||||
char *p;
|
char *p;
|
||||||
int n;
|
int n;
|
||||||
@ -3622,7 +3614,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
|
|||||||
*/
|
*/
|
||||||
res = pgfdw_exec_query(conn, sql, NULL);
|
res = pgfdw_exec_query(conn, sql, NULL);
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
pgfdw_report_error(ERROR, res, conn, false, sql);
|
pgfdw_report_error(ERROR, res, conn, sql);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Extract cost numbers for topmost plan node. Note we search for a
|
* Extract cost numbers for topmost plan node. Note we search for a
|
||||||
@ -3637,12 +3629,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
|
|||||||
startup_cost, total_cost, rows, width);
|
startup_cost, total_cost, rows, width);
|
||||||
if (n != 4)
|
if (n != 4)
|
||||||
elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
|
elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
|
||||||
}
|
|
||||||
PG_FINALLY();
|
|
||||||
{
|
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
}
|
|
||||||
PG_END_TRY();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -3782,17 +3769,14 @@ create_cursor(ForeignScanState *node)
|
|||||||
*/
|
*/
|
||||||
if (!PQsendQueryParams(conn, buf.data, numParams,
|
if (!PQsendQueryParams(conn, buf.data, numParams,
|
||||||
NULL, values, NULL, NULL, 0))
|
NULL, values, NULL, NULL, 0))
|
||||||
pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
|
pgfdw_report_error(ERROR, NULL, conn, buf.data);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Get the result, and check for success.
|
* Get the result, and check for success.
|
||||||
*
|
|
||||||
* We don't use a PG_TRY block here, so be careful not to throw error
|
|
||||||
* without releasing the PGresult.
|
|
||||||
*/
|
*/
|
||||||
res = pgfdw_get_result(conn);
|
res = pgfdw_get_result(conn);
|
||||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
|
pgfdw_report_error(ERROR, res, conn, fsstate->query);
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
|
|
||||||
/* Mark the cursor as created, and show no tuples have been retrieved */
|
/* Mark the cursor as created, and show no tuples have been retrieved */
|
||||||
@ -3814,7 +3798,10 @@ static void
|
|||||||
fetch_more_data(ForeignScanState *node)
|
fetch_more_data(ForeignScanState *node)
|
||||||
{
|
{
|
||||||
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
|
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
|
||||||
PGresult *volatile res = NULL;
|
PGconn *conn = fsstate->conn;
|
||||||
|
PGresult *res;
|
||||||
|
int numrows;
|
||||||
|
int i;
|
||||||
MemoryContext oldcontext;
|
MemoryContext oldcontext;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -3825,13 +3812,6 @@ fetch_more_data(ForeignScanState *node)
|
|||||||
MemoryContextReset(fsstate->batch_cxt);
|
MemoryContextReset(fsstate->batch_cxt);
|
||||||
oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
|
oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
|
||||||
|
|
||||||
/* PGresult must be released before leaving this function. */
|
|
||||||
PG_TRY();
|
|
||||||
{
|
|
||||||
PGconn *conn = fsstate->conn;
|
|
||||||
int numrows;
|
|
||||||
int i;
|
|
||||||
|
|
||||||
if (fsstate->async_capable)
|
if (fsstate->async_capable)
|
||||||
{
|
{
|
||||||
Assert(fsstate->conn_state->pendingAreq);
|
Assert(fsstate->conn_state->pendingAreq);
|
||||||
@ -3843,7 +3823,7 @@ fetch_more_data(ForeignScanState *node)
|
|||||||
res = pgfdw_get_result(conn);
|
res = pgfdw_get_result(conn);
|
||||||
/* On error, report the original query, not the FETCH. */
|
/* On error, report the original query, not the FETCH. */
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
|
pgfdw_report_error(ERROR, res, conn, fsstate->query);
|
||||||
|
|
||||||
/* Reset per-connection state */
|
/* Reset per-connection state */
|
||||||
fsstate->conn_state->pendingAreq = NULL;
|
fsstate->conn_state->pendingAreq = NULL;
|
||||||
@ -3859,7 +3839,7 @@ fetch_more_data(ForeignScanState *node)
|
|||||||
res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
|
res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
|
||||||
/* On error, report the original query, not the FETCH. */
|
/* On error, report the original query, not the FETCH. */
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
|
pgfdw_report_error(ERROR, res, conn, fsstate->query);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Convert the data into HeapTuples */
|
/* Convert the data into HeapTuples */
|
||||||
@ -3887,12 +3867,8 @@ fetch_more_data(ForeignScanState *node)
|
|||||||
|
|
||||||
/* Must be EOF if we didn't get as many tuples as we asked for. */
|
/* Must be EOF if we didn't get as many tuples as we asked for. */
|
||||||
fsstate->eof_reached = (numrows < fsstate->fetch_size);
|
fsstate->eof_reached = (numrows < fsstate->fetch_size);
|
||||||
}
|
|
||||||
PG_FINALLY();
|
|
||||||
{
|
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
}
|
|
||||||
PG_END_TRY();
|
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldcontext);
|
MemoryContextSwitchTo(oldcontext);
|
||||||
}
|
}
|
||||||
@ -3966,14 +3942,9 @@ close_cursor(PGconn *conn, unsigned int cursor_number,
|
|||||||
PGresult *res;
|
PGresult *res;
|
||||||
|
|
||||||
snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
|
snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
|
||||||
|
|
||||||
/*
|
|
||||||
* We don't use a PG_TRY block here, so be careful not to throw error
|
|
||||||
* without releasing the PGresult.
|
|
||||||
*/
|
|
||||||
res = pgfdw_exec_query(conn, sql, conn_state);
|
res = pgfdw_exec_query(conn, sql, conn_state);
|
||||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
pgfdw_report_error(ERROR, res, conn, true, sql);
|
pgfdw_report_error(ERROR, res, conn, sql);
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -4181,18 +4152,15 @@ execute_foreign_modify(EState *estate,
|
|||||||
NULL,
|
NULL,
|
||||||
NULL,
|
NULL,
|
||||||
0))
|
0))
|
||||||
pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
|
pgfdw_report_error(ERROR, NULL, fmstate->conn, fmstate->query);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Get the result, and check for success.
|
* Get the result, and check for success.
|
||||||
*
|
|
||||||
* We don't use a PG_TRY block here, so be careful not to throw error
|
|
||||||
* without releasing the PGresult.
|
|
||||||
*/
|
*/
|
||||||
res = pgfdw_get_result(fmstate->conn);
|
res = pgfdw_get_result(fmstate->conn);
|
||||||
if (PQresultStatus(res) !=
|
if (PQresultStatus(res) !=
|
||||||
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
|
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
|
||||||
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
|
pgfdw_report_error(ERROR, res, fmstate->conn, fmstate->query);
|
||||||
|
|
||||||
/* Check number of rows affected, and fetch RETURNING tuple if any */
|
/* Check number of rows affected, and fetch RETURNING tuple if any */
|
||||||
if (fmstate->has_returning)
|
if (fmstate->has_returning)
|
||||||
@ -4251,17 +4219,14 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
|
|||||||
fmstate->query,
|
fmstate->query,
|
||||||
0,
|
0,
|
||||||
NULL))
|
NULL))
|
||||||
pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
|
pgfdw_report_error(ERROR, NULL, fmstate->conn, fmstate->query);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Get the result, and check for success.
|
* Get the result, and check for success.
|
||||||
*
|
|
||||||
* We don't use a PG_TRY block here, so be careful not to throw error
|
|
||||||
* without releasing the PGresult.
|
|
||||||
*/
|
*/
|
||||||
res = pgfdw_get_result(fmstate->conn);
|
res = pgfdw_get_result(fmstate->conn);
|
||||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
|
pgfdw_report_error(ERROR, res, fmstate->conn, fmstate->query);
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
|
|
||||||
/* This action shows that the prepare has been done. */
|
/* This action shows that the prepare has been done. */
|
||||||
@ -4352,16 +4317,11 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate,
|
|||||||
/*
|
/*
|
||||||
* store_returning_result
|
* store_returning_result
|
||||||
* Store the result of a RETURNING clause
|
* Store the result of a RETURNING clause
|
||||||
*
|
|
||||||
* On error, be sure to release the PGresult on the way out. Callers do not
|
|
||||||
* have PG_TRY blocks to ensure this happens.
|
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
store_returning_result(PgFdwModifyState *fmstate,
|
store_returning_result(PgFdwModifyState *fmstate,
|
||||||
TupleTableSlot *slot, PGresult *res)
|
TupleTableSlot *slot, PGresult *res)
|
||||||
{
|
{
|
||||||
PG_TRY();
|
|
||||||
{
|
|
||||||
HeapTuple newtup;
|
HeapTuple newtup;
|
||||||
|
|
||||||
newtup = make_tuple_from_result_row(res, 0,
|
newtup = make_tuple_from_result_row(res, 0,
|
||||||
@ -4376,13 +4336,6 @@ store_returning_result(PgFdwModifyState *fmstate,
|
|||||||
* heaptuples directly, so allow for conversion.
|
* heaptuples directly, so allow for conversion.
|
||||||
*/
|
*/
|
||||||
ExecForceStoreHeapTuple(newtup, slot, true);
|
ExecForceStoreHeapTuple(newtup, slot, true);
|
||||||
}
|
|
||||||
PG_CATCH();
|
|
||||||
{
|
|
||||||
PQclear(res);
|
|
||||||
PG_RE_THROW();
|
|
||||||
}
|
|
||||||
PG_END_TRY();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -4418,14 +4371,9 @@ deallocate_query(PgFdwModifyState *fmstate)
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
|
snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
|
||||||
|
|
||||||
/*
|
|
||||||
* We don't use a PG_TRY block here, so be careful not to throw error
|
|
||||||
* without releasing the PGresult.
|
|
||||||
*/
|
|
||||||
res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
|
res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
|
||||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
|
pgfdw_report_error(ERROR, res, fmstate->conn, sql);
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
pfree(fmstate->p_name);
|
pfree(fmstate->p_name);
|
||||||
fmstate->p_name = NULL;
|
fmstate->p_name = NULL;
|
||||||
@ -4593,7 +4541,7 @@ execute_dml_stmt(ForeignScanState *node)
|
|||||||
*/
|
*/
|
||||||
if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
|
if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
|
||||||
NULL, values, NULL, NULL, 0))
|
NULL, values, NULL, NULL, 0))
|
||||||
pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
|
pgfdw_report_error(ERROR, NULL, dmstate->conn, dmstate->query);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Get the result, and check for success.
|
* Get the result, and check for success.
|
||||||
@ -4601,7 +4549,7 @@ execute_dml_stmt(ForeignScanState *node)
|
|||||||
dmstate->result = pgfdw_get_result(dmstate->conn);
|
dmstate->result = pgfdw_get_result(dmstate->conn);
|
||||||
if (PQresultStatus(dmstate->result) !=
|
if (PQresultStatus(dmstate->result) !=
|
||||||
(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
|
(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
|
||||||
pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
|
pgfdw_report_error(ERROR, dmstate->result, dmstate->conn,
|
||||||
dmstate->query);
|
dmstate->query);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -4947,7 +4895,7 @@ postgresAnalyzeForeignTable(Relation relation,
|
|||||||
UserMapping *user;
|
UserMapping *user;
|
||||||
PGconn *conn;
|
PGconn *conn;
|
||||||
StringInfoData sql;
|
StringInfoData sql;
|
||||||
PGresult *volatile res = NULL;
|
PGresult *res;
|
||||||
|
|
||||||
/* Return the row-analysis function pointer */
|
/* Return the row-analysis function pointer */
|
||||||
*func = postgresAcquireSampleRowsFunc;
|
*func = postgresAcquireSampleRowsFunc;
|
||||||
@ -4973,22 +4921,14 @@ postgresAnalyzeForeignTable(Relation relation,
|
|||||||
initStringInfo(&sql);
|
initStringInfo(&sql);
|
||||||
deparseAnalyzeSizeSql(&sql, relation);
|
deparseAnalyzeSizeSql(&sql, relation);
|
||||||
|
|
||||||
/* In what follows, do not risk leaking any PGresults. */
|
|
||||||
PG_TRY();
|
|
||||||
{
|
|
||||||
res = pgfdw_exec_query(conn, sql.data, NULL);
|
res = pgfdw_exec_query(conn, sql.data, NULL);
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
pgfdw_report_error(ERROR, res, conn, false, sql.data);
|
pgfdw_report_error(ERROR, res, conn, sql.data);
|
||||||
|
|
||||||
if (PQntuples(res) != 1 || PQnfields(res) != 1)
|
if (PQntuples(res) != 1 || PQnfields(res) != 1)
|
||||||
elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
|
elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
|
||||||
*totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
|
*totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
|
||||||
}
|
|
||||||
PG_FINALLY();
|
|
||||||
{
|
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
}
|
|
||||||
PG_END_TRY();
|
|
||||||
|
|
||||||
ReleaseConnection(conn);
|
ReleaseConnection(conn);
|
||||||
|
|
||||||
@ -5009,9 +4949,9 @@ postgresGetAnalyzeInfoForForeignTable(Relation relation, bool *can_tablesample)
|
|||||||
UserMapping *user;
|
UserMapping *user;
|
||||||
PGconn *conn;
|
PGconn *conn;
|
||||||
StringInfoData sql;
|
StringInfoData sql;
|
||||||
PGresult *volatile res = NULL;
|
PGresult *res;
|
||||||
volatile double reltuples = -1;
|
double reltuples;
|
||||||
volatile char relkind = 0;
|
char relkind;
|
||||||
|
|
||||||
/* assume the remote relation does not support TABLESAMPLE */
|
/* assume the remote relation does not support TABLESAMPLE */
|
||||||
*can_tablesample = false;
|
*can_tablesample = false;
|
||||||
@ -5030,24 +4970,15 @@ postgresGetAnalyzeInfoForForeignTable(Relation relation, bool *can_tablesample)
|
|||||||
initStringInfo(&sql);
|
initStringInfo(&sql);
|
||||||
deparseAnalyzeInfoSql(&sql, relation);
|
deparseAnalyzeInfoSql(&sql, relation);
|
||||||
|
|
||||||
/* In what follows, do not risk leaking any PGresults. */
|
|
||||||
PG_TRY();
|
|
||||||
{
|
|
||||||
res = pgfdw_exec_query(conn, sql.data, NULL);
|
res = pgfdw_exec_query(conn, sql.data, NULL);
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
pgfdw_report_error(ERROR, res, conn, false, sql.data);
|
pgfdw_report_error(ERROR, res, conn, sql.data);
|
||||||
|
|
||||||
if (PQntuples(res) != 1 || PQnfields(res) != 2)
|
if (PQntuples(res) != 1 || PQnfields(res) != 2)
|
||||||
elog(ERROR, "unexpected result from deparseAnalyzeInfoSql query");
|
elog(ERROR, "unexpected result from deparseAnalyzeInfoSql query");
|
||||||
reltuples = strtod(PQgetvalue(res, 0, 0), NULL);
|
reltuples = strtod(PQgetvalue(res, 0, 0), NULL);
|
||||||
relkind = *(PQgetvalue(res, 0, 1));
|
relkind = *(PQgetvalue(res, 0, 1));
|
||||||
}
|
|
||||||
PG_FINALLY();
|
|
||||||
{
|
|
||||||
if (res)
|
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
}
|
|
||||||
PG_END_TRY();
|
|
||||||
|
|
||||||
ReleaseConnection(conn);
|
ReleaseConnection(conn);
|
||||||
|
|
||||||
@ -5090,7 +5021,9 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
|
|||||||
double reltuples;
|
double reltuples;
|
||||||
unsigned int cursor_number;
|
unsigned int cursor_number;
|
||||||
StringInfoData sql;
|
StringInfoData sql;
|
||||||
PGresult *volatile res = NULL;
|
PGresult *res;
|
||||||
|
char fetch_sql[64];
|
||||||
|
int fetch_size;
|
||||||
ListCell *lc;
|
ListCell *lc;
|
||||||
|
|
||||||
/* Initialize workspace state */
|
/* Initialize workspace state */
|
||||||
@ -5267,17 +5200,10 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
|
|||||||
|
|
||||||
deparseAnalyzeSql(&sql, relation, method, sample_frac, &astate.retrieved_attrs);
|
deparseAnalyzeSql(&sql, relation, method, sample_frac, &astate.retrieved_attrs);
|
||||||
|
|
||||||
/* In what follows, do not risk leaking any PGresults. */
|
|
||||||
PG_TRY();
|
|
||||||
{
|
|
||||||
char fetch_sql[64];
|
|
||||||
int fetch_size;
|
|
||||||
|
|
||||||
res = pgfdw_exec_query(conn, sql.data, NULL);
|
res = pgfdw_exec_query(conn, sql.data, NULL);
|
||||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||||
pgfdw_report_error(ERROR, res, conn, false, sql.data);
|
pgfdw_report_error(ERROR, res, conn, sql.data);
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
res = NULL;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Determine the fetch size. The default is arbitrary, but shouldn't
|
* Determine the fetch size. The default is arbitrary, but shouldn't
|
||||||
@ -5328,7 +5254,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
|
|||||||
res = pgfdw_exec_query(conn, fetch_sql, NULL);
|
res = pgfdw_exec_query(conn, fetch_sql, NULL);
|
||||||
/* On error, report the original query, not the FETCH. */
|
/* On error, report the original query, not the FETCH. */
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
pgfdw_report_error(ERROR, res, conn, false, sql.data);
|
pgfdw_report_error(ERROR, res, conn, sql.data);
|
||||||
|
|
||||||
/* Process whatever we got. */
|
/* Process whatever we got. */
|
||||||
numrows = PQntuples(res);
|
numrows = PQntuples(res);
|
||||||
@ -5336,7 +5262,6 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
|
|||||||
analyze_row_processor(res, i, &astate);
|
analyze_row_processor(res, i, &astate);
|
||||||
|
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
res = NULL;
|
|
||||||
|
|
||||||
/* Must be EOF if we didn't get all the rows requested. */
|
/* Must be EOF if we didn't get all the rows requested. */
|
||||||
if (numrows < fetch_size)
|
if (numrows < fetch_size)
|
||||||
@ -5345,13 +5270,6 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
|
|||||||
|
|
||||||
/* Close the cursor, just to be tidy. */
|
/* Close the cursor, just to be tidy. */
|
||||||
close_cursor(conn, cursor_number, NULL);
|
close_cursor(conn, cursor_number, NULL);
|
||||||
}
|
|
||||||
PG_CATCH();
|
|
||||||
{
|
|
||||||
PQclear(res);
|
|
||||||
PG_RE_THROW();
|
|
||||||
}
|
|
||||||
PG_END_TRY();
|
|
||||||
|
|
||||||
ReleaseConnection(conn);
|
ReleaseConnection(conn);
|
||||||
|
|
||||||
@ -5463,7 +5381,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
|
|||||||
UserMapping *mapping;
|
UserMapping *mapping;
|
||||||
PGconn *conn;
|
PGconn *conn;
|
||||||
StringInfoData buf;
|
StringInfoData buf;
|
||||||
PGresult *volatile res = NULL;
|
PGresult *res;
|
||||||
int numrows,
|
int numrows,
|
||||||
i;
|
i;
|
||||||
ListCell *lc;
|
ListCell *lc;
|
||||||
@ -5502,16 +5420,13 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
|
|||||||
/* Create workspace for strings */
|
/* Create workspace for strings */
|
||||||
initStringInfo(&buf);
|
initStringInfo(&buf);
|
||||||
|
|
||||||
/* In what follows, do not risk leaking any PGresults. */
|
|
||||||
PG_TRY();
|
|
||||||
{
|
|
||||||
/* Check that the schema really exists */
|
/* Check that the schema really exists */
|
||||||
appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
|
appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
|
||||||
deparseStringLiteral(&buf, stmt->remote_schema);
|
deparseStringLiteral(&buf, stmt->remote_schema);
|
||||||
|
|
||||||
res = pgfdw_exec_query(conn, buf.data, NULL);
|
res = pgfdw_exec_query(conn, buf.data, NULL);
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
pgfdw_report_error(ERROR, res, conn, false, buf.data);
|
pgfdw_report_error(ERROR, res, conn, buf.data);
|
||||||
|
|
||||||
if (PQntuples(res) != 1)
|
if (PQntuples(res) != 1)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
@ -5520,7 +5435,6 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
|
|||||||
stmt->remote_schema, server->servername)));
|
stmt->remote_schema, server->servername)));
|
||||||
|
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
res = NULL;
|
|
||||||
resetStringInfo(&buf);
|
resetStringInfo(&buf);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -5628,7 +5542,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
|
|||||||
/* Fetch the data */
|
/* Fetch the data */
|
||||||
res = pgfdw_exec_query(conn, buf.data, NULL);
|
res = pgfdw_exec_query(conn, buf.data, NULL);
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
pgfdw_report_error(ERROR, res, conn, false, buf.data);
|
pgfdw_report_error(ERROR, res, conn, buf.data);
|
||||||
|
|
||||||
/* Process results */
|
/* Process results */
|
||||||
numrows = PQntuples(res);
|
numrows = PQntuples(res);
|
||||||
@ -5733,12 +5647,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
|
|||||||
|
|
||||||
commands = lappend(commands, pstrdup(buf.data));
|
commands = lappend(commands, pstrdup(buf.data));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
PG_FINALLY();
|
|
||||||
{
|
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
}
|
|
||||||
PG_END_TRY();
|
|
||||||
|
|
||||||
ReleaseConnection(conn);
|
ReleaseConnection(conn);
|
||||||
|
|
||||||
@ -7406,7 +7315,7 @@ postgresForeignAsyncNotify(AsyncRequest *areq)
|
|||||||
|
|
||||||
/* On error, report the original query, not the FETCH. */
|
/* On error, report the original query, not the FETCH. */
|
||||||
if (!PQconsumeInput(fsstate->conn))
|
if (!PQconsumeInput(fsstate->conn))
|
||||||
pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
|
pgfdw_report_error(ERROR, NULL, fsstate->conn, fsstate->query);
|
||||||
|
|
||||||
fetch_more_data(node);
|
fetch_more_data(node);
|
||||||
|
|
||||||
@ -7505,7 +7414,7 @@ fetch_more_data_begin(AsyncRequest *areq)
|
|||||||
fsstate->fetch_size, fsstate->cursor_number);
|
fsstate->fetch_size, fsstate->cursor_number);
|
||||||
|
|
||||||
if (!PQsendQuery(fsstate->conn, sql))
|
if (!PQsendQuery(fsstate->conn, sql))
|
||||||
pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
|
pgfdw_report_error(ERROR, NULL, fsstate->conn, fsstate->query);
|
||||||
|
|
||||||
/* Remember that the request is in process */
|
/* Remember that the request is in process */
|
||||||
fsstate->conn_state->pendingAreq = areq;
|
fsstate->conn_state->pendingAreq = areq;
|
||||||
|
@ -167,7 +167,7 @@ extern PGresult *pgfdw_get_result(PGconn *conn);
|
|||||||
extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
|
extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
|
||||||
PgFdwConnState *state);
|
PgFdwConnState *state);
|
||||||
extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
|
extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
|
||||||
bool clear, const char *sql);
|
const char *sql);
|
||||||
|
|
||||||
/* in option.c */
|
/* in option.c */
|
||||||
extern int ExtractConnectionOptions(List *defelems,
|
extern int ExtractConnectionOptions(List *defelems,
|
||||||
|
@ -421,31 +421,22 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
|
|||||||
"IDENTIFY_SYSTEM",
|
"IDENTIFY_SYSTEM",
|
||||||
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
|
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
{
|
|
||||||
PQclear(res);
|
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
||||||
errmsg("could not receive database system identifier and timeline ID from "
|
errmsg("could not receive database system identifier and timeline ID from "
|
||||||
"the primary server: %s",
|
"the primary server: %s",
|
||||||
pchomp(PQerrorMessage(conn->streamConn)))));
|
pchomp(PQerrorMessage(conn->streamConn)))));
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* IDENTIFY_SYSTEM returns 3 columns in 9.3 and earlier, and 4 columns in
|
* IDENTIFY_SYSTEM returns 3 columns in 9.3 and earlier, and 4 columns in
|
||||||
* 9.4 and onwards.
|
* 9.4 and onwards.
|
||||||
*/
|
*/
|
||||||
if (PQnfields(res) < 3 || PQntuples(res) != 1)
|
if (PQnfields(res) < 3 || PQntuples(res) != 1)
|
||||||
{
|
|
||||||
int ntuples = PQntuples(res);
|
|
||||||
int nfields = PQnfields(res);
|
|
||||||
|
|
||||||
PQclear(res);
|
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
||||||
errmsg("invalid response from primary server"),
|
errmsg("invalid response from primary server"),
|
||||||
errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
|
errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
|
||||||
ntuples, nfields, 1, 3)));
|
PQntuples(res), PQnfields(res), 1, 3)));
|
||||||
}
|
|
||||||
primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
|
primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
|
||||||
*primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
|
*primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
@ -607,13 +598,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
else if (PQresultStatus(res) != PGRES_COPY_BOTH)
|
else if (PQresultStatus(res) != PGRES_COPY_BOTH)
|
||||||
{
|
|
||||||
PQclear(res);
|
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
||||||
errmsg("could not start WAL streaming: %s",
|
errmsg("could not start WAL streaming: %s",
|
||||||
pchomp(PQerrorMessage(conn->streamConn)))));
|
pchomp(PQerrorMessage(conn->streamConn)))));
|
||||||
}
|
|
||||||
PQclear(res);
|
PQclear(res);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -721,26 +709,17 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
|
|||||||
cmd,
|
cmd,
|
||||||
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
|
WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
{
|
|
||||||
PQclear(res);
|
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
||||||
errmsg("could not receive timeline history file from "
|
errmsg("could not receive timeline history file from "
|
||||||
"the primary server: %s",
|
"the primary server: %s",
|
||||||
pchomp(PQerrorMessage(conn->streamConn)))));
|
pchomp(PQerrorMessage(conn->streamConn)))));
|
||||||
}
|
|
||||||
if (PQnfields(res) != 2 || PQntuples(res) != 1)
|
if (PQnfields(res) != 2 || PQntuples(res) != 1)
|
||||||
{
|
|
||||||
int ntuples = PQntuples(res);
|
|
||||||
int nfields = PQnfields(res);
|
|
||||||
|
|
||||||
PQclear(res);
|
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
||||||
errmsg("invalid response from primary server"),
|
errmsg("invalid response from primary server"),
|
||||||
errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
|
errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
|
||||||
ntuples, nfields)));
|
PQntuples(res), PQnfields(res))));
|
||||||
}
|
|
||||||
*filename = pstrdup(PQgetvalue(res, 0, 0));
|
*filename = pstrdup(PQgetvalue(res, 0, 0));
|
||||||
|
|
||||||
*len = PQgetlength(res, 0, 1);
|
*len = PQgetlength(res, 0, 1);
|
||||||
@ -844,13 +823,10 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
|
||||||
PQclear(res);
|
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
||||||
errmsg("could not receive data from WAL stream: %s",
|
errmsg("could not receive data from WAL stream: %s",
|
||||||
pchomp(PQerrorMessage(conn->streamConn)))));
|
pchomp(PQerrorMessage(conn->streamConn)))));
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (rawlen < -1)
|
if (rawlen < -1)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
@ -974,13 +950,10 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
|
|||||||
pfree(cmd.data);
|
pfree(cmd.data);
|
||||||
|
|
||||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||||
{
|
|
||||||
PQclear(res);
|
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
(errcode(ERRCODE_PROTOCOL_VIOLATION),
|
||||||
errmsg("could not create replication slot \"%s\": %s",
|
errmsg("could not create replication slot \"%s\": %s",
|
||||||
slotname, pchomp(PQerrorMessage(conn->streamConn)))));
|
slotname, pchomp(PQerrorMessage(conn->streamConn)))));
|
||||||
}
|
|
||||||
|
|
||||||
if (lsn)
|
if (lsn)
|
||||||
*lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
|
*lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
|
||||||
|
@ -279,11 +279,8 @@ libpqsrv_exec_params(PGconn *conn,
|
|||||||
static inline PGresult *
|
static inline PGresult *
|
||||||
libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
|
libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
|
||||||
{
|
{
|
||||||
PGresult *volatile lastResult = NULL;
|
PGresult *lastResult = NULL;
|
||||||
|
|
||||||
/* In what follows, do not leak any PGresults on an error. */
|
|
||||||
PG_TRY();
|
|
||||||
{
|
|
||||||
for (;;)
|
for (;;)
|
||||||
{
|
{
|
||||||
/* Wait for, and collect, the next PGresult. */
|
/* Wait for, and collect, the next PGresult. */
|
||||||
@ -306,14 +303,6 @@ libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
|
|||||||
PQstatus(conn) == CONNECTION_BAD)
|
PQstatus(conn) == CONNECTION_BAD)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
PG_CATCH();
|
|
||||||
{
|
|
||||||
PQclear(lastResult);
|
|
||||||
PG_RE_THROW();
|
|
||||||
}
|
|
||||||
PG_END_TRY();
|
|
||||||
|
|
||||||
return lastResult;
|
return lastResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user