1
0
mirror of https://github.com/matrix-org/matrix-authentication-service.git synced 2025-07-31 09:24:31 +03:00

Call the homeserver for user deactivation

This commit is contained in:
Quentin Gliech
2023-08-03 14:05:10 +02:00
parent 646b6cc0e3
commit 8142cad3d6
9 changed files with 373 additions and 6 deletions

1
Cargo.lock generated
View File

@ -3382,6 +3382,7 @@ dependencies = [
"mas-matrix",
"serde",
"tower",
"tracing",
"url",
]

View File

@ -18,14 +18,14 @@ use mas_config::{DatabaseConfig, PasswordsConfig};
use mas_data_model::{Device, TokenType};
use mas_storage::{
compat::{CompatAccessTokenRepository, CompatSessionRepository},
job::{DeleteDeviceJob, JobRepositoryExt, ProvisionUserJob},
job::{DeactivateUserJob, DeleteDeviceJob, JobRepositoryExt, ProvisionUserJob},
user::{UserEmailRepository, UserPasswordRepository, UserRepository},
Repository, RepositoryAccess, SystemClock,
};
use mas_storage_pg::PgRepository;
use rand::SeedableRng;
use sqlx::types::Uuid;
use tracing::{info, info_span};
use tracing::{info, info_span, warn};
use crate::util::{database_from_config, password_manager_from_config};
@ -74,6 +74,10 @@ enum Subcommand {
LockUser {
/// User to lock
username: String,
/// Whether to deactivate the user
#[arg(long)]
deactivate: bool,
},
/// Unlock a user
@ -343,7 +347,10 @@ impl Options {
Ok(())
}
SC::LockUser { username } => {
SC::LockUser {
username,
deactivate,
} => {
let _span = info_span!("cli.manage.lock_user", user.username = username).entered();
let config: DatabaseConfig = root.load_config()?;
let pool = database_from_config(&config).await?;
@ -357,7 +364,17 @@ impl Options {
info!(%user.id, "Locking user");
repo.user().lock(&clock, user).await?;
// Even though the deactivation job will lock the user, we lock it here in case
// the worker is not running, as we don't have a good way to run a job
// synchronously yet.
let user = repo.user().lock(&clock, user).await?;
if deactivate {
warn!(%user.id, "Scheduling user deactivation");
repo.job()
.schedule_job(DeactivateUserJob::new(&user, false))
.await?;
}
repo.save().await?;
Ok(())

View File

@ -9,9 +9,10 @@ license = "Apache-2.0"
anyhow = "1.0.72"
async-trait = "0.1.72"
http = "0.2.9"
url = "2.4.0"
serde = { version = "1.0.177", features = ["derive"] }
tower = { version = "0.4.13", features = ["util"] }
tracing = "0.1.37"
url = "2.4.0"
mas-axum-utils = { path = "../axum-utils" }
mas-http = { path = "../http" }

View File

@ -124,6 +124,11 @@ struct SynapseDevice {
device_id: String,
}
#[derive(Serialize)]
struct SynapseDeactivateUserRequest {
erase: bool,
}
#[async_trait::async_trait]
impl HomeserverConnection for SynapseConnection {
type Error = anyhow::Error;
@ -132,6 +137,15 @@ impl HomeserverConnection for SynapseConnection {
&self.homeserver
}
#[tracing::instrument(
name = "homeserver.query_user",
skip_all,
fields(
matrix.homeserver = self.homeserver,
matrix.mxid = mxid,
),
err(Display),
)]
async fn query_user(&self, mxid: &str) -> Result<MatrixUser, Self::Error> {
let mut client = self
.http_client_factory
@ -158,6 +172,16 @@ impl HomeserverConnection for SynapseConnection {
})
}
#[tracing::instrument(
name = "homeserver.provision_user",
skip_all,
fields(
matrix.homeserver = self.homeserver,
matrix.mxid = request.mxid(),
user.id = request.sub(),
),
err(Display),
)]
async fn provision_user(&self, request: &ProvisionRequest) -> Result<bool, Self::Error> {
let mut body = SynapseUser {
external_ids: Some(vec![ExternalID {
@ -213,6 +237,16 @@ impl HomeserverConnection for SynapseConnection {
}
}
#[tracing::instrument(
name = "homeserver.create_device",
skip_all,
fields(
matrix.homeserver = self.homeserver,
matrix.mxid = mxid,
matrix.device_id = device_id,
),
err(Display),
)]
async fn create_device(&self, mxid: &str, device_id: &str) -> Result<(), Self::Error> {
let mut client = self
.http_client_factory
@ -236,6 +270,16 @@ impl HomeserverConnection for SynapseConnection {
Ok(())
}
#[tracing::instrument(
name = "homeserver.delete_device",
skip_all,
fields(
matrix.homeserver = self.homeserver,
matrix.mxid = mxid,
matrix.device_id = device_id,
),
err(Display),
)]
async fn delete_device(&self, mxid: &str, device_id: &str) -> Result<(), Self::Error> {
let mut client = self.http_client_factory.client().await?;
@ -253,4 +297,35 @@ impl HomeserverConnection for SynapseConnection {
Ok(())
}
#[tracing::instrument(
name = "homeserver.delete_user",
skip_all,
fields(
matrix.homeserver = self.homeserver,
matrix.mxid = mxid,
erase = erase,
),
err(Display),
)]
async fn delete_user(&self, mxid: &str, erase: bool) -> Result<(), Self::Error> {
let mut client = self
.http_client_factory
.client()
.await?
.request_bytes_to_body()
.json_request();
let request = self
.post(&format!("_synapse/admin/v1/deactivate/{mxid}"))
.body(SynapseDeactivateUserRequest { erase })?;
let response = client.ready().await?.call(request).await?;
if response.status() != StatusCode::OK {
return Err(anyhow::anyhow!("Failed to delete user in Synapse"));
}
Ok(())
}
}

