mirror of
https://github.com/apache/httpd.git
synced 2025-08-07 04:02:58 +03:00
core: Extend support for asynchronous write completion from the
network filter to any connection or request filter. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1706669 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
3
CHANGES
3
CHANGES
@@ -1,6 +1,9 @@
|
|||||||
-*- coding: utf-8 -*-
|
-*- coding: utf-8 -*-
|
||||||
Changes with Apache 2.5.0
|
Changes with Apache 2.5.0
|
||||||
|
|
||||||
|
*) core: Extend support for asynchronous write completion from the
|
||||||
|
network filter to any connection or request filter. [Graham Leggett]
|
||||||
|
|
||||||
*) mpm_event: Free memory earlier when shutting down processes.
|
*) mpm_event: Free memory earlier when shutting down processes.
|
||||||
[Stefan Fritsch]
|
[Stefan Fritsch]
|
||||||
|
|
||||||
|
@@ -489,6 +489,11 @@
|
|||||||
* protocol_switch and protocol_get. Add
|
* protocol_switch and protocol_get. Add
|
||||||
* ap_select_protocol(), ap_switch_protocol(),
|
* ap_select_protocol(), ap_switch_protocol(),
|
||||||
* ap_get_protocol(). Add HTTP_MISDIRECTED_REQUEST.
|
* ap_get_protocol(). Add HTTP_MISDIRECTED_REQUEST.
|
||||||
|
* 20150222.5 (2.5.0-dev) Add ap_request_core_filter(),
|
||||||
|
* ap_filter_setaside_brigade(),
|
||||||
|
* ap_filter_reinstate_brigade() and
|
||||||
|
* ap_filter_should_yield(). Add empty and filters to
|
||||||
|
* conn_rec.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define MODULE_MAGIC_COOKIE 0x41503235UL /* "AP25" */
|
#define MODULE_MAGIC_COOKIE 0x41503235UL /* "AP25" */
|
||||||
@@ -496,7 +501,7 @@
|
|||||||
#ifndef MODULE_MAGIC_NUMBER_MAJOR
|
#ifndef MODULE_MAGIC_NUMBER_MAJOR
|
||||||
#define MODULE_MAGIC_NUMBER_MAJOR 20150222
|
#define MODULE_MAGIC_NUMBER_MAJOR 20150222
|
||||||
#endif
|
#endif
|
||||||
#define MODULE_MAGIC_NUMBER_MINOR 4 /* 0...n */
|
#define MODULE_MAGIC_NUMBER_MINOR 5 /* 0...n */
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Determine if the server's current MODULE_MAGIC_NUMBER is at least a
|
* Determine if the server's current MODULE_MAGIC_NUMBER is at least a
|
||||||
|
@@ -792,6 +792,7 @@ AP_DECLARE_DATA extern ap_filter_rec_t *ap_subreq_core_filter_handle;
|
|||||||
AP_DECLARE_DATA extern ap_filter_rec_t *ap_core_output_filter_handle;
|
AP_DECLARE_DATA extern ap_filter_rec_t *ap_core_output_filter_handle;
|
||||||
AP_DECLARE_DATA extern ap_filter_rec_t *ap_content_length_filter_handle;
|
AP_DECLARE_DATA extern ap_filter_rec_t *ap_content_length_filter_handle;
|
||||||
AP_DECLARE_DATA extern ap_filter_rec_t *ap_core_input_filter_handle;
|
AP_DECLARE_DATA extern ap_filter_rec_t *ap_core_input_filter_handle;
|
||||||
|
AP_DECLARE_DATA extern ap_filter_rec_t *ap_request_core_filter_handle;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This hook provdes a way for modules to provide metrics/statistics about
|
* This hook provdes a way for modules to provide metrics/statistics about
|
||||||
|
@@ -149,6 +149,18 @@ AP_DECLARE(int) ap_run_sub_req(request_rec *r);
|
|||||||
*/
|
*/
|
||||||
AP_DECLARE(void) ap_destroy_sub_req(request_rec *r);
|
AP_DECLARE(void) ap_destroy_sub_req(request_rec *r);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An output filter to ensure that we avoid passing morphing buckets to
|
||||||
|
* connection filters and in so doing defeat async write completion when
|
||||||
|
* they are set aside. This should be inserted at the end of a request
|
||||||
|
* filter stack.
|
||||||
|
* @param f The current filter
|
||||||
|
* @param bb The brigade to filter
|
||||||
|
* @return status code
|
||||||
|
*/
|
||||||
|
AP_CORE_DECLARE_NONSTD(apr_status_t) ap_request_core_filter(ap_filter_t *f,
|
||||||
|
apr_bucket_brigade *bb);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Then there's the case that you want some other request to be served
|
* Then there's the case that you want some other request to be served
|
||||||
* as the top-level request INSTEAD of what the client requested directly.
|
* as the top-level request INSTEAD of what the client requested directly.
|
||||||
|
@@ -55,6 +55,7 @@
|
|||||||
#include "apr_buckets.h"
|
#include "apr_buckets.h"
|
||||||
#include "apr_poll.h"
|
#include "apr_poll.h"
|
||||||
#include "apr_thread_proc.h"
|
#include "apr_thread_proc.h"
|
||||||
|
#include "apr_hash.h"
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
@@ -1136,7 +1137,7 @@ struct conn_rec {
|
|||||||
conn_state_t *cs;
|
conn_state_t *cs;
|
||||||
/** Is there data pending in the input filters? */
|
/** Is there data pending in the input filters? */
|
||||||
int data_in_input_filters;
|
int data_in_input_filters;
|
||||||
/** Is there data pending in the output filters? */
|
/** No longer used, replaced with ap_filter_should_yield() */
|
||||||
int data_in_output_filters;
|
int data_in_output_filters;
|
||||||
|
|
||||||
/** Are there any filters that clogg/buffer the input stream, breaking
|
/** Are there any filters that clogg/buffer the input stream, breaking
|
||||||
@@ -1191,6 +1192,12 @@ struct conn_rec {
|
|||||||
|
|
||||||
/** Array of requests being handled under this connection. */
|
/** Array of requests being handled under this connection. */
|
||||||
apr_array_header_t *requests;
|
apr_array_header_t *requests;
|
||||||
|
|
||||||
|
/** Empty bucket brigade */
|
||||||
|
apr_bucket_brigade *empty;
|
||||||
|
|
||||||
|
/** Hashtable of filters with setaside buckets for write completion */
|
||||||
|
apr_hash_t *filters;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct conn_slave_rec {
|
struct conn_slave_rec {
|
||||||
|
@@ -278,6 +278,13 @@ struct ap_filter_t {
|
|||||||
* to the request_rec, except that it is used for connection filters.
|
* to the request_rec, except that it is used for connection filters.
|
||||||
*/
|
*/
|
||||||
conn_rec *c;
|
conn_rec *c;
|
||||||
|
|
||||||
|
/** Buffered data associated with the current filter. */
|
||||||
|
apr_bucket_brigade *bb;
|
||||||
|
|
||||||
|
/** Dedicated pool to use for deferred writes. */
|
||||||
|
apr_pool_t *deferred_pool;
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -519,8 +526,11 @@ AP_DECLARE(apr_status_t) ap_remove_output_filter_byhandle(ap_filter_t *next,
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* prepare a bucket brigade to be setaside. If a different brigade was
|
* Prepare a bucket brigade to be setaside. If a different brigade was
|
||||||
* set-aside earlier, then the two brigades are concatenated together.
|
* set-aside earlier, then the two brigades are concatenated together.
|
||||||
|
*
|
||||||
|
* If *save_to is NULL, the brigade will be created, and a cleanup registered
|
||||||
|
* to clear the brigade address when the pool is destroyed.
|
||||||
* @param f The current filter
|
* @param f The current filter
|
||||||
* @param save_to The brigade that was previously set-aside. Regardless, the
|
* @param save_to The brigade that was previously set-aside. Regardless, the
|
||||||
* new bucket brigade is returned in this location.
|
* new bucket brigade is returned in this location.
|
||||||
@@ -532,6 +542,53 @@ AP_DECLARE(apr_status_t) ap_save_brigade(ap_filter_t *f,
|
|||||||
apr_bucket_brigade **save_to,
|
apr_bucket_brigade **save_to,
|
||||||
apr_bucket_brigade **b, apr_pool_t *p);
|
apr_bucket_brigade **b, apr_pool_t *p);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prepare a bucket brigade to be setaside, creating a dedicated pool if
|
||||||
|
* necessary within the filter to handle the lifetime of the setaside brigade.
|
||||||
|
* @param f The current filter
|
||||||
|
* @param bb The bucket brigade to set aside. This brigade is always empty
|
||||||
|
* on return
|
||||||
|
*/
|
||||||
|
AP_DECLARE(apr_status_t) ap_filter_setaside_brigade(ap_filter_t *f,
|
||||||
|
apr_bucket_brigade *bb);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reinstate a brigade setaside earlier, and calculate the amount of data we
|
||||||
|
* should write based on the presence of flush buckets, size limits on in
|
||||||
|
* memory buckets, and the number of outstanding requests in the pipeline.
|
||||||
|
* This is a safety mechanism to protect against a module that might try
|
||||||
|
* generate data too quickly for downstream to handle without yielding as
|
||||||
|
* it should.
|
||||||
|
*
|
||||||
|
* If the brigade passed in is empty, we reinstate the brigade and return
|
||||||
|
* immediately on the assumption that any buckets needing to be flushed were
|
||||||
|
* flushed before being passed to ap_filter_setaside_brigade().
|
||||||
|
*
|
||||||
|
* @param f The current filter
|
||||||
|
* @param bb The bucket brigade to restore to.
|
||||||
|
* @param flush_upto Work out the bucket we need to flush up to, based on the
|
||||||
|
* presence of a flush bucket, size limits on in-memory
|
||||||
|
* buckets, size limits on the number of requests outstanding
|
||||||
|
* in the pipeline.
|
||||||
|
* @return APR_SUCCESS.
|
||||||
|
*/
|
||||||
|
AP_DECLARE(apr_status_t) ap_filter_reinstate_brigade(ap_filter_t *f,
|
||||||
|
apr_bucket_brigade *bb,
|
||||||
|
apr_bucket **flush_upto);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This function calculates whether there are any as yet unsent
|
||||||
|
* buffered brigades in downstream filters, and returns non zero
|
||||||
|
* if so.
|
||||||
|
*
|
||||||
|
* A filter should use this to determine whether the passing of data
|
||||||
|
* downstream might block, and so defer the passing of brigades
|
||||||
|
* downstream with ap_filter_setaside_brigade().
|
||||||
|
*
|
||||||
|
* This function can be called safely from a handler.
|
||||||
|
*/
|
||||||
|
AP_DECLARE(int) ap_filter_should_yield(ap_filter_t *f);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Flush function for apr_brigade_* calls. This calls ap_pass_brigade
|
* Flush function for apr_brigade_* calls. This calls ap_pass_brigade
|
||||||
* to flush the brigade if the brigade buffer overflows.
|
* to flush the brigade if the brigade buffer overflows.
|
||||||
|
@@ -263,6 +263,8 @@ static int http_create_request(request_rec *r)
|
|||||||
NULL, r, r->connection);
|
NULL, r, r->connection);
|
||||||
ap_add_output_filter_handle(ap_http_outerror_filter_handle,
|
ap_add_output_filter_handle(ap_http_outerror_filter_handle,
|
||||||
NULL, r, r->connection);
|
NULL, r, r->connection);
|
||||||
|
ap_add_output_filter_handle(ap_request_core_filter_handle,
|
||||||
|
NULL, r, r->connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
return OK;
|
return OK;
|
||||||
|
@@ -256,6 +256,14 @@ AP_DECLARE(void) ap_process_request_after_handler(request_rec *r)
|
|||||||
apr_bucket *b;
|
apr_bucket *b;
|
||||||
conn_rec *c = r->connection;
|
conn_rec *c = r->connection;
|
||||||
|
|
||||||
|
/* Find the last request, taking into account internal
|
||||||
|
* redirects. We want to send the EOR bucket at the end of
|
||||||
|
* all the buckets so it does not jump the queue.
|
||||||
|
*/
|
||||||
|
while (r->next) {
|
||||||
|
r = r->next;
|
||||||
|
}
|
||||||
|
|
||||||
/* Send an EOR bucket through the output filter chain. When
|
/* Send an EOR bucket through the output filter chain. When
|
||||||
* this bucket is destroyed, the request will be logged and
|
* this bucket is destroyed, the request will be logged and
|
||||||
* its pool will be freed
|
* its pool will be freed
|
||||||
@@ -264,8 +272,8 @@ AP_DECLARE(void) ap_process_request_after_handler(request_rec *r)
|
|||||||
b = ap_bucket_eor_create(c->bucket_alloc, r);
|
b = ap_bucket_eor_create(c->bucket_alloc, r);
|
||||||
APR_BRIGADE_INSERT_HEAD(bb, b);
|
APR_BRIGADE_INSERT_HEAD(bb, b);
|
||||||
|
|
||||||
ap_pass_brigade(c->output_filters, bb);
|
ap_pass_brigade(r->output_filters, bb);
|
||||||
|
|
||||||
/* The EOR bucket has either been handled by an output filter (eg.
|
/* The EOR bucket has either been handled by an output filter (eg.
|
||||||
* deleted or moved to a buffered_bb => no more in bb), or an error
|
* deleted or moved to a buffered_bb => no more in bb), or an error
|
||||||
* occured before that (eg. c->aborted => still in bb) and we ought
|
* occured before that (eg. c->aborted => still in bb) and we ought
|
||||||
|
@@ -1682,6 +1682,7 @@ static apr_status_t ssl_io_filter_output(ap_filter_t *f,
|
|||||||
ssl_filter_ctx_t *filter_ctx = f->ctx;
|
ssl_filter_ctx_t *filter_ctx = f->ctx;
|
||||||
bio_filter_in_ctx_t *inctx;
|
bio_filter_in_ctx_t *inctx;
|
||||||
bio_filter_out_ctx_t *outctx;
|
bio_filter_out_ctx_t *outctx;
|
||||||
|
apr_bucket *flush_upto = NULL;
|
||||||
apr_read_type_e rblock = APR_NONBLOCK_READ;
|
apr_read_type_e rblock = APR_NONBLOCK_READ;
|
||||||
|
|
||||||
if (f->c->aborted) {
|
if (f->c->aborted) {
|
||||||
@@ -1689,6 +1690,9 @@ static apr_status_t ssl_io_filter_output(ap_filter_t *f,
|
|||||||
return APR_ECONNABORTED;
|
return APR_ECONNABORTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Reinstate any buffered content */
|
||||||
|
ap_filter_reinstate_brigade(f, bb, &flush_upto);
|
||||||
|
|
||||||
if (!filter_ctx->pssl) {
|
if (!filter_ctx->pssl) {
|
||||||
/* ssl_filter_io_shutdown was called */
|
/* ssl_filter_io_shutdown was called */
|
||||||
return ap_pass_brigade(f->next, bb);
|
return ap_pass_brigade(f->next, bb);
|
||||||
@@ -1711,6 +1715,16 @@ static apr_status_t ssl_io_filter_output(ap_filter_t *f,
|
|||||||
while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) {
|
while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) {
|
||||||
apr_bucket *bucket = APR_BRIGADE_FIRST(bb);
|
apr_bucket *bucket = APR_BRIGADE_FIRST(bb);
|
||||||
|
|
||||||
|
/* if the core has set aside data, back off and try later */
|
||||||
|
if (!flush_upto) {
|
||||||
|
if (ap_filter_should_yield(f)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (flush_upto == bucket) {
|
||||||
|
flush_upto = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
if (APR_BUCKET_IS_METADATA(bucket)) {
|
if (APR_BUCKET_IS_METADATA(bucket)) {
|
||||||
/* Pass through metadata buckets untouched. EOC is
|
/* Pass through metadata buckets untouched. EOC is
|
||||||
* special; terminate the SSL layer first. */
|
* special; terminate the SSL layer first. */
|
||||||
@@ -1762,6 +1776,10 @@ static apr_status_t ssl_io_filter_output(ap_filter_t *f,
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (APR_STATUS_IS_EOF(status) || (status == APR_SUCCESS)) {
|
||||||
|
return ap_filter_setaside_brigade(f, bb);
|
||||||
|
}
|
||||||
|
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -112,6 +112,7 @@ AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, insert_network_bucket,
|
|||||||
|
|
||||||
/* Handles for core filters */
|
/* Handles for core filters */
|
||||||
AP_DECLARE_DATA ap_filter_rec_t *ap_subreq_core_filter_handle;
|
AP_DECLARE_DATA ap_filter_rec_t *ap_subreq_core_filter_handle;
|
||||||
|
AP_DECLARE_DATA ap_filter_rec_t *ap_request_core_filter_handle;
|
||||||
AP_DECLARE_DATA ap_filter_rec_t *ap_core_output_filter_handle;
|
AP_DECLARE_DATA ap_filter_rec_t *ap_core_output_filter_handle;
|
||||||
AP_DECLARE_DATA ap_filter_rec_t *ap_content_length_filter_handle;
|
AP_DECLARE_DATA ap_filter_rec_t *ap_content_length_filter_handle;
|
||||||
AP_DECLARE_DATA ap_filter_rec_t *ap_core_input_filter_handle;
|
AP_DECLARE_DATA ap_filter_rec_t *ap_core_input_filter_handle;
|
||||||
@@ -5007,6 +5008,8 @@ static conn_rec *core_create_conn(apr_pool_t *ptrans, server_rec *s,
|
|||||||
|
|
||||||
c->id = id;
|
c->id = id;
|
||||||
c->bucket_alloc = alloc;
|
c->bucket_alloc = alloc;
|
||||||
|
c->empty = apr_brigade_create(c->pool, c->bucket_alloc);
|
||||||
|
c->filters = apr_hash_make(c->pool);
|
||||||
|
|
||||||
c->clogging_input_filters = 0;
|
c->clogging_input_filters = 0;
|
||||||
|
|
||||||
@@ -5395,6 +5398,9 @@ static void register_hooks(apr_pool_t *p)
|
|||||||
ap_core_output_filter_handle =
|
ap_core_output_filter_handle =
|
||||||
ap_register_output_filter("CORE", ap_core_output_filter,
|
ap_register_output_filter("CORE", ap_core_output_filter,
|
||||||
NULL, AP_FTYPE_NETWORK);
|
NULL, AP_FTYPE_NETWORK);
|
||||||
|
ap_request_core_filter_handle =
|
||||||
|
ap_register_output_filter("REQ_CORE", ap_request_core_filter,
|
||||||
|
NULL, AP_FTYPE_TRANSCODE);
|
||||||
ap_subreq_core_filter_handle =
|
ap_subreq_core_filter_handle =
|
||||||
ap_register_output_filter("SUBREQ_CORE", ap_sub_req_output_filter,
|
ap_register_output_filter("SUBREQ_CORE", ap_sub_req_output_filter,
|
||||||
NULL, AP_FTYPE_CONTENT_SET);
|
NULL, AP_FTYPE_CONTENT_SET);
|
||||||
|
@@ -78,9 +78,7 @@ do { \
|
|||||||
#define APLOG_MODULE_INDEX AP_CORE_MODULE_INDEX
|
#define APLOG_MODULE_INDEX AP_CORE_MODULE_INDEX
|
||||||
|
|
||||||
struct core_output_filter_ctx {
|
struct core_output_filter_ctx {
|
||||||
apr_bucket_brigade *buffered_bb;
|
|
||||||
apr_bucket_brigade *tmp_flush_bb;
|
apr_bucket_brigade *tmp_flush_bb;
|
||||||
apr_pool_t *deferred_write_pool;
|
|
||||||
apr_size_t bytes_written;
|
apr_size_t bytes_written;
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -328,11 +326,6 @@ apr_status_t ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
|
|||||||
return APR_SUCCESS;
|
return APR_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setaside_remaining_output(ap_filter_t *f,
|
|
||||||
core_output_filter_ctx_t *ctx,
|
|
||||||
apr_bucket_brigade *bb,
|
|
||||||
conn_rec *c);
|
|
||||||
|
|
||||||
static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
|
static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
|
||||||
apr_bucket_brigade *bb,
|
apr_bucket_brigade *bb,
|
||||||
apr_size_t *bytes_written,
|
apr_size_t *bytes_written,
|
||||||
@@ -358,33 +351,23 @@ static apr_status_t sendfile_nonblocking(apr_socket_t *s,
|
|||||||
conn_rec *c);
|
conn_rec *c);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* XXX: Should these be configurable parameters? */
|
|
||||||
#define THRESHOLD_MIN_WRITE 4096
|
|
||||||
#define THRESHOLD_MAX_BUFFER 65536
|
|
||||||
#define MAX_REQUESTS_IN_PIPELINE 5
|
|
||||||
|
|
||||||
/* Optional function coming from mod_logio, used for logging of output
|
/* Optional function coming from mod_logio, used for logging of output
|
||||||
* traffic
|
* traffic
|
||||||
*/
|
*/
|
||||||
extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *ap__logio_add_bytes_out;
|
extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *ap__logio_add_bytes_out;
|
||||||
|
|
||||||
apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
|
apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
|
||||||
{
|
{
|
||||||
conn_rec *c = f->c;
|
conn_rec *c = f->c;
|
||||||
core_net_rec *net = f->ctx;
|
core_net_rec *net = f->ctx;
|
||||||
core_output_filter_ctx_t *ctx = net->out_ctx;
|
core_output_filter_ctx_t *ctx = net->out_ctx;
|
||||||
apr_bucket_brigade *bb = NULL;
|
apr_bucket *flush_upto = NULL;
|
||||||
apr_bucket *bucket, *next, *flush_upto = NULL;
|
|
||||||
apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
|
|
||||||
int eor_buckets_in_brigade, morphing_bucket_in_brigade;
|
|
||||||
apr_status_t rv;
|
apr_status_t rv;
|
||||||
int loglevel = ap_get_conn_module_loglevel(c, APLOG_MODULE_INDEX);
|
int loglevel = ap_get_conn_module_loglevel(c, APLOG_MODULE_INDEX);
|
||||||
|
|
||||||
/* Fail quickly if the connection has already been aborted. */
|
/* Fail quickly if the connection has already been aborted. */
|
||||||
if (c->aborted) {
|
if (c->aborted) {
|
||||||
if (new_bb != NULL) {
|
apr_brigade_cleanup(bb);
|
||||||
apr_brigade_cleanup(new_bb);
|
|
||||||
}
|
|
||||||
return APR_ECONNABORTED;
|
return APR_ECONNABORTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -397,33 +380,14 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
|
|||||||
* allocated from bb->pool which might be wrong.
|
* allocated from bb->pool which might be wrong.
|
||||||
*/
|
*/
|
||||||
ctx->tmp_flush_bb = apr_brigade_create(c->pool, c->bucket_alloc);
|
ctx->tmp_flush_bb = apr_brigade_create(c->pool, c->bucket_alloc);
|
||||||
/* same for buffered_bb and ap_save_brigade */
|
|
||||||
ctx->buffered_bb = apr_brigade_create(c->pool, c->bucket_alloc);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (new_bb != NULL)
|
|
||||||
bb = new_bb;
|
|
||||||
|
|
||||||
if ((ctx->buffered_bb != NULL) &&
|
|
||||||
!APR_BRIGADE_EMPTY(ctx->buffered_bb)) {
|
|
||||||
if (new_bb != NULL) {
|
|
||||||
APR_BRIGADE_PREPEND(bb, ctx->buffered_bb);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
bb = ctx->buffered_bb;
|
|
||||||
}
|
|
||||||
c->data_in_output_filters = 0;
|
|
||||||
}
|
|
||||||
else if (new_bb == NULL) {
|
|
||||||
return APR_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Scan through the brigade and decide whether to attempt a write,
|
/* Scan through the brigade and decide whether to attempt a write,
|
||||||
* and how much to write, based on the following rules:
|
* and how much to write, based on the following rules:
|
||||||
*
|
*
|
||||||
* 1) The new_bb is null: Do a nonblocking write of as much as
|
* 1) The bb is empty: Do a nonblocking write of as much as
|
||||||
* possible: do a nonblocking write of as much data as possible,
|
* possible: do a nonblocking write of as much data as possible,
|
||||||
* then save the rest in ctx->buffered_bb. (If new_bb == NULL,
|
* then save the rest in ctx->buffered_bb. (If bb is empty,
|
||||||
* it probably means that the MPM is doing asynchronous write
|
* it probably means that the MPM is doing asynchronous write
|
||||||
* completion and has just determined that this connection
|
* completion and has just determined that this connection
|
||||||
* is writable.)
|
* is writable.)
|
||||||
@@ -459,93 +423,14 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
|
|||||||
* 3) Actually do the blocking write up to the last bucket determined
|
* 3) Actually do the blocking write up to the last bucket determined
|
||||||
* by rules 2a-d. The point of doing only one flush is to make as
|
* by rules 2a-d. The point of doing only one flush is to make as
|
||||||
* few calls to writev() as possible.
|
* few calls to writev() as possible.
|
||||||
*
|
|
||||||
* 4) If the brigade contains at least THRESHOLD_MIN_WRITE
|
|
||||||
* bytes: Do a nonblocking write of as much data as possible,
|
|
||||||
* then save the rest in ctx->buffered_bb.
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
if (new_bb == NULL) {
|
ap_filter_reinstate_brigade(f, bb, &flush_upto);
|
||||||
rv = send_brigade_nonblocking(net->client_socket, bb,
|
|
||||||
&(ctx->bytes_written), c);
|
if (APR_BRIGADE_EMPTY(bb)) {
|
||||||
if (rv != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(rv)) {
|
|
||||||
/* The client has aborted the connection */
|
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
|
|
||||||
"core_output_filter: writing data to the network");
|
|
||||||
apr_brigade_cleanup(bb);
|
|
||||||
c->aborted = 1;
|
|
||||||
return rv;
|
|
||||||
}
|
|
||||||
setaside_remaining_output(f, ctx, bb, c);
|
|
||||||
return APR_SUCCESS;
|
return APR_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
bytes_in_brigade = 0;
|
|
||||||
non_file_bytes_in_brigade = 0;
|
|
||||||
eor_buckets_in_brigade = 0;
|
|
||||||
morphing_bucket_in_brigade = 0;
|
|
||||||
|
|
||||||
for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
|
|
||||||
bucket = next) {
|
|
||||||
next = APR_BUCKET_NEXT(bucket);
|
|
||||||
|
|
||||||
if (!APR_BUCKET_IS_METADATA(bucket)) {
|
|
||||||
if (bucket->length == (apr_size_t)-1) {
|
|
||||||
/*
|
|
||||||
* A setaside of morphing buckets would read everything into
|
|
||||||
* memory. Instead, we will flush everything up to and
|
|
||||||
* including this bucket.
|
|
||||||
*/
|
|
||||||
morphing_bucket_in_brigade = 1;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
bytes_in_brigade += bucket->length;
|
|
||||||
if (!APR_BUCKET_IS_FILE(bucket))
|
|
||||||
non_file_bytes_in_brigade += bucket->length;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (AP_BUCKET_IS_EOR(bucket)) {
|
|
||||||
eor_buckets_in_brigade++;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (APR_BUCKET_IS_FLUSH(bucket)
|
|
||||||
|| non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
|
|
||||||
|| morphing_bucket_in_brigade
|
|
||||||
|| eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
|
|
||||||
/* this segment of the brigade MUST be sent before returning. */
|
|
||||||
|
|
||||||
if (loglevel >= APLOG_TRACE6) {
|
|
||||||
char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
|
|
||||||
"FLUSH bucket" :
|
|
||||||
(non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
|
|
||||||
"THRESHOLD_MAX_BUFFER" :
|
|
||||||
morphing_bucket_in_brigade ? "morphing bucket" :
|
|
||||||
"MAX_REQUESTS_IN_PIPELINE";
|
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
|
|
||||||
"will flush because of %s", reason);
|
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
|
|
||||||
"seen in brigade%s: bytes: %" APR_SIZE_T_FMT
|
|
||||||
", non-file bytes: %" APR_SIZE_T_FMT ", eor "
|
|
||||||
"buckets: %d, morphing buckets: %d",
|
|
||||||
flush_upto == NULL ? " so far"
|
|
||||||
: " since last flush point",
|
|
||||||
bytes_in_brigade,
|
|
||||||
non_file_bytes_in_brigade,
|
|
||||||
eor_buckets_in_brigade,
|
|
||||||
morphing_bucket_in_brigade);
|
|
||||||
}
|
|
||||||
/*
|
|
||||||
* Defer the actual blocking write to avoid doing many writes.
|
|
||||||
*/
|
|
||||||
flush_upto = next;
|
|
||||||
|
|
||||||
bytes_in_brigade = 0;
|
|
||||||
non_file_bytes_in_brigade = 0;
|
|
||||||
eor_buckets_in_brigade = 0;
|
|
||||||
morphing_bucket_in_brigade = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (flush_upto != NULL) {
|
if (flush_upto != NULL) {
|
||||||
ctx->tmp_flush_bb = apr_brigade_split_ex(bb, flush_upto,
|
ctx->tmp_flush_bb = apr_brigade_split_ex(bb, flush_upto,
|
||||||
ctx->tmp_flush_bb);
|
ctx->tmp_flush_bb);
|
||||||
@@ -571,69 +456,28 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
|
|||||||
APR_BRIGADE_CONCAT(bb, ctx->tmp_flush_bb);
|
APR_BRIGADE_CONCAT(bb, ctx->tmp_flush_bb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rv = send_brigade_nonblocking(net->client_socket, bb, &(ctx->bytes_written),
|
||||||
|
c);
|
||||||
|
if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) {
|
||||||
|
/* The client has aborted the connection */
|
||||||
|
ap_log_cerror(
|
||||||
|
APLOG_MARK, APLOG_TRACE1, rv, c,
|
||||||
|
"core_output_filter: writing data to the network");
|
||||||
|
apr_brigade_cleanup(bb);
|
||||||
|
c->aborted = 1;
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
if (loglevel >= APLOG_TRACE8) {
|
if (loglevel >= APLOG_TRACE8) {
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
|
ap_log_cerror(
|
||||||
"brigade contains: bytes: %" APR_SIZE_T_FMT
|
APLOG_MARK, APLOG_TRACE8, 0, c,
|
||||||
", non-file bytes: %" APR_SIZE_T_FMT
|
"tried nonblocking write, total bytes "
|
||||||
", eor buckets: %d, morphing buckets: %d",
|
"written: %" APR_SIZE_T_FMT, ctx->bytes_written);
|
||||||
bytes_in_brigade, non_file_bytes_in_brigade,
|
|
||||||
eor_buckets_in_brigade, morphing_bucket_in_brigade);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) {
|
|
||||||
rv = send_brigade_nonblocking(net->client_socket, bb,
|
|
||||||
&(ctx->bytes_written), c);
|
|
||||||
if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) {
|
|
||||||
/* The client has aborted the connection */
|
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
|
|
||||||
"core_output_filter: writing data to the network");
|
|
||||||
apr_brigade_cleanup(bb);
|
|
||||||
c->aborted = 1;
|
|
||||||
return rv;
|
|
||||||
}
|
|
||||||
if (loglevel >= APLOG_TRACE8) {
|
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
|
|
||||||
"tried nonblocking write, total bytes "
|
|
||||||
"written: %" APR_SIZE_T_FMT,
|
|
||||||
ctx->bytes_written);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
setaside_remaining_output(f, ctx, bb, c);
|
|
||||||
return APR_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This function assumes that either ctx->buffered_bb == NULL, or
|
|
||||||
* ctx->buffered_bb is empty, or ctx->buffered_bb == bb
|
|
||||||
*/
|
|
||||||
static void setaside_remaining_output(ap_filter_t *f,
|
|
||||||
core_output_filter_ctx_t *ctx,
|
|
||||||
apr_bucket_brigade *bb,
|
|
||||||
conn_rec *c)
|
|
||||||
{
|
|
||||||
if (bb == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
remove_empty_buckets(bb);
|
remove_empty_buckets(bb);
|
||||||
if (!APR_BRIGADE_EMPTY(bb)) {
|
ap_filter_setaside_brigade(f, bb);
|
||||||
c->data_in_output_filters = 1;
|
|
||||||
if (bb != ctx->buffered_bb) {
|
return APR_SUCCESS;
|
||||||
if (!ctx->deferred_write_pool) {
|
|
||||||
apr_pool_create(&ctx->deferred_write_pool, c->pool);
|
|
||||||
apr_pool_tag(ctx->deferred_write_pool, "deferred_write");
|
|
||||||
}
|
|
||||||
ap_save_brigade(f, &(ctx->buffered_bb), &bb,
|
|
||||||
ctx->deferred_write_pool);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (ctx->deferred_write_pool) {
|
|
||||||
/*
|
|
||||||
* There are no more requests in the pipeline. We can just clear the
|
|
||||||
* pool.
|
|
||||||
*/
|
|
||||||
apr_pool_clear(ctx->deferred_write_pool);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifndef APR_MAX_IOVEC_SIZE
|
#ifndef APR_MAX_IOVEC_SIZE
|
||||||
|
@@ -1146,19 +1146,38 @@ read_request:
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) {
|
if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) {
|
||||||
ap_filter_t *output_filter = c->output_filters;
|
apr_hash_index_t *rindex;
|
||||||
apr_status_t rv;
|
apr_status_t rv = APR_SUCCESS;
|
||||||
|
int data_in_output_filters = 0;
|
||||||
ap_update_child_status_from_conn(sbh, SERVER_BUSY_WRITE, c);
|
ap_update_child_status_from_conn(sbh, SERVER_BUSY_WRITE, c);
|
||||||
while (output_filter->next != NULL) {
|
|
||||||
output_filter = output_filter->next;
|
rindex = apr_hash_first(NULL, c->filters);
|
||||||
|
while (rindex) {
|
||||||
|
ap_filter_t *f = apr_hash_this_val(rindex);
|
||||||
|
|
||||||
|
if (!APR_BRIGADE_EMPTY(f->bb)) {
|
||||||
|
|
||||||
|
rv = ap_pass_brigade(f, c->empty);
|
||||||
|
apr_brigade_cleanup(c->empty);
|
||||||
|
if (APR_SUCCESS != rv) {
|
||||||
|
ap_log_cerror(
|
||||||
|
APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00470)
|
||||||
|
"write failure in '%s' output filter", f->frec->name);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ap_filter_should_yield(f)) {
|
||||||
|
data_in_output_filters = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rindex = apr_hash_next(rindex);
|
||||||
}
|
}
|
||||||
rv = output_filter->frec->filter_func.out_func(output_filter, NULL);
|
|
||||||
if (rv != APR_SUCCESS) {
|
if (rv != APR_SUCCESS) {
|
||||||
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00470)
|
|
||||||
"network write failure in core output filter");
|
|
||||||
cs->pub.state = CONN_STATE_LINGER;
|
cs->pub.state = CONN_STATE_LINGER;
|
||||||
}
|
}
|
||||||
else if (c->data_in_output_filters) {
|
else if (data_in_output_filters) {
|
||||||
/* Still in WRITE_COMPLETION_STATE:
|
/* Still in WRITE_COMPLETION_STATE:
|
||||||
* Set a write timeout for this connection, and let the
|
* Set a write timeout for this connection, and let the
|
||||||
* event thread poll for writeability.
|
* event thread poll for writeability.
|
||||||
|
@@ -359,21 +359,38 @@ static apr_status_t motorz_io_process(motorz_conn_t *scon)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (scon->cs.state == CONN_STATE_WRITE_COMPLETION) {
|
if (scon->cs.state == CONN_STATE_WRITE_COMPLETION) {
|
||||||
ap_filter_t *output_filter = c->output_filters;
|
apr_hash_index_t *rindex;
|
||||||
ap_update_child_status_from_conn(scon->sbh, SERVER_BUSY_WRITE, c);
|
apr_status_t rv = APR_SUCCESS;
|
||||||
while (output_filter->next != NULL) {
|
int data_in_output_filters = 0;
|
||||||
output_filter = output_filter->next;
|
ap_update_child_status_from_conn(sbh, SERVER_BUSY_WRITE, c);
|
||||||
}
|
|
||||||
|
|
||||||
rv = output_filter->frec->filter_func.out_func(output_filter,
|
rindex = apr_hash_first(NULL, c->filters);
|
||||||
NULL);
|
while (rindex) {
|
||||||
|
ap_filter_t *f = apr_hash_this_val(rindex);
|
||||||
|
|
||||||
|
if (!APR_BRIGADE_EMPTY(f->bb)) {
|
||||||
|
|
||||||
|
rv = ap_pass_brigade(f, c->empty);
|
||||||
|
apr_brigade_cleanup(c->empty);
|
||||||
|
if (APR_SUCCESS != rv) {
|
||||||
|
ap_log_cerror(
|
||||||
|
APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(02848)
|
||||||
|
"write failure in '%s' output filter", f->frec->name);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ap_filter_should_yield(f)) {
|
||||||
|
data_in_output_filters = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rindex = apr_hash_next(rindex);
|
||||||
|
}
|
||||||
|
|
||||||
if (rv != APR_SUCCESS) {
|
if (rv != APR_SUCCESS) {
|
||||||
ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf, APLOGNO(02848)
|
|
||||||
"network write failure in core output filter");
|
|
||||||
scon->cs.state = CONN_STATE_LINGER;
|
scon->cs.state = CONN_STATE_LINGER;
|
||||||
}
|
}
|
||||||
else if (c->data_in_output_filters) {
|
else if (data_in_output_filters) {
|
||||||
/* Still in WRITE_COMPLETION_STATE:
|
/* Still in WRITE_COMPLETION_STATE:
|
||||||
* Set a write timeout for this connection, and let the
|
* Set a write timeout for this connection, and let the
|
||||||
* event thread poll for writeability.
|
* event thread poll for writeability.
|
||||||
|
@@ -92,20 +92,37 @@ static apr_status_t simple_io_process(simple_conn_t * scon)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (scon->cs.state == CONN_STATE_WRITE_COMPLETION) {
|
if (scon->cs.state == CONN_STATE_WRITE_COMPLETION) {
|
||||||
ap_filter_t *output_filter = c->output_filters;
|
apr_hash_index_t *rindex;
|
||||||
while (output_filter->next != NULL) {
|
apr_status_t rv = APR_SUCCESS;
|
||||||
output_filter = output_filter->next;
|
int data_in_output_filters = 0;
|
||||||
}
|
|
||||||
|
|
||||||
rv = output_filter->frec->filter_func.out_func(output_filter,
|
rindex = apr_hash_first(NULL, c->filters);
|
||||||
NULL);
|
while (rindex) {
|
||||||
|
ap_filter_t *f = apr_hash_this_val(rindex);
|
||||||
|
|
||||||
|
if (!APR_BRIGADE_EMPTY(f->bb)) {
|
||||||
|
|
||||||
|
rv = ap_pass_brigade(f, c->empty);
|
||||||
|
apr_brigade_cleanup(c->empty);
|
||||||
|
if (APR_SUCCESS != rv) {
|
||||||
|
ap_log_cerror(
|
||||||
|
APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00249)
|
||||||
|
"write failure in '%s' output filter", f->frec->name);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ap_filter_should_yield(f)) {
|
||||||
|
data_in_output_filters = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rindex = apr_hash_next(rindex);
|
||||||
|
}
|
||||||
|
|
||||||
if (rv != APR_SUCCESS) {
|
if (rv != APR_SUCCESS) {
|
||||||
ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf, APLOGNO(00249)
|
|
||||||
"network write failure in core output filter");
|
|
||||||
scon->cs.state = CONN_STATE_LINGER;
|
scon->cs.state = CONN_STATE_LINGER;
|
||||||
}
|
}
|
||||||
else if (c->data_in_output_filters) {
|
else if (data_in_output_filters) {
|
||||||
/* Still in WRITE_COMPLETION_STATE:
|
/* Still in WRITE_COMPLETION_STATE:
|
||||||
* Set a write timeout for this connection, and let the
|
* Set a write timeout for this connection, and let the
|
||||||
* event thread poll for writeability.
|
* event thread poll for writeability.
|
||||||
|
@@ -2036,6 +2036,64 @@ AP_CORE_DECLARE_NONSTD(apr_status_t) ap_sub_req_output_filter(ap_filter_t *f,
|
|||||||
return APR_SUCCESS;
|
return APR_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
AP_CORE_DECLARE_NONSTD(apr_status_t) ap_request_core_filter(ap_filter_t *f,
|
||||||
|
apr_bucket_brigade *bb)
|
||||||
|
{
|
||||||
|
apr_bucket *flush_upto = NULL;
|
||||||
|
apr_status_t status = APR_SUCCESS;
|
||||||
|
apr_bucket_brigade *tmp_bb = f->ctx;
|
||||||
|
|
||||||
|
if (!tmp_bb) {
|
||||||
|
tmp_bb = f->ctx = apr_brigade_create(f->r->pool, f->c->bucket_alloc);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Reinstate any buffered content */
|
||||||
|
ap_filter_reinstate_brigade(f, bb, &flush_upto);
|
||||||
|
|
||||||
|
while (!APR_BRIGADE_EMPTY(bb)) {
|
||||||
|
apr_bucket *bucket = APR_BRIGADE_FIRST(bb);
|
||||||
|
|
||||||
|
/* if the core has set aside data, back off and try later */
|
||||||
|
if (!flush_upto) {
|
||||||
|
if (ap_filter_should_yield(f)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (flush_upto == bucket) {
|
||||||
|
flush_upto = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* have we found a morphing bucket? if so, force it to morph into something
|
||||||
|
* safe to pass down to the connection filters without needing to be set
|
||||||
|
* aside.
|
||||||
|
*/
|
||||||
|
if (!APR_BUCKET_IS_METADATA(bucket)) {
|
||||||
|
if (bucket->length == (apr_size_t) - 1) {
|
||||||
|
const char *data;
|
||||||
|
apr_size_t size;
|
||||||
|
if (APR_SUCCESS
|
||||||
|
!= (status = apr_bucket_read(bucket, &data, &size,
|
||||||
|
APR_BLOCK_READ))) {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* pass each bucket down the chain */
|
||||||
|
APR_BUCKET_REMOVE(bucket);
|
||||||
|
APR_BRIGADE_INSERT_TAIL(tmp_bb, bucket);
|
||||||
|
|
||||||
|
status = ap_pass_brigade(f->next, tmp_bb);
|
||||||
|
if (!APR_STATUS_IS_EOF(status) && (status != APR_SUCCESS)) {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
ap_filter_setaside_brigade(f, bb);
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
extern APR_OPTIONAL_FN_TYPE(authz_some_auth_required) *ap__authz_ap_some_auth_required;
|
extern APR_OPTIONAL_FN_TYPE(authz_some_auth_required) *ap__authz_ap_some_auth_required;
|
||||||
|
|
||||||
AP_DECLARE(int) ap_some_auth_required(request_rec *r)
|
AP_DECLARE(int) ap_some_auth_required(request_rec *r)
|
||||||
|
@@ -24,6 +24,7 @@
|
|||||||
#include "http_config.h"
|
#include "http_config.h"
|
||||||
#include "http_core.h"
|
#include "http_core.h"
|
||||||
#include "http_log.h"
|
#include "http_log.h"
|
||||||
|
#include "http_request.h"
|
||||||
#include "util_filter.h"
|
#include "util_filter.h"
|
||||||
|
|
||||||
/* NOTE: Apache's current design doesn't allow a pool to be passed thru,
|
/* NOTE: Apache's current design doesn't allow a pool to be passed thru,
|
||||||
@@ -32,6 +33,10 @@
|
|||||||
#define FILTER_POOL apr_hook_global_pool
|
#define FILTER_POOL apr_hook_global_pool
|
||||||
#include "ap_hooks.h" /* for apr_hook_global_pool */
|
#include "ap_hooks.h" /* for apr_hook_global_pool */
|
||||||
|
|
||||||
|
/* XXX: Should these be configurable parameters? */
|
||||||
|
#define THRESHOLD_MAX_BUFFER 65536
|
||||||
|
#define MAX_REQUESTS_IN_PIPELINE 5
|
||||||
|
|
||||||
/*
|
/*
|
||||||
** This macro returns true/false if a given filter should be inserted BEFORE
|
** This macro returns true/false if a given filter should be inserted BEFORE
|
||||||
** another filter. This will happen when one of: 1) there isn't another
|
** another filter. This will happen when one of: 1) there isn't another
|
||||||
@@ -319,6 +324,8 @@ static ap_filter_t *add_any_filter_handle(ap_filter_rec_t *frec, void *ctx,
|
|||||||
f->r = frec->ftype < AP_FTYPE_CONNECTION ? r : NULL;
|
f->r = frec->ftype < AP_FTYPE_CONNECTION ? r : NULL;
|
||||||
f->c = c;
|
f->c = c;
|
||||||
f->next = NULL;
|
f->next = NULL;
|
||||||
|
f->bb = NULL;
|
||||||
|
f->deferred_pool = NULL;
|
||||||
|
|
||||||
if (INSERT_BEFORE(f, *outf)) {
|
if (INSERT_BEFORE(f, *outf)) {
|
||||||
f->next = *outf;
|
f->next = *outf;
|
||||||
@@ -474,6 +481,16 @@ AP_DECLARE(void) ap_remove_input_filter(ap_filter_t *f)
|
|||||||
|
|
||||||
AP_DECLARE(void) ap_remove_output_filter(ap_filter_t *f)
|
AP_DECLARE(void) ap_remove_output_filter(ap_filter_t *f)
|
||||||
{
|
{
|
||||||
|
|
||||||
|
if ((f->bb) && !APR_BRIGADE_EMPTY(f->bb)) {
|
||||||
|
apr_brigade_cleanup(f->bb);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (f->deferred_pool) {
|
||||||
|
apr_pool_destroy(f->deferred_pool);
|
||||||
|
f->deferred_pool = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
remove_any_filter(f, f->r ? &f->r->output_filters : NULL,
|
remove_any_filter(f, f->r ? &f->r->output_filters : NULL,
|
||||||
f->r ? &f->r->proto_output_filters : NULL,
|
f->r ? &f->r->proto_output_filters : NULL,
|
||||||
&f->c->output_filters);
|
&f->c->output_filters);
|
||||||
@@ -566,6 +583,7 @@ AP_DECLARE(apr_status_t) ap_pass_brigade(ap_filter_t *next,
|
|||||||
{
|
{
|
||||||
if (next) {
|
if (next) {
|
||||||
apr_bucket *e;
|
apr_bucket *e;
|
||||||
|
|
||||||
if ((e = APR_BRIGADE_LAST(bb)) && APR_BUCKET_IS_EOS(e) && next->r) {
|
if ((e = APR_BRIGADE_LAST(bb)) && APR_BUCKET_IS_EOS(e) && next->r) {
|
||||||
/* This is only safe because HTTP_HEADER filter is always in
|
/* This is only safe because HTTP_HEADER filter is always in
|
||||||
* the filter stack. This ensures that there is ALWAYS a
|
* the filter stack. This ensures that there is ALWAYS a
|
||||||
@@ -635,7 +653,8 @@ AP_DECLARE(apr_status_t) ap_save_brigade(ap_filter_t *f,
|
|||||||
apr_status_t rv, srv = APR_SUCCESS;
|
apr_status_t rv, srv = APR_SUCCESS;
|
||||||
|
|
||||||
/* If have never stored any data in the filter, then we had better
|
/* If have never stored any data in the filter, then we had better
|
||||||
* create an empty bucket brigade so that we can concat.
|
* create an empty bucket brigade so that we can concat. Register
|
||||||
|
* a cleanup to zero out the pointer if the pool is cleared.
|
||||||
*/
|
*/
|
||||||
if (!(*saveto)) {
|
if (!(*saveto)) {
|
||||||
*saveto = apr_brigade_create(p, f->c->bucket_alloc);
|
*saveto = apr_brigade_create(p, f->c->bucket_alloc);
|
||||||
@@ -673,6 +692,248 @@ AP_DECLARE(apr_status_t) ap_save_brigade(ap_filter_t *f,
|
|||||||
return srv;
|
return srv;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static apr_status_t filters_cleanup(void *data)
|
||||||
|
{
|
||||||
|
ap_filter_t **key = data;
|
||||||
|
|
||||||
|
apr_hash_set((*key)->c->filters, key, sizeof(ap_filter_t **), NULL);
|
||||||
|
|
||||||
|
return APR_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
AP_DECLARE(apr_status_t) ap_filter_setaside_brigade(ap_filter_t *f,
|
||||||
|
apr_bucket_brigade *bb)
|
||||||
|
{
|
||||||
|
int loglevel = ap_get_conn_module_loglevel(f->c, APLOG_MODULE_INDEX);
|
||||||
|
|
||||||
|
if (loglevel >= APLOG_TRACE6) {
|
||||||
|
ap_log_cerror(
|
||||||
|
APLOG_MARK, APLOG_TRACE6, 0, f->c,
|
||||||
|
"setaside %s brigade to %s brigade in '%s' output filter",
|
||||||
|
(APR_BRIGADE_EMPTY(bb) ? "empty" : "full"),
|
||||||
|
(!f->bb || APR_BRIGADE_EMPTY(f->bb) ? "empty" : "full"), f->frec->name);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!APR_BRIGADE_EMPTY(bb)) {
|
||||||
|
apr_pool_t *pool;
|
||||||
|
/*
|
||||||
|
* Set aside the brigade bb within f->bb.
|
||||||
|
*/
|
||||||
|
if (!f->bb) {
|
||||||
|
ap_filter_t **key;
|
||||||
|
|
||||||
|
pool = f->r ? f->r->pool : f->c->pool;
|
||||||
|
|
||||||
|
key = apr_palloc(pool, sizeof(ap_filter_t **));
|
||||||
|
*key = f;
|
||||||
|
apr_hash_set(f->c->filters, key, sizeof(ap_filter_t **), f);
|
||||||
|
|
||||||
|
f->bb = apr_brigade_create(pool, f->c->bucket_alloc);
|
||||||
|
|
||||||
|
apr_pool_pre_cleanup_register(pool, key, filters_cleanup);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/* decide what pool we setaside to, request pool or deferred pool? */
|
||||||
|
if (f->r) {
|
||||||
|
pool = f->r->pool;
|
||||||
|
APR_BRIGADE_CONCAT(f->bb, bb);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
if (!f->deferred_pool) {
|
||||||
|
apr_pool_create(&f->deferred_pool, f->c->pool);
|
||||||
|
apr_pool_tag(f->deferred_pool, "deferred_pool");
|
||||||
|
}
|
||||||
|
pool = f->deferred_pool;
|
||||||
|
return ap_save_brigade(f, &f->bb, &bb, pool);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
else if (f->deferred_pool) {
|
||||||
|
/*
|
||||||
|
* There are no more requests in the pipeline. We can just clear the
|
||||||
|
* pool.
|
||||||
|
*/
|
||||||
|
apr_brigade_cleanup(f->bb);
|
||||||
|
apr_pool_clear(f->deferred_pool);
|
||||||
|
}
|
||||||
|
return APR_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
AP_DECLARE(apr_status_t) ap_filter_reinstate_brigade(ap_filter_t *f,
|
||||||
|
apr_bucket_brigade *bb,
|
||||||
|
apr_bucket **flush_upto)
|
||||||
|
{
|
||||||
|
apr_bucket *bucket, *next;
|
||||||
|
apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
|
||||||
|
int eor_buckets_in_brigade, morphing_bucket_in_brigade;
|
||||||
|
int loglevel = ap_get_conn_module_loglevel(f->c, APLOG_MODULE_INDEX);
|
||||||
|
|
||||||
|
if (loglevel >= APLOG_TRACE6) {
|
||||||
|
ap_log_cerror(
|
||||||
|
APLOG_MARK, APLOG_TRACE6, 0, f->c,
|
||||||
|
"reinstate %s brigade to %s brigade in '%s' output filter",
|
||||||
|
(!f->bb || APR_BRIGADE_EMPTY(f->bb) ? "empty" : "full"),
|
||||||
|
(APR_BRIGADE_EMPTY(bb) ? "empty" : "full"), f->frec->name);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (f->bb && !APR_BRIGADE_EMPTY(f->bb)) {
|
||||||
|
APR_BRIGADE_PREPEND(bb, f->bb);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Determine if and up to which bucket we need to do a blocking write:
|
||||||
|
*
|
||||||
|
* a) The brigade contains a flush bucket: Do a blocking write
|
||||||
|
* of everything up that point.
|
||||||
|
*
|
||||||
|
* b) The request is in CONN_STATE_HANDLER state, and the brigade
|
||||||
|
* contains at least THRESHOLD_MAX_BUFFER bytes in non-file
|
||||||
|
* buckets: Do blocking writes until the amount of data in the
|
||||||
|
* buffer is less than THRESHOLD_MAX_BUFFER. (The point of this
|
||||||
|
* rule is to provide flow control, in case a handler is
|
||||||
|
* streaming out lots of data faster than the data can be
|
||||||
|
* sent to the client.)
|
||||||
|
*
|
||||||
|
* c) The request is in CONN_STATE_HANDLER state, and the brigade
|
||||||
|
* contains at least MAX_REQUESTS_IN_PIPELINE EOR buckets:
|
||||||
|
* Do blocking writes until less than MAX_REQUESTS_IN_PIPELINE EOR
|
||||||
|
* buckets are left. (The point of this rule is to prevent too many
|
||||||
|
* FDs being kept open by pipelined requests, possibly allowing a
|
||||||
|
* DoS).
|
||||||
|
*
|
||||||
|
* d) The request is being served by a connection filter and the
|
||||||
|
* brigade contains a morphing bucket: If there was no other
|
||||||
|
* reason to do a blocking write yet, try reading the bucket. If its
|
||||||
|
* contents fit into memory before THRESHOLD_MAX_BUFFER is reached,
|
||||||
|
* everything is fine. Otherwise we need to do a blocking write the
|
||||||
|
* up to and including the morphing bucket, because ap_save_brigade()
|
||||||
|
* would read the whole bucket into memory later on.
|
||||||
|
*/
|
||||||
|
|
||||||
|
*flush_upto = NULL;
|
||||||
|
|
||||||
|
bytes_in_brigade = 0;
|
||||||
|
non_file_bytes_in_brigade = 0;
|
||||||
|
eor_buckets_in_brigade = 0;
|
||||||
|
morphing_bucket_in_brigade = 0;
|
||||||
|
|
||||||
|
for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
|
||||||
|
bucket = next) {
|
||||||
|
next = APR_BUCKET_NEXT(bucket);
|
||||||
|
|
||||||
|
if (!APR_BUCKET_IS_METADATA(bucket)) {
|
||||||
|
if (bucket->length == (apr_size_t)-1) {
|
||||||
|
/*
|
||||||
|
* A setaside of morphing buckets would read everything into
|
||||||
|
* memory. Instead, we will flush everything up to and
|
||||||
|
* including this bucket.
|
||||||
|
*/
|
||||||
|
morphing_bucket_in_brigade = 1;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
bytes_in_brigade += bucket->length;
|
||||||
|
if (!APR_BUCKET_IS_FILE(bucket))
|
||||||
|
non_file_bytes_in_brigade += bucket->length;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (AP_BUCKET_IS_EOR(bucket)) {
|
||||||
|
eor_buckets_in_brigade++;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (APR_BUCKET_IS_FLUSH(bucket)
|
||||||
|
|| non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
|
||||||
|
|| (!f->r && morphing_bucket_in_brigade)
|
||||||
|
|| eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
|
||||||
|
/* this segment of the brigade MUST be sent before returning. */
|
||||||
|
|
||||||
|
if (loglevel >= APLOG_TRACE6) {
|
||||||
|
char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
|
||||||
|
"FLUSH bucket" :
|
||||||
|
(non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
|
||||||
|
"THRESHOLD_MAX_BUFFER" :
|
||||||
|
(!f->r && morphing_bucket_in_brigade) ? "morphing bucket" :
|
||||||
|
"MAX_REQUESTS_IN_PIPELINE";
|
||||||
|
ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c,
|
||||||
|
"will flush because of %s", reason);
|
||||||
|
ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c,
|
||||||
|
"seen in brigade%s: bytes: %" APR_SIZE_T_FMT
|
||||||
|
", non-file bytes: %" APR_SIZE_T_FMT ", eor "
|
||||||
|
"buckets: %d, morphing buckets: %d",
|
||||||
|
flush_upto == NULL ? " so far"
|
||||||
|
: " since last flush point",
|
||||||
|
bytes_in_brigade,
|
||||||
|
non_file_bytes_in_brigade,
|
||||||
|
eor_buckets_in_brigade,
|
||||||
|
morphing_bucket_in_brigade);
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
* Defer the actual blocking write to avoid doing many writes.
|
||||||
|
*/
|
||||||
|
*flush_upto = next;
|
||||||
|
|
||||||
|
bytes_in_brigade = 0;
|
||||||
|
non_file_bytes_in_brigade = 0;
|
||||||
|
eor_buckets_in_brigade = 0;
|
||||||
|
morphing_bucket_in_brigade = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (loglevel >= APLOG_TRACE8) {
|
||||||
|
ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c,
|
||||||
|
"brigade contains: bytes: %" APR_SIZE_T_FMT
|
||||||
|
", non-file bytes: %" APR_SIZE_T_FMT
|
||||||
|
", eor buckets: %d, morphing buckets: %d",
|
||||||
|
bytes_in_brigade, non_file_bytes_in_brigade,
|
||||||
|
eor_buckets_in_brigade, morphing_bucket_in_brigade);
|
||||||
|
}
|
||||||
|
|
||||||
|
return APR_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
AP_DECLARE(int) ap_filter_should_yield(ap_filter_t *f)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* This function decides whether a filter should yield due to buffered
|
||||||
|
* data in a downstream filter. If a downstream filter buffers we
|
||||||
|
* must back off so we don't overwhelm the server. If this function
|
||||||
|
* returns true, the filter should call ap_filter_setaside_brigade()
|
||||||
|
* to save unprocessed buckets, and then reinstate those buckets on
|
||||||
|
* the next call with ap_filter_reinstate_brigade() and continue
|
||||||
|
* where it left off.
|
||||||
|
*
|
||||||
|
* If this function is forced to return zero, we return back to
|
||||||
|
* synchronous filter behaviour.
|
||||||
|
*
|
||||||
|
* Subrequests present us with a problem - we don't know how much data
|
||||||
|
* they will produce and therefore how much buffering we'll need, and
|
||||||
|
* if a subrequest had to trigger buffering, but next subrequest wouldn't
|
||||||
|
* know when the previous one had finished sending data and buckets
|
||||||
|
* could be sent out of order.
|
||||||
|
*
|
||||||
|
* In the case of subrequests, deny the ability to yield. When the data
|
||||||
|
* reaches the filters from the main request, they will be setaside
|
||||||
|
* there in the right order and the request will be given the
|
||||||
|
* opportunity to yield.
|
||||||
|
*/
|
||||||
|
if (f->r && f->r->main) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This is either a main request or internal redirect, or it is a
|
||||||
|
* connection filter. Yield if there is any buffered data downstream
|
||||||
|
* from us.
|
||||||
|
*/
|
||||||
|
while (f) {
|
||||||
|
if (f->bb && !APR_BRIGADE_EMPTY(f->bb)) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
f = f->next;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
AP_DECLARE_NONSTD(apr_status_t) ap_filter_flush(apr_bucket_brigade *bb,
|
AP_DECLARE_NONSTD(apr_status_t) ap_filter_flush(apr_bucket_brigade *bb,
|
||||||
void *ctx)
|
void *ctx)
|
||||||
{
|
{
|
||||||
|
Reference in New Issue
Block a user