1
0
mirror of https://github.com/apache/httpd.git synced 2025-08-07 04:02:58 +03:00

mod_proxy_http: rework the flushing strategy when forwarding the request body.

Since the forwarding of 100-continue (end to end) in r1836588, we depended on
reading all of the requested HUGE_STRING_LEN bytes to avoid the flushes, but
this is a bit fragile.

This commit introduces the new stream_reqbody_read() function which will try a
nonblocking read first and, if it fails with EAGAIN, will flush on the backend
side before blocking for the next client side read.

We can then use it in stream_reqbody_{chunked,cl}() to flush client forwarded
data only when necessary. This both allows "optimal" flushing and simplifies
code (note that spool_reqbody_cl() also makes use of the new function but not
its nonblocking/flush functionality, thus only for consistency with the two
others, simplification and common error handling).

Also, since proxy_http_req_t::flushall/subprocess_env::proxy-flushall are now
meaningless (and unused) on the backend side, they are renamed respectively to
prefetch_nonblocking/proxy-prefetch-nonblocking, and solely determine whether
to prefetch in nonblocking mode or not. These flags were trunk only and may
not be really useful if we decided to prefetch in nonblocking mode in any case,
but for 2.4.x the opt-in looks wise.


git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1853407 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yann Ylavic
2019-02-11 21:55:43 +00:00
parent 33b2ad9f1c
commit 01d8e196dc
2 changed files with 106 additions and 170 deletions

View File

