1
0
mirror of https://github.com/apache/httpd.git synced 2025-08-05 16:55:50 +03:00

mod_proxy: Improve tunneling loop.

Support half-closed connections and draining of pending data (for protocols
like rsync). PR 61616.

When reading on one side goes faster than writing on the other side, the output
filters chain may start buffering data and finally block, which will break
bidirectional tunneling for some protocols.

To avoid this, ap_proxy_tunnel_run() now stops polling/reading on the input side
until pending output data are drained, then resumes reading.


git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1869420 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yann Ylavic
2019-11-05 16:41:14 +00:00
parent 3d3e03a63a
commit 124a26fb09
6 changed files with 368 additions and 184 deletions

View File

@@ -4051,13 +4051,25 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_between_connections(
const char *name,
int *sent,
apr_off_t bsize,
int after)
int flags)
{
apr_status_t rv;
int flush_each = 0;
#ifdef DEBUGGING
apr_off_t len;
#endif
/*
* Compat: since FLUSH_EACH is default (and zero) for legacy reasons, we
* pretend it's no FLUSH_AFTER nor SHOULD_YIELD flags, the latter because
* flushing would defeat the purpose of checking for pending data (hence
* determine whether or not the output chain/stack is full for stopping).
*/
if (!(flags & (AP_PROXY_TRANSFER_FLUSH_AFTER |
AP_PROXY_TRANSFER_SHOULD_YIELD))) {
flush_each = 1;
}
do {
apr_brigade_cleanup(bb_i);
rv = ap_get_brigade(c_i->input_filters, bb_i, AP_MODE_READBYTES,
@@ -4065,7 +4077,9 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_between_connections(
if (rv == APR_SUCCESS) {
if (c_o->aborted) {
apr_brigade_cleanup(bb_i);
return APR_EPIPE;
flags &= ~AP_PROXY_TRANSFER_FLUSH_AFTER;
rv = APR_EPIPE;
break;
}
if (APR_BRIGADE_EMPTY(bb_i)) {
break;
@@ -4082,14 +4096,14 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_between_connections(
*sent = 1;
}
ap_proxy_buckets_lifetime_transform(r, bb_i, bb_o);
if (!after) {
if (flush_each) {
apr_bucket *b;
/*
* Do not use ap_fflush here since this would cause the flush
* bucket to be sent in a separate brigade afterwards which
* causes some filters to set aside the buckets from the first
* brigade and process them when the flush arrives in the second
* brigade and process them when FLUSH arrives in the second
* brigade. As set asides of our transformed buckets involve
* memory copying we try to avoid this. If we have the flush
* bucket in the first brigade they directly process the
@@ -4104,6 +4118,15 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_between_connections(
"ap_proxy_transfer_between_connections: "
"error on %s - ap_pass_brigade",
name);
flags &= ~AP_PROXY_TRANSFER_FLUSH_AFTER;
}
else if ((flags & AP_PROXY_TRANSFER_SHOULD_YIELD) &&
ap_filter_should_yield(c_o->output_filters)) {
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
"ap_proxy_transfer_between_connections: "
"output filters full");
flags &= ~AP_PROXY_TRANSFER_FLUSH_AFTER;
rv = APR_INCOMPLETE;
}
apr_brigade_cleanup(bb_o);
}
@@ -4115,7 +4138,7 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_between_connections(
}
} while (rv == APR_SUCCESS);
if (after) {
if (flags & AP_PROXY_TRANSFER_FLUSH_AFTER) {
ap_fflush(c_o->output_filters, bb_o);
apr_brigade_cleanup(bb_o);
}
@@ -4131,191 +4154,328 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_between_connections(
return rv;
}
/*
 * State kept for one side of a proxy tunnel. A tunnel holds two of these
 * (named "client" and "origin" at creation), linked together via 'other'.
 */
struct proxy_tunnel_conn {
    /* Connection of this side. */
    conn_rec *c;
    /* Label of this side, used in log messages. */
    const char *name;
    /* Poll descriptor registered in the tunnel's pollset for this side. */
    apr_pollfd_t *pfd;
    /* Brigade associated with this side's connection. */
    apr_bucket_brigade *bb;
    /* The opposite side of the tunnel. */
    struct proxy_tunnel_conn *other;

    /* Set while this side may still be read from; cleared on read EOF. */
    unsigned int readable:1;
    /* Set while reading from this side is paused, waiting for the other
     * side's pending output to be written out. */
    unsigned int drain:1;
};
PROXY_DECLARE(apr_status_t) ap_proxy_tunnel_create(proxy_tunnel_rec **ptunnel,
request_rec *r,
conn_rec *origin)
request_rec *r, conn_rec *c_o,
const char *scheme)
{
apr_status_t rv;
apr_pollfd_t *pfds;
conn_rec *c = r->connection;
conn_rec *c_i = r->connection;
proxy_tunnel_rec *tunnel;
*ptunnel = NULL;
tunnel = apr_pcalloc(r->pool, sizeof(*tunnel));
tunnel->r = r;
tunnel->origin = origin;
tunnel->bb_i = apr_brigade_create(r->pool,
c->bucket_alloc);
tunnel->bb_o = apr_brigade_create(origin->pool,
origin->bucket_alloc);
tunnel->timeout = -1;
rv = apr_pollset_create(&tunnel->pollset, 2, r->pool,
APR_POLLSET_NOCOPY);
rv = apr_pollset_create(&tunnel->pollset, 2, r->pool, APR_POLLSET_NOCOPY);
if (rv != APR_SUCCESS) {
return rv;
}
tunnel->r = r;
tunnel->scheme = apr_pstrdup(r->pool, scheme);
tunnel->client = apr_pcalloc(r->pool, sizeof(struct proxy_tunnel_conn));
tunnel->origin = apr_pcalloc(r->pool, sizeof(struct proxy_tunnel_conn));
tunnel->pfds = apr_array_make(r->pool, 2, sizeof(apr_pollfd_t));
apr_array_push(tunnel->pfds); /* pfds[0] */
apr_array_push(tunnel->pfds); /* pfds[1] */
tunnel->timeout = -1;
pfds = &APR_ARRAY_IDX(tunnel->pfds, 0, apr_pollfd_t);
pfds[0].desc.s = ap_get_conn_socket(c);
pfds[1].desc.s = ap_get_conn_socket(origin);
pfds[0].desc_type = pfds[1].desc_type = APR_POLL_SOCKET;
pfds[0].reqevents = pfds[1].reqevents = APR_POLLIN | APR_POLLHUP;
pfds[0].p = pfds[1].p = r->pool;
tunnel->client->c = c_i;
tunnel->client->name = "client";
tunnel->client->bb = apr_brigade_create(c_i->pool, c_i->bucket_alloc);
tunnel->client->pfd = &APR_ARRAY_PUSH(tunnel->pfds, apr_pollfd_t);
memset(tunnel->client->pfd, 0, sizeof(*tunnel->client->pfd));
tunnel->client->pfd->p = r->pool;
tunnel->client->pfd->desc_type = APR_POLL_SOCKET;
tunnel->client->pfd->desc.s = ap_get_conn_socket(c_i);
tunnel->client->pfd->client_data = tunnel->client;
tunnel->client->other = tunnel->origin;
tunnel->client->readable = 1;
tunnel->origin->c = c_o;
tunnel->origin->name = "origin";
tunnel->origin->bb = apr_brigade_create(c_o->pool, c_o->bucket_alloc);
tunnel->origin->pfd = &APR_ARRAY_PUSH(tunnel->pfds, apr_pollfd_t);
memset(tunnel->origin->pfd, 0, sizeof(*tunnel->origin->pfd));
tunnel->origin->pfd->p = r->pool;
tunnel->origin->pfd->desc_type = APR_POLL_SOCKET;
tunnel->origin->pfd->desc.s = ap_get_conn_socket(c_o);
tunnel->origin->pfd->client_data = tunnel->origin;
tunnel->origin->other = tunnel->client;
tunnel->origin->readable = 1;
#if 0
apr_socket_opt_set(tunnel->client->pfd->desc.s, APR_SO_NONBLOCK, 1);
apr_socket_opt_set(tunnel->client->pfd->desc.s, APR_SO_NONBLOCK, 1);
apr_socket_opt_set(tunnel->origin->pfd->desc.s, APR_SO_KEEPALIVE, 1);
apr_socket_opt_set(tunnel->origin->pfd->desc.s, APR_SO_KEEPALIVE, 1);
#endif
/* No coalescing filters */
ap_remove_output_filter_byhandle(c_i->output_filters,
"SSL/TLS Coalescing Filter");
ap_remove_output_filter_byhandle(c_o->output_filters,
"SSL/TLS Coalescing Filter");
/* The input/output filter stacks should contain connection filters only */
r->output_filters = c->output_filters;
r->proto_output_filters = c->output_filters;
r->input_filters = c->input_filters;
r->proto_input_filters = c->input_filters;
r->input_filters = r->proto_input_filters = c_i->input_filters;
r->output_filters = r->proto_output_filters = c_i->output_filters;
c->keepalive = AP_CONN_CLOSE;
origin->keepalive = AP_CONN_CLOSE;
c_i->keepalive = AP_CONN_CLOSE;
c_o->keepalive = AP_CONN_CLOSE;
*ptunnel = tunnel;
return APR_SUCCESS;
}
PROXY_DECLARE(apr_status_t) ap_proxy_tunnel_run(proxy_tunnel_rec *tunnel)
static void add_pollset(apr_pollset_t *pollset, apr_pollfd_t *pfd,
apr_int16_t events)
{
apr_status_t rv;
request_rec *r = tunnel->r;
conn_rec *c_i = r->connection;
conn_rec *c_o = tunnel->origin;
apr_socket_t *sock_i = ap_get_conn_socket(c_i);
apr_socket_t *sock_o = ap_get_conn_socket(c_o);
apr_interval_time_t timeout = tunnel->timeout >= 0 ? tunnel->timeout : -1;
apr_pollfd_t *pfds = &APR_ARRAY_IDX(tunnel->pfds, 0, apr_pollfd_t);
apr_pollset_t *pollset = tunnel->pollset;
const apr_pollfd_t *signalled;
apr_int32_t pollcnt, pi;
int done = 0;
AP_DEBUG_ASSERT(tunnel->pfds->nelts == 2);
AP_DEBUG_ASSERT(pfds[0].desc.s == sock_i);
AP_DEBUG_ASSERT(pfds[1].desc.s == sock_o);
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, APLOGNO(10212)
"proxy: tunnel: running (timeout %" APR_TIME_T_FMT "."
"%" APR_TIME_T_FMT ")",
timeout > 0 ? apr_time_sec(timeout) : timeout,
timeout > 0 ? timeout % APR_USEC_PER_SEC : 0);
#if 0
apr_socket_opt_set(sock_i, APR_SO_NONBLOCK, 1);
apr_socket_opt_set(sock_i, APR_SO_NONBLOCK, 1);
apr_socket_opt_set(sock_o, APR_SO_KEEPALIVE, 1);
apr_socket_opt_set(sock_o, APR_SO_KEEPALIVE, 1);
#endif
apr_pollset_add(pollset, &pfds[0]);
apr_pollset_add(pollset, &pfds[1]);
do { /* Loop until done (one side closes the connection, or an error) */
rv = apr_pollset_poll(tunnel->pollset, timeout, &pollcnt, &signalled);
if (rv != APR_SUCCESS) {
if (APR_STATUS_IS_EINTR(rv)) {
continue;
}
apr_pollset_remove(pollset, &pfds[1]);
apr_pollset_remove(pollset, &pfds[0]);
if (APR_STATUS_IS_TIMEUP(rv)) {
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, APLOGNO(10213)
"proxy: tunnel: woken up, i=%d", (int)pollcnt);
return HTTP_GATEWAY_TIME_OUT;
}
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(10214)
"proxy: tunnel: polling failed");
return HTTP_INTERNAL_SERVER_ERROR;
}
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, APLOGNO(10215)
"proxy: tunnel: woken up, i=%d", (int)pollcnt);
for (pi = 0; pi < pollcnt; pi++) {
const apr_pollfd_t *cur = &signalled[pi];
apr_int16_t pollevent = cur->rtnevents;
if (cur->desc.s == sock_o) {
if (pollevent & (APR_POLLIN | APR_POLLHUP)) {
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, APLOGNO(10216)
"proxy: tunnel: backend was readable");
rv = ap_proxy_transfer_between_connections(r, c_o, c_i,
tunnel->bb_o,
tunnel->bb_i,
"backend",
&tunnel->replied,
AP_IOBUFSIZE,
0);
done |= (rv != APR_SUCCESS);
}
else if (pollevent & APR_POLLERR) {
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10217)
"proxy: tunnel: error on backend connection");
c_o->aborted = 1;
done = 1;
}
else {
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10218)
"proxy: tunnel: unknown event %d on backend connection",
(int)pollevent);
done = 1;
}
}
else if (cur->desc.s == sock_i) {
if (pollevent & (APR_POLLIN | APR_POLLHUP)) {
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, APLOGNO(10219)
"proxy: tunnel: client was readable");
rv = ap_proxy_transfer_between_connections(r, c_i, c_o,
tunnel->bb_i,
tunnel->bb_o,
"client", NULL,
AP_IOBUFSIZE,
0);
done |= (rv != APR_SUCCESS);
}
else if (pollevent & APR_POLLERR) {
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10220)
"proxy: tunnel: error on client connection");
c_i->aborted = 1;
done = 1;
}
else {
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10221)
"proxy: tunnel: unknown event %d on client connection",
(int)pollevent);
done = 1;
}
}
else {
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10222)
"proxy: tunnel: unknown socket in pollset");
done = 1;
}
}
} while (!done);
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, APLOGNO(10223)
"proxy: tunnel: finished");
apr_pollset_remove(pollset, &pfds[1]);
apr_pollset_remove(pollset, &pfds[0]);
if (!tunnel->replied) {
return HTTP_BAD_GATEWAY;
if (events & APR_POLLIN) {
events |= APR_POLLHUP;
}
return OK;
if ((pfd->reqevents & events) == events) {
return;
}
if (pfd->reqevents) {
rv = apr_pollset_remove(pollset, pfd);
if (rv != APR_SUCCESS) {
AP_DEBUG_ASSERT(1);
}
}
pfd->reqevents |= events;
rv = apr_pollset_add(pollset, pfd);
if (rv != APR_SUCCESS) {
AP_DEBUG_ASSERT(1);
}
}
/*
 * Stop polling for the given events on a descriptor.
 *
 * The descriptor is removed from the pollset, the events are cleared from
 * pfd->reqevents, and the descriptor is added back only if some requested
 * events remain. Nothing is done when none of the given events is currently
 * requested.
 *
 * @param pollset  the pollset the descriptor is (possibly) registered in
 * @param pfd      the descriptor to update; its reqevents field is modified
 * @param events   the events to stop polling for (APR_POLLIN implies
 *                 APR_POLLHUP too, mirroring what add_pollset() requests)
 *
 * Failures of the pollset operations are not reported to the caller; they
 * trip an assertion in debug builds only.
 */
