Skip to content
124 changes: 124 additions & 0 deletions tools/evaluation/extract_evaluation_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#!/usr/bin/env python3
"""
Dynamatic evaluation data extraction script

Extracts key metrics from evaluation data and writes it to a CSV file.
"""

import argparse
import csv
import re
import sys
from pathlib import Path


def parse_sim_report(path: Path):
    """Extract (cycle_count, passed) from a simulation report.txt.

    cycle_count is None when no latency line is found; passed reflects
    whether the C/VHDL co-simulation match marker is present.
    """
    contents = path.read_text()
    match = re.search(r"Simulation done!\s+Latency\s*=\s*(\d+)\s+cycles", contents)
    latency = None if match is None else int(match.group(1))
    return latency, "C and VHDL outputs match" in contents


def parse_utilization(path: Path):
    """Return (slices, luts, ffs) from a utilization report.

    Rows look like "| Slice LUTs | 4029 | ...". Patterns are tried
    most-specific first so the bare "| Slice |" row does not shadow
    the LUT/register rows; any missing metric stays None.
    """
    extractors = (
        ("luts", re.compile(r"\|\s*Slice LUTs\s*\|\s*(\d+)")),
        ("ffs", re.compile(r"\|\s*Slice Registers\s*\|\s*(\d+)")),
        ("slices", re.compile(r"\|\s*Slice\s*\|\s*(\d+)")),
    )
    found = {"slices": None, "luts": None, "ffs": None}
    for row in path.read_text().splitlines():
        for key, pattern in extractors:
            if m := pattern.match(row):
                found[key] = int(m.group(1))
                break
    return found["slices"], found["luts"], found["ffs"]


def parse_timing(path: Path):
    """Return (cp_ns, slack_ns, cp_src, cp_dst) from a timing report.

    Each metric comes from the last matching line in the report; metrics
    with no matching line are returned as None.
    """
    extractors = (
        ("slack_ns", re.compile(r"\s*Slack\s*\([^)]*\)\s*:\s*(-?[\d.]+)ns"), float),
        ("cp_ns", re.compile(r"\s*Data Path Delay:\s*([\d.]+)ns"), float),
        ("cp_src", re.compile(r"\s*Source:\s*(\S+)"), str),
        ("cp_dst", re.compile(r"\s*Destination:\s*(\S+)"), str),
    )
    found: dict = {"cp_ns": None, "slack_ns": None, "cp_src": None, "cp_dst": None}
    for line in path.read_text().splitlines():
        for key, pattern, convert in extractors:
            if m := pattern.match(line):
                found[key] = convert(m.group(1))
                break
    return found["cp_ns"], found["slack_ns"], found["cp_src"], found["cp_dst"]


def main():
    """Emit one CSV row (on stdout) per kernel directory under eval_dir.

    Each kernel directory is expected to contain the reports produced by
    run_evaluation.py (out/sim/report.txt, out/synth/*.rpt). Exits with
    status 1, after a message on stderr, if any report is missing or a
    kernel's C and VHDL simulation outputs do not match.
    """
    parser = argparse.ArgumentParser(
        description="Extract evaluation metrics from run_evaluation.py output directory."
    )
    parser.add_argument("eval_dir", help="Path to the evaluation output directory")
    args = parser.parse_args()

    eval_dir = Path(args.eval_dir)
    if not eval_dir.is_dir():
        print(f"Error: {eval_dir} is not a directory", file=sys.stderr)
        sys.exit(1)

    def require(path: Path) -> None:
        # All reports are mandatory; fail fast with a uniform message
        # instead of repeating the same existence check for each file.
        if not path.exists():
            print(f"Error: {path} not found", file=sys.stderr)
            sys.exit(1)

    columns = [
        "kernel",
        "cycle_count",
        "utilization_slice",
        "utilization_lut",
        "utilization_ff",
        "timing_cp_ns",
        "timing_slack_ns",
        "timing_cp_src",
        "timing_cp_dst",
    ]

    writer = csv.DictWriter(sys.stdout, fieldnames=columns)
    writer.writeheader()

    for kernel_dir in sorted(eval_dir.iterdir()):
        if not kernel_dir.is_dir():
            continue
        kernel = kernel_dir.name

        row: dict[str, object] = {"kernel": kernel}

        # Simulation report: cycle count + C/VHDL co-simulation match check.
        sim_report = kernel_dir / "out" / "sim" / "report.txt"
        require(sim_report)
        cycle_count, passed = parse_sim_report(sim_report)
        if not passed:
            print(f"Error: {kernel}: C and VHDL outputs do not match", file=sys.stderr)
            sys.exit(1)
        row["cycle_count"] = cycle_count

        # Post-place-and-route utilization report.
        util_rpt = kernel_dir / "out" / "synth" / "utilization_post_pr.rpt"
        require(util_rpt)
        slices, luts, ffs = parse_utilization(util_rpt)
        row["utilization_slice"] = slices
        row["utilization_lut"] = luts
        row["utilization_ff"] = ffs

        # Post-place-and-route timing report.
        timing_rpt = kernel_dir / "out" / "synth" / "timing_post_pr.rpt"
        require(timing_rpt)
        cp_ns, slack_ns, cp_src, cp_dst = parse_timing(timing_rpt)
        row["timing_cp_ns"] = cp_ns
        row["timing_slack_ns"] = slack_ns
        row["timing_cp_src"] = cp_src
        row["timing_cp_dst"] = cp_dst

        writer.writerow(row)


