1
0
mirror of https://github.com/quay/quay.git synced 2025-07-28 20:22:05 +03:00

chore: update werkzeug and related package versions (PROJQUAY-5098) (#1982)

* chore: update werkzeug and related package versions (PROJQUAY-5098)

Path converter related change reference: https://github.com/pallets/werkzeug/issues/2506

* Update query count
This commit is contained in:
Kenny Lee Sin Cheong
2023-09-12 11:51:09 -04:00
committed by GitHub
parent 8314a58515
commit 72f7c64ed6
39 changed files with 278 additions and 237 deletions

4
app.py
View File

@ -99,7 +99,7 @@ app.config.update(environ_config)
# Fix remote address handling for Flask.
if app.config.get("PROXY_COUNT", 1):
app.wsgi_app = ProxyFix(app.wsgi_app)
app.wsgi_app = ProxyFix(app.wsgi_app) # type: ignore[method-assign]
# Allow user to define a custom storage preference for the local instance.
_distributed_storage_preference = os.environ.get("QUAY_DISTRIBUTED_STORAGE_PREFERENCE", "").split()
@ -124,7 +124,7 @@ features.import_features(app.config)
# Register additional experimental artifact types.
# TODO: extract this into a real, dynamic registration system.
if features.GENERAL_OCI_SUPPORT:
for media_type, layer_types in app.config.get("ALLOWED_OCI_ARTIFACT_TYPES").items():
for media_type, layer_types in app.config["ALLOWED_OCI_ARTIFACT_TYPES"].items():
register_artifact_type(media_type, layer_types)
if features.HELM_OCI_SUPPORT:

View File

@ -6,7 +6,8 @@ import jwt
import pytest
from cryptography.hazmat.primitives import serialization
from app import app, instance_keys
from app import app as flask_app
from app import instance_keys
from auth.auth_context_type import ValidatedAuthContext
from auth.registry_jwt_auth import InvalidJWTException, identity_from_bearer_token
from data import model # TODO: remove this after service keys are decoupled
@ -15,7 +16,7 @@ from initdb import finished_database_for_testing, setup_database_for_testing
from util.morecollections import AttrDict
from util.security.registry_jwt import ANONYMOUS_SUB, build_context_and_subject
TEST_AUDIENCE = app.config["SERVER_HOSTNAME"]
TEST_AUDIENCE = flask_app.config["SERVER_HOSTNAME"]
TEST_USER = AttrDict({"username": "joeuser", "uuid": "foobar", "enabled": True})
MAX_SIGNED_S = 3660
TOKEN_VALIDITY_LIFETIME_S = 60 * 60 # 1 hour

View File

@ -33,7 +33,7 @@ class UserfilesHandlers(View):
buffered,
mimetype=self._magic.from_buffer(file_header_bytes),
as_attachment=True,
attachment_filename=file_id,
download_name=file_id,
)
except IOError:
logger.exception("Error reading user file")

View File

@ -4,6 +4,7 @@ from calendar import timegm
from email.utils import formatdate
from functools import partial, wraps
import pytz
from flask import Blueprint, request, session
from flask_restful import Api, Resource, abort, reqparse
from flask_restful.utils import unpack
@ -438,7 +439,7 @@ def require_fresh_login(func):
)
if (
last_login >= valid_span
last_login.replace(tzinfo=pytz.UTC) >= valid_span.replace(tzinfo=pytz.UTC)
or not authentication.supports_fresh_login
or not authentication.has_password_set(user.username)
):

View File

@ -7,8 +7,8 @@ from endpoints.api.test.shared import conduct_api_call
from endpoints.test.shared import client_with_identity
def test_app_specific_tokens(app, client):
with client_with_identity("devtable", client) as cl:
def test_app_specific_tokens(app):
with client_with_identity("devtable", app) as cl:
# Add an app specific token.
token_data = {"title": "Testing 123"}
resp = conduct_api_call(cl, AppTokens, "POST", None, token_data, 200).json
@ -41,11 +41,11 @@ def test_app_specific_tokens(app, client):
conduct_api_call(cl, AppToken, "GET", {"token_uuid": token_uuid}, None, 404)
def test_delete_expired_app_token(app, client):
def test_delete_expired_app_token(app):
user = model.user.get_user("devtable")
expiration = datetime.now() - timedelta(seconds=10)
token = model.appspecifictoken.create_token(user, "some token", expiration)
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
# Delete the token.
conduct_api_call(cl, AppToken, "DELETE", {"token_uuid": token.uuid}, None, 204)

View File

@ -85,7 +85,7 @@ FIELD_ARGS = {"trigger_uuid": "1234", "field_name": "foobar"}
(BuildTriggerSourceNamespaces, "get", TRIGGER_ARGS),
],
)
def test_disallowed_for_apps(resource, method, params, client):
def test_disallowed_for_apps(resource, method, params, app):
namespace = "devtable"
repository = "someapprepo"
@ -95,5 +95,5 @@ def test_disallowed_for_apps(resource, method, params, client):
params = params or {}
params["repository"] = "%s/%s" % (namespace, repository)
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
conduct_api_call(cl, resource, method, params, None, 501)

View File

@ -58,7 +58,7 @@ FIELD_ARGS = {"trigger_uuid": "1234", "field_name": "foobar"}
(BuildTriggerSources, "post", TRIGGER_ARGS),
],
)
def test_disallowed_for_nonnormal(state, resource, method, params, client):
def test_disallowed_for_nonnormal(state, resource, method, params, app):
namespace = "devtable"
repository = "somenewstaterepo"
@ -70,5 +70,5 @@ def test_disallowed_for_nonnormal(state, resource, method, params, client):
params = params or {}
params["repository"] = "%s/%s" % (namespace, repository)
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
conduct_api_call(cl, resource, method, params, {}, 503)

View File

