1
0
mirror of https://github.com/quay/quay.git synced 2025-04-18 10:44:06 +03:00

ldap: allow global readonly superuser to be filtered (PROJQUAY-7044) (#2917)

Allow global readonly superuser to be specified via LDAP.
This commit is contained in:
Brandon Caton 2024-06-04 15:19:37 -04:00 committed by GitHub
parent 3248a72da6
commit 65e727086a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 127 additions and 9 deletions

View File

@ -69,6 +69,9 @@ def get_users_handler(config, _, override_config_dir, oauth_login):
network_timeout = config.get("LDAP_NETWORK_TIMEOUT")
ldap_user_filter = config.get("LDAP_USER_FILTER", None)
ldap_superuser_filter = config.get("LDAP_SUPERUSER_FILTER", None)
ldap_global_readonly_superuser_filter = config.get(
"LDAP_GLOBAL_READONLY_SUPERUSER_FILTER", None
)
ldap_restricted_user_filter = config.get("LDAP_RESTRICTED_USER_FILTER", None)
ldap_referrals = int(config.get("LDAP_FOLLOW_REFERRALS", True))
@ -89,6 +92,7 @@ def get_users_handler(config, _, override_config_dir, oauth_login):
network_timeout=network_timeout,
ldap_user_filter=ldap_user_filter,
ldap_superuser_filter=ldap_superuser_filter,
ldap_global_readonly_superuser_filter=ldap_global_readonly_superuser_filter,
ldap_restricted_user_filter=ldap_restricted_user_filter,
ldap_referrals=ldap_referrals,
)
@ -362,6 +366,9 @@ class UserAuthentication(object):
def is_superuser(self, username):
return self.state.is_superuser(username)
def is_global_readonly_superuser(self, username):
return self.state.is_global_readonly_superuser(username)
def has_superusers(self):
return self.state.has_superusers()
@ -437,4 +444,6 @@ class FederatedUserManager(ConfigUserManager):
return self.federated_users.has_restricted_users() or super().has_restricted_users()
def is_global_readonly_superuser(self, username: str) -> bool:
return super().is_global_readonly_superuser(username)
return self.federated_users.is_global_readonly_superuser(
username
) or super().is_global_readonly_superuser(username)

View File

@ -69,6 +69,9 @@ class AppTokenInternalAuth(object):
def is_superuser(self, username):
raise NotImplementedError()
def is_global_readonly_superuser(self, username):
raise NotImplementedError()
def has_superusers(self):
raise NotImplementedError()

View File

@ -89,6 +89,9 @@ class DatabaseUsers(object):
def is_superuser(self, username):
raise NotImplementedError()
def is_global_readonly_superuser(self, username):
raise NotImplementedError()
def has_superusers(self):
raise NotImplementedError()

View File

@ -152,6 +152,9 @@ class ExternalJWTAuthN(FederatedUsers):
def is_superuser(self, username):
raise NotImplementedError()
def is_global_readonly_superuser(self, username):
raise NotImplementedError()
def has_superusers(self):
raise NotImplementedError()

View File

