
  *) mod_http2: new configuration directive: 'H2Padding numbits' to control
     padding of HTTP/2 payload frames. 'numbits' is a number from 0-8,
     controlling the range of padding bytes added to a frame. The actual number
     added is chosen randomly per frame. This applies to HEADERS, DATA and PUSH_PROMISE
     frames equally. The default continues to be 0, i.e. no padding. [Stefan Eissing]
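
     As illustration only, a minimal sketch of how the new directive might be set
     in a server configuration. The host name, port and Protocols line are invented;
     only the H2Padding directive itself comes from this change.

         <VirtualHost *:443>
             ServerName www.example.org
             Protocols h2 http/1.1
             # numbits=8: each frame gets 0-255 ((2^8)-1) random padding bytes,
             # trading a little bandwidth for less predictable payload sizes
             H2Padding 8
         </VirtualHost>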
  
  *) mod_http2: ripped out all the h2_req_engine internal features now that mod_proxy_http2
     no longer needs them. The optional functions are still declared but no longer
     implemented. While a previous mod_proxy_http2 will still work against this, it is
     recommended to run matching versions of both modules. [Stefan Eissing]
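
     A rough sketch of what "declared but no longer implemented" means for a caller:
     an older module that retrieves one of these optional functions now simply gets
     NULL back and must fall back. The function name comes from mod_http2.h; the
     surrounding hook and the fallback comment are illustrative only.

         #include "httpd.h"
         #include "http_config.h"
         #include "apr_optional.h"
         #include "mod_http2.h"  /* still declares the deprecated optional functions */

         static APR_OPTIONAL_FN_TYPE(http2_req_engine_push) *req_engine_push;

         static int my_post_config(apr_pool_t *p, apr_pool_t *plog,
                                   apr_pool_t *ptemp, server_rec *s)
         {
             /* The declaration still compiles, but mod_http2 no longer registers
              * an implementation, so this lookup now returns NULL. */
             req_engine_push = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_push);
             if (!req_engine_push) {
                 /* handle each request on its own, without a request engine */
             }
             return OK;
         }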
  
  *) mod_proxy_http2: changed the mod_proxy_http2 implementation and fixed several bugs,
     which resolve PR 63170. The proxy module now makes a single h2 request on the
     (reused) connection and returns. [Stefan Eissing]
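
     For context, a hedged example of the kind of setup this code path serves: a
     reverse proxy forwarding to a backend over HTTP/2. The backend name and path
     are invented; the h2:// URL scheme is what routes a request to mod_proxy_http2.

         LoadModule proxy_module       modules/mod_proxy.so
         LoadModule proxy_http2_module modules/mod_proxy_http2.so

         SSLProxyEngine on
         # Each proxied request is handled as one h2 request on a
         # (possibly reused) backend connection.
         ProxyPass "/app/" "h2://backend.example.com/app/"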



git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1854963 13f79535-47bb-0310-9956-ffa450edef68
Stefan Eissing
2019-03-07 09:41:15 +00:00
parent 10d302435c
commit 9b3dd74c58
22 changed files with 200 additions and 1182 deletions

CHANGES
View File

@@ -1,6 +1,21 @@
-*- coding: utf-8 -*-
Changes with Apache 2.5.1
*) mod_http2: new configuration directive: 'H2Padding numbits' to control
padding of HTTP/2 payload frames. 'numbits' is a number from 0-8,
controlling the range of padding bytes added to a frame. The actual number
added is chosen randomly per frame. This applies to HEADERS, DATA and PUSH_PROMISE
frames equally. The default continues to be 0, i.e. no padding. [Stefan Eissing]
*) mod_http2: ripping out all the h2_req_engine internal features now that mod_proxy_http2
has no more need for it. Optional functions are still declared but no longer implemented.
While previous mod_proxy_http2 will work with this, it is recommended to run the matching
versions of both modules. [Stefan Eissing]
*) mod_proxy_http2: changed mod_proxy_http2 implementation and fixed several bugs which
resolve PR63170. The proxy module does now a single h2 request on the (reused)
connection and returns. [Stefan Eissing]
*) mod_http2/mod_proxy_http2: proxy_http2 checks correct master connection aborted status
to trigger immediate shutdown of backend connections. This is now always signalled
by mod_http2 when the session is being released.

View File

@@ -31,7 +31,6 @@ h2_from_h1.lo dnl
h2_h2.lo dnl
h2_headers.lo dnl
h2_mplx.lo dnl
h2_ngn_shed.lo dnl
h2_push.lo dnl
h2_request.lo dnl
h2_session.lo dnl

View File

@@ -48,12 +48,12 @@ extern const char *H2_MAGIC_TOKEN;
#define H2_HEADER_PATH_LEN 5
#define H2_CRLF "\r\n"
/* Max data size to write so it fits inside a TLS record */
#define H2_DATA_CHUNK_SIZE ((16*1024) - 100 - 9)
/* Size of the frame header itself in HTTP/2 */
#define H2_FRAME_HDR_LEN 9
/* Max data size to write so it fits inside a TLS record */
#define H2_DATA_CHUNK_SIZE ((16*1024) - 100 - H2_FRAME_HDR_LEN)
/* Maximum number of padding bytes in a frame, rfc7540 */
#define H2_MAX_PADLEN 256
/* Initial default window size, RFC 7540 ch. 6.5.2 */

View File

@@ -76,6 +76,8 @@ typedef struct h2_config {
int copy_files; /* if files shall be copied vs setaside on output */
apr_array_header_t *push_list;/* list of h2_push_res configurations */
int early_hints; /* support status code 103 */
int padding_bits;
int padding_always;
} h2_config;
typedef struct h2_dir_config {
@@ -111,6 +113,8 @@ static h2_config defconf = {
0, /* copy files across threads */
NULL, /* push list */
0, /* early hints, http status 103 */
0, /* padding bits */
1, /* padding always */
};
static h2_dir_config defdconf = {
@@ -153,6 +157,8 @@ void *h2_config_create_svr(apr_pool_t *pool, server_rec *s)
conf->copy_files = DEF_VAL;
conf->push_list = NULL;
conf->early_hints = DEF_VAL;
conf->padding_bits = DEF_VAL;
conf->padding_always = DEF_VAL;
return conf;
}
@@ -194,6 +200,8 @@ static void *h2_config_merge(apr_pool_t *pool, void *basev, void *addv)
n->push_list = add->push_list? add->push_list : base->push_list;
}
n->early_hints = H2_CONFIG_GET(add, base, early_hints);
n->padding_bits = H2_CONFIG_GET(add, base, padding_bits);
n->padding_always = H2_CONFIG_GET(add, base, padding_always);
return n;
}
@@ -275,6 +283,10 @@ static apr_int64_t h2_srv_config_geti64(const h2_config *conf, h2_config_var_t v
return H2_CONFIG_GET(conf, &defconf, copy_files);
case H2_CONF_EARLY_HINTS:
return H2_CONFIG_GET(conf, &defconf, early_hints);
case H2_CONF_PADDING_BITS:
return H2_CONFIG_GET(conf, &defconf, padding_bits);
case H2_CONF_PADDING_ALWAYS:
return H2_CONFIG_GET(conf, &defconf, padding_always);
default:
return DEF_VAL;
}
@@ -334,6 +346,12 @@ static void h2_srv_config_seti(h2_config *conf, h2_config_var_t var, int val)
case H2_CONF_EARLY_HINTS:
H2_CONFIG_SET(conf, early_hints, val);
break;
case H2_CONF_PADDING_BITS:
H2_CONFIG_SET(conf, padding_bits, val);
break;
case H2_CONF_PADDING_ALWAYS:
H2_CONFIG_SET(conf, padding_always, val);
break;
default:
break;
}
@@ -873,6 +891,22 @@ static const char *h2_conf_set_early_hints(cmd_parms *cmd,
return NULL;
}
static const char *h2_conf_set_padding(cmd_parms *cmd, void *dirconf, const char *value)
{
int val;
val = (int)apr_atoi64(value);
if (val < 0) {
return "number of bits must be >= 0";
}
if (val > 8) {
return "number of bits must be <= 8";
}
CONFIG_CMD_SET(cmd, dirconf, H2_CONF_PADDING_BITS, val);
return NULL;
}
void h2_get_num_workers(server_rec *s, int *minw, int *maxw)
{
int threads_per_child = 0;
@@ -941,6 +975,8 @@ const command_rec h2_cmds[] = {
OR_FILEINFO|OR_AUTHCFG, "add a resource to be pushed in this location/on this server."),
AP_INIT_TAKE1("H2EarlyHints", h2_conf_set_early_hints, NULL,
RSRC_CONF, "on to enable interim status 103 responses"),
AP_INIT_TAKE1("H2Padding", h2_conf_set_padding, NULL,
RSRC_CONF, "set payload padding"),
AP_END_CMD
};

View File

@@ -42,6 +42,8 @@ typedef enum {
H2_CONF_PUSH_DIARY_SIZE,
H2_CONF_COPY_FILES,
H2_CONF_EARLY_HINTS,
H2_CONF_PADDING_BITS,
H2_CONF_PADDING_ALWAYS,
} h2_config_var_t;
struct apr_hash_t;

View File

@@ -40,12 +40,17 @@
 * ~= 1300 bytes */
#define WRITE_SIZE_INITIAL 1300
-/* Calculated like this: max TLS record size 16*1024
- * - 40 (IP) - 20 (TCP) - 40 (TCP options)
- * - TLS overhead (60-100)
- * which seems to create less TCP packets overall
+/* The maximum we'd like to write in one chunk is
+ * the max size of a TLS record. When pushing
+ * many frames down the h2 connection, this might
+ * align differently because of headers and other
+ * frames or simply as not sufficient data is
+ * in a response body.
+ * However keeping frames at or below this limit
+ * should make optimizations at the layer that writes
+ * to TLS easier.
  */
-#define WRITE_SIZE_MAX (TLS_DATA_MAX - 100)
+#define WRITE_SIZE_MAX (TLS_DATA_MAX)
#define BUF_REMAIN ((apr_size_t)(bmax-off))

View File

@@ -40,7 +40,6 @@
#include "h2_ctx.h" #include "h2_ctx.h"
#include "h2_h2.h" #include "h2_h2.h"
#include "h2_mplx.h" #include "h2_mplx.h"
#include "h2_ngn_shed.h"
#include "h2_request.h" #include "h2_request.h"
#include "h2_stream.h" #include "h2_stream.h"
#include "h2_session.h" #include "h2_session.h"
@@ -83,12 +82,6 @@ static void check_data_for(h2_mplx *m, h2_stream *stream, int lock);
static void stream_output_consumed(void *ctx,
h2_bucket_beam *beam, apr_off_t length)
{
h2_stream *stream = ctx;
h2_task *task = stream->task;
if (length > 0 && task && task->assigned) {
h2_req_engine_out_consumed(task->assigned, task->c, length);
}
}
static void stream_input_ev(void *ctx, h2_bucket_beam *beam)
@@ -136,7 +129,6 @@ static void stream_cleanup(h2_mplx *m, h2_stream *stream)
}
else if (stream->task) {
stream->task->c->aborted = 1;
apr_thread_cond_broadcast(m->task_thawed);
}
}
@@ -198,12 +190,6 @@ h2_mplx *h2_mplx_create(conn_rec *c, server_rec *s, apr_pool_t *parent,
return NULL;
}
status = apr_thread_cond_create(&m->task_thawed, m->pool);
if (status != APR_SUCCESS) {
apr_pool_destroy(m->pool);
return NULL;
}
m->max_streams = h2_config_sgeti(s, H2_CONF_MAX_STREAMS);
m->stream_max_mem = h2_config_sgeti(s, H2_CONF_STREAM_MAX_MEM);
@@ -226,10 +212,6 @@ h2_mplx *h2_mplx_create(conn_rec *c, server_rec *s, apr_pool_t *parent,
m->limit_change_interval = apr_time_from_msec(100);
m->spare_slaves = apr_array_make(m->pool, 10, sizeof(conn_rec*));
m->ngn_shed = h2_ngn_shed_create(m->pool, m->c, m->max_streams,
m->stream_max_mem);
h2_ngn_shed_set_ctx(m->ngn_shed , m);
}
return m;
}
@@ -387,10 +369,10 @@ static int report_stream_iter(void *ctx, void *val) {
if (task) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
H2_STRM_MSG(stream, "->03198: %s %s %s"
-"[started=%d/done=%d/frozen=%d]"),
+"[started=%d/done=%d]"),
task->request->method, task->request->authority,
task->request->path, task->worker_started,
-task->worker_done, task->frozen);
+task->worker_done);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
@@ -452,9 +434,7 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
/* until empty */
}
-/* 2. terminate ngn_shed, no more streams
- * should be scheduled or in the active set */
-h2_ngn_shed_abort(m->ngn_shed);
+/* 2. no more streams should be scheduled or in the active set */
ap_assert(h2_ihash_empty(m->streams));
ap_assert(h2_iq_empty(m->q));
@@ -478,10 +458,6 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
ap_assert(m->tasks_active == 0);
m->join_wait = NULL;
/* 4. close the h2_req_enginge shed */
h2_ngn_shed_destroy(m->ngn_shed);
m->ngn_shed = NULL;
/* 4. With all workers done, all streams should be in spurge */
if (!h2_ihash_empty(m->shold)) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03516)
@@ -787,49 +763,14 @@ apr_status_t h2_mplx_pop_task(h2_mplx *m, h2_task **ptask)
return rv;
}
-static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
+static void task_done(h2_mplx *m, h2_task *task)
{
h2_stream *stream;
if (task->frozen) {
/* this task was handed over to an engine for processing
* and the original worker has finished. That means the
* engine may start processing now. */
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): task(%s) done (frozen)", m->id, task->id);
h2_task_thaw(task);
apr_thread_cond_broadcast(m->task_thawed);
return;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): task(%s) done", m->id, task->id);
out_close(m, task);
if (ngn) {
apr_off_t bytes = 0;
h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
bytes += h2_beam_get_buffered(task->output.beam);
if (bytes > 0) {
/* we need to report consumed and current buffered output
* to the engine. The request will be streamed out or cancelled,
* no more data is coming from it and the engine should update
* its calculations before we destroy this information. */
h2_req_engine_out_consumed(ngn, task->c, bytes);
}
}
if (task->engine) {
if (!m->aborted && !task->c->aborted
&& !h2_req_engine_is_shutdown(task->engine)) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(10022)
"h2_mplx(%ld): task(%s) has not-shutdown "
"engine(%s)", m->id, task->id,
h2_req_engine_get_id(task->engine));
}
h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
}
task->worker_done = 1;
task->done_at = apr_time_now();
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
@@ -906,7 +847,7 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
{
H2_MPLX_ENTER_ALWAYS(m);
-task_done(m, task, NULL);
+task_done(m, task);
--m->tasks_active;
if (m->join_wait) {
@@ -1099,143 +1040,6 @@ apr_status_t h2_mplx_idle(h2_mplx *m)
return status;
}
/*******************************************************************************
* HTTP/2 request engines
******************************************************************************/
typedef struct {
h2_mplx * m;
h2_req_engine *ngn;
int streams_updated;
} ngn_update_ctx;
static int ngn_update_window(void *ctx, void *val)
{
ngn_update_ctx *uctx = ctx;
h2_stream *stream = val;
if (stream->task && stream->task->assigned == uctx->ngn
&& output_consumed_signal(uctx->m, stream->task)) {
++uctx->streams_updated;
}
return 1;
}
static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn)
{
ngn_update_ctx ctx;
ctx.m = m;
ctx.ngn = ngn;
ctx.streams_updated = 0;
h2_ihash_iter(m->streams, ngn_update_window, &ctx);
return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
}
apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
request_rec *r,
http2_req_engine_init *einit)
{
apr_status_t status;
h2_mplx *m;
h2_task *task;
h2_stream *stream;
task = h2_ctx_get_task(r->connection);
if (!task) {
return APR_ECONNABORTED;
}
m = task->mplx;
H2_MPLX_ENTER(m);
stream = h2_ihash_get(m->streams, task->stream_id);
if (stream) {
status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit);
}
else {
status = APR_ECONNABORTED;
}
H2_MPLX_LEAVE(m);
return status;
}
apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
apr_read_type_e block,
int capacity,
request_rec **pr)
{
h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
h2_mplx *m = h2_ngn_shed_get_ctx(shed);
apr_status_t status;
int want_shutdown;
H2_MPLX_ENTER(m);
want_shutdown = (block == APR_BLOCK_READ);
/* Take this opportunity to update output consummation
* for this engine */
ngn_out_update_windows(m, ngn);
if (want_shutdown && !h2_iq_empty(m->q)) {
/* For a blocking read, check first if requests are to be
* had and, if not, wait a short while before doing the
* blocking, and if unsuccessful, terminating read.
*/
status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
if (APR_STATUS_IS_EAGAIN(status)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): start block engine pull", m->id);
apr_thread_cond_timedwait(m->task_thawed, m->lock,
apr_time_from_msec(20));
status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr);
}
}
else {
status = h2_ngn_shed_pull_request(shed, ngn, capacity,
want_shutdown, pr);
}
H2_MPLX_LEAVE(m);
return status;
}
void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn,
apr_status_t status)
{
h2_task *task = h2_ctx_get_task(r_conn);
if (task) {
h2_mplx *m = task->mplx;
h2_stream *stream;
int task_hosting_engine = (task->engine != NULL);
H2_MPLX_ENTER_ALWAYS(m);
stream = h2_ihash_get(m->streams, task->stream_id);
ngn_out_update_windows(m, ngn);
h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
if (status != APR_SUCCESS && stream
&& h2_task_can_redo(task)
&& !h2_ihash_get(m->sredo, stream->id)) {
ap_log_cerror(APLOG_MARK, APLOG_INFO, status, m->c,
"h2_mplx(%ld): task %s added to redo", m->id, task->id);
h2_ihash_add(m->sredo, stream);
}
/* cannot report that until hosted engine returns */
if (!task_hosting_engine) {
task_done(m, task, ngn);
}
H2_MPLX_LEAVE(m);
}
}
/*******************************************************************************
 * mplx master events dispatching
 ******************************************************************************/

View File

@@ -47,8 +47,6 @@ struct h2_request;
struct apr_thread_cond_t;
struct h2_workers;
struct h2_iqueue;
struct h2_ngn_shed;
struct h2_req_engine;
#include <apr_queue.h>
@@ -86,7 +84,6 @@ struct h2_mplx {
apr_thread_mutex_t *lock;
struct apr_thread_cond_t *added_output;
struct apr_thread_cond_t *task_thawed;
struct apr_thread_cond_t *join_wait;
apr_size_t stream_max_mem;
@@ -95,8 +92,6 @@ struct h2_mplx {
apr_array_header_t *spare_slaves; /* spare slave connections */
struct h2_workers *workers;
struct h2_ngn_shed *ngn_shed;
};
@@ -302,28 +297,4 @@ APR_RING_INSERT_TAIL((b), ap__b, h2_mplx, link); \
 */
apr_status_t h2_mplx_idle(h2_mplx *m);
/*******************************************************************************
* h2_req_engine handling
******************************************************************************/
typedef void h2_output_consumed(void *ctx, conn_rec *c, apr_off_t consumed);
typedef apr_status_t h2_mplx_req_engine_init(struct h2_req_engine *engine,
const char *id,
const char *type,
apr_pool_t *pool,
apr_size_t req_buffer_size,
request_rec *r,
h2_output_consumed **pconsumed,
void **pbaton);
apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
request_rec *r,
h2_mplx_req_engine_init *einit);
apr_status_t h2_mplx_req_engine_pull(struct h2_req_engine *ngn,
apr_read_type_e block,
int capacity,
request_rec **pr);
void h2_mplx_req_engine_done(struct h2_req_engine *ngn, conn_rec *r_conn,
apr_status_t status);
#endif /* defined(__mod_h2__h2_mplx__) */

View File

@@ -1,392 +0,0 @@
/* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>
#include <apr_strings.h>
#include <apr_time.h>
#include <httpd.h>
#include <http_core.h>
#include <http_log.h>
#include "mod_http2.h"
#include "h2_private.h"
#include "h2.h"
#include "h2_config.h"
#include "h2_conn.h"
#include "h2_ctx.h"
#include "h2_h2.h"
#include "h2_mplx.h"
#include "h2_request.h"
#include "h2_task.h"
#include "h2_util.h"
#include "h2_ngn_shed.h"
typedef struct h2_ngn_entry h2_ngn_entry;
struct h2_ngn_entry {
APR_RING_ENTRY(h2_ngn_entry) link;
h2_task *task;
request_rec *r;
};
#define H2_NGN_ENTRY_NEXT(e) APR_RING_NEXT((e), link)
#define H2_NGN_ENTRY_PREV(e) APR_RING_PREV((e), link)
#define H2_NGN_ENTRY_REMOVE(e) APR_RING_REMOVE((e), link)
#define H2_REQ_ENTRIES_SENTINEL(b) APR_RING_SENTINEL((b), h2_ngn_entry, link)
#define H2_REQ_ENTRIES_EMPTY(b) APR_RING_EMPTY((b), h2_ngn_entry, link)
#define H2_REQ_ENTRIES_FIRST(b) APR_RING_FIRST(b)
#define H2_REQ_ENTRIES_LAST(b) APR_RING_LAST(b)
#define H2_REQ_ENTRIES_INSERT_HEAD(b, e) do { \
h2_ngn_entry *ap__b = (e); \
APR_RING_INSERT_HEAD((b), ap__b, h2_ngn_entry, link); \
} while (0)
#define H2_REQ_ENTRIES_INSERT_TAIL(b, e) do { \
h2_ngn_entry *ap__b = (e); \
APR_RING_INSERT_TAIL((b), ap__b, h2_ngn_entry, link); \
} while (0)
struct h2_req_engine {
const char *id; /* identifier */
const char *type; /* name of the engine type */
apr_pool_t *pool; /* pool for engine specific allocations */
conn_rec *c; /* connection this engine is assigned to */
h2_task *task; /* the task this engine is based on, running in */
h2_ngn_shed *shed;
unsigned int shutdown : 1; /* engine is being shut down */
unsigned int done : 1; /* engine has finished */
APR_RING_HEAD(h2_req_entries, h2_ngn_entry) entries;
int capacity; /* maximum concurrent requests */
int no_assigned; /* # of assigned requests */
int no_live; /* # of live */
int no_finished; /* # of finished */
h2_output_consumed *out_consumed;
void *out_consumed_ctx;
};
const char *h2_req_engine_get_id(h2_req_engine *engine)
{
return engine->id;
}
int h2_req_engine_is_shutdown(h2_req_engine *engine)
{
return engine->shutdown;
}
void h2_req_engine_out_consumed(h2_req_engine *engine, conn_rec *c,
apr_off_t bytes)
{
if (engine->out_consumed) {
engine->out_consumed(engine->out_consumed_ctx, c, bytes);
}
}
h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
int default_capacity,
apr_size_t req_buffer_size)
{
h2_ngn_shed *shed;
shed = apr_pcalloc(pool, sizeof(*shed));
shed->c = c;
shed->pool = pool;
shed->default_capacity = default_capacity;
shed->req_buffer_size = req_buffer_size;
shed->ngns = apr_hash_make(pool);
return shed;
}
void h2_ngn_shed_set_ctx(h2_ngn_shed *shed, void *user_ctx)
{
shed->user_ctx = user_ctx;
}
void *h2_ngn_shed_get_ctx(h2_ngn_shed *shed)
{
return shed->user_ctx;
}
h2_ngn_shed *h2_ngn_shed_get_shed(h2_req_engine *ngn)
{
return ngn->shed;
}
void h2_ngn_shed_abort(h2_ngn_shed *shed)
{
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c, APLOGNO(03394)
"h2_ngn_shed(%ld): abort", shed->c->id);
shed->aborted = 1;
}
static void ngn_add_task(h2_req_engine *ngn, h2_task *task, request_rec *r)
{
h2_ngn_entry *entry = apr_pcalloc(task->pool, sizeof(*entry));
APR_RING_ELEM_INIT(entry, link);
entry->task = task;
entry->r = r;
H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry);
ngn->no_assigned++;
}
apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type,
request_rec *r,
http2_req_engine_init *einit)
{
h2_req_engine *ngn;
h2_task *task = h2_ctx_get_task(r->connection);
ap_assert(task);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
"h2_ngn_shed(%ld): PUSHing request (task=%s)", shed->c->id,
task->id);
if (task->request->serialize) {
/* Max compatibility, deny processing of this */
return APR_EOF;
}
if (task->assigned) {
--task->assigned->no_assigned;
--task->assigned->no_live;
task->assigned = NULL;
}
if (task->engine) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
"h2_ngn_shed(%ld): push task(%s) hosting engine %s "
"already with %d tasks",
shed->c->id, task->id, task->engine->id,
task->engine->no_assigned);
task->assigned = task->engine;
ngn_add_task(task->engine, task, r);
return APR_SUCCESS;
}
ngn = apr_hash_get(shed->ngns, ngn_type, APR_HASH_KEY_STRING);
if (ngn && !ngn->shutdown) {
/* this task will be processed in another thread,
* freeze any I/O for the time being. */
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
"h2_ngn_shed(%ld): pushing request %s to %s",
shed->c->id, task->id, ngn->id);
if (!h2_task_has_thawed(task)) {
h2_task_freeze(task);
}
ngn_add_task(ngn, task, r);
return APR_SUCCESS;
}
/* no existing engine or being shut down, start a new one */
if (einit) {
apr_status_t status;
apr_pool_t *pool = task->pool;
h2_req_engine *newngn;
newngn = apr_pcalloc(pool, sizeof(*ngn));
newngn->pool = pool;
newngn->id = apr_psprintf(pool, "ngn-%s", task->id);
newngn->type = apr_pstrdup(pool, ngn_type);
newngn->c = task->c;
newngn->shed = shed;
newngn->capacity = shed->default_capacity;
newngn->no_assigned = 1;
newngn->no_live = 1;
APR_RING_INIT(&newngn->entries, h2_ngn_entry, link);
status = einit(newngn, newngn->id, newngn->type, newngn->pool,
shed->req_buffer_size, r,
&newngn->out_consumed, &newngn->out_consumed_ctx);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, APLOGNO(03395)
"h2_ngn_shed(%ld): create engine %s (%s)",
shed->c->id, newngn->id, newngn->type);
if (status == APR_SUCCESS) {
newngn->task = task;
task->engine = newngn;
task->assigned = newngn;
apr_hash_set(shed->ngns, newngn->type, APR_HASH_KEY_STRING, newngn);
}
return status;
}
return APR_EOF;
}
static h2_ngn_entry *pop_detached(h2_req_engine *ngn)
{
h2_ngn_entry *entry;
for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
entry = H2_NGN_ENTRY_NEXT(entry)) {
if (h2_task_has_thawed(entry->task)
|| (entry->task->engine == ngn)) {
/* The task hosting this engine can always be pulled by it.
* For other task, they need to become detached, e.g. no longer
* assigned to another worker. */
H2_NGN_ENTRY_REMOVE(entry);
return entry;
}
}
return NULL;
}
apr_status_t h2_ngn_shed_pull_request(h2_ngn_shed *shed,
h2_req_engine *ngn,
int capacity,
int want_shutdown,
request_rec **pr)
{
h2_ngn_entry *entry;
ap_assert(ngn);
*pr = NULL;
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, shed->c, APLOGNO(03396)
"h2_ngn_shed(%ld): pull task for engine %s, shutdown=%d",
shed->c->id, ngn->id, want_shutdown);
if (shed->aborted) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c, APLOGNO(03397)
"h2_ngn_shed(%ld): abort while pulling requests %s",
shed->c->id, ngn->id);
ngn->shutdown = 1;
return APR_ECONNABORTED;
}
ngn->capacity = capacity;
if (H2_REQ_ENTRIES_EMPTY(&ngn->entries)) {
if (want_shutdown) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
"h2_ngn_shed(%ld): empty queue, shutdown engine %s",
shed->c->id, ngn->id);
ngn->shutdown = 1;
}
return ngn->shutdown? APR_EOF : APR_EAGAIN;
}
if ((entry = pop_detached(ngn))) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, entry->task->c, APLOGNO(03398)
"h2_ngn_shed(%ld): pulled request %s for engine %s",
shed->c->id, entry->task->id, ngn->id);
ngn->no_live++;
*pr = entry->r;
entry->task->assigned = ngn;
/* task will now run in ngn's own thread. Modules like lua
* seem to require the correct thread set in the conn_rec.
* See PR 59542. */
if (entry->task->c && ngn->c) {
entry->task->c->current_thread = ngn->c->current_thread;
}
if (entry->task->engine == ngn) {
/* If an engine pushes its own base task, and then pulls
* it back to itself again, it needs to be thawed.
*/
h2_task_thaw(entry->task);
}
return APR_SUCCESS;
}
if (1) {
entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c, APLOGNO(03399)
"h2_ngn_shed(%ld): pull task, nothing, first task %s",
shed->c->id, entry->task->id);
}
return APR_EAGAIN;
}
static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn,
h2_task *task, int waslive, int aborted)
{
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c, APLOGNO(03400)
"h2_ngn_shed(%ld): task %s %s by %s",
shed->c->id, task->id, aborted? "aborted":"done", ngn->id);
ngn->no_finished++;
if (waslive) ngn->no_live--;
ngn->no_assigned--;
task->assigned = NULL;
return APR_SUCCESS;
}
apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed,
struct h2_req_engine *ngn, h2_task *task)
{
return ngn_done_task(shed, ngn, task, 1, 0);
}
void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn)
{
if (ngn->done) {
return;
}
if (!shed->aborted && !H2_REQ_ENTRIES_EMPTY(&ngn->entries)) {
h2_ngn_entry *entry;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
"h2_ngn_shed(%ld): exit engine %s (%s), "
"has still requests queued, shutdown=%d,"
"assigned=%ld, live=%ld, finished=%ld",
shed->c->id, ngn->id, ngn->type,
ngn->shutdown,
(long)ngn->no_assigned, (long)ngn->no_live,
(long)ngn->no_finished);
for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries);
entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries);
entry = H2_NGN_ENTRY_NEXT(entry)) {
h2_task *task = entry->task;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
"h2_ngn_shed(%ld): engine %s has queued task %s, "
"frozen=%d, aborting",
shed->c->id, ngn->id, task->id, task->frozen);
ngn_done_task(shed, ngn, task, 0, 1);
task->engine = task->assigned = NULL;
}
}
if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
"h2_ngn_shed(%ld): exit engine %s (%s), "
"assigned=%ld, live=%ld, finished=%ld",
shed->c->id, ngn->id, ngn->type,
(long)ngn->no_assigned, (long)ngn->no_live,
(long)ngn->no_finished);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c,
"h2_ngn_shed(%ld): exit engine %s",
shed->c->id, ngn->id);
}
apr_hash_set(shed->ngns, ngn->type, APR_HASH_KEY_STRING, NULL);
ngn->done = 1;
}
void h2_ngn_shed_destroy(h2_ngn_shed *shed)
{
ap_assert(apr_hash_count(shed->ngns) == 0);
}

View File

@@ -1,79 +0,0 @@
/* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef h2_req_shed_h
#define h2_req_shed_h
struct h2_req_engine;
struct h2_task;
typedef struct h2_ngn_shed h2_ngn_shed;
struct h2_ngn_shed {
conn_rec *c;
apr_pool_t *pool;
apr_hash_t *ngns;
void *user_ctx;
unsigned int aborted : 1;
int default_capacity;
apr_size_t req_buffer_size; /* preferred buffer size for responses */
};
const char *h2_req_engine_get_id(h2_req_engine *engine);
int h2_req_engine_is_shutdown(h2_req_engine *engine);
void h2_req_engine_out_consumed(h2_req_engine *engine, conn_rec *c,
apr_off_t bytes);
typedef apr_status_t h2_shed_ngn_init(h2_req_engine *engine,
const char *id,
const char *type,
apr_pool_t *pool,
apr_size_t req_buffer_size,
request_rec *r,
h2_output_consumed **pconsumed,
void **pbaton);
h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c,
int default_capactiy,
apr_size_t req_buffer_size);
void h2_ngn_shed_destroy(h2_ngn_shed *shed);
void h2_ngn_shed_set_ctx(h2_ngn_shed *shed, void *user_ctx);
void *h2_ngn_shed_get_ctx(h2_ngn_shed *shed);
h2_ngn_shed *h2_ngn_shed_get_shed(struct h2_req_engine *ngn);
void h2_ngn_shed_abort(h2_ngn_shed *shed);
apr_status_t h2_ngn_shed_push_request(h2_ngn_shed *shed, const char *ngn_type,
request_rec *r,
h2_shed_ngn_init *init_cb);
apr_status_t h2_ngn_shed_pull_request(h2_ngn_shed *shed, h2_req_engine *pub_ngn,
int capacity,
int want_shutdown, request_rec **pr);
apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed,
struct h2_req_engine *ngn,
struct h2_task *task);
void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn);
#endif /* h2_req_shed_h */

View File

@@ -429,12 +429,6 @@ static int stream_response_data(nghttp2_session *ngh2, uint8_t flags,
stream_id, NGHTTP2_STREAM_CLOSED);
return NGHTTP2_ERR_STREAM_CLOSING;
}
if (stream->standalone) {
nghttp2_session_consume(ngh2, stream_id, len);
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r,
"h2_proxy_session(%s): stream %d, win_update %d bytes",
session->id, stream_id, (int)len);
}
return 0;
}
@@ -641,7 +635,7 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
nghttp2_option_new(&option);
nghttp2_option_set_peer_max_concurrent_streams(option, 100);
-nghttp2_option_set_no_auto_window_update(option, 1);
+nghttp2_option_set_no_auto_window_update(option, 0);
nghttp2_session_client_new2(&session->ngh2, cbs, session, option);
@@ -1545,42 +1539,3 @@ typedef struct {
int updated;
} win_update_ctx;
static int win_update_iter(void *udata, void *val)
{
win_update_ctx *ctx = udata;
h2_proxy_stream *stream = val;
if (stream->r && stream->r->connection == ctx->c) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, ctx->session->c,
"h2_proxy_session(%s-%d): win_update %ld bytes",
ctx->session->id, (int)stream->id, (long)ctx->bytes);
nghttp2_session_consume(ctx->session->ngh2, stream->id, ctx->bytes);
ctx->updated = 1;
return 0;
}
return 1;
}
void h2_proxy_session_update_window(h2_proxy_session *session,
conn_rec *c, apr_off_t bytes)
{
if (!h2_proxy_ihash_empty(session->streams)) {
win_update_ctx ctx;
ctx.session = session;
ctx.c = c;
ctx.bytes = bytes;
ctx.updated = 0;
h2_proxy_ihash_iter(session->streams, win_update_iter, &ctx);
if (!ctx.updated) {
/* could not find the stream any more, possibly closed, update
* the connection window at least */
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
"h2_proxy_session(%s): win_update conn %ld bytes",
session->id, (long)bytes);
nghttp2_session_consume_connection(session->ngh2, (size_t)bytes);
}
}
}

View File

@@ -120,9 +120,6 @@ void h2_proxy_session_cancel_all(h2_proxy_session *s);
void h2_proxy_session_cleanup(h2_proxy_session *s, h2_proxy_request_done *done);
void h2_proxy_session_update_window(h2_proxy_session *s,
conn_rec *c, apr_off_t bytes);
#define H2_PROXY_REQ_URL_NOTE "h2-proxy-req-url"
#endif /* h2_proxy_session_h */

View File

@@ -282,7 +282,7 @@ request_rec *h2_request_create_rec(const h2_request *req, conn_rec *c)
/* Time to populate r with the data we have. */
r->request_time = req->request_time;
-r->method = apr_pstrdup(r->pool, req->method);
+r->method = req->method;
/* Provide quick information about the request method as soon as known */
r->method_number = ap_method_number_of(r->method);
if (r->method_number == M_GET && r->method[0] == 'H') {

View File

@@ -495,9 +495,7 @@ static int on_send_data_cb(nghttp2_session *ngh2,
return NGHTTP2_ERR_WOULDBLOCK;
}
-if (frame->data.padlen > H2_MAX_PADLEN) {
-return NGHTTP2_ERR_PROTO;
-}
+ap_assert(frame->data.padlen <= (H2_MAX_PADLEN+1));
padlen = (unsigned char)frame->data.padlen;
stream = get_stream(session, stream_id);
@@ -513,8 +511,9 @@
H2_STRM_MSG(stream, "send_data_cb for %ld bytes"),
(long)length);
-status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
+status = h2_conn_io_write(&session->io, (const char *)framehd, H2_FRAME_HDR_LEN);
if (padlen && status == APR_SUCCESS) {
+--padlen;
status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
}
@@ -622,6 +621,39 @@ static int on_invalid_header_cb(nghttp2_session *ngh2,
}
#endif
static ssize_t select_padding_cb(nghttp2_session *ngh2,
const nghttp2_frame *frame,
size_t max_payloadlen, void *user_data)
{
h2_session *session = user_data;
ssize_t frame_len = frame->hd.length + H2_FRAME_HDR_LEN; /* the total length without padding */
ssize_t padded_len = frame_len;
/* Determine # of padding bytes to append to frame. Unless session->padding_always
 * is set, the number may be capped by the ui.write_size that currently applies.
*/
if (session->padding_max) {
int n = ap_random_pick(0, session->padding_max);
padded_len = H2MIN(max_payloadlen + H2_FRAME_HDR_LEN, frame_len + n);
}
if (padded_len != frame_len) {
if (!session->padding_always && session->io.write_size
&& (padded_len > session->io.write_size)
&& (frame_len <= session->io.write_size)) {
padded_len = session->io.write_size;
}
if (APLOGctrace2(session->c)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
"select padding from [%d, %d]: %d (frame length: 0x%04x, write size: %d)",
(int)frame_len, (int)max_payloadlen+H2_FRAME_HDR_LEN,
(int)(padded_len - frame_len), (int)padded_len, (int)session->io.write_size);
}
return padded_len - H2_FRAME_HDR_LEN;
}
return frame->hd.length;
}
#define NGH2_SET_CALLBACK(callbacks, name, fn)\
nghttp2_session_callbacks_set_##name##_callback(callbacks, fn)
@@ -647,6 +679,7 @@ static apr_status_t init_callbacks(conn_rec *c, nghttp2_session_callbacks **pcb)
#ifdef H2_NG2_INVALID_HEADER_CB
NGH2_SET_CALLBACK(*pcb, on_invalid_header, on_invalid_header_cb);
#endif
NGH2_SET_CALLBACK(*pcb, select_padding, select_padding_cb);
return APR_SUCCESS;
}
@@ -862,6 +895,11 @@ apr_status_t h2_session_create(h2_session **psession, conn_rec *c, request_rec *
ap_add_input_filter("H2_IN", session->cin, r, c); ap_add_input_filter("H2_IN", session->cin, r, c);
h2_conn_io_init(&session->io, c, s); h2_conn_io_init(&session->io, c, s);
session->padding_max = h2_config_sgeti(s, H2_CONF_PADDING_BITS);
if (session->padding_max) {
session->padding_max = (0x01 << session->padding_max) - 1;
}
session->padding_always = h2_config_sgeti(s, H2_CONF_PADDING_ALWAYS);
session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc);
status = init_callbacks(c, &callbacks);

View File

@@ -85,6 +85,8 @@ typedef struct h2_session {
struct h2_workers *workers; /* for executing stream tasks */
struct h2_filter_cin *cin; /* connection input filter context */
h2_conn_io io; /* io on httpd conn filters */
int padding_max; /* max number of padding bytes */
int padding_always; /* padding has precedence over I/O optimizations */
struct nghttp2_session *ngh2; /* the nghttp2 session (internal use) */
h2_session_state state; /* state session is in */

View File

@@ -854,7 +854,7 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
 * is requested. But we can reduce the size in case the master
 * connection operates in smaller chunks. (TSL warmup) */
if (stream->session->io.write_size > 0) {
-max_chunk = stream->session->io.write_size - 9; /* header bits */
+max_chunk = stream->session->io.write_size - H2_FRAME_HDR_LEN;
}
requested = (*plen > 0)? H2MIN(*plen, max_chunk) : max_chunk;

View File

@@ -97,7 +97,7 @@ static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb, int block)
apr_brigade_length(bb, 0, &written);
H2_TASK_OUT_LOG(APLOG_TRACE2, task, bb, "h2_task send_out");
h2_beam_log(task->output.beam, task->c, APLOG_TRACE2, "send_out(before)");
/* engines send unblocking */
status = h2_beam_send(task->output.beam, bb,
block? APR_BLOCK_READ : APR_NONBLOCK_READ);
h2_beam_log(task->output.beam, task->c, APLOG_TRACE2, "send_out(after)");
@@ -133,26 +133,9 @@ static apr_status_t slave_out(h2_task *task, ap_filter_t* f,
apr_status_t rv = APR_SUCCESS;
int flush = 0, blocking;
if (task->frozen) {
h2_util_bb_log(task->c, task->stream_id, APLOG_TRACE2,
"frozen task output write, ignored", bb);
while (!APR_BRIGADE_EMPTY(bb)) {
b = APR_BRIGADE_FIRST(bb);
if (AP_BUCKET_IS_EOR(b)) {
APR_BUCKET_REMOVE(b);
task->eor = b;
}
else {
apr_bucket_delete(b);
}
}
return APR_SUCCESS;
}
send:
-/* we send block once we opened the output, so someone is there
- * reading it *and* the task is not assigned to a h2_req_engine */
-blocking = (!task->assigned && task->output.opened);
+/* we send block once we opened the output, so someone is there reading it */
+blocking = task->output.opened;
for (b = APR_BRIGADE_FIRST(bb);
b != APR_BRIGADE_SENTINEL(bb);
b = APR_BUCKET_NEXT(b)) {
@@ -632,18 +615,9 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread, int worker_id)
task->c->current_thread = thread;
ap_run_process_connection(c);
if (task->frozen) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_task(%s): process_conn returned frozen task",
task->id);
/* cleanup delayed */
return APR_EAGAIN;
}
else {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_task(%s): processing done", task->id);
return output_finish(task);
}
}
static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c)
@@ -681,14 +655,8 @@ static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c)
ap_process_request(r);
if (task->frozen) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_task(%s): process_request frozen", task->id);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_task(%s): process_request done", task->id);
}
/* After the call to ap_process_request, the
 * request pool may have been deleted. We set
@@ -740,28 +708,3 @@ static int h2_task_process_conn(conn_rec* c)
return DECLINED;
}
apr_status_t h2_task_freeze(h2_task *task)
{
if (!task->frozen) {
task->frozen = 1;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c, APLOGNO(03406)
"h2_task(%s), frozen", task->id);
}
return APR_SUCCESS;
}
apr_status_t h2_task_thaw(h2_task *task)
{
if (task->frozen) {
task->frozen = 0;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c, APLOGNO(03407)
"h2_task(%s), thawed", task->id);
}
task->thawed = 1;
return APR_SUCCESS;
}
int h2_task_has_thawed(h2_task *task)
{
return task->thawed;
}

View File

@@ -42,7 +42,6 @@ struct h2_bucket_beam;
struct h2_conn;
struct h2_mplx;
struct h2_task;
struct h2_req_engine;
struct h2_request;
struct h2_response_parser;
struct h2_stream;
@@ -80,8 +79,6 @@ struct h2_task {
struct h2_mplx *mplx;
unsigned int filters_set : 1;
unsigned int frozen : 1;
unsigned int thawed : 1;
unsigned int worker_started : 1; /* h2_worker started processing */
int worker_done; /* h2_worker finished */
@@ -90,9 +87,6 @@ struct h2_task {
apr_time_t started_at; /* when processing started */
apr_time_t done_at; /* when processing was done */
apr_bucket *eor;
struct h2_req_engine *engine; /* engine hosted by this task */
struct h2_req_engine *assigned; /* engine that task has been assigned to */
};
h2_task *h2_task_create(conn_rec *slave, int stream_id,
@@ -122,8 +116,4 @@ apr_status_t h2_task_init(apr_pool_t *pool, server_rec *s);
extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_in) *h2_task_logio_add_bytes_in;
extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *h2_task_logio_add_bytes_out;
apr_status_t h2_task_freeze(h2_task *task);
apr_status_t h2_task_thaw(h2_task *task);
int h2_task_has_thawed(h2_task *task);
#endif /* defined(__mod_h2__h2_task__) */

View File

@@ -27,7 +27,7 @@
 * @macro
 * Version number of the http2 module as c string
 */
-#define MOD_HTTP2_VERSION "1.12.6-DEV"
+#define MOD_HTTP2_VERSION "1.14.1-git"
/**
 * @macro
@@ -35,7 +35,7 @@
 * release. This is a 24 bit number with 8 bits for major number, 8 bits
 * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
 */
-#define MOD_HTTP2_VERSION_NUM 0x010c06
+#define MOD_HTTP2_VERSION_NUM 0x010e01
#endif /* mod_h2_h2_version_h */

View File

@@ -172,27 +172,6 @@ static char *http2_var_lookup(apr_pool_t *, server_rec *,
conn_rec *, request_rec *, char *name);
static int http2_is_h2(conn_rec *);
static apr_status_t http2_req_engine_push(const char *ngn_type,
request_rec *r,
http2_req_engine_init *einit)
{
return h2_mplx_req_engine_push(ngn_type, r, einit);
}
static apr_status_t http2_req_engine_pull(h2_req_engine *ngn,
apr_read_type_e block,
int capacity,
request_rec **pr)
{
return h2_mplx_req_engine_pull(ngn, block, (apr_uint32_t)capacity, pr);
}
static void http2_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn,
apr_status_t status)
{
h2_mplx_req_engine_done(ngn, r_conn, status);
}
static void http2_get_num_workers(server_rec *s, int *minw, int *maxw)
{
h2_get_num_workers(s, minw, maxw);
@@ -220,9 +199,6 @@ static void h2_hooks(apr_pool_t *pool)
APR_REGISTER_OPTIONAL_FN(http2_is_h2);
APR_REGISTER_OPTIONAL_FN(http2_var_lookup);
APR_REGISTER_OPTIONAL_FN(http2_req_engine_push);
APR_REGISTER_OPTIONAL_FN(http2_req_engine_pull);
APR_REGISTER_OPTIONAL_FN(http2_req_engine_done);
APR_REGISTER_OPTIONAL_FN(http2_get_num_workers);
ap_log_perror(APLOG_MARK, APLOG_TRACE1, 0, pool, "installing hooks");

View File

@@ -30,22 +30,20 @@ APR_DECLARE_OPTIONAL_FN(int,
/*******************************************************************************
- * HTTP/2 request engines
+ * START HTTP/2 request engines (DEPRECATED)
 ******************************************************************************/
/* The following functions were introduced for the experimental mod_proxy_http2
* support, but have been abandoned since.
* They are still declared here for backward compatibiliy, in case someone
* tries to build an old mod_proxy_http2 against it, but will disappear
* completely sometime in the future.
*/
struct apr_thread_cond_t;
typedef struct h2_req_engine h2_req_engine;
typedef void http2_output_consumed(void *ctx, conn_rec *c, apr_off_t consumed);
/**
* Initialize a h2_req_engine. The structure will be passed in but
* only the name and master are set. The function should initialize
* all fields.
* @param engine the allocated, partially filled structure
* @param r the first request to process, or NULL
*/
typedef apr_status_t http2_req_engine_init(h2_req_engine *engine,
const char *id,
const char *type,
@@ -55,35 +53,11 @@ typedef apr_status_t http2_req_engine_init(h2_req_engine *engine,
http2_output_consumed **pconsumed,
void **pbaton);
/**
* Push a request to an engine with the specified name for further processing.
* If no such engine is available, einit is not NULL, einit is called
* with a new engine record and the caller is responsible for running the
* new engine instance.
* @param engine_type the type of the engine to add the request to
* @param r the request to push to an engine for processing
* @param einit an optional initialization callback for a new engine
* of the requested type, should no instance be available.
* By passing a non-NULL callback, the caller is willing
* to init and run a new engine itself.
* @return APR_SUCCESS iff slave was successfully added to an engine
*/
APR_DECLARE_OPTIONAL_FN(apr_status_t,
http2_req_engine_push, (const char *engine_type,
request_rec *r,
http2_req_engine_init *einit));
/**
* Get a new request for processing in this engine.
* @param engine the engine which is done processing the slave
* @param block if call should block waiting for request to come
* @param capacity how many parallel requests are acceptable
* @param pr the request that needs processing or NULL
* @return APR_SUCCESS if new request was assigned
* APR_EAGAIN if no new request is available
* APR_EOF if engine may shut down, as no more request will be scheduled
* APR_ECONNABORTED if the engine needs to shut down immediately
*/
APR_DECLARE_OPTIONAL_FN(apr_status_t,
http2_req_engine_pull, (h2_req_engine *engine,
apr_read_type_e block,
@@ -98,4 +72,8 @@ APR_DECLARE_OPTIONAL_FN(void,
http2_get_num_workers, (server_rec *s,
int *minw, int *max));
/*******************************************************************************
* END HTTP/2 request engines (DEPRECATED)
******************************************************************************/
#endif

View File

@@ -47,20 +47,12 @@ AP_DECLARE_MODULE(proxy_http2) = {
/* Optional functions from mod_http2 */
static int (*is_h2)(conn_rec *c);
static apr_status_t (*req_engine_push)(const char *name, request_rec *r,
http2_req_engine_init *einit);
static apr_status_t (*req_engine_pull)(h2_req_engine *engine,
apr_read_type_e block,
int capacity,
request_rec **pr);
static void (*req_engine_done)(h2_req_engine *engine, conn_rec *r_conn,
apr_status_t status);
typedef struct h2_proxy_ctx {
const char *id;
conn_rec *master;
conn_rec *owner;
apr_pool_t *pool;
request_rec *rbase;
server_rec *server;
const char *proxy_func;
char server_portstr[32];
@@ -68,19 +60,16 @@
proxy_worker *worker;
proxy_server_conf *conf;
h2_req_engine *engine;
const char *engine_id;
const char *engine_type;
apr_pool_t *engine_pool;
apr_size_t req_buffer_size;
h2_proxy_fifo *requests;
int capacity;
unsigned standalone : 1;
unsigned is_ssl : 1;
unsigned flushall : 1;
-apr_status_t r_status; /* status of our first request work */
+request_rec *r; /* the request processed in this ctx */
apr_status_t r_status; /* status of request work */
int r_done; /* request was processed, not necessarily successfully */
int r_may_retry; /* request may be retried */
h2_proxy_session *session; /* current http2 session against backend */
} h2_proxy_ctx;
@@ -106,16 +95,6 @@ static int h2_proxy_post_config(apr_pool_t *p, apr_pool_t *plog,
MOD_HTTP2_VERSION, ngh2? ngh2->version_str : "unknown");
is_h2 = APR_RETRIEVE_OPTIONAL_FN(http2_is_h2);
req_engine_push = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_push);
req_engine_pull = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_pull);
req_engine_done = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_done);
/* we need all of them */
if (!req_engine_push || !req_engine_pull || !req_engine_done) {
req_engine_push = NULL;
req_engine_pull = NULL;
req_engine_done = NULL;
}
return status;
}
@@ -206,45 +185,6 @@ static int proxy_http2_canon(request_rec *r, char *url)
return OK;
}
static void out_consumed(void *baton, conn_rec *c, apr_off_t bytes)
{
h2_proxy_ctx *ctx = baton;
if (ctx->session) {
h2_proxy_session_update_window(ctx->session, c, bytes);
}
}
static apr_status_t proxy_engine_init(h2_req_engine *engine,
const char *id,
const char *type,
apr_pool_t *pool,
apr_size_t req_buffer_size,
request_rec *r,
http2_output_consumed **pconsumed,
void **pctx)
{
h2_proxy_ctx *ctx = ap_get_module_config(r->connection->conn_config,
&proxy_http2_module);
if (!ctx) {
ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, APLOGNO(03368)
"h2_proxy_session, engine init, no ctx found");
return APR_ENOTIMPL;
}
ctx->pool = pool;
ctx->engine = engine;
ctx->engine_id = id;
ctx->engine_type = type;
ctx->engine_pool = pool;
ctx->req_buffer_size = req_buffer_size;
ctx->capacity = H2MIN(100, h2_proxy_fifo_capacity(ctx->requests));
*pconsumed = out_consumed;
*pctx = ctx;
return APR_SUCCESS;
}
static apr_status_t add_request(h2_proxy_session *session, request_rec *r) static apr_status_t add_request(h2_proxy_session *session, request_rec *r)
{ {
h2_proxy_ctx *ctx = session->user_data; h2_proxy_ctx *ctx = session->user_data;
@@ -254,7 +194,7 @@ static apr_status_t add_request(h2_proxy_session *session, request_rec *r)
     url = apr_table_get(r->notes, H2_PROXY_REQ_URL_NOTE);
     apr_table_setn(r->notes, "proxy-source-port", apr_psprintf(r->pool, "%hu",
                    ctx->p_conn->connection->local_addr->port));
-    status = h2_proxy_session_submit(session, url, r, ctx->standalone);
+    status = h2_proxy_session_submit(session, url, r, 1);
     if (status != APR_SUCCESS) {
         ap_log_cerror(APLOG_MARK, APLOG_ERR, status, r->connection, APLOGNO(03351)
                       "pass request body failed to %pI (%s) from %s (%s)",
@@ -268,43 +208,15 @@ static apr_status_t add_request(h2_proxy_session *session, request_rec *r)
 static void request_done(h2_proxy_ctx *ctx, request_rec *r,
                          apr_status_t status, int touched)
 {
-    const char *task_id = apr_table_get(r->connection->notes, H2_TASK_ID_NOTE);
-
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, r->connection,
-                  "h2_proxy_session(%s): request done %s, touched=%d",
-                  ctx->engine_id, task_id, touched);
-    if (status != APR_SUCCESS) {
-        if (!touched) {
-            /* untouched request, need rescheduling */
-            status = h2_proxy_fifo_push(ctx->requests, r);
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, r->connection,
-                          APLOGNO(03369)
-                          "h2_proxy_session(%s): rescheduled request %s",
-                          ctx->engine_id, task_id);
-            return;
-        }
-        else {
-            const char *uri;
-            uri = apr_uri_unparse(r->pool, &r->parsed_uri, 0);
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, r->connection,
-                          APLOGNO(03471) "h2_proxy_session(%s): request %s -> %s "
-                          "not complete, cannot repeat",
-                          ctx->engine_id, task_id, uri);
-        }
-    }
-
-    if (r == ctx->rbase) {
+    if (r == ctx->r) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, r->connection,
+                      "h2_proxy_session(%s): request done, touched=%d",
+                      ctx->id, touched);
+        ctx->r_done = 1;
+        if (touched) ctx->r_may_retry = 0;
         ctx->r_status = ((status == APR_SUCCESS)? APR_SUCCESS
                          : HTTP_SERVICE_UNAVAILABLE);
     }
-
-    if (req_engine_done && ctx->engine) {
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, r->connection,
-                      APLOGNO(03370)
-                      "h2_proxy_session(%s): finished request %s",
-                      ctx->engine_id, task_id);
-        req_engine_done(ctx->engine, r->connection, status);
-    }
 }
 
 static void session_req_done(h2_proxy_session *session, request_rec *r,
@@ -313,43 +225,15 @@ static void session_req_done(h2_proxy_session *session, request_rec *r,
     request_done(session->user_data, r, status, touched);
 }
 
-static apr_status_t next_request(h2_proxy_ctx *ctx, int before_leave)
-{
-    if (h2_proxy_fifo_count(ctx->requests) > 0) {
-        return APR_SUCCESS;
-    }
-    else if (req_engine_pull && ctx->engine) {
-        apr_status_t status;
-        request_rec *r = NULL;
-
-        status = req_engine_pull(ctx->engine, before_leave?
-                                 APR_BLOCK_READ: APR_NONBLOCK_READ,
-                                 ctx->capacity, &r);
-        if (status == APR_SUCCESS && r) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, ctx->owner,
-                          "h2_proxy_engine(%s): pulled request (%s) %s",
-                          ctx->engine_id,
-                          before_leave? "before leave" : "regular",
-                          r->the_request);
-            h2_proxy_fifo_push(ctx->requests, r);
-        }
-        return APR_STATUS_IS_EAGAIN(status)? APR_SUCCESS : status;
-    }
-    return APR_EOF;
-}
-
-static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
+static apr_status_t ctx_run(h2_proxy_ctx *ctx) {
     apr_status_t status = OK;
     int h2_front;
-    request_rec *r;
 
     /* Step Four: Send the Request in a new HTTP/2 stream and
      * loop until we got the response or encounter errors.
      */
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner,
-                  "eng(%s): setup session", ctx->engine_id);
     h2_front = is_h2? is_h2(ctx->owner) : 0;
-    ctx->session = h2_proxy_session_setup(ctx->engine_id, ctx->p_conn, ctx->conf,
+    ctx->session = h2_proxy_session_setup(ctx->id, ctx->p_conn, ctx->conf,
                                           h2_front, 30,
                                           h2_proxy_log2((int)ctx->req_buffer_size),
                                           session_req_done);
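Aside (illustrative, not from the patch): the h2_proxy_session_setup() call above passes the request buffer size as a number of bits via h2_proxy_log2(). A stand-in helper with the same intent would look like this (approx_log2 is an assumed name, not the module's own implementation):

```c
/* floor(log2(n)) for n >= 1, e.g. a 32 KB buffer (32768) yields 15 --
 * the shape of value a call like h2_proxy_log2((int)ctx->req_buffer_size)
 * is expected to produce. */
static int approx_log2(int n)
{
    int bits = 0;
    while (n > 1) {
        n >>= 1;
        ++bits;
    }
    return bits;
}
```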
@@ -360,45 +244,20 @@
     }
 
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(03373)
-                  "eng(%s): run session %s", ctx->engine_id, ctx->session->id);
+                  "eng(%s): run session %s", ctx->id, ctx->session->id);
     ctx->session->user_data = ctx;
 
-    while (1) {
-        if (ctx->master->aborted) {
-            status = APR_ECONNABORTED;
-            goto out;
-        }
-
-        if (APR_SUCCESS == h2_proxy_fifo_try_pull(ctx->requests, (void**)&r)) {
-            add_request(ctx->session, r);
-        }
-
+    ctx->r_done = 0;
+    add_request(ctx->session, ctx->r);
+
+    while (!ctx->master->aborted && !ctx->r_done) {
+
         status = h2_proxy_session_process(ctx->session);
 
-        if (status == APR_SUCCESS) {
-            /* ongoing processing, check if we have room to handle more streams,
-             * maybe the remote side changed their limit */
-            if (ctx->session->remote_max_concurrent > 0
-                && ctx->session->remote_max_concurrent != ctx->capacity) {
-                ctx->capacity = H2MIN((int)ctx->session->remote_max_concurrent,
-                                      h2_proxy_fifo_capacity(ctx->requests));
-            }
-            /* try to pull more request, if our capacity allows it */
-            if (APR_ECONNABORTED == next_request(ctx, 0)) {
-                status = APR_ECONNABORTED;
-                goto out;
-            }
-            /* If we have no ongoing streams and nothing in our queue, we
-             * terminate processing and return to our caller. */
-            if ((h2_proxy_fifo_count(ctx->requests) == 0)
-                && h2_proxy_ihash_empty(ctx->session->streams)) {
-                goto out;
-            }
-        }
-        else {
+        if (status != APR_SUCCESS) {
             /* Encountered an error during session processing */
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner,
                           APLOGNO(03375) "eng(%s): end of session %s",
-                          ctx->engine_id, ctx->session->id);
+                          ctx->id, ctx->session->id);
             /* Any open stream of that session needs to
              * a) be reopened on the new session iff safe to do so
              * b) reported as done (failed) otherwise
@@ -409,12 +268,11 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
     }
 
 out:
-    if (APR_ECONNABORTED == status) {
+    if (ctx->master->aborted) {
         /* master connection gone */
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner,
-                      APLOGNO(03374) "eng(%s): master connection gone", ctx->engine_id);
-        /* give notice that we're leaving and cancel all ongoing streams. */
-        next_request(ctx, 1);
+                      APLOGNO(03374) "eng(%s): master connection gone", ctx->id);
+        /* cancel all ongoing requests */
         h2_proxy_session_cancel_all(ctx->session);
         h2_proxy_session_process(ctx->session);
         if (!ctx->master->aborted) {
@@ -427,49 +285,6 @@ out:
     return status;
 }
 
-static apr_status_t push_request_somewhere(h2_proxy_ctx *ctx, request_rec *r)
-{
-    conn_rec *c = ctx->owner;
-    const char *engine_type, *hostname;
-
-    hostname = (ctx->p_conn->ssl_hostname?
-                ctx->p_conn->ssl_hostname : ctx->p_conn->hostname);
-    engine_type = apr_psprintf(ctx->pool, "proxy_http2 %s%s", hostname,
-                               ctx->server_portstr);
-
-    if (c->master && req_engine_push && r && is_h2 && is_h2(c)) {
-        /* If we are have req_engine capabilities, push the handling of this
-         * request (e.g. slave connection) to a proxy_http2 engine which
-         * uses the same backend. We may be called to create an engine
-         * ourself. */
-        if (req_engine_push(engine_type, r, proxy_engine_init) == APR_SUCCESS) {
-            if (ctx->engine == NULL) {
-                /* request has been assigned to an engine in another thread */
-                return SUSPENDED;
-            }
-        }
-    }
-
-    if (!ctx->engine) {
-        /* No engine was available or has been initialized, handle this
-         * request just by ourself. */
-        ctx->engine_id = apr_psprintf(ctx->pool, "eng-proxy-%ld", c->id);
-        ctx->engine_type = engine_type;
-        ctx->engine_pool = ctx->pool;
-        ctx->req_buffer_size = (32*1024);
-        ctx->standalone = 1;
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
-                      "h2_proxy_http2(%ld): setup standalone engine for type %s",
-                      c->id, engine_type);
-    }
-    else {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
-                      "H2: hosting engine %s", ctx->engine_id);
-    }
-
-    return h2_proxy_fifo_push(ctx->requests, r);
-}
-
 static int proxy_http2_handler(request_rec *r,
                                proxy_worker *worker,
                                proxy_server_conf *conf,
@@ -477,7 +292,7 @@ static int proxy_http2_handler(request_rec *r,
                                const char *proxyname,
                                apr_port_t proxyport)
 {
-    const char *proxy_func;
+    const char *proxy_func, *task_id;
     char *locurl = url, *u;
     apr_size_t slen;
     int is_ssl = 0;
@@ -510,33 +325,34 @@ static int proxy_http2_handler(request_rec *r,
         return DECLINED;
     }
 
+    task_id = apr_table_get(r->connection->notes, H2_TASK_ID_NOTE);
+
     ctx = apr_pcalloc(r->pool, sizeof(*ctx));
     ctx->master = r->connection->master? r->connection->master : r->connection;
+    ctx->id = task_id? task_id : apr_psprintf(r->pool, "%ld", (long)ctx->master->id);
     ctx->owner = r->connection;
     ctx->pool = r->pool;
-    ctx->rbase = r;
     ctx->server = r->server;
     ctx->proxy_func = proxy_func;
     ctx->is_ssl = is_ssl;
     ctx->worker = worker;
     ctx->conf = conf;
     ctx->flushall = apr_table_get(r->subprocess_env, "proxy-flushall")? 1 : 0;
-    ctx->r_status = HTTP_SERVICE_UNAVAILABLE;
-
-    h2_proxy_fifo_set_create(&ctx->requests, ctx->pool, 100);
+    ctx->req_buffer_size = (32*1024);
+    ctx->r = r;
+    ctx->r_status = status = HTTP_SERVICE_UNAVAILABLE;
+    ctx->r_done = 0;
+    ctx->r_may_retry = 1;
 
     ap_set_module_config(ctx->owner->conn_config, &proxy_http2_module, ctx);
 
     /* scheme says, this is for us. */
-    apr_table_setn(ctx->rbase->notes, H2_PROXY_REQ_URL_NOTE, url);
-    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->rbase,
+    apr_table_setn(ctx->r->notes, H2_PROXY_REQ_URL_NOTE, url);
+    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->r,
                   "H2: serving URL %s", url);
 
 run_connect:
-    if (ctx->master->aborted) {
-        ctx->r_status = APR_ECONNABORTED;
-        goto cleanup;
-    }
+    if (ctx->master->aborted) goto cleanup;
 
     /* Get a proxy_conn_rec from the worker, might be a new one, might
      * be one still open from another request, or it might fail if the
@@ -551,7 +367,7 @@
     /* Step One: Determine the URL to connect to (might be a proxy),
      * initialize the backend accordingly and determine the server
      * port string we can expect in responses. */
-    if ((status = ap_proxy_determine_connection(ctx->pool, ctx->rbase, conf, worker,
+    if ((status = ap_proxy_determine_connection(ctx->pool, ctx->r, conf, worker,
                                                 ctx->p_conn, &uri, &locurl,
                                                 proxyname, proxyport,
                                                 ctx->server_portstr,
@@ -559,17 +375,6 @@
         goto cleanup;
     }
 
-    /* If we are not already hosting an engine, try to push the request
-     * to an already existing engine or host a new engine here. */
-    if (r && !ctx->engine) {
-        ctx->r_status = push_request_somewhere(ctx, r);
-        r = NULL;
-        if (ctx->r_status == SUSPENDED) {
-            /* request was pushed to another thread, leave processing here */
-            goto cleanup;
-        }
-    }
-
     /* Step Two: Make the Connection (or check that an already existing
      * socket is still usable). On success, we have a socket connected to
      * backend->hostname. */
@@ -578,19 +383,19 @@
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(03352)
                       "H2: failed to make connection to backend: %s",
                       ctx->p_conn->hostname);
-        goto reconnect;
+        goto cleanup;
     }
 
     /* Step Three: Create conn_rec for the socket we have open now. */
     if (!ctx->p_conn->connection) {
-        status = ap_proxy_connection_create_ex(ctx->proxy_func,
-                                               ctx->p_conn, ctx->rbase);
+        status = ap_proxy_connection_create_ex(ctx->proxy_func, ctx->p_conn, ctx->r);
         if (status != OK) {
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner, APLOGNO(03353)
                           "setup new connection: is_ssl=%d %s %s %s",
                           ctx->p_conn->is_ssl, ctx->p_conn->ssl_hostname,
                           locurl, ctx->p_conn->hostname);
-            goto reconnect;
+            ctx->r_status = status;
+            goto cleanup;
         }
 
         if (!ctx->p_conn->data && ctx->is_ssl) {
@@ -600,7 +405,7 @@
             apr_table_setn(ctx->p_conn->connection->notes,
                            "proxy-request-alpn-protos", "h2");
             if (ctx->p_conn->ssl_hostname) {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, ctx->owner,
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner,
                               "set SNI to %s for (%s)",
                               ctx->p_conn->ssl_hostname,
                               ctx->p_conn->hostname);
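For orientation (not part of the patch): Steps One to Three above follow the stock mod_proxy backend sequence. A condensed sketch using the same public ap_proxy_* calls, with an assumed proxy function label ("HTTP2") and all reconnect/cleanup handling omitted:

```c
#include "httpd.h"
#include "mod_proxy.h"

static int backend_roundtrip_sketch(request_rec *r, proxy_worker *worker,
                                    proxy_server_conf *conf, char **url,
                                    const char *proxyname, apr_port_t proxyport)
{
    proxy_conn_rec *backend = NULL;
    apr_uri_t uri;
    char server_portstr[32];
    int status;

    /* get a (possibly reused) backend connection record from the worker */
    status = ap_proxy_acquire_connection("HTTP2", &backend, worker, r->server);
    if (status != OK) return status;

    /* Step One: resolve the address to connect to (origin or forward proxy) */
    status = ap_proxy_determine_connection(r->pool, r, conf, worker, backend,
                                           &uri, url, proxyname, proxyport,
                                           server_portstr, sizeof(server_portstr));
    /* Step Two: make sure the socket is connected and still usable */
    if (status == OK)
        status = ap_proxy_connect_backend("HTTP2", backend, worker, r->server);
    /* Step Three: wrap the connected socket in a conn_rec for the filters */
    if (status == OK && !backend->connection)
        status = ap_proxy_connection_create_ex("HTTP2", backend, r);

    /* ... submit the request on an HTTP/2 session here ... */

    ap_proxy_release_connection("HTTP2", backend, r->server);
    return status;
}
```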
@@ -610,33 +415,11 @@
             }
         }
     }
 
-run_session:
-    if (ctx->owner->aborted) {
-        ctx->r_status = APR_ECONNABORTED;
-        goto cleanup;
-    }
+    if (ctx->master->aborted) goto cleanup;
 
-    status = proxy_engine_run(ctx);
-    if (status == APR_SUCCESS) {
-        /* session and connection still ok */
-        if (next_request(ctx, 1) == APR_SUCCESS) {
-            /* more requests, run again */
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(03376)
-                          "run_session, again");
-            goto run_session;
-        }
-        /* done */
-        ctx->engine = NULL;
-    }
-
-reconnect:
-    if (ctx->master->aborted) {
-        ctx->r_status = APR_ECONNABORTED;
-        goto cleanup;
-    }
-
-    if (next_request(ctx, 1) == APR_SUCCESS) {
-        /* Still more to do, tear down old conn and start over */
+    status = ctx_run(ctx);
+
+    if (ctx->r_status != APR_SUCCESS && ctx->r_may_retry && !ctx->master->aborted) {
+        /* Not successfully processed, but may retry, tear down old conn and start over */
         if (ctx->p_conn) {
             ctx->p_conn->close = 1;
 #if AP_MODULE_MAGIC_AT_LEAST(20140207, 2)
@@ -646,12 +429,12 @@
             ctx->p_conn = NULL;
         }
 
         ++reconnects;
-        if (reconnects < 5 && !ctx->master->aborted) {
+        if (reconnects < 5) {
             goto run_connect;
         }
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(10023)
-                      "giving up after %d reconnects, %d requests todo",
-                      reconnects, h2_proxy_fifo_count(ctx->requests));
+                      "giving up after %d reconnects, request-done=%d",
+                      reconnects, ctx->r_done);
     }
 
 cleanup:
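Summary note (illustrative, not from the patch): the reconnect counter above, together with the r_done/r_may_retry flags set in request_done(), amounts to the following retry predicate (the helper name is made up):

```c
#include "apr_errno.h"

/* Retry only while the client is still there, the backend never "touched"
 * the request, and fewer than five connection attempts have been made. */
static int may_retry_request(apr_status_t r_status, int r_may_retry,
                             int master_aborted, int reconnects)
{
    return r_status != APR_SUCCESS  /* nothing useful delivered yet */
        && r_may_retry              /* backend never touched the request */
        && !master_aborted          /* client connection still alive */
        && reconnects < 5;          /* bounded number of fresh connections */
}
```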
@@ -661,17 +444,12 @@
                 ctx->p_conn->close = 1;
             }
 #if AP_MODULE_MAGIC_AT_LEAST(20140207, 2)
-            proxy_run_detach_backend(ctx->rbase, ctx->p_conn);
+            proxy_run_detach_backend(ctx->r, ctx->p_conn);
 #endif
             ap_proxy_release_connection(ctx->proxy_func, ctx->p_conn, ctx->server);
             ctx->p_conn = NULL;
         }
 
-        /* Any requests we still have need to fail */
-        while (APR_SUCCESS == h2_proxy_fifo_try_pull(ctx->requests, (void**)&r)) {
-            request_done(ctx, r, HTTP_SERVICE_UNAVAILABLE, 1);
-        }
-
         ap_set_module_config(ctx->owner->conn_config, &proxy_http2_module, NULL);
 
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner,
                   APLOGNO(03377) "leaving handler");