You've already forked authentication-service
mirror of
https://github.com/matrix-org/matrix-authentication-service.git
synced 2025-07-29 22:01:14 +03:00
Ability to run migrations on startup
Also adds a bunch of logging information on startup
This commit is contained in:
@ -22,6 +22,7 @@ use clap::Clap;
|
|||||||
use hyper::{header, Server};
|
use hyper::{header, Server};
|
||||||
use mas_config::RootConfig;
|
use mas_config::RootConfig;
|
||||||
use mas_core::{
|
use mas_core::{
|
||||||
|
storage::MIGRATOR,
|
||||||
tasks::{self, TaskQueue},
|
tasks::{self, TaskQueue},
|
||||||
templates::Templates,
|
templates::Templates,
|
||||||
};
|
};
|
||||||
@ -32,11 +33,16 @@ use tower_http::{
|
|||||||
trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer},
|
trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer},
|
||||||
LatencyUnit,
|
LatencyUnit,
|
||||||
};
|
};
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
use super::RootCommand;
|
use super::RootCommand;
|
||||||
|
|
||||||
#[derive(Clap, Debug, Default)]
|
#[derive(Clap, Debug, Default)]
|
||||||
pub(super) struct ServerCommand;
|
pub(super) struct ServerCommand {
|
||||||
|
/// Automatically apply pending migrations
|
||||||
|
#[clap(long)]
|
||||||
|
migrate: bool,
|
||||||
|
}
|
||||||
|
|
||||||
impl ServerCommand {
|
impl ServerCommand {
|
||||||
pub async fn run(&self, root: &RootCommand) -> anyhow::Result<()> {
|
pub async fn run(&self, root: &RootCommand) -> anyhow::Result<()> {
|
||||||
@ -48,6 +54,19 @@ impl ServerCommand {
|
|||||||
// Connect to the database
|
// Connect to the database
|
||||||
let pool = config.database.connect().await?;
|
let pool = config.database.connect().await?;
|
||||||
|
|
||||||
|
if self.migrate {
|
||||||
|
info!("Running pending migrations");
|
||||||
|
MIGRATOR
|
||||||
|
.run(&pool)
|
||||||
|
.await
|
||||||
|
.context("could not run migrations")?;
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("Starting task scheduler");
|
||||||
|
let queue = TaskQueue::default();
|
||||||
|
queue.recuring(Duration::from_secs(15), tasks::cleanup_expired(&pool));
|
||||||
|
queue.start();
|
||||||
|
|
||||||
// Load and compile the templates
|
// Load and compile the templates
|
||||||
// TODO: custom template path from the config
|
// TODO: custom template path from the config
|
||||||
let templates = Templates::load(None, true).context("could not load templates")?;
|
let templates = Templates::load(None, true).context("could not load templates")?;
|
||||||
@ -55,10 +74,6 @@ impl ServerCommand {
|
|||||||
// Start the server
|
// Start the server
|
||||||
let root = mas_core::handlers::root(&pool, &templates, &config);
|
let root = mas_core::handlers::root(&pool, &templates, &config);
|
||||||
|
|
||||||
let queue = TaskQueue::default();
|
|
||||||
queue.recuring(Duration::from_secs(15), tasks::cleanup_expired(&pool));
|
|
||||||
queue.start();
|
|
||||||
|
|
||||||
let warp_service = warp::service(root);
|
let warp_service = warp::service(root);
|
||||||
|
|
||||||
let service = ServiceBuilder::new()
|
let service = ServiceBuilder::new()
|
||||||
@ -83,7 +98,7 @@ impl ServerCommand {
|
|||||||
]))
|
]))
|
||||||
.service(warp_service);
|
.service(warp_service);
|
||||||
|
|
||||||
tracing::info!("Listening on http://{}", listener.local_addr().unwrap());
|
info!("Listening on http://{}", listener.local_addr().unwrap());
|
||||||
|
|
||||||
Server::from_tcp(listener)?
|
Server::from_tcp(listener)?
|
||||||
.serve(Shared::new(service))
|
.serve(Shared::new(service))
|
||||||
|
@ -20,6 +20,12 @@ use super::Task;
|
|||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct CleanupExpired(Pool<Postgres>);
|
struct CleanupExpired(Pool<Postgres>);
|
||||||
|
|
||||||
|
impl std::fmt::Debug for CleanupExpired {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
f.debug_struct("CleanupExpired").finish_non_exhaustive()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl Task for CleanupExpired {
|
impl Task for CleanupExpired {
|
||||||
async fn run(&self) {
|
async fn run(&self) {
|
||||||
|
@ -20,13 +20,14 @@ use tokio::{
|
|||||||
time::Interval,
|
time::Interval,
|
||||||
};
|
};
|
||||||
use tokio_stream::wrappers::IntervalStream;
|
use tokio_stream::wrappers::IntervalStream;
|
||||||
|
use tracing::debug;
|
||||||
|
|
||||||
mod database;
|
mod database;
|
||||||
|
|
||||||
pub use self::database::cleanup_expired;
|
pub use self::database::cleanup_expired;
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
pub trait Task: Send + Sync + 'static {
|
pub trait Task: std::fmt::Debug + Send + Sync + 'static {
|
||||||
async fn run(&self);
|
async fn run(&self);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -93,7 +94,8 @@ impl TaskQueue {
|
|||||||
queue.schedule(task).await;
|
queue.schedule(task).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn recuring(&self, every: Duration, task: impl Task + Clone) {
|
pub fn recuring(&self, every: Duration, task: impl Task + Clone + std::fmt::Debug) {
|
||||||
|
debug!(?task, period = every.as_secs(), "Scheduling recuring task");
|
||||||
let queue = self.inner.clone();
|
let queue = self.inner.clone();
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
queue.recuring(tokio::time::interval(every), task).await;
|
queue.recuring(tokio::time::interval(every), task).await;
|
||||||
|
Reference in New Issue
Block a user