1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/endpoints/api/tag.py
jbpratt a49ccd6333 feat(endpoints): add tag immutability API layer (PROJQUAY-10159) (#4839)
Expose tag immutability through the existing tag REST API endpoint.
This adds:
- immutable field to PUT /api/v1/repository/{repo}/tag/{tag}
- TagImmutable 409 exception for blocked operations
- immutable field in tag list responses
- Exception handling for DELETE and PUT on immutable tags

Write permission required to lock, admin required to unlock.

Signed-off-by: Brady Pratt <bpratt@redhat.com>
Co-authored-by: Claude <noreply@anthropic.com>
2026-01-16 11:09:14 -05:00

529 lines
19 KiB
Python

"""
Manage the tags of a repository.
"""
from datetime import datetime
from flask import abort, request
import features
from app import app, docker_v2_signing_key, model_cache, storage
from auth.auth_context import get_authenticated_user
from auth.permissions import AdministerRepositoryPermission
from data.model import ImmutableTagException
from data.model.oci.tag import RetargetTagException
from data.model.pull_statistics import (
get_manifest_pull_statistics,
get_tag_pull_statistics,
)
from data.registry_model import registry_model
from endpoints.api import RepositoryParamResource
from endpoints.api import abort as custom_abort
from endpoints.api import (
disallow_for_app_repositories,
disallow_for_non_normal_repositories,
disallow_for_user_namespace,
format_date,
log_action,
nickname,
parse_args,
path_param,
query_param,
require_repo_read,
require_repo_write,
resource,
show_if,
validate_json_request,
)
from endpoints.exception import InvalidRequest, NotFound, TagImmutable, Unauthorized
from util.names import TAG_ERROR, TAG_REGEX
from util.parsing import truthy_bool
def _tag_dict(tag):
tag_info = {
"name": tag.name,
"reversion": tag.reversion,
}
if features.IMMUTABLE_TAGS:
tag_info["immutable"] = tag.immutable
if tag.lifetime_start_ts and tag.lifetime_start_ts > 0:
tag_info["start_ts"] = tag.lifetime_start_ts
if tag.lifetime_end_ts and tag.lifetime_end_ts > 0:
tag_info["end_ts"] = tag.lifetime_end_ts
tag_info["manifest_digest"] = tag.manifest_digest
tag_info["is_manifest_list"] = tag.manifest.is_manifest_list
tag_info["size"] = tag.manifest_layers_size
if tag.lifetime_start_ts and tag.lifetime_start_ts > 0:
last_modified = format_date(datetime.utcfromtimestamp(tag.lifetime_start_ts))
tag_info["last_modified"] = last_modified
if tag.lifetime_end_ts is not None:
expiration = format_date(datetime.utcfromtimestamp(tag.lifetime_end_ts))
tag_info["expiration"] = expiration
return tag_info
@resource("/v1/repository/<apirepopath:repository>/tag/")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
class ListRepositoryTags(RepositoryParamResource):
"""
Resource for listing full repository tag history, alive *and dead*.
"""
@require_repo_read(allow_for_superuser=True, allow_for_global_readonly_superuser=True)
@disallow_for_app_repositories
@parse_args()
@query_param("specificTag", "Filters the tags to the specific tag.", type=str, default="")
@query_param(
"filter_tag_name",
"Syntax: <op>:<name> Filters the tag names based on the operation."
"<op> can be 'like' or 'eq'.",
type=str,
default="",
)
@query_param(
"limit", "Limit to the number of results to return per page. Max 100.", type=int, default=50
)
@query_param("page", "Page index for the results. Default 1.", type=int, default=1)
@query_param("onlyActiveTags", "Filter to only active tags.", type=truthy_bool, default=False)
@nickname("listRepoTags")
def get(self, namespace, repository, parsed_args):
specific_tag = parsed_args.get("specificTag") or None
filter_tag_name = parsed_args.get("filter_tag_name") or None
page = max(1, parsed_args.get("page", 1))
limit = min(100, max(1, parsed_args.get("limit", 50)))
active_tags_only = parsed_args.get("onlyActiveTags")
repo_ref = registry_model.lookup_repository(namespace, repository)
if repo_ref is None:
raise NotFound()
try:
history, has_more = registry_model.list_repository_tag_history(
repo_ref,
page=page,
size=limit,
specific_tag_name=specific_tag,
active_tags_only=active_tags_only,
filter_tag_name=filter_tag_name,
)
except ValueError as error:
print("error", error)
custom_abort(400, message=str(error))
return {
"tags": [_tag_dict(tag) for tag in history],
"page": page,
"has_additional": has_more,
}
@resource("/v1/repository/<apirepopath:repository>/tag/<tag>")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("tag", "The name of the tag")
class RepositoryTag(RepositoryParamResource):
"""
Resource for managing repository tags.
"""
schemas = {
"ChangeTag": {
"type": "object",
"description": "Makes changes to a specific tag",
"properties": {
"manifest_digest": {
"type": ["string", "null"],
"description": "(If specified) The manifest digest to which the tag should point",
},
"expiration": {
"type": ["number", "null"],
"description": "(If specified) The expiration for the image",
},
"immutable": {
"type": "boolean",
"description": "(If specified) Whether the tag should be immutable. Write permission required to set, admin permission required to unset.",
},
},
},
}
@require_repo_write(allow_for_superuser=True)
@disallow_for_app_repositories
@disallow_for_non_normal_repositories
@disallow_for_user_namespace
@nickname("changeTag")
@validate_json_request("ChangeTag")
def put(self, namespace, repository, tag):
"""
Change which image a tag points to or create a new tag.
"""
if not TAG_REGEX.match(tag):
abort(400, TAG_ERROR)
repo_ref = registry_model.lookup_repository(namespace, repository)
if repo_ref is None:
raise NotFound()
if "expiration" in request.get_json():
tag_ref = registry_model.get_repo_tag(repo_ref, tag)
if tag_ref is None:
raise NotFound()
expiration = request.get_json().get("expiration")
expiration_date = None
if expiration is not None:
try:
expiration_date = datetime.utcfromtimestamp(float(expiration))
except ValueError:
abort(400)
if expiration_date <= datetime.now():
abort(400)
existing_end_ts, ok = registry_model.change_repository_tag_expiration(
tag_ref, expiration_date
)
if ok:
if not (existing_end_ts is None and expiration_date is None):
log_action(
"change_tag_expiration",
namespace,
{
"username": get_authenticated_user().username,
"repo": repository,
"tag": tag,
"namespace": namespace,
"expiration_date": expiration_date,
"old_expiration_date": existing_end_ts,
},
repo_name=repository,
)
else:
raise InvalidRequest("Could not update tag expiration; Tag has probably changed")
if "immutable" in request.get_json() and features.IMMUTABLE_TAGS:
tag_ref = registry_model.get_repo_tag(repo_ref, tag)
if tag_ref is None:
raise NotFound()
immutable = request.get_json()["immutable"]
# Removing immutability requires admin permission
if not immutable and tag_ref.immutable:
if not AdministerRepositoryPermission(namespace, repository).can():
raise Unauthorized()
previous, ok = registry_model.change_tag_immutability(tag_ref, immutable)
if not ok:
raise InvalidRequest("Could not update tag immutability; Tag has probably changed")
username = get_authenticated_user().username
log_action(
"change_tag_immutability",
namespace,
{
"username": username,
"repo": repository,
"tag": tag,
"namespace": namespace,
"immutable": immutable,
"previous_immutable": previous,
},
repo_name=repository,
)
if "manifest_digest" in request.get_json():
existing_tag = registry_model.get_repo_tag(repo_ref, tag)
manifest_digest = None
manifest_digest = request.get_json()["manifest_digest"]
manifest = registry_model.lookup_manifest_by_digest(
repo_ref, manifest_digest, require_available=True
)
if manifest is None:
raise NotFound()
existing_manifest = (
registry_model.get_manifest_for_tag(existing_tag) if existing_tag else None
)
existing_manifest_digest = existing_manifest.digest if existing_manifest else None
try:
if not registry_model.retarget_tag(
repo_ref, tag, manifest, storage, docker_v2_signing_key
):
raise InvalidRequest("Could not move tag")
except ImmutableTagException as e:
raise TagImmutable(e.tag_name, e.operation) from e
except RetargetTagException:
raise InvalidRequest("Could not move tag")
username = get_authenticated_user().username
log_action(
"move_tag" if existing_tag else "create_tag",
namespace,
{
"username": username,
"repo": repository,
"tag": tag,
"namespace": namespace,
"manifest_digest": manifest_digest,
"original_manifest_digest": existing_manifest_digest,
},
repo_name=repository,
)
return "Updated", 201
@require_repo_write(allow_for_superuser=True)
@disallow_for_app_repositories
@disallow_for_non_normal_repositories
@disallow_for_user_namespace
@nickname("deleteFullTag")
def delete(self, namespace, repository, tag):
"""
Delete the specified repository tag.
"""
repo_ref = registry_model.lookup_repository(namespace, repository)
if repo_ref is None:
raise NotFound()
try:
tag_ref = registry_model.delete_tag(model_cache, repo_ref, tag)
except ImmutableTagException as e:
raise TagImmutable(e.tag_name, e.operation) from e
if tag_ref is None:
raise NotFound()
username = get_authenticated_user().username
log_action(
"delete_tag",
namespace,
{"username": username, "repo": repository, "namespace": namespace, "tag": tag},
repo_name=repository,
)
return "", 204
@resource("/v1/repository/<apirepopath:repository>/tag/<tag>/restore")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("tag", "The name of the tag")
class RestoreTag(RepositoryParamResource):
"""
Resource for restoring a repository tag back to a previous image.
"""
schemas = {
"RestoreTag": {
"type": "object",
"description": "Restores a tag to a specific image",
"properties": {
"manifest_digest": {
"type": "string",
"description": "If specified, the manifest digest that should be used",
},
},
},
}
@require_repo_write(allow_for_superuser=True)
@disallow_for_app_repositories
@disallow_for_non_normal_repositories
@disallow_for_user_namespace
@nickname("restoreTag")
@validate_json_request("RestoreTag")
def post(self, namespace, repository, tag):
"""
Restores a repository tag back to a previous image in the repository.
"""
repo_ref = registry_model.lookup_repository(namespace, repository)
if repo_ref is None:
raise NotFound()
# Restore the tag back to the previous image.
manifest_digest = request.get_json().get("manifest_digest", None)
if manifest_digest is None:
raise InvalidRequest("Missing manifest_digest")
# Data for logging the reversion/restoration.
username = get_authenticated_user().username
log_data = {
"username": username,
"repo": repository,
"tag": tag,
"manifest_digest": manifest_digest,
}
manifest = registry_model.lookup_manifest_by_digest(
repo_ref, manifest_digest, allow_dead=True, require_available=True
)
if manifest is None:
raise NotFound()
try:
if not registry_model.retarget_tag(
repo_ref,
tag,
manifest,
storage,
docker_v2_signing_key,
is_reversion=True,
):
raise InvalidRequest("Could not restore tag")
except ImmutableTagException as e:
raise TagImmutable(e.tag_name, e.operation) from e
except RetargetTagException:
raise InvalidRequest("Could not restore tag")
log_action("revert_tag", namespace, log_data, repo_name=repository)
return {}
@resource("/v1/repository/<apirepopath:repository>/tag/<tag>/expire")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("tag", "The name of the tag")
@show_if(app.config.get("PERMANENTLY_DELETE_TAGS", True))
class TagTimeMachineDelete(RepositoryParamResource):
"""
Resource for updating the expiry of tags outside the time machine window
"""
schemas = {
"ExpireTag": {
"type": "object",
"description": "Removes tag from the time machine window",
"properties": {
"manifest_digest": {
"type": "string",
"description": "Required if is_alive set to false. If specified, the manifest digest that should be used. Ignored when setting alive to true.",
},
"include_submanifests": {
"type": "boolean",
"description": "If set to true, expire the sub-manifests as well",
},
"is_alive": {
"type": "boolean",
"description": "If true, set the expiry of the matching alive tag outside the time machine window. If false set the expiry of any expired tags with the same tag and manifest outside the time machine window.",
},
},
},
}
@require_repo_write(allow_for_superuser=True)
@disallow_for_app_repositories
@nickname("removeTagFromTimemachine")
@validate_json_request("ExpireTag")
def post(self, namespace, repository, tag):
"""
Updates any expired tags with the matching name and manifest with an expiry outside the time machine window
"""
repo_ref = registry_model.lookup_repository(namespace, repository)
if repo_ref is None:
raise NotFound()
alive = request.get_json().get("is_alive", False)
manifest_digest = request.get_json().get("manifest_digest", None)
if not alive and manifest_digest is None:
raise InvalidRequest("manifest_digest required when is_alive set to false")
manifest_ref = None
if alive:
existing_tag = registry_model.get_repo_tag(repo_ref, tag)
if existing_tag is None:
raise NotFound()
manifest_ref = existing_tag.manifest
else:
manifest_ref = registry_model.lookup_manifest_by_digest(
repo_ref, manifest_digest, allow_dead=True
)
if manifest_ref is None:
raise NotFound()
include_submanifests = request.get_json().get("include_submanifests", False)
tags_updated = registry_model.remove_tag_from_timemachine(
repo_ref, tag, manifest_ref, include_submanifests, alive
)
if not tags_updated:
raise NotFound()
username = get_authenticated_user().username
log_action(
"permanently_delete_tag",
namespace,
{
"username": username,
"repo": repository,
"namespace": namespace,
"tag": tag,
"manifest_digest": manifest_ref.digest,
},
repo_name=repository,
)
return "", 200
@resource("/v1/repository/<apirepopath:repository>/tag/<tag>/pull_statistics")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("tag", "The name of the tag")
@show_if(features.IMAGE_PULL_STATS)
class RepositoryTagPullStatistics(RepositoryParamResource):
"""
Resource for retrieving pull statistics for a specific repository tag.
"""
@require_repo_read(allow_for_superuser=True, allow_for_global_readonly_superuser=True)
@disallow_for_app_repositories
@nickname("getTagPullStatistics")
def get(self, namespace, repository, tag):
"""
Get pull statistics for a specific tag.
"""
repo_ref = registry_model.lookup_repository(namespace, repository)
if repo_ref is None:
raise NotFound()
# Get the tag reference
tag_ref = registry_model.get_repo_tag(repo_ref, tag)
if tag_ref is None:
raise NotFound()
# Get pull statistics from database
tag_stats = get_tag_pull_statistics(repo_ref.id, tag)
manifest_stats = get_manifest_pull_statistics(repo_ref.id, tag_ref.manifest_digest)
# Return statistics - handle cases where tag_stats or manifest_stats may be None
# Note: manifest_stats can exist even when tag_stats doesn't (digest pulls only)
last_tag_pull = tag_stats.get("last_pull_date") if tag_stats else None
last_manifest_pull = manifest_stats.get("last_pull_date") if manifest_stats else None
return {
"tag_name": tag,
"tag_pull_count": tag_stats.get("pull_count", 0) if tag_stats else 0,
"last_tag_pull_date": format_date(last_tag_pull) if last_tag_pull else None,
"current_manifest_digest": (
tag_stats.get("current_manifest_digest", tag_ref.manifest_digest)
if tag_stats
else tag_ref.manifest_digest
),
"manifest_pull_count": manifest_stats.get("pull_count", 0) if manifest_stats else 0,
"last_manifest_pull_date": (
format_date(last_manifest_pull) if last_manifest_pull else None
),
}