1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/endpoints/api/permission.py
Brandon Caton cad8326d4a api: adding permissions for global readonly superuser (PROJQUAY-7177) (#2993)
The global readonly superuser is missing read only permissions on certain GET api's. This adds those permissions.
2024-07-09 13:17:26 -04:00

269 lines
8.9 KiB
Python

"""
Manage repository permissions.
"""
import logging
from flask import request
from .permission_models_interface import DeleteException, SaveException
from .permission_models_pre_oci import pre_oci_model as model
from endpoints.api import (
RepositoryParamResource,
log_action,
nickname,
path_param,
request_error,
require_repo_admin,
resource,
validate_json_request,
)
from endpoints.exception import NotFound
logger = logging.getLogger(__name__)
@resource("/v1/repository/<apirepopath:repository>/permissions/team/")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
class RepositoryTeamPermissionList(RepositoryParamResource):
"""
Resource for repository team permissions.
"""
@require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True)
@nickname("listRepoTeamPermissions")
def get(self, namespace_name, repository_name):
"""
List all team permission.
"""
repo_perms = model.get_repo_permissions_by_team(namespace_name, repository_name)
return {
"permissions": {repo_perm.team_name: repo_perm.to_dict() for repo_perm in repo_perms}
}
@resource("/v1/repository/<apirepopath:repository>/permissions/user/")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
class RepositoryUserPermissionList(RepositoryParamResource):
"""
Resource for repository user permissions.
"""
@require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True)
@nickname("listRepoUserPermissions")
def get(self, namespace_name, repository_name):
"""
List all user permissions.
"""
perms = model.get_repo_permissions_by_user(namespace_name, repository_name)
return {"permissions": {p.username: p.to_dict() for p in perms}}
@resource("/v1/repository/<apirepopath:repository>/permissions/user/<username>/transitive")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("username", "The username of the user to which the permissions apply")
class RepositoryUserTransitivePermission(RepositoryParamResource):
"""
Resource for retrieving whether a user has access to a repository, either directly or via a
team.
"""
@require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True)
@nickname("getUserTransitivePermission")
def get(self, namespace_name, repository_name, username):
"""
Get the fetch the permission for the specified user.
"""
roles = model.get_repo_roles(username, namespace_name, repository_name)
if not roles:
raise NotFound
return {"permissions": [r.to_dict() for r in roles]}
@resource("/v1/repository/<apirepopath:repository>/permissions/user/<username>")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("username", "The username of the user to which the permission applies")
class RepositoryUserPermission(RepositoryParamResource):
"""
Resource for managing individual user permissions.
"""
schemas = {
"UserPermission": {
"type": "object",
"description": "Description of a user permission.",
"required": [
"role",
],
"properties": {
"role": {
"type": "string",
"description": "Role to use for the user",
"enum": [
"read",
"write",
"admin",
],
},
},
},
}
@require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True)
@nickname("getUserPermissions")
def get(self, namespace_name, repository_name, username):
"""
Get the permission for the specified user.
"""
logger.debug(
"Get repo: %s/%s permissions for user %s", namespace_name, repository_name, username
)
perm = model.get_repo_permission_for_user(username, namespace_name, repository_name)
return perm.to_dict()
@require_repo_admin(allow_for_superuser=True)
@nickname("changeUserPermissions")
@validate_json_request("UserPermission")
def put(self, namespace_name, repository_name, username): # Also needs to respond to post
"""
Update the perimssions for an existing repository.
"""
new_permission = request.get_json()
logger.debug("Setting permission to: %s for user %s", new_permission["role"], username)
try:
perm = model.set_repo_permission_for_user(
username, namespace_name, repository_name, new_permission["role"]
)
resp = perm.to_dict()
except SaveException as ex:
raise request_error(exception=ex)
log_action(
"change_repo_permission",
namespace_name,
{
"username": username,
"repo": repository_name,
"namespace": namespace_name,
"role": new_permission["role"],
},
repo_name=repository_name,
)
return resp, 200
@require_repo_admin(allow_for_superuser=True)
@nickname("deleteUserPermissions")
def delete(self, namespace_name, repository_name, username):
"""
Delete the permission for the user.
"""
try:
model.delete_repo_permission_for_user(username, namespace_name, repository_name)
except DeleteException as ex:
raise request_error(exception=ex)
log_action(
"delete_repo_permission",
namespace_name,
{"username": username, "repo": repository_name, "namespace": namespace_name},
repo_name=repository_name,
)
return "", 204
@resource("/v1/repository/<apirepopath:repository>/permissions/team/<teamname>")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("teamname", "The name of the team to which the permission applies")
class RepositoryTeamPermission(RepositoryParamResource):
"""
Resource for managing individual team permissions.
"""
schemas = {
"TeamPermission": {
"type": "object",
"description": "Description of a team permission.",
"required": [
"role",
],
"properties": {
"role": {
"type": "string",
"description": "Role to use for the team",
"enum": [
"read",
"write",
"admin",
],
},
},
},
}
@require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True)
@nickname("getTeamPermissions")
def get(self, namespace_name, repository_name, teamname):
"""
Fetch the permission for the specified team.
"""
logger.debug(
"Get repo: %s/%s permissions for team %s", namespace_name, repository_name, teamname
)
role = model.get_repo_role_for_team(teamname, namespace_name, repository_name)
return role.to_dict()
@require_repo_admin(allow_for_superuser=True)
@nickname("changeTeamPermissions")
@validate_json_request("TeamPermission")
def put(self, namespace_name, repository_name, teamname):
"""
Update the existing team permission.
"""
new_permission = request.get_json()
logger.debug("Setting permission to: %s for team %s", new_permission["role"], teamname)
try:
perm = model.set_repo_permission_for_team(
teamname, namespace_name, repository_name, new_permission["role"]
)
resp = perm.to_dict()
except SaveException as ex:
raise request_error(exception=ex)
log_action(
"change_repo_permission",
namespace_name,
{"team": teamname, "repo": repository_name, "role": new_permission["role"]},
repo_name=repository_name,
)
return resp, 200
@require_repo_admin(allow_for_superuser=True)
@nickname("deleteTeamPermissions")
def delete(self, namespace_name, repository_name, teamname):
"""
Delete the permission for the specified team.
"""
try:
model.delete_repo_permission_for_team(teamname, namespace_name, repository_name)
except DeleteException as ex:
raise request_error(exception=ex)
log_action(
"delete_repo_permission",
namespace_name,
{"team": teamname, "repo": repository_name},
repo_name=repository_name,
)
return "", 204