Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions algflow/algorithm/algorithm.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import logging
from typing import Any
import inspect
from typing import Any, Generator

from algflow.algorithm.alg_meta import AlgorithmMetaClass
from algflow.algorithm.output import Output
from algflow.algorithm.events import AlgflowEvent

logger = logging.getLogger('Algorithm')

Expand All @@ -15,10 +16,12 @@ def __init__(self, algo, err):


class Algorithm(metaclass=AlgorithmMetaClass):
def __init__(self, params=None):
def __init__(self, params=None, capture_events=True):
self.params = params
self.capture_events = capture_events
self.events = []

def run(self, inputs, outputs):
def run(self, inputs, outputs) -> Generator[AlgflowEvent] | None:
raise NotImplementedError('This method must be implemented')

# def init_input(self, storage):
Expand Down Expand Up @@ -47,6 +50,12 @@ def __getstate__(self):
def __call__(self, inputs: dict[str, Any]) -> dict[str, Any]:
inputs = self.Input(**inputs)
outputs = self.Output()
self.run(inputs, outputs)
if inspect.isgeneratorfunction(self.run):
if self.capture_events:
self.events = [ev for ev in self.run(inputs, outputs)]
else:
yield from self.run(inputs, outputs)
else:
self.run(inputs, outputs)
outputs.validate()
return outputs.__dict__
26 changes: 26 additions & 0 deletions algflow/algorithm/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from dataclasses import dataclass, field
import time


@dataclass
class AlgflowEvent:
    """A timestamped event emitted during algorithm execution.

    Attributes:
        name: event type name (e.g. 'ExecutionStart').
        algorithm: name of the algorithm/node that produced the event.
        data: arbitrary payload attached to the event.
        timestamp: creation time in seconds since the epoch.
    """
    name: str
    algorithm: str
    data: dict
    # BUG FIX: time.time() returns a float, so the annotation must be float,
    # not int (the original `timestamp: int` was wrong).
    timestamp: float = field(default_factory=time.time)

    def __str__(self):
        return f'Event[{self.name}] {self.timestamp} {self.algorithm} {self.data}'


class ExecutionStartEvent(AlgflowEvent):
    """Emitted immediately before an algorithm's run begins."""

    def __init__(self, algorithm: str, data: dict):
        super().__init__(name='ExecutionStart', algorithm=algorithm, data=data)

class ExecutionEndEvent(AlgflowEvent):
    """Emitted after an algorithm's run finishes (success or failure)."""

    def __init__(self, algorithm: str, data: dict):
        super().__init__(name='ExecutionEnd', algorithm=algorithm, data=data)

class ExecutionErrorEvent(AlgflowEvent):
    """Emitted when an algorithm's execution raises an exception."""

    def __init__(self, algorithm: str, debug_info: dict):
        super().__init__(name='ExecutionError', algorithm=algorithm, data=debug_info)
21 changes: 21 additions & 0 deletions algflow/algorithm/resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from dataclasses import dataclass


@dataclass
class ResourceSchema:
    """Resource requirements for running a task.

    NOTE(review): memory and storage are plain strings — presumably
    human-readable sizes (e.g. '4GB'); confirm the expected format.
    """
    cpu: int        # number of CPU cores requested
    gpu_count: int  # number of GPUs requested
    memory: str     # memory request; format unverified from here
    storage: str    # storage request; format unverified from here

@dataclass
class Trace:
    """A single task execution record.

    NOTE(review): the field names match the leading columns of the
    module-level `fields` string — presumably one row of a workflow-engine
    trace file; confirm the producer.
    """
    task_id: str    # unique id of the task within a run
    hash: str       # task hash
    native_id: str  # id assigned by the underlying executor
    name: str       # task display name
    status: str     # task status (e.g. terminal state)

# Comma-separated column list for trace output; the first five columns match
# the Trace dataclass fields above. NOTE(review): resembles a Nextflow trace
# header (%cpu, peak_rss, peak_vmem, rchar, ...) — confirm the source format.
fields = 'task_id,hash,native_id,name,status,exit,submit,duration,realtime,%cpu,peak_rss,peak_vmem,rchar,read_bytes,wchar,write_bytes,disk'