if __name__ == "__main__":
main()
226 changes: 226 additions & 0 deletions tools/evaluation/run_evaluation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
#!/usr/bin/env python3
"""
Dynamatic performance/resource-usage evaluation script.

Builds Dynamatic, then runs the embedded .dyn script for each kernel,
collecting exit codes and logging failures. Supports parallel execution through
the -j flag.
"""

import argparse
import logging
import shutil
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

# ──────────────────────────────────────────────────────────────────────────────
# Kernel list
# ──────────────────────────────────────────────────────────────────────────────
# Each name corresponds to a directory under integration-test/ that must
# contain a {kernel}.c source file (see run_kernel).
KERNELS = [
    "atax",
    "atax_float",
    "bicg",
    "bicg_float",
    "covariance",
    "gaussian",
    "gemver",
    "gemver_float",
    "get_tanh",
    "histogram",
    "insertion_sort",
    "jacobi_1d_imper",
    "kernel_2mm",
    "kernel_2mm_float",
    "kernel_3mm",
    "kernel_3mm_float",
    "kmp",
    "loop_array",
    "lu",
    "matching",
    "matching_2",
    "matrix_power",
    "pivot",
    "polyn_mult",
    "symm_float",
    "syr2k_float",
    "threshold",
    "triangular",
    "while_loop_1",
]

# ──────────────────────────────────────────────────────────────────────────────
# .dyn script template: {src} is substituted with the kernel source path
# ──────────────────────────────────────────────────────────────────────────────
# Fed to bin/dynamatic on stdin; ends with `exit` so the interactive
# frontend terminates after the last step.
DYN_SCRIPT = """\
set-src {src}
set-clock-period 5
compile --buffer-algorithm fpga20
write-hdl --hdl vhdl
simulate
synthesize
exit
"""

# ──────────────────────────────────────────────────────────────────────────────
# Paths
# ──────────────────────────────────────────────────────────────────────────────
# This file lives at tools/evaluation/, so two parents up is the repo root.
REPO_ROOT = Path(__file__).resolve().parents[2]


def setup_logging() -> None:
    """Configure root logging: INFO level, timestamped lines on stdout."""
    log_format = "%(asctime)s %(levelname)-8s %(message)s"
    logging.basicConfig(
        stream=sys.stdout,
        level=logging.INFO,
        datefmt="%H:%M:%S",
        format=log_format,
    )


def build() -> None:
    """Build Dynamatic with ninja; exit the script if the build fails."""
    logging.info("Building Dynamatic...")
    status = subprocess.run(["ninja", "-C", "build"], cwd=REPO_ROOT).returncode
    if status != 0:
        # Propagate ninja's exit code as our own.
        logging.error("Build failed with exit code %d. Aborting...", status)
        sys.exit(status)
    logging.info("Build succeeded.")


