mirror of
https://github.com/quay/quay.git
synced 2025-04-18 10:44:06 +03:00
* chore: drop deprecated tables and remove unused code * isort imports * migration: check for table existence before drop
541 lines
19 KiB
Python
541 lines
19 KiB
Python
import datetime
|
|
import json
|
|
import logging
|
|
import re
|
|
import time
|
|
from abc import ABCMeta, abstractmethod
|
|
from collections import namedtuple
|
|
from contextlib import ContextDecorator
|
|
from enum import IntEnum, unique
|
|
|
|
import redis
|
|
|
|
from util import slash_join
|
|
from util.expiresdict import ExpiresDict
|
|
|
|
logger = logging.getLogger(__name__)

# Timing knobs, all in seconds.
ONE_DAY = 24 * 60 * 60
ORCHESTRATOR_UNAVAILABLE_SLEEP_DURATION = 5
DEFAULT_LOCK_EXPIRATION = 10000

# Suffixes slash-joined onto a key to name its bookkeeping companion keys.
REDIS_EXPIRING_SUFFIX = "/expiring"
REDIS_EXPIRED_SUFFIX = "/expired"

# Base name of the pubsub channel on which key events are published.
REDIS_DEFAULT_PUBSUB_KEY = "orchestrator_events"

# Values of the "type" field of messages delivered to pubsub handlers.
REDIS_EVENT_KIND_MESSAGE = "message"
REDIS_EVENT_KIND_PMESSAGE = "pmessage"

# TTL value compared against to recognise keys that have no expiration.
REDIS_NONEXPIRING_KEY = -1

# Redis configuration flags used to watch [K]eyspace and e[x]pired events on keys.
# For more info, see https://redis.io/topics/notifications#configuration
REDIS_KEYSPACE_EVENT_CONFIG_KEY = "notify-keyspace-events"
REDIS_KEYSPACE_EXPIRED_EVENT_CONFIG_VALUE = "Kx"

# Keyspace notification channel, formatted with (db, key).
REDIS_KEYSPACE_KEY_PATTERN = "__keyspace@%s__:%s"
# Channel pattern for "<key>/expiring" keys, and a regex capturing (db, key) from such a channel.
REDIS_EXPIRED_KEYSPACE_PATTERN = slash_join(REDIS_KEYSPACE_KEY_PATTERN, REDIS_EXPIRING_SUFFIX)
REDIS_EXPIRED_KEYSPACE_REGEX = re.compile(REDIS_EXPIRED_KEYSPACE_PATTERN % (r"(\S+)", r"(\S+)"))
|
|
|
|
|
|
def orchestrator_from_config(manager_config, canceller_only=False):
    """
    Build the Orchestrator described by a build manager configuration.

    ``manager_config["ORCHESTRATOR"]`` maps keys of the form ``<NAME>_<OPTION>`` to values.
    The lower-cased ``<NAME>`` of the first key selects the implementation (via
    orchestrator_by_name), and each lower-cased ``<OPTION>`` becomes a constructor keyword
    argument. A top-level ``ORCHESTRATOR_PREFIX`` is forwarded as ``orchestrator_prefix``.

    :param manager_config: the configuration for the orchestrator
    :type manager_config: dict
    :param canceller_only: forwarded to the orchestrator constructor as ``canceller_only``
    :type canceller_only: bool
    :rtype: :class: Orchestrator
    :raises AssertionError: if an ``ORCHESTRATOR`` key ends with ``_PREFIX`` (legacy layout)
    """
    options = manager_config["ORCHESTRATOR"]

    # Per-orchestrator "*_PREFIX" options are a legacy layout; only ORCHESTRATOR_PREFIX is allowed.
    for option_name in list(options.keys()):
        pieces = option_name.split("_")
        if len(pieces) > 1 and pieces[-1].lower() == "prefix":
            raise AssertionError("legacy prefix used, use ORCHESTRATOR_PREFIX instead")

    # e.g. "REDIS_HOST" -> implementation "redis"; only the first key is consulted.
    orchestrator_name = list(options.keys())[0].split("_", 1)[0].lower()

    # e.g. "REDIS_HOST" -> keyword argument "host".
    orchestrator_kwargs = {}
    for option_name, option_value in options.items():
        orchestrator_kwargs[option_name.lower().split("_", 1)[1]] = option_value

    if manager_config.get("ORCHESTRATOR_PREFIX") is not None:
        orchestrator_kwargs["orchestrator_prefix"] = manager_config["ORCHESTRATOR_PREFIX"]
    orchestrator_kwargs["canceller_only"] = canceller_only

    logger.debug(
        "attempting to create orchestrator %s with kwargs %s",
        orchestrator_name,
        orchestrator_kwargs,
    )
    return orchestrator_by_name(orchestrator_name, **orchestrator_kwargs)
