mirror of
https://github.com/postgres/postgres.git
synced 2025-06-29 10:41:53 +03:00
libpq: Improve idle state handling in pipeline mode
We were going into IDLE state too soon when executing queries via PQsendQuery in pipeline mode, causing several scenarios to misbehave in different ways -- most notably, as reported by Daniele Varrazzo, that a warning message is produced by libpq: message type 0x33 arrived from server while idle But it is also possible, if queries are sent and results consumed not in lockstep, for the expected mediating NULL result values from PQgetResult to be lost (a problem which has not been reported, but which is more serious). Fix this by introducing two new concepts: one is a command queue element PGQUERY_CLOSE to tell libpq to wait for the CloseComplete server response to the Close message that is sent by PQsendQuery. Because the application is not expecting any PGresult from this, the mechanism to consume it is a bit hackish. The other concept, authored by Horiguchi-san, is a PGASYNC_PIPELINE_IDLE state for libpq's state machine to differentiate "really idle" from merely "the idle state that occurs in between reading results from the server for elements in the pipeline". This makes libpq not go fully IDLE when the libpq command queue contains entries; in normal cases, we only go IDLE once at the end of the pipeline, when the server response to the final SYNC message is received. (However, there are corner cases it doesn't fix, such as terminating the query sequence by PQsendFlushRequest instead of PQpipelineSync; this sort of scenario is what requires PGQUERY_CLOSE bit above.) This last bit helps make the libpq state machine clearer; in particular we can get rid of an ugly hack in pqParseInput3 to avoid considering IDLE as such when the command queue contains entries. A new test mode is added to libpq_pipeline.c to tickle some related problematic cases. Reported-by: Daniele Varrazzo <daniele.varrazzo@gmail.com> Co-authored-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com> Discussion: https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com
This commit is contained in:
@ -1279,7 +1279,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
|
||||
* itself consume commands from the queue; if we're in any other
|
||||
* state, we don't have to do anything.
|
||||
*/
|
||||
if (conn->asyncStatus == PGASYNC_IDLE)
|
||||
if (conn->asyncStatus == PGASYNC_IDLE ||
|
||||
conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
|
||||
{
|
||||
resetPQExpBuffer(&conn->errorMessage);
|
||||
pqPipelineProcessQueue(conn);
|
||||
@ -1338,6 +1339,7 @@ static int
|
||||
PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
|
||||
{
|
||||
PGcmdQueueEntry *entry = NULL;
|
||||
PGcmdQueueEntry *entry2 = NULL;
|
||||
|
||||
if (!PQsendQueryStart(conn, newQuery))
|
||||
return 0;
|
||||
@ -1353,6 +1355,12 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
|
||||
entry = pqAllocCmdQueueEntry(conn);
|
||||
if (entry == NULL)
|
||||
return 0; /* error msg already set */
|
||||
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
|
||||
{
|
||||
entry2 = pqAllocCmdQueueEntry(conn);
|
||||
if (entry2 == NULL)
|
||||
goto sendFailed;
|
||||
}
|
||||
|
||||
/* Send the query message(s) */
|
||||
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
|
||||
@ -1422,6 +1430,20 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
|
||||
|
||||
/* OK, it's launched! */
|
||||
pqAppendCmdQueueEntry(conn, entry);
|
||||
|
||||
/*
|
||||
* When pipeline mode is in use, we need a second entry in the command
|
||||
* queue to represent Close Portal message. This allows us later to wait
|
||||
* for the CloseComplete message to be received before getting in IDLE
|
||||
* state.
|
||||
*/
|
||||
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
|
||||
{
|
||||
entry2->queryclass = PGQUERY_CLOSE;
|
||||
entry2->query = NULL;
|
||||
pqAppendCmdQueueEntry(conn, entry2);
|
||||
}
|
||||
|
||||
return 1;
|
||||
|
||||
sendFailed:
|
||||
@ -1667,11 +1689,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
|
||||
switch (conn->asyncStatus)
|
||||
{
|
||||
case PGASYNC_IDLE:
|
||||
case PGASYNC_PIPELINE_IDLE:
|
||||
case PGASYNC_READY:
|
||||
case PGASYNC_READY_MORE:
|
||||
case PGASYNC_BUSY:
|
||||
/* ok to queue */
|
||||
break;
|
||||
|
||||
case PGASYNC_COPY_IN:
|
||||
case PGASYNC_COPY_OUT:
|
||||
case PGASYNC_COPY_BOTH:
|
||||
@ -2047,19 +2071,22 @@ PQgetResult(PGconn *conn)
|
||||
{
|
||||
case PGASYNC_IDLE:
|
||||
res = NULL; /* query is complete */
|
||||
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
|
||||
{
|
||||
/*
|
||||
* We're about to return the NULL that terminates the round of
|
||||
* results from the current query; prepare to send the results
|
||||
* of the next query when we're called next. Also, since this
|
||||
* is the start of the results of the next query, clear any
|
||||
* prior error message.
|
||||
*/
|
||||
resetPQExpBuffer(&conn->errorMessage);
|
||||
pqPipelineProcessQueue(conn);
|
||||
}
|
||||
break;
|
||||
case PGASYNC_PIPELINE_IDLE:
|
||||
Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
|
||||
|
||||
/*
|
||||
* We're about to return the NULL that terminates the round of
|
||||
* results from the current query; prepare to send the results
|
||||
* of the next query, if any, when we're called next. If there's
|
||||
* no next element in the command queue, this gets us in IDLE
|
||||
* state.
|
||||
*/
|
||||
resetPQExpBuffer(&conn->errorMessage);
|
||||
pqPipelineProcessQueue(conn);
|
||||
res = NULL; /* query is complete */
|
||||
break;
|
||||
|
||||
case PGASYNC_READY:
|
||||
|
||||
/*
|
||||
@ -2080,7 +2107,7 @@ PQgetResult(PGconn *conn)
|
||||
* We're about to send the results of the current query. Set
|
||||
* us idle now, and ...
|
||||
*/
|
||||
conn->asyncStatus = PGASYNC_IDLE;
|
||||
conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
|
||||
|
||||
/*
|
||||
* ... in cases when we're sending a pipeline-sync result,
|
||||
@ -2124,6 +2151,22 @@ PQgetResult(PGconn *conn)
|
||||
break;
|
||||
}
|
||||
|
||||
/* If the next command we expect is CLOSE, read and consume it */
|
||||
if (conn->asyncStatus == PGASYNC_PIPELINE_IDLE &&
|
||||
conn->cmd_queue_head &&
|
||||
conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
|
||||
{
|
||||
if (res && res->resultStatus != PGRES_FATAL_ERROR)
|
||||
{
|
||||
conn->asyncStatus = PGASYNC_BUSY;
|
||||
parseInput(conn);
|
||||
conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
|
||||
}
|
||||
else
|
||||
/* we won't ever see the Close */
|
||||
pqCommandQueueAdvance(conn);
|
||||
}
|
||||
|
||||
if (res)
|
||||
{
|
||||
int i;
|
||||
@ -2932,7 +2975,10 @@ PQexitPipelineMode(PGconn *conn)
|
||||
if (!conn)
|
||||
return 0;
|
||||
|
||||
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
|
||||
if (conn->pipelineStatus == PQ_PIPELINE_OFF &&
|
||||
(conn->asyncStatus == PGASYNC_IDLE ||
|
||||
conn->asyncStatus == PGASYNC_PIPELINE_IDLE) &&
|
||||
conn->cmd_queue_head == NULL)
|
||||
return 1;
|
||||
|
||||
switch (conn->asyncStatus)
|
||||
@ -2949,9 +2995,16 @@ PQexitPipelineMode(PGconn *conn)
|
||||
libpq_gettext("cannot exit pipeline mode while busy\n"));
|
||||
return 0;
|
||||
|
||||
default:
|
||||
case PGASYNC_IDLE:
|
||||
case PGASYNC_PIPELINE_IDLE:
|
||||
/* OK */
|
||||
break;
|
||||
|
||||
case PGASYNC_COPY_IN:
|
||||
case PGASYNC_COPY_OUT:
|
||||
case PGASYNC_COPY_BOTH:
|
||||
appendPQExpBufferStr(&conn->errorMessage,
|
||||
libpq_gettext("cannot exit pipeline mode while in COPY\n"));
|
||||
}
|
||||
|
||||
/* still work to process */
|
||||
@ -2988,6 +3041,10 @@ pqCommandQueueAdvance(PGconn *conn)
|
||||
prevquery = conn->cmd_queue_head;
|
||||
conn->cmd_queue_head = conn->cmd_queue_head->next;
|
||||
|
||||
/* If the queue is now empty, reset the tail too */
|
||||
if (conn->cmd_queue_head == NULL)
|
||||
conn->cmd_queue_tail = NULL;
|
||||
|
||||
/* and make it recyclable */
|
||||
prevquery->next = NULL;
|
||||
pqRecycleCmdQueueEntry(conn, prevquery);
|
||||
@ -3010,15 +3067,35 @@ pqPipelineProcessQueue(PGconn *conn)
|
||||
case PGASYNC_BUSY:
|
||||
/* client still has to process current query or results */
|
||||
return;
|
||||
|
||||
case PGASYNC_IDLE:
|
||||
/*
|
||||
* If we're in IDLE mode and there's some command in the queue,
|
||||
* get us into PIPELINE_IDLE mode and process normally. Otherwise
|
||||
* there's nothing for us to do.
|
||||
*/
|
||||
if (conn->cmd_queue_head != NULL)
|
||||
{
|
||||
conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
|
||||
break;
|
||||
}
|
||||
return;
|
||||
|
||||
case PGASYNC_PIPELINE_IDLE:
|
||||
Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
|
||||
/* next query please */
|
||||
break;
|
||||
}
|
||||
|
||||
/* Nothing to do if not in pipeline mode, or queue is empty */
|
||||
if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
|
||||
conn->cmd_queue_head == NULL)
|
||||
/*
|
||||
* If there are no further commands to process in the queue, get us in
|
||||
* "real idle" mode now.
|
||||
*/
|
||||
if (conn->cmd_queue_head == NULL)
|
||||
{
|
||||
conn->asyncStatus = PGASYNC_IDLE;
|
||||
return;
|
||||
}
|
||||
|
||||
/* Initialize async result-accumulation state */
|
||||
pqClearAsyncResult(conn);
|
||||
@ -3105,6 +3182,7 @@ PQpipelineSync(PGconn *conn)
|
||||
case PGASYNC_READY_MORE:
|
||||
case PGASYNC_BUSY:
|
||||
case PGASYNC_IDLE:
|
||||
case PGASYNC_PIPELINE_IDLE:
|
||||
/* OK to send sync */
|
||||
break;
|
||||
}
|
||||
|
@ -158,18 +158,6 @@ pqParseInput3(PGconn *conn)
|
||||
if (conn->asyncStatus != PGASYNC_IDLE)
|
||||
return;
|
||||
|
||||
/*
|
||||
* We're also notionally not-IDLE when in pipeline mode the state
|
||||
* says "idle" (so we have completed receiving the results of one
|
||||
* query from the server and dispatched them to the application)
|
||||
* but another query is queued; yield back control to caller so
|
||||
* that they can initiate processing of the next query in the
|
||||
* queue.
|
||||
*/
|
||||
if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
|
||||
conn->cmd_queue_head != NULL)
|
||||
return;
|
||||
|
||||
/*
|
||||
* Unexpected message in IDLE state; need to recover somehow.
|
||||
* ERROR messages are handled using the notice processor;
|
||||
@ -296,8 +284,24 @@ pqParseInput3(PGconn *conn)
|
||||
}
|
||||
break;
|
||||
case '2': /* Bind Complete */
|
||||
/* Nothing to do for this message type */
|
||||
break;
|
||||
case '3': /* Close Complete */
|
||||
/* Nothing to do for these message types */
|
||||
/*
|
||||
* If we get CloseComplete when waiting for it, consume
|
||||
* the queue element and keep going. A result is not
|
||||
* expected from this message; it is just there so that
|
||||
* we know to wait for it when PQsendQuery is used in
|
||||
* pipeline mode, before going in IDLE state. Failing to
|
||||
* do this makes us receive CloseComplete when IDLE, which
|
||||
* creates problems.
|
||||
*/
|
||||
if (conn->cmd_queue_head &&
|
||||
conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
|
||||
{
|
||||
pqCommandQueueAdvance(conn);
|
||||
}
|
||||
|
||||
break;
|
||||
case 'S': /* parameter status */
|
||||
if (getParameterStatus(conn))
|
||||
|
@ -224,7 +224,8 @@ typedef enum
|
||||
* query */
|
||||
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
|
||||
PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */
|
||||
PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */
|
||||
PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */
|
||||
PGASYNC_PIPELINE_IDLE, /* "Idle" between commands in pipeline mode */
|
||||
} PGAsyncStatusType;
|
||||
|
||||
/* Target server type (decoded value of target_session_attrs) */
|
||||
@ -310,7 +311,8 @@ typedef enum
|
||||
PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */
|
||||
PGQUERY_PREPARE, /* Parse only (PQprepare) */
|
||||
PGQUERY_DESCRIBE, /* Describe Statement or Portal */
|
||||
PGQUERY_SYNC /* Sync (at end of a pipeline) */
|
||||
PGQUERY_SYNC, /* Sync (at end of a pipeline) */
|
||||
PGQUERY_CLOSE
|
||||
} PGQueryClass;
|
||||
|
||||
/*
|
||||
|
Reference in New Issue
Block a user