1
0
mirror of https://github.com/apache/httpd.git synced 2025-08-08 15:02:10 +03:00

rework of output handling on stream/session close, rework of cleartext (http:) output to pass buckets to core filters, splitting of stream/io memory pools for stability and less sync

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1712300 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Stefan Eissing
2015-11-03 14:33:11 +00:00
parent d32c0ff682
commit 0b107b1c46
35 changed files with 786 additions and 572 deletions

View File

@@ -20,6 +20,8 @@ dnl # list of module object files
http2_objs="dnl http2_objs="dnl
mod_http2.lo dnl mod_http2.lo dnl
h2_alt_svc.lo dnl h2_alt_svc.lo dnl
h2_bucket_eoc.lo dnl
h2_bucket_eos.lo dnl
h2_config.lo dnl h2_config.lo dnl
h2_conn.lo dnl h2_conn.lo dnl
h2_conn_io.lo dnl h2_conn_io.lo dnl

View File

@@ -0,0 +1,108 @@
/* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <assert.h>
#include <stddef.h>
#include <httpd.h>
#include <http_core.h>
#include <http_connection.h>
#include <http_log.h>
#include "h2_private.h"
#include "h2_mplx.h"
#include "h2_session.h"
#include "h2_bucket_eoc.h"
typedef struct {
apr_bucket_refcount refcount;
h2_session *session;
} h2_bucket_eoc;
static apr_status_t bucket_cleanup(void *data)
{
h2_session **psession = data;
if (*psession) {
/*
* If bucket_destroy is called after us, this prevents
* bucket_destroy from trying to destroy the pool again.
*/
*psession = NULL;
}
return APR_SUCCESS;
}
static apr_status_t bucket_read(apr_bucket *b, const char **str,
apr_size_t *len, apr_read_type_e block)
{
*str = NULL;
*len = 0;
return APR_SUCCESS;
}
AP_DECLARE(apr_bucket *) h2_bucket_eoc_make(apr_bucket *b,
h2_session *session)
{
h2_bucket_eoc *h;
h = apr_bucket_alloc(sizeof(*h), b->list);
h->session = session;
b = apr_bucket_shared_make(b, h, 0, 0);
b->type = &ap_bucket_type_h2_eoc;
return b;
}
AP_DECLARE(apr_bucket *) h2_bucket_eoc_create(apr_bucket_alloc_t *list,
h2_session *session)
{
apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);
APR_BUCKET_INIT(b);
b->free = apr_bucket_free;
b->list = list;
b = h2_bucket_eoc_make(b, session);
if (session) {
h2_bucket_eoc *h = b->data;
apr_pool_pre_cleanup_register(session->pool, &h->session, bucket_cleanup);
}
return b;
}
static void bucket_destroy(void *data)
{
h2_bucket_eoc *h = data;
if (apr_bucket_shared_destroy(h)) {
h2_session *session = h->session;
if (session) {
h2_session_cleanup(session);
}
apr_bucket_free(h);
}
}
AP_DECLARE_DATA const apr_bucket_type_t ap_bucket_type_h2_eoc = {
"H2EOC", 5, APR_BUCKET_METADATA,
bucket_destroy,
bucket_read,
apr_bucket_setaside_noop,
apr_bucket_split_notimpl,
apr_bucket_shared_copy
};

View File

@@ -0,0 +1,31 @@
/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef mod_http2_h2_bucket_eoc_h
#define mod_http2_h2_bucket_eoc_h
struct h2_session;
/** End Of HTTP/2 SESSION (H2EOC) bucket */
AP_DECLARE_DATA extern const apr_bucket_type_t ap_bucket_type_h2_eoc;
AP_DECLARE(apr_bucket *) h2_bucket_eoc_make(apr_bucket *b,
struct h2_session *session);
AP_DECLARE(apr_bucket *) h2_bucket_eoc_create(apr_bucket_alloc_t *list,
struct h2_session *session);
#endif /* mod_http2_h2_bucket_eoc_h */

View File

@@ -0,0 +1,108 @@
/* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <assert.h>
#include <stddef.h>
#include <httpd.h>
#include <http_core.h>
#include <http_connection.h>
#include <http_log.h>
#include "h2_private.h"
#include "h2_mplx.h"
#include "h2_stream.h"
#include "h2_bucket_eos.h"
typedef struct {
apr_bucket_refcount refcount;
h2_stream *stream;
} h2_bucket_eos;
static apr_status_t bucket_cleanup(void *data)
{
h2_stream **pstream = data;
if (*pstream) {
/*
* If bucket_destroy is called after us, this prevents
* bucket_destroy from trying to destroy the pool again.
*/
*pstream = NULL;
}
return APR_SUCCESS;
}
static apr_status_t bucket_read(apr_bucket *b, const char **str,
apr_size_t *len, apr_read_type_e block)
{
*str = NULL;
*len = 0;
return APR_SUCCESS;
}
AP_DECLARE(apr_bucket *) h2_bucket_eos_make(apr_bucket *b,
h2_stream *stream)
{
h2_bucket_eos *h;
h = apr_bucket_alloc(sizeof(*h), b->list);
h->stream = stream;
b = apr_bucket_shared_make(b, h, 0, 0);
b->type = &ap_bucket_type_h2_eos;
return b;
}
AP_DECLARE(apr_bucket *) h2_bucket_eos_create(apr_bucket_alloc_t *list,
h2_stream *stream)
{
apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);
APR_BUCKET_INIT(b);
b->free = apr_bucket_free;
b->list = list;
b = h2_bucket_eos_make(b, stream);
if (stream) {
h2_bucket_eos *h = b->data;
apr_pool_pre_cleanup_register(stream->pool, &h->stream, bucket_cleanup);
}
return b;
}
static void bucket_destroy(void *data)
{
h2_bucket_eos *h = data;
if (apr_bucket_shared_destroy(h)) {
h2_stream *stream = h->stream;
if (stream) {
h2_stream_cleanup(stream);
}
apr_bucket_free(h);
}
}
AP_DECLARE_DATA const apr_bucket_type_t ap_bucket_type_h2_eos = {
"H2EOS", 5, APR_BUCKET_METADATA,
bucket_destroy,
bucket_read,
apr_bucket_setaside_noop,
apr_bucket_split_notimpl,
apr_bucket_shared_copy
};

View File

@@ -0,0 +1,31 @@
/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef mod_http2_h2_bucket_stream_eos_h
#define mod_http2_h2_bucket_stream_eos_h
struct h2_stream;
/** End Of HTTP/2 STREAM (H2EOS) bucket */
AP_DECLARE_DATA extern const apr_bucket_type_t ap_bucket_type_h2_eos;
AP_DECLARE(apr_bucket *) h2_bucket_eos_make(apr_bucket *b,
struct h2_stream *stream);
AP_DECLARE(apr_bucket *) h2_bucket_eos_create(apr_bucket_alloc_t *list,
struct h2_stream *stream);
#endif /* mod_http2_h2_bucket_stream_eos_h */

View File

@@ -316,8 +316,8 @@ static const char *h2_conf_set_session_extra_files(cmd_parms *parms,
{ {
h2_config *cfg = h2_config_sget(parms->server); h2_config *cfg = h2_config_sget(parms->server);
apr_int64_t max = (int)apr_atoi64(value); apr_int64_t max = (int)apr_atoi64(value);
if (max <= 0) { if (max < 0) {
return "value must be a positive number"; return "value must be a non-negative number";
} }
cfg->session_extra_files = (int)max; cfg->session_extra_files = (int)max;
(void)arg; (void)arg;

View File

@@ -240,7 +240,7 @@ static apr_status_t h2_conn_loop(h2_session *session)
session->c->local_addr->port); session->c->local_addr->port);
if (status != APR_SUCCESS) { if (status != APR_SUCCESS) {
h2_session_abort(session, status, rv); h2_session_abort(session, status, rv);
h2_session_destroy(session); h2_session_cleanup(session);
return status; return status;
} }
@@ -343,12 +343,9 @@ static apr_status_t h2_conn_loop(h2_session *session)
ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, session->c, ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, session->c,
"h2_session(%ld): done", session->id); "h2_session(%ld): done", session->id);
h2_session_close(session);
ap_update_child_status_from_conn(session->c->sbh, SERVER_CLOSING, ap_update_child_status_from_conn(session->c->sbh, SERVER_CLOSING,
session->c); session->c);
h2_session_close(session);
h2_session_destroy(session);
return DONE; return DONE;
} }
@@ -411,11 +408,11 @@ conn_rec *h2_conn_create(conn_rec *master, apr_pool_t *pool)
return c; return c;
} }
apr_status_t h2_conn_setup(h2_task_env *env, struct h2_worker *worker) apr_status_t h2_conn_setup(h2_task *task, struct h2_worker *worker)
{ {
conn_rec *master = env->mplx->c; conn_rec *master = task->mplx->c;
ap_log_perror(APLOG_MARK, APLOG_TRACE3, 0, env->pool, ap_log_perror(APLOG_MARK, APLOG_TRACE3, 0, task->pool,
"h2_conn(%ld): created from master", master->id); "h2_conn(%ld): created from master", master->id);
/* Ok, we are just about to start processing the connection and /* Ok, we are just about to start processing the connection and
@@ -424,17 +421,17 @@ apr_status_t h2_conn_setup(h2_task_env *env, struct h2_worker *worker)
* sub-resources from it, so that we get a nice reuse of * sub-resources from it, so that we get a nice reuse of
* pools. * pools.
*/ */
env->c.pool = env->pool; task->c->pool = task->pool;
env->c.bucket_alloc = h2_worker_get_bucket_alloc(worker); task->c->bucket_alloc = h2_worker_get_bucket_alloc(worker);
env->c.current_thread = h2_worker_get_thread(worker); task->c->current_thread = h2_worker_get_thread(worker);
env->c.conn_config = ap_create_conn_config(env->pool); task->c->conn_config = ap_create_conn_config(task->pool);
env->c.notes = apr_table_make(env->pool, 5); task->c->notes = apr_table_make(task->pool, 5);
/* In order to do this in 2.4.x, we need to add a member to conn_rec */ /* In order to do this in 2.4.x, we need to add a member to conn_rec */
env->c.master = master; task->c->master = master;
ap_set_module_config(env->c.conn_config, &core_module, ap_set_module_config(task->c->conn_config, &core_module,
h2_worker_get_socket(worker)); h2_worker_get_socket(worker));
/* This works for mpm_worker so far. Other mpm modules have /* This works for mpm_worker so far. Other mpm modules have
@@ -446,7 +443,7 @@ apr_status_t h2_conn_setup(h2_task_env *env, struct h2_worker *worker)
/* all fine */ /* all fine */
break; break;
case H2_MPM_EVENT: case H2_MPM_EVENT:
fix_event_conn(&env->c, master); fix_event_conn(task->c, master);
break; break;
default: default:
/* fingers crossed */ /* fingers crossed */
@@ -458,7 +455,7 @@ apr_status_t h2_conn_setup(h2_task_env *env, struct h2_worker *worker)
* 400 Bad Request * 400 Bad Request
* when names do not match. We prefer a predictable 421 status. * when names do not match. We prefer a predictable 421 status.
*/ */
env->c.keepalives = 1; task->c->keepalives = 1;
return APR_SUCCESS; return APR_SUCCESS;
} }

View File

