mirror of
https://github.com/quay/quay.git
synced 2026-01-27 18:42:52 +03:00
Add database models, migration, and CRUD functions for namespace and repository immutability policies. Policies define regex patterns that automatically mark matching tags as immutable when created. Signed-off-by: Brady Pratt <bpratt@redhat.com> Co-authored-by: Claude <noreply@anthropic.com>
357 lines
13 KiB
Python
357 lines
13 KiB
Python
import re
|
|
from functools import lru_cache
|
|
from re import Pattern
|
|
from typing import Optional, TypedDict
|
|
|
|
from data.database import (
|
|
NamespaceImmutabilityPolicy as NamespaceImmutabilityPolicyTable,
|
|
)
|
|
from data.database import Repository
|
|
from data.database import (
|
|
RepositoryImmutabilityPolicy as RepositoryImmutabilityPolicyTable,
|
|
)
|
|
from data.database import User
|
|
from data.model import (
|
|
DuplicateImmutabilityPolicy,
|
|
ImmutabilityPolicyDoesNotExist,
|
|
InvalidImmutabilityPolicy,
|
|
InvalidRepositoryException,
|
|
db_transaction,
|
|
repository,
|
|
)
|
|
from data.model.user import get_active_namespace_user_by_username
|
|
|
|
|
|
class PolicyConfig(TypedDict, total=False):
    """Shape of the policy payload stored on an immutability policy row.

    Both keys are optional at the type level (``total=False``); whether a
    given payload is acceptable is decided by ``_validate_policy``.
    """

    # Regular expression applied to tag names.
    tag_pattern: str
    # True: tags matching the pattern are immutable; False: tags NOT matching
    # are immutable. Readers in this module default a missing key to True.
    tag_pattern_matches: bool
|
|
|
|
|
|
class NamespaceImmutabilityPolicy:
    """In-memory wrapper around one namespace immutability policy row.

    Copies the row's ``uuid`` and the two settings from its ``policy`` mapping
    onto plain attributes, and keeps the row itself on ``_db_row``.
    """

    def __init__(self, db_row: NamespaceImmutabilityPolicyTable) -> None:
        """Read ``uuid`` and ``policy`` from ``db_row`` into attributes."""
        stored = db_row.policy
        self._db_row = db_row
        self.uuid: str = db_row.uuid
        # The pattern may be absent from a stored payload, hence Optional.
        self.tag_pattern: Optional[str] = stored.get("tag_pattern")
        # A payload without the flag is treated as "immutable when it matches".
        self.tag_pattern_matches: bool = stored.get("tag_pattern_matches", True)

    def get_view(self) -> dict[str, str | bool | None]:
        """Return the policy as a dict with camelCase keys."""
        view: dict[str, str | bool | None] = {}
        view["uuid"] = self.uuid
        view["tagPattern"] = self.tag_pattern
        view["tagPatternMatches"] = self.tag_pattern_matches
        return view
|
|
|
|
|
|
class RepositoryImmutabilityPolicy:
    """In-memory wrapper around one repository immutability policy row.

    Copies the row's ``uuid`` and the two settings from its ``policy`` mapping
    onto plain attributes, and keeps the row itself on ``_db_row``.
    """

    def __init__(self, db_row: RepositoryImmutabilityPolicyTable) -> None:
        """Read ``uuid`` and ``policy`` from ``db_row`` into attributes."""
        stored = db_row.policy
        self._db_row = db_row
        self.uuid: str = db_row.uuid
        # The pattern may be absent from a stored payload, hence Optional.
        self.tag_pattern: Optional[str] = stored.get("tag_pattern")
        # A payload without the flag is treated as "immutable when it matches".
        self.tag_pattern_matches: bool = stored.get("tag_pattern_matches", True)

    def get_view(self) -> dict[str, str | bool | None]:
        """Return the policy as a dict with camelCase keys."""
        view: dict[str, str | bool | None] = {}
        view["uuid"] = self.uuid
        view["tagPattern"] = self.tag_pattern
        view["tagPatternMatches"] = self.tag_pattern_matches
        return view
