You've already forked authentication-service
mirror of
https://github.com/matrix-org/matrix-authentication-service.git
synced 2025-08-06 06:02:40 +03:00
OpenTelemetry upgrade
This commit is contained in:
@@ -25,26 +25,29 @@ use opentelemetry::{
|
||||
propagation::TextMapPropagator,
|
||||
sdk::{
|
||||
self,
|
||||
metrics::controllers::BasicController,
|
||||
metrics::{
|
||||
reader::{DefaultAggregationSelector, DefaultTemporalitySelector},
|
||||
MeterProvider, PeriodicReader,
|
||||
},
|
||||
propagation::{BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator},
|
||||
trace::{Sampler, Tracer},
|
||||
trace::{Sampler, Tracer, TracerProvider},
|
||||
Resource,
|
||||
},
|
||||
Context,
|
||||
trace::TracerProvider as _,
|
||||
};
|
||||
use opentelemetry_jaeger::Propagator as JaegerPropagator;
|
||||
use opentelemetry_otlp::MetricsExporterBuilder;
|
||||
use opentelemetry_prometheus::PrometheusExporter;
|
||||
use opentelemetry_semantic_conventions as semcov;
|
||||
use opentelemetry_zipkin::{B3Encoding, Propagator as ZipkinPropagator};
|
||||
use prometheus::Registry;
|
||||
use tokio::sync::OnceCell;
|
||||
use url::Url;
|
||||
|
||||
static METRICS_BASIC_CONTROLLER: OnceCell<BasicController> = OnceCell::const_new();
|
||||
static PROMETHEUS_EXPORTER: OnceCell<PrometheusExporter> = OnceCell::const_new();
|
||||
static METER_PROVIDER: OnceCell<MeterProvider> = OnceCell::const_new();
|
||||
static PROMETHEUS_REGISTRY: OnceCell<Registry> = OnceCell::const_new();
|
||||
|
||||
pub async fn setup(
|
||||
config: &TelemetryConfig,
|
||||
) -> anyhow::Result<(Option<Tracer>, Option<BasicController>)> {
|
||||
pub async fn setup(config: &TelemetryConfig) -> anyhow::Result<Option<Tracer>> {
|
||||
global::set_error_handler(|e| tracing::error!("{}", e))?;
|
||||
let propagator = propagator(&config.tracing.propagators);
|
||||
|
||||
@@ -57,20 +60,16 @@ pub async fn setup(
|
||||
.await
|
||||
.context("Failed to configure traces exporter")?;
|
||||
|
||||
let meter = meter(&config.metrics.exporter).context("Failed to configure metrics exporter")?;
|
||||
if let Some(meter) = meter.as_ref() {
|
||||
METRICS_BASIC_CONTROLLER.set(meter.clone())?;
|
||||
}
|
||||
init_meter(&config.metrics.exporter).context("Failed to configure metrics exporter")?;
|
||||
|
||||
Ok((tracer, meter))
|
||||
Ok(tracer)
|
||||
}
|
||||
|
||||
pub fn shutdown() {
|
||||
global::shutdown_tracer_provider();
|
||||
|
||||
if let Some(controller) = METRICS_BASIC_CONTROLLER.get() {
|
||||
let cx = Context::new();
|
||||
controller.stop(&cx).unwrap();
|
||||
if let Some(meter_provider) = METER_PROVIDER.get() {
|
||||
meter_provider.shutdown().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -100,11 +99,11 @@ async fn http_client() -> anyhow::Result<impl opentelemetry_http::HttpClient + '
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
fn stdout_tracer() -> Tracer {
|
||||
sdk::export::trace::stdout::new_pipeline()
|
||||
.with_pretty_print(true)
|
||||
.with_trace_config(trace_config())
|
||||
.install_simple()
|
||||
fn stdout_tracer_provider() -> TracerProvider {
|
||||
let exporter = opentelemetry_stdout::SpanExporter::default();
|
||||
TracerProvider::builder()
|
||||
.with_simple_exporter(exporter)
|
||||
.build()
|
||||
}
|
||||
|
||||
fn otlp_tracer(endpoint: Option<&Url>) -> anyhow::Result<Tracer> {
|
||||
@@ -125,24 +124,24 @@ fn otlp_tracer(endpoint: Option<&Url>) -> anyhow::Result<Tracer> {
|
||||
Ok(tracer)
|
||||
}
|
||||
|
||||
fn jaeger_agent_tracer(host: &str, port: u16) -> anyhow::Result<Tracer> {
|
||||
fn jaeger_agent_tracer_provider(host: &str, port: u16) -> anyhow::Result<TracerProvider> {
|
||||
let pipeline = opentelemetry_jaeger::new_agent_pipeline()
|
||||
.with_service_name(env!("CARGO_PKG_NAME"))
|
||||
.with_trace_config(trace_config())
|
||||
.with_endpoint((host, port));
|
||||
|
||||
let tracer = pipeline
|
||||
.install_batch(opentelemetry::runtime::Tokio)
|
||||
let tracer_provider = pipeline
|
||||
.build_batch(opentelemetry::runtime::Tokio)
|
||||
.context("Failed to configure Jaeger agent exporter")?;
|
||||
|
||||
Ok(tracer)
|
||||
Ok(tracer_provider)
|
||||
}
|
||||
|
||||
async fn jaeger_collector_tracer(
|
||||
async fn jaeger_collector_tracer_provider(
|
||||
endpoint: &str,
|
||||
username: Option<&str>,
|
||||
password: Option<&str>,
|
||||
) -> anyhow::Result<Tracer> {
|
||||
) -> anyhow::Result<TracerProvider> {
|
||||
let http_client = http_client().await?;
|
||||
let mut pipeline = opentelemetry_jaeger::new_collector_pipeline()
|
||||
.with_service_name(env!("CARGO_PKG_NAME"))
|
||||
@@ -158,11 +157,11 @@ async fn jaeger_collector_tracer(
|
||||
pipeline = pipeline.with_password(password);
|
||||
}
|
||||
|
||||
let tracer = pipeline
|
||||
.install_batch(opentelemetry::runtime::Tokio)
|
||||
let tracer_provider = pipeline
|
||||
.build_batch(opentelemetry::runtime::Tokio)
|
||||
.context("Failed to configure Jaeger collector exporter")?;
|
||||
|
||||
Ok(tracer)
|
||||
Ok(tracer_provider)
|
||||
}
|
||||
|
||||
async fn zipkin_tracer(collector_endpoint: &Option<Url>) -> anyhow::Result<Tracer> {
|
||||
@@ -185,28 +184,43 @@ async fn zipkin_tracer(collector_endpoint: &Option<Url>) -> anyhow::Result<Trace
|
||||
}
|
||||
|
||||
async fn tracer(config: &TracingExporterConfig) -> anyhow::Result<Option<Tracer>> {
|
||||
let tracer = match config {
|
||||
let tracer_provider = match config {
|
||||
TracingExporterConfig::None => return Ok(None),
|
||||
TracingExporterConfig::Stdout => stdout_tracer(),
|
||||
TracingExporterConfig::Otlp { endpoint } => otlp_tracer(endpoint.as_ref())?,
|
||||
TracingExporterConfig::Stdout => stdout_tracer_provider(),
|
||||
TracingExporterConfig::Otlp { endpoint } => {
|
||||
// The OTLP exporter already creates a tracer and installs it
|
||||
return Ok(Some(otlp_tracer(endpoint.as_ref())?));
|
||||
}
|
||||
TracingExporterConfig::Jaeger(JaegerExporterProtocolConfig::UdpThriftCompact {
|
||||
agent_host,
|
||||
agent_port,
|
||||
}) => jaeger_agent_tracer(agent_host, *agent_port)?,
|
||||
}) => jaeger_agent_tracer_provider(agent_host, *agent_port)?,
|
||||
TracingExporterConfig::Jaeger(JaegerExporterProtocolConfig::HttpThriftBinary {
|
||||
endpoint,
|
||||
username,
|
||||
password,
|
||||
}) => jaeger_collector_tracer(endpoint, username.as_deref(), password.as_deref()).await?,
|
||||
}) => {
|
||||
jaeger_collector_tracer_provider(endpoint, username.as_deref(), password.as_deref())
|
||||
.await?
|
||||
}
|
||||
TracingExporterConfig::Zipkin { collector_endpoint } => {
|
||||
zipkin_tracer(collector_endpoint).await?
|
||||
// The Zipkin exporter already creates a tracer and installs it
|
||||
return Ok(Some(zipkin_tracer(collector_endpoint).await?));
|
||||
}
|
||||
};
|
||||
|
||||
let tracer = tracer_provider.versioned_tracer(
|
||||
env!("CARGO_PKG_NAME"),
|
||||
Some(env!("CARGO_PKG_VERSION")),
|
||||
Some(semcov::SCHEMA_URL),
|
||||
None,
|
||||
);
|
||||
global::set_tracer_provider(tracer_provider);
|
||||
|
||||
Ok(Some(tracer))
|
||||
}
|
||||
|
||||
fn otlp_meter(endpoint: Option<&url::Url>) -> anyhow::Result<BasicController> {
|
||||
fn otlp_metric_reader(endpoint: Option<&url::Url>) -> anyhow::Result<PeriodicReader> {
|
||||
use opentelemetry_otlp::WithExportConfig;
|
||||
|
||||
let mut exporter = opentelemetry_otlp::new_exporter().tonic();
|
||||
@@ -214,35 +228,17 @@ fn otlp_meter(endpoint: Option<&url::Url>) -> anyhow::Result<BasicController> {
|
||||
exporter = exporter.with_endpoint(endpoint.to_string());
|
||||
}
|
||||
|
||||
let controller = opentelemetry_otlp::new_pipeline()
|
||||
.metrics(
|
||||
sdk::metrics::selectors::simple::inexpensive(),
|
||||
sdk::export::metrics::aggregation::cumulative_temporality_selector(),
|
||||
opentelemetry::runtime::Tokio,
|
||||
)
|
||||
.with_resource(resource())
|
||||
.with_exporter(exporter)
|
||||
.build()
|
||||
.context("Failed to configure OTLP metrics exporter")?;
|
||||
let exporter = MetricsExporterBuilder::from(exporter).build_metrics_exporter(
|
||||
Box::new(DefaultTemporalitySelector::new()),
|
||||
Box::new(DefaultAggregationSelector::new()),
|
||||
)?;
|
||||
|
||||
Ok(controller)
|
||||
Ok(PeriodicReader::builder(exporter, opentelemetry::runtime::Tokio).build())
|
||||
}
|
||||
|
||||
fn stdout_meter() -> anyhow::Result<BasicController> {
|
||||
let exporter = sdk::export::metrics::stdout().build()?;
|
||||
let controller = sdk::metrics::controllers::basic(sdk::metrics::processors::factory(
|
||||
sdk::metrics::selectors::simple::inexpensive(),
|
||||
exporter.temporality_selector(),
|
||||
))
|
||||
.with_resource(resource())
|
||||
.with_exporter(exporter)
|
||||
.build();
|
||||
|
||||
let cx = Context::new();
|
||||
controller.start(&cx, opentelemetry::runtime::Tokio)?;
|
||||
|
||||
global::set_meter_provider(controller.clone());
|
||||
Ok(controller)
|
||||
fn stdout_metric_reader() -> PeriodicReader {
|
||||
let exporter = opentelemetry_stdout::MetricsExporter::default();
|
||||
PeriodicReader::builder(exporter, opentelemetry::runtime::Tokio).build()
|
||||
}
|
||||
|
||||
pub fn prometheus_service<T>() -> tower::util::ServiceFn<
|
||||
@@ -250,15 +246,15 @@ pub fn prometheus_service<T>() -> tower::util::ServiceFn<
|
||||
> {
|
||||
use prometheus::{Encoder, TextEncoder};
|
||||
|
||||
if !PROMETHEUS_EXPORTER.initialized() {
|
||||
if !PROMETHEUS_REGISTRY.initialized() {
|
||||
tracing::warn!("A Prometheus resource was mounted on a listener, but the Prometheus exporter was not setup in the config");
|
||||
}
|
||||
|
||||
tower::service_fn(move |_req| {
|
||||
let response = if let Some(exporter) = PROMETHEUS_EXPORTER.get() {
|
||||
let response = if let Some(registry) = PROMETHEUS_REGISTRY.get() {
|
||||
let mut buffer = vec![];
|
||||
let encoder = TextEncoder::new();
|
||||
let metric_families = exporter.registry().gather();
|
||||
let metric_families = registry.gather();
|
||||
|
||||
// That shouldn't panic, unless we're constructing invalid labels
|
||||
encoder.encode(&metric_families, &mut buffer).unwrap();
|
||||
@@ -280,32 +276,41 @@ pub fn prometheus_service<T>() -> tower::util::ServiceFn<
|
||||
})
|
||||
}
|
||||
|
||||
fn prometheus_meter() -> anyhow::Result<BasicController> {
|
||||
let controller = sdk::metrics::controllers::basic(sdk::metrics::processors::factory(
|
||||
// All histogram metrics are in milliseconds. Each bucket is ~2x the previous one.
|
||||
sdk::metrics::selectors::simple::histogram([
|
||||
1.0, 3.0, 5.0, 10.0, 30.0, 50.0, 100.0, 300.0, 1000.0,
|
||||
]),
|
||||
sdk::export::metrics::aggregation::cumulative_temporality_selector(),
|
||||
))
|
||||
.with_resource(resource())
|
||||
.build();
|
||||
fn prometheus_metric_reader() -> anyhow::Result<PrometheusExporter> {
|
||||
let registry = Registry::new();
|
||||
PROMETHEUS_REGISTRY.set(registry.clone())?;
|
||||
|
||||
let exporter = opentelemetry_prometheus::exporter(controller.clone()).try_init()?;
|
||||
PROMETHEUS_EXPORTER.set(exporter)?;
|
||||
let exporter = opentelemetry_prometheus::exporter()
|
||||
.with_registry(registry)
|
||||
.without_scope_info()
|
||||
.build()?;
|
||||
|
||||
Ok(controller)
|
||||
Ok(exporter)
|
||||
}
|
||||
|
||||
fn meter(config: &MetricsExporterConfig) -> anyhow::Result<Option<BasicController>> {
|
||||
let controller = match config {
|
||||
MetricsExporterConfig::None => None,
|
||||
MetricsExporterConfig::Stdout => Some(stdout_meter()?),
|
||||
MetricsExporterConfig::Otlp { endpoint } => Some(otlp_meter(endpoint.as_ref())?),
|
||||
MetricsExporterConfig::Prometheus => Some(prometheus_meter()?),
|
||||
fn init_meter(config: &MetricsExporterConfig) -> anyhow::Result<()> {
|
||||
let mut meter_provider_builder = MeterProvider::builder();
|
||||
match config {
|
||||
MetricsExporterConfig::None => {}
|
||||
MetricsExporterConfig::Stdout => {
|
||||
meter_provider_builder = meter_provider_builder.with_reader(stdout_metric_reader());
|
||||
}
|
||||
MetricsExporterConfig::Otlp { endpoint } => {
|
||||
meter_provider_builder =
|
||||
meter_provider_builder.with_reader(otlp_metric_reader(endpoint.as_ref())?);
|
||||
}
|
||||
MetricsExporterConfig::Prometheus => {
|
||||
meter_provider_builder =
|
||||
meter_provider_builder.with_reader(prometheus_metric_reader()?);
|
||||
}
|
||||
};
|
||||
|
||||
Ok(controller)
|
||||
let meter_provider = meter_provider_builder.with_resource(resource()).build();
|
||||
|
||||
METER_PROVIDER.set(meter_provider.clone())?;
|
||||
global::set_meter_provider(meter_provider.clone());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn trace_config() -> sdk::trace::Config {
|
||||
@@ -326,6 +331,7 @@ fn resource() -> Resource {
|
||||
Box::new(sdk::resource::EnvResourceDetector::new()),
|
||||
Box::new(sdk::resource::OsResourceDetector),
|
||||
Box::new(sdk::resource::ProcessResourceDetector),
|
||||
Box::new(sdk::resource::TelemetryResourceDetector),
|
||||
],
|
||||
);
|
||||
|
||||
|
Reference in New Issue
Block a user