1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00

feat(auth): Kubernetes ServiceAccount OIDC authentication (PROJQUAY-0000)

Enable Kubernetes ServiceAccounts to authenticate to Quay using OIDC
federation. This allows operators running in Kubernetes to authenticate
using their pod's service account token.

Key features:
- KubernetesServiceAccountLoginService extends OIDCLoginService for
  JWKS-based JWT validation against the Kubernetes API server
- Authenticated SAs map to robot accounts in 'quay-system' organization
- Configurable superuser subject for operator service accounts
- Startup validation of configuration with helpful error messages

Configuration:
  FEATURE_KUBERNETES_SA_AUTH: true
  KUBERNETES_SA_AUTH_CONFIG:
    SYSTEM_ORG_NAME: "quay-system"
    SUPERUSER_SUBJECT: "system:serviceaccount:ns:sa-name"

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Signed-off-by: Brady Pratt <bpratt@redhat.com>
This commit is contained in:
Brady Pratt
2026-01-04 02:08:44 -06:00
parent e21a9caae2
commit 07bface72e
11 changed files with 999 additions and 8 deletions

View File

@@ -1,9 +1,14 @@
import logging
from datetime import datetime
from typing import TYPE_CHECKING, Optional
from flask import request
from jwt import ExpiredSignatureError, InvalidTokenError
import features
if TYPE_CHECKING:
from data.database import User
from app import analytics, app, authentication, oauth_login
from auth.log import log_action
from auth.scopes import scopes_from_scope_string
@@ -39,6 +44,13 @@ def validate_bearer_auth(auth_header):
def validate_oauth_token(token):
# K8s SA tokens do not have typ:JWT header so is_jwt() returns False for
# them, but they're still valid JWTs
if features.KUBERNETES_SA_AUTH:
result = validate_kubernetes_sa_token(token)
if result is not None:
return result
if is_jwt(token):
return validate_sso_oauth_token(token)
else:
@@ -95,6 +107,205 @@ def validate_sso_oauth_token(token):
return ValidateResult(AuthKind.ssojwt, error_message=str(ole))
def _create_kubernetes_sa_system_org(org_name: str) -> "User":
"""
Create the system organization for Kubernetes SA robot accounts.
This is called lazily on the first SA authentication if the org doesn't exist.
Args:
org_name: Name of the organization to create
Returns:
The created organization object
Raises:
Exception if creation fails
"""
from data.database import db_transaction
from data.model import team, user
with db_transaction():
# Create org without an owner - it's a system org
new_org = user.create_user_noverify(
org_name,
email=None,
email_required=False,
)
new_org.organization = True
new_org.save()
# Create the owners team (required for org structure)
team.create_team("owners", new_org, "admin")
logger.info(f"Created Kubernetes SA system organization '{org_name}'")
return new_org
def validate_kubernetes_sa_token(token: str) -> Optional[ValidateResult]:
"""
Validate a Kubernetes ServiceAccount JWT token.
Kubernetes SA tokens are validated via OIDC/JWKS, then mapped to robot accounts
in the configured system organization. If the SA matches the configured superuser
subject, the robot is dynamically registered as a superuser.
Returns:
ValidateResult on success, None if Kubernetes SA auth not configured or
token issuer doesn't match
"""
from data import model
from data.model import organization, team, user
from oauth.oidc import PublicKeyLoadException
from oauth.services.kubernetes_sa import (
KUBERNETES_OIDC_SERVER,
KubernetesServiceAccountLoginService,
)
from util.security.jwtutil import InvalidTokenError
kubernetes_config = app.config.get("KUBERNETES_SA_AUTH_CONFIG", {})
if not kubernetes_config:
logger.debug("FEATURE_KUBERNETES_SA_AUTH is enabled but no config provided")
return None
# Check if token issuer matches our configured Kubernetes OIDC server
# This allows other JWT tokens to fall through to generic SSO handling
try:
issuer = get_jwt_issuer(token)
if not issuer:
return None
# Normalize trailing slashes for comparison
if issuer.rstrip("/") != KUBERNETES_OIDC_SERVER.rstrip("/"):
logger.debug(
"Token issuer %s doesn't match Kubernetes OIDC server %s, skipping",
issuer,
KUBERNETES_OIDC_SERVER,
)
return None
except Exception:
# If we can't decode issuer, let it fall through to other handlers
return None
try:
# Create Kubernetes SA login service
kubernetes_service = KubernetesServiceAccountLoginService(app.config)
# Validate the token
decoded_token = kubernetes_service.validate_sa_token(token)
except (InvalidTokenError, PublicKeyLoadException) as e:
logger.warning(f"Kubernetes SA token validation failed: {e}")
return ValidateResult(
AuthKind.kubernetessa,
error_message=f"Token validation failed: {e}",
)
except Exception as e:
logger.warning(f"Kubernetes SA token validation error: {e}")
return ValidateResult(
AuthKind.kubernetessa,
error_message=f"Token validation error: {e}",
)
# Extract subject (required claim)
subject = decoded_token.get("sub")
if not subject:
logger.warning("Kubernetes SA token missing 'sub' claim")
return ValidateResult(
AuthKind.kubernetessa,
error_message="Token missing subject claim",
)
# Parse and validate SA subject format
parsed = kubernetes_service.parse_sa_subject(subject)
if not parsed:
logger.warning(f"Invalid Kubernetes SA subject format: {subject}")
return ValidateResult(
AuthKind.kubernetessa,
error_message=f"Invalid ServiceAccount subject format: {subject}",
)
namespace, sa_name = parsed
# Get or create robot account
robot_shortname = kubernetes_service.generate_robot_shortname(namespace, sa_name)
system_org_name = kubernetes_service.system_org_name
robot_username = f"{system_org_name}+{robot_shortname}"
try:
robot = user.lookup_robot(robot_username)
except model.InvalidRobotException:
# Robot doesn't exist - need to create it
# First ensure system org exists (lazy creation)
try:
system_org = organization.get_organization(system_org_name)
except model.InvalidOrganizationException:
# Create the system org lazily on first SA authentication
logger.info(f"Creating Kubernetes SA system organization '{system_org_name}'")
try:
system_org = _create_kubernetes_sa_system_org(system_org_name)
except Exception as e:
logger.error(f"Failed to create system organization '{system_org_name}': {e}")
return ValidateResult(
AuthKind.kubernetessa,
error_message=f"Failed to create system organization: {e}",
)
# Create the robot account
try:
description = f"Kubernetes SA: {namespace}/{sa_name}"
metadata = {
"kubernetes_namespace": namespace,
"kubernetes_sa_name": sa_name,
"kubernetes_subject": subject,
}
robot, _ = user.create_robot(
robot_shortname,
system_org,
description=description,
unstructured_metadata=metadata,
)
logger.info(f"Created Kubernetes SA robot account: {robot_username}")
# Add robot to owners
try:
owners_team = team.get_organization_team(system_org_name, "owners")
team.add_user_to_team(robot, owners_team)
except Exception as e:
logger.warning(f"Failed to add robot to owners team: {e}")
except model.InvalidRobotException as e:
logger.error(f"Failed to create robot {robot_username}: {e}")
return ValidateResult(
AuthKind.kubernetessa,
error_message=f"Failed to create robot account: {e}",
)
# Check if this SA should be a superuser and register dynamically
if kubernetes_service.is_superuser_subject(subject):
try:
from app import usermanager
if not usermanager.is_superuser(robot_username):
usermanager.register_superuser(robot_username)
logger.info(f"Registered Kubernetes SA robot as superuser: {robot_username}")
except Exception as e:
logger.error(f"Failed to register superuser {robot_username}: {e}")
# Continue - auth still valid, just not superuser
# Log successful authentication
logger.info(
f"Kubernetes SA authenticated: {subject} -> {robot_username}",
extra={
"kubernetes_namespace": namespace,
"kubernetes_sa_name": sa_name,
"robot_username": robot_username,
"is_superuser": kubernetes_service.is_superuser_subject(subject),
},
)
return ValidateResult(AuthKind.kubernetessa, robot=robot, sso_token=token)
def validate_app_oauth_token(token):
"""
Validates the specified OAuth token, returning whether it points to a valid OAuth token.

View File

@@ -0,0 +1,201 @@
"""Integration tests for Kubernetes SA authentication."""
from unittest.mock import MagicMock, patch
import pytest
from auth.oauth import validate_kubernetes_sa_token
from auth.validateresult import AuthKind
class TestValidateKubernetesSAToken:
"""Tests for validate_kubernetes_sa_token function."""
def test_returns_none_when_feature_disabled(self, app):
"""Should return None when FEATURE_KUBERNETES_SA_AUTH is disabled."""
with patch.dict(app.config, {"FEATURE_KUBERNETES_SA_AUTH": False}):
result = validate_kubernetes_sa_token("some.jwt.token")
assert result is None
def test_returns_none_when_config_missing(self, app):
"""Should return None when KUBERNETES_SA_AUTH_CONFIG is missing."""
with patch.dict(
app.config,
{"FEATURE_KUBERNETES_SA_AUTH": True, "KUBERNETES_SA_AUTH_CONFIG": None},
):
result = validate_kubernetes_sa_token("some.jwt.token")
assert result is None
def test_returns_none_for_non_matching_issuer(self, app):
"""Should return None when token issuer doesn't match config."""
# Create a minimal JWT-like token with a different issuer
# The function checks issuer before full validation
with patch.dict(
app.config,
{
"FEATURE_KUBERNETES_SA_AUTH": True,
"KUBERNETES_SA_AUTH_CONFIG": {},
},
):
with patch(
"auth.oauth.get_jwt_issuer",
return_value="https://some-other-issuer.com",
):
result = validate_kubernetes_sa_token("some.jwt.token")
assert result is None
def test_returns_none_when_issuer_extraction_fails(self, app):
"""Should return None when issuer can't be extracted from token."""
with patch.dict(
app.config,
{
"FEATURE_KUBERNETES_SA_AUTH": True,
"KUBERNETES_SA_AUTH_CONFIG": {},
},
):
with patch("auth.oauth.get_jwt_issuer", return_value=None):
result = validate_kubernetes_sa_token("invalid.token")
assert result is None
def test_returns_error_for_invalid_token(self, app):
"""Should return error result for invalid token with matching issuer."""
from util.security.jwtutil import InvalidTokenError
with patch.dict(
app.config,
{
"FEATURE_KUBERNETES_SA_AUTH": True,
"KUBERNETES_SA_AUTH_CONFIG": {
"DEBUGGING": True,
},
"TESTING": True,
},
):
with patch(
"auth.oauth.get_jwt_issuer",
return_value="https://kubernetes.default.svc",
):
# Mock service to raise InvalidTokenError
mock_service = MagicMock()
mock_service.validate_sa_token.side_effect = InvalidTokenError("Invalid signature")
with patch(
"auth.oauth.KubernetesServiceAccountLoginService",
return_value=mock_service,
):
result = validate_kubernetes_sa_token("bad.jwt.token")
assert result is not None
assert result.kind == AuthKind.kubernetessa
assert result.error_message is not None
assert "Token validation failed" in result.error_message
def test_returns_error_for_missing_subject(self, app):
"""Should return error when token lacks subject claim."""
with patch.dict(
app.config,
{
"FEATURE_KUBERNETES_SA_AUTH": True,
"KUBERNETES_SA_AUTH_CONFIG": {
"DEBUGGING": True,
},
"TESTING": True,
},
):
with patch(
"auth.oauth.get_jwt_issuer",
return_value="https://kubernetes.default.svc",
):
mock_service = MagicMock()
mock_service.validate_sa_token.return_value = {"iss": "test"} # No sub
with patch(
"auth.oauth.KubernetesServiceAccountLoginService",
return_value=mock_service,
):
result = validate_kubernetes_sa_token("some.jwt.token")
assert result is not None
assert result.kind == AuthKind.kubernetessa
assert "subject" in result.error_message.lower()
def test_returns_error_for_invalid_subject_format(self, app):
"""Should return error when subject format is invalid."""
with patch.dict(
app.config,
{
"FEATURE_KUBERNETES_SA_AUTH": True,
"KUBERNETES_SA_AUTH_CONFIG": {
"DEBUGGING": True,
},
"TESTING": True,
},
):
with patch(
"auth.oauth.get_jwt_issuer",
return_value="https://kubernetes.default.svc",
):
mock_service = MagicMock()
mock_service.validate_sa_token.return_value = {"sub": "invalid:format"}
mock_service.parse_sa_subject.return_value = None
with patch(
"auth.oauth.KubernetesServiceAccountLoginService",
return_value=mock_service,
):
result = validate_kubernetes_sa_token("some.jwt.token")
assert result is not None
assert result.kind == AuthKind.kubernetessa
assert "Invalid ServiceAccount subject" in result.error_message
class TestValidateKubernetesSATokenWithFixtures:
"""Integration tests with database fixtures."""
def test_returns_error_when_org_creation_fails(self, app):
"""Should return error when system organization creation fails."""
from data import model
with patch.dict(
app.config,
{
"FEATURE_KUBERNETES_SA_AUTH": True,
"KUBERNETES_SA_AUTH_CONFIG": {
"SYSTEM_ORG_NAME": "quay-system",
"DEBUGGING": True,
},
"TESTING": True,
},
):
with patch(
"auth.oauth.get_jwt_issuer",
return_value="https://kubernetes.default.svc",
):
mock_service = MagicMock()
mock_service.validate_sa_token.return_value = {
"sub": "system:serviceaccount:quay:operator"
}
mock_service.parse_sa_subject.return_value = ("quay", "operator")
mock_service.generate_robot_shortname.return_value = "kube_quay_operator"
mock_service.system_org_name = "quay-system"
with patch(
"auth.oauth.KubernetesServiceAccountLoginService",
return_value=mock_service,
):
# Make sure robot lookup fails (triggering org lookup/creation)
with patch(
"data.model.user.lookup_robot",
side_effect=model.InvalidRobotException("Not found"),
):
# Make sure org lookup fails
with patch(
"data.model.organization.get_organization",
side_effect=model.InvalidOrganizationException("Not found"),
):
# Make sure org creation fails
with patch(
"auth.oauth._create_kubernetes_sa_system_org",
side_effect=Exception("Database error"),
):
result = validate_kubernetes_sa_token("some.jwt.token")
assert result is not None
assert result.kind == AuthKind.kubernetessa
assert (
"Failed to create system organization" in result.error_message
)

