You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-30 07:25:34 +03:00 
			
		
		
		
	Added mirroring of spans into Sentry Tracer is a facade that redirects actions to tracing backends
		
			
				
	
	
		
			156 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			156 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """This module implements a tracer facade that creates spans, injects/extracts traceparent headers,
 | |
| and delegates span lifecycle and enrichment to pluggable backends (e.g., Traceparent and Sentry).
 | |
| It uses contextvars to store the trace/span/parent_span ids and start time for each context.
 | |
| """
 | |
| 
 | |
| import contextvars
 | |
| import logging
 | |
| import time
 | |
| from collections.abc import Iterator
 | |
| from contextlib import contextmanager
 | |
| from typing import Any, Dict, List, Optional, Tuple
 | |
| 
 | |
| from tracing.backend import TracerBackend
 | |
| from tracing.span import TraceSpan
 | |
| from tracing.utils import (
 | |
|     rand_16_hex,
 | |
|     rand_8_hex,
 | |
|     format_traceparent,
 | |
|     parse_traceparent,
 | |
| )
 | |
| 
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
| # Context vars are something like thread-local storage, they are context-local variables
 | |
| _current_trace_id = contextvars.ContextVar[str]("trace_id", default="")
 | |
| _current_span_id = contextvars.ContextVar[str]("span_id", default="")
 | |
| _current_parent_span_id = contextvars.ContextVar[str]("parent_span_id", default="")
 | |
| _current_span_start_ns = contextvars.ContextVar[int]("span_start_ns", default=0)
 | |
| 
 | |
| 
 | |
| class Tracer:
 | |
|     def __init__(self) -> None:
 | |
|         self._backends: List[TracerBackend] = []
 | |
| 
 | |
|     def register_backend(self, backend: TracerBackend) -> None:
 | |
|         try:
 | |
|             self._backends.append(backend)
 | |
|             logger.info(
 | |
|                 "Tracing backend registered: %s", backend.__class__.__name__
 | |
|             )
 | |
|         except Exception:
 | |
|             logger.exception("Failed to register tracing backend")
 | |
| 
 | |
|     def clear_backends(self) -> None:
 | |
|         self._backends.clear()
 | |
| 
 | |
|     @contextmanager
 | |
|     def start_as_current_span(self, name: str, kind: str = "INTERNAL") -> Iterator[TraceSpan]:
 | |
|         trace_id = _current_trace_id.get() or rand_16_hex()
 | |
|         parent_span = _current_span_id.get()
 | |
|         new_span_id = rand_8_hex()
 | |
| 
 | |
|         tok_tid = _current_trace_id.set(trace_id)
 | |
|         tok_sid = _current_span_id.set(new_span_id)
 | |
|         tok_pid = _current_parent_span_id.set(parent_span)
 | |
|         tok_start = _current_span_start_ns.set(time.time_ns())
 | |
| 
 | |
|         span = TraceSpan(
 | |
|             name=name,
 | |
|             kind=kind,
 | |
|             start_ns=_current_span_start_ns.get(),
 | |
|             trace_id=trace_id,
 | |
|             span_id=new_span_id,
 | |
|             parent_span_id=parent_span,
 | |
|             attributes={"span.kind": kind, "span.name": name},
 | |
|             tracer=self,
 | |
|         )
 | |
| 
 | |
|         caught_exc: Optional[BaseException] = None
 | |
|         try:
 | |
|             for backend in list(self._backends):
 | |
|                 backend.on_span_start(span)
 | |
|             yield span
 | |
|         except BaseException as exc:
 | |
|             span.record_exception(exc)
 | |
|             span.set_status("ERROR", str(exc))
 | |
|             caught_exc = exc
 | |
|             raise
 | |
|         finally:
 | |
|             for backend in list(self._backends):
 | |
|                 backend.on_span_end(span, caught_exc)
 | |
|             _current_span_start_ns.reset(tok_start)
 | |
|             _current_parent_span_id.reset(tok_pid)
 | |
|             _current_span_id.reset(tok_sid)
 | |
|             _current_trace_id.reset(tok_tid)
 | |
| 
 | |
|     def set_incoming_context(
 | |
|         self,
 | |
|         trace_id: Optional[str] = None,
 | |
|         parent_span_id: Optional[str] = None,
 | |
|     ) -> None:
 | |
|         if trace_id:
 | |
|             _current_trace_id.set(trace_id)
 | |
|         if parent_span_id:
 | |
|             _current_parent_span_id.set(parent_span_id)
 | |
| 
 | |
|     def current_trace_ids(self) -> Tuple[str, str, str]:
 | |
|         return _current_trace_id.get(), _current_span_id.get(), _current_parent_span_id.get()
 | |
| 
 | |
|     def inject_traceparent(self, headers: Dict[str, str]) -> None:
 | |
|         trace_id, span_id, _ = self.current_trace_ids()
 | |
|         if not trace_id or not span_id:
 | |
|             trace_id = trace_id or rand_16_hex()
 | |
|             span_id = span_id or rand_8_hex()
 | |
|         headers["traceparent"] = format_traceparent(trace_id, span_id)
 | |
| 
 | |
|     def inject_outbound_headers(self, headers: Dict[str, str]) -> None:
 | |
|         self.inject_traceparent(headers)
 | |
|         for backend in list(self._backends):
 | |
|             backend.on_inject_headers(headers)
 | |
| 
 | |
|     def notify_incoming_request(self, headers: Dict[str, str], method: str, path: str) -> None:
 | |
|         for backend in list(self._backends):
 | |
|             backend.on_incoming_request(headers, method, path)
 | |
| 
 | |
|     def notify_request_finished(self, status_code: Optional[int]) -> None:
 | |
|         for backend in list(self._backends):
 | |
|             backend.on_request_finished(status_code)
 | |
| 
 | |
|     def extract_traceparent(self, headers: Dict[str, str]) -> Tuple[str, str]:
 | |
|         raw_traceparent = (headers.get("traceparent")
 | |
|                             or headers.get("Traceparent")
 | |
|                             or headers.get("TRACEPARENT"))
 | |
|         if not raw_traceparent:
 | |
|             return "", ""
 | |
|         parsed = parse_traceparent(raw_traceparent)
 | |
|         if not parsed:
 | |
|             return "", ""
 | |
|         return parsed[0], parsed[1]
 | |
| 
 | |
|     def _notify_event(self, span: TraceSpan, name: str, attrs: Dict[str, Any]) -> None:
 | |
|         for backend in list(self._backends):
 | |
|             backend.on_span_event(span, name, attrs)
 | |
| 
 | |
|     def _notify_status(self, span: TraceSpan, code: str, description: str) -> None:
 | |
|         for backend in list(self._backends):
 | |
|             backend.on_span_status(span, code, description)
 | |
| 
 | |
|     def _notify_exception(self, span: TraceSpan, exc: BaseException) -> None:
 | |
|         for backend in list(self._backends):
 | |
|             backend.on_span_exception(span, exc)
 | |
| 
 | |
|     def _notify_attribute(self, span: TraceSpan, key: str, value: Any) -> None:
 | |
|         for backend in list(self._backends):
 | |
|             backend.on_span_attribute(span, key, value)
 | |
| 
 | |
| 
 | |
| _tracer = Tracer()
 | |
| 
 | |
| 
 | |
| def get_tracer() -> Tracer:
 | |
|     return _tracer
 | |
| 
 | |
| 
 |