mirror of https://github.com/postgres/postgres.git synced 2025-06-14 18:42:34 +03:00

psql: Add support for pipelines

With \bind, \parse, \bind_named and \close, it is possible to issue
queries from psql using the extended protocol.  However, it was not
possible to send these queries using libpq's pipeline mode.  This
feature has two advantages:
- Testing.  Pipeline tests were previously only possible with pgbench,
using TAP tests.  It now becomes possible to write SQL tests able to
stress the backend with pipelines and extended queries.  More tests,
discussed on other threads, will be added in follow-up commits.  Some
external projects in the community had to implement their own
facilities to work around this limitation.
- Emulation of custom workloads, with more control over the actions
taken by a client with the libpq APIs.  This makes it possible to
emulate more workload patterns that stress the backend with the
extended query protocol.

This patch adds six new meta-commands to be able to control pipelines:
* \startpipeline starts a new pipeline.  All extended queries are queued
until the end of the pipeline is reached or a sync request is sent and
processed.
* \endpipeline ends an existing pipeline.  All queued commands are sent
to the server and all responses are processed by psql.
* \syncpipeline queues a synchronisation request, without flushing the
commands to the server, equivalent of PQsendPipelineSync().
* \flush, equivalent of PQflush().
* \flushrequest, equivalent of PQsendFlushRequest()
* \getresults reads the server's results for the queries in a pipeline.
Unsent data is automatically pushed when \getresults is called.  An
optional parameter controls the number of results read in a single
meta-command execution; 0 means that all the results should be read.
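For illustration (a sketch, not part of the committed tests; it assumes a
live server connection), a session combining these meta-commands could
look like:

```psql
\startpipeline
SELECT $1 \bind 'v1' \g
SELECT $1 \bind 'v2' \g
\flushrequest
\getresults 1    -- reads only the first pending result
\getresults      -- reads the remaining pending results
\endpipeline     -- sends a sync and processes anything left
```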

Author: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>
Reviewed-by: Jelte Fennema-Nio <postgres@jeltef.nl>
Reviewed-by: Kirill Reshke <reshkekirill@gmail.com>
Discussion: https://postgr.es/m/CAO6_XqroE7JuMEm1sWz55rp9fAYX2JwmcP_3m_v51vnOFdsLiQ@mail.gmail.com
This commit is contained in:
Michael Paquier
2025-02-21 11:19:59 +09:00
parent 40af897eb7
commit 41625ab8ea
11 changed files with 1497 additions and 12 deletions


@ -3674,6 +3674,73 @@ testdb=&gt; <userinput>\setenv LESS -imx4F</userinput>
</listitem>
</varlistentry>
<varlistentry id="app-psql-meta-command-pipeline">
<term><literal>\startpipeline</literal></term>
<term><literal>\syncpipeline</literal></term>
<term><literal>\endpipeline</literal></term>
<term><literal>\flushrequest</literal></term>
<term><literal>\flush</literal></term>
<term><literal>\getresults [ <replaceable class="parameter">number_results</replaceable> ]</literal></term>
<listitem>
<para>
This group of commands implements pipelining of SQL statements.
A pipeline must begin with a <command>\startpipeline</command>
and end with an <command>\endpipeline</command>. In between there
may be any number of <command>\syncpipeline</command> commands,
each of which sends a <link linkend="protocol-flow-ext-query">sync message</link>
without ending the ongoing pipeline or flushing the send buffer.
In pipeline mode, statements are sent to the server without waiting
for the results of previous statements.
See <xref linkend="libpq-pipeline-mode"/> for more details.
</para>
<para>
Pipeline mode requires the use of the extended query protocol. All
queries need to be sent using the meta-commands
<literal>\bind</literal>, <literal>\bind_named</literal>,
<literal>\close</literal> or <literal>\parse</literal>. While a
pipeline is ongoing, <literal>\g</literal> will append the current
query buffer to the pipeline. Other meta-commands like
<literal>\gx</literal> or <literal>\gdesc</literal> are not allowed
in pipeline mode.
</para>
<para>
<command>\flushrequest</command> appends a flush command to the
pipeline, allowing results to be read with
<command>\getresults</command> without issuing a sync or ending the
pipeline. <command>\getresults</command> will automatically push
unsent data to the server. <command>\flush</command> can be used to
manually push unsent data.
</para>
<para>
<command>\getresults</command> accepts an optional
<replaceable class="parameter">number_results</replaceable> parameter.
If provided, only the first
<replaceable class="parameter">number_results</replaceable> pending
results will be read. If not provided or <literal>0</literal>, all
pending results are read. Each of the commands <literal>\bind</literal>,
<literal>\bind_named</literal>, <literal>\close</literal>,
<literal>\parse</literal> and <literal>\syncpipeline</literal>
generates one result to fetch.
</para>
<para>
Example:
<programlisting>
\startpipeline
SELECT 1 \bind \g
\flushrequest
\getresults
\endpipeline
</programlisting>
</para>
</listitem>
</varlistentry>
<varlistentry id="app-psql-meta-command-t-lc">
<term><literal>\t</literal></term>


@ -90,9 +90,12 @@ static backslashResult exec_command_else(PsqlScanState scan_state, ConditionalSt
PQExpBuffer query_buf);
static backslashResult exec_command_endif(PsqlScanState scan_state, ConditionalStack cstack,
PQExpBuffer query_buf);
static backslashResult exec_command_endpipeline(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_encoding(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_errverbose(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_f(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_flush(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_flushrequest(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_g(PsqlScanState scan_state, bool active_branch,
const char *cmd);
static backslashResult process_command_g_options(char *first_option,
@ -103,6 +106,7 @@ static backslashResult exec_command_gdesc(PsqlScanState scan_state, bool active_
static backslashResult exec_command_getenv(PsqlScanState scan_state, bool active_branch,
const char *cmd);
static backslashResult exec_command_gexec(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_getresults(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_gset(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_help(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_html(PsqlScanState scan_state, bool active_branch);
@ -132,6 +136,8 @@ static backslashResult exec_command_setenv(PsqlScanState scan_state, bool active
const char *cmd);
static backslashResult exec_command_sf_sv(PsqlScanState scan_state, bool active_branch,
const char *cmd, bool is_func);
static backslashResult exec_command_startpipeline(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_syncpipeline(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_t(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_T(PsqlScanState scan_state, bool active_branch);
static backslashResult exec_command_timing(PsqlScanState scan_state, bool active_branch);
@ -351,18 +357,26 @@ exec_command(const char *cmd,
status = exec_command_else(scan_state, cstack, query_buf);
else if (strcmp(cmd, "endif") == 0)
status = exec_command_endif(scan_state, cstack, query_buf);
else if (strcmp(cmd, "endpipeline") == 0)
status = exec_command_endpipeline(scan_state, active_branch);
else if (strcmp(cmd, "encoding") == 0)
status = exec_command_encoding(scan_state, active_branch);
else if (strcmp(cmd, "errverbose") == 0)
status = exec_command_errverbose(scan_state, active_branch);
else if (strcmp(cmd, "f") == 0)
status = exec_command_f(scan_state, active_branch);
else if (strcmp(cmd, "flush") == 0)
status = exec_command_flush(scan_state, active_branch);
else if (strcmp(cmd, "flushrequest") == 0)
status = exec_command_flushrequest(scan_state, active_branch);
else if (strcmp(cmd, "g") == 0 || strcmp(cmd, "gx") == 0)
status = exec_command_g(scan_state, active_branch, cmd);
else if (strcmp(cmd, "gdesc") == 0)
status = exec_command_gdesc(scan_state, active_branch);
else if (strcmp(cmd, "getenv") == 0)
status = exec_command_getenv(scan_state, active_branch, cmd);
else if (strcmp(cmd, "getresults") == 0)
status = exec_command_getresults(scan_state, active_branch);
else if (strcmp(cmd, "gexec") == 0)
status = exec_command_gexec(scan_state, active_branch);
else if (strcmp(cmd, "gset") == 0)
@ -411,6 +425,10 @@ exec_command(const char *cmd,
status = exec_command_sf_sv(scan_state, active_branch, cmd, true);
else if (strcmp(cmd, "sv") == 0 || strcmp(cmd, "sv+") == 0)
status = exec_command_sf_sv(scan_state, active_branch, cmd, false);
else if (strcmp(cmd, "startpipeline") == 0)
status = exec_command_startpipeline(scan_state, active_branch);
else if (strcmp(cmd, "syncpipeline") == 0)
status = exec_command_syncpipeline(scan_state, active_branch);
else if (strcmp(cmd, "t") == 0)
status = exec_command_t(scan_state, active_branch);
else if (strcmp(cmd, "T") == 0)
@ -1515,6 +1533,44 @@ exec_command_f(PsqlScanState scan_state, bool active_branch)
return success ? PSQL_CMD_SKIP_LINE : PSQL_CMD_ERROR;
}
/*
* \flush -- call PQflush() on the connection
*/
static backslashResult
exec_command_flush(PsqlScanState scan_state, bool active_branch)
{
backslashResult status = PSQL_CMD_SKIP_LINE;
if (active_branch)
{
pset.send_mode = PSQL_SEND_FLUSH;
status = PSQL_CMD_SEND;
}
else
ignore_slash_options(scan_state);
return status;
}
/*
* \flushrequest -- call PQsendFlushRequest() on the connection
*/
static backslashResult
exec_command_flushrequest(PsqlScanState scan_state, bool active_branch)
{
backslashResult status = PSQL_CMD_SKIP_LINE;
if (active_branch)
{
pset.send_mode = PSQL_SEND_FLUSH_REQUEST;
status = PSQL_CMD_SEND;
}
else
ignore_slash_options(scan_state);
return status;
}
/*
* \g [(pset-option[=pset-value] ...)] [filename/shell-command]
* \gx [(pset-option[=pset-value] ...)] [filename/shell-command]
@ -1550,6 +1606,14 @@ exec_command_g(PsqlScanState scan_state, bool active_branch, const char *cmd)
if (status == PSQL_CMD_SKIP_LINE && active_branch)
{
if (strcmp(cmd, "gx") == 0 &&
PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
{
pg_log_error("\\gx not allowed in pipeline mode");
clean_extended_state();
return PSQL_CMD_ERROR;
}
if (!fname)
pset.gfname = NULL;
else
@ -1703,6 +1767,42 @@ exec_command_getenv(PsqlScanState scan_state, bool active_branch,
return success ? PSQL_CMD_SKIP_LINE : PSQL_CMD_ERROR;
}
/*
* \getresults -- read results
*/
static backslashResult
exec_command_getresults(PsqlScanState scan_state, bool active_branch)
{
backslashResult status = PSQL_CMD_SKIP_LINE;
if (active_branch)
{
char *opt;
int num_results;
pset.send_mode = PSQL_SEND_GET_RESULTS;
status = PSQL_CMD_SEND;
opt = psql_scan_slash_option(scan_state, OT_NORMAL, NULL, false);
pset.requested_results = 0;
if (opt != NULL)
{
num_results = atoi(opt);
if (num_results < 0)
{
pg_log_error("\\getresults: invalid number of requested results");
return PSQL_CMD_SKIP_LINE;
}
pset.requested_results = num_results;
}
}
else
ignore_slash_options(scan_state);
return status;
}
/*
* \gexec -- send query and execute each field of result
*/
@ -1713,6 +1813,12 @@ exec_command_gexec(PsqlScanState scan_state, bool active_branch)
if (active_branch)
{
if (PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
{
pg_log_error("\\gexec not allowed in pipeline mode");
clean_extended_state();
return PSQL_CMD_ERROR;
}
pset.gexec_flag = true;
status = PSQL_CMD_SEND;
}
@ -1733,6 +1839,13 @@ exec_command_gset(PsqlScanState scan_state, bool active_branch)
char *prefix = psql_scan_slash_option(scan_state,
OT_NORMAL, NULL, false);
if (PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
{
pg_log_error("\\gset not allowed in pipeline mode");
clean_extended_state();
return PSQL_CMD_ERROR;
}
if (prefix)
pset.gset_prefix = prefix;
else
@ -2718,6 +2831,63 @@ exec_command_sf_sv(PsqlScanState scan_state, bool active_branch,
return status;
}
/*
* \startpipeline -- enter pipeline mode
*/
static backslashResult
exec_command_startpipeline(PsqlScanState scan_state, bool active_branch)
{
backslashResult status = PSQL_CMD_SKIP_LINE;
if (active_branch)
{
pset.send_mode = PSQL_SEND_START_PIPELINE_MODE;
status = PSQL_CMD_SEND;
}
else
ignore_slash_options(scan_state);
return status;
}
/*
* \syncpipeline -- send a sync message to an active pipeline
*/
static backslashResult
exec_command_syncpipeline(PsqlScanState scan_state, bool active_branch)
{
backslashResult status = PSQL_CMD_SKIP_LINE;
if (active_branch)
{
pset.send_mode = PSQL_SEND_PIPELINE_SYNC;
status = PSQL_CMD_SEND;
}
else
ignore_slash_options(scan_state);
return status;
}
/*
* \endpipeline -- end pipeline mode
*/
static backslashResult
exec_command_endpipeline(PsqlScanState scan_state, bool active_branch)
{
backslashResult status = PSQL_CMD_SKIP_LINE;
if (active_branch)
{
pset.send_mode = PSQL_SEND_END_PIPELINE_MODE;
status = PSQL_CMD_SEND;
}
else
ignore_slash_options(scan_state);
return status;
}
/*
* \t -- turn off table headers and row count
*/


@ -121,6 +121,18 @@ CloseGOutput(FILE *gfile_fout, bool is_pipe)
}
}
/*
* Reset pset pipeline state
*/
static void
pipelineReset(void)
{
pset.piped_syncs = 0;
pset.piped_commands = 0;
pset.available_results = 0;
pset.requested_results = 0;
}
/*
* setQFout
* -- handler for -o command line option and \o command
@ -354,6 +366,7 @@ CheckConnection(void)
fprintf(stderr, _("The connection to the server was lost. Attempting reset: "));
PQreset(pset.db);
pipelineReset();
OK = ConnectionUp();
if (!OK)
{
@ -415,10 +428,12 @@ AcceptResult(const PGresult *result, bool show_error)
case PGRES_EMPTY_QUERY:
case PGRES_COPY_IN:
case PGRES_COPY_OUT:
case PGRES_PIPELINE_SYNC:
/* Fine, do nothing */
OK = true;
break;
case PGRES_PIPELINE_ABORTED:
case PGRES_BAD_RESPONSE:
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
@ -1050,6 +1065,7 @@ PrintQueryResult(PGresult *result, bool last,
success = true;
break;
case PGRES_PIPELINE_ABORTED:
case PGRES_BAD_RESPONSE:
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
@ -1418,6 +1434,63 @@ DescribeQuery(const char *query, double *elapsed_msec)
return OK;
}
/*
* Read and discard all results in an aborted pipeline.
*
* If a synchronisation point is found, we can stop discarding results as
* the pipeline will switch back to a clean state. If no synchronisation
* point is available, we need to stop when there are no more pending
* results; otherwise, calling PQgetResult() would block.
*/
static PGresult *
discardAbortedPipelineResults(void)
{
for (;;)
{
PGresult *res = PQgetResult(pset.db);
ExecStatusType result_status = PQresultStatus(res);
if (result_status == PGRES_PIPELINE_SYNC)
{
/*
* Found a synchronisation point. The sync counter is decremented
* by the caller.
*/
return res;
}
else if (res == NULL)
{
/* A query was processed, decrement the counters */
Assert(pset.available_results > 0);
Assert(pset.requested_results > 0);
pset.available_results--;
pset.requested_results--;
}
if (pset.requested_results == 0)
{
/* We have read all the requested results, leave */
return res;
}
if (pset.available_results == 0 && pset.piped_syncs == 0)
{
/*
* There are no more results to get and there is no
* synchronisation point to stop at. This will leave the pipeline
* in an aborted state.
*/
return res;
}
/*
* An aborted pipeline will have either NULL results or results in an
* PGRES_PIPELINE_ABORTED status.
*/
Assert(res == NULL || result_status == PGRES_PIPELINE_ABORTED);
PQclear(res);
}
}
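The counter bookkeeping these hunks add to psql can be sketched as a
small model (an assumption: this is a simplified Python rendering, not
the C implementation; field names mirror the new `pset` members, and the
ordering of query results versus sync responses is simplified):

```python
class PipelineState:
    """Simplified model of psql's new pipeline counters."""

    def __init__(self):
        self.piped_commands = 0     # queued commands, results not yet flushed
        self.piped_syncs = 0        # sync responses still pending
        self.available_results = 0  # results the server has flushed back

    def send_command(self):
        # \bind, \bind_named, \parse or \close queues one command
        self.piped_commands += 1

    def sync(self):
        # \syncpipeline: the server sends ReadyForQuery after a Sync,
        # flushing the results of all queued commands back to the client
        self.piped_syncs += 1
        self.available_results += self.piped_commands
        self.piped_commands = 0

    def flush_request(self):
        # \flushrequest: results become readable without a sync point
        self.available_results += self.piped_commands
        self.piped_commands = 0

    def get_results(self, n=0):
        # \getresults [n]: 0 means read everything; cap n to what is known
        total = self.available_results + self.piped_syncs
        if total == 0:
            return 0  # "No pending results to get"
        requested = total if n == 0 or n > total else n
        read = 0
        while requested > 0:
            if self.available_results > 0:
                self.available_results -= 1  # consume a query result
            else:
                self.piped_syncs -= 1        # consume a sync response
            requested -= 1
            read += 1
        return read

state = PipelineState()
state.send_command()   # e.g. SELECT $1 \bind 1 \g
state.send_command()
state.flush_request()
print(state.get_results())  # prints 2: both results were pending
```

Two commands followed by a flush request make two results available, and
`\getresults` with no argument reads both; a sync itself counts as one
result to fetch, matching the documentation above.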
/*
* ExecQueryAndProcessResults: utility function for use by SendQuery()
@ -1430,7 +1503,7 @@ DescribeQuery(const char *query, double *elapsed_msec)
* input or output stream. In that event, we'll marshal data for the COPY.
*
* For other commands, the results are processed normally, depending on their
* status.
* status and the status of a pipeline.
*
* When invoked from \watch, is_watch is true and min_rows is the value
* of that option, or 0 if it wasn't set.
@ -1451,6 +1524,7 @@ ExecQueryAndProcessResults(const char *query,
bool timing = pset.timing;
bool success = false;
bool return_early = false;
bool end_pipeline = false;
instr_time before,
after;
PGresult *result;
@ -1466,9 +1540,13 @@ ExecQueryAndProcessResults(const char *query,
{
case PSQL_SEND_EXTENDED_CLOSE:
success = PQsendClosePrepared(pset.db, pset.stmtName);
if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
pset.piped_commands++;
break;
case PSQL_SEND_EXTENDED_PARSE:
success = PQsendPrepare(pset.db, pset.stmtName, query, 0, NULL);
if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
pset.piped_commands++;
break;
case PSQL_SEND_EXTENDED_QUERY_PARAMS:
Assert(pset.stmtName == NULL);
@ -1476,6 +1554,8 @@ ExecQueryAndProcessResults(const char *query,
pset.bind_nparams, NULL,
(const char *const *) pset.bind_params,
NULL, NULL, 0);
if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
pset.piped_commands++;
break;
case PSQL_SEND_EXTENDED_QUERY_PREPARED:
Assert(pset.stmtName != NULL);
@ -1483,6 +1563,89 @@ ExecQueryAndProcessResults(const char *query,
pset.bind_nparams,
(const char *const *) pset.bind_params,
NULL, NULL, 0);
if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
pset.piped_commands++;
break;
case PSQL_SEND_START_PIPELINE_MODE:
success = PQenterPipelineMode(pset.db);
break;
case PSQL_SEND_END_PIPELINE_MODE:
success = PQpipelineSync(pset.db);
if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
{
/*
* End of the pipeline, all queued commands need to be
* processed.
*/
end_pipeline = true;
pset.piped_syncs++;
/*
* The server will send a ReadyForQuery after a Sync is
* processed, flushing all the results back to the client.
*/
pset.available_results += pset.piped_commands;
pset.piped_commands = 0;
/* We want to read all results */
pset.requested_results = pset.available_results + pset.piped_syncs;
}
break;
case PSQL_SEND_PIPELINE_SYNC:
success = PQsendPipelineSync(pset.db);
if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
{
pset.piped_syncs++;
/*
* The server will send a ReadyForQuery after a Sync is
* processed, flushing all the results back to the client.
*/
pset.available_results += pset.piped_commands;
pset.piped_commands = 0;
}
break;
case PSQL_SEND_FLUSH:
success = PQflush(pset.db);
break;
case PSQL_SEND_FLUSH_REQUEST:
success = PQsendFlushRequest(pset.db);
if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
{
/*
* With the flush request, all commands in the pipeline are
* pushed and the server will flush the results back to the
* client, making them available.
*/
pset.available_results += pset.piped_commands;
pset.piped_commands = 0;
}
break;
case PSQL_SEND_GET_RESULTS:
if (pset.available_results == 0 && pset.piped_syncs == 0)
{
/*
* If no sync or flush request was sent, PQgetResult() would
* block as there are no results available.  Forbid any
* attempt to get pending results in this state.
*/
pg_log_info("No pending results to get");
success = false;
pset.requested_results = 0;
}
else
{
success = true;
/*
* Cap requested_results to the maximum number of known
* results.
*/
if (pset.requested_results == 0 ||
pset.requested_results > (pset.available_results + pset.piped_syncs))
pset.requested_results = pset.available_results + pset.piped_syncs;
}
break;
case PSQL_SEND_QUERY:
success = PQsendQuery(pset.db, query);
@ -1501,6 +1664,16 @@ ExecQueryAndProcessResults(const char *query,
return -1;
}
if (pset.requested_results == 0 && !end_pipeline &&
PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
{
/*
* We are in a pipeline that is not being ended, and no pipeline
* results were requested, so exit.
*/
return 1;
}
/*
* Fetch the result in chunks if FETCH_COUNT is set, except when:
*
@ -1548,7 +1721,7 @@ ExecQueryAndProcessResults(const char *query,
{
ExecStatusType result_status;
bool is_chunked_result = false;
PGresult *next_result;
PGresult *next_result = NULL;
bool last;
if (!AcceptResult(result, false))
@ -1571,6 +1744,9 @@ ExecQueryAndProcessResults(const char *query,
ClearOrSaveResult(result);
success = false;
if (result_status == PGRES_PIPELINE_ABORTED)
pg_log_info("Pipeline aborted, command did not run");
/*
* switch to next result
*/
@ -1586,6 +1762,22 @@ ExecQueryAndProcessResults(const char *query,
*/
result = NULL;
}
else if ((end_pipeline || pset.requested_results > 0)
&& PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
{
/*
* Error within a pipeline. All commands are aborted until
* the next synchronisation point. We need to consume all the
* results until this synchronisation point, or stop when
* there are no more result to discard.
*
* Checking the pipeline status is necessary for the case
* where the connection was reset. The new connection is not
* in any kind of pipeline state and thus has no result to
* discard.
*/
result = discardAbortedPipelineResults();
}
else
result = PQgetResult(pset.db);
@ -1772,12 +1964,66 @@ ExecQueryAndProcessResults(const char *query,
}
}
if (result_status == PGRES_PIPELINE_SYNC)
{
Assert(pset.piped_syncs > 0);
/*
* Sync response, decrease the sync and requested_results
* counters.
*/
pset.piped_syncs--;
pset.requested_results--;
/*
* After a synchronisation point, reset success state to print
* possible successful results that will be processed after this.
*/
success = true;
/*
* If all syncs were processed and pipeline end was requested,
* exit pipeline mode.
*/
if (end_pipeline && pset.piped_syncs == 0)
success &= PQexitPipelineMode(pset.db);
}
else if (PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF &&
result_status != PGRES_PIPELINE_SYNC)
{
/*
* Non-sync response while in a pipeline, decrease the result
* counters.
*/
pset.available_results--;
pset.requested_results--;
}
/*
* Check PQgetResult() again. In the typical case of a single-command
* string, it will return NULL. Otherwise, we'll have other results
* to process. We need to do that to check whether this is the last.
*/
next_result = PQgetResult(pset.db);
if (PQpipelineStatus(pset.db) == PQ_PIPELINE_OFF)
next_result = PQgetResult(pset.db);
else
{
/*
* In pipeline mode, a NULL result indicates the end of the
* current query being processed. Call PQgetResult() once to
* consume this state.
*/
if (result_status != PGRES_PIPELINE_SYNC)
{
next_result = PQgetResult(pset.db);
Assert(next_result == NULL);
}
/* Now, we can get the next result in the pipeline. */
if (pset.requested_results > 0)
next_result = PQgetResult(pset.db);
}
last = (next_result == NULL);
/*
@ -1799,8 +2045,13 @@ ExecQueryAndProcessResults(const char *query,
*elapsed_msec = INSTR_TIME_GET_MILLISEC(after);
}
/* this may or may not print something depending on settings */
if (result != NULL)
/*
* This may or may not print something depending on settings.
*
* A pipeline sync will have a non-NULL result but does not have
* anything to print, thus ignore results in this case.
*/
if (result != NULL && result_status != PGRES_PIPELINE_SYNC)
{
/*
* If results need to be printed into the file specified by \g,
@ -1826,9 +2077,15 @@ ExecQueryAndProcessResults(const char *query,
ClearOrSaveResult(result);
result = next_result;
if (cancel_pressed)
if (cancel_pressed && PQpipelineStatus(pset.db) == PQ_PIPELINE_OFF)
{
/* drop this next result, as well as any others not yet read */
/*
* Outside of a pipeline, drop the next result, as well as any
* others not yet read.
*
* Within a pipeline, we can let the outer loop handle this as an
* aborted pipeline, which will then discard all the results.
*/
ClearOrSaveResult(result);
ClearOrSaveAllResults();
break;
@ -1838,6 +2095,17 @@ ExecQueryAndProcessResults(const char *query,
/* close \g file if we opened it */
CloseGOutput(gfile_fout, gfile_is_pipe);
if (end_pipeline)
{
/* after a pipeline is processed, piped_syncs should be 0 */
Assert(pset.piped_syncs == 0);
/* all commands have been processed */
Assert(pset.piped_commands == 0);
/* all results were read */
Assert(pset.available_results == 0);
}
Assert(pset.requested_results == 0);
/* may need this to recover from conn loss during COPY */
if (!CheckConnection())
return -1;
@ -2298,6 +2566,12 @@ clean_extended_state(void)
pset.bind_params = NULL;
break;
case PSQL_SEND_QUERY:
case PSQL_SEND_START_PIPELINE_MODE: /* \startpipeline */
case PSQL_SEND_END_PIPELINE_MODE: /* \endpipeline */
case PSQL_SEND_PIPELINE_SYNC: /* \syncpipeline */
case PSQL_SEND_FLUSH: /* \flush */
case PSQL_SEND_GET_RESULTS: /* \getresults */
case PSQL_SEND_FLUSH_REQUEST: /* \flushrequest */
break;
}


@ -167,15 +167,22 @@ slashUsage(unsigned short int pager)
HELP0(" \\close STMT_NAME close an existing prepared statement\n");
HELP0(" \\copyright show PostgreSQL usage and distribution terms\n");
HELP0(" \\crosstabview [COLUMNS] execute query and display result in crosstab\n");
HELP0(" \\endpipeline exit pipeline mode\n");
HELP0(" \\errverbose show most recent error message at maximum verbosity\n");
HELP0(" \\flush push unsent data to the server\n");
HELP0(" \\flushrequest send a flushrequest command\n");
HELP0(" \\g [(OPTIONS)] [FILE] execute query (and send result to file or |pipe);\n"
" \\g with no arguments is equivalent to a semicolon\n");
HELP0(" \\gdesc describe result of query, without executing it\n");
HELP0(" \\getresults [NUM_RES] read NUM_RES pending results. All pending results are\n"
" read if no argument is provided\n");
HELP0(" \\gexec execute query, then execute each value in its result\n");
HELP0(" \\gset [PREFIX] execute query and store result in psql variables\n");
HELP0(" \\gx [(OPTIONS)] [FILE] as \\g, but forces expanded output mode\n");
HELP0(" \\parse STMT_NAME create a prepared statement\n");
HELP0(" \\q quit psql\n");
HELP0(" \\startpipeline enter pipeline mode\n");
HELP0(" \\syncpipeline add a synchronisation point to an ongoing pipeline\n");
HELP0(" \\watch [[i=]SEC] [c=N] [m=MIN]\n"
" execute query every SEC seconds, up to N times,\n"
" stop if less than MIN rows are returned\n");


@ -69,6 +69,12 @@ typedef enum
PSQL_SEND_EXTENDED_PARSE,
PSQL_SEND_EXTENDED_QUERY_PARAMS,
PSQL_SEND_EXTENDED_QUERY_PREPARED,
PSQL_SEND_PIPELINE_SYNC,
PSQL_SEND_START_PIPELINE_MODE,
PSQL_SEND_END_PIPELINE_MODE,
PSQL_SEND_FLUSH,
PSQL_SEND_FLUSH_REQUEST,
PSQL_SEND_GET_RESULTS,
} PSQL_SEND_MODE;
typedef enum
@ -111,6 +117,12 @@ typedef struct _psqlSettings
char **bind_params; /* parameters for extended query protocol call */
char *stmtName; /* prepared statement name used for extended
* query protocol commands */
int piped_commands; /* number of piped commands */
int piped_syncs; /* number of piped syncs */
int available_results; /* number of results available to get */
int requested_results; /* number of requested results, including
* sync messages. Used to read a limited
* subset of the available_results. */
bool crosstab_flag; /* one-shot request to crosstab result */
char *ctv_args[4]; /* \crosstabview arguments */


@ -1885,9 +1885,9 @@ psql_completion(const char *text, int start, int end)
"\\drds", "\\drg", "\\dRs", "\\dRp", "\\ds",
"\\dt", "\\dT", "\\dv", "\\du", "\\dx", "\\dX", "\\dy",
"\\echo", "\\edit", "\\ef", "\\elif", "\\else", "\\encoding",
"\\endif", "\\errverbose", "\\ev",
"\\f",
"\\g", "\\gdesc", "\\getenv", "\\gexec", "\\gset", "\\gx",
"\\endif", "\\endpipeline", "\\errverbose", "\\ev",
"\\f", "\\flush", "\\flushrequest",
"\\g", "\\gdesc", "\\getenv", "\\getresults", "\\gexec", "\\gset", "\\gx",
"\\help", "\\html",
"\\if", "\\include", "\\include_relative", "\\ir",
"\\list", "\\lo_import", "\\lo_export", "\\lo_list", "\\lo_unlink",
@ -1895,7 +1895,7 @@ psql_completion(const char *text, int start, int end)
"\\parse", "\\password", "\\print", "\\prompt", "\\pset",
"\\qecho", "\\quit",
"\\reset",
"\\s", "\\set", "\\setenv", "\\sf", "\\sv",
"\\s", "\\set", "\\setenv", "\\sf", "\\startpipeline", "\\sv", "\\syncpipeline",
"\\t", "\\T", "\\timing",
"\\unset",
"\\x",


@ -4678,11 +4678,15 @@ bar 'bar' "bar"
\echo arg1 arg2 arg3 arg4 arg5
\echo arg1
\encoding arg1
\endpipeline
\errverbose
\f arg1
\flush
\flushrequest
\g arg1
\gx arg1
\gexec
\getresults
SELECT 1 AS one \gset
\h
\?
@ -4706,6 +4710,8 @@ invalid command \lo
\setenv arg1 arg2
\sf whole_line
\sv whole_line
\startpipeline
\syncpipeline
\t arg1
\T arg1
\timing arg1


@ -0,0 +1,589 @@
--
-- Tests using psql pipelining
--
CREATE TABLE psql_pipeline(a INTEGER PRIMARY KEY, s TEXT);
-- Single query
\startpipeline
SELECT $1 \bind 'val1' \g
\endpipeline
?column?
----------
val1
(1 row)
-- Multiple queries
\startpipeline
SELECT $1 \bind 'val1' \g
SELECT $1, $2 \bind 'val2' 'val3' \g
SELECT $1, $2 \bind 'val2' 'val3' \g
\endpipeline
?column?
----------
val1
(1 row)
?column? | ?column?
----------+----------
val2 | val3
(1 row)
?column? | ?column?
----------+----------
val2 | val3
(1 row)
-- Test \flush
\startpipeline
\flush
SELECT $1 \bind 'val1' \g
\flush
SELECT $1, $2 \bind 'val2' 'val3' \g
SELECT $1, $2 \bind 'val2' 'val3' \g
\endpipeline
?column?
----------
val1
(1 row)
?column? | ?column?
----------+----------
val2 | val3
(1 row)
?column? | ?column?
----------+----------
val2 | val3
(1 row)
-- Send multiple syncs
\startpipeline
SELECT $1 \bind 'val1' \g
\syncpipeline
\syncpipeline
SELECT $1, $2 \bind 'val2' 'val3' \g
\syncpipeline
SELECT $1, $2 \bind 'val4' 'val5' \g
\endpipeline
?column?
----------
val1
(1 row)
?column? | ?column?
----------+----------
val2 | val3
(1 row)
?column? | ?column?
----------+----------
val4 | val5
(1 row)
-- \startpipeline should not have any effect if already in a pipeline.
\startpipeline
\startpipeline
SELECT $1 \bind 'val1' \g
\endpipeline
?column?
----------
val1
(1 row)
-- Convert an implicit transaction block to an explicit transaction block.
\startpipeline
INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
BEGIN \bind \g
INSERT INTO psql_pipeline VALUES ($1) \bind 2 \g
ROLLBACK \bind \g
\endpipeline
-- Multiple explicit transactions
\startpipeline
BEGIN \bind \g
INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
ROLLBACK \bind \g
BEGIN \bind \g
INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
COMMIT \bind \g
\endpipeline
-- COPY FROM STDIN
\startpipeline
SELECT $1 \bind 'val1' \g
COPY psql_pipeline FROM STDIN \bind \g
\endpipeline
?column?
----------
val1
(1 row)
-- COPY FROM STDIN with \flushrequest + \getresults
\startpipeline
SELECT $1 \bind 'val1' \g
COPY psql_pipeline FROM STDIN \bind \g
\flushrequest
\getresults
?column?
----------
val1
(1 row)
message type 0x5a arrived from server while idle
\endpipeline
-- COPY FROM STDIN with \syncpipeline + \getresults
\startpipeline
SELECT $1 \bind 'val1' \g
COPY psql_pipeline FROM STDIN \bind \g
\syncpipeline
\getresults
?column?
----------
val1
(1 row)
\endpipeline
-- COPY TO STDOUT
\startpipeline
SELECT $1 \bind 'val1' \g
copy psql_pipeline TO STDOUT \bind \g
\endpipeline
?column?
----------
val1
(1 row)
1 \N
2 test2
3 test3
4 test4
-- COPY TO STDOUT with \flushrequest + \getresults
\startpipeline
SELECT $1 \bind 'val1' \g
copy psql_pipeline TO STDOUT \bind \g
\flushrequest
\getresults
?column?
----------
val1
(1 row)
1 \N
2 test2
3 test3
4 test4
\endpipeline
-- COPY TO STDOUT with \syncpipeline + \getresults
\startpipeline
SELECT $1 \bind 'val1' \g
copy psql_pipeline TO STDOUT \bind \g
\syncpipeline
\getresults
?column?
----------
val1
(1 row)
1 \N
2 test2
3 test3
4 test4
\endpipeline
-- Use \parse and \bind_named
\startpipeline
SELECT $1 \parse ''
SELECT $1, $2 \parse ''
SELECT $2 \parse pipeline_1
\bind_named '' 1 2 \g
\bind_named pipeline_1 2 \g
\endpipeline
ERROR: could not determine data type of parameter $1
-- \getresults displays all results preceding a \flushrequest.
\startpipeline
SELECT $1 \bind 1 \g
SELECT $1 \bind 2 \g
\flushrequest
\getresults
?column?
----------
1
(1 row)
?column?
----------
2
(1 row)
\endpipeline
-- \getresults displays all results preceding a \syncpipeline.
\startpipeline
SELECT $1 \bind 1 \g
SELECT $1 \bind 2 \g
\syncpipeline
\getresults
?column?
----------
1
(1 row)
?column?
----------
2
(1 row)
\endpipeline
-- \getresults immediately returns if there is no result to fetch.
\startpipeline
\getresults
No pending results to get
SELECT $1 \bind 2 \g
\getresults
No pending results to get
\flushrequest
\endpipeline
?column?
----------
2
(1 row)
\getresults
No pending results to get
-- \getresults only fetches results preceding a \flushrequest.
\startpipeline
SELECT $1 \bind 2 \g
\flushrequest
SELECT $1 \bind 2 \g
\getresults
?column?
----------
2
(1 row)
\endpipeline
?column?
----------
2
(1 row)
-- \getresults only fetches results preceding a \syncpipeline.
\startpipeline
SELECT $1 \bind 2 \g
\syncpipeline
SELECT $1 \bind 2 \g
\getresults
?column?
----------
2
(1 row)
\endpipeline
?column?
----------
2
(1 row)
-- Use pipeline with chunked results for both \getresults and \endpipeline.
\startpipeline
\set FETCH_COUNT 10
SELECT $1 \bind 2 \g
\flushrequest
\getresults
?column?
----------
2
(1 row)
SELECT $1 \bind 2 \g
\endpipeline
?column?
----------
2
(1 row)
\unset FETCH_COUNT
-- \getresults with specific number of requested results.
\startpipeline
SELECT $1 \bind 1 \g
SELECT $1 \bind 2 \g
SELECT $1 \bind 3 \g
\syncpipeline
\getresults 1
?column?
----------
1
(1 row)
SELECT $1 \bind 4 \g
\getresults 3
?column?
----------
2
(1 row)
?column?
----------
3
(1 row)
\endpipeline
?column?
----------
4
(1 row)
-- \syncpipeline counts as one command to fetch for \getresults.
\startpipeline
\syncpipeline
\syncpipeline
SELECT $1 \bind 1 \g
\flushrequest
\getresults 2
\getresults 1
?column?
----------
1
(1 row)
\endpipeline
-- \getresults 0 should get all the results.
\startpipeline
SELECT $1 \bind 1 \g
SELECT $1 \bind 2 \g
SELECT $1 \bind 3 \g
\syncpipeline
\getresults 0
?column?
----------
1
(1 row)
?column?
----------
2
(1 row)
?column?
----------
3
(1 row)
\endpipeline
--
-- Pipeline errors
--
-- \endpipeline outside of a pipeline should fail
\endpipeline
cannot send pipeline when not in pipeline mode
-- Query using simple protocol should not be sent and should leave the
-- pipeline usable.
\startpipeline
SELECT 1;
PQsendQuery not allowed in pipeline mode
SELECT $1 \bind 'val1' \g
\endpipeline
?column?
----------
val1
(1 row)
-- After an aborted pipeline, commands after a \syncpipeline should be
-- displayed.
\startpipeline
SELECT $1 \bind \g
\syncpipeline
SELECT $1 \bind 1 \g
\endpipeline
ERROR: bind message supplies 0 parameters, but prepared statement "" requires 1
?column?
----------
1
(1 row)
-- For an incorrect number of parameters, the pipeline is aborted and
-- the following queries will not be executed.
\startpipeline
SELECT \bind 'val1' \g
SELECT $1 \bind 'val1' \g
\endpipeline
ERROR: bind message supplies 1 parameters, but prepared statement "" requires 0
-- An explicit transaction with an error needs to be rolled back after
-- the pipeline.
\startpipeline
BEGIN \bind \g
INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
ROLLBACK \bind \g
\endpipeline
ERROR: duplicate key value violates unique constraint "psql_pipeline_pkey"
DETAIL: Key (a)=(1) already exists.
ROLLBACK;
-- \watch sends a simple query, something not allowed within a pipeline.
\startpipeline
SELECT \bind \g
\watch 1
PQsendQuery not allowed in pipeline mode
\endpipeline
--
(1 row)
-- \gdesc should fail as synchronous commands are not allowed in a pipeline,
-- and the pipeline should still be usable.
\startpipeline
SELECT $1 \bind 1 \gdesc
synchronous command execution functions are not allowed in pipeline mode
SELECT $1 \bind 1 \g
\endpipeline
?column?
----------
1
(1 row)
-- \gset is not allowed in a pipeline; the pipeline should still be usable.
\startpipeline
SELECT $1 as i, $2 as j \parse ''
SELECT $1 as k, $2 as l \parse 'second'
\bind_named '' 1 2 \gset
\gset not allowed in pipeline mode
\bind_named second 1 2 \gset pref02_ \echo :pref02_i :pref02_j
\gset not allowed in pipeline mode
\bind_named '' 1 2 \g
\endpipeline
i | j
---+---
1 | 2
(1 row)
-- \gx is not allowed, pipeline should still be usable.
\startpipeline
SELECT $1 \bind 1 \gx
\gx not allowed in pipeline mode
\reset
SELECT $1 \bind 1 \g
\endpipeline
?column?
----------
1
(1 row)
-- The \gx warning should be emitted in an aborted pipeline, with the
-- pipeline still usable.
\startpipeline
SELECT $1 \bind \g
\flushrequest
\getresults
ERROR: bind message supplies 0 parameters, but prepared statement "" requires 1
SELECT $1 \bind 1 \gx
\gx not allowed in pipeline mode
\endpipeline
-- \gexec is not allowed, pipeline should still be usable.
\startpipeline
SELECT 'INSERT INTO psql_pipeline(a) SELECT generate_series(1, 10)' \parse 'insert_stmt'
\bind_named insert_stmt \gexec
\gexec not allowed in pipeline mode
\bind_named insert_stmt \g
SELECT COUNT(*) FROM psql_pipeline \bind \g
\endpipeline
?column?
------------------------------------------------------------
INSERT INTO psql_pipeline(a) SELECT generate_series(1, 10)
(1 row)
count
-------
4
(1 row)
-- After an error, the pipeline is aborted and requires a \syncpipeline
-- to become usable again.
\startpipeline
SELECT $1 \bind \g
SELECT $1 \bind 1 \g
SELECT $1 \parse a
\bind_named a 1 \g
\close a
\flushrequest
\getresults
ERROR: bind message supplies 0 parameters, but prepared statement "" requires 1
-- Pipeline is aborted.
SELECT $1 \bind 1 \g
SELECT $1 \parse a
\bind_named a 1 \g
\close a
-- Sync allows pipeline to recover.
\syncpipeline
\getresults
Pipeline aborted, command did not run
SELECT $1 \bind 1 \g
SELECT $1 \parse a
\bind_named a 1 \g
\close a
\flushrequest
\getresults
?column?
----------
1
(1 row)
?column?
----------
1
(1 row)
\endpipeline
-- In an aborted pipeline, \getresults 1 aborts commands one at a time.
\startpipeline
SELECT $1 \bind \g
SELECT $1 \bind 1 \g
SELECT $1 \parse a
\bind_named a 1 \g
\syncpipeline
\getresults 1
ERROR: bind message supplies 0 parameters, but prepared statement "" requires 1
\getresults 1
Pipeline aborted, command did not run
\getresults 1
Pipeline aborted, command did not run
\getresults 1
Pipeline aborted, command did not run
\getresults 1
\endpipeline
-- Test chunked results with an aborted pipeline.
\startpipeline
\set FETCH_COUNT 10
SELECT $1 \bind \g
\flushrequest
\getresults
ERROR: bind message supplies 0 parameters, but prepared statement "" requires 1
SELECT $1 \bind \g
\endpipeline
fetching results in chunked mode failed
Pipeline aborted, command did not run
\unset FETCH_COUNT
-- \getresults returns an error when an incorrect number is provided.
\startpipeline
\getresults -1
\getresults: invalid number of requested results
\endpipeline
-- \getresults when there is no result should not impact the next
-- query executed.
\getresults 1
No pending results to get
select 1;
?column?
----------
1
(1 row)
-- Error messages accumulate and are repeated.
\startpipeline
SELECT 1 \bind \g
SELECT 1;
PQsendQuery not allowed in pipeline mode
SELECT 1;
PQsendQuery not allowed in pipeline mode
PQsendQuery not allowed in pipeline mode
\endpipeline
?column?
----------
1
(1 row)
-- Clean up
DROP TABLE psql_pipeline;


@ -81,7 +81,7 @@ test: brin_bloom brin_multi
test: create_table_like alter_generic alter_operator misc async dbsize merge misc_functions sysviews tsrf tid tidscan tidrangescan collate.utf8 collate.icu.utf8 incremental_sort create_role without_overlaps generated_virtual
# collate.linux.utf8 and collate.icu.utf8 tests cannot be run in parallel with each other
test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8 collate.windows.win1252
test: rules psql psql_crosstab psql_pipeline amutils stats_ext collate.linux.utf8 collate.windows.win1252
# ----------
# Run these alone so they don't run out of parallel workers


@ -1047,11 +1047,15 @@ select \if false \\ (bogus \else \\ 42 \endif \\ forty_two;
\echo arg1 arg2 arg3 arg4 arg5
\echo arg1
\encoding arg1
\endpipeline
\errverbose
\f arg1
\flush
\flushrequest
\g arg1
\gx arg1
\gexec
\getresults
SELECT 1 AS one \gset
\h
\?
@ -1074,6 +1078,8 @@ select \if false \\ (bogus \else \\ 42 \endif \\ forty_two;
\setenv arg1 arg2
\sf whole_line
\sv whole_line
\startpipeline
\syncpipeline
\t arg1
\T arg1
\timing arg1


@ -0,0 +1,354 @@
--
-- Tests using psql pipelining
--
CREATE TABLE psql_pipeline(a INTEGER PRIMARY KEY, s TEXT);
-- Single query
\startpipeline
SELECT $1 \bind 'val1' \g
\endpipeline
-- Multiple queries
\startpipeline
SELECT $1 \bind 'val1' \g
SELECT $1, $2 \bind 'val2' 'val3' \g
SELECT $1, $2 \bind 'val2' 'val3' \g
\endpipeline
-- Test \flush
\startpipeline
\flush
SELECT $1 \bind 'val1' \g
\flush
SELECT $1, $2 \bind 'val2' 'val3' \g
SELECT $1, $2 \bind 'val2' 'val3' \g
\endpipeline
-- Send multiple syncs
\startpipeline
SELECT $1 \bind 'val1' \g
\syncpipeline
\syncpipeline
SELECT $1, $2 \bind 'val2' 'val3' \g
\syncpipeline
SELECT $1, $2 \bind 'val4' 'val5' \g
\endpipeline
-- \startpipeline should not have any effect if already in a pipeline.
\startpipeline
\startpipeline
SELECT $1 \bind 'val1' \g
\endpipeline
-- Convert an implicit transaction block to an explicit transaction block.
\startpipeline
INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
BEGIN \bind \g
INSERT INTO psql_pipeline VALUES ($1) \bind 2 \g
ROLLBACK \bind \g
\endpipeline
-- Multiple explicit transactions
\startpipeline
BEGIN \bind \g
INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
ROLLBACK \bind \g
BEGIN \bind \g
INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
COMMIT \bind \g
\endpipeline
-- COPY FROM STDIN
\startpipeline
SELECT $1 \bind 'val1' \g
COPY psql_pipeline FROM STDIN \bind \g
\endpipeline
2 test2
\.
-- COPY FROM STDIN with \flushrequest + \getresults
\startpipeline
SELECT $1 \bind 'val1' \g
COPY psql_pipeline FROM STDIN \bind \g
\flushrequest
\getresults
3 test3
\.
\endpipeline
-- COPY FROM STDIN with \syncpipeline + \getresults
\startpipeline
SELECT $1 \bind 'val1' \g
COPY psql_pipeline FROM STDIN \bind \g
\syncpipeline
\getresults
4 test4
\.
\endpipeline
-- COPY TO STDOUT
\startpipeline
SELECT $1 \bind 'val1' \g
copy psql_pipeline TO STDOUT \bind \g
\endpipeline
-- COPY TO STDOUT with \flushrequest + \getresults
\startpipeline
SELECT $1 \bind 'val1' \g
copy psql_pipeline TO STDOUT \bind \g
\flushrequest
\getresults
\endpipeline
-- COPY TO STDOUT with \syncpipeline + \getresults
\startpipeline
SELECT $1 \bind 'val1' \g
copy psql_pipeline TO STDOUT \bind \g
\syncpipeline
\getresults
\endpipeline
-- Use \parse and \bind_named
\startpipeline
SELECT $1 \parse ''
SELECT $1, $2 \parse ''
SELECT $2 \parse pipeline_1
\bind_named '' 1 2 \g
\bind_named pipeline_1 2 \g
\endpipeline
-- \getresults displays all results preceding a \flushrequest.
\startpipeline
SELECT $1 \bind 1 \g
SELECT $1 \bind 2 \g
\flushrequest
\getresults
\endpipeline
-- \getresults displays all results preceding a \syncpipeline.
\startpipeline
SELECT $1 \bind 1 \g
SELECT $1 \bind 2 \g
\syncpipeline
\getresults
\endpipeline
-- \getresults immediately returns if there is no result to fetch.
\startpipeline
\getresults
SELECT $1 \bind 2 \g
\getresults
\flushrequest
\endpipeline
\getresults
-- \getresults only fetches results preceding a \flushrequest.
\startpipeline
SELECT $1 \bind 2 \g
\flushrequest
SELECT $1 \bind 2 \g
\getresults
\endpipeline
-- \getresults only fetches results preceding a \syncpipeline.
\startpipeline
SELECT $1 \bind 2 \g
\syncpipeline
SELECT $1 \bind 2 \g
\getresults
\endpipeline
-- Use pipeline with chunked results for both \getresults and \endpipeline.
\startpipeline
\set FETCH_COUNT 10
SELECT $1 \bind 2 \g
\flushrequest
\getresults
SELECT $1 \bind 2 \g
\endpipeline
\unset FETCH_COUNT
-- \getresults with specific number of requested results.
\startpipeline
SELECT $1 \bind 1 \g
SELECT $1 \bind 2 \g
SELECT $1 \bind 3 \g
\syncpipeline
\getresults 1
SELECT $1 \bind 4 \g
\getresults 3
\endpipeline
-- \syncpipeline counts as one command to fetch for \getresults.
\startpipeline
\syncpipeline
\syncpipeline
SELECT $1 \bind 1 \g
\flushrequest
\getresults 2
\getresults 1
\endpipeline
-- \getresults 0 should get all the results.
\startpipeline
SELECT $1 \bind 1 \g
SELECT $1 \bind 2 \g
SELECT $1 \bind 3 \g
\syncpipeline
\getresults 0
\endpipeline
--
-- Pipeline errors
--
-- \endpipeline outside of a pipeline should fail
\endpipeline
-- Query using simple protocol should not be sent and should leave the
-- pipeline usable.
\startpipeline
SELECT 1;
SELECT $1 \bind 'val1' \g
\endpipeline
-- After an aborted pipeline, commands after a \syncpipeline should be
-- displayed.
\startpipeline
SELECT $1 \bind \g
\syncpipeline
SELECT $1 \bind 1 \g
\endpipeline
-- For an incorrect number of parameters, the pipeline is aborted and
-- the following queries will not be executed.
\startpipeline
SELECT \bind 'val1' \g
SELECT $1 \bind 'val1' \g
\endpipeline
-- An explicit transaction with an error needs to be rolled back after
-- the pipeline.
\startpipeline
BEGIN \bind \g
INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
ROLLBACK \bind \g
\endpipeline
ROLLBACK;
-- \watch sends a simple query, something not allowed within a pipeline.
\startpipeline
SELECT \bind \g
\watch 1
\endpipeline
-- \gdesc should fail as synchronous commands are not allowed in a pipeline,
-- and the pipeline should still be usable.
\startpipeline
SELECT $1 \bind 1 \gdesc
SELECT $1 \bind 1 \g
\endpipeline
-- \gset is not allowed in a pipeline; the pipeline should still be usable.
\startpipeline
SELECT $1 as i, $2 as j \parse ''
SELECT $1 as k, $2 as l \parse 'second'
\bind_named '' 1 2 \gset
\bind_named second 1 2 \gset pref02_ \echo :pref02_i :pref02_j
\bind_named '' 1 2 \g
\endpipeline
-- \gx is not allowed, pipeline should still be usable.
\startpipeline
SELECT $1 \bind 1 \gx
\reset
SELECT $1 \bind 1 \g
\endpipeline
-- The \gx warning should be emitted in an aborted pipeline, with the
-- pipeline still usable.
\startpipeline
SELECT $1 \bind \g
\flushrequest
\getresults
SELECT $1 \bind 1 \gx
\endpipeline
-- \gexec is not allowed, pipeline should still be usable.
\startpipeline
SELECT 'INSERT INTO psql_pipeline(a) SELECT generate_series(1, 10)' \parse 'insert_stmt'
\bind_named insert_stmt \gexec
\bind_named insert_stmt \g
SELECT COUNT(*) FROM psql_pipeline \bind \g
\endpipeline
-- After an error, the pipeline is aborted and requires a \syncpipeline
-- to become usable again.
\startpipeline
SELECT $1 \bind \g
SELECT $1 \bind 1 \g
SELECT $1 \parse a
\bind_named a 1 \g
\close a
\flushrequest
\getresults
-- Pipeline is aborted.
SELECT $1 \bind 1 \g
SELECT $1 \parse a
\bind_named a 1 \g
\close a
-- Sync allows pipeline to recover.
\syncpipeline
\getresults
SELECT $1 \bind 1 \g
SELECT $1 \parse a
\bind_named a 1 \g
\close a
\flushrequest
\getresults
\endpipeline
-- In an aborted pipeline, \getresults 1 aborts commands one at a time.
\startpipeline
SELECT $1 \bind \g
SELECT $1 \bind 1 \g
SELECT $1 \parse a
\bind_named a 1 \g
\syncpipeline
\getresults 1
\getresults 1
\getresults 1
\getresults 1
\getresults 1
\endpipeline
-- Test chunked results with an aborted pipeline.
\startpipeline
\set FETCH_COUNT 10
SELECT $1 \bind \g
\flushrequest
\getresults
SELECT $1 \bind \g
\endpipeline
\unset FETCH_COUNT
-- \getresults returns an error when an incorrect number is provided.
\startpipeline
\getresults -1
\endpipeline
-- \getresults when there is no result should not impact the next
-- query executed.
\getresults 1
select 1;
-- Error messages accumulate and are repeated.
\startpipeline
SELECT 1 \bind \g
SELECT 1;
SELECT 1;
\endpipeline
-- Clean up
DROP TABLE psql_pipeline;