diff --git a/Cargo.lock b/Cargo.lock index 518d3c09..ef34d05a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3348,9 +3348,11 @@ name = "mas-http" version = "0.9.0" dependencies = [ "anyhow", + "async-trait", "bytes", "futures-util", "headers", + "http 0.2.12", "http 1.1.0", "http-body 1.0.0", "http-body-util", @@ -3359,6 +3361,7 @@ dependencies = [ "hyper-util", "mas-tower", "opentelemetry", + "opentelemetry-http", "opentelemetry-semantic-conventions", "pin-project-lite", "rustls 0.23.10", diff --git a/crates/cli/src/telemetry.rs b/crates/cli/src/telemetry.rs index 48b4ecab..5f54dc54 100644 --- a/crates/cli/src/telemetry.rs +++ b/crates/cli/src/telemetry.rs @@ -15,11 +15,12 @@ use std::time::Duration; use anyhow::Context as _; -use hyper::{header::CONTENT_TYPE, Body, Response}; +use hyper::{header::CONTENT_TYPE, Response}; use mas_config::{ MetricsConfig, MetricsExporterKind, Propagator, TelemetryConfig, TracingConfig, TracingExporterKind, }; +use mas_http::OtelClient; use opentelemetry::{ global, propagation::{TextMapCompositePropagator, TextMapPropagator}, @@ -87,7 +88,7 @@ fn propagator(propagators: &[Propagator]) -> impl TextMapPropagator { fn http_client() -> impl opentelemetry_http::HttpClient + 'static { let client = mas_http::make_untraced_client(); - opentelemetry_http::hyper::HyperClient::new_with_timeout(client, Duration::from_secs(30)) + OtelClient::new(client) } fn stdout_tracer_provider() -> TracerProvider { @@ -161,14 +162,14 @@ fn stdout_metric_reader() -> PeriodicReader { PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio).build() } -type PromServiceFuture = std::future::Ready, std::convert::Infallible>>; +type PromServiceFuture = std::future::Ready, std::convert::Infallible>>; #[allow(clippy::needless_pass_by_value)] fn prometheus_service_fn(_req: T) -> PromServiceFuture { use prometheus::{Encoder, TextEncoder}; let response = if let Some(registry) = PROMETHEUS_REGISTRY.get() { - let mut buffer = vec![]; + let mut buffer = String::new(); let encoder = TextEncoder::new(); let metric_families = registry.gather(); @@ -178,7 +179,7 @@ fn prometheus_service_fn(_req: T) -> PromServiceFuture { Response::builder() .status(200) .header(CONTENT_TYPE, encoder.format_type()) - .body(Body::from(buffer)) + .body(buffer) .unwrap() } else { Response::builder() diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index 2d336d3b..c47b08d6 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -12,6 +12,7 @@ repository.workspace = true workspace = true [dependencies] +async-trait.workspace = true bytes = "1.6.0" futures-util = "0.3.30" headers.workspace = true @@ -22,6 +23,7 @@ hyper.workspace = true hyper-util.workspace = true hyper-rustls = { workspace = true, optional = true } opentelemetry.workspace = true +opentelemetry-http = { workspace = true, optional = true } opentelemetry-semantic-conventions.workspace = true rustls = { workspace = true, optional = true } rustls-platform-verifier = { workspace = true, optional = true } @@ -35,6 +37,9 @@ tower-http.workspace = true tracing.workspace = true tracing-opentelemetry.workspace = true +# opentelemetry-http still requires http 0.2, and we need to convert types +http02 = { package = "http", version = "0.2.12", optional = true } + mas-tower = { workspace = true, optional = true } [dev-dependencies] @@ -47,6 +52,8 @@ client = [ "dep:rustls", "dep:hyper-rustls", "dep:rustls-platform-verifier", + "dep:http02", + "dep:opentelemetry-http", "tower/limit", "tower-http/timeout", "tower-http/follow-redirect", diff --git a/crates/http/src/client.rs b/crates/http/src/client.rs index f227e35c..1449778f 100644 --- a/crates/http/src/client.rs +++ b/crates/http/src/client.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use bytes::Bytes; +use http_body_util::{BodyExt, Full}; use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; pub use hyper_util::client::legacy::Client; use hyper_util::{ @@ -25,6 +27,7 @@ use mas_tower::{ DurationRecorderLayer, DurationRecorderService, FnWrapper, InFlightCounterLayer, InFlightCounterService, TraceLayer, TraceService, }; +use opentelemetry_http::HttpClient; use opentelemetry_semantic_conventions::trace::SERVER_ADDRESS; use tower::Layer; use tracing::Span; @@ -94,3 +97,76 @@ fn make_connector( .enable_http2() .wrap_connector(http) } + +/// A client which can be used by opentelemetry-http to send request through +/// hyper 1.x +/// +/// This is needed until OTEL upgrades to hyper 1.x +/// +#[derive(Debug)] +pub struct OtelClient { + client: UntracedClient>, +} + +impl OtelClient { + /// Create a new [`OtelClient`] from a [`UntracedClient`] + #[must_use] + pub fn new(client: UntracedClient>) -> Self { + Self { client } + } +} + +#[async_trait::async_trait] +impl HttpClient for OtelClient { + async fn send( + &self, + request: opentelemetry_http::Request>, + ) -> Result, opentelemetry_http::HttpError> { + // This is the annoying part: converting the OTEL http0.2 request to a http1 + // request + let (parts, body) = request.into_parts(); + let body = Full::new(Bytes::from(body)); + let mut request = http::Request::new(body); + + *request.uri_mut() = parts.uri.to_string().parse().unwrap(); + *request.method_mut() = match parts.method { + http02::Method::GET => http::Method::GET, + http02::Method::POST => http::Method::POST, + http02::Method::PUT => http::Method::PUT, + http02::Method::DELETE => http::Method::DELETE, + http02::Method::HEAD => http::Method::HEAD, + http02::Method::OPTIONS => http::Method::OPTIONS, + http02::Method::CONNECT => http::Method::CONNECT, + http02::Method::PATCH => http::Method::PATCH, + http02::Method::TRACE => http::Method::TRACE, + _ => return Err(opentelemetry_http::HttpError::from("Unsupported method")), + }; + request + .headers_mut() + .extend(parts.headers.into_iter().map(|(k, v)| { + ( + k.map(|k| http::HeaderName::from_bytes(k.as_ref()).unwrap()), + http::HeaderValue::from_bytes(v.as_ref()).unwrap(), + ) + })); + + // Send the request + let response = self.client.request(request).await?; + + // Convert back the response + let (parts, body) = response.into_parts(); + let body = body.collect().await?.to_bytes(); + let mut response = opentelemetry_http::Response::new(body); + *response.status_mut() = parts.status.as_u16().try_into().unwrap(); + response + .headers_mut() + .extend(parts.headers.into_iter().map(|(k, v)| { + ( + k.map(|k| http02::HeaderName::from_bytes(k.as_ref()).unwrap()), + http02::HeaderValue::from_bytes(v.as_ref()).unwrap(), + ) + })); + + Ok(response) + } +} diff --git a/crates/http/src/lib.rs b/crates/http/src/lib.rs index 48b7f801..3541ca4f 100644 --- a/crates/http/src/lib.rs +++ b/crates/http/src/lib.rs @@ -26,8 +26,8 @@ mod service; #[cfg(feature = "client")] pub use self::{ client::{ - make_traced_connector, make_untraced_client, Client, TracedClient, TracedConnector, - UntracedClient, UntracedConnector, + make_traced_connector, make_untraced_client, Client, OtelClient, TracedClient, + TracedConnector, UntracedClient, UntracedConnector, }, layers::client::{ClientLayer, ClientService}, };