static void del_pollset(apr_pollset_t *pollset, apr_pollfd_t *pfd,
                        apr_int16_t events)
{
    apr_status_t rv;

    /* POLLHUP is requested along with POLLIN, so drop them together. */
    if (events & APR_POLLIN) {
        events |= APR_POLLHUP;
    }

    /* None of these events is being polled for: nothing to do. */
    if ((pfd->reqevents & events) == 0) {
        return;
    }

    rv = apr_pollset_remove(pollset, pfd);
    if (rv != APR_SUCCESS) {
        /* Was AP_DEBUG_ASSERT(1), which can never fire. */
        AP_DEBUG_ASSERT(0);
    }

    pfd->reqevents &= ~events;
    if (pfd->reqevents) {
        /* Some events are still wanted, register the updated descriptor. */
        rv = apr_pollset_add(pollset, pfd);
        if (rv != APR_SUCCESS) {
            /* Was AP_DEBUG_ASSERT(1), which can never fire. */
            AP_DEBUG_ASSERT(0);
        }
    }
}
/*
 * Run the tunnel: relay data in both directions between the client and
 * origin connections, driven by the tunnel's pollset.
 *
 * The loop ends when neither side has any requested poll event left, or on
 * the first failure.
 *
 * Returns OK when the loop ends normally, HTTP_GATEWAY_TIME_OUT when polling
 * times out, and HTTP_INTERNAL_SERVER_ERROR on a polling failure, an unknown
 * socket in the results, an error event, a transfer failure or a flushing
 * failure. tunnel->replied is set once a transfer toward the client reports
 * data as sent. Both descriptors are removed from the pollset before
 * returning, whatever the outcome.
 */
