diff --git a/cmapi/CMakeLists.txt b/cmapi/CMakeLists.txt index 43f44816d..5130ec85f 100644 --- a/cmapi/CMakeLists.txt +++ b/cmapi/CMakeLists.txt @@ -72,6 +72,7 @@ install( cmapi_server engine_files mcs_cluster_tool + tracing DESTINATION ${CMAPI_DIR} USE_SOURCE_PERMISSIONS PATTERN "test" EXCLUDE diff --git a/cmapi/cmapi_server/__main__.py b/cmapi/cmapi_server/__main__.py index 5a8a2de58..e037f4f7f 100644 --- a/cmapi/cmapi_server/__main__.py +++ b/cmapi/cmapi_server/__main__.py @@ -16,9 +16,11 @@ from cherrypy.process import plugins # TODO: fix dispatcher choose logic because code executing in endpoints.py # while import process, this cause module logger misconfiguration from cmapi_server.logging_management import config_cmapi_server_logging -from cmapi_server.sentry import maybe_init_sentry, register_sentry_cherrypy_tool +from tracing.sentry import maybe_init_sentry +from tracing.traceparent_backend import TraceparentBackend +from tracing.tracer import get_tracer config_cmapi_server_logging() -from cmapi_server.trace_tool import register_tracing_tools +from tracing.trace_tool import register_tracing_tools from cmapi_server import helpers from cmapi_server.constants import DEFAULT_MCS_CONF_PATH, CMAPI_CONF_PATH @@ -143,10 +145,8 @@ if __name__ == '__main__': helpers.cmapi_config_check() register_tracing_tools() - # Init Sentry if DSN is present - sentry_active = maybe_init_sentry() - if sentry_active: - register_sentry_cherrypy_tool() + get_tracer().register_backend(TraceparentBackend()) # Register default tracing backend + maybe_init_sentry() # Init Sentry if DSN is present CertificateManager.create_self_signed_certificate_if_not_exist() CertificateManager.renew_certificate() @@ -159,8 +159,6 @@ if __name__ == '__main__': 'tools.trace.on': True, 'tools.trace_end.on': True, } - if sentry_active: - root_config["tools.sentry.on"] = True app.config.update({ '/': root_config, diff --git a/cmapi/cmapi_server/cmapi_logger.conf b/cmapi/cmapi_server/cmapi_logger.conf index 2bc3d383d..301eed718 100644 --- a/cmapi/cmapi_server/cmapi_logger.conf +++ b/cmapi/cmapi_server/cmapi_logger.conf @@ -7,11 +7,11 @@ }, "formatters": { "cmapi_server": { - "format": "%(asctime)s [%(levelname)s] (%(name)s) {%(threadName)s} %(ip)s %(message)s", + "format": "%(asctime)s [%(levelname)s] (%(name)s) {%(threadName)s} %(ip)s %(message)s %(trace_params)s", "datefmt": "%d/%b/%Y %H:%M:%S" }, "default": { - "format": "%(asctime)s [%(levelname)s] (%(name)s) {%(threadName)s} %(message)s", + "format": "%(asctime)s [%(levelname)s] (%(name)s) {%(threadName)s} %(message)s %(trace_params)s", "datefmt": "%d/%b/%Y %H:%M:%S" }, "container_sh": { @@ -75,7 +75,7 @@ "level": "DEBUG", "propagate": false }, - "": { + "root": { "handlers": ["console", "file"], "level": "DEBUG" } diff --git a/cmapi/cmapi_server/controllers/api_clients.py b/cmapi/cmapi_server/controllers/api_clients.py index 74c8723a9..27063543e 100644 --- a/cmapi/cmapi_server/controllers/api_clients.py +++ b/cmapi/cmapi_server/controllers/api_clients.py @@ -3,14 +3,16 @@ from typing import Any, Dict, Optional, Union import pyotp import requests -from cmapi_server.traced_session import get_traced_session -from cmapi_server.controllers.dispatcher import _version from cmapi_server.constants import ( - CMAPI_CONF_PATH, CURRENT_NODE_CMAPI_URL, SECRET_KEY, + CMAPI_CONF_PATH, + CURRENT_NODE_CMAPI_URL, + SECRET_KEY, ) +from cmapi_server.controllers.dispatcher import _version from cmapi_server.exceptions import CMAPIBasicError from cmapi_server.helpers import get_config_parser, get_current_key +from tracing.traced_session import get_traced_session class ClusterControllerClient: diff --git a/cmapi/cmapi_server/handlers/cluster.py b/cmapi/cmapi_server/handlers/cluster.py index 0f0d08606..c78f5350b 100644 --- a/cmapi/cmapi_server/handlers/cluster.py +++ b/cmapi/cmapi_server/handlers/cluster.py @@ -4,21 +4,30 @@ from datetime import datetime from enum import Enum from typing import Optional -from cmapi_server.traced_session import get_traced_session +from mcs_node_control.models.misc import get_dbrm_master +from mcs_node_control.models.node_config import NodeConfig from cmapi_server.constants import ( - CMAPI_CONF_PATH, DEFAULT_MCS_CONF_PATH, + CMAPI_CONF_PATH, + DEFAULT_MCS_CONF_PATH, ) from cmapi_server.exceptions import CMAPIBasicError from cmapi_server.helpers import ( - broadcast_new_config, get_active_nodes, get_dbroots, get_config_parser, - get_current_key, get_version, update_revision_and_manager, + broadcast_new_config, + get_active_nodes, + get_config_parser, + get_current_key, + get_dbroots, + get_version, + update_revision_and_manager, ) from cmapi_server.node_manipulation import ( - add_node, add_dbroot, remove_node, switch_node_maintenance, + add_dbroot, + add_node, + remove_node, + switch_node_maintenance, ) -from mcs_node_control.models.misc import get_dbrm_master -from mcs_node_control.models.node_config import NodeConfig +from tracing.traced_session import get_traced_session class ClusterAction(Enum): @@ -50,7 +59,7 @@ def toggle_cluster_state( broadcast_new_config(config, distribute_secrets=True) -class ClusterHandler(): +class ClusterHandler: """Class for handling MCS Cluster operations.""" @staticmethod diff --git a/cmapi/cmapi_server/helpers.py b/cmapi/cmapi_server/helpers.py index be38db2f5..cb9baf1df 100644 --- a/cmapi/cmapi_server/helpers.py +++ b/cmapi/cmapi_server/helpers.py @@ -19,8 +19,8 @@ from urllib.parse import urlencode, urlunparse import aiohttp import lxml.objectify import requests -from cmapi_server.traced_session import get_traced_session -from cmapi_server.traced_aiohttp import create_traced_async_session +from tracing.traced_session import get_traced_session +from tracing.traced_aiohttp import create_traced_async_session from cmapi_server.exceptions import CMAPIBasicError # Bug in pylint https://github.com/PyCQA/pylint/issues/4584 diff --git a/cmapi/cmapi_server/logging_management.py b/cmapi/cmapi_server/logging_management.py index 837b4267f..9949db431 100644 --- a/cmapi/cmapi_server/logging_management.py +++ b/cmapi/cmapi_server/logging_management.py @@ -7,7 +7,7 @@ import cherrypy from cherrypy import _cperror from cmapi_server.constants import CMAPI_LOG_CONF_PATH -from cmapi_server.tracer import get_tracer +from tracing.tracer import get_tracer class AddIpFilter(logging.Filter): @@ -27,13 +27,13 @@ def install_trace_record_factory() -> None: def factory(*args, **kwargs): # type: ignore[no-untyped-def] record = current_factory(*args, **kwargs) - try: - trace_id, span_id, parent_span_id = get_tracer().current_trace_ids() - record.trace_params = ( - f" rid={trace_id} sid={span_id} psid={parent_span_id}" - ) - except Exception: - record.trace_params = " rid= sid= psid=" + trace_id, span_id, parent_span_id = get_tracer().current_trace_ids() + if trace_id and span_id: + record.trace_params = f'rid={trace_id} sid={span_id}' + if parent_span_id: + record.trace_params += f' psid={parent_span_id}' + else: + record.trace_params = "" return record logging.setLogRecordFactory(factory) diff --git a/cmapi/cmapi_server/node_manipulation.py b/cmapi/cmapi_server/node_manipulation.py index ff0d5259c..4d2d9e5dd 100644 --- a/cmapi/cmapi_server/node_manipulation.py +++ b/cmapi/cmapi_server/node_manipulation.py @@ -14,17 +14,18 @@ from typing import Optional import requests from lxml import etree +from mcs_node_control.models.node_config import NodeConfig from cmapi_server import helpers from cmapi_server.constants import ( - CMAPI_CONF_PATH, CMAPI_SINGLE_NODE_XML, DEFAULT_MCS_CONF_PATH, LOCALHOSTS, + CMAPI_CONF_PATH, + CMAPI_SINGLE_NODE_XML, + DEFAULT_MCS_CONF_PATH, + LOCALHOSTS, MCS_DATA_PATH, ) -from cmapi_server.traced_session import get_traced_session from cmapi_server.managers.network import NetworkManager -from cmapi_server.tracer import get_tracer -from mcs_node_control.models.node_config import NodeConfig - +from tracing.traced_session import get_traced_session PMS_NODE_PORT = '8620' EXEMGR_NODE_PORT = '8601' diff --git a/cmapi/cmapi_server/sentry.py b/cmapi/cmapi_server/sentry.py deleted file mode 100644 index 7777ee8fc..000000000 --- a/cmapi/cmapi_server/sentry.py +++ /dev/null @@ -1,197 +0,0 @@ -import logging -import socket - -import cherrypy -import sentry_sdk -from sentry_sdk.integrations.aiohttp import AioHttpIntegration -from sentry_sdk.integrations.logging import LoggingIntegration - -from cmapi_server import helpers -from cmapi_server.constants import CMAPI_CONF_PATH - -SENTRY_ACTIVE = False - -logger = logging.getLogger(__name__) - -def maybe_init_sentry() -> bool: - """Initialize Sentry from CMAPI configuration. - - Reads config and initializes Sentry only if dsn parameter is present in corresponding section. - The initialization enables the following integrations: - - LoggingIntegration: capture warning-level logs as Sentry events and use - lower-level logs as breadcrumbs. - - AioHttpIntegration: propagate trace headers for outbound requests made - with `aiohttp`. - - The function is a no-op if the DSN is missing. - - Returns: True if Sentry is initialized, False otherwise. - """ - global SENTRY_ACTIVE - try: - cfg_parser = helpers.get_config_parser(CMAPI_CONF_PATH) - dsn = helpers.dequote( - cfg_parser.get('Sentry', 'dsn', fallback='').strip() - ) - if not dsn: - return False - - environment = helpers.dequote( - cfg_parser.get('Sentry', 'environment', fallback='development').strip() - ) - traces_sample_rate_str = helpers.dequote( - cfg_parser.get('Sentry', 'traces_sample_rate', fallback='1.0').strip() - ) - except Exception: - logger.exception('Failed to initialize Sentry.') - return False - - try: - sentry_logging = LoggingIntegration( - level=logging.INFO, - event_level=logging.WARNING, - ) - - try: - traces_sample_rate = float(traces_sample_rate_str) - except ValueError: - logger.error('Invalid traces_sample_rate: %s', traces_sample_rate_str) - traces_sample_rate = 1.0 - - sentry_sdk.init( - dsn=dsn, - environment=environment, - traces_sample_rate=traces_sample_rate, - integrations=[sentry_logging, AioHttpIntegration()], - ) - SENTRY_ACTIVE = True - logger.info('Sentry initialized for CMAPI via config.') - except Exception: - logger.exception('Failed to initialize Sentry.') - return False - - logger.info('Sentry successfully initialized.') - return True - -def _sentry_on_start_resource(): - """Start or continue a Sentry transaction for the current CherryPy request. - - - Continues an incoming distributed trace using Sentry trace headers if - present; otherwise starts a new transaction with `op='http.server'`. - - Pushes the transaction into the current Sentry scope and attaches useful - request metadata as tags and context (HTTP method, path, client IP, - hostname, request ID, and a filtered subset of headers). - - Stores the transaction on the CherryPy request object for later finishing - in `_sentry_on_end_request`. - """ - if not SENTRY_ACTIVE: - return - try: - request = cherrypy.request - headers = dict(getattr(request, 'headers', {}) or {}) - name = f"{request.method} {request.path_info}" - transaction = sentry_sdk.start_transaction( - op='http.server', name=name, continue_from_headers=headers - ) - sentry_sdk.Hub.current.scope.set_span(transaction) - - # Add request-level context/tags - scope = sentry_sdk.Hub.current.scope - scope.set_tag('http.method', request.method) - scope.set_tag('http.path', request.path_info) - scope.set_tag('client.ip', getattr(request.remote, 'ip', '')) - scope.set_tag('instance.hostname', socket.gethostname()) - request_id = getattr(request, 'unique_id', None) - if request_id: - scope.set_tag('request.id', request_id) - # Optionally add headers as context without sensitive values - safe_headers = {k: v for k, v in headers.items() - if k.lower() not in {'authorization', 'x-api-key'}} - scope.set_context('headers', safe_headers) - - request.sentry_transaction = transaction - except Exception: - logger.exception('Failed to start Sentry transaction.') - - -def _sentry_before_error_response(): - """Capture the current exception (if any) to Sentry before error response. - - This hook runs when CherryPy prepares an error response. If an exception is - available in the current context, it will be sent to Sentry. - """ - if not SENTRY_ACTIVE: - return - try: - sentry_sdk.capture_exception() - except Exception: - logger.exception('Failed to capture exception to Sentry.') - - -def _sentry_on_end_request(): - """Finish the Sentry transaction for the current CherryPy request. - - Attempts to set the HTTP status code on the active transaction and then - finishes it. If no transaction was started on this request, the function is - a no-op. - """ - if not SENTRY_ACTIVE: - return - try: - request = cherrypy.request - transaction = getattr(request, 'sentry_transaction', None) - if transaction is None: - return - status = cherrypy.response.status - try: - status_code = int(str(status).split()[0]) - except Exception: - status_code = None - try: - if status_code is not None and hasattr(transaction, 'set_http_status'): - transaction.set_http_status(status_code) - except Exception: - logger.exception('Failed to set HTTP status code on Sentry transaction.') - transaction.finish() - except Exception: - logger.exception('Failed to finish Sentry transaction.') - - -class SentryTool(cherrypy.Tool): - """CherryPy Tool that wires Sentry request lifecycle hooks. - - The tool attaches handlers for `on_start_resource`, `before_error_response`, - and `on_end_request` in order to manage Sentry transactions and error - capture across the request lifecycle. - """ - def __init__(self): - cherrypy.Tool.__init__(self, 'on_start_resource', self._tool_callback, priority=50) - - @staticmethod - def _tool_callback(): - """Attach Sentry lifecycle callbacks to the current CherryPy request.""" - cherrypy.request.hooks.attach( - 'on_start_resource', _sentry_on_start_resource, priority=50 - ) - cherrypy.request.hooks.attach( - 'before_error_response', _sentry_before_error_response, priority=60 - ) - cherrypy.request.hooks.attach( - 'on_end_request', _sentry_on_end_request, priority=70 - ) - - -def register_sentry_cherrypy_tool() -> None: - """Register the Sentry CherryPy tool under `tools.sentry`. - - This function is safe to call multiple times; failures are silently ignored - to avoid impacting the application startup. - """ - if not SENTRY_ACTIVE: - return - - try: - cherrypy.tools.sentry = SentryTool() - except Exception: - logger.exception('Failed to register Sentry CherryPy tool.') - diff --git a/cmapi/cmapi_server/trace_tool.py b/cmapi/cmapi_server/trace_tool.py deleted file mode 100644 index 7164373d6..000000000 --- a/cmapi/cmapi_server/trace_tool.py +++ /dev/null @@ -1,51 +0,0 @@ -""" -CherryPy tool that uses the tracer to start a span for each request. - -If traceparent header is present in the request, the tool will continue this trace chain. -Otherwise, it will start a new trace (with a new trace_id). -""" -from typing import Dict - -import cherrypy - -from cmapi_server.tracer import get_tracer - - -def _on_request_start() -> None: - """CherryPy tool hook: extract incoming context and start a SERVER span.""" - req = cherrypy.request - tracer = get_tracer() - - headers: Dict[str, str] = dict(req.headers or {}) - trace_id, parent_span_id = tracer.extract_traceparent(headers) - tracer.set_incoming_context(trace_id, parent_span_id) - - span_name = f"{getattr(req, 'method', 'HTTP')} {getattr(req, 'path_info', '/')}" - - ctx = tracer.start_as_current_span(span_name, kind="SERVER") - span = ctx.__enter__() - setattr(req, "_trace_span_ctx", ctx) - setattr(req, "_trace_span", span) - - # Echo current traceparent to the client - tracer.inject_traceparent(cherrypy.response.headers) # type: ignore[arg-type] - - -def _on_request_end() -> None: - """CherryPy tool hook: end the SERVER span started at request start.""" - req = cherrypy.request - ctx = getattr(req, "_trace_span_ctx", None) - if ctx is not None: - try: - ctx.__exit__(None, None, None) - finally: - setattr(req, "_trace_span_ctx", None) - setattr(req, "_trace_span", None) - - -def register_tracing_tools() -> None: - """Register CherryPy tools for request tracing.""" - cherrypy.tools.trace = cherrypy.Tool("on_start_resource", _on_request_start, priority=10) - cherrypy.tools.trace_end = cherrypy.Tool("on_end_resource", _on_request_end, priority=80) - - diff --git a/cmapi/cmapi_server/tracer.py b/cmapi/cmapi_server/tracer.py deleted file mode 100644 index 1b85b2e4d..000000000 --- a/cmapi/cmapi_server/tracer.py +++ /dev/null @@ -1,210 +0,0 @@ -"""Support distributed request tracing via W3C Trace Context. -See https://www.w3.org/TR/trace-context/ for the official spec. - -There are 3 and a half main components: -- trace_id: a unique identifier for a trace. - It is a 32-hex string, passed in the outbound HTTP requests headers, so that we can - trace the request chain through the system. -- span_id: a unique identifier for a span (the current operation within a trace chain). - It is a 16-hex string. For example, when we we receive a request to add a host, the addition - of the host is a separate span within the request chain. -- parent_span_id: a unique identifier for the parent span of the current span. - Continuing the example above, when we add a host, first we start a transaction, - then we add the host. If we are already adding a host, then creation of the transaction - is the parent span of the current span. -- traceparent: a header that combines trace_id and span_id in one value. - It has the format "00---". - -A system that calls CMAPI can pass the traceparent header in the request, so that we can pass - the trace_id through the system, changing span_id as we enter new sub-operations. - -We can reconstruct the trace tree from the set of the logged traceparent attributes, - representing how the request was processed, which nodes were involved, - how much time did each op take, etc. - -This module implements a tracer class that creates spans, injects/extracts traceparent headers. -It uses contextvars to store the trace/span/parent_span ids and start time for each context. -""" - -from __future__ import annotations - -import contextvars -import logging -import os -import time -from collections.abc import Iterator -from contextlib import contextmanager -from dataclasses import dataclass -from typing import Any, Dict, Optional - -logger = logging.getLogger(__name__) - -# Contextvars containing trace/span/parent_span ids and start time for this thread -# (contextvars are something like TLS, they contain variables that are local to the context) -_current_trace_id = contextvars.ContextVar[str]("trace_id", default="") -_current_span_id = contextvars.ContextVar[str]("span_id", default="") -_current_parent_span_id = contextvars.ContextVar[str]("parent_span_id", default="") -_current_span_start_ns = contextvars.ContextVar[int]("span_start_ns", default=0) - - -def _rand_16_hex() -> str: - # 16 hex bytes (32 hex chars) - return os.urandom(16).hex() - -def _rand_8_hex() -> str: - # 8 hex bytes (16 hex chars) - return os.urandom(8).hex() - -def format_traceparent(trace_id: str, span_id: str, flags: str = "01") -> str: - """W3C traceparent: version 00""" - # version(2)-trace_id(32)-span_id(16)-flags(2) - return f"00-{trace_id}-{span_id}-{flags}" - -def parse_traceparent(header: str) -> Optional[tuple[str, str, str]]: - """Return (trace_id, span_id, flags) or None if invalid.""" - try: - parts = header.strip().split("-") - if len(parts) != 4 or parts[0] != "00": - logger.error(f"Invalid traceparent: {header}") - return None - trace_id, span_id, flags = parts[1], parts[2], parts[3] - if len(trace_id) != 32 or len(span_id) != 16 or len(flags) != 2: - return None - # W3C: all zero trace_id/span_id are invalid - if set(trace_id) == {"0"} or set(span_id) == {"0"}: - return None - return trace_id, span_id, flags - except Exception: - logger.error(f"Failed to parse traceparent: {header}") - return None - - -@dataclass -class TraceSpan: - """Lightweight span handle; keeps attributes in memory (for logging only).""" - name: str - kind: str # "SERVER" | "CLIENT" | "INTERNAL" - start_ns: int - trace_id: str - span_id: str - parent_span_id: str - attributes: Dict[str, Any] - - def set_attribute(self, key: str, value: Any) -> None: - self.attributes[key] = value - - def add_event(self, name: str, **attrs: Any) -> None: - # For simplicity we just log events immediately - logger.info( - "event name=%s trace_id=%s span_id=%s attrs=%s", - name, self.trace_id, self.span_id, attrs - ) - - def set_status(self, code: str, description: str = "") -> None: - self.attributes["status.code"] = code - if description: - self.attributes["status.description"] = description - - def record_exception(self, exc: BaseException) -> None: - self.add_event("exception", type=type(exc).__name__, msg=str(exc)) - - -class Tracer: - """Encapsulates everything related to tracing (span creation, logging, etc)""" - @contextmanager - def start_as_current_span(self, name: str, kind: str = "INTERNAL") -> Iterator[TraceSpan]: - trace_id = _current_trace_id.get() or _rand_16_hex() - parent_span = _current_span_id.get() - new_span_id = _rand_8_hex() - - # Push new context - tok_tid = _current_trace_id.set(trace_id) - tok_sid = _current_span_id.set(new_span_id) - tok_pid = _current_parent_span_id.set(parent_span) - tok_start = _current_span_start_ns.set(time.time_ns()) - - span = TraceSpan( - name=name, - kind=kind, - start_ns=_current_span_start_ns.get(), - trace_id=trace_id, - span_id=new_span_id, - parent_span_id=parent_span, - attributes={"span.kind": kind, "span.name": name}, - ) - - try: - logger.info( - "span_begin name=%s kind=%s trace_id=%s span_id=%s parent_span_id=%s attrs=%s", - span.name, span.kind, span.trace_id, span.span_id, span.parent_span_id, span.attributes - ) - yield span - except BaseException as exc: - span.record_exception(exc) - span.set_status("ERROR", str(exc)) - raise - finally: - # Pop the span from the context (restore parent context) - duration_ms = (time.time_ns() - span.start_ns) / 1_000_000 - logger.info( - "span_end name=%s kind=%s trace_id=%s span_id=%s parent_span_id=%s duration_ms=%.3f attrs=%s", - span.name, span.kind, span.trace_id, span.span_id, span.parent_span_id, duration_ms, span.attributes - ) - # Restore previous context - _current_span_start_ns.reset(tok_start) - _current_parent_span_id.reset(tok_pid) - _current_span_id.reset(tok_sid) - _current_trace_id.reset(tok_tid) - - def set_incoming_context( - self, - trace_id: Optional[str] = None, - parent_span_id: Optional[str] = None, - ) -> None: - """Seed current context with incoming W3C traceparent values. - - Only non-empty values are applied. - """ - if trace_id: - _current_trace_id.set(trace_id) - if parent_span_id: - _current_parent_span_id.set(parent_span_id) - - def current_trace_ids(self) -> tuple[str, str, str]: - return _current_trace_id.get(), _current_span_id.get(), _current_parent_span_id.get() - - def inject_traceparent(self, headers: Dict[str, str]) -> None: - """Inject W3C traceparent into outbound headers.""" - trace_id, span_id, _ = self.current_trace_ids() - if not trace_id or not span_id: - # If called outside of a span, create a short-lived span just to carry IDs - trace_id = trace_id or _rand_16_hex() - span_id = span_id or _rand_8_hex() - headers["traceparent"] = format_traceparent(trace_id, span_id) - - def extract_traceparent(self, headers: Dict[str, str]) -> tuple[str, str]: - """Extract parent trace/span; returns (trace_id, parent_span_id).""" - raw_traceparent = (headers.get("traceparent") - or headers.get("Traceparent") - or headers.get("TRACEPARENT")) - if not raw_traceparent: - return "", "" - parsed = parse_traceparent(raw_traceparent) - if not parsed: - return "", "" - return parsed[0], parsed[1] - # No incoming context - return "", "" - -# Tracer singleton for the process (not thread) -_tracer = Tracer() - -def get_tracer() -> Tracer: - return _tracer - - -class TraceLogFilter(logging.Filter): - """Inject trace/span ids into LogRecord for formatting.""" - def filter(self, record: logging.LogRecord) -> bool: - record.traceID, record.spanID, record.parentSpanID = get_tracer().current_trace_ids() - return True diff --git a/cmapi/tracing/__init__.py b/cmapi/tracing/__init__.py new file mode 100644 index 000000000..4c9ad4334 --- /dev/null +++ b/cmapi/tracing/__init__.py @@ -0,0 +1,49 @@ +"""Tracing support for CMAPI + +Despite having many files, the idea of this package is simple: MCS is a distributed system, + and we need to be able to trace requests across the system. +We need to understand: +* how one incoming request caused many others +* how long each request took +* which request each log line corresponds to +etc + +The basic high-level mechanism is this: +1. Each incoming request is assigned a trace ID (or it may already have one, see point 2). +2. This trace ID is propagated to all other outbound requests that are caused by this request. +3. Each sub-operation is assigned a span ID. Request ID stays the same, but the span ID changes. +4. Each span can have a parent span ID, which is the span ID of the request that caused this span. +5. So basically, we have a tree of spans, and the trace ID identifies the root of the tree. + +TraceID/SpanID/ParentSpanID are added to each log line, so we can identify which request each log line corresponds to. + +Trace attributes are passed through the system via request headers, and here it becomes a bit more complicated. +There are two technologies that we use to pass these ids: +1. W3C TraceContext. This is a standard, it has a fixed header and its format. + The header is called `traceparent`. It encapsulates trace id and span id. +2. Sentry. For historical reasons, it has different headers. And in our setup it is optional. + But Sentry is very useful, we also use it to monitor the errors, and it has a powerful UI, so we support it too. + +How is it implemented? +1. We have a global tracer object, that is used to create spans and pass them through the system. +2. It is a facade that hides two tracing backends with the same interface: TraceparentBackend and SentryBackend. +3. We have CherryPy tool that processes incoming requests, extracts traceparent header (or generates its parts), + creates a span for each request, injects traceparent header into the response. +4. For each outcoming request, we ask tracer to create a new span and to inject tracing headers into the request. + To avoid boilerplate, there is a TracedSession, an extension to requests that does all that. + For async requests, there is a TracedAsyncSession, that does the same. +5. When the request is finished, we ask tracer to finish/pop the current span. + +Logging: +There is a trace record factory, that adds a new field trace_params to each log record. +trace_params is a string representation of trace id, span id and parent span id. +If in current context they are empty (like in MainThread that doesn't process requests), trace_params is an empty string. + +Sentry reporting: +If Sentry is enabled, we send info about errors and exceptions into it. We also send logs that preceded the problem + as breadcrumbs to understand context of the error. +As we keep Sentry updated about the current trace and span, when an error happens, info about the trace will be sent to Sentry. +So we will know which chain of requests caused the error. +""" + + diff --git a/cmapi/tracing/backend.py b/cmapi/tracing/backend.py new file mode 100644 index 000000000..2d37dca43 --- /dev/null +++ b/cmapi/tracing/backend.py @@ -0,0 +1,37 @@ +from abc import ABC, abstractmethod +from typing import Any, Dict, Optional + +from tracing.span import TraceSpan + + +class TracerBackend(ABC): + @abstractmethod + def on_span_start(self, span: TraceSpan) -> None: + raise NotImplementedError + + @abstractmethod + def on_span_end(self, span: TraceSpan, exc: Optional[BaseException]) -> None: + raise NotImplementedError + + def on_span_event(self, span: TraceSpan, name: str, attrs: Dict[str, Any]) -> None: + return + + def on_span_status(self, span: TraceSpan, code: str, description: str) -> None: + return + + def on_span_exception(self, span: TraceSpan, exc: BaseException) -> None: + return + + def on_span_attribute(self, span: TraceSpan, key: str, value: Any) -> None: + return + + def on_inject_headers(self, headers: Dict[str, str]) -> None: + return + + def on_incoming_request(self, headers: Dict[str, str], method: str, path: str) -> None: + return + + def on_request_finished(self, status_code: Optional[int]) -> None: + return + + diff --git a/cmapi/tracing/sentry.py b/cmapi/tracing/sentry.py new file mode 100644 index 000000000..a28d455ee --- /dev/null +++ b/cmapi/tracing/sentry.py @@ -0,0 +1,65 @@ +import logging + +import sentry_sdk +from sentry_sdk.integrations.logging import LoggingIntegration + +from cmapi_server import helpers +from cmapi_server.constants import CMAPI_CONF_PATH +from tracing.sentry_backend import SentryBackend +from tracing.tracer import get_tracer + +SENTRY_ACTIVE = False + +logger = logging.getLogger(__name__) + +def maybe_init_sentry() -> bool: + """Initialize Sentry from CMAPI configuration. + + Reads config and initializes Sentry only if dsn parameter is present + in corresponding section of cmapi config. + The initialization enables LoggingIntegration: it captures error-level logs as Sentry events + and uses lower-level logs as breadcrumbs. + + Returns: True if Sentry is initialized, False otherwise. + """ + global SENTRY_ACTIVE + try: + cfg_parser = helpers.get_config_parser(CMAPI_CONF_PATH) + dsn = helpers.dequote( + cfg_parser.get('Sentry', 'dsn', fallback='').strip() + ) + if not dsn: + return False + + environment = helpers.dequote( + cfg_parser.get('Sentry', 'environment', fallback='development').strip() + ) + except Exception: + logger.exception('Failed to initialize Sentry.') + return False + + try: + sentry_logging = LoggingIntegration( + level=logging.INFO, + event_level=logging.ERROR, + ) + + sentry_sdk.init( + dsn=dsn, + environment=environment, + sample_rate=1.0, + traces_sample_rate=1.0, + integrations=[sentry_logging], + debug=True, + ) + SENTRY_ACTIVE = True + # Register backend to mirror our internal spans into Sentry + get_tracer().register_backend(SentryBackend()) + logger.info('Sentry initialized for CMAPI via config.') + except Exception: + logger.exception('Failed to initialize Sentry.') + return False + + logger.info('Sentry successfully initialized.') + return True + diff --git a/cmapi/tracing/sentry_backend.py b/cmapi/tracing/sentry_backend.py new file mode 100644 index 000000000..cb343fe5f --- /dev/null +++ b/cmapi/tracing/sentry_backend.py @@ -0,0 +1,114 @@ +import logging +import contextvars +from typing import Any, Dict, Optional + +import sentry_sdk +from sentry_sdk.tracing import Transaction + +from tracing.tracer import TraceSpan, TracerBackend +from tracing.utils import swallow_exceptions + + +logger = logging.getLogger(__name__) + + +class SentryBackend(TracerBackend): + """Mirror spans and events from our Tracer into Sentry SDK.""" + + def __init__(self) -> None: + self._active_spans: Dict[str, Any] = {} + self._current_transaction = contextvars.ContextVar[Optional[Transaction]]("sentry_transaction", default=None) + + @swallow_exceptions + def on_span_start(self, span: TraceSpan) -> None: + kind_to_op = { + 'SERVER': 'http.server', + 'CLIENT': 'http.client', + 'INTERNAL': 'internal', + } + op = kind_to_op.get(span.kind.upper(), 'internal') + sdk_span = sentry_sdk.start_span(op=op, description=span.name) + sdk_span.set_tag('w3c.trace_id', span.trace_id) + sdk_span.set_tag('w3c.span_id', span.span_id) + if span.parent_span_id: + sdk_span.set_tag('w3c.parent_span_id', span.parent_span_id) + if span.attributes: + sdk_span.set_data('cmapi.span_attributes', dict(span.attributes)) + sdk_span.__enter__() + self._active_spans[span.span_id] = sdk_span + + @swallow_exceptions + def on_span_end(self, span: TraceSpan, exc: Optional[BaseException]) -> None: + sdk_span = self._active_spans.pop(span.span_id, None) + if sdk_span is None: + return + if exc is not None: + sdk_span.set_status('internal_error') + sdk_span.__exit__( + type(exc) if exc else None, + exc, + exc.__traceback__ if exc else None + ) + + @swallow_exceptions + def on_span_event(self, span: TraceSpan, name: str, attrs: Dict[str, Any]) -> None: + sentry_sdk.add_breadcrumb(category='event', message=name, data=dict(attrs)) + + @swallow_exceptions + def on_span_status(self, span: TraceSpan, code: str, description: str) -> None: + sdk_span = self._active_spans.get(span.span_id) + if sdk_span is not None: + sdk_span.set_status(code) + + @swallow_exceptions + def on_span_exception(self, span: TraceSpan, exc: BaseException) -> None: + sentry_sdk.capture_exception(exc) + + @swallow_exceptions + def on_span_attribute(self, span: TraceSpan, key: str, value: Any) -> None: + sdk_span = self._active_spans.get(span.span_id) + if sdk_span is not None: + sdk_span.set_data(f'attr.{key}', value) + + @swallow_exceptions + def on_inject_headers(self, headers: Dict[str, str]) -> None: + traceparent = sentry_sdk.get_traceparent() + baggage = sentry_sdk.get_baggage() + if traceparent: + headers['sentry-trace'] = traceparent + if baggage: + headers['baggage'] = baggage + + @swallow_exceptions + def on_incoming_request(self, headers: Dict[str, str], method: str, path: str) -> None: + name = f"{method} {path}" if method or path else "http.server" + transaction = sentry_sdk.continue_trace(headers or {}, op='http.server', name=name) + # Store transaction in context var to ensure we can finish it later + self._current_transaction.set(transaction) + transaction.__enter__() + scope = sentry_sdk.Hub.current.scope + if method: + scope.set_tag('http.method', method) + if path: + scope.set_tag('http.path', path) + # TODO: remove this + logger.info( + "Sentry transaction started name=%s sampled=%s", + name, getattr(transaction, 'sampled', None) + ) + + @swallow_exceptions + def on_request_finished(self, status_code: Optional[int]) -> None: + transaction = self._current_transaction.get() + if transaction is None: + return + if status_code is not None: + transaction.set_http_status(status_code) + # Exit to restore parent and finish the transaction + transaction.__exit__(None, None, None) + # Clear transaction in this context + self._current_transaction.set(None) + # TODO: remove this + logger.info("Sentry transaction finished status=%s", status_code) + + diff --git a/cmapi/tracing/span.py b/cmapi/tracing/span.py new file mode 100644 index 000000000..cc1ac8996 --- /dev/null +++ b/cmapi/tracing/span.py @@ -0,0 +1,46 @@ +from typing import TYPE_CHECKING +from dataclasses import dataclass +from typing import Any, Dict +from tracing.utils import swallow_exceptions + +if TYPE_CHECKING: + from tracing.tracer import Tracer + +@dataclass +class TraceSpan: + """Span handle bound to a tracer. + + Provides helpers to add attributes/events/status/exception that + will never propagate exceptions. + """ + + name: str + kind: str # "SERVER" | "CLIENT" | "INTERNAL" + start_ns: int + trace_id: str + span_id: str + parent_span_id: str + attributes: Dict[str, Any] + tracer: "Tracer" + + @swallow_exceptions + def set_attribute(self, key: str, value: Any) -> None: + self.attributes[key] = value + self.tracer._notify_attribute(self, key, value) + + @swallow_exceptions + def add_event(self, name: str, **attrs: Any) -> None: + self.tracer._notify_event(self, name, attrs) + + @swallow_exceptions + def set_status(self, code: str, description: str = "") -> None: + self.attributes["status.code"] = code + if description: + self.attributes["status.description"] = description + self.tracer._notify_status(self, code, description) + + @swallow_exceptions + def record_exception(self, exc: BaseException) -> None: + self.tracer._notify_exception(self, exc) + + diff --git a/cmapi/tracing/trace_tool.py b/cmapi/tracing/trace_tool.py new file mode 100644 index 000000000..5d0c278db --- /dev/null +++ b/cmapi/tracing/trace_tool.py @@ -0,0 +1,67 @@ +""" +CherryPy tool that uses the tracer to start a span for each request. +""" +import socket +from typing import Dict + +import cherrypy + +from tracing.tracer import get_tracer + + +def _on_request_start() -> None: + req = cherrypy.request + tracer = get_tracer() + + headers: Dict[str, str] = dict(req.headers or {}) + tracer.notify_incoming_request( + headers=headers, + method=getattr(req, 'method', ''), + path=getattr(req, 'path_info', '') + ) + trace_id, parent_span_id = tracer.extract_traceparent(headers) + tracer.set_incoming_context(trace_id, parent_span_id) + + span_name = f"{getattr(req, 'method', 'HTTP')} {getattr(req, 'path_info', '/')}" + + ctx = tracer.start_as_current_span(span_name, kind="SERVER") + span = ctx.__enter__() + span.set_attribute('http.method', getattr(req, 'method', '')) + span.set_attribute('http.path', getattr(req, 'path_info', '')) + span.set_attribute('client.ip', getattr(getattr(req, 'remote', None), 'ip', '')) + span.set_attribute('instance.hostname', socket.gethostname()) + safe_headers = {k: v for k, v in headers.items() if k.lower() not in {'authorization', 'x-api-key'}} + span.set_attribute('sentry.incoming_headers', safe_headers) + req._trace_span_ctx = ctx + req._trace_span = span + + tracer.inject_traceparent(cherrypy.response.headers) # type: ignore[arg-type] + + +def _on_request_end() -> None: + req = cherrypy.request + try: + status_str = str(cherrypy.response.status) + status_code = int(status_str.split()[0]) + except Exception: + status_code = None + tracer = get_tracer() + tracer.notify_request_finished(status_code) + span = getattr(req, "_trace_span", None) + if span is not None and status_code is not None: + span.set_attribute('http.status_code', status_code) + ctx = getattr(req, "_trace_span_ctx", None) + if ctx is not None: + try: + ctx.__exit__(None, None, None) + finally: + req._trace_span_ctx = None + req._trace_span = None + + +def register_tracing_tools() -> None: + cherrypy.tools.trace = cherrypy.Tool("on_start_resource", _on_request_start, priority=10) + cherrypy.tools.trace_end = cherrypy.Tool("on_end_resource", _on_request_end, priority=80) + + + diff --git a/cmapi/cmapi_server/traced_aiohttp.py b/cmapi/tracing/traced_aiohttp.py similarity index 84% rename from cmapi/cmapi_server/traced_aiohttp.py rename to cmapi/tracing/traced_aiohttp.py index 4643e217d..d9468e0b1 100644 --- a/cmapi/cmapi_server/traced_aiohttp.py +++ b/cmapi/tracing/traced_aiohttp.py @@ -1,10 +1,9 @@ -"""Async sibling of TracedSession""" - +"""Async sibling of TracedSession.""" from typing import Any import aiohttp -from cmapi_server.tracer import get_tracer +from tracing.tracer import get_tracer class TracedAsyncSession(aiohttp.ClientSession): @@ -23,10 +22,7 @@ class TracedAsyncSession(aiohttp.ClientSession): with tracer.start_as_current_span(span_name, kind="CLIENT") as span: span.set_attribute("http.method", method) span.set_attribute("http.url", url_text) - try: - tracer.inject_traceparent(headers) - except Exception: - pass + tracer.inject_outbound_headers(headers) try: response = await super()._request(method, str_or_url, *args, **kwargs) except Exception as exc: @@ -41,3 +37,4 @@ def create_traced_async_session(**kwargs: Any) -> TracedAsyncSession: return TracedAsyncSession(**kwargs) + diff --git a/cmapi/cmapi_server/traced_session.py b/cmapi/tracing/traced_session.py similarity index 69% rename from cmapi/cmapi_server/traced_session.py rename to cmapi/tracing/traced_session.py index 120e2379e..823023a26 100644 --- a/cmapi/cmapi_server/traced_session.py +++ b/cmapi/tracing/traced_session.py @@ -1,19 +1,12 @@ -"""Our own customized requests.Session that automatically traces outbound HTTP calls. - -Creates a CLIENT span per outbound HTTP request, injects traceparent, -records method/url/status, and closes the span when the request finishes. -""" - +"""Customized requests.Session that automatically traces outbound HTTP calls.""" from typing import Any, Optional import requests -from cmapi_server.tracer import get_tracer +from tracing.tracer import get_tracer class TracedSession(requests.Session): - """requests.Session that automatically traces outbound HTTP calls.""" - def request(self, method: str, url: str, *args: Any, **kwargs: Any) -> requests.Response: tracer = get_tracer() @@ -27,14 +20,13 @@ class TracedSession(requests.Session): span.set_attribute("http.method", method) span.set_attribute("http.url", url) - tracer.inject_traceparent(headers) + tracer.inject_outbound_headers(headers) try: response = super().request(method, url, *args, **kwargs) except Exception as exc: span.set_status("ERROR", str(exc)) raise else: - # Record status code span.set_attribute("http.status_code", response.status_code) return response @@ -43,10 +35,10 @@ _default_session: Optional[TracedSession] = None def get_traced_session() -> TracedSession: - """Return a process-wide TracedSession singleton.""" global _default_session if _default_session is None: _default_session = TracedSession() return _default_session + diff --git a/cmapi/tracing/traceparent_backend.py b/cmapi/tracing/traceparent_backend.py new file mode 100644 index 000000000..89bda3ac4 --- /dev/null +++ b/cmapi/tracing/traceparent_backend.py @@ -0,0 +1,38 @@ +import logging +import time +from typing import Any, Dict, Optional + +from tracing.tracer import TracerBackend, TraceSpan +from tracing.utils import swallow_exceptions + +logger = logging.getLogger(__name__) + + +class TraceparentBackend(TracerBackend): + """Default backend that logs span lifecycle and mirrors events/status.""" + + @swallow_exceptions + def on_span_start(self, span: TraceSpan) -> None: + logger.info( + "span_begin name=%s kind=%s trace_id=%s span_id=%s parent=%s attrs=%s", + span.name, span.kind, span.trace_id, span.span_id, + span.parent_span_id, span.attributes, + ) + + @swallow_exceptions + def on_span_end(self, span: TraceSpan, exc: Optional[BaseException]) -> None: + duration_ms = (time.time_ns() - span.start_ns) / 1_000_000 + logger.info( + "span_end name=%s kind=%s trace_id=%s span_id=%s parent=%s duration_ms=%.3f attrs=%s", + span.name, span.kind, span.trace_id, span.span_id, + span.parent_span_id, duration_ms, span.attributes, + ) + + @swallow_exceptions + def on_span_event(self, span: TraceSpan, name: str, attrs: Dict[str, Any]) -> None: + logger.info( + "span_event name=%s trace_id=%s span_id=%s attrs=%s", + name, span.trace_id, span.span_id, attrs, + ) + + diff --git a/cmapi/tracing/tracer.py b/cmapi/tracing/tracer.py new file mode 100644 index 000000000..e73bb3f58 --- /dev/null +++ b/cmapi/tracing/tracer.py @@ -0,0 +1,155 @@ +"""This module implements a tracer facade that creates spans, injects/extracts traceparent headers, +and delegates span lifecycle and enrichment to pluggable backends (e.g., Traceparent and Sentry). +It uses contextvars to store the trace/span/parent_span ids and start time for each context. +""" + +import contextvars +import logging +import time +from collections.abc import Iterator +from contextlib import contextmanager +from typing import Any, Dict, List, Optional, Tuple + +from tracing.backend import TracerBackend +from tracing.span import TraceSpan +from tracing.utils import ( + rand_16_hex, + rand_8_hex, + format_traceparent, + parse_traceparent, +) + +logger = logging.getLogger(__name__) + + +# Context vars are something like thread-local storage, they are context-local variables +_current_trace_id = contextvars.ContextVar[str]("trace_id", default="") +_current_span_id = contextvars.ContextVar[str]("span_id", default="") +_current_parent_span_id = contextvars.ContextVar[str]("parent_span_id", default="") +_current_span_start_ns = contextvars.ContextVar[int]("span_start_ns", default=0) + + +class Tracer: + def __init__(self) -> None: + self._backends: List[TracerBackend] = [] + + def register_backend(self, backend: TracerBackend) -> None: + try: + self._backends.append(backend) + logger.info( + "Tracing backend registered: %s", backend.__class__.__name__ + ) + except Exception: + logger.exception("Failed to register tracing backend") + + def clear_backends(self) -> None: + self._backends.clear() + + @contextmanager + def start_as_current_span(self, name: str, kind: str = "INTERNAL") -> Iterator[TraceSpan]: + trace_id = _current_trace_id.get() or rand_16_hex() + parent_span = _current_span_id.get() + new_span_id = rand_8_hex() + + tok_tid = _current_trace_id.set(trace_id) + tok_sid = _current_span_id.set(new_span_id) + tok_pid = _current_parent_span_id.set(parent_span) + tok_start = _current_span_start_ns.set(time.time_ns()) + + span = TraceSpan( + name=name, + kind=kind, + start_ns=_current_span_start_ns.get(), + trace_id=trace_id, + span_id=new_span_id, + parent_span_id=parent_span, + attributes={"span.kind": kind, "span.name": name}, + tracer=self, + ) + + caught_exc: Optional[BaseException] = None + try: + for backend in list(self._backends): + backend.on_span_start(span) + yield span + except BaseException as exc: + span.record_exception(exc) + span.set_status("ERROR", str(exc)) + caught_exc = exc + raise + finally: + for backend in list(self._backends): + backend.on_span_end(span, caught_exc) + _current_span_start_ns.reset(tok_start) + _current_parent_span_id.reset(tok_pid) + _current_span_id.reset(tok_sid) + _current_trace_id.reset(tok_tid) + + def set_incoming_context( + self, + trace_id: Optional[str] = None, + parent_span_id: Optional[str] = None, + ) -> None: + if trace_id: + _current_trace_id.set(trace_id) + if parent_span_id: + _current_parent_span_id.set(parent_span_id) + + def current_trace_ids(self) -> Tuple[str, str, str]: + return _current_trace_id.get(), _current_span_id.get(), _current_parent_span_id.get() + + def inject_traceparent(self, headers: Dict[str, str]) -> None: + trace_id, span_id, _ = self.current_trace_ids() + if not trace_id or not span_id: + trace_id = trace_id or rand_16_hex() + span_id = span_id or rand_8_hex() + headers["traceparent"] = format_traceparent(trace_id, span_id) + + def inject_outbound_headers(self, headers: Dict[str, str]) -> None: + self.inject_traceparent(headers) + for backend in list(self._backends): + backend.on_inject_headers(headers) + + def notify_incoming_request(self, headers: Dict[str, str], method: str, path: str) -> None: + for backend in list(self._backends): + backend.on_incoming_request(headers, method, path) + + def notify_request_finished(self, status_code: Optional[int]) -> None: + for backend in list(self._backends): + backend.on_request_finished(status_code) + + def extract_traceparent(self, headers: Dict[str, str]) -> Tuple[str, str]: + raw_traceparent = (headers.get("traceparent") + or headers.get("Traceparent") + or headers.get("TRACEPARENT")) + if not raw_traceparent: + return "", "" + parsed = parse_traceparent(raw_traceparent) + if not parsed: + return "", "" + return parsed[0], parsed[1] + + def _notify_event(self, span: TraceSpan, name: str, attrs: Dict[str, Any]) -> None: + for backend in list(self._backends): + backend.on_span_event(span, name, attrs) + + def _notify_status(self, span: TraceSpan, code: str, description: str) -> None: + for backend in list(self._backends): + backend.on_span_status(span, code, description) + + def _notify_exception(self, span: TraceSpan, exc: BaseException) -> None: + for backend in list(self._backends): + backend.on_span_exception(span, exc) + + def _notify_attribute(self, span: TraceSpan, key: str, value: Any) -> None: + for backend in list(self._backends): + backend.on_span_attribute(span, key, value) + + +_tracer = Tracer() + + +def get_tracer() -> Tracer: + return _tracer + + diff --git a/cmapi/tracing/utils.py b/cmapi/tracing/utils.py new file mode 100644 index 000000000..26e9d4a0c --- /dev/null +++ b/cmapi/tracing/utils.py @@ -0,0 +1,54 @@ +import logging +import os +from functools import wraps +from typing import Optional, Tuple + +logger = logging.getLogger("tracing") + +def swallow_exceptions(method): + """Decorator that logs exceptions and prevents them from propagating up.""" + + @wraps(method) + def _wrapper(*args, **kwargs): + try: + return method(*args, **kwargs) + except Exception: + logger.exception("%s failed", getattr(method, "__qualname__", repr(method))) + return None + + return _wrapper + +def rand_16_hex() -> str: + """Return 16 random bytes as a 32-char hex string (trace_id size).""" + return os.urandom(16).hex() + + +def rand_8_hex() -> str: + """Return 8 random bytes as a 16-char hex string (span_id size).""" + return os.urandom(8).hex() + + +def format_traceparent(trace_id: str, span_id: str, flags: str = "01") -> str: + """Build a W3C traceparent header (version 00).""" + return f"00-{trace_id}-{span_id}-{flags}" + + +def parse_traceparent(header: str) -> Optional[Tuple[str, str, str]]: + """Parse W3C traceparent and return (trace_id, span_id, flags) or None.""" + try: + parts = header.strip().split("-") + if len(parts) != 4 or parts[0] != "00": + logger.error("Invalid traceparent: %s", header) + return None + trace_id, span_id, flags = parts[1], parts[2], parts[3] + if len(trace_id) != 32 or len(span_id) != 16 or len(flags) != 2: + return None + # W3C: all zero trace_id/span_id are invalid + if set(trace_id) == {"0"} or set(span_id) == {"0"}: + return None + return trace_id, span_id, flags + except Exception: + logger.exception("Failed to parse traceparent: %s", header) + return None + +