From 53172d6a3f40381ec35599eef46161466ce7a5bf Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Tue, 3 Jan 2023 15:21:47 +0100 Subject: [PATCH] strorage: browser session and user password repositories --- crates/axum-utils/src/session.rs | 16 +- crates/cli/src/commands/manage.rs | 15 +- crates/data-model/src/users.rs | 7 + crates/graphql/src/lib.rs | 7 +- crates/graphql/src/model/users.rs | 16 +- crates/handlers/src/compat/login.rs | 26 +- crates/handlers/src/graphql.rs | 6 +- crates/handlers/src/upstream_oauth2/link.rs | 19 +- crates/handlers/src/views/account/mod.rs | 4 +- crates/handlers/src/views/account/password.rs | 35 +- crates/handlers/src/views/login.rs | 39 +- crates/handlers/src/views/logout.rs | 4 +- crates/handlers/src/views/reauth.rs | 36 +- crates/handlers/src/views/register.rs | 26 +- crates/storage/sqlx-data.json | 320 ++++++------- crates/storage/src/oauth2/access_token.rs | 1 + .../storage/src/oauth2/authorization_grant.rs | 1 + crates/storage/src/oauth2/mod.rs | 13 +- crates/storage/src/oauth2/refresh_token.rs | 1 + crates/storage/src/repository.rs | 35 +- crates/storage/src/user/authentication.rs | 105 ----- crates/storage/src/user/mod.rs | 250 +---------- crates/storage/src/user/password.rs | 229 ++++++---- crates/storage/src/user/session.rs | 425 ++++++++++++++++++ crates/templates/src/context.rs | 4 +- 25 files changed, 914 insertions(+), 726 deletions(-) delete mode 100644 crates/storage/src/user/authentication.rs create mode 100644 crates/storage/src/user/session.rs diff --git a/crates/axum-utils/src/session.rs b/crates/axum-utils/src/session.rs index a63c2266..64887895 100644 --- a/crates/axum-utils/src/session.rs +++ b/crates/axum-utils/src/session.rs @@ -14,9 +14,9 @@ use axum_extra::extract::cookie::{Cookie, PrivateCookieJar}; use mas_data_model::BrowserSession; -use mas_storage::{user::lookup_active_session, DatabaseError}; +use mas_storage::{user::BrowserSessionRepository, DatabaseError, Repository}; use serde::{Deserialize, Serialize}; -use sqlx::{Executor, Postgres}; +use sqlx::PgConnection; use ulid::Ulid; use crate::CookieExt; @@ -46,7 +46,7 @@ impl SessionInfo { /// Load the [`BrowserSession`] from database pub async fn load_session( &self, - executor: impl Executor<'_, Database = Postgres>, + conn: &mut PgConnection, ) -> Result, DatabaseError> { let session_id = if let Some(id) = self.current { id @@ -54,8 +54,14 @@ impl SessionInfo { return Ok(None); }; - let res = lookup_active_session(executor, session_id).await?; - Ok(res) + let maybe_session = conn + .browser_session() + .lookup(session_id) + .await? + // Ensure that the session is still active + .filter(BrowserSession::active); + + Ok(maybe_session) } } diff --git a/crates/cli/src/commands/manage.rs b/crates/cli/src/commands/manage.rs index d46130e2..60b94bfe 100644 --- a/crates/cli/src/commands/manage.rs +++ b/crates/cli/src/commands/manage.rs @@ -20,7 +20,7 @@ use mas_router::UrlBuilder; use mas_storage::{ oauth2::client::{insert_client_from_config, lookup_client, truncate_clients}, upstream_oauth2::UpstreamOAuthProviderRepository, - user::{add_user_password, UserEmailRepository, UserRepository}, + user::{UserEmailRepository, UserPasswordRepository, UserRepository}, Clock, Repository, }; use oauth2_types::scope::Scope; @@ -210,16 +210,9 @@ impl Options { let (version, hashed_password) = password_manager.hash(&mut rng, password).await?; - add_user_password( - &mut txn, - &mut rng, - &clock, - &user, - version, - hashed_password, - None, - ) - .await?; + txn.user_password() + .add(&mut rng, &clock, &user, version, hashed_password, None) + .await?; info!(%user.id, %user.username, "Password changed"); txn.commit().await?; diff --git a/crates/data-model/src/users.rs b/crates/data-model/src/users.rs index 995535d7..638ed77e 100644 --- a/crates/data-model/src/users.rs +++ b/crates/data-model/src/users.rs @@ -57,10 +57,16 @@ pub struct BrowserSession { pub id: Ulid, pub user: User, pub created_at: DateTime, + pub finished_at: Option>, pub last_authentication: Option, } impl BrowserSession { + #[must_use] + pub fn active(&self) -> bool { + self.finished_at.is_none() + } + #[must_use] pub fn was_authenticated_after(&self, after: DateTime) -> bool { if let Some(auth) = &self.last_authentication { @@ -80,6 +86,7 @@ impl BrowserSession { id: Ulid::from_datetime_with_source(now.into(), rng), user, created_at: now, + finished_at: None, last_authentication: None, }) .collect() diff --git a/crates/graphql/src/lib.rs b/crates/graphql/src/lib.rs index f01d8370..b79b6fe9 100644 --- a/crates/graphql/src/lib.rs +++ b/crates/graphql/src/lib.rs @@ -31,8 +31,9 @@ use async_graphql::{ Context, Description, EmptyMutation, EmptySubscription, ID, }; use mas_storage::{ - upstream_oauth2::UpstreamOAuthProviderRepository, user::UserEmailRepository, Repository, - UpstreamOAuthLinkRepository, + upstream_oauth2::UpstreamOAuthProviderRepository, + user::{BrowserSessionRepository, UserEmailRepository}, + Repository, UpstreamOAuthLinkRepository, }; use model::CreationEvent; use sqlx::PgPool; @@ -128,7 +129,7 @@ impl RootQuery { let Some(session) = session else { return Ok(None) }; let current_user = session.user; - let browser_session = mas_storage::user::lookup_active_session(&mut conn, id).await?; + let browser_session = conn.browser_session().lookup(id).await?; let ret = browser_session.and_then(|browser_session| { if browser_session.user.id == current_user.id { diff --git a/crates/graphql/src/model/users.rs b/crates/graphql/src/model/users.rs index 58119c09..dc40d6cd 100644 --- a/crates/graphql/src/model/users.rs +++ b/crates/graphql/src/model/users.rs @@ -17,7 +17,10 @@ use async_graphql::{ Context, Description, Object, ID, }; use chrono::{DateTime, Utc}; -use mas_storage::{user::UserEmailRepository, Repository, UpstreamOAuthLinkRepository}; +use mas_storage::{ + user::{BrowserSessionRepository, UserEmailRepository}, + Repository, UpstreamOAuthLinkRepository, +}; use sqlx::PgPool; use super::{ @@ -140,14 +143,13 @@ impl User { .map(|x: OpaqueCursor| x.extract_for_type(NodeType::BrowserSession)) .transpose()?; - let (has_previous_page, has_next_page, edges) = - mas_storage::user::get_paginated_user_sessions( - &mut conn, &self.0, before_id, after_id, first, last, - ) + let page = conn + .browser_session() + .list_active_paginated(&self.0, before_id, after_id, first, last) .await?; - let mut connection = Connection::new(has_previous_page, has_next_page); - connection.edges.extend(edges.into_iter().map(|u| { + let mut connection = Connection::new(page.has_previous_page, page.has_next_page); + connection.edges.extend(page.edges.into_iter().map(|u| { Edge::new( OpaqueCursor(NodeCursor(NodeType::BrowserSession, u.id)), BrowserSession(u), diff --git a/crates/handlers/src/compat/login.rs b/crates/handlers/src/compat/login.rs index dd4d4742..c3b91002 100644 --- a/crates/handlers/src/compat/login.rs +++ b/crates/handlers/src/compat/login.rs @@ -21,7 +21,7 @@ use mas_storage::{ add_compat_access_token, add_compat_refresh_token, get_compat_sso_login_by_token, mark_compat_sso_login_as_exchanged, start_compat_session, }, - user::{add_user_password, lookup_user_password, UserRepository}, + user::{UserPasswordRepository, UserRepository}, Clock, Repository, }; use serde::{Deserialize, Serialize}; @@ -321,7 +321,9 @@ async fn user_password_login( .ok_or(RouteError::UserNotFound)?; // Lookup its password - let user_password = lookup_user_password(&mut *txn, &user) + let user_password = txn + .user_password() + .active(&user) .await? .ok_or(RouteError::NoPassword)?; @@ -340,16 +342,16 @@ async fn user_password_login( if let Some((version, hashed_password)) = new_password_hash { // Save the upgraded password if needed - add_user_password( - &mut *txn, - &mut rng, - &clock, - &user, - version, - hashed_password, - Some(user_password), - ) - .await?; + txn.user_password() + .add( + &mut rng, + &clock, + &user, + version, + hashed_password, + Some(&user_password), + ) + .await?; } // Now that the user credentials have been verified, start a new compat session diff --git a/crates/handlers/src/graphql.rs b/crates/handlers/src/graphql.rs index ba691994..2177388b 100644 --- a/crates/handlers/src/graphql.rs +++ b/crates/handlers/src/graphql.rs @@ -67,7 +67,8 @@ pub async fn post( let content_type = content_type.map(|TypedHeader(h)| h.to_string()); let (session_info, _cookie_jar) = cookie_jar.session_info(); - let maybe_session = session_info.load_session(&pool).await?; + let mut conn = pool.acquire().await?; + let maybe_session = session_info.load_session(&mut conn).await?; let mut request = async_graphql::http::receive_batch_body( content_type, @@ -116,7 +117,8 @@ pub async fn get( RawQuery(query): RawQuery, ) -> Result { let (session_info, _cookie_jar) = cookie_jar.session_info(); - let maybe_session = session_info.load_session(&pool).await?; + let mut conn = pool.acquire().await?; + let maybe_session = session_info.load_session(&mut conn).await?; let mut request = async_graphql::http::parse_query_string(&query.unwrap_or_default())?; diff --git a/crates/handlers/src/upstream_oauth2/link.rs b/crates/handlers/src/upstream_oauth2/link.rs index dbd06059..80fa04f7 100644 --- a/crates/handlers/src/upstream_oauth2/link.rs +++ b/crates/handlers/src/upstream_oauth2/link.rs @@ -26,7 +26,7 @@ use mas_axum_utils::{ use mas_keystore::Encrypter; use mas_storage::{ upstream_oauth2::UpstreamOAuthSessionRepository, - user::{authenticate_session_with_upstream, start_session, UserRepository}, + user::{BrowserSessionRepository, UserRepository}, Repository, UpstreamOAuthLinkRepository, }; use mas_templates::{ @@ -134,14 +134,16 @@ pub(crate) async fn get( let maybe_user_session = user_session_info.load_session(&mut txn).await?; let render = match (maybe_user_session, link.user_id) { - (Some(mut session), Some(user_id)) if session.user.id == user_id => { + (Some(session), Some(user_id)) if session.user.id == user_id => { // Session already linked, and link matches the currently logged // user. Mark the session as consumed and renew the authentication. txn.upstream_oauth_session() .consume(&clock, upstream_session) .await?; - authenticate_session_with_upstream(&mut txn, &mut rng, &clock, &mut session, &link) + let session = txn + .browser_session() + .authenticate_with_upstream(&mut rng, &clock, session, &link) .await?; cookie_jar = cookie_jar.set_session(&session); @@ -252,7 +254,7 @@ pub(crate) async fn post( let (user_session_info, cookie_jar) = cookie_jar.session_info(); let maybe_user_session = user_session_info.load_session(&mut txn).await?; - let mut session = match (maybe_user_session, link.user_id, form) { + let session = match (maybe_user_session, link.user_id, form) { (Some(session), None, FormData::Link) => { txn.upstream_oauth_link() .associate_to_user(&link, &session.user) @@ -268,7 +270,7 @@ pub(crate) async fn post( .await? .ok_or(RouteError::UserNotFound)?; - start_session(&mut txn, &mut rng, &clock, user).await? + txn.browser_session().add(&mut rng, &clock, &user).await? } (None, None, FormData::Register { username }) => { @@ -277,7 +279,7 @@ pub(crate) async fn post( .associate_to_user(&link, &user) .await?; - start_session(&mut txn, &mut rng, &clock, user).await? + txn.browser_session().add(&mut rng, &clock, &user).await? } _ => return Err(RouteError::InvalidFormAction), @@ -287,7 +289,10 @@ pub(crate) async fn post( .consume(&clock, upstream_session) .await?; - authenticate_session_with_upstream(&mut txn, &mut rng, &clock, &mut session, &link).await?; + let session = txn + .browser_session() + .authenticate_with_upstream(&mut rng, &clock, session, &link) + .await?; let cookie_jar = sessions_cookie .consume_link(link_id)? diff --git a/crates/handlers/src/views/account/mod.rs b/crates/handlers/src/views/account/mod.rs index 07a70898..0188aef2 100644 --- a/crates/handlers/src/views/account/mod.rs +++ b/crates/handlers/src/views/account/mod.rs @@ -24,7 +24,7 @@ use mas_axum_utils::{csrf::CsrfExt, FancyError, SessionInfoExt}; use mas_keystore::Encrypter; use mas_router::Route; use mas_storage::{ - user::{count_active_sessions, UserEmailRepository}, + user::{BrowserSessionRepository, UserEmailRepository}, Repository, }; use mas_templates::{AccountContext, TemplateContext, Templates}; @@ -50,7 +50,7 @@ pub(crate) async fn get( return Ok((cookie_jar, login.go()).into_response()); }; - let active_sessions = count_active_sessions(&mut conn, &session.user).await?; + let active_sessions = conn.browser_session().count_active(&session.user).await?; let emails = conn.user_email().all(&session.user).await?; diff --git a/crates/handlers/src/views/account/password.rs b/crates/handlers/src/views/account/password.rs index 2ba4b3f8..42c0194b 100644 --- a/crates/handlers/src/views/account/password.rs +++ b/crates/handlers/src/views/account/password.rs @@ -26,8 +26,8 @@ use mas_data_model::BrowserSession; use mas_keystore::Encrypter; use mas_router::Route; use mas_storage::{ - user::{add_user_password, authenticate_session_with_password, lookup_user_password}, - Clock, + user::{BrowserSessionRepository, UserPasswordRepository}, + Clock, Repository, }; use mas_templates::{EmptyContext, TemplateContext, Templates}; use rand::Rng; @@ -98,14 +98,16 @@ pub(crate) async fn post( let maybe_session = session_info.load_session(&mut txn).await?; - let mut session = if let Some(session) = maybe_session { + let session = if let Some(session) = maybe_session { session } else { let login = mas_router::Login::and_then(mas_router::PostAuthAction::ChangePassword); return Ok((cookie_jar, login.go()).into_response()); }; - let user_password = lookup_user_password(&mut txn, &session.user) + let user_password = txn + .user_password() + .active(&session.user) .await? .context("user has no password")?; @@ -127,18 +129,21 @@ pub(crate) async fn post( } let (version, hashed_password) = password_manager.hash(&mut rng, new_password).await?; - let user_password = add_user_password( - &mut txn, - &mut rng, - &clock, - &session.user, - version, - hashed_password, - None, - ) - .await?; + let user_password = txn + .user_password() + .add( + &mut rng, + &clock, + &session.user, + version, + hashed_password, + None, + ) + .await?; - authenticate_session_with_password(&mut txn, &mut rng, &clock, &mut session, &user_password) + let session = txn + .browser_session() + .authenticate_with_password(&mut rng, &clock, session, &user_password) .await?; let reply = render(&mut rng, &clock, templates.clone(), session, cookie_jar).await?; diff --git a/crates/handlers/src/views/login.rs b/crates/handlers/src/views/login.rs index 5ba76b72..1ef5efbb 100644 --- a/crates/handlers/src/views/login.rs +++ b/crates/handlers/src/views/login.rs @@ -25,10 +25,7 @@ use mas_data_model::BrowserSession; use mas_keystore::Encrypter; use mas_storage::{ upstream_oauth2::UpstreamOAuthProviderRepository, - user::{ - add_user_password, authenticate_session_with_password, lookup_user_password, start_session, - UserRepository, - }, + user::{BrowserSessionRepository, UserPasswordRepository, UserRepository}, Clock, Repository, }; use mas_templates::{ @@ -181,7 +178,9 @@ async fn login( .ok_or(FormError::InvalidCredentials)?; // And its password - let user_password = lookup_user_password(&mut *conn, &user) + let user_password = conn + .user_password() + .active(&user) .await .map_err(|_e| FormError::Internal)? .ok_or(FormError::InvalidCredentials)?; @@ -201,28 +200,32 @@ async fn login( let user_password = if let Some((version, new_password_hash)) = new_password_hash { // Save the upgraded password - add_user_password( - &mut *conn, - &mut rng, - clock, - &user, - version, - new_password_hash, - Some(user_password), - ) - .await - .map_err(|_| FormError::Internal)? + conn.user_password() + .add( + &mut rng, + clock, + &user, + version, + new_password_hash, + Some(&user_password), + ) + .await + .map_err(|_| FormError::Internal)? } else { user_password }; // Start a new session - let mut user_session = start_session(&mut *conn, &mut rng, clock, user) + let user_session = conn + .browser_session() + .add(&mut rng, clock, &user) .await .map_err(|_| FormError::Internal)?; // And mark it as authenticated by the password - authenticate_session_with_password(&mut *conn, rng, clock, &mut user_session, &user_password) + let user_session = conn + .browser_session() + .authenticate_with_password(&mut rng, clock, user_session, &user_password) .await .map_err(|_| FormError::Internal)?; diff --git a/crates/handlers/src/views/logout.rs b/crates/handlers/src/views/logout.rs index 07043e64..88c4a9c2 100644 --- a/crates/handlers/src/views/logout.rs +++ b/crates/handlers/src/views/logout.rs @@ -23,7 +23,7 @@ use mas_axum_utils::{ }; use mas_keystore::Encrypter; use mas_router::{PostAuthAction, Route}; -use mas_storage::{user::end_session, Clock}; +use mas_storage::{user::BrowserSessionRepository, Clock, Repository}; use sqlx::PgPool; pub(crate) async fn post( @@ -41,7 +41,7 @@ pub(crate) async fn post( let maybe_session = session_info.load_session(&mut txn).await?; if let Some(session) = maybe_session { - end_session(&mut txn, &clock, &session).await?; + txn.browser_session().finish(&clock, session).await?; cookie_jar = cookie_jar.update_session_info(&session_info.mark_session_ended()); } diff --git a/crates/handlers/src/views/reauth.rs b/crates/handlers/src/views/reauth.rs index 875189a7..7911a930 100644 --- a/crates/handlers/src/views/reauth.rs +++ b/crates/handlers/src/views/reauth.rs @@ -24,8 +24,9 @@ use mas_axum_utils::{ }; use mas_keystore::Encrypter; use mas_router::Route; -use mas_storage::user::{ - add_user_password, authenticate_session_with_password, lookup_user_password, +use mas_storage::{ + user::{BrowserSessionRepository, UserPasswordRepository}, + Repository, }; use mas_templates::{ReauthContext, TemplateContext, Templates}; use serde::Deserialize; @@ -93,7 +94,7 @@ pub(crate) async fn post( let maybe_session = session_info.load_session(&mut txn).await?; - let mut session = if let Some(session) = maybe_session { + let session = if let Some(session) = maybe_session { session } else { // If there is no session, redirect to the login screen, keeping the @@ -103,7 +104,9 @@ pub(crate) async fn post( }; // Load the user password - let user_password = lookup_user_password(&mut txn, &session.user) + let user_password = txn + .user_password() + .active(&session.user) .await? .context("User has no password")?; @@ -122,22 +125,25 @@ pub(crate) async fn post( let user_password = if let Some((version, new_password_hash)) = new_password_hash { // Save the upgraded password - add_user_password( - &mut *txn, - &mut rng, - &clock, - &session.user, - version, - new_password_hash, - Some(user_password), - ) - .await? + txn.user_password() + .add( + &mut rng, + &clock, + &session.user, + version, + new_password_hash, + Some(&user_password), + ) + .await? } else { user_password }; // Mark the session as authenticated by the password - authenticate_session_with_password(&mut txn, rng, &clock, &mut session, &user_password).await?; + let session = txn + .browser_session() + .authenticate_with_password(&mut rng, &clock, session, &user_password) + .await?; let cookie_jar = cookie_jar.set_session(&session); txn.commit().await?; diff --git a/crates/handlers/src/views/register.rs b/crates/handlers/src/views/register.rs index 01dc2116..b2fe9fe0 100644 --- a/crates/handlers/src/views/register.rs +++ b/crates/handlers/src/views/register.rs @@ -32,10 +32,7 @@ use mas_keystore::Encrypter; use mas_policy::PolicyFactory; use mas_router::Route; use mas_storage::{ - user::{ - add_user_password, authenticate_session_with_password, start_session, UserEmailRepository, - UserRepository, - }, + user::{BrowserSessionRepository, UserEmailRepository, UserPasswordRepository, UserRepository}, Repository, }; use mas_templates::{ @@ -191,16 +188,10 @@ pub(crate) async fn post( let user = txn.user().add(&mut rng, &clock, form.username).await?; let password = Zeroizing::new(form.password.into_bytes()); let (version, hashed_password) = password_manager.hash(&mut rng, password).await?; - let user_password = add_user_password( - &mut txn, - &mut rng, - &clock, - &user, - version, - hashed_password, - None, - ) - .await?; + let user_password = txn + .user_password() + .add(&mut rng, &clock, &user, version, hashed_password, None) + .await?; let user_email = txn .user_email() @@ -228,8 +219,11 @@ pub(crate) async fn post( let next = mas_router::AccountVerifyEmail::new(user_email.id).and_maybe(query.post_auth_action); - let mut session = start_session(&mut txn, &mut rng, &clock, user).await?; - authenticate_session_with_password(&mut txn, &mut rng, &clock, &mut session, &user_password) + let session = txn.browser_session().add(&mut rng, &clock, &user).await?; + + let session = txn + .browser_session() + .authenticate_with_password(&mut rng, &clock, session, &user_password) .await?; txn.commit().await?; diff --git a/crates/storage/sqlx-data.json b/crates/storage/sqlx-data.json index 3191a9db..1676c69a 100644 --- a/crates/storage/sqlx-data.json +++ b/crates/storage/sqlx-data.json @@ -252,62 +252,6 @@ }, "query": "\n SELECT user_id\n , username\n , primary_user_email_id\n , created_at\n FROM users\n WHERE user_id = $1\n " }, - "09d995295b2e4f180181ec96023b1e524ddae9098694eedc4dcce857e3095c0e": { - "describe": { - "columns": [ - { - "name": "user_session_id", - "ordinal": 0, - "type_info": "Uuid" - }, - { - "name": "user_session_created_at", - "ordinal": 1, - "type_info": "Timestamptz" - }, - { - "name": "user_id", - "ordinal": 2, - "type_info": "Uuid" - }, - { - "name": "user_username", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "user_primary_user_email_id", - "ordinal": 4, - "type_info": "Uuid" - }, - { - "name": "last_authentication_id?", - "ordinal": 5, - "type_info": "Uuid" - }, - { - "name": "last_authd_at?", - "ordinal": 6, - "type_info": "Timestamptz" - } - ], - "nullable": [ - false, - false, - false, - false, - true, - false, - false - ], - "parameters": { - "Left": [ - "Uuid" - ] - } - }, - "query": "\n SELECT s.user_session_id\n , s.created_at AS \"user_session_created_at\"\n , u.user_id\n , u.username AS \"user_username\"\n , u.primary_user_email_id AS \"user_primary_user_email_id\"\n , a.user_session_authentication_id AS \"last_authentication_id?\"\n , a.created_at AS \"last_authd_at?\"\n FROM user_sessions s\n INNER JOIN users u\n USING (user_id)\n LEFT JOIN user_session_authentications a\n USING (user_session_id)\n WHERE s.user_session_id = $1 AND s.finished_at IS NULL\n ORDER BY a.created_at DESC\n LIMIT 1\n " - }, "154e2e4488ff87e09163698750b56a43127cee4e1392785416a586d40a4d9b21": { "describe": { "columns": [ @@ -851,20 +795,6 @@ }, "query": "\n SELECT user_email_confirmation_code_id\n , user_email_id\n , code\n , created_at\n , expires_at\n , consumed_at\n FROM user_email_confirmation_codes\n WHERE code = $1\n AND user_email_id = $2\n " }, - "1eb6d13e75d8f526c2785749a020731c18012f03e07995213acd38ab560ce497": { - "describe": { - "columns": [], - "nullable": [], - "parameters": { - "Left": [ - "Uuid", - "Uuid", - "Timestamptz" - ] - } - }, - "query": "\n INSERT INTO user_session_authentications\n (user_session_authentication_id, user_session_id, created_at)\n VALUES ($1, $2, $3)\n " - }, "1ee5cecfafd4726a4ebc08da8a34c09178e6e1e072581c8fca9d3d76967792cb": { "describe": { "columns": [], @@ -1060,6 +990,20 @@ }, "query": "\n SELECT user_email_id\n , user_id\n , email\n , created_at\n , confirmed_at\n FROM user_emails\n\n WHERE user_email_id = $1\n " }, + "41c1aafbd338c24476f27d342cf80eef7de2836e85b078232d143d6712fc2be4": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [ + "Uuid", + "Uuid", + "Timestamptz" + ] + } + }, + "query": "\n INSERT INTO user_session_authentications\n (user_session_authentication_id, user_session_id, created_at)\n VALUES ($1, $2, $3)\n " + }, "43a5cafbdc8037e9fb779812a0793cf0859902aa0dc8d25d4c33d231d3d1118b": { "describe": { "columns": [], @@ -1076,6 +1020,50 @@ }, "query": "\n INSERT INTO oauth2_access_tokens\n (oauth2_access_token_id, oauth2_session_id, access_token, created_at, expires_at)\n VALUES\n ($1, $2, $3, $4, $5)\n " }, + "446a8d7bd8532a751810401adfab924dc20785c91770ed43d62df2e590e8da71": { + "describe": { + "columns": [ + { + "name": "user_password_id", + "ordinal": 0, + "type_info": "Uuid" + }, + { + "name": "hashed_password", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "version", + "ordinal": 2, + "type_info": "Int4" + }, + { + "name": "upgraded_from_id", + "ordinal": 3, + "type_info": "Uuid" + }, + { + "name": "created_at", + "ordinal": 4, + "type_info": "Timestamptz" + } + ], + "nullable": [ + false, + false, + false, + true, + false + ], + "parameters": { + "Left": [ + "Uuid" + ] + } + }, + "query": "\n SELECT up.user_password_id\n , up.hashed_password\n , up.version\n , up.upgraded_from_id\n , up.created_at\n FROM user_passwords up\n WHERE up.user_id = $1\n ORDER BY up.created_at DESC\n LIMIT 1\n " + }, "4693f2b9b3d51ff4a05e233b6667161ebc97f331d96bf5f1c61069e1c8492105": { "describe": { "columns": [], @@ -1308,19 +1296,6 @@ }, "query": "\n INSERT INTO oauth2_consents\n (oauth2_consent_id, user_id, oauth2_client_id, scope_token, created_at)\n SELECT id, $2, $3, scope_token, $5 FROM UNNEST($1::uuid[], $4::text[]) u(id, scope_token)\n ON CONFLICT (user_id, oauth2_client_id, scope_token) DO UPDATE SET refreshed_at = $5\n " }, - "64a56818dd16ac6368efe3e34196a77b7feda1eb87b696e0063a51bf50e499e5": { - "describe": { - "columns": [], - "nullable": [], - "parameters": { - "Left": [ - "Timestamptz", - "Uuid" - ] - } - }, - "query": "\n UPDATE user_sessions\n SET finished_at = $1\n WHERE user_session_id = $2\n " - }, "64e6ea47c2e877c1ebe4338d64d9ad8a6c1c777d1daea024b8ca2e7f0dd75b0f": { "describe": { "columns": [], @@ -1438,6 +1413,43 @@ }, "query": "\n UPDATE oauth2_access_tokens\n SET revoked_at = $2\n WHERE oauth2_access_token_id = $1\n " }, + "6f97b5f9ad0d4d15387150bea3839fb7f81015f7ceef61ecaadba64521895cff": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [ + "Uuid", + "Uuid", + "Text", + "Int4", + "Uuid", + "Timestamptz" + ] + } + }, + "query": "\n INSERT INTO user_passwords\n (user_password_id, user_id, hashed_password, version, upgraded_from_id, created_at)\n VALUES ($1, $2, $3, $4, $5, $6)\n " + }, + "751d549073d77ded84aea1aaba36d3b130ec71bc592d722eb75b959b80f0b4ff": { + "describe": { + "columns": [ + { + "name": "count!", + "ordinal": 0, + "type_info": "Int8" + } + ], + "nullable": [ + null + ], + "parameters": { + "Left": [ + "Uuid" + ] + } + }, + "query": "\n SELECT COUNT(*) as \"count!\"\n FROM user_sessions s\n WHERE s.user_id = $1 AND s.finished_at IS NULL\n " + }, "7756a60c36a64a259f7450d6eb77ee92303638ca374a63f23ac4944ccf9f4436": { "describe": { "columns": [ @@ -1554,6 +1566,68 @@ }, "query": "\n SELECT\n c.oauth2_client_id,\n c.encrypted_client_secret,\n ARRAY(\n SELECT redirect_uri\n FROM oauth2_client_redirect_uris r\n WHERE r.oauth2_client_id = c.oauth2_client_id\n ) AS \"redirect_uris!\",\n c.grant_type_authorization_code,\n c.grant_type_refresh_token,\n c.client_name,\n c.logo_uri,\n c.client_uri,\n c.policy_uri,\n c.tos_uri,\n c.jwks_uri,\n c.jwks,\n c.id_token_signed_response_alg,\n c.userinfo_signed_response_alg,\n c.token_endpoint_auth_method,\n c.token_endpoint_auth_signing_alg,\n c.initiate_login_uri\n FROM oauth2_clients c\n\n WHERE c.oauth2_client_id = ANY($1::uuid[])\n " }, + "79295f3d3a75f831e9469aabfa720d381a254d00dbe39fef1e9652029d51b89b": { + "describe": { + "columns": [ + { + "name": "user_session_id", + "ordinal": 0, + "type_info": "Uuid" + }, + { + "name": "user_session_created_at", + "ordinal": 1, + "type_info": "Timestamptz" + }, + { + "name": "user_session_finished_at", + "ordinal": 2, + "type_info": "Timestamptz" + }, + { + "name": "user_id", + "ordinal": 3, + "type_info": "Uuid" + }, + { + "name": "user_username", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "user_primary_user_email_id", + "ordinal": 5, + "type_info": "Uuid" + }, + { + "name": "last_authentication_id?", + "ordinal": 6, + "type_info": "Uuid" + }, + { + "name": "last_authd_at?", + "ordinal": 7, + "type_info": "Timestamptz" + } + ], + "nullable": [ + false, + false, + true, + false, + false, + true, + false, + false + ], + "parameters": { + "Left": [ + "Uuid" + ] + } + }, + "query": "\n SELECT s.user_session_id\n , s.created_at AS \"user_session_created_at\"\n , s.finished_at AS \"user_session_finished_at\"\n , u.user_id\n , u.username AS \"user_username\"\n , u.primary_user_email_id AS \"user_primary_user_email_id\"\n , a.user_session_authentication_id AS \"last_authentication_id?\"\n , a.created_at AS \"last_authd_at?\"\n FROM user_sessions s\n INNER JOIN users u\n USING (user_id)\n LEFT JOIN user_session_authentications a\n USING (user_session_id)\n WHERE s.user_session_id = $1\n ORDER BY a.created_at DESC\n LIMIT 1\n " + }, "7ce387b1b0aaf10e72adde667b19521b66eaafa51f73bf2f95e38b8f3b64a229": { "describe": { "columns": [], @@ -1872,70 +1946,6 @@ }, "query": "\n UPDATE oauth2_sessions\n SET finished_at = $2\n WHERE oauth2_session_id = $1\n " }, - "9edf5e8a3e00a7cdd8e55b97105df7831ee580096299df4bd6c1ed7c96b95e83": { - "describe": { - "columns": [ - { - "name": "count!", - "ordinal": 0, - "type_info": "Int8" - } - ], - "nullable": [ - null - ], - "parameters": { - "Left": [ - "Uuid" - ] - } - }, - "query": "\n SELECT COUNT(*) as \"count!\"\n FROM user_sessions s\n WHERE s.user_id = $1 AND s.finished_at IS NULL\n " - }, - "a1c19d9d7f1522d126787c7f9946ed51cbbd8f27a4947bc371acab3e7bf23267": { - "describe": { - "columns": [ - { - "name": "user_password_id", - "ordinal": 0, - "type_info": "Uuid" - }, - { - "name": "hashed_password", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "version", - "ordinal": 2, - "type_info": "Int4" - }, - { - "name": "upgraded_from_id", - "ordinal": 3, - "type_info": "Uuid" - }, - { - "name": "created_at", - "ordinal": 4, - "type_info": "Timestamptz" - } - ], - "nullable": [ - false, - false, - false, - true, - false - ], - "parameters": { - "Left": [ - "Uuid" - ] - } - }, - "query": "\n SELECT up.user_password_id\n , up.hashed_password\n , up.version\n , up.upgraded_from_id\n , up.created_at\n FROM user_passwords up\n WHERE up.user_id = $1\n ORDER BY up.created_at DESC\n LIMIT 1\n " - }, "a300fe99c95679c5664646a6a525c0491829e97db45f3234483872ed38436322": { "describe": { "columns": [ @@ -2109,7 +2119,7 @@ }, "query": "\n UPDATE users\n SET primary_user_email_id = user_emails.user_email_id\n FROM user_emails\n WHERE user_emails.user_email_id = $1\n AND users.user_id = user_emails.user_id\n " }, - "bd7a4a008851f3f6d7591e3463e4369cee08820af57dcd3faf95f8e9be82857d": { + "c1d90a7f2287ec779c81a521fab19e5ede3fa95484033e0312c30d9b6ecc03f0": { "describe": { "columns": [], "nullable": [], @@ -2117,14 +2127,11 @@ "Left": [ "Uuid", "Uuid", - "Text", - "Int4", - "Uuid", "Timestamptz" ] } }, - "query": "\n INSERT INTO user_passwords\n (user_password_id, user_id, hashed_password, version, upgraded_from_id, created_at)\n VALUES ($1, $2, $3, $4, $5, $6)\n " + "query": "\n INSERT INTO user_sessions (user_session_id, user_id, created_at)\n VALUES ($1, $2, $3)\n " }, "c88376abdba124ff0487a9a69d2345c7d69d7394f355111ec369cfa6d45fb40f": { "describe": { @@ -2371,19 +2378,18 @@ }, "query": "\n INSERT INTO oauth2_refresh_tokens\n (oauth2_refresh_token_id, oauth2_session_id, oauth2_access_token_id,\n refresh_token, created_at)\n VALUES\n ($1, $2, $3, $4, $5)\n " }, - "e446e37d48c8838ef2e0d0fd82f8f7b04893c84ad46747cdf193ebd83755ceb2": { + "dbf4be84eeff9ea51b00185faae2d453ab449017ed492bf6711dc7fceb630880": { "describe": { "columns": [], "nullable": [], "parameters": { "Left": [ - "Uuid", - "Uuid", - "Timestamptz" + "Timestamptz", + "Uuid" ] } }, - "query": "\n INSERT INTO user_sessions (user_session_id, user_id, created_at)\n VALUES ($1, $2, $3)\n " + "query": "\n UPDATE user_sessions\n SET finished_at = $1\n WHERE user_session_id = $2\n " }, "e6dc63984aced9e19c20e90e9cd75d6f6d7ade64f782697715ac4da077b2e1fc": { "describe": { diff --git a/crates/storage/src/oauth2/access_token.rs b/crates/storage/src/oauth2/access_token.rs index 71e014e4..5c2347d2 100644 --- a/crates/storage/src/oauth2/access_token.rs +++ b/crates/storage/src/oauth2/access_token.rs @@ -175,6 +175,7 @@ pub async fn lookup_active_access_token( let browser_session = BrowserSession { id: res.user_session_id.into(), created_at: res.user_session_created_at, + finished_at: None, user, last_authentication, }; diff --git a/crates/storage/src/oauth2/authorization_grant.rs b/crates/storage/src/oauth2/authorization_grant.rs index 957400d9..b7ffb30d 100644 --- a/crates/storage/src/oauth2/authorization_grant.rs +++ b/crates/storage/src/oauth2/authorization_grant.rs @@ -224,6 +224,7 @@ impl GrantLookup { id: user_session_id.into(), user, created_at: user_session_created_at, + finished_at: None, last_authentication, }; diff --git a/crates/storage/src/oauth2/mod.rs b/crates/storage/src/oauth2/mod.rs index 81a74363..bdc9c1b5 100644 --- a/crates/storage/src/oauth2/mod.rs +++ b/crates/storage/src/oauth2/mod.rs @@ -23,8 +23,8 @@ use uuid::Uuid; use self::client::lookup_clients; use crate::{ pagination::{process_page, QueryBuilderExt}, - user::lookup_active_session, - Clock, DatabaseError, DatabaseInconsistencyError, + user::BrowserSessionRepository, + Clock, DatabaseError, DatabaseInconsistencyError, Repository, }; pub mod access_token; @@ -134,11 +134,10 @@ pub async fn get_paginated_user_oauth_sessions( // ideal let mut browser_sessions: HashMap = HashMap::new(); for id in browser_session_ids { - let v = lookup_active_session(&mut *conn, id) - .await? - .ok_or_else(|| { - DatabaseInconsistencyError::on("oauth2_sessions").column("user_session_id") - })?; + let v = conn.browser_session().lookup(id).await?.ok_or_else(|| { + DatabaseInconsistencyError::on("oauth2_sessions").column("user_session_id") + })?; + browser_sessions.insert(id, v); } diff --git a/crates/storage/src/oauth2/refresh_token.rs b/crates/storage/src/oauth2/refresh_token.rs index 79d90c01..5c2b6318 100644 --- a/crates/storage/src/oauth2/refresh_token.rs +++ b/crates/storage/src/oauth2/refresh_token.rs @@ -204,6 +204,7 @@ pub async fn lookup_active_refresh_token( let browser_session = BrowserSession { id: res.user_session_id.into(), created_at: res.user_session_created_at, + finished_at: None, user, last_authentication, }; diff --git a/crates/storage/src/repository.rs b/crates/storage/src/repository.rs index f321e941..ef1a567a 100644 --- a/crates/storage/src/repository.rs +++ b/crates/storage/src/repository.rs @@ -19,7 +19,10 @@ use crate::{ PgUpstreamOAuthLinkRepository, PgUpstreamOAuthProviderRepository, PgUpstreamOAuthSessionRepository, }, - user::{PgUserEmailRepository, PgUserRepository}, + user::{ + PgBrowserSessionRepository, PgUserEmailRepository, PgUserPasswordRepository, + PgUserRepository, + }, }; pub trait Repository { @@ -43,11 +46,21 @@ pub trait Repository { where Self: 'c; + type UserPasswordRepository<'c> + where + Self: 'c; + + type BrowserSessionRepository<'c> + where + Self: 'c; + fn upstream_oauth_link(&mut self) -> Self::UpstreamOAuthLinkRepository<'_>; fn upstream_oauth_provider(&mut self) -> Self::UpstreamOAuthProviderRepository<'_>; fn upstream_oauth_session(&mut self) -> Self::UpstreamOAuthSessionRepository<'_>; fn user(&mut self) -> Self::UserRepository<'_>; fn user_email(&mut self) -> Self::UserEmailRepository<'_>; + fn user_password(&mut self) -> Self::UserPasswordRepository<'_>; + fn browser_session(&mut self) -> Self::BrowserSessionRepository<'_>; } impl Repository for PgConnection { @@ -56,6 +69,8 @@ impl Repository for PgConnection { type UpstreamOAuthSessionRepository<'c> = PgUpstreamOAuthSessionRepository<'c> where Self: 'c; type UserRepository<'c> = PgUserRepository<'c> where Self: 'c; type UserEmailRepository<'c> = PgUserEmailRepository<'c> where Self: 'c; + type UserPasswordRepository<'c> = PgUserPasswordRepository<'c> where Self: 'c; + type BrowserSessionRepository<'c> = PgBrowserSessionRepository<'c> where Self: 'c; fn upstream_oauth_link(&mut self) -> Self::UpstreamOAuthLinkRepository<'_> { PgUpstreamOAuthLinkRepository::new(self) @@ -76,6 +91,14 @@ impl Repository for PgConnection { fn user_email(&mut self) -> Self::UserEmailRepository<'_> { PgUserEmailRepository::new(self) } + + fn user_password(&mut self) -> Self::UserPasswordRepository<'_> { + PgUserPasswordRepository::new(self) + } + + fn browser_session(&mut self) -> Self::BrowserSessionRepository<'_> { + PgBrowserSessionRepository::new(self) + } } impl<'t> Repository for Transaction<'t, Postgres> { @@ -84,6 +107,8 @@ impl<'t> Repository for Transaction<'t, Postgres> { type UpstreamOAuthSessionRepository<'c> = PgUpstreamOAuthSessionRepository<'c> where Self: 'c; type UserRepository<'c> = PgUserRepository<'c> where Self: 'c; type UserEmailRepository<'c> = PgUserEmailRepository<'c> where Self: 'c; + type UserPasswordRepository<'c> = PgUserPasswordRepository<'c> where Self: 'c; + type BrowserSessionRepository<'c> = PgBrowserSessionRepository<'c> where Self: 'c; fn upstream_oauth_link(&mut self) -> Self::UpstreamOAuthLinkRepository<'_> { PgUpstreamOAuthLinkRepository::new(self) @@ -104,4 +129,12 @@ impl<'t> Repository for Transaction<'t, Postgres> { fn user_email(&mut self) -> Self::UserEmailRepository<'_> { PgUserEmailRepository::new(self) } + + fn user_password(&mut self) -> Self::UserPasswordRepository<'_> { + PgUserPasswordRepository::new(self) + } + + fn browser_session(&mut self) -> Self::BrowserSessionRepository<'_> { + PgBrowserSessionRepository::new(self) + } } diff --git a/crates/storage/src/user/authentication.rs b/crates/storage/src/user/authentication.rs deleted file mode 100644 index 546b54a2..00000000 --- a/crates/storage/src/user/authentication.rs +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright 2022 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use mas_data_model::{Authentication, BrowserSession, Password, UpstreamOAuthLink}; -use rand::Rng; -use sqlx::PgExecutor; -use ulid::Ulid; -use uuid::Uuid; - -use crate::Clock; - -#[tracing::instrument( - skip_all, - fields( - user.id = %user_session.user.id, - %user_password.id, - %user_session.id, - user_session_authentication.id, - ), - err, -)] -pub async fn authenticate_session_with_password( - executor: impl PgExecutor<'_>, - mut rng: impl Rng + Send, - clock: &Clock, - user_session: &mut BrowserSession, - user_password: &Password, -) -> Result<(), sqlx::Error> { - let created_at = clock.now(); - let id = Ulid::from_datetime_with_source(created_at.into(), &mut rng); - tracing::Span::current().record( - "user_session_authentication.id", - tracing::field::display(id), - ); - - sqlx::query!( - r#" - INSERT INTO user_session_authentications - (user_session_authentication_id, user_session_id, created_at) - VALUES ($1, $2, $3) - "#, - Uuid::from(id), - Uuid::from(user_session.id), - created_at, - ) - .execute(executor) - .await?; - - user_session.last_authentication = Some(Authentication { id, created_at }); - - Ok(()) -} - -#[tracing::instrument( - skip_all, - fields( - user.id = %user_session.user.id, - %upstream_oauth_link.id, - %user_session.id, - user_session_authentication.id, - ), - err, -)] -pub async fn authenticate_session_with_upstream( - executor: impl PgExecutor<'_>, - mut rng: impl Rng + Send, - clock: &Clock, - user_session: &mut BrowserSession, - upstream_oauth_link: &UpstreamOAuthLink, -) -> Result<(), sqlx::Error> { - let created_at = clock.now(); - let id = Ulid::from_datetime_with_source(created_at.into(), &mut rng); - tracing::Span::current().record( - "user_session_authentication.id", - tracing::field::display(id), - ); - - sqlx::query!( - r#" - INSERT INTO user_session_authentications - (user_session_authentication_id, user_session_id, created_at) - VALUES ($1, $2, $3) - "#, - Uuid::from(id), - Uuid::from(user_session.id), - created_at, - ) - .execute(executor) - .await?; - - user_session.last_authentication = Some(Authentication { id, created_at }); - - Ok(()) -} diff --git a/crates/storage/src/user/mod.rs b/crates/storage/src/user/mod.rs index 50f71752..592cb59d 100644 --- a/crates/storage/src/user/mod.rs +++ b/crates/storage/src/user/mod.rs @@ -14,27 +14,22 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; -use mas_data_model::{Authentication, BrowserSession, User}; -use rand::{Rng, RngCore}; -use sqlx::{PgConnection, PgExecutor, QueryBuilder}; -use tracing::{info_span, Instrument}; +use mas_data_model::User; +use rand::RngCore; +use sqlx::PgConnection; use ulid::Ulid; use uuid::Uuid; -use crate::{ - pagination::{process_page, QueryBuilderExt}, - tracing::ExecuteExt, - Clock, DatabaseError, DatabaseInconsistencyError, LookupResultExt, -}; +use crate::{tracing::ExecuteExt, Clock, DatabaseError, LookupResultExt}; -mod authentication; mod email; mod password; +mod session; pub use self::{ - authentication::{authenticate_session_with_password, authenticate_session_with_upstream}, email::{PgUserEmailRepository, UserEmailRepository}, - password::{add_user_password, lookup_user_password}, + password::{PgUserPasswordRepository, UserPasswordRepository}, + session::{BrowserSessionRepository, PgBrowserSessionRepository}, }; #[async_trait] @@ -218,234 +213,3 @@ impl<'c> UserRepository for PgUserRepository<'c> { Ok(exists) } } - -#[derive(sqlx::FromRow)] -struct SessionLookup { - user_session_id: Uuid, - user_session_created_at: DateTime, - user_id: Uuid, - user_username: String, - user_primary_user_email_id: Option, - last_authentication_id: Option, - last_authd_at: Option>, -} - -impl TryInto for SessionLookup { - type Error = DatabaseInconsistencyError; - - fn try_into(self) -> Result { - let id = Ulid::from(self.user_id); - let user = User { - id, - username: self.user_username, - sub: id.to_string(), - primary_user_email_id: self.user_primary_user_email_id.map(Into::into), - }; - - let last_authentication = match (self.last_authentication_id, self.last_authd_at) { - (Some(id), Some(created_at)) => Some(Authentication { - id: id.into(), - created_at, - }), - (None, None) => None, - _ => { - return Err(DatabaseInconsistencyError::on( - "user_session_authentications", - )) - } - }; - - Ok(BrowserSession { - id: self.user_session_id.into(), - user, - created_at: self.user_session_created_at, - last_authentication, - }) - } -} - -#[tracing::instrument( - skip_all, - fields(user_session.id = %id), - err, -)] -pub async fn lookup_active_session( - executor: impl PgExecutor<'_>, - id: Ulid, -) -> Result, DatabaseError> { - let res = sqlx::query_as!( - SessionLookup, - r#" - SELECT s.user_session_id - , s.created_at AS "user_session_created_at" - , u.user_id - , u.username AS "user_username" - , u.primary_user_email_id AS "user_primary_user_email_id" - , a.user_session_authentication_id AS "last_authentication_id?" - , a.created_at AS "last_authd_at?" - FROM user_sessions s - INNER JOIN users u - USING (user_id) - LEFT JOIN user_session_authentications a - USING (user_session_id) - WHERE s.user_session_id = $1 AND s.finished_at IS NULL - ORDER BY a.created_at DESC - LIMIT 1 - "#, - Uuid::from(id), - ) - .fetch_one(executor) - .await - .to_option()?; - - let Some(res) = res else { return Ok(None) }; - - Ok(Some(res.try_into()?)) -} - -#[tracing::instrument( - skip_all, - fields( - %user.id, - %user.username, - ), - err, -)] -pub async fn get_paginated_user_sessions( - executor: impl PgExecutor<'_>, - user: &User, - before: Option, - after: Option, - first: Option, - last: Option, -) -> Result<(bool, bool, Vec), DatabaseError> { - let mut query = QueryBuilder::new( - r#" - SELECT - s.user_session_id, - u.user_id, - u.username, - s.created_at, - a.user_session_authentication_id AS "last_authentication_id", - a.created_at AS "last_authd_at", - ue.user_email_id AS "user_email_id", - ue.email AS "user_email", - ue.created_at AS "user_email_created_at", - ue.confirmed_at AS "user_email_confirmed_at" - FROM user_sessions s - INNER JOIN users u - USING (user_id) - LEFT JOIN user_session_authentications a - USING (user_session_id) - LEFT JOIN user_emails ue - ON ue.user_email_id = u.primary_user_email_id - "#, - ); - - query - .push(" WHERE s.finished_at IS NULL AND s.user_id = ") - .push_bind(Uuid::from(user.id)) - .generate_pagination("s.user_session_id", before, after, first, last)?; - - let span = info_span!("Fetch paginated user emails", db.statement = query.sql()); - let page: Vec = query - .build_query_as() - .fetch_all(executor) - .instrument(span) - .await?; - - let (has_previous_page, has_next_page, page) = process_page(page, first, last)?; - - let page: Result, _> = page.into_iter().map(TryInto::try_into).collect(); - Ok((has_previous_page, has_next_page, page?)) -} - -#[tracing::instrument( - skip_all, - fields( - %user.id, - user_session.id, - ), - err, -)] -pub async fn start_session( - executor: impl PgExecutor<'_>, - mut rng: impl Rng + Send, - clock: &Clock, - user: User, -) -> Result { - let created_at = clock.now(); - let id = Ulid::from_datetime_with_source(created_at.into(), &mut rng); - tracing::Span::current().record("user_session.id", tracing::field::display(id)); - - sqlx::query!( - r#" - INSERT INTO user_sessions (user_session_id, user_id, created_at) - VALUES ($1, $2, $3) - "#, - Uuid::from(id), - Uuid::from(user.id), - created_at, - ) - .execute(executor) - .await?; - - let session = BrowserSession { - id, - user, - created_at, - last_authentication: None, - }; - - Ok(session) -} - -#[tracing::instrument( - skip_all, - fields(%user.id), - err, -)] -pub async fn count_active_sessions( - executor: impl PgExecutor<'_>, - user: &User, -) -> Result { - let res = sqlx::query_scalar!( - r#" - SELECT COUNT(*) as "count!" - FROM user_sessions s - WHERE s.user_id = $1 AND s.finished_at IS NULL - "#, - Uuid::from(user.id), - ) - .fetch_one(executor) - .await?; - - Ok(res) -} - -#[tracing::instrument( - skip_all, - fields(%user_session.id), - err, -)] -pub async fn end_session( - executor: impl PgExecutor<'_>, - clock: &Clock, - user_session: &BrowserSession, -) -> Result<(), DatabaseError> { - let now = clock.now(); - let res = sqlx::query!( - r#" - UPDATE user_sessions - SET finished_at = $1 - WHERE user_session_id = $2 - "#, - now, - Uuid::from(user_session.id), - ) - .execute(executor) - .instrument(info_span!("End session")) - .await?; - - DatabaseError::ensure_affected_rows(&res, 1) -} diff --git a/crates/storage/src/user/password.rs b/crates/storage/src/user/password.rs index 14ac5222..56c8a439 100644 --- a/crates/storage/src/user/password.rs +++ b/crates/storage/src/user/password.rs @@ -12,63 +12,42 @@ // See the License for the specific language governing permissions and // limitations under the License. +use async_trait::async_trait; use chrono::{DateTime, Utc}; use mas_data_model::{Password, User}; -use rand::Rng; -use sqlx::PgExecutor; +use rand::RngCore; +use sqlx::PgConnection; use ulid::Ulid; use uuid::Uuid; -use crate::{Clock, DatabaseError, DatabaseInconsistencyError, LookupResultExt}; +use crate::{ + tracing::ExecuteExt, Clock, DatabaseError, DatabaseInconsistencyError, LookupResultExt, +}; -#[tracing::instrument( - skip_all, - fields( - %user.id, - %user.username, - user_password.id, - user_password.version = version, - ), - err, -)] -pub async fn add_user_password( - executor: impl PgExecutor<'_>, - mut rng: impl Rng + Send, - clock: &Clock, - user: &User, - version: u16, - hashed_password: String, - upgraded_from: Option, -) -> Result { - let created_at = clock.now(); - let id = Ulid::from_datetime_with_source(created_at.into(), &mut rng); - tracing::Span::current().record("user_password.id", tracing::field::display(id)); +#[async_trait] +pub trait UserPasswordRepository: Send + Sync { + type Error; - let upgraded_from_id = upgraded_from.map(|p| p.id); + async fn active(&mut self, user: &User) -> Result, Self::Error>; + async fn add( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &Clock, + user: &User, + version: u16, + hashed_password: String, + upgraded_from: Option<&Password>, + ) -> Result; +} - sqlx::query!( - r#" - INSERT INTO user_passwords - (user_password_id, user_id, hashed_password, version, upgraded_from_id, created_at) - VALUES ($1, $2, $3, $4, $5, $6) - "#, - Uuid::from(id), - Uuid::from(user.id), - hashed_password, - i32::from(version), - upgraded_from_id.map(Uuid::from), - created_at, - ) - .execute(executor) - .await?; +pub struct PgUserPasswordRepository<'c> { + conn: &'c mut PgConnection, +} - Ok(Password { - id, - hashed_password, - version, - upgraded_from_id, - created_at, - }) +impl<'c> PgUserPasswordRepository<'c> { + pub fn new(conn: &'c mut PgConnection) -> Self { + Self { conn } + } } struct UserPasswordLookup { @@ -79,57 +58,115 @@ struct UserPasswordLookup { created_at: DateTime, } -#[tracing::instrument( - skip_all, - fields( - %user.id, - %user.username, - ), - err, -)] -pub async fn lookup_user_password( - executor: impl PgExecutor<'_>, - user: &User, -) -> Result, DatabaseError> { - let res = sqlx::query_as!( - UserPasswordLookup, - r#" - SELECT up.user_password_id - , up.hashed_password - , up.version - , up.upgraded_from_id - , up.created_at - FROM user_passwords up - WHERE up.user_id = $1 - ORDER BY up.created_at DESC - LIMIT 1 - "#, - Uuid::from(user.id), - ) - .fetch_one(executor) - .await - .to_option()?; +#[async_trait] +impl<'c> UserPasswordRepository for PgUserPasswordRepository<'c> { + type Error = DatabaseError; - let Some(res) = res else { return Ok(None) }; + #[tracing::instrument( + name = "db.user_password.active", + skip_all, + fields( + db.statement, + %user.id, + %user.username, + ), + err, + )] + async fn active(&mut self, user: &User) -> Result, Self::Error> { + let res = sqlx::query_as!( + UserPasswordLookup, + r#" + SELECT up.user_password_id + , up.hashed_password + , up.version + , up.upgraded_from_id + , up.created_at + FROM user_passwords up + WHERE up.user_id = $1 + ORDER BY up.created_at DESC + LIMIT 1 + "#, + Uuid::from(user.id), + ) + .traced() + .fetch_one(&mut *self.conn) + .await + .to_option()?; - let id = Ulid::from(res.user_password_id); + let Some(res) = res else { return Ok(None) }; - let version = res.version.try_into().map_err(|e| { - DatabaseInconsistencyError::on("user_passwords") - .column("version") - .row(id) - .source(e) - })?; + let id = Ulid::from(res.user_password_id); - let upgraded_from_id = res.upgraded_from_id.map(Ulid::from); - let created_at = res.created_at; - let hashed_password = res.hashed_password; + let version = res.version.try_into().map_err(|e| { + DatabaseInconsistencyError::on("user_passwords") + .column("version") + .row(id) + .source(e) + })?; - Ok(Some(Password { - id, - hashed_password, - version, - upgraded_from_id, - created_at, - })) + let upgraded_from_id = res.upgraded_from_id.map(Ulid::from); + let created_at = res.created_at; + let hashed_password = res.hashed_password; + + Ok(Some(Password { + id, + hashed_password, + version, + upgraded_from_id, + created_at, + })) + } + + #[tracing::instrument( + name = "db.user_password.add", + skip_all, + fields( + db.statement, + %user.id, + %user.username, + user_password.id, + user_password.version = version, + ), + err, + )] + async fn add( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &Clock, + user: &User, + version: u16, + hashed_password: String, + upgraded_from: Option<&Password>, + ) -> Result { + let created_at = clock.now(); + let id = Ulid::from_datetime_with_source(created_at.into(), rng); + tracing::Span::current().record("user_password.id", tracing::field::display(id)); + + let upgraded_from_id = upgraded_from.map(|p| p.id); + + sqlx::query!( + r#" + INSERT INTO user_passwords + (user_password_id, user_id, hashed_password, version, upgraded_from_id, created_at) + VALUES ($1, $2, $3, $4, $5, $6) + "#, + Uuid::from(id), + Uuid::from(user.id), + hashed_password, + i32::from(version), + upgraded_from_id.map(Uuid::from), + created_at, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + Ok(Password { + id, + hashed_password, + version, + upgraded_from_id, + created_at, + }) + } } diff --git a/crates/storage/src/user/session.rs b/crates/storage/src/user/session.rs new file mode 100644 index 00000000..01102ca9 --- /dev/null +++ b/crates/storage/src/user/session.rs @@ -0,0 +1,425 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use mas_data_model::{Authentication, BrowserSession, Password, UpstreamOAuthLink, User}; +use rand::RngCore; +use sqlx::{PgConnection, QueryBuilder}; +use ulid::Ulid; +use uuid::Uuid; + +use crate::{ + pagination::{process_page, Page, QueryBuilderExt}, + tracing::ExecuteExt, + Clock, DatabaseError, DatabaseInconsistencyError, LookupResultExt, +}; + +#[async_trait] +pub trait BrowserSessionRepository: Send + Sync { + type Error; + + async fn lookup(&mut self, id: Ulid) -> Result, Self::Error>; + async fn add( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &Clock, + user: &User, + ) -> Result; + async fn finish( + &mut self, + clock: &Clock, + user_session: BrowserSession, + ) -> Result; + async fn list_active_paginated( + &mut self, + user: &User, + before: Option, + after: Option, + first: Option, + last: Option, + ) -> Result, Self::Error>; + async fn count_active(&mut self, user: &User) -> Result; + + async fn authenticate_with_password( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &Clock, + user_session: BrowserSession, + user_password: &Password, + ) -> Result; + + async fn authenticate_with_upstream( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &Clock, + user_session: BrowserSession, + upstream_oauth_link: &UpstreamOAuthLink, + ) -> Result; +} + +pub struct PgBrowserSessionRepository<'c> { + conn: &'c mut PgConnection, +} + +impl<'c> PgBrowserSessionRepository<'c> { + pub fn new(conn: &'c mut PgConnection) -> Self { + Self { conn } + } +} + +#[derive(sqlx::FromRow)] +struct SessionLookup { + user_session_id: Uuid, + user_session_created_at: DateTime, + user_session_finished_at: Option>, + user_id: Uuid, + user_username: String, + user_primary_user_email_id: Option, + last_authentication_id: Option, + last_authd_at: Option>, +} + +impl TryInto for SessionLookup { + type Error = DatabaseInconsistencyError; + + fn try_into(self) -> Result { + let id = Ulid::from(self.user_id); + let user = User { + id, + username: self.user_username, + sub: id.to_string(), + primary_user_email_id: self.user_primary_user_email_id.map(Into::into), + }; + + let last_authentication = match (self.last_authentication_id, self.last_authd_at) { + (Some(id), Some(created_at)) => Some(Authentication { + id: id.into(), + created_at, + }), + (None, None) => None, + _ => { + return Err(DatabaseInconsistencyError::on( + "user_session_authentications", + )) + } + }; + + Ok(BrowserSession { + id: self.user_session_id.into(), + user, + created_at: self.user_session_created_at, + finished_at: self.user_session_finished_at, + last_authentication, + }) + } +} + +#[async_trait] +impl<'c> BrowserSessionRepository for PgBrowserSessionRepository<'c> { + type Error = DatabaseError; + + #[tracing::instrument( + name = "db.browser_session.lookup", + skip_all, + fields( + db.statement, + user_session.id = %id, + ), + err, + )] + async fn lookup(&mut self, id: Ulid) -> Result, Self::Error> { + let res = sqlx::query_as!( + SessionLookup, + r#" + SELECT s.user_session_id + , s.created_at AS "user_session_created_at" + , s.finished_at AS "user_session_finished_at" + , u.user_id + , u.username AS "user_username" + , u.primary_user_email_id AS "user_primary_user_email_id" + , a.user_session_authentication_id AS "last_authentication_id?" + , a.created_at AS "last_authd_at?" + FROM user_sessions s + INNER JOIN users u + USING (user_id) + LEFT JOIN user_session_authentications a + USING (user_session_id) + WHERE s.user_session_id = $1 + ORDER BY a.created_at DESC + LIMIT 1 + "#, + Uuid::from(id), + ) + .traced() + .fetch_one(&mut *self.conn) + .await + .to_option()?; + + let Some(res) = res else { return Ok(None) }; + + Ok(Some(res.try_into()?)) + } + + #[tracing::instrument( + name = "db.browser_session.add", + skip_all, + fields( + db.statement, + %user.id, + user_session.id, + ), + err, + )] + async fn add( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &Clock, + user: &User, + ) -> Result { + let created_at = clock.now(); + let id = Ulid::from_datetime_with_source(created_at.into(), rng); + tracing::Span::current().record("user_session.id", tracing::field::display(id)); + + sqlx::query!( + r#" + INSERT INTO user_sessions (user_session_id, user_id, created_at) + VALUES ($1, $2, $3) + "#, + Uuid::from(id), + Uuid::from(user.id), + created_at, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + let session = BrowserSession { + id, + // XXX + user: user.clone(), + created_at, + finished_at: None, + last_authentication: None, + }; + + Ok(session) + } + + #[tracing::instrument( + name = "db.browser_session.finish", + skip_all, + fields( + db.statement, + %user_session.id, + ), + err, + )] + async fn finish( + &mut self, + clock: &Clock, + mut user_session: BrowserSession, + ) -> Result { + let finished_at = clock.now(); + let res = sqlx::query!( + r#" + UPDATE user_sessions + SET finished_at = $1 + WHERE user_session_id = $2 + "#, + finished_at, + Uuid::from(user_session.id), + ) + .traced() + .execute(&mut *self.conn) + .await?; + + user_session.finished_at = Some(finished_at); + + DatabaseError::ensure_affected_rows(&res, 1)?; + + Ok(user_session) + } + + #[tracing::instrument( + name = "db.browser_session.list_active_paginated", + skip_all, + fields( + db.statement, + %user.id, + ), + err, + )] + async fn list_active_paginated( + &mut self, + user: &User, + before: Option, + after: Option, + first: Option, + last: Option, + ) -> Result, Self::Error> { + // TODO: ordering of last authentication is wrong + let mut query = QueryBuilder::new( + r#" + SELECT DISTINCT ON (s.user_session_id) + s.user_session_id, + u.user_id, + u.username, + s.created_at, + a.user_session_authentication_id AS "last_authentication_id", + a.created_at AS "last_authd_at", + FROM user_sessions s + INNER JOIN users u + USING (user_id) + LEFT JOIN user_session_authentications a + USING (user_session_id) + "#, + ); + + query + .push(" WHERE s.finished_at IS NULL AND s.user_id = ") + .push_bind(Uuid::from(user.id)) + .generate_pagination("s.user_session_id", before, after, first, last)?; + + let page: Vec = query + .build_query_as() + .traced() + .fetch_all(&mut *self.conn) + .await?; + + let (has_previous_page, has_next_page, edges) = process_page(page, first, last)?; + + let edges: Result, _> = edges.into_iter().map(TryInto::try_into).collect(); + Ok(Page { + has_previous_page, + has_next_page, + edges: edges?, + }) + } + + #[tracing::instrument( + name = "db.browser_session.count_active", + skip_all, + fields( + db.statement, + %user.id, + ), + err, + )] + async fn count_active(&mut self, user: &User) -> Result { + let res = sqlx::query_scalar!( + r#" + SELECT COUNT(*) as "count!" + FROM user_sessions s + WHERE s.user_id = $1 AND s.finished_at IS NULL + "#, + Uuid::from(user.id), + ) + .traced() + .fetch_one(&mut *self.conn) + .await?; + + res.try_into().map_err(DatabaseError::to_invalid_operation) + } + + #[tracing::instrument( + name = "db.browser_session.authenticate_with_password", + skip_all, + fields( + db.statement, + %user_session.id, + %user_password.id, + user_session_authentication.id, + ), + err, + )] + async fn authenticate_with_password( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &Clock, + mut user_session: BrowserSession, + user_password: &Password, + ) -> Result { + let _user_password = user_password; + let created_at = clock.now(); + let id = Ulid::from_datetime_with_source(created_at.into(), rng); + tracing::Span::current().record( + "user_session_authentication.id", + tracing::field::display(id), + ); + + sqlx::query!( + r#" + INSERT INTO user_session_authentications + (user_session_authentication_id, user_session_id, created_at) + VALUES ($1, $2, $3) + "#, + Uuid::from(id), + Uuid::from(user_session.id), + created_at, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + user_session.last_authentication = Some(Authentication { id, created_at }); + + Ok(user_session) + } + + #[tracing::instrument( + name = "db.browser_session.authenticate_with_upstream", + skip_all, + fields( + db.statement, + %user_session.id, + %upstream_oauth_link.id, + user_session_authentication.id, + ), + err, + )] + async fn authenticate_with_upstream( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &Clock, + mut user_session: BrowserSession, + upstream_oauth_link: &UpstreamOAuthLink, + ) -> Result { + let _upstream_oauth_link = upstream_oauth_link; + let created_at = clock.now(); + let id = Ulid::from_datetime_with_source(created_at.into(), rng); + tracing::Span::current().record( + "user_session_authentication.id", + tracing::field::display(id), + ); + + sqlx::query!( + r#" + INSERT INTO user_session_authentications + (user_session_authentication_id, user_session_id, created_at) + VALUES ($1, $2, $3) + "#, + Uuid::from(id), + Uuid::from(user_session.id), + created_at, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + user_session.last_authentication = Some(Authentication { id, created_at }); + + Ok(user_session) + } +} diff --git a/crates/templates/src/context.rs b/crates/templates/src/context.rs index 8e8e0c8a..cd70289b 100644 --- a/crates/templates/src/context.rs +++ b/crates/templates/src/context.rs @@ -532,14 +532,14 @@ where { /// Context used by the `account/index.html` template #[derive(Serialize)] pub struct AccountContext { - active_sessions: i64, + active_sessions: usize, emails: Vec, } impl AccountContext { /// Constructs a context for the "my account" page #[must_use] - pub fn new(active_sessions: i64, emails: Vec) -> Self { + pub fn new(active_sessions: usize, emails: Vec) -> Self { Self { active_sessions, emails,