1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/endpoints/api/test/test_user_pkce.py
Dave O'Connor d3c0f10e16 test(oidc): add comprehensive PKCE test coverage (PROJQUAY-9281) (#4257)
test(oidc): add comprehensive PKCE test coverage with improved diagnostics (PROJQUAY-9281)

  Add extensive test suite for PKCE (Proof Key for Code Exchange) functionality
  across multiple layers of the application:

  Test Coverage:
  - Core PKCE utilities (code_verifier generation, S256 challenge computation)
  - OAuth base class integration with PKCE parameters
  - OIDC service with PKCE fixtures and authorization scenarios
  - Dedicated PKCE flow testing (S256/plain methods, public client support)
  - API endpoint integration for user PKCE operations
  - Login flow integration with session-based verifier storage

  Features Tested:
  - S256 and plain code challenge methods
  - Public client support (omitting client_secret)
  - Session-based code_verifier storage and retrieval
  - Error handling for missing/invalid verifiers
  - Integration with existing OIDC authorization flows
  - Descriptive assertion messages for CI diagnostics

  All tests include informative error messages with expected vs actual values
  to improve debugging in CI environments.

  🤖 Generated with [Claude Code](https://claude.com/claude-code)

  Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-02 12:07:28 -04:00

352 lines
13 KiB
Python

# pylint: disable=missing-docstring
import json
import time
from unittest.mock import MagicMock, patch
import pytest
from flask import session
from endpoints.api.test.shared import conduct_api_call
from endpoints.api.user import ExternalLoginInformation
from endpoints.test.shared import client_with_identity
from test.fixtures import *
class MockPKCEOIDCService:
"""Mock OIDC service with PKCE enabled for testing"""
def __init__(self, pkce_enabled=True, pkce_method="S256"):
self.service_id_value = "testoidc"
self.pkce_enabled_value = pkce_enabled
self.pkce_method_value = pkce_method
def service_id(self):
return self.service_id_value
def pkce_enabled(self):
return self.pkce_enabled_value
def pkce_method(self):
return self.pkce_method_value
def get_login_scopes(self):
return ["openid", "profile", "email"]
def get_auth_url(
self, url_scheme_hostname, redirect_suffix, csrf_token, scopes, extra_auth_params=None
):
# Simulate auth URL generation with PKCE parameters
base_url = "https://auth.example.com/authorize"
params = {
"client_id": "test_client",
"response_type": "code",
"scope": " ".join(scopes),
"state": csrf_token,
"redirect_uri": f"{url_scheme_hostname.get_url()}/oauth2/{self.service_id_value}/callback{redirect_suffix}",
}
if extra_auth_params:
params.update(extra_auth_params)
param_string = "&".join(f"{k}={v}" for k, v in params.items())
return f"{base_url}?{param_string}"
class MockNonPKCEOIDCService(MockPKCEOIDCService):
"""Mock OIDC service without PKCE for testing"""
def __init__(self):
super().__init__(pkce_enabled=False)
def service_id(self):
return "nonpkceoidc"
class TestExternalLoginInformationPKCE:
@pytest.fixture()
def mock_pkce_service(self):
return MockPKCEOIDCService()
@pytest.fixture()
def mock_non_pkce_service(self):
return MockNonPKCEOIDCService()
def test_external_login_info_pkce_enabled(self, app, mock_pkce_service):
"""Test PKCE session storage and auth URL generation"""
with patch("app.oauth_login.get_service") as mock_get_service:
mock_get_service.return_value = mock_pkce_service
with client_with_identity("devtable", app) as cl:
# Make request to get external login info
response = conduct_api_call(
cl,
ExternalLoginInformation,
"POST",
{"service_id": "testoidc"},
{"kind": "login"},
)
# Verify response contains auth URL
assert response.status_code == 200
data = response.json
assert "auth_url" in data
auth_url = data["auth_url"]
# Verify PKCE parameters are in the auth URL
assert "code_challenge=" in auth_url
assert "code_challenge_method=" in auth_url
# Verify session contains PKCE data
with cl.session_transaction() as sess:
session_key = "_oauth_pkce_testoidc"
assert session_key in sess
pkce_data = sess[session_key]
assert "verifier" in pkce_data
assert "ts" in pkce_data
assert len(pkce_data["verifier"]) == 64 # Default verifier length
assert isinstance(pkce_data["ts"], int)
def test_external_login_info_pkce_disabled(self, app, mock_non_pkce_service):
"""Test standard behavior without PKCE"""
with patch("app.oauth_login.get_service") as mock_get_service:
mock_get_service.return_value = mock_non_pkce_service
with client_with_identity("devtable", app) as cl:
response = conduct_api_call(
cl,
ExternalLoginInformation,
"POST",
{"service_id": "nonpkceoidc"},
{"kind": "login"},
)
# Verify response contains auth URL
assert response.status_code == 200
data = response.json
assert "auth_url" in data
auth_url = data["auth_url"]
# Verify PKCE parameters are NOT in the auth URL
assert "code_challenge=" not in auth_url
assert "code_challenge_method=" not in auth_url
# Verify session does NOT contain PKCE data
with cl.session_transaction() as sess:
session_key = "_oauth_pkce_nonpkceoidc"
assert session_key not in sess
def test_external_login_info_pkce_s256_method(self, app):
"""Test S256 challenge generation"""
mock_service = MockPKCEOIDCService(pkce_method="S256")
with patch("app.oauth_login.get_service") as mock_get_service:
mock_get_service.return_value = mock_service
with client_with_identity("devtable", app) as cl:
response = conduct_api_call(
cl,
ExternalLoginInformation,
"POST",
{"service_id": "testoidc"},
{"kind": "login"},
)
assert response.status_code == 200
auth_url = response.json["auth_url"]
# Verify S256 method is used
assert "code_challenge_method=S256" in auth_url
def test_external_login_info_pkce_plain_method(self, app):
"""Test plain challenge generation"""
mock_service = MockPKCEOIDCService(pkce_method="plain")
with patch("app.oauth_login.get_service") as mock_get_service:
mock_get_service.return_value = mock_service
with client_with_identity("devtable", app) as cl:
response = conduct_api_call(
cl,
ExternalLoginInformation,
"POST",
{"service_id": "testoidc"},
{"kind": "login"},
)
assert response.status_code == 200
auth_url = response.json["auth_url"]
# Verify plain method is used
assert "code_challenge_method=plain" in auth_url
def test_external_login_info_session_key_format(self, app, mock_pkce_service):
"""Test correct session key naming"""
with patch("app.oauth_login.get_service") as mock_get_service:
mock_get_service.return_value = mock_pkce_service
with client_with_identity("devtable", app) as cl:
conduct_api_call(
cl,
ExternalLoginInformation,
"POST",
{"service_id": "testoidc"},
{"kind": "login"},
)
# Verify session key follows expected format
with cl.session_transaction() as sess:
session_key = "_oauth_pkce_testoidc"
assert session_key in sess
def test_external_login_info_session_data_structure(self, app, mock_pkce_service):
"""Test verifier and timestamp storage"""
with patch("app.oauth_login.get_service") as mock_get_service:
mock_get_service.return_value = mock_pkce_service
with client_with_identity("devtable", app) as cl:
before_time = int(time.time())
conduct_api_call(
cl,
ExternalLoginInformation,
"POST",
{"service_id": "testoidc"},
{"kind": "login"},
)
after_time = int(time.time())
with cl.session_transaction() as sess:
session_key = "_oauth_pkce_testoidc"
pkce_data = sess[session_key]
# Verify data structure
assert isinstance(pkce_data, dict)
assert "verifier" in pkce_data
assert "ts" in pkce_data
# Verify verifier is valid
verifier = pkce_data["verifier"]
assert isinstance(verifier, str)
assert 43 <= len(verifier) <= 128 # PKCE spec length
# Verify timestamp is reasonable
timestamp = pkce_data["ts"]
assert isinstance(timestamp, int)
assert before_time <= timestamp <= after_time
def test_external_login_info_attach_flow(self, app, mock_pkce_service):
"""Test PKCE works with attach flow"""
with patch("app.oauth_login.get_service") as mock_get_service:
mock_get_service.return_value = mock_pkce_service
with client_with_identity("devtable", app) as cl:
response = conduct_api_call(
cl,
ExternalLoginInformation,
"POST",
{"service_id": "testoidc"},
{"kind": "attach"},
)
assert response.status_code == 200
auth_url = response.json["auth_url"]
# Verify PKCE parameters are present for attach flow
assert "code_challenge=" in auth_url
assert "code_challenge_method=" in auth_url
# Verify session contains PKCE data for attach flow
with cl.session_transaction() as sess:
session_key = "_oauth_pkce_testoidc"
assert session_key in sess
def test_external_login_info_cli_flow(self, app, mock_pkce_service):
"""Test PKCE works with CLI flow"""
with patch("app.oauth_login.get_service") as mock_get_service:
mock_get_service.return_value = mock_pkce_service
with client_with_identity("devtable", app) as cl:
response = conduct_api_call(
cl,
ExternalLoginInformation,
"POST",
{"service_id": "testoidc"},
{"kind": "cli"},
)
assert response.status_code == 200
auth_url = response.json["auth_url"]
# Verify PKCE parameters are present for CLI flow
assert "code_challenge=" in auth_url
assert "code_challenge_method=" in auth_url
def test_external_login_info_service_without_pkce_methods(self, app):
"""Test graceful handling when service doesn't have PKCE methods"""
# Create a service without pkce_enabled or pkce_method methods
mock_service = MagicMock()
mock_service.service_id.return_value = "oldservice"
mock_service.get_login_scopes.return_value = ["openid"]
mock_service.get_auth_url.return_value = "https://auth.example.com/authorize?basic=params"
# Don't add pkce_enabled or pkce_method methods to simulate old service
del mock_service.pkce_enabled
del mock_service.pkce_method
with patch("app.oauth_login.get_service") as mock_get_service:
mock_get_service.return_value = mock_service
with client_with_identity("devtable", app) as cl:
response = conduct_api_call(
cl,
ExternalLoginInformation,
"POST",
{"service_id": "oldservice"},
{"kind": "login"},
)
# Should succeed without PKCE
assert response.status_code == 200
auth_url = response.json["auth_url"]
# Should not contain PKCE parameters
assert "code_challenge=" not in auth_url
assert "code_challenge_method=" not in auth_url
def test_external_login_info_challenge_verifier_relationship(self, app, mock_pkce_service):
"""Test that challenge is properly derived from verifier"""
with patch("app.oauth_login.get_service") as mock_get_service:
mock_get_service.return_value = mock_pkce_service
with client_with_identity("devtable", app) as cl:
response = conduct_api_call(
cl,
ExternalLoginInformation,
"POST",
{"service_id": "testoidc"},
{"kind": "login"},
)
assert response.status_code == 200
auth_url = response.json["auth_url"]
# Extract challenge from URL
import urllib.parse
parsed_url = urllib.parse.urlparse(auth_url)
query_params = urllib.parse.parse_qs(parsed_url.query)
challenge = query_params["code_challenge"][0]
method = query_params["code_challenge_method"][0]
# Get verifier from session
with cl.session_transaction() as sess:
session_key = "_oauth_pkce_testoidc"
verifier = sess[session_key]["verifier"]
# Verify challenge was derived from verifier correctly
from oauth.pkce import code_challenge
expected_challenge = code_challenge(verifier, method)
assert challenge == expected_challenge