View File

@@ -11,6 +11,7 @@ class AuthKind(Enum):
credentials = "credentials"
ssojwt = "ssojwt"
federated = "federated"
kubernetessa = "kubernetessa" # Kubernetes ServiceAccount
def __str__(self):
return "%s" % self.value

View File

@@ -295,6 +295,9 @@ class DefaultConfig(ImmutableConfig):
# Gitlab Config.
GITLAB_TRIGGER_CONFIG: Optional[Dict[str, str]] = None
# Kubernetes ServiceAccount authentication configuration
KUBERNETES_SA_AUTH_CONFIG: Optional[Dict[str, str]] = None
NOTIFICATION_QUEUE_NAME = "notification"
DOCKERFILE_BUILD_QUEUE_NAME = "dockerfilebuild"
REPLICATION_QUEUE_NAME = "imagestoragereplication"
@@ -334,6 +337,9 @@ class DefaultConfig(ImmutableConfig):
# Feature Flag: Whether Google login is supported.
FEATURE_GOOGLE_LOGIN = False
# Feature Flag: Whether Kubernetes ServiceAccount OIDC authentication is supported.
FEATURE_KUBERNETES_SA_AUTH = False
# Feature Flag: Whether to support GitHub build triggers.
FEATURE_GITHUB_BUILD = False

