1
0
mirror of https://github.com/matrix-org/matrix-authentication-service.git synced 2025-07-29 22:01:14 +03:00

HTTP metrics & other stuff

This commit is contained in:
Quentin Gliech
2022-09-19 12:14:23 +02:00
parent 3901829ccd
commit 56fdb64a84
7 changed files with 206 additions and 61 deletions

View File

@ -1,5 +1,6 @@
use std::sync::Arc;
use opentelemetry::metrics::{Counter, Histogram, UpDownCounter};
use tower::Layer;
use super::{
@ -22,12 +23,43 @@ pub struct TraceLayer<
make_span_builder: MakeSpanBuilder,
on_response: OnResponse,
on_error: OnError,
inflight_requests: UpDownCounter<i64>,
request_counter: Counter<u64>,
request_histogram: Histogram<f64>,
}
impl Default for TraceLayer {
fn default() -> Self {
Self::with_namespace("http")
}
}
impl TraceLayer {
#[must_use]
pub fn with_namespace(namespace: &'static str) -> Self {
let tracer = Arc::new(opentelemetry::global::tracer("mas-http"));
Self::new(tracer)
let meter = opentelemetry::global::meter("mas-http");
let inflight_requests = meter
.i64_up_down_counter(format!("{namespace}.inflight_requests"))
.with_description("Number of in-flight requests")
.init();
let request_counter = meter
.u64_counter(format!("{namespace}.requests_total"))
.with_description("Total number of requests made.")
.init();
let request_histogram = meter
.f64_histogram(format!("{namespace}.request_duration_seconds"))
.with_description("The request latencies in seconds.")
.init();
Self::new(
tracer,
inflight_requests,
request_counter,
request_histogram,
)
}
}
@ -35,7 +67,12 @@ impl<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError>
TraceLayer<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError>
{
#[must_use]
pub fn new(tracer: Arc<opentelemetry::global::BoxedTracer>) -> Self
pub fn new(
tracer: Arc<opentelemetry::global::BoxedTracer>,
inflight_requests: UpDownCounter<i64>,
request_counter: Counter<u64>,
request_histogram: Histogram<f64>,
) -> Self
where
ExtractContext: Default,
InjectContext: Default,
@ -50,6 +87,9 @@ impl<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError>
make_span_builder: MakeSpanBuilder::default(),
on_response: OnResponse::default(),
on_error: OnError::default(),
inflight_requests,
request_counter,
request_histogram,
}
}
@ -65,6 +105,9 @@ impl<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError>
make_span_builder: self.make_span_builder,
on_response: self.on_response,
on_error: self.on_error,
inflight_requests: self.inflight_requests,
request_counter: self.request_counter,
request_histogram: self.request_histogram,
}
}
@ -80,6 +123,9 @@ impl<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError>
make_span_builder: self.make_span_builder,
on_response: self.on_response,
on_error: self.on_error,
inflight_requests: self.inflight_requests,
request_counter: self.request_counter,
request_histogram: self.request_histogram,
}
}
@ -95,6 +141,9 @@ impl<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError>
make_span_builder,
on_response: self.on_response,
on_error: self.on_error,
inflight_requests: self.inflight_requests,
request_counter: self.request_counter,
request_histogram: self.request_histogram,
}
}
@ -110,6 +159,9 @@ impl<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError>
make_span_builder: self.make_span_builder,
on_response,
on_error: self.on_error,
inflight_requests: self.inflight_requests,
request_counter: self.request_counter,
request_histogram: self.request_histogram,
}
}
@ -125,6 +177,9 @@ impl<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError>
make_span_builder: self.make_span_builder,
on_response: self.on_response,
on_error,
inflight_requests: self.inflight_requests,
request_counter: self.request_counter,
request_histogram: self.request_histogram,
}
}
}
@ -141,14 +196,17 @@ where
type Service = Trace<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError, S>;
fn layer(&self, inner: S) -> Self::Service {
Trace {
Trace::new(
inner,
tracer: self.tracer.clone(),
extract_context: self.extract_context.clone(),
inject_context: self.inject_context.clone(),
make_span_builder: self.make_span_builder.clone(),
on_response: self.on_response.clone(),
on_error: self.on_error.clone(),
}
self.tracer.clone(),
self.extract_context.clone(),
self.inject_context.clone(),
self.make_span_builder.clone(),
self.on_response.clone(),
self.on_error.clone(),
self.inflight_requests.clone(),
self.request_counter.clone(),
self.request_histogram.clone(),
)
}
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::{borrow::Cow, vec::Vec};
#[cfg(feature = "axum")]
use axum::extract::{ConnectInfo, MatchedPath};
@ -20,11 +20,14 @@ use headers::{ContentLength, HeaderMapExt, Host, UserAgent};
use http::{Method, Request, Version};
#[cfg(feature = "client")]
use hyper::client::connect::dns::Name;
use opentelemetry::trace::{SpanBuilder, SpanKind};
use opentelemetry::{
trace::{SpanBuilder, SpanKind},
KeyValue,
};
use opentelemetry_semantic_conventions::trace as SC;
pub trait MakeSpanBuilder<R> {
fn make_span_builder(&self, request: &R) -> SpanBuilder;
fn make_span_builder(&self, request: &R) -> (SpanBuilder, Vec<KeyValue>);
}
#[derive(Debug, Clone, Copy)]
@ -48,8 +51,8 @@ impl Default for DefaultMakeSpanBuilder {
}
impl<R> MakeSpanBuilder<R> for DefaultMakeSpanBuilder {
fn make_span_builder(&self, _request: &R) -> SpanBuilder {
SpanBuilder::from_name(self.operation)
fn make_span_builder(&self, _request: &R) -> (SpanBuilder, Vec<KeyValue>) {
(SpanBuilder::from_name(self.operation), Vec::new())
}
}
@ -114,9 +117,10 @@ impl SpanFromHttpRequest {
}
impl<B> MakeSpanBuilder<Request<B>> for SpanFromHttpRequest {
fn make_span_builder(&self, request: &Request<B>) -> SpanBuilder {
fn make_span_builder(&self, request: &Request<B>) -> (SpanBuilder, Vec<KeyValue>) {
let method = SC::HTTP_METHOD.string(http_method_str(request.method()));
let mut attributes = vec![
SC::HTTP_METHOD.string(http_method_str(request.method())),
method.clone(),
SC::HTTP_FLAVOR.string(http_flavor(request.version())),
SC::HTTP_TARGET.string(request.uri().to_string()),
];
@ -137,9 +141,12 @@ impl<B> MakeSpanBuilder<Request<B>> for SpanFromHttpRequest {
}
}
SpanBuilder::from_name(self.operation)
let span_builder = SpanBuilder::from_name(self.operation)
.with_kind(self.span_kind.clone())
.with_attributes(attributes)
.with_attributes(attributes);
let metrics_labels = vec![method];
(span_builder, metrics_labels)
}
}
@ -149,9 +156,13 @@ pub struct SpanFromAxumRequest;
#[cfg(feature = "axum")]
impl<B> MakeSpanBuilder<Request<B>> for SpanFromAxumRequest {
fn make_span_builder(&self, request: &Request<B>) -> SpanBuilder {
fn make_span_builder(&self, request: &Request<B>) -> (SpanBuilder, Vec<KeyValue>) {
let method = SC::HTTP_METHOD.string(http_method_str(request.method()));
let mut metrics_labels = vec![method.clone()];
let mut attributes = vec![
SC::HTTP_METHOD.string(http_method_str(request.method())),
method,
SC::HTTP_FLAVOR.string(http_flavor(request.version())),
SC::HTTP_TARGET.string(request.uri().to_string()),
];
@ -181,17 +192,23 @@ impl<B> MakeSpanBuilder<Request<B>> for SpanFromAxumRequest {
attributes.push(SC::NET_PEER_PORT.i64(addr.port().into()));
}
let name = if let Some(path) = request.extensions().get::<MatchedPath>() {
let path = path.as_str().to_owned();
attributes.push(SC::HTTP_ROUTE.string(path.clone()));
path
let (name, route) = if let Some(path) = request.extensions().get::<MatchedPath>() {
let path = path.as_str();
(path, path)
} else {
request.uri().path().to_owned()
(request.uri().path(), "FALLBACK")
};
SpanBuilder::from_name(name)
.with_kind(SpanKind::Server)
.with_attributes(attributes)
let route = SC::HTTP_ROUTE.string(route.to_owned());
attributes.push(route.clone());
metrics_labels.push(route);
(
SpanBuilder::from_name(name.to_owned())
.with_kind(SpanKind::Server)
.with_attributes(attributes),
metrics_labels,
)
}
}
@ -201,11 +218,14 @@ pub struct SpanFromDnsRequest;
#[cfg(feature = "client")]
impl MakeSpanBuilder<Name> for SpanFromDnsRequest {
fn make_span_builder(&self, request: &Name) -> SpanBuilder {
fn make_span_builder(&self, request: &Name) -> (SpanBuilder, Vec<KeyValue>) {
let attributes = vec![SC::NET_HOST_NAME.string(request.as_str().to_owned())];
SpanBuilder::from_name("resolve")
.with_kind(SpanKind::Client)
.with_attributes(attributes)
(
SpanBuilder::from_name("resolve")
.with_kind(SpanKind::Client)
.with_attributes(attributes.clone()),
attributes,
)
}
}

View File

@ -95,7 +95,7 @@ pub type TraceDns<S> = Trace<
impl TraceHttpServerLayer {
#[must_use]
pub fn http_server() -> Self {
TraceLayer::default()
TraceLayer::with_namespace("http_server")
.make_span_builder(SpanFromHttpRequest::server())
.on_response(OnHttpResponse)
.extract_context(ExtractFromHttpRequest)
@ -106,7 +106,7 @@ impl TraceHttpServerLayer {
impl TraceAxumServerLayer {
#[must_use]
pub fn axum() -> Self {
TraceLayer::default()
TraceLayer::with_namespace("http_server")
.make_span_builder(SpanFromAxumRequest)
.on_response(OnHttpResponse)
.extract_context(ExtractFromHttpRequest)
@ -116,7 +116,7 @@ impl TraceAxumServerLayer {
impl TraceHttpClientLayer {
#[must_use]
pub fn http_client(operation: &'static str) -> Self {
TraceLayer::default()
TraceLayer::with_namespace("http_client")
.make_span_builder(SpanFromHttpRequest::client(operation))
.on_response(OnHttpResponse)
.inject_context(InjectInHttpRequest)
@ -124,7 +124,7 @@ impl TraceHttpClientLayer {
#[must_use]
pub fn inner_http_client() -> Self {
TraceLayer::default()
TraceLayer::with_namespace("inner_http_client")
.make_span_builder(SpanFromHttpRequest::inner_client())
.on_response(OnHttpResponse)
.inject_context(InjectInHttpRequest)
@ -135,7 +135,7 @@ impl TraceHttpClientLayer {
impl TraceDnsLayer {
#[must_use]
pub fn dns() -> Self {
TraceLayer::default().make_span_builder(SpanFromDnsRequest)
TraceLayer::with_namespace("dns").make_span_builder(SpanFromDnsRequest)
}
}

View File

@ -12,11 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use opentelemetry::trace::SpanRef;
use opentelemetry::{trace::SpanRef, KeyValue};
use opentelemetry_semantic_conventions::trace::EXCEPTION_MESSAGE;
pub trait OnError<E> {
fn on_error(&self, span: &SpanRef<'_>, err: &E);
fn on_error(&self, span: &SpanRef<'_>, err: &E) -> Vec<KeyValue>;
}
#[derive(Debug, Clone, Copy, Default)]
@ -26,8 +26,10 @@ impl<E> OnError<E> for DefaultOnError
where
E: std::fmt::Display,
{
fn on_error(&self, span: &SpanRef<'_>, err: &E) {
fn on_error(&self, span: &SpanRef<'_>, err: &E) -> Vec<KeyValue> {
let attributes = vec![EXCEPTION_MESSAGE.string(err.to_string())];
span.add_event("exception".to_owned(), attributes);
Vec::new()
}
}

View File

@ -16,26 +16,29 @@ use headers::{ContentLength, HeaderMapExt};
use http::Response;
#[cfg(feature = "client")]
use hyper::client::connect::HttpInfo;
use opentelemetry::trace::SpanRef;
use opentelemetry::{trace::SpanRef, KeyValue};
use opentelemetry_semantic_conventions::trace as SC;
pub trait OnResponse<R> {
fn on_response(&self, span: &SpanRef<'_>, response: &R);
fn on_response(&self, span: &SpanRef<'_>, response: &R) -> Vec<KeyValue>;
}
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultOnResponse;
impl<R> OnResponse<R> for DefaultOnResponse {
fn on_response(&self, _span: &SpanRef<'_>, _response: &R) {}
fn on_response(&self, _span: &SpanRef<'_>, _response: &R) -> Vec<KeyValue> {
Vec::new()
}
}
#[derive(Debug, Clone, Copy, Default)]
pub struct OnHttpResponse;
impl<B> OnResponse<Response<B>> for OnHttpResponse {
fn on_response(&self, span: &SpanRef<'_>, response: &Response<B>) {
span.set_attribute(SC::HTTP_STATUS_CODE.i64(i64::from(response.status().as_u16())));
fn on_response(&self, span: &SpanRef<'_>, response: &Response<B>) -> Vec<KeyValue> {
let status_code = i64::from(response.status().as_u16());
span.set_attribute(SC::HTTP_STATUS_CODE.i64(status_code));
if let Some(ContentLength(content_length)) = response.headers().typed_get() {
if let Ok(content_length) = content_length.try_into() {
@ -52,5 +55,7 @@ impl<B> OnResponse<Response<B>> for OnHttpResponse {
span.set_attribute(SC::NET_HOST_IP.string(info.local_addr().ip().to_string()));
span.set_attribute(SC::NET_HOST_PORT.i64(info.local_addr().port().into()));
}
vec![KeyValue::new("status_code", status_code)]
}
}

View File

@ -12,10 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{sync::Arc, task::Poll};
use std::{sync::Arc, task::Poll, time::SystemTime};
use futures_util::{future::BoxFuture, FutureExt as _};
use opentelemetry::trace::{FutureExt as _, TraceContextExt};
use opentelemetry::{
metrics::{Counter, Histogram, UpDownCounter},
trace::{FutureExt as _, TraceContextExt},
};
use tower::Service;
use super::{
@ -25,13 +28,50 @@ use super::{
#[derive(Debug, Clone)]
pub struct Trace<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError, S> {
pub(crate) inner: S,
pub(crate) tracer: Arc<opentelemetry::global::BoxedTracer>,
pub(crate) extract_context: ExtractContext,
pub(crate) inject_context: InjectContext,
pub(crate) make_span_builder: MakeSpanBuilder,
pub(crate) on_response: OnResponse,
pub(crate) on_error: OnError,
inner: S,
tracer: Arc<opentelemetry::global::BoxedTracer>,
extract_context: ExtractContext,
inject_context: InjectContext,
make_span_builder: MakeSpanBuilder,
on_response: OnResponse,
on_error: OnError,
inflight_requests: UpDownCounter<i64>,
request_counter: Counter<u64>,
request_histogram: Histogram<f64>,
}
impl<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError, S>
Trace<ExtractContext, InjectContext, MakeSpanBuilder, OnResponse, OnError, S>
{
#[allow(clippy::too_many_arguments)]
pub fn new(
service: S,
tracer: Arc<opentelemetry::global::BoxedTracer>,
extract_context: ExtractContext,
inject_context: InjectContext,
make_span_builder: MakeSpanBuilder,
on_response: OnResponse,
on_error: OnError,
inflight_requests: UpDownCounter<i64>,
request_counter: Counter<u64>,
request_histogram: Histogram<f64>,
) -> Self {
Self {
inner: service,
tracer,
extract_context,
inject_context,
make_span_builder,
on_response,
on_error,
inflight_requests,
request_counter,
request_histogram,
}
}
}
impl<Req, S, ExtractContextT, InjectContextT, MakeSpanBuilderT, OnResponseT, OnErrorT> Service<Req>
@ -54,13 +94,20 @@ where
}
fn call(&mut self, request: Req) -> Self::Future {
let inflight_requests = self.inflight_requests.clone();
let request_counter = self.request_counter.clone();
let request_histogram = self.request_histogram.clone();
let start_time = SystemTime::now();
let cx = self.extract_context.extract_context(&request);
let span_builder = self.make_span_builder.make_span_builder(&request);
let (span_builder, mut metrics_labels) = self.make_span_builder.make_span_builder(&request);
let span = span_builder.start_with_context(self.tracer.as_ref(), &cx);
let cx = cx.with_span(span);
let request = self.inject_context.inject_context(&cx, request);
inflight_requests.add(&cx, 1, &metrics_labels);
let on_response = self.on_response.clone();
let on_error = self.on_error.clone();
let attachment = cx.clone().attach();
@ -69,11 +116,21 @@ where
.call(request)
.with_context(cx.clone())
.inspect(move |r| {
inflight_requests.add(&cx, -1, &metrics_labels);
let span = cx.span();
match r {
let extra_labels = match r {
Ok(response) => on_response.on_response(&span, response),
Err(err) => on_error.on_error(&span, err),
}
};
metrics_labels.extend_from_slice(&extra_labels);
request_counter.add(&cx, 1, &metrics_labels);
request_histogram.record(
&cx,
start_time.elapsed().map_or(0.0, |d| d.as_secs_f64()),
&metrics_labels,
);
span.end();
})