diff --git a/pose_evaluation/evaluation/evaluate_signclip.py b/pose_evaluation/evaluation/evaluate_signclip.py index 9daaa77..2524124 100644 --- a/pose_evaluation/evaluation/evaluate_signclip.py +++ b/pose_evaluation/evaluation/evaluate_signclip.py @@ -166,7 +166,9 @@ def calculate_class_means(gloss_indices, scores): # return within_class_means_by_gloss -def evaluate_signclip(emb_dir: Path, split_file: Path, out_path: Path, kind: str = "cosine"): +def evaluate_signclip( + emb_dir: Path, split_file: Path, out_path: Path, kind: str = "cosine" +): # pylint: disable=too-many-locals, too-many-statements """ Evaluate SignCLIP embeddings using score_all. @@ -263,7 +265,7 @@ def evaluate_signclip(emb_dir: Path, split_file: Path, out_path: Path, kind: str save_start = time.perf_counter() class_means_json = out_path.with_name(f"{out_path.stem}_class_means").with_suffix(".json") - with open(class_means_json, "w") as f: + with open(class_means_json, "w", encoding="utf-8") as f: print(f"Writing class means to {f}") json.dump(class_means, f) np.savez(out_path, scores=scores, files=files) diff --git a/pose_evaluation/examples/example_metric_construction.py b/pose_evaluation/examples/example_metric_construction.py index 71de1af..cd734da 100644 --- a/pose_evaluation/examples/example_metric_construction.py +++ b/pose_evaluation/examples/example_metric_construction.py @@ -2,11 +2,20 @@ from pose_format import Pose -from pose_evaluation.metrics.base import BaseMetric from pose_evaluation.metrics.distance_measure import AggregatedPowerDistance from pose_evaluation.metrics.distance_metric import DistanceMetric +from pose_evaluation.metrics.dtw_metric import ( + DTWAggregatedPowerDistanceMeasure, + DTWAggregatedScipyDistanceMeasure, +) from pose_evaluation.metrics.test_distance_metric import get_poses -from pose_evaluation.utils.pose_utils import zero_pad_shorter_poses +from pose_evaluation.metrics.pose_processors import ( + NormalizePosesProcessor, + ZeroPadShorterPosesProcessor, + 
HideLegsPosesProcessor, + ReduceHolisticPoseProcessor, + get_standard_pose_processors, +) if __name__ == "__main__": # Define file paths for test pose data @@ -16,7 +25,8 @@ # Choose whether to load real files or generate test poses # They have different lengths, and so some metrics will crash! - # Change to False to generate fake poses with known distances, e.g. all 0 and all 1 + # Metrics with ZeroPadShorterPosesProcessor, DTWMetrics are fine. + # Change to False to generate fake poses with known distances, e.g. all 0 and all 1\ USE_REAL_FILES = True if USE_REAL_FILES: @@ -24,45 +34,129 @@ Pose.read(hypothesis_file.read_bytes()), Pose.read(reference_file.read_bytes()), ] - # TODO: add PosePreprocessors to PoseDistanceMetrics, with their own signatures - poses = zero_pad_shorter_poses(poses) else: hypothesis, reference = get_poses(2, 2, conf1=1, conf2=1) poses = [hypothesis, reference] + hypotheses = [pose.copy() for pose in poses] + references = [pose.copy() for pose in poses] + + ############################# + # Abstract classes: + + # BaseMetric does not actually have score() function + # base_metric = BaseMetric("base") + + # PoseMetric calls preprocessors before scoring, + # It is also an abstract class + # PoseMetric("pose base"), + + # Segments first, also abstract. + # SegmentedPoseMetric("SegmentedMetric") + # Define distance metrics - mean_l1_metric = DistanceMetric("mean_l1_metric", distance_measure=AggregatedPowerDistance(1, 17)) metrics = [ - BaseMetric("base"), - DistanceMetric("PowerDistanceMetric", AggregatedPowerDistance(2, 1)), - DistanceMetric("AnotherPowerDistanceMetric", AggregatedPowerDistance(1, 10)), - mean_l1_metric, + # a DistanceMetric uses a DistanceMeasure to calculate distances between two Poses + # This one is effectively (normalized) Average Position Error (APE) + # as it by default will run zero-padding of the shorter pose, and normalization, + # and AggregatedPowerDistance does mean absolute (euclidean) distances by default. 
DistanceMetric( - "max_l1_metric", - AggregatedPowerDistance(order=1, aggregation_strategy="max", default_distance=0), + "NormalizedAveragePositionError", + AggregatedPowerDistance(), # ), + # Customizing Distances + # Distance Measures have signatures as well. + # You can set options on the DistanceMeasure and they will be reflected in the signature. + # This one would be distance_measure:{power_distance|pow:1.0|dflt:1.0|agg:max} DistanceMetric( - "MeanL2Score", + "MaxL1DistanceMetric", + AggregatedPowerDistance(order=1, default_distance=1, aggregation_strategy="max"), # + ), + # Customizing Preprocessing + # A DistanceMetric is a PoseMetric, and so it will call PosePreprocessors before scoring + # get_standard_pose_processors gives you some default options, + # for example you could decide not to remove the legs + DistanceMetric( + "CustomizedPosePreprocessorsWithLegsMetric", + distance_measure=AggregatedPowerDistance("A custom name", order=1, default_distance=10), + pose_preprocessors=get_standard_pose_processors( + remove_legs=False, # If you want the legs + ), + ), + # Recreating Existing Metrics: Average Position Error/ Mean Joint Error + # As defined in Ham2Pose, + # APE is "the average L2 distance between the predicted and the GT pose keypoints + # across all frames and data samples. 
Since it compares absolute positions, + # it is sensitive to different body shapes and slight changes + # in timing or position of the performed movement" + # So we: + # - Select AggregatedPowerDistance measure + # - set the order to 2 (Euclidean distance) + # - set the aggregation strategy to mean + # - recreate the set of preprocessors from https://github.com/rotem-shalev/Ham2Pose/blob/main/metrics.py#L32-L62 + # (adapting to MediaPipe Holistic keypoints format instead of OpenPose) + DistanceMetric( + "AveragePositionError", AggregatedPowerDistance(order=2, aggregation_strategy="mean", default_distance=0), + pose_preprocessors=[ + NormalizePosesProcessor(), + HideLegsPosesProcessor(), + ZeroPadShorterPosesProcessor(), + ReduceHolisticPoseProcessor(), + ], + ), + # Recreating Dynamic Time Warping - Mean Joint Error + # As before, only now we use the Dynamic Time Warping version! + DistanceMetric( + "DTWPowerDistance", + DTWAggregatedPowerDistanceMeasure(aggregation_strategy="mean", default_distance=0.0, order=2), + pose_preprocessors=get_standard_pose_processors( + zero_pad_shorter=False, reduce_holistic_to_face_and_upper_body=True + ), + ), + # We can also implement a version that uses scipy distances "cdist" + # This lets us experiment with e.g. 
jaccard + # Options are listed at the documentation for scipy: + # https://docs.scipy.org/doc/scipy-1.15.0/reference/generated/scipy.spatial.distance.cdist.html + DistanceMetric( + "DTWScipyDistance", + DTWAggregatedScipyDistanceMeasure(aggregation_strategy="mean", default_distance=0.0, metric="jaccard"), + pose_preprocessors=get_standard_pose_processors( + zero_pad_shorter=False, reduce_holistic_to_face_and_upper_body=True + ), ), ] # Evaluate each metric on the test poses for metric in metrics: print("*" * 10) + print(metric.name) + + print("\nMETRIC __str__: ") + print(str(metric)) + + print("\nMETRIC to repr: ") + print(repr(metric)) + + print("\nSIGNATURE: ") print(metric.get_signature().format()) + + print("\nSIGNATURE (short): ") print(metric.get_signature().format(short=True)) try: + # + print("\nSCORE ALL with Signature (short):") + print(metric.score_all_with_signature(hypotheses, references, short=True, progress_bar=True)) + score = metric.score(poses[0], poses[1]) - print(f"SCORE: {score}") - print("SCORE With Signature:") - score_with_sig = metric.score_with_signature(poses[0], poses[1]) - print(score_with_sig) - print(repr(score_with_sig)) - print(f"{type(score_with_sig)}") + print(f"\nSCORE: {score}") + + print("\nSCORE With Signature:") + print(metric.score_with_signature(poses[0], poses[1])) + print("\nSCORE with Signature (short):") print(metric.score_with_signature(poses[0], poses[1], short=True)) except NotImplementedError: diff --git a/pose_evaluation/metrics/base.py b/pose_evaluation/metrics/base.py index b99385d..c8d1a56 100644 --- a/pose_evaluation/metrics/base.py +++ b/pose_evaluation/metrics/base.py @@ -1,8 +1,10 @@ -# pylint: disable=undefined-variable -from typing import Any, Callable, Sequence +from abc import ABC, abstractmethod +from typing import Any, Callable, Generic, Sequence, TypeVar from tqdm import tqdm +T = TypeVar("T") + class Signature: """Represents reproducibility signatures for metrics. 
Inspired by sacreBLEU""" @@ -21,7 +23,6 @@ def update_abbr(self, key: str, abbr: str): def update_signature_and_abbr(self, key: str, abbr: str, args: dict): self.update_abbr(key, abbr) - self.signature_info.update({key: args.get(key, None)}) def format(self, short: bool = False) -> str: @@ -39,6 +40,9 @@ def format(self, short: bool = False) -> str: nested_signature = value.get_signature() if isinstance(nested_signature, Signature): value = "{" + nested_signature.format(short=short) + "}" + elif isinstance(value, list) and all(hasattr(v, "get_signature") for v in value): + value = "[" + ",".join(v.get_signature().format(short=short) for v in value) + "]" + if isinstance(value, bool): value = "yes" if value else "no" if isinstance(value, Callable): @@ -60,16 +64,31 @@ class Score: def __init__(self, name: str, score: float, signature: str) -> None: self.name = name self.score = score - self._signature = signature + self.signature = signature def __str__(self): - return f"{self._signature} = {self.score}" + return f"{self.signature} = {self.score}" + + def format( + self, + width: int = 2, + score_only: bool = False, + ) -> str: + + sc = f"{self.score:.{width}f}" + + full_score = f"{self.signature}" if self.signature else self.name + full_score = f"{full_score} = {sc}" + + if score_only: + return sc + return full_score def __repr__(self): - return f"Score({super().__repr__()}, signature={repr(self._signature)})" + return self.format() -class BaseMetric[T]: +class BaseMetric(ABC, Generic[T]): # Ensure it extends ABC """Base class for all metrics.""" _SIGNATURE_TYPE = Signature @@ -81,10 +100,16 @@ def __init__(self, name: str, higher_is_better: bool = False): def __call__(self, hypothesis: T, reference: T) -> float: return self.score(hypothesis, reference) + @abstractmethod def score(self, hypothesis: T, reference: T) -> float: raise NotImplementedError - def score_with_signature(self, hypothesis: T, reference: T, short: bool = False) -> Score: + def 
score_with_signature( + self, + hypothesis: T, + reference: T, + short: bool = False, + ) -> Score: return Score( name=self.name, score=self.score(hypothesis, reference), @@ -107,15 +132,27 @@ def corpus_score(self, hypotheses: Sequence[T], references: Sequence[list[T]]) - scores = [self.score_max(h, r) for h, r in zip(hypotheses, transpose_references)] return sum(scores) / len(hypotheses) - def score_all(self, hypotheses: Sequence[T], references: Sequence[T], progress_bar=True) -> list[list[float]]: + def score_all(self, hypotheses: Sequence[T], references: Sequence[T], progress_bar=False) -> list[list[float]]: """Call the score function for each hypothesis-reference pair.""" return [ [self.score(h, r) for r in references] for h in tqdm(hypotheses, disable=not progress_bar or len(hypotheses) == 1) ] + def score_all_with_signature( + self, + hypotheses: Sequence[T], + references: Sequence[T], + progress_bar=False, + short: bool = False, + ) -> list[list[Score]]: + return [ + [self.score_with_signature(h, r, short=short) for r in references] + for h in tqdm(hypotheses, disable=not progress_bar or len(hypotheses) == 1) + ] + def __str__(self): - return self.name + return str(self.get_signature()) def get_signature(self) -> Signature: return self._SIGNATURE_TYPE(self.name, self.__dict__) diff --git a/pose_evaluation/metrics/base_pose_metric.py b/pose_evaluation/metrics/base_pose_metric.py index 903bf01..925f4c6 100644 --- a/pose_evaluation/metrics/base_pose_metric.py +++ b/pose_evaluation/metrics/base_pose_metric.py @@ -1,5 +1,92 @@ +from abc import ABC, abstractmethod +from typing import Iterable, List, Sequence, cast, Union +from tqdm import tqdm + from pose_format import Pose -from pose_evaluation.metrics.base import BaseMetric +from pose_evaluation.metrics.base import BaseMetric, Signature, Score +from pose_evaluation.metrics.pose_processors import PoseProcessor +from pose_evaluation.metrics.pose_processors import get_standard_pose_processors + + +class 
PoseMetricSignature(Signature): + def __init__(self, name: str, args: dict): + super().__init__(name, args) + self.update_abbr("pose_preprocessors", "pre") + # self.update_signature_and_abbr("pose_preprocessors", "pre", args) + + +class PoseMetric(BaseMetric[Pose], ABC): + _SIGNATURE_TYPE = PoseMetricSignature + + def __init__( + self, + name: str = "PoseMetric", + higher_is_better: bool = False, + pose_preprocessors: Union[None, List[PoseProcessor]] = None, + ): + + super().__init__(name, higher_is_better) + if pose_preprocessors is None: + self.pose_preprocessors = get_standard_pose_processors() + else: + self.pose_preprocessors = pose_preprocessors + + @abstractmethod + def _pose_score(self, processed_hypothesis: Pose, processed_reference: Pose): + raise NotImplementedError("Subclasses must implement _pose_score") + + def score(self, hypothesis: Pose, reference: Pose): + """For PoseMetrics, preprocessors are called before scoring.""" + hypothesis, reference = self.process_poses([hypothesis, reference]) + return self._pose_score(hypothesis, reference) + + def score_all( + self, hypotheses: Sequence[Pose], references: Sequence[Pose], progress_bar=False + ) -> List[List[float]]: + hyp_len = len(hypotheses) + ref_len = len(references) + + all_poses = self.process_poses(list(hypotheses) + list(references)) + + # Recover original lists if needed + hypotheses = all_poses[:hyp_len] + references = all_poses[hyp_len : hyp_len + ref_len] + return [ + [self.score(h, r) for r in references] + for h in tqdm(hypotheses, disable=not progress_bar or len(hypotheses) == 1) + ] + + def score_with_signature(self, hypothesis: Pose, reference: Pose, short: bool = False) -> Score: + return Score( + name=self.name, + score=self.score(hypothesis, reference), + signature=self.get_signature().format(short=short), + ) + + def score_all_with_signature( + self, + hypotheses: Sequence[Pose], + references: Sequence[Pose], + progress_bar=False, + short: bool = False, + ) -> list[list[Score]]: + 
+ return [ + [self.score_with_signature(h, r, short=short) for r in references] + for h in tqdm( + hypotheses, + desc=f"{self.name} scoring", + disable=not progress_bar or len(hypotheses) == 1, + ) + ] + + def process_poses(self, poses: Iterable[Pose], progress=False) -> List[Pose]: + poses = list(poses) + for preprocessor in tqdm(self.pose_preprocessors, desc="Preprocessing Poses", disable=not progress): + preprocessor = cast(PoseProcessor, preprocessor) + poses = preprocessor.process_poses(poses, progress=progress) + return poses -PoseMetric = BaseMetric[Pose] + def add_preprocessor(self, processor: PoseProcessor): + self.pose_preprocessors.append(processor) diff --git a/pose_evaluation/metrics/conftest.py b/pose_evaluation/metrics/conftest.py index 754a47e..9c44e47 100644 --- a/pose_evaluation/metrics/conftest.py +++ b/pose_evaluation/metrics/conftest.py @@ -1,10 +1,10 @@ import shutil from pathlib import Path -from typing import Callable, Union - +from typing import Callable, List, Union +import torch import numpy as np import pytest -import torch +from pose_format import Pose @pytest.fixture(scope="session", autouse=True) @@ -22,13 +22,16 @@ def clean_test_artifacts(): def fixture_distance_matrix_shape_checker() -> Callable[[torch.Tensor, torch.Tensor], None]: def _check_shape(hyp_count: int, ref_count: int, distance_matrix: torch.Tensor): expected_shape = torch.Size([hyp_count, ref_count]) - assert distance_matrix.shape == expected_shape, ( - f"For M={hyp_count} hypotheses, N={ref_count} references, " + # "line too long" + msg = ( + f"For M={hyp_count} hypotheses, N={ref_count} references, " f"Distance Matrix should be MxN={expected_shape}. 
" f"Instead, received {distance_matrix.shape}" ) - return _check_shape + assert distance_matrix.shape == expected_shape, msg + + return _check_shape # type: ignore @pytest.fixture(name="distance_range_checker") @@ -50,3 +53,10 @@ def _check_range( ), f"Maximum distance ({max_distance}) is outside the expected range [{min_val}, {max_val}]" return _check_range + + +@pytest.fixture +def real_pose_files() -> List[Pose]: + test_files_folder = Path("pose_evaluation") / "utils" / "test" / "test_data" + real_pose_files_list = [Pose.read(test_file.read_bytes()) for test_file in test_files_folder.glob("*.pose")] + return real_pose_files_list diff --git a/pose_evaluation/metrics/distance_measure.py b/pose_evaluation/metrics/distance_measure.py index 52bf1c2..0a2d0ce 100644 --- a/pose_evaluation/metrics/distance_measure.py +++ b/pose_evaluation/metrics/distance_measure.py @@ -1,3 +1,4 @@ +from abc import ABC, abstractmethod from typing import Literal, Dict, Any import numpy.ma as ma # pylint: disable=consider-using-from-import @@ -16,14 +17,16 @@ def __init__(self, name: str, args: Dict[str, Any]) -> None: self.update_abbr("power", "pow") -class DistanceMeasure: +class DistanceMeasure(ABC): """Abstract base class for distance measures.""" _SIGNATURE_TYPE = DistanceMeasureSignature - def __init__(self, name: str) -> None: + def __init__(self, name: str, default_distance=0.0) -> None: self.name = name + self.default_distance = default_distance + @abstractmethod def get_distance(self, hyp_data: ma.MaskedArray, ref_data: ma.MaskedArray) -> float: """ Compute the distance between hypothesis and reference data. 
@@ -32,9 +35,21 @@ def get_distance(self, hyp_data: ma.MaskedArray, ref_data: ma.MaskedArray) -> fl """ raise NotImplementedError + def _get_keypoint_trajectories(self, hyp_data: ma.MaskedArray, ref_data: ma.MaskedArray): + # frames, persons, keypoint + for keypoint_idx in range(hyp_data.shape[2]): + hyp_trajectory, ref_trajectory = ( + hyp_data[:, 0, keypoint_idx, :], + ref_data[:, 0, keypoint_idx, :], + ) + yield hyp_trajectory, ref_trajectory + def __call__(self, hyp_data: ma.MaskedArray, ref_data: ma.MaskedArray) -> float: return self.get_distance(hyp_data, ref_data) + def _calculate_pointwise_distances(self, hyp_data: ma.MaskedArray, ref_data: ma.MaskedArray) -> ma.MaskedArray: + raise NotImplementedError + def get_signature(self) -> Signature: """Return the signature of the distance measure.""" return self._SIGNATURE_TYPE(self.name, self.__dict__) @@ -50,27 +65,14 @@ def __init__(self, name: str, args: Dict[str, Any]) -> None: self.update_signature_and_abbr("aggregation_strategy", "agg", args) -class AggregatedPowerDistance(DistanceMeasure): - """Aggregated power distance metric using a specified aggregation strategy.""" - - _SIGNATURE_TYPE = PowerDistanceSignature - +class AggregatedDistanceMeasure(DistanceMeasure): def __init__( self, - order: int = 2, + name: str, default_distance: float = 0.0, aggregation_strategy: AggregationStrategy = "mean", ) -> None: - """ - Initialize the aggregated power distance metric. - - :param order: The exponent to which differences are raised. - :param default_distance: The value to fill in for masked entries. - :param aggregation_strategy: Strategy to aggregate computed distances. 
- """ - super().__init__(name="power_distance") - self.power = float(order) - self.default_distance = default_distance + super().__init__(name, default_distance=default_distance) self.aggregation_strategy = aggregation_strategy def _aggregate(self, distances: ma.MaskedArray) -> float: @@ -91,28 +93,63 @@ def _aggregate(self, distances: ma.MaskedArray) -> float: raise NotImplementedError(f"Aggregation Strategy {self.aggregation_strategy} not implemented") - def _calculate_distances(self, hyp_data: ma.MaskedArray, ref_data: ma.MaskedArray) -> ma.MaskedArray: - """ - Compute element-wise distances between hypothesis and reference data. - - Steps: - 1. Compute the absolute differences. - 2. Raise the differences to the specified power. - 3. Sum the powered differences along the last axis. - 4. Extract the root corresponding to the power. - 5. Fill masked values with the default distance. - - :param hyp_data: Hypothesis data as a masked array. - :param ref_data: Reference data as a masked array. - :return: A masked array of computed distances. 
- """ - diffs = ma.abs(hyp_data - ref_data) - raised_to_power = ma.power(diffs, self.power) - summed_results = ma.sum(raised_to_power, axis=-1, keepdims=True) - roots = ma.power(summed_results, 1 / self.power) - return ma.filled(roots, self.default_distance) - def get_distance(self, hyp_data: ma.MaskedArray, ref_data: ma.MaskedArray) -> float: """Compute and aggregate the distance between hypothesis and reference data.""" - calculated = self._calculate_distances(hyp_data, ref_data) + calculated = self._calculate_pointwise_distances(hyp_data, ref_data) return self._aggregate(calculated) + + +class AggregatedPowerDistance(AggregatedDistanceMeasure): + """Aggregated power distance metric using a specified aggregation strategy.""" + + _SIGNATURE_TYPE = PowerDistanceSignature + + def __init__( + self, + name="AggregatedPowerDistance", + order: int = 2, + default_distance: float = 0.0, + aggregation_strategy: AggregationStrategy = "mean", + ) -> None: + """ + Initialize the aggregated power distance metric. + + :param order: The exponent to which differences are raised. + :param default_distance: The value to fill in for masked entries. + :param aggregation_strategy: Strategy to aggregate computed distances. + """ + super().__init__( + name=name, + aggregation_strategy=aggregation_strategy, + default_distance=default_distance, + ) + self.power = float(order) + + def _calculate_pointwise_distances(self, hyp_data: ma.MaskedArray, ref_data: ma.MaskedArray) -> ma.MaskedArray: + return masked_array_power_distance( + hyp_data=hyp_data, ref_data=ref_data, power=self.power, default_distance=self.default_distance + ) + + +def masked_array_power_distance( + hyp_data: ma.MaskedArray, ref_data: ma.MaskedArray, power: float, default_distance: float +) -> ma.MaskedArray: + """ + Compute element-wise distances between hypothesis and reference data. + + Steps: + 1. Compute the absolute differences. + 2. Raise the differences to the specified power. + 3. 
Sum the powered differences along the last axis. + 4. Extract the root corresponding to the power. + 5. Fill masked values with the default distance. + + :param hyp_data: Hypothesis data as a masked array. + :param ref_data: Reference data as a masked array. + :return: A masked array of computed distances. + """ + diffs = ma.abs(hyp_data - ref_data) + raised_to_power = ma.power(diffs, power) + summed_results = ma.sum(raised_to_power, axis=-1, keepdims=True) + roots = ma.power(summed_results, 1 / power) + return ma.filled(roots, default_distance) diff --git a/pose_evaluation/metrics/distance_metric.py b/pose_evaluation/metrics/distance_metric.py index 9beae4b..934d647 100644 --- a/pose_evaluation/metrics/distance_metric.py +++ b/pose_evaluation/metrics/distance_metric.py @@ -1,3 +1,4 @@ +from typing import Any from pose_format import Pose from pose_evaluation.metrics.base_pose_metric import PoseMetric @@ -7,10 +8,15 @@ class DistanceMetric(PoseMetric): """Computes the distance between two poses using the provided distance measure.""" - def __init__(self, name: str, distance_measure: DistanceMeasure) -> None: - super().__init__(name=name, higher_is_better=False) + def __init__( + self, + name: str, + distance_measure: DistanceMeasure, + **kwargs: Any, + ) -> None: + super().__init__(name=name, higher_is_better=False, **kwargs) + self.distance_measure = distance_measure - def score(self, hypothesis: Pose, reference: Pose) -> float: - """Calculate the distance score between hypothesis and reference poses.""" - return self.distance_measure(hypothesis.body.data, reference.body.data) + def _pose_score(self, processed_hypothesis: Pose, processed_reference: Pose) -> float: + return self.distance_measure(processed_hypothesis.body.data, processed_reference.body.data) diff --git a/pose_evaluation/metrics/dtw_metric.py b/pose_evaluation/metrics/dtw_metric.py new file mode 100644 index 0000000..35e0cd1 --- /dev/null +++ b/pose_evaluation/metrics/dtw_metric.py @@ -0,0 +1,176 @@ 
+from fastdtw import fastdtw # type: ignore +from dtaidistance import dtw_ndim +from scipy.spatial.distance import cdist +import numpy.ma as ma # pylint: disable=consider-using-from-import +from tqdm import tqdm + +from pose_evaluation.metrics.distance_measure import ( + AggregatedDistanceMeasure, + AggregationStrategy, + masked_array_power_distance, +) + + +class DTWAggregatedDistanceMeasure(AggregatedDistanceMeasure): + def __init__( + self, + name="DTWAggregatedDistanceMeasure", + default_distance: float = 0, + aggregation_strategy: AggregationStrategy = "mean", + ) -> None: + super().__init__( + name=name, + default_distance=default_distance, + aggregation_strategy=aggregation_strategy, + ) + + def get_distance(self, hyp_data: ma.MaskedArray, ref_data: ma.MaskedArray, progress=False) -> float: + keypoint_count = hyp_data.shape[2] # Assuming shape: (frames, person, keypoints, xyz) + trajectory_distances = ma.empty(keypoint_count) # Preallocate a NumPy array + + for i, (hyp_trajectory, ref_trajectory) in tqdm( + enumerate(self._get_keypoint_trajectories(hyp_data, ref_data)), + desc="getting dtw distances for trajectories", + total=keypoint_count, + disable=not progress, + ): + distance, _ = fastdtw(hyp_trajectory, ref_trajectory, dist=self._calculate_pointwise_distances) + trajectory_distances[i] = distance # Store distance in the preallocated array + trajectory_distances = ma.array(trajectory_distances) + return self._aggregate(trajectory_distances) + + def _calculate_pointwise_distances(self, hyp_data: ma.MaskedArray, ref_data: ma.MaskedArray) -> ma.MaskedArray: + raise NotImplementedError("_calculate_pointwise_distances must be a callable that can be passed to fastdtw") + + +class DTWAggregatedPowerDistanceMeasure(DTWAggregatedDistanceMeasure): + def __init__( + self, + name="DTWAggregatedDistanceMeasure", + default_distance: float = 0, + aggregation_strategy: AggregationStrategy = "mean", + order=2, + ) -> None: + super().__init__( + name=name, + 
default_distance=default_distance, + aggregation_strategy=aggregation_strategy, + ) + self.power = order + + def _calculate_pointwise_distances(self, hyp_data: ma.MaskedArray, ref_data: ma.MaskedArray) -> ma.MaskedArray: + return masked_array_power_distance( + hyp_data=hyp_data, ref_data=ref_data, power=self.power, default_distance=self.default_distance + ) + + +class DTWAggregatedScipyDistanceMeasure(DTWAggregatedDistanceMeasure): + def __init__( + self, + name="DTWAggregatedDistanceMeasure", + default_distance: float = 0, + aggregation_strategy: AggregationStrategy = "mean", + metric: str = "euclidean", + ) -> None: + super().__init__( + name=name, + default_distance=default_distance, + aggregation_strategy=aggregation_strategy, + ) + self.metric = metric + + def _calculate_pointwise_distances(self, hyp_data: ma.MaskedArray, ref_data: ma.MaskedArray) -> ma.MaskedArray: + hyp_data = hyp_data.reshape(1, -1) # Adds a new leading dimension + ref_data = ref_data.reshape(1, -1) + + return cdist(hyp_data, ref_data, metric=self.metric) # type: ignore "no overloads match" but it works fine + + +class DTWOptimizedDistanceMeasure(DTWAggregatedDistanceMeasure): + """Optimized according to https://github.com/slaypni/fastdtw/blob/master/fastdtw/_fastdtw.pyx#L71-L76 + This function runs fastest if the following conditions are satisfied: + 1) x and y are either 1 or 2d numpy arrays whose dtype is a + subtype of np.float + 2) The dist input is a positive integer or None + """ + + def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments + self, + name="DTWOptimizedDistanceMeasure", + default_distance: float = 0, + aggregation_strategy: AggregationStrategy = "mean", + power=2, + masked_fill_value=0, + ) -> None: + super().__init__( + name=name, + default_distance=default_distance, + aggregation_strategy=aggregation_strategy, + ) + self.power = power + self.masked_fill_value = masked_fill_value + + def get_distance(self, hyp_data: ma.MaskedArray, ref_data: 
ma.MaskedArray, progress=False) -> float: + # https://github.com/slaypni/fastdtw/blob/master/fastdtw/_fastdtw.pyx#L71-L76 + # fastdtw goes more quickly if... + # 1) x and y are either 1 or 2d numpy arrays whose dtype is a + # subtype of np.float + # 2) The dist input is a positive integer or None + # + # So we convert to ndarray by filling in masked values, and we ensure the datatype. + hyp_data = hyp_data.filled(self.masked_fill_value).astype(float) + ref_data = ref_data.filled(self.masked_fill_value).astype(float) + keypoint_count = hyp_data.shape[2] # Assuming shape: (frames, person, keypoints, xyz) + trajectory_distances = ma.empty(keypoint_count) # Preallocate a NumPy array + + for i, (hyp_trajectory, ref_trajectory) in tqdm( + enumerate(self._get_keypoint_trajectories(hyp_data, ref_data)), + desc="getting dtw distances for trajectories", + total=keypoint_count, + disable=not progress, + ): + distance, _ = fastdtw(hyp_trajectory, ref_trajectory, self.power) + trajectory_distances[i] = distance # Store distance in the preallocated array + trajectory_distances = ma.array(trajectory_distances) + return self._aggregate(trajectory_distances) + + +# https://forecastegy.com/posts/dynamic-time-warping-dtw-libraries-python-examples/ +class DTWDTAIImplementationDistanceMeasure(AggregatedDistanceMeasure): + + def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments + self, + name="dtaiDTWAggregatedDistanceMeasure", + default_distance: float = 0, + aggregation_strategy: AggregationStrategy = "mean", + masked_fill_value=0, + use_fast=True, + ) -> None: + super().__init__( + name=name, + default_distance=default_distance, + aggregation_strategy=aggregation_strategy, + ) + self.masked_fill_value = masked_fill_value + self.use_fast = use_fast + + def get_distance(self, hyp_data: ma.MaskedArray, ref_data: ma.MaskedArray, progress=False) -> float: + hyp_data = hyp_data.filled(self.masked_fill_value) + ref_data = ref_data.filled(self.masked_fill_value) + 
keypoint_count = hyp_data.shape[2] # Assuming shape: (frames, person, keypoints, xyz) + trajectory_distances = ma.empty(keypoint_count) # Preallocate a NumPy array + + for i, (hyp_trajectory, ref_trajectory) in tqdm( + enumerate(self._get_keypoint_trajectories(hyp_data, ref_data)), + desc="getting dtw distances for trajectories", + total=keypoint_count, + disable=not progress, + ): + if self.use_fast: + distance = dtw_ndim.distance_fast(hyp_trajectory, ref_trajectory) # about 8s per call + else: + distance = dtw_ndim.distance(hyp_trajectory, ref_trajectory) # about 8s per call + + trajectory_distances[i] = distance # Store distance in the preallocated array + trajectory_distances = ma.array(trajectory_distances) + return self._aggregate(trajectory_distances) diff --git a/pose_evaluation/metrics/embedding_distance_metric.py b/pose_evaluation/metrics/embedding_distance_metric.py index 7475680..e3e1a76 100644 --- a/pose_evaluation/metrics/embedding_distance_metric.py +++ b/pose_evaluation/metrics/embedding_distance_metric.py @@ -1,5 +1,5 @@ import logging -from typing import Literal, List, Union +from typing import Literal, List, Optional, Union import numpy as np import torch @@ -28,13 +28,13 @@ class EmbeddingDistanceMetric(EmbeddingMetric): def __init__( self, kind: ValidDistanceKinds = "cosine", - device: Union[torch.device, str] = None, + device: Optional[Union[torch.device, str]] = None, dtype=None, ): """ Args: kind (ValidDistanceKinds): The type of distance metric, e.g. "cosine", or "euclidean". - device (Union[torch.device, str]): The device to use for computation. + device:The device to use for computation. If None, automatically detects. dtype (torch.dtype): The data type to use for tensors. 
If None, uses torch.get_default_dtype() @@ -83,7 +83,9 @@ def _to_batch_tensor_on_device(self, data: TensorConvertableType) -> Tensor: # https://stackoverflow.com/questions/55050717/converting-list-of-tensors-to-tensors-pytorch data = torch.stack(data) - return st_util._convert_to_batch_tensor(data).to(device=self.device, dtype=self.dtype) + return st_util._convert_to_batch_tensor(data).to( # pylint: disable=protected-access + device=self.device, dtype=self.dtype + ) def score(self, hypothesis: TensorConvertableType, reference: TensorConvertableType) -> Number: """ diff --git a/pose_evaluation/metrics/pose_processors.py b/pose_evaluation/metrics/pose_processors.py new file mode 100644 index 0000000..89ac2b0 --- /dev/null +++ b/pose_evaluation/metrics/pose_processors.py @@ -0,0 +1,155 @@ +from typing import Any, List, Union, Iterable, Callable + +from tqdm import tqdm + +from pose_format import Pose +from pose_format.utils.generic import pose_hide_legs, reduce_holistic +from pose_evaluation.metrics.base import Signature +from pose_evaluation.utils.pose_utils import ( + zero_pad_shorter_poses, + reduce_poses_to_intersection, +) + +PosesTransformerFunctionType = Callable[[Iterable[Pose]], List[Pose]] + + +class PoseProcessor: + _SIGNATURE_TYPE = Signature + + def __init__(self, name="PoseProcessor") -> None: + self.name = name + + def __call__(self, pose_or_poses: Union[Iterable[Pose], Pose]) -> Any: + if isinstance(pose_or_poses, Iterable): + return self.process_poses(pose_or_poses) + + return self.process_pose(pose_or_poses) + + def __repr__(self) -> str: + return str(self.get_signature()) + + def __str__(self) -> str: + return self.get_signature().format() + + def process_pose(self, pose: Pose) -> Pose: + raise NotImplementedError(f"process_pose not implemented for {self.name}") + + def process_poses(self, poses: Iterable[Pose], progress=False) -> List[Pose]: + return [self.process_pose(pose) for pose in tqdm(poses, desc=f"{self.name}", disable=not progress)] + + 
def get_signature(self) -> Signature:
+        return self._SIGNATURE_TYPE(self.name, self.__dict__)
+
+
+class NormalizePosesSignature(Signature):
+    def __init__(self, name: str, args: dict):
+        super().__init__(name, args)
+        self.update_signature_and_abbr("scale_factor", "s", args)
+        self.update_signature_and_abbr("info", "i", args)
+
+
+class NormalizePosesProcessor(PoseProcessor):
+    _SIGNATURE_TYPE = NormalizePosesSignature
+
+    def __init__(self, info=None, scale_factor=1) -> None:
+        super().__init__("normalize_poses")
+        self.info = info
+        self.scale_factor = scale_factor
+
+    def process_pose(self, pose: Pose) -> Pose:
+        return pose.normalize(self.info, self.scale_factor)
+
+
+class RemoveWorldLandmarksProcessor(PoseProcessor):
+    def __init__(self, name="remove_world_landmarks") -> None:
+        super().__init__(name)
+
+    def process_pose(self, pose: Pose) -> Pose:
+        return pose.remove_components(["WORLD_LANDMARKS"])
+
+
+class HideLegsPosesProcessor(PoseProcessor):
+    def __init__(self, name="hide_legs", remove=True) -> None:
+        super().__init__(name)
+        self.remove = remove
+
+    def process_pose(self, pose: Pose) -> Pose:
+        return pose_hide_legs(pose, remove=self.remove)
+
+
+class ReducePosesToCommonComponentsProcessor(PoseProcessor):
+    def __init__(self, name="reduce_poses_to_intersection") -> None:
+        super().__init__(name)
+
+    def process_pose(self, pose: Pose) -> Pose:
+        return self.process_poses([pose])[0]
+
+    def process_poses(self, poses: Iterable[Pose], progress=False) -> List[Pose]:
+        return reduce_poses_to_intersection(poses, progress=progress)
+
+
+class ZeroPadShorterPosesProcessor(PoseProcessor):
+    def __init__(self) -> None:
+        super().__init__(name="zero_pad_shorter_sequence")
+
+    def process_pose(self, pose: Pose) -> Pose:
+        return pose  # a single pose has nothing to pad against; returned unchanged
+
+    def process_poses(self, poses: Iterable[Pose], progress=False) -> List[Pose]:
+        return zero_pad_shorter_poses(poses)
+
+
+class ReduceHolisticPoseProcessor(PoseProcessor):
+    def __init__(self) ->
None:
+        super().__init__(name="reduce_holistic")
+
+    def process_pose(self, pose: Pose) -> Pose:
+        return reduce_holistic(pose)
+
+
+class ZeroFillMaskedValuesPoseProcessor(PoseProcessor):
+    def __init__(self) -> None:
+        super().__init__(name="zero_fill_masked_values")
+
+    def process_pose(self, pose: Pose) -> Pose:
+        pose = pose.copy()
+        pose.body = pose.body.zero_filled()
+        return pose
+
+
+def get_standard_pose_processors(  # pylint: disable=too-many-arguments,too-many-positional-arguments
+    normalize_poses: bool = True,
+    reduce_poses_to_common_components: bool = True,
+    remove_world_landmarks=True,
+    remove_legs=True,
+    reduce_holistic_to_face_and_upper_body=False,
+    zero_fill_masked=False,
+    zero_pad_shorter=True,
+) -> List[PoseProcessor]:
+    pose_processors = []
+
+    if normalize_poses:
+        pose_processors.append(NormalizePosesProcessor())
+
+    if reduce_poses_to_common_components:
+        pose_processors.append(ReducePosesToCommonComponentsProcessor())
+
+    if remove_world_landmarks:
+        pose_processors.append(RemoveWorldLandmarksProcessor())
+
+    if remove_legs:
+        pose_processors.append(HideLegsPosesProcessor())
+
+    if reduce_holistic_to_face_and_upper_body:
+        pose_processors.append(ReduceHolisticPoseProcessor())
+
+    if zero_fill_masked:
+        pose_processors.append(ZeroFillMaskedValuesPoseProcessor())
+
+    if zero_pad_shorter:
+        pose_processors.append(ZeroPadShorterPosesProcessor())
+
+    # TODO: prune leading/trailing frames containing "almost all zeros, almost no face, or no hands"
+    # TODO: Focus processor https://github.com/rotem-shalev/Ham2Pose/blob/main/metrics.py#L32-L62
+
+    return pose_processors
diff --git a/pose_evaluation/metrics/segmented_metric.py b/pose_evaluation/metrics/segmented_metric.py
index 0ebb821..12fe83e 100644
--- a/pose_evaluation/metrics/segmented_metric.py
+++ b/pose_evaluation/metrics/segmented_metric.py
@@ -1,15 +1,16 @@
+from abc import ABC
 from importlib import resources
 
 import numpy as np
 from pose_format import Pose
 from scipy.optimize import
