diff --git a/clients/python/coflux/__main__.py b/clients/python/coflux/__main__.py index 1746b15d..ee80eb51 100644 --- a/clients/python/coflux/__main__.py +++ b/clients/python/coflux/__main__.py @@ -94,14 +94,14 @@ def _api_request( def _create_session( host: str, project_id: str, - space_name: str, + workspace_name: str, provides: dict[str, list[str]] | None = None, concurrency: int | None = None, token: str | None = None, *, secure: bool, ) -> str: - payload: dict[str, t.Any] = {"projectId": project_id, "spaceName": space_name} + payload: dict[str, t.Any] = {"projectId": project_id, "workspaceName": workspace_name} if provides: payload["provides"] = provides if concurrency: @@ -158,7 +158,7 @@ def _load_modules( def _register_manifests( project_id: str, - space_name: str, + workspace_name: str, host: str, targets: dict[str, dict[str, tuple[models.Target, t.Callable]]], token: str | None = None, @@ -219,14 +219,14 @@ def _register_manifests( secure=secure, json={ "projectId": project_id, - "spaceName": space_name, + "workspaceName": workspace_name, "manifests": manifests, }, ) def _get_pool( - host: str, project_id: str, space_name: str, pool_name: str, token: str | None, *, secure: bool + host: str, project_id: str, workspace_name: str, pool_name: str, token: str | None, *, secure: bool ) -> dict | None: try: return _api_request( @@ -237,7 +237,7 @@ def _get_pool( secure=secure, params={ "project": project_id, - "space": space_name, + "workspace": workspace_name, "pool": pool_name, }, ) @@ -273,7 +273,7 @@ def _print_table( def _init( *modules: types.ModuleType | str, project: str, - space: str, + workspace: str, host: str, token: str | None, secure: bool, @@ -288,7 +288,7 @@ def _init( try: targets = _load_modules(list(modules)) if register: - _register_manifests(project, space, host, targets, token=token, secure=secure) + _register_manifests(project, workspace, host, targets, token=token, secure=secure) # Track whether we created the session (vs it being provided externally) session_provided = session_id is not None @@ -303,14 +303,14 @@ def _init( time.sleep(delay) print("Creating session...") session_id = _create_session( - host, project, space, provides, concurrency, token=token, secure=secure + host, project, workspace, provides, concurrency, token=token, secure=secure ) print("Session created.") try: with Worker( project, - space, + workspace, host, secure, serialiser_configs, @@ -469,8 +469,8 @@ def decorator(f): return decorator -def _project_options(space: bool = False): - """Add project option, and optionally space.""" +def _project_options(workspace: bool = False): + """Add project option, and optionally workspace.""" def decorator(f): decorators = [ click.option( @@ -483,14 +483,14 @@ def decorator(f): required=True, ), ] - if space: + if workspace: decorators.append( click.option( - "-s", - "--space", - help="Space name", - envvar="COFLUX_SPACE", - default=_load_config().space, + "-w", + "--workspace", + help="Workspace name", + envvar="COFLUX_WORKSPACE", + default=_load_config().workspace, show_default=True, required=True, ) @@ -511,11 +511,11 @@ def decorator(f): prompt=True, ) @click.option( - "space", - "-s", - "--space", - help="Space name", - default=_load_config().space, + "workspace", + "-w", + "--workspace", + help="Workspace name", + default=_load_config().workspace, show_default=True, prompt=True, ) @@ -530,7 +530,7 @@ def decorator(f): def configure( host: str | None, project: str | None, - space: str | None, + workspace: str | None, ): """ Populate/update the configuration file. @@ -541,7 +541,7 @@ def configure( path = _config_path() data = _read_config(path) data["project"] = project - data["space"] = space + data["workspace"] = workspace data.setdefault("server", {})["host"] = host _write_config(path, data) @@ -551,50 +551,50 @@ def configure( @cli.group() -def spaces(): +def workspaces(): """ - Manage spaces. + Manage workspaces. """ pass -@spaces.command("list") +@workspaces.command("list") @_project_options() @_server_options() -def spaces_list( +def workspaces_list( project: str, host: str, token: str | None, secure: bool | None, ): """ - Lists spaces. + Lists workspaces. """ use_secure = _should_use_secure(host, secure) - spaces = _api_request("GET", host, "get_spaces", token, secure=use_secure, params={"project": project}) - if spaces: + workspaces = _api_request("GET", host, "get_workspaces", token, secure=use_secure, params={"project": project}) + if workspaces: # TODO: draw as tree _print_table( ("Name", "Base"), [ ( - space["name"], - spaces[space["baseId"]]["name"] if space["baseId"] else "(None)", + workspace["name"], + workspaces[workspace["baseId"]]["name"] if workspace["baseId"] else "(None)", ) - for space in spaces.values() + for workspace in workspaces.values() ], ) -@spaces.command("create") +@workspaces.command("create") @_project_options() @_server_options() @click.option( "--base", - help="The base space to inherit from", + help="The base workspace to inherit from", ) @click.argument("name") -def spaces_create( +def workspaces_create( project: str, host: str, token: str | None, @@ -603,14 +603,14 @@ def spaces_create( name: str, ): """ - Creates a space within the project. + Creates a workspace within the project. """ use_secure = _should_use_secure(host, secure) base_id = None if base: - spaces = _api_request("GET", host, "get_spaces", token, secure=use_secure, params={"project": project}) - space_ids_by_name = {w["name"]: id for id, w in spaces.items()} - base_id = space_ids_by_name.get(base) + workspaces = _api_request("GET", host, "get_workspaces", token, secure=use_secure, params={"project": project}) + workspace_ids_by_name = {w["name"]: id for id, w in workspaces.items()} + base_id = workspace_ids_by_name.get(base) if not base_id: click.BadOptionUsage("base", "Not recognised") @@ -618,7 +618,7 @@ def spaces_create( _api_request( "POST", host, - "create_space", + "create_workspace", token, secure=use_secure, json={ @@ -627,28 +627,28 @@ def spaces_create( "baseId": base_id, }, ) - click.secho(f"Created space '{name}'.", fg="green") + click.secho(f"Created workspace '{name}'.", fg="green") -@spaces.command("update") -@_project_options(space=True) +@workspaces.command("update") +@_project_options(workspace=True) @_server_options() @click.option( "--name", - help="The new name of the space", + help="The new name of the workspace", ) @click.option( "--base", - help="The new base space to inherit from", + help="The new base workspace to inherit from", ) @click.option( "--no-base", is_flag=True, - help="Unset the base space", + help="Unset the base workspace", ) -def spaces_update( +def workspaces_update( project: str, - space: str, + workspace: str, host: str, token: str | None, secure: bool | None, @@ -657,24 +657,24 @@ def spaces_update( no_base: bool, ): """ - Updates a space within the project. + Updates a workspace within the project. """ use_secure = _should_use_secure(host, secure) - spaces = _api_request("GET", host, "get_spaces", token, secure=use_secure, params={"project": project}) - space_ids_by_name = {w["name"]: id for id, w in spaces.items()} - space_id = space_ids_by_name.get(space) - if not space_id: - raise click.BadOptionUsage("space", "Not recognised") + workspaces = _api_request("GET", host, "get_workspaces", token, secure=use_secure, params={"project": project}) + workspace_ids_by_name = {w["name"]: id for id, w in workspaces.items()} + workspace_id = workspace_ids_by_name.get(workspace) + if not workspace_id: + raise click.BadOptionUsage("workspace", "Not recognised") base_id = None if base: - base_id = space_ids_by_name.get(base) + base_id = workspace_ids_by_name.get(base) if not base_id: raise click.BadOptionUsage("base", "Not recognised") payload = { "projectId": project, - "spaceId": space_id, + "workspaceId": workspace_id, } if name is not None: payload["name"] = name @@ -685,43 +685,43 @@ def spaces_update( payload["baseId"] = None # TODO: handle response - _api_request("POST", host, "update_space", token, secure=use_secure, json=payload) + _api_request("POST", host, "update_workspace", token, secure=use_secure, json=payload) - click.secho(f"Updated space '{name or space}'.", fg="green") + click.secho(f"Updated workspace '{name or workspace}'.", fg="green") -@spaces.command("archive") -@_project_options(space=True) +@workspaces.command("archive") +@_project_options(workspace=True) @_server_options() -def spaces_archive( +def workspaces_archive( project: str, - space: str, + workspace: str, host: str, token: str | None, secure: bool | None, ): """ - Archives a space. + Archives a workspace. """ use_secure = _should_use_secure(host, secure) - spaces = _api_request("GET", host, "get_spaces", token, secure=use_secure, params={"project": project}) - space_ids_by_name = {w["name"]: id for id, w in spaces.items()} - space_id = space_ids_by_name.get(space) - if not space_id: - raise click.BadOptionUsage("space", "Not recognised") + workspaces = _api_request("GET", host, "get_workspaces", token, secure=use_secure, params={"project": project}) + workspace_ids_by_name = {w["name"]: id for id, w in workspaces.items()} + workspace_id = workspace_ids_by_name.get(workspace) + if not workspace_id: + raise click.BadOptionUsage("workspace", "Not recognised") _api_request( "POST", host, - "archive_space", + "archive_workspace", token, secure=use_secure, json={ "projectId": project, - "spaceId": space_id, + "workspaceId": workspace_id, }, ) - click.secho(f"Archived space '{space}'.", fg="green") + click.secho(f"Archived workspace '{workspace}'.", fg="green") @cli.group() @@ -733,9 +733,9 @@ def pools(): @pools.command("list") -@_project_options(space=True) +@_project_options(workspace=True) @_server_options() -def pools_list(project: str, space: str, host: str, token: str | None, secure: bool | None): +def pools_list(project: str, workspace: str, host: str, token: str | None, secure: bool | None): """ Lists pools. """ @@ -746,7 +746,7 @@ def pools_list(project: str, space: str, host: str, token: str | None, secure: b "get_pools", token, secure=use_secure, - json={"projectId": project, "spaceName": space}, + json={"projectId": project, "workspaceName": workspace}, ) if pools: _print_table( @@ -764,7 +764,7 @@ def pools_list(project: str, space: str, host: str, token: str | None, secure: b @pools.command("update") -@_project_options(space=True) +@_project_options(workspace=True) @_server_options() @click.option( "modules", @@ -789,7 +789,7 @@ def pools_list(project: str, space: str, host: str, token: str | None, secure: b @click.argument("name") def pools_update( project: str, - space: str, + workspace: str, host: str, token: str | None, secure: bool | None, @@ -803,7 +803,7 @@ def pools_update( Updates a pool. """ use_secure = _should_use_secure(host, secure) - pool = _get_pool(host, project, space, name, token, secure=use_secure) or {} + pool = _get_pool(host, project, workspace, name, token, secure=use_secure) or {} # TODO: support explicitly unsetting 'provides' (and modules, etc?) @@ -827,7 +827,7 @@ def pools_update( secure=use_secure, json={ "projectId": project, - "spaceName": space, + "workspaceName": workspace, "poolName": name, "pool": pool, }, @@ -835,10 +835,10 @@ def pools_update( @pools.command("delete") -@_project_options(space=True) +@_project_options(workspace=True) @_server_options() @click.argument("name") -def pools_delete(project: str, space: str, host: str, token: str | None, secure: bool | None, name: str): +def pools_delete(project: str, workspace: str, host: str, token: str | None, secure: bool | None, name: str): """ Deletes a pool. """ @@ -849,7 +849,7 @@ def pools_delete(project: str, space: str, host: str, token: str | None, secure: "update_pool", token, secure=use_secure, - json={"projectId": project, "spaceName": space, "poolName": name, "pool": None}, + json={"projectId": project, "workspaceName": workspace, "poolName": name, "pool": None}, ) @@ -1032,12 +1032,12 @@ def assets_download( @cli.command("register") -@_project_options(space=True) +@_project_options(workspace=True) @_server_options() @click.argument("module_name", nargs=-1) def register( project: str, - space: str, + workspace: str, host: str, token: str | None, secure: bool | None, @@ -1054,12 +1054,12 @@ def register( raise click.ClickException("No module(s) specified.") use_secure = _should_use_secure(host, secure) targets = _load_modules(list(module_name)) - _register_manifests(project, space, host, targets, token=token, secure=use_secure) + _register_manifests(project, workspace, host, targets, token=token, secure=use_secure) click.secho("Manifest(s) registered.", fg="green") @cli.command("worker") -@_project_options(space=True) +@_project_options(workspace=True) @_server_options() @click.option( "--provides", @@ -1102,7 +1102,7 @@ def register( @click.argument("module_name", nargs=-1) def worker( project: str, - space: str, + workspace: str, host: str, token: str | None, secure: bool | None, @@ -1129,7 +1129,7 @@ def worker( args = (*module_name,) kwargs = { "project": project, - "space": space, + "workspace": workspace, "host": host, "token": token, "secure": use_secure, @@ -1156,14 +1156,14 @@ def worker( @cli.command("submit") -@_project_options(space=True) +@_project_options(workspace=True) @_server_options() @click.argument("module") @click.argument("target") @click.argument("argument", nargs=-1) def submit( project: str, - space: str, + workspace: str, host: str, token: str | None, secure: bool | None, @@ -1184,7 +1184,7 @@ def submit( secure=use_secure, params={ "project": project, - "space": space, + "workspace": workspace, "module": module, "target": target, }, @@ -1201,7 +1201,7 @@ def submit( secure=use_secure, json={ "projectId": project, - "spaceName": space, + "workspaceName": workspace, "module": module, "target": target, "arguments": [["json", a] for a in argument], diff --git a/clients/python/coflux/config.py b/clients/python/coflux/config.py index 4d2605ab..8d6d475c 100644 --- a/clients/python/coflux/config.py +++ b/clients/python/coflux/config.py @@ -70,7 +70,7 @@ def _default_serialisers(): class Config(pydantic.BaseModel): project: str | None = None - space: str | None = None + workspace: str | None = None concurrency: int = pydantic.Field(default_factory=_default_concurrency) server: ServerConfig = pydantic.Field(default_factory=ServerConfig) provides: dict[str, list[str] | str | bool] | None = None diff --git a/clients/python/coflux/execution.py b/clients/python/coflux/execution.py index 45e39d3e..66e7e250 100644 --- a/clients/python/coflux/execution.py +++ b/clients/python/coflux/execution.py @@ -258,6 +258,7 @@ def __init__( blob_threshold: int, blob_store_configs: list[config.BlobStoreConfig], server_host: str, + secure: bool, connection, ): self._execution_id = execution_id @@ -267,7 +268,7 @@ def __init__( self._groups: list[str | None] = [] self._running = True self._exit_stack = contextlib.ExitStack() - self._blob_manager = blobs.Manager(blob_store_configs, server_host) + self._blob_manager = blobs.Manager(blob_store_configs, server_host, secure=secure) self._serialisation_manager = serialisation.Manager( serialiser_configs, blob_threshold, self._blob_manager ) @@ -566,6 +567,7 @@ def _execute( blob_threshold: int, blob_store_configs: list[config.BlobStoreConfig], server_host: str, + secure: bool, conn, ): global _channel_context @@ -576,6 +578,7 @@ def _execute( blob_threshold, blob_store_configs, server_host, + secure, conn, ) as channel: threading.Thread(target=channel.run).start() @@ -670,6 +673,7 @@ def __init__( blob_threshold: int, blob_store_configs: list[config.BlobStoreConfig], server_host: str, + secure: bool, server_connection: server.Connection, loop: asyncio.AbstractEventLoop, ): @@ -692,6 +696,7 @@ def __init__( blob_threshold, blob_store_configs, server_host, + secure, child_conn, ), name=f"Execution-{execution_id}", @@ -931,6 +936,7 @@ def execute( target: str, arguments: list[types.Value], server_host: str, + secure: bool, loop: asyncio.AbstractEventLoop, ) -> None: if execution_id in self._executions: @@ -944,6 +950,7 @@ def execute( self._blob_threshold, self._blob_store_configs, server_host, + secure, self._connection, loop, ) diff --git a/clients/python/coflux/worker.py b/clients/python/coflux/worker.py index 99b8d68b..226f1a4b 100644 --- a/clients/python/coflux/worker.py +++ b/clients/python/coflux/worker.py @@ -45,7 +45,7 @@ class Worker: def __init__( self, project_id: str, - space_name: str, + workspace_name: str, server_host: str, secure: bool, serialiser_configs: list[config.SerialiserConfig], @@ -55,7 +55,7 @@ def __init__( targets: dict[str, dict[str, tuple[models.Target, t.Callable]]], ): self._project_id = project_id - self._space_name = space_name + self._workspace_name = workspace_name self._server_host = server_host self._secure = secure self._session_id = session_id @@ -80,7 +80,7 @@ async def _handle_execute(self, *args) -> None: arguments = [_parse_value(a) for a in arguments] loop = asyncio.get_running_loop() self._execution_manager.execute( - execution_id, module_name, target, arguments, self._server_host, loop + execution_id, module_name, target, arguments, self._server_host, self._secure, loop ) async def _handle_abort(self, *args) -> None: @@ -97,7 +97,7 @@ def _url(self, scheme: str, path: str, params: dict[str, str]) -> str: def _params(self): params = { "project": self._project_id, - "space": self._space_name, + "workspace": self._workspace_name, } if API_VERSION: params["version"] = API_VERSION @@ -112,7 +112,7 @@ async def run(self) -> None: """Run the worker. Raises SessionExpiredError if session expires.""" check_server(self._server_host, self._secure) while True: - print(f"Connecting ({self._server_host}, {self._project_id}, {self._space_name})...") + print(f"Connecting ({self._server_host}, {self._project_id}, {self._workspace_name})...") scheme = "wss" if self._secure else "ws" url = self._url(scheme, "worker", self._params()) try: @@ -141,9 +141,9 @@ async def run(self) -> None: raise Exception("Project not found") elif reason == "session_invalid": raise SessionExpiredError("Session invalid") - elif reason == "space_mismatch": + elif reason == "workspace_mismatch": raise Exception( - f"Space mismatch: session does not belong to space '{self._space_name}'" + f"Workspace mismatch: session does not belong to workspace '{self._workspace_name}'" ) else: delay = 1 + 3 * random.random() # TODO: exponential backoff diff --git a/docs/docs/caching.md b/docs/docs/caching.md index 9ffa458d..d6c20be0 100644 --- a/docs/docs/caching.md +++ b/docs/docs/caching.md @@ -134,9 +134,9 @@ def my_task(): # ... ``` -## Space inheritance +## Workspace inheritance -The inheritance of spaces affects caching - refer to [the explanation on the concepts page](/concepts#space-inheritance). +The inheritance of workspaces affects caching - refer to [the explanation on the concepts page](/concepts#workspace-inheritance). ## Forcing execution @@ -146,7 +146,7 @@ If you need to re-evaluate a task that's cached, you can do so by 're-running' t To summarise, the requirements for a cache hit (i.e., for a previous result to be reused, instead of executing a step) are that: -1. The result must be in the same space, or an ancestral space, within the same project. +1. The result must be in the same workspace, or an ancestral workspace, within the same project. 2. The result must also have had caching enabled. 3. The cache key and namespace (as described above) must match. 4. The result must not have failed (i.e., either it was successful, it's scheduled, or it's in progress). diff --git a/docs/docs/concepts.md b/docs/docs/concepts.md index ab924a92..bdf4a414 100644 --- a/docs/docs/concepts.md +++ b/docs/docs/concepts.md @@ -12,24 +12,24 @@ You should use a separate project when: 2. Throughput is 3. There's a logical separation of concerns. -## Spaces +## Workspaces -A individual project can contain multiple spaces (workspaces). All spaces within a projects are controlled by the same orchestration process, and some level of separation is provided between spaces, but space inheritance allows controlled data sharing. Spaces might be mapped to deployment environments (e.g., production, staging, development), or separated further - for example a space per customer in a production environment, or a space per developer in a development environment. Or even more granular separation is possible - for example using temporary spaces which correspond with a Git branch, to work on fixing a bug or building a new feature. +A individual project can contain multiple workspaces. All workspaces within a project are controlled by the same orchestration process, and some level of separation is provided between workspaces, but workspace inheritance allows controlled data sharing. Workspaces might be mapped to deployment environments (e.g., production, staging, development), or separated further - for example a workspace per customer in a production environment, or a workspace per developer in a development environment. Or even more granular separation is possible - for example using temporary workspaces which correspond with a Git branch, to work on fixing a bug or building a new feature. -### Space inheritance +### Workspace inheritance -By default there is isolation between spaces within a project - for example, workflows, runs, results are separated. But spaces can be arranged into a hierarchy. This allows: +By default there is isolation between workspaces within a project - for example, workflows, runs, results are separated. But workspaces can be arranged into a hierarchy. This allows: -1. Cached (or memoised) results to be inherited from parent spaces. -2. Steps to be _re-run_ in a 'child' spaces. +1. Cached (or memoised) results to be inherited from parent workspaces. +2. Steps to be _re-run_ in a 'child' workspaces. -For example, a `development` space can inherit from a `production` space, allowing you to re-run whole workflows, or specific steps within a workflow, in a development space, experimenting with changes to the code without having to re-run the whole workflow from scratch. When working with a team on a shared project, you might choose to set up separate space for each engineer, or even create spaces temporarily to work on specific features. +For example, a `development` workspace can inherit from a `production` workspace, allowing you to re-run whole workflows, or specific steps within a workflow, in a development workspace, experimenting with changes to the code without having to re-run the whole workflow from scratch. When working with a team on a shared project, you might choose to set up separate workspace for each engineer, or even create workspaces temporarily to work on specific features. -This makes it easier to diagnose issues that arise in a production space by retrying individual steps locally, and trying out code changes safely. +This makes it easier to diagnose issues that arise in a production workspace by retrying individual steps locally, and trying out code changes safely. ## Workers -An _worker_ is a process that hosts _modules_ (collections workflows/tasks). An worker connects to the server and is associated with a specific project and space. The worker waits for commands from the server telling it to execute specific tasks, and the worker monitors and reports progress of these executions back to the server. +An _worker_ is a process that hosts _modules_ (collections workflows/tasks). An worker connects to the server and is associated with a specific project and workspace. The worker waits for commands from the server telling it to execute specific tasks, and the worker monitors and reports progress of these executions back to the server. This model of having workers connect to the server provides flexibility over where and how workers are run. During development a worker can run locally on a laptop, restarting automatically as code changes are made. Or multiple workers can run in the cloud, or on dedicated machines - or a combination. An worker can be started with specific environment variables associated with the deployment environment (e.g., production access keys). @@ -39,7 +39,7 @@ A _workflow_ is defined in a module, in code. Additionally, _tasks_ can be defin Workflows and tasks are collectively referred to as _targets_, although workflows are really just special forms of tasks, from which runs can be started. You can think of the distinction between workflows and tasks a bit like the distinction between public and private functions in a module. -Workflows need to be registered with a project and space so that they appear in the UI. This can be done explicitly (e.g., for a production space as part of a build process), or automatically by a worker when it starts/restarts (using the `--register` or `--dev` flag). +Workflows need to be registered with a project and workspace so that they appear in the UI. This can be done explicitly (e.g., for a production workspace as part of a build process), or automatically by a worker when it starts/restarts (using the `--register` or `--dev` flag). ## Runs diff --git a/docs/docs/getting_started/server.md b/docs/docs/getting_started/server.md index 9deea46c..6b45c59d 100644 --- a/docs/docs/getting_started/server.md +++ b/docs/docs/getting_started/server.md @@ -24,12 +24,12 @@ Open up the web UI at http://localhost:7777. ## Setting up a project -Before we can connect a worker, we need to create a Coflux project and a space. +Before we can connect a worker, we need to create a Coflux project and a workspace. In the web UI, click 'New project...', enter a project name, and click 'Create'. -Now that you have an empty project, you'll be prompted to add a space. Enter a name (or use the suggested one), and click 'Create'. +Now that you have an empty project, you'll be prompted to add a workspace. Enter a name (or use the suggested one), and click 'Create'. -Take note of the project ID and space name in the instructions. +Take note of the project ID and workspace name in the instructions. Next, we can define a workflow... diff --git a/docs/docs/getting_started/workers.md b/docs/docs/getting_started/workers.md index 754e3403..953e809c 100644 --- a/docs/docs/getting_started/workers.md +++ b/docs/docs/getting_started/workers.md @@ -18,7 +18,7 @@ Use the `configure` command to populate a configuration file. A configuration fi coflux configure ``` -You will be prompted to enter the host (`localhost:7777`), the project ID, and the space name. +You will be prompted to enter the host (`localhost:7777`), the project ID, and the workspace name. ## Run diff --git a/docs/docs/memoising.md b/docs/docs/memoising.md index b345b1b0..e88fa4e8 100644 --- a/docs/docs/memoising.md +++ b/docs/docs/memoising.md @@ -16,7 +16,7 @@ Memoised steps are indicated in the web UI with a pin icon. As with caching, explicitly clicking the 're-run' button for a step will force the step to be re-run, even if it's memoised. Then subsequent memoising will use the new step execution. -If a step is manually re-run in a child space, the memoised results will be used (but memoised results from the child space aren't available to the parent). This follows the same rules as caching. +If a step is manually re-run in a child workspace, the memoised results will be used (but memoised results from the child workspace aren't available to the parent). This follows the same rules as caching. ## For debugging @@ -26,7 +26,7 @@ Memoising provides several benefits for debugging: 2. Memoising slow tasks allows you to fix bugs that are occurring elsewhere in the workflow. -This is particularly useful when re-running a workflow from a production space in a development space (assuming the production space is configured as an ancestor of the development space). By liberally memo-ising tasks, specific steps can be re-run in the development space without re-running downstream steps. +This is particularly useful when re-running a workflow from a production workspace in a development workspace (assuming the production workspace is configured as an ancestor of the development workspace). By liberally memo-ising tasks, specific steps can be re-run in the development workspace without re-running downstream steps. ## For optimisation diff --git a/examples/pandas_etl/README.md b/examples/pandas_etl/README.md index c3cc17fc..f1effbd0 100644 --- a/examples/pandas_etl/README.md +++ b/examples/pandas_etl/README.md @@ -23,7 +23,7 @@ uv sync Configure Coflux: ```bash -uv run configure --project=... --space=... +uv run configure --project=... --workspace=... ``` Run worker in development mode: diff --git a/examples/slack_bot/README.md b/examples/slack_bot/README.md index a1a7bd4e..8020cc5b 100644 --- a/examples/slack_bot/README.md +++ b/examples/slack_bot/README.md @@ -19,7 +19,7 @@ uv sync Configure Coflux: ```bash -uv run configure --project=... --space=... +uv run configure --project=... --workspace=... ``` Run worker in development mode: diff --git a/examples/wikipedia/README.md b/examples/wikipedia/README.md index 37b71906..b7f90e18 100644 --- a/examples/wikipedia/README.md +++ b/examples/wikipedia/README.md @@ -19,7 +19,7 @@ uv sync Configure Coflux: ```bash -uv run configure --project=... --space=... +uv run configure --project=... --workspace=... ``` Run worker in development mode: diff --git a/server/lib/coflux/application.ex b/server/lib/coflux/application.ex index 40c07082..430147e0 100644 --- a/server/lib/coflux/application.ex +++ b/server/lib/coflux/application.ex @@ -34,7 +34,7 @@ defmodule Coflux.Application do [ Topics.Sessions, Topics.Projects, - Topics.Spaces, + Topics.Workspaces, Topics.Modules, Topics.Run, Topics.Workflow, diff --git a/server/lib/coflux/handlers/api.ex b/server/lib/coflux/handlers/api.ex index b8dc6590..d747b26c 100644 --- a/server/lib/coflux/handlers/api.ex +++ b/server/lib/coflux/handlers/api.ex @@ -80,23 +80,23 @@ defmodule Coflux.Handlers.Api do end end - defp handle(req, "GET", ["get_spaces"], namespace) do + defp handle(req, "GET", ["get_workspaces"], namespace) do qs = :cowboy_req.parse_qs(req) project_id = get_query_param(qs, "project") with_project_access(req, project_id, namespace, fn -> - case Orchestration.get_spaces(project_id) do - {:ok, spaces} -> + case Orchestration.get_workspaces(project_id) do + {:ok, workspaces} -> json_response( req, - Map.new(spaces, fn {space_id, space} -> + Map.new(workspaces, fn {workspace_id, workspace} -> base_id = - if space.base_id, - do: Integer.to_string(space.base_id) + if workspace.base_id, + do: Integer.to_string(workspace.base_id) - {space_id, + {workspace_id, %{ - "name" => space.name, + "name" => workspace.name, "baseId" => base_id }} end) @@ -105,7 +105,7 @@ defmodule Coflux.Handlers.Api do end) end - defp handle(req, "POST", ["create_space"], namespace) do + defp handle(req, "POST", ["create_workspace"], namespace) do {:ok, arguments, errors, req} = read_arguments( req, @@ -120,13 +120,13 @@ defmodule Coflux.Handlers.Api do if Enum.empty?(errors) do with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.create_space( + case Orchestration.create_workspace( arguments.project_id, arguments.name, arguments[:base_id] ) do - {:ok, space_id} -> - json_response(req, %{id: space_id}) + {:ok, workspace_id} -> + json_response(req, %{id: workspace_id}) {:error, errors} -> errors = @@ -143,13 +143,13 @@ defmodule Coflux.Handlers.Api do end end - defp handle(req, "POST", ["update_space"], namespace) do + defp handle(req, "POST", ["update_workspace"], namespace) do {:ok, arguments, errors, req} = read_arguments( req, %{ project_id: "projectId", - space_id: {"spaceId", &parse_numeric_id/1} + workspace_id: {"workspaceId", &parse_numeric_id/1} }, %{ name: "name", @@ -159,9 +159,9 @@ defmodule Coflux.Handlers.Api do if Enum.empty?(errors) do with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.update_space( + case Orchestration.update_workspace( arguments.project_id, - arguments.space_id, + arguments.workspace_id, Map.take(arguments, [:name, :base_id]) ) do :ok -> @@ -185,18 +185,18 @@ defmodule Coflux.Handlers.Api do end end - defp handle(req, "POST", ["pause_space"], namespace) do + defp handle(req, "POST", ["pause_workspace"], namespace) do {:ok, arguments, errors, req} = read_arguments(req, %{ project_id: "projectId", - space_id: {"spaceId", &parse_numeric_id/1} + workspace_id: {"workspaceId", &parse_numeric_id/1} }) if Enum.empty?(errors) do with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.pause_space( + case Orchestration.pause_workspace( arguments.project_id, - arguments.space_id + arguments.workspace_id ) do :ok -> :cowboy_req.reply(204, req) @@ -210,18 +210,18 @@ defmodule Coflux.Handlers.Api do end end - defp handle(req, "POST", ["resume_space"], namespace) do + defp handle(req, "POST", ["resume_workspace"], namespace) do {:ok, arguments, errors, req} = read_arguments(req, %{ project_id: "projectId", - space_id: {"spaceId", &parse_numeric_id/1} + workspace_id: {"workspaceId", &parse_numeric_id/1} }) if Enum.empty?(errors) do with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.resume_space( + case Orchestration.resume_workspace( arguments.project_id, - arguments.space_id + arguments.workspace_id ) do :ok -> :cowboy_req.reply(204, req) @@ -235,24 +235,24 @@ defmodule Coflux.Handlers.Api do end end - defp handle(req, "POST", ["archive_space"], namespace) do + defp handle(req, "POST", ["archive_workspace"], namespace) do {:ok, arguments, errors, req} = read_arguments(req, %{ project_id: "projectId", - space_id: {"spaceId", &parse_numeric_id/1} + workspace_id: {"workspaceId", &parse_numeric_id/1} }) if Enum.empty?(errors) do with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.archive_space( + case Orchestration.archive_workspace( arguments.project_id, - arguments.space_id + arguments.workspace_id ) do :ok -> :cowboy_req.reply(204, req) {:error, :descendants} -> - json_error_response(req, "bad_request", details: %{"spaceId" => "has_dependencies"}) + json_error_response(req, "bad_request", details: %{"workspaceId" => "has_dependencies"}) {:error, :not_found} -> json_error_response(req, "not_found", status: 404) @@ -267,12 +267,12 @@ defmodule Coflux.Handlers.Api do {:ok, arguments, errors, req} = read_arguments(req, %{ project_id: "projectId", - space_name: "spaceName" + workspace_name: "workspaceName" }) if Enum.empty?(errors) do with_project_access(req, arguments.project_id, namespace, fn -> - case Orchestration.get_pools(arguments.project_id, arguments.space_name) do + case Orchestration.get_pools(arguments.project_id, arguments.workspace_name) do {:ok, pools} -> json_response( req, @@ -287,6 +287,9 @@ defmodule Coflux.Handlers.Api do } end) ) + + {:error, :workspace_invalid} -> + json_error_response(req, "workspace_not_found", status: 404) end end) else @@ -297,11 +300,11 @@ defmodule Coflux.Handlers.Api do defp handle(req, "GET", ["get_pool"], namespace) do qs = :cowboy_req.parse_qs(req) project_id = get_query_param(qs, "project") - space_name = get_query_param(qs, "space") + workspace_name = get_query_param(qs, "workspace") pool_name = get_query_param(qs, "pool") with_project_access(req, project_id, namespace, fn -> - case Orchestration.get_pools(project_id, space_name) do + case Orchestration.get_pools(project_id, workspace_name) do {:ok, pools} -> case Map.fetch(pools, pool_name) do {:ok, pool} -> @@ -317,6 +320,9 @@ defmodule Coflux.Handlers.Api do :error -> json_error_response(req, "not_found", status: 404) end + + {:error, :workspace_invalid} -> + json_error_response(req, "workspace_not_found", status: 404) end end) end @@ -337,7 +343,7 @@ defmodule Coflux.Handlers.Api do {:ok, arguments, errors, req} = read_arguments(req, %{ project_id: "projectId", - space_name: "spaceName", + workspace_name: "workspaceName", pool_name: {"poolName", &parse_pool_name/1}, pool: {"pool", &parse_pool/1} }) @@ -346,7 +352,7 @@ defmodule Coflux.Handlers.Api do with_project_access(req, arguments.project_id, namespace, fn -> case Orchestration.update_pool( arguments.project_id, - arguments.space_name, + arguments.workspace_name, arguments.pool_name, arguments.pool ) do @@ -366,7 +372,7 @@ defmodule Coflux.Handlers.Api do {:ok, arguments, errors, req} = read_arguments(req, %{ project_id: "projectId", - space_name: "spaceName", + workspace_name: "workspaceName", worker_id: {"workerId", &parse_numeric_id/1} }) @@ -374,7 +380,7 @@ defmodule Coflux.Handlers.Api do with_project_access(req, arguments.project_id, namespace, fn -> case Orchestration.stop_worker( arguments.project_id, - arguments.space_name, + arguments.workspace_name, arguments.worker_id ) do :ok -> @@ -393,7 +399,7 @@ defmodule Coflux.Handlers.Api do {:ok, arguments, errors, req} = read_arguments(req, %{ project_id: "projectId", - space_name: "spaceName", + workspace_name: "workspaceName", worker_id: {"workerId", &parse_numeric_id/1} }) @@ -401,7 +407,7 @@ defmodule Coflux.Handlers.Api do with_project_access(req, arguments.project_id, namespace, fn -> case Orchestration.resume_worker( arguments.project_id, - arguments.space_name, + arguments.workspace_name, arguments.worker_id ) do :ok -> @@ -420,7 +426,7 @@ defmodule Coflux.Handlers.Api do {:ok, arguments, errors, req} = read_arguments(req, %{ project_id: "projectId", - space_name: "spaceName", + workspace_name: "workspaceName", manifests: {"manifests", &parse_manifests/1} }) @@ -428,7 +434,7 @@ defmodule Coflux.Handlers.Api do with_project_access(req, arguments.project_id, namespace, fn -> case Orchestration.register_manifests( arguments.project_id, - arguments.space_name, + arguments.workspace_name, arguments.manifests ) do :ok -> @@ -444,7 +450,7 @@ defmodule Coflux.Handlers.Api do {:ok, arguments, errors, req} = read_arguments(req, %{ project_id: "projectId", - space_name: "spaceName", + workspace_name: "workspaceName", module_name: "moduleName" }) @@ -452,7 +458,7 @@ defmodule Coflux.Handlers.Api do with_project_access(req, arguments.project_id, namespace, fn -> case Orchestration.archive_module( arguments.project_id, - arguments.space_name, + arguments.workspace_name, arguments.module_name ) do :ok -> @@ -467,12 +473,12 @@ defmodule Coflux.Handlers.Api do defp handle(req, "GET", ["get_workflow"], namespace) do qs = :cowboy_req.parse_qs(req) project_id = get_query_param(qs, "project") - space_name = get_query_param(qs, "space") + workspace_name = get_query_param(qs, "workspace") module = get_query_param(qs, "module") target_name = get_query_param(qs, "target") with_project_access(req, project_id, namespace, fn -> - case Orchestration.get_workflow(project_id, space_name, module, target_name) do + case Orchestration.get_workflow(project_id, workspace_name, module, target_name) do {:ok, nil} -> json_error_response(req, "not_found", status: 404) @@ -490,7 +496,7 @@ defmodule Coflux.Handlers.Api do project_id: "projectId", module: "module", target: "target", - space_name: "spaceName", + workspace_name: "workspaceName", arguments: {"arguments", &parse_arguments/1} }, %{ @@ -512,7 +518,7 @@ defmodule Coflux.Handlers.Api do arguments.target, :workflow, arguments.arguments, - space: arguments.space_name, + workspace: arguments.workspace_name, wait_for: arguments[:wait_for], cache: arguments[:cache], defer: arguments[:defer], @@ -563,7 +569,7 @@ defmodule Coflux.Handlers.Api do {:ok, arguments, errors, req} = read_arguments(req, %{ project_id: "projectId", - space_name: "spaceName", + workspace_name: "workspaceName", step_id: "stepId" }) @@ -572,13 +578,13 @@ defmodule Coflux.Handlers.Api do case Orchestration.rerun_step( arguments.project_id, arguments.step_id, - arguments.space_name + arguments.workspace_name ) do {:ok, execution_id, attempt} -> json_response(req, %{"executionId" => execution_id, "attempt" => attempt}) - {:error, :space_invalid} -> - json_error_response(req, "bad_request", details: %{"space" => "invalid"}) + {:error, :workspace_invalid} -> + json_error_response(req, "bad_request", details: %{"workspace" => "invalid"}) end end) else @@ -590,13 +596,13 @@ defmodule Coflux.Handlers.Api do qs = :cowboy_req.parse_qs(req) project_id = get_query_param(qs, "project") # TODO: handle parse error - {:ok, space_id} = parse_numeric_id(get_query_param(qs, "spaceId")) + {:ok, workspace_id} = parse_numeric_id(get_query_param(qs, "workspaceId")) query = get_query_param(qs, "query") with_project_access(req, project_id, namespace, fn -> case Topical.execute( Coflux.TopicalRegistry, - ["projects", project_id, "search", space_id], + ["projects", project_id, "search", workspace_id], "query", {query}, %{namespace: namespace} @@ -629,7 +635,7 @@ defmodule Coflux.Handlers.Api do req, %{ project_id: "projectId", - space_name: "spaceName" + workspace_name: "workspaceName" }, %{ provides: {"provides", &parse_tag_set/1}, @@ -646,11 +652,11 @@ defmodule Coflux.Handlers.Api do ] |> Enum.reject(fn {_, v} -> is_nil(v) end) - case Orchestration.create_session(arguments.project_id, arguments.space_name, opts) do + case Orchestration.create_session(arguments.project_id, arguments.workspace_name, opts) do {:ok, session_id} -> json_response(req, %{"sessionId" => session_id}) - {:error, :space_invalid} -> + {:error, :workspace_invalid} -> json_error_response(req, "not_found", status: 404) end end) diff --git a/server/lib/coflux/handlers/worker.ex b/server/lib/coflux/handlers/worker.ex index 84cec1c7..26054108 100644 --- a/server/lib/coflux/handlers/worker.ex +++ b/server/lib/coflux/handlers/worker.ex @@ -24,11 +24,11 @@ defmodule Coflux.Handlers.Worker do :ok -> # TODO: validate project_id = get_query_param(qs, "project") - space_name = get_query_param(qs, "space") + workspace_name = get_query_param(qs, "workspace") session_token = extract_session_token(protocols) req = :cowboy_req.set_resp_header("sec-websocket-protocol", @protocol_version, req) - {:cowboy_websocket, req, {project_id, space_name, session_token}} + {:cowboy_websocket, req, {project_id, workspace_name, session_token}} {:error, server_version, expected_version} -> req = @@ -44,11 +44,11 @@ defmodule Coflux.Handlers.Worker do end end - def websocket_init({project_id, space_name, session_token}) do + def websocket_init({project_id, workspace_name, session_token}) do case Projects.get_project_by_id(Coflux.ProjectsServer, project_id) do {:ok, _} -> # TODO: monitor server? - case Orchestration.resume_session(project_id, session_token, space_name, self()) do + case Orchestration.resume_session(project_id, session_token, workspace_name, self()) do {:ok, external_id, execution_ids} -> {[session_message(external_id)], %{ @@ -60,8 +60,8 @@ defmodule Coflux.Handlers.Worker do {:error, :session_invalid} -> {[{:close, 4000, "session_invalid"}], nil} - {:error, :space_mismatch} -> - {[{:close, 4000, "space_mismatch"}], nil} + {:error, :workspace_mismatch} -> + {[{:close, 4000, "workspace_mismatch"}], nil} end :error -> @@ -346,7 +346,7 @@ defmodule Coflux.Handlers.Worker do end def websocket_info(:stop, state) do - {[{:close, 4000, "space_not_found"}], state} + {[{:close, 4000, "workspace_not_found"}], state} end defp is_recognised_execution?(execution_id, state) do diff --git a/server/lib/coflux/launchers/docker.ex b/server/lib/coflux/launchers/docker.ex index 9ddc5c24..61658a45 100644 --- a/server/lib/coflux/launchers/docker.ex +++ b/server/lib/coflux/launchers/docker.ex @@ -1,7 +1,7 @@ defmodule Coflux.DockerLauncher do @docker_api_version "v1.47" - def launch(project_id, space_name, session_token, modules, config \\ %{}) do + def launch(project_id, workspace_name, session_token, modules, config \\ %{}) do docker_conn = parse_docker_host(config[:docker_host]) coflux_host = get_coflux_host() @@ -15,7 +15,7 @@ defmodule Coflux.DockerLauncher do "Env" => [ "COFLUX_HOST=#{coflux_host}", "COFLUX_PROJECT=#{project_id}", - "COFLUX_SPACE=#{space_name}", + "COFLUX_WORKSPACE=#{workspace_name}", "COFLUX_SESSION=#{session_token}" ] } diff --git a/server/lib/coflux/orchestration.ex b/server/lib/coflux/orchestration.ex index f18dcfc1..8f220086 100644 --- a/server/lib/coflux/orchestration.ex +++ b/server/lib/coflux/orchestration.ex @@ -1,64 +1,64 @@ defmodule Coflux.Orchestration do alias Coflux.Orchestration - def get_spaces(project_id) do - call_server(project_id, :get_spaces) + def get_workspaces(project_id) do + call_server(project_id, :get_workspaces) end - def create_space(project_id, name, base_id) do - call_server(project_id, {:create_space, name, base_id}) + def create_workspace(project_id, name, base_id) do + call_server(project_id, {:create_workspace, name, base_id}) end - def update_space(project_id, space_id, updates) do - call_server(project_id, {:update_space, space_id, updates}) + def update_workspace(project_id, workspace_id, updates) do + call_server(project_id, {:update_workspace, workspace_id, updates}) end - def pause_space(project_id, space_name) do - call_server(project_id, {:pause_space, space_name}) + def pause_workspace(project_id, workspace_name) do + call_server(project_id, {:pause_workspace, workspace_name}) end - def resume_space(project_id, space_name) do - call_server(project_id, {:resume_space, space_name}) + def resume_workspace(project_id, workspace_name) do + call_server(project_id, {:resume_workspace, workspace_name}) end - def archive_space(project_id, space_name) do - call_server(project_id, {:archive_space, space_name}) + def archive_workspace(project_id, workspace_name) do + call_server(project_id, {:archive_workspace, workspace_name}) end - def get_pools(project_id, space_name) do - call_server(project_id, {:get_pools, space_name}) + def get_pools(project_id, workspace_name) do + call_server(project_id, {:get_pools, workspace_name}) end - def update_pool(project_id, space_name, pool_name, pool) do - call_server(project_id, {:update_pool, space_name, pool_name, pool}) + def update_pool(project_id, workspace_name, pool_name, pool) do + call_server(project_id, {:update_pool, workspace_name, pool_name, pool}) end - def stop_worker(project_id, space_name, worker_id) do - call_server(project_id, {:stop_worker, space_name, worker_id}) + def stop_worker(project_id, workspace_name, worker_id) do + call_server(project_id, {:stop_worker, workspace_name, worker_id}) end - def resume_worker(project_id, space_name, worker_id) do - call_server(project_id, {:resume_worker, space_name, worker_id}) + def resume_worker(project_id, workspace_name, worker_id) do + call_server(project_id, {:resume_worker, workspace_name, worker_id}) end - def register_manifests(project_id, space_name, manifests) do - call_server(project_id, {:register_manifests, space_name, manifests}) + def register_manifests(project_id, workspace_name, manifests) do + call_server(project_id, {:register_manifests, workspace_name, manifests}) end - def archive_module(project_id, space_name, module_name) do - call_server(project_id, {:archive_module, space_name, module_name}) + def archive_module(project_id, workspace_name, module_name) do + call_server(project_id, {:archive_module, workspace_name, module_name}) end - def get_workflow(project_id, space_name, module, target_name) do - call_server(project_id, {:get_workflow, space_name, module, target_name}) + def get_workflow(project_id, workspace_name, module, target_name) do + call_server(project_id, {:get_workflow, workspace_name, module, target_name}) end - def resume_session(project_id, session_id, space_name, pid) do - call_server(project_id, {:resume_session, session_id, space_name, pid}) + def resume_session(project_id, session_id, workspace_name, pid) do + call_server(project_id, {:resume_session, session_id, workspace_name, pid}) end - def create_session(project_id, space_name, opts \\ []) do - call_server(project_id, {:create_session, space_name, opts}) + def create_session(project_id, workspace_name, opts \\ []) do + call_server(project_id, {:create_session, workspace_name, opts}) end def declare_targets(project_id, session_id, targets) do @@ -84,8 +84,8 @@ defmodule Coflux.Orchestration do call_server(project_id, {:cancel_execution, execution_id}) end - def rerun_step(project_id, step_id, space_name) do - call_server(project_id, {:rerun_step, step_id, space_name}) + def rerun_step(project_id, step_id, workspace_name) do + call_server(project_id, {:rerun_step, step_id, workspace_name}) end def record_heartbeats(project_id, executions, session_id) do @@ -123,32 +123,32 @@ defmodule Coflux.Orchestration do call_server(project_id, {:record_logs, execution_id, messages}) end - def subscribe_spaces(project_id, pid) do - call_server(project_id, {:subscribe_spaces, pid}) + def subscribe_workspaces(project_id, pid) do + call_server(project_id, {:subscribe_workspaces, pid}) end - def subscribe_modules(project_id, space_id, pid) do - call_server(project_id, {:subscribe_modules, space_id, pid}) + def subscribe_modules(project_id, workspace_id, pid) do + call_server(project_id, {:subscribe_modules, workspace_id, pid}) end - def subscribe_module(project_id, module, space_id, pid) do - call_server(project_id, {:subscribe_module, module, space_id, pid}) + def subscribe_module(project_id, module, workspace_id, pid) do + call_server(project_id, {:subscribe_module, module, workspace_id, pid}) end - def subscribe_pools(project_id, space_id, pid) do - call_server(project_id, {:subscribe_pools, space_id, pid}) + def subscribe_pools(project_id, workspace_id, pid) do + call_server(project_id, {:subscribe_pools, workspace_id, pid}) end - def subscribe_pool(project_id, space_id, pool_name, pid) do - call_server(project_id, {:subscribe_pool, space_id, pool_name, pid}) + def subscribe_pool(project_id, workspace_id, pool_name, pid) do + call_server(project_id, {:subscribe_pool, workspace_id, pool_name, pid}) end - def subscribe_sessions(project_id, space_id, pid) do - call_server(project_id, {:subscribe_sessions, space_id, pid}) + def subscribe_sessions(project_id, workspace_id, pid) do + call_server(project_id, {:subscribe_sessions, workspace_id, pid}) end - def subscribe_workflow(project_id, module, target, space_id, pid) do - call_server(project_id, {:subscribe_workflow, module, target, space_id, pid}) + def subscribe_workflow(project_id, module, target, workspace_id, pid) do + call_server(project_id, {:subscribe_workflow, module, target, workspace_id, pid}) end def subscribe_run(project_id, run_id, pid) do @@ -159,8 +159,8 @@ defmodule Coflux.Orchestration do call_server(project_id, {:subscribe_logs, run_id, pid}) end - def subscribe_targets(project_id, space_id, pid) do - call_server(project_id, {:subscribe_targets, space_id, pid}) + def subscribe_targets(project_id, workspace_id, pid) do + call_server(project_id, {:subscribe_targets, workspace_id, pid}) end def unsubscribe(project_id, ref) do diff --git a/server/lib/coflux/orchestration/manifests.ex b/server/lib/coflux/orchestration/manifests.ex index 2555f42e..39934855 100644 --- a/server/lib/coflux/orchestration/manifests.ex +++ b/server/lib/coflux/orchestration/manifests.ex @@ -3,7 +3,7 @@ defmodule Coflux.Orchestration.Manifests do alias Coflux.Orchestration.{TagSets, CacheConfigs, Utils} - def register_manifests(db, space_id, manifests) do + def register_manifests(db, workspace_id, manifests) do with_transaction(db, fn -> manifest_ids = Map.new(manifests, fn {module, workflows} -> @@ -77,18 +77,18 @@ defmodule Coflux.Orchestration.Manifests do {module, manifest_id} end) - {:ok, current_manifest_ids} = get_latest_manifest_ids(db, space_id) + {:ok, current_manifest_ids} = get_latest_manifest_ids(db, workspace_id) now = current_timestamp() {:ok, _} = insert_many( db, - :space_manifests, - {:space_id, :module, :manifest_id, :created_at}, + :workspace_manifests, + {:workspace_id, :module, :manifest_id, :created_at}, Enum.reduce(manifest_ids, [], fn {module, manifest_id}, result -> if manifest_id != Map.get(current_manifest_ids, module) do - [{space_id, module, manifest_id, now} | result] + [{workspace_id, module, manifest_id, now} | result] else result end @@ -99,13 +99,13 @@ defmodule Coflux.Orchestration.Manifests do end) end - def archive_module(db, space_id, module_name) do + def archive_module(db, workspace_id, module_name) do with_transaction(db, fn -> now = current_timestamp() {:ok, _} = - insert_one(db, :space_manifests, %{ - space_id: space_id, + insert_one(db, :workspace_manifests, %{ + workspace_id: workspace_id, module: module_name, manifest_id: nil, created_at: now @@ -115,30 +115,30 @@ defmodule Coflux.Orchestration.Manifests do end) end - defp get_latest_manifest_ids(db, space_id) do + defp get_latest_manifest_ids(db, workspace_id) do case query( db, """ SELECT wm.module, wm.manifest_id - FROM space_manifests wm + FROM workspace_manifests wm JOIN ( SELECT module, MAX(created_at) AS latest_created_at - FROM space_manifests - WHERE space_id = ?1 + FROM workspace_manifests + WHERE workspace_id = ?1 GROUP BY module ) AS latest ON wm.module = latest.module AND wm.created_at = latest.latest_created_at - WHERE wm.space_id = ?1 + WHERE wm.workspace_id = ?1 """, - {space_id} + {workspace_id} ) do {:ok, rows} -> {:ok, Map.new(rows)} end end - def get_latest_manifests(db, space_id) do - case get_latest_manifest_ids(db, space_id) do + def get_latest_manifests(db, workspace_id) do + case get_latest_manifest_ids(db, workspace_id) do {:ok, manifest_ids} -> manifests = Enum.reduce(manifest_ids, %{}, fn {module, manifest_id}, result -> @@ -154,18 +154,18 @@ defmodule Coflux.Orchestration.Manifests do end end - def get_latest_workflow(db, space_id, module, target_name) do + def get_latest_workflow(db, workspace_id, module, target_name) do case query_one( db, """ SELECT w.parameter_set_id, w.instruction_id, w.wait_for, w.cache_config_id, w.defer_params, w.delay, w.retry_limit, w.retry_delay_min, w.retry_delay_max, w.recurrent, w.requires_tag_set_id - FROM space_manifests AS wm + FROM workspace_manifests AS wm LEFT JOIN workflows AS w ON w.manifest_id = wm.manifest_id - WHERE wm.space_id = ?1 AND wm.module = ?2 AND w.name = ?3 + WHERE wm.workspace_id = ?1 AND wm.module = ?2 AND w.name = ?3 ORDER BY wm.created_at DESC LIMIT 1 """, - {space_id, module, target_name} + {workspace_id, module, target_name} ) do {:ok, nil} -> {:ok, nil} @@ -228,17 +228,17 @@ defmodule Coflux.Orchestration.Manifests do end end - def get_all_workflows_for_space(db, space_id) do + def get_all_workflows_for_workspace(db, workspace_id) do case query( db, """ SELECT DISTINCT wm.module, w.name - FROM space_manifests AS wm + FROM workspace_manifests AS wm INNER JOIN manifests AS m on m.id = wm.manifest_id INNER JOIN workflows AS w ON w.manifest_id = m.id - WHERE wm.space_id = ?1 + WHERE wm.workspace_id = ?1 """, - {space_id} + {workspace_id} ) do {:ok, rows} -> {:ok, diff --git a/server/lib/coflux/orchestration/models.ex b/server/lib/coflux/orchestration/models.ex index d9f6c9ae..5d5617d1 100644 --- a/server/lib/coflux/orchestration/models.ex +++ b/server/lib/coflux/orchestration/models.ex @@ -62,7 +62,7 @@ defmodule Coflux.Orchestration.Models do :retry_limit, :retry_delay_min, :retry_delay_max, - :space_id, + :workspace_id, :execute_after, :attempt, :created_at diff --git a/server/lib/coflux/orchestration/runs.ex b/server/lib/coflux/orchestration/runs.ex index 663c089d..9729779f 100644 --- a/server/lib/coflux/orchestration/runs.ex +++ b/server/lib/coflux/orchestration/runs.ex @@ -85,17 +85,17 @@ defmodule Coflux.Orchestration.Runs do end end - def get_space_id_for_execution(db, execution_id) do + def get_workspace_id_for_execution(db, execution_id) do case query_one!( db, - "SELECT space_id FROM executions WHERE id = ?1", + "SELECT workspace_id FROM executions WHERE id = ?1", {execution_id} ) do - {:ok, {space_id}} -> {:ok, space_id} + {:ok, {workspace_id}} -> {:ok, workspace_id} end end - def get_steps_for_space(db, space_id) do + def get_steps_for_workspace(db, workspace_id) do case query( db, """ @@ -103,7 +103,7 @@ defmodule Coflux.Orchestration.Runs do SELECT s.module, s.target, MAX(e.created_at) AS max_created_at FROM executions AS e INNER JOIN steps AS s ON s.id = e.step_id - WHERE e.space_id = ?1 + WHERE e.workspace_id = ?1 GROUP BY s.module, s.target ) SELECT s.module, s.target, s.type, r.external_id, s.external_id, e.attempt @@ -111,9 +111,9 @@ defmodule Coflux.Orchestration.Runs do INNER JOIN steps AS s ON s.id = e.step_id INNER JOIN latest_executions AS le ON s.module = le.module AND s.target = le.target AND e.created_at = le.max_created_at INNER JOIN runs AS r ON r.id = s.run_id - WHERE e.space_id = ?1 + WHERE e.workspace_id = ?1 """, - {space_id} + {workspace_id} ) do {:ok, rows} -> {:ok, @@ -131,8 +131,8 @@ defmodule Coflux.Orchestration.Runs do target, type, arguments, - space_id, - cache_space_ids, + workspace_id, + cache_workspace_ids, opts \\ [] ) do idempotency_key = Keyword.get(opts, :idempotency_key) @@ -154,8 +154,8 @@ defmodule Coflux.Orchestration.Runs do type, arguments, true, - space_id, - cache_space_ids, + workspace_id, + cache_workspace_ids, now, opts ) @@ -172,8 +172,8 @@ defmodule Coflux.Orchestration.Runs do target, type, arguments, - space_id, - cache_space_ids, + workspace_id, + cache_workspace_ids, opts \\ [] ) do now = current_timestamp() @@ -188,8 +188,8 @@ defmodule Coflux.Orchestration.Runs do type, arguments, false, - space_id, - cache_space_ids, + workspace_id, + cache_workspace_ids, now, opts ) @@ -254,8 +254,8 @@ defmodule Coflux.Orchestration.Runs do type, arguments, is_initial, - space_id, - cache_space_ids, + workspace_id, + cache_workspace_ids, now, opts ) do @@ -277,7 +277,7 @@ defmodule Coflux.Orchestration.Runs do memoised_execution = if memo_key do - case find_memoised_execution(db, run_id, cache_space_ids, memo_key) do + case find_memoised_execution(db, run_id, cache_workspace_ids, memo_key) do {:ok, memoised_execution} -> memoised_execution end end @@ -357,7 +357,7 @@ defmodule Coflux.Orchestration.Runs do attempt = 1 {:ok, execution_id} = - insert_execution(db, step_id, attempt, space_id, execute_after, now) + insert_execution(db, step_id, attempt, workspace_id, execute_after, now) {external_step_id, execution_id, attempt, now, false, cache_key} end @@ -393,14 +393,14 @@ defmodule Coflux.Orchestration.Runs do end end - def rerun_step(db, step_id, space_id, execute_after, dependency_ids) do + def rerun_step(db, step_id, workspace_id, execute_after, dependency_ids) do with_transaction(db, fn -> now = current_timestamp() # TODO: cancel pending executions for step? {:ok, attempt} = get_next_execution_attempt(db, step_id) {:ok, execution_id} = - insert_execution(db, step_id, attempt, space_id, execute_after, now) + insert_execution(db, step_id, attempt, workspace_id, execute_after, now) {:ok, _} = insert_many( @@ -498,7 +498,7 @@ defmodule Coflux.Orchestration.Runs do s.retry_limit, s.retry_delay_min, s.retry_delay_max, - e.space_id, + e.workspace_id, e.execute_after, e.attempt, e.created_at @@ -539,7 +539,7 @@ defmodule Coflux.Orchestration.Runs do ) end - def get_pending_executions_for_space(db, space_id) do + def get_pending_executions_for_workspace(db, workspace_id) do query( db, """ @@ -547,9 +547,9 @@ defmodule Coflux.Orchestration.Runs do FROM executions AS e INNER JOIN steps AS s ON s.id = e.step_id LEFT JOIN results AS r ON r.execution_id = e.id - WHERE e.space_id = ?1 AND r.created_at IS NULL + WHERE e.workspace_id = ?1 AND r.created_at IS NULL """, - {space_id} + {workspace_id} ) end @@ -588,7 +588,7 @@ defmodule Coflux.Orchestration.Runs do ) end - def get_target_runs(db, module, target, type, space_id, limit \\ 50) do + def get_target_runs(db, module, target, type, workspace_id, limit \\ 50) do query( db, """ @@ -596,11 +596,11 @@ defmodule Coflux.Orchestration.Runs do FROM runs as r INNER JOIN steps AS s ON s.run_id = r.id INNER JOIN executions AS e ON e.step_id == s.id - WHERE s.module = ?1 AND s.target = ?2 AND s.type = ?3 AND s.parent_id IS NULL AND e.space_id = ?4 + WHERE s.module = ?1 AND s.target = ?2 AND s.type = ?3 AND s.parent_id IS NULL AND e.workspace_id = ?4 ORDER BY r.created_at DESC LIMIT ?5 """, - {module, target, Utils.encode_step_type(type), space_id, limit} + {module, target, Utils.encode_step_type(type), workspace_id, limit} ) end @@ -706,7 +706,7 @@ defmodule Coflux.Orchestration.Runs do query( db, """ - SELECT e.id, e.step_id, e.attempt, e.space_id, e.execute_after, e.created_at, a.created_at + SELECT e.id, e.step_id, e.attempt, e.workspace_id, e.execute_after, e.created_at, a.created_at FROM steps AS s INNER JOIN executions AS e ON e.step_id = s.id LEFT JOIN assignments AS a ON a.execution_id = e.id @@ -868,7 +868,7 @@ defmodule Coflux.Orchestration.Runs do end # TODO: consider changed 'requires'? - defp find_memoised_execution(db, run_id, space_ids, memo_key) do + defp find_memoised_execution(db, run_id, workspace_ids, memo_key) do case query( db, """ @@ -878,13 +878,13 @@ defmodule Coflux.Orchestration.Runs do LEFT JOIN results AS r ON r.execution_id = e.id WHERE s.run_id = ?1 - AND e.space_id IN (#{build_placeholders(length(space_ids), 1)}) - AND s.memo_key = ?#{length(space_ids) + 2} + AND e.workspace_id IN (#{build_placeholders(length(workspace_ids), 1)}) + AND s.memo_key = ?#{length(workspace_ids) + 2} AND (r.type IS NULL OR r.type = 1) ORDER BY e.created_at DESC LIMIT 1 """, - List.to_tuple([run_id] ++ space_ids ++ [{:blob, memo_key}]) + List.to_tuple([run_id] ++ workspace_ids ++ [{:blob, memo_key}]) ) do {:ok, [row]} -> {:ok, row} @@ -894,7 +894,7 @@ defmodule Coflux.Orchestration.Runs do end end - def find_cached_execution(db, space_ids, step_id, cache_key, recorded_after) do + def find_cached_execution(db, workspace_ids, step_id, cache_key, recorded_after) do case query( db, """ @@ -903,14 +903,14 @@ defmodule Coflux.Orchestration.Runs do INNER JOIN executions AS e ON e.step_id = s.id LEFT JOIN results AS r ON r.execution_id = e.id WHERE - e.space_id IN (#{build_placeholders(length(space_ids))}) - AND s.cache_key = ?#{length(space_ids) + 1} - AND (r.type IS NULL OR (r.type = 1 AND r.created_at >= ?#{length(space_ids) + 2})) - AND s.id <> ?#{length(space_ids) + 3} + e.workspace_id IN (#{build_placeholders(length(workspace_ids))}) + AND s.cache_key = ?#{length(workspace_ids) + 1} + AND (r.type IS NULL OR (r.type = 1 AND r.created_at >= ?#{length(workspace_ids) + 2})) + AND s.id <> ?#{length(workspace_ids) + 3} ORDER BY e.created_at DESC LIMIT 1 """, - List.to_tuple(space_ids ++ [{:blob, cache_key}, recorded_after, step_id]) + List.to_tuple(workspace_ids ++ [{:blob, cache_key}, recorded_after, step_id]) ) do {:ok, [{execution_id}]} -> {:ok, execution_id} @@ -1023,11 +1023,11 @@ defmodule Coflux.Orchestration.Runs do end end - defp insert_execution(db, step_id, attempt, space_id, execute_after, created_at) do + defp insert_execution(db, step_id, attempt, workspace_id, execute_after, created_at) do insert_one(db, :executions, %{ step_id: step_id, attempt: attempt, - space_id: space_id, + workspace_id: workspace_id, execute_after: execute_after, created_at: created_at }) diff --git a/server/lib/coflux/orchestration/server.ex b/server/lib/coflux/orchestration/server.ex index b27314fc..fd354b94 100644 --- a/server/lib/coflux/orchestration/server.ex +++ b/server/lib/coflux/orchestration/server.ex @@ -5,7 +5,7 @@ defmodule Coflux.Orchestration.Server do alias Coflux.MapUtils alias Coflux.Orchestration.{ - Spaces, + Workspaces, Sessions, Runs, Results, @@ -31,21 +31,21 @@ defmodule Coflux.Orchestration.Server do expiry_timer: nil, # id -> %{name, base_id, state} - spaces: %{}, + workspaces: %{}, - # space_id -> %{pool_name -> pool} + # workspace_id -> %{pool_name -> pool} pools: %{}, # name -> id - space_names: %{}, + workspace_names: %{}, - # worker_id -> %{created_at, pool_id, pool_name, space_id, state, data, session_id, stop_id, last_poll_at} + # worker_id -> %{created_at, pool_id, pool_name, workspace_id, state, data, session_id, stop_id, last_poll_at} workers: %{}, # ref -> {pid, session_id} connections: %{}, - # session_id -> %{external_id, connection, targets, queue, starting, executing, concurrency, space_id, provides, worker_id, last_idle_at, activated_at, activation_timeout, reconnection_timeout} + # session_id -> %{external_id, connection, targets, queue, starting, executing, concurrency, workspace_id, provides, worker_id, last_idle_at, activated_at, activation_timeout, reconnection_timeout} sessions: %{}, # external_id -> session_id @@ -93,24 +93,24 @@ defmodule Coflux.Orchestration.Server do end def handle_continue(:setup, state) do - {:ok, spaces} = Spaces.get_all_spaces(state.db) + {:ok, workspaces} = Workspaces.get_all_workspaces(state.db) {:ok, workers} = Workers.get_active_workers(state.db) - space_names = - Map.new(spaces, fn {space_id, space} -> - {space.name, space_id} + workspace_names = + Map.new(workspaces, fn {workspace_id, workspace} -> + {workspace.name, workspace_id} end) workers = Enum.reduce( workers, %{}, - fn {worker_id, created_at, pool_id, pool_name, space_id, state, data}, workers -> + fn {worker_id, created_at, pool_id, pool_name, workspace_id, state, data}, workers -> Map.put(workers, worker_id, %{ created_at: created_at, pool_id: pool_id, pool_name: pool_name, - space_id: space_id, + workspace_id: workspace_id, state: state, data: data, session_id: nil, @@ -121,17 +121,17 @@ defmodule Coflux.Orchestration.Server do ) pools = - spaces + workspaces |> Map.keys() - |> Enum.reduce(%{}, fn space_id, pools -> - {:ok, space_pools} = Spaces.get_space_pools(state.db, space_id) - Map.put(pools, space_id, space_pools) + |> Enum.reduce(%{}, fn workspace_id, pools -> + {:ok, workspace_pools} = Workspaces.get_workspace_pools(state.db, workspace_id) + Map.put(pools, workspace_id, workspace_pools) end) state = Map.merge(state, %{ - spaces: spaces, - space_names: space_names, + workspaces: workspaces, + workspace_names: workspace_names, pools: pools, workers: workers }) @@ -143,7 +143,7 @@ defmodule Coflux.Orchestration.Server do Enum.reduce( active_sessions, state, - fn {session_id, external_id, space_id, worker_id, provides_tag_set_id, concurrency, + fn {session_id, external_id, workspace_id, worker_id, provides_tag_set_id, concurrency, activation_timeout, reconnection_timeout, secret_hash, created_at, activated_at}, state -> provides = @@ -167,7 +167,7 @@ defmodule Coflux.Orchestration.Server do starting: MapSet.new(), executing: MapSet.new(), concurrency: concurrency, - space_id: space_id, + workspace_id: workspace_id, provides: provides, worker_id: worker_id, last_idle_at: activated_at || created_at, @@ -209,51 +209,51 @@ defmodule Coflux.Orchestration.Server do {:noreply, state} end - def handle_call(:get_spaces, _from, state) do - spaces = - state.spaces + def handle_call(:get_workspaces, _from, state) do + workspaces = + state.workspaces |> Enum.filter(fn {_, e} -> e.state != :archived end) - |> Map.new(fn {space_id, space} -> - {space_id, %{name: space.name, base_id: space.base_id}} + |> Map.new(fn {workspace_id, workspace} -> + {workspace_id, %{name: workspace.name, base_id: workspace.base_id}} end) - {:reply, {:ok, spaces}, state} + {:reply, {:ok, workspaces}, state} end - def handle_call({:create_space, name, base_id}, _from, state) do - case Spaces.create_space(state.db, name, base_id) do - {:ok, space_id, space} -> + def handle_call({:create_workspace, name, base_id}, _from, state) do + case Workspaces.create_workspace(state.db, name, base_id) do + {:ok, workspace_id, workspace} -> state = state - |> put_in([Access.key(:spaces), space_id], space) - |> put_in([Access.key(:space_names), space.name], space_id) - |> notify_listeners(:spaces, {:space, space_id, space}) + |> put_in([Access.key(:workspaces), workspace_id], workspace) + |> put_in([Access.key(:workspace_names), workspace.name], workspace_id) + |> notify_listeners(:workspaces, {:workspace, workspace_id, workspace}) |> flush_notifications() - {:reply, {:ok, space_id}, state} + {:reply, {:ok, workspace_id}, state} {:error, error} -> {:reply, {:error, error}, state} end end - def handle_call({:update_space, space_id, updates}, _from, state) do + def handle_call({:update_workspace, workspace_id, updates}, _from, state) do # TODO: shut down/update pools - case Spaces.update_space(state.db, space_id, updates) do - {:ok, space} -> - original_name = state.spaces[space_id].name + case Workspaces.update_workspace(state.db, workspace_id, updates) do + {:ok, workspace} -> + original_name = state.workspaces[workspace_id].name state = state - |> put_in([Access.key(:spaces), space_id], space) - |> Map.update!(:space_names, fn space_names -> - space_names + |> put_in([Access.key(:workspaces), workspace_id], workspace) + |> Map.update!(:workspace_names, fn workspace_names -> + workspace_names |> Map.delete(original_name) - |> Map.put(space.name, space_id) + |> Map.put(workspace.name, workspace_id) end) |> notify_listeners( - :spaces, - {:space, space_id, Map.take(space, [:name, :base_id, :state])} + :workspaces, + {:workspace, workspace_id, Map.take(workspace, [:name, :base_id, :state])} ) |> flush_notifications() @@ -267,26 +267,26 @@ defmodule Coflux.Orchestration.Server do end end - def handle_call({:pause_space, space_id}, _from, state) do - case Spaces.pause_space(state.db, space_id) do + def handle_call({:pause_workspace, workspace_id}, _from, state) do + case Workspaces.pause_workspace(state.db, workspace_id) do :ok -> state = state - |> put_in([Access.key(:spaces), space_id, Access.key(:state)], :paused) - |> notify_listeners(:spaces, {:state, space_id, :paused}) + |> put_in([Access.key(:workspaces), workspace_id, Access.key(:state)], :paused) + |> notify_listeners(:workspaces, {:state, workspace_id, :paused}) |> flush_notifications() {:reply, :ok, state} end end - def handle_call({:resume_space, space_id}, _from, state) do - case Spaces.resume_space(state.db, space_id) do + def handle_call({:resume_workspace, workspace_id}, _from, state) do + case Workspaces.resume_workspace(state.db, workspace_id) do :ok -> state = state - |> put_in([Access.key(:spaces), space_id, Access.key(:state)], :active) - |> notify_listeners(:spaces, {:state, space_id, :active}) + |> put_in([Access.key(:workspaces), workspace_id, Access.key(:state)], :active) + |> notify_listeners(:workspaces, {:state, workspace_id, :active}) |> flush_notifications() send(self(), :tick) @@ -295,12 +295,12 @@ defmodule Coflux.Orchestration.Server do end end - def handle_call({:archive_space, space_id}, _from, state) do - case Spaces.archive_space(state.db, space_id) do + def handle_call({:archive_workspace, workspace_id}, _from, state) do + case Workspaces.archive_workspace(state.db, workspace_id) do :ok -> state = state.sessions - |> Enum.filter(fn {_, s} -> s.space_id == space_id end) + |> Enum.filter(fn {_, s} -> s.workspace_id == workspace_id end) |> Enum.reduce(state, fn {session_id, session}, state -> state = if session.connection do @@ -315,7 +315,7 @@ defmodule Coflux.Orchestration.Server do end) state = - case Runs.get_pending_executions_for_space(state.db, space_id) do + case Runs.get_pending_executions_for_workspace(state.db, workspace_id) do {:ok, executions} -> Enum.reduce(executions, state, fn {execution_id, _run_id, module}, state -> case record_and_notify_result( @@ -333,14 +333,14 @@ defmodule Coflux.Orchestration.Server do state = state.workers |> Enum.reduce(state, fn {worker_id, worker}, state -> - if worker.space_id == space_id && worker.state == :active do - update_worker_state(state, worker_id, :draining, space_id, worker.pool_name) + if worker.workspace_id == workspace_id && worker.state == :active do + update_worker_state(state, worker_id, :draining, workspace_id, worker.pool_name) else state end end) - |> put_in([Access.key(:spaces), space_id, Access.key(:state)], :archived) - |> notify_listeners(:spaces, {:state, space_id, :archived}) + |> put_in([Access.key(:workspaces), workspace_id, Access.key(:state)], :archived) + |> notify_listeners(:workspaces, {:state, workspace_id, :archived}) |> flush_notifications() {:reply, :ok, state} @@ -350,9 +350,9 @@ defmodule Coflux.Orchestration.Server do end end - def handle_call({:get_pools, space_name}, _from, state) do - with {:ok, space_id, _} <- lookup_space_by_name(state, space_name) do - case Spaces.get_space_pools(state.db, space_id) do + def handle_call({:get_pools, workspace_name}, _from, state) do + with {:ok, workspace_id, _} <- lookup_workspace_by_name(state, workspace_name) do + case Workspaces.get_workspace_pools(state.db, workspace_id) do {:ok, pools} -> {:reply, {:ok, pools}, state} end @@ -362,29 +362,29 @@ defmodule Coflux.Orchestration.Server do end end - def handle_call({:update_pool, space_name, pool_name, pool}, _from, state) do - with {:ok, space_id, _} <- lookup_space_by_name(state, space_name) do - case Spaces.update_pool(state.db, space_id, pool_name, pool) do + def handle_call({:update_pool, workspace_name, pool_name, pool}, _from, state) do + with {:ok, workspace_id, _} <- lookup_workspace_by_name(state, workspace_name) do + case Workspaces.update_pool(state.db, workspace_id, pool_name, pool) do {:ok, pool_id} -> state = state.workers |> Enum.reduce(state, fn {worker_id, worker}, state -> if worker.state == :active && worker.pool_name == pool_name do # TODO: only if pool has meaningfully changed? - update_worker_state(state, worker_id, :draining, space_id, pool_name) + update_worker_state(state, worker_id, :draining, workspace_id, pool_name) else state end end) - |> update_in([Access.key(:pools), Access.key(space_id, %{})], fn pools -> + |> update_in([Access.key(:pools), Access.key(workspace_id, %{})], fn pools -> if pool do Map.put(pools, pool_name, Map.put(pool, :id, pool_id)) else Map.delete(pools, pool_name) end end) - |> notify_listeners({:pool, space_id, pool_name}, {:updated, pool}) - |> notify_listeners({:pools, space_id}, {:pool, pool_name, pool}) + |> notify_listeners({:pool, workspace_id, pool_name}, {:updated, pool}) + |> notify_listeners({:pools, workspace_id}, {:pool, pool_name, pool}) |> flush_notifications() {:reply, :ok, state} @@ -395,12 +395,12 @@ defmodule Coflux.Orchestration.Server do end end - def handle_call({:stop_worker, space_name, worker_id}, _from, state) do - with {:ok, space_id, _} <- lookup_space_by_name(state, space_name), - {:ok, worker} <- lookup_worker(state, worker_id, space_id) do + def handle_call({:stop_worker, workspace_name, worker_id}, _from, state) do + with {:ok, workspace_id, _} <- lookup_workspace_by_name(state, workspace_name), + {:ok, worker} <- lookup_worker(state, worker_id, workspace_id) do state = state - |> update_worker_state(worker_id, :draining, space_id, worker.pool_name) + |> update_worker_state(worker_id, :draining, workspace_id, worker.pool_name) |> flush_notifications() send(self(), :tick) @@ -412,12 +412,12 @@ defmodule Coflux.Orchestration.Server do end end - def handle_call({:resume_worker, space_name, worker_id}, _from, state) do - with {:ok, space_id, _} <- lookup_space_by_name(state, space_name), - {:ok, worker} <- lookup_worker(state, worker_id, space_id) do + def handle_call({:resume_worker, workspace_name, worker_id}, _from, state) do + with {:ok, workspace_id, _} <- lookup_workspace_by_name(state, workspace_name), + {:ok, worker} <- lookup_worker(state, worker_id, workspace_id) do state = state - |> update_worker_state(worker_id, :active, space_id, worker.pool_name) + |> update_worker_state(worker_id, :active, workspace_id, worker.pool_name) |> flush_notifications() send(self(), :tick) @@ -430,16 +430,16 @@ defmodule Coflux.Orchestration.Server do end def handle_call( - {:register_manifests, space_name, manifests}, + {:register_manifests, workspace_name, manifests}, _from, state ) do - case lookup_space_by_name(state, space_name) do + case lookup_workspace_by_name(state, workspace_name) do {:error, error} -> {:reply, {:error, error}, state} - {:ok, space_id, _} -> - case Manifests.register_manifests(state.db, space_id, manifests) do + {:ok, workspace_id, _} -> + case Manifests.register_manifests(state.db, workspace_id, manifests) do :ok -> state = manifests @@ -447,17 +447,17 @@ defmodule Coflux.Orchestration.Server do Enum.reduce(workflows, state, fn {target_name, target}, state -> notify_listeners( state, - {:workflow, module, target_name, space_id}, + {:workflow, module, target_name, workspace_id}, {:target, target} ) end) end) |> notify_listeners( - {:modules, space_id}, + {:modules, workspace_id}, {:manifests, manifests} ) |> notify_listeners( - {:targets, space_id}, + {:targets, workspace_id}, {:manifests, Map.new(manifests, fn {module_name, workflows} -> {module_name, MapSet.new(Map.keys(workflows))} @@ -470,18 +470,18 @@ defmodule Coflux.Orchestration.Server do end end - def handle_call({:archive_module, space_name, module_name}, _from, state) do - case lookup_space_by_name(state, space_name) do + def handle_call({:archive_module, workspace_name, module_name}, _from, state) do + case lookup_workspace_by_name(state, workspace_name) do {:error, error} -> {:reply, {:error, error}, state} - {:ok, space_id, _} -> - case Manifests.archive_module(state.db, space_id, module_name) do + {:ok, workspace_id, _} -> + case Manifests.archive_module(state.db, workspace_id, module_name) do :ok -> state = state |> notify_listeners( - {:modules, space_id}, + {:modules, workspace_id}, {:manifest, module_name, nil} ) |> flush_notifications() @@ -491,10 +491,10 @@ defmodule Coflux.Orchestration.Server do end end - def handle_call({:get_workflow, space_name, module, target_name}, _from, state) do - with {:ok, space_id, _} <- lookup_space_by_name(state, space_name), + def handle_call({:get_workflow, workspace_name, module, target_name}, _from, state) do + with {:ok, workspace_id, _} <- lookup_workspace_by_name(state, workspace_name), {:ok, workflow} <- - Manifests.get_latest_workflow(state.db, space_id, module, target_name) do + Manifests.get_latest_workflow(state.db, workspace_id, module, target_name) do {:reply, {:ok, workflow}, state} else {:error, error} -> @@ -502,7 +502,7 @@ defmodule Coflux.Orchestration.Server do end end - def handle_call({:create_session, space_name, opts}, _from, state) do + def handle_call({:create_session, workspace_name, opts}, _from, state) do provides = Keyword.get(opts, :provides, %{}) concurrency = Keyword.get(opts, :concurrency, 0) activation_timeout = Keyword.get(opts, :activation_timeout, @default_activation_timeout_ms) @@ -510,7 +510,7 @@ defmodule Coflux.Orchestration.Server do reconnection_timeout = Keyword.get(opts, :reconnection_timeout, @default_reconnection_timeout_ms) - with {:ok, space_id, _} <- lookup_space_by_name(state, space_name) do + with {:ok, workspace_id, _} <- lookup_workspace_by_name(state, workspace_name) do db_opts = [ provides: provides, concurrency: concurrency, @@ -518,7 +518,7 @@ defmodule Coflux.Orchestration.Server do reconnection_timeout: reconnection_timeout ] - case Sessions.create_session(state.db, space_id, nil, db_opts) do + case Sessions.create_session(state.db, workspace_id, nil, db_opts) do {:ok, session_id, external_session_id, token, secret_hash, now} -> session = %{ external_id: external_session_id, @@ -529,7 +529,7 @@ defmodule Coflux.Orchestration.Server do starting: MapSet.new(), executing: MapSet.new(), concurrency: concurrency, - space_id: space_id, + workspace_id: workspace_id, provides: provides, worker_id: nil, last_idle_at: now, @@ -552,13 +552,13 @@ defmodule Coflux.Orchestration.Server do end end - def handle_call({:resume_session, token, space_name, pid}, _from, state) do + def handle_call({:resume_session, token, workspace_name, pid}, _from, state) do with {:ok, external_id, secret} <- Sessions.parse_token(token), {:ok, session_id} <- Map.fetch(state.session_ids, external_id), session = Map.fetch!(state.sessions, session_id), true <- Sessions.verify_secret(secret, session.secret_hash), - {:ok, space_id, _} <- lookup_space_by_name(state, space_name), - true <- session.space_id == space_id do + {:ok, workspace_id, _} <- lookup_workspace_by_name(state, workspace_name), + true <- session.workspace_id == workspace_id do activated_at = if is_nil(session.activated_at) do {:ok, now} = Sessions.activate_session(state.db, session_id) @@ -597,7 +597,7 @@ defmodule Coflux.Orchestration.Server do state = notify_listeners( state, - {:sessions, session.space_id}, + {:sessions, session.workspace_id}, {:session, session_id, %{ connected: true, @@ -615,7 +615,7 @@ defmodule Coflux.Orchestration.Server do session_id ) |> notify_listeners( - {:pool, worker.space_id, worker.pool_name}, + {:pool, worker.workspace_id, worker.pool_name}, {:worker_connected, session.worker_id, true} ) @@ -633,10 +633,10 @@ defmodule Coflux.Orchestration.Server do :error -> {:reply, {:error, :session_invalid}, state} - {:error, :space_invalid} -> - {:reply, {:error, :space_mismatch}, state} + {:error, :workspace_invalid} -> + {:reply, {:error, :workspace_mismatch}, state} - # Token verification failed or space mismatch + # Token verification failed or workspace mismatch false -> {:reply, {:error, :session_invalid}, state} end @@ -666,30 +666,30 @@ defmodule Coflux.Orchestration.Server do {:ok, {run_id, parent_id}} end - {:ok, space_id} = + {:ok, workspace_id} = case parent do {_run_id, parent_id} -> - Runs.get_space_id_for_execution(state.db, parent_id) + Runs.get_workspace_id_for_execution(state.db, parent_id) nil -> - space_name = Keyword.get(opts, :space) + workspace_name = Keyword.get(opts, :workspace) - case lookup_space_by_name(state, space_name) do - {:ok, space_id, _} -> {:ok, space_id} - {:error, :space_invalid} -> {:ok, nil} + case lookup_workspace_by_name(state, workspace_name) do + {:ok, workspace_id, _} -> {:ok, workspace_id} + {:error, :workspace_invalid} -> {:ok, nil} end end - if space_id do + if workspace_id do {:ok, external_run_id, external_step_id, execution_id, state} = - schedule_run(state, module, target_name, type, arguments, space_id, opts) + schedule_run(state, module, target_name, type, arguments, workspace_id, opts) send(self(), :tick) state = flush_notifications(state) {:reply, {:ok, external_run_id, external_step_id, execution_id}, state} else - {:reply, {:error, :space_invalid}, state} + {:reply, {:error, :workspace_invalid}, state} end end @@ -699,10 +699,10 @@ defmodule Coflux.Orchestration.Server do state ) do {:ok, parent_run_id} = Runs.get_execution_run_id(state.db, parent_id) - {:ok, space_id} = Runs.get_space_id_for_execution(state.db, parent_id) + {:ok, workspace_id} = Runs.get_workspace_id_for_execution(state.db, parent_id) {:ok, run} = Runs.get_run_by_id(state.db, parent_run_id) - cache_space_ids = get_cache_space_ids(state, space_id) + cache_workspace_ids = get_cache_workspace_ids(state, workspace_id) case Runs.schedule_step( state.db, @@ -712,8 +712,8 @@ defmodule Coflux.Orchestration.Server do target_name, type, arguments, - space_id, - cache_space_ids, + workspace_id, + cache_workspace_ids, opts ) do {:ok, @@ -752,11 +752,11 @@ defmodule Coflux.Orchestration.Server do created_at: created_at, arguments: arguments, requires: requires - }, space_id} + }, workspace_id} ) |> notify_listeners( {:run, run.id}, - {:execution, external_step_id, attempt, execution_id, space_id, created_at, + {:execution, external_step_id, attempt, execution_id, workspace_id, created_at, execute_after, %{}} ) else @@ -781,11 +781,11 @@ defmodule Coflux.Orchestration.Server do state = state |> notify_listeners( - {:modules, space_id}, + {:modules, workspace_id}, {:scheduled, module, execution_id, execute_at} ) |> notify_listeners( - {:module, module, space_id}, + {:module, module, workspace_id}, {:scheduled, execution_id, target_name, run.external_id, external_step_id, attempt, execute_after, created_at} ) @@ -800,7 +800,7 @@ defmodule Coflux.Orchestration.Server do state = state |> notify_listeners( - {:targets, space_id}, + {:targets, workspace_id}, {:step, module, target_name, type, run.external_id, external_step_id, attempt} ) |> flush_notifications() @@ -826,13 +826,13 @@ defmodule Coflux.Orchestration.Server do end end - def handle_call({:rerun_step, external_step_id, space_name}, _from, state) do - # TODO: abort/cancel any running/scheduled retry? (for the same space) (and reference this retry?) - case lookup_space_by_name(state, space_name) do + def handle_call({:rerun_step, external_step_id, workspace_name}, _from, state) do + # TODO: abort/cancel any running/scheduled retry? (for the same workspace) (and reference this retry?) + case lookup_workspace_by_name(state, workspace_name) do {:error, error} -> {:reply, {:error, error}, state} - {:ok, space_id, _} -> + {:ok, workspace_id, _} -> {:ok, step} = Runs.get_step_by_external_id(state.db, external_step_id) base_execution_id = @@ -844,16 +844,16 @@ defmodule Coflux.Orchestration.Server do end end - {:ok, base_space_id} = - Runs.get_space_id_for_execution(state.db, base_execution_id) + {:ok, base_workspace_id} = + Runs.get_workspace_id_for_execution(state.db, base_execution_id) - if base_space_id == space_id || - is_space_ancestor?(state, base_space_id, space_id) do - {:ok, execution_id, attempt, state} = rerun_step(state, step, space_id) + if base_workspace_id == workspace_id || + is_workspace_ancestor?(state, base_workspace_id, workspace_id) do + {:ok, execution_id, attempt, state} = rerun_step(state, step, workspace_id) state = flush_notifications(state) {:reply, {:ok, execution_id, attempt}, state} else - {:reply, {:error, :space_invalid}, state} + {:reply, {:error, :workspace_invalid}, state} end end end @@ -954,7 +954,7 @@ defmodule Coflux.Orchestration.Server do state = state |> notify_listeners( - {:sessions, session.space_id}, + {:sessions, session.workspace_id}, {:executions, session_id, session.starting |> MapSet.union(session.executing) |> Enum.count()} ) @@ -968,13 +968,22 @@ defmodule Coflux.Orchestration.Server do end def handle_call({:notify_terminated, execution_ids}, _from, state) do - # TODO: record in database? - now = System.os_time(:millisecond) state = execution_ids |> Enum.reduce(state, fn execution_id, state -> + # If execution has no result recorded, mark it as abandoned + state = + case Results.has_result?(state.db, execution_id) do + {:ok, false} -> + {:ok, state} = process_result(state, execution_id, :abandoned) + state + + {:ok, true} -> + state + end + case find_session_for_execution(state, execution_id) do {:ok, session_id} -> state = @@ -999,7 +1008,7 @@ defmodule Coflux.Orchestration.Server do notify_listeners( state, - {:sessions, session.space_id}, + {:sessions, session.workspace_id}, {:executions, session_id, executions} ) @@ -1153,24 +1162,24 @@ defmodule Coflux.Orchestration.Server do end end - def handle_call({:subscribe_spaces, pid}, _from, state) do - {:ok, ref, state} = add_listener(state, :spaces, pid) + def handle_call({:subscribe_workspaces, pid}, _from, state) do + {:ok, ref, state} = add_listener(state, :workspaces, pid) - spaces = - Map.new(state.spaces, fn {space_id, space} -> - {space_id, Map.take(space, [:name, :base_id, :state])} + workspaces = + Map.new(state.workspaces, fn {workspace_id, workspace} -> + {workspace_id, Map.take(workspace, [:name, :base_id, :state])} end) - {:reply, {:ok, spaces, ref}, state} + {:reply, {:ok, workspaces, ref}, state} end - def handle_call({:subscribe_modules, space_id, pid}, _from, state) do - case lookup_space_by_id(state, space_id) do + def handle_call({:subscribe_modules, workspace_id, pid}, _from, state) do + case lookup_workspace_by_id(state, workspace_id) do {:error, error} -> {:reply, {:error, error}, state} {:ok, _} -> - {:ok, manifests} = Manifests.get_latest_manifests(state.db, space_id) + {:ok, manifests} = Manifests.get_latest_manifests(state.db, workspace_id) executing = state.sessions @@ -1201,41 +1210,41 @@ defmodule Coflux.Orchestration.Server do end) {:ok, ref, state} = - add_listener(state, {:modules, space_id}, pid) + add_listener(state, {:modules, workspace_id}, pid) {:reply, {:ok, manifests, executions, ref}, state} end end - def handle_call({:subscribe_module, module, space_id, pid}, _from, state) do - case lookup_space_by_id(state, space_id) do + def handle_call({:subscribe_module, module, workspace_id, pid}, _from, state) do + case lookup_workspace_by_id(state, workspace_id) do {:error, error} -> {:reply, {:error, error}, state} {:ok, _} -> {:ok, executions} = Runs.get_module_executions(state.db, module) - {:ok, ref, state} = add_listener(state, {:module, module, space_id}, pid) + {:ok, ref, state} = add_listener(state, {:module, module, workspace_id}, pid) {:reply, {:ok, executions, ref}, state} end end - def handle_call({:subscribe_pools, space_id, pid}, _from, state) do - case lookup_space_by_id(state, space_id) do + def handle_call({:subscribe_pools, workspace_id, pid}, _from, state) do + case lookup_workspace_by_id(state, workspace_id) do {:error, error} -> {:reply, {:error, error}, state} {:ok, _} -> # TODO: include non-active pools that contain active workers - pools = Map.get(state.pools, space_id, %{}) - {:ok, ref, state} = add_listener(state, {:pools, space_id}, pid) + pools = Map.get(state.pools, workspace_id, %{}) + {:ok, ref, state} = add_listener(state, {:pools, workspace_id}, pid) {:reply, {:ok, pools, ref}, state} end end - def handle_call({:subscribe_pool, space_id, pool_name, pid}, _from, state) do - case lookup_space_by_id(state, space_id) do + def handle_call({:subscribe_pool, workspace_id, pool_name, pid}, _from, state) do + case lookup_workspace_by_id(state, workspace_id) do {:ok, _} -> - pool = Map.get(state.pools[space_id], pool_name) + pool = Map.get(state.pools[workspace_id], pool_name) {:ok, pool_workers} = Workers.get_pool_workers(state.db, pool_name) # TODO: include 'active' workers that aren't in this (potentially limited) list @@ -1276,7 +1285,7 @@ defmodule Coflux.Orchestration.Server do end ) - {:ok, ref, state} = add_listener(state, {:pool, space_id, pool_name}, pid) + {:ok, ref, state} = add_listener(state, {:pool, workspace_id, pool_name}, pid) {:reply, {:ok, pool, workers, ref}, state} {:error, error} -> @@ -1284,8 +1293,8 @@ defmodule Coflux.Orchestration.Server do end end - def handle_call({:subscribe_sessions, space_id, pid}, _from, state) do - case lookup_space_by_id(state, space_id) do + def handle_call({:subscribe_sessions, workspace_id, pid}, _from, state) do + case lookup_workspace_by_id(state, workspace_id) do {:error, error} -> {:reply, {:error, error}, state} @@ -1293,7 +1302,7 @@ defmodule Coflux.Orchestration.Server do sessions = state.sessions |> Enum.filter(fn {_, session} -> - session.space_id == space_id + session.workspace_id == workspace_id end) |> Map.new(fn {session_id, session} -> executions = @@ -1311,28 +1320,28 @@ defmodule Coflux.Orchestration.Server do }} end) - {:ok, ref, state} = add_listener(state, {:sessions, space_id}, pid) + {:ok, ref, state} = add_listener(state, {:sessions, workspace_id}, pid) {:reply, {:ok, sessions, ref}, state} end end def handle_call( - {:subscribe_workflow, module, target_name, space_id, pid}, + {:subscribe_workflow, module, target_name, workspace_id, pid}, _from, state ) do - with {:ok, _} <- lookup_space_by_id(state, space_id), + with {:ok, _} <- lookup_workspace_by_id(state, workspace_id), {:ok, workflow} <- - Manifests.get_latest_workflow(state.db, space_id, module, target_name), + Manifests.get_latest_workflow(state.db, workspace_id, module, target_name), {:ok, instruction} <- if(workflow && workflow.instruction_id, do: Manifests.get_instruction(state.db, workflow.instruction_id), else: {:ok, nil} ), {:ok, runs} = - Runs.get_target_runs(state.db, module, target_name, :workflow, space_id) do + Runs.get_target_runs(state.db, module, target_name, :workflow, workspace_id) do {:ok, ref, state} = - add_listener(state, {:workflow, module, target_name, space_id}, pid) + add_listener(state, {:workflow, module, target_name, workspace_id}, pid) {:reply, {:ok, workflow, instruction, runs, ref}, state} else @@ -1417,7 +1426,7 @@ defmodule Coflux.Orchestration.Server do executions: run_executions |> Enum.filter(&(elem(&1, 1) == step.id)) - |> Map.new(fn {execution_id, _step_id, attempt, space_id, execute_after, + |> Map.new(fn {execution_id, _step_id, attempt, workspace_id, execute_after, created_at, assigned_at} -> {result, completed_at} = Map.fetch!(results, execution_id) @@ -1447,7 +1456,7 @@ defmodule Coflux.Orchestration.Server do {attempt, %{ execution_id: execution_id, - space_id: space_id, + workspace_id: workspace_id, created_at: created_at, execute_after: execute_after, assigned_at: assigned_at, @@ -1485,11 +1494,11 @@ defmodule Coflux.Orchestration.Server do end end - def handle_call({:subscribe_targets, space_id, pid}, _from, state) do + def handle_call({:subscribe_targets, workspace_id, pid}, _from, state) do # TODO: indicate which are archived - {:ok, workflows} = Manifests.get_all_workflows_for_space(state.db, space_id) + {:ok, workflows} = Manifests.get_all_workflows_for_workspace(state.db, workspace_id) - {:ok, steps} = Runs.get_steps_for_space(state.db, space_id) + {:ok, steps} = Runs.get_steps_for_workspace(state.db, workspace_id) result = Enum.reduce(workflows, %{}, fn {module_name, target_names}, result -> @@ -1516,7 +1525,7 @@ defmodule Coflux.Orchestration.Server do end ) - {:ok, ref, state} = add_listener(state, {:targets, space_id}, pid) + {:ok, ref, state} = add_listener(state, {:targets, workspace_id}, pid) {:reply, {:ok, result, ref}, state} end @@ -1568,7 +1577,7 @@ defmodule Coflux.Orchestration.Server do executions = Enum.filter(executions, fn execution -> - state.spaces[execution.space_id].state == :active + state.workspaces[execution.workspace_id].state == :active end) now = System.os_time(:millisecond) @@ -1622,13 +1631,13 @@ defmodule Coflux.Orchestration.Server do # TODO: support caching for other attempts? cached_execution_id = if execution.attempt == 1 && execution.cache_config_id do - cache_space_ids = get_cache_space_ids(state, execution.space_id) + cache_workspace_ids = get_cache_workspace_ids(state, execution.workspace_id) cache = Map.fetch!(cache_configs, execution.cache_config_id) recorded_after = if cache.max_age, do: now - cache.max_age, else: 0 case Runs.find_cached_execution( state.db, - cache_space_ids, + cache_workspace_ids, execution.step_id, execution.cache_key, recorded_after @@ -1685,7 +1694,7 @@ defmodule Coflux.Orchestration.Server do execution.target, execution.type, arguments, - execution.space_id, + execution.workspace_id, parent_id: execution.execution_id, cache: if(execution.cache_config_id, @@ -1739,9 +1748,9 @@ defmodule Coflux.Orchestration.Server do assigned_groups = assigned - |> Enum.group_by(fn {execution, _} -> execution.space_id end) - |> Map.new(fn {space_id, executions} -> - {space_id, + |> Enum.group_by(fn {execution, _} -> execution.workspace_id end) + |> Map.new(fn {workspace_id, executions} -> + {workspace_id, executions |> Enum.group_by( fn {execution, _} -> execution.module end, @@ -1751,17 +1760,17 @@ defmodule Coflux.Orchestration.Server do end) state = - Enum.reduce(assigned_groups, state, fn {space_id, space_executions}, state -> + Enum.reduce(assigned_groups, state, fn {workspace_id, workspace_executions}, state -> notify_listeners( state, - {:modules, space_id}, - {:assigned, space_executions} + {:modules, workspace_id}, + {:assigned, workspace_executions} ) end) state = - Enum.reduce(assigned_groups, state, fn {space_id, space_executions}, state -> - Enum.reduce(space_executions, state, fn {module, execution_ids}, state -> + Enum.reduce(assigned_groups, state, fn {workspace_id, workspace_executions}, state -> + Enum.reduce(workspace_executions, state, fn {module, execution_ids}, state -> module_executions = Enum.reduce(assigned, %{}, fn {execution, assigned_at}, module_executions -> if MapSet.member?(execution_ids, execution.execution_id) do @@ -1773,7 +1782,7 @@ defmodule Coflux.Orchestration.Server do notify_listeners( state, - {:module, module, space_id}, + {:module, module, workspace_id}, {:assigned, module_executions} ) end) @@ -1789,8 +1798,8 @@ defmodule Coflux.Orchestration.Server do end) unassigned - |> Enum.group_by(& &1.space_id) - |> Enum.reduce(state, fn {space_id, executions}, state -> + |> Enum.group_by(& &1.workspace_id) + |> Enum.reduce(state, fn {workspace_id, executions}, state -> executions |> Enum.map(fn execution -> requires = @@ -1808,7 +1817,7 @@ defmodule Coflux.Orchestration.Server do {:ok, worker_id, created_at} -> {pool_name, pool} = Enum.find( - state.pools[space_id], + state.pools[workspace_id], &(elem(&1, 1).id == pool_id) ) @@ -1826,7 +1835,7 @@ defmodule Coflux.Orchestration.Server do ] {:ok, session_id, external_id, token, secret_hash, session_now} = - Sessions.create_session(state.db, space_id, worker_id, session_opts) + Sessions.create_session(state.db, workspace_id, worker_id, session_opts) session = %{ external_id: external_id, @@ -1837,7 +1846,7 @@ defmodule Coflux.Orchestration.Server do starting: MapSet.new(), executing: MapSet.new(), concurrency: 0, - space_id: space_id, + workspace_id: workspace_id, provides: pool.provides, worker_id: worker_id, last_idle_at: session_now, @@ -1855,7 +1864,7 @@ defmodule Coflux.Orchestration.Server do :launch, [ state.project_id, - state.spaces[space_id].name, + state.workspaces[workspace_id].name, token, pool.modules, Map.delete(pool.launcher, :type) @@ -1875,7 +1884,7 @@ defmodule Coflux.Orchestration.Server do state |> put_in([Access.key(:workers), worker_id, Access.key(:data)], data) |> notify_listeners( - {:pool, space_id, pool_name}, + {:pool, workspace_id, pool_name}, {:launch_result, worker_id, started_at, error} ) @@ -1893,7 +1902,7 @@ defmodule Coflux.Orchestration.Server do created_at: created_at, pool_id: pool_id, pool_name: pool_name, - space_id: space_id, + workspace_id: workspace_id, state: :active, data: nil, session_id: session_id, @@ -1901,7 +1910,7 @@ defmodule Coflux.Orchestration.Server do last_poll_at: nil }) |> notify_listeners( - {:pool, space_id, pool_name}, + {:pool, workspace_id, pool_name}, {:worker, worker_id, created_at} ) end @@ -1940,7 +1949,7 @@ defmodule Coflux.Orchestration.Server do end end) |> Enum.reduce(state, fn {worker_id, worker}, state -> - {:ok, launcher} = Spaces.get_launcher_for_pool(state.db, worker.pool_id) + {:ok, launcher} = Workspaces.get_launcher_for_pool(state.db, worker.pool_id) state |> call_launcher(launcher, :poll, [worker.data], fn state, result -> @@ -1978,7 +1987,7 @@ defmodule Coflux.Orchestration.Server do end) end) |> Enum.reduce(state, fn {worker_id, worker}, state -> - update_worker_state(state, worker_id, :draining, worker.space_id, worker.pool_name) + update_worker_state(state, worker_id, :draining, worker.workspace_id, worker.pool_name) end) state = @@ -1995,13 +2004,13 @@ defmodule Coflux.Orchestration.Server do end) |> Enum.reduce(state, fn {worker_id, worker}, state -> {:ok, worker_stop_id, stopping_at} = Workers.create_worker_stop(state.db, worker_id) - {:ok, launcher} = Spaces.get_launcher_for_pool(state.db, worker.pool_id) + {:ok, launcher} = Workspaces.get_launcher_for_pool(state.db, worker.pool_id) state = state |> put_in([Access.key(:workers), worker_id, :stop_id], worker_stop_id) |> notify_listeners( - {:pool, worker.space_id, worker.pool_name}, + {:pool, worker.workspace_id, worker.pool_name}, {:worker_stopping, worker_id, stopping_at} ) @@ -2013,7 +2022,7 @@ defmodule Coflux.Orchestration.Server do state |> notify_listeners( - {:pool, worker.space_id, worker.pool_name}, + {:pool, worker.workspace_id, worker.pool_name}, {:worker_stop_result, worker_id, stopped_at, nil} ) @@ -2027,7 +2036,7 @@ defmodule Coflux.Orchestration.Server do state = notify_listeners( state, - {:pool, worker.space_id, worker.pool_name}, + {:pool, worker.workspace_id, worker.pool_name}, {:worker_stop_result, worker_id, nil, error} ) @@ -2125,7 +2134,7 @@ defmodule Coflux.Orchestration.Server do ) |> schedule_session_expiry(session_id, session.reconnection_timeout) |> notify_listeners( - {:sessions, session.space_id}, + {:sessions, session.workspace_id}, {:connected, session_id, false} ) @@ -2135,7 +2144,7 @@ defmodule Coflux.Orchestration.Server do notify_listeners( state, - {:pool, session.space_id, pool_name}, + {:pool, session.workspace_id, pool_name}, {:worker_connected, session.worker_id, false} ) else @@ -2169,67 +2178,67 @@ defmodule Coflux.Orchestration.Server do Store.close(state.db) end - defp lookup_space_by_name(state, space_name) do - case Map.fetch(state.space_names, space_name) do - {:ok, space_id} -> - space = Map.fetch!(state.spaces, space_id) + defp lookup_workspace_by_name(state, workspace_name) do + case Map.fetch(state.workspace_names, workspace_name) do + {:ok, workspace_id} -> + workspace = Map.fetch!(state.workspaces, workspace_id) - if space.state != :archived do - {:ok, space_id, space} + if workspace.state != :archived do + {:ok, workspace_id, workspace} else - {:error, :space_invalid} + {:error, :workspace_invalid} end :error -> - {:error, :space_invalid} + {:error, :workspace_invalid} end end - defp lookup_space_by_id(state, space_id) do - case Map.fetch(state.spaces, space_id) do - {:ok, space} -> - # TODO: include space? Map.fetch!(state.spaces, space_id) - {:ok, space} + defp lookup_workspace_by_id(state, workspace_id) do + case Map.fetch(state.workspaces, workspace_id) do + {:ok, workspace} -> + # TODO: include workspace? Map.fetch!(state.workspaces, workspace_id) + {:ok, workspace} :error -> - {:error, :space_invalid} + {:error, :workspace_invalid} end end - defp is_space_ancestor?(state, maybe_ancestor_id, space_id) do + defp is_workspace_ancestor?(state, maybe_ancestor_id, workspace_id) do # TODO: avoid cycle? - space = Map.fetch!(state.spaces, space_id) + workspace = Map.fetch!(state.workspaces, workspace_id) cond do - !space.base_id -> + !workspace.base_id -> false - space.base_id == maybe_ancestor_id -> + workspace.base_id == maybe_ancestor_id -> true true -> - is_space_ancestor?(state, maybe_ancestor_id, space.base_id) + is_workspace_ancestor?(state, maybe_ancestor_id, workspace.base_id) end end - defp get_cache_space_ids(state, space_id, ids \\ []) do - space = Map.fetch!(state.spaces, space_id) + defp get_cache_workspace_ids(state, workspace_id, ids \\ []) do + workspace = Map.fetch!(state.workspaces, workspace_id) - if space.base_id do - get_cache_space_ids(state, space.base_id, [space_id | ids]) + if workspace.base_id do + get_cache_workspace_ids(state, workspace.base_id, [workspace_id | ids]) else - [space_id | ids] + [workspace_id | ids] end end - defp lookup_worker(state, worker_id, expected_space_id) do + defp lookup_worker(state, worker_id, expected_workspace_id) do if worker_id do case Map.fetch(state.workers, worker_id) do :error -> {:error, :no_worker} {:ok, worker} -> - if worker.space_id != expected_space_id do + if worker.workspace_id != expected_workspace_id do {:error, :no_worker} else {:ok, worker} @@ -2294,7 +2303,7 @@ defmodule Coflux.Orchestration.Server do |> Map.new() end) |> notify_listeners( - {:sessions, session.space_id}, + {:sessions, session.workspace_id}, {:session, session_id, nil} ) @@ -2302,11 +2311,11 @@ defmodule Coflux.Orchestration.Server do if session.worker_id do case Map.fetch(state.workers, session.worker_id) do {:ok, worker} -> - # TODO: check that worker space matches session space? + # TODO: check that worker workspace matches session workspace? state |> put_in([Access.key(:workers), session.worker_id, :session_id], nil) |> notify_listeners( - {:pool, worker.space_id, worker.pool_name}, + {:pool, worker.workspace_id, worker.pool_name}, {:worker_connected, session.worker_id, false} ) @@ -2329,7 +2338,7 @@ defmodule Coflux.Orchestration.Server do fn execution, {due, future, defer, defer_keys} -> defer_key = execution.defer_key && - {execution.module, execution.target, execution.space_id, execution.defer_key} + {execution.module, execution.target, execution.workspace_id, execution.defer_key} defer_id = defer_key && Map.get(defer_keys, defer_key) @@ -2357,8 +2366,8 @@ defmodule Coflux.Orchestration.Server do {executions_due, executions_future, executions_defer} end - defp schedule_run(state, module, target_name, type, arguments, space_id, opts) do - cache_space_ids = get_cache_space_ids(state, space_id) + defp schedule_run(state, module, target_name, type, arguments, workspace_id, opts) do + cache_workspace_ids = get_cache_workspace_ids(state, workspace_id) case Runs.schedule_run( state.db, @@ -2366,8 +2375,8 @@ defmodule Coflux.Orchestration.Server do target_name, type, arguments, - space_id, - cache_space_ids, + workspace_id, + cache_workspace_ids, opts ) do {:ok, @@ -2385,20 +2394,20 @@ defmodule Coflux.Orchestration.Server do state = state |> notify_listeners( - {:workflow, module, target_name, space_id}, + {:workflow, module, target_name, workspace_id}, {:run, external_run_id, created_at} ) |> notify_listeners( - {:modules, space_id}, + {:modules, workspace_id}, {:scheduled, module, execution_id, execute_at} ) |> notify_listeners( - {:module, module, space_id}, + {:module, module, workspace_id}, {:scheduled, execution_id, target_name, external_run_id, external_step_id, attempt, execute_after, created_at} ) |> notify_listeners( - {:targets, space_id}, + {:targets, workspace_id}, {:step, module, target_name, type, external_run_id, external_step_id, attempt} ) @@ -2406,14 +2415,14 @@ defmodule Coflux.Orchestration.Server do end end - defp rerun_step(state, step, space_id, opts \\ []) do + defp rerun_step(state, step, workspace_id, opts \\ []) do execute_after = Keyword.get(opts, :execute_after, nil) dependency_ids = Keyword.get(opts, :dependency_ids, []) # TODO: only get run if needed for notify? {:ok, run} = Runs.get_run_by_id(state.db, step.run_id) - case Runs.rerun_step(state.db, step.id, space_id, execute_after, dependency_ids) do + case Runs.rerun_step(state.db, step.id, workspace_id, execute_after, dependency_ids) do {:ok, execution_id, attempt, created_at} -> {:ok, {run_module, run_target}} = Runs.get_run_target(state.db, run.id) @@ -2428,20 +2437,20 @@ defmodule Coflux.Orchestration.Server do state |> notify_listeners( {:run, step.run_id}, - {:execution, step.external_id, attempt, execution_id, space_id, created_at, + {:execution, step.external_id, attempt, execution_id, workspace_id, created_at, execute_after, dependencies} ) |> notify_listeners( - {:modules, space_id}, + {:modules, workspace_id}, {:scheduled, step.module, execution_id, execute_at} ) |> notify_listeners( - {:module, step.module, space_id}, + {:module, step.module, workspace_id}, {:scheduled, execution_id, step.target, run.external_id, step.external_id, attempt, execute_after, created_at} ) |> notify_listeners( - {:targets, space_id}, + {:targets, workspace_id}, {:step, step.module, step.target, step.type, run.external_id, step.external_id, attempt} ) @@ -2451,7 +2460,7 @@ defmodule Coflux.Orchestration.Server do :workflow -> notify_listeners( state, - {:workflow, run_module, run_target, space_id}, + {:workflow, run_module, run_target, workspace_id}, {:run, run.external_id, run.created_at} ) @@ -2566,7 +2575,7 @@ defmodule Coflux.Orchestration.Server do # TODO: remove 'module' argument? defp record_and_notify_result(state, execution_id, result, module) do - {:ok, space_id} = Runs.get_space_id_for_execution(state.db, execution_id) + {:ok, workspace_id} = Runs.get_workspace_id_for_execution(state.db, execution_id) {:ok, successors} = Runs.get_result_successors(state.db, execution_id) case Results.record_result(state.db, execution_id, result) do @@ -2600,11 +2609,11 @@ defmodule Coflux.Orchestration.Server do end end) |> notify_listeners( - {:modules, space_id}, + {:modules, workspace_id}, {:completed, module, execution_id} ) |> notify_listeners( - {:module, module, space_id}, + {:module, module, workspace_id}, {:completed, execution_id} ) @@ -2625,7 +2634,7 @@ defmodule Coflux.Orchestration.Server do {:ok, false} -> {:ok, step} = Runs.get_step_for_execution(state.db, execution_id) - {:ok, space_id} = Runs.get_space_id_for_execution(state.db, execution_id) + {:ok, workspace_id} = Runs.get_workspace_id_for_execution(state.db, execution_id) {retry_id, state} = cond do @@ -2635,7 +2644,7 @@ defmodule Coflux.Orchestration.Server do # TODO: limit the number of times a step can suspend? (or rate?) {:ok, retry_id, _, state} = - rerun_step(state, step, space_id, + rerun_step(state, step, workspace_id, execute_after: execute_after, dependency_ids: dependency_ids ) @@ -2653,7 +2662,7 @@ defmodule Coflux.Orchestration.Server do execute_after = System.os_time(:millisecond) + delay_s * 1000 {:ok, retry_id, _, state} = - rerun_step(state, step, space_id, execute_after: execute_after) + rerun_step(state, step, workspace_id, execute_after: execute_after) {retry_id, state} @@ -2677,7 +2686,7 @@ defmodule Coflux.Orchestration.Server do execute_after = System.os_time(:millisecond) + delay_s * 1000 {:ok, retry_id, _, state} = - rerun_step(state, step, space_id, execute_after: execute_after) + rerun_step(state, step, workspace_id, execute_after: execute_after) {retry_id, state} else @@ -2690,7 +2699,7 @@ defmodule Coflux.Orchestration.Server do System.os_time(:millisecond) + step.delay end - {:ok, _, _, state} = rerun_step(state, step, space_id, execute_after: execute_after) + {:ok, _, _, state} = rerun_step(state, step, workspace_id, execute_after: execute_after) {nil, state} true -> @@ -2713,6 +2722,7 @@ defmodule Coflux.Orchestration.Server do step.module ) do {:ok, state} -> state + {:error, :already_recorded} -> state end {:ok, state} @@ -2909,7 +2919,7 @@ defmodule Coflux.Orchestration.Server do Enum.filter(target.session_ids, fn session_id -> session = Map.fetch!(state.sessions, session_id) - session.space_id == execution.space_id && session.connection && + session.workspace_id == execution.workspace_id && session.connection && !session_at_capacity?(session) && session_active?(session, state) && has_requirements?(session.provides, requires) @@ -2925,7 +2935,7 @@ defmodule Coflux.Orchestration.Server do defp choose_pool(state, execution, requires) do pools = state.pools - |> Map.get(execution.space_id, %{}) + |> Map.get(execution.workspace_id, %{}) |> Map.filter(fn {_, pool} -> pool.launcher && execution.module in pool.modules && has_requirements?(pool.provides, requires) @@ -3059,7 +3069,7 @@ defmodule Coflux.Orchestration.Server do put_in(state, [Access.key(:launcher_tasks), task.ref], callback) end - defp update_worker_state(state, worker_id, worker_state, space_id, pool_name) do + defp update_worker_state(state, worker_id, worker_state, workspace_id, pool_name) do :ok = Workers.create_worker_state(state.db, worker_id, worker_state) state @@ -3068,7 +3078,7 @@ defmodule Coflux.Orchestration.Server do worker_state ) |> notify_listeners( - {:pool, space_id, pool_name}, + {:pool, workspace_id, pool_name}, {:worker_state, worker_id, worker_state} ) end @@ -3080,7 +3090,7 @@ defmodule Coflux.Orchestration.Server do notify_listeners( state, - {:pool, worker.space_id, worker.pool_name}, + {:pool, worker.workspace_id, worker.pool_name}, {:worker_deactivated, worker_id, deactivated_at, error} ) end diff --git a/server/lib/coflux/orchestration/sessions.ex b/server/lib/coflux/orchestration/sessions.ex index a480d6c1..a2b9b5bd 100644 --- a/server/lib/coflux/orchestration/sessions.ex +++ b/server/lib/coflux/orchestration/sessions.ex @@ -3,7 +3,7 @@ defmodule Coflux.Orchestration.Sessions do import Coflux.Store - def create_session(db, space_id, worker_id, opts \\ []) do + def create_session(db, workspace_id, worker_id, opts \\ []) do provides = Keyword.get(opts, :provides) concurrency = Keyword.get(opts, :concurrency, 0) activation_timeout = Keyword.get(opts, :activation_timeout) @@ -28,7 +28,7 @@ defmodule Coflux.Orchestration.Sessions do case insert_one(db, :sessions, %{ external_id: external_id, - space_id: space_id, + workspace_id: workspace_id, worker_id: worker_id, provides_tag_set_id: provides_tag_set_id, concurrency: concurrency, @@ -96,7 +96,7 @@ defmodule Coflux.Orchestration.Sessions do SELECT s.id, s.external_id, - s.space_id, + s.workspace_id, s.worker_id, s.provides_tag_set_id, s.concurrency, diff --git a/server/lib/coflux/orchestration/workers.ex b/server/lib/coflux/orchestration/workers.ex index 9862fc2b..2ee54cb8 100644 --- a/server/lib/coflux/orchestration/workers.ex +++ b/server/lib/coflux/orchestration/workers.ex @@ -82,7 +82,7 @@ defmodule Coflux.Orchestration.Workers do w.created_at, p.id, p.name, - p.space_id, + p.workspace_id, (SELECT ws.state FROM worker_states AS ws WHERE ws.worker_id = w.id @@ -109,8 +109,8 @@ defmodule Coflux.Orchestration.Workers do {:ok, Enum.map( rows, - fn {worker_id, created_at, pool_id, pool_name, space_id, state, data} -> - {worker_id, created_at, pool_id, pool_name, space_id, decode_state(state), + fn {worker_id, created_at, pool_id, pool_name, workspace_id, state, data} -> + {worker_id, created_at, pool_id, pool_name, workspace_id, decode_state(state), if(data, do: :erlang.binary_to_term(data))} end )} diff --git a/server/lib/coflux/orchestration/spaces.ex b/server/lib/coflux/orchestration/workspaces.ex similarity index 69% rename from server/lib/coflux/orchestration/spaces.ex rename to server/lib/coflux/orchestration/workspaces.ex index ec572c0c..b661e2f8 100644 --- a/server/lib/coflux/orchestration/spaces.ex +++ b/server/lib/coflux/orchestration/workspaces.ex @@ -1,47 +1,47 @@ -defmodule Coflux.Orchestration.Spaces do +defmodule Coflux.Orchestration.Workspaces do alias Coflux.Orchestration.TagSets import Coflux.Store - def get_all_spaces(db) do + def get_all_workspaces(db) do case query( db, """ SELECT w.id, (SELECT ws.state - FROM space_states AS ws - WHERE ws.space_id = w.id + FROM workspace_states AS ws + WHERE ws.workspace_id = w.id ORDER BY ws.created_at DESC LIMIT 1) AS state, (SELECT wn.name - FROM space_names AS wn - WHERE wn.space_id = w.id + FROM workspace_names AS wn + WHERE wn.workspace_id = w.id ORDER BY wn.created_at DESC LIMIT 1) AS name, - (SELECT wb.base_id - FROM space_bases AS wb - WHERE wb.space_id = w.id + (SELECT wb.base_workspace_id + FROM workspace_bases AS wb + WHERE wb.workspace_id = w.id ORDER BY wb.created_at DESC LIMIT 1) AS base_id - FROM spaces AS w + FROM workspaces AS w """ ) do {:ok, rows} -> - spaces = - Enum.reduce(rows, %{}, fn {space_id, state, name, base_id}, result -> - Map.put(result, space_id, %{ + workspaces = + Enum.reduce(rows, %{}, fn {workspace_id, state, name, base_id}, result -> + Map.put(result, workspace_id, %{ name: name, base_id: base_id, state: decode_state(state) }) end) - {:ok, spaces} + {:ok, workspaces} end end - def get_space_pools(db, space_id) do + def get_workspace_pools(db, workspace_id) do case query( db, """ @@ -50,12 +50,12 @@ defmodule Coflux.Orchestration.Spaces do JOIN ( SELECT name, MAX(created_at) AS created_at FROM pools - WHERE space_id = ?1 + WHERE workspace_id = ?1 GROUP BY name ) latest ON p.name = latest.name AND p.created_at = latest.created_at - WHERE p.space_id = ?1 AND p.pool_definition_id IS NOT NULL + WHERE p.workspace_id = ?1 AND p.pool_definition_id IS NOT NULL """, - {space_id} + {workspace_id} ) do {:ok, rows} -> {:ok, @@ -66,85 +66,85 @@ defmodule Coflux.Orchestration.Spaces do end end - defp space_name_used?(db, space_name) do + defp workspace_name_used?(db, workspace_name) do # TODO: neater way to do this? case query( db, """ SELECT (SELECT ws.state - FROM space_states AS ws - WHERE ws.space_id = w.id + FROM workspace_states AS ws + WHERE ws.workspace_id = w.id ORDER BY ws.created_at DESC LIMIT 1) AS state, (SELECT wn.name - FROM space_names AS wn - WHERE wn.space_id = w.id + FROM workspace_names AS wn + WHERE wn.workspace_id = w.id ORDER BY wn.created_at DESC LIMIT 1) AS name - FROM spaces AS w + FROM workspaces AS w """ ) do {:ok, rows} -> {:ok, Enum.any?(rows, fn {state, name} -> - name == space_name && decode_state(state) != :archived + name == workspace_name && decode_state(state) != :archived end)} end end - defp has_active_child_spaces?(db, space_id) do + defp has_active_child_workspaces?(db, workspace_id) do # TODO: neater way to do this? case query( db, """ SELECT (SELECT ws.state - FROM space_states AS ws - WHERE ws.space_id = w.id + FROM workspace_states AS ws + WHERE ws.workspace_id = w.id ORDER BY ws.created_at DESC LIMIT 1) AS state, - (SELECT wb.base_id - FROM space_bases AS wb - WHERE wb.space_id = w.id + (SELECT wb.base_workspace_id + FROM workspace_bases AS wb + WHERE wb.workspace_id = w.id ORDER BY wb.created_at DESC LIMIT 1) AS base_id - FROM spaces AS w + FROM workspaces AS w """ ) do {:ok, rows} -> {:ok, - Enum.any?(rows, fn {state, base_id} -> - base_id == space_id && decode_state(state) != :archived + Enum.any?(rows, fn {state, base_workspace_id} -> + base_workspace_id == workspace_id && decode_state(state) != :archived end)} end end - # TODO: change to 'get_active_space_by_id'? - defp get_space_by_id(db, space_id) do + # TODO: change to 'get_active_workspace_by_id'? + defp get_workspace_by_id(db, workspace_id) do case query_one( db, """ SELECT (SELECT ws.state - FROM space_states AS ws - WHERE ws.space_id = w.id + FROM workspace_states AS ws + WHERE ws.workspace_id = w.id ORDER BY ws.created_at DESC LIMIT 1) AS state, (SELECT wn.name - FROM space_names AS wn - WHERE wn.space_id = w.id + FROM workspace_names AS wn + WHERE wn.workspace_id = w.id ORDER BY wn.created_at DESC LIMIT 1) AS name, - (SELECT wb.base_id - FROM space_bases AS wb - WHERE wb.space_id = w.id + (SELECT wb.base_workspace_id + FROM workspace_bases AS wb + WHERE wb.workspace_id = w.id ORDER BY wb.created_at DESC LIMIT 1) AS base_id - FROM spaces AS w + FROM workspaces AS w WHERE w.id = ?1 """, - {space_id} + {workspace_id} ) do {:ok, {state, name, base_id}} -> {:ok, %{state: decode_state(state), name: name, base_id: base_id}} @@ -154,17 +154,17 @@ defmodule Coflux.Orchestration.Spaces do end end - def create_space(db, name, base_id) do + def create_workspace(db, name, base_id) do with_transaction(db, fn -> - space = %{ + workspace = %{ state: :active, name: name, base_id: base_id } - {space, errors} = + {workspace, errors} = validate( - space, + workspace, name: &validate_name(&1, db), base_id: &validate_base_id(&1, db) ) @@ -173,31 +173,31 @@ defmodule Coflux.Orchestration.Spaces do {:error, errors} else now = current_timestamp() - {:ok, space_id} = insert_one(db, :spaces, %{}) - {:ok, _} = insert_space_state(db, space_id, space.state, now) - {:ok, _} = insert_space_name(db, space_id, space.name, now) - {:ok, _} = insert_space_base(db, space_id, space.base_id, now) + {:ok, workspace_id} = insert_one(db, :workspaces, %{}) + {:ok, _} = insert_workspace_state(db, workspace_id, workspace.state, now) + {:ok, _} = insert_workspace_name(db, workspace_id, workspace.name, now) + {:ok, _} = insert_workspace_base(db, workspace_id, workspace.base_id, now) - {:ok, space_id, space} + {:ok, workspace_id, workspace} end end) end - def update_space(db, space_id, updates) do + def update_workspace(db, workspace_id, updates) do with_transaction(db, fn -> - case get_space_by_id(db, space_id) do + case get_workspace_by_id(db, workspace_id) do {:ok, nil} -> {:error, :not_found} {:ok, %{state: :archived}} -> {:error, :not_found} - {:ok, space} -> + {:ok, workspace} -> {updates, errors} = validate( updates, name: &validate_name(&1, db), - base_id: &validate_base_id(&1, db, space_id) + base_id: &validate_base_id(&1, db, workspace_id) ) if Enum.any?(errors) do @@ -205,26 +205,26 @@ defmodule Coflux.Orchestration.Spaces do else now = current_timestamp() - if Map.has_key?(updates, :name) && updates.name != space.name do - {:ok, _} = insert_space_name(db, space_id, updates.name, now) + if Map.has_key?(updates, :name) && updates.name != workspace.name do + {:ok, _} = insert_workspace_name(db, workspace_id, updates.name, now) end - if Map.has_key?(updates, :base_id) && updates.base_id != space.base_id do - {:ok, _} = insert_space_base(db, space_id, updates.base_id, now) + if Map.has_key?(updates, :base_id) && updates.base_id != workspace.base_id do + {:ok, _} = insert_workspace_base(db, workspace_id, updates.base_id, now) end - # TODO: don't return space - move this to separate function? - {:ok, space} = get_space_by_id(db, space_id) + # TODO: don't return workspace - move this to separate function? + {:ok, workspace} = get_workspace_by_id(db, workspace_id) - {:ok, space} + {:ok, workspace} end end end) end - def pause_space(db, space_id) do + def pause_workspace(db, workspace_id) do with_transaction(db, fn -> - case get_space_by_id(db, space_id) do + case get_workspace_by_id(db, workspace_id) do {:ok, nil} -> {:error, :not_found} @@ -232,7 +232,7 @@ defmodule Coflux.Orchestration.Spaces do {:error, :not_found} {:ok, %{state: :active}} -> - {:ok, _} = insert_space_state(db, space_id, :paused, current_timestamp()) + {:ok, _} = insert_workspace_state(db, workspace_id, :paused, current_timestamp()) :ok {:ok, %{state: :paused}} -> @@ -241,9 +241,9 @@ defmodule Coflux.Orchestration.Spaces do end) end - def resume_space(db, space_id) do + def resume_workspace(db, workspace_id) do with_transaction(db, fn -> - case get_space_by_id(db, space_id) do + case get_workspace_by_id(db, workspace_id) do {:ok, nil} -> {:error, :not_found} @@ -251,7 +251,7 @@ defmodule Coflux.Orchestration.Spaces do {:error, :not_found} {:ok, %{state: :paused}} -> - {:ok, _} = insert_space_state(db, space_id, :active, current_timestamp()) + {:ok, _} = insert_workspace_state(db, workspace_id, :active, current_timestamp()) :ok {:ok, %{state: :active}} -> @@ -260,9 +260,9 @@ defmodule Coflux.Orchestration.Spaces do end) end - def archive_space(db, space_id) do + def archive_workspace(db, workspace_id) do with_transaction(db, fn -> - case get_space_by_id(db, space_id) do + case get_workspace_by_id(db, workspace_id) do {:ok, nil} -> {:error, :not_found} @@ -270,13 +270,13 @@ defmodule Coflux.Orchestration.Spaces do {:error, :not_found} {:ok, _} -> - case has_active_child_spaces?(db, space_id) do + case has_active_child_workspaces?(db, workspace_id) do {:ok, true} -> {:error, :descendants} {:ok, false} -> {:ok, _} = - insert_space_state(db, space_id, :archived, current_timestamp()) + insert_workspace_state(db, workspace_id, :archived, current_timestamp()) :ok end @@ -284,7 +284,7 @@ defmodule Coflux.Orchestration.Spaces do end) end - def update_pool(db, space_id, pool_name, pool) do + def update_pool(db, workspace_id, pool_name, pool) do # TODO: validate pool (check launcher is specified) with_transaction(db, fn -> @@ -297,7 +297,7 @@ defmodule Coflux.Orchestration.Spaces do end {existing_pool_id, existing_pool_definition_id} = - case get_latest_pool(db, space_id, pool_name) do + case get_latest_pool(db, workspace_id, pool_name) do {:ok, {existing_pool_id, existing_pool_definition_id}} -> {existing_pool_id, existing_pool_definition_id} @@ -306,7 +306,7 @@ defmodule Coflux.Orchestration.Spaces do end if pool_definition_id != existing_pool_definition_id do - insert_space_pool(db, space_id, pool_name, pool_definition_id, now) + insert_workspace_pool(db, workspace_id, pool_name, pool_definition_id, now) else {:ok, existing_pool_id} end @@ -319,7 +319,7 @@ defmodule Coflux.Orchestration.Spaces do defp validate_name(name, db) do if is_valid_name?(name) do - case space_name_used?(db, name) do + case workspace_name_used?(db, name) do {:ok, false} -> :ok {:ok, true} -> {:error, :exists} end @@ -328,29 +328,29 @@ defmodule Coflux.Orchestration.Spaces do end end - defp get_ancestor_ids(db, space_id, ancestor_ids \\ []) do - case get_space_by_id(db, space_id) do + defp get_ancestor_ids(db, workspace_id, ancestor_ids \\ []) do + case get_workspace_by_id(db, workspace_id) do {:ok, %{base_id: nil}} -> {:ok, ancestor_ids} {:ok, %{base_id: base_id}} -> - get_ancestor_ids(db, base_id, [space_id | ancestor_ids]) + get_ancestor_ids(db, base_id, [workspace_id | ancestor_ids]) end end - defp validate_base_id(base_id, db, space_id \\ nil) do + defp validate_base_id(base_id, db, workspace_id \\ nil) do if is_nil(base_id) do :ok else - case get_space_by_id(db, base_id) do - {:ok, base} -> - if !base || base.state == :archived do + case get_workspace_by_id(db, base_id) do + {:ok, workspace} -> + if !workspace || workspace.state == :archived do {:error, :invalid} else - if space_id do + if workspace_id do case get_ancestor_ids(db, base_id) do {:ok, ancestor_ids} -> - if space_id in ancestor_ids do + if workspace_id in ancestor_ids do {:error, :invalid} else :ok @@ -502,17 +502,17 @@ defmodule Coflux.Orchestration.Spaces do end end - defp get_latest_pool(db, space_id, pool_name) do + defp get_latest_pool(db, workspace_id, pool_name) do query_one( db, """ SELECT id, pool_definition_id FROM pools - WHERE space_id = ?1 AND name = ?2 + WHERE workspace_id = ?1 AND name = ?2 ORDER BY created_at DESC LIMIT 1 """, - {space_id, pool_name} + {workspace_id, pool_name} ) end @@ -560,33 +560,33 @@ defmodule Coflux.Orchestration.Spaces do end end - defp insert_space_state(db, space_id, state, created_at) do - insert_one(db, :space_states, %{ - space_id: space_id, + defp insert_workspace_state(db, workspace_id, state, created_at) do + insert_one(db, :workspace_states, %{ + workspace_id: workspace_id, state: encode_state(state), created_at: created_at }) end - defp insert_space_name(db, space_id, name, created_at) do - insert_one(db, :space_names, %{ - space_id: space_id, + defp insert_workspace_name(db, workspace_id, name, created_at) do + insert_one(db, :workspace_names, %{ + workspace_id: workspace_id, name: name, created_at: created_at }) end - defp insert_space_base(db, space_id, base_id, created_at) do - insert_one(db, :space_bases, %{ - space_id: space_id, - base_id: base_id, + defp insert_workspace_base(db, workspace_id, base_workspace_id, created_at) do + insert_one(db, :workspace_bases, %{ + workspace_id: workspace_id, + base_workspace_id: base_workspace_id, created_at: created_at }) end - defp insert_space_pool(db, space_id, pool_name, pool_definition_id, created_at) do + defp insert_workspace_pool(db, workspace_id, pool_name, pool_definition_id, created_at) do insert_one(db, :pools, %{ - space_id: space_id, + workspace_id: workspace_id, name: pool_name, pool_definition_id: pool_definition_id, created_at: created_at diff --git a/server/lib/coflux/topics/logs.ex b/server/lib/coflux/topics/logs.ex index 8badf407..88833b85 100644 --- a/server/lib/coflux/topics/logs.ex +++ b/server/lib/coflux/topics/logs.ex @@ -1,5 +1,5 @@ defmodule Coflux.Topics.Logs do - use Topical.Topic, route: ["projects", :project_id, "runs", :run_id, "logs", :space_id] + use Topical.Topic, route: ["projects", :project_id, "runs", :run_id, "logs", :workspace_id] alias Coflux.Orchestration @@ -16,13 +16,13 @@ defmodule Coflux.Topics.Logs do def init(params) do project_id = Map.fetch!(params, :project_id) run_id = Map.fetch!(params, :run_id) - space_id = String.to_integer(Map.fetch!(params, :space_id)) + workspace_id = String.to_integer(Map.fetch!(params, :workspace_id)) case Orchestration.subscribe_run(project_id, run_id, self()) do {:ok, _run, _parent, steps, _ref} -> case Orchestration.subscribe_logs(project_id, run_id, self()) do {:ok, _ref, messages} -> - run_space_id = + run_workspace_id = steps |> Map.values() |> Enum.reject(& &1.parent_id) @@ -30,9 +30,9 @@ defmodule Coflux.Topics.Logs do |> Map.fetch!(:executions) |> Map.values() |> Enum.min_by(& &1.created_at) - |> Map.fetch!(:space_id) + |> Map.fetch!(:workspace_id) - space_ids = Enum.uniq([run_space_id, space_id]) + workspace_ids = Enum.uniq([run_workspace_id, workspace_id]) execution_ids = steps @@ -40,7 +40,7 @@ defmodule Coflux.Topics.Logs do |> Enum.flat_map(fn step -> step.executions |> Map.values() - |> Enum.filter(&(&1.space_id in space_ids)) + |> Enum.filter(&(&1.workspace_id in workspace_ids)) |> Enum.map(& &1.execution_id) end) |> MapSet.new() @@ -50,7 +50,7 @@ defmodule Coflux.Topics.Logs do |> Enum.filter(&(elem(&1, 0) in execution_ids)) |> Enum.map(&build_message/1) |> Topic.new(%{ - space_ids: space_ids, + workspace_ids: workspace_ids, execution_ids: execution_ids }) @@ -64,8 +64,8 @@ defmodule Coflux.Topics.Logs do {:ok, topic} end - defp process_notification(topic, {:execution, _, _, execution_id, space_id, _, _, _}) do - if space_id in topic.state.space_ids do + defp process_notification(topic, {:execution, _, _, execution_id, workspace_id, _, _, _}) do + if workspace_id in topic.state.workspace_ids do update_in(topic.state.execution_ids, &MapSet.put(&1, execution_id)) else topic diff --git a/server/lib/coflux/topics/module.ex b/server/lib/coflux/topics/module.ex index 5b55dffc..4f342520 100644 --- a/server/lib/coflux/topics/module.ex +++ b/server/lib/coflux/topics/module.ex @@ -1,6 +1,6 @@ defmodule Coflux.Topics.Module do use Topical.Topic, - route: ["projects", :project_id, "modules", :module, :space_id] + route: ["projects", :project_id, "modules", :module, :workspace_id] alias Coflux.Orchestration @@ -17,10 +17,10 @@ defmodule Coflux.Topics.Module do def init(params) do project_id = Map.fetch!(params, :project_id) module = Map.fetch!(params, :module) - space_id = String.to_integer(Map.fetch!(params, :space_id)) + workspace_id = String.to_integer(Map.fetch!(params, :workspace_id)) {:ok, executions, ref} = - Orchestration.subscribe_module(project_id, module, space_id, self()) + Orchestration.subscribe_module(project_id, module, workspace_id, self()) value = Map.new(executions, fn {execution_id, target_name, external_run_id, external_step_id, diff --git a/server/lib/coflux/topics/modules.ex b/server/lib/coflux/topics/modules.ex index e1d3bec1..0ff7f11a 100644 --- a/server/lib/coflux/topics/modules.ex +++ b/server/lib/coflux/topics/modules.ex @@ -1,5 +1,5 @@ defmodule Coflux.Topics.Modules do - use Topical.Topic, route: ["projects", :project_id, "modules", :space_id] + use Topical.Topic, route: ["projects", :project_id, "modules", :workspace_id] alias Coflux.Orchestration @@ -15,10 +15,10 @@ defmodule Coflux.Topics.Modules do def init(params) do project_id = Map.fetch!(params, :project_id) - space_id = String.to_integer(Map.fetch!(params, :space_id)) + workspace_id = String.to_integer(Map.fetch!(params, :workspace_id)) {:ok, manifests, executions, ref} = - Orchestration.subscribe_modules(project_id, space_id, self()) + Orchestration.subscribe_modules(project_id, workspace_id, self()) value = Map.new(manifests, fn {module, workflows} -> diff --git a/server/lib/coflux/topics/pool.ex b/server/lib/coflux/topics/pool.ex index 50946f8d..1ec22955 100644 --- a/server/lib/coflux/topics/pool.ex +++ b/server/lib/coflux/topics/pool.ex @@ -1,5 +1,5 @@ defmodule Coflux.Topics.Pool do - use Topical.Topic, route: ["projects", :project_id, "pools", :space_id, :pool_name] + use Topical.Topic, route: ["projects", :project_id, "pools", :workspace_id, :pool_name] alias Coflux.Orchestration @@ -15,10 +15,10 @@ defmodule Coflux.Topics.Pool do def init(params) do project_id = Map.fetch!(params, :project_id) - space_id = String.to_integer(Map.fetch!(params, :space_id)) + workspace_id = String.to_integer(Map.fetch!(params, :workspace_id)) pool_name = Map.fetch!(params, :pool_name) - case Orchestration.subscribe_pool(project_id, space_id, pool_name, self()) do + case Orchestration.subscribe_pool(project_id, workspace_id, pool_name, self()) do {:ok, pool, workers, ref} -> {:ok, Topic.new( diff --git a/server/lib/coflux/topics/pools.ex b/server/lib/coflux/topics/pools.ex index 3089b079..f5355e58 100644 --- a/server/lib/coflux/topics/pools.ex +++ b/server/lib/coflux/topics/pools.ex @@ -1,5 +1,5 @@ defmodule Coflux.Topics.Pools do - use Topical.Topic, route: ["projects", :project_id, "pools", :space_id] + use Topical.Topic, route: ["projects", :project_id, "pools", :workspace_id] alias Coflux.Orchestration @@ -15,10 +15,10 @@ defmodule Coflux.Topics.Pools do def init(params) do project_id = Map.fetch!(params, :project_id) - space_id = String.to_integer(Map.fetch!(params, :space_id)) + workspace_id = String.to_integer(Map.fetch!(params, :workspace_id)) {:ok, pools, ref} = - Orchestration.subscribe_pools(project_id, space_id, self()) + Orchestration.subscribe_pools(project_id, workspace_id, self()) value = build_value(pools) diff --git a/server/lib/coflux/topics/run.ex b/server/lib/coflux/topics/run.ex index 21b5e784..a5b15ba4 100644 --- a/server/lib/coflux/topics/run.ex +++ b/server/lib/coflux/topics/run.ex @@ -1,5 +1,5 @@ defmodule Coflux.Topics.Run do - use Topical.Topic, route: ["projects", :project_id, "runs", :run_id, :space_id] + use Topical.Topic, route: ["projects", :project_id, "runs", :run_id, :workspace_id] alias Coflux.Orchestration @@ -16,7 +16,7 @@ defmodule Coflux.Topics.Run do def init(params) do project_id = Map.fetch!(params, :project_id) external_run_id = Map.fetch!(params, :run_id) - space_id = String.to_integer(Map.fetch!(params, :space_id)) + workspace_id = String.to_integer(Map.fetch!(params, :workspace_id)) case Orchestration.subscribe_run( project_id, @@ -27,7 +27,7 @@ defmodule Coflux.Topics.Run do {:error, :not_found} {:ok, run, parent, steps, _ref} -> - run_space_id = + run_workspace_id = steps |> Map.values() |> Enum.reject(& &1.parent_id) @@ -35,15 +35,15 @@ defmodule Coflux.Topics.Run do |> Map.fetch!(:executions) |> Map.values() |> Enum.min_by(& &1.created_at) - |> Map.fetch!(:space_id) + |> Map.fetch!(:workspace_id) - space_ids = Enum.uniq([run_space_id, space_id]) + workspace_ids = Enum.uniq([run_workspace_id, workspace_id]) {:ok, - Topic.new(build_run(run, parent, steps, space_ids), %{ + Topic.new(build_run(run, parent, steps, workspace_ids), %{ project_id: project_id, external_run_id: external_run_id, - space_ids: space_ids + workspace_ids: workspace_ids })} end end @@ -53,8 +53,8 @@ defmodule Coflux.Topics.Run do {:ok, topic} end - defp process_notification(topic, {:step, external_step_id, step, space_id}) do - if space_id in topic.state.space_ids do + defp process_notification(topic, {:step, external_step_id, step, workspace_id}) do + if workspace_id in topic.state.workspace_ids do Topic.set(topic, [:steps, external_step_id], %{ module: step.module, target: step.target, @@ -75,16 +75,16 @@ defmodule Coflux.Topics.Run do defp process_notification( topic, - {:execution, step_id, attempt, execution_id, space_id, created_at, execute_after, + {:execution, step_id, attempt, execution_id, workspace_id, created_at, execute_after, dependencies} ) do - if space_id in topic.state.space_ids do + if workspace_id in topic.state.workspace_ids do Topic.set( topic, [:steps, step_id, :executions, Integer.to_string(attempt)], %{ executionId: Integer.to_string(execution_id), - spaceId: Integer.to_string(space_id), + workspaceId: Integer.to_string(workspace_id), createdAt: created_at, executeAfter: execute_after, assignedAt: nil, @@ -185,7 +185,7 @@ defmodule Coflux.Topics.Run do end) end - defp build_run(run, parent, steps, space_ids) do + defp build_run(run, parent, steps, workspace_ids) do %{ createdAt: run.created_at, parent: if(parent, do: build_execution(parent)), @@ -194,7 +194,7 @@ defmodule Coflux.Topics.Run do |> Enum.filter(fn {_, step} -> step.executions |> Map.values() - |> Enum.any?(&(&1.space_id in space_ids)) + |> Enum.any?(&(&1.workspace_id in workspace_ids)) end) |> Map.new(fn {step_id, step} -> {step_id, @@ -212,13 +212,13 @@ defmodule Coflux.Topics.Run do executions: step.executions |> Enum.filter(fn {_, execution} -> - execution.space_id in space_ids + execution.workspace_id in workspace_ids end) |> Map.new(fn {attempt, execution} -> {Integer.to_string(attempt), %{ executionId: Integer.to_string(execution.execution_id), - spaceId: Integer.to_string(execution.space_id), + workspaceId: Integer.to_string(execution.workspace_id), createdAt: execution.created_at, executeAfter: execution.execute_after, assignedAt: execution.assigned_at, diff --git a/server/lib/coflux/topics/search.ex b/server/lib/coflux/topics/search.ex index 405dc330..68cd1f5c 100644 --- a/server/lib/coflux/topics/search.ex +++ b/server/lib/coflux/topics/search.ex @@ -1,7 +1,7 @@ defmodule Coflux.Topics.Search do alias Coflux.Orchestration - use Topical.Topic, route: ["projects", :project_id, "search", :space_id] + use Topical.Topic, route: ["projects", :project_id, "search", :workspace_id] import Coflux.TopicUtils, only: [validate_project_access: 2] @@ -15,9 +15,9 @@ defmodule Coflux.Topics.Search do def init(params) do project_id = Map.fetch!(params, :project_id) - space_id = Map.fetch!(params, :space_id) + workspace_id = Map.fetch!(params, :workspace_id) - case Orchestration.subscribe_targets(project_id, space_id, self()) do + case Orchestration.subscribe_targets(project_id, workspace_id, self()) do {:ok, targets, _ref} -> topic = Topical.Topic.new(nil, %{targets: targets}) {:ok, topic} diff --git a/server/lib/coflux/topics/sessions.ex b/server/lib/coflux/topics/sessions.ex index aed43dea..2bd61eb8 100644 --- a/server/lib/coflux/topics/sessions.ex +++ b/server/lib/coflux/topics/sessions.ex @@ -1,5 +1,5 @@ defmodule Coflux.Topics.Sessions do - use Topical.Topic, route: ["projects", :project_id, "sessions", :space_id] + use Topical.Topic, route: ["projects", :project_id, "sessions", :workspace_id] alias Coflux.Orchestration @@ -15,9 +15,9 @@ defmodule Coflux.Topics.Sessions do def init(params) do project_id = Map.fetch!(params, :project_id) - space_id = String.to_integer(Map.fetch!(params, :space_id)) + workspace_id = String.to_integer(Map.fetch!(params, :workspace_id)) - {:ok, sessions, ref} = Orchestration.subscribe_sessions(project_id, space_id, self()) + {:ok, sessions, ref} = Orchestration.subscribe_sessions(project_id, workspace_id, self()) sessions = Map.new(sessions, fn {session_id, session} -> diff --git a/server/lib/coflux/topics/spaces.ex b/server/lib/coflux/topics/spaces.ex deleted file mode 100644 index 11660a9a..00000000 --- a/server/lib/coflux/topics/spaces.ex +++ /dev/null @@ -1,56 +0,0 @@ -defmodule Coflux.Topics.Spaces do - use Topical.Topic, route: ["projects", :project_id, "spaces"] - - alias Coflux.Orchestration - - import Coflux.TopicUtils, only: [validate_project_access: 2] - - def connect(params, context) do - namespace = Map.get(context, :namespace) - - with :ok <- validate_project_access(params.project_id, namespace) do - {:ok, params} - end - end - - def init(params) do - project_id = Map.fetch!(params, :project_id) - {:ok, spaces, ref} = Orchestration.subscribe_spaces(project_id, self()) - - spaces = - Map.new(spaces, fn {space_id, space} -> - {Integer.to_string(space_id), build_space(space)} - end) - - {:ok, Topic.new(spaces, %{ref: ref})} - end - - def handle_info({:topic, _ref, notifications}, topic) do - topic = Enum.reduce(notifications, topic, &process_notification(&2, &1)) - {:ok, topic} - end - - defp process_notification(topic, {:space, space_id, space}) do - Topic.set(topic, [Integer.to_string(space_id)], build_space(space)) - end - - defp process_notification(topic, {:state, space_id, state}) do - Topic.set(topic, [Integer.to_string(space_id), :state], build_state(state)) - end - - defp build_space(space) do - %{ - name: space.name, - baseId: space.base_id, - state: build_state(space.state) - } - end - - defp build_state(state) do - case state do - :active -> "active" - :paused -> "paused" - :archived -> "archived" - end - end -end diff --git a/server/lib/coflux/topics/workflow.ex b/server/lib/coflux/topics/workflow.ex index 8ec43153..bd46bd7c 100644 --- a/server/lib/coflux/topics/workflow.ex +++ b/server/lib/coflux/topics/workflow.ex @@ -1,6 +1,6 @@ defmodule Coflux.Topics.Workflow do use Topical.Topic, - route: ["projects", :project_id, "workflows", :module, :target, :space_id] + route: ["projects", :project_id, "workflows", :module, :target, :workspace_id] alias Coflux.Orchestration @@ -18,13 +18,13 @@ defmodule Coflux.Topics.Workflow do project_id = Map.fetch!(params, :project_id) module = Map.fetch!(params, :module) target_name = Map.fetch!(params, :target) - space_id = String.to_integer(Map.fetch!(params, :space_id)) + workspace_id = String.to_integer(Map.fetch!(params, :workspace_id)) case Orchestration.subscribe_workflow( project_id, module, target_name, - space_id, + workspace_id, self() ) do {:ok, workflow, instruction, runs, ref} -> diff --git a/server/lib/coflux/topics/workspaces.ex b/server/lib/coflux/topics/workspaces.ex new file mode 100644 index 00000000..464dc9b8 --- /dev/null +++ b/server/lib/coflux/topics/workspaces.ex @@ -0,0 +1,56 @@ +defmodule Coflux.Topics.Workspaces do + use Topical.Topic, route: ["projects", :project_id, "workspaces"] + + alias Coflux.Orchestration + + import Coflux.TopicUtils, only: [validate_project_access: 2] + + def connect(params, context) do + namespace = Map.get(context, :namespace) + + with :ok <- validate_project_access(params.project_id, namespace) do + {:ok, params} + end + end + + def init(params) do + project_id = Map.fetch!(params, :project_id) + {:ok, workspaces, ref} = Orchestration.subscribe_workspaces(project_id, self()) + + workspaces = + Map.new(workspaces, fn {workspace_id, workspace} -> + {Integer.to_string(workspace_id), build_workspace(workspace)} + end) + + {:ok, Topic.new(workspaces, %{ref: ref})} + end + + def handle_info({:topic, _ref, notifications}, topic) do + topic = Enum.reduce(notifications, topic, &process_notification(&2, &1)) + {:ok, topic} + end + + defp process_notification(topic, {:workspace, workspace_id, workspace}) do + Topic.set(topic, [Integer.to_string(workspace_id)], build_workspace(workspace)) + end + + defp process_notification(topic, {:state, workspace_id, state}) do + Topic.set(topic, [Integer.to_string(workspace_id), :state], build_state(state)) + end + + defp build_workspace(workspace) do + %{ + name: workspace.name, + baseId: workspace.base_id, + state: build_state(workspace.state) + } + end + + defp build_state(state) do + case state do + :active -> "active" + :paused -> "paused" + :archived -> "archived" + end + end +end diff --git a/server/priv/migrations/orchestration/4.sql b/server/priv/migrations/orchestration/4.sql new file mode 100644 index 00000000..55d650ba --- /dev/null +++ b/server/priv/migrations/orchestration/4.sql @@ -0,0 +1,20 @@ +-- Rename 'space' to 'workspace' + +-- Rename tables +ALTER TABLE spaces RENAME TO workspaces; +ALTER TABLE space_states RENAME TO workspace_states; +ALTER TABLE space_names RENAME TO workspace_names; +ALTER TABLE space_bases RENAME TO workspace_bases; +ALTER TABLE space_manifests RENAME TO workspace_manifests; + +-- Rename columns in renamed tables +ALTER TABLE workspace_states RENAME COLUMN space_id TO workspace_id; +ALTER TABLE workspace_names RENAME COLUMN space_id TO workspace_id; +ALTER TABLE workspace_bases RENAME COLUMN space_id TO workspace_id; +ALTER TABLE workspace_bases RENAME COLUMN base_id TO base_workspace_id; +ALTER TABLE workspace_manifests RENAME COLUMN space_id TO workspace_id; + +-- Rename columns in other tables +ALTER TABLE pools RENAME COLUMN space_id TO workspace_id; +ALTER TABLE sessions RENAME COLUMN space_id TO workspace_id; +ALTER TABLE executions RENAME COLUMN space_id TO workspace_id;