1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/data/model/organization.py
jbpratt c3aaa87c47 fix(organization): optimize get_organization_member_set to accept IDs directly (PROJQUAY-900) (#4918)
Eliminate N+1 lazy-load queries by accepting user IDs instead of User objects.

## Problem

When checking organization membership for permissions, the old code triggered
a lazy-load query for EACH permission just to extract the user ID:

```python
# Old caller pattern - triggers N lazy-loads!
users_filter = {perm.user for perm in repo_perms}
org_members = get_organization_member_set(org, users_filter=users_filter)
```

For an endpoint returning 50 permissions, this caused 50 extra SELECT queries.

## Solution

Accept user IDs directly via `user_ids_filter` parameter:

```python
# New pattern - no lazy-loads!
user_ids_filter = {perm.user_id for perm in repo_perms}
org_members = get_organization_member_set(org, user_ids_filter=user_ids_filter)
```

The `user_id` foreign key field is already populated on the model - accessing
it doesn't require a database query.

## Changes

- Renamed `users_filter` → `user_ids_filter` parameter
- Accept set of integer IDs instead of User objects
- Updated 6 call sites in permission_models_pre_oci.py and prototype.py
- Added comprehensive test coverage

## Performance Impact

For get_repo_permissions_by_user with 50 permissions:
- Before: 50 lazy-load queries
- After: 0 queries

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Signed-off-by: Brady Pratt <bpratt@redhat.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-19 14:17:08 -06:00

213 lines
6.3 KiB
Python

from data.database import (
DeletedNamespace,
FederatedLogin,
Namespace,
Repository,
RepositoryPermission,
Team,
TeamMember,
TeamRole,
User,
)
from data.model import (
DataModelException,
InvalidOrganizationException,
InvalidUsernameException,
_basequery,
db_transaction,
team,
user,
)
def create_organization(name, email, creating_user, email_required=True, is_possible_abuser=False):
with db_transaction():
try:
# Create the org
new_org = user.create_user_noverify(
name, email, email_required=email_required, is_possible_abuser=is_possible_abuser
)
new_org.organization = True
new_org.save()
# Create a team for the owners
owners_team = team.create_team("owners", new_org, "admin")
# Add the user who created the org to the owners team
team.add_user_to_team(creating_user, owners_team)
return new_org
except InvalidUsernameException as iue:
raise InvalidOrganizationException(str(iue))
def get_organization(name):
try:
return User.get(username=name, organization=True)
except User.DoesNotExist:
raise InvalidOrganizationException("Organization does not exist: %s" % name)
def get_organization_by_id(org_db_id):
try:
return User.get(id=org_db_id, organization=True)
except User.DoesNotExist:
raise InvalidOrganizationException("Organization does not exist: %s" % org_db_id)
def convert_user_to_organization(user_obj, admin_user):
if user_obj.robot:
raise DataModelException("Cannot convert a robot into an organization")
with db_transaction():
# Change the user to an organization and disable this account for login.
user_obj.organization = True
user_obj.password_hash = None
user_obj.save()
# Clear any federated auth pointing to this user.
FederatedLogin.delete().where(FederatedLogin.user == user_obj).execute()
# Delete any user-specific permissions on repositories.
(RepositoryPermission.delete().where(RepositoryPermission.user == user_obj).execute())
# Create a team for the owners
owners_team = team.create_team("owners", user_obj, "admin")
# Add the user who will admin the org to the owners team
team.add_user_to_team(admin_user, owners_team)
return user_obj
def get_user_organizations(username):
return _basequery.get_user_organizations(username)
def get_organization_team_members(teamid):
joined = User.select().join(TeamMember).join(Team)
query = joined.where(Team.id == teamid)
return query
def __get_org_admin_users(org):
return (
User.select()
.join(TeamMember)
.join(Team)
.join(TeamRole)
.where(Team.organization == org, TeamRole.name == "admin", User.robot == False)
.distinct()
)
def get_admin_users(org):
"""
Returns the owner users for the organization.
"""
return __get_org_admin_users(org)
def remove_organization_member(org, user_obj):
org_admins = [u.username for u in __get_org_admin_users(org)]
if len(org_admins) == 1 and user_obj.username in org_admins:
raise DataModelException("Cannot remove user as they are the only organization admin")
with db_transaction():
# Find and remove the user from any repositories under the org.
permissions = list(
RepositoryPermission.select(RepositoryPermission.id)
.join(Repository)
.where(Repository.namespace_user == org, RepositoryPermission.user == user_obj)
)
if permissions:
RepositoryPermission.delete().where(RepositoryPermission.id << permissions).execute()
# Find and remove the user from any teams under the org.
members = list(
TeamMember.select(TeamMember.id)
.join(Team)
.where(Team.organization == org, TeamMember.user == user_obj)
)
if members:
TeamMember.delete().where(TeamMember.id << members).execute()
def get_organization_member_set(org, include_robots=False, user_ids_filter=None):
"""
Returns the set of all member usernames under the given organization, with optional filtering by
robots and/or by a specific set of user IDs.
"""
org_users = (
User.select(User.username)
.join(TeamMember)
.join(Team)
.where(Team.organization == org)
.distinct()
)
if not include_robots:
org_users = org_users.where(User.robot == False)
if user_ids_filter is not None:
if not user_ids_filter:
return set()
org_users = org_users.where(User.id << user_ids_filter)
return {user.username for user in org_users}
def get_all_repo_users_transitive_via_teams(namespace_name, repository_name):
return (
User.select()
.distinct()
.join(TeamMember)
.join(Team)
.join(RepositoryPermission)
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(Namespace.username == namespace_name, Repository.name == repository_name)
)
def get_organizations(disabled=True, deleted=False):
query = User.select().where(User.organization == True, User.robot == False)
if not disabled:
query = query.where(User.enabled == True)
else:
# NOTE: Deleted users are already disabled, so we don't need this extra check.
if not deleted:
query = query.where(User.id.not_in(DeletedNamespace.select(DeletedNamespace.namespace)))
return query
def get_active_org_count():
return get_organizations(disabled=False).count()
def add_user_as_admin(user_obj, org_obj):
try:
admin_role = TeamRole.get(name="admin")
admin_team = (
Team.select().where(Team.role == admin_role, Team.organization == org_obj).get()
)
team.add_user_to_team(user_obj, admin_team)
except team.UserAlreadyInTeam:
pass
def is_org_admin(user, org):
return (
Team.select()
.join(TeamMember)
.switch(Team)
.join(TeamRole)
.where(Team.organization == org, TeamRole.name == "admin", TeamMember.user == user)
.exists()
)