|
|
|
|
|
|
def _validate_policy(policy_config: PolicyConfig) -> None:
|
|
"""Validate policy config. Raises InvalidImmutabilityPolicy on error."""
|
|
tag_pattern = policy_config.get("tag_pattern")
|
|
|
|
if not tag_pattern or not isinstance(tag_pattern, str):
|
|
raise InvalidImmutabilityPolicy("tag_pattern is required and must be a non-empty string")
|
|
|
|
if len(tag_pattern) > 256:
|
|
raise InvalidImmutabilityPolicy("tag_pattern must be 256 characters or less")
|
|
|
|
try:
|
|
re.compile(tag_pattern)
|
|
except re.error as e:
|
|
raise InvalidImmutabilityPolicy(f"Invalid regex pattern: {e}")
|
|
|
|
tag_pattern_matches = policy_config.get("tag_pattern_matches")
|
|
if tag_pattern_matches is not None and not isinstance(tag_pattern_matches, bool):
|
|
raise InvalidImmutabilityPolicy("tag_pattern_matches must be a boolean")
|
|
|
|
|
|
@lru_cache(maxsize=256)
|
|
def _compile_pattern(pattern: str) -> Pattern[str]:
|
|
return re.compile(pattern)
|
|
|
|
|
|
def _matches_policy(tag_name: str, tag_pattern: str, tag_pattern_matches: bool) -> bool:
|
|
"""Check if tag should be immutable based on pattern."""
|
|
try:
|
|
matches = bool(_compile_pattern(tag_pattern).match(tag_name))
|
|
except re.error:
|
|
return False
|
|
return matches if tag_pattern_matches else not matches
|
|
|
|
|
|
def _is_duplicate_namespace_policy(
    namespace_id: int, policy_config: PolicyConfig, exclude_uuid: Optional[str] = None
) -> bool:
    """Report whether the namespace already has an equivalent policy.

    Two policies are equivalent when both ``tag_pattern`` and
    ``tag_pattern_matches`` (defaulting to True when absent) are equal.

    Args:
        namespace_id: Id of the namespace whose policies are scanned.
        policy_config: Candidate policy to compare against stored ones.
        exclude_uuid: Uuid of a policy to ignore, used when updating a policy
            so it is not compared with itself.

    Returns:
        True if an equivalent policy exists, otherwise False.
    """
    wanted = (
        policy_config.get("tag_pattern"),
        policy_config.get("tag_pattern_matches", True),
    )

    def _is_same(stored) -> bool:
        # Compare pattern and direction together; the default mirrors `wanted`.
        return (stored.get("tag_pattern"), stored.get("tag_pattern_matches", True)) == wanted

    candidates = NamespaceImmutabilityPolicyTable.select().where(
        NamespaceImmutabilityPolicyTable.namespace == namespace_id
    )
    return any(
        _is_same(row.policy)
        for row in candidates
        if not (exclude_uuid and row.uuid == exclude_uuid)
    )
|
|
|
|
|
|
def _is_duplicate_repository_policy(
    repo_id: int, policy_config: PolicyConfig, exclude_uuid: Optional[str] = None
) -> bool:
    """Report whether the repository already has an equivalent policy.

    Two policies are equivalent when both ``tag_pattern`` and
    ``tag_pattern_matches`` (defaulting to True when absent) are equal.

    Args:
        repo_id: Id of the repository whose policies are scanned.
        policy_config: Candidate policy to compare against stored ones.
        exclude_uuid: Uuid of a policy to ignore, used when updating a policy
            so it is not compared with itself.

    Returns:
        True if an equivalent policy exists, otherwise False.
    """
    wanted = (
        policy_config.get("tag_pattern"),
        policy_config.get("tag_pattern_matches", True),
    )

    def _is_same(stored) -> bool:
        # Compare pattern and direction together; the default mirrors `wanted`.
        return (stored.get("tag_pattern"), stored.get("tag_pattern_matches", True)) == wanted

    candidates = RepositoryImmutabilityPolicyTable.select().where(
        RepositoryImmutabilityPolicyTable.repository == repo_id
    )
    return any(
        _is_same(row.policy)
        for row in candidates
        if not (exclude_uuid and row.uuid == exclude_uuid)
    )
|
|
|
|
|
|
# Namespace policy CRUD
|
|
|
|
|
|
def get_namespace_immutability_policies(orgname: str) -> list[NamespaceImmutabilityPolicy]:
    """Return every immutability policy of the namespace named ``orgname``.

    Policy rows are joined to ``User`` and filtered on ``User.username``; each
    row is wrapped in a ``NamespaceImmutabilityPolicy``. The list is empty when
    nothing matches.
    """
    rows = (
        NamespaceImmutabilityPolicyTable.select()
        .join(User)
        .where(User.username == orgname)
    )
    return list(map(NamespaceImmutabilityPolicy, rows))
