1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/endpoints/api/__init__.py
jbpratt 73d2e2f444 feat(endpoints): add immutability policy API endpoints (PROJQUAY-10160) (#4934)
Add REST API for managing immutability policies at organization and
repository levels. Integrate policy evaluation into tag creation.

Signed-off-by: Brady Pratt <bpratt@redhat.com>
Co-authored-by: Claude <noreply@anthropic.com>
2026-01-22 15:30:09 +00:00

787 lines
25 KiB
Python

import datetime
import logging
from calendar import timegm
from email.utils import formatdate
from functools import partial, wraps
import pytz
from flask import Blueprint, request, session
from flask_restful import Api, Resource, abort, reqparse
from flask_restful.utils import unpack
from jsonschema import ValidationError, validate
from werkzeug.routing.exceptions import RequestRedirect
import features
from .__init__models_pre_oci import pre_oci_model as model
from app import app, authentication, usermanager
from auth import scopes
from auth.auth_context import (
get_authenticated_context,
get_authenticated_user,
get_validated_oauth_token,
)
from auth.decorators import process_oauth
from auth.permissions import (
AdministerRepositoryPermission,
GlobalReadOnlySuperUserPermission,
ModifyRepositoryPermission,
ReadRepositoryPermission,
SuperUserPermission,
UserAdminPermission,
UserReadPermission,
)
from data import model as data_model
from data.database import RepositoryState
from data.logs_model import logs_model
from digest import digest_tools
from endpoints.csrf import csrf_protect
from endpoints.decorators import (
check_anon_protection,
check_readonly,
require_xhr_from_browser,
)
from endpoints.exception import (
FreshLoginRequired,
InvalidRequest,
InvalidResponse,
NotFound,
Unauthorized,
)
from util.metrics.prometheus import timed_blueprint
from util.names import parse_namespace_repository
from util.pagination import decrypt_page_token, encrypt_page_token
from util.request import crossorigin, get_request_ip
from util.timedeltastring import convert_to_timedelta
logger = logging.getLogger(__name__)
api_bp = timed_blueprint(Blueprint("api", __name__), get_app=lambda: app)
FRESH_LOGIN_TIMEOUT = convert_to_timedelta(app.config.get("FRESH_LOGIN_TIMEOUT", "10m"))
class ApiExceptionHandlingApi(Api):
@crossorigin()
def handle_error(self, error):
return super(ApiExceptionHandlingApi, self).handle_error(error)
def _should_use_fr_error_handler(self):
try:
return super(ApiExceptionHandlingApi, self)._should_use_fr_error_handler()
except RequestRedirect:
return False
api = ApiExceptionHandlingApi()
api.init_app(api_bp)
api.decorators = [
csrf_protect(),
crossorigin(),
process_oauth,
require_xhr_from_browser,
]
def resource(*urls, **kwargs):
def wrapper(api_resource):
if not api_resource:
return None
api_resource.registered = True
api.add_resource(api_resource, *urls, **kwargs)
return api_resource
return wrapper
def show_if(value):
def f(inner):
if hasattr(inner, "registered") and inner.registered:
msg = (
"API endpoint %s is already registered; please switch the "
+ "@show_if to be *below* the @resource decorator"
)
raise Exception(msg % inner)
if not value:
return None
return inner
return f
def hide_if(value):
def f(inner):
if hasattr(inner, "registered") and inner.registered:
msg = (
"API endpoint %s is already registered; please switch the "
+ "@hide_if to be *below* the @resource decorator"
)
raise Exception(msg % inner)
if value:
return None
return inner
return f
def format_date(date):
"""
Output an RFC 2822 date format.
"""
if date is None:
return None
return formatdate(timegm(date.utctimetuple()))
def add_method_metadata(name, value):
def modifier(func):
if func is None:
return None
if "__api_metadata" not in dir(func):
func.__api_metadata = {}
func.__api_metadata[name] = value
return func
return modifier
def method_metadata(func, name):
if func is None:
return None
if "__api_metadata" in dir(func):
return func.__api_metadata.get(name, None)
return None
nickname = partial(add_method_metadata, "nickname")
related_user_resource = partial(add_method_metadata, "related_user_resource")
internal_only = add_method_metadata("internal", True)
def path_param(name, description):
def add_param(func):
if not func:
return func
if "__api_path_params" not in dir(func):
func.__api_path_params = {}
func.__api_path_params[name] = {"name": name, "description": description}
return func
return add_param
def query_param(name, help_str, type=reqparse.text_type, default=None, choices=(), required=False):
def add_param(func):
if "__api_query_params" not in dir(func):
func.__api_query_params = []
func.__api_query_params.append(
{
"name": name,
"type": type,
"help": help_str,
"default": default,
"choices": choices,
"required": required,
"location": ("args"),
}
)
return func
return add_param
def page_support(page_token_kwarg="page_token", parsed_args_kwarg="parsed_args"):
def inner(func):
"""
Adds pagination support to an API endpoint.
The decorated API will have an added query parameter named 'next_page'. Works in tandem with
the modelutil paginate method.
"""
@wraps(func)
@query_param("next_page", "The page token for the next page", type=str)
def wrapper(self, *args, **kwargs):
# Note: if page_token is None, we'll receive the first page of results back.
page_token = decrypt_page_token(kwargs[parsed_args_kwarg]["next_page"])
kwargs[page_token_kwarg] = page_token
(result, next_page_token) = func(self, *args, **kwargs)
if next_page_token is not None:
result["next_page"] = encrypt_page_token(next_page_token)
return result
return wrapper
return inner
def parse_args(kwarg_name="parsed_args"):
def inner(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if "__api_query_params" not in dir(func):
abort(500)
parser = reqparse.RequestParser()
for arg_spec in func.__api_query_params:
parser.add_argument(**arg_spec)
kwargs[kwarg_name] = parser.parse_args()
return func(self, *args, **kwargs)
return wrapper
return inner
def parse_repository_name(func):
@wraps(func)
def wrapper(repository, *args, **kwargs):
(namespace, repository) = parse_namespace_repository(
repository, app.config["LIBRARY_NAMESPACE"]
)
return func(namespace, repository, *args, **kwargs)
return wrapper
class ApiResource(Resource):
registered = False
method_decorators = [
check_anon_protection,
check_readonly,
]
def options(self):
return None, 200
class RepositoryParamResource(ApiResource):
method_decorators = [
check_anon_protection,
parse_repository_name,
check_readonly,
]
def disallow_for_user_namespace(func):
@wraps(func)
def wrapped(self, namespace_name, repository_name, *args, **kwargs):
if features.RESTRICTED_USERS:
user = get_authenticated_user()
if (
user is not None
and user.username == namespace_name
and usermanager.is_restricted_user(user.username)
):
abort(403, message="Operation not allowed on restricted user owned namespace")
return func(self, namespace_name, repository_name, *args, **kwargs)
return wrapped
def disallow_for_app_repositories(func):
@wraps(func)
def wrapped(self, namespace_name, repository_name, *args, **kwargs):
# Lookup the repository with the given namespace and name and ensure it is not an application
# repository.
if model.is_app_repository(namespace_name, repository_name):
abort(501)
return func(self, namespace_name, repository_name, *args, **kwargs)
return wrapped
def disallow_for_non_normal_repositories(func):
@wraps(func)
def wrapped(self, namespace_name, repository_name, *args, **kwargs):
repo = data_model.repository.get_repository(namespace_name, repository_name)
if repo and repo.state != RepositoryState.NORMAL:
abort(503, message="Repository is in read only or mirror mode: %s" % repo.state)
return func(self, namespace_name, repository_name, *args, **kwargs)
return wrapped
def require_repo_permission(permission_class, scope, allow_public=False):
def _require_permission(
allow_for_superuser=False,
disallow_for_restricted_user=False,
allow_for_global_readonly_superuser=False,
):
def wrapper(func):
@add_method_metadata("oauth2_scope", scope)
@wraps(func)
def wrapped(self, namespace, repository, *args, **kwargs):
logger.debug(
"Checking permission %s for repo: %s/%s",
permission_class,
namespace,
repository,
)
user = get_authenticated_user()
if features.RESTRICTED_USERS and disallow_for_restricted_user:
if (
usermanager.is_restricted_user(user.username)
and not (allow_public and model.repository_is_public(namespace, repository))
and not SuperUserPermission().can()
):
raise Unauthorized()
permission = permission_class(namespace, repository)
if permission.can() or (
allow_public
and model.repository_is_public(namespace, repository)
or (allow_public and GlobalReadOnlySuperUserPermission().can())
):
return func(self, namespace, repository, *args, **kwargs)
if allow_for_superuser:
user = get_authenticated_user()
if user is not None:
# For read operations that also allow global readonly superusers,
# allow any superuser with FULL_ACCESS
if (
allow_for_global_readonly_superuser
and features.SUPERUSERS_FULL_ACCESS
and allow_if_any_superuser()
):
return func(self, namespace, repository, *args, **kwargs)
# For write operations, only allow regular superusers with FULL_ACCESS
elif (
not allow_for_global_readonly_superuser
and allow_if_superuser_with_full_access()
):
return func(self, namespace, repository, *args, **kwargs)
if allow_for_global_readonly_superuser and allow_if_global_readonly_superuser():
return func(self, namespace, repository, *args, **kwargs)
raise Unauthorized()
return wrapped
return wrapper
return _require_permission
require_repo_read = require_repo_permission(ReadRepositoryPermission, scopes.READ_REPO, True)
require_repo_write = require_repo_permission(ModifyRepositoryPermission, scopes.WRITE_REPO)
require_repo_admin = require_repo_permission(AdministerRepositoryPermission, scopes.ADMIN_REPO)
def require_user_permission(permission_class, scope=None):
def _require_permission(allow_for_superuser=False, disallow_for_restricted_users=False):
def wrapper(func):
@add_method_metadata("oauth2_scope", scope)
@wraps(func)
def wrapped(self, *args, **kwargs):
user = get_authenticated_user()
if not user:
raise Unauthorized()
if features.RESTRICTED_USERS and disallow_for_restricted_users:
if (
usermanager.is_restricted_user(user.username)
and not SuperUserPermission().can()
):
raise Unauthorized()
logger.debug("Checking permission %s for user %s", permission_class, user.username)
permission = permission_class(user.username)
if permission.can():
return func(self, *args, **kwargs)
if allow_for_superuser and allow_if_superuser_with_full_access():
return func(self, *args, **kwargs)
raise Unauthorized()
return wrapped
return wrapper
return _require_permission
require_user_read = require_user_permission(UserReadPermission, scopes.READ_USER)
require_user_admin = require_user_permission(UserAdminPermission, scopes.ADMIN_USER)
def log_unauthorized(audit_event):
def inner(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except endpoints.v2.errors.Unauthorized as e:
if (
(
app.config.get("ACTION_LOG_AUDIT_PUSH_FAILURES")
and audit_event == "push_repo_failed"
)
or (
app.config.get("ACTION_LOG_AUDIT_PULL_FAILURES")
and audit_event == "pull_repo_failed"
)
or (
app.config.get("ACTION_LOG_AUDIT_DELETE_FAILURES")
and audit_event == "delete_tag_failed"
)
):
if "namespace_name" in kwargs and "repo_name" in kwargs:
metadata = {
"namespace": kwargs["namespace_name"],
"repo": kwargs["repo_name"],
}
if "manifest_ref" in kwargs:
try:
digest = digest_tools.Digest.parse_digest(kwargs["manifest_ref"])
metadata["manifest_digest"] = str(digest)
except digest_tools.InvalidDigestException:
metadata["tag"] = kwargs["manifest_ref"]
user_or_orgname = data_model.user.get_user_or_org(kwargs["namespace_name"])
if user_or_orgname is not None:
repo = data_model.repository.get_repository(
user_or_orgname.username, kwargs["repo_name"]
)
else:
repo = None
if user_or_orgname is None:
metadata["message"] = "Namespace does not exist"
log_action(
kind=audit_event,
user_or_orgname=None,
metadata=metadata,
)
elif repo is None:
metadata["message"] = "Repository does not exist"
log_action(
kind=audit_event,
user_or_orgname=user_or_orgname.username,
metadata=metadata,
)
else:
metadata["message"] = str(e)
log_action(
kind=audit_event,
user_or_orgname=user_or_orgname.username,
repo_name=repo.name,
metadata=metadata,
)
logger.debug("Unauthorized request: %s", e)
raise e
return wrapper
return inner
log_unauthorized_pull = log_unauthorized("pull_repo_failed")
log_unauthorized_push = log_unauthorized("push_repo_failed")
log_unauthorized_delete = log_unauthorized("delete_tag_failed")
def allow_if_superuser():
"""
Returns True if the user is a regular superuser (not global readonly).
This is for basic superuser panel access and should work when FEATURE_SUPER_USERS
is enabled, regardless of SUPERUSERS_FULL_ACCESS. This does NOT grant permission
to bypass normal access controls on other users' resources.
For operations that need to bypass normal permission checks (like accessing other
organizations or creating repos in other namespaces), use allow_if_superuser_with_full_access().
"""
return SuperUserPermission().can()
def allow_if_superuser_with_full_access():
"""
Returns True if the user is a superuser with full access enabled.
This is for operations that bypass normal permission checks to access or modify
resources owned by other users/organizations. Examples include:
- Creating repositories in other namespaces
- Modifying teams/robots in other organizations
- Accessing private data of other organizations
Requires both:
- User is a superuser (FEATURE_SUPER_USERS enabled and user in SUPER_USERS list)
- FEATURE_SUPERUSERS_FULL_ACCESS is enabled
Note: Global readonly superusers are explicitly excluded from this permission.
"""
return bool(features.SUPERUSERS_FULL_ACCESS and SuperUserPermission().can())
def allow_if_any_superuser():
"""
Returns True if the user is either a regular superuser or a global readonly superuser.
Since these two types are mutually exclusive, this is a convenience helper for read-only
endpoints that should be accessible to both types of superusers (like viewing user lists,
logs, organizations in the superuser panel).
Note: Regular superusers work with just FEATURE_SUPER_USERS enabled. Global readonly
superusers are always allowed (when the feature is enabled) since they're read-only by design.
"""
return allow_if_superuser() or allow_if_global_readonly_superuser()
def allow_if_global_readonly_superuser():
ldap_filter = app.config.get("LDAP_GLOBAL_READONLY_SUPERUSER_FILTER", None)
config_users = app.config.get("GLOBAL_READONLY_SUPER_USERS", None)
logger.debug(
"allow_if_global_readonly_superuser: ldap_filter=%s, config_users=%s",
ldap_filter,
config_users,
)
if ldap_filter is None and config_users is None:
logger.debug("allow_if_global_readonly_superuser: returning False - no config")
return False
context = get_authenticated_context()
logger.debug("allow_if_global_readonly_superuser: context=%s", context)
if context is None or context.authed_user is None:
logger.debug("allow_if_global_readonly_superuser: returning False - no context/user")
return False
username = context.authed_user.username
is_global_readonly = usermanager.is_global_readonly_superuser(username)
logger.debug(
"allow_if_global_readonly_superuser: user=%s, is_global_readonly=%s",
username,
is_global_readonly,
)
return is_global_readonly
def verify_not_prod(func):
@add_method_metadata("enterprise_only", True)
@wraps(func)
def wrapped(*args, **kwargs):
# Verify that we are not running on a production (i.e. hosted) stack. If so, we fail.
# This should never happen (because of the feature-flag on SUPER_USERS), but we want to be
# absolutely sure.
if app.config["SERVER_HOSTNAME"].find("quay.io") >= 0:
logger.error("!!! Super user method called IN PRODUCTION !!!")
raise NotFound()
return func(*args, **kwargs)
return wrapped
def require_fresh_login(func):
@add_method_metadata("requires_fresh_login", True)
@wraps(func)
def wrapped(*args, **kwargs):
user = get_authenticated_user()
if not user or user.robot:
raise Unauthorized()
if get_validated_oauth_token():
return func(*args, **kwargs)
last_login = session.get("login_time", datetime.datetime.min)
valid_span = datetime.datetime.now() - FRESH_LOGIN_TIMEOUT
logger.debug(
"Checking fresh login for user %s: Last login at %s", user.username, last_login
)
if (
last_login.replace(tzinfo=pytz.UTC) >= valid_span.replace(tzinfo=pytz.UTC)
or not authentication.supports_fresh_login
or not authentication.has_password_set(user.username)
):
return func(*args, **kwargs)
raise FreshLoginRequired()
return wrapped
def require_scope(scope_object):
def wrapper(func):
@add_method_metadata("oauth2_scope", scope_object)
@wraps(func)
def wrapped(*args, **kwargs):
return func(*args, **kwargs)
return wrapped
return wrapper
def max_json_size(max_size):
def wrapper(func):
@wraps(func)
def wrapped(self, *args, **kwargs):
if request.is_json and len(request.get_data()) > max_size:
raise InvalidRequest()
return func(self, *args, **kwargs)
return wrapped
return wrapper
def validate_json_request(schema_name, optional=False):
def wrapper(func):
@add_method_metadata("request_schema", schema_name)
@wraps(func)
def wrapped(self, *args, **kwargs):
schema = self.schemas[schema_name]
try:
json_data = request.get_json(silent=optional)
if json_data is None:
if not optional:
raise InvalidRequest("Missing JSON body")
else:
validate(json_data, schema)
return func(self, *args, **kwargs)
except ValidationError as ex:
raise InvalidRequest(str(ex))
return wrapped
return wrapper
def request_error(exception=None, **kwargs):
data = kwargs.copy()
message = "Request error."
if exception:
message = str(exception)
message = data.pop("message", message)
raise InvalidRequest(message, data)
def log_action(kind, user_or_orgname, metadata=None, repo=None, repo_name=None, performer=None):
if not metadata:
metadata = {}
oauth_token = get_validated_oauth_token()
if oauth_token:
metadata["oauth_token_id"] = oauth_token.id
metadata["oauth_token_application_id"] = oauth_token.application.client_id
metadata["oauth_token_application"] = oauth_token.application.name
if performer is None:
performer = get_authenticated_user()
if repo_name is not None:
repo = data_model.repository.get_repository(user_or_orgname, repo_name)
logs_model.log_action(
kind,
user_or_orgname,
repository=repo,
performer=performer,
ip=get_request_ip(),
metadata=metadata,
)
def define_json_response(schema_name):
def wrapper(func):
@add_method_metadata("response_schema", schema_name)
@wraps(func)
def wrapped(self, *args, **kwargs):
schema = self.schemas[schema_name]
resp = func(self, *args, **kwargs)
if app.config["TESTING"]:
try:
validate(resp, schema)
except ValidationError as ex:
raise InvalidResponse(str(ex))
return resp
return wrapped
return wrapper
def deprecated():
"""
Marks a given API resource operation as deprecated by adding `Deprecation` header.
See https://tools.ietf.org/id/draft-dalal-deprecation-header-01.html#RFC7234.
"""
def wrapper(func):
@wraps(func)
def wrapped(self, *args, **kwargs):
(data, code, headers) = unpack(func(self, *args, **kwargs))
headers["Deprecation"] = "true"
return (data, code, headers)
return wrapped
return wrapper
import endpoints.api.appspecifictokens
import endpoints.api.billing
import endpoints.api.build
import endpoints.api.capabilities
import endpoints.api.discovery
import endpoints.api.error
import endpoints.api.globalmessages
import endpoints.api.immutability_policy
import endpoints.api.logs
import endpoints.api.manifest
import endpoints.api.mirror
import endpoints.api.namespacequota
import endpoints.api.org_mirror
import endpoints.api.organization
import endpoints.api.permission
import endpoints.api.policy
import endpoints.api.prototype
import endpoints.api.repoemail
import endpoints.api.repository
import endpoints.api.repositorynotification
import endpoints.api.repotoken
import endpoints.api.robot
import endpoints.api.search
import endpoints.api.secscan
import endpoints.api.signing
import endpoints.api.suconfig
import endpoints.api.superuser
import endpoints.api.tag
import endpoints.api.team
import endpoints.api.trigger
import endpoints.api.user