Source code for fact.tester

"""
Infrastructure for test case registration and reporting.
"""
import copy
import json
import logging
import os
import pathlib
from datetime import timedelta
from os import makedirs
from typing import Dict, Any, Optional, Union
from xml.etree import ElementTree as Et

import jsonschema
import ruamel.yaml

from fact._util import shorten_text, MAX_OUTPUT_CHAR
from fact.test_cases import TestCompile, TestIO, TestCodeStructure, TestGreyBox, TestStatus, \
    AbstractTest, TestGreyBoxC, ConfigurationError, TestOclint

_LINE_WIDTH = 80
_FACT_VERSION_STRING = "FACT-0.0.5"


[docs]def validate_test_config(config_file) -> bool: """ Returns whether the test configuration is valid and all prerequisites are met (e.g., makefiles are present). :return: True, if the test configuration is valid. """ file_contents = _load_config(config_file) schema_path = pathlib.Path(os.path.dirname(__file__)) / 'schemas' / 'config.schema.json' schema = json.loads(schema_path.read_text()) validator = jsonschema.Draft7Validator(schema) validation_errors = list(validator.iter_errors(file_contents)) if validation_errors: for error in validation_errors: logging.error(error) return False return True
def _load_config(config_file: Union[str, pathlib.Path]): """ Loads a yaml- config file. :param config_file: The yml-file :return: The contents of the yml-file as dict """ config = pathlib.Path(config_file) if not config.is_file(): raise ConfigurationError(f"The config file '{config_file}' is not present!") yaml = ruamel.yaml.YAML(typ='safe') conf_dict = yaml.load(config) return conf_dict
[docs]class NotExecutedError(Exception): """ Raised if the results are checked before they are computed. """
[docs]class TestCase: """ A single test case (e.g., test compilation or IO). """ __test__ = False stdout: str stderr: str tester_output: str name: str time: timedelta result: TestStatus message: str def __init__(self, name: str) -> None: """ :param name: Test name """ self.name = name self.stdout: str = "" self.stderr: str = "" self.tester_output: str = "" self.time: timedelta = timedelta() self.result: TestStatus = TestStatus.SUCCESS self.message: str = ""
[docs] def to_xml(self, suite: Et.Element, max_chars_per_output: int = MAX_OUTPUT_CHAR) -> None: """ Adds the results of the test to a XML test suite :param suite: XML representation of the test suite :param max_chars_per_output: Maximal numbers of characters for stdout and stderr of the test output :return: """ case: Et.Element = Et.SubElement(suite, "testcase") case.set("name", self.name) case.set("time", str(self.time.total_seconds())) if self.result != TestStatus.SUCCESS: result: Et.Element = Et.SubElement(case, str(self.result.value)) result.set("message", self.message) result.text = shorten_text(self.message, max_chars_per_output) if self.stdout: stdout: Et.Element = Et.SubElement(case, "system-out") stdout.text = shorten_text(self.stdout, max_chars_per_output) + "\n" if self.stderr: stderr: Et.Element = Et.SubElement(case, "system-err") stderr.text = shorten_text(self.stderr, max_chars_per_output) + "\n"
[docs]class TestSuite: """ Test suite comprising multiple test cases. """ __test__ = False cases: Dict[str, TestCase] name: str tests: int failures: int errors: int skipped: int successful: int time: timedelta def __init__(self, name: str) -> None: """ :param name: Test suite name """ self.name = name self.cases: Dict[str, TestCase] = {} self.tests: int = 0 self.failures: int = 0 self.errors: int = 0 self.skipped: int = 0 self.successful: int = 0 self.time: timedelta = timedelta()
[docs] def add_case(self, case: TestCase) -> None: """ Adds a test case to the test suite. :param case: Test case to add :return: None """ self.cases[case.name] = case self.tests += 1 self.time += case.time if case.result == TestStatus.ERROR: self.errors += 1 elif case.result == TestStatus.FAILURE: self.failures += 1 elif case.result == TestStatus.SKIPPED: self.skipped += 1 else: self.successful += 1
[docs] def to_xml(self) -> Et.Element: """ :return: Results of the test suite as a XML element """ suite: Et.Element = Et.Element("testsuite") suite.set("name", self.name) suite.set("tests", str(self.tests)) suite.set("failures", str(self.failures)) suite.set("errors", str(self.errors)) suite.set("skipped", str(self.skipped)) suite.set("time", str(self.time.total_seconds())) for _, case in self.cases.items(): case.to_xml(suite) return suite
[docs] def get_test_cases(self) -> Dict[str, TestCase]: """ Returns the test cases :return: Dict of str -> TestCase: Maps the test case name to a TestCase """ return self.cases
[docs]class Tester: """ Test runner used to add, execute tests and export the test results. """ __test__ = False name: str suite: TestSuite tests: Dict[str, Any] executed: bool def __init__(self, name: str = _FACT_VERSION_STRING, logging_level=logging.DEBUG): """ :param name: Name of the tester """ self.name = name self.suite = TestSuite(name) self.tests = {} self.logging_level = logging_level self.executed = False
[docs] @classmethod def from_config(cls, config_file: Union[str, pathlib.Path], name: str = _FACT_VERSION_STRING, logging_level=logging.DEBUG): """ Configure a tester with a given yaml-file. :param config_file: The yml-file :param name: The name of the tester :param logging_level: Debugging level used for printing on stdout :return: The Tester """ tester = cls(name, logging_level) conf_dict = _load_config(config_file) tester._map_config_to_test_types(conf_dict) return tester
[docs] @classmethod def from_dict(cls, conf_dict: Dict[str, Any], makefile_directory: Optional[str] = None, sourcecode_directory: Optional[str] = None, name: str = _FACT_VERSION_STRING, logging_level=logging.DEBUG): """ Configure a tester with a given dict. :param conf_dict: Dict containing the test configuration :param makefile_directory: The directory in which the Makefile resides :param sourcecode_directory: The directory in which the translation unit resides :param name: The name of the tester :param logging_level: Debugging level used for printing on stdout :return: The Tester """ tester = cls(name, logging_level) tester._map_config_to_test_types(conf_dict, makefile_directory, sourcecode_directory) return tester
def _map_config_to_test_types(self, config: Dict[str, Any], makefile_directory: Optional[str] = None, sourcecode_directory: Optional[str] = None): translation_unit = config.get('translation_unit', None) for test_config in config['tests']: test_config_cpy = self._setup_test_config(test_config, makefile_directory, sourcecode_directory, translation_unit) test = self._map_config_to_test(test_config_cpy) self.add_test(test) @staticmethod def _setup_test_config(test_config, makefile_directory, sourcecode_directory, translation_unit): test_config_cpy = copy.copy(test_config) if 'translation_unit' not in test_config_cpy and translation_unit is not None: test_config_cpy['translation_unit'] = translation_unit if makefile_directory is not None: test_config_cpy['makefile_directory'] = makefile_directory if sourcecode_directory is not None: test_config_cpy['sourcecode_directory'] = sourcecode_directory return test_config_cpy @staticmethod def _map_config_to_test(test_config: Dict[str, Any]) -> AbstractTest: test_type = test_config['type'] if test_type == 'compile': test = TestCompile.from_config(test_config) elif test_type == 'io': test = TestIO.from_config(test_config) elif test_type == 'structural': test = TestCodeStructure.from_config(test_config) elif test_type == 'grey_box': test = TestGreyBox.from_config(test_config) elif test_type == 'grey_box_c': test = TestGreyBoxC.from_config(test_config) elif test_type == 'oclint': test = TestOclint.from_config(test_config) else: raise ConfigurationError(f"Unknown test type '{test_type}'!") return test
[docs] def run(self) -> None: """ Starts the tester and runs all tests added via "add_test(test: AbstractTest)". """ logging.basicConfig(level=self.logging_level, format='[%(asctime)s %(filename)s] %(message)s') logging.info("Running: %s", self.name) test_results: Dict[str, TestStatus] = {} for name, test in self.tests.items(): logging.info(test.start_msg()) case = TestCase(test.test_name) if not self.__check_test_requirements(test, test_results): Tester.__skip_test(case, test) self.suite.add_case(case) continue test.start(case) self.suite.add_case(case) logging.info("Finished test case '%s' in %s seconds.", name, test.case.time.total_seconds()) test_results[name] = test.case.result self.__print_result() self.executed = True
@staticmethod def __skip_test(case, test): logging.info( "Skipping test case '%s' not all requirements (%s) are fulfilled", test.test_name, str(test.requirements)) case.block_id = f"Test was skipped! The test requires other test cases to succeed first " \ f"({', '.join(test.requirements)})." case.result = TestStatus.SKIPPED case.time = timedelta() @staticmethod def __check_test_requirements(test: AbstractTest, test_results: Dict[str, TestStatus]) -> bool: """ Checks if all requirements of the current test (e.g., other test cases were successful) are fulfilled. :param test_results: Test results of previous tests :return: True, if all requirements are fulfilled """ for req in test.requirements: if req not in test_results or test_results[req] is not TestStatus.SUCCESS: return False return True
[docs] def add_test(self, test: Any) -> None: """ Adds a new test that will be run once "run()" is invoked. :raise NameError: If the test_name of the provided test has already been added. :param test: Test to add :return: None """ if test.test_name in self.tests: raise NameError( f"Test '{test.test_name}' already registered. Test names should be unique!") self.tests[test.test_name] = test
def __print_result(self) -> None: """ Logs some test statistics. :return: None """ logging.info(" Result ".center(80, "=")) logging.info( "%s finished %s test cases in %s seconds.", self.name, len(self.tests), self.suite.time.total_seconds()) logging.info("SUCCESS: %s", self.suite.successful) logging.info("FAILED: %s", self.suite.failures) logging.info("ERROR: %s", self.suite.errors) logging.info("SKIPPED: %s", self.suite.skipped) logging.info("".center(80, "="))
[docs] def export_result(self, output_path: str = "../test-reports/tests-results.xml") -> None: # pylint: disable=line-too-long """ Exports the test results into a JUnit format and stores it at the given output_path. The JUnit format is based on [#]_. :param output_path: Path used to store the export :return: None .. [#] https://github.com/junit-team/junit5/blob/master/platform-tests/src/test/resources/jenkins-junit.xsd """ # pylint: enable=line-too-long suite_xml: Et.Element = self.suite.to_xml() tree: Et.ElementTree = Et.ElementTree(suite_xml) makedirs(pathlib.Path(output_path).parent, exist_ok=True) tree.write(output_path, xml_declaration=True)
[docs] def successful(self) -> Optional[bool]: """ Returns whether all tests were executed successfully. :return: True, if all tests could be executed successfully. If the tests were not yet executed None is returned. """ if not self.executed: raise NotExecutedError return self.suite.successful == len(self.tests)