diff --git a/Cargo.lock b/Cargo.lock index a88748a3..2e9a77ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -390,8 +390,6 @@ dependencies = [ "http", "http-body", "hyper", - "hyper-rustls", - "lazy_static", "pin-project-lite", "tokio", "tower", @@ -415,8 +413,6 @@ dependencies = [ "percent-encoding", "pin-project-lite", "pin-utils", - "tokio", - "tokio-util 0.7.4", "tracing", ] @@ -1980,9 +1976,7 @@ checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac" dependencies = [ "http", "hyper", - "log", "rustls", - "rustls-native-certs", "tokio", "tokio-rustls", ] @@ -2486,7 +2480,10 @@ dependencies = [ "async-trait", "aws-config", "aws-sdk-sesv2", + "aws-smithy-async", + "aws-smithy-client", "lettre", + "mas-http", "mas-templates", "tokio", "tracing", diff --git a/crates/config/src/sections/email.rs b/crates/config/src/sections/email.rs index 713198a8..cdde5054 100644 --- a/crates/config/src/sections/email.rs +++ b/crates/config/src/sections/email.rs @@ -195,7 +195,7 @@ impl EmailTransportConfig { .context("failed to build SMTP transport") } EmailTransportConfig::Sendmail { command } => Ok(MailTransport::sendmail(command)), - EmailTransportConfig::AwsSes => Ok(MailTransport::aws_ses().await), + EmailTransportConfig::AwsSes => Ok(MailTransport::aws_ses().await?), } } } diff --git a/crates/email/Cargo.toml b/crates/email/Cargo.toml index 51e88380..d9edd8cd 100644 --- a/crates/email/Cargo.toml +++ b/crates/email/Cargo.toml @@ -10,10 +10,14 @@ anyhow = "1.0.66" async-trait = "0.1.58" tokio = { version = "1.21.2", features = ["macros"] } tracing = "0.1.37" -aws-sdk-sesv2 = "0.21.0" -aws-config = "0.51.0" + +aws-sdk-sesv2 = { version = "0.21.0", default-features = false } +aws-config = { version = "0.51.0", default-features = false } +aws-smithy-client = { version = "0.51.0", default-features = false, features = ["client-hyper"] } +aws-smithy-async = { version = "0.51.0", default-features = false, features = ["rt-tokio"] } mas-templates = { path = "../templates" } +mas-http = { path = "../http" } [dependencies.lettre] version = "0.10.1" diff --git a/crates/email/src/transport/aws_ses.rs b/crates/email/src/transport/aws_ses.rs index 864f21cf..b0a054be 100644 --- a/crates/email/src/transport/aws_ses.rs +++ b/crates/email/src/transport/aws_ses.rs @@ -12,13 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use async_trait::async_trait; +use aws_config::provider_config::ProviderConfig; use aws_sdk_sesv2::{ + middleware::DefaultMiddleware, model::{EmailContent, RawMessage}, types::Blob, Client, }; +use aws_smithy_async::rt::sleep::TokioSleep; +use aws_smithy_client::erase::{DynConnector, DynMiddleware}; use lettre::{address::Envelope, AsyncTransport}; +use mas_http::{otel::TraceLayer, ClientInitError}; /// An asynchronous email transport that sends email via the AWS Simple Email /// Service v2 API @@ -28,17 +35,47 @@ pub struct Transport { impl Transport { /// Construct a [`Transport`] from the environment - pub async fn from_env() -> Self { - let config = aws_config::from_env().load().await; - let config = aws_sdk_sesv2::Config::from(&config); - Self::new(config) - } + /// + /// # Errors + /// + /// Returns an error if the HTTP client failed to initialize + pub async fn from_env() -> Result { + let sleep = Arc::new(TokioSleep::new()); - /// Constructs a [`Transport`] from a given AWS SES SDK config - #[must_use] - pub fn new(config: aws_sdk_sesv2::Config) -> Self { - let client = Client::from_conf(config); - Self { client } + // Create the TCP connector from mas-http. This way we share the root + // certificate loader with it + let http_connector = mas_http::make_traced_connector() + .await + .expect("failed to create HTTPS connector"); + + let http_connector = aws_smithy_client::hyper_ext::Adapter::builder() + .sleep_impl(sleep.clone()) + .build(http_connector); + + let http_connector = DynConnector::new(http_connector); + + // Middleware to add tracing to AWS SDK operations + let middleware = DynMiddleware::new(( + TraceLayer::with_namespace("aws_sdk") + .make_span_builder(mas_http::otel::DefaultMakeSpanBuilder::new("aws_sdk")) + .on_error(mas_http::otel::DebugOnError), + DefaultMiddleware::default(), + )); + + // Use that connector for discovering the config + let config = ProviderConfig::default().with_http_connector(http_connector.clone()); + let config = aws_config::from_env().configure(config).load().await; + let config = aws_sdk_sesv2::Config::from(&config); + + // As well as for the client itself + let client = aws_smithy_client::Client::builder() + .sleep_impl(sleep) + .connector(http_connector) + .middleware(middleware) + .build_dyn(); + + let client = Client::with_config(client, config); + Ok(Self { client }) } } diff --git a/crates/email/src/transport/mod.rs b/crates/email/src/transport/mod.rs index bb1773b7..33b59e7e 100644 --- a/crates/email/src/transport/mod.rs +++ b/crates/email/src/transport/mod.rs @@ -25,6 +25,7 @@ use lettre::{ }, AsyncTransport, Tokio1Executor, }; +use mas_http::ClientInitError; pub mod aws_ses; @@ -101,8 +102,13 @@ impl Transport { } /// Construct a AWS SES transport - pub async fn aws_ses() -> Self { - Self::new(TransportInner::AwsSes(aws_ses::Transport::from_env().await)) + /// + /// # Errors + /// + /// Returns an error if the HTTP client failed to initialize + pub async fn aws_ses() -> Result { + let transport = aws_ses::Transport::from_env().await?; + Ok(Self::new(TransportInner::AwsSes(transport))) } } diff --git a/crates/http/src/client.rs b/crates/http/src/client.rs index 6aa79f31..4c3b79c4 100644 --- a/crates/http/src/client.rs +++ b/crates/http/src/client.rs @@ -148,6 +148,16 @@ pub enum NativeRootsLoadError { Empty, } +async fn make_tls_config() -> Result { + let roots = tls_roots().await?; + let tls_config = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(roots) + .with_no_client_auth(); + + Ok(tls_config) +} + /// Create a basic Hyper HTTP & HTTPS client without any tracing /// /// # Errors @@ -159,57 +169,63 @@ where B: http_body::Body + Send + 'static, E: Into, { - let resolver = GaiResolver::new(); - let roots = tls_roots().await?; - let tls_config = rustls::ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(roots) - .with_no_client_auth(); - - Ok(make_client(resolver, tls_config)) + let https = make_untraced_connector().await?; + Ok(Client::builder().build(https)) } -async fn make_base_client( +async fn make_traced_client( ) -> Result>>, B>, ClientInitError> where B: http_body::Body + Send + 'static, E: Into, { - // Trace DNS requests - let resolver = TraceLayer::dns().layer(GaiResolver::new()); - - let roots = tls_roots().await?; - let tls_config = rustls::ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(roots) - .with_no_client_auth(); - - Ok(make_client(resolver, tls_config)) + let https = make_traced_connector().await?; + Ok(Client::builder().build(https)) } -fn make_client( +/// Create a traced HTTP and HTTPS connector +/// +/// # Errors +/// +/// Returns an error if it failed to load the TLS certificates +pub async fn make_traced_connector( +) -> Result>>, ClientInitError> +where +{ + // Trace DNS requests + let resolver = TraceLayer::dns().layer(GaiResolver::new()); + let tls_config = make_tls_config().await?; + Ok(make_connector(resolver, tls_config)) +} + +async fn make_untraced_connector( +) -> Result>, ClientInitError> +where +{ + let resolver = GaiResolver::new(); + let tls_config = make_tls_config().await?; + Ok(make_connector(resolver, tls_config)) +} + +fn make_connector( resolver: R, tls_config: rustls::ClientConfig, -) -> hyper::Client>, B> +) -> HttpsConnector> where R: Service + Send + Sync + Clone + 'static, R::Error: std::error::Error + Send + Sync, R::Future: Send, R::Response: Iterator, - B: http_body::Body + Send + 'static, - E: Into, { let mut http = HttpConnector::new_with_resolver(resolver); http.enforce_http(false); - let https = HttpsConnectorBuilder::new() + HttpsConnectorBuilder::new() .with_tls_config(tls_config) .https_or_http() .enable_http1() .enable_http2() - .wrap_connector(http); - - Client::builder().build(https) + .wrap_connector(http) } /// Create a traced HTTP client, with a default timeout, which follows redirects @@ -228,7 +244,7 @@ where B: http_body::Body + Default + Send + 'static, E: Into + 'static, { - let client = make_base_client().await?; + let client = make_traced_client().await?; let layer = ( // Convert the errors to ClientError to help dealing with them diff --git a/crates/http/src/layers/otel/on_error.rs b/crates/http/src/layers/otel/on_error.rs index 2c441115..8bd70d43 100644 --- a/crates/http/src/layers/otel/on_error.rs +++ b/crates/http/src/layers/otel/on_error.rs @@ -31,3 +31,16 @@ where span.add_event("exception".to_owned(), attributes); } } + +#[derive(Debug, Clone, Copy, Default)] +pub struct DebugOnError; + +impl OnError for DebugOnError +where + E: std::fmt::Debug, +{ + fn on_error(&self, span: &SpanRef<'_>, _metrics_labels: &mut Vec, err: &E) { + let attributes = vec![EXCEPTION_MESSAGE.string(format!("{err:?}"))]; + span.add_event("exception".to_owned(), attributes); + } +} diff --git a/crates/http/src/lib.rs b/crates/http/src/lib.rs index 7f05f83f..642e56f8 100644 --- a/crates/http/src/lib.rs +++ b/crates/http/src/lib.rs @@ -30,7 +30,7 @@ mod ext; mod layers; #[cfg(feature = "client")] -pub use self::client::{client, make_untraced_client}; +pub use self::client::{client, make_traced_connector, make_untraced_client, ClientInitError}; pub use self::{ ext::{set_propagator, CorsLayerExt, ServiceExt as HttpServiceExt}, layers::{ diff --git a/crates/policy/src/lib.rs b/crates/policy/src/lib.rs index a3b7a176..7e6e7a97 100644 --- a/crates/policy/src/lib.rs +++ b/crates/policy/src/lib.rs @@ -67,6 +67,7 @@ pub struct PolicyFactory { } impl PolicyFactory { + #[tracing::instrument(skip(source), err(Display))] pub async fn load( mut source: impl AsyncRead + std::marker::Unpin, data: serde_json::Value, @@ -125,6 +126,7 @@ impl PolicyFactory { .await } + #[tracing::instrument(skip(self), err)] pub async fn instantiate(&self) -> Result { let mut store = Store::new(&self.engine, ()); let runtime = Runtime::new(&mut store, &self.module).await?;