def run_kernel(kernel: str) -> tuple[str, str | None]:
    """
    Run the .dyn script for *kernel*, writing stdout/stderr directly to
    integration-test/{kernel}/out/dynamatic_{out,err}.txt, and return
    (kernel, failure_reason). failure_reason is None on success.
    """

    def fail(reason: str) -> tuple[str, str]:
        # Single place for failure logging so all [FAIL] lines are uniform.
        logging.error("[FAIL] %s (%s)", kernel, reason)
        return kernel, reason

    logging.info("Running kernel %s...", kernel)
    src = f"integration-test/{kernel}/{kernel}.c"
    script = DYN_SCRIPT.format(src=src)

    out_dir = REPO_ROOT / "integration-test" / kernel / "out"
    # Ensure a clean output directory for the kernel.
    if out_dir.exists():
        shutil.rmtree(out_dir)
    out_dir.mkdir(parents=True)

    out_path = out_dir / "dynamatic_out.txt"
    err_path = out_dir / "dynamatic_err.txt"

    with open(out_path, "w") as out_f, open(err_path, "w") as err_f:
        result = subprocess.run(
            ["bin/dynamatic"],
            input=script,
            text=True,
            stdout=out_f,
            stderr=err_f,
            cwd=REPO_ROOT,
        )

    if result.returncode != 0:
        return fail(f"exit code {result.returncode}")

    # Check stdout for FATAL messages; the tool can print these even when
    # it exits with status 0, so a returncode check alone is not enough.
    if "FATAL" in out_path.read_text(errors="replace"):
        return fail("FATAL in stdout")

    # Check the simulation report produced by the `simulate` step.
    report_path = out_dir / "sim" / "report.txt"
    if not report_path.exists():
        return fail("sim/report.txt not found")

    if "C and VHDL outputs match" not in report_path.read_text(errors="replace"):
        return fail('sim/report.txt missing "C and VHDL outputs match"')

    logging.info("[PASS] %s", kernel)
    return kernel, None


def main() -> None:
    """Build Dynamatic, run all kernels (optionally in parallel), summarize.

    Exits non-zero if the build fails, the output directory already exists,
    or any kernel fails.
    """
    setup_logging()

    parser = argparse.ArgumentParser(
        description="Run Dynamatic evaluation for a list of kernels."
    )
    parser.add_argument(
        "-j",
        "--jobs",
        type=int,
        default=1,
        metavar="JOBS",
        help="Number of kernels to run in parallel (default: 1).",
    )
    parser.add_argument(
        "-o",
        "--output-dir",
        type=Path,
        default=None,
        metavar="DIR",
        help="Copy kernel out/ directories to DIR/{kernel}/out after each run.",
    )
    args = parser.parse_args()

    # Validate early: ThreadPoolExecutor would otherwise raise an opaque
    # ValueError for jobs < 1 after the (potentially long) build step.
    if args.jobs < 1:
        parser.error("--jobs must be >= 1")

    if args.output_dir is not None:
        # Refuse to mix results into a pre-existing directory.
        if args.output_dir.exists():
            logging.error("Output directory %s already exists. Aborting...", args.output_dir)
            sys.exit(1)
        args.output_dir.mkdir(parents=True)

    build()

    logging.info("Running %d kernel(s) with %d parallel job(s)...", len(KERNELS), args.jobs)
    start_time = time.time()

    passed: list[str] = []
    failed: list[tuple[str, str | None]] = []

    with ThreadPoolExecutor(max_workers=args.jobs) as executor:
        futures = {executor.submit(run_kernel, k): k for k in KERNELS}
        for future in as_completed(futures):
            kernel, failure_reason = future.result()
            if failure_reason is None:
                passed.append(kernel)
            else:
                failed.append((kernel, failure_reason))

            if args.output_dir is not None:
                src = REPO_ROOT / "integration-test" / kernel / "out"
                dst = args.output_dir / kernel / "out"
                # Create DIR/{kernel} first so shutil.move can use a fast
                # rename instead of silently falling back to copy + delete.
                dst.parent.mkdir(parents=True, exist_ok=True)
                shutil.move(src, dst)

    # Summary (sub-minute remainder of the elapsed time is dropped).
    elapsed = int(time.time() - start_time) // 60
    hours = elapsed // 60
    minutes = elapsed % 60
    logging.info("Total time: %dh %02dm.", hours, minutes)

    total = len(KERNELS)
    logging.info(
        "Results: %d/%d passed, %d failed.",
        len(passed),
        total,
        len(failed),
    )
    if failed:
        logging.error(
            "Failed kernels: %s",
            ", ".join(f"{k} ({reason})" for k, reason in sorted(failed)),
        )
        sys.exit(1)


if __name__ == "__main__":
main()