diff --git a/Cargo.lock b/Cargo.lock index 05d39e43..c780b29a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3039,12 +3039,13 @@ dependencies = [ "mas-templates", "mas-tower", "oauth2-types", - "opentelemetry 0.20.0", - "opentelemetry-http 0.9.0", + "opentelemetry", + "opentelemetry-http", "opentelemetry-jaeger", "opentelemetry-otlp", "opentelemetry-prometheus", - "opentelemetry-semantic-conventions 0.12.0", + "opentelemetry-semantic-conventions", + "opentelemetry-stdout", "opentelemetry-zipkin", "prometheus", "rand 0.8.5", @@ -3192,7 +3193,8 @@ dependencies = [ "mas-templates", "mime", "oauth2-types", - "opentelemetry 0.20.0", + "opentelemetry", + "opentelemetry-semantic-conventions", "pbkdf2", "rand 0.8.5", "rand_chacha 0.3.1", @@ -3231,7 +3233,7 @@ dependencies = [ "hyper-rustls", "mas-tower", "once_cell", - "opentelemetry 0.20.0", + "opentelemetry", "rustls", "rustls-native-certs", "serde", @@ -3472,7 +3474,7 @@ dependencies = [ "mas-iana", "mas-jose", "oauth2-types", - "opentelemetry 0.20.0", + "opentelemetry", "rand_core 0.6.4", "serde", "serde_json", @@ -3527,7 +3529,7 @@ dependencies = [ "mas-storage", "mas-storage-pg", "mas-tower", - "opentelemetry 0.20.0", + "opentelemetry", "rand 0.8.5", "rand_chacha 0.3.1", "serde", @@ -3572,8 +3574,9 @@ version = "0.1.0" dependencies = [ "aws-smithy-http", "http", - "opentelemetry 0.20.0", - "opentelemetry-http 0.9.0", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-semantic-conventions", "pin-project-lite", "tokio", "tower", @@ -3876,36 +3879,14 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" -[[package]] -name = "opentelemetry" -version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4b8347cc26099d3aeee044065ecc3ae11469796b4d65d065a23a584ed92a6f" -dependencies = [ - "opentelemetry_api 0.19.0", - "opentelemetry_sdk 0.19.0", -] - [[package]] name = "opentelemetry" version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9591d937bc0e6d2feb6f71a559540ab300ea49955229c347a517a28d27784c54" dependencies = [ - "opentelemetry_api 0.20.0", - "opentelemetry_sdk 0.20.0", -] - -[[package]] -name = "opentelemetry-http" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a819b71d6530c4297b49b3cae2939ab3a8cc1b9f382826a1bc29dd0ca3864906" -dependencies = [ - "async-trait", - "bytes 1.4.0", - "http", - "opentelemetry_api 0.19.0", + "opentelemetry_api", + "opentelemetry_sdk", ] [[package]] @@ -3918,7 +3899,7 @@ dependencies = [ "bytes 1.4.0", "http", "hyper", - "opentelemetry_api 0.20.0", + "opentelemetry_api", "tokio", ] @@ -3932,9 +3913,9 @@ dependencies = [ "futures-core", "futures-util", "http", - "opentelemetry 0.20.0", - "opentelemetry-http 0.9.0", - "opentelemetry-semantic-conventions 0.12.0", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-semantic-conventions", "thrift", "tokio", ] @@ -3949,9 +3930,9 @@ dependencies = [ "futures-core", "http", "opentelemetry-proto", - "opentelemetry-semantic-conventions 0.12.0", - "opentelemetry_api 0.20.0", - "opentelemetry_sdk 0.20.0", + "opentelemetry-semantic-conventions", + "opentelemetry_api", + "opentelemetry_sdk", "prost", "thiserror", "tokio", @@ -3965,8 +3946,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7d81bc254e2d572120363a2b16cdb0d715d301b5789be0cfc26ad87e4e10e53" dependencies = [ "once_cell", - "opentelemetry_api 0.20.0", - "opentelemetry_sdk 0.20.0", + "opentelemetry_api", + "opentelemetry_sdk", "prometheus", "protobuf", ] @@ -3977,64 +3958,55 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1e3f814aa9f8c905d0ee4bde026afd3b2577a97c10e1699912e3e44f0c4cbeb" dependencies = [ - "opentelemetry_api 0.20.0", - "opentelemetry_sdk 0.20.0", + "opentelemetry_api", + "opentelemetry_sdk", "prost", "tonic", ] -[[package]] -name = "opentelemetry-semantic-conventions" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24e33428e6bf08c6f7fcea4ddb8e358fab0fe48ab877a87c70c6ebe20f673ce5" -dependencies = [ - "opentelemetry 0.19.0", -] - [[package]] name = "opentelemetry-semantic-conventions" version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73c9f9340ad135068800e7f1b24e9e09ed9e7143f5bf8518ded3d3ec69789269" dependencies = [ - "opentelemetry 0.20.0", + "opentelemetry", +] + +[[package]] +name = "opentelemetry-stdout" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bd550321bc0f9d3f6dcbfe5c75262789de5b3e2776da2cbcfd2392aa05db0c6" +dependencies = [ + "async-trait", + "futures-util", + "opentelemetry_api", + "opentelemetry_sdk", + "ordered-float 3.7.0", + "serde", + "serde_json", ] [[package]] name = "opentelemetry-zipkin" -version = "0.17.0" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1fd48caee5e1db71454c95be32d1daeb6fae265321ff8f51b1efc8a50b0be80" +checksum = "eb966f01235207a6933c0aec98374fe9782df1c1d2b3d1db35c458451d138143" dependencies = [ "async-trait", "futures-core", "http", "once_cell", - "opentelemetry 0.19.0", - "opentelemetry-http 0.8.0", - "opentelemetry-semantic-conventions 0.11.0", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-semantic-conventions", "serde", "serde_json", "thiserror", "typed-builder", ] -[[package]] -name = "opentelemetry_api" -version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed41783a5bf567688eb38372f2b7a8530f5a607a4b49d38dd7573236c23ca7e2" -dependencies = [ - "futures-channel", - "futures-util", - "indexmap 1.9.3", - "once_cell", - "pin-project-lite", - "thiserror", - "urlencoding", -] - [[package]] name = "opentelemetry_api" version = "0.20.0" @@ -4051,24 +4023,6 @@ dependencies = [ "urlencoding", ] -[[package]] -name = "opentelemetry_sdk" -version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b3a2a91fdbfdd4d212c0dcc2ab540de2c2bcbbd90be17de7a7daf8822d010c1" -dependencies = [ - "async-trait", - "crossbeam-channel", - "futures-channel", - "futures-executor", - "futures-util", - "once_cell", - "opentelemetry_api 0.19.0", - "percent-encoding", - "rand 0.8.5", - "thiserror", -] - [[package]] name = "opentelemetry_sdk" version = "0.20.0" @@ -4081,7 +4035,7 @@ dependencies = [ "futures-executor", "futures-util", "once_cell", - "opentelemetry_api 0.20.0", + "opentelemetry_api", "ordered-float 3.7.0", "percent-encoding", "rand 0.8.5", @@ -6353,7 +6307,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc09e402904a5261e42cf27aea09ccb7d5318c6717a9eec3d8e2e65c56b18f19" dependencies = [ "once_cell", - "opentelemetry 0.20.0", + "opentelemetry", "tracing", "tracing-core", "tracing-log", diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index a5327ca5..59d17cf4 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -34,12 +34,13 @@ tracing-appender = "0.2.2" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } tracing-opentelemetry = "0.20.0" opentelemetry = { version = "0.20.0", features = ["trace", "metrics", "rt-tokio"] } -opentelemetry-semantic-conventions = "0.12.0" +opentelemetry-http = { version = "0.9.0", features = ["tokio", "hyper"] } opentelemetry-jaeger = { version = "0.19.0", features = ["rt-tokio", "collector_client"] } opentelemetry-otlp = { version = "0.13.0", features = ["trace", "metrics"] } -opentelemetry-zipkin = { version = "0.17.0", features = ["opentelemetry-http"], default-features = false } -opentelemetry-http = { version = "0.9.0", features = ["tokio", "hyper"] } -opentelemetry-prometheus = { version = "0.13.0" } +opentelemetry-prometheus = "0.13.0" +opentelemetry-semantic-conventions = "0.12.0" +opentelemetry-stdout = { version = "0.1.0", features = ["trace", "metrics"] } +opentelemetry-zipkin = { version = "0.18.0", default-features = false } prometheus = "0.13.3" sentry = { version = "0.31.5", default-features = false, features = ["backtrace", "contexts", "panic", "tower"] } sentry-tracing = "0.31.5" diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 7a0696ba..09cf9913 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -100,7 +100,7 @@ async fn try_main() -> anyhow::Result<()> { }); // Setup OpenTelemetry tracing and metrics - let (tracer, _meter) = telemetry::setup(&telemetry_config) + let tracer = telemetry::setup(&telemetry_config) .await .context("failed to setup OpenTelemetry")?; diff --git a/crates/cli/src/server.rs b/crates/cli/src/server.rs index 92deb976..e32e9a5b 100644 --- a/crates/cli/src/server.rs +++ b/crates/cli/src/server.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::{ - borrow::Cow, future::ready, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener, ToSocketAddrs}, os::unix::net::UnixListener, @@ -27,7 +26,7 @@ use axum::{ Extension, Router, }; use hyper::{ - header::{HeaderValue, CACHE_CONTROL}, + header::{HeaderValue, CACHE_CONTROL, USER_AGENT}, Method, Request, Response, StatusCode, Version, }; use listenfd::ListenFd; @@ -40,10 +39,11 @@ use mas_tower::{ make_span_fn, metrics_attributes_fn, DurationRecorderLayer, InFlightCounterLayer, TraceLayer, KV, }; -use opentelemetry::{Key, KeyValue}; +use opentelemetry::{trace::TraceContextExt, Key, KeyValue}; use opentelemetry_http::HeaderExtractor; use opentelemetry_semantic_conventions::trace::{ - HTTP_METHOD, HTTP_ROUTE, HTTP_SCHEME, HTTP_STATUS_CODE, + HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE, HTTP_ROUTE, NETWORK_PROTOCOL_NAME, + NETWORK_PROTOCOL_VERSION, URL_SCHEME, }; use rustls::ServerConfig; use sentry_tower::{NewSentryLayer, SentryHttpLayer}; @@ -52,35 +52,33 @@ use tower_http::{services::ServeDir, set_header::SetResponseHeaderLayer}; use tracing::{warn, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; -const NET_PROTOCOL_NAME: Key = Key::from_static_str("net.protocol.name"); -const NET_PROTOCOL_VERSION: Key = Key::from_static_str("net.protocol.version"); const MAS_LISTENER_NAME: Key = Key::from_static_str("mas.listener.name"); #[inline] -fn otel_http_method(request: &Request) -> Cow<'static, str> { +fn otel_http_method(request: &Request) -> &'static str { match request.method() { - &Method::OPTIONS => "OPTIONS".into(), - &Method::GET => "GET".into(), - &Method::POST => "POST".into(), - &Method::PUT => "PUT".into(), - &Method::DELETE => "DELETE".into(), - &Method::HEAD => "HEAD".into(), - &Method::TRACE => "TRACE".into(), - &Method::CONNECT => "CONNECT".into(), - &Method::PATCH => "PATCH".into(), - other => other.to_string().into(), + &Method::OPTIONS => "OPTIONS", + &Method::GET => "GET", + &Method::POST => "POST", + &Method::PUT => "PUT", + &Method::DELETE => "DELETE", + &Method::HEAD => "HEAD", + &Method::TRACE => "TRACE", + &Method::CONNECT => "CONNECT", + &Method::PATCH => "PATCH", + _other => "_OTHER", } } #[inline] -fn otel_net_protocol_version(request: &Request) -> Cow<'static, str> { +fn otel_net_protocol_version(request: &Request) -> &'static str { match request.version() { - Version::HTTP_09 => "0.9".into(), - Version::HTTP_10 => "1.0".into(), - Version::HTTP_11 => "1.1".into(), - Version::HTTP_2 => "2.0".into(), - Version::HTTP_3 => "3.0".into(), - other => format!("{other:?}").into(), + Version::HTTP_09 => "0.9", + Version::HTTP_10 => "1.0", + Version::HTTP_11 => "1.1", + Version::HTTP_2 => "2.0", + Version::HTTP_3 => "3.0", + _other => "_OTHER", } } @@ -91,11 +89,7 @@ fn otel_http_route(request: &Request) -> Option<&str> { .map(MatchedPath::as_str) } -fn otel_http_target(request: &Request) -> &str { - request.uri().path_and_query().map_or("", |p| p.as_str()) -} - -fn otel_http_scheme(request: &Request) -> &'static str { +fn otel_url_scheme(request: &Request) -> &'static str { // XXX: maybe we should panic if the connection info was not injected in the // request extensions request @@ -117,7 +111,7 @@ fn make_http_span(req: &Request) -> Span { let span_name = if let Some(route) = route.as_ref() { format!("{method} {route}") } else { - format!("{method}") + method.to_owned() }; let span = tracing::info_span!( @@ -125,42 +119,60 @@ fn make_http_span(req: &Request) -> Span { "otel.kind" = "server", "otel.name" = span_name, "otel.status_code" = tracing::field::Empty, - "net.protocol.name" = "http", - "net.protocol.version" = otel_net_protocol_version(req).as_ref(), - "http.scheme" = otel_http_scheme(req), - "http.method" = method.as_ref(), + "network.protocol.name" = "http", + "network.protocol.version" = otel_net_protocol_version(req), + "http.method" = method, "http.route" = tracing::field::Empty, - "http.target" = otel_http_target(req), - "http.status_code" = tracing::field::Empty, + "http.response.status_code" = tracing::field::Empty, + "url.path" = req.uri().path(), + "url.query" = tracing::field::Empty, + "url.scheme" = otel_url_scheme(req), + "user_agent.original" = tracing::field::Empty, ); if let Some(route) = route.as_ref() { span.record("http.route", route); } + if let Some(query) = req.uri().query() { + span.record("url.query", query); + } + + if let Some(user_agent) = req.headers().get(USER_AGENT) { + span.record( + "user_agent.original", + user_agent.to_str().unwrap_or("INVALID"), + ); + } + // Extract the parent span context from the request headers let parent_context = opentelemetry::global::get_text_map_propagator(|propagator| { let extractor = HeaderExtractor(req.headers()); let context = opentelemetry::Context::new(); propagator.extract_with_context(&context, &extractor) }); - span.set_parent(parent_context); + + if parent_context.span().span_context().is_remote() { + // For now, set_parent is broken, so in the meantime we're using add_link + // instead + span.add_link(parent_context.span().span_context().clone()); + } span } fn on_http_request_labels(request: &Request) -> Vec { vec![ - NET_PROTOCOL_NAME.string("http"), - NET_PROTOCOL_VERSION.string(otel_net_protocol_version(request)), - HTTP_METHOD.string(otel_http_method(request)), - HTTP_SCHEME.string(otel_http_scheme(request).as_ref()), + NETWORK_PROTOCOL_NAME.string("http"), + NETWORK_PROTOCOL_VERSION.string(otel_net_protocol_version(request)), + HTTP_REQUEST_METHOD.string(otel_http_method(request)), HTTP_ROUTE.string(otel_http_route(request).unwrap_or("FALLBACK").to_owned()), + URL_SCHEME.string(otel_url_scheme(request).as_ref()), ] } fn on_http_response_labels(res: &Response) -> Vec { - vec![HTTP_STATUS_CODE.i64(res.status().as_u16().into())] + vec![HTTP_RESPONSE_STATUS_CODE.i64(res.status().as_u16().into())] } pub fn build_router( @@ -259,7 +271,7 @@ where )) .on_response_fn(|span: &Span, response: &Response<_>| { let status_code = response.status().as_u16(); - span.record("http.status_code", status_code); + span.record("http.response.status_code", status_code); span.record("otel.status_code", "OK"); }), ) diff --git a/crates/cli/src/telemetry.rs b/crates/cli/src/telemetry.rs index c06adf45..2bf72290 100644 --- a/crates/cli/src/telemetry.rs +++ b/crates/cli/src/telemetry.rs @@ -25,26 +25,29 @@ use opentelemetry::{ propagation::TextMapPropagator, sdk::{ self, - metrics::controllers::BasicController, + metrics::{ + reader::{DefaultAggregationSelector, DefaultTemporalitySelector}, + MeterProvider, PeriodicReader, + }, propagation::{BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator}, - trace::{Sampler, Tracer}, + trace::{Sampler, Tracer, TracerProvider}, Resource, }, - Context, + trace::TracerProvider as _, }; use opentelemetry_jaeger::Propagator as JaegerPropagator; +use opentelemetry_otlp::MetricsExporterBuilder; use opentelemetry_prometheus::PrometheusExporter; use opentelemetry_semantic_conventions as semcov; use opentelemetry_zipkin::{B3Encoding, Propagator as ZipkinPropagator}; +use prometheus::Registry; use tokio::sync::OnceCell; use url::Url; -static METRICS_BASIC_CONTROLLER: OnceCell = OnceCell::const_new(); -static PROMETHEUS_EXPORTER: OnceCell = OnceCell::const_new(); +static METER_PROVIDER: OnceCell = OnceCell::const_new(); +static PROMETHEUS_REGISTRY: OnceCell = OnceCell::const_new(); -pub async fn setup( - config: &TelemetryConfig, -) -> anyhow::Result<(Option, Option)> { +pub async fn setup(config: &TelemetryConfig) -> anyhow::Result> { global::set_error_handler(|e| tracing::error!("{}", e))?; let propagator = propagator(&config.tracing.propagators); @@ -57,20 +60,16 @@ pub async fn setup( .await .context("Failed to configure traces exporter")?; - let meter = meter(&config.metrics.exporter).context("Failed to configure metrics exporter")?; - if let Some(meter) = meter.as_ref() { - METRICS_BASIC_CONTROLLER.set(meter.clone())?; - } + init_meter(&config.metrics.exporter).context("Failed to configure metrics exporter")?; - Ok((tracer, meter)) + Ok(tracer) } pub fn shutdown() { global::shutdown_tracer_provider(); - if let Some(controller) = METRICS_BASIC_CONTROLLER.get() { - let cx = Context::new(); - controller.stop(&cx).unwrap(); + if let Some(meter_provider) = METER_PROVIDER.get() { + meter_provider.shutdown().unwrap(); } } @@ -100,11 +99,11 @@ async fn http_client() -> anyhow::Result Tracer { - sdk::export::trace::stdout::new_pipeline() - .with_pretty_print(true) - .with_trace_config(trace_config()) - .install_simple() +fn stdout_tracer_provider() -> TracerProvider { + let exporter = opentelemetry_stdout::SpanExporter::default(); + TracerProvider::builder() + .with_simple_exporter(exporter) + .build() } fn otlp_tracer(endpoint: Option<&Url>) -> anyhow::Result { @@ -125,24 +124,24 @@ fn otlp_tracer(endpoint: Option<&Url>) -> anyhow::Result { Ok(tracer) } -fn jaeger_agent_tracer(host: &str, port: u16) -> anyhow::Result { +fn jaeger_agent_tracer_provider(host: &str, port: u16) -> anyhow::Result { let pipeline = opentelemetry_jaeger::new_agent_pipeline() .with_service_name(env!("CARGO_PKG_NAME")) .with_trace_config(trace_config()) .with_endpoint((host, port)); - let tracer = pipeline - .install_batch(opentelemetry::runtime::Tokio) + let tracer_provider = pipeline + .build_batch(opentelemetry::runtime::Tokio) .context("Failed to configure Jaeger agent exporter")?; - Ok(tracer) + Ok(tracer_provider) } -async fn jaeger_collector_tracer( +async fn jaeger_collector_tracer_provider( endpoint: &str, username: Option<&str>, password: Option<&str>, -) -> anyhow::Result { +) -> anyhow::Result { let http_client = http_client().await?; let mut pipeline = opentelemetry_jaeger::new_collector_pipeline() .with_service_name(env!("CARGO_PKG_NAME")) @@ -158,11 +157,11 @@ async fn jaeger_collector_tracer( pipeline = pipeline.with_password(password); } - let tracer = pipeline - .install_batch(opentelemetry::runtime::Tokio) + let tracer_provider = pipeline + .build_batch(opentelemetry::runtime::Tokio) .context("Failed to configure Jaeger collector exporter")?; - Ok(tracer) + Ok(tracer_provider) } async fn zipkin_tracer(collector_endpoint: &Option) -> anyhow::Result { @@ -185,28 +184,43 @@ async fn zipkin_tracer(collector_endpoint: &Option) -> anyhow::Result anyhow::Result> { - let tracer = match config { + let tracer_provider = match config { TracingExporterConfig::None => return Ok(None), - TracingExporterConfig::Stdout => stdout_tracer(), - TracingExporterConfig::Otlp { endpoint } => otlp_tracer(endpoint.as_ref())?, + TracingExporterConfig::Stdout => stdout_tracer_provider(), + TracingExporterConfig::Otlp { endpoint } => { + // The OTLP exporter already creates a tracer and installs it + return Ok(Some(otlp_tracer(endpoint.as_ref())?)); + } TracingExporterConfig::Jaeger(JaegerExporterProtocolConfig::UdpThriftCompact { agent_host, agent_port, - }) => jaeger_agent_tracer(agent_host, *agent_port)?, + }) => jaeger_agent_tracer_provider(agent_host, *agent_port)?, TracingExporterConfig::Jaeger(JaegerExporterProtocolConfig::HttpThriftBinary { endpoint, username, password, - }) => jaeger_collector_tracer(endpoint, username.as_deref(), password.as_deref()).await?, + }) => { + jaeger_collector_tracer_provider(endpoint, username.as_deref(), password.as_deref()) + .await? + } TracingExporterConfig::Zipkin { collector_endpoint } => { - zipkin_tracer(collector_endpoint).await? + // The Zipkin exporter already creates a tracer and installs it + return Ok(Some(zipkin_tracer(collector_endpoint).await?)); } }; + let tracer = tracer_provider.versioned_tracer( + env!("CARGO_PKG_NAME"), + Some(env!("CARGO_PKG_VERSION")), + Some(semcov::SCHEMA_URL), + None, + ); + global::set_tracer_provider(tracer_provider); + Ok(Some(tracer)) } -fn otlp_meter(endpoint: Option<&url::Url>) -> anyhow::Result { +fn otlp_metric_reader(endpoint: Option<&url::Url>) -> anyhow::Result { use opentelemetry_otlp::WithExportConfig; let mut exporter = opentelemetry_otlp::new_exporter().tonic(); @@ -214,35 +228,17 @@ fn otlp_meter(endpoint: Option<&url::Url>) -> anyhow::Result { exporter = exporter.with_endpoint(endpoint.to_string()); } - let controller = opentelemetry_otlp::new_pipeline() - .metrics( - sdk::metrics::selectors::simple::inexpensive(), - sdk::export::metrics::aggregation::cumulative_temporality_selector(), - opentelemetry::runtime::Tokio, - ) - .with_resource(resource()) - .with_exporter(exporter) - .build() - .context("Failed to configure OTLP metrics exporter")?; + let exporter = MetricsExporterBuilder::from(exporter).build_metrics_exporter( + Box::new(DefaultTemporalitySelector::new()), + Box::new(DefaultAggregationSelector::new()), + )?; - Ok(controller) + Ok(PeriodicReader::builder(exporter, opentelemetry::runtime::Tokio).build()) } -fn stdout_meter() -> anyhow::Result { - let exporter = sdk::export::metrics::stdout().build()?; - let controller = sdk::metrics::controllers::basic(sdk::metrics::processors::factory( - sdk::metrics::selectors::simple::inexpensive(), - exporter.temporality_selector(), - )) - .with_resource(resource()) - .with_exporter(exporter) - .build(); - - let cx = Context::new(); - controller.start(&cx, opentelemetry::runtime::Tokio)?; - - global::set_meter_provider(controller.clone()); - Ok(controller) +fn stdout_metric_reader() -> PeriodicReader { + let exporter = opentelemetry_stdout::MetricsExporter::default(); + PeriodicReader::builder(exporter, opentelemetry::runtime::Tokio).build() } pub fn prometheus_service() -> tower::util::ServiceFn< @@ -250,15 +246,15 @@ pub fn prometheus_service() -> tower::util::ServiceFn< > { use prometheus::{Encoder, TextEncoder}; - if !PROMETHEUS_EXPORTER.initialized() { + if !PROMETHEUS_REGISTRY.initialized() { tracing::warn!("A Prometheus resource was mounted on a listener, but the Prometheus exporter was not setup in the config"); } tower::service_fn(move |_req| { - let response = if let Some(exporter) = PROMETHEUS_EXPORTER.get() { + let response = if let Some(registry) = PROMETHEUS_REGISTRY.get() { let mut buffer = vec![]; let encoder = TextEncoder::new(); - let metric_families = exporter.registry().gather(); + let metric_families = registry.gather(); // That shouldn't panic, unless we're constructing invalid labels encoder.encode(&metric_families, &mut buffer).unwrap(); @@ -280,32 +276,41 @@ pub fn prometheus_service() -> tower::util::ServiceFn< }) } -fn prometheus_meter() -> anyhow::Result { - let controller = sdk::metrics::controllers::basic(sdk::metrics::processors::factory( - // All histogram metrics are in milliseconds. Each bucket is ~2x the previous one. - sdk::metrics::selectors::simple::histogram([ - 1.0, 3.0, 5.0, 10.0, 30.0, 50.0, 100.0, 300.0, 1000.0, - ]), - sdk::export::metrics::aggregation::cumulative_temporality_selector(), - )) - .with_resource(resource()) - .build(); +fn prometheus_metric_reader() -> anyhow::Result { + let registry = Registry::new(); + PROMETHEUS_REGISTRY.set(registry.clone())?; - let exporter = opentelemetry_prometheus::exporter(controller.clone()).try_init()?; - PROMETHEUS_EXPORTER.set(exporter)?; + let exporter = opentelemetry_prometheus::exporter() + .with_registry(registry) + .without_scope_info() + .build()?; - Ok(controller) + Ok(exporter) } -fn meter(config: &MetricsExporterConfig) -> anyhow::Result> { - let controller = match config { - MetricsExporterConfig::None => None, - MetricsExporterConfig::Stdout => Some(stdout_meter()?), - MetricsExporterConfig::Otlp { endpoint } => Some(otlp_meter(endpoint.as_ref())?), - MetricsExporterConfig::Prometheus => Some(prometheus_meter()?), +fn init_meter(config: &MetricsExporterConfig) -> anyhow::Result<()> { + let mut meter_provider_builder = MeterProvider::builder(); + match config { + MetricsExporterConfig::None => {} + MetricsExporterConfig::Stdout => { + meter_provider_builder = meter_provider_builder.with_reader(stdout_metric_reader()); + } + MetricsExporterConfig::Otlp { endpoint } => { + meter_provider_builder = + meter_provider_builder.with_reader(otlp_metric_reader(endpoint.as_ref())?); + } + MetricsExporterConfig::Prometheus => { + meter_provider_builder = + meter_provider_builder.with_reader(prometheus_metric_reader()?); + } }; - Ok(controller) + let meter_provider = meter_provider_builder.with_resource(resource()).build(); + + METER_PROVIDER.set(meter_provider.clone())?; + global::set_meter_provider(meter_provider.clone()); + + Ok(()) } fn trace_config() -> sdk::trace::Config { @@ -326,6 +331,7 @@ fn resource() -> Resource { Box::new(sdk::resource::EnvResourceDetector::new()), Box::new(sdk::resource::OsResourceDetector), Box::new(sdk::resource::ProcessResourceDetector), + Box::new(sdk::resource::TelemetryResourceDetector), ], ); diff --git a/crates/handlers/Cargo.toml b/crates/handlers/Cargo.toml index 845539bf..5d94aaf6 100644 --- a/crates/handlers/Cargo.toml +++ b/crates/handlers/Cargo.toml @@ -13,6 +13,7 @@ futures-util = "0.3.28" # Logging and tracing tracing = "0.1.37" opentelemetry = "0.20.0" +opentelemetry-semantic-conventions = "0.12.0" # Error management thiserror = "1.0.44" diff --git a/crates/handlers/src/app_state.rs b/crates/handlers/src/app_state.rs index 2594c0b7..c71467dd 100644 --- a/crates/handlers/src/app_state.rs +++ b/crates/handlers/src/app_state.rs @@ -29,7 +29,7 @@ use mas_storage_pg::PgRepository; use mas_templates::Templates; use opentelemetry::{ metrics::{Histogram, MetricsError, Unit}, - Context, KeyValue, + KeyValue, }; use rand::SeedableRng; use sqlx::PgPool; @@ -60,7 +60,12 @@ impl AppState { /// Returns an error if the metrics could not be initialized. pub fn init_metrics(&mut self) -> Result<(), MetricsError> { // XXX: do we want to put that somewhere else? - let meter = opentelemetry::global::meter("mas-handlers"); + let meter = opentelemetry::global::meter_with_version( + env!("CARGO_PKG_NAME"), + Some(env!("CARGO_PKG_VERSION")), + Some(opentelemetry_semantic_conventions::SCHEMA_URL), + None, + ); let pool = self.pool.clone(); let usage = meter .i64_observable_up_down_counter("db.connections.usage") @@ -75,13 +80,13 @@ impl AppState { .init(); // Observe the number of active and idle connections in the pool - meter.register_callback(move |cx| { + meter.register_callback(&[usage.as_any(), max.as_any()], move |observer| { let idle = u32::try_from(pool.num_idle()).unwrap_or(u32::MAX); let used = pool.size() - idle; let max_conn = pool.options().get_max_connections(); - usage.observe(cx, i64::from(idle), &[KeyValue::new("state", "idle")]); - usage.observe(cx, i64::from(used), &[KeyValue::new("state", "used")]); - max.observe(cx, i64::from(max_conn), &[]); + observer.observe_i64(&usage, i64::from(idle), &[KeyValue::new("state", "idle")]); + observer.observe_i64(&usage, i64::from(used), &[KeyValue::new("state", "used")]); + observer.observe_i64(&max, i64::from(max_conn), &[]); })?; // Track the connection acquisition time @@ -212,7 +217,7 @@ impl FromRequestParts for BoxRepository { let duration_ms = duration.as_millis().try_into().unwrap_or(u64::MAX); if let Some(histogram) = &state.conn_acquisition_histogram { - histogram.record(&Context::new(), duration_ms, &[]); + histogram.record(duration_ms, &[]); } Ok(repo diff --git a/crates/handlers/src/views/index.rs b/crates/handlers/src/views/index.rs index 4185f004..4cdb3182 100644 --- a/crates/handlers/src/views/index.rs +++ b/crates/handlers/src/views/index.rs @@ -42,5 +42,7 @@ pub async fn get( let content = templates.render_index(&ctx).await?; + tracing::info!("rendered index page"); + Ok((cookie_jar, Html(content))) } diff --git a/crates/tower/Cargo.toml b/crates/tower/Cargo.toml index 9bf58b41..c2968d3d 100644 --- a/crates/tower/Cargo.toml +++ b/crates/tower/Cargo.toml @@ -14,6 +14,7 @@ tower = "0.4.13" tokio = { version = "1.30.0", features = ["time"] } opentelemetry = { version = "0.20.0", features = ["metrics"] } opentelemetry-http = "0.9.0" +opentelemetry-semantic-conventions = "0.12.0" pin-project-lite = "0.2.12" [features] diff --git a/crates/tower/src/lib.rs b/crates/tower/src/lib.rs index 78ab22c5..0883a2a8 100644 --- a/crates/tower/src/lib.rs +++ b/crates/tower/src/lib.rs @@ -27,6 +27,7 @@ fn meter() -> opentelemetry::metrics::Meter { opentelemetry::global::meter_with_version( env!("CARGO_PKG_NAME"), Some(env!("CARGO_PKG_VERSION")), + Some(opentelemetry_semantic_conventions::SCHEMA_URL), None, ) } diff --git a/crates/tower/src/metrics/duration.rs b/crates/tower/src/metrics/duration.rs index 508125c9..f6f80f0e 100644 --- a/crates/tower/src/metrics/duration.rs +++ b/crates/tower/src/metrics/duration.rs @@ -14,7 +14,7 @@ use std::future::Future; -use opentelemetry::{metrics::Histogram, Context, KeyValue}; +use opentelemetry::{metrics::Histogram, KeyValue}; use pin_project_lite::pin_project; use tokio::time::Instant; use tower::{Layer, Service}; @@ -195,8 +195,7 @@ where } } - this.histogram - .record(&Context::new(), duration_ms, &attributes); + this.histogram.record(duration_ms, &attributes); std::task::Poll::Ready(result) } } diff --git a/crates/tower/src/metrics/in_flight.rs b/crates/tower/src/metrics/in_flight.rs index 57ed54df..82b3c90f 100644 --- a/crates/tower/src/metrics/in_flight.rs +++ b/crates/tower/src/metrics/in_flight.rs @@ -16,7 +16,7 @@ use std::future::Future; use opentelemetry::{ metrics::{Unit, UpDownCounter}, - Context, KeyValue, + KeyValue, }; use pin_project_lite::pin_project; use tower::{Layer, Service}; @@ -98,7 +98,7 @@ struct InFlightGuard { impl InFlightGuard { fn new(counter: UpDownCounter, attributes: Vec) -> Self { - counter.add(&Context::new(), 1, &attributes); + counter.add(1, &attributes); Self { counter, @@ -109,7 +109,7 @@ impl InFlightGuard { impl Drop for InFlightGuard { fn drop(&mut self) { - self.counter.add(&Context::new(), -1, &self.attributes); + self.counter.add(-1, &self.attributes); } } diff --git a/crates/tower/src/tracing/future.rs b/crates/tower/src/tracing/future.rs index 492190e0..1f2fb7b4 100644 --- a/crates/tower/src/tracing/future.rs +++ b/crates/tower/src/tracing/future.rs @@ -54,7 +54,8 @@ where // Poll the inner future, with the span entered. This is effectively what // [`tracing::Instrumented`] does. - let result = ready!(this.span.in_scope(|| this.inner.poll(cx))); + let _guard = this.span.enter(); + let result = ready!(this.inner.poll(cx)); match &result { Ok(response) => {