1
0
mirror of https://github.com/quay/quay.git synced 2025-04-18 10:44:06 +03:00
quay/proxy/test_proxy_e2e.py
Kenny Lee Sin Cheong 5f63b3a7bb
chore: drop deprecated tables and remove unused code (PROJQUAY-522) (#2089)
* chore: drop deprecated tables and remove unused code

* isort imports

* migration: check for table existence before drop
2023-08-25 12:17:24 -04:00

90 lines
2.9 KiB
Python

import json
import unittest
import pytest
from app import model_cache
from data.database import ProxyCacheConfig, User
from proxy import Proxy, UpstreamRegistryError
@pytest.mark.e2e
class TestProxyE2E(unittest.TestCase):
    """End-to-end checks of ``Proxy`` against the ``docker.io`` upstream.

    Every test drives a ``Proxy`` built for the ``library/postgres``
    repository. Tests are marked ``e2e``; presumably they need network access
    to the real upstream registry (confirm against the test runner config).
    """

    media_type = "application/vnd.docker.distribution.manifest.v2+json"
    registry = "docker.io"
    repo = "library/postgres"
    tag = 14
    tag_404 = 666
    digest_404 = "sha256:3e23e8160039594a33894f6564e1b1348bbd7a0088d42c4acb73eeaed59c009d"
    digest = None  # set by setup
    proxy = None  # set by setup
    org = None  # never assigned by setup in this class

    @pytest.fixture(autouse=True)
    def setup(self):
        """Create the proxy under test and resolve a blob digest to query.

        NOTE(review): the assignments below create *instance* attributes, so
        with one TestCase instance per test the ``is None`` guards are
        expected to be true on every run. Caching on the class instead would
        share one session between tests; confirm that is wanted before
        changing it.
        """
        config = ProxyCacheConfig(
            upstream_registry=self.registry,
            organization=User(username="cache-org", organization=True),
        )
        if self.proxy is None:
            self.proxy = Proxy(config, self.repo)

        if self.digest is None:
            # get_manifest returns a pair; only the raw manifest is needed.
            raw_manifest, _ = self.proxy.get_manifest(image_ref=self.tag)
            manifest = json.loads(raw_manifest)
            # Assumes the manifest carries a top-level "fsLayers" list whose
            # entries have a "blobSum" key - TODO confirm the upstream format.
            self.digest = manifest["fsLayers"][0]["blobSum"]

    def test_manifest_exists(self):
        """An existing tag yields a non-None result from manifest_exists."""
        digest = self.proxy.manifest_exists(image_ref=self.tag)
        self.assertIsNotNone(digest)

    def test_manifest_exists_404(self):
        """An unknown tag makes manifest_exists raise UpstreamRegistryError."""
        with pytest.raises(UpstreamRegistryError):
            self.proxy.manifest_exists(image_ref=self.tag_404)

    def test_get_manifest(self):
        """Fetching the manifest of an existing tag must not raise."""
        # Any exception propagates and fails the test with its full traceback.
        self.proxy.get_manifest(image_ref=self.tag)

    def test_get_manifest_404(self):
        """An unknown tag makes get_manifest raise UpstreamRegistryError."""
        with pytest.raises(UpstreamRegistryError):
            self.proxy.get_manifest(image_ref=self.tag_404)

    def test_get_manifest_renews_expired_token(self):
        """get_manifest still succeeds after cached credentials are dropped."""
        if not hasattr(model_cache, "empty_for_testing"):
            # Without a way to empty the cache the scenario cannot be set up;
            # report the test as skipped rather than letting it pass silently.
            self.skipTest("model_cache does not support empty_for_testing")

        # by clearing the cache and proxy session we force the proxy to
        # re-authenticate against the upstream registry.
        model_cache.empty_for_testing()
        # Tolerate a session that carries no Authorization header: it is
        # already in the unauthenticated state this test wants.
        self.proxy._session.headers.pop("Authorization", None)

        self.proxy.get_manifest(image_ref=self.tag)

    def test_blob_exists(self):
        """Checking a blob digest taken from the manifest must not raise."""
        self.proxy.blob_exists(digest=self.digest)

    def test_blob_exists_404(self):
        """An unknown digest makes blob_exists raise UpstreamRegistryError."""
        with pytest.raises(UpstreamRegistryError):
            self.proxy.blob_exists(digest=self.digest_404)

    def test_get_blob(self):
        """Fetching a blob digest taken from the manifest must not raise."""
        self.proxy.get_blob(self.digest)

    def test_get_blob_404(self):
        """An unknown digest makes get_blob raise UpstreamRegistryError."""
        with pytest.raises(UpstreamRegistryError):
            self.proxy.get_blob(self.digest_404)