1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00

Implement executor methods to get the count of running builders (#582)

Used by the manager to schedule builds based on the current running
count. Uses the specific executors' api to get the count of running
builders instead of Redis/Orchestrator.

This is due to issues encountered in the past where the manager would
have problems scheduling builds, and go into a weird state when
Redis was unavailable.

Remove wamp's REALM/websocket parameters from executor

Remove asyncio from executor
This commit is contained in:
Kenny Lee Sin Cheong
2020-10-22 10:17:40 -04:00
committed by GitHub
parent 2eb6a73c51
commit cee2ab56d9
2 changed files with 91 additions and 96 deletions

View File

@@ -1,4 +1,3 @@
import asyncio
import datetime
import hashlib
import io
@@ -25,7 +24,6 @@ import release
from _init import ROOT_DIR
from app import app
from buildman.asyncutil import AsyncWrapper
from buildman.container_cloud_config import CloudConfigContext
@@ -49,16 +47,13 @@ build_start_duration = Histogram(
)
def async_observe(metric, *labels):
def observe(metric, *labels):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
trigger_time = time.time()
try:
rv = func(*args, **kwargs)
except Return as e:
metric.labels(*labels).observe(time.time() - trigger_time)
raise e
rv = func(*args, **kwargs)
metric.labels(*labels).observe(time.time() - trigger_time)
return rv
return wrapper
@@ -83,8 +78,8 @@ class BuilderExecutor(object):
self.executor_config = executor_config
self.manager_hostname = manager_hostname
default_websocket_scheme = "wss" if app.config["PREFERRED_URL_SCHEME"] == "https" else "ws"
self.websocket_scheme = executor_config.get("WEBSOCKET_SCHEME", default_websocket_scheme)
default_scheme = app.config["PREFERRED_URL_SCHEME"]
self.http_scheme = executor_config.get("HTTP_SCHEME", default_scheme)
@property
def name(self):
@@ -102,7 +97,7 @@ class BuilderExecutor(object):
"""
return self.executor_config.get("SETUP_TIME")
async def start_builder(self, realm, token, build_uuid):
def start_builder(self, token, build_uuid):
"""
Create a builder with the specified config.
@@ -110,12 +105,19 @@ class BuilderExecutor(object):
"""
raise NotImplementedError
async def stop_builder(self, builder_id):
def stop_builder(self, builder_id):
"""
Stop a builder which is currently running.
"""
raise NotImplementedError
@property
def running_builders_count(self):
"""
Returns the number of builders running under the executor
"""
raise NotImplementedError
def allowed_for_namespace(self, namespace):
"""
Returns true if this executor can be used for builds in the given namespace.
@@ -145,7 +147,6 @@ class BuilderExecutor(object):
def generate_cloud_config(
self,
realm,
token,
build_uuid,
coreos_channel,
@@ -161,13 +162,12 @@ class BuilderExecutor(object):
rendered_json = json.load(
io.StringIO(TEMPLATE.render(
realm=realm,
token=token,
build_uuid=build_uuid,
quay_username=quay_username,
quay_password=quay_password,
manager_hostname=manager_hostname,
websocket_scheme=self.websocket_scheme,
http_scheme=self.http_scheme,
coreos_channel=coreos_channel,
worker_image=self.executor_config.get(
"WORKER_IMAGE", "quay.io/coreos/registry-build-worker"
@@ -193,22 +193,34 @@ class EC2Executor(BuilderExecutor):
)
def __init__(self, *args, **kwargs):
self._loop = asyncio.get_event_loop()
super(EC2Executor, self).__init__(*args, **kwargs)
def _get_conn(self):
"""
Creates an ec2 connection which can be used to manage instances.
"""
return AsyncWrapper(
boto3.client(
"ec2",
region_name=self.executor_config["EC2_REGION"],
aws_access_key_id=self.executor_config["AWS_ACCESS_KEY"],
aws_secret_access_key=self.executor_config["AWS_SECRET_KEY"],
)
return boto3.client(
"ec2",
region_name=self.executor_config["EC2_REGION"],
aws_access_key_id=self.executor_config["AWS_ACCESS_KEY"],
aws_secret_access_key=self.executor_config["AWS_SECRET_KEY"],
)
@property
def running_builders_count(self):
    """
    Returns the number of EC2 builder instances that are pending or running.

    Instances are selected by the "Quay Ephemeral Builder" Name tag, then
    filtered client-side on their reported state.
    """
    ec2_conn = self._get_conn()
    resp = ec2_conn.describe_instances(
        Filters=[{"Name": "tag:Name", "Values": ["Quay Ephemeral Builder"]}]
    )

    # EC2 reports instance state names in lowercase ("pending", "running",
    # "terminated", ...), so the comparison must be lowercase as well.
    active_states = ("pending", "running")
    return sum(
        1
        for reservation in resp.get("Reservations", [])
        for instance in reservation.get("Instances", [])
        if instance["State"]["Name"] in active_states
    )
@classmethod
@cachetools.func.ttl_cache(ttl=ONE_HOUR)
def _get_coreos_ami(cls, ec2_region, coreos_channel):
@@ -219,18 +231,17 @@ class EC2Executor(BuilderExecutor):
stack_amis = stack_list_json['architectures'][EC2Executor.COREOS_STACK_ARCHITECTURE]['images']['aws']['regions']
return stack_amis[ec2_region]['image']
@async_observe(build_start_duration, "ec2")
async def start_builder(self, realm, token, build_uuid):
@observe(build_start_duration, "ec2")
def start_builder(self, token, build_uuid):
region = self.executor_config["EC2_REGION"]
channel = self.executor_config.get("COREOS_CHANNEL", "stable")
coreos_ami = self.executor_config.get("COREOS_AMI", None)
if coreos_ami is None:
get_ami_callable = partial(self._get_coreos_ami, region, channel)
coreos_ami = await self._loop.run_in_executor(None, get_ami_callable)
coreos_ami = self.get_coreos_ami(region, channel)
user_data = self.generate_cloud_config(
realm, token, build_uuid, channel, self.manager_hostname
token, build_uuid, channel, self.manager_hostname
)
logger.debug("Generated cloud config for build %s: %s", build_uuid, user_data)
@@ -258,23 +269,32 @@ class EC2Executor(BuilderExecutor):
}
]
tag_specs = [
{
"ResourceType": "instance",
"Tags": [
{"Key": "Name", "Value": "Quay Ephemeral Builder"},
{"Key": "Token", "Value": token},
{"Key": "BuildUUID", "Value": build_uuid},
]
}
]
try:
reservation = await (
ec2_conn.run_instances(
ImageId=coreos_ami,
InstanceType=self.executor_config["EC2_INSTANCE_TYPE"],
KeyName=self.executor_config.get("EC2_KEY_NAME", None),
UserData=user_data,
InstanceInitiatedShutdownBehavior="terminate",
BlockDeviceMappings=block_device_mappings,
NetworkInterfaces=interfaces,
MinCount=1,
MaxCount=1,
)
reservation = ec2_conn.run_instances(
ImageId=coreos_ami,
InstanceType=self.executor_config["EC2_INSTANCE_TYPE"],
KeyName=self.executor_config.get("EC2_KEY_NAME", None),
UserData=user_data,
InstanceInitiatedShutdownBehavior="terminate",
BlockDeviceMappings=block_device_mappings,
NetworkInterfaces=interfaces,
MinCount=1,
MaxCount=1,
TagSpecifications=tag_specs
)
except (ec2_conn.exceptions.ClientError, botocore.exceptions.ClientError) as ec2e:
logger.exception("Unable to spawn builder instance")
raise ec2e
raise ExecutorException(ec2e)
instances = reservation.get("Instances", [])
if not instances:
@@ -284,48 +304,13 @@ class EC2Executor(BuilderExecutor):
launched = instances[0]
# Sleep a few seconds to wait for AWS to spawn the instance.
await asyncio.sleep(_TAG_RETRY_SLEEP)
# Tag the instance with its metadata.
for i in range(0, _TAG_RETRY_COUNT):
try:
await (
ec2_conn.create_tags(
Resources=[
launched["InstanceId"]
],
Tags=[
{"Key": "Name", "Value": "Quay Ephemeral Builder"},
{"Key": "Realm", "Value": realm},
{"Key": "Token", "Value": token},
{"Key": "BuildUUID", "Value": build_uuid},
]
)
)
except (ec2_conn.exceptions.ClientError, botocore.exceptions.ClientError) as ec2e:
if ec2e.response["Error"]["Code"] == "InvalidInstanceID.NotFound":
if i < _TAG_RETRY_COUNT - 1:
logger.warning(
"Failed to write EC2 tags for instance %s for build %s (attempt #%s)",
launched["InstanceId"],
build_uuid,
i,
)
await asyncio.sleep(_TAG_RETRY_SLEEP)
continue
raise ExecutorException("Unable to find builder instance.")
logger.exception("Failed to write EC2 tags (attempt #%s)", i)
logger.debug("Machine with ID %s started for build %s", launched["InstanceId"], build_uuid)
return launched["InstanceId"]
async def stop_builder(self, builder_id):
def stop_builder(self, builder_id):
try:
ec2_conn = self._get_conn()
terminated_instances = await ec2_conn.terminate_instances(InstanceIds=[builder_id])
terminated_instances = ec2_conn.terminate_instances(InstanceIds=[builder_id])
except (ec2_conn.exceptions.ClientError, botocore.exceptions.ClientError) as ec2e:
if ec2e.response["Error"]["Code"] == "InvalidInstanceID.NotFound":
logger.debug("Instance %s already terminated", builder_id)
@@ -347,8 +332,12 @@ class PopenExecutor(BuilderExecutor):
self._jobs = {}
super(PopenExecutor, self).__init__(executor_config, manager_hostname)
@async_observe(build_start_duration, "fork")
async def start_builder(self, realm, token, build_uuid):
@property
def running_builders_count(self):
    """
    Returns the number of tracked builder processes that are still running.
    """
    # self._jobs maps a builder id to a tuple whose first element exposes
    # poll() (presumably the subprocess.Popen handle). poll() returns None
    # while the process has not exited, so None means "still running".
    return sum(1 for job in self._jobs.values() if job[0].poll() is None)
@observe(build_start_duration, "fork")
def start_builder(self, token, build_uuid):
# Now start a machine for this job, adding the machine id to the etcd information
logger.debug("Forking process for build")
@@ -356,7 +345,6 @@ class PopenExecutor(BuilderExecutor):
ws_port = os.environ.get("BUILDMAN_WS_PORT", "8787")
builder_env = {
"TOKEN": token,
"REALM": realm,
"ENDPOINT": "ws://%s:%s" % (ws_host, ws_port),
"DOCKER_TLS_VERIFY": os.environ.get("DOCKER_TLS_VERIFY", ""),
"DOCKER_CERT_PATH": os.environ.get("DOCKER_CERT_PATH", ""),
@@ -377,7 +365,7 @@ class PopenExecutor(BuilderExecutor):
logger.debug("Builder spawned with id: %s", builder_id)
return builder_id
async def stop_builder(self, builder_id):
def stop_builder(self, builder_id):
if builder_id not in self._jobs:
raise ExecutorException("Builder id not being tracked by executor.")
@@ -396,13 +384,18 @@ class KubernetesExecutor(BuilderExecutor):
def __init__(self, *args, **kwargs):
super(KubernetesExecutor, self).__init__(*args, **kwargs)
self._loop = asyncio.get_event_loop()
self.namespace = self.executor_config.get("BUILDER_NAMESPACE", "builder")
self.image = self.executor_config.get(
"BUILDER_VM_CONTAINER_IMAGE", "quay.io/quay/quay-builder-qemu-fedoracoreos:stable"
)
async def _request(self, method, path, **kwargs):
@property
def running_builders_count(self):
    """
    Returns the number of builder jobs that have not yet finished.

    Jobs are listed through the Kubernetes API using the builder label
    selector. Jobs whose status carries a true "Complete" or "Failed"
    condition are excluded, since finished jobs can remain listed for a
    while before they are cleaned up.

    Raises ExecutorException if the list request does not return HTTP 200.
    """

    def _finished(job):
        # A job is finished once it has a Complete/Failed condition set to "True".
        conditions = (job.get("status") or {}).get("conditions") or []
        return any(
            cond.get("type") in ("Complete", "Failed") and cond.get("status") == "True"
            for cond in conditions
        )

    query = {"labelSelector": "build,time,manager,quay-sha"}
    jobs_list = self._request("GET", self._jobs_path(), params=query)
    if jobs_list.status_code != 200:
        raise ExecutorException(
            "Failed to list builder jobs: status code %s" % jobs_list.status_code
        )

    items = jobs_list.json().get("items") or []
    return sum(1 for job in items if not _finished(job))
def _request(self, method, path, **kwargs):
request_options = dict(kwargs)
tls_cert = self.executor_config.get("K8S_API_TLS_CERT")
@@ -529,6 +522,7 @@ class KubernetesExecutor(BuilderExecutor):
},
"spec": {
"activeDeadlineSeconds": self.executor_config.get("MAXIMUM_JOB_TIME", 7200),
"ttlSecondsAfterFinished": self.executor_config.get("RETENTION_AFTER_FINISHED", 120),
"template": {
"metadata": {
"labels": {
@@ -577,19 +571,19 @@ class KubernetesExecutor(BuilderExecutor):
return job_resource
@async_observe(build_start_duration, "k8s")
async def start_builder(self, realm, token, build_uuid):
@observe(build_start_duration, "k8s")
def start_builder(self, token, build_uuid):
# generate resource
channel = self.executor_config.get("COREOS_CHANNEL", "stable")
user_data = self.generate_cloud_config(
realm, token, build_uuid, channel, self.manager_hostname
token, build_uuid, channel, self.manager_hostname
)
resource = self._job_resource(build_uuid, user_data, channel)
logger.debug("Using Kubernetes Distribution: %s", self._kubernetes_distribution())
logger.debug("Generated kubernetes resource:\n%s", resource)
# schedule
create_job = await self._request("POST", self._jobs_path(), json=resource)
create_job = self._request("POST", self._jobs_path(), json=resource)
if int(create_job.status_code / 100) != 2:
raise ExecutorException(
"Failed to create job: %s: %s: %s"
@@ -599,21 +593,23 @@ class KubernetesExecutor(BuilderExecutor):
job = create_job.json()
return job["metadata"]["name"]
async def stop_builder(self, builder_id):
def stop_builder(self, builder_id):
pods_path = "/api/v1/namespaces/%s/pods" % self.namespace
# Delete the job itself.
try:
await self._request("DELETE", self._job_path(builder_id))
self._request("DELETE", self._job_path(builder_id))
except:
logger.exception("Failed to send delete job call for job %s", builder_id)
logger.error("Failed to send delete job call for job %s", builder_id)
raise ExecutorException("Failed to send delete job call for job %s", builder_id)
# Delete the pod(s) for the job.
selectorString = "job-name=%s" % builder_id
try:
await (self._request("DELETE", pods_path, params=dict(labelSelector=selectorString)))
self._request("DELETE", pods_path, params=dict(labelSelector=selectorString))
except:
logger.exception("Failed to send delete pod call for job %s", builder_id)
logger.error("Failed to send delete pod call for job %s", builder_id)
raise ExecutorException("Failed to send delete pod call for job %s", builder_id)
class LogPipe(threading.Thread):

View File

@@ -1,8 +1,7 @@
{% macro overridelist() -%}
REALM={{ realm }}
TOKEN={{ token }}
SERVER={{ websocket_scheme }}://{{ manager_hostname }}
SERVER={{ http_scheme }}://{{ manager_hostname }}
{%- endmacro %}