mirror of
https://github.com/quay/quay.git
synced 2026-01-26 06:21:37 +03:00
* feat(mirror): add architecture-filtered mirroring support (PROJQUAY-10257) When architecture_filter is set on a mirror config, copy only the specified architectures instead of using the --all flag. This preserves the original manifest list digest for OpenShift compatibility by pushing the original manifest bytes directly after copying the filtered architecture manifests. Key changes: - Add inspect_raw() and copy_by_digest() methods to SkopeoMirror - Create manifest_utils.py for manifest list parsing and filtering - Modify perform_mirror() to use architecture filtering when configured - Add comprehensive unit tests for the new functionality Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * fix(mirror): default media_type to OCI index when None (PROJQUAY-10257) Prevent InvalidHeader error when get_manifest_media_type() returns None by defaulting to OCI_IMAGE_INDEX_CONTENT_TYPE in the Content-Type header of the manifest push request. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
676 lines
24 KiB
Python
676 lines
24 KiB
Python
import fnmatch
|
|
import logging
|
|
import logging.config
|
|
import os
|
|
import re
|
|
import traceback
|
|
from typing import Optional
|
|
|
|
import requests
|
|
from prometheus_client import Gauge
|
|
|
|
import features
|
|
from app import app
|
|
from data import database
|
|
from data.database import RepoMirrorConfig, RepoMirrorStatus
|
|
from data.encryption import DecryptionFailureException
|
|
from data.logs_model import logs_model
|
|
from data.model.oci.tag import delete_tag, lookup_alive_tags_shallow, retarget_tag
|
|
from data.model.repo_mirror import (
|
|
change_retries_remaining,
|
|
change_sync_status,
|
|
check_repo_mirror_sync_status,
|
|
claim_mirror,
|
|
release_mirror,
|
|
)
|
|
from data.model.user import retrieve_robot_token
|
|
from data.registry_model import registry_model
|
|
from image.oci import OCI_IMAGE_INDEX_CONTENT_TYPE
|
|
from notifications import spawn_notification
|
|
from util.audit import wrap_repository
|
|
from util.repomirror.skopeomirror import SkopeoMirror, SkopeoResults
|
|
from workers.repomirrorworker.manifest_utils import (
|
|
filter_manifests_by_architecture,
|
|
get_available_architectures,
|
|
get_manifest_media_type,
|
|
is_manifest_list,
|
|
)
|
|
from workers.repomirrorworker.repo_mirror_model import repo_mirror_model as model
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Prometheus gauge; process_mirrors() sets it to the `num_remaining` value that the
# mirror iterator yields alongside each mirror.
unmirrored_repositories = Gauge(
    "quay_repository_rows_unmirrored",
    "number of repositories in the database that have not yet been mirrored",
)

# Page size used by rollback() when paging through the repository tag history.
# Used only for testing - should not be set in production
TAG_ROLLBACK_PAGE_SIZE = app.config.get("REPO_MIRROR_TAG_ROLLBACK_PAGE_SIZE", 100)
|
|
|
|
|
|
class PreemptedException(Exception):
    """
    Exception raised when this worker could not claim a mirror.

    perform_mirror() raises it when claim_mirror() returns a falsy value, and
    process_mirrors() reports it as another mirror worker having pre-empted this one.
    """
|
|
|
|
|
|
class RepoMirrorSkopeoException(Exception):
    """
    Exception from skopeo.

    Carries a human-readable message together with the stdout and stderr captured
    from the skopeo invocation, exposed as the `message`, `stdout` and `stderr`
    attributes.
    """

    def __init__(self, message, stdout, stderr):
        # Forward every argument so that `args` stays (message, stdout, stderr).
        super().__init__(message, stdout, stderr)
        self.message = message
        self.stdout = stdout
        self.stderr = stderr

    def __str__(self):
        # The default str() of a multi-argument exception is the repr of the whole
        # `args` tuple; callers that interpolate str(e) into a log line only want
        # the message (stdout/stderr are logged separately).
        return str(self.message)
