1
0
mirror of https://github.com/matrix-org/matrix-authentication-service.git synced 2025-07-07 22:41:18 +03:00

Properly propagate trace contexts

This also fixes a long-running issue where the OTEL context was not properly set in the tracing spans.
This commit is contained in:
Quentin Gliech
2023-04-03 11:14:24 +02:00
parent 1f748f7d1e
commit f4fff72b22
6 changed files with 215 additions and 102 deletions

8
Cargo.lock generated
View File

@ -3457,10 +3457,13 @@ dependencies = [
"mas-iana", "mas-iana",
"mas-jose", "mas-jose",
"oauth2-types", "oauth2-types",
"opentelemetry",
"rand_core 0.6.4", "rand_core 0.6.4",
"serde", "serde",
"serde_json", "serde_json",
"thiserror", "thiserror",
"tracing",
"tracing-opentelemetry",
"ulid", "ulid",
"url", "url",
] ]
@ -3499,20 +3502,19 @@ dependencies = [
"apalis-sql", "apalis-sql",
"async-trait", "async-trait",
"chrono", "chrono",
"futures-util",
"mas-data-model", "mas-data-model",
"mas-email", "mas-email",
"mas-storage", "mas-storage",
"mas-storage-pg", "mas-storage-pg",
"opentelemetry",
"rand 0.8.5", "rand 0.8.5",
"rand_chacha 0.3.1", "rand_chacha 0.3.1",
"serde", "serde",
"sqlx", "sqlx",
"thiserror", "thiserror",
"tokio",
"tokio-stream",
"tower", "tower",
"tracing", "tracing",
"tracing-opentelemetry",
"ulid", "ulid",
] ]

View File

@ -22,8 +22,7 @@ use clap::Parser;
use mas_config::TelemetryConfig; use mas_config::TelemetryConfig;
use sentry_tracing::EventFilter; use sentry_tracing::EventFilter;
use tracing_subscriber::{ use tracing_subscriber::{
filter::LevelFilter, layer::SubscriberExt, reload, util::SubscriberInitExt, EnvFilter, Layer, filter::LevelFilter, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, Registry,
Registry,
}; };
mod commands; mod commands;
@ -59,33 +58,6 @@ async fn try_main() -> anyhow::Result<()> {
.or_else(|_| EnvFilter::try_new("info")) .or_else(|_| EnvFilter::try_new("info"))
.context("could not setup logging filter")?; .context("could not setup logging filter")?;
// Don't fill the telemetry layer for now, we want to configure it based on the
// app config, so we need to delay that a bit
let (telemetry_layer, telemetry_handle) = reload::Layer::new(None);
// We only want "INFO" level spans to go through OpenTelemetry
let telemetry_layer = telemetry_layer.with_filter(LevelFilter::INFO);
// Don't fill the Sentry layer for now, we want to configure it based on the
// app config, so we need to delay that a bit
let (sentry_layer, sentry_handle) = reload::Layer::new(None);
let subscriber = Registry::default()
.with(sentry_layer)
.with(telemetry_layer)
.with(filter_layer)
.with(fmt_layer);
subscriber
.try_init()
.context("could not initialize logging")?;
// Now that logging is set up, we can log stuff, like if the .env file was
// loaded or not
match dotenv_path {
Ok(Some(path)) => tracing::info!(?path, "Loaded environment variables from file"),
Ok(None) => {}
Err(err) => tracing::warn!(%err, "failed to load .env file"),
}
// Parse the CLI arguments // Parse the CLI arguments
let opts = self::commands::Options::parse(); let opts = self::commands::Options::parse();
@ -104,8 +76,9 @@ async fn try_main() -> anyhow::Result<()> {
..Default::default() ..Default::default()
}, },
)); ));
if sentry.is_enabled() {
let layer = sentry_tracing::layer().event_filter(|md| { let sentry_layer = sentry.is_enabled().then(|| {
sentry_tracing::layer().event_filter(|md| {
// All the spans in the handlers module send their data to Sentry themselves, so // All the spans in the handlers module send their data to Sentry themselves, so
// we only create breadcrumbs for them, instead of full events // we only create breadcrumbs for them, instead of full events
if md.target().starts_with("mas_handlers::") { if md.target().starts_with("mas_handlers::") {
@ -113,22 +86,35 @@ async fn try_main() -> anyhow::Result<()> {
} else { } else {
sentry_tracing::default_event_filter(md) sentry_tracing::default_event_filter(md)
} }
}); })
});
sentry_handle.reload(layer)?; // Setup OpenTelemetry tracing and metrics
}
// Setup OpenTelemtry tracing and metrics
let (tracer, _meter) = telemetry::setup(&telemetry_config) let (tracer, _meter) = telemetry::setup(&telemetry_config)
.await .await
.context("failed to setup opentelemetry")?; .context("failed to setup OpenTelemetry")?;
if let Some(tracer) = tracer {
// Now we can swap out the actual opentelemetry tracing layer let telemetry_layer = tracer.map(|tracer| {
telemetry_handle.reload( tracing_opentelemetry::layer()
tracing_opentelemetry::layer() .with_tracer(tracer)
.with_tracer(tracer) .with_tracked_inactivity(false)
.with_tracked_inactivity(false), .with_filter(LevelFilter::INFO)
)?; });
let subscriber = Registry::default()
.with(sentry_layer)
.with(telemetry_layer)
.with(filter_layer)
.with(fmt_layer);
subscriber
.try_init()
.context("could not initialize logging")?;
// Log about the .env loading
match dotenv_path {
Ok(Some(path)) => tracing::info!(?path, "Loaded environment variables from .env file"),
Ok(None) => {}
Err(e) => tracing::warn!(?e, "Failed to load .env file"),
} }
// And run the command // And run the command

