-
Notifications
You must be signed in to change notification settings - Fork 0
mujoco sim module #1035
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
mujoco sim module #1035
Conversation
Greptile SummaryImplements a comprehensive MuJoCo simulation module for robotic manipulators with a layered architecture: Key Changes:
Issues Found:
Confidence Score: 4/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant User
participant MujocoDriver
participant MujocoManipInterface
participant MujocoSimBackend
participant MuJoCo
participant LCM
User->>MujocoDriver: start()
MujocoDriver->>MujocoManipInterface: connect()
MujocoManipInterface->>MujocoSimBackend: create backend
MujocoSimBackend->>MuJoCo: load robot model
MujocoSimBackend->>MujocoSimBackend: start sim_thread
MujocoSimBackend-->>MujocoManipInterface: connected
MujocoManipInterface-->>MujocoDriver: connected
MujocoDriver->>MujocoDriver: start control_thread
MujocoDriver->>MujocoDriver: start monitor_thread
MujocoDriver->>LCM: subscribe to joint_position_command
MujocoDriver->>LCM: subscribe to joint_velocity_command
loop Monitor Loop (100Hz)
MujocoDriver->>MujocoManipInterface: read_joint_positions()
MujocoManipInterface->>MujocoSimBackend: joint_positions
MujocoSimBackend-->>MujocoManipInterface: positions
MujocoManipInterface-->>MujocoDriver: positions
MujocoDriver->>LCM: publish JointState
MujocoDriver->>LCM: publish RobotState
end
LCM->>MujocoDriver: JointCommand received
MujocoDriver->>MujocoDriver: store pending_positions
loop Control Loop (100Hz)
MujocoDriver->>MujocoDriver: check pending commands
MujocoDriver->>MujocoManipInterface: write_joint_positions()
MujocoManipInterface->>MujocoSimBackend: set_joint_position_targets()
MujocoSimBackend->>MujocoSimBackend: update _joint_position_targets
end
loop Simulation Loop (model timestep)
MujocoSimBackend->>MujocoSimBackend: _apply_control()
MujocoSimBackend->>MuJoCo: mj_step()
MuJoCo-->>MujocoSimBackend: simulation state
MujocoSimBackend->>MujocoSimBackend: _update_joint_state()
end
User->>MujocoDriver: stop()
MujocoDriver->>MujocoDriver: stop control/monitor threads
MujocoDriver->>MujocoManipInterface: disconnect()
MujocoManipInterface->>MujocoSimBackend: disconnect()
MujocoSimBackend->>MujocoSimBackend: stop sim_thread
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
8 files reviewed, 2 comments
| def _on_joint_velocity_command(self, msg: JointCommand) -> None: | ||
| with self._command_lock: | ||
| self._pending_velocities = list(msg.positions) | ||
| self._pending_positions = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
syntax: JointCommand doesn't have a velocities field - it only has positions. This will cause an AttributeError when velocity commands are received.
| def _on_joint_velocity_command(self, msg: JointCommand) -> None: | |
| with self._command_lock: | |
| self._pending_velocities = list(msg.positions) | |
| self._pending_positions = None | |
| def _on_joint_velocity_command(self, msg: JointCommand) -> None: | |
| with self._command_lock: | |
| self._pending_velocities = list(msg.positions) | |
| self._pending_positions = None |
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
8 files reviewed, 1 comment
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
| self._dof = int(spec.dof) if spec.dof is not None else 0 | ||
| self._connected = False | ||
| self._servos_enabled = False | ||
| self._control_mode = ControlMode.POSITION |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need ControlMode.Position?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a default until user changes it via set_control_mode
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not a scalable approach. Having to build plugin for each oem manufacturer + sim engine is not maintainable.
The robot arms should not be specific to any SimEngine
| def _xarm_limits(dof: int) -> JointLimits: | ||
| if dof == 7: | ||
| lower_deg = [-360, -118, -360, -233, -360, -97, -360] | ||
| upper_deg = [360, 118, 360, 11, 360, 180, 360] | ||
| elif dof == 6: | ||
| lower_deg = [-360, -118, -225, -11, -360, -97] | ||
| upper_deg = [360, 118, 11, 225, 360, 180] | ||
| else: | ||
| lower_deg = [-360, -118, -225, -97, -360] | ||
| upper_deg = [360, 118, 11, 180, 360] | ||
|
|
||
| lower_rad = [math.radians(d) for d in lower_deg[:dof]] | ||
| upper_rad = [math.radians(d) for d in upper_deg[:dof]] | ||
| max_vel_rad = math.radians(180.0) | ||
| return JointLimits( | ||
| position_lower=lower_rad, | ||
| position_upper=upper_rad, | ||
| velocity_max=[max_vel_rad] * dof, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The sim module should get a Robot config that has all its limits, assets etc.
Robot shouldn't need a registry to be loaded, but should be defined in the blueprint
| model: str | None = None | ||
|
|
||
|
|
||
| class SimulationEngine(ABC): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice
dimos/simulation/engines/base.py
Outdated
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class RobotSpec: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this extend some existing general robot spec @mustafab0 ? Yeah pretty sure this should subclass, and shouldnt this take urdf @jca0
| for i in range(limit): | ||
| self._joint_position_targets[i] = float(positions[i]) | ||
|
|
||
| def write_joint_velocities(self, velocities: list[float]) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs to be changed to list of some velocity type. I assume Vector3
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't make sense to. Vector3 is 3D, Twist is 6D, joints can only go in 2 directions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally a JointState message
Both write_joint_position & write_joint_velocities can be refactored into 1 write_joint_command/etc and take in a standard JointState message that can set Pos/Vel/Effort
| self._pending_positions = None | ||
|
|
||
| def _control_loop(self) -> None: | ||
| period = 1.0 / max(self._control_rate, 1.0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The second 1.0 here is the minimum control rate so should be an env var called MIN CONTROL RATE or something
| time.sleep(period) | ||
|
|
||
| def _monitor_loop(self) -> None: | ||
| period = 1.0 / max(self._monitor_rate, 1.0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above here
| return [f"{self._joint_prefix}{i + 1}" for i in range(dof)] | ||
|
|
||
|
|
||
| def get_blueprint() -> dict[str, Any]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just return blueprint how we normally do idk if we support returning blueprint via dict like this or if converting this back to .blueprint() type even works.
| joints=[float(p) for p in positions], | ||
| ) | ||
| ) | ||
| time.sleep(period) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WRONG
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
time.sleep(period) ignores work time; loop runs slower than target.
| def _xarm_limits(dof: int) -> JointLimits: | ||
| if dof == 7: | ||
| lower_deg = [-360, -118, -360, -233, -360, -97, -360] | ||
| upper_deg = [360, 118, 360, 11, 360, 180, 360] | ||
| elif dof == 6: | ||
| lower_deg = [-360, -118, -225, -11, -360, -97] | ||
| upper_deg = [360, 118, 11, 225, 360, 180] | ||
| else: | ||
| lower_deg = [-360, -118, -225, -97, -360] | ||
| upper_deg = [360, 118, 11, 180, 360] | ||
|
|
||
| lower_rad = [math.radians(d) for d in lower_deg[:dof]] | ||
| upper_rad = [math.radians(d) for d in upper_deg[:dof]] | ||
| max_vel_rad = math.radians(180.0) | ||
| return JointLimits( | ||
| position_lower=lower_rad, | ||
| position_upper=upper_rad, | ||
| velocity_max=[max_vel_rad] * dof, | ||
| ) | ||
|
|
||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bad
|
@jca0 Plus these changes add a blueprint to add_blueprints so devs can easily run, and then need an e2e test that runs everything and puts the arm through some motions, as well as a few unit tests where needed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should also be in lfs
| from dimos.msgs.sensor_msgs import JointCommand, JointState, RobotState | ||
| from dimos.simulation.engines.mujoco_engine import MujocoEngine |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sim_module should not have "Mujoco" references.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be in a utils folder
dimos/simulation/sim_blueprints.py
Outdated
| engine="mujoco", | ||
| robot="xarm7", | ||
| config_path=None, | ||
| config_path="dimos/simulation/assets/xarm7/scene.xml", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hard coded paths are a No-No
please look at how the manipulation branch does it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please see test_control_module.py in e2e tests for reference.
The e2e test should confirm an instance of Sim engine is running and it can initialize correctly, and the module can communicate on its respective streams.
| def connect(self) -> None: | ||
| logger.info(f"{self.__class__.__name__}: connect()") | ||
| with self._lock: | ||
| self._connected = True | ||
| self._stop_event.clear() | ||
|
|
||
| if self._sim_thread is None or not self._sim_thread.is_alive(): | ||
| self._sim_thread = threading.Thread( | ||
| target=self._sim_loop, | ||
| name=f"{self.__class__.__name__}Sim", | ||
| daemon=True, | ||
| ) | ||
| self._sim_thread.start() | ||
|
|
||
| def disconnect(self) -> None: | ||
| logger.info(f"{self.__class__.__name__}: disconnect()") | ||
| with self._lock: | ||
| self._connected = False | ||
|
|
||
| self._stop_event.set() | ||
| if self._sim_thread and self._sim_thread.is_alive(): | ||
| self._sim_thread.join(timeout=2.0) | ||
| self._sim_thread = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These should return either a bool or an int to track if call was successful
| for i in range(limit): | ||
| self._joint_position_targets[i] = float(positions[i]) | ||
|
|
||
| def write_joint_velocities(self, velocities: list[float]) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally a JointState message
Both write_joint_position & write_joint_velocities can be refactored into 1 write_joint_command/etc and take in a standard JointState message that can set Pos/Vel/Effort
| self.joint_state.publish( | ||
| JointState( | ||
| frame_id=self.frame_id, | ||
| name=names, | ||
| position=positions, | ||
| velocity=velocities, | ||
| effort=efforts, | ||
| ) | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be part of the control loop, as joint states would be needed at the same control frequency
| joints=[float(p) for p in positions], | ||
| ) | ||
| ) | ||
| time.sleep(period) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
time.sleep(period) ignores work time; loop runs slower than target.
dimos/simulation/sim_blueprints.py
Outdated
|
|
||
| xarm7_trajectory_sim = simulation( | ||
| engine="mujoco", | ||
| config_path=str(get_data("xarm7") / "scene.xml"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep config_path as a Path. In MujocoEngine you're converting it back to a Path anyway.
| if not self.config.engine: | ||
| raise ValueError("engine is required for SimulationModule") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If engine is required, then its type should be str, not str | None = None.
There's no need to verify it here if you disallow the class from being instantiated with an invalid config.
| engine_key = self.config.engine.lower() | ||
| if engine_key not in self._ENGINES: | ||
| raise ValueError(f"Unknown simulation engine: {self.config.engine}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's better to use the type system.
EngineType = Literal["mujoco", "some_other"]
_ENGINES: dict[EngineType, SimulationEngine] = {
"mujoco": MujocoEngine,
}
...
# No need for None check, lower(), or key existence.
engine_cls = self._ENGINES[self.config.engine]| except Exception as exc: | ||
| import logging | ||
|
|
||
| logging.getLogger(self.__class__.__name__).warning( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use logger = setup_logger() for logging.
But what exceptions are you expecting here? I would expect just this to work without if and try:
self._disposables.add(
Disposable(
self.joint_position_command.subscribe(self._on_joint_position_command)
)
)
If that doesn't work, it would be good to know why.
| return self._error_code, self._error_message | ||
|
|
||
| def write_joint_positions(self, positions: list[float], velocity: float = 1.0) -> bool: | ||
| _ = velocity |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this to silence the velocity parameter not being used? If so, it's better to rename it to _velocity. I assume it's required in the interface?
Oh, velocity isn't used at all. Just delete it?
| if hasattr(self._engine, "hold_current_position"): | ||
| self._engine.hold_current_position() # type: ignore[attr-defined] | ||
| return True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checking if a method exists with hasattr is often a sign that the code isn't ideal.
It looks like hold_current_position exists on MujocoEngine, but not on SimulationEngine and you're using hasattr to break the abstraction.
The better way to accomplish this (adding special handling just for Mujoco) without ignoring types is to check if self._engine is an instance of MujocoEngine .
But isn't it better to just add hold_current_position to SimulationEngine?
| with self._lock: | ||
| pos_targets = list(self._joint_position_targets) | ||
| with self._lock: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why two? Seems like you're releasing and acquiring unnecessary the second time.
| limit = min(len(positions), self._num_joints) | ||
| for i in range(limit): | ||
| self._joint_position_targets[i] = float(positions[i]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see this pattern a lot in the code where you're copying values only up to a point.
But I think this just silently ignores errors. Is there a valid reason why write_joint_positions would be sent more positions than can be stored? I think you should let the code throw.
Co-authored-by: Paul Nechifor <paul@nechifor.net>
mujoco simulation module
created
command to test:
python3 -m dimos.simulation.sim_blueprintstodos: