diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml index 05d3f81619f..279bb0ad7df 100644 --- a/doc/src/sgml/ref/pgbench.sgml +++ b/doc/src/sgml/ref/pgbench.sgml @@ -1386,13 +1386,19 @@ SELECT 4 AS four \; SELECT 5 AS five \aset \startpipeline + \syncpipeline \endpipeline - These commands delimit the start and end of a pipeline of SQL - statements. In pipeline mode, statements are sent to the server - without waiting for the results of previous statements. See + This group of commands implements pipelining of SQL statements. + A pipeline must begin with a \startpipeline + and end with an \endpipeline. In between there + may be any number of \syncpipeline commands, + which sends a sync message + without ending the ongoing pipeline and flushing the send buffer. + In pipeline mode, statements are sent to the server without waiting + for the results of previous statements. See for more details. Pipeline mode requires the use of extended query protocol. diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 7b53f9c24da..af1f75257ff 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -608,6 +608,7 @@ typedef struct int use_file; /* index in sql_script for this client */ int command; /* command number in script */ + int num_syncs; /* number of ongoing sync commands */ /* client variables */ Variables variables; @@ -697,6 +698,7 @@ typedef enum MetaCommand META_ELSE, /* \else */ META_ENDIF, /* \endif */ META_STARTPIPELINE, /* \startpipeline */ + META_SYNCPIPELINE, /* \syncpipeline */ META_ENDPIPELINE, /* \endpipeline */ } MetaCommand; @@ -2902,6 +2904,8 @@ getMetaCommand(const char *cmd) mc = META_ASET; else if (pg_strcasecmp(cmd, "startpipeline") == 0) mc = META_STARTPIPELINE; + else if (pg_strcasecmp(cmd, "syncpipeline") == 0) + mc = META_SYNCPIPELINE; else if (pg_strcasecmp(cmd, "endpipeline") == 0) mc = META_ENDPIPELINE; else @@ -3317,8 +3321,10 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix) break; case PGRES_PIPELINE_SYNC: - pg_log_debug("client %d pipeline ending", st->id); - if (PQexitPipelineMode(st->con) != 1) + pg_log_debug("client %d pipeline ending, ongoing syncs: %d", + st->id, st->num_syncs); + st->num_syncs--; + if (st->num_syncs == 0 && PQexitPipelineMode(st->con) != 1) pg_log_error("client %d failed to exit pipeline mode: %s", st->id, PQerrorMessage(st->con)); break; @@ -4449,6 +4455,20 @@ executeMetaCommand(CState *st, pg_time_usec_t *now) return CSTATE_ABORTED; } } + else if (command->meta == META_SYNCPIPELINE) + { + if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON) + { + commandFailed(st, "syncpipeline", "not in pipeline mode"); + return CSTATE_ABORTED; + } + if (PQsendPipelineSync(st->con) == 0) + { + commandFailed(st, "syncpipeline", "failed to send a pipeline sync"); + return CSTATE_ABORTED; + } + st->num_syncs++; + } else if (command->meta == META_ENDPIPELINE) { if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON) @@ -4461,6 +4481,7 @@ executeMetaCommand(CState *st, pg_time_usec_t *now) commandFailed(st, "endpipeline", "failed to send a pipeline sync"); return CSTATE_ABORTED; } + st->num_syncs++; /* Now wait for the PGRES_PIPELINE_SYNC and exit pipeline mode there */ /* collect pending results before getting out of pipeline mode */ return CSTATE_WAIT_RESULT; @@ -5794,7 +5815,8 @@ process_backslash_command(PsqlScanState sstate, const char *source) } else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF || my_command->meta == META_STARTPIPELINE || - my_command->meta == META_ENDPIPELINE) + my_command->meta == META_ENDPIPELINE || + my_command->meta == META_SYNCPIPELINE) { if (my_command->argc != 1) syntax_error(source, lineno, my_command->first_line, my_command->argv[0], diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl index f60f7e89c1f..5d2341a2035 100644 --- a/src/bin/pgbench/t/001_pgbench_with_server.pl +++ b/src/bin/pgbench/t/001_pgbench_with_server.pl @@ -814,6 +814,27 @@ $node->pgbench( } }); +# Working \startpipeline with \syncpipeline +$node->pgbench( + '-t 1 -n -M extended', + 0, + [ qr{type: .*/001_pgbench_pipeline_sync}, qr{actually processed: 1/1} ], + [], + 'working \startpipeline with \syncpipeline', + { + '001_pgbench_pipeline_sync' => q{ +-- test startpipeline +\startpipeline +select 1; +\syncpipeline +\syncpipeline +select 2; +\syncpipeline +select 3; +\endpipeline +} + }); + # Working \startpipeline in prepared query mode $node->pgbench( '-t 1 -n -M prepared', @@ -904,6 +925,21 @@ $node->pgbench( } }); +# Try \startpipeline with \syncpipeline without \endpipeline +$node->pgbench( + '-t 2 -n -M extended', + 2, + [], + [qr{end of script reached with pipeline open}], + 'error: call \startpipeline and \syncpipeline without \endpipeline', + { + '001_pgbench_pipeline_7' => q{ +-- startpipeline with \syncpipeline only +\startpipeline +\syncpipeline +} + }); + # Working \startpipeline in prepared query mode with serializable $node->pgbench( '-c4 -t 10 -n -M prepared',