diff --git a/_init.py b/_init.py
index 71e8bda09..76838511e 100644
--- a/_init.py
+++ b/_init.py
@@ -1,6 +1,18 @@
 import os
+import re
 import subprocess

+try:
+    from util.config.provider import get_config_provider
+except ModuleNotFoundError:
+    # Stub out this call so that we can run the external_libraries script
+    # without needing the entire codebase.
+    def get_config_provider(
+        config_volume, yaml_filename, py_filename, testing=False, kubernetes=False
+    ):
+        return None
+
+
 ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
 CONF_DIR = os.getenv("QUAYCONF", os.path.join(ROOT_DIR, "conf/"))
 STATIC_DIR = os.path.join(ROOT_DIR, "static/")
@@ -15,6 +27,15 @@ IS_KUBERNETES = "KUBERNETES_SERVICE_HOST" in os.environ

 OVERRIDE_CONFIG_DIRECTORY = os.path.join(CONF_DIR, "stack/")

+config_provider = get_config_provider(
+    OVERRIDE_CONFIG_DIRECTORY,
+    "config.yaml",
+    "config.py",
+    testing=IS_TESTING,
+    kubernetes=IS_KUBERNETES,
+)
+
+
 def _get_version_number():
     # Try to get version from environment
     version = os.getenv("QUAY_VERSION", "")
diff --git a/app.py b/app.py
index 7859463ee..89c87a86d 100644
--- a/app.py
+++ b/app.py
@@ -3,9 +3,13 @@ import json
 import logging
 import os

+from functools import partial
+
 from authlib.jose import JsonWebKey
-from flask import request, Request
+from cryptography.hazmat.primitives import serialization
+from flask import Flask, request, Request
 from flask_login import LoginManager
+from flask_mail import Mail
 from flask_principal import Principal
 from werkzeug.middleware.proxy_fix import ProxyFix
 from werkzeug.exceptions import HTTPException
@@ -13,19 +17,25 @@ from werkzeug.exceptions import HTTPException

 import features

 from _init import (
+    config_provider,
+    CONF_DIR,
     IS_KUBERNETES,
     IS_TESTING,
     OVERRIDE_CONFIG_DIRECTORY,
     IS_BUILDING,
 )
+from avatars.avatars import Avatar
 from buildman.manager.buildcanceller import BuildCanceller
 from data import database
+from data import model
+from data import logs_model
 from data.archivedlogs import LogArchive
 from data.billing import Billing
 from data.buildlogs import BuildLogs
 from data.cache import get_model_cache
 from data.model.user import LoginWrappedDBUser
+from data.queue import WorkQueue
 from data.userevent import UserEventsBuilderModule
 from data.userfiles import Userfiles
 from data.users import UserAuthentication
