You've already forked authentication-service
mirror of
https://github.com/matrix-org/matrix-authentication-service.git
synced 2025-09-11 22:30:47 +03:00
84 lines
2.8 KiB
PL/PgSQL
84 lines
2.8 KiB
PL/PgSQL
CREATE SCHEMA apalis;
|
|
|
|
CREATE TABLE IF NOT EXISTS apalis.workers (
|
|
id TEXT NOT NULL,
|
|
worker_type TEXT NOT NULL,
|
|
storage_name TEXT NOT NULL,
|
|
layers TEXT NOT NULL DEFAULT '',
|
|
last_seen timestamptz not null default now()
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS Idx ON apalis.workers(id);
|
|
|
|
CREATE UNIQUE INDEX IF NOT EXISTS unique_worker_id ON apalis.workers (id);
|
|
|
|
CREATE INDEX IF NOT EXISTS WTIdx ON apalis.workers(worker_type);
|
|
|
|
CREATE INDEX IF NOT EXISTS LSIdx ON apalis.workers(last_seen);
|
|
|
|
CREATE TABLE IF NOT EXISTS apalis.jobs (
|
|
job JSONB NOT NULL,
|
|
id TEXT NOT NULL,
|
|
job_type TEXT NOT NULL,
|
|
status TEXT NOT NULL DEFAULT 'Pending',
|
|
attempts INTEGER NOT NULL DEFAULT 0,
|
|
max_attempts INTEGER NOT NULL DEFAULT 25,
|
|
run_at timestamptz NOT NULL default now(),
|
|
last_error TEXT,
|
|
lock_at timestamptz,
|
|
lock_by TEXT,
|
|
done_at timestamptz,
|
|
CONSTRAINT fk_worker_lock_by FOREIGN KEY(lock_by) REFERENCES apalis.workers(id)
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS TIdx ON apalis.jobs(id);
|
|
|
|
CREATE INDEX IF NOT EXISTS SIdx ON apalis.jobs(status);
|
|
|
|
CREATE UNIQUE INDEX IF NOT EXISTS unique_job_id ON apalis.jobs (id);
|
|
|
|
CREATE INDEX IF NOT EXISTS LIdx ON apalis.jobs(lock_by);
|
|
|
|
CREATE INDEX IF NOT EXISTS JTIdx ON apalis.jobs(job_type);
|
|
|
|
CREATE OR replace FUNCTION apalis.get_job(
|
|
worker_id TEXT,
|
|
v_job_type TEXT
|
|
) returns apalis.jobs AS $$
|
|
DECLARE
|
|
v_job_id text;
|
|
v_job_row apalis.jobs;
|
|
BEGIN
|
|
SELECT id, job_type
|
|
INTO v_job_id, v_job_type
|
|
FROM apalis.jobs
|
|
WHERE status = 'Pending'
|
|
AND run_at < now()
|
|
AND job_type = v_job_type
|
|
ORDER BY run_at ASC limit 1 FOR UPDATE skip LOCKED;
|
|
|
|
IF v_job_id IS NULL THEN
|
|
RETURN NULL;
|
|
END IF;
|
|
|
|
UPDATE apalis.jobs
|
|
SET
|
|
status = 'Running',
|
|
lock_by = worker_id,
|
|
lock_at = now()
|
|
WHERE id = v_job_id
|
|
returning * INTO v_job_row;
|
|
RETURN v_job_row;
|
|
END;
|
|
$$ LANGUAGE plpgsql volatile;
|
|
|
|
CREATE FUNCTION apalis.notify_new_jobs() returns trigger as $$
|
|
BEGIN
|
|
perform pg_notify('apalis::job', 'insert');
|
|
return new;
|
|
END;
|
|
$$ language plpgsql;
|
|
|
|
CREATE TRIGGER notify_workers after insert on apalis.jobs for each statement execute procedure apalis.notify_new_jobs();
|
|
|