mirror of
https://github.com/quay/quay.git
synced 2026-01-26 06:21:37 +03:00
* fix: allow global readonly superusers to access all organization data without FULL_ACCESS (PROJQUAY-9798) This is a comprehensive fix for multiple endpoints where global readonly superusers were incorrectly blocked from accessing organization data when FEATURE_SUPERUSERS_FULL_ACCESS was set to false. Fixed endpoints in endpoints/api/logs.py: - OrgLogs.get() - Organization audit logs - OrgAggregateLogs.get() - Aggregated organization logs - ExportOrgLogs.post() - Export organization logs Fixed endpoints in endpoints/api/team.py: - TeamMemberList.get() - Team member list - TeamPermissions.get() - Team repository permissions Fixed endpoints in endpoints/api/organization.py: - OrganizationMemberList.get() - Organization member list - OrganizationMember.get() - Individual member details - OrganizationApplications.get() - OAuth application list - OrganizationApplication.get() - Individual application details Fixed endpoints in endpoints/api/prototype.py: - PermissionPrototypeList.get() - Default permission prototypes All endpoints now use consistent permission logic: permission.can() OR allow_if_global_readonly_superuser() OR allow_if_superuser_with_full_access() Added comprehensive tests verifying: 1. Global readonly superusers CAN access all data for auditing, regardless of FEATURE_SUPERUSERS_FULL_ACCESS setting 2. Regular superusers are still blocked when FEATURE_SUPERUSERS_FULL_ACCESS is false (correct behavior) * fix(test): ensure owners team exists for testorglogs org in test setup Addresses review feedback from PR #4549 comment #2539202868. The test was attempting to access the 'owners' team in 'testorglogs' org, but the fixture only created the organization without creating any teams. This could cause the test to receive a 404 (team not found) instead of 403 (permission denied), making it pass for the wrong reason. Also simplified the test logic to only expect 403 since the team now exists in the fixtures, ensuring the test validates permission blocking rather than missing resources.
324 lines
11 KiB
Python
324 lines
11 KiB
Python
"""
|
|
Manage default permissions added to repositories.
|
|
"""
|
|
|
|
from flask import request
|
|
|
|
import features
|
|
from app import avatar
|
|
from auth import scopes
|
|
from auth.auth_context import get_authenticated_user
|
|
from auth.permissions import AdministerOrganizationPermission
|
|
from data import model
|
|
from endpoints.api import (
|
|
ApiResource,
|
|
allow_if_any_superuser,
|
|
allow_if_global_readonly_superuser,
|
|
allow_if_superuser,
|
|
allow_if_superuser_with_full_access,
|
|
log_action,
|
|
nickname,
|
|
path_param,
|
|
request_error,
|
|
require_scope,
|
|
resource,
|
|
validate_json_request,
|
|
)
|
|
from endpoints.exception import NotFound, Unauthorized
|
|
|
|
|
|
def prototype_view(proto, org_members):
|
|
def prototype_user_view(user):
|
|
return {
|
|
"name": user.username,
|
|
"is_robot": user.robot,
|
|
"kind": "user",
|
|
"is_org_member": user.robot or user.username in org_members,
|
|
"avatar": avatar.get_data_for_user(user),
|
|
}
|
|
|
|
if proto.delegate_user:
|
|
delegate_view = prototype_user_view(proto.delegate_user)
|
|
else:
|
|
delegate_view = {
|
|
"name": proto.delegate_team.name,
|
|
"kind": "team",
|
|
"avatar": avatar.get_data_for_team(proto.delegate_team),
|
|
}
|
|
|
|
return {
|
|
"activating_user": (
|
|
prototype_user_view(proto.activating_user) if proto.activating_user else None
|
|
),
|
|
"delegate": delegate_view,
|
|
"role": proto.role.name,
|
|
"id": proto.uuid,
|
|
}
|
|
|
|
|
|
def log_prototype_action(action_kind, orgname, prototype, **kwargs):
|
|
username = get_authenticated_user().username
|
|
log_params = {
|
|
"prototypeid": prototype.uuid,
|
|
"username": username,
|
|
"activating_username": (
|
|
prototype.activating_user.username if prototype.activating_user else None
|
|
),
|
|
"role": prototype.role.name,
|
|
}
|
|
|
|
for key, value in list(kwargs.items()):
|
|
log_params[key] = value
|
|
|
|
if prototype.delegate_user:
|
|
log_params["delegate_user"] = prototype.delegate_user.username
|
|
elif prototype.delegate_team:
|
|
log_params["delegate_team"] = prototype.delegate_team.name
|
|
|
|
log_action(action_kind, orgname, log_params)
|
|
|
|
|
|
@resource("/v1/organization/<orgname>/prototypes")
|
|
@path_param("orgname", "The name of the organization")
|
|
class PermissionPrototypeList(ApiResource):
|
|
"""
|
|
Resource for listing and creating permission prototypes.
|
|
"""
|
|
|
|
schemas = {
|
|
"NewPrototype": {
|
|
"type": "object",
|
|
"description": "Description of a new prototype",
|
|
"required": [
|
|
"role",
|
|
"delegate",
|
|
],
|
|
"properties": {
|
|
"role": {
|
|
"type": "string",
|
|
"description": "Role that should be applied to the delegate",
|
|
"enum": [
|
|
"read",
|
|
"write",
|
|
"admin",
|
|
],
|
|
},
|
|
"activating_user": {
|
|
"type": "object",
|
|
"description": "Repository creating user to whom the rule should apply",
|
|
"required": [
|
|
"name",
|
|
],
|
|
"properties": {
|
|
"name": {
|
|
"type": "string",
|
|
"description": "The username for the activating_user",
|
|
},
|
|
},
|
|
},
|
|
"delegate": {
|
|
"type": "object",
|
|
"description": "Information about the user or team to which the rule grants access",
|
|
"required": [
|
|
"name",
|
|
"kind",
|
|
],
|
|
"properties": {
|
|
"name": {
|
|
"type": "string",
|
|
"description": "The name for the delegate team or user",
|
|
},
|
|
"kind": {
|
|
"type": "string",
|
|
"description": "Whether the delegate is a user or a team",
|
|
"enum": [
|
|
"user",
|
|
"team",
|
|
],
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
@require_scope(scopes.ORG_ADMIN)
|
|
@nickname("getOrganizationPrototypePermissions")
|
|
def get(self, orgname):
|
|
"""
|
|
List the existing prototypes for this organization.
|
|
"""
|
|
permission = AdministerOrganizationPermission(orgname)
|
|
if (
|
|
permission.can()
|
|
or allow_if_global_readonly_superuser()
|
|
or allow_if_superuser_with_full_access()
|
|
):
|
|
try:
|
|
org = model.organization.get_organization(orgname)
|
|
except model.InvalidOrganizationException:
|
|
raise NotFound()
|
|
|
|
permissions = model.permission.get_prototype_permissions(org)
|
|
|
|
users_filter = {p.activating_user for p in permissions} | {
|
|
p.delegate_user for p in permissions
|
|
}
|
|
org_members = model.organization.get_organization_member_set(
|
|
org, users_filter=users_filter
|
|
)
|
|
return {"prototypes": [prototype_view(p, org_members) for p in permissions]}
|
|
|
|
raise Unauthorized()
|
|
|
|
@require_scope(scopes.ORG_ADMIN)
|
|
@nickname("createOrganizationPrototypePermission")
|
|
@validate_json_request("NewPrototype")
|
|
def post(self, orgname):
|
|
"""
|
|
Create a new permission prototype.
|
|
"""
|
|
permission = AdministerOrganizationPermission(orgname)
|
|
if permission.can() or allow_if_superuser_with_full_access():
|
|
try:
|
|
org = model.organization.get_organization(orgname)
|
|
except model.InvalidOrganizationException:
|
|
raise NotFound()
|
|
|
|
details = request.get_json()
|
|
activating_username = None
|
|
|
|
if (
|
|
"activating_user" in details
|
|
and details["activating_user"]
|
|
and "name" in details["activating_user"]
|
|
):
|
|
activating_username = details["activating_user"]["name"]
|
|
|
|
delegate = details["delegate"] if "delegate" in details else {}
|
|
delegate_kind = delegate.get("kind", None)
|
|
delegate_name = delegate.get("name", None)
|
|
|
|
delegate_username = delegate_name if delegate_kind == "user" else None
|
|
delegate_teamname = delegate_name if delegate_kind == "team" else None
|
|
|
|
activating_user = (
|
|
model.user.get_user(activating_username) if activating_username else None
|
|
)
|
|
delegate_user = model.user.get_user(delegate_username) if delegate_username else None
|
|
delegate_team = (
|
|
model.team.get_organization_team(orgname, delegate_teamname)
|
|
if delegate_teamname
|
|
else None
|
|
)
|
|
|
|
if activating_username and not activating_user:
|
|
raise request_error(message="Unknown activating user")
|
|
|
|
if not delegate_user and not delegate_team:
|
|
raise request_error(message="Missing delegate user or team")
|
|
|
|
role_name = details["role"]
|
|
|
|
prototype = model.permission.add_prototype_permission(
|
|
org, role_name, activating_user, delegate_user, delegate_team
|
|
)
|
|
log_prototype_action("create_prototype_permission", orgname, prototype)
|
|
|
|
users_filter = {prototype.activating_user, prototype.delegate_user}
|
|
org_members = model.organization.get_organization_member_set(
|
|
org, users_filter=users_filter
|
|
)
|
|
return prototype_view(prototype, org_members)
|
|
|
|
raise Unauthorized()
|
|
|
|
|
|
@resource("/v1/organization/<orgname>/prototypes/<prototypeid>")
|
|
@path_param("orgname", "The name of the organization")
|
|
@path_param("prototypeid", "The ID of the prototype")
|
|
class PermissionPrototype(ApiResource):
|
|
"""
|
|
Resource for managinging individual permission prototypes.
|
|
"""
|
|
|
|
schemas = {
|
|
"PrototypeUpdate": {
|
|
"type": "object",
|
|
"description": "Description of a the new prototype role",
|
|
"required": [
|
|
"role",
|
|
],
|
|
"properties": {
|
|
"role": {
|
|
"type": "string",
|
|
"description": "Role that should be applied to the permission",
|
|
"enum": [
|
|
"read",
|
|
"write",
|
|
"admin",
|
|
],
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
@require_scope(scopes.ORG_ADMIN)
|
|
@nickname("deleteOrganizationPrototypePermission")
|
|
def delete(self, orgname, prototypeid):
|
|
"""
|
|
Delete an existing permission prototype.
|
|
"""
|
|
permission = AdministerOrganizationPermission(orgname)
|
|
if permission.can() or allow_if_superuser_with_full_access():
|
|
try:
|
|
org = model.organization.get_organization(orgname)
|
|
except model.InvalidOrganizationException:
|
|
raise NotFound()
|
|
|
|
prototype = model.permission.delete_prototype_permission(org, prototypeid)
|
|
if not prototype:
|
|
raise NotFound()
|
|
|
|
log_prototype_action("delete_prototype_permission", orgname, prototype)
|
|
|
|
return "", 204
|
|
|
|
raise Unauthorized()
|
|
|
|
@require_scope(scopes.ORG_ADMIN)
|
|
@nickname("updateOrganizationPrototypePermission")
|
|
@validate_json_request("PrototypeUpdate")
|
|
def put(self, orgname, prototypeid):
|
|
"""
|
|
Update the role of an existing permission prototype.
|
|
"""
|
|
permission = AdministerOrganizationPermission(orgname)
|
|
if permission.can() or allow_if_superuser_with_full_access():
|
|
try:
|
|
org = model.organization.get_organization(orgname)
|
|
except model.InvalidOrganizationException:
|
|
raise NotFound()
|
|
|
|
existing = model.permission.get_prototype_permission(org, prototypeid)
|
|
if not existing:
|
|
raise NotFound()
|
|
|
|
details = request.get_json()
|
|
role_name = details["role"]
|
|
prototype = model.permission.update_prototype_permission(org, prototypeid, role_name)
|
|
if not prototype:
|
|
raise NotFound()
|
|
|
|
log_prototype_action(
|
|
"modify_prototype_permission", orgname, prototype, original_role=existing.role.name
|
|
)
|
|
|
|
users_filter = {prototype.activating_user, prototype.delegate_user}
|
|
org_members = model.organization.get_organization_member_set(
|
|
org, users_filter=users_filter
|
|
)
|
|
return prototype_view(prototype, org_members)
|
|
|
|
raise Unauthorized()
|