|
|
|
|
|
|
def orchestrator_by_name(name, **kwargs):
    """
    Instantiate the orchestrator implementation registered under ``name``.

    ``"redis"`` selects RedisOrchestrator; ``"mem"`` and any unrecognised name fall back to
    MemoryOrchestrator. All keyword arguments are passed straight to the constructor.
    """
    if name == "redis":
        orchestrator_class = RedisOrchestrator
    else:
        orchestrator_class = MemoryOrchestrator
    return orchestrator_class(**kwargs)
|
|
|
|
|
|
class OrchestratorError(Exception):
    """Base class for failures of an Orchestrator's backing store (other than missing keys)."""


# TODO: an earlier note proposed replacing this with the builtin ConnectionError once the codebase
# was on Python 3 (which this module now is). Re-parenting it would change which ``except``
# clauses catch it, so it intentionally remains an OrchestratorError subclass for now.
class OrchestratorConnectionError(OrchestratorError):
    """Raised when the backing store is unreachable (RedisOrchestrator wraps redis.ConnectionError)."""
|
|
|
|
|
|
@unique
class KeyEvent(IntEnum):
    """Kind of key change reported to on_key_change callbacks; published as its int value."""

    CREATE = 1  # a key was written where none existed
    SET = 2  # an existing key was overwritten
    DELETE = 3  # a key was removed via delete_key
    EXPIRE = 4  # a key's expiration elapsed
|
|
|
|
|
|
class KeyChange(namedtuple("KeyChange", "event key value")):
    """
    Immutable record of one key change, as delivered to on_key_change callbacks.

    ``event`` is a KeyEvent, ``key`` the affected key and ``value`` the value associated with it.
    """
|
|
|
|
|
|
class Orchestrator(metaclass=ABCMeta):
    """
    Interface used to synchronize build state across build managers.

    Implementations are key-value stores that can notify watchers about changes to keys.
    Looking up a missing key raises KeyError; any other failure of the backing store should be
    raised as an OrchestratorError.
    """

    @abstractmethod
    def on_key_change(self, key, callback, restarter=None):
        """
        Register ``callback`` to be notified about changes to keys under ``key``.

        :param key: the key (prefix) to watch
        :param callback: called with a single KeyChange argument for each change
        :param restarter: accepted for interface compatibility; unused by the implementations
                          in this module
        """

    @abstractmethod
    def get_prefixed_keys(self, prefix):
        """
        Collect every stored key that begins with ``prefix``.

        :returns: a dict of key value pairs beginning with prefix
        :rtype: {str: str}
        """

    @abstractmethod
    def get_key(self, key):
        """
        Look up a single key.

        :returns: the value stored at the provided key
        :rtype: str
        :raises KeyError: if the key does not exist
        """

    @abstractmethod
    def set_key(self, key, value, overwrite=False, expiration=None):
        """
        Store ``value`` under ``key`` and notify watchers.

        :param key: the identifier for the value
        :type key: str
        :param value: the value being stored
        :type value: str
        :param overwrite: when False, a KeyError is raised if the key already exists; when True,
                          only an existing key is updated (mirroring Redis ``SET ... XX``)
        :type overwrite: bool
        :param expiration: the duration in seconds that a key should be available
        :type expiration: int
        """

    @abstractmethod
    def delete_key(self, key):
        """
        Delete a key that has been set in the orchestrator and notify watchers.

        :param key: the identifier for the key
        :type key: str
        :raises KeyError: if the key does not exist
        """

    @abstractmethod
    def lock(self, key, expiration=DEFAULT_LOCK_EXPIRATION):
        """
        Try to take a cluster-wide lock for synchronizing exclusive operations.

        :param key: the identifier for the lock
        :type key: str
        :param expiration: the duration in seconds until the lock expires
        :type expiration: int
        :returns: whether or not the lock was acquired
        :rtype: bool
        """

    @abstractmethod
    def shutdown(self):
        """Release any resources still held by the Orchestrator."""
