diff --git a/CHANGES b/CHANGES index 5ec96fb5f1..e05d90d890 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,12 @@ -*- coding: utf-8 -*- Changes with Apache 2.5.0 + *) mod_http2: elimination of fixed master connectin buffer for TLS + connections. New scratch bucket handling optimized for TLS write sizes. + File bucket data read directly into scratch buffers, avoiding one + copy. Non-TLS connections continue to pass buckets unchanged to the core + filters to allow sendfile() usage. + *) mod_http2/mod_proxy_http2: h2_request.c is no longer shared between these modules. This simplifies building on platforms such as Windows, as module reference used in logging is now clear. diff --git a/modules/http2/h2_conn_io.c b/modules/http2/h2_conn_io.c index d21ae8b959..c1120740bf 100644 --- a/modules/http2/h2_conn_io.c +++ b/modules/http2/h2_conn_io.c @@ -127,23 +127,13 @@ static void h2_conn_io_bb_log(conn_rec *c, int stream_id, int level, } apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, - const h2_config *cfg, - apr_pool_t *pool) + const h2_config *cfg) { io->c = c; io->output = apr_brigade_create(c->pool, c->bucket_alloc); - io->buflen = 0; io->is_tls = h2_h2_is_tls(c); io->buffer_output = io->is_tls; - if (io->buffer_output) { - io->bufsize = WRITE_BUFFER_SIZE; - io->buffer = apr_pcalloc(pool, io->bufsize); - } - else { - io->bufsize = 0; - } - if (io->is_tls) { /* This is what we start with, * see https://issues.apache.org/jira/browse/TS-2503 @@ -151,12 +141,13 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, io->warmup_size = h2_config_geti64(cfg, H2_CONF_TLS_WARMUP_SIZE); io->cooldown_usecs = (h2_config_geti(cfg, H2_CONF_TLS_COOLDOWN_SECS) * APR_USEC_PER_SEC); - io->write_size = WRITE_SIZE_INITIAL; + io->write_size = (io->cooldown_usecs > 0? + WRITE_SIZE_INITIAL : WRITE_SIZE_MAX); } else { io->warmup_size = 0; io->cooldown_usecs = 0; - io->write_size = io->bufsize; + io->write_size = 0; } if (APLOGctrace1(c)) { @@ -170,9 +161,95 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, return APR_SUCCESS; } +#define LOG_SCRATCH 0 + +static void append_scratch(h2_conn_io *io) +{ + if (io->scratch && io->slen > 0) { + apr_bucket *b = apr_bucket_heap_create(io->scratch, io->slen, + apr_bucket_free, + io->c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(io->output, b); +#if LOG_SCRATCH + ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c, + "h2_conn_io(%ld): append_scratch(%ld)", + io->c->id, (long)io->slen); +#endif + io->scratch = NULL; + io->slen = io->ssize = 0; + } +} + +static apr_size_t assure_scratch_space(h2_conn_io *io) { + apr_size_t remain = io->ssize - io->slen; + if (io->scratch && remain == 0) { + append_scratch(io); + } + if (!io->scratch) { + /* we control the size and it is larger than what buckets usually + * allocate. */ + io->scratch = apr_bucket_alloc(io->write_size, io->c->bucket_alloc); + io->ssize = io->write_size; + io->slen = 0; + remain = io->ssize; + } + return remain; +} + +static apr_status_t read_to_scratch(h2_conn_io *io, apr_bucket *b) +{ + apr_status_t status; + const char *data; + apr_size_t len; + + if (!b->length) { + return APR_SUCCESS; + } + + AP_DEBUG_ASSERT(b->length <= (io->ssize - io->slen)); + if (APR_BUCKET_IS_FILE(b)) { + apr_bucket_file *f = (apr_bucket_file *)b->data; + apr_file_t *fd = f->fd; + apr_off_t offset = b->start; + apr_size_t len = b->length; + + /* file buckets will either mmap (which we do not want) or + * read 8000 byte chunks and split themself. However, we do + * know *exactly* how many bytes we need where. + */ + status = apr_file_seek(fd, APR_SET, &offset); + if (status != APR_SUCCESS) { + return status; + } + status = apr_file_read(fd, io->scratch + io->slen, &len); +#if LOG_SCRATCH + ap_log_cerror(APLOG_MARK, APLOG_INFO, status, io->c, + "h2_conn_io(%ld): FILE_to_scratch(%ld)", + io->c->id, (long)len); +#endif + if (status != APR_SUCCESS && status != APR_EOF) { + return status; + } + io->slen += len; + } + else { + status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ); + if (status == APR_SUCCESS) { +#if LOG_SCRATCH + ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c, + "h2_conn_io(%ld): read_to_scratch(%ld)", + io->c->id, (long)b->length); +#endif + memcpy(io->scratch+io->slen, data, len); + io->slen += len; + } + } + return status; +} + int h2_conn_io_is_buffered(h2_conn_io *io) { - return io->bufsize > 0; + return io->buffer_output; } typedef struct { @@ -208,16 +285,8 @@ static apr_status_t pass_out(apr_bucket_brigade *bb, void *ctx) return status; } -/* Bring the current buffer content into the output brigade, appropriately - * chunked. - */ -static apr_status_t bucketeer_buffer(h2_conn_io *io) +static void check_write_size(h2_conn_io *io) { - const char *data = io->buffer; - apr_size_t remaining = io->buflen; - apr_bucket *b; - int bcount, i; - if (io->write_size > WRITE_SIZE_INITIAL && (io->cooldown_usecs > 0) && (apr_time_now() - io->last_write) >= io->cooldown_usecs) { @@ -236,32 +305,6 @@ static apr_status_t bucketeer_buffer(h2_conn_io *io) "h2_conn_io(%ld): threshold reached, write size now %ld", (long)io->c->id, (long)io->write_size); } - - bcount = (int)(remaining / io->write_size); - for (i = 0; i < bcount; ++i) { - b = apr_bucket_transient_create(data, io->write_size, - io->output->bucket_alloc); - APR_BRIGADE_INSERT_TAIL(io->output, b); - data += io->write_size; - remaining -= io->write_size; - } - - if (remaining > 0) { - b = apr_bucket_transient_create(data, remaining, - io->output->bucket_alloc); - APR_BRIGADE_INSERT_TAIL(io->output, b); - } - return APR_SUCCESS; -} - -apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b, int flush) -{ - APR_BRIGADE_INSERT_TAIL(io->output, b); - if (flush) { - b = apr_bucket_flush_create(io->c->bucket_alloc); - APR_BRIGADE_INSERT_TAIL(io->output, b); - } - return APR_SUCCESS; } static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc) @@ -269,17 +312,10 @@ static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc) pass_out_ctx ctx; apr_bucket *b; - if (io->buflen == 0 && APR_BRIGADE_EMPTY(io->output)) { + append_scratch(io); + if (APR_BRIGADE_EMPTY(io->output)) { return APR_SUCCESS; } - - if (io->buflen > 0) { - /* something in the buffer, put it in the output brigade */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, - "h2_conn_io: flush, flushing %ld bytes", - (long)io->buflen); - bucketeer_buffer(io); - } if (flush) { b = apr_bucket_flush_create(io->c->bucket_alloc); @@ -287,7 +323,6 @@ static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc) } ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io: flush"); - io->buflen = 0; ctx.c = io->c; ctx.io = eoc? NULL : io; @@ -307,10 +342,9 @@ apr_status_t h2_conn_io_consider_pass(h2_conn_io *io) if (!APR_BRIGADE_EMPTY(io->output)) { len = h2_brigade_mem_size(io->output); - } - len += io->buflen; - if (len >= WRITE_BUFFER_SIZE) { - return h2_conn_io_flush_int(io, 1, 0); + if (len >= WRITE_BUFFER_SIZE) { + return h2_conn_io_flush_int(io, 1, 0); + } } return APR_SUCCESS; } @@ -322,48 +356,98 @@ apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, h2_session *session) return h2_conn_io_flush_int(io, 1, 1); } -apr_status_t h2_conn_io_write(h2_conn_io *io, - const char *buf, size_t length) +apr_status_t h2_conn_io_write(h2_conn_io *io, const char *data, size_t length) { apr_status_t status = APR_SUCCESS; - pass_out_ctx ctx; + apr_size_t remain; - ctx.c = io->c; - ctx.io = io; - if (io->bufsize > 0) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, - "h2_conn_io: buffering %ld bytes", (long)length); - - if (!APR_BRIGADE_EMPTY(io->output)) { - status = h2_conn_io_flush_int(io, 0, 0); - } - - while (length > 0 && (status == APR_SUCCESS)) { - apr_size_t avail = io->bufsize - io->buflen; - if (avail <= 0) { - status = h2_conn_io_flush_int(io, 0, 0); - } - else if (length > avail) { - memcpy(io->buffer + io->buflen, buf, avail); - io->buflen += avail; - length -= avail; - buf += avail; + if (io->buffer_output) { + while (length > 0) { + remain = assure_scratch_space(io); + if (remain >= length) { +#if LOG_SCRATCH + ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c, + "h2_conn_io(%ld): write_to_scratch(%ld)", + io->c->id, (long)length); +#endif + memcpy(io->scratch + io->slen, data, length); + io->slen += length; + length = 0; } else { - memcpy(io->buffer + io->buflen, buf, length); - io->buflen += length; - length = 0; - break; +#if LOG_SCRATCH + ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c, + "h2_conn_io(%ld): write_to_scratch(%ld)", + io->c->id, (long)remain); +#endif + memcpy(io->scratch + io->slen, data, remain); + io->slen += remain; + data += remain; + length -= remain; } } - } else { - ap_log_cerror(APLOG_MARK, APLOG_TRACE4, status, io->c, - "h2_conn_io: writing %ld bytes to brigade", (long)length); - status = apr_brigade_write(io->output, pass_out, &ctx, buf, length); + status = apr_brigade_write(io->output, NULL, NULL, data, length); + } + return status; +} + +apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb) +{ + apr_bucket *b; + apr_status_t status = APR_SUCCESS; + + check_write_size(io); + while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) { + b = APR_BRIGADE_FIRST(bb); + + if (APR_BUCKET_IS_METADATA(b)) { + /* need to finish any open scratch bucket, as meta data + * needs to be forward "in order". */ + append_scratch(io); + APR_BUCKET_REMOVE(b); + APR_BRIGADE_INSERT_TAIL(io->output, b); + + if (APR_BUCKET_IS_FLUSH(b)) { + status = h2_conn_io_flush_int(io, 0, 0); + } + } + else if (io->buffer_output) { + apr_size_t remain = assure_scratch_space(io); + if (b->length > remain) { + apr_bucket_split(b, remain); + if (io->slen == 0) { + /* complete write_size bucket, append unchanged */ + APR_BUCKET_REMOVE(b); + APR_BRIGADE_INSERT_TAIL(io->output, b); +#if LOG_SCRATCH + ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c, + "h2_conn_io(%ld): pass bucket(%ld)", + io->c->id, (long)b->length); +#endif + continue; + } + } + else { + /* bucket fits in remain, copy to scratch */ + read_to_scratch(io, b); + apr_bucket_delete(b); + continue; + } + } + else { + /* no buffering, forward buckets setaside on flush */ + if (APR_BUCKET_IS_TRANSIENT(b)) { + apr_bucket_setaside(b, io->c->pool); + } + APR_BUCKET_REMOVE(b); + APR_BRIGADE_INSERT_TAIL(io->output, b); + } + } + if (status == APR_SUCCESS) { + return h2_conn_io_consider_pass(io); } - return status; } diff --git a/modules/http2/h2_conn_io.h b/modules/http2/h2_conn_io.h index c397e9f608..f1d877a3f6 100644 --- a/modules/http2/h2_conn_io.h +++ b/modules/http2/h2_conn_io.h @@ -39,14 +39,13 @@ typedef struct { apr_int64_t bytes_written; int buffer_output; - char *buffer; - apr_size_t buflen; - apr_size_t bufsize; + char *scratch; + apr_size_t ssize; + apr_size_t slen; } h2_conn_io; apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, - const struct h2_config *cfg, - apr_pool_t *pool); + const struct h2_config *cfg); int h2_conn_io_is_buffered(h2_conn_io *io); @@ -59,12 +58,7 @@ apr_status_t h2_conn_io_write(h2_conn_io *io, const char *buf, size_t length); -/** - * Append a bucket to the buffered output. - * @param io the connection io - * @param b the bucket to append - */ -apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b, int flush); +apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb); /** * Append an End-Of-Connection bucket to the output that, once destroyed, diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 9298592b56..9c8498e62e 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -167,6 +167,7 @@ static int can_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file) } static void have_out_data_for(h2_mplx *m, int stream_id); +static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master); static void check_tx_reservation(h2_mplx *m) { @@ -193,8 +194,12 @@ static int purge_stream(void *ctx, void *val) { h2_mplx *m = ctx; h2_stream *stream = val; + h2_task *task = h2_ihash_get(m->tasks, stream->id); h2_ihash_remove(m->spurge, stream->id); h2_stream_destroy(stream); + if (task) { + task_destroy(m, task, 1); + } return 0; } @@ -386,7 +391,7 @@ static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master) static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error) { - h2_task *task = h2_ihash_get(m->tasks, stream->id); + h2_task *task; /* Situation: we are, on the master connection, done with processing * the stream. Either we have handled it successfully, or the stream @@ -417,29 +422,28 @@ static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error) * memory. We should either copy it on task creation or wait with the * stream destruction until the task is done. */ + h2_iq_remove(m->q, stream->id); + h2_ihash_remove(m->ready_tasks, stream->id); h2_ihash_remove(m->streams, stream->id); if (stream->input) { m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input); } h2_stream_cleanup(stream); + task = h2_ihash_get(m->tasks, stream->id); if (task) { - /* Remove task from ready set, we will never submit it */ - h2_ihash_remove(m->ready_tasks, stream->id); - task->input.beam = NULL; - if (!task->worker_done) { /* task still running, cleanup once it is done */ if (rst_error) { h2_task_rst(task, rst_error); } - /* FIXME: this should work, but does not + /* FIXME: this should work, but does not h2_ihash_add(m->shold, stream); return;*/ + task->input.beam = NULL; } else { /* already finished */ - h2_iq_remove(m->q, task->stream_id); task_destroy(m, task, 0); } } @@ -492,6 +496,17 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) /* disable WINDOW_UPDATE callbacks */ h2_mplx_set_consumed_cb(m, NULL, NULL); + if (!h2_ihash_empty(m->shold)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): start release_join with %d streams in hold", + m->id, (int)h2_ihash_count(m->shold)); + } + if (!h2_ihash_empty(m->spurge)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): start release_join with %d streams to purge", + m->id, (int)h2_ihash_count(m->spurge)); + } + h2_iq_clear(m->q); apr_thread_cond_broadcast(m->task_thawed); while (!h2_ihash_iter(m->streams, stream_done_iter, m)) { @@ -499,19 +514,25 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) } AP_DEBUG_ASSERT(h2_ihash_empty(m->streams)); + if (!h2_ihash_empty(m->shold)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): 2. release_join with %d streams in hold", + m->id, (int)h2_ihash_count(m->shold)); + } + if (!h2_ihash_empty(m->spurge)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): 2. release_join with %d streams to purge", + m->id, (int)h2_ihash_count(m->spurge)); + } + /* If we still have busy workers, we cannot release our memory - * pool yet, as slave connections have child pools of their respective - * h2_io's. - * Any remaining ios are processed in these workers. Any operation - * they do on their input/outputs will be errored ECONNRESET/ABORTED, - * so processing them should fail and workers *should* return. + * pool yet, as tasks have references to us. + * Any operation on the task slave connection will from now on + * be errored ECONNRESET/ABORTED, so processing them should fail + * and workers *should* return in a timely fashion. */ for (i = 0; m->workers_busy > 0; ++i) { m->join_wait = wait; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): release_join, waiting on %d tasks to report back", - m->id, (int)h2_ihash_count(m->tasks)); - status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs)); if (APR_STATUS_IS_TIMEUP(status)) { @@ -534,13 +555,23 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) apr_thread_cond_broadcast(m->task_thawed); } } - AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks)); - AP_DEBUG_ASSERT(h2_ihash_empty(m->shold)); - purge_streams(m); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056) - "h2_mplx(%ld): release_join (%d tasks left) -> destroy", - m->id, (int)h2_ihash_count(m->tasks)); + AP_DEBUG_ASSERT(h2_ihash_empty(m->shold)); + if (!h2_ihash_empty(m->spurge)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): release_join %d streams to purge", + m->id, (int)h2_ihash_count(m->spurge)); + purge_streams(m); + } + AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge)); + AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks)); + + if (!h2_ihash_empty(m->tasks)) { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03056) + "h2_mplx(%ld): release_join -> destroy, " + "%d tasks still present", + m->id, (int)h2_ihash_count(m->tasks)); + } leave_mutex(m, acquired); h2_mplx_destroy(m); /* all gone */ @@ -928,16 +959,17 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) h2_task_thaw(task); /* we do not want the task to block on writing response * bodies into the mplx. */ - /* FIXME: this implementation is incomplete. */ h2_task_set_io_blocking(task, 0); apr_thread_cond_broadcast(m->task_thawed); return; } else { - h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); + h2_stream *stream; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): task(%s) done", m->id, task->id); out_close(m, task); + stream = h2_ihash_get(m->streams, task->stream_id); if (ngn) { apr_off_t bytes = 0; @@ -979,9 +1011,8 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) h2_beam_on_consumed(task->output.beam, NULL, NULL); h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL); } - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%s): request done, %f ms" - " elapsed", task->id, + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%s): request done, %f ms elapsed", task->id, (task->done_at - task->started_at) / 1000.0); if (task->started_at > m->last_idle_block) { /* this task finished without causing an 'idle block', e.g. @@ -1002,11 +1033,17 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) if (stream) { /* hang around until the stream deregisters */ + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%s): task_done, stream still open", + task->id); } else { + /* stream done, was it placed in hold? */ stream = h2_ihash_get(m->shold, task->stream_id); - task_destroy(m, task, 0); if (stream) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%s): task_done, stream in hold", + task->id); stream->response = NULL; /* ref from task memory */ /* We cannot destroy the stream here since this is * called from a worker thread and freeing memory pools @@ -1015,6 +1052,12 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) h2_ihash_remove(m->shold, stream->id); h2_ihash_add(m->spurge, stream); } + else { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%s): task_done, stream not found", + task->id); + task_destroy(m, task, 0); + } if (m->join_wait) { apr_thread_cond_signal(m->join_wait); diff --git a/modules/http2/h2_request.c b/modules/http2/h2_request.c index ca8a9bf7b9..2f23208062 100644 --- a/modules/http2/h2_request.c +++ b/modules/http2/h2_request.c @@ -42,26 +42,27 @@ static apr_status_t inspect_clen(h2_request *req, const char *s) return (s == end)? APR_EINVAL : APR_SUCCESS; } -apr_status_t h2_request_rwrite(h2_request *req, request_rec *r) +apr_status_t h2_request_rwrite(h2_request *req, apr_pool_t *pool, + request_rec *r) { apr_status_t status; const char *scheme, *authority; - scheme = (r->parsed_uri.scheme? r->parsed_uri.scheme + scheme = apr_pstrdup(pool, r->parsed_uri.scheme? r->parsed_uri.scheme : ap_http_scheme(r)); - authority = r->hostname; + authority = apr_pstrdup(pool, r->hostname); if (!ap_strchr_c(authority, ':') && r->server && r->server->port) { apr_port_t defport = apr_uri_port_of_scheme(scheme); if (defport != r->server->port) { /* port info missing and port is not default for scheme: append */ - authority = apr_psprintf(r->pool, "%s:%d", authority, + authority = apr_psprintf(pool, "%s:%d", authority, (int)r->server->port); } } - status = h2_req_make(req, r->pool, r->method, scheme, authority, - apr_uri_unparse(r->pool, &r->parsed_uri, - APR_URI_UNP_OMITSITEPART), + status = h2_req_make(req, pool, apr_pstrdup(pool, r->method), scheme, + authority, apr_uri_unparse(pool, &r->parsed_uri, + APR_URI_UNP_OMITSITEPART), r->headers_in); return status; } diff --git a/modules/http2/h2_request.h b/modules/http2/h2_request.h index 168d3796a9..ba48f4a152 100644 --- a/modules/http2/h2_request.h +++ b/modules/http2/h2_request.h @@ -18,7 +18,8 @@ #include "h2.h" -apr_status_t h2_request_rwrite(h2_request *req, request_rec *r); +apr_status_t h2_request_rwrite(h2_request *req, apr_pool_t *pool, + request_rec *r); apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool, const char *name, size_t nlen, diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 0f8accab92..aa62607496 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -315,6 +315,9 @@ static apr_status_t stream_release(h2_session *session, uint32_t error_code) { conn_rec *c = session->c; + apr_bucket *b; + apr_status_t status; + if (!error_code) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_stream(%ld-%d): handled, closing", @@ -333,8 +336,11 @@ static apr_status_t stream_release(h2_session *session, h2_stream_rst(stream, error_code); } - return h2_conn_io_writeb(&session->io, - h2_bucket_eos_create(c->bucket_alloc, stream), 0); + b = h2_bucket_eos_create(c->bucket_alloc, stream); + APR_BRIGADE_INSERT_TAIL(session->bbtmp, b); + status = h2_conn_io_pass(&session->io, session->bbtmp); + apr_brigade_cleanup(session->bbtmp); + return status; } static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id, @@ -538,13 +544,6 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, return 0; } -static apr_status_t pass_data(void *ctx, - const char *data, apr_off_t length) -{ - return h2_conn_io_write(&((h2_session*)ctx)->io, data, length); -} - - static char immortal_zeros[H2_MAX_PADLEN]; static int on_send_data_cb(nghttp2_session *ngh2, @@ -582,49 +581,30 @@ static int on_send_data_cb(nghttp2_session *ngh2, "h2_stream(%ld-%d): send_data_cb for %ld bytes", session->id, (int)stream_id, (long)length); - if (h2_conn_io_is_buffered(&session->io)) { - status = h2_conn_io_write(&session->io, (const char *)framehd, 9); - if (status == APR_SUCCESS) { - if (padlen) { - status = h2_conn_io_write(&session->io, (const char *)&padlen, 1); - } - - if (status == APR_SUCCESS) { - apr_off_t len = length; - status = h2_stream_readx(stream, pass_data, session, &len, &eos); - if (status == APR_SUCCESS && len != length) { - status = APR_EINVAL; - } - } - - if (status == APR_SUCCESS && padlen) { - if (padlen) { - status = h2_conn_io_write(&session->io, immortal_zeros, padlen); - } - } - } + status = h2_conn_io_write(&session->io, (const char *)framehd, 9); + if (padlen && status == APR_SUCCESS) { + status = h2_conn_io_write(&session->io, (const char *)&padlen, 1); } - else { - status = h2_conn_io_write(&session->io, (const char *)framehd, 9); - if (padlen && status == APR_SUCCESS) { - status = h2_conn_io_write(&session->io, (const char *)&padlen, 1); - } - if (status == APR_SUCCESS) { - apr_off_t len = length; - status = h2_stream_read_to(stream, session->io.output, &len, &eos); - if (status == APR_SUCCESS && len != length) { - status = APR_EINVAL; - } - } - - if (status == APR_SUCCESS && padlen) { - b = apr_bucket_immortal_create(immortal_zeros, padlen, - session->c->bucket_alloc); - status = h2_conn_io_writeb(&session->io, b, 0); + + if (status == APR_SUCCESS) { + apr_off_t len = length; + status = h2_stream_read_to(stream, session->bbtmp, &len, &eos); + if (status == APR_SUCCESS && len != length) { + status = APR_EINVAL; } } + if (status == APR_SUCCESS && padlen) { + b = apr_bucket_immortal_create(immortal_zeros, padlen, + session->c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(session->bbtmp, b); + } + if (status == APR_SUCCESS) { + status = h2_conn_io_pass(&session->io, session->bbtmp); + } + + apr_brigade_cleanup(session->bbtmp); if (status == APR_SUCCESS) { stream->data_frames_sent++; h2_conn_io_consider_pass(&session->io); @@ -684,45 +664,31 @@ static apr_status_t init_callbacks(conn_rec *c, nghttp2_session_callbacks **pcb) return APR_SUCCESS; } -static void h2_session_cleanup(h2_session *session) -{ - AP_DEBUG_ASSERT(session); - /* This is an early cleanup of the session that may - * discard what is no longer necessary for *new* streams - * and general HTTP/2 processing. - * At this point, all frames are in transit or somehwere in - * our buffers or passed down output filters. - * h2 streams might still being written out. - */ - if (session->c) { - h2_ctx_clear(session->c); - } - if (session->ngh2) { - nghttp2_session_del(session->ngh2); - session->ngh2 = NULL; - } -} - static void h2_session_destroy(h2_session *session) { - AP_DEBUG_ASSERT(session); - - h2_session_cleanup(session); - AP_DEBUG_ASSERT(session->open_streams == h2_ihash_count(session->streams)); + AP_DEBUG_ASSERT(session); + h2_ihash_clear(session->streams); - session->open_streams = 0; - - ap_remove_input_filter_byhandle((session->r? session->r->input_filters : - session->c->input_filters), "H2_IN"); - if (APLOGctrace1(session->c)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, - "h2_session(%ld): destroy", session->id); - } if (session->mplx) { h2_mplx_set_consumed_cb(session->mplx, NULL, NULL); h2_mplx_release_and_join(session->mplx, session->iowait); session->mplx = NULL; } + + ap_remove_input_filter_byhandle((session->r? session->r->input_filters : + session->c->input_filters), "H2_IN"); + if (session->ngh2) { + nghttp2_session_del(session->ngh2); + session->ngh2 = NULL; + } + if (session->c) { + h2_ctx_clear(session->c); + } + + if (APLOGctrace1(session->c)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "h2_session(%ld): destroy", session->id); + } if (session->pool) { apr_pool_destroy(session->pool); } @@ -903,7 +869,7 @@ static h2_session *h2_session_create_int(conn_rec *c, h2_session_receive, session); ap_add_input_filter("H2_IN", session->cin, r, c); - h2_conn_io_init(&session->io, c, session->config, session->pool); + h2_conn_io_init(&session->io, c, session->config); session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc); status = init_callbacks(c, &callbacks); @@ -1504,7 +1470,7 @@ apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream, apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream) { ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_stream(%ld-%d): cleanup by EOS bucket destroy", + "h2_stream(%ld-%d): EOS bucket cleanup -> done", session->id, stream->id); h2_ihash_remove(session->streams, stream->id); --session->open_streams; diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index 20d1d35042..b445da768b 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -203,6 +203,9 @@ void h2_stream_cleanup(h2_stream *stream) void h2_stream_destroy(h2_stream *stream) { AP_DEBUG_ASSERT(stream); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, + "h2_stream(%ld-%d): destroy", + stream->session->id, stream->id); if (stream->input) { h2_beam_destroy(stream->input); stream->input = NULL; @@ -248,7 +251,7 @@ apr_status_t h2_stream_set_request(h2_stream *stream, request_rec *r) return APR_ECONNRESET; } set_state(stream, H2_STREAM_ST_OPEN); - status = h2_request_rwrite(stream->request, r); + status = h2_request_rwrite(stream->request, stream->pool, r); stream->request->serialize = h2_config_geti(h2_config_rget(r), H2_CONF_SER_HEADERS); ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058) @@ -453,12 +456,14 @@ apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response, return status; } +static const apr_size_t DATA_CHUNK_SIZE = ((16*1024) - 100 - 9); + apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, int *peos) { conn_rec *c = stream->session->c; apr_status_t status = APR_SUCCESS; - apr_off_t requested = (*plen > 0)? *plen : 32*1024; + apr_off_t requested; if (stream->rst_error) { *plen = 0; @@ -466,11 +471,19 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, return APR_ECONNRESET; } + if (*plen > 0) { + requested = H2MIN(*plen, DATA_CHUNK_SIZE); + } + else { + requested = DATA_CHUNK_SIZE; + } + *plen = requested; + H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_pre"); h2_util_bb_avail(stream->buffer, plen, peos); - if (!*peos && !*plen) { + if (!*peos && *plen < requested) { /* try to get more data */ - status = fill_buffer(stream, H2MIN(requested, 32*1024)); + status = fill_buffer(stream, (requested - *plen) + DATA_CHUNK_SIZE); if (APR_STATUS_IS_EOF(status)) { apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc); APR_BRIGADE_INSERT_TAIL(stream->buffer, eos); @@ -491,27 +504,6 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, } -apr_status_t h2_stream_readx(h2_stream *stream, - h2_io_data_cb *cb, void *ctx, - apr_off_t *plen, int *peos) -{ - conn_rec *c = stream->session->c; - apr_status_t status = APR_SUCCESS; - - if (stream->rst_error) { - return APR_ECONNRESET; - } - status = h2_util_bb_readx(stream->buffer, cb, ctx, plen, peos); - if (status == APR_SUCCESS && !*peos && !*plen) { - status = APR_EAGAIN; - } - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c, - "h2_stream(%ld-%d): readx, len=%ld eos=%d", - c->id, stream->id, (long)*plen, *peos); - return status; -} - - apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, apr_off_t *plen, int *peos) { diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h index 8ae600c78a..66dca0dbb4 100644 --- a/modules/http2/h2_stream.h +++ b/modules/http2/h2_stream.h @@ -204,23 +204,6 @@ apr_status_t h2_stream_set_response(h2_stream *stream, apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, int *peos); -/** - * Read data from the stream output. - * - * @param stream the stream to read from - * @param cb callback to invoke for byte chunks read. Might be invoked - * multiple times (with different values) for one read operation. - * @param ctx context data for callback - * @param plen (in-/out) max. number of bytes to read and on return actual - * number of bytes read - * @param peos (out) != 0 iff end of stream has been reached while reading - * @return APR_SUCCESS if out information was computed successfully. - * APR_EAGAIN if not data is available and end of stream has not been - * reached yet. - */ -apr_status_t h2_stream_readx(h2_stream *stream, h2_io_data_cb *cb, - void *ctx, apr_off_t *plen, int *peos); - /** * Read a maximum number of bytes into the bucket brigade. *