From 08f58db08bbea857bb83677ed47dd9d580bece83 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Sat, 15 Apr 2023 14:21:12 +0200 Subject: [PATCH] Replace the OTEL-based tracing layer with `tracing` based layers --- Cargo.lock | 37 +- crates/axum-utils/src/client_authorization.rs | 2 +- crates/axum-utils/src/http_client_factory.rs | 14 +- crates/cli/Cargo.toml | 1 + crates/cli/src/commands/debug.rs | 4 +- crates/cli/src/server.rs | 166 ++++++++- crates/cli/src/telemetry.rs | 3 +- crates/email/Cargo.toml | 5 + crates/email/src/transport/aws_ses.rs | 77 +++- .../handlers/src/upstream_oauth2/authorize.rs | 4 +- .../handlers/src/upstream_oauth2/callback.rs | 12 +- crates/http/Cargo.toml | 4 +- crates/http/src/client.rs | 32 +- crates/http/src/layers/client.rs | 48 ++- crates/http/src/layers/mod.rs | 1 - .../http/src/layers/otel/extract_context.rs | 51 --- crates/http/src/layers/otel/inject_context.rs | 76 ---- crates/http/src/layers/otel/layer.rs | 334 ------------------ .../src/layers/otel/make_metrics_labels.rs | 62 ---- .../http/src/layers/otel/make_span_builder.rs | 235 ------------ crates/http/src/layers/otel/mod.rs | 197 ----------- crates/http/src/layers/otel/on_error.rs | 46 --- crates/http/src/layers/otel/on_response.rs | 99 ------ crates/http/src/layers/otel/service.rs | 208 ----------- crates/http/src/layers/otel/utils.rs | 45 --- crates/http/src/lib.rs | 3 +- crates/tasks/Cargo.toml | 8 +- crates/tasks/src/database.rs | 3 +- crates/tasks/src/email.rs | 8 +- crates/tasks/src/layers.rs | 70 ---- crates/tasks/src/lib.rs | 9 +- crates/tasks/src/matrix.rs | 43 ++- crates/tasks/src/utils.rs | 79 +++++ crates/tower/Cargo.toml | 20 ++ crates/tower/src/lib.rs | 32 ++ crates/tower/src/metrics/duration.rs | 237 +++++++++++++ crates/tower/src/metrics/in_flight.rs | 168 +++++++++ crates/tower/src/metrics/make_attributes.rs | 154 ++++++++ crates/tower/src/metrics/mod.rs | 23 ++ crates/tower/src/trace_context.rs | 115 ++++++ crates/tower/src/tracing/enrich_span.rs | 114 ++++++ crates/tower/src/tracing/future.rs | 70 ++++ crates/tower/src/tracing/layer.rs | 105 ++++++ crates/tower/src/tracing/make_span.rs | 72 ++++ crates/tower/src/tracing/mod.rs | 27 ++ crates/tower/src/tracing/service.rs | 67 ++++ crates/tower/src/utils.rs | 33 ++ 47 files changed, 1703 insertions(+), 1520 deletions(-) delete mode 100644 crates/http/src/layers/otel/extract_context.rs delete mode 100644 crates/http/src/layers/otel/inject_context.rs delete mode 100644 crates/http/src/layers/otel/layer.rs delete mode 100644 crates/http/src/layers/otel/make_metrics_labels.rs delete mode 100644 crates/http/src/layers/otel/make_span_builder.rs delete mode 100644 crates/http/src/layers/otel/mod.rs delete mode 100644 crates/http/src/layers/otel/on_error.rs delete mode 100644 crates/http/src/layers/otel/on_response.rs delete mode 100644 crates/http/src/layers/otel/service.rs delete mode 100644 crates/http/src/layers/otel/utils.rs delete mode 100644 crates/tasks/src/layers.rs create mode 100644 crates/tasks/src/utils.rs create mode 100644 crates/tower/Cargo.toml create mode 100644 crates/tower/src/lib.rs create mode 100644 crates/tower/src/metrics/duration.rs create mode 100644 crates/tower/src/metrics/in_flight.rs create mode 100644 crates/tower/src/metrics/make_attributes.rs create mode 100644 crates/tower/src/metrics/mod.rs create mode 100644 crates/tower/src/trace_context.rs create mode 100644 crates/tower/src/tracing/enrich_span.rs create mode 100644 crates/tower/src/tracing/future.rs create mode 100644 crates/tower/src/tracing/layer.rs create mode 100644 crates/tower/src/tracing/make_span.rs create mode 100644 crates/tower/src/tracing/mod.rs create mode 100644 crates/tower/src/tracing/service.rs create mode 100644 crates/tower/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index bd6be3b1..5a6ec319 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3062,6 +3062,7 @@ dependencies = [ "mas-storage-pg", "mas-tasks", "mas-templates", + "mas-tower", "oauth2-types", "opentelemetry", "opentelemetry-http", @@ -3145,9 +3146,14 @@ dependencies = [ "aws-sdk-sesv2", "aws-smithy-async", "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-types", + "headers", "lettre", "mas-http", "mas-templates", + "mas-tower", "thiserror", "tracing", ] @@ -3240,10 +3246,9 @@ dependencies = [ "http-body", "hyper", "hyper-rustls", + "mas-tower", "once_cell", "opentelemetry", - "opentelemetry-http", - "opentelemetry-semantic-conventions", "rustls", "rustls-native-certs", "serde", @@ -3513,6 +3518,8 @@ dependencies = [ "mas-http", "mas-storage", "mas-storage-pg", + "mas-tower", + "opentelemetry", "rand 0.8.5", "rand_chacha 0.3.1", "serde", @@ -3548,6 +3555,21 @@ dependencies = [ "url", ] +[[package]] +name = "mas-tower" +version = "0.1.0" +dependencies = [ + "aws-smithy-http", + "http", + "opentelemetry", + "opentelemetry-http", + "pin-project-lite", + "tokio", + "tower", + "tracing", + "tracing-opentelemetry", +] + [[package]] name = "match_cfg" version = "0.1.0" @@ -5951,14 +5973,13 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.26.0" +version = "1.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03201d01c3c27a29c8a5cee5b55a93ddae1ccf6f08f65365c2c918f8c1b76f64" +checksum = "d0de47a4eecbe11f498978a9b29d792f0d2692d1dd003650c24c76510e3bc001" dependencies = [ "autocfg 1.1.0", "bytes 1.4.0", "libc", - "memchr", "mio", "num_cpus", "parking_lot 0.12.1", @@ -5981,13 +6002,13 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "1.8.2" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" +checksum = "61a573bdc87985e9d6ddeed1b3d864e8a302c847e40d647746df2f1de209d1ce" dependencies = [ "proc-macro2 1.0.52", "quote 1.0.26", - "syn 1.0.109", + "syn 2.0.12", ] [[package]] diff --git a/crates/axum-utils/src/client_authorization.rs b/crates/axum-utils/src/client_authorization.rs index 3f1101e6..d281709d 100644 --- a/crates/axum-utils/src/client_authorization.rs +++ b/crates/axum-utils/src/client_authorization.rs @@ -191,7 +191,7 @@ async fn fetch_jwks( .unwrap(); let mut client = http_client_factory - .client("fetch-jwks") + .client() .await? .response_body_to_bytes() .json_response::(); diff --git a/crates/axum-utils/src/http_client_factory.rs b/crates/axum-utils/src/http_client_factory.rs index a143616a..b2b562a0 100644 --- a/crates/axum-utils/src/http_client_factory.rs +++ b/crates/axum-utils/src/http_client_factory.rs @@ -43,16 +43,13 @@ impl HttpClientFactory { /// # Errors /// /// Returns an error if the client failed to initialise - pub async fn client( - &self, - operation: &'static str, - ) -> Result>, ClientInitError> + pub async fn client(&self) -> Result>, ClientInitError> where B: axum::body::HttpBody + Send, B::Data: Send, { let client = mas_http::make_traced_client::().await?; - let layer = ClientLayer::with_semaphore(operation, self.semaphore.clone()); + let layer = ClientLayer::with_semaphore(self.semaphore.clone()); Ok(layer.layer(client)) } @@ -61,11 +58,8 @@ impl HttpClientFactory { /// # Errors /// /// Returns an error if the client failed to initialise - pub async fn http_service( - &self, - operation: &'static str, - ) -> Result { - let client = self.client(operation).await?; + pub async fn http_service(&self) -> Result { + let client = self.client().await?; let client = ( MapErrLayer::new(BoxError::from), MapRequestLayer::new(|req: http::Request<_>| req.map(Full::new)), diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 1886d140..ed57ecb1 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -57,6 +57,7 @@ mas-storage = { path = "../storage" } mas-storage-pg = { path = "../storage-pg" } mas-tasks = { path = "../tasks" } mas-templates = { path = "../templates" } +mas-tower = { path = "../tower" } oauth2-types = { path = "../oauth2-types" } [dev-dependencies] diff --git a/crates/cli/src/commands/debug.rs b/crates/cli/src/commands/debug.rs index 24f9262c..6f5d9a1a 100644 --- a/crates/cli/src/commands/debug.rs +++ b/crates/cli/src/commands/debug.rs @@ -75,7 +75,7 @@ impl Options { url, } => { let _span = info_span!("cli.debug.http").entered(); - let mut client = http_client_factory.client("cli-debug-http").await?; + let mut client = http_client_factory.client().await?; let request = hyper::Request::builder() .uri(url) .body(hyper::Body::empty())?; @@ -101,7 +101,7 @@ impl Options { } => { let _span = info_span!("cli.debug.http").entered(); let mut client = http_client_factory - .client("cli-debug-http") + .client() .await? .response_body_to_bytes() .json_response(); diff --git a/crates/cli/src/server.rs b/crates/cli/src/server.rs index 195aadbc..08d26176 100644 --- a/crates/cli/src/server.rs +++ b/crates/cli/src/server.rs @@ -13,27 +13,153 @@ // limitations under the License. use std::{ + borrow::Cow, future::ready, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener, ToSocketAddrs}, os::unix::net::UnixListener, }; use anyhow::Context; -use axum::{body::HttpBody, error_handling::HandleErrorLayer, extract::FromRef, Extension, Router}; -use hyper::StatusCode; +use axum::{ + body::HttpBody, + error_handling::HandleErrorLayer, + extract::{FromRef, MatchedPath}, + Extension, Router, +}; +use hyper::{Method, Request, Response, StatusCode, Version}; use listenfd::ListenFd; use mas_config::{HttpBindConfig, HttpResource, HttpTlsConfig, UnixOrTcp}; use mas_handlers::AppState; -use mas_http::otel::TraceLayer; use mas_listener::{unix_or_tcp::UnixOrTcpListener, ConnectionInfo}; use mas_router::Route; use mas_spa::ViteManifestService; use mas_templates::Templates; -use opentelemetry::KeyValue; +use mas_tower::{ + make_span_fn, metrics_attributes_fn, DurationRecorderLayer, InFlightCounterLayer, TraceLayer, + KV, +}; +use opentelemetry::{Key, KeyValue}; +use opentelemetry_http::HeaderExtractor; +use opentelemetry_semantic_conventions::trace::{ + HTTP_METHOD, HTTP_ROUTE, HTTP_SCHEME, HTTP_STATUS_CODE, +}; use rustls::ServerConfig; use sentry_tower::{NewSentryLayer, SentryHttpLayer}; use tower::Layer; use tower_http::{compression::CompressionLayer, services::ServeDir}; +use tracing::Span; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +const NET_PROTOCOL_NAME: Key = Key::from_static_str("net.protocol.name"); +const NET_PROTOCOL_VERSION: Key = Key::from_static_str("net.protocol.version"); +const MAS_LISTENER_NAME: Key = Key::from_static_str("mas.listener.name"); + +#[inline] +fn otel_http_method(request: &Request) -> Cow<'static, str> { + match request.method() { + &Method::OPTIONS => "OPTIONS".into(), + &Method::GET => "GET".into(), + &Method::POST => "POST".into(), + &Method::PUT => "PUT".into(), + &Method::DELETE => "DELETE".into(), + &Method::HEAD => "HEAD".into(), + &Method::TRACE => "TRACE".into(), + &Method::CONNECT => "CONNECT".into(), + &Method::PATCH => "PATCH".into(), + other => other.to_string().into(), + } +} + +#[inline] +fn otel_net_protocol_version(request: &Request) -> Cow<'static, str> { + match request.version() { + Version::HTTP_09 => "0.9".into(), + Version::HTTP_10 => "1.0".into(), + Version::HTTP_11 => "1.1".into(), + Version::HTTP_2 => "2.0".into(), + Version::HTTP_3 => "3.0".into(), + other => format!("{other:?}").into(), + } +} + +fn otel_http_route(request: &Request) -> Option<&str> { + request + .extensions() + .get::() + .map(MatchedPath::as_str) +} + +fn otel_http_target(request: &Request) -> &str { + request.uri().path_and_query().map_or("", |p| p.as_str()) +} + +fn otel_http_scheme(request: &Request) -> &'static str { + // XXX: maybe we should panic if the connection info was not injected in the + // request extensions + request + .extensions() + .get::() + .map_or("http", |conn_info| { + if conn_info.get_tls_ref().is_some() { + "https" + } else { + "http" + } + }) +} + +fn make_http_span(req: &Request) -> Span { + let method = otel_http_method(req); + let route = otel_http_route(req); + + let span_name = if let Some(route) = route.as_ref() { + format!("{method} {route}") + } else { + format!("{method}") + }; + + let span = tracing::info_span!( + "http.server.request", + "otel.kind" = "server", + "otel.name" = span_name, + "otel.status_code" = tracing::field::Empty, + "net.protocol.name" = "http", + "net.protocol.version" = otel_net_protocol_version(req).as_ref(), + "http.scheme" = otel_http_scheme(req), + "http.method" = method.as_ref(), + "http.route" = tracing::field::Empty, + "http.target" = otel_http_target(req), + "http.status_code" = tracing::field::Empty, + ); + + if let Some(route) = route.as_ref() { + span.record("http.route", route); + } + + // Extract the parent span context from the request headers + let parent_context = opentelemetry::global::get_text_map_propagator(|propagator| { + let extractor = HeaderExtractor(req.headers()); + let context = opentelemetry::Context::new(); + propagator.extract_with_context(&context, &extractor) + }); + span.set_parent(parent_context); + + span +} + +fn on_http_request_labels(request: &Request) -> Vec { + vec![ + NET_PROTOCOL_NAME.string("http"), + NET_PROTOCOL_VERSION.string(otel_net_protocol_version(request)), + HTTP_METHOD.string(otel_http_method(request)), + HTTP_SCHEME.string(otel_http_scheme(request).as_ref()), + HTTP_ROUTE.string(otel_http_route(request).unwrap_or("FALLBACK").to_owned()), + ] +} + +fn on_http_response_labels(res: &Response) -> Vec { + vec![HTTP_STATUS_CODE.i64(res.status().as_u16().into())] +} pub fn build_router( state: AppState, @@ -112,16 +238,34 @@ where } } - let mut trace_layer = TraceLayer::axum(); - - if let Some(name) = name { - trace_layer = trace_layer.with_static_attribute(KeyValue::new("listener", name.to_owned())); - } - router + .layer( + InFlightCounterLayer::new("http.server.active_requests").on_request(( + name.map(|name| MAS_LISTENER_NAME.string(name.to_owned())), + metrics_attributes_fn(on_http_request_labels), + )), + ) + .layer( + DurationRecorderLayer::new("http.server.duration") + .on_request(( + name.map(|name| MAS_LISTENER_NAME.string(name.to_owned())), + metrics_attributes_fn(on_http_request_labels), + )) + .on_response_fn(on_http_response_labels), + ) + .layer( + TraceLayer::new(( + make_span_fn(make_http_span), + name.map(|name| KV("mas.listener.name", name.to_owned())), + )) + .on_response_fn(|span: &Span, response: &Response<_>| { + let status_code = response.status().as_u16(); + span.record("http.status_code", status_code); + span.record("otel.status_code", "OK"); + }), + ) .layer(SentryHttpLayer::new()) .layer(NewSentryLayer::new_from_top()) - .layer(trace_layer) .layer(CompressionLayer::new()) .with_state(state) } diff --git a/crates/cli/src/telemetry.rs b/crates/cli/src/telemetry.rs index 794e7ce3..2ef9905d 100644 --- a/crates/cli/src/telemetry.rs +++ b/crates/cli/src/telemetry.rs @@ -375,8 +375,9 @@ fn prometheus_meter() -> anyhow::Result { fn prometheus_meter() -> anyhow::Result { let controller = sdk::metrics::controllers::basic( sdk::metrics::processors::factory( + // All histogram metrics are in milliseconds. Each bucket is ~2x the previous one. sdk::metrics::selectors::simple::histogram([ - 0.01, 0.02, 0.05, 0.10, 0.20, 0.50, 1.0, 2.0, 5.0, + 1.0, 3.0, 5.0, 10.0, 30.0, 50.0, 100.0, 300.0, 1000.0, ]), sdk::export::metrics::aggregation::cumulative_temporality_selector(), ) diff --git a/crates/email/Cargo.toml b/crates/email/Cargo.toml index dd82b25b..7d80c892 100644 --- a/crates/email/Cargo.toml +++ b/crates/email/Cargo.toml @@ -9,14 +9,19 @@ license = "Apache-2.0" async-trait = "0.1.68" tracing = "0.1.37" thiserror = "1.0.40" +headers = "0.3.8" aws-sdk-sesv2 = { version = "0.24.0", default-features = false } aws-config = { version = "0.54.1", default-features = false } aws-smithy-client = { version = "0.54.4", default-features = false, features = ["client-hyper"] } aws-smithy-async = { version = "0.54.4", default-features = false, features = ["rt-tokio"] } +aws-smithy-http = { version = "0.54.4", default-features = false } +aws-smithy-http-tower = { version = "0.54.4", default-features = false } +aws-types = "0.54.1" mas-templates = { path = "../templates" } mas-http = { path = "../http", features = ["aws-sdk", "client"] } +mas-tower = { path = "../tower", features = ["aws-sdk"] } [dependencies.lettre] version = "0.10.3" diff --git a/crates/email/src/transport/aws_ses.rs b/crates/email/src/transport/aws_ses.rs index 9a2cc4c0..00cdb2ad 100644 --- a/crates/email/src/transport/aws_ses.rs +++ b/crates/email/src/transport/aws_ses.rs @@ -25,8 +25,11 @@ use aws_sdk_sesv2::{ }; use aws_smithy_async::rt::sleep::TokioSleep; use aws_smithy_client::erase::{DynConnector, DynMiddleware}; +use headers::{ContentLength, HeaderMapExt, Host, UserAgent}; use lettre::{address::Envelope, AsyncTransport}; -use mas_http::{otel::TraceLayer, ClientInitError}; +use mas_http::ClientInitError; +use mas_tower::{enrich_span_fn, make_span_fn, TraceContextLayer, TraceLayer}; +use tracing::{info_span, Span}; pub type Error = aws_smithy_client::SdkError; @@ -58,7 +61,77 @@ impl Transport { let http_connector = DynConnector::new(http_connector); // Middleware to add tracing to AWS SDK operations - let middleware = DynMiddleware::new((TraceLayer::aws_sdk(), DefaultMiddleware::default())); + let middleware = DynMiddleware::new(( + DefaultMiddleware::default(), + // TODO: factor this out somewhere else + TraceLayer::new(make_span_fn(|op: &aws_smithy_http::operation::Request| { + let properties = op.properties(); + let request = op.http(); + let span = info_span!( + "aws.sdk.operation", + "otel.kind" = "client", + "otel.name" = tracing::field::Empty, + "otel.status_code" = tracing::field::Empty, + "rpc.system" = "aws-api", + "rpc.service" = tracing::field::Empty, + "rpc.method" = tracing::field::Empty, + "http.method" = %request.method(), + "http.url" = %request.uri(), + "http.host" = tracing::field::Empty, + "http.request_content_length" = tracing::field::Empty, + "http.response_content_length" = tracing::field::Empty, + "http.status_code" = tracing::field::Empty, + "user_agent.original" = tracing::field::Empty, + ); + + if let Some(metadata) = properties.get::() { + span.record("rpc.service", metadata.service()); + span.record("rpc.method", metadata.name()); + let name = format!("{}::{}", metadata.service(), metadata.name()); + span.record("otel.name", name); + } else if let Some(service) = properties.get::() { + span.record("rpc.service", tracing::field::debug(service)); + span.record("otel.name", tracing::field::debug(service)); + } + + let headers = request.headers(); + + if let Some(host) = headers.typed_get::() { + span.record("http.host", tracing::field::display(host)); + } + + if let Some(user_agent) = headers.typed_get::() { + span.record("user_agent.original", tracing::field::display(user_agent)); + } + + if let Some(ContentLength(content_length)) = headers.typed_get() { + span.record("http.request_content_length", content_length); + } + + span + })) + .on_response(enrich_span_fn( + |span: &Span, res: &aws_smithy_http::operation::Response| { + span.record("otel.status_code", "OK"); + let response = res.http(); + + let status = response.status(); + span.record("http.status_code", status.as_u16()); + + let headers = response.headers(); + if let Some(ContentLength(content_length)) = headers.typed_get() { + span.record("http.response_content_length", content_length); + } + }, + )) + .on_error(enrich_span_fn( + |span: &Span, err: &aws_smithy_http_tower::SendOperationError| { + span.record("otel.status_code", "ERROR"); + span.record("exception.message", tracing::field::debug(err)); + }, + )), + TraceContextLayer::new(), + )); // Use that connector for discovering the config let config = ProviderConfig::default().with_http_connector(http_connector.clone()); diff --git a/crates/handlers/src/upstream_oauth2/authorize.rs b/crates/handlers/src/upstream_oauth2/authorize.rs index b80bd338..fffb044c 100644 --- a/crates/handlers/src/upstream_oauth2/authorize.rs +++ b/crates/handlers/src/upstream_oauth2/authorize.rs @@ -78,9 +78,7 @@ pub(crate) async fn get( .await? .ok_or(RouteError::ProviderNotFound)?; - let http_service = http_client_factory - .http_service("upstream-discover") - .await?; + let http_service = http_client_factory.http_service().await?; // First, discover the provider let metadata = diff --git a/crates/handlers/src/upstream_oauth2/callback.rs b/crates/handlers/src/upstream_oauth2/callback.rs index 601f2bfa..0d28c3fc 100644 --- a/crates/handlers/src/upstream_oauth2/callback.rs +++ b/crates/handlers/src/upstream_oauth2/callback.rs @@ -184,19 +184,13 @@ pub(crate) async fn get( CodeOrError::Code { code } => code, }; - let http_service = http_client_factory - .http_service("upstream-discover") - .await?; + let http_service = http_client_factory.http_service().await?; // XXX: we shouldn't discover on-the-fly // Discover the provider let metadata = mas_oidc_client::requests::discovery::discover(&http_service, &provider.issuer).await?; - let http_service = http_client_factory - .http_service("upstream-fetch-jwks") - .await?; - // Fetch the JWKS let jwks = mas_oidc_client::requests::jose::fetch_jwks(&http_service, metadata.jwks_uri()).await?; @@ -227,10 +221,6 @@ pub(crate) async fn get( client_id: &provider.client_id, }; - let http_service = http_client_factory - .http_service("upstream-exchange-code") - .await?; - let (response, id_token) = mas_oidc_client::requests::authorization_code::access_token_with_authorization_code( &http_service, diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index e8395b0b..b96af7dc 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -18,8 +18,6 @@ hyper = "0.14.25" hyper-rustls = { version = "0.23.2", features = ["http1", "http2"], default-features = false, optional = true } once_cell = "1.17.1" opentelemetry = "0.18.0" -opentelemetry-http = "0.7.0" -opentelemetry-semantic-conventions = "0.10.0" rustls = { version = "0.20.8", optional = true } rustls-native-certs = { version = "0.6.2", optional = true } serde = "1.0.158" @@ -34,6 +32,8 @@ tracing-opentelemetry = "0.18.0" webpki = { version = "0.22.0", optional = true } webpki-roots = { version = "0.22.6", optional = true } +mas-tower = { path = "../tower" } + [dev-dependencies] anyhow = "1.0.69" serde = { version = "1.0.158", features = ["derive"] } diff --git a/crates/http/src/client.rs b/crates/http/src/client.rs index f23db552..81b27139 100644 --- a/crates/http/src/client.rs +++ b/crates/http/src/client.rs @@ -15,14 +15,20 @@ use std::convert::Infallible; use hyper::{ - client::{connect::dns::GaiResolver, HttpConnector}, + client::{ + connect::dns::{GaiResolver, Name}, + HttpConnector, + }, Client, }; use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; +use mas_tower::{ + DurationRecorderLayer, DurationRecorderService, FnWrapper, InFlightCounterLayer, + InFlightCounterService, TraceLayer, TraceService, +}; use thiserror::Error; use tower::Layer; - -use crate::layers::otel::{TraceDns, TraceLayer}; +use tracing::Span; #[cfg(all(not(feature = "webpki-roots"), not(feature = "native-roots")))] compile_error!("enabling the 'client' feature requires also enabling the 'webpki-roots' or the 'native-roots' features"); @@ -165,8 +171,10 @@ where Ok(Client::builder().build(https)) } +pub type TraceResolver = + InFlightCounterService Span>>>>; pub type UntracedConnector = HttpsConnector>; -pub type TracedConnector = HttpsConnector>>; +pub type TracedConnector = HttpsConnector>>; /// Create a traced HTTP and HTTPS connector /// @@ -176,8 +184,20 @@ pub type TracedConnector = HttpsConnector>>; pub async fn make_traced_connector() -> Result where { - // Trace DNS requests - let resolver = TraceLayer::dns().layer(GaiResolver::new()); + let in_flight_counter = InFlightCounterLayer::new("dns.resolve.active_requests"); + let duration_recorder = DurationRecorderLayer::new("dns.resolve.duration"); + let trace_layer = TraceLayer::from_fn( + (|request: &Name| { + tracing::info_span!( + "dns.resolve", + "otel.kind" = "client", + "net.host.name" = %request, + ) + }) as fn(&Name) -> Span, + ); + + let resolver = (in_flight_counter, duration_recorder, trace_layer).layer(GaiResolver::new()); + let tls_config = make_tls_config().await?; Ok(make_connector(resolver, tls_config)) } diff --git a/crates/http/src/layers/client.rs b/crates/http/src/layers/client.rs index 2e112178..637d6ae0 100644 --- a/crates/http/src/layers/client.rs +++ b/crates/http/src/layers/client.rs @@ -14,7 +14,8 @@ use std::{sync::Arc, time::Duration}; -use http::{header::USER_AGENT, HeaderValue}; +use http::{header::USER_AGENT, HeaderValue, Request}; +use mas_tower::{MakeSpan, TraceContextLayer, TraceContextService, TraceLayer, TraceService}; use tokio::sync::Semaphore; use tower::{ limit::{ConcurrencyLimit, GlobalConcurrencyLimitLayer}, @@ -26,42 +27,61 @@ use tower_http::{ timeout::{Timeout, TimeoutLayer}, }; -use super::otel::TraceLayer; -use crate::otel::{TraceHttpClient, TraceHttpClientLayer}; - pub type ClientService = SetRequestHeader< - TraceHttpClient>>>>, + ConcurrencyLimit< + FollowRedirect>, MakeSpanForRequest>>, + >, HeaderValue, >; +#[derive(Debug, Clone)] +pub struct MakeSpanForRequest; + +impl MakeSpan> for MakeSpanForRequest { + fn make_span(&self, request: &Request) -> tracing::Span { + // TODO: better attributes + tracing::info_span!( + "http.client.request", + "http.method" = %request.method(), + "http.uri" = %request.uri(), + ) + } +} + #[derive(Debug, Clone)] pub struct ClientLayer { user_agent_layer: SetRequestHeaderLayer, - outer_trace_layer: TraceHttpClientLayer, concurrency_limit_layer: GlobalConcurrencyLimitLayer, follow_redirect_layer: FollowRedirectLayer, - inner_trace_layer: TraceHttpClientLayer, + trace_layer: TraceLayer, + trace_context_layer: TraceContextLayer, timeout_layer: TimeoutLayer, } +impl Default for ClientLayer { + fn default() -> Self { + Self::new() + } +} + impl ClientLayer { #[must_use] - pub fn new(operation: &'static str) -> Self { + pub fn new() -> Self { let semaphore = Arc::new(Semaphore::new(10)); - Self::with_semaphore(operation, semaphore) + Self::with_semaphore(semaphore) } #[must_use] - pub fn with_semaphore(operation: &'static str, semaphore: Arc) -> Self { + pub fn with_semaphore(semaphore: Arc) -> Self { Self { user_agent_layer: SetRequestHeaderLayer::overriding( USER_AGENT, HeaderValue::from_static("matrix-authentication-service/0.0.1"), ), - outer_trace_layer: TraceLayer::http_client(operation), concurrency_limit_layer: GlobalConcurrencyLimitLayer::with_semaphore(semaphore), follow_redirect_layer: FollowRedirectLayer::new(), - inner_trace_layer: TraceLayer::inner_http_client(), + trace_layer: TraceLayer::new(MakeSpanForRequest), + trace_context_layer: TraceContextLayer::new(), timeout_layer: TimeoutLayer::new(Duration::from_secs(10)), } } @@ -76,10 +96,10 @@ where fn layer(&self, inner: S) -> Self::Service { ( &self.user_agent_layer, - &self.outer_trace_layer, &self.concurrency_limit_layer, &self.follow_redirect_layer, - &self.inner_trace_layer, + &self.trace_layer, + &self.trace_context_layer, &self.timeout_layer, ) .layer(inner) diff --git a/crates/http/src/layers/mod.rs b/crates/http/src/layers/mod.rs index f489f1b9..b48b3f4d 100644 --- a/crates/http/src/layers/mod.rs +++ b/crates/http/src/layers/mod.rs @@ -18,7 +18,6 @@ pub mod catch_http_codes; pub mod form_urlencoded_request; pub mod json_request; pub mod json_response; -pub mod otel; #[cfg(feature = "client")] pub(crate) mod client; diff --git a/crates/http/src/layers/otel/extract_context.rs b/crates/http/src/layers/otel/extract_context.rs deleted file mode 100644 index 56015e71..00000000 --- a/crates/http/src/layers/otel/extract_context.rs +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2022 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use http::Request; -use opentelemetry::{trace::TraceContextExt, Context}; -use opentelemetry_http::HeaderExtractor; - -pub trait ExtractContext { - fn extract_context(&self, request: &R) -> Context; -} - -#[derive(Debug, Clone, Copy, Default)] -pub struct DefaultExtractContext; - -impl ExtractContext for DefaultExtractContext { - fn extract_context(&self, _request: &T) -> Context { - Context::current() - } -} - -#[derive(Debug, Clone, Copy, Default)] -pub struct ExtractFromHttpRequest; - -impl ExtractContext> for ExtractFromHttpRequest { - fn extract_context(&self, request: &Request) -> Context { - let headers = request.headers(); - let extractor = HeaderExtractor(headers); - let parent_cx = Context::current(); - - let cx = opentelemetry::global::get_text_map_propagator(|propagator| { - propagator.extract_with_context(&parent_cx, &extractor) - }); - - if cx.span().span_context().is_remote() { - cx - } else { - parent_cx - } - } -} diff --git a/crates/http/src/layers/otel/inject_context.rs b/crates/http/src/layers/otel/inject_context.rs deleted file mode 100644 index edf3cafb..00000000 --- a/crates/http/src/layers/otel/inject_context.rs +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright 2022 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use http::Request; -use opentelemetry::Context; -use opentelemetry_http::HeaderInjector; - -pub trait InjectContext { - type Output; - - fn inject_context(&self, cx: &Context, request: R) -> Self::Output; -} - -#[derive(Debug, Clone, Copy, Default)] -pub struct DefaultInjectContext; - -impl InjectContext for DefaultInjectContext { - type Output = R; - - fn inject_context(&self, _cx: &Context, request: R) -> Self::Output { - request - } -} - -#[derive(Debug, Clone, Copy, Default)] -pub struct InjectInHttpRequest; - -impl InjectContext> for InjectInHttpRequest { - type Output = Request; - - fn inject_context(&self, cx: &Context, mut request: Request) -> Self::Output { - let headers = request.headers_mut(); - let mut injector = HeaderInjector(headers); - - opentelemetry::global::get_text_map_propagator(|propagator| { - propagator.inject_context(cx, &mut injector); - }); - - request - } -} - -#[cfg(feature = "aws-sdk")] -#[derive(Debug, Clone, Copy, Default)] -pub struct InjectInAwsRequest; - -#[cfg(feature = "aws-sdk")] -impl InjectContext for InjectInAwsRequest { - type Output = aws_smithy_http::operation::Request; - - fn inject_context( - &self, - cx: &Context, - mut request: aws_smithy_http::operation::Request, - ) -> Self::Output { - let headers = request.http_mut().headers_mut(); - let mut injector = HeaderInjector(headers); - - opentelemetry::global::get_text_map_propagator(|propagator| { - propagator.inject_context(cx, &mut injector); - }); - - request - } -} diff --git a/crates/http/src/layers/otel/layer.rs b/crates/http/src/layers/otel/layer.rs deleted file mode 100644 index 48c0a891..00000000 --- a/crates/http/src/layers/otel/layer.rs +++ /dev/null @@ -1,334 +0,0 @@ -use std::sync::Arc; - -use opentelemetry::{ - metrics::{Counter, Histogram, UpDownCounter}, - KeyValue, -}; -use tower::Layer; - -use super::{ - extract_context::DefaultExtractContext, inject_context::DefaultInjectContext, - make_metrics_labels::DefaultMakeMetricsLabels, make_span_builder::DefaultMakeSpanBuilder, - on_error::DefaultOnError, on_response::DefaultOnResponse, service::Trace, -}; - -#[derive(Debug, Clone)] -pub struct TraceLayer< - ExtractContext = DefaultExtractContext, - InjectContext = DefaultInjectContext, - MakeSpanBuilder = DefaultMakeSpanBuilder, - MakeMetricsLabels = DefaultMakeMetricsLabels, - OnResponse = DefaultOnResponse, - OnError = DefaultOnError, -> { - tracer: Arc, - extract_context: ExtractContext, - inject_context: InjectContext, - make_span_builder: MakeSpanBuilder, - make_metrics_labels: MakeMetricsLabels, - on_response: OnResponse, - on_error: OnError, - - inflight_requests: UpDownCounter, - request_counter: Counter, - request_histogram: Histogram, - static_attributes: Vec, -} - -impl Default for TraceLayer { - fn default() -> Self { - Self::with_namespace("http") - } -} - -impl TraceLayer { - #[must_use] - pub fn with_namespace(namespace: &'static str) -> Self { - let tracer = Arc::new(opentelemetry::global::tracer("mas-http")); - let meter = opentelemetry::global::meter("mas-http"); - - let inflight_requests = meter - .i64_up_down_counter(format!("{namespace}.inflight_requests")) - .with_description("Number of in-flight requests") - .init(); - let request_counter = meter - .u64_counter(format!("{namespace}.requests_total")) - .with_description("Total number of requests made.") - .init(); - let request_histogram = meter - .f64_histogram(format!("{namespace}.request_duration_seconds")) - .with_description("The request latencies in seconds.") - .init(); - - Self::new( - tracer, - inflight_requests, - request_counter, - request_histogram, - ) - } -} - -impl - TraceLayer< - ExtractContext, - InjectContext, - MakeSpanBuilder, - MakeMetricsLabels, - OnResponse, - OnError, - > -{ - #[must_use] - pub fn new( - tracer: Arc, - inflight_requests: UpDownCounter, - request_counter: Counter, - request_histogram: Histogram, - ) -> Self - where - ExtractContext: Default, - InjectContext: Default, - MakeSpanBuilder: Default, - MakeMetricsLabels: Default, - OnResponse: Default, - OnError: Default, - { - Self { - tracer, - extract_context: ExtractContext::default(), - inject_context: InjectContext::default(), - make_span_builder: MakeSpanBuilder::default(), - make_metrics_labels: MakeMetricsLabels::default(), - on_response: OnResponse::default(), - on_error: OnError::default(), - inflight_requests, - request_counter, - request_histogram, - static_attributes: Vec::new(), - } - } - - #[must_use] - pub fn with_static_attribute(mut self, attribute: KeyValue) -> Self { - self.static_attributes.push(attribute); - self - } - - #[must_use] - pub fn with_static_attributes( - mut self, - attributes: impl IntoIterator, - ) -> Self { - self.static_attributes.extend(attributes); - self - } - - #[must_use] - pub fn extract_context( - self, - extract_context: NewExtractContext, - ) -> TraceLayer< - NewExtractContext, - InjectContext, - MakeSpanBuilder, - MakeMetricsLabels, - OnResponse, - OnError, - > { - TraceLayer { - tracer: self.tracer, - extract_context, - inject_context: self.inject_context, - make_span_builder: self.make_span_builder, - make_metrics_labels: self.make_metrics_labels, - on_response: self.on_response, - on_error: self.on_error, - inflight_requests: self.inflight_requests, - request_counter: self.request_counter, - request_histogram: self.request_histogram, - static_attributes: self.static_attributes, - } - } - - #[must_use] - pub fn inject_context( - self, - inject_context: NewInjectContext, - ) -> TraceLayer< - ExtractContext, - NewInjectContext, - MakeSpanBuilder, - MakeMetricsLabels, - OnResponse, - OnError, - > { - TraceLayer { - tracer: self.tracer, - extract_context: self.extract_context, - inject_context, - make_span_builder: self.make_span_builder, - make_metrics_labels: self.make_metrics_labels, - on_response: self.on_response, - on_error: self.on_error, - inflight_requests: self.inflight_requests, - request_counter: self.request_counter, - request_histogram: self.request_histogram, - static_attributes: self.static_attributes, - } - } - - #[must_use] - pub fn make_span_builder( - self, - make_span_builder: NewMakeSpanBuilder, - ) -> TraceLayer< - ExtractContext, - InjectContext, - NewMakeSpanBuilder, - MakeMetricsLabels, - OnResponse, - OnError, - > { - TraceLayer { - tracer: self.tracer, - extract_context: self.extract_context, - inject_context: self.inject_context, - make_span_builder, - make_metrics_labels: self.make_metrics_labels, - on_response: self.on_response, - on_error: self.on_error, - inflight_requests: self.inflight_requests, - request_counter: self.request_counter, - request_histogram: self.request_histogram, - static_attributes: self.static_attributes, - } - } - - #[must_use] - pub fn make_metrics_labels( - self, - make_metrics_labels: NewMakeMetricsLabels, - ) -> TraceLayer< - ExtractContext, - InjectContext, - MakeSpanBuilder, - NewMakeMetricsLabels, - OnResponse, - OnError, - > { - TraceLayer { - tracer: self.tracer, - extract_context: self.extract_context, - inject_context: self.inject_context, - make_span_builder: self.make_span_builder, - make_metrics_labels, - on_response: self.on_response, - on_error: self.on_error, - inflight_requests: self.inflight_requests, - request_counter: self.request_counter, - request_histogram: self.request_histogram, - static_attributes: self.static_attributes, - } - } - - #[must_use] - pub fn on_response( - self, - on_response: NewOnResponse, - ) -> TraceLayer< - ExtractContext, - InjectContext, - MakeSpanBuilder, - MakeMetricsLabels, - NewOnResponse, - OnError, - > { - TraceLayer { - tracer: self.tracer, - extract_context: self.extract_context, - inject_context: self.inject_context, - make_span_builder: self.make_span_builder, - make_metrics_labels: self.make_metrics_labels, - on_response, - on_error: self.on_error, - inflight_requests: self.inflight_requests, - request_counter: self.request_counter, - request_histogram: self.request_histogram, - static_attributes: self.static_attributes, - } - } - - #[must_use] - pub fn on_error( - self, - on_error: NewOnError, - ) -> TraceLayer< - ExtractContext, - InjectContext, - MakeSpanBuilder, - MakeMetricsLabels, - OnResponse, - NewOnError, - > { - TraceLayer { - tracer: self.tracer, - extract_context: self.extract_context, - inject_context: self.inject_context, - make_span_builder: self.make_span_builder, - make_metrics_labels: self.make_metrics_labels, - on_response: self.on_response, - on_error, - inflight_requests: self.inflight_requests, - request_counter: self.request_counter, - request_histogram: self.request_histogram, - static_attributes: self.static_attributes, - } - } -} - -impl - Layer - for TraceLayer< - ExtractContext, - InjectContext, - MakeSpanBuilder, - MakeMetricsLabels, - OnResponse, - OnError, - > -where - ExtractContext: Clone, - InjectContext: Clone, - MakeSpanBuilder: Clone, - MakeMetricsLabels: Clone, - OnResponse: Clone, - OnError: Clone, -{ - type Service = Trace< - ExtractContext, - InjectContext, - MakeSpanBuilder, - MakeMetricsLabels, - OnResponse, - OnError, - S, - >; - - fn layer(&self, inner: S) -> Self::Service { - Trace::new( - inner, - self.tracer.clone(), - self.extract_context.clone(), - self.inject_context.clone(), - self.make_span_builder.clone(), - self.make_metrics_labels.clone(), - self.on_response.clone(), - self.on_error.clone(), - self.inflight_requests.clone(), - self.request_counter.clone(), - self.request_histogram.clone(), - self.static_attributes.clone(), - ) - } -} diff --git a/crates/http/src/layers/otel/make_metrics_labels.rs b/crates/http/src/layers/otel/make_metrics_labels.rs deleted file mode 100644 index c7dbaf06..00000000 --- a/crates/http/src/layers/otel/make_metrics_labels.rs +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2022 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#[cfg(feature = "axum")] -use std::borrow::Cow; - -use http::Request; -use opentelemetry::KeyValue; - -use super::utils::http_method_str; - -pub trait MakeMetricsLabels { - fn make_metrics_labels(&self, request: &R) -> Vec; -} - -#[derive(Debug, Clone, Copy, Default)] -pub struct DefaultMakeMetricsLabels; - -impl MakeMetricsLabels for DefaultMakeMetricsLabels { - fn make_metrics_labels(&self, _request: &R) -> Vec { - Vec::new() - } -} - -#[derive(Debug, Clone, Copy, Default)] -pub struct MetricsLabelsFromHttpRequest; - -impl MakeMetricsLabels> for MetricsLabelsFromHttpRequest { - fn make_metrics_labels(&self, request: &Request) -> Vec { - vec![KeyValue::new("method", http_method_str(request.method()))] - } -} - -#[cfg(feature = "axum")] -#[derive(Debug, Clone, Copy, Default)] -pub struct MetricsLabelsFromAxumRequest; - -#[cfg(feature = "axum")] -impl MakeMetricsLabels> for MetricsLabelsFromAxumRequest { - fn make_metrics_labels(&self, request: &Request) -> Vec { - let path: Cow<'static, str> = request - .extensions() - .get::() - .map_or("FALLBACK".into(), |path| path.as_str().to_owned().into()); - - vec![ - KeyValue::new("method", http_method_str(request.method())), - KeyValue::new("route", path), - ] - } -} diff --git a/crates/http/src/layers/otel/make_span_builder.rs b/crates/http/src/layers/otel/make_span_builder.rs deleted file mode 100644 index 5e057321..00000000 --- a/crates/http/src/layers/otel/make_span_builder.rs +++ /dev/null @@ -1,235 +0,0 @@ -// Copyright 2022 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#[cfg(any(feature = "axum", feature = "aws-sdk"))] -use std::borrow::Cow; - -#[cfg(feature = "axum")] -use axum::extract::{ConnectInfo, MatchedPath}; -use headers::{ContentLength, HeaderMapExt, Host, UserAgent}; -use http::Request; -#[cfg(feature = "client")] -use hyper::client::connect::dns::Name; -use opentelemetry::trace::{SpanBuilder, SpanKind}; -use opentelemetry_semantic_conventions::trace as SC; - -use super::utils::{http_flavor, http_method_str}; - -pub trait MakeSpanBuilder { - fn make_span_builder(&self, request: &R) -> SpanBuilder; -} - -#[derive(Debug, Clone, Copy)] -pub struct DefaultMakeSpanBuilder { - operation: &'static str, -} - -impl DefaultMakeSpanBuilder { - #[must_use] - pub fn new(operation: &'static str) -> Self { - Self { operation } - } -} - -impl Default for DefaultMakeSpanBuilder { - fn default() -> Self { - Self { - operation: "service", - } - } -} - -impl MakeSpanBuilder for DefaultMakeSpanBuilder { - fn make_span_builder(&self, _request: &R) -> SpanBuilder { - SpanBuilder::from_name(self.operation) - } -} - -#[derive(Debug, Clone)] -pub struct SpanFromHttpRequest { - operation: &'static str, - span_kind: SpanKind, -} - -impl SpanFromHttpRequest { - #[must_use] - pub fn server() -> Self { - Self { - operation: "http-server", - span_kind: SpanKind::Server, - } - } - - #[must_use] - pub fn inner_client() -> Self { - Self { - operation: "http-client", - span_kind: SpanKind::Client, - } - } - - #[must_use] - pub fn client(operation: &'static str) -> Self { - Self { - operation, - span_kind: SpanKind::Client, - } - } -} - -impl MakeSpanBuilder> for SpanFromHttpRequest { - fn make_span_builder(&self, request: &Request) -> SpanBuilder { - let mut attributes = vec![ - SC::HTTP_METHOD.string(http_method_str(request.method())), - SC::HTTP_FLAVOR.string(http_flavor(request.version())), - SC::HTTP_TARGET.string(request.uri().to_string()), - ]; - - let headers = request.headers(); - - if let Some(host) = headers.typed_get::() { - attributes.push(SC::HTTP_HOST.string(host.to_string())); - } - - if let Some(user_agent) = headers.typed_get::() { - attributes.push(SC::HTTP_USER_AGENT.string(user_agent.to_string())); - } - - if let Some(ContentLength(content_length)) = headers.typed_get() { - if let Ok(content_length) = content_length.try_into() { - attributes.push(SC::HTTP_REQUEST_CONTENT_LENGTH.i64(content_length)); - } - } - - SpanBuilder::from_name(self.operation) - .with_kind(self.span_kind.clone()) - .with_attributes(attributes) - } -} - -#[cfg(feature = "axum")] -#[derive(Debug, Clone)] -pub struct SpanFromAxumRequest; - -#[cfg(feature = "axum")] -impl MakeSpanBuilder> for SpanFromAxumRequest { - fn make_span_builder(&self, request: &Request) -> SpanBuilder { - let (name, route): (String, Cow<'static, str>) = - if let Some(path) = request.extensions().get::() { - let path = path.as_str().to_owned(); - let name = path.clone(); - (name, path.into()) - } else { - (request.uri().path().to_owned(), Cow::Borrowed("FALLBACK")) - }; - - let mut attributes = vec![ - SC::HTTP_METHOD.string(http_method_str(request.method())), - SC::HTTP_FLAVOR.string(http_flavor(request.version())), - SC::HTTP_TARGET.string(request.uri().to_string()), - SC::HTTP_ROUTE.string(route), - ]; - - let headers = request.headers(); - - if let Some(host) = headers.typed_get::() { - attributes.push(SC::HTTP_HOST.string(host.to_string())); - } - - if let Some(user_agent) = headers.typed_get::() { - attributes.push(SC::HTTP_USER_AGENT.string(user_agent.to_string())); - } - - if let Some(ContentLength(content_length)) = headers.typed_get() { - if let Ok(content_length) = content_length.try_into() { - attributes.push(SC::HTTP_REQUEST_CONTENT_LENGTH.i64(content_length)); - } - } - - if let Some(ConnectInfo(addr)) = request - .extensions() - .get::>() - { - attributes.push(SC::NET_TRANSPORT.string("ip_tcp")); - attributes.push(SC::NET_PEER_IP.string(addr.ip().to_string())); - attributes.push(SC::NET_PEER_PORT.i64(addr.port().into())); - } - - SpanBuilder::from_name(name) - .with_kind(SpanKind::Server) - .with_attributes(attributes) - } -} - -#[cfg(feature = "client")] -#[derive(Debug, Clone, Copy, Default)] -pub struct SpanFromDnsRequest; - -#[cfg(feature = "client")] -impl MakeSpanBuilder for SpanFromDnsRequest { - fn make_span_builder(&self, request: &Name) -> SpanBuilder { - let attributes = vec![SC::NET_HOST_NAME.string(request.as_str().to_owned())]; - - SpanBuilder::from_name("resolve") - .with_kind(SpanKind::Client) - .with_attributes(attributes) - } -} - -#[cfg(feature = "aws-sdk")] -#[derive(Debug, Clone, Copy, Default)] -pub struct SpanFromAwsRequest; - -#[cfg(feature = "aws-sdk")] -impl MakeSpanBuilder for SpanFromAwsRequest { - fn make_span_builder(&self, request: &aws_smithy_http::operation::Request) -> SpanBuilder { - let properties = request.properties(); - let request = request.http(); - let mut attributes = vec![ - SC::RPC_SYSTEM.string("aws-api"), - SC::HTTP_METHOD.string(http_method_str(request.method())), - SC::HTTP_FLAVOR.string(http_flavor(request.version())), - SC::HTTP_TARGET.string(request.uri().to_string()), - ]; - - let mut name = Cow::Borrowed("aws_sdk"); - if let Some(metadata) = properties.get::() { - attributes.push(SC::RPC_SERVICE.string(metadata.service().to_owned())); - attributes.push(SC::RPC_METHOD.string(metadata.name().to_owned())); - name = Cow::Owned(metadata.name().to_owned()); - } else if let Some(service) = properties.get::() { - attributes.push(SC::RPC_SERVICE.string(service.as_ref().to_owned())); - } - - let headers = request.headers(); - - if let Some(host) = headers.typed_get::() { - attributes.push(SC::HTTP_HOST.string(host.to_string())); - } - - if let Some(user_agent) = headers.typed_get::() { - attributes.push(SC::HTTP_USER_AGENT.string(user_agent.to_string())); - } - - if let Some(ContentLength(content_length)) = headers.typed_get() { - if let Ok(content_length) = content_length.try_into() { - attributes.push(SC::HTTP_REQUEST_CONTENT_LENGTH.i64(content_length)); - } - } - - SpanBuilder::from_name(name) - .with_kind(SpanKind::Client) - .with_attributes(attributes) - } -} diff --git a/crates/http/src/layers/otel/mod.rs b/crates/http/src/layers/otel/mod.rs deleted file mode 100644 index 5054670b..00000000 --- a/crates/http/src/layers/otel/mod.rs +++ /dev/null @@ -1,197 +0,0 @@ -// Copyright 2022 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -mod extract_context; -mod inject_context; -mod layer; -mod make_metrics_labels; -mod make_span_builder; -mod on_error; -mod on_response; -mod service; -mod utils; - -pub type TraceHttpServerLayer = TraceLayer< - ExtractFromHttpRequest, - DefaultInjectContext, - SpanFromHttpRequest, - MetricsLabelsFromHttpRequest, - OnHttpResponse, - DefaultOnError, ->; - -pub type TraceHttpServer = Trace< - ExtractFromHttpRequest, - DefaultInjectContext, - SpanFromHttpRequest, - MetricsLabelsFromHttpRequest, - OnHttpResponse, - DefaultOnError, - S, ->; - -#[cfg(feature = "axum")] -pub type TraceAxumServerLayer = TraceLayer< - ExtractFromHttpRequest, - DefaultInjectContext, - SpanFromAxumRequest, - MetricsLabelsFromAxumRequest, - OnHttpResponse, - DefaultOnError, ->; - -#[cfg(feature = "axum")] -pub type TraceAxumServer = Trace< - ExtractFromHttpRequest, - DefaultInjectContext, - SpanFromAxumRequest, - MetricsLabelsFromAxumRequest, - OnHttpResponse, - DefaultOnError, - S, ->; - -pub type TraceHttpClientLayer = TraceLayer< - DefaultExtractContext, - InjectInHttpRequest, - SpanFromHttpRequest, - MetricsLabelsFromHttpRequest, - OnHttpResponse, - DefaultOnError, ->; - -pub type TraceHttpClient = Trace< - DefaultExtractContext, - InjectInHttpRequest, - SpanFromHttpRequest, - MetricsLabelsFromHttpRequest, - OnHttpResponse, - DefaultOnError, - S, ->; - -#[cfg(feature = "client")] -pub type TraceDnsLayer = TraceLayer< - DefaultExtractContext, - DefaultInjectContext, - SpanFromDnsRequest, - DefaultMakeMetricsLabels, - DefaultOnResponse, - DefaultOnError, ->; - -#[cfg(feature = "client")] -pub type TraceDns = Trace< - DefaultExtractContext, - DefaultInjectContext, - SpanFromDnsRequest, - DefaultMakeMetricsLabels, - DefaultOnResponse, - DefaultOnError, - S, ->; - -#[cfg(feature = "aws-sdk")] -pub type TraceAwsSdkClientLayer = TraceLayer< - DefaultExtractContext, - InjectInAwsRequest, - SpanFromAwsRequest, - DefaultMakeMetricsLabels, - OnAwsResponse, - DebugOnError, ->; - -#[cfg(feature = "aws-sdk")] -pub type TraceAwsSdkClient = Trace< - DefaultExtractContext, - InjectInAwsRequest, - SpanFromAwsRequest, - DefaultMakeMetricsLabels, - OnAwsResponse, - DebugOnError, - S, ->; - -impl TraceHttpServerLayer { - #[must_use] - pub fn http_server() -> Self { - TraceLayer::with_namespace("http_server") - .make_span_builder(SpanFromHttpRequest::server()) - .make_metrics_labels(MetricsLabelsFromHttpRequest::default()) - .on_response(OnHttpResponse) - .extract_context(ExtractFromHttpRequest) - } -} - -#[cfg(feature = "axum")] -impl TraceAxumServerLayer { - #[must_use] - pub fn axum() -> Self { - TraceLayer::with_namespace("http_server") - .make_span_builder(SpanFromAxumRequest) - .make_metrics_labels(MetricsLabelsFromAxumRequest::default()) - .on_response(OnHttpResponse) - .extract_context(ExtractFromHttpRequest) - } -} - -impl TraceHttpClientLayer { - #[must_use] - pub fn http_client(operation: &'static str) -> Self { - TraceLayer::with_namespace("http_client") - .make_span_builder(SpanFromHttpRequest::client(operation)) - .make_metrics_labels(MetricsLabelsFromHttpRequest::default()) - .on_response(OnHttpResponse) - .inject_context(InjectInHttpRequest) - } - - #[must_use] - pub fn inner_http_client() -> Self { - TraceLayer::with_namespace("inner_http_client") - .make_span_builder(SpanFromHttpRequest::inner_client()) - .make_metrics_labels(MetricsLabelsFromHttpRequest::default()) - .on_response(OnHttpResponse) - .inject_context(InjectInHttpRequest) - } -} - -#[cfg(feature = "client")] -impl TraceDnsLayer { - #[must_use] - pub fn dns() -> Self { - TraceLayer::with_namespace("dns").make_span_builder(SpanFromDnsRequest) - } -} - -#[cfg(feature = "aws-sdk")] -impl TraceAwsSdkClientLayer { - #[must_use] - pub fn aws_sdk() -> Self { - TraceLayer::with_namespace("aws_sdk") - .make_span_builder(SpanFromAwsRequest) - .on_response(OnAwsResponse) - .on_error(DebugOnError) - .inject_context(InjectInAwsRequest) - } -} - -#[cfg(feature = "client")] -use self::make_metrics_labels::DefaultMakeMetricsLabels; -#[cfg(feature = "axum")] -use self::make_metrics_labels::MetricsLabelsFromAxumRequest; -use self::make_metrics_labels::MetricsLabelsFromHttpRequest; -pub use self::{ - extract_context::*, inject_context::*, layer::*, make_span_builder::*, on_error::*, - on_response::*, service::*, -}; diff --git a/crates/http/src/layers/otel/on_error.rs b/crates/http/src/layers/otel/on_error.rs deleted file mode 100644 index 8bd70d43..00000000 --- a/crates/http/src/layers/otel/on_error.rs +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2022 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use opentelemetry::{trace::SpanRef, KeyValue}; -use opentelemetry_semantic_conventions::trace::EXCEPTION_MESSAGE; - -pub trait OnError { - fn on_error(&self, span: &SpanRef<'_>, metrics_labels: &mut Vec, err: &E); -} - -#[derive(Debug, Clone, Copy, Default)] -pub struct DefaultOnError; - -impl OnError for DefaultOnError -where - E: std::fmt::Display, -{ - fn on_error(&self, span: &SpanRef<'_>, _metrics_labels: &mut Vec, err: &E) { - let attributes = vec![EXCEPTION_MESSAGE.string(err.to_string())]; - span.add_event("exception".to_owned(), attributes); - } -} - -#[derive(Debug, Clone, Copy, Default)] -pub struct DebugOnError; - -impl OnError for DebugOnError -where - E: std::fmt::Debug, -{ - fn on_error(&self, span: &SpanRef<'_>, _metrics_labels: &mut Vec, err: &E) { - let attributes = vec![EXCEPTION_MESSAGE.string(format!("{err:?}"))]; - span.add_event("exception".to_owned(), attributes); - } -} diff --git a/crates/http/src/layers/otel/on_response.rs b/crates/http/src/layers/otel/on_response.rs deleted file mode 100644 index 0b3bccce..00000000 --- a/crates/http/src/layers/otel/on_response.rs +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright 2022 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use headers::{ContentLength, HeaderMapExt}; -use http::Response; -#[cfg(feature = "client")] -use hyper::client::connect::HttpInfo; -use opentelemetry::{trace::SpanRef, KeyValue}; -use opentelemetry_semantic_conventions::trace as SC; - -pub trait OnResponse { - fn on_response(&self, span: &SpanRef<'_>, metrics_labels: &mut Vec, response: &R); -} - -#[derive(Debug, Clone, Copy, Default)] -pub struct DefaultOnResponse; - -impl OnResponse for DefaultOnResponse { - fn on_response(&self, _span: &SpanRef<'_>, _metrics_labels: &mut Vec, _response: &R) { - } -} - -#[derive(Debug, Clone, Copy, Default)] -pub struct OnHttpResponse; - -impl OnResponse> for OnHttpResponse { - fn on_response( - &self, - span: &SpanRef<'_>, - metrics_labels: &mut Vec, - response: &Response, - ) { - let status_code = i64::from(response.status().as_u16()); - span.set_attribute(SC::HTTP_STATUS_CODE.i64(status_code)); - metrics_labels.push(KeyValue::new("status_code", status_code)); - - if let Some(ContentLength(content_length)) = response.headers().typed_get() { - if let Ok(content_length) = content_length.try_into() { - span.set_attribute(SC::HTTP_RESPONSE_CONTENT_LENGTH.i64(content_length)); - } - } - - #[cfg(feature = "client")] - // Get local and remote address from hyper's HttpInfo injected by the - // HttpConnector - if let Some(info) = response.extensions().get::() { - span.set_attribute(SC::NET_PEER_IP.string(info.remote_addr().ip().to_string())); - span.set_attribute(SC::NET_PEER_PORT.i64(info.remote_addr().port().into())); - span.set_attribute(SC::NET_HOST_IP.string(info.local_addr().ip().to_string())); - span.set_attribute(SC::NET_HOST_PORT.i64(info.local_addr().port().into())); - } - } -} - -#[cfg(feature = "aws-sdk")] -#[derive(Debug, Clone, Copy, Default)] -pub struct OnAwsResponse; - -#[cfg(feature = "aws-sdk")] -impl OnResponse for OnAwsResponse { - fn on_response( - &self, - span: &SpanRef<'_>, - metrics_labels: &mut Vec, - response: &aws_smithy_http::operation::Response, - ) { - let response = response.http(); - let status_code = i64::from(response.status().as_u16()); - span.set_attribute(SC::HTTP_STATUS_CODE.i64(status_code)); - metrics_labels.push(KeyValue::new("status_code", status_code)); - - if let Some(ContentLength(content_length)) = response.headers().typed_get() { - if let Ok(content_length) = content_length.try_into() { - span.set_attribute(SC::HTTP_RESPONSE_CONTENT_LENGTH.i64(content_length)); - } - } - - #[cfg(feature = "client")] - // Get local and remote address from hyper's HttpInfo injected by the - // HttpConnector - if let Some(info) = response.extensions().get::() { - span.set_attribute(SC::NET_PEER_IP.string(info.remote_addr().ip().to_string())); - span.set_attribute(SC::NET_PEER_PORT.i64(info.remote_addr().port().into())); - span.set_attribute(SC::NET_HOST_IP.string(info.local_addr().ip().to_string())); - span.set_attribute(SC::NET_HOST_PORT.i64(info.local_addr().port().into())); - } - } -} diff --git a/crates/http/src/layers/otel/service.rs b/crates/http/src/layers/otel/service.rs deleted file mode 100644 index 8dc49d6c..00000000 --- a/crates/http/src/layers/otel/service.rs +++ /dev/null @@ -1,208 +0,0 @@ -// Copyright 2022 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::{sync::Arc, task::Poll, time::SystemTime}; - -use futures_util::{future::BoxFuture, FutureExt as _}; -use opentelemetry::{ - metrics::{Counter, Histogram, UpDownCounter}, - trace::{FutureExt as _, TraceContextExt}, - Context, KeyValue, -}; -use tower::Service; - -use super::{ - extract_context::ExtractContext, inject_context::InjectContext, - make_metrics_labels::MakeMetricsLabels, make_span_builder::MakeSpanBuilder, on_error::OnError, - on_response::OnResponse, -}; - -#[derive(Debug, Clone)] -pub struct Trace< - ExtractContext, - InjectContext, - MakeSpanBuilder, - MakeMetricsLabels, - OnResponse, - OnError, - S, -> { - inner: S, - tracer: Arc, - extract_context: ExtractContext, - inject_context: InjectContext, - make_span_builder: MakeSpanBuilder, - make_metrics_labels: MakeMetricsLabels, - on_response: OnResponse, - on_error: OnError, - - inflight_requests: UpDownCounter, - request_counter: Counter, - request_histogram: Histogram, - static_attributes: Vec, -} - -impl - Trace -{ - #[allow(clippy::too_many_arguments)] - pub fn new( - service: S, - tracer: Arc, - extract_context: ExtractContext, - inject_context: InjectContext, - make_span_builder: MakeSpanBuilder, - make_metrics_labels: MakeMetricsLabels, - on_response: OnResponse, - on_error: OnError, - inflight_requests: UpDownCounter, - request_counter: Counter, - request_histogram: Histogram, - static_attributes: Vec, - ) -> Self { - Self { - inner: service, - tracer, - - extract_context, - inject_context, - make_span_builder, - make_metrics_labels, - on_response, - on_error, - - inflight_requests, - request_counter, - request_histogram, - static_attributes, - } - } -} - -struct InFlightGuard { - context: Context, - meter: UpDownCounter, - attributes: Vec, -} - -impl InFlightGuard { - fn increment(context: &Context, meter: &UpDownCounter, attributes: &[KeyValue]) -> Self { - meter.add(context, 1, attributes); - Self { - context: context.clone(), - meter: meter.clone(), - attributes: attributes.to_vec(), - } - } -} - -impl Drop for InFlightGuard { - fn drop(&mut self) { - self.meter.add(&self.context, -1, &self.attributes); - } -} - -impl< - Req, - S, - ExtractContextT, - InjectContextT, - MakeSpanBuilderT, - MakeMetricsLabelsT, - OnResponseT, - OnErrorT, - > Service - for Trace< - ExtractContextT, - InjectContextT, - MakeSpanBuilderT, - MakeMetricsLabelsT, - OnResponseT, - OnErrorT, - S, - > -where - ExtractContextT: ExtractContext + Send, - InjectContextT: InjectContext + Send, - S: Service + Send, - OnResponseT: OnResponse + Send + Clone + 'static, - OnErrorT: OnError + Send + Clone + 'static, - MakeSpanBuilderT: MakeSpanBuilder + Send, - MakeMetricsLabelsT: MakeMetricsLabels + Send, - S::Future: Send + 'static, -{ - type Response = S::Response; - type Error = S::Error; - type Future = BoxFuture<'static, Result>; - - fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { - self.inner.poll_ready(cx) - } - - fn call(&mut self, request: Req) -> Self::Future { - let request_counter = self.request_counter.clone(); - let request_histogram = self.request_histogram.clone(); - let start_time = SystemTime::now(); - - let cx = self.extract_context.extract_context(&request); - let mut span_builder = self.make_span_builder.make_span_builder(&request); - let mut metrics_labels = self.make_metrics_labels.make_metrics_labels(&request); - - // Add the static attributes to the metrics and the span - metrics_labels.extend_from_slice(&self.static_attributes[..]); - let mut span_attributes = span_builder.attributes.unwrap_or_default(); - span_attributes.extend(self.static_attributes.iter().cloned()); - span_builder.attributes = Some(span_attributes); - - let span = span_builder.start_with_context(self.tracer.as_ref(), &cx); - - let cx = cx.with_span(span); - let request = self.inject_context.inject_context(&cx, request); - - let guard = InFlightGuard::increment(&cx, &self.inflight_requests, &metrics_labels); - - let on_response = self.on_response.clone(); - let on_error = self.on_error.clone(); - let attachment = cx.clone().attach(); - let ret = self - .inner - .call(request) - .with_context(cx.clone()) - .inspect(move |r| { - // This ensures the guard gets moved to the future. In case the future panics, - // it will be dropped anyway, ensuring the in-flight counter stays accurate - let _guard = guard; - - let span = cx.span(); - match r { - Ok(response) => on_response.on_response(&span, &mut metrics_labels, response), - Err(err) => on_error.on_error(&span, &mut metrics_labels, err), - }; - - request_counter.add(&cx, 1, &metrics_labels); - request_histogram.record( - &cx, - start_time.elapsed().map_or(0.0, |d| d.as_secs_f64()), - &metrics_labels, - ); - - span.end(); - }) - .boxed(); - - drop(attachment); - - ret - } -} diff --git a/crates/http/src/layers/otel/utils.rs b/crates/http/src/layers/otel/utils.rs deleted file mode 100644 index fc2053df..00000000 --- a/crates/http/src/layers/otel/utils.rs +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2022 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::borrow::Cow; - -use http::{Method, Version}; - -#[inline] -pub(super) fn http_method_str(method: &Method) -> Cow<'static, str> { - match method { - &Method::OPTIONS => "OPTIONS".into(), - &Method::GET => "GET".into(), - &Method::POST => "POST".into(), - &Method::PUT => "PUT".into(), - &Method::DELETE => "DELETE".into(), - &Method::HEAD => "HEAD".into(), - &Method::TRACE => "TRACE".into(), - &Method::CONNECT => "CONNECT".into(), - &Method::PATCH => "PATCH".into(), - other => other.to_string().into(), - } -} - -#[inline] -pub(super) fn http_flavor(version: Version) -> Cow<'static, str> { - match version { - Version::HTTP_09 => "0.9".into(), - Version::HTTP_10 => "1.0".into(), - Version::HTTP_11 => "1.1".into(), - Version::HTTP_2 => "2.0".into(), - Version::HTTP_3 => "3.0".into(), - other => format!("{other:?}").into(), - } -} diff --git a/crates/http/src/lib.rs b/crates/http/src/lib.rs index 81546cc7..83c11c9f 100644 --- a/crates/http/src/lib.rs +++ b/crates/http/src/lib.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! [`tower`] layers and services to help building HTTP client and servers +//! [`tower`] layers and services to help building HTTP client and servers #![forbid(unsafe_code)] #![deny( @@ -47,7 +47,6 @@ pub use self::{ form_urlencoded_request::{self, FormUrlencodedRequest, FormUrlencodedRequestLayer}, json_request::{self, JsonRequest, JsonRequestLayer}, json_response::{self, JsonResponse, JsonResponseLayer}, - otel, }, service::{BoxCloneSyncService, HttpService}, }; diff --git a/crates/tasks/Cargo.toml b/crates/tasks/Cargo.toml index 48cd4043..26114244 100644 --- a/crates/tasks/Cargo.toml +++ b/crates/tasks/Cargo.toml @@ -19,13 +19,15 @@ thiserror = "1.0.40" tower = "0.4.13" tracing = "0.1.37" tracing-opentelemetry = "0.18.0" +opentelemetry = "0.18.0" ulid = "1.0.0" url = "2.3.1" serde = { version = "1.0.159", features = ["derive"] } mas-axum-utils = { path = "../axum-utils" } -mas-storage = { path = "../storage" } -mas-storage-pg = { path = "../storage-pg" } +mas-data-model = { path = "../data-model" } mas-email = { path = "../email" } mas-http = { path = "../http" } -mas-data-model = { path = "../data-model" } +mas-storage = { path = "../storage" } +mas-storage-pg = { path = "../storage-pg" } +mas-tower = { path = "../tower" } diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs index 5fa4d1ef..97a668ef 100644 --- a/crates/tasks/src/database.rs +++ b/crates/tasks/src/database.rs @@ -28,7 +28,7 @@ use chrono::{DateTime, Utc}; use mas_storage::{oauth2::OAuth2AccessTokenRepository, RepositoryAccess}; use tracing::{debug, info}; -use crate::{JobContextExt, State}; +use crate::{utils::metrics_layer, JobContextExt, State}; #[derive(Default, Clone)] pub struct CleanupExpiredTokensJob { @@ -77,6 +77,7 @@ pub(crate) fn register( let worker = WorkerBuilder::new(worker_name) .stream(CronStream::new(schedule).to_stream()) .layer(state.inject()) + .layer(metrics_layer::()) .build_fn(cleanup_expired_tokens); monitor.register(worker) diff --git a/crates/tasks/src/email.rs b/crates/tasks/src/email.rs index fb3cbeaf..d66874af 100644 --- a/crates/tasks/src/email.rs +++ b/crates/tasks/src/email.rs @@ -27,7 +27,10 @@ use mas_storage::job::{JobWithSpanContext, VerifyEmailJob}; use rand::{distributions::Uniform, Rng}; use tracing::info; -use crate::{layers::TracingLayer, JobContextExt, State}; +use crate::{ + utils::{metrics_layer, trace_layer}, + JobContextExt, State, +}; #[tracing::instrument( name = "job.verify_email", @@ -98,7 +101,8 @@ pub(crate) fn register( let worker_name = format!("{job}-{suffix}", job = VerifyEmailJob::NAME); let worker = WorkerBuilder::new(worker_name) .layer(state.inject()) - .layer(TracingLayer::new()) + .layer(trace_layer::()) + .layer(metrics_layer::>()) .with_storage(storage) .build_fn(verify_email); monitor.register(worker) diff --git a/crates/tasks/src/layers.rs b/crates/tasks/src/layers.rs deleted file mode 100644 index 4b54ce97..00000000 --- a/crates/tasks/src/layers.rs +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright 2023 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::task::{Context, Poll}; - -use apalis_core::{job::Job, request::JobRequest}; -use mas_storage::job::JobWithSpanContext; -use tower::{Layer, Service}; -use tracing::{info_span, instrument::Instrumented, Instrument}; -use tracing_opentelemetry::OpenTelemetrySpanExt; - -pub struct TracingLayer; - -impl TracingLayer { - pub fn new() -> Self { - Self - } -} - -impl Layer for TracingLayer { - type Service = TracingService; - - fn layer(&self, inner: S) -> Self::Service { - TracingService { inner } - } -} - -pub struct TracingService { - inner: S, -} - -impl Service>> for TracingService -where - J: Job, - S: Service>>, -{ - type Response = S::Response; - type Error = S::Error; - type Future = Instrumented; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(cx) - } - - fn call(&mut self, req: JobRequest>) -> Self::Future { - let span = info_span!( - "job.run", - job.id = %req.id(), - job.attempts = req.attempts(), - job.name = J::NAME, - ); - - if let Some(context) = req.inner().span_context() { - span.add_link(context); - } - - self.inner.call(req).instrument(span) - } -} diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index a115752a..b3b4d191 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -31,8 +31,8 @@ use tracing::debug; mod database; mod email; -mod layers; mod matrix; +mod utils; pub use self::matrix::HomeserverConnection; @@ -103,15 +103,12 @@ impl State { &self.homeserver } - pub async fn http_client( - &self, - operation: &'static str, - ) -> Result>, ClientInitError> + pub async fn http_client(&self) -> Result>, ClientInitError> where B: mas_axum_utils::axum::body::HttpBody + Send, B::Data: Send, { - self.http_client_factory.client(operation).await + self.http_client_factory.client().await } } diff --git a/crates/tasks/src/matrix.rs b/crates/tasks/src/matrix.rs index 7f1d68ad..9e4f69c4 100644 --- a/crates/tasks/src/matrix.rs +++ b/crates/tasks/src/matrix.rs @@ -33,10 +33,13 @@ use mas_storage::{ }; use serde::{Deserialize, Serialize}; use tower::{Service, ServiceExt}; -use tracing::info; +use tracing::{info, info_span, Instrument}; use url::Url; -use crate::{layers::TracingLayer, JobContextExt, State}; +use crate::{ + utils::{metrics_layer, trace_layer}, + JobContextExt, State, +}; pub struct HomeserverConnection { homeserver: String, @@ -101,7 +104,7 @@ async fn provision_user( let state = ctx.state(); let matrix = state.matrix_connection(); let mut client = state - .http_client("matrix.provision_user") + .http_client() .await? .request_bytes_to_body() .json_request(); @@ -158,7 +161,12 @@ async fn provision_user( let req = req.body(body).context("Failed to build request")?; - let response = client.ready().await?.call(req).await?; + let response = client + .ready() + .await? + .call(req) + .instrument(info_span!("matrix.provision_user")) + .await?; match response.status() { StatusCode::CREATED => info!(%user.id, %mxid, "User created"), @@ -194,7 +202,7 @@ async fn provision_device( let state = ctx.state(); let matrix = state.matrix_connection(); let mut client = state - .http_client("matrix.provision_device") + .http_client() .await? .request_bytes_to_body() .json_request(); @@ -225,7 +233,12 @@ async fn provision_device( }) .context("Failed to build request")?; - let response = client.ready().await?.call(req).await?; + let response = client + .ready() + .await? + .call(req) + .instrument(info_span!("matrix.create_device")) + .await?; match response.status() { StatusCode::CREATED => { @@ -255,7 +268,7 @@ async fn delete_device( ) -> Result<(), anyhow::Error> { let state = ctx.state(); let matrix = state.matrix_connection(); - let mut client = state.http_client("matrix.delete_device").await?; + let mut client = state.http_client().await?; let mut repo = state.repository().await?; let user = repo @@ -284,7 +297,12 @@ async fn delete_device( .body(EmptyBody::new()) .context("Failed to build request")?; - let response = client.ready().await?.call(req).await?; + let response = client + .ready() + .await? + .call(req) + .instrument(info_span!("matrix.delete_device")) + .await?; match response.status() { StatusCode::OK => info!(%user.id, %mxid, "Device deleted"), @@ -303,7 +321,8 @@ pub(crate) fn register( let worker_name = format!("{job}-{suffix}", job = ProvisionUserJob::NAME); let provision_user_worker = WorkerBuilder::new(worker_name) .layer(state.inject()) - .layer(TracingLayer::new()) + .layer(trace_layer()) + .layer(metrics_layer::>()) .with_storage(storage) .build_fn(provision_user); @@ -311,7 +330,8 @@ pub(crate) fn register( let worker_name = format!("{job}-{suffix}", job = ProvisionDeviceJob::NAME); let provision_device_worker = WorkerBuilder::new(worker_name) .layer(state.inject()) - .layer(TracingLayer::new()) + .layer(trace_layer()) + .layer(metrics_layer::>()) .with_storage(storage) .build_fn(provision_device); @@ -319,7 +339,8 @@ pub(crate) fn register( let worker_name = format!("{job}-{suffix}", job = DeleteDeviceJob::NAME); let delete_device_worker = WorkerBuilder::new(worker_name) .layer(state.inject()) - .layer(TracingLayer::new()) + .layer(trace_layer()) + .layer(metrics_layer::>()) .with_storage(storage) .build_fn(delete_device); diff --git a/crates/tasks/src/utils.rs b/crates/tasks/src/utils.rs new file mode 100644 index 00000000..6f071c45 --- /dev/null +++ b/crates/tasks/src/utils.rs @@ -0,0 +1,79 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use apalis_core::{job::Job, request::JobRequest}; +use mas_storage::job::JobWithSpanContext; +use mas_tower::{ + make_span_fn, DurationRecorderLayer, FnWrapper, InFlightCounterLayer, TraceLayer, KV, +}; +use opentelemetry::{Key, KeyValue}; +use tracing::info_span; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +const JOB_NAME: Key = Key::from_static_str("job.name"); +const JOB_STATUS: Key = Key::from_static_str("job.status"); + +fn make_span_for_job_request(req: &JobRequest>) -> tracing::Span +where + J: Job, +{ + let span = info_span!( + "job.run", + "otel.kind" = "consumer", + "otel.status_code" = tracing::field::Empty, + "job.id" = %req.id(), + "job.attempts" = req.attempts(), + "job.name" = J::NAME, + ); + + if let Some(context) = req.inner().span_context() { + span.add_link(context); + } + + span +} + +type TraceLayerForJob = TraceLayer< + FnWrapper>) -> tracing::Span>, + KV<&'static str>, + KV<&'static str>, +>; + +pub(crate) fn trace_layer() -> TraceLayerForJob +where + J: Job, +{ + TraceLayer::new(make_span_fn( + make_span_for_job_request:: as fn(&JobRequest>) -> tracing::Span, + )) + .on_response(KV("otel.status_code", "OK")) + .on_error(KV("otel.status_code", "ERROR")) +} + +pub(crate) fn metrics_layer() -> ( + DurationRecorderLayer, + InFlightCounterLayer, +) +where + J: Job, +{ + let duration_recorder = DurationRecorderLayer::new("job.run.duration") + .on_request(JOB_NAME.string(J::NAME)) + .on_response(JOB_STATUS.string("success")) + .on_error(JOB_STATUS.string("error")); + let in_flight_counter = + InFlightCounterLayer::new("job.run.active").on_request(JOB_NAME.string(J::NAME)); + + (duration_recorder, in_flight_counter) +} diff --git a/crates/tower/Cargo.toml b/crates/tower/Cargo.toml new file mode 100644 index 00000000..fac21e91 --- /dev/null +++ b/crates/tower/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "mas-tower" +version = "0.1.0" +authors = ["Quentin Gliech "] +edition = "2021" +license = "Apache-2.0" + +[dependencies] +aws-smithy-http = { version = "0.54.4", optional = true } +http = "0.2.9" +tracing = "0.1.37" +tracing-opentelemetry = "0.18.0" +tower = "0.4.13" +tokio = { version = "1.27.0", features = ["time"] } +opentelemetry = { version = "0.18.0", features = ["metrics"] } +opentelemetry-http = "0.7.0" +pin-project-lite = "0.2.9" + +[features] +aws-sdk = ["dep:aws-smithy-http"] \ No newline at end of file diff --git a/crates/tower/src/lib.rs b/crates/tower/src/lib.rs new file mode 100644 index 00000000..78ab22c5 --- /dev/null +++ b/crates/tower/src/lib.rs @@ -0,0 +1,32 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![deny(clippy::all)] +#![warn(clippy::pedantic)] +#![allow(clippy::module_name_repetitions)] + +mod metrics; +mod trace_context; +mod tracing; +mod utils; + +pub use self::{metrics::*, trace_context::*, tracing::*, utils::*}; + +fn meter() -> opentelemetry::metrics::Meter { + opentelemetry::global::meter_with_version( + env!("CARGO_PKG_NAME"), + Some(env!("CARGO_PKG_VERSION")), + None, + ) +} diff --git a/crates/tower/src/metrics/duration.rs b/crates/tower/src/metrics/duration.rs new file mode 100644 index 00000000..508125c9 --- /dev/null +++ b/crates/tower/src/metrics/duration.rs @@ -0,0 +1,237 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; + +use opentelemetry::{metrics::Histogram, Context, KeyValue}; +use pin_project_lite::pin_project; +use tokio::time::Instant; +use tower::{Layer, Service}; + +use crate::{utils::FnWrapper, MetricsAttributes}; + +/// A [`Layer`] that records the duration of requests in milliseconds. +#[derive(Clone, Debug)] +pub struct DurationRecorderLayer { + histogram: Histogram, + on_request: OnRequest, + on_response: OnResponse, + on_error: OnError, +} + +impl DurationRecorderLayer { + /// Create a new [`DurationRecorderLayer`]. + #[must_use] + pub fn new(name: &'static str) -> Self { + let histogram = crate::meter().u64_histogram(name).init(); + Self { + histogram, + on_request: (), + on_response: (), + on_error: (), + } + } +} + +impl DurationRecorderLayer { + /// Set the [`MetricsAttributes`] to use on request. + #[must_use] + pub fn on_request( + self, + on_request: NewOnRequest, + ) -> DurationRecorderLayer { + DurationRecorderLayer { + histogram: self.histogram, + on_request, + on_response: self.on_response, + on_error: self.on_error, + } + } + + #[must_use] + pub fn on_request_fn( + self, + on_request: F, + ) -> DurationRecorderLayer, OnResponse, OnError> + where + F: Fn(&T) -> Vec, + { + self.on_request(FnWrapper(on_request)) + } + + /// Set the [`MetricsAttributes`] to use on response. + #[must_use] + pub fn on_response( + self, + on_response: NewOnResponse, + ) -> DurationRecorderLayer { + DurationRecorderLayer { + histogram: self.histogram, + on_request: self.on_request, + on_response, + on_error: self.on_error, + } + } + + #[must_use] + pub fn on_response_fn( + self, + on_response: F, + ) -> DurationRecorderLayer, OnError> + where + F: Fn(&T) -> Vec, + { + self.on_response(FnWrapper(on_response)) + } + + /// Set the [`MetricsAttributes`] to use on error. + #[must_use] + pub fn on_error( + self, + on_error: NewOnError, + ) -> DurationRecorderLayer { + DurationRecorderLayer { + histogram: self.histogram, + on_request: self.on_request, + on_response: self.on_response, + on_error, + } + } + + #[must_use] + pub fn on_error_fn( + self, + on_error: F, + ) -> DurationRecorderLayer> + where + F: Fn(&T) -> Vec, + { + self.on_error(FnWrapper(on_error)) + } +} + +impl Layer + for DurationRecorderLayer +where + OnRequest: Clone, + OnResponse: Clone, + OnError: Clone, +{ + type Service = DurationRecorderService; + + fn layer(&self, inner: S) -> Self::Service { + DurationRecorderService { + inner, + histogram: self.histogram.clone(), + on_request: self.on_request.clone(), + on_response: self.on_response.clone(), + on_error: self.on_error.clone(), + } + } +} + +/// A middleware that records the duration of requests in milliseconds. +#[derive(Clone, Debug)] +pub struct DurationRecorderService { + inner: S, + histogram: Histogram, + on_request: OnRequest, + on_response: OnResponse, + on_error: OnError, +} + +pin_project! { + /// The future returned by the [`DurationRecorderService`]. + pub struct DurationRecorderFuture { + #[pin] + inner: F, + + start: Instant, + histogram: Histogram, + attributes_from_request: Vec, + from_response: OnResponse, + from_error: OnError, + } +} + +impl Future for DurationRecorderFuture +where + F: Future>, + OnResponse: MetricsAttributes, + OnError: MetricsAttributes, +{ + type Output = F::Output; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let this = self.project(); + let result = std::task::ready!(this.inner.poll(cx)); + + // Measure the duration of the request. + let duration = this.start.elapsed(); + let duration_ms = duration.as_millis().try_into().unwrap_or(u64::MAX); + + // Collect the attributes from the request, response and error. + let mut attributes = this.attributes_from_request.clone(); + match &result { + Ok(response) => { + attributes.extend(this.from_response.attributes(response)); + } + Err(error) => { + attributes.extend(this.from_error.attributes(error)); + } + } + + this.histogram + .record(&Context::new(), duration_ms, &attributes); + std::task::Poll::Ready(result) + } +} + +impl Service + for DurationRecorderService +where + S: Service, + OnRequest: MetricsAttributes, + OnResponse: MetricsAttributes + Clone, + OnError: MetricsAttributes + Clone, +{ + type Response = S::Response; + type Error = S::Error; + type Future = DurationRecorderFuture; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: R) -> Self::Future { + let start = Instant::now(); + let attributes_from_request = self.on_request.attributes(&request).collect(); + let inner = self.inner.call(request); + + DurationRecorderFuture { + inner, + start, + histogram: self.histogram.clone(), + attributes_from_request, + from_response: self.on_response.clone(), + from_error: self.on_error.clone(), + } + } +} diff --git a/crates/tower/src/metrics/in_flight.rs b/crates/tower/src/metrics/in_flight.rs new file mode 100644 index 00000000..57ed54df --- /dev/null +++ b/crates/tower/src/metrics/in_flight.rs @@ -0,0 +1,168 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; + +use opentelemetry::{ + metrics::{Unit, UpDownCounter}, + Context, KeyValue, +}; +use pin_project_lite::pin_project; +use tower::{Layer, Service}; + +use crate::MetricsAttributes; + +/// A [`Layer`] that records the number of in-flight requests. +/// +/// # Generic Parameters +/// +/// * `OnRequest`: A type that can extract attributes from a request. +#[derive(Clone, Debug)] +pub struct InFlightCounterLayer { + counter: UpDownCounter, + on_request: OnRequest, +} + +impl InFlightCounterLayer { + /// Create a new [`InFlightCounterLayer`]. + #[must_use] + pub fn new(name: &'static str) -> Self { + let counter = crate::meter() + .i64_up_down_counter(name) + .with_unit(Unit::new("ms")) + .with_description("The number of in-flight requests") + .init(); + + Self { + counter, + on_request: (), + } + } +} + +impl InFlightCounterLayer { + /// Set the [`MetricsAttributes`] to use. + #[must_use] + pub fn on_request(self, on_request: OnRequest) -> InFlightCounterLayer { + InFlightCounterLayer { + counter: self.counter, + on_request, + } + } +} + +impl Layer for InFlightCounterLayer +where + OnRequest: Clone, +{ + type Service = InFlightCounterService; + + fn layer(&self, inner: S) -> Self::Service { + InFlightCounterService { + inner, + counter: self.counter.clone(), + on_request: self.on_request.clone(), + } + } +} + +/// A middleware that records the number of in-flight requests. +/// +/// # Generic Parameters +/// +/// * `S`: The type of the inner service. +/// * `OnRequest`: A type that can extract attributes from a request. +#[derive(Clone, Debug)] +pub struct InFlightCounterService { + inner: S, + counter: UpDownCounter, + on_request: OnRequest, +} + +/// A guard that decrements the in-flight request count when dropped. +struct InFlightGuard { + counter: UpDownCounter, + attributes: Vec, +} + +impl InFlightGuard { + fn new(counter: UpDownCounter, attributes: Vec) -> Self { + counter.add(&Context::new(), 1, &attributes); + + Self { + counter, + attributes, + } + } +} + +impl Drop for InFlightGuard { + fn drop(&mut self) { + self.counter.add(&Context::new(), -1, &self.attributes); + } +} + +pin_project! { + /// The future returned by [`InFlightCounterService`] + pub struct InFlightFuture { + guard: InFlightGuard, + + #[pin] + inner: F, + } +} + +impl Future for InFlightFuture +where + F: Future, +{ + type Output = F::Output; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + self.project().inner.poll(cx) + } +} + +impl Service for InFlightCounterService +where + S: Service, + OnRequest: MetricsAttributes, +{ + type Response = S::Response; + type Error = S::Error; + type Future = InFlightFuture; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: R) -> Self::Future { + // Extract attributes from the request. + let attributes = self.on_request.attributes(&req).collect(); + + // Increment the in-flight request count. + let guard = InFlightGuard::new(self.counter.clone(), attributes); + + // Call the inner service, and return a future that decrements the in-flight + // when dropped. + let inner = self.inner.call(req); + InFlightFuture { guard, inner } + } +} diff --git a/crates/tower/src/metrics/make_attributes.rs b/crates/tower/src/metrics/make_attributes.rs new file mode 100644 index 00000000..ede398b1 --- /dev/null +++ b/crates/tower/src/metrics/make_attributes.rs @@ -0,0 +1,154 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use opentelemetry::{KeyValue, Value}; + +use crate::{utils::KV, FnWrapper}; + +/// Make metrics attributes from a type. +pub trait MetricsAttributes { + type Iter<'a>: Iterator + where + Self: 'a, + T: 'a; + + fn attributes<'a>(&'a self, t: &'a T) -> Self::Iter<'a>; +} + +pub fn metrics_attributes_fn(f: F) -> FnWrapper +where + F: Fn(&T) -> Vec + 'static, + T: 'static, +{ + FnWrapper(f) +} + +impl MetricsAttributes for FnWrapper +where + F: Fn(&T) -> Vec + 'static, + T: 'static, +{ + type Iter<'a> = std::vec::IntoIter; + + fn attributes<'a>(&'a self, t: &'a T) -> Self::Iter<'a> { + let values: Vec = self.0(t); + values.into_iter() + } +} + +impl MetricsAttributes for () +where + T: 'static, +{ + type Iter<'a> = std::iter::Empty; + + fn attributes(&self, _t: &T) -> Self::Iter<'_> { + std::iter::empty() + } +} + +impl MetricsAttributes for Vec +where + V: MetricsAttributes + 'static, + T: 'static, +{ + type Iter<'a> = Box + 'a>; + fn attributes<'a>(&'a self, t: &'a T) -> Self::Iter<'_> { + Box::new(self.iter().flat_map(|v| v.attributes(t))) + } +} + +impl MetricsAttributes for KV +where + V: Into + Clone + 'static, + T: 'static, +{ + type Iter<'a> = std::iter::Once; + fn attributes(&self, _t: &T) -> Self::Iter<'_> { + std::iter::once(KeyValue::new(self.0, self.1.clone().into())) + } +} + +impl MetricsAttributes for KeyValue +where + T: 'static, +{ + type Iter<'a> = std::iter::Once; + fn attributes(&self, _t: &T) -> Self::Iter<'_> { + std::iter::once(self.clone()) + } +} + +impl MetricsAttributes for Option +where + V: MetricsAttributes + 'static, + T: 'static, +{ + type Iter<'a> = std::iter::Flatten>>; + + fn attributes<'a>(&'a self, t: &'a T) -> Self::Iter<'_> { + self.as_ref().map(|v| v.attributes(t)).into_iter().flatten() + } +} + +macro_rules! chain_for { + // Sub-macro for reversing the list of types. + (@reverse ($( $reversed:ident ,)*)) => { + chain_for!(@build_chain $($reversed),*) + }; + (@reverse ($($reversed:ident,)*) $head:ident $(, $tail:ident)*) => { + chain_for!(@reverse ($head, $($reversed,)*) $($tail),*) + }; + + // Sub-macro for building the chain of iterators. + (@build_chain $last:ident) => { + $last::Iter<'a> + }; + (@build_chain $head:ident, $($tail:ident),*) => { + std::iter::Chain> + }; + + ($($idents:ident),+) => { + chain_for!(@reverse () $($idents),+) + }; +} + +macro_rules! impl_for_tuple { + ($first:ident $(,$rest:ident)*) => { + impl MetricsAttributes for ($first, $($rest,)*) + where + T: 'static, + $first: MetricsAttributes + 'static, + $($rest: MetricsAttributes + 'static,)* + { + type Iter<'a> = chain_for!($first $(, $rest)*); + fn attributes<'a>(&'a self, t: &'a T) -> Self::Iter<'a> { + #[allow(non_snake_case)] + let (head, $($rest,)*) = self; + head.attributes(t) + $(.chain($rest.attributes(t)))* + } + } + }; +} + +impl_for_tuple!(V1); +impl_for_tuple!(V1, V2); +impl_for_tuple!(V1, V2, V3); +impl_for_tuple!(V1, V2, V3, V4); +impl_for_tuple!(V1, V2, V3, V4, V5); +impl_for_tuple!(V1, V2, V3, V4, V5, V6); +impl_for_tuple!(V1, V2, V3, V4, V5, V6, V7); +impl_for_tuple!(V1, V2, V3, V4, V5, V6, V7, V8); +impl_for_tuple!(V1, V2, V3, V4, V5, V6, V7, V8, V9); diff --git a/crates/tower/src/metrics/mod.rs b/crates/tower/src/metrics/mod.rs new file mode 100644 index 00000000..072855b0 --- /dev/null +++ b/crates/tower/src/metrics/mod.rs @@ -0,0 +1,23 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod duration; +mod in_flight; +mod make_attributes; + +pub use self::{ + duration::{DurationRecorderFuture, DurationRecorderLayer, DurationRecorderService}, + in_flight::{InFlightCounterLayer, InFlightCounterService, InFlightFuture}, + make_attributes::{metrics_attributes_fn, MetricsAttributes}, +}; diff --git a/crates/tower/src/trace_context.rs b/crates/tower/src/trace_context.rs new file mode 100644 index 00000000..b41ae2c3 --- /dev/null +++ b/crates/tower/src/trace_context.rs @@ -0,0 +1,115 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use http::Request; +use opentelemetry::propagation::Injector; +use opentelemetry_http::HeaderInjector; +use tower::{Layer, Service}; +use tracing::Span; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +/// A trait to get an [`Injector`] from a request. +trait AsInjector { + type Injector<'a>: Injector + where + Self: 'a; + + fn as_injector(&mut self) -> Self::Injector<'_>; +} + +impl AsInjector for Request { + type Injector<'a> = HeaderInjector<'a> where Self: 'a; + + fn as_injector(&mut self) -> Self::Injector<'_> { + HeaderInjector(self.headers_mut()) + } +} + +#[cfg(feature = "aws-sdk")] +impl AsInjector for aws_smithy_http::operation::Request { + type Injector<'a> = HeaderInjector<'a> where Self: 'a; + + fn as_injector(&mut self) -> Self::Injector<'_> { + HeaderInjector(self.http_mut().headers_mut()) + } +} + +/// A [`Layer`] that adds a trace context to the request. +#[derive(Debug, Clone, Copy, Default)] +pub struct TraceContextLayer { + _private: (), +} + +impl TraceContextLayer { + /// Create a new [`TraceContextLayer`]. + #[must_use] + pub fn new() -> Self { + Self::default() + } +} + +impl Layer for TraceContextLayer { + type Service = TraceContextService; + + fn layer(&self, inner: S) -> Self::Service { + TraceContextService::new(inner) + } +} + +/// A [`Service`] that adds a trace context to the request. +#[derive(Debug, Clone)] +pub struct TraceContextService { + inner: S, +} + +impl TraceContextService { + /// Create a new [`TraceContextService`]. + pub fn new(inner: S) -> Self { + Self { inner } + } +} + +impl Service for TraceContextService +where + S: Service, + R: AsInjector, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, mut req: R) -> Self::Future { + // Get the `opentelemetry` context out of the `tracing` span. + let context = Span::current().context(); + + // Inject the trace context into the request. The block is there to ensure that + // the injector is dropped before calling the inner service, to avoid borrowing + // issues. + { + let mut injector = req.as_injector(); + opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.inject_context(&context, &mut injector); + }); + } + + self.inner.call(req) + } +} diff --git a/crates/tower/src/tracing/enrich_span.rs b/crates/tower/src/tracing/enrich_span.rs new file mode 100644 index 00000000..732af30b --- /dev/null +++ b/crates/tower/src/tracing/enrich_span.rs @@ -0,0 +1,114 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use tracing::{Span, Value}; + +use crate::utils::{FnWrapper, KV}; + +/// A trait for enriching a span with information a structure. +pub trait EnrichSpan { + fn enrich_span(&self, span: &Span, t: &T); +} + +impl EnrichSpan for FnWrapper +where + F: Fn(&Span, &T), +{ + fn enrich_span(&self, span: &Span, t: &T) { + (self.0)(span, t); + } +} + +/// Enrich span from a function. +#[must_use] +pub fn enrich_span_fn(f: F) -> FnWrapper +where + F: Fn(&Span, &T), +{ + FnWrapper(f) +} + +impl EnrichSpan for () { + fn enrich_span(&self, _span: &Span, _t: &T) {} +} + +impl EnrichSpan for KV +where + V: Value, +{ + fn enrich_span(&self, span: &Span, _t: &T) { + span.record(self.0, &self.1); + } +} + +/// A macro to implement [`EnrichSpan`] for a tuple of types that implement +/// [`EnrichSpan`]. +macro_rules! impl_for_tuple { + ($($T:ident),+) => { + impl EnrichSpan for ($($T,)+) + where + $($T: EnrichSpan),+ + { + fn enrich_span(&self, span: &Span, t: &T) { + #[allow(non_snake_case)] + let ($(ref $T,)+) = *self; + $( + $T.enrich_span(span, t); + )+ + } + } + }; +} + +impl_for_tuple!(T1); +impl_for_tuple!(T1, T2); +impl_for_tuple!(T1, T2, T3); +impl_for_tuple!(T1, T2, T3, T4); +impl_for_tuple!(T1, T2, T3, T4, T5); +impl_for_tuple!(T1, T2, T3, T4, T5, T6); +impl_for_tuple!(T1, T2, T3, T4, T5, T6, T7); +impl_for_tuple!(T1, T2, T3, T4, T5, T6, T7, T8); + +impl EnrichSpan for Option +where + T: EnrichSpan, +{ + fn enrich_span(&self, span: &Span, request: &R) { + if let Some(ref t) = *self { + t.enrich_span(span, request); + } + } +} + +impl EnrichSpan for [T; N] +where + T: EnrichSpan, +{ + fn enrich_span(&self, span: &Span, request: &R) { + for t in self { + t.enrich_span(span, request); + } + } +} + +impl EnrichSpan for Vec +where + T: EnrichSpan, +{ + fn enrich_span(&self, span: &Span, request: &R) { + for t in self { + t.enrich_span(span, request); + } + } +} diff --git a/crates/tower/src/tracing/future.rs b/crates/tower/src/tracing/future.rs new file mode 100644 index 00000000..492190e0 --- /dev/null +++ b/crates/tower/src/tracing/future.rs @@ -0,0 +1,70 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{future::Future, task::ready}; + +use pin_project_lite::pin_project; +use tracing::Span; + +pin_project! { + pub struct TraceFuture { + #[pin] + inner: F, + span: Span, + on_response: OnResponse, + on_error: OnError, + } +} + +impl TraceFuture { + pub fn new(inner: F, span: Span, on_response: OnResponse, on_error: OnError) -> Self { + Self { + inner, + span, + on_response, + on_error, + } + } +} + +impl Future for TraceFuture +where + F: Future>, + OnResponse: super::enrich_span::EnrichSpan, + OnError: super::enrich_span::EnrichSpan, +{ + type Output = Result; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let this = self.project(); + + // Poll the inner future, with the span entered. This is effectively what + // [`tracing::Instrumented`] does. + let result = ready!(this.span.in_scope(|| this.inner.poll(cx))); + + match &result { + Ok(response) => { + this.on_response.enrich_span(this.span, response); + } + Err(error) => { + this.on_error.enrich_span(this.span, error); + } + } + + std::task::Poll::Ready(result) + } +} diff --git a/crates/tower/src/tracing/layer.rs b/crates/tower/src/tracing/layer.rs new file mode 100644 index 00000000..ed57acf8 --- /dev/null +++ b/crates/tower/src/tracing/layer.rs @@ -0,0 +1,105 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use tower::Layer; +use tracing::Span; + +use crate::{enrich_span_fn, make_span_fn, utils::FnWrapper}; + +#[derive(Clone, Debug)] +pub struct TraceLayer { + make_span: MakeSpan, + on_response: OnResponse, + on_error: OnError, +} + +impl TraceLayer> { + #[must_use] + pub fn from_fn(f: F) -> TraceLayer> + where + F: Fn(&T) -> Span, + { + TraceLayer::new(make_span_fn(f)) + } +} + +impl TraceLayer { + #[must_use] + pub fn new(make_span: MakeSpan) -> Self { + Self { + make_span, + on_response: (), + on_error: (), + } + } +} + +impl TraceLayer { + #[must_use] + pub fn on_response( + self, + on_response: NewOnResponse, + ) -> TraceLayer { + TraceLayer { + make_span: self.make_span, + on_response, + on_error: self.on_error, + } + } + + #[must_use] + pub fn on_response_fn(self, f: F) -> TraceLayer, OnError> + where + F: Fn(&Span, &T), + { + self.on_response(enrich_span_fn(f)) + } + + #[must_use] + pub fn on_error( + self, + on_error: NewOnError, + ) -> TraceLayer { + TraceLayer { + make_span: self.make_span, + on_response: self.on_response, + on_error, + } + } + + pub fn on_error_fn(self, f: F) -> TraceLayer> + where + F: Fn(&Span, &E), + { + self.on_error(enrich_span_fn(f)) + } +} + +impl Layer for TraceLayer +where + MakeSpan: Clone, + OnResponse: Clone, + OnError: Clone, +{ + type Service = super::service::TraceService; + + fn layer(&self, inner: S) -> Self::Service { + super::service::TraceService::new( + inner, + self.make_span.clone(), + self.on_response.clone(), + self.on_error.clone(), + ) + } +} diff --git a/crates/tower/src/tracing/make_span.rs b/crates/tower/src/tracing/make_span.rs new file mode 100644 index 00000000..2ebb55f8 --- /dev/null +++ b/crates/tower/src/tracing/make_span.rs @@ -0,0 +1,72 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use tracing::Span; + +use super::enrich_span::EnrichSpan; +use crate::utils::FnWrapper; + +/// A trait for creating a span for a request. +pub trait MakeSpan { + fn make_span(&self, request: &R) -> Span; +} + +impl MakeSpan for FnWrapper +where + F: Fn(&R) -> Span, +{ + fn make_span(&self, request: &R) -> Span { + (self.0)(request) + } +} + +/// Make span from a function. +pub fn make_span_fn(f: F) -> FnWrapper +where + F: Fn(&R) -> Span, +{ + FnWrapper(f) +} + +/// A macro to implement [`MakeSpan`] for a tuple of types, where the first type +/// implements [`MakeSpan`] and the rest implement [`EnrichSpan`]. +macro_rules! impl_for_tuple { + (M, $($T:ident),+) => { + impl MakeSpan for (M, $($T),+) + where + M: MakeSpan, + $($T: EnrichSpan),+ + { + fn make_span(&self, request: &R) -> Span { + #[allow(non_snake_case)] + let (ref m, $(ref $T),+) = *self; + + let span = m.make_span(request); + $( + $T.enrich_span(&span, request); + )+ + span + } + } + }; +} + +impl_for_tuple!(M, T1); +impl_for_tuple!(M, T1, T2); +impl_for_tuple!(M, T1, T2, T3); +impl_for_tuple!(M, T1, T2, T3, T4); +impl_for_tuple!(M, T1, T2, T3, T4, T5); +impl_for_tuple!(M, T1, T2, T3, T4, T5, T6); +impl_for_tuple!(M, T1, T2, T3, T4, T5, T6, T7); +impl_for_tuple!(M, T1, T2, T3, T4, T5, T6, T7, T8); diff --git a/crates/tower/src/tracing/mod.rs b/crates/tower/src/tracing/mod.rs new file mode 100644 index 00000000..4be86517 --- /dev/null +++ b/crates/tower/src/tracing/mod.rs @@ -0,0 +1,27 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod enrich_span; +mod future; +mod layer; +mod make_span; +mod service; + +pub use self::{ + enrich_span::{enrich_span_fn, EnrichSpan}, + future::TraceFuture, + layer::TraceLayer, + make_span::{make_span_fn, MakeSpan}, + service::TraceService, +}; diff --git a/crates/tower/src/tracing/service.rs b/crates/tower/src/tracing/service.rs new file mode 100644 index 00000000..2385e235 --- /dev/null +++ b/crates/tower/src/tracing/service.rs @@ -0,0 +1,67 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use tower::Service; + +use super::future::TraceFuture; + +#[derive(Clone, Debug)] +pub struct TraceService { + inner: S, + make_span: MakeSpan, + on_response: OnResponse, + on_error: OnError, +} + +impl TraceService { + /// Create a new [`TraceService`]. + #[must_use] + pub fn new(inner: S, make_span: MakeSpan, on_response: OnResponse, on_error: OnError) -> Self { + Self { + inner, + make_span, + on_response, + on_error, + } + } +} + +impl Service + for TraceService +where + S: Service, + MakeSpan: super::make_span::MakeSpan, + OnResponse: super::enrich_span::EnrichSpan + Clone, + OnError: super::enrich_span::EnrichSpan + Clone, +{ + type Response = S::Response; + type Error = S::Error; + type Future = TraceFuture; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: R) -> Self::Future { + let span = self.make_span.make_span(&request); + let guard = span.enter(); + let inner = self.inner.call(request); + drop(guard); + + TraceFuture::new(inner, span, self.on_response.clone(), self.on_error.clone()) + } +} diff --git a/crates/tower/src/utils.rs b/crates/tower/src/utils.rs new file mode 100644 index 00000000..91116244 --- /dev/null +++ b/crates/tower/src/utils.rs @@ -0,0 +1,33 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use opentelemetry::{KeyValue, Value}; + +/// A simple static key-value pair. +#[derive(Clone, Debug)] +pub struct KV(pub &'static str, pub V); + +impl From> for KeyValue +where + V: Into, +{ + fn from(value: KV) -> Self { + Self::new(value.0, value.1.into()) + } +} + +/// A wrapper around a function that can be used to generate a key-value pair, +/// make or enrich spans. +#[derive(Clone, Debug)] +pub struct FnWrapper(pub F);