mirror of
https://gitlab.com/bzip2/bzip2.git
synced 2025-07-30 07:23:05 +03:00
Remodel test suite using Python unittests. Add more thorough test cases for original "quick" test suite. Add in automatic Valgrind testing for the "quick" test suite on Linux if Valgrind was detected at build-time. Add larger suite of assorted .bz2 test files as git submodule. Updates to the Compiling documentation. Increased the minimum Meson version to 0.56 because this commit uses the `fs` module that was introduced in 0.56. Added Python 3's `pytest` as a required module for running the tests when using Meson.
216 lines
7.2 KiB
Python
216 lines
7.2 KiB
Python
|
|
'''
|
|
Wrapper for Python's unittest.TestCase that sets up BZip2 testing environment.
|
|
'''
|
|
|
|
__copyright__ = "Copyright (C) 2022 Micah Snyder"
|
|
|
|
from math import ceil
|
|
import os
|
|
from pathlib import Path
|
|
import platform
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
import unittest
|
|
from typing import Tuple, Union, NamedTuple
|
|
|
|
# Use older Python 3.5 syntax.
|
|
CmdResult = NamedTuple('CmdResult', [('ec', int), ('out', bytes), ('err', bytes)])
|
|
|
|
class TestCase(unittest.TestCase):
|
|
|
|
version = ""
|
|
|
|
path_source = None
|
|
path_build = None
|
|
path_tmp = None
|
|
|
|
bzip2 = None
|
|
|
|
valgrind = "" # Not 'None' because we'll use this variable even if valgrind not found.
|
|
valgrind_args = []
|
|
|
|
original_working_directory = ""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
'''
|
|
Prepare test environment:
|
|
- Create a temporary testing directory.
|
|
- Get paths needed for tests from environment variables.
|
|
'''
|
|
cls.operating_system = platform.platform().split("-")[0].lower()
|
|
|
|
# The bzip2 program uses the BZIP and BZIP2 environment variables as
|
|
# additional input. We must purge them to prevent OS environment
|
|
# variables from affecting the test suite.
|
|
os.environ.pop('BZIP', None)
|
|
os.environ.pop('BZIP2', None)
|
|
|
|
# Version may be used for testing bzip2 --version output, etc.
|
|
cls.version = os.getenv("VERSION")
|
|
if cls.version == None:
|
|
raise Exception("VERSION environment variable not defined! Aborting...")
|
|
|
|
# Get test paths from environment variables.
|
|
cls.path_source = Path(os.getenv("PATH_SOURCE"))
|
|
cls.path_build = Path(os.getenv("PATH_BUILD"))
|
|
cls.bzip2 = Path(os.getenv("PATH_BZIP2")) if os.getenv("PATH_BZIP2") != None else None
|
|
|
|
# Generate temp directory
|
|
cls.path_tmp = Path(tempfile.mkdtemp(prefix=(cls.__name__ + "-"), dir=os.getenv("TMP")))
|
|
|
|
# Enable valgrind testing if VALGRIND variable set to path of Valgrind executable.
|
|
if os.getenv('VALGRIND') != None:
|
|
valgrind = Path(os.getenv("VALGRIND"))
|
|
|
|
if valgrind.is_file():
|
|
cls.valgrind = valgrind
|
|
|
|
logfile = cls.path_tmp / 'valgrind.log'
|
|
cls.valgrind_args = [
|
|
'-v',
|
|
'--trace-children=yes',
|
|
'--track-fds=yes',
|
|
'--leak-check=full',
|
|
'--gen-suppressions=all',
|
|
'--show-leak-kinds=definite',
|
|
'--errors-for-leak-kinds=definite',
|
|
f'--log-file={logfile}',
|
|
'--error-exitcode=123',
|
|
]
|
|
|
|
# Perform all tests with cwd set to the cls.path_tmp, created above.
|
|
cls.original_working_directory = os.getcwd()
|
|
os.chdir(cls.path_tmp)
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
'''
|
|
Clean up after ourselves,
|
|
Delete the generated tmp directory.
|
|
'''
|
|
print("")
|
|
|
|
# Restore current working directory before deleting cls.path_tmp.
|
|
os.chdir(cls.original_working_directory)
|
|
|
|
if None == os.getenv("KEEPTEMP"):
|
|
try:
|
|
shutil.rmtree(cls.path_tmp)
|
|
print("Removed tmp directory: {}".format(cls.path_tmp))
|
|
except Exception:
|
|
print("No tmp directory to clean up.")
|
|
|
|
def setUp(self):
|
|
print('\n')
|
|
|
|
def tearDown(self):
|
|
print('\n')
|
|
|
|
def execute(self, cmd: list, try_valgrind: bool = True) -> CmdResult:
|
|
'''
|
|
Execute a subprocess.Popen list of commands.
|
|
Return a tuple of
|
|
'''
|
|
# Use valgrind if we have it.
|
|
if try_valgrind and self.valgrind != '':
|
|
cmd = [str(self.valgrind),] + self.valgrind_args + cmd
|
|
|
|
print(f"Running: {' '.join(cmd)}\n")
|
|
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
out, err = p.communicate()
|
|
|
|
# Check the valgrind log for errors.
|
|
if try_valgrind and self.valgrind != '':
|
|
self.verify_valgrind_log()
|
|
|
|
return CmdResult(p.returncode, out, err)
|
|
|
|
def verify_valgrind_log(self, log_file: Union[Path, None]=None):
|
|
'''
|
|
Check if valgrind log file contains errors.
|
|
If valgrind not enabled this is basically a nop.
|
|
'''
|
|
if self.valgrind == "":
|
|
return
|
|
|
|
if log_file == None:
|
|
log_file = self.path_tmp / 'valgrind.log'
|
|
|
|
if not log_file.exists():
|
|
raise AssertionError('{} not found. Valgrind failed to run?'.format(log_file))
|
|
|
|
errors = False
|
|
print('Verifying {}...'.format(log_file))
|
|
try:
|
|
with log_file.open('r') as the_log:
|
|
assert 'ERROR SUMMARY: 0 errors' not in the_log
|
|
except AssertionError:
|
|
print("*" * 80)
|
|
print('Valgrind test failed!'.center(80, ' '))
|
|
print('Please submit a bug report with this log to https://gitlab.com/bzip2/bzip2/issues'.center(69, ' '))
|
|
print(str(log_file).center(80, ' '))
|
|
print("*" * 80)
|
|
errors = True
|
|
finally:
|
|
with log_file.open('r') as log:
|
|
found_summary = False
|
|
for line in log.readlines():
|
|
if 'ERROR SUMMARY' in line:
|
|
found_summary = True
|
|
if (found_summary or errors) and len(line) < 500:
|
|
print(line.rstrip('\n'))
|
|
if errors:
|
|
raise AssertionError('Valgrind test FAILED!')
|
|
|
|
@staticmethod
|
|
def hex_compare(actual: bytes, expected: bytes, size: int = 16):
|
|
'''
|
|
Return string with hex comparison of two buffers
|
|
'''
|
|
a_lines = ceil(float(len(actual)) / float(size))
|
|
e_lines = ceil(float(len(expected)) / float(size))
|
|
lines = max(a_lines, e_lines)
|
|
|
|
comparison = ' ' + \
|
|
f'output ({len(actual)}):'.ljust(size*2 + 3) + \
|
|
f'expected ({len(expected)}):\n'
|
|
|
|
def render_slice(to_print, to_compare):
|
|
line = ''
|
|
for byte in range(0, size):
|
|
if byte == size / 2:
|
|
line += ' '
|
|
|
|
if byte < len(to_print):
|
|
if byte >= len(to_compare) or to_print[byte] != to_compare[byte]:
|
|
line += '\x1b[1;33m{:02x}\x1b[0m'.format(to_print[byte]) # bold yellow
|
|
else:
|
|
line += '{:02x}'.format(to_print[byte]) # plain
|
|
else:
|
|
line += ' '
|
|
|
|
return line
|
|
|
|
prev_is_dots = False
|
|
for line in range(0, lines):
|
|
a_line = actual[line * size : line * size + size]
|
|
e_line = expected[line * size : line * size + size]
|
|
|
|
if a_line == e_line:
|
|
if prev_is_dots == False:
|
|
comparison += " ...\n"
|
|
prev_is_dots = True
|
|
else:
|
|
text_line = '{:8d}: {} {}'.format(
|
|
line * size,
|
|
render_slice(a_line, e_line),
|
|
render_slice(e_line, a_line)
|
|
)
|
|
comparison += text_line + '\n'
|
|
prev_is_dots = False
|
|
|
|
return comparison + '\n'
|