diff --git a/Cargo.lock b/Cargo.lock index 43bcdd24..847e86b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3250,6 +3250,7 @@ dependencies = [ "mas-templates", "mime", "oauth2-types", + "opentelemetry", "pbkdf2", "rand 0.8.5", "rand_chacha 0.3.1", diff --git a/crates/cli/src/commands/server.rs b/crates/cli/src/commands/server.rs index 2b5dd8a7..d0d23cc5 100644 --- a/crates/cli/src/commands/server.rs +++ b/crates/cli/src/commands/server.rs @@ -141,17 +141,22 @@ impl Options { let graphql_schema = mas_handlers::graphql_schema(&pool, conn); - let state = AppState { - pool, - templates, - key_store, - encrypter, - url_builder, - homeserver, - policy_factory, - graphql_schema, - http_client_factory, - password_manager, + let state = { + let mut s = AppState { + pool, + templates, + key_store, + encrypter, + url_builder, + homeserver, + policy_factory, + graphql_schema, + http_client_factory, + password_manager, + conn_acquisition_histogram: None, + }; + s.init_metrics()?; + s }; let mut fd_manager = listenfd::ListenFd::from_env(); diff --git a/crates/handlers/Cargo.toml b/crates/handlers/Cargo.toml index 00804327..bf5e6aba 100644 --- a/crates/handlers/Cargo.toml +++ b/crates/handlers/Cargo.toml @@ -12,6 +12,7 @@ futures-util = "0.3.28" # Logging and tracing tracing = "0.1.37" +opentelemetry = "0.19.0" # Error management thiserror = "1.0.41" diff --git a/crates/handlers/src/app_state.rs b/crates/handlers/src/app_state.rs index 3a3b7032..7bf61df5 100644 --- a/crates/handlers/src/app_state.rs +++ b/crates/handlers/src/app_state.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{convert::Infallible, sync::Arc}; +use std::{convert::Infallible, sync::Arc, time::Instant}; use axum::{ async_trait, @@ -27,6 +27,10 @@ use mas_router::UrlBuilder; use mas_storage::{BoxClock, BoxRepository, BoxRng, Repository, SystemClock}; use mas_storage_pg::PgRepository; use mas_templates::Templates; +use opentelemetry::{ + metrics::{Histogram, MetricsError, Unit}, + Context, KeyValue, +}; use rand::SeedableRng; use sqlx::PgPool; use thiserror::Error; @@ -45,6 +49,43 @@ pub struct AppState { pub graphql_schema: mas_graphql::Schema, pub http_client_factory: HttpClientFactory, pub password_manager: PasswordManager, + pub conn_acquisition_histogram: Option>, +} + +impl AppState { + /// Init the metrics for the app state. + /// + /// # Errors + /// + /// Returns an error if the metrics could not be initialized. + pub fn init_metrics(&mut self) -> Result<(), MetricsError> { + // XXX: do we want to put that somewhere else? + let meter = opentelemetry::global::meter("mas-handlers"); + let pool = self.pool.clone(); + let usage = meter + .i64_observable_up_down_counter("db.connections.usage") + .with_description("The number of connections that are currently in `state` described by the state attribute.") + .with_unit(Unit::new("connection")) + .init(); + + // Observe the number of active and idle connections in the pool + meter.register_callback(move |cx| { + let idle = u32::try_from(pool.num_idle()).unwrap_or(u32::MAX); + let used = pool.size() - idle; + usage.observe(cx, i64::from(idle), &[KeyValue::new("state", "idle")]); + usage.observe(cx, i64::from(used), &[KeyValue::new("state", "used")]); + })?; + + // Track the connection acquisition time + let histogram = meter + .u64_histogram("db.client.connections.create_time") + .with_description("The time it took to create a new connection.") + .with_unit(Unit::new("ms")) + .init(); + self.conn_acquisition_histogram = Some(histogram); + + Ok(()) + } } impl FromRef for PgPool { @@ -155,7 +196,17 @@ impl FromRequestParts for BoxRepository { _parts: &mut axum::http::request::Parts, state: &AppState, ) -> Result { + let start = Instant::now(); let repo = PgRepository::from_pool(&state.pool).await?; + + // Measure the time it took to create the connection + let duration = start.elapsed(); + let duration_ms = duration.as_millis().try_into().unwrap_or(u64::MAX); + + if let Some(histogram) = &state.conn_acquisition_histogram { + histogram.record(&Context::new(), duration_ms, &[]); + } + Ok(repo .map_err(mas_storage::RepositoryError::from_error) .boxed())