You've already forked authentication-service
mirror of
https://github.com/matrix-org/matrix-authentication-service.git
synced 2025-07-31 09:24:31 +03:00
Move the in-flight counter decrement to a drop guard
This commit is contained in:
@ -18,6 +18,7 @@ use futures_util::{future::BoxFuture, FutureExt as _};
|
|||||||
use opentelemetry::{
|
use opentelemetry::{
|
||||||
metrics::{Counter, Histogram, UpDownCounter},
|
metrics::{Counter, Histogram, UpDownCounter},
|
||||||
trace::{FutureExt as _, TraceContextExt},
|
trace::{FutureExt as _, TraceContextExt},
|
||||||
|
Context, KeyValue,
|
||||||
};
|
};
|
||||||
use tower::Service;
|
use tower::Service;
|
||||||
|
|
||||||
@ -74,6 +75,29 @@ impl<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError, S>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct InFlightGuard {
|
||||||
|
context: Context,
|
||||||
|
meter: UpDownCounter<i64>,
|
||||||
|
attributes: Vec<KeyValue>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl InFlightGuard {
|
||||||
|
fn increment(context: &Context, meter: &UpDownCounter<i64>, attributes: &[KeyValue]) -> Self {
|
||||||
|
meter.add(context, 1, attributes);
|
||||||
|
Self {
|
||||||
|
context: context.clone(),
|
||||||
|
meter: meter.clone(),
|
||||||
|
attributes: attributes.to_vec(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for InFlightGuard {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.meter.add(&self.context, -1, &self.attributes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<Req, S, ExtractContextT, InjectContextT, MakeSpanBuilderT, OnResponseT, OnErrorT> Service<Req>
|
impl<Req, S, ExtractContextT, InjectContextT, MakeSpanBuilderT, OnResponseT, OnErrorT> Service<Req>
|
||||||
for Trace<ExtractContextT, InjectContextT, MakeSpanBuilderT, OnResponseT, OnErrorT, S>
|
for Trace<ExtractContextT, InjectContextT, MakeSpanBuilderT, OnResponseT, OnErrorT, S>
|
||||||
where
|
where
|
||||||
@ -94,7 +118,6 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn call(&mut self, request: Req) -> Self::Future {
|
fn call(&mut self, request: Req) -> Self::Future {
|
||||||
let inflight_requests = self.inflight_requests.clone();
|
|
||||||
let request_counter = self.request_counter.clone();
|
let request_counter = self.request_counter.clone();
|
||||||
let request_histogram = self.request_histogram.clone();
|
let request_histogram = self.request_histogram.clone();
|
||||||
let start_time = SystemTime::now();
|
let start_time = SystemTime::now();
|
||||||
@ -106,7 +129,7 @@ where
|
|||||||
let cx = cx.with_span(span);
|
let cx = cx.with_span(span);
|
||||||
let request = self.inject_context.inject_context(&cx, request);
|
let request = self.inject_context.inject_context(&cx, request);
|
||||||
|
|
||||||
inflight_requests.add(&cx, 1, &metrics_labels);
|
let guard = InFlightGuard::increment(&cx, &self.inflight_requests, &metrics_labels);
|
||||||
|
|
||||||
let on_response = self.on_response.clone();
|
let on_response = self.on_response.clone();
|
||||||
let on_error = self.on_error.clone();
|
let on_error = self.on_error.clone();
|
||||||
@ -116,7 +139,9 @@ where
|
|||||||
.call(request)
|
.call(request)
|
||||||
.with_context(cx.clone())
|
.with_context(cx.clone())
|
||||||
.inspect(move |r| {
|
.inspect(move |r| {
|
||||||
inflight_requests.add(&cx, -1, &metrics_labels);
|
// This ensures the guard gets moved to the future. In case the future panics,
|
||||||
|
// it will be dropped anyway, ensuring the in-flight counter stays accurate
|
||||||
|
let _guard = guard;
|
||||||
|
|
||||||
let span = cx.span();
|
let span = cx.span();
|
||||||
let extra_labels = match r {
|
let extra_labels = match r {
|
||||||
|
Reference in New Issue
Block a user