1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/data/users/externaloidc.py
Brandon Caton 65e727086a ldap: allow global readonly superuser to be filtered (PROJQUAY-7044) (#2917)
Allow global readonly superuser to be specified via LDAP.
2024-06-04 15:19:37 -04:00

195 lines
7.5 KiB
Python

import json
import logging
from urllib.parse import urlparse
import app
from data.model import InvalidTeamException, UserAlreadyInTeam, team
from data.users.federated import FederatedUsers, UserInformation
from oauth.login_utils import get_username_from_userinfo
from oauth.oidc import OIDCLoginService
logger = logging.getLogger(__name__)
class OIDCUsers(FederatedUsers):
def __init__(
self,
client_id,
client_secret,
oidc_server,
service_name,
login_scopes,
preferred_group_claim_name,
requires_email=True,
):
super(OIDCUsers, self).__init__("oidc", requires_email)
self._client_id = client_id
self._client_secret = client_secret
self._oidc_server = oidc_server
self._service_name = service_name
self._login_scopes = login_scopes
self._preferred_group_claim_name = preferred_group_claim_name
self._requires_email = requires_email
def service_metadata(self):
for service in app.oauth_login.services:
if isinstance(service, OIDCLoginService):
return {"issuer_domain": urlparse(service.get_issuer()).netloc}
return {}
def is_superuser(self, username: str):
"""
Initiated from FederatedUserManager.is_superuser(), falls back to ConfigUserManager.is_superuser()
"""
return None
def is_global_readonly_superuser(self, username: str):
"""
Initiated from FederatedUserManager.is_global_readonly_superuser(), falls back to ConfigUserManager.is_global_readonly_superuser()
"""
return None
def iterate_group_members(self, group_lookup_args, page_size=None, disable_pagination=False):
"""
Used by teamSync worker, unsupported for oidc team sync
"""
return (None, "Not supported")
def verify_credentials(self, username_or_email, password):
"""
Unsupported to login via username/email and password
"""
if not password:
return (None, "Anonymous binding not allowed.")
if not username_or_email:
return (None, "Missing username or email.")
for service in app.oauth_login.services:
if isinstance(service, OIDCLoginService):
try:
response = service.password_grant_for_login(username_or_email, password)
if response is None:
return (None, "External OIDC Group Sync: Got no user info")
user_info = service.get_user_info(
service._http_client, response["access_token"]
)
return (
UserInformation(
username=get_username_from_userinfo(user_info, service.config),
email=user_info.get("email"),
id=user_info.get("sub"),
),
None,
)
except Exception as err:
logger.exception(
f"External OIDC Group Sync: Exception while verifying credentials: {err}"
)
return (None, err)
def check_group_lookup_args(self, group_lookup_args, disable_pagination=False):
"""
No way to verify if the group is valid, so assuming the group is valid
"""
return (True, None)
def get_user(self, username_or_email):
"""
No way to look up a username or email in OIDC so returning None
"""
return (None, "Currently user lookup is not supported with OIDC")
def query_users(self, query, limit):
"""
No way to query users so returning empty list
"""
return ([], self._federated_service, "Not supported")
def sync_oidc_groups(self, user_groups, user_obj):
"""
Adds user to quay teams that have team sync enabled with an OIDC group
"""
if user_groups is None:
logger.debug(
f"External OIDC Group Sync: Found no oidc groups for user: {user_obj.username}"
)
return
for oidc_group in user_groups:
# fetch TeamSync row if exists, for the oidc_group synced with the login service
synced_teams = team.get_oidc_team_from_groupname(oidc_group, self._federated_service)
if len(synced_teams) == 0:
logger.debug(
f"External OIDC Group Sync: OIDC group: {oidc_group} is either not synced with a team in quay or is not synced with the {self._federated_service} service"
)
continue
# fetch team name and organization name for the Teamsync row
for team_synced in synced_teams:
team_name = team_synced.team.name
org_name = team_synced.team.organization.username
if not team_name or not org_name:
logger.debug(
f"External OIDC Group Sync: Cannot retrieve quay team synced with the oidc group: {oidc_group}"
)
# add user to team
try:
team.add_user_to_team(user_obj, team_synced.team)
except InvalidTeamException as err:
logger.exception(
f"External OIDC Group Sync: Exception occurred when adding user: {user_obj.username} to quay team: {team_synced.team} as {err}"
)
except UserAlreadyInTeam:
# Ignore
pass
return
def ping(self):
"""
TODO: get the OIDC connection here
"""
return (True, None)
def resync_quay_teams(self, user_groups, user_obj):
"""
Fetch quay teams that user is a member of.
Remove user from teams that are synced with an OIDC group but group does not exist in "user_groups"
"""
# fetch user's quay teams that have team sync enabled
existing_user_teams = team.get_federated_user_teams(user_obj, self._federated_service)
logger.debug(
f"External OIDC Group Sync: For user {user_obj.username} re-syncing {len(existing_user_teams)} quay teams"
)
user_groups = user_groups or []
for user_team in existing_user_teams:
try:
sync_group_info = json.loads(user_team.teamsync.config)
# remove user's membership from teams that were not returned from users OIDC groups
if (
sync_group_info.get("group_name", None)
and sync_group_info["group_name"] not in user_groups
):
org_name = user_team.teamsync.team.organization.username
team.remove_user_from_team(org_name, user_team.name, user_obj.username, None)
logger.debug(
f"External OIDC Group Sync: Successfully removed user: {user_obj.username} from team: {user_team.name} in organization: {org_name}"
)
except Exception as err:
logger.exception(
f"External OIDC Group Sync: Exception occurred for user {user_obj.username} when removing membership from quay team: {user_team.name} as {err}"
)
return
def sync_user_groups(self, user_groups, user_obj, login_service):
if not user_obj:
return
self.sync_oidc_groups(user_groups, user_obj)
self.resync_quay_teams(user_groups, user_obj)
return