1
0
mirror of https://github.com/apache/httpd.git synced 2025-08-08 15:02:10 +03:00

mod_http2: support for several different request engines per connection, fixes CVE-2016-1546

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1733727 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Stefan Eissing
2016-03-05 15:45:12 +00:00
parent e4c0735413
commit ebd03293e6
20 changed files with 886 additions and 536 deletions

View File

@@ -407,7 +407,7 @@ SET(mod_http2_extra_sources
modules/http2/h2_mplx.c modules/http2/h2_push.c
modules/http2/h2_request.c modules/http2/h2_response.c
modules/http2/h2_session.c modules/http2/h2_stream.c
modules/http2/h2_switch.c
modules/http2/h2_switch.c modules/http2/h2_ngn_shed.c
modules/http2/h2_task.c modules/http2/h2_task_input.c
modules/http2/h2_task_output.c modules/http2/h2_int_queue.c
modules/http2/h2_util.c modules/http2/h2_worker.c

View File

@@ -198,6 +198,7 @@ FILES_nlm_objs = \
$(OBJDIR)/h2_io.o \
$(OBJDIR)/h2_io_set.o \
$(OBJDIR)/h2_mplx.o \
$(OBJDIR)/h2_ngn_shed.o \
$(OBJDIR)/h2_push.o \
$(OBJDIR)/h2_request.o \
$(OBJDIR)/h2_response.o \

View File

@@ -33,6 +33,7 @@ h2_int_queue.lo dnl
h2_io.lo dnl
h2_io_set.lo dnl
h2_mplx.lo dnl
h2_ngn_shed.lo dnl
h2_push.lo dnl
h2_request.lo dnl
h2_response.lo dnl
@@ -200,7 +201,8 @@ is usually linked shared and requires loading. ], $http2_objs, , most, [
])
# Ensure that other modules can pick up mod_http2.h
APR_ADDTO(INCLUDES, [-I\$(top_srcdir)/$modpath_current])
# icing: hold back for now until it is more stable
#APR_ADDTO(INCLUDES, [-I\$(top_srcdir)/$modpath_current])

View File

