mirror of
https://github.com/quay/quay.git
synced 2026-01-26 06:21:37 +03:00
* fix(oauth): prevent redirect URI validation bypass (PROJQUAY-9849) Co-authored-by: Claude <noreply@anthropic.com> * test(oauth): add comprehensive coverage for redirect URI validation (PROJQUAY-9849) Co-authored-by: Claude <noreply@anthropic.com> * fix(oauth): add percent-encoding protection and improve test coverage (PROJQUAY-9849) Co-authored-by: Claude <noreply@anthropic.com> --------- Co-authored-by: Claude <noreply@anthropic.com>
312 lines
13 KiB
Python
312 lines
13 KiB
Python
from unittest.mock import patch
|
|
|
|
from auth.scopes import READ_REPO
|
|
from data import model
|
|
from data.model.oauth import DatabaseAuthorizationProvider
|
|
from test.fixtures import *
|
|
|
|
REDIRECT_URI = "http://foo/bar/baz"
|
|
|
|
|
|
class MockDatabaseAuthorizationProvider(DatabaseAuthorizationProvider):
    """Authorization provider stub whose authorized user is always ``devtable``."""

    def get_authorized_user(self):
        """Return the ``devtable`` user looked up through ``model.user``."""
        devtable = model.user.get_user("devtable")
        return devtable
|
|
|
|
|
|
def setup(num_assignments=0):
    """Create the OAuth application and token assignment shared by these tests.

    The application is named ``"test"``, belongs to the ``buynlarge``
    organization and is registered with ``REDIRECT_URI``. One token assignment
    for the ``devtable`` user is created against it.

    ``num_assignments`` is accepted but never read by this helper; exactly one
    assignment is created per call.

    Returns:
        A ``(application, token_assignment, user, org)`` tuple.
    """
    devtable = model.user.get_user("devtable")
    buynlarge = model.organization.get_organization("buynlarge")
    oauth_app = model.oauth.create_application(buynlarge, "test", "http://foo/bar", REDIRECT_URI)
    assignment = model.oauth.assign_token_to_user(
        oauth_app, devtable, REDIRECT_URI, READ_REPO.scope, "token"
    )
    return oauth_app, assignment, devtable, buynlarge
|
|
|
|
|
|
def test_get_token_response_with_assignment_id(initialized_db):
    """A token request that carries an assignment id redirects without error.

    After the response is produced the assignment can no longer be looked up.
    """
    application, token_assignment, user, org = setup()
    positional_args = (
        "token",
        application.client_id,
        REDIRECT_URI,
        token_assignment.uuid,
    )

    with patch("data.model.oauth.url_for", return_value="http://foo/bar/baz"):
        provider = MockDatabaseAuthorizationProvider()
        response = provider.get_token_response(*positional_args, scope=READ_REPO.scope)

        # The provider answers with a redirect whose target carries no error marker.
        assert response.status_code == 302
        assert "error" not in response.headers["Location"]

        # The assignment is expected to be gone once the token response exists.
        leftover = model.oauth.get_token_assignment(token_assignment.uuid, user, org)
        assert leftover is None
|
|
|
|
|
|
def test_delete_application(initialized_db):
    """Deleting an application removes both the application and its assignment."""
    application, token_assignment, user, org = setup()
    client_id = application.client_id

    model.oauth.delete_application(org, client_id)

    orphaned_assignment = model.oauth.get_token_assignment(token_assignment.uuid, user, org)
    assert orphaned_assignment is None
    assert model.oauth.lookup_application(org, client_id) is None
|
|
|
|
|
|
def test_get_assigned_authorization_for_user(initialized_db):
    """An assignment can be fetched by its uuid for the user it was assigned to."""
    _, token_assignment, user, _ = setup()

    found = model.oauth.get_assigned_authorization_for_user(user, token_assignment.uuid)

    assert found is not None
    assert found.uuid == token_assignment.uuid
|
|
|
|
|
|
def test_list_assigned_authorizations_for_user(initialized_db):
    """Listing a user's assignments returns the ones created here, in creation order."""
    application, first_assignment, user, _ = setup()
    second_assignment = model.oauth.assign_token_to_user(
        application, user, REDIRECT_URI, READ_REPO.scope, "token"
    )

    listed = model.oauth.list_assigned_authorizations_for_user(user)

    # NOTE(review): only two assignments are created in this test; the expected
    # total of 3 presumably counts one already present in the fixture data at
    # index 0 -- confirm against test.fixtures.
    assert len(listed) == 3
    assert listed[1].uuid == first_assignment.uuid
    assert listed[2].uuid == second_assignment.uuid
|
|
|
|
|
|
def test_get_oauth_application_for_client_id(initialized_db):
    """Looking an application up by client id returns the application created in setup."""
    application, _, _, _ = setup()

    fetched = model.oauth.get_oauth_application_for_client_id(application.client_id)

    assert fetched == application
