1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/util/security/federated_robot_auth.py
Syed Ahmed e9161cb3ae robots: Add robot federation for keyless auth (PROJQUAY-7803) (#3207)
robots: Add robot federation for keyless auth (PROJQUAY-7652)

adds the ability to configure federated auth for robots by
using external OIDC providers. Each robot can be configured
to have multiple external OIDC providers as the source for
authentication.
2024-09-24 11:32:38 -04:00

117 lines
4.1 KiB
Python

import json
import logging
from jwt import InvalidTokenError
from app import app
from auth.basic import _parse_basic_auth_header
from auth.log import log_action
from auth.validateresult import AuthKind, ValidateResult
from data.database import FederatedLogin
from data.model import InvalidRobotCredentialException
from data.model.user import lookup_robot
from oauth.login_utils import get_jwt_issuer
from oauth.oidc import OIDCLoginService
from util.names import parse_robot_username
logger = logging.getLogger(__name__)
def validate_federated_auth(auth_header):
"""
Validates the specified federated auth header, returning whether its credentials point to a valid
user or token.
"""
if not auth_header:
return ValidateResult(AuthKind.federated, missing=True, error_message="No auth header")
logger.debug("Attempt to process federated auth header")
# Parse the federated auth header.
assert isinstance(auth_header, str)
credentials, err = _parse_basic_auth_header(auth_header)
if err is not None:
logger.debug("Got invalid federated auth header: %s", auth_header)
return ValidateResult(AuthKind.federated, missing=True, error_message=err)
auth_username, federated_token = credentials
is_robot = parse_robot_username(auth_username)
if not is_robot:
logger.debug(
f"Federated auth is only supported for robots. got invalid federated auth header: {auth_header}"
)
return ValidateResult(AuthKind.federated, missing=True, error_message="Invalid robot")
# find out if the robot is federated
# get the issuer from the DB config
# validate the token
robot = lookup_robot(auth_username)
assert robot.robot
result = verify_federated_robot_jwt_token(robot, federated_token)
return result.with_kind(AuthKind.federated)
def verify_federated_robot_jwt_token(robot, token):
# The token is a JWT token from the external OIDC provider
# We always have an entry in the federatedlogin table for each robot account
federated_robot = FederatedLogin.select().where(FederatedLogin.user == robot).get()
assert federated_robot
try:
metadata = json.loads(federated_robot.metadata_json)
except Exception as e:
logger.debug("Error parsing federated login metadata: %s", e)
raise InvalidRobotCredentialException("Robot does not have federated login configured")
# check if robot has federated login config
token_issuer = get_jwt_issuer(token)
if not token_issuer:
raise InvalidRobotCredentialException("Token does not contain issuer")
fed_config = metadata.get("federation_config", [])
if not fed_config:
raise InvalidRobotCredentialException("Robot does not have federated login configured")
matched_subs = []
for item in fed_config:
if item.get("issuer") == token_issuer:
matched_subs.append(item.get("subject"))
if not matched_subs:
raise InvalidRobotCredentialException(
f"issuer {token_issuer} not configured for this robot"
)
# verify the token
service_config = {"quayrobot": {"OIDC_SERVER": token_issuer}}
service = OIDCLoginService(service_config, "quayrobot", client=app.config["HTTPCLIENT"])
# throws an exception if we cannot decode/verify the token
options = {"verify_aud": False, "verify_nbf": False}
try:
decoded_token = service.decode_user_jwt(token, options=options)
except InvalidTokenError as e:
raise InvalidRobotCredentialException(f"Invalid token: {e}")
assert decoded_token
# check if the token is for the robot
if decoded_token.get("sub") not in matched_subs:
raise InvalidRobotCredentialException("Token does not match robot")
namespace, robot_name = parse_robot_username(robot.username)
log_action(
"federated_robot_token_exchange",
namespace,
{
"subject": decoded_token.get("sub"),
"issuer": decoded_token.get("iss"),
"robot": robot_name,
},
)
return ValidateResult(AuthKind.credentials, robot=robot)