1
0
mirror of https://github.com/apache/httpd.git synced 2025-08-08 15:02:10 +03:00

mod_http2: rewrote TLS buffering on master connection

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1742005 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Stefan Eissing
2016-05-02 16:39:42 +00:00
parent f11dc9f80b
commit 55f94ec98c
9 changed files with 334 additions and 264 deletions

View File

@@ -1,6 +1,12 @@
-*- coding: utf-8 -*- -*- coding: utf-8 -*-
Changes with Apache 2.5.0 Changes with Apache 2.5.0
*) mod_http2: elimination of fixed master connectin buffer for TLS
connections. New scratch bucket handling optimized for TLS write sizes.
File bucket data read directly into scratch buffers, avoiding one
copy. Non-TLS connections continue to pass buckets unchanged to the core
filters to allow sendfile() usage.
*) mod_http2/mod_proxy_http2: h2_request.c is no longer shared between these *) mod_http2/mod_proxy_http2: h2_request.c is no longer shared between these
modules. This simplifies building on platforms such as Windows, as module modules. This simplifies building on platforms such as Windows, as module
reference used in logging is now clear. reference used in logging is now clear.

View File

@@ -127,23 +127,13 @@ static void h2_conn_io_bb_log(conn_rec *c, int stream_id, int level,
} }
apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
const h2_config *cfg, const h2_config *cfg)
apr_pool_t *pool)
{ {
io->c = c; io->c = c;
io->output = apr_brigade_create(c->pool, c->bucket_alloc); io->output = apr_brigade_create(c->pool, c->bucket_alloc);
io->buflen = 0;
io->is_tls = h2_h2_is_tls(c); io->is_tls = h2_h2_is_tls(c);
io->buffer_output = io->is_tls; io->buffer_output = io->is_tls;
if (io->buffer_output) {
io->bufsize = WRITE_BUFFER_SIZE;
io->buffer = apr_pcalloc(pool, io->bufsize);
}
else {
io->bufsize = 0;
}
if (io->is_tls) { if (io->is_tls) {
/* This is what we start with, /* This is what we start with,
* see https://issues.apache.org/jira/browse/TS-2503 * see https://issues.apache.org/jira/browse/TS-2503
@@ -151,12 +141,13 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
io->warmup_size = h2_config_geti64(cfg, H2_CONF_TLS_WARMUP_SIZE); io->warmup_size = h2_config_geti64(cfg, H2_CONF_TLS_WARMUP_SIZE);
io->cooldown_usecs = (h2_config_geti(cfg, H2_CONF_TLS_COOLDOWN_SECS) io->cooldown_usecs = (h2_config_geti(cfg, H2_CONF_TLS_COOLDOWN_SECS)
* APR_USEC_PER_SEC); * APR_USEC_PER_SEC);
io->write_size = WRITE_SIZE_INITIAL; io->write_size = (io->cooldown_usecs > 0?
WRITE_SIZE_INITIAL : WRITE_SIZE_MAX);
} }
else { else {
io->warmup_size = 0; io->warmup_size = 0;
io->cooldown_usecs = 0; io->cooldown_usecs = 0;
io->write_size = io->bufsize; io->write_size = 0;
} }
if (APLOGctrace1(c)) { if (APLOGctrace1(c)) {
@@ -170,9 +161,95 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
return APR_SUCCESS; return APR_SUCCESS;
} }
#define LOG_SCRATCH 0
static void append_scratch(h2_conn_io *io)
{
if (io->scratch && io->slen > 0) {
apr_bucket *b = apr_bucket_heap_create(io->scratch, io->slen,
apr_bucket_free,
io->c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(io->output, b);
#if LOG_SCRATCH
ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
"h2_conn_io(%ld): append_scratch(%ld)",
io->c->id, (long)io->slen);
#endif
io->scratch = NULL;
io->slen = io->ssize = 0;
}
}
static apr_size_t assure_scratch_space(h2_conn_io *io) {
apr_size_t remain = io->ssize - io->slen;
if (io->scratch && remain == 0) {
append_scratch(io);
}
if (!io->scratch) {
/* we control the size and it is larger than what buckets usually
* allocate. */
io->scratch = apr_bucket_alloc(io->write_size, io->c->bucket_alloc);
io->ssize = io->write_size;
io->slen = 0;
remain = io->ssize;
}
return remain;
}
static apr_status_t read_to_scratch(h2_conn_io *io, apr_bucket *b)
{
apr_status_t status;
const char *data;
apr_size_t len;
if (!b->length) {
return APR_SUCCESS;
}
AP_DEBUG_ASSERT(b->length <= (io->ssize - io->slen));
if (APR_BUCKET_IS_FILE(b)) {
apr_bucket_file *f = (apr_bucket_file *)b->data;
apr_file_t *fd = f->fd;
apr_off_t offset = b->start;
apr_size_t len = b->length;
/* file buckets will either mmap (which we do not want) or
* read 8000 byte chunks and split themself. However, we do
* know *exactly* how many bytes we need where.
*/
status = apr_file_seek(fd, APR_SET, &offset);
if (status != APR_SUCCESS) {
return status;
}
status = apr_file_read(fd, io->scratch + io->slen, &len);
#if LOG_SCRATCH
ap_log_cerror(APLOG_MARK, APLOG_INFO, status, io->c,
"h2_conn_io(%ld): FILE_to_scratch(%ld)",
io->c->id, (long)len);
#endif
if (status != APR_SUCCESS && status != APR_EOF) {
return status;
}
io->slen += len;
}
else {
status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
if (status == APR_SUCCESS) {
#if LOG_SCRATCH
ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
"h2_conn_io(%ld): read_to_scratch(%ld)",
io->c->id, (long)b->length);
#endif
memcpy(io->scratch+io->slen, data, len);
io->slen += len;
}
}
return status;
}
int h2_conn_io_is_buffered(h2_conn_io *io) int h2_conn_io_is_buffered(h2_conn_io *io)
{ {
return io->bufsize > 0; return io->buffer_output;
} }
typedef struct { typedef struct {
@@ -208,16 +285,8 @@ static apr_status_t pass_out(apr_bucket_brigade *bb, void *ctx)
return status; return status;
} }
/* Bring the current buffer content into the output brigade, appropriately static void check_write_size(h2_conn_io *io)
* chunked.
*/
static apr_status_t bucketeer_buffer(h2_conn_io *io)
{ {
const char *data = io->buffer;
apr_size_t remaining = io->buflen;
apr_bucket *b;
int bcount, i;
if (io->write_size > WRITE_SIZE_INITIAL if (io->write_size > WRITE_SIZE_INITIAL
&& (io->cooldown_usecs > 0) && (io->cooldown_usecs > 0)
&& (apr_time_now() - io->last_write) >= io->cooldown_usecs) { && (apr_time_now() - io->last_write) >= io->cooldown_usecs) {
@@ -236,32 +305,6 @@ static apr_status_t bucketeer_buffer(h2_conn_io *io)
"h2_conn_io(%ld): threshold reached, write size now %ld", "h2_conn_io(%ld): threshold reached, write size now %ld",
(long)io->c->id, (long)io->write_size); (long)io->c->id, (long)io->write_size);
} }
bcount = (int)(remaining / io->write_size);
for (i = 0; i < bcount; ++i) {
b = apr_bucket_transient_create(data, io->write_size,
io->output->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(io->output, b);
data += io->write_size;
remaining -= io->write_size;
}
if (remaining > 0) {
b = apr_bucket_transient_create(data, remaining,
io->output->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(io->output, b);
}
return APR_SUCCESS;
}
apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b, int flush)
{
APR_BRIGADE_INSERT_TAIL(io->output, b);
if (flush) {
b = apr_bucket_flush_create(io->c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(io->output, b);
}
return APR_SUCCESS;
} }
static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc) static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc)
@@ -269,17 +312,10 @@ static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc)
pass_out_ctx ctx; pass_out_ctx ctx;
apr_bucket *b; apr_bucket *b;
if (io->buflen == 0 && APR_BRIGADE_EMPTY(io->output)) { append_scratch(io);
if (APR_BRIGADE_EMPTY(io->output)) {
return APR_SUCCESS; return APR_SUCCESS;
} }
if (io->buflen > 0) {
/* something in the buffer, put it in the output brigade */
ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
"h2_conn_io: flush, flushing %ld bytes",
(long)io->buflen);
bucketeer_buffer(io);
}
if (flush) { if (flush) {
b = apr_bucket_flush_create(io->c->bucket_alloc); b = apr_bucket_flush_create(io->c->bucket_alloc);
@@ -287,7 +323,6 @@ static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc)
} }
ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io: flush"); ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io: flush");
io->buflen = 0;
ctx.c = io->c; ctx.c = io->c;
ctx.io = eoc? NULL : io; ctx.io = eoc? NULL : io;
@@ -307,10 +342,9 @@ apr_status_t h2_conn_io_consider_pass(h2_conn_io *io)
if (!APR_BRIGADE_EMPTY(io->output)) { if (!APR_BRIGADE_EMPTY(io->output)) {
len = h2_brigade_mem_size(io->output); len = h2_brigade_mem_size(io->output);
} if (len >= WRITE_BUFFER_SIZE) {
len += io->buflen; return h2_conn_io_flush_int(io, 1, 0);
if (len >= WRITE_BUFFER_SIZE) { }
return h2_conn_io_flush_int(io, 1, 0);
} }
return APR_SUCCESS; return APR_SUCCESS;
} }
@@ -322,48 +356,98 @@ apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, h2_session *session)
return h2_conn_io_flush_int(io, 1, 1); return h2_conn_io_flush_int(io, 1, 1);
} }
apr_status_t h2_conn_io_write(h2_conn_io *io, apr_status_t h2_conn_io_write(h2_conn_io *io, const char *data, size_t length)
const char *buf, size_t length)
{ {
apr_status_t status = APR_SUCCESS; apr_status_t status = APR_SUCCESS;
pass_out_ctx ctx; apr_size_t remain;
ctx.c = io->c; if (io->buffer_output) {
ctx.io = io; while (length > 0) {
if (io->bufsize > 0) { remain = assure_scratch_space(io);
ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, if (remain >= length) {
"h2_conn_io: buffering %ld bytes", (long)length); #if LOG_SCRATCH
ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
if (!APR_BRIGADE_EMPTY(io->output)) { "h2_conn_io(%ld): write_to_scratch(%ld)",
status = h2_conn_io_flush_int(io, 0, 0); io->c->id, (long)length);
} #endif
memcpy(io->scratch + io->slen, data, length);
while (length > 0 && (status == APR_SUCCESS)) { io->slen += length;
apr_size_t avail = io->bufsize - io->buflen; length = 0;
if (avail <= 0) {
status = h2_conn_io_flush_int(io, 0, 0);
}
else if (length > avail) {
memcpy(io->buffer + io->buflen, buf, avail);
io->buflen += avail;
length -= avail;
buf += avail;
} }
else { else {
memcpy(io->buffer + io->buflen, buf, length); #if LOG_SCRATCH
io->buflen += length; ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
length = 0; "h2_conn_io(%ld): write_to_scratch(%ld)",
break; io->c->id, (long)remain);
#endif
memcpy(io->scratch + io->slen, data, remain);
io->slen += remain;
data += remain;
length -= remain;
} }
} }
} }
else { else {
ap_log_cerror(APLOG_MARK, APLOG_TRACE4, status, io->c, status = apr_brigade_write(io->output, NULL, NULL, data, length);
"h2_conn_io: writing %ld bytes to brigade", (long)length); }
status = apr_brigade_write(io->output, pass_out, &ctx, buf, length); return status;
}
apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb)
{
apr_bucket *b;
apr_status_t status = APR_SUCCESS;
check_write_size(io);
while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) {
b = APR_BRIGADE_FIRST(bb);
if (APR_BUCKET_IS_METADATA(b)) {
/* need to finish any open scratch bucket, as meta data
* needs to be forward "in order". */
append_scratch(io);
APR_BUCKET_REMOVE(b);
APR_BRIGADE_INSERT_TAIL(io->output, b);
if (APR_BUCKET_IS_FLUSH(b)) {
status = h2_conn_io_flush_int(io, 0, 0);
}
}
else if (io->buffer_output) {
apr_size_t remain = assure_scratch_space(io);
if (b->length > remain) {
apr_bucket_split(b, remain);
if (io->slen == 0) {
/* complete write_size bucket, append unchanged */
APR_BUCKET_REMOVE(b);
APR_BRIGADE_INSERT_TAIL(io->output, b);
#if LOG_SCRATCH
ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
"h2_conn_io(%ld): pass bucket(%ld)",
io->c->id, (long)b->length);
#endif
continue;
}
}
else {
/* bucket fits in remain, copy to scratch */
read_to_scratch(io, b);
apr_bucket_delete(b);
continue;
}
}
else {
/* no buffering, forward buckets setaside on flush */
if (APR_BUCKET_IS_TRANSIENT(b)) {
apr_bucket_setaside(b, io->c->pool);
}
APR_BUCKET_REMOVE(b);
APR_BRIGADE_INSERT_TAIL(io->output, b);
}
}
if (status == APR_SUCCESS) {
return h2_conn_io_consider_pass(io);
} }
return status; return status;
} }

