diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c index 663139624e..d4f56c6630 100644 --- a/modules/http2/h2_conn.c +++ b/modules/http2/h2_conn.c @@ -165,7 +165,7 @@ apr_status_t h2_conn_process(conn_rec *c, request_rec *r) ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, "h2_session(%ld): starting on %s:%d", session->id, - session->c->base_server->defn_name, + session->c->base_server->server_hostname, session->c->local_addr->port); if (status != APR_SUCCESS) { h2_session_abort(session, status, rv); diff --git a/modules/http2/h2_conn_io.c b/modules/http2/h2_conn_io.c index 7a241146f0..aa8d4d5802 100644 --- a/modules/http2/h2_conn_io.c +++ b/modules/http2/h2_conn_io.c @@ -347,7 +347,6 @@ apr_status_t h2_conn_io_consider_flush(h2_conn_io *io) static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int force) { if (io->unflushed || force) { - apr_status_t status; if (io->buflen > 0) { /* something in the buffer, put it in the output brigade */ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, io->connection, @@ -361,18 +360,13 @@ static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int force) apr_bucket_flush_create(io->output->bucket_alloc)); } + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, io->connection, + "h2_conn_io: flush"); /* Send it out */ - status = pass_out(io->output, io); - - if (status != APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, io->connection, - "h2_conn_io: flush"); - return status; - } - - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, io->connection, - "h2_conn_io: flushed"); io->unflushed = 0; + return pass_out(io->output, io); + /* no more access after this, as we might have flushed an EOC bucket + * that de-allocated us all. */ } return APR_SUCCESS; } diff --git a/modules/http2/h2_io.c b/modules/http2/h2_io.c index 715bc5616c..b33faee1f3 100644 --- a/modules/http2/h2_io.c +++ b/modules/http2/h2_io.c @@ -67,7 +67,7 @@ void h2_io_rst(h2_io *io, int error) int h2_io_in_has_eos_for(h2_io *io) { - return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, 0)); + return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, -1)); } int h2_io_out_has_data(h2_io *io) @@ -124,12 +124,12 @@ apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb) if (io->eos_in) { return APR_EOF; } - io->eos_in = h2_util_has_eos(bb, 0); + io->eos_in = h2_util_has_eos(bb, -1); if (!APR_BRIGADE_EMPTY(bb)) { if (!io->bbin) { io->bbin = apr_brigade_create(io->pool, io->bucket_alloc); } - return h2_util_move(io->bbin, bb, 0, NULL, "h2_io_in_write"); + return h2_util_move(io->bbin, bb, -1, NULL, "h2_io_in_write"); } return APR_SUCCESS; } @@ -266,7 +266,7 @@ apr_status_t h2_io_out_close(h2_io *io) if (!io->bbout) { io->bbout = apr_brigade_create(io->pool, io->bucket_alloc); } - if (!io->eos_out && !h2_util_has_eos(io->bbout, 0)) { + if (!io->eos_out && !h2_util_has_eos(io->bbout, -1)) { APR_BRIGADE_INSERT_TAIL(io->bbout, apr_bucket_eos_create(io->bbout->bucket_alloc)); } diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index f318fd16a0..8c38bccdc9 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -16,7 +16,6 @@ #include #include -#include #include #include #include @@ -46,6 +45,19 @@ #include "h2_util.h" +#define H2_MPLX_IO_OUT(lvl,m,io,msg) \ + do { \ + if (APLOG_C_IS_LEVEL((m)->c,lvl)) \ + h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbout); \ + } while(0) + +#define H2_MPLX_IO_IN(lvl,m,io,msg) \ + do { \ + if (APLOG_C_IS_LEVEL((m)->c,lvl)) \ + h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbin); \ + } while(0) + + static int is_aborted(h2_mplx *m, apr_status_t *pstatus) { AP_DEBUG_ASSERT(m); if (m->aborted) { @@ -112,9 +124,9 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, h2_workers *workers) if (m) { m->id = c->id; APR_RING_ELEM_INIT(m, link); - apr_atomic_set32(&m->refs, 1); + m->refs = 1; m->c = c; - apr_pool_create_ex(&m->pool, NULL, NULL, allocator); + apr_pool_create_ex(&m->pool, parent, NULL, allocator); if (!m->pool) { return NULL; } @@ -140,30 +152,28 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, h2_workers *workers) return m; } -static void reference(h2_mplx *m) -{ - apr_atomic_inc32(&m->refs); -} - static void release(h2_mplx *m, int lock) { - if (!apr_atomic_dec32(&m->refs)) { - if (lock) { - apr_thread_mutex_lock(m->lock); - } + if (lock) { + apr_thread_mutex_lock(m->lock); + --m->refs; if (m->join_wait) { apr_thread_cond_signal(m->join_wait); } - if (lock) { - apr_thread_mutex_unlock(m->lock); - } + apr_thread_mutex_unlock(m->lock); + } + else { + --m->refs; } } void h2_mplx_reference(h2_mplx *m) { - reference(m); + apr_thread_mutex_lock(m->lock); + ++m->refs; + apr_thread_mutex_unlock(m->lock); } + void h2_mplx_release(h2_mplx *m) { release(m, 1); @@ -196,16 +206,16 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) status = apr_thread_mutex_lock(m->lock); if (APR_SUCCESS == status) { release(m, 0); - while (apr_atomic_read32(&m->refs) > 0) { + while (m->refs > 0) { m->join_wait = wait; ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, "h2_mplx(%ld): release_join, refs=%d, waiting...", m->id, m->refs); apr_thread_cond_wait(wait, m->lock); } - m->join_wait = NULL; ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, "h2_mplx(%ld): release_join -> destroy", m->id); + m->pool = NULL; apr_thread_mutex_unlock(m->lock); h2_mplx_destroy(m); } @@ -319,13 +329,15 @@ apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block, h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io && !io->orphaned) { io->input_arrived = iowait; - status = h2_io_in_read(io, bb, 0); + H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_pre"); + status = h2_io_in_read(io, bb, -1); while (APR_STATUS_IS_EAGAIN(status) && !is_aborted(m, &status) && block == APR_BLOCK_READ) { apr_thread_cond_wait(io->input_arrived, m->lock); - status = h2_io_in_read(io, bb, 0); + status = h2_io_in_read(io, bb, -1); } + H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_read_post"); io->input_arrived = NULL; } else { @@ -348,7 +360,9 @@ apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, if (APR_SUCCESS == status) { h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io && !io->orphaned) { + H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre"); status = h2_io_in_write(io, bb); + H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post"); if (io->input_arrived) { apr_thread_cond_signal(io->input_arrived); } @@ -373,6 +387,7 @@ apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id) h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io && !io->orphaned) { status = h2_io_in_close(io); + H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_close"); if (io->input_arrived) { apr_thread_cond_signal(io->input_arrived); } @@ -429,13 +444,6 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m, return status; } -#define H2_MPLX_IO_OUT(lvl,m,io,msg) \ - do { \ - if (APLOG_C_IS_LEVEL((m)->c,lvl)) \ - h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbout); \ - } while(0) - - apr_status_t h2_mplx_out_readx(h2_mplx *m, int stream_id, h2_io_data_cb *cb, void *ctx, apr_off_t *plen, int *peos) @@ -515,7 +523,9 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams) } else { AP_DEBUG_ASSERT(io->response); + H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_pre"); h2_stream_set_response(stream, io->response, io->bbout); + H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_next_submit_post"); } } @@ -879,6 +889,7 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, "h2_mplx(%ld-%d): process", m->c->id, stream_id); + H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_process"); apr_thread_mutex_unlock(m->lock); } diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 60aa74d535..5c950b9c27 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -56,7 +56,7 @@ typedef struct h2_mplx h2_mplx; struct h2_mplx { long id; APR_RING_ENTRY(h2_mplx) link; - volatile apr_uint32_t refs; + volatile int refs; conn_rec *c; apr_pool_t *pool; apr_bucket_alloc_t *bucket_alloc; diff --git a/modules/http2/h2_request.c b/modules/http2/h2_request.c index 2d8dcda27c..2a697a0eda 100644 --- a/modules/http2/h2_request.c +++ b/modules/http2/h2_request.c @@ -249,7 +249,6 @@ apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, int eos) req->id, s); return APR_EINVAL; } - req->chunked = 0; } else { /* no content-length given */ @@ -266,16 +265,72 @@ apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, int eos) /* If we have a content-type, but already see eos, no more * data will come. Signal a zero content length explicitly. */ - req->chunked = 0; apr_table_setn(req->headers, "Content-Length", "0"); } } req->eoh = 1; + /* In the presence of trailers, force behaviour of chunked encoding */ + s = apr_table_get(req->headers, "Trailer"); + if (s && s[0]) { + req->trailers = apr_table_make(pool, 5); + if (!req->chunked) { + req->chunked = 1; + apr_table_mergen(req->headers, "Transfer-Encoding", "chunked"); + } + } + return APR_SUCCESS; } +static apr_status_t add_h1_trailer(h2_request *req, apr_pool_t *pool, + const char *name, size_t nlen, + const char *value, size_t vlen) +{ + char *hname, *hvalue; + + if (H2_HD_MATCH_LIT("expect", name, nlen) + || H2_HD_MATCH_LIT("upgrade", name, nlen) + || H2_HD_MATCH_LIT("connection", name, nlen) + || H2_HD_MATCH_LIT("host", name, nlen) + || H2_HD_MATCH_LIT("proxy-connection", name, nlen) + || H2_HD_MATCH_LIT("transfer-encoding", name, nlen) + || H2_HD_MATCH_LIT("keep-alive", name, nlen) + || H2_HD_MATCH_LIT("http2-settings", name, nlen)) { + /* ignore these. */ + return APR_SUCCESS; + } + + hname = apr_pstrndup(pool, name, nlen); + hvalue = apr_pstrndup(pool, value, vlen); + h2_util_camel_case_header(hname, nlen); + + apr_table_mergen(req->trailers, hname, hvalue); + + return APR_SUCCESS; +} + + +apr_status_t h2_request_add_trailer(h2_request *req, apr_pool_t *pool, + const char *name, size_t nlen, + const char *value, size_t vlen) +{ + if (!req->trailers) { + ap_log_perror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, pool, + "h2_request(%d): unanounced trailers", + req->id); + return APR_EINVAL; + } + if (nlen == 0 || name[0] == ':') { + ap_log_perror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, pool, + "h2_request(%d): pseudo header in trailer", + req->id); + return APR_EINVAL; + } + return add_h1_trailer(req, pool, name, nlen, value, vlen); +} + #define OPT_COPY(p, s) ((s)? apr_pstrdup(p, s) : NULL) void h2_request_copy(apr_pool_t *p, h2_request *dst, const h2_request *src) diff --git a/modules/http2/h2_request.h b/modules/http2/h2_request.h index a3a8f8ead4..19005a88e6 100644 --- a/modules/http2/h2_request.h +++ b/modules/http2/h2_request.h @@ -36,6 +36,7 @@ struct h2_request { const char *path; apr_table_t *headers; + apr_table_t *trailers; apr_off_t content_length; int chunked; @@ -57,6 +58,10 @@ apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool, const char *name, size_t nlen, const char *value, size_t vlen); +apr_status_t h2_request_add_trailer(h2_request *req, apr_pool_t *pool, + const char *name, size_t nlen, + const char *value, size_t vlen); + apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, int eos); void h2_request_copy(apr_pool_t *p, h2_request *dst, const h2_request *src); diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 5a385bf19d..50c704d3df 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -142,8 +142,8 @@ static int stream_pri_cmp(int sid1, int sid2, void *ctx) return spri_cmp(sid1, s1, sid2, s2, session); } -static apr_status_t stream_end_headers(h2_session *session, - h2_stream *stream, int eos) +static apr_status_t stream_schedule(h2_session *session, + h2_stream *stream, int eos) { (void)session; return h2_stream_schedule(stream, eos, stream_pri_cmp, session); @@ -198,19 +198,19 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len, void *userp) { - int rv; h2_session *session = (h2_session *)userp; + apr_status_t status = APR_SUCCESS; h2_stream * stream; - apr_status_t status; + int rv; (void)flags; if (session->aborted) { return NGHTTP2_ERR_CALLBACK_FAILURE; } + stream = h2_session_get_stream(session, stream_id); if (!stream) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c, - APLOGNO(02919) + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, "h2_session: stream(%ld-%d): on_data_chunk for unknown stream", session->id, (int)stream_id); rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id, @@ -223,8 +223,8 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags, status = h2_stream_write_data(stream, (const char *)data, len); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, - "h2_stream(%ld-%d): written DATA, length %d", - session->id, stream_id, (int)len); + "h2_stream(%ld-%d): data_chunk_recv, written %ld bytes", + session->id, stream_id, (long)len); if (status != APR_SUCCESS) { rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id, H2_STREAM_RST(stream, H2_ERR_INTERNAL_ERROR)); @@ -274,24 +274,25 @@ static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id, if (stream) { stream_release(session, stream, error_code); } - - if (error_code) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_stream(%ld-%d): closed, error=%d", - session->id, (int)stream_id, error_code); - } - return 0; } static int on_begin_headers_cb(nghttp2_session *ngh2, const nghttp2_frame *frame, void *userp) { + h2_session *session = (h2_session *)userp; h2_stream *s; - /* This starts a new stream. */ + /* We may see HEADERs at the start of a stream or after all DATA + * streams to carry trailers. */ (void)ngh2; - s = h2_session_open_stream((h2_session *)userp, frame->hd.stream_id); + s = h2_session_get_stream(session, frame->hd.stream_id); + if (s) { + /* nop */ + } + else { + s = h2_session_open_stream((h2_session *)userp, frame->hd.stream_id); + } return s? 0 : NGHTTP2_ERR_CALLBACK_FAILURE; } @@ -347,16 +348,34 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, } ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_session(%ld): on_frame_rcv #%ld, type=%d", session->id, + "h2_stream(%ld-%d): on_frame_rcv #%ld, type=%d", + session->id, frame->hd.stream_id, (long)session->frames_received, frame->hd.type); ++session->frames_received; switch (frame->hd.type) { case NGHTTP2_HEADERS: + /* This can be HEADERS for a new stream, defining the request, + * or HEADER may come after DATA at the end of a stream as in + * trailers */ stream = h2_session_get_stream(session, frame->hd.stream_id); if (stream) { int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM); - status = stream_end_headers(session, stream, eos); + + if (h2_stream_is_scheduled(stream)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "h2_stream(%ld-%d): TRAILER, eos=%d", + session->id, frame->hd.stream_id, eos); + if (eos) { + status = h2_stream_close_input(stream); + } + } + else { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "h2_stream(%ld-%d): HEADER, eos=%d", + session->id, frame->hd.stream_id, eos); + status = stream_schedule(session, stream, eos); + } } else { status = APR_EINVAL; @@ -366,6 +385,10 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, stream = h2_session_get_stream(session, frame->hd.stream_id); if (stream) { int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "h2_stream(%ld-%d): DATA, len=%ld, eos=%d", + session->id, frame->hd.stream_id, + (long)frame->hd.length, eos); if (eos) { status = h2_stream_close_input(stream); } @@ -672,10 +695,6 @@ static void h2_session_cleanup(h2_session *session) * our buffers or passed down output filters. * h2 streams might still being written out. */ - if (session->mplx) { - h2_mplx_release_and_join(session->mplx, session->iowait); - session->mplx = NULL; - } if (session->ngh2) { nghttp2_session_del(session->ngh2); session->ngh2 = NULL; @@ -684,6 +703,10 @@ static void h2_session_cleanup(h2_session *session) apr_pool_destroy(session->spare); session->spare = NULL; } + if (session->mplx) { + h2_mplx_release_and_join(session->mplx, session->iowait); + session->mplx = NULL; + } } void h2_session_destroy(h2_session *session) @@ -843,7 +866,7 @@ apr_status_t h2_session_start(h2_session *session, int *rv) if (status != APR_SUCCESS) { return status; } - status = stream_end_headers(session, stream, 1); + status = stream_schedule(session, stream, 1); if (status != APR_SUCCESS) { return status; } @@ -1173,7 +1196,7 @@ struct h2_stream *h2_session_push(h2_session *session, h2_stream *is, stream = h2_session_open_stream(session, nid); if (stream) { h2_stream_set_h2_request(stream, is->id, push->req); - status = stream_end_headers(session, stream, 1); + status = stream_schedule(session, stream, 1); if (status != APR_SUCCESS) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, "h2_stream(%ld-%d): scheduling push stream", diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index fd3ef35845..594175a785 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -39,6 +39,18 @@ #include "h2_util.h" +#define H2_STREAM_OUT(lvl,s,msg) \ + do { \ + if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \ + h2_util_bb_log((s)->session->c,(s)->id,lvl,msg,(s)->bbout); \ + } while(0) +#define H2_STREAM_IN(lvl,s,msg) \ + do { \ + if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \ + h2_util_bb_log((s)->session->c,(s)->id,lvl,msg,(s)->bbin); \ + } while(0) + + static int state_transition[][7] = { /* ID OP RL RR CI CO CL */ /*ID*/{ 1, 0, 0, 0, 0, 0, 0 }, @@ -198,6 +210,9 @@ apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response, { apr_status_t status = APR_SUCCESS; if (!output_open(stream)) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c, + "h2_stream(%ld-%d): output closed", + stream->session->id, stream->id); return APR_ECONNRESET; } @@ -207,18 +222,15 @@ apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response, /* we can move file handles from h2_mplx into this h2_stream as many * as we want, since the lifetimes are the same and we are not freeing * the ones in h2_mplx->io before this stream is done. */ - status = h2_util_move(stream->bbout, bb, 16 * 1024, &move_all, + H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream set_response_pre"); + status = h2_util_move(stream->bbout, bb, -1, &move_all, "h2_stream_set_response"); + H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream set_response_post"); } - if (APLOGctrace1(stream->session->c)) { - apr_off_t len = 0; - int eos = 0; - h2_util_bb_avail(stream->bbout, &len, &eos); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->session->c, - "h2_stream(%ld-%d): set_response(%d), len=%ld, eos=%d", - stream->session->id, stream->id, response->http_status, - (long)len, (int)eos); - } + + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->session->c, + "h2_stream(%ld-%d): set_response(%d)", + stream->session->id, stream->id, response->http_status); return status; } @@ -247,11 +259,17 @@ apr_status_t h2_stream_add_header(h2_stream *stream, const char *value, size_t vlen) { AP_DEBUG_ASSERT(stream); - if (!input_open(stream)) { - return APR_ECONNRESET; + if (h2_stream_is_scheduled(stream)) { + return h2_request_add_trailer(stream->request, stream->pool, + name, nlen, value, vlen); + } + else { + if (!input_open(stream)) { + return APR_ECONNRESET; + } + return h2_request_add_header(stream->request, stream->pool, + name, nlen, value, vlen); } - return h2_request_add_header(stream->request, stream->pool, - name, nlen, value, vlen); } apr_status_t h2_stream_schedule(h2_stream *stream, int eos, @@ -265,6 +283,9 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, if (!output_open(stream)) { return APR_ECONNRESET; } + if (stream->scheduled) { + return APR_EINVAL; + } if (eos) { close_input(stream); } @@ -282,6 +303,7 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, status = h2_mplx_process(stream->session->mplx, stream->id, stream->request, eos, cmp, ctx); + stream->scheduled = 1; } else { h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR); @@ -296,6 +318,11 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, return status; } +int h2_stream_is_scheduled(h2_stream *stream) +{ + return stream->scheduled; +} + static apr_status_t h2_stream_input_flush(h2_stream *stream) { apr_status_t status = APR_SUCCESS; @@ -318,18 +345,26 @@ static apr_status_t input_flush(apr_bucket_brigade *bb, void *ctx) } static apr_status_t input_add_data(h2_stream *stream, - const char *data, size_t len) + const char *data, size_t len, int chunked) { apr_status_t status = APR_SUCCESS; - - status = apr_brigade_write(stream->bbin, input_flush, stream, data, len); - if (status == APR_SUCCESS) { - status = h2_stream_input_flush(stream); + + if (chunked) { + status = apr_brigade_printf(stream->bbin, input_flush, stream, + "%lx\r\n", (unsigned long)len); + if (status == APR_SUCCESS) { + status = apr_brigade_write(stream->bbin, input_flush, stream, data, len); + if (status == APR_SUCCESS) { + status = apr_brigade_puts(stream->bbin, input_flush, stream, "\r\n"); + } + } + } + else { + status = apr_brigade_write(stream->bbin, input_flush, stream, data, len); } return status; } - apr_status_t h2_stream_close_input(h2_stream *stream) { apr_status_t status = APR_SUCCESS; @@ -342,9 +377,11 @@ apr_status_t h2_stream_close_input(h2_stream *stream) if (stream->rst_error) { return APR_ECONNRESET; } + + H2_STREAM_IN(APLOG_TRACE2, stream, "close_pre"); if (close_input(stream) && stream->bbin) { if (stream->request->chunked) { - status = input_add_data(stream, "0\r\n\r\n", 5); + status = input_add_data(stream, "0\r\n\r\n", 5, 0); } if (status == APR_SUCCESS) { @@ -354,16 +391,21 @@ apr_status_t h2_stream_close_input(h2_stream *stream) status = h2_mplx_in_close(stream->session->mplx, stream->id); } } + H2_STREAM_IN(APLOG_TRACE2, stream, "close_post"); return status; } apr_status_t h2_stream_write_data(h2_stream *stream, const char *data, size_t len) { - apr_status_t status; + apr_status_t status = APR_SUCCESS; AP_DEBUG_ASSERT(stream); if (input_closed(stream) || !stream->request->eoh || !stream->bbin) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c, + "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d, bbin=%d", + stream->session->id, stream->id, input_closed(stream), + stream->request->eoh, !!stream->bbin); return APR_EINVAL; } @@ -371,19 +413,12 @@ apr_status_t h2_stream_write_data(h2_stream *stream, "h2_stream(%ld-%d): add %ld input bytes", stream->session->id, stream->id, (long)len); + H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_pre"); if (stream->request->chunked) { /* if input may have a body and we have not seen any * content-length header, we need to chunk the input data. */ - status = apr_brigade_printf(stream->bbin, NULL, NULL, - "%lx\r\n", (unsigned long)len); - if (status == APR_SUCCESS) { - status = input_add_data(stream, data, len); - if (status == APR_SUCCESS) { - status = apr_brigade_puts(stream->bbin, NULL, NULL, "\r\n"); - } - } - return status; + status = input_add_data(stream, data, len, 1); } else { stream->input_remaining -= len; @@ -398,8 +433,13 @@ apr_status_t h2_stream_write_data(h2_stream *stream, h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR); return APR_ECONNABORTED; } - return input_add_data(stream, data, len); + status = input_add_data(stream, data, len, 0); } + if (status == APR_SUCCESS) { + status = h2_stream_input_flush(stream); + } + H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_post"); + return status; } apr_status_t h2_stream_prep_read(h2_stream *stream, @@ -407,15 +447,17 @@ apr_status_t h2_stream_prep_read(h2_stream *stream, { apr_status_t status = APR_SUCCESS; const char *src; + int test_read = (*plen == 0); if (stream->rst_error) { return APR_ECONNRESET; } + H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream prep_read_pre"); if (!APR_BRIGADE_EMPTY(stream->bbout)) { src = "stream"; status = h2_util_bb_avail(stream->bbout, plen, peos); - if (status == APR_SUCCESS && !*peos && !*plen) { + if (!test_read && status == APR_SUCCESS && !*peos && !*plen) { apr_brigade_cleanup(stream->bbout); return h2_stream_prep_read(stream, plen, peos); } @@ -425,9 +467,10 @@ apr_status_t h2_stream_prep_read(h2_stream *stream, status = h2_mplx_out_readx(stream->session->mplx, stream->id, NULL, NULL, plen, peos); } - if (status == APR_SUCCESS && !*peos && !*plen) { + if (!test_read && status == APR_SUCCESS && !*peos && !*plen) { status = APR_EAGAIN; } + H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream prep_read_post"); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c, "h2_stream(%ld-%d): prep_read %s, len=%ld eos=%d", stream->session->id, stream->id, src, (long)*plen, *peos); @@ -441,6 +484,7 @@ apr_status_t h2_stream_readx(h2_stream *stream, apr_status_t status = APR_SUCCESS; const char *src; + H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream readx_pre"); if (stream->rst_error) { return APR_ECONNRESET; } @@ -466,9 +510,12 @@ apr_status_t h2_stream_readx(h2_stream *stream, status = APR_EAGAIN; } + H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream prep_readx_post"); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c, "h2_stream(%ld-%d): readx %s, len=%ld eos=%d", stream->session->id, stream->id, src, (long)*plen, *peos); + H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream readx_post"); + return status; } @@ -477,6 +524,7 @@ apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, { apr_status_t status = APR_SUCCESS; + H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream read_to_pre"); if (stream->rst_error) { return APR_ECONNRESET; } @@ -500,6 +548,7 @@ apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, if (status == APR_SUCCESS && !*peos && !*plen) { status = APR_EAGAIN; } + H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream read_to_post"); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c, "h2_stream(%ld-%d): read_to, len=%ld eos=%d", stream->session->id, stream->id, (long)*plen, *peos); diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h index 225f86b2a4..e5990a2bf3 100644 --- a/modules/http2/h2_stream.h +++ b/modules/http2/h2_stream.h @@ -61,7 +61,7 @@ struct h2_stream { int aborted; /* was aborted */ int suspended; /* DATA sending has been suspended */ int rst_error; /* stream error for RST_STREAM */ - int req_eoh; /* request HEADERs have been received */ + int scheduled; /* stream has been scheduled */ int submitted; /* response HEADER has been sent */ apr_off_t input_remaining; /* remaining bytes on input as advertised via content-length */ @@ -135,7 +135,8 @@ void h2_stream_set_h2_request(h2_stream *stream, int initiated_on, const struct h2_request *req); /* - * Add a HTTP/2 header (including pseudo headers) to the given stream. + * Add a HTTP/2 header (including pseudo headers) or trailer + * to the given stream, depending on stream state. * * @param stream stream to write the header to * @param name the name of the HTTP/2 header @@ -185,6 +186,13 @@ void h2_stream_rst(h2_stream *streamm, int error_code); apr_status_t h2_stream_schedule(h2_stream *stream, int eos, h2_stream_pri_cmp *cmp, void *ctx); +/** + * Determine if stream has been scheduled already. + * @param stream the stream to check on + * @return != 0 iff stream has been scheduled + */ +int h2_stream_is_scheduled(h2_stream *stream); + /** * Set the response for this stream. Invoked when all meta data for * the stream response has been collected. diff --git a/modules/http2/h2_task_input.c b/modules/http2/h2_task_input.c index 09572b9a34..49be7cfd22 100644 --- a/modules/http2/h2_task_input.c +++ b/modules/http2/h2_task_input.c @@ -145,7 +145,7 @@ apr_status_t h2_task_input_read(h2_task_input *input, return status; } if ((bblen == 0) && (block == APR_NONBLOCK_READ)) { - return h2_util_has_eos(input->bb, 0)? APR_EOF : APR_EAGAIN; + return h2_util_has_eos(input->bb, -1)? APR_EOF : APR_EAGAIN; } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, "h2_task_input(%s): mplx in read, %ld bytes in brigade", diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index 963d9ae354..e80a026880 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -229,7 +229,7 @@ static apr_status_t last_not_included(apr_bucket_brigade *bb, apr_status_t status = APR_SUCCESS; int files_allowed = pfile_buckets_allowed? *pfile_buckets_allowed : 0; - if (maxlen > 0) { + if (maxlen >= 0) { /* Find the bucket, up to which we reach maxlen/mem bytes */ for (b = APR_BRIGADE_FIRST(bb); (b != APR_BRIGADE_SENTINEL(bb)); @@ -555,24 +555,17 @@ apr_status_t h2_util_bb_avail(apr_bucket_brigade *bb, else if (blen == 0) { /* empty brigade, does it have an EOS bucket somwhere? */ *plen = 0; - *peos = h2_util_has_eos(bb, 0); + *peos = h2_util_has_eos(bb, -1); } - else if (blen > 0) { + else { /* data in the brigade, limit the length returned. Check for EOS * bucket only if we indicate data. This is required since plen == 0 * means "the whole brigade" for h2_util_hash_eos() */ - if (blen < (apr_off_t)*plen) { + if (blen < *plen || *plen < 0) { *plen = blen; } - *peos = (*plen > 0)? h2_util_has_eos(bb, *plen) : 0; - } - else if (blen < 0) { - /* famous SHOULD NOT HAPPEN, sinc we told apr_brigade_length to readall - */ - *plen = 0; - *peos = h2_util_has_eos(bb, 0); - return APR_EINVAL; + *peos = h2_util_has_eos(bb, *plen); } return APR_SUCCESS; }