linear_sum_assignment -from sign_language_segmentation.bin import load_model, predict, process_pose +from sign_language_segmentation.bin import load_model, predict from sign_language_segmentation.src.utils.probs_to_segments import probs_to_segments from pose_evaluation.metrics.base_pose_metric import PoseMetric -class SegmentedPoseMetric(PoseMetric): +class SegmentedPoseMetric(PoseMetric, ABC): def __init__(self, isolated_metric: PoseMetric): super().__init__("SegmentedPoseMetric", higher_is_better=isolated_metric.higher_is_better) @@ -21,8 +22,7 @@ def __init__(self, isolated_metric: PoseMetric): # pylint: disable=too-many-locals def score(self, hypothesis: Pose, reference: Pose) -> float: # Process input files - processed_hypothesis = process_pose(hypothesis) - processed_reference = process_pose(reference) + processed_hypothesis, processed_reference = self.process_poses([hypothesis, reference]) # Predict segments BIO hypothesis_probs = predict(self.segmentation_model, processed_hypothesis)["sign"] reference_probs = predict(self.segmentation_model, processed_reference)["sign"] @@ -30,7 +30,7 @@ def score(self, hypothesis: Pose, reference: Pose) -> float: hypothesis_signs = probs_to_segments(hypothesis_probs, 60, 50) reference_signs = probs_to_segments(reference_probs, 60, 50) - print(hypothesis_signs) # TODO convert segmentes to Pose objects + print(hypothesis_signs) # TODO convert segments to Pose objects # Fallback to isolated metric if no segments are found if len(hypothesis_signs) == 0 or len(reference_signs) == 0: diff --git a/pose_evaluation/metrics/test_distance_metric.py b/pose_evaluation/metrics/test_distance_metric.py index 4f97e70..2be6136 100644 --- a/pose_evaluation/metrics/test_distance_metric.py +++ b/pose_evaluation/metrics/test_distance_metric.py @@ -8,12 +8,22 @@ from pose_evaluation.metrics.distance_measure import AggregatedPowerDistance from pose_evaluation.metrics.distance_metric import DistanceMetric +from 
pose_evaluation.metrics.pose_processors import get_standard_pose_processors -def get_poses( + +def get_poses( # pylint: disable=too-many-arguments, too-many-positional-arguments, too-many-locals length1: int, length2: int, conf1: Optional[float] = None, conf2: Optional[float] = None, + people1: int = 3, + people2: int = 3, + keypoints1: int = 4, + keypoints2: int = 4, + coordinate_dimensions1: int = 3, + coordinate_dimensions2: int = 3, + fill_value1: float = 1.0, + fill_value2: float = 0.0, ): """ Utility function to generate hypothesis and reference Pose objects for testing. @@ -27,8 +37,9 @@ def get_poses( Returns: tuple: A tuple containing (hypothesis, reference) Pose objects. """ - data_tensor = np.full((length1, 3, 4, 3), fill_value=2) - zeros_tensor = np.zeros((length2, 3, 4, 3)) + + data_tensor = np.full([length1, people1, keypoints1, coordinate_dimensions1], fill_value=fill_value1) + zeros_tensor = np.full((length2, people2, keypoints2, coordinate_dimensions2), fill_value=fill_value2) data_confidence = np.ones(data_tensor.shape[:-1]) zeros_confidence = np.ones(zeros_tensor.shape[:-1]) @@ -57,11 +68,21 @@ def setUp(self): self.metric = DistanceMetric( "mean_l1_metric", distance_measure=AggregatedPowerDistance(order=1, default_distance=0), + # preprocessors that won't crash + pose_preprocessors=get_standard_pose_processors( + normalize_poses=False, + remove_world_landmarks=False, + remove_legs=False, + reduce_poses_to_common_components=False, + ), ) def test_score_equal_length(self): - hypothesis, reference = get_poses(2, 2) - expected_distance = 6 # Sum of absolute differences: 2 + 2 + 2 + hypothesis, reference = get_poses(3, 3) + + # each pointwise distance is 3 (1+1+1) + # Since they're all the same, the mean is also 3 + expected_distance = 3 score = self.metric.score(hypothesis, reference) self.assertIsInstance(score, float) @@ -76,6 +97,13 @@ def setUp(self): self.metric = DistanceMetric( "l2_metric", distance_measure=AggregatedPowerDistance(order=2, 
default_distance=self.default_distance),
+            # preprocessors that won't crash
+            pose_preprocessors=get_standard_pose_processors(
+                normalize_poses=False,
+                remove_world_landmarks=False,
+                remove_legs=False,
+                reduce_poses_to_common_components=False,
+            ),
         )
 
     def _check_against_expected(self, hypothesis, reference, expected):