@@ -42,35 +52,33 @@ from path_converters import (
 from oauth.services.github import GithubOAuthService
 from oauth.services.gitlab import GitLabOAuthService
 from oauth.loginmanager import OAuthLoginManager
+from storage import Storage
 from util.log import filter_logs
+from util import get_app_url
+from util.secscan.secscan_util import get_blob_download_uri_getter
+from util.ipresolver import IPResolver
 from util.saas.analytics import Analytics
 from util.saas.exceptionlog import Sentry
 from util.names import urn_generator
 from util.config import URLSchemeAndHostname
+from util.config.configutil import generate_secret_key
 from util.config.superusermanager import SuperUserManager
 from util.label_validator import LabelValidator
 from util.metrics.prometheus import PrometheusPlugin
 from util.repomirror.api import RepoMirrorAPI
+from util.tufmetadata.api import TUFMetadataAPI
+from util.security.instancekeys import InstanceKeys
 from util.greenlet_tracing import enable_tracing

-from singletons.app import _app as app
-from singletons.config import config_provider, get_app_url  # also initialize app.config
-from singletons.workqueues import *  # noqa: F401, F403
-
-# Initialize app
-from singletons.avatar import avatar
-from singletons.instance_keys import instance_keys
-from singletons.ip_resolver import ip_resolver
-from singletons.mail import mail
-from singletons.storage import storage
-from singletons.tuf_metadata_api import tuf_metadata_api
-
 OVERRIDE_CONFIG_YAML_FILENAME = os.path.join(OVERRIDE_CONFIG_DIRECTORY, "config.yaml")
 OVERRIDE_CONFIG_PY_FILENAME = os.path.join(OVERRIDE_CONFIG_DIRECTORY, "config.py")

+OVERRIDE_CONFIG_KEY = "QUAY_OVERRIDE_CONFIG"
+
 DOCKER_V2_SIGNINGKEY_FILENAME = "docker_v2.pem"
 INIT_SCRIPTS_LOCATION = "/conf/init/"

+app = Flask(__name__)
 logger = logging.getLogger(__name__)

 # Instantiate the configuration.
@@ -78,13 +86,49 @@ is_testing = IS_TESTING
 is_kubernetes = IS_KUBERNETES
 is_building = IS_BUILDING

-if not is_testing:
+if is_testing:
+    from test.testconfig import TestConfig
+
+    logger.debug("Loading test config.")
+    app.config.from_object(TestConfig())
+else:
+    from config import DefaultConfig
+
+    logger.debug("Loading default config.")
+    app.config.from_object(DefaultConfig())
     app.teardown_request(database.close_db_filter)

+# Load the override config via the provider.
+config_provider.update_app_config(app.config)
+
+# Update any configuration found in the override environment variable.
+environ_config = json.loads(os.environ.get(OVERRIDE_CONFIG_KEY, "{}"))
+app.config.update(environ_config)
+
 # Fix remote address handling for Flask.
 if app.config.get("PROXY_COUNT", 1):
     app.wsgi_app = ProxyFix(app.wsgi_app)

+# Allow user to define a custom storage preference for the local instance.
+_distributed_storage_preference = os.environ.get("QUAY_DISTRIBUTED_STORAGE_PREFERENCE", "").split()
+if _distributed_storage_preference:
+    app.config["DISTRIBUTED_STORAGE_PREFERENCE"] = _distributed_storage_preference
+
+# Generate a secret key if none was specified.
+if app.config["SECRET_KEY"] is None:
+    logger.debug("Generating in-memory secret key")
+    app.config["SECRET_KEY"] = generate_secret_key()
+
+# If the "preferred" scheme is https, then http is not allowed. Therefore, ensure we have a secure
+# session cookie.
+if app.config["PREFERRED_URL_SCHEME"] == "https" and not app.config.get(
+    "FORCE_NONSECURE_SESSION_COOKIE", False
+):
+    app.config["SESSION_COOKIE_SECURE"] = True
+
+# Load features from config.
+features.import_features(app.config)
+
 # Register additional experimental artifact types.
 # TODO: extract this into a real, dynamic registration system.
 if features.GENERAL_OCI_SUPPORT:
@@ -101,6 +145,8 @@ if features.HELM_OCI_SUPPORT:

 CONFIG_DIGEST = hashlib.sha256(json.dumps(app.config, default=str).encode("utf-8")).hexdigest()[0:8]

+logger.debug("Loaded config", extra={"config": app.config})
+

 class RequestWithId(Request):
     request_gen = staticmethod(urn_generator(["request"]))
@@ -198,8 +244,14 @@ Principal(app, use_sessions=False)

 tf = app.config["DB_TRANSACTION_FACTORY"]
 model_cache = get_model_cache(app.config)
+avatar = Avatar(app)
 login_manager = LoginManager(app)
+mail = Mail(app)
 prometheus = PrometheusPlugin(app)
+chunk_cleanup_queue = WorkQueue(app.config["CHUNK_CLEANUP_QUEUE_NAME"], tf)
+instance_keys = InstanceKeys(app)
+ip_resolver = IPResolver(app)
+storage = Storage(app, chunk_cleanup_queue, instance_keys, config_provider, ip_resolver)
 userfiles = Userfiles(app, storage)
 log_archive = LogArchive(app, storage)
 analytics = Analytics(app)
@@ -209,6 +261,7 @@ build_logs = BuildLogs(app)
 authentication = UserAuthentication(app, config_provider, OVERRIDE_CONFIG_DIRECTORY)
 userevents = UserEventsBuilderModule(app)
 superusers = SuperUserManager(app)
+instance_keys = InstanceKeys(app)
 label_validator = LabelValidator(app)
 build_canceller = BuildCanceller(app)

@@ -218,6 +271,32 @@ gitlab_trigger = GitLabOAuthService(app.config, "GITLAB_TRIGGER_CONFIG")
 oauth_login = OAuthLoginManager(app.config)
 oauth_apps = [github_trigger, gitlab_trigger]

+image_replication_queue = WorkQueue(app.config["REPLICATION_QUEUE_NAME"], tf, has_namespace=False)
+dockerfile_build_queue = WorkQueue(
+    app.config["DOCKERFILE_BUILD_QUEUE_NAME"], tf, has_namespace=True
+)
+notification_queue = WorkQueue(app.config["NOTIFICATION_QUEUE_NAME"], tf, has_namespace=True)
+secscan_notification_queue = WorkQueue(
+    app.config["SECSCAN_V4_NOTIFICATION_QUEUE_NAME"], tf, has_namespace=False
+)
+export_action_logs_queue = WorkQueue(
+    app.config["EXPORT_ACTION_LOGS_QUEUE_NAME"], tf, has_namespace=True
+)
+
+repository_gc_queue = WorkQueue(app.config["REPOSITORY_GC_QUEUE_NAME"], tf, has_namespace=True)
+
+# Note: We set `has_namespace` to `False` here, as we explicitly want this queue to not be emptied
+# when a namespace is marked for deletion.
+namespace_gc_queue = WorkQueue(app.config["NAMESPACE_GC_QUEUE_NAME"], tf, has_namespace=False)
+
+all_queues = [
+    image_replication_queue,
+    dockerfile_build_queue,
+    notification_queue,
+    chunk_cleanup_queue,
+    repository_gc_queue,
+    namespace_gc_queue,
+]
 url_scheme_and_hostname = URLSchemeAndHostname(
     app.config["PREFERRED_URL_SCHEME"], app.config["SERVER_HOSTNAME"]
 )
@@ -230,6 +309,8 @@ repo_mirror_api = RepoMirrorAPI(
     instance_keys=instance_keys,
 )

+tuf_metadata_api = TUFMetadataAPI(app, app.config)
+
 # Check for a key in config. If none found, generate a new signing key for Docker V2 manifests.
 _v2_key_path = os.path.join(OVERRIDE_CONFIG_DIRECTORY, DOCKER_V2_SIGNINGKEY_FILENAME)
 if os.path.exists(_v2_key_path):
@@ -242,8 +323,25 @@ else:
 if app.config.get("DATABASE_SECRET_KEY") is None and app.config.get("SETUP_COMPLETE", False):
     raise Exception("Missing DATABASE_SECRET_KEY in config; did you perhaps forget to add it?")

+database.configure(app.config)
+
+model.config.app_config = app.config
+model.config.store = storage
+model.config.register_repo_cleanup_callback(tuf_metadata_api.delete_metadata)
+
+secscan_model.configure(app, instance_keys, storage)
+
+logs_model.configure(app.config)
+
+# NOTE: We re-use the page token key here as this is just to obfuscate IDs for V1, and
+# does not need to actually be secure.
+registry_model.set_id_hash_salt(app.config.get("PAGE_TOKEN_KEY"))
+

 @login_manager.user_loader
 def load_user(user_uuid):
     logger.debug("User loader loading deferred user with uuid: %s", user_uuid)
     return LoginWrappedDBUser(user_uuid)
+
+
+get_app_url = partial(get_app_url, app.config)
diff --git a/data/database.py b/data/database.py
index 8c4aa74bb..549f0efda 100644
--- a/data/database.py
+++ b/data/database.py
@@ -47,7 +47,6 @@ from data.text import match_mysql, match_like
 from data.encryption import FieldEncrypter
 from data.readreplica import ReadReplicaSupportedModel, ReadOnlyConfig, disallow_replica_use
 from data.estimate import mysql_estimate_row_count, normal_row_count
-from singletons.config import app_config
 from util.names import urn_generator
 from util.metrics.prometheus import (
     db_pooled_connections_in_use,
@@ -2150,5 +2149,3 @@ transition_classes = set([TagManifestToManifest, TagManifestLabelMap, TagToRepos

 is_model = lambda x: inspect.isclass(x) and issubclass(x, BaseModel) and x is not BaseModel
 all_models = [model[1] for model in inspect.getmembers(sys.modules[__name__], is_model)]
-
-configure(app_config)
diff --git a/data/logs_model/__init__.py b/data/logs_model/__init__.py
index eb3737d6f..c8320e307 100644
--- a/data/logs_model/__init__.py
+++ b/data/logs_model/__init__.py
@@ -3,7 +3,6 @@ import logging
 from data.logs_model.table_logs_model import TableLogsModel
 from data.logs_model.document_logs_model import DocumentLogsModel
 from data.logs_model.combined_model import CombinedLogsModel
-from singletons.config import app_config

 logger = logging.getLogger(__name__)

@@ -64,6 +63,3 @@ def configure(app_config):
         model_config["should_skip_logging"] = should_skip_logging

     logs_model.initialize(_LOG_MODELS[model_name](**model_config))
-
-
-configure(app_config)
diff --git a/data/model/__init__.py b/data/model/__init__.py
index f5f49038a..32c8d7c5e 100644
--- a/data/model/__init__.py
+++ b/data/model/__init__.py
@@ -1,7 +1,4 @@
 from data.database import db, db_transaction
-from singletons.config import app_config
-from singletons.storage import storage as store
-from singletons.tuf_metadata_api import tuf_metadata_api


 class DataModelException(Exception):
@@ -147,9 +144,9 @@ class TooManyLoginAttemptsException(Exception):


 class Config(object):
-    def __init__(self, app_config, store):
-        self.app_config = app_config
-        self.store = store
+    def __init__(self):
+        self.app_config = None
+        self.store = None
         self.image_cleanup_callbacks = []
         self.repo_cleanup_callbacks = []

@@ -162,8 +159,7 @@ class Config(object):
         return lambda: self.repo_cleanup_callbacks.remove(callback)


-config = Config(app_config, store)
-config.register_repo_cleanup_callback(tuf_metadata_api.delete_metadata)
+config = Config()

 # There MUST NOT be any circular dependencies between these subsections. If there are fix it by
diff --git a/data/registry_model/__init__.py b/data/registry_model/__init__.py
index deb121f46..140ede42f 100644
--- a/data/registry_model/__init__.py
+++ b/data/registry_model/__init__.py
@@ -1,7 +1,7 @@
+import os
 import logging

 from data.registry_model.registry_oci_model import oci_model
-from singletons.config import app_config

 logger = logging.getLogger(__name__)

@@ -21,7 +21,3 @@ registry_model = RegistryModelProxy()
 logger.info("===============================")
 logger.info("Using registry model `%s`", registry_model._model)
 logger.info("===============================")
-
-# NOTE: We re-use the page token key here as this is just to obfuscate IDs for V1, and
-# does not need to actually be secure.
-registry_model.set_id_hash_salt(app_config.get("PAGE_TOKEN_KEY"))
diff --git a/data/secscan_model/__init__.py b/data/secscan_model/__init__.py
index 0ac472440..b72bc17e6 100644
--- a/data/secscan_model/__init__.py
+++ b/data/secscan_model/__init__.py
@@ -1,4 +1,6 @@
+import os
 import logging
+from collections import namedtuple

 from data.secscan_model.secscan_v4_model import (
     V4SecurityScanner,
@@ -7,9 +9,8 @@ from data.secscan_model.secscan_v4_model import (
 )
 from data.secscan_model.interface import SecurityScannerInterface, InvalidConfigurationException
 from data.secscan_model.datatypes import SecurityInformationLookupResult, ScanLookupStatus
-from singletons.app import _app as app
-from singletons.instance_keys import instance_keys
-from singletons.storage import storage
+from data.database import Manifest
+from data.registry_model.datatypes import Manifest as ManifestDataType

 logger = logging.getLogger(__name__)

@@ -65,4 +66,3 @@ class SecurityScannerModelProxy(SecurityScannerInterface):


 secscan_model = SecurityScannerModelProxy()
-secscan_model.configure(app, instance_keys, storage)
diff --git a/features/__init__.py b/features/__init__.py
index ff6e71a16..0242a4667 100644
--- a/features/__init__.py
+++ b/features/__init__.py
@@ -1,5 +1,3 @@
-from singletons.config import app_config
-
 _FEATURES = {}


@@ -35,7 +33,3 @@ class FeatureNameValue(object):
             return self.value.lower() == "true"

         return bool(self.value)
-
-
-# Load features from config.
-import_features(app_config)
diff --git a/notifications/__init__.py b/notifications/__init__.py
index 131ad0b6f..094487825 100644
--- a/notifications/__init__.py
+++ b/notifications/__init__.py
@@ -2,10 +2,9 @@ import json

 from contextlib import contextmanager

+from app import app, notification_queue
 from data import model
 from auth.auth_context import get_authenticated_user, get_validated_oauth_token
-from singletons.config import app_config
-from singletons.workqueues import notification_queue

 DEFAULT_BATCH_SIZE = 1000

@@ -19,8 +18,8 @@ def build_repository_event_data(namespace_name, repo_name, extra_data=None, subp
     """
     repo_string = "%s/%s" % (namespace_name, repo_name)
     homepage = "%s://%s/repository/%s" % (
-        app_config["PREFERRED_URL_SCHEME"],
-        app_config["SERVER_HOSTNAME"],
+        app.config["PREFERRED_URL_SCHEME"],
+        app.config["SERVER_HOSTNAME"],
         repo_string,
     )

@@ -34,7 +33,7 @@ def build_repository_event_data(namespace_name, repo_name, extra_data=None, subp
         "repository": repo_string,
         "namespace": namespace_name,
         "name": repo_name,
-        "docker_url": "%s/%s" % (app_config["SERVER_HOSTNAME"], repo_string),
+        "docker_url": "%s/%s" % (app.config["SERVER_HOSTNAME"], repo_string),
         "homepage": homepage,
     }

diff --git a/notifications/notificationmethod.py b/notifications/notificationmethod.py
index 4410744cf..d6c49e998 100644
--- a/notifications/notificationmethod.py
+++ b/notifications/notificationmethod.py
@@ -8,11 +8,9 @@ import requests
 from flask_mail import Message
 from urllib.parse import urlparse

+from app import mail, app, OVERRIDE_CONFIG_DIRECTORY
 from data import model
-from singletons.app import app_context
-from singletons.config import app_config, OVERRIDE_CONFIG_DIRECTORY
-from singletons.mail import mail
-from util.ssl import SSL_FILENAMES
+from util.config.validator import SSL_FILENAMES
 from util.jsontemplate import JSONTemplate, JSONTemplateParseException
 from util.fips import login_fips_safe
 from workers.queueworker import JobException
@@ -20,10 +18,10 @@ import features

 logger = logging.getLogger(__name__)

-METHOD_TIMEOUT = app_config.get("NOTIFICATION_SEND_TIMEOUT", 10)  # Seconds
+METHOD_TIMEOUT = app.config.get("NOTIFICATION_SEND_TIMEOUT", 10)  # Seconds
 HOSTNAME_BLACKLIST = ["localhost", "127.0.0.1"]
-HOSTNAME_BLACKLIST.extend(app_config.get("WEBHOOK_HOSTNAME_BLACKLIST", []))
-MAIL_DEFAULT_SENDER = app_config.get("MAIL_DEFAULT_SENDER", "admin@example.com")
+HOSTNAME_BLACKLIST.extend(app.config.get("WEBHOOK_HOSTNAME_BLACKLIST", []))
+MAIL_DEFAULT_SENDER = app.config.get("MAIL_DEFAULT_SENDER", "admin@example.com")


 class InvalidNotificationMethodException(Exception):
@@ -39,7 +37,7 @@ class NotificationMethodPerformException(JobException):


 def _ssl_cert():
-    if app_config["PREFERRED_URL_SCHEME"] == "https":
+    if app.config["PREFERRED_URL_SCHEME"] == "https":
         cert_files = [OVERRIDE_CONFIG_DIRECTORY + f for f in SSL_FILENAMES]
         cert_exists = all([os.path.isfile(f) for f in cert_files])
         return cert_files if cert_exists else None
@@ -175,7 +173,7 @@ class EmailMethod(NotificationMethod):
         if not email:
             return

-        with app_context():
+        with app.app_context():
             msg = Message(
                 event_handler.get_summary(notification_data["event_data"], notification_data),
                 recipients=[email],
@@ -184,7 +182,7 @@ class EmailMethod(NotificationMethod):

             try:
                 if features.FIPS:
-                    assert app_config[
+                    assert app.config[
                         "MAIL_USE_TLS"
                     ], "MAIL_USE_TLS must be enabled to use SMTP in FIPS mode."
                    with mock.patch("smtplib.SMTP.login", login_fips_safe):
diff --git a/singletons/__init__.py b/singletons/__init__.py
deleted file mode 100644
index e69de29bb..000000000
diff --git a/singletons/app.py b/singletons/app.py
deleted file mode 100644
index fa18182f4..000000000
--- a/singletons/app.py
+++ /dev/null
@@ -1,10 +0,0 @@
-from _init import TEMPLATE_DIR
-from flask import Flask
-
-# _app is a bare Flask object. Please don't use it directly from this package
-# unless you need an uninitialized object.
-_app = Flask(__name__, template_folder=TEMPLATE_DIR)
-
-
-def app_context():
-    return _app.app_context()
diff --git a/singletons/avatar.py b/singletons/avatar.py
deleted file mode 100644
index 5a878447f..000000000
--- a/singletons/avatar.py
+++ /dev/null
@@ -1,4 +0,0 @@
-from avatars.avatars import Avatar
-from singletons.app import _app
-
-avatar = Avatar(_app)
diff --git a/singletons/config.py b/singletons/config.py
deleted file mode 100644
index 20011a1a8..000000000
--- a/singletons/config.py
+++ /dev/null
@@ -1,64 +0,0 @@
-import json
-import logging
-import os
-from functools import partial
-
-from _init import OVERRIDE_CONFIG_DIRECTORY, IS_TESTING, IS_KUBERNETES
-from singletons.app import _app as app
-from util import get_app_url as util_get_app_url
-from util.config.configutil import generate_secret_key
-from util.config.provider import get_config_provider
-
-
-OVERRIDE_CONFIG_KEY = "QUAY_OVERRIDE_CONFIG"
-
-logger = logging.getLogger(__name__)
-
-config_provider = get_config_provider(
-    OVERRIDE_CONFIG_DIRECTORY,
-    "config.yaml",
-    "config.py",
-    testing=IS_TESTING,
-    kubernetes=IS_KUBERNETES,
-)
-
-if IS_TESTING:
-    from test.testconfig import TestConfig
-
-    logger.debug("Loading test config.")
-    app.config.from_object(TestConfig())
-else:
-    from config import DefaultConfig
-
-    logger.debug("Loading default config.")
-    app.config.from_object(DefaultConfig())
-
-# Load the override config via the provider.
-config_provider.update_app_config(app.config)
-
-# Update any configuration found in the override environment variable.
-environ_config = json.loads(os.environ.get(OVERRIDE_CONFIG_KEY, "{}"))
-app.config.update(environ_config)
-
-# Allow user to define a custom storage preference for the local instance.
-_distributed_storage_preference = os.environ.get("QUAY_DISTRIBUTED_STORAGE_PREFERENCE", "").split()
-if _distributed_storage_preference:
-    app.config["DISTRIBUTED_STORAGE_PREFERENCE"] = _distributed_storage_preference
-
-# Generate a secret key if none was specified.
-if app.config["SECRET_KEY"] is None:
-    logger.debug("Generating in-memory secret key")
-    app.config["SECRET_KEY"] = generate_secret_key()
-
-# If the "preferred" scheme is https, then http is not allowed. Therefore, ensure we have a secure
-# session cookie.
-if app.config["PREFERRED_URL_SCHEME"] == "https" and not app.config.get(
-    "FORCE_NONSECURE_SESSION_COOKIE", False
-):
-    app.config["SESSION_COOKIE_SECURE"] = True
-
-logger.debug("Loaded config", extra={"config": app.config})
-
-get_app_url = partial(util_get_app_url, app.config)
-
-app_config = app.config
diff --git a/singletons/instance_keys.py b/singletons/instance_keys.py
deleted file mode 100644
index a523999f8..000000000
--- a/singletons/instance_keys.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from util.security.instancekeys import InstanceKeys
-
-from singletons.app import _app
-
-instance_keys = InstanceKeys(_app)
diff --git a/singletons/ip_resolver.py b/singletons/ip_resolver.py
deleted file mode 100644
index ceca4b447..000000000
--- a/singletons/ip_resolver.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from util.ipresolver import IPResolver
-
-from singletons.app import _app
-
-ip_resolver = IPResolver(_app)
diff --git a/singletons/mail.py b/singletons/mail.py
deleted file mode 100644
index 3794ed274..000000000
--- a/singletons/mail.py
+++ /dev/null
@@ -1,4 +0,0 @@
-from flask_mail import Mail
-from singletons.app import _app
-
-mail = Mail(_app)
diff --git a/singletons/storage.py b/singletons/storage.py
deleted file mode 100644
index 53a410446..000000000
--- a/singletons/storage.py
+++ /dev/null
@@ -1,8 +0,0 @@
-from singletons.app import _app
-from singletons.config import config_provider
-from singletons.instance_keys import instance_keys
-from singletons.ip_resolver import ip_resolver
-from singletons.workqueues import chunk_cleanup_queue
-from storage import Storage
-
-storage = Storage(_app, chunk_cleanup_queue, instance_keys, config_provider, ip_resolver)
diff --git a/singletons/tuf_metadata_api.py b/singletons/tuf_metadata_api.py
deleted file mode 100644
index d0ad5bcfa..000000000
--- a/singletons/tuf_metadata_api.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from singletons.app import _app
-from singletons.config import app_config
-from util.tufmetadata.api import TUFMetadataAPI
-
-tuf_metadata_api = TUFMetadataAPI(_app, app_config)
diff --git a/singletons/workqueues.py b/singletons/workqueues.py
deleted file mode 100644
index 14397158b..000000000
--- a/singletons/workqueues.py
+++ /dev/null
@@ -1,32 +0,0 @@
-from data.queue import WorkQueue
-from singletons.config import app_config
-
-tf = app_config["DB_TRANSACTION_FACTORY"]
-
-chunk_cleanup_queue = WorkQueue(app_config["CHUNK_CLEANUP_QUEUE_NAME"], tf)
-image_replication_queue = WorkQueue(app_config["REPLICATION_QUEUE_NAME"], tf, has_namespace=False)
-dockerfile_build_queue = WorkQueue(
-    app_config["DOCKERFILE_BUILD_QUEUE_NAME"], tf, has_namespace=True
-)
-notification_queue = WorkQueue(app_config["NOTIFICATION_QUEUE_NAME"], tf, has_namespace=True)
-secscan_notification_queue = WorkQueue(
-    app_config["SECSCAN_V4_NOTIFICATION_QUEUE_NAME"], tf, has_namespace=False
-)
-export_action_logs_queue = WorkQueue(
-    app_config["EXPORT_ACTION_LOGS_QUEUE_NAME"], tf, has_namespace=True
-)
-
-repository_gc_queue = WorkQueue(app_config["REPOSITORY_GC_QUEUE_NAME"], tf, has_namespace=True)
-
-# Note: We set `has_namespace` to `False` here, as we explicitly want this queue to not be emptied
-# when a namespace is marked for deletion.
-namespace_gc_queue = WorkQueue(app_config["NAMESPACE_GC_QUEUE_NAME"], tf, has_namespace=False)
-
-all_queues = [
-    image_replication_queue,
-    dockerfile_build_queue,
-    notification_queue,
-    chunk_cleanup_queue,
-    repository_gc_queue,
-    namespace_gc_queue,
-]
diff --git a/util/config/provider/testprovider.py b/util/config/provider/testprovider.py
index a320f8006..8f187559b 100644
--- a/util/config/provider/testprovider.py
+++ b/util/config/provider/testprovider.py
@@ -1,6 +1,7 @@
 import json
 import io
 import os
+from datetime import datetime, timedelta

 from util.config.provider.baseprovider import BaseProvider

diff --git a/util/config/validators/validate_ssl.py b/util/config/validators/validate_ssl.py
index c47a94da5..d0052f2f1 100644
--- a/util/config/validators/validate_ssl.py
+++ b/util/config/validators/validate_ssl.py
@@ -1,6 +1,7 @@
 from util.config.validators import BaseValidator, ConfigValidationException
 from util.security.ssl import load_certificate, CertInvalidException, KeyInvalidException
-from util.ssl import SSL_FILENAMES
+
+SSL_FILENAMES = ["ssl.cert", "ssl.key"]


 class SSLValidator(BaseValidator):
diff --git a/util/generatepresharedkey.py b/util/generatepresharedkey.py
index ae000c37a..dab8e4f19 100644
--- a/util/generatepresharedkey.py
+++ b/util/generatepresharedkey.py
@@ -2,8 +2,10 @@ import argparse

 from dateutil.parser import parse as parse_date

+from app import app
 from data import model
 from data.database import ServiceKeyApprovalType
+from data.logs_model import logs_model


 def generate_key(service, name, expiration_date=None, notes=None):
diff --git a/util/jinjautil.py b/util/jinjautil.py
index 9d73fb74c..b3ee36a34 100644
--- a/util/jinjautil.py
+++ b/util/jinjautil.py
@@ -1,6 +1,5 @@
+from app import get_app_url, avatar
 from data import model
-from singletons.avatar import avatar
-from singletons.config import get_app_url
 from util.names import parse_robot_username

 from jinja2 import Environment, FileSystemLoader
diff --git a/util/registry/replication.py b/util/registry/replication.py
index b3e632f9e..4525dc683 100644
--- a/util/registry/replication.py
+++ b/util/registry/replication.py
@@ -4,7 +4,7 @@ import json
 from contextlib import contextmanager

 from data import model
-from singletons.workqueues import image_replication_queue
+from app import image_replication_queue

 DEFAULT_BATCH_SIZE = 1000

diff --git a/util/ssl.py b/util/ssl.py
deleted file mode 100644
index 20b9a3d26..000000000
--- a/util/ssl.py
+++ /dev/null
@@ -1 +0,0 @@
-SSL_FILENAMES = ["ssl.cert", "ssl.key"]
diff --git a/util/useremails.py b/util/useremails.py
index ace8e6837..40aea6b05 100644
--- a/util/useremails.py
+++ b/util/useremails.py
@@ -6,9 +6,10 @@ from unittest import mock

 from flask_mail import Message

+import features
+
 from _init import ROOT_DIR
-from singletons.config import app_config, get_app_url
-from singletons.mail import mail
+from app import mail, app, get_app_url
 from util.jinjautil import get_template_env
 from util.html import html2text
 from util.fips import login_fips_safe
@@ -58,7 +59,7 @@ class GmailAction(object):


 def send_email(recipient, subject, template_file, parameters, action=None):
-    app_title = app_config["REGISTRY_TITLE_SHORT"]
+    app_title = app.config["REGISTRY_TITLE_SHORT"]
     app_url = get_app_url()
     html, plain = render_email(
         app_title, app_url, recipient, subject, template_file, parameters, action=None
     )
@@ -69,7 +70,7 @@ def send_email(recipient, subject, template_file, parameters, action=None):

     try:
         if features.FIPS:
-            assert app_config[
+            assert app.config[
"MAIL_USE_TLS" ], "MAIL_USE_TLS must be enabled to use SMTP in FIPS mode." with mock.patch("smtplib.SMTP.login", login_fips_safe): @@ -77,7 +78,7 @@ def send_email(recipient, subject, template_file, parameters, action=None): else: mail.send(msg) - if app_config["TESTING"]: + if app.config["TESTING"]: logger.debug("Quay is configured for testing. Email not sent: '%s'", msg.subject) else: logger.debug("Sent email: '%s'", msg.subject) @@ -90,7 +91,7 @@ def render_email(app_title, app_url, recipient, subject, template_file, paramete def app_link_handler(url=None): return app_url + "/" + url if url else app_url - app_logo = app_config.get("ENTERPRISE_LOGO_URL", "https://quay.io/static/img/quay-logo.png") + app_logo = app.config.get("ENTERPRISE_LOGO_URL", "https://quay.io/static/img/quay-logo.png") parameters.update( { @@ -212,7 +213,7 @@ def send_invoice_email(email, contents): msg = Message("Quay payment received - Thank you!", recipients=[email]) msg.html = contents if features.FIPS: - assert app_config["MAIL_USE_TLS"], "MAIL_USE_TLS must be enabled to use SMTP in FIPS mode." + assert app.config["MAIL_USE_TLS"], "MAIL_USE_TLS must be enabled to use SMTP in FIPS mode." with mock.patch("smtplib.SMTP.login", login_fips_safe): mail.send(msg) else: @@ -253,7 +254,7 @@ def send_subscription_change(change_description, customer_id, customer_email, qu change_description, customer_id, customer_email, quay_username ) if features.FIPS: - assert app_config["MAIL_USE_TLS"], "MAIL_USE_TLS must be enabled to use SMTP in FIPS mode." + assert app.config["MAIL_USE_TLS"], "MAIL_USE_TLS must be enabled to use SMTP in FIPS mode." with mock.patch("smtplib.SMTP.login", login_fips_safe): mail.send(msg) else: diff --git a/workers/blobuploadcleanupworker/blobuploadcleanupworker.py b/workers/blobuploadcleanupworker/blobuploadcleanupworker.py index fa04a3d44..5c65282a2 100644 --- a/workers/blobuploadcleanupworker/blobuploadcleanupworker.py +++ b/workers/blobuploadcleanupworker/blobuploadcleanupworker.py @@ -93,7 +93,7 @@ class BlobUploadCleanupWorker(Worker): logger.debug("Removed stale blob upload %s", stale_upload.uuid) -def create_gunicorn_worker() -> GunicornWorker: +def create_gunicorn_worker(): """ follows the gunicorn application factory pattern, enabling a quay worker to run as a gunicorn worker thread. @@ -102,7 +102,7 @@ def create_gunicorn_worker() -> GunicornWorker: utilizing this method will enforce a 1:1 quay worker to gunicorn worker ratio. """ - worker = GunicornWorker(__name__, BlobUploadCleanupWorker(), True) + worker = GunicornWorker(__name__, app, BlobUploadCleanupWorker(), True) return worker diff --git a/workers/buildlogsarchiver/buildlogsarchiver.py b/workers/buildlogsarchiver/buildlogsarchiver.py index 6384354d1..eeb98bb11 100644 --- a/workers/buildlogsarchiver/buildlogsarchiver.py +++ b/workers/buildlogsarchiver/buildlogsarchiver.py @@ -66,7 +66,7 @@ class ArchiveBuildLogsWorker(Worker): logger.debug("Another worker pre-empted us when archiving: %s", to_archive.uuid) -def create_gunicorn_worker() -> GunicornWorker: +def create_gunicorn_worker(): """ follows the gunicorn application factory pattern, enabling a quay worker to run as a gunicorn worker thread. @@ -75,7 +75,7 @@ def create_gunicorn_worker() -> GunicornWorker: utilizing this method will enforce a 1:1 quay worker to gunicorn worker ratio. 
""" - worker = GunicornWorker(__name__, ArchiveBuildLogsWorker(), True) + worker = GunicornWorker(__name__, app, ArchiveBuildLogsWorker(), True) return worker diff --git a/workers/chunkcleanupworker.py b/workers/chunkcleanupworker.py index e38029366..2626f557d 100644 --- a/workers/chunkcleanupworker.py +++ b/workers/chunkcleanupworker.py @@ -34,7 +34,7 @@ class ChunkCleanupWorker(QueueWorker): raise JobException() -def create_gunicorn_worker() -> GunicornWorker: +def create_gunicorn_worker(): """ follows the gunicorn application factory pattern, enabling a quay worker to run as a gunicorn worker thread. @@ -49,6 +49,7 @@ def create_gunicorn_worker() -> GunicornWorker: feature_flag = "SwiftStorage" in engines worker = GunicornWorker( __name__, + app, ChunkCleanupWorker(chunk_cleanup_queue, poll_period_seconds=POLL_PERIOD_SECONDS), feature_flag, ) diff --git a/workers/expiredappspecifictokenworker.py b/workers/expiredappspecifictokenworker.py index fbd6303d5..651e72806 100644 --- a/workers/expiredappspecifictokenworker.py +++ b/workers/expiredappspecifictokenworker.py @@ -3,12 +3,12 @@ import time import features +from app import app # This is required to initialize the database. from data import model -from singletons.config import app_config +from workers.worker import Worker from util.log import logfile_path from util.timedeltastring import convert_to_timedelta from workers.gunicorn_worker import GunicornWorker -from workers.worker import Worker logger = logging.getLogger(__name__) @@ -20,7 +20,7 @@ class ExpiredAppSpecificTokenWorker(Worker): def __init__(self): super(ExpiredAppSpecificTokenWorker, self).__init__() - expiration_window = app_config.get("EXPIRED_APP_SPECIFIC_TOKEN_GC", "1d") + expiration_window = app.config.get("EXPIRED_APP_SPECIFIC_TOKEN_GC", "1d") self.expiration_window = convert_to_timedelta(expiration_window) logger.debug("Found expiration window: %s", expiration_window) @@ -37,7 +37,7 @@ class ExpiredAppSpecificTokenWorker(Worker): return True -def create_gunicorn_worker() -> GunicornWorker: +def create_gunicorn_worker(): """ follows the gunicorn application factory pattern, enabling a quay worker to run as a gunicorn worker thread. @@ -47,16 +47,16 @@ def create_gunicorn_worker() -> GunicornWorker: utilizing this method will enforce a 1:1 quay worker to gunicorn worker ratio. 
""" feature_flag = (features.APP_SPECIFIC_TOKENS) or ( - app_config.get("EXPIRED_APP_SPECIFIC_TOKEN_GC") is not None + app.config.get("EXPIRED_APP_SPECIFIC_TOKEN_GC") is not None ) - worker = GunicornWorker(__name__, ExpiredAppSpecificTokenWorker(), feature_flag) + worker = GunicornWorker(__name__, app, ExpiredAppSpecificTokenWorker(), feature_flag) return worker if __name__ == "__main__": logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False) - if app_config.get("ACCOUNT_RECOVERY_MODE", False): + if app.config.get("ACCOUNT_RECOVERY_MODE", False): logger.debug("Quay running in account recovery mode") while True: time.sleep(100000) @@ -66,7 +66,7 @@ if __name__ == "__main__": while True: time.sleep(100000) - if app_config.get("EXPIRED_APP_SPECIFIC_TOKEN_GC") is None: + if app.config.get("EXPIRED_APP_SPECIFIC_TOKEN_GC") is None: logger.debug("GC of App specific tokens is disabled; skipping") while True: time.sleep(100000) diff --git a/workers/exportactionlogsworker.py b/workers/exportactionlogsworker.py index 7f39adf4f..9e788ce5d 100644 --- a/workers/exportactionlogsworker.py +++ b/workers/exportactionlogsworker.py @@ -290,7 +290,7 @@ def _parse_time(specified_time): return None -def create_gunicorn_worker() -> GunicornWorker: +def create_gunicorn_worker(): """ follows the gunicorn application factory pattern, enabling a quay worker to run as a gunicorn worker thread. @@ -302,7 +302,7 @@ def create_gunicorn_worker() -> GunicornWorker: log_worker = ExportActionLogsWorker( export_action_logs_queue, poll_period_seconds=POLL_PERIOD_SECONDS ) - worker = GunicornWorker(__name__, log_worker, features.LOG_EXPORT) + worker = GunicornWorker(__name__, app, log_worker, features.LOG_EXPORT) return worker diff --git a/workers/gc/gcworker.py b/workers/gc/gcworker.py index 112f63859..6fde4a38b 100644 --- a/workers/gc/gcworker.py +++ b/workers/gc/gcworker.py @@ -5,11 +5,11 @@ from contextlib import contextmanager import features -from data.database import UseThenDisconnect, Repository +from app import app +from data.database import UseThenDisconnect, Repository, RepositoryState from data.registry_model import registry_model from data.model.repository import get_random_gc_policy from data.model.gc import garbage_collect_repo -from singletons.config import app_config from workers.worker import Worker from util.locking import GlobalLock, LockNotAcquiredException from workers.gunicorn_worker import GunicornWorker @@ -30,14 +30,14 @@ class GarbageCollectionWorker(Worker): def __init__(self): super(GarbageCollectionWorker, self).__init__() self.add_operation( - self._garbage_collection_repos, app_config.get("GARBAGE_COLLECTION_FREQUENCY", 30) + self._garbage_collection_repos, app.config.get("GARBAGE_COLLECTION_FREQUENCY", 30) ) def _garbage_collection_repos(self, skip_lock_for_testing=False): """ Performs garbage collection on repositories. """ - with UseThenDisconnect(app_config): + with UseThenDisconnect(app.config): policy = get_random_gc_policy() if policy is None: logger.debug("No GC policies found") @@ -72,7 +72,7 @@ class GarbageCollectionWorker(Worker): logger.debug("Could not acquire repo lock for garbage collection") -def create_gunicorn_worker() -> GunicornWorker: +def create_gunicorn_worker(): """ follows the gunicorn application factory pattern, enabling a quay worker to run as a gunicorn worker thread. @@ -81,12 +81,12 @@ def create_gunicorn_worker() -> GunicornWorker: utilizing this method will enforce a 1:1 quay worker to gunicorn worker ratio. 
""" - worker = GunicornWorker(__name__, GarbageCollectionWorker(), features.GARBAGE_COLLECTION) + worker = GunicornWorker(__name__, app, GarbageCollectionWorker(), features.GARBAGE_COLLECTION) return worker if __name__ == "__main__": - if app_config.get("ACCOUNT_RECOVERY_MODE", False): + if app.config.get("ACCOUNT_RECOVERY_MODE", False): logger.debug("Quay running in account recovery mode") while True: time.sleep(100000) @@ -96,6 +96,6 @@ if __name__ == "__main__": while True: time.sleep(100000) - GlobalLock.configure(app_config) + GlobalLock.configure(app.config) worker = GarbageCollectionWorker() worker.start() diff --git a/workers/globalpromstats/globalpromstats.py b/workers/globalpromstats/globalpromstats.py index 4622410e4..9fb6b5d8e 100644 --- a/workers/globalpromstats/globalpromstats.py +++ b/workers/globalpromstats/globalpromstats.py @@ -3,9 +3,9 @@ import time from prometheus_client import Gauge +from app import app from data import model from data.database import UseThenDisconnect -from singletons.config import app_config from util.locking import GlobalLock, LockNotAcquiredException from util.log import logfile_path from workers.worker import Worker @@ -19,7 +19,7 @@ org_rows = Gauge("quay_org_rows", "number of organizations in the database") robot_rows = Gauge("quay_robot_rows", "number of robot accounts in the database") -WORKER_FREQUENCY = app_config.get("GLOBAL_PROMETHEUS_STATS_FREQUENCY", 60 * 60) +WORKER_FREQUENCY = app.config.get("GLOBAL_PROMETHEUS_STATS_FREQUENCY", 60 * 60) def get_repository_count(): @@ -58,14 +58,14 @@ class GlobalPrometheusStatsWorker(Worker): def _report_stats(self): logger.debug("Reporting global stats") - with UseThenDisconnect(app_config): + with UseThenDisconnect(app.config): repository_rows.set(get_repository_count()) user_rows.set(get_active_user_count()) org_rows.set(get_active_org_count()) robot_rows.set(get_robot_count()) -def create_gunicorn_worker() -> GunicornWorker: +def create_gunicorn_worker(): """ follows the gunicorn application factory pattern, enabling a quay worker to run as a gunicorn worker thread. @@ -74,25 +74,25 @@ def create_gunicorn_worker() -> GunicornWorker: utilizing this method will enforce a 1:1 quay worker to gunicorn worker ratio. 
""" - feature_flag = app_config.get("PROMETHEUS_PUSHGATEWAY_URL") is not None - worker = GunicornWorker(__name__, GlobalPrometheusStatsWorker(), feature_flag) + feature_flag = app.config.get("PROMETHEUS_PUSHGATEWAY_URL") is not None + worker = GunicornWorker(__name__, app, GlobalPrometheusStatsWorker(), feature_flag) return worker def main(): logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False) - if app_config.get("ACCOUNT_RECOVERY_MODE", False): + if app.config.get("ACCOUNT_RECOVERY_MODE", False): logger.debug("Quay running in account recovery mode") while True: time.sleep(100000) - if not app_config.get("PROMETHEUS_PUSHGATEWAY_URL"): + if not app.config.get("PROMETHEUS_PUSHGATEWAY_URL"): logger.debug("Prometheus not enabled; skipping global stats reporting") while True: time.sleep(100000) - GlobalLock.configure(app_config) + GlobalLock.configure(app.config) worker = GlobalPrometheusStatsWorker() worker.start() diff --git a/workers/gunicorn_worker.py b/workers/gunicorn_worker.py index 35092375e..a35cd0f53 100644 --- a/workers/gunicorn_worker.py +++ b/workers/gunicorn_worker.py @@ -1,29 +1,25 @@ -import logging import logging.config +import threading from multiprocessing import Process -from typing import Union, TYPE_CHECKING from util.log import logfile_path -if TYPE_CHECKING: - from features import FeatureNameValue - from workers.worker import Worker - class GunicornWorker: """ - GunicornWorker allows a Quay worker to run as a Gunicorn worker. - The Quay worker is launched as a sub-process. + GunicornWorker allows a quay worker to run as a Gunicorn worker. + The Quay worker is launched as a sub-process and this class serves as a delegate + for the wsgi app. name: the quay worker this class delegates for. + app: a uwsgi framework application object. worker: a quay worker type which implements a .start method. feature_flag: a boolean value determine if the worker thread should be launched """ - def __init__( - self, name: str, worker: "Worker", feature_flag: Union[bool, "FeatureNameValue"] - ) -> None: + def __init__(self, name, app, worker, feature_flag): logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False) + self.app = app self.name = name self.worker = worker self.feature_flag = feature_flag @@ -32,7 +28,7 @@ class GunicornWorker: if self.feature_flag: self.logger.debug("starting {} thread".format(self.name)) p = Process(target=self.worker.start) - p.start() + p = p.start() - def __call__(self, environ, start_response): - raise NotImplementedError() + def __call__(environ, start_response): + return self.app(environ, start_response) diff --git a/workers/logrotateworker.py b/workers/logrotateworker.py index d634229ea..28fc64e74 100644 --- a/workers/logrotateworker.py +++ b/workers/logrotateworker.py @@ -131,7 +131,7 @@ def _write_logs(filename, logs, log_archive): log_archive.store_file(tempfile, JSON_MIMETYPE, content_encoding="gzip", file_id=filename) -def create_gunicorn_worker() -> GunicornWorker: +def create_gunicorn_worker(): """ follows the gunicorn application factory pattern, enabling a quay worker to run as a gunicorn worker thread. @@ -141,7 +141,7 @@ def create_gunicorn_worker() -> GunicornWorker: utilizing this method will enforce a 1:1 quay worker to gunicorn worker ratio. 
""" feature_flag = (features.ACTION_LOG_ROTATION) or (not None in [SAVE_PATH, SAVE_LOCATION]) - worker = GunicornWorker(__name__, LogRotateWorker(), feature_flag) + worker = GunicornWorker(__name__, app, LogRotateWorker(), feature_flag) return worker diff --git a/workers/manifestbackfillworker.py b/workers/manifestbackfillworker.py index 7fa43abd9..847246824 100644 --- a/workers/manifestbackfillworker.py +++ b/workers/manifestbackfillworker.py @@ -5,18 +5,18 @@ from peewee import fn import features +from app import app from data.database import Manifest from image.shared.schemas import parse_manifest_from_bytes, ManifestException -from singletons.config import app_config +from workers.worker import Worker from util.migrate.allocator import yield_random_entries from util.bytes import Bytes from util.log import logfile_path from workers.gunicorn_worker import GunicornWorker -from workers.worker import Worker logger = logging.getLogger(__name__) -WORKER_FREQUENCY = app_config.get("MANIFEST_BACKFILL_WORKER_FREQUENCY", 60 * 60) +WORKER_FREQUENCY = app.config.get("MANIFEST_BACKFILL_WORKER_FREQUENCY", 60 * 60) class ManifestBackfillWorker(Worker): @@ -86,7 +86,7 @@ class ManifestBackfillWorker(Worker): return True -def create_gunicorn_worker() -> GunicornWorker: +def create_gunicorn_worker(): """ follows the gunicorn application factory pattern, enabling a quay worker to run as a gunicorn worker thread. @@ -95,14 +95,16 @@ def create_gunicorn_worker() -> GunicornWorker: utilizing this method will enforce a 1:1 quay worker to gunicorn worker ratio. """ - worker = GunicornWorker(__name__, ManifestBackfillWorker(), features.MANIFEST_SIZE_BACKFILL) + worker = GunicornWorker( + __name__, app, ManifestBackfillWorker(), features.MANIFEST_SIZE_BACKFILL + ) return worker def main(): logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False) - if app_config.get("ACCOUNT_RECOVERY_MODE", False): + if app.config.get("ACCOUNT_RECOVERY_MODE", False): logger.debug("Quay running in account recovery mode") while True: time.sleep(100000) diff --git a/workers/namespacegcworker.py b/workers/namespacegcworker.py index 364603ba6..0dfba8506 100644 --- a/workers/namespacegcworker.py +++ b/workers/namespacegcworker.py @@ -3,14 +3,13 @@ import time import features +from app import app, namespace_gc_queue, all_queues from data import model -from singletons.config import app_config -from singletons.workqueues import namespace_gc_queue, all_queues +from workers.queueworker import QueueWorker, WorkerSleepException from util.log import logfile_path from util.locking import GlobalLock, LockNotAcquiredException from util.metrics.prometheus import gc_namespaces_purged from workers.gunicorn_worker import GunicornWorker -from workers.queueworker import QueueWorker, WorkerSleepException logger = logging.getLogger(__name__) @@ -44,7 +43,7 @@ class NamespaceGCWorker(QueueWorker): gc_namespaces_purged.inc() -def create_gunicorn_worker() -> GunicornWorker: +def create_gunicorn_worker(): """ follows the gunicorn application factory pattern, enabling a quay worker to run as a gunicorn worker thread. 
@@ -58,14 +57,14 @@ def create_gunicorn_worker() -> GunicornWorker:
         poll_period_seconds=POLL_PERIOD_SECONDS,
         reservation_seconds=NAMESPACE_GC_TIMEOUT,
     )
-    worker = GunicornWorker(__name__, gc_worker, features.NAMESPACE_GARBAGE_COLLECTION)
+    worker = GunicornWorker(__name__, app, gc_worker, features.NAMESPACE_GARBAGE_COLLECTION)
     return worker


 if __name__ == "__main__":
     logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False)

-    if app_config.get("ACCOUNT_RECOVERY_MODE", False):
+    if app.config.get("ACCOUNT_RECOVERY_MODE", False):
         logger.debug("Quay running in account recovery mode")
         while True:
             time.sleep(100000)
@@ -75,7 +74,7 @@ if __name__ == "__main__":
         while True:
             time.sleep(100000)

-    GlobalLock.configure(app_config)
+    GlobalLock.configure(app.config)
     logger.debug("Starting namespace GC worker")
     worker = NamespaceGCWorker(
         namespace_gc_queue,
diff --git a/workers/notificationworker/notificationworker.py b/workers/notificationworker/notificationworker.py
index 7205099a6..0d5961a63 100644
--- a/workers/notificationworker/notificationworker.py
+++ b/workers/notificationworker/notificationworker.py
@@ -1,10 +1,9 @@
 import logging
 import time

+from app import app, notification_queue
 from notifications.notificationmethod import NotificationMethod, InvalidNotificationMethodException
 from notifications.notificationevent import NotificationEvent, InvalidNotificationEventException
-from singletons.config import app_config
-from singletons.workqueues import notification_queue
 from workers.notificationworker.models_pre_oci import pre_oci_model as model
 from workers.queueworker import QueueWorker, JobException
 from workers.gunicorn_worker import GunicornWorker
@@ -40,7 +39,7 @@ class NotificationWorker(QueueWorker):
             raise exc


-def create_gunicorn_worker() -> GunicornWorker:
+def create_gunicorn_worker():
     """
     follows the gunicorn application factory pattern, enabling
     a quay worker to run as a gunicorn worker thread.
@@ -52,12 +51,12 @@ def create_gunicorn_worker() -> GunicornWorker:
     note_worker = NotificationWorker(
         notification_queue, poll_period_seconds=10, reservation_seconds=30, retry_after_seconds=30
     )
-    worker = GunicornWorker(__name__, note_worker, True)
+    worker = GunicornWorker(__name__, app, note_worker, True)
     return worker


 if __name__ == "__main__":
-    if app_config.get("ACCOUNT_RECOVERY_MODE", False):
+    if app.config.get("ACCOUNT_RECOVERY_MODE", False):
         logger.debug("Quay running in account recovery mode")
         while True:
             time.sleep(100000)
diff --git a/workers/queuecleanupworker.py b/workers/queuecleanupworker.py
index ed9653e32..da6493336 100644
--- a/workers/queuecleanupworker.py
+++ b/workers/queuecleanupworker.py
@@ -3,9 +3,9 @@ import time

 from datetime import timedelta, datetime

+from app import app
 from data.database import UseThenDisconnect
 from data.queue import delete_expired
-from singletons.config import app_config
 from workers.worker import Worker
 from workers.gunicorn_worker import GunicornWorker

@@ -15,7 +15,7 @@ logger = logging.getLogger(__name__)
 DELETION_DATE_THRESHOLD = timedelta(days=1)
 DELETION_COUNT_THRESHOLD = 50
 BATCH_SIZE = 500
-QUEUE_CLEANUP_FREQUENCY = app_config.get("QUEUE_CLEANUP_FREQUENCY", 60 * 60 * 24)
+QUEUE_CLEANUP_FREQUENCY = app.config.get("QUEUE_CLEANUP_FREQUENCY", 60 * 60 * 24)


 class QueueCleanupWorker(Worker):
@@ -27,7 +27,7 @@ class QueueCleanupWorker(Worker):
         """
         Performs garbage collection on the queueitem table.
""" - with UseThenDisconnect(app_config): + with UseThenDisconnect(app.config): while True: # Find all queue items older than the threshold (typically a week) and delete them. expiration_threshold = datetime.now() - DELETION_DATE_THRESHOLD @@ -38,7 +38,7 @@ class QueueCleanupWorker(Worker): return -def create_gunicorn_worker() -> GunicornWorker: +def create_gunicorn_worker(): """ follows the gunicorn application factory pattern, enabling a quay worker to run as a gunicorn worker thread. @@ -47,12 +47,12 @@ def create_gunicorn_worker() -> GunicornWorker: utilizing this method will enforce a 1:1 quay worker to gunicorn worker ratio. """ - worker = GunicornWorker(__name__, QueueCleanupWorker(), True) + worker = GunicornWorker(__name__, app, QueueCleanupWorker(), True) return worker if __name__ == "__main__": - if app_config.get("ACCOUNT_RECOVERY_MODE", False): + if app.config.get("ACCOUNT_RECOVERY_MODE", False): logger.debug("Quay running in account recovery mode") while True: time.sleep(100000) diff --git a/workers/queueworker.py b/workers/queueworker.py index 588bb8dc0..89e3ef9b4 100644 --- a/workers/queueworker.py +++ b/workers/queueworker.py @@ -4,8 +4,8 @@ import time from threading import Event, Lock +from app import app from data.database import CloseForLongOperation -from singletons.config import app_config from workers.worker import Worker @@ -73,7 +73,7 @@ class QueueWorker(Worker): # Add the various operations. self.add_operation(self.poll_queue, self._poll_period_seconds) self.add_operation( - self.update_queue_metrics, app_config["QUEUE_WORKER_METRICS_REFRESH_SECONDS"] + self.update_queue_metrics, app.config["QUEUE_WORKER_METRICS_REFRESH_SECONDS"] ) self.add_operation(self.run_watchdog, self._watchdog_period_seconds) @@ -132,7 +132,7 @@ class QueueWorker(Worker): job_details = json.loads(current_queue_item.body) try: - with CloseForLongOperation(app_config): + with CloseForLongOperation(app.config): self.process_queue_item(job_details) self.mark_current_complete() diff --git a/workers/repomirrorworker/__init__.py b/workers/repomirrorworker/__init__.py index b5d284cbf..d2a03b565 100644 --- a/workers/repomirrorworker/__init__.py +++ b/workers/repomirrorworker/__init__.py @@ -8,6 +8,7 @@ from prometheus_client import Gauge import features +from app import app from data import database from data.encryption import DecryptionFailureException from data.model.repo_mirror import claim_mirror, release_mirror @@ -17,7 +18,6 @@ from data.registry_model import registry_model from data.database import RepoMirrorStatus from data.model.oci.tag import delete_tag, retarget_tag, lookup_alive_tags_shallow from notifications import spawn_notification -from singletons.config import app_config from util.audit import wrap_repository from workers.repomirrorworker.repo_mirror_model import repo_mirror_model as model @@ -32,7 +32,7 @@ unmirrored_repositories = Gauge( ) # Used only for testing - should not be set in production -TAG_ROLLBACK_PAGE_SIZE = app_config.get("REPO_MIRROR_TAG_ROLLBACK_PAGE_SIZE", 100) +TAG_ROLLBACK_PAGE_SIZE = app.config.get("REPO_MIRROR_TAG_ROLLBACK_PAGE_SIZE", 100) class PreemptedException(Exception): @@ -68,7 +68,7 @@ def process_mirrors(skopeo, token=None): logger.debug("Found no additional repositories to mirror") return next_token - with database.UseThenDisconnect(app_config): + with database.UseThenDisconnect(app.config): for mirror, abt, num_remaining in iterator: try: perform_mirror(skopeo, mirror) @@ -173,7 +173,7 @@ def perform_mirror(skopeo, mirror): raise dest_server = ( 
-        app_config.get("REPO_MIRROR_SERVER_HOSTNAME", None) or app_config["SERVER_HOSTNAME"]
+        app.config.get("REPO_MIRROR_SERVER_HOSTNAME", None) or app.config["SERVER_HOSTNAME"]
     )

     for tag in tags:
@@ -184,12 +184,12 @@ def perform_mirror(skopeo, mirror):
             mirror.repository.name,
             tag,
         )
-        with database.CloseForLongOperation(app_config):
+        with database.CloseForLongOperation(app.config):
             result = skopeo.copy(
                 src_image,
                 dest_image,
                 src_tls_verify=mirror.external_registry_config.get("verify_tls", True),
-                dest_tls_verify=app_config.get(
+                dest_tls_verify=app.config.get(
                     "REPO_MIRROR_TLS_VERIFY", True
                 ),  # TODO: is this a config choice or something else?
                 src_username=username,
@@ -302,7 +302,7 @@ def get_all_tags(skopeo, mirror):
         mirror.external_registry_password.decrypt() if mirror.external_registry_password else None
     )

-    with database.CloseForLongOperation(app_config):
+    with database.CloseForLongOperation(app.config):
         result = skopeo.tags(
             "docker://%s" % (mirror.external_reference),
             mirror.root_rule.rule_value,
diff --git a/workers/repomirrorworker/repomirrorworker.py b/workers/repomirrorworker/repomirrorworker.py
index a70ce1419..50003993c 100644
--- a/workers/repomirrorworker/repomirrorworker.py
+++ b/workers/repomirrorworker/repomirrorworker.py
@@ -38,7 +38,7 @@ class RepoMirrorWorker(Worker):
                 break


-def create_gunicorn_worker() -> GunicornWorker:
+def create_gunicorn_worker():
     """
     follows the gunicorn application factory pattern, enabling
     a quay worker to run as a gunicorn worker thread.
@@ -47,7 +47,7 @@ def create_gunicorn_worker() -> GunicornWorker:

     utilizing this method will enforce a 1:1 quay worker to gunicorn worker ratio.
     """
-    worker = GunicornWorker(__name__, RepoMirrorWorker(), features.REPO_MIRROR)
+    worker = GunicornWorker(__name__, app, RepoMirrorWorker(), features.REPO_MIRROR)
     return worker


diff --git a/workers/repositoryactioncounter.py b/workers/repositoryactioncounter.py
index fb93cc36c..f815f12ba 100644
--- a/workers/repositoryactioncounter.py
+++ b/workers/repositoryactioncounter.py
@@ -6,12 +6,12 @@ from math import log10

 import features

+from app import app  # This is required to initialize the database.
 from data import model, database
 from data.logs_model import logs_model
-from singletons.config import app_config
 from util.migrate.allocator import yield_random_entries
+from workers.worker import Worker, with_exponential_backoff
 from workers.gunicorn_worker import GunicornWorker
-from workers.worker import Worker

 logger = logging.getLogger(__name__)

@@ -105,7 +105,7 @@ class RepositoryActionCountWorker(Worker):
         return True


-def create_gunicorn_worker() -> GunicornWorker:
+def create_gunicorn_worker():
     """
     follows the gunicorn application factory pattern, enabling
     a quay worker to run as a gunicorn worker thread.
@@ -115,13 +115,13 @@ def create_gunicorn_worker() -> GunicornWorker:

     utilizing this method will enforce a 1:1 quay worker to gunicorn worker ratio.
""" worker = GunicornWorker( - __name__, RepositoryActionCountWorker(), features.REPOSITORY_ACTION_COUNTER + __name__, app, RepositoryActionCountWorker(), features.REPOSITORY_ACTION_COUNTER ) return worker if __name__ == "__main__": - if app_config.get("ACCOUNT_RECOVERY_MODE", False): + if app.config.get("ACCOUNT_RECOVERY_MODE", False): logger.debug("Quay running in account recovery mode") while True: time.sleep(100000) diff --git a/workers/repositorygcworker.py b/workers/repositorygcworker.py index d777d8624..ce495b81d 100644 --- a/workers/repositorygcworker.py +++ b/workers/repositorygcworker.py @@ -3,13 +3,12 @@ import time import features +from app import repository_gc_queue, all_queues, app from data import model, database -from singletons.config import app_config -from singletons.workqueues import repository_gc_queue +from workers.queueworker import QueueWorker, WorkerSleepException from util.log import logfile_path from util.locking import GlobalLock, LockNotAcquiredException from workers.gunicorn_worker import GunicornWorker -from workers.queueworker import QueueWorker, WorkerSleepException logger = logging.getLogger(__name__) @@ -49,7 +48,7 @@ class RepositoryGCWorker(QueueWorker): raise Exception("GC interrupted; will retry") -def create_gunicorn_worker() -> GunicornWorker: +def create_gunicorn_worker(): """ follows the gunicorn application factory pattern, enabling a quay worker to run as a gunicorn worker thread. @@ -64,14 +63,14 @@ def create_gunicorn_worker() -> GunicornWorker: reservation_seconds=REPOSITORY_GC_TIMEOUT, ) - worker = GunicornWorker(__name__, gc_worker, features.REPOSITORY_GARBAGE_COLLECTION) + worker = GunicornWorker(__name__, app, gc_worker, features.REPOSITORY_GARBAGE_COLLECTION) return worker if __name__ == "__main__": logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False) - if app_config.get("ACCOUNT_RECOVERY_MODE", False): + if app.config.get("ACCOUNT_RECOVERY_MODE", False): logger.debug("Quay running in account recovery mode") while True: time.sleep(100000) @@ -81,7 +80,7 @@ if __name__ == "__main__": while True: time.sleep(100000) - GlobalLock.configure(app_config) + GlobalLock.configure(app.config) logger.debug("Starting repository GC worker") worker = RepositoryGCWorker( repository_gc_queue, diff --git a/workers/securityscanningnotificationworker.py b/workers/securityscanningnotificationworker.py index bbc8a077f..f87cdfc5f 100644 --- a/workers/securityscanningnotificationworker.py +++ b/workers/securityscanningnotificationworker.py @@ -132,7 +132,7 @@ class SecurityScanningNotificationWorker(QueueWorker): self.extend_processing(_PROCESSING_SECONDS_EXPIRATION, job_details) -def create_gunicorn_worker() -> GunicornWorker: +def create_gunicorn_worker(): """ follows the gunicorn application factory pattern, enabling a quay worker to run as a gunicorn worker thread. 
@@ -145,7 +145,7 @@ def create_gunicorn_worker() -> GunicornWorker: note_worker = SecurityScanningNotificationWorker( secscan_notification_queue, poll_period_seconds=_POLL_PERIOD_SECONDS ) - worker = GunicornWorker(__name__, note_worker, feature_flag) + worker = GunicornWorker(__name__, app, note_worker, feature_flag) return worker diff --git a/workers/securityworker/securityworker.py b/workers/securityworker/securityworker.py index 3a41078c8..ba22ebf31 100644 --- a/workers/securityworker/securityworker.py +++ b/workers/securityworker/securityworker.py @@ -52,7 +52,7 @@ class SecurityWorker(Worker): self._model.perform_indexing_recent_manifests(batch_size) -def create_gunicorn_worker() -> GunicornWorker: +def create_gunicorn_worker(): """ follows the gunicorn application factory pattern, enabling a quay worker to run as a gunicorn worker thread. @@ -62,7 +62,7 @@ def create_gunicorn_worker() -> GunicornWorker: utilizing this method will enforce a 1:1 quay worker to gunicorn worker ratio. """ app.register_blueprint(v2_bp, url_prefix="/v2") - worker = GunicornWorker(__name__, SecurityWorker(), features.SECURITY_SCANNER) + worker = GunicornWorker(__name__, app, SecurityWorker(), features.SECURITY_SCANNER) return worker diff --git a/workers/servicekeyworker/servicekeyworker.py b/workers/servicekeyworker/servicekeyworker.py index d9a1e4a84..0e173faf7 100644 --- a/workers/servicekeyworker/servicekeyworker.py +++ b/workers/servicekeyworker/servicekeyworker.py @@ -56,7 +56,7 @@ class ServiceKeyWorker(Worker): instance_key_renewal_self.labels(True).inc() -def create_gunicorn_worker() -> GunicornWorker: +def create_gunicorn_worker(): """ follows the gunicorn application factory pattern, enabling a quay worker to run as a gunicorn worker thread. @@ -65,7 +65,7 @@ def create_gunicorn_worker() -> GunicornWorker: utilizing this method will enforce a 1:1 quay worker to gunicorn worker ratio. """ - worker = GunicornWorker(__name__, ServiceKeyWorker(), True) + worker = GunicornWorker(__name__, app, ServiceKeyWorker(), True) return worker diff --git a/workers/storagereplication.py b/workers/storagereplication.py index eff1f36c7..8fec70dae 100644 --- a/workers/storagereplication.py +++ b/workers/storagereplication.py @@ -170,7 +170,7 @@ class StorageReplicationWorker(QueueWorker): ) -def create_gunicorn_worker() -> GunicornWorker: +def create_gunicorn_worker(): """ follows the gunicorn application factory pattern, enabling a quay worker to run as a gunicorn worker thread. @@ -193,7 +193,7 @@ def create_gunicorn_worker() -> GunicornWorker: poll_period_seconds=POLL_PERIOD_SECONDS, reservation_seconds=RESERVATION_SECONDS, ) - worker = GunicornWorker(__name__, repl_worker, feature_flag) + worker = GunicornWorker(__name__, app, repl_worker, feature_flag) return worker diff --git a/workers/teamsyncworker/teamsyncworker.py b/workers/teamsyncworker/teamsyncworker.py index d9d1c192b..193af9fbc 100644 --- a/workers/teamsyncworker/teamsyncworker.py +++ b/workers/teamsyncworker/teamsyncworker.py @@ -30,7 +30,7 @@ class TeamSynchronizationWorker(Worker): sync_teams_to_groups(authentication, STALE_CUTOFF) -def create_gunicorn_worker() -> GunicornWorker: +def create_gunicorn_worker(): """ follows the gunicorn application factory pattern, enabling a quay worker to run as a gunicorn worker thread. @@ -40,7 +40,7 @@ def create_gunicorn_worker() -> GunicornWorker: utilizing this method will enforce a 1:1 quay worker to gunicorn worker ratio. 
""" feature_flag = (features.TEAM_SYNCING) and (authentication.federated_service) - worker = GunicornWorker(__name__, TeamSynchronizationWorker(), feature_flag) + worker = GunicornWorker(__name__, app, TeamSynchronizationWorker(), feature_flag) return worker diff --git a/workers/worker.py b/workers/worker.py index 7e7265a24..38d76fbca 100644 --- a/workers/worker.py +++ b/workers/worker.py @@ -11,8 +11,9 @@ from threading import Event from apscheduler.schedulers.background import BackgroundScheduler from raven import Client + +from app import app from data.database import UseThenDisconnect -from singletons.config import app_config from util.log import logfile_path logger = logging.getLogger(__name__) @@ -62,9 +63,9 @@ class Worker(object): self._terminated = Event() self._raven_client = None - if app_config.get("EXCEPTION_LOG_TYPE", "FakeSentry") == "Sentry": + if app.config.get("EXCEPTION_LOG_TYPE", "FakeSentry") == "Sentry": worker_name = "%s:worker-%s" % (socket.gethostname(), self.__class__.__name__) - self._raven_client = Client(app_config.get("SENTRY_DSN", ""), name=worker_name) + self._raven_client = Client(app.config.get("SENTRY_DSN", ""), name=worker_name) def is_healthy(self): return not self._stop.is_set() @@ -82,7 +83,7 @@ class Worker(object): @wraps(operation_func) def _operation_func(): try: - with UseThenDisconnect(app_config): + with UseThenDisconnect(app.config): return operation_func() except Exception: logger.exception("Operation raised exception") @@ -102,12 +103,12 @@ class Worker(object): def start(self): logging.config.fileConfig(logfile_path(debug=False), disable_existing_loggers=False) - if not app_config.get("SETUP_COMPLETE", False): + if not app.config.get("SETUP_COMPLETE", False): logger.info("Product setup is not yet complete; skipping worker startup") self._setup_and_wait_for_shutdown() return - if app_config.get("REGISTRY_STATE", "normal") == "readonly": + if app.config.get("REGISTRY_STATE", "normal") == "readonly": logger.info("Product is in read-only mode; skipping worker startup") self._setup_and_wait_for_shutdown() return @@ -117,7 +118,7 @@ class Worker(object): self._sched.start() for operation_func, operation_sec in self._operations: start_date = datetime.now() + timedelta(seconds=0.001) - if app_config.get("STAGGER_WORKERS"): + if app.config.get("STAGGER_WORKERS"): start_date += timedelta(seconds=randint(1, operation_sec)) logger.debug("First run scheduled for %s", start_date) self._sched.add_job(