|
|
|
|
|
|
def test_assign_token_to_user(initialized_db):
    """The assignment created in setup is stored with every field it was given."""
    application, token_assignment, user, org = setup()

    stored = model.oauth.get_token_assignment(token_assignment.uuid, user, org)

    assert stored is not None
    assert stored.uuid == token_assignment.uuid

    # Related rows first, then the plain values passed to assign_token_to_user.
    assert stored.application == application
    assert stored.assigned_user == user
    assert stored.redirect_uri == REDIRECT_URI
    assert stored.scope == READ_REPO.scope
    assert stored.response_type == "token"
|
|
|
|
|
|
def test_get_token_assignment_for_client_id(initialized_db):
    """An assignment can be fetched by uuid, user and the owning client id."""
    application, token_assignment, user, _ = setup()

    looked_up = model.oauth.get_token_assignment_for_client_id(
        token_assignment.uuid, user, application.client_id
    )

    assert looked_up == token_assignment
|
|
|
|
|
|
# Security tests for PROJQUAY-9849: OAuth Redirect URI Validation Bypass
|
|
def test_validate_redirect_uri_blocks_subdomain_takeover(initialized_db):
    """A host that merely begins with the registered host must be rejected."""
    buynlarge = model.organization.get_organization("buynlarge")
    # Register a fully-qualified host so a look-alike parent domain can be appended.
    registered_app = model.oauth.create_application(
        buynlarge, "test_subdomain", "https://example.com", "https://example.com/callback"
    )

    with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
        provider = MockDatabaseAuthorizationProvider()

        # "example.com" is only the leading label run of the attacker's host.
        lookalike_uri = "https://example.com.evil.com/callback"
        assert not provider.validate_redirect_uri(registered_app.client_id, lookalike_uri)
|
|
|
|
|
|
def test_validate_redirect_uri_blocks_path_traversal(initialized_db):
    """Redirect URIs that append ``..`` segments to the registered URI are rejected."""
    application = setup()[0]

    with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
        provider = MockDatabaseAuthorizationProvider()

        traversal_uris = (
            # Two levels up from the registered path.
            f"{application.redirect_uri}/../../evil",
            # A single level up from the registered path.
            f"{application.redirect_uri}/../evil",
        )
        for candidate in traversal_uris:
            assert not provider.validate_redirect_uri(application.client_id, candidate)
|
|
|
|
|
|
def test_validate_redirect_uri_blocks_url_encoded_traversal(initialized_db):
    """Percent-encoded ``..`` segments appended to the registered URI are rejected."""
    application = setup()[0]

    with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
        provider = MockDatabaseAuthorizationProvider()

        encoded_uris = (
            # "%2e%2e" is ".." with each dot percent-encoded once.
            f"{application.redirect_uri}/%2e%2e/evil",
            # "%252e%252e" encodes the percent sign itself, i.e. double encoding.
            f"{application.redirect_uri}/%252e%252e/evil",
        )
        for candidate in encoded_uris:
            assert not provider.validate_redirect_uri(application.client_id, candidate)
|
|
|
|
|
|
def test_validate_redirect_uri_blocks_scheme_mismatch(initialized_db):
    """An ``http`` redirect is rejected when the application registered ``https``."""
    buynlarge = model.organization.get_organization("buynlarge")
    # The application is registered over HTTPS so the probe below is a downgrade.
    https_app = model.oauth.create_application(
        buynlarge, "test_https_scheme", "https://example.com", "https://example.com/callback"
    )

    with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
        provider = MockDatabaseAuthorizationProvider()

        # Same host and path as the registered URI; only the scheme differs.
        downgraded_uri = "http://example.com/callback"
        assert not provider.validate_redirect_uri(https_app.client_id, downgraded_uri)
|
|
|
|
|
|
def test_validate_redirect_uri_blocks_domain_mismatch(initialized_db):
    """Test that redirect URIs on a different domain are blocked.

    The application returned by ``setup()`` is registered with
    ``http://foo/bar/baz``. Two probes are checked:

    * a URI that differs in scheme, host and path all at once, and
    * a URI that copies the registered scheme and path and changes only the
      host, so the host is the single component under test.
    """
    application, _, _, _ = setup()

    with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
        db_auth_provider = MockDatabaseAuthorizationProvider()

        # Different domain entirely (scheme and path differ as well)
        malicious_uri = "https://evil.com/callback"
        assert not db_auth_provider.validate_redirect_uri(application.client_id, malicious_uri)

        # Only the host differs from the registered http://foo/bar/baz; without
        # this case a scheme or path check alone would satisfy the test.
        host_only_uri = "http://evil.com/bar/baz"
        assert not db_auth_provider.validate_redirect_uri(application.client_id, host_only_uri)
|
|
|
|
|
|
def test_validate_redirect_uri_allows_exact_match(initialized_db):
    """The application's own registered redirect URI is accepted unchanged."""
    application = setup()[0]

    with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
        provider = MockDatabaseAuthorizationProvider()

        # Pass back exactly what is stored on the application.
        registered_uri = application.redirect_uri
        assert provider.validate_redirect_uri(application.client_id, registered_uri)
