From d3a05331ef5af468f64bc76e8f7062ca90cf7ef4 Mon Sep 17 00:00:00 2001 From: Jordi Piriz Date: Mon, 3 Nov 2025 11:09:33 +0100 Subject: [PATCH] Revert "tracing: improving otlp handling (PROJQUAY-8902) (#4198)" This reverts commit 89e758846f52b57389a0fa844931e26977ca604b. --- app.py | 21 +- data/queue.py | 43 -- data/users/externalldap.py | 971 ++++++++++--------------------------- endpoints/v1/__init__.py | 10 - endpoints/v1/index.py | 38 -- endpoints/v1/registry.py | 35 -- endpoints/v1/tag.py | 16 - endpoints/v2/blob.py | 81 +--- endpoints/v2/catalog.py | 7 - endpoints/v2/manifest.py | 28 -- endpoints/v2/tag.py | 7 - endpoints/v2/v2auth.py | 9 - endpoints/web.py | 176 ------- storage/cloud.py | 247 ++-------- util/config/schema.py | 33 +- util/metrics/otel.py | 165 +------ util/test/test_otel.py | 61 --- 17 files changed, 329 insertions(+), 1619 deletions(-) delete mode 100644 util/test/test_otel.py diff --git a/app.py b/app.py index 9559eed1f..f1242b483 100644 --- a/app.py +++ b/app.py @@ -11,7 +11,6 @@ from flask_mail import Mail from flask_principal import Principal from opentelemetry.instrumentation.flask import FlaskInstrumentor from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor -from opentelemetry.trace.status import StatusCode from werkzeug.exceptions import HTTPException from werkzeug.middleware.proxy_fix import ProxyFix @@ -57,7 +56,7 @@ from util.ipresolver import IPResolver from util.label_validator import LabelValidator from util.log import filter_logs from util.marketplace import MarketplaceSubscriptionApi, MarketplaceUserApi -from util.metrics.otel import get_tracecontext, get_traceparent, init_exporter +from util.metrics.otel import init_exporter from util.metrics.prometheus import PrometheusPlugin from util.names import urn_generator from util.pullmetrics import PullMetricsBuilderModule @@ -371,27 +370,9 @@ def load_user(user_uuid): get_app_url = partial(get_app_url, app.config) if features.OTEL_TRACING: - - def request_hook(span, environ): - if span and span.is_recording(): - for header in request.headers: - span.set_attribute(header[0], str(header[1])) - - def response_hook(span, status, response_headers): - if span and span.is_recording(): - status_code = int(status.split()[0]) - if all([status_code >= 200, status_code < 400]): - span.set_status(StatusCode.OK) - else: - span.set_status(StatusCode.ERROR) - tparent = get_traceparent(span) - response_headers.append(("traceparent", tparent)) - FlaskInstrumentor().instrument_app( app, excluded_urls=app.config.get("OTEL_TRACING_EXCLUDED_URLS", None), - request_hook=request_hook, - response_hook=response_hook, ) Psycopg2Instrumentor().instrument() init_exporter(app.config) diff --git a/data/queue.py b/data/queue.py index 8d2309065..42a47ea51 100644 --- a/data/queue.py +++ b/data/queue.py @@ -4,10 +4,6 @@ from datetime import datetime, timedelta from prometheus_client import Counter, Gauge -from util.metrics.otel import StatusCode, get_tracecontext, trace - -tracer = trace.get_tracer("quay.queue") - from data.database import QueueItem, db, db_for_update, db_random_func from util.morecollections import AttrDict @@ -102,9 +98,6 @@ class WorkQueue(object): ~(QueueItem.queue_name << running_query) ) - @tracer.start_as_current_span( - "quay.queue.num_alive_jobs", record_exception=True, set_status_on_exception=True - ) def num_alive_jobs(self, canonical_name_list): """ Returns the number of alive queue items with a given prefix. @@ -123,9 +116,6 @@ class WorkQueue(object): .count() ) - @tracer.start_as_current_span( - "quay.queue.num_available_jobs_between", record_exception=True, set_status_on_exception=True - ) def num_available_jobs_between( self, available_min_time, available_max_time, canonical_name_list ): @@ -152,9 +142,6 @@ class WorkQueue(object): def _item_by_id_for_update(queue_id): return db_for_update(QueueItem.select().where(QueueItem.id == queue_id)).get() - @tracer.start_as_current_span( - "quay.queue.get_metrics", record_exception=True, set_status_on_exception=True - ) def get_metrics(self): now = datetime.utcnow() name_match_query = self._name_match_query() @@ -174,18 +161,12 @@ class WorkQueue(object): return (running_count, available_not_running_count, available_count) - @tracer.start_as_current_span( - "quay.queue.update_metrics", record_exception=True, set_status_on_exception=True - ) def update_metrics(self): (running_count, available_not_running_count, available_count) = self.get_metrics() queue_items_locked.labels(self._queue_name).set(running_count) queue_items_available.labels(self._queue_name).set(available_count) queue_items_available_unlocked.labels(self._queue_name).set(available_not_running_count) - @tracer.start_as_current_span( - "quay.queue.has_retries_remaining", record_exception=True, set_status_on_exception=True - ) def has_retries_remaining(self, item_id): """ Returns whether the queue item with the given id has any retries remaining. @@ -198,9 +179,6 @@ class WorkQueue(object): except QueueItem.DoesNotExist: return False - @tracer.start_as_current_span( - "quay.queue.delete_namespaced_items", record_exception=True, set_status_on_exception=True - ) def delete_namespaced_items(self, namespace, subpath=None): """ Deletes all items in this queue that exist under the given namespace. @@ -212,9 +190,6 @@ class WorkQueue(object): queue_prefix = "%s/%s/%s%%" % (self._queue_name, namespace, subpath_query) return QueueItem.delete().where(QueueItem.queue_name**queue_prefix).execute() - @tracer.start_as_current_span( - "quay.queue.alive", record_exception=True, set_status_on_exception=True - ) def alive(self, canonical_name_list): """ Returns True if a job matching the canonical name list is currently processing or available. @@ -265,9 +240,6 @@ class WorkQueue(object): queue_item_puts.labels(self._queue_name).inc(len(current_batch)) remaining = remaining[batch_size:] - @tracer.start_as_current_span( - "quay.queue.get", record_exception=True, set_status_on_exception=True - ) def put(self, canonical_name_list, message, available_after=0, retries_remaining=5): """ Put an item, if it shouldn't be processed for some number of seconds, specify that amount as @@ -344,9 +316,6 @@ class WorkQueue(object): changed = set_unavailable_query.execute() return changed == 1 - @tracer.start_as_current_span( - "quay.queue.get", record_exception=True, set_status_on_exception=True - ) def get(self, processing_time=300, ordering_required=False): """ Get an available item and mark it as unavailable for the default of five minutes. @@ -382,9 +351,6 @@ class WorkQueue(object): } ) - @tracer.start_as_current_span( - "quay.queue.cancel", record_exception=True, set_status_on_exception=True - ) def cancel(self, item_id): """ Attempts to cancel the queue item with the given ID from the queue. @@ -394,15 +360,9 @@ class WorkQueue(object): count_removed = QueueItem.delete().where(QueueItem.id == item_id).execute() return count_removed > 0 - @tracer.start_as_current_span( - "quay.queue.complete", record_exception=True, set_status_on_exception=True - ) def complete(self, completed_item): self._currently_processing = not self.cancel(completed_item.id) - @tracer.start_as_current_span( - "quay.queue.incomplete", record_exception=True, set_status_on_exception=True - ) def incomplete(self, incomplete_item, retry_after=300, restore_retry=False): with self._transaction_factory(db): retry_date = datetime.utcnow() + timedelta(seconds=retry_after) @@ -421,9 +381,6 @@ class WorkQueue(object): except QueueItem.DoesNotExist: return False - @tracer.start_as_current_span( - "quay.queue.extend_processing", record_exception=True, set_status_on_exception=True - ) def extend_processing( self, item, seconds_from_now, minimum_extension=MINIMUM_EXTENSION, updated_data=None ): diff --git a/data/users/externalldap.py b/data/users/externalldap.py index 41cf5265e..0f3ca9747 100644 --- a/data/users/externalldap.py +++ b/data/users/externalldap.py @@ -2,10 +2,6 @@ import logging import os from collections import namedtuple -from util.metrics.otel import StatusCode, get_tracecontext, trace - -tracer = trace.get_tracer("externalldap.py") - import ldap from ldap.controls import SimplePagedResultsControl from ldap.filter import escape_filter_chars, filter_format @@ -42,7 +38,6 @@ class LDAPConnectionBuilder(object): timeout=None, network_timeout=None, referrals=_DEFAULT_REFERRALS, - ctx=None, ): self._ldap_uri = ldap_uri self._user_dn = user_dn @@ -53,29 +48,15 @@ class LDAPConnectionBuilder(object): self._referrals = int(referrals) def get_connection(self): - - with tracer.start_as_current_span( - "quay.LDAPConnectionBuilder", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.dn": str(self._user_dn), - "ldap.network.timeout": self._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - }, - ) as span: - span.set_status(StatusCode.OK) - span.set_attribute("ldap.referrals", str(self._referrals)) - span.set_attribute("ldap.allow_tls_fallback", self._allow_tls_fallback) - span.set_attribute("ldap.timeout", self._timeout) - return LDAPConnection( - self._ldap_uri, - self._user_dn, - self._user_pw, - self._allow_tls_fallback, - self._timeout, - self._network_timeout, - self._referrals, - ) + return LDAPConnection( + self._ldap_uri, + self._user_dn, + self._user_pw, + self._allow_tls_fallback, + self._timeout, + self._network_timeout, + self._referrals, + ) class LDAPConnection(object): @@ -88,7 +69,6 @@ class LDAPConnection(object): timeout=None, network_timeout=None, referrals=_DEFAULT_REFERRALS, - ctx=None, ): self._ldap_uri = ldap_uri self._user_dn = user_dn @@ -98,56 +78,33 @@ class LDAPConnection(object): self._network_timeout = network_timeout self._referrals = int(referrals) self._conn = None - self._ctx = ctx def __enter__(self): trace_level = 2 if os.environ.get("USERS_DEBUG") == "1" else 0 - with tracer.start_as_current_span( - "ldap.initialize", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - }, - ) as span: - self._conn = ldap.initialize(self._ldap_uri, trace_level=trace_level) - self._conn.set_option(ldap.OPT_REFERRALS, self._referrals) - self._conn.set_option( - ldap.OPT_NETWORK_TIMEOUT, - self._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - ) - self._conn.set_option(ldap.OPT_TIMEOUT, self._timeout or _DEFAULT_TIMEOUT) - self._conn.set_option(ldap.OPT_X_KEEPALIVE_IDLE, _DEFAULT_KEEPALIVE_IDLE) - self._conn.set_option(ldap.OPT_X_KEEPALIVE_INTERVAL, _DEFAULT_KEEPALIVE_INTERVAL) - self._conn.set_option(ldap.OPT_X_KEEPALIVE_PROBES, _DEFAULT_KEEPALIVE_PROBES) - self._conn.set_option(ldap.OPT_RESTART, ldap.OPT_ON) + self._conn = ldap.initialize(self._ldap_uri, trace_level=trace_level) + self._conn.set_option(ldap.OPT_REFERRALS, self._referrals) + self._conn.set_option( + ldap.OPT_NETWORK_TIMEOUT, self._network_timeout or _DEFAULT_NETWORK_TIMEOUT + ) + self._conn.set_option(ldap.OPT_TIMEOUT, self._timeout or _DEFAULT_TIMEOUT) + self._conn.set_option(ldap.OPT_X_KEEPALIVE_IDLE, _DEFAULT_KEEPALIVE_IDLE) + self._conn.set_option(ldap.OPT_X_KEEPALIVE_INTERVAL, _DEFAULT_KEEPALIVE_INTERVAL) + self._conn.set_option(ldap.OPT_X_KEEPALIVE_PROBES, _DEFAULT_KEEPALIVE_PROBES) + self._conn.set_option(ldap.OPT_RESTART, ldap.OPT_ON) - if self._allow_tls_fallback: - logger.debug("TLS Fallback enabled in LDAP") - self._conn.set_option(ldap.OPT_X_TLS_TRY, 1) + if self._allow_tls_fallback: + logger.debug("TLS Fallback enabled in LDAP") + self._conn.set_option(ldap.OPT_X_TLS_TRY, 1) - # Must come _after_ all other TLS options - self._conn.set_option(ldap.OPT_X_TLS_NEWCTX, ldap.OPT_OFF) + # Must come _after_ all other TLS options + self._conn.set_option(ldap.OPT_X_TLS_NEWCTX, ldap.OPT_OFF) - span.set_status(StatusCode.ERROR) - self._conn.simple_bind_s(self._user_dn, self._user_pw) - span.set_status(StatusCode.OK) + self._conn.simple_bind_s(self._user_dn, self._user_pw) return self._conn def __exit__(self, exc_type, value, tb): - with tracer.start_as_current_span( - "ldap.unbind", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - }, - ) as span: - span.set_attribute("uri", self._ldap_uri) - span.set_attribute("binddn", self._user_dn) - span.set_status(StatusCode.OK) - self._conn.unbind_s() + self._conn.unbind_s() class LDAPUsers(FederatedUsers): @@ -272,10 +229,7 @@ class LDAPUsers(FederatedUsers): filter_global_readonly_superusers=False, ): query = "(|({0}={2}{3})({1}={2}{3}))".format( - self._uid_attr, - self._email_attr, - escape_filter_chars(username_or_email), - suffix, + self._uid_attr, self._email_attr, escape_filter_chars(username_or_email), suffix ) query = self._add_user_filter(query) @@ -298,59 +252,25 @@ class LDAPUsers(FederatedUsers): query = self._add_global_readonly_superuser_filter(query) logger.debug("Conducting user search: %s under %s", query, user_search_dn) - with tracer.start_as_current_span( - "ldap.search_s", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - "ldap.suffix": str(suffix), - "ldap.query": str(query), - }, - ) as span: + try: + return (conn.search_s(user_search_dn, ldap.SCOPE_SUBTREE, query), None) + except ldap.REFERRAL as re: + referral_dn = self._get_ldap_referral_dn(re) + if not referral_dn: + return (None, "Failed to follow referral when looking up username") + try: - span.set_status(StatusCode.OK) - return (conn.search_s(user_search_dn, ldap.SCOPE_SUBTREE, query), None) - except ldap.REFERRAL as re: - referral_dn = self._get_ldap_referral_dn(re) - if not referral_dn: - span.set_status(StatusCode.ERROR) - span.set_attribute( - "error", "Failed to follow referral when looking up username" - ) - span.set_attribute("details", str(re)) - return (None, "Failed to follow referral when looking up username") - - try: - subquery = "(%s=%s)" % (self._uid_attr, username_or_email) - subquery = self._add_user_filter(subquery) - span.add_event( - "ldap.search_s", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout - or _DEFAULT_NETWORK_TIMEOUT, - "ldap.filter": str(subquery), - "ldap.suffix": str(referral_dn), - }, - ) - span.set_status(StatusCode.OK) - return (conn.search_s(referral_dn, ldap.SCOPE_BASE, subquery), None) - except ldap.LDAPError as ldaperr: - span.set_status(StatusCode.ERROR) - span.set_attribute("error", "LDAP referral search exception") - span.set_attribute("details", str(ldaperr)) - logger.debug("LDAP referral search exception") - return (None, "Username not found") - - except ldap.LDAPError as ldaperr: - span.set_status(StatusCode.ERROR) - span.set_attribute("error", "LDAP referral search exception") - span.set_attribute("details", str(ldaperr)) - logger.debug("LDAP search exception") + subquery = "(%s=%s)" % (self._uid_attr, username_or_email) + subquery = self._add_user_filter(subquery) + return (conn.search_s(referral_dn, ldap.SCOPE_BASE, subquery), None) + except ldap.LDAPError: + logger.debug("LDAP referral search exception") return (None, "Username not found") + except ldap.LDAPError: + logger.debug("LDAP search exception") + return (None, "Username not found") + def _ldap_user_search( self, username_or_email, @@ -372,122 +292,39 @@ class LDAPUsers(FederatedUsers): # Verify the admin connection works first. We do this here to avoid wrapping # the entire block in the INVALID CREDENTIALS check. - with tracer.start_as_current_span( - "quay.verify_ldap_credentials", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - }, - ) as span: - try: - with self._ldap.get_connection(): - span.add_event( - "quay.verify_ldap_credentials.result", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout - or _DEFAULT_NETWORK_TIMEOUT, - }, - ) - span.set_status(StatusCode.OK) - pass - except ldap.INVALID_CREDENTIALS: - span.add_event( - "quay.verify_ldap_credentials.result", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout - or _DEFAULT_NETWORK_TIMEOUT, - }, + try: + with self._ldap.get_connection(): + pass + except ldap.INVALID_CREDENTIALS: + return (None, "LDAP Admin dn or password is invalid") + + with self._ldap.get_connection() as conn: + logger.debug("Incoming username or email param: %s", username_or_email.__repr__()) + + for user_search_dn in self._user_dns: + (pairs, err_msg) = self._ldap_user_search_with_rdn( + conn, + username_or_email, + user_search_dn, + suffix=suffix, + filter_superusers=filter_superusers, + filter_restricted_users=filter_restricted_users, + filter_global_readonly_superusers=filter_global_readonly_superusers, ) - span.set_status(StatusCode.ERROR) - return (None, "LDAP Admin dn or password is invalid") + if pairs is not None and len(pairs) > 0: + break - with tracer.start_as_current_span( - "quay._ldap_user_search", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - "ldap.suffix": str(suffix), - "quay.username_or_email": str(username_or_email), - "quay.filter.superuser": filter_superusers, - "quay.filter.restricted_user": filter_restricted_users, - "quay.filter.superuser.readonly": filter_global_readonly_superusers, - }, - ) as span: - with self._ldap.get_connection() as conn: - try: - with self._ldap.get_connection(): - span.add_event( - "quay.verify_ldap_credentials.result", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout - or _DEFAULT_NETWORK_TIMEOUT, - }, - ) - span.set_status(StatusCode.OK) - pass - except ldap.INVALID_CREDENTIALS: - span.add_event( - "quay.verify_ldap_credentials.result", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout - or _DEFAULT_NETWORK_TIMEOUT, - }, - ) - span.set_status(StatusCode.ERROR) - return (None, "LDAP Admin dn or password is invalid") + if err_msg is not None: + return (None, err_msg) - logger.debug("Incoming username or email param: %s", username_or_email.__repr__()) + dn_lst = [pair[0] for pair in pairs] + logger.debug("Found matching DNs: %s" % dn_lst) - for user_search_dn in self._user_dns: - span.add_event( - "quay._ldap_user_search_with_rdn", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout - or _DEFAULT_NETWORK_TIMEOUT, - "ldap.basedn": str(user_search_dn), - "ldap.suffix": str(suffix), - "ldap.filter.superuser": filter_superusers, - "ldap.filterrestricted_user": filter_restricted_users, - "ldap.filter.superuser.readonly": filter_global_readonly_superusers, - "quay.username_or_email": str(username_or_email), - }, - ) - (pairs, err_msg) = self._ldap_user_search_with_rdn( - conn, - username_or_email, - user_search_dn, - suffix=suffix, - filter_superusers=filter_superusers, - filter_restricted_users=filter_restricted_users, - filter_global_readonly_superusers=filter_global_readonly_superusers, - ) - if pairs is not None and len(pairs) > 0: - break + results = [LDAPUsers._LDAPResult(*pair) for pair in take(limit, pairs)] - if err_msg is not None: - return (None, err_msg) - - span.set_status(StatusCode.OK) - dn_lst = [pair[0] for pair in pairs] - logger.debug("Found matching DNs: %s" % dn_lst) - - results = [LDAPUsers._LDAPResult(*pair) for pair in take(limit, pairs)] - - # Filter out pairs without DNs. Some LDAP impls will return such pairs. - with_dns = [result for result in results if result.dn] - return (with_dns, None) + # Filter out pairs without DNs. Some LDAP impls will return such pairs. + with_dns = [result for result in results if result.dn] + return (with_dns, None) def _ldap_single_user_search( self, @@ -496,44 +333,34 @@ class LDAPUsers(FederatedUsers): filter_restricted_users=False, filter_global_readonly_superusers=False, ): - with tracer.start_as_current_span( - "quay._ldap_single_user_search", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - "quay.username": str(username_or_email), - "quay.filter.superuser": filter_superusers, - "quay.filter.restricted": filter_restricted_users, - "quay.filter.superuser.readonly": filter_global_readonly_superusers, - }, - ) as span: - with_dns, err_msg = self._ldap_user_search( - username_or_email, - filter_superusers=filter_superusers, - filter_restricted_users=filter_restricted_users, - filter_global_readonly_superusers=filter_global_readonly_superusers, - ) - if err_msg is not None: - span.set_status(StatusCode.ERROR) - return (None, err_msg) + try: + lookup_robot(username_or_email) + return (None, f"LDAP lookup for robots disabled {username_or_email}") + except InvalidRobotException: + # continue with LDAP lookup + pass - # Make sure we have at least one result. - if len(with_dns) < 1: - span.set_attribute("error", "Invalid username or password.") - span.set_status(StatusCode.ERROR) - return (None, "Invalid username or password.") + with_dns, err_msg = self._ldap_user_search( + username_or_email, + filter_superusers=filter_superusers, + filter_restricted_users=filter_restricted_users, + filter_global_readonly_superusers=filter_global_readonly_superusers, + ) + if err_msg is not None: + return (None, err_msg) - # If we have found a single pair, then return it. - if len(with_dns) == 1: - span.set_status(StatusCode.OK) - return (with_dns[0], None) + # Make sure we have at least one result. + if len(with_dns) < 1: + return (None, "Invalid username or password.") - # Otherwise, there are multiple pairs with DNs, so find the one with the mail - # attribute (if any). - with_mail = [result for result in with_dns if result.attrs.get(self._email_attr)] - span.set_status(StatusCode.OK) - return (with_mail[0] if with_mail else with_dns[0], None) + # If we have found a single pair, then return it. + if len(with_dns) == 1: + return (with_dns[0], None) + + # Otherwise, there are multiple pairs with DNs, so find the one with the mail + # attribute (if any). + with_mail = [result for result in with_dns if result.attrs.get(self._email_attr)] + return (with_mail[0] if with_mail else with_dns[0], None) def _build_user_information(self, response): if not response.get(self._uid_attr): @@ -554,178 +381,61 @@ class LDAPUsers(FederatedUsers): return (UserInformation(username=username, email=email, id=username), None) def ping(self): - with tracer.start_as_current_span( - "quay.ping", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - }, - ) as span: + try: + with self._ldap.get_connection(): + pass + except ldap.INVALID_CREDENTIALS: + return (False, "LDAP Admin dn or password is invalid") + except ldap.LDAPError as lde: + logger.exception("Exception when trying to health check LDAP") + return (False, str(lde)) - try: - with self._ldap.get_connection(): - span.add_event( - "quay.verify_ldap_credentials.result", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout - or _DEFAULT_NETWORK_TIMEOUT, - }, - ) - span.set_status(StatusCode.OK) - pass - except ldap.INVALID_CREDENTIALS: - span.add_event( - "quay.verify_ldap_credentials.result", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout - or _DEFAULT_NETWORK_TIMEOUT, - }, - ) - span.set_status(StatusCode.ERROR) - return (False, "LDAP Admin dn or password is invalid") - except ldap.LDAPError as lde: - logger.exception("Exception when trying to health check LDAP") - return (False, str(lde)) - - return (True, None) + return (True, None) def at_least_one_user_exists(self, filter_superusers=False, filter_restricted_users=False): logger.debug("Checking if any users exist in LDAP") + try: + with self._ldap.get_connection(): + pass + except ldap.INVALID_CREDENTIALS: + return (None, "LDAP Admin dn or password is invalid") - with tracer.start_as_current_span( - "quay.at_least_one_user_exists", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - "quay.filter.superuser": filter_superusers, - "quay.filter.restricted": filter_restricted_users, - }, - ) as span: + has_pagination = not self._force_no_pagination + with self._ldap.get_connection() as conn: + for user_search_dn in self._user_dns: + search_flt = "(objectClass=*)" - try: - with self._ldap.get_connection(): - span.add_event( - "quay.verify_ldap_credentials.result", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout - or _DEFAULT_NETWORK_TIMEOUT, - }, - ) - span.set_status(StatusCode.OK) - pass - except ldap.INVALID_CREDENTIALS: - span.add_event( - "quay.verify_ldap_credentials.result", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout - or _DEFAULT_NETWORK_TIMEOUT, - }, + search_flt = self._add_user_filter(search_flt) + + if filter_restricted_users: + if self._ldap_restricted_user_filter: + search_flt = self._add_restricted_user_filter(search_flt) + else: + return (False, "Superuser filter not set") + elif filter_superusers: + if self._ldap_superuser_filter: + search_flt = self._add_superuser_filter(search_flt) + else: + return (False, "Restricted user filter not set") + + lc = ldap.controls.libldap.SimplePagedResultsControl( + criticality=True, size=1, cookie="" ) - span.set_status(StatusCode.ERROR) - return (None, "LDAP Admin dn or password is invalid") - has_pagination = not self._force_no_pagination - span.set_attribute("pagination", has_pagination) - with self._ldap.get_connection() as conn: - for user_search_dn in self._user_dns: - search_flt = "(objectClass=*)" - - search_flt = self._add_user_filter(search_flt) - - if filter_restricted_users: - span.add_event( - "quay._add_restricted_user_filter", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout - or _DEFAULT_NETWORK_TIMEOUT, - "ldap.suffix": str(search_flt), - "quay.filter.restricted": filter_restricted_users, - }, + try: + if has_pagination: + msgid = conn.search_ext( + user_search_dn, ldap.SCOPE_SUBTREE, search_flt, serverctrls=[lc] ) - if self._ldap_restricted_user_filter: - search_flt = self._add_restricted_user_filter(search_flt) - else: - return (False, "Superuser filter not set") - elif filter_superusers: - span.add_event( - "quay._add_superuser_filter", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout - or _DEFAULT_NETWORK_TIMEOUT, - "ldap.suffix": str(search_flt), - "quay.filter.superuser": self._ldap_superuser_filter, - }, - ) - if self._ldap_superuser_filter: - search_flt = self._add_superuser_filter(search_flt) - else: - return (False, "Restricted user filter not set") + _, rdata, _, serverctrls = conn.result3(msgid) + else: + msgid = conn.search(user_search_dn, ldap.SCOPE_SUBTREE, search_flt) + _, rdata = conn.result(msgid) - lc = ldap.controls.libldap.SimplePagedResultsControl( - criticality=True, size=1, cookie="" - ) - try: - if has_pagination: - with tracer.start_as_current_span( - "ldap.search_ext", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout - or _DEFAULT_NETWORK_TIMEOUT, - "ldap.suffix": user_search_dn, - "ldap.filter": search_flt, - "ldap.paged": True, - "ldap.serverctrls": str(lc), - }, - ) as sspan: - msgid = conn.search_ext( - user_search_dn, - ldap.SCOPE_SUBTREE, - search_flt, - serverctrls=[lc], - ) - _, rdata, _, serverctrls = conn.result3(msgid) - sspan.set_status(StatusCode.OK) - else: - with tracer.start_as_current_span( - "ldap.search", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout - or _DEFAULT_NETWORK_TIMEOUT, - "ldap.suffix": str(user_search_dn), - "ldap.filter": str(search_flt), - }, - ) as sspan: - msgid = conn.search(user_search_dn, ldap.SCOPE_SUBTREE, search_flt) - _, rdata = conn.result(msgid) - sspan.set_status(StatusCode.OK) + for entry in rdata: # Handles both lists and iterators. + return (True, None) - for entry in rdata: # Handles both lists and iterators. - span.set_status(StatusCode.OK) - return (True, None) - - except ldap.LDAPError as lde: - span.set_status(StatusCode.ERROR) - return ( - False, - str(lde) or "Could not find DN %s" % user_search_dn, - ) + except ldap.LDAPError as lde: + return (False, str(lde) or "Could not find DN %s" % user_search_dn) return (False, None) @@ -733,35 +443,14 @@ class LDAPUsers(FederatedUsers): """ Looks up a username or email in LDAP. """ - with tracer.start_as_current_span( - "quay.get_user", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - "quay.username_or_email": str(username_or_email), - }, - ) as span: - logger.debug("Looking up LDAP username or email %s", username_or_email) - (found_user, err_msg) = self._ldap_single_user_search(username_or_email) - if err_msg is not None: - span.add_event( - "quay.get_user.result", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout - or _DEFAULT_NETWORK_TIMEOUT, - "quay.username_or_email": str(username_or_email), - }, - ) - span.set_status(StatusCode.ERROR) - return (None, err_msg) + logger.debug("Looking up LDAP username or email %s", username_or_email) + (found_user, err_msg) = self._ldap_single_user_search(username_or_email) + if err_msg is not None: + return (None, err_msg) - logger.debug("Found user for LDAP username or email %s", username_or_email) - _, found_response = found_user - span.set_status(StatusCode.OK) - return self._build_user_information(found_response) + logger.debug("Found user for LDAP username or email %s", username_or_email) + _, found_response = found_user + return self._build_user_information(found_response) def query_users(self, query, limit=20): """ @@ -770,44 +459,21 @@ class LDAPUsers(FederatedUsers): if not query: return (None, self.federated_service, "Empty query") - with tracer.start_as_current_span( - "quay.query_users", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - "ldap.query": str(query), - }, - ) as span: + logger.debug("Got query %s with limit %s", query, limit) + (results, err_msg) = self._ldap_user_search(query, limit=limit, suffix="*") + if err_msg is not None: + return (None, self.federated_service, err_msg) - logger.debug("Got query %s with limit %s", query, limit) - (results, err_msg) = self._ldap_user_search(query, limit=limit, suffix="*") + final_results = [] + for result in results[0:limit]: + credentials, err_msg = self._build_user_information(result.attrs) if err_msg is not None: - span.add_event( - "quay._ldap_user_search", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout - or _DEFAULT_NETWORK_TIMEOUT, - "ldap.query": query, - "ldap.error": err_msg, - }, - ) - span.set_status(StatusCode.ERROR) - return (None, self.federated_service, err_msg) + continue - final_results = [] - for result in results[0:limit]: - credentials, err_msg = self._build_user_information(result.attrs) - if err_msg is not None: - continue + final_results.append(credentials) - final_results.append(credentials) - - logger.debug("For query %s found results %s", query, final_results) - span.set_status(StatusCode.OK) - return (final_results, self.federated_service, None) + logger.debug("For query %s found results %s", query, final_results) + return (final_results, self.federated_service, None) def verify_credentials(self, username_or_email, password): """ @@ -817,65 +483,36 @@ class LDAPUsers(FederatedUsers): if not password: return (None, "Anonymous binding not allowed.") - with tracer.start_as_current_span( - "quay.verify_credentials", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - "quay.username_or_email": str(username_or_email), - }, - ) as span: - (found_user, err_msg) = self._ldap_single_user_search(username_or_email) - span.set_status(StatusCode.OK) - if found_user is None: - span.set_status(StatusCode.ERROR) - return (None, err_msg) + (found_user, err_msg) = self._ldap_single_user_search(username_or_email) + if found_user is None: + return (None, err_msg) found_dn, found_response = found_user logger.debug("Found user for LDAP username %s; validating password", username_or_email) logger.debug("DN %s found: %s", found_dn, found_response) # First validate the password by binding as the user - with tracer.start_as_current_span( - "ldap.bind", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - "ldap.suffix": str(found_dn), - "tls_fallback": self._allow_tls_fallback, - }, - ) as span: + try: + with LDAPConnection(self._ldap_uri, found_dn, password, self._allow_tls_fallback): + pass + except ldap.REFERRAL as re: + referral_dn = self._get_ldap_referral_dn(re) + if not referral_dn: + return (None, "Invalid username or password.") + try: - with LDAPConnection(self._ldap_uri, found_dn, password, self._allow_tls_fallback): + with LDAPConnection( + self._ldap_uri, referral_dn, password, self._allow_tls_fallback + ): pass - span.set_status(StatusCode.OK) - except ldap.REFERRAL as re: - referral_dn = self._get_ldap_referral_dn(re) - if not referral_dn: - span.set_status(StatusCode.ERROR) - span.set_attribute("error", "Invalid username or password.") - return (None, "Invalid username or password.") - - try: - with LDAPConnection( - self._ldap_uri, referral_dn, password, self._allow_tls_fallback - ): - pass - span.set_status(StatusCode.OK) - except ldap.INVALID_CREDENTIALS: - span.set_status(StatusCode.ERROR) - span.set_attribute("error", "Invalid username or password.") - logger.debug("Invalid LDAP credentials") - return (None, "Invalid username or password.") - except ldap.INVALID_CREDENTIALS: - span.set_status(StatusCode.ERROR) - span.set_attribute("error", "Invalid username or password.") logger.debug("Invalid LDAP credentials") return (None, "Invalid username or password.") + except ldap.INVALID_CREDENTIALS: + logger.debug("Invalid LDAP credentials") + return (None, "Invalid username or password.") + return self._build_user_information(found_response) def service_metadata(self): @@ -887,130 +524,33 @@ class LDAPUsers(FederatedUsers): if not group_lookup_args.get("group_dn"): return (False, "Missing group_dn") - with tracer.start_as_current_span( - "quay.check_group_lookup_args", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - "ldap.query": str(group_lookup_args), - }, - ) as span: - span.set_status(StatusCode.OK) - with tracer.start_as_current_span( - "quay.iterate_group_members", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - }, - ) as tspan: - (it, err) = self.iterate_group_members( - group_lookup_args, - page_size=1, - disable_pagination=disable_pagination, - ) - tspan.set_status(StatusCode.OK) - if err is not None: - tspan.set_status(StatusCode.ERROR) - return (False, err) + (it, err) = self.iterate_group_members( + group_lookup_args, page_size=1, disable_pagination=disable_pagination + ) + if err is not None: + return (False, err) - if not next(it, False): - span.set_status(StatusCode.ERROR) - return (False, "Group does not exist or is empty") + if not next(it, False): + return (False, "Group does not exist or is empty") - return (True, None) + return (True, None) def iterate_group_members(self, group_lookup_args, page_size=None, disable_pagination=False): - with tracer.start_as_current_span( - "quay.iterate_group_members", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - "ldap.query": str(group_lookup_args), - }, - ) as span: - try: - # Verify the admin connection works first. We do this here to avoid wrapping - # the entire block in the INVALID CREDENTIALS check. - with self._ldap.get_connection(): - span.add_event( - "quay.verify_ldap_credentials.result", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout - or _DEFAULT_NETWORK_TIMEOUT, - }, - ) - span.set_status(StatusCode.OK) - pass - except ldap.INVALID_CREDENTIALS: - span.add_event( - "quay.verify_ldap_credentials.result", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout - or _DEFAULT_NETWORK_TIMEOUT, - }, - ) - return (None, "LDAP Admin dn or password is invalid") + try: + with self._ldap.get_connection(): + pass + except ldap.INVALID_CREDENTIALS: + return (None, "LDAP Admin dn or password is invalid") group_dn = group_lookup_args["group_dn"] memberof_attr = self._memberof_attr page_size = page_size or _DEFAULT_PAGE_SIZE - return ( - self._iterate_members(group_dn, memberof_attr, page_size, disable_pagination), - None, - ) + return (self._iterate_members(group_dn, memberof_attr, page_size, disable_pagination), None) def is_superuser(self, username_or_email: str) -> bool: if not username_or_email: return False - with tracer.start_as_current_span( - "quay.is_superuser", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - "quay.username_or_email": str(username_or_email), - }, - ) as span: - logger.debug("Looking up LDAP superuser username or email %s", username_or_email) - (found_user, err_msg) = self._ldap_single_user_search( - username_or_email, filter_superusers=True - ) - span.set_status(StatusCode.OK) - if found_user is None: - span.set_status(StatusCode.ERROR) - logger.debug("LDAP superuser %s not found: %s", username_or_email, err_msg) - return False - - logger.debug("Found superuser for LDAP username or email %s", username_or_email) - return True - - def has_superusers(self) -> bool: - with tracer.start_as_current_span( - "quay.has_superuser", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - }, - ) as span: - has_superusers, _ = self.at_least_one_user_exists(filter_superusers=True) - if has_superusers: - span.set_status(StatusCode.OK) - else: - span.set_status(StatusCode.ERROR) - return has_superusers - - def is_global_readonly_superuser(self, username_or_email: str) -> bool: - if not username_or_email: - return False try: lookup_robot(username_or_email) return False # Robots are not in LDAP so return False as not being a superuser @@ -1018,37 +558,48 @@ class LDAPUsers(FederatedUsers): # continue with LDAP lookup pass - with tracer.start_as_current_span( - "quay.is_global_readonly_superuser", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - "quay.username_or_email": str(username_or_email), - }, - ) as span: + logger.debug("Looking up LDAP superuser username or email %s", username_or_email) + (found_user, err_msg) = self._ldap_single_user_search( + username_or_email, filter_superusers=True + ) + if found_user is None: + logger.debug("LDAP superuser %s not found: %s", username_or_email, err_msg) + return False - logger.debug( - "Looking up LDAP global readonly superuser username or email %s", username_or_email - ) - (found_user, err_msg) = self._ldap_single_user_search( - username_or_email, filter_global_readonly_superusers=True - ) - if found_user is None: - logger.debug( - "LDAP global readonly superuser %s not found: %s", - username_or_email, - err_msg, - ) - span.set_status(StatusCode.ERROR) - return False + logger.debug("Found superuser for LDAP username or email %s", username_or_email) + return True - span.set_status(StatusCode.OK) + def has_superusers(self) -> bool: + has_superusers, _ = self.at_least_one_user_exists(filter_superusers=True) + return has_superusers + + def is_global_readonly_superuser(self, username_or_email: str) -> bool: + if not username_or_email: + return False + + try: + lookup_robot(username_or_email) + return False # Robots are not in LDAP so return False as not being a superuser + except InvalidRobotException: + # continue with LDAP lookup + pass + + logger.debug( + "Looking up LDAP global readonly superuser username or email %s", username_or_email + ) + (found_user, err_msg) = self._ldap_single_user_search( + username_or_email, filter_global_readonly_superusers=True + ) + if found_user is None: logger.debug( - "Found global readonly superuser for LDAP username or email %s", - username_or_email, + "LDAP global readonly superuser %s not found: %s", username_or_email, err_msg ) - return True + return False + + logger.debug( + "Found global readonly superuser for LDAP username or email %s", username_or_email + ) + return True def is_restricted_user(self, username_or_email: str) -> bool: if not username_or_email: @@ -1064,52 +615,23 @@ class LDAPUsers(FederatedUsers): # continue with LDAP lookup pass - with tracer.start_as_current_span( - "quay.is_restricted_user", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - "quay.username_or_email": str(username_or_email), - }, - ) as span: + logger.debug("Looking up LDAP restricted user username or email %s", username_or_email) + (found_user, err_msg) = self._ldap_single_user_search( + username_or_email, filter_restricted_users=True + ) + if found_user is None: + logger.debug("LDAP user %s not found: %s", username_or_email, err_msg) + return False - logger.debug( - "Looking up LDAP restricted user username or email %s", - username_or_email, - ) - (found_user, err_msg) = self._ldap_single_user_search( - username_or_email, - filter_restricted_users=True, - ) - if found_user is None: - logger.debug("LDAP user %s not found: %s", username_or_email, err_msg) - span.set_status(StatusCode.ERROR) - return False - - logger.debug("Found restricted user for LDAP username or email %s", username_or_email) - span.set_status(StatusCode.OK) - return True + logger.debug("Found restricted user for LDAP username or email %s", username_or_email) + return True def has_restricted_users(self) -> bool: - with tracer.start_as_current_span( - "quay.has_restricted_users", - attributes={ - "ldap.id": str(id(self)), - "ldap.uri": str(self._ldap_uri), - "ldap.network.timeout": self._ldap._network_timeout or _DEFAULT_NETWORK_TIMEOUT, - }, - ) as span: - if self._ldap_restricted_user_filter is None and self.at_least_one_user_exists(): - span.set_status(StatusCode.OK) - return True + if self._ldap_restricted_user_filter is None and self.at_least_one_user_exists(): + return True - has_restricted_users, _ = self.at_least_one_user_exists(filter_restricted_users=True) - if has_restricted_users: - span.set_status(StatusCode.OK) - else: - span.set_status(StatusCode.ERROR) - return has_restricted_users + has_restricted_users, _ = self.at_least_one_user_exists(filter_restricted_users=True) + return has_restricted_users def _iterate_members(self, group_dn, memberof_attr, page_size, disable_pagination): has_pagination = not (self._force_no_pagination or disable_pagination) @@ -1126,9 +648,7 @@ class LDAPUsers(FederatedUsers): # Conduct the initial search for users that are a member of the group. logger.debug( - "Conducting LDAP search of DN: %s and filter %s", - user_search_dn, - search_flt, + "Conducting LDAP search of DN: %s and filter %s", user_search_dn, search_flt ) try: if has_pagination: @@ -1141,10 +661,7 @@ class LDAPUsers(FederatedUsers): ) else: msgid = conn.search( - user_search_dn, - ldap.SCOPE_SUBTREE, - search_flt, - attrlist=attributes, + user_search_dn, ldap.SCOPE_SUBTREE, search_flt, attrlist=attributes ) except ldap.LDAPError as lde: logger.exception( diff --git a/endpoints/v1/__init__.py b/endpoints/v1/__init__.py index 95e3b8904..850ae79d1 100644 --- a/endpoints/v1/__init__.py +++ b/endpoints/v1/__init__.py @@ -3,10 +3,6 @@ from functools import wraps from flask import Blueprint, jsonify, make_response -from util.metrics.otel import trace - -tracer = trace.get_tracer("quay.v1") - import features from app import app from data.readreplica import ReadOnlyModeException @@ -28,9 +24,6 @@ def internal_ping(): @v1_bp.route("/_ping") @anon_allowed -@tracer.start_as_current_span( - "quay.endpoints.v1._ping", record_exception=True, set_status_on_exception=True -) def ping(): # NOTE: any changes made here must also be reflected in the nginx config response = make_response("true", 200) @@ -40,9 +33,6 @@ def ping(): @v1_bp.app_errorhandler(ReadOnlyModeException) -@tracer.start_as_current_span( - "quay.endpoints.v1.handle_readonly", record_exception=True, set_status_on_exception=True -) def handle_readonly(ex): response = jsonify( { diff --git a/endpoints/v1/index.py b/endpoints/v1/index.py index 507a2af9c..f4d354a27 100644 --- a/endpoints/v1/index.py +++ b/endpoints/v1/index.py @@ -5,11 +5,6 @@ from functools import wraps from flask import jsonify, make_response, request, session -from util.metrics.otel import trace - -tracer = trace.get_tracer("quay.v1.index") - - from app import app, docker_v2_signing_key, storage, userevents from auth.auth_context import get_authenticated_context, get_authenticated_user from auth.credentials import CredentialKind, validate_credentials @@ -110,9 +105,6 @@ def generate_headers(scope=GrantType.READ_REPOSITORY, add_grant_for_status=None) @v1_bp.route("/users/", methods=["POST"]) @anon_allowed @check_readonly -@tracer.start_as_current_span( - "quay.endpoints.v1.index.create_user", record_exception=True, set_status_on_exception=True -) def create_user(): user_data = request.get_json() if not user_data or not "username" in user_data: @@ -165,9 +157,6 @@ def create_user(): @v1_bp.route("/users/", methods=["GET"]) @process_auth @anon_allowed -@tracer.start_as_current_span( - "quay.endpoints.v1.index.get_user", record_exception=True, set_status_on_exception=True -) def get_user(): context = get_authenticated_context() if not context or context.is_anonymous: @@ -185,9 +174,6 @@ def get_user(): @process_auth @anon_allowed @check_readonly -@tracer.start_as_current_span( - "quay.endpoints.v1.index.update_user", record_exception=True, set_status_on_exception=True -) def update_user(username): permission = UserAdminPermission(username) if permission.can(): @@ -216,9 +202,6 @@ def update_user(username): @generate_headers(scope=GrantType.WRITE_REPOSITORY, add_grant_for_status=201) @anon_allowed @check_readonly -@tracer.start_as_current_span( - "quay.endpoints.v1.index.create_repository", record_exception=True, set_status_on_exception=True -) def create_repository(namespace_name, repo_name): # Verify that the repository name is valid. if not REPOSITORY_NAME_REGEX.match(repo_name): @@ -304,9 +287,6 @@ def create_repository(namespace_name, repo_name): @generate_headers(scope=GrantType.WRITE_REPOSITORY) @anon_allowed @check_readonly -@tracer.start_as_current_span( - "quay.endpoints.v1.index.update_images", record_exception=True, set_status_on_exception=True -) def update_images(namespace_name, repo_name): permission = ModifyRepositoryPermission(namespace_name, repo_name) if permission.can(): @@ -349,11 +329,6 @@ def update_images(namespace_name, repo_name): @ensure_namespace_enabled @generate_headers(scope=GrantType.READ_REPOSITORY) @anon_protect -@tracer.start_as_current_span( - "quay.endpoints.v1.index.get_repository_images", - record_exception=True, - set_status_on_exception=True, -) def get_repository_images(namespace_name, repo_name): repository_ref = registry_model.lookup_repository( namespace_name, repo_name, kind_filter="image" @@ -389,11 +364,6 @@ def get_repository_images(namespace_name, repo_name): @generate_headers(scope=GrantType.WRITE_REPOSITORY) @anon_allowed @check_readonly -@tracer.start_as_current_span( - "quay.endpoints.v1.index.delete_repository_images", - record_exception=True, - set_status_on_exception=True, -) def delete_repository_images(namespace_name, repo_name): abort(501, "Not Implemented", issue="not-implemented") @@ -405,11 +375,6 @@ def delete_repository_images(namespace_name, repo_name): @check_repository_state @anon_allowed @check_readonly -@tracer.start_as_current_span( - "quay.endpoints.v1.index.put_repository_auth", - record_exception=True, - set_status_on_exception=True, -) def put_repository_auth(namespace_name, repo_name): abort(501, "Not Implemented", issue="not-implemented") @@ -417,9 +382,6 @@ def put_repository_auth(namespace_name, repo_name): @v1_bp.route("/search", methods=["GET"]) @process_auth @anon_protect -@tracer.start_as_current_span( - "quay.endpoints.v1.index.get_search", record_exception=True, set_status_on_exception=True -) def get_search(): query = request.args.get("q") or "" diff --git a/endpoints/v1/registry.py b/endpoints/v1/registry.py index 05bbac982..4540dec76 100644 --- a/endpoints/v1/registry.py +++ b/endpoints/v1/registry.py @@ -4,10 +4,6 @@ from datetime import datetime from functools import wraps from time import time -from util.metrics.otel import trace - -tracer = trace.get_tracer("quay.v1.registry") - from flask import Response from flask import abort as flask_abort from flask import make_response, redirect, request, session @@ -90,11 +86,6 @@ def set_cache_headers(f): @require_completion @set_cache_headers @anon_protect -@tracer.start_as_current_span( - "quay.endpoints.v1.registry.head_image_layer", - record_exception=True, - set_status_on_exception=True, -) def head_image_layer(namespace, repository, image_id, headers): permission = ReadRepositoryPermission(namespace, repository) repository_ref = registry_model.lookup_repository(namespace, repository, kind_filter="image") @@ -135,11 +126,6 @@ def head_image_layer(namespace, repository, image_id, headers): @set_cache_headers @check_region_blacklisted() @anon_protect -@tracer.start_as_current_span( - "quay.endpoints.v1.registry.head_image_layer", - record_exception=True, - set_status_on_exception=True, -) def get_image_layer(namespace, repository, image_id, headers): permission = ReadRepositoryPermission(namespace, repository) repository_ref = registry_model.lookup_repository(namespace, repository, kind_filter="image") @@ -193,11 +179,6 @@ def get_image_layer(namespace, repository, image_id, headers): @check_repository_state @anon_protect @check_readonly -@tracer.start_as_current_span( - "quay.endpoints.v1.registry.put_image_layer", - record_exception=True, - set_status_on_exception=True, -) def put_image_layer(namespace, repository, image_id): logger.debug("Checking repo permissions") permission = ModifyRepositoryPermission(namespace, repository) @@ -303,11 +284,6 @@ def put_image_layer(namespace, repository, image_id): @check_repository_state @anon_protect @check_readonly -@tracer.start_as_current_span( - "quay.endpoints.v1.registry.put_image_checksum", - record_exception=True, - set_status_on_exception=True, -) def put_image_checksum(namespace, repository, image_id): logger.debug("Checking repo permissions") permission = ModifyRepositoryPermission(namespace, repository) @@ -371,9 +347,6 @@ def put_image_checksum(namespace, repository, image_id): @require_completion @set_cache_headers @anon_protect -@tracer.start_as_current_span( - "quay.endpoints.v1.registry.get_image_json", record_exception=True, set_status_on_exception=True -) def get_image_json(namespace, repository, image_id, headers): logger.debug("Checking repo permissions") permission = ReadRepositoryPermission(namespace, repository) @@ -406,11 +379,6 @@ def get_image_json(namespace, repository, image_id, headers): @require_completion @set_cache_headers @anon_protect -@tracer.start_as_current_span( - "quay.endpoints.v1.registry.get_image_ancestry", - record_exception=True, - set_status_on_exception=True, -) def get_image_ancestry(namespace, repository, image_id, headers): logger.debug("Checking repo permissions") permission = ReadRepositoryPermission(namespace, repository) @@ -437,9 +405,6 @@ def get_image_ancestry(namespace, repository, image_id, headers): @check_repository_state @anon_protect @check_readonly -@tracer.start_as_current_span( - "quay.endpoints.v1.registry.put_image_json", record_exception=True, set_status_on_exception=True -) def put_image_json(namespace, repository, image_id): logger.debug("Checking repo permissions") permission = ModifyRepositoryPermission(namespace, repository) diff --git a/endpoints/v1/tag.py b/endpoints/v1/tag.py index 101a15be6..bdaccdbdb 100644 --- a/endpoints/v1/tag.py +++ b/endpoints/v1/tag.py @@ -3,10 +3,6 @@ import logging from flask import abort, jsonify, make_response, request, session -from util.metrics.otel import trace - -tracer = trace.get_tracer("quay.v1.tag") - from app import docker_v2_signing_key, model_cache, storage from auth.decorators import process_auth from auth.permissions import ModifyRepositoryPermission, ReadRepositoryPermission @@ -29,9 +25,6 @@ logger = logging.getLogger(__name__) @process_auth @anon_protect @parse_repository_name() -@tracer.start_as_current_span( - "quay.endpoints.v1.tag.get_tags", record_exception=True, set_status_on_exception=True -) def get_tags(namespace_name, repo_name): permission = ReadRepositoryPermission(namespace_name, repo_name) repository_ref = registry_model.lookup_repository( @@ -51,9 +44,6 @@ def get_tags(namespace_name, repo_name): @process_auth @anon_protect @parse_repository_name() -@tracer.start_as_current_span( - "quay.endpoints.v1.tag.get_tag", record_exception=True, set_status_on_exception=True -) def get_tag(namespace_name, repo_name, tag): permission = ReadRepositoryPermission(namespace_name, repo_name) repository_ref = registry_model.lookup_repository( @@ -81,9 +71,6 @@ def get_tag(namespace_name, repo_name, tag): @check_repository_state @check_v1_push_enabled() @check_readonly -@tracer.start_as_current_span( - "quay.endpoints.v1.tag.put_tag", record_exception=True, set_status_on_exception=True -) def put_tag(namespace_name, repo_name, tag): permission = ModifyRepositoryPermission(namespace_name, repo_name) repository_ref = registry_model.lookup_repository( @@ -135,9 +122,6 @@ def put_tag(namespace_name, repo_name, tag): @check_repository_state @check_v1_push_enabled() @check_readonly -@tracer.start_as_current_span( - "quay.endpoints.v1.tag.delete_tag", record_exception=True, set_status_on_exception=True -) def delete_tag(namespace_name, repo_name, tag): permission = ModifyRepositoryPermission(namespace_name, repo_name) repository_ref = registry_model.lookup_repository( diff --git a/endpoints/v2/blob.py b/endpoints/v2/blob.py index 8548250f1..cee633287 100644 --- a/endpoints/v2/blob.py +++ b/endpoints/v2/blob.py @@ -1,10 +1,6 @@ import logging import re -from util.metrics.otel import StatusCode, trace - -tracer = trace.get_tracer("quay.endpoints.v2.blob") - from flask import Response from flask import abort as flask_abort from flask import redirect, request, url_for @@ -71,9 +67,6 @@ BLOB_CONTENT_TYPE = "application/octet-stream" @anon_allowed @cache_control(max_age=31436000) @inject_registry_model() -@tracer.start_as_current_span( - "quay.v2.blobs.check_blob_exists", record_exception=True, set_status_on_exception=True -) def check_blob_exists(namespace_name, repo_name, digest, registry_model): # Find the blob. blob = registry_model.get_cached_repo_blob(model_cache, namespace_name, repo_name, digest) @@ -104,9 +97,6 @@ def check_blob_exists(namespace_name, repo_name, digest, registry_model): @check_region_blacklisted(BlobDownloadGeoBlocked) @cache_control(max_age=31536000) @inject_registry_model() -@tracer.start_as_current_span( - "quay.v2.blobs.download_blob", record_exception=True, set_status_on_exception=True -) def download_blob(namespace_name, repo_name, digest, registry_model): # Find the blob. blob = registry_model.get_cached_repo_blob(model_cache, namespace_name, repo_name, digest) @@ -272,9 +262,6 @@ def _try_to_mount_blob(repository_ref, mount_blob_digest): @anon_protect @check_readonly @check_pushes_disabled -@tracer.start_as_current_span( - "quay.v2.blobs.start_blob_upload", record_exception=True, set_status_on_exception=True -) def start_blob_upload(namespace_name, repo_name): repository_ref = registry_model.lookup_repository(namespace_name, repo_name) @@ -347,9 +334,6 @@ def start_blob_upload(namespace_name, repo_name): @process_registry_jwt_auth(scopes=["pull"]) @require_repo_write(allow_for_superuser=True, disallow_for_restricted_users=True) @anon_protect -@tracer.start_as_current_span( - "quay.v2.blobs.fetch_existing_upload", record_exception=True, set_status_on_exception=True -) def fetch_existing_upload(namespace_name, repo_name, upload_uuid): repository_ref = registry_model.lookup_repository(namespace_name, repo_name) if repository_ref is None: @@ -380,9 +364,6 @@ def fetch_existing_upload(namespace_name, repo_name, upload_uuid): @anon_protect @check_readonly @check_pushes_disabled -@tracer.start_as_current_span( - "quay.v2.blobs.upload_chunk", record_exception=True, set_status_on_exception=True -) def upload_chunk(namespace_name, repo_name, upload_uuid): repository_ref = registry_model.lookup_repository(namespace_name, repo_name) if repository_ref is None: @@ -425,11 +406,6 @@ def upload_chunk(namespace_name, repo_name, upload_uuid): @require_repo_write(allow_for_superuser=True, disallow_for_restricted_users=True) @anon_protect @check_readonly -@tracer.start_as_current_span( - "quay.v2.blobs.monolithic_upload_or_last_chunk", - record_exception=True, - set_status_on_exception=True, -) def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid): # Ensure the digest is present before proceeding. @@ -483,9 +459,6 @@ def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid): @anon_protect @check_readonly @check_pushes_disabled -@tracer.start_as_current_span( - "quay.v2.blobs.cancel_upload", record_exception=True, set_status_on_exception=True -) def cancel_upload(namespace_name, repo_name, upload_uuid): repository_ref = registry_model.lookup_repository(namespace_name, repo_name) if repository_ref is None: @@ -509,9 +482,6 @@ def cancel_upload(namespace_name, repo_name, upload_uuid): @anon_protect @check_readonly @check_pushes_disabled -@tracer.start_as_current_span( - "quay.v2.blobs.delete_digest", record_exception=True, set_status_on_exception=True -) def delete_digest(namespace_name, repo_name, digest): # We do not support deleting arbitrary digests, as they break repo images. raise Unsupported() @@ -590,37 +560,26 @@ def _upload_chunk(blob_uploader, commit_digest=None): If commit_digest is specified, the upload is committed to a blob once the stream's data has been read and stored. """ - with tracer.start_as_current_span( - "quay._upload_chunk", - ) as span: - start_offset, length = _start_offset_and_length(request.headers.get("content-range")) - span.set_attribute("offset", str(start_offset)) - span.set_attribute("length", str(length)) + start_offset, length = _start_offset_and_length(request.headers.get("content-range")) + if None in {start_offset, length}: + raise InvalidRequest(message="Invalid range header") - if None in {start_offset, length}: - span.set_status(StatusCode.ERROR) - raise InvalidRequest(message="Invalid range header") + input_fp = get_input_stream(request) - input_fp = get_input_stream(request) + try: + # Upload the data received. + blob_uploader.upload_chunk(app.config, input_fp, start_offset, length) - try: - # Upload the data received. - blob_uploader.upload_chunk(app.config, input_fp, start_offset, length) - - if commit_digest is not None: - # Commit the upload to a blob. - return blob_uploader.commit_to_blob(app.config, commit_digest) - except BlobTooLargeException as ble: - span.record_exception(ble) - span.set_status(StatusCode.ERROR) - raise LayerTooLarge(uploaded=ble.uploaded, max_allowed=ble.max_allowed) - except BlobRangeMismatchException as ble: - span.record_exception(ble) - span.set_status(StatusCode.ERROR) - logger.exception("Exception when uploading blob to %s", blob_uploader.blob_upload_id) - _abort_range_not_satisfiable( - blob_uploader.blob_upload.byte_count, blob_uploader.blob_upload_id - ) - except BlobUploadException: - logger.exception("Exception when uploading blob to %s", blob_uploader.blob_upload_id) - raise BlobUploadInvalid() + if commit_digest is not None: + # Commit the upload to a blob. + return blob_uploader.commit_to_blob(app.config, commit_digest) + except BlobTooLargeException as ble: + raise LayerTooLarge(uploaded=ble.uploaded, max_allowed=ble.max_allowed) + except BlobRangeMismatchException: + logger.exception("Exception when uploading blob to %s", blob_uploader.blob_upload_id) + _abort_range_not_satisfiable( + blob_uploader.blob_upload.byte_count, blob_uploader.blob_upload_id + ) + except BlobUploadException: + logger.exception("Exception when uploading blob to %s", blob_uploader.blob_upload_id) + raise BlobUploadInvalid() diff --git a/endpoints/v2/catalog.py b/endpoints/v2/catalog.py index d045b45d1..ab4e97233 100644 --- a/endpoints/v2/catalog.py +++ b/endpoints/v2/catalog.py @@ -1,9 +1,5 @@ from collections import namedtuple -from util.metrics.otel import trace - -tracer = trace.get_tracer("quay.endpoints.v2.catalog") - from flask import jsonify import features @@ -30,9 +26,6 @@ class Repository(namedtuple("Repository", ["id", "namespace_name", "name"])): @process_registry_jwt_auth() @anon_protect @paginate() -@tracer.start_as_current_span( - "quay.endpoints.v2.catalog.catalog_search", record_exception=True, set_status_on_exception=True -) def catalog_search(start_id, limit, pagination_callback): def _load_catalog(): include_public = bool(features.PUBLIC_CATALOG) diff --git a/endpoints/v2/manifest.py b/endpoints/v2/manifest.py index 339c2db69..c86259011 100644 --- a/endpoints/v2/manifest.py +++ b/endpoints/v2/manifest.py @@ -3,10 +3,6 @@ from functools import wraps from flask import Response, request, url_for -from util.metrics.otel import trace - -tracer = trace.get_tracer("quay.v2.manifest") - import features from app import app, model_cache, pullmetrics, storage from auth.registry_jwt_auth import process_registry_jwt_auth @@ -75,11 +71,6 @@ MANIFEST_TAGNAME_ROUTE = BASE_MANIFEST_ROUTE.format(VALID_TAG_PATTERN) @require_repo_read(allow_for_superuser=True, allow_for_global_readonly_superuser=True) @anon_protect @inject_registry_model() -@tracer.start_as_current_span( - "quay.v2.manifest.fetch_manifest_by_tagname", - record_exception=True, - set_status_on_exception=True, -) def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref, registry_model): try: repository_ref = registry_model.lookup_repository( @@ -163,9 +154,6 @@ def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref, registry_ @require_repo_read(allow_for_superuser=True, allow_for_global_readonly_superuser=True) @anon_protect @inject_registry_model() -@tracer.start_as_current_span( - "quay.v2.manifest.fetch_manifest_by_digest", record_exception=True, set_status_on_exception=True -) def fetch_manifest_by_digest(namespace_name, repo_name, manifest_ref, registry_model): try: repository_ref = registry_model.lookup_repository( @@ -305,11 +293,6 @@ def _doesnt_accept_schema_v1(): @anon_protect @check_readonly @check_pushes_disabled -@tracer.start_as_current_span( - "quay.v2.manifest.write_manifest_by_tagname", - record_exception=True, - set_status_on_exception=True, -) def write_manifest_by_tagname(namespace_name, repo_name, manifest_ref): parsed = _parse_manifest(request.content_type, request.data) @@ -336,9 +319,6 @@ def _enqueue_blobs_for_replication(manifest, storage, namespace_name): @anon_protect @check_readonly @check_pushes_disabled -@tracer.start_as_current_span( - "quay.v2.manifest.write_manifest_by_digest", record_exception=True, set_status_on_exception=True -) def write_manifest_by_digest(namespace_name, repo_name, manifest_ref): parsed = _parse_manifest(request.content_type, request.data) if parsed.digest != manifest_ref: @@ -416,11 +396,6 @@ def _parse_manifest(content_type, request_data): @anon_protect @check_readonly @check_pushes_disabled -@tracer.start_as_current_span( - "quay.v2.manifest.delete_manifest_by_digest", - record_exception=True, - set_status_on_exception=True, -) def delete_manifest_by_digest(namespace_name, repo_name, manifest_ref): """ Delete the manifest specified by the digest. @@ -458,9 +433,6 @@ def delete_manifest_by_digest(namespace_name, repo_name, manifest_ref): @anon_protect @check_readonly @check_pushes_disabled -@tracer.start_as_current_span( - "quay.v2.manifest.delete_manifest_by_tag", record_exception=True, set_status_on_exception=True -) def delete_manifest_by_tag(namespace_name, repo_name, manifest_ref): """ Deletes the manifest specified by the tag. diff --git a/endpoints/v2/tag.py b/endpoints/v2/tag.py index 6c282acb4..e9490f822 100644 --- a/endpoints/v2/tag.py +++ b/endpoints/v2/tag.py @@ -1,9 +1,5 @@ from flask import jsonify -from util.metrics.otel import trace - -tracer = trace.get_tracer("quay.endpoints.v2.tag") - from app import app, model_cache from auth.registry_jwt_auth import process_registry_jwt_auth from data.registry_model import registry_model @@ -28,9 +24,6 @@ from endpoints.v2.errors import NameUnknown, TooManyTagsRequested @require_repo_read(allow_for_superuser=True, allow_for_global_readonly_superuser=True) @anon_protect @oci_tag_paginate() -@tracer.start_as_current_span( - "quay.endpoints.v2.tag.list_all_tags", record_exception=True, set_status_on_exception=True -) def list_all_tags(namespace_name, repo_name, last_pagination_tag_name, limit, pagination_callback): repository_ref = registry_model.lookup_repository(namespace_name, repo_name) if repository_ref is None: diff --git a/endpoints/v2/v2auth.py b/endpoints/v2/v2auth.py index a977ed3c9..3a869193b 100644 --- a/endpoints/v2/v2auth.py +++ b/endpoints/v2/v2auth.py @@ -2,10 +2,6 @@ import logging import re from collections import namedtuple -from util.metrics.otel import trace - -tracer = trace.get_tracer("quay.endpoints.v2.v2auth") - from cachetools.func import lru_cache from flask import jsonify, request @@ -70,11 +66,6 @@ scopeResult = namedtuple( @process_basic_auth @no_cache @anon_protect -@tracer.start_as_current_span( - "quay.endpoints.v2.v2auth.generate_registry_jwt", - record_exception=True, - set_status_on_exception=True, -) def generate_registry_jwt(auth_result): """ This endpoint will generate a JWT conforming to the Docker Registry v2 Auth Spec: diff --git a/endpoints/web.py b/endpoints/web.py index 2bb166c1e..0756d3f82 100644 --- a/endpoints/web.py +++ b/endpoints/web.py @@ -3,10 +3,6 @@ import logging import os from datetime import datetime, timedelta -from util.metrics.otel import StatusCode, get_tracecontext, trace - -tracer = trace.get_tracer("quay.endpoints.web") - from cachetools.func import lru_cache from flask import ( Blueprint, @@ -108,37 +104,23 @@ STATUS_TAGS = app.config["STATUS_TAGS"] @web.route("/", methods=["GET"], defaults={"path": ""}) @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.index", record_exception=True, set_status_on_exception=True -) def index(path, **kwargs): return render_page_template_with_routedata("index.html", **kwargs) @web.route("/_internal_ping") @anon_allowed -@tracer.start_as_current_span( - "quay.endpoints.web.internal_ping", record_exception=True, set_status_on_exception=True -) def internal_ping(): return make_response("true", 200) @web.route("/500", methods=["GET"]) -@tracer.start_as_current_span( - "quay.endpoints.web.internal_error_display", record_exception=True, set_status_on_exception=True -) def internal_error_display(): return render_page_template_with_routedata("500.html") @web.errorhandler(404) @web.route("/404", methods=["GET"]) -@tracer.start_as_current_span( - "quay.endpoints.web.not_found_error_display", - record_exception=True, - set_status_on_exception=True, -) def not_found_error_display(e=None): resp = index("", error_code=404, error_info=dict(reason="notfound")) resp.status_code = 404 @@ -146,9 +128,6 @@ def not_found_error_display(e=None): @web.route("/opensearch.xml") -@tracer.start_as_current_span( - "quay.endpoints.web.opensearch", record_exception=True, set_status_on_exception=True -) def opensearch(): template = render_template( "opensearch.xml", @@ -163,9 +142,6 @@ def opensearch(): @web.route("/organization/", methods=["GET"]) @web.route("/organization//", methods=["GET"]) @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.org_view", record_exception=True, set_status_on_exception=True -) def org_view(path): return index("") @@ -173,9 +149,6 @@ def org_view(path): @web.route("/user/", methods=["GET"]) @web.route("/user//", methods=["GET"]) @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.user_view", record_exception=True, set_status_on_exception=True -) def user_view(path): return index("") @@ -183,27 +156,18 @@ def user_view(path): @web.route("/plans/") @no_cache @route_show_if(features.BILLING) -@tracer.start_as_current_span( - "quay.endpoints.web.plans", record_exception=True, set_status_on_exception=True -) def plans(): return index("") @web.route("/search") @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.search", record_exception=True, set_status_on_exception=True -) def search(): return index("") @web.route("/guide/") @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.guide", record_exception=True, set_status_on_exception=True -) def guide(): return index("") @@ -211,18 +175,12 @@ def guide(): @web.route("/tour/") @web.route("/tour/") @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.tour", record_exception=True, set_status_on_exception=True -) def tour(path=""): return index("") @web.route("/tutorial/") @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.tutorial", record_exception=True, set_status_on_exception=True -) def tutorial(): return index("") @@ -230,9 +188,6 @@ def tutorial(): @web.route("/organizations/") @web.route("/organizations/new/") @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.organizations", record_exception=True, set_status_on_exception=True -) def organizations(): return index("") @@ -240,9 +195,6 @@ def organizations(): @web.route("/superuser/") @no_cache @route_show_if(features.SUPER_USERS) -@tracer.start_as_current_span( - "quay.endpoints.web.superuser", record_exception=True, set_status_on_exception=True -) def superuser(): return index("") @@ -250,63 +202,42 @@ def superuser(): @web.route("/setup/") @no_cache @route_show_if(features.SUPER_USERS) -@tracer.start_as_current_span( - "quay.endpoints.web.setup", record_exception=True, set_status_on_exception=True -) def setup(): return index("") @web.route("/signin/") @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.signin", record_exception=True, set_status_on_exception=True -) def signin(redirect=None): return index("") @web.route("/contact/") @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.contact", record_exception=True, set_status_on_exception=True -) def contact(): return index("") @web.route("/about/") @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.about", record_exception=True, set_status_on_exception=True -) def about(): return index("") @web.route("/new/") @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.new", record_exception=True, set_status_on_exception=True -) def new(): return index("") @web.route("/updateuser") @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.updateuser", record_exception=True, set_status_on_exception=True -) def updateuser(): return index("") @web.route("/confirminvite") @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.confirm_invite", record_exception=True, set_status_on_exception=True -) def confirm_invite(): code = request.values["code"] return index("", code=code) @@ -315,27 +246,18 @@ def confirm_invite(): @web.route("/repository/", defaults={"path": ""}) @web.route("/repository/", methods=["GET"]) @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.repository", record_exception=True, set_status_on_exception=True -) def repository(path): return index("") @web.route("/repository//trigger/", methods=["GET"]) @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.buildtrigger", record_exception=True, set_status_on_exception=True -) def buildtrigger(path, trigger): return index("") @web.route("/security/") @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.security", record_exception=True, set_status_on_exception=True -) def security(): return index("") @@ -343,18 +265,12 @@ def security(): @web.route("/enterprise/") @no_cache @route_show_if(features.BILLING) -@tracer.start_as_current_span( - "quay.endpoints.web.enterprise", record_exception=True, set_status_on_exception=True -) def enterprise(): return redirect("/plans?tab=enterprise") @web.route("/__exp/") @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.exp", record_exception=True, set_status_on_exception=True -) def exp(expname): return index("") @@ -362,27 +278,18 @@ def exp(expname): @web.route("/v1") @web.route("/v1/") @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.v1", record_exception=True, set_status_on_exception=True -) def v1(): return index("") @web.route("/tos", methods=["GET"]) @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.tos", record_exception=True, set_status_on_exception=True -) def tos(): return index("") @web.route("/privacy", methods=["GET"]) @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.privacy", record_exception=True, set_status_on_exception=True -) def privacy(): return index("") @@ -391,9 +298,6 @@ def privacy(): @web.route("/health/instance", methods=["GET"]) @process_auth_or_cookie @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.instance_health", record_exception=True, set_status_on_exception=True -) def instance_health(): checker = get_healthchecker(app, config_provider, instance_keys) (data, status_code) = checker.check_instance() @@ -406,9 +310,6 @@ def instance_health(): @web.route("/health/endtoend", methods=["GET"]) @process_auth_or_cookie @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.endtoend_health", record_exception=True, set_status_on_exception=True -) def endtoend_health(): checker = get_healthchecker(app, config_provider, instance_keys) (data, status_code) = checker.check_endtoend() @@ -420,9 +321,6 @@ def endtoend_health(): @web.route("/health/warning", methods=["GET"]) @process_auth_or_cookie @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.warning_health", record_exception=True, set_status_on_exception=True -) def warning_health(): checker = get_healthchecker(app, config_provider, instance_keys) (data, status_code) = checker.check_warning() @@ -435,9 +333,6 @@ def warning_health(): @route_show_if(features.BILLING) # Since this is only used in production. @process_auth_or_cookie @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.dbrevision_health", record_exception=True, set_status_on_exception=True -) def dbrevision_health(): # Find the revision from the database. result = db.execute_sql("select * from alembic_version limit 1").fetchone() @@ -461,9 +356,6 @@ def dbrevision_health(): @web.route("/health/enabledebug/", methods=["GET"]) @no_cache -@tracer.start_as_current_span( - "quay.endpoints.web.enable_health_debug", record_exception=True, set_status_on_exception=True -) def enable_health_debug(secret): if not secret: abort(404) @@ -488,9 +380,6 @@ def robots(): @web.route("/buildlogs/", methods=["GET"]) @route_show_if(features.BUILD_SUPPORT) @process_auth_or_cookie -@tracer.start_as_current_span( - "quay.endpoints.web.buildlogs", record_exception=True, set_status_on_exception=True -) def buildlogs(build_uuid): found_build = model.build.get_repository_build(build_uuid) if not found_build: @@ -519,9 +408,6 @@ def buildlogs(build_uuid): @web.route("/exportedlogs/", methods=["GET"]) -@tracer.start_as_current_span( - "quay.endpoints.web.exportedlogs", record_exception=True, set_status_on_exception=True -) def exportedlogs(file_id): # Only enable this endpoint if local storage is available. has_local_storage = False @@ -554,9 +440,6 @@ def exportedlogs(file_id): @web.route("/logarchive/", methods=["GET"]) @route_show_if(features.BUILD_SUPPORT) @process_auth_or_cookie -@tracer.start_as_current_span( - "quay.endpoints.web.logarchive", record_exception=True, set_status_on_exception=True -) def logarchive(file_id): JSON_MIMETYPE = "application/json" try: @@ -588,9 +471,6 @@ def logarchive(file_id): @web.route("/receipt", methods=["GET"]) @route_show_if(features.BILLING) @require_session_login -@tracer.start_as_current_span( - "quay.endpoints.web.receipt", record_exception=True, set_status_on_exception=True -) def receipt(): if not current_user.is_authenticated: abort(401) @@ -628,9 +508,6 @@ def receipt(): @web.route("/authrepoemail", methods=["GET"]) @route_show_if(features.MAILING) -@tracer.start_as_current_span( - "quay.endpoints.web.confirm_repo_email", record_exception=True, set_status_on_exception=True -) def confirm_repo_email(): code = request.values["code"] record = None @@ -658,9 +535,6 @@ def confirm_repo_email(): @web.route("/confirm", methods=["GET"]) @route_show_if(features.MAILING) @anon_allowed -@tracer.start_as_current_span( - "quay.endpoints.web.confirm_email", record_exception=True, set_status_on_exception=True -) def confirm_email(): code = request.values["code"] user = None @@ -696,9 +570,6 @@ def confirm_email(): @web.route("/recovery", methods=["GET"]) @route_show_if(features.MAILING) @anon_allowed -@tracer.start_as_current_span( - "quay.endpoints.web.confirm_recovery", record_exception=True, set_status_on_exception=True -) def confirm_recovery(): code = request.values["code"] user = model.user.validate_reset_code(code) @@ -720,9 +591,6 @@ def confirm_recovery(): @web.route("/repository//status", methods=["GET"]) @parse_repository_name() @anon_protect -@tracer.start_as_current_span( - "quay.endpoints.web.build_status_badge", record_exception=True, set_status_on_exception=True -) def build_status_badge(namespace_name, repo_name): token = request.args.get("token", None) repo = model.repository.get_repository(namespace_name, repo_name) @@ -766,9 +634,6 @@ class FlaskAuthorizationProvider(model.oauth.DatabaseAuthorizationProvider): @web.route("/oauth/authorizeapp", methods=["POST"]) @process_auth_or_cookie -@tracer.start_as_current_span( - "quay.endpoints.web.authorize_application", record_exception=True, set_status_on_exception=True -) def authorize_application(): # Check for an authenticated user. if not get_authenticated_user(): @@ -816,9 +681,6 @@ def authorize_application(): @web.route(app.config["LOCAL_OAUTH_HANDLER"], methods=["GET"]) -@tracer.start_as_current_span( - "quay.endpoints.web.oauth_local_handler", record_exception=True, set_status_on_exception=True -) def oauth_local_handler(): if not current_user.is_authenticated: abort(401) @@ -837,9 +699,6 @@ def oauth_local_handler(): @web.route("/oauth/denyapp", methods=["POST"]) @csrf_protect() -@tracer.start_as_current_span( - "quay.endpoints.web.deny_application", record_exception=True, set_status_on_exception=True -) def deny_application(): if not current_user.is_authenticated: abort(401) @@ -860,11 +719,6 @@ def deny_application(): @param_required("redirect_uri") @param_required("scope") @process_auth_or_cookie -@tracer.start_as_current_span( - "quay.endpoints.web.request_authorization_code", - record_exception=True, - set_status_on_exception=True, -) def request_authorization_code(): provider = FlaskAuthorizationProvider() response_type = request.args.get("response_type", "code") @@ -987,9 +841,6 @@ def request_authorization_code(): @param_required("scope") @param_required("username") @process_auth_or_cookie -@tracer.start_as_current_span( - "quay.endpoints.web.assign_user_to_app", record_exception=True, set_status_on_exception=True -) def assign_user_to_app(): response_type = request.args.get("response_type", "code") client_id = request.args.get("client_id", None) @@ -1048,11 +899,6 @@ def assign_user_to_app(): @param_required("redirect_uri", allow_body=True) @param_required("code", allow_body=True) @param_required("scope", allow_body=True) -@tracer.start_as_current_span( - "quay.endpoints.web.exchange_code_for_token", - record_exception=True, - set_status_on_exception=True, -) def exchange_code_for_token(): grant_type = request.values.get("grant_type", None) client_id = request.values.get("client_id", None) @@ -1075,11 +921,6 @@ def exchange_code_for_token(): @require_session_login @parse_repository_name() @route_show_if(features.BITBUCKET_BUILD) -@tracer.start_as_current_span( - "quay.endpoints.web.attach_bitbucket_trigger", - record_exception=True, - set_status_on_exception=True, -) def attach_bitbucket_trigger(namespace_name, repo_name): permission = AdministerRepositoryPermission(namespace_name, repo_name) if permission.can(): @@ -1114,11 +955,6 @@ def attach_bitbucket_trigger(namespace_name, repo_name): @web.route("/customtrigger/setup/", methods=["GET"]) @require_session_login @parse_repository_name() -@tracer.start_as_current_span( - "quay.endpoints.web.attach_custom_build_trigger", - record_exception=True, - set_status_on_exception=True, -) def attach_custom_build_trigger(namespace_name, repo_name): permission = AdministerRepositoryPermission(namespace_name, repo_name) if permission.can(): @@ -1147,9 +983,6 @@ def attach_custom_build_trigger(namespace_name, repo_name): @process_oauth @parse_repository_name(include_tag=True) @anon_protect -@tracer.start_as_current_span( - "quay.endpoints.web.redirect_to_repository", record_exception=True, set_status_on_exception=True -) def redirect_to_repository(namespace_name, repo_name, tag_name): # Always return 200 for ac-discovery, to ensure that rkt and other ACI-compliant clients can # find the metadata they need. Permissions will be checked in the registry API. @@ -1202,9 +1035,6 @@ def redirect_to_repository(namespace_name, repo_name, tag_name): @no_cache @process_oauth @anon_protect -@tracer.start_as_current_span( - "quay.endpoints.web.redirect_to_namespace", record_exception=True, set_status_on_exception=True -) def redirect_to_namespace(namespace): okay, _ = model.user.validate_username(namespace) if not okay: @@ -1229,9 +1059,6 @@ def has_users(): @web.route("/api/v1/user/initialize", methods=["POST"]) @route_show_if(features.USER_INITIALIZE) -@tracer.start_as_current_span( - "quay.endpoints.web.user_initialize", record_exception=True, set_status_on_exception=True -) def user_initialize(): """ Create initial user in an empty database @@ -1304,9 +1131,6 @@ def user_initialize(): @web.route("/config", methods=["GET", "OPTIONS"]) @crossorigin(anonymous=False) -@tracer.start_as_current_span( - "quay.endpoints.web.config", record_exception=True, set_status_on_exception=True -) def config(): version_number = "" if not features.BILLING: diff --git a/storage/cloud.py b/storage/cloud.py index 7f732e6f9..cafb5588d 100644 --- a/storage/cloud.py +++ b/storage/cloud.py @@ -8,10 +8,6 @@ from io import BufferedIOBase, BytesIO, StringIO from itertools import chain from uuid import uuid4 -from util.metrics.otel import StatusCode, get_tracecontext, trace - -tracer = trace.get_tracer("cloud.py") - import boto3.session import botocore.config import botocore.exceptions @@ -161,80 +157,17 @@ class _CloudStorage(BaseStorageV2): if deferred_refreshable_credentials: self._session._session._credentials = deferred_refreshable_credentials - def response_tracing_headers(self, **kwargs): - name = f"boto3.{str(kwargs.get('event_name', 'before-send.s3.unknown')).split('.')[-1]}" - with tracer.start_as_current_span(name) as span: - sctx = span.get_span_context() - for k in kwargs.get("parsed", {}): - if isinstance(kwargs["parsed"][k], bytes): - span.set_attribute(k, kwargs["parsed"][k].decode("utf8")) - else: - span.set_attribute(k, str(kwargs["parsed"][k])) - for k in kwargs.get("context", {}): - if isinstance(kwargs.get("context")[k], bytes): - span.set_attribute(k, kwargs["context"][k].decode("utf8")) - else: - span.set_attribute(k, str(kwargs["context"][k])) - - def add_tracing_headers(self, request, **kwargs): - name = f"boto3.{str(kwargs.get('event_name', 'before-send.s3.unknown')).split('.')[-1]}" - with tracer.start_as_current_span(name) as span: - sctx = span.get_span_context() - span.set_attribute("method", str(request.method)) - span.set_attribute("hook-name", str(kwargs.get("event_name", "unknown"))) - for k in request.headers: - if isinstance(request.headers[k], bytes): - span.set_attribute(k, request.headers[k].decode("utf8")) - else: - span.set_attribute(k, str(request.headers[k])) - - try: - request.headers[ - "traceparent" - ] = f"00-{hex(sctx.trace_id)[2:]}-{hex(sctx.span_id)[2:]}-01" - request.headers["x-b3-traceid"] = hex(sctx.trace_id)[2:] - request.headers["x-b3-spanid"] = hex(sctx.span_id)[2:] - request.headers["x-b3-parentspanid"] = hex(sctx.span_id)[2:] - request.headers["x-b3-sampled"] = "1" - logger.error(f"[OTEL] request {request.headers}") - except Exception as trerr: - logger.error(f"OTEL {trerr}") - - def create_trace(self, operation_name, params, **kwargs): - try: - # ctx = get_tracecontext() - name = "boto3." + str(operation_name) - with tracer.start_as_current_span(name) as span: - for k in params: - span.set_attribute(k, str(params[k])) - for k in kwargs: - span.set_attribute(k, str(kwargs[k])) - span.set_status(StatusCode.OK) - except Exception as trerr: - logger.error(f"OTEL createtraces {trerr}") - def _initialize_cloud_conn(self): if not self._initialized: - with tracer.start_as_current_span( - "quay.storage.initialize", - ) as span: - - span.set_status(StatusCode.ERROR) - for k in self._connect_kwargs: - span.set_attribute(k, str(self._connect_kwargs[k])) - self._session.events.register("before-send.s3.*", self.add_tracing_headers) - self._session.events.register("after-call.s3.*", self.response_tracing_headers) - # Low-level client. Needed to generate presigned urls - self._cloud_conn = self._session.client("s3", **self._connect_kwargs) - self._cloud_bucket = self._session.resource("s3", **self._connect_kwargs).Bucket( - self._bucket_name - ) - - # This will raise a ClientError if the bucket does ot exists. - # We actually want an exception raised if the bucket does not exists (same as in boto2) - self._cloud_conn.head_bucket(Bucket=self._bucket_name) - span.set_status(StatusCode.OK) - self._initialized = True + # Low-level client. Needed to generate presigned urls + self._cloud_conn = self._session.client("s3", **self._connect_kwargs) + self._cloud_bucket = self._session.resource("s3", **self._connect_kwargs).Bucket( + self._bucket_name + ) + # This will raise a ClientError if the bucket does ot exists. + # We actually want an exception raised if the bucket does not exists (same as in boto2) + self._cloud_conn.head_bucket(Bucket=self._bucket_name) + self._initialized = True def _debug_key(self, obj): """ @@ -289,9 +222,6 @@ class _CloudStorage(BaseStorageV2): def get_cloud_bucket(self): return self._cloud_bucket - @tracer.start_as_current_span( - "quay.storage.get_content", record_exception=True, set_status_on_exception=True - ) def get_content(self, path): self._initialize_cloud_conn() path = self._init_path(path) @@ -306,9 +236,6 @@ class _CloudStorage(BaseStorageV2): raise - @tracer.start_as_current_span( - "quay.storage.put_content", record_exception=True, set_status_on_exception=True - ) def put_content(self, path, content): self._initialize_cloud_conn() path = self._init_path(path) @@ -319,42 +246,22 @@ class _CloudStorage(BaseStorageV2): def get_supports_resumable_downloads(self): return True - @tracer.start_as_current_span( - "quay.storage.get_direct_download_url", record_exception=True, set_status_on_exception=True - ) def get_direct_download_url( self, path, request_ip=None, expires_in=60, requires_cors=False, head=False, **kwargs ): self._initialize_cloud_conn() path = self._init_path(path) - with tracer.start_as_current_span( - "quay.storage.presigned_url", - attributes=dict( - path=str(path), - request_ip=str(request_ip), - expires_in=expires_in, - requires_cors=requires_cors, - head=head, - ), - ) as span: + client_method = "get_object" + if head: + client_method = "head_object" - client_method = "get_object" - if head: - client_method = "head_object" + return self.get_cloud_conn().generate_presigned_url( + client_method, + Params={"Bucket": self._bucket_name, "Key": path}, + ExpiresIn=expires_in, + ) - uri = self.get_cloud_conn().generate_presigned_url( - client_method, - Params={"Bucket": self._bucket_name, "Key": path}, - ExpiresIn=expires_in, - ) - span.set_attribute("presign_url", uri) - span.set_status(StatusCode.OK) - return uri - - @tracer.start_as_current_span( - "quay.storage.get_direct_upload_url", record_exception=True, set_status_on_exception=True - ) def get_direct_upload_url(self, path, mime_type, requires_cors=True): self._initialize_cloud_conn() path = self._init_path(path) @@ -368,12 +275,8 @@ class _CloudStorage(BaseStorageV2): ExpiresIn=300, ) - @tracer.start_as_current_span( - "quay.storage.stream_read", record_exception=True, set_status_on_exception=True - ) def stream_read(self, path): self._initialize_cloud_conn() - path = self._init_path(path) obj = self.get_cloud_bucket().Object(path) try: @@ -390,12 +293,8 @@ class _CloudStorage(BaseStorageV2): break yield data - @tracer.start_as_current_span( - "quay.storage.stream_read_file", record_exception=True, set_status_on_exception=True - ) def stream_read_file(self, path): self._initialize_cloud_conn() - path = self._init_path(path) obj = self.get_cloud_bucket().Object(path) try: @@ -406,15 +305,9 @@ class _CloudStorage(BaseStorageV2): raise return StreamReadKeyAsFile(obj.get()["Body"]) - @tracer.start_as_current_span( - "quay.storage.__initiate_multipart_upload", - record_exception=True, - set_status_on_exception=True, - ) def __initiate_multipart_upload(self, path, content_type, content_encoding): # Minimum size of upload part size on S3 is 5MB self._initialize_cloud_conn() - path = self._init_path(path) obj = self.get_cloud_bucket().Object(path) @@ -528,12 +421,8 @@ class _CloudStorage(BaseStorageV2): return total_bytes_written, write_error - @tracer.start_as_current_span( - "quay.storage.exists", record_exception=True, set_status_on_exception=True - ) def exists(self, path): self._initialize_cloud_conn() - path = self._init_path(path) obj = self.get_cloud_bucket().Object(path) try: @@ -544,12 +433,8 @@ class _CloudStorage(BaseStorageV2): raise return True - @tracer.start_as_current_span( - "quay.storage.remove", record_exception=True, set_status_on_exception=True - ) def remove(self, path): self._initialize_cloud_conn() - path = self._init_path(path) obj = self.get_cloud_bucket().Object(path) try: @@ -569,12 +454,8 @@ class _CloudStorage(BaseStorageV2): obj = self.get_cloud_bucket().Object(content["Key"]) obj.delete() - @tracer.start_as_current_span( - "quay.storage.checksum", record_exception=True, set_status_on_exception=True - ) def get_checksum(self, path): self._initialize_cloud_conn() - path = self._init_path(path) obj = self.get_cloud_bucket().Object(path) try: @@ -586,9 +467,6 @@ class _CloudStorage(BaseStorageV2): return obj.e_tag[1:-1][:7] - @tracer.start_as_current_span( - "quay.storage.copy_to", record_exception=True, set_status_on_exception=True - ) def copy_to(self, destination, path): """ Copies the given path from this storage to the destination storage. @@ -659,9 +537,6 @@ class _CloudStorage(BaseStorageV2): return random_uuid, metadata - @tracer.start_as_current_span( - "quay.storage.stream_upload_chunk", record_exception=True, set_status_on_exception=True - ) def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata, content_type=None): self._initialize_cloud_conn() @@ -734,16 +609,13 @@ class _CloudStorage(BaseStorageV2): except botocore.exceptions.ClientError as s3re: # sometimes HTTPStatusCode isn't set for some reason, so we need # to protect ourselves against a KeyError. - with tracer.start_as_current_span( - "boto3.Exception", attributes={"args": str(args), "kwargs": str(kwargs)} - ) as span: - if ( - remaining_retries - and s3re.response["Error"].get("HTTPStatusCode", 0) == 200 - and s3re.response["Error"].get("Code", "") == "InternalError" - ): - # Weird internal error case. Retry. - continue + if ( + remaining_retries + and s3re.response["Error"].get("HTTPStatusCode", 0) == 200 + and s3re.response["Error"].get("Code", "") == "InternalError" + ): + # Weird internal error case. Retry. + continue # Otherwise, raise it. logger.exception("Exception trying to perform action %s", action) @@ -768,9 +640,6 @@ class _CloudStorage(BaseStorageV2): ): yield subchunk - @tracer.start_as_current_span( - "quay.storage.complete_chunked_upload", record_exception=True, set_status_on_exception=True - ) def complete_chunked_upload(self, uuid, final_path, storage_metadata, force_client_side=False): self._initialize_cloud_conn() chunk_list = self._chunk_list_from_metadata(storage_metadata) @@ -820,56 +689,26 @@ class _CloudStorage(BaseStorageV2): # [_PartUpload] upload_parts = [] - for index, chunk in enumerate(updated_chunks): abs_chunk_path = self._init_path(chunk.path) - with tracer.start_as_current_span( - "quay.storage.action_with_retry", - attributes={ - "action": "copy_from", - "Bucket": self.get_cloud_bucket().name, - "Key": chunk.path, - "Range": f"bytes={chunk.offset}-{chunk.length + chunk.offset - 1}", - }, - ) as span: - part_copy = self._perform_action_with_retry( - mpu.Part(index + 1).copy_from, - CopySource={ - "Bucket": self.get_cloud_bucket().name, - "Key": abs_chunk_path, - }, - CopySourceRange="bytes=%s-%s" - % (chunk.offset, chunk.length + chunk.offset - 1), - ) - span.set_status(StatusCode.OK) - - span.add_event( - "quay.storag.part.append", - attributes={ - "index": index + 1, - "etag": part_copy["CopyPartResult"]["ETag"], - }, - ) - upload_parts.append( - _PartUpload(index + 1, part_copy["CopyPartResult"]["ETag"]) - ) - - with tracer.start_as_current_span( - "quay.storage.action_with_retry", - attributes={ - "action": "complete", - "Bucket": self.get_cloud_bucket().name, - }, - ) as span: - self._perform_action_with_retry( - mpu.complete, - MultipartUpload={ - "Parts": [ - {"ETag": p.e_tag, "PartNumber": p.part_number} for p in upload_parts - ] - }, + part_copy = self._perform_action_with_retry( + mpu.Part(index + 1).copy_from, + CopySource={"Bucket": self.get_cloud_bucket().name, "Key": abs_chunk_path}, + CopySourceRange="bytes=%s-%s" + % (chunk.offset, chunk.length + chunk.offset - 1), ) + + upload_parts.append(_PartUpload(index + 1, part_copy["CopyPartResult"]["ETag"])) + + self._perform_action_with_retry( + mpu.complete, + MultipartUpload={ + "Parts": [ + {"ETag": p.e_tag, "PartNumber": p.part_number} for p in upload_parts + ] + }, + ) except (botocore.exceptions.ClientError, IOError) as ioe: # Something bad happened, log it and then give up msg = "Exception when attempting server-side assembly for: %s" @@ -882,9 +721,6 @@ class _CloudStorage(BaseStorageV2): # pass that to stream_write to chunk and upload the final object. self._client_side_chunk_join(final_path, chunk_list) - @tracer.start_as_current_span( - "quay.storage.cancel_chunked_upload", record_exception=True, set_status_on_exception=True - ) def cancel_chunked_upload(self, uuid, storage_metadata): self._initialize_cloud_conn() @@ -892,9 +728,6 @@ class _CloudStorage(BaseStorageV2): for chunk in self._chunk_list_from_metadata(storage_metadata): self.remove(chunk.path) - @tracer.start_as_current_span( - "quay.storage.clean_partial_uploads", record_exception=True, set_status_on_exception=True - ) def clean_partial_uploads(self, deletion_date_threshold): self._initialize_cloud_conn() path = self._init_path("uploads") diff --git a/util/config/schema.py b/util/config/schema.py index 5bba2c7dd..9baae1272 100644 --- a/util/config/schema.py +++ b/util/config/schema.py @@ -1653,36 +1653,15 @@ CONFIG_SCHEMA = { "description": "name of service in otel spans", "x-example": "quay", }, - "OTEL_EXPORTER_OTLP_ENDPOINT": { + "dt_api_url": { "type": "string", - "description": "url for OTLP api environment overwrites", - "x-example": "https://dynatrace-api.example:443", + "description": "url for dynatrace api", + "x-example": "https://dynatrace-api.example", }, - "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT": { + "dt_api_token": { "type": "string", - "description": "url for OTLP api environment overwrites", - "x-example": "https://dynatrace-api.example:443", - }, - "OTEL_EXPORTER_OTLP_HEADERS": { - "type": "object", - "description": "headers to include in API calls", - "x-example": {"Authorization": "Bearer api-token", "X-Org-Tenant": "some"}, - "items": {"type": "dict"}, - }, - "OTEL_EXPORTER_OTLP_INSECURE": { - "type": "boolean", - "description": "skip ssl verification for API connections", - "x-example": "true", - }, - "OTEL_EXPORTER_OTLP_PROTOCOL": { - "type": "string", - "description": "MUST be one of: grpc, http/protobuf, http/json", - "x-example": "http/protobuf", - }, - "OTEL_TRACES_SAMPLER_ARG": { - "type": "float", - "description": "Sampling decission rate as float", - "x-example": "0.001 # ( 1 / 1000)", + "description": "token for dynatrace api", + "x-example": "sometoken", }, }, }, diff --git a/util/metrics/otel.py b/util/metrics/otel.py index 3c7c76b23..00edaadc3 100644 --- a/util/metrics/otel.py +++ b/util/metrics/otel.py @@ -1,32 +1,13 @@ -import os from functools import wraps -from urllib.parse import urlparse -import opentelemetry.sdk.trace.id_generator as idg -from flask import request -from opentelemetry import context, trace -from opentelemetry.context.context import Context -from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( - OTLPSpanExporter as grpcOTLPSpanExporter, -) -from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( - OTLPSpanExporter as httpOTLPSpanExporter, -) -from opentelemetry.sdk.resources import ( - SERVICE_NAME, - SERVICE_NAMESPACE, - SERVICE_VERSION, - Resource, -) +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.sdk.trace.sampling import TraceIdRatioBased -from opentelemetry.trace import NonRecordingSpan, Span, SpanContext -from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator -from opentelemetry.trace.status import StatusCode import features -from _init import _get_version_number as get_quay_version def init_exporter(app_config): @@ -34,134 +15,29 @@ def init_exporter(app_config): otel_config = app_config.get("OTEL_CONFIG", {}) service_name = otel_config.get("service_name", "quay") - service_namespace = os.environ.get("QE_K8S_NAMESPACE", "standalone") - service_version = get_quay_version() - # compile headers if present - otel_headers = otel_config.get( - "OTEL_EXPORTER_OTLP_HEADERS", otel_config.get("OTEL_EXPORTER_OTLP_TRACES_HEADERS", {}) - ) - if otel_headers == {}: - # accept Environ overwrites - otel_headers = os.environ.get( - "OTEL_EXPORTER_OTLP_HEADERS", - os.environ.get("OTEL_EXPORTER_OTLP_TRACES_HEADERS", ""), - ) - # OTLP headers from environment are formatted as key=value,key2=value2 - # to ensure catching the format we - # first need to split by comma "," - # than split by equal "=" only once (example x-auth-sentry=sentry sentry_key=xxx) - try: - # will return a list of tuples [("x-auth-sentry", "sentry sentry_key=xxx"), ("x-tenant", "name")] - # that we transform into a dict - otel_headers = dict( - list(map(lambda x: tuple(x.strip().split("=", 1)), otel_headers.strip().split(","))) - ) - except ValueError: - # if empty [('',)] we cannot form a dict out of the parser - otel_headers = {} - # according to https://opentelemetry.io/docs/specs/otel/protocol/exporter/#endpoint-urls-for-otlphttp - # none signal specific configuration needs to append /v1/traces - otel_endpoint = otel_config.get( - "OTEL_EXPORTER_OTLP_ENDPOINT", - "http://127.0.0.1:80/v1/traces", - ) - if not urlparse(otel_endpoint).path.endswith("/v1/traces"): - otel_endpoint += "/v1/traces" + DT_API_URL = otel_config.get("dt_api_url", None) + DT_API_TOKEN = otel_config.get("dt_api_token", None) - if otel_endpoint == "http://127.0.0.1:80/v1/traces": - # accept Environ overwrites - # signal specific configuration needs to be unmodified - otel_endpoint = os.environ.get( - "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", - os.environ.get("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", otel_endpoint), - ) - otel_insecure = otel_config.get( - "OTEL_EXPORTER_OTLP_INSECURE", otel_config.get("OTEL_EXPORTER_OTLP_TRACES_INSECURE", False) - ) - if not otel_insecure: - # accept Environ overwrites - otel_insecure = bool( - os.environ.get( - "OTEL_EXPORTER_OTLP_INSECURE", - os.environ.get("OTEL_EXPORTER_OTLP_TRACES_INSECURE", False), - ) - ) + resource = Resource.create(attributes={SERVICE_NAME: service_name}) - resource = Resource.create( - attributes={ - SERVICE_NAME: service_name, - SERVICE_NAMESPACE: service_namespace, - SERVICE_VERSION: service_version, - } - ) - - # according to https://opentelemetry.io/docs/specs/otel/protocol/exporter/#specify-protocol - # http/protobuf should be the default and w should support grpc - if otel_config.get("OTEL_EXPORTER_OTLP_PROTOCOL", "http") == "grpc": - OTLPSpanExporter = grpcOTLPSpanExporter - elif otel_config.get("OTEL_EXPORTER_OTLP_PROTOCOL", "http") in ("http", "http/protobuf"): - OTLPSpanExporter = httpOTLPSpanExporter - elif otel_config.get("OTEL_EXPORTER_OTLP_PROTOCOL", "http") in ("http", "http/json"): - # http/json is only may which is why we fallback to http/protobuf - OTLPSpanExporter = httpOTLPSpanExporter - - # we should leave this to the collector to decide - otel_sample_arg = float(otel_config.get("OTEL_TRACES_SAMPLER_ARG", 0.001)) - if otel_sample_arg == 0.001: - # accept Environ overwrites - otel_sample_arg = float(os.environ.get("OTEL_TRACES_SAMPLER_ARG", otel_sample_arg)) - - sampler = TraceIdRatioBased(otel_sample_arg) + sampler = TraceIdRatioBased(1 / 1000) tracerProvider = TracerProvider(resource=resource, sampler=sampler) - processor = BatchSpanProcessor( - OTLPSpanExporter( - endpoint=otel_endpoint, - headers=otel_headers, + if DT_API_URL is not None and DT_API_TOKEN is not None: + processor = BatchSpanProcessor( + OTLPSpanExporter( + endpoint=DT_API_URL + "/v1/traces", + headers={"Authorization": "Api-Token " + DT_API_TOKEN}, + ) ) - ) + else: + spanExporter = OTLPSpanExporter(endpoint="http://jaeger:4317") + processor = BatchSpanProcessor(spanExporter) tracerProvider.add_span_processor(processor) trace.set_tracer_provider(tracerProvider) -def get_tracecontext(custom: str = "", headers: dict = {}) -> Context: - # used to extract trace context from headers or generate one if empty - # with a generated tracecontext that is inherited we do not generate multiple - # trace contexts on methods that are not related/chained - def create_random(): - while True: - span_id = hex(idg.RandomIdGenerator().generate_span_id()) - if len(span_id) == 18: - break - return f"00-{hex(idg.RandomIdGenerator().generate_trace_id())[2:]}" + f"-{span_id[2:]}-01" - - if custom == "": - carrier = {"traceparent": headers.get("traceparent", create_random())} - else: - carrier = {"traceparent": custom} - ctx = TraceContextTextMapPropagator().extract(carrier) - if ctx == {}: - ctx = context.get_current() - return ctx - - -def get_traceparent(_ctx) -> str: - # used to output traceparent into logs - try: - span = _ctx.get(list(_ctx)[0]).get_span_context() - except: - try: - span = _ctx.get_span_context() - except: - span = _ctx - try: - tp = f"00-{hex(span.trace_id)[2:]}-{hex(span.span_id)[2:]}-0{hex(span.trace_flags)[2:]}" - except Exception as err: - tp = "" - return tp - - def traced(span_name=None): """ Decorator for tracing function calls using OpenTelemetry. @@ -171,19 +47,14 @@ def traced(span_name=None): @wraps(func) def wrapper(*args, **kwargs): if features.OTEL_TRACING: - ctx = get_tracecontext(request.headers) tracer = trace.get_tracer(__name__) name = span_name if span_name else func.__name__ - with tracer.start_as_current_span(name, context=ctx) as span: + with tracer.start_as_current_span(name) as span: try: - span.set_status(StatusCode.OK) - span.set_attribute("function", name) - span.set_attribute("args", str(args)) - span.set_attribute("kwargs", str(kwargs)) return func(*args, **kwargs) except Exception as e: span.record_exception(e) - span.set_status(StatusCode.ERROR) + span.set_status(trace.Status(trace.StatusCode.ERROR)) else: return func(*args, **kwargs) diff --git a/util/test/test_otel.py b/util/test/test_otel.py deleted file mode 100644 index 9b7e5370a..000000000 --- a/util/test/test_otel.py +++ /dev/null @@ -1,61 +0,0 @@ -import opentelemetry.exporter.otlp.proto.grpc.trace_exporter -import opentelemetry.exporter.otlp.proto.http.trace_exporter -import pytest -from opentelemetry import context, trace - -from util.metrics.otel import init_exporter - -# check various configuration options -app_configs = [ - { - "OTEL_CONFIG": { - "service_name": "unittest", - "OTEL_EXPORTER_OTLP_HEADERS": { - "Authorization": "Api-Token abcdef", - "Tenant-Id": "Tenant 1", - }, - "OTEL_EXPORTER_OTLP_ENDPOINT": "https://otlp.example.com", - "OTEL_EXPORTER_OTLP_INSECURE": False, - "OTEL_EXPORTER_OTLP_PROTOCOL": "http", - "OTEL_TRACES_SAMPLER_ARG": 0.005, - }, - "FEATURE_ENABLE_OTEL": True, - }, -] - - -@pytest.fixture() -def test_otel_config(): - # since we can set tracer_provider only once, we go with the default one - app.config["OTEL_CONFIG"] = app_configs[0]["OTEL_CONFIG"] - app.config["FEATURE_OTEL_TRACING"] = True - init_exporter(app) - tracer = trace.get_tracer("unittest") - assert tracer.sampler.rate == 0.005 - assert tracer.resource.attributes.get("service.name") == "unittest" - assert tracer.resource.attributes.get("service.namespace") == "standalone" - assert tracer.resource.attributes.get("service.version", False) - # convert to dict so we do not need to differentiate between http/grpc headers - assert ( - dict(tracer.span_processor._span_processors[0].span_exporter._headers).get("Authorization") - == "Api-Token abcdef" - ) - assert ( - dict(tracer.span_processor._span_processors[0].span_exporter._headers).get("Tenant-Id") - == "Tenant 1" - ) - - # http exporter - assert ( - tracer.span_processor._span_processors[0].span_exporter._endpoint - == "https://otlp.example.com/v1/traces" - ) - - # grpc exporter - # assert tracer.span_processor._span_processors[0].span_exporter._endpoint == "otlp.example.com" - - assert isinstance( - tracer.span_processor._span_processors[0].span_exporter, - opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter, - ) - # assert isinstance(tracer.span_processor._span_processors[0].span_exporter, opentelemetry.exporter.otlp.proto.grpc.trace_exporter.OTLPSpanExporter)