Mirror of https://github.com/quay/quay.git (synced 2026-01-26 06:21:37 +03:00)

Merge pull request #119 from josephschorr/joseph.schorr/PROJQUAY-124/async-repo-delete

Change repository deletion to occur in the background
This commit is contained in:
Joseph Schorr
2020-01-28 13:21:47 -05:00
committed by GitHub
23 changed files with 228 additions and 34 deletions

3
app.py
View File

@@ -288,6 +288,8 @@ export_action_logs_queue = WorkQueue(
app.config["EXPORT_ACTION_LOGS_QUEUE_NAME"], tf, has_namespace=True
)
repository_gc_queue = WorkQueue(app.config["REPOSITORY_GC_QUEUE_NAME"], tf, has_namespace=True)
# Note: We set `has_namespace` to `False` here, as we explicitly want this queue to not be emptied
# when a namespace is marked for deletion.
namespace_gc_queue = WorkQueue(app.config["NAMESPACE_GC_QUEUE_NAME"], tf, has_namespace=False)
@@ -298,6 +300,7 @@ all_queues = [
notification_queue,
secscan_notification_queue,
chunk_cleanup_queue,
repository_gc_queue,
namespace_gc_queue,
]

View File

@@ -26,6 +26,7 @@ def default_services():
"labelbackfillworker": {"autostart": "true"},
"logrotateworker": {"autostart": "true"},
"namespacegcworker": {"autostart": "true"},
"repositorygcworker": {"autostart": "true"},
"notificationworker": {"autostart": "true"},
"queuecleanupworker": {"autostart": "true"},
"repositoryactioncounter": {"autostart": "true"},

View File

