1
0
mirror of https://github.com/apache/httpd.git synced 2026-01-06 09:01:14 +03:00

mod_http2: some more cleanup on stream/task/session takedowns

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1741648 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Stefan Eissing
2016-04-29 15:21:21 +00:00
parent fb0e026602
commit 79804c035b
8 changed files with 258 additions and 217 deletions

View File

@@ -535,6 +535,9 @@ apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block)
status = APR_EAGAIN;
break;
}
if (beam->m_cond) {
apr_thread_cond_broadcast(beam->m_cond);
}
status = wait_cond(beam, bl.mutex);
}
leave_yellow(beam, &bl);
@@ -716,6 +719,9 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam,
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
transfer:
if (beam->aborted) {
if (!!APR_BRIGADE_EMPTY(beam->green)) {
apr_brigade_cleanup(beam->green);
}
status = APR_ECONNABORTED;
goto leave;
}

View File

@@ -131,7 +131,7 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
apr_pool_t *pool)
{
io->c = c;
io->output = apr_brigade_create(pool, c->bucket_alloc);
io->output = apr_brigade_create(c->pool, c->bucket_alloc);
io->buflen = 0;
io->is_tls = h2_h2_is_tls(c);
io->buffer_output = io->is_tls;

View File

@@ -189,6 +189,25 @@ static void check_tx_free(h2_mplx *m)
}
}
static int purge_stream(void *ctx, void *val)
{
h2_mplx *m = ctx;
h2_stream *stream = val;
h2_ihash_remove(m->spurge, stream->id);
h2_stream_destroy(stream);
return 0;
}
static void purge_streams(h2_mplx *m)
{
if (!h2_ihash_empty(m->spurge)) {
while(!h2_ihash_iter(m->spurge, purge_stream, m)) {
/* repeat until empty */
}
h2_ihash_clear(m->spurge);
}
}
static void h2_mplx_destroy(h2_mplx *m)
{
AP_DEBUG_ASSERT(m);
@@ -257,6 +276,8 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->q = h2_iq_create(m->pool, m->max_streams);
m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
m->ready_tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
@@ -294,10 +315,10 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m)
return max_stream_started;
}
static void input_consumed_signal(h2_mplx *m, h2_task *task)
static void input_consumed_signal(h2_mplx *m, h2_stream *stream)
{
if (task->input.beam && task->worker_started) {
h2_beam_send(task->input.beam, NULL, 0); /* trigger updates */
if (stream->input) {
h2_beam_send(stream->input, NULL, 0); /* trigger updates */
}
}
@@ -310,7 +331,7 @@ static int output_consumed_signal(h2_mplx *m, h2_task *task)
}
static void task_destroy(h2_mplx *m, h2_task *task, int events)
static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
{
conn_rec *slave = NULL;
int reuse_slave = 0;
@@ -323,18 +344,17 @@ static void task_destroy(h2_mplx *m, h2_task *task, int events)
"h2_task(%s): shutdown", task->id);
}
if (events) {
if (called_from_master) {
/* Process outstanding events before destruction */
input_consumed_signal(m, task);
h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
if (stream) {
input_consumed_signal(m, stream);
}
}
/* The pool is cleared/destroyed which also closes all
* allocated file handles. Give this count back to our
* file handle pool. */
if (task->input.beam) {
m->tx_handles_reserved +=
h2_beam_get_files_beamed(task->input.beam);
}
if (task->output.beam) {
m->tx_handles_reserved +=
h2_beam_get_files_beamed(task->output.beam);
@@ -366,49 +386,69 @@ static void task_destroy(h2_mplx *m, h2_task *task, int events)
static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error)
{
h2_task *task;
h2_task *task = h2_ihash_get(m->tasks, stream->id);
/* Situation: we are, on the master connection, done with processing
* the stream. Either we have handled it successfully, or the stream
* was reset by the client or the connection is gone and we are
* shutting down the whole session.
*
* We possibly have created a task for this stream to be processed
* on a slave connection. The processing might actually be ongoing
* right now or has already finished. A finished task waits for its
* stream to be done. This is the common case.
*
* If the stream had input (e.g. the request had a body), a task
* may have read, or is still reading buckets from the input beam.
* This means that the task is referencing memory from the stream's
* pool (or the master connection bucket alloc). Before we can free
* the stream pool, we need to make sure that those references are
* gone. This is what h2_beam_shutdown() on the input waits for.
*
* With the input handled, we can tear down that beam and care
* about the output beam. The stream might still have buffered some
* buckets read from the output, so we need to get rid of those. That
* is done by h2_stream_cleanup().
*
* Now it is save to destroy the task (if it exists and is finished).
*
* FIXME: we currently destroy the stream, even if the task is still
* ongoing. This is not ok, since task->request is coming from stream
* memory. We should either copy it on task creation or wait with the
* stream destruction until the task is done.
*/
h2_ihash_remove(m->streams, stream->id);
if (stream->input) {
apr_status_t status;
status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ);
if (status == APR_EAGAIN) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_stream(%ld-%d): wait on input shutdown",
m->id, stream->id);
status = h2_beam_shutdown(stream->input, APR_BLOCK_READ);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
"h2_stream(%ld-%d): input shutdown returned",
m->id, stream->id);
}
m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
}
h2_stream_cleanup(stream);
task = h2_ihash_get(m->tasks, stream->id);
if (task) {
/* Remove task from ready set, we will never submit it */
h2_ihash_remove(m->ready_tasks, stream->id);
task->input.beam = NULL;
if (task->worker_done) {
/* already finished or not even started yet */
h2_iq_remove(m->q, task->stream_id);
task_destroy(m, task, 0);
}
else {
if (!task->worker_done) {
/* task still running, cleanup once it is done */
task->orphaned = 1;
task->input.beam = NULL;
if (rst_error) {
h2_task_rst(task, rst_error);
}
/* FIXME: this should work, but does not
h2_ihash_add(m->shold, stream);
return;*/
}
else {
/* already finished */
h2_iq_remove(m->q, task->stream_id);
task_destroy(m, task, 0);
}
}
h2_stream_destroy(stream);
}
static int stream_done_iter(void *ctx, void *val)
{
h2_stream *stream = val;
stream_done((h2_mplx*)ctx, val, 0);
h2_stream_destroy(stream);
return 0;
}
@@ -416,6 +456,7 @@ static int task_print(void *ctx, void *val)
{
h2_mplx *m = ctx;
h2_task *task = val;
h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
if (task->request) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
"->03198: h2_stream(%s): %s %s %s -> %s %d"
@@ -424,7 +465,7 @@ static int task_print(void *ctx, void *val)
task->request->authority, task->request->path,
task->response? "http" : (task->rst_error? "reset" : "?"),
task->response? task->response->http_status : task->rst_error,
task->orphaned, task->worker_started,
(stream? 0 : 1), task->worker_started,
task->worker_done);
}
else if (task) {
@@ -493,6 +534,9 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
apr_thread_cond_broadcast(m->task_thawed);
}
}
AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks));
AP_DEBUG_ASSERT(h2_ihash_empty(m->shold));
purge_streams(m);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
"h2_mplx(%ld): release_join (%d tasks left) -> destroy",
@@ -516,24 +560,17 @@ void h2_mplx_abort(h2_mplx *m)
}
}
apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error)
apr_status_t h2_mplx_stream_done(h2_mplx *m, h2_stream *stream)
{
apr_status_t status = APR_SUCCESS;
int acquired;
/* This maybe called from inside callbacks that already hold the lock.
* E.g. when we are streaming out DATA and the EOF triggers the stream
* release.
*/
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_stream *stream = h2_ihash_get(m->streams, stream_id);
if (stream) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld-%d): marking stream as done.",
m->id, stream_id);
stream_done(m, stream, rst_error);
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld-%d): marking stream as done.",
m->id, stream->id);
stream_done(m, stream, stream->rst_error);
leave_mutex(m, acquired);
}
return status;
@@ -547,8 +584,7 @@ void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
static int update_window(void *ctx, void *val)
{
h2_mplx *m = ctx;
input_consumed_signal(m, val);
input_consumed_signal(ctx, val);
return 1;
}
@@ -562,7 +598,7 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m)
return APR_ECONNABORTED;
}
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_ihash_iter(m->tasks, update_window, m);
h2_ihash_iter(m->streams, update_window, m);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_session(%ld): windows updated", m->id);
@@ -580,7 +616,7 @@ static int task_iter_first(void *ctx, void *val)
return 0;
}
h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
h2_stream *h2_mplx_next_submit(h2_mplx *m)
{
apr_status_t status;
h2_stream *stream = NULL;
@@ -597,7 +633,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
h2_task *task = ctx.task;
h2_ihash_remove(m->ready_tasks, task->stream_id);
stream = h2_ihash_get(streams, task->stream_id);
stream = h2_ihash_get(m->streams, task->stream_id);
if (stream && task) {
task->submitted = 1;
if (task->rst_error) {
@@ -618,16 +654,14 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
"h2_mplx(%s): stream for response closed, "
"resetting io to close request processing",
task->id);
task->orphaned = 1;
h2_task_rst(task, H2_ERR_STREAM_CLOSED);
if (!task->worker_started || task->worker_done) {
task_destroy(m, task, 1);
}
else {
/* hang around until the h2_task is done, but
* shutdown input/output and send out any events asap. */
* shutdown output */
h2_task_shutdown(task, 0);
input_consumed_signal(m, task);
}
}
}
@@ -640,8 +674,9 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
{
apr_status_t status = APR_SUCCESS;
h2_task *task = h2_ihash_get(m->tasks, stream_id);
h2_stream *stream = h2_ihash_get(m->streams, stream_id);
if (!task || task->orphaned) {
if (!task || !stream) {
return APR_ECONNABORTED;
}
@@ -691,8 +726,9 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response)
static apr_status_t out_close(h2_mplx *m, h2_task *task)
{
apr_status_t status = APR_SUCCESS;
h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
if (!task || task->orphaned) {
if (!task || !stream) {
return APR_ECONNABORTED;
}
@@ -885,93 +921,103 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
{
if (task) {
if (task->frozen) {
/* this task was handed over to an engine for processing
* and the original worker has finished. That means the
* engine may start processing now. */
h2_task_thaw(task);
/* we do not want the task to block on writing response
* bodies into the mplx. */
/* FIXME: this implementation is incomplete. */
h2_task_set_io_blocking(task, 0);
apr_thread_cond_broadcast(m->task_thawed);
if (task->frozen) {
/* this task was handed over to an engine for processing
* and the original worker has finished. That means the
* engine may start processing now. */
h2_task_thaw(task);
/* we do not want the task to block on writing response
* bodies into the mplx. */
/* FIXME: this implementation is incomplete. */
h2_task_set_io_blocking(task, 0);
apr_thread_cond_broadcast(m->task_thawed);
return;
}
else {
h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): task(%s) done", m->id, task->id);
out_close(m, task);
if (ngn) {
apr_off_t bytes = 0;
if (task->output.beam) {
h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
bytes += h2_beam_get_buffered(task->output.beam);
}
if (bytes > 0) {
/* we need to report consumed and current buffered output
* to the engine. The request will be streamed out or cancelled,
* no more data is coming from it and the engine should update
* its calculations before we destroy this information. */
h2_req_engine_out_consumed(ngn, task->c, bytes);
}
}
if (task->engine) {
if (!h2_req_engine_is_shutdown(task->engine)) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
"h2_mplx(%ld): task(%s) has not-shutdown "
"engine(%s)", m->id, task->id,
h2_req_engine_get_id(task->engine));
}
h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
}
if (!m->aborted && stream && m->redo_tasks
&& h2_ihash_get(m->redo_tasks, task->stream_id)) {
/* reset and schedule again */
h2_task_redo(task);
h2_ihash_remove(m->redo_tasks, task->stream_id);
h2_iq_add(m->q, task->stream_id, NULL, NULL);
return;
}
task->worker_done = 1;
task->done_at = apr_time_now();
if (task->output.beam) {
h2_beam_on_consumed(task->output.beam, NULL, NULL);
h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%s): request done, %f ms"
" elapsed", task->id,
(task->done_at - task->started_at) / 1000.0);
if (task->started_at > m->last_idle_block) {
/* this task finished without causing an 'idle block', e.g.
* a block by flow control.
*/
if (task->done_at- m->last_limit_change >= m->limit_change_interval
&& m->workers_limit < m->workers_max) {
/* Well behaving stream, allow it more workers */
m->workers_limit = H2MIN(m->workers_limit * 2,
m->workers_max);
m->last_limit_change = task->done_at;
m->need_registration = 1;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): increase worker limit to %d",
m->id, m->workers_limit);
}
}
if (stream) {
/* hang around until the stream deregisters */
}
else {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): task(%s) done", m->id, task->id);
out_close(m, task);
if (ngn) {
apr_off_t bytes = 0;
if (task->output.beam) {
h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
bytes += h2_beam_get_buffered(task->output.beam);
}
if (bytes > 0) {
/* we need to report consumed and current buffered output
* to the engine. The request will be streamed out or cancelled,
* no more data is coming from it and the engine should update
* its calculations before we destroy this information. */
h2_req_engine_out_consumed(ngn, task->c, bytes);
}
stream = h2_ihash_get(m->shold, task->stream_id);
task_destroy(m, task, 0);
if (stream) {
stream->response = NULL; /* ref from task memory */
/* We cannot destroy the stream here since this is
* called from a worker thread and freeing memory pools
* is only safe in the only thread using it (and its
* parent pool / allocator) */
h2_ihash_remove(m->shold, stream->id);
h2_ihash_add(m->spurge, stream);
}
if (task->engine) {
if (!h2_req_engine_is_shutdown(task->engine)) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
"h2_mplx(%ld): task(%s) has not-shutdown "
"engine(%s)", m->id, task->id,
h2_req_engine_get_id(task->engine));
}
h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
}
if (!m->aborted && !task->orphaned && m->redo_tasks
&& h2_ihash_get(m->redo_tasks, task->stream_id)) {
/* reset and schedule again */
h2_task_redo(task);
h2_ihash_remove(m->redo_tasks, task->stream_id);
h2_iq_add(m->q, task->stream_id, NULL, NULL);
return;
}
task->worker_done = 1;
task->done_at = apr_time_now();
if (task->output.beam) {
h2_beam_on_consumed(task->output.beam, NULL, NULL);
h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%s): request done, %f ms"
" elapsed", task->id,
(task->done_at - task->started_at) / 1000.0);
if (task->started_at > m->last_idle_block) {
/* this task finished without causing an 'idle block', e.g.
* a block by flow control.
*/
if (task->done_at- m->last_limit_change >= m->limit_change_interval
&& m->workers_limit < m->workers_max) {
/* Well behaving stream, allow it more workers */
m->workers_limit = H2MIN(m->workers_limit * 2,
m->workers_max);
m->last_limit_change = task->done_at;
m->need_registration = 1;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): increase worker limit to %d",
m->id, m->workers_limit);
}
}
if (task->orphaned) {
task_destroy(m, task, 0);
if (m->join_wait) {
apr_thread_cond_signal(m->join_wait);
}
}
else {
/* hang around until the stream deregisters */
if (m->join_wait) {
apr_thread_cond_signal(m->join_wait);
}
}
}
@@ -1177,11 +1223,13 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
task->r = r;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
if (task->orphaned) {
status = APR_ECONNABORTED;
h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
if (stream) {
status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
}
else {
status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
status = APR_ECONNABORTED;
}
leave_mutex(m, acquired);
}

