1
0
mirror of https://github.com/apache/httpd.git synced 2026-01-06 09:01:14 +03:00

On the trunk:

mod_http2: rework of stream states and cleanup handling.



git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1782875 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Stefan Eissing
2017-02-13 21:00:30 +00:00
parent e775ad73d9
commit ba655809af
16 changed files with 1246 additions and 1286 deletions

View File

@@ -1,6 +1,11 @@
-*- coding: utf-8 -*-
Changes with Apache 2.5.0
*) mod_http2: http/2 streams now with state handling/transitions as defined
in RFC7540. Stream cleanup/connection shutdown reworked to become easier
to understand/maintain/debug. Added many asserts on state and cleanup
transitions. [Stefan Eissing]
*) mod_proxy_fcgi: Add ProxyFCGISetEnvIf to fixup CGI environment
variables just before invoking the FastCGI. [Eric Covener,
Jacob Champion]

View File

@@ -80,22 +80,13 @@ typedef enum {
H2_PUSH_FAST_LOAD,
} h2_push_policy;
typedef enum {
H2_STREAM_ST_IDLE,
H2_STREAM_ST_OPEN,
H2_STREAM_ST_RESV_LOCAL,
H2_STREAM_ST_RESV_REMOTE,
H2_STREAM_ST_CLOSED_INPUT,
H2_STREAM_ST_CLOSED_OUTPUT,
H2_STREAM_ST_CLOSED,
} h2_stream_state_t;
typedef enum {
H2_SESSION_ST_INIT, /* send initial SETTINGS, etc. */
H2_SESSION_ST_DONE, /* finished, connection close */
H2_SESSION_ST_IDLE, /* nothing to write, expecting data inc */
H2_SESSION_ST_BUSY, /* read/write without stop */
H2_SESSION_ST_WAIT, /* waiting for tasks reporting back */
H2_SESSION_ST_CLEANUP, /* pool is being cleaned up */
} h2_session_state;
typedef struct h2_session_props {
@@ -108,6 +99,25 @@ typedef struct h2_session_props {
unsigned int shutdown : 1; /* if the final GOAWAY has been sent */
} h2_session_props;
typedef enum h2_stream_state_t {
H2_SS_IDLE,
H2_SS_RSVD_R,
H2_SS_RSVD_L,
H2_SS_OPEN,
H2_SS_CLOSED_R,
H2_SS_CLOSED_L,
H2_SS_CLOSED,
H2_SS_CLEANUP,
H2_SS_MAX
} h2_stream_state_t;
typedef enum {
H2_SEV_CLOSED_L,
H2_SEV_CLOSED_R,
H2_SEV_CANCELLED,
H2_SEV_EOS_SENT
} h2_stream_event_t;
/* h2_request is the transformer of HTTP2 streams into HTTP/1.1 internal
* format that will be fed to various httpd input filters to finally

View File

@@ -191,9 +191,9 @@ static apr_bucket *h2_beam_bucket(h2_bucket_beam *beam,
}
apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax,
const char *tag, const char *sep,
h2_blist *bl)
static apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax,
const char *tag, const char *sep,
h2_blist *bl)
{
apr_size_t off = 0;
const char *sp = "";
@@ -203,13 +203,15 @@ apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax,
memset(buffer, 0, bmax--);
off += apr_snprintf(buffer+off, bmax-off, "%s(", tag);
for (b = H2_BLIST_FIRST(bl);
bmax && (b != H2_BLIST_SENTINEL(bl));
(bmax > off) && (b != H2_BLIST_SENTINEL(bl));
b = APR_BUCKET_NEXT(b)) {
off += h2_util_bucket_print(buffer+off, bmax-off, b, sp);
sp = " ";
}
off += apr_snprintf(buffer+off, bmax-off, ")%s", sep);
if (bmax > off) {
off += apr_snprintf(buffer+off, bmax-off, ")%s", sep);
}
}
else {
off += apr_snprintf(buffer+off, bmax-off, "%s(null)%s", tag, sep);
@@ -426,8 +428,10 @@ static apr_status_t beam_close(h2_bucket_beam *beam)
return APR_SUCCESS;
}
static void beam_set_send_pool(h2_bucket_beam *beam, apr_pool_t *pool);
static void beam_set_recv_pool(h2_bucket_beam *beam, apr_pool_t *pool);
int h2_beam_is_closed(h2_bucket_beam *beam)
{
return beam->closed;
}
static int pool_register(h2_bucket_beam *beam, apr_pool_t *pool,
apr_status_t (*cleanup)(void *))
@@ -457,20 +461,6 @@ static apr_status_t beam_recv_cleanup(void *data)
return APR_SUCCESS;
}
static void beam_set_recv_pool(h2_bucket_beam *beam, apr_pool_t *pool)
{
if (beam->recv_pool == pool ||
(beam->recv_pool && pool
&& apr_pool_is_ancestor(beam->recv_pool, pool))) {
/* when receiver same or sub-pool of existing, stick
* to the the pool we already have. */
return;
}
pool_kill(beam, beam->recv_pool, beam_recv_cleanup);
beam->recv_pool = pool;
pool_register(beam, beam->recv_pool, beam_recv_cleanup);
}
static apr_status_t beam_send_cleanup(void *data)
{
h2_bucket_beam *beam = data;
@@ -683,6 +673,21 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam)
return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
}
apr_status_t h2_beam_leave(h2_bucket_beam *beam)
{
h2_beam_lock bl;
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
apr_brigade_cleanup(beam->recv_buffer);
}
beam->aborted = 1;
beam_close(beam);
leave_yellow(beam, &bl);
}
return APR_SUCCESS;
}
apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block)
{
apr_status_t status;
@@ -919,7 +924,6 @@ transfer:
}
/* transfer enough buckets from our receiver brigade, if we have one */
beam_set_recv_pool(beam, bb->p);
while (beam->recv_buffer
&& !APR_BRIGADE_EMPTY(beam->recv_buffer)
&& (readbytes <= 0 || remain >= 0)) {
@@ -1229,3 +1233,29 @@ int h2_beam_report_consumption(h2_bucket_beam *beam)
return 0;
}
void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg)
{
if (0 && beam && APLOG_C_IS_LEVEL(c,level)) {
char buffer[2048];
apr_size_t blen = sizeof(buffer)/sizeof(buffer[0]) - 1;
apr_size_t off = 0;
buffer[0] = 0;
off += apr_snprintf(buffer+off, blen-off, "cl=%d, ", beam->closed);
off += h2_util_bl_print(buffer+off, blen-off, "to_send", ", ", &beam->send_list);
if (blen > off) {
off += h2_util_bb_print(buffer+off, blen-off, "recv_buffer", ", ", beam->recv_buffer);
if (blen > off) {
off += h2_util_bl_print(buffer+off, blen-off, "hold", ", ", &beam->hold_list);
if (blen > off) {
off += h2_util_bl_print(buffer+off, blen-off, "purge", "", &beam->purge_list);
}
}
}
buffer[blen-1] = 0;
ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d,%s): %s %s",
c->id, beam->id, beam->tag, msg, buffer);
}
}

View File

@@ -51,19 +51,6 @@ typedef struct {
APR_RING_PREPEND(&(a)->list, &(b)->list, apr_bucket, link); \
} while (0)
/**
* Print the buckets in the list into the buffer (type and lengths).
* @param buffer the buffer to print into
* @param bmax max number of characters to place in buffer, incl. trailing 0
* @param tag tag string for this bucket list
* @param sep separator to use
* @param bl the bucket list to print
* @return number of characters printed
*/
apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax,
const char *tag, const char *sep,
h2_blist *bl);
/*******************************************************************************
* h2_bucket_beam
******************************************************************************/
@@ -305,6 +292,16 @@ void h2_beam_abort(h2_bucket_beam *beam);
*/
apr_status_t h2_beam_close(h2_bucket_beam *beam);
/**
* Receives leaves the beam, e.g. will no longer read. This will
* interrupt any sender blocked writing and fail future send.
*
* Call from the receiver side only.
*/
apr_status_t h2_beam_leave(h2_bucket_beam *beam);
int h2_beam_is_closed(h2_bucket_beam *beam);
/**
* Return APR_SUCCESS when all buckets in transit have been handled.
* When called with APR_BLOCK_READ and a mutex set, will wait until the green
@@ -401,4 +398,6 @@ typedef apr_bucket *h2_bucket_beamer(h2_bucket_beam *beam,
void h2_register_bucket_beamer(h2_bucket_beamer *beamer);
void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg);
#endif /* h2_bucket_beam_h */

View File

@@ -95,7 +95,7 @@ static void bucket_destroy(void *data)
}
apr_bucket_free(h);
if (stream) {
h2_stream_eos_destroy(stream);
h2_stream_dispatch(stream, H2_SEV_EOS_SENT);
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -72,13 +72,13 @@ struct h2_mplx {
unsigned int need_registration : 1;
struct h2_ihash_t *streams; /* all streams currently processing */
struct h2_ihash_t *sredo; /* all streams that need to be re-started */
struct h2_ihash_t *shold; /* all streams done with task ongoing */
struct h2_ihash_t *spurge; /* all streams done, ready for destroy */
struct h2_iqueue *q; /* all stream ids that need to be started */
struct h2_iqueue *readyq; /* all stream ids ready for output */
struct h2_ihash_t *tasks; /* all tasks started and not destroyed */
struct h2_ihash_t *redo_tasks; /* all tasks that need to be redone */
int max_streams; /* max # of concurrent streams */
@@ -137,13 +137,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *master,
* @param m the mplx to be released and destroyed
* @param wait condition var to wait on for ref counter == 0
*/
apr_status_t h2_mplx_release_and_join(h2_mplx *m, struct apr_thread_cond_t *wait);
/**
* Aborts the multiplexer. It will answer all future invocation with
* APR_ECONNABORTED, leading to early termination of ongoing streams.
*/
void h2_mplx_abort(h2_mplx *mplx);
void h2_mplx_release_and_join(h2_mplx *m, struct apr_thread_cond_t *wait);
struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, int *has_more);
@@ -165,14 +159,13 @@ int h2_mplx_is_busy(h2_mplx *m);
struct h2_stream *h2_mplx_stream_get(h2_mplx *m, int id);
/**
* Notifies mplx that a stream has finished processing.
* Notifies mplx that a stream has been completely handled on the main
* connection and is ready for cleanup.
*
* @param m the mplx itself
* @param stream the id of the stream being done
* @param rst_error if != 0, the stream was reset with the error given
*
* @param stream the stream ready for cleanup
*/
apr_status_t h2_mplx_stream_done(h2_mplx *m, struct h2_stream *stream);
apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, struct h2_stream *stream);
/**
* Waits on output data from any stream in this session to become available.

View File

@@ -40,7 +40,7 @@ typedef struct h2_proxy_stream {
const char *p_server_uri;
int standalone;
h2_stream_state_t state;
h2_proxy_stream_state_t state;
unsigned int suspended : 1;
unsigned int waiting_on_100 : 1;
unsigned int waiting_on_ping : 1;

View File

@@ -23,6 +23,16 @@
struct h2_proxy_iqueue;
struct h2_proxy_ihash_t;
typedef enum {
H2_STREAM_ST_IDLE,
H2_STREAM_ST_OPEN,
H2_STREAM_ST_RESV_LOCAL,
H2_STREAM_ST_RESV_REMOTE,
H2_STREAM_ST_CLOSED_INPUT,
H2_STREAM_ST_CLOSED_OUTPUT,
H2_STREAM_ST_CLOSED,
} h2_proxy_stream_state_t;
typedef enum {
H2_PROXYS_ST_INIT, /* send initial SETTINGS, etc. */
H2_PROXYS_ST_DONE, /* finished, connection close */

