1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-28 23:42:10 +03:00

Add the ability to compute per-statement latencies (ie, average execution

times) to pgbench.

Florian Pflug, reviewed by Greg Smith
This commit is contained in:
Tom Lane
2010-08-12 20:39:39 +00:00
parent 47eeb5e662
commit 5a4e19abe6
2 changed files with 256 additions and 66 deletions

View File

@ -4,7 +4,7 @@
* A simple benchmark program for PostgreSQL
* Originally written by Tatsuo Ishii and enhanced by many contributors.
*
* $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.99 2010/07/06 19:18:55 momjian Exp $
* $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.100 2010/08/12 20:39:39 tgl Exp $
* Copyright (c) 2000-2010, PostgreSQL Global Development Group
* ALL RIGHTS RESERVED;
*
@ -133,6 +133,7 @@ int fillfactor = 100;
bool use_log; /* log transaction latencies to a file */
bool is_connect; /* establish connection for each transaction */
bool is_latencies; /* report per-command latencies */
int main_pid; /* main process id used in log filename */
char *pghost = "";
@ -171,7 +172,8 @@ typedef struct
int64 until; /* napping until (usec) */
Variable *variables; /* array of variable definitions */
int nvariables;
instr_time txn_begin; /* used for measuring latencies */
instr_time txn_begin; /* used for measuring transaction latencies */
instr_time stmt_begin; /* used for measuring statement latencies */
int use_file; /* index in sql_files for this client */
bool prepared[MAX_FILES];
} CState;
@ -186,6 +188,8 @@ typedef struct
CState *state; /* array of CState */
int nstate; /* length of state[] */
instr_time start_time; /* thread start time */
instr_time *exec_elapsed; /* time spent executing cmds (per Command) */
int *exec_count; /* number of cmd executions (per Command) */
} TState;
#define INVALID_THREAD ((pthread_t) 0)
@ -216,13 +220,16 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
typedef struct
{
char *line; /* full text of command line */
int command_num; /* unique index of this Command struct */
int type; /* command type (SQL_COMMAND or META_COMMAND) */
int argc; /* number of commands */
char *argv[MAX_ARGS]; /* command list */
int argc; /* number of command words */
char *argv[MAX_ARGS]; /* command word list */
} Command;
static Command **sql_files[MAX_FILES]; /* SQL script files */
static int num_files; /* number of script files */
static int num_commands = 0; /* total number of Command structs */
static int debug = 0; /* debug flag */
/* default scenario */
@ -287,6 +294,7 @@ usage(const char *progname)
" define variable for use by custom script\n"
" -f FILENAME read transaction script from FILENAME\n"
" -j NUM number of threads (default: 1)\n"
" -r report average latency per command\n"
" -l write transaction times to log file\n"
" -M {simple|extended|prepared}\n"
" protocol for submitting queries to server (default: simple)\n"
@ -629,11 +637,13 @@ runShellCommand(CState *st, char *variable, char **argv, int argc)
char *endptr;
int retval;
/*
* Join arguments with whilespace separaters. Arguments starting with
* exactly one colon are treated as variables: name - append a string
* "name" :var - append a variable named 'var'. ::name - append a string
* ":name"
/*----------
* Join arguments with whitespace separators. Arguments starting with
* exactly one colon are treated as variables:
* name - append a string "name"
* :var - append a variable named 'var'
* ::name - append a string ":name"
*----------
*/
for (i = 0; i < argc; i++)
{
@ -740,7 +750,7 @@ clientDone(CState *st, bool ok)
/* return false iff client should be disconnected */
static bool
doCustom(CState *st, instr_time *conn_time, FILE *logfile)
doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile)
{
PGresult *res;
Command **commands;
@ -775,7 +785,22 @@ top:
}
/*
* transaction finished: record the time it took in the log
* command finished: accumulate per-command execution times in
* thread-local data structure, if per-command latencies are requested
*/
if (is_latencies)
{
instr_time now;
int cnum = commands[st->state]->command_num;
INSTR_TIME_SET_CURRENT(now);
INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
now, st->stmt_begin);
thread->exec_count[cnum]++;
}
/*
* if transaction finished, record the time it took in the log
*/
if (logfile && commands[st->state + 1] == NULL)
{
@ -802,6 +827,10 @@ top:
if (commands[st->state]->type == SQL_COMMAND)
{
/*
* Read and discard the query result; note this is not included
* in the statement latency numbers.
*/
res = PQgetResult(st->con);
switch (PQresultStatus(res))
{
@ -856,9 +885,14 @@ top:
INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
}
/* Record transaction start time if logging is enabled */
if (logfile && st->state == 0)
INSTR_TIME_SET_CURRENT(st->txn_begin);
/* Record statement start time if per-command latencies are requested */
if (is_latencies)
INSTR_TIME_SET_CURRENT(st->stmt_begin);
if (commands[st->state]->type == SQL_COMMAND)
{
const Command *command = commands[st->state];
@ -1351,6 +1385,7 @@ parseQuery(Command *cmd, const char *raw_sql)
return true;
}
/* Parse a command; return a Command struct, or NULL if it's a comment */
static Command *
process_commands(char *buf)
{
@ -1361,24 +1396,28 @@ process_commands(char *buf)
char *p,
*tok;
/* Make the string buf end at the next newline */
if ((p = strchr(buf, '\n')) != NULL)
*p = '\0';
/* Skip leading whitespace */
p = buf;
while (isspace((unsigned char) *p))
p++;
/* If the line is empty or actually a comment, we're done */
if (*p == '\0' || strncmp(p, "--", 2) == 0)
{
return NULL;
}
/* Allocate and initialize Command structure */
my_commands = (Command *) malloc(sizeof(Command));
if (my_commands == NULL)
{
return NULL;
}
my_commands->line = strdup(buf);
if (my_commands->line == NULL)
return NULL;
my_commands->command_num = num_commands++;
my_commands->type = 0; /* until set */
my_commands->argc = 0;
if (*p == '\\')
@ -1547,26 +1586,13 @@ process_file(char *filename)
while (fgets(buf, sizeof(buf), fd) != NULL)
{
Command *commands;
int i;
Command *command;
i = 0;
while (isspace((unsigned char) buf[i]))
i++;
if (buf[i] != '\0' && strncmp(&buf[i], "--", 2) != 0)
{
commands = process_commands(&buf[i]);
if (commands == NULL)
{
fclose(fd);
return false;
}
}
else
command = process_commands(buf);
if (command == NULL)
continue;
my_commands[lineno] = commands;
my_commands[lineno] = command;
lineno++;
if (lineno >= alloc_num)
@ -1612,7 +1638,7 @@ process_builtin(char *tb)
for (;;)
{
char *p;
Command *commands;
Command *command;
p = buf;
while (*tb && *tb != '\n')
@ -1626,13 +1652,11 @@ process_builtin(char *tb)
*p = '\0';
commands = process_commands(buf);
if (commands == NULL)
{
return NULL;
}
command = process_commands(buf);
if (command == NULL)
continue;
my_commands[lineno] = commands;
my_commands[lineno] = command;
lineno++;
if (lineno >= alloc_num)
@ -1653,7 +1677,8 @@ process_builtin(char *tb)
/* print out results */
static void
printResults(int ttype, int normal_xacts, int nclients, int nthreads,
printResults(int ttype, int normal_xacts, int nclients,
TState *threads, int nthreads,
instr_time total_time, instr_time conn_total_time)
{
double time_include,
@ -1694,6 +1719,51 @@ printResults(int ttype, int normal_xacts, int nclients, int nthreads,
}
printf("tps = %f (including connections establishing)\n", tps_include);
printf("tps = %f (excluding connections establishing)\n", tps_exclude);
/* Report per-command latencies */
if (is_latencies)
{
int i;
for (i = 0; i < num_files; i++)
{
Command **commands;
if (num_files > 1)
printf("statement latencies in milliseconds, file %d:\n", i+1);
else
printf("statement latencies in milliseconds:\n");
for (commands = sql_files[i]; *commands != NULL; commands++)
{
Command *command = *commands;
int cnum = command->command_num;
double total_time;
instr_time total_exec_elapsed;
int total_exec_count;
int t;
/* Accumulate per-thread data for command */
INSTR_TIME_SET_ZERO(total_exec_elapsed);
total_exec_count = 0;
for (t = 0; t < nthreads; t++)
{
TState *thread = &threads[t];
INSTR_TIME_ADD(total_exec_elapsed,
thread->exec_elapsed[cnum]);
total_exec_count += thread->exec_count[cnum];
}
if (total_exec_count > 0)
total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
else
total_time = 0.0;
printf("\t%f\t%s\n", total_time, command->line);
}
}
}
}
@ -1770,7 +1840,7 @@ main(int argc, char **argv)
memset(state, 0, sizeof(*state));
while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:j:")) != -1)
while ((c = getopt(argc, argv, "ih:nvp:dSNc:j:Crs:t:T:U:lf:D:F:M:")) != -1)
{
switch (c)
{
@ -1834,6 +1904,9 @@ main(int argc, char **argv)
case 'C':
is_connect = true;
break;
case 'r':
is_latencies = true;
break;
case 's':
scale_given = true;
scale = atoi(optarg);
@ -1954,6 +2027,22 @@ main(int argc, char **argv)
exit(1);
}
/*
* is_latencies only works with multiple threads in thread-based
* implementations, not fork-based ones, because it supposes that the
* parent can see changes made to the per-thread execution stats by child
* threads. It seems useful enough to accept despite this limitation,
* but perhaps we should FIXME someday (by passing the stats data back
* up through the parent-to-child pipes).
*/
#ifndef ENABLE_THREAD_SAFETY
if (is_latencies && nthreads > 1)
{
fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
exit(1);
}
#endif
/*
* save main process id in the global variable because process id will be
* changed after fork.
@ -2091,6 +2180,39 @@ main(int argc, char **argv)
break;
}
/* set up thread data structures */
threads = (TState *) malloc(sizeof(TState) * nthreads);
for (i = 0; i < nthreads; i++)
{
TState *thread = &threads[i];
thread->tid = i;
thread->state = &state[nclients / nthreads * i];
thread->nstate = nclients / nthreads;
if (is_latencies)
{
/* Reserve memory for the thread to store per-command latencies */
int t;
thread->exec_elapsed = (instr_time *)
malloc(sizeof(instr_time) * num_commands);
thread->exec_count = (int *)
malloc(sizeof(int) * num_commands);
for (t = 0; t < num_commands; t++)
{
INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
thread->exec_count[t] = 0;
}
}
else
{
thread->exec_elapsed = NULL;
thread->exec_count = NULL;
}
}
/* get start up time */
INSTR_TIME_SET_CURRENT(start_time);
@ -2099,20 +2221,18 @@ main(int argc, char **argv)
setalarm(duration);
/* start threads */
threads = (TState *) malloc(sizeof(TState) * nthreads);
for (i = 0; i < nthreads; i++)
{
threads[i].tid = i;
threads[i].state = &state[nclients / nthreads * i];
threads[i].nstate = nclients / nthreads;
INSTR_TIME_SET_CURRENT(threads[i].start_time);
TState *thread = &threads[i];
INSTR_TIME_SET_CURRENT(thread->start_time);
/* the first thread (i = 0) is executed by main thread */
if (i > 0)
{
int err = pthread_create(&threads[i].thread, NULL, threadRun, &threads[i]);
int err = pthread_create(&thread->thread, NULL, threadRun, thread);
if (err != 0 || threads[i].thread == INVALID_THREAD)
if (err != 0 || thread->thread == INVALID_THREAD)
{
fprintf(stderr, "cannot create thread: %s\n", strerror(err));
exit(1);
@ -2120,7 +2240,7 @@ main(int argc, char **argv)
}
else
{
threads[i].thread = INVALID_THREAD;
thread->thread = INVALID_THREAD;
}
}
@ -2150,7 +2270,8 @@ main(int argc, char **argv)
/* get end time */
INSTR_TIME_SET_CURRENT(total_time);
INSTR_TIME_SUBTRACT(total_time, start_time);
printResults(ttype, total_xacts, nclients, nthreads, total_time, conn_total_time);
printResults(ttype, total_xacts, nclients, threads, nthreads,
total_time, conn_total_time);
return 0;
}
@ -2211,7 +2332,7 @@ threadRun(void *arg)
int prev_ecnt = st->ecnt;
st->use_file = getrand(0, num_files - 1);
if (!doCustom(st, &result->conn_time, logfile))
if (!doCustom(thread, st, &result->conn_time, logfile))
remains--; /* I've aborted */
if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
@ -2313,7 +2434,7 @@ threadRun(void *arg)
if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
|| commands[st->state]->type == META_COMMAND))
{
if (!doCustom(st, &result->conn_time, logfile))
if (!doCustom(thread, st, &result->conn_time, logfile))
remains--; /* I've aborted */
}