View File

@@ -73,6 +73,8 @@ struct h2_mplx {
unsigned int need_registration : 1;
struct h2_ihash_t *streams; /* all streams currently processing */
struct h2_ihash_t *shold; /* all streams done with task ongoing */
struct h2_ihash_t *spurge; /* all streams done, ready for destroy */
struct h2_iqueue *q; /* all stream ids that need to be started */
struct h2_ihash_t *tasks; /* all tasks started and not destroyed */
@@ -167,7 +169,7 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m);
* @param rst_error if != 0, the stream was reset with the error given
*
*/
apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error);
apr_status_t h2_mplx_stream_done(h2_mplx *m, struct h2_stream *stream);
/**
* Waits on output data from any stream in this session to become available.
@@ -235,8 +237,7 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m);
* @param m the mplxer to get a response from
* @param bb the brigade to place any existing repsonse body data into
*/
struct h2_stream *h2_mplx_next_submit(h2_mplx *m,
struct h2_ihash_t *streams);
struct h2_stream *h2_mplx_next_submit(h2_mplx *m);
/**
* Opens the output for the given stream with the specified response.

View File

@@ -128,19 +128,16 @@ h2_stream *h2_session_open_stream(h2_session *session, int stream_id,
h2_stream * stream;
apr_pool_t *stream_pool;
if (session->spare) {
stream_pool = session->spare;
session->spare = NULL;
}
else {
apr_pool_create(&stream_pool, session->pool);
apr_pool_tag(stream_pool, "h2_stream");
}
apr_pool_create(&stream_pool, session->pool);
apr_pool_tag(stream_pool, "h2_stream");
stream = h2_stream_open(stream_id, stream_pool, session,
initiated_on, req);
++session->open_streams;
++session->unanswered_streams;
nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
h2_ihash_add(session->streams, stream);
if (H2_STREAM_CLIENT_INITIATED(stream_id)) {
if (stream_id > session->remote.emitted_max) {
++session->remote.emitted_count;
@@ -262,6 +259,11 @@ static int on_invalid_frame_recv_cb(nghttp2_session *ngh2,
return 0;
}
static h2_stream *get_stream(h2_session *session, int stream_id)
{
return nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
}
static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
int32_t stream_id,
const uint8_t *data, size_t len, void *userp)
@@ -277,7 +279,7 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
return 0;
}
stream = h2_session_get_stream(session, stream_id);
stream = get_stream(session, stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03064)
"h2_stream(%ld-%d): on_data_chunk for unknown stream",
@@ -342,7 +344,7 @@ static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
h2_stream *stream;
(void)ngh2;
stream = h2_session_get_stream(session, stream_id);
stream = get_stream(session, stream_id);
if (stream) {
stream_release(session, stream, error_code);
}
@@ -358,7 +360,7 @@ static int on_begin_headers_cb(nghttp2_session *ngh2,
/* We may see HEADERs at the start of a stream or after all DATA
* streams to carry trailers. */
(void)ngh2;
s = h2_session_get_stream(session, frame->hd.stream_id);
s = get_stream(session, frame->hd.stream_id);
if (s) {
/* nop */
}
@@ -385,7 +387,7 @@ static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame,
return 0;
}
stream = h2_session_get_stream(session, frame->hd.stream_id);
stream = get_stream(session, frame->hd.stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
APLOGNO(02920)
@@ -432,7 +434,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
/* This can be HEADERS for a new stream, defining the request,
* or HEADER may come after DATA at the end of a stream as in
* trailers */
stream = h2_session_get_stream(session, frame->hd.stream_id);
stream = get_stream(session, frame->hd.stream_id);
if (stream) {
int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
@@ -456,7 +458,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
}
break;
case NGHTTP2_DATA:
stream = h2_session_get_stream(session, frame->hd.stream_id);
stream = get_stream(session, frame->hd.stream_id);
if (stream) {
int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
@@ -493,7 +495,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
"h2_session(%ld-%d): RST_STREAM by client, errror=%d",
session->id, (int)frame->hd.stream_id,
(int)frame->rst_stream.error_code);
stream = h2_session_get_stream(session, frame->hd.stream_id);
stream = get_stream(session, frame->hd.stream_id);
if (stream && stream->request && stream->request->initiated_on) {
++session->pushes_reset;
}
@@ -567,7 +569,7 @@ static int on_send_data_cb(nghttp2_session *ngh2,
}
padlen = (unsigned char)frame->data.padlen;
stream = h2_session_get_stream(session, stream_id);
stream = get_stream(session, stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_NOTFOUND, session->c,
APLOGNO(02924)
@@ -699,10 +701,6 @@ static void h2_session_cleanup(h2_session *session)
nghttp2_session_del(session->ngh2);
session->ngh2 = NULL;
}
if (session->spare) {
apr_pool_destroy(session->spare);
session->spare = NULL;
}
}
static void h2_session_destroy(h2_session *session)
@@ -710,8 +708,12 @@ static void h2_session_destroy(h2_session *session)
AP_DEBUG_ASSERT(session);
h2_session_cleanup(session);
AP_DEBUG_ASSERT(session->open_streams == h2_ihash_count(session->streams));
h2_ihash_clear(session->streams);
session->open_streams = 0;
ap_remove_input_filter_byhandle((session->r? session->r->input_filters :
session->c->input_filters), "H2_IN");
if (APLOGctrace1(session->c)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_session(%ld): destroy", session->id);
@@ -1138,10 +1140,8 @@ static int resume_on_data(void *ctx, void *val)
static int h2_session_resume_streams_with_data(h2_session *session)
{
AP_DEBUG_ASSERT(session);
if (!h2_ihash_empty(session->streams)
&& session->mplx && !session->mplx->aborted) {
if (session->open_streams && !session->mplx->aborted) {
resume_ctx ctx;
ctx.session = session;
ctx.resume_count = 0;
@@ -1153,11 +1153,6 @@ static int h2_session_resume_streams_with_data(h2_session *session)
return 0;
}
h2_stream *h2_session_get_stream(h2_session *session, int stream_id)
{
return h2_ihash_get(session->streams, stream_id);
}
static ssize_t stream_data_cb(nghttp2_session *ng2s,
int32_t stream_id,
uint8_t *buf,
@@ -1183,7 +1178,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
(void)ng2s;
(void)buf;
(void)source;
stream = h2_session_get_stream(session, stream_id);
stream = get_stream(session, stream_id);
if (!stream) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c,
APLOGNO(02937)
@@ -1334,7 +1329,7 @@ static apr_status_t submit_response(h2_session *session, h2_stream *stream)
stream->id, err);
}
stream->submitted = 1;
--session->unanswered_streams;
if (stream->request && stream->request->initiated_on) {
++session->pushes_submitted;
}
@@ -1384,7 +1379,6 @@ struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
"h2_stream(%ld-%d): scheduling push stream",
session->id, stream->id);
h2_stream_cleanup(stream);
stream = NULL;
}
++session->unsent_promises;
@@ -1509,29 +1503,14 @@ apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream,
apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
{
apr_pool_t *pool = h2_stream_detach_pool(stream);
int stream_id = stream->id;
int rst_error = stream->rst_error;
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
"h2_stream(%ld-%d): cleanup by EOS bucket destroy",
session->id, stream_id);
if (session->streams) {
h2_ihash_remove(session->streams, stream_id);
}
session->id, stream->id);
h2_ihash_remove(session->streams, stream->id);
--session->open_streams;
--session->unanswered_streams;
h2_mplx_stream_done(session->mplx, stream);
h2_stream_cleanup(stream);
h2_mplx_stream_done(session->mplx, stream_id, rst_error);
h2_stream_destroy(stream);
if (pool) {
apr_pool_clear(pool);
if (session->spare) {
apr_pool_destroy(session->spare);
}
session->spare = pool;
}
return APR_SUCCESS;
}
@@ -1708,7 +1687,7 @@ static apr_status_t h2_session_submit(h2_session *session)
if (has_unsubmitted_streams(session)) {
/* If we have responses ready, submit them now. */
while ((stream = h2_mplx_next_submit(session->mplx, session->streams))) {
while ((stream = h2_mplx_next_submit(session->mplx))) {
status = submit_response(session, stream);
++session->unsent_submits;
@@ -1770,7 +1749,7 @@ static void update_child_status(h2_session *session, int status, const char *msg
apr_snprintf(session->status, sizeof(session->status),
"%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)",
msg? msg : "-",
(int)h2_ihash_count(session->streams),
(int)session->open_streams,
(int)session->remote.emitted_count,
(int)session->responses_submitted,
(int)session->pushes_submitted,
@@ -1788,7 +1767,7 @@ static void transit(h2_session *session, const char *action, h2_session_state ns
session->state = nstate;
switch (session->state) {
case H2_SESSION_ST_IDLE:
update_child_status(session, (h2_ihash_empty(session->streams)?
update_child_status(session, (session->open_streams == 0?
SERVER_BUSY_KEEPALIVE
: SERVER_BUSY_READ), "idle");
break;
@@ -1920,7 +1899,7 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg)
if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
}
if (h2_ihash_empty(session->streams)) {
if (!session->open_streams) {
if (!is_accepting_streams(session)) {
/* We are no longer accepting new streams and have
* finished processing existing ones. Time to leave. */
@@ -2125,9 +2104,8 @@ apr_status_t h2_session_process(h2_session *session, int async)
break;
case H2_SESSION_ST_IDLE:
/* make certain, the client receives everything before we idle */
if (!session->keep_sync_until
&& async && h2_ihash_empty(session->streams)
/* make certain, we send everything before we idle */
if (!session->keep_sync_until && async && !session->open_streams
&& !session->r && session->remote.emitted_count) {
ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
"h2_session(%ld): async idle, nonblock read", session->id);
@@ -2225,8 +2203,8 @@ apr_status_t h2_session_process(h2_session *session, int async)
}
}
if (!h2_ihash_empty(session->streams)) {
/* resume any streams for which data is available again */
if (session->open_streams) {
/* resume any streams with output data */
h2_session_resume_streams_with_data(session);
/* Submit any responses/push_promises that are ready */
status = h2_session_submit(session);

View File

@@ -100,6 +100,8 @@ typedef struct h2_session {
struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */
int open_streams; /* number of streams open */
int unanswered_streams; /* number of streams waiting for response */
int unsent_submits; /* number of submitted, but not yet written responses. */
int unsent_promises; /* number of submitted, but not yet written push promised */
@@ -122,8 +124,6 @@ typedef struct h2_session {
apr_bucket_brigade *bbtmp; /* brigade for keeping temporary data */
struct apr_thread_cond_t *iowait; /* our cond when trywaiting for data */
apr_pool_t *spare; /* spare stream pool */
char status[64]; /* status message for scoreboard */
int last_status_code; /* the one already reported */
const char *last_status_msg; /* the one already reported */
@@ -190,9 +190,6 @@ void h2_session_close(h2_session *session);
apr_status_t h2_session_handle_response(h2_session *session,
struct h2_stream *stream);
/* Get the h2_stream for the given stream idenrtifier. */
struct h2_stream *h2_session_get_stream(h2_session *session, int stream_id);
/**
* Create and register a new stream under the given id.
*

View File

@@ -182,19 +182,31 @@ h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session,
void h2_stream_cleanup(h2_stream *stream)
{
AP_DEBUG_ASSERT(stream);
if (stream->input) {
h2_beam_destroy(stream->input);
stream->input = NULL;
}
if (stream->buffer) {
apr_brigade_cleanup(stream->buffer);
}
if (stream->input) {
apr_status_t status;
status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ);
if (status == APR_EAGAIN) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
"h2_stream(%ld-%d): wait on input shutdown",
stream->session->id, stream->id);
status = h2_beam_shutdown(stream->input, APR_BLOCK_READ);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
"h2_stream(%ld-%d): input shutdown returned",
stream->session->id, stream->id);
}
}
}
void h2_stream_destroy(h2_stream *stream)
{
AP_DEBUG_ASSERT(stream);
h2_stream_cleanup(stream);
if (stream->input) {
h2_beam_destroy(stream->input);
stream->input = NULL;
}
if (stream->pool) {
apr_pool_destroy(stream->pool);
}

View File

@@ -83,7 +83,6 @@ struct h2_task {
unsigned int frozen : 1;
unsigned int blocking : 1;
unsigned int detached : 1;
unsigned int orphaned : 1; /* h2_stream is gone for this task */
unsigned int submitted : 1; /* response has been submitted to client */
unsigned int worker_started : 1; /* h2_worker started processing for this io */
unsigned int worker_done : 1; /* h2_worker finished for this io */