diff --git a/Cargo.lock b/Cargo.lock index 2e9a77ff..5a9e87ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2537,6 +2537,8 @@ name = "mas-http" version = "0.1.0" dependencies = [ "anyhow", + "aws-smithy-http", + "aws-types", "axum 0.6.0-rc.2", "bytes 1.2.1", "futures-util", diff --git a/crates/email/Cargo.toml b/crates/email/Cargo.toml index d9edd8cd..ae38a7c1 100644 --- a/crates/email/Cargo.toml +++ b/crates/email/Cargo.toml @@ -17,7 +17,7 @@ aws-smithy-client = { version = "0.51.0", default-features = false, features = [ aws-smithy-async = { version = "0.51.0", default-features = false, features = ["rt-tokio"] } mas-templates = { path = "../templates" } -mas-http = { path = "../http" } +mas-http = { path = "../http", features = ["aws-sdk", "client"] } [dependencies.lettre] version = "0.10.1" diff --git a/crates/email/src/transport/aws_ses.rs b/crates/email/src/transport/aws_ses.rs index b0a054be..7e940746 100644 --- a/crates/email/src/transport/aws_ses.rs +++ b/crates/email/src/transport/aws_ses.rs @@ -55,12 +55,7 @@ impl Transport { let http_connector = DynConnector::new(http_connector); // Middleware to add tracing to AWS SDK operations - let middleware = DynMiddleware::new(( - TraceLayer::with_namespace("aws_sdk") - .make_span_builder(mas_http::otel::DefaultMakeSpanBuilder::new("aws_sdk")) - .on_error(mas_http::otel::DebugOnError), - DefaultMiddleware::default(), - )); + let middleware = DynMiddleware::new((TraceLayer::aws_sdk(), DefaultMiddleware::default())); // Use that connector for discovering the config let config = ProviderConfig::default().with_http_connector(http_connector.clone()); diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index 3b8997f6..36817308 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" license = "Apache-2.0" [dependencies] +aws-smithy-http = { version = "0.51.0", optional = true } +aws-types = { version = "0.51.0", optional = true } axum = { version = "0.6.0-rc.2", optional = true } bytes = "1.2.1" futures-util = "0.3.25" @@ -40,6 +42,7 @@ tower = { version = "0.4.13", features = ["util"] } [features] axum = ["dep:axum"] +aws-sdk = ["dep:aws-smithy-http", "dep:aws-types"] native-roots = ["dep:rustls-native-certs"] webpki-roots = ["dep:webpki-roots"] client = [ diff --git a/crates/http/src/layers/otel/inject_context.rs b/crates/http/src/layers/otel/inject_context.rs index 100e2ffc..edf3cafb 100644 --- a/crates/http/src/layers/otel/inject_context.rs +++ b/crates/http/src/layers/otel/inject_context.rs @@ -50,3 +50,27 @@ impl InjectContext> for InjectInHttpRequest { request } } + +#[cfg(feature = "aws-sdk")] +#[derive(Debug, Clone, Copy, Default)] +pub struct InjectInAwsRequest; + +#[cfg(feature = "aws-sdk")] +impl InjectContext for InjectInAwsRequest { + type Output = aws_smithy_http::operation::Request; + + fn inject_context( + &self, + cx: &Context, + mut request: aws_smithy_http::operation::Request, + ) -> Self::Output { + let headers = request.http_mut().headers_mut(); + let mut injector = HeaderInjector(headers); + + opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.inject_context(cx, &mut injector); + }); + + request + } +} diff --git a/crates/http/src/layers/otel/make_span_builder.rs b/crates/http/src/layers/otel/make_span_builder.rs index de958697..b5eeb209 100644 --- a/crates/http/src/layers/otel/make_span_builder.rs +++ b/crates/http/src/layers/otel/make_span_builder.rs @@ -186,3 +186,50 @@ impl MakeSpanBuilder for SpanFromDnsRequest { .with_attributes(attributes) } } + +#[cfg(feature = "aws-sdk")] +#[derive(Debug, Clone, Copy, Default)] +pub struct SpanFromAwsRequest; + +#[cfg(feature = "aws-sdk")] +impl MakeSpanBuilder for SpanFromAwsRequest { + fn make_span_builder(&self, request: &aws_smithy_http::operation::Request) -> SpanBuilder { + let properties = request.properties(); + let request = request.http(); + let mut attributes = vec![ + SC::RPC_SYSTEM.string("aws-api"), + SC::HTTP_METHOD.string(http_method_str(request.method())), + SC::HTTP_FLAVOR.string(http_flavor(request.version())), + SC::HTTP_TARGET.string(request.uri().to_string()), + ]; + + let mut name = Cow::Borrowed("aws_sdk"); + if let Some(metadata) = properties.get::() { + attributes.push(SC::RPC_SERVICE.string(metadata.service().to_owned())); + attributes.push(SC::RPC_METHOD.string(metadata.name().to_owned())); + name = Cow::Owned(metadata.name().to_owned()); + } else if let Some(service) = properties.get::() { + attributes.push(SC::RPC_SERVICE.string(service.as_ref().to_owned())); + } + + let headers = request.headers(); + + if let Some(host) = headers.typed_get::() { + attributes.push(SC::HTTP_HOST.string(host.to_string())); + } + + if let Some(user_agent) = headers.typed_get::() { + attributes.push(SC::HTTP_USER_AGENT.string(user_agent.to_string())); + } + + if let Some(ContentLength(content_length)) = headers.typed_get() { + if let Ok(content_length) = content_length.try_into() { + attributes.push(SC::HTTP_REQUEST_CONTENT_LENGTH.i64(content_length)); + } + } + + SpanBuilder::from_name(name) + .with_kind(SpanKind::Client) + .with_attributes(attributes) + } +} diff --git a/crates/http/src/layers/otel/mod.rs b/crates/http/src/layers/otel/mod.rs index f32d2edc..5054670b 100644 --- a/crates/http/src/layers/otel/mod.rs +++ b/crates/http/src/layers/otel/mod.rs @@ -102,6 +102,27 @@ pub type TraceDns = Trace< S, >; +#[cfg(feature = "aws-sdk")] +pub type TraceAwsSdkClientLayer = TraceLayer< + DefaultExtractContext, + InjectInAwsRequest, + SpanFromAwsRequest, + DefaultMakeMetricsLabels, + OnAwsResponse, + DebugOnError, +>; + +#[cfg(feature = "aws-sdk")] +pub type TraceAwsSdkClient = Trace< + DefaultExtractContext, + InjectInAwsRequest, + SpanFromAwsRequest, + DefaultMakeMetricsLabels, + OnAwsResponse, + DebugOnError, + S, +>; + impl TraceHttpServerLayer { #[must_use] pub fn http_server() -> Self { @@ -153,6 +174,18 @@ impl TraceDnsLayer { } } +#[cfg(feature = "aws-sdk")] +impl TraceAwsSdkClientLayer { + #[must_use] + pub fn aws_sdk() -> Self { + TraceLayer::with_namespace("aws_sdk") + .make_span_builder(SpanFromAwsRequest) + .on_response(OnAwsResponse) + .on_error(DebugOnError) + .inject_context(InjectInAwsRequest) + } +} + #[cfg(feature = "client")] use self::make_metrics_labels::DefaultMakeMetricsLabels; #[cfg(feature = "axum")] diff --git a/crates/http/src/layers/otel/on_response.rs b/crates/http/src/layers/otel/on_response.rs index 59aae359..98d2f83a 100644 --- a/crates/http/src/layers/otel/on_response.rs +++ b/crates/http/src/layers/otel/on_response.rs @@ -62,3 +62,36 @@ impl OnResponse> for OnHttpResponse { } } } + +#[derive(Debug, Clone, Copy, Default)] +pub struct OnAwsResponse; + +impl OnResponse for OnAwsResponse { + fn on_response( + &self, + span: &SpanRef<'_>, + metrics_labels: &mut Vec, + response: &aws_smithy_http::operation::Response, + ) { + let response = response.http(); + let status_code = i64::from(response.status().as_u16()); + span.set_attribute(SC::HTTP_STATUS_CODE.i64(status_code)); + metrics_labels.push(KeyValue::new("status_code", status_code)); + + if let Some(ContentLength(content_length)) = response.headers().typed_get() { + if let Ok(content_length) = content_length.try_into() { + span.set_attribute(SC::HTTP_RESPONSE_CONTENT_LENGTH.i64(content_length)); + } + } + + #[cfg(feature = "client")] + // Get local and remote address from hyper's HttpInfo injected by the + // HttpConnector + if let Some(info) = response.extensions().get::() { + span.set_attribute(SC::NET_PEER_IP.string(info.remote_addr().ip().to_string())); + span.set_attribute(SC::NET_PEER_PORT.i64(info.remote_addr().port().into())); + span.set_attribute(SC::NET_HOST_IP.string(info.local_addr().ip().to_string())); + span.set_attribute(SC::NET_HOST_PORT.i64(info.local_addr().port().into())); + } + } +}