From b81f4caace4912c5e99fee0fb2043ec16e8ff012 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Thu, 24 Feb 2022 16:03:37 +0100 Subject: [PATCH] Better OpenTelemetry tracer --- Cargo.lock | 30 ++-- crates/cli/src/telemetry.rs | 6 +- crates/http/Cargo.toml | 3 +- crates/http/src/layers/client.rs | 33 +--- crates/http/src/layers/mod.rs | 2 +- .../http/src/layers/otel/extract_context.rs | 51 ++++++ crates/http/src/layers/otel/inject_context.rs | 52 ++++++ crates/http/src/layers/otel/layer.rs | 154 ++++++++++++++++ .../http/src/layers/otel/make_span_builder.rs | 139 ++++++++++++++ crates/http/src/layers/otel/mod.rs | 112 ++++++++++++ crates/http/src/layers/otel/on_error.rs | 33 ++++ crates/http/src/layers/otel/on_response.rs | 37 ++++ crates/http/src/layers/otel/service.rs | 86 +++++++++ crates/http/src/layers/server.rs | 6 +- crates/http/src/layers/trace.rs | 170 ------------------ crates/http/src/lib.rs | 22 ++- 16 files changed, 712 insertions(+), 224 deletions(-) create mode 100644 crates/http/src/layers/otel/extract_context.rs create mode 100644 crates/http/src/layers/otel/inject_context.rs create mode 100644 crates/http/src/layers/otel/layer.rs create mode 100644 crates/http/src/layers/otel/make_span_builder.rs create mode 100644 crates/http/src/layers/otel/mod.rs create mode 100644 crates/http/src/layers/otel/on_error.rs create mode 100644 crates/http/src/layers/otel/on_response.rs create mode 100644 crates/http/src/layers/otel/service.rs delete mode 100644 crates/http/src/layers/trace.rs diff --git a/Cargo.lock b/Cargo.lock index f0bb10ce..c48ba271 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -509,9 +509,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "blake2" -version = "0.10.3" +version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f08f9f6871a8eacbb960d18db3d077ae6db1f0bc0df3272a78ca09eef8c5a931" +checksum = "b9cf849ee05b2ee5fba5e36f97ff8ec2533916700fc0758d40d92136a42f3388" dependencies = [ "digest 0.10.3", ] @@ -746,9 +746,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "3.1.0" +version = "3.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd1122e63869df2cb309f449da1ad54a7c6dfeb7c7e6ccd8e0825d9eb93bb72" +checksum = "01d42c94ce7c2252681b5fed4d3627cc807b13dfc033246bd05d5b252399000e" dependencies = [ "heck 0.4.0", "proc-macro-error", @@ -1302,9 +1302,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c" +checksum = "d39cd93900197114fa1fcb7ae84ca742095eed9442088988ae74fa744e930e77" dependencies = [ "cfg-if", "libc", @@ -1673,9 +1673,9 @@ dependencies = [ [[package]] name = "integer-encoding" -version = "3.0.2" +version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90c11140ffea82edce8dcd74137ce9324ec24b3cf0175fc9d7e29164da9915b8" +checksum = "0e85a1509a128c855368e135cffcde7eac17d8e1083f41e2b98c58bc1a5074be" [[package]] name = "iovec" @@ -1779,9 +1779,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.118" +version = "0.2.119" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06e509672465a0504304aa87f9f176f2b2b716ed8fb105ebe5c02dc6dce96a94" +checksum = "1bf2e165bb3457c8e098ea76f3e3bc9db55f87aa90d52d0e6be741470916aaa4" [[package]] name = "libm" @@ -1976,6 +1976,7 @@ dependencies = [ "hyper-rustls 0.23.0", "opentelemetry", "opentelemetry-http", + "opentelemetry-semantic-conventions", "rustls 0.20.4", "serde", "serde_json", @@ -3362,9 +3363,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0486718e92ec9a68fbed73bb5ef687d71103b142595b406835649bebd33f72c7" +checksum = "a4a3381e03edd24287172047536f20cabde766e2cd3e65e6b00fb3af51c4f38d" [[package]] name = "serde" @@ -4133,7 +4134,6 @@ dependencies = [ "tower", "tower-layer", "tower-service", - "tracing", ] [[package]] @@ -4787,9 +4787,9 @@ dependencies = [ [[package]] name = "zeroize_derive" -version = "1.3.1" +version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81e8f13fef10b63c06356d65d416b070798ddabcadc10d3ece0c5be9b3c7eddb" +checksum = "3f8f187641dad4f680d25c4bfc4225b418165984179f26ca76ec4fb6441d3a17" dependencies = [ "proc-macro2", "quote", diff --git a/crates/cli/src/telemetry.rs b/crates/cli/src/telemetry.rs index e6ce8ddc..220074ba 100644 --- a/crates/cli/src/telemetry.rs +++ b/crates/cli/src/telemetry.rs @@ -23,7 +23,7 @@ use opentelemetry::{ sdk::{ self, propagation::{BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator}, - trace::Tracer, + trace::{Sampler, Tracer}, Resource, }, }; @@ -215,7 +215,9 @@ fn meter(config: &MetricsExporterConfig) -> anyhow::Result<()> { } fn trace_config() -> sdk::trace::Config { - sdk::trace::config().with_resource(resource()) + sdk::trace::config() + .with_resource(resource()) + .with_sampler(Sampler::AlwaysOn) } fn resource() -> Resource { diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index 42f533d1..e3f628e6 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -14,12 +14,13 @@ hyper = "0.14.17" hyper-rustls = { version = "0.23.0", features = ["http1", "http2"] } opentelemetry = "0.17.0" opentelemetry-http = "0.6.0" +opentelemetry-semantic-conventions = "0.9.0" rustls = "0.20.4" serde = "1.0.136" serde_json = "1.0.79" thiserror = "1.0.30" tokio = { version = "1.17.0", features = ["sync", "parking_lot"] } tower = { version = "0.4.12", features = ["timeout", "limit"] } -tower-http = { version = "0.2.3", features = ["follow-redirect", "decompression-full", "set-header", "trace", "compression-full"] } +tower-http = { version = "0.2.3", features = ["follow-redirect", "decompression-full", "set-header", "compression-full"] } tracing = "0.1.31" tracing-opentelemetry = "0.17.2" diff --git a/crates/http/src/layers/client.rs b/crates/http/src/layers/client.rs index 8fac89c4..8d0f429d 100644 --- a/crates/http/src/layers/client.rs +++ b/crates/http/src/layers/client.rs @@ -15,7 +15,6 @@ use std::{marker::PhantomData, time::Duration}; use http::{header::USER_AGENT, HeaderValue, Request, Response}; -use http_body::combinators::BoxBody; use tower::{ limit::ConcurrencyLimitLayer, timeout::TimeoutLayer, util::BoxCloneService, Layer, Service, ServiceBuilder, ServiceExt, @@ -25,15 +24,13 @@ use tower_http::{ follow_redirect::FollowRedirectLayer, set_header::SetRequestHeaderLayer, }; -use tracing_opentelemetry::OpenTelemetrySpanExt; -use super::trace::OtelTraceLayer; +use super::otel::TraceLayer; +use crate::BoxError; static MAS_USER_AGENT: HeaderValue = HeaderValue::from_static("matrix-authentication-service/0.0.1"); -type BoxError = Box; - #[derive(Debug, Clone)] pub struct ClientLayer { operation: &'static str, @@ -50,16 +47,13 @@ impl ClientLayer { } } -pub type ClientResponse = Response< - DecompressionBody::Data, ::Error>>, ->; +pub type ClientResponse = Response>; impl Layer for ClientLayer where S: Service, Response = Response, Error = E> + Clone + Send + 'static, ReqBody: http_body::Body + Default + Send + 'static, ResBody: http_body::Body + Sync + Send + 'static, - ResBody::Error: std::fmt::Display + 'static, S::Future: Send + 'static, E: Into, { @@ -71,33 +65,20 @@ where // - the TimeoutLayer // - the DecompressionLayer // Those layers do type erasure of the error. - // The body is also type-erased because of the DecompressionLayer. - ServiceBuilder::new() .layer(DecompressionLayer::new()) - .map_response(|r: Response<_>| r.map(BoxBody::new)) .layer(SetRequestHeaderLayer::overriding( USER_AGENT, MAS_USER_AGENT.clone(), )) - // A trace that has the whole operation, with all the redirects, retries, rate limits - .layer(OtelTraceLayer::outer_client(self.operation)) + // A trace that has the whole operation, with all the redirects, timeouts and rate + // limits in it + .layer(TraceLayer::http_client(self.operation)) .layer(ConcurrencyLimitLayer::new(10)) .layer(FollowRedirectLayer::new()) // A trace for each "real" http request - .layer(OtelTraceLayer::inner_client()) + .layer(TraceLayer::inner_http_client()) .layer(TimeoutLayer::new(Duration::from_secs(10))) - // Propagate the span context - .map_request(|mut r: Request<_>| { - // TODO: this seems to be broken - let cx = tracing::Span::current().context(); - let mut injector = opentelemetry_http::HeaderInjector(r.headers_mut()); - opentelemetry::global::get_text_map_propagator(|propagator| { - propagator.inject_context(&cx, &mut injector); - }); - - r - }) .service(inner) .boxed_clone() } diff --git a/crates/http/src/layers/mod.rs b/crates/http/src/layers/mod.rs index 2fd44508..baefd54e 100644 --- a/crates/http/src/layers/mod.rs +++ b/crates/http/src/layers/mod.rs @@ -14,5 +14,5 @@ pub(crate) mod client; pub(crate) mod json; +pub mod otel; pub(crate) mod server; -pub(crate) mod trace; diff --git a/crates/http/src/layers/otel/extract_context.rs b/crates/http/src/layers/otel/extract_context.rs new file mode 100644 index 00000000..56015e71 --- /dev/null +++ b/crates/http/src/layers/otel/extract_context.rs @@ -0,0 +1,51 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use http::Request; +use opentelemetry::{trace::TraceContextExt, Context}; +use opentelemetry_http::HeaderExtractor; + +pub trait ExtractContext { + fn extract_context(&self, request: &R) -> Context; +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct DefaultExtractContext; + +impl ExtractContext for DefaultExtractContext { + fn extract_context(&self, _request: &T) -> Context { + Context::current() + } +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct ExtractFromHttpRequest; + +impl ExtractContext> for ExtractFromHttpRequest { + fn extract_context(&self, request: &Request) -> Context { + let headers = request.headers(); + let extractor = HeaderExtractor(headers); + let parent_cx = Context::current(); + + let cx = opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.extract_with_context(&parent_cx, &extractor) + }); + + if cx.span().span_context().is_remote() { + cx + } else { + parent_cx + } + } +} diff --git a/crates/http/src/layers/otel/inject_context.rs b/crates/http/src/layers/otel/inject_context.rs new file mode 100644 index 00000000..100e2ffc --- /dev/null +++ b/crates/http/src/layers/otel/inject_context.rs @@ -0,0 +1,52 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use http::Request; +use opentelemetry::Context; +use opentelemetry_http::HeaderInjector; + +pub trait InjectContext { + type Output; + + fn inject_context(&self, cx: &Context, request: R) -> Self::Output; +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct DefaultInjectContext; + +impl InjectContext for DefaultInjectContext { + type Output = R; + + fn inject_context(&self, _cx: &Context, request: R) -> Self::Output { + request + } +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct InjectInHttpRequest; + +impl InjectContext> for InjectInHttpRequest { + type Output = Request; + + fn inject_context(&self, cx: &Context, mut request: Request) -> Self::Output { + let headers = request.headers_mut(); + let mut injector = HeaderInjector(headers); + + opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.inject_context(cx, &mut injector); + }); + + request + } +} diff --git a/crates/http/src/layers/otel/layer.rs b/crates/http/src/layers/otel/layer.rs new file mode 100644 index 00000000..0bf65cc7 --- /dev/null +++ b/crates/http/src/layers/otel/layer.rs @@ -0,0 +1,154 @@ +use std::sync::Arc; + +use tower::Layer; + +use super::{ + extract_context::DefaultExtractContext, inject_context::DefaultInjectContext, + make_span_builder::DefaultMakeSpanBuilder, on_error::DefaultOnError, + on_response::DefaultOnResponse, service::Trace, +}; + +#[derive(Debug, Clone)] +pub struct TraceLayer< + ExtractContext = DefaultExtractContext, + InjectContext = DefaultInjectContext, + MakeSpanBuilder = DefaultMakeSpanBuilder, + OnResponse = DefaultOnResponse, + OnError = DefaultOnError, +> { + tracer: Arc, + extract_context: ExtractContext, + inject_context: InjectContext, + make_span_builder: MakeSpanBuilder, + on_response: OnResponse, + on_error: OnError, +} + +impl Default for TraceLayer { + fn default() -> Self { + let tracer = Arc::new(opentelemetry::global::tracer("mas-http")); + Self::new(tracer) + } +} + +impl + TraceLayer +{ + #[must_use] + pub fn new(tracer: Arc) -> Self + where + ExtractContext: Default, + InjectContext: Default, + MakeSpanBuilder: Default, + OnResponse: Default, + OnError: Default, + { + Self { + tracer, + extract_context: ExtractContext::default(), + inject_context: InjectContext::default(), + make_span_builder: MakeSpanBuilder::default(), + on_response: OnResponse::default(), + on_error: OnError::default(), + } + } + + #[must_use] + pub fn extract_context( + self, + extract_context: NewExtractContext, + ) -> TraceLayer { + TraceLayer { + tracer: self.tracer, + extract_context, + inject_context: self.inject_context, + make_span_builder: self.make_span_builder, + on_response: self.on_response, + on_error: self.on_error, + } + } + + #[must_use] + pub fn inject_context( + self, + inject_context: NewInjectContext, + ) -> TraceLayer { + TraceLayer { + tracer: self.tracer, + extract_context: self.extract_context, + inject_context, + make_span_builder: self.make_span_builder, + on_response: self.on_response, + on_error: self.on_error, + } + } + + #[must_use] + pub fn make_span_builder( + self, + make_span_builder: NewMakeSpanBuilder, + ) -> TraceLayer { + TraceLayer { + tracer: self.tracer, + extract_context: self.extract_context, + inject_context: self.inject_context, + make_span_builder, + on_response: self.on_response, + on_error: self.on_error, + } + } + + #[must_use] + pub fn on_response( + self, + on_response: NewOnResponse, + ) -> TraceLayer { + TraceLayer { + tracer: self.tracer, + extract_context: self.extract_context, + inject_context: self.inject_context, + make_span_builder: self.make_span_builder, + on_response, + on_error: self.on_error, + } + } + + #[must_use] + pub fn on_error( + self, + on_error: NewOnError, + ) -> TraceLayer { + TraceLayer { + tracer: self.tracer, + extract_context: self.extract_context, + inject_context: self.inject_context, + make_span_builder: self.make_span_builder, + on_response: self.on_response, + on_error, + } + } +} + +impl Layer + for TraceLayer +where + ExtractContext: Clone, + InjectContext: Clone, + MakeSpanBuilder: Clone, + OnResponse: Clone, + OnError: Clone, +{ + type Service = Trace; + + fn layer(&self, inner: S) -> Self::Service { + Trace { + inner, + tracer: self.tracer.clone(), + extract_context: self.extract_context.clone(), + inject_context: self.inject_context.clone(), + make_span_builder: self.make_span_builder.clone(), + on_response: self.on_response.clone(), + on_error: self.on_error.clone(), + } + } +} diff --git a/crates/http/src/layers/otel/make_span_builder.rs b/crates/http/src/layers/otel/make_span_builder.rs new file mode 100644 index 00000000..fe3586a6 --- /dev/null +++ b/crates/http/src/layers/otel/make_span_builder.rs @@ -0,0 +1,139 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Cow; + +use http::{Method, Request, Version}; +use hyper::client::connect::dns::Name; +use opentelemetry::trace::{SpanBuilder, SpanKind}; +use opentelemetry_semantic_conventions::trace::{ + HTTP_FLAVOR, HTTP_METHOD, HTTP_URL, NET_HOST_NAME, +}; + +pub trait MakeSpanBuilder { + fn make_span_builder(&self, request: &R) -> SpanBuilder; +} + +#[derive(Debug, Clone, Copy)] +pub struct DefaultMakeSpanBuilder { + operation: &'static str, +} + +impl DefaultMakeSpanBuilder { + #[must_use] + pub fn new(operation: &'static str) -> Self { + Self { operation } + } +} + +impl Default for DefaultMakeSpanBuilder { + fn default() -> Self { + Self { + operation: "service", + } + } +} + +impl MakeSpanBuilder for DefaultMakeSpanBuilder { + fn make_span_builder(&self, _request: &R) -> SpanBuilder { + SpanBuilder::from_name(self.operation) + } +} + +#[inline] +fn http_method_str(method: &Method) -> Cow<'static, str> { + match method { + &Method::OPTIONS => "OPTIONS".into(), + &Method::GET => "GET".into(), + &Method::POST => "POST".into(), + &Method::PUT => "PUT".into(), + &Method::DELETE => "DELETE".into(), + &Method::HEAD => "HEAD".into(), + &Method::TRACE => "TRACE".into(), + &Method::CONNECT => "CONNECT".into(), + &Method::PATCH => "PATCH".into(), + other => other.to_string().into(), + } +} + +#[inline] +fn http_flavor(version: Version) -> Cow<'static, str> { + match version { + Version::HTTP_09 => "0.9".into(), + Version::HTTP_10 => "1.0".into(), + Version::HTTP_11 => "1.1".into(), + Version::HTTP_2 => "2.0".into(), + Version::HTTP_3 => "3.0".into(), + other => format!("{:?}", other).into(), + } +} + +#[derive(Debug, Clone)] +pub struct SpanFromHttpRequest { + operation: &'static str, + span_kind: SpanKind, +} + +impl SpanFromHttpRequest { + #[must_use] + pub fn server() -> Self { + Self { + operation: "http-server", + span_kind: SpanKind::Server, + } + } + + #[must_use] + pub fn inner_client() -> Self { + Self { + operation: "http-client", + span_kind: SpanKind::Client, + } + } + + #[must_use] + pub fn client(operation: &'static str) -> Self { + Self { + operation, + span_kind: SpanKind::Client, + } + } +} + +impl MakeSpanBuilder> for SpanFromHttpRequest { + fn make_span_builder(&self, request: &Request) -> SpanBuilder { + let attributes = vec![ + HTTP_METHOD.string(http_method_str(request.method())), + HTTP_FLAVOR.string(http_flavor(request.version())), + HTTP_URL.string(request.uri().to_string()), + ]; + + SpanBuilder::from_name(self.operation) + .with_kind(self.span_kind.clone()) + .with_attributes(attributes) + } +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct SpanFromDnsRequest; + +impl MakeSpanBuilder for SpanFromDnsRequest { + fn make_span_builder(&self, request: &Name) -> SpanBuilder { + let attributes = vec![NET_HOST_NAME.string(request.as_str().to_string())]; + + SpanBuilder::from_name("resolve") + .with_kind(SpanKind::Client) + .with_attributes(attributes) + } +} diff --git a/crates/http/src/layers/otel/mod.rs b/crates/http/src/layers/otel/mod.rs new file mode 100644 index 00000000..32fb9330 --- /dev/null +++ b/crates/http/src/layers/otel/mod.rs @@ -0,0 +1,112 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod extract_context; +mod inject_context; +mod layer; +mod make_span_builder; +mod on_error; +mod on_response; +mod service; + +pub type TraceHttpServerLayer = TraceLayer< + ExtractFromHttpRequest, + DefaultInjectContext, + SpanFromHttpRequest, + OnHttpResponse, + DefaultOnError, +>; + +pub type TraceHttpServer = Trace< + ExtractFromHttpRequest, + DefaultInjectContext, + SpanFromHttpRequest, + OnHttpResponse, + DefaultOnError, + S, +>; + +pub type TraceHttpClientLayer = TraceLayer< + DefaultExtractContext, + InjectInHttpRequest, + SpanFromHttpRequest, + OnHttpResponse, + DefaultOnError, +>; + +pub type TraceHttpClient = Trace< + DefaultExtractContext, + InjectInHttpRequest, + SpanFromHttpRequest, + OnHttpResponse, + DefaultOnError, + S, +>; + +pub type TraceDnsLayer = TraceLayer< + DefaultExtractContext, + DefaultInjectContext, + SpanFromDnsRequest, + DefaultOnResponse, + DefaultOnError, +>; + +pub type TraceDns = Trace< + DefaultExtractContext, + DefaultInjectContext, + SpanFromDnsRequest, + DefaultOnResponse, + DefaultOnError, + S, +>; + +impl TraceHttpServerLayer { + #[must_use] + pub fn http_server() -> Self { + TraceLayer::default() + .make_span_builder(SpanFromHttpRequest::server()) + .on_response(OnHttpResponse) + .extract_context(ExtractFromHttpRequest) + } +} + +impl TraceHttpClientLayer { + #[must_use] + pub fn http_client(operation: &'static str) -> Self { + TraceLayer::default() + .make_span_builder(SpanFromHttpRequest::client(operation)) + .on_response(OnHttpResponse) + .inject_context(InjectInHttpRequest) + } + + #[must_use] + pub fn inner_http_client() -> Self { + TraceLayer::default() + .make_span_builder(SpanFromHttpRequest::inner_client()) + .on_response(OnHttpResponse) + .inject_context(InjectInHttpRequest) + } +} + +impl TraceDnsLayer { + #[must_use] + pub fn dns() -> Self { + TraceLayer::default().make_span_builder(SpanFromDnsRequest) + } +} + +pub use self::{ + extract_context::*, inject_context::*, layer::*, make_span_builder::*, on_error::*, + on_response::*, service::*, +}; diff --git a/crates/http/src/layers/otel/on_error.rs b/crates/http/src/layers/otel/on_error.rs new file mode 100644 index 00000000..93634079 --- /dev/null +++ b/crates/http/src/layers/otel/on_error.rs @@ -0,0 +1,33 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use opentelemetry::trace::SpanRef; +use opentelemetry_semantic_conventions::trace::EXCEPTION_MESSAGE; + +pub trait OnError { + fn on_error(&self, span: &SpanRef<'_>, err: &E); +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct DefaultOnError; + +impl OnError for DefaultOnError +where + E: std::fmt::Display, +{ + fn on_error(&self, span: &SpanRef<'_>, err: &E) { + let attributes = vec![EXCEPTION_MESSAGE.string(err.to_string())]; + span.add_event("exception".to_string(), attributes); + } +} diff --git a/crates/http/src/layers/otel/on_response.rs b/crates/http/src/layers/otel/on_response.rs new file mode 100644 index 00000000..ae96bb55 --- /dev/null +++ b/crates/http/src/layers/otel/on_response.rs @@ -0,0 +1,37 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use http::Response; +use opentelemetry::trace::SpanRef; +use opentelemetry_semantic_conventions::trace::HTTP_STATUS_CODE; + +pub trait OnResponse { + fn on_response(&self, span: &SpanRef<'_>, response: &R); +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct DefaultOnResponse; + +impl OnResponse for DefaultOnResponse { + fn on_response(&self, _span: &SpanRef<'_>, _response: &R) {} +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct OnHttpResponse; + +impl OnResponse> for OnHttpResponse { + fn on_response(&self, span: &SpanRef<'_>, response: &Response) { + span.set_attribute(HTTP_STATUS_CODE.i64(i64::from(response.status().as_u16()))); + } +} diff --git a/crates/http/src/layers/otel/service.rs b/crates/http/src/layers/otel/service.rs new file mode 100644 index 00000000..9e4e57d6 --- /dev/null +++ b/crates/http/src/layers/otel/service.rs @@ -0,0 +1,86 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{sync::Arc, task::Poll}; + +use futures_util::{future::BoxFuture, FutureExt as _}; +use opentelemetry::trace::{FutureExt as _, TraceContextExt}; +use tower::Service; + +use super::{ + extract_context::ExtractContext, inject_context::InjectContext, + make_span_builder::MakeSpanBuilder, on_error::OnError, on_response::OnResponse, +}; + +#[derive(Debug, Clone)] +pub struct Trace { + pub(crate) inner: S, + pub(crate) tracer: Arc, + pub(crate) extract_context: ExtractContext, + pub(crate) inject_context: InjectContext, + pub(crate) make_span_builder: MakeSpanBuilder, + pub(crate) on_response: OnResponse, + pub(crate) on_error: OnError, +} + +impl Service + for Trace +where + ExtractContextT: ExtractContext + Send, + InjectContextT: InjectContext + Send, + S: Service + Send, + OnResponseT: OnResponse + Send + Clone + 'static, + OnErrorT: OnError + Send + Clone + 'static, + MakeSpanBuilderT: MakeSpanBuilder + Send, + S::Future: Send + 'static, +{ + type Response = S::Response; + type Error = S::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: Req) -> Self::Future { + let cx = self.extract_context.extract_context(&request); + let span_builder = self.make_span_builder.make_span_builder(&request); + let span = span_builder.start_with_context(self.tracer.as_ref(), &cx); + + let cx = cx.with_span(span); + let request = self.inject_context.inject_context(&cx, request); + + let on_response = self.on_response.clone(); + let on_error = self.on_error.clone(); + let attachment = cx.clone().attach(); + let ret = self + .inner + .call(request) + .with_context(cx.clone()) + .inspect(move |r| { + let span = cx.span(); + match r { + Ok(response) => on_response.on_response(&span, response), + Err(err) => on_error.on_error(&span, err), + } + + span.end(); + }) + .boxed(); + + drop(attachment); + + ret + } +} diff --git a/crates/http/src/layers/server.rs b/crates/http/src/layers/server.rs index 1bebfec0..24d93323 100644 --- a/crates/http/src/layers/server.rs +++ b/crates/http/src/layers/server.rs @@ -21,7 +21,7 @@ use tower::{ }; use tower_http::compression::{CompressionBody, CompressionLayer}; -use super::trace::OtelTraceLayer; +use super::otel::TraceLayer; use crate::BoxError; #[derive(Debug, Default)] @@ -36,7 +36,7 @@ where ResBody: http_body::Body + Sync + Send + 'static, ResBody::Error: std::fmt::Display + 'static, S::Future: Send + 'static, - E: Into, + E: std::error::Error + Into, { #[allow(clippy::type_complexity)] type Service = BoxCloneService< @@ -48,8 +48,8 @@ where fn layer(&self, inner: S) -> Self::Service { ServiceBuilder::new() .layer(CompressionLayer::new()) + .layer(TraceLayer::http_server()) .map_response(|r: Response<_>| r.map(BoxBody::new)) - .layer(OtelTraceLayer::server()) .layer(TimeoutLayer::new(Duration::from_secs(10))) .service(inner) .boxed_clone() diff --git a/crates/http/src/layers/trace.rs b/crates/http/src/layers/trace.rs deleted file mode 100644 index 84aea725..00000000 --- a/crates/http/src/layers/trace.rs +++ /dev/null @@ -1,170 +0,0 @@ -// Copyright 2022 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::time::Duration; - -use http::{header::USER_AGENT, Request, Response, Version}; -use opentelemetry::trace::TraceContextExt; -use opentelemetry_http::HeaderExtractor; -use tower::Layer; -use tower_http::{ - classify::{ServerErrorsAsFailures, SharedClassifier}, - trace::{DefaultOnRequest, MakeSpan, OnResponse, Trace}, -}; -use tracing::{field, Span}; -use tracing_opentelemetry::OpenTelemetrySpanExt; - -#[derive(Debug, Clone, Copy)] -pub enum OtelTraceLayer { - OuterClient(&'static str), - InnerClient, - Server, -} - -impl OtelTraceLayer { - pub const fn outer_client(operation: &'static str) -> Self { - Self::OuterClient(operation) - } - - pub const fn inner_client() -> Self { - Self::InnerClient - } - - pub const fn server() -> Self { - Self::Server - } -} - -impl Layer for OtelTraceLayer { - type Service = Trace< - S, - SharedClassifier, - MakeOtelSpan, - DefaultOnRequest, - OtelOnResponse, - >; - - fn layer(&self, inner: S) -> Self::Service { - let make_span = match self { - Self::OuterClient(o) => MakeOtelSpan::OuterClient(o), - Self::InnerClient => MakeOtelSpan::InnerClient, - Self::Server => MakeOtelSpan::Server, - }; - - Trace::new_for_http(inner) - .make_span_with(make_span) - .on_response(OtelOnResponse) - } -} - -#[derive(Debug, Clone, Copy)] -pub enum MakeOtelSpan { - OuterClient(&'static str), - InnerClient, - Server, -} - -impl MakeSpan for MakeOtelSpan { - fn make_span(&mut self, request: &Request) -> Span { - // Extract the context from the headers - let headers = request.headers(); - - let version = match request.version() { - Version::HTTP_09 => "0.9", - Version::HTTP_10 => "1.0", - Version::HTTP_11 => "1.1", - Version::HTTP_2 => "2.0", - Version::HTTP_3 => "3.0", - _ => "", - }; - - let span = match self { - Self::OuterClient(operation) => { - tracing::info_span!( - "client_request", - otel.name = operation, - otel.kind = "internal", - otel.status_code = field::Empty, - http.method = %request.method(), - http.target = %request.uri(), - http.flavor = version, - http.status_code = field::Empty, - http.user_agent = field::Empty, - ) - } - Self::InnerClient => { - tracing::info_span!( - "outgoing_request", - otel.kind = "client", - otel.status_code = field::Empty, - http.method = %request.method(), - http.target = %request.uri(), - http.flavor = version, - http.status_code = field::Empty, - http.user_agent = field::Empty, - ) - } - Self::Server => { - let span = tracing::info_span!( - "incoming_request", - otel.kind = "server", - otel.status_code = field::Empty, - http.method = %request.method(), - http.target = %request.uri(), - http.flavor = version, - http.status_code = field::Empty, - http.user_agent = field::Empty, - ); - - // Extract the context from the headers for server spans - let headers = request.headers(); - let extractor = HeaderExtractor(headers); - - let cx = opentelemetry::global::get_text_map_propagator(|propagator| { - propagator.extract(&extractor) - }); - - if cx.span().span_context().is_remote() { - span.set_parent(cx); - } - - span - } - }; - - if let Some(user_agent) = headers.get(USER_AGENT).and_then(|s| s.to_str().ok()) { - span.record("http.user_agent", &user_agent); - } - - span - } -} - -#[derive(Debug, Clone, Default)] -pub struct OtelOnResponse; - -impl OnResponse for OtelOnResponse { - fn on_response(self, response: &Response, _latency: Duration, span: &Span) { - let s = response.status(); - let status = if s.is_success() { - "ok" - } else if s.is_client_error() || s.is_server_error() { - "error" - } else { - "unset" - }; - span.record("otel.status_code", &status); - span.record("http.status_code", &s.as_u16()); - } -} diff --git a/crates/http/src/lib.rs b/crates/http/src/lib.rs index 81356db9..f39a4a9e 100644 --- a/crates/http/src/lib.rs +++ b/crates/http/src/lib.rs @@ -29,9 +29,15 @@ use bytes::Bytes; use futures_util::{FutureExt, TryFutureExt}; use http::{Request, Response}; use http_body::{combinators::BoxBody, Body}; -use hyper::{client::HttpConnector, Client}; +use hyper::{ + client::{connect::dns::GaiResolver, HttpConnector}, + Client, +}; use hyper_rustls::{ConfigBuilderExt, HttpsConnector, HttpsConnectorBuilder}; -use layers::client::ClientResponse; +use layers::{ + client::ClientResponse, + otel::{TraceDns, TraceLayer}, +}; use thiserror::Error; use tokio::{sync::OnceCell, task::JoinError}; use tower::{util::BoxCloneService, ServiceBuilder, ServiceExt}; @@ -43,7 +49,7 @@ mod layers; pub use self::{ ext::ServiceExt as HttpServiceExt, future_service::FutureService, - layers::{client::ClientLayer, json::JsonResponseLayer, server::ServerLayer}, + layers::{client::ClientLayer, json::JsonResponseLayer, otel, server::ServerLayer}, }; pub(crate) type BoxError = Box; @@ -71,13 +77,17 @@ pub enum ClientInitError { static TLS_CONFIG: OnceCell = OnceCell::const_new(); async fn make_base_client( -) -> Result, B>, ClientInitError> +) -> Result>>, B>, ClientInitError> where B: http_body::Body + Send + 'static, E: Into, { - // TODO: we could probably hook a tracing DNS resolver there - let mut http = HttpConnector::new(); + // Trace DNS requests + let resolver = ServiceBuilder::new() + .layer(TraceLayer::dns()) + .service(GaiResolver::new()); + + let mut http = HttpConnector::new_with_resolver(resolver); http.enforce_http(false); let tls_config = TLS_CONFIG