@@ -17,7 +17,6 @@
#define __mod_h2__h2_conn__ #define __mod_h2__h2_conn__
struct h2_task; struct h2_task;
struct h2_task_env;
struct h2_worker; struct h2_worker;
/* Process the connection that is now starting the HTTP/2 /* Process the connection that is now starting the HTTP/2
@@ -52,7 +51,7 @@ h2_mpm_type_t h2_conn_mpm_type(void);
conn_rec *h2_conn_create(conn_rec *master, apr_pool_t *stream_pool); conn_rec *h2_conn_create(conn_rec *master, apr_pool_t *stream_pool);
apr_status_t h2_conn_setup(struct h2_task_env *env, struct h2_worker *worker); apr_status_t h2_conn_setup(struct h2_task *task, struct h2_worker *worker);
apr_status_t h2_conn_post(conn_rec *c, struct h2_worker *worker); apr_status_t h2_conn_post(conn_rec *c, struct h2_worker *worker);
apr_status_t h2_conn_process(conn_rec *c, apr_socket_t *socket); apr_status_t h2_conn_process(conn_rec *c, apr_socket_t *socket);

View File

@@ -87,12 +87,6 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c)
return APR_SUCCESS; return APR_SUCCESS;
} }
void h2_conn_io_destroy(h2_conn_io *io)
{
io->input = NULL;
io->output = NULL;
}
int h2_conn_io_is_buffered(h2_conn_io *io) int h2_conn_io_is_buffered(h2_conn_io *io)
{ {
return io->bufsize > 0; return io->bufsize > 0;
@@ -277,11 +271,17 @@ apr_status_t h2_conn_io_write(h2_conn_io *io,
const char *buf, size_t length) const char *buf, size_t length)
{ {
apr_status_t status = APR_SUCCESS; apr_status_t status = APR_SUCCESS;
io->unflushed = 1;
io->unflushed = 1;
if (io->bufsize > 0) { if (io->bufsize > 0) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, io->connection, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, io->connection,
"h2_conn_io: buffering %ld bytes", (long)length); "h2_conn_io: buffering %ld bytes", (long)length);
if (!APR_BRIGADE_EMPTY(io->output)) {
status = h2_conn_io_flush(io);
io->unflushed = 1;
}
while (length > 0 && (status == APR_SUCCESS)) { while (length > 0 && (status == APR_SUCCESS)) {
apr_size_t avail = io->bufsize - io->buflen; apr_size_t avail = io->bufsize - io->buflen;
if (avail <= 0) { if (avail <= 0) {
@@ -304,16 +304,6 @@ apr_status_t h2_conn_io_write(h2_conn_io *io,
} }
} }
else if (1) {
apr_bucket *b;
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, io->connection,
"h2_conn_io: passing %ld transient bytes to output filters",
(long)length);
b = apr_bucket_transient_create(buf,length, io->output->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(io->output, b);
status = pass_out(io->output, io);
}
else { else {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, io->connection, ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, io->connection,
"h2_conn_io: writing %ld bytes to brigade", (long)length); "h2_conn_io: writing %ld bytes to brigade", (long)length);
@@ -323,15 +313,38 @@ apr_status_t h2_conn_io_write(h2_conn_io *io,
return status; return status;
} }
apr_status_t h2_conn_io_append(h2_conn_io *io, apr_bucket *b) apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b)
{ {
APR_BRIGADE_INSERT_TAIL(io->output, b); APR_BRIGADE_INSERT_TAIL(io->output, b);
io->unflushed = 1;
return APR_SUCCESS; return APR_SUCCESS;
} }
apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb) apr_status_t h2_conn_io_consider_flush(h2_conn_io *io)
{ {
return h2_util_move(io->output, bb, 0, NULL, "h2_conn_io_pass"); apr_status_t status = APR_SUCCESS;
int flush_now = 0;
/* The HTTP/1.1 network output buffer/flush behaviour does not
* give optimal performance in the HTTP/2 case, as the pattern of
* buckets (data/eor/eos) is different.
* As long as we do not have found out the "best" way to deal with
* this, force a flush at least every WRITE_BUFFER_SIZE amount
* of data which seems to work nicely.
*/
if (io->unflushed) {
apr_off_t len = 0;
if (!APR_BRIGADE_EMPTY(io->output)) {
apr_brigade_length(io->output, 0, &len);
}
len += io->buflen;
flush_now = (len >= WRITE_BUFFER_SIZE);
}
if (flush_now) {
return h2_conn_io_flush(io);
}
return status;
} }
apr_status_t h2_conn_io_flush(h2_conn_io *io) apr_status_t h2_conn_io_flush(h2_conn_io *io)

View File

@@ -44,8 +44,6 @@ typedef struct {
apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c); apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c);
void h2_conn_io_destroy(h2_conn_io *io);
int h2_conn_io_is_buffered(h2_conn_io *io); int h2_conn_io_is_buffered(h2_conn_io *io);
typedef apr_status_t (*h2_conn_io_on_read_cb)(const char *data, apr_size_t len, typedef apr_status_t (*h2_conn_io_on_read_cb)(const char *data, apr_size_t len,
@@ -61,8 +59,9 @@ apr_status_t h2_conn_io_write(h2_conn_io *io,
const char *buf, const char *buf,
size_t length); size_t length);
apr_status_t h2_conn_io_append(h2_conn_io *io, apr_bucket *b); apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b);
apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb);
apr_status_t h2_conn_io_consider_flush(h2_conn_io *io);
apr_status_t h2_conn_io_flush(h2_conn_io *io); apr_status_t h2_conn_io_flush(h2_conn_io *io);

View File

@@ -32,11 +32,11 @@ static h2_ctx *h2_ctx_create(const conn_rec *c)
return ctx; return ctx;
} }
h2_ctx *h2_ctx_create_for(const conn_rec *c, h2_task_env *env) h2_ctx *h2_ctx_create_for(const conn_rec *c, h2_task *task)
{ {
h2_ctx *ctx = h2_ctx_create(c); h2_ctx *ctx = h2_ctx_create(c);
if (ctx) { if (ctx) {
ctx->task_env = env; ctx->task = task;
} }
return ctx; return ctx;
} }
@@ -76,7 +76,7 @@ h2_ctx *h2_ctx_server_set(h2_ctx *ctx, server_rec *s)
int h2_ctx_is_task(h2_ctx *ctx) int h2_ctx_is_task(h2_ctx *ctx)
{ {
return ctx && !!ctx->task_env; return ctx && !!ctx->task;
} }
int h2_ctx_is_active(h2_ctx *ctx) int h2_ctx_is_active(h2_ctx *ctx)
@@ -84,7 +84,7 @@ int h2_ctx_is_active(h2_ctx *ctx)
return ctx && ctx->is_h2; return ctx && ctx->is_h2;
} }
struct h2_task_env *h2_ctx_get_task(h2_ctx *ctx) struct h2_task *h2_ctx_get_task(h2_ctx *ctx)
{ {
return ctx->task_env; return ctx->task;
} }

View File

