1
0
mirror of https://github.com/quay/quay.git synced 2025-10-22 11:12:26 +03:00
Files
quay/oauth/base.py
Dave O'Connor b9460aa334 feat(oidc): add PKCE (S256/plain) support with session-verifier flow (PROJQUAY-9281) (#4256)
Implement PKCE (Proof Key for Code Exchange) for OIDC authentication to enable
  support for public clients and improve OAuth security.

  Changes:
  - Add oauth/pkce.py with code_verifier generation and S256/plain challenge methods
  - Extend OAuthService to support extra auth/token params and public clients (no client_secret)
  - Implement PKCE in OIDCLoginService with code_verifier token exchange
  - Store PKCE verifier in session during auth initiation (endpoints/api/user.py)
  - Add get_pkce_code_verifier() helper with defensive type checking
    * Encapsulates pkce_enabled check and session data extraction
    * Uses isinstance(data, dict) for safe type validation
    * Centralizes logic across OAuth callbacks (callback, attach, cli)
  - Include example Keycloak PKCE config in local-dev/stack/config.yaml

  Security improvements:
  - PKCE method validation to fail fast on invalid configuration
  - Defensive session data validation in OAuth callbacks
  - Explicit Content-Type headers for form-encoded OAuth requests
  - Optimized non-verified JWT decode (skips unnecessary key fetching)
  - Exponential backoff for token exchange retries (0.5s, 1.0s, 2.0s)

  Configuration:
  - PKCE is opt-in via USE_PKCE config (default: disabled)
  - OIDC_SERVER must end with a trailing slash
  - Use host.containers.internal with podman for local dev

  Co-authored-by: Claude <noreply@anthropic.com>
2025-10-01 16:42:25 -04:00

312 lines
9.6 KiB
Python

import copy
import logging
import time
import urllib.error
import urllib.parse
import urllib.request
from abc import ABCMeta, abstractmethod
import requests
from six import add_metaclass
from six.moves.urllib.parse import quote
from util.config import URLSchemeAndHostname
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class OAuthUserIdException(Exception):
    """Signals that no user ID could be extracted from a JWT token."""
class OAuthEndpoint(object):
    """
    An OAuth endpoint URL together with the query parameters to send to it.

    `with_param` and `with_params` never modify the receiver; they return a new endpoint that
    carries a shallow copy of the parameters plus the additions.
    """

    def __init__(self, base_url, params=None):
        """
        Creates an endpoint for `base_url` with the optional mapping of query `params`.
        """
        self.base_url = base_url
        self.params = params or {}

    def with_param(self, name, value):
        """
        Returns a new endpoint with the query parameter `name` set to `value`.
        """
        params_copy = copy.copy(self.params)
        params_copy[name] = value
        return OAuthEndpoint(self.base_url, params_copy)

    def with_params(self, parameters):
        """
        Returns a new endpoint with every entry of `parameters` added to the query parameters.
        """
        params_copy = copy.copy(self.params)
        params_copy.update(parameters)
        return OAuthEndpoint(self.base_url, params_copy)

    def to_url(self):
        """
        Renders the endpoint as a URL string with the parameters encoded into the query.

        A query string already present in `base_url` is preserved rather than dropped. When
        one of its keys is also present in `params`, the value from `params` wins.
        """
        scheme, netloc, path, query, fragment = urllib.parse.urlsplit(self.base_url)

        if query:
            # Keep the pairs embedded in the base URL unless explicitly overridden.
            preserved = [
                (key, value)
                for key, value in urllib.parse.parse_qsl(query, keep_blank_values=True)
                if key not in self.params
            ]
            updated_query = urllib.parse.urlencode(preserved + list(self.params.items()))
        else:
            updated_query = urllib.parse.urlencode(self.params)

        return urllib.parse.urlunsplit((scheme, netloc, path, updated_query, fragment))
class OAuthExchangeCodeException(Exception):
    """Signals that exchanging an OAuth code failed."""
class OAuthGetUserInfoException(Exception):
    """Signals that a call to retrieve user information failed."""
