diff --git a/source/isaaclab/isaaclab/scene/interactive_scene.py b/source/isaaclab/isaaclab/scene/interactive_scene.py index f57ccd5f1f22..5a9ff0696094 100644 --- a/source/isaaclab/isaaclab/scene/interactive_scene.py +++ b/source/isaaclab/isaaclab/scene/interactive_scene.py @@ -583,6 +583,11 @@ def _is_scene_setup_from_cfg(self) -> bool: def _add_entities_from_cfg(self): """Add scene entities from the config.""" + + def resolve_env_ns(path: str, ns: str) -> str: + """Normalize ``{ENV_REGEX_NS}`` (or a literal env-regex) and substitute *ns*.""" + return path.replace(self.env_regex_ns, "{ENV_REGEX_NS}").format(ENV_REGEX_NS=ns) + # parse the entire scene config and resolve regex for asset_name, asset_cfg in self.cfg.__dict__.items(): # skip keywords @@ -595,11 +600,12 @@ def _add_entities_from_cfg(self): # In order to compose cloner behavior more flexibly, we ask each spawner to spawn prototypes in # prepared /World/template path, once all template is ready, cloner can determine what rules to follow # to combine, and distribute the templates to cloned environments. - asset_cfg.prim_path = asset_cfg.prim_path.replace(self.env_regex_ns, "{ENV_REGEX_NS}") - destinations_regex_ns = asset_cfg.prim_path.format(ENV_REGEX_NS=self.env_regex_ns) - if self.env_regex_ns[:-2] in destinations_regex_ns: + # Sensors are excluded — they don't spawn prims and need env-regex + # paths during __init__ for cl_register_site. + destinations_regex_ns = resolve_env_ns(asset_cfg.prim_path, self.env_regex_ns) + if not isinstance(asset_cfg, SensorBaseCfg) and self.env_regex_ns[:-2] in destinations_regex_ns: require_clone = True - prototype_root = asset_cfg.prim_path.format(ENV_REGEX_NS=self.cloner_cfg.template_root) + prototype_root = resolve_env_ns(asset_cfg.prim_path, self.cloner_cfg.template_root) asset_cfg.prim_path = f"{prototype_root}/{self.cloner_cfg.template_prototype_identifier}_.*" else: asset_cfg.prim_path = destinations_regex_ns @@ -615,29 +621,22 @@ def _add_entities_from_cfg(self): self._rigid_objects[asset_name] = asset_cfg.class_type(asset_cfg) elif isinstance(asset_cfg, SensorBaseCfg): if isinstance(asset_cfg, ContactSensorCfg): - if asset_cfg.shape_path is not None: - updated_shape_names_expr = [] - for filter_prim_path in asset_cfg.shape_path: - updated_shape_names_expr.append(filter_prim_path.format(ENV_REGEX_NS=self.env_regex_ns)) - asset_cfg.shape_path = updated_shape_names_expr if asset_cfg.filter_prim_paths_expr is not None: - updated_contact_partners_body_expr = [] - for contact_partners_body_expr in asset_cfg.filter_prim_paths_expr: - updated_contact_partners_body_expr.append( - contact_partners_body_expr.format(ENV_REGEX_NS=self.env_regex_ns) - ) - asset_cfg.filter_prim_paths_expr = updated_contact_partners_body_expr - if asset_cfg.filter_shape_paths_expr is not None: - updated_contact_partners_shape_expr = [] - for contact_partners_shape_expr in asset_cfg.filter_shape_paths_expr: - updated_contact_partners_shape_expr.append( - contact_partners_shape_expr.format(ENV_REGEX_NS=self.env_regex_ns) - ) - asset_cfg.filter_shape_paths_expr = updated_contact_partners_shape_expr + asset_cfg.filter_prim_paths_expr = [ + resolve_env_ns(e, self.env_regex_ns) for e in asset_cfg.filter_prim_paths_expr + ] + if hasattr(asset_cfg, "sensor_shape_prim_expr") and asset_cfg.sensor_shape_prim_expr is not None: + asset_cfg.sensor_shape_prim_expr = [ + resolve_env_ns(e, self.env_regex_ns) for e in asset_cfg.sensor_shape_prim_expr + ] + if hasattr(asset_cfg, "filter_shape_prim_expr") and asset_cfg.filter_shape_prim_expr is not None: + asset_cfg.filter_shape_prim_expr = [ + resolve_env_ns(e, self.env_regex_ns) for e in asset_cfg.filter_shape_prim_expr + ] elif isinstance(asset_cfg, FrameTransformerCfg): if asset_cfg.target_frames is not None: for target_frame in asset_cfg.target_frames: - target_frame.prim_path = target_frame.prim_path.format(ENV_REGEX_NS=self.env_regex_ns) + target_frame.prim_path = resolve_env_ns(target_frame.prim_path, self.env_regex_ns) self._sensors[asset_name] = asset_cfg.class_type(asset_cfg) elif isinstance(asset_cfg, AssetBaseCfg): # manually spawn asset diff --git a/source/isaaclab/isaaclab/sensors/contact_sensor/base_contact_sensor.py b/source/isaaclab/isaaclab/sensors/contact_sensor/base_contact_sensor.py index 158547253220..c87ec4fc86d6 100644 --- a/source/isaaclab/isaaclab/sensors/contact_sensor/base_contact_sensor.py +++ b/source/isaaclab/isaaclab/sensors/contact_sensor/base_contact_sensor.py @@ -127,7 +127,7 @@ def reset(self, env_ids: Sequence[int] | None = None, env_mask: wp.array(dtype=w env_mask: The masks of the environments to reset. Defaults to None: all the environments are reset. """ # reset the timers and counters - super().reset(env_ids) + super().reset(env_ids, env_mask) @abstractmethod def find_bodies( diff --git a/source/isaaclab/isaaclab/sensors/contact_sensor/contact_sensor_cfg.py b/source/isaaclab/isaaclab/sensors/contact_sensor/contact_sensor_cfg.py index 61e4be9f94be..077682d11ae6 100644 --- a/source/isaaclab/isaaclab/sensors/contact_sensor/contact_sensor_cfg.py +++ b/source/isaaclab/isaaclab/sensors/contact_sensor/contact_sensor_cfg.py @@ -13,18 +13,44 @@ @configclass class ContactSensorCfg(SensorBaseCfg): - """Configuration for the contact sensor.""" + """Configuration for the contact sensor. + + Sensing bodies are selected via :attr:`SensorBaseCfg.prim_path`. Filter bodies for + per-partner force reporting are selected via :attr:`filter_prim_paths_expr`. + + Only body-level sensing and filtering are supported. For shape-level granularity, + see ``NewtonContactSensorCfg`` in ``isaaclab_newton``. + """ class_type: type = ContactSensor track_pose: bool = False """Whether to track the pose of the sensor's origin. Defaults to False.""" + track_contact_points: bool = False + """Whether to track the contact point locations. Defaults to False.""" + + track_friction_forces: bool = False + """Whether to track the friction forces at the contact points. Defaults to False.""" + + max_contact_data_count_per_prim: int | None = None + """The maximum number of contacts across all batches of the sensor to keep track of. Default is 4, where supported. + + This parameter sets the total maximum counts of the simulation across all bodies and environments. The total number + of contacts allowed is max_contact_data_count_per_prim*num_envs*num_sensor_bodies. + + .. note:: + + If the environment is very contact rich it is suggested to increase this parameter to avoid out of bounds memory + errors and loss of contact data leading to inaccurate measurements. + """ + track_air_time: bool = False """Whether to track the air/contact time of the bodies (time between contacts). Defaults to False.""" - force_threshold: float = 0.0 + force_threshold: float | None = None """The threshold on the norm of the contact force that determines whether two bodies are in collision or not. + Defaults to None, in which case the sensor backend chooses an appropriate value. This value is only used for tracking the mode duration (the time in contact or in air), if :attr:`track_air_time` is True. @@ -34,73 +60,26 @@ class ContactSensorCfg(SensorBaseCfg): """Number of past frames to store in the sensor buffers. Defaults to 0, which means that only the current data is stored (no history).""" - shape_path: list[str] | None = None - """A list of expressions to filter contacts shapes with. Defaults to None. If both :attr:`body_names_expr` and - :attr:`shape_names_expr` are None, the contact with all bodies/shapes is reported. - - Only one of :attr:`body_names_expr` or :attr:`shape_names_expr` can be provided. - If both are provided, an error will be raised. - - We make an explicit difference between a body and a shape. A body is a rigid body, while a shape is a collision - shape. A body can have multiple shapes. The shape option allows a more fine-grained control over the contact - reporting. - - .. note:: - The expression in the list can contain the environment namespace regex ``{ENV_REGEX_NS}`` which - will be replaced with the environment namespace. - """ - filter_prim_paths_expr: list[str] | None = None - """A list of expressions to filter contacts bodies with. Defaults to None. If both :attr:`contact_partners_body_expr` and - :attr:`contact_partners_shape_expr` are None, the contact with all bodies/shapes is reported. - - Only one of :attr:`contact_partners_body_expr` or :attr:`contact_partners_shape_expr` can be provided. - If both are provided, an error will be raised. + """List of body prim path expressions to filter contacts against. Defaults to None, + meaning contacts with all bodies are aggregated into the net force. - The contact sensor allows reporting contacts between the primitive specified with either :attr:`body_names_expr` or - :attr:`shape_names_expr` and other primitives in the scene. For instance, in a scene containing a robot, a ground - plane and an object, you can obtain individual contact reports of the base of the robot with the ground plane and - the object. + If provided, a per-partner force matrix (:attr:`ContactSensorData.force_matrix_w`) is + reported in addition to the net force. Each expression is matched against body prim paths + in the scene. - We make an explicit difference between a body and a shape. A body is a rigid body, while a shape is a collision - shape. A body can have multiple shapes. The shape option allows a more fine-grained control over the contact - reporting. + For shape-level filtering, see ``NewtonContactSensorCfg`` in ``isaaclab_newton``. .. note:: - The expression in the list can contain the environment namespace regex ``{ENV_REGEX_NS}`` which - will be replaced with the environment namespace. + Expressions can contain the environment namespace regex ``{ENV_REGEX_NS}``, which + is replaced with the environment namespace. - Example: ``{ENV_REGEX_NS}/Object`` will be replaced with ``/World/envs/env_.*/Object``. + Example: ``{ENV_REGEX_NS}/Object`` becomes ``/World/envs/env_.*/Object``. .. attention:: - The reporting of filtered contacts only works when the sensor primitive :attr:`prim_path` corresponds to a - single primitive in that environment. If the sensor primitive corresponds to multiple primitives, the - filtering will not work as expected. Please check :class:`~isaaclab.sensors.contact_sensor.ContactSensor` - for more details. - """ - - filter_shape_paths_expr: list[str] | None = None - """A list of expressions to filter contacts shapes with. Defaults to None. If both :attr:`contact_partners_body_expr` and - :attr:`contact_partners_shape_expr` are None, the contact with all bodies/shapes is reported. - - Only one of :attr:`contact_partners_body_expr` or :attr:`contact_partners_shape_expr` can be provided. - If both are provided, an error will be raised. - - The contact sensor allows reporting contacts between the primitive specified with either :attr:`body_names_expr` or - :attr:`shape_names_expr` and other primitives in the scene. For instance, in a scene containing a robot, a ground - plane and an object, you can obtain individual contact reports of the base of the robot with the ground plane and - the object. - - - We make an explicit difference between a body and a shape. A body is a rigid body, while a shape is a collision - shape. A body can have multiple shapes. The shape option allows a more fine-grained control over the contact - reporting. - - .. note:: - The expression in the list can contain the environment namespace regex ``{ENV_REGEX_NS}`` which - will be replaced with the environment namespace. - - Example: ``{ENV_REGEX_NS}/Object`` will be replaced with ``/World/envs/env_.*/Object``. + Filtered contact reporting only works when :attr:`SensorBaseCfg.prim_path` matches a + single primitive per environment. For many-to-many filtering, see + ``NewtonContactSensorCfg`` in ``isaaclab_newton``. """ visualizer_cfg: VisualizationMarkersCfg = CONTACT_SENSOR_MARKER_CFG.replace(prim_path="/Visuals/ContactSensor") diff --git a/source/isaaclab/isaaclab/sensors/frame_transformer/__init__.py b/source/isaaclab/isaaclab/sensors/frame_transformer/__init__.py index d5db853e8cc2..d678d9e0082b 100644 --- a/source/isaaclab/isaaclab/sensors/frame_transformer/__init__.py +++ b/source/isaaclab/isaaclab/sensors/frame_transformer/__init__.py @@ -5,6 +5,17 @@ """Sub-module for frame transformer sensor.""" +from .base_frame_transformer import BaseFrameTransformer +from .base_frame_transformer_data import BaseFrameTransformerData from .frame_transformer import FrameTransformer from .frame_transformer_cfg import FrameTransformerCfg, OffsetCfg from .frame_transformer_data import FrameTransformerData + +__all__ = [ + "BaseFrameTransformer", + "BaseFrameTransformerData", + "FrameTransformer", + "FrameTransformerData", + "FrameTransformerCfg", + "OffsetCfg", +] diff --git a/source/isaaclab/isaaclab/sensors/frame_transformer/base_frame_transformer.py b/source/isaaclab/isaaclab/sensors/frame_transformer/base_frame_transformer.py new file mode 100644 index 000000000000..430f25721372 --- /dev/null +++ b/source/isaaclab/isaaclab/sensors/frame_transformer/base_frame_transformer.py @@ -0,0 +1,117 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +from __future__ import annotations + +from abc import abstractmethod +from collections.abc import Sequence +from typing import TYPE_CHECKING + +import warp as wp + +from ..sensor_base import SensorBase + +if TYPE_CHECKING: + from .base_frame_transformer_data import BaseFrameTransformerData + from .frame_transformer_cfg import FrameTransformerCfg + + +class BaseFrameTransformer(SensorBase): + """Sensor reporting transforms of target frames relative to a source frame. + + Source frame is set via :attr:`FrameTransformerCfg.prim_path`; target frames via + :attr:`FrameTransformerCfg.target_frames`. Both must be rigid bodies. Relative + transforms are derived from world-frame poses obtained from the physics engine. + + Optional offsets on source and target frames allow measuring from points other + than the body origin (e.g. a tool-tip offset from the body center of mass). + """ + + cfg: FrameTransformerCfg + """The configuration parameters.""" + + def __init__(self, cfg: FrameTransformerCfg): + """Initializes the frame transformer object. + + Args: + cfg: The configuration parameters. + """ + # initialize base class + super().__init__(cfg) + + def __str__(self) -> str: + """Returns: A string containing information about the instance.""" + return ( + f"FrameTransformer @ '{self.cfg.prim_path}': \n" + f"\tupdate period (s) : {self.cfg.update_period}\n" + f"\tnumber of bodies : {self.num_bodies}\n" + f"\tbody names : {self.body_names}\n" + ) + + """ + Properties + """ + + @property + @abstractmethod + def data(self) -> BaseFrameTransformerData: + """Data from the sensor.""" + raise NotImplementedError + + @property + def num_bodies(self) -> int: + """Number of target bodies tracked.""" + return len(self._target_frame_body_names) + + @property + def body_names(self) -> list[str]: + """Ordered names of target bodies tracked.""" + return self._target_frame_body_names + + """ + Operations + """ + + def reset(self, env_ids: Sequence[int] | None = None, env_mask: wp.array | None = None): + # reset the timers and counters + super().reset(env_ids, env_mask) + + @abstractmethod + def find_bodies(self, name_keys: str | Sequence[str], preserve_order: bool = False) -> tuple[list[int], list[str]]: + """Find target bodies by name. + + Args: + name_keys: Regex or list of regexes to match target body names. + preserve_order: Whether to preserve the order of the name keys. Defaults to False. + + Returns: + Tuple of (indices, names) for matched bodies. + """ + raise NotImplementedError + + """ + Implementation. + """ + + @abstractmethod + def _initialize_impl(self): + super()._initialize_impl() + + @abstractmethod + def _update_buffers_impl(self, env_mask: wp.array | None): + """Fills the buffers of the sensor data. + + Args: + env_mask: Mask of the environments to update. None: update all environments. + """ + raise NotImplementedError + + """ + Internal simulation callbacks. + """ + + def _invalidate_initialize_callback(self, event): + """Invalidates the scene elements.""" + super()._invalidate_initialize_callback(event) diff --git a/source/isaaclab/isaaclab/sensors/frame_transformer/base_frame_transformer_data.py b/source/isaaclab/isaaclab/sensors/frame_transformer/base_frame_transformer_data.py new file mode 100644 index 000000000000..7261ec93a953 --- /dev/null +++ b/source/isaaclab/isaaclab/sensors/frame_transformer/base_frame_transformer_data.py @@ -0,0 +1,74 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +from __future__ import annotations + +from abc import ABC, abstractmethod + +import warp as wp + + +class BaseFrameTransformerData(ABC): + """Data container for the frame transformer sensor.""" + + target_frame_names: list[str] | None = None + """Target frame names in the order frame data is stored. + + Resolved from :attr:`FrameTransformerCfg.FrameCfg.name`; order may differ from + the config due to regex matching. + """ + + @property + @abstractmethod + def target_pos_source(self) -> wp.array: + """Target position(s) relative to source frame. + (N, M) array of ``wp.vec3f`` — N envs, M target frames. + """ + raise NotImplementedError + + @property + @abstractmethod + def target_quat_source(self) -> wp.array: + """Target orientation(s) relative to source frame. + (N, M) array of ``wp.quatf`` — N envs, M target frames. + """ + raise NotImplementedError + + @property + @abstractmethod + def target_pos_w(self) -> wp.array: + """Target position(s) in world frame (with offset applied). + (N, M) array of ``wp.vec3f`` — N envs, M target frames. + """ + raise NotImplementedError + + @property + @abstractmethod + def target_quat_w(self) -> wp.array: + """Target orientation(s) in world frame (with offset applied). + (N, M) array of ``wp.quatf`` — N envs, M target frames. + """ + raise NotImplementedError + + @property + @abstractmethod + def source_pos_w(self) -> wp.array: + """Source position in world frame (with offset applied). + (N,) array of ``wp.vec3f`` — N envs. + """ + raise NotImplementedError + + @property + @abstractmethod + def source_quat_w(self) -> wp.array: + """Source orientation in world frame (with offset applied). + (N,) array of ``wp.quatf`` — N envs. + """ + raise NotImplementedError + + @abstractmethod + def _create_buffers(self, *args, **kwargs): + """Allocates owned data buffers.""" + raise NotImplementedError diff --git a/source/isaaclab/isaaclab/sensors/frame_transformer/frame_transformer.py b/source/isaaclab/isaaclab/sensors/frame_transformer/frame_transformer.py index ca03184a5a65..de64ba316421 100644 --- a/source/isaaclab/isaaclab/sensors/frame_transformer/frame_transformer.py +++ b/source/isaaclab/isaaclab/sensors/frame_transformer/frame_transformer.py @@ -5,591 +5,19 @@ from __future__ import annotations -import torch -from collections.abc import Sequence from typing import TYPE_CHECKING -import warp as wp -from isaaclab_newton.physics import NewtonManager +from isaaclab.utils.backend_utils import FactoryBase -import isaaclab.sim as sim_utils -import isaaclab.utils.string as string_utils -from isaaclab.utils.math import normalize, quat_from_angle_axis - -from ..sensor_base import SensorBase -from .frame_transformer_data import FrameTransformerData +from .base_frame_transformer import BaseFrameTransformer if TYPE_CHECKING: - from .frame_transformer_cfg import FrameTransformerCfg - -""" -Warp kernels -""" - - -@wp.kernel -def set_env_mask(env_mask: wp.array(dtype=bool), env_ids: wp.array(dtype=wp.int32)): - """Create an environment mask from the environment ids. - - Args: - env_mask: The environment mask (num_envs,). (modified) - env_ids: The environment ids. - """ - - idx = wp.tid() - env_mask[env_ids[idx]] = True - - -@wp.func -def split_transform_to_quat4lab(transform: wp.transformf) -> wp.vec4f: - """Split a frame transform into a quaternion in wxyz order. - - Args: - transform: The frame transform in xyzw order. - - Returns: - The quaternion in wxyz order. - """ - quat = wp.transform_get_rotation(transform) - return wp.vec4f(quat[3], quat[0], quat[1], quat[2]) - - -@wp.kernel -def update_source_transform( - offset: wp.transformf, - source_index: int, - source_transforms: wp.array2d(dtype=wp.transformf), - output_transforms: wp.array(dtype=wp.transformf), - output_pos: wp.array(dtype=wp.vec3f), - output_quat: wp.array(dtype=wp.vec4f), - env_mask: wp.array(dtype=bool), -): - """Update the source transform. - - This kernel applies the offset to the source frame transform and outputs the results - in the output transforms, positions, and quaternions. - - .. note:: The output positions and quaternions are used to keep everything compatible - with the rest of the code but they are not needed. - - Args: - offset: The offset of the source frame. - source_index: The index of the source frame. - source_transforms: The source frame transforms. - output_transforms: The output transforms (modified). - output_pos: The output positions (modified). - output_quat: The output quaternions (modified). - env_mask: The environment mask. - """ - idx = wp.tid() - if env_mask[idx]: - output_transforms[idx] = source_transforms[idx, source_index] * offset - output_pos[idx] = wp.transform_get_translation(output_transforms[idx]) - output_quat[idx] = split_transform_to_quat4lab(output_transforms[idx]) - - -@wp.kernel -def update_frame_transforms( - frame_offsets: wp.array(dtype=wp.transformf), - origin_transforms: wp.array(dtype=wp.transformf), - frames_transforms_world: wp.array2d(dtype=wp.transformf), - frames_transforms_origin: wp.array2d(dtype=wp.transformf), - frames_pos_world: wp.array2d(dtype=wp.vec3f), - frames_quat_world: wp.array2d(dtype=wp.vec4f), - frames_pos_origin: wp.array2d(dtype=wp.vec3f), - frames_quat_origin: wp.array2d(dtype=wp.vec4f), - frame_to_view_ids: wp.array(dtype=wp.int32), - body_to_view_ids: wp.array(dtype=wp.int32), - env_mask: wp.array(dtype=bool), -): - """Update the frame transforms. - - This kernel updates the frame transforms in the origin frame and the world frame. - - .. note:: The output positions and quaternions are used to keep everything compatible - with the rest of the code but they are not needed. - - Args: - frame_offsets: The offsets of the frames. - origin_transforms: The origin transforms. - frames_transforms_world: The frame transforms in world frame. - frames_transforms_origin: The frame transforms in origin frame (modified). - frames_pos_world: The frame positions in world frame (modified). - frames_quat_world: The frame quaternions in world frame (modified). - frames_pos_origin: The frame positions in origin frame (modified). - frames_quat_origin: The frame quaternions in origin frame (modified). - frame_to_view_ids: The mapping from frame to view ids. - body_to_view_ids: The mapping from body to view ids. - env_mask: The environment mask. - """ - env_idx, frame_idx = wp.tid() - - if env_mask[env_idx]: - frames_transforms_origin[env_idx][frame_to_view_ids[frame_idx]] = ( - frames_transforms_world[env_idx][body_to_view_ids[frame_idx]] * frame_offsets[frame_to_view_ids[frame_idx]] - ) - frames_pos_world[env_idx][frame_to_view_ids[frame_idx]] = wp.transform_get_translation( - frames_transforms_origin[env_idx][frame_to_view_ids[frame_idx]] - ) - frames_quat_world[env_idx][frame_to_view_ids[frame_idx]] = split_transform_to_quat4lab( - frames_transforms_origin[env_idx][frame_to_view_ids[frame_idx]] - ) - frames_transforms_origin[env_idx][frame_to_view_ids[frame_idx]] = ( - wp.transform_inverse(origin_transforms[env_idx]) - * frames_transforms_origin[env_idx][frame_to_view_ids[frame_idx]] - ) - frames_pos_origin[env_idx][frame_to_view_ids[frame_idx]] = wp.transform_get_translation( - frames_transforms_origin[env_idx][frame_to_view_ids[frame_idx]] - ) - frames_quat_origin[env_idx][frame_to_view_ids[frame_idx]] = split_transform_to_quat4lab( - frames_transforms_origin[env_idx][frame_to_view_ids[frame_idx]] - ) - - -""" -FrameTransformer class -""" - - -class FrameTransformer(SensorBase): - """A sensor for reporting frame transforms. - - This class provides an interface for reporting the transform of one or more frames (target frames) - with respect to another frame (source frame). The source frame is specified by the user as a prim path - (:attr:`FrameTransformerCfg.prim_path`) and the target frames are specified by the user as a list of - prim paths (:attr:`FrameTransformerCfg.target_frames`). - - The source frame and target frames are assumed to be rigid bodies. The transform of the target frames - with respect to the source frame is computed by first extracting the transform of the source frame - and target frames from the physics engine and then computing the relative transform between the two. - - Additionally, the user can specify an offset for the source frame and each target frame. This is useful - for specifying the transform of the desired frame with respect to the body's center of mass, for instance. - - A common example of using this sensor is to track the position and orientation of the end effector of a - robotic manipulator. In this case, the source frame would be the body corresponding to the base frame of the - manipulator, and the target frame would be the body corresponding to the end effector. Since the end-effector is - typically a fictitious body, the user may need to specify an offset from the end-effector to the body of the - manipulator. - - """ - - cfg: FrameTransformerCfg - """The configuration parameters.""" - - def __init__(self, cfg: FrameTransformerCfg): - """Initializes the frame transformer object. - - Args: - cfg: The configuration parameters. - """ - # initialize base class - super().__init__(cfg) - # Create empty variables for storing output data - self._data: FrameTransformerData = FrameTransformerData() - # Warp buffers used to store the frame transforms. - # Note we bind these buffers to the data fields in the _initialize_impl method. This way any changes - # made to the buffers are reflected in the data fields. - self._ALL_ENV_MASK = None - self._ENV_MASK = None - self._warp_source_pos_w = None - self._warp_source_quat_w = None - self._warp_source_transforms_w = None - self._warp_target_pos_w = None - self._warp_target_quat_w = None - self._warp_target_pos_source = None - self._warp_target_quat_source = None - self._warp_target_transform_source = None - - def __str__(self) -> str: - """Returns: A string containing information about the instance.""" - return ( - f"FrameTransformer @ '{self.cfg.prim_path}': \n" - f"\ttracked body frames: {[self._source_frame_body_name] + self._target_frame_body_names} \n" - f"\tnumber of envs: {self._num_envs}\n" - f"\tsource body frame: {self._source_frame_body_name}\n" - f"\ttarget frames (count: {self._target_frame_names}): {len(self._target_frame_names)}\n" - ) - - """ - Properties - """ - - @property - def data(self) -> FrameTransformerData: - # update sensors if needed - self._update_outdated_buffers() - # return the data - return self._data - - @property - def num_bodies(self) -> int: - """Returns the number of target bodies being tracked. - - Note: - This is an alias used for consistency with other sensors. Otherwise, we recommend using - :attr:`len(data.target_frame_names)` to access the number of target frames. - """ - return len(self._target_frame_body_names) - - @property - def body_names(self) -> list[str]: - """Returns the names of the target bodies being tracked. - - Note: - This is an alias used for consistency with other sensors. Otherwise, we recommend using - :attr:`data.target_frame_names` to access the target frame names. - """ - return self._target_frame_body_names - - """ - Operations - """ - - def reset(self, env_ids: Sequence[int] | None = None, env_mask: wp.array | None = None): - # reset the timers and counters - super().reset(env_ids) - # resolve None - if env_ids is None: - env_ids = ... - - def find_bodies(self, name_keys: str | Sequence[str], preserve_order: bool = False) -> tuple[list[int], list[str]]: - """Find bodies in the articulation based on the name keys. - - Args: - name_keys: A regular expression or a list of regular expressions to match the body names. - preserve_order: Whether to preserve the order of the name keys in the output. Defaults to False. - - Returns: - A tuple of lists containing the body indices and names. - """ - return string_utils.resolve_matching_names(name_keys, self._target_frame_names, preserve_order) - - """ - Implementation. - """ - - def _initialize_impl(self): - super()._initialize_impl() - - # Collect all target frames, their associated body prim paths and their offsets - frames = [target_frame.name for target_frame in self.cfg.target_frames] - frame_prim_paths = [target_frame.prim_path for target_frame in self.cfg.target_frames] - # First element is None because source frame offset is handled separately - frame_offsets = [target_frame.offset for target_frame in self.cfg.target_frames] - - # Loop through all the views attached to the Newton manager to find the source frame body - # and set view id and the id of the body within that view. - source_frame_prim_path = self.cfg.prim_path - frame_found = False - body_name = source_frame_prim_path.rsplit("/", 1)[-1] - for view_id, view in enumerate(NewtonManager.get_physics_sim_view()): - for body_id, view_body_name in enumerate(view.body_names): - if body_name == view_body_name: - self._warp_source_body_id = body_id - self._warp_source_view_id = view_id - # Convert the offset to a wp.transformf - self._warp_source_offset = wp.transformf( - self.cfg.source_frame_offset.pos[0], - self.cfg.source_frame_offset.pos[1], - self.cfg.source_frame_offset.pos[2], - self.cfg.source_frame_offset.rot[1], - self.cfg.source_frame_offset.rot[2], - self.cfg.source_frame_offset.rot[3], - self.cfg.source_frame_offset.rot[0], - ) - frame_found = True - break - if frame_found: - break - # Raise an error if the source frame body is not found - if not frame_found: - raise ValueError(f"Source frame '{body_name}' not found.") - self._source_frame_body_name = body_name - # Print the information about the source frame body - print("[INFO]: Initializing FrameTransformer!") - print(f"[INFO]: Using source body: {body_name} as reference frame.") - print(f"[INFO]: + Body found in view id: {self._warp_source_view_id}.") - print(f"[INFO]: + Body id in view {self._warp_source_view_id}: {self._warp_source_body_id}.") - - # Go through all the target frames and find the bodies matching the regex - matching_prims = [] - offsets = [] - self._target_frame_names = [] - for prim_path, offset, frame_name in zip(frame_prim_paths, frame_offsets, frames): - # Find the matching prims - prims = sim_utils.find_matching_prims(prim_path) - # Duplicate the target frame names if more than one prim is found - self._target_frame_names.extend([frame_name] * len(prims)) - # Add to the list all the matching prims - matching_prims.extend(prims) - # Duplicate the offsets if more than one prim is found - offsets.extend([offset] * len(prims)) - # Convert the matching prims to a list of prim paths - matching_prims = [prim.GetPath().pathString for prim in matching_prims] - - # Set the number of bodies/frames found in the scene - self._num_frames = len(matching_prims) - # Create a buffer to store the pose of the target frames - pose = torch.zeros((self._num_frames, 7), dtype=torch.float32, device=self._device) - # Set the default to identity transform - pose[:, -1] = 1.0 - - # Create a dictionary to store the body id, frame id and body name for each view - self._warp_view_body_id = {} - self._warp_view_frame_id = {} - self._warp_view_body_name = {} - self._target_frame_body_names = [] - - # Loop through all the matching prims and offsets to find the bodies in the scene - for frame_id, (prim_path, offset) in enumerate(zip(matching_prims, offsets)): - frame_found = False - # Go through all the views to find the requested bodies - body_name = prim_path.rsplit("/", 1)[-1] - for view_id, view in enumerate(NewtonManager.get_physics_sim_view()): - # Get the body names from the view - view_body_names = view.body_names - for body_id, view_body_name in enumerate(view_body_names): - if body_name == view_body_name: - frame_found = True - if view_id not in self._warp_view_body_id.keys(): - self._warp_view_body_id[view_id] = [body_id] - self._warp_view_frame_id[view_id] = [frame_id] - self._warp_view_body_name[view_id] = [body_name] - else: - self._warp_view_body_id[view_id].append(body_id) - self._warp_view_frame_id[view_id].append(frame_id) - self._warp_view_body_name[view_id].append(body_name) - self._target_frame_body_names.append(body_name) - break - if frame_found: - break - # If the offset is not None, then set the pose of the target frame - if offset is not None: - pose[frame_id, :3] = torch.tensor(offset.pos, device=self._device) - # Warp wants quaternions in xyzw order - pose[frame_id, 3] = offset.rot[1] - pose[frame_id, 4] = offset.rot[2] - pose[frame_id, 5] = offset.rot[3] - pose[frame_id, 6] = offset.rot[0] - # Raise an error if the frame is not found - if not frame_found: - raise ValueError(f"Frame '{body_name}' found.") - # Set the target frame names - self._data.target_frame_names = self._target_frame_names - print(f"[INFO]: Found {self._num_frames} target frames.") - for key in self._warp_view_body_name: - print(f"[INFO]: + Found {len(self._warp_view_body_name[key])} bodies in view {key}.") - for body_name, body_id, frame_id in zip( - self._warp_view_body_name[key], self._warp_view_body_id[key], self._warp_view_frame_id[key] - ): - print(f"[INFO]: + Found {body_name} in view {key} with body id {body_id} and frame id {frame_id}.") - print("[INFO]: FrameTransformer initialized!") - # Convert the pose to a wp.array - self._warp_offset_buffer = wp.from_torch(pose, dtype=wp.transformf) - - for key in self._warp_view_body_id.keys(): - self._warp_view_body_id[key] = wp.array(self._warp_view_body_id[key], dtype=wp.int32) - for key in self._warp_view_frame_id.keys(): - self._warp_view_frame_id[key] = wp.array(self._warp_view_frame_id[key], dtype=wp.int32) - - # Populate the warp buffers - self._ALL_ENV_MASK = wp.ones((self._num_envs,), dtype=bool, device=self._device) - self._ENV_MASK = wp.zeros((self._num_envs,), dtype=bool, device=self._device) - self._warp_source_pos_w = wp.zeros((self._num_envs,), dtype=wp.vec3f, device=self._device) - self._warp_source_quat_w = wp.zeros((self._num_envs,), dtype=wp.vec4f, device=self._device) - self._warp_source_transforms_w = wp.zeros((self._num_envs,), dtype=wp.transformf, device=self._device) - self._warp_target_pos_w = wp.zeros( - ( - self._num_envs, - self._num_frames, - ), - dtype=wp.vec3f, - device=self._device, - ) - self._warp_target_quat_w = wp.zeros( - ( - self._num_envs, - self._num_frames, - ), - dtype=wp.vec4f, - device=self._device, - ) - self._warp_target_pos_source = wp.zeros_like(self._warp_target_pos_w, device=self._device) - self._warp_target_quat_source = wp.zeros_like(self._warp_target_quat_w, device=self._device) - self._warp_target_transform_source = wp.zeros( - ( - self._num_envs, - self._num_frames, - ), - dtype=wp.transformf, - device=self._device, - ) - # Bindings with dataclass - self._data.source_pos_w = wp.to_torch(self._warp_source_pos_w) - self._data.source_quat_w = wp.to_torch(self._warp_source_quat_w) - self._data.target_pos_w = wp.to_torch(self._warp_target_pos_w) - self._data.target_quat_w = wp.to_torch(self._warp_target_quat_w) - self._data.target_pos_source = wp.to_torch(self._warp_target_pos_source) - self._data.target_quat_source = wp.to_torch(self._warp_target_quat_source) - # Bind with Newton: - self._warp_views = {} - for key in self._warp_view_body_id: - view = NewtonManager.get_physics_sim_view()[key].get_link_transforms(NewtonManager.get_state_0())[:, 0] - if len(view.shape) == 1: - view = view.reshape((-1, 1)) - self._warp_views[key] = view - - def _update_buffers_impl(self, env_ids: Sequence[int]): - """Fills the buffers of the sensor data.""" - # default to all sensors, create a mask for the environment ids if needed - if (len(env_ids) == self._num_envs) or (env_ids is None): - env_mask = self._ALL_ENV_MASK - else: - self._ENV_MASK.fill_(False) - wp.launch( - set_env_mask, - dim=len(env_ids), - inputs=[ - self._ENV_MASK, - env_ids.to(torch.int32), - ], - ) - env_mask = self._ENV_MASK - - # Update the source frame transform - wp.launch( - update_source_transform, - dim=self._num_envs, - inputs=[ - self._warp_source_offset, - self._warp_source_body_id, - self._warp_views[self._warp_source_view_id], - self._warp_source_transforms_w, - self._warp_source_pos_w, - self._warp_source_quat_w, - env_mask, - ], - ) - # Update the frame transforms in the origin frame and the world frame - for view_id in self._warp_view_body_id.keys(): - wp.launch( - update_frame_transforms, - dim=(self._num_envs, self._warp_view_frame_id[view_id].shape[0]), - inputs=[ - self._warp_offset_buffer, - self._warp_source_transforms_w, - self._warp_views[view_id], - self._warp_target_transform_source, - self._warp_target_pos_w, - self._warp_target_quat_w, - self._warp_target_pos_source, - self._warp_target_quat_source, - self._warp_view_frame_id[view_id], - self._warp_view_body_id[view_id], - env_mask, - ], - ) - - # def _set_debug_vis_impl(self, debug_vis: bool): - # # set visibility of markers - # # note: parent only deals with callbacks. not their visibility - # if debug_vis: - # if not hasattr(self, "frame_visualizer"): - # self.frame_visualizer = VisualizationMarkers(self.cfg.visualizer_cfg) - - # # set their visibility to true - # self.frame_visualizer.set_visibility(True) - # else: - # if hasattr(self, "frame_visualizer"): - # self.frame_visualizer.set_visibility(False) - - # def _debug_vis_callback(self, event): - # return - # # Get the all frames pose - # frames_pos = torch.cat([self._data.source_pos_w, self._data.target_pos_w.view(-1, 3)], dim=0) - # frames_quat = torch.cat([self._data.source_quat_w, self._data.target_quat_w.view(-1, 4)], dim=0) - - # # Get the all connecting lines between frames pose - # lines_pos, lines_quat, lines_length = self._get_connecting_lines( - # start_pos=self._data.source_pos_w.repeat_interleave(self._data.target_pos_w.size(1), dim=0), - # end_pos=self._data.target_pos_w.view(-1, 3), - # ) - - # # Initialize default (identity) scales and marker indices for all markers (frames + lines) - # marker_scales = torch.ones(frames_pos.size(0) + lines_pos.size(0), 3) - # marker_indices = torch.zeros(marker_scales.size(0)) - - # # Set the z-scale of line markers to represent their actual length - # marker_scales[-lines_length.size(0) :, -1] = lines_length - - # # Assign marker config index 1 to line markers - # marker_indices[-lines_length.size(0) :] = 1 - - # # Update the frame and the connecting line visualizer - # self.frame_visualizer.visualize( - # translations=torch.cat((frames_pos, lines_pos), dim=0), - # orientations=torch.cat((frames_quat, lines_quat), dim=0), - # scales=marker_scales, - # marker_indices=marker_indices, - # ) - - """ - Internal simulation callbacks. - """ - - def _invalidate_initialize_callback(self, event): - """Invalidates the scene elements.""" - # call parent - super()._invalidate_initialize_callback(event) - # set all existing views to None to invalidate them - self._frame_physx_view = None - - """ - Internal helpers. - """ - - def _get_connecting_lines( - self, start_pos: torch.Tensor, end_pos: torch.Tensor - ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: - """ - Given start and end points, compute the positions (mid-point), orientations, and lengths of the connecting lines. - - Args: - start_pos: The start positions of the connecting lines. Shape is (N, 3). - end_pos: The end positions of the connecting lines. Shape is (N, 3). - - Returns: - positions: The position of each connecting line. Shape is (N, 3). - orientations: The orientation of each connecting line in quaternion. Shape is (N, 4). - lengths: The length of each connecting line. Shape is (N,). - """ - direction = end_pos - start_pos - lengths = torch.norm(direction, dim=-1) - positions = (start_pos + end_pos) / 2 - - # Get default direction (along z-axis) - default_direction = torch.tensor([0.0, 0.0, 1.0], device=self.device).expand(start_pos.size(0), -1) - - # Normalize direction vector - direction_norm = normalize(direction) - - # Calculate rotation from default direction to target direction - rotation_axis = torch.linalg.cross(default_direction, direction_norm) - rotation_axis_norm = torch.norm(rotation_axis, dim=-1) + from isaaclab_newton.sensors.frame_transformer import FrameTransformer as NewtonFrameTransformer - # Handle case where vectors are parallel - mask = rotation_axis_norm > 1e-6 - rotation_axis = torch.where( - mask.unsqueeze(-1), - normalize(rotation_axis), - torch.tensor([1.0, 0.0, 0.0], device=self.device).expand(start_pos.size(0), -1), - ) - # Calculate rotation angle - cos_angle = torch.sum(default_direction * direction_norm, dim=-1) - cos_angle = torch.clamp(cos_angle, -1.0, 1.0) - angle = torch.acos(cos_angle) - orientations = quat_from_angle_axis(angle, rotation_axis) +class FrameTransformer(FactoryBase): + """Factory for creating frame transformer sensor instances.""" - return positions, orientations, lengths + def __new__(cls, *args, **kwargs) -> BaseFrameTransformer | NewtonFrameTransformer: + """Create a new instance of a frame transformer based on the backend.""" + return super().__new__(cls, *args, **kwargs) diff --git a/source/isaaclab/isaaclab/sensors/frame_transformer/frame_transformer_data.py b/source/isaaclab/isaaclab/sensors/frame_transformer/frame_transformer_data.py index 7ce9b0f436d6..2b258e7192ef 100644 --- a/source/isaaclab/isaaclab/sensors/frame_transformer/frame_transformer_data.py +++ b/source/isaaclab/isaaclab/sensors/frame_transformer/frame_transformer_data.py @@ -3,54 +3,23 @@ # # SPDX-License-Identifier: BSD-3-Clause -import torch -from dataclasses import dataclass +from __future__ import annotations +from typing import TYPE_CHECKING -@dataclass -class FrameTransformerData: - """Data container for the frame transformer sensor.""" +from isaaclab.utils.backend_utils import FactoryBase - target_frame_names: list[str] = None - """Target frame names (this denotes the order in which that frame data is ordered). +from .base_frame_transformer_data import BaseFrameTransformerData - The frame names are resolved from the :attr:`FrameTransformerCfg.FrameCfg.name` field. - This does not necessarily follow the order in which the frames are defined in the config due to - the regex matching. - """ +if TYPE_CHECKING: + from isaaclab_newton.sensors.frame_transformer.frame_transformer_data import ( + FrameTransformerData as NewtonFrameTransformerData, + ) - target_pos_source: torch.Tensor = None - """Position of the target frame(s) relative to source frame. - Shape is (N, M, 3), where N is the number of environments, and M is the number of target frames. - """ +class FrameTransformerData(FactoryBase): + """Factory for creating frame transformer data instances.""" - target_quat_source: torch.Tensor = None - """Orientation of the target frame(s) relative to source frame quaternion (w, x, y, z). - - Shape is (N, M, 4), where N is the number of environments, and M is the number of target frames. - """ - - target_pos_w: torch.Tensor = None - """Position of the target frame(s) after offset (in world frame). - - Shape is (N, M, 3), where N is the number of environments, and M is the number of target frames. - """ - - target_quat_w: torch.Tensor = None - """Orientation of the target frame(s) after offset (in world frame) quaternion (w, x, y, z). - - Shape is (N, M, 4), where N is the number of environments, and M is the number of target frames. - """ - - source_pos_w: torch.Tensor = None - """Position of the source frame after offset (in world frame). - - Shape is (N, 3), where N is the number of environments. - """ - - source_quat_w: torch.Tensor = None - """Orientation of the source frame after offset (in world frame) quaternion (w, x, y, z). - - Shape is (N, 4), where N is the number of environments. - """ + def __new__(cls, *args, **kwargs) -> BaseFrameTransformerData | NewtonFrameTransformerData: + """Create a new instance of frame transformer data based on the backend.""" + return super().__new__(cls, *args, **kwargs) diff --git a/source/isaaclab/isaaclab/sim/scene_data_providers/physx_scene_data_provider.py b/source/isaaclab/isaaclab/sim/scene_data_providers/physx_scene_data_provider.py index 84eae7b5b85b..877795276b90 100644 --- a/source/isaaclab/isaaclab/sim/scene_data_providers/physx_scene_data_provider.py +++ b/source/isaaclab/isaaclab/sim/scene_data_providers/physx_scene_data_provider.py @@ -103,7 +103,7 @@ def __init__(self, visualizer_cfgs: list[Any] | None, stage, simulation_context) self._rigid_body_paths: list[str] = [] self._articulation_paths: list[str] = [] self._set_body_q_kernel = None - # env_id -> list of body indices (in Newton body_key order) + # env_id -> list of body indices (in Newton body_label order) self._env_id_to_body_indices: dict[int, list[int]] = {} # Initialize Newton pipeline only if needed for visualization @@ -147,8 +147,8 @@ def _build_newton_model_from_usd(self) -> None: self._newton_state = self._newton_model.state() # Extract scene structure from Newton model (single source of truth) - self._rigid_body_paths = list(self._newton_model.body_key) - self._articulation_paths = list(self._newton_model.articulation_key) + self._rigid_body_paths = list(self._newton_model.body_label) + self._articulation_paths = list(self._newton_model.articulation_label) self._xform_views.clear() self._view_body_index_map = {} @@ -186,7 +186,7 @@ def _build_filtered_newton_model(self, env_ids: list[int]) -> None: self._filtered_newton_state = self._filtered_newton_model.state() full_index_by_path = {path: i for i, path in enumerate(self._rigid_body_paths)} - filtered_paths = list(self._filtered_newton_model.body_key) + filtered_paths = list(self._filtered_newton_model.body_label) self._filtered_body_indices = [] missing = [] for path in filtered_paths: @@ -278,7 +278,7 @@ def _get_view_world_poses(self, view): return None, None def _cache_view_index_map(self, view, key: str) -> None: - """Map PhysX view indices to Newton body_key ordering.""" + """Map PhysX view indices to Newton body_label ordering.""" prim_paths = getattr(view, "prim_paths", None) if not prim_paths or not self._rigid_body_paths: return @@ -408,7 +408,7 @@ def _read_poses_from_best_source(self) -> tuple[Any, Any, str, Any] | None: num_bodies = len(self._rigid_body_paths) if num_bodies != self._newton_state.body_q.shape[0]: - logger.warning(f"Body count mismatch: body_key={num_bodies}, state={self._newton_state.body_q.shape[0]}") + logger.warning(f"Body count mismatch: body_label={num_bodies}, state={self._newton_state.body_q.shape[0]}") return None # Allocate outputs in Newton body order. diff --git a/source/isaaclab/isaaclab/utils/warp/math_ops.py b/source/isaaclab/isaaclab/utils/warp/math_ops.py index 1cb0f264c1b0..7eafb75b20a2 100644 --- a/source/isaaclab/isaaclab/utils/warp/math_ops.py +++ b/source/isaaclab/isaaclab/utils/warp/math_ops.py @@ -291,3 +291,26 @@ def inplace_square(x: Any): source_ndims = get_ndims(x) # Fetch the appropriate kernel from the cache and launch it wp.launch(_get_square_kernel(source_ndims), dim=x.shape, inputs=[x]) + + +def transform_to_vec_quat( + t: wp.array, +) -> tuple[wp.array, wp.array]: + """Split a wp.transformf array into position (vec3f) and quaternion (quatf) arrays. + + Zero-copy: returns views into the same underlying memory. + + Args: + t: Array of transforms (dtype=wp.transformf). Shape ``(N,)``, ``(N, M)``, or ``(N, M, K)``. + + Returns: + Tuple of (positions, quaternions) as warp array views with matching dimensionality. + """ + floats = t.view(wp.float32) + if t.ndim == 1: + return floats[:, :3].view(wp.vec3f), floats[:, 3:].view(wp.quatf) + if t.ndim == 2: + return floats[:, :, :3].view(wp.vec3f), floats[:, :, 3:].view(wp.quatf) + if t.ndim == 3: + return floats[:, :, :, :3].view(wp.vec3f), floats[:, :, :, 3:].view(wp.quatf) + raise ValueError(f"Expected 1D, 2D, or 3D transform array, got ndim={t.ndim}") diff --git a/source/isaaclab_newton/isaaclab_newton/assets/articulation/articulation.py b/source/isaaclab_newton/isaaclab_newton/assets/articulation/articulation.py index d8f961138a88..73b3a5c05533 100644 --- a/source/isaaclab_newton/isaaclab_newton/assets/articulation/articulation.py +++ b/source/isaaclab_newton/isaaclab_newton/assets/articulation/articulation.py @@ -2278,7 +2278,7 @@ def _process_parameter_override(self): # 1D flattened array param = getattr(NewtonManager.get_model(), param_name) # Search over all bodies as organized in the environment - body_subset = NewtonManager.get_model().body_key + body_subset = NewtonManager.get_model().body_label param_expr = ".*" if param_expr is None else param_expr indices, _ = string_utils.resolve_matching_names(param_expr, body_subset, False) indices = wp.array(indices, dtype=wp.int32, device=self.device) @@ -2296,7 +2296,7 @@ def _process_parameter_override(self): # 1D flattened array param = getattr(NewtonManager.get_model(), param_name) # Search over all shapes as organized in the environment - all_shapes = NewtonManager.get_model().shape_key + all_shapes = NewtonManager.get_model().shape_label param_expr = ".*" if param_expr is None else param_expr indices, _ = string_utils.resolve_matching_names(param_expr, all_shapes, False) indices = wp.array(indices, dtype=wp.int32, device=self.device) @@ -2304,7 +2304,7 @@ def _process_parameter_override(self): # 1D flattened array param = getattr(NewtonManager.get_model(), param_name) # Search over all joints as organized in the environment - all_joints = NewtonManager.get_model().joint_key + all_joints = NewtonManager.get_model().joint_label param_expr = ".*" if param_expr is None else param_expr indices, _ = string_utils.resolve_matching_names(param_expr, all_joints, False) indices = wp.array(indices, dtype=wp.int32, device=self.device) diff --git a/source/isaaclab_newton/isaaclab_newton/assets/articulation/articulation_data.py b/source/isaaclab_newton/isaaclab_newton/assets/articulation/articulation_data.py index ebdf9bf19dd3..0d6a9d022e2b 100644 --- a/source/isaaclab_newton/isaaclab_newton/assets/articulation/articulation_data.py +++ b/source/isaaclab_newton/isaaclab_newton/assets/articulation/articulation_data.py @@ -2182,16 +2182,10 @@ def _create_simulation_bindings(self) -> None: n_dof = self._root_view.joint_dof_count # -- root properties - if self._root_view.is_fixed_base: - self._sim_bind_root_link_pose_w = self._root_view.get_root_transforms(NewtonManager.get_state_0())[:, 0, 0] - else: - self._sim_bind_root_link_pose_w = self._root_view.get_root_transforms(NewtonManager.get_state_0())[:, 0] + self._sim_bind_root_link_pose_w = self._root_view.get_root_transforms(NewtonManager.get_state_0())[:, 0] self._sim_bind_root_com_vel_w = self._root_view.get_root_velocities(NewtonManager.get_state_0()) if self._sim_bind_root_com_vel_w is not None: - if self._root_view.is_fixed_base: - self._sim_bind_root_com_vel_w = self._sim_bind_root_com_vel_w[:, 0, 0] - else: - self._sim_bind_root_com_vel_w = self._sim_bind_root_com_vel_w[:, 0] + self._sim_bind_root_com_vel_w = self._sim_bind_root_com_vel_w[:, 0] # -- body properties self._sim_bind_body_com_pos_b = self._root_view.get_attribute("body_com", NewtonManager.get_model())[:, 0] self._sim_bind_body_link_pose_w = self._root_view.get_link_transforms(NewtonManager.get_state_0())[:, 0] diff --git a/source/isaaclab_newton/isaaclab_newton/assets/rigid_object/rigid_object.py b/source/isaaclab_newton/isaaclab_newton/assets/rigid_object/rigid_object.py index e7e4d16a0fd2..5a7efdacc40e 100644 --- a/source/isaaclab_newton/isaaclab_newton/assets/rigid_object/rigid_object.py +++ b/source/isaaclab_newton/isaaclab_newton/assets/rigid_object/rigid_object.py @@ -936,7 +936,7 @@ def _process_parameter_override(self): # 1D flattened array param = getattr(NewtonManager.get_model(), param_name) # Search over all bodies as organized in the environment - body_subset = NewtonManager.get_model().body_key + body_subset = NewtonManager.get_model().body_label param_expr = ".*" if param_expr is None else param_expr indices, _ = string_utils.resolve_matching_names(param_expr, body_subset, False) indices = wp.array(indices, dtype=wp.int32, device=self.device) @@ -944,7 +944,7 @@ def _process_parameter_override(self): # 1D flattened array param = getattr(NewtonManager.get_model(), param_name) # Search over all shapes as organized in the environment - all_shapes = NewtonManager.get_model().shape_key + all_shapes = NewtonManager.get_model().shape_label param_expr = ".*" if param_expr is None else param_expr indices, _ = string_utils.resolve_matching_names(param_expr, all_shapes, False) indices = wp.array(indices, dtype=wp.int32, device=self.device) @@ -952,7 +952,7 @@ def _process_parameter_override(self): # 1D flattened array param = getattr(NewtonManager.get_model(), param_name) # Search over all joints as organized in the environment - all_joints = NewtonManager.get_model().joint_key + all_joints = NewtonManager.get_model().joint_label param_expr = ".*" if param_expr is None else param_expr indices, _ = string_utils.resolve_matching_names(param_expr, all_joints, False) indices = wp.array(indices, dtype=wp.int32, device=self.device) diff --git a/source/isaaclab_newton/isaaclab_newton/cloner/newton_replicate.py b/source/isaaclab_newton/isaaclab_newton/cloner/newton_replicate.py index 1777d01be7cc..93039791482e 100644 --- a/source/isaaclab_newton/isaaclab_newton/cloner/newton_replicate.py +++ b/source/isaaclab_newton/isaaclab_newton/cloner/newton_replicate.py @@ -11,6 +11,30 @@ from pxr import Usd, UsdGeom +def _proto_env_mappings( + sources: list[str], + destinations: list[str], + mapping: torch.Tensor, + env_ids: torch.Tensor, +) -> list[tuple[str, str, dict[int, int]]]: + """Map each prototype source to its destination template and per-world env IDs. + + Returns one ``(src_prefix, dest_template, world_to_env)`` tuple per source, + where *world_to_env* maps Newton world IDs to environment IDs for worlds + that contain that source. Used by both :func:`_cl_inject_sites` (to + translate sensor body patterns into prototype-local paths) and the rename + loop (to rewrite labels from prototype paths to per-env paths). + """ + result: list[tuple[str, str, dict[int, int]]] = [] + for i, src_path in enumerate(sources): + src_prefix = src_path.rstrip("/") + dest_template = destinations[i] + world_cols = torch.nonzero(mapping[i], as_tuple=True)[0].tolist() + world_to_env = {c: int(env_ids[c]) for c in world_cols} + result.append((src_prefix, dest_template, world_to_env)) + return result + + def newton_replicate( stage: Usd.Stage, sources: list[str], @@ -33,13 +57,13 @@ def newton_replicate( quaternions[:, 3] = 1.0 # load empty stage - builder = ModelBuilder(up_axis=up_axis) + builder = NewtonManager.create_builder(up_axis=up_axis) stage_info = builder.add_usd(stage, ignore_paths=["/World/envs"] + sources) # build a prototype for each source - protos: dict[str, ModelBuilder] = {} + proto_builders: dict[str, ModelBuilder] = {} for src_path in sources: - p = ModelBuilder(up_axis=up_axis) + p = NewtonManager.create_builder(up_axis=up_axis) solvers.SolverMuJoCo.register_custom_attributes(p) inverse_env_xform = get_inverse_env_xform(stage, src_path) p.add_usd( @@ -51,39 +75,34 @@ def newton_replicate( ) if simplify_meshes: p.approximate_meshes("convex_hull", keep_visual_shapes=True) - protos[src_path] = p + proto_builders[src_path] = p + + # Shared mapping used by both site injection and renaming + proto_env_map = _proto_env_mappings(sources, destinations, mapping, env_ids) + + # Inject registered sites into prototypes (and global sites into main builder) + NewtonManager._cl_inject_sites(builder, proto_builders, proto_env_map) # create a separate world for each environment (heterogeneous spawning) - # Newton assigns sequential world IDs (0, 1, 2, ...), so we need to track the mapping - newton_world_to_env_id = {} for col, env_id in enumerate(env_ids.tolist()): - # begin a new world context (Newton assigns world ID = col) builder.begin_world() - newton_world_to_env_id[col] = env_id - # add all active sources for this world for row in torch.nonzero(mapping[:, col], as_tuple=True)[0].tolist(): builder.add_builder( - protos[sources[row]], + proto_builders[sources[row]], xform=wp.transform(positions[col].tolist(), quaternions[col].tolist()), ) - # end the world context builder.end_world() - # per-source, per-world renaming (strict prefix swap), compact style preserved - for i, src_path in enumerate(sources): - src_prefix_len = len(src_path.rstrip("/")) - swap = lambda name, new_root: new_root + name[src_prefix_len:] # noqa: E731 - world_cols = torch.nonzero(mapping[i], as_tuple=True)[0].tolist() - # Map Newton world IDs (sequential) to destination paths using env_ids - world_roots = {int(env_ids[c]): destinations[i].format(int(env_ids[c])) for c in world_cols} - + # per-source, per-world renaming (strict prefix swap) + for src_prefix, dest_template, world_to_env in proto_env_map: + src_len = len(src_prefix) for t in ("body", "joint", "shape", "articulation"): - keys, worlds_arr = getattr(builder, f"{t}_key"), getattr(builder, f"{t}_world") + keys, worlds_arr = getattr(builder, f"{t}_label"), getattr(builder, f"{t}_world") for k, w in enumerate(worlds_arr): - if w in world_roots and keys[k].startswith(src_path): - keys[k] = swap(keys[k], world_roots[w]) + if w in world_to_env and keys[k].startswith(src_prefix): + keys[k] = dest_template.format(world_to_env[w]) + keys[k][src_len:] NewtonManager.set_builder(builder) NewtonManager._num_envs = mapping.size(1) diff --git a/source/isaaclab_newton/isaaclab_newton/physics/newton_manager.py b/source/isaaclab_newton/isaaclab_newton/physics/newton_manager.py index c658836ff94c..7394b427247b 100644 --- a/source/isaaclab_newton/isaaclab_newton/physics/newton_manager.py +++ b/source/isaaclab_newton/isaaclab_newton/physics/newton_manager.py @@ -10,15 +10,17 @@ import logging import numpy as np import re +import time from typing import TYPE_CHECKING import warp as wp -from newton import Axis, BroadPhaseMode, CollisionPipeline, Contacts, Control, Model, ModelBuilder, State, eval_fk -from newton.sensors import SensorContact as NewtonContactSensor +from newton import Axis, CollisionPipeline, Contacts, Control, Model, ModelBuilder, State, eval_fk +from newton.sensors import SensorContact, SensorFrameTransform from newton.solvers import SolverBase, SolverFeatherstone, SolverMuJoCo, SolverNotifyFlags, SolverXPBD from isaaclab.physics import PhysicsEvent, PhysicsManager from isaaclab.sim.utils.stage import get_current_stage +from isaaclab.utils.string import resolve_matching_names from isaaclab.utils.timer import Timer if TYPE_CHECKING: @@ -27,19 +29,6 @@ logger = logging.getLogger(__name__) -def flipped_match(x: str, y: str) -> bool: - """Flipped match function for contact partner matching. - - Args: - x: The body/shape name in the simulation. - y: The body/shape name in the contact view. - - Returns: - True if the body/shape name matches the contact view pattern. - """ - return re.match(y, x) is not None - - class NewtonManager(PhysicsManager): """Newton physics manager for Isaac Lab. @@ -72,7 +61,8 @@ class NewtonManager(PhysicsManager): _contacts: Contacts | None = None _needs_collision_pipeline: bool = False _collision_pipeline = None - _newton_contact_sensors: dict = {} # Maps sensor_key to NewtonContactSensor + _newton_contact_sensors: dict = {} # Maps sensor_key to SensorContact + _newton_frame_transform_sensors: list = [] # List of SensorFrameTransform _report_contacts: bool = False # CUDA graphing @@ -87,6 +77,11 @@ class NewtonManager(PhysicsManager): # Model changes (callbacks use unified system from PhysicsManager) _model_changes: set[int] = set() + # Pending site requests from sensors. + # Key: (body_pattern, xform_floats), Value: (label, wp.transform) + # identical (body_pattern, transform) reuses the same site. + _cl_pending_sites: dict[tuple[str | None, tuple[float, ...]], tuple[str, wp.transform]] = {} + # Views list for assets to register their views _views: list = [] @@ -144,6 +139,7 @@ def step(cls) -> None: wp.capture_launch(cls._graph) # type: ignore[arg-type] else: cls._simulate() + # time.sleep(0.005) # Debug convergence info if cfg is not None and cfg.debug_mode: # type: ignore[union-attr] @@ -190,12 +186,14 @@ def clear(cls): cls._needs_collision_pipeline = False cls._collision_pipeline = None cls._newton_contact_sensors = {} + cls._newton_frame_transform_sensors = [] cls._report_contacts = False cls._graph = None cls._newton_stage_path = None cls._usdrt_stage = None cls._up_axis = "Z" cls._model_changes = set() + cls._cl_pending_sites = {} cls._views = [] @classmethod @@ -203,6 +201,124 @@ def set_builder(cls, builder: ModelBuilder) -> None: """Set the Newton model builder.""" cls._builder = builder + @classmethod + def create_builder(cls, up_axis: str | None = None, **kwargs) -> ModelBuilder: + """Create a :class:`ModelBuilder` configured with default settings. + + Args: + up_axis: Override for the up-axis. Defaults to ``None``, which uses + the manager's ``_up_axis``. + **kwargs: Forwarded to :class:`ModelBuilder`. + + Returns: + New builder with up-axis and contact margin defaults applied. + """ + builder = ModelBuilder(up_axis=up_axis or cls._up_axis, **kwargs) + builder.default_shape_cfg.contact_margin = 0.01 + return builder + + @classmethod + def cl_register_site(cls, body_pattern: str | None, xform: wp.transform) -> str: + """Register a site request for injection into prototypes before replication. + + Sensors call this during ``__init__``. Sites are injected into prototype + builders by :meth:`_cl_inject_sites` (called from ``newton_replicate``) + before ``add_builder``, so they replicate correctly per-world. + + Identical ``(body_pattern, transform)`` registrations share sites. + + Args: + body_pattern: Env-regex body path + (e.g. ``"/World/envs/env_.*/Robot/link0"``), or ``None`` for + global sites (world-origin reference, etc.). + xform: Site transform relative to body. + + Returns: + Assigned site label suffix. + """ + xform_key = tuple(xform) + key = (body_pattern, xform_key) + if key in cls._cl_pending_sites: + return cls._cl_pending_sites[key][0] + label = f"ft_{len(cls._cl_pending_sites)}" + cls._cl_pending_sites[key] = (label, xform) + return label + + @classmethod + def _cl_inject_sites( + cls, + main_builder: ModelBuilder, + proto_builders: dict[str, ModelBuilder], + proto_env_map: list[tuple[str, str, dict[int, int]]], + ) -> None: + """Inject registered sites into prototype builders before replication. + + Non-global sites are matched against prototype body labels via prefix + translation (dest-regex → src prefix). Global sites (``body_pattern is + None``) are added to *main_builder* with ``body=-1``. + + Pending requests are cleared after processing. + + Args: + main_builder: Top-level builder that receives global sites. + proto_builders: ``{src_path: ModelBuilder}`` prototype builders. + proto_env_map: Output of :func:`_proto_env_mappings` — list of + ``(src_prefix, dest_template, world_to_env)`` tuples. + """ + if not cls._cl_pending_sites: + return + + for (body_pattern, _xform_key), (label, xform) in cls._cl_pending_sites.items(): + if body_pattern is None: + main_builder.add_site(body=-1, xform=xform, label=label) + continue + + for src_prefix, dest_template, _ in proto_env_map: + dest_prefix = dest_template.replace("{}", ".*").rstrip("/") + + if not body_pattern.startswith(dest_prefix): + continue + + suffix = body_pattern[len(dest_prefix) :] + proto_body_path = src_prefix + suffix + + proto = proto_builders.get(src_prefix) or proto_builders.get(src_prefix + "/") + if proto is None: + continue + + for body_idx, body_label in enumerate(proto.body_label): + if body_label.rstrip("/") == proto_body_path.rstrip("/"): + site_label = f"{body_label}/{label}" + proto.add_site(body=body_idx, xform=xform, label=site_label) + logger.debug(f"Injected site '{site_label}' into prototype") + break + + cls._cl_pending_sites.clear() + + @classmethod + def _cl_inject_sites_fallback(cls) -> None: + """Inject pending sites into the flat builder (no-replication path). + + Uses regex matching against ``_builder.body_label`` instead of + prefix translation through prototypes. + """ + if not cls._cl_pending_sites: + return + + builder = cls._builder + body_labels = list(builder.body_label) + + for (body_pattern, _xform_key), (label, xform) in cls._cl_pending_sites.items(): + if body_pattern is None: + builder.add_site(body=-1, xform=xform, label=label) + else: + for body_idx, body_label in enumerate(body_labels): + if re.fullmatch(body_pattern, body_label): + site_label = f"{body_label}/{label}" + builder.add_site(body=body_idx, xform=xform, label=site_label) + + cls._cl_pending_sites.clear() + @classmethod def add_model_change(cls, change: SolverNotifyFlags) -> None: """Register a model change to notify the solver.""" @@ -225,11 +341,13 @@ def start_simulation(cls) -> None: logger.info("Dispatching MODEL_INIT callbacks") cls.dispatch_event(PhysicsEvent.MODEL_INIT) + # Inject any pending site requests (no-replication fallback path). + # In the replication path, _cl_inject_sites() already ran from newton_replicate. + cls._cl_inject_sites_fallback() + device = PhysicsManager._device logger.info(f"Finalizing model on device: {device}") cls._builder.up_axis = Axis.from_string(cls._up_axis) - # Set smaller contact margin for manipulation examples (default 10cm is too large) - cls._builder.default_shape_cfg.contact_margin = 0.01 with Timer(name="newton_finalize_builder", msg="Finalize builder took:", enable=True, format="ms"): cls._model = cls._builder.finalize(device=device) cls._model.set_gravity(cls._gravity_vector) @@ -248,7 +366,7 @@ def start_simulation(cls) -> None: import usdrt cls._usdrt_stage = get_current_stage(fabric=True) - for i, prim_path in enumerate(cls._model.body_key): + for i, prim_path in enumerate(cls._model.body_label): prim = cls._usdrt_stage.GetPrimAtPath(prim_path) prim.CreateAttribute(cls._newton_index_attr, usdrt.Sdf.ValueTypeNames.UInt, True) prim.GetAttribute(cls._newton_index_attr).Set(i) @@ -263,7 +381,7 @@ def instantiate_builder_from_stage(cls): stage = get_current_stage() up_axis = UsdGeom.GetStageUpAxis(stage) - builder = ModelBuilder(up_axis=up_axis) + builder = cls.create_builder(up_axis=up_axis) builder.add_usd(stage) cls.set_builder(builder) @@ -277,7 +395,7 @@ def _initialize_contacts(cls) -> None: if cls._needs_collision_pipeline: # Newton collision pipeline: create pipeline and generate contacts if cls._collision_pipeline is None: - cls._collision_pipeline = CollisionPipeline(cls._model, broad_phase_mode=BroadPhaseMode.EXPLICIT) + cls._collision_pipeline = CollisionPipeline(cls._model, broad_phase="explicit") if cls._contacts is None: cls._contacts = cls._collision_pipeline.contacts() @@ -393,6 +511,11 @@ def step_fn(state_0, state_1): cls._state_0, cls._state_1 = cls._state_1, cls._state_0 cls._state_0.clear_forces() + # Update frame transform sensors + if cls._newton_frame_transform_sensors: + for sensor in cls._newton_frame_transform_sensors: + sensor.update(cls._state_0) + # Populate contacts for contact sensors if cls._report_contacts: # For newton_contacts (unified pipeline): use locally computed contacts @@ -400,7 +523,7 @@ def step_fn(state_0, state_1): eval_contacts = contacts if contacts is not None else cls._contacts cls._solver.update_contacts(eval_contacts, cls._state_0) for sensor in cls._newton_contact_sensors.values(): - sensor.eval(eval_contacts) + sensor.update(eval_contacts) @classmethod def get_solver_convergence_steps(cls) -> dict[str, float | int]: @@ -453,18 +576,21 @@ def add_contact_sensor( contact_partners_shape_expr: str | list[str] | None = None, prune_noncolliding: bool = True, verbose: bool = False, - ) -> None: + ) -> tuple: """Add a contact sensor for reporting contacts between bodies/shapes. - Note: Only one contact sensor can be active at a time. + Regex expressions are resolved to ``list[int]`` indices before being forwarded to the Newton sensor. Args: - body_names_expr: Expression for body names to sense. - shape_names_expr: Expression for shape names to sense. - contact_partners_body_expr: Expression for contact partner body names. - contact_partners_shape_expr: Expression for contact partner shape names. + body_names_expr: Regex expression for body names to sense. + shape_names_expr: Regex expression for shape names to sense. + contact_partners_body_expr: Regex expression for contact partner body names. + contact_partners_shape_expr: Regex expression for contact partner shape names. prune_noncolliding: Make force matrix sparse using collision pairs. verbose: Print verbose information. + + Returns: + Hashable sensor key for looking up the sensor in :attr:`_newton_contact_sensors`. """ # Validate inputs if body_names_expr is None and shape_names_expr is None: @@ -474,23 +600,46 @@ def add_contact_sensor( if contact_partners_body_expr is not None and contact_partners_shape_expr is not None: raise ValueError("Only one of contact_partners_body_expr or contact_partners_shape_expr must be provided") + # Resolve regex expressions to integer indices + body_indices = resolve_matching_names(body_names_expr, cls._model.body_label)[0] if body_names_expr else None + shape_indices = ( + resolve_matching_names(shape_names_expr, cls._model.shape_label)[0] if shape_names_expr else None + ) + partner_body_indices = ( + resolve_matching_names(contact_partners_body_expr, cls._model.body_label)[0] + if contact_partners_body_expr + else None + ) + partner_shape_indices = ( + resolve_matching_names(contact_partners_shape_expr, cls._model.shape_label)[0] + if contact_partners_shape_expr + else None + ) + # Log sensor configuration sensor_target = body_names_expr or shape_names_expr partner_filter = contact_partners_body_expr or contact_partners_shape_expr or "all bodies/shapes" logger.info(f"Adding contact sensor for {sensor_target} with filter {partner_filter}") - # Create unique key for this sensor - sensor_key = (body_names_expr, shape_names_expr, contact_partners_body_expr, contact_partners_shape_expr) + # Create unique key from the original expressions + def _as_key(val): + return tuple(val) if isinstance(val, list) else val + + sensor_key = ( + _as_key(body_names_expr), + _as_key(shape_names_expr), + _as_key(contact_partners_body_expr), + _as_key(contact_partners_shape_expr), + ) # Create and store the sensor # Note: SensorContact constructor requests 'force' attribute from the model - newton_sensor = NewtonContactSensor( + newton_sensor = SensorContact( cls._model, - sensing_obj_bodies=body_names_expr, - sensing_obj_shapes=shape_names_expr, - counterpart_bodies=contact_partners_body_expr, - counterpart_shapes=contact_partners_shape_expr, - match_fn=flipped_match, + sensing_obj_bodies=body_indices, + sensing_obj_shapes=shape_indices, + counterpart_bodies=partner_body_indices, + counterpart_shapes=partner_shape_indices, include_total=True, prune_noncolliding=prune_noncolliding, verbose=verbose, @@ -501,8 +650,32 @@ def add_contact_sensor( # Regenerate contacts only if they were already created without force attribute # If solver is not initialized, contacts will be created with force in initialize_solver() if cls._solver is not None and cls._contacts is not None: - # Only regenerate if contacts don't have force attribute (sensor.eval() requires it) + # Only regenerate if contacts don't have force attribute (sensor.update() requires it) if cls._contacts.force is None: cls._initialize_contacts() return sensor_key + + @classmethod + def add_frame_transform_sensor(cls, shapes: list[int], reference_sites: list[int]) -> int: + """Add a frame transform sensor for measuring relative transforms. + + Creates a :class:`SensorFrameTransform` from pre-resolved shape and reference + site indices, appends it to the internal list, and returns its index. + + Args: + shapes: Ordered list of shape indices to measure. + reference_sites: 1:1 list of reference site indices (same length as shapes). + + Returns: + Index of the newly created sensor in :attr:`_newton_frame_transform_sensors`. + """ + sensor = SensorFrameTransform( + cls._model, + shapes=shapes, + reference_sites=reference_sites, + ) + idx = len(cls._newton_frame_transform_sensors) + cls._newton_frame_transform_sensors.append(sensor) + logger.info(f"Added frame transform sensor (index={idx}, shapes={len(shapes)})") + return idx diff --git a/source/isaaclab_newton/isaaclab_newton/physics/newton_manager_cfg.py b/source/isaaclab_newton/isaaclab_newton/physics/newton_manager_cfg.py index f8b1a05ef17c..1c8a9a70b237 100644 --- a/source/isaaclab_newton/isaaclab_newton/physics/newton_manager_cfg.py +++ b/source/isaaclab_newton/isaaclab_newton/physics/newton_manager_cfg.py @@ -71,12 +71,6 @@ class MJWarpSolverCfg(NewtonSolverCfg): disable_contacts: bool = False """Whether to disable contact computation in MuJoCo.""" - default_actuator_gear: float | None = None - """Default gear ratio for all actuators.""" - - actuator_gears: dict[str, float] | None = None - """Dictionary mapping joint names to specific gear ratios, overriding the `default_actuator_gear`.""" - update_data_interval: int = 1 """Frequency (in simulation steps) at which to update the MuJoCo Data object from the Newton state. diff --git a/source/isaaclab_newton/isaaclab_newton/sensors/__init__.py b/source/isaaclab_newton/isaaclab_newton/sensors/__init__.py index 4fc8a58b14f1..d371579ae4d3 100644 --- a/source/isaaclab_newton/isaaclab_newton/sensors/__init__.py +++ b/source/isaaclab_newton/isaaclab_newton/sensors/__init__.py @@ -36,3 +36,4 @@ """ from .contact_sensor import * # noqa: F401, F403 +from .frame_transformer import * # noqa: F401, F403 diff --git a/source/isaaclab_newton/isaaclab_newton/sensors/contact_sensor/__init__.py b/source/isaaclab_newton/isaaclab_newton/sensors/contact_sensor/__init__.py index f6dda9e595af..bff112831d27 100644 --- a/source/isaaclab_newton/isaaclab_newton/sensors/contact_sensor/__init__.py +++ b/source/isaaclab_newton/isaaclab_newton/sensors/contact_sensor/__init__.py @@ -6,9 +6,11 @@ """Sub-module for contact sensor based on :class:`newton.SensorContact`.""" from .contact_sensor import ContactSensor +from .contact_sensor_cfg import NewtonContactSensorCfg from .contact_sensor_data import ContactSensorData __all__ = [ "ContactSensor", "ContactSensorData", + "NewtonContactSensorCfg", ] diff --git a/source/isaaclab_newton/isaaclab_newton/sensors/contact_sensor/contact_sensor.py b/source/isaaclab_newton/isaaclab_newton/sensors/contact_sensor/contact_sensor.py index bc07a5018a88..7d4956343fc8 100644 --- a/source/isaaclab_newton/isaaclab_newton/sensors/contact_sensor/contact_sensor.py +++ b/source/isaaclab_newton/isaaclab_newton/sensors/contact_sensor/contact_sensor.py @@ -14,11 +14,9 @@ import warp as wp from isaaclab_newton.physics import NewtonManager -from newton.sensors import MatchKind from newton.sensors import SensorContact as NewtonContactSensor import isaaclab.utils.string as string_utils -from isaaclab.markers import VisualizationMarkers from isaaclab.sensors.contact_sensor.base_contact_sensor import BaseContactSensor from isaaclab.utils.helpers import deprecated @@ -33,6 +31,8 @@ if TYPE_CHECKING: from isaaclab.sensors.contact_sensor.contact_sensor_cfg import ContactSensorCfg + from .contact_sensor_cfg import NewtonContactSensorCfg + logger = logging.getLogger(__name__) @@ -49,16 +49,26 @@ class ContactSensor(BaseContactSensor): .. _Newton SensorContact: https://newton-physics.github.io/newton/api/_generated/newton.sensors.SensorContact.html """ - cfg: ContactSensorCfg + cfg: NewtonContactSensorCfg """The configuration parameters.""" - def __init__(self, cfg: ContactSensorCfg): + def __init__(self, cfg: ContactSensorCfg | NewtonContactSensorCfg): """Initializes the contact sensor object. Args: cfg: The configuration parameters. """ - # initialize base class + from isaaclab.sensors.contact_sensor.contact_sensor_cfg import ContactSensorCfg + + from .contact_sensor_cfg import NewtonContactSensorCfg + + if isinstance(cfg, NewtonContactSensorCfg): + pass + elif isinstance(cfg, ContactSensorCfg): + cfg = NewtonContactSensorCfg.from_base_cfg(cfg) + else: + raise TypeError(f"Invalid config: {cfg}") + super().__init__(cfg) # Create empty variables for storing output data @@ -100,6 +110,7 @@ def sensor_names(self) -> list[str] | None: @property def filter_object_names(self) -> list[str] | None: + """Ordered names of filter objects (counterparts) for contact filtering.""" return self._filter_object_names @property @@ -186,9 +197,10 @@ def compute_first_contact(self, dt: float, abs_tol: float = 1.0e-8) -> wp.array: abs_tol: The absolute tolerance for the comparison. Returns: - A boolean array indicating the sensors that have established contact within the last - :attr:`dt` seconds. Shape is (N, S), where N is the number of environments and S is the - number of sensors. + A float array (1.0/0.0) indicating the sensors that have established contact within the + last :attr:`dt` seconds. Shape is (N, S), where N is the number of environments and S is + the number of sensors. The returned array is a shared internal buffer; it is invalidated + by the next call to :meth:`compute_first_contact` or :meth:`compute_first_air`. Raises: RuntimeError: If the sensor is not configured to track contact time. @@ -226,8 +238,10 @@ def compute_first_air(self, dt: float, abs_tol: float = 1.0e-8) -> wp.array: abs_tol: The absolute tolerance for the comparison. Returns: - A boolean array indicating the sensors that have broken contact within the last :attr:`dt` seconds. - Shape is (N, S), where N is the number of environments and S is the number of sensors. + A float array (1.0/0.0) indicating the sensors that have broken contact within the last + :attr:`dt` seconds. Shape is (N, S), where N is the number of environments and S is the + number of sensors. The returned array is a shared internal buffer; it is invalidated by + the next call to :meth:`compute_first_contact` or :meth:`compute_first_air`. Raises: RuntimeError: If the sensor is not configured to track contact time. @@ -253,42 +267,21 @@ def compute_first_air(self, dt: float, abs_tol: float = 1.0e-8) -> wp.array: """ def _initialize_impl(self): - super()._initialize_impl() """Initializes the sensor-related handles and internal buffers.""" - # construct regex expression for the sensor names + super()._initialize_impl() - if self.cfg.filter_prim_paths_expr is not None or self.cfg.filter_shape_paths_expr is not None: - self._generate_force_matrix = True - else: - self._generate_force_matrix = False + if self.cfg.force_threshold is None: + self.cfg.force_threshold = 0.0 - sensor_body_regex = self.cfg.prim_path - if self.cfg.shape_path is not None: - sensor_shape_regex = "(" + "|".join(self.cfg.shape_path) + ")" - else: - sensor_shape_regex = None - if self.cfg.filter_prim_paths_expr is not None: - filter_object_body_regex = "(" + "|".join(self.cfg.filter_prim_paths_expr) + ")" - else: - filter_object_body_regex = None - if self.cfg.filter_shape_paths_expr is not None: - filter_object_shape_regex = "(" + "|".join(self.cfg.filter_shape_paths_expr) + ")" - else: - filter_object_shape_regex = None - - # Store the sensor key for later lookup - self._sensor_key = ( - sensor_body_regex, - sensor_shape_regex, - filter_object_body_regex, - filter_object_shape_regex, + self._generate_force_matrix = ( + self.cfg.filter_prim_paths_expr is not None or self.cfg.filter_shape_prim_expr is not None ) - NewtonManager.add_contact_sensor( - body_names_expr=sensor_body_regex, - shape_names_expr=sensor_shape_regex, - contact_partners_body_expr=filter_object_body_regex, - contact_partners_shape_expr=filter_object_shape_regex, + self._sensor_key = NewtonManager.add_contact_sensor( + body_names_expr=self.cfg.prim_path if self.cfg.sensor_shape_prim_expr is None else None, + shape_names_expr=self.cfg.sensor_shape_prim_expr, + contact_partners_body_expr=self.cfg.filter_prim_paths_expr, + contact_partners_shape_expr=self.cfg.filter_shape_prim_expr, prune_noncolliding=True, ) @@ -312,10 +305,10 @@ def _create_buffers(self): # Assume homogeneous envs, i.e. all envs have the same number of sensors # Only get the names for the first env. Expected structure: /World/envs/env_.*/... def get_name(idx, match_kind): - if match_kind == MatchKind.BODY: - return NewtonManager._model.body_key[idx].split("/")[-1] - if match_kind == MatchKind.SHAPE: - return NewtonManager._model.shape_key[idx].split("/")[-1] + if match_kind == NewtonContactSensor.MatchKind.BODY: + return NewtonManager._model.body_label[idx].split("/")[-1] + if match_kind == NewtonContactSensor.MatchKind.SHAPE: + return NewtonManager._model.shape_label[idx].split("/")[-1] return "MATCH_ANY" self._sensor_names = [get_name(idx, kind) for idx, kind in self.contact_view.sensing_objs] @@ -399,37 +392,9 @@ def _update_buffers_impl(self, env_mask: wp.array): # pose[..., 3:] = convert_quat(pose[..., 3:], to="wxyz") # self._data.pos_w[env_ids], self._data.quat_w[env_ids] = pose.split([3, 4], dim=-1) - def _set_debug_vis_impl(self, debug_vis: bool): - # set visibility of markers - # note: parent only deals with callbacks. not their visibility - if debug_vis: - # create markers if necessary for the first tome - if not hasattr(self, "contact_visualizer"): - self.contact_visualizer = VisualizationMarkers(self.cfg.visualizer_cfg) - # set their visibility to true - self.contact_visualizer.set_visibility(True) - else: - if hasattr(self, "contact_visualizer"): - self.contact_visualizer.set_visibility(False) - def _debug_vis_callback(self, event): # safely return if view becomes invalid return - # note: this invalidity happens because of isaac sim view callbacks - # if self.body_physx_view is None: - # return - # marker indices - # 0: contact, 1: no contact - # net_contact_force_w = torch.norm(self._data.net_forces_w, dim=-1) - # marker_indices = torch.where(net_contact_force_w > self.cfg.force_threshold, 0, 1) - # check if prim is visualized - # if self.cfg.track_pose: - # frame_origins: torch.Tensor = self._data.pos_w - # else: - # pose = self.body_physx_view.get_transforms() - # frame_origins = pose.view(-1, self._num_sensors, 7)[:, :, :3] - # visualize - # self.contact_visualizer.visualize(frame_origins.view(-1, 3), marker_indices=marker_indices.view(-1)) """ Internal simulation callbacks. diff --git a/source/isaaclab_newton/isaaclab_newton/sensors/contact_sensor/contact_sensor_cfg.py b/source/isaaclab_newton/isaaclab_newton/sensors/contact_sensor/contact_sensor_cfg.py new file mode 100644 index 000000000000..4aadb33d115b --- /dev/null +++ b/source/isaaclab_newton/isaaclab_newton/sensors/contact_sensor/contact_sensor_cfg.py @@ -0,0 +1,118 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +import warnings + +from isaaclab.sensors.contact_sensor.contact_sensor_cfg import ContactSensorCfg +from isaaclab.utils import configclass + +from .contact_sensor import ContactSensor + + +@configclass +class NewtonContactSensorCfg(ContactSensorCfg): + """Configuration for the Newton contact sensor with shape-level support. + + Extends :class:`ContactSensorCfg` with shape-level fields for finer-grained + contact reporting. A body is a rigid body; a shape is an individual collision geometry + attached to a body. A single body can have multiple shapes. + + Sensing objects (what to measure forces on): + + - :attr:`sensor_body_prim_expr` — read-only alias for :attr:`prim_path` (body-level sensing). + - :attr:`sensor_shape_prim_expr` — optional shape-level sensing. If set, takes + precedence over :attr:`prim_path`. + + Filter partners (what to measure forces against): + + - :attr:`filter_prim_paths_expr` — body-level filter (inherited from :class:`ContactSensorCfg`). + - :attr:`filter_shape_prim_expr` — shape-level filter. + + An instance can be created from an existing :class:`ContactSensorCfg` via + :meth:`from_base_cfg`. + """ + + class_type: type = ContactSensor + + @property + def sensor_body_prim_expr(self) -> str: + """Read-only alias for :attr:`prim_path`.""" + return self.prim_path + + sensor_shape_prim_expr: list[str] | None = None + """List of shape prim path expressions for shape-level contact sensing. + Defaults to None, meaning sensing is at the body level (via :attr:`prim_path`). + + Mutually exclusive with body-level sensing: if set, :attr:`prim_path` is ignored + for the sensing objects and these shape expressions are used instead. + + .. note:: + Expressions can contain the environment namespace regex ``{ENV_REGEX_NS}``, which + is replaced with the environment namespace. + + Example: ``{ENV_REGEX_NS}/Robot/fingertip_.*`` becomes ``/World/envs/env_.*/Robot/fingertip_.*``. + """ + + filter_shape_prim_expr: list[str] | None = None + """List of shape prim path expressions to filter contacts against at the shape level. + Defaults to None, meaning filter partners are resolved at the body level only + (via :attr:`ContactSensorCfg.filter_prim_paths_expr`). + + If provided, the force matrix reports per-shape contact forces between the sensing + primitives and the filter shapes. + + Mutually exclusive with :attr:`ContactSensorCfg.filter_prim_paths_expr`; only one + must be set. + + .. note:: + Expressions can contain the environment namespace regex ``{ENV_REGEX_NS}``, which + is replaced with the environment namespace. + + Example: ``{ENV_REGEX_NS}/Object`` becomes ``/World/envs/env_.*/Object``. + """ + + def __post_init__(self): + if self.track_contact_points: + warnings.warn( + "NewtonContactSensorCfg: 'track_contact_points' is not supported by the Newton backend. Ignoring.", + stacklevel=2, + ) + self.track_contact_points = False + + if self.max_contact_data_count_per_prim is not None: + warnings.warn( + "NewtonContactSensorCfg: 'max_contact_data_count_per_prim' is not supported by the Newton" + " backend. Ignoring.", + stacklevel=2, + ) + self.max_contact_data_count_per_prim = None + + if self.track_friction_forces: + warnings.warn( + "NewtonContactSensorCfg: 'track_friction_forces' is not supported by the Newton backend. Ignoring.", + stacklevel=2, + ) + self.track_friction_forces = False + + @classmethod + def from_base_cfg(cls, base_cfg: ContactSensorCfg, **kwargs) -> "NewtonContactSensorCfg": + """Creates a :class:`NewtonContactSensorCfg` from an existing :class:`ContactSensorCfg`. + + Args: + base_cfg: The base contact sensor configuration to copy from. + **kwargs: Newton-specific fields, e.g. ``filter_shape_prim_expr=["fingertip_.*"]``. + + Returns: + A new :class:`NewtonContactSensorCfg` instance. + + Raises: + ValueError: If ``class_type`` is passed in keyword arguments. + """ + if "class_type" in kwargs: + raise ValueError("Cannot override 'class_type' via from_base_cfg.") + base_fields = { + field: getattr(base_cfg, field) for field in base_cfg.__dataclass_fields__ if field != "class_type" + } + return cls(**base_fields, **kwargs) diff --git a/source/isaaclab_newton/isaaclab_newton/sensors/frame_transformer/__init__.py b/source/isaaclab_newton/isaaclab_newton/sensors/frame_transformer/__init__.py new file mode 100644 index 000000000000..13b522b6cc7d --- /dev/null +++ b/source/isaaclab_newton/isaaclab_newton/sensors/frame_transformer/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +"""Sub-module for frame transformer sensor based on :class:`newton.sensors.SensorFrameTransform`.""" + +from .frame_transformer import FrameTransformer +from .frame_transformer_data import FrameTransformerData + +__all__ = [ + "FrameTransformer", + "FrameTransformerData", +] diff --git a/source/isaaclab_newton/isaaclab_newton/sensors/frame_transformer/frame_transformer.py b/source/isaaclab_newton/isaaclab_newton/sensors/frame_transformer/frame_transformer.py new file mode 100644 index 000000000000..baab85b13e0c --- /dev/null +++ b/source/isaaclab_newton/isaaclab_newton/sensors/frame_transformer/frame_transformer.py @@ -0,0 +1,209 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +from __future__ import annotations + +import logging +from collections.abc import Sequence +from typing import TYPE_CHECKING + +import warp as wp +from isaaclab_newton.physics import NewtonManager + +from isaaclab.sensors.frame_transformer.base_frame_transformer import BaseFrameTransformer +from isaaclab.utils.string import resolve_matching_names + +from .frame_transformer_data import FrameTransformerData +from .frame_transformer_kernels import compose_target_world_kernel, copy_from_newton_kernel + +if TYPE_CHECKING: + from isaaclab.sensors.frame_transformer.frame_transformer_cfg import FrameTransformerCfg, OffsetCfg + +logger = logging.getLogger(__name__) + + +def _offset_to_transform(offset: OffsetCfg) -> wp.transform: + """Convert an OffsetCfg (pos, rot_wxyz) to wp.transform (pos, quat_xyzw).""" + return wp.transform( + offset.pos[0], + offset.pos[1], + offset.pos[2], + offset.rot[1], + offset.rot[2], + offset.rot[3], + offset.rot[0], + ) + + +class FrameTransformer(BaseFrameTransformer): + """Newton frame transformer wrapping :class:`newton.sensors.SensorFrameTransform`. + + Creates per-env sites for the source and all target frames, backed by a single + :class:`SensorFrameTransform` with 1:1 shape/reference pairs: + + * Entry 0 per env — source site measured w.r.t. a world-origin site. + * Entries 1..M per env — target sites measured w.r.t. source site. + + Flat sensor output is indexed with stride ``1 + num_targets``: + ``[i * stride]`` is the source world transform, ``[i * stride + 1 + j]`` + is target *j* relative to source in env *i*. + """ + + cfg: FrameTransformerCfg + """The configuration parameters.""" + + def __init__(self, cfg: FrameTransformerCfg): + """Initializes the frame transformer. + + Registers site requests via :meth:`NewtonManager.cl_register_site` for + the source frame, each target frame, and a shared world-origin reference. + Sites are injected into prototype builders by ``newton_replicate`` before + replication, so they end up correctly in each world. + + Args: + cfg: Configuration parameters. + """ + # initialize base class (registers PHYSICS_READY callback for _initialize_impl) + super().__init__(cfg) + + self._data: FrameTransformerData = FrameTransformerData() + + self._sensor_index: int | None = None + self._source_frame_body_name: str = cfg.prim_path.rsplit("/", 1)[-1] + + # Register world-origin reference site + self._world_origin_label = NewtonManager.cl_register_site(None, wp.transform()) + + # Register source site + source_offset = _offset_to_transform(cfg.source_frame_offset) + self._source_label = NewtonManager.cl_register_site(cfg.prim_path, source_offset) + + # Register target sites + self._target_labels: list[str] = [] + self._target_frame_body_names: list[str] = [] + self._num_targets: int = 0 + + for tgt_idx, target_frame in enumerate(cfg.target_frames): + target_offset = _offset_to_transform(target_frame.offset) + label = NewtonManager.cl_register_site(target_frame.prim_path, target_offset) + + self._target_labels.append(label) + body_name = target_frame.prim_path.rsplit("/", 1)[-1] + self._target_frame_body_names.append(target_frame.name or body_name) + self._num_targets += 1 + + self._data.target_frame_names = [t.name or t.prim_path.rsplit("/", 1)[-1] for t in cfg.target_frames] + + logger.info( + f"FrameTransformer '{cfg.prim_path}': source='{self._source_frame_body_name}', " + f"{self._num_targets} target(s) registered" + ) + + """ + Properties + """ + + @property + def data(self) -> FrameTransformerData: + # update sensors if needed + self._update_outdated_buffers() + return self._data + + """ + Operations + """ + + def find_bodies(self, name_keys: str | Sequence[str], preserve_order: bool = False) -> tuple[list[int], list[str]]: + return resolve_matching_names(name_keys, self._data.target_frame_names, preserve_order) + + """ + Implementation + """ + + def _initialize_impl(self): + """PHYSICS_READY callback: resolves site indices and creates the SensorFrameTransform.""" + super()._initialize_impl() + + model = NewtonManager.get_model() + shape_labels = list(model.shape_label) + num_envs = self._num_envs + + # Resolve world-origin site + world_origin_indices, _ = resolve_matching_names(self._world_origin_label, shape_labels) + world_origin_idx = world_origin_indices[0] + + # Resolve source sites (one per env) + source_pattern = f"{self.cfg.prim_path}/{self._source_label}" + source_indices, _ = resolve_matching_names(source_pattern, shape_labels) + + # Resolve target sites (one per env per target) + target_indices_per_target: list[list[int]] = [] + for tgt_idx, target_frame in enumerate(self.cfg.target_frames): + target_pattern = f"{target_frame.prim_path}/{self._target_labels[tgt_idx]}" + indices, _ = resolve_matching_names(target_pattern, shape_labels) + target_indices_per_target.append(indices) + + # Build ordered 1:1 shape/reference index lists + # Layout per env: [source, target_0, target_1, ..., target_M-1] + shapes_list: list[int] = [] + references_list: list[int] = [] + + for env_idx in range(num_envs): + # Source site measured w.r.t. world origin + source_idx = source_indices[env_idx] + shapes_list.append(source_idx) + references_list.append(world_origin_idx) + + # Each target measured w.r.t. source + for tgt_idx in range(self._num_targets): + target_idx = target_indices_per_target[tgt_idx][env_idx] + shapes_list.append(target_idx) + references_list.append(source_idx) + + # Create SensorFrameTransform via NewtonManager + self._sensor_index = NewtonManager.add_frame_transform_sensor(shapes_list, references_list) + + # Store reference to Newton sensor's flat transforms array + sensor = NewtonManager._newton_frame_transform_sensors[self._sensor_index] + self._newton_transforms = sensor.transforms + self._data._stride = 1 + self._num_targets + + # Allocate owned buffers + self._data._create_buffers(num_envs, self._num_targets, self._device) + + logger.info( + f"FrameTransformer initialized: {num_envs} envs, " + f"{self._num_targets} targets, sensor_index={self._sensor_index}" + ) + + def _update_buffers_impl(self, env_mask: wp.array | None): + """Copies transforms from Newton sensor into owned buffers.""" + wp.launch( + copy_from_newton_kernel, + dim=(self._num_envs, 1 + self._num_targets), + inputs=[env_mask, self._newton_transforms, self._data._stride, self._num_targets], + outputs=[self._data._source_transforms, self._data._target_transforms], + device=self._device, + ) + + # Compose target world transforms: source_world * target_relative + if self._num_targets > 0: + wp.launch( + compose_target_world_kernel, + dim=(self._num_envs, self._num_targets), + inputs=[self._data._source_transforms, self._data._target_transforms], + outputs=[self._data._target_transforms_w], + device=self._device, + ) + + """ + Internal simulation callbacks. + """ + + def _invalidate_initialize_callback(self, event): + """Invalidates the scene elements.""" + super()._invalidate_initialize_callback(event) + self._newton_transforms = None + self._sensor_index = None diff --git a/source/isaaclab_newton/isaaclab_newton/sensors/frame_transformer/frame_transformer_data.py b/source/isaaclab_newton/isaaclab_newton/sensors/frame_transformer/frame_transformer_data.py new file mode 100644 index 000000000000..0f073b2a3f35 --- /dev/null +++ b/source/isaaclab_newton/isaaclab_newton/sensors/frame_transformer/frame_transformer_data.py @@ -0,0 +1,84 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +from __future__ import annotations + +import warp as wp + +from isaaclab.sensors.frame_transformer.base_frame_transformer_data import BaseFrameTransformerData +from isaaclab.utils.warp.math_ops import transform_to_vec_quat + + +class FrameTransformerData(BaseFrameTransformerData): + """Data container for the Newton frame transformer sensor. + + Transform buffers are populated from the Newton sensor via + :func:`copy_from_newton_kernel`. + """ + + target_frame_names: list[str] | None + """Ordered target frame names, or ``None`` before initialisation.""" + + _source_transforms: wp.array | None + """Source world transforms — ``(num_envs,)`` array of ``wp.transformf``.""" + + _target_transforms: wp.array | None + """Target-relative transforms — ``(num_envs, num_targets)`` array of ``wp.transformf``.""" + + _target_transforms_w: wp.array | None + """Target world transforms — ``(num_envs, num_targets)`` array of ``wp.transformf``.""" + + _stride: int + """Entries per env in the Newton sensor flat array: ``1 + num_targets``.""" + + def __init__(self): + self.target_frame_names = None + self._source_transforms = None + self._target_transforms = None + self._target_transforms_w = None + self._stride = 0 + + def _create_buffers(self, num_envs: int, num_targets: int, device: str): + """Allocates transform buffers and zero-copy views.""" + self._source_transforms = wp.zeros(num_envs, dtype=wp.transformf, device=device) + self._target_transforms = wp.zeros((num_envs, num_targets), dtype=wp.transformf, device=device) + self._target_transforms_w = wp.zeros((num_envs, num_targets), dtype=wp.transformf, device=device) + + # Zero-copy views — auto-reflect kernel writes to underlying transforms + self._source_pos_w: wp.array # vec3f (num_envs,) + self._source_quat_w: wp.array # quatf (num_envs,) + self._source_pos_w, self._source_quat_w = transform_to_vec_quat(self._source_transforms) + + self._target_pos_source: wp.array # vec3f (num_envs, num_targets) + self._target_quat_source: wp.array # quatf (num_envs, num_targets) + self._target_pos_source, self._target_quat_source = transform_to_vec_quat(self._target_transforms) + + self._target_pos_w: wp.array # vec3f (num_envs, num_targets) + self._target_quat_w: wp.array # quatf (num_envs, num_targets) + self._target_pos_w, self._target_quat_w = transform_to_vec_quat(self._target_transforms_w) + + @property + def source_pos_w(self) -> wp.array: + return self._source_pos_w + + @property + def source_quat_w(self) -> wp.array: + return self._source_quat_w + + @property + def target_pos_source(self) -> wp.array: + return self._target_pos_source + + @property + def target_quat_source(self) -> wp.array: + return self._target_quat_source + + @property + def target_pos_w(self) -> wp.array: + return self._target_pos_w + + @property + def target_quat_w(self) -> wp.array: + return self._target_quat_w diff --git a/source/isaaclab_newton/isaaclab_newton/sensors/frame_transformer/frame_transformer_kernels.py b/source/isaaclab_newton/isaaclab_newton/sensors/frame_transformer/frame_transformer_kernels.py new file mode 100644 index 000000000000..9f8369694e1e --- /dev/null +++ b/source/isaaclab_newton/isaaclab_newton/sensors/frame_transformer/frame_transformer_kernels.py @@ -0,0 +1,55 @@ +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + +# Ignore optional memory usage warning globally +# pyright: reportOptionalSubscript=false + +import warp as wp + + +@wp.kernel +def copy_from_newton_kernel( + # in + env_mask: wp.array(dtype=wp.bool), + newton_transforms: wp.array(dtype=wp.transformf), # flat (num_envs * stride,) + stride: int, + num_targets: int, + # outputs + source_transforms: wp.array(dtype=wp.transformf), # (num_envs,) + target_transforms: wp.array2d(dtype=wp.transformf), # (num_envs, num_targets) +): + """Copy frame transform data from Newton sensor into owned buffers. + + Deinterleaves the flat strided Newton output into separate source and target buffers. + Launch with dim=(num_envs, 1 + num_targets). + """ + env, idx = wp.tid() + + if env_mask: + if not env_mask[env]: + return + + t = newton_transforms[env * stride + idx] + + if idx == 0: + source_transforms[env] = t + else: + target_transforms[env, idx - 1] = t + + +@wp.kernel +def compose_target_world_kernel( + # in + source_transforms: wp.array(dtype=wp.transformf), # (num_envs,) + target_transforms: wp.array2d(dtype=wp.transformf), # (num_envs, num_targets) + # outputs + target_transforms_w: wp.array2d(dtype=wp.transformf), # (num_envs, num_targets) +): + """Compute target world transforms: source_world * target_relative. + + Launch with dim=(num_envs, num_targets). + """ + env, tgt = wp.tid() + target_transforms_w[env, tgt] = wp.transform_multiply(source_transforms[env], target_transforms[env, tgt]) diff --git a/source/isaaclab_newton/setup.py b/source/isaaclab_newton/setup.py index 867e9211a76a..ee443f4ec1e4 100644 --- a/source/isaaclab_newton/setup.py +++ b/source/isaaclab_newton/setup.py @@ -32,7 +32,7 @@ # newton "mujoco>=3.5.0", "mujoco-warp>=3.5.0", - "newton @ git+https://github.com/newton-physics/newton.git@51ce35e8def843377546764033edc33a0b479d65", + "newton @ git+https://github.com/newton-physics/newton.git@f999d593ad33cd7f004d5a5544ec995f4951719b", "imgui-bundle==1.92.0", "PyOpenGL-accelerate==3.1.10", ] diff --git a/source/isaaclab_tasks/isaaclab_tasks/manager_based/locomotion/velocity/velocity_env_cfg.py b/source/isaaclab_tasks/isaaclab_tasks/manager_based/locomotion/velocity/velocity_env_cfg.py index caf261f86ae8..a4816f9d4768 100644 --- a/source/isaaclab_tasks/isaaclab_tasks/manager_based/locomotion/velocity/velocity_env_cfg.py +++ b/source/isaaclab_tasks/isaaclab_tasks/manager_based/locomotion/velocity/velocity_env_cfg.py @@ -65,7 +65,6 @@ class MySceneCfg(InteractiveSceneCfg): # sensors contact_forces = ContactSensorCfg( prim_path="{ENV_REGEX_NS}/Robot/.*", - filter_shape_paths_expr=None, # ["/World/ground/terrain/GroundPlane/CollisionPlane"], history_length=3, track_air_time=True, ) diff --git a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/cabinet/mdp/observations.py b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/cabinet/mdp/observations.py index c5d8c2df8ace..5dca90661f5f 100644 --- a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/cabinet/mdp/observations.py +++ b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/cabinet/mdp/observations.py @@ -23,7 +23,7 @@ def rel_ee_object_distance(env: ManagerBasedRLEnv) -> torch.Tensor: ee_tf_data: FrameTransformerData = env.scene["ee_frame"].data object_data: ArticulationData = env.scene["object"].data - return wp.to_torch(object_data.root_pos_w) - ee_tf_data.target_pos_w[..., 0, :] + return wp.to_torch(object_data.root_pos_w) - wp.to_torch(ee_tf_data.target_pos_w)[..., 0, :] def rel_ee_drawer_distance(env: ManagerBasedRLEnv) -> torch.Tensor: @@ -31,13 +31,13 @@ def rel_ee_drawer_distance(env: ManagerBasedRLEnv) -> torch.Tensor: ee_tf_data: FrameTransformerData = env.scene["ee_frame"].data cabinet_tf_data: FrameTransformerData = env.scene["cabinet_frame"].data - return cabinet_tf_data.target_pos_w[..., 0, :] - ee_tf_data.target_pos_w[..., 0, :] + return wp.to_torch(cabinet_tf_data.target_pos_w)[..., 0, :] - wp.to_torch(ee_tf_data.target_pos_w)[..., 0, :] def fingertips_pos(env: ManagerBasedRLEnv) -> torch.Tensor: """The position of the fingertips relative to the environment origins.""" ee_tf_data: FrameTransformerData = env.scene["ee_frame"].data - fingertips_pos = ee_tf_data.target_pos_w[..., 1:, :] - env.scene.env_origins.unsqueeze(1) + fingertips_pos = wp.to_torch(ee_tf_data.target_pos_w)[..., 1:, :] - env.scene.env_origins.unsqueeze(1) return fingertips_pos.view(env.num_envs, -1) @@ -45,7 +45,7 @@ def fingertips_pos(env: ManagerBasedRLEnv) -> torch.Tensor: def ee_pos(env: ManagerBasedRLEnv) -> torch.Tensor: """The position of the end-effector relative to the environment origins.""" ee_tf_data: FrameTransformerData = env.scene["ee_frame"].data - ee_pos = ee_tf_data.target_pos_w[..., 0, :] - env.scene.env_origins + ee_pos = wp.to_torch(ee_tf_data.target_pos_w)[..., 0, :] - env.scene.env_origins return ee_pos @@ -56,6 +56,6 @@ def ee_quat(env: ManagerBasedRLEnv, make_quat_unique: bool = True) -> torch.Tens If :attr:`make_quat_unique` is True, the quaternion is made unique by ensuring the real part is positive. """ ee_tf_data: FrameTransformerData = env.scene["ee_frame"].data - ee_quat = ee_tf_data.target_quat_w[..., 0, :] + ee_quat = wp.to_torch(ee_tf_data.target_quat_w)[..., 0, :] # make first element of quaternion positive return math_utils.quat_unique(ee_quat) if make_quat_unique else ee_quat diff --git a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/cabinet/mdp/rewards.py b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/cabinet/mdp/rewards.py index 4601b51abe79..6c3cc025281f 100644 --- a/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/cabinet/mdp/rewards.py +++ b/source/isaaclab_tasks/isaaclab_tasks/manager_based/manipulation/cabinet/mdp/rewards.py @@ -30,8 +30,8 @@ def approach_ee_handle(env: ManagerBasedRLEnv, threshold: float) -> torch.Tensor \end{cases} """ - ee_tcp_pos = env.scene["ee_frame"].data.target_pos_w[..., 0, :] - handle_pos = env.scene["cabinet_frame"].data.target_pos_w[..., 0, :] + ee_tcp_pos = wp.to_torch(env.scene["ee_frame"].data.target_pos_w)[..., 0, :] + handle_pos = wp.to_torch(env.scene["cabinet_frame"].data.target_pos_w)[..., 0, :] # Compute the distance of the end-effector to the handle distance = torch.norm(handle_pos - ee_tcp_pos, dim=-1, p=2) @@ -54,8 +54,8 @@ def align_ee_handle(env: ManagerBasedRLEnv) -> torch.Tensor: where :math:`align_z` is the dot product of the z direction of the gripper and the -x direction of the handle and :math:`align_x` is the dot product of the x direction of the gripper and the -y direction of the handle. """ - ee_tcp_quat = env.scene["ee_frame"].data.target_quat_w[..., 0, :] - handle_quat = env.scene["cabinet_frame"].data.target_quat_w[..., 0, :] + ee_tcp_quat = wp.to_torch(env.scene["ee_frame"].data.target_quat_w)[..., 0, :] + handle_quat = wp.to_torch(env.scene["cabinet_frame"].data.target_quat_w)[..., 0, :] ee_tcp_rot_mat = matrix_from_quat(ee_tcp_quat) handle_mat = matrix_from_quat(handle_quat) @@ -80,9 +80,9 @@ def align_grasp_around_handle(env: ManagerBasedRLEnv) -> torch.Tensor: The correct hand orientation is when the left finger is above the handle and the right finger is below the handle. """ # Target object position: (num_envs, 3) - handle_pos = env.scene["cabinet_frame"].data.target_pos_w[..., 0, :] + handle_pos = wp.to_torch(env.scene["cabinet_frame"].data.target_pos_w)[..., 0, :] # Fingertips position: (num_envs, n_fingertips, 3) - ee_fingertips_w = env.scene["ee_frame"].data.target_pos_w[..., 1:, :] + ee_fingertips_w = wp.to_torch(env.scene["ee_frame"].data.target_pos_w)[..., 1:, :] lfinger_pos = ee_fingertips_w[..., 0, :] rfinger_pos = ee_fingertips_w[..., 1, :] @@ -100,9 +100,9 @@ def approach_gripper_handle(env: ManagerBasedRLEnv, offset: float = 0.04) -> tor (i.e., the left finger is above the handle and the right finger is below the handle). Otherwise, it returns zero. """ # Target object position: (num_envs, 3) - handle_pos = env.scene["cabinet_frame"].data.target_pos_w[..., 0, :] + handle_pos = wp.to_torch(env.scene["cabinet_frame"].data.target_pos_w)[..., 0, :] # Fingertips position: (num_envs, n_fingertips, 3) - ee_fingertips_w = env.scene["ee_frame"].data.target_pos_w[..., 1:, :] + ee_fingertips_w = wp.to_torch(env.scene["ee_frame"].data.target_pos_w)[..., 1:, :] lfinger_pos = ee_fingertips_w[..., 0, :] rfinger_pos = ee_fingertips_w[..., 1, :] @@ -127,8 +127,8 @@ def grasp_handle( Note: It is assumed that zero joint position corresponds to the fingers being closed. """ - ee_tcp_pos = env.scene["ee_frame"].data.target_pos_w[..., 0, :] - handle_pos = env.scene["cabinet_frame"].data.target_pos_w[..., 0, :] + ee_tcp_pos = wp.to_torch(env.scene["ee_frame"].data.target_pos_w)[..., 0, :] + handle_pos = wp.to_torch(env.scene["cabinet_frame"].data.target_pos_w)[..., 0, :] gripper_joint_pos = wp.to_torch(env.scene[asset_cfg.name].data.joint_pos)[:, asset_cfg.joint_ids] distance = torch.norm(handle_pos - ee_tcp_pos, dim=-1, p=2)