mirror of
https://github.com/quay/quay.git
synced 2026-01-26 06:21:37 +03:00
* fix(api): fix superuser panel access and permission bypass behavior Fixed multiple issues with superuser functionality related to FEATURE_SUPERUSERS_FULL_ACCESS: 1. **Superuser Panel Access**: Fixed bug where superuser panel endpoints returned 403 when FEATURE_SUPERUSERS_FULL_ACCESS was disabled. Basic superuser panel operations (user list, logs, org list, messages) now work with just FEATURE_SUPER_USERS enabled. 2. **Permission Bypass Logic**: Updated decorators to properly differentiate between: - Basic superuser operations (don't require FULL_ACCESS) - Permission bypass operations (require FULL_ACCESS) - Read operations (allow global readonly superusers with FULL_ACCESS) - Write operations (only allow regular superusers with FULL_ACCESS) 3. **Global Readonly Superuser Support**: Fixed decorators to allow global readonly superusers read access when FULL_ACCESS is enabled, while blocking write operations. 4. **License Bypass**: Superusers with FULL_ACCESS now bypass license/quota limits when creating or modifying private repositories in any namespace. Changes: - endpoints/api/__init__.py: Fixed allow_if_superuser() and decorators - endpoints/api/organization.py: Updated 4 GET endpoints to require FULL_ACCESS - endpoints/api/namespacequota.py: Updated 2 GET endpoints to require FULL_ACCESS - endpoints/api/team.py: Updated 2 GET endpoints to require FULL_ACCESS - endpoints/api/prototype.py: Updated 1 GET endpoint to require FULL_ACCESS - endpoints/api/repository.py: Added license bypass for superusers with FULL_ACCESS - endpoints/api/logs.py: Fixed log access to require FULL_ACCESS for permission bypass - endpoints/api/test/test_superuser_full_access.py: Added comprehensive test suite - test/test_api_usage.py: Updated test expectations for license bypass behavior 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> * test(api): add plan limit test for regular users Adds test_plan_limit_enforcement_for_regular_users to verify that license limits still apply to non-superuser accounts. This restores test coverage for plan limit enforcement that was changed when we updated test_trychangevisibility to validate superuser bypass behavior. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> * fix(api): correct superuser cross-namespace permission model Fixes permission checks to properly implement the superuser access model: - Global Readonly Superusers (auditors): Always have read access to all content, independent of FEATURE_SUPERUSERS_FULL_ACCESS setting - Regular Superusers: Can access /v1/superuser endpoints and their own content. Require FEATURE_SUPERUSERS_FULL_ACCESS=true for cross-namespace read access - Full Access Superusers: Regular superusers with FULL_ACCESS enabled, can perform CRUD on content they don't own - Write operations: Only allowed for full access superusers (global readonly superusers never get write access) Updated 18 permission checks across 7 files: - endpoints/api/__init__.py: require_repo_permission decorator - endpoints/api/organization.py: org view, teams, collaborators, proxy cache - endpoints/api/namespacequota.py: quota listing and individual quota - endpoints/api/policy.py: auto-prune policies for orgs and repos - endpoints/api/robot.py: robot listing, details, permissions, federation - endpoints/api/build.py: repository build logs - endpoints/api/repository_models_pre_oci.py: repository visibility query 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> * fix(api): require FULL_ACCESS for regular superusers on /v1/superuser/apptokens Updates the SuperUserAppTokens endpoint to properly enforce permission model: - Regular superusers WITHOUT FULL_ACCESS: Get 403 Unauthorized - Global readonly superusers: Always get 200 (auditor access, independent of FULL_ACCESS) - Full access superusers (FULL_ACCESS=true): Get 200 with all tokens This ensures regular superusers can only audit app tokens across the system when they have full access privileges enabled, while global readonly superusers (auditors) always maintain read access for auditing purposes. Changes: - endpoints/api/superuser.py: Update permission check in SuperUserAppTokens.get() - endpoints/api/test/test_appspecifictoken.py: Update test mocking and add new test for 403 behavior 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> --------- Co-authored-by: Dave O'Connor <doconnor@redhat.com> Co-authored-by: Claude <noreply@anthropic.com> Co-authored-by: Dave O'Connor <1656866+HammerMeetNail@users.noreply.github.com>
784 lines
25 KiB
Python
784 lines
25 KiB
Python
import datetime
|
|
import logging
|
|
from calendar import timegm
|
|
from email.utils import formatdate
|
|
from functools import partial, wraps
|
|
|
|
import pytz
|
|
from flask import Blueprint, request, session
|
|
from flask_restful import Api, Resource, abort, reqparse
|
|
from flask_restful.utils import unpack
|
|
from jsonschema import ValidationError, validate
|
|
from werkzeug.routing.exceptions import RequestRedirect
|
|
|
|
import features
|
|
from .__init__models_pre_oci import pre_oci_model as model
|
|
from app import app, authentication, usermanager
|
|
from auth import scopes
|
|
from auth.auth_context import (
|
|
get_authenticated_context,
|
|
get_authenticated_user,
|
|
get_validated_oauth_token,
|
|
)
|
|
from auth.decorators import process_oauth
|
|
from auth.permissions import (
|
|
AdministerRepositoryPermission,
|
|
GlobalReadOnlySuperUserPermission,
|
|
ModifyRepositoryPermission,
|
|
ReadRepositoryPermission,
|
|
SuperUserPermission,
|
|
UserAdminPermission,
|
|
UserReadPermission,
|
|
)
|
|
from data import model as data_model
|
|
from data.database import RepositoryState
|
|
from data.logs_model import logs_model
|
|
from digest import digest_tools
|
|
from endpoints.csrf import csrf_protect
|
|
from endpoints.decorators import (
|
|
check_anon_protection,
|
|
check_readonly,
|
|
require_xhr_from_browser,
|
|
)
|
|
from endpoints.exception import (
|
|
FreshLoginRequired,
|
|
InvalidRequest,
|
|
InvalidResponse,
|
|
NotFound,
|
|
Unauthorized,
|
|
)
|
|
from util.metrics.prometheus import timed_blueprint
|
|
from util.names import parse_namespace_repository
|
|
from util.pagination import decrypt_page_token, encrypt_page_token
|
|
from util.request import crossorigin, get_request_ip
|
|
from util.timedeltastring import convert_to_timedelta
|
|
|
|
logger = logging.getLogger(__name__)
|
|
api_bp = timed_blueprint(Blueprint("api", __name__))
|
|
|
|
|
|
FRESH_LOGIN_TIMEOUT = convert_to_timedelta(app.config.get("FRESH_LOGIN_TIMEOUT", "10m"))
|
|
|
|
|
|
class ApiExceptionHandlingApi(Api):
|
|
@crossorigin()
|
|
def handle_error(self, error):
|
|
return super(ApiExceptionHandlingApi, self).handle_error(error)
|
|
|
|
def _should_use_fr_error_handler(self):
|
|
try:
|
|
return super(ApiExceptionHandlingApi, self)._should_use_fr_error_handler()
|
|
except RequestRedirect:
|
|
return False
|
|
|
|
|
|
api = ApiExceptionHandlingApi()
|
|
api.init_app(api_bp)
|
|
api.decorators = [
|
|
csrf_protect(),
|
|
crossorigin(),
|
|
process_oauth,
|
|
require_xhr_from_browser,
|
|
]
|
|
|
|
|
|
def resource(*urls, **kwargs):
|
|
def wrapper(api_resource):
|
|
if not api_resource:
|
|
return None
|
|
|
|
api_resource.registered = True
|
|
api.add_resource(api_resource, *urls, **kwargs)
|
|
return api_resource
|
|
|
|
return wrapper
|
|
|
|
|
|
def show_if(value):
|
|
def f(inner):
|
|
if hasattr(inner, "registered") and inner.registered:
|
|
msg = (
|
|
"API endpoint %s is already registered; please switch the "
|
|
+ "@show_if to be *below* the @resource decorator"
|
|
)
|
|
raise Exception(msg % inner)
|
|
|
|
if not value:
|
|
return None
|
|
|
|
return inner
|
|
|
|
return f
|
|
|
|
|
|
def hide_if(value):
|
|
def f(inner):
|
|
if hasattr(inner, "registered") and inner.registered:
|
|
msg = (
|
|
"API endpoint %s is already registered; please switch the "
|
|
+ "@hide_if to be *below* the @resource decorator"
|
|
)
|
|
raise Exception(msg % inner)
|
|
|
|
if value:
|
|
return None
|
|
|
|
return inner
|
|
|
|
return f
|
|
|
|
|
|
def format_date(date):
|
|
"""
|
|
Output an RFC 2822 date format.
|
|
"""
|
|
if date is None:
|
|
return None
|
|
return formatdate(timegm(date.utctimetuple()))
|
|
|
|
|
|
def add_method_metadata(name, value):
|
|
def modifier(func):
|
|
if func is None:
|
|
return None
|
|
|
|
if "__api_metadata" not in dir(func):
|
|
func.__api_metadata = {}
|
|
func.__api_metadata[name] = value
|
|
return func
|
|
|
|
return modifier
|
|
|
|
|
|
def method_metadata(func, name):
|
|
if func is None:
|
|
return None
|
|
|
|
if "__api_metadata" in dir(func):
|
|
return func.__api_metadata.get(name, None)
|
|
return None
|
|
|
|
|
|
nickname = partial(add_method_metadata, "nickname")
|
|
related_user_resource = partial(add_method_metadata, "related_user_resource")
|
|
internal_only = add_method_metadata("internal", True)
|
|
|
|
|
|
def path_param(name, description):
|
|
def add_param(func):
|
|
if not func:
|
|
return func
|
|
|
|
if "__api_path_params" not in dir(func):
|
|
func.__api_path_params = {}
|
|
func.__api_path_params[name] = {"name": name, "description": description}
|
|
return func
|
|
|
|
return add_param
|
|
|
|
|
|
def query_param(name, help_str, type=reqparse.text_type, default=None, choices=(), required=False):
|
|
def add_param(func):
|
|
if "__api_query_params" not in dir(func):
|
|
func.__api_query_params = []
|
|
func.__api_query_params.append(
|
|
{
|
|
"name": name,
|
|
"type": type,
|
|
"help": help_str,
|
|
"default": default,
|
|
"choices": choices,
|
|
"required": required,
|
|
"location": ("args"),
|
|
}
|
|
)
|
|
return func
|
|
|
|
return add_param
|
|
|
|
|
|
def page_support(page_token_kwarg="page_token", parsed_args_kwarg="parsed_args"):
|
|
def inner(func):
|
|
"""
|
|
Adds pagination support to an API endpoint.
|
|
|
|
The decorated API will have an added query parameter named 'next_page'. Works in tandem with
|
|
the modelutil paginate method.
|
|
"""
|
|
|
|
@wraps(func)
|
|
@query_param("next_page", "The page token for the next page", type=str)
|
|
def wrapper(self, *args, **kwargs):
|
|
# Note: if page_token is None, we'll receive the first page of results back.
|
|
page_token = decrypt_page_token(kwargs[parsed_args_kwarg]["next_page"])
|
|
kwargs[page_token_kwarg] = page_token
|
|
|
|
(result, next_page_token) = func(self, *args, **kwargs)
|
|
if next_page_token is not None:
|
|
result["next_page"] = encrypt_page_token(next_page_token)
|
|
|
|
return result
|
|
|
|
return wrapper
|
|
|
|
return inner
|
|
|
|
|
|
def parse_args(kwarg_name="parsed_args"):
|
|
def inner(func):
|
|
@wraps(func)
|
|
def wrapper(self, *args, **kwargs):
|
|
if "__api_query_params" not in dir(func):
|
|
abort(500)
|
|
|
|
parser = reqparse.RequestParser()
|
|
for arg_spec in func.__api_query_params:
|
|
parser.add_argument(**arg_spec)
|
|
kwargs[kwarg_name] = parser.parse_args()
|
|
|
|
return func(self, *args, **kwargs)
|
|
|
|
return wrapper
|
|
|
|
return inner
|
|
|
|
|
|
def parse_repository_name(func):
|
|
@wraps(func)
|
|
def wrapper(repository, *args, **kwargs):
|
|
(namespace, repository) = parse_namespace_repository(
|
|
repository, app.config["LIBRARY_NAMESPACE"]
|
|
)
|
|
return func(namespace, repository, *args, **kwargs)
|
|
|
|
return wrapper
|
|
|
|
|
|
class ApiResource(Resource):
|
|
registered = False
|
|
method_decorators = [
|
|
check_anon_protection,
|
|
check_readonly,
|
|
]
|
|
|
|
def options(self):
|
|
return None, 200
|
|
|
|
|
|
class RepositoryParamResource(ApiResource):
|
|
method_decorators = [
|
|
check_anon_protection,
|
|
parse_repository_name,
|
|
check_readonly,
|
|
]
|
|
|
|
|
|
def disallow_for_user_namespace(func):
|
|
@wraps(func)
|
|
def wrapped(self, namespace_name, repository_name, *args, **kwargs):
|
|
if features.RESTRICTED_USERS:
|
|
user = get_authenticated_user()
|
|
if (
|
|
user is not None
|
|
and user.username == namespace_name
|
|
and usermanager.is_restricted_user(user.username)
|
|
):
|
|
abort(403, message="Operation not allowed on restricted user owned namespace")
|
|
|
|
return func(self, namespace_name, repository_name, *args, **kwargs)
|
|
|
|
return wrapped
|
|
|
|
|
|
def disallow_for_app_repositories(func):
|
|
@wraps(func)
|
|
def wrapped(self, namespace_name, repository_name, *args, **kwargs):
|
|
# Lookup the repository with the given namespace and name and ensure it is not an application
|
|
# repository.
|
|
if model.is_app_repository(namespace_name, repository_name):
|
|
abort(501)
|
|
|
|
return func(self, namespace_name, repository_name, *args, **kwargs)
|
|
|
|
return wrapped
|
|
|
|
|
|
def disallow_for_non_normal_repositories(func):
|
|
@wraps(func)
|
|
def wrapped(self, namespace_name, repository_name, *args, **kwargs):
|
|
repo = data_model.repository.get_repository(namespace_name, repository_name)
|
|
if repo and repo.state != RepositoryState.NORMAL:
|
|
abort(503, message="Repository is in read only or mirror mode: %s" % repo.state)
|
|
|
|
return func(self, namespace_name, repository_name, *args, **kwargs)
|
|
|
|
return wrapped
|
|
|
|
|
|
def require_repo_permission(permission_class, scope, allow_public=False):
|
|
def _require_permission(
|
|
allow_for_superuser=False,
|
|
disallow_for_restricted_user=False,
|
|
allow_for_global_readonly_superuser=False,
|
|
):
|
|
def wrapper(func):
|
|
@add_method_metadata("oauth2_scope", scope)
|
|
@wraps(func)
|
|
def wrapped(self, namespace, repository, *args, **kwargs):
|
|
logger.debug(
|
|
"Checking permission %s for repo: %s/%s",
|
|
permission_class,
|
|
namespace,
|
|
repository,
|
|
)
|
|
|
|
user = get_authenticated_user()
|
|
|
|
if features.RESTRICTED_USERS and disallow_for_restricted_user:
|
|
if (
|
|
usermanager.is_restricted_user(user.username)
|
|
and not (allow_public and model.repository_is_public(namespace, repository))
|
|
and not SuperUserPermission().can()
|
|
):
|
|
raise Unauthorized()
|
|
|
|
permission = permission_class(namespace, repository)
|
|
if permission.can() or (
|
|
allow_public
|
|
and model.repository_is_public(namespace, repository)
|
|
or (allow_public and GlobalReadOnlySuperUserPermission().can())
|
|
):
|
|
return func(self, namespace, repository, *args, **kwargs)
|
|
|
|
if allow_for_superuser:
|
|
user = get_authenticated_user()
|
|
|
|
if user is not None:
|
|
# For read operations that also allow global readonly superusers,
|
|
# allow any superuser with FULL_ACCESS
|
|
if (
|
|
allow_for_global_readonly_superuser
|
|
and features.SUPERUSERS_FULL_ACCESS
|
|
and allow_if_any_superuser()
|
|
):
|
|
return func(self, namespace, repository, *args, **kwargs)
|
|
# For write operations, only allow regular superusers with FULL_ACCESS
|
|
elif (
|
|
not allow_for_global_readonly_superuser
|
|
and allow_if_superuser_with_full_access()
|
|
):
|
|
return func(self, namespace, repository, *args, **kwargs)
|
|
|
|
if allow_for_global_readonly_superuser and allow_if_global_readonly_superuser():
|
|
return func(self, namespace, repository, *args, **kwargs)
|
|
|
|
raise Unauthorized()
|
|
|
|
return wrapped
|
|
|
|
return wrapper
|
|
|
|
return _require_permission
|
|
|
|
|
|
require_repo_read = require_repo_permission(ReadRepositoryPermission, scopes.READ_REPO, True)
|
|
require_repo_write = require_repo_permission(ModifyRepositoryPermission, scopes.WRITE_REPO)
|
|
require_repo_admin = require_repo_permission(AdministerRepositoryPermission, scopes.ADMIN_REPO)
|
|
|
|
|
|
def require_user_permission(permission_class, scope=None):
|
|
def _require_permission(allow_for_superuser=False, disallow_for_restricted_users=False):
|
|
def wrapper(func):
|
|
@add_method_metadata("oauth2_scope", scope)
|
|
@wraps(func)
|
|
def wrapped(self, *args, **kwargs):
|
|
user = get_authenticated_user()
|
|
if not user:
|
|
raise Unauthorized()
|
|
|
|
if features.RESTRICTED_USERS and disallow_for_restricted_users:
|
|
if (
|
|
usermanager.is_restricted_user(user.username)
|
|
and not SuperUserPermission().can()
|
|
):
|
|
raise Unauthorized()
|
|
|
|
logger.debug("Checking permission %s for user %s", permission_class, user.username)
|
|
permission = permission_class(user.username)
|
|
if permission.can():
|
|
return func(self, *args, **kwargs)
|
|
|
|
if allow_for_superuser and allow_if_superuser_with_full_access():
|
|
return func(self, *args, **kwargs)
|
|
|
|
raise Unauthorized()
|
|
|
|
return wrapped
|
|
|
|
return wrapper
|
|
|
|
return _require_permission
|
|
|
|
|
|
require_user_read = require_user_permission(UserReadPermission, scopes.READ_USER)
|
|
require_user_admin = require_user_permission(UserAdminPermission, scopes.ADMIN_USER)
|
|
|
|
|
|
def log_unauthorized(audit_event):
|
|
def inner(func):
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
try:
|
|
return func(*args, **kwargs)
|
|
except endpoints.v2.errors.Unauthorized as e:
|
|
|
|
if (
|
|
(
|
|
app.config.get("ACTION_LOG_AUDIT_PUSH_FAILURES")
|
|
and audit_event == "push_repo_failed"
|
|
)
|
|
or (
|
|
app.config.get("ACTION_LOG_AUDIT_PULL_FAILURES")
|
|
and audit_event == "pull_repo_failed"
|
|
)
|
|
or (
|
|
app.config.get("ACTION_LOG_AUDIT_DELETE_FAILURES")
|
|
and audit_event == "delete_tag_failed"
|
|
)
|
|
):
|
|
if "namespace_name" in kwargs and "repo_name" in kwargs:
|
|
metadata = {
|
|
"namespace": kwargs["namespace_name"],
|
|
"repo": kwargs["repo_name"],
|
|
}
|
|
|
|
if "manifest_ref" in kwargs:
|
|
try:
|
|
digest = digest_tools.Digest.parse_digest(kwargs["manifest_ref"])
|
|
metadata["manifest_digest"] = str(digest)
|
|
except digest_tools.InvalidDigestException:
|
|
metadata["tag"] = kwargs["manifest_ref"]
|
|
|
|
user_or_orgname = data_model.user.get_user_or_org(kwargs["namespace_name"])
|
|
|
|
if user_or_orgname is not None:
|
|
repo = data_model.repository.get_repository(
|
|
user_or_orgname.username, kwargs["repo_name"]
|
|
)
|
|
else:
|
|
repo = None
|
|
|
|
if user_or_orgname is None:
|
|
metadata["message"] = "Namespace does not exist"
|
|
log_action(
|
|
kind=audit_event,
|
|
user_or_orgname=None,
|
|
metadata=metadata,
|
|
)
|
|
elif repo is None:
|
|
metadata["message"] = "Repository does not exist"
|
|
log_action(
|
|
kind=audit_event,
|
|
user_or_orgname=user_or_orgname.username,
|
|
metadata=metadata,
|
|
)
|
|
else:
|
|
metadata["message"] = str(e)
|
|
log_action(
|
|
kind=audit_event,
|
|
user_or_orgname=user_or_orgname.username,
|
|
repo_name=repo.name,
|
|
metadata=metadata,
|
|
)
|
|
|
|
logger.debug("Unauthorized request: %s", e)
|
|
|
|
raise e
|
|
|
|
return wrapper
|
|
|
|
return inner
|
|
|
|
|
|
log_unauthorized_pull = log_unauthorized("pull_repo_failed")
|
|
log_unauthorized_push = log_unauthorized("push_repo_failed")
|
|
log_unauthorized_delete = log_unauthorized("delete_tag_failed")
|
|
|
|
|
|
def allow_if_superuser():
|
|
"""
|
|
Returns True if the user is a regular superuser (not global readonly).
|
|
|
|
This is for basic superuser panel access and should work when FEATURE_SUPER_USERS
|
|
is enabled, regardless of SUPERUSERS_FULL_ACCESS. This does NOT grant permission
|
|
to bypass normal access controls on other users' resources.
|
|
|
|
For operations that need to bypass normal permission checks (like accessing other
|
|
organizations or creating repos in other namespaces), use allow_if_superuser_with_full_access().
|
|
"""
|
|
return SuperUserPermission().can()
|
|
|
|
|
|
def allow_if_superuser_with_full_access():
|
|
"""
|
|
Returns True if the user is a superuser with full access enabled.
|
|
|
|
This is for operations that bypass normal permission checks to access or modify
|
|
resources owned by other users/organizations. Examples include:
|
|
- Creating repositories in other namespaces
|
|
- Modifying teams/robots in other organizations
|
|
- Accessing private data of other organizations
|
|
|
|
Requires both:
|
|
- User is a superuser (FEATURE_SUPER_USERS enabled and user in SUPER_USERS list)
|
|
- FEATURE_SUPERUSERS_FULL_ACCESS is enabled
|
|
|
|
Note: Global readonly superusers are explicitly excluded from this permission.
|
|
"""
|
|
return bool(features.SUPERUSERS_FULL_ACCESS and SuperUserPermission().can())
|
|
|
|
|
|
def allow_if_any_superuser():
|
|
"""
|
|
Returns True if the user is either a regular superuser or a global readonly superuser.
|
|
|
|
Since these two types are mutually exclusive, this is a convenience helper for read-only
|
|
endpoints that should be accessible to both types of superusers (like viewing user lists,
|
|
logs, organizations in the superuser panel).
|
|
|
|
Note: Regular superusers work with just FEATURE_SUPER_USERS enabled. Global readonly
|
|
superusers are always allowed (when the feature is enabled) since they're read-only by design.
|
|
"""
|
|
return allow_if_superuser() or allow_if_global_readonly_superuser()
|
|
|
|
|
|
def allow_if_global_readonly_superuser():
|
|
ldap_filter = app.config.get("LDAP_GLOBAL_READONLY_SUPERUSER_FILTER", None)
|
|
config_users = app.config.get("GLOBAL_READONLY_SUPER_USERS", None)
|
|
|
|
logger.debug(
|
|
"allow_if_global_readonly_superuser: ldap_filter=%s, config_users=%s",
|
|
ldap_filter,
|
|
config_users,
|
|
)
|
|
|
|
if ldap_filter is None and config_users is None:
|
|
logger.debug("allow_if_global_readonly_superuser: returning False - no config")
|
|
return False
|
|
|
|
context = get_authenticated_context()
|
|
logger.debug("allow_if_global_readonly_superuser: context=%s", context)
|
|
|
|
if context is None or context.authed_user is None:
|
|
logger.debug("allow_if_global_readonly_superuser: returning False - no context/user")
|
|
return False
|
|
|
|
username = context.authed_user.username
|
|
is_global_readonly = usermanager.is_global_readonly_superuser(username)
|
|
logger.debug(
|
|
"allow_if_global_readonly_superuser: user=%s, is_global_readonly=%s",
|
|
username,
|
|
is_global_readonly,
|
|
)
|
|
|
|
return is_global_readonly
|
|
|
|
|
|
def verify_not_prod(func):
|
|
@add_method_metadata("enterprise_only", True)
|
|
@wraps(func)
|
|
def wrapped(*args, **kwargs):
|
|
# Verify that we are not running on a production (i.e. hosted) stack. If so, we fail.
|
|
# This should never happen (because of the feature-flag on SUPER_USERS), but we want to be
|
|
# absolutely sure.
|
|
if app.config["SERVER_HOSTNAME"].find("quay.io") >= 0:
|
|
logger.error("!!! Super user method called IN PRODUCTION !!!")
|
|
raise NotFound()
|
|
|
|
return func(*args, **kwargs)
|
|
|
|
return wrapped
|
|
|
|
|
|
def require_fresh_login(func):
|
|
@add_method_metadata("requires_fresh_login", True)
|
|
@wraps(func)
|
|
def wrapped(*args, **kwargs):
|
|
user = get_authenticated_user()
|
|
if not user or user.robot:
|
|
raise Unauthorized()
|
|
|
|
if get_validated_oauth_token():
|
|
return func(*args, **kwargs)
|
|
|
|
last_login = session.get("login_time", datetime.datetime.min)
|
|
valid_span = datetime.datetime.now() - FRESH_LOGIN_TIMEOUT
|
|
logger.debug(
|
|
"Checking fresh login for user %s: Last login at %s", user.username, last_login
|
|
)
|
|
|
|
if (
|
|
last_login.replace(tzinfo=pytz.UTC) >= valid_span.replace(tzinfo=pytz.UTC)
|
|
or not authentication.supports_fresh_login
|
|
or not authentication.has_password_set(user.username)
|
|
):
|
|
return func(*args, **kwargs)
|
|
|
|
raise FreshLoginRequired()
|
|
|
|
return wrapped
|
|
|
|
|
|
def require_scope(scope_object):
|
|
def wrapper(func):
|
|
@add_method_metadata("oauth2_scope", scope_object)
|
|
@wraps(func)
|
|
def wrapped(*args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
return wrapped
|
|
|
|
return wrapper
|
|
|
|
|
|
def max_json_size(max_size):
|
|
def wrapper(func):
|
|
@wraps(func)
|
|
def wrapped(self, *args, **kwargs):
|
|
if request.is_json and len(request.get_data()) > max_size:
|
|
raise InvalidRequest()
|
|
|
|
return func(self, *args, **kwargs)
|
|
|
|
return wrapped
|
|
|
|
return wrapper
|
|
|
|
|
|
def validate_json_request(schema_name, optional=False):
|
|
def wrapper(func):
|
|
@add_method_metadata("request_schema", schema_name)
|
|
@wraps(func)
|
|
def wrapped(self, *args, **kwargs):
|
|
schema = self.schemas[schema_name]
|
|
try:
|
|
json_data = request.get_json(silent=optional)
|
|
if json_data is None:
|
|
if not optional:
|
|
raise InvalidRequest("Missing JSON body")
|
|
else:
|
|
validate(json_data, schema)
|
|
return func(self, *args, **kwargs)
|
|
except ValidationError as ex:
|
|
raise InvalidRequest(str(ex))
|
|
|
|
return wrapped
|
|
|
|
return wrapper
|
|
|
|
|
|
def request_error(exception=None, **kwargs):
|
|
data = kwargs.copy()
|
|
message = "Request error."
|
|
if exception:
|
|
message = str(exception)
|
|
|
|
message = data.pop("message", message)
|
|
raise InvalidRequest(message, data)
|
|
|
|
|
|
def log_action(kind, user_or_orgname, metadata=None, repo=None, repo_name=None, performer=None):
|
|
if not metadata:
|
|
metadata = {}
|
|
|
|
oauth_token = get_validated_oauth_token()
|
|
if oauth_token:
|
|
metadata["oauth_token_id"] = oauth_token.id
|
|
metadata["oauth_token_application_id"] = oauth_token.application.client_id
|
|
metadata["oauth_token_application"] = oauth_token.application.name
|
|
|
|
if performer is None:
|
|
performer = get_authenticated_user()
|
|
|
|
if repo_name is not None:
|
|
repo = data_model.repository.get_repository(user_or_orgname, repo_name)
|
|
|
|
logs_model.log_action(
|
|
kind,
|
|
user_or_orgname,
|
|
repository=repo,
|
|
performer=performer,
|
|
ip=get_request_ip(),
|
|
metadata=metadata,
|
|
)
|
|
|
|
|
|
def define_json_response(schema_name):
|
|
def wrapper(func):
|
|
@add_method_metadata("response_schema", schema_name)
|
|
@wraps(func)
|
|
def wrapped(self, *args, **kwargs):
|
|
schema = self.schemas[schema_name]
|
|
resp = func(self, *args, **kwargs)
|
|
|
|
if app.config["TESTING"]:
|
|
try:
|
|
validate(resp, schema)
|
|
except ValidationError as ex:
|
|
raise InvalidResponse(str(ex))
|
|
|
|
return resp
|
|
|
|
return wrapped
|
|
|
|
return wrapper
|
|
|
|
|
|
def deprecated():
|
|
"""
|
|
Marks a given API resource operation as deprecated by adding `Deprecation` header.
|
|
See https://tools.ietf.org/id/draft-dalal-deprecation-header-01.html#RFC7234.
|
|
"""
|
|
|
|
def wrapper(func):
|
|
@wraps(func)
|
|
def wrapped(self, *args, **kwargs):
|
|
(data, code, headers) = unpack(func(self, *args, **kwargs))
|
|
headers["Deprecation"] = "true"
|
|
|
|
return (data, code, headers)
|
|
|
|
return wrapped
|
|
|
|
return wrapper
|
|
|
|
|
|
import endpoints.api.appspecifictokens
|
|
import endpoints.api.billing
|
|
import endpoints.api.build
|
|
import endpoints.api.discovery
|
|
import endpoints.api.error
|
|
import endpoints.api.globalmessages
|
|
import endpoints.api.logs
|
|
import endpoints.api.manifest
|
|
import endpoints.api.mirror
|
|
import endpoints.api.namespacequota
|
|
import endpoints.api.organization
|
|
import endpoints.api.permission
|
|
import endpoints.api.policy
|
|
import endpoints.api.prototype
|
|
import endpoints.api.repoemail
|
|
import endpoints.api.repository
|
|
import endpoints.api.repositorynotification
|
|
import endpoints.api.repotoken
|
|
import endpoints.api.robot
|
|
import endpoints.api.search
|
|
import endpoints.api.secscan
|
|
import endpoints.api.signing
|
|
import endpoints.api.suconfig
|
|
import endpoints.api.superuser
|
|
import endpoints.api.tag
|
|
import endpoints.api.team
|
|
import endpoints.api.trigger
|
|
import endpoints.api.user
|