1
0
mirror of https://github.com/lammertb/libhttp.git synced 2025-12-22 04:02:04 +03:00

Use eventfd instead of pthread_cond (step 1/?)

This commit is contained in:
bel
2016-07-03 22:24:51 +02:00
parent 435c800467
commit 7023ea5643
2 changed files with 120 additions and 40 deletions

View File

@@ -1575,9 +1575,60 @@ typedef struct tagTHREADNAME_INFO {
DWORD dwFlags; /* Reserved for future use, must be zero. */ DWORD dwFlags; /* Reserved for future use, must be zero. */
} THREADNAME_INFO; } THREADNAME_INFO;
#pragma pack(pop) #pragma pack(pop)
#elif defined(__linux__) #elif defined(__linux__)
#include <sys/prctl.h> #include <sys/prctl.h>
#include <sys/sendfile.h> #include <sys/sendfile.h>
#include <sys/eventfd.h>
static int
event_create(void)
{
	/* Create an eventfd-based event object (Linux replacement for the
	 * Win32 auto-reset event). Returns a non-zero descriptor on
	 * success, or 0 on failure — callers use 0 as the error value to
	 * match the Windows NULL-handle convention. */
	const int evhdl = eventfd(0, EFD_CLOEXEC);

	/* Linux reports errors with -1; map that to 0 for the callers.
	 * eventfd does not normally hand back descriptor 0 here either. */
	return (evhdl == -1) ? 0 : evhdl;
}
static int
event_wait(int eventhdl)
{
	/* Block until the event is signaled, i.e. until the eventfd
	 * counter becomes non-zero. Reading resets the counter to zero.
	 * Returns 1 on success, 0 on error.
	 *
	 * Fix: read() returns ssize_t, not int — storing it in an int
	 * silently truncates on LP64 platforms. */
	uint64_t u;
	ssize_t s = read(eventhdl, &u, sizeof(u));
	if (s != (ssize_t)sizeof(u)) {
		/* error */
		return 0;
	}
	(void)u; /* the value is not required */
	return 1;
}
static int
event_signal(int eventhdl)
{
	/* Signal the event by adding 1 to the eventfd counter, waking one
	 * waiter blocked in event_wait(). Returns 1 on success, 0 on error.
	 *
	 * Fix: write() returns ssize_t, not int — storing it in an int
	 * silently truncates on LP64 platforms. */
	uint64_t u = 1;
	ssize_t s = write(eventhdl, &u, sizeof(u));
	if (s != (ssize_t)sizeof(u)) {
		/* error */
		return 0;
	}
	return 1;
}
static void
event_destroy(int eventhdl)
{
	/* Release the eventfd descriptor obtained from event_create().
	 * The close() result is deliberately ignored, as in the other
	 * event_* teardown paths. */
	(void)close(eventhdl);
}
#endif #endif
@@ -3001,6 +3052,35 @@ pthread_cond_destroy(pthread_cond_t *cv)
} }
static int
event_create(void)
{
	/* Create an auto-reset Win32 event, initially non-signaled.
	 * Returns the handle cast to int, or 0 (NULL) on failure — the
	 * same error value the Linux eventfd variant uses.
	 * NOTE(review): casting HANDLE to int truncates the pointer on
	 * 64-bit Windows; confirm event handles are guaranteed to fit in
	 * 32 bits here, or widen the handle type. */
	return (int)CreateEvent(NULL, FALSE, FALSE, NULL);
}
static int
event_wait(int eventhdl)
{
int res = WaitForSingleObject((HANDLE)eventhdl, INFINITE);
return (res == WAIT_OBJECT_0);
}
static int
event_signal(int eventhdl)
{
	/* Signal the event, releasing one waiter blocked in event_wait().
	 * Returns non-zero on success, 0 on failure.
	 *
	 * Fix: the declaration carried an extra, unused "unsigned timeout"
	 * parameter, inconsistent with the Linux event_signal(int) and
	 * with every caller of the event_* API — removed. */
	return (int)SetEvent((HANDLE)eventhdl);
}
static void
event_destroy(int eventhdl)
{
	/* Release the Win32 event handle obtained from event_create().
	 * The CloseHandle() result is deliberately ignored. */
	(void)CloseHandle((HANDLE)eventhdl);
}
#if defined(__MINGW32__) #if defined(__MINGW32__)
/* Enable unused function warning again */ /* Enable unused function warning again */
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
@@ -12573,6 +12653,35 @@ consume_socket(struct mg_context *ctx, struct socket *sp)
} }
/* Master thread adds accepted socket to a queue */
static void
produce_socket(struct mg_context *ctx, const struct socket *sp)
{
#define QUEUE_SIZE(ctx) ((int)(ARRAY_SIZE(ctx->queue)))
	/* Copy *sp into the bounded accept queue shared with the worker
	 * threads. Blocks while the queue is full, unless the context is
	 * shutting down (stop_flag set), in which case the socket is
	 * dropped.
	 *
	 * Fix: the old code dereferenced *sp unconditionally but then
	 * guarded sp with "sp ? sp->sock : -1" in the trace — an
	 * inconsistent NULL policy. Reject NULL up front instead. */
	if (!ctx || !sp) {
		return;
	}
	(void)pthread_mutex_lock(&ctx->thread_mutex);

	/* If the queue is full, wait */
	while (ctx->stop_flag == 0
	       && ctx->sq_head - ctx->sq_tail >= QUEUE_SIZE(ctx)) {
		(void)pthread_cond_wait(&ctx->sq_empty, &ctx->thread_mutex);
	}

	if (ctx->sq_head - ctx->sq_tail < QUEUE_SIZE(ctx)) {
		/* Copy socket to the queue and increment head */
		ctx->queue[ctx->sq_head % QUEUE_SIZE(ctx)] = *sp;
		ctx->sq_head++;
		DEBUG_TRACE("queued socket %d", sp->sock);
	}

	/* Wake a worker waiting in consume_socket(), even on shutdown so
	 * waiters can observe stop_flag. */
	(void)pthread_cond_signal(&ctx->sq_full);
	(void)pthread_mutex_unlock(&ctx->thread_mutex);
#undef QUEUE_SIZE
}
static void * static void *
worker_thread_run(void *thread_func_param) worker_thread_run(void *thread_func_param)
{ {
@@ -12687,35 +12796,6 @@ worker_thread(void *thread_func_param)
#endif /* _WIN32 */ #endif /* _WIN32 */
/* Master thread adds accepted socket to a queue */
static void
produce_socket(struct mg_context *ctx, const struct socket *sp)
{
/* Number of slots in the fixed-size accept queue. */
#define QUEUE_SIZE(ctx) ((int)(ARRAY_SIZE(ctx->queue)))
	if (!ctx) {
		return;
	}
	(void)pthread_mutex_lock(&ctx->thread_mutex);

	/* If the queue is full, wait */
	/* NOTE(review): stop_flag breaks the wait on shutdown, in which
	 * case the socket may be silently dropped below. */
	while (ctx->stop_flag == 0
	       && ctx->sq_head - ctx->sq_tail >= QUEUE_SIZE(ctx)) {
		(void)pthread_cond_wait(&ctx->sq_empty, &ctx->thread_mutex);
	}

	if (ctx->sq_head - ctx->sq_tail < QUEUE_SIZE(ctx)) {
		/* Copy socket to the queue and increment head */
		/* NOTE(review): *sp is dereferenced here without a NULL
		 * check, yet the DEBUG_TRACE below guards sp — the NULL
		 * policy is inconsistent. */
		ctx->queue[ctx->sq_head % QUEUE_SIZE(ctx)] = *sp;
		ctx->sq_head++;
		DEBUG_TRACE("queued socket %d", sp ? sp->sock : -1);
	}

	/* Wake one worker waiting in consume_socket(). */
	(void)pthread_cond_signal(&ctx->sq_full);
	(void)pthread_mutex_unlock(&ctx->thread_mutex);
#undef QUEUE_SIZE
}
static void static void
accept_new_connection(const struct socket *listener, struct mg_context *ctx) accept_new_connection(const struct socket *listener, struct mg_context *ctx)
{ {
@@ -12767,15 +12847,12 @@ accept_new_connection(const struct socket *listener, struct mg_context *ctx)
strerror(ERRNO)); strerror(ERRNO));
} }
/* Disable TCP Nagle's algorithm. Normally TCP packets are coalesced
/* Disable TCP Nagle's algorithm. Normally TCP packets are
* coalesced
* to effectively fill up the underlying IP packet payload and * to effectively fill up the underlying IP packet payload and
* reduce * reduce the overhead of sending lots of small buffers. However
* the overhead of sending lots of small buffers. However this hurts * this hurts the server's throughput (ie. operations per second)
* the server's throughput (ie. operations per second) when HTTP 1.1 * when HTTP 1.1 persistent connections are used and the responses
* persistent connections are used and the responses are relatively * are relatively small (eg. less than 1400 bytes).
* small (eg. less than 1400 bytes).
*/ */
if ((ctx != NULL) && (ctx->config[CONFIG_TCP_NODELAY] != NULL) if ((ctx != NULL) && (ctx->config[CONFIG_TCP_NODELAY] != NULL)
&& (!strcmp(ctx->config[CONFIG_TCP_NODELAY], "1"))) { && (!strcmp(ctx->config[CONFIG_TCP_NODELAY], "1"))) {
@@ -13021,8 +13098,13 @@ mg_stop(struct mg_context *ctx)
} }
ctx->masterthreadid = 0; ctx->masterthreadid = 0;
/* Set stop flag, so all threads know they have to exit. */
ctx->stop_flag = 1; ctx->stop_flag = 1;
/* TODO: close all socket handles (will avoid SOCKET_TIMEOUT_QUANTUM) */
/* Wait until mg_fini() stops */ /* Wait until mg_fini() stops */
while (ctx->stop_flag != 2) { while (ctx->stop_flag != 2) {
(void)mg_sleep(10); (void)mg_sleep(10);

View File

@@ -1449,7 +1449,6 @@ lua_error_handler(lua_State *L)
static void * static void *
lua_allocator(void *ud, void *ptr, size_t osize, size_t nsize) lua_allocator(void *ud, void *ptr, size_t osize, size_t nsize)
{ {
(void)ud; (void)ud;
(void)osize; /* not used */ (void)osize; /* not used */
@@ -1610,7 +1609,6 @@ handle_lsp_request(struct mg_connection *conn,
filep->size, filep->size,
L); L);
cleanup_handle_lsp_request: cleanup_handle_lsp_request:
if (L != NULL && ls == NULL) if (L != NULL && ls == NULL)