View File

@@ -48,6 +48,11 @@
static apr_status_t dispatch_master(h2_session *session);
static apr_status_t h2_session_read(h2_session *session, int block);
static void transit(h2_session *session, const char *action,
h2_session_state nstate);
static void on_stream_state_enter(void *ctx, h2_stream *stream);
static void on_stream_state_event(void *ctx, h2_stream *stream, h2_stream_event_t ev);
static int h2_session_status_from_apr_status(apr_status_t rv)
{
@@ -79,58 +84,30 @@ static apr_status_t h2_session_receive(void *ctx,
static void dispatch_event(h2_session *session, h2_session_event_t ev,
int err, const char *msg);
apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
static int rst_unprocessed_stream(h2_stream *stream, void *ctx)
{
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
"h2_stream(%ld-%d): EOS bucket cleanup -> done",
session->id, stream->id);
h2_mplx_stream_done(session->mplx, stream);
dispatch_event(session, H2_SESSION_EV_STREAM_DONE, 0, NULL);
return APR_SUCCESS;
}
typedef struct stream_sel_ctx {
h2_session *session;
h2_stream *candidate;
} stream_sel_ctx;
static int find_unprocessed_stream(h2_stream *stream, void *ictx)
{
stream_sel_ctx *ctx = ictx;
if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
if (!ctx->session->local.accepting
&& stream->id > ctx->session->local.accepted_max) {
ctx->candidate = stream;
return 0;
}
}
else {
if (!ctx->session->remote.accepting
&& stream->id > ctx->session->remote.accepted_max) {
ctx->candidate = stream;
return 0;
}
int unprocessed = (!h2_stream_was_closed(stream)
&& (H2_STREAM_CLIENT_INITIATED(stream->id)?
(!stream->session->local.accepting
&& stream->id > stream->session->local.accepted_max)
:
(!stream->session->remote.accepting
&& stream->id > stream->session->remote.accepted_max))
);
if (unprocessed) {
h2_stream_rst(stream, H2_ERR_NO_ERROR);
return 0;
}
return 1;
}
static void cleanup_unprocessed_streams(h2_session *session)
{
stream_sel_ctx ctx;
ctx.session = session;
while (1) {
ctx.candidate = NULL;
h2_mplx_stream_do(session->mplx, find_unprocessed_stream, &ctx);
if (!ctx.candidate) {
break;
}
h2_session_stream_done(session, ctx.candidate);
}
h2_mplx_stream_do(session->mplx, rst_unprocessed_stream, session);
}
h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
int initiated_on, const h2_request *req)
static h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
int initiated_on)
{
h2_stream * stream;
apr_pool_t *stream_pool;
@@ -138,29 +115,11 @@ h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
apr_pool_create(&stream_pool, session->pool);
apr_pool_tag(stream_pool, "h2_stream");
stream = h2_stream_open(stream_id, stream_pool, session,
initiated_on);
nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
if (req) {
h2_stream_set_request(stream, req);
stream = h2_stream_create(stream_id, stream_pool, session,
session->monitor, initiated_on);
if (stream) {
nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
}
if (H2_STREAM_CLIENT_INITIATED(stream_id)) {
if (stream_id > session->remote.emitted_max) {
++session->remote.emitted_count;
session->remote.emitted_max = stream->id;
session->local.accepted_max = stream->id;
}
}
else {
if (stream_id > session->local.emitted_max) {
++session->local.emitted_count;
session->remote.emitted_max = stream->id;
}
}
dispatch_event(session, H2_SESSION_EV_STREAM_OPEN, 0, NULL);
return stream;
}
@@ -217,14 +176,6 @@ static int stream_pri_cmp(int sid1, int sid2, void *ctx)
return spri_cmp(sid1, s1, sid2, s2, session);
}
static apr_status_t stream_schedule(h2_session *session,
h2_stream *stream, int eos)
{
(void)session;
return h2_stream_schedule(stream, eos, h2_session_push_enabled(session),
stream_pri_cmp, session);
}
/*
* Callback when nghttp2 wants to send bytes back to the client.
*/
@@ -234,9 +185,9 @@ static ssize_t send_cb(nghttp2_session *ngh2,
{
h2_session *session = (h2_session *)userp;
apr_status_t status;
(void)ngh2;
(void)flags;
status = h2_conn_io_write(&session->io, (const char *)data, length);
if (status == APR_SUCCESS) {
return length;
@@ -278,76 +229,26 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
const uint8_t *data, size_t len, void *userp)
{
h2_session *session = (h2_session *)userp;
apr_status_t status = APR_SUCCESS;
apr_status_t status = APR_EINVAL;
h2_stream * stream;
int rv;
int rv = 0;
(void)flags;
stream = get_stream(session, stream_id);
if (!stream) {
if (stream) {
status = h2_stream_recv_DATA(stream, flags, data, len);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03064)
"h2_stream(%ld-%d): on_data_chunk for unknown stream",
session->id, (int)stream_id);
rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id,
NGHTTP2_INTERNAL_ERROR);
if (nghttp2_is_fatal(rv)) {
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
return 0;
rv = NGHTTP2_ERR_CALLBACK_FAILURE;
}
/* FIXME: enabling setting EOS this way seems to break input handling
* in mod_proxy_http2. why? */
status = h2_stream_write_data(stream, (const char *)data, len,
0 /*flags & NGHTTP2_FLAG_END_STREAM*/);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
"h2_stream(%ld-%d): data_chunk_recv, written %ld bytes",
session->id, stream_id, (long)len);
if (status != APR_SUCCESS) {
update_window(session, stream_id, len);
rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id,
H2_STREAM_RST(stream, H2_ERR_INTERNAL_ERROR));
if (nghttp2_is_fatal(rv)) {
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
}
return 0;
}
static apr_status_t stream_closed(h2_session *session,
h2_stream *stream,
uint32_t error_code)
{
conn_rec *c = session->c;
apr_bucket *b;
apr_status_t status;
if (!error_code) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_stream(%ld-%d): handled, closing",
session->id, (int)stream->id);
if (H2_STREAM_CLIENT_INITIATED(stream->id)
&& stream->id > session->local.completed_max) {
session->local.completed_max = stream->id;
}
if (status != APR_SUCCESS) {
/* count this as consumed explicitly as no one will read it */
nghttp2_session_consume(session->ngh2, stream_id, len);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(03065)
"h2_stream(%ld-%d): closing with err=%d %s",
session->id, (int)stream->id, (int)error_code,
h2_h2_err_description(error_code));
h2_stream_rst(stream, error_code);
}
/* The stream might have data in the buffers of the main connection.
* We can only free the allocated resources once all had been written.
* Send a special buckets on the connection that gets destroyed when
* all preceding data has been handled. On its destruction, it is safe
* to purge all resources of the stream. */
b = h2_bucket_eos_create(c->bucket_alloc, stream);
APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
status = h2_conn_io_pass(&session->io, session->bbtmp);
apr_brigade_cleanup(session->bbtmp);
return status;
return rv;
}
static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
@@ -359,7 +260,12 @@ static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
(void)ngh2;
stream = get_stream(session, stream_id);
if (stream) {
stream_closed(session, stream, error_code);
if (error_code) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03065)
H2_STREAM_MSG(stream, "closing with err=%d %s"),
(int)error_code, h2_h2_err_description(error_code));
h2_stream_rst(stream, error_code);
}
}
return 0;
}
@@ -378,7 +284,7 @@ static int on_begin_headers_cb(nghttp2_session *ngh2,
/* nop */
}
else {
s = h2_session_open_stream(userp, frame->hd.stream_id, 0, NULL);
s = h2_session_open_stream(userp, frame->hd.stream_id, 0);
}
return s? 0 : NGHTTP2_ERR_START_STREAM_NOT_ALLOWED;
}
@@ -405,14 +311,7 @@ static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame,
status = h2_stream_add_header(stream, (const char *)name, namelen,
(const char *)value, valuelen);
if (status == APR_ECONNRESET) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
"h2-stream(%ld-%d): on_header, reset stream",
session->id, stream->id);
nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream->id,
NGHTTP2_INTERNAL_ERROR);
}
else if (status != APR_SUCCESS && !h2_stream_is_ready(stream)) {
if (status != APR_SUCCESS && !h2_stream_is_ready(stream)) {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
return 0;
@@ -428,7 +327,6 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
void *userp)
{
h2_session *session = (h2_session *)userp;
apr_status_t status = APR_SUCCESS;
h2_stream *stream;
if (APLOGcdebug(session->c)) {
@@ -449,41 +347,16 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
* trailers */
stream = get_stream(session, frame->hd.stream_id);
if (stream) {
int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
if (h2_stream_is_scheduled(stream)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_stream(%ld-%d): TRAILER, eos=%d",
session->id, frame->hd.stream_id, eos);
if (eos) {
status = h2_stream_close_input(stream);
}
}
else {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_stream(%ld-%d): HEADER, eos=%d",
session->id, frame->hd.stream_id, eos);
status = stream_schedule(session, stream, eos);
}
}
else {
status = APR_EINVAL;
h2_stream_recv_frame(stream, NGHTTP2_HEADERS, frame->hd.flags);
}
break;
case NGHTTP2_DATA:
stream = get_stream(session, frame->hd.stream_id);
if (stream) {
int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_stream(%ld-%d): DATA, len=%ld, eos=%d",
session->id, frame->hd.stream_id,
(long)frame->hd.length, eos);
if (eos) {
status = h2_stream_close_input(stream);
}
}
else {
status = APR_EINVAL;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(02923)
H2_STREAM_MSG(stream, "DATA, len=%ld, flags=%d"),
(long)frame->hd.length, frame->hd.flags);
h2_stream_recv_frame(stream, NGHTTP2_DATA, frame->hd.flags);
}
break;
case NGHTTP2_PRIORITY:
@@ -539,22 +412,6 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
}
break;
}
if (status != APR_SUCCESS) {
int rv;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
APLOGNO(02923)
"h2_session: stream(%ld-%d): error handling frame",
session->id, (int)frame->hd.stream_id);
rv = nghttp2_submit_rst_stream(ng2s, NGHTTP2_FLAG_NONE,
frame->hd.stream_id,
NGHTTP2_INTERNAL_ERROR);
if (nghttp2_is_fatal(rv)) {
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
}
return 0;
}
@@ -617,24 +474,21 @@ static int on_send_data_cb(nghttp2_session *ngh2,
if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
"h2_stream(%ld-%d): writing frame header",
session->id, (int)stream_id);
H2_STREAM_MSG(stream, "writing frame header"));
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
status = h2_stream_read_to(stream, session->bbtmp, &len, &eos);
if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
"h2_stream(%ld-%d): send_data_cb, reading stream",
session->id, (int)stream_id);
H2_STREAM_MSG(stream, "send_data_cb, reading stream"));
apr_brigade_cleanup(session->bbtmp);
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
else if (len != length) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
"h2_stream(%ld-%d): send_data_cb, wanted %ld bytes, "
"got %ld from stream",
session->id, (int)stream_id, (long)length, (long)len);
H2_STREAM_MSG(stream, "send_data_cb, wanted %ld bytes, "
"got %ld from stream"), (long)length, (long)len);
apr_brigade_cleanup(session->bbtmp);
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
@@ -654,10 +508,8 @@ static int on_send_data_cb(nghttp2_session *ngh2,
return 0;
}
else {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
APLOGNO(02925)
"h2_stream(%ld-%d): failed send_data_cb",
session->id, (int)stream_id);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(02925)
H2_STREAM_MSG(stream, "failed send_data_cb"));
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
}
@@ -667,6 +519,19 @@ static int on_frame_send_cb(nghttp2_session *ngh2,
void *user_data)
{
h2_session *session = user_data;
h2_stream *stream;
int stream_id = frame->hd.stream_id;
++session->frames_sent;
switch (frame->hd.type) {
case NGHTTP2_PUSH_PROMISE:
/* PUSH_PROMISE we report on the promised stream */
stream_id = frame->push_promise.promised_stream_id;
break;
default:
break;
}
if (APLOGcdebug(session->c)) {
char buffer[256];
@@ -676,7 +541,11 @@ static int on_frame_send_cb(nghttp2_session *ngh2,
session->id, buffer, (long)session->frames_received,
(long)session->frames_sent);
}
++session->frames_sent;
stream = get_stream(session, stream_id);
if (stream) {
h2_stream_send_frame(stream, frame->hd.type, frame->hd.flags);
}
return 0;
}
@@ -688,6 +557,8 @@ static int on_invalid_header_cb(nghttp2_session *ngh2,
uint8_t flags, void *user_data)
{
h2_session *session = user_data;
h2_stream *stream;
if (APLOGcdebug(session->c)) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03456)
"h2_session(%ld-%d): denying stream with invalid header "
@@ -695,9 +566,11 @@ static int on_invalid_header_cb(nghttp2_session *ngh2,
apr_pstrndup(session->pool, (const char *)name, namelen),
apr_pstrndup(session->pool, (const char *)value, valuelen));
}
return nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
frame->hd.stream_id,
NGHTTP2_PROTOCOL_ERROR);
stream = get_stream(session, frame->hd.stream_id);
if (stream) {
h2_stream_rst(stream, NGHTTP2_PROTOCOL_ERROR);
}
return 0;
}
#endif
@@ -791,12 +664,6 @@ static apr_status_t h2_session_shutdown(h2_session *session, int error,
"session(%ld): sent GOAWAY, err=%d, msg=%s",
session->id, error, msg? msg : "");
dispatch_event(session, H2_SESSION_EV_LOCAL_GOAWAY, error, msg);
if (force_close) {
apr_brigade_cleanup(session->bbtmp);
h2_mplx_abort(session->mplx);
}
return status;
}
@@ -822,15 +689,15 @@ static apr_status_t session_pool_cleanup(void *data)
session->id);
}
if (session->mplx) {
h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
h2_mplx_release_and_join(session->mplx, session->iowait);
session->mplx = NULL;
}
if (session->ngh2) {
nghttp2_session_del(session->ngh2);
session->ngh2 = NULL;
}
transit(session, "pool cleanup", H2_SESSION_ST_CLEANUP);
h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
h2_mplx_release_and_join(session->mplx, session->iowait);
session->mplx = NULL;
ap_assert(session->ngh2);
nghttp2_session_del(session->ngh2);
session->ngh2 = NULL;
return APR_SUCCESS;
}
@@ -891,7 +758,7 @@ static h2_session *h2_session_create_int(conn_rec *c,
/* get h2_session a lifetime beyond its pool and everything
* connected to it. */
session = apr_pcalloc(c->pool, sizeof(h2_session));
session = apr_pcalloc(pool, sizeof(h2_session));
if (session) {
int rv;
nghttp2_mem *mem;
@@ -920,6 +787,14 @@ static h2_session *h2_session_create_int(conn_rec *c,
return NULL;
}
session->monitor = apr_pcalloc(pool, sizeof(h2_stream_monitor));
if (session->monitor == NULL) {
return NULL;
}
session->monitor->ctx = session;
session->monitor->on_state_enter = on_stream_state_enter;
session->monitor->on_state_event = on_stream_state_event;
session->mplx = h2_mplx_create(c, session->pool, session->config,
session->s->timeout, workers);
@@ -1052,7 +927,7 @@ static apr_status_t h2_session_start(h2_session *session, int *rv)
}
/* Now we need to auto-open stream 1 for the request we got. */
stream = h2_session_open_stream(session, 1, 0, NULL);
stream = h2_session_open_stream(session, 1, 0);
if (!stream) {
status = APR_EGENERAL;
ap_log_rerror(APLOG_MARK, APLOG_ERR, status, session->r,
@@ -1061,11 +936,7 @@ static apr_status_t h2_session_start(h2_session *session, int *rv)
return status;
}
status = h2_stream_set_request_rec(stream, session->r);
if (status != APR_SUCCESS) {
return status;
}
status = stream_schedule(session, stream, 1);
status = h2_stream_set_request_rec(stream, session->r, 1);
if (status != APR_SUCCESS) {
return status;
}
@@ -1157,6 +1028,9 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
status = h2_stream_out_prepare(stream, &nread, &eos, NULL);
if (nread) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
H2_STREAM_MSG(stream, "prepared no_copy, len=%ld, eos=%d"),
(long)nread, eos);
*data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
}
@@ -1165,8 +1039,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
break;
case APR_ECONNRESET:
return nghttp2_submit_rst_stream(ng2s, NGHTTP2_FLAG_NONE,
stream->id, stream->rst_error);
return 0;
case APR_EAGAIN:
/* If there is no data available, our session will automatically
@@ -1175,15 +1048,13 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
*/
nread = 0;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03071)
"h2_stream(%ld-%d): suspending",
session->id, (int)stream_id);
H2_STREAM_MSG(stream, "suspending"));
return NGHTTP2_ERR_DEFERRED;
default:
nread = 0;
ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
APLOGNO(02938) "h2_stream(%ld-%d): reading data",
session->id, (int)stream_id);
ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c, APLOGNO(02938)
H2_STREAM_MSG(stream, "reading data"));
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
@@ -1196,7 +1067,6 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
h2_push *push)
{
apr_status_t status;
h2_stream *stream;
h2_ngheader *ngh;
int nid;
@@ -1206,41 +1076,30 @@ struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
ngh->nv, ngh->nvlen, NULL);
if (nid <= 0) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03075)
"h2_stream(%ld-%d): submitting push promise fail: %s",
session->id, is->id, nghttp2_strerror(nid));
H2_STREAM_MSG(is, "submitting push promise fail: %s"),
nghttp2_strerror(nid));
return NULL;
}
++session->pushes_promised;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03076)
"h2_stream(%ld-%d): SERVER_PUSH %d for %s %s on %d",
session->id, is->id, nid,
push->req->method, push->req->path, is->id);
H2_STREAM_MSG(is, "SERVER_PUSH %d for %s %s on %d"),
nid, push->req->method, push->req->path, is->id);
stream = h2_session_open_stream(session, nid, is->id, push->req);
if (stream) {
h2_session_set_prio(session, stream, push->priority);
status = stream_schedule(session, stream, 1);
if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
"h2_stream(%ld-%d): scheduling push stream",
session->id, stream->id);
stream = NULL;
}
++session->unsent_promises;
}
else {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03077)
"h2_stream(%ld-%d): failed to create stream obj %d",
session->id, is->id, nid);
}
stream = h2_session_open_stream(session, nid, is->id);
if (!stream) {
/* try to tell the client that it should not wait. */
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03077)
H2_STREAM_MSG(stream, "failed to create stream obj %d"),
nid);
/* kill the push_promise */
nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, nid,
NGHTTP2_INTERNAL_ERROR);
return NULL;
}
h2_session_set_prio(session, stream, push->priority);
h2_stream_set_request(stream, push->req);
++session->unsent_promises;
return stream;
}
@@ -1265,8 +1124,7 @@ apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream,
s = nghttp2_session_find_stream(session->ngh2, stream->id);
if (!s) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_stream(%ld-%d): lookup of nghttp2_stream failed",
session->id, stream->id);
H2_STREAM_MSG(stream, "lookup of nghttp2_stream failed"));
return APR_EINVAL;
}
@@ -1337,10 +1195,8 @@ apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream,
rv = nghttp2_session_change_stream_priority(session->ngh2, stream->id, &ps);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03203)
"h2_stream(%ld-%d): PUSH %s, weight=%d, "
"depends=%d, returned=%d",
session->id, stream->id, ptype,
ps.weight, ps.stream_id, rv);
H2_STREAM_MSG(stream, "PUSH %s, weight=%d, depends=%d, returned=%d"),
ptype, ps.weight, ps.stream_id, rv);
status = (rv < 0)? APR_EGENERAL : APR_SUCCESS;
}
#else
@@ -1404,14 +1260,9 @@ static apr_status_t on_stream_headers(h2_session *session, h2_stream *stream,
ap_assert(session);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
"h2_stream(%ld-%d): on_headers", session->id, stream->id);
H2_STREAM_MSG(stream, "on_headers"));
if (headers->status < 100) {
int err = H2_STREAM_RST(stream, headers->status);
rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
stream->id, err);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"h2_stream(%ld-%d): unpexected header status %d, stream rst",
session->id, stream->id, headers->status);
h2_stream_rst(stream, headers->status);
goto leave;
}
else if (stream->has_response) {
@@ -1419,8 +1270,7 @@ static apr_status_t on_stream_headers(h2_session *session, h2_stream *stream,
nh = h2_util_ngheader_make(stream->pool, headers->headers);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03072)
"h2_stream(%ld-%d): submit %d trailers",
session->id, (int)stream->id,(int) nh->nvlen);
H2_STREAM_MSG(stream, "submit %d trailers"), (int)nh->nvlen);
rv = nghttp2_submit_trailer(session->ngh2, stream->id, nh->nv, nh->nvlen);
goto leave;
}
@@ -1431,8 +1281,8 @@ static apr_status_t on_stream_headers(h2_session *session, h2_stream *stream,
const char *note;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
"h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
session->id, stream->id, headers->status,
H2_STREAM_MSG(stream, "submit response %d, REMOTE_WINDOW_SIZE=%u"),
headers->status,
(unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
if (!eos || len > 0) {
@@ -1548,14 +1398,14 @@ static apr_status_t on_stream_resume(void *ctx, h2_stream *stream)
ap_assert(stream);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
"h2_stream(%ld-%d): on_resume", session->id, stream->id);
H2_STREAM_MSG(stream, "on_resume"));
send_headers:
headers = NULL;
status = h2_stream_out_prepare(stream, &len, &eos, &headers);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
"h2_stream(%ld-%d): prepared len=%ld, eos=%d",
session->id, stream->id, (long)len, eos);
H2_STREAM_MSG(stream, "prepared len=%ld, eos=%d"),
(long)len, eos);
if (headers) {
status = on_stream_headers(session, stream, headers, len, eos);
if (status != APR_SUCCESS || stream->rst_error) {
@@ -1564,22 +1414,19 @@ send_headers:
goto send_headers;
}
else if (status != APR_EAGAIN) {
/* we have DATA to send */
if (!stream->has_response) {
int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
/* but no response */
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03466)
"h2_stream(%ld-%d): no response, RST_STREAM, err=%d",
session->id, stream->id, err);
nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
stream->id, err);
H2_STREAM_MSG(stream, "no response, RST_STREAM"));
h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
return APR_SUCCESS;
}
rv = nghttp2_session_resume_data(session->ngh2, stream->id);
session->have_written = 1;
ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
APLOG_ERR : APLOG_DEBUG, 0, session->c,
APLOGNO(02936)
"h2_stream(%ld-%d): resuming %s",
session->id, stream->id, rv? nghttp2_strerror(rv) : "");
APLOG_ERR : APLOG_DEBUG, 0, session->c, APLOGNO(02936)
H2_STREAM_MSG(stream, "resumed"));
}
return status;
}
@@ -1680,6 +1527,7 @@ static const char *StateNames[] = {
"IDLE", /* H2_SESSION_ST_IDLE */
"BUSY", /* H2_SESSION_ST_BUSY */
"WAIT", /* H2_SESSION_ST_WAIT */
"CLEANUP", /* H2_SESSION_ST_CLEANUP */
};
static const char *state_name(h2_session_state state)
@@ -1861,18 +1709,6 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg)
}
}
static void h2_session_ev_stream_ready(h2_session *session, int arg, const char *msg)
{
switch (session->state) {
case H2_SESSION_ST_WAIT:
transit(session, "stream ready", H2_SESSION_ST_BUSY);
break;
default:
/* nop */
break;
}
}
static void h2_session_ev_data_read(h2_session *session, int arg, const char *msg)
{
switch (session->state) {
@@ -1915,9 +1751,8 @@ static void h2_session_ev_pre_close(h2_session *session, int arg, const char *ms
h2_session_shutdown(session, arg, msg, 1);
}
static void h2_session_ev_stream_open(h2_session *session, int arg, const char *msg)
static void ev_stream_open(h2_session *session, h2_stream *stream)
{
++session->open_streams;
switch (session->state) {
case H2_SESSION_ST_IDLE:
if (session->open_streams == 1) {
@@ -1928,11 +1763,29 @@ static void h2_session_ev_stream_open(h2_session *session, int arg, const char *
default:
break;
}
ap_assert(!stream->scheduled);
if (stream->request) {
const h2_request *r = stream->request;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
H2_STREAM_MSG(stream, "schedule %s %s://%s%s chunked=%d"),
r->method, r->scheme, r->authority, r->path, r->chunked);
stream->scheduled = 1;
h2_mplx_process(session->mplx, stream, stream_pri_cmp, session);
}
else {
h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
}
}
static void h2_session_ev_stream_done(h2_session *session, int arg, const char *msg)
static void ev_stream_closed(h2_session *session, h2_stream *stream)
{
--session->open_streams;
apr_bucket *b;
if (H2_STREAM_CLIENT_INITIATED(stream->id)
&& (stream->id > session->local.completed_max)) {
session->local.completed_max = stream->id;
}
switch (session->state) {
case H2_SESSION_ST_IDLE:
if (session->open_streams == 0) {
@@ -1944,6 +1797,76 @@ static void h2_session_ev_stream_done(h2_session *session, int arg, const char *
default:
break;
}
/* The stream might have data in the buffers of the main connection.
* We can only free the allocated resources once all had been written.
* Send a special buckets on the connection that gets destroyed when
* all preceding data has been handled. On its destruction, it is safe
* to purge all resources of the stream. */
b = h2_bucket_eos_create(session->c->bucket_alloc, stream);
APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
h2_conn_io_pass(&session->io, session->bbtmp);
apr_brigade_cleanup(session->bbtmp);
}
static void on_stream_state_enter(void *ctx, h2_stream *stream)
{
h2_session *session = ctx;
/* stream entered a new state */
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
H2_STREAM_MSG(stream, "entered state"));
switch (stream->state) {
case H2_SS_IDLE: /* stream was created */
++session->open_streams;
if (H2_STREAM_CLIENT_INITIATED(stream->id)) {
++session->remote.emitted_count;
if (stream->id > session->remote.emitted_max) {
session->remote.emitted_max = stream->id;
session->local.accepted_max = stream->id;
}
}
else {
if (stream->id > session->local.emitted_max) {
++session->local.emitted_count;
session->remote.emitted_max = stream->id;
}
}
break;
case H2_SS_OPEN: /* stream has request headers */
case H2_SS_RSVD_L: /* stream has request headers */
ev_stream_open(session, stream);
break;
case H2_SS_CLOSED_L: /* stream output was closed */
break;
case H2_SS_CLOSED_R: /* stream input was closed */
break;
case H2_SS_CLOSED: /* stream in+out were closed */
--session->open_streams;
ev_stream_closed(session, stream);
break;
case H2_SS_CLEANUP:
h2_mplx_stream_cleanup(session->mplx, stream);
break;
default:
break;
}
}
static void on_stream_state_event(void *ctx, h2_stream *stream,
h2_stream_event_t ev)
{
h2_session *session = ctx;
switch (ev) {
case H2_SEV_CANCELLED:
if (session->state != H2_SESSION_ST_DONE) {
nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
stream->id, stream->rst_error);
}
break;
default:
/* NOP */
break;
}
}
static void dispatch_event(h2_session *session, h2_session_event_t ev,
@@ -1971,9 +1894,6 @@ static void dispatch_event(h2_session *session, h2_session_event_t ev,
case H2_SESSION_EV_NO_IO:
h2_session_ev_no_io(session, arg, msg);
break;
case H2_SESSION_EV_STREAM_READY:
h2_session_ev_stream_ready(session, arg, msg);
break;
case H2_SESSION_EV_DATA_READ:
h2_session_ev_data_read(session, arg, msg);
break;
@@ -1986,23 +1906,12 @@ static void dispatch_event(h2_session *session, h2_session_event_t ev,
case H2_SESSION_EV_PRE_CLOSE:
h2_session_ev_pre_close(session, arg, msg);
break;
case H2_SESSION_EV_STREAM_OPEN:
h2_session_ev_stream_open(session, arg, msg);
break;
case H2_SESSION_EV_STREAM_DONE:
h2_session_ev_stream_done(session, arg, msg);
break;
default:
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_session(%ld): unknown event %d",
session->id, ev);
break;
}
if (session->state == H2_SESSION_ST_DONE) {
apr_brigade_cleanup(session->bbtmp);
h2_mplx_abort(session->mplx);
}
}
/* trigger window updates, stream resumes and submits */

View File

@@ -51,6 +51,7 @@ struct h2_push;
struct h2_push_diary;
struct h2_session;
struct h2_stream;
struct h2_stream_monitor;
struct h2_task;
struct h2_workers;
@@ -64,13 +65,10 @@ typedef enum {
H2_SESSION_EV_PROTO_ERROR, /* protocol error */
H2_SESSION_EV_CONN_TIMEOUT, /* connection timeout */
H2_SESSION_EV_NO_IO, /* nothing has been read or written */
H2_SESSION_EV_STREAM_READY, /* stream signalled availability of headers/data */
H2_SESSION_EV_DATA_READ, /* connection data has been read */
H2_SESSION_EV_NGH2_DONE, /* nghttp2 wants neither read nor write anything */
H2_SESSION_EV_MPM_STOPPING, /* the process is stopping */
H2_SESSION_EV_PRE_CLOSE, /* connection will close after this */
H2_SESSION_EV_STREAM_OPEN, /* stream has been opened */
H2_SESSION_EV_STREAM_DONE, /* stream has been handled completely */
} h2_session_event_t;
typedef struct h2_session {
@@ -101,6 +99,7 @@ typedef struct h2_session {
struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */
struct h2_stream_monitor *monitor;/* monitor callbacks for streams */
int open_streams; /* number of client streams open */
int unsent_submits; /* number of submitted, but not yet written responses. */
int unsent_promises; /* number of submitted, but not yet written push promises */
@@ -177,34 +176,12 @@ void h2_session_abort(h2_session *session, apr_status_t reason);
*/
void h2_session_close(h2_session *session);
/**
* Create and register a new stream under the given id.
*
* @param session the session to register in
* @param stream_id the new stream identifier
* @param initiated_on the stream id this one is initiated on or 0
* @param req the request for this stream or NULL if not known yet
* @return the new stream
*/
struct h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
int initiated_on,
const h2_request *req);
/**
* Returns if client settings have push enabled.
* @param != 0 iff push is enabled in client settings
*/
int h2_session_push_enabled(h2_session *session);
/**
* Destroy the stream and release it everywhere. Reclaim all resources.
* @param session the session to which the stream belongs
* @param stream the stream to destroy
*/
apr_status_t h2_session_stream_done(h2_session *session,
struct h2_stream *stream);
/**
* Submit a push promise on the stream and schedule the new steam for
* processing..
@@ -221,5 +198,4 @@ apr_status_t h2_session_set_prio(h2_session *session,
struct h2_stream *stream,
const struct h2_priority *prio);
#endif /* defined(__mod_h2__h2_session__) */

View File

@@ -43,16 +43,119 @@
#include "h2_util.h"
static int state_transition[][7] = {
/* ID OP RL RR CI CO CL */
/*ID*/{ 1, 0, 0, 0, 0, 0, 0 },
/*OP*/{ 1, 1, 0, 0, 0, 0, 0 },
/*RL*/{ 0, 0, 1, 0, 0, 0, 0 },
/*RR*/{ 0, 0, 0, 1, 0, 0, 0 },
/*CI*/{ 1, 1, 0, 0, 1, 0, 0 },
/*CO*/{ 1, 1, 0, 0, 0, 1, 0 },
/*CL*/{ 1, 1, 0, 0, 1, 1, 1 },
#define S_XXX (-2)
#define S_ERR (-1)
#define S_NOP (0)
#define S_IDL (H2_SS_IDL + 1)
#define S_RS_L (H2_SS_RSVD_L + 1)
#define S_RS_R (H2_SS_RSVD_R + 1)
#define S_OPEN (H2_SS_OPEN + 1)
#define S_CL_L (H2_SS_CLOSED_L + 1)
#define S_CL_R (H2_SS_CLOSED_R + 1)
#define S_CLS (H2_SS_CLOSED + 1)
#define S_CLN (H2_SS_CLEANUP + 1)
static const char *h2_ss_str(h2_stream_state_t state)
{
switch (state) {
case H2_SS_IDLE:
return "IDLE";
case H2_SS_RSVD_L:
return "RESERVED_LOCAL";
case H2_SS_RSVD_R:
return "RESERVED_REMOTE";
case H2_SS_OPEN:
return "OPEN";
case H2_SS_CLOSED_L:
return "HALF_CLOSED_LOCAL";
case H2_SS_CLOSED_R:
return "HALF_CLOSED_REMOTE";
case H2_SS_CLOSED:
return "CLOSED";
case H2_SS_CLEANUP:
return "CLEANUP";
default:
return "UNKNOWN";
}
}
const char *h2_stream_state_str(h2_stream *stream)
{
return h2_ss_str(stream->state);
}
static int trans_on_send[][H2_SS_MAX] = {
/* S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
/* DATA, */ { S_ERR, S_ERR, S_ERR, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },
/* HEADERS, */ { S_ERR, S_ERR, S_CL_R, S_NOP, S_NOP, S_ERR, S_NOP, S_NOP, },
/* PRIORITY, */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
/* RST_STREAM, */ { S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },
/* SETTINGS, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
/* PUSH_PROMISE, */ { S_RS_L,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
/* PING, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
/* GOAWAY, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
/* WINDOW_UPDATE,*/ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
/* CONT */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
};
static int trans_on_recv[][H2_SS_MAX] = {
/* S_IDLE,S_RS_R, S_RS_L, S_OPEN, S_CL_R, S_CL_L, S_CLS, S_CLN, */
/* DATA, */ { S_ERR, S_ERR, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },
/* HEADERS, */ { S_OPEN,S_CL_L, S_ERR, S_NOP, S_ERR, S_NOP, S_NOP, S_NOP, },
/* PRIORITY, */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
/* RST_STREAM, */ { S_ERR, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },
/* SETTINGS, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
/* PUSH_PROMISE, */ { S_RS_R,S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
/* PING, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
/* GOAWAY, */ { S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, S_ERR, },
/* WINDOW_UPDATE,*/ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
/* CONT */ { S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, S_NOP, },
};
static int trans_on_event[][H2_SS_MAX] = {
/* H2_SEV_CLOSED_L*/{ S_XXX, S_ERR, S_ERR, S_CL_L, S_CLS, S_XXX, S_XXX, S_XXX, },
/* H2_SEV_CLOSED_R*/{ S_ERR, S_ERR, S_ERR, S_CL_R, S_ERR, S_CLS, S_NOP, S_NOP, },
/* H2_SEV_CANCELLED*/{S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },
/* H2_SEV_EOS_SENT*/{ S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_XXX, },
};
static int on_map(h2_stream_state_t state, int map[H2_SS_MAX])
{
int op = map[state];
switch (op) {
case S_XXX:
case S_ERR:
return op;
case S_NOP:
return state;
default:
return op-1;
}
}
static int on_frame(h2_stream_state_t state, int frame_type,
int frame_map[][H2_SS_MAX], apr_size_t maxlen)
{
ap_assert(frame_type >= 0);
ap_assert(state >= 0);
if (frame_type >= maxlen) {
return state; /* NOP */
}
return on_map(state, frame_map[frame_type]);
}
static int on_frame_send(h2_stream_state_t state, int frame_type)
{
return on_frame(state, frame_type, trans_on_send, H2_ALEN(trans_on_send));
}
static int on_frame_recv(h2_stream_state_t state, int frame_type)
{
return on_frame(state, frame_type, trans_on_recv, H2_ALEN(trans_on_recv));
}
static int on_event(h2_stream_state_t state, h2_stream_event_t ev)
{
return on_map(state, trans_on_event[ev]);
}
static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag)
{
@@ -68,87 +171,291 @@ static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag)
}
}
static int set_state(h2_stream *stream, h2_stream_state_t state)
static apr_status_t close_input(h2_stream *stream)
{
int allowed = state_transition[state][stream->state];
if (allowed) {
stream->state = state;
return 1;
conn_rec *c = stream->session->c;
apr_status_t status;
apr_bucket_brigade *tmp;
apr_bucket *b;
if (h2_beam_is_closed(stream->input)) {
return APR_SUCCESS;
}
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, APLOGNO(03081)
"h2_stream(%ld-%d): invalid state transition from %d to %d",
stream->session->id, stream->id, stream->state, state);
return 0;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
H2_STREAM_MSG(stream, "closing input"));
if (stream->rst_error) {
return APR_ECONNRESET;
}
tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
if (stream->trailers && !apr_is_empty_table(stream->trailers)) {
h2_headers *r = h2_headers_create(HTTP_OK, stream->trailers,
NULL, stream->pool);
b = h2_bucket_headers_create(c->bucket_alloc, r);
APR_BRIGADE_INSERT_TAIL(tmp, b);
stream->trailers = NULL;
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
H2_STREAM_MSG(stream, "added trailers"));
}
b = apr_bucket_eos_create(c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(tmp, b);
status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
apr_brigade_destroy(tmp);
h2_beam_close(stream->input);
return status;
}
static int close_input(h2_stream *stream)
static apr_status_t close_output(h2_stream *stream)
{
if (h2_beam_is_closed(stream->output)) {
return APR_SUCCESS;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
H2_STREAM_MSG(stream, "closing output"));
return h2_beam_leave(stream->output);
}
static void on_state_enter(h2_stream *stream)
{
if (stream->monitor && stream->monitor->on_state_enter) {
stream->monitor->on_state_enter(stream->monitor->ctx, stream);
}
}
static void on_state_event(h2_stream *stream, h2_stream_event_t ev)
{
if (stream->monitor && stream->monitor->on_state_event) {
stream->monitor->on_state_event(stream->monitor->ctx, stream, ev);
}
}
static void on_state_invalid(h2_stream *stream)
{
if (stream->monitor && stream->monitor->on_state_invalid) {
stream->monitor->on_state_invalid(stream->monitor->ctx, stream);
}
/* stream got an event/frame invalid in its state */
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
H2_STREAM_MSG(stream, "invalid state event"));
switch (stream->state) {
case H2_STREAM_ST_CLOSED_INPUT:
case H2_STREAM_ST_CLOSED:
return 0; /* ignore, idempotent */
case H2_STREAM_ST_CLOSED_OUTPUT:
/* both closed now */
set_state(stream, H2_STREAM_ST_CLOSED);
case H2_SS_OPEN:
case H2_SS_RSVD_L:
case H2_SS_RSVD_R:
case H2_SS_CLOSED_L:
case H2_SS_CLOSED_R:
h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
break;
default:
/* everything else we jump to here */
set_state(stream, H2_STREAM_ST_CLOSED_INPUT);
break;
}
return 1;
}
static int input_closed(h2_stream *stream)
static apr_status_t transit(h2_stream *stream, int new_state)
{
switch (stream->state) {
case H2_STREAM_ST_OPEN:
case H2_STREAM_ST_CLOSED_OUTPUT:
return 0;
default:
return 1;
if (new_state == stream->state) {
return APR_SUCCESS;
}
}
static int close_output(h2_stream *stream)
{
switch (stream->state) {
case H2_STREAM_ST_CLOSED_OUTPUT:
case H2_STREAM_ST_CLOSED:
return 0; /* ignore, idempotent */
case H2_STREAM_ST_CLOSED_INPUT:
/* both closed now */
set_state(stream, H2_STREAM_ST_CLOSED);
else if (new_state < 0) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, APLOGNO(03081)
H2_STREAM_MSG(stream, "invalid transition"));
on_state_invalid(stream);
return APR_EINVAL;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
H2_STREAM_MSG(stream, "transit to [%s]"), h2_ss_str(new_state));
stream->state = new_state;
switch (new_state) {
case H2_SS_IDLE:
break;
default:
/* everything else we jump to here */
set_state(stream, H2_STREAM_ST_CLOSED_OUTPUT);
case H2_SS_RSVD_L:
close_input(stream);
break;
case H2_SS_RSVD_R:
break;
case H2_SS_OPEN:
break;
case H2_SS_CLOSED_L:
close_output(stream);
break;
case H2_SS_CLOSED_R:
close_input(stream);
break;
case H2_SS_CLOSED:
close_input(stream);
close_output(stream);
if (stream->out_buffer) {
apr_brigade_cleanup(stream->out_buffer);
}
break;
case H2_SS_CLEANUP:
break;
}
return 1;
on_state_enter(stream);
return APR_SUCCESS;
}
static int input_open(const h2_stream *stream)
void h2_stream_set_monitor(h2_stream *stream, h2_stream_monitor *monitor)
{
switch (stream->state) {
case H2_STREAM_ST_OPEN:
case H2_STREAM_ST_CLOSED_OUTPUT:
return 1;
default:
return 0;
stream->monitor = monitor;
}
void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev)
{
int new_state;
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
H2_STREAM_MSG(stream, "dispatch event %d"), ev);
new_state = on_event(stream->state, ev);
if (new_state < 0) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, APLOGNO(03081)
H2_STREAM_MSG(stream, "invalid event %d"), ev);
on_state_invalid(stream);
AP_DEBUG_ASSERT(new_state > S_XXX);
return;
}
else if (new_state == stream->state) {
/* nop */
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
H2_STREAM_MSG(stream, "ignored event %d"), ev);
return;
}
else {
on_state_event(stream, ev);
transit(stream, new_state);
}
}
static int output_open(h2_stream *stream)
static void set_policy_for(h2_stream *stream, h2_request *r)
{
switch (stream->state) {
case H2_STREAM_ST_OPEN:
case H2_STREAM_ST_CLOSED_INPUT:
return 1;
default:
return 0;
int enabled = h2_session_push_enabled(stream->session);
stream->push_policy = h2_push_policy_determine(r->headers, stream->pool,
enabled);
r->serialize = h2_config_geti(stream->session->config, H2_CONF_SER_HEADERS);
}
apr_status_t h2_stream_send_frame(h2_stream *stream, int ftype, int flags)
{
apr_status_t status = APR_SUCCESS;
int new_state, eos = 0;
new_state = on_frame_send(stream->state, ftype);
if (new_state < 0) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
H2_STREAM_MSG(stream, "invalid frame %d send"), ftype);
AP_DEBUG_ASSERT(new_state > S_XXX);
return transit(stream, new_state);
}
switch (ftype) {
case NGHTTP2_DATA:
eos = (flags & NGHTTP2_FLAG_END_STREAM);
break;
case NGHTTP2_HEADERS:
eos = (flags & NGHTTP2_FLAG_END_STREAM);
break;
case NGHTTP2_PUSH_PROMISE:
/* start pushed stream */
ap_assert(stream->request == NULL);
ap_assert(stream->rtmp != NULL);
status = h2_request_end_headers(stream->rtmp, stream->pool, 0);
if (status != APR_SUCCESS) {
return status;
}
set_policy_for(stream, stream->rtmp);
stream->request = stream->rtmp;
stream->rtmp = NULL;
break;
default:
break;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
H2_STREAM_MSG(stream, "send frame %d, eos=%d"), ftype, eos);
status = transit(stream, new_state);
if (status == APR_SUCCESS && eos) {
status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_L));
}
return status;
}
apr_status_t h2_stream_recv_frame(h2_stream *stream, int ftype, int flags)
{
apr_status_t status = APR_SUCCESS;
int new_state, eos = 0;
new_state = on_frame_recv(stream->state, ftype);
if (new_state < 0) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
H2_STREAM_MSG(stream, "invalid frame %d recv"), ftype);
AP_DEBUG_ASSERT(new_state > S_XXX);
return transit(stream, new_state);
}
switch (ftype) {
case NGHTTP2_DATA:
eos = (flags & NGHTTP2_FLAG_END_STREAM);
break;
case NGHTTP2_HEADERS:
eos = (flags & NGHTTP2_FLAG_END_STREAM);
if (stream->state == H2_SS_OPEN) {
/* trailer HEADER */
if (!eos) {
h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
}
}
else {
/* request HEADER */
ap_assert(stream->request == NULL);
ap_assert(stream->rtmp != NULL);
status = h2_request_end_headers(stream->rtmp, stream->pool, 0);
if (status != APR_SUCCESS) {
return status;
}
set_policy_for(stream, stream->rtmp);
stream->request = stream->rtmp;
stream->rtmp = NULL;
}
break;
default:
break;
}
status = transit(stream, new_state);
if (status == APR_SUCCESS && eos) {
status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_R));
}
return status;
}
apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
const uint8_t *data, size_t len)
{
h2_session *session = stream->session;
apr_status_t status = APR_SUCCESS;
apr_bucket_brigade *tmp;
ap_assert(stream);
if (!stream->input) {
return APR_EOF;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
H2_STREAM_MSG(stream, "recv DATA, len=%d"), (int)len);
tmp = apr_brigade_create(stream->pool, session->c->bucket_alloc);
apr_brigade_write(tmp, NULL, NULL, (const char *)data, len);
status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
apr_brigade_destroy(tmp);
stream->in_data_frames++;
stream->in_data_octets += len;
return status;
}
static void prep_output(h2_stream *stream) {
@@ -158,35 +465,26 @@ static void prep_output(h2_stream *stream) {
}
}
static void prepend_response(h2_stream *stream, h2_headers *response)
{
conn_rec *c = stream->session->c;
apr_bucket *b;
prep_output(stream);
b = h2_bucket_headers_create(c->bucket_alloc, response);
APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
}
h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session,
int initiated_on)
h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session,
h2_stream_monitor *monitor, int initiated_on)
{
h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
stream->id = id;
stream->initiated_on = initiated_on;
stream->created = apr_time_now();
stream->state = H2_STREAM_ST_IDLE;
stream->state = H2_SS_IDLE;
stream->pool = pool;
stream->session = session;
stream->monitor = monitor;
h2_beam_create(&stream->input, pool, id, "input", H2_BEAM_OWNER_SEND, 0);
h2_beam_send_from(stream->input, stream->pool);
h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0);
set_state(stream, H2_STREAM_ST_OPEN);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03082)
"h2_stream(%ld-%d): opened", session->id, stream->id);
H2_STREAM_MSG(stream, "created"));
on_state_enter(stream);
return stream;
}
@@ -204,12 +502,10 @@ void h2_stream_cleanup(h2_stream *stream)
status = h2_beam_wait_empty(stream->input, APR_NONBLOCK_READ);
if (status == APR_EAGAIN) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
"h2_stream(%ld-%d): wait on input drain",
stream->session->id, stream->id);
H2_STREAM_MSG(stream, "wait on input drain"));
status = h2_beam_wait_empty(stream->input, APR_BLOCK_READ);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
"h2_stream(%ld-%d): input drain returned",
stream->session->id, stream->id);
H2_STREAM_MSG(stream, "input drain returned"));
}
}
@@ -217,20 +513,13 @@ void h2_stream_destroy(h2_stream *stream)
{
ap_assert(stream);
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c,
"h2_stream(%ld-%d): destroy",
stream->session->id, stream->id);
H2_STREAM_MSG(stream, "destroy"));
if (stream->pool) {
apr_pool_destroy(stream->pool);
stream->pool = NULL;
}
}
void h2_stream_eos_destroy(h2_stream *stream)
{
h2_session_stream_done(stream->session, stream);
/* stream possibly destroyed */
}
apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
{
apr_pool_t *pool = stream->pool;
@@ -241,17 +530,15 @@ apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
void h2_stream_rst(h2_stream *stream, int error_code)
{
stream->rst_error = error_code;
close_input(stream);
close_output(stream);
if (stream->out_buffer) {
apr_brigade_cleanup(stream->out_buffer);
}
h2_beam_abort(stream->input);
h2_beam_leave(stream->output);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
"h2_stream(%ld-%d): reset, error=%d",
stream->session->id, stream->id, error_code);
H2_STREAM_MSG(stream, "reset, error=%d"), error_code);
h2_stream_dispatch(stream, H2_SEV_CANCELLED);
}
apr_status_t h2_stream_set_request_rec(h2_stream *stream, request_rec *r)
apr_status_t h2_stream_set_request_rec(h2_stream *stream,
request_rec *r, int eos)
{
h2_request *req;
apr_status_t status;
@@ -264,20 +551,37 @@ apr_status_t h2_stream_set_request_rec(h2_stream *stream, request_rec *r)
status = h2_request_rcreate(&req, stream->pool, r);
if (status == APR_SUCCESS) {
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058)
"h2_request(%d): set_request_rec %s host=%s://%s%s",
stream->id, req->method, req->scheme, req->authority,
req->path);
H2_STREAM_MSG(stream, "set_request_rec %s host=%s://%s%s"),
req->method, req->scheme, req->authority, req->path);
stream->rtmp = req;
/* simulate the frames that led to this */
return h2_stream_recv_frame(stream, NGHTTP2_HEADERS,
NGHTTP2_FLAG_END_STREAM);
}
return status;
}
apr_status_t h2_stream_set_request(h2_stream *stream, const h2_request *r)
void h2_stream_set_request(h2_stream *stream, const h2_request *r)
{
ap_assert(stream->request == NULL);
ap_assert(stream->rtmp == NULL);
stream->rtmp = h2_request_clone(stream->pool, r);
return APR_SUCCESS;
}
static void set_error_response(h2_stream *stream, int http_status)
{
if (!h2_stream_is_ready(stream)) {
conn_rec *c = stream->session->c;
apr_bucket *b;
h2_headers *response;
response = h2_headers_die(http_status, stream->request, stream->pool);
prep_output(stream);
b = apr_bucket_eos_create(c->bucket_alloc);
APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
b = h2_bucket_headers_create(c->bucket_alloc, response);
APR_BRIGADE_INSERT_HEAD(stream->out_buffer, b);
}
}
static apr_status_t add_trailer(h2_stream *stream,
@@ -289,8 +593,7 @@ static apr_status_t add_trailer(h2_stream *stream,
if (nlen == 0 || name[0] == ':') {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, APR_EINVAL, c, APLOGNO(03060)
"h2_request(%ld-%d): pseudo header in trailer",
c->id, stream->id);
H2_STREAM_MSG(stream, "pseudo header in trailer"));
return APR_EINVAL;
}
if (h2_req_ignore_trailer(name, nlen)) {
@@ -311,192 +614,62 @@ apr_status_t h2_stream_add_header(h2_stream *stream,
const char *name, size_t nlen,
const char *value, size_t vlen)
{
h2_session *session = stream->session;
int error = 0;
ap_assert(stream);
apr_status_t status;
if (stream->has_response) {
return APR_EINVAL;
}
++stream->request_headers_added;
if (name[0] == ':') {
if ((vlen) > stream->session->s->limit_req_line) {
if ((vlen) > session->s->limit_req_line) {
/* pseudo header: approximation of request line size check */
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
"h2_stream(%ld-%d): pseudo header %s too long",
stream->session->id, stream->id, name);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
H2_STREAM_MSG(stream, "pseudo %s too long"), name);
error = HTTP_REQUEST_URI_TOO_LARGE;
}
}
else if ((nlen + 2 + vlen) > stream->session->s->limit_req_fieldsize) {
else if ((nlen + 2 + vlen) > session->s->limit_req_fieldsize) {
/* header too long */
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
"h2_stream(%ld-%d): header %s too long",
stream->session->id, stream->id, name);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
H2_STREAM_MSG(stream, "header %s too long"), name);
error = HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
}
if (stream->request_headers_added
> stream->session->s->limit_req_fields + 4) {
if (stream->request_headers_added > session->s->limit_req_fields + 4) {
/* too many header lines, include 4 pseudo headers */
if (stream->request_headers_added
> stream->session->s->limit_req_fields + 4 + 100) {
> session->s->limit_req_fields + 4 + 100) {
/* yeah, right */
h2_stream_rst(stream, H2_ERR_ENHANCE_YOUR_CALM);
return APR_ECONNRESET;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
"h2_stream(%ld-%d): too many header lines",
stream->session->id, stream->id);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
H2_STREAM_MSG(stream, "too many header lines"));
error = HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
}
if (h2_stream_is_scheduled(stream)) {
return add_trailer(stream, name, nlen, value, vlen);
if (error) {
set_error_response(stream, error);
return APR_EINVAL;
}
else if (error) {
return h2_stream_set_error(stream, error);
}
else {
else if (H2_SS_IDLE == stream->state) {
if (!stream->rtmp) {
stream->rtmp = h2_req_create(stream->id, stream->pool,
NULL, NULL, NULL, NULL, NULL, 0);
}
if (stream->state != H2_STREAM_ST_OPEN) {
return APR_ECONNRESET;
}
return h2_request_add_header(stream->rtmp, stream->pool,
name, nlen, value, vlen);
status = h2_request_add_header(stream->rtmp, stream->pool,
name, nlen, value, vlen);
}
}
apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled,
h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status = APR_EINVAL;
ap_assert(stream);
ap_assert(stream->session);
ap_assert(stream->session->mplx);
if (!stream->scheduled) {
if (eos) {
close_input(stream);
}
if (h2_stream_is_ready(stream)) {
/* already have a resonse, probably a HTTP error code */
return h2_mplx_process(stream->session->mplx, stream, cmp, ctx);
}
else if (!stream->request && stream->rtmp) {
/* This is the common case: a h2_request was being assembled, now
* it gets finalized and checked for completness */
status = h2_request_end_headers(stream->rtmp, stream->pool, eos);
if (status == APR_SUCCESS) {
stream->rtmp->serialize = h2_config_geti(stream->session->config,
H2_CONF_SER_HEADERS);
stream->request = stream->rtmp;
stream->rtmp = NULL;
stream->scheduled = 1;
stream->push_policy = h2_push_policy_determine(stream->request->headers,
stream->pool, push_enabled);
status = h2_mplx_process(stream->session->mplx, stream, cmp, ctx);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
"h2_stream(%ld-%d): scheduled %s %s://%s%s "
"chunked=%d",
stream->session->id, stream->id,
stream->request->method, stream->request->scheme,
stream->request->authority, stream->request->path,
stream->request->chunked);
return status;
}
}
else {
status = APR_ECONNRESET;
}
else {
status = add_trailer(stream, name, nlen, value, vlen);
}
h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
"h2_stream(%ld-%d): RST=2 (internal err) %s %s://%s%s",
stream->session->id, stream->id,
stream->request->method, stream->request->scheme,
stream->request->authority, stream->request->path);
return status;
}
int h2_stream_is_scheduled(const h2_stream *stream)
{
return stream->scheduled;
}
apr_status_t h2_stream_close_input(h2_stream *stream)
{
conn_rec *c = stream->session->c;
apr_status_t status;
apr_bucket_brigade *tmp;
apr_bucket *b;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
"h2_stream(%ld-%d): closing input",
stream->session->id, stream->id);
if (stream->rst_error) {
return APR_ECONNRESET;
if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
H2_STREAM_MSG(stream, "header %s not accepted"), name);
h2_stream_dispatch(stream, H2_SEV_CANCELLED);
}
tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
if (stream->trailers && !apr_is_empty_table(stream->trailers)) {
h2_headers *r = h2_headers_create(HTTP_OK, stream->trailers,
NULL, stream->pool);
b = h2_bucket_headers_create(c->bucket_alloc, r);
APR_BRIGADE_INSERT_TAIL(tmp, b);
stream->trailers = NULL;
}
b = apr_bucket_eos_create(c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(tmp, b);
status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
apr_brigade_destroy(tmp);
h2_beam_close(stream->input);
return status;
}
apr_status_t h2_stream_write_data(h2_stream *stream,
const char *data, size_t len, int eos)
{
conn_rec *c = stream->session->c;
apr_status_t status = APR_SUCCESS;
apr_bucket_brigade *tmp;
ap_assert(stream);
if (!stream->input) {
return APR_EOF;
}
if (input_closed(stream) || !stream->request) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d",
stream->session->id, stream->id, input_closed(stream),
stream->request != NULL);
return APR_EINVAL;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_stream(%ld-%d): add %ld input bytes",
stream->session->id, stream->id, (long)len);
tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
apr_brigade_write(tmp, NULL, NULL, data, len);
status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
apr_brigade_destroy(tmp);
stream->in_data_frames++;
stream->in_data_octets += len;
if (eos) {
return h2_stream_close_input(stream);
}
return status;
}
@@ -510,28 +683,10 @@ static apr_status_t fill_buffer(h2_stream *stream, apr_size_t amount)
status = h2_beam_receive(stream->output, stream->out_buffer,
APR_NONBLOCK_READ, amount);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
"h2_stream(%ld-%d): beam_received",
stream->session->id, stream->id);
H2_STREAM_MSG(stream, "beam_received"));
return status;
}
apr_status_t h2_stream_set_error(h2_stream *stream, int http_status)
{
h2_headers *response;
if (h2_stream_is_ready(stream)) {
return APR_EINVAL;
}
if (stream->rtmp) {
stream->request = stream->rtmp;
stream->rtmp = NULL;
}
response = h2_headers_die(http_status, stream->request, stream->pool);
prepend_response(stream, response);
h2_beam_close(stream->output);
return APR_SUCCESS;
}
static apr_bucket *get_first_headers_bucket(apr_bucket_brigade *bb)
{
if (bb) {
@@ -563,9 +718,6 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
*peos = 1;
return APR_ECONNRESET;
}
else if (!output_open(stream)) {
return APR_ECONNRESET;
}
c = stream->session->c;
prep_output(stream);
@@ -639,14 +791,24 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
}
}
}
if (!*peos && !*plen && status == APR_SUCCESS
&& (!presponse || !*presponse)) {
status = APR_EAGAIN;
if (status == APR_SUCCESS) {
if (presponse && *presponse) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
H2_STREAM_MSG(stream, "prepare, response %d"),
(*presponse)->status);
}
else if (*peos || *plen) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
H2_STREAM_MSG(stream, "prepare, len=%ld eos=%d"),
(long)*plen, *peos);
}
else {
status = APR_EAGAIN;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
H2_STREAM_MSG(stream, "prepare, no data"));
}
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c,
"h2_stream(%ld-%d): prepare, len=%ld eos=%d",
c->id, stream->id, (long)*plen, *peos);
return status;
}
@@ -669,17 +831,12 @@ apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
status = APR_EAGAIN;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
"h2_stream(%ld-%d): read_to, len=%ld eos=%d",
c->id, stream->id, (long)*plen, *peos);
H2_STREAM_MSG(stream, "read_to, len=%ld eos=%d"),
(long)*plen, *peos);
return status;
}
int h2_stream_input_is_open(const h2_stream *stream)
{
return input_open(stream);
}
apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response)
{
apr_status_t status = APR_SUCCESS;
@@ -689,8 +846,8 @@ apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response)
pushes = h2_push_collect_update(stream, stream->request, response);
if (pushes && !apr_is_empty_array(pushes)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
"h2_stream(%ld-%d): found %d push candidates",
stream->session->id, stream->id, pushes->nelts);
H2_STREAM_MSG(stream, "found %d push candidates"),
pushes->nelts);
for (i = 0; i < pushes->nelts; ++i) {
h2_push *push = APR_ARRAY_IDX(pushes, i, h2_push*);
h2_stream *s = h2_session_push(stream->session, stream, push);
@@ -721,29 +878,6 @@ const h2_priority *h2_stream_get_priority(h2_stream *stream,
return NULL;
}
const char *h2_stream_state_str(h2_stream *stream)
{
switch (stream->state) {
case H2_STREAM_ST_IDLE:
return "IDLE";
case H2_STREAM_ST_OPEN:
return "OPEN";
case H2_STREAM_ST_RESV_LOCAL:
return "RESERVED_LOCAL";
case H2_STREAM_ST_RESV_REMOTE:
return "RESERVED_REMOTE";
case H2_STREAM_ST_CLOSED_INPUT:
return "HALF_CLOSED_REMOTE";
case H2_STREAM_ST_CLOSED_OUTPUT:
return "HALF_CLOSED_LOCAL";
case H2_STREAM_ST_CLOSED:
return "CLOSED";
default:
return "UNKNOWN";
}
}
int h2_stream_is_ready(h2_stream *stream)
{
if (stream->has_response) {
@@ -755,4 +889,15 @@ int h2_stream_is_ready(h2_stream *stream)
return 0;
}
int h2_stream_was_closed(const h2_stream *stream)
{
switch (stream->state) {
case H2_SS_CLOSED:
case H2_SS_CLEANUP:
return 1;
default:
return 0;
}
}

View File

@@ -36,19 +36,33 @@ struct h2_priority;
struct h2_request;
struct h2_headers;
struct h2_session;
struct h2_sos;
struct h2_task;
struct h2_bucket_beam;
typedef struct h2_stream h2_stream;
typedef void h2_stream_state_cb(void *ctx, h2_stream *stream);
typedef void h2_stream_event_cb(void *ctx, h2_stream *stream,
h2_stream_event_t ev);
typedef struct h2_stream_monitor {
void *ctx;
h2_stream_state_cb *on_state_enter; /* called when a state is entered */
h2_stream_state_cb *on_state_invalid; /* called when an invalid state change
was detected */
h2_stream_event_cb *on_state_event; /* called right before the given event
result in a new stream state */
} h2_stream_monitor;
struct h2_stream {
int id; /* http2 stream id */
int initiated_on; /* initiating stream id (PUSH) or 0 */
apr_time_t created; /* when stream was created */
h2_stream_state_t state; /* http/2 state of this stream */
struct h2_session *session; /* the session this stream belongs to */
int id; /* http2 stream identifier */
int initiated_on; /* initiating stream id (PUSH) or 0 */
apr_pool_t *pool; /* the memory pool for this stream */
struct h2_session *session; /* the session this stream belongs to */
h2_stream_state_t state; /* state of this stream */
apr_time_t created; /* when stream was created */
const struct h2_request *request; /* the request made in this stream */
struct h2_request *rtmp; /* request being assembled */
apr_table_t *trailers; /* optional incoming trailers */
@@ -61,42 +75,53 @@ struct h2_stream {
int rst_error; /* stream error for RST_STREAM */
unsigned int aborted : 1; /* was aborted */
unsigned int scheduled : 1; /* stream has been scheduled */
unsigned int started : 1; /* stream has started processing */
unsigned int has_response : 1; /* response headers are known */
unsigned int push_policy; /* which push policy to use for this request */
struct h2_task *task; /* assigned task to fullfill request */
const h2_priority *pref_priority; /* preferred priority for this stream */
apr_off_t out_data_frames; /* # of DATA frames sent */
apr_off_t out_data_octets; /* # of DATA octets (payload) sent */
apr_off_t in_data_frames; /* # of DATA frames received */
apr_off_t in_data_octets; /* # of DATA octets (payload) received */
const char *sos_filter;
h2_stream_monitor *monitor; /* optional monitor for stream states */
};
#define H2_STREAM_RST(s, def) (s->rst_error? s->rst_error : (def))
/**
* Create a stream in OPEN state.
* Create a stream in H2_SS_IDLE state.
* @param id the stream identifier
* @param pool the memory pool to use for this stream
* @param session the session this stream belongs to
* @return the newly opened stream
*/
h2_stream *h2_stream_open(int id, apr_pool_t *pool, struct h2_session *session,
int initiated_on);
/**
* Cleanup any resources still held by the stream, called by last bucket.
*/
void h2_stream_eos_destroy(h2_stream *stream);
h2_stream *h2_stream_create(int id, apr_pool_t *pool,
struct h2_session *session,
h2_stream_monitor *monitor,
int initiated_on);
/**
* Destroy memory pool if still owned by the stream.
*/
void h2_stream_destroy(h2_stream *stream);
/*
* Set a new monitor for this stream, replacing any existing one. Can
* be called with NULL to have no monitor installed.
*/
void h2_stream_set_monitor(h2_stream *stream, h2_stream_monitor *monitor);
/**
* Dispatch (handle) an event on the given stream.
* @param stream the streama the event happened on
* @param ev the type of event
*/
void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev);
/**
* Cleanup references into requst processing.
*
@@ -114,20 +139,23 @@ void h2_stream_cleanup(h2_stream *stream);
apr_pool_t *h2_stream_detach_pool(h2_stream *stream);
/**
* Initialize stream->request with the given h2_request.
* Set complete stream headers from given h2_request.
*
* @param stream stream to write request to
* @param r the request with all the meta data
* @param eos != 0 iff stream input is closed
*/
apr_status_t h2_stream_set_request(h2_stream *stream, const h2_request *r);
void h2_stream_set_request(h2_stream *stream, const h2_request *r);
/**
* Initialize stream->request with the given request_rec.
* Set complete stream header from given request_rec.
*
* @param stream stream to write request to
* @param r the request with all the meta data
* @param eos != 0 iff stream input is closed
*/
apr_status_t h2_stream_set_request_rec(h2_stream *stream, request_rec *r);
apr_status_t h2_stream_set_request_rec(h2_stream *stream,
request_rec *r, int eos);
/*
* Add a HTTP/2 header (including pseudo headers) or trailer
@@ -143,22 +171,19 @@ apr_status_t h2_stream_add_header(h2_stream *stream,
const char *name, size_t nlen,
const char *value, size_t vlen);
/**
* Closes the stream's input.
*
* @param stream stream to close intput of
*/
apr_status_t h2_stream_close_input(h2_stream *stream);
apr_status_t h2_stream_send_frame(h2_stream *stream, int frame_type, int flags);
apr_status_t h2_stream_recv_frame(h2_stream *stream, int frame_type, int flags);
/*
* Write a chunk of DATA to the stream.
* Process a frame of received DATA.
*
* @param stream stream to write the data to
* @param flags the frame flags
* @param data the beginning of the bytes to write
* @param len the number of bytes to write
*/
apr_status_t h2_stream_write_data(h2_stream *stream,
const char *data, size_t len, int eos);
apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
const uint8_t *data, size_t len);
/**
* Reset the stream. Stream write/reads will return errors afterwards.
@@ -169,29 +194,14 @@ apr_status_t h2_stream_write_data(h2_stream *stream,
void h2_stream_rst(h2_stream *stream, int error_code);
/**
* Schedule the stream for execution. All header information must be
* present. Use the given priority comparison callback to determine
* order in queued streams.
*
* @param stream the stream to schedule
* @param eos != 0 iff no more input will arrive
* @param cmp priority comparison
* @param ctx context for comparison
*/
apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled,
h2_stream_pri_cmp *cmp, void *ctx);
/**
* Determine if stream has been scheduled already.
* Determine if stream was closed already. This is true for
* states H2_SS_CLOSED, H2_SS_CLEANUP. But not true
* for H2_SS_CLOSED_L and H2_SS_CLOSED_R.
*
* @param stream the stream to check on
* @return != 0 iff stream has been scheduled
* @return != 0 iff stream has been closed
*/
int h2_stream_is_scheduled(const h2_stream *stream);
/**
* Set the HTTP error status as response.
*/
apr_status_t h2_stream_set_error(h2_stream *stream, int http_status);
int h2_stream_was_closed(const h2_stream *stream);
/**
* Do a speculative read on the stream output to determine the
@@ -235,13 +245,6 @@ apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
*/
apr_table_t *h2_stream_get_trailers(h2_stream *stream);
/**
* Check if the stream has open input.
* @param stream the stream to check
* @return != 0 iff stream has open input.
*/
int h2_stream_input_is_open(const h2_stream *stream);
/**
* Submit any server push promises on this stream and schedule
* the tasks connection with these.
@@ -268,4 +271,9 @@ const char *h2_stream_state_str(h2_stream *stream);
*/
int h2_stream_is_ready(h2_stream *stream);
#define H2_STREAM_MSG(s, msg) \
"h2_stream(%ld-%d,%s): "msg, s->session->id, s->id, h2_stream_state_str(s)
#endif /* defined(__mod_h2__h2_stream__) */