@@ -77,7 +77,6 @@ typedef struct h2_proxy_ctx {
unsigned standalone : 1; unsigned standalone : 1;
unsigned is_ssl : 1; unsigned is_ssl : 1;
unsigned flushall : 1;
apr_status_t r_status; /* status of our first request work */ apr_status_t r_status; /* status of our first request work */
h2_proxy_session *session; /* current http2 session against backend */ h2_proxy_session *session; /* current http2 session against backend */
@@ -509,7 +508,6 @@ static int proxy_http2_handler(request_rec *r,
ctx->is_ssl = is_ssl; ctx->is_ssl = is_ssl;
ctx->worker = worker; ctx->worker = worker;
ctx->conf = conf; ctx->conf = conf;
ctx->flushall = apr_table_get(r->subprocess_env, "proxy-flushall")? 1 : 0;
ctx->r_status = HTTP_SERVICE_UNAVAILABLE; ctx->r_status = HTTP_SERVICE_UNAVAILABLE;
h2_proxy_fifo_set_create(&ctx->requests, ctx->pool, 100); h2_proxy_fifo_set_create(&ctx->requests, ctx->pool, 100);

View File

@@ -250,34 +250,80 @@ typedef struct {
apr_bucket_brigade *header_brigade; apr_bucket_brigade *header_brigade;
apr_bucket_brigade *input_brigade; apr_bucket_brigade *input_brigade;
char *old_cl_val, *old_te_val; char *old_cl_val, *old_te_val;
apr_off_t cl_val, bytes_spooled; apr_off_t cl_val;
rb_methods rb_method; rb_methods rb_method;
int expecting_100; int expecting_100;
unsigned int do_100_continue:1, unsigned int do_100_continue:1,
flushall:1; prefetch_nonblocking:1;
} proxy_http_req_t; } proxy_http_req_t;
/* Read what's in the client pipe. If nonblocking is set and read is EAGAIN,
* pass a FLUSH bucket to the backend and read again in blocking mode.
*/
static int stream_reqbody_read(proxy_http_req_t *req, apr_bucket_brigade *bb,
int nonblocking)
{
request_rec *r = req->r;
proxy_conn_rec *p_conn = req->backend;
apr_bucket_alloc_t *bucket_alloc = req->bucket_alloc;
apr_read_type_e block = nonblocking ? APR_NONBLOCK_READ : APR_BLOCK_READ;
apr_status_t status;
int rv;
for (;;) {
status = ap_get_brigade(r->input_filters, bb, AP_MODE_READBYTES,
block, HUGE_STRING_LEN);
if (block == APR_BLOCK_READ
|| (!APR_STATUS_IS_EAGAIN(status)
&& (status != APR_SUCCESS || !APR_BRIGADE_EMPTY(bb)))) {
break;
}
/* Flush and retry (blocking) */
apr_brigade_cleanup(bb);
rv = ap_proxy_pass_brigade(bucket_alloc, r, p_conn, req->origin, bb, 1);
if (rv != OK) {
return rv;
}
block = APR_BLOCK_READ;
}
if (status != APR_SUCCESS) {
conn_rec *c = r->connection;
ap_log_rerror(APLOG_MARK, APLOG_ERR, status, r, APLOGNO(02608)
"read request body failed to %pI (%s)"
" from %s (%s)", p_conn->addr,
p_conn->hostname ? p_conn->hostname: "",
c->client_ip, c->remote_host ? c->remote_host: "");
return ap_map_http_request_error(status, HTTP_BAD_REQUEST);
}
return OK;
}
static int stream_reqbody_chunked(proxy_http_req_t *req) static int stream_reqbody_chunked(proxy_http_req_t *req)
{ {
request_rec *r = req->r; request_rec *r = req->r;
int seen_eos = 0, rv = OK; int seen_eos = 0, rv = OK;
apr_size_t hdr_len; apr_size_t hdr_len;
apr_off_t bytes; apr_off_t bytes;
apr_status_t status;
char chunk_hdr[20]; /* must be here due to transient bucket. */ char chunk_hdr[20]; /* must be here due to transient bucket. */
proxy_conn_rec *p_conn = req->backend; proxy_conn_rec *p_conn = req->backend;
apr_bucket_alloc_t *bucket_alloc = req->bucket_alloc; apr_bucket_alloc_t *bucket_alloc = req->bucket_alloc;
apr_bucket_brigade *header_brigade = req->header_brigade; apr_bucket_brigade *header_brigade = req->header_brigade;
apr_bucket_brigade *input_brigade = req->input_brigade; apr_bucket_brigade *input_brigade = req->input_brigade;
apr_bucket_brigade *bb;
apr_bucket *e; apr_bucket *e;
while (APR_BRIGADE_EMPTY(input_brigade) do {
|| !APR_BUCKET_IS_EOS(APR_BRIGADE_FIRST(input_brigade))) if (APR_BRIGADE_EMPTY(input_brigade)
{ && APR_BRIGADE_EMPTY(header_brigade)) {
int flush = req->flushall; rv = stream_reqbody_read(req, input_brigade, 1);
if (rv != OK) {
return rv;
}
}
if (!APR_BRIGADE_EMPTY(input_brigade)) { if (!APR_BRIGADE_EMPTY(input_brigade)) {
/* If this brigade contains EOS, either stop or remove it. */ /* If this brigade contains EOS, either stop or remove it. */
@@ -290,12 +336,6 @@ static int stream_reqbody_chunked(proxy_http_req_t *req)
} }
apr_brigade_length(input_brigade, 1, &bytes); apr_brigade_length(input_brigade, 1, &bytes);
/* Flush only if we did not get the requested #bytes. */
if (bytes < HUGE_STRING_LEN) {
flush = 0;
}
hdr_len = apr_snprintf(chunk_hdr, sizeof(chunk_hdr), hdr_len = apr_snprintf(chunk_hdr, sizeof(chunk_hdr),
"%" APR_UINT64_T_HEX_FMT CRLF, "%" APR_UINT64_T_HEX_FMT CRLF,
(apr_uint64_t)bytes); (apr_uint64_t)bytes);
@@ -312,108 +352,62 @@ static int stream_reqbody_chunked(proxy_http_req_t *req)
APR_BRIGADE_INSERT_TAIL(input_brigade, e); APR_BRIGADE_INSERT_TAIL(input_brigade, e);
} }
if (!APR_BRIGADE_EMPTY(header_brigade)) { /* If we never sent the header brigade, so go ahead and
/* we never sent the header brigade, so go ahead and * take care of that now by prepending it.
* take care of that now */
*/ APR_BRIGADE_PREPEND(input_brigade, header_brigade);
bb = header_brigade;
APR_BRIGADE_CONCAT(bb, input_brigade);
/* Flush now since we have the header and (enough of) the /* No flush here since it's done either on the next loop depending
* prefeched body, or racing KeepAliveTimeout on the backend * on stream_reqbody_read(), or after the loop with the EOS chunk.
* side may kill our connection while we read more client data. */
*/
flush = 1;
}
else {
bb = input_brigade;
}
/* Once we hit EOS, flush below this loop with the EOS chunk. */
rv = ap_proxy_pass_brigade(bucket_alloc, r, p_conn, req->origin, rv = ap_proxy_pass_brigade(bucket_alloc, r, p_conn, req->origin,
bb, flush && !seen_eos); input_brigade, 0);
if (rv != OK) { if (rv != OK) {
return rv; return rv;
} }
} while (!seen_eos);
if (seen_eos) {
break;
}
status = ap_get_brigade(r->input_filters, input_brigade,
AP_MODE_READBYTES, APR_BLOCK_READ,
HUGE_STRING_LEN);
if (status != APR_SUCCESS) {
conn_rec *c = r->connection;
ap_log_rerror(APLOG_MARK, APLOG_ERR, status, r, APLOGNO(02608)
"read request body failed to %pI (%s)"
" from %s (%s)", p_conn->addr,
p_conn->hostname ? p_conn->hostname: "",
c->client_ip, c->remote_host ? c->remote_host: "");
return ap_map_http_request_error(status, HTTP_BAD_REQUEST);
}
}
if (!APR_BRIGADE_EMPTY(header_brigade)) {
/* we never sent the header brigade because there was no request body;
* send it now
*/
bb = header_brigade;
}
else {
if (!APR_BRIGADE_EMPTY(input_brigade)) {
/* input brigade still has an EOS which we can't pass to the output_filters. */
e = APR_BRIGADE_LAST(input_brigade);
AP_DEBUG_ASSERT(APR_BUCKET_IS_EOS(e));
apr_bucket_delete(e);
}
bb = input_brigade;
}
e = apr_bucket_immortal_create(ZERO_ASCII CRLF_ASCII e = apr_bucket_immortal_create(ZERO_ASCII CRLF_ASCII
/* <trailers> */ /* <trailers> */
CRLF_ASCII, CRLF_ASCII,
5, bucket_alloc); 5, bucket_alloc);
APR_BRIGADE_INSERT_TAIL(bb, e); APR_BRIGADE_INSERT_TAIL(input_brigade, e);
if (apr_table_get(r->subprocess_env, "proxy-sendextracrlf")) { if (apr_table_get(r->subprocess_env, "proxy-sendextracrlf")) {
e = apr_bucket_immortal_create(CRLF_ASCII, 2, bucket_alloc); e = apr_bucket_immortal_create(CRLF_ASCII, 2, bucket_alloc);
APR_BRIGADE_INSERT_TAIL(bb, e); APR_BRIGADE_INSERT_TAIL(input_brigade, e);
} }
/* Now we have headers-only, or the chunk EOS mark; flush it */ /* Now we have headers-only, or the chunk EOS mark; flush it */
return ap_proxy_pass_brigade(bucket_alloc, r, p_conn, req->origin, bb, 1); return ap_proxy_pass_brigade(bucket_alloc, r, p_conn, req->origin,
input_brigade, 1);
} }
static int stream_reqbody_cl(proxy_http_req_t *req) static int stream_reqbody_cl(proxy_http_req_t *req)
{ {
request_rec *r = req->r; request_rec *r = req->r;
int seen_eos = 0, rv = 0; int seen_eos = 0, rv = OK;
apr_status_t status = APR_SUCCESS;
proxy_conn_rec *p_conn = req->backend; proxy_conn_rec *p_conn = req->backend;
apr_bucket_alloc_t *bucket_alloc = req->bucket_alloc; apr_bucket_alloc_t *bucket_alloc = req->bucket_alloc;
apr_bucket_brigade *header_brigade = req->header_brigade; apr_bucket_brigade *header_brigade = req->header_brigade;
apr_bucket_brigade *input_brigade = req->input_brigade; apr_bucket_brigade *input_brigade = req->input_brigade;
apr_bucket_brigade *bb;
apr_bucket *e; apr_bucket *e;
apr_off_t bytes; apr_off_t bytes;
apr_off_t bytes_streamed = 0; apr_off_t bytes_streamed = 0;
while (APR_BRIGADE_EMPTY(input_brigade) do {
|| !APR_BUCKET_IS_EOS(APR_BRIGADE_FIRST(input_brigade))) if (APR_BRIGADE_EMPTY(input_brigade)
{ && APR_BRIGADE_EMPTY(header_brigade)) {
int flush = req->flushall; rv = stream_reqbody_read(req, input_brigade, 1);
if (rv != OK) {
return rv;
}
}
if (!APR_BRIGADE_EMPTY(input_brigade)) { if (!APR_BRIGADE_EMPTY(input_brigade)) {
apr_brigade_length(input_brigade, 1, &bytes); apr_brigade_length(input_brigade, 1, &bytes);
bytes_streamed += bytes; bytes_streamed += bytes;
/* Flush only if we did not get the requested #bytes. */
if (bytes < HUGE_STRING_LEN) {
flush = 0;
}
/* If this brigade contains EOS, either stop or remove it. */ /* If this brigade contains EOS, either stop or remove it. */
if (APR_BUCKET_IS_EOS(APR_BRIGADE_LAST(input_brigade))) { if (APR_BUCKET_IS_EOS(APR_BRIGADE_LAST(input_brigade))) {
seen_eos = 1; seen_eos = 1;
@@ -448,48 +442,18 @@ static int stream_reqbody_cl(proxy_http_req_t *req)
} }
} }
if (!APR_BRIGADE_EMPTY(header_brigade)) { /* If we never sent the header brigade, so go ahead and
/* we never sent the header brigade, so go ahead and * take care of that now by prepending it.
* take care of that now */
*/ APR_BRIGADE_PREPEND(input_brigade, header_brigade);
bb = header_brigade;
APR_BRIGADE_CONCAT(bb, input_brigade);
/* Flush now since we have the header and (enough of) the /* Flush here on EOS because we won't stream_reqbody_read() again */
* prefeched body, or racing KeepAliveTimeout on the backend
* side may kill our connection while we read more client data.
*/
flush = 1;
}
else {
bb = input_brigade;
}
/* Once we hit EOS, we are ready to flush. */
rv = ap_proxy_pass_brigade(bucket_alloc, r, p_conn, req->origin, rv = ap_proxy_pass_brigade(bucket_alloc, r, p_conn, req->origin,
input_brigade, flush || seen_eos); input_brigade, seen_eos);
if (rv != OK) { if (rv != OK) {
return rv; return rv;
} }
} while (!seen_eos);
if (seen_eos) {
break;
}
status = ap_get_brigade(r->input_filters, input_brigade,
AP_MODE_READBYTES, APR_BLOCK_READ,
HUGE_STRING_LEN);
if (status != APR_SUCCESS) {
conn_rec *c = r->connection;
ap_log_rerror(APLOG_MARK, APLOG_ERR, status, r, APLOGNO(02609)
"read request body failed to %pI (%s)"
" from %s (%s)", p_conn->addr,
p_conn->hostname ? p_conn->hostname: "",
c->client_ip, c->remote_host ? c->remote_host: "");
return ap_map_http_request_error(status, HTTP_BAD_REQUEST);
}
}
if (bytes_streamed != req->cl_val) { if (bytes_streamed != req->cl_val) {
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(01087) ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(01087)
@@ -498,22 +462,14 @@ static int stream_reqbody_cl(proxy_http_req_t *req)
return HTTP_BAD_REQUEST; return HTTP_BAD_REQUEST;
} }
if (!APR_BRIGADE_EMPTY(header_brigade)) {
/* we never sent the header brigade since there was no request
* body; send it now with the flush flag
*/
return ap_proxy_pass_brigade(bucket_alloc, r, p_conn, req->origin,
header_brigade, 1);
}
return OK; return OK;
} }
static int spool_reqbody_cl(proxy_http_req_t *req) static int spool_reqbody_cl(proxy_http_req_t *req, apr_off_t *bytes_spooled)
{ {
apr_pool_t *p = req->p; apr_pool_t *p = req->p;
request_rec *r = req->r; request_rec *r = req->r;
int seen_eos = 0; int seen_eos = 0, rv = OK;
apr_status_t status = APR_SUCCESS; apr_status_t status = APR_SUCCESS;
apr_bucket_alloc_t *bucket_alloc = req->bucket_alloc; apr_bucket_alloc_t *bucket_alloc = req->bucket_alloc;
apr_bucket_brigade *input_brigade = req->input_brigade; apr_bucket_brigade *input_brigade = req->input_brigade;
@@ -524,17 +480,18 @@ static int spool_reqbody_cl(proxy_http_req_t *req)
apr_off_t limit; apr_off_t limit;
body_brigade = apr_brigade_create(p, bucket_alloc); body_brigade = apr_brigade_create(p, bucket_alloc);
*bytes_spooled = 0;
limit = ap_get_limit_req_body(r); limit = ap_get_limit_req_body(r);
if (APR_BRIGADE_EMPTY(input_brigade)) { do {
status = ap_get_brigade(r->input_filters, input_brigade, if (APR_BRIGADE_EMPTY(input_brigade)) {
AP_MODE_READBYTES, APR_BLOCK_READ, rv = stream_reqbody_read(req, input_brigade, 0);
HUGE_STRING_LEN); if (rv != OK) {
} return rv;
while (status == APR_SUCCESS }
&& !APR_BUCKET_IS_EOS(APR_BRIGADE_FIRST(input_brigade))) }
{
/* If this brigade contains EOS, either stop or remove it. */ /* If this brigade contains EOS, either stop or remove it. */
if (APR_BUCKET_IS_EOS(APR_BRIGADE_LAST(input_brigade))) { if (APR_BUCKET_IS_EOS(APR_BRIGADE_LAST(input_brigade))) {
seen_eos = 1; seen_eos = 1;
@@ -546,13 +503,13 @@ static int spool_reqbody_cl(proxy_http_req_t *req)
apr_brigade_length(input_brigade, 1, &bytes); apr_brigade_length(input_brigade, 1, &bytes);
if (req->bytes_spooled + bytes > MAX_MEM_SPOOL) { if (*bytes_spooled + bytes > MAX_MEM_SPOOL) {
/* /*
* LimitRequestBody does not affect Proxy requests (Should it?). * LimitRequestBody does not affect Proxy requests (Should it?).
* Let it take effect if we decide to store the body in a * Let it take effect if we decide to store the body in a
* temporary file on disk. * temporary file on disk.
*/ */
if (limit && (req->bytes_spooled + bytes > limit)) { if (limit && (*bytes_spooled + bytes > limit)) {
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(01088) ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(01088)
"Request body is larger than the configured " "Request body is larger than the configured "
"limit of %" APR_OFF_T_FMT, limit); "limit of %" APR_OFF_T_FMT, limit);
@@ -622,23 +579,8 @@ static int spool_reqbody_cl(proxy_http_req_t *req)
} }
req->bytes_spooled += bytes; *bytes_spooled += bytes;
} while (!seen_eos);
if (seen_eos) {
break;
}
status = ap_get_brigade(r->input_filters, input_brigade,
AP_MODE_READBYTES, APR_BLOCK_READ,
HUGE_STRING_LEN);
}
if (status != APR_SUCCESS) {
conn_rec *c = r->connection;
ap_log_rerror(APLOG_MARK, APLOG_ERR, status, r, APLOGNO(02610)
"read request body failed from %s (%s)",
c->client_ip, c->remote_host ? c->remote_host: "");
return ap_map_http_request_error(status, HTTP_BAD_REQUEST);
}
APR_BRIGADE_CONCAT(input_brigade, body_brigade); APR_BRIGADE_CONCAT(input_brigade, body_brigade);
if (tmpfile) { if (tmpfile) {
@@ -740,7 +682,7 @@ static int ap_proxy_http_prefetch(proxy_http_req_t *req,
* reasonable size. * reasonable size.
*/ */
temp_brigade = apr_brigade_create(p, bucket_alloc); temp_brigade = apr_brigade_create(p, bucket_alloc);
block = req->flushall ? APR_NONBLOCK_READ : APR_BLOCK_READ; block = req->prefetch_nonblocking ? APR_NONBLOCK_READ : APR_BLOCK_READ;
do { do {
status = ap_get_brigade(r->input_filters, temp_brigade, status = ap_get_brigade(r->input_filters, temp_brigade,
AP_MODE_READBYTES, block, AP_MODE_READBYTES, block,
@@ -791,7 +733,7 @@ static int ap_proxy_http_prefetch(proxy_http_req_t *req,
*/ */
} while ((bytes_read < MAX_MEM_SPOOL - 80) } while ((bytes_read < MAX_MEM_SPOOL - 80)
&& !APR_BUCKET_IS_EOS(APR_BRIGADE_LAST(input_brigade)) && !APR_BUCKET_IS_EOS(APR_BRIGADE_LAST(input_brigade))
&& !req->flushall); && !req->prefetch_nonblocking);
/* Use chunked request body encoding or send a content-length body? /* Use chunked request body encoding or send a content-length body?
* *
@@ -899,16 +841,12 @@ static int ap_proxy_http_prefetch(proxy_http_req_t *req,
/* If we have to spool the body, do it now, before connecting or /* If we have to spool the body, do it now, before connecting or
* reusing the backend connection. * reusing the backend connection.
*/ */
rv = spool_reqbody_cl(req); rv = spool_reqbody_cl(req, &bytes);
if (rv != OK) { if (rv != OK) {
return rv; return rv;
} }
if (bytes_read > 0 if (bytes || req->old_te_val || req->old_cl_val) {
|| req->old_te_val add_cl(p, bucket_alloc, header_brigade, apr_off_t_toa(p, bytes));
|| req->old_cl_val
|| req->bytes_spooled) {
add_cl(p, bucket_alloc, header_brigade,
apr_off_t_toa(p, req->bytes_spooled));
} }
} }
@@ -2138,15 +2076,15 @@ static int proxy_http_handler(request_rec *r, proxy_worker *worker,
* req->expecting_100 (i.e. cleared only if mod_proxy_http sent the * req->expecting_100 (i.e. cleared only if mod_proxy_http sent the
* "100 Continue" according to its policy). * "100 Continue" according to its policy).
*/ */
req->do_100_continue = req->flushall = 1; req->do_100_continue = req->prefetch_nonblocking = 1;
req->expecting_100 = r->expecting_100; req->expecting_100 = r->expecting_100;
r->expecting_100 = 0; r->expecting_100 = 0;
} }
/* Should we block while prefetching the body or try nonblocking and flush /* Should we block while prefetching the body or try nonblocking and flush
* data to the backend ASAP? * data to the backend ASAP?
*/ */
else if (apr_table_get(r->subprocess_env, "proxy-flushall")) { else if (apr_table_get(r->subprocess_env, "proxy-prefetch-nonblocking")) {
req->flushall = 1; req->prefetch_nonblocking = 1;
} }
/* /*