mirror of
https://github.com/quay/quay.git
synced 2026-01-26 06:21:37 +03:00
test(oidc): add comprehensive PKCE test coverage with improved diagnostics (PROJQUAY-9281) Add extensive test suite for PKCE (Proof Key for Code Exchange) functionality across multiple layers of the application: Test Coverage: - Core PKCE utilities (code_verifier generation, S256 challenge computation) - OAuth base class integration with PKCE parameters - OIDC service with PKCE fixtures and authorization scenarios - Dedicated PKCE flow testing (S256/plain methods, public client support) - API endpoint integration for user PKCE operations - Login flow integration with session-based verifier storage Features Tested: - S256 and plain code challenge methods - Public client support (omitting client_secret) - Session-based code_verifier storage and retrieval - Error handling for missing/invalid verifiers - Integration with existing OIDC authorization flows - Descriptive assertion messages for CI diagnostics All tests include informative error messages with expected vs actual values to improve debugging in CI environments. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
753 lines
24 KiB
Python
753 lines
24 KiB
Python
# pylint: disable=redefined-outer-name, unused-argument, invalid-name, missing-docstring, too-many-arguments
|
|
|
|
import json
|
|
import time
|
|
import urllib.parse
|
|
|
|
import jwt
|
|
import pytest
|
|
import requests
|
|
from authlib.jose import JsonWebKey, jwk
|
|
from cryptography.hazmat.primitives import serialization
|
|
from httmock import HTTMock, urlmatch
|
|
from six.moves.urllib.parse import quote
|
|
|
|
from oauth.base import OAuthUserIdException
|
|
from oauth.login_utils import get_sub_username_email_from_token
|
|
from oauth.oidc import OAuthLoginException, OIDCLoginService, PasswordGrantException
|
|
from util.config import URLSchemeAndHostname
|
|
|
|
|
|
@pytest.fixture(scope="module") # Slow to generate, only do it once.
|
|
def signing_key():
|
|
jwk = JsonWebKey.generate_key("RSA", 2048, is_private=True)
|
|
return {
|
|
"id": "somekey",
|
|
"private_key": jwk.get_private_key().private_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
|
encryption_algorithm=serialization.NoEncryption(),
|
|
),
|
|
"jwk": jwk.as_dict(),
|
|
}
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def http_client():
|
|
sess = requests.Session()
|
|
adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
|
|
sess.mount("http://", adapter)
|
|
sess.mount("https://", adapter)
|
|
return sess
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def valid_code():
|
|
return "validcode"
|
|
|
|
|
|
@pytest.fixture(params=[True, False])
|
|
def mailing_feature(request):
|
|
return request.param
|
|
|
|
|
|
@pytest.fixture(params=[True, False])
|
|
def email_verified(request):
|
|
return request.param
|
|
|
|
|
|
@pytest.fixture(params=[True, False])
|
|
def userinfo_supported(request):
|
|
return request.param
|
|
|
|
|
|
@pytest.fixture(params=["someusername", "foo@bar.com", None])
|
|
def preferred_username(request):
|
|
return request.param
|
|
|
|
|
|
@pytest.fixture()
|
|
def app_config(http_client, mailing_feature):
|
|
return {
|
|
"PREFERRED_URL_SCHEME": "http",
|
|
"SERVER_HOSTNAME": "localhost",
|
|
"FEATURE_MAILING": mailing_feature,
|
|
"SOMEOIDC_LOGIN_CONFIG": {
|
|
"CLIENT_ID": "foo",
|
|
"CLIENT_SECRET": "bar",
|
|
"SERVICE_NAME": "Some Cool Service",
|
|
"SERVICE_ICON": "http://some/icon",
|
|
"OIDC_SERVER": "http://fakeoidc",
|
|
"DEBUGGING": True,
|
|
},
|
|
"ANOTHEROIDC_LOGIN_CONFIG": {
|
|
"CLIENT_ID": "foo",
|
|
"CLIENT_SECRET": "bar",
|
|
"SERVICE_NAME": "Some Other Service",
|
|
"SERVICE_ICON": "http://some/icon",
|
|
"OIDC_SERVER": "http://fakeoidc",
|
|
"LOGIN_SCOPES": ["openid"],
|
|
"DEBUGGING": True,
|
|
},
|
|
"OIDCWITHPARAMS_LOGIN_CONFIG": {
|
|
"CLIENT_ID": "foo",
|
|
"CLIENT_SECRET": "bar",
|
|
"SERVICE_NAME": "Some Other Service",
|
|
"SERVICE_ICON": "http://some/icon",
|
|
"OIDC_SERVER": "http://fakeoidc",
|
|
"DEBUGGING": True,
|
|
"OIDC_ENDPOINT_CUSTOM_PARAMS": {
|
|
"authorization_endpoint": {
|
|
"some": "param",
|
|
},
|
|
},
|
|
},
|
|
# PKCE-enabled OIDC provider
|
|
"PKCEOIDC_LOGIN_CONFIG": {
|
|
"CLIENT_ID": "foo",
|
|
"CLIENT_SECRET": "bar",
|
|
"SERVICE_NAME": "PKCE Service",
|
|
"SERVICE_ICON": "http://some/icon",
|
|
"OIDC_SERVER": "http://fakeoidc",
|
|
"DEBUGGING": True,
|
|
"USE_PKCE": True,
|
|
"PKCE_METHOD": "S256",
|
|
},
|
|
# PKCE with public client (no client_secret during token exchange)
|
|
"PUBLICPKCEOIDC_LOGIN_CONFIG": {
|
|
"CLIENT_ID": "foo",
|
|
"CLIENT_SECRET": "bar",
|
|
"SERVICE_NAME": "PKCE Public Service",
|
|
"SERVICE_ICON": "http://some/icon",
|
|
"OIDC_SERVER": "http://fakeoidc",
|
|
"DEBUGGING": True,
|
|
"USE_PKCE": True,
|
|
"PKCE_METHOD": "S256",
|
|
"PUBLIC_CLIENT": True,
|
|
},
|
|
# PKCE with plain method
|
|
"PLAINPKCEOIDC_LOGIN_CONFIG": {
|
|
"CLIENT_ID": "foo",
|
|
"CLIENT_SECRET": "bar",
|
|
"SERVICE_NAME": "PKCE Plain Service",
|
|
"SERVICE_ICON": "http://some/icon",
|
|
"OIDC_SERVER": "http://fakeoidc",
|
|
"DEBUGGING": True,
|
|
"USE_PKCE": True,
|
|
"PKCE_METHOD": "plain",
|
|
},
|
|
"HTTPCLIENT": http_client,
|
|
"TESTING": True,
|
|
}
|
|
|
|
|
|
@pytest.fixture()
|
|
def oidc_service(app_config):
|
|
return OIDCLoginService(app_config, "SOMEOIDC_LOGIN_CONFIG")
|
|
|
|
|
|
@pytest.fixture()
|
|
def another_oidc_service(app_config):
|
|
return OIDCLoginService(app_config, "ANOTHEROIDC_LOGIN_CONFIG")
|
|
|
|
|
|
@pytest.fixture()
|
|
def oidc_withparams_service(app_config):
|
|
return OIDCLoginService(app_config, "OIDCWITHPARAMS_LOGIN_CONFIG")
|
|
|
|
|
|
@pytest.fixture()
|
|
def pkce_oidc_service(app_config):
|
|
return OIDCLoginService(app_config, "PKCEOIDC_LOGIN_CONFIG")
|
|
|
|
|
|
@pytest.fixture()
|
|
def public_pkce_oidc_service(app_config):
|
|
return OIDCLoginService(app_config, "PUBLICPKCEOIDC_LOGIN_CONFIG")
|
|
|
|
|
|
@pytest.fixture()
|
|
def plain_pkce_oidc_service(app_config):
|
|
return OIDCLoginService(app_config, "PLAINPKCEOIDC_LOGIN_CONFIG")
|
|
|
|
|
|
@pytest.fixture()
|
|
def discovery_content(userinfo_supported):
|
|
return {
|
|
"scopes_supported": ["openid", "profile", "somescope"],
|
|
"authorization_endpoint": "http://fakeoidc/authorize",
|
|
"token_endpoint": "http://fakeoidc/token",
|
|
"userinfo_endpoint": "http://fakeoidc/userinfo" if userinfo_supported else None,
|
|
"jwks_uri": "http://fakeoidc/jwks",
|
|
}
|
|
|
|
|
|
@pytest.fixture()
|
|
def userinfo_content(preferred_username, email_verified):
|
|
return {
|
|
"sub": "cooluser",
|
|
"preferred_username": preferred_username,
|
|
"email": "foo@example.com",
|
|
"email_verified": email_verified,
|
|
}
|
|
|
|
|
|
@pytest.fixture()
|
|
def id_token(oidc_service, signing_key, userinfo_content, app_config):
|
|
token_data = {
|
|
"iss": oidc_service.config["OIDC_SERVER"],
|
|
"aud": oidc_service.client_id(),
|
|
"nbf": int(time.time()),
|
|
"iat": int(time.time()),
|
|
"exp": int(time.time() + 600),
|
|
"sub": "cooluser",
|
|
}
|
|
|
|
token_data.update(userinfo_content)
|
|
|
|
token_headers = {
|
|
"kid": signing_key["id"],
|
|
}
|
|
|
|
return jwt.encode(token_data, signing_key["private_key"], "RS256", headers=token_headers)
|
|
|
|
|
|
@pytest.fixture()
|
|
def discovery_handler(discovery_content):
|
|
@urlmatch(netloc=r"fakeoidc", path=r".+openid.+")
|
|
def handler(_, __):
|
|
return json.dumps(discovery_content)
|
|
|
|
return handler
|
|
|
|
|
|
@pytest.fixture()
|
|
def authorize_handler(discovery_content):
|
|
@urlmatch(netloc=r"fakeoidc", path=r"/authorize")
|
|
def handler(_, request):
|
|
parsed = urllib.parse.urlparse(request.url)
|
|
params = urllib.parse.parse_qs(parsed.query)
|
|
return json.dumps(
|
|
{"authorized": True, "scope": params["scope"][0], "state": params["state"][0]}
|
|
)
|
|
|
|
return handler
|
|
|
|
|
|
@pytest.fixture()
|
|
def token_handler_password_grant(oidc_service):
|
|
@urlmatch(netloc=r"fakeoidc", path=r"/token")
|
|
def handler(_, request):
|
|
|
|
params = urllib.parse.parse_qs(request.body)
|
|
if params.get("client_id")[0] != oidc_service.client_id():
|
|
return {"status_code": 401, "content": "Invalid client id"}
|
|
|
|
if params.get("client_secret")[0] != oidc_service.client_secret():
|
|
return {"status_code": 401, "content": "Invalid client secret"}
|
|
|
|
if params.get("grant_type")[0] != "password":
|
|
return {"status_code": 400, "content": "Invalid authorization type"}
|
|
|
|
if params.get("username")[0] != "someusername":
|
|
return {"status_code": 401, "content": "Invalid login credentials"}
|
|
|
|
if params.get("password")[0] != "somepassword":
|
|
return {"status_code": 401, "content": "Invalid login credentials"}
|
|
|
|
content = {
|
|
"access_token": "sometoken",
|
|
}
|
|
return {"status_code": 200, "content": json.dumps(content)}
|
|
|
|
return handler
|
|
|
|
|
|
@pytest.fixture()
|
|
def token_handler(oidc_service, id_token, valid_code):
|
|
@urlmatch(netloc=r"fakeoidc", path=r"/token")
|
|
def handler(_, request):
|
|
if int(request.headers["X-Quay-Retry-Attempts"]) < 2:
|
|
raise requests.ConnectionError
|
|
|
|
params = urllib.parse.parse_qs(request.body)
|
|
if params.get("redirect_uri")[0] != "http://localhost/oauth2/someoidc/callback":
|
|
return {"status_code": 400, "content": "Invalid redirect URI"}
|
|
|
|
if params.get("client_id")[0] != oidc_service.client_id():
|
|
return {"status_code": 401, "content": "Invalid client id"}
|
|
|
|
if params.get("client_secret")[0] != oidc_service.client_secret():
|
|
return {"status_code": 401, "content": "Invalid client secret"}
|
|
|
|
if params.get("code")[0] != valid_code:
|
|
return {"status_code": 401, "content": "Invalid code"}
|
|
|
|
if params.get("grant_type")[0] != "authorization_code":
|
|
return {"status_code": 400, "content": "Invalid authorization type"}
|
|
|
|
content = {
|
|
"access_token": "sometoken",
|
|
"id_token": id_token,
|
|
}
|
|
return {"status_code": 200, "content": json.dumps(content)}
|
|
|
|
return handler
|
|
|
|
|
|
@pytest.fixture()
|
|
def token_handler_pkce(id_token, valid_code):
|
|
@urlmatch(netloc=r"fakeoidc", path=r"/token")
|
|
def handler(_, request):
|
|
# Validate PKCE code_verifier is present
|
|
params = urllib.parse.parse_qs(request.body)
|
|
if params.get("code")[0] != valid_code:
|
|
return {"status_code": 401, "content": "Invalid code"}
|
|
|
|
if params.get("grant_type")[0] != "authorization_code":
|
|
return {"status_code": 400, "content": "Invalid authorization type"}
|
|
|
|
# Must include code_verifier
|
|
if not params.get("code_verifier"):
|
|
return {"status_code": 400, "content": "Missing code_verifier"}
|
|
|
|
content = {
|
|
"access_token": "sometoken",
|
|
"id_token": id_token,
|
|
}
|
|
return {"status_code": 200, "content": json.dumps(content)}
|
|
|
|
return handler
|
|
|
|
|
|
@pytest.fixture()
|
|
def token_handler_public_pkce(id_token, valid_code):
|
|
@urlmatch(netloc=r"fakeoidc", path=r"/token")
|
|
def handler(_, request):
|
|
params = urllib.parse.parse_qs(request.body)
|
|
# client_secret must be omitted for public client
|
|
if "client_secret" in params:
|
|
return {"status_code": 400, "content": "client_secret should not be sent"}
|
|
|
|
if params.get("code")[0] != valid_code:
|
|
return {"status_code": 401, "content": "Invalid code"}
|
|
|
|
if params.get("grant_type")[0] != "authorization_code":
|
|
return {"status_code": 400, "content": "Invalid authorization type"}
|
|
|
|
if not params.get("code_verifier"):
|
|
return {"status_code": 400, "content": "Missing code_verifier"}
|
|
|
|
content = {
|
|
"access_token": "sometoken",
|
|
"id_token": id_token,
|
|
}
|
|
return {"status_code": 200, "content": json.dumps(content)}
|
|
|
|
return handler
|
|
|
|
|
|
@pytest.fixture()
|
|
def jwks_handler(signing_key):
|
|
def jwk_with_kid(kid, jwk):
|
|
jwk = jwk.copy()
|
|
jwk.update({"kid": kid})
|
|
return jwk
|
|
|
|
@urlmatch(netloc=r"fakeoidc", path=r"/jwks")
|
|
def handler(_, __):
|
|
content = {"keys": [jwk_with_kid(signing_key["id"], signing_key["jwk"])]}
|
|
return {"status_code": 200, "content": json.dumps(content)}
|
|
|
|
return handler
|
|
|
|
|
|
@pytest.fixture()
|
|
def emptykeys_jwks_handler():
|
|
@urlmatch(netloc=r"fakeoidc", path=r"/jwks")
|
|
def handler(_, __):
|
|
content = {"keys": []}
|
|
return {"status_code": 200, "content": json.dumps(content)}
|
|
|
|
return handler
|
|
|
|
|
|
@pytest.fixture
|
|
def userinfo_handler(oidc_service, userinfo_content):
|
|
@urlmatch(netloc=r"fakeoidc", path=r"/userinfo")
|
|
def handler(_, req):
|
|
if req.headers.get("Authorization") != "Bearer sometoken":
|
|
return {"status_code": 401, "content": "Missing expected header"}
|
|
|
|
return {"status_code": 200, "content": json.dumps(userinfo_content)}
|
|
|
|
return handler
|
|
|
|
|
|
@pytest.fixture()
|
|
def invalidsub_userinfo_handler(oidc_service):
|
|
@urlmatch(netloc=r"fakeoidc", path=r"/userinfo")
|
|
def handler(_, __):
|
|
content = {
|
|
"sub": "invalidsub",
|
|
}
|
|
|
|
return {"status_code": 200, "content": json.dumps(content)}
|
|
|
|
return handler
|
|
|
|
|
|
def test_basic_config(oidc_service):
|
|
assert oidc_service.service_id() == "someoidc"
|
|
assert oidc_service.service_name() == "Some Cool Service"
|
|
assert oidc_service.get_icon() == "http://some/icon"
|
|
|
|
|
|
def test_discovery(oidc_service, http_client, discovery_content, discovery_handler):
|
|
with HTTMock(discovery_handler):
|
|
auth = discovery_content["authorization_endpoint"] + "?response_type=code"
|
|
assert oidc_service.authorize_endpoint().to_url() == auth
|
|
assert oidc_service.token_endpoint().to_url() == discovery_content["token_endpoint"]
|
|
|
|
if discovery_content["userinfo_endpoint"] is None:
|
|
assert oidc_service.user_endpoint() is None
|
|
else:
|
|
assert oidc_service.user_endpoint().to_url() == discovery_content["userinfo_endpoint"]
|
|
|
|
assert set(oidc_service.get_login_scopes()) == set(discovery_content["scopes_supported"])
|
|
|
|
|
|
def test_discovery_with_params(
|
|
oidc_withparams_service, http_client, discovery_content, discovery_handler
|
|
):
|
|
with HTTMock(discovery_handler):
|
|
assert "some=param" in oidc_withparams_service.authorize_endpoint().to_url()
|
|
|
|
|
|
def test_filtered_discovery(
|
|
another_oidc_service, http_client, discovery_content, discovery_handler
|
|
):
|
|
with HTTMock(discovery_handler):
|
|
assert another_oidc_service.get_login_scopes() == ["openid"]
|
|
|
|
|
|
def test_public_config(oidc_service, discovery_handler):
|
|
with HTTMock(discovery_handler):
|
|
assert oidc_service.get_public_config()["OIDC"]
|
|
assert oidc_service.get_public_config()["CLIENT_ID"] == "foo"
|
|
|
|
assert "CLIENT_SECRET" not in oidc_service.get_public_config()
|
|
assert "bar" not in list(oidc_service.get_public_config().values())
|
|
|
|
|
|
def test_auth_url(oidc_service, discovery_handler, http_client, authorize_handler):
|
|
config = {"PREFERRED_URL_SCHEME": "https", "SERVER_HOSTNAME": "someserver"}
|
|
|
|
with HTTMock(discovery_handler, authorize_handler):
|
|
url_scheme_and_hostname = URLSchemeAndHostname.from_app_config(config)
|
|
auth_url = oidc_service.get_auth_url(
|
|
url_scheme_and_hostname, "", "some csrf token", ["one", "two"]
|
|
)
|
|
|
|
# Hit the URL and ensure it works.
|
|
result = http_client.get(auth_url).json()
|
|
assert result["state"] == quote("some csrf token")
|
|
assert result["scope"] == "one two"
|
|
|
|
|
|
def test_pkce_token_exchange_includes_verifier(
|
|
pkce_oidc_service,
|
|
discovery_handler,
|
|
app_config,
|
|
http_client,
|
|
token_handler_pkce,
|
|
userinfo_handler,
|
|
jwks_handler,
|
|
valid_code,
|
|
):
|
|
# Ensure that when PKCE is enabled and code_verifier is provided, token exchange succeeds
|
|
with HTTMock(jwks_handler, token_handler_pkce, userinfo_handler, discovery_handler):
|
|
id_token, access_token = pkce_oidc_service.exchange_code_for_tokens(
|
|
app_config, http_client, valid_code, "", code_verifier="test-verifier"
|
|
)
|
|
assert access_token == "sometoken"
|
|
assert id_token is not None
|
|
|
|
|
|
def test_public_client_omits_client_secret(
|
|
public_pkce_oidc_service,
|
|
discovery_handler,
|
|
app_config,
|
|
http_client,
|
|
token_handler_public_pkce,
|
|
userinfo_handler,
|
|
jwks_handler,
|
|
valid_code,
|
|
):
|
|
# PUBLIC_CLIENT True should omit client_secret in token request
|
|
with HTTMock(jwks_handler, token_handler_public_pkce, userinfo_handler, discovery_handler):
|
|
id_token, access_token = public_pkce_oidc_service.exchange_code_for_tokens(
|
|
app_config, http_client, valid_code, "", code_verifier="test-verifier"
|
|
)
|
|
assert access_token == "sometoken"
|
|
assert id_token is not None
|
|
|
|
|
|
def test_exchange_code_invalidcode(
|
|
oidc_service, discovery_handler, app_config, http_client, token_handler
|
|
):
|
|
with HTTMock(token_handler, discovery_handler):
|
|
with pytest.raises(OAuthLoginException):
|
|
oidc_service.exchange_code_for_login(app_config, http_client, "testcode", "")
|
|
|
|
|
|
def test_exchange_code_invalidsub(
|
|
oidc_service,
|
|
discovery_handler,
|
|
app_config,
|
|
http_client,
|
|
token_handler,
|
|
invalidsub_userinfo_handler,
|
|
jwks_handler,
|
|
valid_code,
|
|
userinfo_supported,
|
|
):
|
|
# Skip when userinfo is not supported.
|
|
if not userinfo_supported:
|
|
return
|
|
|
|
with HTTMock(jwks_handler, token_handler, invalidsub_userinfo_handler, discovery_handler):
|
|
# Should fail because the sub of the user info doesn't match that returned by the id_token.
|
|
with pytest.raises(OAuthLoginException):
|
|
oidc_service.exchange_code_for_login(app_config, http_client, valid_code, "")
|
|
|
|
|
|
def test_exchange_code_missingkey(
|
|
oidc_service,
|
|
discovery_handler,
|
|
app_config,
|
|
http_client,
|
|
token_handler,
|
|
userinfo_handler,
|
|
emptykeys_jwks_handler,
|
|
valid_code,
|
|
):
|
|
with HTTMock(emptykeys_jwks_handler, token_handler, userinfo_handler, discovery_handler):
|
|
# Should fail because the key is missing.
|
|
with pytest.raises(OAuthLoginException):
|
|
oidc_service.exchange_code_for_login(app_config, http_client, valid_code, "")
|
|
|
|
|
|
def test_pkce_enabled_true(pkce_oidc_service):
|
|
# Verify pkce_enabled() returns True when USE_PKCE: true
|
|
assert pkce_oidc_service.pkce_enabled() is True
|
|
|
|
|
|
def test_pkce_enabled_false_default(oidc_service):
|
|
# Verify False default when USE_PKCE not set
|
|
assert oidc_service.pkce_enabled() is False
|
|
|
|
|
|
def test_pkce_method_s256_default(pkce_oidc_service):
|
|
# Verify S256 default method
|
|
assert pkce_oidc_service.pkce_method() == "S256"
|
|
|
|
|
|
def test_pkce_method_plain(plain_pkce_oidc_service):
|
|
# Test plain method configuration
|
|
assert plain_pkce_oidc_service.pkce_method() == "plain"
|
|
|
|
|
|
def test_public_client_true(public_pkce_oidc_service):
|
|
# Verify public client flag
|
|
assert public_pkce_oidc_service.public_client() is True
|
|
|
|
|
|
def test_public_client_false_default(pkce_oidc_service):
|
|
# Verify False default
|
|
assert pkce_oidc_service.public_client() is False
|
|
|
|
|
|
def test_exchange_code_for_tokens_with_pkce(
|
|
pkce_oidc_service,
|
|
discovery_handler,
|
|
app_config,
|
|
http_client,
|
|
token_handler_pkce,
|
|
userinfo_handler,
|
|
jwks_handler,
|
|
valid_code,
|
|
):
|
|
# Token exchange with code_verifier
|
|
with HTTMock(jwks_handler, token_handler_pkce, userinfo_handler, discovery_handler):
|
|
id_token, access_token = pkce_oidc_service.exchange_code_for_tokens(
|
|
app_config, http_client, valid_code, "", code_verifier="test-verifier"
|
|
)
|
|
assert access_token == "sometoken"
|
|
assert id_token is not None
|
|
|
|
|
|
def test_exchange_code_for_tokens_without_pkce(
|
|
oidc_service,
|
|
discovery_handler,
|
|
app_config,
|
|
http_client,
|
|
token_handler,
|
|
userinfo_handler,
|
|
jwks_handler,
|
|
valid_code,
|
|
):
|
|
# Standard flow remains unaffected
|
|
with HTTMock(jwks_handler, token_handler, userinfo_handler, discovery_handler):
|
|
id_token, access_token = oidc_service.exchange_code_for_tokens(
|
|
app_config, http_client, valid_code, ""
|
|
)
|
|
assert access_token == "sometoken"
|
|
assert id_token is not None
|
|
|
|
|
|
def test_exchange_code_for_login_with_pkce(
|
|
pkce_oidc_service,
|
|
discovery_handler,
|
|
app_config,
|
|
http_client,
|
|
token_handler_pkce,
|
|
userinfo_handler,
|
|
jwks_handler,
|
|
valid_code,
|
|
preferred_username,
|
|
mailing_feature,
|
|
email_verified,
|
|
):
|
|
# Login flow with code_verifier
|
|
with HTTMock(jwks_handler, token_handler_pkce, userinfo_handler, discovery_handler):
|
|
if mailing_feature and not email_verified:
|
|
# Should fail because there isn't a verified email address.
|
|
with pytest.raises(OAuthLoginException):
|
|
pkce_oidc_service.exchange_code_for_login(
|
|
app_config, http_client, valid_code, "", code_verifier="test-verifier"
|
|
)
|
|
else:
|
|
# Should succeed.
|
|
lid, lusername, lemail, additional_info = pkce_oidc_service.exchange_code_for_login(
|
|
app_config, http_client, valid_code, "", code_verifier="test-verifier"
|
|
)
|
|
|
|
assert lid == "cooluser"
|
|
|
|
if email_verified:
|
|
assert lemail == "foo@example.com"
|
|
else:
|
|
assert lemail is None
|
|
|
|
if preferred_username is not None:
|
|
if preferred_username.find("@") >= 0:
|
|
preferred_username = preferred_username[0 : preferred_username.find("@")]
|
|
|
|
assert lusername == preferred_username
|
|
else:
|
|
assert lusername == lid
|
|
|
|
|
|
def test_exchange_code_validcode(
|
|
oidc_service,
|
|
discovery_handler,
|
|
app_config,
|
|
http_client,
|
|
token_handler,
|
|
userinfo_handler,
|
|
jwks_handler,
|
|
valid_code,
|
|
preferred_username,
|
|
mailing_feature,
|
|
email_verified,
|
|
):
|
|
with HTTMock(jwks_handler, token_handler, userinfo_handler, discovery_handler):
|
|
if mailing_feature and not email_verified:
|
|
# Should fail because there isn't a verified email address.
|
|
with pytest.raises(OAuthLoginException):
|
|
oidc_service.exchange_code_for_login(app_config, http_client, valid_code, "")
|
|
else:
|
|
# Should succeed.
|
|
lid, lusername, lemail, additional_info = oidc_service.exchange_code_for_login(
|
|
app_config, http_client, valid_code, ""
|
|
)
|
|
|
|
assert lid == "cooluser"
|
|
|
|
if email_verified:
|
|
assert lemail == "foo@example.com"
|
|
else:
|
|
assert lemail is None
|
|
|
|
if preferred_username is not None:
|
|
if preferred_username.find("@") >= 0:
|
|
preferred_username = preferred_username[0 : preferred_username.find("@")]
|
|
|
|
assert lusername == preferred_username
|
|
else:
|
|
assert lusername == lid
|
|
|
|
|
|
def test_password_grant_for_login_exceptions(
|
|
oidc_service,
|
|
discovery_handler,
|
|
token_handler,
|
|
token_handler_password_grant,
|
|
):
|
|
with HTTMock(discovery_handler, token_handler):
|
|
with pytest.raises(PasswordGrantException) as err:
|
|
oidc_service.password_grant_for_login("username", None)
|
|
assert err.value == "Missing username or password"
|
|
|
|
with pytest.raises(PasswordGrantException) as err:
|
|
oidc_service.password_grant_for_login(None, "password")
|
|
assert err.value == "Missing username or password"
|
|
|
|
with pytest.raises(PasswordGrantException) as err:
|
|
oidc_service.password_grant_for_login(None, None)
|
|
assert err.value == "Missing username or password"
|
|
|
|
with HTTMock(discovery_handler, token_handler_password_grant):
|
|
with pytest.raises(PasswordGrantException) as err:
|
|
oidc_service.password_grant_for_login("someusername", "wrongpassword")
|
|
assert err.value == "Got non-2XX response for code exchange: 401"
|
|
|
|
|
|
def test_password_grant_for_login(
|
|
oidc_service,
|
|
discovery_handler,
|
|
token_handler_password_grant,
|
|
):
|
|
with HTTMock(discovery_handler, token_handler_password_grant):
|
|
response = oidc_service.password_grant_for_login("someusername", "somepassword")
|
|
assert response.get("access_token") == "sometoken"
|
|
|
|
|
|
def test_get_user_id_missing_sub_raises_exception(oidc_service):
|
|
"""Test that missing sub field raises OAuthUserIdException."""
|
|
decoded_token = {
|
|
"other_field": "value",
|
|
}
|
|
|
|
with pytest.raises(OAuthUserIdException, match="Token missing 'sub' field"):
|
|
oidc_service.get_user_id(decoded_token)
|
|
|
|
|
|
def test_get_sub_username_email_without_login_service():
|
|
"""Test get_sub_username_email_from_token fallback when no login_service."""
|
|
decoded_token = {
|
|
"sub": "token_sub",
|
|
"email": "test@example.com",
|
|
"email_verified": True,
|
|
}
|
|
|
|
user_id, username, email, additional_info = get_sub_username_email_from_token(decoded_token)
|
|
|
|
# Should fallback to decoded_id_token["sub"] when no login_service
|
|
assert user_id == "token_sub"
|
|
|
|
|
|
def test_load_keys_from_url():
|
|
pass
|