From 07bface72e8f15bccc8f7e27240a6b8016476d67 Mon Sep 17 00:00:00 2001 From: Brady Pratt Date: Sun, 4 Jan 2026 02:08:44 -0600 Subject: [PATCH] feat(auth): Kubernetes ServiceAccount OIDC authentication (PROJQUAY-0000) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Enable Kubernetes ServiceAccounts to authenticate to Quay using OIDC federation. This allows operators running in Kubernetes to authenticate using their pod's service account token. Key features: - KubernetesServiceAccountLoginService extends OIDCLoginService for JWKS-based JWT validation against the Kubernetes API server - Authenticated SAs map to robot accounts in 'quay-system' organization - Configurable superuser subject for operator service accounts - Startup validation of configuration with helpful error messages Configuration: FEATURE_KUBERNETES_SA_AUTH: true KUBERNETES_SA_AUTH_CONFIG: SYSTEM_ORG_NAME: "quay-system" SUPERUSER_SUBJECT: "system:serviceaccount:ns:sa-name" 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 Signed-off-by: Brady Pratt --- auth/oauth.py | 211 ++++++++++++++++++++++ auth/test/test_kubernetes_sa_auth.py | 201 +++++++++++++++++++++ auth/validateresult.py | 1 + config.py | 6 + endpoints/api/organization.py | 5 + features/__init__.pyi | 3 + oauth/oidc.py | 21 ++- oauth/services/kubernetes_sa.py | 256 +++++++++++++++++++++++++++ oauth/test/test_kubernetes_sa.py | 256 +++++++++++++++++++++++++++ util/config/schema.py | 45 +++++ util/config/superusermanager.py | 2 +- 11 files changed, 999 insertions(+), 8 deletions(-) create mode 100644 auth/test/test_kubernetes_sa_auth.py create mode 100644 oauth/services/kubernetes_sa.py create mode 100644 oauth/test/test_kubernetes_sa.py diff --git a/auth/oauth.py b/auth/oauth.py index e9b9254a8..a861053dd 100644 --- a/auth/oauth.py +++ b/auth/oauth.py @@ -1,9 +1,14 @@ import logging from datetime import datetime +from typing import TYPE_CHECKING, Optional from flask import request from jwt import ExpiredSignatureError, InvalidTokenError +import features + +if TYPE_CHECKING: + from data.database import User from app import analytics, app, authentication, oauth_login from auth.log import log_action from auth.scopes import scopes_from_scope_string @@ -39,6 +44,13 @@ def validate_bearer_auth(auth_header): def validate_oauth_token(token): + # K8s SA tokens do not have typ:JWT header so is_jwt() returns False for + # them, but they're still valid JWTs + if features.KUBERNETES_SA_AUTH: + result = validate_kubernetes_sa_token(token) + if result is not None: + return result + if is_jwt(token): return validate_sso_oauth_token(token) else: @@ -95,6 +107,205 @@ def validate_sso_oauth_token(token): return ValidateResult(AuthKind.ssojwt, error_message=str(ole)) +def _create_kubernetes_sa_system_org(org_name: str) -> "User": + """ + Create the system organization for Kubernetes SA robot accounts. + + This is called lazily on the first SA authentication if the org doesn't exist. + + Args: + org_name: Name of the organization to create + + Returns: + The created organization object + + Raises: + Exception if creation fails + """ + from data.database import db_transaction + from data.model import team, user + + with db_transaction(): + # Create org without an owner - it's a system org + new_org = user.create_user_noverify( + org_name, + email=None, + email_required=False, + ) + new_org.organization = True + new_org.save() + + # Create the owners team (required for org structure) + team.create_team("owners", new_org, "admin") + + logger.info(f"Created Kubernetes SA system organization '{org_name}'") + return new_org + + +def validate_kubernetes_sa_token(token: str) -> Optional[ValidateResult]: + """ + Validate a Kubernetes ServiceAccount JWT token. + + Kubernetes SA tokens are validated via OIDC/JWKS, then mapped to robot accounts + in the configured system organization. If the SA matches the configured superuser + subject, the robot is dynamically registered as a superuser. + + Returns: + ValidateResult on success, None if Kubernetes SA auth not configured or + token issuer doesn't match + """ + from data import model + from data.model import organization, team, user + from oauth.oidc import PublicKeyLoadException + from oauth.services.kubernetes_sa import ( + KUBERNETES_OIDC_SERVER, + KubernetesServiceAccountLoginService, + ) + from util.security.jwtutil import InvalidTokenError + + kubernetes_config = app.config.get("KUBERNETES_SA_AUTH_CONFIG", {}) + if not kubernetes_config: + logger.debug("FEATURE_KUBERNETES_SA_AUTH is enabled but no config provided") + return None + + # Check if token issuer matches our configured Kubernetes OIDC server + # This allows other JWT tokens to fall through to generic SSO handling + try: + issuer = get_jwt_issuer(token) + if not issuer: + return None + + # Normalize trailing slashes for comparison + if issuer.rstrip("/") != KUBERNETES_OIDC_SERVER.rstrip("/"): + logger.debug( + "Token issuer %s doesn't match Kubernetes OIDC server %s, skipping", + issuer, + KUBERNETES_OIDC_SERVER, + ) + return None + except Exception: + # If we can't decode issuer, let it fall through to other handlers + return None + + try: + # Create Kubernetes SA login service + kubernetes_service = KubernetesServiceAccountLoginService(app.config) + + # Validate the token + decoded_token = kubernetes_service.validate_sa_token(token) + + except (InvalidTokenError, PublicKeyLoadException) as e: + logger.warning(f"Kubernetes SA token validation failed: {e}") + return ValidateResult( + AuthKind.kubernetessa, + error_message=f"Token validation failed: {e}", + ) + except Exception as e: + logger.warning(f"Kubernetes SA token validation error: {e}") + return ValidateResult( + AuthKind.kubernetessa, + error_message=f"Token validation error: {e}", + ) + + # Extract subject (required claim) + subject = decoded_token.get("sub") + if not subject: + logger.warning("Kubernetes SA token missing 'sub' claim") + return ValidateResult( + AuthKind.kubernetessa, + error_message="Token missing subject claim", + ) + + # Parse and validate SA subject format + parsed = kubernetes_service.parse_sa_subject(subject) + if not parsed: + logger.warning(f"Invalid Kubernetes SA subject format: {subject}") + return ValidateResult( + AuthKind.kubernetessa, + error_message=f"Invalid ServiceAccount subject format: {subject}", + ) + + namespace, sa_name = parsed + + # Get or create robot account + robot_shortname = kubernetes_service.generate_robot_shortname(namespace, sa_name) + system_org_name = kubernetes_service.system_org_name + robot_username = f"{system_org_name}+{robot_shortname}" + + try: + robot = user.lookup_robot(robot_username) + except model.InvalidRobotException: + # Robot doesn't exist - need to create it + # First ensure system org exists (lazy creation) + try: + system_org = organization.get_organization(system_org_name) + except model.InvalidOrganizationException: + # Create the system org lazily on first SA authentication + logger.info(f"Creating Kubernetes SA system organization '{system_org_name}'") + try: + system_org = _create_kubernetes_sa_system_org(system_org_name) + except Exception as e: + logger.error(f"Failed to create system organization '{system_org_name}': {e}") + return ValidateResult( + AuthKind.kubernetessa, + error_message=f"Failed to create system organization: {e}", + ) + + # Create the robot account + try: + description = f"Kubernetes SA: {namespace}/{sa_name}" + metadata = { + "kubernetes_namespace": namespace, + "kubernetes_sa_name": sa_name, + "kubernetes_subject": subject, + } + robot, _ = user.create_robot( + robot_shortname, + system_org, + description=description, + unstructured_metadata=metadata, + ) + logger.info(f"Created Kubernetes SA robot account: {robot_username}") + + # Add robot to owners + try: + owners_team = team.get_organization_team(system_org_name, "owners") + team.add_user_to_team(robot, owners_team) + except Exception as e: + logger.warning(f"Failed to add robot to owners team: {e}") + except model.InvalidRobotException as e: + logger.error(f"Failed to create robot {robot_username}: {e}") + return ValidateResult( + AuthKind.kubernetessa, + error_message=f"Failed to create robot account: {e}", + ) + + # Check if this SA should be a superuser and register dynamically + if kubernetes_service.is_superuser_subject(subject): + try: + from app import usermanager + + if not usermanager.is_superuser(robot_username): + usermanager.register_superuser(robot_username) + logger.info(f"Registered Kubernetes SA robot as superuser: {robot_username}") + except Exception as e: + logger.error(f"Failed to register superuser {robot_username}: {e}") + # Continue - auth still valid, just not superuser + + # Log successful authentication + logger.info( + f"Kubernetes SA authenticated: {subject} -> {robot_username}", + extra={ + "kubernetes_namespace": namespace, + "kubernetes_sa_name": sa_name, + "robot_username": robot_username, + "is_superuser": kubernetes_service.is_superuser_subject(subject), + }, + ) + + return ValidateResult(AuthKind.kubernetessa, robot=robot, sso_token=token) + + def validate_app_oauth_token(token): """ Validates the specified OAuth token, returning whether it points to a valid OAuth token. diff --git a/auth/test/test_kubernetes_sa_auth.py b/auth/test/test_kubernetes_sa_auth.py new file mode 100644 index 000000000..67bbddcd8 --- /dev/null +++ b/auth/test/test_kubernetes_sa_auth.py @@ -0,0 +1,201 @@ +"""Integration tests for Kubernetes SA authentication.""" + +from unittest.mock import MagicMock, patch + +import pytest + +from auth.oauth import validate_kubernetes_sa_token +from auth.validateresult import AuthKind + + +class TestValidateKubernetesSAToken: + """Tests for validate_kubernetes_sa_token function.""" + + def test_returns_none_when_feature_disabled(self, app): + """Should return None when FEATURE_KUBERNETES_SA_AUTH is disabled.""" + with patch.dict(app.config, {"FEATURE_KUBERNETES_SA_AUTH": False}): + result = validate_kubernetes_sa_token("some.jwt.token") + assert result is None + + def test_returns_none_when_config_missing(self, app): + """Should return None when KUBERNETES_SA_AUTH_CONFIG is missing.""" + with patch.dict( + app.config, + {"FEATURE_KUBERNETES_SA_AUTH": True, "KUBERNETES_SA_AUTH_CONFIG": None}, + ): + result = validate_kubernetes_sa_token("some.jwt.token") + assert result is None + + def test_returns_none_for_non_matching_issuer(self, app): + """Should return None when token issuer doesn't match config.""" + # Create a minimal JWT-like token with a different issuer + # The function checks issuer before full validation + with patch.dict( + app.config, + { + "FEATURE_KUBERNETES_SA_AUTH": True, + "KUBERNETES_SA_AUTH_CONFIG": {}, + }, + ): + with patch( + "auth.oauth.get_jwt_issuer", + return_value="https://some-other-issuer.com", + ): + result = validate_kubernetes_sa_token("some.jwt.token") + assert result is None + + def test_returns_none_when_issuer_extraction_fails(self, app): + """Should return None when issuer can't be extracted from token.""" + with patch.dict( + app.config, + { + "FEATURE_KUBERNETES_SA_AUTH": True, + "KUBERNETES_SA_AUTH_CONFIG": {}, + }, + ): + with patch("auth.oauth.get_jwt_issuer", return_value=None): + result = validate_kubernetes_sa_token("invalid.token") + assert result is None + + def test_returns_error_for_invalid_token(self, app): + """Should return error result for invalid token with matching issuer.""" + from util.security.jwtutil import InvalidTokenError + + with patch.dict( + app.config, + { + "FEATURE_KUBERNETES_SA_AUTH": True, + "KUBERNETES_SA_AUTH_CONFIG": { + "DEBUGGING": True, + }, + "TESTING": True, + }, + ): + with patch( + "auth.oauth.get_jwt_issuer", + return_value="https://kubernetes.default.svc", + ): + # Mock service to raise InvalidTokenError + mock_service = MagicMock() + mock_service.validate_sa_token.side_effect = InvalidTokenError("Invalid signature") + with patch( + "auth.oauth.KubernetesServiceAccountLoginService", + return_value=mock_service, + ): + result = validate_kubernetes_sa_token("bad.jwt.token") + assert result is not None + assert result.kind == AuthKind.kubernetessa + assert result.error_message is not None + assert "Token validation failed" in result.error_message + + def test_returns_error_for_missing_subject(self, app): + """Should return error when token lacks subject claim.""" + with patch.dict( + app.config, + { + "FEATURE_KUBERNETES_SA_AUTH": True, + "KUBERNETES_SA_AUTH_CONFIG": { + "DEBUGGING": True, + }, + "TESTING": True, + }, + ): + with patch( + "auth.oauth.get_jwt_issuer", + return_value="https://kubernetes.default.svc", + ): + mock_service = MagicMock() + mock_service.validate_sa_token.return_value = {"iss": "test"} # No sub + with patch( + "auth.oauth.KubernetesServiceAccountLoginService", + return_value=mock_service, + ): + result = validate_kubernetes_sa_token("some.jwt.token") + assert result is not None + assert result.kind == AuthKind.kubernetessa + assert "subject" in result.error_message.lower() + + def test_returns_error_for_invalid_subject_format(self, app): + """Should return error when subject format is invalid.""" + with patch.dict( + app.config, + { + "FEATURE_KUBERNETES_SA_AUTH": True, + "KUBERNETES_SA_AUTH_CONFIG": { + "DEBUGGING": True, + }, + "TESTING": True, + }, + ): + with patch( + "auth.oauth.get_jwt_issuer", + return_value="https://kubernetes.default.svc", + ): + mock_service = MagicMock() + mock_service.validate_sa_token.return_value = {"sub": "invalid:format"} + mock_service.parse_sa_subject.return_value = None + with patch( + "auth.oauth.KubernetesServiceAccountLoginService", + return_value=mock_service, + ): + result = validate_kubernetes_sa_token("some.jwt.token") + assert result is not None + assert result.kind == AuthKind.kubernetessa + assert "Invalid ServiceAccount subject" in result.error_message + + +class TestValidateKubernetesSATokenWithFixtures: + """Integration tests with database fixtures.""" + + def test_returns_error_when_org_creation_fails(self, app): + """Should return error when system organization creation fails.""" + from data import model + + with patch.dict( + app.config, + { + "FEATURE_KUBERNETES_SA_AUTH": True, + "KUBERNETES_SA_AUTH_CONFIG": { + "SYSTEM_ORG_NAME": "quay-system", + "DEBUGGING": True, + }, + "TESTING": True, + }, + ): + with patch( + "auth.oauth.get_jwt_issuer", + return_value="https://kubernetes.default.svc", + ): + mock_service = MagicMock() + mock_service.validate_sa_token.return_value = { + "sub": "system:serviceaccount:quay:operator" + } + mock_service.parse_sa_subject.return_value = ("quay", "operator") + mock_service.generate_robot_shortname.return_value = "kube_quay_operator" + mock_service.system_org_name = "quay-system" + + with patch( + "auth.oauth.KubernetesServiceAccountLoginService", + return_value=mock_service, + ): + # Make sure robot lookup fails (triggering org lookup/creation) + with patch( + "data.model.user.lookup_robot", + side_effect=model.InvalidRobotException("Not found"), + ): + # Make sure org lookup fails + with patch( + "data.model.organization.get_organization", + side_effect=model.InvalidOrganizationException("Not found"), + ): + # Make sure org creation fails + with patch( + "auth.oauth._create_kubernetes_sa_system_org", + side_effect=Exception("Database error"), + ): + result = validate_kubernetes_sa_token("some.jwt.token") + assert result is not None + assert result.kind == AuthKind.kubernetessa + assert ( + "Failed to create system organization" in result.error_message + ) diff --git a/auth/validateresult.py b/auth/validateresult.py index 668d1d400..a8dc4ac77 100644 --- a/auth/validateresult.py +++ b/auth/validateresult.py @@ -11,6 +11,7 @@ class AuthKind(Enum): credentials = "credentials" ssojwt = "ssojwt" federated = "federated" + kubernetessa = "kubernetessa" # Kubernetes ServiceAccount def __str__(self): return "%s" % self.value diff --git a/config.py b/config.py index f41f820d9..c759e9b32 100644 --- a/config.py +++ b/config.py @@ -295,6 +295,9 @@ class DefaultConfig(ImmutableConfig): # Gitlab Config. GITLAB_TRIGGER_CONFIG: Optional[Dict[str, str]] = None + # Kubernetes ServiceAccount authentication configuration + KUBERNETES_SA_AUTH_CONFIG: Optional[Dict[str, str]] = None + NOTIFICATION_QUEUE_NAME = "notification" DOCKERFILE_BUILD_QUEUE_NAME = "dockerfilebuild" REPLICATION_QUEUE_NAME = "imagestoragereplication" @@ -334,6 +337,9 @@ class DefaultConfig(ImmutableConfig): # Feature Flag: Whether Google login is supported. FEATURE_GOOGLE_LOGIN = False + # Feature Flag: Whether Kubernetes ServiceAccount OIDC authentication is supported. + FEATURE_KUBERNETES_SA_AUTH = False + # Feature Flag: Whether to support GitHub build triggers. FEATURE_GITHUB_BUILD = False diff --git a/endpoints/api/organization.py b/endpoints/api/organization.py index 833d30741..0ad8c23ea 100644 --- a/endpoints/api/organization.py +++ b/endpoints/api/organization.py @@ -95,6 +95,11 @@ def org_view(o, teams): features.SUPERUSERS_FULL_ACCESS and allow_if_superuser() ) + # Grant superusers effective admin/member status for UI visibility + if can_view_as_superuser: + is_admin = True + is_member = True + view = { "name": o.username, "email": o.email if is_admin or can_view_as_superuser else "", diff --git a/features/__init__.pyi b/features/__init__.pyi index f6cce6d32..44083d93b 100644 --- a/features/__init__.pyi +++ b/features/__init__.pyi @@ -31,6 +31,9 @@ GITHUB_LOGIN: FeatureNameValue # Feature Flag: Whether Google login is supported. GOOGLE_LOGIN: FeatureNameValue +# Feature Flag: Whether Kubernetes ServiceAccount OIDC authentication is supported. +KUBERNETES_SA_AUTH: FeatureNameValue + # Feature Flag: Whether to support GitHub build triggers. GITHUB_BUILD: FeatureNameValue diff --git a/oauth/oidc.py b/oauth/oidc.py index d8e3189a8..34b51a145 100644 --- a/oauth/oidc.py +++ b/oauth/oidc.py @@ -419,11 +419,18 @@ class _PublicKeyCache(TTLCache): # Load the keys. try: - keys = KeySet( - _load_keys_from_url( - keys_url, verify=not self._login_service.config.get("DEBUGGING", False) - ) - ) + # Check if service has custom SSL verification (e.g., Kubernetes SA with CA bundle) + if hasattr(self._login_service, "get_ssl_verification"): + verify = self._login_service.get_ssl_verification() + else: + verify = not self._login_service.config.get("DEBUGGING", False) + + # Check if service provides auth headers for JWKS fetching (e.g., Kubernetes SA) + headers = None + if hasattr(self._login_service, "get_jwks_auth_headers"): + headers = self._login_service.get_jwks_auth_headers() + + keys = KeySet(_load_keys_from_url(keys_url, verify=verify, headers=headers)) except Exception as ex: logger.exception("Exception loading public key") raise PublicKeyLoadException(str(ex)) @@ -443,7 +450,7 @@ class _PublicKeyCache(TTLCache): return rsa_key -def _load_keys_from_url(url, verify=True): +def _load_keys_from_url(url, verify=True, headers=None): """ Expects something on this form: {"keys": @@ -467,7 +474,7 @@ def _load_keys_from_url(url, verify=True): """ keys = [] - r = request("GET", url, allow_redirects=True, verify=verify) + r = request("GET", url, allow_redirects=True, verify=verify, headers=headers) if r.status_code == 200: keys_dict = json.loads(r.text) for key_spec in keys_dict["keys"]: diff --git a/oauth/services/kubernetes_sa.py b/oauth/services/kubernetes_sa.py new file mode 100644 index 000000000..a1f524155 --- /dev/null +++ b/oauth/services/kubernetes_sa.py @@ -0,0 +1,256 @@ +""" +Kubernetes ServiceAccount OIDC authentication service for Quay. + +Enables Kubernetes ServiceAccount tokens to authenticate to Quay using OIDC +federation. SA tokens map to robot accounts owned by a configurable system +organization. Authenticated SAs matching the configured superuser subject +receive dynamic superuser privileges. +""" + +import logging +import os +import re +from typing import Any, Optional + +from oauth.oidc import ( + ALLOWED_ALGORITHMS, + JWT_CLOCK_SKEW_SECONDS, + OIDCLoginService, + PublicKeyLoadException, +) +from util.security.jwtutil import InvalidTokenError, decode + +logger = logging.getLogger(__name__) + +# Standard Kubernetes ServiceAccount CA certificate path +SERVICE_ACCOUNT_CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" + +# Kubernetes SA subject format: system:serviceaccount:: +KUBERNETES_SA_SUBJECT_PATTERN = re.compile( + r"^system:serviceaccount:(?P[^:]+):(?P[^:]+)$" +) + +# Default system org for Kubernetes SA robot accounts +DEFAULT_SYSTEM_ORG_NAME = "quay-system" + +# Kubernetes API server OIDC endpoint (in-cluster) +KUBERNETES_OIDC_SERVER = "https://kubernetes.default.svc" + + +class KubernetesServiceAccountLoginService(OIDCLoginService): + """ + Kubernetes ServiceAccount OIDC authentication service. + + Validates Kubernetes SA JWT tokens using OIDC/JWKS and maps them to robot + accounts in a dedicated system organization. + + Unlike standard OIDC login services, this service: + - Does not participate in OAuth authorization flows + - Validates bearer tokens directly from Kubernetes pods + - Uses relaxed audience validation (K8s audiences vary) + - Maps SA subjects to robot accounts deterministically + """ + + def __init__( + self, + config: dict[str, Any], + key_name: str = "KUBERNETES_SA_AUTH_CONFIG", + client: Any = None, + ) -> None: + """ + Initialize the Kubernetes SA login service. + + Args: + config: Application configuration dict containing KUBERNETES_SA_AUTH_CONFIG + key_name: Configuration key name (default: KUBERNETES_SA_AUTH_CONFIG) + client: Optional HTTP client for testing + """ + kubernetes_config: dict[str, Any] = config.get(key_name, {}) + + # Build OIDC-compatible config from Kubernetes SA config + oidc_config: dict[str, Any] = { + key_name: { + "OIDC_SERVER": KUBERNETES_OIDC_SERVER, + "SERVICE_NAME": kubernetes_config.get("SERVICE_NAME", "Kubernetes"), + # CLIENT_ID is used for audience validation - defaults to server hostname + "CLIENT_ID": config.get("SERVER_HOSTNAME", ""), + "CLIENT_SECRET": "", # Not used for SA token validation + "DEBUGGING": kubernetes_config.get("DEBUGGING", False), + }, + "HTTPCLIENT": config.get("HTTPCLIENT"), + "TESTING": config.get("TESTING", False), + "FEATURE_MAILING": False, + } + + super().__init__(oidc_config, key_name, client) + + # Store Kubernetes-specific config + self._kubernetes_config: dict[str, Any] = kubernetes_config + self._system_org_name: str = kubernetes_config.get( + "SYSTEM_ORG_NAME", DEFAULT_SYSTEM_ORG_NAME + ) + self._superuser_subject: Optional[str] = kubernetes_config.get("SUPERUSER_SUBJECT") + self._verify_tls: bool = kubernetes_config.get("VERIFY_TLS", True) + self._ca_bundle: str = kubernetes_config.get("CA_BUNDLE", SERVICE_ACCOUNT_CA_PATH) + + def service_id(self) -> str: + return "kubernetes_sa" + + def service_name(self) -> str: + return self._kubernetes_config.get("SERVICE_NAME", "Kubernetes") + + @property + def system_org_name(self) -> str: + """Name of the organization that owns Kubernetes SA robot accounts.""" + return self._system_org_name + + def is_superuser_subject(self, subject: str) -> bool: + """Check if the given SA subject is configured as a superuser.""" + if not self._superuser_subject: + return False + return subject == self._superuser_subject + + def parse_sa_subject(self, subject: str) -> Optional[tuple[str, str]]: + """ + Parse a Kubernetes SA subject into namespace and name. + + Args: + subject: SA subject (e.g., "system:serviceaccount:quay:quay-operator") + + Returns: + Tuple of (namespace, sa_name) or None if invalid format + """ + match = KUBERNETES_SA_SUBJECT_PATTERN.match(subject) + if not match: + return None + return match.group("namespace"), match.group("name") + + def generate_robot_shortname(self, namespace: str, sa_name: str) -> str: + """ + Generate a deterministic robot shortname from Kubernetes SA identity. + + Robot format: kube__ + Characters are sanitized to be valid robot names (lowercase alphanumeric + underscore). + + Args: + namespace: Kubernetes namespace + sa_name: ServiceAccount name + + Returns: + Sanitized robot shortname + """ + # Sanitize: replace invalid chars with underscore, lowercase + safe_ns = re.sub(r"[^a-z0-9]", "_", namespace.lower()) + safe_name = re.sub(r"[^a-z0-9]", "_", sa_name.lower()) + return f"kube_{safe_ns}_{safe_name}" + + def get_ssl_verification(self) -> str | bool: + """ + Get SSL verification setting for Kubernetes API calls. + + Returns: + False if TLS verification disabled, path to CA bundle if configured, + or True to use system CA certificates. + """ + if not self._verify_tls: + return False + + if self._ca_bundle and os.path.exists(self._ca_bundle): + return self._ca_bundle + + return True + + def get_jwks_auth_headers(self) -> dict[str, str] | None: + """ + Get auth headers for JWKS endpoint requests. + + Kubernetes OIDC JWKS endpoints may require authentication. We use + the pod's ServiceAccount token to authenticate. + + Returns: + Dict with Authorization header, or None if token unavailable. + """ + token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token" + if os.path.exists(token_path): + try: + with open(token_path) as f: + token = f.read().strip() + return {"Authorization": f"Bearer {token}"} + except Exception as e: + logger.warning(f"Failed to read SA token for JWKS auth: {e}") + return None + + def _load_oidc_config_via_discovery(self, is_debugging: bool) -> dict[str, Any]: + """ + Override to use custom TLS verification for Kubernetes API server. + + Kubernetes clusters may use self-signed certificates, so we use + the configured CA bundle for verification. + """ + import json + from posixpath import join + + from oauth.oidc import OIDC_WELLKNOWN, DiscoveryFailureException + + oidc_server = self.config["OIDC_SERVER"] + if not oidc_server.startswith("https://") and not is_debugging: + raise DiscoveryFailureException("OIDC server must be accessed over SSL") + + ssl_verify = self.get_ssl_verification() + + discovery_url = join(oidc_server, OIDC_WELLKNOWN) + try: + discovery = self._http_client.get(discovery_url, timeout=5, verify=ssl_verify) + except Exception as e: + logger.exception("Failed to connect to Kubernetes OIDC server: %s", e) + raise DiscoveryFailureException(f"Failed to connect to Kubernetes OIDC server: {e}") + + if discovery.status_code // 100 != 2: + logger.debug( + "Got %s response for OIDC discovery: %s", + discovery.status_code, + discovery.text, + ) + raise DiscoveryFailureException( + f"Could not load OIDC discovery information: {discovery.status_code}" + ) + + try: + return json.loads(discovery.text) + except ValueError: + logger.exception("Could not parse OIDC discovery for url: %s", discovery_url) + raise DiscoveryFailureException("Could not parse OIDC discovery information") + + def validate_sa_token(self, token: str) -> dict[str, Any]: + """ + Validate a Kubernetes ServiceAccount JWT token. + + This method validates the token signature using JWKS from the Kubernetes + API server, but uses relaxed validation for audience and nbf claims + since Kubernetes tokens may not follow standard OIDC conventions. + + Args: + token: The JWT token from Kubernetes SA + + Returns: + Decoded token claims + + Raises: + InvalidTokenError: If token validation fails + PublicKeyLoadException: If JWKS cannot be loaded + """ + options = { + "verify_aud": False, # Kubernetes audience varies by cluster config + "verify_nbf": False, # Some Kubernetes tokens lack nbf claim + } + + return self.decode_user_jwt(token, options=options) + + def get_issuer(self) -> Optional[str]: + """ + Get the expected token issuer. + + For Kubernetes, this is typically the API server URL or a custom + issuer configured at the cluster level. + """ + return self._issuer diff --git a/oauth/test/test_kubernetes_sa.py b/oauth/test/test_kubernetes_sa.py new file mode 100644 index 000000000..9d6eecdc4 --- /dev/null +++ b/oauth/test/test_kubernetes_sa.py @@ -0,0 +1,256 @@ +"""Tests for Kubernetes ServiceAccount OIDC authentication service.""" + +import pytest + +from oauth.services.kubernetes_sa import ( + DEFAULT_SYSTEM_ORG_NAME, + KUBERNETES_SA_SUBJECT_PATTERN, + KubernetesServiceAccountLoginService, +) + + +class TestKubernetesServiceAccountLoginService: + """Tests for KubernetesServiceAccountLoginService.""" + + @pytest.fixture + def kubernetes_config(self): + """Basic Kubernetes SA auth configuration.""" + return { + "FEATURE_KUBERNETES_SA_AUTH": True, + "SERVER_HOSTNAME": "quay.example.com", + "KUBERNETES_SA_AUTH_CONFIG": { + "SERVICE_NAME": "Kubernetes", + "VERIFY_TLS": False, + "SYSTEM_ORG_NAME": "quay-system", + "SUPERUSER_SUBJECT": "system:serviceaccount:quay-operator:quay-controller", + "DEBUGGING": True, + }, + "TESTING": True, + } + + @pytest.fixture + def kubernetes_service(self, kubernetes_config): + """Create Kubernetes SA login service.""" + return KubernetesServiceAccountLoginService(kubernetes_config) + + def test_service_id(self, kubernetes_service): + """Service ID should be 'kubernetes_sa'.""" + assert kubernetes_service.service_id() == "kubernetes_sa" + + def test_service_name(self, kubernetes_service): + """Service name should be from config.""" + assert kubernetes_service.service_name() == "Kubernetes" + + def test_service_name_default(self): + """Service name should default to 'Kubernetes'.""" + config = { + "KUBERNETES_SA_AUTH_CONFIG": {}, + "TESTING": True, + } + service = KubernetesServiceAccountLoginService(config) + assert service.service_name() == "Kubernetes" + + def test_system_org_name(self, kubernetes_service): + """System org name should be from config.""" + assert kubernetes_service.system_org_name == "quay-system" + + def test_system_org_name_default(self): + """System org name should default to 'quay-system'.""" + config = { + "KUBERNETES_SA_AUTH_CONFIG": {}, + "TESTING": True, + } + service = KubernetesServiceAccountLoginService(config) + assert service.system_org_name == DEFAULT_SYSTEM_ORG_NAME + + +class TestSASubjectParsing: + """Tests for SA subject parsing.""" + + @pytest.fixture + def service(self): + config = { + "KUBERNETES_SA_AUTH_CONFIG": {}, + "TESTING": True, + } + return KubernetesServiceAccountLoginService(config) + + @pytest.mark.parametrize( + "subject,expected", + [ + ("system:serviceaccount:default:my-sa", ("default", "my-sa")), + ( + "system:serviceaccount:quay-operator:controller", + ("quay-operator", "controller"), + ), + ( + "system:serviceaccount:ns:name-with-dashes", + ("ns", "name-with-dashes"), + ), + ( + "system:serviceaccount:kube-system:default", + ("kube-system", "default"), + ), + ( + "system:serviceaccount:my-namespace:my-service-account", + ("my-namespace", "my-service-account"), + ), + ], + ) + def test_parse_valid_subjects(self, service, subject, expected): + """Valid SA subjects should parse correctly.""" + result = service.parse_sa_subject(subject) + assert result == expected + + @pytest.mark.parametrize( + "subject", + [ + "invalid", + "system:serviceaccount", + "system:serviceaccount:only-namespace", + "user:someone", + "", + "system:node:my-node", + "serviceaccount:default:my-sa", # Missing system: prefix + ], + ) + def test_parse_invalid_subjects(self, service, subject): + """Invalid SA subjects should return None.""" + result = service.parse_sa_subject(subject) + assert result is None + + +class TestRobotNameGeneration: + """Tests for robot name generation.""" + + @pytest.fixture + def service(self): + config = { + "KUBERNETES_SA_AUTH_CONFIG": {}, + "TESTING": True, + } + return KubernetesServiceAccountLoginService(config) + + @pytest.mark.parametrize( + "namespace,sa_name,expected", + [ + ("default", "my-sa", "kube_default_my_sa"), + ("quay-operator", "controller-manager", "kube_quay_operator_controller_manager"), + ("ns", "name.with.dots", "kube_ns_name_with_dots"), + ("NS", "UPPERCASE", "kube_ns_uppercase"), + ("kube-system", "default", "kube_kube_system_default"), + ("my_namespace", "my_sa", "kube_my_namespace_my_sa"), + ], + ) + def test_generate_robot_shortname(self, service, namespace, sa_name, expected): + """Robot shortnames should be generated correctly.""" + result = service.generate_robot_shortname(namespace, sa_name) + assert result == expected + + def test_robot_shortname_deterministic(self, service): + """Robot shortnames should be deterministic for same input.""" + result1 = service.generate_robot_shortname("quay", "operator") + result2 = service.generate_robot_shortname("quay", "operator") + assert result1 == result2 + + +class TestSuperuserCheck: + """Tests for superuser subject checking.""" + + def test_is_superuser_subject_true(self): + """Matching subject should return True.""" + config = { + "KUBERNETES_SA_AUTH_CONFIG": { + "SUPERUSER_SUBJECT": "system:serviceaccount:quay-operator:controller", + }, + "TESTING": True, + } + service = KubernetesServiceAccountLoginService(config) + assert service.is_superuser_subject("system:serviceaccount:quay-operator:controller") + + def test_is_superuser_subject_false(self): + """Non-matching subject should return False.""" + config = { + "KUBERNETES_SA_AUTH_CONFIG": { + "SUPERUSER_SUBJECT": "system:serviceaccount:quay-operator:controller", + }, + "TESTING": True, + } + service = KubernetesServiceAccountLoginService(config) + assert not service.is_superuser_subject("system:serviceaccount:default:random-sa") + + def test_is_superuser_subject_no_config(self): + """No superuser configured should always return False.""" + config = { + "KUBERNETES_SA_AUTH_CONFIG": {}, + "TESTING": True, + } + service = KubernetesServiceAccountLoginService(config) + assert not service.is_superuser_subject("system:serviceaccount:quay-operator:controller") + + +class TestSSLVerification: + """Tests for SSL verification settings.""" + + def test_verify_tls_false(self): + """VERIFY_TLS=false should return False.""" + config = { + "KUBERNETES_SA_AUTH_CONFIG": { + "VERIFY_TLS": False, + }, + "TESTING": True, + } + service = KubernetesServiceAccountLoginService(config) + assert service.get_ssl_verification() is False + + def test_verify_tls_true_no_ca(self): + """VERIFY_TLS=true without CA bundle should return True.""" + config = { + "KUBERNETES_SA_AUTH_CONFIG": { + "VERIFY_TLS": True, + "CA_BUNDLE": "/nonexistent/path", + }, + "TESTING": True, + } + service = KubernetesServiceAccountLoginService(config) + assert service.get_ssl_verification() is True + + def test_verify_tls_default(self): + """Default should be to verify TLS.""" + config = { + "KUBERNETES_SA_AUTH_CONFIG": {}, + "TESTING": True, + } + service = KubernetesServiceAccountLoginService(config) + # Will return True since default CA path doesn't exist in test environment + result = service.get_ssl_verification() + assert result is True or isinstance(result, str) + + +class TestSubjectPattern: + """Tests for the SA subject regex pattern.""" + + @pytest.mark.parametrize( + "subject", + [ + "system:serviceaccount:default:my-sa", + "system:serviceaccount:kube-system:default", + "system:serviceaccount:my-ns:my-sa-123", + ], + ) + def test_valid_pattern_matches(self, subject): + """Valid SA subjects should match the pattern.""" + assert KUBERNETES_SA_SUBJECT_PATTERN.match(subject) is not None + + @pytest.mark.parametrize( + "subject", + [ + "system:node:my-node", + "system:serviceaccount:only-namespace", + "user:admin", + "", + ], + ) + def test_invalid_pattern_no_match(self, subject): + """Invalid SA subjects should not match the pattern.""" + assert KUBERNETES_SA_SUBJECT_PATTERN.match(subject) is None diff --git a/util/config/schema.py b/util/config/schema.py index 8bb568882..19ed8ad55 100644 --- a/util/config/schema.py +++ b/util/config/schema.py @@ -599,6 +599,51 @@ CONFIG_SCHEMA = { }, }, }, + "FEATURE_KUBERNETES_SA_AUTH": { + "type": "boolean", + "description": "Whether Kubernetes ServiceAccount OIDC authentication is supported. " + "When enabled, Kubernetes ServiceAccounts can authenticate to Quay using " + "OIDC-federated tokens. Defaults to False.", + "x-example": True, + }, + "KUBERNETES_SA_AUTH_CONFIG": { + "type": ["object", "null"], + "description": "Configuration for Kubernetes ServiceAccount OIDC authentication. " + "Enables Kubernetes operators to authenticate using ServiceAccount tokens. " + "Uses the in-cluster Kubernetes API server (https://kubernetes.default.svc) for OIDC.", + "properties": { + "SERVICE_NAME": { + "type": "string", + "description": "Display name for the authentication service in logs.", + "x-example": "Kubernetes", + }, + "VERIFY_TLS": { + "type": "boolean", + "description": "Whether to verify TLS certificates when connecting to " + "the Kubernetes API server. Defaults to True.", + "x-example": True, + }, + "CA_BUNDLE": { + "type": ["string", "null"], + "description": "Path to CA certificate bundle for TLS verification. " + "Defaults to /var/run/secrets/kubernetes.io/serviceaccount/ca.crt", + "x-example": "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt", + }, + "SYSTEM_ORG_NAME": { + "type": "string", + "description": "Organization name that will own robot accounts for " + "authenticated ServiceAccounts. Must be created before enabling. " + "Defaults to 'quay-system'.", + "x-example": "quay-system", + }, + "SUPERUSER_SUBJECT": { + "type": ["string", "null"], + "description": "Kubernetes ServiceAccount subject that receives " + "superuser permissions. Format: system:serviceaccount::", + "x-example": "system:serviceaccount:quay-operator:quay-operator-controller-manager", + }, + }, + }, "GITLAB_TRIGGER_CONFIG": { "type": ["object", "null"], "description": "Configuration for using Gitlab (Enterprise) for external authentication", diff --git a/util/config/superusermanager.py b/util/config/superusermanager.py index 0a61e5466..f1de8c2c5 100644 --- a/util/config/superusermanager.py +++ b/util/config/superusermanager.py @@ -69,7 +69,7 @@ class ConfigUserManager(UserManager): usernames.append(username) new_string = ",".join(usernames) - if len(new_string) <= self._max_length: + if len(new_string) <= self._super_max_length: self._superusers_array.value = new_string.encode("utf8") else: raise Exception("Maximum superuser count reached. Please report this to support.")