View File

@@ -95,6 +95,11 @@ def org_view(o, teams):
features.SUPERUSERS_FULL_ACCESS and allow_if_superuser()
)
# Grant superusers effective admin/member status for UI visibility
if can_view_as_superuser:
is_admin = True
is_member = True
view = {
"name": o.username,
"email": o.email if is_admin or can_view_as_superuser else "",

View File

@@ -31,6 +31,9 @@ GITHUB_LOGIN: FeatureNameValue
# Feature Flag: Whether Google login is supported.
GOOGLE_LOGIN: FeatureNameValue
# Feature Flag: Whether Kubernetes ServiceAccount OIDC authentication is supported.
KUBERNETES_SA_AUTH: FeatureNameValue
# Feature Flag: Whether to support GitHub build triggers.
GITHUB_BUILD: FeatureNameValue

View File

@@ -419,11 +419,18 @@ class _PublicKeyCache(TTLCache):
# Load the keys.
try:
keys = KeySet(
_load_keys_from_url(
keys_url, verify=not self._login_service.config.get("DEBUGGING", False)
)
)
# Check if service has custom SSL verification (e.g., Kubernetes SA with CA bundle)
if hasattr(self._login_service, "get_ssl_verification"):
verify = self._login_service.get_ssl_verification()
else:
verify = not self._login_service.config.get("DEBUGGING", False)
# Check if service provides auth headers for JWKS fetching (e.g., Kubernetes SA)
headers = None
if hasattr(self._login_service, "get_jwks_auth_headers"):
headers = self._login_service.get_jwks_auth_headers()
keys = KeySet(_load_keys_from_url(keys_url, verify=verify, headers=headers))
except Exception as ex:
logger.exception("Exception loading public key")
raise PublicKeyLoadException(str(ex))
@@ -443,7 +450,7 @@ class _PublicKeyCache(TTLCache):
return rsa_key
def _load_keys_from_url(url, verify=True):
def _load_keys_from_url(url, verify=True, headers=None):
"""
Expects something on this form:
{"keys":
@@ -467,7 +474,7 @@ def _load_keys_from_url(url, verify=True):
"""
keys = []
r = request("GET", url, allow_redirects=True, verify=verify)
r = request("GET", url, allow_redirects=True, verify=verify, headers=headers)
if r.status_code == 200:
keys_dict = json.loads(r.text)
for key_spec in keys_dict["keys"]:

View File

@@ -0,0 +1,256 @@
"""
Kubernetes ServiceAccount OIDC authentication service for Quay.
Enables Kubernetes ServiceAccount tokens to authenticate to Quay using OIDC
federation. SA tokens map to robot accounts owned by a configurable system
organization. Authenticated SAs matching the configured superuser subject
receive dynamic superuser privileges.
"""
import logging
import os
import re
from typing import Any, Optional
from oauth.oidc import (
ALLOWED_ALGORITHMS,
JWT_CLOCK_SKEW_SECONDS,
OIDCLoginService,
PublicKeyLoadException,
)
from util.security.jwtutil import InvalidTokenError, decode
logger = logging.getLogger(__name__)
# Standard Kubernetes ServiceAccount CA certificate path
SERVICE_ACCOUNT_CA_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
# Kubernetes SA subject format: system:serviceaccount:<namespace>:<name>
KUBERNETES_SA_SUBJECT_PATTERN = re.compile(
r"^system:serviceaccount:(?P<namespace>[^:]+):(?P<name>[^:]+)$"
)
# Default system org for Kubernetes SA robot accounts
DEFAULT_SYSTEM_ORG_NAME = "quay-system"
# Kubernetes API server OIDC endpoint (in-cluster)
KUBERNETES_OIDC_SERVER = "https://kubernetes.default.svc"
class KubernetesServiceAccountLoginService(OIDCLoginService):
"""
Kubernetes ServiceAccount OIDC authentication service.
Validates Kubernetes SA JWT tokens using OIDC/JWKS and maps them to robot
accounts in a dedicated system organization.
Unlike standard OIDC login services, this service:
- Does not participate in OAuth authorization flows
- Validates bearer tokens directly from Kubernetes pods
- Uses relaxed audience validation (K8s audiences vary)
- Maps SA subjects to robot accounts deterministically
"""
def __init__(
self,
config: dict[str, Any],
key_name: str = "KUBERNETES_SA_AUTH_CONFIG",
client: Any = None,
) -> None:
"""
Initialize the Kubernetes SA login service.
Args:
config: Application configuration dict containing KUBERNETES_SA_AUTH_CONFIG
key_name: Configuration key name (default: KUBERNETES_SA_AUTH_CONFIG)
client: Optional HTTP client for testing
"""
kubernetes_config: dict[str, Any] = config.get(key_name, {})
# Build OIDC-compatible config from Kubernetes SA config
oidc_config: dict[str, Any] = {
key_name: {
"OIDC_SERVER": KUBERNETES_OIDC_SERVER,
"SERVICE_NAME": kubernetes_config.get("SERVICE_NAME", "Kubernetes"),
# CLIENT_ID is used for audience validation - defaults to server hostname
"CLIENT_ID": config.get("SERVER_HOSTNAME", ""),
"CLIENT_SECRET": "", # Not used for SA token validation
"DEBUGGING": kubernetes_config.get("DEBUGGING", False),
},
"HTTPCLIENT": config.get("HTTPCLIENT"),
"TESTING": config.get("TESTING", False),
"FEATURE_MAILING": False,
}
super().__init__(oidc_config, key_name, client)
# Store Kubernetes-specific config
self._kubernetes_config: dict[str, Any] = kubernetes_config
self._system_org_name: str = kubernetes_config.get(
"SYSTEM_ORG_NAME", DEFAULT_SYSTEM_ORG_NAME
)
self._superuser_subject: Optional[str] = kubernetes_config.get("SUPERUSER_SUBJECT")
self._verify_tls: bool = kubernetes_config.get("VERIFY_TLS", True)
self._ca_bundle: str = kubernetes_config.get("CA_BUNDLE", SERVICE_ACCOUNT_CA_PATH)
def service_id(self) -> str:
return "kubernetes_sa"
def service_name(self) -> str:
return self._kubernetes_config.get("SERVICE_NAME", "Kubernetes")
@property
def system_org_name(self) -> str:
"""Name of the organization that owns Kubernetes SA robot accounts."""
return self._system_org_name
def is_superuser_subject(self, subject: str) -> bool:
"""Check if the given SA subject is configured as a superuser."""
if not self._superuser_subject:
return False
return subject == self._superuser_subject
def parse_sa_subject(self, subject: str) -> Optional[tuple[str, str]]:
"""
Parse a Kubernetes SA subject into namespace and name.
Args:
subject: SA subject (e.g., "system:serviceaccount:quay:quay-operator")
Returns:
Tuple of (namespace, sa_name) or None if invalid format
"""
match = KUBERNETES_SA_SUBJECT_PATTERN.match(subject)
if not match:
return None
return match.group("namespace"), match.group("name")
def generate_robot_shortname(self, namespace: str, sa_name: str) -> str:
"""
Generate a deterministic robot shortname from Kubernetes SA identity.
Robot format: kube_<namespace>_<sa_name>
Characters are sanitized to be valid robot names (lowercase alphanumeric + underscore).
Args:
namespace: Kubernetes namespace
sa_name: ServiceAccount name
Returns:
Sanitized robot shortname
"""
# Sanitize: replace invalid chars with underscore, lowercase
safe_ns = re.sub(r"[^a-z0-9]", "_", namespace.lower())
safe_name = re.sub(r"[^a-z0-9]", "_", sa_name.lower())
return f"kube_{safe_ns}_{safe_name}"
def get_ssl_verification(self) -> str | bool:
"""
Get SSL verification setting for Kubernetes API calls.
Returns:
False if TLS verification disabled, path to CA bundle if configured,
or True to use system CA certificates.
"""
if not self._verify_tls:
return False
if self._ca_bundle and os.path.exists(self._ca_bundle):
return self._ca_bundle
return True
def get_jwks_auth_headers(self) -> dict[str, str] | None:
"""
Get auth headers for JWKS endpoint requests.
Kubernetes OIDC JWKS endpoints may require authentication. We use
the pod's ServiceAccount token to authenticate.
Returns:
Dict with Authorization header, or None if token unavailable.
"""
token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
if os.path.exists(token_path):
try:
with open(token_path) as f:
token = f.read().strip()
return {"Authorization": f"Bearer {token}"}
except Exception as e:
logger.warning(f"Failed to read SA token for JWKS auth: {e}")
return None
def _load_oidc_config_via_discovery(self, is_debugging: bool) -> dict[str, Any]:
"""
Override to use custom TLS verification for Kubernetes API server.
Kubernetes clusters may use self-signed certificates, so we use
the configured CA bundle for verification.
"""
import json
from posixpath import join
from oauth.oidc import OIDC_WELLKNOWN, DiscoveryFailureException
oidc_server = self.config["OIDC_SERVER"]
if not oidc_server.startswith("https://") and not is_debugging:
raise DiscoveryFailureException("OIDC server must be accessed over SSL")
ssl_verify = self.get_ssl_verification()
discovery_url = join(oidc_server, OIDC_WELLKNOWN)
try:
discovery = self._http_client.get(discovery_url, timeout=5, verify=ssl_verify)
except Exception as e:
logger.exception("Failed to connect to Kubernetes OIDC server: %s", e)
raise DiscoveryFailureException(f"Failed to connect to Kubernetes OIDC server: {e}")
if discovery.status_code // 100 != 2:
logger.debug(
"Got %s response for OIDC discovery: %s",
discovery.status_code,
discovery.text,
)
raise DiscoveryFailureException(
f"Could not load OIDC discovery information: {discovery.status_code}"
)
try:
return json.loads(discovery.text)
except ValueError:
logger.exception("Could not parse OIDC discovery for url: %s", discovery_url)
raise DiscoveryFailureException("Could not parse OIDC discovery information")
def validate_sa_token(self, token: str) -> dict[str, Any]:
"""
Validate a Kubernetes ServiceAccount JWT token.
This method validates the token signature using JWKS from the Kubernetes
API server, but uses relaxed validation for audience and nbf claims
since Kubernetes tokens may not follow standard OIDC conventions.
Args:
token: The JWT token from Kubernetes SA
Returns:
Decoded token claims
Raises:
InvalidTokenError: If token validation fails
PublicKeyLoadException: If JWKS cannot be loaded
"""
options = {
"verify_aud": False, # Kubernetes audience varies by cluster config
"verify_nbf": False, # Some Kubernetes tokens lack nbf claim
}
return self.decode_user_jwt(token, options=options)
def get_issuer(self) -> Optional[str]:
"""
Get the expected token issuer.
For Kubernetes, this is typically the API server URL or a custom
issuer configured at the cluster level.
"""
return self._issuer

View File

@@ -0,0 +1,256 @@
"""Tests for Kubernetes ServiceAccount OIDC authentication service."""
import pytest
from oauth.services.kubernetes_sa import (
DEFAULT_SYSTEM_ORG_NAME,
KUBERNETES_SA_SUBJECT_PATTERN,
KubernetesServiceAccountLoginService,
)
class TestKubernetesServiceAccountLoginService:
"""Tests for KubernetesServiceAccountLoginService."""
@pytest.fixture
def kubernetes_config(self):
"""Basic Kubernetes SA auth configuration."""
return {
"FEATURE_KUBERNETES_SA_AUTH": True,
"SERVER_HOSTNAME": "quay.example.com",
"KUBERNETES_SA_AUTH_CONFIG": {
"SERVICE_NAME": "Kubernetes",
"VERIFY_TLS": False,
"SYSTEM_ORG_NAME": "quay-system",
"SUPERUSER_SUBJECT": "system:serviceaccount:quay-operator:quay-controller",
"DEBUGGING": True,
},
"TESTING": True,
}
@pytest.fixture
def kubernetes_service(self, kubernetes_config):
"""Create Kubernetes SA login service."""
return KubernetesServiceAccountLoginService(kubernetes_config)
def test_service_id(self, kubernetes_service):
"""Service ID should be 'kubernetes_sa'."""
assert kubernetes_service.service_id() == "kubernetes_sa"
def test_service_name(self, kubernetes_service):
"""Service name should be from config."""
assert kubernetes_service.service_name() == "Kubernetes"
def test_service_name_default(self):
"""Service name should default to 'Kubernetes'."""
config = {
"KUBERNETES_SA_AUTH_CONFIG": {},
"TESTING": True,
}
service = KubernetesServiceAccountLoginService(config)
assert service.service_name() == "Kubernetes"
def test_system_org_name(self, kubernetes_service):
"""System org name should be from config."""
assert kubernetes_service.system_org_name == "quay-system"
def test_system_org_name_default(self):
"""System org name should default to 'quay-system'."""
config = {
"KUBERNETES_SA_AUTH_CONFIG": {},
"TESTING": True,
}
service = KubernetesServiceAccountLoginService(config)
assert service.system_org_name == DEFAULT_SYSTEM_ORG_NAME
class TestSASubjectParsing:
"""Tests for SA subject parsing."""
@pytest.fixture
def service(self):
config = {
"KUBERNETES_SA_AUTH_CONFIG": {},
"TESTING": True,
}
return KubernetesServiceAccountLoginService(config)
@pytest.mark.parametrize(
"subject,expected",
[
("system:serviceaccount:default:my-sa", ("default", "my-sa")),
(
"system:serviceaccount:quay-operator:controller",
("quay-operator", "controller"),
),
(
"system:serviceaccount:ns:name-with-dashes",
("ns", "name-with-dashes"),
),
(
"system:serviceaccount:kube-system:default",
("kube-system", "default"),
),
(
"system:serviceaccount:my-namespace:my-service-account",
("my-namespace", "my-service-account"),
),
],
)
def test_parse_valid_subjects(self, service, subject, expected):
"""Valid SA subjects should parse correctly."""
result = service.parse_sa_subject(subject)
assert result == expected
@pytest.mark.parametrize(
"subject",
[
"invalid",
"system:serviceaccount",
"system:serviceaccount:only-namespace",
"user:someone",
"",
"system:node:my-node",
"serviceaccount:default:my-sa", # Missing system: prefix
],
)
def test_parse_invalid_subjects(self, service, subject):
"""Invalid SA subjects should return None."""
result = service.parse_sa_subject(subject)
assert result is None
class TestRobotNameGeneration:
"""Tests for robot name generation."""
@pytest.fixture
def service(self):
config = {
"KUBERNETES_SA_AUTH_CONFIG": {},
"TESTING": True,
}
return KubernetesServiceAccountLoginService(config)
@pytest.mark.parametrize(
"namespace,sa_name,expected",
[
("default", "my-sa", "kube_default_my_sa"),
("quay-operator", "controller-manager", "kube_quay_operator_controller_manager"),
("ns", "name.with.dots", "kube_ns_name_with_dots"),
("NS", "UPPERCASE", "kube_ns_uppercase"),
("kube-system", "default", "kube_kube_system_default"),
("my_namespace", "my_sa", "kube_my_namespace_my_sa"),
],
)
def test_generate_robot_shortname(self, service, namespace, sa_name, expected):
"""Robot shortnames should be generated correctly."""
result = service.generate_robot_shortname(namespace, sa_name)
assert result == expected
def test_robot_shortname_deterministic(self, service):
"""Robot shortnames should be deterministic for same input."""
result1 = service.generate_robot_shortname("quay", "operator")
result2 = service.generate_robot_shortname("quay", "operator")
assert result1 == result2
class TestSuperuserCheck:
"""Tests for superuser subject checking."""
def test_is_superuser_subject_true(self):
"""Matching subject should return True."""
config = {
"KUBERNETES_SA_AUTH_CONFIG": {
"SUPERUSER_SUBJECT": "system:serviceaccount:quay-operator:controller",
},
"TESTING": True,
}
service = KubernetesServiceAccountLoginService(config)
assert service.is_superuser_subject("system:serviceaccount:quay-operator:controller")
def test_is_superuser_subject_false(self):
"""Non-matching subject should return False."""
config = {
"KUBERNETES_SA_AUTH_CONFIG": {
"SUPERUSER_SUBJECT": "system:serviceaccount:quay-operator:controller",
},
"TESTING": True,
}
service = KubernetesServiceAccountLoginService(config)
assert not service.is_superuser_subject("system:serviceaccount:default:random-sa")
def test_is_superuser_subject_no_config(self):
"""No superuser configured should always return False."""
config = {
"KUBERNETES_SA_AUTH_CONFIG": {},
"TESTING": True,
}
service = KubernetesServiceAccountLoginService(config)
assert not service.is_superuser_subject("system:serviceaccount:quay-operator:controller")
class TestSSLVerification:
"""Tests for SSL verification settings."""
def test_verify_tls_false(self):
"""VERIFY_TLS=false should return False."""
config = {
"KUBERNETES_SA_AUTH_CONFIG": {
"VERIFY_TLS": False,
},
"TESTING": True,
}
service = KubernetesServiceAccountLoginService(config)
assert service.get_ssl_verification() is False
def test_verify_tls_true_no_ca(self):
"""VERIFY_TLS=true without CA bundle should return True."""
config = {
"KUBERNETES_SA_AUTH_CONFIG": {
"VERIFY_TLS": True,
"CA_BUNDLE": "/nonexistent/path",
},
"TESTING": True,
}
service = KubernetesServiceAccountLoginService(config)
assert service.get_ssl_verification() is True
def test_verify_tls_default(self):
"""Default should be to verify TLS."""
config = {
"KUBERNETES_SA_AUTH_CONFIG": {},
"TESTING": True,
}
service = KubernetesServiceAccountLoginService(config)
# Will return True since default CA path doesn't exist in test environment
result = service.get_ssl_verification()
assert result is True or isinstance(result, str)
class TestSubjectPattern:
"""Tests for the SA subject regex pattern."""
@pytest.mark.parametrize(
"subject",
[
"system:serviceaccount:default:my-sa",
"system:serviceaccount:kube-system:default",
"system:serviceaccount:my-ns:my-sa-123",
],
)
def test_valid_pattern_matches(self, subject):
"""Valid SA subjects should match the pattern."""
assert KUBERNETES_SA_SUBJECT_PATTERN.match(subject) is not None
@pytest.mark.parametrize(
"subject",
[
"system:node:my-node",
"system:serviceaccount:only-namespace",
"user:admin",
"",
],
)
def test_invalid_pattern_no_match(self, subject):
"""Invalid SA subjects should not match the pattern."""
assert KUBERNETES_SA_SUBJECT_PATTERN.match(subject) is None

View File

@@ -599,6 +599,51 @@ CONFIG_SCHEMA = {
},
},
},
"FEATURE_KUBERNETES_SA_AUTH": {
"type": "boolean",
"description": "Whether Kubernetes ServiceAccount OIDC authentication is supported. "
"When enabled, Kubernetes ServiceAccounts can authenticate to Quay using "
"OIDC-federated tokens. Defaults to False.",
"x-example": True,
},
"KUBERNETES_SA_AUTH_CONFIG": {
"type": ["object", "null"],
"description": "Configuration for Kubernetes ServiceAccount OIDC authentication. "
"Enables Kubernetes operators to authenticate using ServiceAccount tokens. "
"Uses the in-cluster Kubernetes API server (https://kubernetes.default.svc) for OIDC.",
"properties": {
"SERVICE_NAME": {
"type": "string",
"description": "Display name for the authentication service in logs.",
"x-example": "Kubernetes",
},
"VERIFY_TLS": {
"type": "boolean",
"description": "Whether to verify TLS certificates when connecting to "
"the Kubernetes API server. Defaults to True.",
"x-example": True,
},
"CA_BUNDLE": {
"type": ["string", "null"],
"description": "Path to CA certificate bundle for TLS verification. "
"Defaults to /var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
"x-example": "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
},
"SYSTEM_ORG_NAME": {
"type": "string",
"description": "Organization name that will own robot accounts for "
"authenticated ServiceAccounts. Must be created before enabling. "
"Defaults to 'quay-system'.",
"x-example": "quay-system",
},
"SUPERUSER_SUBJECT": {
"type": ["string", "null"],
"description": "Kubernetes ServiceAccount subject that receives "
"superuser permissions. Format: system:serviceaccount:<namespace>:<name>",
"x-example": "system:serviceaccount:quay-operator:quay-operator-controller-manager",
},
},
},
"GITLAB_TRIGGER_CONFIG": {
"type": ["object", "null"],
"description": "Configuration for using Gitlab (Enterprise) for external authentication",

View File

@@ -69,7 +69,7 @@ class ConfigUserManager(UserManager):
usernames.append(username)
new_string = ",".join(usernames)
if len(new_string) <= self._max_length:
if len(new_string) <= self._super_max_length:
self._superusers_array.value = new_string.encode("utf8")
else:
raise Exception("Maximum superuser count reached. Please report this to support.")