import datetime
import hashlib
import io
import json
import logging
import os
import socket
import subprocess
import threading
import time
import uuid
from functools import lru_cache, partial, wraps

import boto3
import botocore
import cachetools.func
import requests
from jinja2 import Environment, FileSystemLoader
from prometheus_client import Histogram

import release
from _init import OVERRIDE_CONFIG_DIRECTORY, ROOT_DIR
from app import app
from buildman.container_cloud_config import CloudConfigContext
from buildman.server import SECURE_GRPC_SERVER_PORT

logger = logging.getLogger(__name__)


ONE_HOUR = 60 * 60

_TAG_RETRY_COUNT = 3  # Number of times to retry adding tags.
_TAG_RETRY_SLEEP = 2  # Number of seconds to wait between tag retries.

ENV = Environment(loader=FileSystemLoader(os.path.join(ROOT_DIR, "buildman/templates")))
CloudConfigContext().populate_jinja_environment(ENV)
TEMPLATE = ENV.get_template("cloudconfig.json")


build_start_duration = Histogram(
    "quay_build_start_duration_seconds",
    "seconds taken for an executor to start executing a queued build",
    labelnames=["executor"],
)


def observe(metric, *labels):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            trigger_time = time.time()
            rv = func(*args, **kwargs)
            metric.labels(*labels).observe(time.time() - trigger_time)
            return rv

        return wrapper

    return decorator
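
# Example: decorating a method with @observe(build_start_duration, "ec2"), as the
# executors below do, records the method's wall-clock duration in the histogram
# under the corresponding "executor" label.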


def persist_for_debugging(func):
    """
    Wrapper for stop_builder that prevents the workers from being cleaned up (for testing
    purposes only).
    """

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.executor_config.get("DEBUG", False):
            logger.debug("Executor %s DEBUG set, not calling 'stop_builder()'", self.name)
            return
        return func(self, *args, **kwargs)

    return wrapper


class ExecutorException(Exception):
    """
    Exception raised when there is a problem starting or stopping a builder.
    """

    pass


class BuilderExecutor(object):
    """
    Interface which can be plugged into the EphemeralNodeManager to provide a strategy for
    starting and stopping builders.
    """

    def __init__(self, executor_config, registry_hostname, manager_hostname):
        self.executor_config = executor_config
        self.registry_hostname = registry_hostname
        self.manager_hostname = manager_hostname

    @property
    def name(self):
        """
        Name returns the unique name for this executor.
        """
        return self.executor_config.get("NAME") or self.__class__.__name__

    @property
    def setup_time(self):
        """
        Returns the amount of time (in seconds) to wait for the execution to start for the build.

        If None, the manager's default will be used.
        """
        return self.executor_config.get("SETUP_TIME")

    def start_builder(self, token, build_uuid):
        """
        Create a builder with the specified config.

        Returns a unique id which can be used to manage the builder.
        """
        raise NotImplementedError

    def stop_builder(self, builder_id):
        """
        Stop a builder which is currently running.
        """
        raise NotImplementedError

    @property
    def running_builders_count(self):
        """
        Returns the number of builders running under the executor.
        """
        raise NotImplementedError

    def allowed_for_namespace(self, namespace):
        """
        Returns true if this executor can be used for builds in the given namespace.
        """

        # Check for an explicit namespace whitelist.
        namespace_whitelist = self.executor_config.get("NAMESPACE_WHITELIST")
        if namespace_whitelist is not None and namespace in namespace_whitelist:
            return True

        # Check for a staged rollout percentage. If found, we hash the namespace and, if it is
        # found in the first X% of the character space, we allow this executor to be used.
        staged_rollout = self.executor_config.get("STAGED_ROLLOUT")
        if staged_rollout is not None:
            bucket = int(hashlib.sha256(namespace.encode("utf-8")).hexdigest()[-2:], 16)
            return bucket < (256 * staged_rollout)

        # If there are no restrictions in place, we are free to use this executor.
        return staged_rollout is None and namespace_whitelist is None
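
    # Worked example for the staged rollout check above (illustrative namespace): with
    # STAGED_ROLLOUT set to 0.25, a namespace whose SHA-256 hex digest ends in "3f"
    # hashes to bucket 0x3f = 63, and 63 < 256 * 0.25 = 64, so the executor is
    # allowed; a digest ending in "40" (bucket 64) is not.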

    @property
    def minimum_retry_threshold(self):
        """
        Returns the minimum number of retries required for this executor to be used, or 0 if none.
        """
        return self.executor_config.get("MINIMUM_RETRY_THRESHOLD", 0)

    @lru_cache(maxsize=1)
    def _ca_cert(self):
        try:
            with open(os.path.join(OVERRIDE_CONFIG_DIRECTORY, "ssl.cert"), "r") as f:
                return f.read()
        except OSError:
            # A missing or unreadable cert simply means no custom CA is configured.
            return None

    def generate_cloud_config(
        self,
        token,
        build_uuid,
        manager_hostname,
        quay_username=None,
        quay_password=None,
    ):
        if quay_username is None:
            quay_username = self.executor_config.get("QUAY_USERNAME", None)

        if quay_password is None:
            quay_password = self.executor_config.get("QUAY_PASSWORD", None)

        # If SERVER_HOSTNAME and BUILDMAN_HOSTNAME are served under the same host,
        # the gRPC service will be exposed at SERVER_HOSTNAME:55443.
        #
        # If that's not the case, then BUILDMAN_HOSTNAME should include the port at which the
        # service is being served. For example, BUILDMAN_HOST:443 for an OpenShift `Route`.
        server_grpc_addr = (
            manager_hostname.split(":", 1)[0] + ":" + str(SECURE_GRPC_SERVER_PORT)
            if self.registry_hostname == self.manager_hostname
            else self.manager_hostname
        )

        rendered_json = json.load(
            io.StringIO(
                TEMPLATE.render(
                    token=token,
                    build_uuid=build_uuid,
                    quay_username=quay_username,
                    quay_password=quay_password,
                    manager_hostname=server_grpc_addr,
                    registry_hostname=self.registry_hostname,
                    worker_image=self.executor_config.get(
                        "WORKER_IMAGE", "quay.io/coreos/registry-build-worker"
                    ),
                    worker_tag=self.executor_config["WORKER_TAG"],
                    volume_size=self.executor_config.get("VOLUME_SIZE", "42G"),
                    max_lifetime_s=self.executor_config.get("MAX_LIFETIME_S", 10800),
                    timeout_start_sec=self.executor_config.get("MAX_LIFETIME_S", 10800),
                    ssh_authorized_keys=self.executor_config.get("SSH_AUTHORIZED_KEYS", []),
                    container_runtime=self.executor_config.get("CONTAINER_RUNTIME", "docker"),
                    ca_cert=self.executor_config.get("CA_CERT", self._ca_cert()),
                    debug=self.executor_config.get("DEBUG", False),
                    http_proxy=self.executor_config.get("HTTP_PROXY", None),
                    https_proxy=self.executor_config.get("HTTPS_PROXY", None),
                    noproxy=self.executor_config.get("NO_PROXY", None),
                )
            )
        )

        return json.dumps(rendered_json)


class EC2Executor(BuilderExecutor):
    """
    Implementation of BuilderExecutor which uses boto3 to start machines on AWS EC2.
    """

    COREOS_STACK_ARCHITECTURE = "x86_64"
    COREOS_STACK_URL = "https://builds.coreos.fedoraproject.org/streams/%s.json"

    def __init__(self, *args, **kwargs):
        super(EC2Executor, self).__init__(*args, **kwargs)

    def _get_conn(self):
        """
        Creates an EC2 client which can be used to manage instances.
        """
        return boto3.client(
            "ec2",
            region_name=self.executor_config["EC2_REGION"],
            aws_access_key_id=self.executor_config.get("AWS_ACCESS_KEY", ""),
            aws_secret_access_key=self.executor_config.get("AWS_SECRET_KEY", ""),
        )

    @property
    def running_builders_count(self):
        try:
            ec2_conn = self._get_conn()
            resp = ec2_conn.describe_instances(
                Filters=[{"Name": "tag:Name", "Values": ["Quay Ephemeral Builder"]}]
            )
        except Exception as ec2e:
            logger.error("EC2 executor error: %s", ec2e)
            raise ExecutorException(ec2e)

        count = 0
        for reservation in resp["Reservations"]:
            for instance in reservation["Instances"]:
                # EC2 reports instance state names in lowercase ("pending", "running", ...).
                if instance["State"]["Name"] in ("running", "pending"):
                    count += 1

        return count

    @classmethod
    @cachetools.func.ttl_cache(ttl=ONE_HOUR)
    def _get_coreos_ami(cls, ec2_region, coreos_channel):
        """
        Retrieve the CoreOS AMI id from the canonical listing.
        """
        stack_list_json = requests.get(EC2Executor.COREOS_STACK_URL % coreos_channel).json()
        stack_amis = stack_list_json["architectures"][EC2Executor.COREOS_STACK_ARCHITECTURE][
            "images"
        ]["aws"]["regions"]
        return stack_amis[ec2_region]["image"]
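
    # The stream metadata fetched above is shaped roughly like the following
    # (abridged; see e.g. https://builds.coreos.fedoraproject.org/streams/stable.json):
    #
    #   {"architectures": {"x86_64": {"images": {"aws": {"regions": {
    #       "us-east-1": {"image": "ami-..."}, ...}}}}}}
    #
    # so _get_coreos_ami("us-east-1", "stable") resolves to that region's AMI id.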

    @observe(build_start_duration, "ec2")
    def start_builder(self, token, build_uuid):
        region = self.executor_config["EC2_REGION"]
        channel = self.executor_config.get("COREOS_CHANNEL", "stable")

        coreos_ami = self.executor_config.get("COREOS_AMI", None)
        if coreos_ami is None:
            coreos_ami = self._get_coreos_ami(region, channel)

        user_data = self.generate_cloud_config(token, build_uuid, self.manager_hostname)
        logger.debug("Generated cloud config for build %s: %s", build_uuid, user_data)

        ec2_conn = self._get_conn()

        block_device_mappings = [
            {
                "DeviceName": "/dev/xvda",
                "Ebs": {
                    "VolumeSize": int(self.executor_config.get("BLOCK_DEVICE_SIZE", 48)),
                    "VolumeType": "gp2",
                    "DeleteOnTermination": True,
                },
            }
        ]

        interfaces = None
        if self.executor_config.get("EC2_VPC_SUBNET_ID", None) is not None:
            interfaces = [
                {
                    "DeviceIndex": 0,
                    "SubnetId": self.executor_config["EC2_VPC_SUBNET_ID"],
                    "Groups": self.executor_config["EC2_SECURITY_GROUP_IDS"],
                    "AssociatePublicIpAddress": True,
                }
            ]

        tag_specs = [
            {
                "ResourceType": "instance",
                "Tags": [
                    {"Key": "Name", "Value": "Quay Ephemeral Builder"},
                    {"Key": "RegistrationToken", "Value": token[:36]},
                    {"Key": "BuildUUID", "Value": build_uuid},
                ],
            }
        ]

        run_instances_kwargs = {
            "ImageId": coreos_ami,
            "InstanceType": self.executor_config["EC2_INSTANCE_TYPE"],
            "UserData": user_data,
            "InstanceInitiatedShutdownBehavior": "terminate",
            "BlockDeviceMappings": block_device_mappings,
            "MinCount": 1,
            "MaxCount": 1,
            "TagSpecifications": tag_specs,
        }

        # boto3 rejects None values for these optional parameters, so only pass
        # them when they are actually configured.
        key_name = self.executor_config.get("EC2_KEY_NAME")
        if key_name is not None:
            run_instances_kwargs["KeyName"] = key_name
        if interfaces is not None:
            run_instances_kwargs["NetworkInterfaces"] = interfaces

        try:
            reservation = ec2_conn.run_instances(**run_instances_kwargs)
        except (ec2_conn.exceptions.ClientError, botocore.exceptions.ClientError) as ec2e:
            raise ExecutorException(ec2e)

        instances = reservation.get("Instances", [])
        if not instances:
            raise ExecutorException("Unable to spawn builder instance.")
        elif len(instances) != 1:
            raise ExecutorException("EC2 started wrong number of instances!")

        launched = instances[0]

        logger.debug("Machine with ID %s started for build %s", launched["InstanceId"], build_uuid)
        return launched["InstanceId"]
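
    # A minimal sketch of the executor config keys this method reads (values are
    # illustrative, not shipped defaults):
    #
    #   EC2_REGION: us-east-1
    #   EC2_INSTANCE_TYPE: t3.large
    #   EC2_KEY_NAME: quay-builder          # optional
    #   EC2_VPC_SUBNET_ID: subnet-012345    # optional; requires EC2_SECURITY_GROUP_IDS
    #   EC2_SECURITY_GROUP_IDS: [sg-012345]
    #   BLOCK_DEVICE_SIZE: 48
    #   COREOS_CHANNEL: stable              # or pin COREOS_AMI directly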

    @persist_for_debugging
    def stop_builder(self, builder_id):
        # Create the client outside the try block so the except clause below can
        # safely reference its exception types even if client creation fails.
        ec2_conn = self._get_conn()
        try:
            terminated_instances = ec2_conn.terminate_instances(InstanceIds=[builder_id])
        except (ec2_conn.exceptions.ClientError, botocore.exceptions.ClientError) as ec2e:
            if ec2e.response["Error"]["Code"] == "InvalidInstanceID.NotFound":
                logger.debug("Instance %s already terminated", builder_id)
                return

            logger.exception("Exception when trying to terminate instance %s", builder_id)
            raise

        if builder_id not in [
            si["InstanceId"] for si in terminated_instances["TerminatingInstances"]
        ]:
            raise ExecutorException("Unable to terminate instance: %s" % builder_id)


class PopenExecutor(BuilderExecutor):
    """
    Implementation of BuilderExecutor which uses Popen to fork a quay-builder process.
    """

    def __init__(self, executor_config, registry_hostname, manager_hostname):
        self._jobs = {}
        super(PopenExecutor, self).__init__(executor_config, registry_hostname, manager_hostname)

    @property
    def running_builders_count(self):
        # A builder is still running while its process has not exited
        # (Popen.poll() returns None until the process terminates).
        return len([v for v in self._jobs.values() if v[0].poll() is None])

    @observe(build_start_duration, "fork")
    def start_builder(self, token, build_uuid):
        # Now start a machine for this job, adding the machine id to the etcd information.
        logger.debug("Forking process for build")

        ws_host = os.environ.get("BUILDMAN_WS_HOST", "localhost")
        ws_port = os.environ.get("BUILDMAN_WS_PORT", "8787")
        builder_env = {
            "TOKEN": token,
            "ENDPOINT": "ws://%s:%s" % (ws_host, ws_port),
            "DOCKER_TLS_VERIFY": os.environ.get("DOCKER_TLS_VERIFY", ""),
            "DOCKER_CERT_PATH": os.environ.get("DOCKER_CERT_PATH", ""),
            "DOCKER_HOST": os.environ.get("DOCKER_HOST", ""),
            "PATH": "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
        }

        logpipe = LogPipe(logging.INFO)
        spawned = subprocess.Popen(
            os.environ.get("BUILDER_BINARY_LOCATION", "/usr/local/bin/quay-builder"),
            stdout=logpipe,
            stderr=logpipe,
            env=builder_env,
        )

        builder_id = str(uuid.uuid4())
        self._jobs[builder_id] = (spawned, logpipe)
        logger.debug("Builder spawned with id: %s", builder_id)
        return builder_id

    def stop_builder(self, builder_id):
        if builder_id not in self._jobs:
            raise ExecutorException("Builder id not being tracked by executor.")

        logger.debug("Killing builder with id: %s", builder_id)
        spawned, logpipe = self._jobs[builder_id]

        if spawned.poll() is None:
            spawned.kill()
        logpipe.close()


class KubernetesExecutor(BuilderExecutor):
    """
    Executes build jobs by creating Kubernetes jobs which run a qemu-kvm virtual machine in a pod.
    """

    def __init__(self, *args, **kwargs):
        super(KubernetesExecutor, self).__init__(*args, **kwargs)
        self.namespace = self.executor_config.get("BUILDER_NAMESPACE", "builder")
        self.image = self.executor_config.get(
            "BUILDER_VM_CONTAINER_IMAGE", "quay.io/quay/quay-builder-qemu-fedoracoreos:stable"
        )

    @property
    def running_builders_count(self):
        def _completed(job):
            if not job.get("status"):
                return False

            conditions = job["status"].get("conditions")
            if not conditions:
                return False

            if (
                conditions[0]["type"] in ("Complete", "Failed")
                and conditions[0]["status"] == "True"
            ):
                return True

            return False

        q = {"labelSelector": "build,time,manager,quay-sha"}
        jobs_list = self._request("GET", self._jobs_path(), params=q)
        if jobs_list.status_code != 200:
            logger.error(
                "Kubernetes executor request error: %s %s - %s",
                "GET",
                jobs_list.url,
                jobs_list.status_code,
            )
            raise ExecutorException(
                "Failed to get running builder count from executor %s: %s %s"
                % (self.name, jobs_list.status_code, jobs_list.reason)
            )

        running_jobs = [j for j in jobs_list.json()["items"] if not _completed(j)]

        return len(running_jobs)

    def _request(self, method, path, **kwargs):
        request_options = dict(kwargs)

        tls_cert = self.executor_config.get("K8S_API_TLS_CERT")
        tls_key = self.executor_config.get("K8S_API_TLS_KEY")
        tls_ca = self.executor_config.get("K8S_API_TLS_CA")
        service_account_token = self.executor_config.get("SERVICE_ACCOUNT_TOKEN")

        if tls_ca:
            request_options["verify"] = tls_ca

        if "timeout" not in request_options:
            request_options["timeout"] = self.executor_config.get("K8S_API_TIMEOUT", 20)

        if service_account_token:
            scheme = "https"
            request_options["headers"] = {"Authorization": "Bearer " + service_account_token}
            logger.debug("Using service account token for Kubernetes authentication")
        elif tls_cert and tls_key:
            scheme = "https"
            request_options["cert"] = (tls_cert, tls_key)
            logger.debug("Using tls certificate and key for Kubernetes authentication")
        else:
            scheme = "http"

        server = self.executor_config.get("K8S_API_SERVER", "localhost:8080")
        url = "%s://%s%s" % (scheme, server, path)

        logger.debug("Executor config: %s", self.executor_config)
        logger.debug("Kubernetes request: %s %s: %s", method, url, request_options)
        res = requests.request(method, url, **request_options)
        logger.debug("Kubernetes response: %s: %s", res.status_code, res.text)
        return res

    def _jobs_path(self):
        return "/apis/batch/v1/namespaces/%s/jobs" % self.namespace

    def _job_path(self, build_uuid):
        return "%s/%s" % (self._jobs_path(), build_uuid)

    def _kubernetes_distribution(self):
        return self.executor_config.get("KUBERNETES_DISTRIBUTION", "basic").lower()

    def _is_basic_kubernetes_distribution(self):
        return self._kubernetes_distribution() == "basic"

    def _is_openshift_kubernetes_distribution(self):
        return self._kubernetes_distribution() == "openshift"
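
    # For example, with K8S_API_SERVER set to "k8s.example.net:6443" (hypothetical)
    # and the default "builder" namespace, _request("GET", self._jobs_path()) issues:
    #
    #   GET https://k8s.example.net:6443/apis/batch/v1/namespaces/builder/jobs
    #
    # The scheme depends on which credentials are configured in _request above.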

    def _build_job_container_resources(self):
        # Minimum acceptable free resources for this container to "fit" in a quota.
        # These may be lower than the absolute limits if the cluster is knowingly
        # oversubscribed by some amount.
        container_requests = {
            "memory": self.executor_config.get("CONTAINER_MEMORY_REQUEST", "3968Mi"),
        }

        container_limits = {
            "memory": self.executor_config.get("CONTAINER_MEMORY_LIMITS", "5120Mi"),
            "cpu": self.executor_config.get("CONTAINER_CPU_LIMITS", "1000m"),
        }

        resources = {
            "requests": container_requests,
        }

        if self._is_openshift_kubernetes_distribution():
            resources["requests"]["cpu"] = self.executor_config.get("CONTAINER_CPU_REQUEST", "500m")
            resources["limits"] = container_limits

        return resources
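
    # With the defaults above on an "openshift" distribution, this resolves to:
    #
    #   {"requests": {"memory": "3968Mi", "cpu": "500m"},
    #    "limits": {"memory": "5120Mi", "cpu": "1000m"}}
    #
    # On a "basic" distribution, only the memory request is set and no limits are
    # applied, leaving them to the cluster's defaults.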

    def _build_job_containers(self, token, build_uuid):
        user_data = self.generate_cloud_config(token, build_uuid, self.manager_hostname)
        vm_memory_limit = self.executor_config.get("VM_MEMORY_LIMIT", "4G")
        vm_volume_size = self.executor_config.get("VOLUME_SIZE", "32G")

        container = {
            "name": "builder",
            "imagePullPolicy": self.executor_config.get("IMAGE_PULL_POLICY", "Always"),
            "image": self.image,
            "securityContext": {"privileged": True},
            "env": [
                {"name": "USERDATA", "value": user_data},
                {"name": "VM_MEMORY", "value": vm_memory_limit},
                {"name": "VM_VOLUME_SIZE", "value": vm_volume_size},
            ],
            "resources": self._build_job_container_resources(),
        }

        if self._is_basic_kubernetes_distribution():
            container["volumeMounts"] = [
                {
                    "name": "secrets-mask",
                    "mountPath": "/var/run/secrets/kubernetes.io/serviceaccount",
                }
            ]

        return container

    def _job_resource(self, token, build_uuid):
        image_pull_secret_name = self.executor_config.get("IMAGE_PULL_SECRET_NAME", "builder")
        service_account = self.executor_config.get("SERVICE_ACCOUNT_NAME", "quay-builder-sa")
        node_selector_label_key = self.executor_config.get(
            "NODE_SELECTOR_LABEL_KEY", "beta.kubernetes.io/instance-type"
        )
        node_selector_label_value = self.executor_config.get("NODE_SELECTOR_LABEL_VALUE", "")

        node_selector = {node_selector_label_key: node_selector_label_value}

        release_sha = release.GIT_HEAD or "none"
        if " " in release_sha:
            release_sha = "HEAD"

        job_resource = {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {
                "namespace": self.namespace,
                "generateName": build_uuid + "-",
                "labels": {
                    "build": build_uuid,
                    "time": datetime.datetime.now().strftime("%Y-%m-%d-%H"),
                    "manager": socket.gethostname(),
                    "quay-sha": release_sha,
                },
            },
            "spec": {
                "activeDeadlineSeconds": self.executor_config.get("MAXIMUM_JOB_TIME", 7200),
                "backoffLimit": 1,
                "template": {
                    "metadata": {
                        "labels": {
                            "build": build_uuid,
                            "time": datetime.datetime.now().strftime("%Y-%m-%d-%H"),
                            "manager": socket.gethostname(),
                            "quay-sha": release_sha,
                        },
                    },
                    "spec": {
                        "imagePullSecrets": [{"name": image_pull_secret_name}],
                        "restartPolicy": "Never",
                        "dnsPolicy": self.executor_config.get("DNS_POLICY", "Default"),
                        "containers": [self._build_job_containers(token, build_uuid)],
                    },
                },
            },
        }

        # If DEBUG isn't enabled, add a TTL on the job.
        if not self.executor_config.get("DEBUG", False):
            job_resource["spec"]["ttlSecondsAfterFinished"] = self.executor_config.get(
                "RETENTION_AFTER_FINISHED", 120
            )

        if self._is_openshift_kubernetes_distribution():
            # Setting `automountServiceAccountToken` to false prevents automounting API
            # credentials for a service account.
            job_resource["spec"]["template"]["spec"]["automountServiceAccountToken"] = False

            # Use a dedicated service account that has no authorization to any resources.
            job_resource["spec"]["template"]["spec"]["serviceAccount"] = service_account

            # Setting `enableServiceLinks` to false prevents information about other services
            # from being injected into the pod's environment variables. The pod has no
            # visibility into other services on the cluster.
            job_resource["spec"]["template"]["spec"]["enableServiceLinks"] = False

        if node_selector_label_value.strip() != "":
            job_resource["spec"]["template"]["spec"]["nodeSelector"] = node_selector

        if self._is_basic_kubernetes_distribution():
            # This volume is a hack to mask the token for the namespace's default service
            # account, which is placed in a file mounted under
            # `/var/run/secrets/kubernetes.io/serviceaccount` in all pods. There's currently
            # no other way to just disable the service account at either the pod or
            # namespace level.
            #
            # https://github.com/kubernetes/kubernetes/issues/16779
            job_resource["spec"]["template"]["spec"]["volumes"] = [
                {"name": "secrets-mask", "emptyDir": {"medium": "Memory"}}
            ]

        return job_resource
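
    # Note that the labels set above ("build", "time", "manager", "quay-sha") are the
    # same ones running_builders_count selects on via labelSelector, which is how
    # in-flight build jobs are counted. For example, a job for build "deadbeef"
    # (illustrative) scheduled at 10:00 on 2024-01-01 carries labels like
    # {"build": "deadbeef", "time": "2024-01-01-10", ...}.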

    @observe(build_start_duration, "k8s")
    def start_builder(self, token, build_uuid):
        # Generate the job resource.
        resource = self._job_resource(token, build_uuid)
        logger.debug("Using Kubernetes Distribution: %s", self._kubernetes_distribution())
        logger.debug("Generated kubernetes resource:\n%s", resource)

        # Schedule the job.
        create_job = self._request("POST", self._jobs_path(), json=resource)
        if create_job.status_code // 100 != 2:
            raise ExecutorException(
                "Failed to create job: %s: %s: %s"
                % (build_uuid, create_job.status_code, create_job.text)
            )

        job = create_job.json()
        return job["metadata"]["name"]

    @persist_for_debugging
    def stop_builder(self, builder_id):
        pods_path = "/api/v1/namespaces/%s/pods" % self.namespace

        # Delete the job itself.
        try:
            self._request("DELETE", self._job_path(builder_id))
        except requests.exceptions.RequestException:
            logger.error("Failed to send delete job call for job %s", builder_id)
            raise ExecutorException("Failed to send delete job call for job %s" % builder_id)

        # Delete the pod(s) for the job.
        selectorString = "job-name=%s" % builder_id
        try:
            self._request("DELETE", pods_path, params=dict(labelSelector=selectorString))
        except requests.exceptions.RequestException:
            logger.error("Failed to send delete pod call for job %s", builder_id)
            raise ExecutorException("Failed to send delete pod call for job %s" % builder_id)


class KubernetesPodmanExecutor(KubernetesExecutor):
    def __init__(self, *args, **kwargs):
        # Deliberately skip KubernetesExecutor.__init__ (which selects the qemu VM image)
        # and initialize BuilderExecutor directly, then pick the podman builder image.
        super(KubernetesExecutor, self).__init__(*args, **kwargs)
        self.namespace = self.executor_config.get("BUILDER_NAMESPACE", "builder")
        self.image = self.executor_config.get(
            "BUILDER_CONTAINER_IMAGE", "quay.io/projectquay/quay-builder:latest"
        )

    def _build_job_containers(self, token, build_uuid):
        server_grpc_addr = (
            self.manager_hostname.split(":", 1)[0] + ":" + str(SECURE_GRPC_SERVER_PORT)
            if self.registry_hostname == self.manager_hostname
            else self.manager_hostname
        )

        cert = self.executor_config.get("CA_CERT", self._ca_cert())
        certs = [cert] if cert is not None else []
        for extra_cert in self.executor_config.get("EXTRA_CA_CERTS", []):
            try:
                with open(os.path.join(OVERRIDE_CONFIG_DIRECTORY, extra_cert), "r") as f:
                    certs.append(f.read())
            except OSError:
                logger.warning("Failed to load extra CA cert for builder %s", extra_cert)
        certs = "\n".join(certs)

        container = {
            "name": "builder",
            "imagePullPolicy": self.executor_config.get("IMAGE_PULL_POLICY", "Always"),
            "image": self.image,
            "env": [
                {"name": "TOKEN", "value": token},
                {"name": "BUILD_UUID", "value": build_uuid},
                {"name": "SERVER", "value": server_grpc_addr},  # manager_hostname
                {"name": "REGISTRY_HOSTNAME", "value": self.registry_hostname},
                {"name": "CONTAINER_RUNTIME", "value": "podman"},
                {"name": "BUILDAH_ISOLATION", "value": "chroot"},
                {"name": "CA_CERT", "value": certs},
                {"name": "GIT_SSL_CAINFO", "value": "/certs/cacert.crt"},
                {"name": "TLS_CERT_PATH", "value": "/certs/cacert.crt"},  # Used by build manager
                {
                    "name": "SSL_CERT_FILE",
                    "value": "/certs/cacert.crt",
                },  # Used by Podman and other Go libraries
                {"name": "DEBUG", "value": str(self.executor_config.get("DEBUG", False)).lower()},
                {"name": "HTTP_PROXY", "value": self.executor_config.get("HTTP_PROXY", "")},
                {"name": "HTTPS_PROXY", "value": self.executor_config.get("HTTPS_PROXY", "")},
                {"name": "NO_PROXY", "value": self.executor_config.get("NO_PROXY", "")},
                {"name": "DOCKER_HOST", "value": "unix:///tmp/podman-run-1000/podman/podman.sock"},
                {
                    "name": "EXECUTOR",
                    "value": self.executor_config.get("EXECUTOR", "kubernetesPodman"),
                },
            ],
            "resources": self._build_job_container_resources(),
        }

        if self._is_basic_kubernetes_distribution():
            container["volumeMounts"] = [
                {
                    "name": "secrets-mask",
                    "mountPath": "/var/run/secrets/kubernetes.io/serviceaccount",
                }
            ]

        return container


class LogPipe(threading.Thread):
    """
    Adapted from http://codereview.stackexchange.com/a/17959.
    """

    def __init__(self, level):
        """
        Set up the object with a logger and a loglevel, and start the thread.
        """
        threading.Thread.__init__(self)
        self.daemon = False
        self.level = level
        self.fd_read, self.fd_write = os.pipe()
        self.pipe_reader = os.fdopen(self.fd_read)
        self.start()

    def fileno(self):
        """
        Return the write file descriptor of the pipe.
        """
        return self.fd_write

    def run(self):
        """
        Run the thread, logging everything.
        """
        for line in iter(self.pipe_reader.readline, ""):
            logging.log(self.level, line.strip("\n"))

        self.pipe_reader.close()

    def close(self):
        """
        Close the write end of the pipe.
        """
        os.close(self.fd_write)
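
# Usage sketch (mirroring PopenExecutor.start_builder above): a LogPipe can stand in
# for a file object wherever subprocess expects one, forwarding process output into
# the logging framework:
#
#   logpipe = LogPipe(logging.INFO)
#   proc = subprocess.Popen(["echo", "hello"], stdout=logpipe, stderr=logpipe)
#   proc.wait()
#   logpipe.close()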