diff --git a/modules/proxy/mod_proxy.h b/modules/proxy/mod_proxy.h
index 58d6cc54d7..b013fcf089 100644
--- a/modules/proxy/mod_proxy.h
+++ b/modules/proxy/mod_proxy.h
@@ -1356,9 +1356,10 @@ PROXY_DECLARE(apr_status_t) ap_proxy_buckets_lifetime_transform(request_rec *r,
  * The flags for ap_proxy_transfer_between_connections(), where for legacy and
  * compatibility reasons FLUSH_EACH and FLUSH_AFTER are boolean values.
  */
-#define AP_PROXY_TRANSFER_FLUSH_EACH (0x0)
-#define AP_PROXY_TRANSFER_FLUSH_AFTER (0x1)
-#define AP_PROXY_TRANSFER_SHOULD_YIELD (0x2)
+#define AP_PROXY_TRANSFER_FLUSH_EACH (0x00)
+#define AP_PROXY_TRANSFER_FLUSH_AFTER (0x01)
+#define AP_PROXY_TRANSFER_YIELD_PENDING (0x02)
+#define AP_PROXY_TRANSFER_YIELD_MAX_READS (0x04)
 
 /*
  * Sends all data that can be read non blocking from the input filter chain of
@@ -1381,7 +1382,7 @@ PROXY_DECLARE(apr_status_t) ap_proxy_buckets_lifetime_transform(request_rec *r,
  * @return apr_status_t of the operation. Could be any error returned from
  *         either the input filter chain of c_i or the output filter chain
  *         of c_o, APR_EPIPE if the outgoing connection was aborted, or
- *         APR_INCOMPLETE if AP_PROXY_TRANSFER_SHOULD_YIELD was set and
+ *         APR_INCOMPLETE if AP_PROXY_TRANSFER_YIELD_PENDING was set and
 *         the output stack gets full before the input stack is exhausted.
  */
 PROXY_DECLARE(apr_status_t) ap_proxy_transfer_between_connections(
diff --git a/modules/proxy/mod_proxy_http.c b/modules/proxy/mod_proxy_http.c
index 7b4132e837..01ba1ce534 100644
--- a/modules/proxy/mod_proxy_http.c
+++ b/modules/proxy/mod_proxy_http.c
@@ -1667,11 +1667,6 @@ int ap_proxy_http_process_response(proxy_http_req_t *req)
             tunnel->timeout = client_timeout;
         }
 
-        /* Bidirectional non-HTTP stream will confuse mod_reqtimeoout, we
-         * use a single idle timeout from now on.
-         */
-        ap_remove_input_filter_byhandle(c->input_filters, "reqtimeout");
-
         /* Let proxy tunnel forward everything */
         status = ap_proxy_tunnel_run(tunnel);
         if (ap_is_HTTP_ERROR(status)) {
diff --git a/modules/proxy/mod_proxy_wstunnel.c b/modules/proxy/mod_proxy_wstunnel.c
index f6d1e34664..75ac8e69eb 100644
--- a/modules/proxy/mod_proxy_wstunnel.c
+++ b/modules/proxy/mod_proxy_wstunnel.c
@@ -239,9 +239,7 @@ static int proxy_wstunnel_request(apr_pool_t *p, request_rec *r,
 
     apr_brigade_cleanup(header_brigade);
 
-    ap_remove_input_filter_byhandle(c->input_filters, "reqtimeout");
-
-    rv = ap_proxy_tunnel_create(&tunnel, r, conn->connection, scheme);
+    rv = ap_proxy_tunnel_create(&tunnel, r, backconn, scheme);
     if (rv != APR_SUCCESS) {
         ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(02543)
                       "error creating websocket tunnel");
diff --git a/modules/proxy/proxy_util.c b/modules/proxy/proxy_util.c
index b7ec07a517..f7c2d03447 100644
--- a/modules/proxy/proxy_util.c
+++ b/modules/proxy/proxy_util.c
@@ -4107,6 +4107,16 @@ PROXY_DECLARE(apr_status_t) ap_proxy_buckets_lifetime_transform(request_rec *r,
     return rv;
 }
 
+/* An arbitrarily large value to address the pathological case where we
+ * keep reading from one side only, without scheduling the other direction
+ * for too long. This can happen with a large MTU and small read buffers,
+ * e.g. when micro-benchmarking bidirectional transfers of huge files with
+ * client, proxy and backend on localhost. We could also just ignore the
+ * case and let the sender stop by itself at some point when/if it needs to
+ * receive data, or the receiver stop when/if it needs to send...
+ */
+#define PROXY_TRANSFER_MAX_READS 10000
+
 PROXY_DECLARE(apr_status_t) ap_proxy_transfer_between_connections(
                                        request_rec *r,
                                        conn_rec *c_i,
@@ -4120,26 +4130,77 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_between_connections(
 {
     apr_status_t rv;
     int flush_each = 0;
+    unsigned int num_reads = 0;
 #ifdef DEBUGGING
     apr_off_t len;
 #endif
 
     /*
      * Compat: since FLUSH_EACH is default (and zero) for legacy reasons, we
-     * pretend it's no FLUSH_AFTER nor SHOULD_YIELD flags, the latter because
+     * pretend it's no FLUSH_AFTER nor YIELD_PENDING flags, the latter because
      * flushing would defeat the purpose of checking for pending data (hence
      * determine whether or not the output chain/stack is full for stopping).
      */
     if (!(flags & (AP_PROXY_TRANSFER_FLUSH_AFTER |
-                   AP_PROXY_TRANSFER_SHOULD_YIELD))) {
+                   AP_PROXY_TRANSFER_YIELD_PENDING))) {
         flush_each = 1;
     }
 
-    do {
+    for (;;) {
+        /* Yield if the output filters stack is full? This is to avoid
+         * blocking and give the caller a chance to POLLOUT async.
+         */
+        if (flags & AP_PROXY_TRANSFER_YIELD_PENDING) {
+            int rc = OK;
+
+            if (!ap_filter_should_yield(c_o->output_filters)) {
+                rc = ap_filter_output_pending(c_o);
+            }
+            if (rc == OK) {
+                ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
+                              "ap_proxy_transfer_between_connections: "
+                              "yield (output pending)");
+                rv = APR_INCOMPLETE;
+                break;
+            }
+            if (rc != DECLINED) {
+                rv = AP_FILTER_ERROR;
+                break;
+            }
+        }
+
+        /* Yield if we keep hold of the thread for too long? This gives
+         * the caller a chance to schedule the other direction too.
+         */
+        if ((flags & AP_PROXY_TRANSFER_YIELD_MAX_READS)
+                && ++num_reads > PROXY_TRANSFER_MAX_READS) {
+            ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
+                          "ap_proxy_transfer_between_connections: "
+                          "yield (max reads)");
+            rv = APR_SUCCESS;
+            break;
+        }
+
         apr_brigade_cleanup(bb_i);
         rv = ap_get_brigade(c_i->input_filters, bb_i, AP_MODE_READBYTES,
                             APR_NONBLOCK_READ, bsize);
-        if (rv == APR_SUCCESS) {
+        if (rv != APR_SUCCESS) {
+            if (!APR_STATUS_IS_EAGAIN(rv) && !APR_STATUS_IS_EOF(rv)) {
+                ap_log_rerror(APLOG_MARK, APLOG_DEBUG, rv, r, APLOGNO(03308)
+                              "ap_proxy_transfer_between_connections: "
+                              "error on %s - ap_get_brigade",
+                              name);
+                if (rv == APR_INCOMPLETE) {
+                    /* Don't return APR_INCOMPLETE, it'd mean "should yield"
+                     * for the caller, while it means "incomplete body" here
+                     * from ap_http_filter(), which is an error.
+                     */
+                    rv = APR_EGENERAL;
+                }
+            }
+            break;
+        }
+
+        {
             if (c_o->aborted) {
                 apr_brigade_cleanup(bb_i);
                 flags &= ~AP_PROXY_TRANSFER_FLUSH_AFTER;
@@ -4178,30 +4239,17 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_between_connections(
                 APR_BRIGADE_INSERT_TAIL(bb_o, b);
             }
             rv = ap_pass_brigade(c_o->output_filters, bb_o);
+            apr_brigade_cleanup(bb_o);
             if (rv != APR_SUCCESS) {
                 ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(03307)
                               "ap_proxy_transfer_between_connections: "
                               "error on %s - ap_pass_brigade",
                               name);
                 flags &= ~AP_PROXY_TRANSFER_FLUSH_AFTER;
+                break;
             }
-            else if ((flags & AP_PROXY_TRANSFER_SHOULD_YIELD) &&
-                     ap_filter_should_yield(c_o->output_filters)) {
-                ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
-                              "ap_proxy_transfer_between_connections: "
-                              "output filters full");
-                flags &= ~AP_PROXY_TRANSFER_FLUSH_AFTER;
-                rv = APR_INCOMPLETE;
-            }
-            apr_brigade_cleanup(bb_o);
         }
-        else if (!APR_STATUS_IS_EAGAIN(rv) && !APR_STATUS_IS_EOF(rv)) {
-            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, rv, r, APLOGNO(03308)
-                          "ap_proxy_transfer_between_connections: "
-                          "error on %s - ap_get_brigade",
-                          name);
-        }
-    } while (rv == APR_SUCCESS);
+    }
 
     if (flags & AP_PROXY_TRANSFER_FLUSH_AFTER) {
         ap_fflush(c_o->output_filters, bb_o);
@@ -4210,12 +4258,14 @@
     apr_brigade_cleanup(bb_i);
 
     ap_log_rerror(APLOG_MARK, APLOG_TRACE2, rv, r,
-                  "ap_proxy_transfer_between_connections complete");
+                  "ap_proxy_transfer_between_connections complete (%s %pI)",
+                  (c_i == r->connection) ? "to" : "from",
+                  (c_i == r->connection) ? c_o->client_addr
+                                         : c_i->client_addr);
 
     if (APR_STATUS_IS_EAGAIN(rv)) {
         rv = APR_SUCCESS;
     }
-
     return rv;
 }
 
@@ -4226,7 +4276,7 @@ struct proxy_tunnel_conn {
     apr_bucket_brigade *bb;
     struct proxy_tunnel_conn *other;
     unsigned int readable:1,
-                 drain:1;
+                 writable:1;
 };
 
 PROXY_DECLARE(apr_status_t) ap_proxy_tunnel_create(proxy_tunnel_rec **ptunnel,
@@ -4257,7 +4307,6 @@ PROXY_DECLARE(apr_status_t) ap_proxy_tunnel_create(proxy_tunnel_rec **ptunnel,
     tunnel->client->name = "client";
     tunnel->client->bb = apr_brigade_create(c_i->pool, c_i->bucket_alloc);
     tunnel->client->pfd = &APR_ARRAY_PUSH(tunnel->pfds, apr_pollfd_t);
-    memset(tunnel->client->pfd, 0, sizeof(*tunnel->client->pfd));
     tunnel->client->pfd->p = r->pool;
     tunnel->client->pfd->desc_type = APR_POLL_SOCKET;
     tunnel->client->pfd->desc.s = ap_get_conn_socket(c_i);
@@ -4269,7 +4318,6 @@ PROXY_DECLARE(apr_status_t) ap_proxy_tunnel_create(proxy_tunnel_rec **ptunnel,
     tunnel->origin->name = "origin";
     tunnel->origin->bb = apr_brigade_create(c_o->pool, c_o->bucket_alloc);
     tunnel->origin->pfd = &APR_ARRAY_PUSH(tunnel->pfds, apr_pollfd_t);
-    memset(tunnel->origin->pfd, 0, sizeof(*tunnel->origin->pfd));
     tunnel->origin->pfd->p = r->pool;
     tunnel->origin->pfd->desc_type = APR_POLL_SOCKET;
     tunnel->origin->pfd->desc.s = ap_get_conn_socket(c_o);
@@ -4277,12 +4325,9 @@ PROXY_DECLARE(apr_status_t) ap_proxy_tunnel_create(proxy_tunnel_rec **ptunnel,
     tunnel->origin->other = tunnel->client;
     tunnel->origin->readable = 1;
 
-#if 0
+    /* The sockets should be non-blocking from now on */
     apr_socket_opt_set(tunnel->client->pfd->desc.s, APR_SO_NONBLOCK, 1);
-    apr_socket_opt_set(tunnel->client->pfd->desc.s, APR_SO_NONBLOCK, 1);
-    apr_socket_opt_set(tunnel->origin->pfd->desc.s, APR_SO_KEEPALIVE, 1);
-    apr_socket_opt_set(tunnel->origin->pfd->desc.s, APR_SO_KEEPALIVE, 1);
-#endif
+    apr_socket_opt_set(tunnel->origin->pfd->desc.s, APR_SO_NONBLOCK, 1);
 
     /* No coalescing filters */
     ap_remove_output_filter_byhandle(c_i->output_filters,
@@ -4290,13 +4335,31 @@ PROXY_DECLARE(apr_status_t) ap_proxy_tunnel_create(proxy_tunnel_rec **ptunnel,
     ap_remove_output_filter_byhandle(c_o->output_filters,
                                      "SSL/TLS Coalescing Filter");
 
+    /* A bidirectional non-HTTP stream will confuse mod_reqtimeout */
+    ap_remove_input_filter_byhandle(c_i->input_filters, "reqtimeout");
+
     /* The input/output filter stacks should contain connection filters only */
     r->input_filters = r->proto_input_filters = c_i->input_filters;
     r->output_filters = r->proto_output_filters = c_i->output_filters;
 
+    /* Won't be reused after tunneling */
     c_i->keepalive = AP_CONN_CLOSE;
     c_o->keepalive = AP_CONN_CLOSE;
 
+    /* Start with POLLOUT and let ap_proxy_tunnel_run() schedule both
+     * directions when there is no output data pending (anymore).
+     */
+    tunnel->client->pfd->reqevents = APR_POLLOUT;
+    rv = apr_pollset_add(tunnel->pollset, tunnel->client->pfd);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    tunnel->origin->pfd->reqevents = APR_POLLOUT;
+    rv = apr_pollset_add(tunnel->pollset, tunnel->origin->pfd);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+
     *ptunnel = tunnel;
     return APR_SUCCESS;
 }
@@ -4360,33 +4423,27 @@ PROXY_DECLARE(int) ap_proxy_tunnel_run(proxy_tunnel_rec *tunnel)
     int rc = OK;
     request_rec *r = tunnel->r;
     apr_pollset_t *pollset = tunnel->pollset;
+    struct proxy_tunnel_conn *client = tunnel->client,
+                             *origin = tunnel->origin;
     apr_interval_time_t timeout = tunnel->timeout >= 0 ? tunnel->timeout : -1;
-    struct proxy_tunnel_conn *client = tunnel->client, *origin = tunnel->origin;
     apr_size_t read_buf_size = ap_get_read_buf_size(r);
     const char *scheme = tunnel->scheme;
     apr_status_t rv;
 
     ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, APLOGNO(10212)
-                  "proxy: %s: tunnel running (timeout %" APR_TIME_T_FMT "."
-                  "%" APR_TIME_T_FMT ")",
-                  scheme, timeout > 0 ? apr_time_sec(timeout) : timeout,
-                  timeout > 0 ? timeout % APR_USEC_PER_SEC : 0);
-
-    client->pfd->reqevents = 0;
-    origin->pfd->reqevents = 0;
-    add_pollset(pollset, client->pfd, APR_POLLIN);
-    add_pollset(pollset, origin->pfd, APR_POLLIN);
+                  "proxy: %s: tunnel running (timeout %lf)",
+                  scheme, timeout >= 0 ? (double)timeout / APR_USEC_PER_SEC
+                                       : (double)-1.0);
 
     /* Loop until both directions of the connection are closed,
      * or a failure occurs.
      */
     do {
-        struct proxy_tunnel_conn *in, *out;
         const apr_pollfd_t *results;
         apr_int32_t nresults, i;
 
         ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
-                      "proxy: %s: polling client=%hx, origin=%hx",
+                      "proxy: %s: polling (client=%hx, origin=%hx)",
                       scheme, client->pfd->reqevents, origin->pfd->reqevents);
         do {
             rv = apr_pollset_poll(pollset, timeout, &nresults, &results);
@@ -4395,7 +4452,10 @@
         if (rv != APR_SUCCESS) {
             if (APR_STATUS_IS_TIMEUP(rv)) {
                 ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, APLOGNO(10213)
-                              "proxy: %s: polling timeout", scheme);
+                              "proxy: %s: polling timed out "
+                              "(client=%hx, origin=%hx)",
+                              scheme, client->pfd->reqevents,
+                              origin->pfd->reqevents);
                 rc = HTTP_GATEWAY_TIME_OUT;
             }
             else {
@@ -4410,51 +4470,88 @@
                       "proxy: %s: woken up, %i result(s)",
                       scheme, nresults);
 
         for (i = 0; i < nresults; i++) {
-            const apr_pollfd_t *cur = &results[i];
-            int revents = cur->rtnevents;
+            const apr_pollfd_t *pfd = &results[i];
+            struct proxy_tunnel_conn *tc = pfd->client_data;
+
+            ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+                          "proxy: %s: #%i: %s: %hx/%hx", scheme, i,
+                          tc->name, pfd->rtnevents, tc->pfd->reqevents);
 
             /* sanity check */
-            if (cur->desc.s != client->pfd->desc.s
-                && cur->desc.s != origin->pfd->desc.s) {
+            if (pfd->desc.s != client->pfd->desc.s
+                    && pfd->desc.s != origin->pfd->desc.s) {
                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10222)
                               "proxy: %s: unknown socket in pollset", scheme);
                 rc = HTTP_INTERNAL_SERVER_ERROR;
                 goto cleanup;
            }
-
-            in = cur->client_data;
-            if (revents & APR_POLLOUT) {
-                in = in->other;
-            }
-            else if (!(revents & (APR_POLLIN | APR_POLLHUP))) {
+            if (!(pfd->rtnevents & (APR_POLLIN | APR_POLLHUP | APR_POLLOUT))) {
                 /* this catches POLLERR/POLLNVAL etc.. */
                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10220)
                               "proxy: %s: polling events error (%x)",
-                              scheme, revents);
+                              scheme, pfd->rtnevents);
                 rc = HTTP_INTERNAL_SERVER_ERROR;
                 goto cleanup;
             }
-            out = in->other;
-            ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
-                          "proxy: %s: #%i: %s/%hx => %s/%hx: %x",
-                          scheme, i, in->name, in->pfd->reqevents,
-                          out->name, out->pfd->reqevents, revents);
+            if (pfd->rtnevents & APR_POLLOUT) {
+                struct proxy_tunnel_conn *out = tc, *in = tc->other;
 
-            if (in->readable && (in->drain || !(revents & APR_POLLOUT))) {
+                ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+                              "proxy: %s: %s output ready",
+                              scheme, out->name);
+
+                rc = ap_filter_output_pending(out->c);
+                if (rc == OK) {
+                    /* Keep polling out (only) */
+                    continue;
+                }
+                if (rc != DECLINED) {
+                    /* Real failure, bail out */
+                    ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10221)
+                                  "proxy: %s: %s flushing failed (%i)",
+                                  scheme, out->name, rc);
+                    goto cleanup;
+                }
+                rc = OK;
+
+                /* No more pending data. If the input side is not readable
+                 * anymore it's time to shutdown for write (this direction
+                 * is over). Otherwise back to normal business.
+                 */
+                del_pollset(pollset, out->pfd, APR_POLLOUT);
+                if (in->readable) {
+                    ap_log_rerror(APLOG_MARK, APLOG_TRACE5, 0, r,
+                                  "proxy: %s: %s resume writable",
+                                  scheme, out->name);
+                    add_pollset(pollset, in->pfd, APR_POLLIN);
+                    out->writable = 1;
+                }
+                else {
+                    ap_log_rerror(APLOG_MARK, APLOG_TRACE3, 0, r,
+                                  "proxy: %s: %s write shutdown",
+                                  scheme, out->name);
+                    apr_socket_shutdown(out->pfd->desc.s, 1);
+                }
+            }
+
+            if (pfd->rtnevents & (APR_POLLIN | APR_POLLHUP)
+                    || (tc->readable && tc->other->writable
+                        && ap_filter_input_pending(tc->c) == OK)) {
+                struct proxy_tunnel_conn *in = tc, *out = tc->other;
                 int sent = 0;
 
                 ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
-                              "proxy: %s: %s is %s", scheme, in->name,
-                              (revents & APR_POLLOUT) ? "draining"
-                                                      : "readable");
+                              "proxy: %s: %s input ready",
+                              scheme, in->name);
 
                 rv = ap_proxy_transfer_between_connections(r,
-                                                           in->c, out->c,
-                                                           in->bb, out->bb,
-                                                           in->name, &sent,
-                                                           read_buf_size,
-                                                           AP_PROXY_TRANSFER_SHOULD_YIELD);
+                                               in->c, out->c,
+                                               in->bb, out->bb,
+                                               in->name, &sent,
+                                               read_buf_size,
+                                               AP_PROXY_TRANSFER_YIELD_PENDING |
+                                               AP_PROXY_TRANSFER_YIELD_MAX_READS);
                 if (sent && out == client) {
                     tunnel->replied = 1;
                 }
@@ -4464,73 +4561,30 @@ PROXY_DECLARE(int) ap_proxy_tunnel_run(proxy_tunnel_rec *tunnel)
                      * side, hence avoid filling the output filters even
                      * more and hence blocking there.
                      */
-                    ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+                    ap_log_rerror(APLOG_MARK, APLOG_TRACE5, 0, r,
                                   "proxy: %s: %s wait writable",
                                   scheme, out->name);
-                    revents &= ~APR_POLLOUT;
-                    in->drain = 1;
+                    out->writable = 0;
                 }
                 else if (APR_STATUS_IS_EOF(rv)) {
                     /* Stop POLLIN and wait for POLLOUT (flush) on the
                      * other side to shut it down.
                      */
-                    ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+                    ap_log_rerror(APLOG_MARK, APLOG_TRACE3, 0, r,
                                   "proxy: %s: %s read shutdown",
                                   scheme, in->name);
-                    in->readable = in->drain = 0;
+                    in->readable = 0;
                 }
                 else {
                     /* Real failure, bail out */
                     rc = HTTP_INTERNAL_SERVER_ERROR;
                     goto cleanup;
                 }
+
                 del_pollset(pollset, in->pfd, APR_POLLIN);
-                sent = 1;
-            }
-            else {
-                in->drain = 0;
-            }
-
-            if (sent) {
                 add_pollset(pollset, out->pfd, APR_POLLOUT);
             }
-
-            if (revents & APR_POLLOUT) {
-                ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
-                              "proxy: %s: %s is writable",
-                              scheme, out->name);
-
-                rv = ap_filter_output_pending(out->c);
-                if (rv == DECLINED) {
-                    /* No more pending data. If the 'in' side is not readable
-                     * anymore it's time to shutdown for write (this direction
-                     * is over). Otherwise draining (if any) is done, back to
-                     * normal business.
-                     */
-                    if (!in->readable) {
-                        ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
-                                      "proxy: %s: %s write shutdown",
-                                      scheme, out->name);
-                        del_pollset(pollset, out->pfd, APR_POLLOUT);
-                        apr_socket_shutdown(out->pfd->desc.s, 1);
-                    }
-                    else {
-                        add_pollset(pollset, in->pfd, APR_POLLIN);
-                        if (!in->drain) {
-                            del_pollset(pollset, out->pfd, APR_POLLOUT);
-                        }
-                    }
-                }
-                else if (rv != OK) {
-                    /* Real failure, bail out */
-                    ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10221)
-                                  "proxy: %s: %s flushing failed (%i)",
-                                  scheme, out->name, rv);
-                    rc = HTTP_INTERNAL_SERVER_ERROR;
-                    goto cleanup;
-                }
-            }
         }
     } while (client->pfd->reqevents || origin->pfd->reqevents);
 
@@ -4538,8 +4592,6 @@ PROXY_DECLARE(int) ap_proxy_tunnel_run(proxy_tunnel_rec *tunnel)
                   "proxy: %s: tunnel finished", scheme);
 
 cleanup:
-    del_pollset(pollset, client->pfd, ~0);
-    del_pollset(pollset, origin->pfd, ~0);
     return rc;
 }
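Note for reviewers (not part of the patch): the caller-side contract of the reworked ap_proxy_transfer_between_connections() can be summarized with a minimal sketch. The helper pump_once() and the pump_next_t enum below are hypothetical names introduced purely for illustration; the flag and return-value semantics are the ones documented in the mod_proxy.h hunk above.

    #include "httpd.h"
    #include "mod_proxy.h"

    /* What an event-driven caller (such as ap_proxy_tunnel_run()) is
     * expected to do after pumping one direction once with both YIELD
     * flags set.
     */
    typedef enum {
        PUMP_WAIT_READABLE, /* poll c_i for POLLIN, then pump again */
        PUMP_WAIT_WRITABLE, /* poll c_o for POLLOUT (flush), then pump again */
        PUMP_DONE,          /* EOF on c_i: flush c_o, then shutdown for write */
        PUMP_ERROR          /* real failure, abort the tunnel */
    } pump_next_t;

    static pump_next_t pump_once(request_rec *r, conn_rec *c_i, conn_rec *c_o,
                                 apr_bucket_brigade *bb_i,
                                 apr_bucket_brigade *bb_o,
                                 const char *name, apr_off_t bsize)
    {
        int sent = 0;
        apr_status_t rv;

        rv = ap_proxy_transfer_between_connections(r, c_i, c_o, bb_i, bb_o,
                 name, &sent, bsize,
                 AP_PROXY_TRANSFER_YIELD_PENDING |
                 AP_PROXY_TRANSFER_YIELD_MAX_READS);

        if (rv == APR_INCOMPLETE) {
            /* YIELD_PENDING kicked in: the output stack is full, so stop
             * reading and wait for POLLOUT on c_o before pumping again.
             */
            return PUMP_WAIT_WRITABLE;
        }
        if (APR_STATUS_IS_EOF(rv)) {
            /* The input side is finished; this direction is over once the
             * pending output has been flushed.
             */
            return PUMP_DONE;
        }
        if (rv == APR_SUCCESS) {
            /* Either the input is drained (EAGAIN is folded into
             * APR_SUCCESS on return) or YIELD_MAX_READS hit its quota;
             * either way, yield so the other direction can be scheduled.
             */
            return PUMP_WAIT_READABLE;
        }
        return PUMP_ERROR; /* filter chain error, or APR_EPIPE (c_o aborted) */
    }

This is also why ap_proxy_tunnel_create() now arms both pollfds with POLLOUT up front: the first POLLOUT event finds no pending output, so ap_proxy_tunnel_run() takes the "resume writable" path and re-arms POLLIN for both directions, letting a single state machine cover startup, flushing and shutdown.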