@ -48,10 +48,10 @@ def test_entity_search(auth_engine, requires_email, client):
assert entity["kind"] == "external"
def test_link_external_entity(auth_engine, requires_email, client):
def test_link_external_entity(auth_engine, requires_email, app):
with auth_engine(requires_email=requires_email) as auth:
with patch("endpoints.api.search.authentication", auth):
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
# Try an unknown user.
conduct_api_call(
cl,

View File

@ -15,16 +15,15 @@ from endpoints.test.shared import client_with_identity
os.environ.get("TEST_DATABASE_URI", "").find("mysql") >= 0,
reason="Queue code is very sensitive to times on MySQL, making this flaky",
)
def test_export_logs(client):
with client_with_identity("devtable", client) as cl:
assert export_action_logs_queue.get() is None
def test_export_logs(app):
timecode = time.time()
def get_time():
return timecode - 2
with patch("time.time", get_time):
with client_with_identity("devtable", app) as cl:
assert export_action_logs_queue.get() is None
# Call to export logs.
body = {
"callback_url": "http://some/url",
@ -39,13 +38,13 @@ def test_export_logs(client):
assert export_action_logs_queue.get() is not None
def test_invalid_date_range(client):
def test_invalid_date_range(app):
starttime = "02/02/2020"
endtime = "01/01/2020"
parsed_starttime, parsed_endtime = _validate_logs_arguments(starttime, endtime)
assert parsed_starttime >= parsed_endtime
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
conduct_api_call(
cl,
OrgLogs,

View File

@ -6,8 +6,8 @@ from endpoints.api.test.shared import conduct_api_call
from endpoints.test.shared import client_with_identity
def test_repository_manifest(client):
with client_with_identity("devtable", client) as cl:
def test_repository_manifest(app):
with client_with_identity("devtable", app) as cl:
repo_ref = registry_model.lookup_repository("devtable", "simple")
tags = registry_model.list_all_active_repository_tags(repo_ref)
for tag in tags:

View File

@ -49,7 +49,7 @@ def _setup_mirror():
("admin", "admin"),
],
)
def test_create_mirror_sets_permissions(existing_robot_permission, expected_permission, client):
def test_create_mirror_sets_permissions(existing_robot_permission, expected_permission, app):
mirror_bot, _ = model.user.create_robot(
"newmirrorbot", model.user.get_namespace_user("devtable")
)
@ -59,7 +59,7 @@ def test_create_mirror_sets_permissions(existing_robot_permission, expected_perm
mirror_bot.username, "devtable", "simple", existing_robot_permission
)
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
params = {"repository": "devtable/simple"}
request_body = {
"external_reference": "quay.io/foobar/barbaz",
@ -78,25 +78,25 @@ def test_create_mirror_sets_permissions(existing_robot_permission, expected_perm
assert config.root_rule.rule_value == ["latest", "foo", "bar"]
def test_get_mirror_does_not_exist(client):
with client_with_identity("devtable", client) as cl:
def test_get_mirror_does_not_exist(app):
with client_with_identity("devtable", app) as cl:
params = {"repository": "devtable/simple"}
resp = conduct_api_call(cl, RepoMirrorResource, "GET", params, None, 404)
def test_get_repo_does_not_exist(client):
with client_with_identity("devtable", client) as cl:
def test_get_repo_does_not_exist(app):
with client_with_identity("devtable", app) as cl:
params = {"repository": "devtable/unicorn"}
resp = conduct_api_call(cl, RepoMirrorResource, "GET", params, None, 404)
def test_get_mirror(client):
def test_get_mirror(app):
"""
Verify that performing a `GET` request returns expected and accurate data.
"""
mirror = _setup_mirror()
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
params = {"repository": "devtable/simple"}
resp = conduct_api_call(cl, RepoMirrorResource, "GET", params, None, 200).json
@ -171,13 +171,13 @@ def test_get_mirror(client):
("root_rule", {"rule_kind": "incorrect", "rule_value": ["3.1", "3.1*"]}, 400),
],
)
def test_change_config(key, value, expected_status, client):
def test_change_config(key, value, expected_status, app):
"""
Verify that changing each attribute works as expected.
"""
mirror = _setup_mirror()
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
params = {"repository": "devtable/simple"}
if key in ("http_proxy", "https_proxy", "no_proxy"):
request_body = {"external_registry_config": {"proxy": {key: value}}}
@ -187,7 +187,7 @@ def test_change_config(key, value, expected_status, client):
request_body = {key: value}
conduct_api_call(cl, RepoMirrorResource, "PUT", params, request_body, expected_status)
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
params = {"repository": "devtable/simple"}
resp = conduct_api_call(cl, RepoMirrorResource, "GET", params, None, 200)
@ -240,12 +240,12 @@ def test_change_config(key, value, expected_status, client):
({"external_registry_username": "", "external_registry_password": ""}, 201),
],
)
def test_change_credentials(request_body, expected_status, client):
def test_change_credentials(request_body, expected_status, app):
"""
Verify credentials can only be modified as a pair.
"""
mirror = _setup_mirror()
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
params = {"repository": "devtable/simple"}
conduct_api_call(cl, RepoMirrorResource, "PUT", params, request_body, expected_status)

View File

@ -17,8 +17,8 @@ from endpoints.test.shared import client_with_identity
(100000000000000000000, 400),
],
)
def test_change_tag_expiration(expiration, expected_code, client):
with client_with_identity("devtable", client) as cl:
def test_change_tag_expiration(expiration, expected_code, app):
with client_with_identity("devtable", app) as cl:
conduct_api_call(
cl,
Organization,
@ -29,10 +29,10 @@ def test_change_tag_expiration(expiration, expected_code, client):
)
def test_get_organization_collaborators(client):
def test_get_organization_collaborators(app):
params = {"orgname": "buynlarge"}
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
resp = conduct_api_call(cl, OrganizationCollaboratorList, "GET", params)
collaborator_names = [c["name"] for c in resp.json["collaborators"]]

View File

