1
0
mirror of https://github.com/matrix-org/matrix-authentication-service.git synced 2025-09-11 22:30:47 +03:00
Files
authentication-service/crates/storage-pg/migrations/20220530084123_jobs_workers.sql
2023-04-14 10:22:22 +02:00

84 lines
2.8 KiB
PL/PgSQL

CREATE SCHEMA apalis;
CREATE TABLE IF NOT EXISTS apalis.workers (
id TEXT NOT NULL,
worker_type TEXT NOT NULL,
storage_name TEXT NOT NULL,
layers TEXT NOT NULL DEFAULT '',
last_seen timestamptz not null default now()
);
CREATE INDEX IF NOT EXISTS Idx ON apalis.workers(id);
CREATE UNIQUE INDEX IF NOT EXISTS unique_worker_id ON apalis.workers (id);
CREATE INDEX IF NOT EXISTS WTIdx ON apalis.workers(worker_type);
CREATE INDEX IF NOT EXISTS LSIdx ON apalis.workers(last_seen);
CREATE TABLE IF NOT EXISTS apalis.jobs (
job JSONB NOT NULL,
id TEXT NOT NULL,
job_type TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'Pending',
attempts INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 25,
run_at timestamptz NOT NULL default now(),
last_error TEXT,
lock_at timestamptz,
lock_by TEXT,
done_at timestamptz,
CONSTRAINT fk_worker_lock_by FOREIGN KEY(lock_by) REFERENCES apalis.workers(id)
);
CREATE INDEX IF NOT EXISTS TIdx ON apalis.jobs(id);
CREATE INDEX IF NOT EXISTS SIdx ON apalis.jobs(status);
CREATE UNIQUE INDEX IF NOT EXISTS unique_job_id ON apalis.jobs (id);
CREATE INDEX IF NOT EXISTS LIdx ON apalis.jobs(lock_by);
CREATE INDEX IF NOT EXISTS JTIdx ON apalis.jobs(job_type);
CREATE OR replace FUNCTION apalis.get_job(
worker_id TEXT,
v_job_type TEXT
) returns apalis.jobs AS $$
DECLARE
v_job_id text;
v_job_row apalis.jobs;
BEGIN
SELECT id, job_type
INTO v_job_id, v_job_type
FROM apalis.jobs
WHERE status = 'Pending'
AND run_at < now()
AND job_type = v_job_type
ORDER BY run_at ASC limit 1 FOR UPDATE skip LOCKED;
IF v_job_id IS NULL THEN
RETURN NULL;
END IF;
UPDATE apalis.jobs
SET
status = 'Running',
lock_by = worker_id,
lock_at = now()
WHERE id = v_job_id
returning * INTO v_job_row;
RETURN v_job_row;
END;
$$ LANGUAGE plpgsql volatile;
CREATE FUNCTION apalis.notify_new_jobs() returns trigger as $$
BEGIN
perform pg_notify('apalis::job', 'insert');
return new;
END;
$$ language plpgsql;
CREATE TRIGGER notify_workers after insert on apalis.jobs for each statement execute procedure apalis.notify_new_jobs();