1
0
mirror of https://github.com/matrix-org/matrix-authentication-service.git synced 2025-07-29 22:01:14 +03:00

Allow querying browser sessions

This commit is contained in:
Quentin Gliech
2022-11-08 10:13:14 +01:00
parent ac40367c5f
commit e8e7e75514
11 changed files with 512 additions and 306 deletions

1
Cargo.lock generated
View File

@ -2604,7 +2604,6 @@ dependencies = [
"serde", "serde",
"sqlx", "sqlx",
"tokio", "tokio",
"tokio-stream",
"ulid", "ulid",
] ]

View File

@ -11,7 +11,6 @@ chrono = "0.4.22"
serde = { version = "1.0.147", features = ["derive"] } serde = { version = "1.0.147", features = ["derive"] }
sqlx = { version = "0.6.2", features = ["runtime-tokio-rustls", "postgres"] } sqlx = { version = "0.6.2", features = ["runtime-tokio-rustls", "postgres"] }
tokio = { version = "1.21.2", features = ["time"] } tokio = { version = "1.21.2", features = ["time"] }
tokio-stream = "0.1.11"
ulid = "1.0.0" ulid = "1.0.0"
mas-axum-utils = { path = "../axum-utils" } mas-axum-utils = { path = "../axum-utils" }

View File

