1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/endpoints/api/immutability_policy.py
jbpratt 73d2e2f444 feat(endpoints): add immutability policy API endpoints (PROJQUAY-10160) (#4934)
Add REST API for managing immutability policies at organization and
repository levels. Integrate policy evaluation into tag creation.

Signed-off-by: Brady Pratt <bpratt@redhat.com>
Co-authored-by: Claude <noreply@anthropic.com>
2026-01-22 15:30:09 +00:00

306 lines
11 KiB
Python

import logging
from typing import Any
from flask import request
import features
from auth import scopes
from auth.permissions import (
AdministerOrganizationPermission,
AdministerRepositoryPermission,
)
from data import model
from data.model import immutability
from data.registry_model import registry_model
from endpoints.api import (
ApiResource,
RepositoryParamResource,
allow_if_global_readonly_superuser,
allow_if_superuser,
allow_if_superuser_with_full_access,
log_action,
nickname,
path_param,
request_error,
require_repo_admin,
require_scope,
resource,
show_if,
validate_json_request,
)
from endpoints.exception import NotFound, Unauthorized
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# JSON-schema definitions for the request bodies accepted by this module.
# The "ImmutabilityPolicyConfig" key is the schema name handed to
# validate_json_request() on the create (POST) and update (PUT) handlers,
# and the whole mapping is exposed on each resource class as `schemas`.
IMMUTABILITY_POLICY_SCHEMA = {
    "ImmutabilityPolicyConfig": {
        "type": "object",
        "description": "The immutability policy configuration",
        # Only the pattern itself is mandatory in a request body.
        "required": ["tagPattern"],
        "properties": {
            "tagPattern": {
                "type": "string",
                "description": "Regex pattern to match tag names",
            },
            # Optional; _parse_policy_config() falls back to True when the
            # key is absent from the request body.
            "tagPatternMatches": {
                "type": "boolean",
                "description": "If true, matching tags are immutable. If false, non-matching tags are immutable.",
            },
        },
    },
}
def _parse_policy_config(app_data: dict[str, Any]) -> dict[str, Any]:
"""Parse and validate policy config from request."""
tag_pattern = app_data.get("tagPattern")
if tag_pattern and isinstance(tag_pattern, str):
tag_pattern = tag_pattern.strip()
tag_pattern_matches = app_data.get("tagPatternMatches", True)
return {"tag_pattern": tag_pattern, "tag_pattern_matches": tag_pattern_matches}
# Organization endpoints
@resource("/v1/organization/<orgname>/immutabilitypolicy/")
@path_param("orgname", "The name of the organization")
@show_if(features.IMMUTABLE_TAGS)
class OrgImmutabilityPolicies(ApiResource):
    """
    Collection resource for the immutability policies of an organization.
    """

    schemas = IMMUTABILITY_POLICY_SCHEMA

    @require_scope(scopes.ORG_ADMIN)
    @nickname("listOrgImmutabilityPolicies")
    def get(self, orgname):
        """
        List the immutability policies of the organization.
        """
        # Readable by org admins, global read-only superusers, and (when
        # SUPERUSERS_FULL_ACCESS is enabled) superusers.
        may_read = (
            AdministerOrganizationPermission(orgname).can()
            or allow_if_global_readonly_superuser()
            or (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser())
        )
        if not may_read:
            raise Unauthorized()

        views = []
        for policy in immutability.get_namespace_immutability_policies(orgname):
            views.append(policy.get_view())
        return {"policies": views}

    @require_scope(scopes.ORG_ADMIN)
    @validate_json_request("ImmutabilityPolicyConfig")
    @nickname("createOrgImmutabilityPolicy")
    def post(self, orgname):
        """
        Create an immutability policy for the organization.
        """
        may_write = (
            AdministerOrganizationPermission(orgname).can()
            or allow_if_superuser_with_full_access()
        )
        if not may_write:
            raise Unauthorized()

        policy_config = _parse_policy_config(request.get_json())
        try:
            created = immutability.create_namespace_immutability_policy(orgname, policy_config)
        except (model.InvalidImmutabilityPolicy, model.DuplicateImmutabilityPolicy) as ex:
            # NOTE(review): request_error presumably raises; otherwise `created`
            # would be unbound below -- confirm against endpoints.api.
            request_error(ex)

        audit_metadata = {"namespace": orgname, **policy_config}
        log_action("create_immutability_policy", orgname, audit_metadata)
        return {"uuid": created.uuid}, 201
@resource("/v1/organization/<orgname>/immutabilitypolicy/<policy_uuid>")
@path_param("orgname", "The name of the organization")
@path_param("policy_uuid", "The unique ID of the policy")
@show_if(features.IMMUTABLE_TAGS)
class OrgImmutabilityPolicy(ApiResource):
    """
    Resource for a single immutability policy of an organization.
    """

    schemas = IMMUTABILITY_POLICY_SCHEMA

    @require_scope(scopes.ORG_ADMIN)
    @nickname("getOrgImmutabilityPolicy")
    def get(self, orgname, policy_uuid):
        """
        Fetch one immutability policy of the organization by its UUID.
        """
        # Readable by org admins, global read-only superusers, and (when
        # SUPERUSERS_FULL_ACCESS is enabled) superusers.
        may_read = (
            AdministerOrganizationPermission(orgname).can()
            or allow_if_global_readonly_superuser()
            or (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser())
        )
        if not may_read:
            raise Unauthorized()

        found = immutability.get_namespace_immutability_policy(orgname, policy_uuid)
        if found is None:
            raise NotFound()
        return found.get_view()

    @require_scope(scopes.ORG_ADMIN)
    @validate_json_request("ImmutabilityPolicyConfig")
    @nickname("updateOrgImmutabilityPolicy")
    def put(self, orgname, policy_uuid):
        """
        Update an existing immutability policy of the organization.
        """
        may_write = (
            AdministerOrganizationPermission(orgname).can()
            or allow_if_superuser_with_full_access()
        )
        if not may_write:
            raise Unauthorized()

        policy_config = _parse_policy_config(request.get_json())
        try:
            immutability.update_namespace_immutability_policy(orgname, policy_uuid, policy_config)
        except (model.InvalidImmutabilityPolicy, model.DuplicateImmutabilityPolicy) as ex:
            request_error(ex)
        except model.ImmutabilityPolicyDoesNotExist:
            raise NotFound()

        audit_metadata = {"namespace": orgname, **policy_config}
        log_action("update_immutability_policy", orgname, audit_metadata)
        return {"uuid": policy_uuid}, 204

    @require_scope(scopes.ORG_ADMIN)
    @nickname("deleteOrgImmutabilityPolicy")
    def delete(self, orgname, policy_uuid):
        """
        Delete an immutability policy of the organization.
        """
        may_write = (
            AdministerOrganizationPermission(orgname).can()
            or allow_if_superuser_with_full_access()
        )
        if not may_write:
            raise Unauthorized()

        try:
            immutability.delete_namespace_immutability_policy(orgname, policy_uuid)
        except model.ImmutabilityPolicyDoesNotExist:
            raise NotFound()

        audit_metadata = {"namespace": orgname, "policy_uuid": policy_uuid}
        log_action("delete_immutability_policy", orgname, audit_metadata)
        return {"uuid": policy_uuid}, 200
# Repository endpoints
@resource("/v1/repository/<apirepopath:repository>/immutabilitypolicy/")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@show_if(features.IMMUTABLE_TAGS)
class RepositoryImmutabilityPolicies(RepositoryParamResource):
    """
    Collection resource for the immutability policies of a repository.
    """

    schemas = IMMUTABILITY_POLICY_SCHEMA

    @require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True)
    @nickname("listRepositoryImmutabilityPolicies")
    def get(self, namespace, repository):
        """
        List the immutability policies of the repository.
        """
        # Readable by repo admins, global read-only superusers, and (when
        # SUPERUSERS_FULL_ACCESS is enabled) superusers.
        may_read = (
            AdministerRepositoryPermission(namespace, repository).can()
            or allow_if_global_readonly_superuser()
            or (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser())
        )
        if not may_read:
            raise Unauthorized()

        repo_ref = registry_model.lookup_repository(namespace, repository)
        if repo_ref is None:
            raise NotFound()

        views = []
        for policy in immutability.get_repository_immutability_policies(namespace, repository):
            views.append(policy.get_view())
        return {"policies": views}

    @require_repo_admin(allow_for_superuser=True)
    @validate_json_request("ImmutabilityPolicyConfig")
    @nickname("createRepositoryImmutabilityPolicy")
    def post(self, namespace, repository):
        """
        Create an immutability policy for the repository.
        """
        may_write = (
            AdministerRepositoryPermission(namespace, repository).can()
            or allow_if_superuser_with_full_access()
        )
        if not may_write:
            raise Unauthorized()

        repo_ref = registry_model.lookup_repository(namespace, repository)
        if repo_ref is None:
            raise NotFound()

        policy_config = _parse_policy_config(request.get_json())
        try:
            created = immutability.create_repository_immutability_policy(
                namespace, repository, policy_config
            )
        except (model.InvalidImmutabilityPolicy, model.DuplicateImmutabilityPolicy) as ex:
            # NOTE(review): request_error presumably raises; otherwise `created`
            # would be unbound below -- confirm against endpoints.api.
            request_error(ex)
        except model.InvalidRepositoryException:
            raise NotFound()

        audit_metadata = {"namespace": namespace, "repo": repository, **policy_config}
        log_action(
            "create_immutability_policy",
            namespace,
            audit_metadata,
            repo_name=repository,
        )
        return {"uuid": created.uuid}, 201
