Skip to content

Conversation

@jca0
Copy link

@jca0 jca0 commented Jan 16, 2026

mujoco simulation module

created

  • mujoco backend to bring up mujoco
  • mujoco manipulation interface to easily use with other robots
  • mujoco driver to make it a module/determine topics to publish and subscribe to
  • xarm sim driver for xarm specific needs

command to test: python3 -m dimos.simulation.sim_blueprints

todos:

  • load an environment from a config file/folder on simulation bringup
  • add support for other robots
  • dealing with sensor topics

@jca0 jca0 requested a review from a team January 16, 2026 01:34
@greptile-apps
Copy link

greptile-apps bot commented Jan 16, 2026

Greptile Summary

Implements a comprehensive MuJoCo simulation module for robotic manipulators with a layered architecture: MujocoSimBackend handles the core MuJoCo engine, MujocoManipInterface provides a uniform API, and MujocoDriver wraps it as a dimos Module with LCM transport integration.

Key Changes:

  • Created modular simulation backend with proper threading and synchronization
  • Implemented position and velocity control modes with 100Hz control/monitor loops
  • Added xArm-specific driver with correct joint limits
  • Includes test scripts for validating pub/sub functionality

Issues Found:

  • Duplicate return True at mujoco_manip_interface.py:183 (unreachable code)
  • Previous review flagged: mujoco_driver.py:156 incorrectly uses msg.positions for velocity commands (already noted in previous thread)

Confidence Score: 4/5

  • Safe to merge after fixing the duplicate return statement
  • The architecture is solid with proper threading, locking, and separation of concerns. The duplicate return is a minor syntax issue that doesn't affect runtime. The velocity command bug was already flagged in a previous review.
  • Pay attention to mujoco_driver.py (velocity command bug from previous review) and mujoco_manip_interface.py (duplicate return)

Important Files Changed

Filename Overview
dimos/simulation/manipulators/mujoco_manip_interface.py Adapter interface with duplicate return statement at line 183
dimos/simulation/manipulators/mujoco_sim_backend.py Core MuJoCo backend with proper threading and synchronization
dimos/simulation/manipulators/mujoco_driver.py Module driver - previous issue flagged on line 156 using msg.positions instead of velocities field

Sequence Diagram

sequenceDiagram
    participant User
    participant MujocoDriver
    participant MujocoManipInterface
    participant MujocoSimBackend
    participant MuJoCo
    participant LCM

    User->>MujocoDriver: start()
    MujocoDriver->>MujocoManipInterface: connect()
    MujocoManipInterface->>MujocoSimBackend: create backend
    MujocoSimBackend->>MuJoCo: load robot model
    MujocoSimBackend->>MujocoSimBackend: start sim_thread
    MujocoSimBackend-->>MujocoManipInterface: connected
    MujocoManipInterface-->>MujocoDriver: connected
    
    MujocoDriver->>MujocoDriver: start control_thread
    MujocoDriver->>MujocoDriver: start monitor_thread
    MujocoDriver->>LCM: subscribe to joint_position_command
    MujocoDriver->>LCM: subscribe to joint_velocity_command
    
    loop Monitor Loop (100Hz)
        MujocoDriver->>MujocoManipInterface: read_joint_positions()
        MujocoManipInterface->>MujocoSimBackend: joint_positions
        MujocoSimBackend-->>MujocoManipInterface: positions
        MujocoManipInterface-->>MujocoDriver: positions
        MujocoDriver->>LCM: publish JointState
        MujocoDriver->>LCM: publish RobotState
    end
    
    LCM->>MujocoDriver: JointCommand received
    MujocoDriver->>MujocoDriver: store pending_positions
    
    loop Control Loop (100Hz)
        MujocoDriver->>MujocoDriver: check pending commands
        MujocoDriver->>MujocoManipInterface: write_joint_positions()
        MujocoManipInterface->>MujocoSimBackend: set_joint_position_targets()
        MujocoSimBackend->>MujocoSimBackend: update _joint_position_targets
    end
    
    loop Simulation Loop (model timestep)
        MujocoSimBackend->>MujocoSimBackend: _apply_control()
        MujocoSimBackend->>MuJoCo: mj_step()
        MuJoCo-->>MujocoSimBackend: simulation state
        MujocoSimBackend->>MujocoSimBackend: _update_joint_state()
    end
    
    User->>MujocoDriver: stop()
    MujocoDriver->>MujocoDriver: stop control/monitor threads
    MujocoDriver->>MujocoManipInterface: disconnect()
    MujocoManipInterface->>MujocoSimBackend: disconnect()
    MujocoSimBackend->>MujocoSimBackend: stop sim_thread
Loading

Copy link

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

8 files reviewed, 2 comments

Edit Code Review Agent Settings | Greptile

