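"""
Ephemeral build manager.

Tracks Docker build jobs in the configured orchestrator, hands them to the
configured executors (popen, ec2, kubernetes, kubernetesPodman), and reports
phase changes, logs, and metrics back to the build queue and build logs.
"""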
import calendar
import json
import logging
import re
import time
import uuid
from datetime import datetime, timedelta

import dateutil.parser
from prometheus_client import Counter, Histogram

from app import app, instance_keys
from buildman.build_token import (
    BUILD_JOB_REGISTRATION_TYPE,
    BUILD_JOB_TOKEN_TYPE,
    InvalidBearerTokenException,
    build_token,
    verify_build_token,
)
from buildman.interface import (
    RESULT_PHASES,
    BuildJobAlreadyExistsError,
    BuildJobDoesNotExistsError,
    BuildJobError,
    BuildJobResult,
    BuildStateInterface,
)
from buildman.jobutil.buildjob import BuildJob, BuildJobLoadException
from buildman.manager.executor import (
    EC2Executor,
    KubernetesExecutor,
    KubernetesPodmanExecutor,
    PopenExecutor,
)
from buildman.orchestrator import (
    ORCHESTRATOR_UNAVAILABLE_SLEEP_DURATION,
    KeyEvent,
    OrchestratorConnectionError,
    OrchestratorError,
    orchestrator_from_config,
)
from data import database, model
from data.database import BUILD_PHASE
from util import slash_join
from util.morecollections import AttrDict

logger = logging.getLogger(__name__)

build_fallback = Counter(
    "quay_build_fallback_total", "number of times a build has been retried", labelnames=["executor"]
)
build_ack_duration = Histogram(
    "quay_build_ack_duration_seconds",
    "seconds taken for the builder to acknowledge a queued build",
    labelnames=["executor"],
)
build_queue_duration = Histogram(
    "quay_build_queue_duration_seconds",
    "seconds taken for a build job to be scheduled from the queue",
    labelnames=["schedule_success"],
)
build_duration = Histogram(
    "quay_build_duration_seconds",
    "seconds taken for a build's execution",
    labelnames=["executor", "job_status"],  # status in (COMPLETE, INCOMPLETE, ERROR)
)
build_jobs = Counter("quay_build_jobs", "total number of build requests", labelnames=["success"])

JOB_PREFIX = "building/"
LOCK_PREFIX = "lock/"
CANCEL_PREFIX = "cancel/"
METRIC_PREFIX = "metric/"

EPHEMERAL_API_TIMEOUT = 20
EPHEMERAL_SETUP_TIMEOUT = 500
WORK_CHECK_TIMEOUT = 10
SETUP_LEEWAY_SECONDS = 30

# Schedule retry durations
RETRY_IMMEDIATELY_SLEEP_DURATION = 0
TOO_MANY_WORKERS_SLEEP_DURATION = 10
CREATED_JOB_TIMEOUT_SLEEP_DURATION = 10

JOB_REGISTRATION_TIMEOUT = 30
JOB_TIMEOUT_SECONDS = 300
MINIMUM_JOB_EXTENSION = timedelta(minutes=1)

HEARTBEAT_PERIOD_SECONDS = 30
HEARTBEAT_DELTA = timedelta(seconds=60)
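
# Keys written to the orchestrator are namespaced by the optional
# ORCHESTRATOR_PREFIX and one of the prefixes above, e.g. (illustrative):
#   building/<build-uuid>  - job state, kept alive by worker heartbeats
#   lock/<build-uuid>      - short-lived lock taken while cleaning up a job
#   metric/<build-uuid>    - executor name and start time for duration metrics
#   cancel/<build-uuid>    - watched for build cancellation events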
class EphemeralBuilderManager(BuildStateInterface):
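    """
    Build state manager backed by an orchestrator (e.g. Redis) and a set of
    ephemeral executors. Jobs are tracked as orchestrator keys and built on
    workers started on demand by the configured executors.
    """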
    PHASES_NOT_ALLOWED_TO_CANCEL_FROM = (
        BUILD_PHASE.PUSHING,
        BUILD_PHASE.COMPLETE,
        BUILD_PHASE.ERROR,
        BUILD_PHASE.INTERNAL_ERROR,
        BUILD_PHASE.CANCELLED,
    )
    ARCHIVABLE_BUILD_PHASES = (BUILD_PHASE.COMPLETE, BUILD_PHASE.ERROR, BUILD_PHASE.CANCELLED)
    COMPLETED_PHASES = ARCHIVABLE_BUILD_PHASES + (BUILD_PHASE.INTERNAL_ERROR,)

    EXECUTORS = {
        "popen": PopenExecutor,
        "ec2": EC2Executor,
        "kubernetes": KubernetesExecutor,
        "kubernetesPodman": KubernetesPodmanExecutor,
    }

    def __init__(
        self, registry_hostname, manager_hostname, queue, build_logs, user_files, instance_keys
    ):
        self._registry_hostname = registry_hostname
        self._manager_hostname = manager_hostname
        self._queue = queue
        self._build_logs = build_logs
        self._user_files = user_files
        self._instance_keys = instance_keys

        self._ordered_executors = []
        self._executor_name_to_executor = {}

        self._manager_config = {}
        self._orchestrator = None
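
    # The manager is configured either with a single EXECUTOR/EXECUTOR_CONFIG
    # pair or with an ordered EXECUTORS list of per-executor configs, each
    # carrying an "EXECUTOR" kind. Illustrative shape only; the exact
    # per-executor options depend on the executor implementations:
    #
    #   EXECUTORS:
    #     - EXECUTOR: ec2
    #     - EXECUTOR: kubernetes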

    def initialize(self, manager_config):
        self._manager_config = manager_config
        if manager_config.get("EXECUTORS"):
            for executor_config in manager_config["EXECUTORS"]:
                self._load_executor(executor_config.get("EXECUTOR"), executor_config)
        else:
            self._load_executor(
                manager_config.get("EXECUTOR"), manager_config.get("EXECUTOR_CONFIG")
            )

        logger.debug("calling orchestrator_from_config")
        self._orchestrator = orchestrator_from_config(manager_config)

        logger.debug("setting on_key_change callbacks for job expiry, cancel")
        self._orchestrator.on_key_change(self._job_prefix, self._job_expired_callback)
        self._orchestrator.on_key_change(self._cancel_prefix, self._job_cancelled_callback)

    def _load_executor(self, executor_kind_name, executor_config):
        executor_klass = EphemeralBuilderManager.EXECUTORS.get(executor_kind_name)
        if executor_klass is None:
            logger.error("Unknown executor %s; skipping install", executor_kind_name)
            return

        executor = executor_klass(executor_config, self._registry_hostname, self._manager_hostname)
        if executor.name in self._executor_name_to_executor:
            raise Exception("Executor with name %s already registered" % executor.name)

        self._ordered_executors.append(executor)
        self._executor_name_to_executor[executor.name] = executor

    def generate_build_token(self, token_type, build_id, job_id, expiration):
        return build_token(
            self._manager_hostname, token_type, build_id, job_id, expiration, self._instance_keys
        )

    def verify_build_token(self, token, token_type):
        return verify_build_token(token, self._manager_hostname, token_type, self._instance_keys)
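
    # For example (illustrative): with ORCHESTRATOR_PREFIX set to "buildman/",
    # _job_prefix resolves to "buildman/building/" and a job key looks like
    # "buildman/building/<build-uuid>"; without an ORCHESTRATOR config the raw
    # prefix is returned unchanged.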

    def _config_prefix(self, key):
        if self._manager_config.get("ORCHESTRATOR") is None:
            return key

        prefix = self._manager_config.get("ORCHESTRATOR_PREFIX", "")
        return slash_join(prefix, key).lstrip("/") + "/"

    @property
    def _job_prefix(self):
        return self._config_prefix(JOB_PREFIX)

    @property
    def _cancel_prefix(self):
        return self._config_prefix(CANCEL_PREFIX)

    @property
    def _metric_prefix(self):
        return self._config_prefix(METRIC_PREFIX)

    @property
    def _lock_prefix(self):
        return self._config_prefix(LOCK_PREFIX)

    @property
    def machine_max_expiration(self):
        return self._manager_config.get("MACHINE_MAX_TIME", 7200)

    def _lock_key(self, build_id):
        """Create a key which is used to get a lock on a job in the Orchestrator."""
        return slash_join(self._lock_prefix, build_id)

    def _metric_key(self, build_id):
        """Create a key which is used to track a job's metrics in the Orchestrator."""
        return slash_join(self._metric_prefix, build_id)

    def _job_key(self, build_id):
        """Create a key which is used to track a job in the Orchestrator."""
        return slash_join(self._job_prefix, build_id)

    def _build_job_from_job_id(self, job_id):
        """Return the BuildJob from the job id."""
        try:
            job_data = self._orchestrator.get_key(job_id)
        except KeyError:
            raise BuildJobDoesNotExistsError(job_id)
        except (OrchestratorConnectionError, OrchestratorError) as oe:
            raise BuildJobError(oe)

        job_metadata = json.loads(job_data)
        build_job = BuildJob(AttrDict(job_metadata["job_queue_item"]))
        return build_job
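
    # The value stored under a job key is a JSON document shaped roughly like
    # (illustrative):
    #   {
    #     "job_queue_item": {...},        # the raw queue item for the build
    #     "max_expiration": 1700000000,   # epoch seconds; hard cap on runtime
    #     "last_heartbeat": null,         # timestamp string once the worker reports in
    #     "created_at": 1699999999.0,     # epoch seconds when the job was created
    #     "executor_name": "ec2",         # added once the job is scheduled
    #     "execution_id": "..."           # added once the job is scheduled
    #   }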

    def create_job(self, build_id, build_metadata):
        """Create the job in the orchestrator.
        The job will expire if it is not scheduled within JOB_REGISTRATION_TIMEOUT.
        """
        # Sets the max threshold for build heartbeats, i.e. the max total running time of the build (default: 2h).
        # This is separate from the redis key expiration, which is kept alive by heartbeats from the worker.
        max_expiration = datetime.utcnow() + timedelta(seconds=self.machine_max_expiration)
        build_metadata["max_expiration"] = calendar.timegm(max_expiration.timetuple())
        build_metadata["last_heartbeat"] = None
        build_metadata["created_at"] = time.time()

        job_key = self._job_key(build_id)
        try:
            self._orchestrator.set_key(
                job_key,
                json.dumps(build_metadata),
                overwrite=False,
                expiration=self._manager_config.get(
                    "JOB_REGISTRATION_TIMEOUT", JOB_REGISTRATION_TIMEOUT
                ),
            )
        except KeyError:
            raise BuildJobAlreadyExistsError(job_key)
        except (OrchestratorConnectionError, OrchestratorError) as je:
            raise BuildJobError(je)

        return job_key

    def job_scheduled(self, job_id, control_plane, execution_id, max_startup_time):
        """Mark the given job as scheduled with execution id, with max_startup_time.
        A job is considered scheduled once a worker is started with a given registration token.
        """
        # Get job to schedule
        try:
            job_data = self._orchestrator.get_key(job_id)
            job_data_json = json.loads(job_data)
        except KeyError:
            logger.warning(
                "Failed to mark job %s as scheduled. Job no longer exists in the orchestrator",
                job_id,
            )
            return False
        except Exception as e:
            logger.warning("Exception loading job %s from orchestrator: %s", job_id, e)
            return False

        # Update build context
        job_data_json["executor_name"] = control_plane
        job_data_json["execution_id"] = execution_id
        try:
            self._orchestrator.set_key(
                job_id, json.dumps(job_data_json), overwrite=True, expiration=max_startup_time
            )
        except Exception as e:
            logger.warning("Exception updating job %s in orchestrator: %s", job_id, e)
            return False

        build_job = BuildJob(AttrDict(job_data_json["job_queue_item"]))
        updated = self.update_job_phase(job_id, BUILD_PHASE.BUILD_SCHEDULED)
        if updated:
            self._queue.extend_processing(
                build_job.job_item,
                seconds_from_now=max_startup_time
                + 60,  # Add some leeway to allow the expiry event to complete
                minimum_extension=MINIMUM_JOB_EXTENSION,
            )

            logger.debug(
                "Job scheduled for job %s with execution ID %s on control plane %s with max startup time of %s",
                job_id,
                execution_id,
                control_plane,
                max_startup_time,
            )
        else:
            logger.warning("Job %s not scheduled. Unable to update build phase to SCHEDULED", job_id)

        return updated

    def job_unschedulable(self, job_id):
        """Stop tracking the given unschedulable job.
        Deletes any states that might have previously been stored in the orchestrator.
        """
        try:
            build_job = self._build_job_from_job_id(job_id)
            self._cleanup_job_from_orchestrator(build_job)
        except Exception as e:
            logger.warning(
                "Exception trying to mark job %s as unschedulable. Some state may not have been cleaned/updated: %s",
                job_id,
                e,
            )

    def on_job_complete(self, build_job, job_result, executor_name, execution_id):
        """Handle a completed job by updating the queue and job metrics, and cleaning up
        any remaining state.

        If the job result is INCOMPLETE, the job is requeued with its retry restored.
        If the job result is EXPIRED, the job is requeued, but its retry is not restored.

        If the job errored, completed, or was cancelled, it is marked as done in the queue.

        Also checks the disable threshold on the build trigger when the result is ERROR, COMPLETE, or CANCELLED.
        """
        job_id = self._job_key(build_job.build_uuid)
        logger.debug("Calling job complete callback for job %s with result %s", job_id, job_result)

        self._write_duration_metric(build_duration, build_job.build_uuid, job_status=job_result)

        # Build timeout. No retry restored.
        if job_result == BuildJobResult.EXPIRED:
            self._queue.incomplete(build_job.job_item, restore_retry=False, retry_after=30)
            if not build_job.has_retries_remaining():
                build_job.send_notification("build_failure")
            logger.warning(
                "Job %s completed with result %s. Requeuing build without restoring retry.",
                job_id,
                job_result,
            )

        # Unfinished build due to internal error. Restore retry.
        elif job_result == BuildJobResult.INCOMPLETE:
            logger.warning(
                "Job %s completed with result %s. Requeuing build with retry restored.",
                job_id,
                job_result,
            )
            self._queue.incomplete(build_job.job_item, restore_retry=True, retry_after=30)

        elif job_result in (
            BuildJobResult.ERROR,
            BuildJobResult.COMPLETE,
            BuildJobResult.CANCELLED,
        ):
            if job_result == BuildJobResult.ERROR and not build_job.has_retries_remaining():
                # Increment counter for failed jobs
                build_jobs.labels("false").inc()
                build_job.send_notification("build_failure")
            if job_result == BuildJobResult.COMPLETE:
                build_job.send_notification("build_success")
                build_jobs.labels("true").inc()
            logger.warning(
                "Job %s completed with result %s. Marking build done in queue.", job_id, job_result
            )
            self._queue.complete(build_job.job_item)

            # Disable trigger if needed
            if build_job.repo_build.trigger is not None:
                model.build.update_trigger_disable_status(
                    build_job.repo_build.trigger, RESULT_PHASES[job_result]
                )

        # Cleanup job from executors
        if executor_name and execution_id:
            self._terminate_executor(executor_name, execution_id)

        # Cleanup job from orchestrator
        self._cleanup_job_from_orchestrator(build_job)

        logger.debug("Job completed for job %s with result %s", job_id, job_result)

    def start_job(self, job_id):
        """Starts the build job. This is invoked by the worker once the job has been created and
        scheduled, returning the buildpack needed to start the actual build.
        """
        try:
            job_data = self._orchestrator.get_key(job_id)
            job_data_json = json.loads(job_data)
            build_job = BuildJob(AttrDict(job_data_json["job_queue_item"]))
        except KeyError:
            logger.warning("Failed to start job %s. Job does not exist in the orchestrator", job_id)
            return None, None
        except Exception as e:
            logger.error("Exception loading job %s from orchestrator: %s", job_id, e)
            return None, None

        # Construct the buildpack
        repo = build_job.repo_build.repository
        repository_name = repo.namespace_user.username + "/" + repo.name
        context, dockerfile_path = build_job.extract_dockerfile_args()
        base_image_information = {}
        if build_job.pull_credentials:
            base_image_information["username"] = build_job.pull_credentials.get("username", "")
            base_image_information["password"] = build_job.pull_credentials.get("password", "")

        build_args = {
            "build_package": build_job.get_build_package_url(self._user_files),
            "context": context,
            "dockerfile_path": dockerfile_path,
            "repository": repository_name,
            "registry": self._registry_hostname,
            "pull_token": build_job.repo_build.access_token.get_code(),
            "push_token": build_job.repo_build.access_token.get_code(),
            "tag_names": build_job.build_config.get("docker_tags", ["latest"]),
            "base_image": base_image_information,
        }

        private_key = None
        if (
            build_job.repo_build.trigger is not None
            and build_job.repo_build.trigger.secure_private_key is not None
        ):
            private_key = build_job.repo_build.trigger.secure_private_key.decrypt()

        if private_key is not None:
            build_args["git"] = {
                "url": build_job.build_config["trigger_metadata"].get("git_url", ""),
                "sha": build_job.commit_sha(),
                "private_key": private_key or "",
            }

        # If the build args have no buildpack, mark it as a failure before sending
        # it to a builder instance. Note: "git" is only present when a private key was set.
        if not build_args["build_package"] and not build_args.get("git"):
            logger.error(
                "Failed to start job %s: insufficient build args - No package url or git",
                job_id,
            )
            self.update_job_phase(job_id, BUILD_PHASE.INTERNAL_ERROR)
            return (None, None)

        # Generate the build token
        token = self.generate_build_token(
            BUILD_JOB_TOKEN_TYPE, build_job.build_uuid, job_id, self.machine_max_expiration
        )

        # Publish the time it took for a worker to ack the build
        self._write_duration_metric(build_ack_duration, build_job.build_uuid)

        logger.debug("Started build job %s with arguments %s", job_id, build_args)
        return (token, build_args)
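
    # When a job reaches a completed phase, update_job_phase maps it onto a
    # job result and triggers on_job_complete:
    #   ERROR          -> BuildJobResult.ERROR
    #   COMPLETE       -> BuildJobResult.COMPLETE
    #   INTERNAL_ERROR -> BuildJobResult.INCOMPLETE (eligible for requeue)
    #   CANCELLED      -> BuildJobResult.CANCELLED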

    def update_job_phase(self, job_id, phase, phase_metadata=None):
        """Update the given job's phase and append the phase change to the buildlogs, with the
        given phase metadata. If the job reaches a completed state, update_job_phase also updates
        the queue and cleans up any existing state and executors.
        """
        try:
            job_data = self._orchestrator.get_key(job_id)
            job_data_json = json.loads(job_data)
            build_job = BuildJob(AttrDict(job_data_json["job_queue_item"]))
        except KeyError:
            logger.warning("Job %s no longer exists in the orchestrator, likely expired", job_id)
            return False
        except Exception as e:
            logger.error("Exception loading job %s from orchestrator: %s", job_id, e)
            return False

        # Check that the build has not already reached a final phase
        if build_job.repo_build.phase in EphemeralBuilderManager.ARCHIVABLE_BUILD_PHASES:
            logger.warning(
                "Job %s is already in a final completed phase (%s), cannot update to %s",
                job_id,
                build_job.repo_build.phase,
                phase,
            )
            return False

        # Job is already in the desired phase. Nothing to do; return True so the
        # caller can move forward.
        if build_job.repo_build.phase == phase:
            logger.warning("Job %s is already in the desired state/phase (%s)", job_id, phase)
            return True

        # Update the build phase
        phase_metadata = phase_metadata or {}
        updated = model.build.update_phase_then_close(build_job.build_uuid, phase)
        if updated:
            self.append_log_message(
                build_job.build_uuid, phase, self._build_logs.PHASE, phase_metadata
            )

        # Check if on_job_complete needs to be called
        if updated and phase in EphemeralBuilderManager.COMPLETED_PHASES:
            executor_name = job_data_json.get("executor_name")
            execution_id = job_data_json.get("execution_id")

            if phase == BUILD_PHASE.ERROR:
                self.on_job_complete(build_job, BuildJobResult.ERROR, executor_name, execution_id)
            elif phase == BUILD_PHASE.COMPLETE:
                self.on_job_complete(
                    build_job, BuildJobResult.COMPLETE, executor_name, execution_id
                )
            elif phase == BUILD_PHASE.INTERNAL_ERROR:
                self.on_job_complete(
                    build_job, BuildJobResult.INCOMPLETE, executor_name, execution_id
                )
            elif phase == BUILD_PHASE.CANCELLED:
                self.on_job_complete(
                    build_job, BuildJobResult.CANCELLED, executor_name, execution_id
                )

        return updated
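
    # Each heartbeat refreshes the job key's TTL to
    # min(2 * HEARTBEAT_PERIOD_SECONDS, seconds left until max_expiration),
    # so a worker that stops heartbeating lets the key expire and the job
    # is reaped by the expiry callback.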

    def job_heartbeat(self, job_id):
        """Extend the processing time in the queue and update the TTL of the job in the
        orchestrator.
        """
        try:
            job_data = self._orchestrator.get_key(job_id)
            job_data_json = json.loads(job_data)
            build_job = BuildJob(AttrDict(job_data_json["job_queue_item"]))
        except KeyError:
            logger.warning("Job %s no longer exists in the orchestrator, likely expired", job_id)
            return False
        except Exception as e:
            logger.error("Exception loading job %s from orchestrator: %s", job_id, e)
            return False

        max_expiration = datetime.utcfromtimestamp(job_data_json["max_expiration"])
        max_expiration_remaining = max_expiration - datetime.utcnow()
        max_expiration_sec = max(1, int(max_expiration_remaining.total_seconds()))
        ttl = min(HEARTBEAT_PERIOD_SECONDS * 2, max_expiration_sec)

        # Update job expirations
        if (
            job_data_json["last_heartbeat"]
            and dateutil.parser.isoparse(job_data_json["last_heartbeat"])
            < datetime.utcnow() - HEARTBEAT_DELTA
        ):
            logger.warning(
                "Heartbeat expired for job %s. Marking job as expired. Last heartbeat received at %s",
                job_id,
                job_data_json["last_heartbeat"],
            )
            self.update_job_phase(job_id, BUILD_PHASE.INTERNAL_ERROR)
            return False

        job_data_json["last_heartbeat"] = str(datetime.utcnow())

        self._queue.extend_processing(
            build_job.job_item,
            seconds_from_now=JOB_TIMEOUT_SECONDS,
            minimum_extension=MINIMUM_JOB_EXTENSION,
        )

        try:
            self._orchestrator.set_key(
                job_id, json.dumps(job_data_json), overwrite=True, expiration=ttl
            )
        except OrchestratorConnectionError:
            logger.error(
                "Could not update heartbeat for job %s. Orchestrator is not available", job_id
            )
            return False

        return True

    def cancel_build(self, build_id):
        build = model.build.get_repository_build(build_id)
        if build.phase in EphemeralBuilderManager.PHASES_NOT_ALLOWED_TO_CANCEL_FROM:
            return False

        cancelled = model.build.update_phase_then_close(build_id, BUILD_PHASE.CANCELLED)
        if cancelled:
            try:
                job_data = self._orchestrator.get_key(self._job_key(build_id))
                job_data_json = json.loads(job_data)
                build_job = BuildJob(AttrDict(job_data_json["job_queue_item"]))
                self.on_job_complete(
                    build_job,
                    BuildJobResult.CANCELLED,
                    job_data_json.get("executor_name"),
                    job_data_json.get("execution_id"),
                )
            except KeyError:
                logger.warning(
                    "Could not cleanup cancelled build %s. Job does not exist in orchestrator",
                    build_id,
                )

        return cancelled

    def determine_cached_tag(self, build_id, base_image_id):
        job_id = self._job_key(build_id)
        try:
            job_data = self._orchestrator.get_key(job_id)
            job_data_json = json.loads(job_data)
            build_job = BuildJob(AttrDict(job_data_json["job_queue_item"]))
        except KeyError:
            logger.warning("Job %s does not exist in orchestrator", job_id)
            return None
        except Exception as e:
            logger.warning("Exception loading job from orchestrator: %s", e)
            return None

        return build_job.determine_cached_tag(base_image_id)
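
    # Executor selection walks self._ordered_executors in order, skipping an
    # executor when the namespace is not allowed for it, when the build's
    # remaining retries are below the executor's minimum_retry_threshold
    # (counted in build_fallback), or when starting the builder raises; the
    # first executor that starts successfully wins.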

    def schedule(self, build_id):
        """Schedule an existing job to be started on the configured control planes (executors)."""
        logger.debug("Scheduling build %s", build_id)

        allowed_worker_count = self._manager_config.get("ALLOWED_WORKER_COUNT", 1)
        try:
            if self._running_workers() >= allowed_worker_count:
                logger.warning(
                    "Could not schedule build %s. Number of workers at capacity: %s.",
                    build_id,
                    self._running_workers(),
                )
                return False, TOO_MANY_WORKERS_SLEEP_DURATION
        except Exception as exe:
            logger.warning("Failed to get worker count from executors: %s", exe)
            return False, EPHEMERAL_API_TIMEOUT

        job_id = self._job_key(build_id)
        try:
            build_job = self._build_job_from_job_id(job_id)
        except BuildJobDoesNotExistsError as bjne:
            logger.warning(
                "Failed to schedule job %s - Job no longer exists in the orchestrator, likely expired: %s",
                job_id,
                bjne,
            )
            return False, CREATED_JOB_TIMEOUT_SLEEP_DURATION
        except BuildJobError as bje:
            logger.warning(
                "Failed to schedule job %s - Could not get job from orchestrator: %s", job_id, bje
            )
            return False, ORCHESTRATOR_UNAVAILABLE_SLEEP_DURATION

        registration_timeout = self._manager_config.get(
            "JOB_REGISTRATION_TIMEOUT", JOB_REGISTRATION_TIMEOUT
        )
        registration_token = self.generate_build_token(
            BUILD_JOB_REGISTRATION_TYPE,
            build_job.build_uuid,
            job_id,
            registration_timeout + SETUP_LEEWAY_SECONDS,
        )

        started_with_executor = None
        execution_id = None
        for executor in self._ordered_executors:
            namespace = build_job.namespace
            if not executor.allowed_for_namespace(namespace):
                logger.warning(
                    "Job %s (namespace: %s) cannot use executor %s",
                    job_id,
                    namespace,
                    executor.name,
                )
                continue

            # Check if we can use this executor based on the retries remaining.
            if executor.minimum_retry_threshold > build_job.retries_remaining:
                build_fallback.labels(executor.name).inc()
                logger.warning(
                    "Job %s cannot use executor %s as it is below retry threshold %s (retry #%s) - Falling back to next configured executor",
                    job_id,
                    executor.name,
                    executor.minimum_retry_threshold,
                    build_job.retries_remaining,
                )
                continue

            logger.debug(
                "Starting builder for job %s with selected executor: %s", job_id, executor.name
            )

            try:
                execution_id = executor.start_builder(registration_token, build_job.build_uuid)
            except Exception:
                logger.exception(
                    "Exception when starting builder for job: %s - Falling back to next configured executor",
                    job_id,
                )
                continue

            started_with_executor = executor

            # Break out of the loop now that we've started a builder successfully.
            break

        # If we didn't start the job, clean up and return it to the queue.
        if started_with_executor is None:
            logger.error("Could not start ephemeral worker for build %s", build_job.build_uuid)

            # Delete the associated build job record.
            self._orchestrator.delete_key(job_id)
            return False, EPHEMERAL_API_TIMEOUT

        # Store metric data tracking the job
        metric_spec = json.dumps(
            {
                "executor_name": started_with_executor.name,
                "start_time": time.time(),
            }
        )

        # Mark the job as scheduled
        setup_time = started_with_executor.setup_time or EPHEMERAL_SETUP_TIMEOUT
        if not self.job_scheduled(job_id, started_with_executor.name, execution_id, setup_time):
            return False, EPHEMERAL_API_TIMEOUT

        self._write_metric_spec(build_job.build_uuid, metric_spec)

        return True, None

    def _job_expired_callback(self, key_change):
        """Callback invoked when a job key changes. Only EXPIRE events are handled:
        the build is marked as expired and any remaining state and executors are
        cleaned up.
        """
        if key_change.event == KeyEvent.EXPIRE:
            job_metadata = json.loads(key_change.value)
            build_job = BuildJob(AttrDict(job_metadata["job_queue_item"]))

            logger.info("Build job key expire event: %s", build_job.build_uuid)

            executor_name = job_metadata.get("executor_name")
            execution_id = job_metadata.get("execution_id")

            job_result = BuildJobResult.EXPIRED

            model.build.update_phase_then_close(build_job.build_uuid, RESULT_PHASES[job_result])
            self.on_job_complete(build_job, job_result, executor_name, execution_id)

    def _job_cancelled_callback(self, key_change):
        if key_change.event not in (KeyEvent.CREATE, KeyEvent.SET):
            return

        job_metadata = json.loads(key_change.value)
        build_job = BuildJob(AttrDict(job_metadata["job_queue_item"]))
        executor_name = job_metadata.get("executor_name")
        execution_id = job_metadata.get("execution_id")

        job_result = BuildJobResult.CANCELLED
        self.on_job_complete(build_job, job_result, executor_name, execution_id)

    def _cleanup_job_from_orchestrator(self, build_job):
        """Cleanup the given job from the orchestrator.
        This includes any keys related to that job: job keys, expiry keys, metric keys, ...
        """
        lock_key = self._lock_key(build_job.build_uuid)
        lock_acquired = self._orchestrator.lock(lock_key)
        if lock_acquired:
            try:
                self._orchestrator.delete_key(self._job_key(build_job.build_uuid))
                self._orchestrator.delete_key(self._metric_key(build_job.build_uuid))
            except KeyError:
                pass
            finally:
                self._orchestrator.delete_key(lock_key)  # Release lock
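
    # A log line from the builder is a JSON object from Docker's build output,
    # e.g. (illustrative): {"stream": "Step 4/13 : RUN make"} is logged as a
    # COMMAND entry, while plain output lines are logged without a type.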

    def append_build_log(self, build_id, log_message):
        """
        Append the logs from Docker's build output.
        This checks whether the given message is a "STEP" line from Docker's output,
        and sets the log type to "COMMAND" if so.

        See https://github.com/quay/quay-builder/blob/master/docker/log_writer.go
        for the serialized message structure.
        """
        try:
            log_data = json.loads(log_message)
        except ValueError:
            return False

        fully_unwrapped = ""
        keys_to_extract = ["error", "status", "stream"]
        for key in keys_to_extract:
            if key in log_data:
                fully_unwrapped = log_data[key]
                break

        current_log_string = str(fully_unwrapped)
        current_step = _extract_current_step(current_log_string)

        if current_step:
            self.append_log_message(build_id, current_log_string, log_type=self._build_logs.COMMAND)
        else:
            self.append_log_message(build_id, current_log_string)

        return True

    def append_log_message(self, build_id, log_message, log_type=None, log_data=None):
        """
        Append the given message to the buildlogs.

        log_data adds additional context to the log message.

        log_type can be one of: "command", "phase", "error".
        If the log_message is an output line of Docker's build output, and not the first line of a RUN command,
        log_type should be set to None.

        For example, an entry for a phase change might have the following structure:
        {
            "type": "phase",
            "message": "build-scheduled",
            "data": {
                "datetime": "2020-10-26 05:37:25.932196"
            }
        }
        """

        log_data = log_data or {}
        log_data["datetime"] = str(datetime.now())

        try:
            self._build_logs.append_log_message(build_id, log_message, log_type, log_data)
        except Exception as e:
            logger.exception("Could not append log to buildlogs for build %s - %s", build_id, e)

    def _running_workers(self):
        return sum([x.running_builders_count for x in self._ordered_executors])

    def _terminate_executor(self, executor_name, execution_id):
        """Stop the running execution `execution_id` on the executor named `executor_name`."""
        executor = self._executor_name_to_executor.get(executor_name)
        if executor is None:
            logger.error(
                "Could not find registered executor %s to terminate %s", executor_name, execution_id
            )
            return

        # Terminate the executor's execution
        logger.debug("Terminating executor %s with execution id %s", executor_name, execution_id)
        executor.stop_builder(execution_id)

    def _write_metric_spec(self, build_id, payload):
        metric_key = self._metric_key(build_id)
        try:
            self._orchestrator.set_key(
                metric_key,
                payload,
                overwrite=False,
                expiration=self.machine_max_expiration + 60,
            )
        except KeyError:
            logger.warning(
                "Metric already exists in orchestrator for build %s. Build was likely started before and requeued.",
                build_id,
            )
        except (OrchestratorConnectionError, OrchestratorError) as oe:
            logger.error("Error when writing metric for build %s to orchestrator: %s", build_id, oe)

    def _write_duration_metric(self, metric, build_id, job_status=None):
        try:
            metric_data = self._orchestrator.get_key(self._metric_key(build_id))
            parsed_metric_data = json.loads(metric_data)
            start_time = parsed_metric_data["start_time"]
            executor = parsed_metric_data.get("executor_name", "unknown")
            if job_status is not None:
                metric.labels(executor, str(job_status)).observe(time.time() - start_time)
            else:
                metric.labels(executor).observe(time.time() - start_time)
        except Exception:
            logger.warning("Could not write metric for build %s", build_id)
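
    # The work checker is the main loop: every WORK_CHECK_TIMEOUT seconds it
    # pops a queue item, creates the orchestrator job, tries to schedule it on
    # an executor, and records how long the job sat in the queue.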

    def _work_checker(self):
        logger.debug("Initializing work checker")
        while True:
            logger.debug("Writing queue metrics")
            self._queue.update_metrics()

            with database.CloseForLongOperation(app.config):
                time.sleep(WORK_CHECK_TIMEOUT)

            logger.debug("Checking for more work from the build queue")
            processing_time = EPHEMERAL_SETUP_TIMEOUT + SETUP_LEEWAY_SECONDS
            job_item = self._queue.get(processing_time=processing_time, ordering_required=True)

            if job_item is None:
                logger.debug(
                    "No additional work found. Going to sleep for %s seconds", WORK_CHECK_TIMEOUT
                )
                continue

            try:
                build_job = BuildJob(job_item)
            except BuildJobLoadException as bjle:
                logger.error(
                    "BuildJobLoadException. Job data: %s. No retry restore. - %s",
                    job_item.body,
                    bjle,
                )
                self._queue.incomplete(job_item, restore_retry=False)
                continue

            build_id = build_job.build_uuid
            job_id = self._job_key(build_id)

            try:
                logger.debug("Creating build job for build %s", build_id)
                self.create_job(build_id, {"job_queue_item": build_job.job_item})
            except BuildJobAlreadyExistsError:
                logger.warning(
                    "Attempted to create job %s that already exists. Cleaning up existing job and returning it to the queue.",
                    job_id,
                )
                self.job_unschedulable(job_id)
                self._queue.incomplete(job_item, restore_retry=True)
                continue
            except BuildJobError as je:
                logger.error("Create job exception. Build %s - %s", build_id, je)
                self._queue.incomplete(job_item, restore_retry=True)
                continue

            try:
                logger.debug("Scheduling build job %s", job_id)
                schedule_success, retry_timeout = self.schedule(build_id)
            except Exception as se:
                logger.exception("Exception when scheduling job %s: %s", build_job.build_uuid, se)
                self._queue.incomplete(job_item, restore_retry=True, retry_after=WORK_CHECK_TIMEOUT)
                continue

            if schedule_success:
                logger.debug("Build job %s scheduled.", job_id)
            else:
                # Job failed to schedule - increment counter
                build_jobs.labels("false").inc()
                logger.warning(
                    "Unsuccessful schedule. Build ID: %s. Check build executor service.",
                    build_job.repo_build.uuid,
                )
                self.job_unschedulable(job_id)
                self._queue.incomplete(job_item, restore_retry=False, retry_after=retry_timeout)

            # Record time spent in queue
            job_data = self._orchestrator.get_key(job_id)
            job_data_json = json.loads(job_data)
            created_at = job_data_json["created_at"]
            build_queue_duration.labels(schedule_success).observe(time.time() - created_at)
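

# For example (illustrative): _extract_current_step("Step 4/13 : RUN make")
# returns 4, and _extract_current_step("Step 12 : RUN make") returns 12.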
def _extract_current_step(current_status_string):
    """
    Attempts to extract the current step numeric identifier from the given status string.

    Returns the step number or None if none.
    """
    # Older format: `Step 12 :`
    # Newer format: `Step 4/13 :`
    step_increment = re.search(r"Step ([0-9]+)/([0-9]+) :", current_status_string)
    if step_increment:
        return int(step_increment.group(1))

    step_increment = re.search(r"Step ([0-9]+) :", current_status_string)
    if step_increment:
        return int(step_increment.group(1))