1
0
mirror of https://github.com/matrix-org/matrix-authentication-service.git synced 2025-07-29 22:01:14 +03:00

Better OpenTelemetry tracer

This commit is contained in:
Quentin Gliech
2022-02-24 16:03:37 +01:00
parent 4c31f8d831
commit b81f4caace
16 changed files with 712 additions and 224 deletions

30
Cargo.lock generated
View File

@ -509,9 +509,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]] [[package]]
name = "blake2" name = "blake2"
version = "0.10.3" version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f08f9f6871a8eacbb960d18db3d077ae6db1f0bc0df3272a78ca09eef8c5a931" checksum = "b9cf849ee05b2ee5fba5e36f97ff8ec2533916700fc0758d40d92136a42f3388"
dependencies = [ dependencies = [
"digest 0.10.3", "digest 0.10.3",
] ]
@ -746,9 +746,9 @@ dependencies = [
[[package]] [[package]]
name = "clap_derive" name = "clap_derive"
version = "3.1.0" version = "3.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fd1122e63869df2cb309f449da1ad54a7c6dfeb7c7e6ccd8e0825d9eb93bb72" checksum = "01d42c94ce7c2252681b5fed4d3627cc807b13dfc033246bd05d5b252399000e"
dependencies = [ dependencies = [
"heck 0.4.0", "heck 0.4.0",
"proc-macro-error", "proc-macro-error",
@ -1302,9 +1302,9 @@ dependencies = [
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.4" version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c" checksum = "d39cd93900197114fa1fcb7ae84ca742095eed9442088988ae74fa744e930e77"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
@ -1673,9 +1673,9 @@ dependencies = [
[[package]] [[package]]
name = "integer-encoding" name = "integer-encoding"
version = "3.0.2" version = "3.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90c11140ffea82edce8dcd74137ce9324ec24b3cf0175fc9d7e29164da9915b8" checksum = "0e85a1509a128c855368e135cffcde7eac17d8e1083f41e2b98c58bc1a5074be"
[[package]] [[package]]
name = "iovec" name = "iovec"
@ -1779,9 +1779,9 @@ dependencies = [
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.118" version = "0.2.119"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06e509672465a0504304aa87f9f176f2b2b716ed8fb105ebe5c02dc6dce96a94" checksum = "1bf2e165bb3457c8e098ea76f3e3bc9db55f87aa90d52d0e6be741470916aaa4"
[[package]] [[package]]
name = "libm" name = "libm"
@ -1976,6 +1976,7 @@ dependencies = [
"hyper-rustls 0.23.0", "hyper-rustls 0.23.0",
"opentelemetry", "opentelemetry",
"opentelemetry-http", "opentelemetry-http",
"opentelemetry-semantic-conventions",
"rustls 0.20.4", "rustls 0.20.4",
"serde", "serde",
"serde_json", "serde_json",
@ -3362,9 +3363,9 @@ dependencies = [
[[package]] [[package]]
name = "semver" name = "semver"
version = "1.0.5" version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0486718e92ec9a68fbed73bb5ef687d71103b142595b406835649bebd33f72c7" checksum = "a4a3381e03edd24287172047536f20cabde766e2cd3e65e6b00fb3af51c4f38d"
[[package]] [[package]]
name = "serde" name = "serde"
@ -4133,7 +4134,6 @@ dependencies = [
"tower", "tower",
"tower-layer", "tower-layer",
"tower-service", "tower-service",
"tracing",
] ]
[[package]] [[package]]
@ -4787,9 +4787,9 @@ dependencies = [
[[package]] [[package]]
name = "zeroize_derive" name = "zeroize_derive"
version = "1.3.1" version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81e8f13fef10b63c06356d65d416b070798ddabcadc10d3ece0c5be9b3c7eddb" checksum = "3f8f187641dad4f680d25c4bfc4225b418165984179f26ca76ec4fb6441d3a17"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",

View File

@ -23,7 +23,7 @@ use opentelemetry::{
sdk::{ sdk::{
self, self,
propagation::{BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator}, propagation::{BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator},
trace::Tracer, trace::{Sampler, Tracer},
Resource, Resource,
}, },
}; };
@ -215,7 +215,9 @@ fn meter(config: &MetricsExporterConfig) -> anyhow::Result<()> {
} }
fn trace_config() -> sdk::trace::Config { fn trace_config() -> sdk::trace::Config {
sdk::trace::config().with_resource(resource()) sdk::trace::config()
.with_resource(resource())
.with_sampler(Sampler::AlwaysOn)
} }
fn resource() -> Resource { fn resource() -> Resource {

View File

@ -14,12 +14,13 @@ hyper = "0.14.17"
hyper-rustls = { version = "0.23.0", features = ["http1", "http2"] } hyper-rustls = { version = "0.23.0", features = ["http1", "http2"] }
opentelemetry = "0.17.0" opentelemetry = "0.17.0"
opentelemetry-http = "0.6.0" opentelemetry-http = "0.6.0"
opentelemetry-semantic-conventions = "0.9.0"
rustls = "0.20.4" rustls = "0.20.4"
serde = "1.0.136" serde = "1.0.136"
serde_json = "1.0.79" serde_json = "1.0.79"
thiserror = "1.0.30" thiserror = "1.0.30"
tokio = { version = "1.17.0", features = ["sync", "parking_lot"] } tokio = { version = "1.17.0", features = ["sync", "parking_lot"] }
tower = { version = "0.4.12", features = ["timeout", "limit"] } tower = { version = "0.4.12", features = ["timeout", "limit"] }
tower-http = { version = "0.2.3", features = ["follow-redirect", "decompression-full", "set-header", "trace", "compression-full"] } tower-http = { version = "0.2.3", features = ["follow-redirect", "decompression-full", "set-header", "compression-full"] }
tracing = "0.1.31" tracing = "0.1.31"
tracing-opentelemetry = "0.17.2" tracing-opentelemetry = "0.17.2"

View File

@ -15,7 +15,6 @@
use std::{marker::PhantomData, time::Duration}; use std::{marker::PhantomData, time::Duration};
use http::{header::USER_AGENT, HeaderValue, Request, Response}; use http::{header::USER_AGENT, HeaderValue, Request, Response};
use http_body::combinators::BoxBody;
use tower::{ use tower::{
limit::ConcurrencyLimitLayer, timeout::TimeoutLayer, util::BoxCloneService, Layer, Service, limit::ConcurrencyLimitLayer, timeout::TimeoutLayer, util::BoxCloneService, Layer, Service,
ServiceBuilder, ServiceExt, ServiceBuilder, ServiceExt,
@ -25,15 +24,13 @@ use tower_http::{
follow_redirect::FollowRedirectLayer, follow_redirect::FollowRedirectLayer,
set_header::SetRequestHeaderLayer, set_header::SetRequestHeaderLayer,
}; };
use tracing_opentelemetry::OpenTelemetrySpanExt;
use super::trace::OtelTraceLayer; use super::otel::TraceLayer;
use crate::BoxError;
static MAS_USER_AGENT: HeaderValue = static MAS_USER_AGENT: HeaderValue =
HeaderValue::from_static("matrix-authentication-service/0.0.1"); HeaderValue::from_static("matrix-authentication-service/0.0.1");
type BoxError = Box<dyn std::error::Error + Send + Sync>;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ClientLayer<ReqBody> { pub struct ClientLayer<ReqBody> {
operation: &'static str, operation: &'static str,
@ -50,16 +47,13 @@ impl<B> ClientLayer<B> {
} }
} }
pub type ClientResponse<B> = Response< pub type ClientResponse<B> = Response<DecompressionBody<B>>;
DecompressionBody<BoxBody<<B as http_body::Body>::Data, <B as http_body::Body>::Error>>,
>;
impl<ReqBody, ResBody, S, E> Layer<S> for ClientLayer<ReqBody> impl<ReqBody, ResBody, S, E> Layer<S> for ClientLayer<ReqBody>
where where
S: Service<Request<ReqBody>, Response = Response<ResBody>, Error = E> + Clone + Send + 'static, S: Service<Request<ReqBody>, Response = Response<ResBody>, Error = E> + Clone + Send + 'static,
ReqBody: http_body::Body + Default + Send + 'static, ReqBody: http_body::Body + Default + Send + 'static,
ResBody: http_body::Body + Sync + Send + 'static, ResBody: http_body::Body + Sync + Send + 'static,
ResBody::Error: std::fmt::Display + 'static,
S::Future: Send + 'static, S::Future: Send + 'static,
E: Into<BoxError>, E: Into<BoxError>,
{ {
@ -71,33 +65,20 @@ where
// - the TimeoutLayer // - the TimeoutLayer
// - the DecompressionLayer // - the DecompressionLayer
// Those layers do type erasure of the error. // Those layers do type erasure of the error.
// The body is also type-erased because of the DecompressionLayer.
ServiceBuilder::new() ServiceBuilder::new()
.layer(DecompressionLayer::new()) .layer(DecompressionLayer::new())
.map_response(|r: Response<_>| r.map(BoxBody::new))
.layer(SetRequestHeaderLayer::overriding( .layer(SetRequestHeaderLayer::overriding(
USER_AGENT, USER_AGENT,
MAS_USER_AGENT.clone(), MAS_USER_AGENT.clone(),
)) ))
// A trace that has the whole operation, with all the redirects, retries, rate limits // A trace that has the whole operation, with all the redirects, timeouts and rate
.layer(OtelTraceLayer::outer_client(self.operation)) // limits in it
.layer(TraceLayer::http_client(self.operation))
.layer(ConcurrencyLimitLayer::new(10)) .layer(ConcurrencyLimitLayer::new(10))
.layer(FollowRedirectLayer::new()) .layer(FollowRedirectLayer::new())
// A trace for each "real" http request // A trace for each "real" http request
.layer(OtelTraceLayer::inner_client()) .layer(TraceLayer::inner_http_client())
.layer(TimeoutLayer::new(Duration::from_secs(10))) .layer(TimeoutLayer::new(Duration::from_secs(10)))
// Propagate the span context
.map_request(|mut r: Request<_>| {
// TODO: this seems to be broken
let cx = tracing::Span::current().context();
let mut injector = opentelemetry_http::HeaderInjector(r.headers_mut());
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(&cx, &mut injector);
});
r
})
.service(inner) .service(inner)
.boxed_clone() .boxed_clone()
} }

View File

@ -14,5 +14,5 @@
pub(crate) mod client; pub(crate) mod client;
pub(crate) mod json; pub(crate) mod json;
pub mod otel;
pub(crate) mod server; pub(crate) mod server;
pub(crate) mod trace;

View File

@ -0,0 +1,51 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use http::Request;
use opentelemetry::{trace::TraceContextExt, Context};
use opentelemetry_http::HeaderExtractor;
pub trait ExtractContext<R> {
fn extract_context(&self, request: &R) -> Context;
}
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultExtractContext;
impl<T> ExtractContext<T> for DefaultExtractContext {
fn extract_context(&self, _request: &T) -> Context {
Context::current()
}
}
#[derive(Debug, Clone, Copy, Default)]
pub struct ExtractFromHttpRequest;
impl<T> ExtractContext<Request<T>> for ExtractFromHttpRequest {
fn extract_context(&self, request: &Request<T>) -> Context {
let headers = request.headers();
let extractor = HeaderExtractor(headers);
let parent_cx = Context::current();
let cx = opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.extract_with_context(&parent_cx, &extractor)
});
if cx.span().span_context().is_remote() {
cx
} else {
parent_cx
}
}
}

View File

@ -0,0 +1,52 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use http::Request;
use opentelemetry::Context;
use opentelemetry_http::HeaderInjector;
pub trait InjectContext<R> {
type Output;
fn inject_context(&self, cx: &Context, request: R) -> Self::Output;
}
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultInjectContext;
impl<R> InjectContext<R> for DefaultInjectContext {
type Output = R;
fn inject_context(&self, _cx: &Context, request: R) -> Self::Output {
request
}
}
#[derive(Debug, Clone, Copy, Default)]
pub struct InjectInHttpRequest;
impl<T> InjectContext<Request<T>> for InjectInHttpRequest {
type Output = Request<T>;
fn inject_context(&self, cx: &Context, mut request: Request<T>) -> Self::Output {
let headers = request.headers_mut();
let mut injector = HeaderInjector(headers);
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(cx, &mut injector);
});
request
}
}

View File

@ -0,0 +1,154 @@
use std::sync::Arc;
use tower::Layer;
use super::{
extract_context::DefaultExtractContext, inject_context::DefaultInjectContext,
make_span_builder::DefaultMakeSpanBuilder, on_error::DefaultOnError,
on_response::DefaultOnResponse, service::Trace,
};
#[derive(Debug, Clone)]
pub struct TraceLayer<
ExtractContext = DefaultExtractContext,
InjectContext = DefaultInjectContext,
MakeSpanBuilder = DefaultMakeSpanBuilder,
OnResponse = DefaultOnResponse,
OnError = DefaultOnError,
> {
tracer: Arc<opentelemetry::global::BoxedTracer>,
extract_context: ExtractContext,
inject_context: InjectContext,
make_span_builder: MakeSpanBuilder,
on_response: OnResponse,
on_error: OnError,
}
impl Default for TraceLayer {
fn default() -> Self {
let tracer = Arc::new(opentelemetry::global::tracer("mas-http"));
Self::new(tracer)
}
}
impl<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError>
TraceLayer<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError>
{
#[must_use]
pub fn new(tracer: Arc<opentelemetry::global::BoxedTracer>) -> Self
where
ExtractContext: Default,
InjectContext: Default,
MakeSpanBuilder: Default,
OnResponse: Default,
OnError: Default,
{
Self {
tracer,
extract_context: ExtractContext::default(),
inject_context: InjectContext::default(),
make_span_builder: MakeSpanBuilder::default(),
on_response: OnResponse::default(),
on_error: OnError::default(),
}
}
#[must_use]
pub fn extract_context<NewExtractContext>(
self,
extract_context: NewExtractContext,
) -> TraceLayer<NewExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError> {
TraceLayer {
tracer: self.tracer,
extract_context,
inject_context: self.inject_context,
make_span_builder: self.make_span_builder,
on_response: self.on_response,
on_error: self.on_error,
}
}
#[must_use]
pub fn inject_context<NewInjectContext>(
self,
inject_context: NewInjectContext,
) -> TraceLayer<ExtractContext, NewInjectContext, MakeSpanBuilder, OnResponse, OnError> {
TraceLayer {
tracer: self.tracer,
extract_context: self.extract_context,
inject_context,
make_span_builder: self.make_span_builder,
on_response: self.on_response,
on_error: self.on_error,
}
}
#[must_use]
pub fn make_span_builder<NewMakeSpanBuilder>(
self,
make_span_builder: NewMakeSpanBuilder,
) -> TraceLayer<ExtractContext, InjectContext, NewMakeSpanBuilder, OnResponse, OnError> {
TraceLayer {
tracer: self.tracer,
extract_context: self.extract_context,
inject_context: self.inject_context,
make_span_builder,
on_response: self.on_response,
on_error: self.on_error,
}
}
#[must_use]
pub fn on_response<NewOnResponse>(
self,
on_response: NewOnResponse,
) -> TraceLayer<ExtractContext, InjectContext, MakeSpanBuilder, NewOnResponse, OnError> {
TraceLayer {
tracer: self.tracer,
extract_context: self.extract_context,
inject_context: self.inject_context,
make_span_builder: self.make_span_builder,
on_response,
on_error: self.on_error,
}
}
#[must_use]
pub fn on_error<NewOnError>(
self,
on_error: NewOnError,
) -> TraceLayer<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, NewOnError> {
TraceLayer {
tracer: self.tracer,
extract_context: self.extract_context,
inject_context: self.inject_context,
make_span_builder: self.make_span_builder,
on_response: self.on_response,
on_error,
}
}
}
impl<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError, S> Layer<S>
for TraceLayer<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError>
where
ExtractContext: Clone,
InjectContext: Clone,
MakeSpanBuilder: Clone,
OnResponse: Clone,
OnError: Clone,
{
type Service = Trace<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError, S>;
fn layer(&self, inner: S) -> Self::Service {
Trace {
inner,
tracer: self.tracer.clone(),
extract_context: self.extract_context.clone(),
inject_context: self.inject_context.clone(),
make_span_builder: self.make_span_builder.clone(),
on_response: self.on_response.clone(),
on_error: self.on_error.clone(),
}
}
}

View File

@ -0,0 +1,139 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use http::{Method, Request, Version};
use hyper::client::connect::dns::Name;
use opentelemetry::trace::{SpanBuilder, SpanKind};
use opentelemetry_semantic_conventions::trace::{
HTTP_FLAVOR, HTTP_METHOD, HTTP_URL, NET_HOST_NAME,
};
pub trait MakeSpanBuilder<R> {
fn make_span_builder(&self, request: &R) -> SpanBuilder;
}
#[derive(Debug, Clone, Copy)]
pub struct DefaultMakeSpanBuilder {
operation: &'static str,
}
impl DefaultMakeSpanBuilder {
#[must_use]
pub fn new(operation: &'static str) -> Self {
Self { operation }
}
}
impl Default for DefaultMakeSpanBuilder {
fn default() -> Self {
Self {
operation: "service",
}
}
}
impl<R> MakeSpanBuilder<R> for DefaultMakeSpanBuilder {
fn make_span_builder(&self, _request: &R) -> SpanBuilder {
SpanBuilder::from_name(self.operation)
}
}
#[inline]
fn http_method_str(method: &Method) -> Cow<'static, str> {
match method {
&Method::OPTIONS => "OPTIONS".into(),
&Method::GET => "GET".into(),
&Method::POST => "POST".into(),
&Method::PUT => "PUT".into(),
&Method::DELETE => "DELETE".into(),
&Method::HEAD => "HEAD".into(),
&Method::TRACE => "TRACE".into(),
&Method::CONNECT => "CONNECT".into(),
&Method::PATCH => "PATCH".into(),
other => other.to_string().into(),
}
}
#[inline]
fn http_flavor(version: Version) -> Cow<'static, str> {
match version {
Version::HTTP_09 => "0.9".into(),
Version::HTTP_10 => "1.0".into(),
Version::HTTP_11 => "1.1".into(),
Version::HTTP_2 => "2.0".into(),
Version::HTTP_3 => "3.0".into(),
other => format!("{:?}", other).into(),
}
}
#[derive(Debug, Clone)]
pub struct SpanFromHttpRequest {
operation: &'static str,
span_kind: SpanKind,
}
impl SpanFromHttpRequest {
#[must_use]
pub fn server() -> Self {
Self {
operation: "http-server",
span_kind: SpanKind::Server,
}
}
#[must_use]
pub fn inner_client() -> Self {
Self {
operation: "http-client",
span_kind: SpanKind::Client,
}
}
#[must_use]
pub fn client(operation: &'static str) -> Self {
Self {
operation,
span_kind: SpanKind::Client,
}
}
}
impl<B> MakeSpanBuilder<Request<B>> for SpanFromHttpRequest {
fn make_span_builder(&self, request: &Request<B>) -> SpanBuilder {
let attributes = vec![
HTTP_METHOD.string(http_method_str(request.method())),
HTTP_FLAVOR.string(http_flavor(request.version())),
HTTP_URL.string(request.uri().to_string()),
];
SpanBuilder::from_name(self.operation)
.with_kind(self.span_kind.clone())
.with_attributes(attributes)
}
}
#[derive(Debug, Clone, Copy, Default)]
pub struct SpanFromDnsRequest;
impl MakeSpanBuilder<Name> for SpanFromDnsRequest {
fn make_span_builder(&self, request: &Name) -> SpanBuilder {
let attributes = vec![NET_HOST_NAME.string(request.as_str().to_string())];
SpanBuilder::from_name("resolve")
.with_kind(SpanKind::Client)
.with_attributes(attributes)
}
}