Comment on lines 154 to 157
def _on_joint_velocity_command(self, msg: JointCommand) -> None:
with self._command_lock:
self._pending_velocities = list(msg.positions)
self._pending_positions = None
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

syntax: JointCommand doesn't have a velocities field - it only has positions. This will cause an AttributeError when velocity commands are received.

Suggested change
def _on_joint_velocity_command(self, msg: JointCommand) -> None:
with self._command_lock:
self._pending_velocities = list(msg.positions)
self._pending_positions = None
def _on_joint_velocity_command(self, msg: JointCommand) -> None:
with self._command_lock:
self._pending_velocities = list(msg.positions)
self._pending_positions = None

@jca0 jca0 marked this pull request as draft January 16, 2026 01:59
jca0 and others added 3 commits January 15, 2026 23:41
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
@jca0 jca0 marked this pull request as ready for review January 16, 2026 07:44
Copy link

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

8 files reviewed, 1 comment

Edit Code Review Agent Settings | Greptile

self._dof = int(spec.dof) if spec.dof is not None else 0
self._connected = False
self._servos_enabled = False
self._control_mode = ControlMode.POSITION
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need ControlMode.Position?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a default until user changes it via set_control_mode

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a scalable approach. Having to build plugin for each oem manufacturer + sim engine is not maintainable.

The robot arms should not be specific to any SimEngine

Comment on lines 27 to 45
def _xarm_limits(dof: int) -> JointLimits:
if dof == 7:
lower_deg = [-360, -118, -360, -233, -360, -97, -360]
upper_deg = [360, 118, 360, 11, 360, 180, 360]
elif dof == 6:
lower_deg = [-360, -118, -225, -11, -360, -97]
upper_deg = [360, 118, 11, 225, 360, 180]
else:
lower_deg = [-360, -118, -225, -97, -360]
upper_deg = [360, 118, 11, 180, 360]

lower_rad = [math.radians(d) for d in lower_deg[:dof]]
upper_rad = [math.radians(d) for d in upper_deg[:dof]]
max_vel_rad = math.radians(180.0)
return JointLimits(
position_lower=lower_rad,
position_upper=upper_rad,
velocity_max=[max_vel_rad] * dof,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sim module should get a Robot config that has all its limits, assets etc.

Robot shouldn't need a registry to be loaded, but should be defined in the blueprint

model: str | None = None


class SimulationEngine(ABC):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice



@dataclass(frozen=True)
class RobotSpec:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this extend some existing general robot spec @mustafab0 ? Yeah pretty sure this should subclass, and shouldnt this take urdf @jca0

for i in range(limit):
self._joint_position_targets[i] = float(positions[i])

def write_joint_velocities(self, velocities: list[float]) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs to be changed to list of some velocity type. I assume Vector3

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't make sense to. Vector3 is 3D, Twist is 6D, joints can only go in 2 directions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally a JointState message

Both write_joint_position & write_joint_velocities can be refactored into 1 write_joint_command/etc and take in a standard JointState message that can set Pos/Vel/Effort

self._pending_positions = None

def _control_loop(self) -> None:
period = 1.0 / max(self._control_rate, 1.0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second 1.0 here is the minimum control rate so should be an env var called MIN CONTROL RATE or something

time.sleep(period)

def _monitor_loop(self) -> None:
period = 1.0 / max(self._monitor_rate, 1.0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above here

return [f"{self._joint_prefix}{i + 1}" for i in range(dof)]


def get_blueprint() -> dict[str, Any]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just return blueprint how we normally do idk if we support returning blueprint via dict like this or if converting this back to .blueprint() type even works.

joints=[float(p) for p in positions],
)
)
time.sleep(period)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WRONG

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time.sleep(period) ignores work time; loop runs slower than target.

Comment on lines 27 to 47
def _xarm_limits(dof: int) -> JointLimits:
if dof == 7:
lower_deg = [-360, -118, -360, -233, -360, -97, -360]
upper_deg = [360, 118, 360, 11, 360, 180, 360]
elif dof == 6:
lower_deg = [-360, -118, -225, -11, -360, -97]
upper_deg = [360, 118, 11, 225, 360, 180]
else:
lower_deg = [-360, -118, -225, -97, -360]
upper_deg = [360, 118, 11, 180, 360]

lower_rad = [math.radians(d) for d in lower_deg[:dof]]
upper_rad = [math.radians(d) for d in upper_deg[:dof]]
max_vel_rad = math.radians(180.0)
return JointLimits(
position_lower=lower_rad,
position_upper=upper_rad,
velocity_max=[max_vel_rad] * dof,
)


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bad

@spomichter
Copy link
Contributor

@jca0 Plus these changes add a blueprint to add_blueprints so devs can easily run, and then need an e2e test that runs everything and puts the arm through some motions, as well as a few unit tests where needed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should also be in lfs

Comment on lines 28 to 29
from dimos.msgs.sensor_msgs import JointCommand, JointState, RobotState
from dimos.simulation.engines.mujoco_engine import MujocoEngine
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sim_module should not have "Mujoco" references.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be in a utils folder

engine="mujoco",
robot="xarm7",
config_path=None,
config_path="dimos/simulation/assets/xarm7/scene.xml",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hard coded paths are a No-No

please look at how the manipulation branch does it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see test_control_module.py in e2e tests for reference.

The e2e test should confirm an instance of Sim engine is running and it can initialize correctly, and the module can communicate on its respective streams.

Comment on lines 131 to 153
def connect(self) -> None:
logger.info(f"{self.__class__.__name__}: connect()")
with self._lock:
self._connected = True
self._stop_event.clear()

if self._sim_thread is None or not self._sim_thread.is_alive():
self._sim_thread = threading.Thread(
target=self._sim_loop,
name=f"{self.__class__.__name__}Sim",
daemon=True,
)
self._sim_thread.start()

def disconnect(self) -> None:
logger.info(f"{self.__class__.__name__}: disconnect()")
with self._lock:
self._connected = False

self._stop_event.set()
if self._sim_thread and self._sim_thread.is_alive():
self._sim_thread.join(timeout=2.0)
self._sim_thread = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should return either a bool or an int to track if call was successful

for i in range(limit):
self._joint_position_targets[i] = float(positions[i])

def write_joint_velocities(self, velocities: list[float]) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally a JointState message

Both write_joint_position & write_joint_velocities can be refactored into 1 write_joint_command/etc and take in a standard JointState message that can set Pos/Vel/Effort

Comment on lines 218 to 226
self.joint_state.publish(
JointState(
frame_id=self.frame_id,
name=names,
position=positions,
velocity=velocities,
effort=efforts,
)
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be part of the control loop, as joint states would be needed at the same control frequency

joints=[float(p) for p in positions],
)
)
time.sleep(period)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time.sleep(period) ignores work time; loop runs slower than target.


