diff --git a/cli/localci/cli/run/__init__.py b/cli/localci/cli/run/__init__.py new file mode 100644 index 0000000..e2c8659 --- /dev/null +++ b/cli/localci/cli/run/__init__.py @@ -0,0 +1,6 @@ +"""``localci run`` command package.""" + +from localci.cli.run.cli import run +from localci.cli.run.patcher import _write_patched_workflow + +__all__ = ["run", "_write_patched_workflow"] diff --git a/cli/localci/cli/run/cli.py b/cli/localci/cli/run/cli.py new file mode 100644 index 0000000..6c43d1d --- /dev/null +++ b/cli/localci/cli/run/cli.py @@ -0,0 +1,60 @@ +"""Click entry point for ``localci run`` (options in ``click_options``, logic in ``run_flow``).""" + +from __future__ import annotations + +from pathlib import Path + +import click + +from localci.cli.run.click_options import run_options +from localci.cli.run.container import build_run_container +from localci.cli.run.params import RunOptions +from localci.cli.run.run_flow import execute_run + + +@click.command() +@run_options +def run( + ctx: click.Context, + workflow: str | None, + jobs: tuple[str, ...], + platform: str | None, + compiler: str | None, + matrix_filters: tuple[str, ...], + parallel: int | None, + timeout: int | None, + dry_run: bool, + no_cache: bool, + cache_dir: Path | None, + rebuild_image: bool, + keep_containers: bool | None, + interactive: bool, + verbose: bool, + github_token: str | None, + offline: bool, +) -> None: + """Execute selected jobs locally with parallel execution.""" + exit_code = execute_run( + cfg=ctx.obj["config"], + options=RunOptions( + workflow=workflow, + jobs=jobs, + platform=platform, + compiler=compiler, + matrix_filters=matrix_filters, + parallel=parallel, + timeout=timeout, + dry_run=dry_run, + no_cache=no_cache, + cache_dir=cache_dir, + rebuild_image=rebuild_image, + keep_containers=keep_containers, + interactive=interactive, + verbose=verbose, + github_token=github_token, + offline=offline, + ), + deps=build_run_container(), + ) + if exit_code: + ctx.exit(exit_code) diff --git a/cli/localci/cli/run/click_options.py b/cli/localci/cli/run/click_options.py new file mode 100644 index 0000000..faaee16 --- /dev/null +++ b/cli/localci/cli/run/click_options.py @@ -0,0 +1,106 @@ +"""Click option decorators for ``localci run`` (keeps ``cli.py`` entry ≤50 lines).""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any, Callable, TypeVar + +import click + +F = TypeVar("F", bound=Callable[..., Any]) + + +def run_options(command: F) -> F: + """Apply all ``localci run`` CLI options to *command*.""" + opts: list[Callable[[F], F]] = [ + click.option( + "--workflow", + "-w", + type=click.Path(exists=True), + default=None, + help="Workflow file (defaults to config value).", + ), + click.option( + "--job", + "-j", + "jobs", + multiple=True, + help="Job index or name (can specify multiple).", + ), + click.option( + "--platform", + "-p", + type=click.Choice(["linux", "windows", "macos"]), + default=None, + help="Run all jobs for a platform.", + ), + click.option("--compiler", type=str, default=None, help="Filter by compiler."), + click.option( + "--matrix", + "-m", + "matrix_filters", + multiple=True, + help="Matrix filter as key=value (repeatable).", + ), + click.option( + "--parallel", + type=int, + default=None, + help="Max parallel jobs (overrides config).", + ), + click.option( + "--timeout", + type=int, + default=None, + help="Job timeout in seconds (overrides config).", + ), + click.option( + "--dry-run", is_flag=True, help="Preview execution plan without running." + ), + click.option( + "--no-cache", + is_flag=True, + help="Disable build caching (ccache, boost, b2-source, cmake).", + ), + click.option( + "--cache-dir", + type=click.Path(path_type=Path, file_okay=False), + default=None, + help="Override cache root directory (default: config cache.directory).", + ), + click.option( + "--rebuild-image", is_flag=True, help="Force rebuild Docker image." + ), + click.option( + "--keep-containers/--no-keep-containers", + "keep_containers", + default=None, + help="Keep containers after execution (default: from config).", + ), + click.option( + "--interactive", "-i", is_flag=True, help="Interactive job selection." + ), + click.option( + "--verbose", "-v", is_flag=True, help="Show verbose act output." + ), + click.option( + "--github-token", + "-t", + "github_token", + type=str, + default=None, + help="GitHub token for API access (or set GITHUB_TOKEN env var).", + ), + click.option( + "--offline", + is_flag=True, + help=( + "Run in offline mode (no action downloads, requires pre-cached actions)." + ), + ), + click.pass_context, + ] + wrapped: F = command + for opt in reversed(opts): + wrapped = opt(wrapped) # type: ignore[assignment] + return wrapped diff --git a/cli/localci/cli/run/container.py b/cli/localci/cli/run/container.py new file mode 100644 index 0000000..d0031cd --- /dev/null +++ b/cli/localci/cli/run/container.py @@ -0,0 +1,53 @@ +"""Dependency injection container for ``localci run``.""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Callable + +from localci.core.boost_cache import ensure_boost_cache +from localci.core.ccache_stats import get_ccache_stats +from localci.core.config import LocalCIConfig, resolve_cache_paths +from localci.core.executor import JobExecutor +from localci.core.orchestrator import OrchestratorConfig, ParallelExecutionManager +from localci.core.progress import ProgressTracker +from localci.core.queue import PriorityConfig +from localci.core.queue_builder import QueueBuilder +from localci.core.workflow import WorkflowAnalyzer + + +@dataclass +class RunDependencies: + """Injectable collaborators for ``execute_run`` (pass via ``deps=`` in tests).""" + + workflow_analyzer: WorkflowAnalyzer + queue_builder_factory: Callable[..., QueueBuilder] + job_executor_factory: Callable[[Path], JobExecutor] + parallel_manager_factory: Callable[..., ParallelExecutionManager] + progress_tracker_factory: Callable[..., ProgressTracker] + orchestrator_config_factory: Callable[[LocalCIConfig], OrchestratorConfig] + priority_config_factory: Callable[[LocalCIConfig], PriorityConfig] + ensure_boost_cache_fn: Callable[..., bool] + get_ccache_stats_fn: Callable[..., str | None] + resolve_cache_paths_fn: Callable[..., object] + workflow_patcher: Callable[..., Path] + + +def build_run_container() -> RunDependencies: + """Construct the default production dependency graph for ``localci run``.""" + from localci.cli.run.patcher import _write_patched_workflow + + return RunDependencies( + workflow_analyzer=WorkflowAnalyzer(), + queue_builder_factory=QueueBuilder, + job_executor_factory=JobExecutor, + parallel_manager_factory=ParallelExecutionManager, + progress_tracker_factory=ProgressTracker, + orchestrator_config_factory=OrchestratorConfig.from_config, + priority_config_factory=PriorityConfig.from_config, + ensure_boost_cache_fn=ensure_boost_cache, + get_ccache_stats_fn=get_ccache_stats, + resolve_cache_paths_fn=resolve_cache_paths, + workflow_patcher=_write_patched_workflow, + ) diff --git a/cli/localci/cli/run/params.py b/cli/localci/cli/run/params.py new file mode 100644 index 0000000..92b02de --- /dev/null +++ b/cli/localci/cli/run/params.py @@ -0,0 +1,28 @@ +"""CLI option bundle for ``localci run``.""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path + + +@dataclass(frozen=True) +class RunOptions: + """Normalized options passed from the Click entry point to orchestration.""" + + workflow: str | None + jobs: tuple[str, ...] + platform: str | None + compiler: str | None + matrix_filters: tuple[str, ...] + parallel: int | None + timeout: int | None + dry_run: bool + no_cache: bool + cache_dir: Path | None + rebuild_image: bool + keep_containers: bool | None + interactive: bool + verbose: bool + github_token: str | None + offline: bool diff --git a/cli/localci/cli/run.py b/cli/localci/cli/run/patcher.py similarity index 50% rename from cli/localci/cli/run.py rename to cli/localci/cli/run/patcher.py index 470a0b0..5e8e2cb 100644 --- a/cli/localci/cli/run.py +++ b/cli/localci/cli/run/patcher.py @@ -1,8 +1,4 @@ -"""``localci run`` command. - -Execute selected jobs locally via the parallel execution manager -(queue + orchestrator) with Docker containers. -""" +"""Workflow patcher and dry-run plan output for `localci run`.""" from __future__ import annotations @@ -11,400 +7,8 @@ import tempfile from pathlib import Path -import click - -from localci.core.executor import ( - ActNotFoundError, - DockerNotAvailableError, - JobExecutor, -) -from localci.errors import WorkflowError -from localci.core.models import JobEvent, JobEventType -from localci.core.orchestrator import ( - OrchestratorConfig, - ParallelExecutionManager, -) -from localci.core.progress import ProgressTracker -from localci.core.queue import PriorityConfig -from localci.core.queue_builder import QueueBuilder -from localci.core.results import ExecutionSummary -from localci.core.workflow import MatrixEntry, Platform, WorkflowAnalyzer -from localci.core.github_token import resolve_github_token, warn_sentinel_github_token -from localci.core.boost_cache import ensure_boost_cache -from localci.core.ccache_stats import get_ccache_stats -from localci.core.config import resolve_cache_paths -from localci.utils.output import ( - console, - print_error, - print_info, - print_key_value, - print_success, - print_warning, -) - - -@click.command() -@click.option( - "--workflow", - "-w", - type=click.Path(exists=True), - default=None, - help="Workflow file (defaults to config value).", -) -@click.option( - "--job", - "-j", - "jobs", - multiple=True, - help="Job index or name (can specify multiple).", -) -@click.option( - "--platform", - "-p", - type=click.Choice(["linux", "windows", "macos"]), - default=None, - help="Run all jobs for a platform.", -) -@click.option("--compiler", type=str, default=None, help="Filter by compiler.") -@click.option( - "--matrix", - "-m", - "matrix_filters", - multiple=True, - help="Matrix filter as key=value (repeatable).", -) -@click.option( - "--parallel", - type=int, - default=None, - help="Max parallel jobs (overrides config).", -) -@click.option( - "--timeout", - type=int, - default=None, - help="Job timeout in seconds (overrides config).", -) -@click.option( - "--dry-run", is_flag=True, help="Preview execution plan without running." -) -@click.option("--no-cache", is_flag=True, help="Disable build caching (ccache, boost, b2-source, cmake).") -@click.option( - "--cache-dir", - type=click.Path(path_type=Path, file_okay=False), - default=None, - help="Override cache root directory (default: config cache.directory).", -) -@click.option( - "--rebuild-image", is_flag=True, help="Force rebuild Docker image." -) -@click.option( - "--keep-containers/--no-keep-containers", - "keep_containers", - default=None, - help="Keep containers after execution (default: from config).", -) -@click.option( - "--interactive", "-i", is_flag=True, help="Interactive job selection." -) -@click.option( - "--verbose", "-v", is_flag=True, help="Show verbose act output." -) -@click.option( - "--github-token", - "-t", - "github_token", - type=str, - default=None, - help="GitHub token for API access (or set GITHUB_TOKEN env var).", -) -@click.option( - "--offline", - is_flag=True, - help="Run in offline mode (no action downloads, requires pre-cached actions).", -) -@click.pass_context -def run( - ctx: click.Context, - workflow: str | None, - jobs: tuple[str, ...], - platform: str | None, - compiler: str | None, - matrix_filters: tuple[str, ...], - parallel: int | None, - timeout: int | None, - dry_run: bool, - no_cache: bool, - cache_dir: Path | None, - rebuild_image: bool, - keep_containers: bool | None, - interactive: bool, - verbose: bool, - github_token: str | None, - offline: bool, -) -> None: - """Execute selected jobs locally with parallel execution.""" - cfg = ctx.obj["config"] - - effective_timeout = timeout or cfg.execution.timeout - # parallel.max_jobs / --parallel cap concurrent act processes. On Windows with - # Docker Desktop + WSL2, many containers share one Linux VM; very high values - # can contend on CPU/RAM/disk (see Usage Guide: Windows, WSL2, parallelism). - effective_parallel = parallel or cfg.parallel.max_jobs - effective_keep_containers = ( - keep_containers if keep_containers is not None else cfg.execution.keep_containers - ) - workflow_path = Path(workflow) if workflow else cfg.workflow - project_dir = Path(".").resolve() - - gh_token = resolve_github_token(github_token) - if not offline: - warn_sentinel_github_token(gh_token) - - # ── 1. Parse the workflow ────────────────────────────────────── - try: - analyzer = WorkflowAnalyzer() - wf = analyzer.analyze(workflow_path) - except WorkflowError as exc: - print_error(str(exc)) - ctx.exit(1) - return - - # Collect (job_id, entry) pairs - all_pairs: list[tuple[str, MatrixEntry]] = [] - for job_id, job in wf.jobs.items(): - for entry in job.matrix: - all_pairs.append((job_id, entry)) - - if not all_pairs: - print_warning("No matrix entries found in workflow.") - return - - # ── 2. Warn about not-yet-implemented flags ───────────────────── - if rebuild_image: - print_warning("--rebuild-image is not yet implemented; ignoring.") - if interactive: - print_warning("--interactive is not yet implemented; ignoring.") - - # ── 3. Filter entries ────────────────────────────────────────── - plat_map = { - "linux": Platform.LINUX, - "windows": Platform.WINDOWS, - "macos": Platform.MACOS, - } - compiler_filter = compiler.lower() if compiler else None - selected: list[tuple[str, MatrixEntry]] = list(all_pairs) - if platform: - target_plat = plat_map.get(platform) - selected = [(jid, e) for jid, e in selected if e.platform == target_plat] - - if compiler_filter: - selected = [ - (jid, e) - for jid, e in selected - if e.compiler.family.value == compiler_filter - ] - - if jobs: - seen: set[tuple[str, int]] = set() - filtered_list: list[tuple[str, MatrixEntry]] = [] - for j in jobs: - try: - idx = int(j) - for jid, e in selected: - if e.index == idx and (jid, e.index) not in seen: - filtered_list.append((jid, e)) - seen.add((jid, e.index)) - continue - except ValueError: - pass # not a numeric job index; treat *j* as a name substring below - j_lower = j.lower() - for jid, e in selected: - if j_lower in e.name.lower() and (jid, e.index) not in seen: - filtered_list.append((jid, e)) - seen.add((jid, e.index)) - selected = filtered_list - - if not selected: - print_warning("No jobs match the given filters.") - return - - # Build queue via QueueBuilder - selected_set = {(jid, e.index) for jid, e in selected} - job_filter_list = list({jid for jid, _ in selected}) - plat_filter = plat_map.get(platform) if platform else None - # CLI --matrix key=value (repeatable) → single include filter dict; overrides config when set - cli_matrix_include: list[dict] | None = None - if matrix_filters: - cli_matrix_include = [{}] - for s in matrix_filters: - if "=" in s: - k, v = s.split("=", 1) - cli_matrix_include[0][k.strip()] = v.strip() - if not cli_matrix_include[0]: - cli_matrix_include = None - matrix_include = ( - cli_matrix_include - if cli_matrix_include - else ( - [f.model_dump(exclude_none=True) for f in cfg.matrix.include] - if cfg.matrix.include - else None - ) - ) - matrix_exclude = ( - [f.model_dump(exclude_none=True) for f in cfg.matrix.exclude] - if cfg.matrix.exclude - else None - ) - priority_config = PriorityConfig.from_config(cfg) - registry_path = project_dir / "image-registry.yml" - if not registry_path.exists(): - registry_path = None - builder = QueueBuilder(wf, priority_config=priority_config) - queue = builder.build( - platform_filter=plat_filter, - job_filter=job_filter_list, - compiler_filter=compiler_filter, - matrix_include=matrix_include, - matrix_exclude=matrix_exclude, - entries_include=selected_set, - registry_path=registry_path, - ) - - # ── 4. Dry-run mode ─────────────────────────────────────────── - if dry_run: - _print_execution_plan(queue, workflow_path, effective_timeout) - return - - # ── 5. Preflight checks ─────────────────────────────────────── - logs_dir = Path(cfg.logging.directory) - executor = JobExecutor(logs_dir=logs_dir) - try: - act_version = executor.check_act() - print_info(f"Using {act_version}") - except ActNotFoundError as exc: - print_error(str(exc)) - ctx.exit(1) - return - - try: - executor.check_docker() - except DockerNotAvailableError as exc: - print_error(str(exc)) - ctx.exit(1) - return - - # ── 5b. Phase 2: ensure Boost cache (clone/fetch when enabled) ─── - if not no_cache and cfg.cache.enabled and cfg.cache.boost.enabled: - if not ensure_boost_cache(cfg.cache, no_cache, cache_dir): - print_warning("Boost cache setup failed; jobs will clone Boost from scratch.") - - # ── 6. Execute via orchestrator ──────────────────────────────── - orch_config = OrchestratorConfig.from_config(cfg) - orch_config.max_parallel = effective_parallel - orch_config.job_timeout = effective_timeout - orch_config.keep_containers = effective_keep_containers - orch_config.default_secrets = {"GITHUB_TOKEN": gh_token} - # Match feature/cache-main-install behavior: noninteractive apt so "Install packages" never hangs - orch_config.default_env = {"DEBIAN_FRONTEND": "noninteractive"} - orch_config.image_registry_path = cfg.images.registry - orch_config.verbose = verbose - orch_config.offline = offline - orch_config.auto_build = cfg.images.auto_build - orchestrator = ParallelExecutionManager( - queue=queue, - workflow_file=workflow_path, - project_dir=project_dir, - config=orch_config, - logs_dir=logs_dir, - workflow_patcher=_write_patched_workflow, - cache_config=cfg.cache, - no_cache=no_cache, - cache_dir_override=cache_dir, - ) - - status_file = logs_dir / "last-status.json" - tracker = ProgressTracker( - queue=queue, - workflow_file=str(workflow_path), - platform=platform or "linux", - max_parallel=effective_parallel, - status_file=status_file, - ) - for job in queue.get_all_jobs(): - tracker.on_event( - JobEvent(event_type=JobEventType.JOB_QUEUED, job=job) - ) - - orchestrator.add_listener(tracker.on_event) - - if not no_cache and cfg.cache.enabled: - cache_parts = [] - if cfg.cache.ccache.enabled: - cache_parts.append("ccache") - if cfg.cache.boost.enabled: - cache_parts.append("boost") - if cfg.cache.cmake.enabled: - cache_parts.append("cmake") - if getattr(cfg.cache.boost, "build_dir", True): - cache_parts.append("b2-source") - if getattr(cfg.cache, "apt", None) and getattr(cfg.cache.apt, "enabled", True): - cache_parts.append("apt") - if cache_parts: - print_info(f"Cache enabled: {', '.join(cache_parts)}. Use --no-cache to disable.") - - tracker.start_live() - try: - run = orchestrator.execute() - finally: - tracker.stop_live() - - tracker.set_execution_id(run.execution_id) - tracker.write_status_file() - - # ── 7. Summary ──────────────────────────────────────────────── - summary = ExecutionSummary( - execution_id=run.execution_id, - started_at=run.started_at, - finished_at=run.finished_at, - results=list(run.results.values()), - ) - tracker.print_summary(run) - - # Issue 9: ccache stats after run (when cache enabled) - if not no_cache and cfg.cache.enabled and cfg.cache.ccache.enabled: - resolved = resolve_cache_paths( - cfg.cache, no_cache, cache_dir, None, None - ) - if resolved and resolved.ccache_host is not None: - stats = get_ccache_stats(resolved.ccache_host) - if stats: - print_info("ccache stats:") - for line in stats.splitlines(): - console.print(f" {line}") - - # Save results: both last-run.json and {execution_id}.json so - # status --execution-id X and logs -e X can find this run - logs_dir = cfg.logging.directory - last_run_file = logs_dir / "last-run.json" - execution_file = logs_dir / f"{summary.execution_id}.json" - try: - summary.save(last_run_file) - summary.save(execution_file) - print_info(f"Results saved to {last_run_file}") - print_info(f"Execution ID: {summary.execution_id} (use with status -e or logs -e)") - except OSError as exc: - print_warning(f"Could not save results: {exc}") - except (TypeError, ValueError) as exc: - print_warning(f"Could not serialize results for save: {exc}") - - if not summary.all_passed: - ctx.exit(1) - return - - -# ─── Helpers ─────────────────────────────────────────────────────── +from localci.core.workflow import MatrixEntry +from localci.utils.output import console, print_info, print_key_value def _print_execution_plan(queue, workflow_path: Path, timeout: int) -> None: diff --git a/cli/localci/cli/run/run_flow.py b/cli/localci/cli/run/run_flow.py new file mode 100644 index 0000000..f070417 --- /dev/null +++ b/cli/localci/cli/run/run_flow.py @@ -0,0 +1,332 @@ +"""Testable orchestration logic for ``localci run`` (not ``localci.core.orchestrator``).""" + +from __future__ import annotations + +from pathlib import Path + +from localci.cli.run.container import RunDependencies, build_run_container +from localci.cli.run.params import RunOptions +from localci.cli.run.patcher import _print_execution_plan +from localci.core.config import LocalCIConfig +from localci.core.executor import ActNotFoundError, DockerNotAvailableError +from localci.core.github_token import resolve_github_token, warn_sentinel_github_token +from localci.core.models import JobEvent, JobEventType +from localci.core.results import ExecutionSummary +from localci.core.workflow import MatrixEntry, Platform +from localci.errors import WorkflowError +from localci.utils.output import print_error, print_info, print_warning + +_PLATFORM_MAP = { + "linux": Platform.LINUX, + "windows": Platform.WINDOWS, + "macos": Platform.MACOS, +} + + +def execute_run( + *, + cfg: LocalCIConfig, + options: RunOptions, + deps: RunDependencies | None = None, +) -> int: + """Run the full ``localci run`` flow; return process exit code (0 = success).""" + container = deps or build_run_container() + + effective_timeout = options.timeout or cfg.execution.timeout + effective_parallel = options.parallel or cfg.parallel.max_jobs + effective_keep_containers = ( + options.keep_containers + if options.keep_containers is not None + else cfg.execution.keep_containers + ) + workflow_path = Path(options.workflow) if options.workflow else cfg.workflow + project_dir = Path(".").resolve() + gh_token = resolve_github_token(options.github_token) + if not options.offline: + warn_sentinel_github_token(gh_token) + + try: + wf = container.workflow_analyzer.analyze(workflow_path) + except WorkflowError as exc: + print_error(str(exc)) + return 1 + + all_pairs = _collect_matrix_pairs(wf) + if not all_pairs: + print_warning("No matrix entries found in workflow.") + return 0 + + if options.rebuild_image: + print_warning("--rebuild-image is not yet implemented; ignoring.") + if options.interactive: + print_warning("--interactive is not yet implemented; ignoring.") + + selected = _filter_matrix_pairs( + all_pairs, + platform=options.platform, + compiler=options.compiler, + jobs=options.jobs, + ) + if not selected: + print_warning("No jobs match the given filters.") + return 0 + + priority_config = container.priority_config_factory(cfg) + registry_path = project_dir / "image-registry.yml" + if not registry_path.exists(): + registry_path = None + + matrix_include, matrix_exclude = _resolve_matrix_filters(options, cfg) + plat_filter = _PLATFORM_MAP.get(options.platform) if options.platform else None + compiler_filter = options.compiler.lower() if options.compiler else None + selected_set = {(jid, e.index) for jid, e in selected} + job_filter_list = list({jid for jid, _ in selected}) + + builder = container.queue_builder_factory(wf, priority_config=priority_config) + queue = builder.build( + platform_filter=plat_filter, + job_filter=job_filter_list, + compiler_filter=compiler_filter, + matrix_include=matrix_include, + matrix_exclude=matrix_exclude, + entries_include=selected_set, + registry_path=registry_path, + ) + + if options.dry_run: + _print_execution_plan(queue, workflow_path, effective_timeout) + return 0 + + logs_dir = Path(cfg.logging.directory) + executor = container.job_executor_factory(logs_dir) + preflight_code = _run_preflight(executor) + if preflight_code != 0: + return preflight_code + + if ( + not options.no_cache + and cfg.cache.enabled + and cfg.cache.boost.enabled + ): + if not container.ensure_boost_cache_fn( + cfg.cache, options.no_cache, options.cache_dir + ): + print_warning( + "Boost cache setup failed; jobs will clone Boost from scratch." + ) + + orch_config = container.orchestrator_config_factory(cfg) + orch_config.max_parallel = effective_parallel + orch_config.job_timeout = effective_timeout + orch_config.keep_containers = effective_keep_containers + orch_config.default_secrets = {"GITHUB_TOKEN": gh_token} + orch_config.default_env = {"DEBIAN_FRONTEND": "noninteractive"} + orch_config.image_registry_path = cfg.images.registry + orch_config.verbose = options.verbose + orch_config.offline = options.offline + orch_config.auto_build = cfg.images.auto_build + + parallel_manager = container.parallel_manager_factory( + queue=queue, + workflow_file=workflow_path, + project_dir=project_dir, + config=orch_config, + logs_dir=logs_dir, + workflow_patcher=container.workflow_patcher, + cache_config=cfg.cache, + no_cache=options.no_cache, + cache_dir_override=options.cache_dir, + ) + + status_file = logs_dir / "last-status.json" + tracker = container.progress_tracker_factory( + queue=queue, + workflow_file=str(workflow_path), + platform=options.platform or "linux", + max_parallel=effective_parallel, + status_file=status_file, + ) + for job in queue.get_all_jobs(): + tracker.on_event( + JobEvent(event_type=JobEventType.JOB_QUEUED, job=job) + ) + parallel_manager.add_listener(tracker.on_event) + + _print_cache_enabled_message(cfg, options.no_cache) + + tracker.start_live() + try: + run_result = parallel_manager.execute() + finally: + tracker.stop_live() + + tracker.set_execution_id(run_result.execution_id) + tracker.write_status_file() + + summary = ExecutionSummary( + execution_id=run_result.execution_id, + started_at=run_result.started_at, + finished_at=run_result.finished_at, + results=list(run_result.results.values()), + ) + tracker.print_summary(run_result) + + _print_ccache_stats(container, cfg, options) + + _save_execution_results(summary, cfg) + + return 0 if summary.all_passed else 1 + + +def _collect_matrix_pairs(wf) -> list[tuple[str, MatrixEntry]]: + pairs: list[tuple[str, MatrixEntry]] = [] + for job_id, job in wf.jobs.items(): + for entry in job.matrix: + pairs.append((job_id, entry)) + return pairs + + +def _filter_matrix_pairs( + all_pairs: list[tuple[str, MatrixEntry]], + *, + platform: str | None, + compiler: str | None, + jobs: tuple[str, ...], +) -> list[tuple[str, MatrixEntry]]: + selected = list(all_pairs) + if platform: + target_plat = _PLATFORM_MAP.get(platform) + selected = [(jid, e) for jid, e in selected if e.platform == target_plat] + + compiler_filter = compiler.lower() if compiler else None + if compiler_filter: + selected = [ + (jid, e) + for jid, e in selected + if e.compiler.family.value == compiler_filter + ] + + if jobs: + seen: set[tuple[str, int]] = set() + filtered_list: list[tuple[str, MatrixEntry]] = [] + for j in jobs: + try: + idx = int(j) + for jid, e in selected: + if e.index == idx and (jid, e.index) not in seen: + filtered_list.append((jid, e)) + seen.add((jid, e.index)) + continue + except ValueError: + pass + j_lower = j.lower() + for jid, e in selected: + if j_lower in e.name.lower() and (jid, e.index) not in seen: + filtered_list.append((jid, e)) + seen.add((jid, e.index)) + selected = filtered_list + + return selected + + +def _resolve_matrix_filters( + options: RunOptions, cfg: LocalCIConfig +) -> tuple[list[dict] | None, list[dict] | None]: + cli_matrix_include: list[dict] | None = None + if options.matrix_filters: + cli_matrix_include = [{}] + for s in options.matrix_filters: + if "=" in s: + k, v = s.split("=", 1) + cli_matrix_include[0][k.strip()] = v.strip() + if not cli_matrix_include[0]: + cli_matrix_include = None + + matrix_include = ( + cli_matrix_include + if cli_matrix_include + else ( + [f.model_dump(exclude_none=True) for f in cfg.matrix.include] + if cfg.matrix.include + else None + ) + ) + matrix_exclude = ( + [f.model_dump(exclude_none=True) for f in cfg.matrix.exclude] + if cfg.matrix.exclude + else None + ) + return matrix_include, matrix_exclude + + +def _run_preflight(executor) -> int: + try: + act_version = executor.check_act() + print_info(f"Using {act_version}") + except ActNotFoundError as exc: + print_error(str(exc)) + return 1 + + try: + executor.check_docker() + except DockerNotAvailableError as exc: + print_error(str(exc)) + return 1 + + return 0 + + +def _print_cache_enabled_message(cfg: LocalCIConfig, no_cache: bool) -> None: + if no_cache or not cfg.cache.enabled: + return + cache_parts = [] + if cfg.cache.ccache.enabled: + cache_parts.append("ccache") + if cfg.cache.boost.enabled: + cache_parts.append("boost") + if cfg.cache.cmake.enabled: + cache_parts.append("cmake") + if getattr(cfg.cache.boost, "build_dir", True): + cache_parts.append("b2-source") + if getattr(cfg.cache, "apt", None) and getattr(cfg.cache.apt, "enabled", True): + cache_parts.append("apt") + if cache_parts: + print_info( + f"Cache enabled: {', '.join(cache_parts)}. Use --no-cache to disable." + ) + + +def _print_ccache_stats( + container: RunDependencies, cfg: LocalCIConfig, options: RunOptions +) -> None: + from localci.utils.output import console + + if options.no_cache or not cfg.cache.enabled or not cfg.cache.ccache.enabled: + return + resolved = container.resolve_cache_paths_fn( + cfg.cache, options.no_cache, options.cache_dir, None, None + ) + if resolved and resolved.ccache_host is not None: + stats = container.get_ccache_stats_fn(resolved.ccache_host) + if stats: + print_info("ccache stats:") + for line in stats.splitlines(): + console.print(f" {line}") + + +def _save_execution_results(summary: ExecutionSummary, cfg: LocalCIConfig) -> None: + logs_dir = Path(cfg.logging.directory) + last_run_file = logs_dir / "last-run.json" + execution_file = logs_dir / f"{summary.execution_id}.json" + try: + summary.save(last_run_file) + summary.save(execution_file) + print_info(f"Results saved to {last_run_file}") + print_info( + f"Execution ID: {summary.execution_id} " + "(use with status -e or logs -e)" + ) + except OSError as exc: + print_warning(f"Could not save results: {exc}") + except (TypeError, ValueError) as exc: + print_warning(f"Could not serialize results for save: {exc}") diff --git a/cli/tests/test_cli.py b/cli/tests/test_cli.py index 455af05..4d3f060 100644 --- a/cli/tests/test_cli.py +++ b/cli/tests/test_cli.py @@ -410,7 +410,7 @@ def test_leaked_localci_error_exit_one_no_crash_log( assert "mikefarah/yq" in result.output.lower() assert not (isolated_localci_home / CRASH_LOG_NAME).exists() - @patch("localci.cli.run.JobExecutor.check_act") + @patch("localci.core.executor.JobExecutor.check_act") def test_run_unhandled_exception_uses_catch_all( self, mock_check_act, isolated_localci_home, tmp_path ): diff --git a/cli/tests/test_github_token.py b/cli/tests/test_github_token.py index bd9706e..97abd18 100644 --- a/cli/tests/test_github_token.py +++ b/cli/tests/test_github_token.py @@ -16,6 +16,7 @@ is_sentinel_github_token, resolve_github_token, ) + runner = CliRunner() SAMPLE_WORKFLOW = str(Path(__file__).parent / "fixtures" / "sample_workflow.yml") @@ -60,6 +61,7 @@ def test_warning_message_documents_remediation(self) -> None: assert "--offline" in msg assert "401" in msg + class TestAuthErrorExtractKeywords: def test_auth_keywords_registered(self) -> None: assert AUTH_ERROR_EXTRACT_KEYWORDS == ( diff --git a/cli/tests/test_run_flow.py b/cli/tests/test_run_flow.py new file mode 100644 index 0000000..1bcb937 --- /dev/null +++ b/cli/tests/test_run_flow.py @@ -0,0 +1,171 @@ +"""Unit tests for ``localci.cli.run.run_flow.execute_run``.""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +from click.testing import CliRunner + +from localci.cli.run.container import RunDependencies, build_run_container +from localci.cli.run.params import RunOptions +from localci.cli.run.run_flow import execute_run +from localci.core.config import LocalCIConfig +from localci.core.executor import ActNotFoundError +from localci.errors import WorkflowError + +FIXTURES_DIR = Path(__file__).parent / "fixtures" +SAMPLE_WORKFLOW = FIXTURES_DIR / "sample_workflow.yml" + + +def _dry_run_options(**overrides: object) -> RunOptions: + base = dict( + workflow=str(SAMPLE_WORKFLOW), + jobs=(), + platform=None, + compiler=None, + matrix_filters=(), + parallel=None, + timeout=None, + dry_run=True, + no_cache=True, + cache_dir=None, + rebuild_image=False, + keep_containers=None, + interactive=False, + verbose=False, + github_token=None, + offline=False, + ) + base.update(overrides) + return RunOptions(**base) # type: ignore[arg-type] + + +@pytest.fixture +def sample_config() -> LocalCIConfig: + return LocalCIConfig(workflow=SAMPLE_WORKFLOW) + + +class TestExecuteRunDryRun: + """Direct tests of ``execute_run`` without act/Docker.""" + + def test_dry_run_prints_execution_plan( + self, sample_config: LocalCIConfig + ) -> None: + with patch("localci.cli.run.run_flow._print_execution_plan") as mock_plan: + code = execute_run( + cfg=sample_config, + options=_dry_run_options(), + deps=build_run_container(), + ) + assert code == 0 + mock_plan.assert_called_once() + + def test_dry_run_skips_preflight_and_parallel_execute( + self, sample_config: LocalCIConfig + ) -> None: + deps = build_run_container() + mock_executor = MagicMock() + mock_executor.check_act.side_effect = ActNotFoundError() + deps.job_executor_factory = lambda _logs: mock_executor + + with patch("localci.cli.run.run_flow._print_execution_plan"): + code = execute_run( + cfg=sample_config, + options=_dry_run_options(), + deps=deps, + ) + + assert code == 0 + mock_executor.check_act.assert_not_called() + + def test_act_not_found_returns_exit_code_when_not_dry_run( + self, sample_config: LocalCIConfig + ) -> None: + deps = build_run_container() + mock_executor = MagicMock() + mock_executor.check_act.side_effect = ActNotFoundError() + deps.job_executor_factory = lambda _logs: mock_executor + + code = execute_run( + cfg=sample_config, + options=_dry_run_options(dry_run=False, timeout=60), + deps=deps, + ) + + assert code == 1 + + def test_workflow_error_returns_exit_code( + self, sample_config: LocalCIConfig + ) -> None: + deps = build_run_container() + deps.workflow_analyzer = MagicMock() + deps.workflow_analyzer.analyze.side_effect = WorkflowError("invalid workflow") + + code = execute_run( + cfg=sample_config, + options=_dry_run_options(), + deps=deps, + ) + + assert code == 1 + + def test_empty_matrix_returns_zero( + self, sample_config: LocalCIConfig + ) -> None: + deps = build_run_container() + mock_wf = MagicMock() + mock_wf.jobs = {} + deps.workflow_analyzer = MagicMock() + deps.workflow_analyzer.analyze.return_value = mock_wf + + code = execute_run( + cfg=sample_config, + options=_dry_run_options(), + deps=deps, + ) + + assert code == 0 + + def test_no_jobs_match_filters_returns_zero( + self, sample_config: LocalCIConfig + ) -> None: + code = execute_run( + cfg=sample_config, + options=_dry_run_options(jobs=("nonexistent-job-xyz",)), + deps=build_run_container(), + ) + assert code == 0 + + +class TestExecuteRunCliParity: + """CLI still delegates to the same orchestration path.""" + + def test_cli_dry_run_via_package_entry(self) -> None: + from localci.cli.main import cli + + result = CliRunner().invoke( + cli, ["run", "--workflow", str(SAMPLE_WORKFLOW), "--dry-run"] + ) + assert result.exit_code == 0 + assert "Dry run" in result.output or "execution plan" in result.output.lower() + + def test_cli_dry_run_offline_skips_token_warning( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + from localci.cli.main import cli + + monkeypatch.delenv("GITHUB_TOKEN", raising=False) + result = CliRunner().invoke( + cli, + [ + "run", + "--workflow", + str(SAMPLE_WORKFLOW), + "--dry-run", + "--offline", + ], + ) + assert result.exit_code == 0 + assert "No GitHub token provided" not in result.output