mirror of
https://github.com/quay/quay.git
synced 2026-01-26 06:21:37 +03:00
The global read-only superuser is missing read-only permissions on certain GET APIs. This adds those permissions.
269 lines
8.9 KiB
Python
269 lines
8.9 KiB
Python
"""
|
|
Manage repository permissions.
|
|
"""
|
|
|
|
import logging
|
|
|
|
from flask import request
|
|
|
|
from .permission_models_interface import DeleteException, SaveException
|
|
from .permission_models_pre_oci import pre_oci_model as model
|
|
from endpoints.api import (
|
|
RepositoryParamResource,
|
|
log_action,
|
|
nickname,
|
|
path_param,
|
|
request_error,
|
|
require_repo_admin,
|
|
resource,
|
|
validate_json_request,
|
|
)
|
|
from endpoints.exception import NotFound
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@resource("/v1/repository/<apirepopath:repository>/permissions/team/")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
class RepositoryTeamPermissionList(RepositoryParamResource):
    """
    Resource for listing the team permissions of a repository.
    """

    @require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True)
    @nickname("listRepoTeamPermissions")
    def get(self, namespace_name, repository_name):
        """
        List all team permissions.

        Returns a dict with a single "permissions" key that maps each team name to the
        dict form of that team's permission.
        """
        by_team = {}
        for team_perm in model.get_repo_permissions_by_team(namespace_name, repository_name):
            # Read the key before serializing the value (key-then-value order).
            team_name = team_perm.team_name
            by_team[team_name] = team_perm.to_dict()

        return {"permissions": by_team}
|
|
|
|
|
|
@resource("/v1/repository/<apirepopath:repository>/permissions/user/")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
class RepositoryUserPermissionList(RepositoryParamResource):
    """
    Resource for listing the user permissions of a repository.
    """

    @require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True)
    @nickname("listRepoUserPermissions")
    def get(self, namespace_name, repository_name):
        """
        List all user permissions.

        Returns a dict with a single "permissions" key that maps each username to the
        dict form of that user's permission.
        """
        user_perms = model.get_repo_permissions_by_user(namespace_name, repository_name)
        by_username = dict((entry.username, entry.to_dict()) for entry in user_perms)
        return {"permissions": by_username}
|
|
|
|
|
|
@resource("/v1/repository/<apirepopath:repository>/permissions/user/<username>/transitive")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("username", "The username of the user to which the permissions apply")
class RepositoryUserTransitivePermission(RepositoryParamResource):
    """
    Resource for retrieving whether a user has access to a repository, either directly or
    via a team.
    """

    @require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True)
    @nickname("getUserTransitivePermission")
    def get(self, namespace_name, repository_name, username):
        """
        Fetch the permissions for the specified user.

        Returns a dict whose "permissions" key holds the dict form of every role the model
        reports for the user. Raises NotFound when the model reports no roles.
        """
        user_roles = model.get_repo_roles(username, namespace_name, repository_name)
        if user_roles:
            return {"permissions": [role.to_dict() for role in user_roles]}

        # No roles (None or empty) means there is nothing to report for this user.
        raise NotFound
