From aeb57af8e64000cc4288a7b8b8d7cf6040eae900 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Wed, 10 Mar 2021 16:17:34 +1300 Subject: [PATCH] pgbench: Synchronize client threads. Wait until all pgbench threads are connected before benchmarking begins. This fixes a problem where some connections could take a very long time to be established because of lock contention from earlier connections, making results unstable and bogus with high connection counts. Author: Andres Freund Author: Fabien COELHO Reviewed-by: Marina Polyakova Reviewed-by: Kyotaro Horiguchi Reviewed-by: Hayato Kuroda Reviewed-by: David Rowley Discussion: https://postgr.es/m/20200227180100.zyvjwzcpiokfsqm2%40alap3.anarazel.de --- src/bin/pgbench/pgbench.c | 42 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index b556d3f6b49..c0d2a124a92 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -126,9 +126,16 @@ typedef struct socket_set #define THREAD_JOIN(handle) \ (WaitForSingleObject(handle, INFINITE) != WAIT_OBJECT_0 ? \ GETERRNO() : CloseHandle(handle) ? 0 : GETERRNO()) +#define THREAD_BARRIER_T SYNCHRONIZATION_BARRIER +#define THREAD_BARRIER_INIT(barrier, n) \ + (InitializeSynchronizationBarrier((barrier), (n), 0) ? 0 : GETERRNO()) +#define THREAD_BARRIER_WAIT(barrier) \ + EnterSynchronizationBarrier((barrier), \ + SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY) +#define THREAD_BARRIER_DESTROY(barrier) #elif defined(ENABLE_THREAD_SAFETY) /* Use POSIX threads */ -#include +#include "port/pg_pthread.h" #define THREAD_T pthread_t #define THREAD_FUNC_RETURN_TYPE void * #define THREAD_FUNC_RETURN return NULL @@ -136,11 +143,20 @@ typedef struct socket_set pthread_create((handle), NULL, (function), (arg)) #define THREAD_JOIN(handle) \ pthread_join((handle), NULL) +#define THREAD_BARRIER_T pthread_barrier_t +#define THREAD_BARRIER_INIT(barrier, n) \ + pthread_barrier_init((barrier), NULL, (n)) +#define THREAD_BARRIER_WAIT(barrier) pthread_barrier_wait((barrier)) +#define THREAD_BARRIER_DESTROY(barrier) pthread_barrier_destroy((barrier)) #else /* No threads implementation, use none (-j 1) */ #define THREAD_T void * #define THREAD_FUNC_RETURN_TYPE void * #define THREAD_FUNC_RETURN return NULL +#define THREAD_BARRIER_T int +#define THREAD_BARRIER_INIT(barrier, n) (*(barrier) = 0) +#define THREAD_BARRIER_WAIT(barrier) +#define THREAD_BARRIER_DESTROY(barrier) #endif @@ -326,6 +342,9 @@ typedef struct RandomState /* Various random sequences are initialized from this one. */ static RandomState base_random_sequence; +/* Synchronization barrier for start and connection */ +static THREAD_BARRIER_T barrier; + /* * Connection state machine states. */ @@ -6121,6 +6140,10 @@ main(int argc, char **argv) if (duration > 0) setalarm(duration); + errno = THREAD_BARRIER_INIT(&barrier, nthreads); + if (errno != 0) + pg_log_fatal("could not initialize barrier: %m"); + #ifdef ENABLE_THREAD_SAFETY /* start all threads but thread 0 which is executed directly later */ for (i = 1; i < nthreads; i++) @@ -6191,6 +6214,8 @@ main(int argc, char **argv) printResults(&stats, pg_time_now() - bench_start, conn_total_duration, bench_start - start_time, latency_late); + THREAD_BARRIER_DESTROY(&barrier); + if (exit_code != 0) pg_log_fatal("Run was aborted; the above results are incomplete."); @@ -6237,6 +6262,8 @@ threadRun(void *arg) state[i].state = CSTATE_CHOOSE_SCRIPT; /* READY */ + THREAD_BARRIER_WAIT(&barrier); + thread_start = pg_time_now(); thread->started_time = thread_start; last_report = thread_start; @@ -6249,7 +6276,18 @@ threadRun(void *arg) for (int i = 0; i < nstate; i++) { if ((state[i].con = doConnect()) == NULL) + { + /* + * On connection failure, we meet the barrier here in place of + * GO before proceeding to the "done" path which will cleanup, + * so as to avoid locking the process. + * + * It is unclear whether it is worth doing anything rather than + * coldly exiting with an error message. + */ + THREAD_BARRIER_WAIT(&barrier); goto done; + } } /* compute connection delay */ @@ -6261,6 +6299,8 @@ threadRun(void *arg) thread->conn_duration = 0; } + /* GO */ + THREAD_BARRIER_WAIT(&barrier); start = pg_time_now(); thread->bench_start = start;