@ -18,8 +18,8 @@ from endpoints.test.shared import client_with_identity
pytest.param("buynlarge/orgrepo", "buynlarge+coolrobot", 200, id="valid robot under org"),
],
)
def test_robot_permission(repository, username, expected_code, client):
with client_with_identity("devtable", client) as cl:
def test_robot_permission(repository, username, expected_code, app):
with client_with_identity("devtable", app) as cl:
conduct_api_call(
cl,
RepositoryUserPermission,

View File

@ -19,29 +19,29 @@ from features import FeatureNameValue
("invalid_req", False, 400),
],
)
def test_post_changetrust(trust_enabled, repo_found, expected_status, client):
def test_post_changetrust(trust_enabled, repo_found, expected_status, app):
with patch("endpoints.api.repository.tuf_metadata_api") as mock_tuf:
with patch(
"endpoints.api.repository_models_pre_oci.model.repository.get_repository"
) as mock_model:
mock_model.return_value = MagicMock() if repo_found else None
mock_tuf.get_default_tags_with_expiration.return_value = ["tags", "expiration"]
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
params = {"repository": "devtable/repo"}
request_body = {"trust_enabled": trust_enabled}
conduct_api_call(cl, RepositoryTrust, "POST", params, request_body, expected_status)
def test_signing_disabled(client):
def test_signing_disabled(app):
with patch("features.SIGNING", FeatureNameValue("SIGNING", False)):
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
params = {"repository": "devtable/simple"}
response = conduct_api_call(cl, Repository, "GET", params).json
assert not response["trust_enabled"]
def test_list_starred_repos(client):
with client_with_identity("devtable", client) as cl:
def test_list_starred_repos(app):
with client_with_identity("devtable", app) as cl:
params = {
"starred": "true",
}
@ -70,8 +70,8 @@ def test_list_starred_repos(client):
assert "public/publicrepo" not in repos
def test_list_repos(client, initialized_db):
with client_with_identity("devtable", client) as cl:
def test_list_repos(initialized_db, app):
with client_with_identity("devtable", app) as cl:
params = {"starred": "true", "repo_kind": "application"}
response = conduct_api_call(cl, RepositoryList, "GET", params).json
repo_states = {r["state"] for r in response["repositories"]}
@ -79,8 +79,8 @@ def test_list_repos(client, initialized_db):
assert state in ["NORMAL", "MIRROR", "READ_ONLY", "MARKED_FOR_DELETION"]
def test_list_starred_app_repos(client, initialized_db):
with client_with_identity("devtable", client) as cl:
def test_list_starred_app_repos(initialized_db, app):
with client_with_identity("devtable", app) as cl:
params = {"starred": "true", "repo_kind": "application"}
devtable = model.user.get_user("devtable")
@ -94,8 +94,8 @@ def test_list_starred_app_repos(client, initialized_db):
assert "devtable/someappr" in repos
def test_list_repositories_last_modified(client):
with client_with_identity("devtable", client) as cl:
def test_list_repositories_last_modified(app):
with client_with_identity("devtable", app) as cl:
params = {
"namespace": "devtable",
"last_modified": "true",
@ -127,12 +127,12 @@ def test_list_repositories_last_modified(client):
pytest.param("devtable/nested1/nested2", True, 201, id="Slashes Allowed Multiple Levels"),
],
)
def test_create_repository(repo_name, extended_repo_names, expected_status, client):
def test_create_repository(repo_name, extended_repo_names, expected_status, app):
with patch(
"features.EXTENDED_REPOSITORY_NAMES",
FeatureNameValue("EXTENDED_REPOSITORY_NAMES", extended_repo_names),
):
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
body = {
"namespace": "devtable",
"repository": repo_name,
@ -141,7 +141,7 @@ def test_create_repository(repo_name, extended_repo_names, expected_status, clie
}
result = conduct_api_call(
client, RepositoryList, "post", None, body, expected_code=expected_status
cl, RepositoryList, "post", None, body, expected_code=expected_status
).json
if expected_status == 201:
assert result["name"] == repo_name
@ -155,8 +155,8 @@ def test_create_repository(repo_name, extended_repo_names, expected_status, clie
False,
],
)
def test_get_repo(has_tag_manifest, client, initialized_db):
with client_with_identity("devtable", client) as cl:
def test_get_repo(has_tag_manifest, initialized_db, app):
with client_with_identity("devtable", app) as cl:
params = {"repository": "devtable/simple"}
response = conduct_api_call(cl, Repository, "GET", params).json
assert response["kind"] == "image"
@ -171,8 +171,8 @@ def test_get_repo(has_tag_manifest, client, initialized_db):
(database.RepositoryState.MIRROR, False),
],
)
def test_get_repo_state_can_write(state, can_write, client, initialized_db):
with client_with_identity("devtable", client) as cl:
def test_get_repo_state_can_write(state, can_write, initialized_db, app):
with client_with_identity("devtable", app) as cl:
params = {"repository": "devtable/simple"}
response = conduct_api_call(cl, Repository, "GET", params).json
assert response["can_write"]
@ -181,14 +181,14 @@ def test_get_repo_state_can_write(state, can_write, client, initialized_db):
repo.state = state
repo.save()
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
params = {"repository": "devtable/simple"}
response = conduct_api_call(cl, Repository, "GET", params).json
assert response["can_write"] == can_write
def test_delete_repo(client, initialized_db):
with client_with_identity("devtable", client) as cl:
def test_delete_repo(initialized_db, app):
with client_with_identity("devtable", app) as cl:
resp = conduct_api_call(cl, RepositoryList, "GET", {"namespace": "devtable"}).json
repos = {repo["name"] for repo in resp["repositories"]}
assert "simple" in repos

View File

@ -14,8 +14,8 @@ from endpoints.test.shared import client_with_identity
@pytest.fixture()
def authd_client(client):
with client_with_identity("devtable", client) as cl:
def authd_client(app):
with client_with_identity("devtable", app) as cl:
yield cl

View File