|
|
|
|
|
|
def process_mirrors(skopeo, token=None):
    """
    Performs mirroring of repositories whose last sync time is greater than sync interval.

    If a token is provided, scanning will begin where the token indicates it previously completed.

    :param skopeo: skopeo wrapper, handed through to perform_mirror()
    :param token: optional start token passed to model.repositories_to_mirror()
    :return: the next token reported by the model; None when the REPO_MIRROR feature is
        disabled or when mirroring a repository raised an unexpected exception
    """

    if not features.REPO_MIRROR:
        logger.debug("Repository mirror disabled; skipping RepoMirrorWorker process_mirrors")
        return None

    iterator, next_token = model.repositories_to_mirror(start_token=token)
    if not iterator:
        logger.debug("Found no additional repositories to mirror")
        return next_token

    with database.UseThenDisconnect(app.config):
        for mirror, abt, num_remaining in iterator:
            try:
                perform_mirror(skopeo, mirror)
            except PreemptedException:
                logger.info(
                    "Another repository mirror worker pre-empted us for repository: %s", mirror.id
                )
                # Signal the iterator's abort handle that this mirror was taken by someone else.
                abt.set()
            except Exception as e:  # TODO: define exceptions
                # Any other failure stops this pass entirely; the token is dropped.
                logger.exception("Repository Mirror service unavailable: %s", e)
                return None

            unmirrored_repositories.set(num_remaining)

    return next_token
