mirror of
https://github.com/quay/quay.git
synced 2026-01-26 06:21:37 +03:00
Adds the following changes: - Additional logging - Moving checks before expensive DB calls - Additional tests
580 lines
20 KiB
Python
580 lines
20 KiB
Python
import logging
|
|
import time
|
|
from collections import namedtuple
|
|
from enum import Enum
|
|
from typing import Dict, List
|
|
|
|
from peewee import JOIN, fn
|
|
|
|
import features
|
|
from data.database import (
|
|
ImageStorage,
|
|
ManifestBlob,
|
|
ManifestChild,
|
|
QuotaNamespaceSize,
|
|
QuotaRegistrySize,
|
|
QuotaRepositorySize,
|
|
Repository,
|
|
RepositoryState,
|
|
Tag,
|
|
User,
|
|
)
|
|
from data.model import config, db_transaction
|
|
from data.model.repository import lookup_repository
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
get_epoch_timestamp_ms = lambda: int(time.time() * 1000)
|
|
|
|
|
|
class QuotaOperation(str, Enum):
|
|
ADD = "add"
|
|
SUBTRACT = "subtract"
|
|
|
|
|
|
def update_quota(repository_id: int, manifest_id: int, blobs: dict, operation: QuotaOperation):
|
|
# If quota management is disabled, mark the total as stale and return
|
|
if not features.QUOTA_MANAGEMENT:
|
|
if config.app_config.get("QUOTA_INVALIDATE_TOTALS", True):
|
|
reset_backfill(repository_id)
|
|
return
|
|
|
|
# Not ideal to use such a wide exception but failure to calculate quota should not
|
|
# stop image pushes
|
|
try:
|
|
update_sizes(repository_id, manifest_id, blobs, operation)
|
|
except Exception as ex:
|
|
if features.QUOTA_SUPPRESS_FAILURES:
|
|
logger.exception(
|
|
"quota size calculation: failed to %s manifest id %s from repository %s with exception: %s",
|
|
operation,
|
|
manifest_id,
|
|
repository_id,
|
|
ex,
|
|
)
|
|
else:
|
|
raise ex
|
|
|
|
|
|
def update_sizes(repository_id: int, manifest_id: int, blobs: dict, operation: str):
|
|
"""
|
|
Adds or subtracts the blobs that are currently not being referrenced by
|
|
existing manifests from the total
|
|
"""
|
|
if len(blobs) == 0:
|
|
logger.debug(
|
|
"no blobs found for manifest %s in repository %s, skipping calculation",
|
|
manifest_id,
|
|
repository_id,
|
|
)
|
|
return
|
|
|
|
namespace_id = get_namespace_id_from_repository(repository_id)
|
|
if not eligible_namespace(namespace_id):
|
|
logger.debug(
|
|
"ineligible namespace %s for quota calculation, skipping calculation", namespace_id
|
|
)
|
|
return
|
|
|
|
# Addition - if the blob already referenced it's already been counted
|
|
# Subtraction - should only happen on the deletion of the last blob, if another exists
|
|
# don't subtract
|
|
namespace_total = 0
|
|
repository_total = 0
|
|
for blob_id, blob_image_size in blobs.items():
|
|
|
|
# If the blob doesn't exist in the namespace it doesn't exist in the repo either
|
|
# so add the total to both. If it exists in the namespace we need to check
|
|
# if it exists in the repository.
|
|
blob_size = blob_image_size if blob_image_size is not None else 0
|
|
if not blob_exists_in_namespace(namespace_id, manifest_id, blob_id):
|
|
namespace_total = namespace_total + blob_size
|
|
repository_total = repository_total + blob_size
|
|
elif not blob_exists_in_repository(repository_id, manifest_id, blob_id):
|
|
repository_total = repository_total + blob_size
|
|
|
|
write_namespace_total(namespace_id, manifest_id, namespace_total, operation)
|
|
|
|
# TODO: If repository is marked for deletion or doesn't exist, don't write total
|
|
write_repository_total(repository_id, manifest_id, repository_total, operation)
|
|
|
|
|
|
def blob_exists_in_namespace(namespace_id: int, manifest_id: int, blob_id: int):
|
|
# Return true if there exists some other manifest within the namespace that
|
|
# references this blob
|
|
return (
|
|
ManifestBlob.select(1)
|
|
.join(Repository, on=(ManifestBlob.repository == Repository.id))
|
|
.where(
|
|
Repository.namespace_user == namespace_id,
|
|
ManifestBlob.blob == blob_id,
|
|
ManifestBlob.manifest != manifest_id,
|
|
)
|
|
.exists()
|
|
)
|
|
|
|
|
|
def blob_exists_in_repository(repository_id: int, manifest_id: int, blob_id: int):
|
|
# Return true if there exists some other manifest within the repository that
|
|
# references this blob
|
|
return (
|
|
ManifestBlob.select(1)
|
|
.where(
|
|
ManifestBlob.repository == repository_id,
|
|
ManifestBlob.blob == blob_id,
|
|
ManifestBlob.manifest != manifest_id,
|
|
)
|
|
.exists()
|
|
)
|
|
|
|
|
|
def write_namespace_total(
|
|
namespace_id: int, manifest_id: int, namespace_total: int, operation: str
|
|
):
|
|
namespace_size = get_namespace_size(namespace_id)
|
|
namespace_size_exists = namespace_size is not None
|
|
|
|
# TODO: The following checks should be before the blob lookup to prevent
|
|
# unnecessary reads
|
|
|
|
# If backfill hasn't ran yet for this namespace don't do anything
|
|
if namespace_size_exists and not namespace_size.backfill_complete:
|
|
return
|
|
|
|
# If the namespacesize entry doesn't exist and this is the only manifest in the namespace
|
|
# we can assume this is the first push to the namespace and there is no blobs to be
|
|
# backfilled, so let the entry be created. Otherwise it still needs to be handled by the
|
|
# backfill worker so let's exit
|
|
if (
|
|
operation == QuotaOperation.ADD
|
|
and not namespace_size_exists
|
|
and only_manifest_in_namespace(namespace_id, manifest_id)
|
|
):
|
|
logger.info(
|
|
"inserting namespace size for manifest %s in namespace %s", manifest_id, namespace_id
|
|
)
|
|
# pylint: disable-next=no-value-for-parameter
|
|
QuotaNamespaceSize.insert(
|
|
namespace_user_id=namespace_id,
|
|
backfill_start_ms=0,
|
|
backfill_complete=True,
|
|
size_bytes=namespace_total,
|
|
).execute()
|
|
return
|
|
|
|
# If the quotanamespacesize entry doesn't exist and it's not the only
|
|
# manifest in the repository, it needs to be handled by the backfill worker.
|
|
# If it does exist we can add/subtract the total
|
|
if namespace_size_exists:
|
|
logger.info(
|
|
"updating namespace size for manifest %s in namespace %s, %s %s",
|
|
manifest_id,
|
|
namespace_id,
|
|
operation,
|
|
namespace_total,
|
|
)
|
|
params = {}
|
|
if operation == QuotaOperation.ADD:
|
|
params["size_bytes"] = QuotaNamespaceSize.size_bytes + namespace_total
|
|
elif operation == QuotaOperation.SUBTRACT:
|
|
params["size_bytes"] = QuotaNamespaceSize.size_bytes - namespace_total
|
|
QuotaNamespaceSize.update(**params).where(
|
|
QuotaNamespaceSize.namespace_user == namespace_id
|
|
).execute()
|
|
else:
|
|
logger.info("backfill required for manifest %s in namespace %s", manifest_id, namespace_id)
|
|
|
|
|
|
def write_repository_total(
|
|
repository_id: int, manifest_id: int, repository_total: int, operation: str
|
|
):
|
|
repository_size = get_repository_size(repository_id)
|
|
repository_size_exists = repository_size is not None
|
|
|
|
# TODO: The following checks should be before the blob lookup to prevent
|
|
# unnecessary reads
|
|
|
|
# If backfill hasn't ran yet for this repository don't do anything
|
|
if repository_size_exists and not repository_size.backfill_complete:
|
|
return
|
|
|
|
# If the repositorysize entry doesn't exist and this is the only manifest in the repository
|
|
# we can assume this is the first push to the repository and there is no blobs to be
|
|
# backfilled, so let the entry be created. Otherwise it still needs to be handled by the
|
|
# backfill worker so let's exit
|
|
if (
|
|
operation == QuotaOperation.ADD
|
|
and not repository_size_exists
|
|
and only_manifest_in_repository(repository_id, manifest_id)
|
|
):
|
|
logger.info(
|
|
"inserting repository size for manifest %s in repository %s", manifest_id, repository_id
|
|
)
|
|
# pylint: disable-next=no-value-for-parameter
|
|
QuotaRepositorySize.insert(
|
|
repository_id=repository_id,
|
|
backfill_start_ms=0,
|
|
backfill_complete=True,
|
|
size_bytes=repository_total,
|
|
).execute()
|
|
return
|
|
|
|
# If the quotarepositorysize entry doesn't exist and it's not the only
|
|
# manifest in the repository, it needs to be handled by the backfill worker.
|
|
# If it does exist we can add/subtract the total
|
|
if repository_size_exists:
|
|
logger.info(
|
|
"updating repository size for manifest %s in repository %s, %s %s",
|
|
manifest_id,
|
|
repository_id,
|
|
operation,
|
|
repository_total,
|
|
)
|
|
params = {}
|
|
if operation == QuotaOperation.ADD:
|
|
params["size_bytes"] = QuotaRepositorySize.size_bytes + repository_total
|
|
elif operation == QuotaOperation.SUBTRACT:
|
|
params["size_bytes"] = QuotaRepositorySize.size_bytes - repository_total
|
|
QuotaRepositorySize.update(**params).where(
|
|
QuotaRepositorySize.repository == repository_id
|
|
).execute()
|
|
else:
|
|
logger.info(
|
|
"backfill required for manifest %s in repository %s", manifest_id, repository_id
|
|
)
|
|
|
|
|
|
def get_namespace_id_from_repository(repository: int):
|
|
try:
|
|
repo = Repository.select(Repository.namespace_user).where(Repository.id == repository).get()
|
|
return repo.namespace_user_id
|
|
except Repository.DoesNotExist:
|
|
return None
|
|
|
|
|
|
def get_namespace_size(namespace_id: int):
|
|
try:
|
|
namespace_size = (
|
|
QuotaNamespaceSize.select()
|
|
.where(QuotaNamespaceSize.namespace_user == namespace_id)
|
|
.get()
|
|
)
|
|
return namespace_size
|
|
except QuotaNamespaceSize.DoesNotExist:
|
|
return None
|
|
|
|
|
|
def get_repository_size(repository_id: int):
|
|
try:
|
|
repository_size = (
|
|
QuotaRepositorySize.select()
|
|
.where(QuotaRepositorySize.repository == repository_id)
|
|
.get()
|
|
)
|
|
return repository_size
|
|
except QuotaRepositorySize.DoesNotExist:
|
|
return None
|
|
|
|
|
|
def only_manifest_in_namespace(namespace_id: int, manifest_id: int):
|
|
return not (
|
|
ManifestBlob.select(1)
|
|
.join(Repository, on=(Repository.id == ManifestBlob.repository))
|
|
.where(
|
|
Repository.namespace_user == namespace_id,
|
|
ManifestBlob.manifest != manifest_id,
|
|
Repository.state != RepositoryState.MARKED_FOR_DELETION,
|
|
)
|
|
.exists()
|
|
)
|
|
|
|
|
|
def only_manifest_in_repository(repository_id: int, manifest_id: int):
|
|
return not (
|
|
ManifestBlob.select(1)
|
|
.where(ManifestBlob.repository == repository_id, ManifestBlob.manifest != manifest_id)
|
|
.exists()
|
|
)
|
|
|
|
|
|
def is_blob_alive(namespace_id: int, tag_id: int, blob_id: int):
|
|
# Check if the blob is being referenced by an alive, non-hidden tag that isn't the
|
|
# tag we're currently creating/deleting within the namespace.
|
|
# Since sub-manifests are only considered alive if their parent tag is alive,
|
|
# check the parent tag as well.
|
|
# The where statements create an if ... else ... statement creating the logic:
|
|
# if ParentTag is None:
|
|
# check that Tag is not hidden, alive, and in the namespace
|
|
# elif ParentTag is not None:
|
|
# check that ParentTag is not hidden, alive, and in the namespace
|
|
ParentTag = Tag.alias()
|
|
return (
|
|
ManifestBlob.select(1)
|
|
.join(Repository, on=(ManifestBlob.repository == Repository.id))
|
|
.join(Tag, on=(Tag.manifest == ManifestBlob.manifest))
|
|
.join(
|
|
ManifestChild,
|
|
on=(ManifestBlob.manifest == ManifestChild.child_manifest),
|
|
join_type=JOIN.LEFT_OUTER,
|
|
)
|
|
.join(
|
|
ParentTag, on=(ManifestChild.manifest == ParentTag.manifest), join_type=JOIN.LEFT_OUTER
|
|
)
|
|
.where(
|
|
(
|
|
ParentTag.id.is_null(True)
|
|
& ~Tag.hidden
|
|
& (Repository.namespace_user == namespace_id)
|
|
& (ManifestBlob.blob == blob_id)
|
|
& (Tag.id != tag_id)
|
|
& (
|
|
Tag.lifetime_end_ms.is_null(True)
|
|
| (Tag.lifetime_end_ms > get_epoch_timestamp_ms())
|
|
)
|
|
)
|
|
| (
|
|
ParentTag.id.is_null(False)
|
|
& ~ParentTag.hidden
|
|
& (Repository.namespace_user == namespace_id)
|
|
& (ParentTag.id != tag_id)
|
|
& (ManifestBlob.blob == blob_id)
|
|
& (
|
|
ParentTag.lifetime_end_ms.is_null(True)
|
|
| (ParentTag.lifetime_end_ms > get_epoch_timestamp_ms())
|
|
)
|
|
)
|
|
)
|
|
.exists()
|
|
)
|
|
|
|
|
|
def eligible_namespace(namespace_id):
|
|
"""
|
|
Returns true if the namespace is eligible to have a quota size
|
|
"""
|
|
return User.select(1).where(User.id == namespace_id, User.enabled, ~User.robot).exists()
|
|
|
|
|
|
def run_backfill(namespace_id: int):
|
|
"""
|
|
Calculates the total of unique blobs in the namespace and repositories within
|
|
the namespace.
|
|
"""
|
|
namespace_size = get_namespace_size(namespace_id)
|
|
namespace_size_exists = namespace_size is not None
|
|
|
|
if not namespace_size_exists or (
|
|
namespace_size_exists
|
|
and not namespace_size.backfill_complete
|
|
and namespace_size.backfill_start_ms is None
|
|
):
|
|
params = {
|
|
"size_bytes": 0,
|
|
"backfill_start_ms": get_epoch_timestamp_ms(),
|
|
"backfill_complete": False,
|
|
}
|
|
update_namespacesize(namespace_id, params, namespace_size_exists)
|
|
|
|
params = {"size_bytes": get_namespace_total(namespace_id), "backfill_complete": True}
|
|
update_namespacesize(namespace_id, params, True)
|
|
|
|
# pylint: disable-next=not-an-iterable
|
|
for repository in repositories_in_namespace(namespace_id):
|
|
|
|
# Check to make sure the repository hasn't been deleted since the time passed
|
|
latest_repository = lookup_repository(repository.id)
|
|
if (
|
|
latest_repository is None
|
|
or latest_repository.state == RepositoryState.MARKED_FOR_DELETION
|
|
):
|
|
return
|
|
|
|
repository_size = get_repository_size(repository.id)
|
|
repository_size_exists = repository_size is not None
|
|
if not repository_size_exists or (
|
|
repository_size_exists
|
|
and not repository_size.backfill_complete
|
|
and repository_size.backfill_start_ms is None
|
|
):
|
|
params = {
|
|
"size_bytes": 0,
|
|
"backfill_start_ms": get_epoch_timestamp_ms(),
|
|
"backfill_complete": False,
|
|
}
|
|
update_repositorysize(repository.id, params, repository_size_exists)
|
|
|
|
params = {"size_bytes": get_repository_total(repository.id), "backfill_complete": True}
|
|
update_repositorysize(repository.id, params, True)
|
|
|
|
|
|
def get_namespace_total(namespace_id: int):
|
|
derived_ns = (
|
|
ImageStorage.select(ImageStorage.image_size)
|
|
.join(ManifestBlob, on=(ImageStorage.id == ManifestBlob.blob))
|
|
.join(Repository, on=(Repository.id == ManifestBlob.repository))
|
|
.where(
|
|
Repository.namespace_user == namespace_id,
|
|
)
|
|
.group_by(ImageStorage.id)
|
|
)
|
|
total = ImageStorage.select(fn.Sum(derived_ns.c.image_size)).from_(derived_ns).scalar()
|
|
return total if total is not None else 0
|
|
|
|
|
|
def get_repository_total(repository_id: int):
|
|
derived_ns = (
|
|
ImageStorage.select(ImageStorage.image_size)
|
|
.join(ManifestBlob, on=(ImageStorage.id == ManifestBlob.blob))
|
|
.where(ManifestBlob.repository == repository_id)
|
|
.group_by(ImageStorage.id)
|
|
)
|
|
total = ImageStorage.select(fn.Sum(derived_ns.c.image_size)).from_(derived_ns).scalar()
|
|
return total if total is not None else 0
|
|
|
|
|
|
def repositories_in_namespace(namespace_id: int):
|
|
return Repository.select().where(
|
|
Repository.namespace_user == namespace_id,
|
|
Repository.state != RepositoryState.MARKED_FOR_DELETION,
|
|
)
|
|
|
|
|
|
def update_namespacesize(namespace_id: int, params, exists=False):
|
|
if exists:
|
|
QuotaNamespaceSize.update(**params).where(
|
|
QuotaNamespaceSize.namespace_user == namespace_id
|
|
).execute()
|
|
else:
|
|
# pylint: disable-next=no-value-for-parameter
|
|
QuotaNamespaceSize.insert(namespace_user_id=namespace_id, **params).execute()
|
|
|
|
|
|
def update_repositorysize(repository_id: int, params, exists: bool):
|
|
if exists:
|
|
QuotaRepositorySize.update(**params).where(
|
|
QuotaRepositorySize.repository == repository_id
|
|
).execute()
|
|
else:
|
|
# pylint: disable-next=no-value-for-parameter
|
|
QuotaRepositorySize.insert(repository_id=repository_id, **params).execute()
|
|
|
|
|
|
def reset_backfill(repository_id: int):
|
|
"""
|
|
Resets the quotarepositorysize fields to be picked up by the backfill worker
|
|
for recalculation. Since the repository total will change we
|
|
need to reset the namespace backfill has well.
|
|
"""
|
|
if not config.app_config.get("QUOTA_INVALIDATE_TOTALS", True):
|
|
return
|
|
|
|
try:
|
|
QuotaRepositorySize.update(
|
|
{"size_bytes": 0, "backfill_start_ms": None, "backfill_complete": False}
|
|
).where(
|
|
QuotaRepositorySize.repository == repository_id,
|
|
QuotaRepositorySize.backfill_start_ms.is_null(False),
|
|
).execute()
|
|
namespace_id = get_namespace_id_from_repository(repository_id)
|
|
reset_namespace_backfill(namespace_id)
|
|
except QuotaRepositorySize.DoesNotExist:
|
|
pass
|
|
|
|
|
|
def reset_namespace_backfill(namespace_id: int):
|
|
"""
|
|
Resets the quotanamespacesize fields to be picked up by the backfill worker
|
|
for recalculation.
|
|
"""
|
|
if not config.app_config.get("QUOTA_INVALIDATE_TOTALS", True):
|
|
return
|
|
|
|
try:
|
|
QuotaNamespaceSize.update(
|
|
{"size_bytes": 0, "backfill_start_ms": None, "backfill_complete": False}
|
|
).where(
|
|
QuotaNamespaceSize.namespace_user == namespace_id,
|
|
QuotaNamespaceSize.backfill_start_ms.is_null(False),
|
|
).execute()
|
|
except QuotaNamespaceSize.DoesNotExist:
|
|
pass
|
|
|
|
|
|
def calculate_registry_size():
|
|
"""
|
|
Calculates the size of the registry. Concurrency is done through the
|
|
quotaregistrysize.running field.
|
|
"""
|
|
quota_registry_size = get_registry_size()
|
|
exists = quota_registry_size is not None
|
|
|
|
if exists and not quota_registry_size.running and quota_registry_size.queued:
|
|
set_registry_size_running(exists)
|
|
logger.info("Calculating registry size")
|
|
total_size = sum_registry_size()
|
|
logger.info("Completed calculation of registry size")
|
|
update_registry_size(total_size)
|
|
|
|
|
|
def get_registry_size():
|
|
try:
|
|
return QuotaRegistrySize.select().get()
|
|
except QuotaRegistrySize.DoesNotExist:
|
|
return None
|
|
|
|
|
|
def queue_registry_size_calculation():
|
|
"""
|
|
Queues the registry size calculation for the quotaregistrysizeworker by
|
|
setting quotaregistrysize.queued to true. Returns whether the calculation has been queued
|
|
and whether it was already queued.
|
|
"""
|
|
registry_size = get_registry_size()
|
|
registry_size_exists = registry_size is not None
|
|
|
|
if not registry_size_exists:
|
|
# pylint: disable-next=no-value-for-parameter
|
|
QuotaRegistrySize.insert(
|
|
{"size_bytes": 0, "running": False, "queued": True, "completed_ms": None}
|
|
).execute()
|
|
logger.info("Queued initial registry size calculation")
|
|
return True, False
|
|
|
|
if registry_size_exists and (registry_size.queued or registry_size.running):
|
|
logger.info("Registry size calculation already queued")
|
|
return True, True
|
|
|
|
if registry_size_exists and (not registry_size.running and not registry_size.queued):
|
|
# pylint: disable-next=no-value-for-parameter
|
|
updated = QuotaRegistrySize.update({"queued": True}).execute()
|
|
if updated != 0:
|
|
logger.info("Queued registry size calculation")
|
|
return updated != 0, False
|
|
|
|
|
|
def set_registry_size_running(exists=False):
|
|
if exists:
|
|
# pylint: disable-next=no-value-for-parameter
|
|
QuotaRegistrySize.update({"running": True, "queued": False}).execute()
|
|
else:
|
|
# pylint: disable-next=no-value-for-parameter
|
|
QuotaRegistrySize.insert({"running": True, "queued": False}).execute()
|
|
|
|
|
|
def sum_registry_size():
|
|
# pylint: disable-next=no-value-for-parameter
|
|
total_size = ImageStorage.select(fn.SUM(ImageStorage.image_size)).scalar()
|
|
return total_size if total_size is not None else 0
|
|
|
|
|
|
def update_registry_size(size=0):
|
|
# pylint: disable-next=no-value-for-parameter
|
|
QuotaRegistrySize.update(
|
|
{
|
|
"running": False,
|
|
"queued": False,
|
|
"size_bytes": size,
|
|
"completed_ms": get_epoch_timestamp_ms(),
|
|
}
|
|
).execute()
|