@ -11,6 +11,35 @@ type BrowserSession {
createdAt: DateTime! createdAt: DateTime!
} }
type BrowserSessionConnection {
"""
Information to aid in pagination.
"""
pageInfo: PageInfo!
"""
A list of edges.
"""
edges: [BrowserSessionEdge!]!
"""
A list of nodes.
"""
nodes: [BrowserSession!]!
}
"""
An edge in a connection.
"""
type BrowserSessionEdge {
"""
A cursor for use in pagination
"""
cursor: String!
"""
The item at the end of the edge
"""
node: BrowserSession!
}
""" """
Implement the DateTime<Utc> scalar Implement the DateTime<Utc> scalar
@ -21,13 +50,6 @@ scalar DateTime
type Mutation {
"""
A dummy mutation so that the mutation object is not empty
"""
hello: Boolean!
}
""" """
Information about pagination in a connection Information about pagination in a connection
""" """
@ -51,22 +73,22 @@ type PageInfo {
} }
type Query { type Query {
currentSession: BrowserSession """
Get the current logged in browser session
"""
currentBrowserSession: BrowserSession
"""
Get the current logged in user
"""
currentUser: User currentUser: User
} }
type Subscription {
"""
A dump subscription to try out the websocket
"""
integers(step: Int! = 1): Int!
}
type User { type User {
id: ID! id: ID!
username: String! username: String!
primaryEmail: UserEmail primaryEmail: UserEmail
browserSessions(after: String, before: String, first: Int, last: Int): BrowserSessionConnection!
emails(after: String, before: String, first: Int, last: Int): UserEmailConnection! emails(after: String, before: String, first: Int, last: Int): UserEmailConnection!
} }
@ -109,7 +131,5 @@ type UserEmailEdge {
schema { schema {
query: Query query: Query
mutation: Mutation
subscription: Subscription
} }

View File

@ -22,23 +22,20 @@
#![warn(clippy::pedantic)] #![warn(clippy::pedantic)]
#![allow(clippy::module_name_repetitions, clippy::missing_errors_doc)] #![allow(clippy::module_name_repetitions, clippy::missing_errors_doc)]
use std::time::Duration; use async_graphql::{Context, EmptyMutation, EmptySubscription};
use async_graphql::Context;
use mas_axum_utils::SessionInfo; use mas_axum_utils::SessionInfo;
use sqlx::PgPool; use sqlx::PgPool;
use tokio_stream::{Stream, StreamExt};
use self::model::{BrowserSession, User}; use self::model::{BrowserSession, User};
mod model; mod model;
pub type Schema = async_graphql::Schema<Query, Mutation, Subscription>; pub type Schema = async_graphql::Schema<Query, EmptyMutation, EmptySubscription>;
pub type SchemaBuilder = async_graphql::SchemaBuilder<Query, Mutation, Subscription>; pub type SchemaBuilder = async_graphql::SchemaBuilder<Query, EmptyMutation, EmptySubscription>;
#[must_use] #[must_use]
pub fn schema_builder() -> SchemaBuilder { pub fn schema_builder() -> SchemaBuilder {
async_graphql::Schema::build(Query::new(), Mutation::new(), Subscription::new()) async_graphql::Schema::build(Query::new(), EmptyMutation, EmptySubscription)
} }
#[derive(Default)] #[derive(Default)]
@ -55,7 +52,8 @@ impl Query {
#[async_graphql::Object] #[async_graphql::Object]
impl Query { impl Query {
async fn current_session( /// Get the current logged in browser session
async fn current_browser_session(
&self, &self,
ctx: &Context<'_>, ctx: &Context<'_>,
) -> Result<Option<BrowserSession>, async_graphql::Error> { ) -> Result<Option<BrowserSession>, async_graphql::Error> {
@ -67,6 +65,7 @@ impl Query {
Ok(session.map(BrowserSession::from)) Ok(session.map(BrowserSession::from))
} }
/// Get the current logged in user
async fn current_user(&self, ctx: &Context<'_>) -> Result<Option<User>, async_graphql::Error> { async fn current_user(&self, ctx: &Context<'_>) -> Result<Option<User>, async_graphql::Error> {
let database = ctx.data::<PgPool>()?; let database = ctx.data::<PgPool>()?;
let session_info = ctx.data::<SessionInfo>()?; let session_info = ctx.data::<SessionInfo>()?;
@ -76,48 +75,3 @@ impl Query {
Ok(session.map(User::from)) Ok(session.map(User::from))
} }
} }
#[derive(Default)]
pub struct Mutation {
_private: (),
}
impl Mutation {
#[must_use]
pub fn new() -> Self {
Self::default()
}
}
#[async_graphql::Object]
impl Mutation {
/// A dummy mutation so that the mutation object is not empty
async fn hello(&self) -> bool {
true
}
}
#[derive(Default)]
pub struct Subscription {
_private: (),
}
impl Subscription {
#[must_use]
pub fn new() -> Self {
Self::default()
}
}
#[async_graphql::Subscription]
impl Subscription {
/// A dump subscription to try out the websocket
async fn integers(&self, #[graphql(default = 1)] step: i32) -> impl Stream<Item = i32> {
let mut value = 0;
tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(1)))
.map(move |_| {
value += step;
value
})
}
}

View File

@ -12,184 +12,12 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use async_graphql::{ mod browser_sessions;
connection::{query, Connection, Edge, OpaqueCursor}, mod cursor;
Context, Object, ID, mod users;
pub use self::{
browser_sessions::{Authentication, BrowserSession},
cursor::{Cursor, NodeCursor, NodeType},
users::{User, UserEmail},
}; };
use chrono::{DateTime, Utc};
use mas_storage::PostgresqlBackend;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use ulid::Ulid;
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone, Copy)]
#[serde(rename = "snake_case")]
enum NodeType {
User,
UserEmail,
BrowserSession,
}
#[derive(Serialize, Deserialize, PartialEq, Eq)]
struct NodeCursor(NodeType, Ulid);
impl NodeCursor {
fn extract_for_type(&self, node_type: NodeType) -> Result<Ulid, async_graphql::Error> {
if self.0 == node_type {
Ok(self.1)
} else {
Err(async_graphql::Error::new("invalid cursor"))
}
}
}
type Cursor = OpaqueCursor<NodeCursor>;
pub struct BrowserSession(mas_data_model::BrowserSession<PostgresqlBackend>);
impl From<mas_data_model::BrowserSession<PostgresqlBackend>> for BrowserSession {
fn from(v: mas_data_model::BrowserSession<PostgresqlBackend>) -> Self {
Self(v)
}
}
#[Object]
impl BrowserSession {
async fn id(&self) -> ID {
ID(self.0.data.to_string())
}
async fn user(&self) -> User {
User(self.0.user.clone())
}
async fn last_authentication(&self) -> Option<Authentication> {
self.0.last_authentication.clone().map(Authentication)
}
async fn created_at(&self) -> DateTime<Utc> {
self.0.created_at
}
}
pub struct User(mas_data_model::User<PostgresqlBackend>);
impl From<mas_data_model::User<PostgresqlBackend>> for User {
fn from(v: mas_data_model::User<PostgresqlBackend>) -> Self {
Self(v)
}
}
impl From<mas_data_model::BrowserSession<PostgresqlBackend>> for User {
fn from(v: mas_data_model::BrowserSession<PostgresqlBackend>) -> Self {
Self(v.user)
}
}
#[Object]
impl User {
async fn id(&self) -> ID {
ID(self.0.data.to_string())
}
async fn username(&self) -> &str {
&self.0.username
}
async fn primary_email(&self) -> Option<UserEmail> {
self.0.primary_email.clone().map(UserEmail)
}
async fn emails(
&self,
ctx: &Context<'_>,
after: Option<String>,
before: Option<String>,
first: Option<i32>,
last: Option<i32>,
) -> Result<Connection<Cursor, UserEmail, UserEmailsPagination>, async_graphql::Error> {
let database = ctx.data::<PgPool>()?;
query(
after,
before,
first,
last,
|after, before, first, last| async move {
let mut conn = database.acquire().await?;
let after_id = after
.map(|x: OpaqueCursor<NodeCursor>| x.extract_for_type(NodeType::UserEmail))
.transpose()?;
let before_id = before
.map(|x: OpaqueCursor<NodeCursor>| x.extract_for_type(NodeType::UserEmail))
.transpose()?;
let (has_previous_page, has_next_page, edges) =
mas_storage::user::get_paginated_user_emails(
&mut conn, &self.0, before_id, after_id, first, last,
)
.await?;
let mut connection = Connection::with_additional_fields(
has_previous_page,
has_next_page,
UserEmailsPagination(self.0.clone()),
);
connection.edges.extend(edges.into_iter().map(|u| {
Edge::new(
OpaqueCursor(NodeCursor(NodeType::UserEmail, u.data)),
UserEmail(u),
)
}));
Ok::<_, async_graphql::Error>(connection)
},
)
.await
}
}
pub struct Authentication(mas_data_model::Authentication<PostgresqlBackend>);
#[Object]
impl Authentication {
async fn id(&self) -> ID {
ID(self.0.data.to_string())
}
async fn created_at(&self) -> DateTime<Utc> {
self.0.created_at
}
}
pub struct UserEmail(mas_data_model::UserEmail<PostgresqlBackend>);
#[Object]
impl UserEmail {
async fn id(&self) -> ID {
ID(self.0.data.to_string())
}
async fn email(&self) -> &str {
&self.0.email
}
async fn created_at(&self) -> DateTime<Utc> {
self.0.created_at
}
async fn confirmed_at(&self) -> Option<DateTime<Utc>> {
self.0.confirmed_at
}
}
pub struct UserEmailsPagination(mas_data_model::User<PostgresqlBackend>);
#[Object]
impl UserEmailsPagination {
async fn total_count(&self, ctx: &Context<'_>) -> Result<i64, async_graphql::Error> {
let mut conn = ctx.data::<PgPool>()?.acquire().await?;
let count = mas_storage::user::count_user_emails(&mut conn, &self.0).await?;
Ok(count)
}
}