|
|
|
|
|
|
def perform_mirror(skopeo: SkopeoMirror, mirror: RepoMirrorConfig):
    """
    Run mirror on all matching tags of remote repository.

    The mirror is claimed first, the remote tags matching the mirror's rule are listed,
    and each tag is copied (with architecture filtering when `mirror.architecture_filter`
    is set). Audit log entries are emitted for the start, for every tag, and for the
    final outcome, and the mirror is released with the resulting status.

    :param skopeo: skopeo wrapper used to list and copy images
    :param mirror: mirror configuration to synchronise
    :return: the overall RepoMirrorStatus when the tag loop ran to completion; None when
        listing tags failed, when no tags matched, or when an exception interrupted the sync
    :raises PreemptedException: if the mirror could not be claimed
    """

    verbose_logs = os.getenv("DEBUGLOG", "false").lower() == "true"

    mirror = claim_mirror(mirror)
    if not mirror:
        raise PreemptedException

    emit_log(
        mirror,
        "repo_mirror_sync_started",
        "start",
        "'%s' with tag pattern '%s'"
        % (mirror.external_reference, ",".join(mirror.root_rule.rule_value)),
    )

    # Fetch the tags to mirror, being careful to handle exceptions. The 'Exception' is safety net
    # only, allowing easy communication by user through bug report.
    tags = []
    try:
        tags = tags_to_mirror(skopeo, mirror)
    except RepoMirrorSkopeoException as e:
        emit_log(
            mirror,
            "repo_mirror_sync_failed",
            "end",
            "'%s' with tag pattern '%s': %s"
            % (mirror.external_reference, ",".join(mirror.root_rule.rule_value), str(e)),
            tags=", ".join(tags),
            stdout=e.stdout,
            stderr=e.stderr,
        )
        release_mirror(mirror, RepoMirrorStatus.FAIL)
        return
    except Exception:
        emit_log(
            mirror,
            "repo_mirror_sync_failed",
            "end",
            "'%s' with tag pattern '%s': INTERNAL ERROR"
            % (mirror.external_reference, ",".join(mirror.root_rule.rule_value)),
            tags=", ".join(tags),
            stdout="Not applicable",
            stderr=traceback.format_exc(),
        )
        release_mirror(mirror, RepoMirrorStatus.FAIL)
        return
    if not tags:
        # Nothing matched the rule: that counts as a successful (empty) sync.
        emit_log(
            mirror,
            "repo_mirror_sync_success",
            "end",
            "'%s' with tag pattern '%s'"
            % (mirror.external_reference, ",".join(mirror.root_rule.rule_value)),
            tags="No tags matched",
        )
        release_mirror(mirror, RepoMirrorStatus.SUCCESS)
        return

    # Sync tags
    now_ms = database.get_epoch_timestamp_ms()
    overall_status = RepoMirrorStatus.SUCCESS
    failed_tags = []
    try:
        try:
            username = (
                mirror.external_registry_username.decrypt()
                if mirror.external_registry_username
                else None
            )
            password = (
                mirror.external_registry_password.decrypt()
                if mirror.external_registry_password
                else None
            )
        except DecryptionFailureException:
            logger.exception(
                "Failed to decrypt username or password for mirroring %s", mirror.repository
            )
            raise

        dest_server = (
            app.config.get("REPO_MIRROR_SERVER_HOSTNAME", None) or app.config["SERVER_HOSTNAME"]
        )

        skopeo_timeout = mirror.skopeo_timeout

        # Check for architecture filter
        architecture_filter = mirror.architecture_filter or []
        use_arch_filter = bool(architecture_filter)
        if use_arch_filter:
            logger.info(
                "Architecture filter for %s/%s: %s",
                mirror.repository.namespace_user.username,
                mirror.repository.name,
                architecture_filter,
            )

        for tag in tags:
            src_image = "docker://%s:%s" % (mirror.external_reference, tag)
            dest_image = "docker://%s/%s/%s:%s" % (
                dest_server,
                mirror.repository.namespace_user.username,
                mirror.repository.name,
                tag,
            )

            if use_arch_filter:
                # Use architecture-filtered copy
                result = copy_filtered_architectures(
                    skopeo, mirror, tag, architecture_filter, verbose_logs=verbose_logs
                )
            else:
                # Use existing --all copy
                with database.CloseForLongOperation(app.config):
                    result = skopeo.copy(
                        src_image,
                        dest_image,
                        timeout=skopeo_timeout,
                        src_tls_verify=mirror.external_registry_config.get("verify_tls", True),
                        dest_tls_verify=app.config.get(
                            "REPO_MIRROR_TLS_VERIFY", True
                        ),  # TODO: is this a config choice or something else?
                        src_username=username,
                        src_password=password,
                        dest_username=mirror.internal_robot.username,
                        dest_password=retrieve_robot_token(mirror.internal_robot),
                        proxy=mirror.external_registry_config.get("proxy", {}),
                        verbose_logs=verbose_logs,
                        unsigned_images=mirror.external_registry_config.get(
                            "unsigned_images", False
                        ),
                    )

            # A cancellation requested while the copy was running stops the loop before
            # the result of this tag is recorded.
            if check_repo_mirror_sync_status(mirror) == RepoMirrorStatus.CANCEL:
                logger.info(
                    "Sync cancelled on repo %s/%s.",
                    mirror.repository.namespace_user.username,
                    mirror.repository.name,
                )
                overall_status = RepoMirrorStatus.CANCEL
                break

            if not result.success:
                overall_status = RepoMirrorStatus.FAIL
                failed_tags.append(tag)
                emit_log(
                    mirror,
                    "repo_mirror_sync_tag_failed",
                    "finish",
                    "Source '%s' failed to sync" % src_image,
                    tag=tag,
                    stdout=result.stdout,
                    stderr=result.stderr,
                )
                logger.info("Source '%s' failed to sync.", src_image)
            else:
                emit_log(
                    mirror,
                    "repo_mirror_sync_tag_success",
                    "finish",
                    "Source '%s' successful sync" % src_image,
                    tag=tag,
                    stdout=result.stdout,
                    stderr=result.stderr,
                )
                logger.info("Source '%s' successful sync.", src_image)

        delete_obsolete_tags(mirror, tags)

    # NOTE(review): both handlers below release the mirror and return, after which the
    # `finally` clause still runs and logs/releases again - confirm that is intended.
    except PreemptedException:
        overall_status = RepoMirrorStatus.FAIL
        emit_log(
            mirror,
            "repo_mirror_sync_failed",
            "lost",
            "'%s' job lost" % (mirror.external_reference),
            tags="",
            stdout="Not applicable",
            stderr="Not applicable",
        )
        release_mirror(mirror, overall_status)
        return

    except Exception:
        overall_status = RepoMirrorStatus.FAIL
        emit_log(
            mirror,
            "repo_mirror_sync_failed",
            "end",
            "'%s' with tag pattern '%s': INTERNAL ERROR"
            % (mirror.external_reference, ",".join(mirror.root_rule.rule_value)),
            tags=", ".join(tags),
            stdout="Not applicable",
            stderr=traceback.format_exc(),
        )
        release_mirror(mirror, overall_status)
        return
    finally:
        if overall_status == RepoMirrorStatus.FAIL:
            log_tags = []
            log_message = "'%s' with tag pattern '%s'"

            # Handle the case where not all tags were synced and state will not be rolled back
            if (
                len(failed_tags) != len(tags)
                and len(failed_tags) > 0
                and not app.config.get("REPO_MIRROR_ROLLBACK", False)
            ):
                log_message = "'%s' with tag pattern '%s': PARTIAL SYNC"
                for tag in tags:
                    if tag in failed_tags:
                        tag = tag + "(FAILED)"
                    log_tags.append(tag)

            emit_log(
                mirror,
                "repo_mirror_sync_failed",
                "lost",
                log_message % (mirror.external_reference, ",".join(mirror.root_rule.rule_value)),
                tags=", ".join(log_tags),
                stdout="Not applicable",
                stderr="Not applicable",
            )

            # Rollback the state of repo if feature is enabled,
            # otherwise only rollback those that failed
            if app.config.get("REPO_MIRROR_ROLLBACK", False):
                rollback(mirror, now_ms)
            else:
                rollback(mirror, now_ms, failed_tags)
        # `elif` (not a second `if`): a failed sync must not fall through to the
        # success branch below and emit a "repo_mirror_sync_success" entry as well.
        elif overall_status == RepoMirrorStatus.CANCEL:
            log_message = "'%s' with tag pattern '%s'"
            emit_log(
                mirror,
                "repo_mirror_sync_failed",
                "Cancelled",
                log_message % (mirror.external_reference, ",".join(mirror.root_rule.rule_value)),
                stdout="Not applicable",
                stderr="Not applicable",
            )
        else:
            emit_log(
                mirror,
                "repo_mirror_sync_success",
                "end",
                "'%s' with tag pattern '%s'"
                % (mirror.external_reference, ",".join(mirror.root_rule.rule_value)),
                tags=", ".join(tags),
            )
        release_mirror(mirror, overall_status)

    return overall_status
