1
0
mirror of https://github.com/lammertb/libhttp.git synced 2025-12-22 04:02:04 +03:00

Use eventfd instead of pthread_cond (step 3/?)

This commit is contained in:
bel
2016-07-04 21:33:48 +02:00
parent fd43dd37b1
commit abdacc98e4

View File

@@ -20,6 +20,7 @@
* THE SOFTWARE. * THE SOFTWARE.
*/ */
#define ALTERNATIVE_QUEUE
#if defined(_WIN32) #if defined(_WIN32)
#if !defined(_CRT_SECURE_NO_WARNINGS) #if !defined(_CRT_SECURE_NO_WARNINGS)
@@ -1235,6 +1236,7 @@ struct socket {
unsigned char is_ssl; /* Is port SSL-ed */ unsigned char is_ssl; /* Is port SSL-ed */
unsigned char ssl_redir; /* Is port supposed to redirect everything to SSL unsigned char ssl_redir; /* Is port supposed to redirect everything to SSL
* port */ * port */
unsigned char in_use; /* Is valid */
}; };
/* NOTE(lsm): this enum shoulds be in sync with the config_options below. */ /* NOTE(lsm): this enum shoulds be in sync with the config_options below. */
@@ -1499,6 +1501,8 @@ struct mg_connection {
#if defined(USE_LUA) && defined(USE_WEBSOCKET) #if defined(USE_LUA) && defined(USE_WEBSOCKET)
void *lua_websocket_state; /* Lua_State for a websocket connection */ void *lua_websocket_state; /* Lua_State for a websocket connection */
#endif #endif
int thread_index; /* Thread index within ctx */
}; };
@@ -3074,7 +3078,7 @@ event_wait(int eventhdl)
static int static int
event_signal(int eventhdl, unsigned timeout) event_signal(int eventhdl)
{ {
return (int)SetEvent((HANDLE)eventhdl); return (int)SetEvent((HANDLE)eventhdl);
} }
@@ -12626,27 +12630,41 @@ produce_socket(struct mg_context *ctx, const struct socket *sp)
{ {
unsigned int i; unsigned int i;
for (i = 0; i < ctx->cfg_worker_threads; i++) { for (;;) {
/* TODO: find a free worker slot and signal it */ for (i = 0; i < ctx->cfg_worker_threads; i++) {
/* find a free worker slot and signal it */
if (ctx->client_socks[i].in_use == 0) {
ctx->client_socks[i] = *sp;
ctx->client_socks[i].in_use = 1;
event_signal(ctx->client_wait_events[i]);
return;
}
}
/* queue is full */
mg_sleep(1);
} }
} }
static int static int
consume_socket(struct mg_context *ctx, struct socket *sp) consume_socket(struct mg_context *ctx, struct socket *sp, int index)
{ {
/* TODO: every thread must wait for exactly one slot in the list */ ctx->client_socks[index].in_use = 0;
event_wait(ctx->client_wait_events[index]);
*sp = ctx->client_socks[index];
return !ctx->stop_flag;
} }
#else /* ALTERNATIVE_QUEUE */ #else /* ALTERNATIVE_QUEUE */
/* Worker threads take accepted socket from the queue */ /* Worker threads take accepted socket from the queue */
static int static int
consume_socket(struct mg_context *ctx, struct socket *sp) consume_socket(struct mg_context *ctx, struct socket *sp, int index)
{ {
#define QUEUE_SIZE(ctx) ((int)(ARRAY_SIZE(ctx->queue))) #define QUEUE_SIZE(ctx) ((int)(ARRAY_SIZE(ctx->queue)))
if (!ctx) {
return 0; (void)index;
}
(void)pthread_mutex_lock(&ctx->thread_mutex); (void)pthread_mutex_lock(&ctx->thread_mutex);
DEBUG_TRACE("%s", "going idle"); DEBUG_TRACE("%s", "going idle");
@@ -12709,10 +12727,16 @@ produce_socket(struct mg_context *ctx, const struct socket *sp)
#endif /* ALTERNATIVE_QUEUE */ #endif /* ALTERNATIVE_QUEUE */
struct worker_thread_args {
struct mg_context *ctx;
int index;
};
static void * static void *
worker_thread_run(void *thread_func_param) worker_thread_run(struct worker_thread_args *thread_args)
{ {
struct mg_context *ctx = (struct mg_context *)thread_func_param; struct mg_context *ctx = thread_args->ctx;
struct mg_connection *conn; struct mg_connection *conn;
struct mg_workerTLS tls; struct mg_workerTLS tls;
#if defined(MG_LEGACY_INTERFACE) #if defined(MG_LEGACY_INTERFACE)
@@ -12741,6 +12765,7 @@ worker_thread_run(void *thread_func_param)
conn->buf_size = MAX_REQUEST_SIZE; conn->buf_size = MAX_REQUEST_SIZE;
conn->buf = (char *)(conn + 1); conn->buf = (char *)(conn + 1);
conn->ctx = ctx; conn->ctx = ctx;
conn->thread_index = thread_args->index;
conn->request_info.user_data = ctx->user_data; conn->request_info.user_data = ctx->user_data;
/* Allocate a mutex for this connection to allow communication both /* Allocate a mutex for this connection to allow communication both
* within the request handler and from elsewhere in the application * within the request handler and from elsewhere in the application
@@ -12750,7 +12775,7 @@ worker_thread_run(void *thread_func_param)
/* Call consume_socket() even when ctx->stop_flag > 0, to let it /* Call consume_socket() even when ctx->stop_flag > 0, to let it
* signal sq_empty condvar to wake up the master waiting in * signal sq_empty condvar to wake up the master waiting in
* produce_socket() */ * produce_socket() */
while (consume_socket(ctx, &conn->client)) { while (consume_socket(ctx, &conn->client, conn->thread_index)) {
conn->conn_birth_time = time(NULL); conn->conn_birth_time = time(NULL);
/* Fill in IP, port info early so even if SSL setup below fails, /* Fill in IP, port info early so even if SSL setup below fails,
@@ -12810,14 +12835,20 @@ worker_thread_run(void *thread_func_param)
#ifdef _WIN32 #ifdef _WIN32
static unsigned __stdcall worker_thread(void *thread_func_param) static unsigned __stdcall worker_thread(void *thread_func_param)
{ {
struct worker_thread_args *pwta =
(struct worker_thread_args *)thread_func_param;
worker_thread_run(thread_func_param); worker_thread_run(thread_func_param);
mg_free(thread_func_param);
return 0; return 0;
} }
#else #else
static void * static void *
worker_thread(void *thread_func_param) worker_thread(void *thread_func_param)
{ {
struct worker_thread_args *pwta =
(struct worker_thread_args *)thread_func_param;
worker_thread_run(thread_func_param); worker_thread_run(thread_func_param);
mg_free(thread_func_param);
return NULL; return NULL;
} }
#endif /* _WIN32 */ #endif /* _WIN32 */
@@ -13055,6 +13086,11 @@ free_context(struct mg_context *ctx)
*/ */
(void)pthread_mutex_destroy(&ctx->thread_mutex); (void)pthread_mutex_destroy(&ctx->thread_mutex);
#if defined(ALTERNATIVE_QUEUE) #if defined(ALTERNATIVE_QUEUE)
mg_free(ctx->client_socks);
for (i = 0; (unsigned)i < ctx->cfg_worker_threads; i++) {
event_destroy(ctx->client_wait_events[i]);
}
mg_free(ctx->client_wait_events);
/* TODO: free allocated memory */ /* TODO: free allocated memory */
#else #else
(void)pthread_cond_destroy(&ctx->sq_empty); (void)pthread_cond_destroy(&ctx->sq_empty);
@@ -13263,9 +13299,7 @@ mg_start(const struct mg_callbacks *callbacks,
#endif #endif
ok = 0 == pthread_mutex_init(&ctx->thread_mutex, &pthread_mutex_attr); ok = 0 == pthread_mutex_init(&ctx->thread_mutex, &pthread_mutex_attr);
#if defined(ALTERNATIVE_QUEUE) #if !defined(ALTERNATIVE_QUEUE)
/* TODO: allocate memory */
#else
ok &= 0 == pthread_cond_init(&ctx->sq_empty, NULL); ok &= 0 == pthread_cond_init(&ctx->sq_empty, NULL);
ok &= 0 == pthread_cond_init(&ctx->sq_full, NULL); ok &= 0 == pthread_cond_init(&ctx->sq_full, NULL);
#endif #endif
@@ -13371,6 +13405,37 @@ mg_start(const struct mg_callbacks *callbacks,
pthread_setspecific(sTlsKey, NULL); pthread_setspecific(sTlsKey, NULL);
return NULL; return NULL;
} }
#if defined(ALTERNATIVE_QUEUE)
ctx->client_wait_events = mg_calloc(sizeof(ctx->client_wait_events[0]),
ctx->cfg_worker_threads);
if (ctx->client_wait_events == NULL) {
mg_cry(fc(ctx), "Not enough memory for worker event array");
mg_free(ctx->workerthreadids);
free_context(ctx);
pthread_setspecific(sTlsKey, NULL);
return NULL;
}
ctx->client_socks =
mg_calloc(sizeof(ctx->client_socks[0]), ctx->cfg_worker_threads);
if (ctx->client_wait_events == NULL) {
mg_cry(fc(ctx), "Not enough memory for worker socket array");
mg_free(ctx->client_socks);
mg_free(ctx->workerthreadids);
free_context(ctx);
pthread_setspecific(sTlsKey, NULL);
return NULL;
}
for (i = 0; (unsigned)i < ctx->cfg_worker_threads; i++) {
ctx->client_wait_events[i] = event_create();
if (ctx->client_wait_events[i] == 0) {
mg_cry(fc(ctx), "Error creating worker event %i", i);
/* TODO: clean all and exit */
}
}
#endif
} }
#if defined(USE_TIMERS) #if defined(USE_TIMERS)
@@ -13394,9 +13459,23 @@ mg_start(const struct mg_callbacks *callbacks,
/* Start worker threads */ /* Start worker threads */
for (i = 0; i < ctx->cfg_worker_threads; i++) { for (i = 0; i < ctx->cfg_worker_threads; i++) {
if (mg_start_thread_with_id(worker_thread, struct worker_thread_args *wta =
ctx, mg_malloc(sizeof(struct worker_thread_args));
&ctx->workerthreadids[i]) != 0) { if (wta) {
wta->ctx = ctx;
wta->index = i;
}
if ((wta == NULL)
|| (mg_start_thread_with_id(worker_thread,
wta,
&ctx->workerthreadids[i]) != 0)) {
/* thread was not created */
if (wta != NULL) {
mg_free(wta);
}
if (i > 0) { if (i > 0) {
mg_cry(fc(ctx), mg_cry(fc(ctx),
"Cannot start worker thread %i: error %ld", "Cannot start worker thread %i: error %ld",