@@ -104,6 +104,14 @@ autostart = {{ config['logrotateworker']['autostart'] }}
stdout_events_enabled = true
stderr_events_enabled = true
[program:repositorygcworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s
command=python -m workers.repositorygcworker
autostart = {{ config['repositorygcworker']['autostart'] }}
stdout_events_enabled = true
stderr_events_enabled = true
[program:namespacegcworker]
environment=
PYTHONPATH=%(ENV_QUAYDIR)s

View File

@@ -261,6 +261,7 @@ class DefaultConfig(ImmutableConfig):
SECSCAN_NOTIFICATION_QUEUE_NAME = "security_notification"
CHUNK_CLEANUP_QUEUE_NAME = "chunk_cleanup"
NAMESPACE_GC_QUEUE_NAME = "namespacegc"
REPOSITORY_GC_QUEUE_NAME = "repositorygc"
EXPORT_ACTION_LOGS_QUEUE_NAME = "exportactionlogs"
# Super user config. Note: This MUST BE an empty list for the default config.

View File

@@ -766,11 +766,14 @@ class RepositoryState(IntEnum):
NORMAL: Regular repo where all actions are possible
READ_ONLY: Only read actions, such as pull, are allowed regardless of specific user permissions
MIRROR: Equivalent to READ_ONLY except that mirror robot has write permission
MARKED_FOR_DELETION: Indicates the repository has been marked for deletion and should be hidden
and un-usable.
"""
NORMAL = 0
READ_ONLY = 1
MIRROR = 2
MARKED_FOR_DELETION = 3
class Repository(BaseModel):
@@ -791,10 +794,12 @@ class Repository(BaseModel):
(("namespace_user", "name"), True),
)
def delete_instance(self, recursive=False, delete_nullable=False):
def delete_instance(self, recursive=False, delete_nullable=False, force=False):
if not recursive:
raise RuntimeError("Non-recursive delete on repository.")
assert force or self.state == RepositoryState.MARKED_FOR_DELETION
# These models don't need to use transitive deletes, because the referenced objects
# are cleaned up directly
skip_transitive_deletes = (
@@ -811,6 +816,7 @@ class Repository(BaseModel):
RepositorySearchScore,
RepoMirrorConfig,
RepoMirrorRule,
DeletedRepository,
}
| appr_classes
| v22_classes
@@ -826,6 +832,13 @@ class RepositorySearchScore(BaseModel):
last_updated = DateTimeField(null=True)
class DeletedRepository(BaseModel):
repository = ForeignKeyField(Repository, unique=True)
marked = DateTimeField(default=datetime.now)
original_name = CharField(index=True)
queue_id = CharField(null=True, index=True)
class Star(BaseModel):
user = ForeignKeyField(User)
repository = ForeignKeyField(Repository)

View File

@@ -0,0 +1,51 @@
"""Add new DeletedRepository tracking table
Revision ID: 4fd6b8463eb2
Revises: 34c8ef052ec9
Create Date: 2019-12-22 14:58:34.375692
"""
# revision identifiers, used by Alembic.
revision = "4fd6b8463eb2"
down_revision = "34c8ef052ec9"
from alembic import op as original_op
from data.migrations.progress import ProgressWrapper
import sqlalchemy as sa
def upgrade(tables, tester, progress_reporter):
    """Create the `deletedrepository` tracking table and its indexes."""
    wrapped = ProgressWrapper(original_op, progress_reporter)
    # ### commands auto generated by Alembic - please adjust! ###
    wrapped.create_table(
        "deletedrepository",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("repository_id", sa.Integer(), nullable=False),
        sa.Column("marked", sa.DateTime(), nullable=False),
        sa.Column("original_name", sa.String(length=255), nullable=False),
        sa.Column("queue_id", sa.String(length=255), nullable=True),
        sa.ForeignKeyConstraint(
            ["repository_id"],
            ["repository.id"],
            name=wrapped.f("fk_deletedrepository_repository_id_repository"),
        ),
        sa.PrimaryKeyConstraint("id", name=wrapped.f("pk_deletedrepository")),
    )

    # The repository index is unique: at most one deletion marker per repository.
    for index_name, index_columns, is_unique in (
        ("deletedrepository_original_name", ["original_name"], False),
        ("deletedrepository_queue_id", ["queue_id"], False),
        ("deletedrepository_repository_id", ["repository_id"], True),
    ):
        wrapped.create_index(index_name, "deletedrepository", index_columns, unique=is_unique)
    # ### end Alembic commands ###
def downgrade(tables, tester, progress_reporter):
    """Drop the `deletedrepository` table, reversing the upgrade."""
    wrapped = ProgressWrapper(original_op, progress_reporter)
    # ### commands auto generated by Alembic - please adjust! ###
    wrapped.drop_table("deletedrepository")
    # ### end Alembic commands ###

View File

@@ -9,6 +9,7 @@ from data.model import DataModelException, config
from data.readreplica import ReadOnlyModeException
from data.database import (
Repository,
RepositoryState,
User,
Team,
TeamMember,
@@ -54,6 +55,7 @@ def get_existing_repository(namespace_name, repository_name, for_update=False, k
Repository.select(Repository, Namespace)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(Namespace.username == namespace_name, Repository.name == repository_name)
.where(Repository.state != RepositoryState.MARKED_FOR_DELETION)
)
if kind_filter:

View File

@@ -7,12 +7,14 @@ from data.database import ApprTag
from data.database import (
Tag,
Manifest,
DeletedRepository,
ManifestBlob,
ManifestChild,
ManifestLegacyImage,
ManifestLabel,
Label,
TagManifestLabel,
RepositoryState,
)
from data.database import RepositoryTag, TagManifest, Image, DerivedStorageForImage
from data.database import TagManifestToManifest, TagToRepositoryTag, TagManifestLabelMap
@@ -53,18 +55,13 @@ class _GarbageCollectorContext(object):
self.blob_ids.remove(blob_id)
def purge_repository(namespace_name, repository_name):
def purge_repository(repo, force=False):
""" Completely delete all traces of the repository. Will return True upon
complete success, and False upon partial or total failure. Garbage
collection is incremental and repeatable, so this return value does
not need to be checked or responded to.
"""
try:
repo = _basequery.get_existing_repository(namespace_name, repository_name)
except Repository.DoesNotExist:
return False
assert repo.name == repository_name
assert repo.state == RepositoryState.MARKED_FOR_DELETION or force
# Delete the repository of all Appr-referenced entries.
# Note that new-model Tag's must be deleted in *two* passes, as they can reference parent tags,
@@ -84,19 +81,17 @@ def purge_repository(namespace_name, repository_name):
assert ManifestBlob.select().where(ManifestBlob.repository == repo).count() == 0
assert Image.select().where(Image.repository == repo).count() == 0
# Delete any marker rows for the repository.
DeletedRepository.delete().where(DeletedRepository.repository == repo).execute()
# Delete the rest of the repository metadata.
try:
# Make sure the repository still exists.
fetched = _basequery.get_existing_repository(namespace_name, repository_name)
fetched = Repository.get(id=repo.id)
except Repository.DoesNotExist:
return False
fetched.delete_instance(recursive=True, delete_nullable=False)
# Run callbacks
for callback in config.repo_cleanup_callbacks:
callback(namespace_name, repository_name)
fetched.delete_instance(recursive=True, delete_nullable=False, force=force)
return True

View File

@@ -1,5 +1,7 @@
import logging
import random
import json
import uuid
from enum import Enum
from datetime import timedelta, datetime
@@ -17,6 +19,8 @@ from data.model import (
)
from data.database import (
Repository,
RepositoryState,
DeletedRepository,
Namespace,
RepositoryTag,
Star,
@@ -210,6 +214,7 @@ def get_user_starred_repositories(user, kind_filter="image"):
.switch(Repository)
.join(Visibility)
.where(Star.user == user, Repository.kind == repo_kind)
.where(Repository.state != RepositoryState.MARKED_FOR_DELETION)
)
return query
@@ -266,6 +271,7 @@ def get_visible_repositories(
)
.switch(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(Repository.state != RepositoryState.MARKED_FOR_DELETION)
)
user_id = None
@@ -419,6 +425,7 @@ def _get_sorted_matching_repositories(
query = (
Repository.select(*select_fields)
.join(RepositorySearchScore)
.where(Repository.state != RepositoryState.MARKED_FOR_DELETION)
.order_by(RepositorySearchScore.score.desc())
)
else:
@@ -443,6 +450,7 @@ def _get_sorted_matching_repositories(
Repository.select(*select_fields)
.join(RepositorySearchScore)
.where(clause)
.where(Repository.state != RepositoryState.MARKED_FOR_DELETION)
.order_by(SQL("score").desc())
)
@@ -483,6 +491,7 @@ def repository_is_public(namespace_name, repository_name):
Repository.name == repository_name,
Visibility.name == "public",
)
.where(Repository.state != RepositoryState.MARKED_FOR_DELETION)
.get()
)
return True
@@ -510,6 +519,7 @@ def get_email_authorized_for_repo(namespace, repository, email):
Repository.name == repository,
RepositoryAuthorizedEmail.email == email,
)
.where(Repository.state != RepositoryState.MARKED_FOR_DELETION)
.get()
)
except RepositoryAuthorizedEmail.DoesNotExist:
@@ -532,6 +542,7 @@ def confirm_email_authorization_for_repo(code):
.join(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(RepositoryAuthorizedEmail.code == code)
.where(Repository.state != RepositoryState.MARKED_FOR_DELETION)
.get()
)
except RepositoryAuthorizedEmail.DoesNotExist:
@@ -566,3 +577,37 @@ def get_repository_state(namespace_name, repository_name):
def set_repository_state(repo, state):
repo.state = state
repo.save()
def mark_repository_for_deletion(namespace_name, repository_name, repository_gc_queue):
    """ Marks a repository for future deletion in the background. The repository will be
        renamed and hidden, and then deleted later by a worker.
    """
    target_repo = get_repository(namespace_name, repository_name)
    if not target_repo:
        return None

    with db_transaction():
        # Remove all stars pointing at the repository.
        Star.delete().where(Star.repository == target_repo).execute()

        # Hide the repository: rename it to a random UUID and flip its state so
        # normal lookups (which filter MARKED_FOR_DELETION) no longer find it.
        target_repo.name = str(uuid.uuid4())
        target_repo.state = RepositoryState.MARKED_FOR_DELETION
        target_repo.save()

        # Record a tracking row, then enqueue the actual purge for the GC worker.
        deletion_marker = DeletedRepository.create(
            repository=target_repo, original_name=repository_name
        )
        deletion_marker.queue_id = repository_gc_queue.put(
            [namespace_name, str(target_repo.id)],
            json.dumps({"marker_id": deletion_marker.id, "original_name": repository_name,}),
        )
        deletion_marker.save()

    # Invoke any registered cleanup callbacks with the repository's original identity.
    for callback in config.repo_cleanup_callbacks:
        callback(namespace_name, repository_name)

    return deletion_marker.id

View File

@@ -282,7 +282,7 @@ def test_has_garbage(default_tag_policy, initialized_db):
"""
# Delete all existing repos.
for repo in database.Repository.select().order_by(database.Repository.id):
assert model.gc.purge_repository(repo.namespace_user.username, repo.name)
assert model.gc.purge_repository(repo, force=True)
# Change the time machine expiration on the namespace.
(
@@ -732,14 +732,6 @@ def test_images_shared_cas_with_new_blob_table(default_tag_policy, initialized_d
assert storage.exists({preferred}, storage.blob_path(digest))
def test_purge_repo(app):
""" Test that app registers delete_metadata function on repository deletions """
with assert_gc_integrity():
with patch("app.tuf_metadata_api") as mock_tuf:
model.gc.purge_repository("ns", "repo")
assert mock_tuf.delete_metadata.called_with("ns", "repo")
def test_super_long_image_chain_gc(app, default_tag_policy):
""" Test that a super long chain of images all gets properly GCed. """
with assert_gc_integrity():

View File

@@ -16,6 +16,7 @@ from data.database import (
TeamMember,
Team,
Repository,
RepositoryState,
TupleSelector,
TeamRole,
Namespace,
@@ -1042,6 +1043,7 @@ def get_private_repo_count(username):
.switch(Repository)
.join(Namespace, on=(Repository.namespace_user == Namespace.id))
.where(Namespace.username == username, Visibility.name == "private")
.where(Repository.state != RepositoryState.MARKED_FOR_DELETION)
.count()
)
@@ -1190,7 +1192,7 @@ def delete_user(user, queues):
# Delete any repositories under the user's namespace.
for repo in list(Repository.select().where(Repository.namespace_user == user)):
gc.purge_repository(user.username, repo.name)
gc.purge_repository(repo, force=True)
# Delete non-repository related items.
_delete_user_linked_data(user)

View File

@@ -9,7 +9,7 @@ from datetime import timedelta, datetime
from flask import request, abort
from app import dockerfile_build_queue, tuf_metadata_api
from app import dockerfile_build_queue, tuf_metadata_api, repository_gc_queue
from data.database import RepositoryState
from endpoints.api import (
format_date,
@@ -311,7 +311,7 @@ class Repository(RepositoryParamResource):
@nickname("deleteRepository")
def delete(self, namespace, repository):
""" Delete a repository. """
username = model.purge_repository(namespace, repository)
username = model.mark_repository_for_deletion(namespace, repository, repository_gc_queue)
if features.BILLING:
plan = get_namespace_plan(namespace)

View File

@@ -298,9 +298,9 @@ class RepositoryDataInterface(object):
"""
@abstractmethod
def purge_repository(self, namespace_name, repository_name):
def mark_repository_for_deletion(self, namespace_name, repository_name, repository_gc_queue):
"""
Removes a repository
Marks a repository for deletion.
"""
@abstractmethod

View File

@@ -51,8 +51,10 @@ class PreOCIModel(RepositoryDataInterface):
else:
model.notification.delete_notifications_by_kind(user_or_org, "over_private_usage")
def purge_repository(self, namespace_name, repository_name):
model.gc.purge_repository(namespace_name, repository_name)
def mark_repository_for_deletion(self, namespace_name, repository_name, repository_gc_queue):
model.repository.mark_repository_for_deletion(
namespace_name, repository_name, repository_gc_queue
)
user = model.user.get_namespace_user(namespace_name)
return user.username
@@ -89,6 +91,7 @@ class PreOCIModel(RepositoryDataInterface):
if starred:
# Return the full list of repos starred by the current user that are still visible to them.
def can_view_repo(repo):
assert repo.state != RepositoryState.MARKED_FOR_DELETION
can_view = ReadRepositoryPermission(repo.namespace_user.username, repo.name).can()
return can_view or model.repository.is_repository_public(repo)

View File

@@ -61,4 +61,4 @@ def test_disallowed_for_nonnormal(state, resource, method, params, client):
params["repository"] = "%s/%s" % (namespace, repository)
with client_with_identity("devtable", client) as cl:
conduct_api_call(cl, resource, method, params, None, 503)
conduct_api_call(cl, resource, method, params, {}, 503)

View File

@@ -188,3 +188,26 @@ def test_get_repo_state_can_write(state, can_write, client, initialized_db):
params = {"repository": "devtable/simple"}
response = conduct_api_call(cl, Repository, "GET", params).json
assert response["can_write"] == can_write
def test_delete_repo(client, initialized_db):
    """ Deleting a repository hides it immediately and enqueues a background purge. """
    with client_with_identity("devtable", client) as cl:
        resp = conduct_api_call(cl, RepositoryList, "GET", {"namespace": "devtable"}).json
        repos = {repo["name"] for repo in resp["repositories"]}
        assert "simple" in repos

        # Delete the repository.
        params = {"repository": "devtable/simple"}
        conduct_api_call(cl, Repository, "DELETE", params, expected_code=204)

        # Ensure it isn't visible anymore.
        conduct_api_call(cl, Repository, "GET", params, expected_code=404)

        resp = conduct_api_call(cl, RepositoryList, "GET", {"namespace": "devtable"}).json
        repos = {repo["name"] for repo in resp["repositories"]}
        assert "simple" not in repos

        # Check that the repository is enqueued for deletion.
        marker = database.DeletedRepository.get()
        assert marker.original_name == "simple"
        assert marker.queue_id

View File

@@ -514,7 +514,9 @@ class BuildTriggerSources(RepositoryParamResource):
@validate_json_request("BuildTriggerSourcesRequest")
def post(self, namespace_name, repo_name, trigger_uuid):
""" List the build sources for the trigger configuration thus far. """
namespace = request.get_json()["namespace"]
namespace = request.get_json().get("namespace")
if namespace is None:
raise InvalidRequest()
trigger = get_trigger(trigger_uuid)

View File

@@ -230,6 +230,7 @@ def check_repository_state(f):
NORMAL -> Pass
READ_ONLY -> Block all POST/PUT/DELETE
MIRROR -> Same as READ_ONLY, except treat the Mirroring Robot User as Normal
MARKED_FOR_DELETION -> Block everything as a 404
"""
user = get_authenticated_user()
if user is None:
@@ -240,6 +241,9 @@ def check_repository_state(f):
if not repository:
return f(namespace_name, repo_name, *args, **kwargs)
if repository.state == RepositoryState.MARKED_FOR_DELETION:
abort(404)
if repository.state == RepositoryState.READ_ONLY:
abort(405, "%s/%s is in read-only mode." % (namespace_name, repo_name))

View File

@@ -203,6 +203,10 @@ def _authorize_or_downscope_request(scope_param, has_valid_auth_context):
"This repository is for managing %s " + "and not container images."
) % repository_ref.kind
# Ensure the repository is not marked for deletion.
if repository_ref is not None and repository_ref.state == RepositoryState.MARKED_FOR_DELETION:
raise Unknown(message="Unknown repository")
if "push" in requested_actions:
# Check if there is a valid user or token, as otherwise the repository cannot be
# accessed.

View File

@@ -41,6 +41,7 @@ from data.database import (
User,
DisableReason,
DeletedNamespace,
DeletedRepository,
appr_classes,
ApprTagKind,
ApprBlobPlacementLocation,
@@ -1303,6 +1304,7 @@ def populate_database(minimal=False, with_storage=False):
WHITELISTED_EMPTY_MODELS = [
"DeletedNamespace",
"DeletedRepository",
"LogEntry",
"LogEntry2",
"ManifestChild",

View File

@@ -2102,7 +2102,7 @@ class TestListRepos(ApiTestCase):
for repo in list(
RepositoryTable.select().where(RepositoryTable.namespace_user == public_user)
):
model.gc.purge_repository(public_user.username, repo.name)
model.gc.purge_repository(repo, force=True)
# Add public repos until we have enough for a few pages.
required = set()

View File

@@ -21,6 +21,7 @@ INTERNAL_ONLY_PROPERTIES = {
"SECSCAN_NOTIFICATION_QUEUE_NAME",
"SECURITY_SCANNER_ISSUER_NAME",
"NOTIFICATION_QUEUE_NAME",
"REPOSITORY_GC_QUEUE_NAME",
"NAMESPACE_GC_QUEUE_NAME",
"EXPORT_ACTION_LOGS_QUEUE_NAME",
"FEATURE_BILLING",

View File

@@ -0,0 +1,42 @@
import logging
from app import repository_gc_queue, all_queues
from data import model, database
from workers.queueworker import QueueWorker
from util.log import logfile_path
logger = logging.getLogger(__name__)
# How often (in seconds) the worker polls the queue for new GC jobs.
POLL_PERIOD_SECONDS = 60

# How long (in seconds) a dequeued GC item stays reserved before being retried.
# Repository purges can be slow, so allow 15 minutes. (The original line read
# `= 6 # 0 * 15`, a garbled `60 * 15` that would have made the reservation a
# mere 6 seconds, re-dispatching purges while they were still running.)
REPOSITORY_GC_TIMEOUT = 60 * 15  # 15 minutes
class RepositoryGCWorker(QueueWorker):
    """ Worker which cleans up repositories enqueued to be GCed.
    """

    def process_queue_item(self, job_details):
        # job_details carries the id of the DeletedRepository marker row created
        # when the repository was marked for deletion.
        logger.debug("Got repository GC queue item: %s", job_details)

        try:
            deletion_marker = database.DeletedRepository.get(id=job_details["marker_id"])
        except database.DeletedRepository.DoesNotExist:
            # Marker already gone (e.g. purge completed earlier); nothing to do.
            logger.debug("Found no matching delete repo marker: %s", job_details)
            return

        logger.debug("Purging repository %s", deletion_marker.repository)
        model.gc.purge_repository(deletion_marker.repository)
if __name__ == "__main__":
    # Configure logging from the shared logfile config before starting the worker.
    logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)

    logger.debug("Starting repository GC worker")
    # Poll the repository GC queue; reservation_seconds bounds how long a purge
    # may run before the queue item becomes available for retry.
    worker = RepositoryGCWorker(
        repository_gc_queue,
        poll_period_seconds=POLL_PERIOD_SECONDS,
        reservation_seconds=REPOSITORY_GC_TIMEOUT,
    )
    worker.start()