From cee2ab56d9ee4acbe4e90755c4a7fae4ea4fef6b Mon Sep 17 00:00:00 2001 From: Kenny Lee Sin Cheong <2530351+kleesc@users.noreply.github.com> Date: Thu, 22 Oct 2020 10:17:40 -0400 Subject: [PATCH] Implement executor methods to get the count of running builders (#582) Used by the manager to schedule builds based on the current running count. Uses the specific executors' api to get the count of running builders instead of Redis/Orchestrator. This is due to issues encountered in the past where the manager would have problems scheduling builds, and go into a weird state when Redis was unavailable. Remove wamp's REALM/websocket parameters from executor Remove asyncio from executor --- buildman/manager/executor.py | 184 ++++++++++++++-------------- buildman/templates/cloudconfig.json | 3 +- 2 files changed, 91 insertions(+), 96 deletions(-) diff --git a/buildman/manager/executor.py b/buildman/manager/executor.py index 931c11d0b..096e3502b 100644 --- a/buildman/manager/executor.py +++ b/buildman/manager/executor.py @@ -1,4 +1,3 @@ -import asyncio import datetime import hashlib import io @@ -25,7 +24,6 @@ import release from _init import ROOT_DIR from app import app -from buildman.asyncutil import AsyncWrapper from buildman.container_cloud_config import CloudConfigContext @@ -49,16 +47,13 @@ build_start_duration = Histogram( ) -def async_observe(metric, *labels): +def observe(metric, *labels): def decorator(func): @wraps(func) def wrapper(*args, **kwargs): trigger_time = time.time() - try: - rv = func(*args, **kwargs) - except Return as e: - metric.labels(*labels).observe(time.time() - trigger_time) - raise e + rv = func(*args, **kwargs) + metric.labels(*labels).observe(time.time() - trigger_time) return rv return wrapper @@ -83,8 +78,8 @@ class BuilderExecutor(object): self.executor_config = executor_config self.manager_hostname = manager_hostname - default_websocket_scheme = "wss" if app.config["PREFERRED_URL_SCHEME"] == "https" else "ws" - self.websocket_scheme = executor_config.get("WEBSOCKET_SCHEME", default_websocket_scheme) + default_scheme = app.config["PREFERRED_URL_SCHEME"] + self.http_scheme = executor_config.get("HTTP_SCHEME", default_scheme) @property def name(self): @@ -102,7 +97,7 @@ class BuilderExecutor(object): """ return self.executor_config.get("SETUP_TIME") - async def start_builder(self, realm, token, build_uuid): + def start_builder(self, token, build_uuid): """ Create a builder with the specified config. @@ -110,12 +105,19 @@ class BuilderExecutor(object): """ raise NotImplementedError - async def stop_builder(self, builder_id): + def stop_builder(self, builder_id): """ Stop a builder which is currently running. """ raise NotImplementedError + @property + def running_builders_count(self): + """ + Returns the number of builders running under the executor + """ + raise NotImplementedError + def allowed_for_namespace(self, namespace): """ Returns true if this executor can be used for builds in the given namespace. @@ -145,7 +147,6 @@ class BuilderExecutor(object): def generate_cloud_config( self, - realm, token, build_uuid, coreos_channel, @@ -161,13 +162,12 @@ class BuilderExecutor(object): rendered_json = json.load( io.StringIO(TEMPLATE.render( - realm=realm, token=token, build_uuid=build_uuid, quay_username=quay_username, quay_password=quay_password, manager_hostname=manager_hostname, - websocket_scheme=self.websocket_scheme, + http_scheme=self.http_scheme, coreos_channel=coreos_channel, worker_image=self.executor_config.get( "WORKER_IMAGE", "quay.io/coreos/registry-build-worker" @@ -193,22 +193,34 @@ class EC2Executor(BuilderExecutor): ) def __init__(self, *args, **kwargs): - self._loop = asyncio.get_event_loop() super(EC2Executor, self).__init__(*args, **kwargs) def _get_conn(self): """ Creates an ec2 connection which can be used to manage instances. """ - return AsyncWrapper( - boto3.client( - "ec2", - region_name=self.executor_config["EC2_REGION"], - aws_access_key_id=self.executor_config["AWS_ACCESS_KEY"], - aws_secret_access_key=self.executor_config["AWS_SECRET_KEY"], - ) + return boto3.client( + "ec2", + region_name=self.executor_config["EC2_REGION"], + aws_access_key_id=self.executor_config["AWS_ACCESS_KEY"], + aws_secret_access_key=self.executor_config["AWS_SECRET_KEY"], ) + @property + def running_builders_count(self): + ec2_conn = self._get_conn() + resp = ec2.conn.describe_instances(Filters=[ + {"Name": "tag:Name", "Values": ["Quay Ephemeral Builder"]} + ]) + + count = 0 + for reservation in resp["Reservations"]: + for instance in reservation["Instances"]: + if instance["State"]["Name"] in ("Running", "Pending"): + count += 1 + + return count + @classmethod @cachetools.func.ttl_cache(ttl=ONE_HOUR) def _get_coreos_ami(cls, ec2_region, coreos_channel): @@ -219,18 +231,17 @@ class EC2Executor(BuilderExecutor): stack_amis = stack_list_json['architectures'][EC2Executor.COREOS_STACK_ARCHITECTURE]['images']['aws']['regions'] return stack_amis[ec2_region]['image'] - @async_observe(build_start_duration, "ec2") - async def start_builder(self, realm, token, build_uuid): + @observe(build_start_duration, "ec2") + def start_builder(self, token, build_uuid): region = self.executor_config["EC2_REGION"] channel = self.executor_config.get("COREOS_CHANNEL", "stable") coreos_ami = self.executor_config.get("COREOS_AMI", None) if coreos_ami is None: - get_ami_callable = partial(self._get_coreos_ami, region, channel) - coreos_ami = await self._loop.run_in_executor(None, get_ami_callable) + coreos_ami = self.get_coreos_ami(region, channel) user_data = self.generate_cloud_config( - realm, token, build_uuid, channel, self.manager_hostname + token, build_uuid, channel, self.manager_hostname ) logger.debug("Generated cloud config for build %s: %s", build_uuid, user_data) @@ -258,23 +269,32 @@ class EC2Executor(BuilderExecutor): } ] + tag_specs = [ + { + "ResourceType": "instance", + "Tags": [ + {"Key": "Name", "Value": "Quay Ephemeral Builder"}, + {"Key": "Token", "Value": token}, + {"Key": "BuildUUID", "Value": build_uuid}, + ] + } + ] + try: - reservation = await ( - ec2_conn.run_instances( - ImageId=coreos_ami, - InstanceType=self.executor_config["EC2_INSTANCE_TYPE"], - KeyName=self.executor_config.get("EC2_KEY_NAME", None), - UserData=user_data, - InstanceInitiatedShutdownBehavior="terminate", - BlockDeviceMappings=block_device_mappings, - NetworkInterfaces=interfaces, - MinCount=1, - MaxCount=1, - ) + reservation = ec2_conn.run_instances( + ImageId=coreos_ami, + InstanceType=self.executor_config["EC2_INSTANCE_TYPE"], + KeyName=self.executor_config.get("EC2_KEY_NAME", None), + UserData=user_data, + InstanceInitiatedShutdownBehavior="terminate", + BlockDeviceMappings=block_device_mappings, + NetworkInterfaces=interfaces, + MinCount=1, + MaxCount=1, + TagSpecifications=tag_specs ) except (ec2_conn.exceptions.ClientError, botocore.exceptions.ClientError) as ec2e: - logger.exception("Unable to spawn builder instance") - raise ec2e + raise ExecutorException(ec2e) instances = reservation.get("Instances", []) if not instances: @@ -284,48 +304,13 @@ class EC2Executor(BuilderExecutor): launched = instances[0] - # Sleep a few seconds to wait for AWS to spawn the instance. - await asyncio.sleep(_TAG_RETRY_SLEEP) - - # Tag the instance with its metadata. - for i in range(0, _TAG_RETRY_COUNT): - try: - await ( - ec2_conn.create_tags( - Resources=[ - launched["InstanceId"] - ], - Tags=[ - {"Key": "Name", "Value": "Quay Ephemeral Builder"}, - {"Key": "Realm", "Value": realm}, - {"Key": "Token", "Value": token}, - {"Key": "BuildUUID", "Value": build_uuid}, - ] - ) - ) - except (ec2_conn.exceptions.ClientError, botocore.exceptions.ClientError) as ec2e: - if ec2e.response["Error"]["Code"] == "InvalidInstanceID.NotFound": - if i < _TAG_RETRY_COUNT - 1: - logger.warning( - "Failed to write EC2 tags for instance %s for build %s (attempt #%s)", - launched["InstanceId"], - build_uuid, - i, - ) - await asyncio.sleep(_TAG_RETRY_SLEEP) - continue - - raise ExecutorException("Unable to find builder instance.") - - logger.exception("Failed to write EC2 tags (attempt #%s)", i) - logger.debug("Machine with ID %s started for build %s", launched["InstanceId"], build_uuid) return launched["InstanceId"] - async def stop_builder(self, builder_id): + def stop_builder(self, builder_id): try: ec2_conn = self._get_conn() - terminated_instances = await ec2_conn.terminate_instances(InstanceIds=[builder_id]) + terminated_instances = ec2_conn.terminate_instances(InstanceIds=[builder_id]) except (ec2_conn.exceptions.ClientError, botocore.exceptions.ClientError) as ec2e: if ec2e.response["Error"]["Code"] == "InvalidInstanceID.NotFound": logger.debug("Instance %s already terminated", builder_id) @@ -347,8 +332,12 @@ class PopenExecutor(BuilderExecutor): self._jobs = {} super(PopenExecutor, self).__init__(executor_config, manager_hostname) - @async_observe(build_start_duration, "fork") - async def start_builder(self, realm, token, build_uuid): + @property + def running_builders_count(self): + return len([i for i in [v[0].poll() for k, v in self._jobs] if i is not None]) + + @observe(build_start_duration, "fork") + def start_builder(self, token, build_uuid): # Now start a machine for this job, adding the machine id to the etcd information logger.debug("Forking process for build") @@ -356,7 +345,6 @@ class PopenExecutor(BuilderExecutor): ws_port = os.environ.get("BUILDMAN_WS_PORT", "8787") builder_env = { "TOKEN": token, - "REALM": realm, "ENDPOINT": "ws://%s:%s" % (ws_host, ws_port), "DOCKER_TLS_VERIFY": os.environ.get("DOCKER_TLS_VERIFY", ""), "DOCKER_CERT_PATH": os.environ.get("DOCKER_CERT_PATH", ""), @@ -377,7 +365,7 @@ class PopenExecutor(BuilderExecutor): logger.debug("Builder spawned with id: %s", builder_id) return builder_id - async def stop_builder(self, builder_id): + def stop_builder(self, builder_id): if builder_id not in self._jobs: raise ExecutorException("Builder id not being tracked by executor.") @@ -396,13 +384,18 @@ class KubernetesExecutor(BuilderExecutor): def __init__(self, *args, **kwargs): super(KubernetesExecutor, self).__init__(*args, **kwargs) - self._loop = asyncio.get_event_loop() self.namespace = self.executor_config.get("BUILDER_NAMESPACE", "builder") self.image = self.executor_config.get( "BUILDER_VM_CONTAINER_IMAGE", "quay.io/quay/quay-builder-qemu-fedoracoreos:stable" ) - async def _request(self, method, path, **kwargs): + @property + def running_builders_count(self): + q = {"labelSelector": "build,time,manager,quay-sha"} + jobs_list = self._request("GET", self._jobs_path(), params=q) + return len(jobs_list.json()["items"]) + + def _request(self, method, path, **kwargs): request_options = dict(kwargs) tls_cert = self.executor_config.get("K8S_API_TLS_CERT") @@ -529,6 +522,7 @@ class KubernetesExecutor(BuilderExecutor): }, "spec": { "activeDeadlineSeconds": self.executor_config.get("MAXIMUM_JOB_TIME", 7200), + "ttlSecondsAfterFinished": self.executor_config.get("RETENTION_AFTER_FINISHED", 120), "template": { "metadata": { "labels": { @@ -577,19 +571,19 @@ class KubernetesExecutor(BuilderExecutor): return job_resource - @async_observe(build_start_duration, "k8s") - async def start_builder(self, realm, token, build_uuid): + @observe(build_start_duration, "k8s") + def start_builder(self, token, build_uuid): # generate resource channel = self.executor_config.get("COREOS_CHANNEL", "stable") user_data = self.generate_cloud_config( - realm, token, build_uuid, channel, self.manager_hostname + token, build_uuid, channel, self.manager_hostname ) resource = self._job_resource(build_uuid, user_data, channel) logger.debug("Using Kubernetes Distribution: %s", self._kubernetes_distribution()) logger.debug("Generated kubernetes resource:\n%s", resource) # schedule - create_job = await self._request("POST", self._jobs_path(), json=resource) + create_job = self._request("POST", self._jobs_path(), json=resource) if int(create_job.status_code / 100) != 2: raise ExecutorException( "Failed to create job: %s: %s: %s" @@ -599,21 +593,23 @@ class KubernetesExecutor(BuilderExecutor): job = create_job.json() return job["metadata"]["name"] - async def stop_builder(self, builder_id): + def stop_builder(self, builder_id): pods_path = "/api/v1/namespaces/%s/pods" % self.namespace # Delete the job itself. try: - await self._request("DELETE", self._job_path(builder_id)) + self._request("DELETE", self._job_path(builder_id)) except: - logger.exception("Failed to send delete job call for job %s", builder_id) + logger.error("Failed to send delete job call for job %s", builder_id) + raise ExecutorException("Failed to send delete job call for job %s", builder_id) # Delete the pod(s) for the job. selectorString = "job-name=%s" % builder_id try: - await (self._request("DELETE", pods_path, params=dict(labelSelector=selectorString))) + self._request("DELETE", pods_path, params=dict(labelSelector=selectorString)) except: - logger.exception("Failed to send delete pod call for job %s", builder_id) + logger.error("Failed to send delete pod call for job %s", builder_id) + raise ExecutorException("Failed to send delete pod call for job %s", builder_id) class LogPipe(threading.Thread): diff --git a/buildman/templates/cloudconfig.json b/buildman/templates/cloudconfig.json index ef6a02080..336ea5182 100644 --- a/buildman/templates/cloudconfig.json +++ b/buildman/templates/cloudconfig.json @@ -1,8 +1,7 @@ {% macro overridelist() -%} -REALM={{ realm }} TOKEN={{ token }} -SERVER={{ websocket_scheme }}://{{ manager_hostname }} +SERVER={{ http_scheme }}://{{ manager_hostname }} {%- endmacro %}