flowrun is a lightweight async DAG orchestrator for small to medium ETL pipelines.
It is designed for low operational overhead and low-complexity workloads (for example,
Polars pipelines, API ingest + transform + load jobs, or periodic data sync tasks).
Core ideas:
- Keep orchestration simple: declare tasks + dependencies, run a DAG.
- Keep runtime dependency-free: stdlib-based implementation.
- Keep behavior explicit: retries, timeouts, skip semantics, run reports.
```
pip install flowrun-dag
```

The import name remains `flowrun`:

```python
import flowrun
```
For development:
```
git clone https://github.com/Mg30/flowrun.git
cd flowrun
uv sync --group dev
uv run pytest -q
```

A minimal end-to-end example:

```python
import asyncio
from dataclasses import dataclass

from flowrun import RunContext, build_default_engine

engine = build_default_engine(max_workers=4, max_parallel=3)

@dataclass(frozen=True)
class Deps:
    source_path: str

@engine.task(name="extract", dag="daily_etl")
def extract(context: RunContext[Deps]) -> list[dict]:
    # In real jobs, read from file/API/db
    return [{"id": 1, "amount": 10}, {"id": 2, "amount": 15}]

@engine.task(name="transform", dag="daily_etl", deps=[extract])
def transform(extract: list[dict]) -> dict[str, int]:
    total = sum(row["amount"] for row in extract)
    return {"rows": len(extract), "total": total}

@engine.task(name="load", dag="daily_etl", deps=[transform])
def load(transform: dict[str, int]) -> str:
    # Persist results
    return f"loaded rows={transform['rows']} total={transform['total']}"

async def main() -> None:
    ctx = RunContext(Deps(source_path="/tmp/data.json"))
    async with engine:
        engine.validate("daily_etl")
        run_id = await engine.run_once("daily_etl", context=ctx)
        report = engine.get_run_report(run_id)
        print(report["status"])  # SUCCESS | FAILED | RUNNING

asyncio.run(main())
```

Core concepts:

- Task: a Python callable registered with `@engine.task(...)` or `@task(...)`.
- DAG: a namespace (`dag="name"`) plus dependency edges between tasks.
- Run: one execution instance of a DAG (`run_id`).
- State store: tracks run/task status, timing, errors, and results.

Task status lifecycle:
- `PENDING -> RUNNING -> SUCCESS`
- `PENDING -> RUNNING -> FAILED -> PENDING` (retry path)
- `PENDING -> SKIPPED` (blocked by a failed upstream task)

Building an engine:

```python
engine = build_default_engine(
    executor=None,
    max_workers=8,
    max_parallel=4,
    logger=None,
    hooks=None,
    state_store=None,
)
```

Parameters:

- `executor`: optional `concurrent.futures.Executor` for sync tasks.
- `max_workers`: thread pool size if `executor` is not provided.
- `max_parallel`: max concurrent scheduled tasks, must be `>= 1`.
- `logger`: optional `logging.Logger` used across components.
- `hooks`: optional list of `RunHook` handlers.
- `state_store`: optional custom state store (`StateStoreProtocol`).
Returns a configured `Engine`.
Run control:
- `await engine.run_once(dag_name, context=None) -> str`
- `await engine.resume(run_id, from_tasks=None, context=None) -> str`
- `await engine.run_subgraph(dag_name, targets, context=None) -> str`

Validation and discovery:
- `engine.validate(dag_name) -> None`
- `engine.list_dags() -> list[str]`
- `engine.list_tasks(dag_name) -> list[str]`
- `engine.display_dag(dag_name) -> str`
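
For example, a quick topology check using the `daily_etl` DAG from the quick start (printed values are illustrative):

```python
engine.validate("daily_etl")            # raises if dependencies are missing, cross-DAG, or cyclic
print(engine.list_dags())               # e.g. ["daily_etl"]
print(engine.list_tasks("daily_etl"))   # e.g. ["extract", "transform", "load"]
print(engine.display_dag("daily_etl"))  # text rendering of the dependency graph
```
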
Reporting:
- `engine.get_run_report(run_id) -> dict`

Resource lifecycle:
- `engine.close() -> None`
- `async with engine:` closes the owned thread pool on exit.

Preferred style (bound to engine registry):
@engine.task(name="task_a", dag="etl", deps=[...], timeout_s=30.0, retries=1, retain_result=True)
def task_a(...):
...Arguments:
name: optional, defaults to function name.dag: DAG namespace for selection viarun_once(dag_name).deps: list of task names or decorated task callables.timeout_s: per-attempt timeout (Nonedisables timeout).retries: retry count after failures.retain_result: ifFalse, clear result from state when safe.
Avoid repeating `dag=...` by using a DAG-scoped container:
```python
etl = engine.dag("daily_etl")

@etl.task(name="extract")
def extract() -> list[int]:
    return [1, 2, 3]

@etl.task(name="sum_values", deps=[extract])
def sum_values(extract: list[int]) -> int:
    return sum(extract)

run_id = await etl.run_once()
```

Available on the scope:

- `etl.task(...)`
- `etl.task_template(...)`
- `await etl.run_once(context=None)`
- `await etl.run_subgraph(targets, context=None)`
- `etl.validate()`, `etl.display()`, `etl.list_tasks()`

Also available as a global decorator:
```python
from flowrun import task, TaskRegistry

registry = TaskRegistry()
token = registry.activate()

@task
def my_task():
    return 1

TaskRegistry.deactivate(token)
```

Notes:

- `@task(...)`, `@task`, and `@task("name", ...)` are supported.
- If using the global `@task`, provide `registry=...` or activate one.

Named dependency injection:
@engine.task(name="extract", dag="etl")
def extract() -> list[int]:
return [1, 2, 3]
@engine.task(name="sum_values", dag="etl", deps=[extract])
def sum_values(extract: list[int]) -> int:
return sum(extract)Generic upstream injection:
@engine.task(name="combine", dag="etl", deps=["a", "b"])
def combine(upstream: dict[str, object]) -> object:
return (upstream["a"], upstream["b"])If upstream is declared, named dependency injection is disabled.
Tasks can accept a typed `RunContext[...]` as a positional parameter.
```python
@dataclass(frozen=True)
class Deps:
    api_base: str

@engine.task(name="pull", dag="etl")
def pull(context: RunContext[Deps]) -> dict:
    return {"base": context.api_base}
```

Register parameterized task variants:

```python
def fetch_table(*, table: str) -> str:
    return f"select * from {table}"

tpl = engine.task_template(fetch_table, dag="etl")
tpl.bind("fetch_users", table="users")
tpl.bind("fetch_orders", table="orders")
```

DAG namespace behavior:

- If tasks use explicit `dag=...` namespaces, unknown DAG names raise `ValueError`.
- Error messages include available DAG names and a close-match suggestion.
- Legacy behavior remains for unscoped registries (single implicit DAG).
Build-time validation catches:
- Missing dependencies.
- Cross-DAG dependencies.
- Cycles.
Missing dependency errors include close-match suggestions when available.
- Retries are per task and per run attempt.
- Downstream tasks are skipped only after upstream retries are exhausted.
- Timeouts are applied per attempt.
- Async tasks use `asyncio.wait_for`.
- Sync tasks run in the executor and are awaited with the timeout.
- `retain_result=True` (default): keep the result in state.
- `retain_result=False`: clear the result once all downstream consumers are launched/done.
- Useful for reducing memory when passing larger intermediate objects (see the sketch below).
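
For example, a flaky ingest task with retries and a per-attempt timeout, feeding a large intermediate result that is not retained; a sketch only, where the task names, URL, and `dag` namespace are illustrative:

```python
from urllib.request import urlopen

@engine.task(name="fetch_page", dag="etl", timeout_s=10.0, retries=3)
def fetch_page() -> str:
    # Flaky network IO: up to 3 retries, each attempt capped at 10 seconds.
    with urlopen("https://example.com") as resp:
        return resp.read().decode()

@engine.task(name="split_lines", dag="etl", deps=[fetch_page], retain_result=False)
def split_lines(fetch_page: str) -> list[str]:
    # Large intermediate object: cleared from state once downstream consumers are done.
    return fetch_page.splitlines()
```
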
`engine.get_run_report(run_id)` returns:
```python
{
    "run_id": "...",
    "dag_name": "...",
    "created_at": 0.0,
    "finished_at": 0.0,
    "status": "SUCCESS",  # SUCCESS | FAILED | RUNNING
    "tasks": {
        "task_name": {
            "status": "SUCCESS",
            "attempt": 1,
            "started_at": 0.0,
            "finished_at": 0.0,
            "error": None,
            "result": {...}
        }
    }
}
```

Run-level status rules:

- `FAILED` if any task is `FAILED` or `SKIPPED`.
- `SUCCESS` if all tasks are `SUCCESS`.
- `RUNNING` otherwise.
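
For example, a failed run can be summarized from the report fields above (variable names are illustrative):

```python
report = engine.get_run_report(run_id)

failed = {
    name: info["error"]
    for name, info in report["tasks"].items()
    if info["status"] == "FAILED"
}
if report["status"] == "FAILED":
    print(f"run {report['run_id']} failed: {failed}")
```
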
Use hooks to emit metrics, alerts, or tracing signals.
```python
from flowrun import fn_hook, build_default_engine

hook = fn_hook(
    on_task_failure=lambda e: print(f"FAIL {e.task_name}: {e.error}"),
    on_dag_end=lambda e: print(f"DAG done: {e.dag_name}"),
)

engine = build_default_engine(hooks=[hook])
```

Hook API:

- `RunHook` class with overridable methods.
- `fn_hook(...)` for function-based handlers.
- Hook errors are caught and logged (they do not crash runs).

Events:

- `DagStartEvent`, `DagEndEvent`
- `TaskStartEvent`, `TaskSuccessEvent`, `TaskFailureEvent`
- `TaskRetryEvent`, `TaskSkipEvent`
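
A slightly fuller sketch in the same style, using only the event attributes shown above (`task_name`, `error`, `dag_name`); the hook and logger names are illustrative:

```python
import logging

from flowrun import build_default_engine, fn_hook

failed_tasks: list[str] = []

metrics_hook = fn_hook(
    on_task_failure=lambda e: failed_tasks.append(f"{e.task_name}: {e.error}"),
    on_dag_end=lambda e: logging.getLogger("flowrun.metrics").info(
        "DAG %s finished with %d failed task(s)", e.dag_name, len(failed_tasks)
    ),
)

engine = build_default_engine(hooks=[metrics_hook])
```
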
In-memory (default):
- `StateStore` / `InMemoryStateStore`
- Fast, process-local, ephemeral.

SQLite persistent backend:
- `SqliteStateStore(db_path, serializer=..., cache_ttl_s=None, recover=False)`
- Persists run and task state.
- Optional crash recovery marks orphaned `RUNNING` tasks as failed.

Serialization options for persisted results:
- `JsonSerializer` (default for the SQLite backend)
- `PickleSerializer`
- Custom `ResultSerializer` implementation
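
For example, wiring the SQLite backend into an engine (the database path is illustrative):

```python
from flowrun import SqliteStateStore, build_default_engine

# recover=True marks orphaned RUNNING tasks as failed after a crash.
store = SqliteStateStore("flowrun_state.db", recover=True)
engine = build_default_engine(state_store=store)
```
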
Best practices:

- Keep each task focused (`extract`, `transform`, `load`).
- Set `retain_result=False` on large intermediate transforms.
- Use `retries` on flaky IO tasks, not pure transforms.
- Keep `max_parallel` modest for predictable resource use.

Resume a failed run from a specific task:

```python
new_run_id = await engine.resume(old_run_id, from_tasks=["transform"], context=ctx)
```

This re-executes `transform` and all downstream tasks, while preserving unaffected successful upstream tasks.
Run only part of a DAG:

```python
run_id = await engine.run_subgraph("daily_etl", targets=["load"], context=ctx)
```

This executes `load` plus all transitive dependencies required for `load`.
Pass a logger to `build_default_engine(logger=...)`.
Typical levels:
- `INFO`: DAG start/finish, task success, retries, skips.
- `WARNING`: task failures, timeouts.
- `DEBUG`: task launch details, tracebacks, shutdown details.
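
A minimal setup using the standard library (the logger name is arbitrary):

```python
import logging

from flowrun import build_default_engine

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
engine = build_default_engine(logger=logging.getLogger("flowrun.etl"))
```
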
Recommended test layers:
- Unit test each task function directly.
- Integration test DAG execution with `build_default_engine()`.
- Validate topology with `engine.validate(...)` and `engine.list_tasks(...)`.
- Assert on `get_run_report(...)` for end-to-end behavior.
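
A sketch of the first two layers, assuming the quick-start tasks and engine live in an importable module named `daily_etl` that does not call `asyncio.run(main())` at import time:

```python
import asyncio

from daily_etl import Deps, engine, transform
from flowrun import RunContext

def test_transform_totals_amounts():
    # Unit layer: call the task function directly, no engine involved.
    rows = [{"id": 1, "amount": 10}, {"id": 2, "amount": 15}]
    assert transform(rows) == {"rows": 2, "total": 25}

def test_daily_etl_end_to_end():
    # Integration layer: run the whole DAG and assert on the run report.
    async def run() -> dict:
        ctx = RunContext(Deps(source_path="/tmp/data.json"))
        async with engine:
            run_id = await engine.run_once("daily_etl", context=ctx)
            return engine.get_run_report(run_id)

    report = asyncio.run(run())
    assert report["status"] == "SUCCESS"
    assert report["tasks"]["load"]["status"] == "SUCCESS"
```
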
Top-level exports in `flowrun`:
- `Engine`, `build_default_engine`
- `RunContext`
- `task`, `task_template`, `TaskSpec`, `TaskRegistry`
- `SchedulerConfig`
- `RunHook`, `fn_hook`
- `StateStore`, `InMemoryStateStore`, `StateStoreProtocol`
- `SqliteStateStore`
- `JsonSerializer`, `PickleSerializer`, `ResultSerializer`

License: MIT