You've already forked authentication-service
mirror of
https://github.com/matrix-org/matrix-authentication-service.git
synced 2025-08-06 06:02:40 +03:00
Make mas-cli manage kill-sessions
finish sessions in bulk
This commit is contained in:
@@ -26,11 +26,12 @@ use mas_handlers::HttpClientFactory;
|
|||||||
use mas_matrix::HomeserverConnection;
|
use mas_matrix::HomeserverConnection;
|
||||||
use mas_matrix_synapse::SynapseConnection;
|
use mas_matrix_synapse::SynapseConnection;
|
||||||
use mas_storage::{
|
use mas_storage::{
|
||||||
compat::{CompatAccessTokenRepository, CompatSessionRepository},
|
compat::{CompatAccessTokenRepository, CompatSessionFilter, CompatSessionRepository},
|
||||||
job::{
|
job::{
|
||||||
DeactivateUserJob, JobRepositoryExt, ProvisionUserJob, ReactivateUserJob, SyncDevicesJob,
|
DeactivateUserJob, JobRepositoryExt, ProvisionUserJob, ReactivateUserJob, SyncDevicesJob,
|
||||||
},
|
},
|
||||||
user::{UserEmailRepository, UserPasswordRepository, UserRepository},
|
oauth2::OAuth2SessionFilter,
|
||||||
|
user::{BrowserSessionFilter, UserEmailRepository, UserPasswordRepository, UserRepository},
|
||||||
Clock, RepositoryAccess, SystemClock,
|
Clock, RepositoryAccess, SystemClock,
|
||||||
};
|
};
|
||||||
use mas_storage_pg::{DatabaseError, PgRepository};
|
use mas_storage_pg::{DatabaseError, PgRepository};
|
||||||
@@ -348,83 +349,43 @@ impl Options {
|
|||||||
.await?
|
.await?
|
||||||
.context("User not found")?;
|
.context("User not found")?;
|
||||||
|
|
||||||
let compat_sessions_ids: Vec<Uuid> = sqlx::query_scalar(
|
let filter = CompatSessionFilter::new().for_user(&user).active_only();
|
||||||
r"
|
let affected = if dry_run {
|
||||||
SELECT compat_session_id FROM compat_sessions
|
repo.compat_session().count(filter).await?
|
||||||
WHERE user_id = $1 AND finished_at IS NULL
|
} else {
|
||||||
",
|
repo.compat_session().finish_bulk(&clock, filter).await?
|
||||||
)
|
};
|
||||||
.bind(Uuid::from(user.id))
|
|
||||||
.fetch_all(&mut **repo)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
for id in compat_sessions_ids {
|
match affected {
|
||||||
let id = id.into();
|
0 => info!("No active compatibility sessions to end"),
|
||||||
let compat_session = repo
|
1 => info!("Ended 1 active compatibility session"),
|
||||||
.compat_session()
|
_ => info!("Ended {affected} active compatibility sessions"),
|
||||||
.lookup(id)
|
|
||||||
.await?
|
|
||||||
.context("Session not found")?;
|
|
||||||
info!(%compat_session.id, %compat_session.device, "Killing compat session");
|
|
||||||
|
|
||||||
if dry_run {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let oauth2_sessions_ids: Vec<Uuid> = sqlx::query_scalar(
|
let filter = OAuth2SessionFilter::new().for_user(&user).active_only();
|
||||||
r"
|
let affected = if dry_run {
|
||||||
SELECT oauth2_sessions.oauth2_session_id
|
repo.oauth2_session().count(filter).await?
|
||||||
FROM oauth2_sessions
|
} else {
|
||||||
INNER JOIN user_sessions USING (user_session_id)
|
repo.oauth2_session().finish_bulk(&clock, filter).await?
|
||||||
WHERE user_sessions.user_id = $1 AND oauth2_sessions.finished_at IS NULL
|
};
|
||||||
",
|
|
||||||
)
|
|
||||||
.bind(Uuid::from(user.id))
|
|
||||||
.fetch_all(&mut **repo)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
for id in oauth2_sessions_ids {
|
match affected {
|
||||||
let id = id.into();
|
0 => info!("No active compatibility sessions to end"),
|
||||||
let oauth2_session = repo
|
1 => info!("Ended 1 active OAuth 2.0 session"),
|
||||||
.oauth2_session()
|
_ => info!("Ended {affected} active OAuth 2.0 sessions"),
|
||||||
.lookup(id)
|
};
|
||||||
.await?
|
|
||||||
.context("Session not found")?;
|
|
||||||
info!(%oauth2_session.id, %oauth2_session.scope, "Killing oauth2 session");
|
|
||||||
|
|
||||||
if dry_run {
|
let filter = BrowserSessionFilter::new().for_user(&user).active_only();
|
||||||
continue;
|
let affected = if dry_run {
|
||||||
}
|
repo.browser_session().count(filter).await?
|
||||||
repo.oauth2_session().finish(&clock, oauth2_session).await?;
|
} else {
|
||||||
}
|
repo.browser_session().finish_bulk(&clock, filter).await?
|
||||||
|
};
|
||||||
|
|
||||||
let user_sessions_ids: Vec<Uuid> = sqlx::query_scalar(
|
match affected {
|
||||||
r"
|
0 => info!("No active browser sessions to end"),
|
||||||
SELECT user_session_id FROM user_sessions
|
1 => info!("Ended 1 active browser session"),
|
||||||
WHERE user_id = $1 AND finished_at IS NULL
|
_ => info!("Ended {affected} active browser sessions"),
|
||||||
",
|
|
||||||
)
|
|
||||||
.bind(Uuid::from(user.id))
|
|
||||||
.fetch_all(&mut **repo)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
for id in user_sessions_ids {
|
|
||||||
let id = id.into();
|
|
||||||
let browser_session = repo
|
|
||||||
.browser_session()
|
|
||||||
.lookup(id)
|
|
||||||
.await?
|
|
||||||
.context("Session not found")?;
|
|
||||||
info!(%browser_session.id, "Killing browser session");
|
|
||||||
|
|
||||||
if dry_run {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
repo.browser_session()
|
|
||||||
.finish(&clock, browser_session)
|
|
||||||
.await?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Schedule a job to sync the devices of the user with the homeserver
|
// Schedule a job to sync the devices of the user with the homeserver
|
||||||
|
Reference in New Issue
Block a user