1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/data/readreplica.py
Syed Ahmed 943d95b98a chore: fix type check in read replica (PROJQUAY-6397) (#2777)
* fix type check in read replica (PROJQUAY-6397)
2024-03-19 17:57:01 +00:00

212 lines
7.4 KiB
Python

from __future__ import annotations
import logging
import random
from collections import namedtuple
from contextlib import contextmanager
from typing import Any, Type, TypeVar
from peewee import SENTINEL, Model, ModelSelect, OperationalError, Proxy
from data.decorators import is_deprecated_model
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Type variable bound to ReadReplicaSupportedModel; used in the select()/get()
# annotations so they can refer to the model class they are invoked on.
TReadReplicaSupportedModel = TypeVar(
"TReadReplicaSupportedModel", bound="ReadReplicaSupportedModel"
)
# Configuration record consulted by ReadReplicaSupportedModel:
#   is_readonly   - read by _in_readonly_mode() to decide whether writes are rejected.
#   read_replicas - collection of replica databases _select_database() picks from;
#                   an empty collection means the master is always used.
ReadOnlyConfig = namedtuple("ReadOnlyConfig", ["is_readonly", "read_replicas"])
class ReadOnlyModeException(Exception):
    """
    Raised when a write operation is attempted while the system is in read only mode.
    """
# Name of the attribute, stored on the database's `_state` object, that counts how many
# `disallow_replica_use` contexts are currently open.
_FORCE_MASTER_COUNTER_ATTRIBUTE = "_force_master_nesting"


@contextmanager
def disallow_replica_use(db):
    """
    Context manager which forces every query run inside it onto the master node.

    While at least one such context is open, the read replica(s) are never selected.
    NOTE: This means that if the master node is unavailable, the underlying queries
    will *fail*.

    The contexts may be nested: a counter kept on `db.obj._state` is incremented on
    entry and decremented on exit, even when the body raises.
    """
    primary = db.obj

    # Entering: bump the nesting depth (a missing attribute counts as depth 0).
    nesting = getattr(primary._state, _FORCE_MASTER_COUNTER_ATTRIBUTE, 0) + 1
    setattr(primary._state, _FORCE_MASTER_COUNTER_ATTRIBUTE, nesting)
    try:
        yield
    finally:
        # Leaving: re-read the depth rather than trusting the value seen on entry.
        nesting = getattr(primary._state, _FORCE_MASTER_COUNTER_ATTRIBUTE, 0)
        assert nesting > 0
        setattr(primary._state, _FORCE_MASTER_COUNTER_ATTRIBUTE, nesting - 1)
class AutomaticFailoverWrapper(object):
    """
    Class which wraps a peewee database driver and (optionally) a second driver.

    When executing SQL, if an OperationalError occurs and a second driver was given, the
    query is attempted again on the fallback DB. Otherwise, the exception is raised.

    Any attribute not defined on the wrapper itself is looked up on the primary driver.
    """

    def __init__(self, primary_db, fallback_db=None):
        """
        :param primary_db: driver every statement is first attempted on.
        :param fallback_db: optional driver used when the primary raises OperationalError.
        """
        self._primary_db = primary_db
        self._fallback_db = fallback_db

    def __getattr__(self, attribute):
        """
        Delegates lookups of attributes missing on the wrapper to the primary driver.

        Raises AttributeError if the primary driver does not have the attribute either.
        """
        # __getattr__ only runs once normal lookup has already failed, so asking `self`
        # for the attribute again can never succeed; it would just recurse forever.
        # The wrapper's own names are never delegated: `_primary_db`/`_fallback_db` can
        # be missing when the instance was created without running __init__ (copying,
        # unpickling), and `execute_sql` must always be the failover-aware version.
        if attribute in ("_primary_db", "_fallback_db", "execute_sql"):
            raise AttributeError(attribute)

        if hasattr(self._primary_db, attribute):
            return getattr(self._primary_db, attribute)

        raise AttributeError(
            "%r object has no attribute %r" % (type(self).__name__, attribute)
        )

    def execute(self, query, commit=SENTINEL, **context_options):
        """
        Renders `query` to SQL using the (delegated) SQL context and runs it through
        `execute_sql`, so that it benefits from the failover handling.
        """
        ctx = self.get_sql_context(**context_options)
        sql, params = ctx.sql(query).query()
        return self.execute_sql(sql, params, commit=commit)

    def execute_sql(self, sql, params=None, commit=SENTINEL):
        """
        Executes the SQL on the primary driver, retrying once on the fallback driver if
        the primary raises OperationalError.

        Raises OperationalError if the primary fails and there is no fallback driver, or
        if the fallback driver fails as well.
        """
        try:
            return self._primary_db.execute_sql(sql, params, commit)
        except OperationalError as primary_error:
            if self._fallback_db is None:
                # Nothing to fail over to: surface the error rather than silently
                # returning None to the caller.
                raise

            logger.warning(
                "Query failed on primary database, retrying on fallback: %s", primary_error
            )
            return self._fallback_db.execute_sql(sql, params, commit)
class ReadReplicaSupportedModel(Model):
    """
    Base model for peewee data models that support using a read replica for SELECT requests not
    under transactions, and automatic failover to the master if the read replica fails.

    Read-only queries are initially attempted on one of the read replica databases
    being used; if an OperationalError occurs when attempting to invoke the query,
    then the failure is logged and the query is retried on the database master.

    Queries that are non-SELECTs (or under transactions) are always tried on the
    master.

    If the system is configured into read only mode, then all non-read-only queries
    will raise a ReadOnlyModeException.
    """

    @classmethod
    def _read_only_config(cls):
        """
        Returns the ReadOnlyConfig in effect for this model.

        The configuration is read from `read_only_config` on the model's `_meta`, which
        is expected to expose the actual value on `.obj`. When it is absent or not yet
        populated, a read-write configuration with no replicas is returned.
        """
        read_only_config = getattr(cls._meta, "read_only_config", None)
        if read_only_config is None:
            return ReadOnlyConfig(False, [])

        if isinstance(read_only_config, Proxy) and read_only_config.obj is None:
            return ReadOnlyConfig(False, [])

        return read_only_config.obj or ReadOnlyConfig(False, [])

    @classmethod
    def _in_readonly_mode(cls):
        """
        Returns the `is_readonly` flag of the configuration in effect.
        """
        return cls._read_only_config().is_readonly

    @classmethod
    def _select_database(cls, can_use_read_replica=False):
        """
        Selects a read replica database if we're configured to support read replicas.

        Otherwise, selects the master database.
        """
        master = cls._meta.database

        # Select the master DB if read replica support is not enabled.
        read_only_config = cls._read_only_config()
        if not read_only_config.read_replicas:
            return master

        # Select the master DB if we're ever under a transaction.
        if master.transaction_depth() > 0:
            return master

        # Select the master DB if forced via `disallow_replica_use`.
        if getattr(master._state, _FORCE_MASTER_COUNTER_ATTRIBUTE, 0) > 0:
            return master

        # Select the master DB unless the caller explicitly opted in to replicas.
        if not can_use_read_replica:
            return master

        # Otherwise, return a read replica database with auto-retry onto the main database.
        selected_read_replica = random.choice(read_only_config.read_replicas)
        return AutomaticFailoverWrapper(selected_read_replica, master)

    @classmethod
    def select(
        cls: Type[TReadReplicaSupportedModel], *args, **kwargs: Any
    ) -> ModelSelect[TReadReplicaSupportedModel]:
        """
        Builds a SELECT query bound to the database chosen by `_select_database`.

        Accepts an extra `can_use_read_replica` keyword argument, which is consumed here
        and not forwarded to peewee; all other arguments are passed through unchanged.
        """
        can_use_read_replica = kwargs.pop("can_use_read_replica", None)

        query = super(ReadReplicaSupportedModel, cls).select(*args, **kwargs)
        query._database = cls._select_database(can_use_read_replica)
        return query

    @classmethod
    def get(
        cls: Type[TReadReplicaSupportedModel], *args, **kwargs: Any
    ) -> TReadReplicaSupportedModel:
        """
        Fetches a single row via `select`, so that `can_use_read_replica` is honored.

        A lone integer positional argument is treated as a primary key lookup; any other
        positional arguments are used as `where` expressions and the remaining keyword
        arguments as `filter` conditions.
        """
        can_use_read_replica = kwargs.pop("can_use_read_replica", None)

        sq = cls.select(can_use_read_replica=can_use_read_replica)
        if args:
            # Handle simple lookup using just the primary key.
            if len(args) == 1 and isinstance(args[0], int):
                sq = sq.where(cls._meta.primary_key == args[0])
            else:
                sq = sq.where(*args)

        if kwargs:
            sq = sq.filter(**kwargs)

        return sq.get()

    @classmethod
    def insert(cls, *args, **kwargs):
        """
        Builds an INSERT query.

        Raises Exception if the model is deprecated and ReadOnlyModeException if the
        system is in read only mode.
        """
        if is_deprecated_model(cls):
            raise Exception("Attempt to write to deprecated model %s" % cls)

        query = super(ReadReplicaSupportedModel, cls).insert(*args, **kwargs)
        if cls._in_readonly_mode():
            raise ReadOnlyModeException()

        return query

    @classmethod
    def update(cls, *args, **kwargs):
        """
        Builds an UPDATE query. Raises ReadOnlyModeException in read only mode.
        """
        query = super(ReadReplicaSupportedModel, cls).update(*args, **kwargs)
        if cls._in_readonly_mode():
            raise ReadOnlyModeException()

        return query

    @classmethod
    def delete(cls, *args, **kwargs):
        """
        Builds a DELETE query. Raises ReadOnlyModeException in read only mode.
        """
        query = super(ReadReplicaSupportedModel, cls).delete(*args, **kwargs)
        if cls._in_readonly_mode():
            raise ReadOnlyModeException()

        return query

    @classmethod
    def raw(cls, *args, **kwargs):
        """
        Builds a raw SQL query.

        SELECT statements are always permitted. Any other statement raises
        ReadOnlyModeException in read only mode, and an INSERT against a deprecated
        model raises Exception.
        """
        query = super(ReadReplicaSupportedModel, cls).raw(*args, **kwargs)

        # Classify by the first whitespace-delimited token, so that leading whitespace
        # or a newline/tab after the keyword does not make a SELECT look like a write.
        tokens = query._sql.split(None, 1)
        statement = tokens[0].lower() if tokens else ""

        if statement == "select":
            # No opt-in is passed here, so `_select_database` returns the master.
            query._database = cls._select_database()
        elif cls._in_readonly_mode():
            raise ReadOnlyModeException()
        elif statement == "insert":
            if is_deprecated_model(cls):
                raise Exception("Attempt to write to deprecated model %s" % cls)

        return query