View File

@ -43,6 +43,12 @@ pub struct ProvisionRequest {
}
impl ProvisionRequest {
/// Create a new [`ProvisionRequest`].
///
/// # Parameters
///
/// * `mxid` - The Matrix ID to provision.
/// * `sub` - The `sub` of the user, aka the internal ID.
#[must_use]
pub fn new(mxid: impl Into<String>, sub: impl Into<String>) -> Self {
Self {
@ -54,28 +60,41 @@ impl ProvisionRequest {
}
}
/// Get the `sub` of the user to provision, aka the internal ID.
#[must_use]
pub fn sub(&self) -> &str {
&self.sub
}
/// Get the Matrix ID to provision.
#[must_use]
pub fn mxid(&self) -> &str {
&self.mxid
}
/// Ask to set the displayname of the user.
///
/// # Parameters
///
/// * `displayname` - The displayname to set.
#[must_use]
pub fn set_displayname(mut self, displayname: String) -> Self {
self.displayname = FieldAction::Set(displayname);
self
}
/// Ask to unset the displayname of the user.
#[must_use]
pub fn unset_displayname(mut self) -> Self {
self.displayname = FieldAction::Unset;
self
}
/// Call the given callback if the displayname should be set or unset.
///
/// # Parameters
///
/// * `callback` - The callback to call.
pub fn on_displayname(&self, callback: impl FnOnce(Option<&str>)) -> &Self {
match &self.displayname {
FieldAction::Unset => callback(None),
@ -86,18 +105,29 @@ impl ProvisionRequest {
self
}
/// Ask to set the avatar URL of the user.
///
/// # Parameters
///
/// * `avatar_url` - The avatar URL to set.
#[must_use]
pub fn set_avatar_url(mut self, avatar_url: String) -> Self {
self.avatar_url = FieldAction::Set(avatar_url);
self
}
/// Ask to unset the avatar URL of the user.
#[must_use]
pub fn unset_avatar_url(mut self) -> Self {
self.avatar_url = FieldAction::Unset;
self
}
/// Call the given callback if the avatar URL should be set or unset.
///
/// # Parameters
///
/// * `callback` - The callback to call.
pub fn on_avatar_url(&self, callback: impl FnOnce(Option<&str>)) -> &Self {
match &self.avatar_url {
FieldAction::Unset => callback(None),
@ -108,18 +138,29 @@ impl ProvisionRequest {
self
}
/// Ask to set the emails of the user.
///
/// # Parameters
///
/// * `emails` - The list of emails to set.
#[must_use]
pub fn set_emails(mut self, emails: Vec<String>) -> Self {
self.emails = FieldAction::Set(emails);
self
}
/// Ask to unset the emails of the user.
#[must_use]
pub fn unset_emails(mut self) -> Self {
self.emails = FieldAction::Unset;
self
}
/// Call the given callback if the emails should be set or unset.
///
/// # Parameters
///
/// * `callback` - The callback to call.
pub fn on_emails(&self, callback: impl FnOnce(Option<&[String]>)) -> &Self {
match &self.emails {
FieldAction::Unset => callback(None),
@ -133,17 +174,84 @@ impl ProvisionRequest {
#[async_trait::async_trait]
pub trait HomeserverConnection: Send + Sync {
/// The error type returned by all methods.
type Error;
/// Get the homeserver URL.
fn homeserver(&self) -> &str;
/// Get the Matrix ID of the user with the given localpart.
///
/// # Parameters
///
/// * `localpart` - The localpart of the user.
fn mxid(&self, localpart: &str) -> String {
format!("@{}:{}", localpart, self.homeserver())
}
/// Query the state of a user on the homeserver.
///
/// # Parameters
///
/// * `mxid` - The Matrix ID of the user to query.
///
/// # Errors
///
/// Returns an error if the homeserver is unreachable or the user does not
/// exist.
async fn query_user(&self, mxid: &str) -> Result<MatrixUser, Self::Error>;
/// Provision a user on the homeserver.
///
/// # Parameters
///
/// * `request` - a [`ProvisionRequest`] containing the details of the user
/// to provision.
///
/// # Errors
///
/// Returns an error if the homeserver is unreachable or the user could not
/// be provisioned.
async fn provision_user(&self, request: &ProvisionRequest) -> Result<bool, Self::Error>;
/// Create a device for a user on the homeserver.
///
/// # Parameters
///
/// * `mxid` - The Matrix ID of the user to create a device for.
/// * `device_id` - The device ID to create.
///
/// # Errors
///
/// Returns an error if the homeserver is unreachable or the device could
/// not be created.
async fn create_device(&self, mxid: &str, device_id: &str) -> Result<(), Self::Error>;
/// Delete a device for a user on the homeserver.
///
/// # Parameters
///
/// * `mxid` - The Matrix ID of the user to delete a device for.
/// * `device_id` - The device ID to delete.
///
/// # Errors
///
/// Returns an error if the homeserver is unreachable or the device could
/// not be deleted.
async fn delete_device(&self, mxid: &str, device_id: &str) -> Result<(), Self::Error>;
/// Delete a user on the homeserver.
///
/// # Parameters
///
/// * `mxid` - The Matrix ID of the user to delete.
/// * `erase` - Whether to ask the homeserver to erase the user's data.
///
/// # Errors
///
/// Returns an error if the homeserver is unreachable or the user could not
/// be deleted.
async fn delete_user(&self, mxid: &str, erase: bool) -> Result<(), Self::Error>;
}
#[async_trait::async_trait]
@ -169,4 +277,8 @@ impl<T: HomeserverConnection + Send + Sync + ?Sized> HomeserverConnection for &T
async fn delete_device(&self, mxid: &str, device_id: &str) -> Result<(), Self::Error> {
(**self).delete_device(mxid, device_id).await
}
async fn delete_user(&self, mxid: &str, erase: bool) -> Result<(), Self::Error> {
(**self).delete_user(mxid, erase).await
}
}

View File

@ -109,6 +109,19 @@ impl HomeserverConnection for MockHomeserverConnection {
user.devices.remove(device_id);
Ok(())
}
async fn delete_user(&self, mxid: &str, erase: bool) -> Result<(), Self::Error> {
let mut users = self.users.write().await;
let user = users.get_mut(mxid).context("User not found")?;
user.devices.clear();
user.emails = None;
if erase {
user.avatar_url = None;
user.displayname = None;
}
Ok(())
}
}
#[cfg(test)]

View File

@ -73,6 +73,15 @@ pub struct JobWithSpanContext<T> {
payload: T,
}
impl<J> From<J> for JobWithSpanContext<J> {
fn from(payload: J) -> Self {
Self {
span_context: None,
payload,
}
}
}
impl<J: Job> Job for JobWithSpanContext<J> {
const NAME: &'static str = J::NAME;
}
@ -369,6 +378,47 @@ mod jobs {
impl Job for DeleteDeviceJob {
const NAME: &'static str = "delete-device";
}
/// A job to deactivate and lock a user
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DeactivateUserJob {
user_id: Ulid,
hs_erase: bool,
}
impl DeactivateUserJob {
/// Create a new job to deactivate and lock a user
///
/// # Parameters
///
/// * `user` - The user to deactivate
/// * `hs_erase` - Whether to erase the user from the homeserver
#[must_use]
pub fn new(user: &User, hs_erase: bool) -> Self {
Self {
user_id: user.id,
hs_erase,
}
}
/// The ID of the user to deactivate
#[must_use]
pub fn user_id(&self) -> Ulid {
self.user_id
}
/// Whether to erase the user from the homeserver
#[must_use]
pub fn hs_erase(&self) -> bool {
self.hs_erase
}
}
impl Job for DeactivateUserJob {
const NAME: &'static str = "deactivate-user";
}
}
pub use self::jobs::{DeleteDeviceJob, ProvisionDeviceJob, ProvisionUserJob, VerifyEmailJob};
pub use self::jobs::{
DeactivateUserJob, DeleteDeviceJob, ProvisionDeviceJob, ProvisionUserJob, VerifyEmailJob,
};

View File

@ -33,6 +33,7 @@ mod database;
mod email;
mod matrix;
mod storage;
mod user;
mod utils;
#[derive(Clone)]
@ -128,6 +129,7 @@ pub async fn init(
let monitor = self::database::register(name, monitor, &state);
let monitor = self::email::register(name, monitor, &state, &factory);
let monitor = self::matrix::register(name, monitor, &state, &factory);
let monitor = self::user::register(name, monitor, &state, &factory);
// TODO: we might want to grab the join handle here
factory.listen().await?;
debug!(?monitor, "workers registered");

96
crates/tasks/src/user.rs Normal file
View File

@ -0,0 +1,96 @@
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use anyhow::Context;
use apalis_core::{
builder::{WorkerBuilder, WorkerFactoryFn},
context::JobContext,
executor::TokioExecutor,
job::Job,
monitor::Monitor,
storage::builder::WithStorage,
};
use mas_storage::{
job::{DeactivateUserJob, DeleteDeviceJob, JobWithSpanContext},
user::UserRepository,
RepositoryAccess,
};
use tracing::info;
use crate::{
storage::PostgresStorageFactory,
utils::{metrics_layer, trace_layer},
JobContextExt, State,
};
/// Job to deactivate a user, both locally and on the Matrix homeserver.
#[tracing::instrument(
name = "job.deactivate_user"
fields(user.id = %job.user_id(), erase = %job.hs_erase()),
skip_all,
err(Debug),
)]
async fn deactivate_user(
job: JobWithSpanContext<DeactivateUserJob>,
ctx: JobContext,
) -> Result<(), anyhow::Error> {
let state = ctx.state();
let clock = state.clock();
let matrix = state.matrix_connection();
let mut repo = state.repository().await?;
let user = repo
.user()
.lookup(job.user_id())
.await?
.context("User not found")?;
// Let's first lock the user
let user = repo
.user()
.lock(&clock, user)
.await
.context("Failed to lock user")?;
// TODO: delete the sessions & access tokens
// Before calling back to the homeserver, commit the changes to the database
repo.save().await?;
let mxid = matrix.mxid(&user.username);
info!("Deactivating user {} on homeserver", mxid);
matrix.delete_user(&mxid, job.hs_erase()).await?;
Ok(())
}
pub(crate) fn register(
suffix: &str,
monitor: Monitor<TokioExecutor>,
state: &State,
storage_factory: &PostgresStorageFactory,
) -> Monitor<TokioExecutor> {
let storage = storage_factory.build();
let worker_name = format!("{job}-{suffix}", job = DeleteDeviceJob::NAME);
let deactivate_user_worker = WorkerBuilder::new(worker_name)
.layer(state.inject())
.layer(trace_layer())
.layer(metrics_layer())
.with_storage_config(storage, |c| c.fetch_interval(Duration::from_secs(1)))
.build_fn(deactivate_user);
monitor.register(deactivate_user_worker)
}