1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-17 17:02:08 +03:00

libpq: Fix sending queries in pipeline aborted state

When sending queries in pipeline mode, we were careless about leaving
the connection in the right state so that PQgetResult would behave
correctly; trying to read further results after sending a query after
having read a result with an error would sometimes hang.  Fix by
ensuring internal libpq state is changed properly.  All the state
changes were being done by the callers of pqAppendCmdQueueEntry(); it
would have become too repetitious to have this logic in each of them, so
instead put it all in that function and relieve callers of the
responsibility.

Add a test to verify this case.  Without the code fix, this new test
hangs sometimes.

Also, document that PQisBusy() would return false when no queries are
pending result.  This is not intuitively obvious, and NULL would be
obtained by calling PQgetResult() at that point, which is confusing.
Wording by Boris Kolpackov.

In passing, fix bogus use of "false" to mean "0", per Ranier Vilela.

Backpatch to 14.

Author: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reported-by: Boris Kolpackov <boris@codesynthesis.com>
Discussion: https://postgr.es/m/boris.20210624103805@codesynthesis.com
This commit is contained in:
Alvaro Herrera
2021-07-09 15:57:59 -04:00
parent 8e7811e952
commit ab09679429
3 changed files with 274 additions and 14 deletions

View File

@ -1223,7 +1223,8 @@ pqAllocCmdQueueEntry(PGconn *conn)
/*
* pqAppendCmdQueueEntry
* Append a caller-allocated command queue entry to the queue.
* Append a caller-allocated entry to the command queue, and update
* conn->asyncStatus to account for it.
*
* The query itself must already have been put in the output buffer by the
* caller.
@ -1239,6 +1240,38 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
conn->cmd_queue_tail->next = entry;
conn->cmd_queue_tail = entry;
switch (conn->pipelineStatus)
{
case PQ_PIPELINE_OFF:
case PQ_PIPELINE_ON:
/*
* When not in pipeline aborted state, if there's a result ready
* to be consumed, let it be so (that is, don't change away from
* READY or READY_MORE); otherwise set us busy to wait for
* something to arrive from the server.
*/
if (conn->asyncStatus == PGASYNC_IDLE)
conn->asyncStatus = PGASYNC_BUSY;
break;
case PQ_PIPELINE_ABORTED:
/*
* In aborted pipeline state, we don't expect anything from the
* server (since we don't send any queries that are queued).
* Therefore, if IDLE then do what PQgetResult would do to let
* itself consume commands from the queue; if we're in any other
* state, we don't have to do anything.
*/
if (conn->asyncStatus == PGASYNC_IDLE)
{
resetPQExpBuffer(&conn->errorMessage);
pqPipelineProcessQueue(conn);
}
break;
}
}
/*
@ -1375,7 +1408,6 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
@ -1510,10 +1542,6 @@ PQsendPrepare(PGconn *conn,
/* if insufficient memory, query just winds up NULL */
entry->query = strdup(query);
pqAppendCmdQueueEntry(conn, entry);
conn->asyncStatus = PGASYNC_BUSY;
/*
* Give the data a push (in pipeline mode, only if we're past the size
* threshold). In nonblock mode, don't complain if we're unable to send
@ -1522,6 +1550,9 @@ PQsendPrepare(PGconn *conn,
if (pqPipelineFlush(conn) < 0)
goto sendFailed;
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
return 1;
sendFailed:
@ -1815,7 +1846,7 @@ PQsendQueryGuts(PGconn *conn,
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
@ -2445,7 +2476,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
@ -2948,7 +2979,7 @@ pqCommandQueueAdvance(PGconn *conn)
* pqPipelineProcessQueue: subroutine for PQgetResult
* In pipeline mode, start processing the results of the next query in the queue.
*/
void
static void
pqPipelineProcessQueue(PGconn *conn)
{
switch (conn->asyncStatus)
@ -3072,15 +3103,15 @@ PQpipelineSync(PGconn *conn)
pqPutMsgEnd(conn) < 0)
goto sendFailed;
pqAppendCmdQueueEntry(conn, entry);
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
* to send it all; PQgetResult() will do any additional flushing needed.
*/
if (PQflush(conn) < 0)
goto sendFailed;
conn->asyncStatus = PGASYNC_BUSY;
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
return 1;
@ -3115,7 +3146,7 @@ PQsendFlushRequest(PGconn *conn)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("another command is already in progress\n"));
return false;
return 0;
}
if (pqPutMsgStart('H', conn) < 0 ||