|
|
|
|
|
|
def tags_to_mirror(skopeo: SkopeoMirror, mirror: RepoMirrorConfig) -> list[str]:
    """
    Return the remote tags that match any of the mirror's tag patterns.

    :param skopeo: skopeo wrapper used to list the remote tags
    :param mirror: mirror configuration; `mirror.root_rule.rule_value` holds the
        fnmatch-style patterns
    :return: sorted list of matching tag names without duplicates; empty when the
        remote has no tags
    """
    all_tags = get_all_tags(skopeo, mirror)
    if not all_tags:
        return []

    patterns = list(mirror.root_rule.rule_value)
    # A set removes tags matched by more than one pattern before sorting.
    return sorted(
        {tag for tag in all_tags if any(fnmatch.fnmatch(tag, pattern) for pattern in patterns)}
    )
|
|
|
|
|
|
def get_all_tags(skopeo: SkopeoMirror, mirror: RepoMirrorConfig) -> list[str]:
    """
    List every tag of the mirror's external repository via skopeo.

    :param skopeo: skopeo wrapper providing `tags()`
    :param mirror: mirror configuration supplying the external reference, credentials,
        timeout, TLS and proxy settings
    :return: the tags reported by skopeo
    :raises RepoMirrorSkopeoException: if the skopeo call did not succeed
    """
    verbose_logs = os.getenv("DEBUGLOG", "false").lower() == "true"

    username = (
        mirror.external_registry_username.decrypt() if mirror.external_registry_username else None
    )
    password = (
        mirror.external_registry_password.decrypt() if mirror.external_registry_password else None
    )

    skopeo_timeout = mirror.skopeo_timeout

    # The remote call is wrapped in CloseForLongOperation, as every skopeo call in this module is.
    with database.CloseForLongOperation(app.config):
        result = skopeo.tags(
            "docker://%s" % (mirror.external_reference),
            timeout=skopeo_timeout,
            username=username,
            password=password,
            verbose_logs=verbose_logs,
            verify_tls=mirror.external_registry_config.get("verify_tls", True),
            proxy=mirror.external_registry_config.get("proxy", {}),
        )

    if not result.success:
        raise RepoMirrorSkopeoException(
            "skopeo list-tags failed: %s" % _skopeo_inspect_failure(result),
            result.stdout,
            result.stderr,
        )

    return result.tags