|
|
|
|
|
|
def get_namespace_immutability_policy(
    orgname: str, uuid: str
) -> Optional[NamespaceImmutabilityPolicy]:
    """Look up one namespace policy by uuid.

    Args:
        orgname: Username of the namespace the policy must belong to.
        uuid: Uuid of the policy.

    Returns:
        The wrapped policy, or None when no row has that uuid under
        ``orgname``.
    """
    lookup = (
        NamespaceImmutabilityPolicyTable.select()
        .join(User)
        .where(NamespaceImmutabilityPolicyTable.uuid == uuid, User.username == orgname)
    )
    try:
        found = lookup.get()
    except NamespaceImmutabilityPolicyTable.DoesNotExist:
        # A missing policy is reported as None rather than as an error.
        return None
    else:
        return NamespaceImmutabilityPolicy(found)
|
|
|
|
|
|
def create_namespace_immutability_policy(
    orgname: str, policy_config: PolicyConfig
) -> NamespaceImmutabilityPolicyTable:
    """Create an immutability policy for the namespace ``orgname``.

    The config is validated first; the duplicate check and the insert then run
    inside a single ``db_transaction``.

    Args:
        orgname: Username of the namespace that will own the policy.
        policy_config: Policy payload to store.

    Returns:
        The newly created policy row.

    Raises:
        InvalidImmutabilityPolicy: If ``policy_config`` fails validation.
        DuplicateImmutabilityPolicy: If the namespace already has a policy
            with the same ``tag_pattern`` and ``tag_pattern_matches``.
    """
    _validate_policy(policy_config)

    with db_transaction():
        owner = get_active_namespace_user_by_username(orgname)

        already_present = _is_duplicate_namespace_policy(owner.id, policy_config)
        if already_present:
            raise DuplicateImmutabilityPolicy(
                "A policy with the same tag_pattern and tag_pattern_matches already exists"
            )

        created = NamespaceImmutabilityPolicyTable.create(
            namespace=owner.id, policy=policy_config
        )

    return created
|
|
|
|
|
|
def update_namespace_immutability_policy(
    orgname: str, uuid: str, policy_config: PolicyConfig
) -> bool:
    """Replace the payload of an existing namespace policy.

    The config is validated first; the existence check, the duplicate check
    and the update then run inside a single ``db_transaction``.

    Args:
        orgname: Username of the namespace that owns the policy.
        uuid: Uuid of the policy to update.
        policy_config: New policy payload.

    Returns:
        Always True; every failure is reported by raising.

    Raises:
        InvalidImmutabilityPolicy: If ``policy_config`` fails validation.
        ImmutabilityPolicyDoesNotExist: If ``uuid`` is not a policy of
            ``orgname``.
        DuplicateImmutabilityPolicy: If another policy of the namespace
            already has the same ``tag_pattern`` and ``tag_pattern_matches``.
    """
    _validate_policy(policy_config)

    with db_transaction():
        owner = get_active_namespace_user_by_username(orgname)

        current = get_namespace_immutability_policy(orgname, uuid)
        if current is None:
            raise ImmutabilityPolicyDoesNotExist(f"Policy {uuid} not found")

        # The policy being edited is excluded so it is not its own duplicate.
        clashes = _is_duplicate_namespace_policy(owner.id, policy_config, exclude_uuid=uuid)
        if clashes:
            raise DuplicateImmutabilityPolicy(
                "A policy with the same tag_pattern and tag_pattern_matches already exists"
            )

        target = (
            NamespaceImmutabilityPolicyTable.uuid == uuid,
            NamespaceImmutabilityPolicyTable.namespace == owner.id,
        )
        NamespaceImmutabilityPolicyTable.update(policy=policy_config).where(*target).execute()

    return True
