You've already forked authentication-service
mirror of
https://github.com/matrix-org/matrix-authentication-service.git
synced 2025-07-31 09:24:31 +03:00
Attach remote and local address to HTTP server/client spans
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -2136,6 +2136,7 @@ dependencies = [
|
|||||||
name = "mas-http"
|
name = "mas-http"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"axum",
|
||||||
"bytes 1.1.0",
|
"bytes 1.1.0",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"http",
|
"http",
|
||||||
|
@ -222,7 +222,7 @@ impl Options {
|
|||||||
info!("Listening on http://{}", listener.local_addr().unwrap());
|
info!("Listening on http://{}", listener.local_addr().unwrap());
|
||||||
|
|
||||||
Server::from_tcp(listener)?
|
Server::from_tcp(listener)?
|
||||||
.serve(router.into_make_service())
|
.serve(router.into_make_service_with_connect_info::<SocketAddr>())
|
||||||
.with_graceful_shutdown(shutdown_signal())
|
.with_graceful_shutdown(shutdown_signal())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
@ -6,6 +6,7 @@ edition = "2021"
|
|||||||
license = "Apache-2.0"
|
license = "Apache-2.0"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
axum = "0.5.1"
|
||||||
bytes = "1.1.0"
|
bytes = "1.1.0"
|
||||||
futures-util = "0.3.21"
|
futures-util = "0.3.21"
|
||||||
http = "0.2.6"
|
http = "0.2.6"
|
||||||
|
@ -12,13 +12,14 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
use std::borrow::Cow;
|
use std::{borrow::Cow, net::SocketAddr};
|
||||||
|
|
||||||
|
use axum::extract::{ConnectInfo, MatchedPath};
|
||||||
use http::{Method, Request, Version};
|
use http::{Method, Request, Version};
|
||||||
use hyper::client::connect::dns::Name;
|
use hyper::client::connect::dns::Name;
|
||||||
use opentelemetry::trace::{SpanBuilder, SpanKind};
|
use opentelemetry::trace::{SpanBuilder, SpanKind};
|
||||||
use opentelemetry_semantic_conventions::trace::{
|
use opentelemetry_semantic_conventions::trace::{
|
||||||
HTTP_FLAVOR, HTTP_METHOD, HTTP_URL, NET_HOST_NAME,
|
HTTP_FLAVOR, HTTP_METHOD, HTTP_URL, NET_HOST_NAME, NET_PEER_IP, NET_PEER_PORT,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub trait MakeSpanBuilder<R> {
|
pub trait MakeSpanBuilder<R> {
|
||||||
@ -125,6 +126,34 @@ impl<B> MakeSpanBuilder<Request<B>> for SpanFromHttpRequest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct SpanFromAxumRequest;
|
||||||
|
|
||||||
|
impl<B> MakeSpanBuilder<Request<B>> for SpanFromAxumRequest {
|
||||||
|
fn make_span_builder(&self, request: &Request<B>) -> SpanBuilder {
|
||||||
|
let mut attributes = vec![
|
||||||
|
HTTP_METHOD.string(http_method_str(request.method())),
|
||||||
|
HTTP_FLAVOR.string(http_flavor(request.version())),
|
||||||
|
HTTP_URL.string(request.uri().to_string()),
|
||||||
|
];
|
||||||
|
|
||||||
|
if let Some(ConnectInfo(addr)) = request.extensions().get::<ConnectInfo<SocketAddr>>() {
|
||||||
|
attributes.push(NET_PEER_IP.string(addr.ip().to_string()));
|
||||||
|
attributes.push(NET_PEER_PORT.i64(addr.port().into()));
|
||||||
|
}
|
||||||
|
|
||||||
|
let path = if let Some(path) = request.extensions().get::<MatchedPath>() {
|
||||||
|
path.as_str()
|
||||||
|
} else {
|
||||||
|
request.uri().path()
|
||||||
|
};
|
||||||
|
|
||||||
|
SpanBuilder::from_name(path.to_string())
|
||||||
|
.with_kind(SpanKind::Server)
|
||||||
|
.with_attributes(attributes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, Default)]
|
#[derive(Debug, Clone, Copy, Default)]
|
||||||
pub struct SpanFromDnsRequest;
|
pub struct SpanFromDnsRequest;
|
||||||
|
|
||||||
|
@ -37,6 +37,23 @@ pub type TraceHttpServer<S> = Trace<
|
|||||||
S,
|
S,
|
||||||
>;
|
>;
|
||||||
|
|
||||||
|
pub type TraceAxumServerLayer = TraceLayer<
|
||||||
|
ExtractFromHttpRequest,
|
||||||
|
DefaultInjectContext,
|
||||||
|
SpanFromAxumRequest,
|
||||||
|
OnHttpResponse,
|
||||||
|
DefaultOnError,
|
||||||
|
>;
|
||||||
|
|
||||||
|
pub type TraceAxumServer<S> = Trace<
|
||||||
|
ExtractFromHttpRequest,
|
||||||
|
DefaultInjectContext,
|
||||||
|
SpanFromAxumRequest,
|
||||||
|
OnHttpResponse,
|
||||||
|
DefaultOnError,
|
||||||
|
S,
|
||||||
|
>;
|
||||||
|
|
||||||
pub type TraceHttpClientLayer = TraceLayer<
|
pub type TraceHttpClientLayer = TraceLayer<
|
||||||
DefaultExtractContext,
|
DefaultExtractContext,
|
||||||
InjectInHttpRequest,
|
InjectInHttpRequest,
|
||||||
@ -81,6 +98,16 @@ impl TraceHttpServerLayer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl TraceAxumServerLayer {
|
||||||
|
#[must_use]
|
||||||
|
pub fn axum() -> Self {
|
||||||
|
TraceLayer::default()
|
||||||
|
.make_span_builder(SpanFromAxumRequest)
|
||||||
|
.on_response(OnHttpResponse)
|
||||||
|
.extract_context(ExtractFromHttpRequest)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl TraceHttpClientLayer {
|
impl TraceHttpClientLayer {
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn http_client(operation: &'static str) -> Self {
|
pub fn http_client(operation: &'static str) -> Self {
|
||||||
|
@ -13,8 +13,11 @@
|
|||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
use http::Response;
|
use http::Response;
|
||||||
|
use hyper::client::connect::HttpInfo;
|
||||||
use opentelemetry::trace::SpanRef;
|
use opentelemetry::trace::SpanRef;
|
||||||
use opentelemetry_semantic_conventions::trace::HTTP_STATUS_CODE;
|
use opentelemetry_semantic_conventions::trace::{
|
||||||
|
HTTP_STATUS_CODE, NET_HOST_IP, NET_HOST_PORT, NET_PEER_IP, NET_PEER_PORT,
|
||||||
|
};
|
||||||
|
|
||||||
pub trait OnResponse<R> {
|
pub trait OnResponse<R> {
|
||||||
fn on_response(&self, span: &SpanRef<'_>, response: &R);
|
fn on_response(&self, span: &SpanRef<'_>, response: &R);
|
||||||
@ -33,5 +36,14 @@ pub struct OnHttpResponse;
|
|||||||
impl<B> OnResponse<Response<B>> for OnHttpResponse {
|
impl<B> OnResponse<Response<B>> for OnHttpResponse {
|
||||||
fn on_response(&self, span: &SpanRef<'_>, response: &Response<B>) {
|
fn on_response(&self, span: &SpanRef<'_>, response: &Response<B>) {
|
||||||
span.set_attribute(HTTP_STATUS_CODE.i64(i64::from(response.status().as_u16())));
|
span.set_attribute(HTTP_STATUS_CODE.i64(i64::from(response.status().as_u16())));
|
||||||
|
|
||||||
|
// Get local and remote address from hyper's HttpInfo injected by the
|
||||||
|
// HttpConnector
|
||||||
|
if let Some(info) = response.extensions().get::<HttpInfo>() {
|
||||||
|
span.set_attribute(NET_PEER_IP.string(info.remote_addr().ip().to_string()));
|
||||||
|
span.set_attribute(NET_PEER_PORT.i64(info.remote_addr().port().into()));
|
||||||
|
span.set_attribute(NET_HOST_IP.string(info.local_addr().ip().to_string()));
|
||||||
|
span.set_attribute(NET_HOST_PORT.i64(info.local_addr().port().into()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -39,7 +39,7 @@ where
|
|||||||
fn layer(&self, inner: S) -> Self::Service {
|
fn layer(&self, inner: S) -> Self::Service {
|
||||||
ServiceBuilder::new()
|
ServiceBuilder::new()
|
||||||
.compression()
|
.compression()
|
||||||
.layer(TraceLayer::http_server())
|
.layer(TraceLayer::axum())
|
||||||
.service(inner)
|
.service(inner)
|
||||||
.boxed_clone()
|
.boxed_clone()
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user