authentication-service (mirror of https://github.com/matrix-org/matrix-authentication-service.git)
crates/storage-pg/.sqlx/query-359a00f6667b5b1fef616b0c18e11eb91698aa1f2d5d146cffbb7aea8d77467b.json (generated, new file, 16 lines)
@@ -0,0 +1,16 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n INSERT INTO apalis.jobs (job, id, job_type)\n VALUES ($1::json, $2::text, $3::text)\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "Json",
+        "Text",
+        "Text"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "359a00f6667b5b1fef616b0c18e11eb91698aa1f2d5d146cffbb7aea8d77467b"
+}
@@ -1,23 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\n SELECT id as \"id!\"\n FROM apalis.push_job($1::text, $2::json, 'Pending', now(), 25)\n ",
-  "describe": {
-    "columns": [
-      {
-        "ordinal": 0,
-        "name": "id!",
-        "type_info": "Text"
-      }
-    ],
-    "parameters": {
-      "Left": [
-        "Text",
-        "Json"
-      ]
-    },
-    "nullable": [
-      null
-    ]
-  },
-  "hash": "b753790eecbbb4bcd87b9e9a1d1b0dd6c3b50e82ffbfee356e2cf755d72f00be"
-}
@@ -1,46 +0,0 @@
-CREATE OR REPLACE FUNCTION apalis.push_job(
-    job_type text,
-    job json DEFAULT NULL :: json,
-    job_id text DEFAULT NULL :: text,
-    status text DEFAULT 'Pending' :: text,
-    run_at timestamptz DEFAULT NOW() :: timestamptz,
-    max_attempts integer DEFAULT 25 :: integer
-) RETURNS apalis.jobs AS $$
-
-DECLARE
-    v_job_row apalis.jobs;
-    v_job_id text;
-
-BEGIN
-    IF job_type is not NULL and length(job_type) > 512 THEN raise exception 'Job_type is too long (max length: 512).' USING errcode = 'APAJT';
-    END IF;
-
-    IF max_attempts < 1 THEN raise exception 'Job maximum attempts must be at least 1.' USING errcode = 'APAMA';
-    end IF;
-
-    SELECT
-        uuid_in(
-            md5(random() :: text || now() :: text) :: cstring
-        ) INTO v_job_id;
-    INSERT INTO
-        apalis.jobs
-    VALUES
-        (
-            job,
-            v_job_id,
-            job_type,
-            status,
-            0,
-            max_attempts,
-            run_at,
-            NULL,
-            NULL,
-            NULL,
-            NULL
-        )
-    returning * INTO v_job_row;
-    RETURN v_job_row;
-END;
-$$ LANGUAGE plpgsql volatile;
-
-
@@ -1,106 +0,0 @@
-CREATE EXTENSION IF NOT EXISTS pgcrypto;
-
-CREATE OR REPLACE FUNCTION generate_ulid()
-RETURNS TEXT
-AS $$
-DECLARE
-  -- Crockford's Base32
-  encoding   BYTEA = '0123456789ABCDEFGHJKMNPQRSTVWXYZ';
-  timestamp  BYTEA = E'\\000\\000\\000\\000\\000\\000';
-  output     TEXT = '';
-
-  unix_time  BIGINT;
-  ulid       BYTEA;
-BEGIN
-  -- 6 timestamp bytes
-  unix_time = (EXTRACT(EPOCH FROM CLOCK_TIMESTAMP()) * 1000)::BIGINT;
-  timestamp = SET_BYTE(timestamp, 0, (unix_time >> 40)::BIT(8)::INTEGER);
-  timestamp = SET_BYTE(timestamp, 1, (unix_time >> 32)::BIT(8)::INTEGER);
-  timestamp = SET_BYTE(timestamp, 2, (unix_time >> 24)::BIT(8)::INTEGER);
-  timestamp = SET_BYTE(timestamp, 3, (unix_time >> 16)::BIT(8)::INTEGER);
-  timestamp = SET_BYTE(timestamp, 4, (unix_time >> 8)::BIT(8)::INTEGER);
-  timestamp = SET_BYTE(timestamp, 5, unix_time::BIT(8)::INTEGER);
-
-  -- 10 entropy bytes
-  ulid = timestamp || gen_random_bytes(10);
-
-  -- Encode the timestamp
-  output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 0) & 224) >> 5));
-  output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 0) & 31)));
-  output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 1) & 248) >> 3));
-  output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 1) & 7) << 2) | ((GET_BYTE(ulid, 2) & 192) >> 6)));
-  output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 2) & 62) >> 1));
-  output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 2) & 1) << 4) | ((GET_BYTE(ulid, 3) & 240) >> 4)));
-  output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 3) & 15) << 1) | ((GET_BYTE(ulid, 4) & 128) >> 7)));
-  output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 4) & 124) >> 2));
-  output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 4) & 3) << 3) | ((GET_BYTE(ulid, 5) & 224) >> 5)));
-  output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 5) & 31)));
-
-  -- Encode the entropy
-  output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 6) & 248) >> 3));
-  output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 6) & 7) << 2) | ((GET_BYTE(ulid, 7) & 192) >> 6)));
-  output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 7) & 62) >> 1));
-  output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 7) & 1) << 4) | ((GET_BYTE(ulid, 8) & 240) >> 4)));
-  output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 8) & 15) << 1) | ((GET_BYTE(ulid, 9) & 128) >> 7)));
-  output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 9) & 124) >> 2));
-  output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 9) & 3) << 3) | ((GET_BYTE(ulid, 10) & 224) >> 5)));
-  output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 10) & 31)));
-  output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 11) & 248) >> 3));
-  output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 11) & 7) << 2) | ((GET_BYTE(ulid, 12) & 192) >> 6)));
-  output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 12) & 62) >> 1));
-  output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 12) & 1) << 4) | ((GET_BYTE(ulid, 13) & 240) >> 4)));
-  output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 13) & 15) << 1) | ((GET_BYTE(ulid, 14) & 128) >> 7)));
-  output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 14) & 124) >> 2));
-  output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 14) & 3) << 3) | ((GET_BYTE(ulid, 15) & 224) >> 5)));
-  output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 15) & 31)));
-
-  RETURN output;
-END
-$$
-LANGUAGE plpgsql
-VOLATILE;
-
-
-CREATE OR REPLACE FUNCTION apalis.push_job(
-    job_type text,
-    job json DEFAULT NULL :: json,
-    status text DEFAULT 'Pending' :: text,
-    run_at timestamptz DEFAULT NOW() :: timestamptz,
-    max_attempts integer DEFAULT 25 :: integer
-) RETURNS apalis.jobs AS $$
-
-DECLARE
-    v_job_row apalis.jobs;
-    v_job_id text;
-
-BEGIN
-    IF job_type is not NULL and length(job_type) > 512 THEN raise exception 'Job_type is too long (max length: 512).' USING errcode = 'APAJT';
-    END IF;
-
-    IF max_attempts < 1 THEN raise exception 'Job maximum attempts must be at least 1.' USING errcode = 'APAMA';
-    end IF;
-
-    SELECT
-        CONCAT('JID-' || generate_ulid()) INTO v_job_id;
-    INSERT INTO
-        apalis.jobs
-    VALUES
-        (
-            job,
-            v_job_id,
-            job_type,
-            status,
-            0,
-            max_attempts,
-            run_at,
-            NULL,
-            NULL,
-            NULL,
-            NULL
-        )
-    returning * INTO v_job_row;
-    RETURN v_job_row;
-END;
-$$ LANGUAGE plpgsql volatile;
-
-
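The migration deleted above generated job IDs inside PostgreSQL: a ULID is a 48-bit millisecond timestamp followed by 80 bits of entropy, rendered as 26 Crockford Base32 characters, and the old push_job prefixed it with 'JID-'. With this change the ID is produced application-side instead. A minimal illustrative sketch of the same idea in Rust, assuming the `ulid` crate (which is not shown in this diff and is only an assumption here):

// Illustrative only: an application-side equivalent of the dropped
// generate_ulid()/push_job SQL, using the `ulid` crate (assumed dependency).
use ulid::Ulid;

fn new_job_id() -> String {
    // Ulid::new() packs the current time (48 bits, milliseconds) plus 80 bits
    // of randomness and renders 26 Crockford Base32 characters.
    format!("JID-{}", Ulid::new())
}

fn main() {
    // Prints "JID-" followed by 26 Crockford Base32 characters; the value
    // differs on every call.
    println!("{}", new_job_id());
}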
@@ -1,8 +0,0 @@
-DROP FUNCTION IF EXISTS apalis.push_job(
-    job_type text,
-    job json,
-    job_id text,
-    status text,
-    run_at timestamptz,
-    max_attempts integer
-);
@@ -0,0 +1,53 @@
+-- Copyright 2023 The Matrix.org Foundation C.I.C.
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+/**
+ * This fully drops any existing push_job functions, as we're not relying on them anymore
+ */
+
+-- Temporarily change the client_min_messages to suppress the NOTICEs
+SET client_min_messages = 'ERROR';
+
+DROP FUNCTION IF EXISTS apalis.push_job(
+    job_type text,
+    job json,
+    job_id text,
+    status text,
+    run_at timestamptz,
+    max_attempts integer
+);
+
+DROP FUNCTION IF EXISTS apalis.push_job(
+    job_type text,
+    job json,
+    status text,
+    run_at timestamptz,
+    max_attempts integer
+);
+
+-- Reset the client_min_messages
+RESET client_min_messages;
+
+/**
+ * Remove the old applied migrations in case they were applied:
+ *  - 20220709210445_add_job_fn.sql
+ *  - 20230330210841_replace_add_job_fn.sql
+ *  - 20230408110421_drop_old_push_job.sql
+ */
+DELETE FROM public._sqlx_migrations
+WHERE version IN (
+    20220709210445,
+    20230330210841,
+    20230408110421
+);
@@ -18,7 +18,7 @@ use async_trait::async_trait;
 use mas_storage::job::{JobId, JobRepository, JobSubmission};
 use sqlx::PgConnection;
 
-use crate::{errors::DatabaseInconsistencyError, DatabaseError, ExecuteExt};
+use crate::{DatabaseError, ExecuteExt};
 
 /// An implementation of [`JobRepository`] for a PostgreSQL connection.
 pub struct PgJobRepository<'c> {
@@ -43,7 +43,7 @@ impl<'c> JobRepository for PgJobRepository<'c> {
         fields(
             db.statement,
             job.id,
-            job.name,
+            job.name = submission.name(),
         ),
         err,
     )]
@@ -51,25 +51,24 @@
         &mut self,
         submission: JobSubmission,
     ) -> Result<JobId, Self::Error> {
-        // XXX: The apalis.push_job function is not unique, so we have to specify all
-        // the arguments
-        let res = sqlx::query_scalar!(
+        // XXX: This does not use the clock nor the rng
+        let id = JobId::new();
+        tracing::Span::current().record("job.id", tracing::field::display(&id));
+
+        let res = sqlx::query!(
             r#"
-                SELECT id as "id!"
-                FROM apalis.push_job($1::text, $2::json, 'Pending', now(), 25)
+                INSERT INTO apalis.jobs (job, id, job_type)
+                VALUES ($1::json, $2::text, $3::text)
             "#,
-            submission.name(),
             submission.payload(),
+            id.to_string(),
+            submission.name(),
         )
         .traced()
-        .fetch_one(&mut *self.conn)
+        .execute(&mut *self.conn)
         .await?;
 
-        let id = res
-            .parse()
-            .map_err(|source| DatabaseInconsistencyError::on("apalis.push_job").source(source))?;
-
-        tracing::Span::current().record("job.id", tracing::field::display(&id));
+        DatabaseError::ensure_affected_rows(&res, 1)?;
 
         Ok(id)
     }
 
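Read together, the hunk above replaces the push_job round-trip with a plain INSERT and generates the job ID in Rust before the query runs. A rough, self-contained sketch of the resulting flow, with the repository plumbing (tracing fields, DatabaseError wrapping, the JobRepository trait) stripped away and any names outside the hunk assumed for illustration:

// Sketch only: the real code is the PgJobRepository method shown in the hunk
// above; error handling and tracing are simplified here.
use mas_storage::job::{JobId, JobSubmission};
use sqlx::PgConnection;

async fn schedule(
    conn: &mut PgConnection,
    submission: JobSubmission,
) -> Result<JobId, sqlx::Error> {
    // The ID is now generated application-side rather than by apalis.push_job.
    let id = JobId::new();

    let res = sqlx::query!(
        r#"
            INSERT INTO apalis.jobs (job, id, job_type)
            VALUES ($1::json, $2::text, $3::text)
        "#,
        submission.payload(),
        id.to_string(),
        submission.name(),
    )
    .execute(&mut *conn)
    .await?;

    // Exactly one row should have been inserted.
    debug_assert_eq!(res.rows_affected(), 1);

    Ok(id)
}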
@@ -193,4 +193,13 @@ pub(crate) use self::errors::DatabaseInconsistencyError;
 pub use self::{errors::DatabaseError, repository::PgRepository, tracing::ExecuteExt};
 
 /// Embedded migrations, allowing them to run on startup
-pub static MIGRATOR: Migrator = sqlx::migrate!();
+pub static MIGRATOR: Migrator = {
+    // XXX: The macro does not let us ignore missing migrations, so we have to do it
+    // like this. See https://github.com/launchbadge/sqlx/issues/1788
+    let mut m = sqlx::migrate!();
+
+    // We manually removed some migrations because they made us depend on the
+    // `pgcrypto` extension. See: https://github.com/matrix-org/matrix-authentication-service/issues/1557
+    m.ignore_missing = true;
+    m
+};
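The `ignore_missing` flag matters because the migration files removed above may already be recorded in `public._sqlx_migrations` on deployed databases, and sqlx otherwise refuses to run when an applied migration is missing from the embedded set. A minimal sketch of driving this embedded migrator at startup (the pool setup and the `mas_storage_pg::` path are illustrative assumptions, not part of this diff):

// Illustrative startup wiring, not part of this change: connect to Postgres
// and apply the embedded migrations defined by the MIGRATOR static above.
use sqlx::postgres::PgPoolOptions;

async fn run_migrations(database_url: &str) -> Result<(), Box<dyn std::error::Error>> {
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect(database_url)
        .await?;

    // With `ignore_missing = true`, rows left in `_sqlx_migrations` for the
    // deleted migration files no longer abort the run.
    mas_storage_pg::MIGRATOR.run(&pool).await?;

    Ok(())
}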