quay/storage/test/test_cloud_storage.py
Marcus Kok c49077cff6 storage: Do nothing when completing chunked upload if chunk list is empty (PROJQUAY-5489) (#2005)
When completing a chunked upload, if the chunk list is empty, do not attempt to assemble anything.

Using oras to copy an artifact from an outside registry to Quay results in a 5XX error: at some point during the copy the upload's chunk list is empty, and attempting to complete the chunked upload raises an exception. Skipping the write to storage when there are no chunks allows the copy operation to complete successfully.
2023-07-06 15:45:20 -04:00
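For context, the guard described above amounts to something like the sketch below. This is illustrative only, not the verbatim storage/cloud.py change: the parameter names and the method body around the guard are assumptions, and only the complete_chunked_upload signature and the _CHUNKS_KEY metadata layout are taken from the tests that follow.

def complete_chunked_upload(self, uuid, final_path, storage_metadata, force_client_side=False):
    # The tests below show storage_metadata carrying the uploaded chunks under
    # _CHUNKS_KEY; an oras-driven copy can reach this point with that list empty.
    if not storage_metadata[_CHUNKS_KEY]:
        # No chunks were uploaded, so there is nothing to assemble or write.
        return
    ...  # otherwise assemble the chunks into final_path as before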


import os
import time
from datetime import timedelta
from io import BytesIO

import boto3
import botocore.exceptions
import pytest
from moto import mock_s3

from storage import S3Storage, StorageContext
from storage.cloud import (
    _CHUNKS_KEY,
    _build_endpoint_url,
    _CloudStorage,
    _PartUploadMetadata,
)

_TEST_CONTENT = os.urandom(1024)
_TEST_BUCKET = "somebucket"
_TEST_USER = "someuser"
_TEST_PASSWORD = "somepassword"
_TEST_REGION = "us-bacon-1"
_TEST_PATH = "some/cool/path"
_TEST_UPLOADS_PATH = "uploads/ee160658-9444-4950-8ec6-30faab40529c"
_TEST_CONTEXT = StorageContext("nyc", None, None, None)


@pytest.fixture(scope="function")
def storage_engine():
    with mock_s3():
        # Create a test bucket and put some test content.
        boto3.client("s3").create_bucket(Bucket=_TEST_BUCKET)

        engine = S3Storage(
            _TEST_CONTEXT, "some/path", _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD, _TEST_REGION
        )
        assert engine._connect_kwargs["endpoint_url"] == "https://s3.{}.amazonaws.com".format(
            _TEST_REGION
        )
        engine.put_content(_TEST_PATH, _TEST_CONTENT)

        yield engine


@pytest.mark.parametrize(
    "hostname, port, is_secure, expected",
    [
        pytest.param("somehost", None, False, "http://somehost"),
        pytest.param("somehost", 8080, False, "http://somehost:8080"),
        pytest.param("somehost", 8080, True, "https://somehost:8080"),
        pytest.param("https://somehost.withscheme", None, False, "https://somehost.withscheme"),
        pytest.param("http://somehost.withscheme", None, True, "http://somehost.withscheme"),
        pytest.param("somehost.withport:8080", 9090, True, "https://somehost.withport:8080"),
    ],
)
def test_build_endpoint_url(hostname, port, is_secure, expected):
    assert _build_endpoint_url(hostname, port, is_secure) == expected


def test_basicop(storage_engine):
    # Ensure the content exists.
    assert storage_engine.exists(_TEST_PATH)

    # Verify it can be retrieved.
    assert storage_engine.get_content(_TEST_PATH) == _TEST_CONTENT

    # Retrieve a checksum for the content.
    storage_engine.get_checksum(_TEST_PATH)

    # Remove the file.
    storage_engine.remove(_TEST_PATH)

    # Ensure it no longer exists.
    with pytest.raises(IOError):
        storage_engine.get_content(_TEST_PATH)

    with pytest.raises(IOError):
        storage_engine.get_checksum(_TEST_PATH)

    assert not storage_engine.exists(_TEST_PATH)


def test_storage_setup(storage_engine):
    storage_engine.setup()


def test_remove_dir(storage_engine):
    # Ensure the content exists.
    assert storage_engine.exists(_TEST_PATH)

    # Verify it can be retrieved.
    assert storage_engine.get_content(_TEST_PATH) == _TEST_CONTENT

    # Retrieve a checksum for the content.
    storage_engine.get_checksum(_TEST_PATH)

    # Remove the "directory".
    storage_engine.remove(_TEST_PATH.split("/")[0])

    assert not storage_engine.exists(_TEST_PATH)


@pytest.mark.parametrize(
    "bucket, username, password",
    [
        pytest.param(_TEST_BUCKET, _TEST_USER, _TEST_PASSWORD, id="same credentials"),
        pytest.param("another_bucket", "blech", "password", id="different credentials"),
    ],
)
def test_copy(bucket, username, password, storage_engine):
    # Copy the content to another engine, using the parametrized credentials.
    another_engine = S3Storage(_TEST_CONTEXT, "another/path", bucket, username, password)
    boto3.client("s3").create_bucket(Bucket="another_bucket")

    storage_engine.copy_to(another_engine, _TEST_PATH)

    # Verify it can be retrieved.
    assert another_engine.get_content(_TEST_PATH) == _TEST_CONTENT


def test_copy_with_error(storage_engine):
    another_engine = S3Storage(_TEST_CONTEXT, "another/path", "anotherbucket", "foo", "bar")

    with pytest.raises(IOError):
        storage_engine.copy_to(another_engine, _TEST_PATH)


def test_stream_read(storage_engine):
    # Read the streaming content.
    data = b"".join(storage_engine.stream_read(_TEST_PATH))
    assert data == _TEST_CONTENT


def test_stream_read_file(storage_engine):
    with storage_engine.stream_read_file(_TEST_PATH) as f:
        assert f.read() == _TEST_CONTENT


def test_stream_write(storage_engine):
    new_data = os.urandom(4096)

    storage_engine.stream_write(_TEST_PATH, BytesIO(new_data), content_type="Cool/Type")
    assert storage_engine.get_content(_TEST_PATH) == new_data


def test_stream_write_error():
    with mock_s3():
        # Create an engine but not the bucket.
        engine = S3Storage(_TEST_CONTEXT, "some/path", _TEST_BUCKET, _TEST_USER, _TEST_PASSWORD)

        # Attempt to write to the uncreated bucket, which should raise an error.
        with pytest.raises(IOError):
            engine.stream_write(_TEST_PATH, BytesIO(b"hello world"), content_type="Cool/Type")

        # Existence checks against the missing bucket should surface NoSuchBucket.
        with pytest.raises(botocore.exceptions.ClientError) as excinfo:
            engine.exists(_TEST_PATH)

        assert excinfo.value.response["Error"]["Code"] == "NoSuchBucket"


@pytest.mark.parametrize(
    "chunk_count",
    [
        0,
        1,
        2,
        50,
    ],
)
@pytest.mark.parametrize("force_client_side", [False, True])
def test_chunk_upload(storage_engine, chunk_count, force_client_side):
    if chunk_count == 0 and force_client_side:
        # Forcing client-side assembly of zero chunks is not a meaningful combination.
        return

    upload_id, metadata = storage_engine.initiate_chunked_upload()
    final_data = b""

    for index in range(0, chunk_count):
        chunk_data = os.urandom(1024)
        final_data = final_data + chunk_data
        bytes_written, new_metadata, error = storage_engine.stream_upload_chunk(
            upload_id, 0, len(chunk_data), BytesIO(chunk_data), metadata
        )
        metadata = new_metadata

        assert bytes_written == len(chunk_data)
        assert error is None
        assert len(metadata[_CHUNKS_KEY]) == index + 1

    # Complete the chunked upload.
    storage_engine.complete_chunked_upload(
        upload_id, "some/chunked/path", metadata, force_client_side=force_client_side
    )

    # Ensure the file contents are valid.
    if chunk_count != 0:
        assert storage_engine.get_content("some/chunked/path") == final_data


@pytest.mark.parametrize(
    "chunk_count",
    [
        0,
        1,
        50,
    ],
)
def test_cancel_chunked_upload(storage_engine, chunk_count):
    upload_id, metadata = storage_engine.initiate_chunked_upload()

    for _ in range(0, chunk_count):
        chunk_data = os.urandom(1024)
        _, new_metadata, _ = storage_engine.stream_upload_chunk(
            upload_id, 0, len(chunk_data), BytesIO(chunk_data), metadata
        )
        metadata = new_metadata

    # Cancel the upload.
    storage_engine.cancel_chunked_upload(upload_id, metadata)

    # Ensure all chunks were deleted.
    for chunk in metadata[_CHUNKS_KEY]:
        assert not storage_engine.exists(chunk.path)


def test_large_chunks_upload(storage_engine):
    # Make the max chunk size much smaller for testing.
    storage_engine.maximum_chunk_size = storage_engine.minimum_chunk_size * 2

    upload_id, metadata = storage_engine.initiate_chunked_upload()

    # Write a "super large" chunk, to ensure that it is broken into smaller chunks.
    chunk_data = os.urandom(int(storage_engine.maximum_chunk_size * 2.5))
    bytes_written, new_metadata, _ = storage_engine.stream_upload_chunk(
        upload_id, 0, -1, BytesIO(chunk_data), metadata
    )
    assert len(chunk_data) == bytes_written

    # Complete the chunked upload.
    storage_engine.complete_chunked_upload(upload_id, "some/chunked/path", new_metadata)

    # Ensure the file contents are valid.
    assert len(chunk_data) == len(storage_engine.get_content("some/chunked/path"))
    assert storage_engine.get_content("some/chunked/path") == chunk_data


def test_large_chunks_with_ragged_edge(storage_engine):
    # Make the max chunk size much smaller for testing and force it to have a ragged edge.
    storage_engine.maximum_chunk_size = storage_engine.minimum_chunk_size * 2 + 10

    upload_id, metadata = storage_engine.initiate_chunked_upload()

    # Write a few "super large" chunks, to ensure that they are broken into smaller chunks.
    all_data = b""
    for _ in range(0, 2):
        chunk_data = os.urandom(int(storage_engine.maximum_chunk_size) + 20)
        bytes_written, new_metadata, _ = storage_engine.stream_upload_chunk(
            upload_id, 0, -1, BytesIO(chunk_data), metadata
        )
        assert len(chunk_data) == bytes_written
        all_data = all_data + chunk_data
        metadata = new_metadata

    # Complete the chunked upload.
    storage_engine.complete_chunked_upload(upload_id, "some/chunked/path", new_metadata)

    # Ensure the file contents are valid.
    assert len(all_data) == len(storage_engine.get_content("some/chunked/path"))
    assert storage_engine.get_content("some/chunked/path") == all_data


@pytest.mark.parametrize(
    "max_size, parts",
    [
        (
            50,
            [
                _PartUploadMetadata("foo", 0, 50),
                _PartUploadMetadata("foo", 50, 50),
            ],
        ),
        (
            40,
            [
                _PartUploadMetadata("foo", 0, 25),
                _PartUploadMetadata("foo", 25, 25),
                _PartUploadMetadata("foo", 50, 25),
                _PartUploadMetadata("foo", 75, 25),
            ],
        ),
        (
            51,
            [
                _PartUploadMetadata("foo", 0, 50),
                _PartUploadMetadata("foo", 50, 50),
            ],
        ),
        (
            49,
            [
                _PartUploadMetadata("foo", 0, 25),
                _PartUploadMetadata("foo", 25, 25),
                _PartUploadMetadata("foo", 50, 25),
                _PartUploadMetadata("foo", 75, 25),
            ],
        ),
        (
            99,
            [
                _PartUploadMetadata("foo", 0, 50),
                _PartUploadMetadata("foo", 50, 50),
            ],
        ),
        (
            100,
            [
                _PartUploadMetadata("foo", 0, 100),
            ],
        ),
    ],
)
def test_rechunked(max_size, parts):
    chunk = _PartUploadMetadata("foo", 0, 100)
    rechunked = list(_CloudStorage._rechunk(chunk, max_size))
    assert len(rechunked) == len(parts)

    for index, chunk in enumerate(rechunked):
        assert chunk == parts[index]


@pytest.mark.parametrize("path", ["/", _TEST_PATH])
def test_clean_partial_uploads(storage_engine, path):
    # Set up the root path and add some content to _root_path/uploads.
    storage_engine._root_path = path
    storage_engine.put_content(_TEST_UPLOADS_PATH, _TEST_CONTENT)
    assert storage_engine.exists(_TEST_UPLOADS_PATH)
    assert storage_engine.get_content(_TEST_UPLOADS_PATH) == _TEST_CONTENT

    # Ensure fresh blobs are not deleted.
    storage_engine.clean_partial_uploads(timedelta(days=2))
    assert storage_engine.exists(_TEST_UPLOADS_PATH)
    assert storage_engine.get_content(_TEST_UPLOADS_PATH) == _TEST_CONTENT

    # Ensure stale blobs are deleted.
    time.sleep(1)
    storage_engine.clean_partial_uploads(timedelta(seconds=0))
    assert not storage_engine.exists(_TEST_UPLOADS_PATH)

    # Ensure the call succeeds when the uploads folder does not exist.
    storage_engine.remove("uploads")
    assert not storage_engine.exists("uploads")
    storage_engine.clean_partial_uploads(timedelta(seconds=0))
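

# A focused regression check for the PROJQUAY-5489 behavior described in the
# commit message; the chunk_count == 0 case of test_chunk_upload already covers
# this path. It relies only on what the tests above use (the metadata returned
# by initiate_chunked_upload and the complete_chunked_upload signature) and
# asserts no more than that completion with an empty chunk list does not raise;
# whether an object is written at the target path is deliberately left unchecked.
def test_complete_chunked_upload_with_no_chunks(storage_engine):
    upload_id, metadata = storage_engine.initiate_chunked_upload()

    # No chunks have been uploaded for this upload id.
    assert not metadata.get(_CHUNKS_KEY)

    # Completing must not attempt to assemble anything, and therefore must not raise.
    storage_engine.complete_chunked_upload(upload_id, "some/chunked/path", metadata)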