1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/data/model/quota.py
Brandon Caton d2a1fedab4 quota: refactor, tests, and optimizations (PROJQUAY-6735) (#2712)
Adds the following changes:
- Additional logging
- Moving checks before expensive DB calls
- Additional tests
2024-03-06 09:06:38 -05:00

580 lines
20 KiB
Python

import logging
import time
from collections import namedtuple
from enum import Enum
from typing import Dict, List
from peewee import JOIN, fn
import features
from data.database import (
ImageStorage,
ManifestBlob,
ManifestChild,
QuotaNamespaceSize,
QuotaRegistrySize,
QuotaRepositorySize,
Repository,
RepositoryState,
Tag,
User,
)
from data.model import config, db_transaction
from data.model.repository import lookup_repository
logger = logging.getLogger(__name__)


def get_epoch_timestamp_ms() -> int:
    """
    Returns the current wall-clock time as whole milliseconds since the Unix epoch.
    """
    return int(time.time() * 1000)
class QuotaOperation(str, Enum):
    """
    Direction in which a manifest's blob sizes are applied to the quota totals.

    Members subclass ``str``, so they compare equal to the plain strings
    "add" and "subtract".
    """

    # Count the sizes into the totals.
    ADD = "add"
    # Remove the sizes from the totals.
    SUBTRACT = "subtract"
def update_quota(repository_id: int, manifest_id: int, blobs: dict, operation: QuotaOperation):
    """
    Applies a manifest's blob sizes to the namespace and repository quota totals.

    When the QUOTA_MANAGEMENT feature is disabled, no sizes are calculated. Instead the
    stored totals are reset for the backfill worker, provided QUOTA_INVALIDATE_TOTALS
    (default True) allows it.

    :param repository_id: id of the repository the manifest belongs to
    :param manifest_id: id of the manifest being added or removed
    :param blobs: mapping of blob id -> blob size (a size may be None)
    :param operation: whether the sizes are added to or subtracted from the totals
    :raises Exception: any error raised by the size calculation, unless
        QUOTA_SUPPRESS_FAILURES is enabled, in which case it is logged and swallowed
    """
    # If quota management is disabled, mark the total as stale and return
    if not features.QUOTA_MANAGEMENT:
        if config.app_config.get("QUOTA_INVALIDATE_TOTALS", True):
            reset_backfill(repository_id)
        return

    # Not ideal to use such a wide exception but failure to calculate quota should not
    # stop image pushes
    try:
        update_sizes(repository_id, manifest_id, blobs, operation)
    except Exception as ex:
        if not features.QUOTA_SUPPRESS_FAILURES:
            # Bare `raise` re-raises the active exception with its original traceback.
            raise
        logger.exception(
            "quota size calculation: failed to %s manifest id %s from repository %s with exception: %s",
            # str() of a (str, Enum) member renders as "QuotaOperation.ADD";
            # log the underlying value so the message reads "failed to add ...".
            getattr(operation, "value", operation),
            manifest_id,
            repository_id,
            ex,
        )
def update_sizes(repository_id: int, manifest_id: int, blobs: dict, operation: str):
    """
    Adds or subtracts, from the namespace and repository totals, the sizes of the
    blobs in `blobs` that no other manifest references.

    A blob's size counts toward the namespace total only if no other manifest in the
    namespace references it. It counts toward the repository total only if no other
    manifest in the repository references it. Nothing is written when `blobs` is
    empty or the owning namespace is not eligible for quota.
    """
    if len(blobs) == 0:
        logger.debug(
            "no blobs found for manifest %s in repository %s, skipping calculation",
            manifest_id,
            repository_id,
        )
        return

    namespace_id = get_namespace_id_from_repository(repository_id)
    if not eligible_namespace(namespace_id):
        logger.debug(
            "ineligible namespace %s for quota calculation, skipping calculation", namespace_id
        )
        return

    # On ADD, a blob that is already referenced elsewhere was counted back then.
    # On SUBTRACT, the size only comes off once the last reference disappears.
    namespace_delta = 0
    repository_delta = 0
    for blob_id, raw_size in blobs.items():
        size = 0 if raw_size is None else raw_size

        if not blob_exists_in_namespace(namespace_id, manifest_id, blob_id):
            # Unreferenced in the namespace implies unreferenced in the repository too.
            namespace_delta += size
            repository_delta += size
        elif not blob_exists_in_repository(repository_id, manifest_id, blob_id):
            repository_delta += size

    write_namespace_total(namespace_id, manifest_id, namespace_delta, operation)
    # TODO: If repository is marked for deletion or doesn't exist, don't write total
    write_repository_total(repository_id, manifest_id, repository_delta, operation)
def blob_exists_in_namespace(namespace_id: int, manifest_id: int, blob_id: int):
    """
    Returns whether some manifest other than `manifest_id`, in any repository of the
    namespace, references the blob.
    """
    other_references = ManifestBlob.select(1).join(
        Repository, on=(ManifestBlob.repository == Repository.id)
    ).where(
        Repository.namespace_user == namespace_id,
        ManifestBlob.blob == blob_id,
        ManifestBlob.manifest != manifest_id,
    )
    return other_references.exists()
def blob_exists_in_repository(repository_id: int, manifest_id: int, blob_id: int):
    """
    Returns whether some manifest other than `manifest_id` in the same repository
    references the blob.
    """
    other_references = ManifestBlob.select(1).where(
        ManifestBlob.repository == repository_id,
        ManifestBlob.blob == blob_id,
        ManifestBlob.manifest != manifest_id,
    )
    return other_references.exists()
def write_namespace_total(
    namespace_id: int, manifest_id: int, namespace_total: int, operation: str
):
    """
    Applies a size delta to the namespace's stored QuotaNamespaceSize row.

    Outcomes:
    - row exists, backfill incomplete: nothing is written
    - row exists, backfill complete: size_bytes is adjusted in SQL by +/- namespace_total
    - no row, operation is ADD, and manifest_id is the only manifest in the namespace:
      treated as the first push (nothing to backfill), so a completed row is inserted
      with namespace_total as its size
    - no row otherwise: nothing is written; the backfill worker must compute the total
    """
    stored = get_namespace_size(namespace_id)

    # TODO: The following checks should be before the blob lookup to prevent
    # unnecessary reads
    if stored is None:
        first_push = operation == QuotaOperation.ADD and only_manifest_in_namespace(
            namespace_id, manifest_id
        )
        if not first_push:
            logger.info("backfill required for manifest %s in namespace %s", manifest_id, namespace_id)
            return

        logger.info(
            "inserting namespace size for manifest %s in namespace %s", manifest_id, namespace_id
        )
        # pylint: disable-next=no-value-for-parameter
        QuotaNamespaceSize.insert(
            namespace_user_id=namespace_id,
            backfill_start_ms=0,
            backfill_complete=True,
            size_bytes=namespace_total,
        ).execute()
        return

    if not stored.backfill_complete:
        # Backfill hasn't completed for this namespace; leave the total to it.
        return

    logger.info(
        "updating namespace size for manifest %s in namespace %s, %s %s",
        manifest_id,
        namespace_id,
        operation,
        namespace_total,
    )
    # The arithmetic runs in SQL against the current column value.
    size_column = QuotaNamespaceSize.size_bytes
    params = {}
    if operation == QuotaOperation.ADD:
        params["size_bytes"] = size_column + namespace_total
    elif operation == QuotaOperation.SUBTRACT:
        params["size_bytes"] = size_column - namespace_total
    QuotaNamespaceSize.update(**params).where(
        QuotaNamespaceSize.namespace_user == namespace_id
    ).execute()
def write_repository_total(
    repository_id: int, manifest_id: int, repository_total: int, operation: str
):
    """
    Applies a size delta to the repository's stored QuotaRepositorySize row.

    Outcomes:
    - row exists, backfill incomplete: nothing is written
    - row exists, backfill complete: size_bytes is adjusted in SQL by +/- repository_total
    - no row, operation is ADD, and manifest_id is the only manifest in the repository:
      treated as the first push (nothing to backfill), so a completed row is inserted
      with repository_total as its size
    - no row otherwise: nothing is written; the backfill worker must compute the total
    """
    stored = get_repository_size(repository_id)

    # TODO: The following checks should be before the blob lookup to prevent
    # unnecessary reads
    if stored is None:
        first_push = operation == QuotaOperation.ADD and only_manifest_in_repository(
            repository_id, manifest_id
        )
        if not first_push:
            logger.info(
                "backfill required for manifest %s in repository %s", manifest_id, repository_id
            )
            return

        logger.info(
            "inserting repository size for manifest %s in repository %s", manifest_id, repository_id
        )
        # pylint: disable-next=no-value-for-parameter
        QuotaRepositorySize.insert(
            repository_id=repository_id,
            backfill_start_ms=0,
            backfill_complete=True,
            size_bytes=repository_total,
        ).execute()
        return

    if not stored.backfill_complete:
        # Backfill hasn't completed for this repository; leave the total to it.
        return

    logger.info(
        "updating repository size for manifest %s in repository %s, %s %s",
        manifest_id,
        repository_id,
        operation,
        repository_total,
    )
    # The arithmetic runs in SQL against the current column value.
    size_column = QuotaRepositorySize.size_bytes
    params = {}
    if operation == QuotaOperation.ADD:
        params["size_bytes"] = size_column + repository_total
    elif operation == QuotaOperation.SUBTRACT:
        params["size_bytes"] = size_column - repository_total
    QuotaRepositorySize.update(**params).where(
        QuotaRepositorySize.repository == repository_id
    ).execute()
def get_namespace_id_from_repository(repository: int):
    """
    Returns the id of the namespace owning the repository with id `repository`,
    or None if no such repository exists.
    """
    owner_query = Repository.select(Repository.namespace_user).where(Repository.id == repository)
    try:
        row = owner_query.get()
    except Repository.DoesNotExist:
        return None
    return row.namespace_user_id
def get_namespace_size(namespace_id: int):
    """
    Returns the namespace's QuotaNamespaceSize row, or None if it has none.
    """
    size_query = QuotaNamespaceSize.select().where(
        QuotaNamespaceSize.namespace_user == namespace_id
    )
    try:
        return size_query.get()
    except QuotaNamespaceSize.DoesNotExist:
        return None
def get_repository_size(repository_id: int):
    """
    Returns the repository's QuotaRepositorySize row, or None if it has none.
    """
    size_query = QuotaRepositorySize.select().where(
        QuotaRepositorySize.repository == repository_id
    )
    try:
        return size_query.get()
    except QuotaRepositorySize.DoesNotExist:
        return None
def only_manifest_in_namespace(namespace_id: int, manifest_id: int):
    """
    Returns True when no manifest other than `manifest_id` has a blob row in any
    repository of the namespace that is not marked for deletion.
    """
    other_manifests = ManifestBlob.select(1).join(
        Repository, on=(Repository.id == ManifestBlob.repository)
    ).where(
        Repository.namespace_user == namespace_id,
        ManifestBlob.manifest != manifest_id,
        Repository.state != RepositoryState.MARKED_FOR_DELETION,
    )
    has_other_manifests = other_manifests.exists()
    return not has_other_manifests
def only_manifest_in_repository(repository_id: int, manifest_id: int):
    """
    Returns True when no manifest other than `manifest_id` has a blob row in the
    repository.
    """
    other_manifests = ManifestBlob.select(1).where(
        ManifestBlob.repository == repository_id,
        ManifestBlob.manifest != manifest_id,
    )
    has_other_manifests = other_manifests.exists()
    return not has_other_manifests
def is_blob_alive(namespace_id: int, tag_id: int, blob_id: int):
    """
    Returns whether the blob is referenced, within the namespace, by an alive,
    non-hidden tag other than `tag_id` (the tag currently being created/deleted).

    A tag counts as alive when its lifetime_end_ms is NULL or later than now.
    Sub-manifests are only considered alive if their parent tag is alive. So a
    manifest that has a parent through ManifestChild is judged by the parent
    manifest's tag instead of its own:

        if ParentTag is None:
            check that Tag is not hidden, alive, and in the namespace
        else:
            check that ParentTag is not hidden, alive, and in the namespace
    """
    # Read the clock once so both branches compare against the same cutoff.
    now_ms = get_epoch_timestamp_ms()
    ParentTag = Tag.alias()

    # No parent manifest: the manifest's own tag decides.
    own_tag_alive = (
        ParentTag.id.is_null(True)
        & ~Tag.hidden
        & (Repository.namespace_user == namespace_id)
        & (ManifestBlob.blob == blob_id)
        & (Tag.id != tag_id)
        & (Tag.lifetime_end_ms.is_null(True) | (Tag.lifetime_end_ms > now_ms))
    )
    # Parent manifest present: the parent's tag decides.
    parent_tag_alive = (
        ParentTag.id.is_null(False)
        & ~ParentTag.hidden
        & (Repository.namespace_user == namespace_id)
        & (ParentTag.id != tag_id)
        & (ManifestBlob.blob == blob_id)
        & (ParentTag.lifetime_end_ms.is_null(True) | (ParentTag.lifetime_end_ms > now_ms))
    )

    return (
        ManifestBlob.select(1)
        .join(Repository, on=(ManifestBlob.repository == Repository.id))
        .join(Tag, on=(Tag.manifest == ManifestBlob.manifest))
        .join(
            ManifestChild,
            on=(ManifestBlob.manifest == ManifestChild.child_manifest),
            join_type=JOIN.LEFT_OUTER,
        )
        .join(
            ParentTag, on=(ManifestChild.manifest == ParentTag.manifest), join_type=JOIN.LEFT_OUTER
        )
        .where(own_tag_alive | parent_tag_alive)
        .exists()
    )
def eligible_namespace(namespace_id):
    """
    Returns True if `namespace_id` is an enabled, non-robot User row, i.e. a
    namespace eligible to have a quota size.
    """
    conditions = (User.id == namespace_id, User.enabled, ~User.robot)
    return User.select(1).where(*conditions).exists()
def _needs_backfill(size_row) -> bool:
    """Returns True if a quota size row is missing or was reset and not yet started."""
    # reset_backfill/reset_namespace_backfill leave backfill_complete=False with no
    # start time. A row that has a start time but is incomplete is skipped here,
    # presumably because a backfill is already in progress for it.
    return size_row is None or (
        not size_row.backfill_complete and size_row.backfill_start_ms is None
    )


def _backfill_namespace(namespace_id: int):
    """Recalculates the namespace total when its size row needs a backfill."""
    namespace_size = get_namespace_size(namespace_id)
    if not _needs_backfill(namespace_size):
        return

    # Record the start (and zero the size) before the potentially slow total query.
    params = {
        "size_bytes": 0,
        "backfill_start_ms": get_epoch_timestamp_ms(),
        "backfill_complete": False,
    }
    update_namespacesize(namespace_id, params, namespace_size is not None)
    params = {"size_bytes": get_namespace_total(namespace_id), "backfill_complete": True}
    update_namespacesize(namespace_id, params, True)


def _backfill_repository(repository_id: int):
    """Recalculates the repository total when its size row needs a backfill."""
    repository_size = get_repository_size(repository_id)
    if not _needs_backfill(repository_size):
        return

    # Record the start (and zero the size) before the potentially slow total query.
    params = {
        "size_bytes": 0,
        "backfill_start_ms": get_epoch_timestamp_ms(),
        "backfill_complete": False,
    }
    update_repositorysize(repository_id, params, repository_size is not None)
    params = {"size_bytes": get_repository_total(repository_id), "backfill_complete": True}
    update_repositorysize(repository_id, params, True)


def run_backfill(namespace_id: int):
    """
    Calculates the total of unique blobs in the namespace and in each repository
    within the namespace, for every size row that needs a backfill.
    """
    _backfill_namespace(namespace_id)

    # pylint: disable-next=not-an-iterable
    for repository in repositories_in_namespace(namespace_id):
        # The repository may have been deleted since the listing query ran. Skip it
        # instead of returning, which would leave every remaining repository
        # un-backfilled.
        latest_repository = lookup_repository(repository.id)
        if (
            latest_repository is None
            or latest_repository.state == RepositoryState.MARKED_FOR_DELETION
        ):
            continue

        _backfill_repository(repository.id)
def get_namespace_total(namespace_id: int):
    """
    Returns the summed image_size of the distinct blobs referenced by ManifestBlob
    rows in any repository of the namespace, or 0 when the sum is NULL.
    """
    # Grouping by ImageStorage.id yields one row per blob, so shared blobs count once.
    distinct_blob_sizes = (
        ImageStorage.select(ImageStorage.image_size)
        .join(ManifestBlob, on=(ImageStorage.id == ManifestBlob.blob))
        .join(Repository, on=(Repository.id == ManifestBlob.repository))
        .where(Repository.namespace_user == namespace_id)
        .group_by(ImageStorage.id)
    )
    summed = (
        ImageStorage.select(fn.Sum(distinct_blob_sizes.c.image_size))
        .from_(distinct_blob_sizes)
        .scalar()
    )
    return 0 if summed is None else summed
def get_repository_total(repository_id: int):
    """
    Returns the summed image_size of the distinct blobs referenced by ManifestBlob
    rows in the repository, or 0 when the sum is NULL.
    """
    # Grouping by ImageStorage.id yields one row per blob, so shared blobs count once.
    distinct_blob_sizes = (
        ImageStorage.select(ImageStorage.image_size)
        .join(ManifestBlob, on=(ImageStorage.id == ManifestBlob.blob))
        .where(ManifestBlob.repository == repository_id)
        .group_by(ImageStorage.id)
    )
    summed = (
        ImageStorage.select(fn.Sum(distinct_blob_sizes.c.image_size))
        .from_(distinct_blob_sizes)
        .scalar()
    )
    return 0 if summed is None else summed
def repositories_in_namespace(namespace_id: int):
    """
    Returns a query over the namespace's repositories that are not marked for deletion.
    """
    in_namespace = Repository.namespace_user == namespace_id
    not_deleted = Repository.state != RepositoryState.MARKED_FOR_DELETION
    return Repository.select().where(in_namespace, not_deleted)
def update_namespacesize(namespace_id: int, params, exists=False):
    """
    Writes `params` to the namespace's QuotaNamespaceSize row. The existing row is
    updated when `exists` is truthy; otherwise a new row is inserted.
    """
    if not exists:
        # pylint: disable-next=no-value-for-parameter
        QuotaNamespaceSize.insert(namespace_user_id=namespace_id, **params).execute()
        return

    QuotaNamespaceSize.update(**params).where(
        QuotaNamespaceSize.namespace_user == namespace_id
    ).execute()
def update_repositorysize(repository_id: int, params, exists: bool):
    """
    Writes `params` to the repository's QuotaRepositorySize row. The existing row is
    updated when `exists` is truthy; otherwise a new row is inserted.
    """
    if not exists:
        # pylint: disable-next=no-value-for-parameter
        QuotaRepositorySize.insert(repository_id=repository_id, **params).execute()
        return

    QuotaRepositorySize.update(**params).where(
        QuotaRepositorySize.repository == repository_id
    ).execute()
def reset_backfill(repository_id: int):
    """
    Resets the quotarepositorysize fields to be picked up by the backfill worker
    for recalculation. Since the repository total will change, the namespace
    backfill is reset as well.

    Only a row whose backfill_start_ms is set is reset. Does nothing when
    QUOTA_INVALIDATE_TOTALS (default True) is disabled.
    """
    if not config.app_config.get("QUOTA_INVALIDATE_TOTALS", True):
        return

    # An UPDATE that matches no rows is a no-op and does not raise DoesNotExist,
    # so no exception handling is needed around it.
    QuotaRepositorySize.update(
        {"size_bytes": 0, "backfill_start_ms": None, "backfill_complete": False}
    ).where(
        QuotaRepositorySize.repository == repository_id,
        QuotaRepositorySize.backfill_start_ms.is_null(False),
    ).execute()

    # None means the repository no longer exists; there is no namespace to reset.
    namespace_id = get_namespace_id_from_repository(repository_id)
    if namespace_id is not None:
        reset_namespace_backfill(namespace_id)
def reset_namespace_backfill(namespace_id: int):
    """
    Resets the quotanamespacesize fields to be picked up by the backfill worker
    for recalculation.

    Only a row whose backfill_start_ms is set is reset. Does nothing when
    QUOTA_INVALIDATE_TOTALS (default True) is disabled.
    """
    if not config.app_config.get("QUOTA_INVALIDATE_TOTALS", True):
        return

    # An UPDATE that matches no rows is a no-op and does not raise DoesNotExist,
    # so no exception handling is needed around it.
    QuotaNamespaceSize.update(
        {"size_bytes": 0, "backfill_start_ms": None, "backfill_complete": False}
    ).where(
        QuotaNamespaceSize.namespace_user == namespace_id,
        QuotaNamespaceSize.backfill_start_ms.is_null(False),
    ).execute()
def calculate_registry_size():
    """
    Calculates the size of the registry if a calculation is queued and none is
    running. Concurrency is done through the quotaregistrysize.running field.

    The calculation is claimed with a single conditional UPDATE
    (running=False and queued=True -> running=True, queued=False). Only a worker
    whose UPDATE actually changed a row proceeds; with the database's usual
    row-level locking, that should be exactly one of several racing workers.
    Does nothing if the quotaregistrysize row does not exist.
    """
    quota_registry_size = get_registry_size()
    if (
        quota_registry_size is None
        or quota_registry_size.running
        or not quota_registry_size.queued
    ):
        return

    # A separate read-then-write would let two workers both see "queued, not running"
    # and both start the calculation; the WHERE clause makes the claim atomic.
    # pylint: disable-next=no-value-for-parameter
    claimed = QuotaRegistrySize.update({"running": True, "queued": False}).where(
        ~QuotaRegistrySize.running, QuotaRegistrySize.queued
    ).execute()
    if claimed == 0:
        logger.info("Registry size calculation already claimed by another worker")
        return

    # NOTE(review): if sum_registry_size raises, `running` stays True and later calls
    # will skip the calculation; confirm something else clears a stale flag.
    logger.info("Calculating registry size")
    total_size = sum_registry_size()
    logger.info("Completed calculation of registry size")
    update_registry_size(total_size)
def get_registry_size():
    """
    Returns the first QuotaRegistrySize row the query yields, or None if there is none.
    """
    registry_query = QuotaRegistrySize.select()
    try:
        return registry_query.get()
    except QuotaRegistrySize.DoesNotExist:
        return None
def queue_registry_size_calculation():
    """
    Queues the registry size calculation for the quotaregistrysizeworker by
    setting quotaregistrysize.queued to true.

    Returns a tuple ``(queued, already_queued)``:
    - no row yet: a queued row is inserted -> (True, False)
    - a calculation is already queued or running -> (True, True)
    - otherwise the row is flagged as queued -> (rows_updated != 0, False)
    """
    registry_size = get_registry_size()

    if registry_size is None:
        # pylint: disable-next=no-value-for-parameter
        QuotaRegistrySize.insert(
            {"size_bytes": 0, "running": False, "queued": True, "completed_ms": None}
        ).execute()
        logger.info("Queued initial registry size calculation")
        return True, False

    if registry_size.queued or registry_size.running:
        logger.info("Registry size calculation already queued")
        return True, True

    # At this point the row is neither queued nor running.
    # pylint: disable-next=no-value-for-parameter
    rows_updated = QuotaRegistrySize.update({"queued": True}).execute()
    was_queued = rows_updated != 0
    if was_queued:
        logger.info("Queued registry size calculation")
    return was_queued, False
def set_registry_size_running(exists=False):
    """
    Flags the registry size calculation as running and no longer queued. The
    existing quotaregistrysize row is updated when `exists` is truthy; otherwise a
    new row is inserted.
    """
    if not exists:
        # pylint: disable-next=no-value-for-parameter
        QuotaRegistrySize.insert({"running": True, "queued": False}).execute()
        return

    # pylint: disable-next=no-value-for-parameter
    QuotaRegistrySize.update({"running": True, "queued": False}).execute()
def sum_registry_size():
    """
    Returns the SUM of image_size over all ImageStorage rows, or 0 when it is NULL.
    """
    # pylint: disable-next=no-value-for-parameter
    summed = ImageStorage.select(fn.SUM(ImageStorage.image_size)).scalar()
    if summed is None:
        return 0
    return summed
def update_registry_size(size=0):
    """
    Stores a finished registry size calculation: clears the running and queued
    flags, and records `size` and the completion time in epoch milliseconds.
    """
    finished_state = {
        "running": False,
        "queued": False,
        "size_bytes": size,
        "completed_ms": get_epoch_timestamp_ms(),
    }
    # pylint: disable-next=no-value-for-parameter
    QuotaRegistrySize.update(finished_state).execute()