1
0
mirror of https://github.com/matrix-org/matrix-authentication-service.git synced 2025-11-20 12:02:22 +03:00

WIP: use apalis to schedule jobs

This commit is contained in:
Quentin Gliech
2023-03-31 12:29:26 +02:00
parent 43bcaf5308
commit cdd535ddc4
21 changed files with 782 additions and 250 deletions

View File

@@ -1,4 +1,4 @@
// Copyright 2021 The Matrix.org Foundation C.I.C.
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -14,50 +14,66 @@
//! Database-related tasks
use mas_storage::{oauth2::OAuth2AccessTokenRepository, Repository, RepositoryAccess, SystemClock};
use mas_storage_pg::PgRepository;
use sqlx::{Pool, Postgres};
use tracing::{debug, error, info};
use std::str::FromStr;
use super::Task;
use apalis_core::{
builder::{WorkerBuilder, WorkerFactory},
context::JobContext,
executor::TokioExecutor,
job::Job,
job_fn::job_fn,
monitor::Monitor,
};
use apalis_cron::CronStream;
use chrono::{DateTime, Utc};
use mas_storage::{oauth2::OAuth2AccessTokenRepository, RepositoryAccess};
use tracing::{debug, info};
#[derive(Clone)]
struct CleanupExpired(Pool<Postgres>, SystemClock);
use crate::{JobContextExt, State};
impl std::fmt::Debug for CleanupExpired {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CleanupExpired").finish_non_exhaustive()
#[derive(Default, Clone)]
pub struct CleanupExpiredTokensJob {
scheduled: DateTime<Utc>,
}
impl From<DateTime<Utc>> for CleanupExpiredTokensJob {
fn from(scheduled: DateTime<Utc>) -> Self {
Self { scheduled }
}
}
#[async_trait::async_trait]
impl Task for CleanupExpired {
async fn run(&self) {
let res = async move {
let mut repo = PgRepository::from_pool(&self.0).await?.boxed();
let res = repo.oauth2_access_token().cleanup_expired(&self.1).await;
repo.save().await?;
res
}
.await;
impl Job for CleanupExpiredTokensJob {
const NAME: &'static str = "cleanup-expired-tokens";
}
match res {
Ok(0) => {
debug!("no token to clean up");
}
Ok(count) => {
info!(count, "cleaned up expired tokens");
}
Err(error) => {
error!(?error, "failed to cleanup expired tokens");
}
}
pub async fn cleanup_expired_tokens(
job: CleanupExpiredTokensJob,
ctx: JobContext,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
debug!("cleanup expired tokens job scheduled at {}", job.scheduled);
let state = ctx.state();
let clock = state.clock();
let mut repo = state.repository().await?;
let count = repo.oauth2_access_token().cleanup_expired(&clock).await?;
repo.save().await?;
if count == 0 {
debug!("no token to clean up");
} else {
info!(count, "cleaned up expired tokens");
}
Ok(())
}
/// Cleanup expired tokens
#[must_use]
pub fn cleanup_expired(pool: &Pool<Postgres>) -> impl Task + Clone {
// XXX: the clock should come from somewhere else
CleanupExpired(pool.clone(), SystemClock::default())
pub(crate) fn register(monitor: Monitor<TokioExecutor>, state: &State) -> Monitor<TokioExecutor> {
let schedule = apalis_cron::Schedule::from_str("*/15 * * * * *").unwrap();
let worker = WorkerBuilder::new("cleanup-expired-tokens")
.stream(CronStream::new(schedule).to_stream())
.layer(state.inject())
.build(job_fn(cleanup_expired_tokens));
monitor.register(worker)
}