|
|
|
|
|
|
def _sleep_orchestrator():
    """
    Back off after a failure (such as a ConnectionError) talking to the orchestrator.

    Blocks the calling thread for ORCHESTRATOR_UNAVAILABLE_SLEEP_DURATION seconds and logs,
    via ``logger.exception`` (so the active exception's traceback is included), once before
    and once after the pause.
    """
    pause = ORCHESTRATOR_UNAVAILABLE_SLEEP_DURATION
    logger.exception(
        "Connecting to orchestrator failed; sleeping for %s and then trying again",
        pause,
    )
    time.sleep(pause)
    logger.exception(
        "Connecting to orchestrator failed; slept for %s and now trying again",
        pause,
    )
|
|
|
|
|
|
class MemoryOrchestrator(Orchestrator):
    """
    Orchestrator that keeps all state in a process-local ExpiresDict.

    State and callbacks live only in this object, so nothing is shared with other processes.
    Callbacks are notified of CREATE, SET and DELETE changes; this implementation never emits
    EXPIRE events.
    """

    def __init__(self, **kwargs):
        # Keyword arguments (e.g. canceller_only / orchestrator_prefix passed by
        # orchestrator_from_config) are accepted for interface parity and ignored.
        self.state = ExpiresDict()
        self.callbacks = {}

    def _callbacks_prefixed(self, key):
        """Return the callbacks whose registered prefix ``key`` starts with."""
        # Materialize a list rather than a lazy generator: a callback that registers another
        # watcher through on_key_change would otherwise mutate self.callbacks mid-iteration.
        return [callback for (prefix, callback) in self.callbacks.items() if key.startswith(prefix)]

    def on_key_change(self, key, callback, restarter=None):
        """Register ``callback`` for changes to keys starting with ``key`` (restarter is unused)."""
        self.callbacks[key] = callback

    def get_prefixed_keys(self, prefix):
        """Return live key/value pairs under ``prefix``, hiding the bookkeeping companion keys."""
        return {
            k: value
            for (k, value) in list(self.state.items())
            if k.startswith(prefix)
            and not k.endswith(REDIS_EXPIRED_SUFFIX)
            and not k.endswith(REDIS_EXPIRING_SUFFIX)
        }

    def get_key(self, key):
        """Return the value at ``key``; raises KeyError if it is missing."""
        return self.state[key]

    def set_key(self, key, value, overwrite=False, expiration=None):
        """
        Store ``value`` at ``key`` and notify matching callbacks.

        :raises KeyError: if the key exists and ``overwrite`` is False
        """
        preexisting_key = key in self.state
        if preexisting_key and not overwrite:
            raise KeyError(key)

        # Simulate redis' behavior when using xx and the key does not exist.
        if not preexisting_key and overwrite:
            return

        absolute_expiration = None
        if expiration is not None:
            absolute_expiration = datetime.datetime.now() + datetime.timedelta(seconds=expiration)

        self.state.set(key, value, expires=absolute_expiration)
        # Companion "<key>/expiring" entry, mirroring the key layout RedisOrchestrator uses.
        self.state.set(slash_join(key, REDIS_EXPIRING_SUFFIX), value, expires=absolute_expiration)

        event = KeyEvent.CREATE if not preexisting_key else KeyEvent.SET
        for callback in self._callbacks_prefixed(key):
            callback(KeyChange(event, key, value))

    def delete_key(self, key):
        """
        Remove ``key`` (and its companion entry) and notify matching callbacks.

        :raises KeyError: if the key is missing
        """
        value = self.state[key]
        del self.state[key]

        # set_key also wrote "<key>/expiring"; drop it too so it does not linger after the key is
        # gone (RedisOrchestrator.delete_key likewise deletes its companion keys).
        expiring_key = slash_join(key, REDIS_EXPIRING_SUFFIX)
        if expiring_key in self.state:
            del self.state[expiring_key]

        for callback in self._callbacks_prefixed(key):
            callback(KeyChange(KeyEvent.DELETE, key, value))

    def lock(self, key, expiration=DEFAULT_LOCK_EXPIRATION):
        """Take the lock by creating ``key``; returns False if it is already held."""
        try:
            self.set_key(key, "", overwrite=False, expiration=expiration)
        except KeyError:
            return False
        return True

    def shutdown(self):
        """Drop all state and callbacks; the instance must not be used afterwards."""
        self.state = None
        self.callbacks = None
