Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 94 additions & 94 deletions clients/python/coflux/__main__.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion clients/python/coflux/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def _default_serialisers():

class Config(pydantic.BaseModel):
project: str | None = None
space: str | None = None
workspace: str | None = None
concurrency: int = pydantic.Field(default_factory=_default_concurrency)
server: ServerConfig = pydantic.Field(default_factory=ServerConfig)
provides: dict[str, list[str] | str | bool] | None = None
Expand Down
9 changes: 8 additions & 1 deletion clients/python/coflux/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ def __init__(
blob_threshold: int,
blob_store_configs: list[config.BlobStoreConfig],
server_host: str,
secure: bool,
connection,
):
self._execution_id = execution_id
Expand All @@ -267,7 +268,7 @@ def __init__(
self._groups: list[str | None] = []
self._running = True
self._exit_stack = contextlib.ExitStack()
self._blob_manager = blobs.Manager(blob_store_configs, server_host)
self._blob_manager = blobs.Manager(blob_store_configs, server_host, secure=secure)
self._serialisation_manager = serialisation.Manager(
serialiser_configs, blob_threshold, self._blob_manager
)
Expand Down Expand Up @@ -566,6 +567,7 @@ def _execute(
blob_threshold: int,
blob_store_configs: list[config.BlobStoreConfig],
server_host: str,
secure: bool,
conn,
):
global _channel_context
Expand All @@ -576,6 +578,7 @@ def _execute(
blob_threshold,
blob_store_configs,
server_host,
secure,
conn,
) as channel:
threading.Thread(target=channel.run).start()
Expand Down Expand Up @@ -670,6 +673,7 @@ def __init__(
blob_threshold: int,
blob_store_configs: list[config.BlobStoreConfig],
server_host: str,
secure: bool,
server_connection: server.Connection,
loop: asyncio.AbstractEventLoop,
):
Expand All @@ -692,6 +696,7 @@ def __init__(
blob_threshold,
blob_store_configs,
server_host,
secure,
child_conn,
),
name=f"Execution-{execution_id}",
Expand Down Expand Up @@ -931,6 +936,7 @@ def execute(
target: str,
arguments: list[types.Value],
server_host: str,
secure: bool,
loop: asyncio.AbstractEventLoop,
) -> None:
if execution_id in self._executions:
Expand All @@ -944,6 +950,7 @@ def execute(
self._blob_threshold,
self._blob_store_configs,
server_host,
secure,
self._connection,
loop,
)
Expand Down
14 changes: 7 additions & 7 deletions clients/python/coflux/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class Worker:
def __init__(
self,
project_id: str,
space_name: str,
workspace_name: str,
server_host: str,
secure: bool,
serialiser_configs: list[config.SerialiserConfig],
Expand All @@ -55,7 +55,7 @@ def __init__(
targets: dict[str, dict[str, tuple[models.Target, t.Callable]]],
):
self._project_id = project_id
self._space_name = space_name
self._workspace_name = workspace_name
self._server_host = server_host
self._secure = secure
self._session_id = session_id
Expand All @@ -80,7 +80,7 @@ async def _handle_execute(self, *args) -> None:
arguments = [_parse_value(a) for a in arguments]
loop = asyncio.get_running_loop()
self._execution_manager.execute(
execution_id, module_name, target, arguments, self._server_host, loop
execution_id, module_name, target, arguments, self._server_host, self._secure, loop
)

async def _handle_abort(self, *args) -> None:
Expand All @@ -97,7 +97,7 @@ def _url(self, scheme: str, path: str, params: dict[str, str]) -> str:
def _params(self):
params = {
"project": self._project_id,
"space": self._space_name,
"workspace": self._workspace_name,
}
if API_VERSION:
params["version"] = API_VERSION
Expand All @@ -112,7 +112,7 @@ async def run(self) -> None:
"""Run the worker. Raises SessionExpiredError if session expires."""
check_server(self._server_host, self._secure)
while True:
print(f"Connecting ({self._server_host}, {self._project_id}, {self._space_name})...")
print(f"Connecting ({self._server_host}, {self._project_id}, {self._workspace_name})...")
scheme = "wss" if self._secure else "ws"
url = self._url(scheme, "worker", self._params())
try:
Expand Down Expand Up @@ -141,9 +141,9 @@ async def run(self) -> None:
raise Exception("Project not found")
elif reason == "session_invalid":
raise SessionExpiredError("Session invalid")
elif reason == "space_mismatch":
elif reason == "workspace_mismatch":
raise Exception(
f"Space mismatch: session does not belong to space '{self._space_name}'"
f"Workspace mismatch: session does not belong to workspace '{self._workspace_name}'"
)
else:
delay = 1 + 3 * random.random() # TODO: exponential backoff
Expand Down
6 changes: 3 additions & 3 deletions docs/docs/caching.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ def my_task():
# ...
```

## Space inheritance
## Workspace inheritance

The inheritance of spaces affects caching - refer to [the explanation on the concepts page](/concepts#space-inheritance).
The inheritance of workspaces affects caching - refer to [the explanation on the concepts page](/concepts#workspace-inheritance).

## Forcing execution

Expand All @@ -146,7 +146,7 @@ If you need to re-evaluate a task that's cached, you can do so by 're-running' t

To summarise, the requirements for a cache hit (i.e., for a previous result to be reused, instead of executing a step) are that:

1. The result must be in the same space, or an ancestral space, within the same project.
1. The result must be in the same workspace, or an ancestral workspace, within the same project.
2. The result must also have had caching enabled.
3. The cache key and namespace (as described above) must match.
4. The result must not have failed (i.e., either it was successful, it's scheduled, or it's in progress).
Expand Down
20 changes: 10 additions & 10 deletions docs/docs/concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,24 @@ You should use a separate project when:
2. Throughput is
3. There's a logical separation of concerns.

## Spaces
## Workspaces

A individual project can contain multiple spaces (workspaces). All spaces within a projects are controlled by the same orchestration process, and some level of separation is provided between spaces, but space inheritance allows controlled data sharing. Spaces might be mapped to deployment environments (e.g., production, staging, development), or separated further - for example a space per customer in a production environment, or a space per developer in a development environment. Or even more granular separation is possible - for example using temporary spaces which correspond with a Git branch, to work on fixing a bug or building a new feature.
A individual project can contain multiple workspaces. All workspaces within a project are controlled by the same orchestration process, and some level of separation is provided between workspaces, but workspace inheritance allows controlled data sharing. Workspaces might be mapped to deployment environments (e.g., production, staging, development), or separated further - for example a workspace per customer in a production environment, or a workspace per developer in a development environment. Or even more granular separation is possible - for example using temporary workspaces which correspond with a Git branch, to work on fixing a bug or building a new feature.

### Space inheritance
### Workspace inheritance

By default there is isolation between spaces within a project - for example, workflows, runs, results are separated. But spaces can be arranged into a hierarchy. This allows:
By default there is isolation between workspaces within a project - for example, workflows, runs, results are separated. But workspaces can be arranged into a hierarchy. This allows:

1. Cached (or memoised) results to be inherited from parent spaces.
2. Steps to be _re-run_ in a 'child' spaces.
1. Cached (or memoised) results to be inherited from parent workspaces.
2. Steps to be _re-run_ in a 'child' workspaces.

For example, a `development` space can inherit from a `production` space, allowing you to re-run whole workflows, or specific steps within a workflow, in a development space, experimenting with changes to the code without having to re-run the whole workflow from scratch. When working with a team on a shared project, you might choose to set up separate space for each engineer, or even create spaces temporarily to work on specific features.
For example, a `development` workspace can inherit from a `production` workspace, allowing you to re-run whole workflows, or specific steps within a workflow, in a development workspace, experimenting with changes to the code without having to re-run the whole workflow from scratch. When working with a team on a shared project, you might choose to set up separate workspace for each engineer, or even create workspaces temporarily to work on specific features.

This makes it easier to diagnose issues that arise in a production space by retrying individual steps locally, and trying out code changes safely.
This makes it easier to diagnose issues that arise in a production workspace by retrying individual steps locally, and trying out code changes safely.

## Workers

An _worker_ is a process that hosts _modules_ (collections workflows/tasks). An worker connects to the server and is associated with a specific project and space. The worker waits for commands from the server telling it to execute specific tasks, and the worker monitors and reports progress of these executions back to the server.
An _worker_ is a process that hosts _modules_ (collections workflows/tasks). An worker connects to the server and is associated with a specific project and workspace. The worker waits for commands from the server telling it to execute specific tasks, and the worker monitors and reports progress of these executions back to the server.

This model of having workers connect to the server provides flexibility over where and how workers are run. During development a worker can run locally on a laptop, restarting automatically as code changes are made. Or multiple workers can run in the cloud, or on dedicated machines - or a combination. An worker can be started with specific environment variables associated with the deployment environment (e.g., production access keys).

Expand All @@ -39,7 +39,7 @@ A _workflow_ is defined in a module, in code. Additionally, _tasks_ can be defin

Workflows and tasks are collectively referred to as _targets_, although workflows are really just special forms of tasks, from which runs can be started. You can think of the distinction between workflows and tasks a bit like the distinction between public and private functions in a module.

Workflows need to be registered with a project and space so that they appear in the UI. This can be done explicitly (e.g., for a production space as part of a build process), or automatically by a worker when it starts/restarts (using the `--register` or `--dev` flag).
Workflows need to be registered with a project and workspace so that they appear in the UI. This can be done explicitly (e.g., for a production workspace as part of a build process), or automatically by a worker when it starts/restarts (using the `--register` or `--dev` flag).

## Runs

Expand Down
6 changes: 3 additions & 3 deletions docs/docs/getting_started/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ Open up the web UI at http://localhost:7777.

## Setting up a project

Before we can connect a worker, we need to create a Coflux project and a space.
Before we can connect a worker, we need to create a Coflux project and a workspace.

In the web UI, click 'New project...', enter a project name, and click 'Create'.

Now that you have an empty project, you'll be prompted to add a space. Enter a name (or use the suggested one), and click 'Create'.
Now that you have an empty project, you'll be prompted to add a workspace. Enter a name (or use the suggested one), and click 'Create'.

Take note of the project ID and space name in the instructions.
Take note of the project ID and workspace name in the instructions.

Next, we can define a workflow...
2 changes: 1 addition & 1 deletion docs/docs/getting_started/workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Use the `configure` command to populate a configuration file. A configuration fi
coflux configure
```

You will be prompted to enter the host (`localhost:7777`), the project ID, and the space name.
You will be prompted to enter the host (`localhost:7777`), the project ID, and the workspace name.

## Run

Expand Down
4 changes: 2 additions & 2 deletions docs/docs/memoising.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Memoised steps are indicated in the web UI with a pin icon.

As with caching, explicitly clicking the 're-run' button for a step will force the step to be re-run, even if it's memoised. Then subsequent memoising will use the new step execution.

If a step is manually re-run in a child space, the memoised results will be used (but memoised results from the child space aren't available to the parent). This follows the same rules as caching.
If a step is manually re-run in a child workspace, the memoised results will be used (but memoised results from the child workspace aren't available to the parent). This follows the same rules as caching.

## For debugging

Expand All @@ -26,7 +26,7 @@ Memoising provides several benefits for debugging:

2. Memoising slow tasks allows you to fix bugs that are occurring elsewhere in the workflow.

This is particularly useful when re-running a workflow from a production space in a development space (assuming the production space is configured as an ancestor of the development space). By liberally memo-ising tasks, specific steps can be re-run in the development space without re-running downstream steps.
This is particularly useful when re-running a workflow from a production workspace in a development workspace (assuming the production workspace is configured as an ancestor of the development workspace). By liberally memo-ising tasks, specific steps can be re-run in the development workspace without re-running downstream steps.

## For optimisation

Expand Down
2 changes: 1 addition & 1 deletion examples/pandas_etl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ uv sync
Configure Coflux:

```bash
uv run configure --project=... --space=...
uv run configure --project=... --workspace=...
```

Run worker in development mode:
Expand Down
2 changes: 1 addition & 1 deletion examples/slack_bot/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ uv sync
Configure Coflux:

```bash
uv run configure --project=... --space=...
uv run configure --project=... --workspace=...
```

Run worker in development mode:
Expand Down
2 changes: 1 addition & 1 deletion examples/wikipedia/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ uv sync
Configure Coflux:

```bash
uv run configure --project=... --space=...
uv run configure --project=... --workspace=...
```

Run worker in development mode:
Expand Down
2 changes: 1 addition & 1 deletion server/lib/coflux/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ defmodule Coflux.Application do
[
Topics.Sessions,
Topics.Projects,
Topics.Spaces,
Topics.Workspaces,
Topics.Modules,
Topics.Run,
Topics.Workflow,
Expand Down
Loading