class Algorithm(metaclass=AlgorithmMetaClass):
    """Base class for all algorithms.

    An algorithm's ``run()`` may either be a plain method (eager execution)
    or a generator that yields :class:`AlgflowEvent` objects while it works.
    """

    def __init__(self, params=None, capture_events=True):
        self.params = params
        # When True, events yielded by a generator-style run() are collected
        # on self.events; when False, the caller drives the generator itself.
        self.capture_events = capture_events
        self.events = []

    # BUG FIX: Generator[X] with a single type argument is only valid on
    # Python 3.13+; spell out the yield/send/return types explicitly.
    def run(self, inputs, outputs) -> Generator[AlgflowEvent, None, None] | None:
        raise NotImplementedError('This method must be implemented')

    def __call__(self, inputs: dict[str, Any]) -> dict[str, Any] | Generator[AlgflowEvent, None, dict[str, Any]]:
        """Validate inputs, execute run(), and return the validated outputs.

        BUG FIX: the original draft put ``yield from`` directly inside
        ``__call__``, which turned ``__call__`` itself into a generator
        function — so EVERY call (including plain, non-generator algorithms)
        returned a generator, and the eager ``return outputs.__dict__`` path
        became unreachable.  Streaming is delegated to a private generator so
        the eager path stays eager.
        """
        inputs = self.Input(**inputs)
        outputs = self.Output()
        if inspect.isgeneratorfunction(self.run):
            if self.capture_events:
                self.events = list(self.run(inputs, outputs))
            else:
                # Caller drives the generator; the outputs dict is delivered
                # as the StopIteration value, which is what the pipeline
                # executor expects.
                return self._stream_run(inputs, outputs)
        else:
            self.run(inputs, outputs)
        outputs.validate()
        return outputs.__dict__

    def _stream_run(self, inputs, outputs):
        """Yield events from run(); deliver validated outputs via StopIteration."""
        yield from self.run(inputs, outputs)
        outputs.validate()
        return outputs.__dict__
from dataclasses import dataclass, field
import time


@dataclass
class AlgflowEvent:
    """Timestamped event emitted by an algorithm or executor during a run."""

    name: str        # event type, e.g. 'ExecutionStart'
    algorithm: str   # name of the algorithm/DAG node that produced the event
    data: dict       # arbitrary event payload
    # BUG FIX: time.time() returns a float, not an int.
    timestamp: float = field(default_factory=time.time)

    def __str__(self):
        return f'Event[{self.name}] {self.timestamp} {self.algorithm} {self.data}'


class ExecutionStartEvent(AlgflowEvent):
    """Marks the start of an algorithm's execution."""

    def __init__(self, algorithm: str, data: dict):
        super().__init__('ExecutionStart', algorithm, data)


class ExecutionEndEvent(AlgflowEvent):
    """Marks the end of an algorithm's execution (success or failure)."""

    def __init__(self, algorithm: str, data: dict):
        super().__init__('ExecutionEnd', algorithm, data)


class ExecutionErrorEvent(AlgflowEvent):
    """Records an execution failure together with its debug information."""

    def __init__(self, algorithm: str, debug_info: dict):
        super().__init__('ExecutionError', algorithm, debug_info)
logger = logging.getLogger(__name__)


def safe_decode(s):
    """Decode bytes to str, passing None through unchanged."""
    return None if s is None else s.decode()


@dataclasses.dataclass
class ExecutionResult:
    """Outcome of a finished command-line process."""

    return_code: int
    stdout: str
    stderr: str
    runtime: float   # wall-clock seconds from spawn to exit