@@ -16,7 +16,7 @@
#ifndef __mod_h2__h2_ctx__ #ifndef __mod_h2__h2_ctx__
#define __mod_h2__h2_ctx__ #define __mod_h2__h2_ctx__
struct h2_task_env; struct h2_task;
struct h2_config; struct h2_config;
/** /**
@@ -30,7 +30,7 @@ struct h2_config;
typedef struct h2_ctx { typedef struct h2_ctx {
int is_h2; /* h2 engine is used */ int is_h2; /* h2 engine is used */
const char *protocol; /* the protocol negotiated */ const char *protocol; /* the protocol negotiated */
struct h2_task_env *task_env; /* the h2_task environment or NULL */ struct h2_task *task; /* the h2_task executing or NULL */
const char *hostname; /* hostname negotiated via SNI, optional */ const char *hostname; /* hostname negotiated via SNI, optional */
server_rec *server; /* httpd server config selected. */ server_rec *server; /* httpd server config selected. */
struct h2_config *config; /* effective config in this context */ struct h2_config *config; /* effective config in this context */
@@ -38,7 +38,7 @@ typedef struct h2_ctx {
h2_ctx *h2_ctx_get(const conn_rec *c); h2_ctx *h2_ctx_get(const conn_rec *c);
h2_ctx *h2_ctx_rget(const request_rec *r); h2_ctx *h2_ctx_rget(const request_rec *r);
h2_ctx *h2_ctx_create_for(const conn_rec *c, struct h2_task_env *env); h2_ctx *h2_ctx_create_for(const conn_rec *c, struct h2_task *task);
/* Set the h2 protocol established on this connection context or /* Set the h2 protocol established on this connection context or
@@ -58,6 +58,6 @@ const char *h2_ctx_protocol_get(const conn_rec *c);
int h2_ctx_is_task(h2_ctx *ctx); int h2_ctx_is_task(h2_ctx *ctx);
int h2_ctx_is_active(h2_ctx *ctx); int h2_ctx_is_active(h2_ctx *ctx);
struct h2_task_env *h2_ctx_get_task(h2_ctx *ctx); struct h2_task *h2_ctx_get_task(h2_ctx *ctx);
#endif /* defined(__mod_h2__h2_ctx__) */ #endif /* defined(__mod_h2__h2_ctx__) */

View File

@@ -492,8 +492,8 @@ static h2_response *create_response(h2_from_h1 *from_h1, request_rec *r)
apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb) apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
{ {
h2_task_env *env = f->ctx; h2_task *task = f->ctx;
h2_from_h1 *from_h1 = env->output? env->output->from_h1 : NULL; h2_from_h1 *from_h1 = task->output? task->output->from_h1 : NULL;
request_rec *r = f->r; request_rec *r = f->r;
apr_bucket *b; apr_bucket *b;
ap_bucket_error *eb = NULL; ap_bucket_error *eb = NULL;
@@ -503,7 +503,7 @@ apr_status_t h2_response_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
"h2_from_h1(%d): output_filter called", from_h1->stream_id); "h2_from_h1(%d): output_filter called", from_h1->stream_id);
if (r->header_only && env->output && from_h1->response) { if (r->header_only && task->output && from_h1->response) {
/* throw away any data after we have compiled the response */ /* throw away any data after we have compiled the response */
apr_brigade_cleanup(bb); apr_brigade_cleanup(bb);
return OK; return OK;

View File

@@ -662,21 +662,21 @@ int h2_h2_process_conn(conn_rec* c)
static int h2_h2_post_read_req(request_rec *r) static int h2_h2_post_read_req(request_rec *r)
{ {
h2_ctx *ctx = h2_ctx_rget(r); h2_ctx *ctx = h2_ctx_rget(r);
struct h2_task_env *env = h2_ctx_get_task(ctx); struct h2_task *task = h2_ctx_get_task(ctx);
if (env) { if (task) {
/* h2_task connection for a stream, not for h2c */ /* h2_task connection for a stream, not for h2c */
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
"adding h1_to_h2_resp output filter"); "adding h1_to_h2_resp output filter");
if (env->serialize_headers) { if (task->serialize_headers) {
ap_remove_output_filter_byhandle(r->output_filters, "H1_TO_H2_RESP"); ap_remove_output_filter_byhandle(r->output_filters, "H1_TO_H2_RESP");
ap_add_output_filter("H1_TO_H2_RESP", env, r, r->connection); ap_add_output_filter("H1_TO_H2_RESP", task, r, r->connection);
} }
else { else {
/* replace the core http filter that formats response headers /* replace the core http filter that formats response headers
* in HTTP/1 with our own that collects status and headers */ * in HTTP/1 with our own that collects status and headers */
ap_remove_output_filter_byhandle(r->output_filters, "HTTP_HEADER"); ap_remove_output_filter_byhandle(r->output_filters, "HTTP_HEADER");
ap_remove_output_filter_byhandle(r->output_filters, "H2_RESPONSE"); ap_remove_output_filter_byhandle(r->output_filters, "H2_RESPONSE");
ap_add_output_filter("H2_RESPONSE", env, r, r->connection); ap_add_output_filter("H2_RESPONSE", task, r, r->connection);
} }
} }
return DECLINED; return DECLINED;

View File

@@ -49,6 +49,9 @@ extern const char *H2_MAGIC_TOKEN;
#define H2_ERR_INADEQUATE_SECURITY (0x0c) #define H2_ERR_INADEQUATE_SECURITY (0x0c)
#define H2_ERR_HTTP_1_1_REQUIRED (0x0d) #define H2_ERR_HTTP_1_1_REQUIRED (0x0d)
/* Maximum number of padding bytes in a frame, rfc7540 */
#define H2_MAX_PADLEN 256
/** /**
* Provide a user readable description of the HTTP/2 error code- * Provide a user readable description of the HTTP/2 error code-
* @param h2_error http/2 error code, as in rfc 7540, ch. 7 * @param h2_error http/2 error code, as in rfc 7540, ch. 7

View File

@@ -23,6 +23,7 @@
#include "h2_private.h" #include "h2_private.h"
#include "h2_io.h" #include "h2_io.h"
#include "h2_response.h" #include "h2_response.h"
#include "h2_task.h"
#include "h2_util.h" #include "h2_util.h"
h2_io *h2_io_create(int id, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc) h2_io *h2_io_create(int id, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc)
@@ -39,7 +40,10 @@ h2_io *h2_io_create(int id, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc)
static void h2_io_cleanup(h2_io *io) static void h2_io_cleanup(h2_io *io)
{ {
(void)io; if (io->task) {
h2_task_destroy(io->task);
io->task = NULL;
}
} }
void h2_io_destroy(h2_io *io) void h2_io_destroy(h2_io *io)

View File

@@ -35,6 +35,9 @@ struct h2_io {
int eos_in; int eos_in;
int task_done; int task_done;
int rst_error; int rst_error;
int zombie;
struct h2_task *task; /* task created for this io */
apr_size_t input_consumed; /* how many bytes have been read */ apr_size_t input_consumed; /* how many bytes have been read */
struct apr_thread_cond_t *input_arrived; /* block on reading */ struct apr_thread_cond_t *input_arrived; /* block on reading */

View File

@@ -127,7 +127,6 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, h2_workers *workers)
m->q = h2_tq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS)); m->q = h2_tq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS));
m->stream_ios = h2_io_set_create(m->pool); m->stream_ios = h2_io_set_create(m->pool);
m->ready_ios = h2_io_set_create(m->pool); m->ready_ios = h2_io_set_create(m->pool);
m->closed = h2_stream_set_create(m->pool);
m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM); m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
m->workers = workers; m->workers = workers;
@@ -222,52 +221,18 @@ void h2_mplx_abort(h2_mplx *m)
} }
h2_stream *h2_mplx_open_io(h2_mplx *m, int stream_id) static void io_destroy(h2_mplx *m, h2_io *io)
{ {
h2_stream *stream = NULL;
apr_status_t status;
h2_io *io;
if (m->aborted) {
return NULL;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
apr_pool_t *stream_pool = m->spare_pool;
if (!stream_pool) {
apr_pool_create(&stream_pool, m->pool);
}
else {
m->spare_pool = NULL;
}
stream = h2_stream_create(stream_id, stream_pool, m);
stream->state = H2_STREAM_ST_OPEN;
io = h2_io_set_get(m->stream_ios, stream_id);
if (!io) {
io = h2_io_create(stream_id, stream_pool, m->bucket_alloc);
h2_io_set_add(m->stream_ios, io);
}
status = io? APR_SUCCESS : APR_ENOMEM;
apr_thread_mutex_unlock(m->lock);
}
return stream;
}
static void stream_destroy(h2_mplx *m, h2_stream *stream, h2_io *io)
{
apr_pool_t *pool = h2_stream_detach_pool(stream);
if (pool) {
apr_pool_clear(pool);
if (m->spare_pool) {
apr_pool_destroy(m->spare_pool);
}
m->spare_pool = pool;
}
h2_stream_destroy(stream);
if (io) { if (io) {
apr_pool_t *pool = io->pool;
if (pool) {
io->pool = NULL;
apr_pool_clear(pool);
if (m->spare_pool) {
apr_pool_destroy(m->spare_pool);
}
m->spare_pool = pool;
}
/* The pool is cleared/destroyed which also closes all /* The pool is cleared/destroyed which also closes all
* allocated file handles. Give this count back to our * allocated file handles. Give this count back to our
* file handle pool. */ * file handle pool. */
@@ -278,31 +243,36 @@ static void stream_destroy(h2_mplx *m, h2_stream *stream, h2_io *io)
} }
} }
apr_status_t h2_mplx_cleanup_stream(h2_mplx *m, h2_stream *stream) apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error)
{ {
apr_status_t status; apr_status_t status;
AP_DEBUG_ASSERT(m); AP_DEBUG_ASSERT(m);
if (m->aborted) {
return APR_ECONNABORTED;
}
status = apr_thread_mutex_lock(m->lock); status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) { if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream->id); h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io) { if (io) {
/* Remove io from ready set, we will never submit it */ /* Remove io from ready set, we will never submit it */
h2_io_set_remove(m->ready_ios, io); h2_io_set_remove(m->ready_ios, io);
if (stream->rst_error) {
/* Forward error code to fail any further attempt to if (io->task_done) {
* write to io */ io_destroy(m, io);
h2_io_rst(io, stream->rst_error); }
else {
/* cleanup once task is done */
io->zombie = 1;
if (rst_error) {
/* Forward error code to fail any further attempt to
* write to io */
h2_io_rst(io, rst_error);
}
} }
} }
if (!io || io->task_done) {
/* No more io or task already done -> cleanup immediately */
stream_destroy(m, stream, io);
}
else {
/* Add stream to closed set for cleanup when task is done */
h2_stream_set_add(m->closed, stream);
}
apr_thread_mutex_unlock(m->lock); apr_thread_mutex_unlock(m->lock);
} }
return status; return status;
@@ -312,21 +282,17 @@ void h2_mplx_task_done(h2_mplx *m, int stream_id)
{ {
apr_status_t status = apr_thread_mutex_lock(m->lock); apr_status_t status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) { if (APR_SUCCESS == status) {
h2_stream *stream = h2_stream_set_get(m->closed, stream_id);
h2_io *io = h2_io_set_get(m->stream_ios, stream_id); h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld): task(%d) done", m->id, stream_id); "h2_mplx(%ld): task(%d) done", m->id, stream_id);
if (stream) { if (io) {
/* stream was already closed by main connection and is in
* zombie state. Now that the task is done with it, we
* can free its resources. */
h2_stream_set_remove(m->closed, stream);
stream_destroy(m, stream, io);
}
else if (io) {
/* main connection has not finished stream. Mark task as done
* so that eventual cleanup can start immediately. */
io->task_done = 1; io->task_done = 1;
if (io->zombie) {
io_destroy(m, io);
}
else {
/* hang around until the stream deregisteres */
}
} }
apr_thread_mutex_unlock(m->lock); apr_thread_mutex_unlock(m->lock);
} }
@@ -506,11 +472,11 @@ apr_status_t h2_mplx_out_read_to(h2_mplx *m, int stream_id,
if (APR_SUCCESS == status) { if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id); h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io) { if (io) {
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_pre"); H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_read_to_pre");
status = h2_io_out_read_to(io, bb, plen, peos); status = h2_io_out_read_to(io, bb, plen, peos);
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_post"); H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_read_to_post");
if (status == APR_SUCCESS && io->output_drained) { if (status == APR_SUCCESS && io->output_drained) {
apr_thread_cond_signal(io->output_drained); apr_thread_cond_signal(io->output_drained);
} }
@@ -614,8 +580,9 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response,
if (io) { if (io) {
if (f) { if (f) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c,
"h2_mplx(%ld-%d): open response: %s", "h2_mplx(%ld-%d): open response: %s, rst=%d",
m->id, stream_id, response->status); m->id, stream_id, response->status,
response->rst_error);
} }
h2_io_set_response(io, response); h2_io_set_response(io, response);
@@ -752,11 +719,8 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error)
H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst"); H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst");
have_out_data_for(m, stream_id); have_out_data_for(m, stream_id);
if (m->aborted) { if (io->output_drained) {
/* if we were the last output, the whole session might apr_thread_cond_signal(io->output_drained);
* have gone down in the meantime.
*/
return APR_SUCCESS;
} }
} }
else { else {
@@ -873,8 +837,28 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
return status; return status;
} }
apr_status_t h2_mplx_do_task(h2_mplx *m, struct h2_task *task, static h2_io *open_io(h2_mplx *m, int stream_id)
h2_stream_pri_cmp *cmp, void *ctx) {
apr_pool_t *io_pool = m->spare_pool;
h2_io *io;
if (!io_pool) {
apr_pool_create(&io_pool, m->pool);
}
else {
m->spare_pool = NULL;
}
io = h2_io_create(stream_id, io_pool, m->bucket_alloc);
h2_io_set_add(m->stream_ios, io);
return io;
}
apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
struct h2_request *r, int eos,
h2_stream_pri_cmp *cmp, void *ctx)
{ {
apr_status_t status; apr_status_t status;
@@ -884,17 +868,32 @@ apr_status_t h2_mplx_do_task(h2_mplx *m, struct h2_task *task,
} }
status = apr_thread_mutex_lock(m->lock); status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) { if (APR_SUCCESS == status) {
conn_rec *c;
h2_io *io;
cmp_ctx x; cmp_ctx x;
x.cmp = cmp; io = open_io(m, stream_id);
x.ctx = ctx; c = h2_conn_create(m->c, io->pool);
h2_tq_add(m->q, task, task_cmp, &x); io->task = h2_task_create(m->id, stream_id, io->pool, m, c);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, status = h2_request_end_headers(r, m, io->task, eos);
"h2_mplx: do task(%s)", task->id); if (status == APR_SUCCESS && eos) {
status = h2_io_in_close(io);
}
if (status == APR_SUCCESS) {
x.cmp = cmp;
x.ctx = ctx;
h2_tq_add(m->q, io->task, task_cmp, &x);
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
"h2_mplx(%ld-%d): process", m->c->id, stream_id);
apr_thread_mutex_unlock(m->lock); apr_thread_mutex_unlock(m->lock);
} }
workers_register(m);
if (status == APR_SUCCESS) {
workers_register(m);
}
return status; return status;
} }
@@ -910,30 +909,9 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
status = apr_thread_mutex_lock(m->lock); status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) { if (APR_SUCCESS == status) {
task = h2_tq_shift(m->q); task = h2_tq_shift(m->q);
if (task) {
h2_task_set_started(task);
}
*has_more = !h2_tq_empty(m->q); *has_more = !h2_tq_empty(m->q);
apr_thread_mutex_unlock(m->lock); apr_thread_mutex_unlock(m->lock);
} }
return task; return task;
} }
apr_status_t h2_mplx_create_task(h2_mplx *m, struct h2_stream *stream)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
if (m->aborted) {
return APR_ECONNABORTED;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
conn_rec *c = h2_conn_create(m->c, stream->pool);
stream->task = h2_task_create(m->id, stream->id,
stream->pool, m, c);
apr_thread_mutex_unlock(m->lock);
}
return status;
}

View File

@@ -41,6 +41,7 @@ struct h2_config;
struct h2_response; struct h2_response;
struct h2_task; struct h2_task;
struct h2_stream; struct h2_stream;
struct h2_request;
struct h2_io_set; struct h2_io_set;
struct apr_thread_cond_t; struct apr_thread_cond_t;
struct h2_workers; struct h2_workers;
@@ -70,8 +71,7 @@ struct h2_mplx {
int aborted; int aborted;
apr_size_t stream_max_mem; apr_size_t stream_max_mem;
apr_pool_t *spare_pool; /* spare pool, ready for next stream */ apr_pool_t *spare_pool; /* spare pool, ready for next io */
struct h2_stream_set *closed; /* streams closed, but task ongoing */
struct h2_workers *workers; struct h2_workers *workers;
int file_handles_allowed; int file_handles_allowed;
}; };
@@ -120,15 +120,16 @@ void h2_mplx_task_done(h2_mplx *m, int stream_id);
/******************************************************************************* /*******************************************************************************
* IO lifetime of streams. * IO lifetime of streams.
******************************************************************************/ ******************************************************************************/
/**
* Prepares the multiplexer to handle in-/output on the given stream id.
*/
struct h2_stream *h2_mplx_open_io(h2_mplx *mplx, int stream_id);
/** /**
* Ends cleanup of a stream in sync with execution thread. * Notifies mplx that a stream has finished processing.
*
* @param m the mplx itself
* @param stream_id the id of the stream being done
* @param rst_error if != 0, the stream was reset with the error given
*
*/ */
apr_status_t h2_mplx_cleanup_stream(h2_mplx *m, struct h2_stream *stream); apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error);
/* Return != 0 iff the multiplexer has data for the given stream. /* Return != 0 iff the multiplexer has data for the given stream.
*/ */
@@ -146,15 +147,18 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
******************************************************************************/ ******************************************************************************/
/** /**
* Schedule a task for execution. * Process a stream request.
* *
* @param m the multiplexer * @param m the multiplexer
* @param task the task to schedule * @param stream_id the identifier of the stream
* @param r the request to be processed
* @param eos if input is complete
* @param cmp the stream priority compare function * @param cmp the stream priority compare function
* @param ctx context data for the compare function * @param ctx context data for the compare function
*/ */
apr_status_t h2_mplx_do_task(h2_mplx *m, struct h2_task *task, apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
h2_stream_pri_cmp *cmp, void *ctx); struct h2_request *r, int eos,
h2_stream_pri_cmp *cmp, void *ctx);
/** /**
* Stream priorities have changed, reschedule pending tasks. * Stream priorities have changed, reschedule pending tasks.
@@ -167,8 +171,6 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, int *has_more); struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, int *has_more);
apr_status_t h2_mplx_create_task(h2_mplx *mplx, struct h2_stream *stream);
/******************************************************************************* /*******************************************************************************
* Input handling of streams. * Input handling of streams.
******************************************************************************/ ******************************************************************************/

