You've already forked authentication-service
mirror of
https://github.com/matrix-org/matrix-authentication-service.git
synced 2025-08-07 17:03:01 +03:00
Job to generate codes for all emails in a recovery session
This commit is contained in:
@@ -27,7 +27,10 @@ use mas_axum_utils::{
|
|||||||
};
|
};
|
||||||
use mas_data_model::UserAgent;
|
use mas_data_model::UserAgent;
|
||||||
use mas_router::UrlBuilder;
|
use mas_router::UrlBuilder;
|
||||||
use mas_storage::{BoxClock, BoxRepository, BoxRng};
|
use mas_storage::{
|
||||||
|
job::{JobRepositoryExt, SendAccountRecoveryEmailsJob},
|
||||||
|
BoxClock, BoxRepository, BoxRng,
|
||||||
|
};
|
||||||
use mas_templates::{
|
use mas_templates::{
|
||||||
FieldError, FormState, RecoveryStartContext, RecoveryStartFormField, TemplateContext, Templates,
|
FieldError, FormState, RecoveryStartContext, RecoveryStartFormField, TemplateContext, Templates,
|
||||||
};
|
};
|
||||||
@@ -125,7 +128,9 @@ pub(crate) async fn post(
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// TODO: spawn a job which will send all the emails
|
repo.job()
|
||||||
|
.schedule_job(SendAccountRecoveryEmailsJob::new(&session))
|
||||||
|
.await?;
|
||||||
|
|
||||||
repo.save().await?;
|
repo.save().await?;
|
||||||
|
|
||||||
|
@@ -1,4 +1,4 @@
|
|||||||
// Copyright 2023 The Matrix.org Foundation C.I.C.
|
// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
|
||||||
//
|
//
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
// you may not use this file except in compliance with the License.
|
// you may not use this file except in compliance with the License.
|
||||||
@@ -231,7 +231,7 @@ where
|
|||||||
mod jobs {
|
mod jobs {
|
||||||
// XXX: Move this somewhere else?
|
// XXX: Move this somewhere else?
|
||||||
use apalis_core::job::Job;
|
use apalis_core::job::Job;
|
||||||
use mas_data_model::{Device, User, UserEmail};
|
use mas_data_model::{Device, User, UserEmail, UserRecoverySession};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use ulid::Ulid;
|
use ulid::Ulid;
|
||||||
|
|
||||||
@@ -432,8 +432,40 @@ mod jobs {
|
|||||||
impl Job for DeactivateUserJob {
|
impl Job for DeactivateUserJob {
|
||||||
const NAME: &'static str = "deactivate-user";
|
const NAME: &'static str = "deactivate-user";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Send account recovery emails
|
||||||
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||||
|
pub struct SendAccountRecoveryEmailsJob {
|
||||||
|
user_recovery_session_id: Ulid,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SendAccountRecoveryEmailsJob {
|
||||||
|
/// Create a new job to send account recovery emails
|
||||||
|
///
|
||||||
|
/// # Parameters
|
||||||
|
///
|
||||||
|
/// * `user_recovery_session` - The user recovery session to send the
|
||||||
|
/// email for
|
||||||
|
#[must_use]
|
||||||
|
pub fn new(user_recovery_session: &UserRecoverySession) -> Self {
|
||||||
|
Self {
|
||||||
|
user_recovery_session_id: user_recovery_session.id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The ID of the user recovery session to send the email for
|
||||||
|
#[must_use]
|
||||||
|
pub fn user_recovery_session_id(&self) -> Ulid {
|
||||||
|
self.user_recovery_session_id
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Job for SendAccountRecoveryEmailsJob {
|
||||||
|
const NAME: &'static str = "send-account-recovery-email";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub use self::jobs::{
|
pub use self::jobs::{
|
||||||
DeactivateUserJob, DeleteDeviceJob, ProvisionDeviceJob, ProvisionUserJob, VerifyEmailJob,
|
DeactivateUserJob, DeleteDeviceJob, ProvisionDeviceJob, ProvisionUserJob,
|
||||||
|
SendAccountRecoveryEmailsJob, VerifyEmailJob,
|
||||||
};
|
};
|
||||||
|
@@ -28,6 +28,7 @@ use crate::storage::PostgresStorageFactory;
|
|||||||
mod database;
|
mod database;
|
||||||
mod email;
|
mod email;
|
||||||
mod matrix;
|
mod matrix;
|
||||||
|
mod recovery;
|
||||||
mod storage;
|
mod storage;
|
||||||
mod user;
|
mod user;
|
||||||
mod utils;
|
mod utils;
|
||||||
@@ -152,6 +153,7 @@ pub async fn init(
|
|||||||
let monitor = self::email::register(name, monitor, &state, &factory);
|
let monitor = self::email::register(name, monitor, &state, &factory);
|
||||||
let monitor = self::matrix::register(name, monitor, &state, &factory);
|
let monitor = self::matrix::register(name, monitor, &state, &factory);
|
||||||
let monitor = self::user::register(name, monitor, &state, &factory);
|
let monitor = self::user::register(name, monitor, &state, &factory);
|
||||||
|
let monitor = self::recovery::register(name, monitor, &state, &factory);
|
||||||
// TODO: we might want to grab the join handle here
|
// TODO: we might want to grab the join handle here
|
||||||
factory.listen().await?;
|
factory.listen().await?;
|
||||||
debug!(?monitor, "workers registered");
|
debug!(?monitor, "workers registered");
|
||||||
|
105
crates/tasks/src/recovery.rs
Normal file
105
crates/tasks/src/recovery.rs
Normal file
@@ -0,0 +1,105 @@
|
|||||||
|
// Copyright 2024 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
use anyhow::Context;
|
||||||
|
use apalis_core::{context::JobContext, executor::TokioExecutor, monitor::Monitor};
|
||||||
|
use mas_storage::{
|
||||||
|
job::{JobWithSpanContext, SendAccountRecoveryEmailsJob},
|
||||||
|
user::{UserEmailFilter, UserRecoveryRepository},
|
||||||
|
Pagination, RepositoryAccess,
|
||||||
|
};
|
||||||
|
use rand::distributions::{Alphanumeric, DistString};
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
|
use crate::{storage::PostgresStorageFactory, JobContextExt, State};
|
||||||
|
|
||||||
|
/// Job to send account recovery emails for a given recovery session.
|
||||||
|
#[tracing::instrument(
|
||||||
|
name = "job.send_account_recovery_email",
|
||||||
|
fields(
|
||||||
|
user_recovery_session.id = %job.user_recovery_session_id(),
|
||||||
|
user_recovery_session.email,
|
||||||
|
),
|
||||||
|
skip_all,
|
||||||
|
err(Debug),
|
||||||
|
)]
|
||||||
|
async fn send_account_recovery_email_job(
|
||||||
|
job: JobWithSpanContext<SendAccountRecoveryEmailsJob>,
|
||||||
|
ctx: JobContext,
|
||||||
|
) -> Result<(), anyhow::Error> {
|
||||||
|
let state = ctx.state();
|
||||||
|
let clock = state.clock();
|
||||||
|
let mut rng = state.rng();
|
||||||
|
let mut repo = state.repository().await?;
|
||||||
|
|
||||||
|
let session = repo
|
||||||
|
.user_recovery()
|
||||||
|
.lookup_session(job.user_recovery_session_id())
|
||||||
|
.await?
|
||||||
|
.context("User recovery session not found")?;
|
||||||
|
|
||||||
|
tracing::Span::current().record("user_recovery_session.email", &session.email);
|
||||||
|
|
||||||
|
if session.consumed_at.is_some() {
|
||||||
|
info!("Recovery session already consumed, not sending email");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut cursor = Pagination::first(50);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let page = repo
|
||||||
|
.user_email()
|
||||||
|
.list(
|
||||||
|
UserEmailFilter::new()
|
||||||
|
.for_email(&session.email)
|
||||||
|
.verified_only(),
|
||||||
|
cursor,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
for email in page.edges {
|
||||||
|
let ticket = Alphanumeric.sample_string(&mut rng, 32);
|
||||||
|
|
||||||
|
let _ticket = repo
|
||||||
|
.user_recovery()
|
||||||
|
.add_ticket(&mut rng, &clock, &session, &email, ticket)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
info!("Sending recovery email to {}", email.email);
|
||||||
|
// TODO
|
||||||
|
|
||||||
|
cursor = cursor.after(email.id);
|
||||||
|
}
|
||||||
|
|
||||||
|
if !page.has_next_page {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
repo.save().await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn register(
|
||||||
|
suffix: &str,
|
||||||
|
monitor: Monitor<TokioExecutor>,
|
||||||
|
state: &State,
|
||||||
|
storage_factory: &PostgresStorageFactory,
|
||||||
|
) -> Monitor<TokioExecutor> {
|
||||||
|
let send_user_recovery_email_worker = crate::build!(SendAccountRecoveryEmailsJob => send_account_recovery_email_job, suffix, state, storage_factory);
|
||||||
|
|
||||||
|
monitor.register(send_user_recovery_email_worker)
|
||||||
|
}
|
Reference in New Issue
Block a user