1
0
mirror of https://github.com/quay/quay.git synced 2026-01-27 18:42:52 +03:00
Files
quay/data/model/test/test_user.py
Syed Ahmed e9161cb3ae robots: Add robot federation for keyless auth (PROJQUAY-7803) (#3207)
robots: Add robot federation for keyless auth (PROJQUAY-7652)

adds the ability to configure federated auth for robots by
using external OIDC providers. Each robot can be configured
to have multiple external OIDC providers as the source for
authentication.
2024-09-24 11:32:38 -04:00

400 lines
14 KiB
Python

from datetime import datetime
from unittest.mock import Mock
import pytest
from mock import patch
from auth.scopes import READ_REPO
from auth.test.mock_oidc_server import MOCK_PUBLIC_KEY, generate_mock_oidc_token
from data import model
from data.database import DeletedNamespace, EmailConfirmation, FederatedLogin, User
from data.fields import Credential
from data.model.notification import create_notification
from data.model.oauth import (
assign_token_to_user,
get_oauth_application_for_client_id,
get_token_assignment,
)
from data.model.organization import (
create_organization,
get_organization,
get_organizations,
)
from data.model.repository import create_repository
from data.model.team import add_user_to_team, create_team
from data.model.user import (
InvalidRobotException,
RobotAccountToken,
attach_federated_login,
create_robot,
create_user_noverify,
delete_namespace_via_marker,
delete_robot,
get_active_namespaces,
get_active_users,
get_estimated_robot_count,
get_matching_users,
get_public_repo_count,
get_pull_credentials,
get_quay_user_from_federated_login_name,
get_user,
list_namespace_robots,
lookup_robot,
mark_namespace_for_deletion,
retrieve_robot_token,
validate_reset_code,
verify_robot,
)
from data.queue import WorkQueue
from test.fixtures import *
from test.helpers import check_transitive_modifications
from util.security.instancekeys import InstanceKeys
from util.security.token import encode_public_private_token
from util.timedeltastring import convert_to_timedelta
def test_create_user_with_expiration(initialized_db):
with patch("data.model.config.app_config", {"DEFAULT_TAG_EXPIRATION": "1h"}):
user = create_user_noverify("foobar", "foo@example.com", email_required=False)
assert user.removed_tag_expiration_s == 60 * 60
@pytest.mark.parametrize(
"token_lifetime, time_since",
[
("1m", "2m"),
("2m", "1m"),
("1h", "1m"),
],
)
def test_validation_code(token_lifetime, time_since, initialized_db):
user = create_user_noverify("foobar", "foo@example.com", email_required=False)
created = datetime.now() - convert_to_timedelta(time_since)
verification_code, unhashed = Credential.generate()
confirmation = EmailConfirmation.create(
user=user, pw_reset=True, created=created, verification_code=verification_code
)
encoded = encode_public_private_token(confirmation.code, unhashed)
with patch("data.model.config.app_config", {"USER_RECOVERY_TOKEN_LIFETIME": token_lifetime}):
result = validate_reset_code(encoded)
expect_success = convert_to_timedelta(token_lifetime) >= convert_to_timedelta(time_since)
assert expect_success == (result is not None)
@pytest.mark.parametrize(
"disabled",
[
(True),
(False),
],
)
@pytest.mark.parametrize(
"deleted",
[
(True),
(False),
],
)
def test_get_active_users(disabled, deleted, initialized_db):
# Delete a user.
deleted_user = model.user.get_user("public")
queue = WorkQueue("testgcnamespace", lambda db: db.transaction())
mark_namespace_for_deletion(deleted_user, [], queue)
users = get_active_users(disabled=disabled, deleted=deleted)
deleted_found = [user for user in users if user.id == deleted_user.id]
assert bool(deleted_found) == (deleted and disabled)
for user in users:
if not disabled:
assert user.enabled
def test_mark_user_for_deletion(initialized_db):
def create_transaction(db):
return db.transaction()
# Create a user and then mark it for deletion.
user = create_user_noverify("foobar", "foo@example.com", email_required=False)
# Add some robots.
create_robot("foo", user)
create_robot("bar", user)
assert lookup_robot("foobar+foo") is not None
assert lookup_robot("foobar+bar") is not None
assert len(list(list_namespace_robots("foobar"))) == 2
# Add some federated user links.
attach_federated_login(user, "google", "someusername")
attach_federated_login(user, "github", "someusername")
assert FederatedLogin.select().where(FederatedLogin.user == user).count() == 2
assert FederatedLogin.select().where(FederatedLogin.service_ident == "someusername").exists()
# Add an oauth assigned token
org = model.organization.get_organization("buynlarge")
application = model.oauth.create_application(
org, "test", "http://foo/bar", "http://foo/bar/baz"
)
assigned_token = assign_token_to_user(
application, user, "http://foo/bar/baz", READ_REPO.scope, "token"
)
assert get_token_assignment(assigned_token.uuid, user, org) is not None
# Mark the user for deletion.
queue = WorkQueue("testgcnamespace", create_transaction)
mark_namespace_for_deletion(user, [], queue)
# Ensure the older user is still in the DB.
older_user = User.get(id=user.id)
assert older_user.username != "foobar"
# Ensure the robots are deleted.
with pytest.raises(InvalidRobotException):
assert lookup_robot("foobar+foo")
with pytest.raises(InvalidRobotException):
assert lookup_robot("foobar+bar")
assert len(list(list_namespace_robots(older_user.username))) == 0
# Ensure the federated logins are gone.
assert FederatedLogin.select().where(FederatedLogin.user == user).count() == 0
assert (
not FederatedLogin.select().where(FederatedLogin.service_ident == "someusername").exists()
)
# Ensure the oauth assigned token is gone
assert get_token_assignment(assigned_token.uuid, user, org) is None
# Ensure we can create a user with the same namespace again.
new_user = create_user_noverify("foobar", "foo@example.com", email_required=False)
assert new_user.id != user.id
# Ensure the older user is still in the DB.
assert User.get(id=user.id).username != "foobar"
def test_mark_organization_for_deletion(initialized_db):
def create_transaction(db):
return db.transaction()
# Create a user and then mark it for deletion.
user = get_user("devtable")
org = create_organization("foobar", "foobar@devtable.com", user)
# Add some robots.
create_robot("foo", org)
create_robot("bar", org)
assert lookup_robot("foobar+foo") is not None
assert lookup_robot("foobar+bar") is not None
assert len(list(list_namespace_robots("foobar"))) == 2
# Add some federated user links.
attach_federated_login(org, "google", "someusername")
attach_federated_login(org, "github", "someusername")
assert FederatedLogin.select().where(FederatedLogin.user == org).count() == 2
assert FederatedLogin.select().where(FederatedLogin.service_ident == "someusername").exists()
# Add an oauth assigned token and application
application1 = model.oauth.create_application(
org, "test", "http://foo/bar", "http://foo/bar/baz"
)
application = model.oauth.create_application(
org, "test2", "http://foo/bar", "http://foo/bar/baz"
)
assigned_token = assign_token_to_user(
application, user, "http://foo/bar/baz", READ_REPO.scope, "token"
)
assert get_token_assignment(assigned_token.uuid, user, org) is not None
# Mark the user for deletion.
queue = WorkQueue("testgcnamespace", create_transaction)
mark_namespace_for_deletion(org, [], queue)
# Ensure the older user is still in the DB.
older_user = User.get(id=org.id)
assert older_user.username != "foobar"
# Ensure the robots are deleted.
with pytest.raises(InvalidRobotException):
assert lookup_robot("foobar+foo")
with pytest.raises(InvalidRobotException):
assert lookup_robot("foobar+bar")
assert len(list(list_namespace_robots(older_user.username))) == 0
# Ensure the federated logins are gone.
assert FederatedLogin.select().where(FederatedLogin.user == org).count() == 0
assert (
not FederatedLogin.select().where(FederatedLogin.service_ident == "someusername").exists()
)
# Ensure the oauth assigned token is gone
assert get_oauth_application_for_client_id(application1.client_id) is None
assert get_oauth_application_for_client_id(application.client_id) is None
assert get_token_assignment(assigned_token.uuid, org, org) is None
# Ensure we can create a user with the same namespace again.
new_org = create_organization("foobar", "foobar@devtable.com", user)
assert new_org.id != org.id
# Ensure the older org is still in the DB.
assert User.get(id=org.id).username != "foobar"
def test_delete_namespace_via_marker(initialized_db):
def create_transaction(db):
return db.transaction()
# Create a user and then mark it for deletion.
user = create_user_noverify("foobar", "foo@example.com", email_required=False)
# Add some repositories.
create_repository("foobar", "somerepo", user)
create_repository("foobar", "anotherrepo", user)
# Mark the user for deletion.
queue = WorkQueue("testgcnamespace", create_transaction)
marker_id = mark_namespace_for_deletion(user, [], queue)
# Delete the user.
with check_transitive_modifications():
delete_namespace_via_marker(marker_id, [])
# Ensure the user was actually deleted.
with pytest.raises(User.DoesNotExist):
User.get(id=user.id)
with pytest.raises(DeletedNamespace.DoesNotExist):
DeletedNamespace.get(id=marker_id)
def test_delete_robot(initialized_db):
# Create a robot account.
user = create_user_noverify("foobar", "foo@example.com", email_required=False)
robot, _ = create_robot("foo", user)
# Add some notifications and other rows pointing to the robot.
create_notification("repo_push", robot)
team = create_team("someteam", get_organization("buynlarge"), "member")
add_user_to_team(robot, team)
# Ensure the robot exists.
assert lookup_robot(robot.username).id == robot.id
# Delete the robot.
delete_robot(robot.username)
# Ensure it is gone.
with pytest.raises(InvalidRobotException):
lookup_robot(robot.username)
def test_get_matching_users(initialized_db):
# Exact match.
for user in User.select().where(User.organization == False, User.robot == False):
assert list(get_matching_users(user.username))[0].username == user.username
# Prefix matching.
for user in User.select().where(User.organization == False, User.robot == False):
assert user.username in [r.username for r in get_matching_users(user.username[:2])]
def test_get_matching_users_with_same_prefix(initialized_db):
# Create a bunch of users with the same prefix.
for index in range(0, 20):
create_user_noverify("foo%s" % index, "foo%s@example.com" % index, email_required=False)
# For each user, ensure that lookup of the exact name is found first.
for index in range(0, 20):
username = "foo%s" % index
assert list(get_matching_users(username))[0].username == username
# Prefix matching.
found = list(get_matching_users("foo", limit=50))
assert len(found) == 20
def test_robot(initialized_db):
# Create a robot account.
user = create_user_noverify("foobar", "foo@example.com", email_required=False)
robot, token = create_robot("foo", user)
assert retrieve_robot_token(robot) == token
# Ensure we can retrieve its information.
found = lookup_robot("foobar+foo")
assert found == robot
creds = get_pull_credentials("foobar+foo")
assert creds is not None
assert creds["username"] == "foobar+foo"
assert creds["password"] == token
assert verify_robot("foobar+foo", token, None) == robot
with pytest.raises(InvalidRobotException):
assert verify_robot("foobar+foo", "someothertoken", None)
with pytest.raises(InvalidRobotException):
assert verify_robot("foobar+unknownbot", token, None)
def test_jwt_robot_token(initialized_db):
user = get_user("devtable")
org = create_organization("foobar", "foobar@devtable.com", user)
create_robot("foo", org)
mock_oidc_token = generate_mock_oidc_token(
issuer="quay", audience="quay-aud", subject="foobar+foo"
)
robot, token = create_robot("foo", user)
mock_instance_keys = Mock(InstanceKeys)
mock_instance_keys.service_name = "quay"
mock_instance_keys.get_service_key_public_key = Mock(return_value=MOCK_PUBLIC_KEY)
with patch("data.model.config.app_config", {"SERVER_HOSTNAME": "quay-aud"}):
verify_robot("foobar+foo", mock_oidc_token, mock_instance_keys)
def test_get_estimated_robot_count(initialized_db):
assert get_estimated_robot_count() >= RobotAccountToken.select().count()
def test_get_quay_user_from_federated_login_name(initialized_db):
username = "non-existant-user"
assert get_quay_user_from_federated_login_name(username) is None
devtable_username = "devtable"
# When quay.io username is same as SSO username
result = get_quay_user_from_federated_login_name(devtable_username)
assert result.username == devtable_username
freshuser_username = "freshuser" # SSO username
public_username = "public" # quayio username
# When quay.io username is different from SSO username
result = get_quay_user_from_federated_login_name(freshuser_username)
assert result.username == public_username
def test_get_public_repo_count(initialized_db):
username = "non-existant-user"
assert get_public_repo_count(username) == 0
public_username = "public"
assert get_public_repo_count(public_username) == 1
def test_get_active_namespaces(initialized_db):
num_active_users = len(get_active_users(disabled=False))
num_organizations = len(get_organizations())
active_namespaces = len(get_active_namespaces())
assert num_active_users > 0
assert num_organizations > 0
assert active_namespaces == num_active_users + num_organizations