mirror of
https://github.com/apache/httpd.git
synced 2026-01-06 09:01:14 +03:00
mod_http2: some code cleanup of stream request body handling, potential avoid a buffer copy
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1734428 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
@@ -45,6 +45,14 @@ h2_io *h2_io_create(int id, apr_pool_t *pool, const h2_request *request)
|
||||
return io;
|
||||
}
|
||||
|
||||
static void check_bbin(h2_io *io)
|
||||
{
|
||||
if (!io->bbin) {
|
||||
io->bbin = apr_brigade_create(io->pool, io->bucket_alloc);
|
||||
io->tmp = apr_brigade_create(io->pool, io->bucket_alloc);
|
||||
}
|
||||
}
|
||||
|
||||
void h2_io_redo(h2_io *io)
|
||||
{
|
||||
io->worker_started = 0;
|
||||
@@ -85,23 +93,12 @@ void h2_io_set_response(h2_io *io, h2_response *response)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void h2_io_rst(h2_io *io, int error)
|
||||
{
|
||||
io->rst_error = error;
|
||||
io->eos_in = 1;
|
||||
}
|
||||
|
||||
int h2_io_in_has_eos_for(h2_io *io)
|
||||
{
|
||||
return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, -1));
|
||||
}
|
||||
|
||||
int h2_io_in_has_data(h2_io *io)
|
||||
{
|
||||
return io->bbin && h2_util_bb_has_data_or_eos(io->bbin);
|
||||
}
|
||||
|
||||
int h2_io_out_has_data(h2_io *io)
|
||||
{
|
||||
return io->bbout && h2_util_bb_has_data_or_eos(io->bbout);
|
||||
@@ -298,7 +295,7 @@ apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb,
|
||||
return status;
|
||||
}
|
||||
|
||||
apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb)
|
||||
apr_status_t h2_io_in_write(h2_io *io, const char *d, apr_size_t len, int eos)
|
||||
{
|
||||
if (io->rst_error) {
|
||||
return APR_ECONNABORTED;
|
||||
@@ -307,13 +304,12 @@ apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb)
|
||||
if (io->eos_in) {
|
||||
return APR_EOF;
|
||||
}
|
||||
io->eos_in = h2_util_has_eos(bb, -1);
|
||||
if (!APR_BRIGADE_EMPTY(bb)) {
|
||||
if (!io->bbin) {
|
||||
io->bbin = apr_brigade_create(io->pool, io->bucket_alloc);
|
||||
io->tmp = apr_brigade_create(io->pool, io->bucket_alloc);
|
||||
}
|
||||
return h2_util_move(io->bbin, bb, -1, NULL, "h2_io_in_write");
|
||||
if (eos) {
|
||||
io->eos_in = 1;
|
||||
}
|
||||
if (len > 0) {
|
||||
check_bbin(io);
|
||||
return apr_brigade_write(io->bbin, NULL, NULL, d, len);
|
||||
}
|
||||
return APR_SUCCESS;
|
||||
}
|
||||
|
||||
@@ -92,19 +92,10 @@ void h2_io_rst(h2_io *io, int error);
|
||||
int h2_io_is_repeatable(h2_io *io);
|
||||
void h2_io_redo(h2_io *io);
|
||||
|
||||
/**
|
||||
* The input data is completely queued. Blocked reads will return immediately
|
||||
* and give either data or EOF.
|
||||
*/
|
||||
int h2_io_in_has_eos_for(h2_io *io);
|
||||
/**
|
||||
* Output data is available.
|
||||
*/
|
||||
int h2_io_out_has_data(h2_io *io);
|
||||
/**
|
||||
* Input data is available.
|
||||
*/
|
||||
int h2_io_in_has_data(h2_io *io);
|
||||
|
||||
void h2_io_signal(h2_io *io, h2_io_op op);
|
||||
void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout,
|
||||
@@ -127,7 +118,7 @@ apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb,
|
||||
/**
|
||||
* Appends given bucket to the input.
|
||||
*/
|
||||
apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb);
|
||||
apr_status_t h2_io_in_write(h2_io *io, const char *d, apr_size_t len, int eos);
|
||||
|
||||
/**
|
||||
* Closes the input. After existing data has been read, APR_EOF will
|
||||
|
||||
@@ -498,7 +498,7 @@ apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
|
||||
}
|
||||
|
||||
apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
|
||||
apr_bucket_brigade *bb)
|
||||
const char *data, apr_size_t len, int eos)
|
||||
{
|
||||
apr_status_t status;
|
||||
int acquired;
|
||||
@@ -508,7 +508,7 @@ apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
|
||||
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
|
||||
if (io && !io->orphaned) {
|
||||
H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre");
|
||||
status = h2_io_in_write(io, bb);
|
||||
status = h2_io_in_write(io, data, len, eos);
|
||||
H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post");
|
||||
h2_io_signal(io, H2_IO_READ);
|
||||
io_process_events(m, io);
|
||||
@@ -898,46 +898,6 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error)
|
||||
return status;
|
||||
}
|
||||
|
||||
int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id)
|
||||
{
|
||||
int has_eos = 0;
|
||||
int acquired;
|
||||
|
||||
apr_status_t status;
|
||||
AP_DEBUG_ASSERT(m);
|
||||
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
|
||||
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
|
||||
if (io && !io->orphaned) {
|
||||
has_eos = h2_io_in_has_eos_for(io);
|
||||
}
|
||||
else {
|
||||
has_eos = 1;
|
||||
}
|
||||
leave_mutex(m, acquired);
|
||||
}
|
||||
return has_eos;
|
||||
}
|
||||
|
||||
int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id)
|
||||
{
|
||||
apr_status_t status;
|
||||
int has_data = 0;
|
||||
int acquired;
|
||||
|
||||
AP_DEBUG_ASSERT(m);
|
||||
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
|
||||
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
|
||||
if (io && !io->orphaned) {
|
||||
has_data = h2_io_in_has_data(io);
|
||||
}
|
||||
else {
|
||||
has_data = 0;
|
||||
}
|
||||
leave_mutex(m, acquired);
|
||||
}
|
||||
return has_data;
|
||||
}
|
||||
|
||||
int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
|
||||
{
|
||||
apr_status_t status;
|
||||
|
||||
@@ -171,10 +171,6 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error);
|
||||
*/
|
||||
int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id);
|
||||
|
||||
/* Return != 0 iff the multiplexer has input data for the given stream.
|
||||
*/
|
||||
int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id);
|
||||
|
||||
/**
|
||||
* Waits on output data from any stream in this session to become available.
|
||||
* Returns APR_TIMEUP if no data arrived in the given time.
|
||||
@@ -238,20 +234,14 @@ apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
|
||||
* Appends data to the input of the given stream. Storage of input data is
|
||||
* not subject to flow control.
|
||||
*/
|
||||
apr_status_t h2_mplx_in_write(h2_mplx *mplx, int stream_id,
|
||||
apr_bucket_brigade *bb);
|
||||
apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
|
||||
const char *data, apr_size_t len, int eos);
|
||||
|
||||
/**
|
||||
* Closes the input for the given stream_id.
|
||||
*/
|
||||
apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id);
|
||||
|
||||
/**
|
||||
* Returns != 0 iff the input for the given stream has been closed. There
|
||||
* could still be data queued, but it can be read without blocking.
|
||||
*/
|
||||
int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id);
|
||||
|
||||
/**
|
||||
* Invoke the consumed callback for all streams that had bytes read since the
|
||||
* last call to this function. If no stream had input data consumed, the
|
||||
|
||||
@@ -235,8 +235,11 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
status = h2_stream_write_data(stream, (const char *)data, len);
|
||||
|
||||
/* FIXME: enabling setting EOS this way seems to break input handling
|
||||
* in mod_proxy_http2. why? */
|
||||
status = h2_stream_write_data(stream, (const char *)data, len,
|
||||
0 /*flags & NGHTTP2_FLAG_END_STREAM*/);
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
|
||||
"h2_stream(%ld-%d): data_chunk_recv, written %ld bytes",
|
||||
session->id, stream_id, (long)len);
|
||||
|
||||
@@ -41,13 +41,6 @@
|
||||
#include "h2_util.h"
|
||||
|
||||
|
||||
#define H2_STREAM_IN(lvl,s,msg) \
|
||||
do { \
|
||||
if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \
|
||||
h2_util_bb_log((s)->session->c,(s)->id,lvl,msg,(s)->bbin); \
|
||||
} while(0)
|
||||
|
||||
|
||||
static int state_transition[][7] = {
|
||||
/* ID OP RL RR CI CO CL */
|
||||
/*ID*/{ 1, 0, 0, 0, 0, 0, 0 },
|
||||
@@ -144,19 +137,13 @@ static int output_open(h2_stream *stream)
|
||||
|
||||
static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response);
|
||||
|
||||
h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session)
|
||||
h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session)
|
||||
{
|
||||
h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
|
||||
stream->id = id;
|
||||
stream->state = H2_STREAM_ST_IDLE;
|
||||
stream->pool = pool;
|
||||
stream->session = session;
|
||||
return stream;
|
||||
}
|
||||
|
||||
h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session)
|
||||
{
|
||||
h2_stream *stream = h2_stream_create(id, pool, session);
|
||||
set_state(stream, H2_STREAM_ST_OPEN);
|
||||
stream->request = h2_request_create(id, pool,
|
||||
h2_config_geti(session->config, H2_CONF_SER_HEADERS));
|
||||
@@ -296,8 +283,6 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled,
|
||||
if (status == APR_SUCCESS) {
|
||||
if (!eos) {
|
||||
stream->request->body = 1;
|
||||
stream->bbin = apr_brigade_create(stream->pool,
|
||||
stream->session->c->bucket_alloc);
|
||||
}
|
||||
stream->input_remaining = stream->request->content_length;
|
||||
|
||||
@@ -328,33 +313,6 @@ int h2_stream_is_scheduled(const h2_stream *stream)
|
||||
return stream->scheduled;
|
||||
}
|
||||
|
||||
static apr_status_t h2_stream_input_flush(h2_stream *stream)
|
||||
{
|
||||
apr_status_t status = APR_SUCCESS;
|
||||
if (stream->bbin && !APR_BRIGADE_EMPTY(stream->bbin)) {
|
||||
|
||||
status = h2_mplx_in_write(stream->session->mplx, stream->id, stream->bbin);
|
||||
if (status != APR_SUCCESS) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->mplx->c,
|
||||
"h2_stream(%ld-%d): flushing input data",
|
||||
stream->session->id, stream->id);
|
||||
}
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
static apr_status_t input_flush(apr_bucket_brigade *bb, void *ctx)
|
||||
{
|
||||
(void)bb;
|
||||
return h2_stream_input_flush(ctx);
|
||||
}
|
||||
|
||||
static apr_status_t input_add_data(h2_stream *stream,
|
||||
const char *data, size_t len)
|
||||
{
|
||||
return apr_brigade_write(stream->bbin, input_flush, stream, data, len);
|
||||
}
|
||||
|
||||
apr_status_t h2_stream_close_input(h2_stream *stream)
|
||||
{
|
||||
apr_status_t status = APR_SUCCESS;
|
||||
@@ -368,28 +326,23 @@ apr_status_t h2_stream_close_input(h2_stream *stream)
|
||||
return APR_ECONNRESET;
|
||||
}
|
||||
|
||||
H2_STREAM_IN(APLOG_TRACE2, stream, "close_pre");
|
||||
if (close_input(stream) && stream->bbin) {
|
||||
status = h2_stream_input_flush(stream);
|
||||
if (status == APR_SUCCESS) {
|
||||
status = h2_mplx_in_close(stream->session->mplx, stream->id);
|
||||
}
|
||||
if (close_input(stream)) {
|
||||
status = h2_mplx_in_close(stream->session->mplx, stream->id);
|
||||
}
|
||||
H2_STREAM_IN(APLOG_TRACE2, stream, "close_post");
|
||||
return status;
|
||||
}
|
||||
|
||||
apr_status_t h2_stream_write_data(h2_stream *stream,
|
||||
const char *data, size_t len)
|
||||
const char *data, size_t len, int eos)
|
||||
{
|
||||
apr_status_t status = APR_SUCCESS;
|
||||
|
||||
AP_DEBUG_ASSERT(stream);
|
||||
if (input_closed(stream) || !stream->request->eoh || !stream->bbin) {
|
||||
if (input_closed(stream) || !stream->request->eoh) {
|
||||
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
|
||||
"h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d, bbin=%d",
|
||||
"h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d",
|
||||
stream->session->id, stream->id, input_closed(stream),
|
||||
stream->request->eoh, !!stream->bbin);
|
||||
stream->request->eoh);
|
||||
return APR_EINVAL;
|
||||
}
|
||||
|
||||
@@ -397,7 +350,6 @@ apr_status_t h2_stream_write_data(h2_stream *stream,
|
||||
"h2_stream(%ld-%d): add %ld input bytes",
|
||||
stream->session->id, stream->id, (long)len);
|
||||
|
||||
H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_pre");
|
||||
if (!stream->request->chunked) {
|
||||
stream->input_remaining -= len;
|
||||
if (stream->input_remaining < 0) {
|
||||
@@ -413,11 +365,10 @@ apr_status_t h2_stream_write_data(h2_stream *stream,
|
||||
}
|
||||
}
|
||||
|
||||
status = input_add_data(stream, data, len);
|
||||
if (status == APR_SUCCESS) {
|
||||
status = h2_stream_input_flush(stream);
|
||||
status = h2_mplx_in_write(stream->session->mplx, stream->id, data, len, eos);
|
||||
if (eos) {
|
||||
close_input(stream);
|
||||
}
|
||||
H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_post");
|
||||
return status;
|
||||
}
|
||||
|
||||
|
||||
@@ -57,7 +57,6 @@ struct h2_stream {
|
||||
unsigned int submitted : 1; /* response HEADER has been sent */
|
||||
|
||||
apr_off_t input_remaining; /* remaining bytes on input as advertised via content-length */
|
||||
apr_bucket_brigade *bbin; /* input DATA */
|
||||
|
||||
struct h2_sos *sos; /* stream output source, e.g. to read output from */
|
||||
apr_off_t data_frames_sent; /* # of DATA frames sent out for this stream */
|
||||
@@ -66,15 +65,6 @@ struct h2_stream {
|
||||
|
||||
#define H2_STREAM_RST(s, def) (s->rst_error? s->rst_error : (def))
|
||||
|
||||
/**
|
||||
* Create a stream in IDLE state.
|
||||
* @param id the stream identifier
|
||||
* @param pool the memory pool to use for this stream
|
||||
* @param session the session this stream belongs to
|
||||
* @return the newly created IDLE stream
|
||||
*/
|
||||
h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_session *session);
|
||||
|
||||
/**
|
||||
* Create a stream in OPEN state.
|
||||
* @param id the stream identifier
|
||||
@@ -155,7 +145,7 @@ apr_status_t h2_stream_close_input(h2_stream *stream);
|
||||
* @param len the number of bytes to write
|
||||
*/
|
||||
apr_status_t h2_stream_write_data(h2_stream *stream,
|
||||
const char *data, size_t len);
|
||||
const char *data, size_t len, int eos);
|
||||
|
||||
/**
|
||||
* Reset the stream. Stream write/reads will return errors afterwards.
|
||||
|
||||
Reference in New Issue
Block a user