View File

@@ -151,13 +151,21 @@ apr_status_t h2_request_write_data(h2_request *req,
apr_status_t h2_request_end_headers(h2_request *req, struct h2_mplx *m, apr_status_t h2_request_end_headers(h2_request *req, struct h2_mplx *m,
h2_task *task, int eos) h2_task *task, int eos)
{ {
apr_status_t status;
if (!req->to_h1) { if (!req->to_h1) {
apr_status_t status = insert_request_line(req, m); status = insert_request_line(req, m);
if (status != APR_SUCCESS) { if (status != APR_SUCCESS) {
return status; return status;
} }
} }
return h2_to_h1_end_headers(req->to_h1, task, eos); status = h2_to_h1_end_headers(req->to_h1, eos);
h2_task_set_request(task, req->to_h1->method,
req->to_h1->scheme,
req->to_h1->authority,
req->to_h1->path,
req->to_h1->headers, eos);
return status;
} }
apr_status_t h2_request_close(h2_request *req) apr_status_t h2_request_close(h2_request *req)

View File

@@ -24,6 +24,8 @@
#include <http_log.h> #include <http_log.h>
#include "h2_private.h" #include "h2_private.h"
#include "h2_bucket_eoc.h"
#include "h2_bucket_eos.h"
#include "h2_config.h" #include "h2_config.h"
#include "h2_h2.h" #include "h2_h2.h"
#include "h2_mplx.h" #include "h2_mplx.h"
@@ -56,29 +58,32 @@ static int h2_session_status_from_apr_status(apr_status_t rv)
static int stream_open(h2_session *session, int stream_id) static int stream_open(h2_session *session, int stream_id)
{ {
h2_stream * stream; h2_stream * stream;
apr_pool_t *stream_pool;
if (session->aborted) { if (session->aborted) {
return NGHTTP2_ERR_CALLBACK_FAILURE; return NGHTTP2_ERR_CALLBACK_FAILURE;
} }
stream = h2_mplx_open_io(session->mplx, stream_id); if (session->spare) {
if (stream) { stream_pool = session->spare;
h2_stream_set_add(session->streams, stream); session->spare = NULL;
if (stream->id > session->max_stream_received) { }
session->max_stream_received = stream->id; else {
} apr_pool_create(&stream_pool, session->pool);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"h2_session: stream(%ld-%d): opened",
session->id, stream_id);
return 0;
} }
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, session->c, stream = h2_stream_create(stream_id, stream_pool, session);
APLOGNO(02918) stream->state = H2_STREAM_ST_OPEN;
"h2_session: stream(%ld-%d): unable to create",
h2_stream_set_add(session->streams, stream);
if (stream->id > session->max_stream_received) {
session->max_stream_received = stream->id;
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"h2_session: stream(%ld-%d): opened",
session->id, stream_id); session->id, stream_id);
return NGHTTP2_ERR_INVALID_STREAM_ID;
return 0;
} }
/** /**
@@ -247,8 +252,6 @@ static int before_frame_send_cb(nghttp2_session *ngh2,
case NGHTTP2_GOAWAY: case NGHTTP2_GOAWAY:
session->flush = 1; session->flush = 1;
break; break;
case NGHTTP2_DATA:
default: default:
break; break;
@@ -317,8 +320,9 @@ static apr_status_t stream_destroy(h2_session *session,
h2_stream_rst(stream, error_code); h2_stream_rst(stream, error_code);
} }
h2_stream_set_remove(session->streams, stream); return h2_conn_io_writeb(&session->io,
return h2_mplx_cleanup_stream(session->mplx, stream); h2_bucket_eos_create(session->c->bucket_alloc,
stream));
} }
static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id, static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
@@ -342,9 +346,6 @@ static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
session->id, (int)stream_id, error_code); session->id, (int)stream_id, error_code);
} }
/* always flush on eos */
session->flush = 1;
return 0; return 0;
} }
@@ -522,7 +523,8 @@ static apr_status_t pass_data(void *ctx,
return h2_conn_io_write(&((h2_session*)ctx)->io, data, length); return h2_conn_io_write(&((h2_session*)ctx)->io, data, length);
} }
static char immortal_zeros[256];
static char immortal_zeros[H2_MAX_PADLEN];
static int on_send_data_cb(nghttp2_session *ngh2, static int on_send_data_cb(nghttp2_session *ngh2,
nghttp2_frame *frame, nghttp2_frame *frame,
@@ -575,9 +577,7 @@ static int on_send_data_cb(nghttp2_session *ngh2,
if (status == APR_SUCCESS && padlen) { if (status == APR_SUCCESS && padlen) {
if (padlen) { if (padlen) {
char pad[256]; status = h2_conn_io_write(&session->io, immortal_zeros, padlen);
memset(pad, 0, padlen);
status = h2_conn_io_write(&session->io, pad, padlen);
} }
} }
} }
@@ -591,7 +591,7 @@ static int on_send_data_cb(nghttp2_session *ngh2,
} }
b = apr_bucket_pool_create(header, padlen? 10 : 9, b = apr_bucket_pool_create(header, padlen? 10 : 9,
stream->pool, session->c->bucket_alloc); stream->pool, session->c->bucket_alloc);
status = h2_conn_io_append(&session->io, b); status = h2_conn_io_writeb(&session->io, b);
if (status == APR_SUCCESS) { if (status == APR_SUCCESS) {
apr_size_t len = length; apr_size_t len = length;
@@ -605,12 +605,14 @@ static int on_send_data_cb(nghttp2_session *ngh2,
if (status == APR_SUCCESS && padlen) { if (status == APR_SUCCESS && padlen) {
b = apr_bucket_immortal_create(immortal_zeros, padlen, b = apr_bucket_immortal_create(immortal_zeros, padlen,
session->c->bucket_alloc); session->c->bucket_alloc);
status = h2_conn_io_append(&session->io, b); status = h2_conn_io_writeb(&session->io, b);
} }
} }
if (status == APR_SUCCESS) { if (status == APR_SUCCESS) {
stream->data_frames_sent++;
h2_conn_io_consider_flush(&session->io);
return 0; return 0;
} }
else { else {
@@ -779,13 +781,11 @@ void h2_session_destroy(h2_session *session)
nghttp2_session_del(session->ngh2); nghttp2_session_del(session->ngh2);
session->ngh2 = NULL; session->ngh2 = NULL;
} }
h2_conn_io_destroy(&session->io); }
if (session->iowait) {
apr_thread_cond_destroy(session->iowait);
session->iowait = NULL;
}
void h2_session_cleanup(h2_session *session)
{
h2_session_destroy(session);
if (session->pool) { if (session->pool) {
apr_pool_destroy(session->pool); apr_pool_destroy(session->pool);
} }
@@ -796,14 +796,18 @@ static apr_status_t h2_session_abort_int(h2_session *session, int reason)
AP_DEBUG_ASSERT(session); AP_DEBUG_ASSERT(session);
if (!session->aborted) { if (!session->aborted) {
session->aborted = 1; session->aborted = 1;
if (session->ngh2) {
if (!reason) { if (session->ngh2) {
if (NGHTTP2_ERR_EOF == reason) {
/* This is our way of indication that the connection is
* gone. No use to send any GOAWAY frames. */
nghttp2_session_terminate_session(session->ngh2, reason);
}
else if (!reason) {
nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE,
session->max_stream_received, session->max_stream_received,
reason, NULL, 0); reason, NULL, 0);
nghttp2_session_send(session->ngh2); nghttp2_session_send(session->ngh2);
h2_conn_io_flush(&session->io);
} }
else { else {
const char *err = nghttp2_strerror(reason); const char *err = nghttp2_strerror(reason);
@@ -812,22 +816,15 @@ static apr_status_t h2_session_abort_int(h2_session *session, int reason)
"session(%ld): aborting session, reason=%d %s", "session(%ld): aborting session, reason=%d %s",
session->id, reason, err); session->id, reason, err);
if (NGHTTP2_ERR_EOF == reason) { /* The connection might still be there and we shut down
/* This is our way of indication that the connection is * with GOAWAY and reason information. */
* gone. No use to send any GOAWAY frames. */ nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE,
nghttp2_session_terminate_session(session->ngh2, reason); session->max_stream_received,
} reason, (const uint8_t *)err,
else { strlen(err));
/* The connection might still be there and we shut down nghttp2_session_send(session->ngh2);
* with GOAWAY and reason information. */
nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE,
session->max_stream_received,
reason, (const uint8_t *)err,
strlen(err));
nghttp2_session_send(session->ngh2);
h2_conn_io_flush(&session->io);
}
} }
h2_conn_io_flush(&session->io);
} }
h2_mplx_abort(session->mplx); h2_mplx_abort(session->mplx);
} }
@@ -973,7 +970,7 @@ static int resume_on_data(void *ctx, h2_stream *stream) {
AP_DEBUG_ASSERT(stream); AP_DEBUG_ASSERT(stream);
if (h2_stream_is_suspended(stream)) { if (h2_stream_is_suspended(stream)) {
if (h2_mplx_out_has_data_for(stream->m, stream->id)) { if (h2_mplx_out_has_data_for(stream->session->mplx, stream->id)) {
int rv; int rv;
h2_stream_set_suspended(stream, 0); h2_stream_set_suspended(stream, 0);
++rctx->resume_count; ++rctx->resume_count;
@@ -1148,30 +1145,15 @@ apr_status_t h2_session_read(h2_session *session, apr_read_type_e block)
apr_status_t h2_session_close(h2_session *session) apr_status_t h2_session_close(h2_session *session)
{ {
AP_DEBUG_ASSERT(session); AP_DEBUG_ASSERT(session);
return session->aborted? APR_SUCCESS : h2_conn_io_flush(&session->io); if (!session->aborted) {
} h2_session_abort_int(session, 0);
}
/* The session wants to send more DATA for the given stream. ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0,session->c,
*/ "h2_session: closing, writing eoc");
h2_conn_io_writeb(&session->io,
typedef struct { h2_bucket_eoc_create(session->c->bucket_alloc,
char *buf; session));
size_t offset; return h2_conn_io_flush(&session->io);
h2_session *session;
h2_stream *stream;
} cpy_ctx;
static apr_status_t copy_buffer(void *ctx, const char *data, apr_size_t len)
{
cpy_ctx *c = ctx;
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c->session->c,
"h2_stream(%ld-%d): copy %ld bytes for DATA #%ld",
c->session->id, c->stream->id,
(long)len, (long)c->stream->data_frames_sent);
memcpy(c->buf + c->offset, data, len);
c->offset += len;
return APR_SUCCESS;
} }
static ssize_t stream_data_cb(nghttp2_session *ng2s, static ssize_t stream_data_cb(nghttp2_session *ng2s,
@@ -1189,12 +1171,21 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
h2_stream *stream; h2_stream *stream;
AP_DEBUG_ASSERT(session); AP_DEBUG_ASSERT(session);
/* The session wants to send more DATA for the stream. We need
* to find out how much of the requested length we can send without
* blocking.
* Indicate EOS when we encounter it or DEFERRED if the stream
* should be suspended.
* TODO: for handling of TRAILERS, the EOF indication needs
* to be aware of that.
*/
(void)ng2s; (void)ng2s;
(void)buf; (void)buf;
(void)source; (void)source;
stream = h2_stream_set_get(session->streams, stream_id); stream = h2_stream_set_get(session->streams, stream_id);
if (!stream) { if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_NOTFOUND, session->c, ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
APLOGNO(02937) APLOGNO(02937)
"h2_stream(%ld-%d): data requested but stream not found", "h2_stream(%ld-%d): data requested but stream not found",
session->id, (int)stream_id); session->id, (int)stream_id);
@@ -1203,25 +1194,9 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
AP_DEBUG_ASSERT(!h2_stream_is_suspended(stream)); AP_DEBUG_ASSERT(!h2_stream_is_suspended(stream));
if (1 || h2_conn_io_is_buffered(&session->io)) { status = h2_stream_prep_read(stream, &nread, &eos);
status = h2_stream_prep_read(stream, &nread, &eos); if (nread) {
if (nread) { *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
*data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
}
}
else {
cpy_ctx ctx;
ctx.buf = (char *)buf;
ctx.offset = 0;
ctx.session = session;
ctx.stream = stream;
status = h2_stream_readx(stream, copy_buffer, &ctx, &nread, &eos);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c,
"h2_stream(%ld-%d): read %ld bytes (DATA #%ld)",
session->id, (int)stream_id, (long)nread,
(long)stream->data_frames_sent);
stream->data_frames_sent++;
} }
switch (status) { switch (status) {
@@ -1332,6 +1307,24 @@ apr_status_t h2_session_handle_response(h2_session *session, h2_stream *stream)
return status; return status;
} }
apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream)
{
apr_pool_t *pool = h2_stream_detach_pool(stream);
h2_mplx_stream_done(session->mplx, stream->id, stream->rst_error);
h2_stream_set_remove(session->streams, stream->id);
h2_stream_destroy(stream);
if (pool) {
apr_pool_clear(pool);
if (session->spare) {
apr_pool_destroy(session->spare);
}
session->spare = pool;
}
return APR_SUCCESS;
}
int h2_session_is_done(h2_session *session) int h2_session_is_done(h2_session *session)
{ {
AP_DEBUG_ASSERT(session); AP_DEBUG_ASSERT(session);

View File

@@ -77,6 +77,8 @@ struct h2_session {
int max_stream_received; /* highest stream id created */ int max_stream_received; /* highest stream id created */
int max_stream_handled; /* highest stream id handled successfully */ int max_stream_handled; /* highest stream id handled successfully */
apr_pool_t *spare; /* spare stream pool */
struct nghttp2_session *ngh2; /* the nghttp2 session (internal use) */ struct nghttp2_session *ngh2; /* the nghttp2 session (internal use) */
struct h2_workers *workers; /* for executing stream tasks */ struct h2_workers *workers; /* for executing stream tasks */
}; };
@@ -111,6 +113,13 @@ h2_session *h2_session_rcreate(request_rec *r, struct h2_config *cfg,
*/ */
void h2_session_destroy(h2_session *session); void h2_session_destroy(h2_session *session);
/**
* Cleanup the session and all objects it still contains. This will not
* destroy h2_task instances that have not finished yet.
* @param session the session to destroy
*/
void h2_session_cleanup(h2_session *session);
/** /**
* Called once at start of session. * Called once at start of session.
* Sets up the session and sends the initial SETTINGS frame. * Sets up the session and sends the initial SETTINGS frame.
@@ -160,4 +169,12 @@ apr_status_t h2_session_handle_response(h2_session *session,
/* Get the h2_stream for the given stream idenrtifier. */ /* Get the h2_stream for the given stream idenrtifier. */
struct h2_stream *h2_session_get_stream(h2_session *session, int stream_id); struct h2_stream *h2_session_get_stream(h2_session *session, int stream_id);
/**
* Destroy the stream and release it everywhere. Reclaim all resources.
* @param session the session to which the stream belongs
* @param stream the stream to destroy
*/
apr_status_t h2_session_stream_destroy(h2_session *session,
struct h2_stream *stream);
#endif /* defined(__mod_h2__h2_session__) */ #endif /* defined(__mod_h2__h2_session__) */

View File

@@ -16,8 +16,6 @@
#include <assert.h> #include <assert.h>
#include <stddef.h> #include <stddef.h>
#define APR_POOL_DEBUG 7
#include <httpd.h> #include <httpd.h>
#include <http_core.h> #include <http_core.h>
#include <http_connection.h> #include <http_connection.h>
@@ -30,6 +28,7 @@
#include "h2_mplx.h" #include "h2_mplx.h"
#include "h2_request.h" #include "h2_request.h"
#include "h2_response.h" #include "h2_response.h"
#include "h2_session.h"
#include "h2_stream.h" #include "h2_stream.h"
#include "h2_task.h" #include "h2_task.h"
#include "h2_ctx.h" #include "h2_ctx.h"
@@ -46,46 +45,47 @@ static void set_state(h2_stream *stream, h2_stream_state_t state)
} }
} }
h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_mplx *m) h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session)
{ {
h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream)); h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
if (stream != NULL) { if (stream != NULL) {
stream->id = id; stream->id = id;
stream->state = H2_STREAM_ST_IDLE; stream->state = H2_STREAM_ST_IDLE;
stream->pool = pool; stream->pool = pool;
stream->m = m; stream->session = session;
stream->request = h2_request_create(id, pool, m->c->bucket_alloc); stream->bbout = apr_brigade_create(stream->pool,
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, stream->session->c->bucket_alloc);
"h2_stream(%ld-%d): created", m->id, stream->id); stream->request = h2_request_create(id, pool, session->c->bucket_alloc);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
"h2_stream(%ld-%d): created", session->id, stream->id);
} }
return stream; return stream;
} }
static void h2_stream_cleanup(h2_stream *stream)
{
if (stream->request) {
h2_request_destroy(stream->request);
stream->request = NULL;
}
}
apr_status_t h2_stream_destroy(h2_stream *stream) apr_status_t h2_stream_destroy(h2_stream *stream)
{ {
AP_DEBUG_ASSERT(stream); AP_DEBUG_ASSERT(stream);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->m->c,
"h2_stream(%ld-%d): destroy", stream->m->id, stream->id);
h2_stream_cleanup(stream);
if (stream->task) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
h2_task_destroy(stream->task); "h2_stream(%ld-%d): destroy", stream->session->id, stream->id);
stream->task = NULL; if (stream->request) {
h2_request_destroy(stream->request);
stream->request = NULL;
} }
if (stream->pool) { if (stream->pool) {
apr_pool_destroy(stream->pool); apr_pool_destroy(stream->pool);
} }
return APR_SUCCESS; return APR_SUCCESS;
} }
void h2_stream_cleanup(h2_stream *stream)
{
h2_session_stream_destroy(stream->session, stream);
/* stream is gone */
}
apr_pool_t *h2_stream_detach_pool(h2_stream *stream) apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
{ {
apr_pool_t *pool = stream->pool; apr_pool_t *pool = stream->pool;
@@ -96,9 +96,9 @@ apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
void h2_stream_rst(h2_stream *stream, int error_code) void h2_stream_rst(h2_stream *stream, int error_code)
{ {
stream->rst_error = error_code; stream->rst_error = error_code;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->m->c, ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
"h2_stream(%ld-%d): reset, error=%d", "h2_stream(%ld-%d): reset, error=%d",
stream->m->id, stream->id, error_code); stream->session->id, stream->id, error_code);
} }
apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response, apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
@@ -109,27 +109,20 @@ apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
stream->response = response; stream->response = response;
if (bb && !APR_BRIGADE_EMPTY(bb)) { if (bb && !APR_BRIGADE_EMPTY(bb)) {
int move_all = INT_MAX; int move_all = INT_MAX;
if (!stream->bbout) {
stream->bbout = apr_brigade_create(stream->pool,
stream->m->c->bucket_alloc);
}
/* we can move file handles from h2_mplx into this h2_stream as many /* we can move file handles from h2_mplx into this h2_stream as many
* as we want, since the lifetimes are the same and we are not freeing * as we want, since the lifetimes are the same and we are not freeing
* the ones in h2_mplx->io before this stream is done. */ * the ones in h2_mplx->io before this stream is done. */
status = h2_util_move(stream->bbout, bb, 16 * 1024, &move_all, status = h2_util_move(stream->bbout, bb, 16 * 1024, &move_all,
"h2_stream_set_response"); "h2_stream_set_response");
} }
if (APLOGctrace1(stream->m->c)) { if (APLOGctrace1(stream->session->c)) {
apr_size_t len = 0; apr_size_t len = 0;
int eos = 0; int eos = 0;
if (stream->bbout) { h2_util_bb_avail(stream->bbout, &len, &eos);
h2_util_bb_avail(stream->bbout, &len, &eos); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->session->c,
} "h2_stream(%ld-%d): set_response(%s), len=%ld, eos=%d",
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->m->c, stream->session->id, stream->id, response->status,
"h2_stream(%ld-%d): set_response(%s), brigade=%s, " (long)len, (int)eos);
"len=%ld, eos=%d",
stream->m->id, stream->id, response->status,
(stream->bbout? "yes" : "no"), (long)len, (int)eos);
} }
return status; return status;
} }
@@ -160,7 +153,7 @@ apr_status_t h2_stream_rwrite(h2_stream *stream, request_rec *r)
return APR_ECONNRESET; return APR_ECONNRESET;
} }
set_state(stream, H2_STREAM_ST_OPEN); set_state(stream, H2_STREAM_ST_OPEN);
status = h2_request_rwrite(stream->request, r, stream->m); status = h2_request_rwrite(stream->request, r, stream->session->mplx);
return status; return status;
} }
@@ -176,32 +169,26 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos,
/* Seeing the end-of-headers, we have everything we need to /* Seeing the end-of-headers, we have everything we need to
* start processing it. * start processing it.
*/ */
status = h2_mplx_create_task(stream->m, stream); status = h2_mplx_process(stream->session->mplx, stream->id,
if (status == APR_SUCCESS) { stream->request, eos, cmp, ctx);
status = h2_request_end_headers(stream->request, if (eos) {
stream->m, stream->task, eos); set_closed(stream);
if (status == APR_SUCCESS) {
status = h2_mplx_do_task(stream->m, stream->task, cmp, ctx);
}
if (eos) {
status = h2_stream_write_eos(stream);
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->m->c,
"h2_mplx(%ld-%d): start stream, task %s %s (%s)",
stream->m->id, stream->id,
stream->request->method, stream->request->path,
stream->request->authority);
} }
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->session->c,
"h2_mplx(%ld-%d): start stream, task %s %s (%s)",
stream->session->id, stream->id,
stream->request->method, stream->request->path,
stream->request->authority);
return status; return status;
} }
apr_status_t h2_stream_write_eos(h2_stream *stream) apr_status_t h2_stream_write_eos(h2_stream *stream)
{ {
AP_DEBUG_ASSERT(stream); AP_DEBUG_ASSERT(stream);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->m->c, ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c,
"h2_stream(%ld-%d): closing input", "h2_stream(%ld-%d): closing input",
stream->m->id, stream->id); stream->session->id, stream->id);
if (stream->rst_error) { if (stream->rst_error) {
return APR_ECONNRESET; return APR_ECONNRESET;
} }
@@ -229,7 +216,7 @@ apr_status_t h2_stream_write_header(h2_stream *stream,
return APR_EINVAL; return APR_EINVAL;
} }
return h2_request_write_header(stream->request, name, nlen, return h2_request_write_header(stream->request, name, nlen,
value, vlen, stream->m); value, vlen, stream->session->mplx);
} }
apr_status_t h2_stream_write_data(h2_stream *stream, apr_status_t h2_stream_write_data(h2_stream *stream,
@@ -257,7 +244,8 @@ apr_status_t h2_stream_prep_read(h2_stream *stream,
if (stream->rst_error) { if (stream->rst_error) {
return APR_ECONNRESET; return APR_ECONNRESET;
} }
if (stream->bbout && !APR_BRIGADE_EMPTY(stream->bbout)) {
if (!APR_BRIGADE_EMPTY(stream->bbout)) {
src = "stream"; src = "stream";
status = h2_util_bb_avail(stream->bbout, plen, peos); status = h2_util_bb_avail(stream->bbout, plen, peos);
if (status == APR_SUCCESS && !*peos && !*plen) { if (status == APR_SUCCESS && !*peos && !*plen) {
@@ -267,15 +255,15 @@ apr_status_t h2_stream_prep_read(h2_stream *stream,
} }
else { else {
src = "mplx"; src = "mplx";
status = h2_mplx_out_readx(stream->m, stream->id, status = h2_mplx_out_readx(stream->session->mplx, stream->id,
NULL, NULL, plen, peos); NULL, NULL, plen, peos);
} }
if (status == APR_SUCCESS && !*peos && !*plen) { if (status == APR_SUCCESS && !*peos && !*plen) {
status = APR_EAGAIN; status = APR_EAGAIN;
} }
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->m->c, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
"h2_stream(%ld-%d): prep_read %s, len=%ld eos=%d", "h2_stream(%ld-%d): prep_read %s, len=%ld eos=%d",
stream->m->id, stream->id, src, (long)*plen, *peos); stream->session->id, stream->id, src, (long)*plen, *peos);
return status; return status;
} }
@@ -289,24 +277,31 @@ apr_status_t h2_stream_readx(h2_stream *stream,
if (stream->rst_error) { if (stream->rst_error) {
return APR_ECONNRESET; return APR_ECONNRESET;
} }
if (stream->bbout && !APR_BRIGADE_EMPTY(stream->bbout)) { *peos = 0;
if (!APR_BRIGADE_EMPTY(stream->bbout)) {
apr_size_t origlen = *plen;
src = "stream"; src = "stream";
status = h2_util_bb_readx(stream->bbout, cb, ctx, plen, peos); status = h2_util_bb_readx(stream->bbout, cb, ctx, plen, peos);
if (status == APR_SUCCESS && !*peos && !*plen) { if (status == APR_SUCCESS && !*peos && !*plen) {
apr_brigade_cleanup(stream->bbout); apr_brigade_cleanup(stream->bbout);
*plen = origlen;
return h2_stream_readx(stream, cb, ctx, plen, peos); return h2_stream_readx(stream, cb, ctx, plen, peos);
} }
} }
else { else {
src = "mplx"; src = "mplx";
status = h2_mplx_out_readx(stream->m, stream->id, cb, ctx, plen, peos); status = h2_mplx_out_readx(stream->session->mplx, stream->id,
cb, ctx, plen, peos);
} }
if (status == APR_SUCCESS && !*peos && !*plen) { if (status == APR_SUCCESS && !*peos && !*plen) {
status = APR_EAGAIN; status = APR_EAGAIN;
} }
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->m->c,
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
"h2_stream(%ld-%d): readx %s, len=%ld eos=%d", "h2_stream(%ld-%d): readx %s, len=%ld eos=%d",
stream->m->id, stream->id, src, (long)*plen, *peos); stream->session->id, stream->id, src, (long)*plen, *peos);
return status; return status;
} }
@@ -318,18 +313,29 @@ apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
if (stream->rst_error) { if (stream->rst_error) {
return APR_ECONNRESET; return APR_ECONNRESET;
} }
if (stream->bbout && !APR_BRIGADE_EMPTY(stream->bbout)) {
status = h2_transfer_brigade(bb, stream->bbout, bb->p, plen, peos); if (APR_BRIGADE_EMPTY(stream->bbout)) {
apr_size_t tlen = *plen;
int eos;
status = h2_mplx_out_read_to(stream->session->mplx, stream->id,
stream->bbout, &tlen, &eos);
}
if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(stream->bbout)) {
status = h2_transfer_brigade(bb, stream->bbout, stream->pool,
plen, peos);
} }
else { else {
status = h2_mplx_out_read_to(stream->m, stream->id, bb, plen, peos); *plen = 0;
*peos = 0;
} }
if (status == APR_SUCCESS && !*peos && !*plen) { if (status == APR_SUCCESS && !*peos && !*plen) {
status = APR_EAGAIN; status = APR_EAGAIN;
} }
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->m->c, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
"h2_stream(%ld-%d): read_to, len=%ld eos=%d", "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
stream->m->id, stream->id, (long)*plen, *peos); stream->session->id, stream->id, (long)*plen, *peos);
return status; return status;
} }