|
|
|
|
|
|
def delete_namespace_immutability_policy(orgname: str, uuid: str) -> bool:
    """Delete one namespace immutability policy.

    The existence check and the delete run inside a single ``db_transaction``.

    Args:
        orgname: Username of the namespace that owns the policy.
        uuid: Uuid of the policy to delete.

    Returns:
        Always True; a missing policy is reported by raising.

    Raises:
        ImmutabilityPolicyDoesNotExist: If ``uuid`` is not a policy of
            ``orgname``.
    """
    with db_transaction():
        owner = get_active_namespace_user_by_username(orgname)

        current = get_namespace_immutability_policy(orgname, uuid)
        if current is None:
            raise ImmutabilityPolicyDoesNotExist(f"Policy {uuid} not found")

        target = (
            NamespaceImmutabilityPolicyTable.uuid == uuid,
            NamespaceImmutabilityPolicyTable.namespace == owner.id,
        )
        NamespaceImmutabilityPolicyTable.delete().where(*target).execute()

    return True
|
|
|
|
|
|
# Repository policy CRUD
|
|
|
|
|
|
def get_repository_immutability_policies(
    orgname: str, repo_name: str
) -> list[RepositoryImmutabilityPolicy]:
    """Return every immutability policy of one repository.

    Policy rows are joined through ``Repository`` to ``User`` and filtered on
    the namespace username and the repository name; each row is wrapped in a
    ``RepositoryImmutabilityPolicy``. The list is empty when nothing matches.
    """
    rows = (
        RepositoryImmutabilityPolicyTable.select()
        .join(Repository)
        .join(User)
        .where(User.username == orgname, Repository.name == repo_name)
    )
    return list(map(RepositoryImmutabilityPolicy, rows))
|
|
|
|
|
|
def get_repository_immutability_policy(
    orgname: str, repo_name: str, uuid: str
) -> Optional[RepositoryImmutabilityPolicy]:
    """Look up one repository policy by uuid.

    Args:
        orgname: Username of the namespace that owns the repository.
        repo_name: Name of the repository the policy must belong to.
        uuid: Uuid of the policy.

    Returns:
        The wrapped policy, or None when no row has that uuid under
        ``orgname``/``repo_name``.
    """
    lookup = (
        RepositoryImmutabilityPolicyTable.select()
        .join(Repository)
        .join(User)
        .where(
            User.username == orgname,
            Repository.name == repo_name,
            RepositoryImmutabilityPolicyTable.uuid == uuid,
        )
    )
    try:
        found = lookup.get()
    except RepositoryImmutabilityPolicyTable.DoesNotExist:
        # A missing policy is reported as None rather than as an error.
        return None
    else:
        return RepositoryImmutabilityPolicy(found)
|
|
|
|
|
|
def create_repository_immutability_policy(
    orgname: str, repo_name: str, policy_config: PolicyConfig
) -> RepositoryImmutabilityPolicyTable:
    """Create an immutability policy for the repository ``orgname/repo_name``.

    The config is validated first; the repository lookup, the duplicate check
    and the insert then run inside a single ``db_transaction``.

    Args:
        orgname: Username of the namespace that owns the repository.
        repo_name: Name of the repository.
        policy_config: Policy payload to store.

    Returns:
        The newly created policy row.

    Raises:
        InvalidImmutabilityPolicy: If ``policy_config`` fails validation.
        InvalidRepositoryException: If the repository lookup returns None.
        DuplicateImmutabilityPolicy: If the repository already has a policy
            with the same ``tag_pattern`` and ``tag_pattern_matches``.
    """
    _validate_policy(policy_config)

    with db_transaction():
        owner = get_active_namespace_user_by_username(orgname)
        target_repo = repository.get_repository(orgname, repo_name)

        if target_repo is None:
            raise InvalidRepositoryException(f"Repository does not exist: {repo_name}")

        already_present = _is_duplicate_repository_policy(target_repo.id, policy_config)
        if already_present:
            raise DuplicateImmutabilityPolicy(
                "A policy with the same tag_pattern and tag_pattern_matches already exists"
            )

        created = RepositoryImmutabilityPolicyTable.create(
            namespace=owner.id, repository=target_repo.id, policy=policy_config
        )

    return created
