mirror of
https://github.com/quay/quay.git
synced 2025-04-18 10:44:06 +03:00
This issue should fix PROJQUAY-7747 and PROJQUAY-8130. Adding IntegrityError to the try except block to skip adding existing user to team.
348 lines
14 KiB
Python
348 lines
14 KiB
Python
import json
|
|
import unittest
|
|
import urllib.parse
|
|
|
|
import pytest
|
|
from httmock import HTTMock, urlmatch
|
|
from mock import patch
|
|
|
|
from app import app as realapp
|
|
from app import usermanager
|
|
from data import model
|
|
from data.database import TeamMember
|
|
from data.users.externaloidc import OIDCUsers
|
|
from features import FeatureNameValue
|
|
from initdb import finished_database_for_testing, setup_database_for_testing
|
|
from oauth.oidc import OIDCLoginService, PasswordGrantException
|
|
from test.fixtures import *
|
|
|
|
|
|
@pytest.fixture()
|
|
def discovery_content():
|
|
return {
|
|
"scopes_supported": ["openid"],
|
|
"authorization_endpoint": "http://fakeoidc/authorize",
|
|
"token_endpoint": "http://fakeoidc/token",
|
|
"jwks_uri": "http://fakeoidc/jwks",
|
|
"userinfo_endpoint": "http://fakeoidc/userinfo",
|
|
}
|
|
|
|
|
|
@pytest.fixture()
|
|
def discovery_handler(discovery_content):
|
|
@urlmatch(netloc=r"fakeoidc", path=r".+openid.+")
|
|
def handler(_, __):
|
|
return json.dumps(discovery_content)
|
|
|
|
return handler
|
|
|
|
|
|
@pytest.fixture()
|
|
def token_handler_password_grant():
|
|
@urlmatch(netloc=r"fakeoidc", path=r"/token")
|
|
def handler(_, request):
|
|
params = urllib.parse.parse_qs(request.body)
|
|
if params.get("grant_type")[0] != "password":
|
|
return {"status_code": 400, "content": "Invalid authorization type"}
|
|
|
|
if params.get("username")[0] != "someusername":
|
|
return {"status_code": 401, "content": "Invalid login credentials"}
|
|
|
|
if params.get("password")[0] != "somepassword":
|
|
return {"status_code": 401, "content": "Invalid login credentials"}
|
|
|
|
content = {
|
|
"access_token": "sometoken",
|
|
}
|
|
return {"status_code": 200, "content": json.dumps(content)}
|
|
|
|
return handler
|
|
|
|
|
|
@pytest.fixture()
|
|
def userinfo_content():
|
|
return {
|
|
"sub": "cooluser",
|
|
"preferred_username": "someusername",
|
|
"email": "foo@example.com",
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
def userinfo_handler(userinfo_content):
|
|
@urlmatch(netloc=r"fakeoidc", path=r"/userinfo")
|
|
def handler(_, req):
|
|
if req.headers.get("Authorization") != "Bearer sometoken":
|
|
return {"status_code": 401, "content": "Missing expected header"}
|
|
|
|
return {"status_code": 200, "content": json.dumps(userinfo_content)}
|
|
|
|
return handler
|
|
|
|
|
|
class OIDCAuthTests(unittest.TestCase):
|
|
def fake_oidc(self):
|
|
"""
|
|
Instantiates a fake OIDC instance to use in the test cases
|
|
"""
|
|
client_id = "quay-test"
|
|
client_secret = "secret"
|
|
oidc_server = "http://test-server/realms/myrealm"
|
|
service_name = "test"
|
|
login_scopes = ["openid"]
|
|
preferred_group_claim_name = "groups"
|
|
|
|
oidc_instance = OIDCUsers(
|
|
client_id,
|
|
client_secret,
|
|
oidc_server,
|
|
service_name,
|
|
login_scopes,
|
|
preferred_group_claim_name,
|
|
)
|
|
return oidc_instance
|
|
|
|
def fake_oidc_login_service(self):
|
|
config = {
|
|
"CLIENT_ID": "foo",
|
|
"CLIENT_SECRET": "bar",
|
|
"SERVICE_NAME": "Some Cool Service",
|
|
"SERVICE_ICON": "http://some/icon",
|
|
"OIDC_SERVER": "http://fakeoidc",
|
|
"DEBUGGING": True,
|
|
}
|
|
return OIDCLoginService(config, "OIDC_LOGIN_CONFIG")
|
|
|
|
def setUp(self):
|
|
setup_database_for_testing(self)
|
|
self.oidc_instance = self.fake_oidc()
|
|
self.oidc_login_service = self.fake_oidc_login_service()
|
|
|
|
def tearDown(self):
|
|
finished_database_for_testing(self)
|
|
|
|
def test_sync_for_empty_oidc_groups(self):
|
|
user_obj = model.user.get_user("devtable")
|
|
|
|
test_org = model.organization.create_organization(
|
|
"testorg", "testorg@example.com", user_obj
|
|
)
|
|
team_1 = model.team.create_team("team_1", test_org, "member")
|
|
assert model.team.add_user_to_team(user_obj, team_1)
|
|
|
|
team_2 = model.team.create_team("team_2", test_org, "member")
|
|
assert model.team.add_user_to_team(user_obj, team_2)
|
|
|
|
user_teams_before_sync = TeamMember.select().where(TeamMember.user == user_obj).count()
|
|
self.oidc_instance.sync_oidc_groups([], user_obj)
|
|
user_teams_after_sync = TeamMember.select().where(TeamMember.user == user_obj).count()
|
|
assert user_teams_before_sync == user_teams_after_sync
|
|
|
|
self.oidc_instance.sync_oidc_groups(None, user_obj)
|
|
user_teams_after_sync = TeamMember.select().where(TeamMember.user == user_obj).count()
|
|
assert user_teams_before_sync == user_teams_after_sync
|
|
|
|
def test_sync_for_non_empty_oidc_groups(self):
|
|
user_obj = model.user.get_user("devtable")
|
|
fresh_user = model.user.get_user("freshuser")
|
|
random_user = model.user.get_user("randomuser")
|
|
|
|
test_org_1 = model.organization.create_organization(
|
|
"test_org_1", "testorg1@example.com", user_obj
|
|
)
|
|
test_org_2 = model.organization.create_organization(
|
|
"test_org_2", "testorg2@example.com", fresh_user
|
|
)
|
|
test_org_3 = model.organization.create_organization(
|
|
"test_org_3", "testorg3@example.com", random_user
|
|
)
|
|
|
|
team_1 = model.team.create_team("team_1", test_org_1, "member")
|
|
assert model.team.set_team_syncing(team_1, "oidc", {"group_name": "test_org_1_team_1"})
|
|
|
|
team_2 = model.team.create_team("team_2", test_org_2, "member")
|
|
assert model.team.set_team_syncing(team_2, "oidc", {"group_name": "test_org_2_team_2"})
|
|
|
|
team_3 = model.team.create_team("team_3", test_org_3, "member")
|
|
assert model.team.set_team_syncing(team_3, "ldap", {"group_name": "test_org_3_team_3"})
|
|
|
|
user_groups = [
|
|
"test_org_1_team_1",
|
|
"test_org_2_team_2",
|
|
"test_org_3_team_3",
|
|
"wrong_group_name",
|
|
]
|
|
user_teams_before_sync = TeamMember.select().where(TeamMember.user == user_obj).count()
|
|
self.oidc_instance.sync_oidc_groups(user_groups, user_obj)
|
|
|
|
user_teams_after_sync = TeamMember.select().where(TeamMember.user == user_obj).count()
|
|
|
|
assert user_teams_before_sync + 2 == user_teams_after_sync
|
|
|
|
# attempt to sync already synced groups
|
|
self.oidc_instance.sync_oidc_groups(user_groups, user_obj)
|
|
user_teams_after_sync = TeamMember.select().where(TeamMember.user == user_obj).count()
|
|
|
|
assert user_teams_before_sync + 2 == user_teams_after_sync
|
|
|
|
def test_resync_for_empty_quay_teams(self):
|
|
user_obj = model.user.get_user("devtable")
|
|
|
|
user_teams_before_sync = TeamMember.select().where(TeamMember.user == user_obj).count()
|
|
self.oidc_instance.resync_quay_teams([], user_obj)
|
|
user_teams_after_sync = TeamMember.select().where(TeamMember.user == user_obj).count()
|
|
assert user_teams_before_sync == user_teams_after_sync
|
|
|
|
# add user to team that doesn't have team sync enabled
|
|
test_org_1 = model.organization.create_organization(
|
|
"test_org_1", "testorg1@example.com", user_obj
|
|
)
|
|
team_1 = model.team.create_team("team_1", test_org_1, "member")
|
|
assert model.team.add_user_to_team(user_obj, team_1)
|
|
|
|
# add user to team that has team sync enabled with a different login service
|
|
team_2 = model.team.create_team("team_2", test_org_1, "member")
|
|
assert model.team.set_team_syncing(team_2, "ldap", None)
|
|
assert model.team.add_user_to_team(user_obj, team_2)
|
|
|
|
user_teams_before_sync = TeamMember.select().where(TeamMember.user == user_obj).count()
|
|
self.oidc_instance.resync_quay_teams([], user_obj)
|
|
user_teams_after_sync = TeamMember.select().where(TeamMember.user == user_obj).count()
|
|
assert user_teams_before_sync == user_teams_after_sync
|
|
|
|
def test_resync_for_non_empty_quay_teams(self):
|
|
user_obj = model.user.get_user("devtable")
|
|
fresh_user = model.user.get_user("freshuser")
|
|
|
|
test_org_1 = model.organization.create_organization(
|
|
"test_org_1", "testorg1@example.com", user_obj
|
|
)
|
|
test_org_2 = model.organization.create_organization(
|
|
"test_org_2", "testorg2@example.com", fresh_user
|
|
)
|
|
|
|
# add user to team that has team sync enabled with oidc login service
|
|
team_1 = model.team.create_team("team_1", test_org_1, "member")
|
|
assert model.team.set_team_syncing(team_1, "oidc", {"group_name": "test_org_1_team_1"})
|
|
assert model.team.add_user_to_team(user_obj, team_1)
|
|
|
|
# add user to another team that has team sync enabled with oidc login service
|
|
team_2 = model.team.create_team("team_1", test_org_2, "member")
|
|
assert model.team.set_team_syncing(team_2, "oidc", {"group_name": "test_org_2_team_1"})
|
|
assert model.team.add_user_to_team(user_obj, team_2)
|
|
|
|
# add user to another team that has team sync enabled with oidc login service
|
|
team_3 = model.team.create_team("team_2", test_org_2, "member")
|
|
assert model.team.set_team_syncing(team_3, "oidc", {"group_name": "test_org_2_team_2"})
|
|
assert model.team.add_user_to_team(user_obj, team_3)
|
|
|
|
user_groups = ["test_org_1_team_1", "another_group", "test_org_2_team_2"]
|
|
# user should be removed from team_2
|
|
self.oidc_instance.resync_quay_teams(user_groups, user_obj)
|
|
assert (
|
|
TeamMember.select()
|
|
.where(TeamMember.user == user_obj, TeamMember.team == team_2)
|
|
.count()
|
|
== 0
|
|
)
|
|
|
|
# user should part of team_1 and team_3
|
|
assert (
|
|
TeamMember.select()
|
|
.where(TeamMember.user == user_obj, TeamMember.team << [team_1, team_3])
|
|
.count()
|
|
== 2
|
|
)
|
|
|
|
def test_sync_user_groups_for_empty_user_obj(self):
|
|
assert self.oidc_instance.sync_user_groups([], None, self.oidc_login_service) is None
|
|
assert self.oidc_instance.sync_user_groups(None, None, self.oidc_login_service) is None
|
|
|
|
user_groups = ["test_org_1_team_1", "another_group"]
|
|
assert (
|
|
self.oidc_instance.sync_user_groups(user_groups, None, self.oidc_login_service) is None
|
|
)
|
|
|
|
user_obj = model.user.get_user("devtable")
|
|
fresh_user = model.user.get_user("freshuser")
|
|
test_org_1 = model.organization.create_organization(
|
|
"test_org_1", "testorg1@example.com", user_obj
|
|
)
|
|
# team set to sync but user is not added to team
|
|
team_1 = model.team.create_team("team_1", test_org_1, "member")
|
|
assert model.team.set_team_syncing(
|
|
team_1, self.oidc_login_service.service_id(), {"group_name": "test_org_1_team_1"}
|
|
)
|
|
|
|
test_org_2 = model.organization.create_organization(
|
|
"test_org_2", "testorg2@example.com", fresh_user
|
|
)
|
|
# team set to sync and user is added to team
|
|
team_2 = model.team.create_team("team_1", test_org_2, "member")
|
|
assert model.team.set_team_syncing(
|
|
team_2, self.oidc_login_service.service_id(), {"group_name": "test_org_2_team_1"}
|
|
)
|
|
assert model.team.add_user_to_team(user_obj, team_2)
|
|
|
|
user_teams_before_sync = TeamMember.select().where(TeamMember.user == user_obj).count()
|
|
# user will be removed from team_2
|
|
self.oidc_instance.sync_user_groups([], user_obj, self.oidc_login_service)
|
|
user_teams_after_sync = TeamMember.select().where(TeamMember.user == user_obj).count()
|
|
assert user_teams_before_sync == user_teams_after_sync + 1
|
|
|
|
# user will be removed from team_1
|
|
self.oidc_instance.sync_user_groups(user_groups, user_obj, self.oidc_login_service)
|
|
user_teams_after_sync = TeamMember.select().where(TeamMember.user == user_obj).count()
|
|
assert user_teams_before_sync == user_teams_after_sync
|
|
|
|
def test_missing_verify_credentials(self):
|
|
result, error_msg = self.oidc_instance.verify_credentials(None, "password")
|
|
assert result is None
|
|
assert error_msg == "Missing username or email."
|
|
|
|
result, error_msg = self.oidc_instance.verify_credentials("username", None)
|
|
assert result is None
|
|
assert error_msg == "Anonymous binding not allowed."
|
|
|
|
def test_query_users(self):
|
|
result, service, error_msg = self.oidc_instance.query_users("some_query_here", None)
|
|
assert len(result) == 0
|
|
assert service == "oidc"
|
|
assert error_msg == "Not supported"
|
|
|
|
|
|
def test_verify_credentials(discovery_handler, token_handler_password_grant, userinfo_handler):
|
|
oidc_instance = OIDCAuthTests().fake_oidc()
|
|
with HTTMock(discovery_handler, token_handler_password_grant, userinfo_handler):
|
|
result, error_msg = oidc_instance.verify_credentials("username", "password")
|
|
# no result because invalid credentials
|
|
assert result is None
|
|
|
|
result, error_msg = oidc_instance.verify_credentials("someusername", "somepassword")
|
|
assert result.username == "someusername"
|
|
assert result.email == "foo@example.com"
|
|
assert result.id == "cooluser"
|
|
assert error_msg is None
|
|
|
|
|
|
def test_service_metadata(discovery_handler, token_handler_password_grant, userinfo_handler):
|
|
oidc_instance = OIDCAuthTests().fake_oidc()
|
|
with HTTMock(discovery_handler, token_handler_password_grant, userinfo_handler):
|
|
result = oidc_instance.service_metadata()
|
|
assert result["issuer_domain"] == "fakeoidc"
|
|
|
|
|
|
def test_user_restrictions(discovery_handler, token_handler_password_grant, userinfo_handler):
|
|
oidc_instance = OIDCAuthTests().fake_oidc()
|
|
with patch("features.RESTRICTED_USERS", FeatureNameValue("RESTRICTED_USERS", True)):
|
|
with HTTMock(discovery_handler, token_handler_password_grant, userinfo_handler):
|
|
result, error_msg = oidc_instance.verify_credentials("someusername", "somepassword")
|
|
assert result.username == "someusername"
|
|
assert error_msg is None
|
|
# check if user is super user
|
|
check_is_superuser = usermanager.is_superuser(result.username)
|
|
assert check_is_superuser == False
|
|
# turn on restricted users
|
|
check_is_restricted_user = usermanager.is_restricted_user(result.username)
|
|
assert check_is_restricted_user == True
|