View File

@@ -48,6 +48,7 @@ typedef enum {
struct h2_mplx; struct h2_mplx;
struct h2_request; struct h2_request;
struct h2_response; struct h2_response;
struct h2_session;
struct h2_task; struct h2_task;
typedef struct h2_stream h2_stream; typedef struct h2_stream h2_stream;
@@ -55,28 +56,30 @@ typedef struct h2_stream h2_stream;
struct h2_stream { struct h2_stream {
int id; /* http2 stream id */ int id; /* http2 stream id */
h2_stream_state_t state; /* http/2 state of this stream */ h2_stream_state_t state; /* http/2 state of this stream */
struct h2_mplx *m; /* the multiplexer to work with */ struct h2_session *session; /* the session this stream belongs to */
int aborted; /* was aborted */ int aborted; /* was aborted */
int suspended; /* DATA sending has been suspended */ int suspended; /* DATA sending has been suspended */
apr_size_t data_frames_sent;/* # of DATA frames sent out for this stream */ int rst_error; /* stream error for RST_STREAM */
apr_pool_t *pool; /* the memory pool for this stream */ apr_pool_t *pool; /* the memory pool for this stream */
struct h2_request *request; /* the request made in this stream */ struct h2_request *request; /* the request made in this stream */
struct h2_task *task; /* task created for this stream */
struct h2_response *response; /* the response, once ready */ struct h2_response *response; /* the response, once ready */
apr_bucket_brigade *bbout; /* output DATA */ apr_bucket_brigade *bbout; /* output DATA */
int rst_error; /* stream error for RST_STREAM */ apr_size_t data_frames_sent;/* # of DATA frames sent out for this stream */
}; };
#define H2_STREAM_RST(s, def) (s->rst_error? s->rst_error : (def)) #define H2_STREAM_RST(s, def) (s->rst_error? s->rst_error : (def))
h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_mplx *m); h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_session *session);
apr_status_t h2_stream_destroy(h2_stream *stream); apr_status_t h2_stream_destroy(h2_stream *stream);
void h2_stream_cleanup(h2_stream *stream);
void h2_stream_rst(h2_stream *streamm, int error_code); void h2_stream_rst(h2_stream *streamm, int error_code);
apr_pool_t *h2_stream_detach_pool(h2_stream *stream); apr_pool_t *h2_stream_detach_pool(h2_stream *stream);

