1
0
mirror of https://github.com/quay/quay.git synced 2025-04-18 10:44:06 +03:00
quay/test/testlogs.py
Kenny Lee Sin Cheong 5f63b3a7bb
chore: drop deprecated tables and remove unused code (PROJQUAY-522) (#2089)
* chore: drop deprecated tables and remove unused code

* isort imports

* migration: check for table existence before drop
2023-08-25 12:17:24 -04:00

250 lines
8.3 KiB
Python

import datetime
import logging
from copy import deepcopy
from functools import partial, wraps
from random import SystemRandom
from jinja2.utils import generate_lorem_ipsum
from data.buildlogs import RedisBuildLogs
logger = logging.getLogger(__name__)
random = SystemRandom()
get_sentence = partial(generate_lorem_ipsum, html=False, n=1, min=5, max=10)
def maybe_advance_script(is_get_status=False):
def inner_advance(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
advance_units = random.randint(1, 500)
logger.debug("Advancing script %s units", advance_units)
while advance_units > 0 and self.remaining_script:
units = self.remaining_script[0][0]
if advance_units > units:
advance_units -= units
self.advance_script(is_get_status)
else:
break
return func(self, *args, **kwargs)
return wrapper
return inner_advance
class TestBuildLogs(RedisBuildLogs):
COMMAND_TYPES = [
"FROM",
"MAINTAINER",
"RUN",
"CMD",
"EXPOSE",
"ENV",
"ADD",
"ENTRYPOINT",
"VOLUME",
"USER",
"WORKDIR",
]
STATUS_TEMPLATE = {
"total_commands": None,
"current_command": None,
"push_completion": 0.0,
"pull_completion": 0.0,
}
def __init__(self, redis_config, namespace, repository, test_build_id, allow_delegate=True):
super(TestBuildLogs, self).__init__(redis_config)
self.namespace = namespace
self.repository = repository
self.test_build_id = test_build_id
self.allow_delegate = allow_delegate
self.remaining_script = self._generate_script()
logger.debug("Total script size: %s", len(self.remaining_script))
self._logs = []
self._status = {}
self._last_status = {}
def advance_script(self, is_get_status):
(_, log, status_wrapper) = self.remaining_script.pop(0)
if log is not None:
self._logs.append(log)
if status_wrapper is not None:
(phase, status) = status_wrapper
if not is_get_status:
from data import model
build_obj = model.build.get_repository_build(self.test_build_id)
build_obj.phase = phase
build_obj.save()
self._status = status
self._last_status = status
def _generate_script(self):
script = []
# generate the init phase
script.append(self._generate_phase(400, "initializing"))
script.extend(self._generate_logs(random.randint(1, 3)))
# move to the building phase
script.append(self._generate_phase(400, "building"))
total_commands = random.randint(5, 20)
for command_num in range(1, total_commands + 1):
command_weight = random.randint(50, 100)
script.append(self._generate_command(command_num, total_commands, command_weight))
# we want 0 logs some percent of the time
num_logs = max(0, random.randint(-50, 400))
script.extend(self._generate_logs(num_logs))
# move to the pushing phase
script.append(self._generate_phase(400, "pushing"))
script.extend(self._generate_push_statuses(total_commands))
# move to the error or complete phase
if random.randint(0, 1) == 0:
script.append(self._generate_phase(400, "complete"))
else:
script.append(self._generate_phase(400, "error"))
script.append(
(1, {"message": "Something bad happened! Oh noes!", "type": self.ERROR}, None)
)
return script
def _generate_phase(self, start_weight, phase_name):
message = {
"message": phase_name,
"type": self.PHASE,
"datetime": str(datetime.datetime.now()),
}
return (start_weight, message, (phase_name, deepcopy(self.STATUS_TEMPLATE)))
def _generate_command(self, command_num, total_commands, command_weight):
sentence = get_sentence()
command = random.choice(self.COMMAND_TYPES)
if command == "FROM":
sentence = random.choice(
[
"ubuntu",
"lopter/raring-base",
"quay.io/devtable/simple",
"quay.io/buynlarge/orgrepo",
"stackbrew/ubuntu:precise",
]
)
msg = {
"message": "Step %s: %s %s" % (command_num, command, sentence),
"type": self.COMMAND,
"datetime": str(datetime.datetime.now()),
}
status = deepcopy(self.STATUS_TEMPLATE)
status["total_commands"] = total_commands
status["current_command"] = command_num
return (command_weight, msg, ("building", status))
@staticmethod
def _generate_logs(count):
others = []
if random.randint(0, 10) <= 8:
premessage = {
"message": "\x1b[91m" + get_sentence(),
"data": {"datetime": str(datetime.datetime.now())},
}
postmessage = {"message": "\x1b[0m", "data": {"datetime": str(datetime.datetime.now())}}
count = count - 2
others = [(1, premessage, None), (1, postmessage, None)]
def get_message():
return {"message": get_sentence(), "data": {"datetime": str(datetime.datetime.now())}}
return others + [(1, get_message(), None) for _ in range(count)]
@staticmethod
def _compute_total_completion(statuses, total_images):
percentage_with_sizes = float(len(list(statuses.values()))) / total_images
sent_bytes = sum([status["current"] for status in list(statuses.values())])
total_bytes = sum([status["total"] for status in list(statuses.values())])
return float(sent_bytes) / total_bytes * percentage_with_sizes
@staticmethod
def _generate_push_statuses(total_commands):
push_status_template = deepcopy(TestBuildLogs.STATUS_TEMPLATE)
push_status_template["current_command"] = total_commands
push_status_template["total_commands"] = total_commands
push_statuses = []
one_mb = 1 * 1024 * 1024
num_images = random.randint(2, 7)
sizes = [random.randint(one_mb, one_mb * 5) for _ in range(num_images)]
image_completion = {}
for image_num, image_size in enumerate(sizes):
image_id = "image_id_%s" % image_num
image_completion[image_id] = {
"current": 0,
"total": image_size,
}
for i in range(one_mb, image_size, one_mb):
image_completion[image_id]["current"] = i
new_status = deepcopy(push_status_template)
completion = TestBuildLogs._compute_total_completion(image_completion, num_images)
new_status["push_completion"] = completion
push_statuses.append((250, None, ("pushing", new_status)))
return push_statuses
@maybe_advance_script()
def get_log_entries(self, build_id, start_index):
if build_id == self.test_build_id:
return (len(self._logs), self._logs[start_index:])
elif not self.allow_delegate:
return None
else:
return super(TestBuildLogs, self).get_log_entries(build_id, start_index)
@maybe_advance_script(True)
def get_status(self, build_id):
if build_id == self.test_build_id:
returnable_status = self._last_status
self._last_status = self._status
return returnable_status
elif not self.allow_delegate:
return None
else:
return super(TestBuildLogs, self).get_status(build_id)
def expire_log_entries(self, build_id):
if build_id == self.test_build_id:
return
if not self.allow_delegate:
return None
else:
return super(TestBuildLogs, self).expire_log_entries(build_id)
def delete_log_entries(self, build_id):
if build_id == self.test_build_id:
return
if not self.allow_delegate:
return None
else:
return super(TestBuildLogs, self).delete_log_entries(build_id)