Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions clients/python/coflux/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from .context import (
asset,
checkpoint,
group,
log_debug,
log_error,
Expand All @@ -9,17 +8,15 @@
suspend,
suspense,
)
from .decorators import sensor, stub, task, workflow
from .models import Asset, Execution
from .decorators import stub, task, workflow
from .models import Asset, Execution, Retries
from .worker import Worker

__all__ = [
"workflow",
"task",
"stub",
"sensor",
"group",
"checkpoint",
"suspense",
"suspend",
"log_debug",
Expand All @@ -29,5 +26,6 @@
"asset",
"Execution",
"Asset",
"Retries",
"Worker",
]
97 changes: 40 additions & 57 deletions clients/python/coflux/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,63 +106,46 @@ def _register_manifests(
) -> None:
manifests = {
module: {
"workflows": {
workflow_name: {
"parameters": [
{
"name": p.name,
"annotation": p.annotation,
"default": p.default,
}
for p in definition.parameters
],
"waitFor": list(definition.wait_for),
"cache": (
definition.cache
and {
"params": definition.cache.params,
"maxAge": definition.cache.max_age,
"namespace": definition.cache.namespace,
"version": definition.cache.version,
}
),
"defer": (
definition.defer
and {
"params": definition.defer.params,
}
),
"delay": definition.delay,
"retries": (
definition.retries
and {
"limit": definition.retries.limit,
"delayMin": definition.retries.delay_min,
"delayMax": definition.retries.delay_max,
}
),
"requires": definition.requires,
"instruction": definition.instruction,
}
for workflow_name, (definition, _) in target.items()
if definition.type == "workflow"
},
"sensors": {
sensor_name: {
"parameters": [
{
"name": p.name,
"annotation": p.annotation,
"default": p.default,
}
for p in definition.parameters
],
"requires": definition.requires,
"instruction": definition.instruction,
}
for sensor_name, (definition, _) in target.items()
if definition.type == "sensor"
},
workflow_name: {
"parameters": [
{
"name": p.name,
"annotation": p.annotation,
"default": p.default,
}
for p in definition.parameters
],
"waitFor": list(definition.wait_for),
"cache": (
definition.cache
and {
"params": definition.cache.params,
"maxAge": definition.cache.max_age,
"namespace": definition.cache.namespace,
"version": definition.cache.version,
}
),
"defer": (
definition.defer
and {
"params": definition.defer.params,
}
),
"delay": definition.delay,
"retries": (
definition.retries
and {
"limit": definition.retries.limit,
"delayMin": definition.retries.delay_min,
"delayMax": definition.retries.delay_max,
}
),
"recurrent": definition.recurrent,
"requires": definition.requires,
"instruction": definition.instruction,
}
for workflow_name, (definition, _) in target.items()
if definition.type == "workflow"
}
for module, target in targets.items()
}
Expand Down
6 changes: 0 additions & 6 deletions clients/python/coflux/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ def submit(
cache: models.Cache | None = None,
retries: models.Retries | None = None,
defer: models.Defer | None = None,
execute_after: dt.datetime | None = None,
delay: float | dt.timedelta = 0,
memo: list[int] | bool = False,
requires: types.Requires | None = None,
Expand All @@ -29,7 +28,6 @@ def submit(
cache=cache,
retries=retries,
defer=defer,
execute_after=execute_after,
delay=delay,
memo=memo,
requires=requires,
Expand Down Expand Up @@ -63,10 +61,6 @@ def asset(
return execution.get_channel().create_asset(entries, at=at, match=match, name=name)


def checkpoint(*arguments: t.Any) -> None:
return execution.get_channel().record_checkpoint(arguments)


def log_debug(template: str | None = None, **kwargs) -> None:
execution.get_channel().log_message(0, template, **kwargs)

Expand Down
59 changes: 29 additions & 30 deletions clients/python/coflux/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,24 @@ def _parse_cache(


def _parse_retries(
retries: int | tuple[int, int] | tuple[int, int, int],
retries: int | bool | models.Retries,
) -> models.Retries | None:
# TODO: parse string (e.g., '1h')
match retries:
case 0:
case False | 0:
return None
case True:
# Unlimited with sensible defaults (1s-60s backoff)
return models.Retries(None, 1000, 60000)
case int(limit):
return models.Retries(limit, 0, 0)
case (limit, delay):
return models.Retries(limit, delay, delay)
case (limit, delay_min, delay_max):
return models.Retries(limit, delay_min, delay_max)
case other:
raise ValueError(other)
case models.Retries(limit, delay_min, delay_max):
if limit == 0:
return None
return models.Retries(
limit,
int(delay_min * 1000),
int(delay_max * 1000),
)


def _parse_defer(
Expand Down Expand Up @@ -124,7 +128,8 @@ def _build_definition(
cache_params: t.Iterable[str] | str | None,
cache_namespace: str | None,
cache_version: str | None,
retries: int | tuple[int, int] | tuple[int, int, int],
retries: int | bool | models.Retries,
recurrent: bool,
defer: bool,
defer_params: t.Iterable[str] | str | None,
delay: float | dt.timedelta,
Expand All @@ -145,6 +150,7 @@ def _build_definition(
_parse_defer(defer, defer_params, parameters_),
_parse_delay(delay),
_parse_retries(retries),
recurrent,
_parse_memo(memo, parameters_),
_parse_requires(requires),
inspect.getdoc(fn),
Expand Down Expand Up @@ -180,7 +186,8 @@ def __init__(
cache_params: t.Iterable[str] | str | None = None,
cache_namespace: str | None = None,
cache_version: str | None = None,
retries: int | tuple[int, int] | tuple[int, int, int] = 0,
retries: int | bool | models.Retries = 0,
recurrent: bool = False,
defer: bool = False,
defer_params: t.Iterable[str] | str | None = None,
delay: float | dt.timedelta = 0,
Expand All @@ -200,6 +207,7 @@ def __init__(
cache_namespace,
cache_version,
retries,
recurrent,
defer,
defer_params,
delay,
Expand Down Expand Up @@ -232,6 +240,7 @@ def submit(self, *args: P.args, **kwargs: P.kwargs) -> models.Execution[T]:
wait_for=self._definition.wait_for,
cache=self._definition.cache,
retries=self._definition.retries,
recurrent=self._definition.recurrent,
defer=self._definition.defer,
delay=self._definition.delay,
memo=self._definition.memo,
Expand Down Expand Up @@ -290,7 +299,8 @@ def task(
cache_params: t.Iterable[str] | str | None = None,
cache_namespace: str | None = None,
cache_version: str | None = None,
retries: int | tuple[int, int] | tuple[int, int, int] = 0,
retries: int | bool | models.Retries = 0,
recurrent: bool = False,
defer: bool = False,
defer_params: t.Iterable[str] | str | None = None,
delay: float | dt.timedelta = 0,
Expand All @@ -308,6 +318,7 @@ def decorator(fn: t.Callable[P, T]) -> Target[P, T]:
cache_namespace=cache_namespace,
cache_version=cache_version,
retries=retries,
recurrent=recurrent,
defer=defer,
defer_params=defer_params,
delay=delay,
Expand All @@ -326,7 +337,8 @@ def workflow(
cache_params: t.Iterable[str] | str | None = None,
cache_namespace: str | None = None,
cache_version: str | None = None,
retries: int | tuple[int, int] | tuple[int, int, int] = 0,
retries: int | bool | models.Retries = 0,
recurrent: bool = False,
defer: bool = False,
defer_params: t.Iterable[str] | str | None = None,
delay: float | dt.timedelta = 0,
Expand All @@ -343,6 +355,7 @@ def decorator(fn: t.Callable[P, T]) -> Target[P, T]:
cache_namespace=cache_namespace,
cache_version=cache_version,
retries=retries,
recurrent=recurrent,
defer=defer,
defer_params=defer_params,
delay=delay,
Expand All @@ -362,7 +375,8 @@ def stub(
cache_params: t.Iterable[str] | str | None = None,
cache_namespace: str | None = None,
cache_version: str | None = None,
retries: int | tuple[int, int] | tuple[int, int, int] = 0,
retries: int | bool | models.Retries = 0,
recurrent: bool = False,
defer: bool = False,
defer_params: t.Iterable[str] | str | None = None,
delay: float | dt.timedelta = 0,
Expand All @@ -380,6 +394,7 @@ def decorator(fn: t.Callable[P, T]) -> Target[P, T]:
cache_namespace=cache_namespace,
cache_version=cache_version,
retries=retries,
recurrent=recurrent,
defer=defer,
defer_params=defer_params,
delay=delay,
Expand All @@ -388,19 +403,3 @@ def decorator(fn: t.Callable[P, T]) -> Target[P, T]:
)

return decorator


def sensor(
*,
name=None,
requires: dict[str, str | bool | list[str]] | None = None,
) -> t.Callable[[t.Callable[P, None]], Target[P, None]]:
def decorator(fn: t.Callable[P, None]) -> Target[P, None]:
return Target(
fn,
"sensor",
name=name,
requires=requires,
)

return decorator
Loading