1
0
mirror of https://github.com/quay/quay.git synced 2025-04-18 10:44:06 +03:00
quay/buildman/orchestrator.py
Kenny Lee Sin Cheong 5f63b3a7bb
chore: drop deprecated tables and remove unused code (PROJQUAY-522) (#2089)
* chore: drop deprecated tables and remove unused code

* isort imports

* migration: check for table existence before drop
2023-08-25 12:17:24 -04:00

541 lines
19 KiB
Python

import datetime
import json
import logging
import re
import time
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from contextlib import ContextDecorator
from enum import IntEnum, unique
import redis
from util import slash_join
from util.expiresdict import ExpiresDict
logger = logging.getLogger(__name__)

# Seconds in one day; used to pad real key expirations so the expiration
# handler has time to read the original value before it disappears.
ONE_DAY = 60 * 60 * 24
# Backoff (seconds) applied by _sleep_orchestrator after a connection failure.
ORCHESTRATOR_UNAVAILABLE_SLEEP_DURATION = 5
# Default lock lifetime (seconds) for Orchestrator.lock.
DEFAULT_LOCK_EXPIRATION = 10000

# Companion-key suffixes: "<key>/expiring" carries the real TTL (to generate
# the EXPIRE event), "<key>/expired" marks a key whose post-expiry cleanup is
# still pending.
REDIS_EXPIRING_SUFFIX = "/expiring"
REDIS_EXPIRED_SUFFIX = "/expired"
# Pubsub channel on which explicit CREATE/SET/DELETE events are published.
REDIS_DEFAULT_PUBSUB_KEY = "orchestrator_events"
# redis-py message "type" values for plain and pattern subscriptions.
REDIS_EVENT_KIND_MESSAGE = "message"
REDIS_EVENT_KIND_PMESSAGE = "pmessage"
# TTL value redis returns for keys that have no expiration set.
REDIS_NONEXPIRING_KEY = -1

# This constant defines the Redis configuration flags used to watch [K]eyspace and e[x]pired
# events on keys. For more info, see https://redis.io/topics/notifications#configuration
REDIS_KEYSPACE_EXPIRED_EVENT_CONFIG_VALUE = "Kx"
REDIS_KEYSPACE_EVENT_CONFIG_KEY = "notify-keyspace-events"
# Keyspace channel pattern: filled with (db number, key pattern).
REDIS_KEYSPACE_KEY_PATTERN = "__keyspace@%s__:%s"
REDIS_EXPIRED_KEYSPACE_PATTERN = slash_join(REDIS_KEYSPACE_KEY_PATTERN, REDIS_EXPIRING_SUFFIX)
# Regex used to recover (db, original key) from an expired-keyspace channel name.
REDIS_EXPIRED_KEYSPACE_REGEX = re.compile(REDIS_EXPIRED_KEYSPACE_PATTERN % (r"(\S+)", r"(\S+)"))
def orchestrator_from_config(manager_config, canceller_only=False):
    """
    Build and return the orchestrator described by the manager configuration.

    :param manager_config: the configuration for the orchestrator
    :type manager_config: dict
    :param canceller_only: whether the orchestrator is used only for cancellation
    :type canceller_only: bool
    :rtype: :class: Orchestrator
    """
    orchestrator_config = manager_config["ORCHESTRATOR"]

    # Sanity check that legacy prefixes are no longer being used.
    for config_key in list(orchestrator_config.keys()):
        parts = config_key.split("_")
        if len(parts) > 1 and parts[-1].lower() == "prefix":
            raise AssertionError("legacy prefix used, use ORCHESTRATOR_PREFIX instead")

    def _dict_key_prefix(d):
        """
        :param d: the dict that has keys prefixed with underscore
        :type d: {str: any}
        :rtype: str
        """
        return list(d.keys())[0].split("_", 1)[0].lower()

    orchestrator_name = _dict_key_prefix(orchestrator_config)

    def _strip_name_prefix(config_key):
        # e.g. "REDIS_HOST" -> "host": drop the orchestrator-name prefix.
        return config_key.lower().split("_", 1)[1]

    orchestrator_kwargs = {}
    for config_key, config_value in orchestrator_config.items():
        orchestrator_kwargs[_strip_name_prefix(config_key)] = config_value

    if manager_config.get("ORCHESTRATOR_PREFIX") is not None:
        orchestrator_kwargs["orchestrator_prefix"] = manager_config["ORCHESTRATOR_PREFIX"]

    orchestrator_kwargs["canceller_only"] = canceller_only

    logger.debug(
        "attempting to create orchestrator %s with kwargs %s",
        orchestrator_name,
        orchestrator_kwargs,
    )
    return orchestrator_by_name(orchestrator_name, **orchestrator_kwargs)
def orchestrator_by_name(name, **kwargs):
    """
    Instantiate the orchestrator registered under the given name.

    Unrecognized names fall back to the in-memory implementation.
    """
    if name == "redis":
        orchestrator_cls = RedisOrchestrator
    else:
        # "mem" and any unknown name both resolve to the memory orchestrator.
        orchestrator_cls = MemoryOrchestrator
    return orchestrator_cls(**kwargs)
class OrchestratorError(Exception):
    # Base exception for all errors raised by Orchestrator implementations.
    pass
# NOTE: kept as a distinct type (rather than the builtin ConnectionError)
# because callers catch OrchestratorError/OrchestratorConnectionError explicitly.
class OrchestratorConnectionError(OrchestratorError):
    # Raised when the backing store (e.g. redis) cannot be reached.
    pass
@unique
class KeyEvent(IntEnum):
    """
    Kinds of key changes reported to on_key_change callbacks.
    """

    CREATE = 1  # a key that did not previously exist was set
    SET = 2  # an existing key was overwritten
    DELETE = 3  # a key was explicitly deleted
    EXPIRE = 4  # a key reached its expiration
class KeyChange(namedtuple("KeyChange", ["event", "key", "value"])):
    """
    A single observed change to a watched key: the KeyEvent kind, the key
    itself, and the key's string value at the time of the event.
    """

    pass
class Orchestrator(metaclass=ABCMeta):
    """
    Orchestrator is the interface that is used to synchronize the build states across build
    managers.

    This interface assumes that storage is being done by a key-value store
    that supports watching for events on keys.

    Missing keys should raise KeyError; otherwise, errors should raise an
    OrchestratorError.

    :param key_prefix: the prefix of keys being watched
    :type key_prefix: str
    """

    @abstractmethod
    def on_key_change(self, key, callback, restarter=None):
        """
        Register a callback to be invoked whenever a key starting with `key` changes.

        The callback parameter takes in a KeyChange object as a parameter.
        """
        pass

    @abstractmethod
    def get_prefixed_keys(self, prefix):
        """
        :returns: a dict of key value pairs beginning with prefix
        :rtype: {str: str}
        """
        pass

    @abstractmethod
    def get_key(self, key):
        """
        :returns: the value stored at the provided key
        :rtype: str
        :raises KeyError: if the key does not exist
        """
        pass

    @abstractmethod
    def set_key(self, key, value, overwrite=False, expiration=None):
        """
        :param key: the identifier for the value
        :type key: str
        :param value: the value being stored
        :type value: str
        :param overwrite: whether or not a KeyError is thrown if the key already exists
        :type overwrite: bool
        :param expiration: the duration in seconds that a key should be available
        :type expiration: int
        """
        pass

    @abstractmethod
    def delete_key(self, key):
        """
        Deletes a key that has been set in the orchestrator.

        :param key: the identifier for the key
        :type key: str
        :raises KeyError: if the key does not exist
        """
        pass

    @abstractmethod
    def lock(self, key, expiration=DEFAULT_LOCK_EXPIRATION):
        """
        Takes a lock for synchronizing exclusive operations cluster-wide.

        :param key: the identifier for the lock
        :type key: str
        :param expiration: the duration until the lock expires
        :type expiration: :class:`datetime.timedelta` or int (seconds)
        :returns: whether or not the lock was acquired
        :rtype: bool
        """
        pass

    @abstractmethod
    def shutdown(self):
        """
        This function should shutdown any final resources allocated by the Orchestrator.
        """
        pass
def _sleep_orchestrator():
    """
    Block by sleeping in order to back off after a failure such as a
    ConnectionError while talking to the orchestrator.

    Intended to be called from inside an exception handler so that the
    triggering traceback can be logged.
    """
    logger.exception(
        "Connecting to orchestrator failed; sleeping for %s and then trying again",
        ORCHESTRATOR_UNAVAILABLE_SLEEP_DURATION,
    )
    time.sleep(ORCHESTRATOR_UNAVAILABLE_SLEEP_DURATION)
    # Use logger.info here: the traceback was already logged above, and
    # logger.exception would emit the same stack trace a second time.
    logger.info(
        "Connecting to orchestrator failed; slept for %s and now trying again",
        ORCHESTRATOR_UNAVAILABLE_SLEEP_DURATION,
    )
class MemoryOrchestrator(Orchestrator):
    """
    In-process Orchestrator implementation backed by an ExpiresDict.

    Key change callbacks registered via on_key_change are invoked
    synchronously from the mutating call.
    """

    def __init__(self, **kwargs):
        self.state = ExpiresDict()
        self.callbacks = {}

    def _callbacks_prefixed(self, key):
        # Yield every registered callback whose watched prefix matches the key.
        for prefix, callback in self.callbacks.items():
            if key.startswith(prefix):
                yield callback

    def on_key_change(self, key, callback, restarter=None):
        self.callbacks[key] = callback

    def get_prefixed_keys(self, prefix):
        matched = {}
        for stored_key, stored_value in list(self.state.items()):
            if not stored_key.startswith(prefix):
                continue
            # The expiration bookkeeping keys are internal; never surface them.
            if stored_key.endswith(REDIS_EXPIRED_SUFFIX) or stored_key.endswith(
                REDIS_EXPIRING_SUFFIX
            ):
                continue
            matched[stored_key] = stored_value
        return matched

    def get_key(self, key):
        return self.state[key]

    def set_key(self, key, value, overwrite=False, expiration=None):
        preexisting_key = key in self.state

        if preexisting_key and not overwrite:
            raise KeyError(key)

        # Simulate redis' behavior when using xx and the key does not exist.
        if overwrite and not preexisting_key:
            return

        absolute_expiration = None
        if expiration is not None:
            absolute_expiration = datetime.datetime.now() + datetime.timedelta(seconds=expiration)

        self.state.set(key, value, expires=absolute_expiration)
        self.state.set(slash_join(key, REDIS_EXPIRING_SUFFIX), value, expires=absolute_expiration)

        event = KeyEvent.SET if preexisting_key else KeyEvent.CREATE
        for callback in self._callbacks_prefixed(key):
            callback(KeyChange(event, key, value))

    def delete_key(self, key):
        # Raises KeyError (via the lookup) when the key does not exist.
        value = self.state[key]
        del self.state[key]
        for callback in self._callbacks_prefixed(key):
            callback(KeyChange(KeyEvent.DELETE, key, value))

    def lock(self, key, expiration=DEFAULT_LOCK_EXPIRATION):
        # The lock is simply a non-overwritable key; a KeyError means some
        # other caller already holds it.
        try:
            self.set_key(key, "", overwrite=False, expiration=expiration)
        except KeyError:
            return False
        return True

    def shutdown(self):
        # Drop all state; the orchestrator is unusable afterwards.
        self.state = None
        self.callbacks = None
class RedisOrchestrator(Orchestrator):
    """
    Orchestrator implementation backed by a Redis server.

    Key change notifications are delivered two ways:
      * explicit CREATE/SET/DELETE events are published by the mutating
        manager on the pubsub channel `self._pubsub_key`, and
      * EXPIRE events are observed via Redis keyspace notifications on the
        companion "<key>/expiring" keys (see set_key).
    """

    def __init__(
        self,
        host="127.0.0.1",
        port=6379,
        password=None,
        db=0,
        cert_and_key=None,
        ca_cert=None,
        ssl=False,
        skip_keyspace_event_setup=False,
        canceller_only=False,
        **kwargs,
    ):
        self.is_canceller_only = canceller_only

        (cert, key) = tuple(cert_and_key) if cert_and_key is not None else (None, None)
        self._client = redis.StrictRedis(
            host=host,
            port=port,
            password=password,
            db=db,
            ssl_certfile=cert,
            ssl_keyfile=key,
            ssl_ca_certs=ca_cert,
            ssl=ssl,
            socket_connect_timeout=1,
            health_check_interval=2,
        )

        self._shutting_down = False
        self._watched_keys = {}
        self._pubsub_key = slash_join(
            kwargs.get("orchestrator_prefix", ""), REDIS_DEFAULT_PUBSUB_KEY
        ).lstrip("/")

        if not self.is_canceller_only:
            # sleep_time is not really calling time.sleep(). It is the socket's timeout value.
            # run_in_thread uses an event loop that uses a non-blocking `parse_response` of the PubSub object.
            # This means the event loop will return immedietely even if there are no new messages.
            # Setting a value other than the default 0 prevents that thread from exhausting CPU time.
            # https://github.com/andymccurdy/redis-py/issues/821

            # Configure a subscription to watch events that the orchestrator manually publishes.
            logger.debug("creating pubsub with key %s", self._pubsub_key)
            self._pubsub = self._client.pubsub()
            self._pubsub.subscribe(**{self._pubsub_key: self._published_key_handler})
            self._pubsub_thread = self._pubsub.run_in_thread(daemon=True, sleep_time=5)

            # Configure a subscription to watch expired keyspace events.
            if not skip_keyspace_event_setup:
                self._client.config_set(
                    REDIS_KEYSPACE_EVENT_CONFIG_KEY, REDIS_KEYSPACE_EXPIRED_EVENT_CONFIG_VALUE
                )

            self._pubsub_expiring = self._client.pubsub()
            self._pubsub_expiring.psubscribe(
                **{REDIS_EXPIRED_KEYSPACE_PATTERN % (db, "*"): self._expiring_key_handler}
            )
            self._pubsub_expiring_thread = self._pubsub_expiring.run_in_thread(
                daemon=True, sleep_time=5
            )

    def _expiring_key_handler(self, message):
        """
        Pubsub handler invoked for keyspace "expired" events on "/expiring" keys.

        Reads the still-present original key's value, records it under
        "<key>/expired" (so another manager can pick up cleanup if this one
        fails), deletes the original key, and notifies matching watchers.
        """
        # Bind names up front so the logging below never references an
        # unbound local when an exception fires early.
        key = None
        expired_value = None
        is_expiration = False
        try:
            message_tup = (
                message.get("type"),
                message.get("pattern").decode("utf-8"),
                message.get("channel").decode("utf-8"),
                message.get("data").decode("utf-8"),
            )
            if self._is_expired_keyspace_event(message_tup):
                is_expiration = True

                # Get the value of the original key before the expiration happened.
                key = self._key_from_expiration(message_tup)
                expired_value = self._client.get(key)

                # Mark key as expired. This key is used to track post job cleanup in the callback,
                # to allow another manager to pickup the cleanup if this fails.
                if expired_value:
                    self._client.set(slash_join(key, REDIS_EXPIRED_SUFFIX), expired_value)
                    self._client.delete(key)
        except redis.ConnectionError:
            _sleep_orchestrator()
            return
        except redis.RedisError as re:
            logger.exception("Redis exception watching redis expirations: %s - %s", key, re)
            return
        except Exception as e:
            logger.exception("Unknown exception watching redis expirations: %s - %s", key, e)
            return

        # Only dispatch to watchers when the event was fully processed above.
        if is_expiration and expired_value is not None:
            for watched_key, callback in self._watched_keys.items():
                if key.startswith(watched_key):
                    callback(KeyChange(KeyEvent.EXPIRE, key, expired_value))

    def _published_key_handler(self, message):
        """
        Pubsub handler invoked for events explicitly published via _publish.

        Decodes the message and forwards the resulting KeyChange to every
        watcher whose prefix matches the changed key.
        """
        try:
            redis_event, event_key, event_value = (
                message.get("type"),
                message.get("channel").decode("utf-8"),
                message.get("data").decode("utf-8"),
            )
        except redis.ConnectionError:
            _sleep_orchestrator()
            return
        except redis.RedisError as re:
            logger.exception("Redis exception watching published key events: %s", re)
            return
        except Exception as e:
            logger.exception("Unknown exception watching published key events: %s", e)
            return

        if redis_event == REDIS_EVENT_KIND_MESSAGE:
            keychange = self._publish_to_keychange(event_value)
            for watched_key, callback in self._watched_keys.items():
                if keychange.key.startswith(watched_key):
                    callback(keychange)

    def on_key_change(self, key, callback, restarter=None):
        """
        Register a callback invoked for any change to a key starting with `key`.
        """
        assert not self.is_canceller_only

        logger.debug("watching key: %s", key)
        self._watched_keys[key] = callback

    @staticmethod
    def _is_expired_keyspace_event(event_result):
        """
        Sanity check that this isn't an unrelated keyspace event.

        There could be a more efficient keyspace event config to avoid this client-side filter.
        """
        if event_result is None:
            return False

        (redis_event, _pattern, matched_key, expired) = event_result
        return (
            redis_event == REDIS_EVENT_KIND_PMESSAGE
            and expired == "expired"
            and REDIS_EXPIRED_KEYSPACE_REGEX.match(matched_key) is not None
        )

    @staticmethod
    def _key_from_expiration(event_result):
        """
        Extract the original key name from an expired-keyspace channel name,
        i.e. strip the "__keyspace@<db>__:" prefix and "/expiring" suffix.
        """
        (_redis_event, _pattern, matched_key, _expired) = event_result
        return REDIS_EXPIRED_KEYSPACE_REGEX.match(matched_key).groups()[1]

    @staticmethod
    def _publish_to_keychange(event_value):
        """
        Decode the JSON payload produced by _publish back into a KeyChange.
        """
        e = json.loads(event_value)
        return KeyChange(KeyEvent(e["event"]), e["key"], e["value"])

    def get_prefixed_keys(self, prefix):
        """
        :returns: a dict of key value pairs beginning with prefix, excluding
                  the internal "/expiring" and "/expired" companion keys
        :rtype: {str: str}
        """
        assert not self.is_canceller_only

        # TODO: This can probably be done with redis pipelines to make it transactional.
        keys = self._client.keys(prefix + "*")

        results = {}
        for key in keys:
            key_str = key.decode("utf-8")
            # Skip the expiration bookkeeping keys; they are internal.
            if key_str.endswith(REDIS_EXPIRING_SUFFIX) or key_str.endswith(REDIS_EXPIRED_SUFFIX):
                continue

            ttl = self._client.ttl(key)
            if ttl == REDIS_NONEXPIRING_KEY:
                # Only redis keys without expirations are live build manager keys.
                try:
                    value = self._client.get(key)
                    if value is None:
                        raise KeyError(key)
                except redis.ConnectionError as rce:
                    raise OrchestratorConnectionError(rce)
                except redis.RedisError as re:
                    raise OrchestratorError(re)

                results[key_str] = value.decode("utf-8")

        return results

    def _key_is_expired(self, key):
        """
        Returns True if an "<key>/expired" marker exists, i.e. the key expired
        but its post-expiry cleanup marker was never removed.
        """
        # BUGFIX: previously queried the original key instead of the
        # "/expired" companion, so this always returned False when called
        # from get_key (where the original key is already gone).
        expired_key = slash_join(key, REDIS_EXPIRED_SUFFIX)
        return self._client.get(expired_key) is not None

    def get_key(self, key):
        """
        :returns: the value stored at the provided key
        :rtype: str
        :raises KeyError: if the key does not exist
        :raises OrchestratorConnectionError: if redis cannot be reached
        """
        assert not self.is_canceller_only

        try:
            value = self._client.get(key)
            if value is None:
                # If expired, the expired key should have been removed but still exists.
                # Delete the key if that's the case.
                if self._key_is_expired(key):
                    self._client.delete(slash_join(key, REDIS_EXPIRED_SUFFIX))
                raise KeyError(key)
        except redis.ConnectionError as rce:
            raise OrchestratorConnectionError(rce)
        except redis.RedisError as re:
            raise OrchestratorError(re)

        return value.decode("utf-8")

    def set_key(self, key, value, overwrite=False, expiration=None):
        """
        Store `value` at `key`, maintain the "/expiring" companion key, and
        publish the corresponding CREATE/SET event.

        :raises KeyError: if the key exists and overwrite is False
        :raises OrchestratorConnectionError: if redis cannot be reached
        """
        try:
            already_exists = self._client.exists(key)
            if already_exists and not overwrite:
                raise KeyError(key)

            # Set an expiration in case that the handler was not able to delete the the original key.
            # The extra leeway is so the expire event handler has time to get the original value and publish the event.
            self._client.set(key, value, xx=overwrite)
            if expiration is not None:
                self._client.expire(key, expiration + ONE_DAY)

            overwrite_expiring_key = self._client.exists(slash_join(key, REDIS_EXPIRING_SUFFIX))
            # The "expiring/*" are only used to publish the EXPIRE event. A separate key is needed
            # because the the EXPIRE event does not include the original key value.
            self._client.set(
                slash_join(key, REDIS_EXPIRING_SUFFIX),
                "",
                xx=overwrite_expiring_key,
                ex=expiration,
            )

            # Remove any expired key that might have previously been created but not removed
            # if a new expiration is set.
            self._client.delete(slash_join(key, REDIS_EXPIRED_SUFFIX))

            key_event = KeyEvent.SET if already_exists else KeyEvent.CREATE
            self._publish(event=key_event, key=key, value=value)
        except redis.ConnectionError as rce:
            raise OrchestratorConnectionError(rce)
        except redis.RedisError as re:
            raise OrchestratorError(re)

    def _publish(self, **kwargs):
        """
        Publish a key change (event/key/value kwargs) as JSON on the
        orchestrator's pubsub channel.
        """
        kwargs["event"] = int(kwargs["event"])
        event_json = json.dumps(kwargs)
        logger.debug("publishing event: %s", event_json)
        self._client.publish(self._pubsub_key, event_json)

    def delete_key(self, key):
        """
        Delete a key along with its "/expiring" and "/expired" companions and
        publish the DELETE event.

        :raises KeyError: if the key does not exist
        :raises OrchestratorConnectionError: if redis cannot be reached
        """
        assert not self.is_canceller_only

        try:
            value = self._client.get(key)
            if value is None:
                raise KeyError(key)

            self._client.delete(key)
            self._client.delete(slash_join(key, REDIS_EXPIRING_SUFFIX))
            self._client.delete(slash_join(key, REDIS_EXPIRED_SUFFIX))

            # value is guaranteed non-None here (the None case raised above).
            self._publish(event=KeyEvent.DELETE, key=key, value=value.decode("utf-8"))
        except redis.ConnectionError as rce:
            raise OrchestratorConnectionError(rce)
        except redis.RedisError as re:
            raise OrchestratorError(re)

    def lock(self, key, expiration=DEFAULT_LOCK_EXPIRATION):
        """
        Acquire a cluster-wide lock by setting a non-overwritable key.

        :returns: whether or not the lock was acquired
        :rtype: bool
        """
        assert not self.is_canceller_only

        try:
            self.set_key(key, "", overwrite=False, expiration=expiration)
        except KeyError:
            return False
        return True

    def shutdown(self):
        """
        Stop the pubsub listener threads and mark the orchestrator as shutting down.
        """
        logger.debug("Shutting down redis client.")
        self._shutting_down = True

        if self.is_canceller_only:
            return

        self._pubsub_thread.stop()
        self._pubsub_expiring_thread.stop()