1
0
mirror of https://github.com/quay/quay.git synced 2025-04-18 10:44:06 +03:00
quay/test/test_external_jwt_authn.py
Ivan Bazulic 1b27dd3c01
auth: Implement is_restricted_user for federated auth systems (PROJQUAY-8208) (#3400)
* auth: Implement is_restricted_user for OIDC and allow super users to create content regardless of set restriction (PROJQUAY-8208)
Currently, if OIDC is set as an authentication mechanism and restricted users is set, Quay will return a `501 Not Implemented` on invocation. Now, Quay will properly check the restricted user whitelist for federated users.
Additionally, if user restriction is in place and super user's username was **not** explicitly whitelisted, super users would not be able to create new content inside the registry. Now, the username is explicitly checked in the UI to allow super users to create both organizations and repos regardless of restricted users whitelist.

* Add tests

* Add tests for usermanager
2024-11-25 14:47:03 -05:00

370 lines
13 KiB
Python

import base64
import unittest
from contextlib import contextmanager
from datetime import datetime, timedelta
from tempfile import NamedTemporaryFile
from typing import Optional
import jwt
import requests
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from flask import Flask, jsonify, make_response, request
from mock import patch
from app import app, usermanager
from data.users import ExternalJWTAuthN
from features import FeatureNameValue
from initdb import finished_database_for_testing, setup_database_for_testing
from test.helpers import liveserver_app
_PORT_NUMBER = 5001
@contextmanager
def fake_jwt(requires_email=True):
"""
Context manager which instantiates and runs a webserver with a fake JWT implementation, until
the result is yielded.
Usage:
with fake_jwt() as jwt_auth:
# Make jwt_auth requests.
"""
jwt_app, port, public_key = _create_app(requires_email)
server_url = "http://" + jwt_app.config["SERVER_HOSTNAME"]
verify_url = server_url + "/user/verify"
query_url = server_url + "/user/query"
getuser_url = server_url + "/user/get"
jwt_auth = ExternalJWTAuthN(
verify_url,
query_url,
getuser_url,
"authy",
"",
app.config["HTTPCLIENT"],
300,
public_key_path=public_key.name,
requires_email=requires_email,
)
with liveserver_app(jwt_app, port):
yield jwt_auth
def _generate_certs():
public_key = NamedTemporaryFile(delete=True)
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
pubkey = private_key.public_key()
public_key.write(
pubkey.public_bytes(
encoding=serialization.Encoding.OpenSSH,
format=serialization.PublicFormat.OpenSSH,
)
)
public_key.seek(0)
return (public_key, private_pem)
def _create_app(emails=True):
global _PORT_NUMBER
_PORT_NUMBER = _PORT_NUMBER + 1
public_key, private_key_data = _generate_certs()
users = [
{"name": "cool.user", "email": "user@domain.com", "password": "password"},
{"name": "some.neat.user", "email": "neat@domain.com", "password": "foobar"},
# Feature Flag: Email Blacklisting
{"name": "blacklistedcom", "email": "foo@blacklisted.com", "password": "somepass"},
{"name": "blacklistednet", "email": "foo@blacklisted.net", "password": "somepass"},
{"name": "blacklistedorg", "email": "foo@blacklisted.org", "password": "somepass"},
{"name": "notblacklistedcom", "email": "foo@notblacklisted.com", "password": "somepass"},
]
jwt_app = Flask("testjwt")
jwt_app.config["SERVER_HOSTNAME"] = "localhost:%s" % _PORT_NUMBER
def _get_basic_auth():
data = base64.b64decode(request.headers["Authorization"][len("Basic ") :]).decode("utf-8")
return data.split(":", 1)
@jwt_app.route("/user/query", methods=["GET"])
def query_users():
query = request.args.get("query")
results = []
for user in users:
if user["name"].startswith(query):
result = {
"username": user["name"],
}
if emails:
result["email"] = user["email"]
results.append(result)
token_data = {
"iss": "authy",
"aud": "quay.io/jwtauthn/query",
"nbf": datetime.utcnow(),
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + timedelta(seconds=60),
"results": results,
}
encoded = jwt.encode(token_data, private_key_data, "RS256")
return jsonify({"token": encoded})
@jwt_app.route("/user/get", methods=["GET"])
def get_user():
username = request.args.get("username")
if username == "disabled":
return make_response("User is currently disabled", 401)
for user in users:
if user["name"] == username or user["email"] == username:
token_data = {
"iss": "authy",
"aud": "quay.io/jwtauthn/getuser",
"nbf": datetime.utcnow(),
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + timedelta(seconds=60),
"sub": user["name"],
"email": user["email"],
}
encoded = jwt.encode(token_data, private_key_data, "RS256")
return jsonify({"token": encoded})
return make_response("Invalid username or password", 404)
@jwt_app.route("/user/verify", methods=["GET"])
def verify_user():
username, password = _get_basic_auth()
if username == "disabled":
return make_response("User is currently disabled", 401)
for user in users:
if user["name"] == username or user["email"] == username:
if password != user["password"]:
return make_response("", 404)
token_data = {
"iss": "authy",
"aud": "quay.io/jwtauthn",
"nbf": datetime.utcnow(),
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + timedelta(seconds=60),
"sub": user["name"],
"email": user["email"],
}
encoded = jwt.encode(token_data, private_key_data, "RS256")
return jsonify({"token": encoded})
return make_response("Invalid username or password", 404)
jwt_app.config["TESTING"] = True
return jwt_app, _PORT_NUMBER, public_key
class JWTAuthTestMixin:
"""
Mixin defining all the JWT auth tests.
"""
maxDiff: Optional[int] = None
@property
def emails(self):
raise NotImplementedError
def setUp(self):
setup_database_for_testing(self)
self.app = app.test_client()
self.ctx = app.test_request_context()
self.ctx.__enter__()
self.session = requests.Session()
def tearDown(self):
finished_database_for_testing(self)
self.ctx.__exit__(True, None, None)
def test_verify_and_link_user(self):
with fake_jwt(self.emails) as jwt_auth:
result, error_message = jwt_auth.verify_and_link_user("invaliduser", "foobar")
self.assertEqual("Invalid username or password", error_message)
self.assertIsNone(result)
result, _ = jwt_auth.verify_and_link_user("cool.user", "invalidpassword")
self.assertIsNone(result)
result, _ = jwt_auth.verify_and_link_user("cool.user", "password")
self.assertIsNotNone(result)
self.assertEqual("cool_user", result.username)
result, _ = jwt_auth.verify_and_link_user("some.neat.user", "foobar")
self.assertIsNotNone(result)
self.assertEqual("some_neat_user", result.username)
def test_confirm_existing_user(self):
with fake_jwt(self.emails) as jwt_auth:
# Create the users in the DB.
result, _ = jwt_auth.verify_and_link_user("cool.user", "password")
self.assertIsNotNone(result)
result, _ = jwt_auth.verify_and_link_user("some.neat.user", "foobar")
self.assertIsNotNone(result)
# Confirm a user with the same internal and external username.
result, _ = jwt_auth.confirm_existing_user("cool_user", "invalidpassword")
self.assertIsNone(result)
result, _ = jwt_auth.confirm_existing_user("cool_user", "password")
self.assertIsNotNone(result)
self.assertEqual("cool_user", result.username)
# Fail to confirm the *external* username, which should return nothing.
result, _ = jwt_auth.confirm_existing_user("some.neat.user", "password")
self.assertIsNone(result)
# Now confirm the internal username.
result, _ = jwt_auth.confirm_existing_user("some_neat_user", "foobar")
self.assertIsNotNone(result)
self.assertEqual("some_neat_user", result.username)
def test_disabled_user_custom_error(self):
with fake_jwt(self.emails) as jwt_auth:
result, error_message = jwt_auth.verify_and_link_user("disabled", "password")
self.assertIsNone(result)
self.assertEqual("User is currently disabled", error_message)
def test_query(self):
with fake_jwt(self.emails) as jwt_auth:
# Lookup `cool`.
results, identifier, error_message = jwt_auth.query_users("cool")
self.assertIsNone(error_message)
self.assertEqual("jwtauthn", identifier)
self.assertEqual(1, len(results))
self.assertEqual("cool.user", results[0].username)
self.assertEqual("user@domain.com" if self.emails else None, results[0].email)
# Lookup `some`.
results, identifier, error_message = jwt_auth.query_users("some")
self.assertIsNone(error_message)
self.assertEqual("jwtauthn", identifier)
self.assertEqual(1, len(results))
self.assertEqual("some.neat.user", results[0].username)
self.assertEqual("neat@domain.com" if self.emails else None, results[0].email)
# Lookup `unknown`.
results, identifier, error_message = jwt_auth.query_users("unknown")
self.assertIsNone(error_message)
self.assertEqual("jwtauthn", identifier)
self.assertEqual(0, len(results))
def test_get_user(self):
with fake_jwt(self.emails) as jwt_auth:
# Lookup cool.user.
result, error_message = jwt_auth.get_user("cool.user")
self.assertIsNone(error_message)
self.assertIsNotNone(result)
self.assertEqual("cool.user", result.username)
self.assertEqual("user@domain.com", result.email)
# Lookup some.neat.user.
result, error_message = jwt_auth.get_user("some.neat.user")
self.assertIsNone(error_message)
self.assertIsNotNone(result)
self.assertEqual("some.neat.user", result.username)
self.assertEqual("neat@domain.com", result.email)
# Lookup unknown user.
result, error_message = jwt_auth.get_user("unknownuser")
self.assertIsNone(result)
def test_link_user(self):
with fake_jwt(self.emails) as jwt_auth:
# Link cool.user.
user, error_message = jwt_auth.link_user("cool.user")
self.assertIsNone(error_message)
self.assertIsNotNone(user)
self.assertEqual("cool_user", user.username)
# Link again. Should return the same user record.
user_again, _ = jwt_auth.link_user("cool.user")
self.assertEqual(user_again.id, user.id)
# Confirm cool.user.
result, _ = jwt_auth.confirm_existing_user("cool_user", "password")
self.assertIsNotNone(result)
self.assertEqual("cool_user", result.username)
def test_link_invalid_user(self):
with fake_jwt(self.emails) as jwt_auth:
user, error_message = jwt_auth.link_user("invaliduser")
self.assertIsNotNone(error_message)
self.assertIsNone(user)
def test_restricted_users(self):
with patch("features.RESTRICTED_USERS", FeatureNameValue("RESTRICTED_USERS", True)):
with fake_jwt(self.emails) as jwt_auth:
user, error_message = jwt_auth.get_user("cool.user")
self.assertIsNone(error_message)
self.assertIsNotNone(user)
# check if the user is superuser
check_is_superuser = usermanager.is_superuser(user.username)
assert check_is_superuser == False
# check if the user is restricted
check_is_restricted_user = usermanager.is_restricted_user(user.username)
assert check_is_restricted_user == True
class JWTAuthNoEmailTestCase(JWTAuthTestMixin, unittest.TestCase):
"""
Test cases for JWT auth, with emails disabled.
"""
@property
def emails(self):
return False
class JWTAuthTestCase(JWTAuthTestMixin, unittest.TestCase):
"""
Test cases for JWT auth, with emails enabled.
"""
@property
def emails(self):
return True
if __name__ == "__main__":
unittest.main()