mirror of
https://github.com/quay/quay.git
synced 2025-04-18 10:44:06 +03:00
teamsync: peewee integrity error check on teamsync (PROJQUAY-7747) (#3550)
This issue should fix PROJQUAY-7747 and PROJQUAY-8130. Adding IntegrityError to the try except block to skip adding existing user to team.
This commit is contained in:
parent
e015964768
commit
70a0edef5b
@ -65,13 +65,13 @@ def create_team(name, org_obj, team_role_name, description=""):
|
||||
|
||||
|
||||
def add_user_to_team(user_obj, team):
|
||||
try:
|
||||
return TeamMember.create(user=user_obj, team=team)
|
||||
except Exception:
|
||||
if user_exists_in_team(user_obj, team):
|
||||
raise UserAlreadyInTeam(
|
||||
"User %s is already a member of team %s" % (user_obj.username, team.name)
|
||||
)
|
||||
|
||||
return TeamMember.create(user=user_obj, team=team)
|
||||
|
||||
|
||||
def remove_user_from_team(org_name, team_name, username, removed_by_username):
|
||||
Org = User.alias()
|
||||
@ -608,14 +608,18 @@ def get_oidc_team_from_groupname(group_name, login_service_name):
|
||||
Fetch TeamSync row synced with login_service_name from `group_name` in TeamSync.config
|
||||
"""
|
||||
response = []
|
||||
with db_transaction():
|
||||
query_result = (
|
||||
TeamSync.select()
|
||||
.join(LoginService)
|
||||
.where(TeamSync.config.contains(group_name), LoginService.name == login_service_name)
|
||||
)
|
||||
for row in query_result:
|
||||
if json.loads(row.config).get("group_name", None) == group_name:
|
||||
response.append(row)
|
||||
query_result = (
|
||||
TeamSync.select()
|
||||
.join(LoginService)
|
||||
.where(TeamSync.config.contains(group_name), LoginService.name == login_service_name)
|
||||
)
|
||||
|
||||
for row in query_result:
|
||||
if json.loads(row.config).get("group_name", None) == group_name:
|
||||
response.append(row)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
def user_exists_in_team(user_obj, team):
|
||||
return TeamMember.select().where(TeamMember.user == user_obj, TeamMember.team == team).exists()
|
||||
|
@ -1,7 +1,9 @@
|
||||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
from data.database import TeamMember
|
||||
from data.model import DataModelException
|
||||
from data.model import DataModelException, UserAlreadyInTeam
|
||||
from data.model.organization import create_organization
|
||||
from data.model.team import (
|
||||
__get_user_admin_teams,
|
||||
@ -11,10 +13,12 @@ from data.model.team import (
|
||||
create_team,
|
||||
delete_all_team_members,
|
||||
get_federated_user_teams,
|
||||
get_oidc_team_from_groupname,
|
||||
list_team_users,
|
||||
remove_team,
|
||||
remove_user_from_team,
|
||||
set_team_syncing,
|
||||
user_exists_in_team,
|
||||
validate_team_name,
|
||||
)
|
||||
from data.model.user import create_user_noverify, get_user
|
||||
@ -166,3 +170,38 @@ def test_get_federated_user_teams(login_service_name, initialized_db):
|
||||
assert len(user_teams) == 2
|
||||
elif login_service_name == "ldap":
|
||||
assert len(user_teams) == 1
|
||||
|
||||
|
||||
def test_user_exists_in_team(initialized_db):
|
||||
dev_user = get_user("devtable")
|
||||
new_org = create_organization("testorg", "testorg" + "@example.com", dev_user)
|
||||
|
||||
team_1 = create_team("team_1", new_org, "member")
|
||||
assert add_user_to_team(dev_user, team_1)
|
||||
assert user_exists_in_team(dev_user, team_1) is True
|
||||
|
||||
# add user to team already part of
|
||||
with pytest.raises(UserAlreadyInTeam):
|
||||
add_user_to_team(dev_user, team_1)
|
||||
|
||||
team_2 = create_team("team_2", new_org, "member")
|
||||
assert user_exists_in_team(dev_user, team_2) is False
|
||||
|
||||
|
||||
def test_get_oidc_team_from_groupname(initialized_db):
|
||||
dev_user = get_user("devtable")
|
||||
new_org = create_organization("testorg", "testorg" + "@example.com", dev_user)
|
||||
|
||||
team_1 = create_team("team_1", new_org, "member")
|
||||
assert add_user_to_team(dev_user, team_1)
|
||||
assert set_team_syncing(team_1, "oidc", {"group_name": "grp1"})
|
||||
response = get_oidc_team_from_groupname(group_name="grp1", login_service_name="oidc")
|
||||
assert len(response) == 1
|
||||
assert response[0].team.name == "team_1"
|
||||
assert json.loads(response[0].config).get("group_name") == "grp1"
|
||||
|
||||
response = get_oidc_team_from_groupname(group_name="team_1", login_service_name="ldap")
|
||||
assert len(response) == 0
|
||||
|
||||
response = get_oidc_team_from_groupname(group_name="team_1", login_service_name="ldap")
|
||||
assert len(response) == 0
|
||||
|
@ -179,6 +179,12 @@ class OIDCAuthTests(unittest.TestCase):
|
||||
|
||||
assert user_teams_before_sync + 2 == user_teams_after_sync
|
||||
|
||||
# attempt to sync already synced groups
|
||||
self.oidc_instance.sync_oidc_groups(user_groups, user_obj)
|
||||
user_teams_after_sync = TeamMember.select().where(TeamMember.user == user_obj).count()
|
||||
|
||||
assert user_teams_before_sync + 2 == user_teams_after_sync
|
||||
|
||||
def test_resync_for_empty_quay_teams(self):
|
||||
user_obj = model.user.get_user("devtable")
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user