1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-10-31 18:30:33 +03:00

Created a separate package for tracing-related stuff

Added mirroring of spans into Sentry
Tracer is a facade that redirects actions to tracing backends
This commit is contained in:
Alexander Presnyakov
2025-08-29 23:04:24 +00:00
committed by Leonid Fedorov
parent a0b4bcd1ce
commit 9b98c5c20a
22 changed files with 681 additions and 514 deletions

View File

@@ -72,6 +72,7 @@ install(
cmapi_server
engine_files
mcs_cluster_tool
tracing
DESTINATION ${CMAPI_DIR}
USE_SOURCE_PERMISSIONS
PATTERN "test" EXCLUDE

View File

@@ -16,9 +16,11 @@ from cherrypy.process import plugins
# TODO: fix dispatcher choose logic because code executing in endpoints.py
# while import process, this cause module logger misconfiguration
from cmapi_server.logging_management import config_cmapi_server_logging
from cmapi_server.sentry import maybe_init_sentry, register_sentry_cherrypy_tool
from tracing.sentry import maybe_init_sentry
from tracing.traceparent_backend import TraceparentBackend
from tracing.tracer import get_tracer
config_cmapi_server_logging()
from cmapi_server.trace_tool import register_tracing_tools
from tracing.trace_tool import register_tracing_tools
from cmapi_server import helpers
from cmapi_server.constants import DEFAULT_MCS_CONF_PATH, CMAPI_CONF_PATH
@@ -143,10 +145,8 @@ if __name__ == '__main__':
helpers.cmapi_config_check()
register_tracing_tools()
# Init Sentry if DSN is present
sentry_active = maybe_init_sentry()
if sentry_active:
register_sentry_cherrypy_tool()
get_tracer().register_backend(TraceparentBackend()) # Register default tracing backend
maybe_init_sentry() # Init Sentry if DSN is present
CertificateManager.create_self_signed_certificate_if_not_exist()
CertificateManager.renew_certificate()
@@ -159,8 +159,6 @@ if __name__ == '__main__':
'tools.trace.on': True,
'tools.trace_end.on': True,
}
if sentry_active:
root_config["tools.sentry.on"] = True
app.config.update({
'/': root_config,

View File

@@ -7,11 +7,11 @@
},
"formatters": {
"cmapi_server": {
"format": "%(asctime)s [%(levelname)s] (%(name)s) {%(threadName)s} %(ip)s %(message)s",
"format": "%(asctime)s [%(levelname)s] (%(name)s) {%(threadName)s} %(ip)s %(message)s %(trace_params)s",
"datefmt": "%d/%b/%Y %H:%M:%S"
},
"default": {
"format": "%(asctime)s [%(levelname)s] (%(name)s) {%(threadName)s} %(message)s",
"format": "%(asctime)s [%(levelname)s] (%(name)s) {%(threadName)s} %(message)s %(trace_params)s",
"datefmt": "%d/%b/%Y %H:%M:%S"
},
"container_sh": {
@@ -75,7 +75,7 @@
"level": "DEBUG",
"propagate": false
},
"": {
"root": {
"handlers": ["console", "file"],
"level": "DEBUG"
}

View File

@@ -3,14 +3,16 @@ from typing import Any, Dict, Optional, Union
import pyotp
import requests
from cmapi_server.traced_session import get_traced_session
from cmapi_server.controllers.dispatcher import _version
from cmapi_server.constants import (
CMAPI_CONF_PATH, CURRENT_NODE_CMAPI_URL, SECRET_KEY,
CMAPI_CONF_PATH,
CURRENT_NODE_CMAPI_URL,
SECRET_KEY,
)
from cmapi_server.controllers.dispatcher import _version
from cmapi_server.exceptions import CMAPIBasicError
from cmapi_server.helpers import get_config_parser, get_current_key
from tracing.traced_session import get_traced_session
class ClusterControllerClient:

View File

@@ -4,21 +4,30 @@ from datetime import datetime
from enum import Enum
from typing import Optional
from cmapi_server.traced_session import get_traced_session
from mcs_node_control.models.misc import get_dbrm_master
from mcs_node_control.models.node_config import NodeConfig
from cmapi_server.constants import (
CMAPI_CONF_PATH, DEFAULT_MCS_CONF_PATH,
CMAPI_CONF_PATH,
DEFAULT_MCS_CONF_PATH,
)
from cmapi_server.exceptions import CMAPIBasicError
from cmapi_server.helpers import (
broadcast_new_config, get_active_nodes, get_dbroots, get_config_parser,
get_current_key, get_version, update_revision_and_manager,
broadcast_new_config,
get_active_nodes,
get_config_parser,
get_current_key,
get_dbroots,
get_version,
update_revision_and_manager,
)
from cmapi_server.node_manipulation import (
add_node, add_dbroot, remove_node, switch_node_maintenance,
add_dbroot,
add_node,
remove_node,
switch_node_maintenance,
)
from mcs_node_control.models.misc import get_dbrm_master
from mcs_node_control.models.node_config import NodeConfig
from tracing.traced_session import get_traced_session
class ClusterAction(Enum):
@@ -50,7 +59,7 @@ def toggle_cluster_state(
broadcast_new_config(config, distribute_secrets=True)
class ClusterHandler():
class ClusterHandler:
"""Class for handling MCS Cluster operations."""
@staticmethod

View File

@@ -19,8 +19,8 @@ from urllib.parse import urlencode, urlunparse
import aiohttp
import lxml.objectify
import requests
from cmapi_server.traced_session import get_traced_session
from cmapi_server.traced_aiohttp import create_traced_async_session
from tracing.traced_session import get_traced_session
from tracing.traced_aiohttp import create_traced_async_session
from cmapi_server.exceptions import CMAPIBasicError
# Bug in pylint https://github.com/PyCQA/pylint/issues/4584

View File

@@ -7,7 +7,7 @@ import cherrypy
from cherrypy import _cperror
from cmapi_server.constants import CMAPI_LOG_CONF_PATH
from cmapi_server.tracer import get_tracer
from tracing.tracer import get_tracer
class AddIpFilter(logging.Filter):
@@ -27,13 +27,13 @@ def install_trace_record_factory() -> None:
def factory(*args, **kwargs): # type: ignore[no-untyped-def]
record = current_factory(*args, **kwargs)
try:
trace_id, span_id, parent_span_id = get_tracer().current_trace_ids()
record.trace_params = (
f" rid={trace_id} sid={span_id} psid={parent_span_id}"
)
except Exception:
record.trace_params = " rid= sid= psid="
if trace_id and span_id:
record.trace_params = f'rid={trace_id} sid={span_id}'
if parent_span_id:
record.trace_params += f' psid={parent_span_id}'
else:
record.trace_params = ""
return record
logging.setLogRecordFactory(factory)

View File

@@ -14,17 +14,18 @@ from typing import Optional
import requests
from lxml import etree
from mcs_node_control.models.node_config import NodeConfig
from cmapi_server import helpers
from cmapi_server.constants import (
CMAPI_CONF_PATH, CMAPI_SINGLE_NODE_XML, DEFAULT_MCS_CONF_PATH, LOCALHOSTS,
CMAPI_CONF_PATH,
CMAPI_SINGLE_NODE_XML,
DEFAULT_MCS_CONF_PATH,
LOCALHOSTS,
MCS_DATA_PATH,
)
from cmapi_server.traced_session import get_traced_session
from cmapi_server.managers.network import NetworkManager
from cmapi_server.tracer import get_tracer
from mcs_node_control.models.node_config import NodeConfig
from tracing.traced_session import get_traced_session
PMS_NODE_PORT = '8620'
EXEMGR_NODE_PORT = '8601'

View File

@@ -1,197 +0,0 @@
import logging
import socket
import cherrypy
import sentry_sdk
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from sentry_sdk.integrations.logging import LoggingIntegration
from cmapi_server import helpers
from cmapi_server.constants import CMAPI_CONF_PATH
SENTRY_ACTIVE = False
logger = logging.getLogger(__name__)
def maybe_init_sentry() -> bool:
"""Initialize Sentry from CMAPI configuration.
Reads config and initializes Sentry only if dsn parameter is present in corresponding section.
The initialization enables the following integrations:
- LoggingIntegration: capture warning-level logs as Sentry events and use
lower-level logs as breadcrumbs.
- AioHttpIntegration: propagate trace headers for outbound requests made
with `aiohttp`.
The function is a no-op if the DSN is missing.
Returns: True if Sentry is initialized, False otherwise.
"""
global SENTRY_ACTIVE
try:
cfg_parser = helpers.get_config_parser(CMAPI_CONF_PATH)
dsn = helpers.dequote(
cfg_parser.get('Sentry', 'dsn', fallback='').strip()
)
if not dsn:
return False
environment = helpers.dequote(
cfg_parser.get('Sentry', 'environment', fallback='development').strip()
)
traces_sample_rate_str = helpers.dequote(
cfg_parser.get('Sentry', 'traces_sample_rate', fallback='1.0').strip()
)
except Exception:
logger.exception('Failed to initialize Sentry.')
return False
try:
sentry_logging = LoggingIntegration(
level=logging.INFO,
event_level=logging.WARNING,
)
try:
traces_sample_rate = float(traces_sample_rate_str)
except ValueError:
logger.error('Invalid traces_sample_rate: %s', traces_sample_rate_str)
traces_sample_rate = 1.0
sentry_sdk.init(
dsn=dsn,
environment=environment,
traces_sample_rate=traces_sample_rate,
integrations=[sentry_logging, AioHttpIntegration()],
)
SENTRY_ACTIVE = True
logger.info('Sentry initialized for CMAPI via config.')
except Exception:
logger.exception('Failed to initialize Sentry.')
return False
logger.info('Sentry successfully initialized.')
return True
def _sentry_on_start_resource():
"""Start or continue a Sentry transaction for the current CherryPy request.
- Continues an incoming distributed trace using Sentry trace headers if
present; otherwise starts a new transaction with `op='http.server'`.
- Pushes the transaction into the current Sentry scope and attaches useful
request metadata as tags and context (HTTP method, path, client IP,
hostname, request ID, and a filtered subset of headers).
- Stores the transaction on the CherryPy request object for later finishing
in `_sentry_on_end_request`.
"""
if not SENTRY_ACTIVE:
return
try:
request = cherrypy.request
headers = dict(getattr(request, 'headers', {}) or {})
name = f"{request.method} {request.path_info}"
transaction = sentry_sdk.start_transaction(
op='http.server', name=name, continue_from_headers=headers
)
sentry_sdk.Hub.current.scope.set_span(transaction)
# Add request-level context/tags
scope = sentry_sdk.Hub.current.scope
scope.set_tag('http.method', request.method)
scope.set_tag('http.path', request.path_info)
scope.set_tag('client.ip', getattr(request.remote, 'ip', ''))
scope.set_tag('instance.hostname', socket.gethostname())
request_id = getattr(request, 'unique_id', None)
if request_id:
scope.set_tag('request.id', request_id)
# Optionally add headers as context without sensitive values
safe_headers = {k: v for k, v in headers.items()
if k.lower() not in {'authorization', 'x-api-key'}}
scope.set_context('headers', safe_headers)
request.sentry_transaction = transaction
except Exception:
logger.exception('Failed to start Sentry transaction.')
def _sentry_before_error_response():
"""Capture the current exception (if any) to Sentry before error response.
This hook runs when CherryPy prepares an error response. If an exception is
available in the current context, it will be sent to Sentry.
"""
if not SENTRY_ACTIVE:
return
try:
sentry_sdk.capture_exception()
except Exception:
logger.exception('Failed to capture exception to Sentry.')
def _sentry_on_end_request():
"""Finish the Sentry transaction for the current CherryPy request.
Attempts to set the HTTP status code on the active transaction and then
finishes it. If no transaction was started on this request, the function is
a no-op.
"""
if not SENTRY_ACTIVE:
return
try:
request = cherrypy.request
transaction = getattr(request, 'sentry_transaction', None)
if transaction is None:
return
status = cherrypy.response.status
try:
status_code = int(str(status).split()[0])
except Exception:
status_code = None
try:
if status_code is not None and hasattr(transaction, 'set_http_status'):
transaction.set_http_status(status_code)
except Exception:
logger.exception('Failed to set HTTP status code on Sentry transaction.')
transaction.finish()
except Exception:
logger.exception('Failed to finish Sentry transaction.')
class SentryTool(cherrypy.Tool):
"""CherryPy Tool that wires Sentry request lifecycle hooks.
The tool attaches handlers for `on_start_resource`, `before_error_response`,
and `on_end_request` in order to manage Sentry transactions and error
capture across the request lifecycle.
"""
def __init__(self):
cherrypy.Tool.__init__(self, 'on_start_resource', self._tool_callback, priority=50)
@staticmethod
def _tool_callback():
"""Attach Sentry lifecycle callbacks to the current CherryPy request."""
cherrypy.request.hooks.attach(
'on_start_resource', _sentry_on_start_resource, priority=50
)
cherrypy.request.hooks.attach(
'before_error_response', _sentry_before_error_response, priority=60
)
cherrypy.request.hooks.attach(
'on_end_request', _sentry_on_end_request, priority=70
)
def register_sentry_cherrypy_tool() -> None:
"""Register the Sentry CherryPy tool under `tools.sentry`.
This function is safe to call multiple times; failures are silently ignored
to avoid impacting the application startup.
"""
if not SENTRY_ACTIVE:
return
try:
cherrypy.tools.sentry = SentryTool()
except Exception:
logger.exception('Failed to register Sentry CherryPy tool.')

View File

@@ -1,51 +0,0 @@
"""
CherryPy tool that uses the tracer to start a span for each request.
If traceparent header is present in the request, the tool will continue this trace chain.
Otherwise, it will start a new trace (with a new trace_id).
"""
from typing import Dict
import cherrypy
from cmapi_server.tracer import get_tracer
def _on_request_start() -> None:
"""CherryPy tool hook: extract incoming context and start a SERVER span."""
req = cherrypy.request
tracer = get_tracer()
headers: Dict[str, str] = dict(req.headers or {})
trace_id, parent_span_id = tracer.extract_traceparent(headers)
tracer.set_incoming_context(trace_id, parent_span_id)
span_name = f"{getattr(req, 'method', 'HTTP')} {getattr(req, 'path_info', '/')}"
ctx = tracer.start_as_current_span(span_name, kind="SERVER")
span = ctx.__enter__()
setattr(req, "_trace_span_ctx", ctx)
setattr(req, "_trace_span", span)
# Echo current traceparent to the client
tracer.inject_traceparent(cherrypy.response.headers) # type: ignore[arg-type]
def _on_request_end() -> None:
"""CherryPy tool hook: end the SERVER span started at request start."""
req = cherrypy.request
ctx = getattr(req, "_trace_span_ctx", None)
if ctx is not None:
try:
ctx.__exit__(None, None, None)
finally:
setattr(req, "_trace_span_ctx", None)
setattr(req, "_trace_span", None)
def register_tracing_tools() -> None:
"""Register CherryPy tools for request tracing."""
cherrypy.tools.trace = cherrypy.Tool("on_start_resource", _on_request_start, priority=10)
cherrypy.tools.trace_end = cherrypy.Tool("on_end_resource", _on_request_end, priority=80)

View File

@@ -1,210 +0,0 @@
"""Support distributed request tracing via W3C Trace Context.
See https://www.w3.org/TR/trace-context/ for the official spec.
There are 3 and a half main components:
- trace_id: a unique identifier for a trace.
It is a 32-hex string, passed in the outbound HTTP requests headers, so that we can
trace the request chain through the system.
- span_id: a unique identifier for a span (the current operation within a trace chain).
It is a 16-hex string. For example, when we we receive a request to add a host, the addition
of the host is a separate span within the request chain.
- parent_span_id: a unique identifier for the parent span of the current span.
Continuing the example above, when we add a host, first we start a transaction,
then we add the host. If we are already adding a host, then creation of the transaction
is the parent span of the current span.
- traceparent: a header that combines trace_id and span_id in one value.
It has the format "00-<trace_id>-<span_id>-<flags>".
A system that calls CMAPI can pass the traceparent header in the request, so that we can pass
the trace_id through the system, changing span_id as we enter new sub-operations.
We can reconstruct the trace tree from the set of the logged traceparent attributes,
representing how the request was processed, which nodes were involved,
how much time did each op take, etc.
This module implements a tracer class that creates spans, injects/extracts traceparent headers.
It uses contextvars to store the trace/span/parent_span ids and start time for each context.
"""
from __future__ import annotations
import contextvars
import logging
import os
import time
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
# Contextvars containing trace/span/parent_span ids and start time for this thread
# (contextvars are something like TLS, they contain variables that are local to the context)
_current_trace_id = contextvars.ContextVar[str]("trace_id", default="")
_current_span_id = contextvars.ContextVar[str]("span_id", default="")
_current_parent_span_id = contextvars.ContextVar[str]("parent_span_id", default="")
_current_span_start_ns = contextvars.ContextVar[int]("span_start_ns", default=0)
def _rand_16_hex() -> str:
# 16 hex bytes (32 hex chars)
return os.urandom(16).hex()
def _rand_8_hex() -> str:
# 8 hex bytes (16 hex chars)
return os.urandom(8).hex()
def format_traceparent(trace_id: str, span_id: str, flags: str = "01") -> str:
"""W3C traceparent: version 00"""
# version(2)-trace_id(32)-span_id(16)-flags(2)
return f"00-{trace_id}-{span_id}-{flags}"
def parse_traceparent(header: str) -> Optional[tuple[str, str, str]]:
"""Return (trace_id, span_id, flags) or None if invalid."""
try:
parts = header.strip().split("-")
if len(parts) != 4 or parts[0] != "00":
logger.error(f"Invalid traceparent: {header}")
return None
trace_id, span_id, flags = parts[1], parts[2], parts[3]
if len(trace_id) != 32 or len(span_id) != 16 or len(flags) != 2:
return None
# W3C: all zero trace_id/span_id are invalid
if set(trace_id) == {"0"} or set(span_id) == {"0"}:
return None
return trace_id, span_id, flags
except Exception:
logger.error(f"Failed to parse traceparent: {header}")
return None
@dataclass
class TraceSpan:
"""Lightweight span handle; keeps attributes in memory (for logging only)."""
name: str
kind: str # "SERVER" | "CLIENT" | "INTERNAL"
start_ns: int
trace_id: str
span_id: str
parent_span_id: str
attributes: Dict[str, Any]
def set_attribute(self, key: str, value: Any) -> None:
self.attributes[key] = value
def add_event(self, name: str, **attrs: Any) -> None:
# For simplicity we just log events immediately
logger.info(
"event name=%s trace_id=%s span_id=%s attrs=%s",
name, self.trace_id, self.span_id, attrs
)
def set_status(self, code: str, description: str = "") -> None:
self.attributes["status.code"] = code
if description:
self.attributes["status.description"] = description
def record_exception(self, exc: BaseException) -> None:
self.add_event("exception", type=type(exc).__name__, msg=str(exc))
class Tracer:
"""Encapsulates everything related to tracing (span creation, logging, etc)"""
@contextmanager
def start_as_current_span(self, name: str, kind: str = "INTERNAL") -> Iterator[TraceSpan]:
trace_id = _current_trace_id.get() or _rand_16_hex()
parent_span = _current_span_id.get()
new_span_id = _rand_8_hex()
# Push new context
tok_tid = _current_trace_id.set(trace_id)
tok_sid = _current_span_id.set(new_span_id)
tok_pid = _current_parent_span_id.set(parent_span)
tok_start = _current_span_start_ns.set(time.time_ns())
span = TraceSpan(
name=name,
kind=kind,
start_ns=_current_span_start_ns.get(),
trace_id=trace_id,
span_id=new_span_id,
parent_span_id=parent_span,
attributes={"span.kind": kind, "span.name": name},
)
try:
logger.info(
"span_begin name=%s kind=%s trace_id=%s span_id=%s parent_span_id=%s attrs=%s",
span.name, span.kind, span.trace_id, span.span_id, span.parent_span_id, span.attributes
)
yield span
except BaseException as exc:
span.record_exception(exc)
span.set_status("ERROR", str(exc))
raise
finally:
# Pop the span from the context (restore parent context)
duration_ms = (time.time_ns() - span.start_ns) / 1_000_000
logger.info(
"span_end name=%s kind=%s trace_id=%s span_id=%s parent_span_id=%s duration_ms=%.3f attrs=%s",
span.name, span.kind, span.trace_id, span.span_id, span.parent_span_id, duration_ms, span.attributes
)
# Restore previous context
_current_span_start_ns.reset(tok_start)
_current_parent_span_id.reset(tok_pid)
_current_span_id.reset(tok_sid)
_current_trace_id.reset(tok_tid)
def set_incoming_context(
self,
trace_id: Optional[str] = None,
parent_span_id: Optional[str] = None,
) -> None:
"""Seed current context with incoming W3C traceparent values.
Only non-empty values are applied.
"""
if trace_id:
_current_trace_id.set(trace_id)
if parent_span_id:
_current_parent_span_id.set(parent_span_id)
def current_trace_ids(self) -> tuple[str, str, str]:
return _current_trace_id.get(), _current_span_id.get(), _current_parent_span_id.get()
def inject_traceparent(self, headers: Dict[str, str]) -> None:
"""Inject W3C traceparent into outbound headers."""
trace_id, span_id, _ = self.current_trace_ids()
if not trace_id or not span_id:
# If called outside of a span, create a short-lived span just to carry IDs
trace_id = trace_id or _rand_16_hex()
span_id = span_id or _rand_8_hex()
headers["traceparent"] = format_traceparent(trace_id, span_id)
def extract_traceparent(self, headers: Dict[str, str]) -> tuple[str, str]:
"""Extract parent trace/span; returns (trace_id, parent_span_id)."""
raw_traceparent = (headers.get("traceparent")
or headers.get("Traceparent")
or headers.get("TRACEPARENT"))
if not raw_traceparent:
return "", ""
parsed = parse_traceparent(raw_traceparent)
if not parsed:
return "", ""
return parsed[0], parsed[1]
# No incoming context
return "", ""
# Tracer singleton for the process (not thread)
_tracer = Tracer()
def get_tracer() -> Tracer:
return _tracer
class TraceLogFilter(logging.Filter):
"""Inject trace/span ids into LogRecord for formatting."""
def filter(self, record: logging.LogRecord) -> bool:
record.traceID, record.spanID, record.parentSpanID = get_tracer().current_trace_ids()
return True

49
cmapi/tracing/__init__.py Normal file
View File

@@ -0,0 +1,49 @@
"""Tracing support for CMAPI
Despite having many files, the idea of this package is simple: MCS is a distributed system,
and we need to be able to trace requests across the system.
We need to understand:
* how one incoming request caused many others
* how long each request took
* which request each log line corresponds to
etc
The basic high-level mechanism is this:
1. Each incoming request is assigned a trace ID (or it may already have one, see point 2).
2. This trace ID is propagated to all other outbound requests that are caused by this request.
3. Each sub-operation is assigned a span ID. Request ID stays the same, but the span ID changes.
4. Each span can have a parent span ID, which is the span ID of the request that caused this span.
5. So basically, we have a tree of spans, and the trace ID identifies the root of the tree.
TraceID/SpanID/ParentSpanID are added to each log line, so we can identify which request each log line corresponds to.
Trace attributes are passed through the system via request headers, and here it becomes a bit more complicated.
There are two technologies that we use to pass these ids:
1. W3C TraceContext. This is a standard, it has a fixed header and its format.
The header is called `traceparent`. It encapsulates trace id and span id.
2. Sentry. For historical reasons, it has different headers. And in our setup it is optional.
But Sentry is very useful, we also use it to monitor the errors, and it has a powerful UI, so we support it too.
How is it implemented?
1. We have a global tracer object, that is used to create spans and pass them through the system.
2. It is a facade that hides two tracing backends with the same interface: TraceparentBackend and SentryBackend.
3. We have CherryPy tool that processes incoming requests, extracts traceparent header (or generates its parts),
creates a span for each request, injects traceparent header into the response.
4. For each outcoming request, we ask tracer to create a new span and to inject tracing headers into the request.
To avoid boilerplate, there is a TracedSession, an extension to requests that does all that.
For async requests, there is a TracedAsyncSession, that does the same.
5. When the request is finished, we ask tracer to finish/pop the current span.
Logging:
There is a trace record factory, that adds a new field trace_params to each log record.
trace_params is a string representation of trace id, span id and parent span id.
If in current context they are empty (like in MainThread that doesn't process requests), trace_params is an empty string.
Sentry reporting:
If Sentry is enabled, we send info about errors and exceptions into it. We also send logs that preceded the problem
as breadcrumbs to understand context of the error.
As we keep Sentry updated about the current trace and span, when an error happens, info about the trace will be sent to Sentry.
So we will know which chain of requests caused the error.
"""

37
cmapi/tracing/backend.py Normal file
View File

@@ -0,0 +1,37 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
from tracing.span import TraceSpan
class TracerBackend(ABC):
@abstractmethod
def on_span_start(self, span: TraceSpan) -> None:
raise NotImplementedError
@abstractmethod
def on_span_end(self, span: TraceSpan, exc: Optional[BaseException]) -> None:
raise NotImplementedError
def on_span_event(self, span: TraceSpan, name: str, attrs: Dict[str, Any]) -> None:
return
def on_span_status(self, span: TraceSpan, code: str, description: str) -> None:
return
def on_span_exception(self, span: TraceSpan, exc: BaseException) -> None:
return
def on_span_attribute(self, span: TraceSpan, key: str, value: Any) -> None:
return
def on_inject_headers(self, headers: Dict[str, str]) -> None:
return
def on_incoming_request(self, headers: Dict[str, str], method: str, path: str) -> None:
return
def on_request_finished(self, status_code: Optional[int]) -> None:
return

65
cmapi/tracing/sentry.py Normal file
View File

@@ -0,0 +1,65 @@
import logging
import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration
from cmapi_server import helpers
from cmapi_server.constants import CMAPI_CONF_PATH
from tracing.sentry_backend import SentryBackend
from tracing.tracer import get_tracer
SENTRY_ACTIVE = False
logger = logging.getLogger(__name__)
def maybe_init_sentry() -> bool:
"""Initialize Sentry from CMAPI configuration.
Reads config and initializes Sentry only if dsn parameter is present
in corresponding section of cmapi config.
The initialization enables LoggingIntegration: it captures error-level logs as Sentry events
and uses lower-level logs as breadcrumbs.
Returns: True if Sentry is initialized, False otherwise.
"""
global SENTRY_ACTIVE
try:
cfg_parser = helpers.get_config_parser(CMAPI_CONF_PATH)
dsn = helpers.dequote(
cfg_parser.get('Sentry', 'dsn', fallback='').strip()
)
if not dsn:
return False
environment = helpers.dequote(
cfg_parser.get('Sentry', 'environment', fallback='development').strip()
)
except Exception:
logger.exception('Failed to initialize Sentry.')
return False
try:
sentry_logging = LoggingIntegration(
level=logging.INFO,
event_level=logging.ERROR,
)
sentry_sdk.init(
dsn=dsn,
environment=environment,
sample_rate=1.0,
traces_sample_rate=1.0,
integrations=[sentry_logging],
debug=True,
)
SENTRY_ACTIVE = True
# Register backend to mirror our internal spans into Sentry
get_tracer().register_backend(SentryBackend())
logger.info('Sentry initialized for CMAPI via config.')
except Exception:
logger.exception('Failed to initialize Sentry.')
return False
logger.info('Sentry successfully initialized.')
return True

View File

@@ -0,0 +1,114 @@
import logging
import contextvars
from typing import Any, Dict, Optional
import sentry_sdk
from sentry_sdk.tracing import Transaction
from tracing.tracer import TraceSpan, TracerBackend
from tracing.utils import swallow_exceptions
logger = logging.getLogger(__name__)
class SentryBackend(TracerBackend):
"""Mirror spans and events from our Tracer into Sentry SDK."""
def __init__(self) -> None:
self._active_spans: Dict[str, Any] = {}
self._current_transaction = contextvars.ContextVar[Optional[Transaction]]("sentry_transaction", default=None)
@swallow_exceptions
def on_span_start(self, span: TraceSpan) -> None:
kind_to_op = {
'SERVER': 'http.server',
'CLIENT': 'http.client',
'INTERNAL': 'internal',
}
op = kind_to_op.get(span.kind.upper(), 'internal')
sdk_span = sentry_sdk.start_span(op=op, description=span.name)
sdk_span.set_tag('w3c.trace_id', span.trace_id)
sdk_span.set_tag('w3c.span_id', span.span_id)
if span.parent_span_id:
sdk_span.set_tag('w3c.parent_span_id', span.parent_span_id)
if span.attributes:
sdk_span.set_data('cmapi.span_attributes', dict(span.attributes))
sdk_span.__enter__()
self._active_spans[span.span_id] = sdk_span
@swallow_exceptions
def on_span_end(self, span: TraceSpan, exc: Optional[BaseException]) -> None:
sdk_span = self._active_spans.pop(span.span_id, None)
if sdk_span is None:
return
if exc is not None:
sdk_span.set_status('internal_error')
sdk_span.__exit__(
type(exc) if exc else None,
exc,
exc.__traceback__ if exc else None
)
@swallow_exceptions
def on_span_event(self, span: TraceSpan, name: str, attrs: Dict[str, Any]) -> None:
sentry_sdk.add_breadcrumb(category='event', message=name, data=dict(attrs))
@swallow_exceptions
def on_span_status(self, span: TraceSpan, code: str, description: str) -> None:
sdk_span = self._active_spans.get(span.span_id)
if sdk_span is not None:
sdk_span.set_status(code)
@swallow_exceptions
def on_span_exception(self, span: TraceSpan, exc: BaseException) -> None:
sentry_sdk.capture_exception(exc)
@swallow_exceptions
def on_span_attribute(self, span: TraceSpan, key: str, value: Any) -> None:
sdk_span = self._active_spans.get(span.span_id)
if sdk_span is not None:
sdk_span.set_data(f'attr.{key}', value)
@swallow_exceptions
def on_inject_headers(self, headers: Dict[str, str]) -> None:
traceparent = sentry_sdk.get_traceparent()
baggage = sentry_sdk.get_baggage()
if traceparent:
headers['sentry-trace'] = traceparent
if baggage:
headers['baggage'] = baggage
@swallow_exceptions
def on_incoming_request(self, headers: Dict[str, str], method: str, path: str) -> None:
name = f"{method} {path}" if method or path else "http.server"
transaction = sentry_sdk.continue_trace(headers or {}, op='http.server', name=name)
# Store transaction in context var to ensure we can finish it later
self._current_transaction.set(transaction)
transaction.__enter__()
scope = sentry_sdk.Hub.current.scope
if method:
scope.set_tag('http.method', method)
if path:
scope.set_tag('http.path', path)
# TODO: remove this
logger.info(
"Sentry transaction started name=%s sampled=%s",
name, getattr(transaction, 'sampled', None)
)
@swallow_exceptions
def on_request_finished(self, status_code: Optional[int]) -> None:
transaction = self._current_transaction.get()
if transaction is None:
return
if status_code is not None:
transaction.set_http_status(status_code)
# Exit to restore parent and finish the transaction
transaction.__exit__(None, None, None)
# Clear transaction in this context
self._current_transaction.set(None)
# TODO: remove this
logger.info("Sentry transaction finished status=%s", status_code)

46
cmapi/tracing/span.py Normal file
View File

@@ -0,0 +1,46 @@
from typing import TYPE_CHECKING
from dataclasses import dataclass
from typing import Any, Dict
from tracing.utils import swallow_exceptions
if TYPE_CHECKING:
from tracing.tracer import Tracer
@dataclass
class TraceSpan:
"""Span handle bound to a tracer.
Provides helpers to add attributes/events/status/exception that
will never propagate exceptions.
"""
name: str
kind: str # "SERVER" | "CLIENT" | "INTERNAL"
start_ns: int
trace_id: str
span_id: str
parent_span_id: str
attributes: Dict[str, Any]
tracer: "Tracer"
@swallow_exceptions
def set_attribute(self, key: str, value: Any) -> None:
self.attributes[key] = value
self.tracer._notify_attribute(self, key, value)
@swallow_exceptions
def add_event(self, name: str, **attrs: Any) -> None:
self.tracer._notify_event(self, name, attrs)
@swallow_exceptions
def set_status(self, code: str, description: str = "") -> None:
self.attributes["status.code"] = code
if description:
self.attributes["status.description"] = description
self.tracer._notify_status(self, code, description)
@swallow_exceptions
def record_exception(self, exc: BaseException) -> None:
self.tracer._notify_exception(self, exc)

View File

@@ -0,0 +1,67 @@
"""
CherryPy tool that uses the tracer to start a span for each request.
"""
import socket
from typing import Dict
import cherrypy
from tracing.tracer import get_tracer
def _on_request_start() -> None:
req = cherrypy.request
tracer = get_tracer()
headers: Dict[str, str] = dict(req.headers or {})
tracer.notify_incoming_request(
headers=headers,
method=getattr(req, 'method', ''),
path=getattr(req, 'path_info', '')
)
trace_id, parent_span_id = tracer.extract_traceparent(headers)
tracer.set_incoming_context(trace_id, parent_span_id)
span_name = f"{getattr(req, 'method', 'HTTP')} {getattr(req, 'path_info', '/')}"
ctx = tracer.start_as_current_span(span_name, kind="SERVER")
span = ctx.__enter__()
span.set_attribute('http.method', getattr(req, 'method', ''))
span.set_attribute('http.path', getattr(req, 'path_info', ''))
span.set_attribute('client.ip', getattr(getattr(req, 'remote', None), 'ip', ''))
span.set_attribute('instance.hostname', socket.gethostname())
safe_headers = {k: v for k, v in headers.items() if k.lower() not in {'authorization', 'x-api-key'}}
span.set_attribute('sentry.incoming_headers', safe_headers)
req._trace_span_ctx = ctx
req._trace_span = span
tracer.inject_traceparent(cherrypy.response.headers) # type: ignore[arg-type]
def _on_request_end() -> None:
req = cherrypy.request
try:
status_str = str(cherrypy.response.status)
status_code = int(status_str.split()[0])
except Exception:
status_code = None
tracer = get_tracer()
tracer.notify_request_finished(status_code)
span = getattr(req, "_trace_span", None)
if span is not None and status_code is not None:
span.set_attribute('http.status_code', status_code)
ctx = getattr(req, "_trace_span_ctx", None)
if ctx is not None:
try:
ctx.__exit__(None, None, None)
finally:
req._trace_span_ctx = None
req._trace_span = None
def register_tracing_tools() -> None:
cherrypy.tools.trace = cherrypy.Tool("on_start_resource", _on_request_start, priority=10)
cherrypy.tools.trace_end = cherrypy.Tool("on_end_resource", _on_request_end, priority=80)

View File

@@ -1,10 +1,9 @@
"""Async sibling of TracedSession"""
"""Async sibling of TracedSession."""
from typing import Any
import aiohttp
from cmapi_server.tracer import get_tracer
from tracing.tracer import get_tracer
class TracedAsyncSession(aiohttp.ClientSession):
@@ -23,10 +22,7 @@ class TracedAsyncSession(aiohttp.ClientSession):
with tracer.start_as_current_span(span_name, kind="CLIENT") as span:
span.set_attribute("http.method", method)
span.set_attribute("http.url", url_text)
try:
tracer.inject_traceparent(headers)
except Exception:
pass
tracer.inject_outbound_headers(headers)
try:
response = await super()._request(method, str_or_url, *args, **kwargs)
except Exception as exc:
@@ -41,3 +37,4 @@ def create_traced_async_session(**kwargs: Any) -> TracedAsyncSession:
return TracedAsyncSession(**kwargs)

View File

@@ -1,19 +1,12 @@
"""Our own customized requests.Session that automatically traces outbound HTTP calls.
Creates a CLIENT span per outbound HTTP request, injects traceparent,
records method/url/status, and closes the span when the request finishes.
"""
"""Customized requests.Session that automatically traces outbound HTTP calls."""
from typing import Any, Optional
import requests
from cmapi_server.tracer import get_tracer
from tracing.tracer import get_tracer
class TracedSession(requests.Session):
"""requests.Session that automatically traces outbound HTTP calls."""
def request(self, method: str, url: str, *args: Any, **kwargs: Any) -> requests.Response:
tracer = get_tracer()
@@ -27,14 +20,13 @@ class TracedSession(requests.Session):
span.set_attribute("http.method", method)
span.set_attribute("http.url", url)
tracer.inject_traceparent(headers)
tracer.inject_outbound_headers(headers)
try:
response = super().request(method, url, *args, **kwargs)
except Exception as exc:
span.set_status("ERROR", str(exc))
raise
else:
# Record status code
span.set_attribute("http.status_code", response.status_code)
return response
@@ -43,10 +35,10 @@ _default_session: Optional[TracedSession] = None
def get_traced_session() -> TracedSession:
"""Return a process-wide TracedSession singleton."""
global _default_session
if _default_session is None:
_default_session = TracedSession()
return _default_session

View File

@@ -0,0 +1,38 @@
import logging
import time
from typing import Any, Dict, Optional
from tracing.tracer import TracerBackend, TraceSpan
from tracing.utils import swallow_exceptions
logger = logging.getLogger(__name__)
class TraceparentBackend(TracerBackend):
"""Default backend that logs span lifecycle and mirrors events/status."""
@swallow_exceptions
def on_span_start(self, span: TraceSpan) -> None:
logger.info(
"span_begin name=%s kind=%s trace_id=%s span_id=%s parent=%s attrs=%s",
span.name, span.kind, span.trace_id, span.span_id,
span.parent_span_id, span.attributes,
)
@swallow_exceptions
def on_span_end(self, span: TraceSpan, exc: Optional[BaseException]) -> None:
duration_ms = (time.time_ns() - span.start_ns) / 1_000_000
logger.info(
"span_end name=%s kind=%s trace_id=%s span_id=%s parent=%s duration_ms=%.3f attrs=%s",
span.name, span.kind, span.trace_id, span.span_id,
span.parent_span_id, duration_ms, span.attributes,
)
@swallow_exceptions
def on_span_event(self, span: TraceSpan, name: str, attrs: Dict[str, Any]) -> None:
logger.info(
"span_event name=%s trace_id=%s span_id=%s attrs=%s",
name, span.trace_id, span.span_id, attrs,
)

155
cmapi/tracing/tracer.py Normal file
View File

@@ -0,0 +1,155 @@
"""This module implements a tracer facade that creates spans, injects/extracts traceparent headers,
and delegates span lifecycle and enrichment to pluggable backends (e.g., Traceparent and Sentry).
It uses contextvars to store the trace/span/parent_span ids and start time for each context.
"""
import contextvars
import logging
import time
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any, Dict, List, Optional, Tuple
from tracing.backend import TracerBackend
from tracing.span import TraceSpan
from tracing.utils import (
rand_16_hex,
rand_8_hex,
format_traceparent,
parse_traceparent,
)
logger = logging.getLogger(__name__)
# Context vars are something like thread-local storage, they are context-local variables
_current_trace_id = contextvars.ContextVar[str]("trace_id", default="")
_current_span_id = contextvars.ContextVar[str]("span_id", default="")
_current_parent_span_id = contextvars.ContextVar[str]("parent_span_id", default="")
_current_span_start_ns = contextvars.ContextVar[int]("span_start_ns", default=0)
class Tracer:
def __init__(self) -> None:
self._backends: List[TracerBackend] = []
def register_backend(self, backend: TracerBackend) -> None:
try:
self._backends.append(backend)
logger.info(
"Tracing backend registered: %s", backend.__class__.__name__
)
except Exception:
logger.exception("Failed to register tracing backend")
def clear_backends(self) -> None:
self._backends.clear()
@contextmanager
def start_as_current_span(self, name: str, kind: str = "INTERNAL") -> Iterator[TraceSpan]:
trace_id = _current_trace_id.get() or rand_16_hex()
parent_span = _current_span_id.get()
new_span_id = rand_8_hex()
tok_tid = _current_trace_id.set(trace_id)
tok_sid = _current_span_id.set(new_span_id)
tok_pid = _current_parent_span_id.set(parent_span)
tok_start = _current_span_start_ns.set(time.time_ns())
span = TraceSpan(
name=name,
kind=kind,
start_ns=_current_span_start_ns.get(),
trace_id=trace_id,
span_id=new_span_id,
parent_span_id=parent_span,
attributes={"span.kind": kind, "span.name": name},
tracer=self,
)
caught_exc: Optional[BaseException] = None
try:
for backend in list(self._backends):
backend.on_span_start(span)
yield span
except BaseException as exc:
span.record_exception(exc)
span.set_status("ERROR", str(exc))
caught_exc = exc
raise
finally:
for backend in list(self._backends):
backend.on_span_end(span, caught_exc)
_current_span_start_ns.reset(tok_start)
_current_parent_span_id.reset(tok_pid)
_current_span_id.reset(tok_sid)
_current_trace_id.reset(tok_tid)
def set_incoming_context(
self,
trace_id: Optional[str] = None,
parent_span_id: Optional[str] = None,
) -> None:
if trace_id:
_current_trace_id.set(trace_id)
if parent_span_id:
_current_parent_span_id.set(parent_span_id)
def current_trace_ids(self) -> Tuple[str, str, str]:
return _current_trace_id.get(), _current_span_id.get(), _current_parent_span_id.get()
def inject_traceparent(self, headers: Dict[str, str]) -> None:
trace_id, span_id, _ = self.current_trace_ids()
if not trace_id or not span_id:
trace_id = trace_id or rand_16_hex()
span_id = span_id or rand_8_hex()
headers["traceparent"] = format_traceparent(trace_id, span_id)
def inject_outbound_headers(self, headers: Dict[str, str]) -> None:
self.inject_traceparent(headers)
for backend in list(self._backends):
backend.on_inject_headers(headers)
def notify_incoming_request(self, headers: Dict[str, str], method: str, path: str) -> None:
for backend in list(self._backends):
backend.on_incoming_request(headers, method, path)
def notify_request_finished(self, status_code: Optional[int]) -> None:
for backend in list(self._backends):
backend.on_request_finished(status_code)
def extract_traceparent(self, headers: Dict[str, str]) -> Tuple[str, str]:
raw_traceparent = (headers.get("traceparent")
or headers.get("Traceparent")
or headers.get("TRACEPARENT"))
if not raw_traceparent:
return "", ""
parsed = parse_traceparent(raw_traceparent)
if not parsed:
return "", ""
return parsed[0], parsed[1]
def _notify_event(self, span: TraceSpan, name: str, attrs: Dict[str, Any]) -> None:
for backend in list(self._backends):
backend.on_span_event(span, name, attrs)
def _notify_status(self, span: TraceSpan, code: str, description: str) -> None:
for backend in list(self._backends):
backend.on_span_status(span, code, description)
def _notify_exception(self, span: TraceSpan, exc: BaseException) -> None:
for backend in list(self._backends):
backend.on_span_exception(span, exc)
def _notify_attribute(self, span: TraceSpan, key: str, value: Any) -> None:
for backend in list(self._backends):
backend.on_span_attribute(span, key, value)
_tracer = Tracer()
def get_tracer() -> Tracer:
return _tracer

54
cmapi/tracing/utils.py Normal file
View File

@@ -0,0 +1,54 @@
import logging
import os
from functools import wraps
from typing import Optional, Tuple
logger = logging.getLogger("tracing")
def swallow_exceptions(method):
"""Decorator that logs exceptions and prevents them from propagating up."""
@wraps(method)
def _wrapper(*args, **kwargs):
try:
return method(*args, **kwargs)
except Exception:
logger.exception("%s failed", getattr(method, "__qualname__", repr(method)))
return None
return _wrapper
def rand_16_hex() -> str:
"""Return 16 random bytes as a 32-char hex string (trace_id size)."""
return os.urandom(16).hex()
def rand_8_hex() -> str:
"""Return 8 random bytes as a 16-char hex string (span_id size)."""
return os.urandom(8).hex()
def format_traceparent(trace_id: str, span_id: str, flags: str = "01") -> str:
"""Build a W3C traceparent header (version 00)."""
return f"00-{trace_id}-{span_id}-{flags}"
def parse_traceparent(header: str) -> Optional[Tuple[str, str, str]]:
"""Parse W3C traceparent and return (trace_id, span_id, flags) or None."""
try:
parts = header.strip().split("-")
if len(parts) != 4 or parts[0] != "00":
logger.error("Invalid traceparent: %s", header)
return None
trace_id, span_id, flags = parts[1], parts[2], parts[3]
if len(trace_id) != 32 or len(span_id) != 16 or len(flags) != 2:
return None
# W3C: all zero trace_id/span_id are invalid
if set(trace_id) == {"0"} or set(span_id) == {"0"}:
return None
return trace_id, span_id, flags
except Exception:
logger.exception("Failed to parse traceparent: %s", header)
return None