xarm7_trajectory_sim = simulation(
engine="mujoco",
config_path=str(get_data("xarm7") / "scene.xml"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep config_path as a Path. In MujocoEngine you're converting it back to a Path anyway.

Comment on lines 70 to 71
if not self.config.engine:
raise ValueError("engine is required for SimulationModule")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If engine is required, then its type should be str, not str | None = None.

There's no need to verify it here if you disallow the class from being instantiated with an invalid config.

Comment on lines 74 to 76
engine_key = self.config.engine.lower()
if engine_key not in self._ENGINES:
raise ValueError(f"Unknown simulation engine: {self.config.engine}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to use the type system.

EngineType = Literal["mujoco", "some_other"]
_ENGINES: dict[EngineType, SimulationEngine] = {
    "mujoco": MujocoEngine,
}

...

# No need for None check, lower(), or key existence.
engine_cls = self._ENGINES[self.config.engine]

except Exception as exc:
import logging

logging.getLogger(self.__class__.__name__).warning(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use logger = setup_logger() for logging.

But what exceptions are you expecting here? I would expect just this to work without if and try:

                self._disposables.add(
                    Disposable(
                        self.joint_position_command.subscribe(self._on_joint_position_command)
                    )
                )

If that doesn't work, it would be good to know why.

return self._error_code, self._error_message

def write_joint_positions(self, positions: list[float], velocity: float = 1.0) -> bool:
_ = velocity
Copy link
Contributor

@paul-nechifor paul-nechifor Jan 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this to silence the velocity parameter not being used? If so, it's better to rename it to _velocity. I assume it's required in the interface?

Oh, velocity isn't used at all. Just delete it?

Comment on lines 149 to 151
if hasattr(self._engine, "hold_current_position"):
self._engine.hold_current_position() # type: ignore[attr-defined]
return True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking if a method exists with hasattr is often a sign that the code isn't ideal.

It looks like hold_current_position exists on MujocoEngine, but not on SimulationEngine and you're using hasattr to break the abstraction.

The better way to accomplish this (adding special handling just for Mujoco) without ignoring types is to check if self._engine is an instance of MujocoEngine .

But isn't it better to just add hold_current_position to SimulationEngine?

Comment on lines 93 to 95
with self._lock:
pos_targets = list(self._joint_position_targets)
with self._lock:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why two? Seems like you're releasing and acquiring unnecessary the second time.

Comment on lines 231 to 233
limit = min(len(positions), self._num_joints)
for i in range(limit):
self._joint_position_targets[i] = float(positions[i])
Copy link
Contributor

@paul-nechifor paul-nechifor Jan 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this pattern a lot in the code where you're copying values only up to a point.

But I think this just silently ignores errors. Is there a valid reason why write_joint_positions would be sent more positions than can be stored? I think you should let the code throw.

Co-authored-by: Paul Nechifor <paul@nechifor.net>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants