1
0
mirror of https://github.com/matrix-org/matrix-authentication-service.git synced 2025-07-31 09:24:31 +03:00

Upgrade to apalis-0.4.0-alpha.5

This commit is contained in:
Quentin Gliech
2023-04-13 16:41:38 +02:00
parent 1974786209
commit 5b4fee15e7
12 changed files with 68 additions and 40 deletions

16
Cargo.lock generated
View File

@ -123,8 +123,9 @@ checksum = "7de8ce5e0f9f8d88245311066a578d72b7af3e7088f32783804676302df237e4"
[[package]]
name = "apalis-core"
version = "0.4.0-alpha.4"
source = "git+https://github.com/geofmureithi/apalis.git?rev=1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8#1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8"
version = "0.4.0-alpha.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23f04b6479bb7cf688641d70a948384dbac83a3af98e217f389e835713884ccd"
dependencies = [
"async-stream",
"async-trait",
@ -135,7 +136,6 @@ dependencies = [
"log",
"pin-project-lite",
"serde",
"serde_json",
"strum",
"thiserror",
"tokio",
@ -146,8 +146,9 @@ dependencies = [
[[package]]
name = "apalis-cron"
version = "0.4.0-alpha.4"
source = "git+https://github.com/geofmureithi/apalis.git?rev=1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8#1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8"
version = "0.4.0-alpha.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbe7a486a93e7965f19cf6b3738161aeb3079ca00f80b81557579fd720ce86ae"
dependencies = [
"apalis-core",
"async-stream",
@ -160,8 +161,9 @@ dependencies = [
[[package]]
name = "apalis-sql"
version = "0.4.0-alpha.4"
source = "git+https://github.com/geofmureithi/apalis.git?rev=1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8#1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8"
version = "0.4.0-alpha.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f5876546628bfece76812f305a242b76a78ebe5868317bd15ca2446072094ba"
dependencies = [
"apalis-core",
"async-stream",

View File

@ -12,15 +12,3 @@ opt-level = 3
[patch.crates-io.ulid]
git = "https://github.com/dylanhart/ulid-rs.git"
rev = "0b9295c2db2114cd87aa19abcc1fc00c16b272db"
[patch.crates-io.apalis-core]
git = "https://github.com/geofmureithi/apalis.git"
rev = "1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8"
[patch.crates-io.apalis-sql]
git = "https://github.com/geofmureithi/apalis.git"
rev = "1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8"
[patch.crates-io.apalis-cron]
git = "https://github.com/geofmureithi/apalis.git"
rev = "1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8"

View File

@ -6,7 +6,7 @@ edition = "2021"
license = "Apache-2.0"
[dependencies]
apalis-core = "0.4.0-alpha.4"
apalis-core = "=0.4.0-alpha.5"
anyhow = "1.0.69"
atty = "0.2.14"
axum = "0.6.11"

View File

@ -0,0 +1,8 @@
DROP FUNCTION IF EXISTS apalis.push_job(
job_type text,
job json,
job_id text,
status text,
run_at timestamptz,
max_attempts integer
);

View File

@ -0,0 +1,27 @@
DROP FUNCTION apalis.get_job(
worker_id TEXT,
v_job_type TEXT
);
CREATE OR replace FUNCTION apalis.get_jobs(
worker_id TEXT,
v_job_type TEXT,
v_job_count integer DEFAULT 5 :: integer
) returns setof apalis.jobs AS $$ BEGIN RETURN QUERY
UPDATE apalis.jobs
SET status = 'Running',
lock_by = worker_id,
lock_at = now()
WHERE id IN (
SELECT id
FROM apalis.jobs
WHERE status = 'Pending'
AND run_at < now()
AND job_type = v_job_type
ORDER BY run_at ASC
limit v_job_count FOR
UPDATE skip LOCKED
)
returning *;
END;
$$ LANGUAGE plpgsql volatile;

View File

@ -11,7 +11,7 @@ chrono = "0.4.24"
thiserror = "1.0.39"
futures-util = "0.3.27"
apalis-core = { version = "0.4.0-alpha.4", features = ["tokio-comp"] }
apalis-core = { version = "=0.4.0-alpha.5", features = ["tokio-comp"] }
opentelemetry = "0.18.0"
rand_core = "0.6.4"
serde = "1.0.159"

View File

@ -185,7 +185,10 @@ pub trait JobRepositoryExt {
/// # Errors
///
/// Returns [`Self::Error`] if the underlying repository fails
async fn schedule_job<J: Job + Serialize>(&mut self, job: J) -> Result<JobId, Self::Error>;
async fn schedule_job<J: Job + Serialize + Send>(
&mut self,
job: J,
) -> Result<JobId, Self::Error>;
}
#[async_trait]
@ -202,7 +205,10 @@ where
job.name = J::NAME,
),
)]
async fn schedule_job<J: Job + Serialize>(&mut self, job: J) -> Result<JobId, Self::Error> {
async fn schedule_job<J: Job + Serialize + Send>(
&mut self,
job: J,
) -> Result<JobId, Self::Error> {
let span = tracing::Span::current();
let ctx = span.context();
let span = ctx.span();

View File

@ -7,9 +7,9 @@ license = "apache-2.0"
[dependencies]
anyhow = "1.0.70"
apalis-core = { version = "0.4.0-alpha.4", features = ["extensions", "tokio-comp"] }
apalis-cron = "0.4.0-alpha.4"
apalis-sql = { version = "0.4.0-alpha.4", features = ["postgres", "tokio-comp"] }
apalis-core = { version = "=0.4.0-alpha.5", features = ["extensions", "tokio-comp"] }
apalis-cron = "=0.4.0-alpha.5"
apalis-sql = { version = "=0.4.0-alpha.5", features = ["postgres", "tokio-comp"] }
async-trait = "0.1.66"
chrono = "0.4.24"
rand = "0.8.5"

View File

@ -17,11 +17,10 @@
use std::str::FromStr;
use apalis_core::{
builder::{WorkerBuilder, WorkerFactory},
builder::{WorkerBuilder, WorkerFactory, WorkerFactoryFn},
context::JobContext,
executor::TokioExecutor,
job::Job,
job_fn::job_fn,
monitor::Monitor,
};
use apalis_cron::CronStream;
@ -78,7 +77,7 @@ pub(crate) fn register(
let worker = WorkerBuilder::new(worker_name)
.stream(CronStream::new(schedule).to_stream())
.layer(state.inject())
.build(job_fn(cleanup_expired_tokens));
.build_fn(cleanup_expired_tokens);
monitor.register(worker)
}

View File

@ -14,11 +14,10 @@
use anyhow::Context;
use apalis_core::{
builder::{WorkerBuilder, WorkerFactory},
builder::{WorkerBuilder, WorkerFactory, WorkerFactoryFn},
context::JobContext,
executor::TokioExecutor,
job::Job,
job_fn::job_fn,
monitor::Monitor,
storage::builder::WithStorage,
};
@ -101,6 +100,6 @@ pub(crate) fn register(
.layer(state.inject())
.layer(TracingLayer::new())
.with_storage(storage)
.build(job_fn(verify_email));
.build_fn(verify_email);
monitor.register(worker)
}

View File

@ -142,7 +142,7 @@ pub fn init(
homeserver,
http_client_factory.clone(),
);
let monitor = Monitor::new();
let monitor = Monitor::new().executor(TokioExecutor::new());
let monitor = self::database::register(name, monitor, &state);
let monitor = self::email::register(name, monitor, &state);
let monitor = self::matrix::register(name, monitor, &state);

View File

@ -14,11 +14,10 @@
use anyhow::Context;
use apalis_core::{
builder::{WorkerBuilder, WorkerFactory},
builder::{WorkerBuilder, WorkerFactory, WorkerFactoryFn},
context::JobContext,
executor::TokioExecutor,
job::Job,
job_fn::job_fn,
monitor::Monitor,
storage::builder::WithStorage,
};
@ -101,7 +100,7 @@ async fn provision_user(
let state = ctx.state();
let matrix = state.matrix_connection();
let mut client = state
.http_client("matrx.provision_user")
.http_client("matrix.provision_user")
.await?
.request_bytes_to_body()
.json_request();
@ -305,7 +304,7 @@ pub(crate) fn register(
.layer(state.inject())
.layer(TracingLayer::new())
.with_storage(storage)
.build(job_fn(provision_user));
.build_fn(provision_user);
let storage = state.store();
let worker_name = format!("{job}-{suffix}", job = ProvisionDeviceJob::NAME);
@ -313,7 +312,7 @@ pub(crate) fn register(
.layer(state.inject())
.layer(TracingLayer::new())
.with_storage(storage)
.build(job_fn(provision_device));
.build_fn(provision_device);
let storage = state.store();
let worker_name = format!("{job}-{suffix}", job = DeleteDeviceJob::NAME);
@ -321,7 +320,7 @@ pub(crate) fn register(
.layer(state.inject())
.layer(TracingLayer::new())
.with_storage(storage)
.build(job_fn(delete_device));
.build_fn(delete_device);
monitor
.register(provision_user_worker)