1
0
mirror of https://github.com/quay/quay.git synced 2025-04-18 10:44:06 +03:00
quay/test/test_external_oidc.py
Sunandadadi 70a0edef5b
teamsync: peewee integrity error check on teamsync (PROJQUAY-7747) (#3550)
This change should fix PROJQUAY-7747 and PROJQUAY-8130. It adds IntegrityError to the try/except block so that adding a user who is already a member of the team is skipped.
2025-01-09 15:40:01 -05:00

348 lines
14 KiB
Python

import json
import unittest
import urllib.parse
import pytest
from httmock import HTTMock, urlmatch
from mock import patch
from app import app as realapp
from app import usermanager
from data import model
from data.database import TeamMember
from data.users.externaloidc import OIDCUsers
from features import FeatureNameValue
from initdb import finished_database_for_testing, setup_database_for_testing
from oauth.oidc import OIDCLoginService, PasswordGrantException
from test.fixtures import *
@pytest.fixture()
def discovery_content():
    """
    Returns the fake OIDC endpoint metadata: the supported scopes plus the
    authorize, token, JWKS and userinfo URLs on the "fakeoidc" host.
    """
    return dict(
        scopes_supported=["openid"],
        authorization_endpoint="http://fakeoidc/authorize",
        token_endpoint="http://fakeoidc/token",
        jwks_uri="http://fakeoidc/jwks",
        userinfo_endpoint="http://fakeoidc/userinfo",
    )
@pytest.fixture()
def discovery_handler(discovery_content):
    """
    Builds a mock HTTP handler that replies with the JSON-encoded discovery content.

    The handler only matches requests to host "fakeoidc" whose path matches
    the pattern ".+openid.+".
    """

    @urlmatch(netloc=r"fakeoidc", path=r".+openid.+")
    def serve_discovery(_url, _request):
        # The URL and request are ignored; the same document is always served.
        serialized = json.dumps(discovery_content)
        return serialized

    return serve_discovery
@pytest.fixture()
def token_handler_password_grant():
    """
    Builds a mock for the fake OIDC token endpoint that only accepts the password grant.

    The returned handler matches requests to host "fakeoidc" with path "/token" and
    inspects the form-encoded request body:

    - a ``grant_type`` other than "password" (or no ``grant_type`` at all) yields a
      400 response;
    - a ``username`` other than "someusername" or a ``password`` other than
      "somepassword" (including a missing or blank value) yields a 401 response;
    - otherwise a 200 response with the JSON body ``{"access_token": "sometoken"}``.
    """

    def first_value(params, name):
        # parse_qs maps every field to a list of values and leaves out fields that
        # are absent or blank, so fall back to None instead of indexing a missing
        # entry (which would raise TypeError rather than produce an HTTP error).
        return params.get(name, [None])[0]

    @urlmatch(netloc=r"fakeoidc", path=r"/token")
    def handler(_, request):
        # A request without a body is treated like an empty form.
        params = urllib.parse.parse_qs(request.body or "")

        if first_value(params, "grant_type") != "password":
            return {"status_code": 400, "content": "Invalid authorization type"}

        supplied = (first_value(params, "username"), first_value(params, "password"))
        if supplied != ("someusername", "somepassword"):
            return {"status_code": 401, "content": "Invalid login credentials"}

        content = {
            "access_token": "sometoken",
        }
        return {"status_code": 200, "content": json.dumps(content)}

    return handler
@pytest.fixture()
def userinfo_content():
    """
    Returns the claims (sub, preferred_username, email) served by the fake
    userinfo endpoint.
    """
    return dict(
        sub="cooluser",
        preferred_username="someusername",
        email="foo@example.com",
    )
@pytest.fixture
def userinfo_handler(userinfo_content):
    """
    Builds a mock for the fake userinfo endpoint (host "fakeoidc", path "/userinfo").

    Requests carrying the header "Authorization: Bearer sometoken" receive a 200
    response with the JSON-encoded userinfo content; any other request receives a 401.
    """

    @urlmatch(netloc=r"fakeoidc", path=r"/userinfo")
    def serve_userinfo(_url, req):
        has_expected_token = req.headers.get("Authorization") == "Bearer sometoken"
        if has_expected_token:
            return {"status_code": 200, "content": json.dumps(userinfo_content)}
        return {"status_code": 401, "content": "Missing expected header"}

    return serve_userinfo
class OIDCAuthTests(unittest.TestCase):
    """
    Database-backed tests for the team-sync and credential helpers of OIDCUsers.
    """

    @staticmethod
    def _team_membership_count(user):
        """
        Counts the TeamMember rows that reference the given user.
        """
        return TeamMember.select().where(TeamMember.user == user).count()

    def fake_oidc(self):
        """
        Instantiates a fake OIDC instance to use in the test cases
        """
        return OIDCUsers(
            "quay-test",  # client_id
            "secret",  # client_secret
            "http://test-server/realms/myrealm",  # oidc_server
            "test",  # service_name
            ["openid"],  # login_scopes
            "groups",  # preferred_group_claim_name
        )

    def fake_oidc_login_service(self):
        """
        Instantiates an OIDCLoginService from a fixed config that points at the fake OIDC host
        """
        settings = dict(
            CLIENT_ID="foo",
            CLIENT_SECRET="bar",
            SERVICE_NAME="Some Cool Service",
            SERVICE_ICON="http://some/icon",
            OIDC_SERVER="http://fakeoidc",
            DEBUGGING=True,
        )
        return OIDCLoginService(settings, "OIDC_LOGIN_CONFIG")

    def setUp(self):
        # The database is prepared first, then the objects under test are created.
        setup_database_for_testing(self)
        self.oidc_instance = self.fake_oidc()
        self.oidc_login_service = self.fake_oidc_login_service()

    def tearDown(self):
        finished_database_for_testing(self)

    def test_sync_for_empty_oidc_groups(self):
        """
        Syncing an empty list or None must leave the user's team count unchanged.
        """
        devtable = model.user.get_user("devtable")
        org = model.organization.create_organization("testorg", "testorg@example.com", devtable)
        for team_name in ("team_1", "team_2"):
            team = model.team.create_team(team_name, org, "member")
            assert model.team.add_user_to_team(devtable, team)

        baseline = self._team_membership_count(devtable)

        for no_groups in ([], None):
            self.oidc_instance.sync_oidc_groups(no_groups, devtable)
            assert baseline == self._team_membership_count(devtable)

    def test_sync_for_non_empty_oidc_groups(self):
        """
        Syncing four group names against two oidc-synced teams and one ldap-synced
        team must raise the user's team count by exactly two, and stay there on a repeat.
        """
        devtable = model.user.get_user("devtable")
        fresh_user = model.user.get_user("freshuser")
        random_user = model.user.get_user("randomuser")

        org_specs = (
            ("test_org_1", "testorg1@example.com", devtable),
            ("test_org_2", "testorg2@example.com", fresh_user),
            ("test_org_3", "testorg3@example.com", random_user),
        )
        orgs = [
            model.organization.create_organization(org_name, email, owner)
            for org_name, email, owner in org_specs
        ]

        # (team name, organization, sync service, synced group name)
        team_specs = (
            ("team_1", orgs[0], "oidc", "test_org_1_team_1"),
            ("team_2", orgs[1], "oidc", "test_org_2_team_2"),
            ("team_3", orgs[2], "ldap", "test_org_3_team_3"),
        )
        for team_name, org, service, group_name in team_specs:
            team = model.team.create_team(team_name, org, "member")
            assert model.team.set_team_syncing(team, service, {"group_name": group_name})

        user_groups = [
            "test_org_1_team_1",
            "test_org_2_team_2",
            "test_org_3_team_3",
            "wrong_group_name",
        ]
        baseline = self._team_membership_count(devtable)

        self.oidc_instance.sync_oidc_groups(user_groups, devtable)
        assert baseline + 2 == self._team_membership_count(devtable)

        # attempt to sync already synced groups
        self.oidc_instance.sync_oidc_groups(user_groups, devtable)
        assert baseline + 2 == self._team_membership_count(devtable)

    def test_resync_for_empty_quay_teams(self):
        """
        Resyncing with an empty group list must not change the user's team count,
        neither initially nor after joining a non-synced and an ldap-synced team.
        """
        devtable = model.user.get_user("devtable")

        baseline = self._team_membership_count(devtable)
        self.oidc_instance.resync_quay_teams([], devtable)
        assert baseline == self._team_membership_count(devtable)

        # add user to team that doesn't have team sync enabled
        org = model.organization.create_organization(
            "test_org_1", "testorg1@example.com", devtable
        )
        plain_team = model.team.create_team("team_1", org, "member")
        assert model.team.add_user_to_team(devtable, plain_team)

        # add user to team that has team sync enabled with a different login service
        ldap_team = model.team.create_team("team_2", org, "member")
        assert model.team.set_team_syncing(ldap_team, "ldap", None)
        assert model.team.add_user_to_team(devtable, ldap_team)

        baseline = self._team_membership_count(devtable)
        self.oidc_instance.resync_quay_teams([], devtable)
        assert baseline == self._team_membership_count(devtable)

    def test_resync_for_non_empty_quay_teams(self):
        """
        Resyncing must drop the user from the oidc-synced team whose group is not
        in the supplied list while keeping the two teams whose groups are listed.
        """
        devtable = model.user.get_user("devtable")
        fresh_user = model.user.get_user("freshuser")
        test_org_1 = model.organization.create_organization(
            "test_org_1", "testorg1@example.com", devtable
        )
        test_org_2 = model.organization.create_organization(
            "test_org_2", "testorg2@example.com", fresh_user
        )

        # Every team below is oidc-synced and has the user as a member.
        # (team name, organization, synced group name)
        team_specs = (
            ("team_1", test_org_1, "test_org_1_team_1"),
            ("team_1", test_org_2, "test_org_2_team_1"),
            ("team_2", test_org_2, "test_org_2_team_2"),
        )
        teams = []
        for team_name, org, group_name in team_specs:
            team = model.team.create_team(team_name, org, "member")
            assert model.team.set_team_syncing(team, "oidc", {"group_name": group_name})
            assert model.team.add_user_to_team(devtable, team)
            teams.append(team)
        kept_first, dropped, kept_second = teams

        user_groups = ["test_org_1_team_1", "another_group", "test_org_2_team_2"]
        self.oidc_instance.resync_quay_teams(user_groups, devtable)

        # the user should be removed from the team synced to "test_org_2_team_1"
        dropped_rows = TeamMember.select().where(
            TeamMember.user == devtable, TeamMember.team == dropped
        )
        assert dropped_rows.count() == 0

        # the user should still be part of the two teams whose groups were listed
        kept_rows = TeamMember.select().where(
            TeamMember.user == devtable, TeamMember.team << [kept_first, kept_second]
        )
        assert kept_rows.count() == 2

    def test_sync_user_groups_for_empty_user_obj(self):
        """
        sync_user_groups must return None when no user object is given; with a real
        user the team count must first drop by one and then return to its start value.
        """
        user_groups = ["test_org_1_team_1", "another_group"]
        for groups in ([], None, user_groups):
            outcome = self.oidc_instance.sync_user_groups(groups, None, self.oidc_login_service)
            assert outcome is None

        devtable = model.user.get_user("devtable")
        fresh_user = model.user.get_user("freshuser")

        # team set to sync but user is not added to team
        test_org_1 = model.organization.create_organization(
            "test_org_1", "testorg1@example.com", devtable
        )
        unjoined_team = model.team.create_team("team_1", test_org_1, "member")
        assert model.team.set_team_syncing(
            unjoined_team,
            self.oidc_login_service.service_id(),
            {"group_name": "test_org_1_team_1"},
        )

        # team set to sync and user is added to team
        test_org_2 = model.organization.create_organization(
            "test_org_2", "testorg2@example.com", fresh_user
        )
        joined_team = model.team.create_team("team_1", test_org_2, "member")
        assert model.team.set_team_syncing(
            joined_team,
            self.oidc_login_service.service_id(),
            {"group_name": "test_org_2_team_1"},
        )
        assert model.team.add_user_to_team(devtable, joined_team)

        baseline = self._team_membership_count(devtable)

        # syncing with no groups: the count is expected to drop by one
        # (the user leaves the team synced to "test_org_2_team_1")
        self.oidc_instance.sync_user_groups([], devtable, self.oidc_login_service)
        assert baseline == self._team_membership_count(devtable) + 1

        # syncing with "test_org_1_team_1": the count is expected to return to the
        # baseline (the user joins the team synced to that group)
        self.oidc_instance.sync_user_groups(user_groups, devtable, self.oidc_login_service)
        assert baseline == self._team_membership_count(devtable)

    def test_missing_verify_credentials(self):
        """
        verify_credentials must reject a missing username and a missing password
        with a None result and the matching error message.
        """
        outcome, message = self.oidc_instance.verify_credentials(None, "password")
        assert outcome is None
        assert message == "Missing username or email."

        outcome, message = self.oidc_instance.verify_credentials("username", None)
        assert outcome is None
        assert message == "Anonymous binding not allowed."

    def test_query_users(self):
        """
        query_users must report an empty result, the "oidc" service and "Not supported".
        """
        found, service, message = self.oidc_instance.query_users("some_query_here", None)
        assert len(found) == 0
        assert service == "oidc"
        assert message == "Not supported"
def test_verify_credentials(discovery_handler, token_handler_password_grant, userinfo_handler):
    """
    Checks verify_credentials against the mocked OIDC endpoints: wrong credentials
    give no result, the expected credentials give the user described by userinfo.
    """
    oidc = OIDCAuthTests().fake_oidc()
    mocked_endpoints = HTTMock(discovery_handler, token_handler_password_grant, userinfo_handler)

    with mocked_endpoints:
        # no result because invalid credentials
        rejected, _ = oidc.verify_credentials("username", "password")
        assert rejected is None

        accepted, error_msg = oidc.verify_credentials("someusername", "somepassword")
        assert accepted.username == "someusername"
        assert accepted.email == "foo@example.com"
        assert accepted.id == "cooluser"
        assert error_msg is None
def test_service_metadata(discovery_handler, token_handler_password_grant, userinfo_handler):
    """
    Checks that service_metadata reports "fakeoidc" as the issuer domain while the
    mocked OIDC endpoints are active.
    """
    oidc = OIDCAuthTests().fake_oidc()
    mocked_endpoints = HTTMock(discovery_handler, token_handler_password_grant, userinfo_handler)

    with mocked_endpoints:
        metadata = oidc.service_metadata()
        assert metadata["issuer_domain"] == "fakeoidc"
def test_user_restrictions(discovery_handler, token_handler_password_grant, userinfo_handler):
    """
    Checks that, with the RESTRICTED_USERS feature patched on, a user verified through
    the mocked OIDC endpoints is not a superuser and is reported as restricted.
    """
    oidc = OIDCAuthTests().fake_oidc()
    restricted_feature = patch(
        "features.RESTRICTED_USERS", FeatureNameValue("RESTRICTED_USERS", True)
    )
    mocked_endpoints = HTTMock(discovery_handler, token_handler_password_grant, userinfo_handler)

    # The feature patch is entered first, then the HTTP mocks.
    with restricted_feature, mocked_endpoints:
        verified, error_msg = oidc.verify_credentials("someusername", "somepassword")
        assert verified.username == "someusername"
        assert error_msg is None

        # check if user is super user
        assert usermanager.is_superuser(verified.username) == False

        # restricted users are turned on via the patched feature above
        assert usermanager.is_restricted_user(verified.username) == True