|
|
|
|
|
|
def test_validate_redirect_uri_allows_subpath(initialized_db):
    """A plain path segment appended to the registered URI is accepted."""
    application = setup()[0]

    with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
        provider = MockDatabaseAuthorizationProvider()

        # "/success" extends the registered path by one ordinary segment.
        nested_uri = f"{application.redirect_uri}/success"
        assert provider.validate_redirect_uri(application.client_id, nested_uri)
|
|
|
|
|
|
def test_validate_redirect_uri_allows_query_params(initialized_db):
    """A query string appended to the registered URI is accepted."""
    application = setup()[0]

    with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
        provider = MockDatabaseAuthorizationProvider()

        # The path is untouched; only a query string without "%" is added.
        uri_with_query = f"{application.redirect_uri}?code=123&state=abc"
        assert provider.validate_redirect_uri(application.client_id, uri_with_query)
|
|
|
|
|
|
def test_validate_redirect_uri_blocks_percent_in_path(initialized_db):
    """Percent-encoded characters in the path after the registered prefix are rejected."""
    application = setup()[0]

    with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
        provider = MockDatabaseAuthorizationProvider()

        percent_uris = (
            # An encoded space in the appended segment.
            f"{application.redirect_uri}/%20space",
            # An encoded "/" inside an otherwise harmless-looking segment.
            f"{application.redirect_uri}/success%2Fpath",
        )
        for candidate in percent_uris:
            assert not provider.validate_redirect_uri(application.client_id, candidate)
|
|
|
|
|
|
def test_validate_redirect_uri_with_internal_redirect(initialized_db):
    """The patched app URL joined with the patched ``url_for`` path is accepted.

    The accepted URI is unrelated to the application's registered redirect URI
    (``REDIRECT_URI``).
    """
    application = setup()[0]

    # Both patches stay active for the duration of the validation call.
    with patch("data.model.oauth.url_for", return_value="/oauth/callback"), patch(
        "data.model.oauth.get_app_url", return_value="https://quay.io"
    ):
        provider = MockDatabaseAuthorizationProvider()

        # NOTE(review): presumably the provider builds its internal redirect URL
        # from get_app_url() + url_for() -- confirm in data.model.oauth.
        internal_redirect = "https://quay.io/oauth/callback"
        assert provider.validate_redirect_uri(application.client_id, internal_redirect)
|
|
|
|
|
|
def test_validate_redirect_uri_blocks_empty_uri(initialized_db):
    """Neither an empty string nor ``None`` is accepted as a redirect URI."""
    application = setup()[0]

    with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
        provider = MockDatabaseAuthorizationProvider()

        # Empty string first, then a missing value.
        for missing_uri in ("", None):
            assert not provider.validate_redirect_uri(application.client_id, missing_uri)
|
|
|
|
|
|
def test_validate_redirect_uri_blocks_username_in_uri(initialized_db):
    """Redirect URIs that embed userinfo before the host are rejected."""
    buynlarge = model.organization.get_organization("buynlarge")
    # Register the credential-free form of the URIs probed below.
    https_app = model.oauth.create_application(
        buynlarge, "test_https", "https://example.com", "https://example.com/callback"
    )

    with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
        provider = MockDatabaseAuthorizationProvider()

        userinfo_uris = (
            # Username only.
            "https://user@example.com/callback",
            # Username together with a password.
            "https://user:pass@example.com/callback",
        )
        for candidate in userinfo_uris:
            assert not provider.validate_redirect_uri(https_app.client_id, candidate)
|
|
|
|
|
|
def test_validate_redirect_uri_with_root_path(initialized_db):
    """Validation against an application whose registered redirect URI path is ``/``."""
    buynlarge = model.organization.get_organization("buynlarge")
    # The registered redirect URI ends at the root path of the host.
    root_app = model.oauth.create_application(
        buynlarge, "test_root", "http://example.com", "http://example.com/"
    )

    with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
        provider = MockDatabaseAuthorizationProvider()
        client_id = root_app.client_id

        # A path below the registered root is accepted.
        below_root = "http://example.com/callback"
        assert provider.validate_redirect_uri(client_id, below_root)

        # Extra characters glued onto the host with no "/" form a different host
        # ("example.comevil") and must be rejected.
        glued_host = "http://example.comevil"
        assert not provider.validate_redirect_uri(client_id, glued_host)
|
|
|
|
|
|
def test_validate_redirect_uri_blocks_path_prefix_mismatch(initialized_db):
    """Paths that do not start with the registered path are rejected."""
    application = setup()[0]

    with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
        provider = MockDatabaseAuthorizationProvider()

        mismatched_uris = (
            # Same scheme and host, unrelated path.
            "http://foo/different/path",
            # A truncated form of the registered path.
            "http://foo/ba",
        )
        for candidate in mismatched_uris:
            assert not provider.validate_redirect_uri(application.client_id, candidate)
|