diff --git a/crates/handlers/src/views/recovery/start.rs b/crates/handlers/src/views/recovery/start.rs index 1d922905..2b904f1c 100644 --- a/crates/handlers/src/views/recovery/start.rs +++ b/crates/handlers/src/views/recovery/start.rs @@ -27,7 +27,10 @@ use mas_axum_utils::{ }; use mas_data_model::UserAgent; use mas_router::UrlBuilder; -use mas_storage::{BoxClock, BoxRepository, BoxRng}; +use mas_storage::{ + job::{JobRepositoryExt, SendAccountRecoveryEmailsJob}, + BoxClock, BoxRepository, BoxRng, +}; use mas_templates::{ FieldError, FormState, RecoveryStartContext, RecoveryStartFormField, TemplateContext, Templates, }; @@ -125,7 +128,9 @@ pub(crate) async fn post( ) .await?; - // TODO: spawn a job which will send all the emails + repo.job() + .schedule_job(SendAccountRecoveryEmailsJob::new(&session)) + .await?; repo.save().await?; diff --git a/crates/storage/src/job.rs b/crates/storage/src/job.rs index 58e41326..e7c8257f 100644 --- a/crates/storage/src/job.rs +++ b/crates/storage/src/job.rs @@ -1,4 +1,4 @@ -// Copyright 2023 The Matrix.org Foundation C.I.C. +// Copyright 2023, 2024 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -231,7 +231,7 @@ where mod jobs { // XXX: Move this somewhere else? use apalis_core::job::Job; - use mas_data_model::{Device, User, UserEmail}; + use mas_data_model::{Device, User, UserEmail, UserRecoverySession}; use serde::{Deserialize, Serialize}; use ulid::Ulid; @@ -432,8 +432,40 @@ mod jobs { impl Job for DeactivateUserJob { const NAME: &'static str = "deactivate-user"; } + + /// Send account recovery emails + #[derive(Serialize, Deserialize, Debug, Clone)] + pub struct SendAccountRecoveryEmailsJob { + user_recovery_session_id: Ulid, + } + + impl SendAccountRecoveryEmailsJob { + /// Create a new job to send account recovery emails + /// + /// # Parameters + /// + /// * `user_recovery_session` - The user recovery session to send the + /// email for + #[must_use] + pub fn new(user_recovery_session: &UserRecoverySession) -> Self { + Self { + user_recovery_session_id: user_recovery_session.id, + } + } + + /// The ID of the user recovery session to send the email for + #[must_use] + pub fn user_recovery_session_id(&self) -> Ulid { + self.user_recovery_session_id + } + } + + impl Job for SendAccountRecoveryEmailsJob { + const NAME: &'static str = "send-account-recovery-email"; + } } pub use self::jobs::{ - DeactivateUserJob, DeleteDeviceJob, ProvisionDeviceJob, ProvisionUserJob, VerifyEmailJob, + DeactivateUserJob, DeleteDeviceJob, ProvisionDeviceJob, ProvisionUserJob, + SendAccountRecoveryEmailsJob, VerifyEmailJob, }; diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 7362e688..30a84f7c 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -28,6 +28,7 @@ use crate::storage::PostgresStorageFactory; mod database; mod email; mod matrix; +mod recovery; mod storage; mod user; mod utils; @@ -152,6 +153,7 @@ pub async fn init( let monitor = self::email::register(name, monitor, &state, &factory); let monitor = self::matrix::register(name, monitor, &state, &factory); let monitor = self::user::register(name, monitor, &state, &factory); + let monitor = self::recovery::register(name, monitor, &state, &factory); // TODO: we might want to grab the join handle here factory.listen().await?; debug!(?monitor, "workers registered"); diff --git a/crates/tasks/src/recovery.rs b/crates/tasks/src/recovery.rs new file mode 100644 index 00000000..2e3a8358 --- /dev/null +++ b/crates/tasks/src/recovery.rs @@ -0,0 +1,105 @@ +// Copyright 2024 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use anyhow::Context; +use apalis_core::{context::JobContext, executor::TokioExecutor, monitor::Monitor}; +use mas_storage::{ + job::{JobWithSpanContext, SendAccountRecoveryEmailsJob}, + user::{UserEmailFilter, UserRecoveryRepository}, + Pagination, RepositoryAccess, +}; +use rand::distributions::{Alphanumeric, DistString}; +use tracing::info; + +use crate::{storage::PostgresStorageFactory, JobContextExt, State}; + +/// Job to send account recovery emails for a given recovery session. +#[tracing::instrument( + name = "job.send_account_recovery_email", + fields( + user_recovery_session.id = %job.user_recovery_session_id(), + user_recovery_session.email, + ), + skip_all, + err(Debug), +)] +async fn send_account_recovery_email_job( + job: JobWithSpanContext, + ctx: JobContext, +) -> Result<(), anyhow::Error> { + let state = ctx.state(); + let clock = state.clock(); + let mut rng = state.rng(); + let mut repo = state.repository().await?; + + let session = repo + .user_recovery() + .lookup_session(job.user_recovery_session_id()) + .await? + .context("User recovery session not found")?; + + tracing::Span::current().record("user_recovery_session.email", &session.email); + + if session.consumed_at.is_some() { + info!("Recovery session already consumed, not sending email"); + return Ok(()); + } + + let mut cursor = Pagination::first(50); + + loop { + let page = repo + .user_email() + .list( + UserEmailFilter::new() + .for_email(&session.email) + .verified_only(), + cursor, + ) + .await?; + + for email in page.edges { + let ticket = Alphanumeric.sample_string(&mut rng, 32); + + let _ticket = repo + .user_recovery() + .add_ticket(&mut rng, &clock, &session, &email, ticket) + .await?; + + info!("Sending recovery email to {}", email.email); + // TODO + + cursor = cursor.after(email.id); + } + + if !page.has_next_page { + break; + } + } + + repo.save().await?; + + Ok(()) +} + +pub(crate) fn register( + suffix: &str, + monitor: Monitor, + state: &State, + storage_factory: &PostgresStorageFactory, +) -> Monitor { + let send_user_recovery_email_worker = crate::build!(SendAccountRecoveryEmailsJob => send_account_recovery_email_job, suffix, state, storage_factory); + + monitor.register(send_user_recovery_email_worker) +}