@@ -84,8 +112,10 @@ def _check_against_expected(self, hypothesis, reference, expected):
         self.assertAlmostEqual(score, expected)
 
     def test_score_equal_length(self):
-        hypothesis, reference = get_poses(2, 2)
-        expected_distance = np.sqrt(2**2 + 2**2 + 2**2)  # sqrt(12)
+        hypothesis, reference = get_poses(3, 3)
+        # pointwise distance is sqrt((1-0)**2 + (1-0)**2 + (1-0)**2) = sqrt(3)
+        # Every pointwise distance is the same, so the mean is also sqrt(3).
+        expected_distance = np.sqrt(3)
         self._check_against_expected(hypothesis, reference, expected=expected_distance)
 
     def test_score_equal_length_one_masked(self):
diff --git a/pose_evaluation/metrics/test_dtw_metric.py b/pose_evaluation/metrics/test_dtw_metric.py
new file mode 100644
index 0000000..313a26e
--- /dev/null
+++ b/pose_evaluation/metrics/test_dtw_metric.py
@@ -0,0 +1,45 @@
+import unittest
+from pose_evaluation.metrics.distance_metric import DistanceMetric
+from pose_evaluation.metrics.pose_processors import get_standard_pose_processors
+from pose_evaluation.metrics.test_distance_metric import get_poses
+from pose_evaluation.metrics.dtw_metric import DTWAggregatedPowerDistanceMeasure
+
+
+class TestDTWMetricL1(unittest.TestCase):
+    def setUp(self):
+        distance_measure = DTWAggregatedPowerDistanceMeasure(order=1, aggregation_strategy="mean", default_distance=0.0)
+        self.metric = DistanceMetric(
+            name="DTWPowerDistance",
+            distance_measure=distance_measure,
+            pose_preprocessors=get_standard_pose_processors(
+                normalize_poses=False,  # no shoulders, will crash
+                remove_world_landmarks=False,  # there are none, will crash
+                reduce_poses_to_common_components=False,  # removes all components, there are none in
common + remove_legs=False, # there are none, it will crash + zero_pad_shorter=False, # defeats the point of dtw + reduce_holistic_to_face_and_upper_body=False, + ), + ) + self.poses_supplier = get_poses + + def test_score_equal_length(self): + hypothesis, reference = self.poses_supplier(3, 3) + expected_distance = 9 + + score = self.metric.score(hypothesis, reference) + self.assertIsInstance(score, float) + self.assertAlmostEqual(score, expected_distance) + + def test_score_unequal_length(self): + hypothesis, reference = self.poses_supplier(2, 3, people1=1, people2=1) + + # fastdtw creates mappings such that they are effectively both length 3. + # Each of the point distances are (0,0,0) to (1,1,1), so 1+1+1=3 + # 3 pointwise distances, so 3*3 = 9 + # Those are then aggregated by mean aggregation across keypoints, but they're all 9. + # so mean of all 9s is 9 + expected_distance = 9 + + score = self.metric.score(hypothesis, reference) + self.assertIsInstance(score, float) + self.assertAlmostEqual(score, expected_distance) diff --git a/pose_evaluation/metrics/test_embedding_distance_metric.py b/pose_evaluation/metrics/test_embedding_distance_metric.py index 1bf992a..41f0d74 100644 --- a/pose_evaluation/metrics/test_embedding_distance_metric.py +++ b/pose_evaluation/metrics/test_embedding_distance_metric.py @@ -1,7 +1,7 @@ import itertools import logging from pathlib import Path -from typing import List, Callable, Tuple +from typing import List, Callable, Optional, Tuple import matplotlib.pyplot as plt import numpy as np @@ -64,13 +64,13 @@ def call_and_call_with_inputs_swapped( return score1, score2 -def call_with_both_input_orders_and_do_standard_checks( +def call_with_both_input_orders_and_do_standard_checks( # pylint: disable=too-many-arguments,too-many-positional-arguments hyps: torch.Tensor, refs: torch.Tensor, scoring_function: Callable[[torch.Tensor, torch.Tensor], torch.Tensor], distance_range_checker, distance_matrix_shape_checker, - expected_shape: Tuple = 
None, + expected_shape: Optional[Tuple] = None, ): scores, scores2 = call_and_call_with_inputs_swapped(hyps, refs, scoring_function) if expected_shape is not None: @@ -432,7 +432,7 @@ def test_score_mixed_input_types(cosine_metric): emb2 = torch.rand(768) all_scores = call_and_call_with_inputs_swapped(emb1, emb2, cosine_metric.score) - assert all([isinstance(score, float) for score in all_scores]), "Output should be a float." + assert all(isinstance(score, float) for score in all_scores), "Output should be a float." def test_score_all_mixed_input_types(cosine_metric, distance_range_checker, distance_matrix_shape_checker): diff --git a/pose_evaluation/utils/conftest.py b/pose_evaluation/utils/conftest.py index 6e4e3ef..f5bc3da 100644 --- a/pose_evaluation/utils/conftest.py +++ b/pose_evaluation/utils/conftest.py @@ -12,6 +12,7 @@ from pose_evaluation.utils.pose_utils import load_pose_file + utils_test_data_dir = Path(__file__).parent / "test" / "test_data" @@ -22,7 +23,7 @@ def mediapipe_poses_test_data_paths() -> List[Path]: @pytest.fixture(scope="function") -def mediapipe_poses_test_data(mediapipe_poses_test_data_paths) -> List[Pose]: +def mediapipe_poses_test_data(mediapipe_poses_test_data_paths) -> List[Pose]: # pylint: disable=redefined-outer-name original_poses = [load_pose_file(pose_path) for pose_path in mediapipe_poses_test_data_paths] # I ran into issues where if one test would modify a Pose, it would affect other tests. 
# specifically, pose.header.components[0].name = unsupported_component_name in test_detect_format diff --git a/pose_evaluation/utils/pose_utils.py b/pose_evaluation/utils/pose_utils.py index 53cb353..b7b5e68 100644 --- a/pose_evaluation/utils/pose_utils.py +++ b/pose_evaluation/utils/pose_utils.py @@ -5,6 +5,7 @@ import numpy as np from numpy import ma from pose_format import Pose +from tqdm import tqdm def pose_remove_world_landmarks(pose: Pose) -> Pose: @@ -42,7 +43,10 @@ def load_pose_file(pose_path: Path) -> Pose: return pose -def reduce_poses_to_intersection(poses: Iterable[Pose]) -> List[Pose]: +def reduce_poses_to_intersection( + poses: Iterable[Pose], + progress=False, +) -> List[Pose]: poses = list(poses) # get a list, no need to copy # look at the first pose @@ -50,7 +54,7 @@ def reduce_poses_to_intersection(poses: Iterable[Pose]) -> List[Pose]: points = {c.name: set(c.points) for c in poses[0].header.components} # remove anything that other poses don't have - for pose in poses[1:]: + for pose in tqdm(poses[1:], desc="reduce poses to intersection", disable=not progress): component_names.intersection_update({c.name for c in pose.header.components}) for component in pose.header.components: points[component.name].intersection_update(set(component.points)) diff --git a/pose_evaluation/utils/test_pose_utils.py b/pose_evaluation/utils/test_pose_utils.py index e9f778f..67090f0 100644 --- a/pose_evaluation/utils/test_pose_utils.py +++ b/pose_evaluation/utils/test_pose_utils.py @@ -2,6 +2,7 @@ from typing import List, Dict import numpy as np + import pytest from pose_format import Pose from pose_format.utils.generic import detect_known_pose_format, pose_hide_legs diff --git a/pyproject.toml b/pyproject.toml index c8750b0..6981b2d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,20 +5,23 @@ version = "0.0.1" authors = [ { name = "Zifan Jiang", email = "zifan.jiang@uzh.ch" }, { name = "Colin Leong", email = "cleong1@udayton.edu" }, - { name = "Amit Moryossef", 
email = "amitmoryossef@gmail.com" } + { name = "Amit Moryossef", email = "amitmoryossef@gmail.com" }, ] readme = "README.md" dependencies = [ "pose-format", "scipy", - "torch", - "numpy", # possibly could replace all with torch + "torch", + "numpy", # possibly could replace all with torch # for various vector/tensor similarities and distances in torch "sentence-transformers", # For reading .csv files, etc - "pandas", + "pandas", # For segment similarity - "sign_language_segmentation @ git+https://github.com/sign-language-processing/segmentation" + "sign_language_segmentation @ git+https://github.com/sign-language-processing/segmentation", + "fastdtw", + # alternative to fastdtw + "dtaidistance", ] [project.optional-dependencies] @@ -27,7 +30,7 @@ dev = [ "pylint", "black", # to plot metric evaluation results - "matplotlib" + "matplotlib", ] [tool.yapf] @@ -48,10 +51,7 @@ disable = [ line-length = 120 [tool.setuptools] -packages = [ - "pose_evaluation", - "pose_evaluation.metrics", -] +packages = ["pose_evaluation", "pose_evaluation.metrics"] [tool.pytest.ini_options] addopts = "-v"