You've already forked authentication-service
mirror of
https://github.com/matrix-org/matrix-authentication-service.git
synced 2025-08-07 17:03:01 +03:00
Refactor and simplify the templates hot-reload logic
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2701,7 +2701,6 @@ dependencies = [
|
|||||||
"camino",
|
"camino",
|
||||||
"clap",
|
"clap",
|
||||||
"dotenv",
|
"dotenv",
|
||||||
"futures-util",
|
|
||||||
"hyper",
|
"hyper",
|
||||||
"indoc",
|
"indoc",
|
||||||
"itertools",
|
"itertools",
|
||||||
|
@@ -12,7 +12,6 @@ axum = "0.6.1"
|
|||||||
camino = "1.1.1"
|
camino = "1.1.1"
|
||||||
clap = { version = "4.0.29", features = ["derive"] }
|
clap = { version = "4.0.29", features = ["derive"] }
|
||||||
dotenv = "0.15.0"
|
dotenv = "0.15.0"
|
||||||
futures-util = "0.3.25"
|
|
||||||
hyper = { version = "0.14.23", features = ["full"] }
|
hyper = { version = "0.14.23", features = ["full"] }
|
||||||
itertools = "0.10.5"
|
itertools = "0.10.5"
|
||||||
listenfd = "1.0.0"
|
listenfd = "1.0.0"
|
||||||
|
@@ -16,7 +16,6 @@ use std::{sync::Arc, time::Duration};
|
|||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use futures_util::stream::{StreamExt, TryStreamExt};
|
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use mas_config::RootConfig;
|
use mas_config::RootConfig;
|
||||||
use mas_handlers::{AppState, HttpClientFactory, MatrixHomeserver};
|
use mas_handlers::{AppState, HttpClientFactory, MatrixHomeserver};
|
||||||
@@ -24,13 +23,12 @@ use mas_listener::{server::Server, shutdown::ShutdownStream};
|
|||||||
use mas_router::UrlBuilder;
|
use mas_router::UrlBuilder;
|
||||||
use mas_storage::MIGRATOR;
|
use mas_storage::MIGRATOR;
|
||||||
use mas_tasks::TaskQueue;
|
use mas_tasks::TaskQueue;
|
||||||
use mas_templates::Templates;
|
|
||||||
use tokio::signal::unix::SignalKind;
|
use tokio::signal::unix::SignalKind;
|
||||||
use tracing::{error, info, log::warn};
|
use tracing::{info, warn};
|
||||||
|
|
||||||
use crate::util::{
|
use crate::util::{
|
||||||
database_from_config, mailer_from_config, password_manager_from_config,
|
database_from_config, mailer_from_config, password_manager_from_config,
|
||||||
policy_factory_from_config, templates_from_config,
|
policy_factory_from_config, templates_from_config, watch_templates,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Parser, Debug, Default)]
|
#[derive(Parser, Debug, Default)]
|
||||||
@@ -44,70 +42,13 @@ pub(super) struct Options {
|
|||||||
watch: bool,
|
watch: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Watch for changes in the templates folders
|
|
||||||
async fn watch_templates(
|
|
||||||
client: &watchman_client::Client,
|
|
||||||
templates: &Templates,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
use watchman_client::{
|
|
||||||
fields::NameOnly,
|
|
||||||
pdu::{QueryResult, SubscribeRequest},
|
|
||||||
CanonicalPath, SubscriptionData,
|
|
||||||
};
|
|
||||||
|
|
||||||
let templates = templates.clone();
|
|
||||||
|
|
||||||
// Find which root we're supposed to watch
|
|
||||||
let root = templates.watch_root();
|
|
||||||
|
|
||||||
// For each root, create a subscription
|
|
||||||
let resolved = client
|
|
||||||
.resolve_root(CanonicalPath::canonicalize(root)?)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
// TODO: we could subscribe to less, properly filter here
|
|
||||||
let (subscription, _) = client
|
|
||||||
.subscribe::<NameOnly>(&resolved, SubscribeRequest::default())
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
// Create a stream out of that subscription
|
|
||||||
let fut = futures_util::stream::try_unfold(subscription, |mut sub| async move {
|
|
||||||
let next = sub.next().await?;
|
|
||||||
anyhow::Ok(Some((next, sub)))
|
|
||||||
})
|
|
||||||
.try_filter_map(|event| async move {
|
|
||||||
match event {
|
|
||||||
SubscriptionData::FilesChanged(QueryResult {
|
|
||||||
files: Some(files), ..
|
|
||||||
}) => {
|
|
||||||
let files: Vec<_> = files.into_iter().map(|f| f.name.into_inner()).collect();
|
|
||||||
Ok(Some(files))
|
|
||||||
}
|
|
||||||
_ => Ok(None),
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.for_each(move |files| {
|
|
||||||
let templates = templates.clone();
|
|
||||||
async move {
|
|
||||||
info!(?files, "Files changed, reloading templates");
|
|
||||||
|
|
||||||
templates.clone().reload().await.unwrap_or_else(|err| {
|
|
||||||
error!(?err, "Error while reloading templates");
|
|
||||||
});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
tokio::spawn(fut);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Options {
|
impl Options {
|
||||||
#[allow(clippy::too_many_lines)]
|
#[allow(clippy::too_many_lines)]
|
||||||
pub async fn run(&self, root: &super::Options) -> anyhow::Result<()> {
|
pub async fn run(&self, root: &super::Options) -> anyhow::Result<()> {
|
||||||
let config: RootConfig = root.load_config()?;
|
let config: RootConfig = root.load_config()?;
|
||||||
|
|
||||||
// Connect to the database
|
// Connect to the database
|
||||||
|
info!("Conntecting to the database");
|
||||||
let pool = database_from_config(&config.database).await?;
|
let pool = database_from_config(&config.database).await?;
|
||||||
|
|
||||||
if self.migrate {
|
if self.migrate {
|
||||||
@@ -156,14 +97,7 @@ impl Options {
|
|||||||
|
|
||||||
// Watch for changes in templates if the --watch flag is present
|
// Watch for changes in templates if the --watch flag is present
|
||||||
if self.watch {
|
if self.watch {
|
||||||
let client = watchman_client::Connector::new()
|
watch_templates(&templates).await?;
|
||||||
.connect()
|
|
||||||
.await
|
|
||||||
.context("could not connect to watchman")?;
|
|
||||||
|
|
||||||
watch_templates(&client, &templates)
|
|
||||||
.await
|
|
||||||
.context("could not watch for templates changes")?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let graphql_schema = mas_handlers::graphql_schema(&pool);
|
let graphql_schema = mas_handlers::graphql_schema(&pool);
|
||||||
@@ -209,22 +143,18 @@ impl Options {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// Display some informations about where we'll be serving connections
|
// Display some informations about where we'll be serving connections
|
||||||
let is_tls = config.tls.is_some();
|
let proto = if config.tls.is_some() { "https" } else { "http" };
|
||||||
let addresses: Vec<String> = listeners
|
let addresses= listeners
|
||||||
.iter()
|
.iter()
|
||||||
.map(|listener| {
|
.map(|listener| {
|
||||||
let addr = listener.local_addr();
|
if let Ok(addr) = listener.local_addr() {
|
||||||
let proto = if is_tls { "https" } else { "http" };
|
|
||||||
if let Ok(addr) = addr {
|
|
||||||
format!("{proto}://{addr:?}")
|
format!("{proto}://{addr:?}")
|
||||||
} else {
|
} else {
|
||||||
warn!(
|
warn!("Could not get local address for listener, something might be wrong!");
|
||||||
"Could not get local address for listener, something might be wrong!"
|
|
||||||
);
|
|
||||||
format!("{proto}://???")
|
format!("{proto}://???")
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.collect();
|
.join(", ");
|
||||||
|
|
||||||
let additional = if config.proxy_protocol {
|
let additional = if config.proxy_protocol {
|
||||||
"(with Proxy Protocol)"
|
"(with Proxy Protocol)"
|
||||||
@@ -233,7 +163,7 @@ impl Options {
|
|||||||
};
|
};
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Listening on {addresses:?} with resources {resources:?} {additional}",
|
"Listening on {addresses} with resources {resources:?} {additional}",
|
||||||
resources = &config.resources
|
resources = &config.resources
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@@ -28,7 +28,7 @@ use sqlx::{
|
|||||||
postgres::{PgConnectOptions, PgPoolOptions},
|
postgres::{PgConnectOptions, PgPoolOptions},
|
||||||
ConnectOptions, PgPool,
|
ConnectOptions, PgPool,
|
||||||
};
|
};
|
||||||
use tracing::log::LevelFilter;
|
use tracing::{error, info, log::LevelFilter};
|
||||||
|
|
||||||
pub async fn password_manager_from_config(
|
pub async fn password_manager_from_config(
|
||||||
config: &PasswordsConfig,
|
config: &PasswordsConfig,
|
||||||
@@ -168,3 +168,61 @@ pub async fn database_from_config(config: &DatabaseConfig) -> Result<PgPool, any
|
|||||||
.await
|
.await
|
||||||
.context("could not connect to the database")
|
.context("could not connect to the database")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Watch for changes in the templates folders
|
||||||
|
pub async fn watch_templates(templates: &Templates) -> anyhow::Result<()> {
|
||||||
|
use watchman_client::{prelude::*, SubscriptionData};
|
||||||
|
|
||||||
|
let client = Connector::new()
|
||||||
|
.connect()
|
||||||
|
.await
|
||||||
|
.context("could not connect to watchman")?;
|
||||||
|
|
||||||
|
let templates = templates.clone();
|
||||||
|
|
||||||
|
// Find which root we're supposed to watch
|
||||||
|
let root = templates.watch_root();
|
||||||
|
|
||||||
|
// Create a subscription on the root
|
||||||
|
let resolved = client
|
||||||
|
.resolve_root(CanonicalPath::canonicalize(root)?)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Only look for *.txt, *.html and *.subject files
|
||||||
|
let request = SubscribeRequest {
|
||||||
|
expression: Some(Expr::Suffix(vec![
|
||||||
|
"txt".into(),
|
||||||
|
"html".into(),
|
||||||
|
"subject".into(),
|
||||||
|
])),
|
||||||
|
..SubscribeRequest::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
let (mut subscription, _) = client.subscribe::<NameOnly>(&resolved, request).await?;
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
let event = match subscription.next().await {
|
||||||
|
Ok(event) => event,
|
||||||
|
Err(error) => {
|
||||||
|
error!(%error, "Stopped watching templates because of an error in the watchman subscription");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let SubscriptionData::FilesChanged(QueryResult {
|
||||||
|
files: Some(files), ..
|
||||||
|
}) = event
|
||||||
|
{
|
||||||
|
let files: Vec<_> = files.into_iter().map(|f| f.name.into_inner()).collect();
|
||||||
|
info!(?files, "Files changed, reloading templates");
|
||||||
|
|
||||||
|
templates.clone().reload().await.unwrap_or_else(|err| {
|
||||||
|
error!(?err, "Error while reloading templates");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user