mirror of
https://github.com/apache/httpd.git
synced 2025-08-08 15:02:10 +03:00
On the trunk:
mod_http2: input buffering and dynamic flow windows for increased throughput. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1788981 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
3
CHANGES
3
CHANGES
@@ -1,6 +1,9 @@
|
|||||||
-*- coding: utf-8 -*-
|
-*- coding: utf-8 -*-
|
||||||
Changes with Apache 2.5.0
|
Changes with Apache 2.5.0
|
||||||
|
|
||||||
|
*) mod_http2: input buffering and dynamic flow windows for increased
|
||||||
|
throughput. [Stefan Eissing]
|
||||||
|
|
||||||
*) mod_http2: h2 workers with improved scalability for better scheduling
|
*) mod_http2: h2 workers with improved scalability for better scheduling
|
||||||
performance. There are H2MaxWorkers threads created at start and the
|
performance. There are H2MaxWorkers threads created at start and the
|
||||||
number is kept constant. [Stefan Eissing]
|
number is kept constant. [Stefan Eissing]
|
||||||
|
@@ -115,7 +115,8 @@ typedef enum {
|
|||||||
H2_SEV_CLOSED_L,
|
H2_SEV_CLOSED_L,
|
||||||
H2_SEV_CLOSED_R,
|
H2_SEV_CLOSED_R,
|
||||||
H2_SEV_CANCELLED,
|
H2_SEV_CANCELLED,
|
||||||
H2_SEV_EOS_SENT
|
H2_SEV_EOS_SENT,
|
||||||
|
H2_SEV_IN_DATA_PENDING,
|
||||||
} h2_stream_event_t;
|
} h2_stream_event_t;
|
||||||
|
|
||||||
|
|
||||||
|
@@ -246,15 +246,17 @@ static int report_consumption(h2_bucket_beam *beam, h2_beam_lock *pbl)
|
|||||||
apr_off_t len = beam->received_bytes - beam->cons_bytes_reported;
|
apr_off_t len = beam->received_bytes - beam->cons_bytes_reported;
|
||||||
h2_beam_io_callback *cb = beam->cons_io_cb;
|
h2_beam_io_callback *cb = beam->cons_io_cb;
|
||||||
|
|
||||||
if (cb) {
|
if (len > 0) {
|
||||||
void *ctx = beam->cons_ctx;
|
if (cb) {
|
||||||
|
void *ctx = beam->cons_ctx;
|
||||||
if (pbl) leave_yellow(beam, pbl);
|
|
||||||
cb(ctx, beam, len);
|
if (pbl) leave_yellow(beam, pbl);
|
||||||
if (pbl) enter_yellow(beam, pbl);
|
cb(ctx, beam, len);
|
||||||
rv = 1;
|
if (pbl) enter_yellow(beam, pbl);
|
||||||
|
rv = 1;
|
||||||
|
}
|
||||||
|
beam->cons_bytes_reported += len;
|
||||||
}
|
}
|
||||||
beam->cons_bytes_reported += len;
|
|
||||||
return rv;
|
return rv;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1250,9 +1252,9 @@ void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg)
|
|||||||
if (beam && APLOG_C_IS_LEVEL(c,level)) {
|
if (beam && APLOG_C_IS_LEVEL(c,level)) {
|
||||||
ap_log_cerror(APLOG_MARK, level, 0, c,
|
ap_log_cerror(APLOG_MARK, level, 0, c,
|
||||||
"beam(%ld-%d,%s,closed=%d,aborted=%d,empty=%d,buf=%ld): %s",
|
"beam(%ld-%d,%s,closed=%d,aborted=%d,empty=%d,buf=%ld): %s",
|
||||||
c->id, beam->id, beam->tag, beam->closed, beam->aborted,
|
(c->master? c->master->id : c->id), beam->id, beam->tag,
|
||||||
h2_beam_empty(beam), (long)h2_beam_get_buffered(beam),
|
beam->closed, beam->aborted, h2_beam_empty(beam),
|
||||||
msg);
|
(long)h2_beam_get_buffered(beam), msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -44,55 +44,80 @@
|
|||||||
#define UNSET -1
|
#define UNSET -1
|
||||||
#define H2MIN(x,y) ((x) < (y) ? (x) : (y))
|
#define H2MIN(x,y) ((x) < (y) ? (x) : (y))
|
||||||
|
|
||||||
static apr_status_t consume_brigade(h2_filter_cin *cin,
|
static apr_status_t recv_RAW_DATA(conn_rec *c, h2_filter_cin *cin,
|
||||||
apr_bucket_brigade *bb,
|
apr_bucket *b, apr_read_type_e block)
|
||||||
apr_read_type_e block)
|
|
||||||
{
|
{
|
||||||
|
h2_session *session = cin->session;
|
||||||
apr_status_t status = APR_SUCCESS;
|
apr_status_t status = APR_SUCCESS;
|
||||||
apr_size_t readlen = 0;
|
apr_size_t len;
|
||||||
|
const char *data;
|
||||||
|
ssize_t n;
|
||||||
|
|
||||||
while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
|
status = apr_bucket_read(b, &data, &len, block);
|
||||||
|
|
||||||
|
while (status == APR_SUCCESS && len > 0) {
|
||||||
|
n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len);
|
||||||
|
|
||||||
apr_bucket* bucket = APR_BRIGADE_FIRST(bb);
|
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
|
||||||
if (APR_BUCKET_IS_METADATA(bucket)) {
|
H2_SSSN_MSG(session, "fed %ld bytes to nghttp2, %ld read"),
|
||||||
/* we do nothing regarding any meta here */
|
(long)len, (long)n);
|
||||||
}
|
if (n < 0) {
|
||||||
else {
|
if (nghttp2_is_fatal((int)n)) {
|
||||||
const char *bucket_data = NULL;
|
h2_session_event(session, H2_SESSION_EV_PROTO_ERROR,
|
||||||
apr_size_t bucket_length = 0;
|
(int)n, nghttp2_strerror((int)n));
|
||||||
status = apr_bucket_read(bucket, &bucket_data,
|
status = APR_EGENERAL;
|
||||||
&bucket_length, block);
|
|
||||||
|
|
||||||
if (status == APR_SUCCESS && bucket_length > 0) {
|
|
||||||
apr_size_t consumed = 0;
|
|
||||||
|
|
||||||
status = cin->cb(cin->cb_ctx, bucket_data, bucket_length, &consumed);
|
|
||||||
if (status == APR_SUCCESS && bucket_length > consumed) {
|
|
||||||
/* We have data left in the bucket. Split it. */
|
|
||||||
status = apr_bucket_split(bucket, consumed);
|
|
||||||
}
|
|
||||||
readlen += consumed;
|
|
||||||
cin->start_read = apr_time_now();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
apr_bucket_delete(bucket);
|
else {
|
||||||
|
session->io.bytes_read += n;
|
||||||
|
if (len <= n) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
len -= n;
|
||||||
|
data += n;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (readlen == 0 && status == APR_SUCCESS && block == APR_NONBLOCK_READ) {
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
static apr_status_t recv_RAW_brigade(conn_rec *c, h2_filter_cin *cin,
|
||||||
|
apr_bucket_brigade *bb,
|
||||||
|
apr_read_type_e block)
|
||||||
|
{
|
||||||
|
apr_status_t status = APR_SUCCESS;
|
||||||
|
apr_bucket* b;
|
||||||
|
int consumed = 0;
|
||||||
|
|
||||||
|
h2_util_bb_log(c, c->id, APLOG_TRACE2, "RAW_in", bb);
|
||||||
|
while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
|
||||||
|
b = APR_BRIGADE_FIRST(bb);
|
||||||
|
|
||||||
|
if (APR_BUCKET_IS_METADATA(b)) {
|
||||||
|
/* nop */
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
status = recv_RAW_DATA(c, cin, b, block);
|
||||||
|
}
|
||||||
|
consumed = 1;
|
||||||
|
apr_bucket_delete(b);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!consumed && status == APR_SUCCESS && block == APR_NONBLOCK_READ) {
|
||||||
return APR_EAGAIN;
|
return APR_EAGAIN;
|
||||||
}
|
}
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
h2_filter_cin *h2_filter_cin_create(apr_pool_t *p, h2_filter_cin_cb *cb, void *ctx)
|
h2_filter_cin *h2_filter_cin_create(h2_session *session)
|
||||||
{
|
{
|
||||||
h2_filter_cin *cin;
|
h2_filter_cin *cin;
|
||||||
|
|
||||||
cin = apr_pcalloc(p, sizeof(*cin));
|
cin = apr_pcalloc(session->pool, sizeof(*cin));
|
||||||
cin->pool = p;
|
if (!cin) {
|
||||||
cin->cb = cb;
|
return NULL;
|
||||||
cin->cb_ctx = ctx;
|
}
|
||||||
cin->start_read = UNSET;
|
cin->session = session;
|
||||||
return cin;
|
return cin;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -110,11 +135,14 @@ apr_status_t h2_filter_core_input(ap_filter_t* f,
|
|||||||
h2_filter_cin *cin = f->ctx;
|
h2_filter_cin *cin = f->ctx;
|
||||||
apr_status_t status = APR_SUCCESS;
|
apr_status_t status = APR_SUCCESS;
|
||||||
apr_interval_time_t saved_timeout = UNSET;
|
apr_interval_time_t saved_timeout = UNSET;
|
||||||
|
const int trace1 = APLOGctrace1(f->c);
|
||||||
|
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
|
if (trace1) {
|
||||||
"h2_session(%ld): read, %s, mode=%d, readbytes=%ld",
|
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
|
||||||
(long)f->c->id, (block == APR_BLOCK_READ)?
|
"h2_session(%ld): read, %s, mode=%d, readbytes=%ld",
|
||||||
"BLOCK_READ" : "NONBLOCK_READ", mode, (long)readbytes);
|
(long)f->c->id, (block == APR_BLOCK_READ)?
|
||||||
|
"BLOCK_READ" : "NONBLOCK_READ", mode, (long)readbytes);
|
||||||
|
}
|
||||||
|
|
||||||
if (mode == AP_MODE_INIT || mode == AP_MODE_SPECULATIVE) {
|
if (mode == AP_MODE_INIT || mode == AP_MODE_SPECULATIVE) {
|
||||||
return ap_get_brigade(f->next, brigade, mode, block, readbytes);
|
return ap_get_brigade(f->next, brigade, mode, block, readbytes);
|
||||||
@@ -125,20 +153,16 @@ apr_status_t h2_filter_core_input(ap_filter_t* f,
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!cin->bb) {
|
if (!cin->bb) {
|
||||||
cin->bb = apr_brigade_create(cin->pool, f->c->bucket_alloc);
|
cin->bb = apr_brigade_create(cin->session->pool, f->c->bucket_alloc);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!cin->socket) {
|
if (!cin->socket) {
|
||||||
cin->socket = ap_get_conn_socket(f->c);
|
cin->socket = ap_get_conn_socket(f->c);
|
||||||
}
|
}
|
||||||
|
|
||||||
cin->start_read = apr_time_now();
|
|
||||||
if (APR_BRIGADE_EMPTY(cin->bb)) {
|
if (APR_BRIGADE_EMPTY(cin->bb)) {
|
||||||
/* We only do a blocking read when we have no streams to process. So,
|
/* We only do a blocking read when we have no streams to process. So,
|
||||||
* in httpd scoreboard lingo, we are in a KEEPALIVE connection state.
|
* in httpd scoreboard lingo, we are in a KEEPALIVE connection state.
|
||||||
* When reading non-blocking, we do have streams to process and update
|
|
||||||
* child with NULL request. That way, any current request information
|
|
||||||
* in the scoreboard is preserved.
|
|
||||||
*/
|
*/
|
||||||
if (block == APR_BLOCK_READ) {
|
if (block == APR_BLOCK_READ) {
|
||||||
if (cin->timeout > 0) {
|
if (cin->timeout > 0) {
|
||||||
@@ -155,13 +179,15 @@ apr_status_t h2_filter_core_input(ap_filter_t* f,
|
|||||||
|
|
||||||
switch (status) {
|
switch (status) {
|
||||||
case APR_SUCCESS:
|
case APR_SUCCESS:
|
||||||
status = consume_brigade(cin, cin->bb, block);
|
status = recv_RAW_brigade(f->c, cin, cin->bb, block);
|
||||||
break;
|
break;
|
||||||
case APR_EOF:
|
case APR_EOF:
|
||||||
case APR_EAGAIN:
|
case APR_EAGAIN:
|
||||||
case APR_TIMEUP:
|
case APR_TIMEUP:
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
|
if (trace1) {
|
||||||
"h2_session(%ld): read", f->c->id);
|
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
|
||||||
|
"h2_session(%ld): read", f->c->id);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, f->c, APLOGNO(03046)
|
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, f->c, APLOGNO(03046)
|
||||||
|
@@ -21,21 +21,16 @@ struct h2_headers;
|
|||||||
struct h2_stream;
|
struct h2_stream;
|
||||||
struct h2_session;
|
struct h2_session;
|
||||||
|
|
||||||
typedef apr_status_t h2_filter_cin_cb(void *ctx,
|
|
||||||
const char *data, apr_size_t len,
|
|
||||||
apr_size_t *readlen);
|
|
||||||
|
|
||||||
typedef struct h2_filter_cin {
|
typedef struct h2_filter_cin {
|
||||||
apr_pool_t *pool;
|
apr_pool_t *pool;
|
||||||
apr_bucket_brigade *bb;
|
|
||||||
h2_filter_cin_cb *cb;
|
|
||||||
void *cb_ctx;
|
|
||||||
apr_socket_t *socket;
|
apr_socket_t *socket;
|
||||||
apr_interval_time_t timeout;
|
apr_interval_time_t timeout;
|
||||||
apr_time_t start_read;
|
apr_bucket_brigade *bb;
|
||||||
|
struct h2_session *session;
|
||||||
|
apr_bucket *cur;
|
||||||
} h2_filter_cin;
|
} h2_filter_cin;
|
||||||
|
|
||||||
h2_filter_cin *h2_filter_cin_create(apr_pool_t *p, h2_filter_cin_cb *cb, void *ctx);
|
h2_filter_cin *h2_filter_cin_create(struct h2_session *session);
|
||||||
|
|
||||||
void h2_filter_cin_timeout_set(h2_filter_cin *cin, apr_interval_time_t timeout);
|
void h2_filter_cin_timeout_set(h2_filter_cin *cin, apr_interval_time_t timeout);
|
||||||
|
|
||||||
|
@@ -618,6 +618,18 @@ static apr_status_t out_close(h2_mplx *m, h2_task *task)
|
|||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int report_input_consumption(void *ctx, void *val)
|
||||||
|
{
|
||||||
|
h2_stream *stream = val;
|
||||||
|
|
||||||
|
(void)ctx;
|
||||||
|
if (stream->input) {
|
||||||
|
h2_beam_report_consumption(stream->input);
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
|
apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
|
||||||
apr_thread_cond_t *iowait)
|
apr_thread_cond_t *iowait)
|
||||||
{
|
{
|
||||||
@@ -633,6 +645,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
|
|||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
purge_streams(m);
|
purge_streams(m);
|
||||||
|
h2_ihash_iter(m->streams, report_input_consumption, m);
|
||||||
m->added_output = iowait;
|
m->added_output = iowait;
|
||||||
status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
|
status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
|
||||||
if (APLOGctrace2(m->c)) {
|
if (APLOGctrace2(m->c)) {
|
||||||
|
@@ -56,6 +56,7 @@ static void transit(h2_session *session, const char *action,
|
|||||||
|
|
||||||
static void on_stream_state_enter(void *ctx, h2_stream *stream);
|
static void on_stream_state_enter(void *ctx, h2_stream *stream);
|
||||||
static void on_stream_state_event(void *ctx, h2_stream *stream, h2_stream_event_t ev);
|
static void on_stream_state_event(void *ctx, h2_stream *stream, h2_stream_event_t ev);
|
||||||
|
static void on_stream_event(void *ctx, h2_stream *stream, h2_stream_event_t ev);
|
||||||
|
|
||||||
static int h2_session_status_from_apr_status(apr_status_t rv)
|
static int h2_session_status_from_apr_status(apr_status_t rv)
|
||||||
{
|
{
|
||||||
@@ -71,26 +72,79 @@ static int h2_session_status_from_apr_status(apr_status_t rv)
|
|||||||
return NGHTTP2_ERR_PROTO;
|
return NGHTTP2_ERR_PROTO;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static h2_stream *get_stream(h2_session *session, int stream_id)
|
||||||
|
{
|
||||||
|
return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
|
||||||
|
}
|
||||||
|
|
||||||
static void update_window(void *ctx, int stream_id, apr_off_t bytes_read)
|
static void update_window(void *ctx, int stream_id, apr_off_t bytes_read)
|
||||||
{
|
{
|
||||||
h2_session *session = (h2_session*)ctx;
|
h2_session *session = ctx;
|
||||||
while (bytes_read > 0) {
|
|
||||||
int len = (bytes_read > INT_MAX)? INT_MAX : bytes_read;
|
if (bytes_read > 0) {
|
||||||
nghttp2_session_consume(session->ngh2, stream_id, (int)bytes_read);
|
h2_stream *stream = get_stream(session, stream_id);
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
|
apr_off_t consumed = bytes_read;
|
||||||
"h2_stream(%ld-%d): consumed %d bytes",
|
|
||||||
session->id, stream_id, len);
|
while (consumed > 0) {
|
||||||
bytes_read -= len;
|
int len = (bytes_read > INT_MAX)? INT_MAX : bytes_read;
|
||||||
|
nghttp2_session_consume(session->ngh2, stream_id, (int)bytes_read);
|
||||||
|
consumed -= len;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (stream) {
|
||||||
|
int cur_size = nghttp2_session_get_stream_local_window_size(
|
||||||
|
session->ngh2, stream->id);
|
||||||
|
int win = stream->in_window_size;
|
||||||
|
int thigh = win * 8/10;
|
||||||
|
int tlow = win * 2/10;
|
||||||
|
const int win_max = 2*1024*1024;
|
||||||
|
const int win_min = 32*1024;
|
||||||
|
|
||||||
|
/* Work in progress, probably shoud add directives for these
|
||||||
|
* values once this stabilizes somewhat. The general idea is
|
||||||
|
* to adapt stream window sizes if the input window changes
|
||||||
|
* a) very quickly (< good RTT) from full to empty
|
||||||
|
* b) only a little bit (> bad RTT)
|
||||||
|
* where in a) it grows and in b) it shrinks again.
|
||||||
|
*/
|
||||||
|
if (cur_size > thigh && bytes_read > thigh && win < win_max) {
|
||||||
|
/* almost empty again with one reported consumption, how
|
||||||
|
* long did this take? */
|
||||||
|
long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
|
||||||
|
if (ms < 40) {
|
||||||
|
win = H2MIN(win_max, win + (64*1024));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (cur_size < tlow && bytes_read < tlow && win > win_min) {
|
||||||
|
/* staying full, for how long already? */
|
||||||
|
long ms = apr_time_msec(apr_time_now() - stream->in_last_write);
|
||||||
|
if (ms > 700) {
|
||||||
|
win = H2MAX(win_min, win - (32*1024));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (win != stream->in_window_size) {
|
||||||
|
stream->in_window_size = win;
|
||||||
|
nghttp2_session_set_local_window_size(session->ngh2,
|
||||||
|
NGHTTP2_FLAG_NONE, stream_id, win);
|
||||||
|
}
|
||||||
|
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
|
||||||
|
"h2_stream(%ld-%d): consumed %ld bytes, window now %d/%d",
|
||||||
|
session->id, stream_id, (long)bytes_read,
|
||||||
|
cur_size, stream->in_window_size);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static apr_status_t h2_session_receive(void *ctx,
|
|
||||||
const char *data, apr_size_t len,
|
|
||||||
apr_size_t *readlen);
|
|
||||||
|
|
||||||
static void dispatch_event(h2_session *session, h2_session_event_t ev,
|
static void dispatch_event(h2_session *session, h2_session_event_t ev,
|
||||||
int err, const char *msg);
|
int err, const char *msg);
|
||||||
|
|
||||||
|
void h2_session_event(h2_session *session, h2_session_event_t ev,
|
||||||
|
int err, const char *msg)
|
||||||
|
{
|
||||||
|
dispatch_event(session, ev, err, msg);
|
||||||
|
}
|
||||||
|
|
||||||
static int rst_unprocessed_stream(h2_stream *stream, void *ctx)
|
static int rst_unprocessed_stream(h2_stream *stream, void *ctx)
|
||||||
{
|
{
|
||||||
int unprocessed = (!h2_stream_was_closed(stream)
|
int unprocessed = (!h2_stream_was_closed(stream)
|
||||||
@@ -227,11 +281,6 @@ static int on_invalid_frame_recv_cb(nghttp2_session *ngh2,
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static h2_stream *get_stream(h2_session *session, int stream_id)
|
|
||||||
{
|
|
||||||
return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
|
static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
|
||||||
int32_t stream_id,
|
int32_t stream_id,
|
||||||
const uint8_t *data, size_t len, void *userp)
|
const uint8_t *data, size_t len, void *userp)
|
||||||
@@ -803,23 +852,35 @@ static apr_status_t h2_session_create_int(h2_session **psession,
|
|||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
session->in_pending = h2_iq_create(session->pool, session->max_stream_count);
|
||||||
|
if (session->in_pending == NULL) {
|
||||||
|
apr_pool_destroy(pool);
|
||||||
|
return APR_ENOMEM;
|
||||||
|
}
|
||||||
|
|
||||||
|
session->in_process = h2_iq_create(session->pool, session->max_stream_count);
|
||||||
|
if (session->in_process == NULL) {
|
||||||
|
apr_pool_destroy(pool);
|
||||||
|
return APR_ENOMEM;
|
||||||
|
}
|
||||||
|
|
||||||
session->monitor = apr_pcalloc(pool, sizeof(h2_stream_monitor));
|
session->monitor = apr_pcalloc(pool, sizeof(h2_stream_monitor));
|
||||||
if (session->monitor == NULL) {
|
if (session->monitor == NULL) {
|
||||||
apr_pool_destroy(pool);
|
apr_pool_destroy(pool);
|
||||||
return status;
|
return APR_ENOMEM;
|
||||||
}
|
}
|
||||||
session->monitor->ctx = session;
|
session->monitor->ctx = session;
|
||||||
session->monitor->on_state_enter = on_stream_state_enter;
|
session->monitor->on_state_enter = on_stream_state_enter;
|
||||||
session->monitor->on_state_event = on_stream_state_event;
|
session->monitor->on_state_event = on_stream_state_event;
|
||||||
|
session->monitor->on_event = on_stream_event;
|
||||||
|
|
||||||
session->mplx = h2_mplx_create(c, session->pool, session->config,
|
session->mplx = h2_mplx_create(c, session->pool, session->config,
|
||||||
workers);
|
workers);
|
||||||
|
|
||||||
h2_mplx_set_consumed_cb(session->mplx, update_window, session);
|
h2_mplx_set_consumed_cb(session->mplx, update_window, session);
|
||||||
|
|
||||||
/* Install the connection input filter that feeds the session */
|
/* connection input filter that feeds the session */
|
||||||
session->cin = h2_filter_cin_create(session->pool,
|
session->cin = h2_filter_cin_create(session);
|
||||||
h2_session_receive, session);
|
|
||||||
ap_add_input_filter("H2_IN", session->cin, r, c);
|
ap_add_input_filter("H2_IN", session->cin, r, c);
|
||||||
|
|
||||||
h2_conn_io_init(&session->io, c, session->config);
|
h2_conn_io_init(&session->io, c, session->config);
|
||||||
@@ -1431,7 +1492,8 @@ send_headers:
|
|||||||
if (!stream->has_response) {
|
if (!stream->has_response) {
|
||||||
/* but no response */
|
/* but no response */
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
|
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
|
||||||
H2_STRM_LOG(APLOGNO(03466), stream, "no response, RST_STREAM"));
|
H2_STRM_LOG(APLOGNO(03466), stream,
|
||||||
|
"no response, RST_STREAM"));
|
||||||
h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
|
h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
|
||||||
return APR_SUCCESS;
|
return APR_SUCCESS;
|
||||||
}
|
}
|
||||||
@@ -1444,32 +1506,32 @@ send_headers:
|
|||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
static apr_status_t h2_session_receive(void *ctx, const char *data,
|
static void h2_session_in_flush(h2_session *session)
|
||||||
apr_size_t len, apr_size_t *readlen)
|
|
||||||
{
|
{
|
||||||
h2_session *session = ctx;
|
int id;
|
||||||
ssize_t n;
|
|
||||||
|
|
||||||
if (len > 0) {
|
while ((id = h2_iq_shift(session->in_process)) > 0) {
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
|
h2_stream *stream = get_stream(session, id);
|
||||||
H2_SSSN_MSG(session, "feeding %ld bytes to nghttp2"),
|
if (stream) {
|
||||||
(long)len);
|
ap_assert(!stream->scheduled);
|
||||||
n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len);
|
if (h2_stream_prep_processing(stream) == APR_SUCCESS) {
|
||||||
if (n < 0) {
|
h2_mplx_process(session->mplx, stream, stream_pri_cmp, session);
|
||||||
if (nghttp2_is_fatal((int)n)) {
|
}
|
||||||
dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, (int)n, nghttp2_strerror((int)n));
|
else {
|
||||||
return APR_EGENERAL;
|
h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
}
|
||||||
*readlen = n;
|
|
||||||
session->io.bytes_read += n;
|
while ((id = h2_iq_shift(session->in_pending)) > 0) {
|
||||||
|
h2_stream *stream = get_stream(session, id);
|
||||||
|
if (stream) {
|
||||||
|
h2_stream_flush_input(stream);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return APR_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static apr_status_t h2_session_read(h2_session *session, int block)
|
static apr_status_t session_read(h2_session *session, apr_size_t readlen, int block)
|
||||||
{
|
{
|
||||||
apr_status_t status, rstatus = APR_EAGAIN;
|
apr_status_t status, rstatus = APR_EAGAIN;
|
||||||
conn_rec *c = session->c;
|
conn_rec *c = session->c;
|
||||||
@@ -1481,7 +1543,7 @@ static apr_status_t h2_session_read(h2_session *session, int block)
|
|||||||
status = ap_get_brigade(c->input_filters,
|
status = ap_get_brigade(c->input_filters,
|
||||||
session->bbtmp, AP_MODE_READBYTES,
|
session->bbtmp, AP_MODE_READBYTES,
|
||||||
block? APR_BLOCK_READ : APR_NONBLOCK_READ,
|
block? APR_BLOCK_READ : APR_NONBLOCK_READ,
|
||||||
APR_BUCKET_BUFF_SIZE);
|
H2MAX(APR_BUCKET_BUFF_SIZE, readlen));
|
||||||
/* get rid of any possible data we do not expect to get */
|
/* get rid of any possible data we do not expect to get */
|
||||||
apr_brigade_cleanup(session->bbtmp);
|
apr_brigade_cleanup(session->bbtmp);
|
||||||
|
|
||||||
@@ -1523,16 +1585,25 @@ static apr_status_t h2_session_read(h2_session *session, int block)
|
|||||||
* status. */
|
* status. */
|
||||||
return rstatus;
|
return rstatus;
|
||||||
}
|
}
|
||||||
if ((session->io.bytes_read - read_start) > (64*1024)) {
|
if ((session->io.bytes_read - read_start) > readlen) {
|
||||||
/* read enough in one go, give write a chance */
|
/* read enough in one go, give write a chance */
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
|
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
|
||||||
H2_SSSN_MSG(session, "read 64k, returning"));
|
H2_SSSN_MSG(session, "read enough, returning"));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return rstatus;
|
return rstatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static apr_status_t h2_session_read(h2_session *session, int block)
|
||||||
|
{
|
||||||
|
apr_status_t status = session_read(session, session->max_stream_mem
|
||||||
|
* H2MAX(2, session->open_streams),
|
||||||
|
block);
|
||||||
|
h2_session_in_flush(session);
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
static const char *StateNames[] = {
|
static const char *StateNames[] = {
|
||||||
"INIT", /* H2_SESSION_ST_INIT */
|
"INIT", /* H2_SESSION_ST_INIT */
|
||||||
"DONE", /* H2_SESSION_ST_DONE */
|
"DONE", /* H2_SESSION_ST_DONE */
|
||||||
@@ -1769,6 +1840,7 @@ static void h2_session_ev_pre_close(h2_session *session, int arg, const char *ms
|
|||||||
|
|
||||||
static void ev_stream_open(h2_session *session, h2_stream *stream)
|
static void ev_stream_open(h2_session *session, h2_stream *stream)
|
||||||
{
|
{
|
||||||
|
h2_iq_append(session->in_process, stream->id);
|
||||||
switch (session->state) {
|
switch (session->state) {
|
||||||
case H2_SESSION_ST_IDLE:
|
case H2_SESSION_ST_IDLE:
|
||||||
if (session->open_streams == 1) {
|
if (session->open_streams == 1) {
|
||||||
@@ -1779,14 +1851,6 @@ static void ev_stream_open(h2_session *session, h2_stream *stream)
|
|||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
ap_assert(!stream->scheduled);
|
|
||||||
if (h2_stream_prep_processing(stream) == APR_SUCCESS) {
|
|
||||||
h2_mplx_process(session->mplx, stream, stream_pri_cmp, session);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void ev_stream_closed(h2_session *session, h2_stream *stream)
|
static void ev_stream_closed(h2_session *session, h2_stream *stream)
|
||||||
@@ -1863,6 +1927,20 @@ static void on_stream_state_enter(void *ctx, h2_stream *stream)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void on_stream_event(void *ctx, h2_stream *stream,
|
||||||
|
h2_stream_event_t ev)
|
||||||
|
{
|
||||||
|
h2_session *session = ctx;
|
||||||
|
switch (ev) {
|
||||||
|
case H2_SEV_IN_DATA_PENDING:
|
||||||
|
h2_iq_append(session->in_pending, stream->id);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
/* NOP */
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void on_stream_state_event(void *ctx, h2_stream *stream,
|
static void on_stream_state_event(void *ctx, h2_stream *stream,
|
||||||
h2_stream_event_t ev)
|
h2_stream_event_t ev)
|
||||||
{
|
{
|
||||||
|
@@ -125,6 +125,10 @@ typedef struct h2_session {
|
|||||||
char status[64]; /* status message for scoreboard */
|
char status[64]; /* status message for scoreboard */
|
||||||
int last_status_code; /* the one already reported */
|
int last_status_code; /* the one already reported */
|
||||||
const char *last_status_msg; /* the one already reported */
|
const char *last_status_msg; /* the one already reported */
|
||||||
|
|
||||||
|
struct h2_iqueue *in_pending; /* all streams with input pending */
|
||||||
|
struct h2_iqueue *in_process; /* all streams ready for processing on slave */
|
||||||
|
|
||||||
} h2_session;
|
} h2_session;
|
||||||
|
|
||||||
const char *h2_session_state_str(h2_session_state state);
|
const char *h2_session_state_str(h2_session_state state);
|
||||||
@@ -155,6 +159,9 @@ apr_status_t h2_session_rcreate(h2_session **psession,
|
|||||||
request_rec *r, struct h2_ctx *ctx,
|
request_rec *r, struct h2_ctx *ctx,
|
||||||
struct h2_workers *workers);
|
struct h2_workers *workers);
|
||||||
|
|
||||||
|
void h2_session_event(h2_session *session, h2_session_event_t ev,
|
||||||
|
int err, const char *msg);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process the given HTTP/2 session until it is ended or a fatal
|
* Process the given HTTP/2 session until it is ended or a fatal
|
||||||
* error occurred.
|
* error occurred.
|
||||||
|
@@ -157,9 +157,15 @@ static int on_frame_recv(h2_stream_state_t state, int frame_type)
|
|||||||
return on_frame(state, frame_type, trans_on_recv, H2_ALEN(trans_on_recv));
|
return on_frame(state, frame_type, trans_on_recv, H2_ALEN(trans_on_recv));
|
||||||
}
|
}
|
||||||
|
|
||||||
static int on_event(h2_stream_state_t state, h2_stream_event_t ev)
|
static int on_event(h2_stream* stream, h2_stream_event_t ev)
|
||||||
{
|
{
|
||||||
return on_map(state, trans_on_event[ev]);
|
if (stream->monitor && stream->monitor->on_event) {
|
||||||
|
stream->monitor->on_event(stream->monitor->ctx, stream, ev);
|
||||||
|
}
|
||||||
|
if (ev < H2_ALEN(trans_on_event)) {
|
||||||
|
return on_map(stream->state, trans_on_event[ev]);
|
||||||
|
}
|
||||||
|
return stream->state;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag)
|
static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag)
|
||||||
@@ -176,11 +182,16 @@ static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static apr_status_t setup_input(h2_stream *stream) {
|
static apr_status_t setup_input(h2_stream *stream) {
|
||||||
if (stream->input == NULL && !stream->input_eof) {
|
if (stream->input == NULL) {
|
||||||
h2_beam_create(&stream->input, stream->pool, stream->id,
|
int empty = (stream->input_eof
|
||||||
"input", H2_BEAM_OWNER_SEND, 0,
|
&& (!stream->in_buffer
|
||||||
stream->session->s->timeout);
|
|| APR_BRIGADE_EMPTY(stream->in_buffer)));
|
||||||
h2_beam_send_from(stream->input, stream->pool);
|
if (!empty) {
|
||||||
|
h2_beam_create(&stream->input, stream->pool, stream->id,
|
||||||
|
"input", H2_BEAM_OWNER_SEND, 0,
|
||||||
|
stream->session->s->timeout);
|
||||||
|
h2_beam_send_from(stream->input, stream->pool);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return APR_SUCCESS;
|
return APR_SUCCESS;
|
||||||
}
|
}
|
||||||
@@ -202,27 +213,27 @@ static apr_status_t close_input(h2_stream *stream)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (stream->trailers && !apr_is_empty_table(stream->trailers)) {
|
if (stream->trailers && !apr_is_empty_table(stream->trailers)) {
|
||||||
apr_bucket_brigade *tmp;
|
|
||||||
apr_bucket *b;
|
apr_bucket *b;
|
||||||
h2_headers *r;
|
h2_headers *r;
|
||||||
|
|
||||||
tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
|
if (!stream->in_buffer) {
|
||||||
|
stream->in_buffer = apr_brigade_create(stream->pool, c->bucket_alloc);
|
||||||
|
}
|
||||||
|
|
||||||
r = h2_headers_create(HTTP_OK, stream->trailers, NULL, stream->pool);
|
r = h2_headers_create(HTTP_OK, stream->trailers, NULL, stream->pool);
|
||||||
stream->trailers = NULL;
|
stream->trailers = NULL;
|
||||||
b = h2_bucket_headers_create(c->bucket_alloc, r);
|
b = h2_bucket_headers_create(c->bucket_alloc, r);
|
||||||
APR_BRIGADE_INSERT_TAIL(tmp, b);
|
APR_BRIGADE_INSERT_TAIL(stream->in_buffer, b);
|
||||||
|
|
||||||
b = apr_bucket_eos_create(c->bucket_alloc);
|
b = apr_bucket_eos_create(c->bucket_alloc);
|
||||||
APR_BRIGADE_INSERT_TAIL(tmp, b);
|
APR_BRIGADE_INSERT_TAIL(stream->in_buffer, b);
|
||||||
|
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
|
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
|
||||||
H2_STRM_MSG(stream, "added trailers"));
|
H2_STRM_MSG(stream, "added trailers"));
|
||||||
setup_input(stream);
|
h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING);
|
||||||
status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
|
|
||||||
apr_brigade_destroy(tmp);
|
|
||||||
}
|
}
|
||||||
if (stream->input) {
|
if (stream->input) {
|
||||||
|
h2_stream_flush_input(stream);
|
||||||
return h2_beam_close(stream->input);
|
return h2_beam_close(stream->input);
|
||||||
}
|
}
|
||||||
return status;
|
return status;
|
||||||
@@ -329,7 +340,7 @@ void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev)
|
|||||||
|
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
|
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
|
||||||
H2_STRM_MSG(stream, "dispatch event %d"), ev);
|
H2_STRM_MSG(stream, "dispatch event %d"), ev);
|
||||||
new_state = on_event(stream->state, ev);
|
new_state = on_event(stream, ev);
|
||||||
if (new_state < 0) {
|
if (new_state < 0) {
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c,
|
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c,
|
||||||
H2_STRM_LOG(APLOGNO(10002), stream, "invalid event %d"), ev);
|
H2_STRM_LOG(APLOGNO(10002), stream, "invalid event %d"), ev);
|
||||||
@@ -340,7 +351,7 @@ void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev)
|
|||||||
else if (new_state == stream->state) {
|
else if (new_state == stream->state) {
|
||||||
/* nop */
|
/* nop */
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
|
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
|
||||||
H2_STRM_MSG(stream, "ignored event %d"), ev);
|
H2_STRM_MSG(stream, "non-state event %d"), ev);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
@@ -399,7 +410,7 @@ apr_status_t h2_stream_send_frame(h2_stream *stream, int ftype, int flags)
|
|||||||
H2_STRM_MSG(stream, "send frame %d, eos=%d"), ftype, eos);
|
H2_STRM_MSG(stream, "send frame %d, eos=%d"), ftype, eos);
|
||||||
status = transit(stream, new_state);
|
status = transit(stream, new_state);
|
||||||
if (status == APR_SUCCESS && eos) {
|
if (status == APR_SUCCESS && eos) {
|
||||||
status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_L));
|
status = transit(stream, on_event(stream, H2_SEV_CLOSED_L));
|
||||||
}
|
}
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
@@ -449,7 +460,23 @@ apr_status_t h2_stream_recv_frame(h2_stream *stream, int ftype, int flags)
|
|||||||
}
|
}
|
||||||
status = transit(stream, new_state);
|
status = transit(stream, new_state);
|
||||||
if (status == APR_SUCCESS && eos) {
|
if (status == APR_SUCCESS && eos) {
|
||||||
status = transit(stream, on_event(stream->state, H2_SEV_CLOSED_R));
|
status = transit(stream, on_event(stream, H2_SEV_CLOSED_R));
|
||||||
|
}
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
apr_status_t h2_stream_flush_input(h2_stream *stream)
|
||||||
|
{
|
||||||
|
apr_status_t status = APR_SUCCESS;
|
||||||
|
|
||||||
|
if (stream->in_buffer && !APR_BRIGADE_EMPTY(stream->in_buffer)) {
|
||||||
|
setup_input(stream);
|
||||||
|
status = h2_beam_send(stream->input, stream->in_buffer, APR_BLOCK_READ);
|
||||||
|
stream->in_last_write = apr_time_now();
|
||||||
|
}
|
||||||
|
if (stream->input_eof
|
||||||
|
&& stream->input && !h2_beam_is_closed(stream->input)) {
|
||||||
|
status = h2_beam_close(stream->input);
|
||||||
}
|
}
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
@@ -459,21 +486,27 @@ apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
|
|||||||
{
|
{
|
||||||
h2_session *session = stream->session;
|
h2_session *session = stream->session;
|
||||||
apr_status_t status = APR_SUCCESS;
|
apr_status_t status = APR_SUCCESS;
|
||||||
apr_bucket_brigade *tmp;
|
|
||||||
|
|
||||||
ap_assert(stream);
|
|
||||||
if (len > 0) {
|
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
|
|
||||||
H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len);
|
|
||||||
|
|
||||||
tmp = apr_brigade_create(stream->pool, session->c->bucket_alloc);
|
|
||||||
apr_brigade_write(tmp, NULL, NULL, (const char *)data, len);
|
|
||||||
setup_input(stream);
|
|
||||||
status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
|
|
||||||
apr_brigade_destroy(tmp);
|
|
||||||
}
|
|
||||||
stream->in_data_frames++;
|
stream->in_data_frames++;
|
||||||
stream->in_data_octets += len;
|
if (len > 0) {
|
||||||
|
if (APLOGctrace3(session->c)) {
|
||||||
|
const char *load = apr_pstrndup(stream->pool, (const char *)data, len);
|
||||||
|
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, session->c,
|
||||||
|
H2_STRM_MSG(stream, "recv DATA, len=%d: -->%s<--"),
|
||||||
|
(int)len, load);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
|
||||||
|
H2_STRM_MSG(stream, "recv DATA, len=%d"), (int)len);
|
||||||
|
}
|
||||||
|
stream->in_data_octets += len;
|
||||||
|
if (!stream->in_buffer) {
|
||||||
|
stream->in_buffer = apr_brigade_create(stream->pool,
|
||||||
|
session->c->bucket_alloc);
|
||||||
|
}
|
||||||
|
apr_brigade_write(stream->in_buffer, NULL, NULL, (const char *)data, len);
|
||||||
|
h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING);
|
||||||
|
}
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -500,7 +533,11 @@ h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session,
|
|||||||
|
|
||||||
h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0,
|
h2_beam_create(&stream->output, pool, id, "output", H2_BEAM_OWNER_RECV, 0,
|
||||||
session->s->timeout);
|
session->s->timeout);
|
||||||
|
|
||||||
|
stream->in_window_size =
|
||||||
|
nghttp2_session_get_stream_local_window_size(
|
||||||
|
stream->session->ngh2, stream->id);
|
||||||
|
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
|
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
|
||||||
H2_STRM_LOG(APLOGNO(03082), stream, "created"));
|
H2_STRM_LOG(APLOGNO(03082), stream, "created"));
|
||||||
on_state_enter(stream);
|
on_state_enter(stream);
|
||||||
|
@@ -57,6 +57,8 @@ typedef struct h2_stream_monitor {
|
|||||||
was detected */
|
was detected */
|
||||||
h2_stream_event_cb *on_state_event; /* called right before the given event
|
h2_stream_event_cb *on_state_event; /* called right before the given event
|
||||||
result in a new stream state */
|
result in a new stream state */
|
||||||
|
h2_stream_event_cb *on_event; /* called for events that do not
|
||||||
|
trigger a state change */
|
||||||
} h2_stream_monitor;
|
} h2_stream_monitor;
|
||||||
|
|
||||||
struct h2_stream {
|
struct h2_stream {
|
||||||
@@ -74,9 +76,13 @@ struct h2_stream {
|
|||||||
int request_headers_added; /* number of request headers added */
|
int request_headers_added; /* number of request headers added */
|
||||||
|
|
||||||
struct h2_bucket_beam *input;
|
struct h2_bucket_beam *input;
|
||||||
|
apr_bucket_brigade *in_buffer;
|
||||||
|
int in_window_size;
|
||||||
|
apr_time_t in_last_write;
|
||||||
|
|
||||||
struct h2_bucket_beam *output;
|
struct h2_bucket_beam *output;
|
||||||
apr_size_t max_mem; /* maximum amount of data buffered */
|
|
||||||
apr_bucket_brigade *out_buffer;
|
apr_bucket_brigade *out_buffer;
|
||||||
|
apr_size_t max_mem; /* maximum amount of data buffered */
|
||||||
|
|
||||||
int rst_error; /* stream error for RST_STREAM */
|
int rst_error; /* stream error for RST_STREAM */
|
||||||
unsigned int aborted : 1; /* was aborted */
|
unsigned int aborted : 1; /* was aborted */
|
||||||
@@ -205,6 +211,8 @@ apr_status_t h2_stream_recv_frame(h2_stream *stream, int frame_type, int flags);
|
|||||||
apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
|
apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
|
||||||
const uint8_t *data, size_t len);
|
const uint8_t *data, size_t len);
|
||||||
|
|
||||||
|
apr_status_t h2_stream_flush_input(h2_stream *stream);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reset the stream. Stream write/reads will return errors afterwards.
|
* Reset the stream. Stream write/reads will return errors afterwards.
|
||||||
*
|
*
|
||||||
@@ -291,7 +299,6 @@ const char *h2_stream_state_str(h2_stream *stream);
|
|||||||
*/
|
*/
|
||||||
int h2_stream_is_ready(h2_stream *stream);
|
int h2_stream_is_ready(h2_stream *stream);
|
||||||
|
|
||||||
|
|
||||||
#define H2_STRM_MSG(s, msg) \
|
#define H2_STRM_MSG(s, msg) \
|
||||||
"h2_stream(%ld-%d,%s): "msg, s->session->id, s->id, h2_stream_state_str(s)
|
"h2_stream(%ld-%d,%s): "msg, s->session->id, s->id, h2_stream_state_str(s)
|
||||||
|
|
||||||
|
@@ -216,14 +216,18 @@ static apr_status_t h2_filter_slave_in(ap_filter_t* f,
|
|||||||
apr_status_t status = APR_SUCCESS;
|
apr_status_t status = APR_SUCCESS;
|
||||||
apr_bucket *b, *next;
|
apr_bucket *b, *next;
|
||||||
apr_off_t bblen;
|
apr_off_t bblen;
|
||||||
apr_size_t rmax;
|
const int trace1 = APLOGctrace1(f->c);
|
||||||
|
apr_size_t rmax = ((readbytes <= APR_SIZE_MAX)?
|
||||||
|
(apr_size_t)readbytes : APR_SIZE_MAX);
|
||||||
|
|
||||||
task = h2_ctx_cget_task(f->c);
|
task = h2_ctx_cget_task(f->c);
|
||||||
ap_assert(task);
|
ap_assert(task);
|
||||||
rmax = ((readbytes <= APR_SIZE_MAX)? (apr_size_t)readbytes : APR_SIZE_MAX);
|
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
|
if (trace1) {
|
||||||
"h2_slave_in(%s): read, mode=%d, block=%d, readbytes=%ld",
|
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
|
||||||
task->id, mode, block, (long)readbytes);
|
"h2_slave_in(%s): read, mode=%d, block=%d, readbytes=%ld",
|
||||||
|
task->id, mode, block, (long)readbytes);
|
||||||
|
}
|
||||||
|
|
||||||
if (mode == AP_MODE_INIT) {
|
if (mode == AP_MODE_INIT) {
|
||||||
return ap_get_brigade(f->c->input_filters, bb, mode, block, readbytes);
|
return ap_get_brigade(f->c->input_filters, bb, mode, block, readbytes);
|
||||||
@@ -249,19 +253,23 @@ static apr_status_t h2_filter_slave_in(ap_filter_t* f,
|
|||||||
|
|
||||||
while (APR_BRIGADE_EMPTY(task->input.bb)) {
|
while (APR_BRIGADE_EMPTY(task->input.bb)) {
|
||||||
/* Get more input data for our request. */
|
/* Get more input data for our request. */
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
|
if (trace1) {
|
||||||
"h2_slave_in(%s): get more data from mplx, block=%d, "
|
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
|
||||||
"readbytes=%ld", task->id, block, (long)readbytes);
|
"h2_slave_in(%s): get more data from mplx, block=%d, "
|
||||||
|
"readbytes=%ld", task->id, block, (long)readbytes);
|
||||||
|
}
|
||||||
if (task->input.beam) {
|
if (task->input.beam) {
|
||||||
status = h2_beam_receive(task->input.beam, task->input.bb, block,
|
status = h2_beam_receive(task->input.beam, task->input.bb, block,
|
||||||
H2MIN(readbytes, 32*1024));
|
128*1024);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
status = APR_EOF;
|
status = APR_EOF;
|
||||||
}
|
}
|
||||||
|
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c,
|
if (trace1) {
|
||||||
"h2_slave_in(%s): read returned", task->id);
|
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c,
|
||||||
|
"h2_slave_in(%s): read returned", task->id);
|
||||||
|
}
|
||||||
if (APR_STATUS_IS_EAGAIN(status)
|
if (APR_STATUS_IS_EAGAIN(status)
|
||||||
&& (mode == AP_MODE_GETLINE || block == APR_BLOCK_READ)) {
|
&& (mode == AP_MODE_GETLINE || block == APR_BLOCK_READ)) {
|
||||||
/* chunked input handling does not seem to like it if we
|
/* chunked input handling does not seem to like it if we
|
||||||
@@ -275,9 +283,11 @@ static apr_status_t h2_filter_slave_in(ap_filter_t* f,
|
|||||||
else if (status != APR_SUCCESS) {
|
else if (status != APR_SUCCESS) {
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
|
if (trace1) {
|
||||||
"input.beam recv raw", task->input.bb);
|
h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
|
||||||
|
"input.beam recv raw", task->input.bb);
|
||||||
|
}
|
||||||
if (h2_task_logio_add_bytes_in) {
|
if (h2_task_logio_add_bytes_in) {
|
||||||
apr_brigade_length(bb, 0, &bblen);
|
apr_brigade_length(bb, 0, &bblen);
|
||||||
h2_task_logio_add_bytes_in(f->c, bblen);
|
h2_task_logio_add_bytes_in(f->c, bblen);
|
||||||
@@ -291,12 +301,16 @@ static apr_status_t h2_filter_slave_in(ap_filter_t* f,
|
|||||||
return (mode == AP_MODE_SPECULATIVE)? APR_EAGAIN : APR_EOF;
|
return (mode == AP_MODE_SPECULATIVE)? APR_EAGAIN : APR_EOF;
|
||||||
}
|
}
|
||||||
|
|
||||||
h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
|
if (trace1) {
|
||||||
"task_input.bb", task->input.bb);
|
h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
|
||||||
|
"task_input.bb", task->input.bb);
|
||||||
|
}
|
||||||
|
|
||||||
if (APR_BRIGADE_EMPTY(task->input.bb)) {
|
if (APR_BRIGADE_EMPTY(task->input.bb)) {
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
|
if (trace1) {
|
||||||
"h2_slave_in(%s): no data", task->id);
|
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
|
||||||
|
"h2_slave_in(%s): no data", task->id);
|
||||||
|
}
|
||||||
return (block == APR_NONBLOCK_READ)? APR_EAGAIN : APR_EOF;
|
return (block == APR_NONBLOCK_READ)? APR_EAGAIN : APR_EOF;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -321,9 +335,11 @@ static apr_status_t h2_filter_slave_in(ap_filter_t* f,
|
|||||||
apr_size_t len = sizeof(buffer)-1;
|
apr_size_t len = sizeof(buffer)-1;
|
||||||
apr_brigade_flatten(bb, buffer, &len);
|
apr_brigade_flatten(bb, buffer, &len);
|
||||||
buffer[len] = 0;
|
buffer[len] = 0;
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
|
if (trace1) {
|
||||||
"h2_slave_in(%s): getline: %s",
|
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
|
||||||
task->id, buffer);
|
"h2_slave_in(%s): getline: %s",
|
||||||
|
task->id, buffer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
@@ -336,7 +352,7 @@ static apr_status_t h2_filter_slave_in(ap_filter_t* f,
|
|||||||
status = APR_ENOTIMPL;
|
status = APR_ENOTIMPL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (APLOGctrace1(f->c)) {
|
if (trace1) {
|
||||||
apr_brigade_length(bb, 0, &bblen);
|
apr_brigade_length(bb, 0, &bblen);
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
|
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
|
||||||
"h2_slave_in(%s): %ld data bytes", task->id, (long)bblen);
|
"h2_slave_in(%s): %ld data bytes", task->id, (long)bblen);
|
||||||
|
@@ -991,17 +991,16 @@ apr_status_t h2_brigade_concat_length(apr_bucket_brigade *dest,
|
|||||||
apr_bucket_brigade *src,
|
apr_bucket_brigade *src,
|
||||||
apr_off_t length)
|
apr_off_t length)
|
||||||
{
|
{
|
||||||
apr_bucket *b, *next;
|
apr_bucket *b;
|
||||||
apr_off_t remain = length;
|
apr_off_t remain = length;
|
||||||
apr_status_t status = APR_SUCCESS;
|
apr_status_t status = APR_SUCCESS;
|
||||||
|
|
||||||
for (b = APR_BRIGADE_FIRST(src);
|
while (!APR_BRIGADE_EMPTY(src)) {
|
||||||
b != APR_BRIGADE_SENTINEL(src);
|
b = APR_BRIGADE_FIRST(src);
|
||||||
b = next) {
|
|
||||||
next = APR_BUCKET_NEXT(b);
|
|
||||||
|
|
||||||
if (APR_BUCKET_IS_METADATA(b)) {
|
if (APR_BUCKET_IS_METADATA(b)) {
|
||||||
/* fall through */
|
APR_BUCKET_REMOVE(b);
|
||||||
|
APR_BRIGADE_INSERT_TAIL(dest, b);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
if (remain == b->length) {
|
if (remain == b->length) {
|
||||||
@@ -1024,10 +1023,10 @@ apr_status_t h2_brigade_concat_length(apr_bucket_brigade *dest,
|
|||||||
apr_bucket_split(b, remain);
|
apr_bucket_split(b, remain);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
APR_BUCKET_REMOVE(b);
|
||||||
|
APR_BRIGADE_INSERT_TAIL(dest, b);
|
||||||
|
remain -= b->length;
|
||||||
}
|
}
|
||||||
APR_BUCKET_REMOVE(b);
|
|
||||||
APR_BRIGADE_INSERT_TAIL(dest, b);
|
|
||||||
remain -= b->length;
|
|
||||||
}
|
}
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
@@ -1215,55 +1214,14 @@ apr_size_t h2_util_bucket_print(char *buffer, apr_size_t bmax,
|
|||||||
if (bmax <= off) {
|
if (bmax <= off) {
|
||||||
return off;
|
return off;
|
||||||
}
|
}
|
||||||
if (APR_BUCKET_IS_METADATA(b)) {
|
else if (APR_BUCKET_IS_METADATA(b)) {
|
||||||
if (APR_BUCKET_IS_EOS(b)) {
|
off += apr_snprintf(buffer+off, bmax-off, "%s", b->type->name);
|
||||||
off += apr_snprintf(buffer+off, bmax-off, "eos");
|
|
||||||
}
|
|
||||||
else if (APR_BUCKET_IS_FLUSH(b)) {
|
|
||||||
off += apr_snprintf(buffer+off, bmax-off, "flush");
|
|
||||||
}
|
|
||||||
else if (AP_BUCKET_IS_EOR(b)) {
|
|
||||||
off += apr_snprintf(buffer+off, bmax-off, "eor");
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
off += apr_snprintf(buffer+off, bmax-off, "%s", b->type->name);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else {
|
else if (bmax > off) {
|
||||||
const char *btype = b->type->name;
|
off += apr_snprintf(buffer+off, bmax-off, "%s[%ld]",
|
||||||
if (APR_BUCKET_IS_FILE(b)) {
|
b->type->name,
|
||||||
btype = "file";
|
(long)(b->length == ((apr_size_t)-1)?
|
||||||
}
|
-1 : b->length));
|
||||||
else if (APR_BUCKET_IS_PIPE(b)) {
|
|
||||||
btype = "pipe";
|
|
||||||
}
|
|
||||||
else if (APR_BUCKET_IS_SOCKET(b)) {
|
|
||||||
btype = "socket";
|
|
||||||
}
|
|
||||||
else if (APR_BUCKET_IS_HEAP(b)) {
|
|
||||||
btype = "heap";
|
|
||||||
}
|
|
||||||
else if (APR_BUCKET_IS_TRANSIENT(b)) {
|
|
||||||
btype = "transient";
|
|
||||||
}
|
|
||||||
else if (APR_BUCKET_IS_IMMORTAL(b)) {
|
|
||||||
btype = "immortal";
|
|
||||||
}
|
|
||||||
#if APR_HAS_MMAP
|
|
||||||
else if (APR_BUCKET_IS_MMAP(b)) {
|
|
||||||
btype = "mmap";
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
else if (APR_BUCKET_IS_POOL(b)) {
|
|
||||||
btype = "pool";
|
|
||||||
}
|
|
||||||
|
|
||||||
if (bmax > off) {
|
|
||||||
off += apr_snprintf(buffer+off, bmax-off, "%s[%ld]",
|
|
||||||
btype,
|
|
||||||
(long)(b->length == ((apr_size_t)-1)?
|
|
||||||
-1 : b->length));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return off;
|
return off;
|
||||||
}
|
}
|
||||||
|
@@ -430,8 +430,8 @@ do { \
|
|||||||
const char *line = "(null)"; \
|
const char *line = "(null)"; \
|
||||||
apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); \
|
apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); \
|
||||||
len = h2_util_bb_print(buffer, bmax, (tag), "", (bb)); \
|
len = h2_util_bb_print(buffer, bmax, (tag), "", (bb)); \
|
||||||
ap_log_cerror(APLOG_MARK, level, 0, (c), "bb_dump(%s): %s", \
|
ap_log_cerror(APLOG_MARK, level, 0, (c), "bb_dump(%ld): %s", \
|
||||||
(c)->log_id, (len? buffer : line)); \
|
((c)->master? (c)->master->id : (c)->id), (len? buffer : line)); \
|
||||||
} while(0)
|
} while(0)
|
||||||
|
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user