mirror of
https://github.com/quay/quay.git
synced 2026-01-26 06:21:37 +03:00
* fix: allow global readonly superusers to access all organization data without FULL_ACCESS (PROJQUAY-9798) This is a comprehensive fix for multiple endpoints where global readonly superusers were incorrectly blocked from accessing organization data when FEATURE_SUPERUSERS_FULL_ACCESS was set to false. Fixed endpoints in endpoints/api/logs.py: - OrgLogs.get() - Organization audit logs - OrgAggregateLogs.get() - Aggregated organization logs - ExportOrgLogs.post() - Export organization logs Fixed endpoints in endpoints/api/team.py: - TeamMemberList.get() - Team member list - TeamPermissions.get() - Team repository permissions Fixed endpoints in endpoints/api/organization.py: - OrganizationMemberList.get() - Organization member list - OrganizationMember.get() - Individual member details - OrganizationApplications.get() - OAuth application list - OrganizationApplication.get() - Individual application details Fixed endpoints in endpoints/api/prototype.py: - PermissionPrototypeList.get() - Default permission prototypes All endpoints now use consistent permission logic: permission.can() OR allow_if_global_readonly_superuser() OR allow_if_superuser_with_full_access() Added comprehensive tests verifying: 1. Global readonly superusers CAN access all data for auditing, regardless of FEATURE_SUPERUSERS_FULL_ACCESS setting 2. Regular superusers are still blocked when FEATURE_SUPERUSERS_FULL_ACCESS is false (correct behavior) * fix(test): ensure owners team exists for testorglogs org in test setup Addresses review feedback from PR #4549 comment #2539202868. The test was attempting to access the 'owners' team in 'testorglogs' org, but the fixture only created the organization without creating any teams. This could cause the test to receive a 404 (team not found) instead of 403 (permission denied), making it pass for the wrong reason. 
Also simplified the test logic to only expect 403 since the team now exists in the fixtures, ensuring the test validates permission blocking rather than missing resources.
651 lines
21 KiB
Python
651 lines
21 KiB
Python
"""
|
|
Create, list and manage an organization's teams.
|
|
"""
|
|
|
|
import json
|
|
from functools import wraps
|
|
|
|
from flask import request
|
|
|
|
import features
|
|
from app import app, authentication, avatar
|
|
from auth import scopes
|
|
from auth.auth_context import get_authenticated_user
|
|
from auth.permissions import (
|
|
AdministerOrganizationPermission,
|
|
GlobalReadOnlySuperUserPermission,
|
|
SuperUserPermission,
|
|
ViewTeamPermission,
|
|
)
|
|
from data import model
|
|
from data.database import Team
|
|
from endpoints.api import (
|
|
ApiResource,
|
|
allow_if_any_superuser,
|
|
allow_if_global_readonly_superuser,
|
|
allow_if_superuser,
|
|
allow_if_superuser_with_full_access,
|
|
format_date,
|
|
internal_only,
|
|
log_action,
|
|
nickname,
|
|
parse_args,
|
|
path_param,
|
|
query_param,
|
|
request_error,
|
|
require_fresh_login,
|
|
require_scope,
|
|
require_user_admin,
|
|
resource,
|
|
show_if,
|
|
validate_json_request,
|
|
verify_not_prod,
|
|
)
|
|
from endpoints.exception import InvalidRequest, NotFound, Unauthorized
|
|
from util.names import parse_robot_username
|
|
from util.parsing import truthy_bool
|
|
from util.useremails import send_org_invite_email
|
|
|
|
|
|
def permission_view(permission):
    """
    Build the API representation of one team repository permission.

    The result carries the repository's name and public flag under
    ``"repository"`` and the name of the granted role under ``"role"``.
    """
    repo = permission.repository
    repository_info = {
        "name": repo.name,
        "is_public": model.repository.is_repository_public(repo),
    }
    return {"repository": repository_info, "role": permission.role.name}
