You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-09-11 08:50:45 +03:00
Record part of the response into the span
This commit is contained in:
@@ -176,4 +176,5 @@ class JsonFormatter(logging.Formatter):
|
|||||||
for k, v in record.__dict__.items():
|
for k, v in record.__dict__.items():
|
||||||
if k not in self.skip_fields:
|
if k not in self.skip_fields:
|
||||||
data[k] = v
|
data[k] = v
|
||||||
return json.dumps(data, ensure_ascii=False, sort_keys=True)
|
# Allow non-serializable extras (e.g., bytes, datetime) to be stringified
|
||||||
|
return json.dumps(data, ensure_ascii=False, sort_keys=True, default=str)
|
@@ -7,6 +7,7 @@ import socket
|
|||||||
from os import mkdir, replace, chown
|
from os import mkdir, replace, chown
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from shutil import copyfile
|
from shutil import copyfile
|
||||||
|
from typing import Iterator
|
||||||
from xml.dom import minidom # to pick up pretty printing functionality
|
from xml.dom import minidom # to pick up pretty printing functionality
|
||||||
|
|
||||||
from lxml import etree
|
from lxml import etree
|
||||||
@@ -366,12 +367,10 @@ class NodeConfig:
|
|||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def get_network_addresses(self):
|
def get_network_addresses(self) -> Iterator[str]:
|
||||||
"""Retrievs the list of the network addresses
|
"""Retrievs the list of the network addresses
|
||||||
|
|
||||||
Generator that yields network interface addresses.
|
Generator that yields network interface addresses.
|
||||||
|
|
||||||
:rtype: str
|
|
||||||
"""
|
"""
|
||||||
for ni in get_network_interfaces():
|
for ni in get_network_interfaces():
|
||||||
for fam in [socket.AF_INET, socket.AF_INET6]:
|
for fam in [socket.AF_INET, socket.AF_INET6]:
|
||||||
|
@@ -8,36 +8,6 @@ import cherrypy
|
|||||||
|
|
||||||
from tracing.tracer import get_tracer
|
from tracing.tracer import get_tracer
|
||||||
|
|
||||||
|
|
||||||
def _capture_short_json_response(span) -> None:
|
|
||||||
"""If response is JSON and is already serialized (bytes/str), put it into span attrs.
|
|
||||||
|
|
||||||
- Only captures when body is bytes or str (already materialized JSON string).
|
|
||||||
- Truncates to 500 bytes; records original size
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
headers = cherrypy.response.headers
|
|
||||||
content_type = str(headers.get('Content-Type', '')).lower()
|
|
||||||
if 'application/json' not in content_type:
|
|
||||||
return
|
|
||||||
body = cherrypy.response.body # may be bytes/str or an iterable
|
|
||||||
max_len = 500
|
|
||||||
text = None
|
|
||||||
# Only handle concrete bytes/str bodies (already serialized JSON)
|
|
||||||
if isinstance(body, (bytes, str)):
|
|
||||||
if isinstance(body, bytes):
|
|
||||||
text = body.decode('utf-8', errors='replace')
|
|
||||||
else:
|
|
||||||
text = body
|
|
||||||
# Only set attribute if we could safely materialize text
|
|
||||||
if text is not None:
|
|
||||||
value = text[:max_len]
|
|
||||||
span.set_attribute('http.response.body.size', len(text))
|
|
||||||
span.set_attribute('http.response.body', value)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def _on_request_start() -> None:
|
def _on_request_start() -> None:
|
||||||
req = cherrypy.request
|
req = cherrypy.request
|
||||||
tracer = get_tracer()
|
tracer = get_tracer()
|
||||||
@@ -86,7 +56,6 @@ def _on_request_end() -> None:
|
|||||||
span = getattr(req, "_trace_span", None)
|
span = getattr(req, "_trace_span", None)
|
||||||
if span is not None and status_code is not None:
|
if span is not None and status_code is not None:
|
||||||
span.set_attribute('http.status_code', status_code)
|
span.set_attribute('http.status_code', status_code)
|
||||||
_capture_short_json_response(span)
|
|
||||||
ctx = getattr(req, "_trace_span_ctx", None)
|
ctx = getattr(req, "_trace_span_ctx", None)
|
||||||
if ctx is not None:
|
if ctx is not None:
|
||||||
try:
|
try:
|
||||||
|
@@ -1,11 +1,17 @@
|
|||||||
"""Async sibling of TracedSession."""
|
"""Async sibling of TracedSession."""
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
import logging
|
||||||
import time
|
import time
|
||||||
import aiohttp
|
import aiohttp
|
||||||
|
|
||||||
from tracing.tracer import get_tracer
|
from tracing.tracer import get_tracer
|
||||||
|
|
||||||
|
# Limit for raw JSON string preview (in characters)
|
||||||
|
_PREVIEW_MAX_CHARS = 512
|
||||||
|
|
||||||
|
logger = logging.getLogger("tracer")
|
||||||
|
|
||||||
|
|
||||||
class TracedAsyncSession(aiohttp.ClientSession):
|
class TracedAsyncSession(aiohttp.ClientSession):
|
||||||
async def _request(
|
async def _request(
|
||||||
@@ -32,6 +38,7 @@ class TracedAsyncSession(aiohttp.ClientSession):
|
|||||||
raise
|
raise
|
||||||
else:
|
else:
|
||||||
span.set_attribute("http.status_code", response.status)
|
span.set_attribute("http.status_code", response.status)
|
||||||
|
await _record_outbound_json_preview(response, span)
|
||||||
return response
|
return response
|
||||||
finally:
|
finally:
|
||||||
duration_ms = (time.time_ns() - span.start_ns) / 1_000_000.0
|
duration_ms = (time.time_ns() - span.start_ns) / 1_000_000.0
|
||||||
@@ -43,3 +50,20 @@ def create_traced_async_session(**kwargs: Any) -> TracedAsyncSession:
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
async def _record_outbound_json_preview(response: aiohttp.ClientResponse, span) -> None:
|
||||||
|
"""If response is JSON, attach small part of it to span
|
||||||
|
|
||||||
|
We don't use streaming in aiohttp, so reading text is safe here.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
content_type = str(response.headers.get('Content-Type', '')).lower()
|
||||||
|
if 'application/json' not in content_type:
|
||||||
|
return
|
||||||
|
text = await response.text()
|
||||||
|
if text is None:
|
||||||
|
text = ""
|
||||||
|
span.set_attribute('http.response.body.size', len(text))
|
||||||
|
span.set_attribute('http.response.json', text[:_PREVIEW_MAX_CHARS])
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Could not extract JSON response body")
|
||||||
|
return None
|
||||||
|
@@ -1,10 +1,16 @@
|
|||||||
"""Customized requests.Session that automatically traces outbound HTTP calls."""
|
"""Customized requests.Session that automatically traces outbound HTTP calls."""
|
||||||
from typing import Any, Optional
|
from typing import Any, Optional
|
||||||
|
|
||||||
|
import logging
|
||||||
import time
|
import time
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
from tracing.tracer import get_tracer
|
from tracing.tracer import get_tracer, TraceSpan
|
||||||
|
|
||||||
|
# Limit for raw JSON string preview (in characters)
|
||||||
|
_PREVIEW_MAX_CHARS = 512
|
||||||
|
|
||||||
|
logger = logging.getLogger("tracer")
|
||||||
|
|
||||||
|
|
||||||
class TracedSession(requests.Session):
|
class TracedSession(requests.Session):
|
||||||
@@ -30,6 +36,7 @@ class TracedSession(requests.Session):
|
|||||||
raise
|
raise
|
||||||
else:
|
else:
|
||||||
span.set_attribute("http.status_code", response.status_code)
|
span.set_attribute("http.status_code", response.status_code)
|
||||||
|
_record_outbound_json_preview(response, span)
|
||||||
return response
|
return response
|
||||||
finally:
|
finally:
|
||||||
duration_ms = (time.time_ns() - span.start_ns) / 1_000_000.0
|
duration_ms = (time.time_ns() - span.start_ns) / 1_000_000.0
|
||||||
@@ -46,4 +53,14 @@ def get_traced_session() -> TracedSession:
|
|||||||
return _default_session
|
return _default_session
|
||||||
|
|
||||||
|
|
||||||
|
def _record_outbound_json_preview(response: requests.Response, span: TraceSpan) -> None:
|
||||||
|
"""If response is JSON, attach small part of it to span"""
|
||||||
|
try:
|
||||||
|
content_type = str(response.headers.get('Content-Type', '')).lower()
|
||||||
|
if 'application/json' not in content_type:
|
||||||
|
return
|
||||||
|
text = response.text # requests will decode using inferred/declared encoding
|
||||||
|
span.set_attribute('http.response.body.size', len(text))
|
||||||
|
span.set_attribute('http.response.json', text[:_PREVIEW_MAX_CHARS])
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Could not extract JSON response body")
|
||||||
|
@@ -2,6 +2,7 @@ import logging
|
|||||||
import time
|
import time
|
||||||
from typing import Any, Dict, Optional
|
from typing import Any, Dict, Optional
|
||||||
|
|
||||||
|
from mcs_node_control.models.node_config import NodeConfig
|
||||||
from tracing.tracer import TracerBackend, TraceSpan
|
from tracing.tracer import TracerBackend, TraceSpan
|
||||||
from tracing.utils import swallow_exceptions
|
from tracing.utils import swallow_exceptions
|
||||||
|
|
||||||
@@ -11,6 +12,10 @@ json_logger = logging.getLogger("json_trace")
|
|||||||
|
|
||||||
class TraceparentBackend(TracerBackend):
|
class TraceparentBackend(TracerBackend):
|
||||||
"""Default backend that logs span lifecycle and mirrors events/status."""
|
"""Default backend that logs span lifecycle and mirrors events/status."""
|
||||||
|
def __init__(self):
|
||||||
|
my_addresses = list(NodeConfig().get_network_addresses())
|
||||||
|
logger.info("My addresses: %s", my_addresses)
|
||||||
|
json_logger.info("my_addresses", extra={"my_addresses": my_addresses})
|
||||||
|
|
||||||
@swallow_exceptions
|
@swallow_exceptions
|
||||||
def on_span_start(self, span: TraceSpan) -> None:
|
def on_span_start(self, span: TraceSpan) -> None:
|
||||||
|
@@ -50,5 +50,3 @@ def parse_traceparent(header: str) -> Optional[Tuple[str, str, str]]:
|
|||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to parse traceparent: %s", header)
|
logger.exception("Failed to parse traceparent: %s", header)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user