From 9aa491abbf07ca8385a429385be8d68517384fdf Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Mon, 15 Mar 2021 18:33:03 -0300 Subject: [PATCH] Add libpq pipeline mode support to pgbench MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New metacommands \startpipeline and \endpipeline allow the user to run queries in libpq pipeline mode. Author: Daniel Vérité Reviewed-by: Álvaro Herrera Discussion: https://postgr.es/m/b4e34135-2bd9-4b8a-94ca-27d760da26d7@manitou-mail.org --- doc/src/sgml/ref/pgbench.sgml | 22 ++++ src/bin/pgbench/pgbench.c | 131 ++++++++++++++++--- src/bin/pgbench/t/001_pgbench_with_server.pl | 79 ++++++++++- 3 files changed, 216 insertions(+), 16 deletions(-) diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml index 299d93b2419..50cf22ba6ba 100644 --- a/doc/src/sgml/ref/pgbench.sgml +++ b/doc/src/sgml/ref/pgbench.sgml @@ -1110,6 +1110,12 @@ pgbench options d row, the last value is kept. + + \gset and \aset cannot be used in + pipeline mode, since the query results are not yet available by the time + the commands would need them. + + The following example puts the final account balance from the first query into variable abalance, and fills variables @@ -1270,6 +1276,22 @@ SELECT 4 AS four \; SELECT 5 AS five \aset + + + \startpipeline + \endpipeline + + + + These commands delimit the start and end of a pipeline of SQL + statements. In pipeline mode, statements are sent to the server + without waiting for the results of previous statements. See + for more details. + Pipeline mode requires the use of extended query protocol. + + + + diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index f6a214669c1..e69d43b26ba 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -395,10 +395,11 @@ typedef enum * * CSTATE_START_COMMAND starts the execution of a command. On a SQL * command, the command is sent to the server, and we move to - * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set, - * and we enter the CSTATE_SLEEP state to wait for it to expire. Other - * meta-commands are executed immediately. If the command about to start - * is actually beyond the end of the script, advance to CSTATE_END_TX. + * CSTATE_WAIT_RESULT state unless in pipeline mode. On a \sleep + * meta-command, the timer is set, and we enter the CSTATE_SLEEP state to + * wait for it to expire. Other meta-commands are executed immediately. If + * the command about to start is actually beyond the end of the script, + * advance to CSTATE_END_TX. * * CSTATE_WAIT_RESULT waits until we get a result set back from the server * for the current command. @@ -530,7 +531,9 @@ typedef enum MetaCommand META_IF, /* \if */ META_ELIF, /* \elif */ META_ELSE, /* \else */ - META_ENDIF /* \endif */ + META_ENDIF, /* \endif */ + META_STARTPIPELINE, /* \startpipeline */ + META_ENDPIPELINE /* \endpipeline */ } MetaCommand; typedef enum QueryMode @@ -2568,6 +2571,10 @@ getMetaCommand(const char *cmd) mc = META_GSET; else if (pg_strcasecmp(cmd, "aset") == 0) mc = META_ASET; + else if (pg_strcasecmp(cmd, "startpipeline") == 0) + mc = META_STARTPIPELINE; + else if (pg_strcasecmp(cmd, "endpipeline") == 0) + mc = META_ENDPIPELINE; else mc = META_NONE; return mc; @@ -2757,11 +2764,25 @@ sendCommand(CState *st, Command *command) if (commands[j]->type != SQL_COMMAND) continue; preparedStatementName(name, st->use_file, j); - res = PQprepare(st->con, name, - commands[j]->argv[0], commands[j]->argc - 1, NULL); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pg_log_error("%s", PQerrorMessage(st->con)); - PQclear(res); + if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF) + { + res = PQprepare(st->con, name, + commands[j]->argv[0], commands[j]->argc - 1, NULL); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pg_log_error("%s", PQerrorMessage(st->con)); + PQclear(res); + } + else + { + /* + * In pipeline mode, we use asynchronous functions. If a + * server-side error occurs, it will be processed later + * among the other results. + */ + if (!PQsendPrepare(st->con, name, + commands[j]->argv[0], commands[j]->argc - 1, NULL)) + pg_log_error("%s", PQerrorMessage(st->con)); + } } st->prepared[st->use_file] = true; } @@ -2802,10 +2823,11 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix) int qrynum = 0; /* - * varprefix should be set only with \gset or \aset, and SQL commands do - * not need it. + * varprefix should be set only with \gset or \aset, and \endpipeline and + * SQL commands do not need it. */ Assert((meta == META_NONE && varprefix == NULL) || + ((meta == META_ENDPIPELINE) && varprefix == NULL) || ((meta == META_GSET || meta == META_ASET) && varprefix != NULL)); res = PQgetResult(st->con); @@ -2874,6 +2896,13 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix) /* otherwise the result is simply thrown away by PQclear below */ break; + case PGRES_PIPELINE_SYNC: + pg_log_debug("client %d pipeline ending", st->id); + if (PQexitPipelineMode(st->con) != 1) + pg_log_error("client %d failed to exit pipeline mode: %s", st->id, + PQerrorMessage(st->con)); + break; + default: /* anything else is unexpected */ pg_log_error("client %d script %d aborted in command %d query %d: %s", @@ -3127,13 +3156,36 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) /* Execute the command */ if (command->type == SQL_COMMAND) { + /* disallow \aset and \gset in pipeline mode */ + if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF) + { + if (command->meta == META_GSET) + { + commandFailed(st, "gset", "\\gset is not allowed in pipeline mode"); + st->state = CSTATE_ABORTED; + break; + } + else if (command->meta == META_ASET) + { + commandFailed(st, "aset", "\\aset is not allowed in pipeline mode"); + st->state = CSTATE_ABORTED; + break; + } + } + if (!sendCommand(st, command)) { commandFailed(st, "SQL", "SQL command send failed"); st->state = CSTATE_ABORTED; } else - st->state = CSTATE_WAIT_RESULT; + { + /* Wait for results, unless in pipeline mode */ + if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF) + st->state = CSTATE_WAIT_RESULT; + else + st->state = CSTATE_END_COMMAND; + } } else if (command->type == META_COMMAND) { @@ -3273,7 +3325,15 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) if (readCommandResponse(st, sql_script[st->use_file].commands[st->command]->meta, sql_script[st->use_file].commands[st->command]->varprefix)) - st->state = CSTATE_END_COMMAND; + { + /* + * outside of pipeline mode: stop reading results. + * pipeline mode: continue reading results until an + * end-of-pipeline response. + */ + if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON) + st->state = CSTATE_END_COMMAND; + } else st->state = CSTATE_ABORTED; break; @@ -3516,6 +3576,45 @@ executeMetaCommand(CState *st, pg_time_usec_t *now) return CSTATE_ABORTED; } } + else if (command->meta == META_STARTPIPELINE) + { + /* + * In pipeline mode, we use a workflow based on libpq pipeline + * functions. + */ + if (querymode == QUERY_SIMPLE) + { + commandFailed(st, "startpipeline", "cannot use pipeline mode with the simple query protocol"); + return CSTATE_ABORTED; + } + + if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF) + { + commandFailed(st, "startpipeline", "already in pipeline mode"); + return CSTATE_ABORTED; + } + if (PQenterPipelineMode(st->con) == 0) + { + commandFailed(st, "startpipeline", "failed to enter pipeline mode"); + return CSTATE_ABORTED; + } + } + else if (command->meta == META_ENDPIPELINE) + { + if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON) + { + commandFailed(st, "endpipeline", "not in pipeline mode"); + return CSTATE_ABORTED; + } + if (!PQpipelineSync(st->con)) + { + commandFailed(st, "endpipeline", "failed to send a pipeline sync"); + return CSTATE_ABORTED; + } + /* Now wait for the PGRES_PIPELINE_SYNC and exit pipeline mode there */ + /* collect pending results before getting out of pipeline mode */ + return CSTATE_WAIT_RESULT; + } /* * executing the expression or shell command might have taken a @@ -4725,7 +4824,9 @@ process_backslash_command(PsqlScanState sstate, const char *source) syntax_error(source, lineno, my_command->first_line, my_command->argv[0], "missing command", NULL, -1); } - else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF) + else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF || + my_command->meta == META_STARTPIPELINE || + my_command->meta == META_ENDPIPELINE) { if (my_command->argc != 1) syntax_error(source, lineno, my_command->first_line, my_command->argv[0], diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl index daffc18e521..d11c4e8c242 100644 --- a/src/bin/pgbench/t/001_pgbench_with_server.pl +++ b/src/bin/pgbench/t/001_pgbench_with_server.pl @@ -41,7 +41,7 @@ sub pgbench # filenames are expected to be unique on a test if (-e $filename) { - ok(0, "$filename must not already exists"); + ok(0, "$filename must not already exist"); unlink $filename or die "cannot unlink $filename: $!"; } append_to_file($filename, $$files{$fn}); @@ -755,6 +755,83 @@ pgbench( } }); +# Working \startpipeline +pgbench( + '-t 1 -n -M extended', + 0, + [ qr{type: .*/001_pgbench_pipeline}, qr{actually processed: 1/1} ], + [], + 'working \startpipeline', + { + '001_pgbench_pipeline' => q{ +-- test startpipeline +\startpipeline +} . "select 1;\n" x 10 . q{ +\endpipeline +} + }); + +# Working \startpipeline in prepared query mode +pgbench( + '-t 1 -n -M prepared', + 0, + [ qr{type: .*/001_pgbench_pipeline_prep}, qr{actually processed: 1/1} ], + [], + 'working \startpipeline', + { + '001_pgbench_pipeline_prep' => q{ +-- test startpipeline +\startpipeline +} . "select 1;\n" x 10 . q{ +\endpipeline +} + }); + +# Try \startpipeline twice +pgbench( + '-t 1 -n -M extended', + 2, + [], + [qr{already in pipeline mode}], + 'error: call \startpipeline twice', + { + '001_pgbench_pipeline_2' => q{ +-- startpipeline twice +\startpipeline +\startpipeline +} + }); + +# Try to end a pipeline that hasn't started +pgbench( + '-t 1 -n -M extended', + 2, + [], + [qr{not in pipeline mode}], + 'error: \endpipeline with no start', + { + '001_pgbench_pipeline_3' => q{ +-- pipeline not started +\endpipeline +} + }); + +# Try \gset in pipeline mode +pgbench( + '-t 1 -n -M extended', + 2, + [], + [qr{gset is not allowed in pipeline mode}], + 'error: \gset not allowed in pipeline mode', + { + '001_pgbench_pipeline_4' => q{ +\startpipeline +select 1 \gset f +\endpipeline +} + }); + + # trigger many expression errors my @errors = (