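"""
Tests for the storage replication worker.

These tests exercise StorageReplicationWorker.replicate_storage against a
DistributedStorage made up of two in-memory FakeStorage "regions"
("local_us" and "local_eu"). They cover successful V1 and CAS replication as
well as the failure modes: missing data in the base storage, a copy that
errors out, and a copy that silently does nothing.
"""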

import hashlib
from test.fixtures import *

import pytest

from data import database, model
from storage.basestorage import StoragePaths
from storage.distributedstorage import DistributedStorage
from storage.fakestorage import FakeStorage
from workers.storagereplication import (
    JobException,
    StorageReplicationWorker,
    WorkerSleepException,
    WorkerUnhealthyException,
)


@pytest.fixture()
def storage_user(app):
    user = model.user.get_user("devtable")
    database.UserRegion.create(
        user=user, location=database.ImageStorageLocation.get(name="local_us")
    )
    database.UserRegion.create(
        user=user, location=database.ImageStorageLocation.get(name="local_eu")
    )
    return user


@pytest.fixture()
def storage_paths():
    return StoragePaths()


@pytest.fixture()
def replication_worker():
    return StorageReplicationWorker(None)


@pytest.fixture()
def storage():
    return DistributedStorage(
        {"local_us": FakeStorage("local"), "local_eu": FakeStorage("local")}, ["local_us"]
    )
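

# A minimal sketch (not part of the original suite) confirming the fixture wiring: content
# written to the preferred "local_us" region can be read back from that region through the
# DistributedStorage facade. The path used here is arbitrary.
def test_distributed_storage_roundtrip(storage):
    storage.put_content(["local_us"], "some/arbitrary/path", b"some content")
    assert storage.get_content(["local_us"], "some/arbitrary/path") == b"some content"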


def test_storage_replication_v1(storage_user, storage_paths, replication_worker, storage, app):
    # Add a storage entry with a V1 path.
    v1_storage = model.storage.create_v1_storage("local_us")
    content_path = storage_paths.v1_image_layer_path(v1_storage.uuid)
    storage.put_content(["local_us"], content_path, b"some content")

    # Call replicate on it and verify it replicates.
    replication_worker.replicate_storage(storage_user, v1_storage.uuid, storage)

    # Ensure that the data was replicated to the other "region".
    assert storage.get_content(["local_eu"], content_path) == b"some content"

    locations = model.storage.get_storage_locations(v1_storage.uuid)
    assert len(locations) == 2
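

# The remaining tests use content-addressable (CAS) blob paths, which StoragePaths keys by the
# blob's sha256 content checksum rather than by the storage record's uuid (as the V1 path above
# is).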


def test_storage_replication_cas(storage_user, storage_paths, replication_worker, storage, app):
    # Add a storage entry with a CAS path.
    content_checksum = "sha256:" + hashlib.sha256(b"some content").hexdigest()
    cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum)

    location = database.ImageStorageLocation.get(name="local_us")
    database.ImageStoragePlacement.create(storage=cas_storage, location=location)

    content_path = storage_paths.blob_path(cas_storage.content_checksum)
    storage.put_content(["local_us"], content_path, b"some content")

    # Call replicate on it and verify it replicates.
    replication_worker.replicate_storage(storage_user, cas_storage.uuid, storage)

    # Ensure that the data was replicated to the other "region".
    assert storage.get_content(["local_eu"], content_path) == b"some content"

    locations = model.storage.get_storage_locations(cas_storage.uuid)
    assert len(locations) == 2
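

# The CAS setup above is repeated by each of the failure-mode tests below. A hypothetical fixture
# like the following (not used by the existing tests) could factor it out; it assumes the same
# FakeStorage-backed fixtures defined at the top of this module.
@pytest.fixture()
def cas_storage_with_placement(storage_paths, app):
    content_checksum = "sha256:" + hashlib.sha256(b"some content").hexdigest()
    cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum)
    location = database.ImageStorageLocation.get(name="local_us")
    database.ImageStoragePlacement.create(storage=cas_storage, location=location)
    return cas_storage, storage_paths.blob_path(content_checksum)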


def test_storage_replication_missing_base(
    storage_user, storage_paths, replication_worker, storage, app
):
    # Add a storage entry with a CAS path.
    content_checksum = "sha256:" + hashlib.sha256(b"some content").hexdigest()
    cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum)

    location = database.ImageStorageLocation.get(name="local_us")
    database.ImageStoragePlacement.create(storage=cas_storage, location=location)

    # Attempt to replicate the storage. This should fail because the layer is missing from the
    # base storage.
    with pytest.raises(JobException):
        replication_worker.replicate_storage(
            storage_user, cas_storage.uuid, storage, backoff_check=False
        )

    # Ensure the storage location count remains 1. The remaining placement is technically
    # inaccurate (the data was never written to it), but that's okay as we still require at
    # least one location per storage.
    locations = model.storage.get_storage_locations(cas_storage.uuid)
    assert len(locations) == 1


def test_storage_replication_copy_error(
    storage_user, storage_paths, replication_worker, storage, app
):
    # Add a storage entry with a CAS path.
    content_checksum = "sha256:" + hashlib.sha256(b"some content").hexdigest()
    cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum)

    location = database.ImageStorageLocation.get(name="local_us")
    database.ImageStoragePlacement.create(storage=cas_storage, location=location)

    content_path = storage_paths.blob_path(cas_storage.content_checksum)
    storage.put_content(["local_us"], content_path, b"some content")

    # Tell storage to break copying.
    storage.put_content(["local_us"], "break_copying", b"true")

    # Attempt to replicate the storage. This should fail because the write fails.
    with pytest.raises(WorkerSleepException):
        replication_worker.replicate_storage(
            storage_user, cas_storage.uuid, storage, backoff_check=False
        )

    # Ensure the storage location count remains 1.
    locations = model.storage.get_storage_locations(cas_storage.uuid)
    assert len(locations) == 1
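

# Note the contrast between the two failure modes: a copy that errors out is surfaced as
# WorkerSleepException above, while a copy that "succeeds" without actually moving the data is
# surfaced as JobException in the test below.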


def test_storage_replication_copy_didnot_copy(
    storage_user, storage_paths, replication_worker, storage, app
):
    # Add a storage entry with a CAS path.
    content_checksum = "sha256:" + hashlib.sha256(b"some content").hexdigest()
    cas_storage = database.ImageStorage.create(cas_path=True, content_checksum=content_checksum)

    location = database.ImageStorageLocation.get(name="local_us")
    database.ImageStoragePlacement.create(storage=cas_storage, location=location)

    content_path = storage_paths.blob_path(cas_storage.content_checksum)
    storage.put_content(["local_us"], content_path, b"some content")

    # Tell storage to fake copying (i.e. not actually copy the data).
    storage.put_content(["local_us"], "fake_copying", b"true")

    # Attempt to replicate the storage. This should fail because the copy operation doesn't
    # actually copy the data.
    with pytest.raises(JobException):
        replication_worker.replicate_storage(
            storage_user, cas_storage.uuid, storage, backoff_check=False
        )

    # Ensure the storage location count remains 1.
    locations = model.storage.get_storage_locations(cas_storage.uuid)
    assert len(locations) == 1