From e18be3cdb84371fc2cb68734b8f88a7e2f6687c1 Mon Sep 17 00:00:00 2001 From: Bret Naylor Date: Tue, 3 Feb 2026 13:43:39 -0500 Subject: [PATCH 1/2] fixing coverage --- testflo/cover.py | 199 ++++++++++++++++------------------------- testflo/discover.py | 2 +- testflo/isolatedrun.py | 36 ++++---- testflo/main.py | 62 ++++++------- testflo/mpirun.py | 18 ++-- testflo/runner.py | 57 ++++++------ testflo/test.py | 21 +++-- testflo/util.py | 65 +++++++++----- 8 files changed, 219 insertions(+), 241 deletions(-) diff --git a/testflo/cover.py b/testflo/cover.py index c88cde9..4e0470e 100644 --- a/testflo/cover.py +++ b/testflo/cover.py @@ -3,146 +3,99 @@ """ import os import sys -import shutil import webbrowser try: import coverage - from coverage.config import HandyConfigParser + from coverage.collector import Collector except ImportError: coverage = None -else: - coverage.process_startup() -# use to hold a global coverage obj -_coverobj = None +def setup_coverage(options): + """ + Programmatically initializes coverage for the current process. + Ensures absolute paths for temp-dir safety and avoids double-init. + """ + if not (options.coverage or options.coveragehtml): + return None -def _to_ini(lst): - if lst: - return ','.join(lst) - return '' + # Prevent double-initialization + if Collector._collectors: + return None + cover_dir = options.cover_dir or os.getcwd() + data_file = os.path.join(cover_dir, '.coverage') + cfg_file = os.path.join(cover_dir, '.coveragerc') -def _write_temp_config(options, rcfile): - """ - Read any .coveragerc file if it exists, and override parts of it then generate our temp config. - - Parameters - ---------- - options : cmd line options - Options from the command line parser. - rcfile : str - The name of our temporary coverage config file. - """ - tmp_cfg = { - 'run': { - 'branch': False, - 'parallel': True, - 'concurrency': 'multiprocessing', - }, - 'report': { - 'ignore_errors': True, - 'skip_empty': True, - 'sort': '-cover', - }, - 'html': { - 'skip_empty': True, - } - } + cov = coverage.Coverage( + config_file=cfg_file, + data_file=data_file, + data_suffix=True, + branch=options.cover_branch, + ) - if options.coverpkgs: - tmp_cfg['run']['source_pkgs'] = _to_ini(options.coverpkgs) + cov.config.ignore_errors = True + if sys.version_info >= (3, 13): + cov.set_option("run:core", "sysmon") + + if options.coverpkgs: + cov.set_option("run:source", options.coverpkgs) if options.cover_omits: - tmp_cfg['run']['omit'] = _to_ini(options.cover_omits) - tmp_cfg['report']['omit'] = _to_ini(options.cover_omits) + omits = cov.get_option("run:omit") + if omits: + omits.extend(options.cover_omits) + else: + omits = options.cover_omits + cov.set_option("run:omit", omits) + + cov.set_option("run:disable_warnings", ["module-not-imported", "no-data-collected", + "couldnt-parse"]) + cov.set_option("report:ignore_errors", True) + cov.set_option("report:sort", "-cover") + cov.set_option("report:exclude_lines", [ + "pragma: no cover", + "if __name__ == .__main__.:", + "raise NotImplementedError", + "def __repr__", + ]) + + return cov + + +def finalize_coverage(options, cov): + """ + Combines all parallel coverage files and generates an HTML report. + """ + if cov is None: + return - cfgparser = HandyConfigParser(our_file=True) + cov.save() - if os.path.isfile('.coveragerc'): - cfgparser.read(['.coveragerc']) + data_dir = options.cover_dir - cfgparser.read_dict(tmp_cfg) + # Combine all coverage files found in the data_dir + try: + cov.combine(data_paths=[data_dir]) + except coverage.exceptions.CoverageException as e: + print(f"Combining coverage files failed: {e}", file=sys.stderr) + return - with open(rcfile, 'w') as f: - cfgparser.write(f) + if options.coverage: + print("\n--- Coverage Summary ---") + cov.report(ignore_errors=True, skip_empty=True) + if options.coveragehtml: + html_dir = os.path.join(data_dir, 'htmlcov') + cov.html_report(directory=html_dir, ignore_errors=True, skip_empty=True, + show_contexts=options.dyn_contexts) + index_file = os.path.join(html_dir, 'index.html') + if sys.platform == 'darwin': + os.system('open %s' % index_file) + else: + webbrowser.get().open(index_file) -def setup_coverage(options): - global _coverobj - if _coverobj is None and (options.coverage or options.coveragehtml): - if not coverage: - raise RuntimeError("coverage has not been installed.") - if not options.coverpkgs: - raise RuntimeError("No packages specified for coverage. " - "Use the --coverpkg option to add a package.") - oldcov = os.path.join(os.getcwd(), '.coverage') - if os.path.isfile(oldcov): - os.remove(oldcov) - covdir = os.path.join(os.getcwd(), '_covdir') - if os.path.isdir('_covdir'): - shutil.rmtree('_covdir') - os.mkdir('_covdir') - os.environ['COVERAGE_RUN'] = 'true' - os.environ['COVERAGE_RCFILE'] = rcfile = os.path.join(covdir, '_coveragerc_') - os.environ['COVERAGE_FILE'] = covfile = os.path.join(covdir, '.coverage') - os.environ['COVERAGE_PROCESS_START'] = rcfile - _write_temp_config(options, rcfile) - _coverobj = coverage.Coverage(data_file=covfile, data_suffix=True, config_file=rcfile) - return _coverobj - -def start_coverage(): - if _coverobj: - _coverobj.start() - -def stop_coverage(): - if _coverobj: - _coverobj.stop() - -def save_coverage(): - if _coverobj: - _coverobj.save() - -def finalize_coverage(options): - if _coverobj and options.coverpkgs: - rank = 0 - if not options.nompi: - try: - from mpi4py import MPI - rank = MPI.COMM_WORLD.rank - except ImportError: - pass - if rank == 0: - from testflo.util import find_files, find_module - excl = lambda n: (n.startswith('test_') and n.endswith('.py')) or \ - n.startswith('__init__.') - dirs = [] - for n in options.coverpkgs: - if os.path.isdir(n): - dirs.append(n) - else: - path = find_module(n) - if path is None: - raise RuntimeError("Can't find module %s" % n) - dirs.append(os.path.dirname(path)) - - morfs = list(find_files(dirs, match='*.py', exclude=excl)) - - _coverobj.combine() - _coverobj.save() - - if options.coverage: - _coverobj.report(morfs=morfs) - else: - dname = '_html' - _coverobj.html_report(morfs=morfs, directory=dname) - outfile = os.path.join(os.getcwd(), dname, 'index.html') - - if sys.platform == 'darwin': - os.system('open %s' % outfile) - else: - webbrowser.get().open(outfile) - - shutil.copy(_coverobj.get_data().data_filename(), - os.path.join(os.getcwd(), '.coverage')) + print(f"\nHTML report generated at: {index_file}") + + return cov \ No newline at end of file diff --git a/testflo/discover.py b/testflo/discover.py index 4a59cb6..62ad4ca 100644 --- a/testflo/discover.py +++ b/testflo/discover.py @@ -61,7 +61,7 @@ def get_iter(self, input_iter): # process so that we can execute the module or class level setup/teardown # only once while impacting all of the tests in that group. new_tcase_groups = [] - for tcase, tests in self._tcase_fixture_groups.items(): + for tests in self._tcase_fixture_groups.values(): tests = sorted(tests, key=lambda t: t.spec) # mark the first and last tests so that we know when to diff --git a/testflo/isolatedrun.py b/testflo/isolatedrun.py index c7ff4b0..63760be 100644 --- a/testflo/isolatedrun.py +++ b/testflo/isolatedrun.py @@ -5,12 +5,6 @@ """ if __name__ == '__main__': - try: - import coverage - except ImportError: - coverage = None - else: - coverage.process_startup() import sys import os @@ -19,27 +13,31 @@ from testflo.test import Test from testflo.qman import get_client_queue from testflo.options import get_options + from testflo.cover import setup_coverage queue = get_client_queue() os.environ['TESTFLO_QUEUE'] = '' options = get_options() + test = None - try: - try: - test = Test(sys.argv[1], options) - test.nocapture = True # so we don't lose stdout - test.run() - except: - print(traceback.format_exc()) - test.status = 'FAIL' - test.err_msg = traceback.format_exc() + if options.coverage or options.coveragehtml: + cov = setup_coverage(options) + else: + cov = None + try: + test = Test(sys.argv[1], options) + test.nocapture = True # so we don't lose stdout + test.run(cov=cov) except: - test.err_msg = traceback.format_exc() test.status = 'FAIL' + test.err_msg = traceback.format_exc() + finally: + sys.stdout.flush() + sys.stderr.flush() - sys.stdout.flush() - sys.stderr.flush() + queue.put(test) - queue.put(test) + if cov is not None: + cov.save() diff --git a/testflo/main.py b/testflo/main.py index 4350b3d..52d883a 100644 --- a/testflo/main.py +++ b/testflo/main.py @@ -29,8 +29,6 @@ def get_iter(self, input_iter) import time import warnings import multiprocessing -import atexit -import shutil from fnmatch import fnmatch, fnmatchcase @@ -43,12 +41,15 @@ def get_iter(self, input_iter) from testflo.duration import DurationSummary from testflo.discover import TestDiscoverer from testflo.filters import TimeFilter, FailFilter +from testflo.cover import setup_coverage, finalize_coverage from testflo.util import read_config_file, read_test_file from testflo.options import get_options from testflo.qman import get_server_queue - -options = get_options() +try: + import coverage +except ImportError: + coverage = None def dryrun(input_iter): @@ -128,17 +129,8 @@ def main(args=None): # create one if it doesn't exist homedir = os.path.expanduser('~') rcfile = os.path.join(homedir, '.testflo') - if not os.path.isfile(rcfile): - with open(rcfile, 'w') as f: - f.write("""[testflo] -skip_dirs=site-packages, - dist-packages, - __pycache__, - build, - _build, - contrib -""") - read_config_file(rcfile, options) + if os.path.isfile(rcfile): + read_config_file(rcfile, options) if options.cfg: read_config_file(options.cfg, options) @@ -159,8 +151,14 @@ def main(args=None): if not tests: tests = [os.getcwd()] + always_skip = {'site-packages', 'dist-packages', '__pycache__', 'build', '_build', + '.pixi'} + def dir_exclude(d): base = os.path.basename(d) + if base in always_skip: + return True + for skip in options.skip_dirs: if fnmatch(base, skip): return True @@ -170,21 +168,21 @@ def dir_exclude(d): os.environ['TESTFLO_RUNNING'] = '1' if options.coverage or options.coveragehtml: - os.environ['TESTFLO_MAIN_PID'] = str(os.getpid()) - # some coverage files aren't written until atexit of their processes, so put our finalize - # routine in atexit before their Coverage._atexit methods are registered so ours will be - # executed *after* theirs. - def _finalize(): - if os.getpid() == int(os.environ.get('TESTFLO_MAIN_PID', '0')): - from testflo.cover import finalize_coverage - finalize_coverage(options) - # clean up the temporary dir where we store the interim coverage files from all - # of the processes. - if os.path.isdir('_covdir'): - shutil.rmtree('_covdir') - atexit.register(_finalize) - from testflo.cover import setup_coverage - setup_coverage(options) + cov_dir = options.cover_dir or os.getcwd() + options.cover_dir = os.path.abspath(cov_dir) + if options.cover_omits is None: + options.cover_omits = [] + options.cover_omits.append('*/testflo/*') + + if not coverage: + raise RuntimeError("coverage has not been installed.") + if not options.coverpkgs: + raise RuntimeError("No packages specified for coverage. " + "Use the --coverpkg option to add a package.") + + cov = setup_coverage(options) + else: + cov = None if options.noreport: report_file = open(os.devnull, 'a') @@ -235,7 +233,7 @@ def func_matcher(funcname): if options.pre_announce: options.num_procs = 1 - pipeline.append(ConcurrentTestRunner(options, queue).get_iter) + pipeline.append(ConcurrentTestRunner(options, queue, cov=cov).get_iter) if options.show_deprecations or options.deprecations_report: pipeline.append(DeprecationsReport(options).get_iter) @@ -274,6 +272,8 @@ def func_matcher(funcname): if manager is not None: manager.shutdown() + finalize_coverage(options, cov) + return retval diff --git a/testflo/mpirun.py b/testflo/mpirun.py index 044100f..cd5bfb8 100644 --- a/testflo/mpirun.py +++ b/testflo/mpirun.py @@ -6,12 +6,6 @@ """ if __name__ == '__main__': - try: - import coverage - except ImportError: - pass - else: - coverage.process_startup() import sys import os @@ -24,6 +18,7 @@ from testflo.test import Test from testflo.qman import get_client_queue from testflo.options import get_options + from testflo.cover import setup_coverage exitcode = 0 # use 0 for exit code of all ranks != 0 because otherwise, # MPI will terminate other processes @@ -32,13 +27,19 @@ os.environ['TESTFLO_QUEUE'] = '' options = get_options() + test = None + + if options.coverage or options.coveragehtml: + cov = setup_coverage(options) + else: + cov = None try: try: comm = MPI.COMM_WORLD test = Test(sys.argv[1], options) test.nocapture = True # so we don't lose stdout - test.run() + test.run(cov=cov) except: print(traceback.format_exc()) test.status = 'FAIL' @@ -71,3 +72,6 @@ if comm.rank == 0: queue.put(test) + + if cov is not None: + cov.save() diff --git a/testflo/runner.py b/testflo/runner.py index dfa39ff..9d91679 100644 --- a/testflo/runner.py +++ b/testflo/runner.py @@ -8,43 +8,45 @@ from multiprocessing import Queue, Process -from testflo.cover import save_coverage -from testflo.test import Test -from testflo.options import get_options +from testflo.cover import setup_coverage -def worker(test_queue, done_queue, subproc_queue, worker_id): - """This is used by concurrent test processes. It takes a test +def worker(test_queue, done_queue, subproc_queue, worker_id, options): + """This is used by concurrent test processes. It takes a Test object off of the test_queue, runs it, then puts the Test object on the done_queue. """ - test_count = 0 - for tests in iter(test_queue.get, 'STOP'): - done_tests = [] - for test in tests: - try: - test_count += 1 - done_tests.append(test.run(subproc_queue)) - except: - # we generally shouldn't get here, but just in case, - # handle it so that the main process doesn't hang at the - # end when it tries to join all of the concurrent processes. - done_tests.append(test) + cov = setup_coverage(options) - done_queue.put(done_tests) + test_count = 0 + try: + for tests in iter(test_queue.get, 'STOP'): - # don't save anything unless we actually ran a test - if test_count > 0: - save_coverage() + done_tests = [] + for test in tests: + try: + test_count += 1 + done_tests.append(test.run(subproc_queue, cov=cov)) + except: + # we generally shouldn't get here, but just in case, + # handle it so that the main process doesn't hang at the + # end when it tries to join all of the concurrent processes. + done_tests.append(test) + + done_queue.put(done_tests) + finally: + if cov: + cov.save() class TestRunner(object): - def __init__(self, options, subproc_queue): + def __init__(self, options, subproc_queue, cov): self.stop = options.stop self.pre_announce = options.pre_announce self._queue = subproc_queue + self.cov = cov def get_iter(self, input_iter): """Run tests serially.""" @@ -55,7 +57,7 @@ def get_iter(self, input_iter): if self.pre_announce: print(" about to run %s " % test.short_name(), end='') sys.stdout.flush() - result = test.run(self._queue) + result = test.run(self._queue, cov=self.cov) yield result if self.stop: if (result.status == 'FAIL' and not result.expected_fail) or ( @@ -65,16 +67,14 @@ def get_iter(self, input_iter): if stop: break - save_coverage() - class ConcurrentTestRunner(TestRunner): """TestRunner that uses the multiprocessing package to execute tests concurrently. """ - def __init__(self, options, subproc_queue): - super(ConcurrentTestRunner, self).__init__(options, subproc_queue) + def __init__(self, options, subproc_queue, cov): + super(ConcurrentTestRunner, self).__init__(options, subproc_queue, cov) self.num_procs = options.num_procs # only do concurrent stuff if num_procs > 1 @@ -84,7 +84,6 @@ def __init__(self, options, subproc_queue): # Create queues self.task_queue = Queue() self.done_queue = Queue() - self.procs = [] # Start worker processes @@ -93,7 +92,7 @@ def __init__(self, options, subproc_queue): self.procs.append(Process(target=worker, args=(self.task_queue, self.done_queue, subproc_queue, - worker_id))) + worker_id, options))) for proc in self.procs: proc.start() diff --git a/testflo/test.py b/testflo/test.py index 7a419db..af78604 100644 --- a/testflo/test.py +++ b/testflo/test.py @@ -16,8 +16,6 @@ from unittest import TestCase, SkipTest from unittest.case import _UnexpectedSuccess -from testflo.cover import start_coverage, stop_coverage - from testflo.util import get_module, ismethod, get_memory_usage, \ get_testpath, _options2args, _testing_path from testflo.utresult import UnitTestResult @@ -47,7 +45,12 @@ def __init__(self): @contextmanager -def testcontext(test): +def testcontext(test, cov): + if cov is not None: + cov.start() + if test.options.dyn_contexts: + cov.switch_context(test.spec) + global _testing_path old_sys_path = sys.path @@ -62,6 +65,8 @@ def testcontext(test): test.status = 'FAIL' test.err_msg = traceback.format_exc() finally: + if cov is not None: + cov.stop() sys.path = old_sys_path if 'TESTFLO_SPEC' in os.environ: del os.environ['TESTFLO_SPEC'] @@ -113,7 +118,7 @@ def _get_test_info(self): """Get the test's module, testcase (if any), function name, N_PROCS (for mpi tests) and ISOLATED and set our attributes. """ - with testcontext(self): + with testcontext(self, None): try: mod, self.tcasename, self.funcname = _parse_test_path(self.spec) self.modpath = mod.__name__ @@ -203,7 +208,7 @@ def _run_mpi(self, queue): cmd = [mpirun_exe, '-n', str(self.nprocs), sys.executable, os.path.join(os.path.dirname(__file__), 'mpirun.py'), - self.spec] + _options2args() + self.spec] + _options2args(self.options) result = self._run_subproc(cmd, queue, os.environ) @@ -220,7 +225,7 @@ def _run_mpi(self, queue): return result - def run(self, queue=None): + def run(self, queue=None, cov=None): """Runs the test, assuming status is not already known.""" if self.status is not None: # premature failure occurred (or dry run), just return @@ -238,7 +243,7 @@ def run(self, queue=None): elif self.options.isolated: return self._run_isolated(queue) - with testcontext(self): + with testcontext(self, cov): testpath, _ = get_testpath(self.spec) _, mod = get_module(testpath) @@ -285,8 +290,6 @@ def run(self, queue=None): sys.stdout = outstream sys.stderr = errstream - start_coverage() - self.start_time = time.perf_counter() catch_deps = self.options.show_deprecations or self.options.deprecations_report diff --git a/testflo/util.py b/testflo/util.py index 08972ed..ebab110 100644 --- a/testflo/util.py +++ b/testflo/util.py @@ -94,7 +94,13 @@ def _get_parser(): parser.add_argument('--cover-omit', action='append', dest='cover_omits', metavar='FILE', help="Add a file name pattern to remove it from coverage.") - + parser.add_argument('--cover-branch', action='store_true', dest='cover_branch', + help="Enable branch coverage (more detailed but slower). Default is False.") + parser.add_argument('--cover-dir', action='store', dest='cover_dir', + metavar='DIR', help="Specify the coverage dir to use. Default is the " + "current working directory.") + parser.add_argument('--cover-contexts', action='store_true', dest='dyn_contexts', + help="Record which tests hit which lines (increases overhead)."), parser.add_argument('-b', '--benchmark', action='store_true', dest='benchmark', help='Specifies that benchmarks are to be run rather ' 'than tests, so only files starting with "benchmark_" ' @@ -152,31 +158,46 @@ def _get_parser(): return parser -def _options2args(): - """Gets the testflo args that should be used in subprocesses.""" +def _options2args(options): + """Gets the testflo args that should be passed to subprocesses.""" + + store_args = set([ + 'coverage_dir', + ]) - cmdset = set([ - '--nocapture', - '-s', - '--coverpkg', - '--coverage', - '--coverage-html', - '--cover-omit', + multi_args = set([ + 'coverpkg', + 'cover_omits', ]) + store_true_args = set([ + 'coverage', + 'coveragehtml', + 'nocapture', + 'cover_branch', + 'dyn_contexts', + ]) + + all_args = store_args | store_true_args | multi_args + + parser = _get_parser() + dest_to_flags = { + action.dest: (action.option_strings, action.default) for action in parser._actions + if action.dest in all_args + } + keep = [] - i = 0 - args = sys.argv[1:] - argslen = len(args) - while i < argslen: - arg = args[i] - if arg.split('=',1)[0] in cmdset: - keep.append(arg) - if ((arg.startswith('--coverpkg') or arg.startswith('--cover-omit')) - and '=' not in arg): - i += 1 - keep.append(args[i]) - i += 1 + for dest, (flags, default) in dest_to_flags.items(): + opt = getattr(options, dest) + if dest in store_args: + if opt != default: + keep.append(f"{flags[0]}={opt}") + elif dest in multi_args: + if opt: + for oparg in opt: + keep.append(f"{flags[0]}={oparg}") + elif dest in store_true_args and opt: + keep.append(flags[0]) return keep From ec3ba3e40e39da0a2df247f8f8b34a5f6b4ff806 Mon Sep 17 00:00:00 2001 From: Bret Naylor Date: Tue, 3 Feb 2026 13:45:40 -0500 Subject: [PATCH 2/2] bump version --- testflo/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testflo/__init__.py b/testflo/__init__.py index 4109376..bfe6266 100644 --- a/testflo/__init__.py +++ b/testflo/__init__.py @@ -1 +1 @@ -__version__ = '1.4.21-dev' +__version__ = '1.4.21'