@resource("/v1/repository/<apirepopath:repository>/immutabilitypolicy/<policy_uuid>")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("policy_uuid", "The unique ID of the policy")
@show_if(features.IMMUTABLE_TAGS)
class RepositoryImmutabilityPolicy(RepositoryParamResource):
    """
    Resource for a single immutability policy of a repository.
    """

    schemas = IMMUTABILITY_POLICY_SCHEMA

    @require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True)
    @nickname("getRepositoryImmutabilityPolicy")
    def get(self, namespace, repository, policy_uuid):
        """
        Fetch one immutability policy of the repository by its UUID.

        Raises Unauthorized when the caller may not read the repository's
        policies, and NotFound when the policy (or the repository) is missing.
        """
        # Readable by repo admins, global read-only superusers, and (when
        # SUPERUSERS_FULL_ACCESS is enabled) superusers.
        permission = AdministerRepositoryPermission(namespace, repository)
        if (
            not permission.can()
            and not allow_if_global_readonly_superuser()
            and not (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser())
        ):
            raise Unauthorized()

        # Map an invalid repository to 404, exactly as put() and delete() do,
        # so that a lookup against a missing repository cannot surface as an
        # unhandled error.
        try:
            policy = immutability.get_repository_immutability_policy(
                namespace, repository, policy_uuid
            )
        except model.InvalidRepositoryException:
            raise NotFound()

        if policy is None:
            raise NotFound()
        return policy.get_view()

    @require_repo_admin(allow_for_superuser=True)
    @validate_json_request("ImmutabilityPolicyConfig")
    @nickname("updateRepositoryImmutabilityPolicy")
    def put(self, namespace, repository, policy_uuid):
        """
        Update an existing immutability policy of the repository.

        Raises Unauthorized for callers without write access and NotFound when
        the policy or repository does not exist; invalid or duplicate policy
        configs are reported through request_error().
        """
        permission = AdministerRepositoryPermission(namespace, repository)
        if not permission.can() and not allow_if_superuser_with_full_access():
            raise Unauthorized()

        policy_config = _parse_policy_config(request.get_json())
        try:
            immutability.update_repository_immutability_policy(
                namespace, repository, policy_uuid, policy_config
            )
        except (model.InvalidImmutabilityPolicy, model.DuplicateImmutabilityPolicy) as ex:
            request_error(ex)
        except (model.ImmutabilityPolicyDoesNotExist, model.InvalidRepositoryException):
            raise NotFound()

        log_action(
            "update_immutability_policy",
            namespace,
            {"namespace": namespace, "repo": repository, **policy_config},
            repo_name=repository,
        )
        return {"uuid": policy_uuid}, 204

    @require_repo_admin(allow_for_superuser=True)
    @nickname("deleteRepositoryImmutabilityPolicy")
    def delete(self, namespace, repository, policy_uuid):
        """
        Delete an immutability policy of the repository.

        Raises Unauthorized for callers without write access and NotFound when
        the policy or repository does not exist.
        """
        permission = AdministerRepositoryPermission(namespace, repository)
        if not permission.can() and not allow_if_superuser_with_full_access():
            raise Unauthorized()

        try:
            immutability.delete_repository_immutability_policy(namespace, repository, policy_uuid)
        except (model.ImmutabilityPolicyDoesNotExist, model.InvalidRepositoryException):
            raise NotFound()

        log_action(
            "delete_immutability_policy",
            namespace,
            {"namespace": namespace, "repo": repository, "policy_uuid": policy_uuid},
            repo_name=repository,
        )
        return {"uuid": policy_uuid}, 200