|
|
|
|
|
|
def _skopeo_inspect_failure(result: SkopeoResults) -> str:
    """
    Custom processing of skopeo error messages for user friendly description.

    Currently a placeholder: the result is not inspected and a fixed message is returned.

    :param result: SkopeoResults object
    :return: Message to display
    """

    return "See output"
|
|
|
|
|
|
def rollback(
    mirror: RepoMirrorConfig, since_ms: int, tag_names: Optional[list[str]] = None
) -> None:
    """
    Undo the tag changes recorded for the mirrored repository since `since_ms`.

    :param mirror: Mirror to perform rollback on
    :param since_ms: Time mirror was started; all changes after will be undone
    :param tag_names: when not None, only history entries whose tag name is in this
        list are rolled back
    :return: None
    """

    repository_ref = registry_model.lookup_repository(
        mirror.repository.namespace_user.username, mirror.repository.name
    )

    # Collect the complete tag history since `since_ms`, one page at a time.
    tags = []
    index = 1
    has_more = True
    while has_more:
        tags_page, has_more = registry_model.list_repository_tag_history(
            repository_ref, index, TAG_ROLLBACK_PAGE_SIZE, since_time_ms=since_ms
        )
        tags.extend(tags_page)
        index = index + 1

    if tag_names is not None:
        wanted_names = set(tag_names)
        tags = [tag for tag in tags if tag.name in wanted_names]

    for tag in tags:
        logger.debug("Repo mirroring rollback tag '%s'", tag)

        # If the tag has an end time, it was either deleted or moved.
        if tag.lifetime_end_ms:
            # If a future entry exists with a start time equal to the end time for this tag,
            # then the action was a move, rather than a delete and a create.
            was_moved = any(
                tag != other
                and tag.name == other.name
                and other.lifetime_start_ms == tag.lifetime_end_ms
                for other in tags
            )
            if was_moved:
                logger.debug("Repo mirroring rollback revert tag '%s'", tag)
            else:
                logger.debug("Repo mirroring recreate tag '%s'", tag)
            # Both cases are undone the same way: point the tag back at its old manifest.
            retarget_tag(tag.name, tag.manifest._db_id, is_reversion=True)

        # If the tag has a start time, it was created.
        elif tag.lifetime_start_ms:
            logger.debug("Repo mirroring rollback delete tag '%s'", tag)
            delete_tag(mirror.repository, tag.name)
|
|
|
|
|
|
def delete_obsolete_tags(mirror, tags):
    """
    Delete the repository's alive tags whose names are not in `tags`.

    :param mirror: mirror configuration whose repository is cleaned up
    :param tags: names of the tags that should be kept
    :return: list of the tag objects that were deleted
    """
    existing_tags, _ = lookup_alive_tags_shallow(mirror.repository.id)
    keep_names = set(tags)
    obsolete_tags = [tag for tag in existing_tags if tag.name not in keep_names]

    for tag in obsolete_tags:
        logger.debug("Repo mirroring delete obsolete tag '%s'", tag.name)
        delete_tag(mirror.repository, tag.name)

    return obsolete_tags
