mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00

Revert "tracing: improving otlp handling (PROJQUAY-8902) (#4198)"

This reverts commit 89e758846f.
Author: Jordi Piriz
Date: 2025-11-03 11:09:33 +01:00 (committed via GitHub)
parent cb5796bfbb
commit d3a05331ef
17 changed files with 329 additions and 1619 deletions

app.py (21 changes)
View File

@@ -11,7 +11,6 @@ from flask_mail import Mail
from flask_principal import Principal
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
from opentelemetry.trace.status import StatusCode
from werkzeug.exceptions import HTTPException
from werkzeug.middleware.proxy_fix import ProxyFix
@@ -57,7 +56,7 @@ from util.ipresolver import IPResolver
from util.label_validator import LabelValidator
from util.log import filter_logs
from util.marketplace import MarketplaceSubscriptionApi, MarketplaceUserApi
from util.metrics.otel import get_tracecontext, get_traceparent, init_exporter
from util.metrics.otel import init_exporter
from util.metrics.prometheus import PrometheusPlugin
from util.names import urn_generator
from util.pullmetrics import PullMetricsBuilderModule
@@ -371,27 +370,9 @@ def load_user(user_uuid):
get_app_url = partial(get_app_url, app.config)
if features.OTEL_TRACING:
def request_hook(span, environ):
if span and span.is_recording():
for header in request.headers:
span.set_attribute(header[0], str(header[1]))
def response_hook(span, status, response_headers):
if span and span.is_recording():
status_code = int(status.split()[0])
if all([status_code >= 200, status_code < 400]):
span.set_status(StatusCode.OK)
else:
span.set_status(StatusCode.ERROR)
tparent = get_traceparent(span)
response_headers.append(("traceparent", tparent))
FlaskInstrumentor().instrument_app(
app,
excluded_urls=app.config.get("OTEL_TRACING_EXCLUDED_URLS", None),
request_hook=request_hook,
response_hook=response_hook,
)
Psycopg2Instrumentor().instrument()
init_exporter(app.config)
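
The block removed above attached custom request/response hooks to the Flask auto-instrumentation so that every span carried the request headers and a traceparent was echoed back to the caller. For reference, a minimal, self-contained sketch of that hook pattern follows; it illustrates the opentelemetry-instrumentation-flask API, not Quay's exact helpers, and the attribute name is a placeholder.

# Sketch only: FlaskInstrumentor hooks with a hand-built W3C traceparent.
from flask import Flask
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.trace.status import StatusCode

app = Flask(__name__)

def request_hook(span, environ):
    # Runs before the request is handled; attach request metadata to the span.
    if span and span.is_recording():
        span.set_attribute("http.request.path", environ.get("PATH_INFO", ""))

def response_hook(span, status, response_headers):
    # Runs before the response is sent; set span status and echo the trace id.
    if span and span.is_recording():
        status_code = int(status.split()[0])
        span.set_status(StatusCode.OK if 200 <= status_code < 400 else StatusCode.ERROR)
        ctx = span.get_span_context()
        # Trailing "01" assumes the span is sampled.
        response_headers.append(
            ("traceparent", f"00-{ctx.trace_id:032x}-{ctx.span_id:016x}-01")
        )

FlaskInstrumentor().instrument_app(
    app,
    request_hook=request_hook,
    response_hook=response_hook,
)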

View File

@@ -4,10 +4,6 @@ from datetime import datetime, timedelta
from prometheus_client import Counter, Gauge
from util.metrics.otel import StatusCode, get_tracecontext, trace
tracer = trace.get_tracer("quay.queue")
from data.database import QueueItem, db, db_for_update, db_random_func
from util.morecollections import AttrDict
@@ -102,9 +98,6 @@ class WorkQueue(object):
~(QueueItem.queue_name << running_query)
)
@tracer.start_as_current_span(
"quay.queue.num_alive_jobs", record_exception=True, set_status_on_exception=True
)
def num_alive_jobs(self, canonical_name_list):
"""
Returns the number of alive queue items with a given prefix.
@@ -123,9 +116,6 @@ class WorkQueue(object):
.count()
)
@tracer.start_as_current_span(
"quay.queue.num_available_jobs_between", record_exception=True, set_status_on_exception=True
)
def num_available_jobs_between(
self, available_min_time, available_max_time, canonical_name_list
):
@@ -152,9 +142,6 @@ class WorkQueue(object):
def _item_by_id_for_update(queue_id):
return db_for_update(QueueItem.select().where(QueueItem.id == queue_id)).get()
@tracer.start_as_current_span(
"quay.queue.get_metrics", record_exception=True, set_status_on_exception=True
)
def get_metrics(self):
now = datetime.utcnow()
name_match_query = self._name_match_query()
@@ -174,18 +161,12 @@ class WorkQueue(object):
return (running_count, available_not_running_count, available_count)
@tracer.start_as_current_span(
"quay.queue.update_metrics", record_exception=True, set_status_on_exception=True
)
def update_metrics(self):
(running_count, available_not_running_count, available_count) = self.get_metrics()
queue_items_locked.labels(self._queue_name).set(running_count)
queue_items_available.labels(self._queue_name).set(available_count)
queue_items_available_unlocked.labels(self._queue_name).set(available_not_running_count)
@tracer.start_as_current_span(
"quay.queue.has_retries_remaining", record_exception=True, set_status_on_exception=True
)
def has_retries_remaining(self, item_id):
"""
Returns whether the queue item with the given id has any retries remaining.
@@ -198,9 +179,6 @@ class WorkQueue(object):
except QueueItem.DoesNotExist:
return False
@tracer.start_as_current_span(
"quay.queue.delete_namespaced_items", record_exception=True, set_status_on_exception=True
)
def delete_namespaced_items(self, namespace, subpath=None):
"""
Deletes all items in this queue that exist under the given namespace.
@@ -212,9 +190,6 @@ class WorkQueue(object):
queue_prefix = "%s/%s/%s%%" % (self._queue_name, namespace, subpath_query)
return QueueItem.delete().where(QueueItem.queue_name**queue_prefix).execute()
@tracer.start_as_current_span(
"quay.queue.alive", record_exception=True, set_status_on_exception=True
)
def alive(self, canonical_name_list):
"""
Returns True if a job matching the canonical name list is currently processing or available.
@@ -265,9 +240,6 @@ class WorkQueue(object):
queue_item_puts.labels(self._queue_name).inc(len(current_batch))
remaining = remaining[batch_size:]
@tracer.start_as_current_span(
"quay.queue.get", record_exception=True, set_status_on_exception=True
)
def put(self, canonical_name_list, message, available_after=0, retries_remaining=5):
"""
Put an item, if it shouldn't be processed for some number of seconds, specify that amount as
@@ -344,9 +316,6 @@ class WorkQueue(object):
changed = set_unavailable_query.execute()
return changed == 1
@tracer.start_as_current_span(
"quay.queue.get", record_exception=True, set_status_on_exception=True
)
def get(self, processing_time=300, ordering_required=False):
"""
Get an available item and mark it as unavailable for the default of five minutes.
@@ -382,9 +351,6 @@ class WorkQueue(object):
}
)
@tracer.start_as_current_span(
"quay.queue.cancel", record_exception=True, set_status_on_exception=True
)
def cancel(self, item_id):
"""
Attempts to cancel the queue item with the given ID from the queue.
@@ -394,15 +360,9 @@ class WorkQueue(object):
count_removed = QueueItem.delete().where(QueueItem.id == item_id).execute()
return count_removed > 0
@tracer.start_as_current_span(
"quay.queue.complete", record_exception=True, set_status_on_exception=True
)
def complete(self, completed_item):
self._currently_processing = not self.cancel(completed_item.id)
@tracer.start_as_current_span(
"quay.queue.incomplete", record_exception=True, set_status_on_exception=True
)
def incomplete(self, incomplete_item, retry_after=300, restore_retry=False):
with self._transaction_factory(db):
retry_date = datetime.utcnow() + timedelta(seconds=retry_after)
@@ -421,9 +381,6 @@ class WorkQueue(object):
except QueueItem.DoesNotExist:
return False
@tracer.start_as_current_span(
"quay.queue.extend_processing", record_exception=True, set_status_on_exception=True
)
def extend_processing(
self, item, seconds_from_now, minimum_extension=MINIMUM_EXTENSION, updated_data=None
):
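
Everything this file loses is the same decorator-based tracing pattern: tracer.start_as_current_span wraps a method so the call runs inside a span, and exceptions are recorded and reflected in the span status automatically. A minimal sketch of that pattern, with a placeholder span name rather than one of the queue's real span names:

# Sketch only: the decorator form of start_as_current_span.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider

trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer("quay.queue")

@tracer.start_as_current_span(
    "quay.queue.example", record_exception=True, set_status_on_exception=True
)
def count_alive(names):
    # Runs inside an active span; if this raises, the exception is recorded
    # on the span and the span status is set to ERROR before propagating.
    return len(names)

print(count_alive(["buildpack", "gc"]))  # 2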

File diff suppressed because it is too large.

View File

@@ -3,10 +3,6 @@ from functools import wraps
from flask import Blueprint, jsonify, make_response
from util.metrics.otel import trace
tracer = trace.get_tracer("quay.v1")
import features
from app import app
from data.readreplica import ReadOnlyModeException
@@ -28,9 +24,6 @@ def internal_ping():
@v1_bp.route("/_ping")
@anon_allowed
@tracer.start_as_current_span(
"quay.endpoints.v1._ping", record_exception=True, set_status_on_exception=True
)
def ping():
# NOTE: any changes made here must also be reflected in the nginx config
response = make_response("true", 200)
@@ -40,9 +33,6 @@ def ping():
@v1_bp.app_errorhandler(ReadOnlyModeException)
@tracer.start_as_current_span(
"quay.endpoints.v1.handle_readonly", record_exception=True, set_status_on_exception=True
)
def handle_readonly(ex):
response = jsonify(
{

View File

@@ -5,11 +5,6 @@ from functools import wraps
from flask import jsonify, make_response, request, session
from util.metrics.otel import trace
tracer = trace.get_tracer("quay.v1.index")
from app import app, docker_v2_signing_key, storage, userevents
from auth.auth_context import get_authenticated_context, get_authenticated_user
from auth.credentials import CredentialKind, validate_credentials
@@ -110,9 +105,6 @@ def generate_headers(scope=GrantType.READ_REPOSITORY, add_grant_for_status=None)
@v1_bp.route("/users/", methods=["POST"])
@anon_allowed
@check_readonly
@tracer.start_as_current_span(
"quay.endpoints.v1.index.create_user", record_exception=True, set_status_on_exception=True
)
def create_user():
user_data = request.get_json()
if not user_data or not "username" in user_data:
@@ -165,9 +157,6 @@ def create_user():
@v1_bp.route("/users/", methods=["GET"])
@process_auth
@anon_allowed
@tracer.start_as_current_span(
"quay.endpoints.v1.index.get_user", record_exception=True, set_status_on_exception=True
)
def get_user():
context = get_authenticated_context()
if not context or context.is_anonymous:
@@ -185,9 +174,6 @@ def get_user():
@process_auth
@anon_allowed
@check_readonly
@tracer.start_as_current_span(
"quay.endpoints.v1.index.update_user", record_exception=True, set_status_on_exception=True
)
def update_user(username):
permission = UserAdminPermission(username)
if permission.can():
@@ -216,9 +202,6 @@ def update_user(username):
@generate_headers(scope=GrantType.WRITE_REPOSITORY, add_grant_for_status=201)
@anon_allowed
@check_readonly
@tracer.start_as_current_span(
"quay.endpoints.v1.index.create_repository", record_exception=True, set_status_on_exception=True
)
def create_repository(namespace_name, repo_name):
# Verify that the repository name is valid.
if not REPOSITORY_NAME_REGEX.match(repo_name):
@@ -304,9 +287,6 @@ def create_repository(namespace_name, repo_name):
@generate_headers(scope=GrantType.WRITE_REPOSITORY)
@anon_allowed
@check_readonly
@tracer.start_as_current_span(
"quay.endpoints.v1.index.update_images", record_exception=True, set_status_on_exception=True
)
def update_images(namespace_name, repo_name):
permission = ModifyRepositoryPermission(namespace_name, repo_name)
if permission.can():
@@ -349,11 +329,6 @@ def update_images(namespace_name, repo_name):
@ensure_namespace_enabled
@generate_headers(scope=GrantType.READ_REPOSITORY)
@anon_protect
@tracer.start_as_current_span(
"quay.endpoints.v1.index.get_repository_images",
record_exception=True,
set_status_on_exception=True,
)
def get_repository_images(namespace_name, repo_name):
repository_ref = registry_model.lookup_repository(
namespace_name, repo_name, kind_filter="image"
@@ -389,11 +364,6 @@ def get_repository_images(namespace_name, repo_name):
@generate_headers(scope=GrantType.WRITE_REPOSITORY)
@anon_allowed
@check_readonly
@tracer.start_as_current_span(
"quay.endpoints.v1.index.delete_repository_images",
record_exception=True,
set_status_on_exception=True,
)
def delete_repository_images(namespace_name, repo_name):
abort(501, "Not Implemented", issue="not-implemented")
@@ -405,11 +375,6 @@ def delete_repository_images(namespace_name, repo_name):
@check_repository_state
@anon_allowed
@check_readonly
@tracer.start_as_current_span(
"quay.endpoints.v1.index.put_repository_auth",
record_exception=True,
set_status_on_exception=True,
)
def put_repository_auth(namespace_name, repo_name):
abort(501, "Not Implemented", issue="not-implemented")
@@ -417,9 +382,6 @@ def put_repository_auth(namespace_name, repo_name):
@v1_bp.route("/search", methods=["GET"])
@process_auth
@anon_protect
@tracer.start_as_current_span(
"quay.endpoints.v1.index.get_search", record_exception=True, set_status_on_exception=True
)
def get_search():
query = request.args.get("q") or ""

View File

@@ -4,10 +4,6 @@ from datetime import datetime
from functools import wraps
from time import time
from util.metrics.otel import trace
tracer = trace.get_tracer("quay.v1.registry")
from flask import Response
from flask import abort as flask_abort
from flask import make_response, redirect, request, session
@@ -90,11 +86,6 @@ def set_cache_headers(f):
@require_completion
@set_cache_headers
@anon_protect
@tracer.start_as_current_span(
"quay.endpoints.v1.registry.head_image_layer",
record_exception=True,
set_status_on_exception=True,
)
def head_image_layer(namespace, repository, image_id, headers):
permission = ReadRepositoryPermission(namespace, repository)
repository_ref = registry_model.lookup_repository(namespace, repository, kind_filter="image")
@@ -135,11 +126,6 @@ def head_image_layer(namespace, repository, image_id, headers):
@set_cache_headers
@check_region_blacklisted()
@anon_protect
@tracer.start_as_current_span(
"quay.endpoints.v1.registry.head_image_layer",
record_exception=True,
set_status_on_exception=True,
)
def get_image_layer(namespace, repository, image_id, headers):
permission = ReadRepositoryPermission(namespace, repository)
repository_ref = registry_model.lookup_repository(namespace, repository, kind_filter="image")
@@ -193,11 +179,6 @@ def get_image_layer(namespace, repository, image_id, headers):
@check_repository_state
@anon_protect
@check_readonly
@tracer.start_as_current_span(
"quay.endpoints.v1.registry.put_image_layer",
record_exception=True,
set_status_on_exception=True,
)
def put_image_layer(namespace, repository, image_id):
logger.debug("Checking repo permissions")
permission = ModifyRepositoryPermission(namespace, repository)
@@ -303,11 +284,6 @@ def put_image_layer(namespace, repository, image_id):
@check_repository_state
@anon_protect
@check_readonly
@tracer.start_as_current_span(
"quay.endpoints.v1.registry.put_image_checksum",
record_exception=True,
set_status_on_exception=True,
)
def put_image_checksum(namespace, repository, image_id):
logger.debug("Checking repo permissions")
permission = ModifyRepositoryPermission(namespace, repository)
@@ -371,9 +347,6 @@ def put_image_checksum(namespace, repository, image_id):
@require_completion
@set_cache_headers
@anon_protect
@tracer.start_as_current_span(
"quay.endpoints.v1.registry.get_image_json", record_exception=True, set_status_on_exception=True
)
def get_image_json(namespace, repository, image_id, headers):
logger.debug("Checking repo permissions")
permission = ReadRepositoryPermission(namespace, repository)
@@ -406,11 +379,6 @@ def get_image_json(namespace, repository, image_id, headers):
@require_completion
@set_cache_headers
@anon_protect
@tracer.start_as_current_span(
"quay.endpoints.v1.registry.get_image_ancestry",
record_exception=True,
set_status_on_exception=True,
)
def get_image_ancestry(namespace, repository, image_id, headers):
logger.debug("Checking repo permissions")
permission = ReadRepositoryPermission(namespace, repository)
@@ -437,9 +405,6 @@ def get_image_ancestry(namespace, repository, image_id, headers):
@check_repository_state
@anon_protect
@check_readonly
@tracer.start_as_current_span(
"quay.endpoints.v1.registry.put_image_json", record_exception=True, set_status_on_exception=True
)
def put_image_json(namespace, repository, image_id):
logger.debug("Checking repo permissions")
permission = ModifyRepositoryPermission(namespace, repository)

View File

@@ -3,10 +3,6 @@ import logging
from flask import abort, jsonify, make_response, request, session
from util.metrics.otel import trace
tracer = trace.get_tracer("quay.v1.tag")
from app import docker_v2_signing_key, model_cache, storage
from auth.decorators import process_auth
from auth.permissions import ModifyRepositoryPermission, ReadRepositoryPermission
@@ -29,9 +25,6 @@ logger = logging.getLogger(__name__)
@process_auth
@anon_protect
@parse_repository_name()
@tracer.start_as_current_span(
"quay.endpoints.v1.tag.get_tags", record_exception=True, set_status_on_exception=True
)
def get_tags(namespace_name, repo_name):
permission = ReadRepositoryPermission(namespace_name, repo_name)
repository_ref = registry_model.lookup_repository(
@@ -51,9 +44,6 @@ def get_tags(namespace_name, repo_name):
@process_auth
@anon_protect
@parse_repository_name()
@tracer.start_as_current_span(
"quay.endpoints.v1.tag.get_tag", record_exception=True, set_status_on_exception=True
)
def get_tag(namespace_name, repo_name, tag):
permission = ReadRepositoryPermission(namespace_name, repo_name)
repository_ref = registry_model.lookup_repository(
@@ -81,9 +71,6 @@ def get_tag(namespace_name, repo_name, tag):
@check_repository_state
@check_v1_push_enabled()
@check_readonly
@tracer.start_as_current_span(
"quay.endpoints.v1.tag.put_tag", record_exception=True, set_status_on_exception=True
)
def put_tag(namespace_name, repo_name, tag):
permission = ModifyRepositoryPermission(namespace_name, repo_name)
repository_ref = registry_model.lookup_repository(
@@ -135,9 +122,6 @@ def put_tag(namespace_name, repo_name, tag):
@check_repository_state
@check_v1_push_enabled()
@check_readonly
@tracer.start_as_current_span(
"quay.endpoints.v1.tag.delete_tag", record_exception=True, set_status_on_exception=True
)
def delete_tag(namespace_name, repo_name, tag):
permission = ModifyRepositoryPermission(namespace_name, repo_name)
repository_ref = registry_model.lookup_repository(

View File

@@ -1,10 +1,6 @@
import logging
import re
from util.metrics.otel import StatusCode, trace
tracer = trace.get_tracer("quay.endpoints.v2.blob")
from flask import Response
from flask import abort as flask_abort
from flask import redirect, request, url_for
@@ -71,9 +67,6 @@ BLOB_CONTENT_TYPE = "application/octet-stream"
@anon_allowed
@cache_control(max_age=31436000)
@inject_registry_model()
@tracer.start_as_current_span(
"quay.v2.blobs.check_blob_exists", record_exception=True, set_status_on_exception=True
)
def check_blob_exists(namespace_name, repo_name, digest, registry_model):
# Find the blob.
blob = registry_model.get_cached_repo_blob(model_cache, namespace_name, repo_name, digest)
@@ -104,9 +97,6 @@ def check_blob_exists(namespace_name, repo_name, digest, registry_model):
@check_region_blacklisted(BlobDownloadGeoBlocked)
@cache_control(max_age=31536000)
@inject_registry_model()
@tracer.start_as_current_span(
"quay.v2.blobs.download_blob", record_exception=True, set_status_on_exception=True
)
def download_blob(namespace_name, repo_name, digest, registry_model):
# Find the blob.
blob = registry_model.get_cached_repo_blob(model_cache, namespace_name, repo_name, digest)
@@ -272,9 +262,6 @@ def _try_to_mount_blob(repository_ref, mount_blob_digest):
@anon_protect
@check_readonly
@check_pushes_disabled
@tracer.start_as_current_span(
"quay.v2.blobs.start_blob_upload", record_exception=True, set_status_on_exception=True
)
def start_blob_upload(namespace_name, repo_name):
repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
@@ -347,9 +334,6 @@ def start_blob_upload(namespace_name, repo_name):
@process_registry_jwt_auth(scopes=["pull"])
@require_repo_write(allow_for_superuser=True, disallow_for_restricted_users=True)
@anon_protect
@tracer.start_as_current_span(
"quay.v2.blobs.fetch_existing_upload", record_exception=True, set_status_on_exception=True
)
def fetch_existing_upload(namespace_name, repo_name, upload_uuid):
repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
if repository_ref is None:
@@ -380,9 +364,6 @@ def fetch_existing_upload(namespace_name, repo_name, upload_uuid):
@anon_protect
@check_readonly
@check_pushes_disabled
@tracer.start_as_current_span(
"quay.v2.blobs.upload_chunk", record_exception=True, set_status_on_exception=True
)
def upload_chunk(namespace_name, repo_name, upload_uuid):
repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
if repository_ref is None:
@@ -425,11 +406,6 @@ def upload_chunk(namespace_name, repo_name, upload_uuid):
@require_repo_write(allow_for_superuser=True, disallow_for_restricted_users=True)
@anon_protect
@check_readonly
@tracer.start_as_current_span(
"quay.v2.blobs.monolithic_upload_or_last_chunk",
record_exception=True,
set_status_on_exception=True,
)
def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid):
# Ensure the digest is present before proceeding.
@@ -483,9 +459,6 @@ def monolithic_upload_or_last_chunk(namespace_name, repo_name, upload_uuid):
@anon_protect
@check_readonly
@check_pushes_disabled
@tracer.start_as_current_span(
"quay.v2.blobs.cancel_upload", record_exception=True, set_status_on_exception=True
)
def cancel_upload(namespace_name, repo_name, upload_uuid):
repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
if repository_ref is None:
@@ -509,9 +482,6 @@ def cancel_upload(namespace_name, repo_name, upload_uuid):
@anon_protect
@check_readonly
@check_pushes_disabled
@tracer.start_as_current_span(
"quay.v2.blobs.delete_digest", record_exception=True, set_status_on_exception=True
)
def delete_digest(namespace_name, repo_name, digest):
# We do not support deleting arbitrary digests, as they break repo images.
raise Unsupported()
@@ -590,37 +560,26 @@ def _upload_chunk(blob_uploader, commit_digest=None):
If commit_digest is specified, the upload is committed to a blob once the stream's data has been
read and stored.
"""
with tracer.start_as_current_span(
"quay._upload_chunk",
) as span:
start_offset, length = _start_offset_and_length(request.headers.get("content-range"))
span.set_attribute("offset", str(start_offset))
span.set_attribute("length", str(length))
start_offset, length = _start_offset_and_length(request.headers.get("content-range"))
if None in {start_offset, length}:
raise InvalidRequest(message="Invalid range header")
if None in {start_offset, length}:
span.set_status(StatusCode.ERROR)
raise InvalidRequest(message="Invalid range header")
input_fp = get_input_stream(request)
input_fp = get_input_stream(request)
try:
# Upload the data received.
blob_uploader.upload_chunk(app.config, input_fp, start_offset, length)
try:
# Upload the data received.
blob_uploader.upload_chunk(app.config, input_fp, start_offset, length)
if commit_digest is not None:
# Commit the upload to a blob.
return blob_uploader.commit_to_blob(app.config, commit_digest)
except BlobTooLargeException as ble:
span.record_exception(ble)
span.set_status(StatusCode.ERROR)
raise LayerTooLarge(uploaded=ble.uploaded, max_allowed=ble.max_allowed)
except BlobRangeMismatchException as ble:
span.record_exception(ble)
span.set_status(StatusCode.ERROR)
logger.exception("Exception when uploading blob to %s", blob_uploader.blob_upload_id)
_abort_range_not_satisfiable(
blob_uploader.blob_upload.byte_count, blob_uploader.blob_upload_id
)
except BlobUploadException:
logger.exception("Exception when uploading blob to %s", blob_uploader.blob_upload_id)
raise BlobUploadInvalid()
if commit_digest is not None:
# Commit the upload to a blob.
return blob_uploader.commit_to_blob(app.config, commit_digest)
except BlobTooLargeException as ble:
raise LayerTooLarge(uploaded=ble.uploaded, max_allowed=ble.max_allowed)
except BlobRangeMismatchException:
logger.exception("Exception when uploading blob to %s", blob_uploader.blob_upload_id)
_abort_range_not_satisfiable(
blob_uploader.blob_upload.byte_count, blob_uploader.blob_upload_id
)
except BlobUploadException:
logger.exception("Exception when uploading blob to %s", blob_uploader.blob_upload_id)
raise BlobUploadInvalid()
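
The _upload_chunk change above is the one place in this file where a span was opened by hand rather than via a decorator: the removed version tagged the byte range on the span, marked it as an error on a bad Content-Range header, and recorded upload exceptions before re-raising. A minimal sketch of that manual pattern, using placeholder helpers rather than Quay's blob uploader:

# Sketch only: manual span with attributes, error status, and recorded exceptions.
# parse_range() and RangeError are illustrative placeholders.
from opentelemetry import trace
from opentelemetry.trace.status import StatusCode

tracer = trace.get_tracer("quay.v2.blob")

class RangeError(Exception):
    pass

def parse_range(header):
    # Placeholder: return (start, length), or (None, None) on a bad header.
    try:
        start, end = header.removeprefix("bytes ").split("/")[0].split("-")
        return int(start), int(end) - int(start) + 1
    except (AttributeError, ValueError):
        return None, None

def upload_chunk(content_range):
    with tracer.start_as_current_span("quay._upload_chunk") as span:
        start_offset, length = parse_range(content_range)
        span.set_attribute("offset", str(start_offset))
        span.set_attribute("length", str(length))
        if None in {start_offset, length}:
            # Mark the span failed before raising, mirroring the removed code.
            span.set_status(StatusCode.ERROR)
            raise RangeError("Invalid range header")
        try:
            return length  # stand-in for blob_uploader.upload_chunk(...)
        except Exception as exc:
            span.record_exception(exc)
            span.set_status(StatusCode.ERROR)
            raise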

View File

@@ -1,9 +1,5 @@
from collections import namedtuple
from util.metrics.otel import trace
tracer = trace.get_tracer("quay.endpoints.v2.catalog")
from flask import jsonify
import features
@@ -30,9 +26,6 @@ class Repository(namedtuple("Repository", ["id", "namespace_name", "name"])):
@process_registry_jwt_auth()
@anon_protect
@paginate()
@tracer.start_as_current_span(
"quay.endpoints.v2.catalog.catalog_search", record_exception=True, set_status_on_exception=True
)
def catalog_search(start_id, limit, pagination_callback):
def _load_catalog():
include_public = bool(features.PUBLIC_CATALOG)

View File

@@ -3,10 +3,6 @@ from functools import wraps
from flask import Response, request, url_for
from util.metrics.otel import trace
tracer = trace.get_tracer("quay.v2.manifest")
import features
from app import app, model_cache, pullmetrics, storage
from auth.registry_jwt_auth import process_registry_jwt_auth
@@ -75,11 +71,6 @@ MANIFEST_TAGNAME_ROUTE = BASE_MANIFEST_ROUTE.format(VALID_TAG_PATTERN)
@require_repo_read(allow_for_superuser=True, allow_for_global_readonly_superuser=True)
@anon_protect
@inject_registry_model()
@tracer.start_as_current_span(
"quay.v2.manifest.fetch_manifest_by_tagname",
record_exception=True,
set_status_on_exception=True,
)
def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref, registry_model):
try:
repository_ref = registry_model.lookup_repository(
@@ -163,9 +154,6 @@ def fetch_manifest_by_tagname(namespace_name, repo_name, manifest_ref, registry_
@require_repo_read(allow_for_superuser=True, allow_for_global_readonly_superuser=True)
@anon_protect
@inject_registry_model()
@tracer.start_as_current_span(
"quay.v2.manifest.fetch_manifest_by_digest", record_exception=True, set_status_on_exception=True
)
def fetch_manifest_by_digest(namespace_name, repo_name, manifest_ref, registry_model):
try:
repository_ref = registry_model.lookup_repository(
@@ -305,11 +293,6 @@ def _doesnt_accept_schema_v1():
@anon_protect
@check_readonly
@check_pushes_disabled
@tracer.start_as_current_span(
"quay.v2.manifest.write_manifest_by_tagname",
record_exception=True,
set_status_on_exception=True,
)
def write_manifest_by_tagname(namespace_name, repo_name, manifest_ref):
parsed = _parse_manifest(request.content_type, request.data)
@@ -336,9 +319,6 @@ def _enqueue_blobs_for_replication(manifest, storage, namespace_name):
@anon_protect
@check_readonly
@check_pushes_disabled
@tracer.start_as_current_span(
"quay.v2.manifest.write_manifest_by_digest", record_exception=True, set_status_on_exception=True
)
def write_manifest_by_digest(namespace_name, repo_name, manifest_ref):
parsed = _parse_manifest(request.content_type, request.data)
if parsed.digest != manifest_ref:
@@ -416,11 +396,6 @@ def _parse_manifest(content_type, request_data):
@anon_protect
@check_readonly
@check_pushes_disabled
@tracer.start_as_current_span(
"quay.v2.manifest.delete_manifest_by_digest",
record_exception=True,
set_status_on_exception=True,
)
def delete_manifest_by_digest(namespace_name, repo_name, manifest_ref):
"""
Delete the manifest specified by the digest.
@@ -458,9 +433,6 @@ def delete_manifest_by_digest(namespace_name, repo_name, manifest_ref):
@anon_protect
@check_readonly
@check_pushes_disabled
@tracer.start_as_current_span(
"quay.v2.manifest.delete_manifest_by_tag", record_exception=True, set_status_on_exception=True
)
def delete_manifest_by_tag(namespace_name, repo_name, manifest_ref):
"""
Deletes the manifest specified by the tag.

View File

@@ -1,9 +1,5 @@
from flask import jsonify
from util.metrics.otel import trace
tracer = trace.get_tracer("quay.endpoints.v2.tag")
from app import app, model_cache
from auth.registry_jwt_auth import process_registry_jwt_auth
from data.registry_model import registry_model
@@ -28,9 +24,6 @@ from endpoints.v2.errors import NameUnknown, TooManyTagsRequested
@require_repo_read(allow_for_superuser=True, allow_for_global_readonly_superuser=True)
@anon_protect
@oci_tag_paginate()
@tracer.start_as_current_span(
"quay.endpoints.v2.tag.list_all_tags", record_exception=True, set_status_on_exception=True
)
def list_all_tags(namespace_name, repo_name, last_pagination_tag_name, limit, pagination_callback):
repository_ref = registry_model.lookup_repository(namespace_name, repo_name)
if repository_ref is None:

View File

@@ -2,10 +2,6 @@ import logging
import re
from collections import namedtuple
from util.metrics.otel import trace
tracer = trace.get_tracer("quay.endpoints.v2.v2auth")
from cachetools.func import lru_cache
from flask import jsonify, request
@@ -70,11 +66,6 @@ scopeResult = namedtuple(
@process_basic_auth
@no_cache
@anon_protect
@tracer.start_as_current_span(
"quay.endpoints.v2.v2auth.generate_registry_jwt",
record_exception=True,
set_status_on_exception=True,
)
def generate_registry_jwt(auth_result):
"""
This endpoint will generate a JWT conforming to the Docker Registry v2 Auth Spec:

View File

@@ -3,10 +3,6 @@ import logging
import os
from datetime import datetime, timedelta
from util.metrics.otel import StatusCode, get_tracecontext, trace
tracer = trace.get_tracer("quay.endpoints.web")
from cachetools.func import lru_cache
from flask import (
Blueprint,
@@ -108,37 +104,23 @@ STATUS_TAGS = app.config["STATUS_TAGS"]
@web.route("/", methods=["GET"], defaults={"path": ""})
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.index", record_exception=True, set_status_on_exception=True
)
def index(path, **kwargs):
return render_page_template_with_routedata("index.html", **kwargs)
@web.route("/_internal_ping")
@anon_allowed
@tracer.start_as_current_span(
"quay.endpoints.web.internal_ping", record_exception=True, set_status_on_exception=True
)
def internal_ping():
return make_response("true", 200)
@web.route("/500", methods=["GET"])
@tracer.start_as_current_span(
"quay.endpoints.web.internal_error_display", record_exception=True, set_status_on_exception=True
)
def internal_error_display():
return render_page_template_with_routedata("500.html")
@web.errorhandler(404)
@web.route("/404", methods=["GET"])
@tracer.start_as_current_span(
"quay.endpoints.web.not_found_error_display",
record_exception=True,
set_status_on_exception=True,
)
def not_found_error_display(e=None):
resp = index("", error_code=404, error_info=dict(reason="notfound"))
resp.status_code = 404
@@ -146,9 +128,6 @@ def not_found_error_display(e=None):
@web.route("/opensearch.xml")
@tracer.start_as_current_span(
"quay.endpoints.web.opensearch", record_exception=True, set_status_on_exception=True
)
def opensearch():
template = render_template(
"opensearch.xml",
@@ -163,9 +142,6 @@ def opensearch():
@web.route("/organization/<path:path>", methods=["GET"])
@web.route("/organization/<path:path>/", methods=["GET"])
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.org_view", record_exception=True, set_status_on_exception=True
)
def org_view(path):
return index("")
@@ -173,9 +149,6 @@ def org_view(path):
@web.route("/user/<path:path>", methods=["GET"])
@web.route("/user/<path:path>/", methods=["GET"])
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.user_view", record_exception=True, set_status_on_exception=True
)
def user_view(path):
return index("")
@@ -183,27 +156,18 @@ def user_view(path):
@web.route("/plans/")
@no_cache
@route_show_if(features.BILLING)
@tracer.start_as_current_span(
"quay.endpoints.web.plans", record_exception=True, set_status_on_exception=True
)
def plans():
return index("")
@web.route("/search")
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.search", record_exception=True, set_status_on_exception=True
)
def search():
return index("")
@web.route("/guide/")
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.guide", record_exception=True, set_status_on_exception=True
)
def guide():
return index("")
@@ -211,18 +175,12 @@ def guide():
@web.route("/tour/")
@web.route("/tour/<path:path>")
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.tour", record_exception=True, set_status_on_exception=True
)
def tour(path=""):
return index("")
@web.route("/tutorial/")
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.tutorial", record_exception=True, set_status_on_exception=True
)
def tutorial():
return index("")
@@ -230,9 +188,6 @@ def tutorial():
@web.route("/organizations/")
@web.route("/organizations/new/")
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.organizations", record_exception=True, set_status_on_exception=True
)
def organizations():
return index("")
@@ -240,9 +195,6 @@ def organizations():
@web.route("/superuser/")
@no_cache
@route_show_if(features.SUPER_USERS)
@tracer.start_as_current_span(
"quay.endpoints.web.superuser", record_exception=True, set_status_on_exception=True
)
def superuser():
return index("")
@@ -250,63 +202,42 @@ def superuser():
@web.route("/setup/")
@no_cache
@route_show_if(features.SUPER_USERS)
@tracer.start_as_current_span(
"quay.endpoints.web.setup", record_exception=True, set_status_on_exception=True
)
def setup():
return index("")
@web.route("/signin/")
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.signin", record_exception=True, set_status_on_exception=True
)
def signin(redirect=None):
return index("")
@web.route("/contact/")
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.contact", record_exception=True, set_status_on_exception=True
)
def contact():
return index("")
@web.route("/about/")
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.about", record_exception=True, set_status_on_exception=True
)
def about():
return index("")
@web.route("/new/")
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.new", record_exception=True, set_status_on_exception=True
)
def new():
return index("")
@web.route("/updateuser")
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.updateuser", record_exception=True, set_status_on_exception=True
)
def updateuser():
return index("")
@web.route("/confirminvite")
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.confirm_invite", record_exception=True, set_status_on_exception=True
)
def confirm_invite():
code = request.values["code"]
return index("", code=code)
@@ -315,27 +246,18 @@ def confirm_invite():
@web.route("/repository/", defaults={"path": ""})
@web.route("/repository/<path:path>", methods=["GET"])
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.repository", record_exception=True, set_status_on_exception=True
)
def repository(path):
return index("")
@web.route("/repository/<path:path>/trigger/<trigger>", methods=["GET"])
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.buildtrigger", record_exception=True, set_status_on_exception=True
)
def buildtrigger(path, trigger):
return index("")
@web.route("/security/")
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.security", record_exception=True, set_status_on_exception=True
)
def security():
return index("")
@@ -343,18 +265,12 @@ def security():
@web.route("/enterprise/")
@no_cache
@route_show_if(features.BILLING)
@tracer.start_as_current_span(
"quay.endpoints.web.enterprise", record_exception=True, set_status_on_exception=True
)
def enterprise():
return redirect("/plans?tab=enterprise")
@web.route("/__exp/<expname>")
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.exp", record_exception=True, set_status_on_exception=True
)
def exp(expname):
return index("")
@@ -362,27 +278,18 @@ def exp(expname):
@web.route("/v1")
@web.route("/v1/")
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.v1", record_exception=True, set_status_on_exception=True
)
def v1():
return index("")
@web.route("/tos", methods=["GET"])
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.tos", record_exception=True, set_status_on_exception=True
)
def tos():
return index("")
@web.route("/privacy", methods=["GET"])
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.privacy", record_exception=True, set_status_on_exception=True
)
def privacy():
return index("")
@@ -391,9 +298,6 @@ def privacy():
@web.route("/health/instance", methods=["GET"])
@process_auth_or_cookie
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.instance_health", record_exception=True, set_status_on_exception=True
)
def instance_health():
checker = get_healthchecker(app, config_provider, instance_keys)
(data, status_code) = checker.check_instance()
@@ -406,9 +310,6 @@ def instance_health():
@web.route("/health/endtoend", methods=["GET"])
@process_auth_or_cookie
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.endtoend_health", record_exception=True, set_status_on_exception=True
)
def endtoend_health():
checker = get_healthchecker(app, config_provider, instance_keys)
(data, status_code) = checker.check_endtoend()
@@ -420,9 +321,6 @@ def endtoend_health():
@web.route("/health/warning", methods=["GET"])
@process_auth_or_cookie
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.warning_health", record_exception=True, set_status_on_exception=True
)
def warning_health():
checker = get_healthchecker(app, config_provider, instance_keys)
(data, status_code) = checker.check_warning()
@@ -435,9 +333,6 @@ def warning_health():
@route_show_if(features.BILLING) # Since this is only used in production.
@process_auth_or_cookie
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.dbrevision_health", record_exception=True, set_status_on_exception=True
)
def dbrevision_health():
# Find the revision from the database.
result = db.execute_sql("select * from alembic_version limit 1").fetchone()
@@ -461,9 +356,6 @@ def dbrevision_health():
@web.route("/health/enabledebug/<secret>", methods=["GET"])
@no_cache
@tracer.start_as_current_span(
"quay.endpoints.web.enable_health_debug", record_exception=True, set_status_on_exception=True
)
def enable_health_debug(secret):
if not secret:
abort(404)
@@ -488,9 +380,6 @@ def robots():
@web.route("/buildlogs/<build_uuid>", methods=["GET"])
@route_show_if(features.BUILD_SUPPORT)
@process_auth_or_cookie
@tracer.start_as_current_span(
"quay.endpoints.web.buildlogs", record_exception=True, set_status_on_exception=True
)
def buildlogs(build_uuid):
found_build = model.build.get_repository_build(build_uuid)
if not found_build:
@@ -519,9 +408,6 @@ def buildlogs(build_uuid):
@web.route("/exportedlogs/<file_id>", methods=["GET"])
@tracer.start_as_current_span(
"quay.endpoints.web.exportedlogs", record_exception=True, set_status_on_exception=True
)
def exportedlogs(file_id):
# Only enable this endpoint if local storage is available.
has_local_storage = False
@@ -554,9 +440,6 @@ def exportedlogs(file_id):
@web.route("/logarchive/<file_id>", methods=["GET"])
@route_show_if(features.BUILD_SUPPORT)
@process_auth_or_cookie
@tracer.start_as_current_span(
"quay.endpoints.web.logarchive", record_exception=True, set_status_on_exception=True
)
def logarchive(file_id):
JSON_MIMETYPE = "application/json"
try:
@@ -588,9 +471,6 @@ def logarchive(file_id):
@web.route("/receipt", methods=["GET"])
@route_show_if(features.BILLING)
@require_session_login
@tracer.start_as_current_span(
"quay.endpoints.web.receipt", record_exception=True, set_status_on_exception=True
)
def receipt():
if not current_user.is_authenticated:
abort(401)
@@ -628,9 +508,6 @@ def receipt():
@web.route("/authrepoemail", methods=["GET"])
@route_show_if(features.MAILING)
@tracer.start_as_current_span(
"quay.endpoints.web.confirm_repo_email", record_exception=True, set_status_on_exception=True
)
def confirm_repo_email():
code = request.values["code"]
record = None
@@ -658,9 +535,6 @@ def confirm_repo_email():
@web.route("/confirm", methods=["GET"])
@route_show_if(features.MAILING)
@anon_allowed
@tracer.start_as_current_span(
"quay.endpoints.web.confirm_email", record_exception=True, set_status_on_exception=True
)
def confirm_email():
code = request.values["code"]
user = None
@@ -696,9 +570,6 @@ def confirm_email():
@web.route("/recovery", methods=["GET"])
@route_show_if(features.MAILING)
@anon_allowed
@tracer.start_as_current_span(
"quay.endpoints.web.confirm_recovery", record_exception=True, set_status_on_exception=True
)
def confirm_recovery():
code = request.values["code"]
user = model.user.validate_reset_code(code)
@@ -720,9 +591,6 @@ def confirm_recovery():
@web.route("/repository/<repopath:repository>/status", methods=["GET"])
@parse_repository_name()
@anon_protect
@tracer.start_as_current_span(
"quay.endpoints.web.build_status_badge", record_exception=True, set_status_on_exception=True
)
def build_status_badge(namespace_name, repo_name):
token = request.args.get("token", None)
repo = model.repository.get_repository(namespace_name, repo_name)
@@ -766,9 +634,6 @@ class FlaskAuthorizationProvider(model.oauth.DatabaseAuthorizationProvider):
@web.route("/oauth/authorizeapp", methods=["POST"])
@process_auth_or_cookie
@tracer.start_as_current_span(
"quay.endpoints.web.authorize_application", record_exception=True, set_status_on_exception=True
)
def authorize_application():
# Check for an authenticated user.
if not get_authenticated_user():
@@ -816,9 +681,6 @@ def authorize_application():
@web.route(app.config["LOCAL_OAUTH_HANDLER"], methods=["GET"])
@tracer.start_as_current_span(
"quay.endpoints.web.oauth_local_handler", record_exception=True, set_status_on_exception=True
)
def oauth_local_handler():
if not current_user.is_authenticated:
abort(401)
@@ -837,9 +699,6 @@ def oauth_local_handler():
@web.route("/oauth/denyapp", methods=["POST"])
@csrf_protect()
@tracer.start_as_current_span(
"quay.endpoints.web.deny_application", record_exception=True, set_status_on_exception=True
)
def deny_application():
if not current_user.is_authenticated:
abort(401)
@@ -860,11 +719,6 @@ def deny_application():
@param_required("redirect_uri")
@param_required("scope")
@process_auth_or_cookie
@tracer.start_as_current_span(
"quay.endpoints.web.request_authorization_code",
record_exception=True,
set_status_on_exception=True,
)
def request_authorization_code():
provider = FlaskAuthorizationProvider()
response_type = request.args.get("response_type", "code")
@@ -987,9 +841,6 @@ def request_authorization_code():
@param_required("scope")
@param_required("username")
@process_auth_or_cookie
@tracer.start_as_current_span(
"quay.endpoints.web.assign_user_to_app", record_exception=True, set_status_on_exception=True
)
def assign_user_to_app():
response_type = request.args.get("response_type", "code")
client_id = request.args.get("client_id", None)
@@ -1048,11 +899,6 @@ def assign_user_to_app():
@param_required("redirect_uri", allow_body=True)
@param_required("code", allow_body=True)
@param_required("scope", allow_body=True)
@tracer.start_as_current_span(
"quay.endpoints.web.exchange_code_for_token",
record_exception=True,
set_status_on_exception=True,
)
def exchange_code_for_token():
grant_type = request.values.get("grant_type", None)
client_id = request.values.get("client_id", None)
@@ -1075,11 +921,6 @@ def exchange_code_for_token():
@require_session_login
@parse_repository_name()
@route_show_if(features.BITBUCKET_BUILD)
@tracer.start_as_current_span(
"quay.endpoints.web.attach_bitbucket_trigger",
record_exception=True,
set_status_on_exception=True,
)
def attach_bitbucket_trigger(namespace_name, repo_name):
permission = AdministerRepositoryPermission(namespace_name, repo_name)
if permission.can():
@@ -1114,11 +955,6 @@ def attach_bitbucket_trigger(namespace_name, repo_name):
@web.route("/customtrigger/setup/<repopath:repository>", methods=["GET"])
@require_session_login
@parse_repository_name()
@tracer.start_as_current_span(
"quay.endpoints.web.attach_custom_build_trigger",
record_exception=True,
set_status_on_exception=True,
)
def attach_custom_build_trigger(namespace_name, repo_name):
permission = AdministerRepositoryPermission(namespace_name, repo_name)
if permission.can():
@@ -1147,9 +983,6 @@ def attach_custom_build_trigger(namespace_name, repo_name):
@process_oauth
@parse_repository_name(include_tag=True)
@anon_protect
@tracer.start_as_current_span(
"quay.endpoints.web.redirect_to_repository", record_exception=True, set_status_on_exception=True
)
def redirect_to_repository(namespace_name, repo_name, tag_name):
# Always return 200 for ac-discovery, to ensure that rkt and other ACI-compliant clients can
# find the metadata they need. Permissions will be checked in the registry API.
@@ -1202,9 +1035,6 @@ def redirect_to_repository(namespace_name, repo_name, tag_name):
@no_cache
@process_oauth
@anon_protect
@tracer.start_as_current_span(
"quay.endpoints.web.redirect_to_namespace", record_exception=True, set_status_on_exception=True
)
def redirect_to_namespace(namespace):
okay, _ = model.user.validate_username(namespace)
if not okay:
@@ -1229,9 +1059,6 @@ def has_users():
@web.route("/api/v1/user/initialize", methods=["POST"])
@route_show_if(features.USER_INITIALIZE)
@tracer.start_as_current_span(
"quay.endpoints.web.user_initialize", record_exception=True, set_status_on_exception=True
)
def user_initialize():
"""
Create initial user in an empty database
@@ -1304,9 +1131,6 @@ def user_initialize():
@web.route("/config", methods=["GET", "OPTIONS"])
@crossorigin(anonymous=False)
@tracer.start_as_current_span(
"quay.endpoints.web.config", record_exception=True, set_status_on_exception=True
)
def config():
version_number = ""
if not features.BILLING:

View File

@@ -8,10 +8,6 @@ from io import BufferedIOBase, BytesIO, StringIO
from itertools import chain
from uuid import uuid4
from util.metrics.otel import StatusCode, get_tracecontext, trace
tracer = trace.get_tracer("cloud.py")
import boto3.session
import botocore.config
import botocore.exceptions
@@ -161,80 +157,17 @@ class _CloudStorage(BaseStorageV2):
if deferred_refreshable_credentials:
self._session._session._credentials = deferred_refreshable_credentials
def response_tracing_headers(self, **kwargs):
name = f"boto3.{str(kwargs.get('event_name', 'before-send.s3.unknown')).split('.')[-1]}"
with tracer.start_as_current_span(name) as span:
sctx = span.get_span_context()
for k in kwargs.get("parsed", {}):
if isinstance(kwargs["parsed"][k], bytes):
span.set_attribute(k, kwargs["parsed"][k].decode("utf8"))
else:
span.set_attribute(k, str(kwargs["parsed"][k]))
for k in kwargs.get("context", {}):
if isinstance(kwargs.get("context")[k], bytes):
span.set_attribute(k, kwargs["context"][k].decode("utf8"))
else:
span.set_attribute(k, str(kwargs["context"][k]))
def add_tracing_headers(self, request, **kwargs):
name = f"boto3.{str(kwargs.get('event_name', 'before-send.s3.unknown')).split('.')[-1]}"
with tracer.start_as_current_span(name) as span:
sctx = span.get_span_context()
span.set_attribute("method", str(request.method))
span.set_attribute("hook-name", str(kwargs.get("event_name", "unknown")))
for k in request.headers:
if isinstance(request.headers[k], bytes):
span.set_attribute(k, request.headers[k].decode("utf8"))
else:
span.set_attribute(k, str(request.headers[k]))
try:
request.headers[
"traceparent"
] = f"00-{hex(sctx.trace_id)[2:]}-{hex(sctx.span_id)[2:]}-01"
request.headers["x-b3-traceid"] = hex(sctx.trace_id)[2:]
request.headers["x-b3-spanid"] = hex(sctx.span_id)[2:]
request.headers["x-b3-parentspanid"] = hex(sctx.span_id)[2:]
request.headers["x-b3-sampled"] = "1"
logger.error(f"[OTEL] request {request.headers}")
except Exception as trerr:
logger.error(f"OTEL {trerr}")
def create_trace(self, operation_name, params, **kwargs):
try:
# ctx = get_tracecontext()
name = "boto3." + str(operation_name)
with tracer.start_as_current_span(name) as span:
for k in params:
span.set_attribute(k, str(params[k]))
for k in kwargs:
span.set_attribute(k, str(kwargs[k]))
span.set_status(StatusCode.OK)
except Exception as trerr:
logger.error(f"OTEL createtraces {trerr}")
def _initialize_cloud_conn(self):
if not self._initialized:
with tracer.start_as_current_span(
"quay.storage.initialize",
) as span:
span.set_status(StatusCode.ERROR)
for k in self._connect_kwargs:
span.set_attribute(k, str(self._connect_kwargs[k]))
self._session.events.register("before-send.s3.*", self.add_tracing_headers)
self._session.events.register("after-call.s3.*", self.response_tracing_headers)
# Low-level client. Needed to generate presigned urls
self._cloud_conn = self._session.client("s3", **self._connect_kwargs)
self._cloud_bucket = self._session.resource("s3", **self._connect_kwargs).Bucket(
self._bucket_name
)
# This will raise a ClientError if the bucket does not exist.
# We actually want an exception raised if the bucket does not exist (same as in boto2)
self._cloud_conn.head_bucket(Bucket=self._bucket_name)
span.set_status(StatusCode.OK)
self._initialized = True
# Low-level client. Needed to generate presigned urls
self._cloud_conn = self._session.client("s3", **self._connect_kwargs)
self._cloud_bucket = self._session.resource("s3", **self._connect_kwargs).Bucket(
self._bucket_name
)
# This will raise a ClientError if the bucket does not exist.
# We actually want an exception raised if the bucket does not exist (same as in boto2)
self._cloud_conn.head_bucket(Bucket=self._bucket_name)
self._initialized = True
def _debug_key(self, obj):
"""
@@ -289,9 +222,6 @@ class _CloudStorage(BaseStorageV2):
def get_cloud_bucket(self):
return self._cloud_bucket
@tracer.start_as_current_span(
"quay.storage.get_content", record_exception=True, set_status_on_exception=True
)
def get_content(self, path):
self._initialize_cloud_conn()
path = self._init_path(path)
@@ -306,9 +236,6 @@ class _CloudStorage(BaseStorageV2):
raise
@tracer.start_as_current_span(
"quay.storage.put_content", record_exception=True, set_status_on_exception=True
)
def put_content(self, path, content):
self._initialize_cloud_conn()
path = self._init_path(path)
@@ -319,42 +246,22 @@ class _CloudStorage(BaseStorageV2):
def get_supports_resumable_downloads(self):
return True
@tracer.start_as_current_span(
"quay.storage.get_direct_download_url", record_exception=True, set_status_on_exception=True
)
def get_direct_download_url(
self, path, request_ip=None, expires_in=60, requires_cors=False, head=False, **kwargs
):
self._initialize_cloud_conn()
path = self._init_path(path)
with tracer.start_as_current_span(
"quay.storage.presigned_url",
attributes=dict(
path=str(path),
request_ip=str(request_ip),
expires_in=expires_in,
requires_cors=requires_cors,
head=head,
),
) as span:
client_method = "get_object"
if head:
client_method = "head_object"
client_method = "get_object"
if head:
client_method = "head_object"
return self.get_cloud_conn().generate_presigned_url(
client_method,
Params={"Bucket": self._bucket_name, "Key": path},
ExpiresIn=expires_in,
)
uri = self.get_cloud_conn().generate_presigned_url(
client_method,
Params={"Bucket": self._bucket_name, "Key": path},
ExpiresIn=expires_in,
)
span.set_attribute("presign_url", uri)
span.set_status(StatusCode.OK)
return uri
@tracer.start_as_current_span(
"quay.storage.get_direct_upload_url", record_exception=True, set_status_on_exception=True
)
def get_direct_upload_url(self, path, mime_type, requires_cors=True):
self._initialize_cloud_conn()
path = self._init_path(path)
@@ -368,12 +275,8 @@ class _CloudStorage(BaseStorageV2):
ExpiresIn=300,
)
@tracer.start_as_current_span(
"quay.storage.stream_read", record_exception=True, set_status_on_exception=True
)
def stream_read(self, path):
self._initialize_cloud_conn()
path = self._init_path(path)
obj = self.get_cloud_bucket().Object(path)
try:
@@ -390,12 +293,8 @@ class _CloudStorage(BaseStorageV2):
break
yield data
@tracer.start_as_current_span(
"quay.storage.stream_read_file", record_exception=True, set_status_on_exception=True
)
def stream_read_file(self, path):
self._initialize_cloud_conn()
path = self._init_path(path)
obj = self.get_cloud_bucket().Object(path)
try:
@@ -406,15 +305,9 @@ class _CloudStorage(BaseStorageV2):
raise
return StreamReadKeyAsFile(obj.get()["Body"])
@tracer.start_as_current_span(
"quay.storage.__initiate_multipart_upload",
record_exception=True,
set_status_on_exception=True,
)
def __initiate_multipart_upload(self, path, content_type, content_encoding):
# Minimum size of upload part size on S3 is 5MB
self._initialize_cloud_conn()
path = self._init_path(path)
obj = self.get_cloud_bucket().Object(path)
@@ -528,12 +421,8 @@ class _CloudStorage(BaseStorageV2):
return total_bytes_written, write_error
@tracer.start_as_current_span(
"quay.storage.exists", record_exception=True, set_status_on_exception=True
)
def exists(self, path):
self._initialize_cloud_conn()
path = self._init_path(path)
obj = self.get_cloud_bucket().Object(path)
try:
@@ -544,12 +433,8 @@ class _CloudStorage(BaseStorageV2):
raise
return True
@tracer.start_as_current_span(
"quay.storage.remove", record_exception=True, set_status_on_exception=True
)
def remove(self, path):
self._initialize_cloud_conn()
path = self._init_path(path)
obj = self.get_cloud_bucket().Object(path)
try:
@@ -569,12 +454,8 @@ class _CloudStorage(BaseStorageV2):
obj = self.get_cloud_bucket().Object(content["Key"])
obj.delete()
@tracer.start_as_current_span(
"quay.storage.checksum", record_exception=True, set_status_on_exception=True
)
def get_checksum(self, path):
self._initialize_cloud_conn()
path = self._init_path(path)
obj = self.get_cloud_bucket().Object(path)
try:
@@ -586,9 +467,6 @@ class _CloudStorage(BaseStorageV2):
return obj.e_tag[1:-1][:7]
@tracer.start_as_current_span(
"quay.storage.copy_to", record_exception=True, set_status_on_exception=True
)
def copy_to(self, destination, path):
"""
Copies the given path from this storage to the destination storage.
@@ -659,9 +537,6 @@ class _CloudStorage(BaseStorageV2):
return random_uuid, metadata
@tracer.start_as_current_span(
"quay.storage.stream_upload_chunk", record_exception=True, set_status_on_exception=True
)
def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata, content_type=None):
self._initialize_cloud_conn()
@@ -734,16 +609,13 @@ class _CloudStorage(BaseStorageV2):
except botocore.exceptions.ClientError as s3re:
# sometimes HTTPStatusCode isn't set for some reason, so we need
# to protect ourselves against a KeyError.
with tracer.start_as_current_span(
"boto3.Exception", attributes={"args": str(args), "kwargs": str(kwargs)}
) as span:
if (
remaining_retries
and s3re.response["Error"].get("HTTPStatusCode", 0) == 200
and s3re.response["Error"].get("Code", "") == "InternalError"
):
# Weird internal error case. Retry.
continue
if (
remaining_retries
and s3re.response["Error"].get("HTTPStatusCode", 0) == 200
and s3re.response["Error"].get("Code", "") == "InternalError"
):
# Weird internal error case. Retry.
continue
# Otherwise, raise it.
logger.exception("Exception trying to perform action %s", action)
@@ -768,9 +640,6 @@ class _CloudStorage(BaseStorageV2):
):
yield subchunk
@tracer.start_as_current_span(
"quay.storage.complete_chunked_upload", record_exception=True, set_status_on_exception=True
)
def complete_chunked_upload(self, uuid, final_path, storage_metadata, force_client_side=False):
self._initialize_cloud_conn()
chunk_list = self._chunk_list_from_metadata(storage_metadata)
@@ -820,56 +689,26 @@ class _CloudStorage(BaseStorageV2):
# [_PartUpload]
upload_parts = []
for index, chunk in enumerate(updated_chunks):
abs_chunk_path = self._init_path(chunk.path)
with tracer.start_as_current_span(
"quay.storage.action_with_retry",
attributes={
"action": "copy_from",
"Bucket": self.get_cloud_bucket().name,
"Key": chunk.path,
"Range": f"bytes={chunk.offset}-{chunk.length + chunk.offset - 1}",
},
) as span:
part_copy = self._perform_action_with_retry(
mpu.Part(index + 1).copy_from,
CopySource={
"Bucket": self.get_cloud_bucket().name,
"Key": abs_chunk_path,
},
CopySourceRange="bytes=%s-%s"
% (chunk.offset, chunk.length + chunk.offset - 1),
)
span.set_status(StatusCode.OK)
span.add_event(
"quay.storag.part.append",
attributes={
"index": index + 1,
"etag": part_copy["CopyPartResult"]["ETag"],
},
)
upload_parts.append(
_PartUpload(index + 1, part_copy["CopyPartResult"]["ETag"])
)
with tracer.start_as_current_span(
"quay.storage.action_with_retry",
attributes={
"action": "complete",
"Bucket": self.get_cloud_bucket().name,
},
) as span:
self._perform_action_with_retry(
mpu.complete,
MultipartUpload={
"Parts": [
{"ETag": p.e_tag, "PartNumber": p.part_number} for p in upload_parts
]
},
part_copy = self._perform_action_with_retry(
mpu.Part(index + 1).copy_from,
CopySource={"Bucket": self.get_cloud_bucket().name, "Key": abs_chunk_path},
CopySourceRange="bytes=%s-%s"
% (chunk.offset, chunk.length + chunk.offset - 1),
)
upload_parts.append(_PartUpload(index + 1, part_copy["CopyPartResult"]["ETag"]))
self._perform_action_with_retry(
mpu.complete,
MultipartUpload={
"Parts": [
{"ETag": p.e_tag, "PartNumber": p.part_number} for p in upload_parts
]
},
)
except (botocore.exceptions.ClientError, IOError) as ioe:
# Something bad happened, log it and then give up
msg = "Exception when attempting server-side assembly for: %s"
@@ -882,9 +721,6 @@ class _CloudStorage(BaseStorageV2):
# pass that to stream_write to chunk and upload the final object.
self._client_side_chunk_join(final_path, chunk_list)
@tracer.start_as_current_span(
"quay.storage.cancel_chunked_upload", record_exception=True, set_status_on_exception=True
)
def cancel_chunked_upload(self, uuid, storage_metadata):
self._initialize_cloud_conn()
@@ -892,9 +728,6 @@ class _CloudStorage(BaseStorageV2):
for chunk in self._chunk_list_from_metadata(storage_metadata):
self.remove(chunk.path)
@tracer.start_as_current_span(
"quay.storage.clean_partial_uploads", record_exception=True, set_status_on_exception=True
)
def clean_partial_uploads(self, deletion_date_threshold):
self._initialize_cloud_conn()
path = self._init_path("uploads")
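
Besides the per-method decorators, the reverted storage code drops the boto3 event hooks that injected trace headers into every outgoing S3 request. A minimal sketch of that hook mechanism follows; the session/client setup is a placeholder, and the formatting below zero-pads the ids, whereas the removed code's bare hex() calls could emit shorter ids than the W3C traceparent format expects.

# Sketch only: propagate a W3C traceparent on outgoing S3 calls via boto3 events.
import boto3
from opentelemetry import trace

def add_tracing_headers(request, **kwargs):
    # "before-send.s3.*" hands us the prepared HTTP request; mutate its headers.
    ctx = trace.get_current_span().get_span_context()
    if ctx.is_valid:
        request.headers["traceparent"] = f"00-{ctx.trace_id:032x}-{ctx.span_id:016x}-01"

session = boto3.session.Session()
session.events.register("before-send.s3.*", add_tracing_headers)
s3 = session.client("s3")  # every call through this client now carries the header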

View File

@@ -1653,36 +1653,15 @@ CONFIG_SCHEMA = {
"description": "name of service in otel spans",
"x-example": "quay",
},
"OTEL_EXPORTER_OTLP_ENDPOINT": {
"dt_api_url": {
"type": "string",
"description": "url for OTLP api environment overwrites",
"x-example": "https://dynatrace-api.example:443",
"description": "url for dynatrace api",
"x-example": "https://dynatrace-api.example",
},
"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT": {
"dt_api_token": {
"type": "string",
"description": "url for OTLP api environment overwrites",
"x-example": "https://dynatrace-api.example:443",
},
"OTEL_EXPORTER_OTLP_HEADERS": {
"type": "object",
"description": "headers to include in API calls",
"x-example": {"Authorization": "Bearer api-token", "X-Org-Tenant": "some"},
"items": {"type": "dict"},
},
"OTEL_EXPORTER_OTLP_INSECURE": {
"type": "boolean",
"description": "skip ssl verification for API connections",
"x-example": "true",
},
"OTEL_EXPORTER_OTLP_PROTOCOL": {
"type": "string",
"description": "MUST be one of: grpc, http/protobuf, http/json",
"x-example": "http/protobuf",
},
"OTEL_TRACES_SAMPLER_ARG": {
"type": "float",
"description": "Sampling decission rate as float",
"x-example": "0.001 # ( 1 / 1000)",
"description": "token for dynatrace api",
"x-example": "sometoken",
},
},
},
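For reference, after the revert the schema above accepts the Dynatrace-style keys again. A hedged illustration of the matching OTEL_CONFIG stanza, written as the Python mapping app.config would carry (values are placeholders taken from the x-example fields, not real endpoints or tokens):
OTEL_CONFIG = {
    "service_name": "quay",
    "dt_api_url": "https://dynatrace-api.example",
    "dt_api_token": "sometoken",
}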

View File

@@ -1,32 +1,13 @@
import os
from functools import wraps
from urllib.parse import urlparse
import opentelemetry.sdk.trace.id_generator as idg
from flask import request
from opentelemetry import context, trace
from opentelemetry.context.context import Context
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter as grpcOTLPSpanExporter,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter as httpOTLPSpanExporter,
)
from opentelemetry.sdk.resources import (
SERVICE_NAME,
SERVICE_NAMESPACE,
SERVICE_VERSION,
Resource,
)
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased
from opentelemetry.trace import NonRecordingSpan, Span, SpanContext
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.trace.status import StatusCode
import features
from _init import _get_version_number as get_quay_version
def init_exporter(app_config):
@@ -34,134 +15,29 @@ def init_exporter(app_config):
otel_config = app_config.get("OTEL_CONFIG", {})
service_name = otel_config.get("service_name", "quay")
service_namespace = os.environ.get("QE_K8S_NAMESPACE", "standalone")
service_version = get_quay_version()
# compile headers if present
otel_headers = otel_config.get(
"OTEL_EXPORTER_OTLP_HEADERS", otel_config.get("OTEL_EXPORTER_OTLP_TRACES_HEADERS", {})
)
if otel_headers == {}:
# accept Environ overwrites
otel_headers = os.environ.get(
"OTEL_EXPORTER_OTLP_HEADERS",
os.environ.get("OTEL_EXPORTER_OTLP_TRACES_HEADERS", ""),
)
# OTLP headers from environment are formatted as key=value,key2=value2
# to handle this format we
# first split by comma ","
# then split by equals "=" only once (example x-auth-sentry=sentry sentry_key=xxx)
try:
# will return a list of tuples [("x-auth-sentry", "sentry sentry_key=xxx"), ("x-tenant", "name")]
# that we transform into a dict
otel_headers = dict(
list(map(lambda x: tuple(x.strip().split("=", 1)), otel_headers.strip().split(",")))
)
except ValueError:
# if the string is empty, the split yields [('',)] and we cannot form a dict from it
otel_headers = {}
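A quick worked example of the parsing above (input format taken from the comment, values hypothetical):
raw = "x-auth-sentry=sentry sentry_key=xxx,x-tenant=name"
parsed = dict(
    tuple(pair.strip().split("=", 1)) for pair in raw.strip().split(",")
)
# parsed == {"x-auth-sentry": "sentry sentry_key=xxx", "x-tenant": "name"}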
# according to https://opentelemetry.io/docs/specs/otel/protocol/exporter/#endpoint-urls-for-otlphttp
# a non-signal-specific endpoint needs /v1/traces appended
otel_endpoint = otel_config.get(
"OTEL_EXPORTER_OTLP_ENDPOINT",
"http://127.0.0.1:80/v1/traces",
)
if not urlparse(otel_endpoint).path.endswith("/v1/traces"):
otel_endpoint += "/v1/traces"
DT_API_URL = otel_config.get("dt_api_url", None)
DT_API_TOKEN = otel_config.get("dt_api_token", None)
if otel_endpoint == "http://127.0.0.1:80/v1/traces":
# accept Environ overwrites
# signal specific configuration needs to be unmodified
otel_endpoint = os.environ.get(
"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT",
os.environ.get("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", otel_endpoint),
)
otel_insecure = otel_config.get(
"OTEL_EXPORTER_OTLP_INSECURE", otel_config.get("OTEL_EXPORTER_OTLP_TRACES_INSECURE", False)
)
if not otel_insecure:
# accept Environ overwrites
otel_insecure = bool(
os.environ.get(
"OTEL_EXPORTER_OTLP_INSECURE",
os.environ.get("OTEL_EXPORTER_OTLP_TRACES_INSECURE", False),
)
)
resource = Resource.create(attributes={SERVICE_NAME: service_name})
resource = Resource.create(
attributes={
SERVICE_NAME: service_name,
SERVICE_NAMESPACE: service_namespace,
SERVICE_VERSION: service_version,
}
)
# according to https://opentelemetry.io/docs/specs/otel/protocol/exporter/#specify-protocol
# http/protobuf should be the default and we should support grpc
if otel_config.get("OTEL_EXPORTER_OTLP_PROTOCOL", "http") == "grpc":
OTLPSpanExporter = grpcOTLPSpanExporter
elif otel_config.get("OTEL_EXPORTER_OTLP_PROTOCOL", "http") in ("http", "http/protobuf"):
OTLPSpanExporter = httpOTLPSpanExporter
elif otel_config.get("OTEL_EXPORTER_OTLP_PROTOCOL", "http") in ("http", "http/json"):
# http/json is only a MAY in the spec, which is why we fall back to http/protobuf
OTLPSpanExporter = httpOTLPSpanExporter
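The same protocol selection can be read as a lookup table; this is an equivalent sketch of the branch above, not the code the module ships:
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
    OTLPSpanExporter as grpcOTLPSpanExporter,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
    OTLPSpanExporter as httpOTLPSpanExporter,
)
EXPORTER_BY_PROTOCOL = {
    "grpc": grpcOTLPSpanExporter,
    "http": httpOTLPSpanExporter,
    "http/protobuf": httpOTLPSpanExporter,
    # http/json is optional in the OTLP spec, so it maps to the protobuf exporter
    "http/json": httpOTLPSpanExporter,
}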
# we should leave this to the collector to decide
otel_sample_arg = float(otel_config.get("OTEL_TRACES_SAMPLER_ARG", 0.001))
if otel_sample_arg == 0.001:
# accept Environ overwrites
otel_sample_arg = float(os.environ.get("OTEL_TRACES_SAMPLER_ARG", otel_sample_arg))
sampler = TraceIdRatioBased(otel_sample_arg)
sampler = TraceIdRatioBased(1 / 1000)
tracerProvider = TracerProvider(resource=resource, sampler=sampler)
processor = BatchSpanProcessor(
OTLPSpanExporter(
endpoint=otel_endpoint,
headers=otel_headers,
if DT_API_URL is not None and DT_API_TOKEN is not None:
processor = BatchSpanProcessor(
OTLPSpanExporter(
endpoint=DT_API_URL + "/v1/traces",
headers={"Authorization": "Api-Token " + DT_API_TOKEN},
)
)
)
else:
spanExporter = OTLPSpanExporter(endpoint="http://jaeger:4317")
processor = BatchSpanProcessor(spanExporter)
tracerProvider.add_span_processor(processor)
trace.set_tracer_provider(tracerProvider)
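Once init_exporter has installed the provider, instrumented code obtains tracers through the global API; a minimal sketch with a hypothetical instrumentation name and span:
from opentelemetry import trace
tracer = trace.get_tracer("quay.example")
with tracer.start_as_current_span("example-operation") as span:
    span.set_attribute("example.key", "value")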
def get_tracecontext(custom: str = "", headers: dict = {}) -> Context:
# used to extract the trace context from headers, or generate one if none is present
# by inheriting the generated trace context we avoid creating multiple
# trace contexts for methods that are not related/chained
def create_random():
while True:
span_id = hex(idg.RandomIdGenerator().generate_span_id())
if len(span_id) == 18:
break
return f"00-{hex(idg.RandomIdGenerator().generate_trace_id())[2:]}" + f"-{span_id[2:]}-01"
if custom == "":
carrier = {"traceparent": headers.get("traceparent", create_random())}
else:
carrier = {"traceparent": custom}
ctx = TraceContextTextMapPropagator().extract(carrier)
if ctx == {}:
ctx = context.get_current()
return ctx
def get_traceparent(_ctx) -> str:
# used to output traceparent into logs
try:
span = _ctx.get(list(_ctx)[0]).get_span_context()
except:
try:
span = _ctx.get_span_context()
except:
span = _ctx
try:
tp = f"00-{hex(span.trace_id)[2:]}-{hex(span.span_id)[2:]}-0{hex(span.trace_flags)[2:]}"
except Exception as err:
tp = ""
return tp
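The helper above assembles a W3C traceparent (version-trace_id-parent_id-trace_flags). A worked example using the sample IDs from the W3C Trace Context spec, zero-padded to the fixed widths the format expects (plain hex() drops leading zeros):
trace_id = 0x4BF92F3577B34DA6A3CE929D0E0E4736
span_id = 0x00F067AA0BA902B7
flags = 0x01
traceparent = f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"
# traceparent == "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"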
def traced(span_name=None):
"""
Decorator for tracing function calls using OpenTelemetry.
@@ -171,19 +47,14 @@ def traced(span_name=None):
@wraps(func)
def wrapper(*args, **kwargs):
if features.OTEL_TRACING:
ctx = get_tracecontext(request.headers)
tracer = trace.get_tracer(__name__)
name = span_name if span_name else func.__name__
with tracer.start_as_current_span(name, context=ctx) as span:
with tracer.start_as_current_span(name) as span:
try:
span.set_status(StatusCode.OK)
span.set_attribute("function", name)
span.set_attribute("args", str(args))
span.set_attribute("kwargs", str(kwargs))
return func(*args, **kwargs)
except Exception as e:
span.record_exception(e)
span.set_status(StatusCode.ERROR)
span.set_status(trace.Status(trace.StatusCode.ERROR))
else:
return func(*args, **kwargs)
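Usage of the decorator is unchanged by the revert; a minimal sketch with a hypothetical function:
from util.metrics.otel import traced
@traced("quay.example.lookup_tag")  # span name is optional and defaults to the function name
def lookup_tag(repository, tag_name):
    ...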

View File

@@ -1,61 +0,0 @@
import opentelemetry.exporter.otlp.proto.grpc.trace_exporter
import opentelemetry.exporter.otlp.proto.http.trace_exporter
import pytest
from opentelemetry import context, trace
from app import app
from util.metrics.otel import init_exporter
# check various configuration options
app_configs = [
{
"OTEL_CONFIG": {
"service_name": "unittest",
"OTEL_EXPORTER_OTLP_HEADERS": {
"Authorization": "Api-Token abcdef",
"Tenant-Id": "Tenant 1",
},
"OTEL_EXPORTER_OTLP_ENDPOINT": "https://otlp.example.com",
"OTEL_EXPORTER_OTLP_INSECURE": False,
"OTEL_EXPORTER_OTLP_PROTOCOL": "http",
"OTEL_TRACES_SAMPLER_ARG": 0.005,
},
"FEATURE_ENABLE_OTEL": True,
},
]
def test_otel_config():
# since we can set tracer_provider only once, we go with the default one
app.config["OTEL_CONFIG"] = app_configs[0]["OTEL_CONFIG"]
app.config["FEATURE_OTEL_TRACING"] = True
init_exporter(app.config)
tracer = trace.get_tracer("unittest")
assert tracer.sampler.rate == 0.005
assert tracer.resource.attributes.get("service.name") == "unittest"
assert tracer.resource.attributes.get("service.namespace") == "standalone"
assert tracer.resource.attributes.get("service.version", False)
# convert to dict so we do not need to differentiate between http/grpc headers
assert (
dict(tracer.span_processor._span_processors[0].span_exporter._headers).get("Authorization")
== "Api-Token abcdef"
)
assert (
dict(tracer.span_processor._span_processors[0].span_exporter._headers).get("Tenant-Id")
== "Tenant 1"
)
# http exporter
assert (
tracer.span_processor._span_processors[0].span_exporter._endpoint
== "https://otlp.example.com/v1/traces"
)
# grpc exporter
# assert tracer.span_processor._span_processors[0].span_exporter._endpoint == "otlp.example.com"
assert isinstance(
tracer.span_processor._span_processors[0].span_exporter,
opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter,
)
# assert isinstance(tracer.span_processor._span_processors[0].span_exporter, opentelemetry.exporter.otlp.proto.grpc.trace_exporter.OTLPSpanExporter)