1
0
mirror of https://github.com/matrix-org/matrix-authentication-service.git synced 2025-08-09 04:22:45 +03:00

Replace the OTEL-based tracing layer with tracing based layers

This commit is contained in:
Quentin Gliech
2023-04-15 14:21:12 +02:00
parent 4a4fbc7a16
commit 08f58db08b
47 changed files with 1703 additions and 1520 deletions

View File

@@ -19,13 +19,15 @@ thiserror = "1.0.40"
tower = "0.4.13"
tracing = "0.1.37"
tracing-opentelemetry = "0.18.0"
opentelemetry = "0.18.0"
ulid = "1.0.0"
url = "2.3.1"
serde = { version = "1.0.159", features = ["derive"] }
mas-axum-utils = { path = "../axum-utils" }
mas-storage = { path = "../storage" }
mas-storage-pg = { path = "../storage-pg" }
mas-data-model = { path = "../data-model" }
mas-email = { path = "../email" }
mas-http = { path = "../http" }
mas-data-model = { path = "../data-model" }
mas-storage = { path = "../storage" }
mas-storage-pg = { path = "../storage-pg" }
mas-tower = { path = "../tower" }

View File

@@ -28,7 +28,7 @@ use chrono::{DateTime, Utc};
use mas_storage::{oauth2::OAuth2AccessTokenRepository, RepositoryAccess};
use tracing::{debug, info};
use crate::{JobContextExt, State};
use crate::{utils::metrics_layer, JobContextExt, State};
#[derive(Default, Clone)]
pub struct CleanupExpiredTokensJob {
@@ -77,6 +77,7 @@ pub(crate) fn register(
let worker = WorkerBuilder::new(worker_name)
.stream(CronStream::new(schedule).to_stream())
.layer(state.inject())
.layer(metrics_layer::<CleanupExpiredTokensJob>())
.build_fn(cleanup_expired_tokens);
monitor.register(worker)

View File

@@ -27,7 +27,10 @@ use mas_storage::job::{JobWithSpanContext, VerifyEmailJob};
use rand::{distributions::Uniform, Rng};
use tracing::info;
use crate::{layers::TracingLayer, JobContextExt, State};
use crate::{
utils::{metrics_layer, trace_layer},
JobContextExt, State,
};
#[tracing::instrument(
name = "job.verify_email",
@@ -98,7 +101,8 @@ pub(crate) fn register(
let worker_name = format!("{job}-{suffix}", job = VerifyEmailJob::NAME);
let worker = WorkerBuilder::new(worker_name)
.layer(state.inject())
.layer(TracingLayer::new())
.layer(trace_layer::<VerifyEmailJob>())
.layer(metrics_layer::<JobWithSpanContext<VerifyEmailJob>>())
.with_storage(storage)
.build_fn(verify_email);
monitor.register(worker)

View File

@@ -1,70 +0,0 @@
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::task::{Context, Poll};
use apalis_core::{job::Job, request::JobRequest};
use mas_storage::job::JobWithSpanContext;
use tower::{Layer, Service};
use tracing::{info_span, instrument::Instrumented, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;
pub struct TracingLayer;
impl TracingLayer {
pub fn new() -> Self {
Self
}
}
impl<S> Layer<S> for TracingLayer {
type Service = TracingService<S>;
fn layer(&self, inner: S) -> Self::Service {
TracingService { inner }
}
}
pub struct TracingService<S> {
inner: S,
}
impl<J, S> Service<JobRequest<JobWithSpanContext<J>>> for TracingService<S>
where
J: Job,
S: Service<JobRequest<JobWithSpanContext<J>>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = Instrumented<S::Future>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: JobRequest<JobWithSpanContext<J>>) -> Self::Future {
let span = info_span!(
"job.run",
job.id = %req.id(),
job.attempts = req.attempts(),
job.name = J::NAME,
);
if let Some(context) = req.inner().span_context() {
span.add_link(context);
}
self.inner.call(req).instrument(span)
}
}

View File

@@ -31,8 +31,8 @@ use tracing::debug;
mod database;
mod email;
mod layers;
mod matrix;
mod utils;
pub use self::matrix::HomeserverConnection;
@@ -103,15 +103,12 @@ impl State {
&self.homeserver
}
pub async fn http_client<B>(
&self,
operation: &'static str,
) -> Result<ClientService<TracedClient<B>>, ClientInitError>
pub async fn http_client<B>(&self) -> Result<ClientService<TracedClient<B>>, ClientInitError>
where
B: mas_axum_utils::axum::body::HttpBody + Send,
B::Data: Send,
{
self.http_client_factory.client(operation).await
self.http_client_factory.client().await
}
}

View File

@@ -33,10 +33,13 @@ use mas_storage::{
};
use serde::{Deserialize, Serialize};
use tower::{Service, ServiceExt};
use tracing::info;
use tracing::{info, info_span, Instrument};
use url::Url;
use crate::{layers::TracingLayer, JobContextExt, State};
use crate::{
utils::{metrics_layer, trace_layer},
JobContextExt, State,
};
pub struct HomeserverConnection {
homeserver: String,
@@ -101,7 +104,7 @@ async fn provision_user(
let state = ctx.state();
let matrix = state.matrix_connection();
let mut client = state
.http_client("matrix.provision_user")
.http_client()
.await?
.request_bytes_to_body()
.json_request();
@@ -158,7 +161,12 @@ async fn provision_user(
let req = req.body(body).context("Failed to build request")?;
let response = client.ready().await?.call(req).await?;
let response = client
.ready()
.await?
.call(req)
.instrument(info_span!("matrix.provision_user"))
.await?;
match response.status() {
StatusCode::CREATED => info!(%user.id, %mxid, "User created"),
@@ -194,7 +202,7 @@ async fn provision_device(
let state = ctx.state();
let matrix = state.matrix_connection();
let mut client = state
.http_client("matrix.provision_device")
.http_client()
.await?
.request_bytes_to_body()
.json_request();
@@ -225,7 +233,12 @@ async fn provision_device(
})
.context("Failed to build request")?;
let response = client.ready().await?.call(req).await?;
let response = client
.ready()
.await?
.call(req)
.instrument(info_span!("matrix.create_device"))
.await?;
match response.status() {
StatusCode::CREATED => {
@@ -255,7 +268,7 @@ async fn delete_device(
) -> Result<(), anyhow::Error> {
let state = ctx.state();
let matrix = state.matrix_connection();
let mut client = state.http_client("matrix.delete_device").await?;
let mut client = state.http_client().await?;
let mut repo = state.repository().await?;
let user = repo
@@ -284,7 +297,12 @@ async fn delete_device(
.body(EmptyBody::new())
.context("Failed to build request")?;
let response = client.ready().await?.call(req).await?;
let response = client
.ready()
.await?
.call(req)
.instrument(info_span!("matrix.delete_device"))
.await?;
match response.status() {
StatusCode::OK => info!(%user.id, %mxid, "Device deleted"),
@@ -303,7 +321,8 @@ pub(crate) fn register(
let worker_name = format!("{job}-{suffix}", job = ProvisionUserJob::NAME);
let provision_user_worker = WorkerBuilder::new(worker_name)
.layer(state.inject())
.layer(TracingLayer::new())
.layer(trace_layer())
.layer(metrics_layer::<JobWithSpanContext<ProvisionUserJob>>())
.with_storage(storage)
.build_fn(provision_user);
@@ -311,7 +330,8 @@ pub(crate) fn register(
let worker_name = format!("{job}-{suffix}", job = ProvisionDeviceJob::NAME);
let provision_device_worker = WorkerBuilder::new(worker_name)
.layer(state.inject())
.layer(TracingLayer::new())
.layer(trace_layer())
.layer(metrics_layer::<JobWithSpanContext<ProvisionDeviceJob>>())
.with_storage(storage)
.build_fn(provision_device);
@@ -319,7 +339,8 @@ pub(crate) fn register(
let worker_name = format!("{job}-{suffix}", job = DeleteDeviceJob::NAME);
let delete_device_worker = WorkerBuilder::new(worker_name)
.layer(state.inject())
.layer(TracingLayer::new())
.layer(trace_layer())
.layer(metrics_layer::<JobWithSpanContext<DeleteDeviceJob>>())
.with_storage(storage)
.build_fn(delete_device);

79
crates/tasks/src/utils.rs Normal file
View File

@@ -0,0 +1,79 @@
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use apalis_core::{job::Job, request::JobRequest};
use mas_storage::job::JobWithSpanContext;
use mas_tower::{
make_span_fn, DurationRecorderLayer, FnWrapper, InFlightCounterLayer, TraceLayer, KV,
};
use opentelemetry::{Key, KeyValue};
use tracing::info_span;
use tracing_opentelemetry::OpenTelemetrySpanExt;
const JOB_NAME: Key = Key::from_static_str("job.name");
const JOB_STATUS: Key = Key::from_static_str("job.status");
fn make_span_for_job_request<J>(req: &JobRequest<JobWithSpanContext<J>>) -> tracing::Span
where
J: Job,
{
let span = info_span!(
"job.run",
"otel.kind" = "consumer",
"otel.status_code" = tracing::field::Empty,
"job.id" = %req.id(),
"job.attempts" = req.attempts(),
"job.name" = J::NAME,
);
if let Some(context) = req.inner().span_context() {
span.add_link(context);
}
span
}
type TraceLayerForJob<J> = TraceLayer<
FnWrapper<fn(&JobRequest<JobWithSpanContext<J>>) -> tracing::Span>,
KV<&'static str>,
KV<&'static str>,
>;
pub(crate) fn trace_layer<J>() -> TraceLayerForJob<J>
where
J: Job,
{
TraceLayer::new(make_span_fn(
make_span_for_job_request::<J> as fn(&JobRequest<JobWithSpanContext<J>>) -> tracing::Span,
))
.on_response(KV("otel.status_code", "OK"))
.on_error(KV("otel.status_code", "ERROR"))
}
pub(crate) fn metrics_layer<J>() -> (
DurationRecorderLayer<KeyValue, KeyValue, KeyValue>,
InFlightCounterLayer<KeyValue>,
)
where
J: Job,
{
let duration_recorder = DurationRecorderLayer::new("job.run.duration")
.on_request(JOB_NAME.string(J::NAME))
.on_response(JOB_STATUS.string("success"))
.on_error(JOB_STATUS.string("error"));
let in_flight_counter =
InFlightCounterLayer::new("job.run.active").on_request(JOB_NAME.string(J::NAME));
(duration_recorder, in_flight_counter)
}