@@ -213,6 +213,16 @@ apr_status_t h2_conn_io_pass(h2_conn_io *io, int flush)
return h2_conn_io_flush_int(io, flush, 0);
}
apr_status_t h2_conn_io_flush(h2_conn_io *io)
{
/* make sure we always write a flush, even if our buffers are empty.
* We want to flush not only our buffers, but alse ones further down
* the connection filters. */
apr_bucket *b = apr_bucket_flush_create(io->connection->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(io->output, b);
return h2_conn_io_flush_int(io, 0, 0);
}
apr_status_t h2_conn_io_consider_pass(h2_conn_io *io)
{
apr_off_t len = 0;

View File

@@ -78,6 +78,7 @@ apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, struct h2_session *session);
* @param flush if a flush bucket should be appended to any output
*/
apr_status_t h2_conn_io_pass(h2_conn_io *io, int flush);
apr_status_t h2_conn_io_flush(h2_conn_io *io);
/**
* Check the amount of buffered output and pass it on if enough has accumulated.

View File

@@ -38,6 +38,7 @@
#include "h2_io_set.h"
#include "h2_response.h"
#include "h2_mplx.h"
#include "h2_ngn_shed.h"
#include "h2_request.h"
#include "h2_stream.h"
#include "h2_task.h"
@@ -143,10 +144,7 @@ static void h2_mplx_destroy(h2_mplx *m)
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): destroy, ios=%d",
m->id, (int)h2_io_set_size(m->stream_ios));
m->aborted = 1;
check_tx_free(m);
if (m->pool) {
apr_pool_destroy(m->pool);
}
@@ -197,7 +195,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
return NULL;
}
status = apr_thread_cond_create(&m->task_done, m->pool);
status = apr_thread_cond_create(&m->req_added, m->pool);
if (status != APR_SUCCESS) {
h2_mplx_destroy(m);
return NULL;
@@ -217,6 +215,9 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
m->tx_handles_reserved = 0;
m->tx_chunk_size = 4;
m->ngn_shed = h2_ngn_shed_create(m->pool, m->c, m->stream_max_mem);
h2_ngn_shed_set_ctx(m->ngn_shed , m);
}
return m;
}
@@ -362,7 +363,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
h2_mplx_set_consumed_cb(m, NULL, NULL);
h2_iq_clear(m->q);
apr_thread_cond_broadcast(m->task_done);
apr_thread_cond_broadcast(m->req_added);
while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
/* iterate until all ios have been orphaned or destroyed */
}
@@ -397,8 +398,8 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
h2_io_set_iter(m->stream_ios, stream_print, m);
}
}
m->aborted = 1;
apr_thread_cond_broadcast(m->task_done);
h2_mplx_abort(m);
apr_thread_cond_broadcast(m->req_added);
}
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
@@ -412,16 +413,14 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
void h2_mplx_abort(h2_mplx *m)
{
apr_status_t status;
int acquired;
AP_DEBUG_ASSERT(m);
if (!m->aborted) {
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
if (!m->aborted && enter_mutex(m, &acquired) == APR_SUCCESS) {
m->aborted = 1;
h2_ngn_shed_abort(m->ngn_shed);
leave_mutex(m, acquired);
}
}
}
apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error)
@@ -695,7 +694,8 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
}
static apr_status_t out_write(h2_mplx *m, h2_io *io,
ap_filter_t* f, apr_bucket_brigade *bb,
ap_filter_t* f, int blocking,
apr_bucket_brigade *bb,
apr_table_t *trailers,
struct apr_thread_cond_t *iowait)
{
@@ -719,6 +719,9 @@ static apr_status_t out_write(h2_mplx *m, h2_io *io,
&& iowait
&& (m->stream_max_mem <= h2_io_out_length(io))
&& !is_aborted(m, &status)) {
if (!blocking) {
return APR_INCOMPLETE;
}
trailers = NULL;
if (f) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
@@ -757,7 +760,12 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response,
check_tx_reservation(m);
}
if (bb) {
status = out_write(m, io, f, bb, response->trailers, iowait);
status = out_write(m, io, f, 0, bb, response->trailers, iowait);
if (status == APR_INCOMPLETE) {
/* write will have transferred as much data as possible.
caller has to deal with non-empty brigade */
status = APR_SUCCESS;
}
}
have_out_data_for(m, stream_id);
}
@@ -791,7 +799,8 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
}
apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id,
ap_filter_t* f, apr_bucket_brigade *bb,
ap_filter_t* f, int blocking,
apr_bucket_brigade *bb,
apr_table_t *trailers,
struct apr_thread_cond_t *iowait)
{
@@ -802,7 +811,7 @@ apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id,
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io && !io->orphaned) {
status = out_write(m, io, f, bb, trailers, iowait);
status = out_write(m, io, f, blocking, bb, trailers, iowait);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
"h2_mplx(%ld-%d): write with trailers=%s",
m->id, io->id, trailers? "yes" : "no");
@@ -1111,7 +1120,9 @@ static void task_done(h2_mplx *m, h2_task *task)
if (task->frozen) {
/* this task was handed over to an engine for processing */
h2_task_thaw(task);
/* TODO: can we signal an engine that it can now start on this? */
/* TODO: not implemented yet... */
/*h2_task_set_io_blocking(task, 0);*/
apr_thread_cond_broadcast(m->req_added);
}
else {
h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
@@ -1174,7 +1185,6 @@ static void task_done(h2_mplx *m, h2_task *task)
/* hang around until the stream deregisteres */
}
}
apr_thread_cond_broadcast(m->task_done);
}
}
}
@@ -1337,59 +1347,8 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
* HTTP/2 request engines
******************************************************************************/
typedef struct h2_req_entry h2_req_entry;
struct h2_req_entry {
APR_RING_ENTRY(h2_req_entry) link;
request_rec *r;
};
#define H2_REQ_ENTRY_NEXT(e) APR_RING_NEXT((e), link)
#define H2_REQ_ENTRY_PREV(e) APR_RING_PREV((e), link)
#define H2_REQ_ENTRY_REMOVE(e) APR_RING_REMOVE((e), link)
typedef struct h2_req_engine_i h2_req_engine_i;
struct h2_req_engine_i {
h2_req_engine pub;
conn_rec *c; /* connection this engine is assigned to */
h2_mplx *m;
unsigned int shutdown : 1; /* engine is being shut down */
apr_thread_cond_t *io; /* condition var for waiting on data */
APR_RING_HEAD(h2_req_entries, h2_req_entry) entries;
apr_size_t no_assigned; /* # of assigned requests */
apr_size_t no_live; /* # of live */
apr_size_t no_finished; /* # of finished */
};
#define H2_REQ_ENTRIES_SENTINEL(b) APR_RING_SENTINEL((b), h2_req_entry, link)
#define H2_REQ_ENTRIES_EMPTY(b) APR_RING_EMPTY((b), h2_req_entry, link)
#define H2_REQ_ENTRIES_FIRST(b) APR_RING_FIRST(b)
#define H2_REQ_ENTRIES_LAST(b) APR_RING_LAST(b)
#define H2_REQ_ENTRIES_INSERT_HEAD(b, e) do { \
h2_req_entry *ap__b = (e); \
APR_RING_INSERT_HEAD((b), ap__b, h2_req_entry, link); \
} while (0)
#define H2_REQ_ENTRIES_INSERT_TAIL(b, e) do { \
h2_req_entry *ap__b = (e); \
APR_RING_INSERT_TAIL((b), ap__b, h2_req_entry, link); \
} while (0)
static apr_status_t h2_mplx_engine_schedule(h2_mplx *m,
h2_req_engine_i *engine,
request_rec *r)
{
h2_req_entry *entry = apr_pcalloc(r->pool, sizeof(*entry));
APR_RING_ELEM_INIT(entry, link);
entry->r = r;
H2_REQ_ENTRIES_INSERT_TAIL(&engine->entries, entry);
return APR_SUCCESS;
}
apr_status_t h2_mplx_engine_push(const char *engine_type,
request_rec *r, h2_mplx_engine_init *einit)
apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
request_rec *r, h2_req_engine_init *einit)
{
apr_status_t status;
h2_mplx *m;
@@ -1409,63 +1368,7 @@ apr_status_t h2_mplx_engine_push(const char *engine_type,
status = APR_ECONNABORTED;
}
else {
h2_req_engine_i *engine = (h2_req_engine_i*)m->engine;
apr_table_set(r->connection->notes, H2_TASK_ID_NOTE, task->id);
status = APR_EOF;
if (task->ser_headers) {
/* Max compatibility, deny processing of this */
}
else if (engine && !strcmp(engine->pub.type, engine_type)) {
if (engine->shutdown
|| engine->no_assigned >= H2MIN(engine->pub.capacity, 100)) {
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
"h2_mplx(%ld): engine shutdown or over %s",
m->c->id, engine->pub.id);
engine = NULL;
}
else if (h2_mplx_engine_schedule(m, engine, r) == APR_SUCCESS) {
/* this task will be processed in another thread,
* freeze any I/O for the time being. */
h2_task_freeze(task, r);
engine->no_assigned++;
status = APR_SUCCESS;
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r,
"h2_mplx(%ld): push request %s",
m->c->id, r->the_request);
}
else {
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
"h2_mplx(%ld): engine error adding req %s",
m->c->id, engine->pub.id);
engine = NULL;
}
}
if (!engine && einit) {
engine = apr_pcalloc(task->c->pool, sizeof(*engine));
engine->pub.id = apr_psprintf(task->c->pool, "eng-%ld-%d",
m->id, m->next_eng_id++);
engine->pub.pool = task->c->pool;
engine->pub.type = apr_pstrdup(task->c->pool, engine_type);
engine->pub.window_bits = 30;
engine->pub.req_window_bits = h2_log2(m->stream_max_mem);
engine->c = r->connection;
APR_RING_INIT(&engine->entries, h2_req_entry, link);
engine->m = m;
engine->io = task->io;
engine->no_assigned = 1;
engine->no_live = 1;
status = einit(&engine->pub, r);
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
"h2_mplx(%ld): init engine %s (%s)",
m->c->id, engine->pub.id, engine->pub.type);
if (status == APR_SUCCESS) {
m->engine = &engine->pub;
}
}
status = h2_ngn_shed_push_req(m->ngn_shed, ngn_type, task, r, einit);
}
leave_mutex(m, acquired);
@@ -1473,163 +1376,66 @@ apr_status_t h2_mplx_engine_push(const char *engine_type,
return status;
}
static h2_req_entry *pop_non_frozen(h2_req_engine_i *engine)
apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
apr_read_type_e block,
apr_uint32_t capacity,
request_rec **pr)
{
h2_req_entry *entry;
h2_task *task;
for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries);
entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries);
entry = H2_REQ_ENTRY_NEXT(entry)) {
task = h2_ctx_rget_task(entry->r);
AP_DEBUG_ASSERT(task);
if (!task->frozen) {
H2_REQ_ENTRY_REMOVE(entry);
return entry;
}
}
return NULL;
}
static apr_status_t engine_pull(h2_mplx *m, h2_req_engine_i *engine,
apr_read_type_e block, request_rec **pr)
{
h2_req_entry *entry;
AP_DEBUG_ASSERT(m);
AP_DEBUG_ASSERT(engine);
while (1) {
if (m->aborted) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld): mplx abort while pulling requests %s",
m->id, engine->pub.id);
*pr = NULL;
return APR_EOF;
}
if (!H2_REQ_ENTRIES_EMPTY(&engine->entries)
&& (entry = pop_non_frozen(engine))) {
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, entry->r,
"h2_mplx(%ld): request %s pulled by engine %s",
m->c->id, entry->r->the_request, engine->pub.id);
engine->no_live++;
entry->r->connection->current_thread = engine->c->current_thread;
*pr = entry->r;
return APR_SUCCESS;
}
else if (APR_NONBLOCK_READ == block) {
*pr = NULL;
return APR_EAGAIN;
}
else if (H2_REQ_ENTRIES_EMPTY(&engine->entries)) {
engine->shutdown = 1;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): emtpy queue, shutdown engine %s",
m->id, engine->pub.id);
*pr = NULL;
return APR_EOF;
}
apr_thread_cond_timedwait(m->task_done, m->lock,
apr_time_from_msec(100));
}
}
apr_status_t h2_mplx_engine_pull(h2_req_engine *pub_engine,
apr_read_type_e block, request_rec **pr)
{
h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
h2_mplx *m = engine->m;
h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
h2_mplx *m = h2_ngn_shed_get_ctx(shed);
apr_status_t status;
int acquired;
*pr = NULL;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
status = engine_pull(m, engine, block, pr);
int want_shutdown = (block == APR_BLOCK_READ);
if (0 && want_shutdown) {
/* For a blocking read, check first if requests are to be
* had and, if not, wait a short while before doing the
* blocking, and if unsuccessful, terminating read.
*/
status = h2_ngn_shed_pull_req(shed, ngn, capacity, 0, pr);
if (status != APR_EAGAIN) {
return status;
}
ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c,
"h2_mplx(%ld): start block engine pull", m->id);
apr_thread_cond_timedwait(m->req_added, m->lock,
apr_time_from_msec(100));
ap_log_cerror(APLOG_MARK, APLOG_INFO, status, m->c,
"h2_mplx(%ld): done block engine pull", m->id);
}
status = h2_ngn_shed_pull_req(shed, ngn, capacity, want_shutdown, pr);
leave_mutex(m, acquired);
}
return status;
}
static void engine_done(h2_mplx *m, h2_req_engine_i *engine, h2_task *task,
int waslive, int aborted)
void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
{
h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
h2_mplx *m = h2_ngn_shed_get_ctx(shed);
int acquired;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
"h2_mplx(%ld): task %s %s by %s",
m->id, task->id, aborted? "aborted":"done",
engine->pub.id);
h2_task_output_close(task->output);
engine->no_finished++;
if (waslive) engine->no_live--;
engine->no_assigned--;
if (task->c != engine->c) { /* do not release what the engine runs on */
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
if (h2_ngn_shed_done_req(shed, ngn, r_conn) == APR_SUCCESS) {
h2_task *task = h2_ctx_cget_task(r_conn);
if (task) {
task_done(m, task);
leave_mutex(m, acquired);
}
}
}
void h2_mplx_engine_done(h2_req_engine *pub_engine, conn_rec *r_conn)
{
h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
h2_mplx *m = engine->m;
h2_task *task;
int acquired;
task = h2_ctx_cget_task(r_conn);
if (task && (enter_mutex(m, &acquired) == APR_SUCCESS)) {
engine_done(m, engine, task, 1, 0);
leave_mutex(m, acquired);
}
}
void h2_mplx_engine_exit(h2_req_engine *pub_engine)
void h2_mplx_req_engine_exit(h2_req_engine *ngn)
{
h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
h2_mplx *m = engine->m;
h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
h2_mplx *m = h2_ngn_shed_get_ctx(shed);
int acquired;
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
if (!m->aborted
&& !H2_REQ_ENTRIES_EMPTY(&engine->entries)) {
h2_req_entry *entry;
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
"h2_mplx(%ld): exit engine %s (%s), "
"has still requests queued, shutdown=%d,"
"assigned=%ld, live=%ld, finished=%ld",
m->c->id, engine->pub.id, engine->pub.type,
engine->shutdown,
(long)engine->no_assigned, (long)engine->no_live,
(long)engine->no_finished);
for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries);
entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries);
entry = H2_REQ_ENTRY_NEXT(entry)) {
request_rec *r = entry->r;
h2_task *task = h2_ctx_rget_task(r);
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
"h2_mplx(%ld): engine %s has queued task %s, "
"frozen=%d, aborting",
m->c->id, engine->pub.id, task->id, task->frozen);
engine_done(m, engine, task, 0, 1);
}
}
if (!m->aborted && (engine->no_assigned > 1 || engine->no_live > 1)) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
"h2_mplx(%ld): exit engine %s (%s), "
"assigned=%ld, live=%ld, finished=%ld",
m->c->id, engine->pub.id, engine->pub.type,
(long)engine->no_assigned, (long)engine->no_live,
(long)engine->no_finished);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): exit engine %s (%s)",
m->c->id, engine->pub.id, engine->pub.type);
}
if (m->engine == &engine->pub) {
m->engine = NULL; /* TODO */
}
h2_ngn_shed_done_ngn(shed, ngn);
leave_mutex(m, acquired);
}
}