|
|
|
|
|
|
def update_repository_immutability_policy(
    orgname: str, repo_name: str, uuid: str, policy_config: PolicyConfig
) -> bool:
    """Replace the payload of an existing repository policy.

    The config is validated first; the repository lookup, the existence check,
    the duplicate check and the update then run inside a single
    ``db_transaction``.

    Args:
        orgname: Username of the namespace that owns the repository.
        repo_name: Name of the repository.
        uuid: Uuid of the policy to update.
        policy_config: New policy payload.

    Returns:
        Always True; every failure is reported by raising.

    Raises:
        InvalidImmutabilityPolicy: If ``policy_config`` fails validation.
        InvalidRepositoryException: If the repository lookup returns None.
        ImmutabilityPolicyDoesNotExist: If ``uuid`` is not a policy of the
            repository.
        DuplicateImmutabilityPolicy: If another policy of the repository
            already has the same ``tag_pattern`` and ``tag_pattern_matches``.
    """
    _validate_policy(policy_config)
    with db_transaction():
        namespace = get_active_namespace_user_by_username(orgname)
        repo = repository.get_repository(orgname, repo_name)

        if repo is None:
            raise InvalidRepositoryException(f"Repository does not exist: {repo_name}")

        if not get_repository_immutability_policy(orgname, repo_name, uuid):
            raise ImmutabilityPolicyDoesNotExist(f"Policy {uuid} not found")

        # The policy being edited is excluded so it is not its own duplicate.
        if _is_duplicate_repository_policy(repo.id, policy_config, exclude_uuid=uuid):
            raise DuplicateImmutabilityPolicy(
                "A policy with the same tag_pattern and tag_pattern_matches already exists"
            )

        # Scope the write to the repository as well as the namespace, so the
        # statement itself can only touch a policy of the repository that was
        # validated above rather than relying solely on the earlier lookup.
        RepositoryImmutabilityPolicyTable.update(policy=policy_config).where(
            RepositoryImmutabilityPolicyTable.uuid == uuid,
            RepositoryImmutabilityPolicyTable.namespace == namespace.id,
            RepositoryImmutabilityPolicyTable.repository == repo.id,
        ).execute()
        return True
|
|
|
|
|
|
def delete_repository_immutability_policy(orgname: str, repo_name: str, uuid: str) -> bool:
    """Delete one repository immutability policy.

    The repository lookup, the existence check and the delete run inside a
    single ``db_transaction``.

    Args:
        orgname: Username of the namespace that owns the repository.
        repo_name: Name of the repository.
        uuid: Uuid of the policy to delete.

    Returns:
        Always True; every failure is reported by raising.

    Raises:
        InvalidRepositoryException: If the repository lookup returns None.
        ImmutabilityPolicyDoesNotExist: If ``uuid`` is not a policy of the
            repository.
    """
    with db_transaction():
        namespace = get_active_namespace_user_by_username(orgname)
        repo = repository.get_repository(orgname, repo_name)

        if repo is None:
            raise InvalidRepositoryException(f"Repository does not exist: {repo_name}")

        if not get_repository_immutability_policy(orgname, repo_name, uuid):
            raise ImmutabilityPolicyDoesNotExist(f"Policy {uuid} not found")

        # Scope the delete to the repository as well as the namespace, so the
        # statement itself can only remove a policy of the repository that was
        # validated above rather than relying solely on the earlier lookup.
        RepositoryImmutabilityPolicyTable.delete().where(
            RepositoryImmutabilityPolicyTable.uuid == uuid,
            RepositoryImmutabilityPolicyTable.namespace == namespace.id,
            RepositoryImmutabilityPolicyTable.repository == repo.id,
        ).execute()
        return True
|
|
|
|
|
|
# Policy evaluation
|
|
|
|
|
|
def evaluate_immutability_policies(repository_id: int, namespace_id: int, tag_name: str) -> bool:
    """
    Decide whether a tag should be marked immutable.

    Repository policies are checked first, then namespace policies; the result
    is True as soon as any single policy applies to ``tag_name``.

    Two separate queries are issued and evaluation stops at the first match.
    This design is acceptable because:
    - Policy tables are expected to remain small (few policies per namespace/repo)
    - Function is typically called once per tag creation, not in a loop
    - Early return on first match minimizes query overhead in the common case
    """

    def _applies(row) -> bool:
        # A stored config without the flag means "immutable when it matches".
        settings = row.policy
        return _matches_policy(
            tag_name, settings.get("tag_pattern"), settings.get("tag_pattern_matches", True)
        )

    repo_rows = RepositoryImmutabilityPolicyTable.select().where(
        RepositoryImmutabilityPolicyTable.repository == repository_id
    )
    if any(map(_applies, repo_rows)):
        return True

    # Only reached when no repository policy applied.
    namespace_rows = NamespaceImmutabilityPolicyTable.select().where(
        NamespaceImmutabilityPolicyTable.namespace == namespace_id
    )
    return any(map(_applies, namespace_rows))
|