@ -28,8 +28,8 @@ from util.names import parse_robot_username
{"description": "this is a description", "unstructured_metadata": {"foo": "bar"}},
],
)
def test_create_robot_with_metadata(endpoint, body, client):
with client_with_identity("devtable", client) as cl:
def test_create_robot_with_metadata(endpoint, body, app):
with client_with_identity("devtable", app) as cl:
# Create the robot with the specified body.
conduct_api_call(
cl,
@ -63,8 +63,8 @@ def test_create_robot_with_metadata(endpoint, body, client):
(OrgRobot, {"orgname": "buynlarge", "robot_shortname": "coolrobot"}),
],
)
def test_retrieve_robot(endpoint, params, app, client):
with client_with_identity("devtable", client) as cl:
def test_retrieve_robot(endpoint, params, app):
with client_with_identity("devtable", app) as cl:
result = conduct_api_call(cl, endpoint, "GET", params, None)
assert result.json["token"] is not None
@ -91,13 +91,13 @@ def test_retrieve_robot(endpoint, params, app, client):
5,
],
)
def test_retrieve_robots(endpoint, params, bot_endpoint, include_token, limit, app, client):
def test_retrieve_robots(endpoint, params, bot_endpoint, include_token, limit, app):
params["token"] = "true" if include_token else "false"
if limit is not None:
params["limit"] = limit
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
result = conduct_api_call(cl, endpoint, "GET", params, None)
if limit is not None:
@ -126,8 +126,8 @@ def test_retrieve_robots(endpoint, params, bot_endpoint, include_token, limit, a
False,
],
)
def test_retrieve_robots_token_permission(username, is_admin, with_permissions, app, client):
with client_with_identity(username, client) as cl:
def test_retrieve_robots_token_permission(username, is_admin, with_permissions, app):
with client_with_identity(username, app) as cl:
params = {"orgname": "buynlarge", "token": "true"}
if with_permissions:
params["permissions"] = "true"

View File

@ -19,12 +19,12 @@ from endpoints.test.shared import client_with_identity
("repository"),
],
)
def test_repository_search(query, client):
def test_repository_search(query, app):
# Prime the caches.
database.Repository.kind.get_id("image")
database.Repository.kind.get_name(1)
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
params = {"query": query}
with assert_query_count(4):
result = conduct_api_call(cl, ConductRepositorySearch, "GET", params, None, 200).json
@ -41,8 +41,8 @@ def test_repository_search(query, client):
("repository"),
],
)
def test_search_query_count(query, client):
with client_with_identity("devtable", client) as cl:
def test_search_query_count(query, app):
with client_with_identity("devtable", app) as cl:
params = {"query": query}
with assert_query_count(10):
result = conduct_api_call(cl, ConductSearch, "GET", params, None, 200).json
@ -62,7 +62,7 @@ def test_search_query_count(query, client):
6,
],
)
def test_repository_search_pagination(page_count, client):
def test_repository_search_pagination(page_count, app):
# Create at least a few pages of results.
all_repositories = set()
user = model.user.get_user("devtable")
@ -71,7 +71,7 @@ def test_repository_search_pagination(page_count, client):
all_repositories.add(repo_name)
model.repository.create_repository("devtable", repo_name, user)
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
for page_index in range(0, page_count):
params = {"query": "somerepo", "page": page_index + 1}

View File

@ -41,7 +41,7 @@ from endpoints.api.namespacequota import *
from endpoints.api.repository import Repository
from test.fixtures import *
from test.fixtures import * # type: ignore[assignment] # isort: skip
ORG_PARAMS = {"orgname": "buynlarge"}
TEAM_PARAMS = {"orgname": "buynlarge", "teamname": "owners"}
@ -6062,8 +6062,8 @@ SECURITY_TESTS: List[
@pytest.mark.parametrize("resource,method,params,body,identity,expected", SECURITY_TESTS)
def test_api_security(resource, method, params, body, identity, expected, client):
with client_with_identity(identity, client) as cl:
def test_api_security(resource, method, params, body, identity, expected, app):
with client_with_identity(identity, app) as cl:
conduct_api_call(cl, resource, method, params, body, expected)
@ -6122,13 +6122,13 @@ def test_all_apis_tested(app):
("DELETE", 200),
],
)
def test_team_sync_security(is_superuser, allow_nonsuperuser, method, expected, client):
def test_team_sync_security(is_superuser, allow_nonsuperuser, method, expected, app):
def is_superuser_method(_):
return is_superuser
with patch("auth.permissions.usermanager.is_superuser", is_superuser_method):
with toggle_feature("NONSUPERUSER_TEAM_SYNCING_SETUP", allow_nonsuperuser):
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
expect_success = is_superuser or allow_nonsuperuser
expected_status = expected if expect_success else 403
conduct_api_call(

View File

@ -47,10 +47,10 @@ def tags_equal(expected, actual):
(None, {"delegations": None}), # API returns None on exceptions
],
)
def test_get_signatures(targets_map, expected, client):
def test_get_signatures(targets_map, expected, app):
with patch("endpoints.api.signing.tuf_metadata_api") as mock_tuf:
mock_tuf.get_all_tags_with_expiration.return_value = targets_map
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
params = {"repository": "devtable/trusted"}
assert tags_equal(
expected, conduct_api_call(cl, RepositorySignatures, "GET", params, None, 200).json

View File

@ -19,8 +19,8 @@ from endpoints.test.shared import client_with_identity
(False),
],
)
def test_list_all_users(disabled, client):
with client_with_identity("devtable", client) as cl:
def test_list_all_users(disabled, app):
with client_with_identity("devtable", app) as cl:
params = {"disabled": disabled}
result = conduct_api_call(cl, SuperUserList, "GET", params, None, 200).json
assert len(result["users"])
@ -29,14 +29,14 @@ def test_list_all_users(disabled, client):
assert user["enabled"]
def test_list_all_orgs(client):
with client_with_identity("devtable", client) as cl:
def test_list_all_orgs(app):
with client_with_identity("devtable", app) as cl:
result = conduct_api_call(cl, SuperUserOrganizationList, "GET", None, None, 200).json
assert len(result["organizations"]) == 5
def test_paginate_orgs(client):
with client_with_identity("devtable", client) as cl:
def test_paginate_orgs(app):
with client_with_identity("devtable", app) as cl:
params = {"limit": 3}
firstResult = conduct_api_call(cl, SuperUserOrganizationList, "GET", params, None, 200).json
assert len(firstResult["organizations"]) == 3
@ -49,8 +49,8 @@ def test_paginate_orgs(client):
assert secondResult.get("next_page", None) is None
def test_paginate_test_list_all_users(client):
with client_with_identity("devtable", client) as cl:
def test_paginate_test_list_all_users(app):
with client_with_identity("devtable", app) as cl:
params = {"limit": 6}
firstResult = conduct_api_call(cl, SuperUserList, "GET", params, None, 200).json
assert len(firstResult["users"]) == 6
@ -61,8 +61,8 @@ def test_paginate_test_list_all_users(client):
assert secondResult.get("next_page", None) is None
def test_change_install_user(client):
with client_with_identity("devtable", client) as cl:
def test_change_install_user(app):
with client_with_identity("devtable", app) as cl:
params = {"username": "randomuser"}
body = {"email": "new_email123@test.com"}
result = conduct_api_call(cl, SuperUserManagement, "PUT", params, body, 200).json