View File

@@ -47,6 +47,7 @@ struct h2_io_set;
struct apr_thread_cond_t;
struct h2_workers;
struct h2_int_queue;
struct h2_ngn_shed;
struct h2_req_engine;
#include <apr_queue.h>
@@ -75,19 +76,19 @@ struct h2_mplx {
struct h2_io_set *ready_ios;
struct h2_io_set *redo_ios;
int max_stream_started; /* highest stream id that started processing */
int workers_busy; /* # of workers processing on this mplx */
int workers_limit; /* current # of workers limit, dynamic */
int workers_def_limit; /* default # of workers limit */
int workers_max; /* max, hard limit # of workers in a process */
apr_uint32_t max_stream_started; /* highest stream id that started processing */
apr_uint32_t workers_busy; /* # of workers processing on this mplx */
apr_uint32_t workers_limit; /* current # of workers limit, dynamic */
apr_uint32_t workers_def_limit; /* default # of workers limit */
apr_uint32_t workers_max; /* max, hard limit # of workers in a process */
apr_time_t last_idle_block; /* last time, this mplx entered IDLE while
* streams were ready */
apr_time_t last_limit_change;/* last time, worker limit changed */
apr_time_t last_limit_change; /* last time, worker limit changed */
apr_interval_time_t limit_change_interval;
apr_thread_mutex_t *lock;
struct apr_thread_cond_t *added_output;
struct apr_thread_cond_t *task_done;
struct apr_thread_cond_t *req_added;
struct apr_thread_cond_t *join_wait;
apr_size_t stream_max_mem;
@@ -103,10 +104,7 @@ struct h2_mplx {
h2_mplx_consumed_cb *input_consumed;
void *input_consumed_ctx;
struct h2_req_engine *engine;
/* TODO: signal for waiting tasks*/
apr_queue_t *engine_queue;
int next_eng_id;
struct h2_ngn_shed *ngn_shed;
};
@@ -308,12 +306,16 @@ apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id,
* of bytes buffered reaches configured max.
* @param stream_id the stream identifier
* @param filter the apache filter context of the data
* @param blocking == 0 iff call should return with APR_INCOMPLETE if
* the full brigade cannot be written at once
* @param bb the bucket brigade to append
* @param trailers optional trailers for response, maybe NULL
* @param iowait a conditional used for block/signalling in h2_mplx
*/
apr_status_t h2_mplx_out_write(h2_mplx *mplx, int stream_id,
ap_filter_t* filter, apr_bucket_brigade *bb,
ap_filter_t* filter,
int blocking,
apr_bucket_brigade *bb,
apr_table_t *trailers,
struct apr_thread_cond_t *iowait);
@@ -408,20 +410,24 @@ APR_RING_INSERT_TAIL((b), ap__b, h2_mplx, link); \
apr_status_t h2_mplx_idle(h2_mplx *m);
/*******************************************************************************
* h2_mplx h2_req_engine handling.
* h2_req_engine handling
******************************************************************************/
typedef apr_status_t h2_mplx_engine_init(struct h2_req_engine *engine,
typedef apr_status_t h2_mplx_req_engine_init(struct h2_req_engine *engine,
const char *id,
const char *type,
apr_pool_t *pool,
apr_uint32_t req_buffer_size,
request_rec *r);
apr_status_t h2_mplx_engine_push(const char *engine_type,
request_rec *r, h2_mplx_engine_init *einit);
apr_status_t h2_mplx_engine_pull(struct h2_req_engine *engine,
apr_read_type_e block, request_rec **pr);
void h2_mplx_engine_done(struct h2_req_engine *engine, conn_rec *r_conn);
void h2_mplx_engine_exit(struct h2_req_engine *engine);
apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
request_rec *r,
h2_mplx_req_engine_init *einit);
apr_status_t h2_mplx_req_engine_pull(struct h2_req_engine *ngn,
apr_read_type_e block,
apr_uint32_t capacity,
request_rec **pr);
void h2_mplx_req_engine_done(struct h2_req_engine *ngn, conn_rec *r_conn);
void h2_mplx_req_engine_exit(struct h2_req_engine *ngn);
#endif /* defined(__mod_h2__h2_mplx__) */

333
modules/http2/h2_ngn_shed.c Normal file
View File

@@ -0,0 +1,333 @@
/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>
#include <apr_strings.h>
#include <apr_time.h>
#include <httpd.h>
#include <http_core.h>
#include <http_log.h>
#include "mod_http2.h"
#include "h2_private.h"
#include "h2_config.h"
#include "h2_conn.h"
#include "h2_ctx.h"
#include "h2_h2.h"
#include "h2_int_queue.h"
#include "h2_response.h"
#include "h2_request.h"
#include "h2_task.h"
#include "h2_task_output.h"
#include "h2_util.h"
#include "h2_ngn_shed.h"
typedef struct h2_ngn_entry h2_ngn_entry;
struct h2_ngn_entry {
APR_RING_ENTRY(h2_ngn_entry) link;
request_rec *r;
};
#define H2_NGN_ENTRY_NEXT(e) APR_RING_NEXT((e), link)
#define H2_NGN_ENTRY_PREV(e) APR_RING_PREV((e), link)
#define H2_NGN_ENTRY_REMOVE(e) APR_RING_REMOVE((e), link)
#define H2_REQ_ENTRIES_SENTINEL(b) APR_RING_SENTINEL((b), h2_ngn_entry, link)
#define H2_REQ_ENTRIES_EMPTY(b) APR_RING_EMPTY((b), h2_ngn_entry, link)
#define H2_REQ_ENTRIES_FIRST(b) APR_RING_FIRST(b)
#define H2_REQ_ENTRIES_LAST(b) APR_RING_LAST(b)
#define H2_REQ_ENTRIES_INSERT_HEAD(b, e) do { \
h2_ngn_entry *ap__b = (e); \
APR_RING_INSERT_HEAD((b), ap__b, h2_ngn_entry, link); \
} while (0)
#define H2_REQ_ENTRIES_INSERT_TAIL(b, e) do { \
h2_ngn_entry *ap__b = (e); \
APR_RING_INSERT_TAIL((b), ap__b, h2_ngn_entry, link); \
} while (0)
struct h2_req_engine {
const char *id; /* identifier */
const char *type; /* name of the engine type */
apr_pool_t *pool; /* pool for engine specific allocations */
conn_rec *c; /* connection this engine is assigned to */
h2_ngn_shed *shed;
unsigned int shutdown : 1; /* engine is being shut down */
APR_RING_HEAD(h2_req_entries, h2_ngn_entry) entries;
apr_uint32_t capacity; /* maximum concurrent requests */
apr_uint32_t no_assigned; /* # of assigned requests */
apr_uint32_t no_live; /* # of live */
apr_uint32_t no_finished; /* # of finished */
apr_thread_cond_t *io; /* condition var for waiting on data */
};
h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
apr_uint32_t req_buffer_size)
{
h2_ngn_shed *shed;
shed = apr_pcalloc(pool, sizeof(*shed));
shed->c = c;
shed->pool = pool;
shed->req_buffer_size = req_buffer_size;
shed->ngns = apr_hash_make(pool);
return shed;
}
void h2_ngn_shed_set_ctx(h2_ngn_shed *shed, void *user_ctx)
{
shed->user_ctx = user_ctx;
}
void *h2_ngn_shed_get_ctx(h2_ngn_shed *shed)
{
return shed->user_ctx;
}
h2_ngn_shed *h2_ngn_shed_get_shed(h2_req_engine *ngn)
{
return ngn->shed;
}
void h2_ngn_shed_abort(h2_ngn_shed *shed)
{
shed->aborted = 1;
}
static apr_status_t ngn_schedule(h2_req_engine *ngn, request_rec *r)
{
h2_ngn_entry *entry = apr_pcalloc(r->pool, sizeof(*entry));
APR_RING_ELEM_INIT(entry, link);
entry->r = r;
H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry);
return APR_SUCCESS;
}
apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type,
h2_task *task, request_rec *r,
h2_req_engine_init *einit){
h2_req_engine *ngn;
apr_status_t status = APR_EOF;
AP_DEBUG_ASSERT(shed);
apr_table_set(r->connection->notes, H2_TASK_ID_NOTE, task->id);
if (task->ser_headers) {
/* Max compatibility, deny processing of this */
return APR_EOF;
}
ngn = apr_hash_get(shed->ngns, ngn_type, APR_HASH_KEY_STRING);
if (ngn) {
if (ngn->shutdown) {
ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r,
"h2_ngn_shed(%ld): %s in shutdown",
shed->c->id, ngn->id);
ngn = NULL;
}
else if (ngn->no_assigned >= ngn->capacity) {
ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r,
"h2_ngn_shed(%ld): %s over capacity %d/%d",
shed->c->id, ngn->id, ngn->no_assigned,
ngn->capacity);
ngn = NULL;
}
else if (ngn_schedule(ngn, r) == APR_SUCCESS) {
/* this task will be processed in another thread,
* freeze any I/O for the time being. */
h2_task_freeze(task, r);
ngn->no_assigned++;
status = APR_SUCCESS;
ap_log_rerror(APLOG_MARK, APLOG_INFO, status, r,
"h2_ngn_shed(%ld): pushed request %s to %s",
shed->c->id, task->id, ngn->id);
}
else {
ap_log_rerror(APLOG_MARK, APLOG_INFO, status, r,
"h2_ngn_shed(%ld): engine error adding req %s",
shed->c->id, ngn->id);
ngn = NULL;
}
}
if (!ngn && einit) {
ngn = apr_pcalloc(task->c->pool, sizeof(*ngn));
ngn->id = apr_psprintf(task->c->pool, "ngn-%ld-%d",
shed->c->id, shed->next_ngn_id++);
ngn->pool = task->c->pool;
ngn->type = apr_pstrdup(task->c->pool, ngn_type);
ngn->c = r->connection;
APR_RING_INIT(&ngn->entries, h2_ngn_entry, link);
ngn->shed = shed;
ngn->capacity = 100;
ngn->io = task->io;
ngn->no_assigned = 1;
ngn->no_live = 1;
status = einit(ngn, ngn->id, ngn->type, ngn->pool,
shed->req_buffer_size, r);
ap_log_rerror(APLOG_MARK, APLOG_INFO, status, r,
"h2_ngn_shed(%ld): init engine %s (%s)",
shed->c->id, ngn->id, ngn->type);
if (status == APR_SUCCESS) {
apr_hash_set(shed->ngns, ngn->type, APR_HASH_KEY_STRING, ngn);
}
}
return status;
}
static h2_ngn_entry *pop_non_frozen(h2_req_engine *ngn)
{
h2_ngn_entry *entry;
h2_task *task;
for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
entry = H2_NGN_ENTRY_NEXT(entry)) {
task = h2_ctx_rget_task(entry->r);
AP_DEBUG_ASSERT(task);
if (!task->frozen) {
H2_NGN_ENTRY_REMOVE(entry);
return entry;
}
}
return NULL;
}
apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed,
h2_req_engine *ngn,
apr_uint32_t capacity,
int want_shutdown,
request_rec **pr)
{
h2_ngn_entry *entry;
AP_DEBUG_ASSERT(ngn);
*pr = NULL;
if (shed->aborted) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, shed->c,
"h2_ngn_shed(%ld): abort while pulling requests %s",
shed->c->id, ngn->id);
return APR_EOF;
}
ngn->capacity = capacity;
if (!H2_REQ_ENTRIES_EMPTY(&ngn->entries)
&& (entry = pop_non_frozen(ngn))) {
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, entry->r,
"h2_ngn_shed(%ld): pulled request %s for engine %s",
shed->c->id, entry->r->the_request, ngn->id);
ngn->no_live++;
entry->r->connection->current_thread = ngn->c->current_thread;
*pr = entry->r;
return APR_SUCCESS;
}
else if (want_shutdown) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
"h2_ngn_shed(%ld): emtpy queue, shutdown engine %s",
shed->c->id, ngn->id);
ngn->shutdown = 1;
return APR_EOF;
}
return APR_EAGAIN;
}
static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn, h2_task *task,
int waslive, int aborted)
{
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c,
"h2_ngn_shed(%ld): task %s %s by %s",
shed->c->id, task->id, aborted? "aborted":"done", ngn->id);
h2_task_output_close(task->output);
ngn->no_finished++;
if (waslive) ngn->no_live--;
ngn->no_assigned--;
if (task->c != ngn->c) { /* do not release what the engine runs on */
return APR_SUCCESS;
}
return APR_EAGAIN;
}
apr_status_t h2_ngn_shed_done_req(h2_ngn_shed *shed,
h2_req_engine *ngn, conn_rec *r_conn)
{
h2_task *task = h2_ctx_cget_task(r_conn);
if (task) {
return ngn_done_task(shed, ngn, task, 1, 0);
}
return APR_ECONNABORTED;
}
void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
{
h2_req_engine *existing;
if (!shed->aborted
&& !H2_REQ_ENTRIES_EMPTY(&ngn->entries)) {
h2_ngn_entry *entry;
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
"h2_ngn_shed(%ld): exit engine %s (%s), "
"has still requests queued, shutdown=%d,"
"assigned=%ld, live=%ld, finished=%ld",
shed->c->id, ngn->id, ngn->type,
ngn->shutdown,
(long)ngn->no_assigned, (long)ngn->no_live,
(long)ngn->no_finished);
for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
entry = H2_NGN_ENTRY_NEXT(entry)) {
request_rec *r = entry->r;
h2_task *task = h2_ctx_rget_task(r);
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
"h2_ngn_shed(%ld): engine %s has queued task %s, "
"frozen=%d, aborting",
shed->c->id, ngn->id, task->id, task->frozen);
ngn_done_task(shed, ngn, task, 0, 1);
}
}
if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
"h2_ngn_shed(%ld): exit engine %s (%s), "
"assigned=%ld, live=%ld, finished=%ld",
shed->c->id, ngn->id, ngn->type,
(long)ngn->no_assigned, (long)ngn->no_live,
(long)ngn->no_finished);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, shed->c,
"h2_ngn_shed(%ld): exit engine %s (%s)",
shed->c->id, ngn->id, ngn->type);
}
existing = apr_hash_get(shed->ngns, ngn->type, APR_HASH_KEY_STRING);
if (existing == ngn) {
apr_hash_set(shed->ngns, ngn->type, APR_HASH_KEY_STRING, NULL);
}
}

