1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/data/model/permission.py
Syed Ahmed 97b3e025de db: use read replica for selected queries (PROJQUAY-6397) (#2758)
* db: use read replica for selected queries (PROJQUAY-6397)

We add a new param `can_use_read_replica` to the `select`
query. This allows us to choose which queries we want to
send to the read replica. This is useful in cases where
the read replica lags behind the primary and some queries
need the latest data
2024-03-18 14:23:16 -04:00

371 lines
12 KiB
Python

from peewee import JOIN
from data.database import (
Namespace,
PermissionPrototype,
Repository,
RepositoryPermission,
Role,
Team,
TeamMember,
TeamRole,
User,
Visibility,
)
from data.model import DataModelException, _basequery
from util.names import parse_robot_username
def list_team_permissions(team):
return (
RepositoryPermission.select(RepositoryPermission)
.join(Repository)
.join(Visibility)
.switch(RepositoryPermission)
.join(Role)
.switch(RepositoryPermission)
.where(RepositoryPermission.team == team)
)
def list_robot_permissions(robot_name):
return (
RepositoryPermission.select(RepositoryPermission, User, Repository)
.join(Repository)
.join(Visibility)
.switch(RepositoryPermission)
.join(Role)
.switch(RepositoryPermission)
.join(User)
.where(User.username == robot_name, User.robot == True)
)
def list_organization_member_permissions(organization, limit_to_user=None):
query = (
RepositoryPermission.select(RepositoryPermission, Repository, User)
.join(Repository)
.switch(RepositoryPermission)
.join(User)
.where(Repository.namespace_user == organization)
)
if limit_to_user is not None:
query = query.where(RepositoryPermission.user == limit_to_user)
else:
query = query.where(User.robot == False)
return query
def get_all_user_repository_permissions(user):
return _get_user_repo_permissions(user)
def get_user_repo_permissions(user, repo):
return _get_user_repo_permissions(user, limit_to_repository_obj=repo)
def get_user_repository_permissions(user, namespace, repo_name):
return _get_user_repo_permissions(user, limit_namespace=namespace, limit_repo_name=repo_name)
def _get_user_repo_permissions(
user, limit_to_repository_obj=None, limit_namespace=None, limit_repo_name=None
):
UserThroughTeam = User.alias()
base_query = (
RepositoryPermission.select(
RepositoryPermission,
Role,
Repository,
Namespace,
)
.join(Role)
.switch(RepositoryPermission)
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.switch(RepositoryPermission)
)
if limit_to_repository_obj is not None:
base_query = base_query.where(RepositoryPermission.repository == limit_to_repository_obj)
elif limit_namespace and limit_repo_name:
base_query = base_query.where(
Repository.name == limit_repo_name, Namespace.username == limit_namespace
)
direct = base_query.clone().join(User).where(User.id == user)
team = (
base_query.clone()
.join(Team)
.join(TeamMember)
.join(UserThroughTeam, on=(UserThroughTeam.id == TeamMember.user))
.where(UserThroughTeam.id == user)
)
return direct | team
def delete_prototype_permission(org, uid):
found = get_prototype_permission(org, uid)
if not found:
return None
found.delete_instance()
return found
def get_prototype_permission(org, uid):
try:
return PermissionPrototype.get(
PermissionPrototype.org == org, PermissionPrototype.uuid == uid
)
except PermissionPrototype.DoesNotExist:
return None
def get_prototype_permissions(org):
ActivatingUser = User.alias()
DelegateUser = User.alias()
query = (
PermissionPrototype.select()
.where(PermissionPrototype.org == org)
.join(
ActivatingUser,
JOIN.LEFT_OUTER,
on=(ActivatingUser.id == PermissionPrototype.activating_user),
)
.join(
DelegateUser, JOIN.LEFT_OUTER, on=(DelegateUser.id == PermissionPrototype.delegate_user)
)
.join(Team, JOIN.LEFT_OUTER, on=(Team.id == PermissionPrototype.delegate_team))
.join(Role, JOIN.LEFT_OUTER, on=(Role.id == PermissionPrototype.role))
)
return query
def update_prototype_permission(org, uid, role_name):
found = get_prototype_permission(org, uid)
if not found:
return None
new_role = Role.get(Role.name == role_name)
found.role = new_role
found.save()
return found
def add_prototype_permission(
org, role_name, activating_user, delegate_user=None, delegate_team=None
):
new_role = Role.get(Role.name == role_name)
return PermissionPrototype.create(
org=org,
role=new_role,
activating_user=activating_user,
delegate_user=delegate_user,
delegate_team=delegate_team,
)
def get_org_wide_permissions(user, org_filter=None):
Org = User.alias()
team_with_role = Team.select(Team, Org, TeamRole).join(TeamRole)
with_org = team_with_role.switch(Team).join(Org, on=(Team.organization == Org.id))
with_user = with_org.switch(Team).join(TeamMember).join(User)
if org_filter:
with_user.where(Org.username == org_filter)
return with_user.where(User.id == user, Org.organization == True)
def get_all_repo_teams(namespace_name, repository_name):
return (
RepositoryPermission.select(Team.name, Role.name, RepositoryPermission)
.join(Team)
.switch(RepositoryPermission)
.join(Role)
.switch(RepositoryPermission)
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(Namespace.username == namespace_name, Repository.name == repository_name)
)
def apply_default_permissions(repo_obj, creating_user_obj):
org = repo_obj.namespace_user
user_clause = (PermissionPrototype.activating_user == creating_user_obj) | (
PermissionPrototype.activating_user >> None
)
team_protos = PermissionPrototype.select().where(
PermissionPrototype.org == org, user_clause, PermissionPrototype.delegate_user >> None
)
def create_team_permission(team, repo, role):
RepositoryPermission.create(team=team, repository=repo, role=role)
__apply_permission_list(repo_obj, team_protos, "name", create_team_permission)
user_protos = PermissionPrototype.select().where(
PermissionPrototype.org == org, user_clause, PermissionPrototype.delegate_team >> None
)
def create_user_permission(user, repo, role):
# The creating user always gets admin anyway
if user.username == creating_user_obj.username:
return
RepositoryPermission.create(user=user, repository=repo, role=role)
__apply_permission_list(repo_obj, user_protos, "username", create_user_permission)
def __apply_permission_list(repo, proto_query, name_property, create_permission_func):
final_protos = {}
for proto in proto_query:
applies_to = proto.delegate_team or proto.delegate_user
name = getattr(applies_to, name_property)
# We will skip the proto if it is pre-empted by a more important proto
if name in final_protos and proto.activating_user is None:
continue
# By this point, it is either a user specific proto, or there is no
# proto yet, so we can safely assume it applies
final_protos[name] = (applies_to, proto.role)
for delegate, role in list(final_protos.values()):
create_permission_func(delegate, repo, role)
def __entity_permission_repo_query(
entity_id, entity_table, entity_id_property, namespace_name, repository_name
):
"""
This method works for both users and teams.
"""
return (
RepositoryPermission.select(entity_table, Repository, Namespace, Role, RepositoryPermission)
.join(entity_table)
.switch(RepositoryPermission)
.join(Role)
.switch(RepositoryPermission)
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(
Repository.name == repository_name,
Namespace.username == namespace_name,
entity_id_property == entity_id,
)
)
def get_user_reponame_permission(username, namespace_name, repository_name):
fetched = list(
__entity_permission_repo_query(
username, User, User.username, namespace_name, repository_name
)
)
if not fetched:
raise DataModelException("User does not have permission for repo.")
return fetched[0]
def get_team_reponame_permission(team_name, namespace_name, repository_name):
fetched = list(
__entity_permission_repo_query(team_name, Team, Team.name, namespace_name, repository_name)
)
if not fetched:
raise DataModelException("Team does not have permission for repo.")
return fetched[0]
def delete_user_permission(username, namespace_name, repository_name):
if username == namespace_name:
raise DataModelException("Namespace owner must always be admin.")
fetched = list(
__entity_permission_repo_query(
username, User, User.username, namespace_name, repository_name
)
)
if not fetched:
raise DataModelException("User does not have permission for repo.")
fetched[0].delete_instance()
def delete_team_permission(team_name, namespace_name, repository_name):
fetched = list(
__entity_permission_repo_query(team_name, Team, Team.name, namespace_name, repository_name)
)
if not fetched:
raise DataModelException("Team does not have permission for repo.")
fetched[0].delete_instance()
def __set_entity_repo_permission(
entity, permission_entity_property, namespace_name, repository_name, role_name
):
repo = _basequery.get_existing_repository(namespace_name, repository_name)
new_role = Role.get(Role.name == role_name)
# Fetch any existing permission for this entity on the repo
try:
entity_attr = getattr(RepositoryPermission, permission_entity_property)
perm = RepositoryPermission.get(
entity_attr == entity, RepositoryPermission.repository == repo
)
perm.role = new_role
perm.save()
return perm
except RepositoryPermission.DoesNotExist:
set_entity_kwargs = {permission_entity_property: entity}
new_perm = RepositoryPermission.create(repository=repo, role=new_role, **set_entity_kwargs)
return new_perm
def set_user_repo_permission(username, namespace_name, repository_name, role_name):
if username == namespace_name:
raise DataModelException("Namespace owner must always be admin.")
try:
user = User.get(User.username == username)
except User.DoesNotExist:
raise DataModelException("Invalid username: %s" % username)
if user.robot:
parts = parse_robot_username(user.username)
if not parts:
raise DataModelException("Invalid robot: %s" % username)
robot_namespace, _ = parts
if robot_namespace != namespace_name:
raise DataModelException(
"Cannot add robot %s under namespace %s" % (username, namespace_name)
)
return __set_entity_repo_permission(user, "user", namespace_name, repository_name, role_name)
def set_team_repo_permission(team_name, namespace_name, repository_name, role_name):
try:
team = (
Team.select()
.join(User)
.where(Team.name == team_name, User.username == namespace_name)
.get()
)
except Team.DoesNotExist:
raise DataModelException("No team %s in organization %s" % (team_name, namespace_name))
return __set_entity_repo_permission(team, "team", namespace_name, repository_name, role_name)