diff --git a/crates/http/src/layers/otel/service.rs b/crates/http/src/layers/otel/service.rs index ff7ab24c..84102c40 100644 --- a/crates/http/src/layers/otel/service.rs +++ b/crates/http/src/layers/otel/service.rs @@ -18,6 +18,7 @@ use futures_util::{future::BoxFuture, FutureExt as _}; use opentelemetry::{ metrics::{Counter, Histogram, UpDownCounter}, trace::{FutureExt as _, TraceContextExt}, + Context, KeyValue, }; use tower::Service; @@ -74,6 +75,29 @@ impl } } +struct InFlightGuard { + context: Context, + meter: UpDownCounter, + attributes: Vec, +} + +impl InFlightGuard { + fn increment(context: &Context, meter: &UpDownCounter, attributes: &[KeyValue]) -> Self { + meter.add(context, 1, attributes); + Self { + context: context.clone(), + meter: meter.clone(), + attributes: attributes.to_vec(), + } + } +} + +impl Drop for InFlightGuard { + fn drop(&mut self) { + self.meter.add(&self.context, -1, &self.attributes); + } +} + impl Service for Trace where @@ -94,7 +118,6 @@ where } fn call(&mut self, request: Req) -> Self::Future { - let inflight_requests = self.inflight_requests.clone(); let request_counter = self.request_counter.clone(); let request_histogram = self.request_histogram.clone(); let start_time = SystemTime::now(); @@ -106,7 +129,7 @@ where let cx = cx.with_span(span); let request = self.inject_context.inject_context(&cx, request); - inflight_requests.add(&cx, 1, &metrics_labels); + let guard = InFlightGuard::increment(&cx, &self.inflight_requests, &metrics_labels); let on_response = self.on_response.clone(); let on_error = self.on_error.clone(); @@ -116,7 +139,9 @@ where .call(request) .with_context(cx.clone()) .inspect(move |r| { - inflight_requests.add(&cx, -1, &metrics_labels); + // This ensures the guard gets moved to the future. In case the future panics, + // it will be dropped anyway, ensuring the in-flight counter stays accurate + let _guard = guard; let span = cx.span(); let extra_labels = match r {