mirror of
https://github.com/apache/httpd.git
synced 2025-08-08 15:02:10 +03:00
mod_proxy_http2: stability improvements
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1734097 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
@@ -402,6 +402,12 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
|
||||
apr_thread_cond_broadcast(m->req_added);
|
||||
}
|
||||
}
|
||||
|
||||
if (!h2_io_set_is_empty(m->stream_ios)) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c,
|
||||
"h2_mplx(%ld): release_join, %d streams still open",
|
||||
m->id, (int)h2_io_set_size(m->stream_ios));
|
||||
}
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
|
||||
"h2_mplx(%ld): release_join -> destroy", m->id);
|
||||
leave_mutex(m, acquired);
|
||||
@@ -844,7 +850,7 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id, apr_table_t *trailers)
|
||||
h2_response *r = h2_response_die(stream_id, APR_EGENERAL,
|
||||
io->request, m->pool);
|
||||
status = out_open(m, stream_id, r, NULL, NULL, NULL);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
|
||||
"h2_mplx(%ld-%d): close, no response, no rst",
|
||||
m->id, io->id);
|
||||
}
|
||||
@@ -1135,12 +1141,27 @@ static void task_done(h2_mplx *m, h2_task *task)
|
||||
* long as it has requests to handle. Might no be fair to
|
||||
* other mplx's. Perhaps leave after n requests? */
|
||||
h2_mplx_out_close(m, task->stream_id, NULL);
|
||||
|
||||
if (task->engine) {
|
||||
/* should already have been done by the task, but as
|
||||
* a last resort, we get rid of it here. */
|
||||
if (!h2_req_engine_is_shutdown(task->engine)) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
|
||||
"h2_mplx(%ld): task(%s) has not-shutdown "
|
||||
"engine(%s)", m->id, task->id,
|
||||
h2_req_engine_get_id(task->engine));
|
||||
}
|
||||
h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
|
||||
}
|
||||
|
||||
if (m->spare_allocator) {
|
||||
apr_allocator_destroy(m->spare_allocator);
|
||||
m->spare_allocator = NULL;
|
||||
}
|
||||
|
||||
h2_slave_destroy(task->c, &m->spare_allocator);
|
||||
task = NULL;
|
||||
|
||||
if (io) {
|
||||
apr_time_t now = apr_time_now();
|
||||
if (!io->orphaned && m->redo_ios
|
||||
@@ -1389,23 +1410,23 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
|
||||
*pr = NULL;
|
||||
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
|
||||
int want_shutdown = (block == APR_BLOCK_READ);
|
||||
if (0 && want_shutdown) {
|
||||
if (want_shutdown && !h2_iq_empty(m->q)) {
|
||||
/* For a blocking read, check first if requests are to be
|
||||
* had and, if not, wait a short while before doing the
|
||||
* blocking, and if unsuccessful, terminating read.
|
||||
*/
|
||||
status = h2_ngn_shed_pull_req(shed, ngn, capacity, 0, pr);
|
||||
if (status != APR_EAGAIN) {
|
||||
return status;
|
||||
status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr);
|
||||
if (APR_STATUS_IS_EAGAIN(status)) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
|
||||
"h2_mplx(%ld): start block engine pull", m->id);
|
||||
apr_thread_cond_timedwait(m->req_added, m->lock,
|
||||
apr_time_from_msec(20));
|
||||
status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr);
|
||||
}
|
||||
ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c,
|
||||
"h2_mplx(%ld): start block engine pull", m->id);
|
||||
apr_thread_cond_timedwait(m->req_added, m->lock,
|
||||
apr_time_from_msec(100));
|
||||
ap_log_cerror(APLOG_MARK, APLOG_INFO, status, m->c,
|
||||
"h2_mplx(%ld): done block engine pull", m->id);
|
||||
}
|
||||
status = h2_ngn_shed_pull_req(shed, ngn, capacity, want_shutdown, pr);
|
||||
else {
|
||||
status = h2_ngn_shed_pull_req(shed, ngn, capacity, want_shutdown, pr);
|
||||
}
|
||||
leave_mutex(m, acquired);
|
||||
}
|
||||
return status;
|
||||
@@ -1413,29 +1434,23 @@ apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
|
||||
|
||||
void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
|
||||
{
|
||||
h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
|
||||
h2_mplx *m = h2_ngn_shed_get_ctx(shed);
|
||||
int acquired;
|
||||
h2_task *task = h2_ctx_cget_task(r_conn);
|
||||
|
||||
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
|
||||
if (h2_ngn_shed_done_req(shed, ngn, r_conn) == APR_SUCCESS) {
|
||||
h2_task *task = h2_ctx_cget_task(r_conn);
|
||||
if (task) {
|
||||
if (task) {
|
||||
h2_mplx *m = task->mplx;
|
||||
int acquired;
|
||||
|
||||
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
|
||||
h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
|
||||
if (task->engine) {
|
||||
/* cannot report that as done until engine returns */
|
||||
}
|
||||
else {
|
||||
h2_task_output_close(task->output);
|
||||
task_done(m, task);
|
||||
}
|
||||
leave_mutex(m, acquired);
|
||||
}
|
||||
leave_mutex(m, acquired);
|
||||
}
|
||||
}
|
||||
|
||||
void h2_mplx_req_engine_exit(h2_req_engine *ngn)
|
||||
{
|
||||
h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
|
||||
h2_mplx *m = h2_ngn_shed_get_ctx(shed);
|
||||
int acquired;
|
||||
|
||||
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
|
||||
h2_ngn_shed_done_ngn(shed, ngn);
|
||||
leave_mutex(m, acquired);
|
||||
}
|
||||
}
|
||||
|
@@ -428,6 +428,5 @@ apr_status_t h2_mplx_req_engine_pull(struct h2_req_engine *ngn,
|
||||
apr_uint32_t capacity,
|
||||
request_rec **pr);
|
||||
void h2_mplx_req_engine_done(struct h2_req_engine *ngn, conn_rec *r_conn);
|
||||
void h2_mplx_req_engine_exit(struct h2_req_engine *ngn);
|
||||
|
||||
#endif /* defined(__mod_h2__h2_mplx__) */
|
||||
|
@@ -45,6 +45,7 @@
|
||||
typedef struct h2_ngn_entry h2_ngn_entry;
|
||||
struct h2_ngn_entry {
|
||||
APR_RING_ENTRY(h2_ngn_entry) link;
|
||||
h2_task *task;
|
||||
request_rec *r;
|
||||
};
|
||||
|
||||
@@ -72,6 +73,7 @@ struct h2_req_engine {
|
||||
const char *type; /* name of the engine type */
|
||||
apr_pool_t *pool; /* pool for engine specific allocations */
|
||||
conn_rec *c; /* connection this engine is assigned to */
|
||||
h2_task *task; /* the task this engine is base on, running in */
|
||||
h2_ngn_shed *shed;
|
||||
|
||||
unsigned int shutdown : 1; /* engine is being shut down */
|
||||
@@ -81,10 +83,18 @@ struct h2_req_engine {
|
||||
apr_uint32_t no_assigned; /* # of assigned requests */
|
||||
apr_uint32_t no_live; /* # of live */
|
||||
apr_uint32_t no_finished; /* # of finished */
|
||||
|
||||
apr_thread_cond_t *io; /* condition var for waiting on data */
|
||||
};
|
||||
|
||||
const char *h2_req_engine_get_id(h2_req_engine *engine)
|
||||
{
|
||||
return engine->id;
|
||||
}
|
||||
|
||||
int h2_req_engine_is_shutdown(h2_req_engine *engine)
|
||||
{
|
||||
return engine->shutdown;
|
||||
}
|
||||
|
||||
h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
|
||||
apr_uint32_t req_buffer_size)
|
||||
{
|
||||
@@ -119,14 +129,13 @@ void h2_ngn_shed_abort(h2_ngn_shed *shed)
|
||||
shed->aborted = 1;
|
||||
}
|
||||
|
||||
static apr_status_t ngn_schedule(h2_req_engine *ngn, request_rec *r)
|
||||
static void ngn_add_req(h2_req_engine *ngn, h2_task *task, request_rec *r)
|
||||
{
|
||||
h2_ngn_entry *entry = apr_pcalloc(r->pool, sizeof(*entry));
|
||||
|
||||
h2_ngn_entry *entry = apr_pcalloc(task->c->pool, sizeof(*entry));
|
||||
APR_RING_ELEM_INIT(entry, link);
|
||||
entry->task = task;
|
||||
entry->r = r;
|
||||
H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry);
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
@@ -134,7 +143,6 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type,
|
||||
h2_task *task, request_rec *r,
|
||||
h2_req_engine_init *einit){
|
||||
h2_req_engine *ngn;
|
||||
apr_status_t status = APR_EOF;
|
||||
|
||||
AP_DEBUG_ASSERT(shed);
|
||||
|
||||
@@ -147,73 +155,69 @@ apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type,
|
||||
ngn = apr_hash_get(shed->ngns, ngn_type, APR_HASH_KEY_STRING);
|
||||
if (ngn) {
|
||||
if (ngn->shutdown) {
|
||||
ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
|
||||
"h2_ngn_shed(%ld): %s in shutdown",
|
||||
shed->c->id, ngn->id);
|
||||
ngn = NULL;
|
||||
}
|
||||
else if (ngn->no_assigned >= ngn->capacity) {
|
||||
ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c,
|
||||
"h2_ngn_shed(%ld): %s over capacity %d/%d",
|
||||
shed->c->id, ngn->id, ngn->no_assigned,
|
||||
ngn->capacity);
|
||||
ngn = NULL;
|
||||
}
|
||||
else if (ngn_schedule(ngn, r) == APR_SUCCESS) {
|
||||
else {
|
||||
/* this task will be processed in another thread,
|
||||
* freeze any I/O for the time being. */
|
||||
h2_task_freeze(task, r);
|
||||
ngn_add_req(ngn, task, r);
|
||||
ngn->no_assigned++;
|
||||
status = APR_SUCCESS;
|
||||
ap_log_rerror(APLOG_MARK, APLOG_INFO, status, r,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c,
|
||||
"h2_ngn_shed(%ld): pushed request %s to %s",
|
||||
shed->c->id, task->id, ngn->id);
|
||||
}
|
||||
else {
|
||||
ap_log_rerror(APLOG_MARK, APLOG_INFO, status, r,
|
||||
"h2_ngn_shed(%ld): engine error adding req %s",
|
||||
shed->c->id, ngn->id);
|
||||
ngn = NULL;
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
if (!ngn && einit) {
|
||||
ngn = apr_pcalloc(task->c->pool, sizeof(*ngn));
|
||||
ngn->id = apr_psprintf(task->c->pool, "ngn-%ld-%d",
|
||||
/* none of the existing engines has capacity */
|
||||
if (einit) {
|
||||
apr_status_t status;
|
||||
h2_req_engine *newngn;
|
||||
|
||||
newngn = apr_pcalloc(task->c->pool, sizeof(*ngn));
|
||||
newngn->id = apr_psprintf(task->c->pool, "ngn-%ld-%d",
|
||||
shed->c->id, shed->next_ngn_id++);
|
||||
ngn->pool = task->c->pool;
|
||||
ngn->type = apr_pstrdup(task->c->pool, ngn_type);
|
||||
ngn->c = r->connection;
|
||||
APR_RING_INIT(&ngn->entries, h2_ngn_entry, link);
|
||||
ngn->shed = shed;
|
||||
ngn->capacity = 100;
|
||||
ngn->io = task->io;
|
||||
ngn->no_assigned = 1;
|
||||
ngn->no_live = 1;
|
||||
newngn->pool = task->c->pool;
|
||||
newngn->type = apr_pstrdup(task->c->pool, ngn_type);
|
||||
newngn->c = r->connection;
|
||||
APR_RING_INIT(&newngn->entries, h2_ngn_entry, link);
|
||||
newngn->shed = shed;
|
||||
newngn->capacity = 100;
|
||||
newngn->no_assigned = 1;
|
||||
newngn->no_live = 1;
|
||||
|
||||
status = einit(ngn, ngn->id, ngn->type, ngn->pool,
|
||||
status = einit(newngn, newngn->id, newngn->type, newngn->pool,
|
||||
shed->req_buffer_size, r);
|
||||
ap_log_rerror(APLOG_MARK, APLOG_INFO, status, r,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c,
|
||||
"h2_ngn_shed(%ld): init engine %s (%s)",
|
||||
shed->c->id, ngn->id, ngn->type);
|
||||
shed->c->id, newngn->id, newngn->type);
|
||||
if (status == APR_SUCCESS) {
|
||||
apr_hash_set(shed->ngns, ngn->type, APR_HASH_KEY_STRING, ngn);
|
||||
newngn->task = task;
|
||||
AP_DEBUG_ASSERT(task->engine == NULL);
|
||||
task->engine = newngn;
|
||||
apr_hash_set(shed->ngns, newngn->type, APR_HASH_KEY_STRING, newngn);
|
||||
}
|
||||
return status;
|
||||
}
|
||||
return status;
|
||||
return APR_EOF;
|
||||
}
|
||||
|
||||
static h2_ngn_entry *pop_non_frozen(h2_req_engine *ngn)
|
||||
{
|
||||
h2_ngn_entry *entry;
|
||||
h2_task *task;
|
||||
|
||||
for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
|
||||
entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
|
||||
entry = H2_NGN_ENTRY_NEXT(entry)) {
|
||||
task = h2_ctx_rget_task(entry->r);
|
||||
AP_DEBUG_ASSERT(task);
|
||||
if (!task->frozen) {
|
||||
if (!entry->task->frozen) {
|
||||
H2_NGN_ENTRY_REMOVE(entry);
|
||||
return entry;
|
||||
}
|
||||
@@ -235,62 +239,60 @@ apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, shed->c,
|
||||
"h2_ngn_shed(%ld): abort while pulling requests %s",
|
||||
shed->c->id, ngn->id);
|
||||
return APR_EOF;
|
||||
ngn->shutdown = 1;
|
||||
return APR_ECONNABORTED;
|
||||
}
|
||||
|
||||
ngn->capacity = capacity;
|
||||
if (!H2_REQ_ENTRIES_EMPTY(&ngn->entries)
|
||||
&& (entry = pop_non_frozen(ngn))) {
|
||||
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, entry->r,
|
||||
if (H2_REQ_ENTRIES_EMPTY(&ngn->entries)) {
|
||||
if (want_shutdown) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
|
||||
"h2_ngn_shed(%ld): emtpy queue, shutdown engine %s",
|
||||
shed->c->id, ngn->id);
|
||||
ngn->shutdown = 1;
|
||||
}
|
||||
return ngn->shutdown? APR_EOF : APR_EAGAIN;
|
||||
}
|
||||
|
||||
if ((entry = pop_non_frozen(ngn))) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, entry->task->c,
|
||||
"h2_ngn_shed(%ld): pulled request %s for engine %s",
|
||||
shed->c->id, entry->r->the_request, ngn->id);
|
||||
shed->c->id, entry->task->id, ngn->id);
|
||||
ngn->no_live++;
|
||||
entry->r->connection->current_thread = ngn->c->current_thread;
|
||||
*pr = entry->r;
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
else if (want_shutdown) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
|
||||
"h2_ngn_shed(%ld): emtpy queue, shutdown engine %s",
|
||||
shed->c->id, ngn->id);
|
||||
ngn->shutdown = 1;
|
||||
return APR_EOF;
|
||||
}
|
||||
return APR_EAGAIN;
|
||||
}
|
||||
|
||||
static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn, h2_task *task,
|
||||
int waslive, int aborted)
|
||||
static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn,
|
||||
h2_task *task, int waslive, int aborted,
|
||||
int close)
|
||||
{
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c,
|
||||
"h2_ngn_shed(%ld): task %s %s by %s",
|
||||
shed->c->id, task->id, aborted? "aborted":"done", ngn->id);
|
||||
h2_task_output_close(task->output);
|
||||
ngn->no_finished++;
|
||||
if (waslive) ngn->no_live--;
|
||||
ngn->no_assigned--;
|
||||
if (task->c != ngn->c) { /* do not release what the engine runs on */
|
||||
return APR_SUCCESS;
|
||||
|
||||
if (close) {
|
||||
h2_task_output_close(task->output);
|
||||
}
|
||||
return APR_EAGAIN;
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
|
||||
apr_status_t h2_ngn_shed_done_req(h2_ngn_shed *shed,
|
||||
h2_req_engine *ngn, conn_rec *r_conn)
|
||||
apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed,
|
||||
struct h2_req_engine *ngn, h2_task *task)
|
||||
{
|
||||
h2_task *task = h2_ctx_cget_task(r_conn);
|
||||
if (task) {
|
||||
return ngn_done_task(shed, ngn, task, 1, 0);
|
||||
}
|
||||
return APR_ECONNABORTED;
|
||||
return ngn_done_task(shed, ngn, task, 1, 0, 0);
|
||||
}
|
||||
|
||||
void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
|
||||
{
|
||||
h2_req_engine *existing;
|
||||
|
||||
if (!shed->aborted
|
||||
&& !H2_REQ_ENTRIES_EMPTY(&ngn->entries)) {
|
||||
if (!shed->aborted && !H2_REQ_ENTRIES_EMPTY(&ngn->entries)) {
|
||||
h2_ngn_entry *entry;
|
||||
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c,
|
||||
"h2_ngn_shed(%ld): exit engine %s (%s), "
|
||||
@@ -309,7 +311,7 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
|
||||
"h2_ngn_shed(%ld): engine %s has queued task %s, "
|
||||
"frozen=%d, aborting",
|
||||
shed->c->id, ngn->id, task->id, task->frozen);
|
||||
ngn_done_task(shed, ngn, task, 0, 1);
|
||||
ngn_done_task(shed, ngn, task, 0, 1, 1);
|
||||
}
|
||||
}
|
||||
if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) {
|
||||
@@ -321,7 +323,7 @@ void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
|
||||
(long)ngn->no_finished);
|
||||
}
|
||||
else {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, shed->c,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c,
|
||||
"h2_ngn_shed(%ld): exit engine %s (%s)",
|
||||
shed->c->id, ngn->id, ngn->type);
|
||||
}
|
||||
|
@@ -17,6 +17,7 @@
|
||||
#define h2_req_shed_h
|
||||
|
||||
struct h2_req_engine;
|
||||
struct h2_task;
|
||||
|
||||
typedef struct h2_ngn_shed h2_ngn_shed;
|
||||
struct h2_ngn_shed {
|
||||
@@ -30,6 +31,9 @@ struct h2_ngn_shed {
|
||||
apr_uint32_t req_buffer_size; /* preferred buffer size for responses */
|
||||
};
|
||||
|
||||
const char *h2_req_engine_get_id(h2_req_engine *engine);
|
||||
int h2_req_engine_is_shutdown(h2_req_engine *engine);
|
||||
|
||||
typedef apr_status_t h2_shed_ngn_init(h2_req_engine *engine,
|
||||
const char *id,
|
||||
const char *type,
|
||||
@@ -55,8 +59,9 @@ apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, h2_req_engine *pub_ngn,
|
||||
apr_uint32_t capacity,
|
||||
int want_shutdown, request_rec **pr);
|
||||
|
||||
apr_status_t h2_ngn_shed_done_req(h2_ngn_shed *shed,
|
||||
struct h2_req_engine *ngn, conn_rec *r_conn);
|
||||
apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed,
|
||||
struct h2_req_engine *ngn,
|
||||
struct h2_task *task);
|
||||
|
||||
void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn);
|
||||
|
||||
|
@@ -64,6 +64,7 @@ static apr_status_t proxy_session_pre_close(void *theconn)
|
||||
"proxy_session(%s): pool cleanup, state=%d, streams=%d",
|
||||
session->id, session->state,
|
||||
(int)h2_ihash_count(session->streams));
|
||||
session->aborted = 1;
|
||||
dispatch_event(session, H2_PROXYS_EV_PRE_CLOSE, 0, NULL);
|
||||
nghttp2_session_del(session->ngh2);
|
||||
session->ngh2 = NULL;
|
||||
@@ -116,7 +117,7 @@ static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data,
|
||||
status = proxy_pass_brigade(session->c->bucket_alloc,
|
||||
session->p_conn, session->c,
|
||||
session->output, flush);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
|
||||
"h2_proxy_sesssion(%s): raw_send %d bytes, flush=%d",
|
||||
session->id, (int)length, flush);
|
||||
if (status != APR_SUCCESS) {
|
||||
@@ -155,7 +156,7 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
|
||||
char buffer[256];
|
||||
|
||||
h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03342)
|
||||
ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c, APLOGNO(03342)
|
||||
"h2_proxy_session(%s): recv FRAME[%s]",
|
||||
session->id, buffer);
|
||||
}
|
||||
@@ -351,6 +352,8 @@ static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
|
||||
if (flags & NGHTTP2_DATA_FLAG_EOF) {
|
||||
b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
|
||||
APR_BRIGADE_INSERT_TAIL(stream->output, b);
|
||||
/*b = apr_bucket_eos_create(stream->r->connection->bucket_alloc);
|
||||
APR_BRIGADE_INSERT_TAIL(stream->output, b);*/
|
||||
}
|
||||
|
||||
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r,
|
||||
@@ -372,10 +375,12 @@ static int on_stream_close(nghttp2_session *ngh2, int32_t stream_id,
|
||||
uint32_t error_code, void *user_data)
|
||||
{
|
||||
h2_proxy_session *session = user_data;
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
|
||||
"h2_proxy_session(%s): stream=%d, closed, err=%d",
|
||||
session->id, stream_id, error_code);
|
||||
dispatch_event(session, H2_PROXYS_EV_STREAM_DONE, stream_id, NULL);
|
||||
if (!session->aborted) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
|
||||
"h2_proxy_session(%s): stream=%d, closed, err=%d",
|
||||
session->id, stream_id, error_code);
|
||||
dispatch_event(session, H2_PROXYS_EV_STREAM_DONE, stream_id, NULL);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -495,7 +500,6 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
|
||||
h2_proxy_session *session;
|
||||
nghttp2_session_callbacks *cbs;
|
||||
nghttp2_option *option;
|
||||
ap_filter_t *f;
|
||||
|
||||
session = apr_pcalloc(pool, sizeof(*session));
|
||||
apr_pool_pre_cleanup_register(pool, p_conn, proxy_session_pre_close);
|
||||
@@ -535,15 +539,6 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
|
||||
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
|
||||
"setup session for %s", p_conn->hostname);
|
||||
|
||||
f = session->c->input_filters;
|
||||
while (f) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
|
||||
"h2_proxy_session(%s): c->input_filter %s",
|
||||
session->id, f->frec->name);
|
||||
f = f->next;
|
||||
}
|
||||
|
||||
}
|
||||
return p_conn->data;
|
||||
}
|
||||
@@ -677,21 +672,7 @@ static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *
|
||||
apr_bucket* b = APR_BRIGADE_FIRST(bb);
|
||||
|
||||
if (APR_BUCKET_IS_METADATA(b)) {
|
||||
if (APR_BUCKET_IS_EOS(b)) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
|
||||
"h2_proxy_session(%s): read EOS from conn",
|
||||
session->id);
|
||||
}
|
||||
else if (APR_BUCKET_IS_FLUSH(b)) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
|
||||
"h2_proxy_session(%s): read FLUSH from conn",
|
||||
session->id);
|
||||
}
|
||||
else {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
|
||||
"h2_proxy_session(%s): read unkown META from conn",
|
||||
session->id);
|
||||
}
|
||||
/* nop */
|
||||
}
|
||||
else {
|
||||
const char *bdata = NULL;
|
||||
@@ -700,7 +681,7 @@ static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *
|
||||
status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ);
|
||||
if (status == APR_SUCCESS && blen > 0) {
|
||||
n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)bdata, blen);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
|
||||
"h2_proxy_session(%s): feeding %ld bytes -> %ld",
|
||||
session->id, (long)blen, (long)n);
|
||||
if (n < 0) {
|
||||
@@ -719,7 +700,7 @@ static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *
|
||||
apr_bucket_delete(b);
|
||||
}
|
||||
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
|
||||
"h2_proxy_session(%s): fed %ld bytes of input to session",
|
||||
session->id, (long)readlen);
|
||||
if (readlen == 0 && status == APR_SUCCESS) {
|
||||
@@ -756,7 +737,7 @@ static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block,
|
||||
AP_MODE_READBYTES,
|
||||
block? APR_BLOCK_READ : APR_NONBLOCK_READ,
|
||||
64 * 1024);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
|
||||
"h2_proxy_session(%s): read from conn", session->id);
|
||||
if (socket && save_timeout != -1) {
|
||||
apr_socket_timeout_set(socket, save_timeout);
|
||||
@@ -770,7 +751,7 @@ static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block,
|
||||
/* nop */
|
||||
}
|
||||
else if (!APR_STATUS_IS_EAGAIN(status)) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_INFO, status, session->c,
|
||||
"h2_proxy_session(%s): read error", session->id);
|
||||
dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
|
||||
}
|
||||
@@ -1016,7 +997,7 @@ static void ev_no_io(h2_proxy_session *session, int arg, const char *msg)
|
||||
transit(session, "no io", H2_PROXYS_ST_DONE);
|
||||
}
|
||||
else {
|
||||
/* When we have no streams, no task event are possible,
|
||||
/* When we have no streams, no task events are possible,
|
||||
* switch to blocking reads */
|
||||
transit(session, "no io", H2_PROXYS_ST_IDLE);
|
||||
}
|
||||
@@ -1059,10 +1040,19 @@ static void ev_stream_done(h2_proxy_session *session, int stream_id,
|
||||
"h2_proxy_sesssion(%s): stream(%d) closed",
|
||||
session->id, stream_id);
|
||||
if (!stream->data_received) {
|
||||
/* last chance to manipulate response headers.
|
||||
* after this, only trailers */
|
||||
apr_bucket *b;
|
||||
/* if the response had no body, this is the time to flush
|
||||
* an empty brigade which will also "write" the resonse
|
||||
* headers */
|
||||
h2_proxy_stream_end_headers_out(stream);
|
||||
stream->data_received = 1;
|
||||
b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
|
||||
APR_BRIGADE_INSERT_TAIL(stream->output, b);
|
||||
b = apr_bucket_eos_create(stream->r->connection->bucket_alloc);
|
||||
APR_BRIGADE_INSERT_TAIL(stream->output, b);
|
||||
ap_pass_brigade(stream->r->output_filters, stream->output);
|
||||
}
|
||||
|
||||
stream->state = H2_STREAM_ST_CLOSED;
|
||||
h2_ihash_remove(session->streams, stream_id);
|
||||
h2_iq_remove(session->suspended, stream_id);
|
||||
@@ -1188,11 +1178,13 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
|
||||
"h2_proxy_session(%s): process", session->id);
|
||||
|
||||
run_loop:
|
||||
switch (session->state) {
|
||||
case H2_PROXYS_ST_INIT:
|
||||
status = session_start(session);
|
||||
if (status == APR_SUCCESS) {
|
||||
dispatch_event(session, H2_PROXYS_EV_INIT, 0, NULL);
|
||||
goto run_loop;
|
||||
}
|
||||
else {
|
||||
dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
|
||||
@@ -1223,6 +1215,7 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
|
||||
if (!have_written && !have_read
|
||||
&& !nghttp2_session_want_write(session->ngh2)) {
|
||||
dispatch_event(session, H2_PROXYS_EV_NO_IO, 0, NULL);
|
||||
goto run_loop;
|
||||
}
|
||||
break;
|
||||
|
||||
@@ -1255,10 +1248,10 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
|
||||
break;
|
||||
|
||||
case H2_PROXYS_ST_IDLE:
|
||||
return APR_SUCCESS;
|
||||
break;
|
||||
|
||||
case H2_PROXYS_ST_DONE:
|
||||
return APR_SUCCESS;
|
||||
case H2_PROXYS_ST_DONE: /* done, session terminated */
|
||||
return APR_EOF;
|
||||
|
||||
default:
|
||||
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, session->c,
|
||||
@@ -1278,7 +1271,7 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
|
||||
dispatch_event(session, H2_PROXYS_EV_NGH2_DONE, 0, NULL);
|
||||
}
|
||||
|
||||
return APR_EAGAIN;
|
||||
return APR_SUCCESS; /* needs to be called again */
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
@@ -1297,10 +1290,15 @@ static int cleanup_iter(void *udata, void *val)
|
||||
void h2_proxy_session_cleanup(h2_proxy_session *session,
|
||||
h2_proxy_request_done *done)
|
||||
{
|
||||
cleanup_iter_ctx ctx;
|
||||
ctx.session = session;
|
||||
ctx.done = done;
|
||||
h2_ihash_iter(session->streams, cleanup_iter, &ctx);
|
||||
h2_ihash_clear(session->streams);
|
||||
if (session->streams && !h2_ihash_is_empty(session->streams)) {
|
||||
cleanup_iter_ctx ctx;
|
||||
ctx.session = session;
|
||||
ctx.done = done;
|
||||
ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c,
|
||||
"h2_proxy_session(%s): terminated, %d streams unfinished",
|
||||
session->id, (int)h2_ihash_count(session->streams));
|
||||
h2_ihash_iter(session->streams, cleanup_iter, &ctx);
|
||||
h2_ihash_clear(session->streams);
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -61,6 +61,8 @@ struct h2_proxy_session {
|
||||
apr_pool_t *pool;
|
||||
nghttp2_session *ngh2; /* the nghttp2 session itself */
|
||||
|
||||
unsigned int aborted : 1;
|
||||
|
||||
h2_proxy_request_done *done;
|
||||
void *user_data;
|
||||
|
||||
@@ -88,6 +90,14 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
|
||||
apr_status_t h2_proxy_session_submit(h2_proxy_session *s, const char *url,
|
||||
request_rec *r);
|
||||
|
||||
/**
|
||||
* Perform a step in processing the proxy session. Will return aftert
|
||||
* one read/write cycle and indicate session status by status code.
|
||||
* @param s the session to process
|
||||
* @return APR_EAGAIN when processing needs to be invoked again
|
||||
* APR_SUCCESS when all streams have been processed, session still live
|
||||
* APR_EOF when the session has been terminated
|
||||
*/
|
||||
apr_status_t h2_proxy_session_process(h2_proxy_session *s);
|
||||
|
||||
void h2_proxy_session_cleanup(h2_proxy_session *s, h2_proxy_request_done *done);
|
||||
|
@@ -354,7 +354,7 @@ h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src)
|
||||
return nreq;
|
||||
}
|
||||
|
||||
request_rec *h2_request_create_rec(const h2_request *req, conn_rec *conn)
|
||||
request_rec *h2_request_create_rec(const h2_request *req, conn_rec *c)
|
||||
{
|
||||
int access_status = HTTP_OK;
|
||||
|
||||
@@ -362,7 +362,7 @@ request_rec *h2_request_create_rec(const h2_request *req, conn_rec *conn)
|
||||
|
||||
r->headers_in = apr_table_clone(r->pool, req->headers);
|
||||
|
||||
ap_run_pre_read_request(r, conn);
|
||||
ap_run_pre_read_request(r, c);
|
||||
|
||||
/* Time to populate r with the data we have. */
|
||||
r->request_time = req->request_time;
|
||||
@@ -405,11 +405,11 @@ request_rec *h2_request_create_rec(const h2_request *req, conn_rec *conn)
|
||||
/* Request check post hooks failed. An example of this would be a
|
||||
* request for a vhost where h2 is disabled --> 421.
|
||||
*/
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, conn, APLOGNO()
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO()
|
||||
"h2_request(%d): access_status=%d, request_create failed",
|
||||
req->id, access_status);
|
||||
ap_die(access_status, r);
|
||||
ap_update_child_status(conn->sbh, SERVER_BUSY_LOG, r);
|
||||
ap_update_child_status(c->sbh, SERVER_BUSY_LOG, r);
|
||||
ap_run_log_transaction(r);
|
||||
r = NULL;
|
||||
goto traceout;
|
||||
|
@@ -299,14 +299,18 @@ static int h2_task_process_conn(conn_rec* c)
|
||||
ctx = h2_ctx_get(c, 0);
|
||||
if (h2_ctx_is_task(ctx)) {
|
||||
if (!ctx->task->ser_headers) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
|
||||
"h2_h2, processing request directly");
|
||||
h2_task_process_request(ctx->task, c);
|
||||
return DONE;
|
||||
}
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
|
||||
"h2_task(%s), serialized handling", ctx->task->id);
|
||||
}
|
||||
else {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
|
||||
"slave_conn(%ld): has no task", c->id);
|
||||
}
|
||||
return DECLINED;
|
||||
}
|
||||
|
||||
|
@@ -41,6 +41,7 @@ struct apr_thread_cond_t;
|
||||
struct h2_conn;
|
||||
struct h2_mplx;
|
||||
struct h2_task;
|
||||
struct h2_req_engine;
|
||||
struct h2_request;
|
||||
struct h2_resp_head;
|
||||
struct h2_worker;
|
||||
@@ -63,6 +64,8 @@ struct h2_task {
|
||||
struct h2_task_input *input;
|
||||
struct h2_task_output *output;
|
||||
struct apr_thread_cond_t *io; /* used to wait for events on */
|
||||
|
||||
struct h2_req_engine *engine;
|
||||
};
|
||||
|
||||
h2_task *h2_task_create(long session_id, const struct h2_request *req,
|
||||
|
@@ -190,15 +190,18 @@ apr_status_t h2_task_output_write(h2_task_output *output,
|
||||
|
||||
void h2_task_output_close(h2_task_output *output)
|
||||
{
|
||||
if (output->task->frozen) {
|
||||
return;
|
||||
}
|
||||
open_if_needed(output, NULL, NULL, "close");
|
||||
if (output->state != H2_TASK_OUT_DONE) {
|
||||
if (output->frozen_bb && !APR_BRIGADE_EMPTY(output->frozen_bb)) {
|
||||
h2_mplx_out_write(output->task->mplx, output->task->stream_id,
|
||||
NULL, 1, output->frozen_bb, NULL, NULL);
|
||||
}
|
||||
output->state = H2_TASK_OUT_DONE;
|
||||
h2_mplx_out_close(output->task->mplx, output->task->stream_id,
|
||||
get_trailers(output));
|
||||
output->state = H2_TASK_OUT_DONE;
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -148,12 +148,6 @@ static void http2_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
|
||||
h2_mplx_req_engine_done(ngn, r_conn);
|
||||
}
|
||||
|
||||
static void http2_req_engine_exit(h2_req_engine *ngn)
|
||||
{
|
||||
h2_mplx_req_engine_exit(ngn);
|
||||
}
|
||||
|
||||
|
||||
/* Runs once per created child process. Perform any process
|
||||
* related initionalization here.
|
||||
*/
|
||||
@@ -179,7 +173,6 @@ static void h2_hooks(apr_pool_t *pool)
|
||||
APR_REGISTER_OPTIONAL_FN(http2_req_engine_push);
|
||||
APR_REGISTER_OPTIONAL_FN(http2_req_engine_pull);
|
||||
APR_REGISTER_OPTIONAL_FN(http2_req_engine_done);
|
||||
APR_REGISTER_OPTIONAL_FN(http2_req_engine_exit);
|
||||
|
||||
ap_log_perror(APLOG_MARK, APLOG_TRACE1, 0, pool, "installing hooks");
|
||||
|
||||
|
@@ -74,8 +74,9 @@ APR_DECLARE_OPTIONAL_FN(apr_status_t,
|
||||
* @param timeout wait a maximum amount of time for a new slave, 0 will not wait
|
||||
* @param pslave the slave connection that needs processing or NULL
|
||||
* @return APR_SUCCESS if new request was assigned
|
||||
* APR_EAGAIN/APR_TIMEUP if no new request is available
|
||||
* APR_ECONNABORTED if the engine needs to shut down
|
||||
* APR_EAGAIN if no new request is available
|
||||
* APR_EOF if engine may shut down, as no more request will be scheduled
|
||||
* APR_ECONNABORTED if the engine needs to shut down immediately
|
||||
*/
|
||||
APR_DECLARE_OPTIONAL_FN(apr_status_t,
|
||||
http2_req_engine_pull, (h2_req_engine *engine,
|
||||
@@ -85,15 +86,6 @@ APR_DECLARE_OPTIONAL_FN(apr_status_t,
|
||||
APR_DECLARE_OPTIONAL_FN(void,
|
||||
http2_req_engine_done, (h2_req_engine *engine,
|
||||
conn_rec *rconn));
|
||||
/**
|
||||
* The given request engine is done processing and needs to be excluded
|
||||
* from further handling.
|
||||
* @param engine the engine to exit
|
||||
*/
|
||||
APR_DECLARE_OPTIONAL_FN(void,
|
||||
http2_req_engine_exit, (h2_req_engine *engine));
|
||||
|
||||
|
||||
#define H2_TASK_ID_NOTE "http2-task-id"
|
||||
|
||||
#endif
|
||||
|
@@ -48,7 +48,6 @@ static apr_status_t (*req_engine_pull)(h2_req_engine *engine,
|
||||
apr_uint32_t capacity,
|
||||
request_rec **pr);
|
||||
static void (*req_engine_done)(h2_req_engine *engine, conn_rec *r_conn);
|
||||
static void (*req_engine_exit)(h2_req_engine *engine);
|
||||
|
||||
typedef struct h2_proxy_ctx {
|
||||
conn_rec *owner;
|
||||
@@ -65,6 +64,8 @@ typedef struct h2_proxy_ctx {
|
||||
const char *engine_type;
|
||||
apr_pool_t *engine_pool;
|
||||
apr_uint32_t req_buffer_size;
|
||||
request_rec *next;
|
||||
apr_size_t capacity;
|
||||
|
||||
unsigned standalone : 1;
|
||||
unsigned is_ssl : 1;
|
||||
@@ -98,15 +99,12 @@ static int h2_proxy_post_config(apr_pool_t *p, apr_pool_t *plog,
|
||||
req_engine_push = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_push);
|
||||
req_engine_pull = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_pull);
|
||||
req_engine_done = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_done);
|
||||
req_engine_exit = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_exit);
|
||||
|
||||
/* we need all of them */
|
||||
if (!req_engine_push || !req_engine_pull
|
||||
|| !req_engine_done || !req_engine_exit) {
|
||||
if (!req_engine_push || !req_engine_pull || !req_engine_done) {
|
||||
req_engine_push = NULL;
|
||||
req_engine_pull = NULL;
|
||||
req_engine_done = NULL;
|
||||
req_engine_exit = NULL;
|
||||
}
|
||||
|
||||
return status;
|
||||
@@ -213,6 +211,7 @@ static apr_status_t proxy_engine_init(h2_req_engine *engine,
|
||||
ctx->engine_type = type;
|
||||
ctx->engine_pool = pool;
|
||||
ctx->req_buffer_size = req_buffer_size;
|
||||
ctx->capacity = 100;
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r,
|
||||
@@ -245,79 +244,38 @@ static void request_done(h2_proxy_session *session, request_rec *r)
|
||||
if (r == ctx->rbase) {
|
||||
ctx->r_status = APR_SUCCESS;
|
||||
}
|
||||
else if (req_engine_done && ctx->engine) {
|
||||
|
||||
if (req_engine_done && ctx->engine) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection,
|
||||
"h2_proxy_session(%s): request %s",
|
||||
ctx->engine_id, r->the_request);
|
||||
req_engine_done(ctx->engine, r->connection);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static apr_status_t next_request(h2_proxy_ctx *ctx, h2_proxy_session *session,
|
||||
request_rec *r, int before_leave,
|
||||
request_rec **pr)
|
||||
static apr_status_t next_request(h2_proxy_ctx *ctx, int before_leave)
|
||||
{
|
||||
*pr = r;
|
||||
if (!r && ctx->engine) {
|
||||
if (ctx->next) {
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
else if (req_engine_pull && ctx->engine) {
|
||||
apr_status_t status;
|
||||
status = req_engine_pull(ctx->engine,
|
||||
before_leave? APR_BLOCK_READ: APR_NONBLOCK_READ,
|
||||
H2MAX(1, session->remote_max_concurrent), pr);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
|
||||
"h2_proxy_session(%s): pulled request %s",
|
||||
session->id, (*pr? (*pr)->the_request : "NULL"));
|
||||
return status;
|
||||
ctx->capacity, &ctx->next);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, ctx->owner,
|
||||
"h2_proxy_engine(%s): pulled request %s",
|
||||
ctx->engine_id,
|
||||
(ctx->next? ctx->next->the_request : "NULL"));
|
||||
return APR_STATUS_IS_EAGAIN(status)? APR_SUCCESS : status;
|
||||
}
|
||||
return *pr? APR_SUCCESS : APR_EAGAIN;
|
||||
return APR_EOF;
|
||||
}
|
||||
|
||||
static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) {
|
||||
static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
|
||||
apr_status_t status = OK;
|
||||
h2_proxy_session *session;
|
||||
|
||||
setup_backend:
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner,
|
||||
"eng(%s): setup backend", ctx->engine_id);
|
||||
/* Step Two: Make the Connection (or check that an already existing
|
||||
* socket is still usable). On success, we have a socket connected to
|
||||
* backend->hostname. */
|
||||
if (ap_proxy_connect_backend(ctx->proxy_func, ctx->p_conn, ctx->worker,
|
||||
ctx->server)) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, ctx->owner, APLOGNO(03352)
|
||||
"H2: failed to make connection to backend: %s",
|
||||
ctx->p_conn->hostname);
|
||||
return HTTP_SERVICE_UNAVAILABLE;
|
||||
}
|
||||
|
||||
/* Step Three: Create conn_rec for the socket we have open now. */
|
||||
if (!ctx->p_conn->connection) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner, APLOGNO(03353)
|
||||
"setup new connection: is_ssl=%d %s %s %s",
|
||||
ctx->p_conn->is_ssl, ctx->p_conn->ssl_hostname,
|
||||
r->hostname, ctx->p_conn->hostname);
|
||||
if ((status = ap_proxy_connection_create(ctx->proxy_func, ctx->p_conn,
|
||||
ctx->owner,
|
||||
ctx->server)) != OK) {
|
||||
return status;
|
||||
}
|
||||
|
||||
/*
|
||||
* On SSL connections set a note on the connection what CN is
|
||||
* requested, such that mod_ssl can check if it is requested to do
|
||||
* so.
|
||||
*/
|
||||
if (ctx->p_conn->ssl_hostname) {
|
||||
apr_table_setn(ctx->p_conn->connection->notes,
|
||||
"proxy-request-hostname", ctx->p_conn->ssl_hostname);
|
||||
}
|
||||
|
||||
if (ctx->is_ssl) {
|
||||
apr_table_setn(ctx->p_conn->connection->notes,
|
||||
"proxy-request-alpn-protos", "h2");
|
||||
}
|
||||
}
|
||||
|
||||
/* Step Four: Send the Request in a new HTTP/2 stream and
|
||||
* loop until we got the response or encounter errors.
|
||||
*/
|
||||
@@ -327,67 +285,103 @@ setup_backend:
|
||||
30, h2_log2(ctx->req_buffer_size),
|
||||
request_done);
|
||||
if (!session) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->p_conn->connection,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner,
|
||||
"session unavailable");
|
||||
return HTTP_SERVICE_UNAVAILABLE;
|
||||
}
|
||||
|
||||
run_session:
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner,
|
||||
"eng(%s): run session %s", ctx->engine_id, session->id);
|
||||
session->user_data = ctx;
|
||||
status = h2_proxy_session_process(session);
|
||||
while (APR_STATUS_IS_EAGAIN(status)) {
|
||||
status = next_request(ctx, session, r, 0, &r);
|
||||
if (status == APR_SUCCESS) {
|
||||
add_request(session, r);
|
||||
r = NULL;
|
||||
}
|
||||
else if (!APR_STATUS_IS_EAGAIN(status)) {
|
||||
break;
|
||||
|
||||
while (1) {
|
||||
if (ctx->next) {
|
||||
add_request(session, ctx->next);
|
||||
ctx->next = NULL;
|
||||
}
|
||||
|
||||
status = h2_proxy_session_process(session);
|
||||
}
|
||||
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, ctx->owner,
|
||||
"eng(%s): end of session run", ctx->engine_id);
|
||||
if (session->state == H2_PROXYS_ST_DONE || status != APR_SUCCESS) {
|
||||
ctx->p_conn->close = 1;
|
||||
}
|
||||
|
||||
if (status == APR_SUCCESS) {
|
||||
status = next_request(ctx, session, r, 1, &r);
|
||||
}
|
||||
if (status == APR_SUCCESS) {
|
||||
if (ctx->p_conn->close) {
|
||||
/* the connection is/willbe closed, the session is terminated.
|
||||
if (status == APR_SUCCESS) {
|
||||
apr_status_t s2;
|
||||
/* ongoing processing, call again */
|
||||
ctx->capacity = H2MAX(100, session->remote_max_concurrent);
|
||||
s2 = next_request(ctx, 0);
|
||||
if (s2 == APR_ECONNABORTED) {
|
||||
/* master connection gone */
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, s2, ctx->owner,
|
||||
"eng(%s): pull request", ctx->engine_id);
|
||||
status = s2;
|
||||
break;
|
||||
}
|
||||
if (!ctx->next && h2_ihash_is_empty(session->streams)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
else {
|
||||
/* end of processing, maybe error */
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner,
|
||||
"eng(%s): end of session run", ctx->engine_id);
|
||||
/*
|
||||
* Any open stream of that session needs to
|
||||
* a) be reopened on the new session iff safe to do so
|
||||
* b) reported as done (failed) otherwise
|
||||
*/
|
||||
h2_proxy_session_cleanup(session, request_done);
|
||||
goto setup_backend;
|
||||
break;
|
||||
}
|
||||
add_request(session, r);
|
||||
r = NULL;
|
||||
goto run_session;
|
||||
}
|
||||
|
||||
if (session->streams && !h2_ihash_is_empty(session->streams)) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_WARNING, status,
|
||||
ctx->p_conn->connection,
|
||||
"session run done with %d streams unfinished",
|
||||
(int)h2_ihash_count(session->streams));
|
||||
}
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status,
|
||||
ctx->p_conn->connection, "eng(%s): session run done",
|
||||
ctx->engine_id);
|
||||
|
||||
session->user_data = NULL;
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
static apr_status_t setup_engine(h2_proxy_ctx *ctx)
|
||||
{
|
||||
conn_rec *c = ctx->owner;
|
||||
const char *engine_type, *hostname;
|
||||
|
||||
hostname = (ctx->p_conn->ssl_hostname?
|
||||
ctx->p_conn->ssl_hostname : ctx->p_conn->hostname);
|
||||
engine_type = apr_psprintf(c->pool, "proxy_http2 %s%s", hostname,
|
||||
ctx->server_portstr);
|
||||
|
||||
if (c->master && req_engine_push && ctx->next && is_h2 && is_h2(c)) {
|
||||
/* If we are have req_engine capabilities, push the handling of this
|
||||
* request (e.g. slave connection) to a proxy_http2 engine which
|
||||
* uses the same backend. We may be called to create an engine
|
||||
* ourself. */
|
||||
if (req_engine_push(engine_type, ctx->next, proxy_engine_init)
|
||||
== APR_SUCCESS && ctx->engine == NULL) {
|
||||
/* Another engine instance has taken over processing of this
|
||||
* request. */
|
||||
ctx->r_status = APR_SUCCESS;
|
||||
ctx->next = NULL;
|
||||
|
||||
return APR_EOF;
|
||||
}
|
||||
}
|
||||
|
||||
if (!ctx->engine) {
|
||||
/* No engine was available or has been initialized, handle this
|
||||
* request just by ourself. */
|
||||
ctx->engine_id = apr_psprintf(c->pool, "eng-proxy-%ld", c->id);
|
||||
ctx->engine_type = engine_type;
|
||||
ctx->engine_pool = c->pool;
|
||||
ctx->req_buffer_size = (32*1024);
|
||||
ctx->standalone = 1;
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
|
||||
"h2_proxy_http2(%ld): setup standalone engine for type %s",
|
||||
c->id, engine_type);
|
||||
}
|
||||
else {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
|
||||
"H2: hosting engine %s", ctx->engine_id);
|
||||
}
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
|
||||
static int proxy_http2_handler(request_rec *r,
|
||||
proxy_worker *worker,
|
||||
proxy_server_conf *conf,
|
||||
@@ -405,7 +399,6 @@ static int proxy_http2_handler(request_rec *r,
|
||||
apr_pool_t *p = c->pool;
|
||||
apr_uri_t *uri = apr_palloc(p, sizeof(*uri));
|
||||
h2_proxy_ctx *ctx;
|
||||
const char *engine_type, *hostname;
|
||||
|
||||
/* find the scheme */
|
||||
if ((url[0] != 'h' && url[0] != 'H') || url[1] != '2') {
|
||||
@@ -441,13 +434,16 @@ static int proxy_http2_handler(request_rec *r,
|
||||
ctx->conf = conf;
|
||||
ctx->flushall = apr_table_get(r->subprocess_env, "proxy-flushall")? 1 : 0;
|
||||
ctx->r_status = HTTP_SERVICE_UNAVAILABLE;
|
||||
|
||||
ctx->next = r;
|
||||
r = NULL;
|
||||
ap_set_module_config(c->conn_config, &proxy_http2_module, ctx);
|
||||
apr_table_setn(r->notes, H2_PROXY_REQ_URL_NOTE, url);
|
||||
|
||||
/* scheme says, this is for us. */
|
||||
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, "H2: serving URL %s", url);
|
||||
apr_table_setn(ctx->rbase->notes, H2_PROXY_REQ_URL_NOTE, url);
|
||||
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->rbase,
|
||||
"H2: serving URL %s", url);
|
||||
|
||||
run_connect:
|
||||
/* Get a proxy_conn_rec from the worker, might be a new one, might
|
||||
* be one still open from another request, or it might fail if the
|
||||
* worker is stopped or in error. */
|
||||
@@ -460,79 +456,96 @@ static int proxy_http2_handler(request_rec *r,
|
||||
if (ctx->is_ssl) {
|
||||
/* If there is still some data on an existing ssl connection, now
|
||||
* would be a good timne to get rid of it. */
|
||||
ap_proxy_ssl_connection_cleanup(ctx->p_conn, r);
|
||||
ap_proxy_ssl_connection_cleanup(ctx->p_conn, ctx->rbase);
|
||||
}
|
||||
|
||||
/* Step One: Determine the URL to connect to (might be a proxy),
|
||||
* initialize the backend accordingly and determine the server
|
||||
* port string we can expect in responses. */
|
||||
if ((status = ap_proxy_determine_connection(p, r, conf, worker, ctx->p_conn,
|
||||
uri, &locurl, proxyname,
|
||||
proxyport, ctx->server_portstr,
|
||||
if ((status = ap_proxy_determine_connection(p, ctx->rbase, conf, worker,
|
||||
ctx->p_conn, uri, &locurl,
|
||||
proxyname, proxyport,
|
||||
ctx->server_portstr,
|
||||
sizeof(ctx->server_portstr))) != OK) {
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
hostname = (ctx->p_conn->ssl_hostname?
|
||||
ctx->p_conn->ssl_hostname : ctx->p_conn->hostname);
|
||||
engine_type = apr_psprintf(p, "proxy_http2 %s%s", hostname, ctx->server_portstr);
|
||||
if (!ctx->engine && setup_engine(ctx) != APR_SUCCESS) {
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (c->master && req_engine_push && is_h2 && is_h2(ctx->owner)) {
|
||||
/* If we are have req_engine capabilities, push the handling of this
|
||||
* request (e.g. slave connection) to a proxy_http2 engine which uses
|
||||
* the same backend. We may be called to create an engine ourself.
|
||||
*/
|
||||
status = req_engine_push(engine_type, r, proxy_engine_init);
|
||||
if (status == APR_SUCCESS && ctx->engine == NULL) {
|
||||
/* Another engine instance has taken over processing of this
|
||||
* request. */
|
||||
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
|
||||
"H2: pushed request %s to engine type %s",
|
||||
url, engine_type);
|
||||
ctx->r_status = APR_SUCCESS;
|
||||
/* Step Two: Make the Connection (or check that an already existing
|
||||
* socket is still usable). On success, we have a socket connected to
|
||||
* backend->hostname. */
|
||||
if (ap_proxy_connect_backend(ctx->proxy_func, ctx->p_conn, ctx->worker,
|
||||
ctx->server)) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, ctx->owner, APLOGNO(03352)
|
||||
"H2: failed to make connection to backend: %s",
|
||||
ctx->p_conn->hostname);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* Step Three: Create conn_rec for the socket we have open now. */
|
||||
if (!ctx->p_conn->connection) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner, APLOGNO(03353)
|
||||
"setup new connection: is_ssl=%d %s %s %s",
|
||||
ctx->p_conn->is_ssl, ctx->p_conn->ssl_hostname,
|
||||
locurl, ctx->p_conn->hostname);
|
||||
if ((status = ap_proxy_connection_create(ctx->proxy_func, ctx->p_conn,
|
||||
ctx->owner,
|
||||
ctx->server)) != OK) {
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/*
|
||||
* On SSL connections set a note on the connection what CN is
|
||||
* requested, such that mod_ssl can check if it is requested to do
|
||||
* so.
|
||||
*/
|
||||
if (ctx->p_conn->ssl_hostname) {
|
||||
apr_table_setn(ctx->p_conn->connection->notes,
|
||||
"proxy-request-hostname", ctx->p_conn->ssl_hostname);
|
||||
}
|
||||
|
||||
if (ctx->is_ssl) {
|
||||
apr_table_setn(ctx->p_conn->connection->notes,
|
||||
"proxy-request-alpn-protos", "h2");
|
||||
}
|
||||
}
|
||||
|
||||
if (!ctx->engine) {
|
||||
/* No engine was available or has been initialized, handle this
|
||||
* request just by ourself. */
|
||||
ctx->engine_id = apr_psprintf(p, "eng-proxy-%ld", c->id);
|
||||
ctx->engine_type = engine_type;
|
||||
ctx->engine_pool = p;
|
||||
ctx->req_buffer_size = (32*1024);
|
||||
ctx->standalone = 1;
|
||||
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
|
||||
"h2_proxy_http2(%ld): setup standalone engine for type %s",
|
||||
c->id, engine_type);
|
||||
}
|
||||
else {
|
||||
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
|
||||
"H2: hosting engine %s for request %s", ctx->engine_id, url);
|
||||
}
|
||||
|
||||
status = proxy_engine_run(ctx, r);
|
||||
|
||||
cleanup:
|
||||
if (ctx->engine && req_engine_exit) {
|
||||
req_engine_exit(ctx->engine);
|
||||
run_session:
|
||||
status = proxy_engine_run(ctx);
|
||||
if (status == APR_SUCCESS) {
|
||||
/* session and connection still ok */
|
||||
if (next_request(ctx, 1) == APR_SUCCESS) {
|
||||
/* more requests, run again */
|
||||
goto run_session;
|
||||
}
|
||||
/* done */
|
||||
ctx->engine = NULL;
|
||||
}
|
||||
|
||||
if (ctx) {
|
||||
if (ctx->p_conn) {
|
||||
if (status != APR_SUCCESS) {
|
||||
ctx->p_conn->close = 1;
|
||||
}
|
||||
proxy_run_detach_backend(r, ctx->p_conn);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "cleanup, releasing connection");
|
||||
ap_proxy_release_connection(ctx->proxy_func, ctx->p_conn, ctx->server);
|
||||
cleanup:
|
||||
if (ctx->engine && next_request(ctx, 1) == APR_SUCCESS) {
|
||||
/* Still more to do, tear down old conn and start over */
|
||||
ctx->p_conn->close = 1;
|
||||
proxy_run_detach_backend(r, ctx->p_conn);
|
||||
ap_proxy_release_connection(ctx->proxy_func, ctx->p_conn, ctx->server);
|
||||
ctx->p_conn = NULL;
|
||||
goto run_connect;
|
||||
}
|
||||
|
||||
if (ctx->p_conn) {
|
||||
if (status != APR_SUCCESS) {
|
||||
/* close socket when errors happened or session shut down (EOF) */
|
||||
ctx->p_conn->close = 1;
|
||||
}
|
||||
ctx->worker = NULL;
|
||||
ctx->conf = NULL;
|
||||
proxy_run_detach_backend(ctx->rbase, ctx->p_conn);
|
||||
ap_proxy_release_connection(ctx->proxy_func, ctx->p_conn, ctx->server);
|
||||
ctx->p_conn = NULL;
|
||||
}
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, "leaving handler");
|
||||
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, "leaving handler");
|
||||
return ctx->r_status;
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user