View File

@ -0,0 +1,59 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_graphql::{Object, ID};
use chrono::{DateTime, Utc};
use mas_storage::PostgresqlBackend;
use super::User;
pub struct BrowserSession(pub mas_data_model::BrowserSession<PostgresqlBackend>);
impl From<mas_data_model::BrowserSession<PostgresqlBackend>> for BrowserSession {
fn from(v: mas_data_model::BrowserSession<PostgresqlBackend>) -> Self {
Self(v)
}
}
#[Object]
impl BrowserSession {
async fn id(&self) -> ID {
ID(self.0.data.to_string())
}
async fn user(&self) -> User {
User(self.0.user.clone())
}
async fn last_authentication(&self) -> Option<Authentication> {
self.0.last_authentication.clone().map(Authentication)
}
async fn created_at(&self) -> DateTime<Utc> {
self.0.created_at
}
}
pub struct Authentication(pub mas_data_model::Authentication<PostgresqlBackend>);
#[Object]
impl Authentication {
async fn id(&self) -> ID {
ID(self.0.data.to_string())
}
async fn created_at(&self) -> DateTime<Utc> {
self.0.created_at
}
}

View File

@ -0,0 +1,39 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_graphql::connection::OpaqueCursor;
use serde::{Deserialize, Serialize};
use ulid::Ulid;
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone, Copy)]
#[serde(rename = "snake_case")]
pub enum NodeType {
UserEmail,
BrowserSession,
}
#[derive(Serialize, Deserialize, PartialEq, Eq)]
pub struct NodeCursor(pub NodeType, pub Ulid);
impl NodeCursor {
pub fn extract_for_type(&self, node_type: NodeType) -> Result<Ulid, async_graphql::Error> {
if self.0 == node_type {
Ok(self.1)
} else {
Err(async_graphql::Error::new("invalid cursor"))
}
}
}
pub type Cursor = OpaqueCursor<NodeCursor>;

