1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/util/metrics/prometheus.py
Kenny Lee Sin Cheong c24a2fd22f metrics: rename namespace label to avoid conflict (PROJQUAY-9388) (#4746)
Rename namespace label to avoid conflict
2025-12-17 18:18:02 +00:00

306 lines
9.6 KiB
Python

import logging
import os
import socket
import sys
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from collections import namedtuple
from cachetools.func import lru_cache
from flask import g, request
from prometheus_client import REGISTRY, Counter, Gauge, Histogram, push_to_gateway
logger = logging.getLogger(__name__)
def get_namespace_bucket_mapping(app):
"""
Get namespace to bucket mapping from config.
Supports two formats:
1. List format: ["namespace1", "namespace2"] -> maps each to its own name
2. Dict format: {"bucket1": ["namespace1", "namespace2"], "bucket2": ["namespace3"]}
Returns a dict mapping namespace -> bucket_name.
"""
if not app:
return {}
try:
tracked = app.config.get("TRACKED_NAMESPACES", [])
if not tracked:
return {}
# If it's a dict, use it directly (namespace -> bucket mapping)
if isinstance(tracked, dict):
mapping = {}
for bucket_name, namespaces in tracked.items():
if isinstance(namespaces, (list, tuple)):
for ns in namespaces:
mapping[ns] = bucket_name
else:
# Single namespace string
mapping[namespaces] = bucket_name
return mapping
# If it's a list, map each namespace to itself (backward compatibility)
elif isinstance(tracked, (list, tuple)):
return {ns: ns for ns in tracked}
except:
pass
return {}
def get_namespace_label_for_counter(namespace_name, app):
"""
Get namespace label (bucket) for high-frequency counter metrics.
Returns bucket name if namespace is tracked, None otherwise.
Multiple namespaces can map to the same bucket.
"""
if not namespace_name:
return None
mapping = get_namespace_bucket_mapping(app)
return mapping.get(namespace_name)
# DB connections
db_pooled_connections_in_use = Gauge(
"quay_db_pooled_connections_in_use", "number of pooled db connections in use"
)
db_pooled_connections_available = Gauge(
"quay_db_pooled_connections_available", "number of pooled db connections available"
)
db_connect_calls = Counter(
"quay_db_connect_calls",
"number of connect() calls made to db",
)
db_close_calls = Counter(
"quay_db_close_calls",
"number of close() calls made to db",
)
request_duration = Histogram(
"quay_request_duration_seconds",
"seconds taken to process a request",
labelnames=["method", "route", "status", "namespace_name"],
)
# GC: DB table rows
gc_table_rows_deleted = Counter(
"quay_gc_table_rows_deleted", "number of table rows deleted by GC", labelnames=["table"]
)
# GC: Storage blob
gc_storage_blobs_deleted = Counter(
"quay_gc_storage_blobs_deleted", "number of storage blobs deleted"
)
# GC iterations
gc_repos_purged = Counter(
"quay_gc_repos_purged",
"number of repositories purged by the RepositoryGCWorker or NamespaceGCWorker",
)
gc_namespaces_purged = Counter(
"quay_gc_namespaces_purged", "number of namespaces purged by the NamespaceGCWorker"
)
gc_iterations = Counter("quay_gc_iterations", "number of iterations by the GCWorker")
secscan_request_duration = Histogram(
"quay_secscan_request_duration_seconds",
"seconds taken to make an index request to the secscan service",
labelnames=["method", "action", "status"],
)
secscan_index_layer_size = Histogram(
"quay_secscan_index_layer_size_bytes",
"bytes submitted to index to the secscan service",
)
INF = float("inf")
SECSCAN_RESULT_BUCKETS = (60, 300, 600, 900, 1200, 1500, 1800, 2100, 2400, INF)
secscan_result_duration = Histogram(
"quay_secscan_result_duration_seconds",
"how long it takes to receive scan results after pushing an image",
buckets=SECSCAN_RESULT_BUCKETS,
)
PROMETHEUS_PUSH_INTERVAL_SECONDS = 30
ONE_DAY_IN_SECONDS = 60 * 60 * 24
@lru_cache(maxsize=1)
def process_grouping_key():
"""
Implements a grouping key based on the last argument used to run the current process.
https://github.com/prometheus/client_python#exporting-to-a-pushgateway
"""
return {
"host": socket.gethostname(),
"process_name": os.path.basename(sys.argv[-1]),
"pid": str(os.getpid()),
}
class PrometheusPlugin(object):
"""
Application plugin for reporting metrics to Prometheus.
"""
def __init__(self, app=None):
self.app = app
if app is not None:
self.state = self.init_app(app)
else:
self.state = None
def init_app(self, app):
pusher = ThreadPusher(app)
pusher.start()
# register extension with app
app.extensions = getattr(app, "extensions", {})
app.extensions["prometheus"] = pusher
return pusher
def __getattr__(self, name):
return getattr(self.state, name, None)
class ThreadPusher(threading.Thread):
def __init__(self, app):
super(ThreadPusher, self).__init__()
self.daemon = True
self._app = app
def run(self):
agg_url = self._app.config.get("PROMETHEUS_PUSHGATEWAY_URL")
while True:
# Practically disable this worker, if there is no pushgateway.
if agg_url is None or os.getenv("TEST", "false").lower() == "true":
time.sleep(ONE_DAY_IN_SECONDS)
continue
time.sleep(PROMETHEUS_PUSH_INTERVAL_SECONDS)
try:
push_to_gateway(
agg_url,
job=self._app.config.get("PROMETHEUS_NAMESPACE", "quay"),
registry=REGISTRY,
grouping_key=process_grouping_key(),
)
logger.debug(
"pushed registry to pushgateway at %s with grouping key %s",
agg_url,
process_grouping_key(),
)
except urllib.error.URLError:
# There are many scenarios when the gateway might not be running.
# These could be testing scenarios or simply processes racing to start.
# Rather than try to guess all of them, keep it simple and let it fail.
if os.getenv("DEBUGLOG", "false").lower() == "true":
logger.exception(
"failed to push registry to pushgateway at %s with grouping key %s",
agg_url,
process_grouping_key(),
)
else:
pass
def _extract_namespace_from_request():
"""
Extract namespace from Flask request.
Checks route parameters first, then tries to parse from path.
"""
# Try to get namespace from route parameters (most common case)
if hasattr(request, "view_args") and request.view_args:
namespace = request.view_args.get("namespace_name") or request.view_args.get("namespace")
if namespace:
return namespace
# Try to parse from request path
try:
path = request.path.strip("/")
if "/" in path:
# Try to parse as namespace/repository
parts = path.split("/", 1)
if len(parts) >= 1:
potential_namespace = parts[0]
# Basic validation: namespace should be alphanumeric with dashes/underscores
if potential_namespace and all(
c.isalnum() or c in "-_" for c in potential_namespace
):
return potential_namespace
except:
pass
return None
def timed_blueprint(bp, get_app=None):
"""
Decorates a blueprint to have its request duration tracked by Prometheus.
Includes namespace label for tracked namespaces.
Args:
bp: Flask Blueprint to decorate
get_app: Optional callable that returns the Flask app object when called.
If not provided, will attempt to get app from blueprint or request context.
"""
def _time_before_request():
g._request_start_time = time.time()
bp.before_request(_time_before_request)
def _get_app_from_context():
"""Try to get app from various sources without importing current_app."""
# First, try the provided get_app callback
if get_app:
try:
return get_app()
except:
pass
# Try to get from blueprint
if hasattr(bp, "app") and bp.app:
return bp.app
# Try to get from request context (Flask stores app in request context)
if hasattr(request, "environ"):
# Flask stores the app in the WSGI environ
app = request.environ.get("werkzeug.app")
if app:
return app
return None
def _time_after_request():
def f(r):
start = getattr(g, "_request_start_time", None)
if start is None:
return r
dur = time.time() - start
# Extract namespace from request
namespace_name = _extract_namespace_from_request()
# Get app using the provided mechanism
app = _get_app_from_context()
namespace_label = get_namespace_label_for_counter(namespace_name, app) if app else None
# Use "other" for untracked namespaces to keep cardinality low
label_value = namespace_label if namespace_label else "other"
request_duration.labels(
request.method, request.endpoint, r.status_code, label_value
).observe(dur)
return r
return f
bp.after_request(_time_after_request())
return bp