1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/endpoints/api/policy.py
OpenShift Cherrypick Robot 57895ec081 [redhat-3.16] fix(api): superuser panel access without SUPERUSERS_FULL_ACCESS (PROJQUAY-9693) (#4512)
* fix(api): fix superuser panel access and permission bypass behavior

Fixed multiple issues with superuser functionality related to FEATURE_SUPERUSERS_FULL_ACCESS:

1. **Superuser Panel Access**: Fixed bug where superuser panel endpoints returned 403
   when FEATURE_SUPERUSERS_FULL_ACCESS was disabled. Basic superuser panel operations
   (user list, logs, org list, messages) now work with just FEATURE_SUPER_USERS enabled.

2. **Permission Bypass Logic**: Updated decorators to properly differentiate between:
   - Basic superuser operations (don't require FULL_ACCESS)
   - Permission bypass operations (require FULL_ACCESS)
   - Read operations (allow global readonly superusers with FULL_ACCESS)
   - Write operations (only allow regular superusers with FULL_ACCESS)

3. **Global Readonly Superuser Support**: Fixed decorators to allow global readonly
   superusers read access when FULL_ACCESS is enabled, while blocking write operations.

4. **License Bypass**: Superusers with FULL_ACCESS now bypass license/quota limits
   when creating or modifying private repositories in any namespace.

Changes:
- endpoints/api/__init__.py: Fixed allow_if_superuser() and decorators
- endpoints/api/organization.py: Updated 4 GET endpoints to require FULL_ACCESS
- endpoints/api/namespacequota.py: Updated 2 GET endpoints to require FULL_ACCESS
- endpoints/api/team.py: Updated 2 GET endpoints to require FULL_ACCESS
- endpoints/api/prototype.py: Updated 1 GET endpoint to require FULL_ACCESS
- endpoints/api/repository.py: Added license bypass for superusers with FULL_ACCESS
- endpoints/api/logs.py: Fixed log access to require FULL_ACCESS for permission bypass
- endpoints/api/test/test_superuser_full_access.py: Added comprehensive test suite
- test/test_api_usage.py: Updated test expectations for license bypass behavior

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* test(api): add plan limit test for regular users

Adds test_plan_limit_enforcement_for_regular_users to verify that
license limits still apply to non-superuser accounts. This restores
test coverage for plan limit enforcement that was changed when we
updated test_trychangevisibility to validate superuser bypass behavior.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix(api): correct superuser cross-namespace permission model

Fixes permission checks to properly implement the superuser access model:

- Global Readonly Superusers (auditors): Always have read access to all
  content, independent of FEATURE_SUPERUSERS_FULL_ACCESS setting
- Regular Superusers: Can access /v1/superuser endpoints and their own
  content. Require FEATURE_SUPERUSERS_FULL_ACCESS=true for cross-namespace
  read access
- Full Access Superusers: Regular superusers with FULL_ACCESS enabled,
  can perform CRUD on content they don't own
- Write operations: Only allowed for full access superusers (global
  readonly superusers never get write access)

Updated 18 permission checks across 7 files:
- endpoints/api/__init__.py: require_repo_permission decorator
- endpoints/api/organization.py: org view, teams, collaborators, proxy cache
- endpoints/api/namespacequota.py: quota listing and individual quota
- endpoints/api/policy.py: auto-prune policies for orgs and repos
- endpoints/api/robot.py: robot listing, details, permissions, federation
- endpoints/api/build.py: repository build logs
- endpoints/api/repository_models_pre_oci.py: repository visibility query

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix(api): require FULL_ACCESS for regular superusers on /v1/superuser/apptokens

Updates the SuperUserAppTokens endpoint to properly enforce permission model:

- Regular superusers WITHOUT FULL_ACCESS: Get 403 Unauthorized
- Global readonly superusers: Always get 200 (auditor access, independent of FULL_ACCESS)
- Full access superusers (FULL_ACCESS=true): Get 200 with all tokens

This ensures regular superusers can only audit app tokens across the system
when they have full access privileges enabled, while global readonly superusers
(auditors) always maintain read access for auditing purposes.

Changes:
- endpoints/api/superuser.py: Update permission check in SuperUserAppTokens.get()
- endpoints/api/test/test_appspecifictoken.py: Update test mocking and add new test
  for 403 behavior

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Dave O'Connor <doconnor@redhat.com>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Dave O'Connor <1656866+HammerMeetNail@users.noreply.github.com>
2025-11-13 16:50:32 +00:00

778 lines
28 KiB
Python

import logging
from flask import request
import features
from auth import scopes
from auth.auth_context import get_authenticated_user
from auth.permissions import (
AdministerOrganizationPermission,
AdministerRepositoryPermission,
)
from data import model
from data.registry_model import registry_model
from endpoints.api import (
ApiResource,
RepositoryParamResource,
allow_if_any_superuser,
allow_if_global_readonly_superuser,
allow_if_superuser,
allow_if_superuser_with_full_access,
log_action,
nickname,
path_param,
request_error,
require_repo_admin,
require_scope,
require_user_admin,
resource,
show_if,
validate_json_request,
)
from endpoints.exception import InvalidRequest, NotFound, Unauthorized
logger = logging.getLogger(__name__)
@resource("/v1/organization/<orgname>/autoprunepolicy/")
@path_param("orgname", "The name of the organization")
@show_if(features.AUTO_PRUNE)
class OrgAutoPrunePolicies(ApiResource):
"""
Resource for listing and creating organization auto-prune policies
"""
schemas = {
"AutoPrunePolicyConfig": {
"type": "object",
"description": "The policy configuration that is to be applied to the organization",
"required": ["method", "value"],
"properties": {
"method": {
"type": "string",
"description": "The method to use for pruning tags (number_of_tags, creation_date)",
},
"value": {
"type": ["integer", "string"],
"description": "The value to use for the pruning method (number of tags e.g. 10, time delta e.g. 7d (7 days))",
},
"tagPattern": {
"type": "string",
"description": "Tags only matching this pattern will be pruned",
},
"tagPatternMatches": {
"type": "boolean",
"description": "Determine whether pruned tags should or should not match the tagPattern",
},
},
},
}
@require_scope(scopes.ORG_ADMIN)
@nickname("listOrganizationAutoPrunePolicies")
def get(self, orgname):
"""
Lists the auto-prune policies for the organization
"""
permission = AdministerOrganizationPermission(orgname)
# Global readonly superusers can always view, regular superusers need FULL_ACCESS
if (
not permission.can()
and not allow_if_global_readonly_superuser()
and not (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser())
):
raise Unauthorized()
policies = model.autoprune.get_namespace_autoprune_policies_by_orgname(orgname)
return {"policies": [policy.get_view() for policy in policies]}
@require_scope(scopes.ORG_ADMIN)
@validate_json_request("AutoPrunePolicyConfig")
@nickname("createOrganizationAutoPrunePolicy")
def post(self, orgname):
"""
Creates an auto-prune policy for the organization
"""
permission = AdministerOrganizationPermission(orgname)
if not permission.can() and not allow_if_superuser_with_full_access():
raise Unauthorized()
app_data = request.get_json()
method = app_data.get("method", None)
value = app_data.get("value", None)
tag_pattern = app_data.get("tagPattern", None)
if tag_pattern is not None and isinstance(tag_pattern, str):
tag_pattern = tag_pattern.strip()
tag_pattern_matches = app_data.get("tagPatternMatches", True)
if method is None or value is None:
request_error(message="Missing the following parameters: method, value")
policy_config = {
"method": method,
"value": value,
"tag_pattern": tag_pattern,
"tag_pattern_matches": tag_pattern_matches,
}
try:
policy = model.autoprune.create_namespace_autoprune_policy(
orgname, policy_config, create_task=True
)
except model.InvalidNamespaceException:
raise NotFound()
except model.InvalidNamespaceAutoPrunePolicy as ex:
request_error(ex)
except model.NamespaceAutoPrunePolicyAlreadyExists as ex:
request_error(ex)
log_action(
"create_namespace_autoprune_policy",
orgname,
{
"method": policy_config["method"],
"value": policy_config["value"],
"tag_pattern": policy_config.get("tag_pattern"),
"tag_pattern_matches": policy_config.get("tag_pattern_matches"),
"namespace": orgname,
},
)
return {"uuid": policy.uuid}, 201
@resource("/v1/organization/<orgname>/autoprunepolicy/<policy_uuid>")
@path_param("orgname", "The name of the organization")
@path_param("policy_uuid", "The unique ID of the policy")
@show_if(features.AUTO_PRUNE)
class OrgAutoPrunePolicy(ApiResource):
"""
Resource for fetching, updating, and deleting specific organization auto-prune policies
"""
schemas = {
"AutoPrunePolicyConfig": {
"type": "object",
"description": "The policy configuration that is to be applied to the organization",
"required": ["method", "value"],
"properties": {
"method": {
"type": "string",
"description": "The method to use for pruning tags (number_of_tags, creation_date)",
},
"value": {
"type": ["integer", "string"],
"description": "The value to use for the pruning method (number of tags e.g. 10, time delta e.g. 7d (7 days))",
},
"tagPattern": {
"type": "string",
"description": "Tags only matching this pattern will be pruned",
},
"tagPatternMatches": {
"type": "boolean",
"description": "Determine whether pruned tags should or should not match the tagPattern",
},
},
},
}
@require_scope(scopes.ORG_ADMIN)
@nickname("getOrganizationAutoPrunePolicy")
def get(self, orgname, policy_uuid):
"""
Fetches the auto-prune policy for the organization
"""
permission = AdministerOrganizationPermission(orgname)
# Global readonly superusers can always view, regular superusers need FULL_ACCESS
if (
not permission.can()
and not allow_if_global_readonly_superuser()
and not (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser())
):
raise Unauthorized()
policy = model.autoprune.get_namespace_autoprune_policy(orgname, policy_uuid)
if policy is None:
raise NotFound()
return policy.get_view()
@require_scope(scopes.ORG_ADMIN)
@validate_json_request("AutoPrunePolicyConfig")
@nickname("updateOrganizationAutoPrunePolicy")
def put(self, orgname, policy_uuid):
"""
Updates the auto-prune policy for the organization
"""
permission = AdministerOrganizationPermission(orgname)
if not permission.can() and not allow_if_superuser_with_full_access():
raise Unauthorized()
app_data = request.get_json()
method = app_data.get("method", None)
value = app_data.get("value", None)
tag_pattern = app_data.get("tagPattern", None)
if tag_pattern is not None and isinstance(tag_pattern, str):
tag_pattern = tag_pattern.strip()
tag_pattern_matches = app_data.get("tagPatternMatches", True)
if method is None or value is None:
request_error(message="Missing the following parameters: method, value")
policy_config = {
"method": method,
"value": value,
"tag_pattern": tag_pattern,
"tag_pattern_matches": tag_pattern_matches,
}
try:
updated = model.autoprune.update_namespace_autoprune_policy(
orgname, policy_uuid, policy_config
)
if not updated:
request_error(message="could not update policy")
except model.InvalidNamespaceException:
raise NotFound()
except model.InvalidNamespaceAutoPrunePolicy as ex:
request_error(ex)
except model.NamespaceAutoPrunePolicyDoesNotExist as ex:
raise NotFound()
log_action(
"update_namespace_autoprune_policy",
orgname,
{
"method": policy_config["method"],
"value": policy_config["value"],
"tag_pattern": policy_config.get("tag_pattern"),
"tag_pattern_matches": policy_config.get("tag_pattern_matches"),
"namespace": orgname,
},
)
return {"uuid": policy_uuid}, 204
@require_scope(scopes.ORG_ADMIN)
@nickname("deleteOrganizationAutoPrunePolicy")
def delete(self, orgname, policy_uuid):
"""
Deletes the auto-prune policy for the organization
"""
permission = AdministerOrganizationPermission(orgname)
if not permission.can() and not allow_if_superuser_with_full_access():
raise Unauthorized()
try:
updated = model.autoprune.delete_namespace_autoprune_policy(orgname, policy_uuid)
if not updated:
raise InvalidRequest("could not delete policy")
except model.InvalidNamespaceException as ex:
raise NotFound()
except model.NamespaceAutoPrunePolicyDoesNotExist as ex:
raise NotFound()
log_action(
"delete_namespace_autoprune_policy",
orgname,
{"policy_uuid": policy_uuid, "namespace": orgname},
)
return {"uuid": policy_uuid}, 200
@resource("/v1/repository/<apirepopath:repository>/autoprunepolicy/")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@show_if(features.AUTO_PRUNE)
class RepositoryAutoPrunePolicies(RepositoryParamResource):
"""
Resource for listing and creating repository auto-prune policies
"""
schemas = {
"AutoPrunePolicyConfig": {
"type": "object",
"description": "The policy configuration that is to be applied to the repository",
"required": ["method", "value"],
"properties": {
"method": {
"type": "string",
"description": "The method to use for pruning tags (number_of_tags, creation_date)",
},
"value": {
"type": ["integer", "string"],
"description": "The value to use for the pruning method (number of tags e.g. 10, time delta e.g. 7d (7 days))",
},
"tagPattern": {
"type": "string",
"description": "Tags only matching this pattern will be pruned",
},
"tagPatternMatches": {
"type": "boolean",
"description": "Determine whether pruned tags should or should not match the tagPattern",
},
},
},
}
@require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True)
@nickname("listRepositoryAutoPrunePolicies")
def get(self, namespace, repository):
"""
Lists the auto-prune policies for the repository
"""
permission = AdministerRepositoryPermission(namespace, repository)
# Global readonly superusers can always view, regular superusers need FULL_ACCESS
if (
not permission.can()
and not allow_if_global_readonly_superuser()
and not (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser())
):
raise Unauthorized()
if registry_model.lookup_repository(namespace, repository) is None:
raise NotFound()
policies = model.autoprune.get_repository_autoprune_policies_by_repo_name(
namespace, repository
)
return {"policies": [policy.get_view() for policy in policies]}
@require_repo_admin(allow_for_superuser=True)
@validate_json_request("AutoPrunePolicyConfig")
@nickname("createRepositoryAutoPrunePolicy")
def post(self, namespace, repository):
"""
Creates an auto-prune policy for the repository
"""
permission = AdministerRepositoryPermission(namespace, repository)
if not permission.can() and not allow_if_superuser_with_full_access():
raise Unauthorized()
if registry_model.lookup_repository(namespace, repository) is None:
raise NotFound()
app_data = request.get_json()
method = app_data.get("method", None)
value = app_data.get("value", None)
tag_pattern = app_data.get("tagPattern", None)
if tag_pattern is not None and isinstance(tag_pattern, str):
tag_pattern = tag_pattern.strip()
tag_pattern_matches = app_data.get("tagPatternMatches", True)
if method is None or value is None:
request_error(message="Missing the following parameters: method, value")
policy_config = {
"method": method,
"value": value,
"tag_pattern": tag_pattern,
"tag_pattern_matches": tag_pattern_matches,
}
try:
policy = model.autoprune.create_repository_autoprune_policy(
namespace, repository, policy_config, create_task=True
)
except model.InvalidNamespaceException:
raise NotFound()
except model.InvalidRepositoryException:
raise NotFound()
except model.InvalidRepositoryAutoPrunePolicy as ex:
request_error(ex)
except model.RepositoryAutoPrunePolicyAlreadyExists as ex:
request_error(ex)
log_action(
"create_repository_autoprune_policy",
namespace,
{
"method": policy_config["method"],
"value": policy_config["value"],
"tag_pattern": policy_config.get("tag_pattern"),
"tag_pattern_matches": policy_config.get("tag_pattern_matches"),
"namespace": namespace,
"repo": repository,
},
repo_name=repository,
)
return {"uuid": policy.uuid}, 201
@resource("/v1/repository/<apirepopath:repository>/autoprunepolicy/<policy_uuid>")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("policy_uuid", "The unique ID of the policy")
@show_if(features.AUTO_PRUNE)
class RepositoryAutoPrunePolicy(RepositoryParamResource):
"""
Resource for fetching, updating, and deleting repository specific auto-prune policies
"""
schemas = {
"AutoPrunePolicyConfig": {
"type": "object",
"description": "The policy configuration that is to be applied to the repository",
"required": ["method", "value"],
"properties": {
"method": {
"type": "string",
"description": "The method to use for pruning tags (number_of_tags, creation_date)",
},
"value": {
"type": ["integer", "string"],
"description": "The value to use for the pruning method (number of tags e.g. 10, time delta e.g. 7d (7 days))",
},
"tagPattern": {
"type": "string",
"description": "Tags only matching this pattern will be pruned",
},
"tagPatternMatches": {
"type": "boolean",
"description": "Determine whether pruned tags should or should not match the tagPattern",
},
},
},
}
@require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True)
@nickname("getRepositoryAutoPrunePolicy")
def get(self, namespace, repository, policy_uuid):
"""
Fetches the auto-prune policy for the repository
"""
permission = AdministerRepositoryPermission(namespace, repository)
# Global readonly superusers can always view, regular superusers need FULL_ACCESS
if (
not permission.can()
and not allow_if_global_readonly_superuser()
and not (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser())
):
raise Unauthorized()
policy = model.autoprune.get_repository_autoprune_policy_by_uuid(repository, policy_uuid)
if policy is None:
raise NotFound()
return policy.get_view()
@require_repo_admin(allow_for_superuser=True)
@validate_json_request("AutoPrunePolicyConfig")
@nickname("updateRepositoryAutoPrunePolicy")
def put(self, namespace, repository, policy_uuid):
"""
Updates the auto-prune policy for the repository
"""
permission = AdministerRepositoryPermission(namespace, repository)
if not permission.can() and not allow_if_superuser_with_full_access():
raise Unauthorized()
app_data = request.get_json()
method = app_data.get("method", None)
value = app_data.get("value", None)
tag_pattern = app_data.get("tagPattern", None)
if tag_pattern is not None and isinstance(tag_pattern, str):
tag_pattern = tag_pattern.strip()
tag_pattern_matches = app_data.get("tagPatternMatches", True)
if method is None or value is None:
request_error(message="Missing the following parameters: method, value")
policy_config = {
"method": method,
"value": value,
"tag_pattern": tag_pattern,
"tag_pattern_matches": tag_pattern_matches,
}
try:
updated = model.autoprune.update_repository_autoprune_policy(
namespace, repository, policy_uuid, policy_config
)
if not updated:
request_error(message="could not update policy")
except model.InvalidNamespaceException:
raise NotFound()
except model.InvalidRepositoryException:
raise NotFound()
except model.InvalidRepositoryAutoPrunePolicy as ex:
request_error(ex)
except model.RepositoryAutoPrunePolicyDoesNotExist as ex:
raise NotFound()
log_action(
"update_repository_autoprune_policy",
namespace,
{
"method": policy_config["method"],
"value": policy_config["value"],
"tag_pattern": policy_config.get("tag_pattern"),
"tag_pattern_matches": policy_config.get("tag_pattern_matches"),
"namespace": namespace,
"repo": repository,
},
repo_name=repository,
)
return {"uuid": policy_uuid}, 204
@require_repo_admin(allow_for_superuser=True)
@nickname("deleteRepositoryAutoPrunePolicy")
def delete(self, namespace, repository, policy_uuid):
"""
Deletes the auto-prune policy for the repository
"""
permission = AdministerRepositoryPermission(namespace, repository)
if not permission.can() and not allow_if_superuser_with_full_access():
raise Unauthorized()
try:
updated = model.autoprune.delete_repository_autoprune_policy(
namespace, repository, policy_uuid
)
if not updated:
raise InvalidRequest("could not delete policy")
except model.InvalidNamespaceException:
raise NotFound()
except model.InvalidRepositoryException as ex:
raise NotFound()
except model.RepositoryAutoPrunePolicyDoesNotExist as ex:
raise NotFound()
log_action(
"delete_repository_autoprune_policy",
namespace,
{"policy_uuid": policy_uuid, "namespace": namespace, "repo": repository},
repo_name=repository,
)
return {"uuid": policy_uuid}, 200
@resource("/v1/user/autoprunepolicy/")
@show_if(features.AUTO_PRUNE)
class UserAutoPrunePolicies(ApiResource):
"""
Resource for listing and creating organization auto-prune policies
"""
schemas = {
"AutoPrunePolicyConfig": {
"type": "object",
"description": "The policy configuration that is to be applied to the user namespace",
"required": ["method", "value"],
"properties": {
"method": {
"type": "string",
"description": "The method to use for pruning tags (number_of_tags, creation_date)",
},
"value": {
"type": ["integer", "string"],
"description": "The value to use for the pruning method (number of tags e.g. 10, time delta e.g. 7d (7 days))",
},
"tagPattern": {
"type": "string",
"description": "Tags only matching this pattern will be pruned",
},
"tagPatternMatches": {
"type": "boolean",
"description": "Determine whether pruned tags should or should not match the tagPattern",
},
},
},
}
@require_user_admin()
@nickname("listUserAutoPrunePolicies")
def get(self):
"""
Lists the auto-prune policies for the currently logged in user
"""
user = get_authenticated_user()
policies = model.autoprune.get_namespace_autoprune_policies_by_orgname(user.username)
return {"policies": [policy.get_view() for policy in policies]}
@require_user_admin()
@validate_json_request("AutoPrunePolicyConfig")
@nickname("createUserAutoPrunePolicy")
def post(self):
"""
Creates the auto-prune policy for the currently logged in user
"""
user = get_authenticated_user()
app_data = request.get_json()
method = app_data.get("method", None)
value = app_data.get("value", None)
tag_pattern = app_data.get("tagPattern", None)
if tag_pattern is not None and isinstance(tag_pattern, str):
tag_pattern = tag_pattern.strip()
tag_pattern_matches = app_data.get("tagPatternMatches", True)
if method is None or value is None:
request_error(message="Missing the following parameters: method, value")
policy_config = {
"method": method,
"value": value,
"tag_pattern": tag_pattern,
"tag_pattern_matches": tag_pattern_matches,
}
try:
policy = model.autoprune.create_namespace_autoprune_policy(
user.username, policy_config, create_task=True
)
except model.InvalidNamespaceException:
raise NotFound()
except model.InvalidNamespaceAutoPrunePolicy as ex:
request_error(ex)
except model.NamespaceAutoPrunePolicyAlreadyExists as ex:
request_error(ex)
log_action(
"create_namespace_autoprune_policy",
user.username,
{
"method": policy_config["method"],
"value": policy_config["value"],
"namespace": user.username,
"tag_pattern": policy_config.get("tag_pattern"),
"tag_pattern_matches": policy_config.get("tag_pattern_matches"),
},
)
return {"uuid": policy.uuid}, 201
@resource("/v1/user/autoprunepolicy/<policy_uuid>")
@path_param("policy_uuid", "The unique ID of the policy")
@show_if(features.AUTO_PRUNE)
class UserAutoPrunePolicy(ApiResource):
"""
Resource for fetching, updating, and deleting specific user auto-prune policies
"""
schemas = {
"AutoPrunePolicyConfig": {
"type": "object",
"description": "The policy configuration that is to be applied to the user namespace",
"required": ["method", "value"],
"properties": {
"method": {
"type": "string",
"description": "The method to use for pruning tags (number_of_tags, creation_date)",
},
"value": {
"type": ["integer", "string"],
"description": "The value to use for the pruning method (number of tags e.g. 10, time delta e.g. 7d (7 days))",
},
"tagPattern": {
"type": "string",
"description": "Tags only matching this pattern will be pruned",
},
"tagPatternMatches": {
"type": "boolean",
"description": "Determine whether pruned tags should or should not match the tagPattern",
},
},
},
}
@require_user_admin()
@nickname("getUserAutoPrunePolicy")
def get(self, policy_uuid):
"""
Fetches the auto-prune policy for the currently logged in user
"""
user = get_authenticated_user()
policy = model.autoprune.get_namespace_autoprune_policy(user.username, policy_uuid)
if policy is None:
raise NotFound()
return policy.get_view()
@require_user_admin()
@validate_json_request("AutoPrunePolicyConfig")
@nickname("updateUserAutoPrunePolicy")
def put(self, policy_uuid):
"""
Updates the auto-prune policy for the currently logged in user
"""
user = get_authenticated_user()
app_data = request.get_json()
method = app_data.get("method", None)
value = app_data.get("value", None)
tag_pattern = app_data.get("tagPattern", None)
if tag_pattern is not None and isinstance(tag_pattern, str):
tag_pattern = tag_pattern.strip()
tag_pattern_matches = app_data.get("tagPatternMatches", True)
if method is None or value is None:
request_error(message="Missing the following parameters: method, value")
policy_config = {
"method": method,
"value": value,
"tag_pattern": tag_pattern,
"tag_pattern_matches": tag_pattern_matches,
}
try:
updated = model.autoprune.update_namespace_autoprune_policy(
user.username, policy_uuid, policy_config
)
if not updated:
raise InvalidRequest("could not update policy")
except model.InvalidNamespaceException:
raise NotFound()
except model.InvalidNamespaceAutoPrunePolicy as ex:
request_error(ex)
except model.NamespaceAutoPrunePolicyDoesNotExist as ex:
raise NotFound()
log_action(
"update_namespace_autoprune_policy",
user.username,
{
"method": policy_config["method"],
"value": policy_config["value"],
"namespace": user.username,
"tag_pattern": policy_config.get("tag_pattern"),
"tag_pattern_matches": policy_config.get("tag_pattern_matches"),
},
)
return {"uuid": policy_uuid}, 204
@require_user_admin()
@nickname("deleteUserAutoPrunePolicy")
def delete(self, policy_uuid):
"""
Deletes the auto-prune policy for the currently logged in user
"""
user = get_authenticated_user()
try:
updated = model.autoprune.delete_namespace_autoprune_policy(user.username, policy_uuid)
if not updated:
raise InvalidRequest("could not delete policy")
except model.InvalidNamespaceException as ex:
raise NotFound()
except model.NamespaceAutoPrunePolicyDoesNotExist as ex:
raise NotFound()
log_action(
"delete_namespace_autoprune_policy",
user.username,
{"policy_uuid": policy_uuid, "namespace": user.username},
)
return {"uuid": policy_uuid}, 200