View File

@ -0,0 +1,176 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_graphql::{
connection::{query, Connection, Edge, OpaqueCursor},
Context, Object, ID,
};
use chrono::{DateTime, Utc};
use mas_storage::PostgresqlBackend;
use sqlx::PgPool;
use super::{BrowserSession, Cursor, NodeCursor, NodeType};
pub struct User(pub mas_data_model::User<PostgresqlBackend>);
impl From<mas_data_model::User<PostgresqlBackend>> for User {
fn from(v: mas_data_model::User<PostgresqlBackend>) -> Self {
Self(v)
}
}
impl From<mas_data_model::BrowserSession<PostgresqlBackend>> for User {
fn from(v: mas_data_model::BrowserSession<PostgresqlBackend>) -> Self {
Self(v.user)
}
}
#[Object]
impl User {
async fn id(&self) -> ID {
ID(self.0.data.to_string())
}
async fn username(&self) -> &str {
&self.0.username
}
async fn primary_email(&self) -> Option<UserEmail> {
self.0.primary_email.clone().map(UserEmail)
}
async fn browser_sessions(
&self,
ctx: &Context<'_>,
after: Option<String>,
before: Option<String>,
first: Option<i32>,
last: Option<i32>,
) -> Result<Connection<Cursor, BrowserSession>, async_graphql::Error> {
let database = ctx.data::<PgPool>()?;
query(
after,
before,
first,
last,
|after, before, first, last| async move {
let mut conn = database.acquire().await?;
let after_id = after
.map(|x: OpaqueCursor<NodeCursor>| x.extract_for_type(NodeType::UserEmail))
.transpose()?;
let before_id = before
.map(|x: OpaqueCursor<NodeCursor>| x.extract_for_type(NodeType::UserEmail))
.transpose()?;
let (has_previous_page, has_next_page, edges) =
mas_storage::user::get_paginated_user_sessions(
&mut conn, &self.0, before_id, after_id, first, last,
)
.await?;
let mut connection = Connection::new(has_previous_page, has_next_page);
connection.edges.extend(edges.into_iter().map(|u| {
Edge::new(
OpaqueCursor(NodeCursor(NodeType::BrowserSession, u.data)),
BrowserSession(u),
)
}));
Ok::<_, async_graphql::Error>(connection)
},
)
.await
}
async fn emails(
&self,
ctx: &Context<'_>,
after: Option<String>,
before: Option<String>,
first: Option<i32>,
last: Option<i32>,
) -> Result<Connection<Cursor, UserEmail, UserEmailsPagination>, async_graphql::Error> {
let database = ctx.data::<PgPool>()?;
query(
after,
before,
first,
last,
|after, before, first, last| async move {
let mut conn = database.acquire().await?;
let after_id = after
.map(|x: OpaqueCursor<NodeCursor>| x.extract_for_type(NodeType::UserEmail))
.transpose()?;
let before_id = before
.map(|x: OpaqueCursor<NodeCursor>| x.extract_for_type(NodeType::UserEmail))
.transpose()?;
let (has_previous_page, has_next_page, edges) =
mas_storage::user::get_paginated_user_emails(
&mut conn, &self.0, before_id, after_id, first, last,
)
.await?;
let mut connection = Connection::with_additional_fields(
has_previous_page,
has_next_page,
UserEmailsPagination(self.0.clone()),
);
connection.edges.extend(edges.into_iter().map(|u| {
Edge::new(
OpaqueCursor(NodeCursor(NodeType::UserEmail, u.data)),
UserEmail(u),
)
}));
Ok::<_, async_graphql::Error>(connection)
},
)
.await
}
}
pub struct UserEmail(mas_data_model::UserEmail<PostgresqlBackend>);
#[Object]
impl UserEmail {
async fn id(&self) -> ID {
ID(self.0.data.to_string())
}
async fn email(&self) -> &str {
&self.0.email
}
async fn created_at(&self) -> DateTime<Utc> {
self.0.created_at
}
async fn confirmed_at(&self) -> Option<DateTime<Utc>> {
self.0.confirmed_at
}
}
pub struct UserEmailsPagination(mas_data_model::User<PostgresqlBackend>);
#[Object]
impl UserEmailsPagination {
async fn total_count(&self, ctx: &Context<'_>) -> Result<i64, async_graphql::Error> {
let mut conn = ctx.data::<PgPool>()?.acquire().await?;
let count = mas_storage::user::count_user_emails(&mut conn, &self.0).await?;
Ok(count)
}
}

