1
0
mirror of https://github.com/matrix-org/matrix-authentication-service.git synced 2025-08-06 06:02:40 +03:00

Write an adapter for opentelemetry-http

This commit is contained in:
Quentin Gliech
2024-07-02 14:13:49 +02:00
parent d4cbbd97d8
commit 2e63e3da71
5 changed files with 94 additions and 7 deletions

3
Cargo.lock generated
View File

@@ -3348,9 +3348,11 @@ name = "mas-http"
version = "0.9.0" version = "0.9.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait",
"bytes", "bytes",
"futures-util", "futures-util",
"headers", "headers",
"http 0.2.12",
"http 1.1.0", "http 1.1.0",
"http-body 1.0.0", "http-body 1.0.0",
"http-body-util", "http-body-util",
@@ -3359,6 +3361,7 @@ dependencies = [
"hyper-util", "hyper-util",
"mas-tower", "mas-tower",
"opentelemetry", "opentelemetry",
"opentelemetry-http",
"opentelemetry-semantic-conventions", "opentelemetry-semantic-conventions",
"pin-project-lite", "pin-project-lite",
"rustls 0.23.10", "rustls 0.23.10",

View File

@@ -15,11 +15,12 @@
use std::time::Duration; use std::time::Duration;
use anyhow::Context as _; use anyhow::Context as _;
use hyper::{header::CONTENT_TYPE, Body, Response}; use hyper::{header::CONTENT_TYPE, Response};
use mas_config::{ use mas_config::{
MetricsConfig, MetricsExporterKind, Propagator, TelemetryConfig, TracingConfig, MetricsConfig, MetricsExporterKind, Propagator, TelemetryConfig, TracingConfig,
TracingExporterKind, TracingExporterKind,
}; };
use mas_http::OtelClient;
use opentelemetry::{ use opentelemetry::{
global, global,
propagation::{TextMapCompositePropagator, TextMapPropagator}, propagation::{TextMapCompositePropagator, TextMapPropagator},
@@ -87,7 +88,7 @@ fn propagator(propagators: &[Propagator]) -> impl TextMapPropagator {
fn http_client() -> impl opentelemetry_http::HttpClient + 'static { fn http_client() -> impl opentelemetry_http::HttpClient + 'static {
let client = mas_http::make_untraced_client(); let client = mas_http::make_untraced_client();
opentelemetry_http::hyper::HyperClient::new_with_timeout(client, Duration::from_secs(30)) OtelClient::new(client)
} }
fn stdout_tracer_provider() -> TracerProvider { fn stdout_tracer_provider() -> TracerProvider {
@@ -161,14 +162,14 @@ fn stdout_metric_reader() -> PeriodicReader {
PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio).build() PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio).build()
} }
type PromServiceFuture = std::future::Ready<Result<Response<Body>, std::convert::Infallible>>; type PromServiceFuture = std::future::Ready<Result<Response<String>, std::convert::Infallible>>;
#[allow(clippy::needless_pass_by_value)] #[allow(clippy::needless_pass_by_value)]
fn prometheus_service_fn<T>(_req: T) -> PromServiceFuture { fn prometheus_service_fn<T>(_req: T) -> PromServiceFuture {
use prometheus::{Encoder, TextEncoder}; use prometheus::{Encoder, TextEncoder};
let response = if let Some(registry) = PROMETHEUS_REGISTRY.get() { let response = if let Some(registry) = PROMETHEUS_REGISTRY.get() {
let mut buffer = vec![]; let mut buffer = String::new();
let encoder = TextEncoder::new(); let encoder = TextEncoder::new();
let metric_families = registry.gather(); let metric_families = registry.gather();
@@ -178,7 +179,7 @@ fn prometheus_service_fn<T>(_req: T) -> PromServiceFuture {
Response::builder() Response::builder()
.status(200) .status(200)
.header(CONTENT_TYPE, encoder.format_type()) .header(CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer)) .body(buffer)
.unwrap() .unwrap()
} else { } else {
Response::builder() Response::builder()

View File

@@ -12,6 +12,7 @@ repository.workspace = true
workspace = true workspace = true
[dependencies] [dependencies]
async-trait.workspace = true
bytes = "1.6.0" bytes = "1.6.0"
futures-util = "0.3.30" futures-util = "0.3.30"
headers.workspace = true headers.workspace = true
@@ -22,6 +23,7 @@ hyper.workspace = true
hyper-util.workspace = true hyper-util.workspace = true
hyper-rustls = { workspace = true, optional = true } hyper-rustls = { workspace = true, optional = true }
opentelemetry.workspace = true opentelemetry.workspace = true
opentelemetry-http = { workspace = true, optional = true }
opentelemetry-semantic-conventions.workspace = true opentelemetry-semantic-conventions.workspace = true
rustls = { workspace = true, optional = true } rustls = { workspace = true, optional = true }
rustls-platform-verifier = { workspace = true, optional = true } rustls-platform-verifier = { workspace = true, optional = true }
@@ -35,6 +37,9 @@ tower-http.workspace = true
tracing.workspace = true tracing.workspace = true
tracing-opentelemetry.workspace = true tracing-opentelemetry.workspace = true
# opentelemetry-http still requires http 0.2, and we need to convert types
http02 = { package = "http", version = "0.2.12", optional = true }
mas-tower = { workspace = true, optional = true } mas-tower = { workspace = true, optional = true }
[dev-dependencies] [dev-dependencies]
@@ -47,6 +52,8 @@ client = [
"dep:rustls", "dep:rustls",
"dep:hyper-rustls", "dep:hyper-rustls",
"dep:rustls-platform-verifier", "dep:rustls-platform-verifier",
"dep:http02",
"dep:opentelemetry-http",
"tower/limit", "tower/limit",
"tower-http/timeout", "tower-http/timeout",
"tower-http/follow-redirect", "tower-http/follow-redirect",

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use bytes::Bytes;
use http_body_util::{BodyExt, Full};
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
pub use hyper_util::client::legacy::Client; pub use hyper_util::client::legacy::Client;
use hyper_util::{ use hyper_util::{
@@ -25,6 +27,7 @@ use mas_tower::{
DurationRecorderLayer, DurationRecorderService, FnWrapper, InFlightCounterLayer, DurationRecorderLayer, DurationRecorderService, FnWrapper, InFlightCounterLayer,
InFlightCounterService, TraceLayer, TraceService, InFlightCounterService, TraceLayer, TraceService,
}; };
use opentelemetry_http::HttpClient;
use opentelemetry_semantic_conventions::trace::SERVER_ADDRESS; use opentelemetry_semantic_conventions::trace::SERVER_ADDRESS;
use tower::Layer; use tower::Layer;
use tracing::Span; use tracing::Span;
@@ -94,3 +97,76 @@ fn make_connector<R>(
.enable_http2() .enable_http2()
.wrap_connector(http) .wrap_connector(http)
} }
/// A client which can be used by opentelemetry-http to send request through
/// hyper 1.x
///
/// This is needed until OTEL upgrades to hyper 1.x
/// <https://github.com/open-telemetry/opentelemetry-rust/pull/1674>
#[derive(Debug)]
pub struct OtelClient {
client: UntracedClient<Full<Bytes>>,
}
impl OtelClient {
/// Create a new [`OtelClient`] from a [`UntracedClient`]
#[must_use]
pub fn new(client: UntracedClient<Full<Bytes>>) -> Self {
Self { client }
}
}
#[async_trait::async_trait]
impl HttpClient for OtelClient {
async fn send(
&self,
request: opentelemetry_http::Request<Vec<u8>>,
) -> Result<opentelemetry_http::Response<Bytes>, opentelemetry_http::HttpError> {
// This is the annoying part: converting the OTEL http0.2 request to a http1
// request
let (parts, body) = request.into_parts();
let body = Full::new(Bytes::from(body));
let mut request = http::Request::new(body);
*request.uri_mut() = parts.uri.to_string().parse().unwrap();
*request.method_mut() = match parts.method {
http02::Method::GET => http::Method::GET,
http02::Method::POST => http::Method::POST,
http02::Method::PUT => http::Method::PUT,
http02::Method::DELETE => http::Method::DELETE,
http02::Method::HEAD => http::Method::HEAD,
http02::Method::OPTIONS => http::Method::OPTIONS,
http02::Method::CONNECT => http::Method::CONNECT,
http02::Method::PATCH => http::Method::PATCH,
http02::Method::TRACE => http::Method::TRACE,
_ => return Err(opentelemetry_http::HttpError::from("Unsupported method")),
};
request
.headers_mut()
.extend(parts.headers.into_iter().map(|(k, v)| {
(
k.map(|k| http::HeaderName::from_bytes(k.as_ref()).unwrap()),
http::HeaderValue::from_bytes(v.as_ref()).unwrap(),
)
}));
// Send the request
let response = self.client.request(request).await?;
// Convert back the response
let (parts, body) = response.into_parts();
let body = body.collect().await?.to_bytes();
let mut response = opentelemetry_http::Response::new(body);
*response.status_mut() = parts.status.as_u16().try_into().unwrap();
response
.headers_mut()
.extend(parts.headers.into_iter().map(|(k, v)| {
(
k.map(|k| http02::HeaderName::from_bytes(k.as_ref()).unwrap()),
http02::HeaderValue::from_bytes(v.as_ref()).unwrap(),
)
}));
Ok(response)
}
}

View File

@@ -26,8 +26,8 @@ mod service;
#[cfg(feature = "client")] #[cfg(feature = "client")]
pub use self::{ pub use self::{
client::{ client::{
make_traced_connector, make_untraced_client, Client, TracedClient, TracedConnector, make_traced_connector, make_untraced_client, Client, OtelClient, TracedClient,
UntracedClient, UntracedConnector, TracedConnector, UntracedClient, UntracedConnector,
}, },
layers::client::{ClientLayer, ClientService}, layers::client::{ClientLayer, ClientService},
}; };