View File

@ -0,0 +1,112 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod extract_context;
mod inject_context;
mod layer;
mod make_span_builder;
mod on_error;
mod on_response;
mod service;
pub type TraceHttpServerLayer = TraceLayer<
ExtractFromHttpRequest,
DefaultInjectContext,
SpanFromHttpRequest,
OnHttpResponse,
DefaultOnError,
>;
pub type TraceHttpServer<S> = Trace<
ExtractFromHttpRequest,
DefaultInjectContext,
SpanFromHttpRequest,
OnHttpResponse,
DefaultOnError,
S,
>;
pub type TraceHttpClientLayer = TraceLayer<
DefaultExtractContext,
InjectInHttpRequest,
SpanFromHttpRequest,
OnHttpResponse,
DefaultOnError,
>;
pub type TraceHttpClient<S> = Trace<
DefaultExtractContext,
InjectInHttpRequest,
SpanFromHttpRequest,
OnHttpResponse,
DefaultOnError,
S,
>;
pub type TraceDnsLayer = TraceLayer<
DefaultExtractContext,
DefaultInjectContext,
SpanFromDnsRequest,
DefaultOnResponse,
DefaultOnError,
>;
pub type TraceDns<S> = Trace<
DefaultExtractContext,
DefaultInjectContext,
SpanFromDnsRequest,
DefaultOnResponse,
DefaultOnError,
S,
>;
impl TraceHttpServerLayer {
#[must_use]
pub fn http_server() -> Self {
TraceLayer::default()
.make_span_builder(SpanFromHttpRequest::server())
.on_response(OnHttpResponse)
.extract_context(ExtractFromHttpRequest)
}
}
impl TraceHttpClientLayer {
#[must_use]
pub fn http_client(operation: &'static str) -> Self {
TraceLayer::default()
.make_span_builder(SpanFromHttpRequest::client(operation))
.on_response(OnHttpResponse)
.inject_context(InjectInHttpRequest)
}
#[must_use]
pub fn inner_http_client() -> Self {
TraceLayer::default()
.make_span_builder(SpanFromHttpRequest::inner_client())
.on_response(OnHttpResponse)
.inject_context(InjectInHttpRequest)
}
}
impl TraceDnsLayer {
#[must_use]
pub fn dns() -> Self {
TraceLayer::default().make_span_builder(SpanFromDnsRequest)
}
}
pub use self::{
extract_context::*, inject_context::*, layer::*, make_span_builder::*, on_error::*,
on_response::*, service::*,
};

