diff --git a/cmapi/.gitignore b/cmapi/.gitignore index 4b87c115f..6258c213b 100644 --- a/cmapi/.gitignore +++ b/cmapi/.gitignore @@ -90,4 +90,11 @@ buildinfo.txt # Self-signed certificates cmapi_server/self-signed.crt -cmapi_server/self-signed.key \ No newline at end of file +cmapi_server/self-signed.key +# Comes in handy if you build packages locally +pp +mcs +mcs_aws +mcs_gsutil +_CPack_Packages/ +venv/ \ No newline at end of file diff --git a/cmapi/cmapi_server/__main__.py b/cmapi/cmapi_server/__main__.py index e037f4f7f..040cad153 100644 --- a/cmapi/cmapi_server/__main__.py +++ b/cmapi/cmapi_server/__main__.py @@ -24,7 +24,7 @@ from tracing.trace_tool import register_tracing_tools from cmapi_server import helpers from cmapi_server.constants import DEFAULT_MCS_CONF_PATH, CMAPI_CONF_PATH -from cmapi_server.controllers.dispatcher import dispatcher, jsonify_error +from cmapi_server.controllers.dispatcher import dispatcher, jsonify_error, jsonify_404 from cmapi_server.failover_agent import FailoverAgent from cmapi_server.managers.application import AppManager from cmapi_server.managers.process import MCSProcessManager @@ -155,6 +155,7 @@ if __name__ == '__main__': root_config = { "request.dispatch": dispatcher, "error_page.default": jsonify_error, + "error_page.404": jsonify_404, # Enable tracing tools 'tools.trace.on': True, 'tools.trace_end.on': True, diff --git a/cmapi/cmapi_server/cmapi_logger.conf b/cmapi/cmapi_server/cmapi_logger.conf index e0edb52f6..8ef9f510b 100644 --- a/cmapi/cmapi_server/cmapi_logger.conf +++ b/cmapi/cmapi_server/cmapi_logger.conf @@ -3,6 +3,9 @@ "filters": { "add_ip_filter": { "()": "cmapi_server.logging_management.AddIpFilter" + }, + "trace_params_filter": { + "()": "cmapi_server.logging_management.TraceParamsFilter" } }, "formatters": { @@ -17,25 +20,30 @@ "container_sh": { "format" : "`%(asctime)s`: %(message)s", "datefmt": "%a %d %b %Y %I:%M:%S %p %Z" + }, + "json_formatter": { + "()": "cmapi_server.logging_management.JsonFormatter" 
} }, "handlers": { "cmapi_server": { "level": "DEBUG", "class": "logging.StreamHandler", - "filters": ["add_ip_filter"], + "filters": ["add_ip_filter", "trace_params_filter"], "formatter": "cmapi_server", "stream": "ext://sys.stdout" }, "console": { "level": "DEBUG", "class": "logging.StreamHandler", + "filters": ["trace_params_filter"], "formatter": "default", "stream": "ext://sys.stdout" }, "file": { "level": "DEBUG", "class": "logging.handlers.RotatingFileHandler", + "filters": ["trace_params_filter"], "formatter": "default", "filename": "/var/log/mariadb/columnstore/cmapi_server.log", "mode": "a", @@ -52,6 +60,16 @@ "maxBytes": 1024, "backupCount": 3, "encoding": "utf8" + }, + "json_trace": { + "level": "DEBUG", + "class": "logging.handlers.RotatingFileHandler", + "formatter": "json_formatter", + "filename": "/var/log/mariadb/columnstore/cmapi_json_trace.log", + "mode": "a", + "maxBytes": 1048576, + "backupCount": 10, + "encoding": "utf8" } }, "loggers": { @@ -78,6 +96,11 @@ "root": { "handlers": ["console", "file"], "level": "DEBUG" + }, + "json_trace": { + "handlers": ["json_trace"], + "level": "DEBUG", + "propagate": false } }, "disable_existing_loggers": false diff --git a/cmapi/cmapi_server/controllers/dispatcher.py b/cmapi/cmapi_server/controllers/dispatcher.py index ba7c90565..e8ec8830a 100644 --- a/cmapi/cmapi_server/controllers/dispatcher.py +++ b/cmapi/cmapi_server/controllers/dispatcher.py @@ -1,4 +1,5 @@ import json +import logging import cherrypy @@ -13,6 +14,7 @@ from cmapi_server.controllers.s3dataload import S3DataLoadController _version = '0.4.0' dispatcher = cherrypy.dispatch.RoutesDispatcher() +logger = logging.getLogger(__name__) # /_version/status (GET) @@ -280,3 +282,18 @@ def jsonify_error(status, message, traceback, version): \ cherrypy.response.status = status return response_body + + +def jsonify_404(status, message, traceback, version): + # pylint: disable=unused-argument + """Specialized renderer for 404 Not Found that logs context, 
then renders JSON. + """ + try: + req = cherrypy.request + method = getattr(req, 'method', '') + path = getattr(req, 'path_info', '') or '/' + remote_ip = getattr(getattr(req, 'remote', None), 'ip', '') or '?' + logger.error("404 Not Found: %s %s from %s", method, path, remote_ip) + except Exception: + pass + return jsonify_error(status, message, traceback, version) diff --git a/cmapi/cmapi_server/logging_management.py b/cmapi/cmapi_server/logging_management.py index 8406bfb1f..0d78b57b9 100644 --- a/cmapi/cmapi_server/logging_management.py +++ b/cmapi/cmapi_server/logging_management.py @@ -17,27 +17,23 @@ class AddIpFilter(logging.Filter): return True -def install_trace_record_factory() -> None: - """Install a LogRecord factory that adds 'trace_params' field. - 'trace_params' will be an empty string if there is no active trace/span - (like in MainThread, where there is no incoming requests). - Otherwise it will contain trace parameters. - """ - current_factory = logging.getLogRecordFactory() +class TraceParamsFilter(logging.Filter): + """Filter that adds trace_params to log records, except for the 'tracer' logger.""" + def filter(self, record: logging.LogRecord) -> bool: + # Don't print trace params for tracer logger family; it already prints trace data + if record.name == 'tracer' or record.name.startswith('tracer.'): + record.trace_params = "" + return True - def factory(*args, **kwargs): # type: ignore[no-untyped-def] - record = current_factory(*args, **kwargs) trace_id, span_id, parent_span_id = get_tracer().current_trace_ids() if trace_id and span_id: - record.trace_params = f'rid={trace_id} sid={span_id}' + trace_params = f"rid={trace_id} sid={span_id}" if parent_span_id: - record.trace_params += f' psid={parent_span_id}' + trace_params += f" psid={parent_span_id}" + record.trace_params = trace_params else: record.trace_params = "" - return record - - logging.setLogRecordFactory(factory) - + return True def custom_cherrypy_error( self, msg='', context='', 
severity=logging.INFO, traceback=False @@ -142,8 +138,7 @@ def config_cmapi_server_logging(): cherrypy._cplogging.LogManager.access_log_format = ( '{h} ACCESS "{r}" code {s}, bytes {b}, user-agent "{a}"' ) - # Ensure trace_params is available on every record - install_trace_record_factory() + # trace_params are populated via TraceParamsFilter configured in logging config dict_config(CMAPI_LOG_CONF_PATH) disable_unwanted_loggers() @@ -164,3 +159,22 @@ def change_loggers_level(level: str): def disable_unwanted_loggers(): logging.getLogger("urllib3").setLevel(logging.WARNING) + + +class JsonFormatter(logging.Formatter): + # Standard LogRecord fields + skip_fields = set(logging.LogRecord('',0,'',0,'',(),None).__dict__.keys()) + + def format(self, record): + data = { + "ts": self.formatTime(record, self.datefmt), + "level": record.levelname, + "logger": record.name, + "msg": record.getMessage(), + } + # Extract extras from the record (all attributes except standard LogRecord fields) + for k, v in record.__dict__.items(): + if k not in self.skip_fields: + data[k] = v + # Allow non-serializable extras (e.g., bytes, datetime) to be stringified + return json.dumps(data, ensure_ascii=False, sort_keys=True, default=str) \ No newline at end of file diff --git a/cmapi/mcs_node_control/models/node_config.py b/cmapi/mcs_node_control/models/node_config.py index e83f1e541..0068c35cf 100644 --- a/cmapi/mcs_node_control/models/node_config.py +++ b/cmapi/mcs_node_control/models/node_config.py @@ -7,6 +7,7 @@ import socket from os import mkdir, replace, chown from pathlib import Path from shutil import copyfile +from typing import Iterator from xml.dom import minidom # to pick up pretty printing functionality from lxml import etree @@ -366,12 +367,10 @@ class NodeConfig: return False - def get_network_addresses(self): + def get_network_addresses(self) -> Iterator[str]: """Retrievs the list of the network addresses Generator that yields network interface addresses. 
- - :rtype: str """ for ni in get_network_interfaces(): for fam in [socket.AF_INET, socket.AF_INET6]: diff --git a/cmapi/scripts/trace_to_plantuml.py b/cmapi/scripts/trace_to_plantuml.py new file mode 100644 index 000000000..c0146c41d --- /dev/null +++ b/cmapi/scripts/trace_to_plantuml.py @@ -0,0 +1,773 @@ +#!/usr/bin/env python3 +""" +Read CMAPI tracing JSONL logs (JsonFormatter) from one or more nodes and render a +PlantUML sequence diagram showing inbound (SERVER) and outbound (CLIENT) requests +between nodes. + +Inputs + - One or more JSONL log files + - Each input log file is assumed to be from a single node. + +Output + - PlantUML written to the specified output file. + +Parsing and correlation + - Detects span_begin/span_end events from JSONL (msg in {"span_begin","span_end"}). + - Per file, reads the "my_addresses" record and takes the first non-loopback IP (preferring IPv4) + as the node's local IP; all spans from that file carry this local IP. + - Merge spans across files by (trace_id, span_id). Duration and status_code are added when present. + +Normalization and aliasing + - Localhost remap: any 127.0.0.1/::1 seen in that file is replaced with the file's local IP. + - Remote host normalization: hostnames with embedded IPs (dashed or dotted) are converted to dotted IPs. + - Aliasing: if an IP is provided via --alias IP=NAME, the diagram uses NAME for that participant. + +Diagram semantics + - SERVER spans: " --> METHOD /path" render as remote -> local (local is the file's node). + - CLIENT spans: "HTTP METHOD https://host/path" render as local -> remote. + - CLIENT spans prefer request_duration_ms; otherwise duration_ms is shown if available. + - SERVER spans include [status_code] when available. + +CLI options + - --filter-trace TRACE_ID Only include the specified trace_id. + - --alias IP=NAME Map an IP to a friendly name (may be specified multiple times). + - --list-ips Print the set of encountered IPs and exit. 
+ +Usage examples + python3 cmapi/scripts/trace_to_plantuml.py json_trace_mcs1 json_trace_mcs2 json_trace_mcs3 --output diagram.puml + python3 cmapi/scripts/trace_to_plantuml.py json_trace_mcs1 json_trace_mcs2 json_trace_mcs3 --output diagram.puml + --filter-trace 1f3b2cb2... --alias 172.31.41.127=mcs2 --alias 172.31.45.34=mcs3 +""" +from __future__ import annotations + +import argparse +import colorsys +import dataclasses +import datetime as dt +import hashlib +import ipaddress +import json +import logging +import os +import re +import sys +from typing import Dict, List, Optional, Tuple +from urllib.parse import urlparse + +JSON_TS_FORMATS = [ + "%Y-%m-%d %H:%M:%S,%f", # default logging formatter + "%Y-%m-%d %H:%M:%S", # without millis +] + +# Minimum processing time to show activation lifeline (in milliseconds) +ACTIVATION_MIN_MS = 1000 + +# dashed IP fragment pattern; will convert '172-31-45-34' -> '172.31.45.34' +DASHED_IP_RE = re.compile(r"(?P<dashed>\b\d{1,3}(?:-\d{1,3}){3}\b)") + +logger = logging.getLogger("trace_viz") + +def _normalize_cmapi_path(path: str) -> str: + """Remove common CMAPI prefixes like '/cmapi/0.4.0' and '/cmapi/'. + """ + if not path: + return "/" + # normalize double slashes + while '//' in path: + path = path.replace('//', '/') + prefixes = [ + '/cmapi/0.4.0', + '/cmapi/', + ] + for pref in prefixes: + if path.startswith(pref): + path = path[len(pref):] + break + if not path.startswith('/'): + path = '/' + path + return path if path != '//' else '/' + +def _hex_color_for_trace(trace_id: str, depth: int) -> str: + """Derive a stable color from trace_id and vary by depth. + + We map trace_id hash to a base hue and then shift lightness slightly per depth. + Returns a hex color string like '#a1b2c3'. 
+ """ + h = hashlib.sha1(trace_id.encode('utf-8')).digest() + # base hue from first byte, saturation fixed, lightness varies by depth + base_hue = h[0] / 255.0 # 0..1 + sat = 0.55 + base_lightness = 0.55 + # depth shift: +/- 0.06 per level alternating + shift = ((depth % 6) - 3) * 0.02 # -0.06..+0.06 + lightness = max(0.35, min(0.75, base_lightness + shift)) + # convert HSL to RGB + r, g, b = colorsys.hls_to_rgb(base_hue, lightness, sat) + return f"#{int(r*255):02x}{int(g*255):02x}{int(b*255):02x}" + + +def _colorized_arrow(arrow: str, color_hex: str) -> str: + """Insert a PlantUML color into an arrow token. + + Examples: + '->' -> '-[#hex]->' + '-->' -> '-[#hex]-->' + '->>' -> '-[#hex]->>' + '-->>' -> '-[#hex]-->>' + """ + if not color_hex: + return arrow + if arrow.startswith('--'): + return f"-[{color_hex}]{arrow}" + return f"-[{color_hex}]{arrow}" + + +def _fmt_secs(ms: float) -> str: + """Format milliseconds as seconds with 2 decimal places.""" + return f"{(ms or 0.0)/1000.0:.2f}s" + +@dataclasses.dataclass +class Span: + trace_id: str + span_id: str + parent_span_id: Optional[str] + kind: str # SERVER or CLIENT + name: str + start_ts: dt.datetime + duration_ms: Optional[float] = None + status_code: Optional[int] = None + request_duration_ms: Optional[float] = None + local_label: str = "local" + local_ip: Optional[str] = None + local_file: Optional[str] = None # full path to source log file + # Structured attributes + http_method: Optional[str] = None + http_path: Optional[str] = None + http_url: Optional[str] = None + client_ip: Optional[str] = None + request_is_sync: Optional[bool] = None + + +@dataclasses.dataclass +class Message: + ts: dt.datetime + src: str + dst: str + label: str + arrow: str # '->', '->>', '-->', '-->>' + + +@dataclasses.dataclass +class Directive: + ts: dt.datetime + text: str # e.g., 'activate mcs1' or 'deactivate mcs1' + + +def parse_json_ts(ts_str: str) -> Optional[dt.datetime]: + for fmt in JSON_TS_FORMATS: + try: + return 
dt.datetime.strptime(ts_str, fmt) + except Exception: + continue + return None + + +def parse_json_span_event(obj: Dict[str, object]) -> Optional[Tuple[str, Dict[str, object]]]: + """Return (msg, data) if this JSON object is a span_begin/span_end with usable fields.""" + msg = str(obj.get("msg", "")) + if msg not in {"span_begin", "span_end"}: + return None + return msg, obj + + +def local_label_from_path(path: str) -> str: + base = os.path.basename(path) + return base + + +def collect_known_ips_from_span(sp: "Span", known_ips: set[str]) -> None: + """Populate known IPs using structured span attributes only.""" + try: + if sp.kind == "SERVER" and sp.client_ip and is_ip(sp.client_ip): + known_ips.add(sp.client_ip) + elif sp.kind == "CLIENT" and sp.http_url: + try: + host = urlparse(sp.http_url).hostname + except Exception: + host = None + if host and is_ip(host): + known_ips.add(host) + except Exception: + pass + + +def normalize_participant(name: str) -> str: + # PlantUML participant names cannot contain spaces or some punctuation unless quoted. + # We'll use an alias form: participant "label" as alias, where alias is alnum/underscore. + # This function returns a safe alias; the label is kept original elsewhere. + alias = re.sub(r"[^A-Za-z0-9_]", "_", name) + # Avoid alias starting with a digit + if alias and alias[0].isdigit(): + alias = f"n_{alias}" + return alias or "unknown" + + +# Legacy name-parsing helpers removed; rely on structured attributes. 
+ + +def is_ip(value: str) -> bool: + try: + ipaddress.ip_address(value) + return True + except Exception: + return False + + +def parse_jsonl_logs(log_paths: List[str]) -> Tuple[List[Span], List[str]]: + spans_by_key: Dict[Tuple[str, str], Span] = {} + known_ips: set[str] = set() + for path in log_paths: + local_label = local_label_from_path(path) + # Per-file local IP discovered from a special 'my_addresses' record + file_local_ip: Optional[str] = None + if not os.path.exists(path): + logger.error("Input log file does not exist: %s", path) + raise FileNotFoundError(path) + with open(path, "r", encoding="utf-8", errors="ignore") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + except Exception as exc: + logger.error("Failed to parse JSON line in %s: %s", path, line[:256]) + raise ValueError(f"JSON parse error in {path}: {exc}") + # Detect and store local IPs for this file + if str(obj.get("msg", "")) == "my_addresses": + addrs = obj.get("my_addresses") + if isinstance(addrs, list): + # Pick first non-loopback IPv4 if available, else first non-loopback + candidates = [a for a in addrs if isinstance(a, str) and a not in {"127.0.0.1", "::1"}] + # Prefer IPv4 + ipv4 = [a for a in candidates if re.match(r"^\d{1,3}(?:\.\d{1,3}){3}$", a)] + chosen = ipv4[0] if ipv4 else (candidates[0] if candidates else None) + if chosen: + file_local_ip = chosen + known_ips.add(chosen) + else: + logger.error("No non-loopback addresses found in my_addresses for %s: %s", path, addrs) + raise ValueError(f"No non-loopback addresses found in my_addresses for {path}: {addrs}") + # continue scanning; spans may come later + continue + parsed = parse_json_span_event(obj) + if not parsed: + continue + msg, data = parsed + ts_raw = str(data.get("ts", "")) + ts = parse_json_ts(ts_raw) + if ts is None: + logger.error("Unparseable timestamp '%s' in %s; defaulting to epoch", ts_raw, path) + ts = dt.datetime.fromtimestamp(0) + # Fields from 
flattened span dict + name = str(data.get("span_name", data.get("name", ""))) + kind = str(data.get("kind", "")).upper() + tid = str(data.get("trace_id", "")) + sid = str(data.get("span_id", "")) + psid = data.get("parent_span_id") + key = (tid, sid) + + if msg == "span_begin": + if not tid or not sid or not name or not kind: + logger.error( + "Malformed span_begin in %s (missing fields): tid=%s sid=%s name=%s kind=%s", + path, tid, sid, name, kind, + ) + raise ValueError(f"Malformed span_begin in {path} (missing fields): tid={tid} sid={sid} name={name} kind={kind}") + if kind not in {"SERVER", "CLIENT"}: + logger.error( + "Unexpected span kind in span_begin: kind=%s tid=%s sid=%s name=%s (file=%s)", + kind, tid, sid, name, path, + ) + raise ValueError(f"Unexpected span kind in span_begin: kind={kind} tid={tid} sid={sid} name={name} (file={path})") + sp = Span( + trace_id=tid, + span_id=sid, + parent_span_id=str(psid) if psid else None, + kind=kind, + name=name, + start_ts=ts, + local_label=local_label, + local_ip=file_local_ip, + local_file=path, + ) + # Capture structured fields when present + sp.http_method = str(data.get("http.method")) if data.get("http.method") is not None else sp.http_method + sp.http_path = str(data.get("http.path")) if data.get("http.path") is not None else sp.http_path + sp.http_url = str(data.get("http.url")) if data.get("http.url") is not None else sp.http_url + sp.client_ip = str(data.get("client.ip")) if data.get("client.ip") is not None else sp.client_ip + # Status code if present + if data.get("http.status_code") is not None: + try: + sp.status_code = int(data.get("http.status_code")) + except Exception: + pass + if data.get("request_is_sync") is not None: + try: + sp.request_is_sync = bool(data.get("request_is_sync")) + except Exception: + pass + spans_by_key[key] = sp + collect_known_ips_from_span(sp, known_ips) + elif msg == "span_end": + sp = spans_by_key.get(key) + if sp is None: + # Create minimal span if begin was not seen in 
provided files + logger.error( + "span_end without matching span_begin (tid=%s sid=%s) in %s; creating synthetic span", + tid, sid, path, + ) + sp = Span( + trace_id=tid, + span_id=sid or f"unknown_{len(spans_by_key)}", + parent_span_id=str(psid) if psid else None, + kind=kind or "SERVER", + name=name, + start_ts=ts, + local_label=local_label, + local_ip=file_local_ip, + local_file=path, + ) + spans_by_key[key] = sp + collect_known_ips_from_span(sp, known_ips) + # Update structured fields from end event (often richer) + try: + if data.get("http.method") is not None: + sp.http_method = str(data.get("http.method")) + if data.get("http.path") is not None: + sp.http_path = str(data.get("http.path")) + if data.get("http.url") is not None: + sp.http_url = str(data.get("http.url")) + if data.get("client.ip") is not None: + sp.client_ip = str(data.get("client.ip")) + if data.get("http.status_code") is not None: + try: + sp.status_code = int(data.get("http.status_code")) + except Exception: + pass + if data.get("request_is_sync") is not None: + sp.request_is_sync = bool(data.get("request_is_sync")) + except Exception: + pass + # Duration may be present as duration_ms attribute in JSON; otherwise leave None + dur = data.get("duration_ms") + try: + sp.duration_ms = float(dur) if dur is not None else sp.duration_ms + if sp.duration_ms is not None and sp.duration_ms < 0: + logger.warning( + "Negative duration_ms for span: tid=%s sid=%s duration_ms=%s", + sp.trace_id, sp.span_id, sp.duration_ms, + ) + except Exception: + pass + # Prefer explicit outbound request duration if present + try: + req_dur = data.get("request_duration_ms") + if req_dur is not None: + sp.request_duration_ms = float(req_dur) + if sp.request_duration_ms < 0: + logger.warning( + "Negative request_duration_ms for CLIENT span: tid=%s sid=%s duration_ms=%s", + sp.trace_id, sp.span_id, sp.request_duration_ms, + ) + except Exception: + pass + # Retroactively assign local_ip to any earlier spans from this file 
that were created + # before the 'my_addresses' record was seen + if file_local_ip: + for sp in spans_by_key.values(): + if sp.local_file == path and sp.local_ip is None: + sp.local_ip = file_local_ip + else: + logger.error("Did not see my_addresses for file %s; local_ip unknown for its spans", path) + # Sort spans by start_ts + spans = sorted(spans_by_key.values(), key=lambda s: s.start_ts) + # Post-parse validation: check unresolved parent_span_id references + by_trace: Dict[str, set[str]] = {} + for sp in spans: + by_trace.setdefault(sp.trace_id, set()).add(sp.span_id) + for sp in spans: + if sp.parent_span_id and sp.parent_span_id not in by_trace.get(sp.trace_id, set()): + logger.error( + "Unresolved parent_span_id: tid=%s sid=%s parent_sid=%s (no matching span_begin/end in inputs)", + sp.trace_id, sp.span_id, sp.parent_span_id, + ) + return spans, sorted(known_ips) + + +def normalize_hostname_to_ip(host: str, known_ips: List[str], host_map: Dict[str, str]) -> str: + # If host contains a dashed IP that matches a known dotted IP, replace with that IP + # First, use explicit host->ip mappings from the log + if host in host_map: + return host_map[host] + # Prefer explicit matches against our known IPs (dotted or dashed forms) + for ip in known_ips: + dashed_ip = ip.replace('.', '-') + if dashed_ip in host or ip in host: + return ip + # If the hostname includes a dashed-quad anywhere, extract and return only the dotted IP + m = DASHED_IP_RE.search(host) + if m: + dashed = m.group("dashed") + dotted = dashed.replace("-", ".") + return dotted + # If the hostname includes a dotted-quad anywhere, return that IP + m2 = re.search(r"\b(\d{1,3}(?:\.\d{1,3}){3})\b", host) + if m2: + return m2.group(1) + return host + + +def build_plantuml(spans: List[Span], filter_trace: Optional[str], known_ips: List[str], ip_alias_map: Dict[str, str]) -> str: + lines: List[str] = [] + logged_replacements: set[Tuple[str, str]] = set() # (original, replaced) + lines.append("@startuml") + # 
Collect participants from spans + participants: Dict[str, str] = {} # alias -> label + + msgs: List[Message] = [] + directives: List[Directive] = [] + unaliased_ips_logged: set[str] = set() + + # Build a per-file preferred local label from alias map by looking at peers in the same file + file_preferred_label: Dict[str, str] = {} + file_label_counts: Dict[str, Dict[str, int]] = {} + for sp in spans: + fname = sp.local_file or "" + if not fname: + continue + # Consider both SERVER remote and CLIENT host using structured attributes + if sp.kind == "SERVER" and sp.client_ip: + norm = normalize_hostname_to_ip(sp.client_ip, known_ips, {}) + if norm in ip_alias_map: + label = ip_alias_map[norm] + file_label_counts.setdefault(fname, {}).setdefault(label, 0) + file_label_counts[fname][label] += 1 + elif sp.kind == "CLIENT" and sp.http_url: + try: + host = urlparse(sp.http_url).hostname or "" + except Exception: + host = "" + if host: + norm = normalize_hostname_to_ip(host, known_ips, {}) + if norm in ip_alias_map: + label = ip_alias_map[norm] + file_label_counts.setdefault(fname, {}).setdefault(label, 0) + file_label_counts[fname][label] += 1 + for fname, counts in file_label_counts.items(): + best_label = max(counts.items(), key=lambda kv: kv[1])[0] + file_preferred_label[fname] = best_label + + for sp in spans: + if filter_trace and sp.trace_id != filter_trace: + continue + # Compute depth (nesting) via parent chain length within the same trace + depth = 0 + try: + seen = set() + cur = sp.parent_span_id + while cur: + if (sp.trace_id, cur) in seen: + break + seen.add((sp.trace_id, cur)) + depth += 1 + parent = next((p for p in spans if p.trace_id == sp.trace_id and p.span_id == cur), None) + cur = parent.parent_span_id if parent else None + except Exception: + depth = 0 + color = _hex_color_for_trace(sp.trace_id, depth) + trace_tag = f"[{sp.trace_id[:4]}-{sp.span_id[:2]}]" + time_str = sp.start_ts.strftime("%H:%M:%S") + # Prefer explicit outbound request duration for 
CLIENT spans if present + if sp.kind == "CLIENT" and sp.request_duration_ms is not None: + dur_str = f" ({_fmt_secs(sp.request_duration_ms)})" + else: + dur_str = f" ({_fmt_secs(sp.duration_ms)})" if sp.duration_ms is not None else "" + + if sp.kind == "SERVER": + # Structured attributes required: client_ip, http_method, http_path + remote = sp.client_ip + method = sp.http_method + path = sp.http_path + if not (remote and method and path): + logger.warning( + "Skipping SERVER span due to missing structured fields: tid=%s sid=%s name=%s client.ip=%s http.method=%s http.path=%s", + sp.trace_id, sp.span_id, sp.name, sp.client_ip, sp.http_method, sp.http_path, + ) + continue + label = f"{method} {_normalize_cmapi_path(path)}" + # Server status code + code_str = f" [{sp.status_code}]" if sp.status_code is not None else "" + if sp.status_code is None: + logger.warning( + "Missing http.status_code for SERVER span at render time: tid=%s sid=%s method=%s path=%s", + sp.trace_id, sp.span_id, method, path, + ) + # Replace loopback with file's real IP if available + if remote in {"127.0.0.1", "::1"} and sp.local_ip: + orig = remote + remote = sp.local_ip + key = (orig, remote) + if key not in logged_replacements: + file_label = os.path.basename(sp.local_file) if sp.local_file else "?" + logger.info("Loopback remapped (SERVER): %s -> %s [file=%s]", orig, remote, file_label) + logged_replacements.add(key) + # do not filter self traffic + # If remote is an IP, keep as-is; else use hostname + # apply aliasing for IPs + # Log unaliased IPs + if is_ip(remote) and remote not in ip_alias_map and remote not in unaliased_ips_logged: + logger.info("Unaliased IP encountered (SERVER remote): %s. 
Consider providing --alias %s=NAME", remote, remote) + unaliased_ips_logged.add(remote) + remote_label = ip_alias_map.get(remote, remote) + remote_alias = normalize_participant(remote_label) + participants.setdefault(remote_alias, remote_label) + # Prefer local IP alias (e.g., mcs1); otherwise use the local IP string. + if sp.local_ip: + if is_ip(sp.local_ip) and sp.local_ip not in ip_alias_map and sp.local_ip not in unaliased_ips_logged: + logger.info("Unaliased IP encountered (SERVER local): %s. Consider providing --alias %s=NAME", sp.local_ip, sp.local_ip) + unaliased_ips_logged.add(sp.local_ip) + local_label_effective = ip_alias_map.get(sp.local_ip, sp.local_ip) + else: + # As a last resort, use filename label + local_label_effective = sp.local_label + local_alias = normalize_participant(local_label_effective) + participants.setdefault(local_alias, local_label_effective) + arrow = _colorized_arrow('->', color) # SERVER handling is synchronous from remote to local + msgs.append(Message(sp.start_ts, remote_alias, local_alias, f"{trace_tag} [{time_str}] {label}{dur_str}{code_str}", arrow)) + # Add a note for the request with trace tag and start timestamp + directives.append(Directive(sp.start_ts, f"note over {remote_alias}, {local_alias}: {trace_tag} [{time_str}]")) + # Activate server processing lifeline only if duration is known and >= threshold + if sp.duration_ms is not None and sp.duration_ms >= ACTIVATION_MIN_MS: + directives.append(Directive(sp.start_ts, f"activate {local_alias} {color}")) + try: + srv_end = sp.start_ts + dt.timedelta(milliseconds=sp.duration_ms) + directives.append(Directive(srv_end, f"deactivate {local_alias}")) + except Exception: + pass + elif sp.duration_ms is None: + # Log that we couldn't determine server processing duration + logger.warning( + "No duration for SERVER span: tid=%s sid=%s method=%s path=%s local=%s", + sp.trace_id, sp.span_id, method, path, local_label_effective, + ) + elif sp.kind == "CLIENT": + # Structured 
attributes required: http_url, http_method + if not (sp.http_url and sp.http_method): + logger.warning( + "Skipping CLIENT span due to missing structured fields: tid=%s sid=%s name=%s http.url=%s http.method=%s", + sp.trace_id, sp.span_id, sp.name, sp.http_url, sp.http_method, + ) + continue + # Extract host and path from URL + try: + p = urlparse(sp.http_url) + host = p.hostname or sp.http_url + path = p.path or "/" + except Exception: + logger.warning("Failed to parse URL for CLIENT span (tid=%s sid=%s): %s", sp.trace_id, sp.span_id, sp.http_url) + host = sp.http_url + path = sp.http_url + label = f"{sp.http_method} {_normalize_cmapi_path(path)}" + # Replace loopback with file's real IP if available + if host in {"127.0.0.1", "::1"} and sp.local_ip: + orig = host + host = sp.local_ip + key = (orig, host) + if key not in logged_replacements: + file_label = os.path.basename(sp.local_file) if sp.local_file else "?" + logger.info("Loopback remapped (CLIENT): %s -> %s [file=%s]", orig, host, file_label) + logged_replacements.add(key) + # Normalize hostnames that embed an IP in dashed form to dotted IP when recognized + new_host = normalize_hostname_to_ip(host, known_ips, {}) + if new_host != host: + key = (host, new_host) + if key not in logged_replacements: + file_label = os.path.basename(sp.local_file) if sp.local_file else "?" + logger.info("Host normalized: %s -> %s [file=%s]", host, new_host, file_label) + logged_replacements.add(key) + host = new_host + # do not filter self traffic + # If host resolved to IP, allow user alias to apply + if is_ip(host) and host not in ip_alias_map and host not in unaliased_ips_logged: + logger.info("Unaliased IP encountered (CLIENT host): %s. 
Consider providing --alias %s=NAME", host, host) + unaliased_ips_logged.add(host) + remote_label = ip_alias_map.get(host, host) + remote_alias = normalize_participant(remote_label) + participants.setdefault(remote_alias, remote_label) + # Prefer local IP alias (e.g., mcs1); otherwise use the local IP string. + if sp.local_ip: + if is_ip(sp.local_ip) and sp.local_ip not in ip_alias_map and sp.local_ip not in unaliased_ips_logged: + logger.info("Unaliased IP encountered (CLIENT local): %s. Consider providing --alias %s=NAME", sp.local_ip, sp.local_ip) + unaliased_ips_logged.add(sp.local_ip) + local_label_effective = ip_alias_map.get(sp.local_ip, sp.local_ip) + else: + # As a last resort, use filename label + local_label_effective = sp.local_label + local_alias = normalize_participant(local_label_effective) + participants.setdefault(local_alias, local_label_effective) + # Use async arrow for async requests + is_sync = sp.request_is_sync + arrow = _colorized_arrow('->' if (is_sync is True) else '->>' if (is_sync is False) else '->', color) + msgs.append(Message(sp.start_ts, local_alias, remote_alias, f"{trace_tag} [{time_str}] {label}{dur_str}", arrow)) + # Add a response arrow back at end time if we know duration + end_ts: Optional[dt.datetime] = None + if sp.request_duration_ms is not None: + try: + end_ts = sp.start_ts + dt.timedelta(milliseconds=sp.request_duration_ms) + except Exception: + end_ts = None + elif sp.duration_ms is not None: + try: + end_ts = sp.start_ts + dt.timedelta(milliseconds=sp.duration_ms) + except Exception: + end_ts = None + if end_ts is not None: + end_time_str = end_ts.strftime("%H:%M:%S") + resp_arrow = '-->' if (is_sync is True or is_sync is None) else '-->>' + # Build details once + dur_val = None + if sp.request_duration_ms is not None: + dur_val = sp.request_duration_ms + elif sp.duration_ms is not None: + dur_val = sp.duration_ms + same_second = (end_time_str == time_str) + # Response label: always include method/path; timestamp 
first + resp_label_parts = ["Response", f"[{end_time_str}]", label] + if sp.status_code is not None: + code_token = str(sp.status_code) + if sp.status_code != 200: + code_token = f"{code_token}" + resp_label_parts.append(f"[{code_token}]") + else: + logger.error( + "Missing status_code for CLIENT response: tid=%s sid=%s url=%s", + sp.trace_id, sp.span_id, sp.http_url, + ) + resp_label = ' '.join(resp_label_parts) + msgs.append(Message(end_ts, remote_alias, local_alias, f"{trace_tag} {resp_label}", _colorized_arrow(resp_arrow, color))) + if same_second: + # Single combined note at the (end) time with tag and duration only + parts = [f"{trace_tag}"] + if dur_val is not None: + parts.append(f"dur={_fmt_secs(dur_val)}") + note_text = f"note over {local_alias}, {remote_alias}: " + ", ".join(parts) + directives.append(Directive(end_ts, note_text)) + else: + # Two separate notes: request note at start, response note at end + directives.append(Directive(sp.start_ts, f"note over {local_alias}, {remote_alias}: {trace_tag} [{time_str}]")) + parts = [f"{trace_tag}"] + if dur_val is not None: + parts.append(f"dur={_fmt_secs(dur_val)}") + note_text = f"note over {remote_alias}, {local_alias}: " + ", ".join(parts) + directives.append(Directive(end_ts, note_text)) + else: + # No end time available; emit only the request note at start + directives.append(Directive(sp.start_ts, f"note over {local_alias}, {remote_alias}: {trace_tag} [{time_str}]")) + logger.info( + "No duration for CLIENT request: tid=%s sid=%s method=%s url=%s", + sp.trace_id, sp.span_id, sp.http_method, sp.http_url, + ) + + # Emit participants sorted by label (case-insensitive) + items = list(participants.items()) # (alias, label) + items.sort(key=lambda x: x[1].lower()) + + def emit_part(alias: str, label: str): + if alias == label and alias.isidentifier(): + lines.append(f"participant {alias}") + else: + lines.append(f"participant \"{label}\" as {alias}") + + for alias, label in items: + emit_part(alias, label) + 
+ # Sort and emit messages and directives merged by time + msgs.sort(key=lambda m: m.ts) + directives.sort(key=lambda d: d.ts) + i = j = 0 + while i < len(msgs) or j < len(directives): + if j >= len(directives) or (i < len(msgs) and msgs[i].ts <= directives[j].ts): + m = msgs[i] + lines.append(f"{m.src} {m.arrow} {m.dst} : {m.label}") + i += 1 + else: + d = directives[j] + lines.append(d.text) + j += 1 + + lines.append("@enduml") + return "\n".join(lines) + "\n" + + +def main(argv: Optional[List[str]] = None) -> int: + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") + ap = argparse.ArgumentParser(description="Convert CMAPI tracing JSONL logs to PlantUML sequence diagram") + ap.add_argument("paths", nargs="+", help="One or more JSONL log files.") + ap.add_argument("--filter-trace", help="Only include spans for the given trace_id (tid)") + ap.add_argument("--alias", action="append", default=[], metavar="IP=NAME", help="Assign a label NAME to IP. May be used multiple times.") + ap.add_argument("--list-ips", action="store_true", help="Print only the set of IP addresses encountered and exit") + ap.add_argument("--output", default="diagram.puml", help="Output PlantUML path (default: diagram.puml)") + args = ap.parse_args(argv) + + if args.list_ips: + logs = args.paths + if not logs: + ap.error("At least one log path is required with --list-ips") + spans, known_ips = parse_jsonl_logs(logs) + else: + # Determine logs and output path + logs = args.paths + output_path = args.output + if not logs: + ap.error("Provide at least one JSONL log path") + spans, known_ips = parse_jsonl_logs(logs) + + # Build alias map from args.alias entries (used by both list-ips and diagram output) + ip_alias_map: Dict[str, str] = {} + for item in args.alias: + if "=" in item: + ip, name = item.split("=", 1) + ip = ip.strip() + name = name.strip() + if ip: + ip_alias_map[ip] = name + else: + logger.error("Invalid alias format: %s", item) + return 2 + + if 
args.list_ips: + to_print = sorted(set(known_ips)) + for ip in to_print: + parts = [ip] + if ip in ip_alias_map: + parts.append(f"alias={ip_alias_map[ip]}") + print(" ".join(parts)) + return 0 + + + puml = build_plantuml(spans, args.filter_trace, known_ips, ip_alias_map) + + if args.list_ips: + # Already handled above, but keep structure consistent + return 0 + + # Write to output file + with open(output_path, "w", encoding="utf-8") as f: + f.write(puml) + logger.info("Diagram written to %s", output_path) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/cmapi/tracing/span.py b/cmapi/tracing/span.py index cc1ac8996..163fd1ebc 100644 --- a/cmapi/tracing/span.py +++ b/cmapi/tracing/span.py @@ -43,4 +43,12 @@ class TraceSpan: def record_exception(self, exc: BaseException) -> None: self.tracer._notify_exception(self, exc) + def to_flat_dict(self) -> Dict[str, Any]: + fd = self.__dict__.copy() + fd['span_name'] = fd.pop('name') # name field is reserved in log records + # Remove non-serializable references + fd.pop('tracer', None) + attributes = fd.pop("attributes") + fd.update(attributes) + return fd diff --git a/cmapi/tracing/trace_tool.py b/cmapi/tracing/trace_tool.py index 5d0c278db..826830d91 100644 --- a/cmapi/tracing/trace_tool.py +++ b/cmapi/tracing/trace_tool.py @@ -8,7 +8,6 @@ import cherrypy from tracing.tracer import get_tracer - def _on_request_start() -> None: req = cherrypy.request tracer = get_tracer() @@ -22,20 +21,27 @@ def _on_request_start() -> None: trace_id, parent_span_id = tracer.extract_traceparent(headers) tracer.set_incoming_context(trace_id, parent_span_id) - span_name = f"{getattr(req, 'method', 'HTTP')} {getattr(req, 'path_info', '/')}" + requester_host = getattr(getattr(req, 'remote', None), 'ip', '') + + method = getattr(req, 'method', 'HTTP') + path = getattr(req, 'path_info', '/') + if requester_host: + span_name = f"{requester_host} --> {method} {path}" + else: + span_name = f"{method} {path}" ctx = 
tracer.start_as_current_span(span_name, kind="SERVER") span = ctx.__enter__() span.set_attribute('http.method', getattr(req, 'method', '')) span.set_attribute('http.path', getattr(req, 'path_info', '')) - span.set_attribute('client.ip', getattr(getattr(req, 'remote', None), 'ip', '')) + span.set_attribute('client.ip', requester_host) span.set_attribute('instance.hostname', socket.gethostname()) safe_headers = {k: v for k, v in headers.items() if k.lower() not in {'authorization', 'x-api-key'}} span.set_attribute('sentry.incoming_headers', safe_headers) req._trace_span_ctx = ctx req._trace_span = span - tracer.inject_traceparent(cherrypy.response.headers) # type: ignore[arg-type] + tracer.inject_traceparent(cherrypy.response.headers) def _on_request_end() -> None: diff --git a/cmapi/tracing/traced_aiohttp.py b/cmapi/tracing/traced_aiohttp.py index d9468e0b1..e71980c39 100644 --- a/cmapi/tracing/traced_aiohttp.py +++ b/cmapi/tracing/traced_aiohttp.py @@ -1,10 +1,17 @@ """Async sibling of TracedSession.""" from typing import Any +import logging +import time import aiohttp from tracing.tracer import get_tracer +# Limit for raw JSON string preview (in characters) +_PREVIEW_MAX_CHARS = 512 + +logger = logging.getLogger("tracer") + class TracedAsyncSession(aiohttp.ClientSession): async def _request( @@ -22,6 +29,7 @@ class TracedAsyncSession(aiohttp.ClientSession): with tracer.start_as_current_span(span_name, kind="CLIENT") as span: span.set_attribute("http.method", method) span.set_attribute("http.url", url_text) + span.set_attribute("request_is_sync", False) tracer.inject_outbound_headers(headers) try: response = await super()._request(method, str_or_url, *args, **kwargs) @@ -30,7 +38,11 @@ class TracedAsyncSession(aiohttp.ClientSession): raise else: span.set_attribute("http.status_code", response.status) + await _record_outbound_json_preview(response, span) return response + finally: + duration_ms = (time.time_ns() - span.start_ns) / 1_000_000.0 + 
span.set_attribute("request_duration_ms", duration_ms) def create_traced_async_session(**kwargs: Any) -> TracedAsyncSession: @@ -38,3 +50,20 @@ def create_traced_async_session(**kwargs: Any) -> TracedAsyncSession: +async def _record_outbound_json_preview(response: aiohttp.ClientResponse, span) -> None: + """If response is JSON, attach small part of it to span + + We don't use streaming in aiohttp, so reading text is safe here. + """ + try: + content_type = str(response.headers.get('Content-Type', '')).lower() + if 'application/json' not in content_type: + return + text = await response.text() + if text is None: + text = "" + span.set_attribute('http.response.body.size', len(text)) + span.set_attribute('http.response.json', text[:_PREVIEW_MAX_CHARS]) + except Exception: + logger.exception("Could not extract JSON response body") + return None diff --git a/cmapi/tracing/traced_session.py b/cmapi/tracing/traced_session.py index 823023a26..49cb1d869 100644 --- a/cmapi/tracing/traced_session.py +++ b/cmapi/tracing/traced_session.py @@ -1,9 +1,16 @@ """Customized requests.Session that automatically traces outbound HTTP calls.""" from typing import Any, Optional +import logging +import time import requests -from tracing.tracer import get_tracer +from tracing.tracer import get_tracer, TraceSpan + +# Limit for raw JSON string preview (in characters) +_PREVIEW_MAX_CHARS = 512 + +logger = logging.getLogger("tracer") class TracedSession(requests.Session): @@ -19,6 +26,7 @@ class TracedSession(requests.Session): with tracer.start_as_current_span(span_name, kind="CLIENT") as span: span.set_attribute("http.method", method) span.set_attribute("http.url", url) + span.set_attribute("request_is_sync", True) tracer.inject_outbound_headers(headers) try: @@ -28,7 +36,11 @@ class TracedSession(requests.Session): raise else: span.set_attribute("http.status_code", response.status_code) + _record_outbound_json_preview(response, span) return response + finally: + duration_ms = (time.time_ns() 
- span.start_ns) / 1_000_000.0 + span.set_attribute("request_duration_ms", duration_ms) _default_session: Optional[TracedSession] = None @@ -41,4 +53,14 @@ def get_traced_session() -> TracedSession: return _default_session - +def _record_outbound_json_preview(response: requests.Response, span: TraceSpan) -> None: + """If response is JSON, attach small part of it to span""" + try: + content_type = str(response.headers.get('Content-Type', '')).lower() + if 'application/json' not in content_type: + return + text = response.text # requests will decode using inferred/declared encoding + span.set_attribute('http.response.body.size', len(text)) + span.set_attribute('http.response.json', text[:_PREVIEW_MAX_CHARS]) + except Exception: + logger.exception("Could not extract JSON response body") diff --git a/cmapi/tracing/traceparent_backend.py b/cmapi/tracing/traceparent_backend.py index e486f9bd3..f5f57fb15 100644 --- a/cmapi/tracing/traceparent_backend.py +++ b/cmapi/tracing/traceparent_backend.py @@ -2,37 +2,63 @@ import logging import time from typing import Any, Dict, Optional +import cherrypy +from mcs_node_control.models.node_config import NodeConfig from tracing.tracer import TracerBackend, TraceSpan from tracing.utils import swallow_exceptions -logger = logging.getLogger("tracing") +logger = logging.getLogger("tracer") +json_logger = logging.getLogger("json_trace") class TraceparentBackend(TracerBackend): """Default backend that logs span lifecycle and mirrors events/status.""" + def __init__(self): + my_addresses = list(NodeConfig().get_network_addresses()) + logger.info("My addresses: %s", my_addresses) + json_logger.info("my_addresses", extra={"my_addresses": my_addresses}) @swallow_exceptions def on_span_start(self, span: TraceSpan) -> None: logger.info( - "span_begin name=%s kind=%s trace_id=%s span_id=%s parent=%s attrs=%s", + "span_begin name='%s' kind=%s tid=%s sid=%s%s", span.name, span.kind, span.trace_id, span.span_id, - span.parent_span_id, 
span.attributes, + f' psid={span.parent_span_id}' if span.parent_span_id else '', ) + json_logger.info("span_begin", extra=span.to_flat_dict()) @swallow_exceptions def on_span_end(self, span: TraceSpan, exc: Optional[BaseException]) -> None: - duration_ms = (time.time_ns() - span.start_ns) / 1_000_000 + end_ns = time.time_ns() + duration_ms = (end_ns - span.start_ns) / 1_000_000 + span.set_attribute("duration_ms", duration_ms) + span.set_attribute("end_ns", end_ns) + + # Try to set status code if not already set + if span.kind == "SERVER" and "http.status_code" not in span.attributes: + try: + status_str = str(cherrypy.response.status) + code = int(status_str.split()[0]) + span.set_attribute("http.status_code", code) + except Exception: + pass + logger.info( - "span_end name=%s kind=%s trace_id=%s span_id=%s parent=%s duration_ms=%.3f attrs=%s", + "span_end name='%s' kind=%s tid=%s sid=%s%s duration_ms=%.3f", span.name, span.kind, span.trace_id, span.span_id, - span.parent_span_id, duration_ms, span.attributes, + f' psid={span.parent_span_id}' if span.parent_span_id else '', + duration_ms, ) + json_logger.info("span_end", extra=span.to_flat_dict()) @swallow_exceptions def on_span_event(self, span: TraceSpan, name: str, attrs: Dict[str, Any]) -> None: logger.info( - "span_event name=%s trace_id=%s span_id=%s attrs=%s", - name, span.trace_id, span.span_id, attrs, + "span_event name='%s' tid=%s sid=%s%s attrs=%s", + name, span.trace_id, span.span_id, + f' psid={span.parent_span_id}' if span.parent_span_id else '', + attrs, ) + json_logger.info("span_event", extra=span.to_flat_dict()) diff --git a/cmapi/tracing/utils.py b/cmapi/tracing/utils.py index 26e9d4a0c..e67e003f0 100644 --- a/cmapi/tracing/utils.py +++ b/cmapi/tracing/utils.py @@ -50,5 +50,3 @@ def parse_traceparent(header: str) -> Optional[Tuple[str, str, str]]: except Exception: logger.exception("Failed to parse traceparent: %s", header) return None - -