
Conversation

@paul-nechifor
Contributor

uv run dimos --no-dask --viewer-backend foxglove --simulation run unitree-go2-agentic

@paul-nechifor paul-nechifor marked this pull request as draft January 25, 2026 14:37
@greptile-apps

greptile-apps bot commented Jan 25, 2026

Greptile Overview

Greptile Summary

This PR implements an alternative to Dask for module deployment using a multiprocessing-based worker system with forkserver. The implementation allows toggling between Dask and the new system via the --no-dask flag (global_config.dask).

Key Changes:

  • Renamed DaskModule to Module and removed the alias
  • Added new WorkerManager and Worker classes that spawn processes via forkserver and communicate via pipes
  • Modified ModuleCoordinator to conditionally use either DimosCluster (Dask) or WorkerManager based on configuration
  • Changed blueprint deployment to batch module specs and deploy in parallel
  • Added py-spy profiling tool to dev dependencies

Issues Found:

  • Module.set_ref() still calls get_worker() from Dask, which will fail in --no-dask mode (though it's bypassed by the worker implementation)
  • Incorrect import of traitlets.Any instead of typing.Any in module_coordinator.py
  • Potential issues with __getattr__ implementation in the Actor proxy not distinguishing between method calls and attribute access
  • Test documentation notes a known limitation where multiple instances of the same Module class share RPC channels

Confidence Score: 2/5

  • This PR has critical runtime issues that need resolution before merging
  • The PR has several logic errors that could cause runtime failures: (1) Module.set_ref() calls Dask's get_worker() which will fail in --no-dask mode, (2) incorrect import of traitlets.Any, (3) the Actor proxy's __getattr__ implementation may not correctly handle method calls vs attribute access. While the new worker system bypasses some of these issues, the inconsistency between Dask and non-Dask code paths creates fragility. The PR is marked WIP which aligns with these findings.
  • Pay close attention to dimos/core/module.py and dimos/core/worker.py - these files have critical logic issues that need resolution

Important Files Changed

  • dimos/core/module.py — Renamed DaskModule to Module and changed type variable imports, but get_worker() from Dask is still used in set_ref(), which will break in --no-dask mode
  • dimos/core/worker.py — New multiprocessing-based worker system using forkserver as a Dask replacement, but __getattr__ and getattr request handling may not correctly distinguish method calls from attribute access
  • dimos/core/worker_manager.py — Manages worker processes with parallel deployment support; handles cleanup properly
  • dimos/core/module_coordinator.py — Added conditional logic to use WorkerManager when dask=False; imports traitlets.Any instead of typing.Any
  • dimos/core/blueprints.py — Changed to batch module specs and deploy them in parallel via deploy_parallel() instead of sequential deploy() calls
  • dimos/core/test_worker.py — New test file for the worker system, with a documented limitation: multiple instances of the same Module class share RPC channels

Sequence Diagram

sequenceDiagram
    participant MB as ModuleBlueprintSet
    participant MC as ModuleCoordinator
    participant WM as WorkerManager
    participant W as Worker
    participant P as Process (forkserver)
    participant M as Module Instance
    participant RPC as RPCClient

    Note over MB,MC: Module Deployment (--no-dask mode)
    MB->>MB: Collect module specs
    MB->>MC: deploy_parallel(module_specs)
    
    MC->>MC: Check if WorkerManager
    MC->>WM: deploy_parallel(module_specs)
    
    loop For each module_spec
        WM->>W: Create Worker(module_class, args, kwargs)
        W->>W: Assign worker_id
        W->>P: Start forkserver Process
        activate P
        P->>M: Instantiate module_class(*args, **kwargs)
        P->>M: Set worker = worker_id
        P->>P: Start _worker_loop()
    end
    
    loop For each worker
        W->>P: Send set_ref request via pipe
        P->>M: Set instance.ref = actor
        P-->>W: Return worker_id
        W->>W: Mark ready
    end
    
    loop For each worker
        W->>RPC: Create RPCClient(actor, module_class)
        RPC->>RPC: Start LCMRPC service
        WM-->>MC: Return RPCClient
    end
    
    MC-->>MB: Return list of RPCClients
    
    Note over MB,RPC: RPC Method Call
    MB->>RPC: module.start()
    RPC->>RPC: Create RpcCall
    RPC->>W: actor.__getattr__("start")
    W->>P: Send getattr request via pipe
    P->>M: getattr(instance, "start")
    P-->>W: Return method
    W-->>RPC: Return ActorFuture(method)
    RPC->>RPC: Call method via LCMRPC
    
    Note over MC,P: Shutdown
    MC->>RPC: module.stop()
    RPC->>P: call_nowait("stop")
    P->>M: instance.stop()
    MC->>WM: close_all()
    WM->>W: shutdown()
    W->>P: Send shutdown request
    P-->>W: Acknowledge
    W->>P: Join/terminate process
    deactivate P


@greptile-apps greptile-apps bot left a comment


6 files reviewed, 6 comments



from typing import TypeVar

from dask.distributed import Actor, get_worker

get_worker is still imported from dask but is used in Module.set_ref() on line 424. When --no-dask mode is enabled, this import will succeed but calling get_worker() will fail at runtime since no Dask worker context exists.

Suggested change:
- from dask.distributed import Actor, get_worker
+ from dask.distributed import Actor

Comment on lines +29 to +34
from typing_extensions import TypeVar as TypeVarExtension

if TYPE_CHECKING:
    from dimos.core.introspection.module import ModuleInfo

from typing import TypeVar

TypeVar is imported twice - once from typing_extensions as TypeVarExtension (line 29), and again from typing (line 34). The second import shadows the first one.


Comment on lines +59 to +65
def __getattr__(self, name: str) -> ActorFuture:
    """Proxy attribute access to the worker process."""
    if name.startswith("_"):
        raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")

    result = self._send_request_to_worker({"type": "getattr", "name": name})
    return ActorFuture(result)

The __getattr__ method always returns an ActorFuture, but attribute access should return the actual attribute value, not a future. This works for method calls (where the future wraps the return value), but for non-callable attributes, callers would get an ActorFuture wrapper instead of the value itself.

Comment on lines +195 to +196
elif req_type == "getattr":
    response["result"] = getattr(instance, request["name"])

This getattr request returns the attribute value directly without wrapping method calls. When the attribute is a method, it returns the bound method object, not the result of calling it. This means the Actor.__getattr__ will wrap a method object in an ActorFuture, not the result of calling the method.

# See the License for the specific language governing permissions and
# limitations under the License.

from concurrent.futures import ThreadPoolExecutor

traitlets.Any is imported but typing.Any should be used instead for type annotations

Suggested change:
  from concurrent.futures import ThreadPoolExecutor
+ from typing import Any

@greptile-apps

greptile-apps bot commented Jan 25, 2026

Additional Comments (1)

dimos/core/module.py
This set_ref() method calls get_worker() from Dask, which will fail in --no-dask mode. The new worker system bypasses this method by setting instance.ref and instance.worker directly in worker.py:192-193 and worker.py:165, but this creates inconsistency. If Dask is still enabled, this method is used; if not, it's bypassed.

@paul-nechifor paul-nechifor force-pushed the pauln-dask-alternative branch from 945e0ba to 85bb311 Compare January 25, 2026 14:54
