diff --git a/clients/python/coflux/__init__.py b/clients/python/coflux/__init__.py index 1eb87046..4ad9d42f 100644 --- a/clients/python/coflux/__init__.py +++ b/clients/python/coflux/__init__.py @@ -1,6 +1,5 @@ from .context import ( asset, - checkpoint, group, log_debug, log_error, @@ -9,17 +8,15 @@ suspend, suspense, ) -from .decorators import sensor, stub, task, workflow -from .models import Asset, Execution +from .decorators import stub, task, workflow +from .models import Asset, Execution, Retries from .worker import Worker __all__ = [ "workflow", "task", "stub", - "sensor", "group", - "checkpoint", "suspense", "suspend", "log_debug", @@ -29,5 +26,6 @@ "asset", "Execution", "Asset", + "Retries", "Worker", ] diff --git a/clients/python/coflux/__main__.py b/clients/python/coflux/__main__.py index a977f7c1..a1f8d891 100644 --- a/clients/python/coflux/__main__.py +++ b/clients/python/coflux/__main__.py @@ -106,63 +106,46 @@ def _register_manifests( ) -> None: manifests = { module: { - "workflows": { - workflow_name: { - "parameters": [ - { - "name": p.name, - "annotation": p.annotation, - "default": p.default, - } - for p in definition.parameters - ], - "waitFor": list(definition.wait_for), - "cache": ( - definition.cache - and { - "params": definition.cache.params, - "maxAge": definition.cache.max_age, - "namespace": definition.cache.namespace, - "version": definition.cache.version, - } - ), - "defer": ( - definition.defer - and { - "params": definition.defer.params, - } - ), - "delay": definition.delay, - "retries": ( - definition.retries - and { - "limit": definition.retries.limit, - "delayMin": definition.retries.delay_min, - "delayMax": definition.retries.delay_max, - } - ), - "requires": definition.requires, - "instruction": definition.instruction, - } - for workflow_name, (definition, _) in target.items() - if definition.type == "workflow" - }, - "sensors": { - sensor_name: { - "parameters": [ - { - "name": p.name, - "annotation": p.annotation, - "default": p.default, - } - for p in definition.parameters - ], - "requires": definition.requires, - "instruction": definition.instruction, - } - for sensor_name, (definition, _) in target.items() - if definition.type == "sensor" - }, + workflow_name: { + "parameters": [ + { + "name": p.name, + "annotation": p.annotation, + "default": p.default, + } + for p in definition.parameters + ], + "waitFor": list(definition.wait_for), + "cache": ( + definition.cache + and { + "params": definition.cache.params, + "maxAge": definition.cache.max_age, + "namespace": definition.cache.namespace, + "version": definition.cache.version, + } + ), + "defer": ( + definition.defer + and { + "params": definition.defer.params, + } + ), + "delay": definition.delay, + "retries": ( + definition.retries + and { + "limit": definition.retries.limit, + "delayMin": definition.retries.delay_min, + "delayMax": definition.retries.delay_max, + } + ), + "recurrent": definition.recurrent, + "requires": definition.requires, + "instruction": definition.instruction, + } + for workflow_name, (definition, _) in target.items() + if definition.type == "workflow" } for module, target in targets.items() } diff --git a/clients/python/coflux/context.py b/clients/python/coflux/context.py index 9479e8b8..7646493a 100644 --- a/clients/python/coflux/context.py +++ b/clients/python/coflux/context.py @@ -15,7 +15,6 @@ def submit( cache: models.Cache | None = None, retries: models.Retries | None = None, defer: models.Defer | None = None, - execute_after: dt.datetime | None = None, delay: float | dt.timedelta = 0, memo: list[int] | bool = False, requires: types.Requires | None = None, @@ -29,7 +28,6 @@ def submit( cache=cache, retries=retries, defer=defer, - execute_after=execute_after, delay=delay, memo=memo, requires=requires, @@ -63,10 +61,6 @@ def asset( return execution.get_channel().create_asset(entries, at=at, match=match, name=name) -def checkpoint(*arguments: t.Any) -> None: - return execution.get_channel().record_checkpoint(arguments) - - def log_debug(template: str | None = None, **kwargs) -> None: execution.get_channel().log_message(0, template, **kwargs) diff --git a/clients/python/coflux/decorators.py b/clients/python/coflux/decorators.py index cac5fd55..b63bd4d1 100644 --- a/clients/python/coflux/decorators.py +++ b/clients/python/coflux/decorators.py @@ -74,20 +74,24 @@ def _parse_cache( def _parse_retries( - retries: int | tuple[int, int] | tuple[int, int, int], + retries: int | bool | models.Retries, ) -> models.Retries | None: - # TODO: parse string (e.g., '1h') match retries: - case 0: + case False | 0: return None + case True: + # Unlimited with sensible defaults (1s-60s backoff) + return models.Retries(None, 1000, 60000) case int(limit): return models.Retries(limit, 0, 0) - case (limit, delay): - return models.Retries(limit, delay, delay) - case (limit, delay_min, delay_max): - return models.Retries(limit, delay_min, delay_max) - case other: - raise ValueError(other) + case models.Retries(limit, delay_min, delay_max): + if limit == 0: + return None + return models.Retries( + limit, + int(delay_min * 1000), + int(delay_max * 1000), + ) def _parse_defer( @@ -124,7 +128,8 @@ def _build_definition( cache_params: t.Iterable[str] | str | None, cache_namespace: str | None, cache_version: str | None, - retries: int | tuple[int, int] | tuple[int, int, int], + retries: int | bool | models.Retries, + recurrent: bool, defer: bool, defer_params: t.Iterable[str] | str | None, delay: float | dt.timedelta, @@ -145,6 +150,7 @@ def _build_definition( _parse_defer(defer, defer_params, parameters_), _parse_delay(delay), _parse_retries(retries), + recurrent, _parse_memo(memo, parameters_), _parse_requires(requires), inspect.getdoc(fn), @@ -180,7 +186,8 @@ def __init__( cache_params: t.Iterable[str] | str | None = None, cache_namespace: str | None = None, cache_version: str | None = None, - retries: int | tuple[int, int] | tuple[int, int, int] = 0, + retries: int | bool | models.Retries = 0, + recurrent: bool = False, defer: bool = False, defer_params: t.Iterable[str] | str | None = None, delay: float | dt.timedelta = 0, @@ -200,6 +207,7 @@ def __init__( cache_namespace, cache_version, retries, + recurrent, defer, defer_params, delay, @@ -232,6 +240,7 @@ def submit(self, *args: P.args, **kwargs: P.kwargs) -> models.Execution[T]: wait_for=self._definition.wait_for, cache=self._definition.cache, retries=self._definition.retries, + recurrent=self._definition.recurrent, defer=self._definition.defer, delay=self._definition.delay, memo=self._definition.memo, @@ -290,7 +299,8 @@ def task( cache_params: t.Iterable[str] | str | None = None, cache_namespace: str | None = None, cache_version: str | None = None, - retries: int | tuple[int, int] | tuple[int, int, int] = 0, + retries: int | bool | models.Retries = 0, + recurrent: bool = False, defer: bool = False, defer_params: t.Iterable[str] | str | None = None, delay: float | dt.timedelta = 0, @@ -308,6 +318,7 @@ def decorator(fn: t.Callable[P, T]) -> Target[P, T]: cache_namespace=cache_namespace, cache_version=cache_version, retries=retries, + recurrent=recurrent, defer=defer, defer_params=defer_params, delay=delay, @@ -326,7 +337,8 @@ def workflow( cache_params: t.Iterable[str] | str | None = None, cache_namespace: str | None = None, cache_version: str | None = None, - retries: int | tuple[int, int] | tuple[int, int, int] = 0, + retries: int | bool | models.Retries = 0, + recurrent: bool = False, defer: bool = False, defer_params: t.Iterable[str] | str | None = None, delay: float | dt.timedelta = 0, @@ -343,6 +355,7 @@ def decorator(fn: t.Callable[P, T]) -> Target[P, T]: cache_namespace=cache_namespace, cache_version=cache_version, retries=retries, + recurrent=recurrent, defer=defer, defer_params=defer_params, delay=delay, @@ -362,7 +375,8 @@ def stub( cache_params: t.Iterable[str] | str | None = None, cache_namespace: str | None = None, cache_version: str | None = None, - retries: int | tuple[int, int] | tuple[int, int, int] = 0, + retries: int | bool | models.Retries = 0, + recurrent: bool = False, defer: bool = False, defer_params: t.Iterable[str] | str | None = None, delay: float | dt.timedelta = 0, @@ -380,6 +394,7 @@ def decorator(fn: t.Callable[P, T]) -> Target[P, T]: cache_namespace=cache_namespace, cache_version=cache_version, retries=retries, + recurrent=recurrent, defer=defer, defer_params=defer_params, delay=delay, @@ -388,19 +403,3 @@ def decorator(fn: t.Callable[P, T]) -> Target[P, T]: ) return decorator - - -def sensor( - *, - name=None, - requires: dict[str, str | bool | list[str]] | None = None, -) -> t.Callable[[t.Callable[P, None]], Target[P, None]]: - def decorator(fn: t.Callable[P, None]) -> Target[P, None]: - return Target( - fn, - "sensor", - name=name, - requires=requires, - ) - - return decorator diff --git a/clients/python/coflux/execution.py b/clients/python/coflux/execution.py index 510802da..45e39d3e 100644 --- a/clients/python/coflux/execution.py +++ b/clients/python/coflux/execution.py @@ -91,8 +91,9 @@ class SubmitExecutionRequest(t.NamedTuple): cache: models.Cache | None defer: models.Defer | None memo: list[int] | bool - execute_after: dt.datetime | None + delay: int # milliseconds retries: models.Retries | None + recurrent: bool requires: types.Requires | None @@ -124,10 +125,6 @@ class RegisterGroupRequest(t.NamedTuple): name: str | None -class RecordCheckpointRequest(t.NamedTuple): - arguments: list[types.Value] - - class LogMessageRequest(t.NamedTuple): level: int template: str | None @@ -329,19 +326,19 @@ def submit_execution( wait_for: set[int] | None = None, cache: models.Cache | None = None, retries: models.Retries | None = None, + recurrent: bool = False, defer: models.Defer | None = None, - execute_after: dt.datetime | None = None, delay: float | dt.timedelta = 0, memo: list[int] | bool = False, requires: types.Requires | None = None, ) -> models.Execution[t.Any]: + # Convert delay to milliseconds for the server + delay_ms = 0 if delay: - delay = ( - dt.timedelta(seconds=delay) - if isinstance(delay, (int, float)) - else delay - ) - execute_after = (execute_after or dt.datetime.now()) + delay + if isinstance(delay, dt.timedelta): + delay_ms = int(delay.total_seconds() * 1000) + else: + delay_ms = int(delay * 1000) # TODO: parallelise? serialised_arguments = [ self._serialisation_manager.serialise(a) for a in arguments @@ -357,8 +354,9 @@ def submit_execution( cache, defer, memo, - execute_after, + delay_ms, retries, + recurrent, requires, ) ) @@ -443,12 +441,6 @@ def cancel_execution(self, execution_id: int) -> None: # TODO: wait for confirmation? self._notify(CancelExecutionRequest(execution_id)) - def record_checkpoint(self, arguments): - serialised_arguments = [ - self._serialisation_manager.serialise(a) for a in arguments - ] - self._notify(RecordCheckpointRequest(serialised_arguments)) - # TODO: don't support AssetEntry? or somehow consider entry's path? or somehow remove path from AssetEntry..? def create_asset( self, @@ -797,11 +789,6 @@ def _handle_notify(self, message): self._server_notify("cancel", (execution_id,)) case RegisterGroupRequest(execution_id, group_id, name): self._server_notify("register_group", (execution_id, group_id, name)) - case RecordCheckpointRequest(arguments): - self._server_notify( - "record_checkpoint", - (self._id, _json_safe_arguments(arguments)), - ) case LogMessageRequest(level, template, values, timestamp): self._server_notify( "log_messages", ([self._id, timestamp, level, template, values],) @@ -821,13 +808,11 @@ def _handle_request(self, request_id, request): cache, defer, memo, - execute_after, + delay, retries, + recurrent, requires, ): - execute_after_ms = execute_after and int( - execute_after.timestamp() * 1000 - ) self._server_request( "submit", ( @@ -841,8 +826,9 @@ def _handle_request(self, request_id, request): cache and cache._asdict(), defer and defer._asdict(), memo, - execute_after_ms, + delay, retries and retries._asdict(), + recurrent, requires, ), request_id, diff --git a/clients/python/coflux/models.py b/clients/python/coflux/models.py index 96430616..878eb125 100644 --- a/clients/python/coflux/models.py +++ b/clients/python/coflux/models.py @@ -25,9 +25,9 @@ class Defer(t.NamedTuple): class Retries(t.NamedTuple): - limit: int - delay_min: int - delay_max: int + limit: int | None = None # 0 = no retries, None = unlimited + delay_min: float = 1 # seconds + delay_max: float = 60 # seconds class Target(t.NamedTuple): @@ -38,6 +38,7 @@ class Target(t.NamedTuple): defer: Defer | None delay: float retries: Retries | None + recurrent: bool memo: list[int] | bool requires: types.Requires | None instruction: str | None diff --git a/clients/python/coflux/types.py b/clients/python/coflux/types.py index 48af4640..68c15596 100644 --- a/clients/python/coflux/types.py +++ b/clients/python/coflux/types.py @@ -1,6 +1,6 @@ import typing as t -TargetType = t.Literal["workflow", "task", "sensor"] +TargetType = t.Literal["workflow", "task"] Requires = dict[str, list[str]] diff --git a/docs/docs/concurrency.md b/docs/docs/concurrency.md index 09120dc7..35b9eb88 100644 --- a/docs/docs/concurrency.md +++ b/docs/docs/concurrency.md @@ -108,7 +108,7 @@ An example use case might be sending a notification to a user. ## Cancelling executions -Once a task (or workflow/sensor) has been submitted, the returned `Execution` can be used to cancel the running execution: +Once a task or workflow has been submitted, the returned `Execution` can be used to cancel the running execution: ```python @cf.workflow() diff --git a/docs/docs/executions.md b/docs/docs/executions.md index 4337833b..a68d59ba 100644 --- a/docs/docs/executions.md +++ b/docs/docs/executions.md @@ -21,7 +21,7 @@ When an execution is first scheduled, it starts in the _Queued_ state. (unless caching is enabled and there is a cache hit, in which case it transitions straight to _Cached_). -From the _Queued_ state it will transition to _Assigned_ once it is due, and a suitable worker is available to run it, unless: caching is enabled and there's a cache hit; it's referring to a workflow/sensor, in which case that is 'spawned' as a separate run; it become 'deferred' to another execution in the meantime. +From the _Queued_ state it will transition to _Assigned_ once it is due, and a suitable worker is available to run it, unless: caching is enabled and there's a cache hit; it's referring to a workflow, in which case that is 'spawned' as a separate run; it become 'deferred' to another execution in the meantime. Once assigned, the worker will generally execute it until it succeeds (_Succeeded_) or raises an exception (_Failed_). If contact is lost with a worker for more than the timeout period, the task that are running will be marked as _Abandoned_ (we don't know whether it completed successfully). Executions may be _Cancelled_ while they're running (or before they've been assigned). An execution may choose to suspend itself (either explicitly, or from timing out while waiting for another execution to complete) - in this case it will be automatically re-run. diff --git a/docs/docs/getting_started/workflows.md b/docs/docs/getting_started/workflows.md index 56c55fa3..f39225c9 100644 --- a/docs/docs/getting_started/workflows.md +++ b/docs/docs/getting_started/workflows.md @@ -30,7 +30,7 @@ Workflows are defined in _modules_. Typically these are Python modules, but they Put the workflow above into `hello.py`. :::tip -The docstring of a workflow (or sensor) will be available in the UI when running a workflow. This is a great place to explain what the workflow does. +The docstring of a workflow will be available in the UI when running a workflow. This is a great place to explain what the workflow does. ::: Before coming back to more advanced features, let's see how to get this workflow running... diff --git a/docs/docs/recurring.md b/docs/docs/recurring.md new file mode 100644 index 00000000..6c741bc4 --- /dev/null +++ b/docs/docs/recurring.md @@ -0,0 +1,35 @@ +# Recurring tasks + +A task or workflow can be configured to automatically re-execute after it completes successfully, by setting `recurrent=True`: + +```python +@cf.workflow(recurrent=True) +def poll_for_updates(): + updates = fetch_updates() + for update in updates: + process_update.submit(update) +``` + +This continues indefinitely until the run is cancelled. + +## Delay + +By default, recurring tasks restart immediately. Use `delay` to wait between executions: + +```python +@cf.workflow(recurrent=True, delay=60) +def poll_every_minute(): + ... +``` + +The delay is in seconds (or pass a `timedelta`). + +## Retries + +Recurring tasks can be combined with [retries](./retries.md). If a task fails, retries are attempted first. Only successful completions trigger the next recurrence. + +```python +@cf.workflow(recurrent=True, delay=60, retries=3) +def resilient_polling(): + ... +``` diff --git a/docs/docs/retries.md b/docs/docs/retries.md index a9a12665..d16d9ba6 100644 --- a/docs/docs/retries.md +++ b/docs/docs/retries.md @@ -18,10 +18,10 @@ Any tasks waiting for the initial task will continue waiting for retries. If all ## Retry delay -Failures can be caused by external environmental issues, so it's useful to be able to delay the retry. This can be done by passing a delay parameter: +Failures can be caused by external environmental issues, so it's useful to be able to delay the retry. This can be done using the `Retries` class: ```python -@cf.task(retries=(2, 300)) +@cf.task(retries=cf.Retries(limit=2, delay_min=300, delay_max=300)) def send_notification(): ... ``` @@ -30,12 +30,32 @@ In this case, the task will be retried up to twice, with a five minute delay (30 ## Backoff -Alternatively, a range of delays can be specified: +A range of delays can be specified to implement backoff: ```python -@cf.task(retries=(5, 300, 600)) +@cf.task(retries=cf.Retries(limit=5, delay_min=300, delay_max=600)) def send_notification(): ... ``` In this case, the first retry will happen after 300 seconds, and the fifth (and final) retry will happen 600 seconds after the fourth, with an increasing gap between each retry. So (ignoring time taken for the execution and scheduling) attempts of this task would happen at `t+0`, `t+300`, `t+675`, `t+1125`, `t+1650`, `t+2250` (i.e., the final attempt happening over half an hour after the initial attempt). + +## Unlimited retries + +For tasks that should keep retrying indefinitely until they succeed, use `retries=True`: + +```python +@cf.task(retries=True) +def critical_task(): + ... +``` + +This will retry with a random delay between 1 and 60 seconds (the defaults). For custom delays: + +```python +@cf.task(retries=cf.Retries(limit=None, delay_min=10, delay_max=300)) +def critical_task(): + ... +``` + +With unlimited retries, each retry uses a random delay between `delay_min` and `delay_max` seconds. diff --git a/docs/docs/sensors.md b/docs/docs/sensors.md deleted file mode 100644 index 7d37a9c8..00000000 --- a/docs/docs/sensors.md +++ /dev/null @@ -1,42 +0,0 @@ -# Sensors - -Triggering runs on demand may suit some use cases, but often you'll want to be able to react to events occurring in your system. As well as tasks and steps, Coflux provides another target type: sensors. These can be used to monitor a database, watch a file system, or listen to a queue. They provide flexibility to subscribe to events, or poll a resource. - -Sensors are defined in a module, along with workflows and tasks, and hosted by your worker: - -```python -import coflux as cf - -@cf.workflow(): -def process_file(): - ... - -@cf.sensor() -def new_files(): - ... -``` - -Once a sensor is activated, the orchestrator will do its best to ensure the function is always running. Once it terminates, it will be automatically restarted (subject to rate limiting). The sensor is responsible for initiating workflows as needed. - -## Checkpoints - -Typically a sensor needs to maintain some state. For example, to track a database cursor, or the name of the last file that was processed. Coflux supports this be allowing a sensor to 'checkpoint'. In the event that a sensor is restarted, its arguments will be replaced with those that were most recently passed to the `checkpoint` function. - -## An example - -Here's a sensor that periodically starts a workflow: - -```python -@cf.sensor() -def ticker(interval: int = 300, last_tick: float | None = None): - next_tick = last_tick + interval if last_tick else time.time() - while True: - remaining = max(0, next_tick - time.time()) - if remaining: - time.sleep(remaining) - my_workflow.submit() - cf.checkpoint(interval, next_tick) - next_tick += interval -``` - -This will call `my_workflow` every five minutes. The use of checkpointing means that if the worker gets restarted, the interval shouldn't get interrupted (subject to the time and duration of the restart). diff --git a/docs/sidebars.ts b/docs/sidebars.ts index fea00831..2c4e3804 100644 --- a/docs/sidebars.ts +++ b/docs/sidebars.ts @@ -18,6 +18,7 @@ const sidebars: SidebarsConfig = { "executions", "concurrency", "retries", + "recurring", "caching", "groups", "logging", @@ -29,7 +30,6 @@ const sidebars: SidebarsConfig = { "deferring", "memoising", "assets", - "sensors", "stubs", "blobs", ], diff --git a/examples/README.md b/examples/README.md index 5998a5d9..94c15490 100644 --- a/examples/README.md +++ b/examples/README.md @@ -4,6 +4,6 @@ This directory contains a few example Coflux projects: - [Edge detection](edge_detection/) - a basic image pipeline, using OpenCV and scikit-image to detect edges in an image. - [Pandas/ETL](pandas_etl/) - an ETL-like pipeline that generates some random sales data and aggregates it. -- [Slack bot](slack_bot/) - a sensor that schedules workflows to respond to Slack messages. +- [Slack bot](slack_bot/) - a workflow that schedules workflows to respond to Slack messages. - [Wikipedia](wikipedia/) - a workflow that queries the Wikipedia API and does some text processing on the top pages. diff --git a/examples/slack_bot/README.md b/examples/slack_bot/README.md index 9d5964d5..a1a7bd4e 100644 --- a/examples/slack_bot/README.md +++ b/examples/slack_bot/README.md @@ -1,6 +1,6 @@ # Examples → Slack bot -This example implements a basic chat bot. Using a sensor, it listens for messages from the Slack API (using the [socket mode](https://api.slack.com/apis/connections/socket) client). Each message that is sent to the bot will cause a workflow run to be scheduled. The workflow adds a random Emoji reaction to the message. +This example implements a basic chat bot. Using a long-running workflow, it listens for messages from the Slack API (using the [socket mode](https://api.slack.com/apis/connections/socket) client). Each message that is sent to the bot will cause a workflow run to be scheduled. The workflow adds a random Emoji reaction to the message. ## Slack bot setup diff --git a/examples/slack_bot/slackbot/workflows.py b/examples/slack_bot/slackbot/workflows.py index 845fc1d3..2641f2d1 100644 --- a/examples/slack_bot/slackbot/workflows.py +++ b/examples/slack_bot/slackbot/workflows.py @@ -39,7 +39,7 @@ def _handle_request(client: SocketModeClient, req: SocketModeRequest): client.send_socket_mode_response(SocketModeResponse(envelope_id=req.envelope_id)) -@cf.sensor() +@cf.workflow() def slack_bot(): client = _socket_client() client.socket_mode_request_listeners.append(_handle_request) diff --git a/server/lib/coflux/application.ex b/server/lib/coflux/application.ex index fb64d54c..40c07082 100644 --- a/server/lib/coflux/application.ex +++ b/server/lib/coflux/application.ex @@ -38,7 +38,6 @@ defmodule Coflux.Application do Topics.Modules, Topics.Run, Topics.Workflow, - Topics.Sensor, Topics.Logs, Topics.Module, Topics.Pools, diff --git a/server/lib/coflux/handlers/api.ex b/server/lib/coflux/handlers/api.ex index 3ef161e2..d62e21c4 100644 --- a/server/lib/coflux/handlers/api.ex +++ b/server/lib/coflux/handlers/api.ex @@ -485,8 +485,9 @@ defmodule Coflux.Handlers.Api do wait_for: {"waitFor", &parse_indexes/1}, cache: {"cache", &parse_cache/1}, defer: {"defer", &parse_defer/1}, - execute_after: {"executeAfter", &parse_integer(&1, optional: true)}, + delay: {"delay", &parse_integer(&1, optional: true)}, retries: {"retries", &parse_retries/1}, + recurrent: {"recurrent", &parse_boolean(&1, optional: true)}, requires: {"requires", &parse_tag_set/1} } ) @@ -500,52 +501,12 @@ defmodule Coflux.Handlers.Api do :workflow, arguments.arguments, space: arguments.space_name, - execute_after: arguments[:execute_after], wait_for: arguments[:wait_for], cache: arguments[:cache], defer: arguments[:defer], - delay: arguments[:delay], + delay: arguments[:delay] || 0, retries: arguments[:retries], - requires: arguments[:requires] - ) do - {:ok, run_id, step_id, execution_id} -> - json_response(req, %{ - "runId" => run_id, - "stepId" => step_id, - "executionId" => execution_id - }) - end - end) - else - json_error_response(req, "bad_request", details: errors) - end - end - - defp handle(req, "POST", ["start_sensor"], namespace) do - {:ok, arguments, errors, req} = - read_arguments( - req, - %{ - project_id: "projectId", - module: "module", - target: "target", - space_name: "spaceName", - arguments: {"arguments", &parse_arguments/1} - }, - %{ - requires: {"requires", &parse_tag_set/1} - } - ) - - if Enum.empty?(errors) do - with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.start_run( - arguments.project_id, - arguments.module, - arguments.target, - :sensor, - arguments.arguments, - space: arguments.space_name, + recurrent: arguments[:recurrent] == true, requires: arguments[:requires] ) do {:ok, run_id, step_id, execution_id} -> @@ -952,6 +913,14 @@ defmodule Coflux.Handlers.Api do end end + defp parse_boolean(value, opts \\ []) do + cond do + opts[:optional] && is_nil(value) -> {:ok, nil} + is_boolean(value) -> {:ok, value} + true -> {:error, :invalid} + end + end + defp parse_string(value, opts) do cond do opts[:optional] && is_nil(value) -> {:ok, nil} @@ -1009,7 +978,8 @@ defmodule Coflux.Handlers.Api do {:ok, nil} is_map(value) -> - with {:ok, limit} <- parse_integer(Map.get(value, "limit")), + # limit can be nil (unlimited) or an integer + with {:ok, limit} <- parse_integer(Map.get(value, "limit"), optional: true), {:ok, delay_min} <- parse_integer(Map.get(value, "delayMin"), optional: true), {:ok, delay_max} <- parse_integer(Map.get(value, "delayMax"), optional: true) do {:ok, %{limit: limit, delay_min: delay_min, delay_max: delay_max}} @@ -1028,6 +998,7 @@ defmodule Coflux.Handlers.Api do {:ok, defer} <- parse_defer(Map.get(value, "defer")), {:ok, delay} <- parse_integer(Map.get(value, "delay")), {:ok, retries} <- parse_retries(Map.get(value, "retries")), + {:ok, recurrent} <- parse_boolean(Map.get(value, "recurrent"), optional: true), {:ok, requires} <- parse_tag_set(Map.get(value, "requires")), {:ok, instruction} <- parse_string( @@ -1043,6 +1014,7 @@ defmodule Coflux.Handlers.Api do defer: defer, delay: delay, retries: retries, + recurrent: recurrent == true, requires: requires, instruction: instruction }} @@ -1055,28 +1027,6 @@ defmodule Coflux.Handlers.Api do end end - defp parse_sensor(value) do - if is_map(value) do - with {:ok, parameters} <- parse_parameters(Map.get(value, "parameters")), - {:ok, requires} <- parse_tag_set(Map.get(value, "requires")), - {:ok, instruction} <- - parse_string( - Map.get(value, "instruction"), - optional: true, - max_length: 5000 - ) do - {:ok, - %{ - parameters: parameters, - requires: requires, - instruction: instruction - }} - end - else - {:error, :invalid} - end - end - defp parse_workflows(value) do Enum.reduce_while(value, {:ok, %{}}, fn {workflow_name, workflow}, {:ok, result} -> if is_valid_target_name?(workflow_name) do @@ -1093,38 +1043,11 @@ defmodule Coflux.Handlers.Api do end) end - defp parse_sensors(value) do - Enum.reduce_while(value, {:ok, %{}}, fn {sensor_name, sensor}, {:ok, result} -> - if is_valid_target_name?(sensor_name) do - case parse_sensor(sensor) do - {:ok, parsed} -> - {:cont, {:ok, Map.put(result, sensor_name, parsed)}} - - {:error, error} -> - {:halt, {:error, error}} - end - else - {:halt, {:error, :invalid}} - end - end) - end - - defp parse_manifest(value) do - if is_map(value) do - with {:ok, workflows} <- parse_workflows(Map.get(value, "workflows", %{})), - {:ok, sensors} <- parse_sensors(Map.get(value, "sensors", %{})) do - {:ok, %{workflows: workflows, sensors: sensors}} - end - else - {:error, :invalid} - end - end - defp parse_manifests(value) do if is_map(value) do - Enum.reduce_while(value, {:ok, %{}}, fn {module, manifest}, {:ok, result} -> + Enum.reduce_while(value, {:ok, %{}}, fn {module, workflows}, {:ok, result} -> if is_valid_module_name?(module) do - case parse_manifest(manifest) do + case parse_workflows(workflows) do {:ok, parsed} -> {:cont, {:ok, Map.put(result, module, parsed)}} diff --git a/server/lib/coflux/handlers/auth.ex b/server/lib/coflux/handlers/auth.ex index 4d13b4d3..9335f6d5 100644 --- a/server/lib/coflux/handlers/auth.ex +++ b/server/lib/coflux/handlers/auth.ex @@ -16,8 +16,8 @@ defmodule Coflux.Handlers.Auth do - If namespaces omitted, defaults to [null] Auth mode is controlled by COFLUX_AUTH_MODE: - - "token" (default): Require valid token with namespace access - - "none": No authentication required + - "none" (default): No authentication required + - "token": Require valid token with namespace access """ alias Coflux.Config diff --git a/server/lib/coflux/handlers/worker.ex b/server/lib/coflux/handlers/worker.ex index 1a3b6677..6638c823 100644 --- a/server/lib/coflux/handlers/worker.ex +++ b/server/lib/coflux/handlers/worker.ex @@ -109,8 +109,9 @@ defmodule Coflux.Handlers.Worker do cache, defer, memo, - execute_after, + delay, retries, + recurrent, requires ] = message["params"] @@ -123,12 +124,13 @@ defmodule Coflux.Handlers.Worker do parse_type(type), Enum.map(arguments, &parse_value/1), group_id: group_id, - execute_after: execute_after, wait_for: wait_for, cache: parse_cache(cache), defer: parse_defer(defer), memo: memo, + delay: delay || 0, retries: parse_retries(retries), + recurrent: recurrent == true, requires: requires ) do {:ok, _run_id, _step_id, execution_id} -> @@ -158,24 +160,6 @@ defmodule Coflux.Handlers.Worker do {[{:close, 4000, "execution_invalid"}], nil} end - "record_checkpoint" -> - [execution_id, arguments] = message["params"] - - if is_recognised_execution?(execution_id, state) do - arguments = Enum.map(arguments, &parse_value/1) - - :ok = - Orchestration.record_checkpoint( - state.project_id, - execution_id, - arguments - ) - - {[], state} - else - {[{:close, 4000, "execution_invalid"}], nil} - end - "notify_terminated" -> [execution_ids] = message["params"] @@ -403,7 +387,6 @@ defmodule Coflux.Handlers.Worker do case type do "workflow" -> :workflow "task" -> :task - "sensor" -> :sensor end end @@ -478,7 +461,7 @@ defmodule Coflux.Handlers.Worker do def parse_retries(value) do if value do %{ - limit: Map.fetch!(value, "limit"), + limit: Map.get(value, "limit"), delay_min: Map.fetch!(value, "delay_min"), delay_max: Map.fetch!(value, "delay_max") } diff --git a/server/lib/coflux/orchestration.ex b/server/lib/coflux/orchestration.ex index 5a17cc97..059dde78 100644 --- a/server/lib/coflux/orchestration.ex +++ b/server/lib/coflux/orchestration.ex @@ -99,10 +99,6 @@ defmodule Coflux.Orchestration do call_server(project_id, {:notify_terminated, execution_ids}) end - def record_checkpoint(project_id, execution_id, arguments) do - call_server(project_id, {:record_checkpoint, execution_id, arguments}) - end - def record_result(project_id, execution_id, result) do call_server(project_id, {:record_result, execution_id, result}) end @@ -158,10 +154,6 @@ defmodule Coflux.Orchestration do call_server(project_id, {:subscribe_workflow, module, target, space_id, pid}) end - def subscribe_sensor(project_id, module, target, space_id, pid) do - call_server(project_id, {:subscribe_sensor, module, target, space_id, pid}) - end - def subscribe_run(project_id, run_id, pid) do call_server(project_id, {:subscribe_run, run_id, pid}) end diff --git a/server/lib/coflux/orchestration/manifests.ex b/server/lib/coflux/orchestration/manifests.ex index 3ddfa2a8..2555f42e 100644 --- a/server/lib/coflux/orchestration/manifests.ex +++ b/server/lib/coflux/orchestration/manifests.ex @@ -6,10 +6,10 @@ defmodule Coflux.Orchestration.Manifests do def register_manifests(db, space_id, manifests) do with_transaction(db, fn -> manifest_ids = - Map.new(manifests, fn {module, manifest} -> + Map.new(manifests, fn {module, workflows} -> {:ok, manifest_id} = - if manifest do - hash = hash_manifest(manifest) + if workflows && map_size(workflows) > 0 do + hash = hash_manifest_workflows(workflows) case query_one(db, "SELECT id FROM manifests WHERE hash = ?1", {{:blob, hash}}) do {:ok, nil} -> @@ -21,8 +21,8 @@ defmodule Coflux.Orchestration.Manifests do :workflows, {:manifest_id, :name, :instruction_id, :parameter_set_id, :wait_for, :cache_config_id, :defer_params, :delay, :retry_limit, :retry_delay_min, - :retry_delay_max, :requires_tag_set_id}, - Enum.map(manifest.workflows, fn {name, workflow} -> + :retry_delay_max, :recurrent, :requires_tag_set_id}, + Enum.map(workflows, fn {name, workflow} -> {:ok, instruction_id} = if workflow.instruction do get_or_create_instruction_id(db, workflow.instruction) @@ -56,43 +56,15 @@ defmodule Coflux.Orchestration.Manifests do do: Utils.encode_params_list(workflow.defer.params) ), workflow.delay, - if(workflow.retries, do: workflow.retries.limit, else: 0), + if(workflow.retries, do: workflow.retries.limit || -1, else: 0), if(workflow.retries, do: workflow.retries.delay_min, else: 0), if(workflow.retries, do: workflow.retries.delay_max, else: 0), + if(workflow.recurrent, do: 1, else: 0), requires_tag_set_id } end) ) - {:ok, _} = - insert_many( - db, - :sensors, - {:manifest_id, :name, :instruction_id, :parameter_set_id, - :requires_tag_set_id}, - Enum.map(manifest.sensors, fn {name, sensor} -> - {:ok, instruction_id} = - if sensor.instruction do - get_or_create_instruction_id(db, sensor.instruction) - else - {:ok, nil} - end - - {:ok, requires_tag_set_id} = - if sensor.requires do - TagSets.get_or_create_tag_set_id(db, sensor.requires) - else - {:ok, nil} - end - - case get_or_create_parameter_set_id(db, sensor.parameters) do - {:ok, parameter_set_id} -> - {manifest_id, name, instruction_id, parameter_set_id, - requires_tag_set_id} - end - end) - ) - {:ok, manifest_id} {:ok, {manifest_id}} -> @@ -172,8 +144,7 @@ defmodule Coflux.Orchestration.Manifests do Enum.reduce(manifest_ids, %{}, fn {module, manifest_id}, result -> if manifest_id do {:ok, workflows} = get_manifest_workflows(db, manifest_id) - {:ok, sensors} = get_manifest_sensors(db, manifest_id) - Map.put(result, module, %{workflows: workflows, sensors: sensors}) + Map.put(result, module, workflows) else result end @@ -187,7 +158,7 @@ defmodule Coflux.Orchestration.Manifests do case query_one( db, """ - SELECT w.parameter_set_id, w.instruction_id, w.wait_for, w.cache_config_id, w.defer_params, w.delay, w.retry_limit, w.retry_delay_min, w.retry_delay_max, w.requires_tag_set_id + SELECT w.parameter_set_id, w.instruction_id, w.wait_for, w.cache_config_id, w.defer_params, w.delay, w.retry_limit, w.retry_delay_min, w.retry_delay_max, w.recurrent, w.requires_tag_set_id FROM space_manifests AS wm LEFT JOIN workflows AS w ON w.manifest_id = wm.manifest_id WHERE wm.space_id = ?1 AND wm.module = ?2 AND w.name = ?3 @@ -201,7 +172,7 @@ defmodule Coflux.Orchestration.Manifests do {:ok, {parameter_set_id, instruction_id, wait_for, cache_config_id, defer_params, delay, - retry_limit, retry_delay_min, retry_delay_max, requires_tag_set_id}} -> + retry_limit, retry_delay_min, retry_delay_max, recurrent, requires_tag_set_id}} -> build_workflow( db, parameter_set_id, @@ -213,37 +184,17 @@ defmodule Coflux.Orchestration.Manifests do retry_limit, retry_delay_min, retry_delay_max, + recurrent, requires_tag_set_id ) end end - def get_latest_sensor(db, space_id, module, target_name) do - case query_one( - db, - """ - SELECT s.parameter_set_id, s.instruction_id, s.requires_tag_set_id - FROM space_manifests AS wm - LEFT JOIN sensors AS s ON s.manifest_id = wm.manifest_id - WHERE wm.space_id = ?1 AND wm.module = ?2 AND s.name = ?3 - ORDER BY wm.created_at DESC - LIMIT 1 - """, - {space_id, module, target_name} - ) do - {:ok, nil} -> - {:ok, nil} - - {:ok, {parameter_set_id, instruction_id, requires_tag_set_id}} -> - build_sensor(db, parameter_set_id, instruction_id, requires_tag_set_id) - end - end - defp get_manifest_workflows(db, manifest_id) do case query( db, """ - SELECT name, instruction_id, parameter_set_id, wait_for, cache_config_id, defer_params, delay, retry_limit, retry_delay_min, retry_delay_max, requires_tag_set_id + SELECT name, instruction_id, parameter_set_id, wait_for, cache_config_id, defer_params, delay, retry_limit, retry_delay_min, retry_delay_max, recurrent, requires_tag_set_id FROM workflows WHERE manifest_id = ?1 """, @@ -253,7 +204,7 @@ defmodule Coflux.Orchestration.Manifests do workflows = Map.new(rows, fn {name, instruction_id, parameter_set_id, wait_for, cache_config_id, defer_params, delay, retry_limit, retry_delay_min, retry_delay_max, - requires_tag_set_id} -> + recurrent, requires_tag_set_id} -> {:ok, workflow} = build_workflow( db, @@ -266,6 +217,7 @@ defmodule Coflux.Orchestration.Manifests do retry_limit, retry_delay_min, retry_delay_max, + recurrent, requires_tag_set_id ) @@ -276,30 +228,7 @@ defmodule Coflux.Orchestration.Manifests do end end - defp get_manifest_sensors(db, manifest_id) do - case query( - db, - """ - SELECT name, parameter_set_id, instruction_id, requires_tag_set_id - FROM sensors - WHERE manifest_id = ?1 - """, - {manifest_id} - ) do - {:ok, rows} -> - sensors = - Map.new(rows, fn {name, parameter_set_id, instruction_id, requires_tag_set_id} -> - {:ok, sensor} = - build_sensor(db, parameter_set_id, instruction_id, requires_tag_set_id) - - {name, sensor} - end) - - {:ok, sensors} - end - end - - defp get_all_workflows_for_space(db, space_id) do + def get_all_workflows_for_space(db, space_id) do case query( db, """ @@ -321,41 +250,6 @@ defmodule Coflux.Orchestration.Manifests do end end - defp get_all_sensors_for_space(db, space_id) do - case query( - db, - """ - SELECT DISTINCT wm.module, s.name - FROM space_manifests AS wm - INNER JOIN manifests AS m on m.id = wm.manifest_id - INNER JOIN sensors AS s ON s.manifest_id = m.id - WHERE wm.space_id = ?1 - """, - {space_id} - ) do - {:ok, rows} -> - {:ok, - Enum.reduce(rows, %{}, fn {module, target_name}, result -> - result - |> Map.put_new(module, MapSet.new()) - |> Map.update!(module, &MapSet.put(&1, target_name)) - end)} - end - end - - def get_all_targets_for_space(db, space_id) do - with {:ok, workflows} <- get_all_workflows_for_space(db, space_id), - {:ok, sensors} <- get_all_sensors_for_space(db, space_id) do - {:ok, workflows, sensors} - end - end - - defp hash_manifest(manifest) do - workflows_hash = hash_manifest_workflows(manifest.workflows) - sensors_hash = hash_manifest_sensors(manifest.sensors) - :crypto.hash(:sha256, [workflows_hash, 0, sensors_hash]) - end - defp hash_manifest_workflows(workflows) do data = Enum.map(workflows, fn {name, workflow} -> @@ -363,15 +257,20 @@ defmodule Coflux.Orchestration.Manifests do name, hash_parameter_set(workflow.parameters), Integer.to_string(Utils.encode_params_set(workflow.wait_for)), + # TODO: fix (workflow.cache.params being true gets encoded to "" as well as not set) - also below/elsewhere if(workflow.cache, do: Utils.encode_params_list(workflow.cache.params), else: ""), if(workflow.cache[:max_age], do: Integer.to_string(workflow.cache.max_age), else: ""), if(workflow.cache[:namespace], do: workflow.cache.namespace, else: ""), if(workflow.cache[:version], do: workflow.cache.version, else: ""), if(workflow.defer, do: Utils.encode_params_list(workflow.defer.params), else: ""), Integer.to_string(workflow.delay), - if(workflow.retries, do: Integer.to_string(workflow.retries.limit), else: ""), + if(workflow.retries, + do: if(workflow.retries.limit, do: Integer.to_string(workflow.retries.limit), else: "unlimited"), + else: "" + ), if(workflow.retries, do: Integer.to_string(workflow.retries.delay_min), else: ""), if(workflow.retries, do: Integer.to_string(workflow.retries.delay_max), else: ""), + if(workflow.recurrent, do: "1", else: "0"), hash_requires(workflow.requires), workflow.instruction || "" ] @@ -380,20 +279,6 @@ defmodule Coflux.Orchestration.Manifests do :crypto.hash(:sha256, Enum.intersperse(data, 0)) end - defp hash_manifest_sensors(sensors) do - data = - Enum.map(sensors, fn {name, sensor} -> - [ - name, - hash_parameter_set(sensor.parameters), - hash_requires(sensor.requires), - sensor.instruction || "" - ] - end) - - :crypto.hash(:sha256, Enum.intersperse(data, 0)) - end - defp build_workflow( db, parameter_set_id, @@ -405,6 +290,7 @@ defmodule Coflux.Orchestration.Manifests do retry_limit, retry_delay_min, retry_delay_max, + recurrent, requires_tag_set_id ) do {:ok, parameters} = get_parameter_set(db, parameter_set_id) @@ -431,12 +317,26 @@ defmodule Coflux.Orchestration.Manifests do end retries = - if retry_limit do - %{ - limit: retry_limit, - delay_min: retry_delay_min, - delay_max: retry_delay_max - } + cond do + # 0 = no retries + retry_limit == 0 -> + nil + + # -1 = unlimited retries + retry_limit == -1 -> + %{ + limit: nil, + delay_min: retry_delay_min, + delay_max: retry_delay_max + } + + # positive = that many retries + true -> + %{ + limit: retry_limit, + delay_min: retry_delay_min, + delay_max: retry_delay_max + } end {:ok, @@ -448,24 +348,7 @@ defmodule Coflux.Orchestration.Manifests do defer: defer, delay: delay, retries: retries, - requires: requires - }} - end - - defp build_sensor(db, parameter_set_id, instruction_id, requires_tag_set_id) do - {:ok, parameters} = get_parameter_set(db, parameter_set_id) - - {:ok, requires} = - if requires_tag_set_id do - TagSets.get_tag_set(db, requires_tag_set_id) - else - {:ok, nil} - end - - {:ok, - %{ - parameters: parameters, - instruction_id: instruction_id, + recurrent: recurrent == 1, requires: requires }} end diff --git a/server/lib/coflux/orchestration/models.ex b/server/lib/coflux/orchestration/models.ex index 04234c04..d9f6c9ae 100644 --- a/server/lib/coflux/orchestration/models.ex +++ b/server/lib/coflux/orchestration/models.ex @@ -31,6 +31,8 @@ defmodule Coflux.Orchestration.Models do :retry_limit, :retry_delay_min, :retry_delay_max, + :recurrent, + :delay, :requires_tag_set_id, :created_at ] diff --git a/server/lib/coflux/orchestration/results.ex b/server/lib/coflux/orchestration/results.ex index 756579bd..41e5507b 100644 --- a/server/lib/coflux/orchestration/results.ex +++ b/server/lib/coflux/orchestration/results.ex @@ -3,75 +3,6 @@ defmodule Coflux.Orchestration.Results do alias Coflux.Orchestration.Values - def record_checkpoint(db, execution_id, arguments) do - with_transaction(db, fn -> - sequence = - case query_one( - db, - """ - SELECT MAX(sequence) - FROM checkpoints - WHERE execution_id = ?1 - """, - {execution_id} - ) do - {:ok, {nil}} -> 1 - {:ok, {max_sequence}} -> max_sequence + 1 - end - - now = current_timestamp() - - {:ok, checkpoint_id} = insert_checkpoint(db, execution_id, sequence, now) - - arguments - |> Enum.with_index() - |> Enum.each(fn {value, position} -> - {:ok, value_id} = Values.get_or_create_value(db, value) - {:ok, _} = insert_checkpoint_argument(db, checkpoint_id, position, value_id) - end) - - {:ok, checkpoint_id, sequence, now} - end) - end - - def get_latest_checkpoint(db, step_id) do - query_one( - db, - """ - SELECT c.id, c.execution_id, c.sequence, c.created_at - FROM checkpoints AS c - INNER JOIN executions AS e ON e.id = c.execution_id - WHERE e.step_id = ?1 - ORDER BY e.attempt DESC, c.sequence DESC - LIMIT 1 - """, - {step_id} - ) - end - - def get_checkpoint_arguments(db, checkpoint_id) do - case query( - db, - """ - SELECT value_id - FROM checkpoint_arguments - WHERE checkpoint_id = ?1 - ORDER BY position - """, - {checkpoint_id} - ) do - {:ok, rows} -> - values = - Enum.map(rows, fn {value_id} -> - case Values.get_value_by_id(db, value_id) do - {:ok, value} -> value - end - end) - - {:ok, values} - end - end - def record_result(db, execution_id, result) do with_transaction(db, fn -> now = current_timestamp() @@ -276,22 +207,6 @@ defmodule Coflux.Orchestration.Results do }) end - defp insert_checkpoint(db, execution_id, sequence, created_at) do - insert_one(db, :checkpoints, %{ - execution_id: execution_id, - sequence: sequence, - created_at: created_at - }) - end - - defp insert_checkpoint_argument(db, checkpoint_id, position, value_id) do - insert_one(db, :checkpoint_arguments, %{ - checkpoint_id: checkpoint_id, - position: position, - value_id: value_id - }) - end - defp current_timestamp() do System.os_time(:millisecond) end diff --git a/server/lib/coflux/orchestration/runs.ex b/server/lib/coflux/orchestration/runs.ex index 1dc0547b..663c089d 100644 --- a/server/lib/coflux/orchestration/runs.ex +++ b/server/lib/coflux/orchestration/runs.ex @@ -24,6 +24,8 @@ defmodule Coflux.Orchestration.Runs do retry_limit, retry_delay_min, retry_delay_max, + recurrent, + delay, requires_tag_set_id, created_at FROM steps @@ -55,6 +57,8 @@ defmodule Coflux.Orchestration.Runs do s.retry_limit, s.retry_delay_min, s.retry_delay_max, + s.recurrent, + s.delay, s.requires_tag_set_id, s.created_at FROM steps AS s @@ -135,7 +139,7 @@ defmodule Coflux.Orchestration.Runs do parent_id = Keyword.get(opts, :parent_id) now = current_timestamp() - # TODO: check that 'type' is :workflow or :sensor? + # TODO: check that 'type' is :workflow? with_transaction(db, fn -> {:ok, run_id, external_run_id} = insert_run(db, parent_id, idempotency_key, now) @@ -261,10 +265,14 @@ defmodule Coflux.Orchestration.Runs do cache = Keyword.get(opts, :cache) defer = Keyword.get(opts, :defer) memo = Keyword.get(opts, :memo) - execute_after = Keyword.get(opts, :execute_after) retries = Keyword.get(opts, :retries) + recurrent = Keyword.get(opts, :recurrent, false) + delay = Keyword.get(opts, :delay, 0) requires = Keyword.get(opts, :requires) || %{} + # Calculate execute_after from delay + execute_after = if delay > 0, do: now + delay + memo_key = if memo, do: build_key(memo, arguments, "#{module}:#{target}") memoised_execution = @@ -324,9 +332,11 @@ defmodule Coflux.Orchestration.Runs do cache_config_id, defer_key, memo_key, - if(retries, do: retries.limit, else: 0), + if(retries, do: retries.limit || -1, else: 0), if(retries, do: retries.delay_min, else: 0), if(retries, do: retries.delay_max, else: 0), + recurrent, + delay, requires_tag_set_id, now ) @@ -681,6 +691,7 @@ defmodule Coflux.Orchestration.Runs do retry_limit, retry_delay_min, retry_delay_max, + recurrent, requires_tag_set_id, created_at FROM steps @@ -738,6 +749,27 @@ defmodule Coflux.Orchestration.Runs do end end + @doc """ + Gets result types for executions of a step, ordered most recent first. + """ + def get_step_result_types(db, step_id, limit) do + case query( + db, + """ + SELECT r.type + FROM executions AS e + INNER JOIN results AS r ON r.execution_id = e.id + WHERE e.step_id = ?1 + ORDER BY e.created_at DESC + LIMIT ?2 + """, + {step_id, limit} + ) do + {:ok, rows} -> + {:ok, Enum.map(rows, fn {type} -> type end)} + end + end + def get_first_step_execution_id(db, step_id) do case query_one( db, @@ -939,6 +971,8 @@ defmodule Coflux.Orchestration.Runs do retry_limit, retry_delay_min, retry_delay_max, + recurrent, + delay, requires_tag_set_id, now ) do @@ -960,6 +994,8 @@ defmodule Coflux.Orchestration.Runs do retry_limit: retry_limit, retry_delay_min: retry_delay_min, retry_delay_max: retry_delay_max, + recurrent: if(recurrent, do: 1, else: 0), + delay: delay, requires_tag_set_id: requires_tag_set_id, created_at: now }) do diff --git a/server/lib/coflux/orchestration/server.ex b/server/lib/coflux/orchestration/server.ex index d81e836d..e039ecf5 100644 --- a/server/lib/coflux/orchestration/server.ex +++ b/server/lib/coflux/orchestration/server.ex @@ -18,7 +18,6 @@ defmodule Coflux.Orchestration.Server do } @session_timeout_ms 5_000 - @sensor_rate_limit_ms 5_000 @connected_worker_poll_interval_ms 30_000 @disconnected_worker_poll_interval_ms 5_000 @worker_idle_timeout_ms 5_000 @@ -377,26 +376,14 @@ defmodule Coflux.Orchestration.Server do :ok -> state = manifests - |> Enum.reduce(state, fn {module, manifest}, state -> - state = - Enum.reduce(manifest.workflows, state, fn {target_name, target}, state -> + |> Enum.reduce(state, fn {module, workflows}, state -> + Enum.reduce(workflows, state, fn {target_name, target}, state -> notify_listeners( state, {:workflow, module, target_name, space_id}, {:target, target} ) end) - - state = - Enum.reduce(manifest.sensors, state, fn {target_name, target}, state -> - notify_listeners( - state, - {:sensor, module, target_name, space_id}, - {:target, target} - ) - end) - - state end) |> notify_listeners( {:modules, space_id}, @@ -405,12 +392,8 @@ defmodule Coflux.Orchestration.Server do |> notify_listeners( {:targets, space_id}, {:manifests, - Map.new(manifests, fn {module_name, targets} -> - {module_name, - %{ - workflows: MapSet.new(Map.keys(targets.workflows)), - sensors: MapSet.new(Map.keys(targets.sensors)) - }} + Map.new(manifests, fn {module_name, workflows} -> + {module_name, MapSet.new(Map.keys(workflows))} end)} ) |> flush_notifications() @@ -678,7 +661,8 @@ defmodule Coflux.Orchestration.Server do }} -> group_id = Keyword.get(opts, :group_id) cache = Keyword.get(opts, :cache) - execute_after = Keyword.get(opts, :execute_after) + delay = Keyword.get(opts, :delay, 0) + execute_after = if delay > 0, do: created_at + delay requires = Keyword.get(opts, :requires) || %{} state = @@ -962,13 +946,6 @@ defmodule Coflux.Orchestration.Server do {:reply, :ok, state} end - def handle_call({:record_checkpoint, execution_id, arguments}, _from, state) do - case Results.record_checkpoint(state.db, execution_id, arguments) do - {:ok, _checkpoint_id, _attempt, _created_at} -> - {:reply, :ok, state} - end - end - def handle_call({:record_result, execution_id, result}, _from, state) do case process_result(state, execution_id, result) do {:ok, state} -> @@ -1298,30 +1275,6 @@ defmodule Coflux.Orchestration.Server do end end - def handle_call( - {:subscribe_sensor, module, target_name, space_id, pid}, - _from, - state - ) do - with {:ok, _} <- lookup_space_by_id(state, space_id), - {:ok, sensor} <- - Manifests.get_latest_sensor(state.db, space_id, module, target_name), - {:ok, instruction} <- - if(sensor && sensor.instruction_id, - do: Manifests.get_instruction(state.db, sensor.instruction_id), - else: {:ok, nil} - ), - {:ok, runs} = Runs.get_target_runs(state.db, module, target_name, :sensor, space_id) do - {:ok, ref, state} = - add_listener(state, {:sensor, module, target_name, space_id}, pid) - - {:reply, {:ok, sensor, instruction, runs, ref}, state} - else - {:error, error} -> - {:reply, {:error, error}, state} - end - end - def handle_call({:subscribe_run, external_run_id, pid}, _from, state) do case Runs.get_run_by_external_id(state.db, external_run_id) do {:ok, nil} -> @@ -1467,27 +1420,21 @@ defmodule Coflux.Orchestration.Server do end def handle_call({:subscribe_targets, space_id, pid}, _from, state) do - # TODO: indicate which are archived (only workflows/sensors) - {:ok, workflows, sensors} = - Manifests.get_all_targets_for_space(state.db, space_id) + # TODO: indicate which are archived + {:ok, workflows} = Manifests.get_all_workflows_for_space(state.db, space_id) {:ok, steps} = Runs.get_steps_for_space(state.db, space_id) result = - Enum.reduce( - %{workflow: workflows, sensor: sensors}, - %{}, - fn {target_type, targets}, result -> - Enum.reduce(targets, result, fn {module_name, target_names}, result -> - Enum.reduce(target_names, result, fn target_name, result -> - put_in( - result, - [Access.key(module_name, %{}), target_name], - {target_type, nil} - ) - end) - end) - end + Enum.reduce(workflows, %{}, fn {module_name, target_names}, result -> + Enum.reduce(target_names, result, fn target_name, result -> + put_in( + result, + [Access.key(module_name, %{}), target_name], + {:workflow, nil} + ) + end) + end ) result = @@ -1618,14 +1565,7 @@ defmodule Coflux.Orchestration.Server do {state, assigned, unassigned} else # TODO: choose session before resolving arguments? - {:ok, arguments} = - case Results.get_latest_checkpoint(state.db, execution.step_id) do - {:ok, nil} -> - Runs.get_step_arguments(state.db, execution.step_id) - - {:ok, {checkpoint_id, _, _, _}} -> - Results.get_checkpoint_arguments(state.db, checkpoint_id) - end + {:ok, arguments} = Runs.get_step_arguments(state.db, execution.step_id) if arguments_ready?(state.db, execution.wait_for, arguments) && dependencies_ready?(state.db, execution.execution_id) do @@ -1672,9 +1612,9 @@ defmodule Coflux.Orchestration.Server do do: Map.fetch!(cache_configs, execution.cache_config_id) ), retries: - if(execution.retry_limit > 0, + if(execution.retry_limit == -1 || execution.retry_limit > 0, do: %{ - limit: execution.retry_limit, + limit: if(execution.retry_limit == -1, do: nil, else: execution.retry_limit), delay_min: execution.retry_delay_min, delay_max: execution.retry_delay_max } @@ -2318,17 +2258,14 @@ defmodule Coflux.Orchestration.Server do attempt: attempt, created_at: created_at }} -> - # TODO: neater way to get execute_after? - execute_after = Keyword.get(opts, :execute_after) + delay = Keyword.get(opts, :delay, 0) + execute_after = if delay > 0, do: created_at + delay execute_at = execute_after || created_at state = state |> notify_listeners( - case type do - :workflow -> {:workflow, module, target_name, space_id} - :sensor -> {:sensor, module, target_name, space_id} - end, + {:workflow, module, target_name, space_id}, {:run, external_run_id, created_at} ) |> notify_listeners( @@ -2398,13 +2335,6 @@ defmodule Coflux.Orchestration.Server do {:run, run.external_id, run.created_at} ) - :sensor -> - notify_listeners( - state, - {:sensor, run_module, run_target, space_id}, - {:run, run.external_id, run.created_at} - ) - _other -> state end @@ -2594,15 +2524,34 @@ defmodule Coflux.Orchestration.Server do {retry_id, state} + result_retryable?(result) && step.retry_limit == -1 -> + # Unlimited retries - random delay between min and max + delay_s = + step.retry_delay_min + + :rand.uniform() * (step.retry_delay_max - step.retry_delay_min) + + execute_after = System.os_time(:millisecond) + delay_s * 1000 + + {:ok, retry_id, _, state} = + rerun_step(state, step, space_id, execute_after: execute_after) + + {retry_id, state} + result_retryable?(result) && step.retry_limit > 0 -> - {:ok, assignments} = Runs.get_step_assignments(state.db, step.id) - attempts = Enum.count(assignments) + # Limited retries - check consecutive failures + {:ok, result_types} = + Runs.get_step_result_types(state.db, step.id, step.retry_limit + 1) + + consecutive_failures = + result_types + |> Enum.take_while(&(&1 in [0, 2])) + |> Enum.count() - if attempts <= step.retry_limit do + if consecutive_failures <= step.retry_limit do # TODO: add jitter (within min/max delay) delay_s = step.retry_delay_min + - (attempts - 1) / step.retry_limit * + (consecutive_failures - 1) / step.retry_limit * (step.retry_delay_max - step.retry_delay_min) execute_after = System.os_time(:millisecond) + delay_s * 1000 @@ -2615,27 +2564,14 @@ defmodule Coflux.Orchestration.Server do {nil, state} end - step.type == :sensor -> - {:ok, assignments} = Runs.get_step_assignments(state.db, step.id) - - if Enum.all?(Map.values(assignments)) do - now = System.os_time(:millisecond) - - last_assigned_at = - assignments |> Map.values() |> Enum.max(&>=/2, fn -> nil end) - - execute_after = - if last_assigned_at && now - last_assigned_at < @sensor_rate_limit_ms do - last_assigned_at + @sensor_rate_limit_ms - end - - {:ok, _, _, state} = - rerun_step(state, step, space_id, execute_after: execute_after) + step.recurrent == 1 and match?({:value, _}, result) -> + execute_after = + if step.delay > 0 do + System.os_time(:millisecond) + step.delay + end - {nil, state} - else - {nil, state} - end + {:ok, _, _, state} = rerun_step(state, step, space_id, execute_after: execute_after) + {nil, state} true -> {nil, state} diff --git a/server/lib/coflux/orchestration/utils.ex b/server/lib/coflux/orchestration/utils.ex index 8f7f92ae..c02ea8a2 100644 --- a/server/lib/coflux/orchestration/utils.ex +++ b/server/lib/coflux/orchestration/utils.ex @@ -33,7 +33,6 @@ defmodule Coflux.Orchestration.Utils do case type do :task -> 0 :workflow -> 1 - :sensor -> 2 end end @@ -41,7 +40,6 @@ defmodule Coflux.Orchestration.Utils do case value do 0 -> :task 1 -> :workflow - 2 -> :sensor end end end diff --git a/server/lib/coflux/topics/modules.ex b/server/lib/coflux/topics/modules.ex index 620fa116..e1d3bec1 100644 --- a/server/lib/coflux/topics/modules.ex +++ b/server/lib/coflux/topics/modules.ex @@ -21,10 +21,9 @@ defmodule Coflux.Topics.Modules do Orchestration.subscribe_modules(project_id, space_id, self()) value = - Map.new(manifests, fn {module, manifest} -> + Map.new(manifests, fn {module, workflows} -> result = %{ - workflows: Map.keys(manifest.workflows), - sensors: Map.keys(manifest.sensors), + workflows: Map.keys(workflows), executing: 0, scheduled: 0, nextDueAt: nil @@ -109,11 +108,9 @@ defmodule Coflux.Topics.Modules do |> Topic.set([module, :nextDueAt], next_due_at) end - defp update_manifest(topic, module, manifest) do - if manifest do - topic - |> Topic.set([module, :workflows], Map.keys(manifest.workflows)) - |> Topic.set([module, :sensors], Map.keys(manifest.sensors)) + defp update_manifest(topic, module, workflows) do + if workflows do + Topic.set(topic, [module, :workflows], Map.keys(workflows)) else Topic.unset(topic, [], module) end diff --git a/server/lib/coflux/topics/search.ex b/server/lib/coflux/topics/search.ex index f3340256..405dc330 100644 --- a/server/lib/coflux/topics/search.ex +++ b/server/lib/coflux/topics/search.ex @@ -31,28 +31,20 @@ defmodule Coflux.Topics.Search do defp process_notification(topic, {:manifests, targets}) do update_in(topic.state.targets, fn existing -> - Enum.reduce(targets, existing, fn {module_name, module_targets}, existing -> - Enum.reduce( - %{workflows: :workflow, sensors: :sensor}, - existing, - fn {key, target_type}, existing -> - module_targets - |> Map.fetch!(key) - |> Enum.reduce(existing, fn target_name, existing -> - existing_target = get_in(existing, [module_name, target_name]) - - if !existing_target || elem(existing_target, 0) != target_type do - put_in( - existing, - [Access.key(module_name, %{}), target_name], - {target_type, nil} - ) - else - existing - end - end) + Enum.reduce(targets, existing, fn {module_name, workflow_names}, existing -> + Enum.reduce(workflow_names, existing, fn target_name, existing -> + existing_target = get_in(existing, [module_name, target_name]) + + if !existing_target || elem(existing_target, 0) != :workflow do + put_in( + existing, + [Access.key(module_name, %{}), target_name], + {:workflow, nil} + ) + else + existing end - ) + end) end) end) end @@ -139,7 +131,7 @@ defmodule Coflux.Topics.Search do name: module_name } - {type, module_name, target_name, latest_run} when type in [:workflow, :sensor, :task] -> + {type, module_name, target_name, latest_run} when type in [:workflow, :task] -> run = case latest_run do {run_id, step_id, attempt} -> %{runId: run_id, stepId: step_id, attempt: attempt} diff --git a/server/lib/coflux/topics/sensor.ex b/server/lib/coflux/topics/sensor.ex deleted file mode 100644 index 896ed0bb..00000000 --- a/server/lib/coflux/topics/sensor.ex +++ /dev/null @@ -1,84 +0,0 @@ -defmodule Coflux.Topics.Sensor do - use Topical.Topic, - route: ["projects", :project_id, "sensors", :module, :target, :space_id] - - alias Coflux.Orchestration - - import Coflux.TopicUtils, only: [validate_project_access: 2] - - def connect(params, context) do - namespace = Map.get(context, :namespace) - - with :ok <- validate_project_access(params.project_id, namespace) do - {:ok, params} - end - end - - def init(params) do - project_id = Map.fetch!(params, :project_id) - module = Map.fetch!(params, :module) - target_name = Map.fetch!(params, :target) - space_id = String.to_integer(Map.fetch!(params, :space_id)) - - case Orchestration.subscribe_sensor( - project_id, - module, - target_name, - space_id, - self() - ) do - {:ok, sensor, instruction, runs, ref} -> - value = %{ - parameters: if(sensor, do: build_parameters(sensor.parameters)), - instruction: instruction, - configuration: build_configuration(sensor), - runs: build_runs(runs) - } - - {:ok, Topic.new(value, %{ref: ref})} - - {:error, :not_found} -> - {:error, :not_found} - end - end - - def handle_info({:topic, _ref, notifications}, topic) do - topic = Enum.reduce(notifications, topic, &process_notification/2) - {:ok, topic} - end - - defp process_notification({:target, target}, topic) do - topic - |> Topic.set([:parameters], build_parameters(target.parameters)) - |> Topic.set([:instruction], target.instruction) - |> Topic.set([:configuration], build_configuration(target)) - end - - defp process_notification({:run, external_run_id, created_at}, topic) do - Topic.set( - topic, - [:runs, external_run_id], - %{id: external_run_id, createdAt: created_at} - ) - end - - defp build_parameters(parameters) do - Enum.map(parameters, fn {name, default, annotation} -> - %{name: name, default: default, annotation: annotation} - end) - end - - defp build_configuration(sensor) do - if sensor do - %{ - requires: sensor.requires - } - end - end - - defp build_runs(runs) do - Map.new(runs, fn {external_run_id, created_at} -> - {external_run_id, %{id: external_run_id, createdAt: created_at}} - end) - end -end diff --git a/server/lib/coflux/topics/workflow.ex b/server/lib/coflux/topics/workflow.ex index 258efd3c..8ec43153 100644 --- a/server/lib/coflux/topics/workflow.ex +++ b/server/lib/coflux/topics/workflow.ex @@ -104,6 +104,7 @@ defmodule Coflux.Topics.Workflow do defer: build_defer_configuration(workflow.defer), delay: workflow.delay, retries: build_retries_configuration(workflow.retries), + recurrent: workflow.recurrent, requires: workflow.requires } end diff --git a/server/priv/migrations/orchestration/1.sql b/server/priv/migrations/orchestration/1.sql index 032ab40e..4318ab4d 100644 --- a/server/priv/migrations/orchestration/1.sql +++ b/server/priv/migrations/orchestration/1.sql @@ -58,18 +58,6 @@ CREATE TABLE workflows ( FOREIGN KEY (requires_tag_set_id) REFERENCES tag_sets ON DELETE RESTRICT ) STRICT; -CREATE TABLE sensors ( - id INTEGER PRIMARY KEY, - manifest_id INTEGER NOT NULL, - name TEXT NOT NULL, - parameter_set_id INTEGER NOT NULL, - instruction_id INTEGER, - requires_tag_set_id INTEGER, - UNIQUE (manifest_id, name), - FOREIGN KEY (manifest_id) REFERENCES manifests ON DELETE CASCADE, - FOREIGN KEY (parameter_set_id) REFERENCES parameter_sets ON DELETE RESTRICT -) STRICT; - CREATE TABLE spaces (id INTEGER PRIMARY KEY) STRICT; CREATE TABLE space_manifests ( @@ -337,24 +325,6 @@ CREATE TABLE asset_dependencies ( FOREIGN KEY (asset_id) REFERENCES assets ON DELETE RESTRICT ) STRICT; -CREATE TABLE checkpoints ( - id INTEGER PRIMARY KEY, - execution_id INTEGER NOT NULL, - sequence INTEGER NOT NULL, - created_at INTEGER NOT NULL, - UNIQUE (execution_id, sequence), - FOREIGN KEY (execution_id) REFERENCES executions ON DELETE CASCADE -) STRICT; - -CREATE TABLE checkpoint_arguments ( - checkpoint_id INTEGER NOT NULL, - position INTEGER NOT NULL, - value_id INTEGER NOT NULL, - PRIMARY KEY (checkpoint_id, position), - FOREIGN KEY (checkpoint_id) REFERENCES checkpoints ON DELETE CASCADE, - FOREIGN KEY (value_id) REFERENCES values_ ON DELETE RESTRICT -) STRICT; - CREATE TABLE heartbeats ( id INTEGER PRIMARY KEY, execution_id INTEGER NOT NULL, diff --git a/server/priv/migrations/orchestration/2.sql b/server/priv/migrations/orchestration/2.sql new file mode 100644 index 00000000..74312d9c --- /dev/null +++ b/server/priv/migrations/orchestration/2.sql @@ -0,0 +1,5 @@ +ALTER TABLE workflows ADD COLUMN recurrent INTEGER NOT NULL DEFAULT 0; + +ALTER TABLE steps ADD COLUMN recurrent INTEGER NOT NULL DEFAULT 0; + +ALTER TABLE steps ADD COLUMN delay INTEGER NOT NULL DEFAULT 0;