1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-30 11:03:19 +03:00

Add -f option which enables to read SQL commands from a file.

Patches Contributed by Tomoaki Sato.
This commit is contained in:
Tatsuo Ishii
2005-09-29 13:44:25 +00:00
parent 8928d4d69d
commit 9b19abd74f
3 changed files with 668 additions and 94 deletions

View File

@ -1,10 +1,10 @@
/*
* $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.36 2005/05/24 00:26:40 neilc Exp $
* $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.37 2005/09/29 13:44:25 ishii Exp $
*
* pgbench: a simple TPC-B like benchmark program for PostgreSQL
* written by Tatsuo Ishii
*
* Copyright (c) 2000-2004 Tatsuo Ishii
* Copyright (c) 2000-2005 Tatsuo Ishii
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby
@ -41,6 +41,9 @@
#include <sys/resource.h>
#endif /* ! WIN32 */
#include <ctype.h>
#include <search.h>
extern char *optarg;
extern int optind;
@ -72,6 +75,9 @@ int tps = 1;
#define ntellers 10
#define naccounts 100000
#define SQL_COMMAND 1
#define META_COMMAND 2
FILE *LOGFILE = NULL;
bool use_log; /* log transaction latencies to a file */
@ -89,6 +95,12 @@ char *login = NULL;
char *pwd = NULL;
char *dbName;
typedef struct
{
char *name;
char *value;
} Variable;
typedef struct
{
PGconn *con; /* connection handle to DB */
@ -103,13 +115,23 @@ typedef struct
int tid; /* teller id for this transaction */
int delta;
int abalance;
void *variables;
struct timeval txn_begin; /* used for measuring latencies */
} CState;
typedef struct
{
int type;
int argc;
char **argv;
} Command;
Command **commands = NULL;
static void
usage(void)
{
fprintf(stderr, "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-n][-C][-v][-S][-N][-l][-U login][-P password][-d][dbname]\n");
fprintf(stderr, "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-n][-C][-v][-S][-N][-f filename][-l][-U login][-P password][-d][dbname]\n");
fprintf(stderr, "(initialize mode): pgbench -i [-h hostname][-p port][-s scaling_factor][-U login][-P password][-d][dbname]\n");
}
@ -190,6 +212,115 @@ check(CState * state, PGresult *res, int n, int good)
return (0); /* OK */
}
static int
compareVariables(const void *v1, const void *v2)
{
return strcmp(((Variable *)v1)->name, ((Variable *)v2)->name);
}
static char *
getVariable(CState * st, char *name)
{
Variable key = { name }, *var;
var = tfind(&key, &st->variables, compareVariables);
if (var != NULL)
return (*(Variable **)var)->value;
else
return NULL;
}
static int
putVariable(CState * st, char *name, char *value)
{
Variable key = { name }, *var;
var = tfind(&key, &st->variables, compareVariables);
if (var == NULL)
{
if ((var = malloc(sizeof(Variable))) == NULL)
return false;
var->name = NULL;
var->value = NULL;
if ((var->name = strdup(name)) == NULL
|| (var->value = strdup(value)) == NULL
|| tsearch(var, &st->variables, compareVariables) == NULL)
{
free(var->name);
free(var->value);
free(var);
return false;
}
}
else
{
free((*(Variable **)var)->value);
if (((*(Variable **)var)->value = strdup(value)) == NULL)
return false;
}
return true;
}
static char *
assignVariables(CState * st, char *sql)
{
int i, j;
char *p, *name, *val;
void *tmp;
i = 0;
while ((p = strchr(&sql[i], ':')) != NULL)
{
i = j = p - sql;
do
i++;
while (isalnum(sql[i]) != 0 || sql[i] == '_');
if (i == j + 1)
continue;
if ((name = strndup(&sql[j + 1], i - (j + 1))) == NULL)
return NULL;
val = getVariable(st, name);
free(name);
if (val == NULL)
continue;
if (strlen(val) > i - j)
{
tmp = realloc(sql, strlen(sql) - (i - j) + strlen(val) + 1);
if (tmp == NULL)
{
free(sql);
return NULL;
}
sql = tmp;
}
if (strlen(val) != i - j)
memmove(&sql[j + strlen(val)], &sql[i], strlen(&sql[i]) + 1);
strncpy(&sql[j], val, strlen(val));
if (strlen(val) < i - j)
{
tmp = realloc(sql, strlen(sql) + 1);
if (tmp == NULL)
{
free(sql);
return NULL;
}
sql = tmp;
}
i = j + strlen(val);
}
return sql;
}
/* process a transaction */
static void
doOne(CState * state, int n, int debug, int ttype)
@ -465,6 +596,170 @@ doSelectOnly(CState * state, int n, int debug)
}
}
static void
doCustom(CState * state, int n, int debug)
{
PGresult *res;
CState *st = &state[n];
if (st->listen)
{ /* are we receiver? */
if (commands[st->state]->type == SQL_COMMAND)
{
if (debug)
fprintf(stderr, "client %d receiving\n", n);
if (!PQconsumeInput(st->con))
{ /* there's something wrong */
fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
remains--; /* I've aborted */
PQfinish(st->con);
st->con = NULL;
return;
}
if (PQisBusy(st->con))
return; /* don't have the whole result yet */
}
/*
* transaction finished: record the time it took in the
* log
*/
if (use_log && commands[st->state + 1] == NULL)
{
double diff;
struct timeval now;
gettimeofday(&now, NULL);
diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 +
(int) (now.tv_usec - st->txn_begin.tv_usec);
fprintf(LOGFILE, "%d %d %.0f\n", st->id, st->cnt, diff);
}
if (commands[st->state]->type == SQL_COMMAND)
{
res = PQgetResult(st->con);
if (strncasecmp(commands[st->state]->argv[0], "select", 6) != 0)
{
if (check(state, res, n, PGRES_COMMAND_OK))
return;
}
else
{
if (check(state, res, n, PGRES_TUPLES_OK))
return;
}
PQclear(res);
discard_response(st);
}
if (commands[st->state + 1] == NULL)
{
if (is_connect)
{
PQfinish(st->con);
st->con = NULL;
}
if (++st->cnt >= nxacts)
{
remains--; /* I'm done */
if (st->con != NULL)
{
PQfinish(st->con);
st->con = NULL;
}
return;
}
}
/* increment state counter */
st->state++;
if (commands[st->state] == NULL)
st->state = 0;
}
if (st->con == NULL)
{
if ((st->con = doConnect()) == NULL)
{
fprintf(stderr, "Client %d aborted in establishing connection.\n",
n);
remains--; /* I've aborted */
PQfinish(st->con);
st->con = NULL;
return;
}
}
if (use_log && st->state == 0)
gettimeofday(&(st->txn_begin), NULL);
if (commands[st->state]->type == SQL_COMMAND)
{
char *sql;
if ((sql = strdup(commands[st->state]->argv[0])) == NULL
|| (sql = assignVariables(st, sql)) == NULL)
{
fprintf(stderr, "out of memory\n");
st->ecnt++;
return;
}
if (debug)
fprintf(stderr, "client %d sending %s\n", n, sql);
if (PQsendQuery(st->con, sql) == 0)
{
if (debug)
fprintf(stderr, "PQsendQuery(%s)failed\n", sql);
st->ecnt++;
}
else
{
st->listen++; /* flags that should be listened */
}
}
else if (commands[st->state]->type == META_COMMAND)
{
int argc = commands[st->state]->argc, i;
char **argv = commands[st->state]->argv;
if (debug)
{
fprintf(stderr, "client %d executing \\%s", n, argv[0]);
for (i = 1; i < argc; i++)
fprintf(stderr, " %s", argv[i]);
fprintf(stderr, "\n");
}
if (strcasecmp(argv[0], "setrandom") == 0)
{
char *val;
if ((val = malloc(strlen(argv[3]) + 1)) == NULL)
{
fprintf(stderr, "%s: out of memory\n", argv[0]);
st->ecnt++;
return;
}
sprintf(val, "%d", getrand(atoi(argv[2]), atoi(argv[3])));
if (putVariable(st, argv[1], val) == false)
{
fprintf(stderr, "%s: out of memory\n", argv[0]);
free(val);
st->ecnt++;
return;
}
free(val);
st->listen++;
}
}
}
/* discard connections */
static void
disconnect_all(CState * state)
@ -644,6 +939,160 @@ init(void)
PQfinish(con);
}
static int
process_file(char *filename)
{
const char delim[] = " \f\n\r\t\v";
FILE *fd;
int lineno, i, j;
char buf[BUFSIZ], *p, *tok;
void *tmp;
if (strcmp(filename, "-") == 0)
fd = stdin;
else if ((fd = fopen(filename, "r")) == NULL)
{
fprintf(stderr, "%s: %s\n", strerror(errno), filename);
return false;
}
fprintf(stderr, "processing file...\n");
lineno = 1;
i = 0;
while (fgets(buf, sizeof(buf), fd) != NULL)
{
if ((p = strchr(buf, '\n')) != NULL)
*p = '\0';
p = buf;
while (isspace(*p))
p++;
if (*p == '\0' || strncmp(p, "--", 2) == 0)
{
lineno++;
continue;
}
if ((tmp = realloc(commands, sizeof(Command *) * (i + 1))) == NULL)
{
i--;
goto error;
}
commands = tmp;
if ((commands[i] = malloc(sizeof(Command))) == NULL)
goto error;
commands[i]->argv = NULL;
commands[i]->argc = 0;
if (*p == '\\')
{
commands[i]->type = META_COMMAND;
j = 0;
tok = strtok(++p, delim);
while (tok != NULL)
{
tmp = realloc(commands[i]->argv, sizeof(char *) * (j + 1));
if (tmp == NULL)
goto error;
commands[i]->argv = tmp;
if ((commands[i]->argv[j] = strdup(tok)) == NULL)
goto error;
commands[i]->argc++;
j++;
tok = strtok(NULL, delim);
}
if (strcasecmp(commands[i]->argv[0], "setrandom") == 0)
{
int min, max;
if (commands[i]->argc < 4)
{
fprintf(stderr, "%s: %d: \\%s: missing argument\n", filename, lineno, commands[i]->argv[0]);
goto error;
}
for (j = 4; j < commands[i]->argc; j++)
fprintf(stderr, "%s: %d: \\%s: extra argument \"%s\" ignored\n", filename, lineno, commands[i]->argv[0], commands[i]->argv[j]);
if ((min = atoi(commands[i]->argv[2])) < 0)
{
fprintf(stderr, "%s: %d: \\%s: invalid minimum number %s\n", filename, lineno, commands[i]->argv[0], commands[i]->argv[2]);
goto error;
}
if ((max = atoi(commands[i]->argv[3])) < min || max > RAND_MAX)
{
fprintf(stderr, "%s: %d: \\%s: invalid maximum number %s\n", filename, lineno, commands[i]->argv[0], commands[i]->argv[3]);
goto error;
}
}
else
{
fprintf(stderr, "%s: %d: invalid command \\%s\n", filename, lineno, commands[i]->argv[0]);
goto error;
}
}
else
{
commands[i]->type = SQL_COMMAND;
if ((commands[i]->argv = malloc(sizeof(char *))) == NULL)
goto error;
if ((commands[i]->argv[0] = strdup(p)) == NULL)
goto error;
commands[i]->argc++;
}
i++;
lineno++;
}
fclose(fd);
if ((tmp = realloc(commands, sizeof(Command *) * (i + 1))) == NULL)
goto error;
commands = tmp;
commands[i] = NULL;
return true;
error:
if (errno == ENOMEM)
fprintf(stderr, "%s: %d: out of memory\n", filename, lineno);
fclose(fd);
if (commands == NULL)
return false;
while (i >= 0)
{
if (commands[i] != NULL)
{
for (j = 0; j < commands[i]->argc; j++)
free(commands[i]->argv[j]);
free(commands[i]->argv);
free(commands[i]);
}
i--;
}
free(commands);
return false;
}
/* print out results */
static void
printResults(
@ -670,8 +1119,10 @@ printResults(
s = "TPC-B (sort of)";
else if (ttype == 2)
s = "Update only accounts";
else
else if (ttype == 1)
s = "SELECT only";
else
s = "Custom query";
printf("transaction type: %s\n", s);
printf("scaling factor: %d\n", tps);
@ -695,6 +1146,7 @@ main(int argc, char **argv)
int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT
* only, 2: skip update of branches and
* tellers */
char *filename = NULL;
static CState *state; /* status of clients */
@ -724,7 +1176,7 @@ main(int argc, char **argv)
else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
login = env;
while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:P:CNSl")) != -1)
while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:P:CNSlf:")) != -1)
{
switch (c)
{
@ -806,6 +1258,10 @@ main(int argc, char **argv)
case 'l':
use_log = true;
break;
case 'f':
ttype = 3;
filename = optarg;
break;
default:
usage();
exit(1);
@ -868,74 +1324,83 @@ main(int argc, char **argv)
exit(1);
}
/*
* get the scaling factor that should be same as count(*) from
* branches...
*/
res = PQexec(con, "select count(*) from branches");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
if (ttype == 3)
{
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
PQfinish(con);
if (process_file(filename) == false)
exit(1);
}
tps = atoi(PQgetvalue(res, 0, 0));
if (tps < 0)
else
{
fprintf(stderr, "count(*) from branches invalid (%d)\n", tps);
exit(1);
}
PQclear(res);
if (!is_no_vacuum)
{
fprintf(stderr, "starting vacuum...");
res = PQexec(con, "vacuum branches");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
/*
* get the scaling factor that should be same as count(*) from
* branches...
*/
res = PQexec(con, "select count(*) from branches");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
PQclear(res);
res = PQexec(con, "vacuum tellers");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
tps = atoi(PQgetvalue(res, 0, 0));
if (tps < 0)
{
fprintf(stderr, "%s", PQerrorMessage(con));
fprintf(stderr, "count(*) from branches invalid (%d)\n", tps);
exit(1);
}
PQclear(res);
res = PQexec(con, "delete from history");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
if (!is_no_vacuum)
{
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
PQclear(res);
res = PQexec(con, "vacuum history");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
PQclear(res);
fprintf(stderr, "end.\n");
if (is_full_vacuum)
{
fprintf(stderr, "starting full vacuum...");
res = PQexec(con, "vacuum analyze accounts");
fprintf(stderr, "starting vacuum...");
res = PQexec(con, "vacuum branches");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
PQclear(res);
res = PQexec(con, "vacuum tellers");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
PQclear(res);
res = PQexec(con, "delete from history");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
PQclear(res);
res = PQexec(con, "vacuum history");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
PQclear(res);
fprintf(stderr, "end.\n");
if (is_full_vacuum)
{
fprintf(stderr, "starting full vacuum...");
res = PQexec(con, "vacuum analyze accounts");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "%s", PQerrorMessage(con));
exit(1);
}
PQclear(res);
fprintf(stderr, "end.\n");
}
}
PQfinish(con);
}
PQfinish(con);
/* set random seed */
gettimeofday(&tv1, NULL);
@ -965,6 +1430,8 @@ main(int argc, char **argv)
doOne(state, i, debug, ttype);
else if (ttype == 1)
doSelectOnly(state, i, debug);
else if (ttype == 3)
doCustom(state, i, debug);
}
for (;;)
@ -982,16 +1449,16 @@ main(int argc, char **argv)
FD_ZERO(&input_mask);
maxsock = 0;
maxsock = -1;
for (i = 0; i < nclients; i++)
{
if (state[i].con)
if (state[i].con &&
(ttype != 3 || commands[state[i].state]->type != META_COMMAND))
{
int sock = PQsocket(state[i].con);
if (sock < 0)
{
fprintf(stderr, "Client %d: PQsocket failed\n", i);
disconnect_all(state);
exit(1);
}
@ -1001,36 +1468,43 @@ main(int argc, char **argv)
}
}
if ((nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
(fd_set *) NULL, (struct timeval *) NULL)) < 0)
if (maxsock != -1)
{
if (errno == EINTR)
continue;
/* must be something wrong */
disconnect_all(state);
fprintf(stderr, "select failed: %s\n", strerror(errno));
exit(1);
}
else if (nsocks == 0)
{ /* timeout */
fprintf(stderr, "select timeout\n");
for (i = 0; i < nclients; i++)
if ((nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
(fd_set *) NULL, (struct timeval *) NULL)) < 0)
{
fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
if (errno == EINTR)
continue;
/* must be something wrong */
disconnect_all(state);
fprintf(stderr, "select failed: %s\n", strerror(errno));
exit(1);
}
else if (nsocks == 0)
{ /* timeout */
fprintf(stderr, "select timeout\n");
for (i = 0; i < nclients; i++)
{
fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
}
exit(0);
}
exit(0);
}
/* ok, backend returns reply */
for (i = 0; i < nclients; i++)
{
if (state[i].con && FD_ISSET(PQsocket(state[i].con), &input_mask))
if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask)
|| (ttype == 3
&& commands[state[i].state]->type == META_COMMAND)))
{
if (ttype == 0 || ttype == 2)
doOne(state, i, debug, ttype);
else if (ttype == 1)
doSelectOnly(state, i, debug);
else if (ttype == 3)
doCustom(state, i, debug);
}
}
}