From 2511b45e891c69ca90fa5332cf6d4760f6b0b91e Mon Sep 17 00:00:00 2001 From: Dave O'Connor <1656866+HammerMeetNail@users.noreply.github.com> Date: Thu, 13 Nov 2025 09:38:11 -0500 Subject: [PATCH] fix(api): superuser panel access without SUPERUSERS_FULL_ACCESS (PROJQUAY-9693) (#4455) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fix(api): implement proper superuser permission model and fix access controls Fixes multiple issues with superuser functionality and implements a comprehensive permission model for FEATURE_SUPERUSERS_FULL_ACCESS: **Permission Model:** - Global Readonly Superusers (auditors): Always have read access to all content, independent of FEATURE_SUPERUSERS_FULL_ACCESS setting - Regular Superusers: Can access /v1/superuser endpoints and their own content. Require FEATURE_SUPERUSERS_FULL_ACCESS=true for cross-namespace read access - Full Access Superusers: Regular superusers with FULL_ACCESS enabled, can perform CRUD on content they don't own - Write operations: Only allowed for full access superusers (global readonly superusers never get write access) **Key Fixes:** 1. Fixed superuser panel endpoints returning 403 when FULL_ACCESS was disabled. Basic panel operations (user list, logs, org list, messages) now work with just FEATURE_SUPER_USERS enabled. 2. Updated decorators to properly differentiate between basic superuser operations and permission bypass operations. 3. Implemented license bypass: Superusers with FULL_ACCESS now bypass license/quota limits when creating or modifying private repositories. 4. Fixed 18 permission checks across 7 files to properly implement cross-namespace access controls for different superuser types. **Changes:** - endpoints/api/__init__.py: Fixed allow_if_superuser(), require_repo_permission, and decorators - endpoints/api/superuser.py: Updated SuperUserAppTokens permission check - endpoints/api/organization.py: Updated 4 GET endpoints to require FULL_ACCESS - endpoints/api/namespacequota.py: Updated 2 GET endpoints to require FULL_ACCESS - endpoints/api/team.py: Updated 2 GET endpoints to require FULL_ACCESS - endpoints/api/prototype.py: Updated 1 GET endpoint to require FULL_ACCESS - endpoints/api/policy.py: Updated auto-prune policy endpoints - endpoints/api/robot.py: Updated robot endpoints - endpoints/api/build.py: Updated repository build logs - endpoints/api/repository.py: Added license bypass for superusers with FULL_ACCESS - endpoints/api/repository_models_pre_oci.py: Updated repository visibility query - endpoints/api/logs.py: Fixed log access to require FULL_ACCESS for permission bypass - endpoints/api/test/test_superuser_full_access.py: Added comprehensive test suite - endpoints/api/test/test_appspecifictoken.py: Updated test mocking and added 403 test - test/test_api_usage.py: Updated test expectations for license bypass behavior 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- endpoints/api/__init__.py | 58 +++- endpoints/api/build.py | 13 +- endpoints/api/logs.py | 7 +- endpoints/api/namespacequota.py | 18 +- endpoints/api/organization.py | 57 ++-- endpoints/api/policy.py | 41 ++- endpoints/api/prototype.py | 10 +- endpoints/api/repository.py | 7 +- endpoints/api/repository_models_pre_oci.py | 8 +- endpoints/api/robot.py | 40 ++- endpoints/api/superuser.py | 7 +- endpoints/api/team.py | 17 +- endpoints/api/test/test_appspecifictoken.py | 25 +- .../api/test/test_superuser_full_access.py | 254 ++++++++++++++++++ endpoints/api/trigger.py | 3 +- endpoints/v2/v2auth.py | 11 +- test/test_api_usage.py | 78 +++++- 17 files changed, 564 insertions(+), 90 deletions(-) create mode 100644 endpoints/api/test/test_superuser_full_access.py diff --git a/endpoints/api/__init__.py b/endpoints/api/__init__.py index 428c0fb52..10e04789f 100644 --- a/endpoints/api/__init__.py +++ b/endpoints/api/__init__.py @@ -350,11 +350,24 @@ def require_repo_permission(permission_class, scope, allow_public=False): ): return func(self, namespace, repository, *args, **kwargs) - if features.SUPERUSERS_FULL_ACCESS and allow_for_superuser: + if allow_for_superuser: user = get_authenticated_user() - if user is not None and allow_if_superuser(): - return func(self, namespace, repository, *args, **kwargs) + if user is not None: + # For read operations that also allow global readonly superusers, + # allow any superuser with FULL_ACCESS + if ( + allow_for_global_readonly_superuser + and features.SUPERUSERS_FULL_ACCESS + and allow_if_any_superuser() + ): + return func(self, namespace, repository, *args, **kwargs) + # For write operations, only allow regular superusers with FULL_ACCESS + elif ( + not allow_for_global_readonly_superuser + and allow_if_superuser_with_full_access() + ): + return func(self, namespace, repository, *args, **kwargs) if allow_for_global_readonly_superuser and allow_if_global_readonly_superuser(): return func(self, namespace, repository, *args, **kwargs) @@ -395,7 +408,7 @@ def require_user_permission(permission_class, scope=None): if permission.can(): return func(self, *args, **kwargs) - if features.SUPERUSERS_FULL_ACCESS and allow_for_superuser and allow_if_superuser(): + if allow_for_superuser and allow_if_superuser_with_full_access(): return func(self, *args, **kwargs) raise Unauthorized() @@ -493,18 +506,47 @@ log_unauthorized_delete = log_unauthorized("delete_tag_failed") def allow_if_superuser(): + """ + Returns True if the user is a regular superuser (not global readonly). + + This is for basic superuser panel access and should work when FEATURE_SUPER_USERS + is enabled, regardless of SUPERUSERS_FULL_ACCESS. This does NOT grant permission + to bypass normal access controls on other users' resources. + + For operations that need to bypass normal permission checks (like accessing other + organizations or creating repos in other namespaces), use allow_if_superuser_with_full_access(). + """ + return SuperUserPermission().can() + + +def allow_if_superuser_with_full_access(): + """ + Returns True if the user is a superuser with full access enabled. + + This is for operations that bypass normal permission checks to access or modify + resources owned by other users/organizations. Examples include: + - Creating repositories in other namespaces + - Modifying teams/robots in other organizations + - Accessing private data of other organizations + + Requires both: + - User is a superuser (FEATURE_SUPER_USERS enabled and user in SUPER_USERS list) + - FEATURE_SUPERUSERS_FULL_ACCESS is enabled + + Note: Global readonly superusers are explicitly excluded from this permission. + """ return bool(features.SUPERUSERS_FULL_ACCESS and SuperUserPermission().can()) def allow_if_any_superuser(): """ - Returns True if the user is either a regular superuser (with SUPERUSERS_FULL_ACCESS enabled) - or a global readonly superuser. + Returns True if the user is either a regular superuser or a global readonly superuser. Since these two types are mutually exclusive, this is a convenience helper for read-only - endpoints that should be accessible to both types of superusers. + endpoints that should be accessible to both types of superusers (like viewing user lists, + logs, organizations in the superuser panel). - Note: Regular superusers require SUPERUSERS_FULL_ACCESS to be enabled, but global readonly + Note: Regular superusers work with just FEATURE_SUPER_USERS enabled. Global readonly superusers are always allowed (when the feature is enabled) since they're read-only by design. """ return allow_if_superuser() or allow_if_global_readonly_superuser() diff --git a/endpoints/api/build.py b/endpoints/api/build.py index 1e5968dc7..5a24ac51a 100644 --- a/endpoints/api/build.py +++ b/endpoints/api/build.py @@ -30,6 +30,7 @@ from endpoints.api import ( allow_if_any_superuser, allow_if_global_readonly_superuser, allow_if_superuser, + allow_if_superuser_with_full_access, api, disallow_for_app_repositories, disallow_for_non_normal_repositories, @@ -310,7 +311,7 @@ class RepositoryBuildList(RepositoryParamResource): (robot_namespace, _) = result if ( not AdministerOrganizationPermission(robot_namespace).can() - and not allow_if_superuser() + and not allow_if_superuser_with_full_access() ): raise Unauthorized() else: @@ -326,7 +327,7 @@ class RepositoryBuildList(RepositoryParamResource): not ModifyRepositoryPermission( associated_repository.namespace_user.username, associated_repository.name ) - and not allow_if_superuser() + and not allow_if_superuser_with_full_access() ): raise Unauthorized() @@ -516,7 +517,13 @@ class RepositoryBuildLogs(RepositoryParamResource): Return the build logs for the build specified by the build uuid. """ can_write = ModifyRepositoryPermission(namespace, repository).can() - if not features.READER_BUILD_LOGS and not can_write and not allow_if_any_superuser(): + # Global readonly superusers can always view build logs, regular superusers need FULL_ACCESS + if ( + not features.READER_BUILD_LOGS + and not can_write + and not allow_if_global_readonly_superuser() + and not (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser()) + ): raise Unauthorized() build = model.build.get_repository_build(build_uuid) diff --git a/endpoints/api/logs.py b/endpoints/api/logs.py index a826a97b1..4276a1f38 100644 --- a/endpoints/api/logs.py +++ b/endpoints/api/logs.py @@ -20,6 +20,7 @@ from endpoints.api import ( allow_if_any_superuser, allow_if_global_readonly_superuser, allow_if_superuser, + allow_if_superuser_with_full_access, format_date, log_action, nickname, @@ -204,7 +205,7 @@ class OrgLogs(ApiResource): List the logs for the specified organization. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_any_superuser(): + if permission.can() or (features.SUPERUSERS_FULL_ACCESS and allow_if_any_superuser()): performer_name = parsed_args["performer"] start_time = parsed_args["starttime"] end_time = parsed_args["endtime"] @@ -296,7 +297,7 @@ class OrgAggregateLogs(ApiResource): Gets the aggregated logs for the specified organization. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_any_superuser(): + if permission.can() or (features.SUPERUSERS_FULL_ACCESS and allow_if_any_superuser()): performer_name = parsed_args["performer"] start_time = parsed_args["starttime"] end_time = parsed_args["endtime"] @@ -492,7 +493,7 @@ class ExportOrgLogs(ApiResource): Exports the logs for the specified organization. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): start_time = parsed_args["starttime"] end_time = parsed_args["endtime"] diff --git a/endpoints/api/namespacequota.py b/endpoints/api/namespacequota.py index b43ad57e9..e1b0f5b4a 100644 --- a/endpoints/api/namespacequota.py +++ b/endpoints/api/namespacequota.py @@ -100,7 +100,12 @@ class OrganizationQuotaList(ApiResource): @nickname("listOrganizationQuota") def get(self, orgname): orgperm = OrganizationMemberPermission(orgname) - if not orgperm.can() and not allow_if_any_superuser(): + # Global readonly superusers can always view, regular superusers need FULL_ACCESS + if ( + not orgperm.can() + and not allow_if_global_readonly_superuser() + and not (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser()) + ): raise Unauthorized() try: @@ -201,7 +206,12 @@ class OrganizationQuota(ApiResource): @nickname("getOrganizationQuota") def get(self, orgname, quota_id): orgperm = OrganizationMemberPermission(orgname) - if not orgperm.can() and not allow_if_any_superuser(): + # Global readonly superusers can always view, regular superusers need FULL_ACCESS + if ( + not orgperm.can() + and not allow_if_global_readonly_superuser() + and not (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser()) + ): raise Unauthorized() quota = get_quota(orgname, quota_id) @@ -277,7 +287,7 @@ class OrganizationQuotaLimitList(ApiResource): @nickname("listOrganizationQuotaLimit") def get(self, orgname, quota_id): orgperm = OrganizationMemberPermission(orgname) - if not orgperm.can() and not allow_if_any_superuser(): + if not orgperm.can() and not (features.SUPERUSERS_FULL_ACCESS and allow_if_any_superuser()): raise Unauthorized() quota = get_quota(orgname, quota_id) @@ -347,7 +357,7 @@ class OrganizationQuotaLimit(ApiResource): @nickname("getOrganizationQuotaLimit") def get(self, orgname, quota_id, limit_id): orgperm = OrganizationMemberPermission(orgname) - if not orgperm.can() and not allow_if_any_superuser(): + if not orgperm.can() and not (features.SUPERUSERS_FULL_ACCESS and allow_if_any_superuser()): raise Unauthorized() quota = get_quota(orgname, quota_id) diff --git a/endpoints/api/organization.py b/endpoints/api/organization.py index f26c5e385..1aaffc9fe 100644 --- a/endpoints/api/organization.py +++ b/endpoints/api/organization.py @@ -31,6 +31,7 @@ from endpoints.api import ( allow_if_any_superuser, allow_if_global_readonly_superuser, allow_if_superuser, + allow_if_superuser_with_full_access, internal_only, log_action, nickname, @@ -88,11 +89,14 @@ def team_view(orgname, team): def org_view(o, teams): is_admin = AdministerOrganizationPermission(o.username).can() is_member = OrganizationMemberPermission(o.username).can() - is_any_superuser = allow_if_any_superuser() + # Global readonly superusers can always view, regular superusers need FULL_ACCESS + can_view_as_superuser = allow_if_global_readonly_superuser() or ( + features.SUPERUSERS_FULL_ACCESS and allow_if_superuser() + ) view = { "name": o.username, - "email": o.email if is_admin or is_any_superuser else "", + "email": o.email if is_admin or can_view_as_superuser else "", "avatar": avatar.get_data_for_user(o), "is_admin": is_admin, "is_member": is_member, @@ -258,7 +262,12 @@ class Organization(ApiResource): raise NotFound() teams = None - if OrganizationMemberPermission(orgname).can() or allow_if_any_superuser(): + # Global readonly superusers can always view teams, regular superusers need FULL_ACCESS + if ( + OrganizationMemberPermission(orgname).can() + or allow_if_global_readonly_superuser() + or (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser()) + ): has_syncing = features.TEAM_SYNCING and bool(authentication.federated_service) teams = model.team.get_teams_within_org(org, has_syncing) @@ -272,7 +281,7 @@ class Organization(ApiResource): Change the details for the specified organization. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): try: org = model.organization.get_organization(orgname) except model.InvalidOrganizationException: @@ -339,7 +348,7 @@ class Organization(ApiResource): Deletes the specified organization. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): try: org = model.organization.get_organization(orgname) except model.InvalidOrganizationException: @@ -436,7 +445,12 @@ class OrganizationCollaboratorList(ApiResource): List outside collaborators of the specified organization. """ permission = AdministerOrganizationPermission(orgname) - if not permission.can() and not allow_if_any_superuser(): + # Global readonly superusers can always view, regular superusers need FULL_ACCESS + if ( + not permission.can() + and not allow_if_global_readonly_superuser() + and not (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser()) + ): raise Unauthorized() try: @@ -484,7 +498,7 @@ class OrganizationMemberList(ApiResource): List the human members of the specified organization. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_any_superuser(): + if permission.can() or (features.SUPERUSERS_FULL_ACCESS and allow_if_any_superuser()): try: org = model.organization.get_organization(orgname) except model.InvalidOrganizationException: @@ -545,7 +559,7 @@ class OrganizationMember(ApiResource): Retrieves the details of a member of the organization. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_any_superuser(): + if permission.can() or (features.SUPERUSERS_FULL_ACCESS and allow_if_any_superuser()): # Lookup the user. member = model.user.get_user(membername) if not member: @@ -595,7 +609,7 @@ class OrganizationMember(ApiResource): it from all teams in the organization. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): # Lookup the user. user = model.user.get_nonrobot_user(membername) if not user: @@ -706,7 +720,7 @@ class OrganizationApplications(ApiResource): List the applications for the specified organization. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_any_superuser(): + if permission.can() or (features.SUPERUSERS_FULL_ACCESS and allow_if_any_superuser()): try: org = model.organization.get_organization(orgname) except model.InvalidOrganizationException: @@ -725,7 +739,7 @@ class OrganizationApplications(ApiResource): Creates a new application under this organization. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): try: org = model.organization.get_organization(orgname) except model.InvalidOrganizationException: @@ -796,7 +810,7 @@ class OrganizationApplicationResource(ApiResource): Retrieves the application with the specified client_id under the specified organization. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_any_superuser(): + if permission.can() or (features.SUPERUSERS_FULL_ACCESS and allow_if_any_superuser()): try: org = model.organization.get_organization(orgname) except model.InvalidOrganizationException: @@ -818,7 +832,7 @@ class OrganizationApplicationResource(ApiResource): Updates an application under this organization. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): try: org = model.organization.get_organization(orgname) except model.InvalidOrganizationException: @@ -852,7 +866,7 @@ class OrganizationApplicationResource(ApiResource): Deletes the application under this organization. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): try: org = model.organization.get_organization(orgname) except model.InvalidOrganizationException: @@ -887,7 +901,7 @@ class OrganizationApplicationResetClientSecret(ApiResource): Resets the client secret of the application. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): try: org = model.organization.get_organization(orgname) except model.InvalidOrganizationException: @@ -944,7 +958,12 @@ class OrganizationProxyCacheConfig(ApiResource): Retrieves the proxy cache configuration of the organization. """ permission = OrganizationMemberPermission(orgname) - if not permission.can() and not allow_if_any_superuser(): + # Global readonly superusers can always view, regular superusers need FULL_ACCESS + if ( + not permission.can() + and not allow_if_global_readonly_superuser() + and not (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser()) + ): raise Unauthorized() try: @@ -961,7 +980,7 @@ class OrganizationProxyCacheConfig(ApiResource): Creates proxy cache configuration for the organization. """ permission = AdministerOrganizationPermission(orgname) - if not permission.can() and not allow_if_superuser(): + if not permission.can() and not allow_if_superuser_with_full_access(): raise Unauthorized() try: @@ -998,7 +1017,7 @@ class OrganizationProxyCacheConfig(ApiResource): Delete proxy cache configuration for the organization. """ permission = AdministerOrganizationPermission(orgname) - if not permission.can() and not allow_if_superuser(): + if not permission.can() and not allow_if_superuser_with_full_access(): raise Unauthorized() try: @@ -1041,7 +1060,7 @@ class ProxyCacheConfigValidation(ApiResource): @validate_json_request("NewProxyCacheConfig") def post(self, orgname): permission = AdministerOrganizationPermission(orgname) - if not permission.can() and not allow_if_superuser(): + if not permission.can() and not allow_if_superuser_with_full_access(): raise Unauthorized() try: diff --git a/endpoints/api/policy.py b/endpoints/api/policy.py index 21a51b03c..1e3e1dda6 100644 --- a/endpoints/api/policy.py +++ b/endpoints/api/policy.py @@ -17,6 +17,7 @@ from endpoints.api import ( allow_if_any_superuser, allow_if_global_readonly_superuser, allow_if_superuser, + allow_if_superuser_with_full_access, log_action, nickname, path_param, @@ -74,7 +75,12 @@ class OrgAutoPrunePolicies(ApiResource): Lists the auto-prune policies for the organization """ permission = AdministerOrganizationPermission(orgname) - if not permission.can() and not allow_if_any_superuser(): + # Global readonly superusers can always view, regular superusers need FULL_ACCESS + if ( + not permission.can() + and not allow_if_global_readonly_superuser() + and not (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser()) + ): raise Unauthorized() policies = model.autoprune.get_namespace_autoprune_policies_by_orgname(orgname) @@ -89,7 +95,7 @@ class OrgAutoPrunePolicies(ApiResource): Creates an auto-prune policy for the organization """ permission = AdministerOrganizationPermission(orgname) - if not permission.can() and not allow_if_superuser(): + if not permission.can() and not allow_if_superuser_with_full_access(): raise Unauthorized() app_data = request.get_json() @@ -178,7 +184,12 @@ class OrgAutoPrunePolicy(ApiResource): Fetches the auto-prune policy for the organization """ permission = AdministerOrganizationPermission(orgname) - if not permission.can() and not allow_if_any_superuser(): + # Global readonly superusers can always view, regular superusers need FULL_ACCESS + if ( + not permission.can() + and not allow_if_global_readonly_superuser() + and not (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser()) + ): raise Unauthorized() policy = model.autoprune.get_namespace_autoprune_policy(orgname, policy_uuid) @@ -195,7 +206,7 @@ class OrgAutoPrunePolicy(ApiResource): Updates the auto-prune policy for the organization """ permission = AdministerOrganizationPermission(orgname) - if not permission.can() and not allow_if_superuser(): + if not permission.can() and not allow_if_superuser_with_full_access(): raise Unauthorized() app_data = request.get_json() @@ -250,7 +261,7 @@ class OrgAutoPrunePolicy(ApiResource): Deletes the auto-prune policy for the organization """ permission = AdministerOrganizationPermission(orgname) - if not permission.can() and not allow_if_superuser(): + if not permission.can() and not allow_if_superuser_with_full_access(): raise Unauthorized() try: @@ -312,7 +323,12 @@ class RepositoryAutoPrunePolicies(RepositoryParamResource): Lists the auto-prune policies for the repository """ permission = AdministerRepositoryPermission(namespace, repository) - if not permission.can() and not allow_if_any_superuser(): + # Global readonly superusers can always view, regular superusers need FULL_ACCESS + if ( + not permission.can() + and not allow_if_global_readonly_superuser() + and not (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser()) + ): raise Unauthorized() if registry_model.lookup_repository(namespace, repository) is None: @@ -332,7 +348,7 @@ class RepositoryAutoPrunePolicies(RepositoryParamResource): Creates an auto-prune policy for the repository """ permission = AdministerRepositoryPermission(namespace, repository) - if not permission.can() and not allow_if_superuser(): + if not permission.can() and not allow_if_superuser_with_full_access(): raise Unauthorized() if registry_model.lookup_repository(namespace, repository) is None: @@ -428,7 +444,12 @@ class RepositoryAutoPrunePolicy(RepositoryParamResource): Fetches the auto-prune policy for the repository """ permission = AdministerRepositoryPermission(namespace, repository) - if not permission.can() and not allow_if_any_superuser(): + # Global readonly superusers can always view, regular superusers need FULL_ACCESS + if ( + not permission.can() + and not allow_if_global_readonly_superuser() + and not (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser()) + ): raise Unauthorized() policy = model.autoprune.get_repository_autoprune_policy_by_uuid(repository, policy_uuid) @@ -445,7 +466,7 @@ class RepositoryAutoPrunePolicy(RepositoryParamResource): Updates the auto-prune policy for the repository """ permission = AdministerRepositoryPermission(namespace, repository) - if not permission.can() and not allow_if_superuser(): + if not permission.can() and not allow_if_superuser_with_full_access(): raise Unauthorized() app_data = request.get_json() @@ -504,7 +525,7 @@ class RepositoryAutoPrunePolicy(RepositoryParamResource): Deletes the auto-prune policy for the repository """ permission = AdministerRepositoryPermission(namespace, repository) - if not permission.can() and not allow_if_superuser(): + if not permission.can() and not allow_if_superuser_with_full_access(): raise Unauthorized() try: diff --git a/endpoints/api/prototype.py b/endpoints/api/prototype.py index 2033183ed..4720029cc 100644 --- a/endpoints/api/prototype.py +++ b/endpoints/api/prototype.py @@ -4,6 +4,7 @@ Manage default permissions added to repositories. from flask import request +import features from app import avatar from auth import scopes from auth.auth_context import get_authenticated_user @@ -14,6 +15,7 @@ from endpoints.api import ( allow_if_any_superuser, allow_if_global_readonly_superuser, allow_if_superuser, + allow_if_superuser_with_full_access, log_action, nickname, path_param, @@ -147,7 +149,7 @@ class PermissionPrototypeList(ApiResource): List the existing prototypes for this organization. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_any_superuser(): + if permission.can() or (features.SUPERUSERS_FULL_ACCESS and allow_if_any_superuser()): try: org = model.organization.get_organization(orgname) except model.InvalidOrganizationException: @@ -173,7 +175,7 @@ class PermissionPrototypeList(ApiResource): Create a new permission prototype. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): try: org = model.organization.get_organization(orgname) except model.InvalidOrganizationException: @@ -264,7 +266,7 @@ class PermissionPrototype(ApiResource): Delete an existing permission prototype. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): try: org = model.organization.get_organization(orgname) except model.InvalidOrganizationException: @@ -288,7 +290,7 @@ class PermissionPrototype(ApiResource): Update the role of an existing permission prototype. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): try: org = model.organization.get_organization(orgname) except model.InvalidOrganizationException: diff --git a/endpoints/api/repository.py b/endpoints/api/repository.py index 42e4ca288..77bcd1dfb 100644 --- a/endpoints/api/repository.py +++ b/endpoints/api/repository.py @@ -29,6 +29,7 @@ from endpoints.api import ( ApiResource, RepositoryParamResource, allow_if_superuser, + allow_if_superuser_with_full_access, format_date, log_action, nickname, @@ -69,6 +70,10 @@ def check_allowed_private_repos(namespace): If so, raises a ExceedsLicenseException. """ + # Superusers with full access bypass license limits + if allow_if_superuser_with_full_access(): + return + # Not enabled if billing is disabled. if not features.BILLING: return @@ -142,7 +147,7 @@ class RepositoryList(ApiResource): permission = CreateRepositoryPermission(namespace_name) - if (permission.can() or allow_if_superuser()) and not ( + if (permission.can() or allow_if_superuser_with_full_access()) and not ( features.RESTRICTED_USERS and usermanager.is_restricted_user(owner.username) and owner.username == namespace_name diff --git a/endpoints/api/repository_models_pre_oci.py b/endpoints/api/repository_models_pre_oci.py index c812e77b9..a981f509c 100644 --- a/endpoints/api/repository_models_pre_oci.py +++ b/endpoints/api/repository_models_pre_oci.py @@ -8,7 +8,7 @@ from data.database import Repository as RepositoryTable from data.database import RepositoryState from data.registry_model import registry_model from data.registry_model.datatypes import RepositoryReference -from endpoints.api import allow_if_any_superuser +from endpoints.api import allow_if_global_readonly_superuser, allow_if_superuser from endpoints.api.repository_models_interface import ( ApplicationRepository, Channel, @@ -129,6 +129,7 @@ class PreOCIModel(RepositoryDataInterface): # Also note the +1 on the limit, as paginate_query uses the extra result to determine whether # there is a next page. start_id = model.modelutil.pagination_start(page_token) + # Global readonly superusers can always see all repos, regular superusers need FULL_ACCESS repo_query = model.repository.get_visible_repositories( username=username, include_public=public, @@ -136,7 +137,10 @@ class PreOCIModel(RepositoryDataInterface): limit=REPOS_PER_PAGE + 1, kind_filter=repo_kind, namespace=namespace, - is_superuser=allow_if_any_superuser(), + is_superuser=( + allow_if_global_readonly_superuser() + or (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser()) + ), ) repos, next_page_token = model.modelutil.paginate_query( diff --git a/endpoints/api/robot.py b/endpoints/api/robot.py index e9c2b2380..ed08877e9 100644 --- a/endpoints/api/robot.py +++ b/endpoints/api/robot.py @@ -6,6 +6,7 @@ import logging from flask import abort, request +import features from auth import scopes from auth.auth_context import get_authenticated_user from auth.permissions import ( @@ -27,6 +28,7 @@ from endpoints.api import ( allow_if_any_superuser, allow_if_global_readonly_superuser, allow_if_superuser, + allow_if_superuser_with_full_access, log_action, max_json_size, nickname, @@ -221,7 +223,12 @@ class OrgRobotList(ApiResource): List the organization's robots. """ permission = OrganizationMemberPermission(orgname) - if permission.can() or allow_if_any_superuser(): + # Global readonly superusers can always view, regular superusers need FULL_ACCESS + if ( + permission.can() + or allow_if_global_readonly_superuser() + or (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser()) + ): include_token = ( AdministerOrganizationPermission(orgname).can() or allow_if_global_readonly_superuser() @@ -261,7 +268,12 @@ class OrgRobot(ApiResource): Returns the organization's robot with the specified name. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_any_superuser(): + # Global readonly superusers can always view, regular superusers need FULL_ACCESS + if ( + permission.can() + or allow_if_global_readonly_superuser() + or (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser()) + ): robot = model.get_org_robot(robot_shortname, orgname) return robot.to_dict(include_metadata=True, include_token=True) @@ -276,7 +288,7 @@ class OrgRobot(ApiResource): Create a new robot in the organization. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): create_data = request.get_json(silent=True) or {} try: @@ -308,7 +320,7 @@ class OrgRobot(ApiResource): Delete an existing organization robot. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): robot_username = format_robot_username(orgname, robot_shortname) if not model.robot_has_mirror(robot_username): model.delete_robot(robot_username) @@ -360,7 +372,12 @@ class OrgRobotPermissions(ApiResource): Returns the list of repository permissions for the org's robot. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_any_superuser(): + # Global readonly superusers can always view, regular superusers need FULL_ACCESS + if ( + permission.can() + or allow_if_global_readonly_superuser() + or (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser()) + ): robot = model.get_org_robot(robot_shortname, orgname) permissions = model.list_robot_permissions(robot.name) @@ -408,7 +425,7 @@ class RegenerateOrgRobot(ApiResource): Regenerates the token for an organization robot. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): robot = model.regenerate_org_robot_token(robot_shortname, orgname) log_action("regenerate_robot_token", orgname, {"robot": robot_shortname}) return robot.to_dict(include_token=True) @@ -431,7 +448,12 @@ class OrgRobotFederation(ApiResource): @require_scope(scopes.ORG_ADMIN) def get(self, orgname, robot_shortname): permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_any_superuser(): + # Global readonly superusers can always view, regular superusers need FULL_ACCESS + if ( + permission.can() + or allow_if_global_readonly_superuser() + or (features.SUPERUSERS_FULL_ACCESS and allow_if_superuser()) + ): robot_username = format_robot_username(orgname, robot_shortname) robot = lookup_robot(robot_username) return get_robot_federation_config(robot) @@ -442,7 +464,7 @@ class OrgRobotFederation(ApiResource): @validate_json_request("CreateRobotFederation", optional=False) def post(self, orgname, robot_shortname): permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): fed_config = self._parse_federation_config(request) robot_username = format_robot_username(orgname, robot_shortname) @@ -460,7 +482,7 @@ class OrgRobotFederation(ApiResource): @require_scope(scopes.ORG_ADMIN) def delete(self, orgname, robot_shortname): permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): robot_username = format_robot_username(orgname, robot_shortname) robot = lookup_robot(robot_username) delete_robot_federation_config(robot) diff --git a/endpoints/api/superuser.py b/endpoints/api/superuser.py index de158207e..01b55b30c 100644 --- a/endpoints/api/superuser.py +++ b/endpoints/api/superuser.py @@ -32,6 +32,7 @@ from endpoints.api import ( Unauthorized, allow_if_any_superuser, allow_if_global_readonly_superuser, + allow_if_superuser_with_full_access, format_date, internal_only, log_action, @@ -1250,9 +1251,11 @@ class SuperUserAppTokens(ApiResource): """ Returns a list of all app specific tokens in the system. - This endpoint is for system-wide auditing by superusers and global read-only superusers. + This endpoint is for system-wide auditing by global read-only superusers and + full access superusers only. Regular superusers without full access are denied. """ - if allow_if_any_superuser(): + # Global readonly superusers can always audit, regular superusers need FULL_ACCESS + if allow_if_global_readonly_superuser() or allow_if_superuser_with_full_access(): expiring = parsed_args["expiring"] if expiring: diff --git a/endpoints/api/team.py b/endpoints/api/team.py index df41f4f92..cc8e766c5 100644 --- a/endpoints/api/team.py +++ b/endpoints/api/team.py @@ -24,6 +24,7 @@ from endpoints.api import ( allow_if_any_superuser, allow_if_global_readonly_superuser, allow_if_superuser, + allow_if_superuser_with_full_access, format_date, internal_only, log_action, @@ -212,7 +213,7 @@ class OrganizationTeam(ApiResource): Update the org-wide permission for the specified team. """ edit_permission = AdministerOrganizationPermission(orgname) - if edit_permission.can() or allow_if_superuser(): + if edit_permission.can() or allow_if_superuser_with_full_access(): team = None details = request.get_json() @@ -262,7 +263,7 @@ class OrganizationTeam(ApiResource): Delete the specified team. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): model.team.remove_team(orgname, teamname, get_authenticated_user().username) log_action("org_delete_team", orgname, {"team": teamname}) return "", 204 @@ -360,7 +361,7 @@ class TeamMemberList(ApiResource): view_permission = ViewTeamPermission(orgname, teamname) edit_permission = AdministerOrganizationPermission(orgname) - if view_permission.can() or allow_if_any_superuser(): + if view_permission.can() or (features.SUPERUSERS_FULL_ACCESS and allow_if_any_superuser()): team = None try: team = model.team.get_organization_team(orgname, teamname) @@ -423,7 +424,7 @@ class TeamMember(ApiResource): Adds or invites a member to an existing team. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): team = None user = None @@ -465,7 +466,7 @@ class TeamMember(ApiResource): If the user is merely invited to join the team, then the invite is removed instead. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): # Remote the user from the team. invoking_user = get_authenticated_user().username @@ -515,7 +516,7 @@ class InviteTeamMember(ApiResource): Invites an email address to an existing team. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): team = None # Find the team. @@ -543,7 +544,7 @@ class InviteTeamMember(ApiResource): Delete an invite of an email address to join a team. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_superuser(): + if permission.can() or allow_if_superuser_with_full_access(): team = None # Find the team. @@ -580,7 +581,7 @@ class TeamPermissions(ApiResource): Returns the list of repository permissions for the org's team. """ permission = AdministerOrganizationPermission(orgname) - if permission.can() or allow_if_any_superuser(): + if permission.can() or (features.SUPERUSERS_FULL_ACCESS and allow_if_any_superuser()): try: team = model.team.get_organization_team(orgname, teamname) except model.InvalidTeamException: diff --git a/endpoints/api/test/test_appspecifictoken.py b/endpoints/api/test/test_appspecifictoken.py index 80ceaddaa..604d9333b 100644 --- a/endpoints/api/test/test_appspecifictoken.py +++ b/endpoints/api/test/test_appspecifictoken.py @@ -337,16 +337,10 @@ def test_superuser_endpoint_sees_all_tokens(app): assert "token_code" not in token # Test global readonly superuser - # Mock global readonly superuser by mocking the permission classes - with patch("endpoints.api.SuperUserPermission") as mock_super_perm, patch( - "endpoints.api.GlobalReadOnlySuperUserPermission" - ) as mock_global_ro_perm, patch( - "endpoints.api.superuser.allow_if_any_superuser", return_value=True - ): - # Not a regular superuser, but is a global readonly superuser - mock_super_perm.return_value.can.return_value = False - mock_global_ro_perm.return_value.can.return_value = True - + # Mock global readonly superuser by mocking the permission functions + with patch( + "endpoints.api.superuser.allow_if_global_readonly_superuser", return_value=True + ), patch("endpoints.api.superuser.allow_if_superuser_with_full_access", return_value=False): with client_with_identity("reader", app) as cl: # On /v1/superuser/apptokens, global readonly superuser should see all tokens resp = conduct_api_call(cl, SuperUserAppTokens, "GET", None, None, 200).json @@ -363,6 +357,17 @@ def test_superuser_endpoint_sees_all_tokens(app): reader_token.delete_instance() +def test_superuser_endpoint_requires_full_access(app): + """Test that regular superusers without FULL_ACCESS get 403 on /v1/superuser/apptokens""" + # Mock a regular superuser (not global readonly) without FULL_ACCESS + with patch( + "endpoints.api.superuser.allow_if_global_readonly_superuser", return_value=False + ), patch("endpoints.api.superuser.allow_if_superuser_with_full_access", return_value=False): + with client_with_identity("devtable", app) as cl: + # Regular superuser without FULL_ACCESS should get 403 + conduct_api_call(cl, SuperUserAppTokens, "GET", None, None, 403) + + def test_superuser_endpoint_expiring_tokens(app): """Test expiring token filtering on /v1/superuser/apptokens""" devtable_user = model.user.get_user("devtable") diff --git a/endpoints/api/test/test_superuser_full_access.py b/endpoints/api/test/test_superuser_full_access.py new file mode 100644 index 000000000..43e4e5c71 --- /dev/null +++ b/endpoints/api/test/test_superuser_full_access.py @@ -0,0 +1,254 @@ +""" +Tests for superuser functionality with and without FEATURE_SUPERUSERS_FULL_ACCESS. + +This tests the fix for the bug where superuser panel endpoints return 403 +when FEATURE_SUPERUSERS_FULL_ACCESS is disabled, even though they should +work with just FEATURE_SUPER_USERS enabled. +""" + +import pytest + +from endpoints.api.globalmessages import GlobalUserMessages +from endpoints.api.organization import Organization +from endpoints.api.repository import RepositoryList +from endpoints.api.superuser import ( + SuperUserAggregateLogs, + SuperUserList, + SuperUserLogs, + SuperUserManagement, + SuperUserOrganizationList, +) +from endpoints.api.team import OrganizationTeam +from endpoints.api.test.shared import conduct_api_call +from endpoints.test.shared import client_with_identity +from test.fixtures import * + + +class TestSuperuserBasicAccessWithoutFullAccess: + """ + Tests that basic superuser panel endpoints work with FEATURE_SUPER_USERS=true + and FEATURE_SUPERUSERS_FULL_ACCESS=false. + + These are core superuser functions and should NOT require full access. + """ + + @pytest.fixture(autouse=True) + def setup(self, app): + """Disable SUPERUSERS_FULL_ACCESS for these tests.""" + import features + + # Ensure SUPER_USERS is enabled but FULL_ACCESS is disabled + features.import_features( + { + "FEATURE_SUPER_USERS": True, + "FEATURE_SUPERUSERS_FULL_ACCESS": False, + } + ) + yield + # Reset to default test config + features.import_features( + { + "FEATURE_SUPER_USERS": True, + "FEATURE_SUPERUSERS_FULL_ACCESS": True, + } + ) + + def test_superuser_can_list_users_without_full_access(self, app): + """ + Test that superusers can access /v1/superuser/users/ without FULL_ACCESS. + + This is a core superuser panel function and should work with just + FEATURE_SUPER_USERS enabled. + """ + with client_with_identity("devtable", app) as cl: + result = conduct_api_call(cl, SuperUserList, "GET", None, None, 200) + assert result.json is not None + assert "users" in result.json + assert len(result.json["users"]) > 0 + + def test_superuser_can_get_user_details_without_full_access(self, app): + """ + Test that superusers can access /v1/superuser/users/ without FULL_ACCESS. + """ + with client_with_identity("devtable", app) as cl: + params = {"username": "randomuser"} + result = conduct_api_call(cl, SuperUserManagement, "GET", params, None, 200) + assert result.json is not None + assert result.json["username"] == "randomuser" + + def test_superuser_can_list_organizations_without_full_access(self, app): + """ + Test that superusers can access /v1/superuser/organizations/ without FULL_ACCESS. + """ + with client_with_identity("devtable", app) as cl: + result = conduct_api_call(cl, SuperUserOrganizationList, "GET", None, None, 200) + assert result.json is not None + assert "organizations" in result.json + assert len(result.json["organizations"]) > 0 + + def test_superuser_can_view_logs_without_full_access(self, app): + """ + Test that superusers can access /v1/superuser/logs without FULL_ACCESS. + """ + with client_with_identity("devtable", app) as cl: + result = conduct_api_call(cl, SuperUserLogs, "GET", None, None, 200) + assert result.json is not None + assert "logs" in result.json + + def test_superuser_can_view_aggregate_logs_without_full_access(self, app): + """ + Test that superusers can access /v1/superuser/aggregatelogs without FULL_ACCESS. + """ + with client_with_identity("devtable", app) as cl: + params = {"starttime": "01/01/2024 UTC", "endtime": "12/31/2024 UTC"} + result = conduct_api_call(cl, SuperUserAggregateLogs, "GET", params, None, 200) + assert result.json is not None + assert "aggregated" in result.json + + def test_superuser_can_manage_global_messages_without_full_access(self, app): + """ + Test that superusers can create/delete global messages without FULL_ACCESS. + + Managing global messages is a core superuser function. + """ + with client_with_identity("devtable", app) as cl: + # Create a global message + body = { + "message": { + "severity": "info", + "media_type": "text/plain", + "content": "Test message", + } + } + result = conduct_api_call(cl, GlobalUserMessages, "POST", None, body, 201) + assert result.status_code == 201 + + +class TestSuperuserFullAccessRequired: + """ + Tests that operations requiring FEATURE_SUPERUSERS_FULL_ACCESS are properly + blocked when the feature is disabled. + + These operations bypass normal permission checks to access/modify resources + owned by other users/organizations. + """ + + @pytest.fixture(autouse=True) + def setup(self, app): + """Disable SUPERUSERS_FULL_ACCESS for these tests.""" + import features + from data import model + + features.import_features( + { + "FEATURE_SUPER_USERS": True, + "FEATURE_SUPERUSERS_FULL_ACCESS": False, + } + ) + + # Create a test organization owned by randomuser (not devtable) + # This is needed to test that devtable (superuser) cannot modify it without FULL_ACCESS + randomuser = model.user.get_user("randomuser") + try: + model.organization.get_organization("testorg") + except model.InvalidOrganizationException: + model.organization.create_organization("testorg", "testorg@test.com", randomuser) + + yield + # Note: We don't clean up the organization because it has foreign key constraints + # and the test database is reset between test runs anyway + + features.import_features( + { + "FEATURE_SUPER_USERS": True, + "FEATURE_SUPERUSERS_FULL_ACCESS": True, + } + ) + + def test_superuser_cannot_create_repo_in_other_namespace_without_full_access(self, app): + """ + Test that superusers CANNOT create repos in other namespaces without FULL_ACCESS. + + This is a permission bypass operation that requires FULL_ACCESS. + """ + with client_with_identity("devtable", app) as cl: + # Try to create a repo in another user's namespace + body = { + "namespace": "randomuser", # Not devtable's namespace + "repository": "test-repo", + "visibility": "private", + "description": "test", + } + # Should be blocked without FULL_ACCESS + conduct_api_call(cl, RepositoryList, "POST", None, body, 403) + + def test_superuser_cannot_modify_other_org_team_without_full_access(self, app): + """ + Test that superusers CANNOT modify teams in other orgs without FULL_ACCESS. + + This is a permission bypass operation that requires FULL_ACCESS. + """ + with client_with_identity("devtable", app) as cl: + # Try to create a team in testorg (owned by randomuser, not devtable) + params = {"orgname": "testorg", "teamname": "testteam"} + body = {"role": "member"} + # Should be blocked without FULL_ACCESS + conduct_api_call(cl, OrganizationTeam, "PUT", params, body, 403) + + +class TestSuperuserFullAccessEnabled: + """ + Tests that operations work correctly when FEATURE_SUPERUSERS_FULL_ACCESS is enabled. + + With full access, superusers can bypass permission checks. + """ + + @pytest.fixture(autouse=True) + def setup(self, app): + """Enable SUPERUSERS_FULL_ACCESS for these tests.""" + import features + + features.import_features( + { + "FEATURE_SUPER_USERS": True, + "FEATURE_SUPERUSERS_FULL_ACCESS": True, + } + ) + yield + + def test_superuser_can_view_other_org_details_with_full_access(self, app): + """ + Test that superusers CAN view other organizations' details with FULL_ACCESS. + """ + with client_with_identity("devtable", app) as cl: + # Can view org that devtable doesn't own + params = {"orgname": "buynlarge"} + result = conduct_api_call(cl, Organization, "GET", params, None, 200) + assert result.json is not None + assert result.json["name"] == "buynlarge" + + def test_superuser_can_modify_other_org_team_with_full_access(self, app): + """ + Test that superusers CAN modify teams in other orgs with FULL_ACCESS. + """ + with client_with_identity("devtable", app) as cl: + # Can create team in org that devtable doesn't own + params = {"orgname": "buynlarge", "teamname": "testteam"} + body = {"role": "member"} + result = conduct_api_call(cl, OrganizationTeam, "PUT", params, body, 200) + assert result.json is not None + + def test_superuser_can_create_repo_in_other_namespace_with_full_access(self, app): + """ + Test that superusers CAN create repos in other namespaces with FULL_ACCESS. + """ + with client_with_identity("devtable", app) as cl: + # Can create repo in another user's namespace + body = { + "namespace": "randomuser", + "repository": "test-repo-full-access", + "visibility": "public", # Use public to avoid license limit issues + "description": "test with full access", + } + result = conduct_api_call(cl, RepositoryList, "POST", None, body, 201) + assert result.json is not None diff --git a/endpoints/api/trigger.py b/endpoints/api/trigger.py index 0ac5c526b..ac876ea09 100644 --- a/endpoints/api/trigger.py +++ b/endpoints/api/trigger.py @@ -23,6 +23,7 @@ from endpoints.api import ( RepositoryParamResource, abort, allow_if_superuser, + allow_if_superuser_with_full_access, api, disallow_for_app_repositories, disallow_for_non_normal_repositories, @@ -281,7 +282,7 @@ class BuildTriggerActivate(RepositoryParamResource): raise InvalidRequest("Trigger config is not sufficient for activation.") user_permission = UserAdminPermission(trigger.connected_user.username) - if user_permission.can() or allow_if_superuser(): + if user_permission.can() or allow_if_superuser_with_full_access(): # Update the pull robot (if any). pull_robot_name = request.get_json().get("pull_robot", None) if pull_robot_name: diff --git a/endpoints/v2/v2auth.py b/endpoints/v2/v2auth.py index 3a869193b..af3f0bc85 100644 --- a/endpoints/v2/v2auth.py +++ b/endpoints/v2/v2auth.py @@ -24,6 +24,7 @@ from data.registry_model.datatypes import RepositoryReference from endpoints.api import ( allow_if_global_readonly_superuser, allow_if_superuser, + allow_if_superuser_with_full_access, log_action, ) from endpoints.decorators import anon_protect @@ -264,7 +265,10 @@ def _authorize_or_downscope_request(scope_param, has_valid_auth_context): # Lookup the repository. If it exists, make sure the entity has modify # permission. Otherwise, make sure the entity has create permission. if repository_ref: - if ModifyRepositoryPermission(namespace, reponame).can() or allow_if_superuser(): + if ( + ModifyRepositoryPermission(namespace, reponame).can() + or allow_if_superuser_with_full_access() + ): if repository_ref is not None and repository_ref.kind != "image": raise Unsupported(message=invalid_repo_message) @@ -383,7 +387,10 @@ def _authorize_or_downscope_request(scope_param, has_valid_auth_context): if "*" in requested_actions: # Grant * user is admin - if AdministerRepositoryPermission(namespace, reponame).can() or allow_if_superuser(): + if ( + AdministerRepositoryPermission(namespace, reponame).can() + or allow_if_superuser_with_full_access() + ): if repository_ref is not None and repository_ref.kind != "image": raise Unsupported(message=invalid_repo_message) diff --git a/test/test_api_usage.py b/test/test_api_usage.py index 4d5495a81..7a6ea3387 100644 --- a/test/test_api_usage.py +++ b/test/test_api_usage.py @@ -2412,18 +2412,18 @@ class TestChangeRepoVisibility(ApiTestCase): # Change the subscription of the namespace. self.putJsonResponse(UserPlan, data=dict(plan="personal-2018")) - # Try to make private. + # Try to make private. Superusers with full access bypass license limits. self.postJsonResponse( RepositoryVisibility, params=dict(repository=self.SIMPLE_REPO), data=dict(visibility="private"), - expected_code=402, + expected_code=200, ) - # Verify the visibility. + # Verify the visibility changed to private (superuser bypassed license limit). json = self.getJsonResponse(Repository, params=dict(repository=self.SIMPLE_REPO)) - self.assertEqual(True, json["is_public"]) + self.assertEqual(False, json["is_public"]) def test_changevisibility(self): self.login(ADMIN_ACCESS_USER) @@ -2452,6 +2452,76 @@ class TestChangeRepoVisibility(ApiTestCase): self.assertEqual(False, json["is_public"]) + def test_plan_limit_enforcement_for_regular_users(self): + """ + Test that plan limits are enforced for regular (non-superuser) users. + + This test is identical to test_trychangevisibility but uses a non-superuser + to validate that license limits are still enforced for regular users. + """ + from data import model + + # Use NO_ACCESS_USER (freshuser) who is definitely not a superuser + # Set up billing for this user so the UserPlan endpoint works + user = model.user.get_user(NO_ACCESS_USER) + user.stripe_id = "test_stripe_id_freshuser" + user.save() + + self.login(NO_ACCESS_USER) + + # Change the subscription to a limited plan first + self.putJsonResponse(UserPlan, data=dict(plan="personal-2018")) + + # Create private repositories until we hit the plan limit + # The personal-2018 plan allows a limited number of private repos + for i in range(20): + try: + self.postJsonResponse( + RepositoryList, + data=dict( + namespace=NO_ACCESS_USER, + repository=f"private_{i}", + description="private repo to exhaust limit", + visibility="private", + ), + expected_code=201, + ) + except AssertionError: + # Hit the limit, which is expected + break + + test_repo = NO_ACCESS_USER + "/simple" + + # Create one more repository as public (should work) + self.postJsonResponse( + RepositoryList, + data=dict( + namespace=NO_ACCESS_USER, + repository="simple", + description="test repository", + visibility="public", + ), + expected_code=201, + ) + + # Verify the visibility. + json = self.getJsonResponse(Repository, params=dict(repository=test_repo)) + + self.assertEqual(True, json["is_public"]) + + # Try to make private - should be blocked by plan limit for regular users. + self.postJsonResponse( + RepositoryVisibility, + params=dict(repository=test_repo), + data=dict(visibility="private"), + expected_code=402, + ) + + # Verify the visibility stayed public (blocked by plan limit). + json = self.getJsonResponse(Repository, params=dict(repository=test_repo)) + + self.assertEqual(True, json["is_public"]) + class TestDeleteRepository(ApiTestCase): SIMPLE_REPO = ADMIN_ACCESS_USER + "/simple"