mirror of
https://github.com/postgres/postgres.git
synced 2025-08-24 09:27:52 +03:00
Fix libpq state machine in pipeline mode
The original coding required that PQpipelineSync had been called before
the first call to PQgetResult, and failure to do that would result in an
unexpected NULL result being returned. Fix by setting the right state
when a query is sent, rather than leaving it unchanged and having
PQpipelineSync apply the necessary state change.
A new test case to verify the behavior is added, which relies on the new
PQsendFlushRequest() function added by commit a7192326c7
.
Backpatch to 14, where pipeline mode was added.
Reported-by: Boris Kolpackov <boris@codesynthesis.com>
Author: Álvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/boris.20210616110321@codesynthesis.com
This commit is contained in:
@@ -230,6 +230,93 @@ test_multi_pipelines(PGconn *conn)
|
||||
fprintf(stderr, "ok\n");
|
||||
}
|
||||
|
||||
/*
|
||||
* Test behavior when a pipeline dispatches a number of commands that are
|
||||
* not flushed by a sync point.
|
||||
*/
|
||||
static void
|
||||
test_nosync(PGconn *conn)
|
||||
{
|
||||
int numqueries = 10;
|
||||
int results = 0;
|
||||
int sock = PQsocket(conn);
|
||||
|
||||
fprintf(stderr, "nosync... ");
|
||||
|
||||
if (sock < 0)
|
||||
pg_fatal("invalid socket");
|
||||
|
||||
if (PQenterPipelineMode(conn) != 1)
|
||||
pg_fatal("could not enter pipeline mode");
|
||||
for (int i = 0; i < numqueries; i++)
|
||||
{
|
||||
fd_set input_mask;
|
||||
struct timeval tv;
|
||||
|
||||
if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
|
||||
0, NULL, NULL, NULL, NULL, 0) != 1)
|
||||
pg_fatal("error sending select: %s", PQerrorMessage(conn));
|
||||
PQflush(conn);
|
||||
|
||||
/*
|
||||
* If the server has written anything to us, read (some of) it now.
|
||||
*/
|
||||
FD_ZERO(&input_mask);
|
||||
FD_SET(sock, &input_mask);
|
||||
tv.tv_sec = 0;
|
||||
tv.tv_usec = 0;
|
||||
if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
|
||||
{
|
||||
fprintf(stderr, "select() failed: %s\n", strerror(errno));
|
||||
exit_nicely(conn);
|
||||
}
|
||||
if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
|
||||
pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
|
||||
}
|
||||
|
||||
/* tell server to flush its output buffer */
|
||||
if (PQsendFlushRequest(conn) != 1)
|
||||
pg_fatal("failed to send flush request");
|
||||
PQflush(conn);
|
||||
|
||||
/* Now read all results */
|
||||
for (;;)
|
||||
{
|
||||
PGresult *res;
|
||||
|
||||
res = PQgetResult(conn);
|
||||
|
||||
/* NULL results are only expected after TUPLES_OK */
|
||||
if (res == NULL)
|
||||
pg_fatal("got unexpected NULL result after %d results", results);
|
||||
|
||||
/* We expect exactly one TUPLES_OK result for each query we sent */
|
||||
if (PQresultStatus(res) == PGRES_TUPLES_OK)
|
||||
{
|
||||
PGresult *res2;
|
||||
|
||||
/* and one NULL result should follow each */
|
||||
res2 = PQgetResult(conn);
|
||||
if (res2 != NULL)
|
||||
pg_fatal("expected NULL, got %s",
|
||||
PQresStatus(PQresultStatus(res2)));
|
||||
PQclear(res);
|
||||
results++;
|
||||
|
||||
/* if we're done, we're done */
|
||||
if (results == numqueries)
|
||||
break;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
/* anything else is unexpected */
|
||||
pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
|
||||
}
|
||||
|
||||
fprintf(stderr, "ok\n");
|
||||
}
|
||||
|
||||
/*
|
||||
* When an operation in a pipeline fails the rest of the pipeline is flushed. We
|
||||
* still have to get results for each pipeline item, but the item will just be
|
||||
@@ -1237,6 +1324,7 @@ print_test_list(void)
|
||||
{
|
||||
printf("disallowed_in_pipeline\n");
|
||||
printf("multi_pipelines\n");
|
||||
printf("nosync\n");
|
||||
printf("pipeline_abort\n");
|
||||
printf("pipelined_insert\n");
|
||||
printf("prepared\n");
|
||||
@@ -1334,6 +1422,8 @@ main(int argc, char **argv)
|
||||
test_disallowed_in_pipeline(conn);
|
||||
else if (strcmp(testname, "multi_pipelines") == 0)
|
||||
test_multi_pipelines(conn);
|
||||
else if (strcmp(testname, "nosync") == 0)
|
||||
test_nosync(conn);
|
||||
else if (strcmp(testname, "pipeline_abort") == 0)
|
||||
test_pipeline_abort(conn);
|
||||
else if (strcmp(testname, "pipelined_insert") == 0)
|
||||
|
Reference in New Issue
Block a user