Async write completion for Event MPM
(backported from async-dev branch to 2.3 trunk)

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@327945 13f79535-47bb-0310-9956-ffa450edef68
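This backport moves the Event MPM toward a per-connection state machine: a worker thread runs the handler, leaves the connection in CONN_STATE_WRITE_COMPLETION, and only then decides whether to keep writing, wait for the next request, or linger-close. The sketch below is only a summary of the transitions implemented by the hunks further down; the enum and helper are illustrative plain C, not the real conn_state_e from the httpd headers.

    /* Illustrative summary of the connection states used by this change.
     * The real definitions live in httpd's headers; this only mirrors the
     * transitions visible in the diff below. */
    typedef enum {
        CONN_STATE_READ_REQUEST_LINE,           /* waiting for / reading a request  */
        CONN_STATE_HANDLER,                     /* a worker is running the handler  */
        CONN_STATE_WRITE_COMPLETION,            /* response not fully written yet   */
        CONN_STATE_CHECK_REQUEST_LINE_READABLE, /* keepalive: poll for next request */
        CONN_STATE_LINGER                       /* lingering close                  */
    } conn_state_example_e;

    /* After the handler returns, the connection should be in
     * WRITE_COMPLETION; anything else is treated as an error and lingered.
     * This mirrors the decision logic added to process_socket() below. */
    conn_state_example_e
    after_write_completion(int write_failed, int data_in_output_filters,
                           int keepalive, int data_in_input_filters)
    {
        if (write_failed)
            return CONN_STATE_LINGER;
        if (data_in_output_filters)
            return CONN_STATE_WRITE_COMPLETION;    /* poll for writability */
        if (!keepalive)
            return CONN_STATE_LINGER;
        if (data_in_input_filters)
            return CONN_STATE_READ_REQUEST_LINE;   /* pipelined request buffered */
        return CONN_STATE_CHECK_REQUEST_LINE_READABLE;
    }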
CHANGES | 2 ++
@@ -2,6 +2,8 @@
 Changes with Apache 2.3.0
   [Remove entries to the current 2.0 and 2.2 section below, when backported]

+  *) Asynchronous write completion for the Event MPM.  [Brian Pane]
+
   *) Added an End-Of-Request bucket type. The logging of a request and
      the freeing of its pool are now done when the EOR bucket is destroyed.
      This has the effect of delaying the logging until right after the last
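The End-Of-Request entry above describes a metadata bucket whose destroy callback runs the deferred work once everything ahead of it has left the output filter chain. A minimal sketch of that pattern using the public APR bucket API follows; the type and helper names, and the empty cleanup body, are illustrative rather than the actual httpd implementation.

    #include <apr_buckets.h>

    /* Illustrative EOR-style metadata bucket: carries a baton and runs
     * deferred cleanup when the bucket is destroyed, i.e. once everything
     * ahead of it has been sent. */
    static void example_eor_destroy(void *data)
    {
        /* In httpd this is where the request would be logged and its pool
         * destroyed; this sketch only shows the hook point. */
        (void)data;
    }

    static apr_status_t example_eor_read(apr_bucket *b, const char **str,
                                         apr_size_t *len, apr_read_type_e block)
    {
        (void)b; (void)block;
        *str = NULL;        /* metadata buckets have no data to read */
        *len = 0;
        return APR_SUCCESS;
    }

    static const apr_bucket_type_t example_bucket_type_eor = {
        "EXAMPLE-EOR", 5, APR_BUCKET_METADATA,
        example_eor_destroy,
        example_eor_read,
        apr_bucket_setaside_noop,
        apr_bucket_split_notimpl,
        apr_bucket_simple_copy
    };

    /* Create the bucket; appending it after the response body means its
     * destruction marks "everything before this point has been handled". */
    static apr_bucket *example_eor_create(apr_bucket_alloc_t *list, void *baton)
    {
        apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);

        APR_BUCKET_INIT(b);
        b->free = apr_bucket_free;
        b->list = list;
        b->type = &example_bucket_type_eor;
        b->length = 0;
        b->start = 0;
        b->data = baton;
        return b;
    }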
@@ -122,26 +122,18 @@ static int ap_process_http_async_connection(conn_rec *c)
             /* process the request if it was read without error */

             ap_update_child_status(c->sbh, SERVER_BUSY_WRITE, r);
-            if (r->status == HTTP_OK)
-                ap_process_request(r);
+            if (r->status == HTTP_OK) {
+                cs->state = CONN_STATE_HANDLER;
+                ap_process_async_request(r);
+            }

             if (ap_extended_status)
                 ap_increment_counts(c->sbh, r);

-            if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted
-                || ap_graceful_stop_signalled()) {
+            if (cs->state != CONN_STATE_WRITE_COMPLETION) {
+                /* Something went wrong; close the connection */
                 cs->state = CONN_STATE_LINGER;
             }
-            else if (!c->data_in_input_filters) {
-                cs->state = CONN_STATE_CHECK_REQUEST_LINE_READABLE;
-            }
-            else {
-                /* else we are pipelining.  Stay in READ_REQUEST_LINE state
-                 * and stay in the loop
-                 */
-                cs->state = CONN_STATE_READ_REQUEST_LINE;
-            }
         }
         else {   /* ap_read_request failed - client may have closed */
             cs->state = CONN_STATE_LINGER;
@@ -191,47 +191,23 @@ AP_DECLARE(void) ap_die(int type, request_rec *r)
     ap_send_error_response(r_1st_err, recursive_error);
 }

-static void check_pipeline_flush(conn_rec *c)
+static void check_pipeline(conn_rec *c)
 {
-    apr_bucket *e;
-    apr_bucket_brigade *bb;
-
-    /* ### if would be nice if we could PEEK without a brigade. that would
-       ### allow us to defer creation of the brigade to when we actually
-       ### need to send a FLUSH. */
-    bb = apr_brigade_create(c->pool, c->bucket_alloc);
-
-    /* Flush the filter contents if:
-     *
-     *   1) the connection will be closed
-     *   2) there isn't a request ready to be read
-     */
     /* ### is zero correct? that means "read one line" */
     if (c->keepalive != AP_CONN_CLOSE) {
+        apr_bucket_brigade *bb = apr_brigade_create(c->pool, c->bucket_alloc);
         if (ap_get_brigade(c->input_filters, bb, AP_MODE_EATCRLF,
                            APR_NONBLOCK_READ, 0) != APR_SUCCESS) {
             c->data_in_input_filters = 0;  /* we got APR_EOF or an error */
         }
         else {
             c->data_in_input_filters = 1;
-            return;    /* don't flush */
         }
     }
-
-    e = apr_bucket_flush_create(c->bucket_alloc);
-
-    /* We just send directly to the connection based filters.  At
-     * this point, we know that we have seen all of the data
-     * (request finalization sent an EOS bucket, which empties all
-     * of the request filters).  We just want to flush the buckets
-     * if something hasn't been sent to the network yet.
-     */
-    APR_BRIGADE_INSERT_HEAD(bb, e);
-    ap_pass_brigade(c->output_filters, bb);
 }


-void ap_process_request(request_rec *r)
+void ap_process_async_request(request_rec *r)
 {
     int access_status;
     apr_bucket_brigade *bb;
@@ -289,11 +265,30 @@ void ap_process_request(request_rec *r)
      */

     c->cs->state = CONN_STATE_WRITE_COMPLETION;
-    check_pipeline_flush(c);
+    check_pipeline(c);
     if (ap_extended_status)
         ap_time_process_request(c->sbh, STOP_PREQUEST);
 }

+void ap_process_request(request_rec *r)
+{
+    apr_bucket_brigade *bb;
+    apr_bucket *b;
+    conn_rec *c = r->connection;
+
+    ap_process_async_request(r);
+
+    if (!c->data_in_input_filters) {
+        bb = apr_brigade_create(c->pool, c->bucket_alloc);
+        b = apr_bucket_flush_create(c->bucket_alloc);
+        APR_BRIGADE_INSERT_HEAD(bb, b);
+        ap_pass_brigade(c->output_filters, bb);
+    }
+    if (ap_extended_status) {
+        ap_time_process_request(c->sbh, STOP_PREQUEST);
+    }
+}
+
 static apr_table_t *rename_original_env(apr_pool_t *p, apr_table_t *t)
 {
     const apr_array_header_t *env_arr = apr_table_elts(t);
@@ -164,7 +164,7 @@ static int sick_child_detected;

 apr_thread_mutex_t *timeout_mutex;
 APR_RING_HEAD(timeout_head_t, conn_state_t);
-static struct timeout_head_t timeout_head;
+static struct timeout_head_t timeout_head, keepalive_timeout_head;

 static apr_pollset_t *event_pollset;

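The hunk above adds a second timeout ring, keepalive_timeout_head, alongside the existing timeout_head. The apr_ring.h macros used throughout these hunks operate on intrusive doubly linked lists; below is a small self-contained example of the same idiom, with the element type invented purely for illustration.

    #include <stdio.h>
    #include <apr_ring.h>

    /* Illustrative element type; the MPM uses conn_state_t with a
     * "timeout_list" ring entry in exactly the same way. */
    typedef struct example_elem_t example_elem_t;
    struct example_elem_t {
        APR_RING_ENTRY(example_elem_t) link;
        int id;
    };

    APR_RING_HEAD(example_head_t, example_elem_t);

    int main(void)
    {
        struct example_head_t head;
        example_elem_t a = { .id = 1 }, b = { .id = 2 };
        example_elem_t *e;

        APR_RING_INIT(&head, example_elem_t, link);
        APR_RING_ELEM_INIT(&a, link);
        APR_RING_ELEM_INIT(&b, link);

        APR_RING_INSERT_TAIL(&head, &a, example_elem_t, link);
        APR_RING_INSERT_TAIL(&head, &b, example_elem_t, link);

        /* Walk from the front, removing entries -- the same idiom the
         * listener thread uses when expiring its timeout queues. */
        e = APR_RING_FIRST(&head);
        while (!APR_RING_EMPTY(&head, example_elem_t, link)) {
            APR_RING_REMOVE(e, link);
            printf("removed %d\n", e->id);
            e = APR_RING_FIRST(&head);
        }
        return 0;
    }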
@@ -592,6 +592,7 @@ static int process_socket(apr_pool_t * p, apr_socket_t * sock,
         pt->status = 1;
         pt->baton = cs;
         cs->pfd.client_data = pt;
+        APR_RING_ELEM_INIT(cs, timeout_list);

         ap_update_vhost_given_ip(c);

@@ -621,8 +622,10 @@ static int process_socket(apr_pool_t * p, apr_socket_t * sock,
     else {
         c = cs->c;
         c->sbh = sbh;
+        pt = cs->pfd.client_data;
     }

+read_request:
     if (cs->state == CONN_STATE_READ_REQUEST_LINE) {
         if (!c->aborted) {
             ap_run_process_connection(c);
@@ -636,6 +639,49 @@ static int process_socket(apr_pool_t * p, apr_socket_t * sock,
             cs->state = CONN_STATE_LINGER;
         }
     }

+    if (cs->state == CONN_STATE_WRITE_COMPLETION) {
+        /* For now, do blocking writes in this thread to transfer the
+         * rest of the response.  TODO: Hand off this connection to a
+         * pollset for asynchronous write completion.
+         */
+        ap_filter_t *output_filter = c->output_filters;
+        apr_status_t rv;
+        while (output_filter->next != NULL) {
+            output_filter = output_filter->next;
+        }
+        rv = output_filter->frec->filter_func.out_func(output_filter, NULL);
+        if (rv != APR_SUCCESS) {
+            ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf,
+                         "network write failure in core output filter");
+            cs->state = CONN_STATE_LINGER;
+        }
+        else if (c->data_in_output_filters) {
+            /* Still in WRITE_COMPLETION_STATE:
+             * Set a write timeout for this connection, and let the
+             * event thread poll for writeability.
+             */
+            cs->expiration_time = ap_server_conf->timeout + time_now;
+            apr_thread_mutex_lock(timeout_mutex);
+            APR_RING_INSERT_TAIL(&timeout_head, cs, conn_state_t, timeout_list);
+            apr_thread_mutex_unlock(timeout_mutex);
+            pt->status = 0;
+            cs->pfd.reqevents = APR_POLLOUT | APR_POLLHUP | APR_POLLERR;
+            rc = apr_pollset_add(event_pollset, &cs->pfd);
+            return 1;
+        }
+        else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted ||
+                 ap_graceful_stop_signalled()) {
+            c->cs->state = CONN_STATE_LINGER;
+        }
+        else if (c->data_in_input_filters) {
+            cs->state = CONN_STATE_READ_REQUEST_LINE;
+            goto read_request;
+        }
+        else {
+            cs->state = CONN_STATE_CHECK_REQUEST_LINE_READABLE;
+        }
+    }
+
     if (cs->state == CONN_STATE_LINGER) {
         ap_lingering_close(c);
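The TODO in the block above points at the intended follow-up: rather than blocking in the worker, register the socket for write readiness and let the listener's pollset resume the connection once the kernel can accept more data. A stripped-down sketch of that apr_pollset pattern follows; the socket setup and the actual write are omitted, and the function and baton names are invented for illustration.

    #include <apr_poll.h>
    #include <apr_time.h>

    /* Register a socket for write readiness and wait for it.  In the MPM
     * this is split across threads (a worker adds, the listener polls);
     * here it is collapsed into one call for clarity. */
    static apr_status_t example_wait_writable(apr_pool_t *p, apr_socket_t *sock,
                                              void *baton)
    {
        apr_pollset_t *pollset;
        apr_pollfd_t pfd = { 0 };
        const apr_pollfd_t *out;
        apr_int32_t num;
        apr_status_t rv;

        rv = apr_pollset_create(&pollset, 1, p, 0);
        if (rv != APR_SUCCESS) {
            return rv;
        }

        pfd.p = p;
        pfd.desc_type = APR_POLL_SOCKET;
        pfd.desc.s = sock;
        pfd.reqevents = APR_POLLOUT | APR_POLLHUP | APR_POLLERR;
        pfd.client_data = baton;

        rv = apr_pollset_add(pollset, &pfd);
        if (rv != APR_SUCCESS) {
            return rv;
        }

        /* Block for up to 30 seconds waiting for POLLOUT (or an error);
         * out[0].client_data then identifies the connection to resume. */
        rv = apr_pollset_poll(pollset, apr_time_from_sec(30), &num, &out);
        apr_pollset_remove(pollset, &pfd);
        return rv;
    }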
@@ -658,11 +704,12 @@ static int process_socket(apr_pool_t * p, apr_socket_t * sock,
          */
         cs->expiration_time = ap_server_conf->keep_alive_timeout + time_now;
         apr_thread_mutex_lock(timeout_mutex);
-        APR_RING_INSERT_TAIL(&timeout_head, cs, conn_state_t, timeout_list);
+        APR_RING_INSERT_TAIL(&keepalive_timeout_head, cs, conn_state_t, timeout_list);
         apr_thread_mutex_unlock(timeout_mutex);

         pt->status = 0;
-        /* Add work to pollset. These are always read events */
+        /* Add work to pollset. */
+        cs->pfd.reqevents = APR_POLLIN;
         rc = apr_pollset_add(event_pollset, &cs->pfd);

         if (rc != APR_SUCCESS) {
@@ -839,11 +886,12 @@ static void *listener_thread(apr_thread_t * thd, void *dummy)
     }

     APR_RING_INIT(&timeout_head, conn_state_t, timeout_list);
+    APR_RING_INIT(&keepalive_timeout_head, conn_state_t, timeout_list);

     /* Create the main pollset */
     rc = apr_pollset_create(&event_pollset,
                             ap_threads_per_child,
-                            tpool, APR_POLLSET_THREADSAFE);
+                            tpool, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
     if (rc != APR_SUCCESS) {
         ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
                      "apr_pollset_create with Thread Safety failed. "
@@ -853,19 +901,19 @@ static void *listener_thread(apr_thread_t * thd, void *dummy)
     }

     for (lr = ap_listeners; lr != NULL; lr = lr->next) {
-        apr_pollfd_t pfd = { 0 };
+        apr_pollfd_t *pfd = apr_palloc(tpool, sizeof(*pfd));
         pt = apr_pcalloc(tpool, sizeof(*pt));
-        pfd.desc_type = APR_POLL_SOCKET;
-        pfd.desc.s = lr->sd;
-        pfd.reqevents = APR_POLLIN;
+        pfd->desc_type = APR_POLL_SOCKET;
+        pfd->desc.s = lr->sd;
+        pfd->reqevents = APR_POLLIN;

         pt->type = PT_ACCEPT;
         pt->baton = lr;

-        pfd.client_data = pt;
+        pfd->client_data = pt;

-        apr_socket_opt_set(pfd.desc.s, APR_SO_NONBLOCK, 1);
-        apr_pollset_add(event_pollset, &pfd);
+        apr_socket_opt_set(pfd->desc.s, APR_SO_NONBLOCK, 1);
+        apr_pollset_add(event_pollset, pfd);
     }

     /* Unblock the signal used to wake this thread up, and set a handler for
@@ -907,6 +955,8 @@ static void *listener_thread(apr_thread_t * thd, void *dummy)
                 case CONN_STATE_CHECK_REQUEST_LINE_READABLE:
                     cs->state = CONN_STATE_READ_REQUEST_LINE;
                     break;
+                case CONN_STATE_WRITE_COMPLETION:
+                    break;
                 default:
                     ap_log_error(APLOG_MARK, APLOG_ERR, rc,
                                  ap_server_conf,
@@ -918,6 +968,7 @@ static void *listener_thread(apr_thread_t * thd, void *dummy)
                 apr_thread_mutex_lock(timeout_mutex);
                 APR_RING_REMOVE(cs, timeout_list);
                 apr_thread_mutex_unlock(timeout_mutex);
+                APR_RING_ELEM_INIT(cs, timeout_list);

                 rc = push2worker(out_pfd, event_pollset);
                 if (rc != APR_SUCCESS) {
@@ -1002,9 +1053,10 @@ static void *listener_thread(apr_thread_t * thd, void *dummy)
             /* handle timed out sockets */
             apr_thread_mutex_lock(timeout_mutex);

-            cs = APR_RING_FIRST(&timeout_head);
+            /* Step 1: keepalive timeouts */
+            cs = APR_RING_FIRST(&keepalive_timeout_head);
             timeout_time = time_now + TIMEOUT_FUDGE_FACTOR;
-            while (!APR_RING_EMPTY(&timeout_head, conn_state_t, timeout_list)
+            while (!APR_RING_EMPTY(&keepalive_timeout_head, conn_state_t, timeout_list)
                    && cs->expiration_time < timeout_time
                    && get_worker(&have_idle_worker)) {

@@ -1023,8 +1075,25 @@ static void *listener_thread(apr_thread_t * thd, void *dummy)
                  */
                 }
                 have_idle_worker = 0;
+                cs = APR_RING_FIRST(&keepalive_timeout_head);
+            }
+
+            /* Step 2: write completion timeouts */
+            cs = APR_RING_FIRST(&timeout_head);
+            while (!APR_RING_EMPTY(&timeout_head, conn_state_t, timeout_list)
+                   && cs->expiration_time < timeout_time
+                   && get_worker(&have_idle_worker)) {
+
+                cs->state = CONN_STATE_LINGER;
+                APR_RING_REMOVE(cs, timeout_list);
+                rc = push2worker(&cs->pfd, event_pollset);
+                if (rc != APR_SUCCESS) {
+                    return NULL;
+                }
+                have_idle_worker = 0;
                 cs = APR_RING_FIRST(&timeout_head);
             }

             apr_thread_mutex_unlock(timeout_mutex);

         } /* listener main loop */
@@ -2132,7 +2201,7 @@ static int worker_pre_config(apr_pool_t * pconf, apr_pool_t * plog,
     if (restart_num++ == 1) {
         is_graceful = 0;
         rv = apr_pollset_create(&event_pollset, 1, plog,
-                                APR_POLLSET_THREADSAFE);
+                                APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
         if (rv != APR_SUCCESS) {
             ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
                          "Couldn't create a Thread Safe Pollset. "