mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
quay/endpoints/api/trigger.py
OpenShift Cherrypick Robot 57895ec081 [redhat-3.16] fix(api): superuser panel access without SUPERUSERS_FULL_ACCESS (PROJQUAY-9693) (#4512)
* fix(api): fix superuser panel access and permission bypass behavior

Fixed multiple issues with superuser functionality related to FEATURE_SUPERUSERS_FULL_ACCESS:

1. **Superuser Panel Access**: Fixed bug where superuser panel endpoints returned 403
   when FEATURE_SUPERUSERS_FULL_ACCESS was disabled. Basic superuser panel operations
   (user list, logs, org list, messages) now work with just FEATURE_SUPER_USERS enabled.

2. **Permission Bypass Logic**: Updated decorators to properly differentiate between:
   - Basic superuser operations (don't require FULL_ACCESS)
   - Permission bypass operations (require FULL_ACCESS)
   - Read operations (allow global readonly superusers with FULL_ACCESS)
   - Write operations (only allow regular superusers with FULL_ACCESS)

3. **Global Readonly Superuser Support**: Fixed decorators to allow global readonly
   superusers read access when FULL_ACCESS is enabled, while blocking write operations.

4. **License Bypass**: Superusers with FULL_ACCESS now bypass license/quota limits
   when creating or modifying private repositories in any namespace.

Changes:
- endpoints/api/__init__.py: Fixed allow_if_superuser() and decorators
- endpoints/api/organization.py: Updated 4 GET endpoints to require FULL_ACCESS
- endpoints/api/namespacequota.py: Updated 2 GET endpoints to require FULL_ACCESS
- endpoints/api/team.py: Updated 2 GET endpoints to require FULL_ACCESS
- endpoints/api/prototype.py: Updated 1 GET endpoint to require FULL_ACCESS
- endpoints/api/repository.py: Added license bypass for superusers with FULL_ACCESS
- endpoints/api/logs.py: Fixed log access to require FULL_ACCESS for permission bypass
- endpoints/api/test/test_superuser_full_access.py: Added comprehensive test suite
- test/test_api_usage.py: Updated test expectations for license bypass behavior

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* test(api): add plan limit test for regular users

Adds test_plan_limit_enforcement_for_regular_users to verify that
license limits still apply to non-superuser accounts. This restores
test coverage for plan limit enforcement that was changed when we
updated test_trychangevisibility to validate superuser bypass behavior.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix(api): correct superuser cross-namespace permission model

Fixes permission checks to properly implement the superuser access model:

- Global Readonly Superusers (auditors): Always have read access to all
  content, independent of FEATURE_SUPERUSERS_FULL_ACCESS setting
- Regular Superusers: Can access /v1/superuser endpoints and their own
  content. Require FEATURE_SUPERUSERS_FULL_ACCESS=true for cross-namespace
  read access
- Full Access Superusers: Regular superusers with FULL_ACCESS enabled,
  can perform CRUD on content they don't own
- Write operations: Only allowed for full access superusers (global
  readonly superusers never get write access)
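
The access model above can be read as a small decision table. The following is a minimal sketch, not Quay's actual implementation: the `Actor` dataclass and both function names are hypothetical, and `full_access_enabled` stands in for the FEATURE_SUPERUSERS_FULL_ACCESS setting. It also assumes that an account flagged as global readonly is never granted writes, even if it is also flagged as a regular superuser.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Actor:
    """Hypothetical stand-in for the authenticated user's superuser flags."""

    is_superuser: bool = False
    is_global_readonly_superuser: bool = False


def can_read_cross_namespace(actor: Actor, full_access_enabled: bool) -> bool:
    """Read access to content the actor does not own."""
    # Auditors always get read access, independent of the FULL_ACCESS setting.
    if actor.is_global_readonly_superuser:
        return True
    # Regular superusers need FULL_ACCESS for cross-namespace reads.
    return actor.is_superuser and full_access_enabled


def can_write_cross_namespace(actor: Actor, full_access_enabled: bool) -> bool:
    """Write access to content the actor does not own."""
    # Assumption: the readonly flag wins over the regular superuser flag.
    if actor.is_global_readonly_superuser:
        return False
    # Only full access superusers may write.
    return actor.is_superuser and full_access_enabled
```

With these definitions, an auditor can read with FULL_ACCESS disabled but can never write, while a regular superuser gains both read and write only once FULL_ACCESS is enabled.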

Updated 18 permission checks across 7 files:
- endpoints/api/__init__.py: require_repo_permission decorator
- endpoints/api/organization.py: org view, teams, collaborators, proxy cache
- endpoints/api/namespacequota.py: quota listing and individual quota
- endpoints/api/policy.py: auto-prune policies for orgs and repos
- endpoints/api/robot.py: robot listing, details, permissions, federation
- endpoints/api/build.py: repository build logs
- endpoints/api/repository_models_pre_oci.py: repository visibility query

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix(api): require FULL_ACCESS for regular superusers on /v1/superuser/apptokens

Updates the SuperUserAppTokens endpoint to properly enforce permission model:

- Regular superusers WITHOUT FULL_ACCESS: Get 403 Unauthorized
- Global readonly superusers: Always get 200 (auditor access, independent of FULL_ACCESS)
- Full access superusers (FULL_ACCESS=true): Get 200 with all tokens

This ensures regular superusers can only audit app tokens across the system
when they have full access privileges enabled, while global readonly superusers
(auditors) always maintain read access for auditing purposes.
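
The three cases listed for this endpoint can be sketched as a single function that returns the expected HTTP status. This is an illustrative sketch only: `app_tokens_status` and its parameters are hypothetical names, not part of endpoints/api/superuser.py, and the 403 for callers who are not superusers at all is an assumption rather than something stated above.

```python
def app_tokens_status(
    is_superuser: bool,
    is_global_readonly: bool,
    full_access_enabled: bool,
) -> int:
    """Expected status for GET /v1/superuser/apptokens under the model above."""
    # Global readonly superusers (auditors) always get read access.
    if is_global_readonly:
        return 200
    # Regular superusers need FULL_ACCESS to see all tokens.
    if is_superuser and full_access_enabled:
        return 200
    # Regular superusers without FULL_ACCESS are rejected; non-superusers
    # are assumed to be rejected the same way.
    return 403
```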

Changes:
- endpoints/api/superuser.py: Update permission check in SuperUserAppTokens.get()
- endpoints/api/test/test_appspecifictoken.py: Update test mocking and add new test
  for 403 behavior

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Dave O'Connor <doconnor@redhat.com>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Dave O'Connor <1656866+HammerMeetNail@users.noreply.github.com>
2025-11-13 16:50:32 +00:00

635 lines
22 KiB
Python

"""
Create, list and manage build triggers.
"""
import logging
from urllib.parse import urlunparse

from flask import request, url_for

from app import app
from auth.auth_context import get_authenticated_user
from auth.permissions import (
    AdministerOrganizationPermission,
    AdministerRepositoryPermission,
    UserAdminPermission,
)
from buildtrigger.basehandler import BuildTriggerHandler
from buildtrigger.triggerutil import EmptyRepositoryException, TriggerException
from data import model
from data.fields import DecryptedValue
from data.model.build import update_build_trigger
from endpoints.api import (
    RepositoryParamResource,
    abort,
    allow_if_superuser,
    allow_if_superuser_with_full_access,
    api,
    disallow_for_app_repositories,
    disallow_for_non_normal_repositories,
    disallow_for_user_namespace,
    internal_only,
    log_action,
    nickname,
    parse_args,
    path_param,
    query_param,
    request_error,
    require_repo_admin,
    resource,
    validate_json_request,
)
from endpoints.api.build import RepositoryBuildStatus, build_status_view, trigger_view
from endpoints.api.trigger_analyzer import TriggerAnalyzer
from endpoints.building import (
    BuildTriggerDisabledException,
    MaximumBuildsQueuedException,
    start_build,
)
from endpoints.exception import InvalidRequest, NotFound, Unauthorized
from util.names import parse_robot_username

logger = logging.getLogger(__name__)


def _prepare_webhook_url(scheme, username, password, hostname, path):
    auth_hostname = "%s:%s@%s" % (username, password, hostname)
    return urlunparse((scheme, auth_hostname, path, "", "", ""))


def get_trigger(trigger_uuid):
    try:
        trigger = model.build.get_build_trigger(trigger_uuid)
    except model.InvalidBuildTriggerException:
        raise NotFound()
    return trigger


@resource("/v1/repository/<apirepopath:repository>/trigger/")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
class BuildTriggerList(RepositoryParamResource):
    """
    Resource for listing repository build triggers.
    """

    @require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True)
    @disallow_for_app_repositories
    @nickname("listBuildTriggers")
    def get(self, namespace_name, repo_name):
        """
        List the triggers for the specified repository.
        """
        triggers = model.build.list_build_triggers(namespace_name, repo_name)
        return {"triggers": [trigger_view(trigger, can_admin=True) for trigger in triggers]}


@resource("/v1/repository/<apirepopath:repository>/trigger/<trigger_uuid>")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("trigger_uuid", "The UUID of the build trigger")
class BuildTrigger(RepositoryParamResource):
    """
    Resource for managing specific build triggers.
    """

    schemas = {
        "UpdateTrigger": {
            "type": "object",
            "description": "Options for updating a build trigger",
            "required": [
                "enabled",
            ],
            "properties": {
                "enabled": {
                    "type": "boolean",
                    "description": "Whether the build trigger is enabled",
                },
            },
        },
    }

    @require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True)
    @disallow_for_app_repositories
    @nickname("getBuildTrigger")
    def get(self, namespace_name, repo_name, trigger_uuid):
        """
        Get information for the specified build trigger.
        """
        return trigger_view(get_trigger(trigger_uuid), can_admin=True)

    @require_repo_admin(allow_for_superuser=True)
    @disallow_for_app_repositories
    @disallow_for_non_normal_repositories
    @disallow_for_user_namespace
    @nickname("updateBuildTrigger")
    @validate_json_request("UpdateTrigger")
    def put(self, namespace_name, repo_name, trigger_uuid):
        """
        Updates the specified build trigger.
        """
        trigger = get_trigger(trigger_uuid)

        handler = BuildTriggerHandler.get_handler(trigger)
        if not handler.is_active():
            raise InvalidRequest("Cannot update an unactivated trigger")

        enable = request.get_json()["enabled"]
        model.build.toggle_build_trigger(trigger, enable)
        log_action(
            "toggle_repo_trigger",
            namespace_name,
            {
                "repo": repo_name,
                "trigger_id": trigger_uuid,
                "service": trigger.service.name,
                "enabled": enable,
            },
            repo=model.repository.get_repository(namespace_name, repo_name),
        )

        return trigger_view(trigger)

    @require_repo_admin(allow_for_superuser=True)
    @disallow_for_app_repositories
    @disallow_for_non_normal_repositories
    @disallow_for_user_namespace
    @nickname("deleteBuildTrigger")
    def delete(self, namespace_name, repo_name, trigger_uuid):
        """
        Delete the specified build trigger.
        """
        trigger = get_trigger(trigger_uuid)

        handler = BuildTriggerHandler.get_handler(trigger)
        if handler.is_active():
            try:
                handler.deactivate()
            except TriggerException as ex:
                # We are just going to eat this error
                logger.warning("Trigger deactivation problem: %s", ex)

            log_action(
                "delete_repo_trigger",
                namespace_name,
                {"repo": repo_name, "trigger_id": trigger_uuid, "service": trigger.service.name},
                repo=model.repository.get_repository(namespace_name, repo_name),
            )

        trigger.delete_instance(recursive=True)

        if trigger.write_token is not None:
            trigger.write_token.delete_instance()

        return "No Content", 204


@resource("/v1/repository/<apirepopath:repository>/trigger/<trigger_uuid>/subdir")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("trigger_uuid", "The UUID of the build trigger")
@internal_only
class BuildTriggerSubdirs(RepositoryParamResource):
    """
    Custom verb for fetching the subdirs which are buildable for a trigger.
    """

    schemas = {
        "BuildTriggerSubdirRequest": {
            "type": "object",
            "description": "Arbitrary json.",
        },
    }

    @require_repo_admin(allow_for_superuser=True)
    @disallow_for_app_repositories
    @disallow_for_non_normal_repositories
    @disallow_for_user_namespace
    @nickname("listBuildTriggerSubdirs")
    @validate_json_request("BuildTriggerSubdirRequest")
    def post(self, namespace_name, repo_name, trigger_uuid):
        """
        List the subdirectories available for the specified build trigger and source.
        """
        trigger = get_trigger(trigger_uuid)

        user_permission = UserAdminPermission(trigger.connected_user.username)
        if user_permission.can():
            new_config_dict = request.get_json()
            handler = BuildTriggerHandler.get_handler(trigger, new_config_dict)

            try:
                subdirs = handler.list_build_subdirs()
                context_map = {}
                for file in subdirs:
                    context_map = handler.get_parent_directory_mappings(file, context_map)

                return {
                    "dockerfile_paths": ["/" + subdir for subdir in subdirs],
                    "contextMap": context_map,
                    "status": "success",
                }
            except EmptyRepositoryException as exc:
                return {
                    "status": "success",
                    "contextMap": {},
                    "dockerfile_paths": [],
                }
            except TriggerException as exc:
                return {
                    "status": "error",
                    "message": str(exc),
                }
        else:
            raise Unauthorized()


@resource("/v1/repository/<apirepopath:repository>/trigger/<trigger_uuid>/activate")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("trigger_uuid", "The UUID of the build trigger")
class BuildTriggerActivate(RepositoryParamResource):
    """
    Custom verb for activating a build trigger once all required information has been collected.
    """

    schemas = {
        "BuildTriggerActivateRequest": {
            "type": "object",
            "required": ["config"],
            "properties": {
                "config": {
                    "type": "object",
                    "description": "Arbitrary json.",
                },
                "pull_robot": {
                    "type": "string",
                    "description": "The name of the robot that will be used to pull images.",
                },
            },
        },
    }

    @require_repo_admin(allow_for_superuser=True)
    @disallow_for_app_repositories
    @disallow_for_non_normal_repositories
    @disallow_for_user_namespace
    @nickname("activateBuildTrigger")
    @validate_json_request("BuildTriggerActivateRequest")
    def post(self, namespace_name, repo_name, trigger_uuid):
        """
        Activate the specified build trigger.
        """
        trigger = get_trigger(trigger_uuid)
        handler = BuildTriggerHandler.get_handler(trigger)
        if handler.is_active():
            raise InvalidRequest("Trigger config is not sufficient for activation.")

        user_permission = UserAdminPermission(trigger.connected_user.username)
        if user_permission.can() or allow_if_superuser_with_full_access():
            # Update the pull robot (if any).
            pull_robot_name = request.get_json().get("pull_robot", None)
            if pull_robot_name:
                try:
                    pull_robot = model.user.lookup_robot(pull_robot_name)
                except model.InvalidRobotException:
                    raise NotFound()

                # Make sure the user has administer permissions for the robot's namespace.
                (robot_namespace, _) = parse_robot_username(pull_robot_name)
                if not AdministerOrganizationPermission(robot_namespace).can():
                    raise Unauthorized()

                # Make sure the namespace matches that of the trigger.
                if robot_namespace != namespace_name:
                    raise Unauthorized()

                # Set the pull robot.
                trigger.pull_robot = pull_robot

            # Update the config.
            new_config_dict = request.get_json()["config"]

            write_token_name = "Build Trigger: %s" % trigger.service.name
            write_token = model.token.create_delegate_token(
                namespace_name, repo_name, write_token_name, "write"
            )

            try:
                path = url_for("webhooks.build_trigger_webhook", trigger_uuid=trigger.uuid)
                authed_url = _prepare_webhook_url(
                    app.config["PREFERRED_URL_SCHEME"],
                    "$token",
                    write_token.get_code(),
                    app.config["SERVER_HOSTNAME"],
                    path,
                )

                handler = BuildTriggerHandler.get_handler(trigger, new_config_dict)
                final_config, private_config = handler.activate(authed_url)

                if "private_key" in private_config:
                    trigger.secure_private_key = DecryptedValue(private_config["private_key"])

            except TriggerException as exc:
                write_token.delete_instance()
                raise request_error(message=str(exc))

            # Save the updated config.
            update_build_trigger(trigger, final_config, write_token=write_token)

            # Log the trigger setup.
            repo = model.repository.get_repository(namespace_name, repo_name)
            log_action(
                "setup_repo_trigger",
                namespace_name,
                {
                    "repo": repo_name,
                    "namespace": namespace_name,
                    "trigger_id": trigger.uuid,
                    "service": trigger.service.name,
                    "pull_robot": trigger.pull_robot.username if trigger.pull_robot else None,
                    "config": final_config,
                },
                repo=repo,
            )

            return trigger_view(trigger, can_admin=True)
        else:
            raise Unauthorized()


@resource("/v1/repository/<apirepopath:repository>/trigger/<trigger_uuid>/analyze")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("trigger_uuid", "The UUID of the build trigger")
@internal_only
class BuildTriggerAnalyze(RepositoryParamResource):
    """
    Custom verb for analyzing the config for a build trigger and suggesting various changes (such as
    a robot account to use for pulling)
    """

    schemas = {
        "BuildTriggerAnalyzeRequest": {
            "type": "object",
            "required": ["config"],
            "properties": {
                "config": {
                    "type": "object",
                    "description": "Arbitrary json.",
                }
            },
        },
    }

    @require_repo_admin(allow_for_superuser=True)
    @disallow_for_app_repositories
    @disallow_for_non_normal_repositories
    @disallow_for_user_namespace
    @nickname("analyzeBuildTrigger")
    @validate_json_request("BuildTriggerAnalyzeRequest")
    def post(self, namespace_name, repo_name, trigger_uuid):
        """
        Analyze the specified build trigger configuration.
        """
        trigger = get_trigger(trigger_uuid)

        if trigger.repository.namespace_user.username != namespace_name:
            raise NotFound()

        if trigger.repository.name != repo_name:
            raise NotFound()

        new_config_dict = request.get_json()["config"]
        handler = BuildTriggerHandler.get_handler(trigger, new_config_dict)
        server_hostname = app.config["SERVER_HOSTNAME"]
        try:
            trigger_analyzer = TriggerAnalyzer(
                handler,
                namespace_name,
                server_hostname,
                new_config_dict,
                AdministerOrganizationPermission(namespace_name).can(),
            )
            return trigger_analyzer.analyze_trigger()
        except TriggerException as rre:
            return {
                "status": "error",
                "message": "Could not analyze the repository: %s" % rre,
            }
        except NotImplementedError:
            return {
                "status": "notimplemented",
            }


@resource("/v1/repository/<apirepopath:repository>/trigger/<trigger_uuid>/start")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("trigger_uuid", "The UUID of the build trigger")
class ActivateBuildTrigger(RepositoryParamResource):
    """
    Custom verb to manually activate a build trigger.
    """

    schemas = {
        "RunParameters": {
            "type": "object",
            "description": "Optional run parameters for activating the build trigger",
            "properties": {
                "branch_name": {
                    "type": "string",
                    "description": "(SCM only) If specified, the name of the branch to build.",
                },
                "commit_sha": {
                    "type": "string",
                    "description": "(Custom Only) If specified, the ref/SHA1 used to checkout a git repository.",
                },
                "refs": {
                    "type": ["object", "null"],
                    "description": "(SCM Only) If specified, the ref to build.",
                },
            },
            "additionalProperties": False,
        }
    }

    @require_repo_admin(allow_for_superuser=True)
    @disallow_for_app_repositories
    @disallow_for_non_normal_repositories
    @disallow_for_user_namespace
    @nickname("manuallyStartBuildTrigger")
    @validate_json_request("RunParameters")
    def post(self, namespace_name, repo_name, trigger_uuid):
        """
        Manually start a build from the specified trigger.
        """
        trigger = get_trigger(trigger_uuid)
        if not trigger.enabled:
            raise InvalidRequest("Trigger is not enabled.")

        handler = BuildTriggerHandler.get_handler(trigger)
        if not handler.is_active():
            raise InvalidRequest("Trigger is not active.")

        try:
            repo = model.repository.get_repository(namespace_name, repo_name)
            pull_robot_name = model.build.get_pull_robot_name(trigger)

            run_parameters = request.get_json()
            prepared = handler.manual_start(run_parameters=run_parameters)
            performer = get_authenticated_user()
            build_request = start_build(
                repo,
                prepared,
                pull_robot_name=pull_robot_name,
                performer=performer,
                manual_trigger=True,
            )
        except TriggerException as tse:
            raise InvalidRequest(str(tse)) from tse
        except MaximumBuildsQueuedException:
            abort(429, message="Maximum queued build rate exceeded.")
        except BuildTriggerDisabledException:
            abort(400, message="Build trigger is disabled")

        resp = build_status_view(build_request)
        repo_string = "%s/%s" % (namespace_name, repo_name)
        headers = {
            "Location": api.url_for(
                RepositoryBuildStatus, repository=repo_string, build_uuid=build_request.uuid
            ),
        }
        return resp, 201, headers


@resource("/v1/repository/<apirepopath:repository>/trigger/<trigger_uuid>/builds")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("trigger_uuid", "The UUID of the build trigger")
class TriggerBuildList(RepositoryParamResource):
    """
    Resource to represent builds that were activated from the specified trigger.
    """

    @require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True)
    @disallow_for_app_repositories
    @parse_args()
    @query_param("limit", "The maximum number of builds to return", type=int, default=5)
    @nickname("listTriggerRecentBuilds")
    def get(self, namespace_name, repo_name, trigger_uuid, parsed_args):
        """
        List the builds started by the specified trigger.
        """
        limit = parsed_args["limit"]
        builds = model.build.list_trigger_builds(namespace_name, repo_name, trigger_uuid, limit)
        return {"builds": [build_status_view(bld) for bld in builds]}


FIELD_VALUE_LIMIT = 30


@resource("/v1/repository/<apirepopath:repository>/trigger/<trigger_uuid>/fields/<field_name>")
@internal_only
class BuildTriggerFieldValues(RepositoryParamResource):
    """
    Custom verb to fetch a values list for a particular field name.
    """

    @require_repo_admin(allow_for_superuser=True)
    @disallow_for_app_repositories
    @disallow_for_non_normal_repositories
    @disallow_for_user_namespace
    @nickname("listTriggerFieldValues")
    def post(self, namespace_name, repo_name, trigger_uuid, field_name):
        """
        List the field values for a custom run field.
        """
        trigger = get_trigger(trigger_uuid)

        config = request.get_json() or None
        if AdministerRepositoryPermission(namespace_name, repo_name).can():
            handler = BuildTriggerHandler.get_handler(trigger, config)
            values = handler.list_field_values(field_name, limit=FIELD_VALUE_LIMIT)

            if values is None:
                raise NotFound()

            return {"values": values}
        else:
            raise Unauthorized()


@resource("/v1/repository/<apirepopath:repository>/trigger/<trigger_uuid>/sources")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("trigger_uuid", "The UUID of the build trigger")
@internal_only
class BuildTriggerSources(RepositoryParamResource):
    """
    Custom verb to fetch the list of build sources for the trigger config.
    """

    schemas = {
        "BuildTriggerSourcesRequest": {
            "type": "object",
            "description": "Specifies the namespace under which to fetch sources",
            "properties": {
                "namespace": {
                    "type": "string",
                    "description": "The namespace for which to fetch sources",
                },
            },
        }
    }

    @require_repo_admin(allow_for_superuser=True)
    @disallow_for_app_repositories
    @disallow_for_non_normal_repositories
    @disallow_for_user_namespace
    @nickname("listTriggerBuildSources")
    @validate_json_request("BuildTriggerSourcesRequest")
    def post(self, namespace_name, repo_name, trigger_uuid):
        """
        List the build sources for the trigger configuration thus far.
        """
        namespace = request.get_json().get("namespace")
        if namespace is None:
            raise InvalidRequest()

        trigger = get_trigger(trigger_uuid)

        user_permission = UserAdminPermission(trigger.connected_user.username)
        if user_permission.can():
            handler = BuildTriggerHandler.get_handler(trigger)

            try:
                return {"sources": handler.list_build_sources_for_namespace(namespace)}
            except TriggerException as rre:
                raise InvalidRequest(str(rre)) from rre
        else:
            raise Unauthorized()


@resource("/v1/repository/<apirepopath:repository>/trigger/<trigger_uuid>/namespaces")
@path_param("repository", "The full path of the repository. e.g. namespace/name")
@path_param("trigger_uuid", "The UUID of the build trigger")
@internal_only
class BuildTriggerSourceNamespaces(RepositoryParamResource):
    """
    Custom verb to fetch the list of namespaces (orgs, projects, etc) for the trigger config.
    """

    @require_repo_admin(allow_for_superuser=True, allow_for_global_readonly_superuser=True)
    @disallow_for_app_repositories
    @nickname("listTriggerBuildSourceNamespaces")
    def get(self, namespace_name, repo_name, trigger_uuid):
        """
        List the build sources for the trigger configuration thus far.
        """
        trigger = get_trigger(trigger_uuid)

        user_permission = UserAdminPermission(trigger.connected_user.username)
        if user_permission.can():
            handler = BuildTriggerHandler.get_handler(trigger)

            try:
                return {"namespaces": handler.list_build_source_namespaces()}
            except TriggerException as rre:
                raise InvalidRequest(str(rre)) from rre
        else:
            raise Unauthorized()