mirror of
https://github.com/quay/quay.git
synced 2026-01-26 06:21:37 +03:00
Add REST API for managing immutability policies at organization and repository levels. Integrate policy evaluation into tag creation. Signed-off-by: Brady Pratt <bpratt@redhat.com> Co-authored-by: Claude <noreply@anthropic.com>
787 lines
25 KiB
Python
787 lines
25 KiB
Python
import datetime
|
|
import logging
|
|
from calendar import timegm
|
|
from email.utils import formatdate
|
|
from functools import partial, wraps
|
|
|
|
import pytz
|
|
from flask import Blueprint, request, session
|
|
from flask_restful import Api, Resource, abort, reqparse
|
|
from flask_restful.utils import unpack
|
|
from jsonschema import ValidationError, validate
|
|
from werkzeug.routing.exceptions import RequestRedirect
|
|
|
|
import features
|
|
from .__init__models_pre_oci import pre_oci_model as model
|
|
from app import app, authentication, usermanager
|
|
from auth import scopes
|
|
from auth.auth_context import (
|
|
get_authenticated_context,
|
|
get_authenticated_user,
|
|
get_validated_oauth_token,
|
|
)
|
|
from auth.decorators import process_oauth
|
|
from auth.permissions import (
|
|
AdministerRepositoryPermission,
|
|
GlobalReadOnlySuperUserPermission,
|
|
ModifyRepositoryPermission,
|
|
ReadRepositoryPermission,
|
|
SuperUserPermission,
|
|
UserAdminPermission,
|
|
UserReadPermission,
|
|
)
|
|
from data import model as data_model
|
|
from data.database import RepositoryState
|
|
from data.logs_model import logs_model
|
|
from digest import digest_tools
|
|
from endpoints.csrf import csrf_protect
|
|
from endpoints.decorators import (
|
|
check_anon_protection,
|
|
check_readonly,
|
|
require_xhr_from_browser,
|
|
)
|
|
from endpoints.exception import (
|
|
FreshLoginRequired,
|
|
InvalidRequest,
|
|
InvalidResponse,
|
|
NotFound,
|
|
Unauthorized,
|
|
)
|
|
from util.metrics.prometheus import timed_blueprint
|
|
from util.names import parse_namespace_repository
|
|
from util.pagination import decrypt_page_token, encrypt_page_token
|
|
from util.request import crossorigin, get_request_ip
|
|
from util.timedeltastring import convert_to_timedelta
|
|
|
|
logger = logging.getLogger(__name__)
|
|
api_bp = timed_blueprint(Blueprint("api", __name__), get_app=lambda: app)
|
|
|
|
|
|
FRESH_LOGIN_TIMEOUT = convert_to_timedelta(app.config.get("FRESH_LOGIN_TIMEOUT", "10m"))
|
|
|
|
|
|
class ApiExceptionHandlingApi(Api):
|
|
@crossorigin()
|
|
def handle_error(self, error):
|
|
return super(ApiExceptionHandlingApi, self).handle_error(error)
|
|
|
|
def _should_use_fr_error_handler(self):
|
|
try:
|
|
return super(ApiExceptionHandlingApi, self)._should_use_fr_error_handler()
|
|
except RequestRedirect:
|
|
return False
|
|
|
|
|
|
api = ApiExceptionHandlingApi()
|
|
api.init_app(api_bp)
|
|
api.decorators = [
|
|
csrf_protect(),
|
|
crossorigin(),
|
|
process_oauth,
|
|
require_xhr_from_browser,
|
|
]
|
|
|
|
|
|
def resource(*urls, **kwargs):
|
|
def wrapper(api_resource):
|
|
if not api_resource:
|
|
return None
|
|
|
|
api_resource.registered = True
|
|
api.add_resource(api_resource, *urls, **kwargs)
|
|
return api_resource
|
|
|
|
return wrapper
|
|
|
|
|
|
def show_if(value):
|
|
def f(inner):
|
|
if hasattr(inner, "registered") and inner.registered:
|
|
msg = (
|
|
"API endpoint %s is already registered; please switch the "
|
|
+ "@show_if to be *below* the @resource decorator"
|
|
)
|
|
raise Exception(msg % inner)
|
|
|
|
if not value:
|
|
return None
|
|
|
|
return inner
|
|
|
|
return f
|
|
|
|
|
|
def hide_if(value):
|
|
def f(inner):
|
|
if hasattr(inner, "registered") and inner.registered:
|
|
msg = (
|
|
"API endpoint %s is already registered; please switch the "
|
|
+ "@hide_if to be *below* the @resource decorator"
|
|
)
|
|
raise Exception(msg % inner)
|
|
|
|
if value:
|
|
return None
|
|
|
|
return inner
|
|
|
|
return f
|
|
|
|
|
|
def format_date(date):
|
|
"""
|
|
Output an RFC 2822 date format.
|
|
"""
|
|
if date is None:
|
|
return None
|
|
return formatdate(timegm(date.utctimetuple()))
|
|
|
|
|
|
def add_method_metadata(name, value):
|
|
def modifier(func):
|
|
if func is None:
|
|
return None
|
|
|
|
if "__api_metadata" not in dir(func):
|
|
func.__api_metadata = {}
|
|
func.__api_metadata[name] = value
|
|
return func
|
|
|
|
return modifier
|
|
|
|
|
|
def method_metadata(func, name):
|
|
if func is None:
|
|
return None
|
|
|
|
if "__api_metadata" in dir(func):
|
|
return func.__api_metadata.get(name, None)
|
|
return None
|
|
|
|
|
|
nickname = partial(add_method_metadata, "nickname")
|
|
related_user_resource = partial(add_method_metadata, "related_user_resource")
|
|
internal_only = add_method_metadata("internal", True)
|
|
|
|
|
|
def path_param(name, description):
|
|
def add_param(func):
|
|
if not func:
|
|
return func
|
|
|
|
if "__api_path_params" not in dir(func):
|
|
func.__api_path_params = {}
|
|
func.__api_path_params[name] = {"name": name, "description": description}
|
|
return func
|
|
|
|
return add_param
|
|
|
|
|
|
def query_param(name, help_str, type=reqparse.text_type, default=None, choices=(), required=False):
|
|
def add_param(func):
|
|
if "__api_query_params" not in dir(func):
|
|
func.__api_query_params = []
|
|
func.__api_query_params.append(
|
|
{
|
|
"name": name,
|
|
"type": type,
|
|
"help": help_str,
|
|
"default": default,
|
|
"choices": choices,
|
|
"required": required,
|
|
"location": ("args"),
|
|
}
|
|
)
|
|
return func
|
|
|
|
return add_param
|
|
|
|
|
|
def page_support(page_token_kwarg="page_token", parsed_args_kwarg="parsed_args"):
|
|
def inner(func):
|
|
"""
|
|
Adds pagination support to an API endpoint.
|
|
|
|
The decorated API will have an added query parameter named 'next_page'. Works in tandem with
|
|
the modelutil paginate method.
|
|
"""
|
|
|
|
@wraps(func)
|
|
@query_param("next_page", "The page token for the next page", type=str)
|
|
def wrapper(self, *args, **kwargs):
|
|
# Note: if page_token is None, we'll receive the first page of results back.
|
|
page_token = decrypt_page_token(kwargs[parsed_args_kwarg]["next_page"])
|
|
kwargs[page_token_kwarg] = page_token
|
|
|
|
(result, next_page_token) = func(self, *args, **kwargs)
|
|
if next_page_token is not None:
|
|
result["next_page"] = encrypt_page_token(next_page_token)
|
|
|
|
return result
|
|
|
|
return wrapper
|
|
|
|
return inner
|
|
|
|
|
|
def parse_args(kwarg_name="parsed_args"):
|
|
def inner(func):
|
|
@wraps(func)
|
|
def wrapper(self, *args, **kwargs):
|
|
if "__api_query_params" not in dir(func):
|
|
abort(500)
|
|
|
|
parser = reqparse.RequestParser()
|
|
for arg_spec in func.__api_query_params:
|
|
parser.add_argument(**arg_spec)
|
|
kwargs[kwarg_name] = parser.parse_args()
|
|
|
|
return func(self, *args, **kwargs)
|
|
|
|
return wrapper
|
|
|
|
return inner
|
|
|
|
|
|
def parse_repository_name(func):
|
|
@wraps(func)
|
|
def wrapper(repository, *args, **kwargs):
|
|
(namespace, repository) = parse_namespace_repository(
|
|
repository, app.config["LIBRARY_NAMESPACE"]
|
|
)
|
|
return func(namespace, repository, *args, **kwargs)
|
|
|
|
return wrapper
|
|
|
|
|
|
class ApiResource(Resource):
|
|
registered = False
|
|
method_decorators = [
|
|
check_anon_protection,
|
|
check_readonly,
|
|
]
|
|
|
|
def options(self):
|
|
return None, 200
|
|
|
|
|
|
class RepositoryParamResource(ApiResource):
|
|
method_decorators = [
|
|
check_anon_protection,
|
|
parse_repository_name,
|
|
check_readonly,
|
|
]
|
|
|
|
|
|
def disallow_for_user_namespace(func):
|
|
@wraps(func)
|
|
def wrapped(self, namespace_name, repository_name, *args, **kwargs):
|
|
if features.RESTRICTED_USERS:
|
|
user = get_authenticated_user()
|
|
if (
|
|
user is not None
|
|
and user.username == namespace_name
|
|
and usermanager.is_restricted_user(user.username)
|
|
):
|
|
abort(403, message="Operation not allowed on restricted user owned namespace")
|
|
|
|
return func(self, namespace_name, repository_name, *args, **kwargs)
|
|
|
|
return wrapped
|
|
|
|
|
|
def disallow_for_app_repositories(func):
|
|
@wraps(func)
|
|
def wrapped(self, namespace_name, repository_name, *args, **kwargs):
|
|
# Lookup the repository with the given namespace and name and ensure it is not an application
|
|
# repository.
|
|
if model.is_app_repository(namespace_name, repository_name):
|
|
abort(501)
|
|
|
|
return func(self, namespace_name, repository_name, *args, **kwargs)
|
|
|
|
return wrapped
|
|
|
|
|
|
def disallow_for_non_normal_repositories(func):
|
|
@wraps(func)
|
|
def wrapped(self, namespace_name, repository_name, *args, **kwargs):
|
|
repo = data_model.repository.get_repository(namespace_name, repository_name)
|
|
if repo and repo.state != RepositoryState.NORMAL:
|
|
abort(503, message="Repository is in read only or mirror mode: %s" % repo.state)
|
|
|
|
return func(self, namespace_name, repository_name, *args, **kwargs)
|
|
|
|
return wrapped
|
|
|
|
|
|
def require_repo_permission(permission_class, scope, allow_public=False):
|
|
def _require_permission(
|
|
allow_for_superuser=False,
|
|
disallow_for_restricted_user=False,
|
|
allow_for_global_readonly_superuser=False,
|
|
):
|
|
def wrapper(func):
|
|
@add_method_metadata("oauth2_scope", scope)
|
|
@wraps(func)
|
|
def wrapped(self, namespace, repository, *args, **kwargs):
|
|
logger.debug(
|
|
"Checking permission %s for repo: %s/%s",
|
|
permission_class,
|
|
namespace,
|
|
repository,
|
|
)
|
|
|
|
user = get_authenticated_user()
|
|
|
|
if features.RESTRICTED_USERS and disallow_for_restricted_user:
|
|
if (
|
|
usermanager.is_restricted_user(user.username)
|
|
and not (allow_public and model.repository_is_public(namespace, repository))
|
|
and not SuperUserPermission().can()
|
|
):
|
|
raise Unauthorized()
|
|
|
|
permission = permission_class(namespace, repository)
|
|
if permission.can() or (
|
|
allow_public
|
|
and model.repository_is_public(namespace, repository)
|
|
or (allow_public and GlobalReadOnlySuperUserPermission().can())
|
|
):
|
|
return func(self, namespace, repository, *args, **kwargs)
|
|
|
|
if allow_for_superuser:
|
|
user = get_authenticated_user()
|
|
|
|
if user is not None:
|
|
# For read operations that also allow global readonly superusers,
|
|
# allow any superuser with FULL_ACCESS
|
|
if (
|
|
allow_for_global_readonly_superuser
|
|
and features.SUPERUSERS_FULL_ACCESS
|
|
and allow_if_any_superuser()
|
|
):
|
|
return func(self, namespace, repository, *args, **kwargs)
|
|
# For write operations, only allow regular superusers with FULL_ACCESS
|
|
elif (
|
|
not allow_for_global_readonly_superuser
|
|
and allow_if_superuser_with_full_access()
|
|
):
|
|
return func(self, namespace, repository, *args, **kwargs)
|
|
|
|
if allow_for_global_readonly_superuser and allow_if_global_readonly_superuser():
|
|
return func(self, namespace, repository, *args, **kwargs)
|
|
|
|
raise Unauthorized()
|
|
|
|
return wrapped
|
|
|
|
return wrapper
|
|
|
|
return _require_permission
|
|
|
|
|
|
require_repo_read = require_repo_permission(ReadRepositoryPermission, scopes.READ_REPO, True)
|
|
require_repo_write = require_repo_permission(ModifyRepositoryPermission, scopes.WRITE_REPO)
|
|
require_repo_admin = require_repo_permission(AdministerRepositoryPermission, scopes.ADMIN_REPO)
|
|
|
|
|
|
def require_user_permission(permission_class, scope=None):
|
|
def _require_permission(allow_for_superuser=False, disallow_for_restricted_users=False):
|
|
def wrapper(func):
|
|
@add_method_metadata("oauth2_scope", scope)
|
|
@wraps(func)
|
|
def wrapped(self, *args, **kwargs):
|
|
user = get_authenticated_user()
|
|
if not user:
|
|
raise Unauthorized()
|
|
|
|
if features.RESTRICTED_USERS and disallow_for_restricted_users:
|
|
if (
|
|
usermanager.is_restricted_user(user.username)
|
|
and not SuperUserPermission().can()
|
|
):
|
|
raise Unauthorized()
|
|
|
|
logger.debug("Checking permission %s for user %s", permission_class, user.username)
|
|
permission = permission_class(user.username)
|
|
if permission.can():
|
|
return func(self, *args, **kwargs)
|
|
|
|
if allow_for_superuser and allow_if_superuser_with_full_access():
|
|
return func(self, *args, **kwargs)
|
|
|
|
raise Unauthorized()
|
|
|
|
return wrapped
|
|
|
|
return wrapper
|
|
|
|
return _require_permission
|
|
|
|
|
|
require_user_read = require_user_permission(UserReadPermission, scopes.READ_USER)
|
|
require_user_admin = require_user_permission(UserAdminPermission, scopes.ADMIN_USER)
|
|
|
|
|
|
def log_unauthorized(audit_event):
|
|
def inner(func):
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
try:
|
|
return func(*args, **kwargs)
|
|
except endpoints.v2.errors.Unauthorized as e:
|
|
|
|
if (
|
|
(
|
|
app.config.get("ACTION_LOG_AUDIT_PUSH_FAILURES")
|
|
and audit_event == "push_repo_failed"
|
|
)
|
|
or (
|
|
app.config.get("ACTION_LOG_AUDIT_PULL_FAILURES")
|
|
and audit_event == "pull_repo_failed"
|
|
)
|
|
or (
|
|
app.config.get("ACTION_LOG_AUDIT_DELETE_FAILURES")
|
|
and audit_event == "delete_tag_failed"
|
|
)
|
|
):
|
|
if "namespace_name" in kwargs and "repo_name" in kwargs:
|
|
metadata = {
|
|
"namespace": kwargs["namespace_name"],
|
|
"repo": kwargs["repo_name"],
|
|
}
|
|
|
|
if "manifest_ref" in kwargs:
|
|
try:
|
|
digest = digest_tools.Digest.parse_digest(kwargs["manifest_ref"])
|
|
metadata["manifest_digest"] = str(digest)
|
|
except digest_tools.InvalidDigestException:
|
|
metadata["tag"] = kwargs["manifest_ref"]
|
|
|
|
user_or_orgname = data_model.user.get_user_or_org(kwargs["namespace_name"])
|
|
|
|
if user_or_orgname is not None:
|
|
repo = data_model.repository.get_repository(
|
|
user_or_orgname.username, kwargs["repo_name"]
|
|
)
|
|
else:
|
|
repo = None
|
|
|
|
if user_or_orgname is None:
|
|
metadata["message"] = "Namespace does not exist"
|
|
log_action(
|
|
kind=audit_event,
|
|
user_or_orgname=None,
|
|
metadata=metadata,
|
|
)
|
|
elif repo is None:
|
|
metadata["message"] = "Repository does not exist"
|
|
log_action(
|
|
kind=audit_event,
|
|
user_or_orgname=user_or_orgname.username,
|
|
metadata=metadata,
|
|
)
|
|
else:
|
|
metadata["message"] = str(e)
|
|
log_action(
|
|
kind=audit_event,
|
|
user_or_orgname=user_or_orgname.username,
|
|
repo_name=repo.name,
|
|
metadata=metadata,
|
|
)
|
|
|
|
logger.debug("Unauthorized request: %s", e)
|
|
|
|
raise e
|
|
|
|
return wrapper
|
|
|
|
return inner
|
|
|
|
|
|
log_unauthorized_pull = log_unauthorized("pull_repo_failed")
|
|
log_unauthorized_push = log_unauthorized("push_repo_failed")
|
|
log_unauthorized_delete = log_unauthorized("delete_tag_failed")
|
|
|
|
|
|
def allow_if_superuser():
|
|
"""
|
|
Returns True if the user is a regular superuser (not global readonly).
|
|
|
|
This is for basic superuser panel access and should work when FEATURE_SUPER_USERS
|
|
is enabled, regardless of SUPERUSERS_FULL_ACCESS. This does NOT grant permission
|
|
to bypass normal access controls on other users' resources.
|
|
|
|
For operations that need to bypass normal permission checks (like accessing other
|
|
organizations or creating repos in other namespaces), use allow_if_superuser_with_full_access().
|
|
"""
|
|
return SuperUserPermission().can()
|
|
|
|
|
|
def allow_if_superuser_with_full_access():
|
|
"""
|
|
Returns True if the user is a superuser with full access enabled.
|
|
|
|
This is for operations that bypass normal permission checks to access or modify
|
|
resources owned by other users/organizations. Examples include:
|
|
- Creating repositories in other namespaces
|
|
- Modifying teams/robots in other organizations
|
|
- Accessing private data of other organizations
|
|
|
|
Requires both:
|
|
- User is a superuser (FEATURE_SUPER_USERS enabled and user in SUPER_USERS list)
|
|
- FEATURE_SUPERUSERS_FULL_ACCESS is enabled
|
|
|
|
Note: Global readonly superusers are explicitly excluded from this permission.
|
|
"""
|
|
return bool(features.SUPERUSERS_FULL_ACCESS and SuperUserPermission().can())
|
|
|
|
|
|
def allow_if_any_superuser():
|
|
"""
|
|
Returns True if the user is either a regular superuser or a global readonly superuser.
|
|
|
|
Since these two types are mutually exclusive, this is a convenience helper for read-only
|
|
endpoints that should be accessible to both types of superusers (like viewing user lists,
|
|
logs, organizations in the superuser panel).
|
|
|
|
Note: Regular superusers work with just FEATURE_SUPER_USERS enabled. Global readonly
|
|
superusers are always allowed (when the feature is enabled) since they're read-only by design.
|
|
"""
|
|
return allow_if_superuser() or allow_if_global_readonly_superuser()
|
|
|
|
|
|
def allow_if_global_readonly_superuser():
|
|
ldap_filter = app.config.get("LDAP_GLOBAL_READONLY_SUPERUSER_FILTER", None)
|
|
config_users = app.config.get("GLOBAL_READONLY_SUPER_USERS", None)
|
|
|
|
logger.debug(
|
|
"allow_if_global_readonly_superuser: ldap_filter=%s, config_users=%s",
|
|
ldap_filter,
|
|
config_users,
|
|
)
|
|
|
|
if ldap_filter is None and config_users is None:
|
|
logger.debug("allow_if_global_readonly_superuser: returning False - no config")
|
|
return False
|
|
|
|
context = get_authenticated_context()
|
|
logger.debug("allow_if_global_readonly_superuser: context=%s", context)
|
|
|
|
if context is None or context.authed_user is None:
|
|
logger.debug("allow_if_global_readonly_superuser: returning False - no context/user")
|
|
return False
|
|
|
|
username = context.authed_user.username
|
|
is_global_readonly = usermanager.is_global_readonly_superuser(username)
|
|
logger.debug(
|
|
"allow_if_global_readonly_superuser: user=%s, is_global_readonly=%s",
|
|
username,
|
|
is_global_readonly,
|
|
)
|
|
|
|
return is_global_readonly
|
|
|
|
|
|
def verify_not_prod(func):
|
|
@add_method_metadata("enterprise_only", True)
|
|
@wraps(func)
|
|
def wrapped(*args, **kwargs):
|
|
# Verify that we are not running on a production (i.e. hosted) stack. If so, we fail.
|
|
# This should never happen (because of the feature-flag on SUPER_USERS), but we want to be
|
|
# absolutely sure.
|
|
if app.config["SERVER_HOSTNAME"].find("quay.io") >= 0:
|
|
logger.error("!!! Super user method called IN PRODUCTION !!!")
|
|
raise NotFound()
|
|
|
|
return func(*args, **kwargs)
|
|
|
|
return wrapped
|
|
|
|
|
|
def require_fresh_login(func):
|
|
@add_method_metadata("requires_fresh_login", True)
|
|
@wraps(func)
|
|
def wrapped(*args, **kwargs):
|
|
user = get_authenticated_user()
|
|
if not user or user.robot:
|
|
raise Unauthorized()
|
|
|
|
if get_validated_oauth_token():
|
|
return func(*args, **kwargs)
|
|
|
|
last_login = session.get("login_time", datetime.datetime.min)
|
|
valid_span = datetime.datetime.now() - FRESH_LOGIN_TIMEOUT
|
|
logger.debug(
|
|
"Checking fresh login for user %s: Last login at %s", user.username, last_login
|
|
)
|
|
|
|
if (
|
|
last_login.replace(tzinfo=pytz.UTC) >= valid_span.replace(tzinfo=pytz.UTC)
|
|
or not authentication.supports_fresh_login
|
|
or not authentication.has_password_set(user.username)
|
|
):
|
|
return func(*args, **kwargs)
|
|
|
|
raise FreshLoginRequired()
|
|
|
|
return wrapped
|
|
|
|
|
|
def require_scope(scope_object):
|
|
def wrapper(func):
|
|
@add_method_metadata("oauth2_scope", scope_object)
|
|
@wraps(func)
|
|
def wrapped(*args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
return wrapped
|
|
|
|
return wrapper
|
|
|
|
|
|
def max_json_size(max_size):
|
|
def wrapper(func):
|
|
@wraps(func)
|
|
def wrapped(self, *args, **kwargs):
|
|
if request.is_json and len(request.get_data()) > max_size:
|
|
raise InvalidRequest()
|
|
|
|
return func(self, *args, **kwargs)
|
|
|
|
return wrapped
|
|
|
|
return wrapper
|
|
|
|
|
|
def validate_json_request(schema_name, optional=False):
|
|
def wrapper(func):
|
|
@add_method_metadata("request_schema", schema_name)
|
|
@wraps(func)
|
|
def wrapped(self, *args, **kwargs):
|
|
schema = self.schemas[schema_name]
|
|
try:
|
|
json_data = request.get_json(silent=optional)
|
|
if json_data is None:
|
|
if not optional:
|
|
raise InvalidRequest("Missing JSON body")
|
|
else:
|
|
validate(json_data, schema)
|
|
return func(self, *args, **kwargs)
|
|
except ValidationError as ex:
|
|
raise InvalidRequest(str(ex))
|
|
|
|
return wrapped
|
|
|
|
return wrapper
|
|
|
|
|
|
def request_error(exception=None, **kwargs):
|
|
data = kwargs.copy()
|
|
message = "Request error."
|
|
if exception:
|
|
message = str(exception)
|
|
|
|
message = data.pop("message", message)
|
|
raise InvalidRequest(message, data)
|
|
|
|
|
|
def log_action(kind, user_or_orgname, metadata=None, repo=None, repo_name=None, performer=None):
|
|
if not metadata:
|
|
metadata = {}
|
|
|
|
oauth_token = get_validated_oauth_token()
|
|
if oauth_token:
|
|
metadata["oauth_token_id"] = oauth_token.id
|
|
metadata["oauth_token_application_id"] = oauth_token.application.client_id
|
|
metadata["oauth_token_application"] = oauth_token.application.name
|
|
|
|
if performer is None:
|
|
performer = get_authenticated_user()
|
|
|
|
if repo_name is not None:
|
|
repo = data_model.repository.get_repository(user_or_orgname, repo_name)
|
|
|
|
logs_model.log_action(
|
|
kind,
|
|
user_or_orgname,
|
|
repository=repo,
|
|
performer=performer,
|
|
ip=get_request_ip(),
|
|
metadata=metadata,
|
|
)
|
|
|
|
|
|
def define_json_response(schema_name):
|
|
def wrapper(func):
|
|
@add_method_metadata("response_schema", schema_name)
|
|
@wraps(func)
|
|
def wrapped(self, *args, **kwargs):
|
|
schema = self.schemas[schema_name]
|
|
resp = func(self, *args, **kwargs)
|
|
|
|
if app.config["TESTING"]:
|
|
try:
|
|
validate(resp, schema)
|
|
except ValidationError as ex:
|
|
raise InvalidResponse(str(ex))
|
|
|
|
return resp
|
|
|
|
return wrapped
|
|
|
|
return wrapper
|
|
|
|
|
|
def deprecated():
|
|
"""
|
|
Marks a given API resource operation as deprecated by adding `Deprecation` header.
|
|
See https://tools.ietf.org/id/draft-dalal-deprecation-header-01.html#RFC7234.
|
|
"""
|
|
|
|
def wrapper(func):
|
|
@wraps(func)
|
|
def wrapped(self, *args, **kwargs):
|
|
(data, code, headers) = unpack(func(self, *args, **kwargs))
|
|
headers["Deprecation"] = "true"
|
|
|
|
return (data, code, headers)
|
|
|
|
return wrapped
|
|
|
|
return wrapper
|
|
|
|
|
|
import endpoints.api.appspecifictokens
|
|
import endpoints.api.billing
|
|
import endpoints.api.build
|
|
import endpoints.api.capabilities
|
|
import endpoints.api.discovery
|
|
import endpoints.api.error
|
|
import endpoints.api.globalmessages
|
|
import endpoints.api.immutability_policy
|
|
import endpoints.api.logs
|
|
import endpoints.api.manifest
|
|
import endpoints.api.mirror
|
|
import endpoints.api.namespacequota
|
|
import endpoints.api.org_mirror
|
|
import endpoints.api.organization
|
|
import endpoints.api.permission
|
|
import endpoints.api.policy
|
|
import endpoints.api.prototype
|
|
import endpoints.api.repoemail
|
|
import endpoints.api.repository
|
|
import endpoints.api.repositorynotification
|
|
import endpoints.api.repotoken
|
|
import endpoints.api.robot
|
|
import endpoints.api.search
|
|
import endpoints.api.secscan
|
|
import endpoints.api.signing
|
|
import endpoints.api.suconfig
|
|
import endpoints.api.superuser
|
|
import endpoints.api.tag
|
|
import endpoints.api.team
|
|
import endpoints.api.trigger
|
|
import endpoints.api.user
|