1
0
mirror of https://github.com/quay/quay.git synced 2026-01-27 18:42:52 +03:00
Files
quay/data/model/test/test_oauth.py
Harish Govindarajulu a97ca5c231 fix(oauth): prevent redirect URI validation bypass (PROJQUAY-9849) (#4635)
* fix(oauth): prevent redirect URI validation bypass (PROJQUAY-9849)

Co-authored-by: Claude <noreply@anthropic.com>

* test(oauth): add comprehensive coverage for redirect URI validation (PROJQUAY-9849)

Co-authored-by: Claude <noreply@anthropic.com>

* fix(oauth): add percent-encoding protection and improve test coverage (PROJQUAY-9849)

Co-authored-by: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-11-25 13:26:38 -05:00

312 lines
13 KiB
Python

from unittest.mock import patch
from auth.scopes import READ_REPO
from data import model
from data.model.oauth import DatabaseAuthorizationProvider
from test.fixtures import *
REDIRECT_URI = "http://foo/bar/baz"
class MockDatabaseAuthorizationProvider(DatabaseAuthorizationProvider):
def get_authorized_user(self):
return model.user.get_user("devtable")
def setup(num_assignments=0):
user = model.user.get_user("devtable")
org = model.organization.get_organization("buynlarge")
application = model.oauth.create_application(org, "test", "http://foo/bar", REDIRECT_URI)
token_assignment = model.oauth.assign_token_to_user(
application, user, REDIRECT_URI, READ_REPO.scope, "token"
)
return (application, token_assignment, user, org)
def test_get_token_response_with_assignment_id(initialized_db):
application, token_assignment, user, org = setup()
with patch("data.model.oauth.url_for", return_value="http://foo/bar/baz"):
db_auth_provider = MockDatabaseAuthorizationProvider()
response = db_auth_provider.get_token_response(
"token",
application.client_id,
REDIRECT_URI,
token_assignment.uuid,
scope=READ_REPO.scope,
)
assert response.status_code == 302
assert "error" not in response.headers["Location"]
assert model.oauth.get_token_assignment(token_assignment.uuid, user, org) is None
def test_delete_application(initialized_db):
application, token_assignment, user, org = setup()
model.oauth.delete_application(org, application.client_id)
assert model.oauth.get_token_assignment(token_assignment.uuid, user, org) is None
assert model.oauth.lookup_application(org, application.client_id) is None
def test_get_assigned_authorization_for_user(initialized_db):
application, token_assignment, user, org = setup()
assigned_oauth = model.oauth.get_assigned_authorization_for_user(user, token_assignment.uuid)
assert assigned_oauth is not None
assert assigned_oauth.uuid == token_assignment.uuid
def test_list_assigned_authorizations_for_user(initialized_db):
application, token_assignment, user, org = setup()
second_token_assignment = model.oauth.assign_token_to_user(
application, user, REDIRECT_URI, READ_REPO.scope, "token"
)
assigned_oauths = model.oauth.list_assigned_authorizations_for_user(user)
assert len(assigned_oauths) == 3
assert assigned_oauths[1].uuid == token_assignment.uuid
assert assigned_oauths[2].uuid == second_token_assignment.uuid
def test_get_oauth_application_for_client_id(initialized_db):
application, token_assignment, user, org = setup()
assert model.oauth.get_oauth_application_for_client_id(application.client_id) == application
def test_assign_token_to_user(initialized_db):
application, token_assignment, user, org = setup()
created_assignment = model.oauth.get_token_assignment(token_assignment.uuid, user, org)
assert created_assignment is not None
assert created_assignment.uuid == token_assignment.uuid
assert created_assignment.application == application
assert created_assignment.assigned_user == user
assert created_assignment.redirect_uri == REDIRECT_URI
assert created_assignment.scope == READ_REPO.scope
assert created_assignment.response_type == "token"
def test_get_token_assignment_for_client_id(initialized_db):
application, token_assignment, user, org = setup()
assert (
model.oauth.get_token_assignment_for_client_id(
token_assignment.uuid, user, application.client_id
)
== token_assignment
)
# Security tests for PROJQUAY-9849: OAuth Redirect URI Validation Bypass
def test_validate_redirect_uri_blocks_subdomain_takeover(initialized_db):
"""Test that subdomain takeover attacks are blocked"""
org = model.organization.get_organization("buynlarge")
# Create app with proper FQDN for subdomain testing
application = model.oauth.create_application(
org, "test_subdomain", "https://example.com", "https://example.com/callback"
)
with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
db_auth_provider = MockDatabaseAuthorizationProvider()
# Subdomain takeover attack
malicious_uri = "https://example.com.evil.com/callback"
assert not db_auth_provider.validate_redirect_uri(application.client_id, malicious_uri)
def test_validate_redirect_uri_blocks_path_traversal(initialized_db):
"""Test that path traversal attacks are blocked"""
application, _, _, _ = setup()
with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
db_auth_provider = MockDatabaseAuthorizationProvider()
# Literal path traversal
malicious_uri = f"{application.redirect_uri}/../../evil"
assert not db_auth_provider.validate_redirect_uri(application.client_id, malicious_uri)
# Relative path traversal
malicious_uri_2 = f"{application.redirect_uri}/../evil"
assert not db_auth_provider.validate_redirect_uri(application.client_id, malicious_uri_2)
def test_validate_redirect_uri_blocks_url_encoded_traversal(initialized_db):
"""Test that URL-encoded path traversal is blocked (critical coverage gap)"""
application, _, _, _ = setup()
with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
db_auth_provider = MockDatabaseAuthorizationProvider()
# URL-encoded ../ (%2e%2e)
malicious_uri = f"{application.redirect_uri}/%2e%2e/evil"
assert not db_auth_provider.validate_redirect_uri(application.client_id, malicious_uri)
# Double URL-encoded ../ (%252e%252e - percent symbol itself encoded)
malicious_uri_2 = f"{application.redirect_uri}/%252e%252e/evil"
assert not db_auth_provider.validate_redirect_uri(application.client_id, malicious_uri_2)
def test_validate_redirect_uri_blocks_scheme_mismatch(initialized_db):
"""Test that scheme downgrade attacks are blocked"""
org = model.organization.get_organization("buynlarge")
# Create app with HTTPS to test scheme downgrade
application = model.oauth.create_application(
org, "test_https_scheme", "https://example.com", "https://example.com/callback"
)
with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
db_auth_provider = MockDatabaseAuthorizationProvider()
# Configured as https, block http downgrade
malicious_uri = "http://example.com/callback"
assert not db_auth_provider.validate_redirect_uri(application.client_id, malicious_uri)
def test_validate_redirect_uri_blocks_domain_mismatch(initialized_db):
"""Test that different domains are blocked"""
application, _, _, _ = setup()
with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
db_auth_provider = MockDatabaseAuthorizationProvider()
# Different domain entirely
malicious_uri = "https://evil.com/callback"
assert not db_auth_provider.validate_redirect_uri(application.client_id, malicious_uri)
def test_validate_redirect_uri_allows_exact_match(initialized_db):
"""Test that exact matches are allowed"""
application, _, _, _ = setup()
with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
db_auth_provider = MockDatabaseAuthorizationProvider()
# Exact match should work
assert db_auth_provider.validate_redirect_uri(
application.client_id, application.redirect_uri
)
def test_validate_redirect_uri_allows_subpath(initialized_db):
"""Test that legitimate subpaths are allowed"""
application, _, _, _ = setup()
with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
db_auth_provider = MockDatabaseAuthorizationProvider()
# Subpath should work
legitimate_uri = f"{application.redirect_uri}/success"
assert db_auth_provider.validate_redirect_uri(application.client_id, legitimate_uri)
def test_validate_redirect_uri_allows_query_params(initialized_db):
"""Test that query parameters are allowed"""
application, _, _, _ = setup()
with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
db_auth_provider = MockDatabaseAuthorizationProvider()
# Query params should work (no % in path, only in query string)
legitimate_uri = f"{application.redirect_uri}?code=123&state=abc"
assert db_auth_provider.validate_redirect_uri(application.client_id, legitimate_uri)
def test_validate_redirect_uri_blocks_percent_in_path(initialized_db):
"""Test that percent-encoding in path after prefix is blocked"""
application, _, _, _ = setup()
with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
db_auth_provider = MockDatabaseAuthorizationProvider()
# Any percent-encoding after configured prefix should be blocked
malicious_uri = f"{application.redirect_uri}/%20space"
assert not db_auth_provider.validate_redirect_uri(application.client_id, malicious_uri)
# Block even benign-looking encoding
malicious_uri_2 = f"{application.redirect_uri}/success%2Fpath"
assert not db_auth_provider.validate_redirect_uri(application.client_id, malicious_uri_2)
def test_validate_redirect_uri_with_internal_redirect(initialized_db):
"""Test that internal redirects work correctly"""
application, _, _, _ = setup()
with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
with patch("data.model.oauth.get_app_url", return_value="https://quay.io"):
db_auth_provider = MockDatabaseAuthorizationProvider()
# Internal redirect should be allowed with exact match
internal_uri = "https://quay.io/oauth/callback"
assert db_auth_provider.validate_redirect_uri(application.client_id, internal_uri)
def test_validate_redirect_uri_blocks_empty_uri(initialized_db):
"""Test that empty or None redirect URIs are blocked"""
application, _, _, _ = setup()
with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
db_auth_provider = MockDatabaseAuthorizationProvider()
# Empty string should be blocked
assert not db_auth_provider.validate_redirect_uri(application.client_id, "")
# None should be blocked
assert not db_auth_provider.validate_redirect_uri(application.client_id, None)
def test_validate_redirect_uri_blocks_username_in_uri(initialized_db):
"""Test that URIs with username are blocked"""
org = model.organization.get_organization("buynlarge")
# Create app with https URI to test username blocking
application = model.oauth.create_application(
org, "test_https", "https://example.com", "https://example.com/callback"
)
with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
db_auth_provider = MockDatabaseAuthorizationProvider()
# URI with username should be blocked
malicious_uri = "https://user@example.com/callback"
assert not db_auth_provider.validate_redirect_uri(application.client_id, malicious_uri)
# URI with username and password should be blocked
malicious_uri_2 = "https://user:pass@example.com/callback"
assert not db_auth_provider.validate_redirect_uri(application.client_id, malicious_uri_2)
def test_validate_redirect_uri_with_root_path(initialized_db):
"""Test validation when configured URI has root path"""
org = model.organization.get_organization("buynlarge")
# Create app with root path
application = model.oauth.create_application(
org, "test_root", "http://example.com", "http://example.com/"
)
with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
db_auth_provider = MockDatabaseAuthorizationProvider()
# Valid subpath from root should be allowed
assert db_auth_provider.validate_redirect_uri(
application.client_id, "http://example.com/callback"
)
# Path not starting with / should be blocked
assert not db_auth_provider.validate_redirect_uri(
application.client_id, "http://example.comevil"
)
def test_validate_redirect_uri_blocks_path_prefix_mismatch(initialized_db):
"""Test that non-matching path prefixes are blocked"""
application, _, _, _ = setup()
with patch("data.model.oauth.url_for", return_value="/oauth/callback"):
db_auth_provider = MockDatabaseAuthorizationProvider()
# Completely different path should be blocked
malicious_uri = "http://foo/different/path"
assert not db_auth_provider.validate_redirect_uri(application.client_id, malicious_uri)
# Path that doesn't match configured prefix
malicious_uri_2 = "http://foo/ba"
assert not db_auth_provider.validate_redirect_uri(application.client_id, malicious_uri_2)