View File

@@ -54,9 +54,7 @@ void h2_stream_set_destroy(h2_stream_set *sp)
static int h2_stream_id_cmp(const void *s1, const void *s2) static int h2_stream_id_cmp(const void *s1, const void *s2)
{ {
h2_stream **pstream1 = (h2_stream **)s1; return (*((h2_stream **)s1))->id - (*((h2_stream **)s2))->id;
h2_stream **pstream2 = (h2_stream **)s2;
return (*pstream1)->id - (*pstream2)->id;
} }
h2_stream *h2_stream_set_get(h2_stream_set *sp, int stream_id) h2_stream *h2_stream_set_get(h2_stream_set *sp, int stream_id)
@@ -101,12 +99,12 @@ apr_status_t h2_stream_set_add(h2_stream_set *sp, h2_stream *stream)
return APR_SUCCESS; return APR_SUCCESS;
} }
h2_stream *h2_stream_set_remove(h2_stream_set *sp, h2_stream *stream) h2_stream *h2_stream_set_remove(h2_stream_set *sp, int stream_id)
{ {
int i; int i;
for (i = 0; i < sp->list->nelts; ++i) { for (i = 0; i < sp->list->nelts; ++i) {
h2_stream *s = H2_STREAM_IDX(sp->list, i); h2_stream *s = H2_STREAM_IDX(sp->list, i);
if (s == stream) { if (s->id == stream_id) {
int n; int n;
--sp->list->nelts; --sp->list->nelts;
n = sp->list->nelts - i; n = sp->list->nelts - i;

View File

@@ -35,7 +35,7 @@ apr_status_t h2_stream_set_add(h2_stream_set *sp, h2_stream *stream);
h2_stream *h2_stream_set_get(h2_stream_set *sp, int stream_id); h2_stream *h2_stream_set_get(h2_stream_set *sp, int stream_id);
h2_stream *h2_stream_set_remove(h2_stream_set *sp,h2_stream *stream); h2_stream *h2_stream_set_remove(h2_stream_set *sp, int stream_id);
void h2_stream_set_remove_all(h2_stream_set *sp); void h2_stream_set_remove_all(h2_stream_set *sp);

View File

@@ -52,35 +52,37 @@ static apr_status_t h2_filter_stream_input(ap_filter_t* filter,
ap_input_mode_t mode, ap_input_mode_t mode,
apr_read_type_e block, apr_read_type_e block,
apr_off_t readbytes) { apr_off_t readbytes) {
h2_task_env *env = filter->ctx; h2_task *task = filter->ctx;
AP_DEBUG_ASSERT(env); AP_DEBUG_ASSERT(task);
if (!env->input) { if (!task->input) {
return APR_ECONNABORTED; return APR_ECONNABORTED;
} }
return h2_task_input_read(env->input, filter, brigade, return h2_task_input_read(task->input, filter, brigade,
mode, block, readbytes); mode, block, readbytes);
} }
static apr_status_t h2_filter_stream_output(ap_filter_t* filter, static apr_status_t h2_filter_stream_output(ap_filter_t* filter,
apr_bucket_brigade* brigade) { apr_bucket_brigade* brigade) {
h2_task_env *env = filter->ctx; h2_task *task = filter->ctx;
AP_DEBUG_ASSERT(env); AP_DEBUG_ASSERT(task);
if (!env->output) { if (!task->output) {
return APR_ECONNABORTED; return APR_ECONNABORTED;
} }
return h2_task_output_write(env->output, filter, brigade); return h2_task_output_write(task->output, filter, brigade);
} }
static apr_status_t h2_filter_read_response(ap_filter_t* f, static apr_status_t h2_filter_read_response(ap_filter_t* f,
apr_bucket_brigade* bb) { apr_bucket_brigade* bb) {
h2_task_env *env = f->ctx; h2_task *task = f->ctx;
AP_DEBUG_ASSERT(env); AP_DEBUG_ASSERT(task);
if (!env->output || !env->output->from_h1) { if (!task->output || !task->output->from_h1) {
return APR_ECONNABORTED; return APR_ECONNABORTED;
} }
return h2_from_h1_read_response(env->output->from_h1, f, bb); return h2_from_h1_read_response(task->output->from_h1, f, bb);
} }
static apr_status_t h2_task_process_request(h2_task *task);
/******************************************************************************* /*******************************************************************************
* Register various hooks * Register various hooks
*/ */
@@ -119,15 +121,15 @@ static int h2_task_pre_conn(conn_rec* c, void *arg)
(void)arg; (void)arg;
if (h2_ctx_is_task(ctx)) { if (h2_ctx_is_task(ctx)) {
h2_task_env *env = h2_ctx_get_task(ctx); h2_task *task = h2_ctx_get_task(ctx);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
"h2_h2, pre_connection, found stream task"); "h2_h2, pre_connection, found stream task");
/* Add our own, network level in- and output filters. /* Add our own, network level in- and output filters.
*/ */
ap_add_input_filter("H2_TO_H1", env, NULL, c); ap_add_input_filter("H2_TO_H1", task, NULL, c);
ap_add_output_filter("H1_TO_H2", env, NULL, c); ap_add_output_filter("H1_TO_H2", task, NULL, c);
} }
return OK; return OK;
} }
@@ -137,14 +139,14 @@ static int h2_task_process_conn(conn_rec* c)
h2_ctx *ctx = h2_ctx_get(c); h2_ctx *ctx = h2_ctx_get(c);
if (h2_ctx_is_task(ctx)) { if (h2_ctx_is_task(ctx)) {
if (!ctx->task_env->serialize_headers) { if (!ctx->task->serialize_headers) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
"h2_h2, processing request directly"); "h2_h2, processing request directly");
h2_task_process_request(ctx->task_env); h2_task_process_request(ctx->task);
return DONE; return DONE;
} }
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c,
"h2_task(%s), serialized handling", ctx->task_env->id); "h2_task(%s), serialized handling", ctx->task->id);
} }
return DECLINED; return DECLINED;
} }
@@ -200,121 +202,73 @@ apr_status_t h2_task_do(h2_task *task, h2_worker *worker)
{ {
apr_status_t status = APR_SUCCESS; apr_status_t status = APR_SUCCESS;
h2_config *cfg = h2_config_get(task->mplx->c); h2_config *cfg = h2_config_get(task->mplx->c);
h2_task_env env;
AP_DEBUG_ASSERT(task); AP_DEBUG_ASSERT(task);
memset(&env, 0, sizeof(env)); task->serialize_headers = h2_config_geti(cfg, H2_CONF_SER_HEADERS);
env.id = task->id;
env.stream_id = task->stream_id;
env.mplx = task->mplx;
task->mplx = NULL;
env.input_eos = task->input_eos;
env.serialize_headers = h2_config_geti(cfg, H2_CONF_SER_HEADERS);
/* Create a subpool from the worker one to be used for all things /* Create a subpool from the worker one to be used for all things
* with life-time of this task_env execution. * with life-time of this task execution.
*/ */
apr_pool_create(&env.pool, h2_worker_get_pool(worker)); apr_pool_create(&task->pool, h2_worker_get_pool(worker));
/* Link the env to the worker which provides useful things such /* Link the task to the worker which provides useful things such
* as mutex, a socket etc. */ * as mutex, a socket etc. */
env.io = h2_worker_get_cond(worker); task->io = h2_worker_get_cond(worker);
/* Clone fields, so that lifetimes become (more) independent. */ status = h2_conn_setup(task, worker);
env.method = apr_pstrdup(env.pool, task->method);
env.scheme = apr_pstrdup(env.pool, task->scheme);
env.authority = apr_pstrdup(env.pool, task->authority);
env.path = apr_pstrdup(env.pool, task->path);
env.headers = apr_table_clone(env.pool, task->headers);
/* Setup the pseudo connection to use our own pool and bucket_alloc */
env.c = *task->c;
task->c = NULL;
status = h2_conn_setup(&env, worker);
/* save in connection that this one is a pseudo connection, prevents /* save in connection that this one is a pseudo connection, prevents
* other hooks from messing with it. */ * other hooks from messing with it. */
h2_ctx_create_for(&env.c, &env); h2_ctx_create_for(task->c, task);
if (status == APR_SUCCESS) { if (status == APR_SUCCESS) {
env.input = h2_task_input_create(&env, env.pool, task->input = h2_task_input_create(task, task->pool,
env.c.bucket_alloc); task->c->bucket_alloc);
env.output = h2_task_output_create(&env, env.pool, task->output = h2_task_output_create(task, task->pool,
env.c.bucket_alloc); task->c->bucket_alloc);
status = h2_conn_process(&env.c, h2_worker_get_socket(worker)); status = h2_conn_process(task->c, h2_worker_get_socket(worker));
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, &env.c, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, task->c,
"h2_task(%s): processing done", env.id); "h2_task(%s): processing done", task->id);
} }
else { else {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, &env.c, ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, task->c,
APLOGNO(02957) "h2_task(%s): error setting up h2_task_env", APLOGNO(02957) "h2_task(%s): error setting up h2_task",
env.id); task->id);
} }
if (env.input) { if (task->input) {
h2_task_input_destroy(env.input); h2_task_input_destroy(task->input);
env.input = NULL; task->input = NULL;
} }
if (env.output) { if (task->output) {
h2_task_output_close(env.output); h2_task_output_close(task->output);
h2_task_output_destroy(env.output); h2_task_output_destroy(task->output);
env.output = NULL; task->output = NULL;
} }
h2_task_set_finished(task); if (task->io) {
if (env.io) { apr_thread_cond_signal(task->io);
apr_thread_cond_signal(env.io);
} }
if (env.pool) { if (task->pool) {
apr_pool_destroy(env.pool); apr_pool_destroy(task->pool);
env.pool = NULL; task->pool = NULL;
} }
if (env.c.id) { if (task->c->id) {
h2_conn_post(&env.c, worker); h2_conn_post(task->c, worker);
} }
h2_mplx_task_done(env.mplx, env.stream_id); h2_mplx_task_done(task->mplx, task->stream_id);
return status; return status;
} }
int h2_task_has_started(h2_task *task) static request_rec *h2_task_create_request(h2_task *task)
{ {
AP_DEBUG_ASSERT(task); conn_rec *conn = task->c;
return apr_atomic_read32(&task->has_started);
}
void h2_task_set_started(h2_task *task)
{
AP_DEBUG_ASSERT(task);
apr_atomic_set32(&task->has_started, 1);
}
int h2_task_has_finished(h2_task *task)
{
return apr_atomic_read32(&task->has_finished);
}
void h2_task_set_finished(h2_task *task)
{
apr_atomic_set32(&task->has_finished, 1);
}
void h2_task_die(h2_task_env *env, int status, request_rec *r)
{
(void)env;
ap_die(status, r);
}
static request_rec *h2_task_create_request(h2_task_env *env)
{
conn_rec *conn = &env->c;
request_rec *r; request_rec *r;
apr_pool_t *p; apr_pool_t *p;
int access_status = HTTP_OK; int access_status = HTTP_OK;
@@ -332,7 +286,7 @@ static request_rec *h2_task_create_request(h2_task_env *env)
r->allowed_methods = ap_make_method_list(p, 2); r->allowed_methods = ap_make_method_list(p, 2);
r->headers_in = apr_table_copy(r->pool, env->headers); r->headers_in = apr_table_copy(r->pool, task->headers);
r->trailers_in = apr_table_make(r->pool, 5); r->trailers_in = apr_table_make(r->pool, 5);
r->subprocess_env = apr_table_make(r->pool, 25); r->subprocess_env = apr_table_make(r->pool, 25);
r->headers_out = apr_table_make(r->pool, 12); r->headers_out = apr_table_make(r->pool, 12);
@@ -371,19 +325,19 @@ static request_rec *h2_task_create_request(h2_task_env *env)
/* Time to populate r with the data we have. */ /* Time to populate r with the data we have. */
r->request_time = apr_time_now(); r->request_time = apr_time_now();
r->method = env->method; r->method = task->method;
/* Provide quick information about the request method as soon as known */ /* Provide quick information about the request method as soon as known */
r->method_number = ap_method_number_of(r->method); r->method_number = ap_method_number_of(r->method);
if (r->method_number == M_GET && r->method[0] == 'H') { if (r->method_number == M_GET && r->method[0] == 'H') {
r->header_only = 1; r->header_only = 1;
} }
ap_parse_uri(r, env->path); ap_parse_uri(r, task->path);
r->protocol = (char*)"HTTP/2"; r->protocol = (char*)"HTTP/2";
r->proto_num = HTTP_VERSION(2, 0); r->proto_num = HTTP_VERSION(2, 0);
r->the_request = apr_psprintf(r->pool, "%s %s %s", r->the_request = apr_psprintf(r->pool, "%s %s %s",
r->method, env->path, r->protocol); r->method, task->path, r->protocol);
/* update what we think the virtual host is based on the headers we've /* update what we think the virtual host is based on the headers we've
* now read. may update status. * now read. may update status.
@@ -410,7 +364,7 @@ static request_rec *h2_task_create_request(h2_task_env *env)
/* Request check post hooks failed. An example of this would be a /* Request check post hooks failed. An example of this would be a
* request for a vhost where h2 is disabled --> 421. * request for a vhost where h2 is disabled --> 421.
*/ */
h2_task_die(env, access_status, r); ap_die(access_status, r);
ap_update_child_status(conn->sbh, SERVER_BUSY_LOG, r); ap_update_child_status(conn->sbh, SERVER_BUSY_LOG, r);
ap_run_log_transaction(r); ap_run_log_transaction(r);
r = NULL; r = NULL;
@@ -427,13 +381,13 @@ traceout:
} }
apr_status_t h2_task_process_request(h2_task_env *env) static apr_status_t h2_task_process_request(h2_task *task)
{ {
conn_rec *c = &env->c; conn_rec *c = task->c;
request_rec *r; request_rec *r;
conn_state_t *cs = c->cs; conn_state_t *cs = c->cs;
r = h2_task_create_request(env); r = h2_task_create_request(task);
if (r && (r->status == HTTP_OK)) { if (r && (r->status == HTTP_OK)) {
ap_update_child_status(c->sbh, SERVER_BUSY_READ, r); ap_update_child_status(c->sbh, SERVER_BUSY_READ, r);

View File

@@ -49,29 +49,6 @@ struct h2_task {
int stream_id; int stream_id;
struct h2_mplx *mplx; struct h2_mplx *mplx;
volatile apr_uint32_t has_started;
volatile apr_uint32_t has_finished;
const char *method;
const char *scheme;
const char *authority;
const char *path;
apr_table_t *headers;
int input_eos;
struct conn_rec *c;
};
typedef struct h2_task_env h2_task_env;
struct h2_task_env {
const char *id;
int stream_id;
struct h2_mplx *mplx;
apr_pool_t *pool; /* pool for task lifetime things */
apr_bucket_alloc_t *bucket_alloc;
const char *method; const char *method;
const char *scheme; const char *scheme;
const char *authority; const char *authority;
@@ -81,7 +58,10 @@ struct h2_task_env {
int serialize_headers; int serialize_headers;
struct conn_rec c; struct conn_rec *c;
apr_pool_t *pool; /* pool for task lifetime things */
apr_bucket_alloc_t *bucket_alloc;
struct h2_task_input *input; struct h2_task_input *input;
struct h2_task_output *output; struct h2_task_output *output;
@@ -103,14 +83,7 @@ void h2_task_set_request(h2_task *task,
apr_status_t h2_task_do(h2_task *task, struct h2_worker *worker); apr_status_t h2_task_do(h2_task *task, struct h2_worker *worker);
apr_status_t h2_task_process_request(h2_task_env *env);
int h2_task_has_started(h2_task *task);
void h2_task_set_started(h2_task *task);
int h2_task_has_finished(h2_task *task);
void h2_task_set_finished(h2_task *task);
void h2_task_register_hooks(void); void h2_task_register_hooks(void);
void h2_task_die(h2_task_env *env, int status, request_rec *r);
#endif /* defined(__mod_h2__h2_task__) */ #endif /* defined(__mod_h2__h2_task__) */

View File

@@ -42,28 +42,28 @@ static int ser_header(void *ctx, const char *name, const char *value)
return 1; return 1;
} }
h2_task_input *h2_task_input_create(h2_task_env *env, apr_pool_t *pool, h2_task_input *h2_task_input_create(h2_task *task, apr_pool_t *pool,
apr_bucket_alloc_t *bucket_alloc) apr_bucket_alloc_t *bucket_alloc)
{ {
h2_task_input *input = apr_pcalloc(pool, sizeof(h2_task_input)); h2_task_input *input = apr_pcalloc(pool, sizeof(h2_task_input));
if (input) { if (input) {
input->env = env; input->task = task;
input->bb = NULL; input->bb = NULL;
if (env->serialize_headers) { if (task->serialize_headers) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, &env->c, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
"h2_task_input(%s): serialize request %s %s", "h2_task_input(%s): serialize request %s %s",
env->id, env->method, env->path); task->id, task->method, task->path);
input->bb = apr_brigade_create(pool, bucket_alloc); input->bb = apr_brigade_create(pool, bucket_alloc);
apr_brigade_printf(input->bb, NULL, NULL, "%s %s HTTP/1.1\r\n", apr_brigade_printf(input->bb, NULL, NULL, "%s %s HTTP/1.1\r\n",
env->method, env->path); task->method, task->path);
apr_table_do(ser_header, input, env->headers, NULL); apr_table_do(ser_header, input, task->headers, NULL);
apr_brigade_puts(input->bb, NULL, NULL, "\r\n"); apr_brigade_puts(input->bb, NULL, NULL, "\r\n");
if (input->env->input_eos) { if (input->task->input_eos) {
APR_BRIGADE_INSERT_TAIL(input->bb, apr_bucket_eos_create(bucket_alloc)); APR_BRIGADE_INSERT_TAIL(input->bb, apr_bucket_eos_create(bucket_alloc));
} }
} }
else if (!input->env->input_eos) { else if (!input->task->input_eos) {
input->bb = apr_brigade_create(pool, bucket_alloc); input->bb = apr_brigade_create(pool, bucket_alloc);
} }
else { else {
@@ -71,7 +71,7 @@ h2_task_input *h2_task_input_create(h2_task_env *env, apr_pool_t *pool,
* create a bucket brigade. */ * create a bucket brigade. */
} }
if (APLOGcdebug(&env->c)) { if (APLOGcdebug(task->c)) {
char buffer[1024]; char buffer[1024];
apr_size_t len = sizeof(buffer)-1; apr_size_t len = sizeof(buffer)-1;
if (input->bb) { if (input->bb) {
@@ -81,9 +81,9 @@ h2_task_input *h2_task_input_create(h2_task_env *env, apr_pool_t *pool,
len = 0; len = 0;
} }
buffer[len] = 0; buffer[len] = 0;
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, &env->c, ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c,
"h2_task_input(%s): request is: %s", "h2_task_input(%s): request is: %s",
env->id, buffer); task->id, buffer);
} }
} }
return input; return input;
@@ -106,12 +106,12 @@ apr_status_t h2_task_input_read(h2_task_input *input,
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c,
"h2_task_input(%s): read, block=%d, mode=%d, readbytes=%ld", "h2_task_input(%s): read, block=%d, mode=%d, readbytes=%ld",
input->env->id, block, mode, (long)readbytes); input->task->id, block, mode, (long)readbytes);
if (is_aborted(f)) { if (is_aborted(f)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
"h2_task_input(%s): is aborted", "h2_task_input(%s): is aborted",
input->env->id); input->task->id);
return APR_ECONNABORTED; return APR_ECONNABORTED;
} }
@@ -124,12 +124,12 @@ apr_status_t h2_task_input_read(h2_task_input *input,
if (status != APR_SUCCESS) { if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, f->c, ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, f->c,
APLOGNO(02958) "h2_task_input(%s): brigade length fail", APLOGNO(02958) "h2_task_input(%s): brigade length fail",
input->env->id); input->task->id);
return status; return status;
} }
} }
if ((bblen == 0) && input->env->input_eos) { if ((bblen == 0) && input->task->input_eos) {
return APR_EOF; return APR_EOF;
} }
@@ -139,19 +139,19 @@ apr_status_t h2_task_input_read(h2_task_input *input,
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
"h2_task_input(%s): get more data from mplx, block=%d, " "h2_task_input(%s): get more data from mplx, block=%d, "
"readbytes=%ld, queued=%ld", "readbytes=%ld, queued=%ld",
input->env->id, block, input->task->id, block,
(long)readbytes, (long)bblen); (long)readbytes, (long)bblen);
/* Although we sometimes get called with APR_NONBLOCK_READs, /* Although we sometimes get called with APR_NONBLOCK_READs,
we seem to fill our buffer blocking. Otherwise we get EAGAIN, we seem to fill our buffer blocking. Otherwise we get EAGAIN,
return that to our caller and everyone throws up their hands, return that to our caller and everyone throws up their hands,
never calling us again. */ never calling us again. */
status = h2_mplx_in_read(input->env->mplx, APR_BLOCK_READ, status = h2_mplx_in_read(input->task->mplx, APR_BLOCK_READ,
input->env->stream_id, input->bb, input->task->stream_id, input->bb,
input->env->io); input->task->io);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
"h2_task_input(%s): mplx in read returned", "h2_task_input(%s): mplx in read returned",
input->env->id); input->task->id);
if (status != APR_SUCCESS) { if (status != APR_SUCCESS) {
return status; return status;
} }
@@ -164,13 +164,13 @@ apr_status_t h2_task_input_read(h2_task_input *input,
} }
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
"h2_task_input(%s): mplx in read, %ld bytes in brigade", "h2_task_input(%s): mplx in read, %ld bytes in brigade",
input->env->id, (long)bblen); input->task->id, (long)bblen);
} }
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
"h2_task_input(%s): read, mode=%d, block=%d, " "h2_task_input(%s): read, mode=%d, block=%d, "
"readbytes=%ld, queued=%ld", "readbytes=%ld, queued=%ld",
input->env->id, mode, block, input->task->id, mode, block,
(long)readbytes, (long)bblen); (long)readbytes, (long)bblen);
if (!APR_BRIGADE_EMPTY(input->bb)) { if (!APR_BRIGADE_EMPTY(input->bb)) {
@@ -199,7 +199,7 @@ apr_status_t h2_task_input_read(h2_task_input *input,
buffer[len] = 0; buffer[len] = 0;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
"h2_task_input(%s): getline: %s", "h2_task_input(%s): getline: %s",
input->env->id, buffer); input->task->id, buffer);
} }
return status; return status;
} }

