You've already forked authentication-service
mirror of
https://github.com/matrix-org/matrix-authentication-service.git
synced 2025-08-07 17:03:01 +03:00
Better AWS SDK tracing
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -2537,6 +2537,8 @@ name = "mas-http"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
"aws-smithy-http",
|
||||||
|
"aws-types",
|
||||||
"axum 0.6.0-rc.2",
|
"axum 0.6.0-rc.2",
|
||||||
"bytes 1.2.1",
|
"bytes 1.2.1",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
|
@@ -17,7 +17,7 @@ aws-smithy-client = { version = "0.51.0", default-features = false, features = [
|
|||||||
aws-smithy-async = { version = "0.51.0", default-features = false, features = ["rt-tokio"] }
|
aws-smithy-async = { version = "0.51.0", default-features = false, features = ["rt-tokio"] }
|
||||||
|
|
||||||
mas-templates = { path = "../templates" }
|
mas-templates = { path = "../templates" }
|
||||||
mas-http = { path = "../http" }
|
mas-http = { path = "../http", features = ["aws-sdk", "client"] }
|
||||||
|
|
||||||
[dependencies.lettre]
|
[dependencies.lettre]
|
||||||
version = "0.10.1"
|
version = "0.10.1"
|
||||||
|
@@ -55,12 +55,7 @@ impl Transport {
|
|||||||
let http_connector = DynConnector::new(http_connector);
|
let http_connector = DynConnector::new(http_connector);
|
||||||
|
|
||||||
// Middleware to add tracing to AWS SDK operations
|
// Middleware to add tracing to AWS SDK operations
|
||||||
let middleware = DynMiddleware::new((
|
let middleware = DynMiddleware::new((TraceLayer::aws_sdk(), DefaultMiddleware::default()));
|
||||||
TraceLayer::with_namespace("aws_sdk")
|
|
||||||
.make_span_builder(mas_http::otel::DefaultMakeSpanBuilder::new("aws_sdk"))
|
|
||||||
.on_error(mas_http::otel::DebugOnError),
|
|
||||||
DefaultMiddleware::default(),
|
|
||||||
));
|
|
||||||
|
|
||||||
// Use that connector for discovering the config
|
// Use that connector for discovering the config
|
||||||
let config = ProviderConfig::default().with_http_connector(http_connector.clone());
|
let config = ProviderConfig::default().with_http_connector(http_connector.clone());
|
||||||
|
@@ -6,6 +6,8 @@ edition = "2021"
|
|||||||
license = "Apache-2.0"
|
license = "Apache-2.0"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
aws-smithy-http = { version = "0.51.0", optional = true }
|
||||||
|
aws-types = { version = "0.51.0", optional = true }
|
||||||
axum = { version = "0.6.0-rc.2", optional = true }
|
axum = { version = "0.6.0-rc.2", optional = true }
|
||||||
bytes = "1.2.1"
|
bytes = "1.2.1"
|
||||||
futures-util = "0.3.25"
|
futures-util = "0.3.25"
|
||||||
@@ -40,6 +42,7 @@ tower = { version = "0.4.13", features = ["util"] }
|
|||||||
|
|
||||||
[features]
|
[features]
|
||||||
axum = ["dep:axum"]
|
axum = ["dep:axum"]
|
||||||
|
aws-sdk = ["dep:aws-smithy-http", "dep:aws-types"]
|
||||||
native-roots = ["dep:rustls-native-certs"]
|
native-roots = ["dep:rustls-native-certs"]
|
||||||
webpki-roots = ["dep:webpki-roots"]
|
webpki-roots = ["dep:webpki-roots"]
|
||||||
client = [
|
client = [
|
||||||
|
@@ -50,3 +50,27 @@ impl<T> InjectContext<Request<T>> for InjectInHttpRequest {
|
|||||||
request
|
request
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "aws-sdk")]
|
||||||
|
#[derive(Debug, Clone, Copy, Default)]
|
||||||
|
pub struct InjectInAwsRequest;
|
||||||
|
|
||||||
|
#[cfg(feature = "aws-sdk")]
|
||||||
|
impl InjectContext<aws_smithy_http::operation::Request> for InjectInAwsRequest {
|
||||||
|
type Output = aws_smithy_http::operation::Request;
|
||||||
|
|
||||||
|
fn inject_context(
|
||||||
|
&self,
|
||||||
|
cx: &Context,
|
||||||
|
mut request: aws_smithy_http::operation::Request,
|
||||||
|
) -> Self::Output {
|
||||||
|
let headers = request.http_mut().headers_mut();
|
||||||
|
let mut injector = HeaderInjector(headers);
|
||||||
|
|
||||||
|
opentelemetry::global::get_text_map_propagator(|propagator| {
|
||||||
|
propagator.inject_context(cx, &mut injector);
|
||||||
|
});
|
||||||
|
|
||||||
|
request
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@@ -186,3 +186,50 @@ impl MakeSpanBuilder<Name> for SpanFromDnsRequest {
|
|||||||
.with_attributes(attributes)
|
.with_attributes(attributes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "aws-sdk")]
|
||||||
|
#[derive(Debug, Clone, Copy, Default)]
|
||||||
|
pub struct SpanFromAwsRequest;
|
||||||
|
|
||||||
|
#[cfg(feature = "aws-sdk")]
|
||||||
|
impl MakeSpanBuilder<aws_smithy_http::operation::Request> for SpanFromAwsRequest {
|
||||||
|
fn make_span_builder(&self, request: &aws_smithy_http::operation::Request) -> SpanBuilder {
|
||||||
|
let properties = request.properties();
|
||||||
|
let request = request.http();
|
||||||
|
let mut attributes = vec![
|
||||||
|
SC::RPC_SYSTEM.string("aws-api"),
|
||||||
|
SC::HTTP_METHOD.string(http_method_str(request.method())),
|
||||||
|
SC::HTTP_FLAVOR.string(http_flavor(request.version())),
|
||||||
|
SC::HTTP_TARGET.string(request.uri().to_string()),
|
||||||
|
];
|
||||||
|
|
||||||
|
let mut name = Cow::Borrowed("aws_sdk");
|
||||||
|
if let Some(metadata) = properties.get::<aws_smithy_http::operation::Metadata>() {
|
||||||
|
attributes.push(SC::RPC_SERVICE.string(metadata.service().to_owned()));
|
||||||
|
attributes.push(SC::RPC_METHOD.string(metadata.name().to_owned()));
|
||||||
|
name = Cow::Owned(metadata.name().to_owned());
|
||||||
|
} else if let Some(service) = properties.get::<aws_types::SigningService>() {
|
||||||
|
attributes.push(SC::RPC_SERVICE.string(service.as_ref().to_owned()));
|
||||||
|
}
|
||||||
|
|
||||||
|
let headers = request.headers();
|
||||||
|
|
||||||
|
if let Some(host) = headers.typed_get::<Host>() {
|
||||||
|
attributes.push(SC::HTTP_HOST.string(host.to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(user_agent) = headers.typed_get::<UserAgent>() {
|
||||||
|
attributes.push(SC::HTTP_USER_AGENT.string(user_agent.to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(ContentLength(content_length)) = headers.typed_get() {
|
||||||
|
if let Ok(content_length) = content_length.try_into() {
|
||||||
|
attributes.push(SC::HTTP_REQUEST_CONTENT_LENGTH.i64(content_length));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SpanBuilder::from_name(name)
|
||||||
|
.with_kind(SpanKind::Client)
|
||||||
|
.with_attributes(attributes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@@ -102,6 +102,27 @@ pub type TraceDns<S> = Trace<
|
|||||||
S,
|
S,
|
||||||
>;
|
>;
|
||||||
|
|
||||||
|
#[cfg(feature = "aws-sdk")]
|
||||||
|
pub type TraceAwsSdkClientLayer = TraceLayer<
|
||||||
|
DefaultExtractContext,
|
||||||
|
InjectInAwsRequest,
|
||||||
|
SpanFromAwsRequest,
|
||||||
|
DefaultMakeMetricsLabels,
|
||||||
|
OnAwsResponse,
|
||||||
|
DebugOnError,
|
||||||
|
>;
|
||||||
|
|
||||||
|
#[cfg(feature = "aws-sdk")]
|
||||||
|
pub type TraceAwsSdkClient<S> = Trace<
|
||||||
|
DefaultExtractContext,
|
||||||
|
InjectInAwsRequest,
|
||||||
|
SpanFromAwsRequest,
|
||||||
|
DefaultMakeMetricsLabels,
|
||||||
|
OnAwsResponse,
|
||||||
|
DebugOnError,
|
||||||
|
S,
|
||||||
|
>;
|
||||||
|
|
||||||
impl TraceHttpServerLayer {
|
impl TraceHttpServerLayer {
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn http_server() -> Self {
|
pub fn http_server() -> Self {
|
||||||
@@ -153,6 +174,18 @@ impl TraceDnsLayer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "aws-sdk")]
|
||||||
|
impl TraceAwsSdkClientLayer {
|
||||||
|
#[must_use]
|
||||||
|
pub fn aws_sdk() -> Self {
|
||||||
|
TraceLayer::with_namespace("aws_sdk")
|
||||||
|
.make_span_builder(SpanFromAwsRequest)
|
||||||
|
.on_response(OnAwsResponse)
|
||||||
|
.on_error(DebugOnError)
|
||||||
|
.inject_context(InjectInAwsRequest)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(feature = "client")]
|
#[cfg(feature = "client")]
|
||||||
use self::make_metrics_labels::DefaultMakeMetricsLabels;
|
use self::make_metrics_labels::DefaultMakeMetricsLabels;
|
||||||
#[cfg(feature = "axum")]
|
#[cfg(feature = "axum")]
|
||||||
|
@@ -62,3 +62,36 @@ impl<B> OnResponse<Response<B>> for OnHttpResponse {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, Default)]
|
||||||
|
pub struct OnAwsResponse;
|
||||||
|
|
||||||
|
impl OnResponse<aws_smithy_http::operation::Response> for OnAwsResponse {
|
||||||
|
fn on_response(
|
||||||
|
&self,
|
||||||
|
span: &SpanRef<'_>,
|
||||||
|
metrics_labels: &mut Vec<KeyValue>,
|
||||||
|
response: &aws_smithy_http::operation::Response,
|
||||||
|
) {
|
||||||
|
let response = response.http();
|
||||||
|
let status_code = i64::from(response.status().as_u16());
|
||||||
|
span.set_attribute(SC::HTTP_STATUS_CODE.i64(status_code));
|
||||||
|
metrics_labels.push(KeyValue::new("status_code", status_code));
|
||||||
|
|
||||||
|
if let Some(ContentLength(content_length)) = response.headers().typed_get() {
|
||||||
|
if let Ok(content_length) = content_length.try_into() {
|
||||||
|
span.set_attribute(SC::HTTP_RESPONSE_CONTENT_LENGTH.i64(content_length));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "client")]
|
||||||
|
// Get local and remote address from hyper's HttpInfo injected by the
|
||||||
|
// HttpConnector
|
||||||
|
if let Some(info) = response.extensions().get::<HttpInfo>() {
|
||||||
|
span.set_attribute(SC::NET_PEER_IP.string(info.remote_addr().ip().to_string()));
|
||||||
|
span.set_attribute(SC::NET_PEER_PORT.i64(info.remote_addr().port().into()));
|
||||||
|
span.set_attribute(SC::NET_HOST_IP.string(info.local_addr().ip().to_string()));
|
||||||
|
span.set_attribute(SC::NET_HOST_PORT.i64(info.local_addr().port().into()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user