1
0
mirror of https://github.com/matrix-org/matrix-authentication-service.git synced 2025-07-29 22:01:14 +03:00

Sentry transport based on hyper to get rid of reqwest

This commit is contained in:
Quentin Gliech
2023-04-18 22:03:17 +02:00
parent 70b03a131f
commit c9e9130cdf
6 changed files with 447 additions and 91 deletions

View File

@ -40,7 +40,7 @@ opentelemetry-zipkin = { version = "0.16.0", features = ["opentelemetry-http"],
opentelemetry-http = { version = "0.7.0", features = ["tokio", "hyper"], optional = true }
opentelemetry-prometheus = { version = "0.11.0", optional = true }
prometheus = { version = "0.13.3", optional = true }
sentry = { version = "0.30.0", default-features = false, features = ["backtrace", "contexts", "panic", "reqwest", "rustls", "tower"] }
sentry = { version = "0.30.0", default-features = false, features = ["backtrace", "contexts", "panic", "tower"] }
sentry-tracing = "0.30.0"
sentry-tower = { version = "0.30.0", features = ["http"] }
@ -59,6 +59,7 @@ mas-tasks = { path = "../tasks" }
mas-templates = { path = "../templates" }
mas-tower = { path = "../tower" }
oauth2-types = { path = "../oauth2-types" }
httpdate = "1.0.2"
[dev-dependencies]
indoc = "2.0.1"

View File

@ -17,6 +17,8 @@
#![warn(clippy::pedantic)]
#![allow(clippy::module_name_repetitions)]
use std::sync::Arc;
use anyhow::Context;
use clap::Parser;
use mas_config::TelemetryConfig;
@ -25,7 +27,10 @@ use tracing_subscriber::{
filter::LevelFilter, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer, Registry,
};
use crate::sentry_transport::HyperTransportFactory;
mod commands;
mod sentry_transport;
mod server;
mod telemetry;
mod util;
@ -70,6 +75,9 @@ async fn try_main() -> anyhow::Result<()> {
let sentry = sentry::init((
telemetry_config.sentry.dsn.as_deref(),
sentry::ClientOptions {
transport: Some(Arc::new(HyperTransportFactory::new(
mas_http::make_untraced_client().await?,
))),
traces_sample_rate: 1.0,
auto_session_tracking: true,
session_mode: sentry::SessionMode::Request,

View File

@ -0,0 +1,131 @@
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Implements a transport for Sentry based on Hyper.
//!
//! This avoids the dependency on `reqwest`, which helps avoiding having two
//! HTTP and TLS stacks in the binary.
//!
//! The [`ratelimit`] and [`tokio_thread`] modules are directly copied from the
//! Sentry codebase.
use std::{sync::Arc, time::Duration};
use hyper::{client::connect::Connect, header::RETRY_AFTER, Client, StatusCode};
use sentry::{sentry_debug, ClientOptions, Transport, TransportFactory};
use self::tokio_thread::TransportThread;
mod ratelimit;
mod tokio_thread;
pub struct HyperTransport {
thread: TransportThread,
}
pub struct HyperTransportFactory<C> {
client: Client<C>,
}
impl<C> HyperTransportFactory<C> {
pub fn new(client: Client<C>) -> Self {
Self { client }
}
}
impl<C> TransportFactory for HyperTransportFactory<C>
where
C: Connect + Clone + Send + Sync + 'static,
{
fn create_transport(&self, options: &ClientOptions) -> Arc<dyn Transport> {
Arc::new(HyperTransport::new(options, self.client.clone()))
}
}
impl HyperTransport {
pub fn new<C>(options: &ClientOptions, client: Client<C>) -> Self
where
C: Connect + Clone + Send + Sync + 'static,
{
let dsn = options.dsn.as_ref().unwrap();
let user_agent = options.user_agent.clone();
let auth = dsn.to_auth(Some(&user_agent)).to_string();
let url = dsn.envelope_api_url().to_string();
let thread = TransportThread::new(move |envelope, mut rl| {
let mut body = Vec::new();
envelope.to_writer(&mut body).unwrap();
let request = hyper::Request::post(&url)
.header("X-Sentry-Auth", &auth)
.body(hyper::Body::from(body))
.unwrap();
let fut = client.request(request);
async move {
match fut.await {
Ok(response) => {
if let Some(sentry_header) = response
.headers()
.get("x-sentry-rate-limits")
.and_then(|x| x.to_str().ok())
{
rl.update_from_sentry_header(sentry_header);
} else if let Some(retry_after) = response
.headers()
.get(RETRY_AFTER)
.and_then(|x| x.to_str().ok())
{
rl.update_from_retry_after(retry_after);
} else if response.status() == StatusCode::TOO_MANY_REQUESTS {
rl.update_from_429();
}
match hyper::body::to_bytes(response.into_body()).await {
Err(err) => {
sentry_debug!("Failed to read sentry response: {}", err);
}
Ok(bytes) => {
let text = String::from_utf8_lossy(&bytes);
sentry_debug!("Get response: `{}`", text);
}
}
}
Err(err) => {
sentry_debug!("Failed to send envelope: {}", err);
}
}
rl
}
});
Self { thread }
}
}
impl Transport for HyperTransport {
fn send_envelope(&self, envelope: sentry::Envelope) {
self.thread.send(envelope);
}
fn flush(&self, timeout: Duration) -> bool {
self.thread.flush(timeout)
}
fn shutdown(&self, timeout: Duration) -> bool {
self.flush(timeout)
}
}

View File

@ -0,0 +1,185 @@
// Taken from sentry/transports/ratelimit.rs
use std::time::{Duration, SystemTime};
use httpdate::parse_http_date;
use sentry::{protocol::EnvelopeItem, Envelope};
/// A Utility that helps with rate limiting sentry requests.
#[derive(Debug, Default)]
pub struct RateLimiter {
global: Option<SystemTime>,
error: Option<SystemTime>,
session: Option<SystemTime>,
transaction: Option<SystemTime>,
attachment: Option<SystemTime>,
profile: Option<SystemTime>,
}
impl RateLimiter {
/// Create a new RateLimiter.
pub fn new() -> Self {
Self::default()
}
/// Updates the RateLimiter with information from a `Retry-After` header.
pub fn update_from_retry_after(&mut self, header: &str) {
let new_time = if let Ok(value) = header.parse::<f64>() {
SystemTime::now() + Duration::from_secs(value.ceil() as u64)
} else if let Ok(value) = parse_http_date(header) {
value
} else {
SystemTime::now() + Duration::from_secs(60)
};
self.global = Some(new_time);
}
/// Updates the RateLimiter with information from a `X-Sentry-Rate-Limits`
/// header.
pub fn update_from_sentry_header(&mut self, header: &str) {
// <rate-limit> = (<group>,)+
// <group> = <time>:(<category>;)+:<scope>(:<reason>)?
let mut parse_group = |group: &str| {
let mut splits = group.split(':');
let seconds = splits.next()?.parse::<f64>().ok()?;
let categories = splits.next()?;
let _scope = splits.next()?;
let new_time = Some(SystemTime::now() + Duration::from_secs(seconds.ceil() as u64));
if categories.is_empty() {
self.global = new_time;
}
for category in categories.split(';') {
match category {
"error" => self.error = new_time,
"session" => self.session = new_time,
"transaction" => self.transaction = new_time,
"attachment" => self.attachment = new_time,
"profile" => self.profile = new_time,
_ => {}
}
}
Some(())
};
for group in header.split(',') {
parse_group(group.trim());
}
}
/// Updates the RateLimiter in response to a `429` status code.
pub fn update_from_429(&mut self) {
self.global = Some(SystemTime::now() + Duration::from_secs(60));
}
/// Query the RateLimiter if a certain category of event is currently rate
/// limited.
///
/// If the given category is rate limited, it will return the remaining
/// [`Duration`] for which it is.
pub fn is_disabled(&self, category: RateLimitingCategory) -> Option<Duration> {
if let Some(ts) = self.global {
let time_left = ts.duration_since(SystemTime::now()).ok();
if time_left.is_some() {
return time_left;
}
}
let time_left = match category {
RateLimitingCategory::Any => self.global,
RateLimitingCategory::Error => self.error,
RateLimitingCategory::Session => self.session,
RateLimitingCategory::Transaction => self.transaction,
RateLimitingCategory::Attachment => self.attachment,
RateLimitingCategory::Profile => self.profile,
}?;
time_left.duration_since(SystemTime::now()).ok()
}
/// Query the RateLimiter for a certain category of event.
///
/// Returns `true` if the category is *not* rate limited and should be sent.
pub fn is_enabled(&self, category: RateLimitingCategory) -> bool {
self.is_disabled(category).is_none()
}
/// Filters the [`Envelope`] according to the current rate limits.
///
/// Returns [`None`] if all the envelope items were filtered out.
pub fn filter_envelope(&self, envelope: Envelope) -> Option<Envelope> {
envelope.filter(|item| {
self.is_enabled(match item {
EnvelopeItem::Event(_) => RateLimitingCategory::Error,
EnvelopeItem::SessionUpdate(_) | EnvelopeItem::SessionAggregates(_) => {
RateLimitingCategory::Session
}
EnvelopeItem::Transaction(_) => RateLimitingCategory::Transaction,
EnvelopeItem::Attachment(_) => RateLimitingCategory::Attachment,
EnvelopeItem::Profile(_) => RateLimitingCategory::Profile,
_ => RateLimitingCategory::Any,
})
})
}
}
/// The Category of payload that a Rate Limit refers to.
#[non_exhaustive]
pub enum RateLimitingCategory {
/// Rate Limit for any kind of payload.
Any,
/// Rate Limit pertaining to Errors.
Error,
/// Rate Limit pertaining to Sessions.
Session,
/// Rate Limit pertaining to Transactions.
Transaction,
/// Rate Limit pertaining to Attachments.
Attachment,
/// Rate Limit pertaining to Profiles.
Profile,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sentry_header() {
let mut rl = RateLimiter::new();
rl.update_from_sentry_header("120:error:project:reason, 60:session:foo");
assert!(rl.is_disabled(RateLimitingCategory::Error).unwrap() <= Duration::from_secs(120));
assert!(rl.is_disabled(RateLimitingCategory::Session).unwrap() <= Duration::from_secs(60));
assert!(rl.is_disabled(RateLimitingCategory::Transaction).is_none());
assert!(rl.is_disabled(RateLimitingCategory::Any).is_none());
rl.update_from_sentry_header(
r#"
30::bar,
120:invalid:invalid,
4711:foo;bar;baz;security:project
"#,
);
assert!(
rl.is_disabled(RateLimitingCategory::Transaction).unwrap() <= Duration::from_secs(30)
);
assert!(rl.is_disabled(RateLimitingCategory::Any).unwrap() <= Duration::from_secs(30));
}
#[test]
fn test_retry_after() {
let mut rl = RateLimiter::new();
rl.update_from_retry_after("60");
assert!(rl.is_disabled(RateLimitingCategory::Error).unwrap() <= Duration::from_secs(60));
assert!(rl.is_disabled(RateLimitingCategory::Session).unwrap() <= Duration::from_secs(60));
assert!(
rl.is_disabled(RateLimitingCategory::Transaction).unwrap() <= Duration::from_secs(60)
);
assert!(rl.is_disabled(RateLimitingCategory::Any).unwrap() <= Duration::from_secs(60));
}
}

View File

@ -0,0 +1,118 @@
// Copied from sentry/transports/tokio_thread.rs
use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{sync_channel, SyncSender},
Arc,
},
thread::{self, JoinHandle},
time::Duration,
};
use sentry::{sentry_debug, Envelope};
use super::ratelimit::{RateLimiter, RateLimitingCategory};
enum Task {
SendEnvelope(Envelope),
Flush(SyncSender<()>),
Shutdown,
}
pub struct TransportThread {
sender: SyncSender<Task>,
shutdown: Arc<AtomicBool>,
handle: Option<JoinHandle<()>>,
}
impl TransportThread {
pub fn new<SendFn, SendFuture>(mut send: SendFn) -> Self
where
SendFn: FnMut(Envelope, RateLimiter) -> SendFuture + Send + 'static,
// NOTE: returning RateLimiter here, otherwise we are in borrow hell
SendFuture: std::future::Future<Output = RateLimiter>,
{
let (sender, receiver) = sync_channel(30);
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_worker = shutdown.clone();
let handle = thread::Builder::new()
.name("sentry-transport".into())
.spawn(move || {
// create a runtime on the transport thread
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let mut rl = RateLimiter::new();
// and block on an async fn in this runtime/thread
rt.block_on(async move {
for task in receiver.into_iter() {
if shutdown_worker.load(Ordering::SeqCst) {
return;
}
let envelope = match task {
Task::SendEnvelope(envelope) => envelope,
Task::Flush(sender) => {
sender.send(()).ok();
continue;
}
Task::Shutdown => {
return;
}
};
if let Some(time_left) = rl.is_disabled(RateLimitingCategory::Any) {
sentry_debug!(
"Skipping event send because we're disabled due to rate limits for {}s",
time_left.as_secs()
);
continue;
}
match rl.filter_envelope(envelope) {
Some(envelope) => {
rl = send(envelope, rl).await;
},
None => {
sentry_debug!("Envelope was discarded due to per-item rate limits");
},
};
}
})
})
.ok();
Self {
sender,
shutdown,
handle,
}
}
pub fn send(&self, envelope: Envelope) {
// Using send here would mean that when the channel fills up for whatever
// reason, trying to send an envelope would block everything. We'd rather
// drop the envelope in that case.
if let Err(e) = self.sender.try_send(Task::SendEnvelope(envelope)) {
sentry_debug!("envelope dropped: {e}");
}
}
pub fn flush(&self, timeout: Duration) -> bool {
let (sender, receiver) = sync_channel(1);
let _ = self.sender.send(Task::Flush(sender));
receiver.recv_timeout(timeout).is_err()
}
}
impl Drop for TransportThread {
fn drop(&mut self) {
self.shutdown.store(true, Ordering::SeqCst);
let _ = self.sender.send(Task::Shutdown);
if let Some(handle) = self.handle.take() {
handle.join().unwrap();
}
}
}