|
|
|
|
|
|
@resource("/v1/repository/<apirepopath:repository>/permissions/user/<username>")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("username", "The username of the user to which the permission applies")
class RepositoryUserPermission(RepositoryParamResource):
    """
    Resource for reading, changing and deleting one user's permission on a repository.
    """

    # Schema referenced by name from the validate_json_request decorator on `put`.
    schemas = {
        "UserPermission": {
            "type": "object",
            "description": "Description of a user permission.",
            "required": ["role"],
            "properties": {
                "role": {
                    "type": "string",
                    "description": "Role to use for the user",
                    "enum": ["read", "write", "admin"],
                },
            },
        },
    }

    @require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True)
    @nickname("getUserPermissions")
    def get(self, namespace_name, repository_name, username):
        """
        Get the permission for the specified user.

        Returns the dict form of the permission object supplied by the model.
        """
        logger.debug(
            "Get repo: %s/%s permissions for user %s", namespace_name, repository_name, username
        )
        return model.get_repo_permission_for_user(
            username, namespace_name, repository_name
        ).to_dict()

    @require_repo_admin(allow_for_superuser=True)
    @nickname("changeUserPermissions")
    @validate_json_request("UserPermission")
    def put(self, namespace_name, repository_name, username):  # Also needs to respond to post
        """
        Update the permission of the specified user on the repository.

        Reads the new role from the JSON request body, stores it through the model and
        records a "change_repo_permission" action. Returns the dict form of the saved
        permission with status 200. A SaveException from the model is converted with
        request_error and raised.
        """
        role = request.get_json()["role"]
        logger.debug("Setting permission to: %s for user %s", role, username)

        try:
            # Serialization stays inside the try block so the same exceptions are covered.
            updated = model.set_repo_permission_for_user(
                username, namespace_name, repository_name, role
            ).to_dict()
        except SaveException as ex:
            raise request_error(exception=ex)

        audit_metadata = {
            "username": username,
            "repo": repository_name,
            "namespace": namespace_name,
            "role": role,
        }
        log_action(
            "change_repo_permission",
            namespace_name,
            audit_metadata,
            repo_name=repository_name,
        )

        return updated, 200

    @require_repo_admin(allow_for_superuser=True)
    @nickname("deleteUserPermissions")
    def delete(self, namespace_name, repository_name, username):
        """
        Delete the permission for the user.

        Removes the permission through the model and records a "delete_repo_permission"
        action. Returns an empty body with status 204. A DeleteException from the model
        is converted with request_error and raised.
        """
        try:
            model.delete_repo_permission_for_user(username, namespace_name, repository_name)
        except DeleteException as ex:
            raise request_error(exception=ex)

        audit_metadata = {
            "username": username,
            "repo": repository_name,
            "namespace": namespace_name,
        }
        log_action(
            "delete_repo_permission",
            namespace_name,
            audit_metadata,
            repo_name=repository_name,
        )

        return "", 204
|
|
|
|
|
|
@resource("/v1/repository/<apirepopath:repository>/permissions/team/<teamname>")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("teamname", "The name of the team to which the permission applies")
class RepositoryTeamPermission(RepositoryParamResource):
    """
    Resource for reading, changing and deleting one team's permission on a repository.
    """

    # Schema referenced by name from the validate_json_request decorator on `put`.
    schemas = {
        "TeamPermission": {
            "type": "object",
            "description": "Description of a team permission.",
            "required": ["role"],
            "properties": {
                "role": {
                    "type": "string",
                    "description": "Role to use for the team",
                    "enum": ["read", "write", "admin"],
                },
            },
        },
    }

    @require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True)
    @nickname("getTeamPermissions")
    def get(self, namespace_name, repository_name, teamname):
        """
        Fetch the permission for the specified team.

        Returns the dict form of the role object supplied by the model.
        """
        logger.debug(
            "Get repo: %s/%s permissions for team %s", namespace_name, repository_name, teamname
        )
        return model.get_repo_role_for_team(teamname, namespace_name, repository_name).to_dict()

    @require_repo_admin(allow_for_superuser=True)
    @nickname("changeTeamPermissions")
    @validate_json_request("TeamPermission")
    def put(self, namespace_name, repository_name, teamname):
        """
        Update the existing team permission.

        Reads the new role from the JSON request body, stores it through the model and
        records a "change_repo_permission" action. Returns the dict form of the saved
        permission with status 200. A SaveException from the model is converted with
        request_error and raised.
        """
        role = request.get_json()["role"]
        logger.debug("Setting permission to: %s for team %s", role, teamname)

        try:
            # Serialization stays inside the try block so the same exceptions are covered.
            updated = model.set_repo_permission_for_team(
                teamname, namespace_name, repository_name, role
            ).to_dict()
        except SaveException as ex:
            raise request_error(exception=ex)

        audit_metadata = {"team": teamname, "repo": repository_name, "role": role}
        log_action(
            "change_repo_permission",
            namespace_name,
            audit_metadata,
            repo_name=repository_name,
        )
        return updated, 200

    @require_repo_admin(allow_for_superuser=True)
    @nickname("deleteTeamPermissions")
    def delete(self, namespace_name, repository_name, teamname):
        """
        Delete the permission for the specified team.

        Removes the permission through the model and records a "delete_repo_permission"
        action. Returns an empty body with status 204. A DeleteException from the model
        is converted with request_error and raised.
        """
        try:
            model.delete_repo_permission_for_team(teamname, namespace_name, repository_name)
        except DeleteException as ex:
            raise request_error(exception=ex)

        audit_metadata = {"team": teamname, "repo": repository_name}
        log_action(
            "delete_repo_permission",
            namespace_name,
            audit_metadata,
            repo_name=repository_name,
        )

        return "", 204
|