View File

@ -17,8 +17,8 @@ from endpoints.test.shared import client_with_identity
("aksdjhasd", 400),
],
)
def test_change_tag_expiration_default(expiration_time, expected_status, client, app):
with client_with_identity("devtable", client) as cl:
def test_change_tag_expiration_default(expiration_time, expected_status, app):
with client_with_identity("devtable", app) as cl:
params = {
"repository": "devtable/simple",
"tag": "latest",
@ -31,8 +31,8 @@ def test_change_tag_expiration_default(expiration_time, expected_status, client,
conduct_api_call(cl, RepositoryTag, "put", params, request_body, expected_status)
def test_change_tag_expiration(client, app):
with client_with_identity("devtable", client) as cl:
def test_change_tag_expiration(app):
with client_with_identity("devtable", app) as cl:
params = {
"repository": "devtable/simple",
"tag": "latest",
@ -68,8 +68,8 @@ def test_change_tag_expiration(client, app):
(True, "newtag", 201),
],
)
def test_move_tag(manifest_exists, test_tag, expected_status, client, app):
with client_with_identity("devtable", client) as cl:
def test_move_tag(manifest_exists, test_tag, expected_status, app):
with client_with_identity("devtable", app) as cl:
test_image = "unknown"
if manifest_exists:
repo_ref = registry_model.lookup_repository("devtable", "simple")
@ -98,12 +98,12 @@ def test_move_tag(manifest_exists, test_tag, expected_status, client, app):
("buynlarge", "anotherorgrepo", 6), # +2 for permissions checks.
],
)
def test_list_repo_tags(repo_namespace, repo_name, client, query_count, app):
def test_list_repo_tags(repo_namespace, repo_name, query_count, app):
# Pre-cache media type loads to ensure consistent query count.
Manifest.media_type.get_name(1)
params = {"repository": repo_namespace + "/" + repo_name}
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
with assert_query_count(query_count):
tags = conduct_api_call(cl, ListRepositoryTags, "get", params).json["tags"]
@ -118,22 +118,22 @@ def test_list_repo_tags(repo_namespace, repo_name, client, query_count, app):
("devtable", "gargantuan", 4),
],
)
def test_list_repo_tags_filter(repo_namespace, repo_name, client, query_count, app):
def test_list_repo_tags_filter(repo_namespace, repo_name, query_count, app):
Manifest.media_type.get_name(1)
params = {"repository": repo_namespace + "/" + repo_name}
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
with assert_query_count(query_count):
params["filter_tag_name"] = "like:v"
tags = conduct_api_call(cl, ListRepositoryTags, "get", params).json["tags"]
assert len(tags) == 5
with client_with_identity("devtable", client) as cl:
with assert_query_count(query_count):
with client_with_identity("devtable", app) as cl:
with assert_query_count(query_count - 1):
params["filter_tag_name"] = "eq:prod"
tags = conduct_api_call(cl, ListRepositoryTags, "get", params).json["tags"]
assert len(tags) == 1
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
params["filter_tag_name"] = "random"
resp = conduct_api_call(cl, ListRepositoryTags, "get", params, None, expected_code=400)

View File

