1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00

feat(endpoints): add tag immutability API layer (PROJQUAY-10159) (#4839)

Expose tag immutability through the existing tag REST API endpoint.
This adds:
- immutable field to PUT /api/v1/repository/{repo}/tag/{tag}
- TagImmutable 409 exception for blocked operations
- immutable field in tag list responses
- Exception handling for DELETE and PUT on immutable tags

Write permission required to lock, admin required to unlock.

Signed-off-by: Brady Pratt <bpratt@redhat.com>
Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
jbpratt
2026-01-16 10:09:14 -06:00
committed by GitHub
parent 6cb20da31d
commit a49ccd6333
12 changed files with 423 additions and 24 deletions

View File

@@ -214,6 +214,7 @@ class Tag(
"lifetime_end_ts",
"lifetime_start_ms",
"lifetime_end_ms",
"immutable",
],
)
):
@@ -235,6 +236,7 @@ class Tag(
lifetime_start_ts=tag.lifetime_start_ms // 1000,
lifetime_end_ts=tag.lifetime_end_ms // 1000 if tag.lifetime_end_ms else None,
manifest_digest=manifest_row.digest if manifest_row else tag.manifest.digest,
immutable=tag.immutable,
inputs=dict(
legacy_id_handler=legacy_id_handler,
manifest_row=manifest_row or tag.manifest,

View File

@@ -259,6 +259,14 @@ class RegistryDataInterface(object):
previous expiration timestamp in seconds (if any), and whether the operation succeeded.
"""
@abstractmethod
def change_tag_immutability(self, tag, immutable):
"""
Sets the immutability status of the tag.
Returns a tuple of (previous_immutable_status, success).
"""
@abstractmethod
def reset_security_status(self, manifest_or_legacy_image):
"""

View File

@@ -591,6 +591,7 @@ class OCIModel(RegistryDataInterface):
manifest_id,
is_reversion=is_reversion,
expiration_seconds=expiration_seconds,
raise_on_error=True,
)
return Tag.for_tag(tag, self._legacy_image_id_handler)
@@ -637,6 +638,16 @@ class OCIModel(RegistryDataInterface):
with db_disallow_replica_use():
return oci.tag.change_tag_expiration(tag._db_id, expiration_date)
def change_tag_immutability(self, tag, immutable):
"""
Sets the immutability status of the tag.
Returns a tuple of (previous_immutable_status, success).
"""
with db_disallow_replica_use():
repo_ref = tag.repository
return oci.tag.set_tag_immutable(repo_ref.id, tag.name, immutable)
def reset_security_status(self, manifest_or_legacy_image):
"""
Resets the security status for the given manifest or legacy image, ensuring that it will get

View File

@@ -15,6 +15,7 @@ class TestTag:
lifetime_start_ms=one_hour_ago_ms,
lifetime_end_ts=one_minute_ago_ms // 1000,
lifetime_end_ms=one_minute_ago_ms,
immutable=False,
)
assert tag.expired
@@ -29,6 +30,7 @@ class TestTag:
lifetime_start_ms=one_hour_ago_ms,
lifetime_end_ts=now_ms // 1000,
lifetime_end_ms=now_ms,
immutable=False,
)
assert tag.expired
@@ -44,6 +46,7 @@ class TestTag:
lifetime_start_ms=one_hour_ago_ms,
lifetime_end_ts=one_hour_from_now_ms // 1000,
lifetime_end_ms=one_hour_from_now_ms,
immutable=False,
)
assert not tag.expired
@@ -58,5 +61,6 @@ class TestTag:
lifetime_start_ms=one_hour_ago_ms,
lifetime_end_ts=None,
lifetime_end_ms=None,
immutable=False,
)
assert not tag.expired

View File

@@ -9,6 +9,9 @@ from flask import abort, request
import features
from app import app, docker_v2_signing_key, model_cache, storage
from auth.auth_context import get_authenticated_user
from auth.permissions import AdministerRepositoryPermission
from data.model import ImmutableTagException
from data.model.oci.tag import RetargetTagException
from data.model.pull_statistics import (
get_manifest_pull_statistics,
get_tag_pull_statistics,
@@ -32,7 +35,7 @@ from endpoints.api import (
show_if,
validate_json_request,
)
from endpoints.exception import InvalidRequest, NotFound
from endpoints.exception import InvalidRequest, NotFound, TagImmutable, Unauthorized
from util.names import TAG_ERROR, TAG_REGEX
from util.parsing import truthy_bool
@@ -43,6 +46,9 @@ def _tag_dict(tag):
"reversion": tag.reversion,
}
if features.IMMUTABLE_TAGS:
tag_info["immutable"] = tag.immutable
if tag.lifetime_start_ts and tag.lifetime_start_ts > 0:
tag_info["start_ts"] = tag.lifetime_start_ts
@@ -139,6 +145,10 @@ class RepositoryTag(RepositoryParamResource):
"type": ["number", "null"],
"description": "(If specified) The expiration for the image",
},
"immutable": {
"type": "boolean",
"description": "(If specified) Whether the tag should be immutable. Write permission required to set, admin permission required to unset.",
},
},
},
}
@@ -197,6 +207,37 @@ class RepositoryTag(RepositoryParamResource):
else:
raise InvalidRequest("Could not update tag expiration; Tag has probably changed")
if "immutable" in request.get_json() and features.IMMUTABLE_TAGS:
tag_ref = registry_model.get_repo_tag(repo_ref, tag)
if tag_ref is None:
raise NotFound()
immutable = request.get_json()["immutable"]
# Removing immutability requires admin permission
if not immutable and tag_ref.immutable:
if not AdministerRepositoryPermission(namespace, repository).can():
raise Unauthorized()
previous, ok = registry_model.change_tag_immutability(tag_ref, immutable)
if not ok:
raise InvalidRequest("Could not update tag immutability; Tag has probably changed")
username = get_authenticated_user().username
log_action(
"change_tag_immutability",
namespace,
{
"username": username,
"repo": repository,
"tag": tag,
"namespace": namespace,
"immutable": immutable,
"previous_immutable": previous,
},
repo_name=repository,
)
if "manifest_digest" in request.get_json():
existing_tag = registry_model.get_repo_tag(repo_ref, tag)
@@ -215,9 +256,14 @@ class RepositoryTag(RepositoryParamResource):
)
existing_manifest_digest = existing_manifest.digest if existing_manifest else None
if not registry_model.retarget_tag(
repo_ref, tag, manifest, storage, docker_v2_signing_key
):
try:
if not registry_model.retarget_tag(
repo_ref, tag, manifest, storage, docker_v2_signing_key
):
raise InvalidRequest("Could not move tag")
except ImmutableTagException as e:
raise TagImmutable(e.tag_name, e.operation) from e
except RetargetTagException:
raise InvalidRequest("Could not move tag")
username = get_authenticated_user().username
@@ -252,7 +298,11 @@ class RepositoryTag(RepositoryParamResource):
if repo_ref is None:
raise NotFound()
tag_ref = registry_model.delete_tag(model_cache, repo_ref, tag)
try:
tag_ref = registry_model.delete_tag(model_cache, repo_ref, tag)
except ImmutableTagException as e:
raise TagImmutable(e.tag_name, e.operation) from e
if tag_ref is None:
raise NotFound()
@@ -324,14 +374,19 @@ class RestoreTag(RepositoryParamResource):
if manifest is None:
raise NotFound()
if not registry_model.retarget_tag(
repo_ref,
tag,
manifest,
storage,
docker_v2_signing_key,
is_reversion=True,
):
try:
if not registry_model.retarget_tag(
repo_ref,
tag,
manifest,
storage,
docker_v2_signing_key,
is_reversion=True,
):
raise InvalidRequest("Could not restore tag")
except ImmutableTagException as e:
raise TagImmutable(e.tag_name, e.operation) from e
except RetargetTagException:
raise InvalidRequest("Could not restore tag")
log_action("revert_tag", namespace, log_data, repo_name=repository)

View File

@@ -2,10 +2,11 @@ import pytest
from playhouse.test_utils import assert_query_count
from data.database import Manifest
from data.model.oci.tag import set_tag_immutable
from data.registry_model import registry_model
from endpoints.api.tag import ListRepositoryTags, RepositoryTag, RestoreTag
from endpoints.api.test.shared import conduct_api_call
from endpoints.test.shared import client_with_identity
from endpoints.test.shared import client_with_identity, toggle_feature
from test.fixtures import *
@@ -136,3 +137,187 @@ def test_list_repo_tags_filter(repo_namespace, repo_name, query_count, app):
with client_with_identity("devtable", app) as cl:
params["filter_tag_name"] = "random"
resp = conduct_api_call(cl, ListRepositoryTags, "get", params, None, expected_code=400)
# Tag Immutability Tests
def test_set_tag_immutable_with_write_permission(app):
"""Test setting tag immutable with write permission via RepositoryTag PUT."""
with client_with_identity("devtable", app) as cl:
params = {
"repository": "devtable/simple",
"tag": "latest",
}
request_body = {"immutable": True}
conduct_api_call(cl, RepositoryTag, "put", params, request_body, 201)
# Verify it's now immutable via data model
repo_ref = registry_model.lookup_repository("devtable", "simple")
tag_ref = registry_model.get_repo_tag(repo_ref, "latest")
assert tag_ref.immutable is True
def test_remove_immutability_requires_admin(app):
"""Test that removing immutability requires admin permission."""
repo_ref = registry_model.lookup_repository("devtable", "simple")
# First make the tag immutable via data layer
set_tag_immutable(repo_ref.id, "latest", True)
# devtable is admin on their own repo, so they can remove it
with client_with_identity("devtable", app) as cl:
params = {
"repository": "devtable/simple",
"tag": "latest",
}
request_body = {"immutable": False}
conduct_api_call(cl, RepositoryTag, "put", params, request_body, 201)
# Verify it's now not immutable
tag_ref = registry_model.get_repo_tag(repo_ref, "latest")
assert tag_ref.immutable is False
def test_remove_immutability_denied_for_non_admin(app):
"""Test that users with write but not admin permission cannot remove immutability."""
# Use devtable/shared where 'public' user has write permission but not admin
repo_ref = registry_model.lookup_repository("devtable", "shared")
# Make the tag immutable via data layer
set_tag_immutable(repo_ref.id, "latest", True)
# 'public' user has write permission on devtable/shared but is not admin
# This tests the AdministerRepositoryPermission check, not @require_repo_write
with client_with_identity("public", app) as cl:
params = {
"repository": "devtable/shared",
"tag": "latest",
}
request_body = {"immutable": False}
# User with write but not admin should get 403 from the admin permission check
conduct_api_call(cl, RepositoryTag, "put", params, request_body, 403)
# Verify tag is still immutable
resp_check = registry_model.get_repo_tag(repo_ref, "latest")
assert resp_check.immutable is True
def test_list_repo_tags_includes_immutable(app):
"""Test that tag list includes immutable field."""
with toggle_feature("IMMUTABLE_TAGS", True):
with client_with_identity("devtable", app) as cl:
params = {"repository": "devtable/simple"}
tags = conduct_api_call(cl, ListRepositoryTags, "get", params).json["tags"]
for tag in tags:
assert "immutable" in tag
assert isinstance(tag["immutable"], bool)
def test_delete_immutable_tag_returns_409(app):
"""Test DELETE on immutable tag returns 409."""
with toggle_feature("IMMUTABLE_TAGS", True):
repo_ref = registry_model.lookup_repository("devtable", "simple")
# Make the tag immutable
set_tag_immutable(repo_ref.id, "latest", True)
with client_with_identity("devtable", app) as cl:
params = {
"repository": "devtable/simple",
"tag": "latest",
}
resp = conduct_api_call(cl, RepositoryTag, "delete", params, None, 409)
assert resp.json["error_type"] == "tag_immutable"
assert resp.json["title"] == "tag_immutable"
def test_retarget_immutable_tag_returns_409(app):
"""Test PUT (retarget) on immutable tag returns 409."""
with toggle_feature("IMMUTABLE_TAGS", True):
repo_ref = registry_model.lookup_repository("devtable", "simple")
tag_ref = registry_model.get_repo_tag(repo_ref, "latest")
# Make the tag immutable
set_tag_immutable(repo_ref.id, "latest", True)
with client_with_identity("devtable", app) as cl:
params = {
"repository": "devtable/simple",
"tag": "latest",
}
request_body = {"manifest_digest": tag_ref.manifest.digest}
resp = conduct_api_call(cl, RepositoryTag, "put", params, request_body, 409)
assert resp.json["error_type"] == "tag_immutable"
assert resp.json["title"] == "tag_immutable"
def test_restore_immutable_tag_returns_409(app):
"""Test restoring immutable tag returns 409."""
with toggle_feature("IMMUTABLE_TAGS", True):
repo_ref = registry_model.lookup_repository("devtable", "simple")
tag_ref = registry_model.get_repo_tag(repo_ref, "latest")
# Make the tag immutable
set_tag_immutable(repo_ref.id, "latest", True)
with client_with_identity("devtable", app) as cl:
params = {
"repository": "devtable/simple",
"tag": "latest",
}
request_body = {"manifest_digest": tag_ref.manifest.digest}
resp = conduct_api_call(cl, RestoreTag, "post", params, request_body, 409)
assert resp.json["error_type"] == "tag_immutable"
assert resp.json["title"] == "tag_immutable"
def test_set_immutability_not_found(app):
"""Test 404 for setting immutability on non-existent tag."""
with client_with_identity("devtable", app) as cl:
params = {
"repository": "devtable/simple",
"tag": "nonexistent",
}
request_body = {"immutable": True}
conduct_api_call(cl, RepositoryTag, "put", params, request_body, 404)
def test_set_immutability_idempotent(app):
"""Test setting same immutability status is idempotent."""
repo_ref = registry_model.lookup_repository("devtable", "simple")
with client_with_identity("devtable", app) as cl:
params = {
"repository": "devtable/simple",
"tag": "latest",
}
# Set to immutable
request_body = {"immutable": True}
conduct_api_call(cl, RepositoryTag, "put", params, request_body, 201)
# Verify it's immutable
tag_ref = registry_model.get_repo_tag(repo_ref, "latest")
assert tag_ref.immutable is True
# Set to immutable again - should be idempotent
conduct_api_call(cl, RepositoryTag, "put", params, request_body, 201)
# Still immutable
tag_ref = registry_model.get_repo_tag(repo_ref, "latest")
assert tag_ref.immutable is True

View File

@@ -17,6 +17,7 @@ class ApiErrorType(Enum):
exceeds_license = "exceeds_license"
not_found = "not_found"
downstream_issue = "downstream_issue"
tag_immutable = "tag_immutable"
ERROR_DESCRIPTION = {
@@ -30,6 +31,7 @@ ERROR_DESCRIPTION = {
ApiErrorType.exceeds_license.value: "The action was refused because the current license does not allow it.",
ApiErrorType.not_found.value: "The resource was not found.",
ApiErrorType.downstream_issue.value: "An error occurred in a downstream service.",
ApiErrorType.tag_immutable.value: "The tag is immutable and cannot be modified or deleted.",
}
@@ -141,3 +143,9 @@ class NotFound(ApiException):
class DownstreamIssue(ApiException):
def __init__(self, error_description, payload=None):
ApiException.__init__(self, ApiErrorType.downstream_issue, 520, error_description, payload)
class TagImmutable(ApiException):
def __init__(self, tag_name, operation, payload=None):
error_description = f"Cannot {operation} immutable tag '{tag_name}'"
ApiException.__init__(self, ApiErrorType.tag_immutable, 409, error_description, payload)

View File

@@ -6,6 +6,8 @@ from flask import abort, jsonify, make_response, request, session
from app import docker_v2_signing_key, model_cache, storage
from auth.decorators import process_auth
from auth.permissions import ModifyRepositoryPermission, ReadRepositoryPermission
from data.model import ImmutableTagException
from data.model.oci.tag import RetargetTagException
from data.registry_model import registry_model
from data.registry_model.manifestbuilder import lookup_manifest_builder
from endpoints.decorators import (
@@ -102,12 +104,17 @@ def put_tag(namespace_name, repo_name, tag):
if legacy_image is None:
abort(400)
if (
registry_model.retarget_tag(
repository_ref, tag, legacy_image, storage, docker_v2_signing_key
)
is None
):
try:
if (
registry_model.retarget_tag(
repository_ref, tag, legacy_image, storage, docker_v2_signing_key
)
is None
):
abort(400)
except ImmutableTagException:
abort(409, f"Tag '{tag}' is immutable and cannot be overwritten")
except RetargetTagException:
abort(400)
return make_response("Created", 200)
@@ -129,8 +136,11 @@ def delete_tag(namespace_name, repo_name, tag):
)
if permission.can() and repository_ref is not None:
if not registry_model.delete_tag(model_cache, repository_ref, tag):
abort(404)
try:
if not registry_model.delete_tag(model_cache, repository_ref, tag):
abort(404)
except ImmutableTagException:
abort(409, f"Tag '{tag}' is immutable and cannot be deleted")
track_and_log("delete_tag", repository_ref, tag=tag)
return make_response("Deleted", 200)

View File

@@ -121,6 +121,13 @@ class TagAlreadyExists(V2RegistryException):
)
class TagImmutable(V2RegistryException):
def __init__(self, detail=None):
super(TagImmutable, self).__init__(
"TAG_IMMUTABLE", "tag is immutable and cannot be overwritten", detail, 409
)
class TagInvalid(V2RegistryException):
def __init__(self, detail=None):
super(TagInvalid, self).__init__("TAG_INVALID", "manifest tag did not match URI", detail)

View File

@@ -8,6 +8,7 @@ from app import app, model_cache, pullmetrics, storage
from auth.registry_jwt_auth import process_registry_jwt_auth
from data.database import db_disallow_replica_use
from data.model import (
ImmutableTagException,
ManifestDoesNotExist,
QuotaExceededException,
RepositoryDoesNotExist,
@@ -40,6 +41,7 @@ from endpoints.v2.errors import (
NameUnknown,
QuotaExceeded,
TagExpired,
TagImmutable,
)
from image.docker.schema1 import (
DOCKER_SCHEMA1_CONTENT_TYPES,
@@ -458,7 +460,11 @@ def delete_manifest_by_tag(namespace_name, repo_name, manifest_ref):
if tag is None:
raise ManifestUnknown()
deleted_tag = registry_model.delete_tag(model_cache, repository_ref, manifest_ref)
try:
deleted_tag = registry_model.delete_tag(model_cache, repository_ref, manifest_ref)
except ImmutableTagException as ite:
raise TagImmutable(detail={"message": f"tag '{ite.tag_name}' is immutable"}) from ite
if not deleted_tag:
raise ManifestUnknown()
@@ -520,6 +526,8 @@ def _write_manifest(
raise ManifestInvalid(detail={"message": str(cme)})
except RetargetTagException as rte:
raise ManifestInvalid(detail={"message": str(rte)})
except ImmutableTagException as ite:
raise TagImmutable(detail={"message": f"tag '{ite.tag_name}' is immutable"}) from ite
except QuotaExceededException as qee:
raise QuotaExceeded()

View File

@@ -10,8 +10,9 @@ from app import app as realapp
from app import instance_keys
from auth.auth_context_type import ValidatedAuthContext
from data import model
from data.model.oci.tag import set_tag_immutable
from data.registry_model import registry_model
from endpoints.test.shared import conduct_call
from endpoints.test.shared import conduct_call, toggle_feature
from image.docker.schema2.test.test_config import (
CONFIG_BYTES,
CONFIG_DIGEST,
@@ -342,3 +343,100 @@ def test_fetch_manifest_by_tagname_tracks_pull_metrics(client, app):
call_args = mock_event.track_tag_pull.call_args
# Args: repository_ref, tag_name, manifest_digest
assert call_args[0][1] == "latest" # tag_name
def test_delete_manifest_by_tag_immutable_returns_409(client, app):
"""Test that DELETE on an immutable tag returns 409 with TAG_IMMUTABLE error."""
with toggle_feature("IMMUTABLE_TAGS", True):
repo_ref = registry_model.lookup_repository("devtable", "simple")
# Make the tag immutable
set_tag_immutable(repo_ref.id, "latest", True)
params = {
"repository": "devtable/simple",
"manifest_ref": "latest",
}
user = model.user.get_user("devtable")
access = [
{
"type": "repository",
"name": "devtable/simple",
"actions": ["pull", "push"],
}
]
context, subject = build_context_and_subject(ValidatedAuthContext(user=user))
token = generate_bearer_token(
realapp.config["SERVER_HOSTNAME"], subject, context, access, 600, instance_keys
)
headers = {
"Authorization": "Bearer %s" % token,
}
rv = conduct_call(
client,
"v2.delete_manifest_by_tag",
url_for,
"DELETE",
params,
expected_code=409,
headers=headers,
)
# Verify TAG_IMMUTABLE error in response
response_data = json.loads(rv.data)
assert "errors" in response_data
assert response_data["errors"][0]["code"] == "TAG_IMMUTABLE"
def test_write_manifest_by_tagname_immutable_returns_409(client, app):
"""Test that PUT manifest on an immutable tag returns 409 with TAG_IMMUTABLE error."""
with toggle_feature("IMMUTABLE_TAGS", True):
repo_ref = registry_model.lookup_repository("devtable", "simple")
tag = registry_model.get_repo_tag(repo_ref, "latest")
manifest = registry_model.get_manifest_for_tag(tag)
# Make the tag immutable
set_tag_immutable(repo_ref.id, "latest", True)
params = {
"repository": "devtable/simple",
"manifest_ref": "latest",
}
user = model.user.get_user("devtable")
access = [
{
"type": "repository",
"name": "devtable/simple",
"actions": ["pull", "push"],
}
]
context, subject = build_context_and_subject(ValidatedAuthContext(user=user))
token = generate_bearer_token(
realapp.config["SERVER_HOSTNAME"], subject, context, access, 600, instance_keys
)
headers = {
"Authorization": "Bearer %s" % token,
}
rv = conduct_call(
client,
"v2.write_manifest_by_tagname",
url_for,
"PUT",
params,
expected_code=409,
headers=headers,
raw_body=manifest.internal_manifest_bytes.as_encoded_str(),
)
# Verify TAG_IMMUTABLE error in response
response_data = json.loads(rv.data)
assert "errors" in response_data
assert response_data["errors"][0]["code"] == "TAG_IMMUTABLE"

View File

@@ -140,6 +140,9 @@ NONSUPERUSER_TEAM_SYNCING_SETUP: FeatureNameValue
# Feature Flag: Whether users can view and change their tag expiration.
CHANGE_TAG_EXPIRATION: FeatureNameValue
# Feature Flag: Whether tag immutability is enabled.
IMMUTABLE_TAGS: FeatureNameValue
# Feature Flag: If enabled, users can create and use app specific tokens to login via the CLI.
APP_SPECIFIC_TOKENS: FeatureNameValue