1
0
mirror of https://github.com/quay/quay.git synced 2026-01-27 18:42:52 +03:00
Files
quay/util/security/instancekeys.py
mosen fca67e7729 feat: mypy type annotations (PROJQUAY-740) (#455)
* Add dev dependencies mypy and typing

* Add makefile target `types-test`, not yet included in `test` target.

* Generate stubs for imported modules to avoid mypy complaining about missing types.

* Remove generated stubs as there are way too many and they cause tons of mess in the repo. Switched to ignoring untyped modules for now, to concentrate on Quay-only type checking.

* mypy config changed to ignore missing imports

* ignore property decorator as it is not supported by mypy

* mypy annotations for many configuration variables

* re-generate mypy_stubs directory as it's necessary in some classes for base classes to prevent mypy errors

* util/registry/queuefile referred to non-existent definition of Empty class in multiprocessing.queues

* ignore type checking for things like monkey patching and exported/re-imported objects that mypy does not allow.

* Adjust mypy config to warn us about unreachable return paths and useless expressions.

* Add the __annotations__ property to INTERNAL_ONLY_PROPERTIES so that it is not part of the config schema testing

* Remove redundant dependencies `typing` and `typing-extensions` which are NOOP after Python 3.5

* Remove mypy-extensions which only provides a TypedDict implementation but has not been updated since 2019.

* updated mypy to 0.910 which requires all types packages to be installed manually.

* exclude local-dev from type checking until core team can suggest an outcome for __init__.py duplicate packages

* re-add typing dependency which will be needed until Python 3.9

* ignore .mypy_cache

* add mypy stub for features module to replace inline definitions

* import annotations eager evaluation in billing.py as it was required to reference a class declared later in the module.

* remove the type definition of V1ProtocolSteps/V2ProtocolSteps to make tox happy
2021-10-25 09:56:26 +02:00

97 lines
2.9 KiB
Python

from cachetools.func import lru_cache
from data import model
from util.expiresdict import ExpiresDict, ExpiresEntry
from util.security import jwtutil
class CachingKey(object):
    """
    Wraps a service key and memoizes the public key derived from its JWK.
    """

    def __init__(self, service_key):
        self._service_key = service_key
        # Filled in on first access to `public_key`; None means "not converted yet".
        self._cached_public_key = None

    @property
    def public_key(self):
        """
        Returns the public key built from the wrapped service key's JWK, converting it on first
        use and reusing the stored result afterwards.
        """
        if self._cached_public_key is None:
            # The JWK -> public key conversion can take > 200ms, so its result is kept around.
            self._cached_public_key = jwtutil.jwk_dict_to_public_key(self._service_key.jwk)

        return self._cached_public_key
class InstanceKeys(object):
    """
    InstanceKeys defines a helper class for interacting with the Quay instance service keys used for
    JWT signing of registry tokens as well as requests from Quay to other services such as Clair.

    Each container will have a single registered instance key.
    """

    def __init__(self, app):
        self.app = app
        self.instance_keys = ExpiresDict(self._load_instance_keys)

        # Per-instance memo of the local key files; None means "not read yet".
        # These were previously cached with a class-level `lru_cache(maxsize=1)` stacked under
        # `@property`, which keyed the cache on `self`: every instance shared a single slot and
        # the cache kept the last instance alive.
        self._local_key_id = None
        self._local_private_key = None

    def clear_cache(self):
        """
        Clears the cache of instance keys.

        Only the `instance_keys` mapping is rebuilt; the memoized local key ID and private key
        are left untouched.
        """
        self.instance_keys = ExpiresDict(self._load_instance_keys)

    def _load_instance_keys(self):
        """
        Returns a dict mapping each key ID (kid) registered for this service to an ExpiresEntry
        holding a CachingKey for that service key and the key's expiration date.
        """
        return {
            key.kid: ExpiresEntry(CachingKey(key), key.expiration_date)
            for key in model.service_keys.list_service_keys(self.service_name)
        }

    @property
    def service_name(self):
        """
        Returns the name of the instance key's service (i.e. 'quay').
        """
        return self.app.config["INSTANCE_SERVICE_KEY_SERVICE"]

    @property
    def service_key_expiration(self):
        """
        Returns the defined expiration for instance service keys, in minutes.

        Falls back to 120 when INSTANCE_SERVICE_KEY_EXPIRATION is not configured.
        """
        return self.app.config.get("INSTANCE_SERVICE_KEY_EXPIRATION", 120)

    @property
    def local_key_id(self):
        """
        Returns the ID of the local instance service key.

        The value is read from the file at INSTANCE_SERVICE_KEY_KID_LOCATION on first access and
        then reused for the lifetime of this instance.
        """
        if self._local_key_id is None:
            self._local_key_id = _load_file_contents(
                self.app.config["INSTANCE_SERVICE_KEY_KID_LOCATION"]
            )

        return self._local_key_id

    @property
    def local_private_key(self):
        """
        Returns the private key of the local instance service key.

        The value is read from the file at INSTANCE_SERVICE_KEY_LOCATION on first access and then
        reused for the lifetime of this instance.
        """
        if self._local_private_key is None:
            self._local_private_key = _load_file_contents(
                self.app.config["INSTANCE_SERVICE_KEY_LOCATION"]
            )

        return self._local_private_key

    def get_service_key_public_key(self, kid):
        """
        Returns the public key associated with the given instance service key or None if none.
        """
        caching_key = self.instance_keys.get(kid)
        if caching_key is None:
            return None

        return caching_key.public_key
def _load_file_contents(path):
    """
    Returns the contents of the specified file path.

    The file is opened in text mode and decoded explicitly as UTF-8, so the result does not
    depend on the locale the process happens to run under. The handle is closed before
    returning.
    """
    with open(path, encoding="utf-8") as f:
        return f.read()