|
|
|
|
|
|
def push_sparse_manifest_list(mirror, tag, manifest_bytes, media_type):
    """
    Push original manifest list bytes directly to preserve digest.

    The manifest is sent unchanged with an HTTP PUT to the destination registry's
    `/v2/<namespace>/<repo>/manifests/<tag>` endpoint, authenticated as the mirror's
    internal robot account.

    :param mirror: mirror configuration supplying the destination repository and robot
    :param tag: tag name the manifest list is pushed under
    :param manifest_bytes: manifest list content; a str is encoded as UTF-8, anything
        else is sent as given
    :param media_type: value for the Content-Type header; OCI_IMAGE_INDEX_CONTENT_TYPE
        is used when it is falsy
    :return: True on success (HTTP 200 or 201), False on failure
    """
    dest_server = (
        app.config.get("REPO_MIRROR_SERVER_HOSTNAME", None) or app.config["SERVER_HOSTNAME"]
    )
    namespace = mirror.repository.namespace_user.username
    repo_name = mirror.repository.name
    # NOTE(review): the scheme is fixed to https regardless of deployment - confirm
    # this is valid for destinations that are served over plain http.
    url = f"https://{dest_server}/v2/{namespace}/{repo_name}/manifests/{tag}"

    robot_username = mirror.internal_robot.username
    robot_token = retrieve_robot_token(mirror.internal_robot)
    dest_tls_verify = app.config.get("REPO_MIRROR_TLS_VERIFY", True)

    if isinstance(manifest_bytes, str):
        payload = manifest_bytes.encode("utf-8")
    else:
        payload = manifest_bytes

    try:
        response = requests.put(
            url,
            data=payload,
            headers={"Content-Type": media_type or OCI_IMAGE_INDEX_CONTENT_TYPE},
            auth=(robot_username, robot_token),
            verify=dest_tls_verify,
            timeout=60,
        )
    except requests.RequestException as e:
        logger.exception("Request failed pushing manifest list: %s", e)
        return False

    if response.status_code in (200, 201):
        logger.info("Pushed sparse manifest list for %s/%s:%s", namespace, repo_name, tag)
        return True

    logger.error("Failed to push manifest list: %s %s", response.status_code, response.text)
    return False
|
|
|
|
|
|
def copy_filtered_architectures(skopeo, mirror, tag, architecture_filter, verbose_logs=False):
    """
    Copy only specified architectures from a multi-arch image.

    Steps: inspect the raw source manifest; if it is not a manifest list fall back to a
    standard copy; otherwise copy each manifest of a requested architecture by digest
    and finally push the original manifest list bytes unchanged.

    :param skopeo: skopeo wrapper providing inspect_raw(), copy() and copy_by_digest()
    :param mirror: mirror configuration (source, destination, credentials, settings)
    :param tag: tag being mirrored
    :param architecture_filter: architectures to copy
    :param verbose_logs: passed through to every skopeo call
    :return: a SkopeoResults; callers read its `success`, `stdout` and `stderr`. For the
        filtered path stdout/stderr are the per-architecture outputs joined by newlines.
    """
    # Get credentials
    username = (
        mirror.external_registry_username.decrypt() if mirror.external_registry_username else None
    )
    password = (
        mirror.external_registry_password.decrypt() if mirror.external_registry_password else None
    )

    dest_server = (
        app.config.get("REPO_MIRROR_SERVER_HOSTNAME", None) or app.config["SERVER_HOSTNAME"]
    )
    src_image_base = f"docker://{mirror.external_reference}"
    src_image_tag = f"{src_image_base}:{tag}"
    dest_image_base = (
        f"docker://{dest_server}/{mirror.repository.namespace_user.username}/"
        f"{mirror.repository.name}"
    )

    proxy = mirror.external_registry_config.get("proxy", {})
    src_tls_verify = mirror.external_registry_config.get("verify_tls", True)
    dest_tls_verify = app.config.get("REPO_MIRROR_TLS_VERIFY", True)
    unsigned_images = mirror.external_registry_config.get("unsigned_images", False)

    # Step 1: Inspect manifest
    with database.CloseForLongOperation(app.config):
        result = skopeo.inspect_raw(
            src_image_tag,
            mirror.skopeo_timeout,
            username=username,
            password=password,
            verify_tls=src_tls_verify,
            proxy=proxy,
            verbose_logs=verbose_logs,
        )

    if not result.success:
        logger.error("Failed to inspect manifest for %s: %s", src_image_tag, result.stderr)
        return SkopeoResults(False, [], result.stdout, result.stderr)

    manifest_bytes = result.stdout

    # Step 2: Check if manifest list
    if not is_manifest_list(manifest_bytes):
        logger.info("Image %s is not a manifest list, using standard copy", src_image_tag)
        # Look the robot credentials up before entering CloseForLongOperation.
        dest_username = mirror.internal_robot.username
        dest_password = retrieve_robot_token(mirror.internal_robot)
        with database.CloseForLongOperation(app.config):
            result = skopeo.copy(
                src_image_tag,
                f"{dest_image_base}:{tag}",
                timeout=mirror.skopeo_timeout,
                src_tls_verify=src_tls_verify,
                dest_tls_verify=dest_tls_verify,
                src_username=username,
                src_password=password,
                dest_username=dest_username,
                dest_password=dest_password,
                proxy=proxy,
                verbose_logs=verbose_logs,
                unsigned_images=unsigned_images,
            )
        return result

    # Step 3: Filter and validate architectures
    available = get_available_architectures(manifest_bytes)
    matching = [a for a in architecture_filter if a in available]
    missing = [a for a in architecture_filter if a not in available]

    if missing:
        # Missing architectures are only a warning as long as at least one matches.
        logger.warning("Architectures not in source %s: %s", src_image_tag, missing)
    if not matching:
        return SkopeoResults(
            False,
            [],
            "",
            f"No matching architectures. Requested: {architecture_filter}, Available: {available}",
        )

    filtered = filter_manifests_by_architecture(manifest_bytes, matching)
    logger.info("Mirroring %d architectures for %s: %s", len(filtered), src_image_tag, matching)

    # Step 4: Copy each architecture by digest
    # The robot credentials do not change between architectures: fetch them once.
    dest_username = mirror.internal_robot.username
    dest_password = retrieve_robot_token(mirror.internal_robot)

    all_stdout, all_stderr = [], []
    for entry in filtered:
        digest = entry.get("digest")
        arch = entry.get("platform", {}).get("architecture", "unknown")
        logger.info("Copying architecture %s (%s)", arch, digest)

        with database.CloseForLongOperation(app.config):
            result = skopeo.copy_by_digest(
                f"{src_image_base}@{digest}",
                f"{dest_image_base}@{digest}",
                timeout=mirror.skopeo_timeout,
                src_tls_verify=src_tls_verify,
                dest_tls_verify=dest_tls_verify,
                src_username=username,
                src_password=password,
                dest_username=dest_username,
                dest_password=dest_password,
                proxy=proxy,
                verbose_logs=verbose_logs,
                unsigned_images=unsigned_images,
            )
        all_stdout.append(f"[{arch}] {result.stdout}")
        all_stderr.append(f"[{arch}] {result.stderr}")
        if not result.success:
            # Stop at the first failing architecture; the manifest list is not pushed.
            logger.error("Failed to copy arch %s: %s", arch, result.stderr)
            return SkopeoResults(False, [], "\n".join(all_stdout), "\n".join(all_stderr))

    # Step 5: Push original manifest list
    media_type = get_manifest_media_type(manifest_bytes)
    if not push_sparse_manifest_list(mirror, tag, manifest_bytes, media_type):
        return SkopeoResults(
            False, [], "\n".join(all_stdout), "Failed to push sparse manifest list"
        )

    return SkopeoResults(True, [], "\n".join(all_stdout), "\n".join(all_stderr))
