mirror of
https://github.com/quay/quay.git
synced 2026-01-26 06:21:37 +03:00
169 lines
6.3 KiB
Python
169 lines
6.3 KiB
Python
from os import path
|
|
|
|
from auth import permissions
|
|
from data import model
|
|
from util import dockerfileparse
|
|
|
|
|
|
def is_parent(context, dockerfile_path):
    """
    Check whether ``context`` is a parent directory of ``dockerfile_path``.

    Both values are treated as paths relative to the same root: each is
    normalized with ``path.normpath`` and forced to start and end with
    ``path.sep`` before comparing, so a leading separator is optional and
    ``"a"`` is not mistaken for a parent of ``"ab/Dockerfile"``.

    Args:
        context: Build context directory.
        dockerfile_path: Path to the Dockerfile. Only its directory part
            (``path.dirname``) takes part in the comparison.

    Returns:
        True if the Dockerfile's directory is the context or lies below it.
        False otherwise, including when either argument is empty or None.
    """
    # A missing config key arrives here as None; treat it like the empty
    # string instead of letting path.normpath/path.dirname raise TypeError.
    if not context or not dockerfile_path:
        return False

    def _as_rooted_dir(raw):
        # normpath never returns "" (it yields "." for an empty input), so the
        # startswith/endswith checks below always see at least one character.
        rooted = path.normpath(raw)
        if not rooted.startswith(path.sep):
            rooted = path.sep + rooted
        if not rooted.endswith(path.sep):
            rooted += path.sep
        return rooted

    normalized_context = _as_rooted_dir(context)
    normalized_subdir = _as_rooted_dir(path.dirname(dockerfile_path))

    # The trailing separator on both sides makes this a whole-component
    # prefix match rather than a raw string-prefix match.
    return normalized_subdir.startswith(normalized_context)
|
|
|
|
|
|
class TriggerAnalyzer(object):
    """
    Inspects a build trigger's Dockerfile and produces the trigger/robot view
    that is returned to the frontend.
    """

    def __init__(
        self, handler, namespace_name, server_hostname, new_config_dict, admin_org_permission
    ):
        # handler: object exposing load_dockerfile_contents().
        self.handler = handler
        # namespace_name: namespace the trigger is being analyzed for.
        self.namespace_name = namespace_name
        # server_hostname: used to recognise base images hosted on this registry.
        self.server_hostname = server_hostname
        # new_config_dict: trigger config; "context" and "dockerfile_path" are read.
        self.new_config_dict = new_config_dict
        # admin_org_permission: truthy when robots may be listed for the namespace.
        self.admin_org_permission = admin_org_permission

    def analyze_trigger(self):
        """
        Run every check against the trigger's Dockerfile and return the view
        dict built by ``analyze_view`` for the first check that decides the
        outcome.
        """
        own_namespace = self.namespace_name

        # Step 1: the Dockerfile must be loadable.
        dockerfile_text = self.handler.load_dockerfile_contents()
        if not dockerfile_text:
            missing_msg = (
                "Specified Dockerfile path for the trigger was not found on the main "
                "branch. This trigger may fail."
            )
            return self.analyze_view(own_namespace, None, "warning", message=missing_msg)

        # Step 2: the Dockerfile must parse.
        dockerfile = dockerfileparse.parse_dockerfile(dockerfile_text)
        if not dockerfile:
            return self.analyze_view(
                own_namespace, None, "error", message="Could not parse the Dockerfile specified"
            )

        # Step 3: when a context is configured, the Dockerfile must live under it.
        context = self.new_config_dict.get("context")
        if context:
            dockerfile_path = self.new_config_dict.get("dockerfile_path")
            if not is_parent(context, dockerfile_path):
                return self.analyze_view(
                    own_namespace,
                    None,
                    "error",
                    message="Dockerfile, %s, is not a child of the context, %s."
                    % (dockerfile_path, context),
                )

        # Step 4: a base image (the FROM line) is required.
        base_image = dockerfile.get_base_image()
        if not base_image:
            return self.analyze_view(
                own_namespace, None, "warning", message="No FROM line found in the Dockerfile"
            )

        # Step 5: base images hosted outside this registry are reported as public.
        registry_prefix = "%s/" % self.server_hostname
        if not base_image.startswith(registry_prefix):
            return self.analyze_view(own_namespace, None, "publicbase")

        # Step 6: what follows the registry prefix must be exactly "<namespace>/<repository>".
        path_parts = str(base_image)[len(registry_prefix) :].split("/", 2)
        if len(path_parts) != 2:
            return self.analyze_view(
                own_namespace,
                None,
                "warning",
                message='"%s" is not a valid Quay repository path' % base_image,
            )

        base_namespace, base_repository = path_parts

        def not_found_view():
            # Shared by "does not exist" and "exists but is not readable", so
            # the response is the same in both cases.
            return self.analyze_view(
                own_namespace,
                None,
                "error",
                message='Repository "%s" referenced by the Dockerfile was not found' % base_image,
            )

        repository = model.repository.get_repository(base_namespace, base_repository)
        if not repository:
            return not_found_view()

        # Read access is only considered for repositories in the trigger's own namespace.
        # NOTE(review): the permission object itself is used for its truthiness;
        # confirm it evaluates the check when coerced to bool (vs. needing .can()).
        can_read = False
        if base_namespace == own_namespace:
            can_read = permissions.ReadRepositoryPermission(base_namespace, base_repository)

        if repository.visibility.name == "public":
            return self.analyze_view(base_namespace, base_repository, "publicbase")

        # Private repository: report it as not found unless it is readable.
        if not can_read:
            return not_found_view()

        return self.analyze_view(base_namespace, base_repository, "requiresrobot")

    def analyze_view(self, image_namespace, image_repository, status, message=None):
        """
        Build the response dict for the frontend.

        Robots are listed only when ``admin_org_permission`` is truthy and the
        image namespace equals the analyzer's namespace; each is flagged with
        whether it already has access to ``image_repository``.
        """
        robot_views = []

        if self.admin_org_permission and self.namespace_name == image_namespace:
            # Ids of users that already hold a permission on the base repository.
            if image_repository is None:
                ids_with_access = set()
            else:
                ids_with_access = {
                    member.id
                    for member in model.user.get_all_repo_users_transitive(
                        image_namespace, image_repository
                    )
                }

            for robot in model.user.list_namespace_robots(self.namespace_name):
                assert robot.username.startswith(
                    self.namespace_name + "+"
                ), "Expected robot under namespace %s, Found: %s" % (
                    self.namespace_name,
                    robot.username,
                )
                robot_views.append(
                    {
                        "name": robot.username,
                        "kind": "user",
                        "is_robot": True,
                        "can_read": robot.id in ids_with_access,
                    }
                )

        return {
            "namespace": image_namespace,
            "name": image_repository,
            "robots": robot_views,
            "status": status,
            "message": message,
            "is_admin": self.admin_org_permission,
        }
|