1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/endpoints/api/prototype.py
Dave O'Connor fbfd20b2bc fix: allow global readonly superusers to access all organization data without FULL_ACCESS (PROJQUAY-9798) (#4549)
* fix: allow global readonly superusers to access all organization data without FULL_ACCESS (PROJQUAY-9798)

This is a comprehensive fix for multiple endpoints where global readonly superusers
were incorrectly blocked from accessing organization data when
FEATURE_SUPERUSERS_FULL_ACCESS was set to false.

Fixed endpoints in endpoints/api/logs.py:
- OrgLogs.get() - Organization audit logs
- OrgAggregateLogs.get() - Aggregated organization logs
- ExportOrgLogs.post() - Export organization logs

Fixed endpoints in endpoints/api/team.py:
- TeamMemberList.get() - Team member list
- TeamPermissions.get() - Team repository permissions

Fixed endpoints in endpoints/api/organization.py:
- OrganizationMemberList.get() - Organization member list
- OrganizationMember.get() - Individual member details
- OrganizationApplications.get() - OAuth application list
- OrganizationApplication.get() - Individual application details

Fixed endpoints in endpoints/api/prototype.py:
- PermissionPrototypeList.get() - Default permission prototypes

All endpoints now use consistent permission logic:
  permission.can() OR
  allow_if_global_readonly_superuser() OR
  allow_if_superuser_with_full_access()

Added comprehensive tests verifying:
1. Global readonly superusers CAN access all data for auditing, regardless
   of FEATURE_SUPERUSERS_FULL_ACCESS setting
2. Regular superusers are still blocked when FEATURE_SUPERUSERS_FULL_ACCESS
   is false (correct behavior)

* fix(test): ensure owners team exists for testorglogs org in test setup

Addresses review feedback from PR #4549 comment #2539202868.

The test was attempting to access the 'owners' team in 'testorglogs'
org, but the fixture only created the organization without creating
any teams. This could cause the test to receive a 404 (team not found)
instead of 403 (permission denied), making it pass for the wrong reason.

Also simplified the test logic to only expect 403 since the team now
exists in the fixtures, ensuring the test validates permission blocking
rather than missing resources.
2025-11-18 14:57:04 -05:00

324 lines
11 KiB
Python

