You've already forked authentication-service
mirror of
https://github.com/matrix-org/matrix-authentication-service.git
synced 2025-07-29 22:01:14 +03:00
Propagate parent trace context
This commit is contained in:
14
Cargo.lock
generated
14
Cargo.lock
generated
@ -1397,6 +1397,7 @@ dependencies = [
|
|||||||
"mas-config",
|
"mas-config",
|
||||||
"mas-core",
|
"mas-core",
|
||||||
"opentelemetry",
|
"opentelemetry",
|
||||||
|
"opentelemetry-http",
|
||||||
"opentelemetry-otlp",
|
"opentelemetry-otlp",
|
||||||
"opentelemetry-semantic-conventions",
|
"opentelemetry-semantic-conventions",
|
||||||
"schemars",
|
"schemars",
|
||||||
@ -1764,6 +1765,19 @@ dependencies = [
|
|||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "opentelemetry-http"
|
||||||
|
version = "0.5.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d50ceb0b0e8b75cb3e388a2571a807c8228dabc5d6670f317b6eb21301095373"
|
||||||
|
dependencies = [
|
||||||
|
"async-trait",
|
||||||
|
"bytes",
|
||||||
|
"futures-util",
|
||||||
|
"http",
|
||||||
|
"opentelemetry",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "opentelemetry-otlp"
|
name = "opentelemetry-otlp"
|
||||||
version = "0.9.0"
|
version = "0.9.0"
|
||||||
|
@ -28,6 +28,7 @@ url = "2.2.2"
|
|||||||
|
|
||||||
mas-config = { path = "../config" }
|
mas-config = { path = "../config" }
|
||||||
mas-core = { path = "../core" }
|
mas-core = { path = "../core" }
|
||||||
|
opentelemetry-http = "0.5.0"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
indoc = "1.0.3"
|
indoc = "1.0.3"
|
||||||
|
@ -19,21 +19,21 @@ use std::{
|
|||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use clap::Clap;
|
use clap::Clap;
|
||||||
use hyper::{header, Server};
|
use hyper::{header, Server, Version};
|
||||||
use mas_config::RootConfig;
|
use mas_config::RootConfig;
|
||||||
use mas_core::{
|
use mas_core::{
|
||||||
storage::MIGRATOR,
|
storage::MIGRATOR,
|
||||||
tasks::{self, TaskQueue},
|
tasks::{self, TaskQueue},
|
||||||
templates::Templates,
|
templates::Templates,
|
||||||
};
|
};
|
||||||
|
use opentelemetry_http::HeaderExtractor;
|
||||||
use tower::{make::Shared, ServiceBuilder};
|
use tower::{make::Shared, ServiceBuilder};
|
||||||
use tower_http::{
|
use tower_http::{
|
||||||
compression::CompressionLayer,
|
compression::CompressionLayer,
|
||||||
sensitive_headers::SetSensitiveHeadersLayer,
|
sensitive_headers::SetSensitiveHeadersLayer,
|
||||||
trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer},
|
trace::{MakeSpan, OnResponse, TraceLayer},
|
||||||
LatencyUnit,
|
|
||||||
};
|
};
|
||||||
use tracing::info;
|
use tracing::{field, info};
|
||||||
|
|
||||||
use super::RootCommand;
|
use super::RootCommand;
|
||||||
|
|
||||||
@ -44,6 +44,72 @@ pub(super) struct ServerCommand {
|
|||||||
migrate: bool,
|
migrate: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
struct OtelMakeSpan;
|
||||||
|
|
||||||
|
impl<B> MakeSpan<B> for OtelMakeSpan {
|
||||||
|
fn make_span(&mut self, request: &hyper::Request<B>) -> tracing::Span {
|
||||||
|
// Extract the context from the headers
|
||||||
|
let headers = request.headers();
|
||||||
|
let extractor = HeaderExtractor(headers);
|
||||||
|
|
||||||
|
let cx = opentelemetry::global::get_text_map_propagator(|propagator| {
|
||||||
|
propagator.extract(&extractor)
|
||||||
|
});
|
||||||
|
|
||||||
|
// Attach the context so when the request span is created it gets properly
|
||||||
|
// parented
|
||||||
|
let _guard = cx.attach();
|
||||||
|
|
||||||
|
let version = match request.version() {
|
||||||
|
Version::HTTP_09 => "0.9",
|
||||||
|
Version::HTTP_10 => "1.0",
|
||||||
|
Version::HTTP_11 => "1.1",
|
||||||
|
Version::HTTP_2 => "2.0",
|
||||||
|
Version::HTTP_3 => "3.0",
|
||||||
|
_ => "",
|
||||||
|
};
|
||||||
|
|
||||||
|
let span = tracing::info_span!(
|
||||||
|
"request",
|
||||||
|
http.method = %request.method(),
|
||||||
|
http.target = %request.uri(),
|
||||||
|
http.flavor = version,
|
||||||
|
http.status_code = field::Empty,
|
||||||
|
http.user_agent = field::Empty,
|
||||||
|
otel.kind = "server",
|
||||||
|
otel.status_code = field::Empty,
|
||||||
|
);
|
||||||
|
|
||||||
|
if let Some(user_agent) = headers
|
||||||
|
.get(header::USER_AGENT)
|
||||||
|
.and_then(|s| s.to_str().ok())
|
||||||
|
{
|
||||||
|
span.record("http.user_agent", &user_agent);
|
||||||
|
}
|
||||||
|
|
||||||
|
span
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
struct OtelOnResponse;
|
||||||
|
|
||||||
|
impl<B> OnResponse<B> for OtelOnResponse {
|
||||||
|
fn on_response(self, response: &hyper::Response<B>, _latency: Duration, span: &tracing::Span) {
|
||||||
|
let s = response.status();
|
||||||
|
let status = if s.is_success() {
|
||||||
|
"ok"
|
||||||
|
} else if s.is_client_error() || s.is_server_error() {
|
||||||
|
"error"
|
||||||
|
} else {
|
||||||
|
"unset"
|
||||||
|
};
|
||||||
|
span.record("otel.status_code", &status);
|
||||||
|
span.record("http.status_code", &s.as_u16());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl ServerCommand {
|
impl ServerCommand {
|
||||||
pub async fn run(&self, root: &RootCommand) -> anyhow::Result<()> {
|
pub async fn run(&self, root: &RootCommand) -> anyhow::Result<()> {
|
||||||
let config: RootConfig = root.load_config()?;
|
let config: RootConfig = root.load_config()?;
|
||||||
@ -80,12 +146,8 @@ impl ServerCommand {
|
|||||||
// Add high level tracing/logging to all requests
|
// Add high level tracing/logging to all requests
|
||||||
.layer(
|
.layer(
|
||||||
TraceLayer::new_for_http()
|
TraceLayer::new_for_http()
|
||||||
.make_span_with(DefaultMakeSpan::new().include_headers(true))
|
.make_span_with(OtelMakeSpan)
|
||||||
.on_response(
|
.on_response(OtelOnResponse),
|
||||||
DefaultOnResponse::new()
|
|
||||||
.include_headers(true)
|
|
||||||
.latency_unit(LatencyUnit::Micros),
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
// Set a timeout
|
// Set a timeout
|
||||||
.timeout(Duration::from_secs(10))
|
.timeout(Duration::from_secs(10))
|
||||||
|
@ -18,12 +18,19 @@ use futures::stream::{Stream, StreamExt};
|
|||||||
use mas_config::{MetricsConfig, TelemetryConfig, TracingConfig};
|
use mas_config::{MetricsConfig, TelemetryConfig, TracingConfig};
|
||||||
use opentelemetry::{
|
use opentelemetry::{
|
||||||
global,
|
global,
|
||||||
sdk::{self, trace::Tracer, Resource},
|
propagation::TextMapPropagator,
|
||||||
|
sdk::{
|
||||||
|
self,
|
||||||
|
propagation::{BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator},
|
||||||
|
trace::Tracer,
|
||||||
|
Resource,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
use opentelemetry_semantic_conventions as semcov;
|
use opentelemetry_semantic_conventions as semcov;
|
||||||
|
|
||||||
pub fn setup(config: &TelemetryConfig) -> anyhow::Result<Option<Tracer>> {
|
pub fn setup(config: &TelemetryConfig) -> anyhow::Result<Option<Tracer>> {
|
||||||
global::set_error_handler(|e| tracing::error!("{}", e))?;
|
global::set_error_handler(|e| tracing::error!("{}", e))?;
|
||||||
|
global::set_text_map_propagator(propagator());
|
||||||
|
|
||||||
let tracer = tracer(&config.tracing)?;
|
let tracer = tracer(&config.tracing)?;
|
||||||
meter(&config.metrics)?;
|
meter(&config.metrics)?;
|
||||||
@ -34,6 +41,17 @@ pub fn shutdown() {
|
|||||||
global::shutdown_tracer_provider();
|
global::shutdown_tracer_provider();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn propagator() -> impl TextMapPropagator {
|
||||||
|
// TODO: make this configurable
|
||||||
|
let baggage_propagator = BaggagePropagator::new();
|
||||||
|
let trace_context_propagator = TraceContextPropagator::new();
|
||||||
|
|
||||||
|
TextMapCompositePropagator::new(vec![
|
||||||
|
Box::new(baggage_propagator),
|
||||||
|
Box::new(trace_context_propagator),
|
||||||
|
])
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(feature = "otlp")]
|
#[cfg(feature = "otlp")]
|
||||||
fn otlp_tracer(endpoint: &Option<url::Url>) -> anyhow::Result<Tracer> {
|
fn otlp_tracer(endpoint: &Option<url::Url>) -> anyhow::Result<Tracer> {
|
||||||
use opentelemetry_otlp::WithExportConfig;
|
use opentelemetry_otlp::WithExportConfig;
|
||||||
|
@ -14,6 +14,7 @@
|
|||||||
|
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
|
||||||
|
use hyper::Method;
|
||||||
use oauth2_types::{
|
use oauth2_types::{
|
||||||
oidc::Metadata,
|
oidc::Metadata,
|
||||||
pkce::CodeChallengeMethod,
|
pkce::CodeChallengeMethod,
|
||||||
@ -86,10 +87,15 @@ pub(super) fn filter(
|
|||||||
code_challenge_methods_supported,
|
code_challenge_methods_supported,
|
||||||
};
|
};
|
||||||
|
|
||||||
let cors = warp::cors().allow_any_origin();
|
// TODO: get the headers list from the global opentelemetry propagators
|
||||||
|
let cors = warp::cors()
|
||||||
|
.allow_method(Method::GET)
|
||||||
|
.allow_any_origin()
|
||||||
|
.allow_headers(["traceparent"]);
|
||||||
|
|
||||||
warp::path!(".well-known" / "openid-configuration")
|
warp::path!(".well-known" / "openid-configuration").and(
|
||||||
.and(warp::get())
|
warp::get()
|
||||||
.map(move || warp::reply::json(&metadata))
|
.map(move || warp::reply::json(&metadata))
|
||||||
.with(cors)
|
.with(cors),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user