diff --git a/Cargo.lock b/Cargo.lock index a41b1a93..dcca97a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3457,10 +3457,13 @@ dependencies = [ "mas-iana", "mas-jose", "oauth2-types", + "opentelemetry", "rand_core 0.6.4", "serde", "serde_json", "thiserror", + "tracing", + "tracing-opentelemetry", "ulid", "url", ] @@ -3499,20 +3502,19 @@ dependencies = [ "apalis-sql", "async-trait", "chrono", - "futures-util", "mas-data-model", "mas-email", "mas-storage", "mas-storage-pg", + "opentelemetry", "rand 0.8.5", "rand_chacha 0.3.1", "serde", "sqlx", "thiserror", - "tokio", - "tokio-stream", "tower", "tracing", + "tracing-opentelemetry", "ulid", ] diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 93a96bb6..a472f3e6 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -22,8 +22,7 @@ use clap::Parser; use mas_config::TelemetryConfig; use sentry_tracing::EventFilter; use tracing_subscriber::{ - filter::LevelFilter, layer::SubscriberExt, reload, util::SubscriberInitExt, EnvFilter, Layer, - Registry, + filter::LevelFilter, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, Registry, }; mod commands; @@ -59,33 +58,6 @@ async fn try_main() -> anyhow::Result<()> { .or_else(|_| EnvFilter::try_new("info")) .context("could not setup logging filter")?; - // Don't fill the telemetry layer for now, we want to configure it based on the - // app config, so we need to delay that a bit - let (telemetry_layer, telemetry_handle) = reload::Layer::new(None); - // We only want "INFO" level spans to go through OpenTelemetry - let telemetry_layer = telemetry_layer.with_filter(LevelFilter::INFO); - - // Don't fill the Sentry layer for now, we want to configure it based on the - // app config, so we need to delay that a bit - let (sentry_layer, sentry_handle) = reload::Layer::new(None); - - let subscriber = Registry::default() - .with(sentry_layer) - .with(telemetry_layer) - .with(filter_layer) - .with(fmt_layer); - subscriber - .try_init() - 
.context("could not initialize logging")?; - - // Now that logging is set up, we can log stuff, like if the .env file was - // loaded or not - match dotenv_path { - Ok(Some(path)) => tracing::info!(?path, "Loaded environment variables from file"), - Ok(None) => {} - Err(err) => tracing::warn!(%err, "failed to load .env file"), - } - // Parse the CLI arguments let opts = self::commands::Options::parse(); @@ -104,8 +76,9 @@ async fn try_main() -> anyhow::Result<()> { ..Default::default() }, )); - if sentry.is_enabled() { - let layer = sentry_tracing::layer().event_filter(|md| { + + let sentry_layer = sentry.is_enabled().then(|| { + sentry_tracing::layer().event_filter(|md| { // All the spans in the handlers module send their data to Sentry themselves, so // we only create breadcrumbs for them, instead of full events if md.target().starts_with("mas_handlers::") { @@ -113,22 +86,35 @@ async fn try_main() -> anyhow::Result<()> { } else { sentry_tracing::default_event_filter(md) } - }); + }) + }); - sentry_handle.reload(layer)?; - } - - // Setup OpenTelemtry tracing and metrics + // Setup OpenTelemetry tracing and metrics let (tracer, _meter) = telemetry::setup(&telemetry_config) .await - .context("failed to setup opentelemetry")?; - if let Some(tracer) = tracer { - // Now we can swap out the actual opentelemetry tracing layer - telemetry_handle.reload( - tracing_opentelemetry::layer() - .with_tracer(tracer) - .with_tracked_inactivity(false), - )?; + .context("failed to setup OpenTelemetry")?; + + let telemetry_layer = tracer.map(|tracer| { + tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_tracked_inactivity(false) + .with_filter(LevelFilter::INFO) + }); + + let subscriber = Registry::default() + .with(sentry_layer) + .with(telemetry_layer) + .with(filter_layer) + .with(fmt_layer); + subscriber + .try_init() + .context("could not initialize logging")?; + + // Log about the .env loading + match dotenv_path { + Ok(Some(path)) => tracing::info!(?path, "Loaded 
environment variables from .env file"), + Ok(None) => {} + Err(e) => tracing::warn!(?e, "Failed to load .env file"), } // And run the command diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index cd299eb7..195c0325 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -12,9 +12,12 @@ thiserror = "1.0.39" futures-util = "0.3.27" apalis-core = { version = "0.4.0-alpha.4", features = ["tokio-comp"] } +opentelemetry = "0.18.0" rand_core = "0.6.4" serde = "1.0.159" serde_json = "1.0.95" +tracing = "0.1.37" +tracing-opentelemetry = "0.18.0" url = "2.3.1" ulid = "1.0.0" diff --git a/crates/storage/src/job.rs b/crates/storage/src/job.rs index 237e7a49..78db872d 100644 --- a/crates/storage/src/job.rs +++ b/crates/storage/src/job.rs @@ -14,10 +14,14 @@ //! Repository to schedule persistent jobs. +use std::{num::ParseIntError, ops::Deref}; + pub use apalis_core::job::{Job, JobId}; use async_trait::async_trait; -use serde::Serialize; +use opentelemetry::trace::{SpanContext, SpanId, TraceContextExt, TraceFlags, TraceId, TraceState}; +use serde::{Deserialize, Serialize}; use serde_json::Value; +use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::repository_impl; @@ -27,6 +31,75 @@ pub struct JobSubmission { payload: Value, } +#[derive(Serialize, Deserialize)] +struct SerializableSpanContext { + trace_id: String, + span_id: String, + trace_flags: u8, +} + +impl From<&SpanContext> for SerializableSpanContext { + fn from(value: &SpanContext) -> Self { + Self { + trace_id: value.trace_id().to_string(), + span_id: value.span_id().to_string(), + trace_flags: value.trace_flags().to_u8(), + } + } +} + +impl TryFrom<&SerializableSpanContext> for SpanContext { + type Error = ParseIntError; + + fn try_from(value: &SerializableSpanContext) -> Result<Self, Self::Error> { + Ok(SpanContext::new( + TraceId::from_hex(&value.trace_id)?, + SpanId::from_hex(&value.span_id)?, + TraceFlags::new(value.trace_flags), + // XXX: is that fine?
+ true, + TraceState::default(), + )) + } +} + +/// A wrapper for [`Job`] which adds the span context in the payload. +#[derive(Serialize, Deserialize)] +pub struct JobWithSpanContext<T> { + #[serde(skip_serializing_if = "Option::is_none")] + span_context: Option<SerializableSpanContext>, + + #[serde(flatten)] + payload: T, +} + +impl<J: Job> Job for JobWithSpanContext<J> { + const NAME: &'static str = J::NAME; +} + +impl<T> Deref for JobWithSpanContext<T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.payload + } +} + +impl<T> JobWithSpanContext<T> { + /// Get the span context of the job. + /// + /// # Returns + /// + /// Returns [`None`] if the job has no span context, or if the span context + /// is invalid. + #[must_use] + pub fn span_context(&self) -> Option<SpanContext> { + self.span_context + .as_ref() + .and_then(|ctx| ctx.try_into().ok()) + } +} + impl JobSubmission { /// Create a new job submission out of a [`Job`]. /// @@ -35,12 +108,30 @@ impl JobSubmission { /// Panics if the job cannot be serialized. #[must_use] pub fn new<J: Job + Serialize>(job: J) -> Self { + let payload = serde_json::to_value(job).expect("Could not serialize job"); + Self { name: J::NAME, - payload: serde_json::to_value(job).expect("failed to serialize job"), + payload, } } + /// Create a new job submission out of a [`Job`] and a [`SpanContext`]. + /// + /// # Panics + /// + /// Panics if the job cannot be serialized. + #[must_use] + pub fn new_with_span_context<J: Job + Serialize>(job: J, span_context: &SpanContext) -> Self { + // Serialize the span context alongside the job. + let span_context = SerializableSpanContext::from(span_context); + + Self::new(JobWithSpanContext { + payload: job, + span_context: Some(span_context), + }) + } + /// The name of the job.
#[must_use] pub fn name(&self) -> &'static str { @@ -104,8 +195,21 @@ where { type Error = T::Error; + #[tracing::instrument( + name = "db.job.schedule_job", + skip_all, + fields( + job.name = J::NAME, + ), + )] async fn schedule_job(&mut self, job: J) -> Result { - self.schedule_submission(JobSubmission::new(job)).await + let span = tracing::Span::current(); + let ctx = span.context(); + let span = ctx.span(); + let span_context = span.span_context(); + + self.schedule_submission(JobSubmission::new_with_span_context(job, span_context)) + .await } } diff --git a/crates/tasks/Cargo.toml b/crates/tasks/Cargo.toml index 66758816..553f6813 100644 --- a/crates/tasks/Cargo.toml +++ b/crates/tasks/Cargo.toml @@ -12,15 +12,14 @@ apalis-cron = "0.4.0-alpha.4" apalis-sql = { version = "0.4.0-alpha.4", features = ["postgres", "tokio-comp"] } async-trait = "0.1.66" chrono = "0.4.24" -futures-util = "0.3.27" rand = "0.8.5" rand_chacha = "0.3.1" sqlx = { version = "0.6.2", features = ["runtime-tokio-rustls", "postgres"] } thiserror = "1.0.30" -tokio = "1.26.0" -tokio-stream = "0.1.12" -tower = { version = "0.4.13", features = ["util"] } +tower = "0.4.13" tracing = "0.1.37" +tracing-opentelemetry = "0.18.0" +opentelemetry = "0.18.0" ulid = "1.0.0" serde = { version = "1.0.159", features = ["derive"] } diff --git a/crates/tasks/src/email.rs b/crates/tasks/src/email.rs index ef61e197..14b73f77 100644 --- a/crates/tasks/src/email.rs +++ b/crates/tasks/src/email.rs @@ -23,61 +23,80 @@ use apalis_core::{ }; use chrono::Duration; use mas_email::{Address, EmailVerificationContext, Mailbox}; -use mas_storage::job::VerifyEmailJob; +use mas_storage::job::{JobWithSpanContext, VerifyEmailJob}; use rand::{distributions::Uniform, Rng}; -use tracing::info; +use tracing::{info, info_span, Instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::{JobContextExt, State}; -async fn verify_email(job: VerifyEmailJob, ctx: JobContext) -> Result<(), anyhow::Error> { - let state = 
ctx.state(); - let mut repo = state.repository().await?; - let mut rng = state.rng(); - let mailer = state.mailer(); - let clock = state.clock(); - - // Lookup the user email - let user_email = repo - .user_email() - .lookup(job.user_email_id()) - .await? - .context("User email not found")?; - - // Lookup the user associated with the email - let user = repo - .user() - .lookup(user_email.user_id) - .await? - .context("User not found")?; - - // Generate a verification code - let range = Uniform::<u32>::from(0..1_000_000); - let code = rng.sample(range); - let code = format!("{code:06}"); - - let address: Address = user_email.email.parse()?; - - // Save the verification code in the database - let verification = repo - .user_email() - .add_verification_code(&mut rng, &clock, &user_email, Duration::hours(8), code) - .await?; - - // And send the verification email - let mailbox = Mailbox::new(Some(user.username.clone()), address); - - let context = EmailVerificationContext::new(user.clone(), verification.clone()); - - mailer.send_verification_email(mailbox, &context).await?; - - info!( - email.id = %user_email.id, - "Verification email sent" +async fn verify_email( + job: JobWithSpanContext<VerifyEmailJob>, + ctx: JobContext, +) -> Result<(), anyhow::Error> { + let span = info_span!( + "job.verify_email", + job.id = %ctx.id(), + job.attempts = ctx.attempts(), + user_email.id = %job.user_email_id(), ); - repo.save().await?; + if let Some(context) = job.span_context() { + span.add_link(context); + } - Ok(()) + async move { + let state = ctx.state(); + let mut repo = state.repository().await?; + let mut rng = state.rng(); + let mailer = state.mailer(); + let clock = state.clock(); + + // Lookup the user email + let user_email = repo + .user_email() + .lookup(job.user_email_id()) + .await? + .context("User email not found")?; + + // Lookup the user associated with the email + let user = repo + .user() + .lookup(user_email.user_id) + .await?
+ .context("User not found")?; + + // Generate a verification code + let range = Uniform::<u32>::from(0..1_000_000); + let code = rng.sample(range); + let code = format!("{code:06}"); + + let address: Address = user_email.email.parse()?; + + // Save the verification code in the database + let verification = repo + .user_email() + .add_verification_code(&mut rng, &clock, &user_email, Duration::hours(8), code) + .await?; + + // And send the verification email + let mailbox = Mailbox::new(Some(user.username.clone()), address); + + let context = EmailVerificationContext::new(user.clone(), verification.clone()); + + mailer.send_verification_email(mailbox, &context).await?; + + info!( + email.id = %user_email.id, + "Verification email sent" + ); + + repo.save().await?; + + Ok(()) + } + .instrument(span) + .await } pub(crate) fn register(monitor: Monitor, state: &State) -> Monitor {