1
0
mirror of https://github.com/quay/quay.git synced 2025-04-18 10:44:06 +03:00
quay/test/test_keystone_auth.py
Ivan Bazulic 1b27dd3c01
auth: Implement is_restricted_user for federated auth systems (PROJQUAY-8208) (#3400)
* auth: Implement is_restricted_user for OIDC and allow super users to create content regardless of set restriction (PROJQUAY-8208)
Currently, if OIDC is configured as the authentication mechanism and the restricted users feature is enabled, Quay returns a `501 Not Implemented` on invocation. Now, Quay properly checks the restricted user whitelist for federated users.
Additionally, if user restriction was in place and a super user's username was **not** explicitly whitelisted, that super user was not able to create new content inside the registry. Now, the username is explicitly checked in the UI so that super users can create both organizations and repos regardless of the restricted users whitelist.

* Add tests

* Add tests for usermanager
2024-11-25 14:47:03 -05:00

462 lines
16 KiB
Python

import json
import os
import unittest
import unittest.util
from contextlib import contextmanager
from typing import Optional
import requests
from flask import Flask, abort, make_response, request
from mock import patch
from app import app as realapp
from app import usermanager
from data.users.keystone import get_keystone_users
from features import FeatureNameValue
from initdb import finished_database_for_testing, setup_database_for_testing
from test.helpers import liveserver_app
# Base port for the fake Keystone servers; _create_app increments it on every call,
# so the first fake server listens on 5002.
_PORT_NUMBER = 5001
@contextmanager
def fake_keystone(version=3, requires_email=True):
    """
    Serve a fake Keystone implementation for the duration of the ``with`` block.

    A fake Keystone Flask app is built with ``_create_app`` and a Keystone users handler
    (obtained from ``get_keystone_users``) is pointed at it. The handler is yielded while
    the app is being served through ``liveserver_app``.

    Usage:
      with fake_keystone(version) as keystone_auth:
        # Make keystone_auth requests.

    Args:
        version: Keystone API version. ``2`` selects the "/v2.0/auth" endpoint; any other
            value selects "/v3".
        requires_email: passed on to both the fake app and the users handler.
    """
    app, listen_port = _create_app(requires_email)

    api_path = "/v2.0/auth" if version == 2 else "/v3"
    auth_url = "http://" + app.config["SERVER_HOSTNAME"] + api_path

    # The handler is constructed before the server starts, exactly as callers expect;
    # it is only used once the live server is up.
    handler = get_keystone_users(
        version,
        auth_url,
        "adminuser",
        "adminpass",
        "admintenant",
        requires_email=requires_email,
    )

    with liveserver_app(app, listen_port):
        yield handler
def _create_app(requires_email=True):
    """
    Build a Flask app imitating the parts of the Keystone v2 and v3 HTTP APIs used by the tests.

    Each call increments the module-level ``_PORT_NUMBER`` so that every fake server is
    assigned its own port.

    Args:
        requires_email: when True, the single-user and group-member endpoints add an
            ``email`` field to the user payloads they return.

    Returns:
        A ``(flask_app, port)`` tuple.
    """
    global _PORT_NUMBER
    _PORT_NUMBER += 1

    hostname = "localhost:%s" % _PORT_NUMBER
    server_url = "http://" + hostname

    users = [
        {"username": "adminuser", "name": "Admin User", "password": "adminpass"},
        {"username": "cool.user", "name": "Cool User", "password": "password"},
        {"username": "some.neat.user", "name": "Neat User", "password": "foobar"},
    ]

    # Feature Flag: Email-based Blacklisting
    # Additional mocked users, one per test domain. The e-mail address doubles as the
    # username (simplifies consistent querying in tests) and the display name is the
    # domain without dots, e.g. blacklisted.com => blacklistedcom.
    users.extend(
        {
            "username": "foo@" + domain,
            "name": domain.replace(".", ""),
            "email": "foo@" + domain,
            "password": "somepass",
        }
        for domain in (
            "blacklisted.com",
            "blacklisted.net",
            "blacklisted.org",
            "notblacklisted.com",
            "mail.blacklisted.com",
        )
    )

    groups = [
        {
            "id": "somegroupid",
            "name": "somegroup",
            "description": "Hi there!",
            "members": ["adminuser", "cool.user"],
        },
        {
            "id": "admintenant",
            "name": "somegroup",
            "description": "Hi there!",
            "members": ["adminuser", "cool.user"],
        },
    ]

    def _lookup(records, key, wanted):
        # First record whose `key` field equals `wanted`, or None when nothing matches.
        return next((record for record in records if record[key] == wanted), None)

    def _email_of(user):
        # Explicit e-mail when the user has one, otherwise <username>@example.com.
        return user.get("email") or user["username"] + "@example.com"

    def _get_user(username):
        found = _lookup(users, "username", username)
        if found is None:
            return None

        payload = {"id": username, "name": username}
        if requires_email:
            payload["email"] = _email_of(found)
        return payload

    ks_app = Flask("testks")
    ks_app.config["SERVER_HOSTNAME"] = hostname
    if os.environ.get("DEBUG") == "true":
        ks_app.config["DEBUG"] = True

    @ks_app.route("/v2.0/admin/users/<userid>", methods=["GET"])
    def getuser(userid):
        found = _lookup(users, "username", userid)
        if found is None:
            abort(404)

        payload = {"name": userid}
        if requires_email:
            payload["email"] = _email_of(found)
        return json.dumps({"user": payload})

    # v2 referred to all groups as tenants, so replace occurrences of 'group' with 'tenant'
    @ks_app.route("/v2.0/admin/tenants/<tenant>/users", methods=["GET"])
    def getv2_tenant_members(tenant):
        return getv3groupmembers(tenant)

    @ks_app.route("/v3/identity/groups/<groupid>/users", methods=["GET"])
    def getv3groupmembers(groupid):
        group = _lookup(groups, "id", groupid)
        if group is None:
            abort(404)

        members = [_get_user(member) for member in group["members"]]
        return json.dumps({"links": {}, "users": members})

    @ks_app.route("/v3/identity/groups/<groupid>", methods=["GET"])
    def getv3group(groupid):
        group = _lookup(groups, "id", groupid)
        if group is None:
            abort(404)

        payload = {
            "description": group["description"],
            "domain_id": "default",
            "id": groupid,
            "links": {},
            "name": group["name"],
        }
        return json.dumps({"group": payload})

    @ks_app.route("/v3/identity/users/<userid>", methods=["GET"])
    def getv3user(userid):
        found = _lookup(users, "username", userid)
        if found is None:
            abort(404)

        payload = {
            "domain_id": "default",
            "enabled": True,
            "id": found["username"],
            "links": {},
            "name": found["username"],
        }
        if requires_email:
            payload["email"] = _email_of(found)
        return json.dumps({"user": payload})

    @ks_app.route("/v3/identity/users", methods=["GET"])
    def v3identity():
        # An absent or empty `name` query argument lists every user; otherwise it is
        # treated as a username prefix.
        prefix = request.args.get("name")
        matching = [
            {
                "domain_id": "default",
                "enabled": True,
                "id": user["username"],
                "links": {},
                "name": user["username"],
                "email": _email_of(user),
            }
            for user in users
            if not prefix or user["username"].startswith(prefix)
        ]
        return json.dumps({"users": matching})

    @ks_app.route("/v3/auth/tokens", methods=["POST"])
    def v3tokens():
        creds = request.json["auth"]["identity"]["password"]["user"]
        matched = next(
            (
                user
                for user in users
                if creds["name"] == user["username"] and creds["password"] == user["password"]
            ),
            None,
        )
        if matched is None:
            abort(403)

        identity_service = {
            "endpoints": [
                {
                    "url": server_url + "/v3/identity",
                    "region": "RegionOne",
                    "interface": "admin",
                    "id": "29beb2f1567642eb810b042b6719ea88",
                },
            ],
            "type": "identity",
            "id": "bd73972c0e14fb69bae8ff76e112a90",
            "name": "keystone",
        }
        token = {
            "methods": ["password"],
            "roles": [
                {"id": "9fe2ff9ee4384b1894a90878d3e92bab", "name": "_member_"},
                {"id": "c703057be878458588961ce9a0ce686b", "name": "admin"},
            ],
            "project": {
                "domain": {"id": "default", "name": "Default"},
                "id": "8538a3f13f9541b28c2620eb19065e45",
                "name": "admin",
            },
            "catalog": [identity_service],
            "extras": {},
            "user": {
                "domain": {"id": "default", "name": "Default"},
                "id": matched["username"],
                "name": "admin",
            },
            "audit_ids": ["yRt0UrxJSs6-WYJgwEMMmg"],
            "issued_at": "2014-06-16T22:24:26.089380",
            "expires_at": "2020-06-16T23:24:26Z",
        }

        response = make_response(json.dumps({"token": token}), 200)
        response.headers["X-Subject-Token"] = "sometoken"
        return response

    @ks_app.route("/v2.0/auth/tokens", methods=["POST"])
    def tokens():
        creds = request.json["auth"]["passwordCredentials"]
        matched = next(
            (
                user
                for user in users
                if creds["username"] == user["username"]
                and creds["password"] == user["password"]
            ),
            None,
        )
        if matched is None:
            abort(403)

        access = {
            "token": {
                "issued_at": "2014-06-16T22:24:26.089380",
                "expires": "2020-06-16T23:24:26Z",
                "id": creds["username"],
                "tenant": {"id": "sometenant"},
            },
            "serviceCatalog": [
                {
                    "endpoints": [
                        {
                            "adminURL": server_url + "/v2.0/admin",
                        }
                    ],
                    "endpoints_links": [],
                    "type": "identity",
                    "name": "admin",
                },
            ],
            "user": {
                "username": creds["username"],
                "roles_links": [],
                "id": creds["username"],
                "roles": [],
                "name": matched["name"],
            },
            "metadata": {
                "is_admin": 0,
                "roles": [],
            },
        }
        return json.dumps({"access": access})

    return ks_app, _PORT_NUMBER
class KeystoneAuthTestsMixin:
    """
    Credential-verification tests shared by all fake-Keystone test cases.

    Concrete test cases combine this mixin with ``unittest.TestCase`` and provide
    ``fake_keystone`` (the server/handler context manager) and ``emails`` (whether
    verified users are expected to carry an e-mail address).
    """

    maxDiff: Optional[int] = None

    @property
    def emails(self):
        """Whether users returned by the handler are expected to have an e-mail."""
        raise NotImplementedError

    def fake_keystone(self):
        """Return a context manager yielding the Keystone users handler under test."""
        raise NotImplementedError

    def setUp(self):
        setup_database_for_testing(self)
        self.session = requests.Session()

    def tearDown(self):
        # Close the HTTP session opened in setUp so it is not leaked between tests.
        # The database teardown always runs, even if closing the session fails.
        try:
            self.session.close()
        finally:
            finished_database_for_testing(self)

    def test_invalid_user(self):
        # A username the fake server does not know must not verify.
        with self.fake_keystone() as keystone:
            (user, _) = keystone.verify_credentials("unknownuser", "password")
            self.assertIsNone(user)

    def test_invalid_password(self):
        # A known username with the wrong password must not verify.
        with self.fake_keystone() as keystone:
            (user, _) = keystone.verify_credentials("cool.user", "notpassword")
            self.assertIsNone(user)

    def test_cooluser(self):
        with self.fake_keystone() as keystone:
            (user, _) = keystone.verify_credentials("cool.user", "password")
            self.assertEqual(user.username, "cool.user")
            # The e-mail is only expected when the configuration requires it.
            self.assertEqual(user.email, "cool.user@example.com" if self.emails else None)

    def test_neatuser(self):
        with self.fake_keystone() as keystone:
            (user, _) = keystone.verify_credentials("some.neat.user", "foobar")
            self.assertEqual(user.username, "some.neat.user")
            # The e-mail is only expected when the configuration requires it.
            self.assertEqual(user.email, "some.neat.user@example.com" if self.emails else None)
class KeystoneV2AuthNoEmailTests(KeystoneAuthTestsMixin, unittest.TestCase):
    """Runs the shared auth tests against fake Keystone v2 with e-mails not required."""

    @property
    def emails(self):
        # Verified users are expected to come back without an e-mail address.
        return False

    def fake_keystone(self):
        return fake_keystone(version=2, requires_email=False)
class KeystoneV3AuthNoEmailTests(KeystoneAuthTestsMixin, unittest.TestCase):
    """Runs the shared auth tests against fake Keystone v3 with e-mails not required."""

    @property
    def emails(self):
        # Verified users are expected to come back without an e-mail address.
        return False

    def fake_keystone(self):
        return fake_keystone(version=3, requires_email=False)
class KeystoneV2AuthTests(KeystoneAuthTestsMixin, unittest.TestCase):
    """Runs the shared auth tests against fake Keystone v2 with e-mails required."""

    @property
    def emails(self):
        # Verified users are expected to carry an e-mail address.
        return True

    def fake_keystone(self):
        return fake_keystone(version=2, requires_email=True)
class KeystoneV3AuthTests(KeystoneAuthTestsMixin, unittest.TestCase):
    """
    Runs the shared auth tests against fake Keystone v3 with e-mails required, plus
    v3-only tests for user query, user linking and group lookups.
    """

    def fake_keystone(self):
        return fake_keystone(3, requires_email=True)

    # Declared as a property to match the mixin and the sibling test classes: the shared
    # tests read `self.emails` as a value. As a plain method it would be a bound method
    # object, which is always truthy regardless of what the method returns.
    @property
    def emails(self):
        return True

    def test_query(self):
        with self.fake_keystone() as keystone:
            # Lookup cool.
            (response, federated_id, error_message) = keystone.query_users("cool")
            self.assertIsNone(error_message)
            self.assertEqual(1, len(response))
            self.assertEqual("keystone", federated_id)

            user_info = response[0]
            self.assertEqual("cool.user", user_info.username)

            # Lookup unknown.
            (response, federated_id, error_message) = keystone.query_users("unknown")
            self.assertIsNone(error_message)
            self.assertEqual(0, len(response))
            self.assertEqual("keystone", federated_id)

    def test_link_user(self):
        with self.fake_keystone() as keystone:
            # Link cool.user; the linked account is expected under the username "cool_user".
            user, error_message = keystone.link_user("cool.user")
            self.assertIsNone(error_message)
            self.assertIsNotNone(user)
            self.assertEqual("cool_user", user.username)
            self.assertEqual("cool.user@example.com", user.email)

            # Link again. Should return the same user record.
            user_again, _ = keystone.link_user("cool.user")
            self.assertEqual(user_again.id, user.id)

            # Confirm the linked user with its Keystone password.
            result, _ = keystone.confirm_existing_user("cool_user", "password")
            self.assertIsNotNone(result)
            self.assertEqual("cool_user", result.username)

    def test_check_group_lookup_args(self):
        with self.fake_keystone() as keystone:
            # No group id supplied at all.
            (status, err) = keystone.check_group_lookup_args({})
            self.assertFalse(status)
            self.assertEqual("Missing group_id", err)

            # A group id the fake server does not know.
            (status, err) = keystone.check_group_lookup_args({"group_id": "unknownid"})
            self.assertFalse(status)
            self.assertEqual("Group not found", err)

            # A group id the fake server knows.
            (status, err) = keystone.check_group_lookup_args({"group_id": "somegroupid"})
            self.assertTrue(status)
            self.assertIsNone(err)

    def test_iterate_group_members(self):
        with self.fake_keystone() as keystone:
            (itt, err) = keystone.iterate_group_members({"group_id": "somegroupid"})
            self.assertIsNone(err)

            # Sort so the assertions do not depend on the order members are yielded in.
            results = sorted(itt)
            self.assertEqual(2, len(results))
            self.assertEqual("adminuser", results[0][0].id)
            self.assertEqual("cool.user", results[1][0].id)
class KeystoneRestrictedUsers(KeystoneAuthTestsMixin, unittest.TestCase):
    """
    Runs the shared auth tests against fake Keystone v3 and checks how ``usermanager``
    classifies a Keystone user while the RESTRICTED_USERS feature is patched on.
    """

    def fake_keystone(self):
        return fake_keystone(3, requires_email=True)

    # Declared as a property to match the mixin and the sibling test classes: the shared
    # tests read `self.emails` as a value. As a plain method it would be a bound method
    # object, which is always truthy regardless of what the method returns.
    @property
    def emails(self):
        return True

    def test_restricted_users(self):
        # Turn the RESTRICTED_USERS feature on for the duration of the test.
        with patch("features.RESTRICTED_USERS", FeatureNameValue("RESTRICTED_USERS", True)):
            with self.fake_keystone() as keystone:
                # Lookup cool.
                (response, federated_id, error_message) = keystone.query_users("cool")
                self.assertIsNone(error_message)
                self.assertEqual(1, len(response))
                self.assertEqual("keystone", federated_id)

                user_info = response[0]
                self.assertEqual("cool.user", user_info.username)

                # unittest assertions are used instead of bare `assert` statements so the
                # checks still run under `python -O`; equality comparison is kept as-is.

                # The looked-up user must not be a super user.
                check_is_superuser = usermanager.is_superuser(user_info.username)
                self.assertEqual(False, check_is_superuser)

                # With the feature on, the looked-up user must be reported as restricted.
                check_is_restricted_user = usermanager.is_restricted_user(user_info.username)
                self.assertEqual(True, check_is_restricted_user)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()