View File

@ -12,9 +12,12 @@ thiserror = "1.0.39"
futures-util = "0.3.27" futures-util = "0.3.27"
apalis-core = { version = "0.4.0-alpha.4", features = ["tokio-comp"] } apalis-core = { version = "0.4.0-alpha.4", features = ["tokio-comp"] }
opentelemetry = "0.18.0"
rand_core = "0.6.4" rand_core = "0.6.4"
serde = "1.0.159" serde = "1.0.159"
serde_json = "1.0.95" serde_json = "1.0.95"
tracing = "0.1.37"
tracing-opentelemetry = "0.18.0"
url = "2.3.1" url = "2.3.1"
ulid = "1.0.0" ulid = "1.0.0"

View File

@ -14,10 +14,14 @@
//! Repository to schedule persistent jobs. //! Repository to schedule persistent jobs.
use std::{num::ParseIntError, ops::Deref};
pub use apalis_core::job::{Job, JobId}; pub use apalis_core::job::{Job, JobId};
use async_trait::async_trait; use async_trait::async_trait;
use serde::Serialize; use opentelemetry::trace::{SpanContext, SpanId, TraceContextExt, TraceFlags, TraceId, TraceState};
use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use crate::repository_impl; use crate::repository_impl;
@ -27,6 +31,75 @@ pub struct JobSubmission {
payload: Value, payload: Value,
} }
#[derive(Serialize, Deserialize)]
struct SerializableSpanContext {
trace_id: String,
span_id: String,
trace_flags: u8,
}
impl From<&SpanContext> for SerializableSpanContext {
fn from(value: &SpanContext) -> Self {
Self {
trace_id: value.trace_id().to_string(),
span_id: value.span_id().to_string(),
trace_flags: value.trace_flags().to_u8(),
}
}
}
impl TryFrom<&SerializableSpanContext> for SpanContext {
type Error = ParseIntError;
fn try_from(value: &SerializableSpanContext) -> Result<Self, Self::Error> {
Ok(SpanContext::new(
TraceId::from_hex(&value.trace_id)?,
SpanId::from_hex(&value.span_id)?,
TraceFlags::new(value.trace_flags),
// XXX: is that fine?
true,
TraceState::default(),
))
}
}
/// A wrapper for [`Job`] which adds the span context in the payload.
#[derive(Serialize, Deserialize)]
pub struct JobWithSpanContext<T> {
#[serde(skip_serializing_if = "Option::is_none")]
span_context: Option<SerializableSpanContext>,
#[serde(flatten)]
payload: T,
}
impl<J: Job> Job for JobWithSpanContext<J> {
const NAME: &'static str = J::NAME;
}
impl<T> Deref for JobWithSpanContext<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.payload
}
}
impl<T> JobWithSpanContext<T> {
/// Get the span context of the job.
///
/// # Returns
///
/// Returns [`None`] if the job has no span context, or if the span context
/// is invalid.
#[must_use]
pub fn span_context(&self) -> Option<SpanContext> {
self.span_context
.as_ref()
.and_then(|ctx| ctx.try_into().ok())
}
}
impl JobSubmission { impl JobSubmission {
/// Create a new job submission out of a [`Job`]. /// Create a new job submission out of a [`Job`].
/// ///
@ -35,12 +108,30 @@ impl JobSubmission {
/// Panics if the job cannot be serialized. /// Panics if the job cannot be serialized.
#[must_use] #[must_use]
pub fn new<J: Job + Serialize>(job: J) -> Self { pub fn new<J: Job + Serialize>(job: J) -> Self {
let payload = serde_json::to_value(job).expect("Could not serialize job");
Self { Self {
name: J::NAME, name: J::NAME,
payload: serde_json::to_value(job).expect("failed to serialize job"), payload,
} }
} }
/// Create a new job submission out of a [`Job`] and a [`SpanContext`].
///
/// # Panics
///
/// Panics if the job cannot be serialized.
#[must_use]
pub fn new_with_span_context<J: Job + Serialize>(job: J, span_context: &SpanContext) -> Self {
// Serialize the span context alongside the job.
let span_context = SerializableSpanContext::from(span_context);
Self::new(JobWithSpanContext {
payload: job,
span_context: Some(span_context),
})
}
/// The name of the job. /// The name of the job.
#[must_use] #[must_use]
pub fn name(&self) -> &'static str { pub fn name(&self) -> &'static str {
@ -104,8 +195,21 @@ where
{ {
type Error = T::Error; type Error = T::Error;
#[tracing::instrument(
name = "db.job.schedule_job",
skip_all,
fields(
job.name = J::NAME,
),
)]
async fn schedule_job<J: Job + Serialize>(&mut self, job: J) -> Result<JobId, Self::Error> { async fn schedule_job<J: Job + Serialize>(&mut self, job: J) -> Result<JobId, Self::Error> {
self.schedule_submission(JobSubmission::new(job)).await let span = tracing::Span::current();
let ctx = span.context();
let span = ctx.span();
let span_context = span.span_context();
self.schedule_submission(JobSubmission::new_with_span_context(job, span_context))
.await
} }
} }

View File

@ -12,15 +12,14 @@ apalis-cron = "0.4.0-alpha.4"
apalis-sql = { version = "0.4.0-alpha.4", features = ["postgres", "tokio-comp"] } apalis-sql = { version = "0.4.0-alpha.4", features = ["postgres", "tokio-comp"] }
async-trait = "0.1.66" async-trait = "0.1.66"
chrono = "0.4.24" chrono = "0.4.24"
futures-util = "0.3.27"
rand = "0.8.5" rand = "0.8.5"
rand_chacha = "0.3.1" rand_chacha = "0.3.1"
sqlx = { version = "0.6.2", features = ["runtime-tokio-rustls", "postgres"] } sqlx = { version = "0.6.2", features = ["runtime-tokio-rustls", "postgres"] }
thiserror = "1.0.30" thiserror = "1.0.30"
tokio = "1.26.0" tower = "0.4.13"
tokio-stream = "0.1.12"
tower = { version = "0.4.13", features = ["util"] }
tracing = "0.1.37" tracing = "0.1.37"
tracing-opentelemetry = "0.18.0"
opentelemetry = "0.18.0"
ulid = "1.0.0" ulid = "1.0.0"
serde = { version = "1.0.159", features = ["derive"] } serde = { version = "1.0.159", features = ["derive"] }

View File

@ -23,61 +23,80 @@ use apalis_core::{
}; };
use chrono::Duration; use chrono::Duration;
use mas_email::{Address, EmailVerificationContext, Mailbox}; use mas_email::{Address, EmailVerificationContext, Mailbox};
use mas_storage::job::VerifyEmailJob; use mas_storage::job::{JobWithSpanContext, VerifyEmailJob};
use rand::{distributions::Uniform, Rng}; use rand::{distributions::Uniform, Rng};
use tracing::info; use tracing::{info, info_span, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use crate::{JobContextExt, State}; use crate::{JobContextExt, State};
async fn verify_email(job: VerifyEmailJob, ctx: JobContext) -> Result<(), anyhow::Error> { async fn verify_email(
let state = ctx.state(); job: JobWithSpanContext<VerifyEmailJob>,
let mut repo = state.repository().await?; ctx: JobContext,
let mut rng = state.rng(); ) -> Result<(), anyhow::Error> {
let mailer = state.mailer(); let span = info_span!(
let clock = state.clock(); "job.verify_email",
job.id = %ctx.id(),
// Lookup the user email job.attempts = ctx.attempts(),
let user_email = repo user_email.id = %job.user_email_id(),
.user_email()
.lookup(job.user_email_id())
.await?
.context("User email not found")?;
// Lookup the user associated with the email
let user = repo
.user()
.lookup(user_email.user_id)
.await?
.context("User not found")?;
// Generate a verification code
let range = Uniform::<u32>::from(0..1_000_000);
let code = rng.sample(range);
let code = format!("{code:06}");
let address: Address = user_email.email.parse()?;
// Save the verification code in the database
let verification = repo
.user_email()
.add_verification_code(&mut rng, &clock, &user_email, Duration::hours(8), code)
.await?;
// And send the verification email
let mailbox = Mailbox::new(Some(user.username.clone()), address);
let context = EmailVerificationContext::new(user.clone(), verification.clone());
mailer.send_verification_email(mailbox, &context).await?;
info!(
email.id = %user_email.id,
"Verification email sent"
); );
repo.save().await?; if let Some(context) = job.span_context() {
span.add_link(context);
}
Ok(()) async move {
let state = ctx.state();
let mut repo = state.repository().await?;
let mut rng = state.rng();
let mailer = state.mailer();
let clock = state.clock();
// Lookup the user email
let user_email = repo
.user_email()
.lookup(job.user_email_id())
.await?
.context("User email not found")?;
// Lookup the user associated with the email
let user = repo
.user()
.lookup(user_email.user_id)
.await?
.context("User not found")?;
// Generate a verification code
let range = Uniform::<u32>::from(0..1_000_000);
let code = rng.sample(range);
let code = format!("{code:06}");
let address: Address = user_email.email.parse()?;
// Save the verification code in the database
let verification = repo
.user_email()
.add_verification_code(&mut rng, &clock, &user_email, Duration::hours(8), code)
.await?;
// And send the verification email
let mailbox = Mailbox::new(Some(user.username.clone()), address);
let context = EmailVerificationContext::new(user.clone(), verification.clone());
mailer.send_verification_email(mailbox, &context).await?;
info!(
email.id = %user_email.id,
"Verification email sent"
);
repo.save().await?;
Ok(())
}
.instrument(span)
.await
} }
pub(crate) fn register(monitor: Monitor<TokioExecutor>, state: &State) -> Monitor<TokioExecutor> { pub(crate) fn register(monitor: Monitor<TokioExecutor>, state: &State) -> Monitor<TokioExecutor> {