|
|
|
|
|
|
def try_accept_invite(code, user):
    """
    Accept the team invite identified by ``code`` on behalf of ``user``.

    Confirms the invite through the data model, removes the user's matching
    ``org_team_invite`` notifications for the organization, records an
    ``org_team_member_invite_accepted`` log entry and returns the joined team.
    """
    team, inviter = model.team.confirm_team_invite(code, user)
    orgname = team.organization.username

    # The invite has been consumed, so its pending notification is obsolete.
    model.notification.delete_matching_notifications(user, "org_team_invite", org=orgname)

    log_metadata = {"member": user.username, "team": team.name, "inviter": inviter.username}
    log_action("org_team_member_invite_accepted", orgname, log_metadata)

    return team
|
|
|
|
|
|
def handle_addinvite_team(inviter, team, user=None, email=None):
    """
    Add ``user`` to ``team`` directly, or invite the given user/email to it.

    An invite is requested from the data model only when both the MAILING and
    REQUIRE_TEAM_INVITE features are enabled. Returns ``None`` when no invite
    was produced (the user was added directly); otherwise creates an in-app
    notification for a known user, sends the invite email and returns the invite.
    """
    invite_required = features.MAILING and features.REQUIRE_TEAM_INVITE
    invite = model.team.add_or_invite_to_team(
        inviter, team, user, email, requires_invite=invite_required
    )
    if not invite:
        # User was added to the team directly.
        return None

    orgname = team.organization.username
    if user:
        # A known user additionally gets an in-app notification carrying the invite code.
        notification_metadata = {
            "code": invite.invite_token,
            "inviter": inviter.username,
            "org": orgname,
            "team": team.name,
        }
        model.notification.create_notification(
            "org_team_invite", user, metadata=notification_metadata
        )
        recipient_name = user.username
        recipient_email = user.email
    else:
        # Email-only invite: the address doubles as the display name.
        recipient_name = email
        recipient_email = email

    send_org_invite_email(
        recipient_name,
        recipient_email,
        orgname,
        team.name,
        inviter.username,
        invite.invite_token,
    )
    return invite
|
|
|
|
|
|
def team_view(orgname, team, is_new_team=False):
    """
    Build the API representation of a team.

    ``can_view`` reflects whether the current caller holds ViewTeamPermission
    for the team; ``new_team`` echoes ``is_new_team``.
    """
    can_view = ViewTeamPermission(orgname, team.name).can()
    role_name = Team.role.get_name(team.role_id)
    return {
        "name": team.name,
        "description": team.description,
        "can_view": can_view,
        "role": role_name,
        "avatar": avatar.get_data_for_team(team),
        "new_team": is_new_team,
    }
|
|
|
|
|
|
def member_view(member, invited=False):
    """
    Build the API representation of a team member backed by a user account.

    ``invited`` marks entries that describe a pending invitation rather than
    an actual membership.
    """
    return dict(
        name=member.username,
        kind="user",
        is_robot=member.robot,
        avatar=avatar.get_data_for_user(member),
        invited=invited,
    )
|
|
|
|
|
|
def invite_view(invite):
    """
    Build the API representation of a pending team invite.

    Invites bound to a user are rendered like a member flagged as invited;
    invites that only carry an email address are rendered with kind ``invite``.
    """
    if invite.user:
        return member_view(invite.user, invited=True)

    # Email-only invite: there is no account, so the address is used for the avatar.
    address = invite.email
    return {
        "email": address,
        "kind": "invite",
        "avatar": avatar.get_data(address, address, "user"),
        "invited": True,
    }
