diff --git a/Cargo.lock b/Cargo.lock index 0ab6eca6..77b1edbe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2349,7 +2349,7 @@ dependencies = [ "atty", "clap", "dotenv", - "futures 0.3.24", + "futures-util", "hyper", "indoc", "mas-config", @@ -2366,8 +2366,10 @@ dependencies = [ "opentelemetry-http", "opentelemetry-jaeger", "opentelemetry-otlp", + "opentelemetry-prometheus", "opentelemetry-semantic-conventions", "opentelemetry-zipkin", + "prometheus", "schemars", "serde_json", "serde_yaml", @@ -2375,6 +2377,7 @@ dependencies = [ "tower", "tracing", "tracing-appender", + "tracing-opentelemetry", "tracing-subscriber", "url", "watchman_client", @@ -2994,8 +2997,10 @@ dependencies = [ "async-trait", "futures 0.3.24", "futures-executor", + "http", "once_cell", "opentelemetry", + "opentelemetry-http", "opentelemetry-semantic-conventions", "thiserror", "thrift", @@ -3021,6 +3026,17 @@ dependencies = [ "tonic", ] +[[package]] +name = "opentelemetry-prometheus" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06c3d833835a53cf91331d2cfb27e9121f5a95261f31f08a1f79ab31688b8da8" +dependencies = [ + "opentelemetry", + "prometheus", + "protobuf", +] + [[package]] name = "opentelemetry-proto" version = "0.1.0" @@ -3540,6 +3556,21 @@ dependencies = [ "yansi", ] +[[package]] +name = "prometheus" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45c8babc29389186697fe5a2a4859d697825496b83db5d0b65271cdc0488e88c" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot 0.12.1", + "protobuf", + "thiserror", +] + [[package]] name = "prost" version = "0.11.0" @@ -3593,6 +3624,12 @@ dependencies = [ "prost", ] +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "psm" version = "0.1.21" diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index d8152343..e1720f84 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -7,7 +7,7 @@ license = "Apache-2.0" [dependencies] tokio = { version = "1.21.1", features = ["full"] } -futures = "0.3.24" +futures-util = "0.3.24" anyhow = "1.0.65" clap = { version = "3.2.22", features = ["derive"] } dotenv = "0.15.0" @@ -24,13 +24,15 @@ atty = "0.2.14" tracing = "0.1.36" tracing-appender = "0.2.2" tracing-subscriber = { version = "0.3.15", features = ["env-filter"] } -#tracing-opentelemetry = "0.17.4" +tracing-opentelemetry = "0.18.0" opentelemetry = { version = "0.18.0", features = ["trace", "metrics", "rt-tokio"] } opentelemetry-semantic-conventions = "0.10.0" -opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio"], optional = true } +opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio", "collector_client"], optional = true } opentelemetry-otlp = { version = "0.11.0", features = ["trace", "metrics", "http-proto"], optional = true } opentelemetry-zipkin = { version = "0.16.0", features = ["opentelemetry-http"], default-features = false, optional = true } opentelemetry-http = { version = "0.7.0", features = ["tokio", "hyper"], optional = true } +opentelemetry-prometheus = "0.11.0" +prometheus = "0.13.2" mas-config = { path = "../config" } mas-email = { path = "../email" } diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index eb6d1424..c83a8b8f 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -58,16 +58,14 @@ async fn try_main() -> anyhow::Result<()> { .or_else(|_| EnvFilter::try_new("info")) .context("could not setup logging filter")?; - /* // Don't fill the telemetry layer for now, we want to configure it based on the // app config, so we need to delay that a bit let (telemetry_layer, handle) = reload::Layer::new(None); // We only want "INFO" level spans to go through OpenTelemetry let telemetry_layer = telemetry_layer.with_filter(LevelFilter::INFO); - */ let subscriber = Registry::default() - //.with(telemetry_layer) + .with(telemetry_layer) .with(filter_layer) .with(fmt_layer); subscriber @@ -91,10 +89,9 @@ async fn try_main() -> anyhow::Result<()> { let telemetry_config: TelemetryConfig = opts.load_config().unwrap_or_default(); // Setup OpenTelemtry tracing and metrics - let tracer = telemetry::setup(&telemetry_config) + let (tracer, _meter) = telemetry::setup(&telemetry_config) .await .context("failed to setup opentelemetry")?; - /* if let Some(tracer) = tracer { // Now we can swap out the actual opentelemetry tracing layer handle.reload( @@ -103,7 +100,6 @@ async fn try_main() -> anyhow::Result<()> { .with_tracked_inactivity(false), )?; } - */ // And run the command tracing::trace!(?opts, "Running command"); diff --git a/crates/cli/src/telemetry.rs b/crates/cli/src/telemetry.rs index 8b208b3c..aef14793 100644 --- a/crates/cli/src/telemetry.rs +++ b/crates/cli/src/telemetry.rs @@ -12,28 +12,46 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{net::SocketAddr, time::Duration}; +use std::{ + convert::Infallible, + net::{SocketAddr, TcpListener}, + time::Duration, +}; -use anyhow::bail; -use mas_config::{MetricsExporterConfig, Propagator, TelemetryConfig, TracingExporterConfig}; +use anyhow::{bail, Context as _}; +use hyper::{header::CONTENT_TYPE, service::make_service_fn, Body, Method, Request, Response}; +use mas_config::{ + JaegerExporterProtocolConfig, MetricsExporterConfig, Propagator, TelemetryConfig, + TracingExporterConfig, +}; use opentelemetry::{ global, propagation::TextMapPropagator, sdk::{ self, + metrics::controllers::BasicController, propagation::{BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator}, trace::{Sampler, Tracer}, Resource, }, + Context, }; #[cfg(feature = "jaeger")] use opentelemetry_jaeger::Propagator as JaegerPropagator; use opentelemetry_semantic_conventions as semcov; #[cfg(feature = "zipkin")] use opentelemetry_zipkin::{B3Encoding, Propagator as ZipkinPropagator}; +use prometheus::{Encoder, TextEncoder}; +use tokio::sync::OnceCell; +use tower::service_fn; +use tracing::info; use url::Url; -pub async fn setup(config: &TelemetryConfig) -> anyhow::Result> { +static METRICS_BASIC_CONTROLLER: OnceCell> = OnceCell::const_new(); + +pub async fn setup( + config: &TelemetryConfig, +) -> anyhow::Result<(Option, Option)> { global::set_error_handler(|e| tracing::error!("{}", e))?; let propagator = propagator(&config.tracing.propagators)?; @@ -43,12 +61,19 @@ pub async fn setup(config: &TelemetryConfig) -> anyhow::Result> { global::set_text_map_propagator(propagator); let tracer = tracer(&config.tracing.exporter).await?; - meter(&config.metrics.exporter)?; - Ok(tracer) + let meter = meter(&config.metrics.exporter)?; + METRICS_BASIC_CONTROLLER.set(meter.clone())?; + + Ok((tracer, meter)) } pub fn shutdown() { global::shutdown_tracer_provider(); + + if let Some(Some(controller)) = METRICS_BASIC_CONTROLLER.get() { + let cx = Context::new(); + controller.stop(&cx).unwrap(); + } } fn match_propagator( @@ -124,19 +149,50 @@ fn otlp_tracer(_endpoint: &Option) -> anyhow::Result { } #[cfg(not(feature = "jaeger"))] -fn jaeger_tracer(_agent_endpoint: &Option) -> anyhow::Result { +fn jaeger_agent_tracer(host: &str, port: u16) -> anyhow::Result { anyhow::bail!("The service was compiled without Jaeger exporter support, but config exports traces via Jaeger.") } #[cfg(feature = "jaeger")] -fn jaeger_tracer(agent_endpoint: &Option) -> anyhow::Result { - // TODO: also support exporting to a Jaeger collector & skip the agent - let mut pipeline = opentelemetry_jaeger::new_agent_pipeline() +fn jaeger_agent_tracer(host: &str, port: u16) -> anyhow::Result { + let pipeline = opentelemetry_jaeger::new_agent_pipeline() .with_service_name(env!("CARGO_PKG_NAME")) - .with_trace_config(trace_config()); + .with_trace_config(trace_config()) + .with_endpoint((host, port)); - if let Some(agent_endpoint) = agent_endpoint { - pipeline = pipeline.with_endpoint(agent_endpoint); + let tracer = pipeline.install_batch(opentelemetry::runtime::Tokio)?; + + Ok(tracer) +} + +#[cfg(not(feature = "jaeger"))] +async fn jaeger_collector_tracer( + endpoint: &str, + username: Option<&str>, + password: Option<&str>, +) -> anyhow::Result { + anyhow::bail!("The service was compiled without Jaeger exporter support, but config exports traces via Jaeger.") +} + +#[cfg(feature = "jaeger")] +async fn jaeger_collector_tracer( + endpoint: &str, + username: Option<&str>, + password: Option<&str>, +) -> anyhow::Result { + let http_client = http_client().await?; + let mut pipeline = opentelemetry_jaeger::new_collector_pipeline() + .with_service_name(env!("CARGO_PKG_NAME")) + .with_trace_config(trace_config()) + .with_http_client(http_client) + .with_endpoint(endpoint); + + if let Some(username) = username { + pipeline = pipeline.with_username(username); + } + + if let Some(password) = password { + pipeline = pipeline.with_password(password); } let tracer = pipeline.install_batch(opentelemetry::runtime::Tokio)?; @@ -145,7 +201,7 @@ fn jaeger_tracer(agent_endpoint: &Option) -> anyhow::Result } #[cfg(not(feature = "zipkin"))] -fn zipkin_tracer(_collector_endpoint: &Option) -> anyhow::Result { +async fn zipkin_tracer(_collector_endpoint: &Option) -> anyhow::Result { anyhow::bail!("The service was compiled without Jaeger exporter support, but config exports traces via Jaeger.") } @@ -159,7 +215,7 @@ async fn zipkin_tracer(collector_endpoint: &Option) -> anyhow::Result anyhow::Result TracingExporterConfig::None => return Ok(None), TracingExporterConfig::Stdout => stdout_tracer(), TracingExporterConfig::Otlp { endpoint } => otlp_tracer(endpoint)?, - TracingExporterConfig::Jaeger { agent_endpoint } => jaeger_tracer(agent_endpoint)?, + TracingExporterConfig::Jaeger(JaegerExporterProtocolConfig::UdpThriftCompact { + agent_host, + agent_port, + }) => jaeger_agent_tracer(agent_host, *agent_port)?, + TracingExporterConfig::Jaeger(JaegerExporterProtocolConfig::HttpThriftBinary { + endpoint, + username, + password, + }) => jaeger_collector_tracer(endpoint, username.as_deref(), password.as_deref()).await?, TracingExporterConfig::Zipkin { collector_endpoint } => { zipkin_tracer(collector_endpoint).await? } @@ -182,7 +246,7 @@ async fn tracer(config: &TracingExporterConfig) -> anyhow::Result } #[cfg(feature = "otlp")] -fn otlp_meter(endpoint: &Option) -> anyhow::Result<()> { +fn otlp_meter(endpoint: &Option) -> anyhow::Result { use opentelemetry_otlp::WithExportConfig; let mut exporter = opentelemetry_otlp::new_exporter().tonic(); @@ -190,46 +254,109 @@ fn otlp_meter(endpoint: &Option) -> anyhow::Result<()> { exporter = exporter.with_endpoint(endpoint.to_string()); } - opentelemetry_otlp::new_pipeline() + let controller = opentelemetry_otlp::new_pipeline() .metrics( - sdk::metrics::selectors::simple::histogram([0.1, 0.2, 0.5, 1.0, 5.0, 10.0]), - sdk::export::metrics::aggregation::stateless_temporality_selector(), + sdk::metrics::selectors::simple::inexpensive(), + sdk::export::metrics::aggregation::cumulative_temporality_selector(), opentelemetry::runtime::Tokio, ) + .with_resource(resource()) .with_exporter(exporter) .build()?; - Ok(()) + Ok(controller) } #[cfg(not(feature = "otlp"))] -fn otlp_meter(_endpoint: &Option) -> anyhow::Result<()> { +fn otlp_meter(_endpoint: &Option) -> anyhow::Result { anyhow::bail!("The service was compiled without OTLP exporter support, but config exports metrics via OTLP.") } -fn stdout_meter() -> anyhow::Result<()> { +fn stdout_meter() -> anyhow::Result { let exporter = sdk::export::metrics::stdout().build()?; + let controller = sdk::metrics::controllers::basic(sdk::metrics::processors::factory( + sdk::metrics::selectors::simple::inexpensive(), + exporter.temporality_selector(), + )) + .with_resource(resource()) + .with_exporter(exporter) + .build(); + + let cx = Context::new(); + controller.start(&cx, opentelemetry::runtime::Tokio)?; + + global::set_meter_provider(controller.clone()); + Ok(controller) +} + +fn prometheus_meter(address: &str) -> anyhow::Result { let controller = sdk::metrics::controllers::basic( sdk::metrics::processors::factory( - sdk::metrics::selectors::simple::histogram([0.1, 0.2, 0.5, 1.0, 5.0, 10.0]), - exporter.temporality_selector(), + sdk::metrics::selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]), + sdk::export::metrics::aggregation::cumulative_temporality_selector(), ) .with_memory(true), ) - .with_exporter(exporter) .build(); - global::set_meter_provider(controller); - Ok(()) + + let exporter = opentelemetry_prometheus::exporter(controller.clone()).init(); + + let make_svc = make_service_fn(move |_conn| { + let exporter = exporter.clone(); + async move { + Ok::<_, Infallible>(service_fn(move |req: Request| { + let exporter = exporter.clone(); + async move { + let response = match (req.method(), req.uri().path()) { + (&Method::GET, "/metrics") => { + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + let metric_families = exporter.registry().gather(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + + Response::builder() + .status(200) + .header(CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer)) + .unwrap() + } + _ => Response::builder() + .status(404) + .body(Body::from("404 not found")) + .unwrap(), + }; + + Ok::<_, Infallible>(response) + } + })) + } + }); + + let address: SocketAddr = address + .parse() + .context("could not parse listener address")?; + let listener = TcpListener::bind(address).context("could not bind address")?; + + info!( + "Prometheus exporter listening on on http://{}/metrics", + listener.local_addr().unwrap() + ); + + let server = hyper::server::Server::from_tcp(listener)?.serve(make_svc); + tokio::spawn(server); + + Ok(controller) } -fn meter(config: &MetricsExporterConfig) -> anyhow::Result<()> { - match config { - MetricsExporterConfig::None => {} - MetricsExporterConfig::Stdout => stdout_meter()?, - MetricsExporterConfig::Otlp { endpoint } => otlp_meter(endpoint)?, +fn meter(config: &MetricsExporterConfig) -> anyhow::Result> { + let controller = match config { + MetricsExporterConfig::None => None, + MetricsExporterConfig::Stdout => Some(stdout_meter()?), + MetricsExporterConfig::Otlp { endpoint } => Some(otlp_meter(endpoint)?), + MetricsExporterConfig::Prometheus { address } => Some(prometheus_meter(address)?), }; - Ok(()) + Ok(controller) } fn trace_config() -> sdk::trace::Config { diff --git a/crates/config/src/sections/mod.rs b/crates/config/src/sections/mod.rs index a2b1ff5a..2c9b5b4f 100644 --- a/crates/config/src/sections/mod.rs +++ b/crates/config/src/sections/mod.rs @@ -37,8 +37,8 @@ pub use self::{ policy::PolicyConfig, secrets::SecretsConfig, telemetry::{ - MetricsConfig, MetricsExporterConfig, Propagator, TelemetryConfig, TracingConfig, - TracingExporterConfig, + JaegerExporterProtocolConfig, MetricsConfig, MetricsExporterConfig, Propagator, + TelemetryConfig, TracingConfig, TracingExporterConfig, }, templates::TemplatesConfig, }; diff --git a/crates/config/src/sections/telemetry.rs b/crates/config/src/sections/telemetry.rs index 53be4365..36273187 100644 --- a/crates/config/src/sections/telemetry.rs +++ b/crates/config/src/sections/telemetry.rs @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::net::SocketAddr; +use std::num::NonZeroU16; use async_trait::async_trait; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_with::skip_serializing_none; +use thiserror::Error; use url::Url; use super::ConfigurationSection; @@ -49,18 +50,130 @@ fn otlp_endpoint_example() -> &'static str { "https://localhost:4317" } -fn jaeger_agent_endpoint_example() -> &'static str { - "127.0.0.1:6831" -} - fn zipkin_collector_endpoint_example() -> &'static str { "http://127.0.0.1:9411/api/v2/spans" } +/// The protocol to use by the Jaeger exporter +/// +/// Defaults to `udp/thrift.compact` +#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)] +#[serde(tag = "protocol", try_from = "JaegerExporterKitchenSink")] +pub enum JaegerExporterProtocolConfig { + /// Thrift over HTTP + #[serde(rename = "http/thrift.binary")] + HttpThriftBinary { + /// Full URL of the Jaeger HTTP endpoint + /// + /// Defaults to `http://localhost:14268/api/traces` + endpoint: String, + + /// Username to be used for HTTP basic authentication + username: Option, + + /// Password to be used for HTTP basic authentication + password: Option, + }, + + /// Thrift with compact encoding over UDP + #[serde(rename = "udp/thrift.compact")] + UdpThriftCompact { + /// Hostname of the Jaeger agent + /// + /// Defaults to `localhost` + agent_host: String, + + /// `udp/thrift.compact` port of the Jaeger agent + /// + /// Defaults to `6831` + agent_port: u16, + }, +} + +#[derive(Clone, Debug, Deserialize)] +enum JaegerProtocol { + #[serde(rename = "http/thrift.binary")] + HttpThriftBinary, + + #[serde(rename = "grpc")] + Grpc, + + #[serde(rename = "udp/thrift.compact")] + UdpThriftCompact, + + #[serde(rename = "udp/thrift.binary")] + UdpThriftBinary, +} + +#[derive(Clone, Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct JaegerExporterKitchenSink { + protocol: Option, + endpoint: Option, + username: Option, + password: Option, + agent_host: Option, + agent_port: Option, +} + +#[derive(Error, Debug)] +#[error("Invalid Jaeger exporter config")] +enum InvalidJaegerConfig { + UnsupportedProtocol, + UnsupportedField, +} + +impl TryFrom for JaegerExporterProtocolConfig { + type Error = InvalidJaegerConfig; + + fn try_from(value: JaegerExporterKitchenSink) -> Result { + match value { + JaegerExporterKitchenSink { + protocol: Some(JaegerProtocol::UdpThriftCompact) | None, + endpoint: None, + username: None, + password: None, + agent_host: host, + agent_port: port, + } => Ok(Self::UdpThriftCompact { + agent_host: host.unwrap_or_else(|| "localhost".into()), + agent_port: port.map_or(6831, u16::from), + }), + + JaegerExporterKitchenSink { + protocol: Some(JaegerProtocol::HttpThriftBinary), + endpoint, + username, + password, + agent_host: None, + agent_port: None, + } => Ok(Self::HttpThriftBinary { + endpoint: endpoint.map_or_else( + || "http://localhost:14268/api/traces".into(), + |u| u.to_string(), + ), + username, + password, + }), + + JaegerExporterKitchenSink { + protocol: + Some(JaegerProtocol::HttpThriftBinary | JaegerProtocol::UdpThriftCompact) | None, + .. + } => Err(InvalidJaegerConfig::UnsupportedField), + + JaegerExporterKitchenSink { + protocol: Some(JaegerProtocol::Grpc | JaegerProtocol::UdpThriftBinary), + .. + } => Err(InvalidJaegerConfig::UnsupportedProtocol), + } + } +} + /// Exporter to use when exporting traces #[skip_serializing_none] #[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)] -#[serde(tag = "exporter", rename_all = "lowercase")] +#[serde(tag = "exporter", rename_all = "lowercase", deny_unknown_fields)] pub enum TracingExporterConfig { /// Don't export traces None, @@ -77,12 +190,7 @@ pub enum TracingExporterConfig { }, /// Export traces to a Jaeger agent - Jaeger { - /// Jaeger agent endpoint - #[schemars(example = "jaeger_agent_endpoint_example")] - #[serde(default)] - agent_endpoint: Option, - }, + Jaeger(JaegerExporterProtocolConfig), /// Export traces to a Zipkin collector Zipkin { @@ -128,6 +236,12 @@ pub enum MetricsExporterConfig { #[serde(default)] endpoint: Option, }, + + /// Export metrics by exposing a Prometheus-compatible endpoint + Prometheus { + /// IP and port on which the Prometheus endpoint should be exposed + address: String, + }, } impl Default for MetricsExporterConfig {