1
0
mirror of https://github.com/postgres/postgres.git synced 2025-11-04 20:11:56 +03:00

Improve test coverage for LISTEN/NOTIFY.

Back-patch commit b10f40bf0 into older branches.  This adds reporting
of NOTIFY messages to isolationtester.c, and extends the async-notify
test to include direct tests of basic NOTIFY functionality.

This provides useful infrastructure for testing a bug fix I'm about
to back-patch, and there seems no good reason not to have better tests
of LISTEN/NOTIFY in the back branches.  The commit's survived long
enough in HEAD to make it unlikely that it will cause problems.

Back-patch as far as 9.6.  isolationtester.c changed too much in 9.6
to make it sane to try to fix older branches this way, and I don't
really want to back-patch those changes too.

Discussion: https://postgr.es/m/31304.1564246011@sss.pgh.pa.us
This commit is contained in:
Tom Lane
2019-11-23 17:30:00 -05:00
parent 8047a7b9dd
commit 7d4c311813
3 changed files with 210 additions and 50 deletions

View File

@@ -23,10 +23,12 @@
/*
* conns[0] is the global setup, teardown, and watchdog connection. Additional
* connections represent spec-defined sessions.
* connections represent spec-defined sessions. We also track the backend
* PID, in numeric and string formats, for each connection.
*/
static PGconn **conns = NULL;
static const char **backend_pids = NULL;
static int *backend_pids = NULL;
static const char **backend_pid_strs = NULL;
static int nconns = 0;
/* In dry run only output permutations to be run by the tester. */
@@ -41,7 +43,7 @@ static void run_permutation(TestSpec *testspec, int nsteps, Step **steps);
#define STEP_NONBLOCK 0x1 /* return 0 as soon as cmd waits for a lock */
#define STEP_RETRY 0x2 /* this is a retry of a previously-waiting cmd */
static bool try_complete_step(Step *step, int flags);
static bool try_complete_step(TestSpec *testspec, Step *step, int flags);
static int step_qsort_cmp(const void *a, const void *b);
static int step_bsearch_cmp(const void *a, const void *b);
@@ -159,9 +161,11 @@ main(int argc, char **argv)
* extra for lock wait detection and global work.
*/
nconns = 1 + testspec->nsessions;
conns = calloc(nconns, sizeof(PGconn *));
conns = (PGconn **) pg_malloc0(nconns * sizeof(PGconn *));
backend_pids = pg_malloc0(nconns * sizeof(*backend_pids));
backend_pid_strs = pg_malloc0(nconns * sizeof(*backend_pid_strs));
atexit(disconnect_atexit);
backend_pids = calloc(nconns, sizeof(*backend_pids));
for (i = 0; i < nconns; i++)
{
conns[i] = PQconnectdb(conninfo);
@@ -187,26 +191,9 @@ main(int argc, char **argv)
blackholeNoticeProcessor,
NULL);
/* Get the backend pid for lock wait checking. */
res = PQexec(conns[i], "SELECT pg_catalog.pg_backend_pid()");
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
if (PQntuples(res) == 1 && PQnfields(res) == 1)
backend_pids[i] = pg_strdup(PQgetvalue(res, 0, 0));
else
{
fprintf(stderr, "backend pid query returned %d rows and %d columns, expected 1 row and 1 column",
PQntuples(res), PQnfields(res));
exit(1);
}
}
else
{
fprintf(stderr, "backend pid query failed: %s",
PQerrorMessage(conns[i]));
exit(1);
}
PQclear(res);
/* Save each connection's backend PID for subsequent use. */
backend_pids[i] = PQbackendPID(conns[i]);
backend_pid_strs[i] = psprintf("%d", backend_pids[i]);
}
/* Set the session index fields in steps. */
@@ -231,9 +218,9 @@ main(int argc, char **argv)
appendPQExpBufferStr(&wait_query,
"SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{");
/* The spec syntax requires at least one session; assume that here. */
appendPQExpBufferStr(&wait_query, backend_pids[1]);
appendPQExpBufferStr(&wait_query, backend_pid_strs[1]);
for (i = 2; i < nconns; i++)
appendPQExpBuffer(&wait_query, ",%s", backend_pids[i]);
appendPQExpBuffer(&wait_query, ",%s", backend_pid_strs[i]);
appendPQExpBufferStr(&wait_query, "}')");
res = PQprepare(conns[0], PREP_WAITING, wait_query.data, 0, NULL);
@@ -549,7 +536,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
oldstep = waiting[w];
/* Wait for previous step on this connection. */
try_complete_step(oldstep, STEP_RETRY);
try_complete_step(testspec, oldstep, STEP_RETRY);
/* Remove that step from the waiting[] array. */
if (w + 1 < nwaiting)
@@ -571,7 +558,8 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
nerrorstep = 0;
while (w < nwaiting)
{
if (try_complete_step(waiting[w], STEP_NONBLOCK | STEP_RETRY))
if (try_complete_step(testspec, waiting[w],
STEP_NONBLOCK | STEP_RETRY))
{
/* Still blocked on a lock, leave it alone. */
w++;
@@ -600,14 +588,15 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
}
/* Try to complete this step without blocking. */
mustwait = try_complete_step(step, STEP_NONBLOCK);
mustwait = try_complete_step(testspec, step, STEP_NONBLOCK);
/* Check for completion of any steps that were previously waiting. */
w = 0;
nerrorstep = 0;
while (w < nwaiting)
{
if (try_complete_step(waiting[w], STEP_NONBLOCK | STEP_RETRY))
if (try_complete_step(testspec, waiting[w],
STEP_NONBLOCK | STEP_RETRY))
w++;
else
{
@@ -630,7 +619,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
/* Wait for any remaining queries. */
for (w = 0; w < nwaiting; ++w)
{
try_complete_step(waiting[w], STEP_RETRY);
try_complete_step(testspec, waiting[w], STEP_RETRY);
report_error_message(waiting[w]);
}
@@ -693,7 +682,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
* a lock, returns true. Otherwise, returns false.
*/
static bool
try_complete_step(Step *step, int flags)
try_complete_step(TestSpec *testspec, Step *step, int flags)
{
PGconn *conn = conns[1 + step->session];
fd_set read_set;
@@ -702,6 +691,7 @@ try_complete_step(Step *step, int flags)
int sock = PQsocket(conn);
int ret;
PGresult *res;
PGnotify *notify;
bool canceled = false;
if (sock < 0)
@@ -738,7 +728,7 @@ try_complete_step(Step *step, int flags)
bool waiting;
res = PQexecPrepared(conns[0], PREP_WAITING, 1,
&backend_pids[step->session + 1],
&backend_pid_strs[step->session + 1],
NULL, NULL, 0);
if (PQresultStatus(res) != PGRES_TUPLES_OK ||
PQntuples(res) != 1)
@@ -880,6 +870,35 @@ try_complete_step(Step *step, int flags)
PQclear(res);
}
/* Report any available NOTIFY messages, too */
PQconsumeInput(conn);
while ((notify = PQnotifies(conn)) != NULL)
{
/* Try to identify which session it came from */
const char *sendername = NULL;
char pidstring[32];
for (int i = 0; i < testspec->nsessions; i++)
{
if (notify->be_pid == backend_pids[i + 1])
{
sendername = testspec->sessions[i]->name;
break;
}
}
if (sendername == NULL)
{
/* Doesn't seem to be any test session, so show the hard way */
snprintf(pidstring, sizeof(pidstring), "PID %d", notify->be_pid);
sendername = pidstring;
}
printf("%s: NOTIFY \"%s\" with payload \"%s\" from %s\n",
testspec->sessions[step->session]->name,
notify->relname, notify->extra, sendername);
PQfreemem(notify);
PQconsumeInput(conn);
}
return false;
}