PROXY_DECLARE(int) ap_proxy_tunnel_run(proxy_tunnel_rec *tunnel)
{
int rc = OK;
request_rec *r = tunnel->r;
apr_pollset_t *pollset = tunnel->pollset;
/* Any negative timeout is normalized to -1.
 * NOTE(review): presumably -1 means "no timeout" for apr_pollset_poll()
 * -- confirm against the APR documentation.
 */
apr_interval_time_t timeout = tunnel->timeout >= 0 ? tunnel->timeout : -1;
struct proxy_tunnel_conn *client = tunnel->client, *origin = tunnel->origin;
apr_size_t read_buf_size = ap_get_read_buf_size(r);
const char *scheme = tunnel->scheme;
apr_status_t rv;
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, APLOGNO(10212)
"proxy: %s: tunnel running (timeout %" APR_TIME_T_FMT "."
"%" APR_TIME_T_FMT ")",
scheme, timeout > 0 ? apr_time_sec(timeout) : timeout,
timeout > 0 ? timeout % APR_USEC_PER_SEC : 0);
/* Start from a clean state: no event requested on either side, then poll
 * both sides for input.
 */
client->pfd->reqevents = 0;
origin->pfd->reqevents = 0;
add_pollset(pollset, client->pfd, APR_POLLIN);
add_pollset(pollset, origin->pfd, APR_POLLIN);
/* Loop until both directions of the connection are closed,
 * or a failure occurs.
 */
