From 308dec67659a6d92a81c4aff838b1c977855f1b7 Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Sat, 27 Jun 2026 11:28:54 +0800 Subject: [PATCH] fix unit test coverage reporting --- test/run_all_test.py | 162 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 147 insertions(+), 15 deletions(-) diff --git a/test/run_all_test.py b/test/run_all_test.py index e4c47a2e0..0c498d311 100644 --- a/test/run_all_test.py +++ b/test/run_all_test.py @@ -1,6 +1,7 @@ import importlib.util import logging import os +import re import subprocess import sys from concurrent.futures import ThreadPoolExecutor, as_completed @@ -80,6 +81,20 @@ def _collect_test_files(project_root: Path) -> list[Path]: return sorted({path.resolve() for path in test_files}) +def _coverage_command(*args: str) -> list[str]: + cmd = [sys.executable, "-m", "coverage"] + cmd.extend(args) + return cmd + + +def _coverage_env(project_root: Path) -> dict[str, str]: + env = os.environ.copy() + cov_config = project_root / "test" / ".coveragerc" + if cov_config.exists(): + env["COVERAGE_RCFILE"] = str(cov_config) + return env + + def _run_test_file( *, index: int, @@ -112,6 +127,8 @@ def _run_test_file( path_separator = ";" if sys.platform == "win32" else ":" env["PYTHONPATH"] = f"{project_root}{path_separator}{env.get('PYTHONPATH', '')}" env["COVERAGE_FILE"] = str(coverage_file) + if cov_config.exists(): + env["COVERAGE_PROCESS_START"] = str(cov_config) try: result = subprocess.run( @@ -142,6 +159,30 @@ def _run_test_file( } +def _parse_test_counts(result: dict) -> dict[str, int]: + counts = {"total": 0, "passed": 0, "failed": 0} + summary_lines = [ + line.strip() + for line in result["stdout"].splitlines() + if " in " in line and any(word in line for word in (" passed", " failed", " error", " errors")) + ] + if summary_lines: + summary = summary_lines[-1] + for key in ("passed", "failed"): + match = re.search(rf"(\d+)\s+{key}\b", summary) + if match: + counts[key] = int(match.group(1)) + error_match = re.search(r"(\d+)\s+errors?\b", summary) + if error_match: + counts["failed"] += int(error_match.group(1)) + counts["total"] = counts["passed"] + counts["failed"] + + if result["returncode"] != 0 and counts["failed"] == 0: + counts["failed"] = 1 + counts["total"] = max(counts["total"], 1) + return counts + + def _print_file_result(result: dict) -> None: summary = "execution failed" for line in reversed(result["stdout"].splitlines()): @@ -155,40 +196,105 @@ def _print_file_result(result: dict) -> None: logger.info("%-60s %s | %s", result["file"], status, summary) +def _print_test_summary(results: list[dict]) -> None: + total_tests = 0 + passed_tests = 0 + failed_tests = 0 + + logger.info("\n%s", "=" * 60) + logger.info("Test Summary") + logger.info("=" * 60) + for result in sorted(results, key=lambda item: item["file"]): + status = "PASSED" if result["returncode"] == 0 else "FAILED" + logger.info("%s - %s", status, result["file"]) + counts = _parse_test_counts(result) + total_tests += counts["total"] + passed_tests += counts["passed"] + failed_tests += counts["failed"] + + pass_rate = (passed_tests / total_tests * 100) if total_tests else 0 + logger.info("\nTest Results:") + logger.info(" Total Tests: %s", total_tests) + logger.info(" Passed: %s", passed_tests) + logger.info(" Failed: %s", failed_tests) + logger.info(" Pass Rate: %.1f%%", pass_rate) + + +def generate_error_report(results: list[dict]) -> None: + failed_results = [result for result in results if result["returncode"] != 0] + if not failed_results: + return + + logger.info("\n%s", "=" * 60) + logger.info("Test Error Report") + logger.info("=" * 60) + for index, result in enumerate(failed_results, start=1): + output = "\n".join(part for part in (result["stdout"], result["stderr"]) if part) + logger.info("\n%s. File: %s", index, result["file"]) + logger.info("-" * 40) + + error_lines: list[str] = [] + capture_error = False + for line in output.splitlines(): + stripped = line.strip() + if stripped.startswith("=") and ("ERROR" in line or "FAIL" in line): + capture_error = True + error_lines.append(line) + elif stripped.startswith("=== short test summary"): + error_lines.append(line) + break + elif capture_error: + error_lines.append(line) + + if not error_lines: + capture_traceback = False + for line in output.splitlines(): + if "Traceback" in line: + capture_traceback = True + if capture_traceback: + error_lines.append(line) + if len(error_lines) > 15: + error_lines.append("... (truncated) ...") + break + + if not error_lines: + output_lines = output.splitlines() + error_lines = output_lines[-10:] if len(output_lines) > 10 else output_lines + + for line in error_lines: + logger.info(line) + + logger.info("\n%s", "=" * 60) + logger.info("Total failed test files: %s", len(failed_results)) + logger.info("=" * 60) + + def _combine_coverage(current_dir: Path, project_root: Path) -> bool: coverage_data_file = current_dir / ".coverage" coverage_xml_file = current_dir / "coverage.xml" - cov_config = current_dir / ".coveragerc" for path in (coverage_data_file, coverage_xml_file): if path.exists(): path.unlink() - combine_cmd = [ - sys.executable, - "-m", - "coverage", + combine_cmd = _coverage_command( "combine", "--data-file", str(coverage_data_file), str(current_dir), - ] - xml_cmd = [ - sys.executable, - "-m", - "coverage", + ) + xml_cmd = _coverage_command( "xml", "-o", str(coverage_xml_file), "--data-file", str(coverage_data_file), - ] - if cov_config.exists(): - xml_cmd[4:4] = ["--rcfile=test/.coveragerc"] - combine = subprocess.run(combine_cmd, cwd=project_root, text=True, capture_output=True) + ) + coverage_env = _coverage_env(project_root) + combine = subprocess.run(combine_cmd, cwd=project_root, env=coverage_env, text=True, capture_output=True) if combine.returncode != 0: logger.error("Coverage combine failed:\n%s\n%s", combine.stdout, combine.stderr) return False - xml = subprocess.run(xml_cmd, cwd=project_root, text=True, capture_output=True) + xml = subprocess.run(xml_cmd, cwd=project_root, env=coverage_env, text=True, capture_output=True) if xml.returncode != 0: logger.error("Coverage XML generation failed:\n%s\n%s", xml.stdout, xml.stderr) return False @@ -196,6 +302,28 @@ def _combine_coverage(current_dir: Path, project_root: Path) -> bool: return True +def _report_coverage(current_dir: Path) -> bool: + coverage_data_file = current_dir / ".coverage" + cov_config = current_dir / ".coveragerc" + try: + import coverage + + cov = coverage.Coverage( + data_file=str(coverage_data_file), + config_file=str(cov_config) if cov_config.exists() else True, + ) + cov.load() + total_coverage = cov.report(show_missing=True) + logger.info("\nTotal Coverage: %.1f%%", total_coverage) + html_dir = current_dir / "coverage_html" + cov.html_report(directory=str(html_dir)) + logger.info("\nHTML coverage report generated in: %s", html_dir) + except Exception as exc: + logger.error("Coverage report failed: %s", exc) + return False + return True + + def run_tests() -> bool: current_dir = Path(__file__).resolve().parent project_root = current_dir.parent @@ -244,13 +372,17 @@ def run_tests() -> bool: results.append(result) _print_file_result(result) + _print_test_summary(results) failed = [result for result in results if result["returncode"] != 0] if failed: logger.error("\nFailed test files: %s", len(failed)) for result in failed[:10]: logger.error("\n%s\n%s\n%s", result["file"], result["stdout"][-4000:], result["stderr"][-2000:]) + generate_error_report(results) coverage_ok = _combine_coverage(current_dir, project_root) + if coverage_ok: + coverage_ok = _report_coverage(current_dir) return not failed and coverage_ok