@ -15,10 +15,10 @@ SYNCED_TEAM_PARAMS = {"orgname": "sellnsmall", "teamname": "synced"}
UNSYNCED_TEAM_PARAMS = {"orgname": "sellnsmall", "teamname": "owners"}
def test_team_syncing(client):
def test_team_syncing(app):
with mock_ldap() as ldap:
with patch("endpoints.api.team.authentication", ldap):
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
config = {
"group_dn": "cn=AwesomeFolk",
}
@ -42,25 +42,32 @@ def test_team_syncing(client):
assert sync_info is None
def test_team_member_sync_info(client):
def test_team_member_sync_info_unsynced_superuser(app):
with mock_ldap() as ldap:
with patch("endpoints.api.team.authentication", ldap):
# Check for an unsynced team, with superuser.
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
resp = conduct_api_call(cl, TeamMemberList, "GET", UNSYNCED_TEAM_PARAMS)
assert "can_sync" in resp.json
assert resp.json["can_sync"]["service"] == "ldap"
assert "synced" not in resp.json
def test_team_member_sync_info_unsynced_nonsuperuser(app):
with mock_ldap() as ldap:
with patch("endpoints.api.team.authentication", ldap):
# Check for an unsynced team, with non-superuser.
with client_with_identity("randomuser", client) as cl:
with client_with_identity("randomuser", app) as cl:
resp = conduct_api_call(cl, TeamMemberList, "GET", UNSYNCED_TEAM_PARAMS)
assert "can_sync" not in resp.json
assert "synced" not in resp.json
def test_team_member_sync_info_synced_superuser(app):
with mock_ldap() as ldap:
with patch("endpoints.api.team.authentication", ldap):
# Check for a synced team, with superuser.
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
resp = conduct_api_call(cl, TeamMemberList, "GET", SYNCED_TEAM_PARAMS)
assert "can_sync" in resp.json
assert resp.json["can_sync"]["service"] == "ldap"
@ -69,8 +76,12 @@ def test_team_member_sync_info(client):
assert "last_updated" in resp.json["synced"]
assert "group_dn" in resp.json["synced"]["config"]
def test_team_member_sync_info_synced_nonsuperuser(app):
with mock_ldap() as ldap:
with patch("endpoints.api.team.authentication", ldap):
# Check for a synced team, with non-superuser.
with client_with_identity("randomuser", client) as cl:
with client_with_identity("randomuser", app) as cl:
resp = conduct_api_call(cl, TeamMemberList, "GET", SYNCED_TEAM_PARAMS)
assert "can_sync" not in resp.json
@ -79,11 +90,11 @@ def test_team_member_sync_info(client):
assert "config" not in resp.json["synced"]
def test_organization_teams_sync_bool(client):
def test_organization_teams_sync_bool(app):
with mock_ldap() as ldap:
with patch("endpoints.api.organization.authentication", ldap):
# Ensure synced teams are marked as such in the organization teams list.
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
resp = conduct_api_call(cl, Organization, "GET", {"orgname": "sellnsmall"})
assert not resp.json["teams"]["owners"]["is_synced"]

View File

@ -32,7 +32,7 @@ def test_super_user_build_endpoints(context, dockerfile_path, expected):
assert is_parent(context, dockerfile_path) == expected
def test_enabled_disabled_trigger(app, client):
def test_enabled_disabled_trigger(app):
trigger = model.build.list_build_triggers("devtable", "building")[0]
trigger.config = json.dumps({"hook_id": "someid"})
trigger.save()
@ -46,7 +46,7 @@ def test_enabled_disabled_trigger(app, client):
"enabled": False,
}
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
result = conduct_api_call(cl, BuildTrigger, "PUT", params, body, 200).json
assert not result["enabled"]
@ -54,6 +54,6 @@ def test_enabled_disabled_trigger(app, client):
"enabled": True,
}
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
result = conduct_api_call(cl, BuildTrigger, "PUT", params, body, 200).json
assert result["enabled"]

View File