do {
struct proxy_tunnel_conn *in, *out;
const apr_pollfd_t *results;
apr_int32_t nresults, i;
ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
"proxy: %s: polling client=%hx, origin=%hx",
scheme, client->pfd->reqevents, origin->pfd->reqevents);
/* Poll again when interrupted (EINTR). */
do {
rv = apr_pollset_poll(pollset, timeout, &nresults, &results);
} while (APR_STATUS_IS_EINTR(rv));
if (rv != APR_SUCCESS) {
if (APR_STATUS_IS_TIMEUP(rv)) {
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, APLOGNO(10213)
"proxy: %s: polling timeout", scheme);
rc = HTTP_GATEWAY_TIME_OUT;
}
else {
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(10214)
"proxy: %s: polling failed", scheme);
rc = HTTP_INTERNAL_SERVER_ERROR;
}
goto cleanup;
}
ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r, APLOGNO(10215)
"proxy: %s: woken up, %i result(s)", scheme, nresults);
for (i = 0; i < nresults; i++) {
const apr_pollfd_t *cur = &results[i];
int revents = cur->rtnevents;
/* sanity check */
if (cur->desc.s != client->pfd->desc.s
&& cur->desc.s != origin->pfd->desc.s) {
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10222)
"proxy: %s: unknown socket in pollset", scheme);
rc = HTTP_INTERNAL_SERVER_ERROR;
goto cleanup;
}
/* The signalled descriptor carries its tunnel side in client_data.
 * For a POLLOUT event that side is the one being written to, so the
 * 'in' (read) side of this direction is its peer.
 */
