1
0
mirror of https://github.com/matrix-org/matrix-authentication-service.git synced 2025-07-29 22:01:14 +03:00

strorage: browser session and user password repositories

This commit is contained in:
Quentin Gliech
2023-01-03 15:21:47 +01:00
parent 4790897892
commit 53172d6a3f
25 changed files with 914 additions and 726 deletions

View File

@ -14,9 +14,9 @@
use axum_extra::extract::cookie::{Cookie, PrivateCookieJar};
use mas_data_model::BrowserSession;
use mas_storage::{user::lookup_active_session, DatabaseError};
use mas_storage::{user::BrowserSessionRepository, DatabaseError, Repository};
use serde::{Deserialize, Serialize};
use sqlx::{Executor, Postgres};
use sqlx::PgConnection;
use ulid::Ulid;
use crate::CookieExt;
@ -46,7 +46,7 @@ impl SessionInfo {
/// Load the [`BrowserSession`] from database
pub async fn load_session(
&self,
executor: impl Executor<'_, Database = Postgres>,
conn: &mut PgConnection,
) -> Result<Option<BrowserSession>, DatabaseError> {
let session_id = if let Some(id) = self.current {
id
@ -54,8 +54,14 @@ impl SessionInfo {
return Ok(None);
};
let res = lookup_active_session(executor, session_id).await?;
Ok(res)
let maybe_session = conn
.browser_session()
.lookup(session_id)
.await?
// Ensure that the session is still active
.filter(BrowserSession::active);
Ok(maybe_session)
}
}

View File

@ -20,7 +20,7 @@ use mas_router::UrlBuilder;
use mas_storage::{
oauth2::client::{insert_client_from_config, lookup_client, truncate_clients},
upstream_oauth2::UpstreamOAuthProviderRepository,
user::{add_user_password, UserEmailRepository, UserRepository},
user::{UserEmailRepository, UserPasswordRepository, UserRepository},
Clock, Repository,
};
use oauth2_types::scope::Scope;
@ -210,15 +210,8 @@ impl Options {
let (version, hashed_password) = password_manager.hash(&mut rng, password).await?;
add_user_password(
&mut txn,
&mut rng,
&clock,
&user,
version,
hashed_password,
None,
)
txn.user_password()
.add(&mut rng, &clock, &user, version, hashed_password, None)
.await?;
info!(%user.id, %user.username, "Password changed");

View File

@ -57,10 +57,16 @@ pub struct BrowserSession {
pub id: Ulid,
pub user: User,
pub created_at: DateTime<Utc>,
pub finished_at: Option<DateTime<Utc>>,
pub last_authentication: Option<Authentication>,
}
impl BrowserSession {
#[must_use]
pub fn active(&self) -> bool {
self.finished_at.is_none()
}
#[must_use]
pub fn was_authenticated_after(&self, after: DateTime<Utc>) -> bool {
if let Some(auth) = &self.last_authentication {
@ -80,6 +86,7 @@ impl BrowserSession {
id: Ulid::from_datetime_with_source(now.into(), rng),
user,
created_at: now,
finished_at: None,
last_authentication: None,
})
.collect()

View File

@ -31,8 +31,9 @@ use async_graphql::{
Context, Description, EmptyMutation, EmptySubscription, ID,
};
use mas_storage::{
upstream_oauth2::UpstreamOAuthProviderRepository, user::UserEmailRepository, Repository,
UpstreamOAuthLinkRepository,
upstream_oauth2::UpstreamOAuthProviderRepository,
user::{BrowserSessionRepository, UserEmailRepository},
Repository, UpstreamOAuthLinkRepository,
};
use model::CreationEvent;
use sqlx::PgPool;
@ -128,7 +129,7 @@ impl RootQuery {
let Some(session) = session else { return Ok(None) };
let current_user = session.user;
let browser_session = mas_storage::user::lookup_active_session(&mut conn, id).await?;
let browser_session = conn.browser_session().lookup(id).await?;
let ret = browser_session.and_then(|browser_session| {
if browser_session.user.id == current_user.id {

View File

@ -17,7 +17,10 @@ use async_graphql::{
Context, Description, Object, ID,
};
use chrono::{DateTime, Utc};
use mas_storage::{user::UserEmailRepository, Repository, UpstreamOAuthLinkRepository};
use mas_storage::{
user::{BrowserSessionRepository, UserEmailRepository},
Repository, UpstreamOAuthLinkRepository,
};
use sqlx::PgPool;
use super::{
@ -140,14 +143,13 @@ impl User {
.map(|x: OpaqueCursor<NodeCursor>| x.extract_for_type(NodeType::BrowserSession))
.transpose()?;
let (has_previous_page, has_next_page, edges) =
mas_storage::user::get_paginated_user_sessions(
&mut conn, &self.0, before_id, after_id, first, last,
)
let page = conn
.browser_session()
.list_active_paginated(&self.0, before_id, after_id, first, last)
.await?;
let mut connection = Connection::new(has_previous_page, has_next_page);
connection.edges.extend(edges.into_iter().map(|u| {
let mut connection = Connection::new(page.has_previous_page, page.has_next_page);
connection.edges.extend(page.edges.into_iter().map(|u| {
Edge::new(
OpaqueCursor(NodeCursor(NodeType::BrowserSession, u.id)),
BrowserSession(u),

View File

@ -21,7 +21,7 @@ use mas_storage::{
add_compat_access_token, add_compat_refresh_token, get_compat_sso_login_by_token,
mark_compat_sso_login_as_exchanged, start_compat_session,
},
user::{add_user_password, lookup_user_password, UserRepository},
user::{UserPasswordRepository, UserRepository},
Clock, Repository,
};
use serde::{Deserialize, Serialize};
@ -321,7 +321,9 @@ async fn user_password_login(
.ok_or(RouteError::UserNotFound)?;
// Lookup its password
let user_password = lookup_user_password(&mut *txn, &user)
let user_password = txn
.user_password()
.active(&user)
.await?
.ok_or(RouteError::NoPassword)?;
@ -340,14 +342,14 @@ async fn user_password_login(
if let Some((version, hashed_password)) = new_password_hash {
// Save the upgraded password if needed
add_user_password(
&mut *txn,
txn.user_password()
.add(
&mut rng,
&clock,
&user,
version,
hashed_password,
Some(user_password),
Some(&user_password),
)
.await?;
}

View File

@ -67,7 +67,8 @@ pub async fn post(
let content_type = content_type.map(|TypedHeader(h)| h.to_string());
let (session_info, _cookie_jar) = cookie_jar.session_info();
let maybe_session = session_info.load_session(&pool).await?;
let mut conn = pool.acquire().await?;
let maybe_session = session_info.load_session(&mut conn).await?;
let mut request = async_graphql::http::receive_batch_body(
content_type,
@ -116,7 +117,8 @@ pub async fn get(
RawQuery(query): RawQuery,
) -> Result<impl IntoResponse, FancyError> {
let (session_info, _cookie_jar) = cookie_jar.session_info();
let maybe_session = session_info.load_session(&pool).await?;
let mut conn = pool.acquire().await?;
let maybe_session = session_info.load_session(&mut conn).await?;
let mut request = async_graphql::http::parse_query_string(&query.unwrap_or_default())?;

View File

@ -26,7 +26,7 @@ use mas_axum_utils::{
use mas_keystore::Encrypter;
use mas_storage::{
upstream_oauth2::UpstreamOAuthSessionRepository,
user::{authenticate_session_with_upstream, start_session, UserRepository},
user::{BrowserSessionRepository, UserRepository},
Repository, UpstreamOAuthLinkRepository,
};
use mas_templates::{
@ -134,14 +134,16 @@ pub(crate) async fn get(
let maybe_user_session = user_session_info.load_session(&mut txn).await?;
let render = match (maybe_user_session, link.user_id) {
(Some(mut session), Some(user_id)) if session.user.id == user_id => {
(Some(session), Some(user_id)) if session.user.id == user_id => {
// Session already linked, and link matches the currently logged
// user. Mark the session as consumed and renew the authentication.
txn.upstream_oauth_session()
.consume(&clock, upstream_session)
.await?;
authenticate_session_with_upstream(&mut txn, &mut rng, &clock, &mut session, &link)
let session = txn
.browser_session()
.authenticate_with_upstream(&mut rng, &clock, session, &link)
.await?;
cookie_jar = cookie_jar.set_session(&session);
@ -252,7 +254,7 @@ pub(crate) async fn post(
let (user_session_info, cookie_jar) = cookie_jar.session_info();
let maybe_user_session = user_session_info.load_session(&mut txn).await?;
let mut session = match (maybe_user_session, link.user_id, form) {
let session = match (maybe_user_session, link.user_id, form) {
(Some(session), None, FormData::Link) => {
txn.upstream_oauth_link()
.associate_to_user(&link, &session.user)
@ -268,7 +270,7 @@ pub(crate) async fn post(
.await?
.ok_or(RouteError::UserNotFound)?;
start_session(&mut txn, &mut rng, &clock, user).await?
txn.browser_session().add(&mut rng, &clock, &user).await?
}
(None, None, FormData::Register { username }) => {
@ -277,7 +279,7 @@ pub(crate) async fn post(
.associate_to_user(&link, &user)
.await?;
start_session(&mut txn, &mut rng, &clock, user).await?
txn.browser_session().add(&mut rng, &clock, &user).await?
}
_ => return Err(RouteError::InvalidFormAction),
@ -287,7 +289,10 @@ pub(crate) async fn post(
.consume(&clock, upstream_session)
.await?;
authenticate_session_with_upstream(&mut txn, &mut rng, &clock, &mut session, &link).await?;
let session = txn
.browser_session()
.authenticate_with_upstream(&mut rng, &clock, session, &link)
.await?;
let cookie_jar = sessions_cookie
.consume_link(link_id)?

View File

@ -24,7 +24,7 @@ use mas_axum_utils::{csrf::CsrfExt, FancyError, SessionInfoExt};
use mas_keystore::Encrypter;
use mas_router::Route;
use mas_storage::{
user::{count_active_sessions, UserEmailRepository},
user::{BrowserSessionRepository, UserEmailRepository},
Repository,
};
use mas_templates::{AccountContext, TemplateContext, Templates};
@ -50,7 +50,7 @@ pub(crate) async fn get(
return Ok((cookie_jar, login.go()).into_response());
};
let active_sessions = count_active_sessions(&mut conn, &session.user).await?;
let active_sessions = conn.browser_session().count_active(&session.user).await?;
let emails = conn.user_email().all(&session.user).await?;

View File

@ -26,8 +26,8 @@ use mas_data_model::BrowserSession;
use mas_keystore::Encrypter;
use mas_router::Route;
use mas_storage::{
user::{add_user_password, authenticate_session_with_password, lookup_user_password},
Clock,
user::{BrowserSessionRepository, UserPasswordRepository},
Clock, Repository,
};
use mas_templates::{EmptyContext, TemplateContext, Templates};
use rand::Rng;
@ -98,14 +98,16 @@ pub(crate) async fn post(
let maybe_session = session_info.load_session(&mut txn).await?;
let mut session = if let Some(session) = maybe_session {
let session = if let Some(session) = maybe_session {
session
} else {
let login = mas_router::Login::and_then(mas_router::PostAuthAction::ChangePassword);
return Ok((cookie_jar, login.go()).into_response());
};
let user_password = lookup_user_password(&mut txn, &session.user)
let user_password = txn
.user_password()
.active(&session.user)
.await?
.context("user has no password")?;
@ -127,8 +129,9 @@ pub(crate) async fn post(
}
let (version, hashed_password) = password_manager.hash(&mut rng, new_password).await?;
let user_password = add_user_password(
&mut txn,
let user_password = txn
.user_password()
.add(
&mut rng,
&clock,
&session.user,
@ -138,7 +141,9 @@ pub(crate) async fn post(
)
.await?;
authenticate_session_with_password(&mut txn, &mut rng, &clock, &mut session, &user_password)
let session = txn
.browser_session()
.authenticate_with_password(&mut rng, &clock, session, &user_password)
.await?;
let reply = render(&mut rng, &clock, templates.clone(), session, cookie_jar).await?;

View File

@ -25,10 +25,7 @@ use mas_data_model::BrowserSession;
use mas_keystore::Encrypter;
use mas_storage::{
upstream_oauth2::UpstreamOAuthProviderRepository,
user::{
add_user_password, authenticate_session_with_password, lookup_user_password, start_session,
UserRepository,
},
user::{BrowserSessionRepository, UserPasswordRepository, UserRepository},
Clock, Repository,
};
use mas_templates::{
@ -181,7 +178,9 @@ async fn login(
.ok_or(FormError::InvalidCredentials)?;
// And its password
let user_password = lookup_user_password(&mut *conn, &user)
let user_password = conn
.user_password()
.active(&user)
.await
.map_err(|_e| FormError::Internal)?
.ok_or(FormError::InvalidCredentials)?;
@ -201,14 +200,14 @@ async fn login(
let user_password = if let Some((version, new_password_hash)) = new_password_hash {
// Save the upgraded password
add_user_password(
&mut *conn,
conn.user_password()
.add(
&mut rng,
clock,
&user,
version,
new_password_hash,
Some(user_password),
Some(&user_password),
)
.await
.map_err(|_| FormError::Internal)?
@ -217,12 +216,16 @@ async fn login(
};
// Start a new session
let mut user_session = start_session(&mut *conn, &mut rng, clock, user)
let user_session = conn
.browser_session()
.add(&mut rng, clock, &user)
.await
.map_err(|_| FormError::Internal)?;
// And mark it as authenticated by the password
authenticate_session_with_password(&mut *conn, rng, clock, &mut user_session, &user_password)
let user_session = conn
.browser_session()
.authenticate_with_password(&mut rng, clock, user_session, &user_password)
.await
.map_err(|_| FormError::Internal)?;

View File

@ -23,7 +23,7 @@ use mas_axum_utils::{
};
use mas_keystore::Encrypter;
use mas_router::{PostAuthAction, Route};
use mas_storage::{user::end_session, Clock};
use mas_storage::{user::BrowserSessionRepository, Clock, Repository};
use sqlx::PgPool;
pub(crate) async fn post(
@ -41,7 +41,7 @@ pub(crate) async fn post(
let maybe_session = session_info.load_session(&mut txn).await?;
if let Some(session) = maybe_session {
end_session(&mut txn, &clock, &session).await?;
txn.browser_session().finish(&clock, session).await?;
cookie_jar = cookie_jar.update_session_info(&session_info.mark_session_ended());
}

View File

@ -24,8 +24,9 @@ use mas_axum_utils::{
};
use mas_keystore::Encrypter;
use mas_router::Route;
use mas_storage::user::{
add_user_password, authenticate_session_with_password, lookup_user_password,
use mas_storage::{
user::{BrowserSessionRepository, UserPasswordRepository},
Repository,
};
use mas_templates::{ReauthContext, TemplateContext, Templates};
use serde::Deserialize;
@ -93,7 +94,7 @@ pub(crate) async fn post(
let maybe_session = session_info.load_session(&mut txn).await?;
let mut session = if let Some(session) = maybe_session {
let session = if let Some(session) = maybe_session {
session
} else {
// If there is no session, redirect to the login screen, keeping the
@ -103,7 +104,9 @@ pub(crate) async fn post(
};
// Load the user password
let user_password = lookup_user_password(&mut txn, &session.user)
let user_password = txn
.user_password()
.active(&session.user)
.await?
.context("User has no password")?;
@ -122,14 +125,14 @@ pub(crate) async fn post(
let user_password = if let Some((version, new_password_hash)) = new_password_hash {
// Save the upgraded password
add_user_password(
&mut *txn,
txn.user_password()
.add(
&mut rng,
&clock,
&session.user,
version,
new_password_hash,
Some(user_password),
Some(&user_password),
)
.await?
} else {
@ -137,7 +140,10 @@ pub(crate) async fn post(
};
// Mark the session as authenticated by the password
authenticate_session_with_password(&mut txn, rng, &clock, &mut session, &user_password).await?;
let session = txn
.browser_session()
.authenticate_with_password(&mut rng, &clock, session, &user_password)
.await?;
let cookie_jar = cookie_jar.set_session(&session);
txn.commit().await?;

View File

@ -32,10 +32,7 @@ use mas_keystore::Encrypter;
use mas_policy::PolicyFactory;
use mas_router::Route;
use mas_storage::{
user::{
add_user_password, authenticate_session_with_password, start_session, UserEmailRepository,
UserRepository,
},
user::{BrowserSessionRepository, UserEmailRepository, UserPasswordRepository, UserRepository},
Repository,
};
use mas_templates::{
@ -191,15 +188,9 @@ pub(crate) async fn post(
let user = txn.user().add(&mut rng, &clock, form.username).await?;
let password = Zeroizing::new(form.password.into_bytes());
let (version, hashed_password) = password_manager.hash(&mut rng, password).await?;
let user_password = add_user_password(
&mut txn,
&mut rng,
&clock,
&user,
version,
hashed_password,
None,
)
let user_password = txn
.user_password()
.add(&mut rng, &clock, &user, version, hashed_password, None)
.await?;
let user_email = txn
@ -228,8 +219,11 @@ pub(crate) async fn post(
let next = mas_router::AccountVerifyEmail::new(user_email.id).and_maybe(query.post_auth_action);
let mut session = start_session(&mut txn, &mut rng, &clock, user).await?;
authenticate_session_with_password(&mut txn, &mut rng, &clock, &mut session, &user_password)
let session = txn.browser_session().add(&mut rng, &clock, &user).await?;
let session = txn
.browser_session()
.authenticate_with_password(&mut rng, &clock, session, &user_password)
.await?;
txn.commit().await?;

View File

@ -252,62 +252,6 @@
},
"query": "\n SELECT user_id\n , username\n , primary_user_email_id\n , created_at\n FROM users\n WHERE user_id = $1\n "
},
"09d995295b2e4f180181ec96023b1e524ddae9098694eedc4dcce857e3095c0e": {
"describe": {
"columns": [
{
"name": "user_session_id",
"ordinal": 0,
"type_info": "Uuid"
},
{
"name": "user_session_created_at",
"ordinal": 1,
"type_info": "Timestamptz"
},
{
"name": "user_id",
"ordinal": 2,
"type_info": "Uuid"
},
{
"name": "user_username",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "user_primary_user_email_id",
"ordinal": 4,
"type_info": "Uuid"
},
{
"name": "last_authentication_id?",
"ordinal": 5,
"type_info": "Uuid"
},
{
"name": "last_authd_at?",
"ordinal": 6,
"type_info": "Timestamptz"
}
],
"nullable": [
false,
false,
false,
false,
true,
false,
false
],
"parameters": {
"Left": [
"Uuid"
]
}
},
"query": "\n SELECT s.user_session_id\n , s.created_at AS \"user_session_created_at\"\n , u.user_id\n , u.username AS \"user_username\"\n , u.primary_user_email_id AS \"user_primary_user_email_id\"\n , a.user_session_authentication_id AS \"last_authentication_id?\"\n , a.created_at AS \"last_authd_at?\"\n FROM user_sessions s\n INNER JOIN users u\n USING (user_id)\n LEFT JOIN user_session_authentications a\n USING (user_session_id)\n WHERE s.user_session_id = $1 AND s.finished_at IS NULL\n ORDER BY a.created_at DESC\n LIMIT 1\n "
},
"154e2e4488ff87e09163698750b56a43127cee4e1392785416a586d40a4d9b21": {
"describe": {
"columns": [
@ -851,20 +795,6 @@
},
"query": "\n SELECT user_email_confirmation_code_id\n , user_email_id\n , code\n , created_at\n , expires_at\n , consumed_at\n FROM user_email_confirmation_codes\n WHERE code = $1\n AND user_email_id = $2\n "
},
"1eb6d13e75d8f526c2785749a020731c18012f03e07995213acd38ab560ce497": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": [
"Uuid",
"Uuid",
"Timestamptz"
]
}
},
"query": "\n INSERT INTO user_session_authentications\n (user_session_authentication_id, user_session_id, created_at)\n VALUES ($1, $2, $3)\n "
},
"1ee5cecfafd4726a4ebc08da8a34c09178e6e1e072581c8fca9d3d76967792cb": {
"describe": {
"columns": [],
@ -1060,6 +990,20 @@
},
"query": "\n SELECT user_email_id\n , user_id\n , email\n , created_at\n , confirmed_at\n FROM user_emails\n\n WHERE user_email_id = $1\n "
},
"41c1aafbd338c24476f27d342cf80eef7de2836e85b078232d143d6712fc2be4": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": [
"Uuid",
"Uuid",
"Timestamptz"
]
}
},
"query": "\n INSERT INTO user_session_authentications\n (user_session_authentication_id, user_session_id, created_at)\n VALUES ($1, $2, $3)\n "
},
"43a5cafbdc8037e9fb779812a0793cf0859902aa0dc8d25d4c33d231d3d1118b": {
"describe": {
"columns": [],
@ -1076,6 +1020,50 @@
},
"query": "\n INSERT INTO oauth2_access_tokens\n (oauth2_access_token_id, oauth2_session_id, access_token, created_at, expires_at)\n VALUES\n ($1, $2, $3, $4, $5)\n "
},
"446a8d7bd8532a751810401adfab924dc20785c91770ed43d62df2e590e8da71": {
"describe": {
"columns": [
{
"name": "user_password_id",
"ordinal": 0,
"type_info": "Uuid"
},
{
"name": "hashed_password",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "version",
"ordinal": 2,
"type_info": "Int4"
},
{
"name": "upgraded_from_id",
"ordinal": 3,
"type_info": "Uuid"
},
{
"name": "created_at",
"ordinal": 4,
"type_info": "Timestamptz"
}
],
"nullable": [
false,
false,
false,
true,
false
],
"parameters": {
"Left": [
"Uuid"
]
}
},
"query": "\n SELECT up.user_password_id\n , up.hashed_password\n , up.version\n , up.upgraded_from_id\n , up.created_at\n FROM user_passwords up\n WHERE up.user_id = $1\n ORDER BY up.created_at DESC\n LIMIT 1\n "
},
"4693f2b9b3d51ff4a05e233b6667161ebc97f331d96bf5f1c61069e1c8492105": {
"describe": {
"columns": [],
@ -1308,19 +1296,6 @@
},
"query": "\n INSERT INTO oauth2_consents\n (oauth2_consent_id, user_id, oauth2_client_id, scope_token, created_at)\n SELECT id, $2, $3, scope_token, $5 FROM UNNEST($1::uuid[], $4::text[]) u(id, scope_token)\n ON CONFLICT (user_id, oauth2_client_id, scope_token) DO UPDATE SET refreshed_at = $5\n "
},
"64a56818dd16ac6368efe3e34196a77b7feda1eb87b696e0063a51bf50e499e5": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": [
"Timestamptz",
"Uuid"
]
}
},
"query": "\n UPDATE user_sessions\n SET finished_at = $1\n WHERE user_session_id = $2\n "
},
"64e6ea47c2e877c1ebe4338d64d9ad8a6c1c777d1daea024b8ca2e7f0dd75b0f": {
"describe": {
"columns": [],
@ -1438,6 +1413,43 @@
},
"query": "\n UPDATE oauth2_access_tokens\n SET revoked_at = $2\n WHERE oauth2_access_token_id = $1\n "
},
"6f97b5f9ad0d4d15387150bea3839fb7f81015f7ceef61ecaadba64521895cff": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": [
"Uuid",
"Uuid",
"Text",
"Int4",
"Uuid",
"Timestamptz"
]
}
},
"query": "\n INSERT INTO user_passwords\n (user_password_id, user_id, hashed_password, version, upgraded_from_id, created_at)\n VALUES ($1, $2, $3, $4, $5, $6)\n "
},
"751d549073d77ded84aea1aaba36d3b130ec71bc592d722eb75b959b80f0b4ff": {
"describe": {
"columns": [
{
"name": "count!",
"ordinal": 0,
"type_info": "Int8"
}
],
"nullable": [
null
],
"parameters": {
"Left": [
"Uuid"
]
}
},
"query": "\n SELECT COUNT(*) as \"count!\"\n FROM user_sessions s\n WHERE s.user_id = $1 AND s.finished_at IS NULL\n "
},
"7756a60c36a64a259f7450d6eb77ee92303638ca374a63f23ac4944ccf9f4436": {
"describe": {
"columns": [
@ -1554,6 +1566,68 @@
},
"query": "\n SELECT\n c.oauth2_client_id,\n c.encrypted_client_secret,\n ARRAY(\n SELECT redirect_uri\n FROM oauth2_client_redirect_uris r\n WHERE r.oauth2_client_id = c.oauth2_client_id\n ) AS \"redirect_uris!\",\n c.grant_type_authorization_code,\n c.grant_type_refresh_token,\n c.client_name,\n c.logo_uri,\n c.client_uri,\n c.policy_uri,\n c.tos_uri,\n c.jwks_uri,\n c.jwks,\n c.id_token_signed_response_alg,\n c.userinfo_signed_response_alg,\n c.token_endpoint_auth_method,\n c.token_endpoint_auth_signing_alg,\n c.initiate_login_uri\n FROM oauth2_clients c\n\n WHERE c.oauth2_client_id = ANY($1::uuid[])\n "
},
"79295f3d3a75f831e9469aabfa720d381a254d00dbe39fef1e9652029d51b89b": {
"describe": {
"columns": [
{
"name": "user_session_id",
"ordinal": 0,
"type_info": "Uuid"
},
{
"name": "user_session_created_at",
"ordinal": 1,
"type_info": "Timestamptz"
},
{
"name": "user_session_finished_at",
"ordinal": 2,
"type_info": "Timestamptz"
},
{
"name": "user_id",
"ordinal": 3,
"type_info": "Uuid"
},
{
"name": "user_username",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "user_primary_user_email_id",
"ordinal": 5,
"type_info": "Uuid"
},
{
"name": "last_authentication_id?",
"ordinal": 6,
"type_info": "Uuid"
},
{
"name": "last_authd_at?",
"ordinal": 7,
"type_info": "Timestamptz"
}
],
"nullable": [
false,
false,
true,
false,
false,
true,
false,
false
],
"parameters": {
"Left": [
"Uuid"
]
}
},
"query": "\n SELECT s.user_session_id\n , s.created_at AS \"user_session_created_at\"\n , s.finished_at AS \"user_session_finished_at\"\n , u.user_id\n , u.username AS \"user_username\"\n , u.primary_user_email_id AS \"user_primary_user_email_id\"\n , a.user_session_authentication_id AS \"last_authentication_id?\"\n , a.created_at AS \"last_authd_at?\"\n FROM user_sessions s\n INNER JOIN users u\n USING (user_id)\n LEFT JOIN user_session_authentications a\n USING (user_session_id)\n WHERE s.user_session_id = $1\n ORDER BY a.created_at DESC\n LIMIT 1\n "
},
"7ce387b1b0aaf10e72adde667b19521b66eaafa51f73bf2f95e38b8f3b64a229": {
"describe": {
"columns": [],
@ -1872,70 +1946,6 @@
},
"query": "\n UPDATE oauth2_sessions\n SET finished_at = $2\n WHERE oauth2_session_id = $1\n "
},
"9edf5e8a3e00a7cdd8e55b97105df7831ee580096299df4bd6c1ed7c96b95e83": {
"describe": {
"columns": [
{
"name": "count!",
"ordinal": 0,
"type_info": "Int8"
}
],
"nullable": [
null
],
"parameters": {
"Left": [
"Uuid"
]
}
},
"query": "\n SELECT COUNT(*) as \"count!\"\n FROM user_sessions s\n WHERE s.user_id = $1 AND s.finished_at IS NULL\n "
},
"a1c19d9d7f1522d126787c7f9946ed51cbbd8f27a4947bc371acab3e7bf23267": {
"describe": {
"columns": [
{
"name": "user_password_id",
"ordinal": 0,
"type_info": "Uuid"
},
{
"name": "hashed_password",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "version",
"ordinal": 2,
"type_info": "Int4"
},
{
"name": "upgraded_from_id",
"ordinal": 3,
"type_info": "Uuid"
},
{
"name": "created_at",
"ordinal": 4,
"type_info": "Timestamptz"
}
],
"nullable": [
false,
false,
false,
true,
false
],
"parameters": {
"Left": [
"Uuid"
]
}
},
"query": "\n SELECT up.user_password_id\n , up.hashed_password\n , up.version\n , up.upgraded_from_id\n , up.created_at\n FROM user_passwords up\n WHERE up.user_id = $1\n ORDER BY up.created_at DESC\n LIMIT 1\n "
},
"a300fe99c95679c5664646a6a525c0491829e97db45f3234483872ed38436322": {
"describe": {
"columns": [
@ -2109,7 +2119,7 @@
},
"query": "\n UPDATE users\n SET primary_user_email_id = user_emails.user_email_id\n FROM user_emails\n WHERE user_emails.user_email_id = $1\n AND users.user_id = user_emails.user_id\n "
},
"bd7a4a008851f3f6d7591e3463e4369cee08820af57dcd3faf95f8e9be82857d": {
"c1d90a7f2287ec779c81a521fab19e5ede3fa95484033e0312c30d9b6ecc03f0": {
"describe": {
"columns": [],
"nullable": [],
@ -2117,14 +2127,11 @@
"Left": [
"Uuid",
"Uuid",
"Text",
"Int4",
"Uuid",
"Timestamptz"
]
}
},
"query": "\n INSERT INTO user_passwords\n (user_password_id, user_id, hashed_password, version, upgraded_from_id, created_at)\n VALUES ($1, $2, $3, $4, $5, $6)\n "
"query": "\n INSERT INTO user_sessions (user_session_id, user_id, created_at)\n VALUES ($1, $2, $3)\n "
},
"c88376abdba124ff0487a9a69d2345c7d69d7394f355111ec369cfa6d45fb40f": {
"describe": {
@ -2371,19 +2378,18 @@
},
"query": "\n INSERT INTO oauth2_refresh_tokens\n (oauth2_refresh_token_id, oauth2_session_id, oauth2_access_token_id,\n refresh_token, created_at)\n VALUES\n ($1, $2, $3, $4, $5)\n "
},
"e446e37d48c8838ef2e0d0fd82f8f7b04893c84ad46747cdf193ebd83755ceb2": {
"dbf4be84eeff9ea51b00185faae2d453ab449017ed492bf6711dc7fceb630880": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": [
"Uuid",
"Uuid",
"Timestamptz"
"Timestamptz",
"Uuid"
]
}
},
"query": "\n INSERT INTO user_sessions (user_session_id, user_id, created_at)\n VALUES ($1, $2, $3)\n "
"query": "\n UPDATE user_sessions\n SET finished_at = $1\n WHERE user_session_id = $2\n "
},
"e6dc63984aced9e19c20e90e9cd75d6f6d7ade64f782697715ac4da077b2e1fc": {
"describe": {

View File

@ -175,6 +175,7 @@ pub async fn lookup_active_access_token(
let browser_session = BrowserSession {
id: res.user_session_id.into(),
created_at: res.user_session_created_at,
finished_at: None,
user,
last_authentication,
};

View File

@ -224,6 +224,7 @@ impl GrantLookup {
id: user_session_id.into(),
user,
created_at: user_session_created_at,
finished_at: None,
last_authentication,
};

View File

@ -23,8 +23,8 @@ use uuid::Uuid;
use self::client::lookup_clients;
use crate::{
pagination::{process_page, QueryBuilderExt},
user::lookup_active_session,
Clock, DatabaseError, DatabaseInconsistencyError,
user::BrowserSessionRepository,
Clock, DatabaseError, DatabaseInconsistencyError, Repository,
};
pub mod access_token;
@ -134,11 +134,10 @@ pub async fn get_paginated_user_oauth_sessions(
// ideal
let mut browser_sessions: HashMap<Ulid, BrowserSession> = HashMap::new();
for id in browser_session_ids {
let v = lookup_active_session(&mut *conn, id)
.await?
.ok_or_else(|| {
let v = conn.browser_session().lookup(id).await?.ok_or_else(|| {
DatabaseInconsistencyError::on("oauth2_sessions").column("user_session_id")
})?;
browser_sessions.insert(id, v);
}

View File

@ -204,6 +204,7 @@ pub async fn lookup_active_refresh_token(
let browser_session = BrowserSession {
id: res.user_session_id.into(),
created_at: res.user_session_created_at,
finished_at: None,
user,
last_authentication,
};

View File

@ -19,7 +19,10 @@ use crate::{
PgUpstreamOAuthLinkRepository, PgUpstreamOAuthProviderRepository,
PgUpstreamOAuthSessionRepository,
},
user::{PgUserEmailRepository, PgUserRepository},
user::{
PgBrowserSessionRepository, PgUserEmailRepository, PgUserPasswordRepository,
PgUserRepository,
},
};
pub trait Repository {
@ -43,11 +46,21 @@ pub trait Repository {
where
Self: 'c;
type UserPasswordRepository<'c>
where
Self: 'c;
type BrowserSessionRepository<'c>
where
Self: 'c;
fn upstream_oauth_link(&mut self) -> Self::UpstreamOAuthLinkRepository<'_>;
fn upstream_oauth_provider(&mut self) -> Self::UpstreamOAuthProviderRepository<'_>;
fn upstream_oauth_session(&mut self) -> Self::UpstreamOAuthSessionRepository<'_>;
fn user(&mut self) -> Self::UserRepository<'_>;
fn user_email(&mut self) -> Self::UserEmailRepository<'_>;
fn user_password(&mut self) -> Self::UserPasswordRepository<'_>;
fn browser_session(&mut self) -> Self::BrowserSessionRepository<'_>;
}
impl Repository for PgConnection {
@ -56,6 +69,8 @@ impl Repository for PgConnection {
type UpstreamOAuthSessionRepository<'c> = PgUpstreamOAuthSessionRepository<'c> where Self: 'c;
type UserRepository<'c> = PgUserRepository<'c> where Self: 'c;
type UserEmailRepository<'c> = PgUserEmailRepository<'c> where Self: 'c;
type UserPasswordRepository<'c> = PgUserPasswordRepository<'c> where Self: 'c;
type BrowserSessionRepository<'c> = PgBrowserSessionRepository<'c> where Self: 'c;
fn upstream_oauth_link(&mut self) -> Self::UpstreamOAuthLinkRepository<'_> {
PgUpstreamOAuthLinkRepository::new(self)
@ -76,6 +91,14 @@ impl Repository for PgConnection {
fn user_email(&mut self) -> Self::UserEmailRepository<'_> {
PgUserEmailRepository::new(self)
}
fn user_password(&mut self) -> Self::UserPasswordRepository<'_> {
PgUserPasswordRepository::new(self)
}
fn browser_session(&mut self) -> Self::BrowserSessionRepository<'_> {
PgBrowserSessionRepository::new(self)
}
}
impl<'t> Repository for Transaction<'t, Postgres> {
@ -84,6 +107,8 @@ impl<'t> Repository for Transaction<'t, Postgres> {
type UpstreamOAuthSessionRepository<'c> = PgUpstreamOAuthSessionRepository<'c> where Self: 'c;
type UserRepository<'c> = PgUserRepository<'c> where Self: 'c;
type UserEmailRepository<'c> = PgUserEmailRepository<'c> where Self: 'c;
type UserPasswordRepository<'c> = PgUserPasswordRepository<'c> where Self: 'c;
type BrowserSessionRepository<'c> = PgBrowserSessionRepository<'c> where Self: 'c;
fn upstream_oauth_link(&mut self) -> Self::UpstreamOAuthLinkRepository<'_> {
PgUpstreamOAuthLinkRepository::new(self)
@ -104,4 +129,12 @@ impl<'t> Repository for Transaction<'t, Postgres> {
fn user_email(&mut self) -> Self::UserEmailRepository<'_> {
PgUserEmailRepository::new(self)
}
fn user_password(&mut self) -> Self::UserPasswordRepository<'_> {
PgUserPasswordRepository::new(self)
}
fn browser_session(&mut self) -> Self::BrowserSessionRepository<'_> {
PgBrowserSessionRepository::new(self)
}
}

View File

@ -1,105 +0,0 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use mas_data_model::{Authentication, BrowserSession, Password, UpstreamOAuthLink};
use rand::Rng;
use sqlx::PgExecutor;
use ulid::Ulid;
use uuid::Uuid;
use crate::Clock;
#[tracing::instrument(
skip_all,
fields(
user.id = %user_session.user.id,
%user_password.id,
%user_session.id,
user_session_authentication.id,
),
err,
)]
pub async fn authenticate_session_with_password(
executor: impl PgExecutor<'_>,
mut rng: impl Rng + Send,
clock: &Clock,
user_session: &mut BrowserSession,
user_password: &Password,
) -> Result<(), sqlx::Error> {
let created_at = clock.now();
let id = Ulid::from_datetime_with_source(created_at.into(), &mut rng);
tracing::Span::current().record(
"user_session_authentication.id",
tracing::field::display(id),
);
sqlx::query!(
r#"
INSERT INTO user_session_authentications
(user_session_authentication_id, user_session_id, created_at)
VALUES ($1, $2, $3)
"#,
Uuid::from(id),
Uuid::from(user_session.id),
created_at,
)
.execute(executor)
.await?;
user_session.last_authentication = Some(Authentication { id, created_at });
Ok(())
}
#[tracing::instrument(
skip_all,
fields(
user.id = %user_session.user.id,
%upstream_oauth_link.id,
%user_session.id,
user_session_authentication.id,
),
err,
)]
pub async fn authenticate_session_with_upstream(
executor: impl PgExecutor<'_>,
mut rng: impl Rng + Send,
clock: &Clock,
user_session: &mut BrowserSession,
upstream_oauth_link: &UpstreamOAuthLink,
) -> Result<(), sqlx::Error> {
let created_at = clock.now();
let id = Ulid::from_datetime_with_source(created_at.into(), &mut rng);
tracing::Span::current().record(
"user_session_authentication.id",
tracing::field::display(id),
);
sqlx::query!(
r#"
INSERT INTO user_session_authentications
(user_session_authentication_id, user_session_id, created_at)
VALUES ($1, $2, $3)
"#,
Uuid::from(id),
Uuid::from(user_session.id),
created_at,
)
.execute(executor)
.await?;
user_session.last_authentication = Some(Authentication { id, created_at });
Ok(())
}

View File

@ -14,27 +14,22 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use mas_data_model::{Authentication, BrowserSession, User};
use rand::{Rng, RngCore};
use sqlx::{PgConnection, PgExecutor, QueryBuilder};
use tracing::{info_span, Instrument};
use mas_data_model::User;
use rand::RngCore;
use sqlx::PgConnection;
use ulid::Ulid;
use uuid::Uuid;
use crate::{
pagination::{process_page, QueryBuilderExt},
tracing::ExecuteExt,
Clock, DatabaseError, DatabaseInconsistencyError, LookupResultExt,
};
use crate::{tracing::ExecuteExt, Clock, DatabaseError, LookupResultExt};
mod authentication;
mod email;
mod password;
mod session;
pub use self::{
authentication::{authenticate_session_with_password, authenticate_session_with_upstream},
email::{PgUserEmailRepository, UserEmailRepository},
password::{add_user_password, lookup_user_password},
password::{PgUserPasswordRepository, UserPasswordRepository},
session::{BrowserSessionRepository, PgBrowserSessionRepository},
};
#[async_trait]
@ -218,234 +213,3 @@ impl<'c> UserRepository for PgUserRepository<'c> {
Ok(exists)
}
}
#[derive(sqlx::FromRow)]
struct SessionLookup {
user_session_id: Uuid,
user_session_created_at: DateTime<Utc>,
user_id: Uuid,
user_username: String,
user_primary_user_email_id: Option<Uuid>,
last_authentication_id: Option<Uuid>,
last_authd_at: Option<DateTime<Utc>>,
}
impl TryInto<BrowserSession> for SessionLookup {
type Error = DatabaseInconsistencyError;
fn try_into(self) -> Result<BrowserSession, Self::Error> {
let id = Ulid::from(self.user_id);
let user = User {
id,
username: self.user_username,
sub: id.to_string(),
primary_user_email_id: self.user_primary_user_email_id.map(Into::into),
};
let last_authentication = match (self.last_authentication_id, self.last_authd_at) {
(Some(id), Some(created_at)) => Some(Authentication {
id: id.into(),
created_at,
}),
(None, None) => None,
_ => {
return Err(DatabaseInconsistencyError::on(
"user_session_authentications",
))
}
};
Ok(BrowserSession {
id: self.user_session_id.into(),
user,
created_at: self.user_session_created_at,
last_authentication,
})
}
}
#[tracing::instrument(
skip_all,
fields(user_session.id = %id),
err,
)]
pub async fn lookup_active_session(
executor: impl PgExecutor<'_>,
id: Ulid,
) -> Result<Option<BrowserSession>, DatabaseError> {
let res = sqlx::query_as!(
SessionLookup,
r#"
SELECT s.user_session_id
, s.created_at AS "user_session_created_at"
, u.user_id
, u.username AS "user_username"
, u.primary_user_email_id AS "user_primary_user_email_id"
, a.user_session_authentication_id AS "last_authentication_id?"
, a.created_at AS "last_authd_at?"
FROM user_sessions s
INNER JOIN users u
USING (user_id)
LEFT JOIN user_session_authentications a
USING (user_session_id)
WHERE s.user_session_id = $1 AND s.finished_at IS NULL
ORDER BY a.created_at DESC
LIMIT 1
"#,
Uuid::from(id),
)
.fetch_one(executor)
.await
.to_option()?;
let Some(res) = res else { return Ok(None) };
Ok(Some(res.try_into()?))
}
#[tracing::instrument(
skip_all,
fields(
%user.id,
%user.username,
),
err,
)]
pub async fn get_paginated_user_sessions(
executor: impl PgExecutor<'_>,
user: &User,
before: Option<Ulid>,
after: Option<Ulid>,
first: Option<usize>,
last: Option<usize>,
) -> Result<(bool, bool, Vec<BrowserSession>), DatabaseError> {
let mut query = QueryBuilder::new(
r#"
SELECT
s.user_session_id,
u.user_id,
u.username,
s.created_at,
a.user_session_authentication_id AS "last_authentication_id",
a.created_at AS "last_authd_at",
ue.user_email_id AS "user_email_id",
ue.email AS "user_email",
ue.created_at AS "user_email_created_at",
ue.confirmed_at AS "user_email_confirmed_at"
FROM user_sessions s
INNER JOIN users u
USING (user_id)
LEFT JOIN user_session_authentications a
USING (user_session_id)
LEFT JOIN user_emails ue
ON ue.user_email_id = u.primary_user_email_id
"#,
);
query
.push(" WHERE s.finished_at IS NULL AND s.user_id = ")
.push_bind(Uuid::from(user.id))
.generate_pagination("s.user_session_id", before, after, first, last)?;
let span = info_span!("Fetch paginated user emails", db.statement = query.sql());
let page: Vec<SessionLookup> = query
.build_query_as()
.fetch_all(executor)
.instrument(span)
.await?;
let (has_previous_page, has_next_page, page) = process_page(page, first, last)?;
let page: Result<Vec<_>, _> = page.into_iter().map(TryInto::try_into).collect();
Ok((has_previous_page, has_next_page, page?))
}
#[tracing::instrument(
skip_all,
fields(
%user.id,
user_session.id,
),
err,
)]
pub async fn start_session(
executor: impl PgExecutor<'_>,
mut rng: impl Rng + Send,
clock: &Clock,
user: User,
) -> Result<BrowserSession, sqlx::Error> {
let created_at = clock.now();
let id = Ulid::from_datetime_with_source(created_at.into(), &mut rng);
tracing::Span::current().record("user_session.id", tracing::field::display(id));
sqlx::query!(
r#"
INSERT INTO user_sessions (user_session_id, user_id, created_at)
VALUES ($1, $2, $3)
"#,
Uuid::from(id),
Uuid::from(user.id),
created_at,
)
.execute(executor)
.await?;
let session = BrowserSession {
id,
user,
created_at,
last_authentication: None,
};
Ok(session)
}
#[tracing::instrument(
skip_all,
fields(%user.id),
err,
)]
pub async fn count_active_sessions(
executor: impl PgExecutor<'_>,
user: &User,
) -> Result<i64, DatabaseError> {
let res = sqlx::query_scalar!(
r#"
SELECT COUNT(*) as "count!"
FROM user_sessions s
WHERE s.user_id = $1 AND s.finished_at IS NULL
"#,
Uuid::from(user.id),
)
.fetch_one(executor)
.await?;
Ok(res)
}
#[tracing::instrument(
skip_all,
fields(%user_session.id),
err,
)]
pub async fn end_session(
executor: impl PgExecutor<'_>,
clock: &Clock,
user_session: &BrowserSession,
) -> Result<(), DatabaseError> {
let now = clock.now();
let res = sqlx::query!(
r#"
UPDATE user_sessions
SET finished_at = $1
WHERE user_session_id = $2
"#,
now,
Uuid::from(user_session.id),
)
.execute(executor)
.instrument(info_span!("End session"))
.await?;
DatabaseError::ensure_affected_rows(&res, 1)
}

View File

@ -12,63 +12,42 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use mas_data_model::{Password, User};
use rand::Rng;
use sqlx::PgExecutor;
use rand::RngCore;
use sqlx::PgConnection;
use ulid::Ulid;
use uuid::Uuid;
use crate::{Clock, DatabaseError, DatabaseInconsistencyError, LookupResultExt};
use crate::{
tracing::ExecuteExt, Clock, DatabaseError, DatabaseInconsistencyError, LookupResultExt,
};
#[tracing::instrument(
skip_all,
fields(
%user.id,
%user.username,
user_password.id,
user_password.version = version,
),
err,
)]
pub async fn add_user_password(
executor: impl PgExecutor<'_>,
mut rng: impl Rng + Send,
#[async_trait]
pub trait UserPasswordRepository: Send + Sync {
type Error;
async fn active(&mut self, user: &User) -> Result<Option<Password>, Self::Error>;
async fn add(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &Clock,
user: &User,
version: u16,
hashed_password: String,
upgraded_from: Option<Password>,
) -> Result<Password, DatabaseError> {
let created_at = clock.now();
let id = Ulid::from_datetime_with_source(created_at.into(), &mut rng);
tracing::Span::current().record("user_password.id", tracing::field::display(id));
upgraded_from: Option<&Password>,
) -> Result<Password, Self::Error>;
}
let upgraded_from_id = upgraded_from.map(|p| p.id);
pub struct PgUserPasswordRepository<'c> {
conn: &'c mut PgConnection,
}
sqlx::query!(
r#"
INSERT INTO user_passwords
(user_password_id, user_id, hashed_password, version, upgraded_from_id, created_at)
VALUES ($1, $2, $3, $4, $5, $6)
"#,
Uuid::from(id),
Uuid::from(user.id),
hashed_password,
i32::from(version),
upgraded_from_id.map(Uuid::from),
created_at,
)
.execute(executor)
.await?;
Ok(Password {
id,
hashed_password,
version,
upgraded_from_id,
created_at,
})
impl<'c> PgUserPasswordRepository<'c> {
pub fn new(conn: &'c mut PgConnection) -> Self {
Self { conn }
}
}
struct UserPasswordLookup {
@ -79,18 +58,21 @@ struct UserPasswordLookup {
created_at: DateTime<Utc>,
}
#[async_trait]
impl<'c> UserPasswordRepository for PgUserPasswordRepository<'c> {
type Error = DatabaseError;
#[tracing::instrument(
name = "db.user_password.active",
skip_all,
fields(
db.statement,
%user.id,
%user.username,
),
err,
)]
pub async fn lookup_user_password(
executor: impl PgExecutor<'_>,
user: &User,
) -> Result<Option<Password>, DatabaseError> {
async fn active(&mut self, user: &User) -> Result<Option<Password>, Self::Error> {
let res = sqlx::query_as!(
UserPasswordLookup,
r#"
@ -106,7 +88,8 @@ pub async fn lookup_user_password(
"#,
Uuid::from(user.id),
)
.fetch_one(executor)
.traced()
.fetch_one(&mut *self.conn)
.await
.to_option()?;
@ -133,3 +116,57 @@ pub async fn lookup_user_password(
created_at,
}))
}
#[tracing::instrument(
name = "db.user_password.add",
skip_all,
fields(
db.statement,
%user.id,
%user.username,
user_password.id,
user_password.version = version,
),
err,
)]
async fn add(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &Clock,
user: &User,
version: u16,
hashed_password: String,
upgraded_from: Option<&Password>,
) -> Result<Password, Self::Error> {
let created_at = clock.now();
let id = Ulid::from_datetime_with_source(created_at.into(), rng);
tracing::Span::current().record("user_password.id", tracing::field::display(id));
let upgraded_from_id = upgraded_from.map(|p| p.id);
sqlx::query!(
r#"
INSERT INTO user_passwords
(user_password_id, user_id, hashed_password, version, upgraded_from_id, created_at)
VALUES ($1, $2, $3, $4, $5, $6)
"#,
Uuid::from(id),
Uuid::from(user.id),
hashed_password,
i32::from(version),
upgraded_from_id.map(Uuid::from),
created_at,
)
.traced()
.execute(&mut *self.conn)
.await?;
Ok(Password {
id,
hashed_password,
version,
upgraded_from_id,
created_at,
})
}
}

View File

@ -0,0 +1,425 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use mas_data_model::{Authentication, BrowserSession, Password, UpstreamOAuthLink, User};
use rand::RngCore;
use sqlx::{PgConnection, QueryBuilder};
use ulid::Ulid;
use uuid::Uuid;
use crate::{
pagination::{process_page, Page, QueryBuilderExt},
tracing::ExecuteExt,
Clock, DatabaseError, DatabaseInconsistencyError, LookupResultExt,
};
#[async_trait]
pub trait BrowserSessionRepository: Send + Sync {
type Error;
async fn lookup(&mut self, id: Ulid) -> Result<Option<BrowserSession>, Self::Error>;
async fn add(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &Clock,
user: &User,
) -> Result<BrowserSession, Self::Error>;
async fn finish(
&mut self,
clock: &Clock,
user_session: BrowserSession,
) -> Result<BrowserSession, Self::Error>;
async fn list_active_paginated(
&mut self,
user: &User,
before: Option<Ulid>,
after: Option<Ulid>,
first: Option<usize>,
last: Option<usize>,
) -> Result<Page<BrowserSession>, Self::Error>;
async fn count_active(&mut self, user: &User) -> Result<usize, Self::Error>;
async fn authenticate_with_password(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &Clock,
user_session: BrowserSession,
user_password: &Password,
) -> Result<BrowserSession, Self::Error>;
async fn authenticate_with_upstream(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &Clock,
user_session: BrowserSession,
upstream_oauth_link: &UpstreamOAuthLink,
) -> Result<BrowserSession, Self::Error>;
}
pub struct PgBrowserSessionRepository<'c> {
conn: &'c mut PgConnection,
}
impl<'c> PgBrowserSessionRepository<'c> {
pub fn new(conn: &'c mut PgConnection) -> Self {
Self { conn }
}
}
#[derive(sqlx::FromRow)]
struct SessionLookup {
user_session_id: Uuid,
user_session_created_at: DateTime<Utc>,
user_session_finished_at: Option<DateTime<Utc>>,
user_id: Uuid,
user_username: String,
user_primary_user_email_id: Option<Uuid>,
last_authentication_id: Option<Uuid>,
last_authd_at: Option<DateTime<Utc>>,
}
impl TryInto<BrowserSession> for SessionLookup {
type Error = DatabaseInconsistencyError;
fn try_into(self) -> Result<BrowserSession, Self::Error> {
let id = Ulid::from(self.user_id);
let user = User {
id,
username: self.user_username,
sub: id.to_string(),
primary_user_email_id: self.user_primary_user_email_id.map(Into::into),
};
let last_authentication = match (self.last_authentication_id, self.last_authd_at) {
(Some(id), Some(created_at)) => Some(Authentication {
id: id.into(),
created_at,
}),
(None, None) => None,
_ => {
return Err(DatabaseInconsistencyError::on(
"user_session_authentications",
))
}
};
Ok(BrowserSession {
id: self.user_session_id.into(),
user,
created_at: self.user_session_created_at,
finished_at: self.user_session_finished_at,
last_authentication,
})
}
}
#[async_trait]
impl<'c> BrowserSessionRepository for PgBrowserSessionRepository<'c> {
type Error = DatabaseError;
#[tracing::instrument(
name = "db.browser_session.lookup",
skip_all,
fields(
db.statement,
user_session.id = %id,
),
err,
)]
async fn lookup(&mut self, id: Ulid) -> Result<Option<BrowserSession>, Self::Error> {
let res = sqlx::query_as!(
SessionLookup,
r#"
SELECT s.user_session_id
, s.created_at AS "user_session_created_at"
, s.finished_at AS "user_session_finished_at"
, u.user_id
, u.username AS "user_username"
, u.primary_user_email_id AS "user_primary_user_email_id"
, a.user_session_authentication_id AS "last_authentication_id?"
, a.created_at AS "last_authd_at?"
FROM user_sessions s
INNER JOIN users u
USING (user_id)
LEFT JOIN user_session_authentications a
USING (user_session_id)
WHERE s.user_session_id = $1
ORDER BY a.created_at DESC
LIMIT 1
"#,
Uuid::from(id),
)
.traced()
.fetch_one(&mut *self.conn)
.await
.to_option()?;
let Some(res) = res else { return Ok(None) };
Ok(Some(res.try_into()?))
}
#[tracing::instrument(
name = "db.browser_session.add",
skip_all,
fields(
db.statement,
%user.id,
user_session.id,
),
err,
)]
async fn add(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &Clock,
user: &User,
) -> Result<BrowserSession, Self::Error> {
let created_at = clock.now();
let id = Ulid::from_datetime_with_source(created_at.into(), rng);
tracing::Span::current().record("user_session.id", tracing::field::display(id));
sqlx::query!(
r#"
INSERT INTO user_sessions (user_session_id, user_id, created_at)
VALUES ($1, $2, $3)
"#,
Uuid::from(id),
Uuid::from(user.id),
created_at,
)
.traced()
.execute(&mut *self.conn)
.await?;
let session = BrowserSession {
id,
// XXX
user: user.clone(),
created_at,
finished_at: None,
last_authentication: None,
};
Ok(session)
}
#[tracing::instrument(
name = "db.browser_session.finish",
skip_all,
fields(
db.statement,
%user_session.id,
),
err,
)]
async fn finish(
&mut self,
clock: &Clock,
mut user_session: BrowserSession,
) -> Result<BrowserSession, Self::Error> {
let finished_at = clock.now();
let res = sqlx::query!(
r#"
UPDATE user_sessions
SET finished_at = $1
WHERE user_session_id = $2
"#,
finished_at,
Uuid::from(user_session.id),
)
.traced()
.execute(&mut *self.conn)
.await?;
user_session.finished_at = Some(finished_at);
DatabaseError::ensure_affected_rows(&res, 1)?;
Ok(user_session)
}
#[tracing::instrument(
name = "db.browser_session.list_active_paginated",
skip_all,
fields(
db.statement,
%user.id,
),
err,
)]
async fn list_active_paginated(
&mut self,
user: &User,
before: Option<Ulid>,
after: Option<Ulid>,
first: Option<usize>,
last: Option<usize>,
) -> Result<Page<BrowserSession>, Self::Error> {
// TODO: ordering of last authentication is wrong
let mut query = QueryBuilder::new(
r#"
SELECT DISTINCT ON (s.user_session_id)
s.user_session_id,
u.user_id,
u.username,
s.created_at,
a.user_session_authentication_id AS "last_authentication_id",
a.created_at AS "last_authd_at",
FROM user_sessions s
INNER JOIN users u
USING (user_id)
LEFT JOIN user_session_authentications a
USING (user_session_id)
"#,
);
query
.push(" WHERE s.finished_at IS NULL AND s.user_id = ")
.push_bind(Uuid::from(user.id))
.generate_pagination("s.user_session_id", before, after, first, last)?;
let page: Vec<SessionLookup> = query
.build_query_as()
.traced()
.fetch_all(&mut *self.conn)
.await?;
let (has_previous_page, has_next_page, edges) = process_page(page, first, last)?;
let edges: Result<Vec<_>, _> = edges.into_iter().map(TryInto::try_into).collect();
Ok(Page {
has_previous_page,
has_next_page,
edges: edges?,
})
}
#[tracing::instrument(
name = "db.browser_session.count_active",
skip_all,
fields(
db.statement,
%user.id,
),
err,
)]
async fn count_active(&mut self, user: &User) -> Result<usize, Self::Error> {
let res = sqlx::query_scalar!(
r#"
SELECT COUNT(*) as "count!"
FROM user_sessions s
WHERE s.user_id = $1 AND s.finished_at IS NULL
"#,
Uuid::from(user.id),
)
.traced()
.fetch_one(&mut *self.conn)
.await?;
res.try_into().map_err(DatabaseError::to_invalid_operation)
}
#[tracing::instrument(
name = "db.browser_session.authenticate_with_password",
skip_all,
fields(
db.statement,
%user_session.id,
%user_password.id,
user_session_authentication.id,
),
err,
)]
async fn authenticate_with_password(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &Clock,
mut user_session: BrowserSession,
user_password: &Password,
) -> Result<BrowserSession, Self::Error> {
let _user_password = user_password;
let created_at = clock.now();
let id = Ulid::from_datetime_with_source(created_at.into(), rng);
tracing::Span::current().record(
"user_session_authentication.id",
tracing::field::display(id),
);
sqlx::query!(
r#"
INSERT INTO user_session_authentications
(user_session_authentication_id, user_session_id, created_at)
VALUES ($1, $2, $3)
"#,
Uuid::from(id),
Uuid::from(user_session.id),
created_at,
)
.traced()
.execute(&mut *self.conn)
.await?;
user_session.last_authentication = Some(Authentication { id, created_at });
Ok(user_session)
}
#[tracing::instrument(
name = "db.browser_session.authenticate_with_upstream",
skip_all,
fields(
db.statement,
%user_session.id,
%upstream_oauth_link.id,
user_session_authentication.id,
),
err,
)]
async fn authenticate_with_upstream(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &Clock,
mut user_session: BrowserSession,
upstream_oauth_link: &UpstreamOAuthLink,
) -> Result<BrowserSession, Self::Error> {
let _upstream_oauth_link = upstream_oauth_link;
let created_at = clock.now();
let id = Ulid::from_datetime_with_source(created_at.into(), rng);
tracing::Span::current().record(
"user_session_authentication.id",
tracing::field::display(id),
);
sqlx::query!(
r#"
INSERT INTO user_session_authentications
(user_session_authentication_id, user_session_id, created_at)
VALUES ($1, $2, $3)
"#,
Uuid::from(id),
Uuid::from(user_session.id),
created_at,
)
.traced()
.execute(&mut *self.conn)
.await?;
user_session.last_authentication = Some(Authentication { id, created_at });
Ok(user_session)
}
}

View File

@ -532,14 +532,14 @@ where {
/// Context used by the `account/index.html` template
#[derive(Serialize)]
pub struct AccountContext {
active_sessions: i64,
active_sessions: usize,
emails: Vec<UserEmail>,
}
impl AccountContext {
/// Constructs a context for the "my account" page
#[must_use]
pub fn new(active_sessions: i64, emails: Vec<UserEmail>) -> Self {
pub fn new(active_sessions: usize, emails: Vec<UserEmail>) -> Self {
Self {
active_sessions,
emails,