1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/endpoints/api/trigger_analyzer.py
2022-01-27 16:43:32 -06:00

169 lines
6.3 KiB
Python

from os import path
from auth import permissions
from data import model
from util import dockerfileparse
def is_parent(context, dockerfile_path):
    """
    Check whether ``context`` is a parent directory of ``dockerfile_path``.

    Both values are interpreted as paths rooted at the same top-level
    directory: a missing leading separator is added, redundant separators are
    collapsed via ``os.path.normpath``, and the comparison is made between the
    context directory and the directory that contains the Dockerfile.

    Returns ``False`` when either argument is empty or ``None`` (for example
    when the corresponding key is absent from a trigger config), otherwise
    ``True`` if the Dockerfile's directory is the context or lies beneath it.
    """
    # Treat missing values (None) the same as empty strings instead of letting
    # os.path raise a TypeError on them.
    if not context or not dockerfile_path:
        return False

    def _as_rooted_dir(value):
        # Normalize to the form "<sep>dir<sep>" so that a plain prefix test
        # cannot match partial directory names (e.g. "/a/b" vs "/a/bc").
        normalized = path.normpath(value)
        if not normalized.startswith(path.sep):
            normalized = path.sep + normalized
        if not normalized.endswith(path.sep):
            normalized += path.sep
        return normalized

    context_dir = _as_rooted_dir(context)
    dockerfile_dir = _as_rooted_dir(path.dirname(dockerfile_path))
    return dockerfile_dir.startswith(context_dir)
class TriggerAnalyzer(object):
    """
    Analyzes a build trigger's Dockerfile and builds the trigger/robot view that is
    returned to the frontend.
    """

    def __init__(
        self, handler, namespace_name, server_hostname, new_config_dict, admin_org_permission
    ):
        # The collaborators are only stored here; nothing is loaded until
        # analyze_trigger() is called.
        self.handler = handler
        self.namespace_name = namespace_name
        self.server_hostname = server_hostname
        self.new_config_dict = new_config_dict
        self.admin_org_permission = admin_org_permission

    def analyze_trigger(self):
        """
        Run the analysis and return the view dict produced by analyze_view().

        The resulting status is one of "warning", "error", "publicbase" or
        "requiresrobot", depending on the first check that decides the outcome.
        """

        def no_repo_view(status, message=None):
            # Every early exit reports against the trigger's own namespace
            # with no repository selected.
            return self.analyze_view(self.namespace_name, None, status, message=message)

        # Step 1: fetch the Dockerfile through the trigger handler.
        dockerfile_contents = self.handler.load_dockerfile_contents()
        if not dockerfile_contents:
            return no_repo_view(
                "warning",
                message=(
                    "Specified Dockerfile path for the trigger was not found on the main "
                    "branch. This trigger may fail."
                ),
            )

        # Step 2: parse it.
        parsed_dockerfile = dockerfileparse.parse_dockerfile(dockerfile_contents)
        if not parsed_dockerfile:
            return no_repo_view("error", message="Could not parse the Dockerfile specified")

        # Step 3: when a build context is configured, the Dockerfile must live
        # underneath it.
        context = self.new_config_dict.get("context")
        if context:
            dockerfile_path = self.new_config_dict.get("dockerfile_path")
            if not is_parent(context, dockerfile_path):
                return no_repo_view(
                    "error",
                    message="Dockerfile, %s, is not a child of the context, %s."
                    % (dockerfile_path, context),
                )

        # Step 4: find the base image (the FROM line).
        base_image = parsed_dockerfile.get_base_image()
        if not base_image:
            return no_repo_view("warning", message="No FROM line found in the Dockerfile")

        # Step 5: a base image that is not hosted on this registry is reported
        # as a public base.
        registry_prefix = "%s/" % self.server_hostname
        if not base_image.startswith(registry_prefix):
            return no_repo_view("publicbase")

        # Step 6: the remainder must be exactly "<namespace>/<repository>".
        repo_path_parts = str(base_image)[len(registry_prefix) :].split("/", 2)
        if len(repo_path_parts) != 2:
            return no_repo_view(
                "warning", message='"%s" is not a valid Quay repository path' % base_image
            )
        base_namespace, base_repository = repo_path_parts

        def not_found_view():
            # Used both for repositories that do not exist and for private
            # ones the caller may not read, so the two cases look the same.
            return no_repo_view(
                "error",
                message='Repository "%s" referenced by the Dockerfile was not found' % base_image,
            )

        base_repo = model.repository.get_repository(base_namespace, base_repository)
        if not base_repo:
            return not_found_view()

        # Read access is only considered when the base repository lives in the
        # trigger's own namespace.
        # NOTE(review): the permission object itself is stored and later
        # evaluated for truthiness (no explicit method call) - confirm that
        # this is the intended way to check it.
        can_read = False
        if base_namespace == self.namespace_name:
            can_read = permissions.ReadRepositoryPermission(base_namespace, base_repository)

        if base_repo.visibility.name == "public":
            return self.analyze_view(base_namespace, base_repository, "publicbase")

        # Private repository: hide it from callers without read access.
        if not can_read:
            return not_found_view()

        return self.analyze_view(base_namespace, base_repository, "requiresrobot")

    def analyze_view(self, image_namespace, image_repository, status, message=None):
        """
        Build the response dict for the given base image and analysis status.

        The "robots" entry is only populated when admin_org_permission is truthy
        and the image namespace equals the trigger's namespace; otherwise it is
        an empty list.
        """
        robots = []

        if self.admin_org_permission and self.namespace_name == image_namespace:
            # Collect the ids of users that already have access to the
            # repository, if one was given.
            if image_repository is None:
                ids_with_access = set()
            else:
                ids_with_access = {
                    member.id
                    for member in model.user.get_all_repo_users_transitive(
                        image_namespace, image_repository
                    )
                }

            for robot in model.user.list_namespace_robots(self.namespace_name):
                # Sanity check: robot usernames carry the "<namespace>+" prefix.
                assert robot.username.startswith(
                    self.namespace_name + "+"
                ), "Expected robot under namespace %s, Found: %s" % (
                    self.namespace_name,
                    robot.username,
                )
                robots.append(
                    {
                        "name": robot.username,
                        "kind": "user",
                        "is_robot": True,
                        "can_read": robot.id in ids_with_access,
                    }
                )

        return {
            "namespace": image_namespace,
            "name": image_repository,
            "robots": robots,
            "status": status,
            "message": message,
            "is_admin": self.admin_org_permission,
        }