
Merge branch 'stable-23.10' into feat/MCOL-6072-parallel-scan-4-CES-4

This commit is contained in:
drrtuy
2025-09-12 10:44:01 +01:00
23 changed files with 1348 additions and 345 deletions

View File

@@ -1,3 +1,11 @@
## DBRM unresponsive timeout
The master DBRM node previously used a fixed 300s (5 minutes) timeout before forcing read-only mode when a worker didn't respond. This is now configurable via Columnstore.xml only:
- SystemConfig/DBRMUnresponsiveTimeout (seconds, default 300)
The value controls how long the master waits for workers to reconfigure/respond after a network error is detected before switching to read-only. See `versioning/BRM/masterdbrmnode.cpp` for details.
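For example, setting `<DBRMUnresponsiveTimeout>600</DBRMUnresponsiveTimeout>` inside the `<SystemConfig>` section raises the wait from the default five minutes to ten.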
This file documents helpful knowledge, commands, and setup instructions for a better development workflow.
## Logging

cmapi/.gitignore vendored
View File

@@ -91,3 +91,10 @@ buildinfo.txt
# Self-signed certificates
cmapi_server/self-signed.crt
cmapi_server/self-signed.key
# Comes in handy if you build packages locally
pp
mcs
mcs_aws
mcs_gsutil
_CPack_Packages/
venv/

View File

@@ -58,6 +58,7 @@
<DBRMRoot>/var/lib/columnstore/data1/systemFiles/dbrm/BRM_saves</DBRMRoot>
<TableLockSaveFile>/var/lib/columnstore/data1/systemFiles/dbrm/tablelocks</TableLockSaveFile>
<DBRMTimeOut>15</DBRMTimeOut> <!-- in seconds -->
<DBRMUnresponsiveTimeout>300</DBRMUnresponsiveTimeout>
<DBRMSnapshotInterval>100000</DBRMSnapshotInterval>
<WaitPeriod>10</WaitPeriod> <!-- in seconds -->
<MemoryCheckPercent>95</MemoryCheckPercent> <!-- Max real memory to limit growth of buffers to -->

View File

@@ -24,7 +24,7 @@ from tracing.trace_tool import register_tracing_tools
from cmapi_server import helpers
from cmapi_server.constants import DEFAULT_MCS_CONF_PATH, CMAPI_CONF_PATH
from cmapi_server.controllers.dispatcher import dispatcher, jsonify_error
from cmapi_server.controllers.dispatcher import dispatcher, jsonify_error, jsonify_404
from cmapi_server.failover_agent import FailoverAgent
from cmapi_server.managers.application import AppManager
from cmapi_server.managers.process import MCSProcessManager
@@ -155,6 +155,7 @@ if __name__ == '__main__':
root_config = {
"request.dispatch": dispatcher,
"error_page.default": jsonify_error,
"error_page.404": jsonify_404,
# Enable tracing tools
'tools.trace.on': True,
'tools.trace_end.on': True,

View File

@@ -3,6 +3,9 @@
"filters": {
"add_ip_filter": {
"()": "cmapi_server.logging_management.AddIpFilter"
},
"trace_params_filter": {
"()": "cmapi_server.logging_management.TraceParamsFilter"
}
},
"formatters": {
@@ -17,25 +20,30 @@
"container_sh": {
"format" : "`%(asctime)s`: %(message)s",
"datefmt": "%a %d %b %Y %I:%M:%S %p %Z"
},
"json_formatter": {
"()": "cmapi_server.logging_management.JsonFormatter"
}
},
"handlers": {
"cmapi_server": {
"level": "DEBUG",
"class": "logging.StreamHandler",
"filters": ["add_ip_filter"],
"filters": ["add_ip_filter", "trace_params_filter"],
"formatter": "cmapi_server",
"stream": "ext://sys.stdout"
},
"console": {
"level": "DEBUG",
"class": "logging.StreamHandler",
"filters": ["trace_params_filter"],
"formatter": "default",
"stream": "ext://sys.stdout"
},
"file": {
"level": "DEBUG",
"class": "logging.handlers.RotatingFileHandler",
"filters": ["trace_params_filter"],
"formatter": "default",
"filename": "/var/log/mariadb/columnstore/cmapi_server.log",
"mode": "a",
@@ -52,6 +60,16 @@
"maxBytes": 1024,
"backupCount": 3,
"encoding": "utf8"
},
"json_trace": {
"level": "DEBUG",
"class": "logging.handlers.RotatingFileHandler",
"formatter": "json_formatter",
"filename": "/var/log/mariadb/columnstore/cmapi_json_trace.log",
"mode": "a",
"maxBytes": 1048576,
"backupCount": 10,
"encoding": "utf8"
}
},
"loggers": {
@@ -78,6 +96,11 @@
"root": {
"handlers": ["console", "file"],
"level": "DEBUG"
},
"json_trace": {
"handlers": ["json_trace"],
"level": "DEBUG",
"propagate": false
}
},
"disable_existing_loggers": false

View File

@@ -1,4 +1,5 @@
import json
import logging
import cherrypy
@@ -13,6 +14,7 @@ from cmapi_server.controllers.s3dataload import S3DataLoadController
_version = '0.4.0'
dispatcher = cherrypy.dispatch.RoutesDispatcher()
logger = logging.getLogger(__name__)
# /_version/status (GET)
@@ -280,3 +282,18 @@ def jsonify_error(status, message, traceback, version): \
cherrypy.response.status = status
return response_body
def jsonify_404(status, message, traceback, version):
# pylint: disable=unused-argument
"""Specialized renderer for 404 Not Found that logs context, then renders JSON.
"""
try:
req = cherrypy.request
method = getattr(req, 'method', '')
path = getattr(req, 'path_info', '') or '/'
remote_ip = getattr(getattr(req, 'remote', None), 'ip', '') or '?'
logger.error("404 Not Found: %s %s from %s", method, path, remote_ip)
except Exception:
pass
return jsonify_error(status, message, traceback, version)

View File

@@ -17,27 +17,23 @@ class AddIpFilter(logging.Filter):
return True
def install_trace_record_factory() -> None:
"""Install a LogRecord factory that adds 'trace_params' field.
'trace_params' will be an empty string if there is no active trace/span
(like in MainThread, where there are no incoming requests).
Otherwise it will contain trace parameters.
"""
current_factory = logging.getLogRecordFactory()
class TraceParamsFilter(logging.Filter):
"""Filter that adds trace_params to log records, except for the 'tracer' logger."""
def filter(self, record: logging.LogRecord) -> bool:
# Don't print trace params for tracer logger family; it already prints trace data
if record.name == 'tracer' or record.name.startswith('tracer.'):
record.trace_params = ""
return True
def factory(*args, **kwargs): # type: ignore[no-untyped-def]
record = current_factory(*args, **kwargs)
trace_id, span_id, parent_span_id = get_tracer().current_trace_ids()
if trace_id and span_id:
record.trace_params = f'rid={trace_id} sid={span_id}'
trace_params = f"rid={trace_id} sid={span_id}"
if parent_span_id:
record.trace_params += f' psid={parent_span_id}'
trace_params += f" psid={parent_span_id}"
record.trace_params = trace_params
else:
record.trace_params = ""
return record
logging.setLogRecordFactory(factory)
return True
def custom_cherrypy_error(
self, msg='', context='', severity=logging.INFO, traceback=False
@@ -142,8 +138,7 @@ def config_cmapi_server_logging():
cherrypy._cplogging.LogManager.access_log_format = (
'{h} ACCESS "{r}" code {s}, bytes {b}, user-agent "{a}"'
)
# Ensure trace_params is available on every record
install_trace_record_factory()
# trace_params are populated via TraceParamsFilter configured in logging config
dict_config(CMAPI_LOG_CONF_PATH)
disable_unwanted_loggers()
@@ -164,3 +159,22 @@ def change_loggers_level(level: str):
def disable_unwanted_loggers():
logging.getLogger("urllib3").setLevel(logging.WARNING)
class JsonFormatter(logging.Formatter):
# Standard LogRecord fields
skip_fields = set(logging.LogRecord('',0,'',0,'',(),None).__dict__.keys())
def format(self, record):
data = {
"ts": self.formatTime(record, self.datefmt),
"level": record.levelname,
"logger": record.name,
"msg": record.getMessage(),
}
# Extract extras from the record (all attributes except standard LogRecord fields)
for k, v in record.__dict__.items():
if k not in self.skip_fields:
data[k] = v
# Allow non-serializable extras (e.g., bytes, datetime) to be stringified
return json.dumps(data, ensure_ascii=False, sort_keys=True, default=str)
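A runnable sketch of the formatter's behavior, assuming only the class above (logger name and extra values are hypothetical): every non-standard LogRecord attribute becomes a top-level JSON key.

```python
import logging
import sys

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JsonFormatter())
log = logging.getLogger("demo")
log.setLevel(logging.DEBUG)
log.addHandler(handler)
log.info("span_end", extra={"trace_id": "ab12", "duration_ms": 3.5})
# -> {"duration_ms": 3.5, "level": "INFO", "logger": "demo",
#     "msg": "span_end", "trace_id": "ab12", "ts": "2025-09-12 10:44:01,123"}
```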

View File

@@ -123,7 +123,7 @@ def add_node(
def remove_node(
node: str, input_config_filename: str = DEFAULT_MCS_CONF_PATH,
output_config_filename: Optional[str] = None,
deactivate_only: bool = True,
deactivate_only: bool = False,
use_rebalance_dbroots: bool = True, **kwargs
):
"""Remove node from a cluster.

View File

@@ -237,6 +237,7 @@
<DBRMRoot>/var/lib/columnstore/data1/systemFiles/dbrm/BRM_saves</DBRMRoot>
<TableLockSaveFile>/var/lib/columnstore/data1/systemFiles/dbrm/tablelocks</TableLockSaveFile>
<DBRMTimeOut>15</DBRMTimeOut> <!-- in seconds -->
<DBRMUnresponsiveTimeout>300</DBRMUnresponsiveTimeout>
<DBRMSnapshotInterval>100000</DBRMSnapshotInterval>
<WaitPeriod>10</WaitPeriod> <!-- in seconds -->
<MemoryCheckPercent>95</MemoryCheckPercent> <!-- Max real memory to limit growth of buffers to -->

View File

@@ -239,6 +239,7 @@
<DBRMRoot>/var/lib/columnstore/data1/systemFiles/dbrm/BRM_saves</DBRMRoot>
<TableLockSaveFile>/var/lib/columnstore/data1/systemFiles/dbrm/tablelocks</TableLockSaveFile>
<DBRMTimeOut>20</DBRMTimeOut> <!-- in seconds -->
<DBRMUnresponsiveTimeout>300</DBRMUnresponsiveTimeout>
<DBRMSnapshotInterval>100000</DBRMSnapshotInterval>
<WaitPeriod>10</WaitPeriod> <!-- in seconds -->
<MemoryCheckPercent>95</MemoryCheckPercent> <!-- Max real memory to limit growth of buffers to -->

View File

@@ -7,6 +7,7 @@ import socket
from os import mkdir, replace, chown
from pathlib import Path
from shutil import copyfile
from typing import Iterator
from xml.dom import minidom # to pick up pretty printing functionality
from lxml import etree
@@ -366,12 +367,10 @@ class NodeConfig:
return False
def get_network_addresses(self):
def get_network_addresses(self) -> Iterator[str]:
"""Retrievs the list of the network addresses
Generator that yields network interface addresses.
:rtype: str
"""
for ni in get_network_interfaces():
for fam in [socket.AF_INET, socket.AF_INET6]:
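A hedged usage sketch for the generator (output depends on the host's interfaces):

```python
from mcs_node_control.models.node_config import NodeConfig

# Yields one address string per interface/family (IPv4 first, then IPv6).
for addr in NodeConfig().get_network_addresses():
    print(addr)
```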

View File

@@ -6,7 +6,8 @@ enterprise_token=""
dev_drone_key=""
ci_user=""
ci_pwd=""
cs_pkg_manager_version="3.10"
cs_pkg_manager_version="3.11"
USE_DEV_PACKAGES=false
if [ ! -f /var/lib/columnstore/local/module ]; then pm="pm1"; else pm=$(cat /var/lib/columnstore/local/module); fi;
pm_number=$(echo "$pm" | tr -dc '0-9')
action=$1
@@ -695,6 +696,7 @@ Flags:
-mp | --maxscale-pwd Maxscale password Default: Mariadb123%
-cu | --cross-engine-user Cross-engine user Default: cross_engine
-cp | --cross-engine-pwd Cross-engine password Default: Mariadb123%
--with-dev Include development client packages (MariaDB-devel on RPM, libmariadb-dev on DEB)
-h | --help Help Text
Example:
@@ -842,6 +844,10 @@ parse_install_cluster_additional_args() {
install|enterprise|community|dev|local)
shift # past argument
;;
--with-dev)
USE_DEV_PACKAGES=true
shift # past argument
;;
-t | --token)
enterprise_token="$2"
shift # past argument
@@ -1819,6 +1825,13 @@ do_community_yum_install() {
exit 1;
fi
# Optionally install dev client packages
if $USE_DEV_PACKAGES; then
if ! yum install MariaDB-devel -y; then
printf "\n[!] Failed to install MariaDB-devel (dev packages) \n\n"
fi
fi
# Install CMAPI
if $CONFIGURE_CMAPI ; then
cmapi_installable=$(yum list | grep MariaDB-columnstore-cmapi)
@@ -1855,6 +1868,13 @@ do_community_apt_install() {
exit 1;
fi;
# Optionally install dev client packages
if $USE_DEV_PACKAGES; then
if ! apt install libmariadb-dev -y --quiet; then
printf "\n[!] Failed to install libmariadb-dev (dev packages) \n\n"
fi
fi
# Install CMAPI
if $CONFIGURE_CMAPI ; then
if ! apt install mariadb-columnstore-cmapi jq -y --quiet ; then
@@ -2171,7 +2191,11 @@ EOF
do_ci_yum_install() {
# list packages
grep_list="MariaDB-backup-*|MariaDB-client-*|MariaDB-columnstore-engine-*|MariaDB-common-*|MariaDB-server-*|MariaDB-shared-*|galera-enterprise-*|cmapi"
if $USE_DEV_PACKAGES; then
grep_list="${grep_list}|MariaDB-devel-*"
fi
rpm_list=$(curl -s -u $ci_user:$ci_pwd $ci_url/$os_package/ | grep -oP '(?<=href=").+?\.rpm' | sed 's/.*\///' | grep -v debug | grep -E "$grep_list")
if [ -z "$rpm_list" ]; then
@@ -2195,7 +2219,11 @@ do_ci_yum_install() {
fi
# Install MariaDB Server
if ! yum install MariaDB-server-* galera-enterprise-* MariaDB-client-* MariaDB-common-* MariaDB-shared-* -y; then
install_list="MariaDB-server-* galera-enterprise-* MariaDB-client-* MariaDB-common-* MariaDB-shared-*"
if $USE_DEV_PACKAGES; then
install_list+=" MariaDB-devel-*"
fi
if ! yum install $install_list -y; then
printf "\n[!] Failed to install MariaDB-server \n\n"
exit 1;
fi
@@ -2249,6 +2277,9 @@ do_ci_apt_install() {
# list packages
echo "++++++++++++++++++++++++++++++++++++++++++++"
grep_list="mariadb-backup-*|mariadb-client-*|mariadb-plugin-columnstore-*|mariadb-common*|mariadb-server-*|mariadb-shared-*|galera-enterprise-*|cmapi|libmariadb3|mysql-common"
if $USE_DEV_PACKAGES; then
grep_list="${grep_list}|libmariadb-dev"
fi
rpm_list=$(curl -s -u $ci_user:$ci_pwd $ci_url/$os_package/ | grep -oP '(?<=href=").+?\.deb' | sed 's/.*\///' | grep -v debug | grep -E "$grep_list")
if [ -z "$rpm_list" ]; then
@@ -2271,7 +2302,12 @@ do_ci_apt_install() {
apt-get clean
DEBIAN_FRONTEND=noninteractive sudo apt install gawk libdbi-perl lsof perl rsync -y --quiet
DEBIAN_FRONTEND=noninteractive sudo apt install libdbi-perl socat libhtml-template-perl -y --quiet
if ! DEBIAN_FRONTEND=noninteractive apt install $(pwd)/mysql-common*.deb $(pwd)/mariadb-server*.deb $(pwd)/galera-enterprise-* $(pwd)/mariadb-common*.deb $(pwd)/mariadb-client-*.deb $(pwd)/libmariadb3*.deb -y --quiet; then
# Optionally include libmariadb-dev if it was downloaded
DEV_DEB=""
if $USE_DEV_PACKAGES; then
DEV_DEB="$(ls $(pwd)/libmariadb-dev*.deb 2>/dev/null || true)"
fi
if ! DEBIAN_FRONTEND=noninteractive apt install $(pwd)/mysql-common*.deb $(pwd)/mariadb-server*.deb $(pwd)/galera-enterprise-* $(pwd)/mariadb-common*.deb $(pwd)/mariadb-client-*.deb $(pwd)/libmariadb3*.deb $DEV_DEB -y --quiet; then
printf "\n[!] Failed to install mariadb-server \n\n"
exit 1;
fi
@@ -3887,6 +3923,10 @@ parse_download_additional_args() {
key="$1"
case $key in
--with-dev)
USE_DEV_PACKAGES=true
shift # past argument
;;
-t | --token)
enterprise_token="$2"
shift # past argument
@@ -4254,6 +4294,14 @@ download_dev() {
echo "------------------------------------------------------------------"
check_dev_build_exists
# Optional dev package includes
DEV_RPM_INCLUDE=""
DEV_DEB_INCLUDE=""
if $USE_DEV_PACKAGES; then
DEV_RPM_INCLUDE='--include "MariaDB-devel-*.rpm"'
DEV_DEB_INCLUDE='--include "libmariadb-dev*.deb" --include "libmariadb-dev-compat*.deb"'
fi
case $distro_info in
centos | rhel | rocky | almalinux )
@@ -4276,6 +4324,7 @@ download_dev() {
--include "MariaDB-columnstore-cmapi-*.rpm" \
--include "MariaDB-columnstore-engine-*.rpm" \
--include "MariaDB-shared-*.rpm" \
$DEV_RPM_INCLUDE \
--include "MariaDB-backup-*.rpm" \
--include "MariaDB-client-*.rpm" \
--include "galera*" \
@@ -4288,6 +4337,7 @@ download_dev() {
--include "MariaDB-columnstore-cmapi-*.rpm" \
--include "MariaDB-columnstore-engine-*.rpm" \
--include "MariaDB-shared-*.rpm" \
$DEV_RPM_INCLUDE \
--include "MariaDB-backup-*.rpm" \
--include "MariaDB-client-*.rpm" \
--include "galera*" \
@@ -4309,30 +4359,33 @@ download_dev() {
aws s3 cp $s3_path/ . --exclude "*" --include "*.deb" --recursive --no-sign-request
fi
if [ "$remove_debug" == true ]; then
aws s3 cp $s3_path/ . --recursive --exclude "*" \
--include "mariadb-server*.deb" \
--include "mariadb-common*.deb" \
--include "mariadb-columnstore-cmapi*.deb" \
--include "mariadb-plugin-columnstore*.deb" \
--include "mysql-common*.deb" \
--include "mariadb-client*.deb" \
--include "libmariadb3_*.deb" \
--include "galera*" \
--include "jemalloc*" \
--exclude "*debug*" --no-sign-request
else
aws s3 cp $s3_path/ . --recursive --exclude "*" \
--include "mariadb-server*.deb" \
--include "mariadb-common*.deb" \
--include "mariadb-columnstore-cmapi*.deb" \
--include "mariadb-plugin-columnstore*.deb" \
--include "mysql-common*.deb" \
--include "mariadb-client*.deb" \
--include "libmariadb3_*.deb" \
--include "galera*" \
--include "jemalloc*" --no-sign-request
AWS_ARGS=("s3" "cp" "$s3_path/" "." "--recursive" "--exclude" "*")
AWS_ARGS+=(
"--include" "mariadb-server*.deb"
"--include" "mariadb-common*.deb"
"--include" "mariadb-columnstore-cmapi*.deb"
"--include" "mariadb-plugin-columnstore*.deb"
"--include" "mysql-common*.deb"
"--include" "mariadb-client*.deb"
"--include" "libmariadb3_*.deb"
"--include" "galera*"
"--include" "jemalloc*"
)
# Optional dev headers
if $USE_DEV_PACKAGES; then
AWS_ARGS+=(
"--include" "libmariadb-dev*.deb"
"--include" "libmariadb-dev-compat*.deb"
)
fi
# Exclude debug if requested
if [ "$remove_debug" == true ]; then
AWS_ARGS+=("--exclude" "*debug*")
fi
# Always add no-sign-request
AWS_ARGS+=("--no-sign-request")
aws "${AWS_ARGS[@]}"
;;
*) # unknown option

View File

@@ -0,0 +1,773 @@
#!/usr/bin/env python3
"""
Read CMAPI tracing JSONL logs (JsonFormatter) from one or more nodes and render a
PlantUML sequence diagram showing inbound (SERVER) and outbound (CLIENT) requests
between nodes.
Inputs
- One or more JSONL log files
- Each input log file is assumed to be from a single node.
Output
- PlantUML written to the specified output file.
Parsing and correlation
- Detects span_begin/span_end events from JSONL (msg in {"span_begin","span_end"}).
- Per file, reads the "my_addresses" record and takes the first non-loopback IP (preferring IPv4)
as the node's local IP; all spans from that file carry this local IP.
- Merge spans across files by (trace_id, span_id). Duration and status_code are added when present.
Normalization and aliasing
- Localhost remap: any 127.0.0.1/::1 seen in that file is replaced with the file's local IP.
- Remote host normalization: hostnames with embedded IPs (dashed or dotted) are converted to dotted IPs.
- Aliasing: if an IP is provided via --alias IP=NAME, the diagram uses NAME for that participant.
Diagram semantics
- SERVER spans: "<remote> --> METHOD /path" render as remote -> local (local is the file's node).
- CLIENT spans: "HTTP METHOD https://host/path" render as local -> remote.
- CLIENT spans prefer request_duration_ms; otherwise duration_ms is shown if available.
- SERVER spans include [status_code] when available.
CLI options
- --filter-trace TRACE_ID Only include the specified trace_id.
- --alias IP=NAME Map an IP to a friendly name (may be specified multiple times).
- --list-ips Print the set of encountered IPs and exit.
Usage examples
python3 cmapi/scripts/trace_to_plantuml.py json_trace_mcs1 json_trace_mcs2 json_trace_mcs3 --output diagram.puml
python3 cmapi/scripts/trace_to_plantuml.py json_trace_mcs1 json_trace_mcs2 json_trace_mcs3 --output diagram.puml \
    --filter-trace 1f3b2cb2... --alias 172.31.41.127=mcs2 --alias 172.31.45.34=mcs3
"""
from __future__ import annotations
import argparse
import colorsys
import dataclasses
import datetime as dt
import hashlib
import ipaddress
import json
import logging
import os
import re
import sys
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlparse
JSON_TS_FORMATS = [
"%Y-%m-%d %H:%M:%S,%f", # default logging formatter
"%Y-%m-%d %H:%M:%S", # without millis
]
# Minimum processing time to show activation lifeline (in milliseconds)
ACTIVATION_MIN_MS = 1000
# dashed IP fragment pattern; will convert '172-31-45-34' -> '172.31.45.34'
DASHED_IP_RE = re.compile(r"(?P<dashed>\b\d{1,3}(?:-\d{1,3}){3}\b)")
logger = logging.getLogger("trace_viz")
def _normalize_cmapi_path(path: str) -> str:
"""Remove common CMAPI prefixes like '/cmapi/0.4.0' and '/cmapi/'.
"""
if not path:
return "/"
# normalize double slashes
while '//' in path:
path = path.replace('//', '/')
prefixes = [
'/cmapi/0.4.0',
'/cmapi/',
]
for pref in prefixes:
if path.startswith(pref):
path = path[len(pref):]
break
if not path.startswith('/'):
path = '/' + path
return path if path != '//' else '/'
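# Worked examples (hedged):
#   _normalize_cmapi_path('/cmapi/0.4.0/cluster/status') -> '/cluster/status'
#   _normalize_cmapi_path('//cmapi//node') -> '/node'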
def _hex_color_for_trace(trace_id: str, depth: int) -> str:
"""Derive a stable color from trace_id and vary by depth.
We map trace_id hash to a base hue and then shift lightness slightly per depth.
Returns a hex color string like '#a1b2c3'.
"""
h = hashlib.sha1(trace_id.encode('utf-8')).digest()
# base hue from first byte, saturation fixed, lightness varies by depth
base_hue = h[0] / 255.0 # 0..1
sat = 0.55
base_lightness = 0.55
    # depth shift: 0.02 steps per level, actual range -0.06..+0.04
    shift = ((depth % 6) - 3) * 0.02
lightness = max(0.35, min(0.75, base_lightness + shift))
# convert HSL to RGB
r, g, b = colorsys.hls_to_rgb(base_hue, lightness, sat)
return f"#{int(r*255):02x}{int(g*255):02x}{int(b*255):02x}"
def _colorized_arrow(arrow: str, color_hex: str) -> str:
"""Insert a PlantUML color into an arrow token.
Examples:
'->' -> '-[#hex]->'
'-->' -> '-[#hex]-->'
'->>' -> '-[#hex]->>'
'-->>' -> '-[#hex]-->>'
"""
if not color_hex:
return arrow
    return f"-[{color_hex}]{arrow}"
def _fmt_secs(ms: float) -> str:
"""Format milliseconds as seconds with 2 decimal places."""
return f"{(ms or 0.0)/1000.0:.2f}s"
@dataclasses.dataclass
class Span:
trace_id: str
span_id: str
parent_span_id: Optional[str]
kind: str # SERVER or CLIENT
name: str
start_ts: dt.datetime
duration_ms: Optional[float] = None
status_code: Optional[int] = None
request_duration_ms: Optional[float] = None
local_label: str = "local"
local_ip: Optional[str] = None
local_file: Optional[str] = None # full path to source log file
# Structured attributes
http_method: Optional[str] = None
http_path: Optional[str] = None
http_url: Optional[str] = None
client_ip: Optional[str] = None
request_is_sync: Optional[bool] = None
@dataclasses.dataclass
class Message:
ts: dt.datetime
src: str
dst: str
label: str
arrow: str # '->', '->>', '-->', '-->>'
@dataclasses.dataclass
class Directive:
ts: dt.datetime
text: str # e.g., 'activate mcs1' or 'deactivate mcs1'
def parse_json_ts(ts_str: str) -> Optional[dt.datetime]:
for fmt in JSON_TS_FORMATS:
try:
return dt.datetime.strptime(ts_str, fmt)
except Exception:
continue
return None
def parse_json_span_event(obj: Dict[str, object]) -> Optional[Tuple[str, Dict[str, object]]]:
"""Return (msg, data) if this JSON object is a span_begin/span_end with usable fields."""
msg = str(obj.get("msg", ""))
if msg not in {"span_begin", "span_end"}:
return None
return msg, obj
def local_label_from_path(path: str) -> str:
base = os.path.basename(path)
return base
def collect_known_ips_from_span(sp: "Span", known_ips: set[str]) -> None:
"""Populate known IPs using structured span attributes only."""
try:
if sp.kind == "SERVER" and sp.client_ip and is_ip(sp.client_ip):
known_ips.add(sp.client_ip)
elif sp.kind == "CLIENT" and sp.http_url:
try:
host = urlparse(sp.http_url).hostname
except Exception:
host = None
if host and is_ip(host):
known_ips.add(host)
except Exception:
pass
def normalize_participant(name: str) -> str:
# PlantUML participant names cannot contain spaces or some punctuation unless quoted.
# We'll use an alias form: participant "label" as alias, where alias is alnum/underscore.
# This function returns a safe alias; the label is kept original elsewhere.
alias = re.sub(r"[^A-Za-z0-9_]", "_", name)
# Avoid alias starting with a digit
if alias and alias[0].isdigit():
alias = f"n_{alias}"
return alias or "unknown"
# Legacy name-parsing helpers removed; rely on structured attributes.
def is_ip(value: str) -> bool:
try:
ipaddress.ip_address(value)
return True
except Exception:
return False
def parse_jsonl_logs(log_paths: List[str]) -> Tuple[List[Span], List[str]]:
spans_by_key: Dict[Tuple[str, str], Span] = {}
known_ips: set[str] = set()
for path in log_paths:
local_label = local_label_from_path(path)
# Per-file local IP discovered from a special 'my_addresses' record
file_local_ip: Optional[str] = None
if not os.path.exists(path):
logger.error("Input log file does not exist: %s", path)
raise FileNotFoundError(path)
with open(path, "r", encoding="utf-8", errors="ignore") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
except Exception as exc:
logger.error("Failed to parse JSON line in %s: %s", path, line[:256])
raise ValueError(f"JSON parse error in {path}: {exc}")
# Detect and store local IPs for this file
if str(obj.get("msg", "")) == "my_addresses":
addrs = obj.get("my_addresses")
if isinstance(addrs, list):
# Pick first non-loopback IPv4 if available, else first non-loopback
candidates = [a for a in addrs if isinstance(a, str) and a not in {"127.0.0.1", "::1"}]
# Prefer IPv4
ipv4 = [a for a in candidates if re.match(r"^\d{1,3}(?:\.\d{1,3}){3}$", a)]
chosen = ipv4[0] if ipv4 else (candidates[0] if candidates else None)
if chosen:
file_local_ip = chosen
known_ips.add(chosen)
else:
logger.error("No non-loopback addresses found in my_addresses for %s: %s", path, addrs)
raise ValueError(f"No non-loopback addresses found in my_addresses for {path}: {addrs}")
# continue scanning; spans may come later
continue
parsed = parse_json_span_event(obj)
if not parsed:
continue
msg, data = parsed
ts_raw = str(data.get("ts", ""))
ts = parse_json_ts(ts_raw)
if ts is None:
logger.error("Unparseable timestamp '%s' in %s; defaulting to epoch", ts_raw, path)
ts = dt.datetime.fromtimestamp(0)
# Fields from flattened span dict
name = str(data.get("span_name", data.get("name", "")))
kind = str(data.get("kind", "")).upper()
tid = str(data.get("trace_id", ""))
sid = str(data.get("span_id", ""))
psid = data.get("parent_span_id")
key = (tid, sid)
if msg == "span_begin":
if not tid or not sid or not name or not kind:
logger.error(
"Malformed span_begin in %s (missing fields): tid=%s sid=%s name=%s kind=%s",
path, tid, sid, name, kind,
)
raise ValueError(f"Malformed span_begin in {path} (missing fields): tid={tid} sid={sid} name={name} kind={kind}")
if kind not in {"SERVER", "CLIENT"}:
logger.error(
"Unexpected span kind in span_begin: kind=%s tid=%s sid=%s name=%s (file=%s)",
kind, tid, sid, name, path,
)
raise ValueError(f"Unexpected span kind in span_begin: kind={kind} tid={tid} sid={sid} name={name} (file={path})")
sp = Span(
trace_id=tid,
span_id=sid,
parent_span_id=str(psid) if psid else None,
kind=kind,
name=name,
start_ts=ts,
local_label=local_label,
local_ip=file_local_ip,
local_file=path,
)
# Capture structured fields when present
sp.http_method = str(data.get("http.method")) if data.get("http.method") is not None else sp.http_method
sp.http_path = str(data.get("http.path")) if data.get("http.path") is not None else sp.http_path
sp.http_url = str(data.get("http.url")) if data.get("http.url") is not None else sp.http_url
sp.client_ip = str(data.get("client.ip")) if data.get("client.ip") is not None else sp.client_ip
# Status code if present
if data.get("http.status_code") is not None:
try:
sp.status_code = int(data.get("http.status_code"))
except Exception:
pass
if data.get("request_is_sync") is not None:
try:
sp.request_is_sync = bool(data.get("request_is_sync"))
except Exception:
pass
spans_by_key[key] = sp
collect_known_ips_from_span(sp, known_ips)
elif msg == "span_end":
sp = spans_by_key.get(key)
if sp is None:
# Create minimal span if begin was not seen in provided files
logger.error(
"span_end without matching span_begin (tid=%s sid=%s) in %s; creating synthetic span",
tid, sid, path,
)
sp = Span(
trace_id=tid,
span_id=sid or f"unknown_{len(spans_by_key)}",
parent_span_id=str(psid) if psid else None,
kind=kind or "SERVER",
name=name,
start_ts=ts,
local_label=local_label,
local_ip=file_local_ip,
local_file=path,
)
spans_by_key[key] = sp
collect_known_ips_from_span(sp, known_ips)
# Update structured fields from end event (often richer)
try:
if data.get("http.method") is not None:
sp.http_method = str(data.get("http.method"))
if data.get("http.path") is not None:
sp.http_path = str(data.get("http.path"))
if data.get("http.url") is not None:
sp.http_url = str(data.get("http.url"))
if data.get("client.ip") is not None:
sp.client_ip = str(data.get("client.ip"))
if data.get("http.status_code") is not None:
try:
sp.status_code = int(data.get("http.status_code"))
except Exception:
pass
if data.get("request_is_sync") is not None:
sp.request_is_sync = bool(data.get("request_is_sync"))
except Exception:
pass
# Duration may be present as duration_ms attribute in JSON; otherwise leave None
dur = data.get("duration_ms")
try:
sp.duration_ms = float(dur) if dur is not None else sp.duration_ms
if sp.duration_ms is not None and sp.duration_ms < 0:
logger.warning(
"Negative duration_ms for span: tid=%s sid=%s duration_ms=%s",
sp.trace_id, sp.span_id, sp.duration_ms,
)
except Exception:
pass
# Prefer explicit outbound request duration if present
try:
req_dur = data.get("request_duration_ms")
if req_dur is not None:
sp.request_duration_ms = float(req_dur)
if sp.request_duration_ms < 0:
logger.warning(
"Negative request_duration_ms for CLIENT span: tid=%s sid=%s duration_ms=%s",
sp.trace_id, sp.span_id, sp.request_duration_ms,
)
except Exception:
pass
# Retroactively assign local_ip to any earlier spans from this file that were created
# before the 'my_addresses' record was seen
if file_local_ip:
for sp in spans_by_key.values():
if sp.local_file == path and sp.local_ip is None:
sp.local_ip = file_local_ip
else:
logger.error("Did not see my_addresses for file %s; local_ip unknown for its spans", path)
# Sort spans by start_ts
spans = sorted(spans_by_key.values(), key=lambda s: s.start_ts)
# Post-parse validation: check unresolved parent_span_id references
by_trace: Dict[str, set[str]] = {}
for sp in spans:
by_trace.setdefault(sp.trace_id, set()).add(sp.span_id)
for sp in spans:
if sp.parent_span_id and sp.parent_span_id not in by_trace.get(sp.trace_id, set()):
logger.error(
"Unresolved parent_span_id: tid=%s sid=%s parent_sid=%s (no matching span_begin/end in inputs)",
sp.trace_id, sp.span_id, sp.parent_span_id,
)
return spans, sorted(known_ips)
def normalize_hostname_to_ip(host: str, known_ips: List[str], host_map: Dict[str, str]) -> str:
# If host contains a dashed IP that matches a known dotted IP, replace with that IP
# First, use explicit host->ip mappings from the log
if host in host_map:
return host_map[host]
# Prefer explicit matches against our known IPs (dotted or dashed forms)
for ip in known_ips:
dashed_ip = ip.replace('.', '-')
if dashed_ip in host or ip in host:
return ip
# If the hostname includes a dashed-quad anywhere, extract and return only the dotted IP
m = DASHED_IP_RE.search(host)
if m:
dashed = m.group("dashed")
dotted = dashed.replace("-", ".")
return dotted
# If the hostname includes a dotted-quad anywhere, return that IP
m2 = re.search(r"\b(\d{1,3}(?:\.\d{1,3}){3})\b", host)
if m2:
return m2.group(1)
return host
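# Worked example (hedged): a hostname embedding a dashed quad, e.g.
# 'ip-172-31-45-34.eu-west-1.compute.internal', normalizes to '172.31.45.34'
# via the dashed-quad branch, even when known_ips and host_map are empty.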
def build_plantuml(spans: List[Span], filter_trace: Optional[str], known_ips: List[str], ip_alias_map: Dict[str, str]) -> str:
lines: List[str] = []
logged_replacements: set[Tuple[str, str]] = set() # (original, replaced)
lines.append("@startuml")
# Collect participants from spans
participants: Dict[str, str] = {} # alias -> label
msgs: List[Message] = []
directives: List[Directive] = []
unaliased_ips_logged: set[str] = set()
# Build a per-file preferred local label from alias map by looking at peers in the same file
file_preferred_label: Dict[str, str] = {}
file_label_counts: Dict[str, Dict[str, int]] = {}
for sp in spans:
fname = sp.local_file or ""
if not fname:
continue
# Consider both SERVER remote and CLIENT host using structured attributes
if sp.kind == "SERVER" and sp.client_ip:
norm = normalize_hostname_to_ip(sp.client_ip, known_ips, {})
if norm in ip_alias_map:
label = ip_alias_map[norm]
file_label_counts.setdefault(fname, {}).setdefault(label, 0)
file_label_counts[fname][label] += 1
elif sp.kind == "CLIENT" and sp.http_url:
try:
host = urlparse(sp.http_url).hostname or ""
except Exception:
host = ""
if host:
norm = normalize_hostname_to_ip(host, known_ips, {})
if norm in ip_alias_map:
label = ip_alias_map[norm]
file_label_counts.setdefault(fname, {}).setdefault(label, 0)
file_label_counts[fname][label] += 1
for fname, counts in file_label_counts.items():
best_label = max(counts.items(), key=lambda kv: kv[1])[0]
file_preferred_label[fname] = best_label
for sp in spans:
if filter_trace and sp.trace_id != filter_trace:
continue
# Compute depth (nesting) via parent chain length within the same trace
depth = 0
try:
seen = set()
cur = sp.parent_span_id
while cur:
if (sp.trace_id, cur) in seen:
break
seen.add((sp.trace_id, cur))
depth += 1
parent = next((p for p in spans if p.trace_id == sp.trace_id and p.span_id == cur), None)
cur = parent.parent_span_id if parent else None
except Exception:
depth = 0
color = _hex_color_for_trace(sp.trace_id, depth)
trace_tag = f"[{sp.trace_id[:4]}-{sp.span_id[:2]}]"
time_str = sp.start_ts.strftime("%H:%M:%S")
# Prefer explicit outbound request duration for CLIENT spans if present
if sp.kind == "CLIENT" and sp.request_duration_ms is not None:
dur_str = f" ({_fmt_secs(sp.request_duration_ms)})"
else:
dur_str = f" ({_fmt_secs(sp.duration_ms)})" if sp.duration_ms is not None else ""
if sp.kind == "SERVER":
# Structured attributes required: client_ip, http_method, http_path
remote = sp.client_ip
method = sp.http_method
path = sp.http_path
if not (remote and method and path):
logger.warning(
"Skipping SERVER span due to missing structured fields: tid=%s sid=%s name=%s client.ip=%s http.method=%s http.path=%s",
sp.trace_id, sp.span_id, sp.name, sp.client_ip, sp.http_method, sp.http_path,
)
continue
label = f"{method} {_normalize_cmapi_path(path)}"
# Server status code
code_str = f" [{sp.status_code}]" if sp.status_code is not None else ""
if sp.status_code is None:
logger.warning(
"Missing http.status_code for SERVER span at render time: tid=%s sid=%s method=%s path=%s",
sp.trace_id, sp.span_id, method, path,
)
# Replace loopback with file's real IP if available
if remote in {"127.0.0.1", "::1"} and sp.local_ip:
orig = remote
remote = sp.local_ip
key = (orig, remote)
if key not in logged_replacements:
file_label = os.path.basename(sp.local_file) if sp.local_file else "?"
logger.info("Loopback remapped (SERVER): %s -> %s [file=%s]", orig, remote, file_label)
logged_replacements.add(key)
# do not filter self traffic
# If remote is an IP, keep as-is; else use hostname
# apply aliasing for IPs
# Log unaliased IPs
if is_ip(remote) and remote not in ip_alias_map and remote not in unaliased_ips_logged:
logger.info("Unaliased IP encountered (SERVER remote): %s. Consider providing --alias %s=NAME", remote, remote)
unaliased_ips_logged.add(remote)
remote_label = ip_alias_map.get(remote, remote)
remote_alias = normalize_participant(remote_label)
participants.setdefault(remote_alias, remote_label)
# Prefer local IP alias (e.g., mcs1); otherwise use the local IP string.
if sp.local_ip:
if is_ip(sp.local_ip) and sp.local_ip not in ip_alias_map and sp.local_ip not in unaliased_ips_logged:
logger.info("Unaliased IP encountered (SERVER local): %s. Consider providing --alias %s=NAME", sp.local_ip, sp.local_ip)
unaliased_ips_logged.add(sp.local_ip)
local_label_effective = ip_alias_map.get(sp.local_ip, sp.local_ip)
else:
# As a last resort, use filename label
local_label_effective = sp.local_label
local_alias = normalize_participant(local_label_effective)
participants.setdefault(local_alias, local_label_effective)
arrow = _colorized_arrow('->', color) # SERVER handling is synchronous from remote to local
msgs.append(Message(sp.start_ts, remote_alias, local_alias, f"{trace_tag} [{time_str}] {label}{dur_str}{code_str}", arrow))
# Add a note for the request with trace tag and start timestamp
directives.append(Directive(sp.start_ts, f"note over {remote_alias}, {local_alias}: {trace_tag} [{time_str}]"))
# Activate server processing lifeline only if duration is known and >= threshold
if sp.duration_ms is not None and sp.duration_ms >= ACTIVATION_MIN_MS:
directives.append(Directive(sp.start_ts, f"activate {local_alias} {color}"))
try:
srv_end = sp.start_ts + dt.timedelta(milliseconds=sp.duration_ms)
directives.append(Directive(srv_end, f"deactivate {local_alias}"))
except Exception:
pass
elif sp.duration_ms is None:
# Log that we couldn't determine server processing duration
logger.warning(
"No duration for SERVER span: tid=%s sid=%s method=%s path=%s local=%s",
sp.trace_id, sp.span_id, method, path, local_label_effective,
)
elif sp.kind == "CLIENT":
# Structured attributes required: http_url, http_method
if not (sp.http_url and sp.http_method):
logger.warning(
"Skipping CLIENT span due to missing structured fields: tid=%s sid=%s name=%s http.url=%s http.method=%s",
sp.trace_id, sp.span_id, sp.name, sp.http_url, sp.http_method,
)
continue
# Extract host and path from URL
try:
p = urlparse(sp.http_url)
host = p.hostname or sp.http_url
path = p.path or "/"
except Exception:
logger.warning("Failed to parse URL for CLIENT span (tid=%s sid=%s): %s", sp.trace_id, sp.span_id, sp.http_url)
host = sp.http_url
path = sp.http_url
label = f"{sp.http_method} {_normalize_cmapi_path(path)}"
# Replace loopback with file's real IP if available
if host in {"127.0.0.1", "::1"} and sp.local_ip:
orig = host
host = sp.local_ip
key = (orig, host)
if key not in logged_replacements:
file_label = os.path.basename(sp.local_file) if sp.local_file else "?"
logger.info("Loopback remapped (CLIENT): %s -> %s [file=%s]", orig, host, file_label)
logged_replacements.add(key)
# Normalize hostnames that embed an IP in dashed form to dotted IP when recognized
new_host = normalize_hostname_to_ip(host, known_ips, {})
if new_host != host:
key = (host, new_host)
if key not in logged_replacements:
file_label = os.path.basename(sp.local_file) if sp.local_file else "?"
logger.info("Host normalized: %s -> %s [file=%s]", host, new_host, file_label)
logged_replacements.add(key)
host = new_host
# do not filter self traffic
# If host resolved to IP, allow user alias to apply
if is_ip(host) and host not in ip_alias_map and host not in unaliased_ips_logged:
logger.info("Unaliased IP encountered (CLIENT host): %s. Consider providing --alias %s=NAME", host, host)
unaliased_ips_logged.add(host)
remote_label = ip_alias_map.get(host, host)
remote_alias = normalize_participant(remote_label)
participants.setdefault(remote_alias, remote_label)
# Prefer local IP alias (e.g., mcs1); otherwise use the local IP string.
if sp.local_ip:
if is_ip(sp.local_ip) and sp.local_ip not in ip_alias_map and sp.local_ip not in unaliased_ips_logged:
logger.info("Unaliased IP encountered (CLIENT local): %s. Consider providing --alias %s=NAME", sp.local_ip, sp.local_ip)
unaliased_ips_logged.add(sp.local_ip)
local_label_effective = ip_alias_map.get(sp.local_ip, sp.local_ip)
else:
# As a last resort, use filename label
local_label_effective = sp.local_label
local_alias = normalize_participant(local_label_effective)
participants.setdefault(local_alias, local_label_effective)
# Use async arrow for async requests
is_sync = sp.request_is_sync
arrow = _colorized_arrow('->' if (is_sync is True) else '->>' if (is_sync is False) else '->', color)
msgs.append(Message(sp.start_ts, local_alias, remote_alias, f"{trace_tag} [{time_str}] {label}{dur_str}", arrow))
# Add a response arrow back at end time if we know duration
end_ts: Optional[dt.datetime] = None
if sp.request_duration_ms is not None:
try:
end_ts = sp.start_ts + dt.timedelta(milliseconds=sp.request_duration_ms)
except Exception:
end_ts = None
elif sp.duration_ms is not None:
try:
end_ts = sp.start_ts + dt.timedelta(milliseconds=sp.duration_ms)
except Exception:
end_ts = None
if end_ts is not None:
end_time_str = end_ts.strftime("%H:%M:%S")
resp_arrow = '-->' if (is_sync is True or is_sync is None) else '-->>'
# Build details once
dur_val = None
if sp.request_duration_ms is not None:
dur_val = sp.request_duration_ms
elif sp.duration_ms is not None:
dur_val = sp.duration_ms
same_second = (end_time_str == time_str)
# Response label: always include method/path; timestamp first
resp_label_parts = ["Response", f"[{end_time_str}]", label]
if sp.status_code is not None:
code_token = str(sp.status_code)
if sp.status_code != 200:
code_token = f"<color:red>{code_token}</color>"
resp_label_parts.append(f"[{code_token}]")
else:
logger.error(
"Missing status_code for CLIENT response: tid=%s sid=%s url=%s",
sp.trace_id, sp.span_id, sp.http_url,
)
resp_label = ' '.join(resp_label_parts)
msgs.append(Message(end_ts, remote_alias, local_alias, f"{trace_tag} {resp_label}", _colorized_arrow(resp_arrow, color)))
if same_second:
# Single combined note at the (end) time with tag and duration only
parts = [f"{trace_tag}"]
if dur_val is not None:
parts.append(f"dur={_fmt_secs(dur_val)}")
note_text = f"note over {local_alias}, {remote_alias}: " + ", ".join(parts)
directives.append(Directive(end_ts, note_text))
else:
# Two separate notes: request note at start, response note at end
directives.append(Directive(sp.start_ts, f"note over {local_alias}, {remote_alias}: {trace_tag} [{time_str}]"))
parts = [f"{trace_tag}"]
if dur_val is not None:
parts.append(f"dur={_fmt_secs(dur_val)}")
note_text = f"note over {remote_alias}, {local_alias}: " + ", ".join(parts)
directives.append(Directive(end_ts, note_text))
else:
# No end time available; emit only the request note at start
directives.append(Directive(sp.start_ts, f"note over {local_alias}, {remote_alias}: {trace_tag} [{time_str}]"))
logger.info(
"No duration for CLIENT request: tid=%s sid=%s method=%s url=%s",
sp.trace_id, sp.span_id, sp.http_method, sp.http_url,
)
# Emit participants sorted by label (case-insensitive)
items = list(participants.items()) # (alias, label)
items.sort(key=lambda x: x[1].lower())
def emit_part(alias: str, label: str):
if alias == label and alias.isidentifier():
lines.append(f"participant {alias}")
else:
lines.append(f"participant \"{label}\" as {alias}")
for alias, label in items:
emit_part(alias, label)
# Sort and emit messages and directives merged by time
msgs.sort(key=lambda m: m.ts)
directives.sort(key=lambda d: d.ts)
i = j = 0
while i < len(msgs) or j < len(directives):
if j >= len(directives) or (i < len(msgs) and msgs[i].ts <= directives[j].ts):
m = msgs[i]
lines.append(f"{m.src} {m.arrow} {m.dst} : {m.label}")
i += 1
else:
d = directives[j]
lines.append(d.text)
j += 1
lines.append("@enduml")
return "\n".join(lines) + "\n"
def main(argv: Optional[List[str]] = None) -> int:
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
ap = argparse.ArgumentParser(description="Convert CMAPI tracing JSONL logs to PlantUML sequence diagram")
ap.add_argument("paths", nargs="+", help="One or more JSONL log files.")
ap.add_argument("--filter-trace", help="Only include spans for the given trace_id (tid)")
ap.add_argument("--alias", action="append", default=[], metavar="IP=NAME", help="Assign a label NAME to IP. May be used multiple times.")
ap.add_argument("--list-ips", action="store_true", help="Print only the set of IP addresses encountered and exit")
ap.add_argument("--output", default="diagram.puml", help="Output PlantUML path (default: diagram.puml)")
args = ap.parse_args(argv)
    logs = args.paths
    output_path = args.output
    if not logs:
        ap.error("Provide at least one JSONL log path")
    spans, known_ips = parse_jsonl_logs(logs)
# Build alias map from args.alias entries (used by both list-ips and diagram output)
ip_alias_map: Dict[str, str] = {}
for item in args.alias:
if "=" in item:
ip, name = item.split("=", 1)
ip = ip.strip()
name = name.strip()
if ip:
ip_alias_map[ip] = name
else:
logger.error("Invalid alias format: %s", item)
return 2
if args.list_ips:
to_print = sorted(set(known_ips))
for ip in to_print:
parts = [ip]
if ip in ip_alias_map:
parts.append(f"alias={ip_alias_map[ip]}")
print(" ".join(parts))
return 0
puml = build_plantuml(spans, args.filter_trace, known_ips, ip_alias_map)
# Write to output file
with open(output_path, "w", encoding="utf-8") as f:
f.write(puml)
logger.info("Diagram written to %s", output_path)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -43,4 +43,12 @@ class TraceSpan:
def record_exception(self, exc: BaseException) -> None:
self.tracer._notify_exception(self, exc)
def to_flat_dict(self) -> Dict[str, Any]:
fd = self.__dict__.copy()
fd['span_name'] = fd.pop('name') # name field is reserved in log records
# Remove non-serializable references
fd.pop('tracer', None)
attributes = fd.pop("attributes")
fd.update(attributes)
return fd
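A hedged illustration of the flattening (span field values hypothetical): the reserved `name` is renamed to `span_name`, the tracer backref is dropped, and attributes are merged to the top level.

```python
# Given a span with name='GET /status', kind='SERVER',
# attributes={'http.method': 'GET', 'http.status_code': 200},
# to_flat_dict() returns roughly:
# {'span_name': 'GET /status', 'kind': 'SERVER', 'trace_id': '...',
#  'span_id': '...', 'parent_span_id': None, 'start_ns': 1757670241000000000,
#  'http.method': 'GET', 'http.status_code': 200}
```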

View File

@@ -8,7 +8,6 @@ import cherrypy
from tracing.tracer import get_tracer
def _on_request_start() -> None:
req = cherrypy.request
tracer = get_tracer()
@@ -22,20 +21,27 @@ def _on_request_start() -> None:
trace_id, parent_span_id = tracer.extract_traceparent(headers)
tracer.set_incoming_context(trace_id, parent_span_id)
span_name = f"{getattr(req, 'method', 'HTTP')} {getattr(req, 'path_info', '/')}"
requester_host = getattr(getattr(req, 'remote', None), 'ip', '')
method = getattr(req, 'method', 'HTTP')
path = getattr(req, 'path_info', '/')
if requester_host:
span_name = f"{requester_host} --> {method} {path}"
else:
span_name = f"{method} {path}"
ctx = tracer.start_as_current_span(span_name, kind="SERVER")
span = ctx.__enter__()
span.set_attribute('http.method', getattr(req, 'method', ''))
span.set_attribute('http.path', getattr(req, 'path_info', ''))
span.set_attribute('client.ip', getattr(getattr(req, 'remote', None), 'ip', ''))
span.set_attribute('client.ip', requester_host)
span.set_attribute('instance.hostname', socket.gethostname())
safe_headers = {k: v for k, v in headers.items() if k.lower() not in {'authorization', 'x-api-key'}}
span.set_attribute('sentry.incoming_headers', safe_headers)
req._trace_span_ctx = ctx
req._trace_span = span
tracer.inject_traceparent(cherrypy.response.headers) # type: ignore[arg-type]
tracer.inject_traceparent(cherrypy.response.headers)
def _on_request_end() -> None:

View File

@@ -1,10 +1,17 @@
"""Async sibling of TracedSession."""
from typing import Any
import logging
import time
import aiohttp
from tracing.tracer import get_tracer
# Limit for raw JSON string preview (in characters)
_PREVIEW_MAX_CHARS = 512
logger = logging.getLogger("tracer")
class TracedAsyncSession(aiohttp.ClientSession):
async def _request(
@@ -22,6 +29,7 @@ class TracedAsyncSession(aiohttp.ClientSession):
with tracer.start_as_current_span(span_name, kind="CLIENT") as span:
span.set_attribute("http.method", method)
span.set_attribute("http.url", url_text)
span.set_attribute("request_is_sync", False)
tracer.inject_outbound_headers(headers)
try:
response = await super()._request(method, str_or_url, *args, **kwargs)
@@ -30,7 +38,11 @@ class TracedAsyncSession(aiohttp.ClientSession):
raise
else:
span.set_attribute("http.status_code", response.status)
await _record_outbound_json_preview(response, span)
return response
finally:
duration_ms = (time.time_ns() - span.start_ns) / 1_000_000.0
span.set_attribute("request_duration_ms", duration_ms)
def create_traced_async_session(**kwargs: Any) -> TracedAsyncSession:
@@ -38,3 +50,20 @@ def create_traced_async_session(**kwargs: Any) -> TracedAsyncSession:
async def _record_outbound_json_preview(response: aiohttp.ClientResponse, span) -> None:
"""If response is JSON, attach small part of it to span
We don't use streaming in aiohttp, so reading text is safe here.
"""
try:
content_type = str(response.headers.get('Content-Type', '')).lower()
if 'application/json' not in content_type:
return
text = await response.text()
if text is None:
text = ""
span.set_attribute('http.response.body.size', len(text))
span.set_attribute('http.response.json', text[:_PREVIEW_MAX_CHARS])
except Exception:
logger.exception("Could not extract JSON response body")
return None
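A hedged usage sketch for the async session (URL and host are hypothetical; real CMAPI deployments use self-signed certificates, so verification may need to be relaxed):

```python
import asyncio

async def probe(url: str) -> int:
    # create_traced_async_session() wraps aiohttp.ClientSession; every request
    # runs inside a CLIENT span with request_is_sync=False.
    async with create_traced_async_session() as session:
        resp = await session.get(url)
        return resp.status

print(asyncio.run(probe("https://mcs2:8640/cmapi/0.4.0/status")))
```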

View File

@@ -1,9 +1,16 @@
"""Customized requests.Session that automatically traces outbound HTTP calls."""
from typing import Any, Optional
import logging
import time
import requests
from tracing.tracer import get_tracer
from tracing.tracer import get_tracer, TraceSpan
# Limit for raw JSON string preview (in characters)
_PREVIEW_MAX_CHARS = 512
logger = logging.getLogger("tracer")
class TracedSession(requests.Session):
@@ -19,6 +26,7 @@ class TracedSession(requests.Session):
with tracer.start_as_current_span(span_name, kind="CLIENT") as span:
span.set_attribute("http.method", method)
span.set_attribute("http.url", url)
span.set_attribute("request_is_sync", True)
tracer.inject_outbound_headers(headers)
try:
@@ -28,7 +36,11 @@ class TracedSession(requests.Session):
raise
else:
span.set_attribute("http.status_code", response.status_code)
_record_outbound_json_preview(response, span)
return response
finally:
duration_ms = (time.time_ns() - span.start_ns) / 1_000_000.0
span.set_attribute("request_duration_ms", duration_ms)
_default_session: Optional[TracedSession] = None
@@ -41,4 +53,14 @@ def get_traced_session() -> TracedSession:
return _default_session
def _record_outbound_json_preview(response: requests.Response, span: TraceSpan) -> None:
"""If response is JSON, attach small part of it to span"""
try:
content_type = str(response.headers.get('Content-Type', '')).lower()
if 'application/json' not in content_type:
return
text = response.text # requests will decode using inferred/declared encoding
span.set_attribute('http.response.body.size', len(text))
span.set_attribute('http.response.json', text[:_PREVIEW_MAX_CHARS])
except Exception:
logger.exception("Could not extract JSON response body")
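The synchronous counterpart, as a hedged sketch (endpoint hypothetical):

```python
session = get_traced_session()
resp = session.get("https://mcs2:8640/cmapi/0.4.0/status")
# The surrounding CLIENT span now carries http.method, http.url,
# request_is_sync=True, http.status_code, request_duration_ms,
# and a JSON body preview when the response is JSON.
print(resp.status_code)
```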

View File

@@ -2,37 +2,63 @@ import logging
import time
from typing import Any, Dict, Optional
import cherrypy
from mcs_node_control.models.node_config import NodeConfig
from tracing.tracer import TracerBackend, TraceSpan
from tracing.utils import swallow_exceptions
logger = logging.getLogger("tracing")
logger = logging.getLogger("tracer")
json_logger = logging.getLogger("json_trace")
class TraceparentBackend(TracerBackend):
"""Default backend that logs span lifecycle and mirrors events/status."""
def __init__(self):
my_addresses = list(NodeConfig().get_network_addresses())
logger.info("My addresses: %s", my_addresses)
json_logger.info("my_addresses", extra={"my_addresses": my_addresses})
@swallow_exceptions
def on_span_start(self, span: TraceSpan) -> None:
logger.info(
"span_begin name=%s kind=%s trace_id=%s span_id=%s parent=%s attrs=%s",
"span_begin name='%s' kind=%s tid=%s sid=%s%s",
span.name, span.kind, span.trace_id, span.span_id,
span.parent_span_id, span.attributes,
f' psid={span.parent_span_id}' if span.parent_span_id else '',
)
json_logger.info("span_begin", extra=span.to_flat_dict())
@swallow_exceptions
def on_span_end(self, span: TraceSpan, exc: Optional[BaseException]) -> None:
duration_ms = (time.time_ns() - span.start_ns) / 1_000_000
end_ns = time.time_ns()
duration_ms = (end_ns - span.start_ns) / 1_000_000
span.set_attribute("duration_ms", duration_ms)
span.set_attribute("end_ns", end_ns)
# Try to set status code if not already set
if span.kind == "SERVER" and "http.status_code" not in span.attributes:
try:
status_str = str(cherrypy.response.status)
code = int(status_str.split()[0])
span.set_attribute("http.status_code", code)
except Exception:
pass
logger.info(
"span_end name=%s kind=%s trace_id=%s span_id=%s parent=%s duration_ms=%.3f attrs=%s",
"span_end name='%s' kind=%s tid=%s sid=%s%s duration_ms=%.3f",
span.name, span.kind, span.trace_id, span.span_id,
span.parent_span_id, duration_ms, span.attributes,
f' psid={span.parent_span_id}' if span.parent_span_id else '',
duration_ms,
)
json_logger.info("span_end", extra=span.to_flat_dict())
@swallow_exceptions
def on_span_event(self, span: TraceSpan, name: str, attrs: Dict[str, Any]) -> None:
logger.info(
"span_event name=%s trace_id=%s span_id=%s attrs=%s",
name, span.trace_id, span.span_id, attrs,
"span_event name='%s' tid=%s sid=%s%s attrs=%s",
name, span.trace_id, span.span_id,
f' psid={span.parent_span_id}' if span.parent_span_id else '',
attrs,
)
json_logger.info("span_event", extra=span.to_flat_dict())

View File

@@ -50,5 +50,3 @@ def parse_traceparent(header: str) -> Optional[Tuple[str, str, str]]:
except Exception:
logger.exception("Failed to parse traceparent: %s", header)
return None

View File

@@ -20,6 +20,7 @@ select calgetversion()=mcsgetversion();
select calviewtablelock("t1");
select calcleartablelock(0);
select callastinsertid("t1");
--replace_regex /(Running SQL statements 1)/Running SQL statements 0/
select calgetsqlcount();
DROP TABLE t1;

View File

@@ -429,7 +429,7 @@ EXPORT void log(const std::string& msg, logging::LOG_TYPE = logging::LOG_TYPE_CR
EXPORT void log_errno(const std::string& msg, logging::LOG_TYPE = logging::LOG_TYPE_CRITICAL);
EXPORT void errString(int rc, std::string& errMsg);
const struct timespec FIVE_MIN_TIMEOUT = {300, 0};
// Note: Unresponsive timeouts are now configurable in the master and not defined here.
/* Function identifiers used for master-slave communication.

View File

@@ -107,6 +107,16 @@ MasterDBRMNode::MasterDBRMNode()
MSG_TIMEOUT.tv_sec = secondsToWait;
else
MSG_TIMEOUT.tv_sec = 20;
// Configurable unresponsive timeout (default 300 seconds)
haltTimeout = {300, 0};
std::string unrespStr = config->getConfig("SystemConfig", "DBRMUnresponsiveTimeout");
int unrespSecs = config->fromText(unrespStr);
if (unrespSecs > 0)
{
haltTimeout.tv_sec = unrespSecs;
haltTimeout.tv_nsec = 0;
}
}
MasterDBRMNode::~MasterDBRMNode()
@@ -534,16 +544,16 @@ void MasterDBRMNode::msgProcessor()
retrycmd:
uint32_t haltloops = 0;
while (halting && ++haltloops < static_cast<uint32_t>(FIVE_MIN_TIMEOUT.tv_sec))
while (halting && ++haltloops < static_cast<uint32_t>(haltTimeout.tv_sec))
sleep(1);
slaveLock.lock();
if (haltloops == FIVE_MIN_TIMEOUT.tv_sec)
if (haltloops == static_cast<uint32_t>(haltTimeout.tv_sec))
{
ostringstream os;
os << "A node is unresponsive for cmd = " << (uint32_t)cmd << ", no reconfigure in at least "
<< FIVE_MIN_TIMEOUT.tv_sec << " seconds. Setting read-only mode.";
<< haltTimeout.tv_sec << " seconds. Setting read-only mode.";
log(os.str());
readOnly = true;
halting = false;
@@ -832,7 +842,9 @@ int MasterDBRMNode::gatherResponses(uint8_t cmd, uint32_t cmdMsgLength, vector<B
{
// can't just block for 5 mins
timespec newtimeout = {10, 0};
uint32_t ntRetries = FIVE_MIN_TIMEOUT.tv_sec / newtimeout.tv_sec;
uint32_t ntRetries = (newtimeout.tv_sec > 0) ? (haltTimeout.tv_sec / newtimeout.tv_sec) : 0;
if (ntRetries == 0)
ntRetries = 1;
uint32_t retries = 0;
while (++retries < ntRetries && tmp->length() == 0 && !halting)

View File

@@ -258,6 +258,9 @@ class MasterDBRMNode
volatile bool die, halting;
bool reloadCmd;
mutable bool readOnly;
// Maximum time to wait for worker responses/reconfigure before forcing read-only
// Loaded from Columnstore.xml: SystemConfig/DBRMUnresponsiveTimeout (default: 300 seconds)
struct timespec haltTimeout;
mutable bool waitToFinishJobs{false};
struct timespec MSG_TIMEOUT;
};