@ -121,6 +121,7 @@ class LDAPUsers(FederatedUsers):
force_no_pagination=False,
ldap_user_filter=None,
ldap_superuser_filter=None,
ldap_global_readonly_superuser_filter=None,
ldap_restricted_user_filter=None,
ldap_referrals=_DEFAULT_REFERRALS,
):
@ -144,6 +145,7 @@ class LDAPUsers(FederatedUsers):
self._force_no_pagination = force_no_pagination
self._ldap_user_filter = ldap_user_filter
self._ldap_superuser_filter = ldap_superuser_filter
self._ldap_global_readonly_superuser_filter = ldap_global_readonly_superuser_filter
self._ldap_restricted_user_filter = ldap_restricted_user_filter
self._ldap_referrals = int(ldap_referrals)
@ -201,6 +203,10 @@ class LDAPUsers(FederatedUsers):
assert self._ldap_superuser_filter
return self._add_filter(query, self._ldap_superuser_filter)
def _add_global_readonly_superuser_filter(self, query):
assert self._ldap_global_readonly_superuser_filter
return self._add_filter(query, self._ldap_global_readonly_superuser_filter)
def _add_restricted_user_filter(self, query):
assert self._ldap_restricted_user_filter
return self._add_filter(query, self._ldap_restricted_user_filter)
@ -213,6 +219,7 @@ class LDAPUsers(FederatedUsers):
suffix="",
filter_superusers=False,
filter_restricted_users=False,
filter_global_readonly_superusers=False,
):
query = "(|({0}={2}{3})({1}={2}{3}))".format(
self._uid_attr, self._email_attr, escape_filter_chars(username_or_email), suffix
@ -231,6 +238,11 @@ class LDAPUsers(FederatedUsers):
return (None, "Superuser username not found")
query = self._add_superuser_filter(query)
elif filter_global_readonly_superusers:
if not self._ldap_global_readonly_superuser_filter:
return (None, "Global readonly superuser username not found")
query = self._add_global_readonly_superuser_filter(query)
logger.debug("Conducting user search: %s under %s", query, user_search_dn)
try:
@ -259,6 +271,7 @@ class LDAPUsers(FederatedUsers):
suffix="",
filter_superusers=False,
filter_restricted_users=False,
filter_global_readonly_superusers=False,
):
if not username_or_email:
return (None, "Empty username/email")
@ -282,6 +295,7 @@ class LDAPUsers(FederatedUsers):
suffix=suffix,
filter_superusers=filter_superusers,
filter_restricted_users=filter_restricted_users,
filter_global_readonly_superusers=filter_global_readonly_superusers,
)
if pairs is not None and len(pairs) > 0:
break
@ -299,12 +313,17 @@ class LDAPUsers(FederatedUsers):
return (with_dns, None)
def _ldap_single_user_search(
self, username_or_email, filter_superusers=False, filter_restricted_users=False
self,
username_or_email,
filter_superusers=False,
filter_restricted_users=False,
filter_global_readonly_superusers=False,
):
with_dns, err_msg = self._ldap_user_search(
username_or_email,
filter_superusers=filter_superusers,
filter_restricted_users=filter_restricted_users,
filter_global_readonly_superusers=filter_global_readonly_superusers,
)
if err_msg is not None:
return (None, err_msg)
@ -526,6 +545,27 @@ class LDAPUsers(FederatedUsers):
has_superusers, _ = self.at_least_one_user_exists(filter_superusers=True)
return has_superusers
def is_global_readonly_superuser(self, username_or_email: str) -> bool:
if not username_or_email:
return False
logger.debug(
"Looking up LDAP global readonly superuser username or email %s", username_or_email
)
(found_user, err_msg) = self._ldap_single_user_search(
username_or_email, filter_global_readonly_superusers=True
)
if found_user is None:
logger.debug(
"LDAP global readonly superuser %s not found: %s", username_or_email, err_msg
)
return False
logger.debug(
"Found global readonly superuser for LDAP username or email %s", username_or_email
)
return True
def is_restricted_user(self, username_or_email: str) -> bool:
if not username_or_email:
return False

View File

@ -43,6 +43,12 @@ class OIDCUsers(FederatedUsers):
"""
return None
def is_global_readonly_superuser(self, username: str):
"""
Initiated from FederatedUserManager.is_global_readonly_superuser(), falls back to ConfigUserManager.is_global_readonly_superuser()
"""
return None
def iterate_group_members(self, group_lookup_args, page_size=None, disable_pagination=False):
"""
Used by teamSync worker, unsupported for oidc team sync

View File

@ -134,6 +134,9 @@ class FederatedUsers(object):
def is_superuser(self, username):
raise NotImplementedError()
def is_global_readonly_superuser(self, username):
raise NotImplementedError()
def has_superusers(self):
raise NotImplementedError()

View File

@ -358,6 +358,9 @@ class KeystoneV3Users(FederatedUsers):
def is_superuser(self, username):
raise NotImplementedError()
def is_global_readonly_superuser(self, username):
raise NotImplementedError()
def has_superusers(self, username):
raise NotImplementedError()

View File

