mirror of
https://github.com/postgres/postgres.git
synced 2025-07-30 11:03:19 +03:00
pgbench: Refactor thread portability support.
Instead of maintaining an incomplete emulation of POSIX threads for Windows, let's use an extremely minimalist macro-based abstraction for now. A later patch will extend this, without the need to supply more complicated pthread emulation code. (There may be a need for a more serious portable thread abstraction in later projects, but this is not it.) Minor incidental problems fixed: it wasn't OK to use (pthread_t) 0 as a special value, it wasn't OK to compare thread_t values with ==, and we incorrectly assumed that pthread functions set errno. Discussion: https://postgr.es/m/20200227180100.zyvjwzcpiokfsqm2%40alap3.anarazel.de
This commit is contained in:
@ -111,22 +111,36 @@ typedef struct socket_set
|
|||||||
#endif /* POLL_USING_SELECT */
|
#endif /* POLL_USING_SELECT */
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Multi-platform pthread implementations
|
* Multi-platform thread implementations
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifdef WIN32
|
#ifdef WIN32
|
||||||
/* Use native win32 threads on Windows */
|
/* Use Windows threads */
|
||||||
typedef struct win32_pthread *pthread_t;
|
#include <windows.h>
|
||||||
typedef int pthread_attr_t;
|
#define GETERRNO() (_dosmaperr(GetLastError()), errno)
|
||||||
|
#define THREAD_T HANDLE
|
||||||
static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
|
#define THREAD_FUNC_RETURN_TYPE unsigned
|
||||||
static int pthread_join(pthread_t th, void **thread_return);
|
#define THREAD_FUNC_RETURN return 0
|
||||||
|
#define THREAD_CREATE(handle, function, arg) \
|
||||||
|
((*(handle) = (HANDLE) _beginthreadex(NULL, 0, (function), (arg), 0, NULL)) == 0 ? errno : 0)
|
||||||
|
#define THREAD_JOIN(handle) \
|
||||||
|
(WaitForSingleObject(handle, INFINITE) != WAIT_OBJECT_0 ? \
|
||||||
|
GETERRNO() : CloseHandle(handle) ? 0 : GETERRNO())
|
||||||
#elif defined(ENABLE_THREAD_SAFETY)
|
#elif defined(ENABLE_THREAD_SAFETY)
|
||||||
/* Use platform-dependent pthread capability */
|
/* Use POSIX threads */
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
#define THREAD_T pthread_t
|
||||||
|
#define THREAD_FUNC_RETURN_TYPE void *
|
||||||
|
#define THREAD_FUNC_RETURN return NULL
|
||||||
|
#define THREAD_CREATE(handle, function, arg) \
|
||||||
|
pthread_create((handle), NULL, (function), (arg))
|
||||||
|
#define THREAD_JOIN(handle) \
|
||||||
|
pthread_join((handle), NULL)
|
||||||
#else
|
#else
|
||||||
/* No threads implementation, use none (-j 1) */
|
/* No threads implementation, use none (-j 1) */
|
||||||
#define pthread_t void *
|
#define THREAD_T void *
|
||||||
|
#define THREAD_FUNC_RETURN_TYPE void *
|
||||||
|
#define THREAD_FUNC_RETURN return NULL
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
@ -436,7 +450,7 @@ typedef struct
|
|||||||
typedef struct
|
typedef struct
|
||||||
{
|
{
|
||||||
int tid; /* thread id */
|
int tid; /* thread id */
|
||||||
pthread_t thread; /* thread handle */
|
THREAD_T thread; /* thread handle */
|
||||||
CState *state; /* array of CState */
|
CState *state; /* array of CState */
|
||||||
int nstate; /* length of state[] */
|
int nstate; /* length of state[] */
|
||||||
|
|
||||||
@ -459,8 +473,6 @@ typedef struct
|
|||||||
int64 latency_late; /* executed but late transactions */
|
int64 latency_late; /* executed but late transactions */
|
||||||
} TState;
|
} TState;
|
||||||
|
|
||||||
#define INVALID_THREAD ((pthread_t) 0)
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* queries read from files
|
* queries read from files
|
||||||
*/
|
*/
|
||||||
@ -604,7 +616,7 @@ static void doLog(TState *thread, CState *st,
|
|||||||
static void processXactStats(TState *thread, CState *st, instr_time *now,
|
static void processXactStats(TState *thread, CState *st, instr_time *now,
|
||||||
bool skipped, StatsData *agg);
|
bool skipped, StatsData *agg);
|
||||||
static void addScript(ParsedScript script);
|
static void addScript(ParsedScript script);
|
||||||
static void *threadRun(void *arg);
|
static THREAD_FUNC_RETURN_TYPE threadRun(void *arg);
|
||||||
static void finishCon(CState *st);
|
static void finishCon(CState *st);
|
||||||
static void setalarm(int seconds);
|
static void setalarm(int seconds);
|
||||||
static socket_set *alloc_socket_set(int count);
|
static socket_set *alloc_socket_set(int count);
|
||||||
@ -6142,18 +6154,14 @@ main(int argc, char **argv)
|
|||||||
/* the first thread (i = 0) is executed by main thread */
|
/* the first thread (i = 0) is executed by main thread */
|
||||||
if (i > 0)
|
if (i > 0)
|
||||||
{
|
{
|
||||||
int err = pthread_create(&thread->thread, NULL, threadRun, thread);
|
errno = THREAD_CREATE(&thread->thread, threadRun, thread);
|
||||||
|
|
||||||
if (err != 0 || thread->thread == INVALID_THREAD)
|
if (errno != 0)
|
||||||
{
|
{
|
||||||
pg_log_fatal("could not create thread: %m");
|
pg_log_fatal("could not create thread: %m");
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
thread->thread = INVALID_THREAD;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
INSTR_TIME_SET_CURRENT(threads[0].start_time);
|
INSTR_TIME_SET_CURRENT(threads[0].start_time);
|
||||||
@ -6161,7 +6169,6 @@ main(int argc, char **argv)
|
|||||||
if (duration > 0)
|
if (duration > 0)
|
||||||
end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) +
|
end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) +
|
||||||
(int64) 1000000 * duration;
|
(int64) 1000000 * duration;
|
||||||
threads[0].thread = INVALID_THREAD;
|
|
||||||
#endif /* ENABLE_THREAD_SAFETY */
|
#endif /* ENABLE_THREAD_SAFETY */
|
||||||
|
|
||||||
/* wait for threads and accumulate results */
|
/* wait for threads and accumulate results */
|
||||||
@ -6172,12 +6179,12 @@ main(int argc, char **argv)
|
|||||||
TState *thread = &threads[i];
|
TState *thread = &threads[i];
|
||||||
|
|
||||||
#ifdef ENABLE_THREAD_SAFETY
|
#ifdef ENABLE_THREAD_SAFETY
|
||||||
if (threads[i].thread == INVALID_THREAD)
|
if (i == 0)
|
||||||
/* actually run this thread directly in the main thread */
|
/* actually run this thread directly in the main thread */
|
||||||
(void) threadRun(thread);
|
(void) threadRun(thread);
|
||||||
else
|
else
|
||||||
/* wait of other threads. should check that 0 is returned? */
|
/* wait of other threads. should check that 0 is returned? */
|
||||||
pthread_join(thread->thread, NULL);
|
THREAD_JOIN(thread->thread);
|
||||||
#else
|
#else
|
||||||
(void) threadRun(thread);
|
(void) threadRun(thread);
|
||||||
#endif /* ENABLE_THREAD_SAFETY */
|
#endif /* ENABLE_THREAD_SAFETY */
|
||||||
@ -6216,7 +6223,7 @@ main(int argc, char **argv)
|
|||||||
return exit_code;
|
return exit_code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *
|
static THREAD_FUNC_RETURN_TYPE
|
||||||
threadRun(void *arg)
|
threadRun(void *arg)
|
||||||
{
|
{
|
||||||
TState *thread = (TState *) arg;
|
TState *thread = (TState *) arg;
|
||||||
@ -6501,7 +6508,7 @@ done:
|
|||||||
thread->logfile = NULL;
|
thread->logfile = NULL;
|
||||||
}
|
}
|
||||||
free_socket_set(sockets);
|
free_socket_set(sockets);
|
||||||
return NULL;
|
THREAD_FUNC_RETURN;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@ -6732,74 +6739,3 @@ socket_has_input(socket_set *sa, int fd, int idx)
|
|||||||
}
|
}
|
||||||
|
|
||||||
#endif /* POLL_USING_SELECT */
|
#endif /* POLL_USING_SELECT */
|
||||||
|
|
||||||
|
|
||||||
/* partial pthread implementation for Windows */
|
|
||||||
|
|
||||||
#ifdef WIN32
|
|
||||||
|
|
||||||
typedef struct win32_pthread
|
|
||||||
{
|
|
||||||
HANDLE handle;
|
|
||||||
void *(*routine) (void *);
|
|
||||||
void *arg;
|
|
||||||
void *result;
|
|
||||||
} win32_pthread;
|
|
||||||
|
|
||||||
static unsigned __stdcall
|
|
||||||
win32_pthread_run(void *arg)
|
|
||||||
{
|
|
||||||
win32_pthread *th = (win32_pthread *) arg;
|
|
||||||
|
|
||||||
th->result = th->routine(th->arg);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int
|
|
||||||
pthread_create(pthread_t *thread,
|
|
||||||
pthread_attr_t *attr,
|
|
||||||
void *(*start_routine) (void *),
|
|
||||||
void *arg)
|
|
||||||
{
|
|
||||||
int save_errno;
|
|
||||||
win32_pthread *th;
|
|
||||||
|
|
||||||
th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
|
|
||||||
th->routine = start_routine;
|
|
||||||
th->arg = arg;
|
|
||||||
th->result = NULL;
|
|
||||||
|
|
||||||
th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
|
|
||||||
if (th->handle == NULL)
|
|
||||||
{
|
|
||||||
save_errno = errno;
|
|
||||||
free(th);
|
|
||||||
return save_errno;
|
|
||||||
}
|
|
||||||
|
|
||||||
*thread = th;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int
|
|
||||||
pthread_join(pthread_t th, void **thread_return)
|
|
||||||
{
|
|
||||||
if (th == NULL || th->handle == NULL)
|
|
||||||
return errno = EINVAL;
|
|
||||||
|
|
||||||
if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
|
|
||||||
{
|
|
||||||
_dosmaperr(GetLastError());
|
|
||||||
return errno;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (thread_return)
|
|
||||||
*thread_return = th->result;
|
|
||||||
|
|
||||||
CloseHandle(th->handle);
|
|
||||||
free(th);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif /* WIN32 */
|
|
||||||
|
Reference in New Issue
Block a user