mirror of
https://github.com/quay/quay.git
synced 2026-01-26 06:21:37 +03:00
Add REST API for managing immutability policies at organization and repository levels. Integrate policy evaluation into tag creation. Signed-off-by: Brady Pratt <bpratt@redhat.com> Co-authored-by: Claude <noreply@anthropic.com>
306 lines
11 KiB
Python
306 lines
11 KiB
Python
import logging
|
|
from typing import Any
|
|
|
|
from flask import request
|
|
|
|
import features
|
|
from auth import scopes
|
|
from auth.permissions import (
|
|
AdministerOrganizationPermission,
|
|
AdministerRepositoryPermission,
|
|
)
|
|
from data import model
|
|
from data.model import immutability
|
|
from data.registry_model import registry_model
|
|
from endpoints.api import (
|
|
ApiResource,
|
|
RepositoryParamResource,
|
|
allow_if_global_readonly_superuser,
|
|
allow_if_superuser,
|
|
allow_if_superuser_with_full_access,
|
|
log_action,
|
|
nickname,
|
|
path_param,
|
|
request_error,
|
|
require_repo_admin,
|
|
require_scope,
|
|
resource,
|
|
show_if,
|
|
validate_json_request,
|
|
)
|
|
from endpoints.exception import NotFound, Unauthorized
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
IMMUTABILITY_POLICY_SCHEMA = {
|
|
"ImmutabilityPolicyConfig": {
|
|
"type": "object",
|
|
"description": "The immutability policy configuration",
|
|
"required": ["tagPattern"],
|
|
"properties": {
|
|
"tagPattern": {
|
|
"type": "string",
|
|
"description": "Regex pattern to match tag names",
|
|
},
|
|
"tagPatternMatches": {
|
|
"type": "boolean",
|
|
"description": "If true, matching tags are immutable. If false, non-matching tags are immutable.",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
|
|
def _parse_policy_config(app_data: dict[str, Any]) -> dict[str, Any]:
|
|
"""Parse and validate policy config from request."""
|
|
tag_pattern = app_data.get("tagPattern")
|
|
if tag_pattern and isinstance(tag_pattern, str):
|
|
tag_pattern = tag_pattern.strip()
|
|
tag_pattern_matches = app_data.get("tagPatternMatches", True)
|
|
return {"tag_pattern": tag_pattern, "tag_pattern_matches": tag_pattern_matches}
|
|
|
|
|
|
# Organization endpoints
|
|
|
|
|
|
@resource("/v1/organization/<orgname>/immutabilitypolicy/")
|
|
@path_param("orgname", "The name of the organization")
|
|
@show_if(features.IMMUTABLE_TAGS)
|
|
class OrgImmutabilityPolicies(ApiResource):
|
|
schemas = IMMUTABILITY_POLICY_SCHEMA
|
|
|
|
@require_scope(scopes.ORG_ADMIN)
|
|
@nickname("listOrgImmutabilityPolicies")
|
|
def get(self, orgname):
|
|
permission = AdministerOrganizationPermission(orgname)
|
|
if (
|
|
not permission.can()
|
|
and not allow_if_global_readonly_superuser()
|
|
and not (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser())
|
|
):
|
|
raise Unauthorized()
|
|
|
|
policies = immutability.get_namespace_immutability_policies(orgname)
|
|
return {"policies": [p.get_view() for p in policies]}
|
|
|
|
@require_scope(scopes.ORG_ADMIN)
|
|
@validate_json_request("ImmutabilityPolicyConfig")
|
|
@nickname("createOrgImmutabilityPolicy")
|
|
def post(self, orgname):
|
|
permission = AdministerOrganizationPermission(orgname)
|
|
if not permission.can() and not allow_if_superuser_with_full_access():
|
|
raise Unauthorized()
|
|
|
|
policy_config = _parse_policy_config(request.get_json())
|
|
|
|
try:
|
|
policy = immutability.create_namespace_immutability_policy(orgname, policy_config)
|
|
except model.InvalidImmutabilityPolicy as ex:
|
|
request_error(ex)
|
|
except model.DuplicateImmutabilityPolicy as ex:
|
|
request_error(ex)
|
|
|
|
log_action("create_immutability_policy", orgname, {"namespace": orgname, **policy_config})
|
|
return {"uuid": policy.uuid}, 201
|
|
|
|
|
|
@resource("/v1/organization/<orgname>/immutabilitypolicy/<policy_uuid>")
|
|
@path_param("orgname", "The name of the organization")
|
|
@path_param("policy_uuid", "The unique ID of the policy")
|
|
@show_if(features.IMMUTABLE_TAGS)
|
|
class OrgImmutabilityPolicy(ApiResource):
|
|
schemas = IMMUTABILITY_POLICY_SCHEMA
|
|
|
|
@require_scope(scopes.ORG_ADMIN)
|
|
@nickname("getOrgImmutabilityPolicy")
|
|
def get(self, orgname, policy_uuid):
|
|
permission = AdministerOrganizationPermission(orgname)
|
|
if (
|
|
not permission.can()
|
|
and not allow_if_global_readonly_superuser()
|
|
and not (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser())
|
|
):
|
|
raise Unauthorized()
|
|
|
|
policy = immutability.get_namespace_immutability_policy(orgname, policy_uuid)
|
|
if policy is None:
|
|
raise NotFound()
|
|
return policy.get_view()
|
|
|
|
@require_scope(scopes.ORG_ADMIN)
|
|
@validate_json_request("ImmutabilityPolicyConfig")
|
|
@nickname("updateOrgImmutabilityPolicy")
|
|
def put(self, orgname, policy_uuid):
|
|
permission = AdministerOrganizationPermission(orgname)
|
|
if not permission.can() and not allow_if_superuser_with_full_access():
|
|
raise Unauthorized()
|
|
|
|
policy_config = _parse_policy_config(request.get_json())
|
|
|
|
try:
|
|
immutability.update_namespace_immutability_policy(orgname, policy_uuid, policy_config)
|
|
except model.InvalidImmutabilityPolicy as ex:
|
|
request_error(ex)
|
|
except model.DuplicateImmutabilityPolicy as ex:
|
|
request_error(ex)
|
|
except model.ImmutabilityPolicyDoesNotExist:
|
|
raise NotFound()
|
|
|
|
log_action("update_immutability_policy", orgname, {"namespace": orgname, **policy_config})
|
|
return {"uuid": policy_uuid}, 204
|
|
|
|
@require_scope(scopes.ORG_ADMIN)
|
|
@nickname("deleteOrgImmutabilityPolicy")
|
|
def delete(self, orgname, policy_uuid):
|
|
permission = AdministerOrganizationPermission(orgname)
|
|
if not permission.can() and not allow_if_superuser_with_full_access():
|
|
raise Unauthorized()
|
|
|
|
try:
|
|
immutability.delete_namespace_immutability_policy(orgname, policy_uuid)
|
|
except model.ImmutabilityPolicyDoesNotExist:
|
|
raise NotFound()
|
|
|
|
log_action(
|
|
"delete_immutability_policy",
|
|
orgname,
|
|
{"namespace": orgname, "policy_uuid": policy_uuid},
|
|
)
|
|
return {"uuid": policy_uuid}, 200
|
|
|
|
|
|
# Repository endpoints
|
|
|
|
|
|
@resource("/v1/repository/<apirepopath:repository>/immutabilitypolicy/")
|
|
@path_param("repository", "The full path of the repository. e.g. namespace/name")
|
|
@show_if(features.IMMUTABLE_TAGS)
|
|
class RepositoryImmutabilityPolicies(RepositoryParamResource):
|
|
schemas = IMMUTABILITY_POLICY_SCHEMA
|
|
|
|
@require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True)
|
|
@nickname("listRepositoryImmutabilityPolicies")
|
|
def get(self, namespace, repository):
|
|
permission = AdministerRepositoryPermission(namespace, repository)
|
|
if (
|
|
not permission.can()
|
|
and not allow_if_global_readonly_superuser()
|
|
and not (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser())
|
|
):
|
|
raise Unauthorized()
|
|
|
|
if registry_model.lookup_repository(namespace, repository) is None:
|
|
raise NotFound()
|
|
|
|
policies = immutability.get_repository_immutability_policies(namespace, repository)
|
|
return {"policies": [p.get_view() for p in policies]}
|
|
|
|
@require_repo_admin(allow_for_superuser=True)
|
|
@validate_json_request("ImmutabilityPolicyConfig")
|
|
@nickname("createRepositoryImmutabilityPolicy")
|
|
def post(self, namespace, repository):
|
|
permission = AdministerRepositoryPermission(namespace, repository)
|
|
if not permission.can() and not allow_if_superuser_with_full_access():
|
|
raise Unauthorized()
|
|
|
|
if registry_model.lookup_repository(namespace, repository) is None:
|
|
raise NotFound()
|
|
|
|
policy_config = _parse_policy_config(request.get_json())
|
|
|
|
try:
|
|
policy = immutability.create_repository_immutability_policy(
|
|
namespace, repository, policy_config
|
|
)
|
|
except model.InvalidImmutabilityPolicy as ex:
|
|
request_error(ex)
|
|
except model.DuplicateImmutabilityPolicy as ex:
|
|
request_error(ex)
|
|
except model.InvalidRepositoryException:
|
|
raise NotFound()
|
|
|
|
log_action(
|
|
"create_immutability_policy",
|
|
namespace,
|
|
{"namespace": namespace, "repo": repository, **policy_config},
|
|
repo_name=repository,
|
|
)
|
|
return {"uuid": policy.uuid}, 201
|
|
|
|
|
|
@resource("/v1/repository/<apirepopath:repository>/immutabilitypolicy/<policy_uuid>")
|
|
@path_param("repository", "The full path of the repository. e.g. namespace/name")
|
|
@path_param("policy_uuid", "The unique ID of the policy")
|
|
@show_if(features.IMMUTABLE_TAGS)
|
|
class RepositoryImmutabilityPolicy(RepositoryParamResource):
|
|
schemas = IMMUTABILITY_POLICY_SCHEMA
|
|
|
|
@require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True)
|
|
@nickname("getRepositoryImmutabilityPolicy")
|
|
def get(self, namespace, repository, policy_uuid):
|
|
permission = AdministerRepositoryPermission(namespace, repository)
|
|
if (
|
|
not permission.can()
|
|
and not allow_if_global_readonly_superuser()
|
|
and not (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser())
|
|
):
|
|
raise Unauthorized()
|
|
|
|
policy = immutability.get_repository_immutability_policy(namespace, repository, policy_uuid)
|
|
if policy is None:
|
|
raise NotFound()
|
|
return policy.get_view()
|
|
|
|
@require_repo_admin(allow_for_superuser=True)
|
|
@validate_json_request("ImmutabilityPolicyConfig")
|
|
@nickname("updateRepositoryImmutabilityPolicy")
|
|
def put(self, namespace, repository, policy_uuid):
|
|
permission = AdministerRepositoryPermission(namespace, repository)
|
|
if not permission.can() and not allow_if_superuser_with_full_access():
|
|
raise Unauthorized()
|
|
|
|
policy_config = _parse_policy_config(request.get_json())
|
|
|
|
try:
|
|
immutability.update_repository_immutability_policy(
|
|
namespace, repository, policy_uuid, policy_config
|
|
)
|
|
except model.InvalidImmutabilityPolicy as ex:
|
|
request_error(ex)
|
|
except model.DuplicateImmutabilityPolicy as ex:
|
|
request_error(ex)
|
|
except model.ImmutabilityPolicyDoesNotExist:
|
|
raise NotFound()
|
|
except model.InvalidRepositoryException:
|
|
raise NotFound()
|
|
|
|
log_action(
|
|
"update_immutability_policy",
|
|
namespace,
|
|
{"namespace": namespace, "repo": repository, **policy_config},
|
|
repo_name=repository,
|
|
)
|
|
return {"uuid": policy_uuid}, 204
|
|
|
|
@require_repo_admin(allow_for_superuser=True)
|
|
@nickname("deleteRepositoryImmutabilityPolicy")
|
|
def delete(self, namespace, repository, policy_uuid):
|
|
permission = AdministerRepositoryPermission(namespace, repository)
|
|
if not permission.can() and not allow_if_superuser_with_full_access():
|
|
raise Unauthorized()
|
|
|
|
try:
|
|
immutability.delete_repository_immutability_policy(namespace, repository, policy_uuid)
|
|
except model.ImmutabilityPolicyDoesNotExist:
|
|
raise NotFound()
|
|
except model.InvalidRepositoryException:
|
|
raise NotFound()
|
|
|
|
log_action(
|
|
"delete_immutability_policy",
|
|
namespace,
|
|
{"namespace": namespace, "repo": repository, "policy_uuid": policy_uuid},
|
|
repo_name=repository,
|
|
)
|
|
return {"uuid": policy_uuid}, 200
|