diff --git a/.gitignore b/.gitignore
index 97dbc43a2cee7..a4ef9d22bbbd5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -35,3 +35,6 @@ coverage.xml
 # ...except the templates.
 !/tools/run_python.ps1
 !/tools/run_python_compiler.ps1
+
+# Test runner previous run results for sorting the next run
+__previous_test_run_results.json
diff --git a/test/parallel_testsuite.py b/test/parallel_testsuite.py
index c0908fc7427ff..6f35684e6b048 100644
--- a/test/parallel_testsuite.py
+++ b/test/parallel_testsuite.py
@@ -3,6 +3,7 @@
 # University of Illinois/NCSA Open Source License. Both these licenses can be
 # found in the LICENSE file.
 
+import json
 import multiprocessing
 import os
 import sys
@@ -19,7 +20,12 @@
 seen_class = set()
 
 
-def run_test(test):
+def run_test(test, failfast_event):
+  # If failfast mode is in effect and any test has already failed,
+  # abort executing further tests immediately.
+  if failfast_event is not None and failfast_event.is_set():
+    return None
+
   olddir = os.getcwd()
   result = BufferedParallelTestResult()
   temp_dir = tempfile.mkdtemp(prefix='emtest_')
@@ -29,10 +35,16 @@ def run_test(test):
       seen_class.add(test.__class__)
       test.__class__.setUpClass()
     test(result)
+
+    # Alert all other multiprocessing pool workers that they need to stop executing further tests.
+    if failfast_event is not None and result.test_result not in ['success', 'skipped']:
+      failfast_event.set()
   except unittest.SkipTest as e:
     result.addSkip(test, e)
   except Exception as e:
     result.addError(test, e)
+    if failfast_event is not None:
+      failfast_event.set()
   # Before attempting to delete the tmp dir make sure the current
   # working directory is not within it.
   os.chdir(olddir)
@@ -46,9 +58,11 @@ class ParallelTestSuite(unittest.BaseTestSuite):
   Creates worker threads, manages the task queue, and combines the results.
   """
 
-  def __init__(self, max_cores):
+  def __init__(self, max_cores, options):
     super().__init__()
     self.max_cores = max_cores
+    self.failfast = options.failfast
+    self.failing_and_slow_first = options.failing_and_slow_first
 
   def addTest(self, test):
     super().addTest(test)
@@ -61,12 +75,48 @@ def run(self, result):
     # inherited by the child process, but can lead to hard-to-debug windows-only
     # issues.
     # multiprocessing.set_start_method('spawn')
-    tests = list(self.reversed_tests())
+
+    # If we are running with --failing-and-slow-first, then the test list has been
+    # pre-sorted based on previous test run results. Otherwise run the tests in
+    # reverse alphabetical order.
+    tests = list(self if self.failing_and_slow_first else self.reversed_tests())
     use_cores = cap_max_workers_in_pool(min(self.max_cores, len(tests), num_cores()))
     print('Using %s parallel test processes' % use_cores)
-    pool = multiprocessing.Pool(use_cores)
-    results = [pool.apply_async(run_test, (t,)) for t in tests]
-    results = [r.get() for r in results]
+    with multiprocessing.Manager() as manager:
+      pool = multiprocessing.Pool(use_cores)
+      failfast_event = manager.Event() if self.failfast else None
+      results = [pool.apply_async(run_test, (t, failfast_event)) for t in tests]
+      results = [r.get() for r in results]
+      results = [r for r in results if r is not None]
+
+      try:
+        previous_test_run_results = json.load(open('__previous_test_run_results.json'))
+      except FileNotFoundError:
+        previous_test_run_results = {}
+
+      if self.failing_and_slow_first:
+        for r in results:
+          # Save a test result record with the specific suite name (e.g. "core0.test_foo")
+          test_failed = r.test_result not in ['success', 'skipped']
+          fail_frequency = previous_test_run_results[r.test_name]['fail_frequency'] if r.test_name in previous_test_run_results else int(test_failed)
+          fail_frequency = (fail_frequency + int(test_failed)) / 2
+          previous_test_run_results[r.test_name] = {
+            'result': r.test_result,
+            'duration': r.test_duration,
+            'fail_frequency': fail_frequency,
+          }
+          # Also save a test result record without the suite name (e.g. just "test_foo"). This enables different suite runs
+          # to order tests for quick --failfast termination, in case a test fails in multiple suites.
+          test_in_any_suite = r.test_name.split(' ')[0]
+          fail_frequency = previous_test_run_results[test_in_any_suite]['fail_frequency'] if test_in_any_suite in previous_test_run_results else int(test_failed)
+          fail_frequency = (fail_frequency + int(test_failed)) / 2
+          previous_test_run_results[test_in_any_suite] = {
+            'result': r.test_result,
+            'duration': r.test_duration,
+            'fail_frequency': fail_frequency,
+          }
+
+        json.dump(previous_test_run_results, open('__previous_test_run_results.json', 'w'), indent=2)
     pool.close()
     pool.join()
     return self.combine_results(result, results)
@@ -104,6 +154,8 @@ class BufferedParallelTestResult:
   def __init__(self):
     self.buffered_result = None
     self.test_duration = 0
+    self.test_result = 'errored'
+    self.test_name = ''
 
   @property
   def test(self):
@@ -122,6 +174,7 @@ def updateResult(self, result):
     result.core_time += self.test_duration
 
   def startTest(self, test):
+    self.test_name = str(test)
     self.start_time = time.perf_counter()
 
   def stopTest(self, test):
@@ -134,28 +187,34 @@ def addSuccess(self, test):
     if hasattr(time, 'perf_counter'):
       print(test, '... ok (%.2fs)' % (self.calculateElapsed()), file=sys.stderr)
     self.buffered_result = BufferedTestSuccess(test)
+    self.test_result = 'success'
 
   def addExpectedFailure(self, test, err):
     if hasattr(time, 'perf_counter'):
       print(test, '... expected failure (%.2fs)' % (self.calculateElapsed()), file=sys.stderr)
     self.buffered_result = BufferedTestExpectedFailure(test, err)
+    self.test_result = 'expected failure'
 
   def addUnexpectedSuccess(self, test):
     if hasattr(time, 'perf_counter'):
       print(test, '... unexpected success (%.2fs)' % (self.calculateElapsed()), file=sys.stderr)
     self.buffered_result = BufferedTestUnexpectedSuccess(test)
+    self.test_result = 'unexpected success'
 
   def addSkip(self, test, reason):
     print(test, "... skipped '%s'" % reason, file=sys.stderr)
     self.buffered_result = BufferedTestSkip(test, reason)
+    self.test_result = 'skipped'
 
   def addFailure(self, test, err):
     print(test, '... FAIL', file=sys.stderr)
     self.buffered_result = BufferedTestFailure(test, err)
+    self.test_result = 'failed'
 
   def addError(self, test, err):
     print(test, '... ERROR', file=sys.stderr)
     self.buffered_result = BufferedTestError(test, err)
+    self.test_result = 'errored'
 
 
 class BufferedTestBase:
diff --git a/test/runner.py b/test/runner.py
index 74e9bb4f64c9d..ea206e7b973e1 100755
--- a/test/runner.py
+++ b/test/runner.py
@@ -21,6 +21,7 @@
 import atexit
 import fnmatch
 import glob
+import json
 import logging
 import math
 import operator
@@ -30,6 +31,7 @@
 import sys
 import unittest
 import time
+from functools import cmp_to_key
 
 # Setup
 
@@ -270,8 +272,75 @@ def error_on_legacy_suite_names(args):
       utils.exit_with_error('`%s` test suite has been replaced with `%s`', a, new)
 
 
-def load_test_suites(args, modules, start_at, repeat):
-  found_start = not start_at
+def create_test_run_sorter(failfast):
+  try:
+    previous_test_run_results = json.load(open('__previous_test_run_results.json'))
+  except FileNotFoundError:
+    previous_test_run_results = {}
+
+  def read_approx_fail_freq(test_name):
+    if test_name in previous_test_run_results and 'fail_frequency' in previous_test_run_results[test_name]:
+      # Quantize the float value to relatively fine-grained buckets for sorting
+      return round(previous_test_run_results[test_name]['fail_frequency'] * 20) / 20
+    return 0
+
+  def sort_tests_failing_and_slowest_first_comparator(x, y):
+    x = str(x)
+    y = str(y)
+
+    # Look at how often this test has failed, and order by failure frequency first.
+    # Only do this with --failfast, when we are looking to fail early (otherwise sorting by last test run duration is more productive).
+    if failfast:
+      x_fail_freq = read_approx_fail_freq(x)
+      y_fail_freq = read_approx_fail_freq(y)
+      if x_fail_freq != y_fail_freq:
+        return y_fail_freq - x_fail_freq
+
+      # Then look at how often this test has failed overall in any other suite, and order by that failure frequency next.
+      x_fail_freq = read_approx_fail_freq(x.split(' ')[0])
+      y_fail_freq = read_approx_fail_freq(y.split(' ')[0])
+      if x_fail_freq != y_fail_freq:
+        return y_fail_freq - x_fail_freq
+
+    if x in previous_test_run_results:
+      X = previous_test_run_results[x]
+
+      # if test Y has not been run even once, run Y before X
+      if y not in previous_test_run_results:
+        return 1
+      Y = previous_test_run_results[y]
+
+      # If both X and Y have been run before, order the tests based on what the previous result was (failures first, skips very last).
+      # N.b. it is important to sandwich all skipped tests between failures and successes. This maximizes the chances that when
+      # a failing test is detected, the other cores will fail fast as well.
+      # (Successful tests are run slowest-first to help scheduling.)
+      order_by_result = {'errored': 0, 'failed': 1, 'expected failure': 2, 'unexpected success': 3, 'skipped': 4, 'success': 5}
+      x_result = order_by_result[X['result']]
+      y_result = order_by_result[Y['result']]
+      if x_result != y_result:
+        return x_result - y_result
+
+      # Finally, order by test duration from the last run
+      if X['duration'] != Y['duration']:
+        if X['result'] == 'success':
+          # If both tests were successful, run the slower test first to improve parallelism
+          return Y['duration'] - X['duration']
+        else:
+          # If both tests were failing, run the quicker test first to improve --failfast detection time
+          return X['duration'] - Y['duration']
+
+    # if test X has not been run even once, but Y has, run X before Y
+    if y in previous_test_run_results:
+      return -1
+
+    # Neither test has been run before, so run them in alphabetical order
+    return (x > y) - (x < y)
+
+  return sort_tests_failing_and_slowest_first_comparator
+
+
+def load_test_suites(args, modules, options):
+  found_start = not options.start_at
 
   loader = unittest.TestLoader()
   error_on_legacy_suite_names(args)
@@ -291,20 +360,22 @@ def load_test_suites(args, modules, start_at, repeat):
     if names_in_module:
       loaded_tests = loader.loadTestsFromNames(sorted(names_in_module), m)
       tests = flattened_tests(loaded_tests)
-      suite = suite_for_module(m, tests)
+      suite = suite_for_module(m, tests, options)
+      if options.failing_and_slow_first:
+        tests = sorted(tests, key=cmp_to_key(create_test_run_sorter(options.failfast)))
       for test in tests:
         if not found_start:
           # Skip over tests until we find the start
-          if test.id().endswith(start_at):
+          if test.id().endswith(options.start_at):
             found_start = True
           else:
             continue
-        for _x in range(repeat):
+        for _x in range(options.repeat):
           total_tests += 1
           suite.addTest(test)
       suites.append((m.__name__, suite))
   if not found_start:
-    utils.exit_with_error(f'unable to find --start-at test: {start_at}')
+    utils.exit_with_error(f'unable to find --start-at test: {options.start_at}')
   if total_tests == 1 or parallel_testsuite.num_cores() == 1:
     # TODO: perhaps leave it at 2 if it was 2 before?
     common.EMTEST_SAVE_DIR = 1
@@ -318,13 +389,13 @@ def flattened_tests(loaded_tests):
   return tests
 
 
-def suite_for_module(module, tests):
+def suite_for_module(module, tests, options):
   suite_supported = module.__name__ in ('test_core', 'test_other', 'test_posixtest')
   if not common.EMTEST_SAVE_DIR and not shared.DEBUG:
     has_multiple_tests = len(tests) > 1
     has_multiple_cores = parallel_testsuite.num_cores() > 1
     if suite_supported and has_multiple_tests and has_multiple_cores:
-      return parallel_testsuite.ParallelTestSuite(len(tests))
+      return parallel_testsuite.ParallelTestSuite(len(tests), options)
   return unittest.TestSuite()
 
 
@@ -394,6 +465,7 @@ def parse_args():
                       help='Command to launch web browser in which to run browser tests.')
   parser.add_argument('tests', nargs='*')
   parser.add_argument('--failfast', action='store_true')
+  parser.add_argument('--failing-and-slow-first', action='store_true', help='Run previously failing tests first, then the remaining tests sorted slowest first. Combine with --failfast for fast fail-early CI runs.')
   parser.add_argument('--start-at', metavar='NAME', help='Skip all tests up until <NAME>')
   parser.add_argument('--continue', dest='_continue', action='store_true',
                       help='Resume from the last run test.'
@@ -488,7 +560,7 @@ def prepend_default(arg):
     if os.path.exists(common.LAST_TEST):
       options.start_at = utils.read_file(common.LAST_TEST).strip()
 
-  suites, unmatched_tests = load_test_suites(tests, modules, options.start_at, options.repeat)
+  suites, unmatched_tests = load_test_suites(tests, modules, options)
   if unmatched_tests:
     print('ERROR: could not find the following tests: ' + ' '.join(unmatched_tests))
     return 1
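
Note for reviewers: below is a minimal, self-contained sketch (not part of the patch) of how the records written to `__previous_test_run_results.json` drive the `--failing-and-slow-first` ordering. The test names and record values are made up, and the comparator mirrors only the core branches of `sort_tests_failing_and_slowest_first_comparator` (fail frequency, then result class, then duration) for a `--failfast` run; the cross-suite and never-run-before cases are omitted.

```python
# Illustrative only: hypothetical records in the shape written by ParallelTestSuite.run().
from functools import cmp_to_key

previous = {
    'test_a (core0.test_a)': {'result': 'failed', 'duration': 1.2, 'fail_frequency': 0.75},
    'test_b (core0.test_b)': {'result': 'success', 'duration': 90.0, 'fail_frequency': 0.0},
    'test_c (core0.test_c)': {'result': 'success', 'duration': 3.0, 'fail_frequency': 0.0},
    'test_d (core0.test_d)': {'result': 'skipped', 'duration': 0.0, 'fail_frequency': 0.0},
}

# Same ordering of result classes as in create_test_run_sorter().
ORDER = {'errored': 0, 'failed': 1, 'expected failure': 2,
         'unexpected success': 3, 'skipped': 4, 'success': 5}


def fail_freq(name):
  # Same quantization idea as read_approx_fail_freq(): 1/20-sized buckets.
  return round(previous[name]['fail_frequency'] * 20) / 20


def compare(x, y):
  # Simplified mirror of the comparator for a --failfast run: most frequently
  # failing tests first, then errors/failures before skips before successes,
  # then slowest-first for successes and quickest-first for failures.
  if fail_freq(x) != fail_freq(y):
    return fail_freq(y) - fail_freq(x)
  X, Y = previous[x], previous[y]
  if ORDER[X['result']] != ORDER[Y['result']]:
    return ORDER[X['result']] - ORDER[Y['result']]
  if X['result'] == 'success':
    return Y['duration'] - X['duration']
  return X['duration'] - Y['duration']


print([t.split(' ')[0] for t in sorted(previous, key=cmp_to_key(compare))])
# -> ['test_a', 'test_d', 'test_b', 'test_c']
```

After each run, `ParallelTestSuite.run()` updates `fail_frequency` as `(previous + int(test_failed)) / 2`, i.e. a moving average that halves on every passing run, so recently failing tests keep sorting to the front of subsequent `--failfast` runs.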