|
|
|
|
|
|
def disallow_for_synced_team(except_robots=False):
    """
    Build a decorator that rejects the operation for teams synced from an auth provider.

    The decorated method must receive ``orgname`` and ``teamname`` as keyword
    arguments. When the team has sync information, InvalidRequest is raised,
    unless ``except_robots`` is True and the ``membername`` keyword argument
    parses as a robot username.
    """

    def inner(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            # Team syncing can only be enabled if we have a federated service.
            syncing_possible = features.TEAM_SYNCING and authentication.federated_service
            if syncing_possible and model.team.get_team_sync_information(
                kwargs["orgname"], kwargs["teamname"]
            ):
                robot_exempt = except_robots and parse_robot_username(
                    kwargs.get("membername", "")
                )
                if not robot_exempt:
                    raise InvalidRequest("Cannot call this method on an auth-synced team")

            return func(self, *args, **kwargs)

        return wrapper

    return inner
|
|
|
|
|
|
# Rejects the decorated operation on a synced team for every member (the factory default).
disallow_all_for_synced_team = disallow_for_synced_team()
# Rejects the decorated operation on a synced team unless the targeted member is a robot.
disallow_nonrobots_for_synced_team = disallow_for_synced_team(except_robots=True)
|
|
|
|
|
|
@resource("/v1/organization/<orgname>/team/<teamname>")
@path_param("orgname", "The name of the organization")
@path_param("teamname", "The name of the team")
class OrganizationTeam(ApiResource):
    """
    Resource for managing an organization's teams.
    """

    schemas = {
        "TeamDescription": {
            "type": "object",
            "description": "Description of a team",
            "required": [
                "role",
            ],
            "properties": {
                "role": {
                    "type": "string",
                    "description": "Org wide permissions that should apply to the team",
                    "enum": [
                        "member",
                        "creator",
                        "admin",
                    ],
                },
                "description": {
                    "type": "string",
                    "description": "Markdown description for the team",
                },
            },
        },
    }

    @require_scope(scopes.ORG_ADMIN)
    @nickname("updateOrganizationTeam")
    @validate_json_request("TeamDescription")
    def put(self, orgname, teamname):
        """
        Update the org-wide permission for the specified team.
        """
        # The team is created when it does not exist yet; otherwise its description
        # and role are updated where they differ from the request body.
        # Raises Unauthorized unless the caller administers the organization or is
        # a superuser with full access.
        edit_permission = AdministerOrganizationPermission(orgname)
        if not (edit_permission.can() or allow_if_superuser_with_full_access()):
            raise Unauthorized()

        details = request.get_json()

        try:
            team = model.team.get_organization_team(orgname, teamname)
        except model.InvalidTeamException:
            # Create the new team, falling back to defaults for omitted fields.
            org = model.organization.get_organization(orgname)
            team = model.team.create_team(
                teamname,
                org,
                details.get("role", "member"),
                details.get("description", ""),
            )
            log_action("org_create_team", orgname, {"team": teamname})
            return team_view(orgname, team, is_new_team=True), 200

        # Existing team: only persist and log the fields that actually change.
        if "description" in details and team.description != details["description"]:
            team.description = details["description"]
            team.save()
            log_action(
                "org_set_team_description",
                orgname,
                {"team": teamname, "description": team.description},
            )

        if "role" in details and Team.role.get_name(team.role_id) != details["role"]:
            team = model.team.set_team_org_permission(
                team, details["role"], get_authenticated_user().username
            )
            log_action(
                "org_set_team_role",
                orgname,
                {"team": teamname, "role": details["role"]},
            )

        return team_view(orgname, team, is_new_team=False), 200

    @require_scope(scopes.ORG_ADMIN)
    @nickname("deleteOrganizationTeam")
    def delete(self, orgname, teamname):
        """
        Delete the specified team.
        """
        # Raises Unauthorized unless the caller administers the organization or is
        # a superuser with full access.
        permission = AdministerOrganizationPermission(orgname)
        if not (permission.can() or allow_if_superuser_with_full_access()):
            raise Unauthorized()

        model.team.remove_team(orgname, teamname, get_authenticated_user().username)
        log_action("org_delete_team", orgname, {"team": teamname})
        return "", 204
|
|
|
|
|
|
def _syncing_setup_allowed(orgname):
    """
    Returns whether syncing setup is allowed for the current user over the matching org.

    Setup is open to non-superusers only when NONSUPERUSER_TEAM_SYNCING_SETUP is
    enabled; in every case the caller must also administer the organization.
    """
    setup_open = features.NONSUPERUSER_TEAM_SYNCING_SETUP or SuperUserPermission().can()
    if not setup_open:
        return False

    return AdministerOrganizationPermission(orgname).can()
|
|
|
|
|
|
@resource("/v1/organization/<orgname>/team/<teamname>/syncing")
@path_param("orgname", "The name of the organization")
@path_param("teamname", "The name of the team")
@show_if(features.TEAM_SYNCING)
class OrganizationTeamSyncing(ApiResource):
    """
    Resource for managing syncing of a team by a backing group.
    """

    @require_scope(scopes.ORG_ADMIN)
    @require_scope(scopes.SUPERUSER)
    @nickname("enableOrganizationTeamSync")
    @verify_not_prod
    @require_fresh_login
    def post(self, orgname, teamname):
        # Enables syncing of the team against the group described by the request body.
        if not _syncing_setup_allowed(orgname):
            raise Unauthorized()

        try:
            team = model.team.get_organization_team(orgname, teamname)
        except model.InvalidTeamException:
            raise NotFound()

        group_config = request.get_json()

        # Ensure that the specified config points to a valid group.
        is_valid, err = authentication.check_group_lookup_args(group_config)
        if not is_valid:
            raise InvalidRequest("Could not sync to group: %s" % err)

        # Set the team's syncing config.
        model.team.set_team_syncing(team, authentication.federated_service, group_config)
        log_action("enable_team_sync", orgname, {"team": teamname})

        if app.config["AUTHENTICATION_TYPE"] == "OIDC":
            # delete existing team members, team membership will be synced with OIDC group
            model.team.delete_all_team_members(team)

        return team_view(orgname, team)

    @require_scope(scopes.ORG_ADMIN)
    @require_scope(scopes.SUPERUSER)
    @nickname("disableOrganizationTeamSync")
    @verify_not_prod
    @require_fresh_login
    def delete(self, orgname, teamname):
        # Disables syncing for the team and returns the team's view.
        if not _syncing_setup_allowed(orgname):
            raise Unauthorized()

        try:
            team = model.team.get_organization_team(orgname, teamname)
        except model.InvalidTeamException:
            raise NotFound()

        model.team.remove_team_syncing(orgname, teamname)
        log_action("disable_team_sync", orgname, {"team": teamname})
        return team_view(orgname, team)
|
|
|
|
|
|
@resource("/v1/organization/<orgname>/team/<teamname>/members")
@path_param("orgname", "The name of the organization")
@path_param("teamname", "The name of the team")
class TeamMemberList(ApiResource):
    """
    Resource for managing the list of members for a team.
    """

    @require_scope(scopes.ORG_ADMIN)
    @parse_args()
    @query_param(
        "includePending", "Whether to include pending members", type=truthy_bool, default=False
    )
    @nickname("getOrganizationTeamMembers")
    def get(self, orgname, teamname, parsed_args):
        """
        Retrieve the list of members for the specified team.
        """
        view_permission = ViewTeamPermission(orgname, teamname)
        edit_permission = AdministerOrganizationPermission(orgname)

        # Viewing is open to team viewers, global read-only superusers and
        # superusers with full access.
        may_view = (
            view_permission.can()
            or allow_if_global_readonly_superuser()
            or allow_if_superuser_with_full_access()
        )
        if not may_view:
            raise Unauthorized()

        try:
            team = model.team.get_organization_team(orgname, teamname)
        except model.InvalidTeamException:
            raise NotFound()

        members = model.organization.get_organization_team_members(team.id)

        # Pending invites are only listed for callers who administer the organization.
        if parsed_args["includePending"] and edit_permission.can():
            invites = model.team.get_organization_team_member_invites(team.id)
        else:
            invites = []

        entries = [member_view(member) for member in members]
        entries.extend(invite_view(invite) for invite in invites)

        data = {
            "name": teamname,
            "members": entries,
            "can_edit": edit_permission.can(),
        }

        # Sync details are only reported when team syncing is available at all.
        if not (features.TEAM_SYNCING and authentication.federated_service):
            return data

        if _syncing_setup_allowed(orgname):
            can_sync = {"service": authentication.federated_service}
            can_sync.update(authentication.service_metadata())
            data["can_sync"] = can_sync

        sync_info = model.team.get_team_sync_information(orgname, teamname)
        if sync_info is not None:
            synced = {"service": sync_info.service.name}

            # The sync timestamp and config are limited to superusers unless
            # non-superuser syncing setup is enabled.
            if features.NONSUPERUSER_TEAM_SYNCING_SETUP or SuperUserPermission().can():
                synced["last_updated"] = format_date(sync_info.last_updated)
                synced["config"] = json.loads(sync_info.config)

            data["synced"] = synced

        return data
|
|
|
|
|
|
@resource("/v1/organization/<orgname>/team/<teamname>/members/<membername>")
@path_param("orgname", "The name of the organization")
@path_param("teamname", "The name of the team")
@path_param("membername", "The username of the team member")
class TeamMember(ApiResource):
    """
    Resource for managing individual members of a team.
    """

    @require_scope(scopes.ORG_ADMIN)
    @nickname("updateOrganizationTeamMember")
    @disallow_nonrobots_for_synced_team
    def put(self, orgname, teamname, membername):
        """
        Adds or invites a member to an existing team.
        """
        permission = AdministerOrganizationPermission(orgname)
        if not (permission.can() or allow_if_superuser_with_full_access()):
            raise Unauthorized()

        # Find the team.
        try:
            team = model.team.get_organization_team(orgname, teamname)
        except model.InvalidTeamException:
            raise NotFound()

        # Find the user.
        user = model.user.get_user(membername)
        if not user:
            raise request_error(message="Unknown user")

        # Add or invite the user to the team.
        inviter = get_authenticated_user()
        invite = handle_addinvite_team(inviter, team, user=user)

        if invite:
            # User was invited.
            log_action(
                "org_invite_team_member",
                orgname,
                {"user": membername, "member": membername, "team": teamname},
            )
            return member_view(user, invited=True)

        # No invite came back, so the user was added directly.
        log_action("org_add_team_member", orgname, {"member": membername, "team": teamname})
        return member_view(user, invited=False)

    @require_scope(scopes.ORG_ADMIN)
    @nickname("deleteOrganizationTeamMember")
    @disallow_nonrobots_for_synced_team
    def delete(self, orgname, teamname, membername):
        """
        Delete a member of a team.

        If the user is merely invited to join the team, then the invite is removed instead.
        """
        permission = AdministerOrganizationPermission(orgname)
        if not (permission.can() or allow_if_superuser_with_full_access()):
            raise Unauthorized()

        # Remove the user from the team.
        invoking_user = get_authenticated_user().username

        # Find the team.
        try:
            team = model.team.get_organization_team(orgname, teamname)
        except model.InvalidTeamException:
            raise NotFound()

        # Find the member.
        member = model.user.get_user(membername)
        if not member:
            raise NotFound()

        # First attempt to delete an invite for the user to this team. If none found,
        # then we try to remove the user directly.
        invite_removed = model.team.delete_team_user_invite(team, member)
        if invite_removed:
            log_action(
                "org_delete_team_member_invite",
                orgname,
                {"user": membername, "team": teamname, "member": membername},
            )
            return "", 204

        model.team.remove_user_from_team(orgname, teamname, membername, invoking_user)

        if features.RH_MARKETPLACE:
            # Also drop the subscriptions the removed member owned within this organization.
            org_id = model.organization.get_organization(orgname).id
            model.organization_skus.remove_all_owner_subscriptions_from_org(member.id, org_id)

        log_action("org_remove_team_member", orgname, {"member": membername, "team": teamname})
        return "", 204
|
|
|
|
|
|
@resource("/v1/organization/<orgname>/team/<teamname>/invite/<email>")
@show_if(features.MAILING)
class InviteTeamMember(ApiResource):
    """
    Resource for inviting a team member via email address.
    """

    @require_scope(scopes.ORG_ADMIN)
    @nickname("inviteTeamMemberEmail")
    @disallow_all_for_synced_team
    def put(self, orgname, teamname, email):
        """
        Invites an email address to an existing team.
        """
        permission = AdministerOrganizationPermission(orgname)
        if not (permission.can() or allow_if_superuser_with_full_access()):
            raise Unauthorized()

        # Find the team.
        try:
            team = model.team.get_organization_team(orgname, teamname)
        except model.InvalidTeamException:
            raise NotFound()

        # Invite the email to the team.
        inviter = get_authenticated_user()
        invite = handle_addinvite_team(inviter, team, email=email)

        log_metadata = {"email": email, "team": teamname, "member": email}
        log_action("org_invite_team_member", orgname, log_metadata)
        return invite_view(invite)

    @require_scope(scopes.ORG_ADMIN)
    @nickname("deleteTeamMemberEmailInvite")
    def delete(self, orgname, teamname, email):
        """
        Delete an invite of an email address to join a team.
        """
        permission = AdministerOrganizationPermission(orgname)
        if not (permission.can() or allow_if_superuser_with_full_access()):
            raise Unauthorized()

        # Find the team.
        try:
            team = model.team.get_organization_team(orgname, teamname)
        except model.InvalidTeamException:
            raise NotFound()

        # Delete the invite; a falsy result is reported as NotFound.
        invite_removed = model.team.delete_team_email_invite(team, email)
        if not invite_removed:
            raise NotFound()

        log_metadata = {"email": email, "team": teamname, "member": email}
        log_action("org_delete_team_member_invite", orgname, log_metadata)
        return "", 204
|
|
|
|
|
|
@resource("/v1/organization/<orgname>/team/<teamname>/permissions")
@path_param("orgname", "The name of the organization")
@path_param("teamname", "The name of the team")
class TeamPermissions(ApiResource):
    """
    Resource for listing the permissions an org's team has in the system.
    """

    @nickname("getOrganizationTeamPermissions")
    def get(self, orgname, teamname):
        """
        Returns the list of repository permissions for the org's team.
        """
        # Listing is open to organization admins, global read-only superusers and
        # superusers with full access.
        admin_permission = AdministerOrganizationPermission(orgname)
        may_list = (
            admin_permission.can()
            or allow_if_global_readonly_superuser()
            or allow_if_superuser_with_full_access()
        )
        if not may_list:
            raise Unauthorized()

        try:
            team = model.team.get_organization_team(orgname, teamname)
        except model.InvalidTeamException:
            raise NotFound()

        team_permissions = model.permission.list_team_permissions(team)
        return {"permissions": [permission_view(entry) for entry in team_permissions]}
|
|
|
|
|
|
@resource("/v1/teaminvite/<code>")
@internal_only
@show_if(features.MAILING)
class TeamMemberInvite(ApiResource):
    """
    Resource for managing invites to join a team.
    """

    @require_user_admin()
    @nickname("acceptOrganizationTeamInvite")
    def put(self, code):
        """
        Accepts an invite to join a team in an organization.
        """
        # Accept the invite for the current user.
        current_user = get_authenticated_user()
        team = try_accept_invite(code, current_user)
        if not team:
            raise NotFound()

        return {"org": team.organization.username, "team": team.name}

    @nickname("declineOrganizationTeamInvite")
    @require_user_admin()
    def delete(self, code):
        """
        Delete an existing invitation to join a team.
        """
        team, inviter = model.team.delete_team_invite(code, user_obj=get_authenticated_user())

        # The invite is gone, so drop the notification that advertised it.
        model.notification.delete_matching_notifications(
            get_authenticated_user(), "org_team_invite", code=code
        )

        orgname = team.organization.username
        log_metadata = {
            "member": get_authenticated_user().username,
            "team": team.name,
            "inviter": inviter.username,
        }
        log_action("org_team_member_invite_declined", orgname, log_metadata)

        return "", 204
|