View File

@ -77,6 +77,7 @@ impl StorageBackendMarker for PostgresqlBackend {}
pub mod compat; pub mod compat;
pub mod oauth2; pub mod oauth2;
pub(crate) mod pagination;
pub mod user; pub mod user;
/// Embedded migrations, allowing them to run on startup /// Embedded migrations, allowing them to run on startup

View File

@ -0,0 +1,100 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Write;
use sqlx::Arguments;
use ulid::Ulid;
use uuid::Uuid;
pub fn generate_pagination<'a, A, W>(
query: &mut W,
id_field: &'static str,
arguments: &mut A,
before: Option<Ulid>,
after: Option<Ulid>,
first: Option<usize>,
last: Option<usize>,
) -> Result<(), anyhow::Error>
where
W: Write,
A: Arguments<'a>,
Uuid: sqlx::Type<A::Database> + sqlx::Encode<'a, A::Database>,
i64: sqlx::Type<A::Database> + sqlx::Encode<'a, A::Database>,
{
// ref: https://github.com/graphql/graphql-relay-js/issues/94#issuecomment-232410564
// 1. Start from the greedy query: SELECT * FROM table
// 2. If the after argument is provided, add `id > parsed_cursor` to the `WHERE`
// clause
if let Some(after) = after {
write!(query, " AND {id_field} > ")?;
arguments.add(Uuid::from(after));
arguments.format_placeholder(query)?;
}
// 3. If the before argument is provided, add `id < parsed_cursor` to the
// `WHERE` clause
if let Some(before) = before {
write!(query, " AND {id_field} < ")?;
arguments.add(Uuid::from(before));
arguments.format_placeholder(query)?;
}
// 4. If the first argument is provided, add `ORDER BY id ASC LIMIT first+1` to
// the query
if let Some(count) = first {
write!(query, " ORDER BY {id_field} ASC LIMIT ")?;
arguments.add((count + 1) as i64);
arguments.format_placeholder(query)?;
// 5. If the first argument is provided, add `ORDER BY id DESC LIMIT last+1`
// to the query
} else if let Some(count) = last {
write!(query, " ORDER BY ue.user_email_id DESC LIMIT ")?;
arguments.add((count + 1) as i64);
arguments.format_placeholder(query)?;
} else {
anyhow::bail!("Either 'first' or 'last' must be specified");
}
Ok(())
}
pub fn process_page<T>(
mut page: Vec<T>,
first: Option<usize>,
last: Option<usize>,
) -> Result<(bool, bool, Vec<T>), anyhow::Error> {
let limit = match (first, last) {
(Some(count), _) | (_, Some(count)) => count,
_ => anyhow::bail!("Either 'first' or 'last' must be specified"),
};
let is_full = page.len() == (limit + 1);
if is_full {
page.pop();
}
let (has_previous_page, has_next_page) = if first.is_some() {
(false, is_full)
} else if last.is_some() {
// 6. If the last argument is provided, I reverse the order of the results
page.reverse();
(is_full, false)
} else {
unreachable!()
};
Ok((has_previous_page, has_next_page, page))
}

