mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
quay/data/model/repository.py
Michaela Lang be82aefb44 proxycache(permissions): CVE-2025-4374 (PROJQUAY-8892) (#3941)
Fixes CVE-2025-4374 by extending the create_repository method to recognize when a proxy cache repository is being requested.
Adds unit tests for create_repository when proxy_cache is set.
2025-05-22 09:09:02 -04:00
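
A minimal sketch of the intended behavior, assuming the create_repository signature shown in the file below (the namespace, repository, and user names here are illustrative):

    repo = create_repository("example-org", "cached-app", user, proxy_cache=True)
    # For a proxy cache repository, the creating user is granted only the "read"
    # role rather than "admin".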

777 lines · 23 KiB · Python

import json
import logging
import random
import uuid
from datetime import datetime, timedelta
from enum import Enum
from cachetools.func import ttl_cache
from peewee import JOIN, SQL, Case, IntegrityError, fn
import data
from data.database import (
ApprTag,
BlobUpload,
DeletedRepository,
ExternalNotificationEvent,
ImageStorage,
Label,
Manifest,
ManifestBlob,
ManifestChild,
Namespace,
QuotaRepositorySize,
Repository,
RepositoryActionCount,
RepositoryAuthorizedEmail,
RepositoryKind,
RepositoryNotification,
RepositoryPermission,
RepositorySearchScore,
RepositoryState,
Role,
Star,
Tag,
User,
Visibility,
db_concat_func,
db_for_update,
db_random_func,
get_epoch_timestamp,
get_epoch_timestamp_ms,
)
from data.model import (
DataModelException,
_basequery,
config,
db_transaction,
oci,
permission,
storage,
)
from data.text import prefix_search
from util.itertoolrecipes import take
logger = logging.getLogger(__name__)
SEARCH_FIELDS = Enum("SearchFields", ["name", "description"])
class RepoStateConfigException(Exception):
"""
Repository.state value requires further configuration to operate.
"""
pass
def lookup_secscan_notification_severities(repository_id):
"""
Yields the configured security scanner notification severities for the repository.
Yields nothing if the repository does not exist or has no matching notifications.
"""
try:
repo = Repository.get(id=repository_id)
except Repository.DoesNotExist:
return None
event_kind = ExternalNotificationEvent.get(name="vulnerability_found")
for event in RepositoryNotification.select().where(
RepositoryNotification.repository == repository_id,
RepositoryNotification.event == event_kind,
):
severity = json.loads(event.event_config_json).get("vulnerability", {}).get("priority")
if severity:
yield severity
def get_max_id():
"""
Gets the maximum id for repository.
"""
return Repository.select(fn.Max(Repository.id)).scalar()
def get_min_id():
"""
Gets the minimum id for repository.
"""
return Repository.select(fn.Min(Repository.id)).scalar()
def get_repository_count():
"""Returns the count of repositories."""
return Repository.select().count()
def get_repo_kind_name(repo):
return Repository.kind.get_name(repo.kind_id)
def get_estimated_repository_count():
return _basequery.estimated_row_count(Repository)
def get_public_repo_visibility():
return _basequery.get_public_repo_visibility()
class _RepositoryExistsException(Exception):
"""Exception raised if a repository already exists in create_repository. Used to break out
of the transaction.
"""
def __init__(self, internal_exception):
self.internal_exception = internal_exception
def create_repository(
namespace,
name,
creating_user,
visibility="private",
repo_kind="image",
description=None,
proxy_cache=False,
):
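"""
Creates a new repository under the given namespace. If a (non-organization) creating user
is given, they are granted the "read" role on the repository when proxy_cache is True
(CVE-2025-4374) and the "admin" role otherwise. Returns the created repository, the
existing repository if creation raced with another request, or None if a repository with
that name already exists.
"""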
namespace_user = User.get(username=namespace)
yesterday = datetime.now() - timedelta(days=1)
try:
with db_transaction():
# Check if the repository exists to avoid an IntegrityError if possible.
existing = get_repository(namespace, name)
if existing is not None:
return None
try:
repo = Repository.create(
name=name,
visibility=Repository.visibility.get_id(visibility),
namespace_user=namespace_user,
kind=Repository.kind.get_id(repo_kind),
description=description,
)
except IntegrityError as ie:
raise _RepositoryExistsException(ie)
RepositoryActionCount.create(repository=repo, count=0, date=yesterday)
RepositorySearchScore.create(repository=repo, score=0)
# Note: We create the repository permission inside the transaction to ensure it is created
# atomically with the repository. Proxy cache repositories grant the creating user only the
# "read" role (CVE-2025-4374); all other repositories grant "admin".
if creating_user and not creating_user.organization:
rolename = "read" if proxy_cache else "admin"
role = Role.get(name=rolename)
RepositoryPermission.create(user=creating_user, repository=repo, role=role)
except _RepositoryExistsException as ree:
try:
return Repository.get(namespace_user=namespace_user, name=name)
except Repository.DoesNotExist:
logger.error(
"Got integrity error when trying to create repository %s/%s: %s",
namespace,
name,
ree.internal_exception,
)
return None
# Apply default permissions (only occurs for repositories under organizations)
if creating_user and not creating_user.organization and creating_user.username != namespace:
permission.apply_default_permissions(repo, creating_user)
return repo
def get_repository(namespace_name, repository_name, kind_filter=None):
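"""
Returns the repository with the given name under the namespace (optionally restricted by
kind_filter), or None if no such repository exists.
"""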
try:
return _basequery.get_existing_repository(
namespace_name, repository_name, kind_filter=kind_filter
)
except Repository.DoesNotExist:
return None
def get_or_create_repository(
namespace, name, creating_user, visibility="private", repo_kind="image"
):
repo = get_repository(namespace, name, repo_kind)
if repo is None:
repo = create_repository(namespace, name, creating_user, visibility, repo_kind)
return repo
@ttl_cache(maxsize=1, ttl=600)
def _get_gc_expiration_policies():
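"""
Returns up to 100 distinct removed-tag expiration policies (in seconds) configured across
namespaces. Results are cached for ten minutes.
"""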
policy_tuples_query = (
Namespace.select(Namespace.removed_tag_expiration_s)
.distinct()
.limit(100) # This sucks but it's the only way to limit memory
.tuples()
)
return [policy[0] for policy in policy_tuples_query]
def get_random_gc_policy():
"""
Return a single random policy from the database to use when garbage collecting, or None if
none are available.
"""
policies = _get_gc_expiration_policies()
if not policies:
return None
return random.choice(policies)
def star_repository(user, repository):
"""
Stars a repository.
"""
star = Star.create(user=user.id, repository=repository.id)
star.save()
def unstar_repository(user, repository):
"""
Unstars a repository.
"""
deleted = (
Star.delete().where(Star.repository == repository.id, Star.user == user.id).execute()
)
# delete().execute() returns the number of rows removed; it never raises DoesNotExist.
if not deleted:
raise DataModelException("Star not found.")
def set_trust(repo, trust_enabled):
repo.trust_enabled = trust_enabled
repo.save()
def set_description(repo, description):
repo.description = description
repo.save()
def get_user_starred_repositories(user, kind_filter="image"):
"""
Retrieves all of the repositories a user has starred.
"""
try:
repo_kind = Repository.kind.get_id(kind_filter)
except RepositoryKind.DoesNotExist:
raise DataModelException("Unknown kind of repository")
query = (
Repository.select(Repository, User, Visibility, Repository.id.alias("rid"))
.join(Star)
.switch(Repository)
.join(User)
.switch(Repository)
.join(Visibility)
.where(Star.user == user, Repository.kind == repo_kind)
.where(Repository.state != RepositoryState.MARKED_FOR_DELETION)
)
return query
def repository_is_starred(user, repository):
"""
Determines whether a user has starred a repository or not.
"""
try:
(Star.select().where(Star.repository == repository.id, Star.user == user.id).get())
return True
except Star.DoesNotExist:
return False
def get_stars(repository_ids):
"""
Returns a map from repository ID to the number of stars for each repository in the given
repository IDs list.
"""
if not repository_ids:
return {}
tuples = (
Star.select(Star.repository, fn.Count(Star.id))
.where(Star.repository << repository_ids)
.group_by(Star.repository)
.tuples()
)
star_map = {}
for record in tuples:
star_map[record[0]] = record[1]
return star_map
def get_visible_repositories(
username,
namespace=None,
kind_filter="image",
include_public=False,
start_id=None,
limit=None,
is_superuser=False,
return_all=False,
):
"""
Returns the repositories visible to the given user (if any).
"""
if not include_public and not username:
# Short circuit by returning a query that will find no repositories. We need to return a
# query here, as callers modify and further filter it before execution.
return Repository.select(Repository.id.alias("rid")).where(Repository.id == -1)
query = (
Repository.select(
Repository.name,
Repository.id.alias("rid"),
Repository.description,
Namespace.username,
Repository.visibility,
Repository.kind,
Repository.state,
)
.switch(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(Repository.state != RepositoryState.MARKED_FOR_DELETION)
)
user_id = None
if username:
# Note: We only need the permissions table if we will filter based on a user's permissions.
query = query.switch(Repository).distinct().join(RepositoryPermission, JOIN.LEFT_OUTER)
found_namespace = _get_namespace_user(username)
if not found_namespace:
return Repository.select(Repository.id.alias("rid")).where(Repository.id == -1)
user_id = found_namespace.id
query = _basequery.filter_to_repos_for_user(
query,
user_id,
namespace,
kind_filter,
include_public,
start_id=start_id,
is_superuser=is_superuser,
return_all=return_all,
)
if limit is not None:
query = query.limit(limit).order_by(SQL("rid"))
return query
def get_app_repository(namespace_name, repository_name):
"""
Find an application repository.
"""
try:
return _basequery.get_existing_repository(
namespace_name, repository_name, kind_filter="application"
)
except Repository.DoesNotExist:
return None
def get_app_search(lookup, search_fields=None, username=None, limit=50):
if search_fields is None:
search_fields = set([SEARCH_FIELDS.name.name])
return get_filtered_matching_repositories(
lookup,
filter_username=username,
search_fields=search_fields,
repo_kind="application",
offset=0,
limit=limit,
)
def _get_namespace_user(username):
try:
return User.get(username=username)
except User.DoesNotExist:
return None
def get_filtered_matching_repositories(
lookup_value, filter_username=None, repo_kind="image", offset=0, limit=25, search_fields=None
):
"""
Returns an iterator of all repositories matching the given lookup value, with optional filtering
to a specific user.
If the user is unspecified, only public repositories will be returned.
"""
if search_fields is None:
search_fields = set([SEARCH_FIELDS.description.name, SEARCH_FIELDS.name.name])
# Build the unfiltered search query.
unfiltered_query = _get_sorted_matching_repositories(
lookup_value,
repo_kind=repo_kind,
search_fields=search_fields,
include_private=filter_username is not None,
ids_only=filter_username is not None,
)
# Add a filter to the iterator, if necessary.
if filter_username is not None:
filter_user = _get_namespace_user(filter_username)
if filter_user is None:
return []
# NOTE: We add the offset to the limit here to ensure we have enough results
# for the take() calls we perform below.
iterator = _filter_repositories_visible_to_user(
unfiltered_query, filter_user.id, offset + limit, repo_kind
)
if offset > 0:
take(offset, iterator)
# Return the results.
return list(take(limit, iterator))
return list(unfiltered_query.offset(offset).limit(limit))
def _filter_repositories_visible_to_user(unfiltered_query, filter_user_id, limit, repo_kind):
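"""
Generator that yields the repositories from unfiltered_query which are visible to the given
user, preserving the original ordering. Rows are fetched in chunks of twice the requested
limit to compensate for repositories removed by the visibility filter, and iteration stops
once no new rows are found (or after ten chunks, as a safety bound).
"""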
encountered = set()
chunk_count = limit * 2
unfiltered_page = 0
iteration_count = 0
while iteration_count < 10: # Just to be safe
# Find the next chunk's worth of repository IDs, paginated by the chunk size.
unfiltered_page = unfiltered_page + 1
found_ids = [r.id for r in unfiltered_query.paginate(unfiltered_page, chunk_count)]
# Make sure we haven't encountered these results before. This handles the case where
# we've previously seen a result, as pagination is not necessarily stable in SQL
# databases.
unfiltered_repository_ids = set(found_ids)
new_unfiltered_ids = unfiltered_repository_ids - encountered
if not new_unfiltered_ids:
break
encountered.update(new_unfiltered_ids)
# Filter the repositories found to only those visible to the current user.
query = (
Repository.select(Repository, Namespace)
.distinct()
.join(Namespace, on=(Namespace.id == Repository.namespace_user))
.switch(Repository)
.join(RepositoryPermission)
.where(Repository.id << list(new_unfiltered_ids))
)
filtered = _basequery.filter_to_repos_for_user(query, filter_user_id, repo_kind=repo_kind)
# Sort the filtered repositories by their initial order.
all_filtered_repos = list(filtered)
all_filtered_repos.sort(key=lambda repo: found_ids.index(repo.id))
# Yield the repositories in sorted order.
for filtered_repo in all_filtered_repos:
yield filtered_repo
# If the number of found IDs is less than the chunk count, then we're done.
if len(found_ids) < chunk_count:
break
iteration_count = iteration_count + 1
def _get_sorted_matching_repositories(
lookup_value, repo_kind="image", include_private=False, search_fields=None, ids_only=False
):
"""
Returns a query of repositories matching the given lookup string, with optional inclusion of
private repositories.
Note that this method does *not* filter results based on visibility to users.
"""
select_fields = [Repository.id] if ids_only else [Repository, Namespace]
# Used to make sure that the join to the Namespace table only occurs once
is_namespace_table_joined = False
if not lookup_value:
# This is a generic listing of repositories. Simply return the sorted repositories based
# on RepositorySearchScore.
query = (
Repository.select(*select_fields)
.join(RepositorySearchScore)
.where(Repository.state != RepositoryState.MARKED_FOR_DELETION)
.order_by(RepositorySearchScore.score.desc(), RepositorySearchScore.id)
)
else:
if search_fields is None:
search_fields = set([SEARCH_FIELDS.description.name, SEARCH_FIELDS.name.name])
# Always search at least on the repository name (initial clause).
repo_name_value, user_name_value = lookup_value, None
if "/" in lookup_value:
user_name_value, repo_name_value = lookup_value.split("/", 1)
clause = Repository.name.match(repo_name_value)
computed_score = RepositorySearchScore.score.alias("score")
# If the description field is included in the search fields, compute a synthetic score
# that weights name matches more heavily than description matches.
if SEARCH_FIELDS.description.name in search_fields:
clause = Repository.description.match(repo_name_value) | clause
cases = [
(Repository.name.match(repo_name_value), 100 * RepositorySearchScore.score),
]
computed_score = Case(None, cases, RepositorySearchScore.score).alias("score")
select_fields.append(computed_score)
query = (
Repository.select(*select_fields, can_use_read_replica=True)
.join(RepositorySearchScore)
.where(clause)
.where(Repository.state != RepositoryState.MARKED_FOR_DELETION)
.order_by(SQL("score").desc(), RepositorySearchScore.id)
)
# If an organization/user was found and it was not blank.
if user_name_value is not None and user_name_value != "":
query = (
query.switch(Repository)
.join(Namespace)
.where(Namespace.username == user_name_value)
)
is_namespace_table_joined = True
if repo_kind is not None:
query = query.where(Repository.kind == Repository.kind.get_id(repo_kind))
if not include_private:
query = query.where(Repository.visibility == _basequery.get_public_repo_visibility())
if not ids_only and not is_namespace_table_joined:
query = query.switch(Repository).join(Namespace)
return query
def lookup_repository(repo_id):
try:
return Repository.get(Repository.id == repo_id)
except Repository.DoesNotExist:
return None
def repository_visibility_name(repository):
return "public" if is_repository_public(repository) else "private"
def is_repository_public(repository):
return repository.visibility_id == _basequery.get_public_repo_visibility().id
def repository_is_public(namespace_name, repository_name):
try:
(
Repository.select()
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.switch(Repository)
.join(Visibility)
.where(
Namespace.username == namespace_name,
Repository.name == repository_name,
Visibility.name == "public",
)
.where(Repository.state != RepositoryState.MARKED_FOR_DELETION)
.get()
)
return True
except Repository.DoesNotExist:
return False
def set_repository_visibility(repo, visibility):
visibility_obj = Visibility.get(name=visibility)
if not visibility_obj:
return
repo.visibility = visibility_obj
repo.save()
def get_email_authorized_for_repo(namespace, repository, email):
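"""
Returns the RepositoryAuthorizedEmail entry for the given namespace, repository and email
address, or None if no such authorization exists.
"""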
try:
return (
RepositoryAuthorizedEmail.select(RepositoryAuthorizedEmail, Repository, Namespace)
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(
Namespace.username == namespace,
Repository.name == repository,
RepositoryAuthorizedEmail.email == email,
)
.where(Repository.state != RepositoryState.MARKED_FOR_DELETION)
.get()
)
except RepositoryAuthorizedEmail.DoesNotExist:
return None
def create_email_authorization_for_repo(namespace_name, repository_name, email):
try:
repo = _basequery.get_existing_repository(namespace_name, repository_name)
except Repository.DoesNotExist:
raise DataModelException("Invalid repository %s/%s" % (namespace_name, repository_name))
return RepositoryAuthorizedEmail.create(repository=repo, email=email, confirmed=False)
def confirm_email_authorization_for_repo(code):
try:
found = (
RepositoryAuthorizedEmail.select(RepositoryAuthorizedEmail, Repository, Namespace)
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(RepositoryAuthorizedEmail.code == code)
.where(Repository.state != RepositoryState.MARKED_FOR_DELETION)
.get()
)
except RepositoryAuthorizedEmail.DoesNotExist:
raise DataModelException("Invalid confirmation code.")
found.confirmed = True
found.save()
return found
def get_repository_state(namespace_name, repository_name):
"""
Return the Repository State if the Repository exists.
Otherwise, returns None.
"""
repo = get_repository(namespace_name, repository_name)
if repo:
return repo.state
return None
def set_repository_state(repo, state):
repo.state = state
repo.save()
def mark_repository_for_deletion(
namespace_name, repository_name, repository_gc_queue, available_after=0, deleted_suffix=None
):
"""
Marks a repository for future deletion in the background.
The repository will be renamed and hidden, and then deleted later by a worker.
"""
repo = get_repository(namespace_name, repository_name)
if not repo:
return None
with db_transaction():
# Delete any stars for the repository.
Star.delete().where(Star.repository == repo).execute()
# Change the name and state of the repository.
deleted_name = str(uuid.uuid4())
if deleted_suffix is not None:
deleted_name = "_".join([deleted_name, str(deleted_suffix)])
repo.name = deleted_name
repo.state = RepositoryState.MARKED_FOR_DELETION
repo.save()
# Create a tracking row for the deleted repository.
marker = DeletedRepository.create(repository=repo, original_name=repository_name)
# Add a queueitem to delete the repository.
marker.queue_id = repository_gc_queue.put(
[namespace_name, str(repo.id)],
json.dumps(
{
"marker_id": marker.id,
"original_name": repository_name,
}
),
available_after=available_after,
)
marker.save()
# Run callbacks for the deleted repo.
for callback in config.repo_cleanup_callbacks:
callback(namespace_name, repository_name)
return marker.id
def get_repository_size(repo_id: int):
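"""
Returns the total size of the repository in bytes as tracked by QuotaRepositorySize, or 0
if no size row exists for the repository.
"""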
try:
size = (
QuotaRepositorySize.select(QuotaRepositorySize.size_bytes)
.where(QuotaRepositorySize.repository == repo_id)
.scalar()
)
return size if size is not None else 0
except QuotaRepositorySize.DoesNotExist:
return 0
def get_repository_sizes(repo_ids: list):
"""
Returns a map from repository ID to the size of the repository in bytes.
List of repo_ids should be kept below 1000 to avoid performance issues.
"""
if not repo_ids:
return {}
if len(repo_ids) > 1000:
logger.warning(
"Fetching more than 1000 repository sizes at once, you may experience performance issues."
)
tuples = (
QuotaRepositorySize.select(
QuotaRepositorySize.repository,
QuotaRepositorySize.size_bytes,
can_use_read_replica=True,
)
.where(QuotaRepositorySize.repository << repo_ids)
.tuples()
)
size_map = {}
for record in tuples:
size_map[record[0]] = record[1] if record[1] is not None else 0
# Default to 0 for any repositories that don't have a size.
for repo_id in repo_ids:
if repo_id not in size_map:
size_map[repo_id] = 0
return size_map
def get_size_during_upload(repo_id: int):
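"""
Returns the total number of bytes currently held in in-progress blob uploads for the
repository, or 0 if there are none.
"""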
query = (
BlobUpload.select(fn.Sum(BlobUpload.byte_count).alias("size_bytes")).where(
BlobUpload.repository_id == repo_id
)
).get()
return query.size_bytes if query.size_bytes is not None else 0