in = cur->client_data;
if (revents & APR_POLLOUT) {
in = in->other;
}
else if (!(revents & (APR_POLLIN | APR_POLLHUP))) {
/* this catches POLLERR/POLLNVAL etc.. */
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10220)
"proxy: %s: polling events error (%x)",
scheme, revents);
rc = HTTP_INTERNAL_SERVER_ERROR;
goto cleanup;
}
out = in->other;
ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
"proxy: %s: #%i: %s/%hx => %s/%hx: %x",
scheme, i, in->name, in->pfd->reqevents,
out->name, out->pfd->reqevents, revents);
/* Transfer from 'in' to 'out' when 'in' is still readable and either
 * this is a read event, or this is a POLLOUT event while reading from
 * 'in' was paused for draining.
 */
if (in->readable && (in->drain || !(revents & APR_POLLOUT))) {
int sent = 0;
ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
"proxy: %s: %s is %s", scheme, in->name,
(revents & APR_POLLOUT) ? "draining"
: "readable");
rv = ap_proxy_transfer_between_connections(r,
in->c, out->c,
in->bb, out->bb,
in->name, &sent,
read_buf_size,
AP_PROXY_TRANSFER_SHOULD_YIELD);
/* Remember that something was sent toward the client. */
if (sent && out == client) {
tunnel->replied = 1;
}
if (rv != APR_SUCCESS) {
if (APR_STATUS_IS_INCOMPLETE(rv)) {
/* Pause POLLIN while waiting for POLLOUT on the other
 * side, hence avoid filling the output filters even
 * more and hence blocking there.
 */
ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
"proxy: %s: %s wait writable",
scheme, out->name);
/* Clearing POLLOUT in the local copy skips the "writable"
 * handling below for this result.
 */
revents &= ~APR_POLLOUT;
in->drain = 1;
}
else if (APR_STATUS_IS_EOF(rv)) {
/* Stop POLLIN and wait for POLLOUT (flush) on the
 * other side to shut it down.
 */
ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
"proxy: %s: %s read shutdown",
scheme, in->name);
in->readable = in->drain = 0;
}
else {
/* Real failure, bail out */
rc = HTTP_INTERNAL_SERVER_ERROR;
goto cleanup;
}
/* In both non-fatal cases: stop reading from 'in', and force
 * 'sent' so that POLLOUT gets requested on 'out' below.
 */
