1
0
mirror of https://github.com/postgres/postgres.git synced 2025-10-25 13:17:41 +03:00

Improve connection-failure error handling in contrib/postgres_fdw.

postgres_fdw tended to say "unknown error" if it tried to execute a command
on an already-dead connection, because some paths in libpq just return a
null PGresult for such cases.  Out-of-memory might result in that, too.
To fix, pass the PGconn to pgfdw_report_error, and look at its
PQerrorMessage() string if we can't get anything out of the PGresult.

Also, fix the transaction-exit logic to reliably drop a dead connection.
It was attempting to do that already, but it assumed that only connection
cache entries with xact_depth > 0 needed to be examined.  The folly in that
is that if we fail while issuing START TRANSACTION, we'll not have bumped
xact_depth.  (At least for the case I was testing, this fix masks the
other problem; but it still seems like a good idea to have the PGconn
fallback logic.)

Per investigation of bug #9087 from Craig Lucas.  Backpatch to 9.3 where
this code was introduced.
This commit is contained in:
Tom Lane
2014-02-03 21:30:02 -05:00
parent 489e6ac5a1
commit 00d4f2af8b
3 changed files with 98 additions and 83 deletions

View File

@@ -355,7 +355,7 @@ do_sql_command(PGconn *conn, const char *sql)
res = PQexec(conn, sql); res = PQexec(conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK) if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, true, sql); pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res); PQclear(res);
} }
@@ -454,6 +454,7 @@ GetPrepStmtNumber(PGconn *conn)
* *
* elevel: error level to use (typically ERROR, but might be less) * elevel: error level to use (typically ERROR, but might be less)
* res: PGresult containing the error * res: PGresult containing the error
* conn: connection we did the query on
* clear: if true, PQclear the result (otherwise caller will handle it) * clear: if true, PQclear the result (otherwise caller will handle it)
* sql: NULL, or text of remote command we tried to execute * sql: NULL, or text of remote command we tried to execute
* *
@@ -462,7 +463,8 @@ GetPrepStmtNumber(PGconn *conn)
* marked with have_error = true. * marked with have_error = true.
*/ */
void void
pgfdw_report_error(int elevel, PGresult *res, bool clear, const char *sql) pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
bool clear, const char *sql)
{ {
/* If requested, PGresult must be released before leaving this function. */ /* If requested, PGresult must be released before leaving this function. */
PG_TRY(); PG_TRY();
@@ -483,6 +485,14 @@ pgfdw_report_error(int elevel, PGresult *res, bool clear, const char *sql)
else else
sqlstate = ERRCODE_CONNECTION_FAILURE; sqlstate = ERRCODE_CONNECTION_FAILURE;
/*
* If we don't get a message from the PGresult, try the PGconn. This
* is needed because for connection-level failures, PQexec may just
* return NULL, not a PGresult at all.
*/
if (message_primary == NULL)
message_primary = PQerrorMessage(conn);
ereport(elevel, ereport(elevel,
(errcode(sqlstate), (errcode(sqlstate),
message_primary ? errmsg_internal("%s", message_primary) : message_primary ? errmsg_internal("%s", message_primary) :
@@ -525,74 +535,37 @@ pgfdw_xact_callback(XactEvent event, void *arg)
{ {
PGresult *res; PGresult *res;
/* We only care about connections with open remote transactions */ /* Ignore cache entry if no open connection right now */
if (entry->conn == NULL || entry->xact_depth == 0) if (entry->conn == NULL)
continue; continue;
elog(DEBUG3, "closing remote transaction on connection %p", /* If it has an open remote transaction, try to close it */
entry->conn); if (entry->xact_depth > 0)
switch (event)
{ {
case XACT_EVENT_PRE_COMMIT: elog(DEBUG3, "closing remote transaction on connection %p",
/* Commit all remote transactions during pre-commit */ entry->conn);
do_sql_command(entry->conn, "COMMIT TRANSACTION");
/* switch (event)
* If there were any errors in subtransactions, and we made {
* prepared statements, do a DEALLOCATE ALL to make sure we case XACT_EVENT_PRE_COMMIT:
* get rid of all prepared statements. This is annoying and /* Commit all remote transactions during pre-commit */
* not terribly bulletproof, but it's probably not worth do_sql_command(entry->conn, "COMMIT TRANSACTION");
* trying harder.
*
* DEALLOCATE ALL only exists in 8.3 and later, so this
* constrains how old a server postgres_fdw can communicate
* with. We intentionally ignore errors in the DEALLOCATE, so
* that we can hobble along to some extent with older servers
* (leaking prepared statements as we go; but we don't really
* support update operations pre-8.3 anyway).
*/
if (entry->have_prep_stmt && entry->have_error)
{
res = PQexec(entry->conn, "DEALLOCATE ALL");
PQclear(res);
}
entry->have_prep_stmt = false;
entry->have_error = false;
break;
case XACT_EVENT_PRE_PREPARE:
/* /*
* We disallow remote transactions that modified anything, * If there were any errors in subtransactions, and we
* since it's not really reasonable to hold them open until * made prepared statements, do a DEALLOCATE ALL to make
* the prepared transaction is committed. For the moment, * sure we get rid of all prepared statements. This is
* throw error unconditionally; later we might allow read-only * annoying and not terribly bulletproof, but it's
* cases. Note that the error will cause us to come right * probably not worth trying harder.
* back here with event == XACT_EVENT_ABORT, so we'll clean up *
* the connection state at that point. * DEALLOCATE ALL only exists in 8.3 and later, so this
*/ * constrains how old a server postgres_fdw can
ereport(ERROR, * communicate with. We intentionally ignore errors in
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), * the DEALLOCATE, so that we can hobble along to some
errmsg("cannot prepare a transaction that modified remote tables"))); * extent with older servers (leaking prepared statements
break; * as we go; but we don't really support update operations
case XACT_EVENT_COMMIT: * pre-8.3 anyway).
case XACT_EVENT_PREPARE: */
/* Should not get here -- pre-commit should have handled it */
elog(ERROR, "missed cleaning up connection during pre-commit");
break;
case XACT_EVENT_ABORT:
/* Assume we might have lost track of prepared statements */
entry->have_error = true;
/* If we're aborting, abort all remote transactions too */
res = PQexec(entry->conn, "ABORT TRANSACTION");
/* Note: can't throw ERROR, it would be infinite loop */
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(WARNING, res, true,
"ABORT TRANSACTION");
else
{
PQclear(res);
/* As above, make sure we've cleared any prepared stmts */
if (entry->have_prep_stmt && entry->have_error) if (entry->have_prep_stmt && entry->have_error)
{ {
res = PQexec(entry->conn, "DEALLOCATE ALL"); res = PQexec(entry->conn, "DEALLOCATE ALL");
@@ -600,8 +573,50 @@ pgfdw_xact_callback(XactEvent event, void *arg)
} }
entry->have_prep_stmt = false; entry->have_prep_stmt = false;
entry->have_error = false; entry->have_error = false;
} break;
break; case XACT_EVENT_PRE_PREPARE:
/*
* We disallow remote transactions that modified anything,
* since it's not very reasonable to hold them open until
* the prepared transaction is committed. For the moment,
* throw error unconditionally; later we might allow
* read-only cases. Note that the error will cause us to
* come right back here with event == XACT_EVENT_ABORT, so
* we'll clean up the connection state at that point.
*/
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot prepare a transaction that modified remote tables")));
break;
case XACT_EVENT_COMMIT:
case XACT_EVENT_PREPARE:
/* Pre-commit should have closed the open transaction */
elog(ERROR, "missed cleaning up connection during pre-commit");
break;
case XACT_EVENT_ABORT:
/* Assume we might have lost track of prepared statements */
entry->have_error = true;
/* If we're aborting, abort all remote transactions too */
res = PQexec(entry->conn, "ABORT TRANSACTION");
/* Note: can't throw ERROR, it would be infinite loop */
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(WARNING, res, entry->conn, true,
"ABORT TRANSACTION");
else
{
PQclear(res);
/* As above, make sure to clear any prepared stmts */
if (entry->have_prep_stmt && entry->have_error)
{
res = PQexec(entry->conn, "DEALLOCATE ALL");
PQclear(res);
}
entry->have_prep_stmt = false;
entry->have_error = false;
}
break;
}
} }
/* Reset state to show we're out of a transaction */ /* Reset state to show we're out of a transaction */
@@ -689,7 +704,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
curlevel, curlevel); curlevel, curlevel);
res = PQexec(entry->conn, sql); res = PQexec(entry->conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK) if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(WARNING, res, true, sql); pgfdw_report_error(WARNING, res, entry->conn, true, sql);
else else
PQclear(res); PQclear(res);
} }

View File

@@ -1040,7 +1040,7 @@ postgresReScanForeignScan(ForeignScanState *node)
*/ */
res = PQexec(fsstate->conn, sql); res = PQexec(fsstate->conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK) if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, true, sql); pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
PQclear(res); PQclear(res);
/* Now force a fresh FETCH. */ /* Now force a fresh FETCH. */
@@ -1374,7 +1374,7 @@ postgresExecForeignInsert(EState *estate,
0); 0);
if (PQresultStatus(res) != if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, true, fmstate->query); pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
/* Check number of rows affected, and fetch RETURNING tuple if any */ /* Check number of rows affected, and fetch RETURNING tuple if any */
if (fmstate->has_returning) if (fmstate->has_returning)
@@ -1444,7 +1444,7 @@ postgresExecForeignUpdate(EState *estate,
0); 0);
if (PQresultStatus(res) != if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, true, fmstate->query); pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
/* Check number of rows affected, and fetch RETURNING tuple if any */ /* Check number of rows affected, and fetch RETURNING tuple if any */
if (fmstate->has_returning) if (fmstate->has_returning)
@@ -1514,7 +1514,7 @@ postgresExecForeignDelete(EState *estate,
0); 0);
if (PQresultStatus(res) != if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, true, fmstate->query); pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
/* Check number of rows affected, and fetch RETURNING tuple if any */ /* Check number of rows affected, and fetch RETURNING tuple if any */
if (fmstate->has_returning) if (fmstate->has_returning)
@@ -1563,7 +1563,7 @@ postgresEndForeignModify(EState *estate,
*/ */
res = PQexec(fmstate->conn, sql); res = PQexec(fmstate->conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK) if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, true, sql); pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
PQclear(res); PQclear(res);
fmstate->p_name = NULL; fmstate->p_name = NULL;
} }
@@ -1800,7 +1800,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
*/ */
res = PQexec(conn, sql); res = PQexec(conn, sql);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, false, sql); pgfdw_report_error(ERROR, res, conn, false, sql);
/* /*
* Extract cost numbers for topmost plan node. Note we search for a * Extract cost numbers for topmost plan node. Note we search for a
@@ -1934,7 +1934,7 @@ create_cursor(ForeignScanState *node)
res = PQexecParams(conn, buf.data, numParams, NULL, values, res = PQexecParams(conn, buf.data, numParams, NULL, values,
NULL, NULL, 0); NULL, NULL, 0);
if (PQresultStatus(res) != PGRES_COMMAND_OK) if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, true, fsstate->query); pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
PQclear(res); PQclear(res);
/* Mark the cursor as created, and show no tuples have been retrieved */ /* Mark the cursor as created, and show no tuples have been retrieved */
@@ -1985,7 +1985,7 @@ fetch_more_data(ForeignScanState *node)
res = PQexec(conn, sql); res = PQexec(conn, sql);
/* On error, report the original query, not the FETCH. */ /* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, false, fsstate->query); pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
/* Convert the data into HeapTuples */ /* Convert the data into HeapTuples */
numrows = PQntuples(res); numrows = PQntuples(res);
@@ -2091,7 +2091,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
*/ */
res = PQexec(conn, sql); res = PQexec(conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK) if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, true, sql); pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res); PQclear(res);
} }
@@ -2128,7 +2128,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
NULL); NULL);
if (PQresultStatus(res) != PGRES_COMMAND_OK) if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, true, fmstate->query); pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
PQclear(res); PQclear(res);
/* This action shows that the prepare has been done. */ /* This action shows that the prepare has been done. */
@@ -2278,7 +2278,7 @@ postgresAnalyzeForeignTable(Relation relation,
{ {
res = PQexec(conn, sql.data); res = PQexec(conn, sql.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, false, sql.data); pgfdw_report_error(ERROR, res, conn, false, sql.data);
if (PQntuples(res) != 1 || PQnfields(res) != 1) if (PQntuples(res) != 1 || PQnfields(res) != 1)
elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query"); elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
@@ -2372,7 +2372,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
{ {
res = PQexec(conn, sql.data); res = PQexec(conn, sql.data);
if (PQresultStatus(res) != PGRES_COMMAND_OK) if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, false, sql.data); pgfdw_report_error(ERROR, res, conn, false, sql.data);
PQclear(res); PQclear(res);
res = NULL; res = NULL;
@@ -2403,7 +2403,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
res = PQexec(conn, fetch_sql); res = PQexec(conn, fetch_sql);
/* On error, report the original query, not the FETCH. */ /* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, false, sql.data); pgfdw_report_error(ERROR, res, conn, false, sql.data);
/* Process whatever we got. */ /* Process whatever we got. */
numrows = PQntuples(res); numrows = PQntuples(res);

View File

@@ -30,8 +30,8 @@ extern PGconn *GetConnection(ForeignServer *server, UserMapping *user,
extern void ReleaseConnection(PGconn *conn); extern void ReleaseConnection(PGconn *conn);
extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn);
extern void pgfdw_report_error(int elevel, PGresult *res, bool clear, extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
const char *sql); bool clear, const char *sql);
/* in option.c */ /* in option.c */
extern int ExtractConnectionOptions(List *defelems, extern int ExtractConnectionOptions(List *defelems,