View File

@ -0,0 +1,33 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use opentelemetry::trace::SpanRef;
use opentelemetry_semantic_conventions::trace::EXCEPTION_MESSAGE;
pub trait OnError<E> {
fn on_error(&self, span: &SpanRef<'_>, err: &E);
}
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultOnError;
impl<E> OnError<E> for DefaultOnError
where
E: std::fmt::Display,
{
fn on_error(&self, span: &SpanRef<'_>, err: &E) {
let attributes = vec![EXCEPTION_MESSAGE.string(err.to_string())];
span.add_event("exception".to_string(), attributes);
}
}

View File

@ -0,0 +1,37 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use http::Response;
use opentelemetry::trace::SpanRef;
use opentelemetry_semantic_conventions::trace::HTTP_STATUS_CODE;
pub trait OnResponse<R> {
fn on_response(&self, span: &SpanRef<'_>, response: &R);
}
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultOnResponse;
impl<R> OnResponse<R> for DefaultOnResponse {
fn on_response(&self, _span: &SpanRef<'_>, _response: &R) {}
}
#[derive(Debug, Clone, Copy, Default)]
pub struct OnHttpResponse;
impl<B> OnResponse<Response<B>> for OnHttpResponse {
fn on_response(&self, span: &SpanRef<'_>, response: &Response<B>) {
span.set_attribute(HTTP_STATUS_CODE.i64(i64::from(response.status().as_u16())));
}
}

View File

@ -0,0 +1,86 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{sync::Arc, task::Poll};
use futures_util::{future::BoxFuture, FutureExt as _};
use opentelemetry::trace::{FutureExt as _, TraceContextExt};
use tower::Service;
use super::{
extract_context::ExtractContext, inject_context::InjectContext,
make_span_builder::MakeSpanBuilder, on_error::OnError, on_response::OnResponse,
};
#[derive(Debug, Clone)]
pub struct Trace<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError, S> {
pub(crate) inner: S,
pub(crate) tracer: Arc<opentelemetry::global::BoxedTracer>,
pub(crate) extract_context: ExtractContext,
pub(crate) inject_context: InjectContext,
pub(crate) make_span_builder: MakeSpanBuilder,
pub(crate) on_response: OnResponse,
pub(crate) on_error: OnError,
}
impl<Req, S, ExtractContextT, InjectContextT, MakeSpanBuilderT, OnResponseT, OnErrorT> Service<Req>
for Trace<ExtractContextT, InjectContextT, MakeSpanBuilderT, OnResponseT, OnErrorT, S>
where
ExtractContextT: ExtractContext<Req> + Send,
InjectContextT: InjectContext<Req> + Send,
S: Service<InjectContextT::Output> + Send,
OnResponseT: OnResponse<S::Response> + Send + Clone + 'static,
OnErrorT: OnError<S::Error> + Send + Clone + 'static,
MakeSpanBuilderT: MakeSpanBuilder<Req> + Send,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, request: Req) -> Self::Future {
let cx = self.extract_context.extract_context(&request);
let span_builder = self.make_span_builder.make_span_builder(&request);
let span = span_builder.start_with_context(self.tracer.as_ref(), &cx);
let cx = cx.with_span(span);
let request = self.inject_context.inject_context(&cx, request);
let on_response = self.on_response.clone();
let on_error = self.on_error.clone();
let attachment = cx.clone().attach();
let ret = self
.inner
.call(request)
.with_context(cx.clone())
.inspect(move |r| {
let span = cx.span();
match r {
Ok(response) => on_response.on_response(&span, response),
Err(err) => on_error.on_error(&span, err),
}
span.end();
})
.boxed();
drop(attachment);
ret
}
}

View File

@ -21,7 +21,7 @@ use tower::{
}; };
use tower_http::compression::{CompressionBody, CompressionLayer}; use tower_http::compression::{CompressionBody, CompressionLayer};
use super::trace::OtelTraceLayer; use super::otel::TraceLayer;
use crate::BoxError; use crate::BoxError;
#[derive(Debug, Default)] #[derive(Debug, Default)]
@ -36,7 +36,7 @@ where
ResBody: http_body::Body + Sync + Send + 'static, ResBody: http_body::Body + Sync + Send + 'static,
ResBody::Error: std::fmt::Display + 'static, ResBody::Error: std::fmt::Display + 'static,
S::Future: Send + 'static, S::Future: Send + 'static,
E: Into<BoxError>, E: std::error::Error + Into<BoxError>,
{ {
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
type Service = BoxCloneService< type Service = BoxCloneService<
@ -48,8 +48,8 @@ where
fn layer(&self, inner: S) -> Self::Service { fn layer(&self, inner: S) -> Self::Service {
ServiceBuilder::new() ServiceBuilder::new()
.layer(CompressionLayer::new()) .layer(CompressionLayer::new())
.layer(TraceLayer::http_server())
.map_response(|r: Response<_>| r.map(BoxBody::new)) .map_response(|r: Response<_>| r.map(BoxBody::new))
.layer(OtelTraceLayer::server())
.layer(TimeoutLayer::new(Duration::from_secs(10))) .layer(TimeoutLayer::new(Duration::from_secs(10)))
.service(inner) .service(inner)
.boxed_clone() .boxed_clone()

View File

@ -1,170 +0,0 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use http::{header::USER_AGENT, Request, Response, Version};
use opentelemetry::trace::TraceContextExt;
use opentelemetry_http::HeaderExtractor;
use tower::Layer;
use tower_http::{
classify::{ServerErrorsAsFailures, SharedClassifier},
trace::{DefaultOnRequest, MakeSpan, OnResponse, Trace},
};
use tracing::{field, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
#[derive(Debug, Clone, Copy)]
pub enum OtelTraceLayer {
OuterClient(&'static str),
InnerClient,
Server,
}
impl OtelTraceLayer {
pub const fn outer_client(operation: &'static str) -> Self {
Self::OuterClient(operation)
}
pub const fn inner_client() -> Self {
Self::InnerClient
}
pub const fn server() -> Self {
Self::Server
}
}
impl<S> Layer<S> for OtelTraceLayer {
type Service = Trace<
S,
SharedClassifier<ServerErrorsAsFailures>,
MakeOtelSpan,
DefaultOnRequest,
OtelOnResponse,
>;
fn layer(&self, inner: S) -> Self::Service {
let make_span = match self {
Self::OuterClient(o) => MakeOtelSpan::OuterClient(o),
Self::InnerClient => MakeOtelSpan::InnerClient,
Self::Server => MakeOtelSpan::Server,
};
Trace::new_for_http(inner)
.make_span_with(make_span)
.on_response(OtelOnResponse)
}
}
#[derive(Debug, Clone, Copy)]
pub enum MakeOtelSpan {
OuterClient(&'static str),
InnerClient,
Server,
}
impl<B> MakeSpan<B> for MakeOtelSpan {
fn make_span(&mut self, request: &Request<B>) -> Span {
// Extract the context from the headers
let headers = request.headers();
let version = match request.version() {
Version::HTTP_09 => "0.9",
Version::HTTP_10 => "1.0",
Version::HTTP_11 => "1.1",
Version::HTTP_2 => "2.0",
Version::HTTP_3 => "3.0",
_ => "",
};
let span = match self {
Self::OuterClient(operation) => {
tracing::info_span!(
"client_request",
otel.name = operation,
otel.kind = "internal",
otel.status_code = field::Empty,
http.method = %request.method(),
http.target = %request.uri(),
http.flavor = version,
http.status_code = field::Empty,
http.user_agent = field::Empty,
)
}
Self::InnerClient => {
tracing::info_span!(
"outgoing_request",
otel.kind = "client",
otel.status_code = field::Empty,
http.method = %request.method(),
http.target = %request.uri(),
http.flavor = version,
http.status_code = field::Empty,
http.user_agent = field::Empty,
)
}
Self::Server => {
let span = tracing::info_span!(
"incoming_request",
otel.kind = "server",
otel.status_code = field::Empty,
http.method = %request.method(),
http.target = %request.uri(),
http.flavor = version,
http.status_code = field::Empty,
http.user_agent = field::Empty,
);
// Extract the context from the headers for server spans
let headers = request.headers();
let extractor = HeaderExtractor(headers);
let cx = opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.extract(&extractor)
});
if cx.span().span_context().is_remote() {
span.set_parent(cx);
}
span
}
};
if let Some(user_agent) = headers.get(USER_AGENT).and_then(|s| s.to_str().ok()) {
span.record("http.user_agent", &user_agent);
}
span
}
}
#[derive(Debug, Clone, Default)]
pub struct OtelOnResponse;
impl<B> OnResponse<B> for OtelOnResponse {
fn on_response(self, response: &Response<B>, _latency: Duration, span: &Span) {
let s = response.status();
let status = if s.is_success() {
"ok"
} else if s.is_client_error() || s.is_server_error() {
"error"
} else {
"unset"
};
span.record("otel.status_code", &status);
span.record("http.status_code", &s.as_u16());
}
}

View File

@ -29,9 +29,15 @@ use bytes::Bytes;
use futures_util::{FutureExt, TryFutureExt}; use futures_util::{FutureExt, TryFutureExt};
use http::{Request, Response}; use http::{Request, Response};
use http_body::{combinators::BoxBody, Body}; use http_body::{combinators::BoxBody, Body};
use hyper::{client::HttpConnector, Client}; use hyper::{
client::{connect::dns::GaiResolver, HttpConnector},
Client,
};
use hyper_rustls::{ConfigBuilderExt, HttpsConnector, HttpsConnectorBuilder}; use hyper_rustls::{ConfigBuilderExt, HttpsConnector, HttpsConnectorBuilder};
use layers::client::ClientResponse; use layers::{
client::ClientResponse,
otel::{TraceDns, TraceLayer},
};
use thiserror::Error; use thiserror::Error;
use tokio::{sync::OnceCell, task::JoinError}; use tokio::{sync::OnceCell, task::JoinError};
use tower::{util::BoxCloneService, ServiceBuilder, ServiceExt}; use tower::{util::BoxCloneService, ServiceBuilder, ServiceExt};
@ -43,7 +49,7 @@ mod layers;
pub use self::{ pub use self::{
ext::ServiceExt as HttpServiceExt, ext::ServiceExt as HttpServiceExt,
future_service::FutureService, future_service::FutureService,
layers::{client::ClientLayer, json::JsonResponseLayer, server::ServerLayer}, layers::{client::ClientLayer, json::JsonResponseLayer, otel, server::ServerLayer},
}; };
pub(crate) type BoxError = Box<dyn std::error::Error + Send + Sync>; pub(crate) type BoxError = Box<dyn std::error::Error + Send + Sync>;
@ -71,13 +77,17 @@ pub enum ClientInitError {
static TLS_CONFIG: OnceCell<rustls::ClientConfig> = OnceCell::const_new(); static TLS_CONFIG: OnceCell<rustls::ClientConfig> = OnceCell::const_new();
async fn make_base_client<B, E>( async fn make_base_client<B, E>(
) -> Result<hyper::Client<HttpsConnector<HttpConnector>, B>, ClientInitError> ) -> Result<hyper::Client<HttpsConnector<HttpConnector<TraceDns<GaiResolver>>>, B>, ClientInitError>
where where
B: http_body::Body<Data = Bytes, Error = E> + Send + 'static, B: http_body::Body<Data = Bytes, Error = E> + Send + 'static,
E: Into<BoxError>, E: Into<BoxError>,
{ {
// TODO: we could probably hook a tracing DNS resolver there // Trace DNS requests
let mut http = HttpConnector::new(); let resolver = ServiceBuilder::new()
.layer(TraceLayer::dns())
.service(GaiResolver::new());
let mut http = HttpConnector::new_with_resolver(resolver);
http.enforce_http(false); http.enforce_http(false);
let tls_config = TLS_CONFIG let tls_config = TLS_CONFIG