diff --git a/Cargo.lock b/Cargo.lock index fd42f90a..a943b015 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3382,6 +3382,7 @@ dependencies = [ "mas-matrix", "serde", "tower", + "tracing", "url", ] diff --git a/crates/cli/src/commands/manage.rs b/crates/cli/src/commands/manage.rs index 192b46c4..4d41ee82 100644 --- a/crates/cli/src/commands/manage.rs +++ b/crates/cli/src/commands/manage.rs @@ -18,14 +18,14 @@ use mas_config::{DatabaseConfig, PasswordsConfig}; use mas_data_model::{Device, TokenType}; use mas_storage::{ compat::{CompatAccessTokenRepository, CompatSessionRepository}, - job::{DeleteDeviceJob, JobRepositoryExt, ProvisionUserJob}, + job::{DeactivateUserJob, DeleteDeviceJob, JobRepositoryExt, ProvisionUserJob}, user::{UserEmailRepository, UserPasswordRepository, UserRepository}, Repository, RepositoryAccess, SystemClock, }; use mas_storage_pg::PgRepository; use rand::SeedableRng; use sqlx::types::Uuid; -use tracing::{info, info_span}; +use tracing::{info, info_span, warn}; use crate::util::{database_from_config, password_manager_from_config}; @@ -74,6 +74,10 @@ enum Subcommand { LockUser { /// User to lock username: String, + + /// Whether to deactivate the user + #[arg(long)] + deactivate: bool, }, /// Unlock a user @@ -343,7 +347,10 @@ impl Options { Ok(()) } - SC::LockUser { username } => { + SC::LockUser { + username, + deactivate, + } => { let _span = info_span!("cli.manage.lock_user", user.username = username).entered(); let config: DatabaseConfig = root.load_config()?; let pool = database_from_config(&config).await?; @@ -357,7 +364,17 @@ impl Options { info!(%user.id, "Locking user"); - repo.user().lock(&clock, user).await?; + // Even though the deactivation job will lock the user, we lock it here in case + // the worker is not running, as we don't have a good way to run a job + // synchronously yet. + let user = repo.user().lock(&clock, user).await?; + + if deactivate { + warn!(%user.id, "Scheduling user deactivation"); + repo.job() + .schedule_job(DeactivateUserJob::new(&user, false)) + .await?; + } repo.save().await?; Ok(()) diff --git a/crates/matrix-synapse/Cargo.toml b/crates/matrix-synapse/Cargo.toml index 64bb19f7..f0a47b97 100644 --- a/crates/matrix-synapse/Cargo.toml +++ b/crates/matrix-synapse/Cargo.toml @@ -9,9 +9,10 @@ license = "Apache-2.0" anyhow = "1.0.72" async-trait = "0.1.72" http = "0.2.9" -url = "2.4.0" serde = { version = "1.0.177", features = ["derive"] } tower = { version = "0.4.13", features = ["util"] } +tracing = "0.1.37" +url = "2.4.0" mas-axum-utils = { path = "../axum-utils" } mas-http = { path = "../http" } diff --git a/crates/matrix-synapse/src/lib.rs b/crates/matrix-synapse/src/lib.rs index 1620faac..3b888e61 100644 --- a/crates/matrix-synapse/src/lib.rs +++ b/crates/matrix-synapse/src/lib.rs @@ -124,6 +124,11 @@ struct SynapseDevice { device_id: String, } +#[derive(Serialize)] +struct SynapseDeactivateUserRequest { + erase: bool, +} + #[async_trait::async_trait] impl HomeserverConnection for SynapseConnection { type Error = anyhow::Error; @@ -132,6 +137,15 @@ impl HomeserverConnection for SynapseConnection { &self.homeserver } + #[tracing::instrument( + name = "homeserver.query_user", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.mxid = mxid, + ), + err(Display), + )] async fn query_user(&self, mxid: &str) -> Result { let mut client = self .http_client_factory @@ -158,6 +172,16 @@ impl HomeserverConnection for SynapseConnection { }) } + #[tracing::instrument( + name = "homeserver.provision_user", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.mxid = request.mxid(), + user.id = request.sub(), + ), + err(Display), + )] async fn provision_user(&self, request: &ProvisionRequest) -> Result { let mut body = SynapseUser { external_ids: Some(vec![ExternalID { @@ -213,6 +237,16 @@ impl HomeserverConnection for SynapseConnection { } } + #[tracing::instrument( + name = "homeserver.create_device", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.mxid = mxid, + matrix.device_id = device_id, + ), + err(Display), + )] async fn create_device(&self, mxid: &str, device_id: &str) -> Result<(), Self::Error> { let mut client = self .http_client_factory @@ -236,6 +270,16 @@ impl HomeserverConnection for SynapseConnection { Ok(()) } + #[tracing::instrument( + name = "homeserver.delete_device", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.mxid = mxid, + matrix.device_id = device_id, + ), + err(Display), + )] async fn delete_device(&self, mxid: &str, device_id: &str) -> Result<(), Self::Error> { let mut client = self.http_client_factory.client().await?; @@ -253,4 +297,35 @@ impl HomeserverConnection for SynapseConnection { Ok(()) } + + #[tracing::instrument( + name = "homeserver.delete_user", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.mxid = mxid, + erase = erase, + ), + err(Display), + )] + async fn delete_user(&self, mxid: &str, erase: bool) -> Result<(), Self::Error> { + let mut client = self + .http_client_factory + .client() + .await? + .request_bytes_to_body() + .json_request(); + + let request = self + .post(&format!("_synapse/admin/v1/deactivate/{mxid}")) + .body(SynapseDeactivateUserRequest { erase })?; + + let response = client.ready().await?.call(request).await?; + + if response.status() != StatusCode::OK { + return Err(anyhow::anyhow!("Failed to delete user in Synapse")); + } + + Ok(()) + } } diff --git a/crates/matrix/src/lib.rs b/crates/matrix/src/lib.rs index b1e3e4e5..b5f799d1 100644 --- a/crates/matrix/src/lib.rs +++ b/crates/matrix/src/lib.rs @@ -43,6 +43,12 @@ pub struct ProvisionRequest { } impl ProvisionRequest { + /// Create a new [`ProvisionRequest`]. + /// + /// # Parameters + /// + /// * `mxid` - The Matrix ID to provision. + /// * `sub` - The `sub` of the user, aka the internal ID. #[must_use] pub fn new(mxid: impl Into, sub: impl Into) -> Self { Self { @@ -54,28 +60,41 @@ impl ProvisionRequest { } } + /// Get the `sub` of the user to provision, aka the internal ID. #[must_use] pub fn sub(&self) -> &str { &self.sub } + /// Get the Matrix ID to provision. #[must_use] pub fn mxid(&self) -> &str { &self.mxid } + /// Ask to set the displayname of the user. + /// + /// # Parameters + /// + /// * `displayname` - The displayname to set. #[must_use] pub fn set_displayname(mut self, displayname: String) -> Self { self.displayname = FieldAction::Set(displayname); self } + /// Ask to unset the displayname of the user. #[must_use] pub fn unset_displayname(mut self) -> Self { self.displayname = FieldAction::Unset; self } + /// Call the given callback if the displayname should be set or unset. + /// + /// # Parameters + /// + /// * `callback` - The callback to call. pub fn on_displayname(&self, callback: impl FnOnce(Option<&str>)) -> &Self { match &self.displayname { FieldAction::Unset => callback(None), @@ -86,18 +105,29 @@ impl ProvisionRequest { self } + /// Ask to set the avatar URL of the user. + /// + /// # Parameters + /// + /// * `avatar_url` - The avatar URL to set. #[must_use] pub fn set_avatar_url(mut self, avatar_url: String) -> Self { self.avatar_url = FieldAction::Set(avatar_url); self } + /// Ask to unset the avatar URL of the user. #[must_use] pub fn unset_avatar_url(mut self) -> Self { self.avatar_url = FieldAction::Unset; self } + /// Call the given callback if the avatar URL should be set or unset. + /// + /// # Parameters + /// + /// * `callback` - The callback to call. pub fn on_avatar_url(&self, callback: impl FnOnce(Option<&str>)) -> &Self { match &self.avatar_url { FieldAction::Unset => callback(None), @@ -108,18 +138,29 @@ impl ProvisionRequest { self } + /// Ask to set the emails of the user. + /// + /// # Parameters + /// + /// * `emails` - The list of emails to set. #[must_use] pub fn set_emails(mut self, emails: Vec) -> Self { self.emails = FieldAction::Set(emails); self } + /// Ask to unset the emails of the user. #[must_use] pub fn unset_emails(mut self) -> Self { self.emails = FieldAction::Unset; self } + /// Call the given callback if the emails should be set or unset. + /// + /// # Parameters + /// + /// * `callback` - The callback to call. pub fn on_emails(&self, callback: impl FnOnce(Option<&[String]>)) -> &Self { match &self.emails { FieldAction::Unset => callback(None), @@ -133,17 +174,84 @@ impl ProvisionRequest { #[async_trait::async_trait] pub trait HomeserverConnection: Send + Sync { + /// The error type returned by all methods. type Error; + /// Get the homeserver URL. fn homeserver(&self) -> &str; + + /// Get the Matrix ID of the user with the given localpart. + /// + /// # Parameters + /// + /// * `localpart` - The localpart of the user. fn mxid(&self, localpart: &str) -> String { format!("@{}:{}", localpart, self.homeserver()) } + /// Query the state of a user on the homeserver. + /// + /// # Parameters + /// + /// * `mxid` - The Matrix ID of the user to query. + /// + /// # Errors + /// + /// Returns an error if the homeserver is unreachable or the user does not + /// exist. async fn query_user(&self, mxid: &str) -> Result; + + /// Provision a user on the homeserver. + /// + /// # Parameters + /// + /// * `request` - a [`ProvisionRequest`] containing the details of the user + /// to provision. + /// + /// # Errors + /// + /// Returns an error if the homeserver is unreachable or the user could not + /// be provisioned. async fn provision_user(&self, request: &ProvisionRequest) -> Result; + + /// Create a device for a user on the homeserver. + /// + /// # Parameters + /// + /// * `mxid` - The Matrix ID of the user to create a device for. + /// * `device_id` - The device ID to create. + /// + /// # Errors + /// + /// Returns an error if the homeserver is unreachable or the device could + /// not be created. async fn create_device(&self, mxid: &str, device_id: &str) -> Result<(), Self::Error>; + + /// Delete a device for a user on the homeserver. + /// + /// # Parameters + /// + /// * `mxid` - The Matrix ID of the user to delete a device for. + /// * `device_id` - The device ID to delete. + /// + /// # Errors + /// + /// Returns an error if the homeserver is unreachable or the device could + /// not be deleted. async fn delete_device(&self, mxid: &str, device_id: &str) -> Result<(), Self::Error>; + + /// Delete a user on the homeserver. + /// + /// # Parameters + /// + /// * `mxid` - The Matrix ID of the user to delete. + /// * `erase` - Whether to ask the homeserver to erase the user's data. + /// + /// # Errors + /// + /// Returns an error if the homeserver is unreachable or the user could not + /// be deleted. + async fn delete_user(&self, mxid: &str, erase: bool) -> Result<(), Self::Error>; } #[async_trait::async_trait] @@ -169,4 +277,8 @@ impl HomeserverConnection for &T async fn delete_device(&self, mxid: &str, device_id: &str) -> Result<(), Self::Error> { (**self).delete_device(mxid, device_id).await } + + async fn delete_user(&self, mxid: &str, erase: bool) -> Result<(), Self::Error> { + (**self).delete_user(mxid, erase).await + } } diff --git a/crates/matrix/src/mock.rs b/crates/matrix/src/mock.rs index b13aa0be..33d1e0d4 100644 --- a/crates/matrix/src/mock.rs +++ b/crates/matrix/src/mock.rs @@ -109,6 +109,19 @@ impl HomeserverConnection for MockHomeserverConnection { user.devices.remove(device_id); Ok(()) } + + async fn delete_user(&self, mxid: &str, erase: bool) -> Result<(), Self::Error> { + let mut users = self.users.write().await; + let user = users.get_mut(mxid).context("User not found")?; + user.devices.clear(); + user.emails = None; + if erase { + user.avatar_url = None; + user.displayname = None; + } + + Ok(()) + } } #[cfg(test)] diff --git a/crates/storage/src/job.rs b/crates/storage/src/job.rs index d42b2138..2d2f85fc 100644 --- a/crates/storage/src/job.rs +++ b/crates/storage/src/job.rs @@ -73,6 +73,15 @@ pub struct JobWithSpanContext { payload: T, } +impl From for JobWithSpanContext { + fn from(payload: J) -> Self { + Self { + span_context: None, + payload, + } + } +} + impl Job for JobWithSpanContext { const NAME: &'static str = J::NAME; } @@ -369,6 +378,47 @@ mod jobs { impl Job for DeleteDeviceJob { const NAME: &'static str = "delete-device"; } + + /// A job to deactivate and lock a user + #[derive(Serialize, Deserialize, Debug, Clone)] + pub struct DeactivateUserJob { + user_id: Ulid, + hs_erase: bool, + } + + impl DeactivateUserJob { + /// Create a new job to deactivate and lock a user + /// + /// # Parameters + /// + /// * `user` - The user to deactivate + /// * `hs_erase` - Whether to erase the user from the homeserver + #[must_use] + pub fn new(user: &User, hs_erase: bool) -> Self { + Self { + user_id: user.id, + hs_erase, + } + } + + /// The ID of the user to deactivate + #[must_use] + pub fn user_id(&self) -> Ulid { + self.user_id + } + + /// Whether to erase the user from the homeserver + #[must_use] + pub fn hs_erase(&self) -> bool { + self.hs_erase + } + } + + impl Job for DeactivateUserJob { + const NAME: &'static str = "deactivate-user"; + } } -pub use self::jobs::{DeleteDeviceJob, ProvisionDeviceJob, ProvisionUserJob, VerifyEmailJob}; +pub use self::jobs::{ + DeactivateUserJob, DeleteDeviceJob, ProvisionDeviceJob, ProvisionUserJob, VerifyEmailJob, +}; diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index da599687..1019bc4f 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -33,6 +33,7 @@ mod database; mod email; mod matrix; mod storage; +mod user; mod utils; #[derive(Clone)] @@ -128,6 +129,7 @@ pub async fn init( let monitor = self::database::register(name, monitor, &state); let monitor = self::email::register(name, monitor, &state, &factory); let monitor = self::matrix::register(name, monitor, &state, &factory); + let monitor = self::user::register(name, monitor, &state, &factory); // TODO: we might want to grab the join handle here factory.listen().await?; debug!(?monitor, "workers registered"); diff --git a/crates/tasks/src/user.rs b/crates/tasks/src/user.rs new file mode 100644 index 00000000..398f8d54 --- /dev/null +++ b/crates/tasks/src/user.rs @@ -0,0 +1,96 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use anyhow::Context; +use apalis_core::{ + builder::{WorkerBuilder, WorkerFactoryFn}, + context::JobContext, + executor::TokioExecutor, + job::Job, + monitor::Monitor, + storage::builder::WithStorage, +}; +use mas_storage::{ + job::{DeactivateUserJob, DeleteDeviceJob, JobWithSpanContext}, + user::UserRepository, + RepositoryAccess, +}; +use tracing::info; + +use crate::{ + storage::PostgresStorageFactory, + utils::{metrics_layer, trace_layer}, + JobContextExt, State, +}; + +/// Job to deactivate a user, both locally and on the Matrix homeserver. +#[tracing::instrument( + name = "job.deactivate_user" + fields(user.id = %job.user_id(), erase = %job.hs_erase()), + skip_all, + err(Debug), +)] +async fn deactivate_user( + job: JobWithSpanContext, + ctx: JobContext, +) -> Result<(), anyhow::Error> { + let state = ctx.state(); + let clock = state.clock(); + let matrix = state.matrix_connection(); + let mut repo = state.repository().await?; + + let user = repo + .user() + .lookup(job.user_id()) + .await? + .context("User not found")?; + + // Let's first lock the user + let user = repo + .user() + .lock(&clock, user) + .await + .context("Failed to lock user")?; + + // TODO: delete the sessions & access tokens + + // Before calling back to the homeserver, commit the changes to the database + repo.save().await?; + + let mxid = matrix.mxid(&user.username); + info!("Deactivating user {} on homeserver", mxid); + matrix.delete_user(&mxid, job.hs_erase()).await?; + + Ok(()) +} + +pub(crate) fn register( + suffix: &str, + monitor: Monitor, + state: &State, + storage_factory: &PostgresStorageFactory, +) -> Monitor { + let storage = storage_factory.build(); + let worker_name = format!("{job}-{suffix}", job = DeleteDeviceJob::NAME); + let deactivate_user_worker = WorkerBuilder::new(worker_name) + .layer(state.inject()) + .layer(trace_layer()) + .layer(metrics_layer()) + .with_storage_config(storage, |c| c.fetch_interval(Duration::from_secs(1))) + .build_fn(deactivate_user); + + monitor.register(deactivate_user_worker) +}