View File

@@ -39,14 +39,13 @@ typedef struct {
apr_int64_t bytes_written; apr_int64_t bytes_written;
int buffer_output; int buffer_output;
char *buffer; char *scratch;
apr_size_t buflen; apr_size_t ssize;
apr_size_t bufsize; apr_size_t slen;
} h2_conn_io; } h2_conn_io;
apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
const struct h2_config *cfg, const struct h2_config *cfg);
apr_pool_t *pool);
int h2_conn_io_is_buffered(h2_conn_io *io); int h2_conn_io_is_buffered(h2_conn_io *io);
@@ -59,12 +58,7 @@ apr_status_t h2_conn_io_write(h2_conn_io *io,
const char *buf, const char *buf,
size_t length); size_t length);
/** apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb);
* Append a bucket to the buffered output.
* @param io the connection io
* @param b the bucket to append
*/
apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b, int flush);
/** /**
* Append an End-Of-Connection bucket to the output that, once destroyed, * Append an End-Of-Connection bucket to the output that, once destroyed,

View File

@@ -167,6 +167,7 @@ static int can_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
} }
static void have_out_data_for(h2_mplx *m, int stream_id); static void have_out_data_for(h2_mplx *m, int stream_id);
static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master);
static void check_tx_reservation(h2_mplx *m) static void check_tx_reservation(h2_mplx *m)
{ {
@@ -193,8 +194,12 @@ static int purge_stream(void *ctx, void *val)
{ {
h2_mplx *m = ctx; h2_mplx *m = ctx;
h2_stream *stream = val; h2_stream *stream = val;
h2_task *task = h2_ihash_get(m->tasks, stream->id);
h2_ihash_remove(m->spurge, stream->id); h2_ihash_remove(m->spurge, stream->id);
h2_stream_destroy(stream); h2_stream_destroy(stream);
if (task) {
task_destroy(m, task, 1);
}
return 0; return 0;
} }
@@ -386,7 +391,7 @@ static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error) static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error)
{ {
h2_task *task = h2_ihash_get(m->tasks, stream->id); h2_task *task;
/* Situation: we are, on the master connection, done with processing /* Situation: we are, on the master connection, done with processing
* the stream. Either we have handled it successfully, or the stream * the stream. Either we have handled it successfully, or the stream
@@ -417,29 +422,28 @@ static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error)
* memory. We should either copy it on task creation or wait with the * memory. We should either copy it on task creation or wait with the
* stream destruction until the task is done. * stream destruction until the task is done.
*/ */
h2_iq_remove(m->q, stream->id);
h2_ihash_remove(m->ready_tasks, stream->id);
h2_ihash_remove(m->streams, stream->id); h2_ihash_remove(m->streams, stream->id);
if (stream->input) { if (stream->input) {
m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input); m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
} }
h2_stream_cleanup(stream); h2_stream_cleanup(stream);
task = h2_ihash_get(m->tasks, stream->id);
if (task) { if (task) {
/* Remove task from ready set, we will never submit it */
h2_ihash_remove(m->ready_tasks, stream->id);
task->input.beam = NULL;
if (!task->worker_done) { if (!task->worker_done) {
/* task still running, cleanup once it is done */ /* task still running, cleanup once it is done */
if (rst_error) { if (rst_error) {
h2_task_rst(task, rst_error); h2_task_rst(task, rst_error);
} }
/* FIXME: this should work, but does not /* FIXME: this should work, but does not
h2_ihash_add(m->shold, stream); h2_ihash_add(m->shold, stream);
return;*/ return;*/
task->input.beam = NULL;
} }
else { else {
/* already finished */ /* already finished */
h2_iq_remove(m->q, task->stream_id);
task_destroy(m, task, 0); task_destroy(m, task, 0);
} }
} }
@@ -492,6 +496,17 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
/* disable WINDOW_UPDATE callbacks */ /* disable WINDOW_UPDATE callbacks */
h2_mplx_set_consumed_cb(m, NULL, NULL); h2_mplx_set_consumed_cb(m, NULL, NULL);
if (!h2_ihash_empty(m->shold)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld): start release_join with %d streams in hold",
m->id, (int)h2_ihash_count(m->shold));
}
if (!h2_ihash_empty(m->spurge)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld): start release_join with %d streams to purge",
m->id, (int)h2_ihash_count(m->spurge));
}
h2_iq_clear(m->q); h2_iq_clear(m->q);
apr_thread_cond_broadcast(m->task_thawed); apr_thread_cond_broadcast(m->task_thawed);
while (!h2_ihash_iter(m->streams, stream_done_iter, m)) { while (!h2_ihash_iter(m->streams, stream_done_iter, m)) {
@@ -499,19 +514,25 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
} }
AP_DEBUG_ASSERT(h2_ihash_empty(m->streams)); AP_DEBUG_ASSERT(h2_ihash_empty(m->streams));
if (!h2_ihash_empty(m->shold)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld): 2. release_join with %d streams in hold",
m->id, (int)h2_ihash_count(m->shold));
}
if (!h2_ihash_empty(m->spurge)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld): 2. release_join with %d streams to purge",
m->id, (int)h2_ihash_count(m->spurge));
}
/* If we still have busy workers, we cannot release our memory /* If we still have busy workers, we cannot release our memory
* pool yet, as slave connections have child pools of their respective * pool yet, as tasks have references to us.
* h2_io's. * Any operation on the task slave connection will from now on
* Any remaining ios are processed in these workers. Any operation * be errored ECONNRESET/ABORTED, so processing them should fail
* they do on their input/outputs will be errored ECONNRESET/ABORTED, * and workers *should* return in a timely fashion.
* so processing them should fail and workers *should* return.
*/ */
for (i = 0; m->workers_busy > 0; ++i) { for (i = 0; m->workers_busy > 0; ++i) {
m->join_wait = wait; m->join_wait = wait;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): release_join, waiting on %d tasks to report back",
m->id, (int)h2_ihash_count(m->tasks));
status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs)); status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
if (APR_STATUS_IS_TIMEUP(status)) { if (APR_STATUS_IS_TIMEUP(status)) {
@@ -534,13 +555,23 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
apr_thread_cond_broadcast(m->task_thawed); apr_thread_cond_broadcast(m->task_thawed);
} }
} }
AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks));
AP_DEBUG_ASSERT(h2_ihash_empty(m->shold));
purge_streams(m);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056) AP_DEBUG_ASSERT(h2_ihash_empty(m->shold));
"h2_mplx(%ld): release_join (%d tasks left) -> destroy", if (!h2_ihash_empty(m->spurge)) {
m->id, (int)h2_ihash_count(m->tasks)); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld): release_join %d streams to purge",
m->id, (int)h2_ihash_count(m->spurge));
purge_streams(m);
}
AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge));
AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks));
if (!h2_ihash_empty(m->tasks)) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03056)
"h2_mplx(%ld): release_join -> destroy, "
"%d tasks still present",
m->id, (int)h2_ihash_count(m->tasks));
}
leave_mutex(m, acquired); leave_mutex(m, acquired);
h2_mplx_destroy(m); h2_mplx_destroy(m);
/* all gone */ /* all gone */
@@ -928,16 +959,17 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
h2_task_thaw(task); h2_task_thaw(task);
/* we do not want the task to block on writing response /* we do not want the task to block on writing response
* bodies into the mplx. */ * bodies into the mplx. */
/* FIXME: this implementation is incomplete. */
h2_task_set_io_blocking(task, 0); h2_task_set_io_blocking(task, 0);
apr_thread_cond_broadcast(m->task_thawed); apr_thread_cond_broadcast(m->task_thawed);
return; return;
} }
else { else {
h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); h2_stream *stream;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): task(%s) done", m->id, task->id); "h2_mplx(%ld): task(%s) done", m->id, task->id);
out_close(m, task); out_close(m, task);
stream = h2_ihash_get(m->streams, task->stream_id);
if (ngn) { if (ngn) {
apr_off_t bytes = 0; apr_off_t bytes = 0;
@@ -979,9 +1011,8 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
h2_beam_on_consumed(task->output.beam, NULL, NULL); h2_beam_on_consumed(task->output.beam, NULL, NULL);
h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL); h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
} }
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%s): request done, %f ms" "h2_mplx(%s): request done, %f ms elapsed", task->id,
" elapsed", task->id,
(task->done_at - task->started_at) / 1000.0); (task->done_at - task->started_at) / 1000.0);
if (task->started_at > m->last_idle_block) { if (task->started_at > m->last_idle_block) {
/* this task finished without causing an 'idle block', e.g. /* this task finished without causing an 'idle block', e.g.
@@ -1002,11 +1033,17 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
if (stream) { if (stream) {
/* hang around until the stream deregisters */ /* hang around until the stream deregisters */
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%s): task_done, stream still open",
task->id);
} }
else { else {
/* stream done, was it placed in hold? */
stream = h2_ihash_get(m->shold, task->stream_id); stream = h2_ihash_get(m->shold, task->stream_id);
task_destroy(m, task, 0);
if (stream) { if (stream) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%s): task_done, stream in hold",
task->id);
stream->response = NULL; /* ref from task memory */ stream->response = NULL; /* ref from task memory */
/* We cannot destroy the stream here since this is /* We cannot destroy the stream here since this is
* called from a worker thread and freeing memory pools * called from a worker thread and freeing memory pools
@@ -1015,6 +1052,12 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
h2_ihash_remove(m->shold, stream->id); h2_ihash_remove(m->shold, stream->id);
h2_ihash_add(m->spurge, stream); h2_ihash_add(m->spurge, stream);
} }
else {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%s): task_done, stream not found",
task->id);
task_destroy(m, task, 0);
}
if (m->join_wait) { if (m->join_wait) {
apr_thread_cond_signal(m->join_wait); apr_thread_cond_signal(m->join_wait);

View File

@@ -42,26 +42,27 @@ static apr_status_t inspect_clen(h2_request *req, const char *s)
return (s == end)? APR_EINVAL : APR_SUCCESS; return (s == end)? APR_EINVAL : APR_SUCCESS;
} }
apr_status_t h2_request_rwrite(h2_request *req, request_rec *r) apr_status_t h2_request_rwrite(h2_request *req, apr_pool_t *pool,
request_rec *r)
{ {
apr_status_t status; apr_status_t status;
const char *scheme, *authority; const char *scheme, *authority;
scheme = (r->parsed_uri.scheme? r->parsed_uri.scheme scheme = apr_pstrdup(pool, r->parsed_uri.scheme? r->parsed_uri.scheme
: ap_http_scheme(r)); : ap_http_scheme(r));
authority = r->hostname; authority = apr_pstrdup(pool, r->hostname);
if (!ap_strchr_c(authority, ':') && r->server && r->server->port) { if (!ap_strchr_c(authority, ':') && r->server && r->server->port) {
apr_port_t defport = apr_uri_port_of_scheme(scheme); apr_port_t defport = apr_uri_port_of_scheme(scheme);
if (defport != r->server->port) { if (defport != r->server->port) {
/* port info missing and port is not default for scheme: append */ /* port info missing and port is not default for scheme: append */
authority = apr_psprintf(r->pool, "%s:%d", authority, authority = apr_psprintf(pool, "%s:%d", authority,
(int)r->server->port); (int)r->server->port);
} }
} }
status = h2_req_make(req, r->pool, r->method, scheme, authority, status = h2_req_make(req, pool, apr_pstrdup(pool, r->method), scheme,
apr_uri_unparse(r->pool, &r->parsed_uri, authority, apr_uri_unparse(pool, &r->parsed_uri,
APR_URI_UNP_OMITSITEPART), APR_URI_UNP_OMITSITEPART),
r->headers_in); r->headers_in);
return status; return status;
} }

