1
0
mirror of https://github.com/matrix-org/matrix-authentication-service.git synced 2025-08-07 17:03:01 +03:00

Fix mas-cli

This does a few things:

 - move `bytes` to workspace dependencies
 - write an hyper-based transport for Sentry
 - ignore OTEL errors related to propagations
 - fix everything else in mas-cli
This commit is contained in:
Quentin Gliech
2024-07-02 15:12:35 +02:00
parent dafc781957
commit 798ca90241
13 changed files with 104 additions and 111 deletions

69
Cargo.lock generated
View File

@@ -608,7 +608,7 @@ dependencies = [
"http 1.1.0", "http 1.1.0",
"http-body 1.0.0", "http-body 1.0.0",
"http-body-util", "http-body-util",
"hyper 1.4.0", "hyper",
"hyper-util", "hyper-util",
"itoa", "itoa",
"matchit", "matchit",
@@ -2097,25 +2097,6 @@ dependencies = [
"subtle", "subtle",
] ]
[[package]]
name = "h2"
version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8"
dependencies = [
"bytes",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"http 0.2.12",
"indexmap 2.2.6",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.4.5" version = "0.4.5"
@@ -2353,30 +2334,6 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "hyper"
version = "0.14.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f361cde2f109281a220d4307746cdfd5ee3f410da58a70377762396775634b33"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"h2 0.3.26",
"http 0.2.12",
"http-body 0.4.6",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"socket2 0.5.7",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]] [[package]]
name = "hyper" name = "hyper"
version = "1.4.0" version = "1.4.0"
@@ -2386,7 +2343,7 @@ dependencies = [
"bytes", "bytes",
"futures-channel", "futures-channel",
"futures-util", "futures-util",
"h2 0.4.5", "h2",
"http 1.1.0", "http 1.1.0",
"http-body 1.0.0", "http-body 1.0.0",
"httparse", "httparse",
@@ -2406,7 +2363,7 @@ checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155"
dependencies = [ dependencies = [
"futures-util", "futures-util",
"http 1.1.0", "http 1.1.0",
"hyper 1.4.0", "hyper",
"hyper-util", "hyper-util",
"rustls 0.23.10", "rustls 0.23.10",
"rustls-pki-types", "rustls-pki-types",
@@ -2427,7 +2384,7 @@ dependencies = [
"futures-util", "futures-util",
"http 1.1.0", "http 1.1.0",
"http-body 1.0.0", "http-body 1.0.0",
"hyper 1.4.0", "hyper",
"pin-project-lite", "pin-project-lite",
"socket2 0.5.7", "socket2 0.5.7",
"tokio", "tokio",
@@ -3147,14 +3104,18 @@ version = "0.9.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"axum", "axum",
"bytes",
"camino", "camino",
"clap", "clap",
"console", "console",
"dialoguer", "dialoguer",
"dotenvy", "dotenvy",
"figment", "figment",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"httpdate", "httpdate",
"hyper 1.4.0", "hyper",
"ipnetwork", "ipnetwork",
"itertools 0.13.0", "itertools 0.13.0",
"listenfd", "listenfd",
@@ -3286,7 +3247,7 @@ dependencies = [
"cookie_store", "cookie_store",
"futures-util", "futures-util",
"headers", "headers",
"hyper 1.4.0", "hyper",
"insta", "insta",
"lettre", "lettre",
"mas-axum-utils", "mas-axum-utils",
@@ -3345,7 +3306,7 @@ dependencies = [
"http 1.1.0", "http 1.1.0",
"http-body 1.0.0", "http-body 1.0.0",
"http-body-util", "http-body-util",
"hyper 1.4.0", "hyper",
"hyper-rustls", "hyper-rustls",
"hyper-util", "hyper-util",
"mas-tower", "mas-tower",
@@ -3497,7 +3458,7 @@ dependencies = [
"event-listener 5.3.1", "event-listener 5.3.1",
"futures-util", "futures-util",
"http-body 1.0.0", "http-body 1.0.0",
"hyper 1.4.0", "hyper",
"hyper-util", "hyper-util",
"libc", "libc",
"pin-project-lite", "pin-project-lite",
@@ -4075,9 +4036,7 @@ dependencies = [
"async-trait", "async-trait",
"bytes", "bytes",
"http 0.2.12", "http 0.2.12",
"hyper 0.14.29",
"opentelemetry", "opentelemetry",
"tokio",
] ]
[[package]] [[package]]
@@ -4943,7 +4902,7 @@ dependencies = [
"http 1.1.0", "http 1.1.0",
"http-body 1.0.0", "http-body 1.0.0",
"http-body-util", "http-body-util",
"hyper 1.4.0", "hyper",
"hyper-rustls", "hyper-rustls",
"hyper-util", "hyper-util",
"ipnet", "ipnet",
@@ -7285,7 +7244,7 @@ dependencies = [
"futures", "futures",
"http 1.1.0", "http 1.1.0",
"http-body-util", "http-body-util",
"hyper 1.4.0", "hyper",
"hyper-util", "hyper-util",
"log", "log",
"once_cell", "once_cell",

View File

@@ -77,6 +77,10 @@ version = "0.7.5"
version = "0.9.3" version = "0.9.3"
features = ["cookie-private", "cookie-key-expansion", "typed-header"] features = ["cookie-private", "cookie-key-expansion", "typed-header"]
# Bytes
[workspace.dependencies.bytes]
version = "1.6.0"
# UTF-8 paths # UTF-8 paths
[workspace.dependencies.camino] [workspace.dependencies.camino]
version = "1.1.7" version = "1.1.7"
@@ -241,7 +245,6 @@ version = "0.23.0"
features = ["trace", "metrics"] features = ["trace", "metrics"]
[workspace.dependencies.opentelemetry-http] [workspace.dependencies.opentelemetry-http]
version = "0.12.0" version = "0.12.0"
features = ["hyper", "tokio"]
[workspace.dependencies.opentelemetry-semantic-conventions] [workspace.dependencies.opentelemetry-semantic-conventions]
version = "0.15.0" version = "0.15.0"
[workspace.dependencies.tracing-opentelemetry] [workspace.dependencies.tracing-opentelemetry]

View File

@@ -15,7 +15,7 @@ workspace = true
async-trait.workspace = true async-trait.workspace = true
axum.workspace = true axum.workspace = true
axum-extra.workspace = true axum-extra.workspace = true
bytes = "1.6.0" bytes.workspace = true
chrono.workspace = true chrono.workspace = true
data-encoding = "2.6.0" data-encoding = "2.6.0"
futures-util = "0.3.30" futures-util = "0.3.30"

View File

@@ -14,6 +14,7 @@ workspace = true
[dependencies] [dependencies]
anyhow.workspace = true anyhow.workspace = true
axum.workspace = true axum.workspace = true
bytes.workspace = true
camino.workspace = true camino.workspace = true
clap.workspace = true clap.workspace = true
console = "0.15.8" console = "0.15.8"
@@ -21,6 +22,9 @@ dialoguer = { version = "0.11.0", features = ["fuzzy-select"] }
dotenvy = "0.15.7" dotenvy = "0.15.7"
figment.workspace = true figment.workspace = true
httpdate = "1.0.3" httpdate = "1.0.3"
http.workspace = true
http-body.workspace = true
http-body-util.workspace = true
hyper.workspace = true hyper.workspace = true
ipnetwork = "0.20.0" ipnetwork = "0.20.0"
itertools = "0.13.0" itertools = "0.13.0"

View File

@@ -1,4 +1,4 @@
// Copyright 2022 The Matrix.org Foundation C.I.C. // Copyright 2022-2024 The Matrix.org Foundation C.I.C.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@@ -14,6 +14,7 @@
use clap::Parser; use clap::Parser;
use figment::Figment; use figment::Figment;
use http_body_util::BodyExt;
use hyper::{Response, Uri}; use hyper::{Response, Uri};
use mas_config::{ConfigurationSection, PolicyConfig}; use mas_config::{ConfigurationSection, PolicyConfig};
use mas_handlers::HttpClientFactory; use mas_handlers::HttpClientFactory;
@@ -79,7 +80,7 @@ impl Options {
let mut client = http_client_factory.client("debug"); let mut client = http_client_factory.client("debug");
let request = hyper::Request::builder() let request = hyper::Request::builder()
.uri(url) .uri(url)
.body(hyper::Body::empty())?; .body(axum::body::Body::empty())?;
let response = client.ready().await?.call(request).await?; let response = client.ready().await?.call(request).await?;
let (parts, body) = response.into_parts(); let (parts, body) = response.into_parts();
@@ -88,7 +89,7 @@ impl Options {
print_headers(&parts); print_headers(&parts);
} }
let mut body = hyper::body::aggregate(body).await?; let mut body = body.collect().await?.to_bytes();
let mut stdout = tokio::io::stdout(); let mut stdout = tokio::io::stdout();
stdout.write_all_buf(&mut body).await?; stdout.write_all_buf(&mut body).await?;
} }
@@ -105,7 +106,7 @@ impl Options {
.json_response(); .json_response();
let request = hyper::Request::builder() let request = hyper::Request::builder()
.uri(url) .uri(url)
.body(hyper::Body::empty())?; .body(axum::body::Body::empty())?;
let response: Response<serde_json::Value> = let response: Response<serde_json::Value> =
client.ready().await?.call(request).await?; client.ready().await?.call(request).await?;

View File

@@ -68,7 +68,7 @@ This means some clients will refuse to use it."#
let request = hyper::Request::builder() let request = hyper::Request::builder()
.uri(&well_known_uri) .uri(&well_known_uri)
.body(hyper::Body::empty())?; .body(axum::body::Body::empty())?;
let result = client.ready().await?.call(request).await; let result = client.ready().await?.call(request).await;
let expected_well_known = serde_json::json!({ let expected_well_known = serde_json::json!({
@@ -180,7 +180,7 @@ Error details: {e}
let client_versions = hs_api.join("/_matrix/client/versions")?; let client_versions = hs_api.join("/_matrix/client/versions")?;
let request = hyper::Request::builder() let request = hyper::Request::builder()
.uri(client_versions.as_str()) .uri(client_versions.as_str())
.body(hyper::Body::empty())?; .body(axum::body::Body::empty())?;
let result = client.ready().await?.call(request).await; let result = client.ready().await?.call(request).await;
let can_reach_cs = match result { let can_reach_cs = match result {
Ok(response) => { Ok(response) => {
@@ -234,7 +234,7 @@ Error details: {e}
"Bearer averyinvalidtokenireallyhopethisisnotvalid", "Bearer averyinvalidtokenireallyhopethisisnotvalid",
) )
.uri(whoami.as_str()) .uri(whoami.as_str())
.body(hyper::Body::empty())?; .body(axum::body::Body::empty())?;
let result = client.ready().await?.call(request).await; let result = client.ready().await?.call(request).await;
match result { match result {
Ok(response) => { Ok(response) => {
@@ -284,7 +284,7 @@ Error details: {e}
let server_version = hs_api.join("/_synapse/admin/v1/server_version")?; let server_version = hs_api.join("/_synapse/admin/v1/server_version")?;
let request = hyper::Request::builder() let request = hyper::Request::builder()
.uri(server_version.as_str()) .uri(server_version.as_str())
.body(hyper::Body::empty())?; .body(axum::body::Body::empty())?;
let result = client.ready().await?.call(request).await; let result = client.ready().await?.call(request).await;
match result { match result {
Ok(response) => { Ok(response) => {
@@ -313,7 +313,7 @@ Error details: {e}
let request = hyper::Request::builder() let request = hyper::Request::builder()
.uri(background_updates.as_str()) .uri(background_updates.as_str())
.header("Authorization", format!("Bearer {admin_token}")) .header("Authorization", format!("Bearer {admin_token}"))
.body(hyper::Body::empty())?; .body(axum::body::Body::empty())?;
let result = client.ready().await?.call(request).await; let result = client.ready().await?.call(request).await;
match result { match result {
Ok(response) => { Ok(response) => {
@@ -361,7 +361,7 @@ Error details: {e}
let compat_login = compat_login.as_str(); let compat_login = compat_login.as_str();
let request = hyper::Request::builder() let request = hyper::Request::builder()
.uri(compat_login) .uri(compat_login)
.body(hyper::Body::empty())?; .body(axum::body::Body::empty())?;
let result = client.ready().await?.call(request).await; let result = client.ready().await?.call(request).await;
match result { match result {
Ok(response) => { Ok(response) => {

View File

@@ -59,11 +59,18 @@ async fn try_main() -> anyhow::Result<()> {
let (log_writer, _guard) = tracing_appender::non_blocking(output); let (log_writer, _guard) = tracing_appender::non_blocking(output);
let fmt_layer = tracing_subscriber::fmt::layer() let fmt_layer = tracing_subscriber::fmt::layer()
.with_writer(log_writer) .with_writer(log_writer)
.with_file(true)
.with_line_number(true)
.with_ansi(with_ansi); .with_ansi(with_ansi);
let filter_layer = EnvFilter::try_from_default_env() let filter_layer = EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_new("info")) .or_else(|_| EnvFilter::try_new("info"))
.context("could not setup logging filter")?; .context("could not setup logging filter")?;
// Setup the rustls crypto provider
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.map_err(|_| anyhow::anyhow!("could not install the AWS LC crypto provider"))?;
// Parse the CLI arguments // Parse the CLI arguments
let opts = self::commands::Options::parse(); let opts = self::commands::Options::parse();

View File

@@ -1,4 +1,4 @@
// Copyright 2023 The Matrix.org Foundation C.I.C. // Copyright 2023-2024 The Matrix.org Foundation C.I.C.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@@ -22,7 +22,10 @@
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use hyper::{client::connect::Connect, header::RETRY_AFTER, Client, StatusCode}; use bytes::Bytes;
use http_body_util::{BodyExt, Full};
use hyper::{header::RETRY_AFTER, StatusCode};
use mas_http::UntracedClient;
use sentry::{sentry_debug, ClientOptions, Transport, TransportFactory}; use sentry::{sentry_debug, ClientOptions, Transport, TransportFactory};
use self::tokio_thread::TransportThread; use self::tokio_thread::TransportThread;
@@ -34,30 +37,24 @@ pub struct HyperTransport {
thread: TransportThread, thread: TransportThread,
} }
pub struct HyperTransportFactory<C> { pub struct HyperTransportFactory {
client: Client<C>, client: UntracedClient<Full<Bytes>>,
} }
impl<C> HyperTransportFactory<C> { impl HyperTransportFactory {
pub fn new(client: Client<C>) -> Self { pub fn new(client: UntracedClient<Full<Bytes>>) -> Self {
Self { client } Self { client }
} }
} }
impl<C> TransportFactory for HyperTransportFactory<C> impl TransportFactory for HyperTransportFactory {
where
C: Connect + Clone + Send + Sync + 'static,
{
fn create_transport(&self, options: &ClientOptions) -> Arc<dyn Transport> { fn create_transport(&self, options: &ClientOptions) -> Arc<dyn Transport> {
Arc::new(HyperTransport::new(options, self.client.clone())) Arc::new(HyperTransport::new(options, self.client.clone()))
} }
} }
impl HyperTransport { impl HyperTransport {
pub fn new<C>(options: &ClientOptions, client: Client<C>) -> Self pub fn new(options: &ClientOptions, client: UntracedClient<Full<Bytes>>) -> Self {
where
C: Connect + Clone + Send + Sync + 'static,
{
let dsn = options.dsn.as_ref().unwrap(); let dsn = options.dsn.as_ref().unwrap();
let user_agent = options.user_agent.clone(); let user_agent = options.user_agent.clone();
let auth = dsn.to_auth(Some(&user_agent)).to_string(); let auth = dsn.to_auth(Some(&user_agent)).to_string();
@@ -69,7 +66,7 @@ impl HyperTransport {
let request = hyper::Request::post(&url) let request = hyper::Request::post(&url)
.header("X-Sentry-Auth", &auth) .header("X-Sentry-Auth", &auth)
.body(hyper::Body::from(body)) .body(Full::new(Bytes::from(body)))
.unwrap(); .unwrap();
let fut = client.request(request); let fut = client.request(request);
@@ -93,12 +90,13 @@ impl HyperTransport {
rl.update_from_429(); rl.update_from_429();
} }
match hyper::body::to_bytes(response.into_body()).await { match response.into_body().collect().await {
Err(err) => { Err(err) => {
sentry_debug!("Failed to read sentry response: {}", err); sentry_debug!("Failed to read sentry response: {}", err);
} }
Ok(bytes) => { Ok(body) => {
let text = String::from_utf8_lossy(&bytes); let body = body.to_bytes();
let text = String::from_utf8_lossy(&body);
sentry_debug!("Get response: `{}`", text); sentry_debug!("Get response: `{}`", text);
} }
} }

View File

@@ -1,4 +1,4 @@
// Copyright 2022 The Matrix.org Foundation C.I.C. // Copyright 2022-2024 The Matrix.org Foundation C.I.C.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@@ -20,7 +20,6 @@ use std::{
use anyhow::Context; use anyhow::Context;
use axum::{ use axum::{
body::HttpBody,
error_handling::HandleErrorLayer, error_handling::HandleErrorLayer,
extract::{FromRef, MatchedPath}, extract::{FromRef, MatchedPath},
Extension, Router, Extension, Router,
@@ -39,7 +38,6 @@ use mas_tower::{
KV, KV,
}; };
use opentelemetry::{Key, KeyValue}; use opentelemetry::{Key, KeyValue};
use opentelemetry_http::HeaderExtractor;
use opentelemetry_semantic_conventions::trace::{ use opentelemetry_semantic_conventions::trace::{
HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE, HTTP_ROUTE, NETWORK_PROTOCOL_NAME, HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE, HTTP_ROUTE, NETWORK_PROTOCOL_NAME,
NETWORK_PROTOCOL_VERSION, URL_PATH, URL_QUERY, URL_SCHEME, USER_AGENT_ORIGINAL, NETWORK_PROTOCOL_VERSION, URL_PATH, URL_QUERY, URL_SCHEME, USER_AGENT_ORIGINAL,
@@ -55,6 +53,22 @@ use crate::app_state::AppState;
const MAS_LISTENER_NAME: Key = Key::from_static_str("mas.listener.name"); const MAS_LISTENER_NAME: Key = Key::from_static_str("mas.listener.name");
/// Same as the extractor from opentelemetry-http, but using http@1
struct HeaderExtractor<'a>(pub &'a http::HeaderMap);
impl<'a> opentelemetry::propagation::Extractor for HeaderExtractor<'a> {
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|value| value.to_str().ok())
}
fn keys(&self) -> Vec<&str> {
self.0
.keys()
.map(http::HeaderName::as_str)
.collect::<Vec<_>>()
}
}
#[inline] #[inline]
fn otel_http_method<B>(request: &Request<B>) -> &'static str { fn otel_http_method<B>(request: &Request<B>) -> &'static str {
match request.method() { match request.method() {
@@ -179,36 +193,31 @@ fn on_http_response_labels<B>(res: &Response<B>) -> Vec<KeyValue> {
)] )]
} }
pub fn build_router<B>( pub fn build_router(
state: AppState, state: AppState,
resources: &[HttpResource], resources: &[HttpResource],
prefix: Option<&str>, prefix: Option<&str>,
name: Option<&str>, name: Option<&str>,
) -> Router<(), B> ) -> Router<()> {
where
B: HttpBody + Send + 'static,
<B as HttpBody>::Data: Into<axum::body::Bytes> + Send,
<B as HttpBody>::Error: std::error::Error + Send + Sync,
{
let templates = Templates::from_ref(&state); let templates = Templates::from_ref(&state);
let mut router = Router::new(); let mut router = Router::new();
for resource in resources { for resource in resources {
router = match resource { router = match resource {
mas_config::HttpResource::Health => { mas_config::HttpResource::Health => {
router.merge(mas_handlers::healthcheck_router::<AppState, B>()) router.merge(mas_handlers::healthcheck_router::<AppState>())
} }
mas_config::HttpResource::Prometheus => { mas_config::HttpResource::Prometheus => {
router.route_service("/metrics", crate::telemetry::prometheus_service()) router.route_service("/metrics", crate::telemetry::prometheus_service())
} }
mas_config::HttpResource::Discovery => { mas_config::HttpResource::Discovery => {
router.merge(mas_handlers::discovery_router::<AppState, B>()) router.merge(mas_handlers::discovery_router::<AppState>())
} }
mas_config::HttpResource::Human => { mas_config::HttpResource::Human => {
router.merge(mas_handlers::human_router::<AppState, B>(templates.clone())) router.merge(mas_handlers::human_router::<AppState>(templates.clone()))
} }
mas_config::HttpResource::GraphQL { playground } => { mas_config::HttpResource::GraphQL { playground } => {
router.merge(mas_handlers::graphql_router::<AppState, B>(*playground)) router.merge(mas_handlers::graphql_router::<AppState>(*playground))
} }
mas_config::HttpResource::Assets { path } => { mas_config::HttpResource::Assets { path } => {
let static_service = ServeDir::new(path) let static_service = ServeDir::new(path)
@@ -230,11 +239,9 @@ where
(error_layer, cache_layer).layer(static_service), (error_layer, cache_layer).layer(static_service),
) )
} }
mas_config::HttpResource::OAuth => { mas_config::HttpResource::OAuth => router.merge(mas_handlers::api_router::<AppState>()),
router.merge(mas_handlers::api_router::<AppState, B>())
}
mas_config::HttpResource::Compat => { mas_config::HttpResource::Compat => {
router.merge(mas_handlers::compat_router::<AppState, B>()) router.merge(mas_handlers::compat_router::<AppState>())
} }
// TODO: do a better handler here // TODO: do a better handler here
mas_config::HttpResource::ConnectionInfo => router.route( mas_config::HttpResource::ConnectionInfo => router.route(

View File

@@ -15,6 +15,8 @@
use std::time::Duration; use std::time::Duration;
use anyhow::Context as _; use anyhow::Context as _;
use bytes::Bytes;
use http_body_util::Full;
use hyper::{header::CONTENT_TYPE, Response}; use hyper::{header::CONTENT_TYPE, Response};
use mas_config::{ use mas_config::{
MetricsConfig, MetricsExporterKind, Propagator, TelemetryConfig, TracingConfig, MetricsConfig, MetricsExporterKind, Propagator, TelemetryConfig, TracingConfig,
@@ -48,7 +50,16 @@ static METER_PROVIDER: OnceCell<SdkMeterProvider> = OnceCell::const_new();
static PROMETHEUS_REGISTRY: OnceCell<Registry> = OnceCell::const_new(); static PROMETHEUS_REGISTRY: OnceCell<Registry> = OnceCell::const_new();
pub fn setup(config: &TelemetryConfig) -> anyhow::Result<Option<Tracer>> { pub fn setup(config: &TelemetryConfig) -> anyhow::Result<Option<Tracer>> {
global::set_error_handler(|e| tracing::error!("{}", e))?; global::set_error_handler(|e| {
// Don't log the propagation errors, else we'll log an error on each request if
// the propagation errors aren't there
if matches!(e, opentelemetry::global::Error::Propagation(_)) {
return;
}
tracing::error!(error = &e as &dyn std::error::Error);
})?;
let propagator = propagator(&config.tracing.propagators); let propagator = propagator(&config.tracing.propagators);
// The CORS filter needs to know what headers it should whitelist for // The CORS filter needs to know what headers it should whitelist for
@@ -162,14 +173,15 @@ fn stdout_metric_reader() -> PeriodicReader {
PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio).build() PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio).build()
} }
type PromServiceFuture = std::future::Ready<Result<Response<String>, std::convert::Infallible>>; type PromServiceFuture =
std::future::Ready<Result<Response<Full<Bytes>>, std::convert::Infallible>>;
#[allow(clippy::needless_pass_by_value)] #[allow(clippy::needless_pass_by_value)]
fn prometheus_service_fn<T>(_req: T) -> PromServiceFuture { fn prometheus_service_fn<T>(_req: T) -> PromServiceFuture {
use prometheus::{Encoder, TextEncoder}; use prometheus::{Encoder, TextEncoder};
let response = if let Some(registry) = PROMETHEUS_REGISTRY.get() { let response = if let Some(registry) = PROMETHEUS_REGISTRY.get() {
let mut buffer = String::new(); let mut buffer = Vec::new();
let encoder = TextEncoder::new(); let encoder = TextEncoder::new();
let metric_families = registry.gather(); let metric_families = registry.gather();
@@ -179,13 +191,15 @@ fn prometheus_service_fn<T>(_req: T) -> PromServiceFuture {
Response::builder() Response::builder()
.status(200) .status(200)
.header(CONTENT_TYPE, encoder.format_type()) .header(CONTENT_TYPE, encoder.format_type())
.body(buffer) .body(Full::new(Bytes::from(buffer)))
.unwrap() .unwrap()
} else { } else {
Response::builder() Response::builder()
.status(500) .status(500)
.header(CONTENT_TYPE, "text/plain") .header(CONTENT_TYPE, "text/plain")
.body(Body::from("Prometheus exporter was not enabled in config")) .body(Full::new(Bytes::from_static(
b"Prometheus exporter was not enabled in config",
)))
.unwrap() .unwrap()
}; };

View File

@@ -13,7 +13,7 @@ workspace = true
[dependencies] [dependencies]
async-trait.workspace = true async-trait.workspace = true
bytes = "1.6.0" bytes.workspace = true
futures-util = "0.3.30" futures-util = "0.3.30"
headers.workspace = true headers.workspace = true
http.workspace = true http.workspace = true

View File

@@ -12,7 +12,7 @@ publish = false
workspace = true workspace = true
[dependencies] [dependencies]
bytes = "1.6.0" bytes.workspace = true
event-listener = "5.3.1" event-listener = "5.3.1"
futures-util = "0.3.30" futures-util = "0.3.30"
http-body.workspace = true http-body.workspace = true

View File

@@ -17,7 +17,7 @@ keystore = ["dep:mas-keystore"]
[dependencies] [dependencies]
base64ct = { version = "1.6.0", features = ["std"] } base64ct = { version = "1.6.0", features = ["std"] }
bytes = "1.6.0" bytes.workspace = true
chrono.workspace = true chrono.workspace = true
form_urlencoded = "1.2.1" form_urlencoded = "1.2.1"
futures-util = "0.3.30" futures-util = "0.3.30"