1
0
mirror of https://github.com/quay/quay.git synced 2025-04-18 10:44:06 +03:00

auth: Implement is_restricted_user for federated auth systems (PROJQUAY-8208) (#3400)

* auth: Implement is_restricted_user for OIDC and allow super users to create content regardless of set restriction (PROJQUAY-8208)
Currently, if OIDC is set as an authentication mechanism and restricted users is set, Quay will return a `501 Not Implemented` on invocation. Now, Quay will properly check the restricted user whitelist for federated users.
Additionally, if user restriction is in place and super user's username was **not** explicitly whitelisted, super users would not be able to create new content inside the registry. Now, the username is explicitly checked in the UI to allow super users to create both organizations and repos regardless of restricted users whitelist.

* Add tests

* Add tests for usermanager
This commit is contained in:
Ivan Bazulic 2024-11-25 14:47:03 -05:00 committed by GitHub
parent a2f02db8ca
commit 1b27dd3c01
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 190 additions and 19 deletions

View File

@ -405,6 +405,12 @@ class UserManager(object):
return self.state.is_restricted_user(username)
def is_superuser(self, username):
if not features.SUPER_USERS:
return False
return self.state.is_superuser(username)
class FederatedUserManager(ConfigUserManager):
"""
@ -438,7 +444,9 @@ class FederatedUserManager(ConfigUserManager):
if super().restricted_whitelist_is_set() and not super().is_restricted_user(username):
return False
return self.federated_users.is_restricted_user(username)
return self.federated_users.is_restricted_user(username) or super().is_restricted_user(
username
)
def has_restricted_users(self) -> bool:
return self.federated_users.has_restricted_users() or super().has_restricted_users()

View File

@ -150,16 +150,16 @@ class ExternalJWTAuthN(FederatedUsers):
return (None, "Exception when decoding returned JWT")
def is_superuser(self, username):
raise NotImplementedError()
return None
def is_global_readonly_superuser(self, username):
raise NotImplementedError()
return None
def has_superusers(self):
raise NotImplementedError()
def is_restricted_user(self, username):
raise NotImplementedError()
return None
def has_restricted_users(self):
raise NotImplementedError()

View File

@ -49,6 +49,12 @@ class OIDCUsers(FederatedUsers):
"""
return None
def is_restricted_user(self, username):
"""
Checks whether the currently logged in user is restricted.
"""
return None
def iterate_group_members(self, group_lookup_args, page_size=None, disable_pagination=False):
"""
Used by teamSync worker, unsupported for oidc team sync

View File

@ -356,16 +356,16 @@ class KeystoneV3Users(FederatedUsers):
)
def is_superuser(self, username):
raise NotImplementedError()
return None
def is_global_readonly_superuser(self, username):
raise NotImplementedError()
return None
def has_superusers(self, username):
raise NotImplementedError()
return None
def is_restricted_user(self, username):
raise NotImplementedError()
return None
def has_restricted_users(self):
raise NotImplementedError()
return None

View File

@ -158,8 +158,10 @@ class OrganizationList(ApiResource):
org_data = request.get_json()
existing = None
if features.RESTRICTED_USERS and usermanager.is_restricted_user(user.username):
raise Unauthorized()
# Super users should be able to create new orgs regardless of user restriction
if user.username not in app.config.get("SUPER_USERS", None):
if features.RESTRICTED_USERS and usermanager.is_restricted_user(user.username):
raise Unauthorized()
try:
existing = model.organization.get_organization(org_data["name"])

View File

@ -10,6 +10,7 @@ from flask import abort, request
import features
from app import (
app,
dockerfile_build_queue,
repository_gc_queue,
tuf_metadata_api,
@ -144,6 +145,7 @@ class RepositoryList(ApiResource):
features.RESTRICTED_USERS
and usermanager.is_restricted_user(owner.username)
and owner.username == namespace_name
and owner.username not in app.config.get("SUPER_USERS", None)
):
repository_name = req["repository"]

View File

@ -1,12 +1,18 @@
from test.fixtures import *
import pytest
from mock import patch
from app import app as realapp
from data import model
from endpoints.api import api
from endpoints.api.organization import Organization, OrganizationCollaboratorList
from endpoints.api.organization import (
Organization,
OrganizationCollaboratorList,
OrganizationList,
)
from endpoints.api.test.shared import conduct_api_call
from endpoints.test.shared import client_with_identity
from features import FeatureNameValue
from test.fixtures import *
@pytest.mark.parametrize(
@ -44,3 +50,36 @@ def test_get_organization_collaborators(app):
if collaborator["name"] == "outsideorg":
assert "orgrepo" in collaborator["repositories"]
assert "anotherorgrepo" not in collaborator["repositories"]
def test_create_org_as_superuser_with_restricted_users_set(app):
body = {
"name": "buyandlarge",
"email": "some@email.com",
}
# check if super users can create organizations regardles of restricted users set
with patch("features.RESTRICTED_USERS", FeatureNameValue("RESTRICTED_USERS", True)):
with client_with_identity("devtable", app) as cl:
resp = conduct_api_call(
cl, OrganizationList, "POST", None, body=body, expected_code=201
)
# unset all super users temporarily
superuser_list = realapp.config.get("SUPER_USERS")
realapp.config["SUPER_USERS"] = []
body = {
"name": "buyandlargetimes2",
"email": "some1@email.com",
}
# check if users who are not super users can create organizations when restricted users is set
with patch("features.RESTRICTED_USERS", FeatureNameValue("RESTRICTED_USERS", True)):
with client_with_identity("devtable", app) as cl:
resp = conduct_api_call(
cl, OrganizationList, "POST", None, body=body, expected_code=403
)
# reset superuser list to previous value
realapp.config["SUPER_USERS"] = superuser_list

View File

@ -1,13 +1,15 @@
from test.fixtures import *
import pytest
from mock import ANY, MagicMock, patch
from app import app as realapp
from app import authentication, usermanager
from data import database, model
from data.users import UserAuthentication, UserManager
from endpoints.api.repository import Repository, RepositoryList, RepositoryTrust
from endpoints.api.test.shared import conduct_api_call
from endpoints.test.shared import client_with_identity
from features import FeatureNameValue
from test.fixtures import *
@pytest.mark.parametrize(
@ -208,3 +210,46 @@ def test_delete_repo(initialized_db, app):
marker = database.DeletedRepository.get()
assert marker.original_name == "simple"
assert marker.queue_id
@pytest.mark.parametrize(
"namespace, expected_status",
[
("devtable", 201),
("buynlarge", 201),
],
)
def test_create_repo_with_restricted_users_enabled_as_superuser(namespace, expected_status, app):
with patch("features.RESTRICTED_USERS", FeatureNameValue("RESTRICTED_USERS", True)):
with client_with_identity("devtable", app) as cl:
body = {
"namespace": namespace,
"repository": "somerepo",
"visibility": "public",
"description": "foobar",
}
resp = conduct_api_call(
cl, RepositoryList, "POST", None, body, expected_code=expected_status
).json
if expected_status == 201:
assert resp["name"] == "somerepo"
assert model.repository.get_repository(namespace, "somerepo").name == "somerepo"
def test_create_repo_with_restricted_users_enabled_as_normal_user(app):
# reset super user list
super_users = realapp.config.get("SUPER_USERS")
realapp.config["SUPER_USERS"] = []
# try creating a repo with a random user
with patch("features.RESTRICTED_USERS", FeatureNameValue("RESTRICTED_USERS", True)):
with client_with_identity("devtable", app) as cl:
body = {
"namespace": "devtable",
"repository": "somerepo2",
"visibility": "public",
"description": "foobarbaz",
}
resp = conduct_api_call(cl, RepositoryList, "POST", None, body, expected_code=403)

View File

@ -3,7 +3,6 @@ import unittest
from contextlib import contextmanager
from datetime import datetime, timedelta
from tempfile import NamedTemporaryFile
from test.helpers import liveserver_app
from typing import Optional
import jwt
@ -11,10 +10,13 @@ import requests
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from flask import Flask, jsonify, make_response, request
from mock import patch
from app import app
from app import app, usermanager
from data.users import ExternalJWTAuthN
from features import FeatureNameValue
from initdb import finished_database_for_testing, setup_database_for_testing
from test.helpers import liveserver_app
_PORT_NUMBER = 5001
@ -327,6 +329,21 @@ class JWTAuthTestMixin:
self.assertIsNotNone(error_message)
self.assertIsNone(user)
def test_restricted_users(self):
with patch("features.RESTRICTED_USERS", FeatureNameValue("RESTRICTED_USERS", True)):
with fake_jwt(self.emails) as jwt_auth:
user, error_message = jwt_auth.get_user("cool.user")
self.assertIsNone(error_message)
self.assertIsNotNone(user)
# check if the user is superuser
check_is_superuser = usermanager.is_superuser(user.username)
assert check_is_superuser == False
# check if the user is restricted
check_is_restricted_user = usermanager.is_restricted_user(user.username)
assert check_is_restricted_user == True
class JWTAuthNoEmailTestCase(JWTAuthTestMixin, unittest.TestCase):
"""

View File

@ -4,10 +4,14 @@ import urllib.parse
import pytest
from httmock import HTTMock, urlmatch
from mock import patch
from app import app as realapp
from app import usermanager
from data import model
from data.database import TeamMember
from data.users.externaloidc import OIDCUsers
from features import FeatureNameValue
from initdb import finished_database_for_testing, setup_database_for_testing
from oauth.oidc import OIDCLoginService, PasswordGrantException
from test.fixtures import *
@ -320,3 +324,18 @@ def test_service_metadata(discovery_handler, token_handler_password_grant, useri
with HTTMock(discovery_handler, token_handler_password_grant, userinfo_handler):
result = oidc_instance.service_metadata()
assert result["issuer_domain"] == "fakeoidc"
def test_user_restrictions(discovery_handler, token_handler_password_grant, userinfo_handler):
oidc_instance = OIDCAuthTests().fake_oidc()
with patch("features.RESTRICTED_USERS", FeatureNameValue("RESTRICTED_USERS", True)):
with HTTMock(discovery_handler, token_handler_password_grant, userinfo_handler):
result, error_msg = oidc_instance.verify_credentials("someusername", "somepassword")
assert result.username == "someusername"
assert error_msg is None
# check if user is super user
check_is_superuser = usermanager.is_superuser(result.username)
assert check_is_superuser == False
# turn on restricted users
check_is_restricted_user = usermanager.is_restricted_user(result.username)
assert check_is_restricted_user == True

View File

@ -1,15 +1,20 @@
import json
import os
import unittest
import unittest.util
from contextlib import contextmanager
from test.helpers import liveserver_app
from typing import Optional
import requests
from flask import Flask, abort, make_response, request
from mock import patch
from app import app as realapp
from app import usermanager
from data.users.keystone import get_keystone_users
from features import FeatureNameValue
from initdb import finished_database_for_testing, setup_database_for_testing
from test.helpers import liveserver_app
_PORT_NUMBER = 5001
@ -424,5 +429,33 @@ class KeystoneV3AuthTests(KeystoneAuthTestsMixin, unittest.TestCase):
self.assertEqual("cool.user", results[1][0].id)
class KeystoneRestrictedUsers(KeystoneAuthTestsMixin, unittest.TestCase):
def fake_keystone(self):
return fake_keystone(3, requires_email=True)
def emails(self):
return True
def test_restricted_users(self):
with patch("features.RESTRICTED_USERS", FeatureNameValue("RESTRICTED_USERS", True)):
with self.fake_keystone() as keystone:
# Lookup cool.
(response, federated_id, error_message) = keystone.query_users("cool")
self.assertIsNone(error_message)
self.assertEqual(1, len(response))
self.assertEqual("keystone", federated_id)
user_info = response[0]
self.assertEqual("cool.user", user_info.username)
# check if "cool" is a super user
check_is_superuser = usermanager.is_superuser(user_info.username)
assert check_is_superuser == False
# turn on restricted users
check_is_restricted_user = usermanager.is_restricted_user(user_info.username)
assert check_is_restricted_user == True
if __name__ == "__main__":
unittest.main()