From 56fdb64a846154fd8d2d6acf60378bc69b529c7a Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Mon, 19 Sep 2022 12:14:23 +0200 Subject: [PATCH] HTTP metrics & other stuff --- crates/cli/src/telemetry.rs | 9 ++- crates/http/src/layers/otel/layer.rs | 78 +++++++++++++++--- .../http/src/layers/otel/make_span_builder.rs | 66 +++++++++------ crates/http/src/layers/otel/mod.rs | 10 +-- crates/http/src/layers/otel/on_error.rs | 8 +- crates/http/src/layers/otel/on_response.rs | 15 ++-- crates/http/src/layers/otel/service.rs | 81 ++++++++++++++++--- 7 files changed, 206 insertions(+), 61 deletions(-) diff --git a/crates/cli/src/telemetry.rs b/crates/cli/src/telemetry.rs index aef14793..384e1507 100644 --- a/crates/cli/src/telemetry.rs +++ b/crates/cli/src/telemetry.rs @@ -292,14 +292,17 @@ fn stdout_meter() -> anyhow::Result { fn prometheus_meter(address: &str) -> anyhow::Result { let controller = sdk::metrics::controllers::basic( sdk::metrics::processors::factory( - sdk::metrics::selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]), + sdk::metrics::selectors::simple::histogram([ + 0.01, 0.02, 0.05, 0.10, 0.20, 0.50, 1.0, 2.0, 5.0, + ]), sdk::export::metrics::aggregation::cumulative_temporality_selector(), ) .with_memory(true), ) + .with_resource(resource()) .build(); - let exporter = opentelemetry_prometheus::exporter(controller.clone()).init(); + let exporter = opentelemetry_prometheus::exporter(controller.clone()).try_init()?; let make_svc = make_service_fn(move |_conn| { let exporter = exporter.clone(); @@ -322,7 +325,7 @@ fn prometheus_meter(address: &str) -> anyhow::Result { } _ => Response::builder() .status(404) - .body(Body::from("404 not found")) + .body(Body::from("Metrics are exposed on /metrics")) .unwrap(), }; diff --git a/crates/http/src/layers/otel/layer.rs b/crates/http/src/layers/otel/layer.rs index 0bf65cc7..de245954 100644 --- a/crates/http/src/layers/otel/layer.rs +++ b/crates/http/src/layers/otel/layer.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use opentelemetry::metrics::{Counter, Histogram, UpDownCounter}; use tower::Layer; use super::{ @@ -22,12 +23,43 @@ pub struct TraceLayer< make_span_builder: MakeSpanBuilder, on_response: OnResponse, on_error: OnError, + + inflight_requests: UpDownCounter, + request_counter: Counter, + request_histogram: Histogram, } impl Default for TraceLayer { fn default() -> Self { + Self::with_namespace("http") + } +} + +impl TraceLayer { + #[must_use] + pub fn with_namespace(namespace: &'static str) -> Self { let tracer = Arc::new(opentelemetry::global::tracer("mas-http")); - Self::new(tracer) + let meter = opentelemetry::global::meter("mas-http"); + + let inflight_requests = meter + .i64_up_down_counter(format!("{namespace}.inflight_requests")) + .with_description("Number of in-flight requests") + .init(); + let request_counter = meter + .u64_counter(format!("{namespace}.requests_total")) + .with_description("Total number of requests made.") + .init(); + let request_histogram = meter + .f64_histogram(format!("{namespace}.request_duration_seconds")) + .with_description("The request latencies in seconds.") + .init(); + + Self::new( + tracer, + inflight_requests, + request_counter, + request_histogram, + ) } } @@ -35,7 +67,12 @@ impl TraceLayer { #[must_use] - pub fn new(tracer: Arc) -> Self + pub fn new( + tracer: Arc, + inflight_requests: UpDownCounter, + request_counter: Counter, + request_histogram: Histogram, + ) -> Self where ExtractContext: Default, InjectContext: Default, @@ -50,6 +87,9 @@ impl make_span_builder: MakeSpanBuilder::default(), on_response: OnResponse::default(), on_error: OnError::default(), + inflight_requests, + request_counter, + request_histogram, } } @@ -65,6 +105,9 @@ impl make_span_builder: self.make_span_builder, on_response: self.on_response, on_error: self.on_error, + inflight_requests: self.inflight_requests, + request_counter: self.request_counter, + request_histogram: self.request_histogram, } } @@ -80,6 +123,9 @@ impl make_span_builder: self.make_span_builder, on_response: self.on_response, on_error: self.on_error, + inflight_requests: self.inflight_requests, + request_counter: self.request_counter, + request_histogram: self.request_histogram, } } @@ -95,6 +141,9 @@ impl make_span_builder, on_response: self.on_response, on_error: self.on_error, + inflight_requests: self.inflight_requests, + request_counter: self.request_counter, + request_histogram: self.request_histogram, } } @@ -110,6 +159,9 @@ impl make_span_builder: self.make_span_builder, on_response, on_error: self.on_error, + inflight_requests: self.inflight_requests, + request_counter: self.request_counter, + request_histogram: self.request_histogram, } } @@ -125,6 +177,9 @@ impl make_span_builder: self.make_span_builder, on_response: self.on_response, on_error, + inflight_requests: self.inflight_requests, + request_counter: self.request_counter, + request_histogram: self.request_histogram, } } } @@ -141,14 +196,17 @@ where type Service = Trace; fn layer(&self, inner: S) -> Self::Service { - Trace { + Trace::new( inner, - tracer: self.tracer.clone(), - extract_context: self.extract_context.clone(), - inject_context: self.inject_context.clone(), - make_span_builder: self.make_span_builder.clone(), - on_response: self.on_response.clone(), - on_error: self.on_error.clone(), - } + self.tracer.clone(), + self.extract_context.clone(), + self.inject_context.clone(), + self.make_span_builder.clone(), + self.on_response.clone(), + self.on_error.clone(), + self.inflight_requests.clone(), + self.request_counter.clone(), + self.request_histogram.clone(), + ) } } diff --git a/crates/http/src/layers/otel/make_span_builder.rs b/crates/http/src/layers/otel/make_span_builder.rs index 12ae324c..94177fb4 100644 --- a/crates/http/src/layers/otel/make_span_builder.rs +++ b/crates/http/src/layers/otel/make_span_builder.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::borrow::Cow; +use std::{borrow::Cow, vec::Vec}; #[cfg(feature = "axum")] use axum::extract::{ConnectInfo, MatchedPath}; @@ -20,11 +20,14 @@ use headers::{ContentLength, HeaderMapExt, Host, UserAgent}; use http::{Method, Request, Version}; #[cfg(feature = "client")] use hyper::client::connect::dns::Name; -use opentelemetry::trace::{SpanBuilder, SpanKind}; +use opentelemetry::{ + trace::{SpanBuilder, SpanKind}, + KeyValue, +}; use opentelemetry_semantic_conventions::trace as SC; pub trait MakeSpanBuilder { - fn make_span_builder(&self, request: &R) -> SpanBuilder; + fn make_span_builder(&self, request: &R) -> (SpanBuilder, Vec); } #[derive(Debug, Clone, Copy)] @@ -48,8 +51,8 @@ impl Default for DefaultMakeSpanBuilder { } impl MakeSpanBuilder for DefaultMakeSpanBuilder { - fn make_span_builder(&self, _request: &R) -> SpanBuilder { - SpanBuilder::from_name(self.operation) + fn make_span_builder(&self, _request: &R) -> (SpanBuilder, Vec) { + (SpanBuilder::from_name(self.operation), Vec::new()) } } @@ -114,9 +117,10 @@ impl SpanFromHttpRequest { } impl MakeSpanBuilder> for SpanFromHttpRequest { - fn make_span_builder(&self, request: &Request) -> SpanBuilder { + fn make_span_builder(&self, request: &Request) -> (SpanBuilder, Vec) { + let method = SC::HTTP_METHOD.string(http_method_str(request.method())); let mut attributes = vec![ - SC::HTTP_METHOD.string(http_method_str(request.method())), + method.clone(), SC::HTTP_FLAVOR.string(http_flavor(request.version())), SC::HTTP_TARGET.string(request.uri().to_string()), ]; @@ -137,9 +141,12 @@ impl MakeSpanBuilder> for SpanFromHttpRequest { } } - SpanBuilder::from_name(self.operation) + let span_builder = SpanBuilder::from_name(self.operation) .with_kind(self.span_kind.clone()) - .with_attributes(attributes) + .with_attributes(attributes); + + let metrics_labels = vec![method]; + (span_builder, metrics_labels) } } @@ -149,9 +156,13 @@ pub struct SpanFromAxumRequest; #[cfg(feature = "axum")] impl MakeSpanBuilder> for SpanFromAxumRequest { - fn make_span_builder(&self, request: &Request) -> SpanBuilder { + fn make_span_builder(&self, request: &Request) -> (SpanBuilder, Vec) { + let method = SC::HTTP_METHOD.string(http_method_str(request.method())); + + let mut metrics_labels = vec![method.clone()]; + let mut attributes = vec![ - SC::HTTP_METHOD.string(http_method_str(request.method())), + method, SC::HTTP_FLAVOR.string(http_flavor(request.version())), SC::HTTP_TARGET.string(request.uri().to_string()), ]; @@ -181,17 +192,23 @@ impl MakeSpanBuilder> for SpanFromAxumRequest { attributes.push(SC::NET_PEER_PORT.i64(addr.port().into())); } - let name = if let Some(path) = request.extensions().get::() { - let path = path.as_str().to_owned(); - attributes.push(SC::HTTP_ROUTE.string(path.clone())); - path + let (name, route) = if let Some(path) = request.extensions().get::() { + let path = path.as_str(); + (path, path) } else { - request.uri().path().to_owned() + (request.uri().path(), "FALLBACK") }; - SpanBuilder::from_name(name) - .with_kind(SpanKind::Server) - .with_attributes(attributes) + let route = SC::HTTP_ROUTE.string(route.to_owned()); + attributes.push(route.clone()); + metrics_labels.push(route); + + ( + SpanBuilder::from_name(name.to_owned()) + .with_kind(SpanKind::Server) + .with_attributes(attributes), + metrics_labels, + ) } } @@ -201,11 +218,14 @@ pub struct SpanFromDnsRequest; #[cfg(feature = "client")] impl MakeSpanBuilder for SpanFromDnsRequest { - fn make_span_builder(&self, request: &Name) -> SpanBuilder { + fn make_span_builder(&self, request: &Name) -> (SpanBuilder, Vec) { let attributes = vec![SC::NET_HOST_NAME.string(request.as_str().to_owned())]; - SpanBuilder::from_name("resolve") - .with_kind(SpanKind::Client) - .with_attributes(attributes) + ( + SpanBuilder::from_name("resolve") + .with_kind(SpanKind::Client) + .with_attributes(attributes.clone()), + attributes, + ) } } diff --git a/crates/http/src/layers/otel/mod.rs b/crates/http/src/layers/otel/mod.rs index 56c8b66f..45dc7a16 100644 --- a/crates/http/src/layers/otel/mod.rs +++ b/crates/http/src/layers/otel/mod.rs @@ -95,7 +95,7 @@ pub type TraceDns = Trace< impl TraceHttpServerLayer { #[must_use] pub fn http_server() -> Self { - TraceLayer::default() + TraceLayer::with_namespace("http_server") .make_span_builder(SpanFromHttpRequest::server()) .on_response(OnHttpResponse) .extract_context(ExtractFromHttpRequest) @@ -106,7 +106,7 @@ impl TraceHttpServerLayer { impl TraceAxumServerLayer { #[must_use] pub fn axum() -> Self { - TraceLayer::default() + TraceLayer::with_namespace("http_server") .make_span_builder(SpanFromAxumRequest) .on_response(OnHttpResponse) .extract_context(ExtractFromHttpRequest) @@ -116,7 +116,7 @@ impl TraceAxumServerLayer { impl TraceHttpClientLayer { #[must_use] pub fn http_client(operation: &'static str) -> Self { - TraceLayer::default() + TraceLayer::with_namespace("http_client") .make_span_builder(SpanFromHttpRequest::client(operation)) .on_response(OnHttpResponse) .inject_context(InjectInHttpRequest) @@ -124,7 +124,7 @@ impl TraceHttpClientLayer { #[must_use] pub fn inner_http_client() -> Self { - TraceLayer::default() + TraceLayer::with_namespace("inner_http_client") .make_span_builder(SpanFromHttpRequest::inner_client()) .on_response(OnHttpResponse) .inject_context(InjectInHttpRequest) @@ -135,7 +135,7 @@ impl TraceHttpClientLayer { impl TraceDnsLayer { #[must_use] pub fn dns() -> Self { - TraceLayer::default().make_span_builder(SpanFromDnsRequest) + TraceLayer::with_namespace("dns").make_span_builder(SpanFromDnsRequest) } } diff --git a/crates/http/src/layers/otel/on_error.rs b/crates/http/src/layers/otel/on_error.rs index 9c3bcec2..12930545 100644 --- a/crates/http/src/layers/otel/on_error.rs +++ b/crates/http/src/layers/otel/on_error.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use opentelemetry::trace::SpanRef; +use opentelemetry::{trace::SpanRef, KeyValue}; use opentelemetry_semantic_conventions::trace::EXCEPTION_MESSAGE; pub trait OnError { - fn on_error(&self, span: &SpanRef<'_>, err: &E); + fn on_error(&self, span: &SpanRef<'_>, err: &E) -> Vec; } #[derive(Debug, Clone, Copy, Default)] @@ -26,8 +26,10 @@ impl OnError for DefaultOnError where E: std::fmt::Display, { - fn on_error(&self, span: &SpanRef<'_>, err: &E) { + fn on_error(&self, span: &SpanRef<'_>, err: &E) -> Vec { let attributes = vec![EXCEPTION_MESSAGE.string(err.to_string())]; span.add_event("exception".to_owned(), attributes); + + Vec::new() } } diff --git a/crates/http/src/layers/otel/on_response.rs b/crates/http/src/layers/otel/on_response.rs index 7fd5c5f8..0c104e0f 100644 --- a/crates/http/src/layers/otel/on_response.rs +++ b/crates/http/src/layers/otel/on_response.rs @@ -16,26 +16,29 @@ use headers::{ContentLength, HeaderMapExt}; use http::Response; #[cfg(feature = "client")] use hyper::client::connect::HttpInfo; -use opentelemetry::trace::SpanRef; +use opentelemetry::{trace::SpanRef, KeyValue}; use opentelemetry_semantic_conventions::trace as SC; pub trait OnResponse { - fn on_response(&self, span: &SpanRef<'_>, response: &R); + fn on_response(&self, span: &SpanRef<'_>, response: &R) -> Vec; } #[derive(Debug, Clone, Copy, Default)] pub struct DefaultOnResponse; impl OnResponse for DefaultOnResponse { - fn on_response(&self, _span: &SpanRef<'_>, _response: &R) {} + fn on_response(&self, _span: &SpanRef<'_>, _response: &R) -> Vec { + Vec::new() + } } #[derive(Debug, Clone, Copy, Default)] pub struct OnHttpResponse; impl OnResponse> for OnHttpResponse { - fn on_response(&self, span: &SpanRef<'_>, response: &Response) { - span.set_attribute(SC::HTTP_STATUS_CODE.i64(i64::from(response.status().as_u16()))); + fn on_response(&self, span: &SpanRef<'_>, response: &Response) -> Vec { + let status_code = i64::from(response.status().as_u16()); + span.set_attribute(SC::HTTP_STATUS_CODE.i64(status_code)); if let Some(ContentLength(content_length)) = response.headers().typed_get() { if let Ok(content_length) = content_length.try_into() { @@ -52,5 +55,7 @@ impl OnResponse> for OnHttpResponse { span.set_attribute(SC::NET_HOST_IP.string(info.local_addr().ip().to_string())); span.set_attribute(SC::NET_HOST_PORT.i64(info.local_addr().port().into())); } + + vec![KeyValue::new("status_code", status_code)] } } diff --git a/crates/http/src/layers/otel/service.rs b/crates/http/src/layers/otel/service.rs index 9e4e57d6..ff7ab24c 100644 --- a/crates/http/src/layers/otel/service.rs +++ b/crates/http/src/layers/otel/service.rs @@ -12,10 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{sync::Arc, task::Poll}; +use std::{sync::Arc, task::Poll, time::SystemTime}; use futures_util::{future::BoxFuture, FutureExt as _}; -use opentelemetry::trace::{FutureExt as _, TraceContextExt}; +use opentelemetry::{ + metrics::{Counter, Histogram, UpDownCounter}, + trace::{FutureExt as _, TraceContextExt}, +}; use tower::Service; use super::{ @@ -25,13 +28,50 @@ use super::{ #[derive(Debug, Clone)] pub struct Trace { - pub(crate) inner: S, - pub(crate) tracer: Arc, - pub(crate) extract_context: ExtractContext, - pub(crate) inject_context: InjectContext, - pub(crate) make_span_builder: MakeSpanBuilder, - pub(crate) on_response: OnResponse, - pub(crate) on_error: OnError, + inner: S, + tracer: Arc, + extract_context: ExtractContext, + inject_context: InjectContext, + make_span_builder: MakeSpanBuilder, + on_response: OnResponse, + on_error: OnError, + + inflight_requests: UpDownCounter, + request_counter: Counter, + request_histogram: Histogram, +} + +impl + Trace +{ + #[allow(clippy::too_many_arguments)] + pub fn new( + service: S, + tracer: Arc, + extract_context: ExtractContext, + inject_context: InjectContext, + make_span_builder: MakeSpanBuilder, + on_response: OnResponse, + on_error: OnError, + inflight_requests: UpDownCounter, + request_counter: Counter, + request_histogram: Histogram, + ) -> Self { + Self { + inner: service, + tracer, + + extract_context, + inject_context, + make_span_builder, + on_response, + on_error, + + inflight_requests, + request_counter, + request_histogram, + } + } } impl Service @@ -54,13 +94,20 @@ where } fn call(&mut self, request: Req) -> Self::Future { + let inflight_requests = self.inflight_requests.clone(); + let request_counter = self.request_counter.clone(); + let request_histogram = self.request_histogram.clone(); + let start_time = SystemTime::now(); + let cx = self.extract_context.extract_context(&request); - let span_builder = self.make_span_builder.make_span_builder(&request); + let (span_builder, mut metrics_labels) = self.make_span_builder.make_span_builder(&request); let span = span_builder.start_with_context(self.tracer.as_ref(), &cx); let cx = cx.with_span(span); let request = self.inject_context.inject_context(&cx, request); + inflight_requests.add(&cx, 1, &metrics_labels); + let on_response = self.on_response.clone(); let on_error = self.on_error.clone(); let attachment = cx.clone().attach(); @@ -69,11 +116,21 @@ where .call(request) .with_context(cx.clone()) .inspect(move |r| { + inflight_requests.add(&cx, -1, &metrics_labels); + let span = cx.span(); - match r { + let extra_labels = match r { Ok(response) => on_response.on_response(&span, response), Err(err) => on_error.on_error(&span, err), - } + }; + metrics_labels.extend_from_slice(&extra_labels); + + request_counter.add(&cx, 1, &metrics_labels); + request_histogram.record( + &cx, + start_time.elapsed().map_or(0.0, |d| d.as_secs_f64()), + &metrics_labels, + ); span.end(); })