From 5b4fee15e7f0dbcfc6a19950dee7817b047e5ef1 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Thu, 13 Apr 2023 16:41:38 +0200 Subject: [PATCH] Upgrade to apalis-0.4.0-alpha.5 --- Cargo.lock | 16 ++++++----- Cargo.toml | 14 +--------- crates/cli/Cargo.toml | 2 +- .../20230408110421_drop_old_push_job.sql | 8 ++++++ .../20230408234928_add_get_jobs_fn_.sql | 27 +++++++++++++++++++ crates/storage/Cargo.toml | 2 +- crates/storage/src/job.rs | 10 +++++-- crates/tasks/Cargo.toml | 6 ++--- crates/tasks/src/database.rs | 5 ++-- crates/tasks/src/email.rs | 5 ++-- crates/tasks/src/lib.rs | 2 +- crates/tasks/src/matrix.rs | 11 ++++---- 12 files changed, 68 insertions(+), 40 deletions(-) create mode 100644 crates/storage-pg/migrations/20230408110421_drop_old_push_job.sql create mode 100644 crates/storage-pg/migrations/20230408234928_add_get_jobs_fn_.sql diff --git a/Cargo.lock b/Cargo.lock index 9f6c5e96..b2d54a28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -123,8 +123,9 @@ checksum = "7de8ce5e0f9f8d88245311066a578d72b7af3e7088f32783804676302df237e4" [[package]] name = "apalis-core" -version = "0.4.0-alpha.4" -source = "git+https://github.com/geofmureithi/apalis.git?rev=1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8#1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8" +version = "0.4.0-alpha.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23f04b6479bb7cf688641d70a948384dbac83a3af98e217f389e835713884ccd" dependencies = [ "async-stream", "async-trait", @@ -135,7 +136,6 @@ dependencies = [ "log", "pin-project-lite", "serde", - "serde_json", "strum", "thiserror", "tokio", @@ -146,8 +146,9 @@ dependencies = [ [[package]] name = "apalis-cron" -version = "0.4.0-alpha.4" -source = "git+https://github.com/geofmureithi/apalis.git?rev=1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8#1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8" +version = "0.4.0-alpha.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbe7a486a93e7965f19cf6b3738161aeb3079ca00f80b81557579fd720ce86ae" dependencies = [ "apalis-core", "async-stream", @@ -160,8 +161,9 @@ dependencies = [ [[package]] name = "apalis-sql" -version = "0.4.0-alpha.4" -source = "git+https://github.com/geofmureithi/apalis.git?rev=1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8#1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8" +version = "0.4.0-alpha.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f5876546628bfece76812f305a242b76a78ebe5868317bd15ca2446072094ba" dependencies = [ "apalis-core", "async-stream", diff --git a/Cargo.toml b/Cargo.toml index 8cd506d9..0c6f587b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,16 +11,4 @@ opt-level = 3 # Until https://github.com/dylanhart/ulid-rs/pull/56 gets released [patch.crates-io.ulid] git = "https://github.com/dylanhart/ulid-rs.git" -rev = "0b9295c2db2114cd87aa19abcc1fc00c16b272db" - -[patch.crates-io.apalis-core] -git = "https://github.com/geofmureithi/apalis.git" -rev = "1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8" - -[patch.crates-io.apalis-sql] -git = "https://github.com/geofmureithi/apalis.git" -rev = "1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8" - -[patch.crates-io.apalis-cron] -git = "https://github.com/geofmureithi/apalis.git" -rev = "1cc600571c0dc4f5bd6cbb90fa9eef7f34888ba8" +rev = "0b9295c2db2114cd87aa19abcc1fc00c16b272db" \ No newline at end of file diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 3b838be5..1d3110cc 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" license = "Apache-2.0" [dependencies] -apalis-core = "0.4.0-alpha.4" +apalis-core = "=0.4.0-alpha.5" anyhow = "1.0.69" atty = "0.2.14" axum = "0.6.11" diff --git a/crates/storage-pg/migrations/20230408110421_drop_old_push_job.sql b/crates/storage-pg/migrations/20230408110421_drop_old_push_job.sql new file mode 100644 index 00000000..7d799b9b --- /dev/null +++ b/crates/storage-pg/migrations/20230408110421_drop_old_push_job.sql @@ -0,0 +1,8 @@ +DROP FUNCTION IF EXISTS apalis.push_job( + job_type text, + job json, + job_id text, + status text, + run_at timestamptz, + max_attempts integer +); \ No newline at end of file diff --git a/crates/storage-pg/migrations/20230408234928_add_get_jobs_fn_.sql b/crates/storage-pg/migrations/20230408234928_add_get_jobs_fn_.sql new file mode 100644 index 00000000..660eca47 --- /dev/null +++ b/crates/storage-pg/migrations/20230408234928_add_get_jobs_fn_.sql @@ -0,0 +1,27 @@ +DROP FUNCTION apalis.get_job( + worker_id TEXT, + v_job_type TEXT + ); + +CREATE OR replace FUNCTION apalis.get_jobs( + worker_id TEXT, + v_job_type TEXT, + v_job_count integer DEFAULT 5 :: integer + ) returns setof apalis.jobs AS $$ BEGIN RETURN QUERY +UPDATE apalis.jobs +SET status = 'Running', + lock_by = worker_id, + lock_at = now() +WHERE id IN ( + SELECT id + FROM apalis.jobs + WHERE status = 'Pending' + AND run_at < now() + AND job_type = v_job_type + ORDER BY run_at ASC + limit v_job_count FOR + UPDATE skip LOCKED + ) +returning *; +END; +$$ LANGUAGE plpgsql volatile; \ No newline at end of file diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index 195c0325..aeee0cef 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -11,7 +11,7 @@ chrono = "0.4.24" thiserror = "1.0.39" futures-util = "0.3.27" -apalis-core = { version = "0.4.0-alpha.4", features = ["tokio-comp"] } +apalis-core = { version = "=0.4.0-alpha.5", features = ["tokio-comp"] } opentelemetry = "0.18.0" rand_core = "0.6.4" serde = "1.0.159" diff --git a/crates/storage/src/job.rs b/crates/storage/src/job.rs index 3fdcc4ea..8a5d3564 100644 --- a/crates/storage/src/job.rs +++ b/crates/storage/src/job.rs @@ -185,7 +185,10 @@ pub trait JobRepositoryExt { /// # Errors /// /// Returns [`Self::Error`] if the underlying repository fails - async fn schedule_job(&mut self, job: J) -> Result; + async fn schedule_job( + &mut self, + job: J, + ) -> Result; } #[async_trait] @@ -202,7 +205,10 @@ where job.name = J::NAME, ), )] - async fn schedule_job(&mut self, job: J) -> Result { + async fn schedule_job( + &mut self, + job: J, + ) -> Result { let span = tracing::Span::current(); let ctx = span.context(); let span = ctx.span(); diff --git a/crates/tasks/Cargo.toml b/crates/tasks/Cargo.toml index 33cdbe7e..6a064b20 100644 --- a/crates/tasks/Cargo.toml +++ b/crates/tasks/Cargo.toml @@ -7,9 +7,9 @@ license = "apache-2.0" [dependencies] anyhow = "1.0.70" -apalis-core = { version = "0.4.0-alpha.4", features = ["extensions", "tokio-comp"] } -apalis-cron = "0.4.0-alpha.4" -apalis-sql = { version = "0.4.0-alpha.4", features = ["postgres", "tokio-comp"] } +apalis-core = { version = "=0.4.0-alpha.5", features = ["extensions", "tokio-comp"] } +apalis-cron = "=0.4.0-alpha.5" +apalis-sql = { version = "=0.4.0-alpha.5", features = ["postgres", "tokio-comp"] } async-trait = "0.1.66" chrono = "0.4.24" rand = "0.8.5" diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs index 7f5e0eb3..1b072616 100644 --- a/crates/tasks/src/database.rs +++ b/crates/tasks/src/database.rs @@ -17,11 +17,10 @@ use std::str::FromStr; use apalis_core::{ - builder::{WorkerBuilder, WorkerFactory}, + builder::{WorkerBuilder, WorkerFactory, WorkerFactoryFn}, context::JobContext, executor::TokioExecutor, job::Job, - job_fn::job_fn, monitor::Monitor, }; use apalis_cron::CronStream; @@ -78,7 +77,7 @@ pub(crate) fn register( let worker = WorkerBuilder::new(worker_name) .stream(CronStream::new(schedule).to_stream()) .layer(state.inject()) - .build(job_fn(cleanup_expired_tokens)); + .build_fn(cleanup_expired_tokens); monitor.register(worker) } diff --git a/crates/tasks/src/email.rs b/crates/tasks/src/email.rs index df9cfbc7..89caca44 100644 --- a/crates/tasks/src/email.rs +++ b/crates/tasks/src/email.rs @@ -14,11 +14,10 @@ use anyhow::Context; use apalis_core::{ - builder::{WorkerBuilder, WorkerFactory}, + builder::{WorkerBuilder, WorkerFactory, WorkerFactoryFn}, context::JobContext, executor::TokioExecutor, job::Job, - job_fn::job_fn, monitor::Monitor, storage::builder::WithStorage, }; @@ -101,6 +100,6 @@ pub(crate) fn register( .layer(state.inject()) .layer(TracingLayer::new()) .with_storage(storage) - .build(job_fn(verify_email)); + .build_fn(verify_email); monitor.register(worker) } diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 7474c745..a115752a 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -142,7 +142,7 @@ pub fn init( homeserver, http_client_factory.clone(), ); - let monitor = Monitor::new(); + let monitor = Monitor::new().executor(TokioExecutor::new()); let monitor = self::database::register(name, monitor, &state); let monitor = self::email::register(name, monitor, &state); let monitor = self::matrix::register(name, monitor, &state); diff --git a/crates/tasks/src/matrix.rs b/crates/tasks/src/matrix.rs index 5590b02c..4746025c 100644 --- a/crates/tasks/src/matrix.rs +++ b/crates/tasks/src/matrix.rs @@ -14,11 +14,10 @@ use anyhow::Context; use apalis_core::{ - builder::{WorkerBuilder, WorkerFactory}, + builder::{WorkerBuilder, WorkerFactory, WorkerFactoryFn}, context::JobContext, executor::TokioExecutor, job::Job, - job_fn::job_fn, monitor::Monitor, storage::builder::WithStorage, }; @@ -101,7 +100,7 @@ async fn provision_user( let state = ctx.state(); let matrix = state.matrix_connection(); let mut client = state - .http_client("matrx.provision_user") + .http_client("matrix.provision_user") .await? .request_bytes_to_body() .json_request(); @@ -305,7 +304,7 @@ pub(crate) fn register( .layer(state.inject()) .layer(TracingLayer::new()) .with_storage(storage) - .build(job_fn(provision_user)); + .build_fn(provision_user); let storage = state.store(); let worker_name = format!("{job}-{suffix}", job = ProvisionDeviceJob::NAME); @@ -313,7 +312,7 @@ pub(crate) fn register( .layer(state.inject()) .layer(TracingLayer::new()) .with_storage(storage) - .build(job_fn(provision_device)); + .build_fn(provision_device); let storage = state.store(); let worker_name = format!("{job}-{suffix}", job = DeleteDeviceJob::NAME); @@ -321,7 +320,7 @@ pub(crate) fn register( .layer(state.inject()) .layer(TracingLayer::new()) .with_storage(storage) - .build(job_fn(delete_device)); + .build_fn(delete_device); monitor .register(provision_user_worker)