diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 59e3e678f9e..2e4f615a659 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -5171,7 +5171,10 @@ int PQflush(PGconn *conn);
PQisBusy, PQconsumeInput, etc
- operate as normal when processing pipeline results.
+ operate as normal when processing pipeline results. In particular,
+ a call to PQisBusy in the middle of a pipeline
+ returns 0 if the results for all the queries issued so far have been
+ consumed.
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b13ddab393b..aca81890bb1 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1223,7 +1223,8 @@ pqAllocCmdQueueEntry(PGconn *conn)
/*
* pqAppendCmdQueueEntry
- * Append a caller-allocated command queue entry to the queue.
+ * Append a caller-allocated entry to the command queue, and update
+ * conn->asyncStatus to account for it.
*
* The query itself must already have been put in the output buffer by the
* caller.
@@ -1239,6 +1240,38 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
conn->cmd_queue_tail->next = entry;
conn->cmd_queue_tail = entry;
+
+ switch (conn->pipelineStatus)
+ {
+ case PQ_PIPELINE_OFF:
+ case PQ_PIPELINE_ON:
+
+ /*
+ * When not in pipeline aborted state, if there's a result ready
+ * to be consumed, let it be so (that is, don't change away from
+ * READY or READY_MORE); otherwise set us busy to wait for
+ * something to arrive from the server.
+ */
+ if (conn->asyncStatus == PGASYNC_IDLE)
+ conn->asyncStatus = PGASYNC_BUSY;
+ break;
+
+ case PQ_PIPELINE_ABORTED:
+
+ /*
+ * In aborted pipeline state, we don't expect anything from the
+ * server (since we don't send any queries that are queued).
+ * Therefore, if IDLE then do what PQgetResult would do to let
+ * itself consume commands from the queue; if we're in any other
+ * state, we don't have to do anything.
+ */
+ if (conn->asyncStatus == PGASYNC_IDLE)
+ {
+ resetPQExpBuffer(&conn->errorMessage);
+ pqPipelineProcessQueue(conn);
+ }
+ break;
+ }
}
/*
@@ -1375,7 +1408,6 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
- conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
@@ -1510,10 +1542,6 @@ PQsendPrepare(PGconn *conn,
/* if insufficient memory, query just winds up NULL */
entry->query = strdup(query);
- pqAppendCmdQueueEntry(conn, entry);
-
- conn->asyncStatus = PGASYNC_BUSY;
-
/*
* Give the data a push (in pipeline mode, only if we're past the size
* threshold). In nonblock mode, don't complain if we're unable to send
@@ -1522,6 +1550,9 @@ PQsendPrepare(PGconn *conn,
if (pqPipelineFlush(conn) < 0)
goto sendFailed;
+ /* OK, it's launched! */
+ pqAppendCmdQueueEntry(conn, entry);
+
return 1;
sendFailed:
@@ -1815,7 +1846,7 @@ PQsendQueryGuts(PGconn *conn,
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
- conn->asyncStatus = PGASYNC_BUSY;
+
return 1;
sendFailed:
@@ -2445,7 +2476,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
- conn->asyncStatus = PGASYNC_BUSY;
+
return 1;
sendFailed:
@@ -2948,7 +2979,7 @@ pqCommandQueueAdvance(PGconn *conn)
* pqPipelineProcessQueue: subroutine for PQgetResult
* In pipeline mode, start processing the results of the next query in the queue.
*/
-void
+static void
pqPipelineProcessQueue(PGconn *conn)
{
switch (conn->asyncStatus)
@@ -3072,15 +3103,15 @@ PQpipelineSync(PGconn *conn)
pqPutMsgEnd(conn) < 0)
goto sendFailed;
- pqAppendCmdQueueEntry(conn, entry);
-
/*
* Give the data a push. In nonblock mode, don't complain if we're unable
* to send it all; PQgetResult() will do any additional flushing needed.
*/
if (PQflush(conn) < 0)
goto sendFailed;
- conn->asyncStatus = PGASYNC_BUSY;
+
+ /* OK, it's launched! */
+ pqAppendCmdQueueEntry(conn, entry);
return 1;
@@ -3115,7 +3146,7 @@ PQsendFlushRequest(PGconn *conn)
{
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("another command is already in progress\n"));
- return false;
+ return 0;
}
if (pqPutMsgStart('H', conn) < 0 ||
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 249ee22105c..c27c4e0adaf 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -28,6 +28,8 @@
static void exit_nicely(PGconn *conn);
+static bool process_result(PGconn *conn, PGresult *res, int results,
+ int numsent);
const char *const progname = "libpq_pipeline";
@@ -1307,6 +1309,227 @@ test_transaction(PGconn *conn)
fprintf(stderr, "ok\n");
}
+/*
+ * In this test mode we send a stream of queries, with one in the middle
+ * causing an error. Verify that we can still send some more after the
+ * error and have libpq work properly.
+ */
+static void
+test_uniqviol(PGconn *conn)
+{
+ int sock = PQsocket(conn);
+ PGresult *res;
+ Oid paramTypes[2] = {INT8OID, INT8OID};
+ const char *paramValues[2];
+ char paramValue0[MAXINT8LEN];
+ char paramValue1[MAXINT8LEN];
+ int ctr = 0;
+ int numsent = 0;
+ int results = 0;
+ bool read_done = false;
+ bool write_done = false;
+ bool error_sent = false;
+ bool got_error = false;
+ int switched = 0;
+ int socketful = 0;
+ fd_set in_fds;
+ fd_set out_fds;
+
+ fprintf(stderr, "uniqviol ...");
+
+ PQsetnonblocking(conn, 1);
+
+ paramValues[0] = paramValue0;
+ paramValues[1] = paramValue1;
+ sprintf(paramValue1, "42");
+
+ res = PQexec(conn, "drop table if exists ppln_uniqviol;"
+ "create table ppln_uniqviol(id bigint primary key, idata bigint)");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("failed to create table: %s", PQerrorMessage(conn));
+
+ res = PQexec(conn, "begin");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
+
+ res = PQprepare(conn, "insertion",
+ "insert into ppln_uniqviol values ($1, $2) returning id",
+ 2, paramTypes);
+ if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
+
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode");
+
+ while (!read_done)
+ {
+ /*
+ * Avoid deadlocks by reading everything the server has sent before
+ * sending anything. (Special precaution is needed here to process
+ * PQisBusy before testing the socket for read-readiness, because the
+ * socket does not turn read-ready after "sending" queries in aborted
+ * pipeline mode.)
+ */
+ while (PQisBusy(conn) == 0)
+ {
+ bool new_error;
+
+ if (results >= numsent)
+ {
+ if (write_done)
+ read_done = true;
+ break;
+ }
+
+ res = PQgetResult(conn);
+ new_error = process_result(conn, res, results, numsent);
+ if (new_error && got_error)
+ pg_fatal("got two errors");
+ got_error |= new_error;
+ if (results++ >= numsent - 1)
+ {
+ if (write_done)
+ read_done = true;
+ break;
+ }
+ }
+
+ if (read_done)
+ break;
+
+ FD_ZERO(&out_fds);
+ FD_SET(sock, &out_fds);
+
+ FD_ZERO(&in_fds);
+ FD_SET(sock, &in_fds);
+
+ if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
+ {
+ if (errno == EINTR)
+ continue;
+ pg_fatal("select() failed: %m");
+ }
+
+ if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
+ pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
+
+ /*
+ * If the socket is writable and we haven't finished sending queries,
+ * send some.
+ */
+ if (!write_done && FD_ISSET(sock, &out_fds))
+ {
+ for (;;)
+ {
+ int flush;
+
+ /*
+ * provoke uniqueness violation exactly once after having
+ * switched to read mode.
+ */
+ if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
+ {
+ sprintf(paramValue0, "%d", numsent / 2);
+ fprintf(stderr, "E");
+ error_sent = true;
+ }
+ else
+ {
+ fprintf(stderr, ".");
+ sprintf(paramValue0, "%d", ctr++);
+ }
+
+ if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
+ pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
+ numsent++;
+
+ /* Are we done writing? */
+ if (socketful != 0 && numsent % socketful == 42 && error_sent)
+ {
+ if (PQsendFlushRequest(conn) != 1)
+ pg_fatal("failed to send flush request");
+ write_done = true;
+ fprintf(stderr, "\ndone writing\n");
+ PQflush(conn);
+ break;
+ }
+
+ /* is the outgoing socket full? */
+ flush = PQflush(conn);
+ if (flush == -1)
+ pg_fatal("failed to flush: %s", PQerrorMessage(conn));
+ if (flush == 1)
+ {
+ if (socketful == 0)
+ socketful = numsent;
+ fprintf(stderr, "\nswitch to reading\n");
+ switched++;
+ break;
+ }
+ }
+ }
+ }
+
+ if (!got_error)
+ pg_fatal("did not get expected error");
+
+ fprintf(stderr, "ok\n");
+}
+
+/*
+ * Subroutine for test_uniqviol; given a PGresult, print it out and consume
+ * the expected NULL that should follow it.
+ *
+ * Returns true if we read a fatal error message, otherwise false.
+ */
+static bool
+process_result(PGconn *conn, PGresult *res, int results, int numsent)
+{
+ PGresult *res2;
+ bool got_error = false;
+
+ if (res == NULL)
+ pg_fatal("got unexpected NULL");
+
+ switch (PQresultStatus(res))
+ {
+ case PGRES_FATAL_ERROR:
+ got_error = true;
+ fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
+ PQclear(res);
+
+ res2 = PQgetResult(conn);
+ if (res2 != NULL)
+ pg_fatal("expected NULL, got %s",
+ PQresStatus(PQresultStatus(res2)));
+ break;
+
+ case PGRES_TUPLES_OK:
+ fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
+ PQclear(res);
+
+ res2 = PQgetResult(conn);
+ if (res2 != NULL)
+ pg_fatal("expected NULL, got %s",
+ PQresStatus(PQresultStatus(res2)));
+ break;
+
+ case PGRES_PIPELINE_ABORTED:
+ fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
+ res2 = PQgetResult(conn);
+ if (res2 != NULL)
+ pg_fatal("expected NULL, got %s",
+ PQresStatus(PQresultStatus(res2)));
+ break;
+
+ default:
+ pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
+ }
+
+ return got_error;
+}
+
+
static void
usage(const char *progname)
{
@@ -1331,6 +1554,7 @@ print_test_list(void)
printf("simple_pipeline\n");
printf("singlerow\n");
printf("transaction\n");
+ printf("uniqviol\n");
}
int
@@ -1436,6 +1660,8 @@ main(int argc, char **argv)
test_singlerowmode(conn);
else if (strcmp(testname, "transaction") == 0)
test_transaction(conn);
+ else if (strcmp(testname, "uniqviol") == 0)
+ test_uniqviol(conn);
else
{
fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);