View File

@@ -22,16 +22,16 @@
*/ */
struct apr_thread_cond_t; struct apr_thread_cond_t;
struct h2_mplx; struct h2_mplx;
struct h2_task_env; struct h2_task;
typedef struct h2_task_input h2_task_input; typedef struct h2_task_input h2_task_input;
struct h2_task_input { struct h2_task_input {
struct h2_task_env *env; struct h2_task *task;
apr_bucket_brigade *bb; apr_bucket_brigade *bb;
}; };
h2_task_input *h2_task_input_create(struct h2_task_env *env, apr_pool_t *pool, h2_task_input *h2_task_input_create(struct h2_task *task, apr_pool_t *pool,
apr_bucket_alloc_t *bucket_alloc); apr_bucket_alloc_t *bucket_alloc);
void h2_task_input_destroy(h2_task_input *input); void h2_task_input_destroy(h2_task_input *input);

View File

@@ -33,16 +33,16 @@
#include "h2_util.h" #include "h2_util.h"
h2_task_output *h2_task_output_create(h2_task_env *env, apr_pool_t *pool, h2_task_output *h2_task_output_create(h2_task *task, apr_pool_t *pool,
apr_bucket_alloc_t *bucket_alloc) apr_bucket_alloc_t *bucket_alloc)
{ {
h2_task_output *output = apr_pcalloc(pool, sizeof(h2_task_output)); h2_task_output *output = apr_pcalloc(pool, sizeof(h2_task_output));
(void)bucket_alloc; (void)bucket_alloc;
if (output) { if (output) {
output->env = env; output->task = task;
output->state = H2_TASK_OUT_INIT; output->state = H2_TASK_OUT_INIT;
output->from_h1 = h2_from_h1_create(env->stream_id, pool); output->from_h1 = h2_from_h1_create(task->stream_id, pool);
if (!output->from_h1) { if (!output->from_h1) {
return NULL; return NULL;
} }
@@ -73,18 +73,18 @@ static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f,
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c,
"h2_task_output(%s): write without response " "h2_task_output(%s): write without response "
"for %s %s %s", "for %s %s %s",
output->env->id, output->env->method, output->task->id, output->task->method,
output->env->authority, output->env->path); output->task->authority, output->task->path);
f->c->aborted = 1; f->c->aborted = 1;
} }
if (output->env->io) { if (output->task->io) {
apr_thread_cond_broadcast(output->env->io); apr_thread_cond_broadcast(output->task->io);
} }
return APR_ECONNABORTED; return APR_ECONNABORTED;
} }
return h2_mplx_out_open(output->env->mplx, output->env->stream_id, return h2_mplx_out_open(output->task->mplx, output->task->stream_id,
response, f, bb, output->env->io); response, f, bb, output->task->io);
} }
return APR_EOF; return APR_EOF;
} }
@@ -93,7 +93,7 @@ void h2_task_output_close(h2_task_output *output)
{ {
open_if_needed(output, NULL, NULL); open_if_needed(output, NULL, NULL);
if (output->state != H2_TASK_OUT_DONE) { if (output->state != H2_TASK_OUT_DONE) {
h2_mplx_out_close(output->env->mplx, output->env->stream_id); h2_mplx_out_close(output->task->mplx, output->task->stream_id);
output->state = H2_TASK_OUT_DONE; output->state = H2_TASK_OUT_DONE;
} }
} }
@@ -113,7 +113,7 @@ apr_status_t h2_task_output_write(h2_task_output *output,
apr_status_t status; apr_status_t status;
if (APR_BRIGADE_EMPTY(bb)) { if (APR_BRIGADE_EMPTY(bb)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
"h2_task_output(%s): empty write", output->env->id); "h2_task_output(%s): empty write", output->task->id);
return APR_SUCCESS; return APR_SUCCESS;
} }
@@ -121,12 +121,12 @@ apr_status_t h2_task_output_write(h2_task_output *output,
if (status != APR_EOF) { if (status != APR_EOF) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
"h2_task_output(%s): opened and passed brigade", "h2_task_output(%s): opened and passed brigade",
output->env->id); output->task->id);
return status; return status;
} }
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
"h2_task_output(%s): write brigade", output->env->id); "h2_task_output(%s): write brigade", output->task->id);
return h2_mplx_out_write(output->env->mplx, output->env->stream_id, return h2_mplx_out_write(output->task->mplx, output->task->stream_id,
f, bb, output->env->io); f, bb, output->task->io);
} }