View File

@ -1,4 +1,4 @@
// Copyright 2021 The Matrix.org Foundation C.I.C. // Copyright 2021, 2022 The Matrix.org Foundation C.I.C.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -31,7 +31,10 @@ use ulid::Ulid;
use uuid::Uuid; use uuid::Uuid;
use super::{DatabaseInconsistencyError, PostgresqlBackend}; use super::{DatabaseInconsistencyError, PostgresqlBackend};
use crate::Clock; use crate::{
pagination::{generate_pagination, process_page},
Clock,
};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct UserLookup { struct UserLookup {
@ -121,6 +124,7 @@ impl ActiveSessionLookupError {
} }
} }
#[derive(sqlx::FromRow)]
struct SessionLookup { struct SessionLookup {
user_session_id: Uuid, user_session_id: Uuid,
user_id: Uuid, user_id: Uuid,
@ -223,6 +227,72 @@ pub async fn lookup_active_session(
Ok(res) Ok(res)
} }
#[tracing::instrument(
skip_all,
fields(
user.id = %user.data,
user.username = user.username,
),
err(Display),
)]
pub async fn get_paginated_user_sessions(
executor: impl PgExecutor<'_>,
user: &User<PostgresqlBackend>,
before: Option<Ulid>,
after: Option<Ulid>,
first: Option<usize>,
last: Option<usize>,
) -> Result<(bool, bool, Vec<BrowserSession<PostgresqlBackend>>), anyhow::Error> {
let mut query = String::from(
r#"
SELECT
s.user_session_id,
u.user_id,
u.username,
s.created_at,
a.user_session_authentication_id AS "last_authentication_id",
a.created_at AS "last_authd_at",
ue.user_email_id AS "user_email_id",
ue.email AS "user_email",
ue.created_at AS "user_email_created_at",
ue.confirmed_at AS "user_email_confirmed_at"
FROM user_sessions s
INNER JOIN users u
USING (user_id)
LEFT JOIN user_session_authentications a
USING (user_session_id)
LEFT JOIN user_emails ue
ON ue.user_email_id = u.primary_user_email_id
"#,
);
let mut arguments = PgArguments::default();
query += " WHERE s.finished_at IS NULL AND s.user_id = ";
arguments.add(Uuid::from(user.data));
arguments.format_placeholder(&mut query)?;
generate_pagination(
&mut query,
"s.user_session_id",
&mut arguments,
before,
after,
first,
last,
)?;
let page: Vec<SessionLookup> = sqlx::query_as_with(&query, arguments)
.fetch_all(executor)
.instrument(info_span!("Fetch paginated user emails", query = query))
.await?;
let (has_previous_page, has_next_page, page) = process_page(page, first, last)?;
let page: Result<Vec<_>, _> = page.into_iter().map(TryInto::try_into).collect();
Ok((has_previous_page, has_next_page, page?))
}
#[tracing::instrument( #[tracing::instrument(
skip_all, skip_all,
fields( fields(
@ -681,8 +751,6 @@ pub async fn get_paginated_user_emails(
first: Option<usize>, first: Option<usize>,
last: Option<usize>, last: Option<usize>,
) -> Result<(bool, bool, Vec<UserEmail<PostgresqlBackend>>), anyhow::Error> { ) -> Result<(bool, bool, Vec<UserEmail<PostgresqlBackend>>), anyhow::Error> {
// ref: https://github.com/graphql/graphql-relay-js/issues/94#issuecomment-232410564
// 1. Start from the greedy query: SELECT * FROM table
let mut query = String::from( let mut query = String::from(
r#" r#"
SELECT SELECT
@ -700,64 +768,27 @@ pub async fn get_paginated_user_emails(
arguments.add(Uuid::from(user.data)); arguments.add(Uuid::from(user.data));
arguments.format_placeholder(&mut query)?; arguments.format_placeholder(&mut query)?;
// 2. If the after argument is provided, add `id > parsed_cursor` to the `WHERE` generate_pagination(
// clause &mut query,
if let Some(after) = after { "ue.user_email_id",
query += " AND ue.user_email_id > "; &mut arguments,
arguments.add(Uuid::from(after)); before,
arguments.format_placeholder(&mut query)?; after,
} first,
last,
)?;
// 3. If the before argument is provided, add `id < parsed_cursor` to the let page: Vec<UserEmailLookup> = sqlx::query_as_with(&query, arguments)
// `WHERE` clause
if let Some(before) = before {
query += " AND ue.user_email_id < ";
arguments.add(Uuid::from(before));
arguments.format_placeholder(&mut query)?;
}
// 4. If the first argument is provided, add `ORDER BY id ASC LIMIT first+1` to
// the query
let limit = if let Some(count) = first {
query += " ORDER BY ue.user_email_id ASC LIMIT ";
arguments.add((count + 1) as i64);
arguments.format_placeholder(&mut query)?;
count
// 5. If the first argument is provided, add `ORDER BY id DESC LIMIT last+1`
// to the query
} else if let Some(count) = last {
query += " ORDER BY ue.user_email_id DESC LIMIT ";
arguments.add((count + 1) as i64);
arguments.format_placeholder(&mut query)?;
count
} else {
bail!("Either 'first' or 'last' must be specified");
};
let mut res: Vec<UserEmailLookup> = sqlx::query_as_with(&query, arguments)
.fetch_all(executor) .fetch_all(executor)
.instrument(info_span!("Fetch paginated user emails", query = query)) .instrument(info_span!("Fetch paginated user sessions", query = query))
.await?; .await?;
let is_full = res.len() == (limit + 1); let (has_previous_page, has_next_page, page) = process_page(page, first, last)?;
if is_full {
res.pop();
}
let (has_previous_page, has_next_page) = if first.is_some() {
(false, is_full)
} else if last.is_some() {
// 5. If the last argument is provided, I reverse the order of the results
res.reverse();
(is_full, false)
} else {
unreachable!()
};
Ok(( Ok((
has_previous_page, has_previous_page,
has_next_page, has_next_page,
res.into_iter().map(Into::into).collect(), page.into_iter().map(Into::into).collect(),
)) ))
} }