1
0
mirror of https://github.com/quay/quay.git synced 2025-11-16 11:42:27 +03:00
Files
quay/endpoints/api/prototype.py
Dave O'Connor 2511b45e89 fix(api): superuser panel access without SUPERUSERS_FULL_ACCESS (PROJQUAY-9693) (#4455)
fix(api): implement proper superuser permission model and fix access controls

Fixes multiple issues with superuser functionality and implements a comprehensive
permission model for FEATURE_SUPERUSERS_FULL_ACCESS:

**Permission Model:**
- Global Readonly Superusers (auditors): Always have read access to all content,
  independent of FEATURE_SUPERUSERS_FULL_ACCESS setting
- Regular Superusers: Can access /v1/superuser endpoints and their own content.
  Require FEATURE_SUPERUSERS_FULL_ACCESS=true for cross-namespace read access
- Full Access Superusers: Regular superusers with FULL_ACCESS enabled, can
  perform CRUD on content they don't own
- Write operations: Only allowed for full access superusers (global readonly
  superusers never get write access)

**Key Fixes:**
1. Fixed superuser panel endpoints returning 403 when FULL_ACCESS was disabled.
   Basic panel operations (user list, logs, org list, messages) now work with
   just FEATURE_SUPER_USERS enabled.

2. Updated decorators to properly differentiate between basic superuser
   operations and permission bypass operations.

3. Implemented license bypass: Superusers with FULL_ACCESS now bypass
   license/quota limits when creating or modifying private repositories.

4. Fixed 18 permission checks across 7 files to properly implement cross-namespace
   access controls for different superuser types.

**Changes:**
- endpoints/api/__init__.py: Fixed allow_if_superuser(), require_repo_permission, and decorators
- endpoints/api/superuser.py: Updated SuperUserAppTokens permission check
- endpoints/api/organization.py: Updated 4 GET endpoints to require FULL_ACCESS
- endpoints/api/namespacequota.py: Updated 2 GET endpoints to require FULL_ACCESS
- endpoints/api/team.py: Updated 2 GET endpoints to require FULL_ACCESS
- endpoints/api/prototype.py: Updated 1 GET endpoint to require FULL_ACCESS
- endpoints/api/policy.py: Updated auto-prune policy endpoints
- endpoints/api/robot.py: Updated robot endpoints
- endpoints/api/build.py: Updated repository build logs
- endpoints/api/repository.py: Added license bypass for superusers with FULL_ACCESS
- endpoints/api/repository_models_pre_oci.py: Updated repository visibility query
- endpoints/api/logs.py: Fixed log access to require FULL_ACCESS for permission bypass
- endpoints/api/test/test_superuser_full_access.py: Added comprehensive test suite
- endpoints/api/test/test_appspecifictoken.py: Updated test mocking and added 403 test
- test/test_api_usage.py: Updated test expectations for license bypass behavior

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-13 09:38:11 -05:00

320 lines
11 KiB
Python