@ -13,7 +13,11 @@ from initdb import finished_database_for_testing, setup_database_for_testing
def _create_ldap(
requires_email=True, user_filter=None, superuser_filter=None, restricted_user_filter=None
requires_email=True,
user_filter=None,
superuser_filter=None,
restricted_user_filter=None,
global_readonly_superuser_filter=None,
):
base_dn = ["dc=quay", "dc=io"]
admin_dn = "uid=testy,ou=employees,dc=quay,dc=io"
@ -38,13 +42,18 @@ def _create_ldap(
ldap_user_filter=user_filter,
ldap_superuser_filter=superuser_filter,
ldap_restricted_user_filter=restricted_user_filter,
ldap_global_readonly_superuser_filter=global_readonly_superuser_filter,
)
return ldap
@contextmanager
def mock_ldap(
requires_email=True, user_filter=None, superuser_filter=None, restricted_user_filter=None
requires_email=True,
user_filter=None,
superuser_filter=None,
restricted_user_filter=None,
global_readonly_superuser_filter=None,
):
mock_data = {
"dc=quay,dc=io": {"dc": ["quay", "io"]},
@ -192,6 +201,16 @@ def mock_ldap(
"filterField": ["somevalue", "restricted"],
"objectClass": "user",
},
"uid=someglobalreadonlysuperuser,ou=employees,dc=quay,dc=io": {
"dc": ["quay", "io"],
"ou": "employees",
"uid": ["someglobalreadonlysuperuser"],
"userPassword": ["someglobalreadonlypass"],
"mail": ["globalreadonlysuperuserfoo@bar.com"],
"memberOf": ["cn=AwesomeFolk,dc=quay,dc=io", "cn=*Guys,dc=quay,dc=io"],
"filterField": ["somevalue", "globalreadonlysuperuser"],
"objectClass": "user",
},
}
if not requires_email:
@ -324,6 +343,7 @@ def mock_ldap(
user_filter=user_filter,
superuser_filter=superuser_filter,
restricted_user_filter=restricted_user_filter,
global_readonly_superuser_filter=global_readonly_superuser_filter,
)
finally:
mockldap.stop()
@ -577,7 +597,7 @@ class TestLDAP(unittest.TestCase):
self.assertIsNone(err)
results = list(it)
self.assertEqual(4, len(results))
self.assertEqual(5, len(results))
first = results[0][0]
second = results[1][0]
@ -611,6 +631,11 @@ class TestLDAP(unittest.TestCase):
self.assertEqual("somerestricteduser", u.username)
self.assertEqual("restrictedfoo@bar.com", u.email)
if u.id == "someglobalreadonlysuperuser":
self.assertEqual("someglobalreadonlysuperuser", u.id)
self.assertEqual("someglobalreadonlysuperuser", u.username)
self.assertEqual("globalreadonlysuperuserfoo@bar.com", u.email)
def test_iterate_group_members_with_pagination(self):
with mock_ldap() as ldap:
for dn in ["cn=AwesomeFolk", "cn=*Guys"]:
@ -794,12 +819,13 @@ class TestLDAP(unittest.TestCase):
self.assertIsNone(err)
results = list(it)
self.assertEqual(4, len(results))
self.assertEqual(5, len(results))
def test_ldap_superuser_and_restricted_user_filtering(self):
valid_user_filter = "(filterField=somevalue)"
valid_superuser_filter = "(filterField=superuser)"
valid_restricted_user_filter = "(filterField=restricted)"
valid_global_readonly_user_filter = "(filterField=globalreadonlysuperuser)"
with mock_ldap(user_filter=valid_user_filter) as ldap:
# Verify we can login.
@ -810,6 +836,7 @@ class TestLDAP(unittest.TestCase):
user_filter=valid_user_filter,
superuser_filter=valid_superuser_filter,
restricted_user_filter=valid_restricted_user_filter,
global_readonly_superuser_filter=valid_global_readonly_user_filter,
) as ldap:
(it, err) = ldap.iterate_group_members(
{"group_dn": "cn=AwesomeFolk"}, disable_pagination=True
@ -817,25 +844,37 @@ class TestLDAP(unittest.TestCase):
self.assertIsNone(err)
results = list(it)
self.assertEqual(4, len(results))
self.assertEqual(5, len(results))
for u in results:
user = u[0]
is_superuser = ldap.is_superuser(user.username)
is_restricted_user = ldap.is_restricted_user(user.username)
is_global_readonly_superuser = ldap.is_global_readonly_superuser(user.username)
if user.username == "somesuperuser":
self.assertTrue(is_superuser)
self.assertFalse(is_restricted_user)
self.assertFalse(is_global_readonly_superuser)
self.assertEqual("somesuperuser", user.id)
self.assertEqual("somesuperuser", user.username)
self.assertEqual("superfoo@bar.com", user.email)
elif user.username == "someglobalreadonlysuperuser":
self.assertFalse(is_superuser)
self.assertFalse(is_restricted_user)
self.assertTrue(is_global_readonly_superuser)
self.assertEqual("someglobalreadonlysuperuser", user.id)
self.assertEqual("someglobalreadonlysuperuser", user.username)
self.assertEqual("globalreadonlysuperuserfoo@bar.com", user.email)
elif user.username == "somerestricteduser":
self.assertTrue(is_restricted_user)
self.assertFalse(is_superuser)
self.assertFalse(is_global_readonly_superuser)
self.assertEqual("somerestricteduser", user.id)
self.assertEqual("somerestricteduser", user.username)
@ -843,6 +882,7 @@ class TestLDAP(unittest.TestCase):
else:
self.assertFalse(is_superuser)
self.assertFalse(is_global_readonly_superuser)
self.assertFalse(is_restricted_user)
self.assertTrue(ldap.has_superusers())
@ -852,6 +892,7 @@ class TestLDAP(unittest.TestCase):
valid_user_filter = "(filterField=somevalue)"
invalid_superuser_filter = "(filterField=notsuperuser)"
invalid_restricted_user_filter = "(filterField=notrestricted)"
invalid_global_readonly_superuser_filter = "(filterField=notglobalreadonlysuperuser)"
with mock_ldap(user_filter=valid_user_filter) as ldap:
# Verify we can login.
@ -862,6 +903,7 @@ class TestLDAP(unittest.TestCase):
user_filter=valid_user_filter,
superuser_filter=invalid_superuser_filter,
restricted_user_filter=invalid_restricted_user_filter,
global_readonly_superuser_filter=invalid_global_readonly_superuser_filter,
) as ldap:
(it, err) = ldap.iterate_group_members(
{"group_dn": "cn=AwesomeFolk"}, disable_pagination=True
@ -869,15 +911,17 @@ class TestLDAP(unittest.TestCase):
self.assertIsNone(err)
results = list(it)
self.assertEqual(4, len(results))
self.assertEqual(5, len(results))
for u in results:
user = u[0]
is_superuser = ldap.is_superuser(user.username)
is_restricted_user = ldap.is_restricted_user(user.username)
is_global_readonly_superser = ldap.is_global_readonly_superuser(user.username)
self.assertFalse(is_superuser)
self.assertFalse(is_restricted_user)
self.assertFalse(is_global_readonly_superser)
self.assertFalse(ldap.has_superusers())
self.assertFalse(ldap.has_restricted_users())
@ -886,6 +930,7 @@ class TestLDAP(unittest.TestCase):
valid_user_filter = "(filterField=somevalue)"
superuser_filter = None
restricted_user_filter = None
global_readonly_superuser_filter = None
with mock_ldap(user_filter=valid_user_filter) as ldap:
# Verify we can login.
@ -896,6 +941,7 @@ class TestLDAP(unittest.TestCase):
user_filter=valid_user_filter,
superuser_filter=superuser_filter,
restricted_user_filter=restricted_user_filter,
global_readonly_superuser_filter=global_readonly_superuser_filter,
) as ldap:
(it, err) = ldap.iterate_group_members(
{"group_dn": "cn=AwesomeFolk"}, disable_pagination=True
@ -903,14 +949,16 @@ class TestLDAP(unittest.TestCase):
self.assertIsNone(err)
results = list(it)
self.assertEqual(4, len(results))
self.assertEqual(5, len(results))
for u in results:
user = u[0]
is_superuser = ldap.is_superuser(user.username)
is_restricted_user = ldap.is_restricted_user(user.username)
is_global_readonly_superser = ldap.is_global_readonly_superuser(user.username)
self.assertFalse(is_superuser)
self.assertFalse(is_global_readonly_superser)
self.assertTrue(is_restricted_user)
self.assertFalse(ldap.has_superusers())