diff --git a/src/python/pose_format/numpy/pose_body.py b/src/python/pose_format/numpy/pose_body.py index 38fed36..60d60ea 100644 --- a/src/python/pose_format/numpy/pose_body.py +++ b/src/python/pose_format/numpy/pose_body.py @@ -128,6 +128,11 @@ def write(self, version: float, buffer: BinaryIO): buffer.write(np.array(self.data.data, dtype=np.float32).tobytes()) buffer.write(np.array(self.confidence, dtype=np.float32).tobytes()) + def copy(self) -> 'NumPyPoseBody': + return type(self)(fps=self.fps, + data=self.data.copy(), + confidence=self.confidence.copy()) + @property def mask(self): """ Returns mask associated with data. """ @@ -181,8 +186,9 @@ def zero_filled(self): NumPyPoseBody changed pose body data. """ - self.data = ma.array(self.data.filled(0), mask=self.data.mask) - return self + copy = self.copy() + copy.data = ma.array(copy.data.filled(0), mask=copy.data.mask) + return copy def matmul(self, matrix: np.ndarray): """ diff --git a/src/python/pose_format/pose.py b/src/python/pose_format/pose.py index 06dbfab..5031f64 100644 --- a/src/python/pose_format/pose.py +++ b/src/python/pose_format/pose.py @@ -202,6 +202,28 @@ def frame_dropout_normal(self, dropout_mean: float = 0.5, dropout_std: float = 0 """ body, selected_indexes = self.body.frame_dropout_normal(dropout_mean=dropout_mean, dropout_std=dropout_std) return Pose(header=self.header, body=body), selected_indexes + + + def remove_components(self, components_to_remove: Union[str, List[str]], points_to_remove: Union[Dict[str, List[str]],None] = None): + + if isinstance(components_to_remove, str): + components_to_remove = [components_to_remove] + + components_to_keep = [] + points_dict = {} + + for component in self.header.components: + if component.name not in components_to_remove: + components_to_keep.append(component.name) + points_dict[component.name] = [] + if points_to_remove is not None: + for point in component.points: + if point not in points_to_remove[component.name]: + points_dict[component.name].append(point) + + return self.get_components(components_to_keep, points_dict) + + def get_components(self, components: List[str], points: Union[Dict[str, List[str]],None] = None): """ @@ -253,6 +275,10 @@ def get_components(self, components: List[str], points: Union[Dict[str, List[str new_body = self.body.get_points(flat_indexes) return Pose(header=new_header, body=new_body) + + + def copy(self): + return self.__class__(self.header, self.body.copy()) def bbox(self): """ diff --git a/src/python/pose_format/pose_body.py b/src/python/pose_format/pose_body.py index ca0a0da..32d99a7 100644 --- a/src/python/pose_format/pose_body.py +++ b/src/python/pose_format/pose_body.py @@ -1,8 +1,9 @@ +import math from random import sample -from typing import BinaryIO, List, Tuple +from typing import BinaryIO, List, Tuple, Optional import numpy as np -import math + from pose_format.pose_header import PoseHeader from pose_format.utils.reader import BufferReader, ConstStructs @@ -60,9 +61,9 @@ def read(cls, header: PoseHeader, reader: BufferReader, **kwargs) -> "PoseBody": if header.version == 0: return cls.read_v0_0(header, reader, **kwargs) - elif round(header.version, 3) == 0.1: + if round(header.version, 3) == 0.1: return cls.read_v0_1(header, reader, **kwargs) - elif round(header.version, 3) == 0.2: + if round(header.version, 3) == 0.2: return cls.read_v0_2(header, reader, **kwargs) raise NotImplementedError("Unknown version - %f" % header.version) @@ -93,8 +94,8 @@ def read_v0_1_frames(cls, frames: int, shape: List[int], reader: BufferReader, - start_frame: int = None, - end_frame: int = None): + start_frame: Optional[int] = None, + end_frame: Optional[int] = None): """ Reads frame data for version 0.1 from a buffer. @@ -149,8 +150,8 @@ def read_v0_1_frames(cls, def read_v0_1(cls, header: PoseHeader, reader: BufferReader, - start_frame: int = None, - end_frame: int = None, + start_frame: Optional[int] = None, + end_frame: Optional[int] = None, **unused_kwargs) -> "PoseBody": """ Reads pose data for version 0.1 from a buffer. @@ -176,7 +177,7 @@ def read_v0_1(cls, fps, _frames = reader.unpack(ConstStructs.double_ushort) _people = reader.unpack(ConstStructs.ushort) - _points = sum([len(c.points) for c in header.components]) + _points = sum(len(c.points) for c in header.components) _dims = header.num_dims() # _frames is defined as short, which sometimes is not enough! TODO change to int @@ -191,10 +192,10 @@ def read_v0_1(cls, def read_v0_2(cls, header: PoseHeader, reader: BufferReader, - start_frame: int = None, - end_frame: int = None, - start_time: int = None, - end_time: int = None, + start_frame: Optional[int] = None, + end_frame: Optional[int] = None, + start_time: Optional[int] = None, + end_time: Optional[int] = None, **unused_kwargs) -> "PoseBody": """ Reads pose data for version 0.2 from a buffer. @@ -256,6 +257,11 @@ def write(self, version: float, buffer: BinaryIO): Buffer to write the pose data to. """ raise NotImplementedError("'write' not implemented on '%s'" % self.__class__) + + def copy(self)->"PoseBody": + return self.__class__(fps=self.fps, + data=self.data, + confidence=self.confidence) def __getitem__(self, index): """ @@ -306,7 +312,7 @@ def torch(self): Raises ------ NotImplementedError - If toch is not implemented. + If torch is not implemented. """ raise NotImplementedError("'torch' not implemented on '%s'" % self.__class__) @@ -474,7 +480,7 @@ def get_points(self, indexes: List[int]) -> __qualname__: Returns ------- PoseBody - PoseBody instance containing only choosen points. + PoseBody instance containing only chosen points. Raises ------ diff --git a/src/python/pose_format/tensorflow/pose_body.py b/src/python/pose_format/tensorflow/pose_body.py index b804cf8..c7bd282 100644 --- a/src/python/pose_format/tensorflow/pose_body.py +++ b/src/python/pose_format/tensorflow/pose_body.py @@ -17,7 +17,7 @@ class TensorflowPoseBody(PoseBody): """ Representation of pose body data, optimized for TensorFlow operations. - * Inherites from PoseBody + * Inherits from PoseBody Parameters ---------- @@ -43,10 +43,11 @@ def __init__(self, fps: float, data: Union[MaskedTensor, tf.Tensor], confidence: super().__init__(fps, data, confidence) - def zero_filled(self): + def zero_filled(self) -> 'TensorflowPoseBody': """Return an instance with zero-filled data.""" - self.data = self.data.zero_filled() - return self + copy = self.copy() + copy.data = self.data.zero_filled() + return copy def select_frames(self, frame_indexes: List[int]): """ @@ -152,6 +153,17 @@ def points_perspective(self) -> MaskedTensor: """ return self.data.transpose(perm=POINTS_DIMS) + def copy(self) -> 'TensorflowPoseBody': + # Ensure copies are fully detached from the TF computation graph by round-trip through numpy + detached_data = tf.convert_to_tensor(self.data.tensor.numpy()) + detached_mask = tf.convert_to_tensor(self.data.mask.numpy()) + data_copy = MaskedTensor(detached_data, detached_mask) + confidence_copy = tf.convert_to_tensor(self.confidence.numpy()) + return self.__class__( + fps=self.fps, + data=data_copy, + confidence=confidence_copy) + def get_points(self, indexes: List[int]): """ Gets and returns points from pose data based on indexes diff --git a/src/python/pose_format/torch/pose_body.py b/src/python/pose_format/torch/pose_body.py index a062cea..7038d8f 100644 --- a/src/python/pose_format/torch/pose_body.py +++ b/src/python/pose_format/torch/pose_body.py @@ -4,8 +4,6 @@ import torch from ..pose_body import POINTS_DIMS, PoseBody -from ..pose_header import PoseHeader -from ..utils.reader import BufferReader from .masked.tensor import MaskedTensor @@ -28,10 +26,21 @@ def __init__(self, fps: float, data: Union[MaskedTensor, torch.Tensor], confiden super().__init__(fps, data, confidence) def cuda(self): - """Move data and cofidence of tensors to GPU""" + """Move data and confidence of tensors to GPU""" self.data = self.data.cuda() self.confidence = self.confidence.cuda() + def copy(self) -> 'TorchPoseBody': + data_copy = MaskedTensor(tensor=self.data.tensor.detach().clone().to(self.data.tensor.device), + mask=self.data.mask.detach().clone().to(self.data.mask.device), + ) + confidence_copy = self.confidence.detach().clone().to(self.confidence.device) + + return self.__class__(fps=self.fps, + data=data_copy, + confidence=confidence_copy) + + def zero_filled(self) -> 'TorchPoseBody': """ Fill invalid values with zeros. @@ -42,8 +51,9 @@ def zero_filled(self) -> 'TorchPoseBody': TorchPoseBody instance with masked data filled with zeros. """ - self.data.zero_filled() - return self + copy = self.copy() + copy.data = copy.data.zero_filled() + return copy def matmul(self, matrix: np.ndarray) -> 'TorchPoseBody': """ @@ -120,3 +130,6 @@ def flatten(self): scalar = torch.ones(len(shape) + shape[-1], device=data.device) scalar[0] = 1 / self.fps return flat * scalar + + + diff --git a/src/python/tests/pose_test.py b/src/python/tests/pose_test.py index 22f7526..897a0dc 100644 --- a/src/python/tests/pose_test.py +++ b/src/python/tests/pose_test.py @@ -6,13 +6,17 @@ import numpy as np import numpy.ma as ma import tensorflow as tf +import torch from pose_format.numpy.pose_body import NumPyPoseBody from pose_format.pose import Pose from pose_format.pose_header import (PoseHeader, PoseHeaderComponent, PoseHeaderDimensions) -from pose_format.tensorflow.masked.tensor import MaskedTensor +from pose_format.tensorflow.masked.tensor import MaskedTensor as TensorflowMaskedTensor +from pose_format.torch.masked import MaskedTensor as TorchMaskedTensor from pose_format.tensorflow.pose_body import TensorflowPoseBody +from pose_format.torch.pose_body import TorchPoseBody + def _create_pose_header_component(name: str, num_keypoints: int) -> PoseHeaderComponent: @@ -179,6 +183,77 @@ def _create_random_numpy_data(frames_min: Optional[int] = None, return tensor, mask, confidence +def _create_random_torch_data(frames_min: Optional[int] = None, + frames_max: Optional[int] = None, + num_frames: Optional[int] = None, + num_keypoints: int = 137, + num_dimensions: int = 2) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + """ + Creates random PyTorch data for testing. + + Parameters + ---------- + frames_min : Optional[int], default=None + Minimum number of frames for random generation if `num_frames` is not specified. + frames_max : Optional[int], default=None + Maximum number of frames for random generation if `num_frames` is not specified. + num_frames : Optional[int], default=None + Specific number of frames. + num_keypoints : int, default=137 + Number of keypoints in the pose data. + num_dimensions : int, default=2 + Number of dimensions in the pose data. + + Returns + ------- + Tuple[torch.Tensor, torch.Tensor, torch.Tensor] + Random tensor data, mask, and confidence values. + """ + if num_frames is None: + assert None not in [frames_min, frames_max] + num_frames = np.random.randint(frames_min, frames_max + 1) + else: + assert frames_min is None and frames_max is None + + # Avoid a mean of zero to test certain pose methods + tensor = torch.randn((num_frames, 1, num_keypoints, num_dimensions)) + 1.0 # Shape: (Frames, People, Points, Dims) + + confidence = torch.rand((num_frames, 1, num_keypoints), dtype=torch.float32) # Shape: (Frames, People, Points) + + mask = torch.randint(0, 2, (num_frames, 1, num_keypoints, num_dimensions), dtype=torch.bool) # Bool mask + + return tensor, mask, confidence + + +def _get_random_pose_object_with_torch_posebody(num_keypoints: int, frames_min: int = 1, frames_max: int = 10) -> Pose: + """ + Generates a random Pose object with PyTorch pose body for testing. + + Parameters + ---------- + num_keypoints : int + Number of keypoints in the pose data. + frames_min : int, default=1 + Minimum number of frames for random generation. + frames_max : int, default=10 + Maximum number of frames for random generation. + + Returns + ------- + Pose + Randomly generated Pose object. + """ + + tensor, mask, confidence = _create_random_torch_data(frames_min=frames_min, + frames_max=frames_max, + num_keypoints=num_keypoints) + + masked_tensor = TorchMaskedTensor(tensor=tensor, mask=mask) + body = TorchPoseBody(fps=10, data=masked_tensor, confidence=confidence) + + header = _create_pose_header(width=10, height=7, depth=0, num_components=3, num_keypoints=num_keypoints) + + return Pose(header=header, body=body) def _get_random_pose_object_with_tf_posebody(num_keypoints: int, frames_min: int = 1, frames_max: int = 10) -> Pose: """ @@ -203,7 +278,7 @@ def _get_random_pose_object_with_tf_posebody(num_keypoints: int, frames_min: int frames_max=frames_max, num_keypoints=num_keypoints) - masked_tensor = MaskedTensor(tensor=tensor, mask=mask) + masked_tensor = TensorflowMaskedTensor(tensor=tensor, mask=mask) body = TensorflowPoseBody(fps=10, data=masked_tensor, confidence=confidence) header = _create_pose_header(width=10, height=7, depth=0, num_components=3, num_keypoints=num_keypoints) @@ -255,6 +330,7 @@ def test_pose_object_should_be_callable(self): assert callable(Pose) + class TestPoseTensorflowPoseBody(TestCase): """ Tests for Pose objects containing TensorFlow PoseBody data. @@ -399,6 +475,39 @@ def create_pose_and_frame_dropout_uniform(example: tf.Tensor) -> tf.Tensor: return example dataset.map(create_pose_and_frame_dropout_uniform) + + + def test_pose_tf_posebody_copy_creates_deepcopy(self): + pose = _get_random_pose_object_with_tf_posebody(num_keypoints=5) + self.assertIsInstance(pose.body, TensorflowPoseBody) + self.assertIsInstance(pose.body.data, TensorflowMaskedTensor) + + pose_copy = pose.copy() + self.assertIsInstance(pose_copy.body, TensorflowPoseBody) + self.assertIsInstance(pose_copy.body.data, TensorflowMaskedTensor) + + # Check that pose and pose_copy are not the same object + self.assertNotEqual(pose, pose_copy, "Copy of pose should not be 'equal' to original") + + # Ensure the data tensors are equal but independent + self.assertTrue(tf.reduce_all(pose.body.data == pose_copy.body.data), "Copy's data should match original") + + # Modify original pose's data and check that copy remains unchanged + pose.body.data = TensorflowMaskedTensor(tf.zeros_like(pose.body.data.tensor), tf.zeros_like(pose.body.data.mask)) + + self.assertFalse(tf.reduce_all(pose.body.data == pose_copy.body.data), "Copy's data should not match original after original is replaced") + + # Create another copy and ensure it matches the first copy + pose = pose_copy.copy() + + self.assertTrue(tf.reduce_all(pose.body.data == pose_copy.body.data), "Copy's data should match original again") + + # Modify the copy and check that the original remains unchanged + pose_copy.body.data.tensor = tf.zeros(pose_copy.body.data.tensor.shape) + + self.assertFalse(tf.reduce_all(pose.body.data == pose_copy.body.data), "Copy's data should not match original after copy is modified") + + class TestPoseNumpyPoseBody(TestCase): @@ -445,3 +554,67 @@ def test_pose_numpy_posebody_frame_dropout_uniform_eager_mode_num_frames_not_zer num_frames = pose_after_dropout.body.data.shape[0] self.assertNotEqual(num_frames, 0, "Number of frames after dropout can never be 0.") + + def test_pose_numpy_posebody_copy_creates_deepcopy(self): + + pose = _get_random_pose_object_with_numpy_posebody(num_keypoints=5, frames_min=3) + + pose_copy = pose.copy() + + self.assertNotEqual(pose, pose_copy, "Copy of pose should not be 'equal' to original") + + self.assertTrue(np.array_equal(pose.body.data, pose_copy.body.data), "Copy's data should match original") + + pose.body.data = ma.zeros(pose.body.data.shape) + + self.assertFalse(np.array_equal(pose.body.data, pose_copy.body.data), "Copy's data should not match original after original is replaced") + + pose = pose_copy.copy() + + self.assertTrue(np.array_equal(pose.body.data, pose_copy.body.data), "Copy's data should match original again") + + pose_copy.body.data[:] = 3.14 + + self.assertFalse(np.array_equal(pose.body.data, pose_copy.body.data), "Copy's data should not match original after copy is modified") + + + + +class TestPoseTorchPoseBody(TestCase): + + def test_pose_torch_posebody_copy_tensors_detached(self): + pose = _get_random_pose_object_with_torch_posebody(num_keypoints=5) + pose_copy = pose.copy() + + self.assertFalse(pose.body.data.data.requires_grad, "Copied data should be detached from computation graph") + self.assertFalse(pose_copy.body.data.mask.requires_grad, "Copied mask should be detached from computation graph") + + def test_pose_torch_posebody_copy_creates_deepcopy(self): + pose = _get_random_pose_object_with_torch_posebody(num_keypoints=5) + self.assertIsInstance(pose.body, TorchPoseBody) + self.assertIsInstance(pose.body.data, TorchMaskedTensor) + + + pose_copy = pose.copy() + self.assertIsInstance(pose_copy.body, TorchPoseBody) + self.assertIsInstance(pose_copy.body.data, TorchMaskedTensor) + + self.assertNotEqual(pose, pose_copy, "Copy of pose should not be 'equal' to original") + self.assertTrue(pose.body.data.tensor.equal(pose_copy.body.data.tensor), "Copy's data should match original") + self.assertTrue(pose.body.data.mask.equal(pose_copy.body.data.mask), "Copy's mask should match original") + + pose.body.data = TorchMaskedTensor(tensor=torch.zeros_like(pose.body.data.tensor), + mask=torch.ones_like(pose.body.data.mask)) + + + self.assertFalse(pose.body.data.tensor.equal(pose_copy.body.data.tensor), "Copy's data should not match original after original is replaced") + self.assertFalse(pose.body.data.mask.equal(pose_copy.body.data.mask), "Copy's mask should not match original after original is replaced") + + pose = pose_copy.copy() + + self.assertTrue(pose.body.data.tensor.equal(pose_copy.body.data.tensor), "Copy's data should match original again") + self.assertTrue(pose.body.data.mask.equal(pose_copy.body.data.mask), "Copy's mask should match original again") + + pose_copy.body.data.tensor.fill_(3.14) + + self.assertFalse(pose.body.data.tensor.equal(pose_copy.body.data.tensor), "Copy's data should not match original after copy is modified")