1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-30 11:03:19 +03:00

Add --latency-limit option to pgbench.

This allows transactions that take longer than specified limit to be counted
separately. With --rate, transactions that are already late by the time we
get to execute them are skipped altogether. Using --latency-limit with
--rate allows you to "catch up" more quickly, if there's a hickup in the
server causing a lot of transactions to stall momentarily.

Fabien COELHO, reviewed by Rukh Meski and heavily refactored by me.
This commit is contained in:
Heikki Linnakangas
2014-10-13 20:25:56 +03:00
parent 30d7ae3c76
commit 98aed6c721
2 changed files with 237 additions and 68 deletions

View File

@ -140,6 +140,14 @@ double sample_rate = 0.0;
*/
int64 throttle_delay = 0;
/*
* Transactions which take longer than this limit (in usec) are counted as
* late, and reported as such, although they are completed anyway. When
* throttling is enabled, execution time slots that are more than this late
* are skipped altogether, and counted separately.
*/
int64 latency_limit = 0;
/*
* tablespace selection
*/
@ -238,6 +246,8 @@ typedef struct
int64 throttle_trigger; /* previous/next throttling (us) */
int64 throttle_lag; /* total transaction lag behind throttling */
int64 throttle_lag_max; /* max transaction lag */
int64 throttle_latency_skipped; /* lagging transactions skipped */
int64 latency_late; /* late transactions */
} TState;
#define INVALID_THREAD ((pthread_t) 0)
@ -250,6 +260,8 @@ typedef struct
int64 sqlats;
int64 throttle_lag;
int64 throttle_lag_max;
int64 throttle_latency_skipped;
int64 latency_late;
} TResult;
/*
@ -284,6 +296,8 @@ typedef struct
long start_time; /* when does the interval start */
int cnt; /* number of transactions */
int skipped; /* number of transactions skipped under
* --rate and --latency-limit */
double min_latency; /* min/max latencies */
double max_latency;
@ -348,7 +362,7 @@ static void setalarm(int seconds);
static void *threadRun(void *arg);
static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
AggVals *agg);
AggVals *agg, bool skipped);
static void
usage(void)
@ -375,6 +389,8 @@ usage(void)
" -f, --file=FILENAME read transaction script from FILENAME\n"
" -j, --jobs=NUM number of threads (default: 1)\n"
" -l, --log write transaction times to log file\n"
" -L, --latency-limit=NUM count transactions lasting more than NUM ms\n"
" as late.\n"
" -M, --protocol=simple|extended|prepared\n"
" protocol for submitting queries (default: simple)\n"
" -n, --no-vacuum do not run VACUUM before tests\n"
@ -994,7 +1010,9 @@ void
agg_vals_init(AggVals *aggs, instr_time start)
{
/* basic counters */
aggs->cnt = 0; /* number of transactions */
aggs->cnt = 0; /* number of transactions (includes skipped) */
aggs->skipped = 0; /* xacts skipped under --rate --latency-limit */
aggs->sum_latency = 0; /* SUM(latency) */
aggs->sum2_latency = 0; /* SUM(latency*latency) */
@ -1050,8 +1068,34 @@ top:
int64 wait = getPoissonRand(thread, throttle_delay);
thread->throttle_trigger += wait;
st->txn_scheduled = thread->throttle_trigger;
/*
* If this --latency-limit is used, and this slot is already late so
* that the transaction will miss the latency limit even if it
* completed immediately, we skip this time slot and iterate till the
* next slot that isn't late yet.
*/
if (latency_limit)
{
int64 now_us;
if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now);
now_us = INSTR_TIME_GET_MICROSEC(now);
while (thread->throttle_trigger < now_us - latency_limit)
{
thread->throttle_latency_skipped++;
if (logfile)
doLog(thread, st, logfile, &now, agg, true);
wait = getPoissonRand(thread, throttle_delay);
thread->throttle_trigger += wait;
st->txn_scheduled = thread->throttle_trigger;
}
}
st->sleeping = 1;
st->throttling = true;
st->is_throttled = true;
@ -1119,12 +1163,13 @@ top:
if (commands[st->state + 1] == NULL)
{
/* only calculate latency if an option is used that needs it */
if (progress || throttle_delay)
if (progress || throttle_delay || latency_limit)
{
int64 latency;
if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now);
latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled;
st->txn_latencies += latency;
@ -1137,11 +1182,15 @@ top:
* transactions, overflow would take 256 hours.
*/
st->txn_sqlats += latency * latency;
/* record over the limit transactions if needed. */
if (latency_limit && latency > latency_limit)
thread->latency_late++;
}
/* record the time it took in the log */
if (logfile)
doLog(thread, st, logfile, &now, agg);
doLog(thread, st, logfile, &now, agg, false);
}
if (commands[st->state]->type == SQL_COMMAND)
@ -1227,7 +1276,7 @@ top:
}
/* Record transaction start time under logging, progress or throttling */
if ((logfile || progress || throttle_delay) && st->state == 0)
if ((logfile || progress || throttle_delay || latency_limit) && st->state == 0)
{
INSTR_TIME_SET_CURRENT(st->txn_begin);
@ -1605,7 +1654,8 @@ top:
* print log entry after completing one transaction.
*/
static void
doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
bool skipped)
{
double lag;
double latency;
@ -1622,7 +1672,10 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
INSTR_TIME_SET_CURRENT(*now);
latency = (double) (INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled);
lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled);
if (skipped)
lag = latency;
else
lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled);
/* should we aggregate the results or not? */
if (agg_interval > 0)
@ -1634,26 +1687,34 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now))
{
agg->cnt += 1;
agg->sum_latency += latency;
agg->sum2_latency += latency * latency;
/* first in this aggregation interval */
if ((agg->cnt == 1) || (latency < agg->min_latency))
agg->min_latency = latency;
if ((agg->cnt == 1) || (latency > agg->max_latency))
agg->max_latency = latency;
/* and the same for schedule lag */
if (throttle_delay)
if (skipped)
{
agg->sum_lag += lag;
agg->sum2_lag += lag * lag;
/* there is no latency to record if the transaction was skipped */
agg->skipped += 1;
}
else
{
agg->sum_latency += latency;
agg->sum2_latency += latency * latency;
if ((agg->cnt == 1) || (lag < agg->min_lag))
agg->min_lag = lag;
if ((agg->cnt == 1) || (lag > agg->max_lag))
agg->max_lag = lag;
/* first in this aggregation interval */
if ((agg->cnt == 1) || (latency < agg->min_latency))
agg->min_latency = latency;
if ((agg->cnt == 1) || (latency > agg->max_latency))
agg->max_latency = latency;
/* and the same for schedule lag */
if (throttle_delay)
{
agg->sum_lag += lag;
agg->sum2_lag += lag * lag;
if ((agg->cnt == 1) || (lag < agg->min_lag))
agg->min_lag = lag;
if ((agg->cnt == 1) || (lag > agg->max_lag))
agg->max_lag = lag;
}
}
}
else
@ -1677,11 +1738,15 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
agg->min_latency,
agg->max_latency);
if (throttle_delay)
{
fprintf(logfile, " %.0f %.0f %.0f %.0f",
agg->sum_lag,
agg->sum2_lag,
agg->min_lag,
agg->max_lag);
if (latency_limit)
fprintf(logfile, " %d", agg->skipped);
}
fputc('\n', logfile);
/* move to the next inteval */
@ -1689,6 +1754,7 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
/* reset for "no transaction" intervals */
agg->cnt = 0;
agg->skipped = 0;
agg->min_latency = 0;
agg->max_latency = 0;
agg->sum_latency = 0;
@ -1701,10 +1767,11 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
/* reset the values to include only the current transaction. */
agg->cnt = 1;
agg->skipped = skipped ? 1 : 0;
agg->min_latency = latency;
agg->max_latency = latency;
agg->sum_latency = latency;
agg->sum2_latency = latency * latency;
agg->sum_latency = skipped ? 0.0 : latency;
agg->sum2_latency = skipped ? 0.0 : latency * latency;
agg->min_lag = lag;
agg->max_lag = lag;
agg->sum_lag = lag;
@ -1717,14 +1784,23 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg)
#ifndef WIN32
/* This is more than we really ought to know about instr_time */
fprintf(logfile, "%d %d %.0f %d %ld %ld",
st->id, st->cnt, latency, st->use_file,
(long) now->tv_sec, (long) now->tv_usec);
if (skipped)
fprintf(logfile, "%d %d skipped %d %ld %ld",
st->id, st->cnt, st->use_file,
(long) now->tv_sec, (long) now->tv_usec);
else
fprintf(logfile, "%d %d %.0f %d %ld %ld",
st->id, st->cnt, latency, st->use_file,
(long) now->tv_sec, (long) now->tv_usec);
#else
/* On Windows, instr_time doesn't provide a timestamp anyway */
fprintf(logfile, "%d %d %.0f %d 0 0",
st->id, st->cnt, latency, st->use_file);
if (skipped)
fprintf(logfile, "%d %d skipped %d 0 0",
st->id, st->cnt, st->use_file);
else
fprintf(logfile, "%d %d %.0f %d 0 0",
st->id, st->cnt, latency, st->use_file);
#endif
if (throttle_delay)
fprintf(logfile, " %.0f", lag);
@ -2424,7 +2500,8 @@ printResults(int ttype, int64 normal_xacts, int nclients,
TState *threads, int nthreads,
instr_time total_time, instr_time conn_total_time,
int64 total_latencies, int64 total_sqlats,
int64 throttle_lag, int64 throttle_lag_max)
int64 throttle_lag, int64 throttle_lag_max,
int64 throttle_latency_skipped, int64 latency_late)
{
double time_include,
tps_include,
@ -2463,7 +2540,17 @@ printResults(int ttype, int64 normal_xacts, int nclients,
normal_xacts);
}
if (throttle_delay || progress)
if (throttle_delay && latency_limit)
printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
throttle_latency_skipped,
100.0 * throttle_latency_skipped / (throttle_latency_skipped + normal_xacts));
if (latency_limit)
printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT " (%.3f %%)\n",
latency_limit / 1000.0, latency_late,
100.0 * latency_late / (throttle_latency_skipped + normal_xacts));
if (throttle_delay || progress || latency_limit)
{
/* compute and show latency average and standard deviation */
double latency = 0.001 * total_latencies / normal_xacts;
@ -2578,6 +2665,7 @@ main(int argc, char **argv)
{"sampling-rate", required_argument, NULL, 4},
{"aggregate-interval", required_argument, NULL, 5},
{"rate", required_argument, NULL, 'R'},
{"latency-limit", required_argument, NULL, 'L'},
{NULL, 0, NULL, 0}
};
@ -2607,6 +2695,8 @@ main(int argc, char **argv)
int64 total_sqlats = 0;
int64 throttle_lag = 0;
int64 throttle_lag_max = 0;
int64 throttle_latency_skipped = 0;
int64 latency_late = 0;
int i;
@ -2651,7 +2741,7 @@ main(int argc, char **argv)
state = (CState *) pg_malloc(sizeof(CState));
memset(state, 0, sizeof(CState));
while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:", long_options, &optindex)) != -1)
while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
{
switch (c)
{
@ -2848,6 +2938,18 @@ main(int argc, char **argv)
throttle_delay = (int64) (1000000.0 / throttle_value);
}
break;
case 'L':
{
double limit_ms = atof(optarg);
if (limit_ms <= 0.0)
{
fprintf(stderr, "invalid latency limit: %s\n", optarg);
exit(1);
}
benchmarking_option_set = true;
latency_limit = (int64) (limit_ms * 1000);
}
break;
case 0:
/* This covers long options which take no argument. */
if (foreign_keys || unlogged_tables)
@ -3143,6 +3245,8 @@ main(int argc, char **argv)
thread->random_state[0] = random();
thread->random_state[1] = random();
thread->random_state[2] = random();
thread->throttle_latency_skipped = 0;
thread->latency_late = 0;
if (is_latencies)
{
@ -3217,6 +3321,8 @@ main(int argc, char **argv)
total_latencies += r->latencies;
total_sqlats += r->sqlats;
throttle_lag += r->throttle_lag;
throttle_latency_skipped += r->throttle_latency_skipped;
latency_late += r->latency_late;
if (r->throttle_lag_max > throttle_lag_max)
throttle_lag_max = r->throttle_lag_max;
INSTR_TIME_ADD(conn_total_time, r->conn_time);
@ -3239,7 +3345,8 @@ main(int argc, char **argv)
INSTR_TIME_SUBTRACT(total_time, start_time);
printResults(ttype, total_xacts, nclients, threads, nthreads,
total_time, conn_total_time, total_latencies, total_sqlats,
throttle_lag, throttle_lag_max);
throttle_lag, throttle_lag_max, throttle_latency_skipped,
latency_late);
return 0;
}
@ -3264,7 +3371,8 @@ threadRun(void *arg)
int64 last_count = 0,
last_lats = 0,
last_sqlats = 0,
last_lags = 0;
last_lags = 0,
last_skipped = 0;
AggVals aggs;
@ -3467,7 +3575,8 @@ threadRun(void *arg)
/* generate and show report */
int64 count = 0,
lats = 0,
sqlats = 0;
sqlats = 0,
skipped = 0;
int64 lags = thread->throttle_lag;
int64 run = now - last_report;
double tps,
@ -3490,23 +3599,26 @@ threadRun(void *arg)
sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
lag = 0.001 * (lags - last_lags) / (count - last_count);
skipped = thread->throttle_latency_skipped - last_skipped;
fprintf(stderr,
"progress %d: %.1f s, %.1f tps, "
"lat %.3f ms stddev %.3f",
thread->tid, total_run, tps, latency, stdev);
if (throttle_delay)
fprintf(stderr,
"progress %d: %.1f s, %.1f tps, "
"lat %.3f ms stddev %.3f, lag %.3f ms\n",
thread->tid, total_run, tps, latency, stdev, lag);
else
fprintf(stderr,
"progress %d: %.1f s, %.1f tps, "
"lat %.3f ms stddev %.3f\n",
thread->tid, total_run, tps, latency, stdev);
{
fprintf(stderr, ", lag %.3f ms", lag);
if (latency_limit)
fprintf(stderr, ", skipped " INT64_FORMAT, skipped);
}
fprintf(stderr, "\n");
last_count = count;
last_lats = lats;
last_sqlats = sqlats;
last_lags = lags;
last_report = now;
last_skipped = thread->throttle_latency_skipped;
next_report += (int64) progress *1000000;
}
}
@ -3525,7 +3637,8 @@ threadRun(void *arg)
int64 count = 0,
lats = 0,
sqlats = 0,
lags = 0;
lags = 0,
skipped = 0;
int64 run = now - last_report;
double tps,
total_run,
@ -3550,23 +3663,26 @@ threadRun(void *arg)
sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
lag = 0.001 * (lags - last_lags) / (count - last_count);
skipped = thread->throttle_latency_skipped - last_skipped;
fprintf(stderr,
"progress: %.1f s, %.1f tps, "
"lat %.3f ms stddev %.3f",
total_run, tps, latency, stdev);
if (throttle_delay)
fprintf(stderr,
"progress: %.1f s, %.1f tps, "
"lat %.3f ms stddev %.3f, lag %.3f ms\n",
total_run, tps, latency, stdev, lag);
else
fprintf(stderr,
"progress: %.1f s, %.1f tps, "
"lat %.3f ms stddev %.3f\n",
total_run, tps, latency, stdev);
{
fprintf(stderr, ", lag %.3f ms", lag);
if (latency_limit)
fprintf(stderr, ", " INT64_FORMAT " skipped", skipped);
}
fprintf(stderr, "\n");
last_count = count;
last_lats = lats;
last_sqlats = sqlats;
last_lags = lags;
last_report = now;
last_skipped = thread->throttle_latency_skipped;
next_report += (int64) progress *1000000;
}
}
@ -3587,6 +3703,9 @@ done:
}
result->throttle_lag = thread->throttle_lag;
result->throttle_lag_max = thread->throttle_lag_max;
result->throttle_latency_skipped = thread->throttle_latency_skipped;
result->latency_late = thread->latency_late;
INSTR_TIME_SET_CURRENT(end);
INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
if (logfile)