|
|
|
|
|
|
# TODO: better to call 'track_and_log()' https://jira.coreos.com/browse/QUAY-1821
def emit_log(mirror, log_kind, verb, message, tag=None, tags=None, stdout=None, stderr=None):
    """
    Record a mirroring action in the logs model and, for sync-level events, notify.

    :param mirror: mirror configuration; its repository identifies namespace and repo
    :param log_kind: kind of the log entry, e.g. "repo_mirror_sync_started"
    :param verb: stored under the "verb" metadata key
    :param message: stored under the "message" metadata key and sent with the notification
    :param tag: optional single tag the entry refers to
    :param tags: optional string describing several tags
    :param stdout: optional captured standard output
    :param stderr: optional captured standard error
    """
    logs_model.log_action(
        log_kind,
        namespace_name=mirror.repository.namespace_user.username,
        repository_name=mirror.repository.name,
        metadata={
            "verb": verb,
            "namespace": mirror.repository.namespace_user.username,
            "repo": mirror.repository.name,
            "message": message,
            "tag": tag,
            "tags": tags,
            "stdout": stdout,
            "stderr": stderr,
        },
    )

    # Only the sync-level events spawn a notification; per-tag events are logged only.
    if log_kind in (
        "repo_mirror_sync_started",
        "repo_mirror_sync_failed",
        "repo_mirror_sync_success",
    ):
        spawn_notification(wrap_repository(mirror.repository), log_kind, {"message": message})
|