diff --git a/.gitignore b/.gitignore index 36e3fa3..ec00402 100644 --- a/.gitignore +++ b/.gitignore @@ -3,8 +3,8 @@ build/ pose_evaluation.egg-info/ **/__pycache__/ .coverage -.vscode/ -coverage.lcov **/test_data/ *.npz *.code-workspace +.vscode/ +coverage.lcov \ No newline at end of file diff --git a/pose_evaluation/examples/example_metric_construction.py b/pose_evaluation/examples/example_metric_construction.py new file mode 100644 index 0000000..c1ff2db --- /dev/null +++ b/pose_evaluation/examples/example_metric_construction.py @@ -0,0 +1,77 @@ +from pathlib import Path +from pose_format import Pose +from pose_evaluation.metrics.distance_metric import DistanceMetric +from pose_evaluation.metrics.distance_measure import AggregatedPowerDistance +from pose_evaluation.metrics.base import BaseMetric +from pose_evaluation.metrics.test_distance_metric import get_poses +from pose_evaluation.utils.pose_utils import zero_pad_shorter_poses + +if __name__ == "__main__": + # Define file paths for test pose data + reference_file = ( + Path("pose_evaluation") / "utils" / "test" / "test_data" / "colin-1-HOUSE.pose" + ) + hypothesis_file = ( + Path("pose_evaluation") / "utils" / "test" / "test_data" / "colin-2-HOUSE.pose" + ) + + # Choose whether to load real files or generate test poses + # The real files have different lengths, so they are zero-padded below; otherwise some metrics will crash! + # Change to False to generate fake poses with known distances, e.g. 
all 0 and all 1 + USE_REAL_FILES = True + + if USE_REAL_FILES: + poses = [ + Pose.read(hypothesis_file.read_bytes()), + Pose.read(reference_file.read_bytes()), + ] + # TODO: add PosePreprocessors to PoseDistanceMetrics, with their own signatures + poses = zero_pad_shorter_poses(poses) + + else: + hypothesis, reference = get_poses(2, 2, conf1=1, conf2=1) + poses = [hypothesis, reference] + + # Define distance metrics + mean_l1_metric = DistanceMetric( + "mean_l1_metric", distance_measure=AggregatedPowerDistance(1, 17) + ) + metrics = [ + BaseMetric("base"), + DistanceMetric("PowerDistanceMetric", AggregatedPowerDistance(2, 1)), + DistanceMetric("AnotherPowerDistanceMetric", AggregatedPowerDistance(1, 10)), + mean_l1_metric, + DistanceMetric( + "max_l1_metric", + AggregatedPowerDistance( + order=1, aggregation_strategy="max", default_distance=0 + ), + ), + DistanceMetric( + "MeanL2Score", + AggregatedPowerDistance( + order=2, aggregation_strategy="mean", default_distance=0 + ), + ), + ] + + # Evaluate each metric on the test poses + for metric in metrics: + print("*" * 10) + print(metric.get_signature().format()) + print(metric.get_signature().format(short=True)) + + try: + score = metric.score(poses[0], poses[1]) + print(f"SCORE: {score}") + print("SCORE With Signature:") + score_with_sig = metric.score_with_signature(poses[0], poses[1]) + print(score_with_sig) + print(repr(score_with_sig)) + print(f"{type(score_with_sig)}") + + print(metric.score_with_signature(poses[0], poses[1], short=True)) + + except NotImplementedError: + print(f"{metric} score not implemented") + print("*" * 10) diff --git a/pose_evaluation/metrics/base.py b/pose_evaluation/metrics/base.py index 692f04b..1c69942 100644 --- a/pose_evaluation/metrics/base.py +++ b/pose_evaluation/metrics/base.py @@ -1,67 +1,76 @@ # pylint: disable=undefined-variable -from typing import Any, Callable +from typing import Any, Callable, Sequence from tqdm import tqdm class Signature: - """Represents 
reproducibility signatures for metrics. Inspired by sacreBLEU - """ - def __init__(self, name:str, args: dict): + """Represents reproducibility signatures for metrics. Inspired by sacreBLEU""" - self._abbreviated = { - "name":"n", - "higher_is_better":"hb" - } + def __init__(self, name: str, args: dict): + + self._abbreviated = {"name": "n", "higher_is_better": "hb"} self.signature_info = {"name": name, **args} def update(self, key: str, value: Any): self.signature_info[key] = value - def update_signature_and_abbr(self, key:str, abbr:str, args:dict): - self._abbreviated.update({ - key: abbr - }) + def update_abbr(self, key: str, abbr: str): + self._abbreviated.update({key: abbr}) + + def update_signature_and_abbr(self, key: str, abbr: str, args: dict): + self.update_abbr(key, abbr) - self.signature_info.update({ - key: args.get(key, None) - }) + self.signature_info.update({key: args.get(key, None)}) def format(self, short: bool = False) -> str: - pairs = [] - keys = list(self.signature_info.keys()) - for name in keys: - value = self.signature_info[name] - if value is not None: - # Check for nested signature objects - if hasattr(value, "get_signature"): - # Wrap nested signatures in brackets - nested_signature = value.get_signature() - if isinstance(nested_signature, Signature): - nested_signature = nested_signature.format(short=short) - value = f"{{{nested_signature}}}" - if isinstance(value, bool): - # Replace True/False with yes/no - value = "yes" if value else "no" - if isinstance(value, Callable): - value = value.__name__ - - # if the abbreviation is not defined, use the full name as a fallback. - abbreviated_name = self._abbreviated.get(name, name) - final_name = abbreviated_name if short else name - pairs.append(f"{final_name}:{value}") - - return "|".join(pairs) + parts = [] + # Always print the "name" value first, if available. 
+ name_value = self.signature_info.get("name") + if name_value is not None: + parts.append(str(name_value)) + # Process all other keys. + for key, value in self.signature_info.items(): + if key == "name" or value is None: + continue + # Handle nested signature objects and wrap them in curly braces. + if hasattr(value, "get_signature"): + nested_signature = value.get_signature() + if isinstance(nested_signature, Signature): + value = "{" + nested_signature.format(short=short) + "}" + if isinstance(value, bool): + value = "yes" if value else "no" + if isinstance(value, Callable): + value = value.__name__ + abbreviated_key = self._abbreviated.get(key, key) if short else key + parts.append(f"{abbreviated_key}:{value}") + return "|".join(parts) + + def __str__(self) -> str: + return self.format() - def __str__(self): + def __repr__(self) -> str: return self.format() + +class Score: + """Inspired by sacreBLEU: a base score class that prefixes the score value with its signature string.""" + + def __init__(self, name: str, score: float, signature: str) -> None: + self.name = name + self.score = score + self._signature = signature + + def __str__(self): + return f"{self._signature} = {self.score}" + def __repr__(self): - return self.format() + return f"Score({super().__repr__()}, signature={repr(self._signature)})" + class BaseMetric[T]: """Base class for all metrics.""" - # Each metric should define its Signature class' name here + _SIGNATURE_TYPE = Signature def __init__(self, name: str, higher_is_better: bool = False): @@ -74,26 +83,47 @@ def __call__(self, hypothesis: T, reference: T) -> float: def score(self, hypothesis: T, reference: T) -> float: raise NotImplementedError - def score_max(self, hypothesis: T, references: list[T]) -> float: + def score_with_signature( + self, hypothesis: T, reference: T, short: bool = False + ) -> Score: + return Score( + name=self.name, + score=self.score(hypothesis, reference), + signature=self.get_signature().format(short=short), + ) + + 
def score_max(self, hypothesis: T, references: Sequence[T]) -> float: all_scores = self.score_all([hypothesis], references) return max(max(scores) for scores in all_scores) - def validate_corpus_score_input(self, hypotheses: list[T], references: list[list[T]]): + def validate_corpus_score_input( + self, hypotheses: Sequence[T], references: Sequence[Sequence[T]] + ): # This method is designed to avoid mistakes in the use of the corpus_score method for reference in references: - assert len(hypotheses) == len(reference), \ - "Hypothesis and reference must have the same number of instances" - - def corpus_score(self, hypotheses: list[T], references: list[list[T]]) -> float: - # Default implementation: average over sentence scores + assert len(hypotheses) == len( + reference + ), "Hypothesis and reference must have the same number of instances" + + def corpus_score( + self, hypotheses: Sequence[T], references: Sequence[list[T]] + ) -> float: + """Default implementation: average over sentence scores.""" self.validate_corpus_score_input(hypotheses, references) transpose_references = list(zip(*references)) - return sum(self.score_max(h, r) for h, r in zip(hypotheses, transpose_references)) / len(hypotheses) - - def score_all(self, hypotheses: list[T], references: list[T], progress_bar=True) -> list[list[float]]: - # Default implementation: call the score function for each hypothesis-reference pair - return [[self.score(h, r) for r in references] - for h in tqdm(hypotheses, disable=not progress_bar or len(hypotheses) == 1)] + scores = [ + self.score_max(h, r) for h, r in zip(hypotheses, transpose_references) + ] + return sum(scores) / len(hypotheses) + + def score_all( + self, hypotheses: Sequence[T], references: Sequence[T], progress_bar=True + ) -> list[list[float]]: + """Call the score function for each hypothesis-reference pair.""" + return [ + [self.score(h, r) for r in references] + for h in tqdm(hypotheses, disable=not progress_bar or len(hypotheses) == 1) + ] def 
__str__(self): return self.name diff --git a/pose_evaluation/metrics/distance_measure.py b/pose_evaluation/metrics/distance_measure.py new file mode 100644 index 0000000..1ab5433 --- /dev/null +++ b/pose_evaluation/metrics/distance_measure.py @@ -0,0 +1,115 @@ +from typing import Literal, Dict, Any +import numpy.ma as ma # pylint: disable=consider-using-from-import +from pose_evaluation.metrics.base import Signature + +AggregationStrategy = Literal["max", "min", "mean", "sum"] + +class DistanceMeasureSignature(Signature): + """Signature for distance measure metrics.""" + def __init__(self, name: str, args: Dict[str, Any]) -> None: + super().__init__(name=name, args=args) + self.update_abbr("distance", "dist") + self.update_abbr("power", "pow") + + +class DistanceMeasure: + """Abstract base class for distance measures.""" + _SIGNATURE_TYPE = DistanceMeasureSignature + + def __init__(self, name: str) -> None: + self.name = name + + def get_distance(self, hyp_data: ma.MaskedArray, ref_data: ma.MaskedArray) -> float: + """ + Compute the distance between hypothesis and reference data. + + This method should be implemented by subclasses. 
+ """ + raise NotImplementedError + + def __call__(self, hyp_data: ma.MaskedArray, ref_data: ma.MaskedArray) -> float: + return self.get_distance(hyp_data, ref_data) + + def get_signature(self) -> Signature: + """Return the signature of the distance measure.""" + return self._SIGNATURE_TYPE(self.name, self.__dict__) + + +class PowerDistanceSignature(DistanceMeasureSignature): + """Signature for power distance measures.""" + def __init__(self, name: str, args: Dict[str, Any]) -> None: + super().__init__(name=name, args=args) + self.update_signature_and_abbr("order", "ord", args) + self.update_signature_and_abbr("default_distance", "dflt", args) + self.update_signature_and_abbr("aggregation_strategy", "agg", args) + + +class AggregatedPowerDistance(DistanceMeasure): + """Aggregated power distance metric using a specified aggregation strategy.""" + _SIGNATURE_TYPE = PowerDistanceSignature + + def __init__( + self, + order: int = 2, + default_distance: float = 0.0, + aggregation_strategy: AggregationStrategy = "mean", + ) -> None: + """ + Initialize the aggregated power distance metric. + + :param order: The exponent to which differences are raised. + :param default_distance: The value to fill in for masked entries. + :param aggregation_strategy: Strategy to aggregate computed distances. + """ + super().__init__(name="power_distance") + self.power = float(order) + self.default_distance = default_distance + self.aggregation_strategy = aggregation_strategy + + def _aggregate(self, distances: ma.MaskedArray) -> float: + """ + Aggregate computed distances using the specified strategy. + + :param distances: A masked array of computed distances. + :return: A single aggregated distance value. 
+ """ + aggregation_funcs = { + "mean": distances.mean, + "max": distances.max, + "min": distances.min, + "sum": distances.sum, + } + if self.aggregation_strategy in aggregation_funcs: + return aggregation_funcs[self.aggregation_strategy]() + + raise NotImplementedError( + f"Aggregation Strategy {self.aggregation_strategy} not implemented" + ) + + def _calculate_distances( + self, hyp_data: ma.MaskedArray, ref_data: ma.MaskedArray + ) -> ma.MaskedArray: + """ + Compute element-wise distances between hypothesis and reference data. + + Steps: + 1. Compute the absolute differences. + 2. Raise the differences to the specified power. + 3. Sum the powered differences along the last axis. + 4. Extract the root corresponding to the power. + 5. Fill masked values with the default distance. + + :param hyp_data: Hypothesis data as a masked array. + :param ref_data: Reference data as a masked array. + :return: A masked array of computed distances. + """ + diffs = ma.abs(hyp_data - ref_data) + raised_to_power = ma.power(diffs, self.power) + summed_results = ma.sum(raised_to_power, axis=-1, keepdims=True) + roots = ma.power(summed_results, 1 / self.power) + return ma.filled(roots, self.default_distance) + + def get_distance(self, hyp_data: ma.MaskedArray, ref_data: ma.MaskedArray) -> float: + """Compute and aggregate the distance between hypothesis and reference data.""" + calculated = self._calculate_distances(hyp_data, ref_data) + return self._aggregate(calculated) diff --git a/pose_evaluation/metrics/distance_metric.py b/pose_evaluation/metrics/distance_metric.py index c3cbc16..830db13 100644 --- a/pose_evaluation/metrics/distance_metric.py +++ b/pose_evaluation/metrics/distance_metric.py @@ -1,38 +1,15 @@ -from typing import Literal - -from numpy import ma from pose_format import Pose - from pose_evaluation.metrics.base_pose_metric import PoseMetric +from pose_evaluation.metrics.distance_measure import DistanceMeasure class DistanceMetric(PoseMetric): - def __init__(self, 
kind: Literal["l1", "l2"] = "l2"): - super().__init__(f"DistanceMetric {kind}", higher_is_better=False) - self.kind = kind - - def score(self, hypothesis: Pose, reference: Pose) -> float: - arrays = [hypothesis.body.data, reference.body.data] - max_length = max(len(array) for array in arrays) - # Pad the shorter array with zeros - for i, array in enumerate(arrays): - if len(array) < max_length: - shape = list(array.shape) - shape[0] = max_length - len(array) - padding_tensor = ma.zeros(shape) - arrays[i] = ma.concatenate([array, padding_tensor], axis=0) - - # Calculate the error - error = arrays[0] - arrays[1] + """Computes the distance between two poses using the provided distance measure.""" - # for l2, we need to calculate the error for each point - if self.kind == "l2": - # the last dimension is the 3D coordinates - error = ma.power(error, 2) - error = error.sum(axis=-1) - error = ma.sqrt(error) - else: - error = ma.abs(error) + def __init__(self, name: str, distance_measure: DistanceMeasure) -> None: + super().__init__(name=name, higher_is_better=False) + self.distance_measure = distance_measure - error = error.filled(0) - return error.sum() + def score(self, hypothesis: Pose, reference: Pose) -> float: + """Calculate the distance score between hypothesis and reference poses.""" + return self.distance_measure(hypothesis.body.data, reference.body.data) diff --git a/pose_evaluation/metrics/test_distance_metric.py b/pose_evaluation/metrics/test_distance_metric.py index e1d7d39..7d2278a 100644 --- a/pose_evaluation/metrics/test_distance_metric.py +++ b/pose_evaluation/metrics/test_distance_metric.py @@ -1,72 +1,103 @@ -import math import unittest +from typing import Optional import numpy as np from pose_format import Pose from pose_format.numpy import NumPyPoseBody +from pose_evaluation.metrics.distance_measure import AggregatedPowerDistance from pose_evaluation.metrics.distance_metric import DistanceMetric -def get_poses(length1: int, length2: int): +def 
get_poses( + length1: int, + length2: int, + conf1: Optional[float] = None, + conf2: Optional[float] = None, +): + """ + Utility function to generate hypothesis and reference Pose objects for testing. + + Args: + length1 (int): Number of frames in the hypothesis pose. + length2 (int): Number of frames in the reference pose. + conf1 (float, optional): Confidence multiplier for the hypothesis. + conf2 (float, optional): Confidence multiplier for the reference. + + Returns: + tuple: A tuple containing (hypothesis, reference) Pose objects. + """ data_tensor = np.full((length1, 3, 4, 3), fill_value=2) zeros_tensor = np.zeros((length2, 3, 4, 3)) data_confidence = np.ones(data_tensor.shape[:-1]) zeros_confidence = np.ones(zeros_tensor.shape[:-1]) - hypothesis = Pose(header=None, body=NumPyPoseBody(fps=1, data=data_tensor, confidence=data_confidence)) - reference = Pose(header=None, body=NumPyPoseBody(fps=1, data=zeros_tensor, confidence=zeros_confidence)) + if conf1 is not None: + data_confidence = conf1 * data_confidence + if conf2 is not None: + zeros_confidence = conf2 * zeros_confidence + + # TODO: add an actual header, something like + # header = PoseHeader(1.0, PoseHeaderDimensions(10, 20, 5), [PoseHeaderComponent(...)], is_bbox=True) + hypothesis = Pose( + header=None, # type: ignore + body=NumPyPoseBody(fps=1, data=data_tensor, confidence=data_confidence), + ) + reference = Pose( + header=None, # type: ignore + body=NumPyPoseBody(fps=1, data=zeros_tensor, confidence=zeros_confidence), + ) return hypothesis, reference -class TestDistanceMetricGeneric(unittest.TestCase): - def setUp(self): - self.metric = DistanceMetric("l2") - - def test_scores_are_symmetric(self): - hypothesis, reference = get_poses(2, 2) - - score1 = self.metric.score(hypothesis, reference) - # pylint: disable=arguments-out-of-order - score2 = self.metric.score(reference, hypothesis) - self.assertAlmostEqual(score1, score2) - def test_score_different_length(self): - hypothesis, reference = 
get_poses(3, 2) - - difference = 6 * np.prod(hypothesis.body.confidence.shape) - - score = self.metric.score(hypothesis, reference) - self.assertIsInstance(score, float) - self.assertAlmostEqual(score, difference) +class TestDistanceMetricMeanL1(unittest.TestCase): + """Tests for the L1 (Manhattan) distance metric using mean aggregation.""" -class TestDistanceMetricL1(unittest.TestCase): def setUp(self): - self.metric = DistanceMetric("l1") + self.metric = DistanceMetric( + "mean_l1_metric", + distance_measure=AggregatedPowerDistance(order=1, default_distance=0), + ) def test_score_equal_length(self): hypothesis, reference = get_poses(2, 2) - - # calculate what the difference should be - difference = 6 * np.prod(hypothesis.body.confidence.shape) + expected_distance = 6 # Sum of absolute differences: 2 + 2 + 2 score = self.metric.score(hypothesis, reference) - self.assertIsInstance(score, float) # Check if the score is a float - self.assertAlmostEqual(score, difference) + self.assertIsInstance(score, float) + self.assertAlmostEqual(score, expected_distance) + class TestDistanceMetricL2(unittest.TestCase): + """Tests for the L2 (Euclidean) distance metric with default distance substitution.""" + def setUp(self): - self.metric = DistanceMetric("l2") + self.default_distance = 17 + self.metric = DistanceMetric( + "l2_metric", + distance_measure=AggregatedPowerDistance( + order=2, default_distance=self.default_distance + ), + ) + + def _check_against_expected(self, hypothesis, reference, expected): + score = self.metric.score(hypothesis, reference) + self.assertIsInstance(score, float) + self.assertAlmostEqual(score, expected) def test_score_equal_length(self): hypothesis, reference = get_poses(2, 2) + expected_distance = np.sqrt(2**2 + 2**2 + 2**2) # sqrt(12) + self._check_against_expected(hypothesis, reference, expected=expected_distance) - # calculate what the difference should be - difference = math.sqrt(12) * np.prod(hypothesis.body.confidence.shape) + def 
test_score_equal_length_one_masked(self): + hypothesis, reference = get_poses(2, 2, conf1=0.0) + self._check_against_expected( + hypothesis, reference, expected=self.default_distance + ) - score = self.metric.score(hypothesis, reference) - self.assertIsInstance(score, float) # Check if the score is a float - self.assertAlmostEqual(score, difference) + # TODO: Add tests for other aggregation strategies and power values -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/pose_evaluation/metrics/test_embedding_distance_metric.py b/pose_evaluation/metrics/test_embedding_distance_metric.py index ab275c6..0f08bb9 100644 --- a/pose_evaluation/metrics/test_embedding_distance_metric.py +++ b/pose_evaluation/metrics/test_embedding_distance_metric.py @@ -87,7 +87,7 @@ def save_and_plot_distances(distances, matrix_name, num_points, dim): """Helper function to save distance matrix and plot distances.""" distances = distances.cpu() - test_artifacts_dir = Path(__file__).parent / "temp" + test_artifacts_dir = Path(__file__).parent / "tests" output_path = test_artifacts_dir / f"distance_matrix_{matrix_name}_{num_points}_{dim}D.csv" np.savetxt(output_path, distances.numpy(), delimiter=",", fmt="%.4f") print(f"Distance matrix saved to {output_path}") diff --git a/pose_evaluation/utils/test_pose_utils.py b/pose_evaluation/utils/test_pose_utils.py index cd54cea..adc69f6 100644 --- a/pose_evaluation/utils/test_pose_utils.py +++ b/pose_evaluation/utils/test_pose_utils.py @@ -16,7 +16,6 @@ reduce_poses_to_intersection, get_component_names_and_points_dict, zero_pad_shorter_poses, - set_masked_to_origin_position, ) @@ -233,35 +232,6 @@ def test_detect_format( detect_known_pose_format(pose) -def test_set_masked_to_origin_pos(mediapipe_poses_test_data: List[Pose]): - # Create a copy of the original poses for comparison - originals = [pose.copy() for pose in mediapipe_poses_test_data] - - # Apply the transformation - poses = [set_masked_to_origin_position(pose) 
for pose in mediapipe_poses_test_data] - - for original, transformed in zip(originals, poses): - # 1. Ensure the transformed data is still a MaskedArray - assert isinstance(transformed.body.data, np.ma.MaskedArray) - - # # 2. Ensure the mask is now all False, meaning data _exists_ though its _value_ is now zero - # assert np.ma.all(~transformed.body.data.mask) - # assert original.body.data.mask.sum() == 0 - assert transformed.body.data.mask.sum() == 0 - - # 3. Check the shape matches the original - assert transformed.body.data.shape == original.body.data.shape - - # 4. Validate masked positions in the original are now zeros - assert ma.all(transformed.body.data.data[original.body.data.mask] == 0) - - # 5. Validate unmasked positions in the original remain unchanged - assert ma.all( - transformed.body.data.data[~original.body.data.mask] - == original.body.data.data[~original.body.data.mask] - ) - - def test_hide_low_conf(mediapipe_poses_test_data: List[Pose]): copies = [pose.copy() for pose in mediapipe_poses_test_data] for pose, copy in zip(mediapipe_poses_test_data, copies):