View File

@@ -18,7 +18,8 @@
#include "h2.h" #include "h2.h"
apr_status_t h2_request_rwrite(h2_request *req, request_rec *r); apr_status_t h2_request_rwrite(h2_request *req, apr_pool_t *pool,
request_rec *r);
apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool, apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool,
const char *name, size_t nlen, const char *name, size_t nlen,

View File

@@ -315,6 +315,9 @@ static apr_status_t stream_release(h2_session *session,
uint32_t error_code) uint32_t error_code)
{ {
conn_rec *c = session->c; conn_rec *c = session->c;
apr_bucket *b;
apr_status_t status;
if (!error_code) { if (!error_code) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_stream(%ld-%d): handled, closing", "h2_stream(%ld-%d): handled, closing",
@@ -333,8 +336,11 @@ static apr_status_t stream_release(h2_session *session,
h2_stream_rst(stream, error_code); h2_stream_rst(stream, error_code);
} }
return h2_conn_io_writeb(&session->io, b = h2_bucket_eos_create(c->bucket_alloc, stream);
h2_bucket_eos_create(c->bucket_alloc, stream), 0); APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
status = h2_conn_io_pass(&session->io, session->bbtmp);
apr_brigade_cleanup(session->bbtmp);
return status;
} }
static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id, static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
@@ -538,13 +544,6 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
return 0; return 0;
} }
static apr_status_t pass_data(void *ctx,
const char *data, apr_off_t length)
{
return h2_conn_io_write(&((h2_session*)ctx)->io, data, length);
}
static char immortal_zeros[H2_MAX_PADLEN]; static char immortal_zeros[H2_MAX_PADLEN];
static int on_send_data_cb(nghttp2_session *ngh2, static int on_send_data_cb(nghttp2_session *ngh2,
@@ -582,49 +581,30 @@ static int on_send_data_cb(nghttp2_session *ngh2,
"h2_stream(%ld-%d): send_data_cb for %ld bytes", "h2_stream(%ld-%d): send_data_cb for %ld bytes",
session->id, (int)stream_id, (long)length); session->id, (int)stream_id, (long)length);
if (h2_conn_io_is_buffered(&session->io)) { status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
status = h2_conn_io_write(&session->io, (const char *)framehd, 9); if (padlen && status == APR_SUCCESS) {
if (status == APR_SUCCESS) { status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
if (padlen) {
status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
}
if (status == APR_SUCCESS) {
apr_off_t len = length;
status = h2_stream_readx(stream, pass_data, session, &len, &eos);
if (status == APR_SUCCESS && len != length) {
status = APR_EINVAL;
}
}
if (status == APR_SUCCESS && padlen) {
if (padlen) {
status = h2_conn_io_write(&session->io, immortal_zeros, padlen);
}
}
}
} }
else {
status = h2_conn_io_write(&session->io, (const char *)framehd, 9); if (status == APR_SUCCESS) {
if (padlen && status == APR_SUCCESS) { apr_off_t len = length;
status = h2_conn_io_write(&session->io, (const char *)&padlen, 1); status = h2_stream_read_to(stream, session->bbtmp, &len, &eos);
} if (status == APR_SUCCESS && len != length) {
if (status == APR_SUCCESS) { status = APR_EINVAL;
apr_off_t len = length;
status = h2_stream_read_to(stream, session->io.output, &len, &eos);
if (status == APR_SUCCESS && len != length) {
status = APR_EINVAL;
}
}
if (status == APR_SUCCESS && padlen) {
b = apr_bucket_immortal_create(immortal_zeros, padlen,
session->c->bucket_alloc);
status = h2_conn_io_writeb(&session->io, b, 0);
} }
} }
if (status == APR_SUCCESS && padlen) {
b = apr_bucket_immortal_create(immortal_zeros, padlen,
session->c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
}
if (status == APR_SUCCESS) {
status = h2_conn_io_pass(&session->io, session->bbtmp);
}
apr_brigade_cleanup(session->bbtmp);
if (status == APR_SUCCESS) { if (status == APR_SUCCESS) {
stream->data_frames_sent++; stream->data_frames_sent++;
h2_conn_io_consider_pass(&session->io); h2_conn_io_consider_pass(&session->io);
@@ -684,45 +664,31 @@ static apr_status_t init_callbacks(conn_rec *c, nghttp2_session_callbacks **pcb)
return APR_SUCCESS; return APR_SUCCESS;
} }
static void h2_session_cleanup(h2_session *session)
{
AP_DEBUG_ASSERT(session);
/* This is an early cleanup of the session that may
* discard what is no longer necessary for *new* streams
* and general HTTP/2 processing.
* At this point, all frames are in transit or somehwere in
* our buffers or passed down output filters.
* h2 streams might still being written out.
*/
if (session->c) {
h2_ctx_clear(session->c);
}
if (session->ngh2) {
nghttp2_session_del(session->ngh2);
session->ngh2 = NULL;
}
}
static void h2_session_destroy(h2_session *session) static void h2_session_destroy(h2_session *session)
{ {
AP_DEBUG_ASSERT(session); AP_DEBUG_ASSERT(session);
h2_session_cleanup(session);
AP_DEBUG_ASSERT(session->open_streams == h2_ihash_count(session->streams));
h2_ihash_clear(session->streams); h2_ihash_clear(session->streams);
session->open_streams = 0;
ap_remove_input_filter_byhandle((session->r? session->r->input_filters :
session->c->input_filters), "H2_IN");
if (APLOGctrace1(session->c)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_session(%ld): destroy", session->id);
}
if (session->mplx) { if (session->mplx) {
h2_mplx_set_consumed_cb(session->mplx, NULL, NULL); h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
h2_mplx_release_and_join(session->mplx, session->iowait); h2_mplx_release_and_join(session->mplx, session->iowait);
session->mplx = NULL; session->mplx = NULL;
} }
ap_remove_input_filter_byhandle((session->r? session->r->input_filters :
session->c->input_filters), "H2_IN");
if (session->ngh2) {
nghttp2_session_del(session->ngh2);
session->ngh2 = NULL;
}
if (session->c) {
h2_ctx_clear(session->c);
}
if (APLOGctrace1(session->c)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_session(%ld): destroy", session->id);
}
if (session->pool) { if (session->pool) {
apr_pool_destroy(session->pool); apr_pool_destroy(session->pool);
} }
@@ -903,7 +869,7 @@ static h2_session *h2_session_create_int(conn_rec *c,
h2_session_receive, session); h2_session_receive, session);
ap_add_input_filter("H2_IN", session->cin, r, c); ap_add_input_filter("H2_IN", session->cin, r, c);
h2_conn_io_init(&session->io, c, session->config, session->pool); h2_conn_io_init(&session->io, c, session->config);
session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc); session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc);
status = init_callbacks(c, &callbacks); status = init_callbacks(c, &callbacks);
@@ -1504,7 +1470,7 @@ apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream,
apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream) apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
{ {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
"h2_stream(%ld-%d): cleanup by EOS bucket destroy", "h2_stream(%ld-%d): EOS bucket cleanup -> done",
session->id, stream->id); session->id, stream->id);
h2_ihash_remove(session->streams, stream->id); h2_ihash_remove(session->streams, stream->id);
--session->open_streams; --session->open_streams;