View File

@@ -23,7 +23,7 @@
*/ */
struct apr_thread_cond_t; struct apr_thread_cond_t;
struct h2_mplx; struct h2_mplx;
struct h2_task_env; struct h2_task;
struct h2_from_h1; struct h2_from_h1;
typedef enum { typedef enum {
@@ -35,12 +35,12 @@ typedef enum {
typedef struct h2_task_output h2_task_output; typedef struct h2_task_output h2_task_output;
struct h2_task_output { struct h2_task_output {
struct h2_task_env *env; struct h2_task *task;
h2_task_output_state_t state; h2_task_output_state_t state;
struct h2_from_h1 *from_h1; struct h2_from_h1 *from_h1;
}; };
h2_task_output *h2_task_output_create(struct h2_task_env *env, apr_pool_t *pool, h2_task_output *h2_task_output_create(struct h2_task *task, apr_pool_t *pool,
apr_bucket_alloc_t *bucket_alloc); apr_bucket_alloc_t *bucket_alloc);
void h2_task_output_destroy(h2_task_output *output); void h2_task_output_destroy(h2_task_output *output);

View File

@@ -156,7 +156,7 @@ apr_status_t h2_to_h1_add_headers(h2_to_h1 *to_h1, apr_table_t *headers)
return APR_SUCCESS; return APR_SUCCESS;
} }
apr_status_t h2_to_h1_end_headers(h2_to_h1 *to_h1, h2_task *task, int eos) apr_status_t h2_to_h1_end_headers(h2_to_h1 *to_h1, int eos)
{ {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, to_h1->m->c, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, to_h1->m->c,
"h2_to_h1(%ld-%d): end headers", "h2_to_h1(%ld-%d): end headers",
@@ -189,23 +189,8 @@ apr_status_t h2_to_h1_end_headers(h2_to_h1 *to_h1, h2_task *task, int eos)
apr_table_mergen(to_h1->headers, "Transfer-Encoding", "chunked"); apr_table_mergen(to_h1->headers, "Transfer-Encoding", "chunked");
} }
h2_task_set_request(task, to_h1->method,
to_h1->scheme,
to_h1->authority,
to_h1->path,
to_h1->headers, eos);
to_h1->eoh = 1; to_h1->eoh = 1;
if (eos) {
apr_status_t status = h2_to_h1_close(to_h1);
if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, to_h1->m->c,
APLOGNO(02960)
"h2_to_h1(%ld-%d): end headers, eos=%d",
to_h1->m->id, to_h1->stream_id, eos);
}
return status;
}
return APR_SUCCESS; return APR_SUCCESS;
} }

View File

@@ -68,8 +68,7 @@ apr_status_t h2_to_h1_add_headers(h2_to_h1 *to_h1, apr_table_t *headers);
/** End the request headers. /** End the request headers.
*/ */
apr_status_t h2_to_h1_end_headers(h2_to_h1 *to_h1, apr_status_t h2_to_h1_end_headers(h2_to_h1 *to_h1, int eos);
struct h2_task *task, int eos);
/* Add request body data. /* Add request body data.
*/ */