1
0
mirror of https://github.com/apache/httpd.git synced 2025-08-08 15:02:10 +03:00

*) mod_http2: use the new REQUEST buckets to forward request

on secondary connections. Use the now generic
     ap_process_connection() in h2 workers to process those.



git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1899802 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Stefan Eissing
2022-04-13 08:38:12 +00:00
parent fbb84e00fa
commit 5d3b2f1f0c
18 changed files with 355 additions and 294 deletions

View File

@@ -0,0 +1,4 @@
*) mod_http2: use the new REQUEST buckets to forward request
on secondary connections. Use the now generic
ap_process_connection() in h2 workers to process those.
[Stefan Eissing]

View File

@@ -188,7 +188,6 @@ struct h2_request {
apr_table_t *headers;
apr_time_t request_time;
unsigned int chunked : 1; /* iff request body needs to be forwarded as chunked */
apr_off_t raw_bytes; /* RAW network bytes that generated this request - if known. */
int http_status; /* Store a possible HTTP status code that gets
* defined before creating the dummy HTTP/1.1

View File

@@ -57,6 +57,7 @@ static apr_socket_t *dummy_socket;
static ap_filter_rec_t *c2_net_in_filter_handle;
static ap_filter_rec_t *c2_net_out_filter_handle;
static ap_filter_rec_t *c2_request_in_filter_handle;
static ap_filter_rec_t *c2_notes_out_filter_handle;
@@ -165,12 +166,11 @@ static apr_status_t h2_c2_filter_in(ap_filter_t* f,
apr_status_t status = APR_SUCCESS;
apr_bucket *b;
apr_off_t bblen;
const int trace1 = APLOGctrace1(f->c);
apr_size_t rmax = ((readbytes <= APR_SIZE_MAX)?
(apr_size_t)readbytes : APR_SIZE_MAX);
conn_ctx = h2_conn_ctx_get(f->c);
ap_assert(conn_ctx);
AP_DEBUG_ASSERT(conn_ctx);
if (mode == AP_MODE_INIT) {
return ap_get_brigade(f->c->input_filters, bb, mode, block, readbytes);
@@ -180,8 +180,8 @@ static apr_status_t h2_c2_filter_in(ap_filter_t* f,
return APR_ECONNABORTED;
}
if (APLOGctrace1(f->c)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
if (APLOGctrace3(f->c)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, f->c,
"h2_c2_in(%s-%d): read, mode=%d, block=%d, readbytes=%ld",
conn_ctx->id, conn_ctx->stream_id, mode, block, (long)readbytes);
}
@@ -198,7 +198,7 @@ static apr_status_t h2_c2_filter_in(ap_filter_t* f,
while (APR_BRIGADE_EMPTY(fctx->bb)) {
/* Get more input data for our request. */
if (APLOGctrace1(f->c)) {
if (APLOGctrace2(f->c)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c,
"h2_c2_in(%s-%d): get more data from mplx, block=%d, "
"readbytes=%ld",
@@ -260,8 +260,8 @@ receive:
return status;
}
if (trace1) {
h2_util_bb_log(f->c, conn_ctx->stream_id, APLOG_TRACE2,
if (APLOGctrace3(f->c)) {
h2_util_bb_log(f->c, conn_ctx->stream_id, APLOG_TRACE3,
"c2 input.bb", fctx->bb);
}
@@ -357,9 +357,27 @@ static apr_status_t beam_out(conn_rec *c2, h2_conn_ctx_t *conn_ctx, apr_bucket_b
static apr_status_t h2_c2_filter_out(ap_filter_t* f, apr_bucket_brigade* bb)
{
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(f->c);
apr_bucket *e;
apr_status_t rv;
ap_assert(conn_ctx);
if (!conn_ctx->has_final_response) {
for (e = APR_BRIGADE_FIRST(bb);
e != APR_BRIGADE_SENTINEL(bb);
e = APR_BUCKET_NEXT(e))
{
if (AP_BUCKET_IS_RESPONSE(e)) {
ap_bucket_response *resp = e->data;
if (resp->status >= 200) {
conn_ctx->has_final_response = 1;
break;
}
}
if (APR_BUCKET_IS_EOS(e)) {
break;
}
}
}
rv = beam_out(f->c, conn_ctx, bb);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, rv, f->c,
@@ -371,8 +389,19 @@ static apr_status_t h2_c2_filter_out(ap_filter_t* f, apr_bucket_brigade* bb)
return rv;
}
static apr_status_t c2_run_pre_connection(conn_rec *c2, apr_socket_t *csd)
static int c2_hook_pre_connection(conn_rec *c2, void *csd)
{
h2_conn_ctx_t *conn_ctx;
if (!c2->master || !(conn_ctx = h2_conn_ctx_get(c2)) || !conn_ctx->stream_id) {
return DECLINED;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2,
"h2_c2(%s-%d), adding filters",
conn_ctx->id, conn_ctx->stream_id);
ap_add_input_filter_handle(c2_net_in_filter_handle, NULL, NULL, c2);
ap_add_output_filter_handle(c2_net_out_filter_handle, NULL, NULL, c2);
if (c2->keepalives == 0) {
/* Simulate that we had already a request on this connection. Some
* hooks trigger special behaviour when keepalives is 0.
@@ -388,169 +417,8 @@ static apr_status_t c2_run_pre_connection(conn_rec *c2, apr_socket_t *csd)
* is unnecessary on a h2 stream.
*/
c2->keepalive = AP_CONN_CLOSE;
return ap_run_pre_connection(c2, csd);
}
ap_assert(c2->output_filters);
return APR_SUCCESS;
}
apr_status_t h2_c2_process(conn_rec *c2, apr_thread_t *thread, int worker_id)
{
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c2);
ap_assert(conn_ctx);
ap_assert(conn_ctx->mplx);
/* See the discussion at <https://github.com/icing/mod_h2/issues/195>
*
* Each conn_rec->id is supposed to be unique at a point in time. Since
* some modules (and maybe external code) uses this id as an identifier
* for the request_rec they handle, it needs to be unique for secondary
* connections also.
*
* The MPM module assigns the connection ids and mod_unique_id is using
* that one to generate identifier for requests. While the implementation
* works for HTTP/1.x, the parallel execution of several requests per
* connection will generate duplicate identifiers on load.
*
* The original implementation for secondary connection identifiers used
* to shift the master connection id up and assign the stream id to the
* lower bits. This was cramped on 32 bit systems, but on 64bit there was
* enough space.
*
* As issue 195 showed, mod_unique_id only uses the lower 32 bit of the
* connection id, even on 64bit systems. Therefore collisions in request ids.
*
* The way master connection ids are generated, there is some space "at the
* top" of the lower 32 bits on allmost all systems. If you have a setup
* with 64k threads per child and 255 child processes, you live on the edge.
*
* The new implementation shifts 8 bits and XORs in the worker
* id. This will experience collisions with > 256 h2 workers and heavy
* load still. There seems to be no way to solve this in all possible
* configurations by mod_h2 alone.
*/
c2->id = (c2->master->id << 8)^worker_id;
if (!conn_ctx->pre_conn_done) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2,
"h2_c2(%s-%d), adding filters",
conn_ctx->id, conn_ctx->stream_id);
ap_add_input_filter_handle(c2_net_in_filter_handle, NULL, NULL, c2);
ap_add_output_filter_handle(c2_net_out_filter_handle, NULL, NULL, c2);
c2_run_pre_connection(c2, ap_get_conn_socket(c2));
conn_ctx->pre_conn_done = 1;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c2,
"h2_c2(%s-%d): process connection",
conn_ctx->id, conn_ctx->stream_id);
c2->current_thread = thread;
ap_run_process_connection(c2);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c2,
"h2_c2(%s-%d): processing done",
conn_ctx->id, conn_ctx->stream_id);
return APR_SUCCESS;
}
static apr_status_t c2_process(h2_conn_ctx_t *conn_ctx, conn_rec *c)
{
const h2_request *req = conn_ctx->request;
conn_state_t *cs = c->cs;
request_rec *r;
r = h2_create_request_rec(conn_ctx->request, c);
if (!r) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_c2(%s-%d): create request_rec failed, r=NULL",
conn_ctx->id, conn_ctx->stream_id);
goto cleanup;
}
if (r->status != HTTP_OK) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_c2(%s-%d): create request_rec failed, r->status=%d",
conn_ctx->id, conn_ctx->stream_id, r->status);
goto cleanup;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_c2(%s-%d): created request_rec for %s",
conn_ctx->id, conn_ctx->stream_id, r->the_request);
conn_ctx->server = r->server;
/* the request_rec->server carries the timeout value that applies */
h2_conn_ctx_set_timeout(conn_ctx, r->server->timeout);
/* We only handle this one request on the connection and tell everyone
* that there is no need to keep it "clean" if something fails. Also,
* this prevents mod_reqtimeout from doing funny business with monitoring
* keepalive timeouts.
*/
r->connection->keepalive = AP_CONN_CLOSE;
if (conn_ctx->beam_in && !apr_table_get(r->headers_in, "Content-Length")) {
r->body_indeterminate = 1;
}
if (h2_config_sgeti(conn_ctx->server, H2_CONF_COPY_FILES)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_mplx(%s-%d): copy_files in output",
conn_ctx->id, conn_ctx->stream_id);
h2_beam_set_copy_files(conn_ctx->beam_out, 1);
}
ap_update_child_status(c->sbh, SERVER_BUSY_WRITE, r);
if (cs) {
cs->state = CONN_STATE_HANDLER;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_c2(%s-%d): start process_request",
conn_ctx->id, conn_ctx->stream_id);
/* Add the raw bytes of the request (e.g. header frame lengths to
* the logio for this request. */
if (req->raw_bytes && h2_c_logio_add_bytes_in) {
h2_c_logio_add_bytes_in(c, req->raw_bytes);
}
ap_process_request(r);
/* After the call to ap_process_request, the
* request pool may have been deleted. */
r = NULL;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_c2(%s-%d): process_request done",
conn_ctx->id, conn_ctx->stream_id);
if (cs)
cs->state = CONN_STATE_WRITE_COMPLETION;
cleanup:
return APR_SUCCESS;
}
static int h2_c2_hook_process(conn_rec* c)
{
h2_conn_ctx_t *ctx;
if (!c->master) {
return DECLINED;
}
ctx = h2_conn_ctx_get(c);
if (ctx->stream_id) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_h2, processing request directly");
c2_process(ctx, c);
return DONE;
}
else {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"secondary_conn(%ld): no h2 stream assing?", c->id);
}
return DECLINED;
return OK;
}
static void check_push(request_rec *r, const char *tag)
@@ -580,21 +448,58 @@ static void check_push(request_rec *r, const char *tag)
}
}
static int h2_c2_hook_post_read_request(request_rec *r)
static void c2_pre_read_request(request_rec *r, conn_rec *c2)
{
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(r->connection);
if (conn_ctx && conn_ctx->stream_id) {
h2_conn_ctx_t *conn_ctx;
if (!c2->master || !(conn_ctx = h2_conn_ctx_get(c2)) || !conn_ctx->stream_id) {
return;
}
ap_log_rerror(APLOG_MARK, APLOG_TRACE3, 0, r,
"h2_c2(%s-%d): adding request filters",
conn_ctx->id, conn_ctx->stream_id);
ap_add_input_filter_handle(c2_request_in_filter_handle, NULL, r, r->connection);
ap_add_output_filter_handle(c2_notes_out_filter_handle, NULL, r, r->connection);
}
static int c2_post_read_request(request_rec *r)
{
h2_conn_ctx_t *conn_ctx;
conn_rec *c2 = r->connection;
if (!c2->master || !(conn_ctx = h2_conn_ctx_get(c2)) || !conn_ctx->stream_id) {
return DECLINED;
}
/* Now that the request_rec is fully initialized, set relevant params */
conn_ctx->server = r->server;
h2_conn_ctx_set_timeout(conn_ctx, r->server->timeout);
/* We only handle this one request on the connection and tell everyone
* that there is no need to keep it "clean" if something fails. Also,
* this prevents mod_reqtimeout from doing funny business with monitoring
* keepalive timeouts.
*/
r->connection->keepalive = AP_CONN_CLOSE;
static int h2_c2_hook_fixups(request_rec *r)
if (conn_ctx->beam_in && !apr_table_get(r->headers_in, "Content-Length")) {
r->body_indeterminate = 1;
}
if (h2_config_sgeti(conn_ctx->server, H2_CONF_COPY_FILES)) {
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r,
"h2_mplx(%s-%d): copy_files in output",
conn_ctx->id, conn_ctx->stream_id);
h2_beam_set_copy_files(conn_ctx->beam_out, 1);
}
/* Add the raw bytes of the request (e.g. header frame lengths to
* the logio for this request. */
if (conn_ctx->request->raw_bytes && h2_c_logio_add_bytes_in) {
h2_c_logio_add_bytes_in(c2, conn_ctx->request->raw_bytes);
}
return OK;
}
static int c2_hook_fixups(request_rec *r)
{
conn_rec *c2 = r->connection;
h2_conn_ctx_t *conn_ctx;
@@ -613,12 +518,14 @@ void h2_c2_register_hooks(void)
/* When the connection processing actually starts, we might
* take over, if the connection is for a h2 stream.
*/
ap_hook_process_connection(h2_c2_hook_process,
NULL, NULL, APR_HOOK_FIRST);
ap_hook_pre_connection(c2_hook_pre_connection,
NULL, NULL, APR_HOOK_MIDDLE);
/* We need to manipulate the standard HTTP/1.1 protocol filters and
* install our own. This needs to be done very early. */
ap_hook_post_read_request(h2_c2_hook_post_read_request, NULL, NULL, APR_HOOK_REALLY_FIRST);
ap_hook_fixups(h2_c2_hook_fixups, NULL, NULL, APR_HOOK_LAST);
ap_hook_pre_read_request(c2_pre_read_request, NULL, NULL, APR_HOOK_MIDDLE);
ap_hook_post_read_request(c2_post_read_request, NULL, NULL, APR_HOOK_REALLY_FIRST);
ap_hook_fixups(c2_hook_fixups, NULL, NULL, APR_HOOK_LAST);
c2_net_in_filter_handle =
ap_register_input_filter("H2_C2_NET_IN", h2_c2_filter_in,
@@ -626,6 +533,9 @@ void h2_c2_register_hooks(void)
c2_net_out_filter_handle =
ap_register_output_filter("H2_C2_NET_OUT", h2_c2_filter_out,
NULL, AP_FTYPE_NETWORK);
c2_request_in_filter_handle =
ap_register_input_filter("H2_C2_REQUEST_IN", h2_c2_filter_request_in,
NULL, AP_FTYPE_PROTOCOL);
c2_notes_out_filter_handle =
ap_register_output_filter("H2_C2_NOTES_OUT", h2_c2_filter_notes_out,
NULL, AP_FTYPE_PROTOCOL);

View File

@@ -38,11 +38,6 @@ void h2_c2_destroy(conn_rec *c2);
*/
void h2_c2_abort(conn_rec *c2, conn_rec *from);
/**
* Process a secondary connection for a HTTP/2 stream request.
*/
apr_status_t h2_c2_process(conn_rec *c, apr_thread_t *thread, int worker_id);
void h2_c2_register_hooks(void);
#endif /* defined(__mod_h2__h2_c2__) */

View File

@@ -87,3 +87,40 @@ apr_status_t h2_c2_filter_notes_out(ap_filter_t *f, apr_bucket_brigade *bb)
pass:
return ap_pass_brigade(f->next, bb);
}
apr_status_t h2_c2_filter_request_in(ap_filter_t *f,
apr_bucket_brigade *bb,
ap_input_mode_t mode,
apr_read_type_e block,
apr_off_t readbytes)
{
h2_conn_ctx_t *conn_ctx;
apr_bucket *b;
/* just get out of the way for things we don't want to handle. */
if (mode != AP_MODE_READBYTES && mode != AP_MODE_GETLINE) {
return ap_get_brigade(f->next, bb, mode, block, readbytes);
}
/* This filter is a one-time wonder */
ap_remove_input_filter(f);
if (f->c->master && (conn_ctx = h2_conn_ctx_get(f->c)) && conn_ctx->stream_id) {
if (conn_ctx->request->http_status != H2_HTTP_STATUS_UNSET) {
/* error was encountered preparing this request */
b = ap_bucket_error_create(conn_ctx->request->http_status, NULL, f->r->pool,
f->c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(bb, b);
return APR_SUCCESS;
}
b = h2_request_create_bucket(conn_ctx->request, f->r);
APR_BRIGADE_INSERT_TAIL(bb, b);
if (!conn_ctx->beam_in) {
b = apr_bucket_eos_create(f->c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(bb, b);
}
return APR_SUCCESS;
}
return ap_get_brigade(f->next, bb, mode, block, readbytes);
}

View File

@@ -18,21 +18,20 @@
#define __mod_h2__h2_c2_filter__
/**
* h2_from_h1 parses a HTTP/1.1 response into
* - response status
* - a list of header values
* - a series of bytes that represent the response body alone, without
* any meta data, such as inserted by chunked transfer encoding.
*
* All data is allocated from the stream memory pool.
*
* Again, see comments in h2_request: ideally we would take the headers
* and status from the httpd structures instead of parsing them here, but
* we need to have all handlers and filters involved in request/response
* processing, so this seems to be the way for now.
* Output filter that inspects the request_rec->notes of the request
* itself and possible internal redirects to detect conditions that
* merit specific HTTP/2 response codes, such as 421.
*/
struct h2_response_parser;
apr_status_t h2_c2_filter_notes_out(ap_filter_t *f, apr_bucket_brigade *bb);
/**
* Input filter on secondary connections that insert the REQUEST bucket
* with the request to perform and then removes itself.
*/
apr_status_t h2_c2_filter_request_in(ap_filter_t *f,
apr_bucket_brigade *bb,
ap_input_mode_t mode,
apr_read_type_e block,
apr_off_t readbytes);
#endif /* defined(__mod_h2__h2_c2_filter__) */

View File

@@ -43,7 +43,6 @@ struct h2_conn_ctx_t {
struct h2_mplx *mplx; /* c2: the multiplexer */
struct h2_c2_transit *transit; /* c2: transit pool and bucket_alloc */
int pre_conn_done; /* has pre_connection setup run? */
int stream_id; /* c1: 0, c2: stream id processed */
apr_pool_t *req_pool; /* c2: a c2 child pool for a request */
const struct h2_request *request; /* c2: the request to process */

View File

@@ -715,27 +715,6 @@ void h2_mplx_c1_process(h2_mplx *m,
H2_MPLX_LEAVE(m);
}
apr_status_t h2_mplx_c1_fwd_input(h2_mplx *m, struct h2_iqueue *input_pending,
h2_stream_get_fn *get_stream,
struct h2_session *session)
{
int sid;
H2_MPLX_ENTER(m);
while ((sid = h2_iq_shift(input_pending)) > 0) {
h2_stream *stream = get_stream(session, sid);
if (stream) {
H2_MPLX_LEAVE(m);
h2_stream_flush_input(stream);
H2_MPLX_ENTER(m);
}
}
H2_MPLX_LEAVE(m);
return APR_SUCCESS;
}
static void c2_beam_input_write_notify(void *ctx, h2_bucket_beam *beam)
{
conn_rec *c = ctx;
@@ -1083,6 +1062,31 @@ apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id)
return status;
}
apr_status_t h2_mplx_c1_input_closed(h2_mplx *m, int stream_id)
{
h2_stream *stream;
h2_conn_ctx_t *c2_ctx;
apr_status_t status = APR_EAGAIN;
H2_MPLX_ENTER_ALWAYS(m);
stream = h2_ihash_get(m->streams, stream_id);
if (stream && (c2_ctx = h2_conn_ctx_get(stream->c2))) {
if (c2_ctx->beam_in) {
apr_bucket_brigade *tmp =apr_brigade_create(
stream->pool, m->c1->bucket_alloc);
apr_bucket *eos = apr_bucket_eos_create(m->c1->bucket_alloc);
apr_off_t written;
APR_BRIGADE_INSERT_TAIL(tmp, eos);
status = h2_beam_send(c2_ctx->beam_in, m->c1,
tmp, APR_BLOCK_READ, &written);
apr_brigade_destroy(tmp);
}
}
H2_MPLX_LEAVE(m);
return status;
}
static apr_status_t mplx_pollset_create(h2_mplx *m)
{
/* stream0 output only */

View File

@@ -153,11 +153,6 @@ void h2_mplx_c1_process(h2_mplx *m,
struct h2_session *session,
int *pstream_count);
apr_status_t h2_mplx_c1_fwd_input(h2_mplx *m, struct h2_iqueue *input_pending,
h2_stream_get_fn *get_stream,
struct h2_session *session);
/**
* Stream priorities have changed, reschedule pending requests.
*
@@ -199,6 +194,15 @@ apr_status_t h2_mplx_c1_streams_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
*/
apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id);
/**
* Input for stream has been closed. Notify a possibly started
* and waiting stream by sending an EOS.
* @param m the mplx
* @param stream_id the closed stream
* @return APR_SUCCESS iff EOS was sent, APR_EAGAIN if not necessary
*/
apr_status_t h2_mplx_c1_input_closed(h2_mplx *m, int stream_id);
/**
* Get readonly access to a stream for a secondary connection.
*/

View File

@@ -17,6 +17,8 @@
#ifndef __mod_h2__h2_push__
#define __mod_h2__h2_push__
#include <http_protocol.h>
#include "h2.h"
struct h2_request;

View File

@@ -205,13 +205,9 @@ apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, int eos,
apr_table_setn(req->headers, "Host", req->authority);
}
if (eos) {
s = apr_table_get(req->headers, "Content-Length");
if (!s) {
/* HTTP/2 does not need a Content-Length for framing, but our
* internal request processing is used to HTTP/1.1, so we
* need to either add a Content-Length or a Transfer-Encoding
* if any content can be expected. */
if (eos && apr_table_get(req->headers, "Content-Type")) {
if (!s && apr_table_get(req->headers, "Content-Type")) {
/* If we have a content-type, but already seen eos, no more
* data will come. Signal a zero content length explicitly.
*/
@@ -291,6 +287,27 @@ static request_rec *my_ap_create_request(conn_rec *c)
}
#endif
apr_bucket *h2_request_create_bucket(const h2_request *req, request_rec *r)
{
conn_rec *c = r->connection;
apr_table_t *headers = apr_table_copy(r->pool, req->headers);
const char *uri = req->path;
AP_DEBUG_ASSERT(req->authority);
if (req->scheme && (ap_cstr_casecmp(req->scheme,
ap_ssl_conn_is_ssl(c->master? c->master : c)? "https" : "http")
|| !ap_cstr_casecmp("CONNECT", req->method))) {
/* Client sent a non-matching ':scheme' pseudo header or CONNECT.
* In this case, we use an absolute URI.
*/
uri = apr_psprintf(r->pool, "%s://%s%s",
req->scheme, req->authority, req->path ? req->path : "");
}
return ap_bucket_request_create(req->method, uri, "HTTP/2.0", headers,
r->pool, c->bucket_alloc);
}
request_rec *h2_create_request_rec(const h2_request *req, conn_rec *c)
{
int access_status = HTTP_OK;

View File

@@ -49,5 +49,7 @@ h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src);
*/
request_rec *h2_create_request_rec(const h2_request *req, conn_rec *conn);
apr_bucket *h2_request_create_bucket(const h2_request *req, request_rec *r);
#endif /* defined(__mod_h2__h2_request__) */

View File

@@ -879,7 +879,6 @@ apr_status_t h2_session_create(h2_session **psession, conn_rec *c, request_rec *
session->max_stream_count = h2_config_sgeti(s, H2_CONF_MAX_STREAMS);
session->max_stream_mem = h2_config_sgeti(s, H2_CONF_STREAM_MAX_MEM);
session->in_pending = h2_iq_create(session->pool, (int)session->max_stream_count);
session->out_c1_blocked = h2_iq_create(session->pool, (int)session->max_stream_count);
session->ready_to_process = h2_iq_create(session->pool, (int)session->max_stream_count);
@@ -1635,7 +1634,7 @@ static void on_stream_event(void *ctx, h2_stream *stream, h2_stream_event_t ev)
h2_session *session = ctx;
switch (ev) {
case H2_SEV_IN_DATA_PENDING:
h2_iq_append(session->in_pending, stream->id);
session->input_flushed = 1;
break;
case H2_SEV_OUT_C1_BLOCK:
h2_iq_append(session->out_c1_blocked, stream->id);
@@ -1787,10 +1786,9 @@ apr_status_t h2_session_process(h2_session *session, int async)
transit(session, "scheduled stream", H2_SESSION_ST_BUSY);
}
if (!h2_iq_empty(session->in_pending)) {
h2_mplx_c1_fwd_input(session->mplx, session->in_pending,
get_stream, session);
if (session->input_flushed) {
transit(session, "forwarded input", H2_SESSION_ST_BUSY);
session->input_flushed = 0;
}
if (!h2_iq_empty(session->out_c1_blocked)) {

View File

@@ -113,7 +113,7 @@ typedef struct h2_session {
int last_status_code; /* the one already reported */
const char *last_status_msg; /* the one already reported */
struct h2_iqueue *in_pending; /* all streams with input pending */
int input_flushed; /* stream input was flushed */
struct h2_iqueue *out_c1_blocked; /* all streams with output blocked on c1 buffer full */
struct h2_iqueue *ready_to_process; /* all streams ready for processing */

View File

@@ -211,6 +211,30 @@ cleanup:
return APR_SUCCESS;
}
static apr_status_t input_flush(h2_stream *stream)
{
apr_status_t status = APR_SUCCESS;
apr_off_t written;
if (!stream->in_buffer) goto cleanup;
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c1,
H2_STRM_MSG(stream, "flush input"));
if (!stream->input) {
h2_stream_setup_input(stream);
}
status = h2_beam_send(stream->input, stream->session->c1,
stream->in_buffer, APR_BLOCK_READ, &written);
stream->in_last_write = apr_time_now();
if (APR_SUCCESS != status && stream->state == H2_SS_CLOSED_L) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c1,
H2_STRM_MSG(stream, "send input error"));
h2_stream_dispatch(stream, H2_SEV_IN_ERROR);
}
cleanup:
return status;
}
static void input_append_bucket(h2_stream *stream, apr_bucket *b)
{
if (!stream->in_buffer) {
@@ -252,11 +276,18 @@ static apr_status_t close_input(h2_stream *stream)
}
stream->input_closed = 1;
if (stream->in_buffer || stream->input) {
if (stream->in_buffer) {
b = apr_bucket_eos_create(c->bucket_alloc);
input_append_bucket(stream, b);
input_flush(stream);
h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING);
}
else {
rv = h2_mplx_c1_input_closed(stream->session->mplx, stream->id);
if (APR_SUCCESS == rv) {
h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING);
}
}
cleanup:
return rv;
}
@@ -472,29 +503,6 @@ leave:
return status;
}
apr_status_t h2_stream_flush_input(h2_stream *stream)
{
apr_status_t status = APR_SUCCESS;
apr_off_t written;
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c1,
H2_STRM_MSG(stream, "flush input"));
if (stream->in_buffer && !APR_BRIGADE_EMPTY(stream->in_buffer)) {
if (!stream->input) {
h2_stream_setup_input(stream);
}
status = h2_beam_send(stream->input, stream->session->c1,
stream->in_buffer, APR_BLOCK_READ, &written);
stream->in_last_write = apr_time_now();
if (APR_SUCCESS != status && stream->state == H2_SS_CLOSED_L) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c1,
H2_STRM_MSG(stream, "send input error"));
h2_stream_dispatch(stream, H2_SEV_IN_ERROR);
}
}
return status;
}
apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
const uint8_t *data, size_t len)
{
@@ -515,6 +523,7 @@ apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
}
stream->in_data_octets += len;
input_append_data(stream, (const char*)data, len);
input_flush(stream);
h2_stream_dispatch(stream, H2_SEV_IN_DATA_PENDING);
}
return status;
@@ -858,6 +867,20 @@ cleanup:
if (APR_SUCCESS == status) {
stream->request = req;
stream->rtmp = NULL;
if (APLOGctrace4(stream->session->c1)) {
int i;
const apr_array_header_t *t_h = apr_table_elts(req->headers);
const apr_table_entry_t *t_elt = (apr_table_entry_t *)t_h->elts;
ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, stream->session->c1,
H2_STRM_MSG(stream,"headers received from client:"));
for (i = 0; i < t_h->nelts; i++, t_elt++) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, stream->session->c1,
H2_STRM_MSG(stream, " %s: %s"),
ap_escape_logitem(stream->pool, t_elt->key),
ap_escape_logitem(stream->pool, t_elt->val));
}
}
}
return status;
}
@@ -911,6 +934,10 @@ static apr_status_t buffer_output_receive(h2_stream *stream)
goto cleanup;
}
if (stream->output_eos) {
rv = APR_BRIGADE_EMPTY(stream->out_buffer)? APR_EOF : APR_SUCCESS;
}
else {
H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
rv = h2_beam_receive(stream->output, stream->session->c1, stream->out_buffer,
APR_NONBLOCK_READ, stream->session->max_stream_mem - buf_len);
@@ -919,6 +946,7 @@ static apr_status_t buffer_output_receive(h2_stream *stream)
H2_STRM_MSG(stream, "out_buffer, receive unsuccessful"));
goto cleanup;
}
}
/* get rid of buckets we have no need for */
if (!APR_BRIGADE_EMPTY(stream->out_buffer)) {
@@ -930,6 +958,9 @@ static apr_status_t buffer_output_receive(h2_stream *stream)
APR_BUCKET_REMOVE(b);
apr_bucket_destroy(b);
}
else if (APR_BUCKET_IS_EOS(b)) {
stream->output_eos = 1;
}
}
else if (b->length == 0) { /* zero length data */
APR_BUCKET_REMOVE(b);
@@ -1281,7 +1312,7 @@ apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount)
return APR_SUCCESS;
}
static apr_off_t buffer_output_data_to_send(h2_stream *stream, int *peos)
static apr_off_t output_data_buffered(h2_stream *stream, int *peos)
{
/* How much data do we have in our buffers that we can write? */
apr_off_t buf_len = 0;
@@ -1380,7 +1411,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
}
/* How much data do we have in our buffers that we can write? */
buf_len = buffer_output_data_to_send(stream, &eos);
buf_len = output_data_buffered(stream, &eos);
if (buf_len < length && !eos) {
/* read more? */
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c1,
@@ -1388,7 +1419,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
session->id, (int)stream_id, (long)length, (long)buf_len);
rv = buffer_output_receive(stream);
/* process all headers sitting at the buffer head. */
while (APR_SUCCESS == rv) {
while (APR_SUCCESS == rv && !eos && !stream->sent_trailers) {
rv = buffer_output_process_headers(stream);
if (APR_SUCCESS != rv && APR_EAGAIN != rv) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, c1,
@@ -1396,17 +1427,20 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
"data_cb, error processing headers"));
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
buf_len = buffer_output_data_to_send(stream, &eos);
buf_len = output_data_buffered(stream, &eos);
}
if (APR_EOF == rv) {
eos = 1;
}
else if (APR_SUCCESS != rv && !APR_STATUS_IS_EAGAIN(rv)) {
if (APR_SUCCESS != rv && !APR_STATUS_IS_EAGAIN(rv)) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, c1,
H2_STRM_LOG(APLOGNO(02938), stream, "data_cb, reading data"));
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
if (stream->sent_trailers) {
AP_DEBUG_ASSERT(eos);
AP_DEBUG_ASSERT(buf_len == 0);
return NGHTTP2_ERR_DEFERRED;
}
}
if (buf_len > (apr_off_t)length) {

View File

@@ -17,6 +17,8 @@
#ifndef __mod_h2__h2_stream__
#define __mod_h2__h2_stream__
#include <http_protocol.h>
#include "h2.h"
/**
@@ -26,7 +28,7 @@
* connection to the client. The h2_session writes to the h2_stream,
* adding HEADERS and DATA and finally an EOS. When headers are done,
* h2_stream is scheduled for handling, which is expected to produce
* RESPONSE buclets.
* RESPONSE buckets.
*/
struct h2_mplx;
@@ -207,8 +209,6 @@ apr_status_t h2_stream_recv_frame(h2_stream *stream, int frame_type, int flags,
apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
const uint8_t *data, size_t len);
apr_status_t h2_stream_flush_input(h2_stream *stream);
/**
* Reset the stream. Stream write/reads will return errors afterwards.
*

View File

@@ -125,6 +125,28 @@ static int h2_protocol_propose(conn_rec *c, request_rec *r,
return proposed? DECLINED : OK;
}
static void remove_output_filters_below(ap_filter_t *f, ap_filter_type ftype)
{
ap_filter_t *fnext;
while (f && f->frec->ftype < ftype) {
fnext = f->next;
ap_remove_output_filter(f);
f = fnext;
}
}
static void remove_input_filters_below(ap_filter_t *f, ap_filter_type ftype)
{
ap_filter_t *fnext;
while (f && f->frec->ftype < ftype) {
fnext = f->next;
ap_remove_input_filter(f);
f = fnext;
}
}
static int h2_protocol_switch(conn_rec *c, request_rec *r, server_rec *s,
const char *protocol)
{
@@ -155,10 +177,11 @@ static int h2_protocol_switch(conn_rec *c, request_rec *r, server_rec *s,
/* Switching in the middle of a request means that
* we have to send out the response to this one in h2
* format. So we need to take over the connection
* right away.
* and remove all old filters with type up to the
* CONNEDCTION/NETWORK ones.
*/
ap_remove_input_filter_byhandle(r->input_filters, "http_in");
ap_remove_output_filter_byhandle(r->output_filters, "HTTP_HEADER");
remove_input_filters_below(r->input_filters, AP_FTYPE_CONNECTION);
remove_output_filters_below(r->output_filters, AP_FTYPE_CONNECTION);
/* Ok, start an h2_conn on this one. */
status = h2_c1_setup(c, r, s);

View File

@@ -21,6 +21,7 @@
#include <mpm_common.h>
#include <httpd.h>
#include <http_connection.h>
#include <http_core.h>
#include <http_log.h>
#include <http_protocol.h>
@@ -256,8 +257,41 @@ static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
/* Get the next c2 from mplx to process. */
while (get_next(slot)) {
ap_assert(slot->connection != NULL);
h2_c2_process(slot->connection, thread, slot->id);
/* See the discussion at <https://github.com/icing/mod_h2/issues/195>
*
* Each conn_rec->id is supposed to be unique at a point in time. Since
* some modules (and maybe external code) uses this id as an identifier
* for the request_rec they handle, it needs to be unique for secondary
* connections also.
*
* The MPM module assigns the connection ids and mod_unique_id is using
* that one to generate identifier for requests. While the implementation
* works for HTTP/1.x, the parallel execution of several requests per
* connection will generate duplicate identifiers on load.
*
* The original implementation for secondary connection identifiers used
* to shift the master connection id up and assign the stream id to the
* lower bits. This was cramped on 32 bit systems, but on 64bit there was
* enough space.
*
* As issue 195 showed, mod_unique_id only uses the lower 32 bit of the
* connection id, even on 64bit systems. Therefore collisions in request ids.
*
* The way master connection ids are generated, there is some space "at the
* top" of the lower 32 bits on allmost all systems. If you have a setup
* with 64k threads per child and 255 child processes, you live on the edge.
*
* The new implementation shifts 8 bits and XORs in the worker
* id. This will experience collisions with > 256 h2 workers and heavy
* load still. There seems to be no way to solve this in all possible
* configurations by mod_h2 alone.
*/
AP_DEBUG_ASSERT(slot->connection != NULL);
slot->connection->id = (slot->connection->master->id << 8)^slot->id;
slot->connection->current_thread = thread;
ap_process_connection(slot->connection, ap_get_conn_socket(slot->connection));
h2_mplx_worker_c2_done(slot->connection);
slot->connection = NULL;
}