"""
|
|
Azure storage driver.
|
|
|
|
Based on: https://docs.microsoft.com/en-us/azure/storage/blobs/storage-python-how-to-use-blob-storage
|
|
"""

import concurrent.futures
import copy
import io
import logging
import os
import time
import uuid
from datetime import datetime, timedelta

from azure.core.exceptions import AzureError, ResourceNotFoundError
from azure.storage.blob import (
    BlobBlock,
    BlobServiceClient,
    BlobType,
    ContainerSasPermissions,
    ContentSettings,
    CorsRule,
    generate_blob_sas,
)

from storage.basestorage import BaseStorage
from util.registry.filelike import READ_UNTIL_END, LimitingStream

logger = logging.getLogger(__name__)

_COPY_POLL_SLEEP = 0.25  # seconds
_MAX_COPY_POLL_COUNT = 120  # 120 * _COPY_POLL_SLEEP => 30s
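# Maps a service API version to (cutoff date, maximum size of a single staged
# block); __init__ uses this to pick self._max_block_size from the client's
# reported api_version.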
_API_VERSION_LIMITS = {
    "2016-05-31": (datetime.strptime("2016-05-31", "%Y-%m-%d"), 1024 * 1024 * 4),  # 4MiB
    "2019-07-07": (datetime.strptime("2019-07-07", "%Y-%m-%d"), 1024 * 1024 * 100),  # 100MiB
    "2019-12-12": (datetime.strptime("2019-12-12", "%Y-%m-%d"), 1024 * 1024 * 1000 * 4),  # 4000MiB
}
_BLOCKS_KEY = "blocks"
_CONTENT_TYPE_KEY = "content-type"


AZURE_STORAGE_URL_STRING = {
    "global": "https://{}.blob.core.windows.net",
}


class AzureStorage(BaseStorage):
    def __init__(
        self,
        context,
        azure_container,
        storage_path,
        azure_account_name,
        azure_account_key=None,
        sas_token=None,
        endpoint_url=None,
    ):
        super(AzureStorage, self).__init__()
        self._context = context
        self._storage_path = storage_path.lstrip("/")

        self._azure_account_name = azure_account_name
        self._azure_account_key = azure_account_key
        self._azure_sas_token = sas_token
        self._azure_container = azure_container
        self._azure_connection_string = endpoint_url
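
        # Use the endpoint URL if one was provided; otherwise default to the
        # account's global blob endpoint.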
        self._blob_service_client = BlobServiceClient(
            self._azure_connection_string
            if self._azure_connection_string
            else AZURE_STORAGE_URL_STRING["global"].format(self._azure_account_name),
            credential=self._azure_account_key,
        )

        # https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs
        api_version = self._blob_service_client.api_version
        api_version_dt = datetime.strptime(api_version, "%Y-%m-%d")
        if api_version_dt < _API_VERSION_LIMITS["2016-05-31"][0]:
            self._max_block_size = _API_VERSION_LIMITS["2016-05-31"][1]
        elif api_version_dt <= _API_VERSION_LIMITS["2019-07-07"][0]:
            self._max_block_size = _API_VERSION_LIMITS["2019-07-07"][1]
        elif api_version_dt >= _API_VERSION_LIMITS["2019-12-12"][0]:
            self._max_block_size = _API_VERSION_LIMITS["2019-12-12"][1]
        else:
            raise Exception("Unknown Azure api version %s" % api_version)

    def _blob_name_from_path(self, object_path):
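        """
        Join the given object path onto the configured storage path, rejecting
        any path that attempts relative traversal.
        """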
if ".." in object_path:
|
|
raise Exception("Relative paths are not allowed; found %s" % object_path)
|
|
|
|
return os.path.join(self._storage_path, object_path).rstrip("/")
|
|
|
|
def _upload_blob_path_from_uuid(self, uuid):
|
|
return self._blob_name_from_path(self._upload_blob_name_from_uuid(uuid))
|
|
|
|
def _upload_blob_name_from_uuid(self, uuid):
|
|
return "uploads/{0}".format(uuid)
|
|
|
|
def _blob(self, blob_name):
|
|
return self._blob_service_client.get_blob_client(self._azure_container, blob_name)
|
|
|
|
@property
|
|
def _container(self):
|
|
return self._blob_service_client.get_container_client(self._azure_container)
|
|
|
|
def get_direct_download_url(
|
|
self,
|
|
object_path,
|
|
request_ip=None,
|
|
expires_in=60,
|
|
requires_cors=False,
|
|
head=False,
|
|
**kwargs,
|
|
):
|
|
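        """
        Return a URL for the blob at object_path, signed with a read-only SAS
        token that expires after expires_in seconds.
        """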
        blob_name = self._blob_name_from_path(object_path)

        try:
            sas_token = generate_blob_sas(
                self._azure_account_name,
                self._azure_container,
                blob_name,
                account_key=self._azure_account_key,
                permission=ContainerSasPermissions.from_string("r"),
                expiry=datetime.utcnow() + timedelta(seconds=expires_in),
            )

            blob_url = "{}?{}".format(self._blob(blob_name).url, sas_token)

        except AzureError:
            logger.exception(
                "Exception when trying to get direct download for path %s", object_path
            )
            raise IOError("Exception when trying to get direct download")

        return blob_url

    def validate(self, client):
        super(AzureStorage, self).validate(client)

    def get_content(self, path):
        blob_name = self._blob_name_from_path(path)
        try:
            blob_stream = self._blob(blob_name).download_blob()
        except AzureError:
            logger.exception("Exception when trying to get path %s", path)
            raise IOError("Exception when trying to get path")

        return blob_stream.content_as_bytes()

    def put_content(self, path, content):
        blob_name = self._blob_name_from_path(path)
        try:
            self._blob(blob_name).upload_blob(content, blob_type=BlobType.BlockBlob, overwrite=True)
        except AzureError:
            logger.exception("Exception when trying to put path %s", path)
            raise IOError("Exception when trying to put path")

    def stream_read(self, path):
        with self.stream_read_file(path) as f:
            while True:
                buf = f.read(self.buffer_size)
                if not buf:
                    break
                yield buf

    def stream_read_file(self, path):
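        # Note: the blob is downloaded fully into an in-memory buffer before a
        # file-like object is returned.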
        blob_name = self._blob_name_from_path(path)

        try:
            output_stream = io.BytesIO()
            self._blob(blob_name).download_blob().download_to_stream(output_stream)
            output_stream.seek(0)
        except AzureError:
            logger.exception("Exception when trying to stream_file_read path %s", path)
            raise IOError("Exception when trying to stream_file_read path")

        return output_stream

    def stream_write(self, path, fp, content_type=None, content_encoding=None):
        blob_name = self._blob_name_from_path(path)
        content_settings = ContentSettings(
            content_type=content_type,
            content_encoding=content_encoding,
        )

        try:
            self._blob(blob_name).upload_blob(fp, content_settings=content_settings, overwrite=True)
        except AzureError as ae:
            logger.exception("Exception when trying to stream_write path %s", path)
            raise IOError("Exception when trying to stream_write path", ae)

    def exists(self, path):
        blob_name = self._blob_name_from_path(path)

        try:
            self._blob(blob_name).get_blob_properties()
        except ResourceNotFoundError:
            return False
        except AzureError:
            logger.exception("Exception when trying to check exists path %s", path)
            raise IOError("Exception when trying to check exists path")

        return True

    def remove(self, path):
        blob_name = self._blob_name_from_path(path)
        try:
            self._blob(blob_name).delete_blob()
        except AzureError:
            logger.exception("Exception when trying to remove path %s", path)
            raise IOError("Exception when trying to remove path")

    def get_checksum(self, path):
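        # The blob's ETag is returned as the checksum value.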
        blob_name = self._blob_name_from_path(path)
        try:
            blob_properties = self._blob(blob_name).get_blob_properties()
        except AzureError:
            logger.exception("Exception when trying to get_checksum for path %s", path)
            raise IOError("Exception when trying to get_checksum path")
        return blob_properties.etag

    def initiate_chunked_upload(self):
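        """
        Start a new chunked upload, returning its uuid and the initial metadata
        used to track staged block ids and the eventual content type.
        """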
        random_uuid = str(uuid.uuid4())
        metadata = {
            _BLOCKS_KEY: [],
            _CONTENT_TYPE_KEY: None,
        }
        return random_uuid, metadata

    def stream_upload_chunk(self, uuid, offset, length, in_fp, storage_metadata, content_type=None):
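        """
        Read data from in_fp and stage it as uncommitted blocks on the upload
        blob, returning (bytes written, updated metadata, error). The error is
        always None here; failures raise IOError instead.
        """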
        if length == 0:
            return 0, storage_metadata, None

        upload_blob_path = self._upload_blob_path_from_uuid(uuid)
        new_metadata = copy.deepcopy(storage_metadata)

        total_bytes_written = 0

        while True:
            # Deal with smaller blocks to avoid timeouts when staging larger blocks.
            # Azure has a limit of 50000 blocks per blob, which at 128MiB blocks
            # still allows roughly 6TiB per blob and should be enough for most use cases.
            max_length = min(self._max_block_size, 1024 * 1024 * 128)

            if max_length <= 0:
                break

            # Note: Azure fails if a zero-length block is uploaded, so we read all the data here,
            # and, if there is none, terminate early.
            bytes_written = 0

            with io.BytesIO() as buf:
                bytes_written += buf.write(in_fp.read(max_length))
                buf.seek(0)
                if bytes_written == 0:
                    break

                block_index = len(new_metadata[_BLOCKS_KEY])
                block_id = format(block_index, "05")
                new_metadata[_BLOCKS_KEY].append(block_id)

                try:
                    # TODO (kleesc): Look at doing this multithreaded
                    self._blob(upload_blob_path).stage_block(block_id, buf, validate_content=True)
                except AzureError:
                    raise IOError(
                        "Exception when trying to stream_upload_chunk block %s for %s"
                        % (block_id, uuid)
                    )

            total_bytes_written += bytes_written
            if bytes_written == 0 or bytes_written < max_length:
                break

        if content_type is not None:
            new_metadata[_CONTENT_TYPE_KEY] = content_type

        return total_bytes_written, new_metadata, None

    def complete_chunked_upload(self, uuid, final_path, storage_metadata):
        """
        Complete the chunked upload and store the final results in the path indicated.

        Returns nothing.
        """
        # Commit the blob's blocks.
        upload_blob_name = self._upload_blob_name_from_uuid(uuid)  # uploads/<uuid>
        upload_blob_path = self._upload_blob_path_from_uuid(uuid)  # <storage path>/uploads/<uuid>
        block_list = [BlobBlock(block_id) for block_id in storage_metadata[_BLOCKS_KEY]]

        try:
            if storage_metadata[_CONTENT_TYPE_KEY] is not None:
                content_settings = ContentSettings(content_type=storage_metadata[_CONTENT_TYPE_KEY])
                self._blob(upload_blob_path).commit_block_list(
                    block_list, content_settings=content_settings
                )
            else:
                self._blob(upload_blob_path).commit_block_list(block_list)
        except AzureError:
            logger.exception(
                "Exception when trying to put block list for path %s from upload %s",
                final_path,
                uuid,
            )
            raise IOError("Exception when trying to put block list")

        # Copy the blob to its final location.
        upload_blob_name = self._upload_blob_name_from_uuid(uuid)
        copy_source_url = self.get_direct_download_url(upload_blob_name, expires_in=300)

        try:
            final_blob_name = self._blob_name_from_path(final_path)
            cp = self._blob(final_blob_name).start_copy_from_url(copy_source_url)
        except AzureError:
            logger.exception(
                "Exception when trying to copy uploaded blob %s to path %s", uuid, final_path
            )
            raise IOError("Exception when trying to copy uploaded blob")
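
        # start_copy_from_url is asynchronous on the service side; block until
        # the copy completes (or fails) before deleting the upload blob.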
        self._await_copy(final_blob_name)

        # Delete the original blob.
        logger.debug("Deleting chunked upload %s at path %s", uuid, upload_blob_path)
        try:
            self._blob(upload_blob_path).delete_blob()
        except AzureError:
            logger.exception("Exception when trying to delete uploaded blob %s", uuid)
            raise IOError("Exception when trying to delete uploaded blob")

    def cancel_chunked_upload(self, uuid, storage_metadata):
        """
        Cancel the chunked upload and clean up any outstanding partially uploaded data.

        Returns nothing.
        """
        upload_blob_path = self._upload_blob_path_from_uuid(uuid)
        logger.debug("Canceling chunked upload %s at path %s", uuid, upload_blob_path)
        try:
            self._blob(upload_blob_path).delete_blob()
        except ResourceNotFoundError:
            pass

    def _await_copy(self, blob_name):
        # Poll for copy completion.
        blob = self._blob(blob_name)
        copy_prop = blob.get_blob_properties().copy

        count = 0
        while copy_prop.status == "pending":
            props = blob.get_blob_properties()
            copy_prop = props.copy

            if copy_prop.status == "success":
                return

            if copy_prop.status == "failed" or copy_prop.status == "aborted":
                raise IOError(
                    "Copy of blob %s failed with status %s" % (blob_name, copy_prop.status)
                )

            count = count + 1
            if count > _MAX_COPY_POLL_COUNT:
                raise IOError("Timed out waiting for copy to complete")

            time.sleep(_COPY_POLL_SLEEP)

    def copy_to(self, destination, path):
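        """
        Copy the blob at path into the destination storage engine, using a
        server-side Azure copy when the destination is also Azure storage, and
        falling back to a streamed copy otherwise.
        """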
        if self.__class__ == destination.__class__:
            logger.debug(
                "Starting copying file from Azure %s to Azure %s via an Azure copy",
                self._azure_container,
                destination._azure_container,
            )
            copy_source_url = self.get_direct_download_url(path)
            blob_name = destination._blob_name_from_path(path)
            dest_blob = destination._blob(blob_name)

            dest_blob.start_copy_from_url(copy_source_url)
            destination._await_copy(blob_name)
            logger.debug(
                "Finished copying file from Azure %s to Azure %s via an Azure copy",
                self._azure_container,
                destination._azure_container,
            )
            return

        # Fall back to a slower, generic streamed copy.
        logger.debug(
            "Copying file from Azure container %s to %s via a streamed copy",
            self._azure_container,
            destination,
        )
        with self.stream_read_file(path) as fp:
            destination.stream_write(path, fp)

    def setup(self):
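        """
        Configure the blob service's CORS rules so that clients (e.g. browsers)
        can interact with the storage account directly.
        """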
        # From: https://docs.microsoft.com/en-us/rest/api/storageservices/cross-origin-resource-sharing--cors--support-for-the-azure-storage-services
        cors = [
            CorsRule(
                allowed_origins="*",
                allowed_methods=["GET", "PUT"],
                max_age_in_seconds=3000,
                exposed_headers=["x-ms-meta-*"],
                allowed_headers=[
                    "x-ms-meta-data*",
                    "x-ms-meta-target*",
                    "x-ms-meta-abc",
                    "Content-Type",
                ],
            )
        ]

        self._blob_service_client.set_service_properties(cors=cors)