View File

@@ -203,6 +203,9 @@ void h2_stream_cleanup(h2_stream *stream)
void h2_stream_destroy(h2_stream *stream) void h2_stream_destroy(h2_stream *stream)
{ {
AP_DEBUG_ASSERT(stream); AP_DEBUG_ASSERT(stream);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
"h2_stream(%ld-%d): destroy",
stream->session->id, stream->id);
if (stream->input) { if (stream->input) {
h2_beam_destroy(stream->input); h2_beam_destroy(stream->input);
stream->input = NULL; stream->input = NULL;
@@ -248,7 +251,7 @@ apr_status_t h2_stream_set_request(h2_stream *stream, request_rec *r)
return APR_ECONNRESET; return APR_ECONNRESET;
} }
set_state(stream, H2_STREAM_ST_OPEN); set_state(stream, H2_STREAM_ST_OPEN);
status = h2_request_rwrite(stream->request, r); status = h2_request_rwrite(stream->request, stream->pool, r);
stream->request->serialize = h2_config_geti(h2_config_rget(r), stream->request->serialize = h2_config_geti(h2_config_rget(r),
H2_CONF_SER_HEADERS); H2_CONF_SER_HEADERS);
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058) ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058)
@@ -453,12 +456,14 @@ apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
return status; return status;
} }
static const apr_size_t DATA_CHUNK_SIZE = ((16*1024) - 100 - 9);
apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_status_t h2_stream_out_prepare(h2_stream *stream,
apr_off_t *plen, int *peos) apr_off_t *plen, int *peos)
{ {
conn_rec *c = stream->session->c; conn_rec *c = stream->session->c;
apr_status_t status = APR_SUCCESS; apr_status_t status = APR_SUCCESS;
apr_off_t requested = (*plen > 0)? *plen : 32*1024; apr_off_t requested;
if (stream->rst_error) { if (stream->rst_error) {
*plen = 0; *plen = 0;
@@ -466,11 +471,19 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream,
return APR_ECONNRESET; return APR_ECONNRESET;
} }
if (*plen > 0) {
requested = H2MIN(*plen, DATA_CHUNK_SIZE);
}
else {
requested = DATA_CHUNK_SIZE;
}
*plen = requested;
H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_pre"); H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_pre");
h2_util_bb_avail(stream->buffer, plen, peos); h2_util_bb_avail(stream->buffer, plen, peos);
if (!*peos && !*plen) { if (!*peos && *plen < requested) {
/* try to get more data */ /* try to get more data */
status = fill_buffer(stream, H2MIN(requested, 32*1024)); status = fill_buffer(stream, (requested - *plen) + DATA_CHUNK_SIZE);
if (APR_STATUS_IS_EOF(status)) { if (APR_STATUS_IS_EOF(status)) {
apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc); apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(stream->buffer, eos); APR_BRIGADE_INSERT_TAIL(stream->buffer, eos);
@@ -491,27 +504,6 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream,
} }
apr_status_t h2_stream_readx(h2_stream *stream,
h2_io_data_cb *cb, void *ctx,
apr_off_t *plen, int *peos)
{
conn_rec *c = stream->session->c;
apr_status_t status = APR_SUCCESS;
if (stream->rst_error) {
return APR_ECONNRESET;
}
status = h2_util_bb_readx(stream->buffer, cb, ctx, plen, peos);
if (status == APR_SUCCESS && !*peos && !*plen) {
status = APR_EAGAIN;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
"h2_stream(%ld-%d): readx, len=%ld eos=%d",
c->id, stream->id, (long)*plen, *peos);
return status;
}
apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
apr_off_t *plen, int *peos) apr_off_t *plen, int *peos)
{ {

View File

@@ -204,23 +204,6 @@ apr_status_t h2_stream_set_response(h2_stream *stream,
apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_status_t h2_stream_out_prepare(h2_stream *stream,
apr_off_t *plen, int *peos); apr_off_t *plen, int *peos);
/**
* Read data from the stream output.
*
* @param stream the stream to read from
* @param cb callback to invoke for byte chunks read. Might be invoked
* multiple times (with different values) for one read operation.
* @param ctx context data for callback
* @param plen (in-/out) max. number of bytes to read and on return actual
* number of bytes read
* @param peos (out) != 0 iff end of stream has been reached while reading
* @return APR_SUCCESS if out information was computed successfully.
* APR_EAGAIN if not data is available and end of stream has not been
* reached yet.
*/
apr_status_t h2_stream_readx(h2_stream *stream, h2_io_data_cb *cb,
void *ctx, apr_off_t *plen, int *peos);
/** /**
* Read a maximum number of bytes into the bucket brigade. * Read a maximum number of bytes into the bucket brigade.
* *