1
0
mirror of https://github.com/quay/quay.git synced 2026-01-27 18:42:52 +03:00
Files
quay/data/model/permission.py
Kurtis Mullins 38be6d05d0 Python 3 (#153)
* Convert all Python2 to Python3 syntax.

* Removes oauth2lib dependency

* Replace mockredis with fakeredis

* byte/str conversions

* Removes nonexisting __nonzero__ in Python3

* Python3 Dockerfile and related

* [PROJQUAY-98] Replace resumablehashlib with rehash

* PROJQUAY-123 - replace gpgme with python3-gpg

* [PROJQUAY-135] Fix unhashable class error

* Update external dependencies for Python 3

- Move github.com/app-registry/appr to github.com/quay/appr
- github.com/coderanger/supervisor-stdout
- github.com/DevTable/container-cloud-config
- Update to latest mockldap with changes applied from coreos/mockldap
- Update dependencies in requirements.txt and requirements-dev.txt

* Default FLOAT_REPR function to str in json encoder and removes keyword assignment

True, False, and str were not keywords in Python2...

* [PROJQUAY-165] Replace package `bencode` with `bencode.py`

- Bencode is not compatible with Python 3.x and is no longer
  maintained. Bencode.py appears to be a drop-in replacement/fork
  that is compatible with Python 3.

* Make sure monkey.patch is called before anything else

* Removes the unidecode dependency and replaces it with text_unidecode

* Base64 encode/decode pickle dumps/loads when storing value in DB

Base64 encodes/decodes the serialized values when storing them in the
DB. Also make sure to return a Python3 string instead of a Bytes when
coercing for db, otherwise, Postgres' TEXT field will convert it into
a hex representation when storing the value.

* Implement __hash__ on Digest class

In Python 3, if a class defines __eq__() but not __hash__(), its
instances will not be usable as items in hashable collections (e.g sets).

* Remove basestring check

* Fix expected message in credentials tests

* Fix usage of Cryptography.Fernet for Python3 (#219)

- Specifically, this addresses the issue where Byte<->String
  conversions weren't being applied correctly.

* Fix utils

- tar+stream layer format utils
- filelike util

* Fix storage tests

* Fix endpoint tests

* Fix workers tests

* Fix docker's empty layer bytes

* Fix registry tests

* Appr

* Enable CI for Python 3.6

* Skip buildman tests

Skip buildman tests while it's being rewritten to allow ci to pass.

* Install swig for CI

* Update expected exception type in redis validation test

* Fix gpg signing calls

Fix gpg calls for updated gpg wrapper, and add signing tests.

* Convert / to // for Python3 integer division

* WIP: Update buildman to use asyncio instead of trollius.

This dependency is considered deprecated/abandoned and was only
used as an implementation/backport of asyncio on Python 2.x
This is a work in progress, and is included in the PR just to get the
rest of the tests passing. The builder is actually being rewritten.

* Target Python 3.8

* Removes unused files

- Removes unused files that were added accidentally while rebasing
- Small fixes/cleanup
- TODO tasks comments

* Add TODO to verify rehash backward compat with resumablehashlib

* Revert "[PROJQUAY-135] Fix unhashable class error" and implements __hash__ instead.

This reverts commit 735e38e3c1d072bf50ea864bc7e119a55d3a8976.
Instead, defines __hash__ for the encrypted fields class, using the parent
field's implementation.

* Remove some unused files and imports

Co-authored-by: Kenny Lee Sin Cheong <kenny.lee@redhat.com>
Co-authored-by: Tom McKay <thomasmckay@redhat.com>
2020-06-05 16:50:13 -04:00

366 lines
11 KiB
Python

from peewee import JOIN
from data.database import (
RepositoryPermission,
User,
Repository,
Visibility,
Role,
TeamMember,
PermissionPrototype,
Team,
TeamRole,
Namespace,
)
from data.model import DataModelException, _basequery
from util.names import parse_robot_username
def list_team_permissions(team):
    """
    Returns a query for the repository permissions granted to the given team.

    Each permission is joined to its repository (and that repository's visibility) and to its
    role; only the RepositoryPermission columns are selected.
    """
    permissions = RepositoryPermission.select(RepositoryPermission)
    with_repo = permissions.join(Repository).join(Visibility).switch(RepositoryPermission)
    with_role = with_repo.join(Role).switch(RepositoryPermission)
    return with_role.where(RepositoryPermission.team == team)
def list_robot_permissions(robot_name):
    """
    Returns a query for the repository permissions held by the robot account whose username is
    `robot_name`.

    The permission, its user and its repository are selected; the repository's visibility and
    the permission's role are joined as well.
    """
    permissions = RepositoryPermission.select(RepositoryPermission, User, Repository)
    with_repo = permissions.join(Repository).join(Visibility).switch(RepositoryPermission)
    with_role = with_repo.join(Role).switch(RepositoryPermission)
    with_user = with_role.join(User)
    # `== True` is intentional: it builds a SQL expression rather than a Python boolean.
    return with_user.where(User.username == robot_name, User.robot == True)
def list_organization_member_permissions(organization, limit_to_user=None):
    """
    Returns a query for the user-held permissions on repositories whose namespace is the given
    organization.

    When `limit_to_user` is given, only that user's permissions are returned. Otherwise the
    permissions of all non-robot users are returned.
    """
    permissions = RepositoryPermission.select(RepositoryPermission, Repository, User)
    with_repo = permissions.join(Repository).switch(RepositoryPermission)
    in_org = with_repo.join(User).where(Repository.namespace_user == organization)

    if limit_to_user is None:
        return in_org.where(User.robot == False)

    return in_org.where(RepositoryPermission.user == limit_to_user)
def get_all_user_repository_permissions(user):
    """
    Returns a query for every repository permission the user holds, with no repository limit.
    """
    all_permissions = _get_user_repo_permissions(user)
    return all_permissions
def get_user_repo_permissions(user, repo):
    """
    Returns a query for the permissions the user holds on the given repository object.
    """
    repo_permissions = _get_user_repo_permissions(user, limit_to_repository_obj=repo)
    return repo_permissions
def get_user_repository_permissions(user, namespace, repo_name):
    """
    Returns a query for the permissions the user holds on the repository identified by
    namespace name and repository name.
    """
    name_limits = {"limit_namespace": namespace, "limit_repo_name": repo_name}
    return _get_user_repo_permissions(user, **name_limits)
def _get_user_repo_permissions(
    user, limit_to_repository_obj=None, limit_namespace=None, limit_repo_name=None
):
    """
    Builds the query for the repository permissions a user holds, either directly or through
    membership of a team.

    If `limit_to_repository_obj` is not None, only permissions on that repository are returned
    and the name limits are ignored. Otherwise, if both `limit_namespace` and `limit_repo_name`
    are truthy, only permissions on the repository with that namespace username and name are
    returned. With neither limit, permissions across all repositories are returned.

    The result combines the direct and team-derived queries with `|`.
    """
    TeamUser = User.alias()

    permissions = (
        RepositoryPermission.select(RepositoryPermission, Role, Repository, Namespace)
        .join(Role)
        .switch(RepositoryPermission)
        .join(Repository)
        .join(Namespace, on=(Repository.namespace_user == Namespace.id))
        .switch(RepositoryPermission)
    )

    # Work out which (if any) repository restriction applies before touching the query.
    if limit_to_repository_obj is not None:
        restriction = [RepositoryPermission.repository == limit_to_repository_obj]
    elif limit_namespace and limit_repo_name:
        restriction = [Repository.name == limit_repo_name, Namespace.username == limit_namespace]
    else:
        restriction = []

    if restriction:
        permissions = permissions.where(*restriction)

    granted_directly = permissions.clone().join(User).where(User.id == user)

    # The user table is already joined in the direct branch, hence the alias here.
    through_team = permissions.clone().join(Team).join(TeamMember)
    granted_via_team = through_team.join(TeamUser, on=(TeamUser.id == TeamMember.user)).where(
        TeamUser.id == user
    )

    return granted_directly | granted_via_team
def delete_prototype_permission(org, uid):
    """
    Deletes the permission prototype with the given uuid under the organization.

    Returns the deleted prototype, or None if no matching prototype was found.
    """
    prototype = get_prototype_permission(org, uid)
    if prototype:
        prototype.delete_instance()
        return prototype

    return None
def get_prototype_permission(org, uid):
    """
    Looks up the permission prototype with the given uuid under the organization.

    Returns the prototype, or None if it does not exist.
    """
    belongs_to_org = PermissionPrototype.org == org
    has_uuid = PermissionPrototype.uuid == uid
    try:
        prototype = PermissionPrototype.get(belongs_to_org, has_uuid)
    except PermissionPrototype.DoesNotExist:
        prototype = None

    return prototype
def get_prototype_permissions(org):
    """
    Returns a query for all permission prototypes defined under the organization.

    The activating user, delegate user, delegate team and role are each attached with a LEFT
    OUTER join. The user table is joined twice, so each role it plays gets its own alias.
    """
    ActivatingUser = User.alias()
    DelegateUser = User.alias()

    on_activating_user = ActivatingUser.id == PermissionPrototype.activating_user
    on_delegate_user = DelegateUser.id == PermissionPrototype.delegate_user
    on_delegate_team = Team.id == PermissionPrototype.delegate_team
    on_role = Role.id == PermissionPrototype.role

    return (
        PermissionPrototype.select()
        .where(PermissionPrototype.org == org)
        .join(ActivatingUser, JOIN.LEFT_OUTER, on=on_activating_user)
        .join(DelegateUser, JOIN.LEFT_OUTER, on=on_delegate_user)
        .join(Team, JOIN.LEFT_OUTER, on=on_delegate_team)
        .join(Role, JOIN.LEFT_OUTER, on=on_role)
    )
def update_prototype_permission(org, uid, role_name):
    """
    Changes the role of the permission prototype with the given uuid under the organization.

    Returns the updated prototype, or None if no matching prototype was found. The role is only
    looked up once the prototype is known to exist.
    """
    prototype = get_prototype_permission(org, uid)
    if not prototype:
        return None

    prototype.role = Role.get(Role.name == role_name)
    prototype.save()
    return prototype
def add_prototype_permission(
    org, role_name, activating_user, delegate_user=None, delegate_team=None
):
    """
    Creates a permission prototype under the organization and returns it.

    The role is looked up by name; the activating user and the delegate user/team are stored
    as given.
    """
    prototype_fields = {
        "org": org,
        "role": Role.get(Role.name == role_name),
        "activating_user": activating_user,
        "delegate_user": delegate_user,
        "delegate_team": delegate_team,
    }
    return PermissionPrototype.create(**prototype_fields)
def get_org_wide_permissions(user, org_filter=None):
    """
    Returns a query for the teams the given user is a member of, along with each team's role
    and owning organization.

    Only teams whose owning account is flagged as an organization are returned. When
    `org_filter` is truthy, the results are further limited to the organization with that
    username.
    """
    # The user table appears twice (team owner and team member), so the owner gets an alias.
    Org = User.alias()
    team_with_role = Team.select(Team, Org, TeamRole).join(TeamRole)
    with_org = team_with_role.switch(Team).join(Org, on=(Team.organization == Org.id))
    with_user = with_org.switch(Team).join(TeamMember).join(User)

    if org_filter:
        # where() returns a new query instead of modifying this one in place, so the result
        # has to be re-bound; otherwise the organization filter is silently dropped.
        with_user = with_user.where(Org.username == org_filter)

    return with_user.where(User.id == user, Org.organization == True)
def get_all_repo_teams(namespace_name, repository_name):
    """
    Returns a query for the team permissions on the repository identified by namespace name
    and repository name.

    The team name and role name are selected alongside the permission itself.
    """
    permissions = RepositoryPermission.select(Team.name, Role.name, RepositoryPermission)
    with_team = permissions.join(Team).switch(RepositoryPermission)
    with_role = with_team.join(Role).switch(RepositoryPermission)
    with_repo = with_role.join(Repository).join(
        Namespace, on=(Repository.namespace_user == Namespace.id)
    )
    return with_repo.where(
        Namespace.username == namespace_name, Repository.name == repository_name
    )
def apply_default_permissions(repo_obj, creating_user_obj):
    """
    Applies the permission prototypes of the repository's namespace to the repository.

    Prototypes are considered when their activating user is the creating user or is unset.
    Team prototypes (no delegate user) are applied first, then user prototypes (no delegate
    team). No permission is created for the creating user.
    """
    org = repo_obj.namespace_user

    activated_by_creator = PermissionPrototype.activating_user == creating_user_obj
    activated_by_anyone = PermissionPrototype.activating_user >> None
    activation_clause = activated_by_creator | activated_by_anyone

    def grant_to_team(team, repo, role):
        RepositoryPermission.create(team=team, repository=repo, role=role)

    def grant_to_user(user, repo, role):
        # The creating user always gets admin anyway
        if user.username != creating_user_obj.username:
            RepositoryPermission.create(user=user, repository=repo, role=role)

    team_protos = PermissionPrototype.select().where(
        PermissionPrototype.org == org, activation_clause, PermissionPrototype.delegate_user >> None
    )
    __apply_permission_list(repo_obj, team_protos, "name", grant_to_team)

    user_protos = PermissionPrototype.select().where(
        PermissionPrototype.org == org, activation_clause, PermissionPrototype.delegate_team >> None
    )
    __apply_permission_list(repo_obj, user_protos, "username", grant_to_user)
def __apply_permission_list(repo, proto_query, name_property, create_permission_func):
    """
    Resolves the prototypes to at most one per delegate and creates a permission for each.

    Delegates are keyed by their `name_property` attribute. A prototype with no activating
    user never replaces one already recorded for the same delegate; any other prototype
    overwrites the recorded entry. `create_permission_func(delegate, repo, role)` is then
    called once per surviving delegate.
    """
    chosen = {}
    for proto in proto_query:
        delegate = proto.delegate_team or proto.delegate_user
        key = getattr(delegate, name_property)

        # Record the proto unless it is a generic one pre-empted by an earlier entry: it is
        # either the first seen for this delegate, or it is specific to an activating user.
        if key not in chosen or proto.activating_user is not None:
            chosen[key] = (delegate, proto.role)

    for delegate, role in chosen.values():
        create_permission_func(delegate, repo, role)
def __entity_permission_repo_query(
    entity_id, entity_table, entity_id_property, namespace_name, repository_name
):
    """
    Returns a query for the permissions an entity holds on the named repository.

    This method works for both users and teams: `entity_table` is the model to join against
    and `entity_id_property` is the field on it that is compared with `entity_id`.
    """
    permissions = RepositoryPermission.select(
        entity_table, Repository, Namespace, Role, RepositoryPermission
    )
    with_entity = permissions.join(entity_table).switch(RepositoryPermission)
    with_role = with_entity.join(Role).switch(RepositoryPermission)
    with_repo = with_role.join(Repository).join(
        Namespace, on=(Repository.namespace_user == Namespace.id)
    )
    return with_repo.where(
        Repository.name == repository_name,
        Namespace.username == namespace_name,
        entity_id_property == entity_id,
    )
def get_user_reponame_permission(username, namespace_name, repository_name):
    """
    Returns the first permission the named user holds on the named repository.

    Raises DataModelException if the user has no permission on the repository.
    """
    query = __entity_permission_repo_query(
        username, User, User.username, namespace_name, repository_name
    )
    matches = list(query)
    if matches:
        return matches[0]

    raise DataModelException("User does not have permission for repo.")
def get_team_reponame_permission(team_name, namespace_name, repository_name):
    """
    Returns the first permission the named team holds on the named repository.

    Raises DataModelException if the team has no permission on the repository.
    """
    query = __entity_permission_repo_query(
        team_name, Team, Team.name, namespace_name, repository_name
    )
    matches = list(query)
    if matches:
        return matches[0]

    raise DataModelException("Team does not have permission for repo.")
def delete_user_permission(username, namespace_name, repository_name):
    """
    Deletes the first permission the named user holds on the named repository.

    Raises DataModelException if the username equals the namespace name, or if the user has
    no permission on the repository.
    """
    if username == namespace_name:
        raise DataModelException("Namespace owner must always be admin.")

    query = __entity_permission_repo_query(
        username, User, User.username, namespace_name, repository_name
    )
    matches = list(query)
    if not matches:
        raise DataModelException("User does not have permission for repo.")

    permission = matches[0]
    permission.delete_instance()
def delete_team_permission(team_name, namespace_name, repository_name):
    """
    Deletes the first permission the named team holds on the named repository.

    Raises DataModelException if the team has no permission on the repository.
    """
    query = __entity_permission_repo_query(
        team_name, Team, Team.name, namespace_name, repository_name
    )
    matches = list(query)
    if not matches:
        raise DataModelException("Team does not have permission for repo.")

    permission = matches[0]
    permission.delete_instance()
def __set_entity_repo_permission(
    entity, permission_entity_property, namespace_name, repository_name, role_name
):
    """
    Sets the role an entity holds on the named repository and returns the permission.

    `permission_entity_property` names the RepositoryPermission field that refers to the
    entity. An existing permission for the entity on the repository has its role updated;
    if there is none, a new permission is created.
    """
    repo = _basequery.get_existing_repository(namespace_name, repository_name)
    new_role = Role.get(Role.name == role_name)
    entity_field = getattr(RepositoryPermission, permission_entity_property)

    try:
        # Update any existing permission for this entity on the repo
        existing = RepositoryPermission.get(
            entity_field == entity, RepositoryPermission.repository == repo
        )
        existing.role = new_role
        existing.save()
        return existing
    except RepositoryPermission.DoesNotExist:
        entity_kwargs = {permission_entity_property: entity}
        return RepositoryPermission.create(repository=repo, role=new_role, **entity_kwargs)
def set_user_repo_permission(username, namespace_name, repository_name, role_name):
    """
    Sets the role the named user holds on the named repository and returns the permission.

    Raises DataModelException if the username equals the namespace name, if no such user
    exists, or if the user is a robot whose username cannot be parsed or whose namespace
    differs from `namespace_name`.
    """
    if username == namespace_name:
        raise DataModelException("Namespace owner must always be admin.")

    try:
        user = User.get(User.username == username)
    except User.DoesNotExist:
        raise DataModelException("Invalid username: %s" % username)

    if not user.robot:
        return __set_entity_repo_permission(
            user, "user", namespace_name, repository_name, role_name
        )

    # Robots may only be granted permissions within their own namespace.
    parts = parse_robot_username(user.username)
    if not parts:
        raise DataModelException("Invalid robot: %s" % username)

    robot_namespace, _ = parts
    if robot_namespace != namespace_name:
        raise DataModelException(
            "Cannot add robot %s under namespace %s" % (username, namespace_name)
        )

    return __set_entity_repo_permission(user, "user", namespace_name, repository_name, role_name)
def set_team_repo_permission(team_name, namespace_name, repository_name, role_name):
    """
    Sets the role the named team holds on the named repository and returns the permission.

    The team is looked up by its name together with the username of the joined user record,
    which must equal `namespace_name`. Raises DataModelException if no such team exists.
    """
    team_lookup = Team.select().join(User)
    team_lookup = team_lookup.where(Team.name == team_name, User.username == namespace_name)

    try:
        team = team_lookup.get()
    except Team.DoesNotExist:
        raise DataModelException("No team %s in organization %s" % (team_name, namespace_name))

    return __set_entity_repo_permission(team, "team", namespace_name, repository_name, role_name)