diff --git a/Cargo.lock b/Cargo.lock index 01ef99e2..b82a50f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2701,7 +2701,6 @@ dependencies = [ "camino", "clap", "dotenv", - "futures-util", "hyper", "indoc", "itertools", diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index d46d5379..78dd1d5c 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -12,7 +12,6 @@ axum = "0.6.1" camino = "1.1.1" clap = { version = "4.0.29", features = ["derive"] } dotenv = "0.15.0" -futures-util = "0.3.25" hyper = { version = "0.14.23", features = ["full"] } itertools = "0.10.5" listenfd = "1.0.0" diff --git a/crates/cli/src/commands/server.rs b/crates/cli/src/commands/server.rs index 3e10f091..60654782 100644 --- a/crates/cli/src/commands/server.rs +++ b/crates/cli/src/commands/server.rs @@ -16,7 +16,6 @@ use std::{sync::Arc, time::Duration}; use anyhow::Context; use clap::Parser; -use futures_util::stream::{StreamExt, TryStreamExt}; use itertools::Itertools; use mas_config::RootConfig; use mas_handlers::{AppState, HttpClientFactory, MatrixHomeserver}; @@ -24,13 +23,12 @@ use mas_listener::{server::Server, shutdown::ShutdownStream}; use mas_router::UrlBuilder; use mas_storage::MIGRATOR; use mas_tasks::TaskQueue; -use mas_templates::Templates; use tokio::signal::unix::SignalKind; -use tracing::{error, info, log::warn}; +use tracing::{info, warn}; use crate::util::{ database_from_config, mailer_from_config, password_manager_from_config, - policy_factory_from_config, templates_from_config, + policy_factory_from_config, templates_from_config, watch_templates, }; #[derive(Parser, Debug, Default)] @@ -44,70 +42,13 @@ pub(super) struct Options { watch: bool, } -/// Watch for changes in the templates folders -async fn watch_templates( - client: &watchman_client::Client, - templates: &Templates, -) -> anyhow::Result<()> { - use watchman_client::{ - fields::NameOnly, - pdu::{QueryResult, SubscribeRequest}, - CanonicalPath, SubscriptionData, - }; - - let templates = templates.clone(); - - // Find which root we're supposed to watch - let root = templates.watch_root(); - - // For each root, create a subscription - let resolved = client - .resolve_root(CanonicalPath::canonicalize(root)?) - .await?; - - // TODO: we could subscribe to less, properly filter here - let (subscription, _) = client - .subscribe::(&resolved, SubscribeRequest::default()) - .await?; - - // Create a stream out of that subscription - let fut = futures_util::stream::try_unfold(subscription, |mut sub| async move { - let next = sub.next().await?; - anyhow::Ok(Some((next, sub))) - }) - .try_filter_map(|event| async move { - match event { - SubscriptionData::FilesChanged(QueryResult { - files: Some(files), .. - }) => { - let files: Vec<_> = files.into_iter().map(|f| f.name.into_inner()).collect(); - Ok(Some(files)) - } - _ => Ok(None), - } - }) - .for_each(move |files| { - let templates = templates.clone(); - async move { - info!(?files, "Files changed, reloading templates"); - - templates.clone().reload().await.unwrap_or_else(|err| { - error!(?err, "Error while reloading templates"); - }); - } - }); - - tokio::spawn(fut); - - Ok(()) -} - impl Options { #[allow(clippy::too_many_lines)] pub async fn run(&self, root: &super::Options) -> anyhow::Result<()> { let config: RootConfig = root.load_config()?; // Connect to the database + info!("Conntecting to the database"); let pool = database_from_config(&config.database).await?; if self.migrate { @@ -156,14 +97,7 @@ impl Options { // Watch for changes in templates if the --watch flag is present if self.watch { - let client = watchman_client::Connector::new() - .connect() - .await - .context("could not connect to watchman")?; - - watch_templates(&client, &templates) - .await - .context("could not watch for templates changes")?; + watch_templates(&templates).await?; } let graphql_schema = mas_handlers::graphql_schema(&pool); @@ -209,22 +143,18 @@ impl Options { ); // Display some informations about where we'll be serving connections - let is_tls = config.tls.is_some(); - let addresses: Vec = listeners + let proto = if config.tls.is_some() { "https" } else { "http" }; + let addresses= listeners .iter() .map(|listener| { - let addr = listener.local_addr(); - let proto = if is_tls { "https" } else { "http" }; - if let Ok(addr) = addr { + if let Ok(addr) = listener.local_addr() { format!("{proto}://{addr:?}") } else { - warn!( - "Could not get local address for listener, something might be wrong!" - ); + warn!("Could not get local address for listener, something might be wrong!"); format!("{proto}://???") } }) - .collect(); + .join(", "); let additional = if config.proxy_protocol { "(with Proxy Protocol)" @@ -233,7 +163,7 @@ impl Options { }; info!( - "Listening on {addresses:?} with resources {resources:?} {additional}", + "Listening on {addresses} with resources {resources:?} {additional}", resources = &config.resources ); diff --git a/crates/cli/src/util.rs b/crates/cli/src/util.rs index f138eddd..ec525478 100644 --- a/crates/cli/src/util.rs +++ b/crates/cli/src/util.rs @@ -28,7 +28,7 @@ use sqlx::{ postgres::{PgConnectOptions, PgPoolOptions}, ConnectOptions, PgPool, }; -use tracing::log::LevelFilter; +use tracing::{error, info, log::LevelFilter}; pub async fn password_manager_from_config( config: &PasswordsConfig, @@ -168,3 +168,61 @@ pub async fn database_from_config(config: &DatabaseConfig) -> Result anyhow::Result<()> { + use watchman_client::{prelude::*, SubscriptionData}; + + let client = Connector::new() + .connect() + .await + .context("could not connect to watchman")?; + + let templates = templates.clone(); + + // Find which root we're supposed to watch + let root = templates.watch_root(); + + // Create a subscription on the root + let resolved = client + .resolve_root(CanonicalPath::canonicalize(root)?) + .await?; + + // Only look for *.txt, *.html and *.subject files + let request = SubscribeRequest { + expression: Some(Expr::Suffix(vec![ + "txt".into(), + "html".into(), + "subject".into(), + ])), + ..SubscribeRequest::default() + }; + + let (mut subscription, _) = client.subscribe::(&resolved, request).await?; + + tokio::spawn(async move { + loop { + let event = match subscription.next().await { + Ok(event) => event, + Err(error) => { + error!(%error, "Stopped watching templates because of an error in the watchman subscription"); + break; + } + }; + + if let SubscriptionData::FilesChanged(QueryResult { + files: Some(files), .. + }) = event + { + let files: Vec<_> = files.into_iter().map(|f| f.name.into_inner()).collect(); + info!(?files, "Files changed, reloading templates"); + + templates.clone().reload().await.unwrap_or_else(|err| { + error!(?err, "Error while reloading templates"); + }); + } + } + }); + + Ok(()) +}