View File

@@ -0,0 +1,64 @@
/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef h2_req_shed_h
#define h2_req_shed_h
struct h2_req_engine;
typedef struct h2_ngn_shed h2_ngn_shed;
struct h2_ngn_shed {
conn_rec *c;
apr_pool_t *pool;
apr_hash_t *ngns;
int next_ngn_id;
void *user_ctx;
unsigned int aborted : 1;
apr_uint32_t req_buffer_size; /* preferred buffer size for responses */
};
typedef apr_status_t h2_shed_ngn_init(h2_req_engine *engine,
const char *id,
const char *type,
apr_pool_t *pool,
apr_uint32_t req_buffer_size,
request_rec *r);
h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
apr_uint32_t req_buffer_size);
void h2_ngn_shed_set_ctx(h2_ngn_shed *shed, void *user_ctx);
void *h2_ngn_shed_get_ctx(h2_ngn_shed *shed);
h2_ngn_shed *h2_ngn_shed_get_shed(struct h2_req_engine *ngn);
void h2_ngn_shed_abort(h2_ngn_shed *shed);
apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type,
struct h2_task *task, request_rec *r,
h2_shed_ngn_init *init_cb);
apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, h2_req_engine *pub_ngn,
apr_uint32_t capacity,
int want_shutdown, request_rec **pr);
apr_status_t h2_ngn_shed_done_req(h2_ngn_shed *shed,
struct h2_req_engine *ngn, conn_rec *r_conn);
void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn);
#endif /* h2_req_shed_h */

View File