Empty file added algflow/basic/__init__.py
Empty file.
85 changes: 85 additions & 0 deletions algflow/basic/cmdline_algorithm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import dataclasses
import logging
import shlex
import subprocess
import time
from timeit import default_timer

from algflow import Algorithm

logger = logging.getLogger(__name__)


def safe_decode(s):
    """Decode a bytes value to str, passing None through unchanged."""
    return None if s is None else s.decode()


@dataclasses.dataclass
class ExecutionResult:
    """Outcome of a finished command-line process."""
    return_code: int  # process exit status
    stdout: str       # decoded stdout; may be None despite the annotation (see safe_decode)
    stderr: str       # decoded stderr; may be None despite the annotation (see safe_decode)
    runtime: float    # wall-clock duration in seconds


class Process:
    """Run an external command with an optional timeout and progress logging.

    The process is started immediately on construction; call communicate()
    to wait for completion and collect its output.
    """

    PROGRESS_INTERVAL = 20.0  # seconds between progress log lines
    POLL_INTERVAL = 0.1       # seconds to sleep between liveness checks

    def __init__(self, cmd, timeout=None):
        """Start `cmd` (an argv list). `timeout` is in seconds, or None for no limit."""
        self.cmd = cmd
        self.timeout = timeout
        self.start_time = default_timer()
        self._last_report = self.start_time
        self.process = subprocess.Popen(self.cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    @property
    def exe_name(self):
        """The executable name (first argv element), used for log messages."""
        return self.cmd[0]

    def timeout_reached(self):
        """Return True when a timeout is configured and has elapsed."""
        return self.timeout is not None and default_timer() - self.start_time >= self.timeout

    def report_progress(self):
        # BUG FIX: the original tested `elapsed_time % 20 == 0` on a float,
        # which is essentially never exactly zero, so progress was never
        # logged. Track the last report time instead and log at most once
        # per PROGRESS_INTERVAL.
        now = default_timer()
        if now - self._last_report >= self.PROGRESS_INTERVAL:
            self._last_report = now
            logger.debug(f'{self.exe_name} running for {now - self.start_time} secs.')

    def kill_process_and_raise_timeout(self):
        """Kill the child, reap it, and raise TimeoutError."""
        self.process.kill()
        # Reap the killed process and close its pipes; the original raised
        # without waiting, leaking a zombie process and open pipe handles.
        self.process.communicate()
        msg = f'{self.exe_name} timed out at {self.timeout} secs.'
        raise TimeoutError(msg)

    def communicate(self) -> ExecutionResult:
        """Wait for the process to finish and return its ExecutionResult.

        Raises:
            TimeoutError: when the configured timeout elapses first.
        """
        # Poll periodically; the original loop busy-spun at 100% CPU with no
        # sleep between poll() calls.
        while self.process.poll() is None:
            if self.timeout_reached():
                self.kill_process_and_raise_timeout()
            self.report_progress()
            time.sleep(self.POLL_INTERVAL)

        stdout, stderr = self.process.communicate()
        runtime = default_timer() - self.start_time
        logger.debug(f'{self.exe_name} completed: {runtime} secs')

        return ExecutionResult(self.process.returncode, safe_decode(stdout), safe_decode(stderr), runtime)


class CmdLineAlgorithm(Algorithm):
    """Algorithm base class that executes an external command-line program.

    Subclasses implement cmd() and may override the pre_run/post_run hooks.
    """

    def cmd(self) -> str:
        """Return the command to execute: a string (shlex-split) or an argv list."""
        raise NotImplementedError('This method must be implemented')

    def pre_run(self, inputs, cmd):
        """Hook called just before the process launches; may inspect/mutate cmd."""
        pass

    def post_run(self, exec_result, inputs, output):
        """Hook called with the ExecutionResult after the process exits."""
        pass

    def run(self, inputs, outputs):
        """Build the command, run it to completion, and invoke the hooks.

        Raises:
            ValueError: if cmd() returns an empty value.
        """
        cmd = self.cmd()
        if not cmd:
            raise ValueError('cmd is empty')
        if not isinstance(cmd, list):
            cmd = shlex.split(cmd)

        # BUG FIX: the original called self.pre_run(input, cmd), passing the
        # *builtin* `input` function instead of the `inputs` argument.
        self.pre_run(inputs, cmd)
        process = Process(cmd)
        exec_result = process.communicate()
        self.post_run(exec_result, inputs, outputs)
4 changes: 4 additions & 0 deletions algflow/data/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ def get_multi(self, *args: str) -> dict[str, Any]:
def set(self, name, value: Any):
pass

@abstractmethod
def push(self, name, value: Any):
pass


class DataHandlerNotFound(Exception):
def __init__(self, path: str):
Expand Down
64 changes: 60 additions & 4 deletions algflow/pipeline/simple_executor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,29 @@
import os
import traceback
from abc import ABC, abstractmethod
from typing import Any
from typing import Any, Type
import networkx as nx

from algflow import Algorithm
from algflow.data.handler import DataHandlerManager
from algflow.pipeline.main import AlgFlowPipeline
from algflow.algorithm.events import (
AlgflowEvent, ExecutionStartEvent, ExecutionErrorEvent,
ExecutionEndEvent
)
from algflow.data.container import DataContainer
from dataclasses import field



class AlgorithmExecutor:
    """Base for executors that run a single algorithm and collect its events.

    NOTE(review): run() is marked @abstractmethod but this class does not use
    ABC/ABCMeta, so instantiation is not actually prevented — confirm whether
    it should inherit abc.ABC.
    """

    def __init__(self, capture_events=True):
        self.capture_events = capture_events  # whether run() should record events
        self.events = []  # events collected during run()

    @abstractmethod
    def run(self, algorithm: Type['Algorithm']) -> dict[str, Any]:
        # BUG FIX: the original `def run(...)` had no body at all, which is a
        # SyntaxError; the annotation is now a forward reference so the class
        # can be defined even where Algorithm is not yet imported.
        raise NotImplementedError



class PipelineExecutor(ABC):
Expand All @@ -15,13 +37,27 @@ def __str__(self) -> str:


class SimplePipelineExecutor(PipelineExecutor):
sys_routes: dict[str, DataContainer] = field(default_factory=dict)

def __init__(self, pipeline: AlgFlowPipeline):
self.pipeline = pipeline
event_handler_klass = DataHandlerManager().get_handler("system.events", "events")
self.event_handler = event_handler_klass(path="logs/events", scope='system')
# TODO: ideate the event Data Handler
self.sys_routes = {'events': DataContainer('events', None)}

def emit(self, event: AlgflowEvent):
self.event_handler.push('events', event)

def execute(self, **options) -> Any:
store = self.pipeline.store
params = self.pipeline.params
dag = self.pipeline.dag

task_data = {
'pid': os.getpid(),
'run_id': self.run_id,
}
# execute the dag
for node_name in nx.topological_sort(dag):
node = dag.nodes[node_name]
Expand All @@ -30,9 +66,29 @@ def execute(self, **options) -> Any:
alg_klass = dag.algorithms[node_name]
alg_params = params.get_param(node_name)
inputs = {k: store.get(k) for k, v in alg_klass.Input.__variables__.items()}
alg_instance = alg_klass(alg_params)
outputs = alg_instance(inputs)
store.set_multi(outputs)

alg_instance = alg_klass(alg_params, capture_events=False)
computation = alg_instance(inputs)

try:
self.emit(ExecutionStartEvent(node_name, task_data))
while True:
event = next(computation)
if isinstance(event, AlgflowEvent):
self.event_handler.push('events', event)
except StopIteration as e:
outputs = e.value
store.set_multi(outputs)
except Exception as e:
# save dag and input/output state
# generate debug info
# generate ExecutionError
debug_info = {'error': str(e), 'stack_trace': traceback.format_exc()}
self.emit(ExecutionErrorEvent(node_name, debug_info))
raise e
finally:
self.emit(ExecutionEndEvent(node_name, task_data))
pass
else:
print('Ignoring node:', node_name)

Expand Down