diff --git a/CHANGES b/CHANGES index f01124afd8..16d64dd962 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,9 @@ -*- coding: utf-8 -*- Changes with Apache 2.5.0 + *) mod_http2: input buffering and dynamic flow windows for increased + throughput. [Stefan Eissing] + *) mod_http2: h2 workers with improved scalability for better scheduling performance. There are H2MaxWorkers threads created at start and the number is kept constant. [Stefan Eissing] diff --git a/modules/http2/h2.h b/modules/http2/h2.h index 21a673244f..df809fd411 100644 --- a/modules/http2/h2.h +++ b/modules/http2/h2.h @@ -115,7 +115,8 @@ typedef enum { H2_SEV_CLOSED_L, H2_SEV_CLOSED_R, H2_SEV_CANCELLED, - H2_SEV_EOS_SENT + H2_SEV_EOS_SENT, + H2_SEV_IN_DATA_PENDING, } h2_stream_event_t; diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c index 16f0a049d7..7fb6cb6f79 100644 --- a/modules/http2/h2_bucket_beam.c +++ b/modules/http2/h2_bucket_beam.c @@ -246,15 +246,17 @@ static int report_consumption(h2_bucket_beam *beam, h2_beam_lock *pbl) apr_off_t len = beam->received_bytes - beam->cons_bytes_reported; h2_beam_io_callback *cb = beam->cons_io_cb; - if (cb) { - void *ctx = beam->cons_ctx; - - if (pbl) leave_yellow(beam, pbl); - cb(ctx, beam, len); - if (pbl) enter_yellow(beam, pbl); - rv = 1; + if (len > 0) { + if (cb) { + void *ctx = beam->cons_ctx; + + if (pbl) leave_yellow(beam, pbl); + cb(ctx, beam, len); + if (pbl) enter_yellow(beam, pbl); + rv = 1; + } + beam->cons_bytes_reported += len; } - beam->cons_bytes_reported += len; return rv; } @@ -1250,9 +1252,9 @@ void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg) if (beam && APLOG_C_IS_LEVEL(c,level)) { ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d,%s,closed=%d,aborted=%d,empty=%d,buf=%ld): %s", - c->id, beam->id, beam->tag, beam->closed, beam->aborted, - h2_beam_empty(beam), (long)h2_beam_get_buffered(beam), - msg); + (c->master? c->master->id : c->id), beam->id, beam->tag, + beam->closed, beam->aborted, h2_beam_empty(beam), + (long)h2_beam_get_buffered(beam), msg); } } diff --git a/modules/http2/h2_filter.c b/modules/http2/h2_filter.c index 617d350d59..3a8a3b1ad1 100644 --- a/modules/http2/h2_filter.c +++ b/modules/http2/h2_filter.c @@ -44,55 +44,80 @@ #define UNSET -1 #define H2MIN(x,y) ((x) < (y) ? (x) : (y)) -static apr_status_t consume_brigade(h2_filter_cin *cin, - apr_bucket_brigade *bb, - apr_read_type_e block) +static apr_status_t recv_RAW_DATA(conn_rec *c, h2_filter_cin *cin, + apr_bucket *b, apr_read_type_e block) { + h2_session *session = cin->session; apr_status_t status = APR_SUCCESS; - apr_size_t readlen = 0; + apr_size_t len; + const char *data; + ssize_t n; - while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) { + status = apr_bucket_read(b, &data, &len, block); + + while (status == APR_SUCCESS && len > 0) { + n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len); - apr_bucket* bucket = APR_BRIGADE_FIRST(bb); - if (APR_BUCKET_IS_METADATA(bucket)) { - /* we do nothing regarding any meta here */ - } - else { - const char *bucket_data = NULL; - apr_size_t bucket_length = 0; - status = apr_bucket_read(bucket, &bucket_data, - &bucket_length, block); - - if (status == APR_SUCCESS && bucket_length > 0) { - apr_size_t consumed = 0; - - status = cin->cb(cin->cb_ctx, bucket_data, bucket_length, &consumed); - if (status == APR_SUCCESS && bucket_length > consumed) { - /* We have data left in the bucket. Split it. */ - status = apr_bucket_split(bucket, consumed); - } - readlen += consumed; - cin->start_read = apr_time_now(); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, + H2_SSSN_MSG(session, "fed %ld bytes to nghttp2, %ld read"), + (long)len, (long)n); + if (n < 0) { + if (nghttp2_is_fatal((int)n)) { + h2_session_event(session, H2_SESSION_EV_PROTO_ERROR, + (int)n, nghttp2_strerror((int)n)); + status = APR_EGENERAL; } } - apr_bucket_delete(bucket); + else { + session->io.bytes_read += n; + if (len <= n) { + break; + } + len -= n; + data += n; + } } - if (readlen == 0 && status == APR_SUCCESS && block == APR_NONBLOCK_READ) { + return status; +} + +static apr_status_t recv_RAW_brigade(conn_rec *c, h2_filter_cin *cin, + apr_bucket_brigade *bb, + apr_read_type_e block) +{ + apr_status_t status = APR_SUCCESS; + apr_bucket* b; + int consumed = 0; + + h2_util_bb_log(c, c->id, APLOG_TRACE2, "RAW_in", bb); + while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) { + b = APR_BRIGADE_FIRST(bb); + + if (APR_BUCKET_IS_METADATA(b)) { + /* nop */ + } + else { + status = recv_RAW_DATA(c, cin, b, block); + } + consumed = 1; + apr_bucket_delete(b); + } + + if (!consumed && status == APR_SUCCESS && block == APR_NONBLOCK_READ) { return APR_EAGAIN; } return status; } -h2_filter_cin *h2_filter_cin_create(apr_pool_t *p, h2_filter_cin_cb *cb, void *ctx) +h2_filter_cin *h2_filter_cin_create(h2_session *session) { h2_filter_cin *cin; - cin = apr_pcalloc(p, sizeof(*cin)); - cin->pool = p; - cin->cb = cb; - cin->cb_ctx = ctx; - cin->start_read = UNSET; + cin = apr_pcalloc(session->pool, sizeof(*cin)); + if (!cin) { + return NULL; + } + cin->session = session; return cin; } @@ -110,11 +135,14 @@ apr_status_t h2_filter_core_input(ap_filter_t* f, h2_filter_cin *cin = f->ctx; apr_status_t status = APR_SUCCESS; apr_interval_time_t saved_timeout = UNSET; + const int trace1 = APLOGctrace1(f->c); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_session(%ld): read, %s, mode=%d, readbytes=%ld", - (long)f->c->id, (block == APR_BLOCK_READ)? - "BLOCK_READ" : "NONBLOCK_READ", mode, (long)readbytes); + if (trace1) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, + "h2_session(%ld): read, %s, mode=%d, readbytes=%ld", + (long)f->c->id, (block == APR_BLOCK_READ)? + "BLOCK_READ" : "NONBLOCK_READ", mode, (long)readbytes); + } if (mode == AP_MODE_INIT || mode == AP_MODE_SPECULATIVE) { return ap_get_brigade(f->next, brigade, mode, block, readbytes); @@ -125,20 +153,16 @@ apr_status_t h2_filter_core_input(ap_filter_t* f, } if (!cin->bb) { - cin->bb = apr_brigade_create(cin->pool, f->c->bucket_alloc); + cin->bb = apr_brigade_create(cin->session->pool, f->c->bucket_alloc); } if (!cin->socket) { cin->socket = ap_get_conn_socket(f->c); } - cin->start_read = apr_time_now(); if (APR_BRIGADE_EMPTY(cin->bb)) { /* We only do a blocking read when we have no streams to process. So, * in httpd scoreboard lingo, we are in a KEEPALIVE connection state. - * When reading non-blocking, we do have streams to process and update - * child with NULL request. That way, any current request information - * in the scoreboard is preserved. */ if (block == APR_BLOCK_READ) { if (cin->timeout > 0) { @@ -155,13 +179,15 @@ apr_status_t h2_filter_core_input(ap_filter_t* f, switch (status) { case APR_SUCCESS: - status = consume_brigade(cin, cin->bb, block); + status = recv_RAW_brigade(f->c, cin, cin->bb, block); break; case APR_EOF: case APR_EAGAIN: case APR_TIMEUP: - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_session(%ld): read", f->c->id); + if (trace1) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, + "h2_session(%ld): read", f->c->id); + } break; default: ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, f->c, APLOGNO(03046) diff --git a/modules/http2/h2_filter.h b/modules/http2/h2_filter.h index b3e34cc5ba..093d4ea3bc 100644 --- a/modules/http2/h2_filter.h +++ b/modules/http2/h2_filter.h @@ -21,21 +21,16 @@ struct h2_headers; struct h2_stream; struct h2_session; -typedef apr_status_t h2_filter_cin_cb(void *ctx, - const char *data, apr_size_t len, - apr_size_t *readlen); - typedef struct h2_filter_cin { apr_pool_t *pool; - apr_bucket_brigade *bb; - h2_filter_cin_cb *cb; - void *cb_ctx; apr_socket_t *socket; apr_interval_time_t timeout; - apr_time_t start_read; + apr_bucket_brigade *bb; + struct h2_session *session; + apr_bucket *cur; } h2_filter_cin; -h2_filter_cin *h2_filter_cin_create(apr_pool_t *p, h2_filter_cin_cb *cb, void *ctx); +h2_filter_cin *h2_filter_cin_create(struct h2_session *session); void h2_filter_cin_timeout_set(h2_filter_cin *cin, apr_interval_time_t timeout); diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 14f42d4d28..2251dcb510 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -618,6 +618,18 @@ static apr_status_t out_close(h2_mplx *m, h2_task *task) return status; } +static int report_input_consumption(void *ctx, void *val) +{ + h2_stream *stream = val; + + (void)ctx; + if (stream->input) { + h2_beam_report_consumption(stream->input); + } + return 1; +} + + apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, apr_thread_cond_t *iowait) { @@ -633,6 +645,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, } else { purge_streams(m); + h2_ihash_iter(m->streams, report_input_consumption, m); m->added_output = iowait; status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout); if (APLOGctrace2(m->c)) { diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 4c5a3c5322..d05d998e10 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -56,6 +56,7 @@ static void transit(h2_session *session, const char *action, static void on_stream_state_enter(void *ctx, h2_stream *stream); static void on_stream_state_event(void *ctx, h2_stream *stream, h2_stream_event_t ev); +static void on_stream_event(void *ctx, h2_stream *stream, h2_stream_event_t ev); static int h2_session_status_from_apr_status(apr_status_t rv) { @@ -71,26 +72,79 @@ static int h2_session_status_from_apr_status(apr_status_t rv) return NGHTTP2_ERR_PROTO; } +static h2_stream *get_stream(h2_session *session, int stream_id) +{ + return nghttp2_session_get_stream_user_data(session->ngh2, stream_id); +} + static void update_window(void *ctx, int stream_id, apr_off_t bytes_read) { - h2_session *session = (h2_session*)ctx; - while (bytes_read > 0) { - int len = (bytes_read > INT_MAX)? INT_MAX : bytes_read; - nghttp2_session_consume(session->ngh2, stream_id, (int)bytes_read); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_stream(%ld-%d): consumed %d bytes", - session->id, stream_id, len); - bytes_read -= len; + h2_session *session = ctx; + + if (bytes_read > 0) { + h2_stream *stream = get_stream(session, stream_id); + apr_off_t consumed = bytes_read; + + while (consumed > 0) { + int len = (bytes_read > INT_MAX)? INT_MAX : bytes_read; + nghttp2_session_consume(session->ngh2, stream_id, (int)bytes_read); + consumed -= len; + } + + if (stream) { + int cur_size = nghttp2_session_get_stream_local_window_size( + session->ngh2, stream->id); + int win = stream->in_window_size; + int thigh = win * 8/10; + int tlow = win * 2/10; + const int win_max = 2*1024*1024; + const int win_min = 32*1024; + + /* Work in progress, probably shoud add directives for these + * values once this stabilizes somewhat. The general idea is + * to adapt stream window sizes if the input window changes + * a) very quickly (< good RTT) from full to empty + * b) only a little bit (> bad RTT) + * where in a) it grows and in b) it shrinks again. + */ + if (cur_size > thigh && bytes_read > thigh && win < win_max) { + /* almost empty again with one reported consumption, how + * long did this take? */ + long ms = apr_time_msec(apr_time_now() - stream->in_last_write); + if (ms < 40) { + win = H2MIN(win_max, win + (64*1024)); + } + } + else if (cur_size < tlow && bytes_read < tlow && win > win_min) { + /* staying full, for how long already? */ + long ms = apr_time_msec(apr_time_now() - stream->in_last_write); + if (ms > 700) { + win = H2MAX(win_min, win - (32*1024)); + } + } + + if (win != stream->in_window_size) { + stream->in_window_size = win; + nghttp2_session_set_local_window_size(session->ngh2, + NGHTTP2_FLAG_NONE, stream_id, win); + } + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, + "h2_stream(%ld-%d): consumed %ld bytes, window now %d/%d", + session->id, stream_id, (long)bytes_read, + cur_size, stream->in_window_size); + } } } -static apr_status_t h2_session_receive(void *ctx, - const char *data, apr_size_t len, - apr_size_t *readlen); - static void dispatch_event(h2_session *session, h2_session_event_t ev, int err, const char *msg); +void h2_session_event(h2_session *session, h2_session_event_t ev, + int err, const char *msg) +{ + dispatch_event(session, ev, err, msg); +} + static int rst_unprocessed_stream(h2_stream *stream, void *ctx) { int unprocessed = (!h2_stream_was_closed(stream) @@ -227,11 +281,6 @@ static int on_invalid_frame_recv_cb(nghttp2_session *ngh2, return 0; } -static h2_stream *get_stream(h2_session *session, int stream_id) -{ - return nghttp2_session_get_stream_user_data(session->ngh2, stream_id); -} - static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len, void *userp) @@ -803,23 +852,35 @@ static apr_status_t h2_session_create_int(h2_session **psession, return status; } + session->in_pending = h2_iq_create(session->pool, session->max_stream_count); + if (session->in_pending == NULL) { + apr_pool_destroy(pool); + return APR_ENOMEM; + } + + session->in_process = h2_iq_create(session->pool, session->max_stream_count); + if (session->in_process == NULL) { + apr_pool_destroy(pool); + return APR_ENOMEM; + } + session->monitor = apr_pcalloc(pool, sizeof(h2_stream_monitor)); if (session->monitor == NULL) { apr_pool_destroy(pool); - return status; + return APR_ENOMEM; } session->monitor->ctx = session; session->monitor->on_state_enter = on_stream_state_enter; session->monitor->on_state_event = on_stream_state_event; + session->monitor->on_event = on_stream_event; session->mplx = h2_mplx_create(c, session->pool, session->config, workers); h2_mplx_set_consumed_cb(session->mplx, update_window, session); - /* Install the connection input filter that feeds the session */ - session->cin = h2_filter_cin_create(session->pool, - h2_session_receive, session); + /* connection input filter that feeds the session */ + session->cin = h2_filter_cin_create(session); ap_add_input_filter("H2_IN", session->cin, r, c); h2_conn_io_init(&session->io, c, session->config); @@ -1431,7 +1492,8 @@ send_headers: if (!stream->has_response) { /* but no response */ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - H2_STRM_LOG(APLOGNO(03466), stream, "no response, RST_STREAM")); + H2_STRM_LOG(APLOGNO(03466), stream, + "no response, RST_STREAM")); h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR); return APR_SUCCESS; } @@ -1444,32 +1506,32 @@ send_headers: return status; } -static apr_status_t h2_session_receive(void *ctx, const char *data, - apr_size_t len, apr_size_t *readlen) +static void h2_session_in_flush(h2_session *session) { - h2_session *session = ctx; - ssize_t n; + int id; - if (len > 0) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - H2_SSSN_MSG(session, "feeding %ld bytes to nghttp2"), - (long)len); - n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len); - if (n < 0) { - if (nghttp2_is_fatal((int)n)) { - dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, (int)n, nghttp2_strerror((int)n)); - return APR_EGENERAL; + while ((id = h2_iq_shift(session->in_process)) > 0) { + h2_stream *stream = get_stream(session, id); + if (stream) { + ap_assert(!stream->scheduled); + if (h2_stream_prep_processing(stream) == APR_SUCCESS) { + h2_mplx_process(session->mplx, stream, stream_pri_cmp, session); + } + else { + h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR); } } - else { - *readlen = n; - session->io.bytes_read += n; + } + + while ((id = h2_iq_shift(session->in_pending)) > 0) { + h2_stream *stream = get_stream(session, id); + if (stream) { + h2_stream_flush_input(stream); } } - return APR_SUCCESS; } -static apr_status_t h2_session_read(h2_session *session, int block) +static apr_status_t session_read(h2_session *session, apr_size_t readlen, int block) { apr_status_t status, rstatus = APR_EAGAIN; conn_rec *c = session->c; @@ -1481,7 +1543,7 @@ static apr_status_t h2_session_read(h2_session *session, int block) status = ap_get_brigade(c->input_filters, session->bbtmp, AP_MODE_READBYTES, block? APR_BLOCK_READ : APR_NONBLOCK_READ, - APR_BUCKET_BUFF_SIZE); + H2MAX(APR_BUCKET_BUFF_SIZE, readlen)); /* get rid of any possible data we do not expect to get */ apr_brigade_cleanup(session->bbtmp); @@ -1523,16 +1585,25 @@ static apr_status_t h2_session_read(h2_session *session, int block) * status. */ return rstatus; } - if ((session->io.bytes_read - read_start) > (64*1024)) { + if ((session->io.bytes_read - read_start) > readlen) { /* read enough in one go, give write a chance */ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c, - H2_SSSN_MSG(session, "read 64k, returning")); + H2_SSSN_MSG(session, "read enough, returning")); break; } } return rstatus; } +static apr_status_t h2_session_read(h2_session *session, int block) +{ + apr_status_t status = session_read(session, session->max_stream_mem + * H2MAX(2, session->open_streams), + block); + h2_session_in_flush(session); + return status; +} + static const char *StateNames[] = { "INIT", /* H2_SESSION_ST_INIT */ "DONE", /* H2_SESSION_ST_DONE */ @@ -1769,6 +1840,7 @@ static void h2_session_ev_pre_close(h2_session *session, int arg, const char *ms static void ev_stream_open(h2_session *session, h2_stream *stream) { + h2_iq_append(session->in_process, stream->id); switch (session->state) { case H2_SESSION_ST_IDLE: if (session->open_streams == 1) { @@ -1779,14 +1851,6 @@ static void ev_stream_open(h2_session *session, h2_stream *stream) default: break; } - - ap_assert(!stream->scheduled); - if (h2_stream_prep_processing(stream) == APR_SUCCESS) { - h2_mplx_process(session->mplx, stream, stream_pri_cmp, session); - } - else { - h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR); - } } static void ev_stream_closed(h2_session *session, h2_stream *stream) @@ -1863,6 +1927,20 @@ static void on_stream_state_enter(void *ctx, h2_stream *stream) } } +static void on_stream_event(void *ctx, h2_stream *stream, + h2_stream_event_t ev) +{ + h2_session *session = ctx; + switch (ev) { + case H2_SEV_IN_DATA_PENDING: + h2_iq_append(session->in_pending, stream->id); + break; + default: + /* NOP */ + break; + } +} + static void on_stream_state_event(void *ctx, h2_stream *stream, h2_stream_event_t ev) { diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h index fb3cd3d573..5751aed7bd 100644 --- a/modules/http2/h2_session.h +++ b/modules/http2/h2_session.h @@ -125,6 +125,10 @@ typedef struct h2_session { char status[64]; /* status message for scoreboard */ int last_status_code; /* the one already reported */ const char *last_status_msg; /* the one already reported */ + + struct h2_iqueue *in_pending; /* all streams with input pending */ + struct h2_iqueue *in_process; /* all streams ready for processing on slave */ + } h2_session; const char *h2_session_state_str(h2_session_state state); @@ -155,6 +159,9 @@ apr_status_t h2_session_rcreate(h2_session **psession, request_rec *r, struct h2_ctx *ctx, struct h2_workers *workers); +void h2_session_event(h2_session *session, h2_session_event_t ev, + int err, const char *msg); + /** * Process the given HTTP/2 session until it is ended or a fatal * error occurred. diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index 71c38ca263..835bf238ff 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -157,9 +157,15 @@ static int on_frame_recv(h2_stream_state_t state, int frame_type) return on_frame(state, frame_type, trans_on_recv, H2_ALEN(trans_on_recv)); } -static int on_event(h2_stream_state_t state, h2_stream_event_t ev) +static int on_event(h2_stream* stream, h2_stream_event_t ev) { - return on_map(state, trans_on_event[ev]); + if (stream->monitor && stream->monitor->on_event) { + stream->monitor->on_event(stream->monitor->ctx, stream, ev); + } + if (ev < H2_ALEN(trans_on_event)) { + return on_map(stream->state, trans_on_event[ev]); + } + return stream->state; } static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag) @@ -176,11 +182,16 @@ static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag) } static apr_status_t setup_input(h2_stream *stream) { - if (stream->input == NULL && !stream->input_eof) { - h2_beam_create(&stream->input, stream->pool, stream->id, - "input", H2_BEAM_OWNER_SEND, 0, - stream->session->s->timeout); - h2_beam_send_from(stream->input, stream->pool); + if (stream->input == NULL) { + int empty = (stream->input_eof + && (!stream->in_buffer + || APR_BRIGADE_EMPTY(stream->in_buffer))); + if (!empty) { + h2_beam_create(&stream->input, stream->pool, stream->id, + "input", H2_BEAM_OWNER_SEND, 0, + stream->session->s->timeout); + h2_beam_send_from(stream->input, stream->pool); + } } return APR_SUCCESS; } @@ -202,27 +213,27 @@ static apr_status_t close_input(h2_stream *stream) } if (stream->trailers && !apr_is_empty_table(stream->trailers)) { - apr_bucket_brigade *tmp; apr_bucket *b; h2_headers *r; - tmp = apr_brigade_create(stream->pool, c->bucket_alloc); + if (!stream->in_buffer) { + stream->in_buffer = apr_brigade_create(stream->pool, c->bucket_alloc); + } r = h2_headers_create(HTTP_OK, stream->trailers, NULL, stream->pool); stream->trailers = NULL; b = h2_bucket_headers_create(c->bucket_alloc, r); - APR_BRIGADE_INSERT_TAIL(tmp, b); + APR_BRIGADE_INSERT_TAIL(stream->in_buffer, b); b = apr_bucket_eos_create(c->bucket_alloc); - APR_BRIGADE_INSERT_TAIL(tmp, b); + APR_BRIGADE_INSERT_TAIL(stream->in_buffer, b); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, H2_STRM_MSG(stream, "added trailers")); - setup_input(stream); - status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); - apr_brigade_destroy(tmp); + h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING); } if (stream->input) { + h2_stream_flush_input(stream); return h2_beam_close(stream->input); } return status; @@ -329,7 +340,7 @@ void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev) ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, H2_STRM_MSG(stream, "dispatch event %d"), ev); - new_state = on_event(stream->state, ev); + new_state = on_event(stream, ev); if (new_state < 0) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, H2_STRM_LOG(APLOGNO(10002), stream, "invalid event %d"), ev); @@ -340,7 +351,7 @@ void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev) else if (new_state == stream->state) { /* nop */ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, - H2_STRM_MSG(stream, "ignored event %d"), ev); + H2_STRM_MSG(stream, "non-state event %d"), ev); return; } else { @@ -399,7 +410,7 @@ apr_status_t h2_stream_send_frame(h2_stream *stream, int ftype, int flags) H2_STRM_MSG(stream, "send frame %d, eos=%d"), ftype, eos); status = transit(stream, new_state); if (status == APR_SUCCESS && eos) { - status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_L)); + status = transit(stream, on_event(stream, H2_SEV_CLOSED_L)); } return status; } @@ -449,7 +460,23 @@ apr_status_t h2_stream_recv_frame(h2_stream *stream, int ftype, int flags) } status = transit(stream, new_state); if (status == APR_SUCCESS && eos) { - status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_R)); + status = transit(stream, on_event(stream, H2_SEV_CLOSED_R)); + } + return status; +} + +apr_status_t h2_stream_flush_input(h2_stream *stream) +{ + apr_status_t status = APR_SUCCESS; + + if (stream->in_buffer && !APR_BRIGADE_EMPTY(stream->in_buffer)) { + setup_input(stream); + status = h2_beam_send(stream->input, stream->in_buffer, APR_BLOCK_READ); + stream->in_last_write = apr_time_now(); + } + if (stream->input_eof + && stream->input && !h2_beam_is_closed(stream->input)) { + status = h2_beam_close(stream->input); } return status; } @@ -459,21 +486,27 @@ apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags, { h2_session *session = stream->session; apr_status_t status = APR_SUCCESS; - apr_bucket_brigade *tmp; - ap_assert(stream); - if (len > 0) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, - H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len); - - tmp = apr_brigade_create(stream->pool, session->c->bucket_alloc); - apr_brigade_write(tmp, NULL, NULL, (const char *)data, len); - setup_input(stream); - status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ); - apr_brigade_destroy(tmp); - } stream->in_data_frames++; - stream->in_data_octets += len; + if (len > 0) { + if (APLOGctrace3(session->c)) { + const char *load = apr_pstrndup(stream->pool, (const char *)data, len); + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, session->c, + H2_STRM_MSG(stream, "recv DATA, len=%d: -->%s<--"), + (int)len, load); + } + else { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, + H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len); + } + stream->in_data_octets += len; + if (!stream->in_buffer) { + stream->in_buffer = apr_brigade_create(stream->pool, + session->c->bucket_alloc); + } + apr_brigade_write(stream->in_buffer, NULL, NULL, (const char *)data, len); + h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING); + } return status; } @@ -500,7 +533,11 @@ h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session, h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0, session->s->timeout); - + + stream->in_window_size = + nghttp2_session_get_stream_local_window_size( + stream->session->ngh2, stream->id); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, H2_STRM_LOG(APLOGNO(03082), stream, "created")); on_state_enter(stream); diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h index bbb2dc7677..15d4399540 100644 --- a/modules/http2/h2_stream.h +++ b/modules/http2/h2_stream.h @@ -57,6 +57,8 @@ typedef struct h2_stream_monitor { was detected */ h2_stream_event_cb *on_state_event; /* called right before the given event result in a new stream state */ + h2_stream_event_cb *on_event; /* called for events that do not + trigger a state change */ } h2_stream_monitor; struct h2_stream { @@ -74,9 +76,13 @@ struct h2_stream { int request_headers_added; /* number of request headers added */ struct h2_bucket_beam *input; + apr_bucket_brigade *in_buffer; + int in_window_size; + apr_time_t in_last_write; + struct h2_bucket_beam *output; - apr_size_t max_mem; /* maximum amount of data buffered */ apr_bucket_brigade *out_buffer; + apr_size_t max_mem; /* maximum amount of data buffered */ int rst_error; /* stream error for RST_STREAM */ unsigned int aborted : 1; /* was aborted */ @@ -205,6 +211,8 @@ apr_status_t h2_stream_recv_frame(h2_stream *stream, int frame_type, int flags); apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags, const uint8_t *data, size_t len); +apr_status_t h2_stream_flush_input(h2_stream *stream); + /** * Reset the stream. Stream write/reads will return errors afterwards. * @@ -291,7 +299,6 @@ const char *h2_stream_state_str(h2_stream *stream); */ int h2_stream_is_ready(h2_stream *stream); - #define H2_STRM_MSG(s, msg) \ "h2_stream(%ld-%d,%s): "msg, s->session->id, s->id, h2_stream_state_str(s) diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index 8e668b8c4f..a93fce4cd2 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -216,14 +216,18 @@ static apr_status_t h2_filter_slave_in(ap_filter_t* f, apr_status_t status = APR_SUCCESS; apr_bucket *b, *next; apr_off_t bblen; - apr_size_t rmax; + const int trace1 = APLOGctrace1(f->c); + apr_size_t rmax = ((readbytes <= APR_SIZE_MAX)? + (apr_size_t)readbytes : APR_SIZE_MAX); task = h2_ctx_cget_task(f->c); ap_assert(task); - rmax = ((readbytes <= APR_SIZE_MAX)? (apr_size_t)readbytes : APR_SIZE_MAX); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_slave_in(%s): read, mode=%d, block=%d, readbytes=%ld", - task->id, mode, block, (long)readbytes); + + if (trace1) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, + "h2_slave_in(%s): read, mode=%d, block=%d, readbytes=%ld", + task->id, mode, block, (long)readbytes); + } if (mode == AP_MODE_INIT) { return ap_get_brigade(f->c->input_filters, bb, mode, block, readbytes); @@ -249,19 +253,23 @@ static apr_status_t h2_filter_slave_in(ap_filter_t* f, while (APR_BRIGADE_EMPTY(task->input.bb)) { /* Get more input data for our request. */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_slave_in(%s): get more data from mplx, block=%d, " - "readbytes=%ld", task->id, block, (long)readbytes); + if (trace1) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, + "h2_slave_in(%s): get more data from mplx, block=%d, " + "readbytes=%ld", task->id, block, (long)readbytes); + } if (task->input.beam) { status = h2_beam_receive(task->input.beam, task->input.bb, block, - H2MIN(readbytes, 32*1024)); + 128*1024); } else { status = APR_EOF; } - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c, - "h2_slave_in(%s): read returned", task->id); + if (trace1) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c, + "h2_slave_in(%s): read returned", task->id); + } if (APR_STATUS_IS_EAGAIN(status) && (mode == AP_MODE_GETLINE || block == APR_BLOCK_READ)) { /* chunked input handling does not seem to like it if we @@ -275,9 +283,11 @@ static apr_status_t h2_filter_slave_in(ap_filter_t* f, else if (status != APR_SUCCESS) { return status; } - - h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, - "input.beam recv raw", task->input.bb); + + if (trace1) { + h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, + "input.beam recv raw", task->input.bb); + } if (h2_task_logio_add_bytes_in) { apr_brigade_length(bb, 0, &bblen); h2_task_logio_add_bytes_in(f->c, bblen); @@ -291,12 +301,16 @@ static apr_status_t h2_filter_slave_in(ap_filter_t* f, return (mode == AP_MODE_SPECULATIVE)? APR_EAGAIN : APR_EOF; } - h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, - "task_input.bb", task->input.bb); + if (trace1) { + h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2, + "task_input.bb", task->input.bb); + } if (APR_BRIGADE_EMPTY(task->input.bb)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_slave_in(%s): no data", task->id); + if (trace1) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, + "h2_slave_in(%s): no data", task->id); + } return (block == APR_NONBLOCK_READ)? APR_EAGAIN : APR_EOF; } @@ -321,9 +335,11 @@ static apr_status_t h2_filter_slave_in(ap_filter_t* f, apr_size_t len = sizeof(buffer)-1; apr_brigade_flatten(bb, buffer, &len); buffer[len] = 0; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_slave_in(%s): getline: %s", - task->id, buffer); + if (trace1) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, + "h2_slave_in(%s): getline: %s", + task->id, buffer); + } } } else { @@ -336,7 +352,7 @@ static apr_status_t h2_filter_slave_in(ap_filter_t* f, status = APR_ENOTIMPL; } - if (APLOGctrace1(f->c)) { + if (trace1) { apr_brigade_length(bb, 0, &bblen); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, "h2_slave_in(%s): %ld data bytes", task->id, (long)bblen); diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index 0bd74dc28e..4f4616e5c5 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -991,17 +991,16 @@ apr_status_t h2_brigade_concat_length(apr_bucket_brigade *dest, apr_bucket_brigade *src, apr_off_t length) { - apr_bucket *b, *next; + apr_bucket *b; apr_off_t remain = length; apr_status_t status = APR_SUCCESS; - for (b = APR_BRIGADE_FIRST(src); - b != APR_BRIGADE_SENTINEL(src); - b = next) { - next = APR_BUCKET_NEXT(b); + while (!APR_BRIGADE_EMPTY(src)) { + b = APR_BRIGADE_FIRST(src); if (APR_BUCKET_IS_METADATA(b)) { - /* fall through */ + APR_BUCKET_REMOVE(b); + APR_BRIGADE_INSERT_TAIL(dest, b); } else { if (remain == b->length) { @@ -1024,10 +1023,10 @@ apr_status_t h2_brigade_concat_length(apr_bucket_brigade *dest, apr_bucket_split(b, remain); } } + APR_BUCKET_REMOVE(b); + APR_BRIGADE_INSERT_TAIL(dest, b); + remain -= b->length; } - APR_BUCKET_REMOVE(b); - APR_BRIGADE_INSERT_TAIL(dest, b); - remain -= b->length; } return status; } @@ -1215,55 +1214,14 @@ apr_size_t h2_util_bucket_print(char *buffer, apr_size_t bmax, if (bmax <= off) { return off; } - if (APR_BUCKET_IS_METADATA(b)) { - if (APR_BUCKET_IS_EOS(b)) { - off += apr_snprintf(buffer+off, bmax-off, "eos"); - } - else if (APR_BUCKET_IS_FLUSH(b)) { - off += apr_snprintf(buffer+off, bmax-off, "flush"); - } - else if (AP_BUCKET_IS_EOR(b)) { - off += apr_snprintf(buffer+off, bmax-off, "eor"); - } - else { - off += apr_snprintf(buffer+off, bmax-off, "%s", b->type->name); - } + else if (APR_BUCKET_IS_METADATA(b)) { + off += apr_snprintf(buffer+off, bmax-off, "%s", b->type->name); } - else { - const char *btype = b->type->name; - if (APR_BUCKET_IS_FILE(b)) { - btype = "file"; - } - else if (APR_BUCKET_IS_PIPE(b)) { - btype = "pipe"; - } - else if (APR_BUCKET_IS_SOCKET(b)) { - btype = "socket"; - } - else if (APR_BUCKET_IS_HEAP(b)) { - btype = "heap"; - } - else if (APR_BUCKET_IS_TRANSIENT(b)) { - btype = "transient"; - } - else if (APR_BUCKET_IS_IMMORTAL(b)) { - btype = "immortal"; - } -#if APR_HAS_MMAP - else if (APR_BUCKET_IS_MMAP(b)) { - btype = "mmap"; - } -#endif - else if (APR_BUCKET_IS_POOL(b)) { - btype = "pool"; - } - - if (bmax > off) { - off += apr_snprintf(buffer+off, bmax-off, "%s[%ld]", - btype, - (long)(b->length == ((apr_size_t)-1)? - -1 : b->length)); - } + else if (bmax > off) { + off += apr_snprintf(buffer+off, bmax-off, "%s[%ld]", + b->type->name, + (long)(b->length == ((apr_size_t)-1)? + -1 : b->length)); } return off; } diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h index 503a8329bf..f6a4b9a43d 100644 --- a/modules/http2/h2_util.h +++ b/modules/http2/h2_util.h @@ -430,8 +430,8 @@ do { \ const char *line = "(null)"; \ apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); \ len = h2_util_bb_print(buffer, bmax, (tag), "", (bb)); \ - ap_log_cerror(APLOG_MARK, level, 0, (c), "bb_dump(%s): %s", \ - (c)->log_id, (len? buffer : line)); \ + ap_log_cerror(APLOG_MARK, level, 0, (c), "bb_dump(%ld): %s", \ + ((c)->master? (c)->master->id : (c)->id), (len? buffer : line)); \ } while(0)