class Process:
    """Wrapper around subprocess.Popen adding a timeout and progress logging."""

    # Length of each wait slice; keeps timeout enforcement responsive.
    _POLL_INTERVAL = 0.25
    # Seconds between "still running" debug messages.
    _PROGRESS_INTERVAL = 20.0

    def __init__(self, cmd, timeout=None):
        self.cmd = cmd
        self.timeout = timeout
        self.start_time = default_timer()
        self._next_report = self._PROGRESS_INTERVAL
        self.process = subprocess.Popen(self.cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    @property
    def exe_name(self):
        return self.cmd[0]

    def timeout_reached(self):
        return self.timeout is not None and default_timer() - self.start_time >= self.timeout

    def report_progress(self):
        # BUG FIX: the original tested `elapsed_time % 20 == 0` on a float,
        # which essentially never fires; track the next report threshold.
        elapsed_time = default_timer() - self.start_time
        if elapsed_time >= self._next_report:
            logger.debug(f'{self.exe_name} running for {elapsed_time} secs.')
            self._next_report += self._PROGRESS_INTERVAL

    def kill_process_and_raise_timeout(self):
        self.process.kill()
        # BUG FIX: reap the killed child and close its pipes so we don't
        # leave a zombie process behind.
        self.process.communicate()
        msg = f'{self.exe_name} timed out at {self.timeout} secs.'
        raise TimeoutError(msg)

    def communicate(self) -> ExecutionResult:
        """Wait for the process, enforcing the timeout; return its result.

        Raises:
            TimeoutError: if the configured timeout elapses first.
        """
        # BUG FIX: the original looped on poll() with no sleep — a 100% CPU
        # busy-wait — and, with PIPE stdio, risked the classic pipe-buffer
        # deadlock.  Popen.communicate(timeout=...) waits properly, drains
        # the pipes, and may be called again after TimeoutExpired.
        while True:
            try:
                stdout, stderr = self.process.communicate(timeout=self._POLL_INTERVAL)
                break
            except subprocess.TimeoutExpired:
                if self.timeout_reached():
                    self.kill_process_and_raise_timeout()
                self.report_progress()

        runtime = default_timer() - self.start_time
        logger.debug(f'{self.exe_name} completed: {runtime} secs')
        return ExecutionResult(self.process.returncode, safe_decode(stdout), safe_decode(stderr), runtime)
class CmdLineAlgorithm(Algorithm):
    """Algorithm that shells out to an external command-line tool.

    Subclasses implement cmd() and typically post_run() to map the
    ExecutionResult onto the algorithm's outputs.
    """

    # Optional wall-clock limit (seconds) for the subprocess; None = no limit.
    # Backward compatible: the default preserves the old untimed behavior.
    timeout = None

    def cmd(self) -> str | list:
        """Return the command to execute: a shell-style string or an argv list."""
        raise NotImplementedError('This method must be implemented')

    def pre_run(self, inputs, cmd):
        """Hook called before the process starts; may mutate *cmd* in place."""
        pass

    def post_run(self, exec_result, inputs, outputs):
        """Hook called with the ExecutionResult after the process exits."""
        pass

    def run(self, inputs, outputs):
        cmd = self.cmd()
        if not cmd:
            raise ValueError('cmd is empty')
        if not isinstance(cmd, list):
            # shlex handles quoting; never pass a raw string with shell=True
            cmd = shlex.split(cmd)

        # BUG FIX: the original called self.pre_run(input, cmd), passing the
        # *builtin* input function instead of the inputs argument.
        self.pre_run(inputs, cmd)
        process = Process(cmd, timeout=self.timeout)
        exec_result = process.communicate()
        self.post_run(exec_result, inputs, outputs)
class AlgorithmExecutor(ABC):
    """Strategy object that executes a single algorithm and collects events."""

    def __init__(self, capture_events=True):
        self.capture_events = capture_events
        self.events = []

    @abstractmethod
    def run(self, algorithm: Type[Algorithm]) -> dict[str, Any]:
        """Execute *algorithm* and return its outputs keyed by output name."""
        # BUG FIX: the original declared this method with no body at all,
        # which is a SyntaxError; an abstract method still needs a body.
        raise NotImplementedError


class SimplePipelineExecutor(PipelineExecutor):
    """Executes a pipeline's DAG sequentially, in topological order."""

    def __init__(self, pipeline: AlgFlowPipeline):
        import uuid  # stdlib; local import avoids touching the file header
        self.pipeline = pipeline
        # BUG FIX: execute() read self.run_id, but it was never assigned.
        self.run_id = uuid.uuid4().hex
        event_handler_klass = DataHandlerManager().get_handler("system.events", "events")
        self.event_handler = event_handler_klass(path="logs/events", scope='system')
        # TODO: ideate the event Data Handler
        # BUG FIX: sys_routes was declared as `field(default_factory=dict)`
        # on a non-dataclass, which just stores a bare Field object as a
        # shared class attribute; assign a real per-instance dict instead.
        self.sys_routes: dict[str, DataContainer] = {'events': DataContainer('events', None)}

    def emit(self, event: AlgflowEvent):
        """Push *event* onto the system event stream."""
        self.event_handler.push('events', event)

    def execute(self, **options) -> Any:
        """Run every algorithm node of the pipeline DAG in dependency order."""
        store = self.pipeline.store
        params = self.pipeline.params
        dag = self.pipeline.dag

        task_data = {
            'pid': os.getpid(),
            'run_id': self.run_id,
        }
        # execute the dag
        for node_name in nx.topological_sort(dag):
            node = dag.nodes[node_name]
            # NOTE(review): the gating condition between the two diff hunks
            # is not visible in this patch; membership in dag.algorithms is a
            # reconstruction — confirm against the full source.
            if node_name in dag.algorithms:
                alg_klass = dag.algorithms[node_name]
                alg_params = params.get_param(node_name)
                # only the keys are needed; iterating the dict avoids .items()
                inputs = {k: store.get(k) for k in alg_klass.Input.__variables__}

                alg_instance = alg_klass(alg_params, capture_events=False)
                computation = alg_instance(inputs)

                self.emit(ExecutionStartEvent(node_name, task_data))
                try:
                    if hasattr(computation, '__next__'):
                        # Streaming algorithm: forward its events; the outputs
                        # dict arrives as the generator's return value.
                        try:
                            while True:
                                event = next(computation)
                                if isinstance(event, AlgflowEvent):
                                    # BUG FIX: was event_handler.push(...)
                                    # directly, bypassing emit() — keep one
                                    # code path for event publication.
                                    self.emit(event)
                        except StopIteration as done:
                            outputs = done.value
                    else:
                        # Eager algorithm: __call__ already returned outputs.
                        outputs = computation
                    store.set_multi(outputs)
                except Exception as e:
                    # save debug info and record the failure before re-raising
                    debug_info = {'error': str(e), 'stack_trace': traceback.format_exc()}
                    self.emit(ExecutionErrorEvent(node_name, debug_info))
                    raise  # bare raise preserves the original traceback
                finally:
                    # End is emitted on success AND failure, bracketing Start.
                    self.emit(ExecutionEndEvent(node_name, task_data))
            else:
                print('Ignoring node:', node_name)