"""
Manage default permissions added to repositories.
"""
from flask import request
import features
from app import avatar
from auth import scopes
from auth.auth_context import get_authenticated_user
from auth.permissions import AdministerOrganizationPermission
from data import model
from endpoints.api import (
ApiResource,
allow_if_any_superuser,
allow_if_global_readonly_superuser,
allow_if_superuser,
allow_if_superuser_with_full_access,
log_action,
nickname,
path_param,
request_error,
require_scope,
resource,
validate_json_request,
)
from endpoints.exception import NotFound, Unauthorized
def prototype_view(proto, org_members):
def prototype_user_view(user):
return {
"name": user.username,
"is_robot": user.robot,
"kind": "user",
"is_org_member": user.robot or user.username in org_members,
"avatar": avatar.get_data_for_user(user),
}
if proto.delegate_user:
delegate_view = prototype_user_view(proto.delegate_user)
else:
delegate_view = {
"name": proto.delegate_team.name,
"kind": "team",
"avatar": avatar.get_data_for_team(proto.delegate_team),
}
return {
"activating_user": (
prototype_user_view(proto.activating_user) if proto.activating_user else None
),
"delegate": delegate_view,
"role": proto.role.name,
"id": proto.uuid,
}
def log_prototype_action(action_kind, orgname, prototype, **kwargs):
username = get_authenticated_user().username
log_params = {
"prototypeid": prototype.uuid,
"username": username,
"activating_username": (
prototype.activating_user.username if prototype.activating_user else None
),
"role": prototype.role.name,
}
for key, value in list(kwargs.items()):
log_params[key] = value
if prototype.delegate_user:
log_params["delegate_user"] = prototype.delegate_user.username
elif prototype.delegate_team:
log_params["delegate_team"] = prototype.delegate_team.name
log_action(action_kind, orgname, log_params)
@resource("/v1/organization/<orgname>/prototypes")
@path_param("orgname", "The name of the organization")
class PermissionPrototypeList(ApiResource):
"""
Resource for listing and creating permission prototypes.
"""
schemas = {
"NewPrototype": {
"type": "object",
"description": "Description of a new prototype",
"required": [
"role",
"delegate",
],
"properties": {
"role": {
"type": "string",
"description": "Role that should be applied to the delegate",
"enum": [
"read",
"write",
"admin",
],
},
"activating_user": {
"type": "object",
"description": "Repository creating user to whom the rule should apply",
"required": [
"name",
],
"properties": {
"name": {
"type": "string",
"description": "The username for the activating_user",
},
},
},
"delegate": {
"type": "object",
"description": "Information about the user or team to which the rule grants access",
"required": [
"name",
"kind",
],
"properties": {
"name": {
"type": "string",
"description": "The name for the delegate team or user",
},
"kind": {
"type": "string",
"description": "Whether the delegate is a user or a team",
"enum": [
"user",
"team",
],
},
},
},
},
},
}
@require_scope(scopes.ORG_ADMIN)
@nickname("getOrganizationPrototypePermissions")
def get(self, orgname):
"""
List the existing prototypes for this organization.
"""
permission = AdministerOrganizationPermission(orgname)
if (
permission.can()
or allow_if_global_readonly_superuser()
or allow_if_superuser_with_full_access()
):
try:
org = model.organization.get_organization(orgname)
except model.InvalidOrganizationException:
raise NotFound()
permissions = model.permission.get_prototype_permissions(org)
users_filter = {p.activating_user for p in permissions} | {
p.delegate_user for p in permissions
}
org_members = model.organization.get_organization_member_set(
org, users_filter=users_filter
)
return {"prototypes": [prototype_view(p, org_members) for p in permissions]}
raise Unauthorized()
@require_scope(scopes.ORG_ADMIN)
@nickname("createOrganizationPrototypePermission")
@validate_json_request("NewPrototype")
def post(self, orgname):
"""
Create a new permission prototype.
"""
permission = AdministerOrganizationPermission(orgname)
if permission.can() or allow_if_superuser_with_full_access():
try:
org = model.organization.get_organization(orgname)
except model.InvalidOrganizationException:
raise NotFound()
details = request.get_json()
activating_username = None
if (
"activating_user" in details
and details["activating_user"]
and "name" in details["activating_user"]
):
activating_username = details["activating_user"]["name"]
delegate = details["delegate"] if "delegate" in details else {}
delegate_kind = delegate.get("kind", None)
delegate_name = delegate.get("name", None)
delegate_username = delegate_name if delegate_kind == "user" else None
delegate_teamname = delegate_name if delegate_kind == "team" else None
activating_user = (
model.user.get_user(activating_username) if activating_username else None
)
delegate_user = model.user.get_user(delegate_username) if delegate_username else None
delegate_team = (
model.team.get_organization_team(orgname, delegate_teamname)
if delegate_teamname
else None
)
if activating_username and not activating_user:
raise request_error(message="Unknown activating user")
if not delegate_user and not delegate_team:
raise request_error(message="Missing delegate user or team")
role_name = details["role"]
prototype = model.permission.add_prototype_permission(
org, role_name, activating_user, delegate_user, delegate_team
)
log_prototype_action("create_prototype_permission", orgname, prototype)
users_filter = {prototype.activating_user, prototype.delegate_user}
org_members = model.organization.get_organization_member_set(
org, users_filter=users_filter
)
return prototype_view(prototype, org_members)
raise Unauthorized()
@resource("/v1/organization/<orgname>/prototypes/<prototypeid>")
@path_param("orgname", "The name of the organization")
@path_param("prototypeid", "The ID of the prototype")
class PermissionPrototype(ApiResource):
"""
Resource for managinging individual permission prototypes.
"""
schemas = {
"PrototypeUpdate": {
"type": "object",
"description": "Description of a the new prototype role",
"required": [
"role",
],
"properties": {
"role": {
"type": "string",
"description": "Role that should be applied to the permission",
"enum": [
"read",
"write",
"admin",
],
},
},
},
}
@require_scope(scopes.ORG_ADMIN)
@nickname("deleteOrganizationPrototypePermission")
def delete(self, orgname, prototypeid):
"""
Delete an existing permission prototype.
"""
permission = AdministerOrganizationPermission(orgname)
if permission.can() or allow_if_superuser_with_full_access():
try:
org = model.organization.get_organization(orgname)
except model.InvalidOrganizationException:
raise NotFound()
prototype = model.permission.delete_prototype_permission(org, prototypeid)
if not prototype:
raise NotFound()
log_prototype_action("delete_prototype_permission", orgname, prototype)
return "", 204
raise Unauthorized()
@require_scope(scopes.ORG_ADMIN)
@nickname("updateOrganizationPrototypePermission")
@validate_json_request("PrototypeUpdate")
def put(self, orgname, prototypeid):
"""
Update the role of an existing permission prototype.
"""
permission = AdministerOrganizationPermission(orgname)
if permission.can() or allow_if_superuser_with_full_access():
try:
org = model.organization.get_organization(orgname)
except model.InvalidOrganizationException:
raise NotFound()
existing = model.permission.get_prototype_permission(org, prototypeid)
if not existing:
raise NotFound()
details = request.get_json()
role_name = details["role"]
prototype = model.permission.update_prototype_permission(org, prototypeid, role_name)
if not prototype:
raise NotFound()
log_prototype_action(
"modify_prototype_permission", orgname, prototype, original_role=existing.role.name
)
users_filter = {prototype.activating_user, prototype.delegate_user}
org_members = model.organization.get_organization_member_set(
org, users_filter=users_filter
)
return prototype_view(prototype, org_members)
raise Unauthorized()