mirror of
https://github.com/quay/quay.git
synced 2025-04-18 10:44:06 +03:00
* chore: drop deprecated tables and remove unused code * isort imports * migration: check for table existence before drop
90 lines
2.9 KiB
Python
90 lines
2.9 KiB
Python
import json
|
|
import unittest
|
|
|
|
import pytest
|
|
|
|
from app import model_cache
|
|
from data.database import ProxyCacheConfig, User
|
|
from proxy import Proxy, UpstreamRegistryError
|
|
|
|
|
|
@pytest.mark.e2e
class TestProxyE2E(unittest.TestCase):
    """End-to-end checks of ``Proxy`` against the ``docker.io`` registry.

    Every test calls a ``Proxy`` method for the ``library/postgres``
    repository. The class carries the ``e2e`` pytest marker; presumably the
    tests need network access to the upstream registry (confirm against the
    ``Proxy`` implementation).
    """

    # Not referenced inside this class; kept because it is part of the
    # class's public attributes.
    media_type = "application/vnd.docker.distribution.manifest.v2+json"
    registry = "docker.io"
    repo = "library/postgres"
    # Image reference used by the tests that expect success.
    tag = 14
    # Image reference used by the tests that expect UpstreamRegistryError.
    tag_404 = 666
    # Blob digest used by the tests that expect UpstreamRegistryError.
    digest_404 = "sha256:3e23e8160039594a33894f6564e1b1348bbd7a0088d42c4acb73eeaed59c009d"
    digest = None  # set by setup
    proxy = None  # set by setup
    org = None  # set by setup

    @pytest.fixture(autouse=True)
    def setup(self):
        """Populate ``self.proxy`` and ``self.digest`` before each test.

        Builds a ``ProxyCacheConfig`` for ``self.registry``, creates the
        ``Proxy`` for ``self.repo`` and reads a blob digest out of the
        manifest fetched for ``self.tag``.
        """
        config = ProxyCacheConfig(
            upstream_registry=self.registry,
            organization=User(username="cache-org", organization=True),
        )
        # NOTE: the assignments below bind on the instance; the class-level
        # defaults stay ``None``.
        if self.proxy is None:
            self.proxy = Proxy(config, self.repo)

        if self.digest is None:
            # Only the raw manifest body is needed; the second element of
            # the returned pair is ignored.
            raw_manifest, _content_type = self.proxy.get_manifest(
                image_ref=self.tag,
            )
            manifest = json.loads(raw_manifest)
            # Takes the first entry of the manifest's "fsLayers" list.
            self.digest = manifest["fsLayers"][0]["blobSum"]

    def test_manifest_exists(self):
        """``manifest_exists`` returns a non-``None`` value for ``self.tag``."""
        digest = self.proxy.manifest_exists(image_ref=self.tag)
        self.assertIsNotNone(digest)

    def test_manifest_exists_404(self):
        """``manifest_exists`` raises ``UpstreamRegistryError`` for ``tag_404``."""
        with pytest.raises(UpstreamRegistryError):
            self.proxy.manifest_exists(image_ref=self.tag_404)

    def test_get_manifest(self):
        """``get_manifest`` completes without raising for ``self.tag``."""
        # Called directly: any exception fails the test and keeps its
        # original type and traceback.
        self.proxy.get_manifest(image_ref=self.tag)

    def test_get_manifest_404(self):
        """``get_manifest`` raises ``UpstreamRegistryError`` for ``tag_404``."""
        with pytest.raises(UpstreamRegistryError):
            self.proxy.get_manifest(image_ref=self.tag_404)

    def test_get_manifest_renews_expired_token(self):
        """``get_manifest`` still succeeds after cached auth state is dropped."""
        if not hasattr(model_cache, "empty_for_testing"):
            # don't continue testing if we can't empty the cache as it will
            # certainly fail. Report a skip rather than a silent pass.
            self.skipTest("model_cache has no empty_for_testing(); cannot clear the cache")

        # by clearing the cache and proxy session we force the proxy to
        # re-authenticate against the upstream registry.
        model_cache.empty_for_testing()
        # Deliberately no default: a missing header raises KeyError, which
        # signals that the precondition of this test does not hold.
        self.proxy._session.headers.pop("Authorization")
        self.proxy.get_manifest(image_ref=self.tag)

    def test_blob_exists(self):
        """``blob_exists`` completes without raising for ``self.digest``."""
        self.proxy.blob_exists(digest=self.digest)

    def test_blob_exists_404(self):
        """``blob_exists`` raises ``UpstreamRegistryError`` for ``digest_404``."""
        with pytest.raises(UpstreamRegistryError):
            self.proxy.blob_exists(digest=self.digest_404)

    def test_get_blob(self):
        """``get_blob`` completes without raising for ``self.digest``."""
        self.proxy.get_blob(self.digest)

    def test_get_blob_404(self):
        """``get_blob`` raises ``UpstreamRegistryError`` for ``digest_404``."""
        with pytest.raises(UpstreamRegistryError):
            self.proxy.get_blob(self.digest_404)
|