mirror of
https://github.com/postgres/postgres.git
synced 2025-04-24 10:47:04 +03:00
Allow \watch queries to stop on minimum rows returned
When running a repeat query with \watch in psql, it can be helpful to be able to stop the watch process when the query no longer returns the expected amount of rows. An example would be to watch for the presence of a certain event in pg_stat_activity and stopping when the event is no longer present, or to watch an index creation and stop when the index is created. This adds a min_rows=MIN parameter to \watch which can be set to a non-negative integer, and the watch query will stop executing when it returns less than MIN rows. Author: Greg Sabino Mullane <htamfids@gmail.com> Reviewed-by: Michael Paquier <michael@paquier.xyz> Reviewed-by: Daniel Gustafsson <daniel@yesql.se> Discussion: https://postgr.es/m/CAKAnmmKStATuddYxP71L+p0DHtp9Rvjze3XRoy0Dyw67VQ45UA@mail.gmail.com
This commit is contained in:
parent
95fff2abee
commit
f347ec76e2
@ -3566,13 +3566,14 @@ testdb=> <userinput>\setenv LESS -imx4F</userinput>
|
|||||||
|
|
||||||
|
|
||||||
<varlistentry id="app-psql-meta-command-watch">
|
<varlistentry id="app-psql-meta-command-watch">
|
||||||
<term><literal>\watch [ i[nterval]=<replaceable class="parameter">seconds</replaceable> ] [ c[ount]=<replaceable class="parameter">times</replaceable> ] [ <replaceable class="parameter">seconds</replaceable> ]</literal></term>
|
<term><literal>\watch [ i[nterval]=<replaceable class="parameter">seconds</replaceable> ] [ c[ount]=<replaceable class="parameter">times</replaceable> ] [ m[in_rows]=<replaceable class="parameter">rows</replaceable> ] [ <replaceable class="parameter">seconds</replaceable> ]</literal></term>
|
||||||
<listitem>
|
<listitem>
|
||||||
<para>
|
<para>
|
||||||
Repeatedly execute the current query buffer (as <literal>\g</literal> does)
|
Repeatedly execute the current query buffer (as <literal>\g</literal> does)
|
||||||
until interrupted, or the query fails, or the execution count limit
|
until interrupted, or the query fails, or the execution count limit
|
||||||
(if given) is reached. Wait the specified number of
|
(if given) is reached, or the query no longer returns the minimum number
|
||||||
seconds (default 2) between executions. For backwards compatibility,
|
of rows. Wait the specified number of seconds (default 2) between executions.
|
||||||
|
For backwards compatibility,
|
||||||
<replaceable class="parameter">seconds</replaceable> can be specified
|
<replaceable class="parameter">seconds</replaceable> can be specified
|
||||||
with or without an <literal>interval=</literal> prefix.
|
with or without an <literal>interval=</literal> prefix.
|
||||||
Each query result is
|
Each query result is
|
||||||
|
@ -162,7 +162,7 @@ static bool do_connect(enum trivalue reuse_previous_specification,
|
|||||||
static bool do_edit(const char *filename_arg, PQExpBuffer query_buf,
|
static bool do_edit(const char *filename_arg, PQExpBuffer query_buf,
|
||||||
int lineno, bool discard_on_quit, bool *edited);
|
int lineno, bool discard_on_quit, bool *edited);
|
||||||
static bool do_shell(const char *command);
|
static bool do_shell(const char *command);
|
||||||
static bool do_watch(PQExpBuffer query_buf, double sleep, int iter);
|
static bool do_watch(PQExpBuffer query_buf, double sleep, int iter, int min_rows);
|
||||||
static bool lookup_object_oid(EditableObjectType obj_type, const char *desc,
|
static bool lookup_object_oid(EditableObjectType obj_type, const char *desc,
|
||||||
Oid *obj_oid);
|
Oid *obj_oid);
|
||||||
static bool get_create_object_cmd(EditableObjectType obj_type, Oid oid,
|
static bool get_create_object_cmd(EditableObjectType obj_type, Oid oid,
|
||||||
@ -2775,13 +2775,15 @@ exec_command_watch(PsqlScanState scan_state, bool active_branch,
|
|||||||
{
|
{
|
||||||
bool have_sleep = false;
|
bool have_sleep = false;
|
||||||
bool have_iter = false;
|
bool have_iter = false;
|
||||||
|
bool have_min_rows = false;
|
||||||
double sleep = 2;
|
double sleep = 2;
|
||||||
int iter = 0;
|
int iter = 0;
|
||||||
|
int min_rows = 0;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Parse arguments. We allow either an unlabeled interval or
|
* Parse arguments. We allow either an unlabeled interval or
|
||||||
* "name=value", where name is from the set ('i', 'interval', 'c',
|
* "name=value", where name is from the set ('i', 'interval', 'c',
|
||||||
* 'count').
|
* 'count', 'm', 'min_rows').
|
||||||
*/
|
*/
|
||||||
while (success)
|
while (success)
|
||||||
{
|
{
|
||||||
@ -2838,6 +2840,26 @@ exec_command_watch(PsqlScanState scan_state, bool active_branch,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else if (strncmp("m=", opt, strlen("m=")) == 0 ||
|
||||||
|
strncmp("min_rows=", opt, strlen("min_rows=")) == 0)
|
||||||
|
{
|
||||||
|
if (have_min_rows)
|
||||||
|
{
|
||||||
|
pg_log_error("\\watch: minimum row count specified more than once");
|
||||||
|
success = false;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
have_min_rows = true;
|
||||||
|
errno = 0;
|
||||||
|
min_rows = strtoint(valptr, &opt_end, 10);
|
||||||
|
if (min_rows <= 0 || *opt_end || errno == ERANGE)
|
||||||
|
{
|
||||||
|
pg_log_error("\\watch: incorrect minimum row count \"%s\"", valptr);
|
||||||
|
success = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
pg_log_error("\\watch: unrecognized parameter \"%s\"", opt);
|
pg_log_error("\\watch: unrecognized parameter \"%s\"", opt);
|
||||||
@ -2874,7 +2896,7 @@ exec_command_watch(PsqlScanState scan_state, bool active_branch,
|
|||||||
/* If query_buf is empty, recall and execute previous query */
|
/* If query_buf is empty, recall and execute previous query */
|
||||||
(void) copy_previous_query(query_buf, previous_buf);
|
(void) copy_previous_query(query_buf, previous_buf);
|
||||||
|
|
||||||
success = do_watch(query_buf, sleep, iter);
|
success = do_watch(query_buf, sleep, iter, min_rows);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Reset the query buffer as though for \r */
|
/* Reset the query buffer as though for \r */
|
||||||
@ -5144,7 +5166,7 @@ do_shell(const char *command)
|
|||||||
* onto a bunch of exec_command's variables to silence stupider compilers.
|
* onto a bunch of exec_command's variables to silence stupider compilers.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
do_watch(PQExpBuffer query_buf, double sleep, int iter)
|
do_watch(PQExpBuffer query_buf, double sleep, int iter, int min_rows)
|
||||||
{
|
{
|
||||||
long sleep_ms = (long) (sleep * 1000);
|
long sleep_ms = (long) (sleep * 1000);
|
||||||
printQueryOpt myopt = pset.popt;
|
printQueryOpt myopt = pset.popt;
|
||||||
@ -5274,7 +5296,7 @@ do_watch(PQExpBuffer query_buf, double sleep, int iter)
|
|||||||
myopt.title = title;
|
myopt.title = title;
|
||||||
|
|
||||||
/* Run the query and print out the result */
|
/* Run the query and print out the result */
|
||||||
res = PSQLexecWatch(query_buf->data, &myopt, pagerpipe);
|
res = PSQLexecWatch(query_buf->data, &myopt, pagerpipe, min_rows);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* PSQLexecWatch handles the case where we can no longer repeat the
|
* PSQLexecWatch handles the case where we can no longer repeat the
|
||||||
|
@ -36,6 +36,7 @@ static int ExecQueryAndProcessResults(const char *query,
|
|||||||
double *elapsed_msec,
|
double *elapsed_msec,
|
||||||
bool *svpt_gone_p,
|
bool *svpt_gone_p,
|
||||||
bool is_watch,
|
bool is_watch,
|
||||||
|
int min_rows,
|
||||||
const printQueryOpt *opt,
|
const printQueryOpt *opt,
|
||||||
FILE *printQueryFout);
|
FILE *printQueryFout);
|
||||||
static bool command_no_begin(const char *query);
|
static bool command_no_begin(const char *query);
|
||||||
@ -632,7 +633,7 @@ PSQLexec(const char *query)
|
|||||||
* e.g., because of the interrupt, -1 on error.
|
* e.g., because of the interrupt, -1 on error.
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
PSQLexecWatch(const char *query, const printQueryOpt *opt, FILE *printQueryFout)
|
PSQLexecWatch(const char *query, const printQueryOpt *opt, FILE *printQueryFout, int min_rows)
|
||||||
{
|
{
|
||||||
bool timing = pset.timing;
|
bool timing = pset.timing;
|
||||||
double elapsed_msec = 0;
|
double elapsed_msec = 0;
|
||||||
@ -646,7 +647,7 @@ PSQLexecWatch(const char *query, const printQueryOpt *opt, FILE *printQueryFout)
|
|||||||
|
|
||||||
SetCancelConn(pset.db);
|
SetCancelConn(pset.db);
|
||||||
|
|
||||||
res = ExecQueryAndProcessResults(query, &elapsed_msec, NULL, true, opt, printQueryFout);
|
res = ExecQueryAndProcessResults(query, &elapsed_msec, NULL, true, min_rows, opt, printQueryFout);
|
||||||
|
|
||||||
ResetCancelConn();
|
ResetCancelConn();
|
||||||
|
|
||||||
@ -1134,7 +1135,7 @@ SendQuery(const char *query)
|
|||||||
pset.crosstab_flag || !is_select_command(query))
|
pset.crosstab_flag || !is_select_command(query))
|
||||||
{
|
{
|
||||||
/* Default fetch-it-all-and-print mode */
|
/* Default fetch-it-all-and-print mode */
|
||||||
OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, NULL, NULL) > 0);
|
OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, 0, NULL, NULL) > 0);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -1415,11 +1416,12 @@ DescribeQuery(const char *query, double *elapsed_msec)
|
|||||||
static int
|
static int
|
||||||
ExecQueryAndProcessResults(const char *query,
|
ExecQueryAndProcessResults(const char *query,
|
||||||
double *elapsed_msec, bool *svpt_gone_p,
|
double *elapsed_msec, bool *svpt_gone_p,
|
||||||
bool is_watch,
|
bool is_watch, int min_rows,
|
||||||
const printQueryOpt *opt, FILE *printQueryFout)
|
const printQueryOpt *opt, FILE *printQueryFout)
|
||||||
{
|
{
|
||||||
bool timing = pset.timing;
|
bool timing = pset.timing;
|
||||||
bool success;
|
bool success;
|
||||||
|
bool return_early = false;
|
||||||
instr_time before,
|
instr_time before,
|
||||||
after;
|
after;
|
||||||
PGresult *result;
|
PGresult *result;
|
||||||
@ -1461,6 +1463,10 @@ ExecQueryAndProcessResults(const char *query,
|
|||||||
|
|
||||||
/* first result */
|
/* first result */
|
||||||
result = PQgetResult(pset.db);
|
result = PQgetResult(pset.db);
|
||||||
|
if (min_rows > 0 && PQntuples(result) < min_rows)
|
||||||
|
{
|
||||||
|
return_early = true;
|
||||||
|
}
|
||||||
|
|
||||||
while (result != NULL)
|
while (result != NULL)
|
||||||
{
|
{
|
||||||
@ -1683,7 +1689,10 @@ ExecQueryAndProcessResults(const char *query,
|
|||||||
if (!CheckConnection())
|
if (!CheckConnection())
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
return cancel_pressed ? 0 : success ? 1 : -1;
|
if (cancel_pressed || return_early)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
return success ? 1 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -32,7 +32,7 @@ extern void psql_setup_cancel_handler(void);
|
|||||||
extern void SetShellResultVariables(int wait_result);
|
extern void SetShellResultVariables(int wait_result);
|
||||||
|
|
||||||
extern PGresult *PSQLexec(const char *query);
|
extern PGresult *PSQLexec(const char *query);
|
||||||
extern int PSQLexecWatch(const char *query, const printQueryOpt *opt, FILE *printQueryFout);
|
extern int PSQLexecWatch(const char *query, const printQueryOpt *opt, FILE *printQueryFout, int min_rows);
|
||||||
|
|
||||||
extern bool SendQuery(const char *query);
|
extern bool SendQuery(const char *query);
|
||||||
|
|
||||||
|
@ -200,7 +200,9 @@ slashUsage(unsigned short int pager)
|
|||||||
HELP0(" \\gset [PREFIX] execute query and store result in psql variables\n");
|
HELP0(" \\gset [PREFIX] execute query and store result in psql variables\n");
|
||||||
HELP0(" \\gx [(OPTIONS)] [FILE] as \\g, but forces expanded output mode\n");
|
HELP0(" \\gx [(OPTIONS)] [FILE] as \\g, but forces expanded output mode\n");
|
||||||
HELP0(" \\q quit psql\n");
|
HELP0(" \\q quit psql\n");
|
||||||
HELP0(" \\watch [[i=]SEC] [c=N] execute query every SEC seconds, up to N times\n");
|
HELP0(" \\watch [[i=]SEC] [c=N] [m=MIN]\n");
|
||||||
|
HELP0(" execute query every SEC seconds, up to N times\n");
|
||||||
|
HELP0(" stop if less than MIN rows are returned\n");
|
||||||
HELP0("\n");
|
HELP0("\n");
|
||||||
|
|
||||||
HELP0("Help\n");
|
HELP0("Help\n");
|
||||||
|
@ -355,6 +355,29 @@ psql_like(
|
|||||||
psql_like($node, sprintf('SELECT 1 \watch c=3 i=%g', 0.01),
|
psql_like($node, sprintf('SELECT 1 \watch c=3 i=%g', 0.01),
|
||||||
qr/1\n1\n1/, '\watch with 3 iterations');
|
qr/1\n1\n1/, '\watch with 3 iterations');
|
||||||
|
|
||||||
|
# Check \watch minimum row count
|
||||||
|
psql_fails_like(
|
||||||
|
$node,
|
||||||
|
'SELECT 3 \watch m=x',
|
||||||
|
qr/incorrect minimum row count/,
|
||||||
|
'\watch, invalid minimum row setting');
|
||||||
|
|
||||||
|
psql_fails_like(
|
||||||
|
$node,
|
||||||
|
'SELECT 3 \watch m=1 min_rows=2',
|
||||||
|
qr/minimum row count specified more than once/,
|
||||||
|
'\watch, minimum rows is specified more than once');
|
||||||
|
|
||||||
|
psql_like(
|
||||||
|
$node,
|
||||||
|
q{with x as (
|
||||||
|
select now()-backend_start AS howlong
|
||||||
|
from pg_stat_activity
|
||||||
|
where pid = pg_backend_pid()
|
||||||
|
) select 123 from x where howlong < '2 seconds' \watch i=0.5 m=2},
|
||||||
|
qr/^123$/,
|
||||||
|
'\watch, 2 minimum rows');
|
||||||
|
|
||||||
# Check \watch errors
|
# Check \watch errors
|
||||||
psql_fails_like(
|
psql_fails_like(
|
||||||
$node,
|
$node,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user