1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/oauth/test/test_oidc.py
Dave O'Connor d3c0f10e16 test(oidc): add comprehensive PKCE test coverage (PROJQUAY-9281) (#4257)
test(oidc): add comprehensive PKCE test coverage with improved diagnostics (PROJQUAY-9281)

  Add extensive test suite for PKCE (Proof Key for Code Exchange) functionality
  across multiple layers of the application:

  Test Coverage:
  - Core PKCE utilities (code_verifier generation, S256 challenge computation)
  - OAuth base class integration with PKCE parameters
  - OIDC service with PKCE fixtures and authorization scenarios
  - Dedicated PKCE flow testing (S256/plain methods, public client support)
  - API endpoint integration for user PKCE operations
  - Login flow integration with session-based verifier storage

  Features Tested:
  - S256 and plain code challenge methods
  - Public client support (omitting client_secret)
  - Session-based code_verifier storage and retrieval
  - Error handling for missing/invalid verifiers
  - Integration with existing OIDC authorization flows
  - Descriptive assertion messages for CI diagnostics

  All tests include informative error messages with expected vs actual values
  to improve debugging in CI environments.

  🤖 Generated with [Claude Code](https://claude.com/claude-code)

  Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-02 12:07:28 -04:00

753 lines
24 KiB
Python

# pylint: disable=redefined-outer-name, unused-argument, invalid-name, missing-docstring, too-many-arguments
import json
import time
import urllib.parse
import jwt
import pytest
import requests
from authlib.jose import JsonWebKey, jwk
from cryptography.hazmat.primitives import serialization
from httmock import HTTMock, urlmatch
from six.moves.urllib.parse import quote
from oauth.base import OAuthUserIdException
from oauth.login_utils import get_sub_username_email_from_token
from oauth.oidc import OAuthLoginException, OIDCLoginService, PasswordGrantException
from util.config import URLSchemeAndHostname
@pytest.fixture(scope="module")  # Slow to generate, only do it once.
def signing_key():
    """Generate one 2048-bit RSA key per module for signing test ID tokens.

    Returns a dict holding the key id (``"id"``), the PEM-encoded private key
    bytes (``"private_key"``) and the key's ``as_dict()`` form (``"jwk"``).
    """
    # Named ``rsa_key`` so the ``jwk`` module imported from authlib is not shadowed.
    rsa_key = JsonWebKey.generate_key("RSA", 2048, is_private=True)
    private_pem = rsa_key.get_private_key().private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    return {
        "id": "somekey",
        "private_key": private_pem,
        "jwk": rsa_key.as_dict(),
    }
@pytest.fixture(scope="module")
def http_client():
    """Return a module-wide ``requests`` session backed by one pooled adapter.

    The same adapter (100 pool connections, pool size 100) is mounted for
    both the ``http://`` and ``https://`` prefixes.
    """
    session = requests.Session()
    pooled_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
    for prefix in ("http://", "https://"):
        session.mount(prefix, pooled_adapter)
    return session
@pytest.fixture(scope="module")
def valid_code():
    """Authorization code that the fake token endpoints accept."""
    accepted_code = "validcode"
    return accepted_code
@pytest.fixture(params=[True, False])
def mailing_feature(request):
    """Parametrized flag: dependent tests run once with True and once with False."""
    flag = request.param
    return flag
@pytest.fixture(params=[True, False])
def email_verified(request):
    """Parametrized flag used as the ``email_verified`` claim in the fake user info."""
    verified = request.param
    return verified
@pytest.fixture(params=[True, False])
def userinfo_supported(request):
    """Parametrized flag: whether the fake discovery document lists a userinfo endpoint."""
    supported = request.param
    return supported
@pytest.fixture(params=["someusername", "foo@bar.com", None])
def preferred_username(request):
    """Parametrized ``preferred_username`` claim: a plain name, an email-like name, or None."""
    claim = request.param
    return claim
@pytest.fixture()
def app_config(http_client, mailing_feature):
    """Build the application config dict shared by the OIDC service fixtures.

    It contains six ``*_LOGIN_CONFIG`` provider sections (plain, scope-limited,
    custom-authorize-params, PKCE/S256, PKCE public client, PKCE/plain), all
    pointing at ``http://fakeoidc``, plus the HTTP client and feature flags.
    """

    def provider(service_name, trailing):
        # Every fake provider shares the same credentials, icon and server;
        # ``trailing`` carries the per-provider keys in their original order.
        section = {
            "CLIENT_ID": "foo",
            "CLIENT_SECRET": "bar",
            "SERVICE_NAME": service_name,
            "SERVICE_ICON": "http://some/icon",
            "OIDC_SERVER": "http://fakeoidc",
        }
        section.update(trailing)
        return section

    config = {
        "PREFERRED_URL_SCHEME": "http",
        "SERVER_HOSTNAME": "localhost",
        "FEATURE_MAILING": mailing_feature,
    }
    config["SOMEOIDC_LOGIN_CONFIG"] = provider("Some Cool Service", {"DEBUGGING": True})
    config["ANOTHEROIDC_LOGIN_CONFIG"] = provider(
        "Some Other Service",
        {"LOGIN_SCOPES": ["openid"], "DEBUGGING": True},
    )
    config["OIDCWITHPARAMS_LOGIN_CONFIG"] = provider(
        "Some Other Service",
        {
            "DEBUGGING": True,
            "OIDC_ENDPOINT_CUSTOM_PARAMS": {
                "authorization_endpoint": {
                    "some": "param",
                },
            },
        },
    )
    # PKCE-enabled OIDC provider
    config["PKCEOIDC_LOGIN_CONFIG"] = provider(
        "PKCE Service",
        {"DEBUGGING": True, "USE_PKCE": True, "PKCE_METHOD": "S256"},
    )
    # PKCE with public client (no client_secret during token exchange)
    config["PUBLICPKCEOIDC_LOGIN_CONFIG"] = provider(
        "PKCE Public Service",
        {"DEBUGGING": True, "USE_PKCE": True, "PKCE_METHOD": "S256", "PUBLIC_CLIENT": True},
    )
    # PKCE with plain method
    config["PLAINPKCEOIDC_LOGIN_CONFIG"] = provider(
        "PKCE Plain Service",
        {"DEBUGGING": True, "USE_PKCE": True, "PKCE_METHOD": "plain"},
    )
    config["HTTPCLIENT"] = http_client
    config["TESTING"] = True
    return config
@pytest.fixture()
def oidc_service(app_config):
    """OIDC login service built from the ``SOMEOIDC_LOGIN_CONFIG`` section."""
    config_key = "SOMEOIDC_LOGIN_CONFIG"
    return OIDCLoginService(app_config, config_key)
@pytest.fixture()
def another_oidc_service(app_config):
    """OIDC login service built from ``ANOTHEROIDC_LOGIN_CONFIG`` (sets ``LOGIN_SCOPES``)."""
    config_key = "ANOTHEROIDC_LOGIN_CONFIG"
    return OIDCLoginService(app_config, config_key)
@pytest.fixture()
def oidc_withparams_service(app_config):
    """OIDC login service built from ``OIDCWITHPARAMS_LOGIN_CONFIG`` (custom endpoint params)."""
    config_key = "OIDCWITHPARAMS_LOGIN_CONFIG"
    return OIDCLoginService(app_config, config_key)
@pytest.fixture()
def pkce_oidc_service(app_config):
    """OIDC login service built from ``PKCEOIDC_LOGIN_CONFIG`` (``USE_PKCE`` with S256)."""
    config_key = "PKCEOIDC_LOGIN_CONFIG"
    return OIDCLoginService(app_config, config_key)
@pytest.fixture()
def public_pkce_oidc_service(app_config):
    """OIDC login service built from ``PUBLICPKCEOIDC_LOGIN_CONFIG`` (``PUBLIC_CLIENT`` set)."""
    config_key = "PUBLICPKCEOIDC_LOGIN_CONFIG"
    return OIDCLoginService(app_config, config_key)
@pytest.fixture()
def plain_pkce_oidc_service(app_config):
    """OIDC login service built from ``PLAINPKCEOIDC_LOGIN_CONFIG`` (``PKCE_METHOD`` plain)."""
    config_key = "PLAINPKCEOIDC_LOGIN_CONFIG"
    return OIDCLoginService(app_config, config_key)
@pytest.fixture()
def discovery_content(userinfo_supported):
    """Return the fake discovery document served for ``http://fakeoidc``.

    ``userinfo_endpoint`` is None when ``userinfo_supported`` is false.
    """
    if userinfo_supported:
        userinfo_url = "http://fakeoidc/userinfo"
    else:
        userinfo_url = None

    document = {
        "scopes_supported": ["openid", "profile", "somescope"],
        "authorization_endpoint": "http://fakeoidc/authorize",
        "token_endpoint": "http://fakeoidc/token",
        "userinfo_endpoint": userinfo_url,
        "jwks_uri": "http://fakeoidc/jwks",
    }
    return document
@pytest.fixture()
def userinfo_content(preferred_username, email_verified):
    """Return the user-info claims for the fake user ``cooluser``."""
    claims = {"sub": "cooluser"}
    claims["preferred_username"] = preferred_username
    claims["email"] = "foo@example.com"
    claims["email_verified"] = email_verified
    return claims
@pytest.fixture()
def id_token(oidc_service, signing_key, userinfo_content, app_config):
    """Return an RS256-signed ID token for the fake user.

    The claims are issued by the service's ``OIDC_SERVER`` for its client id,
    expire 600 seconds from now, and are overlaid with ``userinfo_content``.
    The JWT header carries the signing key's id as ``kid``.
    """
    registered_claims = {
        "iss": oidc_service.config["OIDC_SERVER"],
        "aud": oidc_service.client_id(),
        "nbf": int(time.time()),
        "iat": int(time.time()),
        "exp": int(time.time() + 600),
        "sub": "cooluser",
    }
    # User-info claims win over the registered claims on key collisions.
    claims = {**registered_claims, **userinfo_content}
    return jwt.encode(
        claims,
        signing_key["private_key"],
        "RS256",
        headers={"kid": signing_key["id"]},
    )
@pytest.fixture()
def discovery_handler(discovery_content):
    """HTTMock handler that serves ``discovery_content`` as JSON.

    It matches host ``fakeoidc`` and any path matching ``.+openid.+``.
    """

    @urlmatch(netloc=r"fakeoidc", path=r".+openid.+")
    def serve_discovery(_url, _request):
        body = json.dumps(discovery_content)
        return body

    return serve_discovery
@pytest.fixture()
def authorize_handler(discovery_content):
    """HTTMock handler for ``/authorize`` that echoes the request's scope and state.

    The JSON reply contains ``authorized: true`` plus the first ``scope`` and
    ``state`` values from the request's query string.
    """

    @urlmatch(netloc=r"fakeoidc", path=r"/authorize")
    def echo_authorization(_url, request):
        query_string = urllib.parse.urlparse(request.url).query
        query = urllib.parse.parse_qs(query_string)
        reply = {
            "authorized": True,
            "scope": query["scope"][0],
            "state": query["state"][0],
        }
        return json.dumps(reply)

    return echo_authorization
@pytest.fixture()
def token_handler_password_grant(oidc_service):
    """HTTMock handler for ``/token`` that accepts one password-grant login.

    The form body must carry the service's client id and secret,
    ``grant_type=password``, username ``someusername`` and password
    ``somepassword``; otherwise a 400/401 reply is returned.
    """

    def reject(status, reason):
        return {"status_code": status, "content": reason}

    @urlmatch(netloc=r"fakeoidc", path=r"/token")
    def grant_by_password(_url, request):
        form = urllib.parse.parse_qs(request.body)

        def field(name):
            # First value of a form field, as parsed by parse_qs.
            return form.get(name)[0]

        if field("client_id") != oidc_service.client_id():
            return reject(401, "Invalid client id")
        if field("client_secret") != oidc_service.client_secret():
            return reject(401, "Invalid client secret")
        if field("grant_type") != "password":
            return reject(400, "Invalid authorization type")
        # Username is checked first; the password is only read when it matches.
        if field("username") != "someusername" or field("password") != "somepassword":
            return reject(401, "Invalid login credentials")

        tokens = {"access_token": "sometoken"}
        return {"status_code": 200, "content": json.dumps(tokens)}

    return grant_by_password
@pytest.fixture()
def token_handler(oidc_service, id_token, valid_code):
    """HTTMock handler for ``/token`` implementing the authorization-code exchange.

    It raises ``requests.ConnectionError`` while the ``X-Quay-Retry-Attempts``
    header is below 2. After that it validates the redirect URI, client
    credentials, code and grant type before returning the access and ID tokens.
    """

    def reject(status, reason):
        return {"status_code": status, "content": reason}

    @urlmatch(netloc=r"fakeoidc", path=r"/token")
    def exchange_code(_url, request):
        attempts = int(request.headers["X-Quay-Retry-Attempts"])
        if attempts < 2:
            raise requests.ConnectionError

        form = urllib.parse.parse_qs(request.body)

        def field(name):
            # First value of a form field, as parsed by parse_qs.
            return form.get(name)[0]

        if field("redirect_uri") != "http://localhost/oauth2/someoidc/callback":
            return reject(400, "Invalid redirect URI")
        if field("client_id") != oidc_service.client_id():
            return reject(401, "Invalid client id")
        if field("client_secret") != oidc_service.client_secret():
            return reject(401, "Invalid client secret")
        if field("code") != valid_code:
            return reject(401, "Invalid code")
        if field("grant_type") != "authorization_code":
            return reject(400, "Invalid authorization type")

        tokens = {"access_token": "sometoken", "id_token": id_token}
        return {"status_code": 200, "content": json.dumps(tokens)}

    return exchange_code
@pytest.fixture()
def token_handler_pkce(id_token, valid_code):
    """HTTMock handler for ``/token`` that requires a PKCE ``code_verifier``.

    It checks the code and grant type, then rejects the request with 400 when
    ``code_verifier`` is absent or empty. Only its presence is checked, not
    its value.
    """

    def reject(status, reason):
        return {"status_code": status, "content": reason}

    @urlmatch(netloc=r"fakeoidc", path=r"/token")
    def exchange_code_with_verifier(_url, request):
        form = urllib.parse.parse_qs(request.body)

        if form.get("code")[0] != valid_code:
            return reject(401, "Invalid code")
        if form.get("grant_type")[0] != "authorization_code":
            return reject(400, "Invalid authorization type")

        # Must include code_verifier
        verifier = form.get("code_verifier")
        if not verifier:
            return reject(400, "Missing code_verifier")

        tokens = {"access_token": "sometoken", "id_token": id_token}
        return {"status_code": 200, "content": json.dumps(tokens)}

    return exchange_code_with_verifier
@pytest.fixture()
def token_handler_public_pkce(id_token, valid_code):
    """HTTMock handler for ``/token`` modelling a public PKCE client.

    Any request carrying ``client_secret`` is rejected with 400. Otherwise
    the code, grant type and presence of ``code_verifier`` are checked before
    the access and ID tokens are returned.
    """

    def reject(status, reason):
        return {"status_code": status, "content": reason}

    @urlmatch(netloc=r"fakeoidc", path=r"/token")
    def exchange_code_public(_url, request):
        form = urllib.parse.parse_qs(request.body)

        # client_secret must be omitted for public client
        if "client_secret" in form:
            return reject(400, "client_secret should not be sent")
        if form.get("code")[0] != valid_code:
            return reject(401, "Invalid code")
        if form.get("grant_type")[0] != "authorization_code":
            return reject(400, "Invalid authorization type")

        verifier = form.get("code_verifier")
        if not verifier:
            return reject(400, "Missing code_verifier")

        tokens = {"access_token": "sometoken", "id_token": id_token}
        return {"status_code": 200, "content": json.dumps(tokens)}

    return exchange_code_public
@pytest.fixture()
def jwks_handler(signing_key):
    """HTTMock handler for ``/jwks`` that publishes the signing key.

    The key set holds a copy of ``signing_key["jwk"]`` with ``kid`` set to
    the signing key's id.
    """

    @urlmatch(netloc=r"fakeoidc", path=r"/jwks")
    def serve_keys(_url, _request):
        # Merge into a new dict so the module-scoped key material is not mutated.
        published_key = {**signing_key["jwk"], "kid": signing_key["id"]}
        key_set = {"keys": [published_key]}
        return {"status_code": 200, "content": json.dumps(key_set)}

    return serve_keys
@pytest.fixture()
def emptykeys_jwks_handler():
    """HTTMock handler for ``/jwks`` that returns a key set with no keys."""

    @urlmatch(netloc=r"fakeoidc", path=r"/jwks")
    def serve_no_keys(_url, _request):
        empty_key_set = json.dumps({"keys": []})
        return {"status_code": 200, "content": empty_key_set}

    return serve_no_keys
@pytest.fixture()
def userinfo_handler(oidc_service, userinfo_content):
    """HTTMock handler for ``/userinfo`` guarded by the fake bearer token.

    It replies 401 unless the ``Authorization`` header is exactly
    ``Bearer sometoken``; otherwise it returns ``userinfo_content`` as JSON.
    """

    @urlmatch(netloc=r"fakeoidc", path=r"/userinfo")
    def serve_userinfo(_url, req):
        presented = req.headers.get("Authorization")
        if presented == "Bearer sometoken":
            return {"status_code": 200, "content": json.dumps(userinfo_content)}
        return {"status_code": 401, "content": "Missing expected header"}

    return serve_userinfo
@pytest.fixture()
def invalidsub_userinfo_handler(oidc_service):
    """HTTMock handler for ``/userinfo`` that always reports ``sub`` as ``invalidsub``."""

    @urlmatch(netloc=r"fakeoidc", path=r"/userinfo")
    def serve_wrong_sub(_url, _request):
        mismatched = json.dumps({"sub": "invalidsub"})
        return {"status_code": 200, "content": mismatched}

    return serve_wrong_sub
def test_basic_config(oidc_service):
    """The service exposes the id, display name and icon taken from its config."""
    service_id = oidc_service.service_id()
    assert service_id == "someoidc"

    display_name = oidc_service.service_name()
    assert display_name == "Some Cool Service"

    icon_url = oidc_service.get_icon()
    assert icon_url == "http://some/icon"
def test_discovery(oidc_service, http_client, discovery_content, discovery_handler):
    """Endpoints and login scopes are taken from the discovery document."""
    with HTTMock(discovery_handler):
        expected_authorize = discovery_content["authorization_endpoint"] + "?response_type=code"
        assert oidc_service.authorize_endpoint().to_url() == expected_authorize

        expected_token = discovery_content["token_endpoint"]
        assert oidc_service.token_endpoint().to_url() == expected_token

        # The userinfo endpoint is optional in the discovery document.
        expected_userinfo = discovery_content["userinfo_endpoint"]
        user_endpoint = oidc_service.user_endpoint()
        if expected_userinfo is not None:
            assert user_endpoint.to_url() == expected_userinfo
        else:
            assert user_endpoint is None

        advertised_scopes = set(discovery_content["scopes_supported"])
        assert set(oidc_service.get_login_scopes()) == advertised_scopes
def test_discovery_with_params(
    oidc_withparams_service, http_client, discovery_content, discovery_handler
):
    """The configured custom authorization parameter appears in the authorize URL."""
    with HTTMock(discovery_handler):
        authorize_url = oidc_withparams_service.authorize_endpoint().to_url()
        assert "some=param" in authorize_url
def test_filtered_discovery(
    another_oidc_service, http_client, discovery_content, discovery_handler
):
    """A service configured with ``LOGIN_SCOPES`` reports only ``openid``."""
    with HTTMock(discovery_handler):
        login_scopes = another_oidc_service.get_login_scopes()
        assert login_scopes == ["openid"]
def test_public_config(oidc_service, discovery_handler):
    """The public config flags OIDC and exposes the client id but never the secret."""
    with HTTMock(discovery_handler):
        public_config = oidc_service.get_public_config()

        assert public_config["OIDC"]
        assert public_config["CLIENT_ID"] == "foo"

        # Neither the secret's key nor its value ("bar") may leak.
        assert "CLIENT_SECRET" not in public_config
        assert "bar" not in list(public_config.values())
def test_auth_url(oidc_service, discovery_handler, http_client, authorize_handler):
    """The generated auth URL carries the quoted CSRF token as state and the joined scopes."""
    server_config = {"PREFERRED_URL_SCHEME": "https", "SERVER_HOSTNAME": "someserver"}
    csrf_token = "some csrf token"

    with HTTMock(discovery_handler, authorize_handler):
        scheme_and_host = URLSchemeAndHostname.from_app_config(server_config)
        auth_url = oidc_service.get_auth_url(scheme_and_host, "", csrf_token, ["one", "two"])

        # Hit the URL and ensure it works.
        echoed = http_client.get(auth_url).json()
        assert echoed["state"] == quote(csrf_token)
        assert echoed["scope"] == "one two"
def test_pkce_token_exchange_includes_verifier(
    pkce_oidc_service,
    discovery_handler,
    app_config,
    http_client,
    token_handler_pkce,
    userinfo_handler,
    jwks_handler,
    valid_code,
):
    """With PKCE enabled and a ``code_verifier`` supplied, the token exchange succeeds.

    NOTE(review): the body is identical to ``test_exchange_code_for_tokens_with_pkce``
    in this file; consider consolidating the two.
    """
    handlers = (jwks_handler, token_handler_pkce, userinfo_handler, discovery_handler)
    with HTTMock(*handlers):
        tokens = pkce_oidc_service.exchange_code_for_tokens(
            app_config, http_client, valid_code, "", code_verifier="test-verifier"
        )
        received_id_token, received_access_token = tokens
        assert received_access_token == "sometoken"
        assert received_id_token is not None
def test_public_client_omits_client_secret(
    public_pkce_oidc_service,
    discovery_handler,
    app_config,
    http_client,
    token_handler_public_pkce,
    userinfo_handler,
    jwks_handler,
    valid_code,
):
    """With ``PUBLIC_CLIENT`` set, the token request must not carry ``client_secret``.

    The fake token endpoint rejects any request that includes the secret, so
    a successful exchange shows it was omitted.
    """
    handlers = (jwks_handler, token_handler_public_pkce, userinfo_handler, discovery_handler)
    with HTTMock(*handlers):
        tokens = public_pkce_oidc_service.exchange_code_for_tokens(
            app_config, http_client, valid_code, "", code_verifier="test-verifier"
        )
        received_id_token, received_access_token = tokens
        assert received_access_token == "sometoken"
        assert received_id_token is not None
def test_exchange_code_invalidcode(
    oidc_service, discovery_handler, app_config, http_client, token_handler
):
    """A code the token endpoint does not accept makes the login exchange fail."""
    wrong_code = "testcode"
    with HTTMock(token_handler, discovery_handler), pytest.raises(OAuthLoginException):
        oidc_service.exchange_code_for_login(app_config, http_client, wrong_code, "")
def test_exchange_code_invalidsub(
    oidc_service,
    discovery_handler,
    app_config,
    http_client,
    token_handler,
    invalidsub_userinfo_handler,
    jwks_handler,
    valid_code,
    userinfo_supported,
):
    """Login fails when the userinfo ``sub`` differs from the ID token's ``sub``.

    The check only applies when the discovery document advertises a userinfo
    endpoint. The other parametrization is reported as skipped rather than
    silently passing.
    """
    if not userinfo_supported:
        pytest.skip("userinfo endpoint not advertised; sub mismatch cannot be exercised")

    with HTTMock(jwks_handler, token_handler, invalidsub_userinfo_handler, discovery_handler):
        # Should fail because the sub of the user info doesn't match that returned by the id_token.
        with pytest.raises(OAuthLoginException):
            oidc_service.exchange_code_for_login(app_config, http_client, valid_code, "")
def test_exchange_code_missingkey(
    oidc_service,
    discovery_handler,
    app_config,
    http_client,
    token_handler,
    userinfo_handler,
    emptykeys_jwks_handler,
    valid_code,
):
    """Login fails when the JWKS endpoint publishes no keys."""
    handlers = (emptykeys_jwks_handler, token_handler, userinfo_handler, discovery_handler)
    # Should fail because the key is missing.
    with HTTMock(*handlers), pytest.raises(OAuthLoginException):
        oidc_service.exchange_code_for_login(app_config, http_client, valid_code, "")
def test_pkce_enabled_true(pkce_oidc_service):
    """``pkce_enabled()`` is True when the config sets ``USE_PKCE`` to True."""
    enabled = pkce_oidc_service.pkce_enabled()
    assert enabled is True
def test_pkce_enabled_false_default(oidc_service):
    """``pkce_enabled()`` is False when the config has no ``USE_PKCE`` key."""
    enabled = oidc_service.pkce_enabled()
    assert enabled is False
def test_pkce_method_s256_default(pkce_oidc_service):
    """``pkce_method()`` reports ``S256`` for the S256-configured PKCE service."""
    method = pkce_oidc_service.pkce_method()
    assert method == "S256"
def test_pkce_method_plain(plain_pkce_oidc_service):
    """``pkce_method()`` reports ``plain`` when the config sets ``PKCE_METHOD`` to plain."""
    method = plain_pkce_oidc_service.pkce_method()
    assert method == "plain"
def test_public_client_true(public_pkce_oidc_service):
    """``public_client()`` is True when the config sets ``PUBLIC_CLIENT`` to True."""
    is_public = public_pkce_oidc_service.public_client()
    assert is_public is True
def test_public_client_false_default(pkce_oidc_service):
    """``public_client()`` is False when the config has no ``PUBLIC_CLIENT`` key."""
    is_public = pkce_oidc_service.public_client()
    assert is_public is False
def test_exchange_code_for_tokens_with_pkce(
    pkce_oidc_service,
    discovery_handler,
    app_config,
    http_client,
    token_handler_pkce,
    userinfo_handler,
    jwks_handler,
    valid_code,
):
    """The token exchange returns both tokens when a ``code_verifier`` is passed."""
    handlers = (jwks_handler, token_handler_pkce, userinfo_handler, discovery_handler)
    with HTTMock(*handlers):
        received_id_token, received_access_token = pkce_oidc_service.exchange_code_for_tokens(
            app_config, http_client, valid_code, "", code_verifier="test-verifier"
        )
        assert received_access_token == "sometoken"
        assert received_id_token is not None
def test_exchange_code_for_tokens_without_pkce(
    oidc_service,
    discovery_handler,
    app_config,
    http_client,
    token_handler,
    userinfo_handler,
    jwks_handler,
    valid_code,
):
    """The standard (non-PKCE) token exchange still works without a ``code_verifier``."""
    handlers = (jwks_handler, token_handler, userinfo_handler, discovery_handler)
    with HTTMock(*handlers):
        received_id_token, received_access_token = oidc_service.exchange_code_for_tokens(
            app_config, http_client, valid_code, ""
        )
        assert received_access_token == "sometoken"
        assert received_id_token is not None
def test_exchange_code_for_login_with_pkce(
    pkce_oidc_service,
    discovery_handler,
    app_config,
    http_client,
    token_handler_pkce,
    userinfo_handler,
    jwks_handler,
    valid_code,
    preferred_username,
    mailing_feature,
    email_verified,
):
    """Full login flow with a ``code_verifier`` across all claim parametrizations.

    Login is rejected when mailing is enabled but the email is unverified.
    Otherwise the id, email and username derived from the claims are checked.
    """
    # With mailing on, an unverified email address must block the login.
    must_fail = mailing_feature and not email_verified

    with HTTMock(jwks_handler, token_handler_pkce, userinfo_handler, discovery_handler):
        if must_fail:
            with pytest.raises(OAuthLoginException):
                pkce_oidc_service.exchange_code_for_login(
                    app_config, http_client, valid_code, "", code_verifier="test-verifier"
                )
            return

        lid, lusername, lemail, _additional_info = pkce_oidc_service.exchange_code_for_login(
            app_config, http_client, valid_code, "", code_verifier="test-verifier"
        )
        assert lid == "cooluser"

        if not email_verified:
            assert lemail is None
        else:
            assert lemail == "foo@example.com"

        if preferred_username is None:
            # Without a preferred username the id doubles as the username.
            assert lusername == lid
        else:
            # Email-like usernames are reduced to the part before the first "@".
            expected_username = preferred_username.partition("@")[0]
            assert lusername == expected_username
def test_exchange_code_validcode(
    oidc_service,
    discovery_handler,
    app_config,
    http_client,
    token_handler,
    userinfo_handler,
    jwks_handler,
    valid_code,
    preferred_username,
    mailing_feature,
    email_verified,
):
    """Full login flow with a valid code across all claim parametrizations.

    Login is rejected when mailing is enabled but the email is unverified.
    Otherwise the id, email and username derived from the claims are checked.
    """
    # With mailing on, an unverified email address must block the login.
    must_fail = mailing_feature and not email_verified

    with HTTMock(jwks_handler, token_handler, userinfo_handler, discovery_handler):
        if must_fail:
            with pytest.raises(OAuthLoginException):
                oidc_service.exchange_code_for_login(app_config, http_client, valid_code, "")
            return

        lid, lusername, lemail, _additional_info = oidc_service.exchange_code_for_login(
            app_config, http_client, valid_code, ""
        )
        assert lid == "cooluser"

        if not email_verified:
            assert lemail is None
        else:
            assert lemail == "foo@example.com"

        if preferred_username is None:
            # Without a preferred username the id doubles as the username.
            assert lusername == lid
        else:
            # Email-like usernames are reduced to the part before the first "@".
            expected_username = preferred_username.partition("@")[0]
            assert lusername == expected_username
def test_password_grant_for_login_exceptions(
    oidc_service,
    discovery_handler,
    token_handler,
    token_handler_password_grant,
):
    """Password-grant login raises ``PasswordGrantException`` with the expected message.

    Covers missing credentials (either or both) and a non-2XX reply from the
    token endpoint for a wrong password.

    The messages are checked through ``pytest.raises(match=...)``. Assertions
    placed after the raising call inside the ``raises`` block never execute,
    and comparing the exception object itself to a string is never equal.
    """
    missing_credentials = "Missing username or password"

    with HTTMock(discovery_handler, token_handler):
        with pytest.raises(PasswordGrantException, match=missing_credentials):
            oidc_service.password_grant_for_login("username", None)

        with pytest.raises(PasswordGrantException, match=missing_credentials):
            oidc_service.password_grant_for_login(None, "password")

        with pytest.raises(PasswordGrantException, match=missing_credentials):
            oidc_service.password_grant_for_login(None, None)

    with HTTMock(discovery_handler, token_handler_password_grant):
        with pytest.raises(
            PasswordGrantException, match="Got non-2XX response for code exchange: 401"
        ):
            oidc_service.password_grant_for_login("someusername", "wrongpassword")
def test_password_grant_for_login(
    oidc_service,
    discovery_handler,
    token_handler_password_grant,
):
    """Correct credentials yield the access token issued by the fake token endpoint."""
    with HTTMock(discovery_handler, token_handler_password_grant):
        grant = oidc_service.password_grant_for_login("someusername", "somepassword")
        access_token = grant.get("access_token")
        assert access_token == "sometoken"
def test_get_user_id_missing_sub_raises_exception(oidc_service):
    """``get_user_id`` raises ``OAuthUserIdException`` for a token without ``sub``."""
    token_without_sub = {"other_field": "value"}
    expected_message = "Token missing 'sub' field"

    with pytest.raises(OAuthUserIdException, match=expected_message):
        oidc_service.get_user_id(token_without_sub)
def test_get_sub_username_email_without_login_service():
    """Without a login service, the user id is the token's ``sub`` claim."""
    claims = dict(sub="token_sub", email="test@example.com", email_verified=True)

    # The helper returns a 4-tuple; only the user id is checked here.
    user_id, _username, _email, _additional_info = get_sub_username_email_from_token(claims)

    # Should fallback to decoded_id_token["sub"] when no login_service
    assert user_id == "token_sub"
def test_load_keys_from_url():
    """Placeholder: no assertions are implemented yet.

    TODO: add coverage for loading keys from a URL, or remove this stub.
    """