mirror of
https://github.com/apache/httpd.git
synced 2025-08-07 04:02:58 +03:00
mod_http2/mod_proxy_http2: proper 100-continue handling up to backend, more robustness on connection reuse with PING frames
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1765318 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
7
CHANGES
7
CHANGES
@@ -1,6 +1,13 @@
|
||||
-*- coding: utf-8 -*-
|
||||
Changes with Apache 2.5.0
|
||||
|
||||
*) mod_http2/mod_proxy_http2: 100-continue handling now properly implemented
|
||||
up to the backend. Reused HTTP/2 proxy connections with more than a second
|
||||
not used will block request bodies until a PING answer is received.
|
||||
Requests headers are not delayed by this, since they are repeatable in
|
||||
case of failure. This greatly increases robustness, especially with
|
||||
busy server and/or low keepalive connections. [Stefan Eissing]
|
||||
|
||||
*) mod_dav: Fix a potential cause of unbounded memory usage or incorrect
|
||||
behavior in a routine that sends <DAV:response>'s to the output filters.
|
||||
[Evgeny Kotkov]
|
||||
|
@@ -1 +1 @@
|
||||
3467
|
||||
3472
|
||||
|
@@ -129,6 +129,8 @@ struct h2_request {
|
||||
|
||||
unsigned int chunked : 1; /* iff requst body needs to be forwarded as chunked */
|
||||
unsigned int serialize : 1; /* iff this request is written in HTTP/1.1 serialization */
|
||||
unsigned int expect_100 : 1; /* iff we need a 100-continue response */
|
||||
unsigned int expect_failed : 1; /* iff we are unable to fullfill expects */
|
||||
};
|
||||
|
||||
typedef struct h2_headers h2_headers;
|
||||
|
@@ -16,6 +16,7 @@
|
||||
#include <assert.h>
|
||||
#include <stdio.h>
|
||||
|
||||
#include <apr_date.h>
|
||||
#include <apr_lib.h>
|
||||
#include <apr_strings.h>
|
||||
|
||||
@@ -296,6 +297,209 @@ static h2_headers *create_response(h2_task *task, request_rec *r)
|
||||
return h2_headers_rcreate(r, r->status, headers, r->pool);
|
||||
}
|
||||
|
||||
typedef enum {
|
||||
H2_RP_STATUS_LINE,
|
||||
H2_RP_HEADER_LINE,
|
||||
H2_RP_DONE
|
||||
} h2_rp_state_t;
|
||||
|
||||
typedef struct h2_response_parser {
|
||||
h2_rp_state_t state;
|
||||
h2_task *task;
|
||||
int http_status;
|
||||
apr_array_header_t *hlines;
|
||||
apr_bucket_brigade *tmp;
|
||||
} h2_response_parser;
|
||||
|
||||
static apr_status_t parse_header(h2_response_parser *parser, char *line) {
|
||||
const char *hline;
|
||||
if (line[0] == ' ' || line[0] == '\t') {
|
||||
char **plast;
|
||||
/* continuation line from the header before this */
|
||||
while (line[0] == ' ' || line[0] == '\t') {
|
||||
++line;
|
||||
}
|
||||
|
||||
plast = apr_array_pop(parser->hlines);
|
||||
if (plast == NULL) {
|
||||
/* not well formed */
|
||||
return APR_EINVAL;
|
||||
}
|
||||
hline = apr_psprintf(parser->task->pool, "%s %s", *plast, line);
|
||||
}
|
||||
else {
|
||||
/* new header line */
|
||||
hline = apr_pstrdup(parser->task->pool, line);
|
||||
}
|
||||
APR_ARRAY_PUSH(parser->hlines, const char*) = hline;
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
|
||||
static apr_status_t get_line(h2_response_parser *parser, apr_bucket_brigade *bb,
|
||||
char *line, apr_size_t len)
|
||||
{
|
||||
h2_task *task = parser->task;
|
||||
apr_status_t status;
|
||||
|
||||
if (!parser->tmp) {
|
||||
parser->tmp = apr_brigade_create(task->pool, task->c->bucket_alloc);
|
||||
}
|
||||
status = apr_brigade_split_line(parser->tmp, bb, APR_BLOCK_READ,
|
||||
HUGE_STRING_LEN);
|
||||
if (status == APR_SUCCESS) {
|
||||
--len;
|
||||
status = apr_brigade_flatten(parser->tmp, line, &len);
|
||||
if (status == APR_SUCCESS) {
|
||||
/* we assume a non-0 containing line and remove trailing crlf. */
|
||||
line[len] = '\0';
|
||||
if (len >= 2 && !strcmp(H2_CRLF, line + len - 2)) {
|
||||
len -= 2;
|
||||
line[len] = '\0';
|
||||
apr_brigade_cleanup(parser->tmp);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c,
|
||||
"h2_task(%s): read response line: %s",
|
||||
task->id, line);
|
||||
}
|
||||
else {
|
||||
/* this does not look like a complete line yet */
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c,
|
||||
"h2_task(%s): read response, incomplete line: %s",
|
||||
task->id, line);
|
||||
return APR_EAGAIN;
|
||||
}
|
||||
}
|
||||
}
|
||||
apr_brigade_cleanup(parser->tmp);
|
||||
return status;
|
||||
}
|
||||
|
||||
static apr_table_t *make_table(h2_response_parser *parser)
|
||||
{
|
||||
h2_task *task = parser->task;
|
||||
apr_array_header_t *hlines = parser->hlines;
|
||||
if (hlines) {
|
||||
apr_table_t *headers = apr_table_make(task->pool, hlines->nelts);
|
||||
int i;
|
||||
|
||||
for (i = 0; i < hlines->nelts; ++i) {
|
||||
char *hline = ((char **)hlines->elts)[i];
|
||||
char *sep = ap_strchr(hline, ':');
|
||||
if (!sep) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_WARNING, APR_EINVAL, task->c,
|
||||
APLOGNO(02955) "h2_task(%s): invalid header[%d] '%s'",
|
||||
task->id, i, (char*)hline);
|
||||
/* not valid format, abort */
|
||||
return NULL;
|
||||
}
|
||||
(*sep++) = '\0';
|
||||
while (*sep == ' ' || *sep == '\t') {
|
||||
++sep;
|
||||
}
|
||||
|
||||
if (!h2_util_ignore_header(hline)) {
|
||||
apr_table_merge(headers, hline, sep);
|
||||
}
|
||||
}
|
||||
return headers;
|
||||
}
|
||||
else {
|
||||
return apr_table_make(task->pool, 0);
|
||||
}
|
||||
}
|
||||
|
||||
static apr_status_t pass_response(h2_task *task, ap_filter_t *f,
|
||||
h2_response_parser *parser)
|
||||
{
|
||||
apr_bucket *b;
|
||||
apr_status_t status;
|
||||
|
||||
h2_headers *response = h2_headers_create(parser->http_status,
|
||||
make_table(parser),
|
||||
NULL, task->pool);
|
||||
apr_brigade_cleanup(parser->tmp);
|
||||
b = h2_bucket_headers_create(task->c->bucket_alloc, response);
|
||||
APR_BRIGADE_INSERT_TAIL(parser->tmp, b);
|
||||
b = apr_bucket_flush_create(task->c->bucket_alloc);
|
||||
APR_BRIGADE_INSERT_TAIL(parser->tmp, b);
|
||||
status = ap_pass_brigade(f->next, parser->tmp);
|
||||
apr_brigade_cleanup(parser->tmp);
|
||||
|
||||
parser->state = H2_RP_DONE;
|
||||
task->output.parse_response = 0;
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c,
|
||||
APLOGNO(03197) "h2_task(%s): passed response %d",
|
||||
task->id, response->status);
|
||||
return status;
|
||||
}
|
||||
|
||||
static apr_status_t parse_status(h2_task *task, char *line)
|
||||
{
|
||||
h2_response_parser *parser = task->output.rparser;
|
||||
int sindex = (apr_date_checkmask(line, "HTTP/#.# ###*")? 9 :
|
||||
(apr_date_checkmask(line, "HTTP/# ###*")? 7 : 0));
|
||||
if (sindex > 0) {
|
||||
int k = sindex + 3;
|
||||
char keepchar = line[k];
|
||||
line[k] = '\0';
|
||||
parser->http_status = atoi(&line[sindex]);
|
||||
line[k] = keepchar;
|
||||
parser->state = H2_RP_HEADER_LINE;
|
||||
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, task->c, APLOGNO(03467)
|
||||
"h2_task(%s): unable to parse status line: %s",
|
||||
task->id, line);
|
||||
return APR_EINVAL;
|
||||
}
|
||||
|
||||
apr_status_t h2_from_h1_parse_response(h2_task *task, ap_filter_t *f,
|
||||
apr_bucket_brigade *bb)
|
||||
{
|
||||
h2_response_parser *parser = task->output.rparser;
|
||||
char line[HUGE_STRING_LEN];
|
||||
apr_status_t status = APR_SUCCESS;
|
||||
|
||||
if (!parser) {
|
||||
parser = apr_pcalloc(task->pool, sizeof(*parser));
|
||||
parser->task = task;
|
||||
parser->state = H2_RP_STATUS_LINE;
|
||||
parser->hlines = apr_array_make(task->pool, 10, sizeof(char *));
|
||||
task->output.rparser = parser;
|
||||
}
|
||||
|
||||
while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) {
|
||||
switch (parser->state) {
|
||||
case H2_RP_STATUS_LINE:
|
||||
case H2_RP_HEADER_LINE:
|
||||
status = get_line(parser, bb, line, sizeof(line));
|
||||
if (status == APR_EAGAIN) {
|
||||
/* need more data */
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
else if (status != APR_SUCCESS) {
|
||||
return status;
|
||||
}
|
||||
if (parser->state == H2_RP_STATUS_LINE) {
|
||||
/* instead of parsing, just take it directly */
|
||||
status = parse_status(task, line);
|
||||
}
|
||||
else if (line[0] == '\0') {
|
||||
/* end of headers, pass response onward */
|
||||
return pass_response(task, f, parser);
|
||||
}
|
||||
else {
|
||||
status = parse_header(parser, line);
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
return status;
|
||||
}
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
apr_status_t h2_filter_headers_out(ap_filter_t *f, apr_bucket_brigade *bb)
|
||||
{
|
||||
h2_task *task = f->ctx;
|
||||
@@ -354,7 +558,6 @@ apr_status_t h2_filter_headers_out(ap_filter_t *f, apr_bucket_brigade *bb)
|
||||
|
||||
bresp = h2_bucket_headers_create(f->c->bucket_alloc, response);
|
||||
APR_BUCKET_INSERT_BEFORE(body_bucket, bresp);
|
||||
/*APR_BRIGADE_INSERT_HEAD(bb, bresp);*/
|
||||
task->output.sent_response = 1;
|
||||
r->sent_bodyct = 1;
|
||||
}
|
||||
@@ -383,7 +586,7 @@ apr_status_t h2_filter_headers_out(ap_filter_t *f, apr_bucket_brigade *bb)
|
||||
}
|
||||
|
||||
static void make_chunk(h2_task *task, apr_bucket_brigade *bb,
|
||||
apr_bucket *first, apr_uint64_t chunk_len,
|
||||
apr_bucket *first, apr_off_t chunk_len,
|
||||
apr_bucket *tail)
|
||||
{
|
||||
/* Surround the buckets [first, tail[ with new buckets carrying the
|
||||
@@ -394,7 +597,7 @@ static void make_chunk(h2_task *task, apr_bucket_brigade *bb,
|
||||
int len;
|
||||
|
||||
len = apr_snprintf(buffer, H2_ALEN(buffer),
|
||||
"%"APR_UINT64_T_HEX_FMT"\r\n", chunk_len);
|
||||
"%"APR_UINT64_T_HEX_FMT"\r\n", (apr_uint64_t)chunk_len);
|
||||
c = apr_bucket_heap_create(buffer, len, NULL, bb->bucket_alloc);
|
||||
APR_BUCKET_INSERT_BEFORE(first, c);
|
||||
c = apr_bucket_heap_create("\r\n", 2, NULL, bb->bucket_alloc);
|
||||
@@ -404,6 +607,10 @@ static void make_chunk(h2_task *task, apr_bucket_brigade *bb,
|
||||
else {
|
||||
APR_BRIGADE_INSERT_TAIL(bb, c);
|
||||
}
|
||||
task->input.chunked_total += chunk_len;
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, task->c,
|
||||
"h2_task(%s): added chunk %"APR_OFF_T_FMT", total %"
|
||||
APR_OFF_T_FMT, task->id, chunk_len, task->input.chunked_total);
|
||||
}
|
||||
|
||||
static int ser_header(void *ctx, const char *name, const char *value)
|
||||
@@ -413,6 +620,100 @@ static int ser_header(void *ctx, const char *name, const char *value)
|
||||
return 1;
|
||||
}
|
||||
|
||||
static apr_status_t read_and_chunk(ap_filter_t *f, h2_task *task,
|
||||
apr_read_type_e block) {
|
||||
request_rec *r = f->r;
|
||||
apr_status_t status = APR_SUCCESS;
|
||||
apr_bucket_brigade *bb = task->input.bbchunk;
|
||||
|
||||
if (!bb) {
|
||||
bb = apr_brigade_create(r->pool, f->c->bucket_alloc);
|
||||
task->input.bbchunk = bb;
|
||||
}
|
||||
|
||||
if (APR_BRIGADE_EMPTY(bb)) {
|
||||
apr_bucket *b, *next, *first_data = NULL;
|
||||
apr_bucket_brigade *tmp;
|
||||
apr_off_t bblen = 0;
|
||||
|
||||
/* get more data from the lower layer filters. Always do this
|
||||
* in larger pieces, since we handle the read modes ourself. */
|
||||
status = ap_get_brigade(f->next, bb,
|
||||
AP_MODE_READBYTES, block, 32*1024);
|
||||
if (status == APR_EOF) {
|
||||
if (!task->input.eos) {
|
||||
status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
|
||||
task->input.eos = 1;
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
ap_remove_input_filter(f);
|
||||
return status;
|
||||
|
||||
}
|
||||
else if (status != APR_SUCCESS) {
|
||||
return status;
|
||||
}
|
||||
|
||||
for (b = APR_BRIGADE_FIRST(bb);
|
||||
b != APR_BRIGADE_SENTINEL(bb) && !task->input.eos;
|
||||
b = next) {
|
||||
next = APR_BUCKET_NEXT(b);
|
||||
if (APR_BUCKET_IS_METADATA(b)) {
|
||||
if (first_data) {
|
||||
make_chunk(task, bb, first_data, bblen, b);
|
||||
first_data = NULL;
|
||||
}
|
||||
|
||||
if (H2_BUCKET_IS_HEADERS(b)) {
|
||||
h2_headers *headers = h2_bucket_headers_get(b);
|
||||
|
||||
ap_assert(headers);
|
||||
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
|
||||
"h2_task(%s): receiving trailers", task->id);
|
||||
tmp = apr_brigade_split_ex(bb, b, NULL);
|
||||
if (!apr_is_empty_table(headers->headers)) {
|
||||
status = apr_brigade_puts(bb, NULL, NULL, "0\r\n");
|
||||
apr_table_do(ser_header, bb, headers->headers, NULL);
|
||||
status = apr_brigade_puts(bb, NULL, NULL, "\r\n");
|
||||
}
|
||||
else {
|
||||
status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
|
||||
}
|
||||
r->trailers_in = apr_table_clone(r->pool, headers->headers);
|
||||
APR_BUCKET_REMOVE(b);
|
||||
apr_bucket_destroy(b);
|
||||
APR_BRIGADE_CONCAT(bb, tmp);
|
||||
apr_brigade_destroy(tmp);
|
||||
task->input.eos = 1;
|
||||
}
|
||||
else if (APR_BUCKET_IS_EOS(b)) {
|
||||
tmp = apr_brigade_split_ex(bb, b, NULL);
|
||||
status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
|
||||
APR_BRIGADE_CONCAT(bb, tmp);
|
||||
apr_brigade_destroy(tmp);
|
||||
task->input.eos = 1;
|
||||
}
|
||||
}
|
||||
else if (b->length == 0) {
|
||||
APR_BUCKET_REMOVE(b);
|
||||
apr_bucket_destroy(b);
|
||||
}
|
||||
else {
|
||||
if (!first_data) {
|
||||
first_data = b;
|
||||
bblen = 0;
|
||||
}
|
||||
bblen += b->length;
|
||||
}
|
||||
}
|
||||
|
||||
if (first_data) {
|
||||
make_chunk(task, bb, first_data, bblen, NULL);
|
||||
}
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
apr_status_t h2_filter_request_in(ap_filter_t* f,
|
||||
apr_bucket_brigade* bb,
|
||||
ap_input_mode_t mode,
|
||||
@@ -422,9 +723,10 @@ apr_status_t h2_filter_request_in(ap_filter_t* f,
|
||||
h2_task *task = f->ctx;
|
||||
request_rec *r = f->r;
|
||||
apr_status_t status = APR_SUCCESS;
|
||||
apr_bucket *b, *next, *first_data = NULL;
|
||||
apr_off_t bblen = 0;
|
||||
apr_bucket *b, *next;
|
||||
|
||||
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, f->r,
|
||||
"h2_task(%s): request filter, exp=%d", task->id, r->expecting_100);
|
||||
if (!task->input.chunked) {
|
||||
status = ap_get_brigade(f->next, bb, mode, block, readbytes);
|
||||
/* pipe data through, just take care of trailers */
|
||||
@@ -451,90 +753,8 @@ apr_status_t h2_filter_request_in(ap_filter_t* f,
|
||||
* transfer encoding and trailers.
|
||||
* We need to simulate chunked encoding for it to be happy.
|
||||
*/
|
||||
|
||||
if (!task->input.bbchunk) {
|
||||
task->input.bbchunk = apr_brigade_create(r->pool, f->c->bucket_alloc);
|
||||
}
|
||||
if (APR_BRIGADE_EMPTY(task->input.bbchunk)) {
|
||||
/* get more data from the lower layer filters. Always do this
|
||||
* in larger pieces, since we handle the read modes ourself.
|
||||
*/
|
||||
status = ap_get_brigade(f->next, task->input.bbchunk,
|
||||
AP_MODE_READBYTES, block, 32*1024);
|
||||
if (status == APR_EOF) {
|
||||
if (!task->input.eos) {
|
||||
status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
|
||||
task->input.eos = 1;
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
ap_remove_input_filter(f);
|
||||
if ((status = read_and_chunk(f, task, block)) != APR_SUCCESS) {
|
||||
return status;
|
||||
|
||||
}
|
||||
else if (status != APR_SUCCESS) {
|
||||
return status;
|
||||
}
|
||||
|
||||
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
|
||||
"h2_task(%s): trailers_in inspecting brigade", task->id);
|
||||
for (b = APR_BRIGADE_FIRST(task->input.bbchunk);
|
||||
b != APR_BRIGADE_SENTINEL(task->input.bbchunk) && !task->input.eos;
|
||||
b = next) {
|
||||
next = APR_BUCKET_NEXT(b);
|
||||
if (APR_BUCKET_IS_METADATA(b)) {
|
||||
if (first_data) {
|
||||
make_chunk(task, task->input.bbchunk, first_data, bblen, b);
|
||||
first_data = NULL;
|
||||
bblen = 0;
|
||||
}
|
||||
|
||||
if (H2_BUCKET_IS_HEADERS(b)) {
|
||||
apr_bucket_brigade *tmp;
|
||||
h2_headers *headers = h2_bucket_headers_get(b);
|
||||
|
||||
ap_assert(headers);
|
||||
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
|
||||
"h2_task(%s): receiving trailers", task->id);
|
||||
tmp = apr_brigade_split_ex(task->input.bbchunk, b, NULL);
|
||||
if (!apr_is_empty_table(headers->headers)) {
|
||||
status = apr_brigade_puts(task->input.bbchunk, NULL, NULL, "0\r\n");
|
||||
apr_table_do(ser_header, task->input.bbchunk, headers->headers, NULL);
|
||||
status = apr_brigade_puts(task->input.bbchunk, NULL, NULL, "\r\n");
|
||||
}
|
||||
else {
|
||||
status = apr_brigade_puts(task->input.bbchunk, NULL, NULL, "0\r\n\r\n");
|
||||
}
|
||||
APR_BRIGADE_CONCAT(task->input.bbchunk, tmp);
|
||||
apr_brigade_destroy(tmp);
|
||||
r->trailers_in = apr_table_clone(r->pool, headers->headers);
|
||||
APR_BUCKET_REMOVE(b);
|
||||
apr_bucket_destroy(b);
|
||||
task->input.eos = 1;
|
||||
}
|
||||
else if (APR_BUCKET_IS_EOS(b)) {
|
||||
apr_bucket_brigade *tmp = apr_brigade_split_ex(task->input.bbchunk, b, NULL);
|
||||
status = apr_brigade_puts(task->input.bbchunk, NULL, NULL, "0\r\n\r\n");
|
||||
APR_BRIGADE_CONCAT(task->input.bbchunk, tmp);
|
||||
apr_brigade_destroy(tmp);
|
||||
task->input.eos = 1;
|
||||
}
|
||||
break;
|
||||
}
|
||||
else if (b->length == 0) {
|
||||
APR_BUCKET_REMOVE(b);
|
||||
apr_bucket_destroy(b);
|
||||
}
|
||||
else {
|
||||
if (!first_data) {
|
||||
first_data = b;
|
||||
}
|
||||
bblen += b->length;
|
||||
}
|
||||
}
|
||||
|
||||
if (first_data) {
|
||||
make_chunk(task, task->input.bbchunk, first_data, bblen, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
if (mode == AP_MODE_EXHAUSTIVE) {
|
||||
|
@@ -33,6 +33,9 @@
|
||||
struct h2_headers;
|
||||
struct h2_task;
|
||||
|
||||
apr_status_t h2_from_h1_parse_response(struct h2_task *task, ap_filter_t *f,
|
||||
apr_bucket_brigade *bb);
|
||||
|
||||
apr_status_t h2_filter_headers_out(ap_filter_t *f, apr_bucket_brigade *bb);
|
||||
|
||||
apr_status_t h2_filter_request_in(ap_filter_t* f,
|
||||
|
@@ -1195,7 +1195,7 @@ static apr_status_t unschedule_slow_tasks(h2_mplx *m)
|
||||
/* Try to get rid of streams that occupy workers. Look for safe requests
|
||||
* that are repeatable. If none found, fail the connection.
|
||||
*/
|
||||
n = (m->workers_busy - m->workers_limit - h2_ihash_count(m->redo_tasks));
|
||||
n = (m->workers_busy - m->workers_limit - (int)h2_ihash_count(m->redo_tasks));
|
||||
while (n > 0 && (task = get_latest_repeatable_unsubmitted_task(m))) {
|
||||
h2_task_rst(task, H2_ERR_CANCEL);
|
||||
h2_ihash_add(m->redo_tasks, task);
|
||||
|
@@ -254,7 +254,7 @@ apr_status_t h2_ngn_shed_pull_request(h2_ngn_shed *shed,
|
||||
|
||||
AP_DEBUG_ASSERT(ngn);
|
||||
*pr = NULL;
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c, APLOGNO(03396)
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, shed->c, APLOGNO(03396)
|
||||
"h2_ngn_shed(%ld): pull task for engine %s, shutdown=%d",
|
||||
shed->c->id, ngn->id, want_shutdown);
|
||||
if (shed->aborted) {
|
||||
|
@@ -40,12 +40,14 @@ typedef struct h2_proxy_stream {
|
||||
|
||||
h2_stream_state_t state;
|
||||
unsigned int suspended : 1;
|
||||
unsigned int data_sent : 1;
|
||||
unsigned int data_received : 1;
|
||||
unsigned int waiting_on_100 : 1;
|
||||
unsigned int waiting_on_ping : 1;
|
||||
uint32_t error_code;
|
||||
|
||||
apr_bucket_brigade *input;
|
||||
apr_off_t data_sent;
|
||||
apr_bucket_brigade *output;
|
||||
apr_off_t data_received;
|
||||
|
||||
apr_table_t *saves;
|
||||
} h2_proxy_stream;
|
||||
@@ -53,6 +55,9 @@ typedef struct h2_proxy_stream {
|
||||
|
||||
static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev,
|
||||
int arg, const char *msg);
|
||||
static void ping_arrived(h2_proxy_session *session);
|
||||
static apr_status_t check_suspended(h2_proxy_session *session);
|
||||
static void stream_resume(h2_proxy_stream *stream);
|
||||
|
||||
|
||||
static apr_status_t proxy_session_pre_close(void *theconn)
|
||||
@@ -131,6 +136,8 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
|
||||
void *user_data)
|
||||
{
|
||||
h2_proxy_session *session = user_data;
|
||||
h2_proxy_stream *stream;
|
||||
request_rec *r;
|
||||
int n;
|
||||
|
||||
if (APLOGcdebug(session->c)) {
|
||||
@@ -142,8 +149,29 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
|
||||
session->id, buffer);
|
||||
}
|
||||
|
||||
session->last_frame_received = apr_time_now();
|
||||
switch (frame->hd.type) {
|
||||
case NGHTTP2_HEADERS:
|
||||
stream = nghttp2_session_get_stream_user_data(ngh2, frame->hd.stream_id);
|
||||
if (!stream) {
|
||||
return NGHTTP2_ERR_CALLBACK_FAILURE;
|
||||
}
|
||||
r = stream->r;
|
||||
if (r->status >= 100 && r->status < 200) {
|
||||
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
|
||||
"h2_proxy_session(%s): got interim HEADERS, status=%d",
|
||||
session->id, r->status);
|
||||
r->status_line = ap_get_status_line(r->status);
|
||||
ap_send_interim_response(r, 1);
|
||||
}
|
||||
stream->waiting_on_100 = 0;
|
||||
stream_resume(stream);
|
||||
break;
|
||||
case NGHTTP2_PING:
|
||||
if (session->check_ping) {
|
||||
session->check_ping = 0;
|
||||
ping_arrived(session);
|
||||
}
|
||||
break;
|
||||
case NGHTTP2_PUSH_PROMISE:
|
||||
break;
|
||||
@@ -320,7 +348,7 @@ static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream)
|
||||
}
|
||||
}
|
||||
|
||||
static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
|
||||
static int stream_response_data(nghttp2_session *ngh2, uint8_t flags,
|
||||
int32_t stream_id, const uint8_t *data,
|
||||
size_t len, void *user_data)
|
||||
{
|
||||
@@ -342,8 +370,8 @@ static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
|
||||
/* last chance to manipulate response headers.
|
||||
* after this, only trailers */
|
||||
h2_proxy_stream_end_headers_out(stream);
|
||||
stream->data_received = 1;
|
||||
}
|
||||
stream->data_received += len;
|
||||
|
||||
b = apr_bucket_transient_create((const char*)data, len,
|
||||
stream->r->connection->bucket_alloc);
|
||||
@@ -353,10 +381,11 @@ static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
|
||||
b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
|
||||
APR_BRIGADE_INSERT_TAIL(stream->output, b);
|
||||
|
||||
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r, APLOGNO(03359)
|
||||
"h2_proxy_session(%s): pass response data for "
|
||||
"stream %d, %d bytes", session->id, stream_id, (int)len);
|
||||
status = ap_pass_brigade(stream->r->output_filters, stream->output);
|
||||
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, stream->r, APLOGNO(03359)
|
||||
"h2_proxy_session(%s): stream=%d, response DATA %ld, %"
|
||||
APR_OFF_T_FMT " total", session->id, stream_id, (long)len,
|
||||
stream->data_received);
|
||||
if (status != APR_SUCCESS) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03344)
|
||||
"h2_proxy_session(%s): passing output on stream %d",
|
||||
@@ -417,7 +446,7 @@ static int on_header(nghttp2_session *ngh2, const nghttp2_frame *frame,
|
||||
return 0;
|
||||
}
|
||||
|
||||
static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id,
|
||||
static ssize_t stream_request_data(nghttp2_session *ngh2, int32_t stream_id,
|
||||
uint8_t *buf, size_t length,
|
||||
uint32_t *data_flags,
|
||||
nghttp2_data_source *source, void *user_data)
|
||||
@@ -434,7 +463,17 @@ static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id,
|
||||
return NGHTTP2_ERR_CALLBACK_FAILURE;
|
||||
}
|
||||
|
||||
if (APR_BRIGADE_EMPTY(stream->input)) {
|
||||
if (stream->session->check_ping) {
|
||||
/* suspend until we hear from the other side */
|
||||
stream->waiting_on_ping = 1;
|
||||
status = APR_EAGAIN;
|
||||
}
|
||||
else if (stream->r->expecting_100) {
|
||||
/* suspend until the answer comes */
|
||||
stream->waiting_on_100 = 1;
|
||||
status = APR_EAGAIN;
|
||||
}
|
||||
else if (APR_BRIGADE_EMPTY(stream->input)) {
|
||||
status = ap_get_brigade(stream->r->input_filters, stream->input,
|
||||
AP_MODE_READBYTES, APR_NONBLOCK_READ,
|
||||
H2MAX(APR_BUCKET_BUFF_SIZE, length));
|
||||
@@ -476,10 +515,12 @@ static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id,
|
||||
apr_bucket_delete(b);
|
||||
}
|
||||
|
||||
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r,
|
||||
"h2_proxy_stream(%d): request body read %ld bytes, flags=%d",
|
||||
stream->id, (long)readlen, (int)*data_flags);
|
||||
stream->data_sent = 1;
|
||||
stream->data_sent += readlen;
|
||||
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, stream->r, APLOGNO(03468)
|
||||
"h2_proxy_stream(%d): request DATA %ld, %"
|
||||
APR_OFF_T_FMT" total, flags=%d",
|
||||
stream->id, (long)readlen, stream->data_sent,
|
||||
(int)*data_flags);
|
||||
return readlen;
|
||||
}
|
||||
else if (APR_STATUS_IS_EAGAIN(status)) {
|
||||
@@ -498,6 +539,27 @@ static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id,
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef H2_NG2_INVALID_HEADER_CB
|
||||
static int on_invalid_header_cb(nghttp2_session *ngh2,
|
||||
const nghttp2_frame *frame,
|
||||
const uint8_t *name, size_t namelen,
|
||||
const uint8_t *value, size_t valuelen,
|
||||
uint8_t flags, void *user_data)
|
||||
{
|
||||
h2_proxy_session *session = user_data;
|
||||
if (APLOGcdebug(session->c)) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03469)
|
||||
"h2_proxy_session(%s-%d): denying stream with invalid header "
|
||||
"'%s: %s'", session->id, (int)frame->hd.stream_id,
|
||||
apr_pstrndup(session->pool, (const char *)name, namelen),
|
||||
apr_pstrndup(session->pool, (const char *)value, valuelen));
|
||||
}
|
||||
return nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
|
||||
frame->hd.stream_id,
|
||||
NGHTTP2_PROTOCOL_ERROR);
|
||||
}
|
||||
#endif
|
||||
|
||||
h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
|
||||
proxy_server_conf *conf,
|
||||
unsigned char window_bits_connection,
|
||||
@@ -531,11 +593,14 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
|
||||
|
||||
nghttp2_session_callbacks_new(&cbs);
|
||||
nghttp2_session_callbacks_set_on_frame_recv_callback(cbs, on_frame_recv);
|
||||
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(cbs, on_data_chunk_recv);
|
||||
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(cbs, stream_response_data);
|
||||
nghttp2_session_callbacks_set_on_stream_close_callback(cbs, on_stream_close);
|
||||
nghttp2_session_callbacks_set_on_header_callback(cbs, on_header);
|
||||
nghttp2_session_callbacks_set_before_frame_send_callback(cbs, before_frame_send);
|
||||
nghttp2_session_callbacks_set_send_callback(cbs, raw_send);
|
||||
#ifdef H2_NG2_INVALID_HEADER_CB
|
||||
nghttp2_session_callbacks_set_on_invalid_header_callback(cbs, on_invalid_header_cb);
|
||||
#endif
|
||||
|
||||
nghttp2_option_new(&option);
|
||||
nghttp2_option_set_peer_max_concurrent_streams(option, 100);
|
||||
@@ -549,6 +614,14 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03362)
|
||||
"setup session for %s", p_conn->hostname);
|
||||
}
|
||||
else {
|
||||
h2_proxy_session *session = p_conn->data;
|
||||
apr_interval_time_t age = apr_time_now() - session->last_frame_received;
|
||||
if (age > apr_time_from_sec(1)) {
|
||||
session->check_ping = 1;
|
||||
nghttp2_submit_ping(session->ngh2, 0, (const uint8_t *)"nevergonnagiveyouup");
|
||||
}
|
||||
}
|
||||
return p_conn->data;
|
||||
}
|
||||
|
||||
@@ -634,33 +707,38 @@ static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *st
|
||||
h2_proxy_ngheader *hd;
|
||||
nghttp2_data_provider *pp = NULL;
|
||||
nghttp2_data_provider provider;
|
||||
int rv;
|
||||
int rv, may_have_request_body = 1;
|
||||
apr_status_t status;
|
||||
|
||||
hd = h2_proxy_util_nghd_make_req(stream->pool, stream->req);
|
||||
|
||||
/* If we expect a 100-continue response, we must refrain from reading
|
||||
any input until we get it. Reading the input will possibly trigger
|
||||
HTTP_IN filter to generate the 100-continue itself. */
|
||||
if (stream->waiting_on_100 || stream->waiting_on_ping) {
|
||||
/* make a small test if we get an EOF/EOS immediately */
|
||||
status = ap_get_brigade(stream->r->input_filters, stream->input,
|
||||
AP_MODE_READBYTES, APR_NONBLOCK_READ,
|
||||
APR_BUCKET_BUFF_SIZE);
|
||||
if ((status == APR_SUCCESS && !APR_BUCKET_IS_EOS(APR_BRIGADE_FIRST(stream->input)))
|
||||
|| APR_STATUS_IS_EAGAIN(status)) {
|
||||
/* there might be data coming */
|
||||
may_have_request_body = APR_STATUS_IS_EAGAIN(status)
|
||||
|| (status == APR_SUCCESS
|
||||
&& !APR_BUCKET_IS_EOS(APR_BRIGADE_FIRST(stream->input)));
|
||||
}
|
||||
|
||||
if (may_have_request_body) {
|
||||
provider.source.fd = 0;
|
||||
provider.source.ptr = NULL;
|
||||
provider.read_callback = stream_data_read;
|
||||
provider.read_callback = stream_request_data;
|
||||
pp = &provider;
|
||||
}
|
||||
|
||||
rv = nghttp2_submit_request(session->ngh2, NULL,
|
||||
hd->nv, hd->nvlen, pp, stream);
|
||||
|
||||
if (APLOGcdebug(session->c)) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03363)
|
||||
"h2_proxy_session(%s): submit %s%s -> %d",
|
||||
session->id, stream->req->authority, stream->req->path,
|
||||
rv);
|
||||
}
|
||||
|
||||
if (rv > 0) {
|
||||
stream->id = rv;
|
||||
stream->state = H2_STREAM_ST_OPEN;
|
||||
@@ -747,7 +825,7 @@ static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block,
|
||||
AP_MODE_READBYTES,
|
||||
block? APR_BLOCK_READ : APR_NONBLOCK_READ,
|
||||
64 * 1024);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, session->c,
|
||||
"h2_proxy_session(%s): read from conn", session->id);
|
||||
if (socket && save_timeout != -1) {
|
||||
apr_socket_timeout_set(socket, save_timeout);
|
||||
@@ -788,6 +866,18 @@ apr_status_t h2_proxy_session_submit(h2_proxy_session *session,
|
||||
return status;
|
||||
}
|
||||
|
||||
static void stream_resume(h2_proxy_stream *stream)
|
||||
{
|
||||
h2_proxy_session *session = stream->session;
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
|
||||
"h2_proxy_stream(%s-%d): resuming",
|
||||
session->id, stream->id);
|
||||
stream->suspended = 0;
|
||||
h2_proxy_iq_remove(session->suspended, stream->id);
|
||||
nghttp2_session_resume_data(session->ngh2, stream->id);
|
||||
dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
|
||||
}
|
||||
|
||||
static apr_status_t check_suspended(h2_proxy_session *session)
|
||||
{
|
||||
h2_proxy_stream *stream;
|
||||
@@ -798,17 +888,16 @@ static apr_status_t check_suspended(h2_proxy_session *session)
|
||||
stream_id = session->suspended->elts[i];
|
||||
stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
|
||||
if (stream) {
|
||||
if (stream->waiting_on_100 || stream->waiting_on_ping) {
|
||||
status = APR_EAGAIN;
|
||||
}
|
||||
else {
|
||||
status = ap_get_brigade(stream->r->input_filters, stream->input,
|
||||
AP_MODE_READBYTES, APR_NONBLOCK_READ,
|
||||
APR_BUCKET_BUFF_SIZE);
|
||||
}
|
||||
if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(stream->input)) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
|
||||
"h2_proxy_stream(%s-%d): resuming",
|
||||
session->id, stream_id);
|
||||
stream->suspended = 0;
|
||||
h2_proxy_iq_remove(session->suspended, stream_id);
|
||||
nghttp2_session_resume_data(session->ngh2, stream_id);
|
||||
dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
|
||||
stream_resume(stream);
|
||||
check_suspended(session);
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
@@ -816,8 +905,7 @@ static apr_status_t check_suspended(h2_proxy_session *session)
|
||||
ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, session->c,
|
||||
APLOGNO(03382) "h2_proxy_stream(%s-%d): check input",
|
||||
session->id, stream_id);
|
||||
h2_proxy_iq_remove(session->suspended, stream_id);
|
||||
dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
|
||||
stream_resume(stream);
|
||||
check_suspended(session);
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
@@ -1052,13 +1140,16 @@ static void ev_stream_done(h2_proxy_session *session, int stream_id,
|
||||
int complete = (stream->error_code == 0);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03364)
|
||||
"h2_proxy_sesssion(%s): stream(%d) closed "
|
||||
"(complete=%d, touched=%d)",
|
||||
session->id, stream_id, complete, touched);
|
||||
"(touched=%d, error=%d)",
|
||||
session->id, stream_id, touched, stream->error_code);
|
||||
|
||||
if (complete && !stream->data_received) {
|
||||
if (!complete) {
|
||||
stream->r->status = 500;
|
||||
}
|
||||
else if (!stream->data_received) {
|
||||
apr_bucket *b;
|
||||
/* if the response had no body, this is the time to flush
|
||||
* an empty brigade which will also "write" the resonse
|
||||
* an empty brigade which will also write the resonse
|
||||
* headers */
|
||||
h2_proxy_stream_end_headers_out(stream);
|
||||
stream->data_received = 1;
|
||||
@@ -1247,7 +1338,7 @@ run_loop:
|
||||
}
|
||||
|
||||
status = h2_proxy_session_read(session, 1, session->wait_timeout);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, session->c,
|
||||
APLOGNO(03365)
|
||||
"h2_proxy_session(%s): WAIT read, timeout=%fms",
|
||||
session->id, (float)session->wait_timeout/1000.0);
|
||||
@@ -1299,7 +1390,7 @@ static int done_iter(void *udata, void *val)
|
||||
{
|
||||
cleanup_iter_ctx *ctx = udata;
|
||||
h2_proxy_stream *stream = val;
|
||||
int touched = (!ctx->session->last_stream_id ||
|
||||
int touched = (stream->data_sent ||
|
||||
stream->id <= ctx->session->last_stream_id);
|
||||
ctx->done(ctx->session, stream->r, 0, touched);
|
||||
return 1;
|
||||
@@ -1308,7 +1399,7 @@ static int done_iter(void *udata, void *val)
|
||||
void h2_proxy_session_cleanup(h2_proxy_session *session,
|
||||
h2_proxy_request_done *done)
|
||||
{
|
||||
if (session->streams && !h2_proxy_ihash_empty(session->streams)) {
|
||||
if (!h2_proxy_ihash_empty(session->streams)) {
|
||||
cleanup_iter_ctx ctx;
|
||||
ctx.session = session;
|
||||
ctx.done = done;
|
||||
@@ -1320,6 +1411,26 @@ void h2_proxy_session_cleanup(h2_proxy_session *session,
|
||||
}
|
||||
}
|
||||
|
||||
static int ping_arrived_iter(void *udata, void *val)
|
||||
{
|
||||
h2_proxy_stream *stream = val;
|
||||
if (stream->waiting_on_ping) {
|
||||
stream->waiting_on_ping = 0;
|
||||
stream_resume(stream);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
static void ping_arrived(h2_proxy_session *session)
|
||||
{
|
||||
if (!h2_proxy_ihash_empty(session->streams)) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03470)
|
||||
"h2_proxy_session(%s): ping arrived, unblocking streams",
|
||||
session->id);
|
||||
h2_proxy_ihash_iter(session->streams, ping_arrived_iter, &session);
|
||||
}
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
h2_proxy_session *session;
|
||||
conn_rec *c;
|
||||
@@ -1347,7 +1458,7 @@ static int win_update_iter(void *udata, void *val)
|
||||
void h2_proxy_session_update_window(h2_proxy_session *session,
|
||||
conn_rec *c, apr_off_t bytes)
|
||||
{
|
||||
if (session->streams && !h2_proxy_ihash_empty(session->streams)) {
|
||||
if (!h2_proxy_ihash_empty(session->streams)) {
|
||||
win_update_ctx ctx;
|
||||
ctx.session = session;
|
||||
ctx.c = c;
|
||||
|
@@ -63,6 +63,7 @@ struct h2_proxy_session {
|
||||
nghttp2_session *ngh2; /* the nghttp2 session itself */
|
||||
|
||||
unsigned int aborted : 1;
|
||||
unsigned int check_ping : 1;
|
||||
|
||||
h2_proxy_request_done *done;
|
||||
void *user_data;
|
||||
@@ -77,6 +78,7 @@ struct h2_proxy_session {
|
||||
struct h2_proxy_iqueue *suspended;
|
||||
apr_size_t remote_max_concurrent;
|
||||
int last_stream_id; /* last stream id processed by backend, or 0 */
|
||||
apr_time_t last_frame_received;
|
||||
|
||||
apr_bucket_brigade *input;
|
||||
apr_bucket_brigade *output;
|
||||
|
@@ -458,7 +458,6 @@ typedef struct {
|
||||
#define H2_LIT_ARGS(a) (a),H2_ALEN(a)
|
||||
|
||||
static literal IgnoredRequestHeaders[] = {
|
||||
H2_DEF_LITERAL("expect"),
|
||||
H2_DEF_LITERAL("upgrade"),
|
||||
H2_DEF_LITERAL("connection"),
|
||||
H2_DEF_LITERAL("keep-alive"),
|
||||
|
@@ -163,10 +163,10 @@ static char *mk_str(link_ctx *ctx, size_t end)
|
||||
if (ctx->i < end) {
|
||||
return apr_pstrndup(ctx->pool, ctx->s + ctx->i, end - ctx->i);
|
||||
}
|
||||
return "";
|
||||
return (char*)"";
|
||||
}
|
||||
|
||||
static int read_qstring(link_ctx *ctx, char **ps)
|
||||
static int read_qstring(link_ctx *ctx, const char **ps)
|
||||
{
|
||||
if (skip_ws(ctx) && read_chr(ctx, '\"')) {
|
||||
size_t end;
|
||||
@@ -179,7 +179,7 @@ static int read_qstring(link_ctx *ctx, char **ps)
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int read_ptoken(link_ctx *ctx, char **ps)
|
||||
static int read_ptoken(link_ctx *ctx, const char **ps)
|
||||
{
|
||||
if (skip_ws(ctx)) {
|
||||
size_t i;
|
||||
@@ -209,7 +209,7 @@ static int read_link(link_ctx *ctx)
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int read_pname(link_ctx *ctx, char **pname)
|
||||
static int read_pname(link_ctx *ctx, const char **pname)
|
||||
{
|
||||
if (skip_ws(ctx)) {
|
||||
size_t i;
|
||||
@@ -225,7 +225,7 @@ static int read_pname(link_ctx *ctx, char **pname)
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int read_pvalue(link_ctx *ctx, char **pvalue)
|
||||
static int read_pvalue(link_ctx *ctx, const char **pvalue)
|
||||
{
|
||||
if (skip_ws(ctx) && read_chr(ctx, '=')) {
|
||||
if (read_qstring(ctx, pvalue) || read_ptoken(ctx, pvalue)) {
|
||||
@@ -238,7 +238,7 @@ static int read_pvalue(link_ctx *ctx, char **pvalue)
|
||||
static int read_param(link_ctx *ctx)
|
||||
{
|
||||
if (skip_ws(ctx) && read_chr(ctx, ';')) {
|
||||
char *name, *value = "";
|
||||
const char *name, *value = "";
|
||||
if (read_pname(ctx, &name)) {
|
||||
read_pvalue(ctx, &value); /* value is optional */
|
||||
apr_table_setn(ctx->params, name, value);
|
||||
@@ -530,9 +530,9 @@ static unsigned int val_apr_hash(const char *str)
|
||||
static void calc_apr_hash(h2_push_diary *diary, apr_uint64_t *phash, h2_push *push)
|
||||
{
|
||||
apr_uint64_t val;
|
||||
#if APR_UINT64MAX > APR_UINT_MAX
|
||||
val = (val_apr_hash(push->req->scheme) << 32);
|
||||
val ^= (val_apr_hash(push->req->authority) << 16);
|
||||
#if APR_UINT64_MAX > UINT_MAX
|
||||
val = ((apr_uint64_t)(val_apr_hash(push->req->scheme)) << 32);
|
||||
val ^= ((apr_uint64_t)(val_apr_hash(push->req->authority)) << 16);
|
||||
val ^= val_apr_hash(push->req->path);
|
||||
#else
|
||||
val = val_apr_hash(push->req->scheme);
|
||||
@@ -555,7 +555,7 @@ static apr_int32_t ceil_power_of_2(apr_int32_t n)
|
||||
}
|
||||
|
||||
static h2_push_diary *diary_create(apr_pool_t *p, h2_push_digest_type dtype,
|
||||
apr_size_t N)
|
||||
int N)
|
||||
{
|
||||
h2_push_diary *diary = NULL;
|
||||
|
||||
@@ -590,7 +590,7 @@ static h2_push_diary *diary_create(apr_pool_t *p, h2_push_digest_type dtype,
|
||||
return diary;
|
||||
}
|
||||
|
||||
h2_push_diary *h2_push_diary_create(apr_pool_t *p, apr_size_t N)
|
||||
h2_push_diary *h2_push_diary_create(apr_pool_t *p, int N)
|
||||
{
|
||||
return diary_create(p, H2_PUSH_DIGEST_SHA256, N);
|
||||
}
|
||||
@@ -818,7 +818,7 @@ apr_status_t h2_push_diary_digest_get(h2_push_diary *diary, apr_pool_t *pool,
|
||||
int maxP, const char *authority,
|
||||
const char **pdata, apr_size_t *plen)
|
||||
{
|
||||
apr_size_t nelts, N, i;
|
||||
int nelts, N, i;
|
||||
unsigned char log2n, log2pmax;
|
||||
gset_encoder encoder;
|
||||
apr_uint64_t *hashes;
|
||||
@@ -968,7 +968,7 @@ apr_status_t h2_push_diary_digest_set(h2_push_diary *diary, const char *authorit
|
||||
{
|
||||
gset_decoder decoder;
|
||||
unsigned char log2n, log2p;
|
||||
apr_size_t N, i;
|
||||
int N, i;
|
||||
apr_pool_t *pool = diary->entries->pool;
|
||||
h2_push_diary_entry e;
|
||||
apr_status_t status = APR_SUCCESS;
|
||||
|
@@ -68,7 +68,7 @@ apr_array_header_t *h2_push_collect(apr_pool_t *p,
|
||||
* @param N the max number of entries, rounded up to 2^x
|
||||
* @return the created diary, might be NULL of max_entries is 0
|
||||
*/
|
||||
h2_push_diary *h2_push_diary_create(apr_pool_t *p, apr_size_t N);
|
||||
h2_push_diary *h2_push_diary_create(apr_pool_t *p, int N);
|
||||
|
||||
/**
|
||||
* Filters the given pushes against the diary and returns only those pushes
|
||||
|
@@ -187,6 +187,16 @@ apr_status_t h2_request_end_headers(h2_request *req, apr_pool_t *pool, int eos)
|
||||
}
|
||||
}
|
||||
|
||||
s = apr_table_get(req->headers, "Expect");
|
||||
if (s && s[0]) {
|
||||
if (ap_cstr_casecmp(s, "100-continue") == 0) {
|
||||
req->expect_100 = 1;
|
||||
}
|
||||
else {
|
||||
req->expect_failed = 1;
|
||||
}
|
||||
}
|
||||
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
|
||||
@@ -204,7 +214,6 @@ h2_request *h2_request_clone(apr_pool_t *p, const h2_request *src)
|
||||
request_rec *h2_request_create_rec(const h2_request *req, conn_rec *c)
|
||||
{
|
||||
int access_status = HTTP_OK;
|
||||
const char *expect;
|
||||
const char *rpath;
|
||||
|
||||
request_rec *r = ap_create_request(c);
|
||||
@@ -224,7 +233,7 @@ request_rec *h2_request_create_rec(const h2_request *req, conn_rec *c)
|
||||
|
||||
rpath = (req->path ? req->path : "");
|
||||
ap_parse_uri(r, rpath);
|
||||
r->protocol = "HTTP/2.0";
|
||||
r->protocol = (char*)"HTTP/2.0";
|
||||
r->proto_num = HTTP_VERSION(2, 0);
|
||||
|
||||
r->the_request = apr_psprintf(r->pool, "%s %s %s",
|
||||
@@ -241,17 +250,13 @@ request_rec *h2_request_create_rec(const h2_request *req, conn_rec *c)
|
||||
/* we may have switched to another server */
|
||||
r->per_dir_config = r->server->lookup_defaults;
|
||||
|
||||
if (r && ((expect = apr_table_get(r->headers_in, "Expect")) != NULL)
|
||||
&& (expect[0] != '\0')) {
|
||||
if (ap_cstr_casecmp(expect, "100-continue") == 0) {
|
||||
if (req->expect_100) {
|
||||
r->expecting_100 = 1;
|
||||
ap_add_input_filter("H2_CONTINUE", NULL, r, c);
|
||||
}
|
||||
else {
|
||||
else if (req->expect_failed) {
|
||||
r->status = HTTP_EXPECTATION_FAILED;
|
||||
ap_send_error_response(r, 0);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Add the HTTP_IN filter here to ensure that ap_discard_request_body
|
||||
|
@@ -1290,7 +1290,7 @@ apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream,
|
||||
nghttp2_priority_spec ps;
|
||||
int id_parent, id_grandpa, w_parent, w;
|
||||
int rv = 0;
|
||||
char *ptype = "AFTER";
|
||||
const char *ptype = "AFTER";
|
||||
h2_dependency dep = prio->dependency;
|
||||
|
||||
id_parent = nghttp2_stream_get_stream_id(s_parent);
|
||||
@@ -1420,18 +1420,13 @@ static apr_status_t on_stream_headers(h2_session *session, h2_stream *stream,
|
||||
ap_assert(session);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
|
||||
"h2_stream(%ld-%d): on_headers", session->id, stream->id);
|
||||
if (!headers) {
|
||||
int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03466)
|
||||
"h2_stream(%ld-%d): RST_STREAM, err=%d",
|
||||
session->id, stream->id, err);
|
||||
if (headers->status < 100) {
|
||||
int err = H2_STREAM_RST(stream, headers->status);
|
||||
rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
|
||||
stream->id, err);
|
||||
goto leave;
|
||||
}
|
||||
else if (headers->status < 100) {
|
||||
rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
|
||||
stream->id, headers->status);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
|
||||
"h2_stream(%ld-%d): unpexected header status %d, stream rst",
|
||||
session->id, stream->id, headers->status);
|
||||
goto leave;
|
||||
}
|
||||
else if (stream->has_response) {
|
||||
@@ -1555,25 +1550,35 @@ static apr_status_t on_stream_resume(void *ctx, h2_stream *stream)
|
||||
int rv;
|
||||
apr_off_t len = 0;
|
||||
int eos = 0;
|
||||
h2_headers *headers = NULL;
|
||||
h2_headers *headers;
|
||||
|
||||
ap_assert(stream);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
|
||||
"h2_stream(%ld-%d): on_resume", session->id, stream->id);
|
||||
|
||||
send_headers:
|
||||
headers = NULL;
|
||||
status = h2_stream_out_prepare(stream, &len, &eos, &headers);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
|
||||
"h2_stream(%ld-%d): prepared len=%ld, eos=%d",
|
||||
session->id, stream->id, (long)len, eos);
|
||||
if (headers) {
|
||||
status = on_stream_headers(session, stream, headers, len, eos);
|
||||
if (status != APR_SUCCESS) {
|
||||
if (status != APR_SUCCESS || stream->rst_error) {
|
||||
return status;
|
||||
}
|
||||
goto send_headers;
|
||||
}
|
||||
else if (status != APR_EAGAIN) {
|
||||
if (!stream->has_response) {
|
||||
int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03466)
|
||||
"h2_stream(%ld-%d): no response, RST_STREAM, err=%d",
|
||||
session->id, stream->id, err);
|
||||
nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
|
||||
stream->id, err);
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
rv = nghttp2_session_resume_data(session->ngh2, stream->id);
|
||||
session->have_written = 1;
|
||||
ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
|
||||
@@ -1598,7 +1603,7 @@ static apr_status_t h2_session_receive(void *ctx, const char *data,
|
||||
n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len);
|
||||
if (n < 0) {
|
||||
if (nghttp2_is_fatal((int)n)) {
|
||||
dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, (int)n, nghttp2_strerror(n));
|
||||
dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, (int)n, nghttp2_strerror((int)n));
|
||||
return APR_EGENERAL;
|
||||
}
|
||||
}
|
||||
@@ -2147,12 +2152,18 @@ apr_status_t h2_session_process(h2_session *session, int async)
|
||||
}
|
||||
/* continue reading handling */
|
||||
}
|
||||
else if (APR_STATUS_IS_ECONNABORTED(status)
|
||||
|| APR_STATUS_IS_ECONNRESET(status)
|
||||
|| APR_STATUS_IS_EOF(status)
|
||||
|| APR_STATUS_IS_EBADF(status)) {
|
||||
ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
|
||||
"h2_session(%ld): input gone", session->id);
|
||||
dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
|
||||
}
|
||||
else {
|
||||
if (trace) {
|
||||
ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
|
||||
"h2_session(%ld): idle(1 sec timeout) "
|
||||
"read failed", session->id);
|
||||
}
|
||||
dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, "error");
|
||||
}
|
||||
}
|
||||
|
@@ -54,7 +54,7 @@ static int state_transition[][7] = {
|
||||
/*CL*/{ 1, 1, 0, 0, 1, 1, 1 },
|
||||
};
|
||||
|
||||
static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, char *tag)
|
||||
static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, const char *tag)
|
||||
{
|
||||
if (APLOG_C_IS_LEVEL(s->session->c, lvl)) {
|
||||
conn_rec *c = s->session->c;
|
||||
@@ -645,7 +645,8 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen,
|
||||
b = APR_BRIGADE_FIRST(stream->buffer);
|
||||
while (b != APR_BRIGADE_SENTINEL(stream->buffer)) {
|
||||
e = APR_BUCKET_NEXT(b);
|
||||
if (APR_BUCKET_IS_FLUSH(b)) {
|
||||
if (APR_BUCKET_IS_FLUSH(b)
|
||||
|| (!APR_BUCKET_IS_METADATA(b) && b->length == 0)) {
|
||||
APR_BUCKET_REMOVE(b);
|
||||
apr_bucket_destroy(b);
|
||||
}
|
||||
|
@@ -49,7 +49,8 @@
|
||||
#include "h2_worker.h"
|
||||
#include "h2_util.h"
|
||||
|
||||
static void H2_TASK_OUT_LOG(int lvl, h2_task *task, apr_bucket_brigade *bb, char *tag)
|
||||
static void H2_TASK_OUT_LOG(int lvl, h2_task *task, apr_bucket_brigade *bb,
|
||||
const char *tag)
|
||||
{
|
||||
if (APLOG_C_IS_LEVEL(task->c, lvl)) {
|
||||
conn_rec *c = task->c;
|
||||
@@ -74,145 +75,23 @@ static int input_ser_header(void *ctx, const char *name, const char *value)
|
||||
return 1;
|
||||
}
|
||||
|
||||
static apr_status_t input_read(h2_task *task, ap_filter_t* f,
|
||||
apr_bucket_brigade* bb, ap_input_mode_t mode,
|
||||
apr_read_type_e block, apr_off_t readbytes)
|
||||
{
|
||||
apr_status_t status = APR_SUCCESS;
|
||||
apr_bucket *b, *next;
|
||||
apr_off_t bblen;
|
||||
apr_size_t rmax = ((readbytes <= APR_SIZE_MAX)?
|
||||
(apr_size_t)readbytes : APR_SIZE_MAX);
|
||||
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
|
||||
"h2_task(%s): read, mode=%d, block=%d, readbytes=%ld",
|
||||
task->id, mode, block, (long)readbytes);
|
||||
|
||||
if (mode == AP_MODE_INIT) {
|
||||
return ap_get_brigade(f->c->input_filters, bb, mode, block, readbytes);
|
||||
}
|
||||
|
||||
if (f->c->aborted) {
|
||||
return APR_ECONNABORTED;
|
||||
}
|
||||
|
||||
if (!task->input.bb) {
|
||||
return APR_EOF;
|
||||
}
|
||||
|
||||
/* Cleanup brigades from those nasty 0 length non-meta buckets
|
||||
* that apr_brigade_split_line() sometimes produces. */
|
||||
for (b = APR_BRIGADE_FIRST(task->input.bb);
|
||||
b != APR_BRIGADE_SENTINEL(task->input.bb); b = next) {
|
||||
next = APR_BUCKET_NEXT(b);
|
||||
if (b->length == 0 && !APR_BUCKET_IS_METADATA(b)) {
|
||||
apr_bucket_delete(b);
|
||||
}
|
||||
}
|
||||
|
||||
while (APR_BRIGADE_EMPTY(task->input.bb)) {
|
||||
/* Get more input data for our request. */
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
|
||||
"h2_task(%s): get more data from mplx, block=%d, "
|
||||
"readbytes=%ld", task->id, block, (long)readbytes);
|
||||
|
||||
/* Override the block mode we get called with depending on the input's
|
||||
* setting. */
|
||||
if (task->input.beam) {
|
||||
status = h2_beam_receive(task->input.beam, task->input.bb, block,
|
||||
H2MIN(readbytes, 32*1024));
|
||||
}
|
||||
else {
|
||||
status = APR_EOF;
|
||||
}
|
||||
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c,
|
||||
"h2_task(%s): read returned", task->id);
|
||||
if (APR_STATUS_IS_EAGAIN(status)
|
||||
&& (mode == AP_MODE_GETLINE || block == APR_BLOCK_READ)) {
|
||||
/* chunked input handling does not seem to like it if we
|
||||
* return with APR_EAGAIN from a GETLINE read...
|
||||
* upload 100k test on test-ser.example.org hangs */
|
||||
status = APR_SUCCESS;
|
||||
}
|
||||
else if (APR_STATUS_IS_EOF(status)) {
|
||||
break;
|
||||
}
|
||||
else if (status != APR_SUCCESS) {
|
||||
return status;
|
||||
}
|
||||
|
||||
/* Inspect the buckets received, detect EOS and apply
|
||||
* chunked encoding if necessary */
|
||||
h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
|
||||
"input.beam recv raw", task->input.bb);
|
||||
if (h2_task_logio_add_bytes_in) {
|
||||
apr_brigade_length(bb, 0, &bblen);
|
||||
h2_task_logio_add_bytes_in(f->c, bblen);
|
||||
}
|
||||
}
|
||||
|
||||
if (status == APR_EOF && APR_BRIGADE_EMPTY(task->input.bb)) {
|
||||
return APR_EOF;
|
||||
}
|
||||
|
||||
h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
|
||||
"task_input.bb", task->input.bb);
|
||||
|
||||
if (APR_BRIGADE_EMPTY(task->input.bb)) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
|
||||
"h2_task(%s): no data", task->id);
|
||||
return (block == APR_NONBLOCK_READ)? APR_EAGAIN : APR_EOF;
|
||||
}
|
||||
|
||||
if (mode == AP_MODE_EXHAUSTIVE) {
|
||||
/* return all we have */
|
||||
APR_BRIGADE_CONCAT(bb, task->input.bb);
|
||||
}
|
||||
else if (mode == AP_MODE_READBYTES) {
|
||||
status = h2_brigade_concat_length(bb, task->input.bb, rmax);
|
||||
}
|
||||
else if (mode == AP_MODE_SPECULATIVE) {
|
||||
status = h2_brigade_copy_length(bb, task->input.bb, rmax);
|
||||
}
|
||||
else if (mode == AP_MODE_GETLINE) {
|
||||
/* we are reading a single LF line, e.g. the HTTP headers.
|
||||
* this has the nasty side effect to split the bucket, even
|
||||
* though it ends with CRLF and creates a 0 length bucket */
|
||||
status = apr_brigade_split_line(bb, task->input.bb, block,
|
||||
HUGE_STRING_LEN);
|
||||
if (APLOGctrace1(f->c)) {
|
||||
char buffer[1024];
|
||||
apr_size_t len = sizeof(buffer)-1;
|
||||
apr_brigade_flatten(bb, buffer, &len);
|
||||
buffer[len] = 0;
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
|
||||
"h2_task(%s): getline: %s",
|
||||
task->id, buffer);
|
||||
}
|
||||
}
|
||||
else {
|
||||
/* Hmm, well. There is mode AP_MODE_EATCRLF, but we chose not
|
||||
* to support it. Seems to work. */
|
||||
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOTIMPL, f->c,
|
||||
APLOGNO(02942)
|
||||
"h2_task, unsupported READ mode %d", mode);
|
||||
status = APR_ENOTIMPL;
|
||||
}
|
||||
|
||||
if (APLOGctrace1(f->c)) {
|
||||
apr_brigade_length(bb, 0, &bblen);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
|
||||
"h2_task(%s): return %ld data bytes",
|
||||
task->id, (long)bblen);
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
/*******************************************************************************
|
||||
* task output handling
|
||||
******************************************************************************/
|
||||
|
||||
static void prep_output(h2_task *task)
|
||||
{
|
||||
if (!task->output.beam) {
|
||||
h2_beam_create(&task->output.beam, task->pool,
|
||||
task->stream_id, "output", 0);
|
||||
if (task->output.copy_files) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
|
||||
"h2_slave_out(%s): copy_files on", task->id);
|
||||
h2_beam_on_file_beam(task->output.beam, h2_beam_no_files, NULL);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static apr_status_t open_output(h2_task *task)
|
||||
{
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c, APLOGNO(03348)
|
||||
@@ -220,6 +99,7 @@ static apr_status_t open_output(h2_task *task)
|
||||
task->id, task->request->method,
|
||||
task->request->authority,
|
||||
task->request->path);
|
||||
prep_output(task);
|
||||
task->output.opened = 1;
|
||||
return h2_mplx_out_open(task->mplx, task->stream_id, task->output.beam);
|
||||
}
|
||||
@@ -268,7 +148,7 @@ static apr_status_t slave_out(h2_task *task, ap_filter_t* f,
|
||||
|
||||
if (APR_BRIGADE_EMPTY(bb)) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
|
||||
"h2_task(%s): empty write", task->id);
|
||||
"h2_slave_out(%s): empty write", task->id);
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
|
||||
@@ -288,15 +168,7 @@ static apr_status_t slave_out(h2_task *task, ap_filter_t* f,
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
|
||||
if (!task->output.beam) {
|
||||
h2_beam_create(&task->output.beam, task->pool,
|
||||
task->stream_id, "output", 0);
|
||||
if (task->output.copy_files) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
|
||||
"h2_task(%s): copy_files on", task->id);
|
||||
h2_beam_on_file_beam(task->output.beam, h2_beam_no_files, NULL);
|
||||
}
|
||||
}
|
||||
prep_output(task);
|
||||
|
||||
/* Attempt to write saved brigade first */
|
||||
if (task->output.bb && !APR_BRIGADE_EMPTY(task->output.bb)) {
|
||||
@@ -332,7 +204,7 @@ static apr_status_t slave_out(h2_task *task, ap_filter_t* f,
|
||||
/* If the passed brigade is not empty, save it before return */
|
||||
if (!APR_BRIGADE_EMPTY(bb)) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, APLOGNO(03405)
|
||||
"h2_task(%s): could not write all, saving brigade",
|
||||
"h2_slave_out(%s): could not write all, saving brigade",
|
||||
task->id);
|
||||
if (!task->output.bb) {
|
||||
task->output.bb = apr_brigade_create(task->pool,
|
||||
@@ -351,7 +223,7 @@ static apr_status_t slave_out(h2_task *task, ap_filter_t* f,
|
||||
status = open_output(task);
|
||||
}
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, task->c,
|
||||
"h2_task(%s): slave_out leave", task->id);
|
||||
"h2_slave_out(%s): slave_out leave", task->id);
|
||||
return status;
|
||||
}
|
||||
|
||||
@@ -367,49 +239,144 @@ static apr_status_t output_finish(h2_task *task)
|
||||
* task slave connection filters
|
||||
******************************************************************************/
|
||||
|
||||
static apr_status_t h2_filter_slave_input(ap_filter_t* filter,
|
||||
apr_bucket_brigade* brigade,
|
||||
static apr_status_t h2_filter_slave_in(ap_filter_t* f,
|
||||
apr_bucket_brigade* bb,
|
||||
ap_input_mode_t mode,
|
||||
apr_read_type_e block,
|
||||
apr_off_t readbytes)
|
||||
{
|
||||
h2_task *task = h2_ctx_cget_task(filter->c);
|
||||
AP_DEBUG_ASSERT(task);
|
||||
return input_read(task, filter, brigade, mode, block, readbytes);
|
||||
h2_task *task;
|
||||
apr_status_t status = APR_SUCCESS;
|
||||
apr_bucket *b, *next;
|
||||
apr_off_t bblen;
|
||||
apr_size_t rmax;
|
||||
|
||||
task = h2_ctx_cget_task(f->c);
|
||||
ap_assert(task);
|
||||
rmax = ((readbytes <= APR_SIZE_MAX)? (apr_size_t)readbytes : APR_SIZE_MAX);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
|
||||
"h2_slave_in(%s): read, mode=%d, block=%d, readbytes=%ld",
|
||||
task->id, mode, block, (long)readbytes);
|
||||
|
||||
if (mode == AP_MODE_INIT) {
|
||||
return ap_get_brigade(f->c->input_filters, bb, mode, block, readbytes);
|
||||
}
|
||||
|
||||
static apr_status_t h2_filter_continue(ap_filter_t* f,
|
||||
apr_bucket_brigade* brigade,
|
||||
ap_input_mode_t mode,
|
||||
apr_read_type_e block,
|
||||
apr_off_t readbytes)
|
||||
{
|
||||
h2_task *task = h2_ctx_cget_task(f->c);
|
||||
apr_status_t status;
|
||||
if (f->c->aborted) {
|
||||
return APR_ECONNABORTED;
|
||||
}
|
||||
|
||||
ap_assert(task);
|
||||
if (f->r->expecting_100 && ap_is_HTTP_SUCCESS(f->r->status)) {
|
||||
h2_headers *response;
|
||||
apr_bucket_brigade *tmp;
|
||||
apr_bucket *b;
|
||||
if (!task->input.bb) {
|
||||
return APR_EOF;
|
||||
}
|
||||
|
||||
response = h2_headers_rcreate(f->r, HTTP_CONTINUE, NULL, f->r->pool);
|
||||
tmp = apr_brigade_create(f->r->pool, f->c->bucket_alloc);
|
||||
b = h2_bucket_headers_create(f->c->bucket_alloc, response);
|
||||
APR_BRIGADE_INSERT_TAIL(tmp, b);
|
||||
b = apr_bucket_flush_create(f->c->bucket_alloc);
|
||||
APR_BRIGADE_INSERT_TAIL(tmp, b);
|
||||
status = ap_pass_brigade(f->r->output_filters, tmp);
|
||||
apr_brigade_destroy(tmp);
|
||||
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, f->r,
|
||||
"h2_task(%s): sent 100 Continue", task->id);
|
||||
if (status != APR_SUCCESS) {
|
||||
/* Cleanup brigades from those nasty 0 length non-meta buckets
|
||||
* that apr_brigade_split_line() sometimes produces. */
|
||||
for (b = APR_BRIGADE_FIRST(task->input.bb);
|
||||
b != APR_BRIGADE_SENTINEL(task->input.bb); b = next) {
|
||||
next = APR_BUCKET_NEXT(b);
|
||||
if (b->length == 0 && !APR_BUCKET_IS_METADATA(b)) {
|
||||
apr_bucket_delete(b);
|
||||
}
|
||||
}
|
||||
|
||||
while (APR_BRIGADE_EMPTY(task->input.bb)) {
|
||||
/* Get more input data for our request. */
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
|
||||
"h2_slave_in(%s): get more data from mplx, block=%d, "
|
||||
"readbytes=%ld", task->id, block, (long)readbytes);
|
||||
|
||||
/* Override the block mode we get called with depending on the input's
|
||||
* setting. */
|
||||
if (task->input.beam) {
|
||||
status = h2_beam_receive(task->input.beam, task->input.bb, block,
|
||||
H2MIN(readbytes, 32*1024));
|
||||
}
|
||||
else {
|
||||
status = APR_EOF;
|
||||
}
|
||||
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c,
|
||||
"h2_slave_in(%s): read returned", task->id);
|
||||
if (APR_STATUS_IS_EAGAIN(status)
|
||||
&& (mode == AP_MODE_GETLINE || block == APR_BLOCK_READ)) {
|
||||
/* chunked input handling does not seem to like it if we
|
||||
* return with APR_EAGAIN from a GETLINE read...
|
||||
* upload 100k test on test-ser.example.org hangs */
|
||||
status = APR_SUCCESS;
|
||||
}
|
||||
else if (APR_STATUS_IS_EOF(status)) {
|
||||
break;
|
||||
}
|
||||
else if (status != APR_SUCCESS) {
|
||||
return status;
|
||||
}
|
||||
f->r->expecting_100 = 0;
|
||||
apr_table_clear(f->r->headers_out);
|
||||
|
||||
h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
|
||||
"input.beam recv raw", task->input.bb);
|
||||
if (h2_task_logio_add_bytes_in) {
|
||||
apr_brigade_length(bb, 0, &bblen);
|
||||
h2_task_logio_add_bytes_in(f->c, bblen);
|
||||
}
|
||||
return ap_get_brigade(f->next, brigade, mode, block, readbytes);
|
||||
}
|
||||
|
||||
/* Inspect the buckets received, detect EOS and apply
|
||||
* chunked encoding if necessary */
|
||||
if (status == APR_EOF && APR_BRIGADE_EMPTY(task->input.bb)) {
|
||||
return APR_EOF;
|
||||
}
|
||||
|
||||
h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
|
||||
"task_input.bb", task->input.bb);
|
||||
|
||||
if (APR_BRIGADE_EMPTY(task->input.bb)) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
|
||||
"h2_slave_in(%s): no data", task->id);
|
||||
return (block == APR_NONBLOCK_READ)? APR_EAGAIN : APR_EOF;
|
||||
}
|
||||
|
||||
if (mode == AP_MODE_EXHAUSTIVE) {
|
||||
/* return all we have */
|
||||
APR_BRIGADE_CONCAT(bb, task->input.bb);
|
||||
}
|
||||
else if (mode == AP_MODE_READBYTES) {
|
||||
status = h2_brigade_concat_length(bb, task->input.bb, rmax);
|
||||
}
|
||||
else if (mode == AP_MODE_SPECULATIVE) {
|
||||
status = h2_brigade_copy_length(bb, task->input.bb, rmax);
|
||||
}
|
||||
else if (mode == AP_MODE_GETLINE) {
|
||||
/* we are reading a single LF line, e.g. the HTTP headers.
|
||||
* this has the nasty side effect to split the bucket, even
|
||||
* though it ends with CRLF and creates a 0 length bucket */
|
||||
status = apr_brigade_split_line(bb, task->input.bb, block,
|
||||
HUGE_STRING_LEN);
|
||||
if (APLOGctrace1(f->c)) {
|
||||
char buffer[1024];
|
||||
apr_size_t len = sizeof(buffer)-1;
|
||||
apr_brigade_flatten(bb, buffer, &len);
|
||||
buffer[len] = 0;
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
|
||||
"h2_slave_in(%s): getline: %s",
|
||||
task->id, buffer);
|
||||
}
|
||||
}
|
||||
else {
|
||||
/* Hmm, well. There is mode AP_MODE_EATCRLF, but we chose not
|
||||
* to support it. Seems to work. */
|
||||
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOTIMPL, f->c,
|
||||
APLOGNO(02942)
|
||||
"h2_slave_in(%s), unsupported READ mode %d",
|
||||
task->id, mode);
|
||||
status = APR_ENOTIMPL;
|
||||
}
|
||||
|
||||
if (APLOGctrace1(f->c)) {
|
||||
apr_brigade_length(bb, 0, &bblen);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
|
||||
"h2_slave_in(%s): %ld data bytes", task->id, (long)bblen);
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
static apr_status_t h2_filter_slave_output(ap_filter_t* filter,
|
||||
@@ -426,6 +393,27 @@ static apr_status_t h2_filter_slave_output(ap_filter_t* filter,
|
||||
return status;
|
||||
}
|
||||
|
||||
static apr_status_t h2_filter_parse_h1(ap_filter_t* f, apr_bucket_brigade* bb)
|
||||
{
|
||||
h2_task *task = h2_ctx_cget_task(f->c);
|
||||
apr_status_t status;
|
||||
|
||||
ap_assert(task);
|
||||
/* There are cases where we need to parse a serialized http/1.1
|
||||
* response. One example is a 100-continue answer in serialized mode
|
||||
* or via a mod_proxy setup */
|
||||
while (task->output.parse_response) {
|
||||
status = h2_from_h1_parse_response(task, f, bb);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, f->c,
|
||||
"h2_task(%s): parsed response", task->id);
|
||||
if (APR_BRIGADE_EMPTY(bb) || status != APR_SUCCESS) {
|
||||
return status;
|
||||
}
|
||||
}
|
||||
|
||||
return ap_pass_brigade(f->next, bb);
|
||||
}
|
||||
|
||||
/*******************************************************************************
|
||||
* task things
|
||||
******************************************************************************/
|
||||
@@ -483,13 +471,13 @@ void h2_task_register_hooks(void)
|
||||
ap_hook_process_connection(h2_task_process_conn,
|
||||
NULL, NULL, APR_HOOK_FIRST);
|
||||
|
||||
ap_register_input_filter("H2_SLAVE_IN", h2_filter_slave_input,
|
||||
ap_register_input_filter("H2_SLAVE_IN", h2_filter_slave_in,
|
||||
NULL, AP_FTYPE_NETWORK);
|
||||
ap_register_output_filter("H2_SLAVE_OUT", h2_filter_slave_output,
|
||||
NULL, AP_FTYPE_NETWORK);
|
||||
ap_register_output_filter("H2_PARSE_H1", h2_filter_parse_h1,
|
||||
NULL, AP_FTYPE_NETWORK);
|
||||
|
||||
ap_register_input_filter("H2_CONTINUE", h2_filter_continue,
|
||||
NULL, AP_FTYPE_PROTOCOL);
|
||||
ap_register_input_filter("H2_REQUEST", h2_filter_request_in,
|
||||
NULL, AP_FTYPE_PROTOCOL);
|
||||
ap_register_output_filter("H2_RESPONSE", h2_filter_headers_out,
|
||||
@@ -521,6 +509,7 @@ static int h2_task_pre_conn(conn_rec* c, void *arg)
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
|
||||
"h2_h2, pre_connection, found stream task");
|
||||
ap_add_input_filter("H2_SLAVE_IN", NULL, NULL, c);
|
||||
ap_add_output_filter("H2_PARSE_H1", NULL, NULL, c);
|
||||
ap_add_output_filter("H2_SLAVE_OUT", NULL, NULL, c);
|
||||
}
|
||||
return OK;
|
||||
@@ -582,13 +571,23 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_t *thread)
|
||||
task->input.bb = apr_brigade_create(task->pool, task->c->bucket_alloc);
|
||||
if (task->request->serialize) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
|
||||
"h2_task(%s): serialize request %s %s",
|
||||
task->id, task->request->method, task->request->path);
|
||||
"h2_task(%s): serialize request %s %s, expect-100=%d",
|
||||
task->id, task->request->method, task->request->path,
|
||||
task->request->expect_100);
|
||||
apr_brigade_printf(task->input.bb, NULL,
|
||||
NULL, "%s %s HTTP/1.1\r\n",
|
||||
task->request->method, task->request->path);
|
||||
apr_table_do(input_ser_header, task, task->request->headers, NULL);
|
||||
apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n");
|
||||
if (task->request->expect_100) {
|
||||
/* we are unable to suppress the serialization of the
|
||||
* intermediate response and need to parse it */
|
||||
task->output.parse_response = 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (task->request->expect_100) {
|
||||
task->output.parse_response = 1;
|
||||
}
|
||||
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
|
||||
|
@@ -44,6 +44,7 @@ struct h2_mplx;
|
||||
struct h2_task;
|
||||
struct h2_req_engine;
|
||||
struct h2_request;
|
||||
struct h2_response_parser;
|
||||
struct h2_worker;
|
||||
|
||||
typedef struct h2_task h2_task;
|
||||
@@ -63,12 +64,15 @@ struct h2_task {
|
||||
unsigned int eos : 1;
|
||||
apr_bucket_brigade *bb;
|
||||
apr_bucket_brigade *bbchunk;
|
||||
apr_off_t chunked_total;
|
||||
} input;
|
||||
struct {
|
||||
struct h2_bucket_beam *beam;
|
||||
unsigned int opened : 1;
|
||||
unsigned int sent_response : 1;
|
||||
unsigned int copy_files : 1;
|
||||
unsigned int parse_response : 1;
|
||||
struct h2_response_parser *rparser;
|
||||
apr_bucket_brigade *bb;
|
||||
} output;
|
||||
|
||||
|
@@ -618,7 +618,7 @@ static apr_status_t last_not_included(apr_bucket_brigade *bb,
|
||||
{
|
||||
apr_bucket *b;
|
||||
apr_status_t status = APR_SUCCESS;
|
||||
int files_allowed = pfile_buckets_allowed? *pfile_buckets_allowed : 0;
|
||||
int files_allowed = pfile_buckets_allowed? (int)*pfile_buckets_allowed : 0;
|
||||
|
||||
if (maxlen >= 0) {
|
||||
/* Find the bucket, up to which we reach maxlen/mem bytes */
|
||||
|
@@ -343,7 +343,7 @@ static char *http2_var_lookup(apr_pool_t *p, server_rec *s,
|
||||
return (char *)vdef->lookup(p, s, c, r, ctx);
|
||||
}
|
||||
}
|
||||
return "";
|
||||
return (char*)"";
|
||||
}
|
||||
|
||||
static int h2_h2_fixups(request_rec *r)
|
||||
|
@@ -65,7 +65,7 @@ typedef struct h2_proxy_ctx {
|
||||
apr_pool_t *engine_pool;
|
||||
apr_size_t req_buffer_size;
|
||||
request_rec *next;
|
||||
apr_size_t capacity;
|
||||
int capacity;
|
||||
|
||||
unsigned standalone : 1;
|
||||
unsigned is_ssl : 1;
|
||||
@@ -168,7 +168,7 @@ static int proxy_http2_canon(request_rec *r, char *url)
|
||||
path = url; /* this is the raw path */
|
||||
}
|
||||
else {
|
||||
path = ap_proxy_canonenc(r->pool, url, strlen(url),
|
||||
path = ap_proxy_canonenc(r->pool, url, (int)strlen(url),
|
||||
enc_path, 0, r->proxyreq);
|
||||
search = r->args;
|
||||
}
|
||||
@@ -275,7 +275,8 @@ static void request_done(h2_proxy_session *session, request_rec *r,
|
||||
h2_proxy_ctx *ctx = session->user_data;
|
||||
const char *task_id = apr_table_get(r->connection->notes, H2_TASK_ID_NOTE);
|
||||
|
||||
if (!complete && !touched) {
|
||||
if (!complete) {
|
||||
if (!touched) {
|
||||
/* untouched request, need rescheduling */
|
||||
if (req_engine_push && is_h2 && is_h2(ctx->owner)) {
|
||||
if (req_engine_push(ctx->engine_type, r, NULL) == APR_SUCCESS) {
|
||||
@@ -288,28 +289,34 @@ static void request_done(h2_proxy_session *session, request_rec *r,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (r == ctx->rbase && complete) {
|
||||
ctx->r_status = APR_SUCCESS;
|
||||
else {
|
||||
const char *uri;
|
||||
uri = apr_uri_unparse(r->pool, &r->parsed_uri, 0);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection,
|
||||
APLOGNO(03471) "h2_proxy_session(%s): request %s -> %s "
|
||||
"not complete, was touched",
|
||||
ctx->engine_id, task_id, uri);
|
||||
}
|
||||
}
|
||||
|
||||
if (r == ctx->rbase) {
|
||||
ctx->r_status = complete? APR_SUCCESS : HTTP_GATEWAY_TIME_OUT;
|
||||
}
|
||||
|
||||
if (complete) {
|
||||
if (req_engine_done && ctx->engine) {
|
||||
if (complete) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection,
|
||||
APLOGNO(03370)
|
||||
"h2_proxy_session(%s): finished request %s",
|
||||
ctx->engine_id, task_id);
|
||||
req_engine_done(ctx->engine, r->connection);
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (req_engine_done && ctx->engine) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, r->connection,
|
||||
APLOGNO(03371)
|
||||
"h2_proxy_session(%s): failed request %s",
|
||||
ctx->engine_id, task_id);
|
||||
req_engine_done(ctx->engine, r->connection);
|
||||
}
|
||||
req_engine_done(ctx->engine, r->connection);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -323,7 +330,7 @@ static apr_status_t next_request(h2_proxy_ctx *ctx, int before_leave)
|
||||
status = req_engine_pull(ctx->engine, before_leave?
|
||||
APR_BLOCK_READ: APR_NONBLOCK_READ,
|
||||
ctx->capacity, &ctx->next);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, ctx->owner,
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, ctx->owner,
|
||||
"h2_proxy_engine(%s): pulled request (%s) %s",
|
||||
ctx->engine_id,
|
||||
before_leave? "before leave" : "regular",
|
||||
@@ -342,7 +349,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, ctx->owner,
|
||||
"eng(%s): setup session", ctx->engine_id);
|
||||
ctx->session = h2_proxy_session_setup(ctx->engine_id, ctx->p_conn, ctx->conf,
|
||||
30, h2_proxy_log2(ctx->req_buffer_size),
|
||||
30, h2_proxy_log2((int)ctx->req_buffer_size),
|
||||
request_done);
|
||||
if (!ctx->session) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner,
|
||||
@@ -367,7 +374,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) {
|
||||
/* ongoing processing, call again */
|
||||
if (ctx->session->remote_max_concurrent > 0
|
||||
&& ctx->session->remote_max_concurrent != ctx->capacity) {
|
||||
ctx->capacity = ctx->session->remote_max_concurrent;
|
||||
ctx->capacity = (int)ctx->session->remote_max_concurrent;
|
||||
}
|
||||
s2 = next_request(ctx, 0);
|
||||
if (s2 == APR_ECONNABORTED) {
|
||||
@@ -547,7 +554,7 @@ run_connect:
|
||||
* backend->hostname. */
|
||||
if (ap_proxy_connect_backend(ctx->proxy_func, ctx->p_conn, ctx->worker,
|
||||
ctx->server)) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, ctx->owner, APLOGNO(03352)
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(03352)
|
||||
"H2: failed to make connection to backend: %s",
|
||||
ctx->p_conn->hostname);
|
||||
goto cleanup;
|
||||
@@ -555,31 +562,29 @@ run_connect:
|
||||
|
||||
/* Step Three: Create conn_rec for the socket we have open now. */
|
||||
if (!ctx->p_conn->connection) {
|
||||
status = ap_proxy_connection_create_ex(ctx->proxy_func,
|
||||
ctx->p_conn, ctx->rbase);
|
||||
if (status != OK) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner, APLOGNO(03353)
|
||||
"setup new connection: is_ssl=%d %s %s %s",
|
||||
ctx->p_conn->is_ssl, ctx->p_conn->ssl_hostname,
|
||||
locurl, ctx->p_conn->hostname);
|
||||
status = ap_proxy_connection_create_ex(ctx->proxy_func,
|
||||
ctx->p_conn, ctx->rbase);
|
||||
if (status != OK) {
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/*
|
||||
* On SSL connections set a note on the connection what CN is
|
||||
* requested, such that mod_ssl can check if it is requested to do
|
||||
* so.
|
||||
*/
|
||||
if (!ctx->p_conn->data) {
|
||||
/* New conection: set a note on the connection what CN is
|
||||
* requested and what protocol we want */
|
||||
if (ctx->p_conn->ssl_hostname) {
|
||||
apr_table_setn(ctx->p_conn->connection->notes,
|
||||
"proxy-request-hostname", ctx->p_conn->ssl_hostname);
|
||||
}
|
||||
|
||||
if (ctx->is_ssl) {
|
||||
apr_table_setn(ctx->p_conn->connection->notes,
|
||||
"proxy-request-alpn-protos", "h2");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
run_session:
|
||||
status = proxy_engine_run(ctx);
|
||||
|
Reference in New Issue
Block a user