diff --git a/Cargo.lock b/Cargo.lock index dcca97a5..057c68b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -124,7 +124,7 @@ checksum = "7de8ce5e0f9f8d88245311066a578d72b7af3e7088f32783804676302df237e4" [[package]] name = "apalis-core" version = "0.4.0-alpha.4" -source = "git+https://github.com/geofmureithi/apalis.git?rev=ead6f840b92a3590a8bf46398eaf65ed55aa92dc#ead6f840b92a3590a8bf46398eaf65ed55aa92dc" +source = "git+https://github.com/geofmureithi/apalis.git?rev=1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8#1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8" dependencies = [ "async-stream", "async-trait", @@ -147,7 +147,7 @@ dependencies = [ [[package]] name = "apalis-cron" version = "0.4.0-alpha.4" -source = "git+https://github.com/geofmureithi/apalis.git?rev=ead6f840b92a3590a8bf46398eaf65ed55aa92dc#ead6f840b92a3590a8bf46398eaf65ed55aa92dc" +source = "git+https://github.com/geofmureithi/apalis.git?rev=1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8#1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8" dependencies = [ "apalis-core", "async-stream", @@ -161,7 +161,7 @@ dependencies = [ [[package]] name = "apalis-sql" version = "0.4.0-alpha.4" -source = "git+https://github.com/geofmureithi/apalis.git?rev=ead6f840b92a3590a8bf46398eaf65ed55aa92dc#ead6f840b92a3590a8bf46398eaf65ed55aa92dc" +source = "git+https://github.com/geofmureithi/apalis.git?rev=1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8#1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8" dependencies = [ "apalis-core", "async-stream", @@ -3506,7 +3506,6 @@ dependencies = [ "mas-email", "mas-storage", "mas-storage-pg", - "opentelemetry", "rand 0.8.5", "rand_chacha 0.3.1", "serde", diff --git a/Cargo.toml b/Cargo.toml index cf7288f0..8cd506d9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,12 +15,12 @@ rev = "0b9295c2db2114cd87aa19abcc1fc00c16b272db" [patch.crates-io.apalis-core] git = "https://github.com/geofmureithi/apalis.git" -rev = "ead6f840b92a3590a8bf46398eaf65ed55aa92dc" +rev = "1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8" [patch.crates-io.apalis-sql] git = "https://github.com/geofmureithi/apalis.git" -rev = "ead6f840b92a3590a8bf46398eaf65ed55aa92dc" +rev = "1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8" [patch.crates-io.apalis-cron] git = "https://github.com/geofmureithi/apalis.git" -rev = "ead6f840b92a3590a8bf46398eaf65ed55aa92dc" +rev = "1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8" diff --git a/crates/cli/src/commands/server.rs b/crates/cli/src/commands/server.rs index 9ef72534..312c9f76 100644 --- a/crates/cli/src/commands/server.rs +++ b/crates/cli/src/commands/server.rs @@ -22,6 +22,10 @@ use mas_handlers::{AppState, HttpClientFactory, MatrixHomeserver}; use mas_listener::{server::Server, shutdown::ShutdownStream}; use mas_router::UrlBuilder; use mas_storage_pg::MIGRATOR; +use rand::{ + distributions::{Alphanumeric, DistString}, + thread_rng, +}; use tokio::signal::unix::SignalKind; use tracing::{info, info_span, warn, Instrument}; @@ -52,7 +56,7 @@ impl Options { let config: RootConfig = root.load_config()?; // Connect to the database - info!("Conntecting to the database"); + info!("Connecting to the database"); let pool = database_from_config(&config.database).await?; if self.migrate { @@ -87,8 +91,12 @@ impl Options { let mailer = mailer_from_config(&config.email, &templates).await?; mailer.test_connection().await?; - info!("Starting task worker"); - let monitor = mas_tasks::init(&pool, &mailer); + #[allow(clippy::disallowed_methods)] + let mut rng = thread_rng(); + let worker_name = Alphanumeric.sample_string(&mut rng, 10); + + info!(worker_name, "Starting task worker"); + let monitor = mas_tasks::init(&worker_name, &pool, &mailer); // TODO: grab the handle tokio::spawn(monitor.run()); } diff --git a/crates/cli/src/commands/worker.rs b/crates/cli/src/commands/worker.rs index 23742dc1..fa7ebdb2 100644 --- a/crates/cli/src/commands/worker.rs +++ b/crates/cli/src/commands/worker.rs @@ -15,7 +15,11 @@ use clap::Parser; use mas_config::RootConfig; use mas_router::UrlBuilder; -use tracing::{info_span, log::info}; +use rand::{ + distributions::{Alphanumeric, DistString}, + thread_rng, +}; +use tracing::{info, info_span}; use crate::util::{database_from_config, mailer_from_config, templates_from_config}; @@ -28,7 +32,7 @@ impl Options { let config: RootConfig = root.load_config()?; // Connect to the database - info!("Conntecting to the database"); + info!("Connecting to the database"); let pool = database_from_config(&config.database).await?; let url_builder = UrlBuilder::new(config.http.public_base.clone()); @@ -40,8 +44,12 @@ impl Options { mailer.test_connection().await?; drop(config); - info!("Starting task scheduler"); - let monitor = mas_tasks::init(&pool, &mailer); + #[allow(clippy::disallowed_methods)] + let mut rng = thread_rng(); + let worker_name = Alphanumeric.sample_string(&mut rng, 10); + + info!(worker_name, "Starting task scheduler"); + let monitor = mas_tasks::init(&worker_name, &pool, &mailer); span.exit(); diff --git a/crates/tasks/Cargo.toml b/crates/tasks/Cargo.toml index 553f6813..328a4672 100644 --- a/crates/tasks/Cargo.toml +++ b/crates/tasks/Cargo.toml @@ -1,9 +1,9 @@ [package] name = "mas-tasks" version = "0.1.0" -authors = ["Quentin Gliech "] +authors = ["quentin gliech "] edition = "2021" -license = "Apache-2.0" +license = "apache-2.0" [dependencies] anyhow = "1.0.70" @@ -19,7 +19,6 @@ thiserror = "1.0.30" tower = "0.4.13" tracing = "0.1.37" tracing-opentelemetry = "0.18.0" -opentelemetry = "0.18.0" ulid = "1.0.0" serde = { version = "1.0.159", features = ["derive"] } diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs index 319c1083..7f5e0eb3 100644 --- a/crates/tasks/src/database.rs +++ b/crates/tasks/src/database.rs @@ -68,9 +68,14 @@ pub async fn cleanup_expired_tokens( Ok(()) } -pub(crate) fn register(monitor: Monitor, state: &State) -> Monitor { +pub(crate) fn register( + suffix: &str, + monitor: Monitor, + state: &State, +) -> Monitor { let schedule = apalis_cron::Schedule::from_str("*/15 * * * * *").unwrap(); - let worker = WorkerBuilder::new("cleanup-expired-tokens") + let worker_name = format!("{job}-{suffix}", job = CleanupExpiredTokensJob::NAME); + let worker = WorkerBuilder::new(worker_name) .stream(CronStream::new(schedule).to_stream()) .layer(state.inject()) .build(job_fn(cleanup_expired_tokens)); diff --git a/crates/tasks/src/email.rs b/crates/tasks/src/email.rs index 14b73f77..df9cfbc7 100644 --- a/crates/tasks/src/email.rs +++ b/crates/tasks/src/email.rs @@ -17,6 +17,7 @@ use apalis_core::{ builder::{WorkerBuilder, WorkerFactory}, context::JobContext, executor::TokioExecutor, + job::Job, job_fn::job_fn, monitor::Monitor, storage::builder::WithStorage, @@ -25,84 +26,80 @@ use chrono::Duration; use mas_email::{Address, EmailVerificationContext, Mailbox}; use mas_storage::job::{JobWithSpanContext, VerifyEmailJob}; use rand::{distributions::Uniform, Rng}; -use tracing::{info, info_span, Instrument}; -use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing::info; -use crate::{JobContextExt, State}; +use crate::{layers::TracingLayer, JobContextExt, State}; +#[tracing::instrument( + name = "job.verify_email", + fields(user_email.id = %job.user_email_id()), + skip_all, + err(Debug), +)] async fn verify_email( job: JobWithSpanContext, ctx: JobContext, ) -> Result<(), anyhow::Error> { - let span = info_span!( - "job.verify_email", - job.id = %ctx.id(), - job.attempts = ctx.attempts(), - user_email.id = %job.user_email_id(), + let state = ctx.state(); + let mut repo = state.repository().await?; + let mut rng = state.rng(); + let mailer = state.mailer(); + let clock = state.clock(); + + // Lookup the user email + let user_email = repo + .user_email() + .lookup(job.user_email_id()) + .await? + .context("User email not found")?; + + // Lookup the user associated with the email + let user = repo + .user() + .lookup(user_email.user_id) + .await? + .context("User not found")?; + + // Generate a verification code + let range = Uniform::::from(0..1_000_000); + let code = rng.sample(range); + let code = format!("{code:06}"); + + let address: Address = user_email.email.parse()?; + + // Save the verification code in the database + let verification = repo + .user_email() + .add_verification_code(&mut rng, &clock, &user_email, Duration::hours(8), code) + .await?; + + // And send the verification email + let mailbox = Mailbox::new(Some(user.username.clone()), address); + + let context = EmailVerificationContext::new(user.clone(), verification.clone()); + + mailer.send_verification_email(mailbox, &context).await?; + + info!( + email.id = %user_email.id, + "Verification email sent" ); - if let Some(context) = job.span_context() { - span.add_link(context); - } + repo.save().await?; - async move { - let state = ctx.state(); - let mut repo = state.repository().await?; - let mut rng = state.rng(); - let mailer = state.mailer(); - let clock = state.clock(); - - // Lookup the user email - let user_email = repo - .user_email() - .lookup(job.user_email_id()) - .await? - .context("User email not found")?; - - // Lookup the user associated with the email - let user = repo - .user() - .lookup(user_email.user_id) - .await? - .context("User not found")?; - - // Generate a verification code - let range = Uniform::::from(0..1_000_000); - let code = rng.sample(range); - let code = format!("{code:06}"); - - let address: Address = user_email.email.parse()?; - - // Save the verification code in the database - let verification = repo - .user_email() - .add_verification_code(&mut rng, &clock, &user_email, Duration::hours(8), code) - .await?; - - // And send the verification email - let mailbox = Mailbox::new(Some(user.username.clone()), address); - - let context = EmailVerificationContext::new(user.clone(), verification.clone()); - - mailer.send_verification_email(mailbox, &context).await?; - - info!( - email.id = %user_email.id, - "Verification email sent" - ); - - repo.save().await?; - - Ok(()) - } - .instrument(span) - .await + Ok(()) } -pub(crate) fn register(monitor: Monitor, state: &State) -> Monitor { +pub(crate) fn register( + suffix: &str, + monitor: Monitor, + state: &State, +) -> Monitor { let storage = state.store(); - let worker = WorkerBuilder::new("verify-email") + let worker_name = format!("{job}-{suffix}", job = VerifyEmailJob::NAME); + let worker = WorkerBuilder::new(worker_name) .layer(state.inject()) + .layer(TracingLayer::new()) .with_storage(storage) .build(job_fn(verify_email)); monitor.register(worker) diff --git a/crates/tasks/src/layers.rs b/crates/tasks/src/layers.rs new file mode 100644 index 00000000..4b54ce97 --- /dev/null +++ b/crates/tasks/src/layers.rs @@ -0,0 +1,70 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::task::{Context, Poll}; + +use apalis_core::{job::Job, request::JobRequest}; +use mas_storage::job::JobWithSpanContext; +use tower::{Layer, Service}; +use tracing::{info_span, instrument::Instrumented, Instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; + +pub struct TracingLayer; + +impl TracingLayer { + pub fn new() -> Self { + Self + } +} + +impl Layer for TracingLayer { + type Service = TracingService; + + fn layer(&self, inner: S) -> Self::Service { + TracingService { inner } + } +} + +pub struct TracingService { + inner: S, +} + +impl Service>> for TracingService +where + J: Job, + S: Service>>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = Instrumented; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: JobRequest>) -> Self::Future { + let span = info_span!( + "job.run", + job.id = %req.id(), + job.attempts = req.attempts(), + job.name = J::NAME, + ); + + if let Some(context) = req.inner().span_context() { + span.add_link(context); + } + + self.inner.call(req).instrument(span) + } +} diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 6d442c79..4bea7f93 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -27,6 +27,7 @@ use tracing::debug; mod database; mod email; +mod layers; #[derive(Clone)] struct State { @@ -95,11 +96,11 @@ impl JobContextExt for apalis_core::context::JobContext { } #[must_use] -pub fn init(pool: &Pool, mailer: &Mailer) -> Monitor { +pub fn init(name: &str, pool: &Pool, mailer: &Mailer) -> Monitor { let state = State::new(pool.clone(), SystemClock::default(), mailer.clone()); let monitor = Monitor::new(); - let monitor = self::database::register(monitor, &state); - let monitor = self::email::register(monitor, &state); + let monitor = self::database::register(name, monitor, &state); + let monitor = self::email::register(name, monitor, &state); debug!(?monitor, "workers registered"); monitor }