@@ -17,6 +17,7 @@
#include <apr_strings.h>
#include <nghttp2/nghttp2.h>
#include <mpm_common.h>
#include <httpd.h>
#include <mod_proxy.h>
#include <mod_http2.h>
@@ -91,23 +92,11 @@ static int proxy_pass_brigade(apr_bucket_alloc_t *bucket_alloc,
* issues in case of error returned below. */
apr_brigade_cleanup(bb);
if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, status, origin, APLOGNO(01084)
"pass request body failed to %pI (%s)",
ap_log_cerror(APLOG_MARK, APLOG_ERR, status, origin, APLOGNO()
"pass output failed to %pI (%s)",
p_conn->addr, p_conn->hostname);
if (origin->aborted) {
const char *ssl_note;
if (((ssl_note = apr_table_get(origin->notes, "SSL_connect_rv"))
!= NULL) && (strcmp(ssl_note, "err") == 0)) {
return HTTP_INTERNAL_SERVER_ERROR;
}
return HTTP_GATEWAY_TIME_OUT;
}
else {
return HTTP_BAD_REQUEST;
}
}
return OK;
return status;
}
static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data,
@@ -118,19 +107,19 @@ static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data,
apr_status_t status;
int flush = 1;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_proxy_sesssion(%s): raw_send %d bytes, flush=%d",
session->id, (int)length, flush);
if (data) {
b = apr_bucket_transient_create((const char*)data, length,
session->c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(session->output, b);
}
status = proxy_pass_brigade(session->c->bucket_alloc,
session->p_conn, session->c,
session->output, flush);
if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
"h2_proxy_sesssion(%s): sending", session->id);
"h2_proxy_sesssion(%s): raw_send %d bytes, flush=%d",
session->id, (int)length, flush);
if (status != APR_SUCCESS) {
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
return length;
@@ -146,7 +135,7 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03341)
"h2_session(%s): recv FRAME[%s]",
"h2_proxy_session(%s): recv FRAME[%s]",
session->id, buffer);
}
@@ -167,7 +156,7 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03342)
"h2_session(%s): recv FRAME[%s]",
"h2_proxy_session(%s): recv FRAME[%s]",
session->id, buffer);
}
break;
@@ -186,7 +175,7 @@ static int before_frame_send(nghttp2_session *ngh2,
h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03343)
"h2_session(%s): sent FRAME[%s]",
"h2_proxy_session(%s): sent FRAME[%s]",
session->id, buffer);
}
return 0;
@@ -339,9 +328,13 @@ static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
apr_bucket *b;
apr_status_t status;
nghttp2_session_consume(ngh2, stream_id, len);
/*nghttp2_session_consume(ngh2, stream_id, len);*/
stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
if (!stream) {
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r,
"h2_proxy_session(%s): recv data chunk for "
"unknown stream %d, ignored",
session->id, stream_id);
return 0;
}
@@ -359,10 +352,14 @@ static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(stream->output, b);
}
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r,
"h2_proxy_session(%s): pass response data for "
"stream %d, %d bytes", session->id, stream_id, (int)len);
status = ap_pass_brigade(stream->r->output_filters, stream->output);
if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03344)
"h2_session(%s-%d): passing output",
"h2_proxy_session(%s): passing output on stream %d",
session->id, stream->id);
nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE,
stream_id, NGHTTP2_STREAM_CLOSED);
@@ -375,6 +372,9 @@ static int on_stream_close(nghttp2_session *ngh2, int32_t stream_id,
uint32_t error_code, void *user_data)
{
h2_proxy_session *session = user_data;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
"h2_proxy_session(%s): stream=%d, closed, err=%d",
session->id, stream_id, error_code);
dispatch_event(session, H2_PROXYS_EV_STREAM_DONE, stream_id, NULL);
return 0;
}
@@ -415,6 +415,9 @@ static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id,
*data_flags = 0;
stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
if (!stream) {
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r,
"h2_proxy_stream(%s): data_read, stream %d not found",
stream->session->id, stream_id);
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
@@ -492,6 +495,7 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
h2_proxy_session *session;
nghttp2_session_callbacks *cbs;
nghttp2_option *option;
ap_filter_t *f;
session = apr_pcalloc(pool, sizeof(*session));
apr_pool_pre_cleanup_register(pool, p_conn, proxy_session_pre_close);
@@ -522,6 +526,7 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
nghttp2_option_new(&option);
nghttp2_option_set_peer_max_concurrent_streams(option, 100);
nghttp2_option_set_no_auto_window_update(option, 0);
nghttp2_session_client_new2(&session->ngh2, cbs, session, option);
@@ -531,6 +536,14 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"setup session for %s", p_conn->hostname);
f = session->c->input_filters;
while (f) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"h2_proxy_session(%s): c->input_filter %s",
session->id, f->frec->name);
f = f->next;
}
}
return p_conn->data;
}
@@ -539,6 +552,12 @@ static apr_status_t session_start(h2_proxy_session *session)
{
nghttp2_settings_entry settings[2];
int rv, add_conn_window;
apr_socket_t *s;
s = ap_get_conn_socket(session->c);
if (s) {
ap_sock_disable_nagle(s);
}
settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
settings[0].value = 0;
@@ -557,47 +576,6 @@ static apr_status_t session_start(h2_proxy_session *session)
return rv? APR_EGENERAL : APR_SUCCESS;
}
static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *bb)
{
apr_status_t status = APR_SUCCESS;
apr_size_t readlen = 0;
ssize_t n;
while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
apr_bucket* b = APR_BRIGADE_FIRST(bb);
if (!APR_BUCKET_IS_METADATA(b)) {
const char *bdata = NULL;
apr_size_t blen = 0;
status = apr_bucket_read(b, &bdata, &blen, APR_NONBLOCK_READ);
if (status == APR_SUCCESS && blen > 0) {
n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)bdata, blen);
if (n < 0) {
if (nghttp2_is_fatal((int)n)) {
return APR_EGENERAL;
}
}
else {
readlen += n;
if (n < blen) {
apr_bucket_split(b, n);
}
}
}
}
apr_bucket_delete(b);
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
"h2_session(%s): fed %ld bytes of input to session",
session->id, (long)readlen);
if (readlen == 0 && status == APR_SUCCESS) {
return APR_EAGAIN;
}
return status;
}
static apr_status_t open_stream(h2_proxy_session *session, const char *url,
request_rec *r, h2_proxy_stream **pstream)
{
@@ -668,13 +646,13 @@ static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *st
const char *task_id = apr_table_get(stream->r->connection->notes,
H2_TASK_ID_NOTE);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"h2_session(%s): submit %s%s -> %d (task %s)",
"h2_proxy_session(%s): submit %s%s -> %d (task %s)",
session->id, stream->req->authority, stream->req->path,
rv, task_id);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"h2_session(%s-%d): submit %s%s",
"h2_proxy_session(%s-%d): submit %s%s",
session->id, rv, stream->req->authority, stream->req->path);
}
@@ -689,10 +667,73 @@ static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *st
return APR_EGENERAL;
}
static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *bb)
{
apr_status_t status = APR_SUCCESS;
apr_size_t readlen = 0;
ssize_t n;
while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
apr_bucket* b = APR_BRIGADE_FIRST(bb);
if (APR_BUCKET_IS_METADATA(b)) {
if (APR_BUCKET_IS_EOS(b)) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"h2_proxy_session(%s): read EOS from conn",
session->id);
}
else if (APR_BUCKET_IS_FLUSH(b)) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"h2_proxy_session(%s): read FLUSH from conn",
session->id);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"h2_proxy_session(%s): read unkown META from conn",
session->id);
}
}
else {
const char *bdata = NULL;
apr_size_t blen = 0;
status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ);
if (status == APR_SUCCESS && blen > 0) {
n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)bdata, blen);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"h2_proxy_session(%s): feeding %ld bytes -> %ld",
session->id, (long)blen, (long)n);
if (n < 0) {
if (nghttp2_is_fatal((int)n)) {
status = APR_EGENERAL;
}
}
else {
readlen += n;
if (n < blen) {
apr_bucket_split(b, n);
}
}
}
}
apr_bucket_delete(b);
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
"h2_proxy_session(%s): fed %ld bytes of input to session",
session->id, (long)readlen);
if (readlen == 0 && status == APR_SUCCESS) {
return APR_EAGAIN;
}
return status;
}
static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block,
apr_interval_time_t timeout)
{
apr_status_t status;
apr_status_t status = APR_SUCCESS;
if (APR_BRIGADE_EMPTY(session->input)) {
apr_socket_t *socket = NULL;
apr_time_t save_timeout = -1;
@@ -705,7 +746,7 @@ static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block,
else {
/* cannot block on timeout */
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c,
"h2_session(%s): unable to get conn socket",
"h2_proxy_session(%s): unable to get conn socket",
session->id);
return APR_ENOTIMPL;
}
@@ -715,24 +756,25 @@ static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block,
AP_MODE_READBYTES,
block? APR_BLOCK_READ : APR_NONBLOCK_READ,
64 * 1024);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
"h2_proxy_session(%s): read from conn", session->id);
if (socket && save_timeout != -1) {
apr_socket_timeout_set(socket, save_timeout);
}
}
if (status == APR_SUCCESS) {
if (APR_BRIGADE_EMPTY(session->input)) {
status = APR_EAGAIN;
}
else {
feed_brigade(session, session->input);
}
status = feed_brigade(session, session->input);
}
else if (APR_STATUS_IS_TIMEUP(status)) {
/* nop */
}
else if (!APR_STATUS_IS_EAGAIN(status)) {
dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
"h2_proxy_session(%s): read error", session->id);
dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
}
return status;
}
@@ -918,8 +960,8 @@ static void ev_conn_error(h2_proxy_session *session, int arg, const char *msg)
break;
default:
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_session(%s): conn error -> shutdown", session->id);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, arg, session->c,
"h2_proxy_session(%s): conn error -> shutdown", session->id);
session_shutdown(session, arg, msg);
break;
}
@@ -936,7 +978,7 @@ static void ev_proto_error(h2_proxy_session *session, int arg, const char *msg)
default:
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_session(%s): proto error -> shutdown", session->id);
"h2_proxy_session(%s): proto error -> shutdown", session->id);
session_shutdown(session, arg, msg);
break;
}
@@ -984,7 +1026,6 @@ static void ev_no_io(h2_proxy_session *session, int arg, const char *msg)
* task processing in other threads. Do a busy wait with
* backoff timer. */
transit(session, "no io", H2_PROXYS_ST_WAIT);
session->wait_timeout = 25;
}
break;
default:
@@ -1133,7 +1174,7 @@ static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev,
break;
default:
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_session(%s): unknown event %d",
"h2_proxy_session(%s): unknown event %d",
session->id, ev);
break;
}
@@ -1145,7 +1186,7 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
int have_written = 0, have_read = 0;
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
"h2_session(%s): process", session->id);
"h2_proxy_session(%s): process", session->id);
switch (session->state) {
case H2_PROXYS_ST_INIT:
@@ -1154,7 +1195,7 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
dispatch_event(session, H2_PROXYS_EV_INIT, 0, NULL);
}
else {
dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL);
dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
}
break;
@@ -1165,8 +1206,8 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
int rv = nghttp2_session_send(session->ngh2);
if (rv < 0 && nghttp2_is_fatal(rv)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
"h2_session(%s): write, rv=%d", session->id, rv);
dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL);
"h2_proxy_session(%s): write, rv=%d", session->id, rv);
dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, rv, NULL);
break;
}
have_written = 1;
@@ -1189,14 +1230,27 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
if (check_suspended(session) == APR_EAGAIN) {
/* no stream has become resumed. Do a blocking read with
* ever increasing timeouts... */
status = h2_proxy_session_read(session, 0, session->wait_timeout);
if (status == APR_SUCCESS) {
dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
if (session->wait_timeout < 25) {
session->wait_timeout = 25;
}
else if (APR_STATUS_IS_TIMEUP(status)) {
else {
session->wait_timeout = H2MIN(apr_time_from_msec(100),
2*session->wait_timeout);
}
status = h2_proxy_session_read(session, 1, session->wait_timeout);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
"h2_proxy_session(%s): WAIT read, timeout=%fms",
session->id, (float)session->wait_timeout/1000.0);
if (status == APR_SUCCESS) {
have_read = 1;
dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
}
else if (APR_STATUS_IS_TIMEUP(status)
|| APR_STATUS_IS_EAGAIN(status)) {
/* go back to checking all inputs again */
transit(session, "wait cycle", H2_PROXYS_ST_BUSY);
}
}
break;
@@ -1208,13 +1262,17 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
default:
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, session->c,
APLOGNO(03346)"h2_session(%s): unknown state %d",
APLOGNO(03346)"h2_proxy_session(%s): unknown state %d",
session->id, session->state);
dispatch_event(session, H2_PROXYS_EV_PROTO_ERROR, 0, NULL);
break;
}
if (have_read || have_written) {
session->wait_timeout = 0;
}
if (!nghttp2_session_want_read(session->ngh2)
&& !nghttp2_session_want_write(session->ngh2)) {
dispatch_event(session, H2_PROXYS_EV_NGH2_DONE, 0, NULL);

View File

@@ -451,6 +451,9 @@ request_rec *h2_request_create_rec(const h2_request *req, conn_rec *conn)
/* Request check post hooks failed. An example of this would be a
* request for a vhost where h2 is disabled --> 421.
*/
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, conn, APLOGNO()
"h2_request(%d): access_status=%d, request_create failed",
req->id, access_status);
ap_die(access_status, r);
ap_update_child_status(conn->sbh, SERVER_BUSY_LOG, r);
ap_run_log_transaction(r);

