1
0
mirror of https://github.com/apache/httpd.git synced 2026-01-26 19:01:35 +03:00

Replaced the mutex around the idle worker stack with

atomic compare-and-swap loops


git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@94705 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Brian Pane
2002-04-19 06:33:08 +00:00
parent 1826f624eb
commit 49098fff91

View File

@@ -72,6 +72,7 @@
#include "apr_thread_cond.h"
#include "apr_thread_mutex.h"
#include "apr_proc_mutex.h"
#include "apr_atomic.h"
#define APR_WANT_STRFUNC
#include "apr_want.h"
@@ -172,6 +173,8 @@ static int requests_this_child;
static int num_listensocks = 0;
static int resource_shortage = 0;
typedef struct worker_wakeup_info worker_wakeup_info;
/* The structure used to pass unique initialization info to each thread */
typedef struct {
int pid;
@@ -243,102 +246,147 @@ static apr_proc_mutex_t *accept_mutex;
/* Structure used to wake up an idle worker thread
*/
typedef struct {
struct worker_wakeup_info {
apr_thread_cond_t *cond;
apr_thread_mutex_t *mutex;
} worker_wakeup_info;
apr_uint32_t next; /* index into worker_wakeups array,
* used to build a linked list
*/
};
static worker_wakeup_info *worker_wakeup_create(apr_pool_t *pool)
{
apr_status_t rv;
worker_wakeup_info *wakeup;
wakeup = (worker_wakeup_info *)apr_palloc(pool, sizeof(*wakeup));
if ((rv = apr_thread_cond_create(&wakeup->cond, pool)) != APR_SUCCESS) {
return NULL;
}
if ((rv = apr_thread_mutex_create(&wakeup->mutex, APR_THREAD_MUTEX_DEFAULT,
pool)) != APR_SUCCESS) {
return NULL;
}
/* The wakeup's mutex will be unlocked automatically when
* the worker blocks on the condition variable
*/
apr_thread_mutex_lock(wakeup->mutex);
return wakeup;
}
/* Structure used to hold a stack of idle worker threads
*/
typedef struct {
apr_thread_mutex_t *mutex;
int no_listener;
worker_wakeup_info **stack;
apr_size_t nelts;
apr_size_t nalloc;
/* 'state' consists of several fields concatenated into a
* single 32-bit int for use with the apr_atomic_cas() API:
* state & STACK_FIRST is the thread ID of the first thread
* in a linked list of idle threads
* state & STACK_TERMINATED indicates whether the proc is shutting down
* state & STACK_NO_LISTENER indicates whether the process has
* no current listener thread
*/
apr_uint32_t state;
} worker_stack;
#define STACK_FIRST 0xffff
#define STACK_LIST_END 0xffff
#define STACK_TERMINATED 0x10000
#define STACK_NO_LISTENER 0x20000
static worker_wakeup_info **worker_wakeups = NULL;
static worker_stack* worker_stack_create(apr_pool_t *pool, apr_size_t max)
{
apr_status_t rv;
worker_stack *stack = (worker_stack *)apr_palloc(pool, sizeof(*stack));
if ((rv = apr_thread_mutex_create(&stack->mutex, APR_THREAD_MUTEX_DEFAULT,
pool)) != APR_SUCCESS) {
return NULL;
}
stack->no_listener = 1;
stack->nelts = 0;
stack->nalloc = max;
stack->stack =
(worker_wakeup_info **)apr_palloc(pool, stack->nalloc *
sizeof(worker_wakeup_info *));
stack->state = STACK_NO_LISTENER | STACK_LIST_END;
return stack;
}
static apr_status_t worker_stack_wait(worker_stack *stack,
worker_wakeup_info *wakeup)
apr_uint32_t worker_id)
{
apr_status_t rv;
if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) {
return rv;
}
if (stack->no_listener) {
/* this thread should become the new listener immediately */
stack->no_listener = 0;
if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
return rv;
worker_wakeup_info *wakeup = worker_wakeups[worker_id];
while (1) {
apr_uint32_t state = stack->state;
if (state & (STACK_TERMINATED | STACK_NO_LISTENER)) {
if (state & STACK_TERMINATED) {
return APR_EINVAL;
}
if (apr_atomic_cas(&(stack->state), STACK_LIST_END, state) !=
state) {
continue;
}
else {
return APR_SUCCESS;
}
}
return APR_SUCCESS;
}
else {
/* push this thread onto the stack of idle workers, and block
* on the condition variable until awoken
*/
if (stack->nelts == stack->nalloc) {
return APR_ENOSPC;
wakeup->next = state;
if (apr_atomic_cas(&(stack->state), worker_id, state) != state) {
continue;
}
stack->stack[stack->nelts++] = wakeup;
if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
return rv;
else {
return apr_thread_cond_wait(wakeup->cond, wakeup->mutex);
}
if ((rv = apr_thread_cond_wait(wakeup->cond, wakeup->mutex)) !=
APR_SUCCESS) {
return rv;
}
return APR_SUCCESS;
}
}
}
static apr_status_t worker_stack_awaken_next(worker_stack *stack)
{
while (1) {
apr_uint32_t state = stack->state;
apr_uint32_t first = state & STACK_FIRST;
if (first == STACK_LIST_END) {
if (apr_atomic_cas(&(stack->state), state | STACK_NO_LISTENER,
state) != state) {
continue;
}
else {
return APR_SUCCESS;
}
}
else {
worker_wakeup_info *wakeup = worker_wakeups[first];
apr_uint32_t new_state = state & ~STACK_FIRST;
new_state |= wakeup->next;
if (apr_atomic_cas(&(stack->state), new_state, state) != state) {
continue;
}
else {
/* Acquire and release the idle worker's mutex to ensure
* that it's actually waiting on its condition variable
*/
apr_status_t rv;
if ((rv = apr_thread_mutex_lock(wakeup->mutex)) !=
APR_SUCCESS) {
return rv;
}
if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) !=
APR_SUCCESS) {
return rv;
}
return apr_thread_cond_signal(wakeup->cond);
}
}
}
}
static apr_status_t worker_stack_term(worker_stack *stack)
{
int i;
apr_status_t rv;
if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) {
return rv;
}
if (stack->nelts) {
worker_wakeup_info *wakeup = stack->stack[--stack->nelts];
if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
return rv;
}
/* Acquire and release the idle worker's mutex to ensure
* that it's actually waiting on its condition variable
*/
if ((rv = apr_thread_mutex_lock(wakeup->mutex)) != APR_SUCCESS) {
return rv;
}
if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) != APR_SUCCESS) {
return rv;
}
apr_thread_mutex_unlock(wakeup->mutex);
if ((rv = apr_thread_cond_signal(wakeup->cond)) != APR_SUCCESS) {
apr_thread_mutex_unlock(stack->mutex);
return rv;
while (1) {
apr_uint32_t state = stack->state;
if (apr_atomic_cas(&(stack->state), state | STACK_TERMINATED,
state) == state) {
break;
}
}
else {
stack->no_listener = 1;
if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
for (i = 0; i < ap_threads_per_child; i++) {
if ((rv = worker_stack_awaken_next(stack)) != APR_SUCCESS) {
return rv;
}
}
@@ -355,16 +403,12 @@ static int terminate_mode = ST_INIT;
static void signal_threads(int mode)
{
int i;
if (terminate_mode == mode) {
return;
}
terminate_mode = mode;
workers_may_exit = 1;
for (i = 0; i < ap_threads_per_child; i++) {
(void)worker_stack_awaken_next(idle_worker_stack);
}
worker_stack_term(idle_worker_stack);
}
AP_DECLARE(apr_status_t) ap_mpm_query(int query_code, int *result)
@@ -726,6 +770,7 @@ static void *worker_thread(apr_thread_t *thd, void * dummy)
proc_info * ti = dummy;
int process_slot = ti->pid;
int thread_slot = ti->tid;
apr_uint32_t my_worker_num = (apr_uint32_t)(ti->tid);
apr_pool_t *tpool = apr_thread_pool_get(thd);
void *csd = NULL;
apr_allocator_t *allocator;
@@ -735,7 +780,6 @@ static void *worker_thread(apr_thread_t *thd, void * dummy)
apr_pollfd_t *pollset;
apr_status_t rv;
ap_listen_rec *lr, *last_lr = ap_listeners;
worker_wakeup_info *wakeup;
int is_listener;
ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_STARTING, NULL);
@@ -747,24 +791,6 @@ static void *worker_thread(apr_thread_t *thd, void * dummy)
apr_allocator_set_owner(allocator, ptrans);
bucket_alloc = apr_bucket_alloc_create(tpool);
wakeup = (worker_wakeup_info *)apr_palloc(tpool, sizeof(*wakeup));
if ((rv = apr_thread_cond_create(&wakeup->cond, tpool)) != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
"apr_thread_cond_create failed. Attempting to shutdown "
"process gracefully.");
signal_threads(ST_GRACEFUL);
goto done;
}
if ((rv = apr_thread_mutex_create(&wakeup->mutex, APR_THREAD_MUTEX_DEFAULT,
tpool)) != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
"apr_thread_mutex_create failed. Attempting to shutdown "
"process gracefully.");
signal_threads(ST_GRACEFUL);
goto done;
}
apr_thread_mutex_lock(wakeup->mutex);
apr_poll_setup(&pollset, num_listensocks, tpool);
for(lr = ap_listeners ; lr != NULL ; lr = lr->next)
apr_poll_socket_add(pollset, lr->sd, APR_POLLIN);
@@ -778,10 +804,12 @@ static void *worker_thread(apr_thread_t *thd, void * dummy)
SERVER_READY, NULL);
if (!is_listener) {
/* Wait until it's our turn to become the listener */
if ((rv = worker_stack_wait(idle_worker_stack, wakeup)) !=
if ((rv = worker_stack_wait(idle_worker_stack, my_worker_num)) !=
APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
"worker_stack_wait failed. Shutting down");
if (rv != APR_EINVAL) {
ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
"worker_stack_wait failed. Shutting down");
}
break;
}
if (workers_may_exit) {
@@ -902,7 +930,8 @@ static void *worker_thread(apr_thread_t *thd, void * dummy)
}
}
done:
workers_may_exit = 1;
worker_stack_term(idle_worker_stack);
dying = 1;
ap_scoreboard_image->parent[process_slot].quiescing = 1;
@@ -951,15 +980,27 @@ static void * APR_THREAD_FUNC start_threads(apr_thread_t *thd, void *dummy)
clean_child_exit(APEXIT_CHILDFATAL);
}
worker_wakeups = (worker_wakeup_info **)
apr_palloc(pchild, sizeof(worker_wakeup_info *) *
ap_threads_per_child);
loops = prev_threads_created = 0;
while (1) {
for (i = 0; i < ap_threads_per_child; i++) {
int status = ap_scoreboard_image->servers[child_num_arg][i].status;
worker_wakeup_info *wakeup;
if (status != SERVER_GRACEFUL && status != SERVER_DEAD) {
continue;
}
wakeup = worker_wakeup_create(pchild);
if (wakeup == NULL) {
ap_log_error(APLOG_MARK, APLOG_ALERT|APLOG_NOERRNO, 0,
ap_server_conf, "worker_wakeup_create failed");
clean_child_exit(APEXIT_CHILDFATAL);
}
worker_wakeups[threads_created] = wakeup;
my_info = (proc_info *)malloc(sizeof(proc_info));
if (my_info == NULL) {
ap_log_error(APLOG_MARK, APLOG_ALERT, errno, ap_server_conf,