View File

@@ -64,24 +64,6 @@ static void H2_TASK_OUT_LOG(int lvl, h2_task *task, apr_bucket_brigade *bb,
}
}
static void h2_beam_log(h2_bucket_beam *beam, int id, const char *msg,
conn_rec *c, int level)
{
if (beam && APLOG_C_IS_LEVEL(c,level)) {
char buffer[2048];
apr_size_t off = 0;
off += apr_snprintf(buffer+off, H2_ALEN(buffer)-off, "cl=%d, ", beam->closed);
off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "red", ", ", &beam->send_list);
off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "green", ", ", beam->recv_buffer);
off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold_list);
off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge_list);
ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d): %s %s",
c->id, id, msg, buffer);
}
}
/*******************************************************************************
* task input handling
******************************************************************************/
@@ -115,11 +97,11 @@ static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb, int block)
apr_brigade_length(bb, 0, &written);
H2_TASK_OUT_LOG(APLOG_TRACE2, task, bb, "h2_task send_out");
h2_beam_log(task->output.beam, task->stream_id, "send_out(before)", task->c, APLOG_TRACE2);
h2_beam_log(task->output.beam, task->c, APLOG_TRACE2, "send_out(before)");
/* engines send unblocking */
status = h2_beam_send(task->output.beam, bb,
block? APR_BLOCK_READ : APR_NONBLOCK_READ);
h2_beam_log(task->output.beam, task->stream_id, "send_out(after)", task->c, APLOG_TRACE2);
h2_beam_log(task->output.beam, task->c, APLOG_TRACE2, "send_out(after)");
if (APR_STATUS_IS_EAGAIN(status)) {
apr_brigade_length(bb, 0, &left);
@@ -423,7 +405,7 @@ void h2_task_redo(h2_task *task)
void h2_task_rst(h2_task *task, int error)
{
task->rst_error = error;
h2_beam_abort(task->input.beam);
h2_beam_leave(task->input.beam);
if (!task->worker_done) {
h2_beam_abort(task->output.beam);
}
@@ -642,7 +624,7 @@ static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c)
}
/* After the call to ap_process_request, the
* request pool will have been deleted. We set
* request pool may have been deleted. We set
* r=NULL here to ensure that any dereference
* of r that might be added later in this function
* will result in a segfault immediately instead

View File

@@ -922,6 +922,9 @@ apr_size_t h2_util_bucket_print(char *buffer, apr_size_t bmax,
off += apr_snprintf(buffer+off, bmax-off, "%s", sep);
}
if (bmax <= off) {
return off;
}
if (APR_BUCKET_IS_METADATA(b)) {
if (APR_BUCKET_IS_EOS(b)) {
off += apr_snprintf(buffer+off, bmax-off, "eos");
@@ -965,10 +968,12 @@ apr_size_t h2_util_bucket_print(char *buffer, apr_size_t bmax,
btype = "pool";
}
off += apr_snprintf(buffer+off, bmax-off, "%s[%ld]",
btype,
(long)(b->length == ((apr_size_t)-1)?
-1 : b->length));
if (bmax > off) {
off += apr_snprintf(buffer+off, bmax-off, "%s[%ld]",
btype,
(long)(b->length == ((apr_size_t)-1)?
-1 : b->length));
}
}
return off;
}
@@ -981,20 +986,24 @@ apr_size_t h2_util_bb_print(char *buffer, apr_size_t bmax,
const char *sp = "";
apr_bucket *b;
if (bb) {
memset(buffer, 0, bmax--);
off += apr_snprintf(buffer+off, bmax-off, "%s(", tag);
for (b = APR_BRIGADE_FIRST(bb);
bmax && (b != APR_BRIGADE_SENTINEL(bb));
b = APR_BUCKET_NEXT(b)) {
off += h2_util_bucket_print(buffer+off, bmax-off, b, sp);
sp = " ";
if (bmax > 1) {
if (bb) {
memset(buffer, 0, bmax--);
off += apr_snprintf(buffer+off, bmax-off, "%s(", tag);
for (b = APR_BRIGADE_FIRST(bb);
(bmax > off) && (b != APR_BRIGADE_SENTINEL(bb));
b = APR_BUCKET_NEXT(b)) {
off += h2_util_bucket_print(buffer+off, bmax-off, b, sp);
sp = " ";
}
if (bmax > off) {
off += apr_snprintf(buffer+off, bmax-off, ")%s", sep);
}
}
else {
off += apr_snprintf(buffer+off, bmax-off, "%s(null)%s", tag, sep);
}
off += apr_snprintf(buffer+off, bmax-off, ")%s", sep);
}
else {
off += apr_snprintf(buffer+off, bmax-off, "%s(null)%s", tag, sep);
}
return off;
}

View File

@@ -26,7 +26,7 @@
* @macro
* Version number of the http2 module as c string
*/
#define MOD_HTTP2_VERSION "1.8.12-DEV"
#define MOD_HTTP2_VERSION "1.9.0-DEV"
/**
* @macro
@@ -34,7 +34,7 @@
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
#define MOD_HTTP2_VERSION_NUM 0x01080c
#define MOD_HTTP2_VERSION_NUM 0x010900
#endif /* mod_h2_h2_version_h */