View File

@@ -597,15 +597,6 @@ static int on_frame_send_cb(nghttp2_session *ngh2,
(long)session->frames_sent);
}
++session->frames_sent;
switch (frame->hd.type) {
case NGHTTP2_HEADERS:
case NGHTTP2_DATA:
/* no explicit flushing necessary */
break;
default:
session->flush = 1;
break;
}
return 0;
}
@@ -2021,6 +2012,8 @@ apr_status_t h2_session_process(h2_session *session, int async)
no_streams = h2_ihash_is_empty(session->streams);
update_child_status(session, (no_streams? SERVER_BUSY_KEEPALIVE
: SERVER_BUSY_READ), "idle");
/* make certain, the client receives everything before we idle */
h2_conn_io_flush(&session->io);
if (async && no_streams && !session->r && session->requests_received) {
ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
"h2_session(%ld): async idle, nonblock read", session->id);
@@ -2176,6 +2169,8 @@ apr_status_t h2_session_process(h2_session *session, int async)
"h2_session: wait for data, %ld micros",
(long)session->wait_us);
}
/* make certain, the client receives everything before we idle */
h2_conn_io_flush(&session->io);
status = h2_mplx_out_trywait(session->mplx, session->wait_us,
session->iowait);
if (status == APR_SUCCESS) {
@@ -2212,8 +2207,7 @@ apr_status_t h2_session_process(h2_session *session, int async)
}
out:
h2_conn_io_pass(&session->io, session->flush);
session->flush = 0;
h2_conn_io_flush(&session->io);
ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
"h2_session(%ld): [%s] process returns",

View File

@@ -93,16 +93,16 @@ static apr_status_t h2_response_freeze_filter(ap_filter_t* f,
AP_DEBUG_ASSERT(task);
if (task->frozen) {
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r,
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, f->r,
"h2_response_freeze_filter, saving");
return ap_save_brigade(f, &task->frozen_out, &bb, task->c->pool);
return ap_save_brigade(f, &task->output->frozen_bb, &bb, task->c->pool);
}
if (APR_BRIGADE_EMPTY(bb)) {
return APR_SUCCESS;
}
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r,
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, f->r,
"h2_response_freeze_filter, passing");
return ap_pass_brigade(f->next, bb);
}
@@ -197,12 +197,18 @@ h2_task *h2_task_create(long session_id, const h2_request *req,
task->request = req;
task->input_eos = !req->body;
task->ser_headers = req->serialize;
task->blocking = 1;
h2_ctx_create_for(c, task);
return task;
}
void h2_task_set_io_blocking(h2_task *task, int blocking)
{
task->blocking = blocking;
}
apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond)
{
apr_status_t status;
@@ -212,6 +218,8 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond)
task->input = h2_task_input_create(task, task->c);
task->output = h2_task_output_create(task, task->c);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
"h2_task(%s): process connection", task->id);
ap_process_connection(task->c, ap_get_conn_socket(task->c));
if (task->frozen) {
@@ -236,6 +244,8 @@ static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c)
conn_state_t *cs = c->cs;
request_rec *r;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_task(%s): create request_rec", task->id);
r = h2_request_create_rec(req, c);
if (r && (r->status == HTTP_OK)) {
ap_update_child_status(c->sbh, SERVER_BUSY_READ, r);
@@ -264,6 +274,15 @@ static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c)
cs->state = CONN_STATE_WRITE_COMPLETION;
r = NULL;
}
else if (!r) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_task(%s): create request_rec failed, r=NULL", task->id);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_task(%s): create request_rec failed, r->status=%d",
task->id, r->status);
}
c->sbh = NULL;
return APR_SUCCESS;
@@ -297,7 +316,7 @@ apr_status_t h2_task_freeze(h2_task *task, request_rec *r)
conn_rec *c = task->c;
task->frozen = 1;
task->frozen_out = apr_brigade_create(c->pool, c->bucket_alloc);
task->output->frozen_bb = apr_brigade_create(c->pool, c->bucket_alloc);
ap_add_output_filter("H2_RESPONSE_FREEZE", task, r, r->connection);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c,
"h2_task(%s), frozen", task->id);

View File

