1
0
mirror of https://github.com/matrix-org/matrix-authentication-service.git synced 2025-08-07 17:03:01 +03:00

Track the database connection acquisition time and pool usage

This commit is contained in:
Quentin Gliech
2023-07-06 18:35:31 +02:00
parent ca520dfd9a
commit f5143c045e
4 changed files with 70 additions and 12 deletions

1
Cargo.lock generated
View File

@@ -3250,6 +3250,7 @@ dependencies = [
"mas-templates", "mas-templates",
"mime", "mime",
"oauth2-types", "oauth2-types",
"opentelemetry",
"pbkdf2", "pbkdf2",
"rand 0.8.5", "rand 0.8.5",
"rand_chacha 0.3.1", "rand_chacha 0.3.1",

View File

@@ -141,7 +141,8 @@ impl Options {
let graphql_schema = mas_handlers::graphql_schema(&pool, conn); let graphql_schema = mas_handlers::graphql_schema(&pool, conn);
let state = AppState { let state = {
let mut s = AppState {
pool, pool,
templates, templates,
key_store, key_store,
@@ -152,6 +153,10 @@ impl Options {
graphql_schema, graphql_schema,
http_client_factory, http_client_factory,
password_manager, password_manager,
conn_acquisition_histogram: None,
};
s.init_metrics()?;
s
}; };
let mut fd_manager = listenfd::ListenFd::from_env(); let mut fd_manager = listenfd::ListenFd::from_env();

View File

@@ -12,6 +12,7 @@ futures-util = "0.3.28"
# Logging and tracing # Logging and tracing
tracing = "0.1.37" tracing = "0.1.37"
opentelemetry = "0.19.0"
# Error management # Error management
thiserror = "1.0.41" thiserror = "1.0.41"

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::{convert::Infallible, sync::Arc}; use std::{convert::Infallible, sync::Arc, time::Instant};
use axum::{ use axum::{
async_trait, async_trait,
@@ -27,6 +27,10 @@ use mas_router::UrlBuilder;
use mas_storage::{BoxClock, BoxRepository, BoxRng, Repository, SystemClock}; use mas_storage::{BoxClock, BoxRepository, BoxRng, Repository, SystemClock};
use mas_storage_pg::PgRepository; use mas_storage_pg::PgRepository;
use mas_templates::Templates; use mas_templates::Templates;
use opentelemetry::{
metrics::{Histogram, MetricsError, Unit},
Context, KeyValue,
};
use rand::SeedableRng; use rand::SeedableRng;
use sqlx::PgPool; use sqlx::PgPool;
use thiserror::Error; use thiserror::Error;
@@ -45,6 +49,43 @@ pub struct AppState {
pub graphql_schema: mas_graphql::Schema, pub graphql_schema: mas_graphql::Schema,
pub http_client_factory: HttpClientFactory, pub http_client_factory: HttpClientFactory,
pub password_manager: PasswordManager, pub password_manager: PasswordManager,
pub conn_acquisition_histogram: Option<Histogram<u64>>,
}
impl AppState {
/// Init the metrics for the app state.
///
/// # Errors
///
/// Returns an error if the metrics could not be initialized.
pub fn init_metrics(&mut self) -> Result<(), MetricsError> {
// XXX: do we want to put that somewhere else?
let meter = opentelemetry::global::meter("mas-handlers");
let pool = self.pool.clone();
let usage = meter
.i64_observable_up_down_counter("db.connections.usage")
.with_description("The number of connections that are currently in `state` described by the state attribute.")
.with_unit(Unit::new("connection"))
.init();
// Observe the number of active and idle connections in the pool
meter.register_callback(move |cx| {
let idle = u32::try_from(pool.num_idle()).unwrap_or(u32::MAX);
let used = pool.size() - idle;
usage.observe(cx, i64::from(idle), &[KeyValue::new("state", "idle")]);
usage.observe(cx, i64::from(used), &[KeyValue::new("state", "used")]);
})?;
// Track the connection acquisition time
let histogram = meter
.u64_histogram("db.client.connections.create_time")
.with_description("The time it took to create a new connection.")
.with_unit(Unit::new("ms"))
.init();
self.conn_acquisition_histogram = Some(histogram);
Ok(())
}
} }
impl FromRef<AppState> for PgPool { impl FromRef<AppState> for PgPool {
@@ -155,7 +196,17 @@ impl FromRequestParts<AppState> for BoxRepository {
_parts: &mut axum::http::request::Parts, _parts: &mut axum::http::request::Parts,
state: &AppState, state: &AppState,
) -> Result<Self, Self::Rejection> { ) -> Result<Self, Self::Rejection> {
let start = Instant::now();
let repo = PgRepository::from_pool(&state.pool).await?; let repo = PgRepository::from_pool(&state.pool).await?;
// Measure the time it took to create the connection
let duration = start.elapsed();
let duration_ms = duration.as_millis().try_into().unwrap_or(u64::MAX);
if let Some(histogram) = &state.conn_acquisition_histogram {
histogram.record(&Context::new(), duration_ms, &[]);
}
Ok(repo Ok(repo
.map_err(mas_storage::RepositoryError::from_error) .map_err(mas_storage::RepositoryError::from_error)
.boxed()) .boxed())