1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/endpoints/api/test/test_mirror.py
Marcus Kok f3f15f3f6d api: Add architecture_filter to mirror configuration API (PROJQUAY-10259) (#4922)
Extend the mirror configuration API to accept and return architecture_filter
settings, allowing users to configure which architectures should be mirrored
from multi-architecture images via the REST API.

Changes:
- Add architecture_filter to API schema and GET response
- Handle architecture_filter in POST (create) and PUT (update) methods
- Validate architecture values against allowed set (amd64, arm64, ppc64le, s390x)
- Add comprehensive API tests for the new field

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-20 09:28:29 -05:00

382 lines
15 KiB
Python

from datetime import datetime
import pytest
from data import model
from endpoints.api.mirror import RepoMirrorResource, RepoMirrorSyncCancelResource
from endpoints.api.test.shared import conduct_api_call
from endpoints.test.shared import client_with_identity
from test.fixtures import *
def _setup_mirror():
    """
    Enable mirroring on the `devtable/simple` repository and return the mirror.

    Looks up the repository and the `devtable+dtrobot` robot, creates a tag rule
    and enables mirroring with a fixed configuration (credentials, TLS flags and
    proxy settings). Every intermediate result is asserted to be truthy.
    """
    repository = model.repository.get_repository("devtable", "simple")
    assert repository

    mirror_robot = model.user.lookup_robot("devtable+dtrobot")
    assert mirror_robot

    root_rule = model.repo_mirror.create_rule(repository, ["latest", "3.3*", "foo"])
    assert root_rule

    proxy_settings = {
        "http_proxy": "http://insecure.proxy.corp",
        "https_proxy": "https://secure.proxy.corp",
        "no_proxy": "mylocalhost",
    }
    registry_config = {
        "verify_tls": True,
        "unsigned_images": False,
        "proxy": proxy_settings,
    }

    mirror = model.repo_mirror.enable_mirroring_for_repository(
        repository,
        root_rule=root_rule,
        internal_robot=mirror_robot,
        is_enabled=True,
        external_reference="quay.io/redhat/quay",
        sync_interval=5000,
        # 2020-01-02 06:30:00, naive (no tzinfo attached).
        sync_start_date=datetime(2020, 1, 2, 6, 30, 0),
        skopeo_timeout_interval=300,
        external_registry_username="fakeUsername",
        external_registry_password="fakePassword",
        external_registry_config=registry_config,
    )
    assert mirror
    return mirror
@pytest.mark.parametrize(
    "existing_robot_permission, expected_permission",
    [
        (None, "write"),
        ("read", "write"),
        ("write", "write"),
        ("admin", "admin"),
    ],
)
def test_create_mirror_sets_permissions(existing_robot_permission, expected_permission, app):
    """
    Verify the robot's role on the repository after a mirror is created via `POST`.

    The robot optionally starts with an existing permission; per the parameter
    table it is expected to end up with `write`, unless it already had `admin`.
    """
    namespace_user = model.user.get_namespace_user("devtable")
    mirror_bot, _ = model.user.create_robot("newmirrorbot", namespace_user)

    if existing_robot_permission:
        model.permission.set_user_repo_permission(
            mirror_bot.username, "devtable", "simple", existing_robot_permission
        )

    tag_rule = {"rule_kind": "tag_glob_csv", "rule_value": ["latest", "foo", "bar"]}
    payload = {
        "external_reference": "quay.io/foobar/barbaz",
        "sync_interval": 100,
        "skopeo_timeout_interval": 300,
        "sync_start_date": "2019-08-20T17:51:00Z",
        "root_rule": tag_rule,
        "robot_username": "devtable+newmirrorbot",
    }

    with client_with_identity("devtable", app) as cl:
        conduct_api_call(
            cl, RepoMirrorResource, "POST", {"repository": "devtable/simple"}, payload, 201
        )

    # The robot must now hold the expected role on the repository.
    robot_permissions = model.permission.get_user_repository_permissions(
        mirror_bot, "devtable", "simple"
    )
    first_permission = next(robot_permissions, None)
    assert first_permission and first_permission.role.name == expected_permission

    # The tag rule sent in the request must have been stored on the mirror.
    repository = model.repository.get_repository("devtable", "simple")
    mirror_config = model.repo_mirror.get_mirror(repository)
    assert mirror_config.root_rule.rule_value == ["latest", "foo", "bar"]
def test_get_mirror_does_not_exist(app):
    """
    Verify that `GET` answers 404 for `devtable/simple` when this test has not set up a mirror.
    """
    with client_with_identity("devtable", app) as cl:
        params = {"repository": "devtable/simple"}
        # Only the status code matters here; the response body is not inspected.
        conduct_api_call(cl, RepoMirrorResource, "GET", params, None, 404)
def test_get_repo_does_not_exist(app):
    """
    Verify that `GET` answers 404 when the mirror of `devtable/unicorn` is requested.
    """
    with client_with_identity("devtable", app) as cl:
        params = {"repository": "devtable/unicorn"}
        # Only the status code matters here; the response body is not inspected.
        conduct_api_call(cl, RepoMirrorResource, "GET", params, None, 404)
def test_get_mirror(app):
    """
    Verify that performing a `GET` request returns expected and accurate data.

    The expected values are the ones `_setup_mirror()` configures; the registry
    password must never be echoed back in the response.
    """
    _setup_mirror()

    with client_with_identity("devtable", app) as cl:
        params = {"repository": "devtable/simple"}
        resp = conduct_api_call(cl, RepoMirrorResource, "GET", params, None, 200).json

    # Identity check (like the TLS/unsigned flags below) so that a truthy
    # non-boolean value such as 1 or "true" does not satisfy the assertion.
    assert resp["is_enabled"] is True
    assert resp["external_reference"] == "quay.io/redhat/quay"
    assert resp["sync_interval"] == 5000
    assert resp["skopeo_timeout_interval"] == 300
    assert resp["sync_start_date"] == "2020-01-02T06:30:00Z"
    assert resp["external_registry_username"] == "fakeUsername"
    assert "external_registry_password" not in resp

    assert "external_registry_config" in resp
    registry_config = resp["external_registry_config"]
    assert registry_config["verify_tls"] is True
    assert registry_config["unsigned_images"] is False

    assert "proxy" in registry_config
    proxy = registry_config["proxy"]
    assert proxy["http_proxy"] == "http://insecure.proxy.corp"
    assert proxy["https_proxy"] == "https://secure.proxy.corp"
    assert proxy["no_proxy"] == "mylocalhost"
@pytest.mark.parametrize(
    "key, value, expected_status",
    [
        ("is_enabled", True, 201),
        ("is_enabled", False, 201),
        ("is_enabled", None, 400),
        ("is_enabled", "foo", 400),
        ("external_reference", "example.com/foo/bar", 201),
        ("external_reference", "example.com/foo", 201),
        ("external_reference", "example.com", 201),
        ("external_registry_username", "newTestUsername", 201),
        ("external_registry_username", None, 201),
        ("external_registry_username", 123, 400),
        ("external_registry_password", "newTestPassword", 400),
        ("external_registry_password", None, 400),
        ("external_registry_password", 41, 400),
        ("robot_username", "devtable+dtrobot", 201),
        ("robot_username", "devtable+doesntExist", 400),
        ("sync_start_date", "2020-01-01T00:00:00Z", 201),
        ("sync_start_date", "January 1 2020", 400),
        ("sync_start_date", "2020-01-01T00:00:00.00Z", 400),
        ("sync_start_date", "Wed, 01 Jan 2020 00:00:00 -0000", 400),
        ("sync_start_date", "Wed, 02 Oct 2002 08:00:00 EST", 400),
        ("sync_interval", 2000, 201),
        ("sync_interval", -5, 400),
        ("skopeo_timeout_interval", 3000, 201),
        ("skopeo_timeout_interval", 60, 400),
        ("https_proxy", "https://proxy.corp.example.com", 201),
        ("https_proxy", None, 201),
        # Accepted: proxy values are only exported to the environment, never evaluated.
        ("https_proxy", "proxy.example.com; rm -rf /", 201),
        ("http_proxy", "http://proxy.corp.example.com", 201),
        ("http_proxy", None, 201),
        # Accepted: proxy values are only exported to the environment, never evaluated.
        ("http_proxy", "proxy.example.com; rm -rf /", 201),
        ("no_proxy", "quay.io", 201),
        ("no_proxy", None, 201),
        # Accepted: proxy values are never evaluated.
        ("no_proxy", "quay.io; rm -rf /", 201),
        ("verify_tls", True, 201),
        ("verify_tls", False, 201),
        ("verify_tls", None, 400),
        ("verify_tls", "abc", 400),
        ("unsigned_images", True, 201),
        ("unsigned_images", False, 201),
        ("unsigned_images", None, 400),
        ("unsigned_images", "abc", 400),
        ("root_rule", {"rule_kind": "tag_glob_csv", "rule_value": ["3.1", "3.1*"]}, 201),
        ("root_rule", {"rule_kind": "tag_glob_csv"}, 400),
        ("root_rule", {"rule_kind": "tag_glob_csv", "rule_value": []}, 400),
        ("root_rule", {"rule_kind": "incorrect", "rule_value": ["3.1", "3.1*"]}, 400),
        ("architecture_filter", ["amd64"], 201),
        ("architecture_filter", ["amd64", "arm64"], 201),
        ("architecture_filter", ["amd64", "arm64", "ppc64le", "s390x"], 201),
        ("architecture_filter", [], 201),
        ("architecture_filter", None, 201),
        ("architecture_filter", ["invalid_arch"], 400),
        ("architecture_filter", ["amd64", "invalid"], 400),
        ("architecture_filter", "amd64", 400),
    ],
)
def test_change_config(key, value, expected_status, app):
    """
    Verify that changing each attribute works as expected.

    A single setting is sent with `PUT`, then the configuration is read back
    with `GET`. When the update was accepted the stored value must equal the
    submitted one; when it was rejected the stored value must differ from it.
    """
    proxy_keys = ("http_proxy", "https_proxy", "no_proxy")
    registry_flags = ("verify_tls", "unsigned_images")

    _setup_mirror()

    # Proxy settings and registry flags are nested under external_registry_config.
    if key in proxy_keys:
        request_body = {"external_registry_config": {"proxy": {key: value}}}
    elif key in registry_flags:
        request_body = {"external_registry_config": {key: value}}
    else:
        request_body = {key: value}

    with client_with_identity("devtable", app) as cl:
        put_params = {"repository": "devtable/simple"}
        conduct_api_call(cl, RepoMirrorResource, "PUT", put_params, request_body, expected_status)

    with client_with_identity("devtable", app) as cl:
        get_params = {"repository": "devtable/simple"}
        resp = conduct_api_call(cl, RepoMirrorResource, "GET", get_params, None, 200)

    current = resp.json

    # The password is never returned, whether or not the update was accepted.
    if key == "external_registry_password":
        assert key not in current
        return

    # Read the stored value from wherever the setting lives in the response.
    if key in proxy_keys:
        actual = current["external_registry_config"]["proxy"][key]
    elif key in registry_flags:
        actual = current["external_registry_config"][key]
    else:
        actual = current[key]

    accepted = expected_status < 400
    if key == "architecture_filter":
        # On success None is expected to read back as []; on failure the
        # filter is expected to still be the [] it started with.
        expected_filter = value if accepted and value is not None else []
        assert actual == expected_filter
    elif accepted:
        assert actual == value
    else:
        assert actual != value
@pytest.mark.parametrize(
    "request_body, expected_status",
    [
        # New username together with a new password: accepted.
        (
            {
                "external_registry_username": "newUsername",
                "external_registry_password": "newPassword",
            },
            201,
        ),
        # Username and password both None: accepted.
        ({"external_registry_username": None, "external_registry_password": None}, 201),
        # Username given, password None: accepted.
        ({"external_registry_username": "myUsername", "external_registry_password": None}, 201),
        # Username on its own: accepted.
        ({"external_registry_username": "myNewUsername"}, 201),
        ({"external_registry_username": None}, 201),
        # Password on its own: rejected.
        ({"external_registry_password": "myNewPassword"}, 400),
        ({"external_registry_password": None}, 400),
        # Username and password both empty strings: currently accepted.
        ({"external_registry_username": "", "external_registry_password": ""}, 201),
    ],
)
def test_change_credentials(request_body, expected_status, app):
    """
    Verify which username/password combinations a `PUT` accepts.

    Per the parameter table, a password sent without a username is rejected,
    while a username may be sent alone or together with a password.
    """
    _setup_mirror()

    target = {"repository": "devtable/simple"}
    with client_with_identity("devtable", app) as cl:
        conduct_api_call(cl, RepoMirrorResource, "PUT", target, request_body, expected_status)
def test_cancel_repo_mirroring(app):
    """
    Verify that cancelling a mirror sync sets proper values for sync status and number of retries.
    """
    repo_params = {"repository": "devtable/simple"}
    mirror = _setup_mirror()

    # The mirror must be readable through the API before a sync is requested.
    with client_with_identity("devtable", app) as cl:
        _ = conduct_api_call(cl, RepoMirrorResource, "GET", repo_params, None, 200).json

    mirror = model.repo_mirror.update_sync_status_to_sync_now(mirror)
    # NOTE(review): 3 is presumably the numeric "sync now" status - confirm against the enum.
    assert mirror.sync_status == 3

    with client_with_identity("devtable", app) as cl:
        conduct_api_call(cl, RepoMirrorSyncCancelResource, "POST", repo_params, None, 204)

        # NOTE(review): -2 is presumably the numeric "cancelled" status - confirm against the enum.
        assert model.repo_mirror.check_repo_mirror_sync_status(mirror) == -2

        after_cancel = conduct_api_call(cl, RepoMirrorResource, "GET", repo_params, None, 200).json
        assert after_cancel["sync_retries_remaining"] == 0
def test_get_mirror_includes_architecture_filter(app):
    """
    Verify GET response includes architecture_filter field.

    `_setup_mirror()` does not pass a filter, and the field is expected to be
    reported as an empty list in that case.
    """
    _setup_mirror()

    with client_with_identity("devtable", app) as cl:
        body = conduct_api_call(
            cl, RepoMirrorResource, "GET", {"repository": "devtable/simple"}, None, 200
        ).json

    assert "architecture_filter" in body
    assert body["architecture_filter"] == []
def test_set_and_get_architecture_filter(app):
    """
    Verify architecture_filter can be set and retrieved correctly.

    The filter is first set to two architectures and then cleared with an empty
    list; after each `PUT` the value read back with `GET` must match.
    """
    _setup_mirror()

    with client_with_identity("devtable", app) as cl:
        params = {"repository": "devtable/simple"}

        # First pass sets the filter, second pass clears it again.
        for architectures in (["amd64", "arm64"], []):
            update = {"architecture_filter": architectures}
            conduct_api_call(cl, RepoMirrorResource, "PUT", params, update, 201)

            stored = conduct_api_call(cl, RepoMirrorResource, "GET", params, None, 200).json
            assert stored["architecture_filter"] == architectures
def test_create_mirror_with_architecture_filter(app):
    """
    Verify architecture_filter can be set during mirror creation.
    """
    namespace_user = model.user.get_namespace_user("devtable")
    model.user.create_robot("archfiltermirrorbot", namespace_user)

    wanted_architectures = ["amd64", "s390x"]
    creation_request = {
        "external_reference": "quay.io/foobar/barbaz",
        "sync_interval": 100,
        "skopeo_timeout_interval": 300,
        "sync_start_date": "2019-08-20T17:51:00Z",
        "root_rule": {"rule_kind": "tag_glob_csv", "rule_value": ["latest", "foo", "bar"]},
        "robot_username": "devtable+archfiltermirrorbot",
        "architecture_filter": wanted_architectures,
    }

    with client_with_identity("devtable", app) as cl:
        params = {"repository": "devtable/simple"}
        conduct_api_call(cl, RepoMirrorResource, "POST", params, creation_request, 201)

        # Read the configuration back and check that the filter was stored.
        created = conduct_api_call(cl, RepoMirrorResource, "GET", params, None, 200).json
        assert created["architecture_filter"] == ["amd64", "s390x"]
def test_create_mirror_with_invalid_architecture_filter(app):
    """
    Verify creating mirror with invalid architecture_filter fails.

    The request is otherwise well formed; only the architecture name is bad,
    and the API is expected to answer 400.
    """
    namespace_user = model.user.get_namespace_user("devtable")
    model.user.create_robot("invalidarchbot", namespace_user)

    creation_request = {
        "external_reference": "quay.io/foobar/barbaz",
        "sync_interval": 100,
        "skopeo_timeout_interval": 300,
        "sync_start_date": "2019-08-20T17:51:00Z",
        "root_rule": {"rule_kind": "tag_glob_csv", "rule_value": ["latest"]},
        "robot_username": "devtable+invalidarchbot",
        "architecture_filter": ["invalid_arch"],
    }

    with client_with_identity("devtable", app) as cl:
        conduct_api_call(
            cl,
            RepoMirrorResource,
            "POST",
            {"repository": "devtable/simple"},
            creation_request,
            400,
        )