@@ -58,12 +58,11 @@ struct h2_task {
unsigned int input_eos : 1;
unsigned int ser_headers : 1;
unsigned int frozen : 1;
unsigned int blocking : 1;
struct h2_task_input *input;
struct h2_task_output *output;
struct apr_thread_cond_t *io; /* used to wait for events on */
apr_bucket_brigade *frozen_out;
};
h2_task *h2_task_create(long session_id, const struct h2_request *req,
@@ -83,4 +82,6 @@ extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *h2_task_logio_add_bytes_out
apr_status_t h2_task_freeze(h2_task *task, request_rec *r);
apr_status_t h2_task_thaw(h2_task *task);
void h2_task_set_io_blocking(h2_task *task, int blocking);
#endif /* defined(__mod_h2__h2_task__) */

View File

@@ -77,14 +77,14 @@ static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f,
if (f) {
/* This happens currently when ap_die(status, r) is invoked
* by a read request filter. */
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03204)
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03204)
"h2_task_output(%s): write without response by %s "
"for %s %s %s",
output->task->id, caller,
output->task->request->method,
output->task->request->authority,
output->task->request->path);
f->c->aborted = 1;
output->c->aborted = 1;
}
if (output->task->io) {
apr_thread_cond_broadcast(output->task->io);
@@ -94,37 +94,48 @@ static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f,
if (h2_task_logio_add_bytes_out) {
/* counter headers as if we'd do a HTTP/1.1 serialization */
/* TODO: counter a virtual status line? */
apr_off_t bytes_written;
apr_brigade_length(bb, 0, &bytes_written);
bytes_written += h2_util_table_bytes(response->headers, 3)+1;
h2_task_logio_add_bytes_out(f->c, bytes_written);
output->written = h2_util_table_bytes(response->headers, 3)+1;
h2_task_logio_add_bytes_out(output->c, output->written);
}
get_trailers(output);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03348)
"h2_task_output(%s): open as needed %s %s %s",
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03348)
"h2_task(%s): open response to %s %s %s",
output->task->id, output->task->request->method,
output->task->request->authority,
output->task->request->path);
return h2_mplx_out_open(output->task->mplx, output->task->stream_id,
response, f, bb, output->task->io);
}
return APR_EOF;
return APR_SUCCESS;
}
void h2_task_output_close(h2_task_output *output)
static apr_status_t write_brigade_raw(h2_task_output *output,
ap_filter_t* f, apr_bucket_brigade* bb)
{
open_if_needed(output, NULL, NULL, "close");
if (output->state != H2_TASK_OUT_DONE) {
if (output->task->frozen_out
&& !APR_BRIGADE_EMPTY(output->task->frozen_out)) {
h2_mplx_out_write(output->task->mplx, output->task->stream_id,
NULL, output->task->frozen_out, NULL, NULL);
apr_off_t written, left;
apr_status_t status;
apr_brigade_length(bb, 0, &written);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->c,
"h2_task(%s): write response body (%ld bytes)",
output->task->id, (long)written);
status = h2_mplx_out_write(output->task->mplx, output->task->stream_id,
f, output->task->blocking, bb,
get_trailers(output), output->task->io);
if (status == APR_INCOMPLETE) {
apr_brigade_length(bb, 0, &left);
written -= left;
status = APR_SUCCESS;
}
h2_mplx_out_close(output->task->mplx, output->task->stream_id,
get_trailers(output));
output->state = H2_TASK_OUT_DONE;
if (status == APR_SUCCESS) {
output->written += written;
if (h2_task_logio_add_bytes_out) {
h2_task_logio_add_bytes_out(output->c, written);
}
}
return status;
}
/* Bring the data from the brigade (which represents the result of the
@@ -137,34 +148,57 @@ apr_status_t h2_task_output_write(h2_task_output *output,
apr_status_t status;
if (APR_BRIGADE_EMPTY(bb)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
"h2_task_output(%s): empty write", output->task->id);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->c,
"h2_task(%s): empty write", output->task->id);
return APR_SUCCESS;
}
if (output->task->frozen) {
h2_util_bb_log(output->c, output->task->stream_id, APLOG_TRACE2,
"frozen task output write", bb);
return ap_save_brigade(f, &output->task->frozen_out, &bb,
output->c->pool);
return ap_save_brigade(f, &output->frozen_bb, &bb, output->c->pool);
}
status = open_if_needed(output, f, bb, "write");
if (status != APR_EOF) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
"h2_task_output(%s): opened and passed brigade",
output->task->id);
return status;
/* Attempt to write saved brigade first */
if (status == APR_SUCCESS && output->bb
&& !APR_BRIGADE_EMPTY(output->bb)) {
status = write_brigade_raw(output, f, output->bb);
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
"h2_task_output(%s): write brigade", output->task->id);
if (h2_task_logio_add_bytes_out) {
apr_off_t bytes_written;
apr_brigade_length(bb, 0, &bytes_written);
h2_task_logio_add_bytes_out(f->c, bytes_written);
/* If there is nothing saved (anymore), try to write the brigade passed */
if (status == APR_SUCCESS
&& (!output->bb || APR_BRIGADE_EMPTY(output->bb))
&& !APR_BRIGADE_EMPTY(bb)) {
status = write_brigade_raw(output, f, bb);
}
/* If the passed brigade is not empty, save it before return */
if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, output->c,
"h2_task(%s): could not write all, saving brigade",
output->task->id);
if (!output->bb) {
output->bb = apr_brigade_create(output->c->pool, output->c->bucket_alloc);
}
return ap_save_brigade(f, &output->bb, &bb, output->c->pool);
}
return status;
}
void h2_task_output_close(h2_task_output *output)
{
open_if_needed(output, NULL, NULL, "close");
if (output->state != H2_TASK_OUT_DONE) {
if (output->frozen_bb && !APR_BRIGADE_EMPTY(output->frozen_bb)) {
h2_mplx_out_write(output->task->mplx, output->task->stream_id,
NULL, 1, output->frozen_bb, NULL, NULL);
}
h2_mplx_out_close(output->task->mplx, output->task->stream_id,
get_trailers(output));
output->state = H2_TASK_OUT_DONE;
}
return h2_mplx_out_write(output->task->mplx, output->task->stream_id,
f, bb, get_trailers(output), output->task->io);
}

View File

@@ -30,16 +30,21 @@ typedef enum {
H2_TASK_OUT_INIT,
H2_TASK_OUT_STARTED,
H2_TASK_OUT_DONE,
} h2_task_output_state_t;
} h2_task_out_state_t;
typedef struct h2_task_output h2_task_output;
struct h2_task_output {
conn_rec *c;
struct h2_task *task;
h2_task_output_state_t state;
h2_task_out_state_t state;
struct h2_from_h1 *from_h1;
unsigned int trailers_passed : 1;
apr_off_t written;
apr_bucket_brigade *bb;
apr_bucket_brigade *frozen_bb;
};
h2_task_output *h2_task_output_create(struct h2_task *task, conn_rec *c);

View File

@@ -26,7 +26,7 @@
* @macro
* Version number of the http2 module as c string
*/
#define MOD_HTTP2_VERSION "1.3.3-DEV"
#define MOD_HTTP2_VERSION "1.4.0-DEV"
/**
* @macro
@@ -34,7 +34,7 @@
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
#define MOD_HTTP2_VERSION_NUM 0x010303
#define MOD_HTTP2_VERSION_NUM 0x010400
#endif /* mod_h2_h2_version_h */

View File