@ -10,9 +10,9 @@ from endpoints.test.shared import client_with_identity, conduct_call
from features import FeatureNameValue
def test_user_metadata_update(client):
def test_user_metadata_update(app):
with patch("features.USER_METADATA", FeatureNameValue("USER_METADATA", True)):
with client_with_identity("devtable", client) as cl:
with client_with_identity("devtable", app) as cl:
metadata = {
"given_name": "Quay",
"family_name": "User",

View File

@ -6,7 +6,8 @@ import logging
import os
from functools import wraps
from flask import abort, make_response, request
from flask import request
from ua_parser import user_agent_parser
import features
from app import app, ip_resolver, model_cache, usermanager
@ -235,11 +236,40 @@ def require_xhr_from_browser(func):
text attacks.
"""
# https://github.com/pallets/werkzeug/issues/2078
browsers = (
"aol",
"ask",
"camino",
"chrome",
"firefox",
"galeon",
"google",
"kmeleon",
"konqueror",
"links",
"lynx",
"msie",
"msn",
"netscape",
"opera",
"safari",
"seamonkey",
"webkit",
"yahoo",
)
@wraps(func)
def wrapper(*args, **kwargs):
if app.config.get("BROWSER_API_CALLS_XHR_ONLY", False):
if request.method == "GET" and request.user_agent.browser:
if (
request.method == "GET"
and request.user_agent.string
and user_agent_parser.ParseUserAgent(request.user_agent.string)["family"].lower()
in browsers
):
has_xhr_header = request.headers.get("X-Requested-With") == "XMLHttpRequest"
if not has_xhr_header and not app.config.get("DEBUGGING") == True:
logger.warning(
"Disallowed possible RTA to URL %s with user agent %s",

View File

@ -12,19 +12,19 @@ CSRF_TOKEN_KEY = "_csrf_token"
@contextmanager
def client_with_identity(auth_username, client):
with client.session_transaction() as sess:
def client_with_identity(auth_username, app):
if auth_username and auth_username is not None:
loaded = model.user.get_user(auth_username)
sess["user_id"] = loaded.uuid
sess["login_time"] = datetime.datetime.now()
else:
sess["user_id"] = "anonymous"
loaded = None
yield client
with app.test_client(user=loaded) as cl:
yield cl
with client.session_transaction() as sess:
with cl.session_transaction() as sess:
sess["_user_id"] = None
sess["user_id"] = None
sess["_fresh"] = False
sess["login_time"] = None
sess[CSRF_TOKEN_KEY] = None

View File

@ -18,7 +18,15 @@ def test_verify_blueprint(blueprint):
self.first_registration = True
self.app = app
def add_url_rule(self, rule, endpoint, view_function, methods=None):
def add_url_rule(
self,
rule,
endpoint=None,
view_function=None,
methods=None,
provide_automatic_options=None,
**options,
):
result = "__anon_protected" in dir(view_function) or "__anon_allowed" in dir(
view_function
)

View File

@ -15,7 +15,11 @@ from endpoints.test.shared import conduct_call
("curl/whatever", False, 200),
("Mozilla/whatever", True, 200),
("Mozilla/5.0", True, 200),
("Mozilla/5.0 (Windows NT 5.1; Win64; x64)", False, 400),
(
"Mozilla/5.0 (Unknown; Linux x86_64) AppleWebKit/534.34 (KHTML, like Gecko) Safari/534.34",
False,
400,
),
],
)
def test_require_xhr_from_browser(user_agent, include_header, expected_code, app, client):

View File

@ -183,6 +183,9 @@ ignore_missing_imports = True
[mypy-tldextract]
ignore_missing_imports = True
[mypy-ua_parser]
ignore_missing_imports = True
[mypy-werkzeug.*]
ignore_missing_imports = True

View File

@ -6,11 +6,7 @@ import logging
from io import StringIO
from requests import Response
try:
from werkzeug.exceptions import Unauthorized
except ImportError:
Unauthorized = Exception
from oauth import utils

View File

@ -3,7 +3,15 @@ from werkzeug.routing import BaseConverter
import features
class APIRepositoryPathConverter(BaseConverter):
class QuayBaseConverter(BaseConverter):
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
if "part_isolating" not in cls.__dict__:
cls.part_isolating = "/" not in cls.regex
class APIRepositoryPathConverter(QuayBaseConverter):
"""
Converter for handling repository paths.
@ -17,7 +25,7 @@ class APIRepositoryPathConverter(BaseConverter):
# TODO(kleesc): Remove after fully deprecating V1 push/pull
class V1CreateRepositoryPathConverter(BaseConverter):
class V1CreateRepositoryPathConverter(QuayBaseConverter):
"""
Converter for handling PUT repository path.
Handles both library and non-library paths (if configured).
@ -41,7 +49,7 @@ class V1CreateRepositoryPathConverter(BaseConverter):
self.regex = r"([^/]+(/[^/]+)+)(?<!auth)(?<!tags)(?<!images)"
class RepositoryPathConverter(BaseConverter):
class RepositoryPathConverter(QuayBaseConverter):
"""
Converter for handling repository paths.
Handles both library and non-library paths (if configured).
@ -59,7 +67,7 @@ class RepositoryPathConverter(BaseConverter):
self.regex = r"([^/]+(/[^/]+)+)"
class RegexConverter(BaseConverter):
class RegexConverter(QuayBaseConverter):
"""
Converter for handling custom regular expression patterns in paths.
"""
@ -69,7 +77,7 @@ class RegexConverter(BaseConverter):
self.regex = regex_value
class RepositoryPathRedirectConverter(BaseConverter):
class RepositoryPathRedirectConverter(QuayBaseConverter):
"""
Converter for handling redirect paths that don't match any other routes.
@ -97,6 +105,7 @@ class RepositoryPathRedirectConverter(BaseConverter):
def __init__(self, url_map):
super().__init__(url_map)
self.weight = 200
if features.LIBRARY_SUPPORT:

View File

@ -12,7 +12,7 @@ ipdb
ipython
mock==3.0.5
mypy==1.3.0
moto==2.0.1
moto==4.1.4
parameterized==0.8.1
pre-commit==2.20.0
pytest

View File

@ -11,7 +11,7 @@ bcrypt==3.1.7
beautifulsoup4==4.11.1
bintrees==2.1.0
bitmath==1.3.3.1
blinker==1.4
blinker==1.6.2
boto3==1.21.42
botocore==1.24.42
cachetools==4.0.0
@ -19,7 +19,7 @@ certifi==2023.7.22
cffi==1.14.3
chardet==3.0.4
charset-normalizer==2.0.12
click==8.0.0
click==8.1.3
cryptography==41.0.3
DateTime==4.3
debtcollector==1.22.0
@ -28,8 +28,8 @@ Deprecated==1.2.7
dumb-init==1.2.2
elasticsearch==7.6.0
elasticsearch-dsl==7.0.0
Flask==1.1.1
Flask-Login==0.4.1
Flask==2.3.2
Flask-Login==0.6.2
Flask-Mail==0.9.1
Flask-Principal==0.4.0
Flask-RESTful==0.3.9
@ -42,11 +42,11 @@ gunicorn==20.1.0
hashids==1.2.0
html5lib==1.1
idna==3.4
importlib-metadata==1.4.0
importlib-metadata==6.7.0
iso8601==0.1.12
isodate==0.6.1
itsdangerous==1.1.0
Jinja2==2.11.3
itsdangerous==2.1.2
Jinja2==3.1.2
jmespath==0.9.4
jsonpath-rw==1.4.0
jsonpointer==2.0
@ -54,7 +54,7 @@ jsonschema==3.2.0
kafka-python==1.4.7
keystoneauth1==3.18.0
Mako==1.2.2
MarkupSafe==1.1.1
MarkupSafe==2.1.3
maxminddb==1.5.2
mixpanel==4.5.0
msgpack==0.6.2
@ -124,11 +124,12 @@ text-unidecode==1.3
tldextract==2.2.2
toposort==1.5
tzlocal==2.0.0
ua-parser==0.18.0
urllib3==1.26.9
webencodings==0.5.1
WebOb==1.8.6
websocket-client==0.57.0
Werkzeug==1.0.0
Werkzeug==2.3.6
wrapt==1.13.3
xhtml2pdf==0.2.6
zipp==2.1.0

View File

@ -1,3 +1,4 @@
import datetime
import inspect
import os
import shutil
@ -6,6 +7,7 @@ from test.testconfig import FakeTransaction
import pytest
from flask import Flask, jsonify
from flask.testing import FlaskClient
from flask_login import LoginManager
from flask_mail import Mail
from flask_principal import Principal, identity_loaded
@ -292,6 +294,29 @@ def initialized_db(appconfig):
yield
class _FlaskLoginClient(FlaskClient):
"""
A Flask test client that knows how to log in users
using the Flask-Login extension.
https://github.com/maxcountryman/flask-login/pull/470
"""
def __init__(self, *args, **kwargs):
user = kwargs.pop("user", None)
fresh = kwargs.pop("fresh_login", True)
super(_FlaskLoginClient, self).__init__(*args, **kwargs)
with self.session_transaction() as sess:
if user:
sess["_user_id"] = user.uuid
sess["user_id"] = user.uuid
sess["_fresh"] = fresh
sess["login_time"] = datetime.datetime.now()
else:
sess["_user_id"] = "anonymous"
@pytest.fixture()
def app(appconfig, initialized_db):
"""
@ -299,6 +324,7 @@ def app(appconfig, initialized_db):
"""
app = Flask(__name__)
login_manager = LoginManager(app)
login_manager.init_app(app)
@app.errorhandler(model.DataModelException)
def handle_dme(ex):
@ -314,6 +340,8 @@ def app(appconfig, initialized_db):
def on_identity_loaded_for_test(sender, identity):
on_identity_loaded(sender, identity)
app.test_client_class = _FlaskLoginClient
Principal(app, use_sessions=False)
app.url_map.converters["regex"] = RegexConverter

View File

@ -459,7 +459,7 @@ class TestUserStarredRepositoryList(ApiTestCase):
self.login(READ_ACCESS_USER)
# Queries: Base + the list query
with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 1):
with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT):
self.getJsonResponse(StarredRepositoryList, expected_code=200)
def test_star_repo_guest(self):
@ -476,7 +476,7 @@ class TestUserStarredRepositoryList(ApiTestCase):
self.login(READ_ACCESS_USER)
# Queries: Base + the list query
with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 1):
with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT):
json = self.getJsonResponse(StarredRepositoryList)
assert json["repositories"] == []
@ -2188,7 +2188,7 @@ class TestListRepos(ApiTestCase):
# Queries: Base + the list query + the popularity and last modified queries + full perms load
# TODO: Add quota queries
with patch("features.QUOTA_MANAGEMENT", False):
with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 5):
with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 4):
json = self.getJsonResponse(
RepositoryList,
params=dict(
@ -2553,11 +2553,11 @@ class TestGetRepository(ApiTestCase):
self.login(ADMIN_ACCESS_USER)
# base + repo + is_starred + tags
with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 4):
with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 3):
self.getJsonResponse(Repository, params=dict(repository=ADMIN_ACCESS_USER + "/simple"))
# base + repo + is_starred + tags
with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 4):
with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 3):
json = self.getJsonResponse(
Repository, params=dict(repository=ADMIN_ACCESS_USER + "/gargantuan")
)
@ -2799,7 +2799,7 @@ class TestRepoBuilds(ApiTestCase):
self.login(ADMIN_ACCESS_USER)
# Queries: Permission + the list query + app check
with assert_query_count(3):
with assert_query_count(2):
json = self.getJsonResponse(
RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + "/simple")
)
@ -2810,7 +2810,7 @@ class TestRepoBuilds(ApiTestCase):
self.login(ADMIN_ACCESS_USER)
# Queries: Permission + the list query + app check
with assert_query_count(3):
with assert_query_count(2):
json = self.getJsonResponse(
RepositoryBuildList, params=dict(repository=ADMIN_ACCESS_USER + "/building")
)
@ -3687,11 +3687,11 @@ class TestUserRobots(ApiTestCase):
self.putJsonResponse(UserRobot, params=dict(robot_shortname="coolbot"), expected_code=201)
# Queries: Base + the lookup query
with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 1):
with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT):
self.getJsonResponse(UserRobotList)
# Queries: Base + the lookup query
with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT + 1):
with assert_query_count(BASE_LOGGEDIN_QUERY_COUNT):
self.getJsonResponse(UserRobotList, params=dict(permissions=True))
def test_robots(self):

View File

@ -231,7 +231,8 @@ class WebhookEndpointTestCase(EndpointTestCase):
"webhooks.build_trigger_webhook",
trigger_uuid=trigger.uuid,
expected_code=400,
headers={"Authorization": auth_header},
headers={"Authorization": auth_header, "Content-Type": "application/json"},
data={},
)
def test_valid_build_trigger_webhook_invalid_payload(self):
@ -663,7 +664,14 @@ class KeyServerTestCase(EndpointTestCase):
def test_put_service_key(self):
# No Authorization header should yield a 400
self.putResponse(
"key_server.put_service_key", service="sample_service", kid="kid420", expected_code=400
"key_server.put_service_key",
service="sample_service",
kid="kid420",
headers={
"Content-Type": "application/json",
},
data={},
expected_code=400,
)
# Mint a JWT with our test payload

View File

@ -6,11 +6,7 @@ from xhtml2pdf import pisa
from app import app
jinja_options = {
"loader": FileSystemLoader("util"),
}
env = Environment(**jinja_options)
env = Environment(loader=FileSystemLoader("util"))
def renderInvoiceToPdf(invoice, user):

View File

@ -1,26 +0,0 @@
from urllib.parse import urljoin
from flask import url_for
def get_blob_download_uri_getter(context, url_scheme_and_hostname):
"""
Returns a function with context to later generate the uri for a download blob.
:param context: Flask RequestContext
:param url_scheme_and_hostname: URLSchemeAndHostname class instance
:return: function (repository_and_namespace, checksum) -> uri
"""
def create_uri(repository_and_namespace, checksum):
"""
Creates a uri for a download blob from a repository, namespace, and checksum from earlier
context.
"""
with context:
relative_layer_url = url_for(
"v2.download_blob", repository=repository_and_namespace, digest=checksum
)
return urljoin(url_scheme_and_hostname.get_url(), relative_layer_url)
return create_uri

View File

@ -1,28 +0,0 @@
import pytest
from app import app
from util.config import URLSchemeAndHostname
from util.secscan.secscan_util import get_blob_download_uri_getter
from test.fixtures import *
@pytest.mark.parametrize(
"url_scheme_and_hostname, repo_namespace, checksum, expected_value,",
[
(
URLSchemeAndHostname("http", "localhost:5000"),
"devtable/simple",
"tarsum+sha256:123",
"http://localhost:5000/v2/devtable/simple/blobs/tarsum%2Bsha256:123",
),
],
)
def test_blob_download_uri_getter(
app, url_scheme_and_hostname, repo_namespace, checksum, expected_value
):
blob_uri_getter = get_blob_download_uri_getter(
app.test_request_context("/"), url_scheme_and_hostname
)
assert blob_uri_getter(repo_namespace, checksum) == expected_value