mirror of
https://github.com/facebook/proxygen.git
synced 2025-08-07 07:02:53 +03:00
getdeps: add an add_fb_python_unittest()
function
Summary: Add a function for defining Python unit tests. This creates the test executable, and also emits logic to perform test discovery for ctest. Reviewed By: simpkins Differential Revision: D17610034 fbshipit-source-id: cdf15b0b04acc1d3e906a1e2a95eb327951176ba
This commit is contained in:
committed by
Facebook Github Bot
parent
5bb91345f0
commit
b072f4002f
@@ -62,6 +62,11 @@ find_program(
|
|||||||
FB_MAKE_PYTHON_ARCHIVE "make_fbpy_archive.py"
|
FB_MAKE_PYTHON_ARCHIVE "make_fbpy_archive.py"
|
||||||
PATHS ${CMAKE_MODULE_PATH}
|
PATHS ${CMAKE_MODULE_PATH}
|
||||||
)
|
)
|
||||||
|
set(FB_PY_TEST_MAIN "${CMAKE_CURRENT_LIST_DIR}/fb_py_test_main.py")
|
||||||
|
set(
|
||||||
|
FB_PY_TEST_DISCOVER_SCRIPT
|
||||||
|
"${CMAKE_CURRENT_LIST_DIR}/FBPythonTestAddTests.cmake"
|
||||||
|
)
|
||||||
|
|
||||||
# An option to control the default installation location for
|
# An option to control the default installation location for
|
||||||
# install_fb_python_library(). This is relative to ${CMAKE_INSTALL_PREFIX}
|
# install_fb_python_library(). This is relative to ${CMAKE_INSTALL_PREFIX}
|
||||||
@@ -164,6 +169,119 @@ function(add_fb_python_executable EXE_NAME)
|
|||||||
PROPERTY EXECUTABLE "${CMAKE_CURRENT_BINARY_DIR}/${output_file}")
|
PROPERTY EXECUTABLE "${CMAKE_CURRENT_BINARY_DIR}/${output_file}")
|
||||||
endfunction()
|
endfunction()
|
||||||
|
|
||||||
|
# Define a python unittest executable.
|
||||||
|
# The executable is built using add_fb_python_executable and has the
|
||||||
|
# following differences:
|
||||||
|
#
|
||||||
|
# Each of the source files specified in SOURCES will be imported
|
||||||
|
# and have unittest discovery performed upon them.
|
||||||
|
# Those sources will be imported in the top level namespace.
|
||||||
|
#
|
||||||
|
# The ENV argument allows specifying a list of "KEY=VALUE"
|
||||||
|
# pairs that will be used by the test runner to set up the environment
|
||||||
|
# in the child process prior to running the test. This is useful for
|
||||||
|
# passing additional configuration to the test.
|
||||||
|
function(add_fb_python_unittest TARGET)
|
||||||
|
# Parse the arguments
|
||||||
|
set(multi_value_args SOURCES DEPENDS ENV PROPERTIES)
|
||||||
|
set(
|
||||||
|
one_value_args
|
||||||
|
WORKING_DIRECTORY BASE_DIR NAMESPACE TEST_LIST DISCOVERY_TIMEOUT
|
||||||
|
)
|
||||||
|
fb_cmake_parse_args(
|
||||||
|
ARG "" "${one_value_args}" "${multi_value_args}" "${ARGN}"
|
||||||
|
)
|
||||||
|
fb_py_process_default_args(ARG_NAMESPACE ARG_BASE_DIR)
|
||||||
|
if(NOT ARG_WORKING_DIRECTORY)
|
||||||
|
# Default the working directory to the current binary directory.
|
||||||
|
# This matches the default behavior of add_test() and other standard
|
||||||
|
# test functions like gtest_discover_tests()
|
||||||
|
set(ARG_WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}")
|
||||||
|
endif()
|
||||||
|
if(NOT ARG_TEST_LIST)
|
||||||
|
set(ARG_TEST_LIST "${TARGET}_TESTS")
|
||||||
|
endif()
|
||||||
|
if(NOT ARG_DISCOVERY_TIMEOUT)
|
||||||
|
set(ARG_DISCOVERY_TIMEOUT 5)
|
||||||
|
endif()
|
||||||
|
|
||||||
|
# Tell our test program the list of modules to scan for tests.
|
||||||
|
# We scan all modules directly listed in our SOURCES argument, and skip
|
||||||
|
# modules that came from dependencies in the DEPENDS list.
|
||||||
|
#
|
||||||
|
# This is written into a __test_modules__.py module that the test runner
|
||||||
|
# will look at.
|
||||||
|
set(
|
||||||
|
test_modules_path
|
||||||
|
"${CMAKE_CURRENT_BINARY_DIR}/${TARGET}_test_modules.py"
|
||||||
|
)
|
||||||
|
file(WRITE "${test_modules_path}" "TEST_MODULES = [\n")
|
||||||
|
string(REPLACE "." "/" namespace_dir "${ARG_NAMESPACE}")
|
||||||
|
if (NOT "${namespace_dir}" STREQUAL "")
|
||||||
|
set(namespace_dir "${namespace_dir}/")
|
||||||
|
endif()
|
||||||
|
set(test_modules)
|
||||||
|
foreach(src_path IN LISTS ARG_SOURCES)
|
||||||
|
fb_py_compute_dest_path(
|
||||||
|
abs_source dest_path
|
||||||
|
"${src_path}" "${namespace_dir}" "${ARG_BASE_DIR}"
|
||||||
|
)
|
||||||
|
string(REPLACE "/" "." module_name "${dest_path}")
|
||||||
|
string(REGEX REPLACE "\\.py$" "" module_name "${module_name}")
|
||||||
|
list(APPEND test_modules "${module_name}")
|
||||||
|
file(APPEND "${test_modules_path}" " '${module_name}',\n")
|
||||||
|
endforeach()
|
||||||
|
file(APPEND "${test_modules_path}" "]\n")
|
||||||
|
|
||||||
|
# The __main__ is provided by our runner wrapper/bootstrap
|
||||||
|
list(APPEND ARG_SOURCES "${FB_PY_TEST_MAIN}=__main__.py")
|
||||||
|
list(APPEND ARG_SOURCES "${test_modules_path}=__test_modules__.py")
|
||||||
|
|
||||||
|
add_fb_python_executable(
|
||||||
|
"${TARGET}"
|
||||||
|
NAMESPACE "${ARG_NAMESPACE}"
|
||||||
|
BASE_DIR "${ARG_BASE_DIR}"
|
||||||
|
SOURCES ${ARG_SOURCES}
|
||||||
|
DEPENDS ${ARG_DEPENDS}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Run test discovery after the test executable is built.
|
||||||
|
# This logic is based on the code for gtest_discover_tests()
|
||||||
|
set(ctest_file_base "${CMAKE_CURRENT_BINARY_DIR}/${TARGET}")
|
||||||
|
set(ctest_include_file "${ctest_file_base}_include.cmake")
|
||||||
|
set(ctest_tests_file "${ctest_file_base}_tests.cmake")
|
||||||
|
add_custom_command(
|
||||||
|
TARGET "${TARGET}.GEN_PY_EXE" POST_BUILD
|
||||||
|
BYPRODUCTS "${ctest_tests_file}"
|
||||||
|
COMMAND
|
||||||
|
"${CMAKE_COMMAND}"
|
||||||
|
-D "TEST_TARGET=${TARGET}"
|
||||||
|
-D "TEST_INTERPRETER=${Python3_EXECUTABLE}"
|
||||||
|
-D "TEST_ENV=${ARG_ENV}"
|
||||||
|
-D "TEST_EXECUTABLE=$<TARGET_PROPERTY:${TARGET}.GEN_PY_EXE,EXECUTABLE>"
|
||||||
|
-D "TEST_WORKING_DIR=${ARG_WORKING_DIRECTORY}"
|
||||||
|
-D "TEST_LIST=${ARG_TEST_LIST}"
|
||||||
|
-D "TEST_PREFIX=${TARGET}::"
|
||||||
|
-D "TEST_PROPERTIES=${ARG_PROPERTIES}"
|
||||||
|
-D "CTEST_FILE=${ctest_tests_file}"
|
||||||
|
-P "${FB_PY_TEST_DISCOVER_SCRIPT}"
|
||||||
|
VERBATIM
|
||||||
|
)
|
||||||
|
|
||||||
|
file(
|
||||||
|
WRITE "${ctest_include_file}"
|
||||||
|
"if(EXISTS \"${ctest_tests_file}\")\n"
|
||||||
|
" include(\"${ctest_tests_file}\")\n"
|
||||||
|
"else()\n"
|
||||||
|
" add_test(\"${TARGET}_NOT_BUILT\" \"${TARGET}_NOT_BUILT\")\n"
|
||||||
|
"endif()\n"
|
||||||
|
)
|
||||||
|
set_property(
|
||||||
|
DIRECTORY APPEND PROPERTY TEST_INCLUDE_FILES
|
||||||
|
"${ctest_include_file}"
|
||||||
|
)
|
||||||
|
endfunction()
|
||||||
|
|
||||||
#
|
#
|
||||||
# Define a python library.
|
# Define a python library.
|
||||||
#
|
#
|
||||||
@@ -278,35 +396,11 @@ function(add_fb_python_library LIB_NAME)
|
|||||||
file(WRITE "${tmp_manifest}" "FBPY_MANIFEST 1\n")
|
file(WRITE "${tmp_manifest}" "FBPY_MANIFEST 1\n")
|
||||||
set(abs_sources)
|
set(abs_sources)
|
||||||
foreach(src_path IN LISTS ARG_SOURCES)
|
foreach(src_path IN LISTS ARG_SOURCES)
|
||||||
if("${src_path}" MATCHES "=")
|
fb_py_compute_dest_path(
|
||||||
# We want to split the string on the `=` sign, but cmake doesn't
|
abs_source dest_path
|
||||||
# provide much in the way of helpers for this, so we rewrite the
|
"${src_path}" "${namespace_dir}" "${ARG_BASE_DIR}"
|
||||||
# `=` sign to `;` so that we can treat it as a cmake list and
|
)
|
||||||
# then index into the components
|
|
||||||
string(REPLACE "=" ";" src_path_list "${src_path}")
|
|
||||||
list(GET src_path_list 0 src_path)
|
|
||||||
# Note that we ignore the `namespace_dir` in the alias case
|
|
||||||
# in order to allow aliasing a source to the top level `__main__.py`
|
|
||||||
# filename.
|
|
||||||
list(GET src_path_list 1 dest_path)
|
|
||||||
else()
|
|
||||||
unset(dest_path)
|
|
||||||
endif()
|
|
||||||
|
|
||||||
get_filename_component(abs_source "${src_path}" ABSOLUTE)
|
|
||||||
list(APPEND abs_sources "${abs_source}")
|
list(APPEND abs_sources "${abs_source}")
|
||||||
file(RELATIVE_PATH rel_src "${ARG_BASE_DIR}" "${abs_source}")
|
|
||||||
|
|
||||||
if(NOT DEFINED dest_path)
|
|
||||||
if("${rel_src}" MATCHES "^../")
|
|
||||||
message(
|
|
||||||
FATAL_ERROR "${LIB_NAME}: source file \"${abs_source}\" is not inside "
|
|
||||||
"the base directory ${ARG_BASE_DIR}"
|
|
||||||
)
|
|
||||||
endif()
|
|
||||||
set(dest_path "${namespace_dir}${rel_src}")
|
|
||||||
endif()
|
|
||||||
|
|
||||||
target_sources(
|
target_sources(
|
||||||
"${LIB_NAME}.py_lib" INTERFACE
|
"${LIB_NAME}.py_lib" INTERFACE
|
||||||
"$<BUILD_INTERFACE:${abs_source}>"
|
"$<BUILD_INTERFACE:${abs_source}>"
|
||||||
@@ -489,7 +583,42 @@ function(fb_py_check_available)
|
|||||||
if (NOT FB_MAKE_PYTHON_ARCHIVE)
|
if (NOT FB_MAKE_PYTHON_ARCHIVE)
|
||||||
message(
|
message(
|
||||||
FATAL_ERROR "unable to find make_fbpy_archive.py helper program (it "
|
FATAL_ERROR "unable to find make_fbpy_archive.py helper program (it "
|
||||||
"should be located in the same directory as FBPythonRules.cmake)"
|
"should be located in the same directory as FBPythonBinary.cmake)"
|
||||||
)
|
)
|
||||||
endif()
|
endif()
|
||||||
endfunction()
|
endfunction()
|
||||||
|
|
||||||
|
function(
|
||||||
|
fb_py_compute_dest_path
|
||||||
|
src_path_output dest_path_output src_path namespace_dir base_dir
|
||||||
|
)
|
||||||
|
if("${src_path}" MATCHES "=")
|
||||||
|
# We want to split the string on the `=` sign, but cmake doesn't
|
||||||
|
# provide much in the way of helpers for this, so we rewrite the
|
||||||
|
# `=` sign to `;` so that we can treat it as a cmake list and
|
||||||
|
# then index into the components
|
||||||
|
string(REPLACE "=" ";" src_path_list "${src_path}")
|
||||||
|
list(GET src_path_list 0 src_path)
|
||||||
|
# Note that we ignore the `namespace_dir` in the alias case
|
||||||
|
# in order to allow aliasing a source to the top level `__main__.py`
|
||||||
|
# filename.
|
||||||
|
list(GET src_path_list 1 dest_path)
|
||||||
|
else()
|
||||||
|
unset(dest_path)
|
||||||
|
endif()
|
||||||
|
|
||||||
|
get_filename_component(abs_source "${src_path}" ABSOLUTE)
|
||||||
|
if(NOT DEFINED dest_path)
|
||||||
|
file(RELATIVE_PATH rel_src "${ARG_BASE_DIR}" "${abs_source}")
|
||||||
|
if("${rel_src}" MATCHES "^../")
|
||||||
|
message(
|
||||||
|
FATAL_ERROR "${LIB_NAME}: source file \"${abs_source}\" is not inside "
|
||||||
|
"the base directory ${ARG_BASE_DIR}"
|
||||||
|
)
|
||||||
|
endif()
|
||||||
|
set(dest_path "${namespace_dir}${rel_src}")
|
||||||
|
endif()
|
||||||
|
|
||||||
|
set("${src_path_output}" "${abs_source}" PARENT_SCOPE)
|
||||||
|
set("${dest_path_output}" "${dest_path}" PARENT_SCOPE)
|
||||||
|
endfunction()
|
||||||
|
59
build/fbcode_builder/CMake/FBPythonTestAddTests.cmake
Normal file
59
build/fbcode_builder/CMake/FBPythonTestAddTests.cmake
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
# Copyright (c) Facebook, Inc. and its affiliates.
|
||||||
|
|
||||||
|
# Add a command to be emitted to the CTest file
|
||||||
|
set(ctest_script)
|
||||||
|
function(add_command CMD)
|
||||||
|
set(escaped_args "")
|
||||||
|
foreach(arg ${ARGN})
|
||||||
|
# Escape all arguments using "Bracket Argument" syntax
|
||||||
|
# We could skip this for argument that don't contain any special
|
||||||
|
# characters if we wanted to make the output slightly more human-friendly.
|
||||||
|
set(escaped_args "${escaped_args} [==[${arg}]==]")
|
||||||
|
endforeach()
|
||||||
|
set(ctest_script "${ctest_script}${CMD}(${escaped_args})\n" PARENT_SCOPE)
|
||||||
|
endfunction()
|
||||||
|
|
||||||
|
if(NOT EXISTS "${TEST_EXECUTABLE}")
|
||||||
|
message(FATAL_ERROR "Test executable does not exist: ${TEST_EXECUTABLE}")
|
||||||
|
endif()
|
||||||
|
execute_process(
|
||||||
|
COMMAND ${CMAKE_COMMAND} -E env ${TEST_ENV} "${TEST_INTERPRETER}" "${TEST_EXECUTABLE}" --list-tests
|
||||||
|
WORKING_DIRECTORY "${TEST_WORKING_DIR}"
|
||||||
|
OUTPUT_VARIABLE output
|
||||||
|
RESULT_VARIABLE result
|
||||||
|
)
|
||||||
|
if(NOT "${result}" EQUAL 0)
|
||||||
|
string(REPLACE "\n" "\n " output "${output}")
|
||||||
|
message(
|
||||||
|
FATAL_ERROR
|
||||||
|
"Error running test executable: ${TEST_EXECUTABLE}\n"
|
||||||
|
"Output:\n"
|
||||||
|
" ${output}\n"
|
||||||
|
)
|
||||||
|
endif()
|
||||||
|
|
||||||
|
# Parse output
|
||||||
|
string(REPLACE "\n" ";" tests_list "${output}")
|
||||||
|
foreach(test_name ${tests_list})
|
||||||
|
add_command(
|
||||||
|
add_test
|
||||||
|
"${TEST_PREFIX}${test_name}"
|
||||||
|
${CMAKE_COMMAND} -E env ${TEST_ENV}
|
||||||
|
"${TEST_INTERPRETER}" "${TEST_EXECUTABLE}" "${test_name}"
|
||||||
|
)
|
||||||
|
add_command(
|
||||||
|
set_tests_properties
|
||||||
|
"${TEST_PREFIX}${test_name}"
|
||||||
|
PROPERTIES
|
||||||
|
WORKING_DIRECTORY "${TEST_WORKING_DIR}"
|
||||||
|
${TEST_PROPERTIES}
|
||||||
|
)
|
||||||
|
endforeach()
|
||||||
|
|
||||||
|
# Set a list of discovered tests in the parent scope, in case users
|
||||||
|
# want access to this list as a CMake variable
|
||||||
|
if(TEST_LIST)
|
||||||
|
add_command(set ${TEST_LIST} ${tests_list})
|
||||||
|
endif()
|
||||||
|
|
||||||
|
file(WRITE "${CTEST_FILE}" "${ctest_script}")
|
820
build/fbcode_builder/CMake/fb_py_test_main.py
Normal file
820
build/fbcode_builder/CMake/fb_py_test_main.py
Normal file
@@ -0,0 +1,820 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
#
|
||||||
|
# Copyright (c) Facebook, Inc. and its affiliates.
|
||||||
|
#
|
||||||
|
"""
|
||||||
|
This file contains the main module code for Python test programs.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import print_function
|
||||||
|
|
||||||
|
import contextlib
|
||||||
|
import ctypes
|
||||||
|
import fnmatch
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import optparse
|
||||||
|
import os
|
||||||
|
import platform
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
import time
|
||||||
|
import traceback
|
||||||
|
import unittest
|
||||||
|
import warnings
|
||||||
|
|
||||||
|
# Hide warning about importing "imp"; remove once python2 is gone.
|
||||||
|
with warnings.catch_warnings():
|
||||||
|
warnings.filterwarnings("ignore", category=DeprecationWarning)
|
||||||
|
import imp
|
||||||
|
|
||||||
|
try:
|
||||||
|
from StringIO import StringIO
|
||||||
|
except ImportError:
|
||||||
|
from io import StringIO
|
||||||
|
try:
|
||||||
|
import coverage
|
||||||
|
except ImportError:
|
||||||
|
coverage = None # type: ignore
|
||||||
|
try:
|
||||||
|
from importlib.machinery import SourceFileLoader
|
||||||
|
except ImportError:
|
||||||
|
SourceFileLoader = None # type: ignore
|
||||||
|
|
||||||
|
|
||||||
|
class get_cpu_instr_counter(object):
|
||||||
|
def read(self):
|
||||||
|
# TODO
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
EXIT_CODE_SUCCESS = 0
|
||||||
|
EXIT_CODE_TEST_FAILURE = 70
|
||||||
|
|
||||||
|
|
||||||
|
class TestStatus(object):
|
||||||
|
|
||||||
|
ABORTED = "FAILURE"
|
||||||
|
PASSED = "SUCCESS"
|
||||||
|
FAILED = "FAILURE"
|
||||||
|
EXPECTED_FAILURE = "SUCCESS"
|
||||||
|
UNEXPECTED_SUCCESS = "FAILURE"
|
||||||
|
SKIPPED = "ASSUMPTION_VIOLATION"
|
||||||
|
|
||||||
|
|
||||||
|
class PathMatcher(object):
|
||||||
|
def __init__(self, include_patterns, omit_patterns):
|
||||||
|
self.include_patterns = include_patterns
|
||||||
|
self.omit_patterns = omit_patterns
|
||||||
|
|
||||||
|
def omit(self, path):
|
||||||
|
"""
|
||||||
|
Omit iff matches any of the omit_patterns or the include patterns are
|
||||||
|
not empty and none is matched
|
||||||
|
"""
|
||||||
|
path = os.path.realpath(path)
|
||||||
|
return any(fnmatch.fnmatch(path, p) for p in self.omit_patterns) or (
|
||||||
|
self.include_patterns
|
||||||
|
and not any(fnmatch.fnmatch(path, p) for p in self.include_patterns)
|
||||||
|
)
|
||||||
|
|
||||||
|
def include(self, path):
|
||||||
|
return not self.omit(path)
|
||||||
|
|
||||||
|
|
||||||
|
class DebugWipeFinder(object):
|
||||||
|
"""
|
||||||
|
PEP 302 finder that uses a DebugWipeLoader for all files which do not need
|
||||||
|
coverage
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, matcher):
|
||||||
|
self.matcher = matcher
|
||||||
|
|
||||||
|
def find_module(self, fullname, path=None):
|
||||||
|
_, _, basename = fullname.rpartition(".")
|
||||||
|
try:
|
||||||
|
fd, pypath, (_, _, kind) = imp.find_module(basename, path)
|
||||||
|
except Exception:
|
||||||
|
# Maybe it's a top level module
|
||||||
|
try:
|
||||||
|
fd, pypath, (_, _, kind) = imp.find_module(basename, None)
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if hasattr(fd, "close"):
|
||||||
|
fd.close()
|
||||||
|
if kind != imp.PY_SOURCE:
|
||||||
|
return None
|
||||||
|
if self.matcher.include(pypath):
|
||||||
|
return None
|
||||||
|
|
||||||
|
"""
|
||||||
|
This is defined to match CPython's PyVarObject struct
|
||||||
|
"""
|
||||||
|
|
||||||
|
class PyVarObject(ctypes.Structure):
|
||||||
|
_fields_ = [
|
||||||
|
("ob_refcnt", ctypes.c_long),
|
||||||
|
("ob_type", ctypes.c_void_p),
|
||||||
|
("ob_size", ctypes.c_ulong),
|
||||||
|
]
|
||||||
|
|
||||||
|
class DebugWipeLoader(SourceFileLoader):
|
||||||
|
"""
|
||||||
|
PEP302 loader that zeros out debug information before execution
|
||||||
|
"""
|
||||||
|
|
||||||
|
def get_code(self, fullname):
|
||||||
|
code = super(DebugWipeLoader, self).get_code(fullname)
|
||||||
|
if code:
|
||||||
|
# Ideally we'd do
|
||||||
|
# code.co_lnotab = b''
|
||||||
|
# But code objects are READONLY. Not to worry though; we'll
|
||||||
|
# directly modify CPython's object
|
||||||
|
code_impl = PyVarObject.from_address(id(code.co_lnotab))
|
||||||
|
code_impl.ob_size = 0
|
||||||
|
return code
|
||||||
|
|
||||||
|
return DebugWipeLoader(fullname, pypath)
|
||||||
|
|
||||||
|
|
||||||
|
def optimize_for_coverage(cov, include_patterns, omit_patterns):
|
||||||
|
"""
|
||||||
|
We get better performance if we zero out debug information for files which
|
||||||
|
we're not interested in. Only available in CPython 3.3+
|
||||||
|
"""
|
||||||
|
matcher = PathMatcher(include_patterns, omit_patterns)
|
||||||
|
if SourceFileLoader and platform.python_implementation() == "CPython":
|
||||||
|
sys.meta_path.insert(0, DebugWipeFinder(matcher))
|
||||||
|
|
||||||
|
|
||||||
|
class TeeStream(object):
|
||||||
|
def __init__(self, *streams):
|
||||||
|
self._streams = streams
|
||||||
|
|
||||||
|
def write(self, data):
|
||||||
|
for stream in self._streams:
|
||||||
|
stream.write(data)
|
||||||
|
|
||||||
|
def flush(self):
|
||||||
|
for stream in self._streams:
|
||||||
|
stream.flush()
|
||||||
|
|
||||||
|
def isatty(self):
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class CallbackStream(object):
|
||||||
|
def __init__(self, callback, bytes_callback=None, orig=None):
|
||||||
|
self._callback = callback
|
||||||
|
self._fileno = orig.fileno() if orig else None
|
||||||
|
|
||||||
|
# Python 3 APIs:
|
||||||
|
# - `encoding` is a string holding the encoding name
|
||||||
|
# - `errors` is a string holding the error-handling mode for encoding
|
||||||
|
# - `buffer` should look like an io.BufferedIOBase object
|
||||||
|
|
||||||
|
self.errors = orig.errors if orig else None
|
||||||
|
if bytes_callback:
|
||||||
|
# those members are only on the io.TextIOWrapper
|
||||||
|
self.encoding = orig.encoding if orig else "UTF-8"
|
||||||
|
self.buffer = CallbackStream(bytes_callback, orig=orig)
|
||||||
|
|
||||||
|
def write(self, data):
|
||||||
|
self._callback(data)
|
||||||
|
|
||||||
|
def flush(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def isatty(self):
|
||||||
|
return False
|
||||||
|
|
||||||
|
def fileno(self):
|
||||||
|
return self._fileno
|
||||||
|
|
||||||
|
|
||||||
|
class BuckTestResult(unittest._TextTestResult):
|
||||||
|
"""
|
||||||
|
Our own TestResult class that outputs data in a format that can be easily
|
||||||
|
parsed by buck's test runner.
|
||||||
|
"""
|
||||||
|
|
||||||
|
_instr_counter = get_cpu_instr_counter()
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self, stream, descriptions, verbosity, show_output, main_program, suite
|
||||||
|
):
|
||||||
|
super(BuckTestResult, self).__init__(stream, descriptions, verbosity)
|
||||||
|
self._main_program = main_program
|
||||||
|
self._suite = suite
|
||||||
|
self._results = []
|
||||||
|
self._current_test = None
|
||||||
|
self._saved_stdout = sys.stdout
|
||||||
|
self._saved_stderr = sys.stderr
|
||||||
|
self._show_output = show_output
|
||||||
|
|
||||||
|
def getResults(self):
|
||||||
|
return self._results
|
||||||
|
|
||||||
|
def startTest(self, test):
|
||||||
|
super(BuckTestResult, self).startTest(test)
|
||||||
|
|
||||||
|
# Pass in the real stdout and stderr filenos. We can't really do much
|
||||||
|
# here to intercept callers who directly operate on these fileno
|
||||||
|
# objects.
|
||||||
|
sys.stdout = CallbackStream(
|
||||||
|
self.addStdout, self.addStdoutBytes, orig=sys.stdout
|
||||||
|
)
|
||||||
|
sys.stderr = CallbackStream(
|
||||||
|
self.addStderr, self.addStderrBytes, orig=sys.stderr
|
||||||
|
)
|
||||||
|
self._current_test = test
|
||||||
|
self._test_start_time = time.time()
|
||||||
|
self._current_status = TestStatus.ABORTED
|
||||||
|
self._messages = []
|
||||||
|
self._stacktrace = None
|
||||||
|
self._stdout = ""
|
||||||
|
self._stderr = ""
|
||||||
|
self._start_instr_count = self._instr_counter.read()
|
||||||
|
|
||||||
|
def _find_next_test(self, suite):
|
||||||
|
"""
|
||||||
|
Find the next test that has not been run.
|
||||||
|
"""
|
||||||
|
|
||||||
|
for test in suite:
|
||||||
|
|
||||||
|
# We identify test suites by test that are iterable (as is done in
|
||||||
|
# the builtin python test harness). If we see one, recurse on it.
|
||||||
|
if hasattr(test, "__iter__"):
|
||||||
|
test = self._find_next_test(test)
|
||||||
|
|
||||||
|
# The builtin python test harness sets test references to `None`
|
||||||
|
# after they have run, so we know we've found the next test up
|
||||||
|
# if it's not `None`.
|
||||||
|
if test is not None:
|
||||||
|
return test
|
||||||
|
|
||||||
|
def stopTest(self, test):
|
||||||
|
sys.stdout = self._saved_stdout
|
||||||
|
sys.stderr = self._saved_stderr
|
||||||
|
|
||||||
|
super(BuckTestResult, self).stopTest(test)
|
||||||
|
|
||||||
|
# If a failure occured during module/class setup, then this "test" may
|
||||||
|
# actually be a `_ErrorHolder`, which doesn't contain explicit info
|
||||||
|
# about the upcoming test. Since we really only care about the test
|
||||||
|
# name field (i.e. `_testMethodName`), we use that to detect an actual
|
||||||
|
# test cases, and fall back to looking the test up from the suite
|
||||||
|
# otherwise.
|
||||||
|
if not hasattr(test, "_testMethodName"):
|
||||||
|
test = self._find_next_test(self._suite)
|
||||||
|
|
||||||
|
result = {
|
||||||
|
"testCaseName": "{0}.{1}".format(
|
||||||
|
test.__class__.__module__, test.__class__.__name__
|
||||||
|
),
|
||||||
|
"testCase": test._testMethodName,
|
||||||
|
"type": self._current_status,
|
||||||
|
"time": int((time.time() - self._test_start_time) * 1000),
|
||||||
|
"message": os.linesep.join(self._messages),
|
||||||
|
"stacktrace": self._stacktrace,
|
||||||
|
"stdOut": self._stdout,
|
||||||
|
"stdErr": self._stderr,
|
||||||
|
}
|
||||||
|
|
||||||
|
# TestPilot supports an instruction count field.
|
||||||
|
if "TEST_PILOT" in os.environ:
|
||||||
|
result["instrCount"] = (
|
||||||
|
int(self._instr_counter.read() - self._start_instr_count),
|
||||||
|
)
|
||||||
|
|
||||||
|
self._results.append(result)
|
||||||
|
self._current_test = None
|
||||||
|
|
||||||
|
def stopTestRun(self):
|
||||||
|
cov = self._main_program.get_coverage()
|
||||||
|
if cov is not None:
|
||||||
|
self._results.append({"coverage": cov})
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def _withTest(self, test):
|
||||||
|
self.startTest(test)
|
||||||
|
yield
|
||||||
|
self.stopTest(test)
|
||||||
|
|
||||||
|
def _setStatus(self, test, status, message=None, stacktrace=None):
|
||||||
|
assert test == self._current_test
|
||||||
|
self._current_status = status
|
||||||
|
self._stacktrace = stacktrace
|
||||||
|
if message is not None:
|
||||||
|
if message.endswith(os.linesep):
|
||||||
|
message = message[:-1]
|
||||||
|
self._messages.append(message)
|
||||||
|
|
||||||
|
def setStatus(self, test, status, message=None, stacktrace=None):
|
||||||
|
# addError() may be called outside of a test if one of the shared
|
||||||
|
# fixtures (setUpClass/tearDownClass/setUpModule/tearDownModule)
|
||||||
|
# throws an error.
|
||||||
|
#
|
||||||
|
# In this case, create a fake test result to record the error.
|
||||||
|
if self._current_test is None:
|
||||||
|
with self._withTest(test):
|
||||||
|
self._setStatus(test, status, message, stacktrace)
|
||||||
|
else:
|
||||||
|
self._setStatus(test, status, message, stacktrace)
|
||||||
|
|
||||||
|
def setException(self, test, status, excinfo):
|
||||||
|
exctype, value, tb = excinfo
|
||||||
|
self.setStatus(
|
||||||
|
test,
|
||||||
|
status,
|
||||||
|
"{0}: {1}".format(exctype.__name__, value),
|
||||||
|
"".join(traceback.format_tb(tb)),
|
||||||
|
)
|
||||||
|
|
||||||
|
def addSuccess(self, test):
|
||||||
|
super(BuckTestResult, self).addSuccess(test)
|
||||||
|
self.setStatus(test, TestStatus.PASSED)
|
||||||
|
|
||||||
|
def addError(self, test, err):
|
||||||
|
super(BuckTestResult, self).addError(test, err)
|
||||||
|
self.setException(test, TestStatus.ABORTED, err)
|
||||||
|
|
||||||
|
def addFailure(self, test, err):
|
||||||
|
super(BuckTestResult, self).addFailure(test, err)
|
||||||
|
self.setException(test, TestStatus.FAILED, err)
|
||||||
|
|
||||||
|
def addSkip(self, test, reason):
|
||||||
|
super(BuckTestResult, self).addSkip(test, reason)
|
||||||
|
self.setStatus(test, TestStatus.SKIPPED, "Skipped: %s" % (reason,))
|
||||||
|
|
||||||
|
def addExpectedFailure(self, test, err):
|
||||||
|
super(BuckTestResult, self).addExpectedFailure(test, err)
|
||||||
|
self.setException(test, TestStatus.EXPECTED_FAILURE, err)
|
||||||
|
|
||||||
|
def addUnexpectedSuccess(self, test):
|
||||||
|
super(BuckTestResult, self).addUnexpectedSuccess(test)
|
||||||
|
self.setStatus(test, TestStatus.UNEXPECTED_SUCCESS, "Unexpected success")
|
||||||
|
|
||||||
|
def addStdout(self, val):
|
||||||
|
self._stdout += val
|
||||||
|
if self._show_output:
|
||||||
|
self._saved_stdout.write(val)
|
||||||
|
self._saved_stdout.flush()
|
||||||
|
|
||||||
|
def addStdoutBytes(self, val):
|
||||||
|
string = val.decode("utf-8", errors="backslashreplace")
|
||||||
|
self.addStdout(string)
|
||||||
|
|
||||||
|
def addStderr(self, val):
|
||||||
|
self._stderr += val
|
||||||
|
if self._show_output:
|
||||||
|
self._saved_stderr.write(val)
|
||||||
|
self._saved_stderr.flush()
|
||||||
|
|
||||||
|
def addStderrBytes(self, val):
|
||||||
|
string = val.decode("utf-8", errors="backslashreplace")
|
||||||
|
self.addStderr(string)
|
||||||
|
|
||||||
|
|
||||||
|
class BuckTestRunner(unittest.TextTestRunner):
|
||||||
|
def __init__(self, main_program, suite, show_output=True, **kwargs):
|
||||||
|
super(BuckTestRunner, self).__init__(**kwargs)
|
||||||
|
self.show_output = show_output
|
||||||
|
self._main_program = main_program
|
||||||
|
self._suite = suite
|
||||||
|
|
||||||
|
def _makeResult(self):
|
||||||
|
return BuckTestResult(
|
||||||
|
self.stream,
|
||||||
|
self.descriptions,
|
||||||
|
self.verbosity,
|
||||||
|
self.show_output,
|
||||||
|
self._main_program,
|
||||||
|
self._suite,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _format_test_name(test_class, attrname):
|
||||||
|
return "{0}.{1}.{2}".format(test_class.__module__, test_class.__name__, attrname)
|
||||||
|
|
||||||
|
|
||||||
|
class StderrLogHandler(logging.StreamHandler):
|
||||||
|
"""
|
||||||
|
This class is very similar to logging.StreamHandler, except that it
|
||||||
|
always uses the current sys.stderr object.
|
||||||
|
|
||||||
|
StreamHandler caches the current sys.stderr object when it is constructed.
|
||||||
|
This makes it behave poorly in unit tests, which may replace sys.stderr
|
||||||
|
with a StringIO buffer during tests. The StreamHandler will continue using
|
||||||
|
the old sys.stderr object instead of the desired StringIO buffer.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
logging.Handler.__init__(self)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def stream(self):
|
||||||
|
return sys.stderr
|
||||||
|
|
||||||
|
|
||||||
|
class RegexTestLoader(unittest.TestLoader):
|
||||||
|
def __init__(self, regex=None):
|
||||||
|
self.regex = regex
|
||||||
|
super(RegexTestLoader, self).__init__()
|
||||||
|
|
||||||
|
def getTestCaseNames(self, testCaseClass):
|
||||||
|
"""
|
||||||
|
Return a sorted sequence of method names found within testCaseClass
|
||||||
|
"""
|
||||||
|
|
||||||
|
testFnNames = super(RegexTestLoader, self).getTestCaseNames(testCaseClass)
|
||||||
|
if self.regex is None:
|
||||||
|
return testFnNames
|
||||||
|
robj = re.compile(self.regex)
|
||||||
|
matched = []
|
||||||
|
for attrname in testFnNames:
|
||||||
|
fullname = _format_test_name(testCaseClass, attrname)
|
||||||
|
if robj.search(fullname):
|
||||||
|
matched.append(attrname)
|
||||||
|
return matched
|
||||||
|
|
||||||
|
|
||||||
|
class Loader(object):
|
||||||
|
|
||||||
|
suiteClass = unittest.TestSuite
|
||||||
|
|
||||||
|
def __init__(self, modules, regex=None):
|
||||||
|
self.modules = modules
|
||||||
|
self.regex = regex
|
||||||
|
|
||||||
|
def load_all(self):
|
||||||
|
loader = RegexTestLoader(self.regex)
|
||||||
|
test_suite = self.suiteClass()
|
||||||
|
for module_name in self.modules:
|
||||||
|
__import__(module_name, level=0)
|
||||||
|
module = sys.modules[module_name]
|
||||||
|
module_suite = loader.loadTestsFromModule(module)
|
||||||
|
test_suite.addTest(module_suite)
|
||||||
|
return test_suite
|
||||||
|
|
||||||
|
def load_args(self, args):
|
||||||
|
loader = RegexTestLoader(self.regex)
|
||||||
|
|
||||||
|
suites = []
|
||||||
|
for arg in args:
|
||||||
|
suite = loader.loadTestsFromName(arg)
|
||||||
|
# loadTestsFromName() can only process names that refer to
|
||||||
|
# individual test functions or modules. It can't process package
|
||||||
|
# names. If there were no module/function matches, check to see if
|
||||||
|
# this looks like a package name.
|
||||||
|
if suite.countTestCases() != 0:
|
||||||
|
suites.append(suite)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Load all modules whose name is <arg>.<something>
|
||||||
|
prefix = arg + "."
|
||||||
|
for module in self.modules:
|
||||||
|
if module.startswith(prefix):
|
||||||
|
suite = loader.loadTestsFromName(module)
|
||||||
|
suites.append(suite)
|
||||||
|
|
||||||
|
return loader.suiteClass(suites)
|
||||||
|
|
||||||
|
|
||||||
|
_COVERAGE_INI = '''\
|
||||||
|
[report]
|
||||||
|
exclude_lines =
|
||||||
|
pragma: no cover
|
||||||
|
pragma: nocover
|
||||||
|
pragma:.*no${PLATFORM}
|
||||||
|
pragma:.*no${PY_IMPL}${PY_MAJOR}${PY_MINOR}
|
||||||
|
pragma:.*no${PY_IMPL}${PY_MAJOR}
|
||||||
|
pragma:.*nopy${PY_MAJOR}
|
||||||
|
pragma:.*nopy${PY_MAJOR}${PY_MINOR}
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
|
class MainProgram(object):
|
||||||
|
"""
|
||||||
|
This class implements the main program. It can be subclassed by
|
||||||
|
users who wish to customize some parts of the main program.
|
||||||
|
(Adding additional command line options, customizing test loading, etc.)
|
||||||
|
"""
|
||||||
|
|
||||||
|
DEFAULT_VERBOSITY = 2
|
||||||
|
|
||||||
|
def __init__(self, argv):
|
||||||
|
self.init_option_parser()
|
||||||
|
self.parse_options(argv)
|
||||||
|
self.setup_logging()
|
||||||
|
|
||||||
|
def init_option_parser(self):
|
||||||
|
usage = "%prog [options] [TEST] ..."
|
||||||
|
op = optparse.OptionParser(usage=usage, add_help_option=False)
|
||||||
|
self.option_parser = op
|
||||||
|
|
||||||
|
op.add_option(
|
||||||
|
"--hide-output",
|
||||||
|
dest="show_output",
|
||||||
|
action="store_false",
|
||||||
|
default=True,
|
||||||
|
help="Suppress data that tests print to stdout/stderr, and only "
|
||||||
|
"show it if the test fails.",
|
||||||
|
)
|
||||||
|
op.add_option(
|
||||||
|
"-o",
|
||||||
|
"--output",
|
||||||
|
help="Write results to a file in a JSON format to be read by Buck",
|
||||||
|
)
|
||||||
|
op.add_option(
|
||||||
|
"-f",
|
||||||
|
"--failfast",
|
||||||
|
action="store_true",
|
||||||
|
default=False,
|
||||||
|
help="Stop after the first failure",
|
||||||
|
)
|
||||||
|
op.add_option(
|
||||||
|
"-l",
|
||||||
|
"--list-tests",
|
||||||
|
action="store_true",
|
||||||
|
dest="list",
|
||||||
|
default=False,
|
||||||
|
help="List tests and exit",
|
||||||
|
)
|
||||||
|
op.add_option(
|
||||||
|
"-r",
|
||||||
|
"--regex",
|
||||||
|
default=None,
|
||||||
|
help="Regex to apply to tests, to only run those tests",
|
||||||
|
)
|
||||||
|
op.add_option(
|
||||||
|
"--collect-coverage",
|
||||||
|
action="store_true",
|
||||||
|
default=False,
|
||||||
|
help="Collect test coverage information",
|
||||||
|
)
|
||||||
|
op.add_option(
|
||||||
|
"--coverage-include",
|
||||||
|
default="*",
|
||||||
|
help='File globs to include in converage (split by ",")',
|
||||||
|
)
|
||||||
|
op.add_option(
|
||||||
|
"--coverage-omit",
|
||||||
|
default="",
|
||||||
|
help='File globs to omit from converage (split by ",")',
|
||||||
|
)
|
||||||
|
op.add_option(
|
||||||
|
"--logger",
|
||||||
|
action="append",
|
||||||
|
metavar="<category>=<level>",
|
||||||
|
default=[],
|
||||||
|
help="Configure log levels for specific logger categories",
|
||||||
|
)
|
||||||
|
op.add_option(
|
||||||
|
"-q",
|
||||||
|
"--quiet",
|
||||||
|
action="count",
|
||||||
|
default=0,
|
||||||
|
help="Decrease the verbosity (may be specified multiple times)",
|
||||||
|
)
|
||||||
|
op.add_option(
|
||||||
|
"-v",
|
||||||
|
"--verbosity",
|
||||||
|
action="count",
|
||||||
|
default=self.DEFAULT_VERBOSITY,
|
||||||
|
help="Increase the verbosity (may be specified multiple times)",
|
||||||
|
)
|
||||||
|
op.add_option(
|
||||||
|
"-?", "--help", action="help", help="Show this help message and exit"
|
||||||
|
)
|
||||||
|
|
||||||
|
def parse_options(self, argv):
|
||||||
|
self.options, self.test_args = self.option_parser.parse_args(argv[1:])
|
||||||
|
self.options.verbosity -= self.options.quiet
|
||||||
|
|
||||||
|
if self.options.collect_coverage and coverage is None:
|
||||||
|
self.option_parser.error("coverage module is not available")
|
||||||
|
self.options.coverage_include = self.options.coverage_include.split(",")
|
||||||
|
if self.options.coverage_omit == "":
|
||||||
|
self.options.coverage_omit = []
|
||||||
|
else:
|
||||||
|
self.options.coverage_omit = self.options.coverage_omit.split(",")
|
||||||
|
|
||||||
|
def setup_logging(self):
|
||||||
|
# Configure the root logger to log at INFO level.
|
||||||
|
# This is similar to logging.basicConfig(), but uses our
|
||||||
|
# StderrLogHandler instead of a StreamHandler.
|
||||||
|
fmt = logging.Formatter("%(pathname)s:%(lineno)s: %(message)s")
|
||||||
|
log_handler = StderrLogHandler()
|
||||||
|
log_handler.setFormatter(fmt)
|
||||||
|
root_logger = logging.getLogger()
|
||||||
|
root_logger.addHandler(log_handler)
|
||||||
|
root_logger.setLevel(logging.INFO)
|
||||||
|
|
||||||
|
level_names = {
|
||||||
|
"debug": logging.DEBUG,
|
||||||
|
"info": logging.INFO,
|
||||||
|
"warn": logging.WARNING,
|
||||||
|
"warning": logging.WARNING,
|
||||||
|
"error": logging.ERROR,
|
||||||
|
"critical": logging.CRITICAL,
|
||||||
|
"fatal": logging.FATAL,
|
||||||
|
}
|
||||||
|
|
||||||
|
for value in self.options.logger:
|
||||||
|
parts = value.rsplit("=", 1)
|
||||||
|
if len(parts) != 2:
|
||||||
|
self.option_parser.error(
|
||||||
|
"--logger argument must be of the "
|
||||||
|
"form <name>=<level>: %s" % value
|
||||||
|
)
|
||||||
|
name = parts[0]
|
||||||
|
level_name = parts[1].lower()
|
||||||
|
level = level_names.get(level_name)
|
||||||
|
if level is None:
|
||||||
|
self.option_parser.error(
|
||||||
|
"invalid log level %r for log " "category %s" % (parts[1], name)
|
||||||
|
)
|
||||||
|
logging.getLogger(name).setLevel(level)
|
||||||
|
|
||||||
|
def create_loader(self):
|
||||||
|
import __test_modules__
|
||||||
|
|
||||||
|
return Loader(__test_modules__.TEST_MODULES, self.options.regex)
|
||||||
|
|
||||||
|
def load_tests(self):
|
||||||
|
loader = self.create_loader()
|
||||||
|
if self.options.collect_coverage:
|
||||||
|
self.start_coverage()
|
||||||
|
include = self.options.coverage_include
|
||||||
|
omit = self.options.coverage_omit
|
||||||
|
if include and "*" not in include:
|
||||||
|
optimize_for_coverage(self.cov, include, omit)
|
||||||
|
|
||||||
|
if self.test_args:
|
||||||
|
suite = loader.load_args(self.test_args)
|
||||||
|
else:
|
||||||
|
suite = loader.load_all()
|
||||||
|
if self.options.collect_coverage:
|
||||||
|
self.cov.start()
|
||||||
|
return suite
|
||||||
|
|
||||||
|
def get_tests(self, test_suite):
|
||||||
|
tests = []
|
||||||
|
|
||||||
|
for test in test_suite:
|
||||||
|
if isinstance(test, unittest.TestSuite):
|
||||||
|
tests.extend(self.get_tests(test))
|
||||||
|
else:
|
||||||
|
tests.append(test)
|
||||||
|
|
||||||
|
return tests
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
test_suite = self.load_tests()
|
||||||
|
|
||||||
|
if self.options.list:
|
||||||
|
for test in self.get_tests(test_suite):
|
||||||
|
method_name = getattr(test, "_testMethodName", "")
|
||||||
|
name = _format_test_name(test.__class__, method_name)
|
||||||
|
print(name)
|
||||||
|
return EXIT_CODE_SUCCESS
|
||||||
|
else:
|
||||||
|
result = self.run_tests(test_suite)
|
||||||
|
if self.options.output is not None:
|
||||||
|
with open(self.options.output, "w") as f:
|
||||||
|
json.dump(result.getResults(), f, indent=4, sort_keys=True)
|
||||||
|
if not result.wasSuccessful():
|
||||||
|
return EXIT_CODE_TEST_FAILURE
|
||||||
|
return EXIT_CODE_SUCCESS
|
||||||
|
|
||||||
|
def run_tests(self, test_suite):
|
||||||
|
# Install a signal handler to catch Ctrl-C and display the results
|
||||||
|
# (but only if running >2.6).
|
||||||
|
if sys.version_info[0] > 2 or sys.version_info[1] > 6:
|
||||||
|
unittest.installHandler()
|
||||||
|
|
||||||
|
# Run the tests
|
||||||
|
runner = BuckTestRunner(
|
||||||
|
self,
|
||||||
|
test_suite,
|
||||||
|
verbosity=self.options.verbosity,
|
||||||
|
show_output=self.options.show_output,
|
||||||
|
)
|
||||||
|
result = runner.run(test_suite)
|
||||||
|
|
||||||
|
if self.options.collect_coverage and self.options.show_output:
|
||||||
|
self.cov.stop()
|
||||||
|
try:
|
||||||
|
self.cov.report(file=sys.stdout)
|
||||||
|
except coverage.misc.CoverageException:
|
||||||
|
print("No lines were covered, potentially restricted by file filters")
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def get_abbr_impl(self):
|
||||||
|
"""Return abbreviated implementation name."""
|
||||||
|
impl = platform.python_implementation()
|
||||||
|
if impl == "PyPy":
|
||||||
|
return "pp"
|
||||||
|
elif impl == "Jython":
|
||||||
|
return "jy"
|
||||||
|
elif impl == "IronPython":
|
||||||
|
return "ip"
|
||||||
|
elif impl == "CPython":
|
||||||
|
return "cp"
|
||||||
|
else:
|
||||||
|
raise RuntimeError("unknown python runtime")
|
||||||
|
|
||||||
|
def start_coverage(self):
|
||||||
|
if not self.options.collect_coverage:
|
||||||
|
return
|
||||||
|
|
||||||
|
with tempfile.NamedTemporaryFile('w', delete=False) as coverage_ini:
|
||||||
|
coverage_ini.write(_COVERAGE_INI)
|
||||||
|
self._coverage_ini_path = coverage_ini.name
|
||||||
|
|
||||||
|
# Keep the original working dir in case tests use os.chdir
|
||||||
|
self._original_working_dir = os.getcwd()
|
||||||
|
|
||||||
|
# for coverage config ignores by platform/python version
|
||||||
|
os.environ["PLATFORM"] = sys.platform
|
||||||
|
os.environ["PY_IMPL"] = self.get_abbr_impl()
|
||||||
|
os.environ["PY_MAJOR"] = str(sys.version_info.major)
|
||||||
|
os.environ["PY_MINOR"] = str(sys.version_info.minor)
|
||||||
|
|
||||||
|
self.cov = coverage.Coverage(
|
||||||
|
include=self.options.coverage_include,
|
||||||
|
omit=self.options.coverage_omit,
|
||||||
|
config_file=coverage_ini.name,
|
||||||
|
)
|
||||||
|
self.cov.erase()
|
||||||
|
self.cov.start()
|
||||||
|
|
||||||
|
def get_coverage(self):
|
||||||
|
if not self.options.collect_coverage:
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
os.remove(self._coverage_ini_path)
|
||||||
|
except OSError:
|
||||||
|
pass # Better to litter than to fail the test
|
||||||
|
|
||||||
|
# Switch back to the original working directory.
|
||||||
|
os.chdir(self._original_working_dir)
|
||||||
|
|
||||||
|
result = {}
|
||||||
|
|
||||||
|
self.cov.stop()
|
||||||
|
|
||||||
|
try:
|
||||||
|
f = StringIO()
|
||||||
|
self.cov.report(file=f)
|
||||||
|
lines = f.getvalue().split("\n")
|
||||||
|
except coverage.misc.CoverageException:
|
||||||
|
# Nothing was covered. That's fine by us
|
||||||
|
return result
|
||||||
|
|
||||||
|
# N.B.: the format of the coverage library's output differs
|
||||||
|
# depending on whether one or more files are in the results
|
||||||
|
for line in lines[2:]:
|
||||||
|
if line.strip("-") == "":
|
||||||
|
break
|
||||||
|
r = line.split()[0]
|
||||||
|
analysis = self.cov.analysis2(r)
|
||||||
|
covString = self.convert_to_diff_cov_str(analysis)
|
||||||
|
if covString:
|
||||||
|
result[r] = covString
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def convert_to_diff_cov_str(self, analysis):
|
||||||
|
# Info on the format of analysis:
|
||||||
|
# http://nedbatchelder.com/code/coverage/api.html
|
||||||
|
if not analysis:
|
||||||
|
return None
|
||||||
|
numLines = max(
|
||||||
|
analysis[1][-1] if len(analysis[1]) else 0,
|
||||||
|
analysis[2][-1] if len(analysis[2]) else 0,
|
||||||
|
analysis[3][-1] if len(analysis[3]) else 0,
|
||||||
|
)
|
||||||
|
lines = ["N"] * numLines
|
||||||
|
for l in analysis[1]:
|
||||||
|
lines[l - 1] = "C"
|
||||||
|
for l in analysis[2]:
|
||||||
|
lines[l - 1] = "X"
|
||||||
|
for l in analysis[3]:
|
||||||
|
lines[l - 1] = "U"
|
||||||
|
return "".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
def main(argv):
|
||||||
|
return MainProgram(sys.argv).run()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main(sys.argv))
|
Reference in New Issue
Block a user