|
|
|
|
|
|
class RedisOrchestrator(Orchestrator):
    """
    Orchestrator backed by Redis.

    Changes made through set_key/delete_key are published as JSON messages on a pubsub channel
    named ``[orchestrator_prefix/]orchestrator_events``. Expirations are detected through Redis
    keyspace notifications on companion ``<key>/expiring`` keys, because an expired-key
    notification does not carry the original value.

    With ``canceller_only=True`` no pubsub subscriptions are created, and the methods guarded by
    ``assert not self.is_canceller_only`` must not be called.
    """

    def __init__(
        self,
        host="127.0.0.1",
        port=6379,
        password=None,
        db=0,
        cert_and_key=None,
        ca_cert=None,
        ssl=False,
        skip_keyspace_event_setup=False,
        canceller_only=False,
        **kwargs,
    ):
        self.is_canceller_only = canceller_only
        (cert, key) = tuple(cert_and_key) if cert_and_key is not None else (None, None)
        self._client = redis.StrictRedis(
            host=host,
            port=port,
            password=password,
            db=db,
            ssl_certfile=cert,
            ssl_keyfile=key,
            ssl_ca_certs=ca_cert,
            ssl=ssl,
            socket_connect_timeout=1,
            health_check_interval=2,
        )

        self._shutting_down = False
        self._watched_keys = {}
        self._pubsub_key = slash_join(
            kwargs.get("orchestrator_prefix", ""), REDIS_DEFAULT_PUBSUB_KEY
        ).lstrip("/")

        if not self.is_canceller_only:
            # sleep_time is not really calling time.sleep(). It is the socket's timeout value.
            # run_in_thread uses an event loop that uses a non-blocking `parse_response` of the PubSub object.
            # This means the event loop will return immediately even if there are no new messages.
            # Setting a value other than the default 0 prevents that thread from exhausting CPU time.
            # https://github.com/andymccurdy/redis-py/issues/821

            # Configure a subscription to watch events that the orchestrator manually publishes.
            logger.debug("creating pubsub with key %s", self._pubsub_key)
            self._pubsub = self._client.pubsub()
            self._pubsub.subscribe(**{self._pubsub_key: self._published_key_handler})
            self._pubsub_thread = self._pubsub.run_in_thread(daemon=True, sleep_time=5)

            # Configure a subscription to watch expired keyspace events.
            # skip_keyspace_event_setup only skips the CONFIG SET (presumably for servers where
            # CONFIG is not permitted); the subscription itself is always created.
            if not skip_keyspace_event_setup:
                self._client.config_set(
                    REDIS_KEYSPACE_EVENT_CONFIG_KEY, REDIS_KEYSPACE_EXPIRED_EVENT_CONFIG_VALUE
                )

            self._pubsub_expiring = self._client.pubsub()
            self._pubsub_expiring.psubscribe(
                **{REDIS_EXPIRED_KEYSPACE_PATTERN % (db, "*"): self._expiring_key_handler}
            )
            self._pubsub_expiring_thread = self._pubsub_expiring.run_in_thread(
                daemon=True, sleep_time=5
            )

    def _expiring_key_handler(self, message):
        """
        Pubsub handler for keyspace notifications on ``<key>/expiring`` keys.

        When the ``/expiring`` companion of ``key`` expires, the value still stored at ``key`` is
        copied to ``<key>/expired`` and ``key`` is deleted; then every watcher whose prefix
        matches ``key`` receives a KeyEvent.EXPIRE change carrying that value.
        """
        # Pre-bind everything used after the try block and in its log calls, so a failure part
        # way through cannot raise UnboundLocalError on the pubsub worker thread.
        key = None
        expired_value = None
        try:
            message_tup = (
                message.get("type"),
                message.get("pattern").decode("utf-8"),
                message.get("channel").decode("utf-8"),
                message.get("data").decode("utf-8"),
            )
            if not self._is_expired_keyspace_event(message_tup):
                return

            # Get the value of the original key before the expiration happened.
            key = self._key_from_expiration(message_tup)
            expired_value = self._client.get(key)

            # Mark key as expired. This key is used to track post job cleanup in the callback,
            # to allow another manager to pickup the cleanup if this fails.
            if expired_value:
                self._client.set(slash_join(key, REDIS_EXPIRED_SUFFIX), expired_value)
                self._client.delete(key)
        except redis.ConnectionError:
            _sleep_orchestrator()
        except redis.RedisError as err:
            logger.exception("Redis exception watching redis expirations: %s - %s", key, err)
        except Exception as err:
            logger.exception("Unknown exception watching redis expirations: %s - %s", key, err)

        if key is None or expired_value is None:
            return

        # NOTE(review): expired_value is the raw bytes returned by the client, whereas published
        # events deliver decoded str values to the same callbacks.
        # Snapshot the watchers: this runs on the pubsub worker thread while on_key_change may
        # register new ones from another thread.
        for watched_key, callback in list(self._watched_keys.items()):
            if key.startswith(watched_key):
                callback(KeyChange(KeyEvent.EXPIRE, key, expired_value))

    def _published_key_handler(self, message):
        """Pubsub handler for events published by _publish; forwards them to matching watchers."""
        if message.get("type") != REDIS_EVENT_KIND_MESSAGE:
            return

        try:
            keychange = self._publish_to_keychange(message.get("data").decode("utf-8"))
        except Exception:
            # A malformed payload must not kill the pubsub worker thread.
            logger.exception("Unknown exception handling published orchestrator event: %s", message)
            return

        # Snapshot: on_key_change may add watchers from another thread while this one iterates.
        for watched_key, callback in list(self._watched_keys.items()):
            if keychange.key.startswith(watched_key):
                callback(keychange)

    def on_key_change(self, key, callback, restarter=None):
        """Register ``callback`` for changes to keys starting with ``key`` (restarter is unused)."""
        assert not self.is_canceller_only

        logger.debug("watching key: %s", key)
        self._watched_keys[key] = callback

    @staticmethod
    def _is_expired_keyspace_event(event_result):
        """
        Sanity check that this isn't an unrelated keyspace event.

        There could be a more efficient keyspace event config to avoid this client-side filter.
        """
        if event_result is None:
            return False

        (redis_event, _pattern, matched_key, expired) = event_result
        return (
            redis_event == REDIS_EVENT_KIND_PMESSAGE
            and expired == "expired"
            and REDIS_EXPIRED_KEYSPACE_REGEX.match(matched_key) is not None
        )

    @staticmethod
    def _key_from_expiration(event_result):
        """Extract the original key from a ``<key>/expiring`` keyspace notification."""
        (_redis_event, _pattern, matched_key, _expired) = event_result
        return REDIS_EXPIRED_KEYSPACE_REGEX.match(matched_key).groups()[1]

    @staticmethod
    def _publish_to_keychange(event_value):
        """Decode a JSON payload written by _publish into a KeyChange."""
        e = json.loads(event_value)
        return KeyChange(KeyEvent(e["event"]), e["key"], e["value"])

    def get_prefixed_keys(self, prefix):
        """
        Return the non-expiring keys under ``prefix`` with their decoded values.

        Bookkeeping ``/expiring`` and ``/expired`` keys are skipped, as are keys with a TTL.
        A key deleted while the listing is in progress is simply left out.

        :raises OrchestratorConnectionError: if Redis is unreachable
        :raises OrchestratorError: for any other Redis failure
        """
        assert not self.is_canceller_only

        results = {}
        try:
            # TODO: This can probably be done with redis pipelines to make it transactional.
            for raw_key in self._client.keys(prefix + "*"):
                key = raw_key.decode("utf-8")
                if key.endswith((REDIS_EXPIRING_SUFFIX, REDIS_EXPIRED_SUFFIX)):
                    continue

                # Only redis keys without expirations are live build manager keys.
                if self._client.ttl(raw_key) != REDIS_NONEXPIRING_KEY:
                    continue

                value = self._client.get(raw_key)
                if value is None:
                    # Deleted between KEYS and GET; omit it from this snapshot.
                    continue
                results[key] = value.decode("utf-8")
        except redis.ConnectionError as rce:
            raise OrchestratorConnectionError(rce) from rce
        except redis.RedisError as err:
            raise OrchestratorError(err) from err

        return results

    def _key_is_expired(self, key):
        """Return True if an ``<key>/expired`` marker is stored for ``key``."""
        return self._client.get(slash_join(key, REDIS_EXPIRED_SUFFIX)) is not None

    def get_key(self, key):
        """
        Return the decoded value stored at ``key``.

        :raises KeyError: if the key does not exist (a leftover ``/expired`` marker is removed)
        :raises OrchestratorConnectionError: if Redis is unreachable
        :raises OrchestratorError: for any other Redis failure
        """
        assert not self.is_canceller_only

        try:
            value = self._client.get(key)
            if value is None:
                # If expired, the expired key should have been removed but still exists.
                # Delete the key if that's the case.
                if self._key_is_expired(key):
                    self._client.delete(slash_join(key, REDIS_EXPIRED_SUFFIX))
                raise KeyError(key)
        except redis.ConnectionError as rce:
            raise OrchestratorConnectionError(rce) from rce
        except redis.RedisError as err:
            raise OrchestratorError(err) from err

        return value.decode("utf-8")

    def set_key(self, key, value, overwrite=False, expiration=None):
        """
        Store ``value`` at ``key`` and publish a CREATE (new key) or SET (overwrite) event.

        With ``overwrite=True`` and no existing key nothing is written or published (SET XX).

        :raises KeyError: if the key exists and ``overwrite`` is False
        :raises OrchestratorConnectionError: if Redis is unreachable
        :raises OrchestratorError: for any other Redis failure
        """
        try:
            # NX/XX fold the existence check into the write, so two managers racing through
            # lock() cannot both succeed. The client returns a falsy value when nothing was set.
            was_set = self._client.set(key, value, nx=not overwrite, xx=overwrite)
            if not was_set:
                if not overwrite:
                    raise KeyError(key)
                # Simulate nothing more: XX on a missing key wrote nothing, so publish nothing.
                return

            if expiration is not None:
                # Set an expiration in case the handler was not able to delete the original key.
                # The extra leeway is so the expire event handler has time to get the original
                # value and publish the event.
                self._client.expire(key, expiration + ONE_DAY)
                # The "expiring/*" keys are only used to publish the EXPIRE event. A separate key is
                # needed because the EXPIRE event does not include the original key value.
                self._client.set(slash_join(key, REDIS_EXPIRING_SUFFIX), "", ex=expiration)
                # Remove any expired key that might have previously been created but not removed
                # if a new expiration is set.
                self._client.delete(slash_join(key, REDIS_EXPIRED_SUFFIX))

            key_event = KeyEvent.SET if overwrite else KeyEvent.CREATE
            self._publish(event=key_event, key=key, value=value)
        except redis.ConnectionError as rce:
            raise OrchestratorConnectionError(rce) from rce
        except redis.RedisError as err:
            raise OrchestratorError(err) from err

    def _publish(self, **kwargs):
        """Publish ``kwargs`` (event coerced to int) as JSON on the orchestrator channel."""
        kwargs["event"] = int(kwargs["event"])
        event_json = json.dumps(kwargs)
        logger.debug("publishing event: %s", event_json)
        self._client.publish(self._pubsub_key, event_json)

    def delete_key(self, key):
        """
        Delete ``key`` and its companion keys, then publish a DELETE event.

        :raises KeyError: if the key does not exist
        :raises OrchestratorConnectionError: if Redis is unreachable
        :raises OrchestratorError: for any other Redis failure
        """
        assert not self.is_canceller_only

        try:
            value = self._client.get(key)
            if value is None:
                raise KeyError(key)
            # One DEL for the key and both bookkeeping companions.
            self._client.delete(
                key,
                slash_join(key, REDIS_EXPIRING_SUFFIX),
                slash_join(key, REDIS_EXPIRED_SUFFIX),
            )
            self._publish(event=KeyEvent.DELETE, key=key, value=value.decode("utf-8"))
        except redis.ConnectionError as rce:
            raise OrchestratorConnectionError(rce) from rce
        except redis.RedisError as err:
            raise OrchestratorError(err) from err

    def lock(self, key, expiration=DEFAULT_LOCK_EXPIRATION):
        """Take the lock by creating ``key`` (atomic via SET NX); returns False if already held."""
        assert not self.is_canceller_only
        try:
            self.set_key(key, "", overwrite=False, expiration=expiration)
        except KeyError:
            return False
        return True

    def shutdown(self):
        """Stop the pubsub worker threads started in __init__ (none exist in canceller-only mode)."""
        logger.debug("Shutting down redis client.")
        self._shutting_down = True

        if self.is_canceller_only:
            return

        self._pubsub_thread.stop()
        self._pubsub_expiring_thread.stop()
|