del_pollset(pollset, in->pfd, APR_POLLIN);
sent = 1;
}
else {
/* Transfer completed, no draining pending for this side. */
in->drain = 0;
}
if (sent) {
add_pollset(pollset, out->pfd, APR_POLLOUT);
}
}
if (revents & APR_POLLOUT) {
ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
"proxy: %s: %s is writable",
scheme, out->name);
rv = ap_filter_output_pending(out->c);
if (rv == DECLINED) {
/* No more pending data. If the 'in' side is not readable
 * anymore it's time to shutdown for write (this direction
 * is over). Otherwise draining (if any) is done, back to
 * normal business.
 */
if (!in->readable) {
ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
"proxy: %s: %s write shutdown",
scheme, out->name);
del_pollset(pollset, out->pfd, APR_POLLOUT);
/* NOTE(review): the literal 1 is presumably
 * APR_SHUTDOWN_WRITE (matching the log message above) --
 * confirm. The return value is not checked.
 */
apr_socket_shutdown(out->pfd->desc.s, 1);
}
else {
/* Resume reading from 'in'; keep polling 'out' for POLLOUT
 * only while 'in' is still marked as draining.
 */
add_pollset(pollset, in->pfd, APR_POLLIN);
if (!in->drain) {
del_pollset(pollset, out->pfd, APR_POLLOUT);
}
}
}
else if (rv != OK) {
/* Real failure, bail out */
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10221)
"proxy: %s: %s flushing failed (%i)",
scheme, out->name, rv);
rc = HTTP_INTERNAL_SERVER_ERROR;
goto cleanup;
}
}
}
/* Keep going as long as some event is still requested on either side. */
} while (client->pfd->reqevents || origin->pfd->reqevents);
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, APLOGNO(10223)
"proxy: %s: tunnel finished", scheme);
cleanup:
/* Remove whatever is still registered; del_pollset() does nothing for a
 * descriptor with no requested events.
 */
del_pollset(pollset, client->pfd, ~0);
del_pollset(pollset, origin->pfd, ~0);
return rc;
}
PROXY_DECLARE (const char *) ap_proxy_show_hcmethod(hcmethod_t method)