@@ -128,28 +128,29 @@ static char *http2_var_lookup(apr_pool_t *, server_rec *,
conn_rec *, request_rec *, char *name);
static int http2_is_h2(conn_rec *);
static apr_status_t http2_req_engine_push(const char *engine_type,
static apr_status_t http2_req_engine_push(const char *ngn_type,
request_rec *r,
h2_req_engine_init *einit)
{
return h2_mplx_engine_push(engine_type, r, einit);
return h2_mplx_req_engine_push(ngn_type, r, einit);
}
static apr_status_t http2_req_engine_pull(h2_req_engine *engine,
static apr_status_t http2_req_engine_pull(h2_req_engine *ngn,
apr_read_type_e block,
apr_uint32_t capacity,
request_rec **pr)
{
return h2_mplx_engine_pull(engine, block, pr);
return h2_mplx_req_engine_pull(ngn, block, capacity, pr);
}
static void http2_req_engine_done(h2_req_engine *engine, conn_rec *r_conn)
static void http2_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
{
h2_mplx_engine_done(engine, r_conn);
h2_mplx_req_engine_done(ngn, r_conn);
}
static void http2_req_engine_exit(h2_req_engine *engine)
static void http2_req_engine_exit(h2_req_engine *ngn)
{
h2_mplx_engine_exit(engine);
h2_mplx_req_engine_exit(ngn);
}

View File

@@ -43,27 +43,12 @@ typedef struct h2_req_engine h2_req_engine;
* @param engine the allocated, partially filled structure
* @param r the first request to process, or NULL
*/
typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, request_rec *r);
/**
* The public structure of a h2_req_engine. It gets allocated by the http2
* infrastructure, assigned id, type, pool, io and connection and passed to the
* h2_req_engine_init() callback to complete initialization.
* This happens whenever a new request gets "push"ed for an engine type and
* no instance, or no free instance, for the type is available.
*/
struct h2_req_engine {
const char *id; /* identifier */
apr_pool_t *pool; /* pool for engine specific allocations */
const char *type; /* name of the engine type */
unsigned char window_bits;/* preferred size of overall response data
* mod_http2 is willing to buffer as log2 */
unsigned char req_window_bits;/* preferred size of response body data
* mod_http2 is willing to buffer per request,
* as log2 */
apr_size_t capacity; /* maximum concurrent requests */
void *user_data; /* user specific data */
};
typedef apr_status_t h2_req_engine_init(h2_req_engine *engine,
const char *id,
const char *type,
apr_pool_t *pool,
apr_uint32_t req_buffer_size,
request_rec *r);
/**
* Push a request to an engine with the specified name for further processing.
@@ -95,6 +80,7 @@ APR_DECLARE_OPTIONAL_FN(apr_status_t,
APR_DECLARE_OPTIONAL_FN(apr_status_t,
http2_req_engine_pull, (h2_req_engine *engine,
apr_read_type_e block,
apr_uint32_t capacity,
request_rec **pr));
APR_DECLARE_OPTIONAL_FN(void,
http2_req_engine_done, (h2_req_engine *engine,

View File

@@ -44,7 +44,9 @@ static int (*is_h2)(conn_rec *c);
static apr_status_t (*req_engine_push)(const char *name, request_rec *r,
h2_req_engine_init *einit);
static apr_status_t (*req_engine_pull)(h2_req_engine *engine,
apr_read_type_e block, request_rec **pr);
apr_read_type_e block,
apr_uint32_t capacity,
request_rec **pr);
static void (*req_engine_done)(h2_req_engine *engine, conn_rec *r_conn);
static void (*req_engine_exit)(h2_req_engine *engine);
@@ -59,9 +61,16 @@ typedef struct h2_proxy_ctx {
proxy_server_conf *conf;
h2_req_engine *engine;
const char *engine_id;
const char *engine_type;
apr_pool_t *engine_pool;
apr_uint32_t req_buffer_size;
unsigned standalone : 1;
unsigned is_ssl : 1;
unsigned flushall : 1;
apr_status_t r_status; /* status of our first request work */
} h2_proxy_ctx;
static int h2_proxy_post_config(apr_pool_t *p, apr_pool_t *plog,
@@ -189,13 +198,21 @@ static int proxy_http2_canon(request_rec *r, char *url)
return OK;
}
static apr_status_t proxy_engine_init(h2_req_engine *engine, request_rec *r)
static apr_status_t proxy_engine_init(h2_req_engine *engine,
const char *id,
const char *type,
apr_pool_t *pool,
apr_uint32_t req_buffer_size,
request_rec *r)
{
h2_proxy_ctx *ctx = ap_get_module_config(r->connection->conn_config,
&proxy_http2_module);
if (ctx) {
engine->capacity = 100; /* guess until we know */
ctx->engine = engine;
ctx->engine_id = id;
ctx->engine_type = type;
ctx->engine_pool = pool;
ctx->req_buffer_size = req_buffer_size;
return APR_SUCCESS;
}
ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r,
@@ -225,28 +242,34 @@ static void request_done(h2_proxy_session *session, request_rec *r)
{
h2_proxy_ctx *ctx = session->user_data;
if (req_engine_done && r != ctx->rbase) {
if (r == ctx->rbase) {
ctx->r_status = APR_SUCCESS;
}
else if (req_engine_done && ctx->engine) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection,
"h2_proxy_session(%s): request %s",
ctx->engine->id, r->the_request);
ctx->engine_id, r->the_request);
req_engine_done(ctx->engine, r->connection);
}
}
static request_rec *next_request(h2_proxy_ctx *ctx, h2_proxy_session *session,
request_rec *r, int before_leave)
static apr_status_t next_request(h2_proxy_ctx *ctx, h2_proxy_session *session,
request_rec *r, int before_leave,
request_rec **pr)
{
if (!r && !ctx->standalone) {
ctx->engine->capacity = session->remote_max_concurrent;
if (req_engine_pull(ctx->engine,
*pr = r;
if (!r && ctx->engine) {
apr_status_t status;
status = req_engine_pull(ctx->engine,
before_leave? APR_BLOCK_READ: APR_NONBLOCK_READ,
&r) == APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
H2MAX(1, session->remote_max_concurrent), pr);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
"h2_proxy_session(%s): pulled request %s",
session->id, r->the_request);
session->id, (*pr? (*pr)->the_request : "NULL"));
return status;
}
}
return r;
return *pr? APR_SUCCESS : APR_EAGAIN;
}
static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) {
@@ -255,7 +278,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) {
setup_backend:
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner,
"eng(%s): setup backend", ctx->engine->id);
"eng(%s): setup backend", ctx->engine_id);
/* Step Two: Make the Connection (or check that an already existing
* socket is still usable). On success, we have a socket connected to
* backend->hostname. */
@@ -299,10 +322,9 @@ setup_backend:
* loop until we got the response or encounter errors.
*/
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner,
"eng(%s): setup session", ctx->engine->id);
session = h2_proxy_session_setup(ctx->engine->id, ctx->p_conn, ctx->conf,
ctx->engine->window_bits,
ctx->engine->req_window_bits,
"eng(%s): setup session", ctx->engine_id);
session = h2_proxy_session_setup(ctx->engine_id, ctx->p_conn, ctx->conf,
30, h2_log2(ctx->req_buffer_size),
request_done);
if (!session) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->p_conn->connection,
@@ -312,27 +334,32 @@ setup_backend:
run_session:
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner,
"eng(%s): run session %s", ctx->engine->id, session->id);
"eng(%s): run session %s", ctx->engine_id, session->id);
session->user_data = ctx;
status = h2_proxy_session_process(session);
while (APR_STATUS_IS_EAGAIN(status)) {
r = next_request(ctx, session, r, 0);
if (r) {
status = next_request(ctx, session, r, 0, &r);
if (status == APR_SUCCESS) {
add_request(session, r);
r = NULL;
}
else if (!APR_STATUS_IS_EAGAIN(status)) {
break;
}
status = h2_proxy_session_process(session);
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, ctx->owner,
"eng(%s): end of session run", ctx->engine->id);
"eng(%s): end of session run", ctx->engine_id);
if (session->state == H2_PROXYS_ST_DONE || status != APR_SUCCESS) {
ctx->p_conn->close = 1;
}
r = next_request(ctx, session, r, 1);
if (r) {
if (status == APR_SUCCESS) {
status = next_request(ctx, session, r, 1, &r);
}
if (status == APR_SUCCESS) {
if (ctx->p_conn->close) {
/* the connection is/willbe closed, the session is terminated.
* Any open stream of that session needs to
@@ -355,7 +382,8 @@ run_session:
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status,
ctx->p_conn->connection, "eng(%s): session run done",
ctx->engine->id);
ctx->engine_id);
session->user_data = NULL;
return status;
}
@@ -412,6 +440,7 @@ static int proxy_http2_handler(request_rec *r,
ctx->worker = worker;
ctx->conf = conf;
ctx->flushall = apr_table_get(r->subprocess_env, "proxy-flushall")? 1 : 0;
ctx->r_status = HTTP_SERVICE_UNAVAILABLE;
ap_set_module_config(c->conn_config, &proxy_http2_module, ctx);
apr_table_setn(r->notes, H2_PROXY_REQ_URL_NOTE, url);
@@ -460,6 +489,7 @@ static int proxy_http2_handler(request_rec *r,
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
"H2: pushed request %s to engine type %s",
url, engine_type);
ctx->r_status = APR_SUCCESS;
goto cleanup;
}
}
@@ -467,14 +497,10 @@ static int proxy_http2_handler(request_rec *r,
if (!ctx->engine) {
/* No engine was available or has been initialized, handle this
* request just by ourself. */
h2_req_engine *engine = apr_pcalloc(p, sizeof(*engine));
engine->id = apr_psprintf(p, "eng-proxy-%ld", c->id);
engine->type = engine_type;
engine->pool = p;
engine->capacity = 1;
engine->window_bits = 30;
engine->req_window_bits = 16;
ctx->engine = engine;
ctx->engine_id = apr_psprintf(p, "eng-proxy-%ld", c->id);
ctx->engine_type = engine_type;
ctx->engine_pool = p;
ctx->req_buffer_size = (32*1024);
ctx->standalone = 1;
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
"h2_proxy_http2(%ld): setup standalone engine for type %s",
@@ -482,16 +508,16 @@ static int proxy_http2_handler(request_rec *r,
}
else {
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
"H2: hosting engine %s for request %s", ctx->engine->id, url);
"H2: hosting engine %s for request %s", ctx->engine_id, url);
}
status = proxy_engine_run(ctx, r);
cleanup:
if (!ctx->standalone && ctx->engine && req_engine_exit) {
if (ctx->engine && req_engine_exit) {
req_engine_exit(ctx->engine);
}
ctx->engine = NULL;
}
if (ctx) {
if (ctx->p_conn) {
@@ -507,7 +533,7 @@ cleanup:
ctx->p_conn = NULL;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, "leaving handler");
return status;
return ctx->r_status;
}
static void register_hook(apr_pool_t *p)