"""
Manage default permissions added to repositories.
"""
from flask import request
import features
from app import avatar
from auth import scopes
from auth.auth_context import get_authenticated_user
from auth.permissions import AdministerOrganizationPermission
from data import model
from endpoints.api import (
ApiResource,
allow_if_any_superuser,
allow_if_global_readonly_superuser,
allow_if_superuser,
allow_if_superuser_with_full_access,
log_action,
nickname,
path_param,
request_error,
require_scope,
resource,
validate_json_request,
)
from endpoints.exception import NotFound, Unauthorized
def prototype_view(proto, org_members):
def prototype_user_view(user):
return {
"name": user.username,
"is_robot": user.robot,
"kind": "user",
"is_org_member": user.robot or user.username in org_members,
"avatar": avatar.get_data_for_user(user),
}
if proto.delegate_user:
delegate_view = prototype_user_view(proto.delegate_user)
else:
delegate_view = {
"name": proto.delegate_team.name,
"kind": "team",
"avatar": avatar.get_data_for_team(proto.delegate_team),
}
return {
"activating_user": (
prototype_user_view(proto.activating_user) if proto.activating_user else None
),
"delegate": delegate_view,
"role": proto.role.name,
"id": proto.uuid,
}
def log_prototype_action(action_kind, orgname, prototype, **kwargs):
username = get_authenticated_user().username
log_params = {
"prototypeid": prototype.uuid,
"username": username,
"activating_username": (
prototype.activating_user.username if prototype.activating_user else None
),
"role": prototype.role.name,
}
for key, value in list(kwargs.items()):
log_params[key] = value
if prototype.delegate_user:
log_params["delegate_user"] = prototype.delegate_user.username
elif prototype.delegate_team:
log_params["delegate_team"] = prototype.delegate_team.name
log_action(action_kind, orgname, log_params)
@resource("/v1/organization/<orgname>/prototypes")
@path_param("orgname", "The name of the organization")
class PermissionPrototypeList(ApiResource):
"""
Resource for listing and creating permission prototypes.
"""
schemas = {
"NewPrototype": {
"type": "object",
"description": "Description of a new prototype",
"required": [
"role",
"delegate",
],
"properties": {
"role": {
"type": "string",
"description": "Role that should be applied to the delegate",
"enum": [
"read",
"write",
"admin",
],
},
"activating_user": {
"type": "object",
"description": "Repository creating user to whom the rule should apply",
"required": [
"name",
],
"properties": {
"name": {
"type": "string",
"description": "The username for the activating_user",
},
},
},
"delegate": {
"type": "object",
"description": "Information about the user or team to which the rule grants access",
"required": [
"name",
"kind",
],
"properties": {
"name": {
"type": "string",
"description": "The name for the delegate team or user",
},
"kind": {
"type": "string",
"description": "Whether the delegate is a user or a team",
"enum": [
"user",
"team",
],
},
},
},
},
},
}
@require_scope(scopes.ORG_ADMIN)
@nickname("getOrganizationPrototypePermissions")
def get(self, orgname):
"""
List the existing prototypes for this organization.
"""
permission = AdministerOrganizationPermission(orgname)
if permission.can() or (features.SUPERUSERS_FULL_ACCESS and allow_if_any_superuser()):
try:
org = model.organization.get_organization(orgname)
except model.InvalidOrganizationException:
raise NotFound()
permissions = model.permission.get_prototype_permissions(org)
users_filter = {p.activating_user for p in permissions} | {
p.delegate_user for p in permissions
}
org_members = model.organization.get_organization_member_set(
org, users_filter=users_filter
)
return {"prototypes": [prototype_view(p, org_members) for p in permissions]}
raise Unauthorized()
@require_scope(scopes.ORG_ADMIN)
@nickname("createOrganizationPrototypePermission")
@validate_json_request("NewPrototype")
def post(self, orgname):
"""
Create a new permission prototype.
"""
permission = AdministerOrganizationPermission(orgname)
if permission.can() or allow_if_superuser_with_full_access():
try:
org = model.organization.get_organization(orgname)
except model.InvalidOrganizationException:
raise NotFound()
details = request.get_json()
activating_username = None
if (
"activating_user" in details
and details["activating_user"]
and "name" in details["activating_user"]
):
activating_username = details["activating_user"]["name"]
delegate = details["delegate"] if "delegate" in details else {}
delegate_kind = delegate.get("kind", None)
delegate_name = delegate.get("name", None)
delegate_username = delegate_name if delegate_kind == "user" else None
delegate_teamname = delegate_name if delegate_kind == "team" else None
activating_user = (
model.user.get_user(activating_username) if activating_username else None
)
delegate_user = model.user.get_user(delegate_username) if delegate_username else None
delegate_team = (
model.team.get_organization_team(orgname, delegate_teamname)
if delegate_teamname
else None
)
if activating_username and not activating_user:
raise request_error(message="Unknown activating user")
if not delegate_user and not delegate_team:
raise request_error(message="Missing delegate user or team")
role_name = details["role"]
prototype = model.permission.add_prototype_permission(
org, role_name, activating_user, delegate_user, delegate_team
)
log_prototype_action("create_prototype_permission", orgname, prototype)
users_filter = {prototype.activating_user, prototype.delegate_user}
org_members = model.organization.get_organization_member_set(
org, users_filter=users_filter
)
return prototype_view(prototype, org_members)
raise Unauthorized()
@resource("/v1/organization/<orgname>/prototypes/<prototypeid>")
@path_param("orgname", "The name of the organization")
@path_param("prototypeid", "The ID of the prototype")
class PermissionPrototype(ApiResource):
"""
Resource for managinging individual permission prototypes.
"""
schemas = {
"PrototypeUpdate": {
"type": "object",
"description": "Description of a the new prototype role",
"required": [
"role",
],
"properties": {
"role": {
"type": "string",
"description": "Role that should be applied to the permission",
"enum": [
"read",
"write",
"admin",
],
},
},
},
}
@require_scope(scopes.ORG_ADMIN)
@nickname("deleteOrganizationPrototypePermission")
def delete(self, orgname, prototypeid):
"""
Delete an existing permission prototype.
"""
permission = AdministerOrganizationPermission(orgname)
if permission.can() or allow_if_superuser_with_full_access():
try:
org = model.organization.get_organization(orgname)
except model.InvalidOrganizationException:
raise NotFound()
prototype = model.permission.delete_prototype_permission(org, prototypeid)
if not prototype:
raise NotFound()
log_prototype_action("delete_prototype_permission", orgname, prototype)
return "", 204
raise Unauthorized()
@require_scope(scopes.ORG_ADMIN)
@nickname("updateOrganizationPrototypePermission")
@validate_json_request("PrototypeUpdate")
def put(self, orgname, prototypeid):
"""
Update the role of an existing permission prototype.
"""
permission = AdministerOrganizationPermission(orgname)
if permission.can() or allow_if_superuser_with_full_access():
try:
org = model.organization.get_organization(orgname)
except model.InvalidOrganizationException:
raise NotFound()
existing = model.permission.get_prototype_permission(org, prototypeid)
if not existing:
raise NotFound()
details = request.get_json()
role_name = details["role"]
prototype = model.permission.update_prototype_permission(org, prototypeid, role_name)
if not prototype:
raise NotFound()
log_prototype_action(
"modify_prototype_permission", orgname, prototype, original_role=existing.role.name
)
users_filter = {prototype.activating_user, prototype.delegate_user}
org_members = model.organization.get_organization_member_set(
org, users_filter=users_filter
)
return prototype_view(prototype, org_members)
raise Unauthorized()