mirror of
https://github.com/postgres/postgres.git
synced 2025-05-17 06:41:24 +03:00
Fix handling of errors in libpq pipelines
The logic to keep the libpq command queue in sync with queries that have been processed had a bug when errors were returned for reasons other than problems in queries -- for example, when a connection is lost. We incorrectly consumed an element from the command queue every time, but this is wrong and can lead to the queue becoming empty ahead of time, leading to later malfunction: PQgetResult would return nothing, potentially causing the calling application to enter a busy loop. Fix by making the SYNC queue element a barrier that can only be consumed when a SYNC message is received. Backpatch to 14. Reported by: Иван Трофимов (Ivan Trofimov) <i.trofimow@yandex.ru> Discussion: https://postgr.es/m/17948-fcace7557e449957@postgresql.org
This commit is contained in:
parent
25f2a43756
commit
1171c6e741
@ -2119,29 +2119,21 @@ PQgetResult(PGconn *conn)
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
* We're about to return the NULL that terminates the round of
|
* We're about to return the NULL that terminates the round of
|
||||||
* results from the current query; prepare to send the results
|
* results from the current query; prepare to send the results of
|
||||||
* of the next query, if any, when we're called next. If there's
|
* the next query, if any, when we're called next. If there's no
|
||||||
* no next element in the command queue, this gets us in IDLE
|
* next element in the command queue, this gets us in IDLE state.
|
||||||
* state.
|
|
||||||
*/
|
*/
|
||||||
pqPipelineProcessQueue(conn);
|
pqPipelineProcessQueue(conn);
|
||||||
res = NULL; /* query is complete */
|
res = NULL; /* query is complete */
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case PGASYNC_READY:
|
case PGASYNC_READY:
|
||||||
|
|
||||||
/*
|
|
||||||
* For any query type other than simple query protocol, we advance
|
|
||||||
* the command queue here. This is because for simple query
|
|
||||||
* protocol we can get the READY state multiple times before the
|
|
||||||
* command is actually complete, since the command string can
|
|
||||||
* contain many queries. In simple query protocol, the queue
|
|
||||||
* advance is done by fe-protocol3 when it receives ReadyForQuery.
|
|
||||||
*/
|
|
||||||
if (conn->cmd_queue_head &&
|
|
||||||
conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE)
|
|
||||||
pqCommandQueueAdvance(conn);
|
|
||||||
res = pqPrepareAsyncResult(conn);
|
res = pqPrepareAsyncResult(conn);
|
||||||
|
|
||||||
|
/* Advance the queue as appropriate */
|
||||||
|
pqCommandQueueAdvance(conn, false,
|
||||||
|
res->resultStatus == PGRES_PIPELINE_SYNC);
|
||||||
|
|
||||||
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
|
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
@ -3040,18 +3032,44 @@ PQexitPipelineMode(PGconn *conn)
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
* pqCommandQueueAdvance
|
* pqCommandQueueAdvance
|
||||||
* Remove one query from the command queue, when we receive
|
* Remove one query from the command queue, if appropriate.
|
||||||
* all results from the server that pertain to it.
|
*
|
||||||
|
* If we have received all results corresponding to the head element
|
||||||
|
* in the command queue, remove it.
|
||||||
|
*
|
||||||
|
* In simple query protocol we must not advance the command queue until the
|
||||||
|
* ReadyForQuery message has been received. This is because in simple mode a
|
||||||
|
* command can have multiple queries, and we must process result for all of
|
||||||
|
* them before moving on to the next command.
|
||||||
|
*
|
||||||
|
* Another consideration is synchronization during error processing in
|
||||||
|
* extended query protocol: we refuse to advance the queue past a SYNC queue
|
||||||
|
* element, unless the result we've received is also a SYNC. In particular
|
||||||
|
* this protects us from advancing when an error is received at an
|
||||||
|
* inappropriate moment.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
pqCommandQueueAdvance(PGconn *conn)
|
pqCommandQueueAdvance(PGconn *conn, bool isReadyForQuery, bool gotSync)
|
||||||
{
|
{
|
||||||
PGcmdQueueEntry *prevquery;
|
PGcmdQueueEntry *prevquery;
|
||||||
|
|
||||||
if (conn->cmd_queue_head == NULL)
|
if (conn->cmd_queue_head == NULL)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
/* delink from queue */
|
/*
|
||||||
|
* If processing a query of simple query protocol, we only advance the
|
||||||
|
* queue when we receive the ReadyForQuery message for it.
|
||||||
|
*/
|
||||||
|
if (conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE && !isReadyForQuery)
|
||||||
|
return;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If we're waiting for a SYNC, don't advance the queue until we get one.
|
||||||
|
*/
|
||||||
|
if (conn->cmd_queue_head->queryclass == PGQUERY_SYNC && !gotSync)
|
||||||
|
return;
|
||||||
|
|
||||||
|
/* delink element from queue */
|
||||||
prevquery = conn->cmd_queue_head;
|
prevquery = conn->cmd_queue_head;
|
||||||
conn->cmd_queue_head = conn->cmd_queue_head->next;
|
conn->cmd_queue_head = conn->cmd_queue_head->next;
|
||||||
|
|
||||||
@ -3059,7 +3077,7 @@ pqCommandQueueAdvance(PGconn *conn)
|
|||||||
if (conn->cmd_queue_head == NULL)
|
if (conn->cmd_queue_head == NULL)
|
||||||
conn->cmd_queue_tail = NULL;
|
conn->cmd_queue_tail = NULL;
|
||||||
|
|
||||||
/* and make it recyclable */
|
/* and make the queue element recyclable */
|
||||||
prevquery->next = NULL;
|
prevquery->next = NULL;
|
||||||
pqRecycleCmdQueueEntry(conn, prevquery);
|
pqRecycleCmdQueueEntry(conn, prevquery);
|
||||||
}
|
}
|
||||||
@ -3083,6 +3101,7 @@ pqPipelineProcessQueue(PGconn *conn)
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
case PGASYNC_IDLE:
|
case PGASYNC_IDLE:
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If we're in IDLE mode and there's some command in the queue,
|
* If we're in IDLE mode and there's some command in the queue,
|
||||||
* get us into PIPELINE_IDLE mode and process normally. Otherwise
|
* get us into PIPELINE_IDLE mode and process normally. Otherwise
|
||||||
|
@ -240,13 +240,8 @@ pqParseInput3(PGconn *conn)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/*
|
/* Advance the command queue and set us idle */
|
||||||
* In simple query protocol, advance the command queue
|
pqCommandQueueAdvance(conn, true, false);
|
||||||
* (see PQgetResult).
|
|
||||||
*/
|
|
||||||
if (conn->cmd_queue_head &&
|
|
||||||
conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE)
|
|
||||||
pqCommandQueueAdvance(conn);
|
|
||||||
conn->asyncStatus = PGASYNC_IDLE;
|
conn->asyncStatus = PGASYNC_IDLE;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -673,7 +673,8 @@ extern void pqSaveMessageField(PGresult *res, char code,
|
|||||||
extern void pqSaveParameterStatus(PGconn *conn, const char *name,
|
extern void pqSaveParameterStatus(PGconn *conn, const char *name,
|
||||||
const char *value);
|
const char *value);
|
||||||
extern int pqRowProcessor(PGconn *conn, const char **errmsgp);
|
extern int pqRowProcessor(PGconn *conn, const char **errmsgp);
|
||||||
extern void pqCommandQueueAdvance(PGconn *conn);
|
extern void pqCommandQueueAdvance(PGconn *conn, bool isReadyForQuery,
|
||||||
|
bool gotSync);
|
||||||
extern int PQsendQueryContinue(PGconn *conn, const char *query);
|
extern int PQsendQueryContinue(PGconn *conn, const char *query);
|
||||||
|
|
||||||
/* === in fe-protocol3.c === */
|
/* === in fe-protocol3.c === */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user