diff --git a/Cargo.lock b/Cargo.lock index 67ab0d4e..a328226f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2604,7 +2604,6 @@ dependencies = [ "serde", "sqlx", "tokio", - "tokio-stream", "ulid", ] diff --git a/crates/graphql/Cargo.toml b/crates/graphql/Cargo.toml index 55b4cf22..ab29eefc 100644 --- a/crates/graphql/Cargo.toml +++ b/crates/graphql/Cargo.toml @@ -11,7 +11,6 @@ chrono = "0.4.22" serde = { version = "1.0.147", features = ["derive"] } sqlx = { version = "0.6.2", features = ["runtime-tokio-rustls", "postgres"] } tokio = { version = "1.21.2", features = ["time"] } -tokio-stream = "0.1.11" ulid = "1.0.0" mas-axum-utils = { path = "../axum-utils" } diff --git a/crates/graphql/schema.graphql b/crates/graphql/schema.graphql index ab9dbe80..86eadde9 100644 --- a/crates/graphql/schema.graphql +++ b/crates/graphql/schema.graphql @@ -11,6 +11,35 @@ type BrowserSession { createdAt: DateTime! } +type BrowserSessionConnection { + """ + Information to aid in pagination. + """ + pageInfo: PageInfo! + """ + A list of edges. + """ + edges: [BrowserSessionEdge!]! + """ + A list of nodes. + """ + nodes: [BrowserSession!]! +} + +""" +An edge in a connection. +""" +type BrowserSessionEdge { + """ + A cursor for use in pagination + """ + cursor: String! + """ + The item at the end of the edge + """ + node: BrowserSession! +} + """ Implement the DateTime scalar @@ -21,13 +50,6 @@ scalar DateTime -type Mutation { - """ - A dummy mutation so that the mutation object is not empty - """ - hello: Boolean! -} - """ Information about pagination in a connection """ @@ -51,22 +73,22 @@ type PageInfo { } type Query { - currentSession: BrowserSession + """ + Get the current logged in browser session + """ + currentBrowserSession: BrowserSession + """ + Get the current logged in user + """ currentUser: User } -type Subscription { - """ - A dump subscription to try out the websocket - """ - integers(step: Int! = 1): Int! -} - type User { id: ID! username: String! primaryEmail: UserEmail + browserSessions(after: String, before: String, first: Int, last: Int): BrowserSessionConnection! emails(after: String, before: String, first: Int, last: Int): UserEmailConnection! } @@ -109,7 +131,5 @@ type UserEmailEdge { schema { query: Query - mutation: Mutation - subscription: Subscription } diff --git a/crates/graphql/src/lib.rs b/crates/graphql/src/lib.rs index dab25bfe..2f488873 100644 --- a/crates/graphql/src/lib.rs +++ b/crates/graphql/src/lib.rs @@ -22,23 +22,20 @@ #![warn(clippy::pedantic)] #![allow(clippy::module_name_repetitions, clippy::missing_errors_doc)] -use std::time::Duration; - -use async_graphql::Context; +use async_graphql::{Context, EmptyMutation, EmptySubscription}; use mas_axum_utils::SessionInfo; use sqlx::PgPool; -use tokio_stream::{Stream, StreamExt}; use self::model::{BrowserSession, User}; mod model; -pub type Schema = async_graphql::Schema; -pub type SchemaBuilder = async_graphql::SchemaBuilder; +pub type Schema = async_graphql::Schema; +pub type SchemaBuilder = async_graphql::SchemaBuilder; #[must_use] pub fn schema_builder() -> SchemaBuilder { - async_graphql::Schema::build(Query::new(), Mutation::new(), Subscription::new()) + async_graphql::Schema::build(Query::new(), EmptyMutation, EmptySubscription) } #[derive(Default)] @@ -55,7 +52,8 @@ impl Query { #[async_graphql::Object] impl Query { - async fn current_session( + /// Get the current logged in browser session + async fn current_browser_session( &self, ctx: &Context<'_>, ) -> Result, async_graphql::Error> { @@ -67,6 +65,7 @@ impl Query { Ok(session.map(BrowserSession::from)) } + /// Get the current logged in user async fn current_user(&self, ctx: &Context<'_>) -> Result, async_graphql::Error> { let database = ctx.data::()?; let session_info = ctx.data::()?; @@ -76,48 +75,3 @@ impl Query { Ok(session.map(User::from)) } } - -#[derive(Default)] -pub struct Mutation { - _private: (), -} - -impl Mutation { - #[must_use] - pub fn new() -> Self { - Self::default() - } -} - -#[async_graphql::Object] -impl Mutation { - /// A dummy mutation so that the mutation object is not empty - async fn hello(&self) -> bool { - true - } -} - -#[derive(Default)] -pub struct Subscription { - _private: (), -} - -impl Subscription { - #[must_use] - pub fn new() -> Self { - Self::default() - } -} - -#[async_graphql::Subscription] -impl Subscription { - /// A dump subscription to try out the websocket - async fn integers(&self, #[graphql(default = 1)] step: i32) -> impl Stream { - let mut value = 0; - tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(1))) - .map(move |_| { - value += step; - value - }) - } -} diff --git a/crates/graphql/src/model.rs b/crates/graphql/src/model.rs index cf464693..ed622561 100644 --- a/crates/graphql/src/model.rs +++ b/crates/graphql/src/model.rs @@ -12,184 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use async_graphql::{ - connection::{query, Connection, Edge, OpaqueCursor}, - Context, Object, ID, +mod browser_sessions; +mod cursor; +mod users; + +pub use self::{ + browser_sessions::{Authentication, BrowserSession}, + cursor::{Cursor, NodeCursor, NodeType}, + users::{User, UserEmail}, }; -use chrono::{DateTime, Utc}; -use mas_storage::PostgresqlBackend; -use serde::{Deserialize, Serialize}; -use sqlx::PgPool; -use ulid::Ulid; - -#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone, Copy)] -#[serde(rename = "snake_case")] -enum NodeType { - User, - UserEmail, - BrowserSession, -} - -#[derive(Serialize, Deserialize, PartialEq, Eq)] -struct NodeCursor(NodeType, Ulid); - -impl NodeCursor { - fn extract_for_type(&self, node_type: NodeType) -> Result { - if self.0 == node_type { - Ok(self.1) - } else { - Err(async_graphql::Error::new("invalid cursor")) - } - } -} - -type Cursor = OpaqueCursor; - -pub struct BrowserSession(mas_data_model::BrowserSession); - -impl From> for BrowserSession { - fn from(v: mas_data_model::BrowserSession) -> Self { - Self(v) - } -} - -#[Object] -impl BrowserSession { - async fn id(&self) -> ID { - ID(self.0.data.to_string()) - } - - async fn user(&self) -> User { - User(self.0.user.clone()) - } - - async fn last_authentication(&self) -> Option { - self.0.last_authentication.clone().map(Authentication) - } - - async fn created_at(&self) -> DateTime { - self.0.created_at - } -} - -pub struct User(mas_data_model::User); - -impl From> for User { - fn from(v: mas_data_model::User) -> Self { - Self(v) - } -} - -impl From> for User { - fn from(v: mas_data_model::BrowserSession) -> Self { - Self(v.user) - } -} - -#[Object] -impl User { - async fn id(&self) -> ID { - ID(self.0.data.to_string()) - } - - async fn username(&self) -> &str { - &self.0.username - } - - async fn primary_email(&self) -> Option { - self.0.primary_email.clone().map(UserEmail) - } - - async fn emails( - &self, - ctx: &Context<'_>, - after: Option, - before: Option, - first: Option, - last: Option, - ) -> Result, async_graphql::Error> { - let database = ctx.data::()?; - - query( - after, - before, - first, - last, - |after, before, first, last| async move { - let mut conn = database.acquire().await?; - let after_id = after - .map(|x: OpaqueCursor| x.extract_for_type(NodeType::UserEmail)) - .transpose()?; - let before_id = before - .map(|x: OpaqueCursor| x.extract_for_type(NodeType::UserEmail)) - .transpose()?; - - let (has_previous_page, has_next_page, edges) = - mas_storage::user::get_paginated_user_emails( - &mut conn, &self.0, before_id, after_id, first, last, - ) - .await?; - - let mut connection = Connection::with_additional_fields( - has_previous_page, - has_next_page, - UserEmailsPagination(self.0.clone()), - ); - connection.edges.extend(edges.into_iter().map(|u| { - Edge::new( - OpaqueCursor(NodeCursor(NodeType::UserEmail, u.data)), - UserEmail(u), - ) - })); - - Ok::<_, async_graphql::Error>(connection) - }, - ) - .await - } -} - -pub struct Authentication(mas_data_model::Authentication); - -#[Object] -impl Authentication { - async fn id(&self) -> ID { - ID(self.0.data.to_string()) - } - - async fn created_at(&self) -> DateTime { - self.0.created_at - } -} - -pub struct UserEmail(mas_data_model::UserEmail); - -#[Object] -impl UserEmail { - async fn id(&self) -> ID { - ID(self.0.data.to_string()) - } - - async fn email(&self) -> &str { - &self.0.email - } - - async fn created_at(&self) -> DateTime { - self.0.created_at - } - - async fn confirmed_at(&self) -> Option> { - self.0.confirmed_at - } -} - -pub struct UserEmailsPagination(mas_data_model::User); - -#[Object] -impl UserEmailsPagination { - async fn total_count(&self, ctx: &Context<'_>) -> Result { - let mut conn = ctx.data::()?.acquire().await?; - let count = mas_storage::user::count_user_emails(&mut conn, &self.0).await?; - Ok(count) - } -} diff --git a/crates/graphql/src/model/browser_sessions.rs b/crates/graphql/src/model/browser_sessions.rs new file mode 100644 index 00000000..a0a3cda1 --- /dev/null +++ b/crates/graphql/src/model/browser_sessions.rs @@ -0,0 +1,59 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_graphql::{Object, ID}; +use chrono::{DateTime, Utc}; +use mas_storage::PostgresqlBackend; + +use super::User; + +pub struct BrowserSession(pub mas_data_model::BrowserSession); + +impl From> for BrowserSession { + fn from(v: mas_data_model::BrowserSession) -> Self { + Self(v) + } +} + +#[Object] +impl BrowserSession { + async fn id(&self) -> ID { + ID(self.0.data.to_string()) + } + + async fn user(&self) -> User { + User(self.0.user.clone()) + } + + async fn last_authentication(&self) -> Option { + self.0.last_authentication.clone().map(Authentication) + } + + async fn created_at(&self) -> DateTime { + self.0.created_at + } +} + +pub struct Authentication(pub mas_data_model::Authentication); + +#[Object] +impl Authentication { + async fn id(&self) -> ID { + ID(self.0.data.to_string()) + } + + async fn created_at(&self) -> DateTime { + self.0.created_at + } +} diff --git a/crates/graphql/src/model/cursor.rs b/crates/graphql/src/model/cursor.rs new file mode 100644 index 00000000..f72d36a7 --- /dev/null +++ b/crates/graphql/src/model/cursor.rs @@ -0,0 +1,39 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_graphql::connection::OpaqueCursor; +use serde::{Deserialize, Serialize}; +use ulid::Ulid; + +#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone, Copy)] +#[serde(rename = "snake_case")] +pub enum NodeType { + UserEmail, + BrowserSession, +} + +#[derive(Serialize, Deserialize, PartialEq, Eq)] +pub struct NodeCursor(pub NodeType, pub Ulid); + +impl NodeCursor { + pub fn extract_for_type(&self, node_type: NodeType) -> Result { + if self.0 == node_type { + Ok(self.1) + } else { + Err(async_graphql::Error::new("invalid cursor")) + } + } +} + +pub type Cursor = OpaqueCursor; diff --git a/crates/graphql/src/model/users.rs b/crates/graphql/src/model/users.rs new file mode 100644 index 00000000..9605721d --- /dev/null +++ b/crates/graphql/src/model/users.rs @@ -0,0 +1,176 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_graphql::{ + connection::{query, Connection, Edge, OpaqueCursor}, + Context, Object, ID, +}; +use chrono::{DateTime, Utc}; +use mas_storage::PostgresqlBackend; +use sqlx::PgPool; + +use super::{BrowserSession, Cursor, NodeCursor, NodeType}; + +pub struct User(pub mas_data_model::User); + +impl From> for User { + fn from(v: mas_data_model::User) -> Self { + Self(v) + } +} + +impl From> for User { + fn from(v: mas_data_model::BrowserSession) -> Self { + Self(v.user) + } +} + +#[Object] +impl User { + async fn id(&self) -> ID { + ID(self.0.data.to_string()) + } + + async fn username(&self) -> &str { + &self.0.username + } + + async fn primary_email(&self) -> Option { + self.0.primary_email.clone().map(UserEmail) + } + + async fn browser_sessions( + &self, + ctx: &Context<'_>, + after: Option, + before: Option, + first: Option, + last: Option, + ) -> Result, async_graphql::Error> { + let database = ctx.data::()?; + + query( + after, + before, + first, + last, + |after, before, first, last| async move { + let mut conn = database.acquire().await?; + let after_id = after + .map(|x: OpaqueCursor| x.extract_for_type(NodeType::UserEmail)) + .transpose()?; + let before_id = before + .map(|x: OpaqueCursor| x.extract_for_type(NodeType::UserEmail)) + .transpose()?; + + let (has_previous_page, has_next_page, edges) = + mas_storage::user::get_paginated_user_sessions( + &mut conn, &self.0, before_id, after_id, first, last, + ) + .await?; + + let mut connection = Connection::new(has_previous_page, has_next_page); + connection.edges.extend(edges.into_iter().map(|u| { + Edge::new( + OpaqueCursor(NodeCursor(NodeType::BrowserSession, u.data)), + BrowserSession(u), + ) + })); + + Ok::<_, async_graphql::Error>(connection) + }, + ) + .await + } + + async fn emails( + &self, + ctx: &Context<'_>, + after: Option, + before: Option, + first: Option, + last: Option, + ) -> Result, async_graphql::Error> { + let database = ctx.data::()?; + + query( + after, + before, + first, + last, + |after, before, first, last| async move { + let mut conn = database.acquire().await?; + let after_id = after + .map(|x: OpaqueCursor| x.extract_for_type(NodeType::UserEmail)) + .transpose()?; + let before_id = before + .map(|x: OpaqueCursor| x.extract_for_type(NodeType::UserEmail)) + .transpose()?; + + let (has_previous_page, has_next_page, edges) = + mas_storage::user::get_paginated_user_emails( + &mut conn, &self.0, before_id, after_id, first, last, + ) + .await?; + + let mut connection = Connection::with_additional_fields( + has_previous_page, + has_next_page, + UserEmailsPagination(self.0.clone()), + ); + connection.edges.extend(edges.into_iter().map(|u| { + Edge::new( + OpaqueCursor(NodeCursor(NodeType::UserEmail, u.data)), + UserEmail(u), + ) + })); + + Ok::<_, async_graphql::Error>(connection) + }, + ) + .await + } +} + +pub struct UserEmail(mas_data_model::UserEmail); + +#[Object] +impl UserEmail { + async fn id(&self) -> ID { + ID(self.0.data.to_string()) + } + + async fn email(&self) -> &str { + &self.0.email + } + + async fn created_at(&self) -> DateTime { + self.0.created_at + } + + async fn confirmed_at(&self) -> Option> { + self.0.confirmed_at + } +} + +pub struct UserEmailsPagination(mas_data_model::User); + +#[Object] +impl UserEmailsPagination { + async fn total_count(&self, ctx: &Context<'_>) -> Result { + let mut conn = ctx.data::()?.acquire().await?; + let count = mas_storage::user::count_user_emails(&mut conn, &self.0).await?; + Ok(count) + } +} diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 06a71bb7..86b0febe 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -77,6 +77,7 @@ impl StorageBackendMarker for PostgresqlBackend {} pub mod compat; pub mod oauth2; +pub(crate) mod pagination; pub mod user; /// Embedded migrations, allowing them to run on startup diff --git a/crates/storage/src/pagination.rs b/crates/storage/src/pagination.rs new file mode 100644 index 00000000..eb2a505e --- /dev/null +++ b/crates/storage/src/pagination.rs @@ -0,0 +1,100 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Write; + +use sqlx::Arguments; +use ulid::Ulid; +use uuid::Uuid; + +pub fn generate_pagination<'a, A, W>( + query: &mut W, + id_field: &'static str, + arguments: &mut A, + before: Option, + after: Option, + first: Option, + last: Option, +) -> Result<(), anyhow::Error> +where + W: Write, + A: Arguments<'a>, + Uuid: sqlx::Type + sqlx::Encode<'a, A::Database>, + i64: sqlx::Type + sqlx::Encode<'a, A::Database>, +{ + // ref: https://github.com/graphql/graphql-relay-js/issues/94#issuecomment-232410564 + // 1. Start from the greedy query: SELECT * FROM table + + // 2. If the after argument is provided, add `id > parsed_cursor` to the `WHERE` + // clause + if let Some(after) = after { + write!(query, " AND {id_field} > ")?; + arguments.add(Uuid::from(after)); + arguments.format_placeholder(query)?; + } + + // 3. If the before argument is provided, add `id < parsed_cursor` to the + // `WHERE` clause + if let Some(before) = before { + write!(query, " AND {id_field} < ")?; + arguments.add(Uuid::from(before)); + arguments.format_placeholder(query)?; + } + + // 4. If the first argument is provided, add `ORDER BY id ASC LIMIT first+1` to + // the query + if let Some(count) = first { + write!(query, " ORDER BY {id_field} ASC LIMIT ")?; + arguments.add((count + 1) as i64); + arguments.format_placeholder(query)?; + // 5. If the first argument is provided, add `ORDER BY id DESC LIMIT last+1` + // to the query + } else if let Some(count) = last { + write!(query, " ORDER BY ue.user_email_id DESC LIMIT ")?; + arguments.add((count + 1) as i64); + arguments.format_placeholder(query)?; + } else { + anyhow::bail!("Either 'first' or 'last' must be specified"); + } + + Ok(()) +} + +pub fn process_page( + mut page: Vec, + first: Option, + last: Option, +) -> Result<(bool, bool, Vec), anyhow::Error> { + let limit = match (first, last) { + (Some(count), _) | (_, Some(count)) => count, + _ => anyhow::bail!("Either 'first' or 'last' must be specified"), + }; + + let is_full = page.len() == (limit + 1); + if is_full { + page.pop(); + } + + let (has_previous_page, has_next_page) = if first.is_some() { + (false, is_full) + } else if last.is_some() { + // 6. If the last argument is provided, I reverse the order of the results + page.reverse(); + (is_full, false) + } else { + unreachable!() + }; + + Ok((has_previous_page, has_next_page, page)) +} diff --git a/crates/storage/src/user.rs b/crates/storage/src/user.rs index e65a3970..6106799b 100644 --- a/crates/storage/src/user.rs +++ b/crates/storage/src/user.rs @@ -1,4 +1,4 @@ -// Copyright 2021 The Matrix.org Foundation C.I.C. +// Copyright 2021, 2022 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -31,7 +31,10 @@ use ulid::Ulid; use uuid::Uuid; use super::{DatabaseInconsistencyError, PostgresqlBackend}; -use crate::Clock; +use crate::{ + pagination::{generate_pagination, process_page}, + Clock, +}; #[derive(Debug, Clone)] struct UserLookup { @@ -121,6 +124,7 @@ impl ActiveSessionLookupError { } } +#[derive(sqlx::FromRow)] struct SessionLookup { user_session_id: Uuid, user_id: Uuid, @@ -223,6 +227,72 @@ pub async fn lookup_active_session( Ok(res) } +#[tracing::instrument( + skip_all, + fields( + user.id = %user.data, + user.username = user.username, + ), + err(Display), +)] +pub async fn get_paginated_user_sessions( + executor: impl PgExecutor<'_>, + user: &User, + before: Option, + after: Option, + first: Option, + last: Option, +) -> Result<(bool, bool, Vec>), anyhow::Error> { + let mut query = String::from( + r#" + SELECT + s.user_session_id, + u.user_id, + u.username, + s.created_at, + a.user_session_authentication_id AS "last_authentication_id", + a.created_at AS "last_authd_at", + ue.user_email_id AS "user_email_id", + ue.email AS "user_email", + ue.created_at AS "user_email_created_at", + ue.confirmed_at AS "user_email_confirmed_at" + FROM user_sessions s + INNER JOIN users u + USING (user_id) + LEFT JOIN user_session_authentications a + USING (user_session_id) + LEFT JOIN user_emails ue + ON ue.user_email_id = u.primary_user_email_id + "#, + ); + + let mut arguments = PgArguments::default(); + + query += " WHERE s.finished_at IS NULL AND s.user_id = "; + arguments.add(Uuid::from(user.data)); + arguments.format_placeholder(&mut query)?; + + generate_pagination( + &mut query, + "s.user_session_id", + &mut arguments, + before, + after, + first, + last, + )?; + + let page: Vec = sqlx::query_as_with(&query, arguments) + .fetch_all(executor) + .instrument(info_span!("Fetch paginated user emails", query = query)) + .await?; + + let (has_previous_page, has_next_page, page) = process_page(page, first, last)?; + + let page: Result, _> = page.into_iter().map(TryInto::try_into).collect(); + Ok((has_previous_page, has_next_page, page?)) +} + #[tracing::instrument( skip_all, fields( @@ -681,8 +751,6 @@ pub async fn get_paginated_user_emails( first: Option, last: Option, ) -> Result<(bool, bool, Vec>), anyhow::Error> { - // ref: https://github.com/graphql/graphql-relay-js/issues/94#issuecomment-232410564 - // 1. Start from the greedy query: SELECT * FROM table let mut query = String::from( r#" SELECT @@ -700,64 +768,27 @@ pub async fn get_paginated_user_emails( arguments.add(Uuid::from(user.data)); arguments.format_placeholder(&mut query)?; - // 2. If the after argument is provided, add `id > parsed_cursor` to the `WHERE` - // clause - if let Some(after) = after { - query += " AND ue.user_email_id > "; - arguments.add(Uuid::from(after)); - arguments.format_placeholder(&mut query)?; - } + generate_pagination( + &mut query, + "ue.user_email_id", + &mut arguments, + before, + after, + first, + last, + )?; - // 3. If the before argument is provided, add `id < parsed_cursor` to the - // `WHERE` clause - if let Some(before) = before { - query += " AND ue.user_email_id < "; - arguments.add(Uuid::from(before)); - arguments.format_placeholder(&mut query)?; - } - - // 4. If the first argument is provided, add `ORDER BY id ASC LIMIT first+1` to - // the query - let limit = if let Some(count) = first { - query += " ORDER BY ue.user_email_id ASC LIMIT "; - arguments.add((count + 1) as i64); - arguments.format_placeholder(&mut query)?; - count - // 5. If the first argument is provided, add `ORDER BY id DESC LIMIT last+1` - // to the query - } else if let Some(count) = last { - query += " ORDER BY ue.user_email_id DESC LIMIT "; - arguments.add((count + 1) as i64); - arguments.format_placeholder(&mut query)?; - count - } else { - bail!("Either 'first' or 'last' must be specified"); - }; - - let mut res: Vec = sqlx::query_as_with(&query, arguments) + let page: Vec = sqlx::query_as_with(&query, arguments) .fetch_all(executor) - .instrument(info_span!("Fetch paginated user emails", query = query)) + .instrument(info_span!("Fetch paginated user sessions", query = query)) .await?; - let is_full = res.len() == (limit + 1); - if is_full { - res.pop(); - } - - let (has_previous_page, has_next_page) = if first.is_some() { - (false, is_full) - } else if last.is_some() { - // 5. If the last argument is provided, I reverse the order of the results - res.reverse(); - (is_full, false) - } else { - unreachable!() - }; + let (has_previous_page, has_next_page, page) = process_page(page, first, last)?; Ok(( has_previous_page, has_next_page, - res.into_iter().map(Into::into).collect(), + page.into_iter().map(Into::into).collect(), )) }