class OAuthService(metaclass=ABCMeta):
    """
    A base class for defining an external service, exposed via OAuth.

    Subclasses provide the service identity and its endpoints; this class implements the shared
    authorization-URL construction, user-info lookup and code-for-token exchange.
    """

    # Retry policy for the token exchange POST when the connection fails: at most
    # _TOKEN_EXCHANGE_MAX_ATTEMPTS requests are made, and retry number n (0-based) is preceded
    # by a sleep of _TOKEN_EXCHANGE_BASE_DELAY * 2**n seconds.
    _TOKEN_EXCHANGE_MAX_ATTEMPTS = 3
    _TOKEN_EXCHANGE_BASE_DELAY = 0.5
    # Per-request timeout, in seconds, passed to the HTTP client.
    _TOKEN_EXCHANGE_TIMEOUT = 5

    def __init__(self, config, key_name):
        """
        Reads this service's settings from `config[key_name]` (empty if missing or falsy) and
        remembers whether the application is running with `TESTING` set.
        """
        self.key_name = key_name
        self.config = config.get(key_name) or {}
        self._is_testing = config.get("TESTING")

    @abstractmethod
    def service_id(self):
        """
        The internal ID for this service.

        Must match the URL portion for the service, e.g. `github`
        """
        pass

    @abstractmethod
    def service_name(self):
        """
        The user-readable name for the service, e.g. `GitHub`
        """
        pass

    @abstractmethod
    def token_endpoint(self):
        """
        Returns the endpoint at which the OAuth code can be exchanged for a token.
        """
        pass

    @abstractmethod
    def user_endpoint(self):
        """
        Returns the endpoint at which user information can be looked up.
        """
        pass

    @abstractmethod
    def authorize_endpoint(self):
        """
        Returns the endpoint for authorization of the OAuth service.
        """
        pass

    @abstractmethod
    def validate_client_id_and_secret(self, http_client, url_scheme_and_hostname):
        """
        Performs validation of the client ID and secret, raising an exception on failure.
        """
        pass

    def get_user_id(self, decoded_id_token: dict) -> str:
        """
        Returns the 'sub' field from the decoded ID token.

        Raises OAuthUserIdException if the field is missing or empty.
        """
        sub = decoded_id_token.get("sub")
        if not sub:
            raise OAuthUserIdException("Token missing 'sub' field")

        return sub

    def requires_form_encoding(self):
        """
        Returns True if form encoding is necessary for the exchange_code_for_token call.
        """
        return False

    def client_id(self):
        """
        Returns the configured `CLIENT_ID`, or None if it is not set.
        """
        return self.config.get("CLIENT_ID")

    def client_secret(self):
        """
        Returns the configured `CLIENT_SECRET`, or None if it is not set.
        """
        return self.config.get("CLIENT_SECRET")

    def login_binding_field(self):
        """
        Returns the name of the field (`username` or `email`) used for auto binding an external
        login service account to an *internal* login service account.

        For example, if the external login service is GitHub and the internal login service is
        LDAP, a value of `email` here will cause login-with-GitHub to conduct a search (via
        email) in LDAP for a user, and auto bind the external and internal users together. May
        return None, in which case no binding is performed, and login with this external account
        will simply create a new account in the database.
        """
        return self.config.get("LOGIN_BINDING_FIELD", None)

    def get_auth_url(
        self, url_scheme_and_hostname, redirect_suffix, csrf_token, scopes, extra_auth_params=None
    ):
        """
        Retrieves the authorization URL for this login service.

        `scopes` are joined with single spaces. Entries of `extra_auth_params`, when given, are
        merged over the standard parameters (and so may override them).
        """
        redirect_uri = "%s/oauth2/%s/callback%s" % (
            url_scheme_and_hostname.get_url(),
            self.service_id(),
            redirect_suffix,
        )

        params = {
            "client_id": self.client_id(),
            "redirect_uri": redirect_uri,
            "scope": " ".join(scopes),
            # NOTE(review): the token is percent-quoted here in addition to whatever encoding
            # the endpoint applies when rendering the URL; kept as-is, since consumers of
            # `state` may rely on it.
            "state": urllib.parse.quote(csrf_token),
        }

        if extra_auth_params:
            params.update(extra_auth_params)

        return self.authorize_endpoint().with_params(params).to_url()

    def get_redirect_uri(self, url_scheme_and_hostname, redirect_suffix=""):
        """
        Returns the OAuth callback URI of this service for the given scheme and hostname.
        """
        return "%s://%s/oauth2/%s/callback%s" % (
            url_scheme_and_hostname.url_scheme,
            url_scheme_and_hostname.hostname,
            self.service_id(),
            redirect_suffix,
        )

    def get_user_info(self, http_client, token):
        """
        Looks up the user information for the bearer `token` at the user endpoint.

        Raises OAuthGetUserInfoException on a non-2XX status, a body that is not valid JSON, or
        a JSON `null` body.
        """
        token_param = {
            "alt": "json",
        }

        headers = {
            "Authorization": "Bearer %s" % token,
        }

        got_user = http_client.get(
            self.user_endpoint().to_url(), params=token_param, headers=headers
        )
        if got_user.status_code // 100 != 2:
            raise OAuthGetUserInfoException(
                "Non-2XX response code for user_info call: %s" % got_user.status_code
            )

        try:
            user_info = got_user.json()
        except ValueError as err:
            # Surface an undecodable body as this module's own exception type.
            raise OAuthGetUserInfoException("Got non-JSON response for user_info call") from err

        if user_info is None:
            raise OAuthGetUserInfoException("Got empty response for user_info call")

        return user_info

    def exchange_code_for_token(
        self,
        app_config,
        http_client,
        code,
        form_encode=False,
        redirect_suffix="",
        client_auth=False,
    ):
        """
        Exchanges an OAuth access code for the associated OAuth token.

        Raises OAuthExchangeCodeException if the exchange fails or the response carries no
        `access_token`.
        """
        json_data = self.exchange_code(
            app_config, http_client, code, form_encode, redirect_suffix, client_auth
        )

        access_token = json_data.get("access_token", None)
        if access_token is None:
            logger.debug(
                "Got successful get_access_token response with missing token: %s", json_data
            )
            raise OAuthExchangeCodeException("Missing `access_token` in OAuth response")

        return access_token

    def exchange_code(
        self,
        app_config,
        http_client,
        code,
        form_encode=False,
        redirect_suffix="",
        client_auth=False,
        extra_token_params=None,
        omit_client_secret=False,
    ):
        """
        Exchanges an OAuth access code for associated OAuth token and other data.

        With `client_auth` the client credentials are passed to the HTTP client as `auth`;
        otherwise `client_id` is placed in the payload, along with `client_secret` unless
        `omit_client_secret` is set or no secret is configured. `extra_token_params` are merged
        into the payload. With `form_encode` the payload is sent as a form-encoded body,
        otherwise as query parameters.

        Returns the decoded JSON response. Raises OAuthExchangeCodeException if every attempt
        hits a connection error, on a non-2XX status, on a non-JSON or empty response, or if the
        response contains an `error` entry.
        """
        url_scheme_and_hostname = URLSchemeAndHostname.from_app_config(app_config)
        payload = {
            "code": code,
            "grant_type": "authorization_code",
            "redirect_uri": self.get_redirect_uri(url_scheme_and_hostname, redirect_suffix),
        }

        if extra_token_params:
            payload.update(extra_token_params)

        headers = {"Accept": "application/json"}

        auth = None
        if client_auth:
            auth = (self.client_id(), self.client_secret())
        else:
            payload["client_id"] = self.client_id()
            secret = self.client_secret()
            if not omit_client_secret and secret is not None:
                payload["client_secret"] = secret

        token_url = self.token_endpoint().to_url()
        get_access_token = self._post_token_request(
            http_client, token_url, payload, headers, auth, form_encode
        )
        if get_access_token is None:
            logger.debug("Received too many ConnectionErrors during code exchange")
            raise OAuthExchangeCodeException(
                "Received too many ConnectionErrors during code exchange"
            )

        if get_access_token.status_code // 100 != 2:
            logger.debug("Got get_access_token response %s", get_access_token.text)
            raise OAuthExchangeCodeException(
                "Got non-2XX response for code exchange: %s" % get_access_token.status_code
            )

        try:
            json_data = get_access_token.json()
        except ValueError as err:
            # Surface an undecodable body as this module's own exception type.
            raise OAuthExchangeCodeException("Got non-JSON response for code exchange") from err

        if not json_data:
            raise OAuthExchangeCodeException("Got non-JSON response for code exchange")

        if "error" in json_data:
            raise OAuthExchangeCodeException(json_data.get("error_description", json_data["error"]))

        return json_data

    @staticmethod
    def _retry_delay(retry_index, base_delay=_TOKEN_EXCHANGE_BASE_DELAY):
        """
        Returns the seconds to wait before retry number `retry_index` (0-based): the base delay
        doubled once per earlier retry, i.e. 0.5s, 1.0s, 2.0s, ... with the default base.
        """
        return base_delay * (2**retry_index)

    def _post_token_request(self, http_client, token_url, payload, headers, auth, form_encode):
        """
        POSTs the token request, retrying with exponential backoff on connection errors.

        Returns the HTTP response, or None if every attempt raised `requests.ConnectionError`.
        When running under TESTING, the 0-based attempt number is added to `headers` as
        `X-Quay-Retry-Attempts`.
        """
        max_attempts = self._TOKEN_EXCHANGE_MAX_ATTEMPTS

        for attempt in range(max_attempts):
            if self._is_testing:
                headers["X-Quay-Retry-Attempts"] = str(attempt)

            try:
                if form_encode:
                    form_headers = headers.copy()
                    form_headers["Content-Type"] = "application/x-www-form-urlencoded"
                    return http_client.post(
                        token_url,
                        data=payload,
                        headers=form_headers,
                        auth=auth,
                        timeout=self._TOKEN_EXCHANGE_TIMEOUT,
                    )

                return http_client.post(
                    token_url,
                    params=payload,
                    headers=headers,
                    auth=auth,
                    timeout=self._TOKEN_EXCHANGE_TIMEOUT,
                )
            except requests.ConnectionError:
                logger.debug("Got ConnectionError during OAuth token exchange, retrying.")
                # Back off only when another attempt follows; sleeping after the final
                # failure would just delay the error.
                if attempt + 1 < max_attempts:
                    time.sleep(self._retry_delay(attempt, self._TOKEN_EXCHANGE_BASE_DELAY))

        return None