Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions roar/application/run/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,17 @@ def execute_and_report(
return 1

presenter = ConsolePresenter()
report = RunReportPresenter(presenter)
report.show_report(result, command, quiet)
report = RunReportPresenter(presenter, quiet=quiet)

# The coordinator already emitted the lifecycle lines (trace start/end,
# hashing spinner, lineage captured). Here we emit the summary block and
# the final "done" line.
report.summary(result, command)
report.done(
exit_code=result.exit_code,
trace_duration=result.duration,
post_duration=result.post_duration,
)

if result.stale_upstream or result.stale_downstream:
report.show_stale_warnings(
Expand Down
17 changes: 17 additions & 0 deletions roar/core/models/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class TracerResult(ImmutableModel):
tracer_log_path: Annotated[str, Field(min_length=1)]
inject_log_path: str
interrupted: bool = False
backend: str | None = None # "ebpf" | "preload" | "ptrace"


class RunContext(RoarBaseModel):
Expand Down Expand Up @@ -109,6 +110,22 @@ class RunResult(ImmutableModel):
is_build: bool = False
stale_upstream: list[int] = Field(default_factory=list)
stale_downstream: list[int] = Field(default_factory=list)
# UX metadata for the new run presenter. Optional so callers that build
# RunResult without these fields (older code paths, error cases) still work.
backend: str | None = None
post_duration: float = 0.0
proxy_active: bool = False
pip_count: int = 0
dpkg_count: int = 0
env_count: int = 0
git_branch: str | None = None
git_short_commit: str | None = None
git_clean: bool = True
total_hash_bytes: int = 0
hash_duration: float = 0.0
dag_jobs: int = 0
dag_artifacts: int = 0
dag_depth: int = 0

@computed_field # type: ignore[prop-decorator]
@property
Expand Down
187 changes: 154 additions & 33 deletions roar/execution/runtime/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import os
import secrets
import subprocess
import sys
import time
from typing import TYPE_CHECKING, Any
Expand Down Expand Up @@ -109,6 +110,12 @@ def execute(self, ctx: RunContext) -> RunResult:
# Backup previous outputs if reversibility is enabled
self._backup_previous_outputs(ctx)

# UX presenter for lifecycle output (stderr). Pipes through to existing
# IPresenter for any legacy print_error calls.
from ...presenters.run_report import RunReportPresenter

run_presenter = RunReportPresenter(self.presenter, quiet=ctx.quiet)

# Start proxy if configured
proxy_handle = None
extra_env: dict[str, str] = {
Expand Down Expand Up @@ -151,6 +158,25 @@ def stop_proxy_if_running() -> list:
# Execute via tracer
from ...core.exceptions import TracerNotFoundError

# Resolve the backend name before execution so the trace_starting
# line can show the actual tracer, not "auto". Mirror the same
# resolution logic that execute() uses (config → override → auto).
proxy_active = proxy_handle is not None
resolved_mode: str | None = None
try:
mode = ctx.tracer_mode or self._tracer._get_tracer_mode()
fallback = (
ctx.tracer_fallback
if ctx.tracer_fallback is not None
else self._tracer._get_fallback_enabled()
)
candidates = self._tracer._get_tracer_candidates(mode, fallback)
if candidates:
resolved_mode = candidates[0][0]
except Exception:
pass
run_presenter.trace_starting(resolved_mode or ctx.tracer_mode, proxy_active)

self.logger.debug("Starting tracer execution")
try:
tracer_result = self._tracer.execute(
Expand All @@ -162,6 +188,7 @@ def stop_proxy_if_running() -> list:
tracer_mode_override=ctx.tracer_mode,
fallback_enabled_override=ctx.tracer_fallback,
)
run_presenter.trace_ended(tracer_result.duration, tracer_result.exit_code)
self.logger.debug(
"Tracer completed: exit_code=%d, duration=%.2fs, interrupted=%s",
tracer_result.exit_code,
Expand Down Expand Up @@ -220,39 +247,35 @@ def stop_proxy_if_running() -> list:
is_build=is_build,
)

# Post-processing with progress spinner
from ...presenters.spinner import Spinner

with Spinner("Sniffing out provenance...", quiet=ctx.quiet) as spin:
# Collect provenance
self.logger.debug("Collecting provenance data")
inject_log = (
tracer_result.inject_log_path
if os.path.exists(tracer_result.inject_log_path)
else None
)
roar_dir = os.path.join(ctx.repo_root, ".roar")
provenance_service = ProvenanceService(cache_dir=roar_dir)
t_prov_start = time.perf_counter()
prov = provenance_service.collect(
ctx.repo_root,
tracer_result.tracer_log_path,
inject_log,
config,
)
t_prov_end = time.perf_counter()
self.logger.debug(
"Provenance collected: read_files=%d, written_files=%d",
len(prov.get("data", {}).get("read_files", [])),
len(prov.get("data", {}).get("written_files", [])),
)

# Stop proxy and collect S3 entries before DB recording.
s3_entries = stop_proxy_if_running()
# Collect provenance first (fast) so we know total file count for the
# hashing spinner.
self.logger.debug("Collecting provenance data")
inject_log = (
tracer_result.inject_log_path if os.path.exists(tracer_result.inject_log_path) else None
)
roar_dir = os.path.join(ctx.repo_root, ".roar")
provenance_service = ProvenanceService(cache_dir=roar_dir)
t_prov_start = time.perf_counter()
prov = provenance_service.collect(
ctx.repo_root,
tracer_result.tracer_log_path,
inject_log,
config,
)
t_prov_end = time.perf_counter()
n_read = len(prov.get("data", {}).get("read_files", []))
n_written = len(prov.get("data", {}).get("written_files", []))
self.logger.debug(
"Provenance collected: read_files=%d, written_files=%d",
n_read,
n_written,
)

spin.update("Hashing files and TReqing outputs...")
# Stop proxy and collect S3 entries before DB recording.
s3_entries = stop_proxy_if_running()

# Record in database
total_files = n_read + n_written
with run_presenter.hashing(total=total_files or None):
self.logger.debug("Recording job in database")
t_record_start = time.perf_counter()
job_id, job_uid, read_file_info, written_file_info, stale_upstream, stale_downstream = (
Expand All @@ -275,12 +298,20 @@ def stop_proxy_if_running() -> list:
len(written_file_info),
)

spin.update("Auto-lineaging done, tidying up...")

# Cleanup temp files
self.logger.debug("Cleaning up temporary log files")
self._cleanup_logs(tracer_result.tracer_log_path, tracer_result.inject_log_path)

# "hashed" throughput line.
total_hash_bytes = sum(f.get("size") or 0 for f in written_file_info)
hash_duration = t_record_end - t_record_start
run_presenter.hashed(
n_artifacts=len(read_file_info) + len(written_file_info),
total_bytes=total_hash_bytes,
duration=hash_duration,
)
run_presenter.lineage_captured()

t_postrun_end = time.perf_counter()

if emit_timing:
Expand All @@ -304,6 +335,82 @@ def stop_proxy_if_running() -> list:
tracer_result.exit_code,
tracer_result.duration,
)
# UX metadata for the new run presenter.
exec_packages = prov.get("executables", {}).get("packages", {}) or {}
pip_count = len(exec_packages.get("pip", {}) or {})
dpkg_count = len(exec_packages.get("dpkg", {}) or {})
env_count = len(prov.get("runtime", {}).get("env_vars", {}) or {})
post_duration = t_postrun_end - t_postrun_start
backend_name = getattr(tracer_result, "backend", None)
if not isinstance(backend_name, str):
backend_name = None

# Git info (best-effort, never fail the run for this).
git_branch, git_short_commit, git_clean = None, None, True
try:
git_branch = (
subprocess.check_output(
["git", "rev-parse", "--abbrev-ref", "HEAD"],
stderr=subprocess.DEVNULL,
text=True,
cwd=ctx.repo_root,
).strip()
or None
)
git_short_commit = (
subprocess.check_output(
["git", "rev-parse", "--short", "HEAD"],
stderr=subprocess.DEVNULL,
text=True,
cwd=ctx.repo_root,
).strip()
or None
)
except Exception:
pass

# DAG stats + parent job lookup (best-effort, never fail the run).
dag_jobs, dag_artifacts, dag_depth = 0, 0, 0
try:
from ...db.context import create_database_context as _create_db_ctx
from ...presenters.dag_data_builder import DagDataBuilder

with _create_db_ctx(ctx.roar_dir) as db_ctx:
session = db_ctx.sessions.get_active()
if session:
builder = DagDataBuilder(db_ctx, int(session["id"]))
dag_data = builder.build(expanded=False)
dag_jobs = len(dag_data.get("nodes", []))
dag_artifacts = len(dag_data.get("artifacts", []))
# Compute depth: longest dependency chain.
nodes = dag_data.get("nodes", [])
if nodes:
step_deps = {n["step_number"]: n.get("dependencies", []) for n in nodes}
all_steps = set(step_deps)
memo: dict[int, int] = {}

def _depth(s: int) -> int:
if s in memo:
return memo[s]
children = [x for x in all_steps if s in step_deps.get(x, [])]
d = 1 + max((_depth(ch) for ch in children), default=0)
memo[s] = d
return d

roots = [s for s in all_steps if not step_deps.get(s)]
dag_depth = max((_depth(r) for r in roots), default=1) if roots else 1

# Parent job UIDs for input artifacts.
for inp in read_file_info:
aid = inp.get("artifact_id")
if aid:
jobs_info = db_ctx.artifacts.get_jobs(aid)
producers = jobs_info.get("produced_by", [])
if producers:
inp["parent_job_uid"] = producers[0].get("job_uid")
except Exception:
pass

return RunResult(
exit_code=tracer_result.exit_code,
job_id=job_id,
Expand All @@ -315,6 +422,20 @@ def stop_proxy_if_running() -> list:
is_build=is_build,
stale_upstream=stale_upstream,
stale_downstream=stale_downstream,
backend=backend_name,
post_duration=post_duration,
proxy_active=proxy_active,
pip_count=pip_count,
dpkg_count=dpkg_count,
env_count=env_count,
git_branch=git_branch,
git_short_commit=git_short_commit,
git_clean=git_clean,
total_hash_bytes=total_hash_bytes,
hash_duration=hash_duration,
dag_jobs=dag_jobs,
dag_artifacts=dag_artifacts,
dag_depth=dag_depth,
)

def _record_job(
Expand Down
3 changes: 3 additions & 0 deletions roar/execution/runtime/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,10 @@ def execute(
signal_handler.install()

exit_code = 1
selected_backend: str | None = None
try:
for idx, (backend, tracer_path) in enumerate(candidates):
selected_backend = backend
# Ensure stale files from previous attempts don't mask failures.
for log_file in (tracer_log_file, inject_log_file):
try:
Expand Down Expand Up @@ -368,6 +370,7 @@ def execute(
tracer_log_path=tracer_log_file,
inject_log_path=inject_log_file,
interrupted=signal_handler.is_interrupted(),
backend=selected_backend,
)

def get_log_paths(self, roar_dir: Path) -> tuple:
Expand Down
Loading
Loading