diff --git a/src/civetweb.c b/src/civetweb.c index 5c1cd2e4..27bc5219 100644 --- a/src/civetweb.c +++ b/src/civetweb.c @@ -20,6 +20,7 @@ * THE SOFTWARE. */ +#define ALTERNATIVE_QUEUE #if defined(_WIN32) #if !defined(_CRT_SECURE_NO_WARNINGS) @@ -1235,6 +1236,7 @@ struct socket { unsigned char is_ssl; /* Is port SSL-ed */ unsigned char ssl_redir; /* Is port supposed to redirect everything to SSL * port */ + unsigned char in_use; /* Is valid */ }; /* NOTE(lsm): this enum shoulds be in sync with the config_options below. */ @@ -1499,6 +1501,8 @@ struct mg_connection { #if defined(USE_LUA) && defined(USE_WEBSOCKET) void *lua_websocket_state; /* Lua_State for a websocket connection */ #endif + + int thread_index; /* Thread index within ctx */ }; @@ -3074,7 +3078,7 @@ event_wait(int eventhdl) static int -event_signal(int eventhdl, unsigned timeout) +event_signal(int eventhdl) { return (int)SetEvent((HANDLE)eventhdl); } @@ -12626,27 +12630,41 @@ produce_socket(struct mg_context *ctx, const struct socket *sp) { unsigned int i; - for (i = 0; i < ctx->cfg_worker_threads; i++) { - /* TODO: find a free worker slot and signal it */ + for (;;) { + for (i = 0; i < ctx->cfg_worker_threads; i++) { + /* find a free worker slot and signal it */ + if (ctx->client_socks[i].in_use == 0) { + ctx->client_socks[i] = *sp; + ctx->client_socks[i].in_use = 1; + event_signal(ctx->client_wait_events[i]); + return; + } + } + /* queue is full */ + mg_sleep(1); } } + static int -consume_socket(struct mg_context *ctx, struct socket *sp) +consume_socket(struct mg_context *ctx, struct socket *sp, int index) { - /* TODO: every thread must wait for exactly one slot in the list */ + ctx->client_socks[index].in_use = 0; + event_wait(ctx->client_wait_events[index]); + *sp = ctx->client_socks[index]; + + return !ctx->stop_flag; } #else /* ALTERNATIVE_QUEUE */ /* Worker threads take accepted socket from the queue */ static int -consume_socket(struct mg_context *ctx, struct socket *sp) +consume_socket(struct mg_context *ctx, struct socket *sp, int index) { #define QUEUE_SIZE(ctx) ((int)(ARRAY_SIZE(ctx->queue))) - if (!ctx) { - return 0; - } + + (void)index; (void)pthread_mutex_lock(&ctx->thread_mutex); DEBUG_TRACE("%s", "going idle"); @@ -12709,10 +12727,16 @@ produce_socket(struct mg_context *ctx, const struct socket *sp) #endif /* ALTERNATIVE_QUEUE */ +struct worker_thread_args { + struct mg_context *ctx; + int index; +}; + + static void * -worker_thread_run(void *thread_func_param) +worker_thread_run(struct worker_thread_args *thread_args) { - struct mg_context *ctx = (struct mg_context *)thread_func_param; + struct mg_context *ctx = thread_args->ctx; struct mg_connection *conn; struct mg_workerTLS tls; #if defined(MG_LEGACY_INTERFACE) @@ -12741,6 +12765,7 @@ worker_thread_run(void *thread_func_param) conn->buf_size = MAX_REQUEST_SIZE; conn->buf = (char *)(conn + 1); conn->ctx = ctx; + conn->thread_index = thread_args->index; conn->request_info.user_data = ctx->user_data; /* Allocate a mutex for this connection to allow communication both * within the request handler and from elsewhere in the application @@ -12750,7 +12775,7 @@ worker_thread_run(void *thread_func_param) /* Call consume_socket() even when ctx->stop_flag > 0, to let it * signal sq_empty condvar to wake up the master waiting in * produce_socket() */ - while (consume_socket(ctx, &conn->client)) { + while (consume_socket(ctx, &conn->client, conn->thread_index)) { conn->conn_birth_time = time(NULL); /* Fill in IP, port info early so even if SSL setup below fails, @@ -12810,14 +12835,20 @@ worker_thread_run(void *thread_func_param) #ifdef _WIN32 static unsigned __stdcall worker_thread(void *thread_func_param) { + struct worker_thread_args *pwta = + (struct worker_thread_args *)thread_func_param; worker_thread_run(thread_func_param); + mg_free(thread_func_param); return 0; } #else static void * worker_thread(void *thread_func_param) { + struct worker_thread_args *pwta = + (struct worker_thread_args *)thread_func_param; worker_thread_run(thread_func_param); + mg_free(thread_func_param); return NULL; } #endif /* _WIN32 */ @@ -13055,6 +13086,11 @@ free_context(struct mg_context *ctx) */ (void)pthread_mutex_destroy(&ctx->thread_mutex); #if defined(ALTERNATIVE_QUEUE) + mg_free(ctx->client_socks); + for (i = 0; (unsigned)i < ctx->cfg_worker_threads; i++) { + event_destroy(ctx->client_wait_events[i]); + } + mg_free(ctx->client_wait_events); /* TODO: free allocated memory */ #else (void)pthread_cond_destroy(&ctx->sq_empty); @@ -13263,9 +13299,7 @@ mg_start(const struct mg_callbacks *callbacks, #endif ok = 0 == pthread_mutex_init(&ctx->thread_mutex, &pthread_mutex_attr); -#if defined(ALTERNATIVE_QUEUE) -/* TODO: allocate memory */ -#else +#if !defined(ALTERNATIVE_QUEUE) ok &= 0 == pthread_cond_init(&ctx->sq_empty, NULL); ok &= 0 == pthread_cond_init(&ctx->sq_full, NULL); #endif @@ -13371,6 +13405,37 @@ mg_start(const struct mg_callbacks *callbacks, pthread_setspecific(sTlsKey, NULL); return NULL; } + +#if defined(ALTERNATIVE_QUEUE) + ctx->client_wait_events = mg_calloc(sizeof(ctx->client_wait_events[0]), + ctx->cfg_worker_threads); + if (ctx->client_wait_events == NULL) { + mg_cry(fc(ctx), "Not enough memory for worker event array"); + mg_free(ctx->workerthreadids); + free_context(ctx); + pthread_setspecific(sTlsKey, NULL); + return NULL; + } + + ctx->client_socks = + mg_calloc(sizeof(ctx->client_socks[0]), ctx->cfg_worker_threads); + if (ctx->client_wait_events == NULL) { + mg_cry(fc(ctx), "Not enough memory for worker socket array"); + mg_free(ctx->client_socks); + mg_free(ctx->workerthreadids); + free_context(ctx); + pthread_setspecific(sTlsKey, NULL); + return NULL; + } + + for (i = 0; (unsigned)i < ctx->cfg_worker_threads; i++) { + ctx->client_wait_events[i] = event_create(); + if (ctx->client_wait_events[i] == 0) { + mg_cry(fc(ctx), "Error creating worker event %i", i); + /* TODO: clean all and exit */ + } + } +#endif } #if defined(USE_TIMERS) @@ -13394,9 +13459,23 @@ mg_start(const struct mg_callbacks *callbacks, /* Start worker threads */ for (i = 0; i < ctx->cfg_worker_threads; i++) { - if (mg_start_thread_with_id(worker_thread, - ctx, - &ctx->workerthreadids[i]) != 0) { + struct worker_thread_args *wta = + mg_malloc(sizeof(struct worker_thread_args)); + if (wta) { + wta->ctx = ctx; + wta->index = i; + } + + if ((wta == NULL) + || (mg_start_thread_with_id(worker_thread, + wta, + &ctx->workerthreadids[i]) != 0)) { + + /* thread was not created */ + if (wta != NULL) { + mg_free(wta); + } + if (i > 0) { mg_cry(fc(ctx), "Cannot start worker thread %i: error %ld",