mirror of
https://github.com/postgres/postgres.git
synced 2025-05-17 06:41:24 +03:00
pgbench: refactor handling of stats tracking
This doesn't add any functionality but just shuffles things around so that it can be reused and improved later. Author: Fabien Coelho Reviewed-by: Michael Paquier, Álvaro Herrera
This commit is contained in:
parent
7e22470471
commit
b603766496
@ -166,10 +166,8 @@ int agg_interval; /* log aggregates instead of individual
|
|||||||
* transactions */
|
* transactions */
|
||||||
int progress = 0; /* thread progress report every this seconds */
|
int progress = 0; /* thread progress report every this seconds */
|
||||||
bool progress_timestamp = false; /* progress report with Unix time */
|
bool progress_timestamp = false; /* progress report with Unix time */
|
||||||
int progress_nclients = 0; /* number of clients for progress
|
int nclients = 1; /* number of clients */
|
||||||
* report */
|
int nthreads = 1; /* number of threads */
|
||||||
int progress_nthreads = 0; /* number of threads for progress
|
|
||||||
* report */
|
|
||||||
bool is_connect; /* establish connection for each transaction */
|
bool is_connect; /* establish connection for each transaction */
|
||||||
bool is_latencies; /* report per-command latencies */
|
bool is_latencies; /* report per-command latencies */
|
||||||
int main_pid; /* main process id used in log filename */
|
int main_pid; /* main process id used in log filename */
|
||||||
@ -192,6 +190,35 @@ typedef struct
|
|||||||
#define MAX_SCRIPTS 128 /* max number of SQL scripts allowed */
|
#define MAX_SCRIPTS 128 /* max number of SQL scripts allowed */
|
||||||
#define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
|
#define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Simple data structure to keep stats about something.
|
||||||
|
*
|
||||||
|
* XXX probably the first value should be kept and used as an offset for
|
||||||
|
* better numerical stability...
|
||||||
|
*/
|
||||||
|
typedef struct SimpleStats
|
||||||
|
{
|
||||||
|
int64 count; /* how many values were encountered */
|
||||||
|
double min; /* the minimum seen */
|
||||||
|
double max; /* the maximum seen */
|
||||||
|
double sum; /* sum of values */
|
||||||
|
double sum2; /* sum of squared values */
|
||||||
|
} SimpleStats;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Data structure to hold various statistics: per-thread stats are maintained
|
||||||
|
* and merged together.
|
||||||
|
*/
|
||||||
|
typedef struct StatsData
|
||||||
|
{
|
||||||
|
long start_time; /* interval start time, for aggregates */
|
||||||
|
int64 cnt; /* number of transactions */
|
||||||
|
int64 skipped; /* number of transactions skipped under --rate
|
||||||
|
* and --latency-limit */
|
||||||
|
SimpleStats latency;
|
||||||
|
SimpleStats lag;
|
||||||
|
} StatsData;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Connection state
|
* Connection state
|
||||||
*/
|
*/
|
||||||
@ -213,10 +240,8 @@ typedef struct
|
|||||||
bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */
|
bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */
|
||||||
|
|
||||||
/* per client collected stats */
|
/* per client collected stats */
|
||||||
int cnt; /* xacts count */
|
int64 cnt; /* transaction count */
|
||||||
int ecnt; /* error count */
|
int ecnt; /* error count */
|
||||||
int64 txn_latencies; /* cumulated latencies */
|
|
||||||
int64 txn_sqlats; /* cumulated square latencies */
|
|
||||||
} CState;
|
} CState;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -228,19 +253,14 @@ typedef struct
|
|||||||
pthread_t thread; /* thread handle */
|
pthread_t thread; /* thread handle */
|
||||||
CState *state; /* array of CState */
|
CState *state; /* array of CState */
|
||||||
int nstate; /* length of state[] */
|
int nstate; /* length of state[] */
|
||||||
instr_time start_time; /* thread start time */
|
|
||||||
instr_time *exec_elapsed; /* time spent executing cmds (per Command) */
|
|
||||||
int *exec_count; /* number of cmd executions (per Command) */
|
|
||||||
unsigned short random_state[3]; /* separate randomness for each thread */
|
unsigned short random_state[3]; /* separate randomness for each thread */
|
||||||
int64 throttle_trigger; /* previous/next throttling (us) */
|
int64 throttle_trigger; /* previous/next throttling (us) */
|
||||||
|
|
||||||
/* per thread collected stats */
|
/* per thread collected stats */
|
||||||
|
instr_time start_time; /* thread start time */
|
||||||
instr_time conn_time;
|
instr_time conn_time;
|
||||||
int64 throttle_lag; /* total transaction lag behind throttling */
|
StatsData stats;
|
||||||
int64 throttle_lag_max; /* max transaction lag */
|
int64 latency_late; /* executed but late transactions */
|
||||||
int64 throttle_latency_skipped; /* lagging transactions
|
|
||||||
* skipped */
|
|
||||||
int64 latency_late; /* late transactions */
|
|
||||||
} TState;
|
} TState;
|
||||||
|
|
||||||
#define INVALID_THREAD ((pthread_t) 0)
|
#define INVALID_THREAD ((pthread_t) 0)
|
||||||
@ -272,28 +292,9 @@ typedef struct
|
|||||||
char *argv[MAX_ARGS]; /* command word list */
|
char *argv[MAX_ARGS]; /* command word list */
|
||||||
int cols[MAX_ARGS]; /* corresponding column starting from 1 */
|
int cols[MAX_ARGS]; /* corresponding column starting from 1 */
|
||||||
PgBenchExpr *expr; /* parsed expression */
|
PgBenchExpr *expr; /* parsed expression */
|
||||||
|
SimpleStats stats; /* time spent in this command */
|
||||||
} Command;
|
} Command;
|
||||||
|
|
||||||
typedef struct
|
|
||||||
{
|
|
||||||
|
|
||||||
long start_time; /* when does the interval start */
|
|
||||||
int cnt; /* number of transactions */
|
|
||||||
int skipped; /* number of transactions skipped under --rate
|
|
||||||
* and --latency-limit */
|
|
||||||
|
|
||||||
double min_latency; /* min/max latencies */
|
|
||||||
double max_latency;
|
|
||||||
double sum_latency; /* sum(latency), sum(latency^2) - for
|
|
||||||
* estimates */
|
|
||||||
double sum2_latency;
|
|
||||||
|
|
||||||
double min_lag;
|
|
||||||
double max_lag;
|
|
||||||
double sum_lag; /* sum(lag) */
|
|
||||||
double sum2_lag; /* sum(lag*lag) */
|
|
||||||
} AggVals;
|
|
||||||
|
|
||||||
static struct
|
static struct
|
||||||
{
|
{
|
||||||
const char *name;
|
const char *name;
|
||||||
@ -362,8 +363,11 @@ static struct
|
|||||||
static void setalarm(int seconds);
|
static void setalarm(int seconds);
|
||||||
static void *threadRun(void *arg);
|
static void *threadRun(void *arg);
|
||||||
|
|
||||||
|
static void processXactStats(TState *thread, CState *st, instr_time *now,
|
||||||
|
bool skipped, FILE *logfile, StatsData *agg);
|
||||||
static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
|
static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
|
||||||
AggVals *agg, bool skipped);
|
StatsData *agg, bool skipped, double latency, double lag);
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
usage(void)
|
usage(void)
|
||||||
@ -602,6 +606,82 @@ getPoissonRand(TState *thread, int64 center)
|
|||||||
return (int64) (-log(uniform) * ((double) center) + 0.5);
|
return (int64) (-log(uniform) * ((double) center) + 0.5);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Initialize the given SimpleStats struct to all zeroes
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
initSimpleStats(SimpleStats *ss)
|
||||||
|
{
|
||||||
|
memset(ss, 0, sizeof(SimpleStats));
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Accumulate one value into a SimpleStats struct.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
addToSimpleStats(SimpleStats *ss, double val)
|
||||||
|
{
|
||||||
|
if (ss->count == 0 || val < ss->min)
|
||||||
|
ss->min = val;
|
||||||
|
if (ss->count == 0 || val > ss->max)
|
||||||
|
ss->max = val;
|
||||||
|
ss->count++;
|
||||||
|
ss->sum += val;
|
||||||
|
ss->sum2 += val * val;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Merge two SimpleStats objects
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
|
||||||
|
{
|
||||||
|
if (acc->count == 0 || ss->min < acc->min)
|
||||||
|
acc->min = ss->min;
|
||||||
|
if (acc->count == 0 || ss->max > acc->max)
|
||||||
|
acc->max = ss->max;
|
||||||
|
acc->count += ss->count;
|
||||||
|
acc->sum += ss->sum;
|
||||||
|
acc->sum2 += ss->sum2;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Initialize a StatsData struct to mostly zeroes, with its start time set to
|
||||||
|
* the given value.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
initStats(StatsData *sd, double start_time)
|
||||||
|
{
|
||||||
|
sd->start_time = start_time;
|
||||||
|
sd->cnt = 0;
|
||||||
|
sd->skipped = 0;
|
||||||
|
initSimpleStats(&sd->latency);
|
||||||
|
initSimpleStats(&sd->lag);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Accumulate one additional item into the given stats object.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
accumStats(StatsData *stats, bool skipped, double lat, double lag)
|
||||||
|
{
|
||||||
|
stats->cnt++;
|
||||||
|
|
||||||
|
if (skipped)
|
||||||
|
{
|
||||||
|
/* no latency to record on skipped transactions */
|
||||||
|
stats->skipped++;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
addToSimpleStats(&stats->latency, lat);
|
||||||
|
|
||||||
|
/* and possibly the same for schedule lag */
|
||||||
|
if (throttle_delay)
|
||||||
|
addToSimpleStats(&stats->lag, lag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* call PQexec() and exit() on failure */
|
/* call PQexec() and exit() on failure */
|
||||||
static void
|
static void
|
||||||
executeStatement(PGconn *con, const char *sql)
|
executeStatement(PGconn *con, const char *sql)
|
||||||
@ -1121,30 +1201,6 @@ clientDone(CState *st, bool ok)
|
|||||||
return false; /* always false */
|
return false; /* always false */
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
|
||||||
agg_vals_init(AggVals *aggs, instr_time start)
|
|
||||||
{
|
|
||||||
/* basic counters */
|
|
||||||
aggs->cnt = 0; /* number of transactions (includes skipped) */
|
|
||||||
aggs->skipped = 0; /* xacts skipped under --rate --latency-limit */
|
|
||||||
|
|
||||||
aggs->sum_latency = 0; /* SUM(latency) */
|
|
||||||
aggs->sum2_latency = 0; /* SUM(latency*latency) */
|
|
||||||
|
|
||||||
/* min and max transaction duration */
|
|
||||||
aggs->min_latency = 0;
|
|
||||||
aggs->max_latency = 0;
|
|
||||||
|
|
||||||
/* schedule lag counters */
|
|
||||||
aggs->sum_lag = 0;
|
|
||||||
aggs->sum2_lag = 0;
|
|
||||||
aggs->min_lag = 0;
|
|
||||||
aggs->max_lag = 0;
|
|
||||||
|
|
||||||
/* start of the current interval */
|
|
||||||
aggs->start_time = INSTR_TIME_GET_DOUBLE(start);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int
|
static int
|
||||||
chooseScript(TState *thread)
|
chooseScript(TState *thread)
|
||||||
{
|
{
|
||||||
@ -1156,7 +1212,7 @@ chooseScript(TState *thread)
|
|||||||
|
|
||||||
/* return false iff client should be disconnected */
|
/* return false iff client should be disconnected */
|
||||||
static bool
|
static bool
|
||||||
doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg)
|
doCustom(TState *thread, CState *st, FILE *logfile, StatsData *agg)
|
||||||
{
|
{
|
||||||
PGresult *res;
|
PGresult *res;
|
||||||
Command **commands;
|
Command **commands;
|
||||||
@ -1210,11 +1266,8 @@ top:
|
|||||||
now_us = INSTR_TIME_GET_MICROSEC(now);
|
now_us = INSTR_TIME_GET_MICROSEC(now);
|
||||||
while (thread->throttle_trigger < now_us - latency_limit)
|
while (thread->throttle_trigger < now_us - latency_limit)
|
||||||
{
|
{
|
||||||
thread->throttle_latency_skipped++;
|
processXactStats(thread, st, &now, true, logfile, agg);
|
||||||
|
/* next rendez-vous */
|
||||||
if (logfile)
|
|
||||||
doLog(thread, st, logfile, &now, agg, true);
|
|
||||||
|
|
||||||
wait = getPoissonRand(thread, throttle_delay);
|
wait = getPoissonRand(thread, throttle_delay);
|
||||||
thread->throttle_trigger += wait;
|
thread->throttle_trigger += wait;
|
||||||
st->txn_scheduled = thread->throttle_trigger;
|
st->txn_scheduled = thread->throttle_trigger;
|
||||||
@ -1231,28 +1284,13 @@ top:
|
|||||||
|
|
||||||
if (st->sleeping)
|
if (st->sleeping)
|
||||||
{ /* are we sleeping? */
|
{ /* are we sleeping? */
|
||||||
int64 now_us;
|
|
||||||
|
|
||||||
if (INSTR_TIME_IS_ZERO(now))
|
if (INSTR_TIME_IS_ZERO(now))
|
||||||
INSTR_TIME_SET_CURRENT(now);
|
INSTR_TIME_SET_CURRENT(now);
|
||||||
now_us = INSTR_TIME_GET_MICROSEC(now);
|
if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
|
||||||
if (st->txn_scheduled <= now_us)
|
|
||||||
{
|
|
||||||
/* Done sleeping, go ahead with next command */
|
|
||||||
st->sleeping = false;
|
|
||||||
if (st->throttling)
|
|
||||||
{
|
|
||||||
/* Measure lag of throttled transaction relative to target */
|
|
||||||
int64 lag = now_us - st->txn_scheduled;
|
|
||||||
|
|
||||||
thread->throttle_lag += lag;
|
|
||||||
if (lag > thread->throttle_lag_max)
|
|
||||||
thread->throttle_lag_max = lag;
|
|
||||||
st->throttling = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
return true; /* Still sleeping, nothing to do here */
|
return true; /* Still sleeping, nothing to do here */
|
||||||
|
/* Else done sleeping, go ahead with next command */
|
||||||
|
st->sleeping = false;
|
||||||
|
st->throttling = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (st->listen)
|
if (st->listen)
|
||||||
@ -1276,47 +1314,22 @@ top:
|
|||||||
*/
|
*/
|
||||||
if (is_latencies)
|
if (is_latencies)
|
||||||
{
|
{
|
||||||
int cnum = commands[st->state]->command_num;
|
|
||||||
|
|
||||||
if (INSTR_TIME_IS_ZERO(now))
|
if (INSTR_TIME_IS_ZERO(now))
|
||||||
INSTR_TIME_SET_CURRENT(now);
|
INSTR_TIME_SET_CURRENT(now);
|
||||||
INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
|
|
||||||
now, st->stmt_begin);
|
/* XXX could use a mutex here, but we choose not to */
|
||||||
thread->exec_count[cnum]++;
|
addToSimpleStats(&commands[st->state]->stats,
|
||||||
|
INSTR_TIME_GET_DOUBLE(now) -
|
||||||
|
INSTR_TIME_GET_DOUBLE(st->stmt_begin));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* transaction finished: calculate latency and log the transaction */
|
/* transaction finished: calculate latency and log the transaction */
|
||||||
if (commands[st->state + 1] == NULL)
|
if (commands[st->state + 1] == NULL)
|
||||||
{
|
{
|
||||||
/* only calculate latency if an option is used that needs it */
|
if (progress || throttle_delay || latency_limit || logfile)
|
||||||
if (progress || throttle_delay || latency_limit)
|
processXactStats(thread, st, &now, false, logfile, agg);
|
||||||
{
|
else
|
||||||
int64 latency;
|
thread->stats.cnt++;
|
||||||
|
|
||||||
if (INSTR_TIME_IS_ZERO(now))
|
|
||||||
INSTR_TIME_SET_CURRENT(now);
|
|
||||||
|
|
||||||
latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled;
|
|
||||||
|
|
||||||
st->txn_latencies += latency;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* XXX In a long benchmark run of high-latency transactions,
|
|
||||||
* this int64 addition eventually overflows. For example, 100
|
|
||||||
* threads running 10s transactions will overflow it in 2.56
|
|
||||||
* hours. With a more-typical OLTP workload of .1s
|
|
||||||
* transactions, overflow would take 256 hours.
|
|
||||||
*/
|
|
||||||
st->txn_sqlats += latency * latency;
|
|
||||||
|
|
||||||
/* record over the limit transactions if needed. */
|
|
||||||
if (latency_limit && latency > latency_limit)
|
|
||||||
thread->latency_late++;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* record the time it took in the log */
|
|
||||||
if (logfile)
|
|
||||||
doLog(thread, st, logfile, &now, agg, false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (commands[st->state]->type == SQL_COMMAND)
|
if (commands[st->state]->type == SQL_COMMAND)
|
||||||
@ -1391,7 +1404,7 @@ top:
|
|||||||
return clientDone(st, false);
|
return clientDone(st, false);
|
||||||
}
|
}
|
||||||
INSTR_TIME_SET_CURRENT(end);
|
INSTR_TIME_SET_CURRENT(end);
|
||||||
INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
|
INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -1734,6 +1747,8 @@ top:
|
|||||||
else /* succeeded */
|
else /* succeeded */
|
||||||
st->listen = true;
|
st->listen = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* after a meta command, immediately proceed with next command */
|
||||||
goto top;
|
goto top;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1744,12 +1759,9 @@ top:
|
|||||||
* print log entry after completing one transaction.
|
* print log entry after completing one transaction.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
|
doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
|
||||||
bool skipped)
|
StatsData *agg, bool skipped, double latency, double lag)
|
||||||
{
|
{
|
||||||
double lag;
|
|
||||||
double latency;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Skip the log entry if sampling is enabled and this row doesn't belong
|
* Skip the log entry if sampling is enabled and this row doesn't belong
|
||||||
* to the random sample.
|
* to the random sample.
|
||||||
@ -1758,118 +1770,42 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
|
|||||||
pg_erand48(thread->random_state) > sample_rate)
|
pg_erand48(thread->random_state) > sample_rate)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if (INSTR_TIME_IS_ZERO(*now))
|
|
||||||
INSTR_TIME_SET_CURRENT(*now);
|
|
||||||
|
|
||||||
latency = (double) (INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled);
|
|
||||||
if (skipped)
|
|
||||||
lag = latency;
|
|
||||||
else
|
|
||||||
lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled);
|
|
||||||
|
|
||||||
/* should we aggregate the results or not? */
|
/* should we aggregate the results or not? */
|
||||||
if (agg_interval > 0)
|
if (agg_interval > 0)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Are we still in the same interval? If yes, accumulate the values
|
* Loop until we reach the interval of the current transaction, and
|
||||||
* (print them otherwise)
|
* print all the empty intervals in between (this may happen with very
|
||||||
*/
|
* low tps, e.g. --rate=0.1).
|
||||||
if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now))
|
|
||||||
{
|
|
||||||
agg->cnt += 1;
|
|
||||||
if (skipped)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* there is no latency to record if the transaction was
|
|
||||||
* skipped
|
|
||||||
*/
|
|
||||||
agg->skipped += 1;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
agg->sum_latency += latency;
|
|
||||||
agg->sum2_latency += latency * latency;
|
|
||||||
|
|
||||||
/* first in this aggregation interval */
|
|
||||||
if ((agg->cnt == 1) || (latency < agg->min_latency))
|
|
||||||
agg->min_latency = latency;
|
|
||||||
|
|
||||||
if ((agg->cnt == 1) || (latency > agg->max_latency))
|
|
||||||
agg->max_latency = latency;
|
|
||||||
|
|
||||||
/* and the same for schedule lag */
|
|
||||||
if (throttle_delay)
|
|
||||||
{
|
|
||||||
agg->sum_lag += lag;
|
|
||||||
agg->sum2_lag += lag * lag;
|
|
||||||
|
|
||||||
if ((agg->cnt == 1) || (lag < agg->min_lag))
|
|
||||||
agg->min_lag = lag;
|
|
||||||
if ((agg->cnt == 1) || (lag > agg->max_lag))
|
|
||||||
agg->max_lag = lag;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* Loop until we reach the interval of the current transaction
|
|
||||||
* (and print all the empty intervals in between).
|
|
||||||
*/
|
*/
|
||||||
while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
|
while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
|
||||||
{
|
{
|
||||||
/*
|
/* print aggregated report to logfile */
|
||||||
* This is a non-Windows branch (thanks to the ifdef in
|
fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
|
||||||
* usage), so we don't need to handle this in a special way
|
|
||||||
* (see below).
|
|
||||||
*/
|
|
||||||
fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f",
|
|
||||||
agg->start_time,
|
agg->start_time,
|
||||||
agg->cnt,
|
agg->cnt,
|
||||||
agg->sum_latency,
|
agg->latency.sum,
|
||||||
agg->sum2_latency,
|
agg->latency.sum2,
|
||||||
agg->min_latency,
|
agg->latency.min,
|
||||||
agg->max_latency);
|
agg->latency.max);
|
||||||
if (throttle_delay)
|
if (throttle_delay)
|
||||||
{
|
{
|
||||||
fprintf(logfile, " %.0f %.0f %.0f %.0f",
|
fprintf(logfile, " %.0f %.0f %.0f %.0f",
|
||||||
agg->sum_lag,
|
agg->lag.sum,
|
||||||
agg->sum2_lag,
|
agg->lag.sum2,
|
||||||
agg->min_lag,
|
agg->lag.min,
|
||||||
agg->max_lag);
|
agg->lag.max);
|
||||||
if (latency_limit)
|
if (latency_limit)
|
||||||
fprintf(logfile, " %d", agg->skipped);
|
fprintf(logfile, " " INT64_FORMAT, agg->skipped);
|
||||||
}
|
}
|
||||||
fputc('\n', logfile);
|
fputc('\n', logfile);
|
||||||
|
|
||||||
/* move to the next inteval */
|
/* reset data and move to next interval */
|
||||||
agg->start_time = agg->start_time + agg_interval;
|
initStats(agg, agg->start_time + agg_interval);
|
||||||
|
|
||||||
/* reset for "no transaction" intervals */
|
|
||||||
agg->cnt = 0;
|
|
||||||
agg->skipped = 0;
|
|
||||||
agg->min_latency = 0;
|
|
||||||
agg->max_latency = 0;
|
|
||||||
agg->sum_latency = 0;
|
|
||||||
agg->sum2_latency = 0;
|
|
||||||
agg->min_lag = 0;
|
|
||||||
agg->max_lag = 0;
|
|
||||||
agg->sum_lag = 0;
|
|
||||||
agg->sum2_lag = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* reset the values to include only the current transaction. */
|
/* accumulate the current transaction */
|
||||||
agg->cnt = 1;
|
accumStats(agg, skipped, latency, lag);
|
||||||
agg->skipped = skipped ? 1 : 0;
|
|
||||||
agg->min_latency = latency;
|
|
||||||
agg->max_latency = latency;
|
|
||||||
agg->sum_latency = skipped ? 0.0 : latency;
|
|
||||||
agg->sum2_latency = skipped ? 0.0 : latency * latency;
|
|
||||||
agg->min_lag = lag;
|
|
||||||
agg->max_lag = lag;
|
|
||||||
agg->sum_lag = lag;
|
|
||||||
agg->sum2_lag = lag * lag;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -1878,21 +1814,21 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
|
|||||||
|
|
||||||
/* This is more than we really ought to know about instr_time */
|
/* This is more than we really ought to know about instr_time */
|
||||||
if (skipped)
|
if (skipped)
|
||||||
fprintf(logfile, "%d %d skipped %d %ld %ld",
|
fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld",
|
||||||
st->id, st->cnt, st->use_file,
|
st->id, st->cnt, st->use_file,
|
||||||
(long) now->tv_sec, (long) now->tv_usec);
|
(long) now->tv_sec, (long) now->tv_usec);
|
||||||
else
|
else
|
||||||
fprintf(logfile, "%d %d %.0f %d %ld %ld",
|
fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld",
|
||||||
st->id, st->cnt, latency, st->use_file,
|
st->id, st->cnt, latency, st->use_file,
|
||||||
(long) now->tv_sec, (long) now->tv_usec);
|
(long) now->tv_sec, (long) now->tv_usec);
|
||||||
#else
|
#else
|
||||||
|
|
||||||
/* On Windows, instr_time doesn't provide a timestamp anyway */
|
/* On Windows, instr_time doesn't provide a timestamp anyway */
|
||||||
if (skipped)
|
if (skipped)
|
||||||
fprintf(logfile, "%d %d skipped %d 0 0",
|
fprintf(logfile, "%d " INT64_FORMAT " skipped %d 0 0",
|
||||||
st->id, st->cnt, st->use_file);
|
st->id, st->cnt, st->use_file);
|
||||||
else
|
else
|
||||||
fprintf(logfile, "%d %d %.0f %d 0 0",
|
fprintf(logfile, "%d " INT64_FORMAT " %.0f %d 0 0",
|
||||||
st->id, st->cnt, latency, st->use_file);
|
st->id, st->cnt, latency, st->use_file);
|
||||||
#endif
|
#endif
|
||||||
if (throttle_delay)
|
if (throttle_delay)
|
||||||
@ -1901,6 +1837,44 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Accumulate and report statistics at end of a transaction.
|
||||||
|
*
|
||||||
|
* (This is also called when a transaction is late and thus skipped.)
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
processXactStats(TState *thread, CState *st, instr_time *now,
|
||||||
|
bool skipped, FILE *logfile, StatsData *agg)
|
||||||
|
{
|
||||||
|
double latency = 0.0,
|
||||||
|
lag = 0.0;
|
||||||
|
|
||||||
|
if ((!skipped || agg_interval) && INSTR_TIME_IS_ZERO(*now))
|
||||||
|
INSTR_TIME_SET_CURRENT(*now);
|
||||||
|
|
||||||
|
if (!skipped)
|
||||||
|
{
|
||||||
|
/* compute latency & lag */
|
||||||
|
latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
|
||||||
|
lag = INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (progress || throttle_delay || latency_limit)
|
||||||
|
{
|
||||||
|
accumStats(&thread->stats, skipped, latency, lag);
|
||||||
|
|
||||||
|
/* count transactions over the latency limit, if needed */
|
||||||
|
if (latency_limit && latency > latency_limit)
|
||||||
|
thread->latency_late++;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
thread->stats.cnt++;
|
||||||
|
|
||||||
|
if (use_log)
|
||||||
|
doLog(thread, st, logfile, now, agg, skipped, latency, lag);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* discard connections */
|
/* discard connections */
|
||||||
static void
|
static void
|
||||||
disconnect_all(CState *state, int length)
|
disconnect_all(CState *state, int length)
|
||||||
@ -2297,6 +2271,7 @@ process_commands(char *buf, const char *source, const int lineno)
|
|||||||
my_commands->command_num = num_commands++;
|
my_commands->command_num = num_commands++;
|
||||||
my_commands->type = 0; /* until set */
|
my_commands->type = 0; /* until set */
|
||||||
my_commands->argc = 0;
|
my_commands->argc = 0;
|
||||||
|
initSimpleStats(&my_commands->stats);
|
||||||
|
|
||||||
if (*p == '\\')
|
if (*p == '\\')
|
||||||
{
|
{
|
||||||
@ -2689,22 +2664,29 @@ addScript(const char *name, Command **commands)
|
|||||||
num_scripts++;
|
num_scripts++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
printSimpleStats(char *prefix, SimpleStats *ss)
|
||||||
|
{
|
||||||
|
/* print NaN if no transactions where executed */
|
||||||
|
double latency = ss->sum / ss->count;
|
||||||
|
double stddev = sqrt(ss->sum2 / ss->count - latency * latency);
|
||||||
|
|
||||||
|
printf("%s average = %.3f ms\n", prefix, 0.001 * latency);
|
||||||
|
printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev);
|
||||||
|
}
|
||||||
|
|
||||||
/* print out results */
|
/* print out results */
|
||||||
static void
|
static void
|
||||||
printResults(int64 normal_xacts, int nclients,
|
printResults(TState *threads, StatsData *total, instr_time total_time,
|
||||||
TState *threads, int nthreads,
|
instr_time conn_total_time, int latency_late)
|
||||||
instr_time total_time, instr_time conn_total_time,
|
|
||||||
int64 total_latencies, int64 total_sqlats,
|
|
||||||
int64 throttle_lag, int64 throttle_lag_max,
|
|
||||||
int64 throttle_latency_skipped, int64 latency_late)
|
|
||||||
{
|
{
|
||||||
double time_include,
|
double time_include,
|
||||||
tps_include,
|
tps_include,
|
||||||
tps_exclude;
|
tps_exclude;
|
||||||
|
|
||||||
time_include = INSTR_TIME_GET_DOUBLE(total_time);
|
time_include = INSTR_TIME_GET_DOUBLE(total_time);
|
||||||
tps_include = normal_xacts / time_include;
|
tps_include = total->cnt / time_include;
|
||||||
tps_exclude = normal_xacts / (time_include -
|
tps_exclude = total->cnt / (time_include -
|
||||||
(INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
|
(INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
|
||||||
|
|
||||||
printf("transaction type: %s\n",
|
printf("transaction type: %s\n",
|
||||||
@ -2716,46 +2698,36 @@ printResults(int64 normal_xacts, int nclients,
|
|||||||
if (duration <= 0)
|
if (duration <= 0)
|
||||||
{
|
{
|
||||||
printf("number of transactions per client: %d\n", nxacts);
|
printf("number of transactions per client: %d\n", nxacts);
|
||||||
printf("number of transactions actually processed: " INT64_FORMAT "/" INT64_FORMAT "\n",
|
printf("number of transactions actually processed: " INT64_FORMAT "/%d\n",
|
||||||
normal_xacts, (int64) nxacts * nclients);
|
total->cnt, nxacts * nclients);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
printf("duration: %d s\n", duration);
|
printf("duration: %d s\n", duration);
|
||||||
printf("number of transactions actually processed: " INT64_FORMAT "\n",
|
printf("number of transactions actually processed: " INT64_FORMAT "\n",
|
||||||
normal_xacts);
|
total->cnt);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Remaining stats are nonsensical if we failed to execute any xacts */
|
/* Remaining stats are nonsensical if we failed to execute any xacts */
|
||||||
if (normal_xacts <= 0)
|
if (total->cnt <= 0)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if (throttle_delay && latency_limit)
|
if (throttle_delay && latency_limit)
|
||||||
printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
|
printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
|
||||||
throttle_latency_skipped,
|
total->skipped,
|
||||||
100.0 * throttle_latency_skipped / (throttle_latency_skipped + normal_xacts));
|
100.0 * total->skipped / (total->skipped + total->cnt));
|
||||||
|
|
||||||
if (latency_limit)
|
if (latency_limit)
|
||||||
printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT " (%.3f %%)\n",
|
printf("number of transactions above the %.1f ms latency limit: %d (%.3f %%)\n",
|
||||||
latency_limit / 1000.0, latency_late,
|
latency_limit / 1000.0, latency_late,
|
||||||
100.0 * latency_late / (throttle_latency_skipped + normal_xacts));
|
100.0 * latency_late / (total->skipped + total->cnt));
|
||||||
|
|
||||||
if (throttle_delay || progress || latency_limit)
|
if (throttle_delay || progress || latency_limit)
|
||||||
{
|
printSimpleStats("latency", &total->latency);
|
||||||
/* compute and show latency average and standard deviation */
|
|
||||||
double latency = 0.001 * total_latencies / normal_xacts;
|
|
||||||
double sqlat = (double) total_sqlats / normal_xacts;
|
|
||||||
|
|
||||||
printf("latency average: %.3f ms\n"
|
|
||||||
"latency stddev: %.3f ms\n",
|
|
||||||
latency, 0.001 * sqrt(sqlat - 1000000.0 * latency * latency));
|
|
||||||
}
|
|
||||||
else
|
else
|
||||||
{
|
|
||||||
/* only an average latency computed from the duration is available */
|
/* only an average latency computed from the duration is available */
|
||||||
printf("latency average: %.3f ms\n",
|
printf("latency average: %.3f ms\n",
|
||||||
1000.0 * duration * nclients / normal_xacts);
|
1000.0 * duration * nclients / total->cnt);
|
||||||
}
|
|
||||||
|
|
||||||
if (throttle_delay)
|
if (throttle_delay)
|
||||||
{
|
{
|
||||||
@ -2766,7 +2738,7 @@ printResults(int64 normal_xacts, int nclients,
|
|||||||
* the database load, or the Poisson throttling process.
|
* the database load, or the Poisson throttling process.
|
||||||
*/
|
*/
|
||||||
printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
|
printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
|
||||||
0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
|
0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max);
|
||||||
}
|
}
|
||||||
|
|
||||||
printf("tps = %f (including connections establishing)\n", tps_include);
|
printf("tps = %f (including connections establishing)\n", tps_include);
|
||||||
@ -2785,33 +2757,9 @@ printResults(int64 normal_xacts, int nclients,
|
|||||||
printf(" - statement latencies in milliseconds:\n");
|
printf(" - statement latencies in milliseconds:\n");
|
||||||
|
|
||||||
for (commands = sql_script[i].commands; *commands != NULL; commands++)
|
for (commands = sql_script[i].commands; *commands != NULL; commands++)
|
||||||
{
|
printf(" %11.3f %s\n",
|
||||||
Command *command = *commands;
|
1000.0 * (*commands)->stats.sum / (*commands)->stats.count,
|
||||||
int cnum = command->command_num;
|
(*commands)->line);
|
||||||
double total_time;
|
|
||||||
instr_time total_exec_elapsed;
|
|
||||||
int total_exec_count;
|
|
||||||
int t;
|
|
||||||
|
|
||||||
/* Accumulate per-thread data for command */
|
|
||||||
INSTR_TIME_SET_ZERO(total_exec_elapsed);
|
|
||||||
total_exec_count = 0;
|
|
||||||
for (t = 0; t < nthreads; t++)
|
|
||||||
{
|
|
||||||
TState *thread = &threads[t];
|
|
||||||
|
|
||||||
INSTR_TIME_ADD(total_exec_elapsed,
|
|
||||||
thread->exec_elapsed[cnum]);
|
|
||||||
total_exec_count += thread->exec_count[cnum];
|
|
||||||
}
|
|
||||||
|
|
||||||
if (total_exec_count > 0)
|
|
||||||
total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
|
|
||||||
else
|
|
||||||
total_time = 0.0;
|
|
||||||
|
|
||||||
printf("\t%f\t%s\n", total_time, command->line);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2860,8 +2808,6 @@ main(int argc, char **argv)
|
|||||||
};
|
};
|
||||||
|
|
||||||
int c;
|
int c;
|
||||||
int nclients = 1; /* default number of simulated clients */
|
|
||||||
int nthreads = 1; /* default number of threads */
|
|
||||||
int is_init_mode = 0; /* initialize mode? */
|
int is_init_mode = 0; /* initialize mode? */
|
||||||
int is_no_vacuum = 0; /* no vacuum at all before testing? */
|
int is_no_vacuum = 0; /* no vacuum at all before testing? */
|
||||||
int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
|
int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
|
||||||
@ -2878,13 +2824,8 @@ main(int argc, char **argv)
|
|||||||
instr_time start_time; /* start up time */
|
instr_time start_time; /* start up time */
|
||||||
instr_time total_time;
|
instr_time total_time;
|
||||||
instr_time conn_total_time;
|
instr_time conn_total_time;
|
||||||
int64 total_xacts = 0;
|
|
||||||
int64 total_latencies = 0;
|
|
||||||
int64 total_sqlats = 0;
|
|
||||||
int64 throttle_lag = 0;
|
|
||||||
int64 throttle_lag_max = 0;
|
|
||||||
int64 throttle_latency_skipped = 0;
|
|
||||||
int64 latency_late = 0;
|
int64 latency_late = 0;
|
||||||
|
StatsData stats;
|
||||||
char *desc;
|
char *desc;
|
||||||
|
|
||||||
int i;
|
int i;
|
||||||
@ -3311,8 +3252,6 @@ main(int argc, char **argv)
|
|||||||
* changed after fork.
|
* changed after fork.
|
||||||
*/
|
*/
|
||||||
main_pid = (int) getpid();
|
main_pid = (int) getpid();
|
||||||
progress_nclients = nclients;
|
|
||||||
progress_nthreads = nthreads;
|
|
||||||
|
|
||||||
if (nclients > 1)
|
if (nclients > 1)
|
||||||
{
|
{
|
||||||
@ -3454,32 +3393,10 @@ main(int argc, char **argv)
|
|||||||
thread->random_state[0] = random();
|
thread->random_state[0] = random();
|
||||||
thread->random_state[1] = random();
|
thread->random_state[1] = random();
|
||||||
thread->random_state[2] = random();
|
thread->random_state[2] = random();
|
||||||
thread->throttle_latency_skipped = 0;
|
|
||||||
thread->latency_late = 0;
|
thread->latency_late = 0;
|
||||||
|
initStats(&thread->stats, 0.0);
|
||||||
|
|
||||||
nclients_dealt += thread->nstate;
|
nclients_dealt += thread->nstate;
|
||||||
|
|
||||||
if (is_latencies)
|
|
||||||
{
|
|
||||||
/* Reserve memory for the thread to store per-command latencies */
|
|
||||||
int t;
|
|
||||||
|
|
||||||
thread->exec_elapsed = (instr_time *)
|
|
||||||
pg_malloc(sizeof(instr_time) * num_commands);
|
|
||||||
thread->exec_count = (int *)
|
|
||||||
pg_malloc(sizeof(int) * num_commands);
|
|
||||||
|
|
||||||
for (t = 0; t < num_commands; t++)
|
|
||||||
{
|
|
||||||
INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
|
|
||||||
thread->exec_count[t] = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
thread->exec_elapsed = NULL;
|
|
||||||
thread->exec_count = NULL;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* all clients must be assigned to a thread */
|
/* all clients must be assigned to a thread */
|
||||||
@ -3522,11 +3439,11 @@ main(int argc, char **argv)
|
|||||||
#endif /* ENABLE_THREAD_SAFETY */
|
#endif /* ENABLE_THREAD_SAFETY */
|
||||||
|
|
||||||
/* wait for threads and accumulate results */
|
/* wait for threads and accumulate results */
|
||||||
|
initStats(&stats, 0.0);
|
||||||
INSTR_TIME_SET_ZERO(conn_total_time);
|
INSTR_TIME_SET_ZERO(conn_total_time);
|
||||||
for (i = 0; i < nthreads; i++)
|
for (i = 0; i < nthreads; i++)
|
||||||
{
|
{
|
||||||
TState *thread = &threads[i];
|
TState *thread = &threads[i];
|
||||||
int j;
|
|
||||||
|
|
||||||
#ifdef ENABLE_THREAD_SAFETY
|
#ifdef ENABLE_THREAD_SAFETY
|
||||||
if (threads[i].thread == INVALID_THREAD)
|
if (threads[i].thread == INVALID_THREAD)
|
||||||
@ -3539,21 +3456,13 @@ main(int argc, char **argv)
|
|||||||
(void) threadRun(thread);
|
(void) threadRun(thread);
|
||||||
#endif /* ENABLE_THREAD_SAFETY */
|
#endif /* ENABLE_THREAD_SAFETY */
|
||||||
|
|
||||||
/* thread level stats */
|
/* aggregate thread level stats */
|
||||||
throttle_lag += thread->throttle_lag;
|
mergeSimpleStats(&stats.latency, &thread->stats.latency);
|
||||||
throttle_latency_skipped += threads->throttle_latency_skipped;
|
mergeSimpleStats(&stats.lag, &thread->stats.lag);
|
||||||
|
stats.cnt += thread->stats.cnt;
|
||||||
|
stats.skipped += thread->stats.skipped;
|
||||||
latency_late += thread->latency_late;
|
latency_late += thread->latency_late;
|
||||||
if (throttle_lag_max > thread->throttle_lag_max)
|
|
||||||
throttle_lag_max = thread->throttle_lag_max;
|
|
||||||
INSTR_TIME_ADD(conn_total_time, thread->conn_time);
|
INSTR_TIME_ADD(conn_total_time, thread->conn_time);
|
||||||
|
|
||||||
/* client-level stats */
|
|
||||||
for (j = 0; j < thread->nstate; j++)
|
|
||||||
{
|
|
||||||
total_xacts += thread->state[j].cnt;
|
|
||||||
total_latencies += thread->state[j].txn_latencies;
|
|
||||||
total_sqlats += thread->state[j].txn_sqlats;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
disconnect_all(state, nclients);
|
disconnect_all(state, nclients);
|
||||||
|
|
||||||
@ -3569,10 +3478,7 @@ main(int argc, char **argv)
|
|||||||
*/
|
*/
|
||||||
INSTR_TIME_SET_CURRENT(total_time);
|
INSTR_TIME_SET_CURRENT(total_time);
|
||||||
INSTR_TIME_SUBTRACT(total_time, start_time);
|
INSTR_TIME_SUBTRACT(total_time, start_time);
|
||||||
printResults(total_xacts, nclients, threads, nthreads,
|
printResults(threads, &stats, total_time, conn_total_time, latency_late);
|
||||||
total_time, conn_total_time, total_latencies, total_sqlats,
|
|
||||||
throttle_lag, throttle_lag_max, throttle_latency_skipped,
|
|
||||||
latency_late);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -3593,13 +3499,8 @@ threadRun(void *arg)
|
|||||||
int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
|
int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
|
||||||
int64 last_report = thread_start;
|
int64 last_report = thread_start;
|
||||||
int64 next_report = last_report + (int64) progress * 1000000;
|
int64 next_report = last_report + (int64) progress * 1000000;
|
||||||
int64 last_count = 0,
|
StatsData last,
|
||||||
last_lats = 0,
|
aggs;
|
||||||
last_sqlats = 0,
|
|
||||||
last_lags = 0,
|
|
||||||
last_skipped = 0;
|
|
||||||
|
|
||||||
AggVals aggs;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Initialize throttling rate target for all of the thread's clients. It
|
* Initialize throttling rate target for all of the thread's clients. It
|
||||||
@ -3609,8 +3510,6 @@ threadRun(void *arg)
|
|||||||
*/
|
*/
|
||||||
INSTR_TIME_SET_CURRENT(start);
|
INSTR_TIME_SET_CURRENT(start);
|
||||||
thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
|
thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
|
||||||
thread->throttle_lag = 0;
|
|
||||||
thread->throttle_lag_max = 0;
|
|
||||||
|
|
||||||
INSTR_TIME_SET_ZERO(thread->conn_time);
|
INSTR_TIME_SET_ZERO(thread->conn_time);
|
||||||
|
|
||||||
@ -3647,7 +3546,8 @@ threadRun(void *arg)
|
|||||||
INSTR_TIME_SET_CURRENT(thread->conn_time);
|
INSTR_TIME_SET_CURRENT(thread->conn_time);
|
||||||
INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
|
INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
|
||||||
|
|
||||||
agg_vals_init(&aggs, thread->start_time);
|
initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
|
||||||
|
last = aggs;
|
||||||
|
|
||||||
/* send start up queries in async manner */
|
/* send start up queries in async manner */
|
||||||
for (i = 0; i < nstate; i++)
|
for (i = 0; i < nstate; i++)
|
||||||
@ -3661,7 +3561,7 @@ threadRun(void *arg)
|
|||||||
if (debug)
|
if (debug)
|
||||||
fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
|
fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
|
||||||
sql_script[st->use_file].name);
|
sql_script[st->use_file].name);
|
||||||
if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
|
if (!doCustom(thread, st, logfile, &aggs))
|
||||||
remains--; /* I've aborted */
|
remains--; /* I've aborted */
|
||||||
|
|
||||||
if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
|
if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
|
||||||
@ -3800,7 +3700,7 @@ threadRun(void *arg)
|
|||||||
if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
|
if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
|
||||||
|| commands[st->state]->type == META_COMMAND))
|
|| commands[st->state]->type == META_COMMAND))
|
||||||
{
|
{
|
||||||
if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
|
if (!doCustom(thread, st, logfile, &aggs))
|
||||||
remains--; /* I've aborted */
|
remains--; /* I've aborted */
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3825,11 +3725,7 @@ threadRun(void *arg)
|
|||||||
if (now >= next_report)
|
if (now >= next_report)
|
||||||
{
|
{
|
||||||
/* generate and show report */
|
/* generate and show report */
|
||||||
int64 count = 0,
|
StatsData cur;
|
||||||
lats = 0,
|
|
||||||
sqlats = 0,
|
|
||||||
lags = 0,
|
|
||||||
skipped = 0;
|
|
||||||
int64 run = now - last_report;
|
int64 run = now - last_report;
|
||||||
double tps,
|
double tps,
|
||||||
total_run,
|
total_run,
|
||||||
@ -3850,25 +3746,24 @@ threadRun(void *arg)
|
|||||||
* (If a read from a 64-bit integer is not atomic, you might
|
* (If a read from a 64-bit integer is not atomic, you might
|
||||||
* get a "torn" read and completely bogus latencies though!)
|
* get a "torn" read and completely bogus latencies though!)
|
||||||
*/
|
*/
|
||||||
for (i = 0; i < progress_nclients; i++)
|
initStats(&cur, 0.0);
|
||||||
|
for (i = 0; i < nthreads; i++)
|
||||||
{
|
{
|
||||||
count += state[i].cnt;
|
mergeSimpleStats(&cur.latency, &thread[i].stats.latency);
|
||||||
lats += state[i].txn_latencies;
|
mergeSimpleStats(&cur.lag, &thread[i].stats.lag);
|
||||||
sqlats += state[i].txn_sqlats;
|
cur.cnt += thread[i].stats.cnt;
|
||||||
}
|
cur.skipped += thread[i].stats.skipped;
|
||||||
|
|
||||||
for (i = 0; i < progress_nthreads; i++)
|
|
||||||
{
|
|
||||||
skipped += thread[i].throttle_latency_skipped;
|
|
||||||
lags += thread[i].throttle_lag;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
total_run = (now - thread_start) / 1000000.0;
|
total_run = (now - thread_start) / 1000000.0;
|
||||||
tps = 1000000.0 * (count - last_count) / run;
|
tps = 1000000.0 * (cur.cnt - last.cnt) / run;
|
||||||
latency = 0.001 * (lats - last_lats) / (count - last_count);
|
latency = 0.001 * (cur.latency.sum - last.latency.sum) /
|
||||||
sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
|
(cur.cnt - last.cnt);
|
||||||
|
sqlat = 1.0 * (cur.latency.sum2 - last.latency.sum2)
|
||||||
|
/ (cur.cnt - last.cnt);
|
||||||
stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
|
stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
|
||||||
lag = 0.001 * (lags - last_lags) / (count - last_count);
|
lag = 0.001 * (cur.lag.sum - last.lag.sum) /
|
||||||
|
(cur.cnt - last.cnt);
|
||||||
|
|
||||||
if (progress_timestamp)
|
if (progress_timestamp)
|
||||||
sprintf(tbuf, "%.03f s",
|
sprintf(tbuf, "%.03f s",
|
||||||
@ -3885,16 +3780,12 @@ threadRun(void *arg)
|
|||||||
fprintf(stderr, ", lag %.3f ms", lag);
|
fprintf(stderr, ", lag %.3f ms", lag);
|
||||||
if (latency_limit)
|
if (latency_limit)
|
||||||
fprintf(stderr, ", " INT64_FORMAT " skipped",
|
fprintf(stderr, ", " INT64_FORMAT " skipped",
|
||||||
skipped - last_skipped);
|
cur.skipped - last.skipped);
|
||||||
}
|
}
|
||||||
fprintf(stderr, "\n");
|
fprintf(stderr, "\n");
|
||||||
|
|
||||||
last_count = count;
|
last = cur;
|
||||||
last_lats = lats;
|
|
||||||
last_sqlats = sqlats;
|
|
||||||
last_lags = lags;
|
|
||||||
last_report = now;
|
last_report = now;
|
||||||
last_skipped = skipped;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Ensure that the next report is in the future, in case
|
* Ensure that the next report is in the future, in case
|
||||||
@ -3914,7 +3805,14 @@ done:
|
|||||||
INSTR_TIME_SET_CURRENT(end);
|
INSTR_TIME_SET_CURRENT(end);
|
||||||
INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
|
INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
|
||||||
if (logfile)
|
if (logfile)
|
||||||
|
{
|
||||||
|
if (agg_interval)
|
||||||
|
{
|
||||||
|
/* log aggregated but not yet reported transactions */
|
||||||
|
doLog(thread, state, logfile, &end, &aggs, false, 0, 0);
|
||||||
|
}
|
||||||
fclose(logfile);
|
fclose(logfile);
|
||||||
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user