1
0
mirror of https://github.com/matrix-org/matrix-authentication-service.git synced 2025-11-26 10:44:51 +03:00

Grab a database lock when syncing the config

Fixes #1475
This commit is contained in:
Quentin Gliech
2023-08-25 15:39:31 +02:00
parent 7ff9be99db
commit 9289922dfb
7 changed files with 162 additions and 76 deletions

View File

@@ -20,14 +20,14 @@ use mas_storage::{
compat::{CompatAccessTokenRepository, CompatSessionRepository},
job::{DeactivateUserJob, DeleteDeviceJob, JobRepositoryExt, ProvisionUserJob},
user::{UserEmailRepository, UserPasswordRepository, UserRepository},
Repository, RepositoryAccess, SystemClock,
RepositoryAccess, SystemClock,
};
use mas_storage_pg::PgRepository;
use rand::SeedableRng;
use sqlx::types::Uuid;
use sqlx::{types::Uuid, Acquire};
use tracing::{info, info_span, warn};
use crate::util::{database_from_config, password_manager_from_config};
use crate::util::{database_connection_from_config, password_manager_from_config};
#[derive(Parser, Debug)]
pub(super) struct Options {
@@ -103,10 +103,11 @@ impl Options {
let database_config: DatabaseConfig = root.load_config()?;
let passwords_config: PasswordsConfig = root.load_config()?;
let pool = database_from_config(&database_config).await?;
let mut conn = database_connection_from_config(&database_config).await?;
let password_manager = password_manager_from_config(&passwords_config).await?;
let mut repo = PgRepository::from_pool(&pool).await?.boxed();
let txn = conn.begin().await?;
let mut repo = PgRepository::from_conn(txn);
let user = repo
.user()
.find_by_username(&username)
@@ -122,7 +123,7 @@ impl Options {
.await?;
info!(%user.id, %user.username, "Password changed");
repo.save().await?;
repo.into_inner().commit().await?;
Ok(())
}
@@ -135,9 +136,10 @@ impl Options {
)
.entered();
let config: DatabaseConfig = root.load_config()?;
let pool = database_from_config(&config).await?;
let mut repo = PgRepository::from_pool(&pool).await?.boxed();
let database_config: DatabaseConfig = root.load_config()?;
let mut conn = database_connection_from_config(&database_config).await?;
let txn = conn.begin().await?;
let mut repo = PgRepository::from_conn(txn);
let user = repo
.user()
@@ -152,7 +154,7 @@ impl Options {
.context("Email not found")?;
let email = repo.user_email().mark_as_verified(&clock, email).await?;
repo.save().await?;
repo.into_inner().commit().await?;
info!(?email, "Email marked as verified");
Ok(())
@@ -163,9 +165,10 @@ impl Options {
admin,
device_id,
} => {
let config: DatabaseConfig = root.load_config()?;
let pool = database_from_config(&config).await?;
let mut repo = PgRepository::from_pool(&pool).await?.boxed();
let database_config: DatabaseConfig = root.load_config()?;
let mut conn = database_connection_from_config(&database_config).await?;
let txn = conn.begin().await?;
let mut repo = PgRepository::from_conn(txn);
let user = repo
.user()
@@ -191,7 +194,7 @@ impl Options {
.add(&mut rng, &clock, &compat_session, token, None)
.await?;
repo.save().await?;
repo.into_inner().commit().await?;
info!(
%compat_access_token.id,
@@ -207,16 +210,16 @@ impl Options {
SC::ProvisionAllUsers => {
let _span = info_span!("cli.manage.provision_all_users").entered();
let config: DatabaseConfig = root.load_config()?;
let pool = database_from_config(&config).await?;
let mut conn = pool.acquire().await?;
let mut repo = PgRepository::from_pool(&pool).await?.boxed();
let database_config: DatabaseConfig = root.load_config()?;
let mut conn = database_connection_from_config(&database_config).await?;
let mut txn = conn.begin().await?;
// TODO: do some pagination here
let ids: Vec<Uuid> = sqlx::query_scalar("SELECT user_id FROM users")
.fetch_all(&mut *conn)
.fetch_all(&mut *txn)
.await?;
drop(conn);
let mut repo = PgRepository::from_conn(txn);
for id in ids {
let id = id.into();
@@ -225,7 +228,7 @@ impl Options {
repo.job().schedule_job(job).await?;
}
repo.save().await?;
repo.into_inner().commit().await?;
Ok(())
}
@@ -233,10 +236,10 @@ impl Options {
SC::KillSessions { username, dry_run } => {
let _span =
info_span!("cli.manage.kill_sessions", user.username = username).entered();
let config: DatabaseConfig = root.load_config()?;
let pool = database_from_config(&config).await?;
let mut conn = pool.acquire().await?;
let mut repo = PgRepository::from_pool(&pool).await?.boxed();
let database_config: DatabaseConfig = root.load_config()?;
let mut conn = database_connection_from_config(&database_config).await?;
let txn = conn.begin().await?;
let mut repo = PgRepository::from_conn(txn);
let user = repo
.user()
@@ -251,7 +254,7 @@ impl Options {
"#,
)
.bind(Uuid::from(user.id))
.fetch_all(&mut *conn)
.fetch_all(&mut **repo)
.await?;
for id in compat_sessions_ids {
@@ -281,7 +284,7 @@ impl Options {
"#,
)
.bind(Uuid::from(user.id))
.fetch_all(&mut *conn)
.fetch_all(&mut **repo)
.await?;
for id in oauth2_sessions_ids {
@@ -316,7 +319,7 @@ impl Options {
"#,
)
.bind(Uuid::from(user.id))
.fetch_all(&mut *conn)
.fetch_all(&mut **repo)
.await?;
for id in user_sessions_ids {
@@ -337,11 +340,12 @@ impl Options {
.await?;
}
let txn = repo.into_inner();
if dry_run {
info!("Dry run, not saving");
repo.cancel().await?;
txn.rollback().await?;
} else {
repo.save().await?;
txn.commit().await?;
}
Ok(())
@@ -353,8 +357,9 @@ impl Options {
} => {
let _span = info_span!("cli.manage.lock_user", user.username = username).entered();
let config: DatabaseConfig = root.load_config()?;
let pool = database_from_config(&config).await?;
let mut repo = PgRepository::from_pool(&pool).await?.boxed();
let mut conn = database_connection_from_config(&config).await?;
let txn = conn.begin().await?;
let mut repo = PgRepository::from_conn(txn);
let user = repo
.user()
@@ -375,7 +380,8 @@ impl Options {
.schedule_job(DeactivateUserJob::new(&user, false))
.await?;
}
repo.save().await?;
repo.into_inner().commit().await?;
Ok(())
}
@@ -383,8 +389,9 @@ impl Options {
SC::UnlockUser { username } => {
let _span = info_span!("cli.manage.lock_user", user.username = username).entered();
let config: DatabaseConfig = root.load_config()?;
let pool = database_from_config(&config).await?;
let mut repo = PgRepository::from_pool(&pool).await?.boxed();
let mut conn = database_connection_from_config(&config).await?;
let txn = conn.begin().await?;
let mut repo = PgRepository::from_conn(txn);
let user = repo
.user()
@@ -395,7 +402,7 @@ impl Options {
info!(%user.id, "Unlocking user");
repo.user().unlock(user).await?;
repo.save().await?;
repo.into_inner().commit().await?;
Ok(())
}