Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
0c7bb56
CDL: copying code by @j22melody, as requested.
cleong110 Nov 18, 2024
db9056e
Merge branch 'main' into ham2pose_metrics
cleong110 Jan 6, 2025
e5f703f
CDL: add new req for fastdtw
cleong110 Jan 6, 2025
a7ca062
Start ndtw_mje, add common functions for pose preprocessing, etc
cleong110 Jan 6, 2025
2dee6ce
Edit the name of a test function to avoid potential collisions
cleong110 Jan 6, 2025
04472e1
Stubbed test file
cleong110 Jan 6, 2025
1f5767d
A bit of pylint cleanup
cleong110 Jan 6, 2025
07225cb
Preprocessing for poses, and some type annotations, and a bit of refa…
cleong110 Jan 7, 2025
e931966
adding tests for local pose_utils
cleong110 Jan 8, 2025
7bc5371
some gitignore updates
cleong110 Jan 8, 2025
4369938
Fixing a few type issues
cleong110 Jan 8, 2025
bdf5d73
adding test data
cleong110 Jan 9, 2025
99e27f9
remove instead of hide legs in pose_utils
cleong110 Jan 9, 2025
ec09e3c
Take out temp test code
cleong110 Jan 9, 2025
66361ca
fix forgetting to assign in preprocess_pose
cleong110 Jan 9, 2025
389abe2
some minor fixes in tests
cleong110 Jan 9, 2025
8344d54
euclidean, not l2
cleong110 Jan 9, 2025
e9e8cc1
Transitioning to pytest from unittest
cleong110 Jan 10, 2025
4334fb8
Trying to figure out pytest
cleong110 Jan 10, 2025
92ec8e0
Caught another L2
cleong110 Jan 15, 2025
251ad26
basic scoring script
cleong110 Jan 15, 2025
0449eba
implement ape_metric
cleong110 Jan 15, 2025
daedc1e
Very WIP, pushing code for the day
cleong110 Jan 15, 2025
72cec6d
Starting the move to PoseProcessors
cleong110 Jan 17, 2025
74e016e
adding in the set_masked_values_to_zero
cleong110 Jan 27, 2025
d2d1759
Pushing all changes as-is
cleong110 Jan 27, 2025
d8dd461
Cleaning u and implementing separate DTW and Distance Metrics
cleong110 Jan 27, 2025
3517a76
I can build the basic Ham2Pose metrics!
cleong110 Jan 28, 2025
688fcee
remove unused file
cleong110 Jan 30, 2025
57f6932
remove unused alignment_strategy
cleong110 Jan 30, 2025
a636406
Various pylint changes
cleong110 Jan 30, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
.idea/
build/
pose_evaluation.egg-info/
**/__pycache__/
**/__pycache__/
.coverage
.vscode/
coverage.lcov
**/test_data/
*.npz
*.code-workspace
2 changes: 1 addition & 1 deletion pose_evaluation/metrics/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1 @@
temp/
tests
46 changes: 46 additions & 0 deletions pose_evaluation/metrics/aggregate_distances.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from typing import Callable, Iterable, Literal
from numpy.ma import MaskedArray
from pose_evaluation.metrics.base import Signature, SignatureMixin


AggregationStrategy = Literal["sum", "mean", "max"]
DistanceAggregatorFunction = Callable[[Iterable[float]], float]


class DistanceAggregatorSignature(Signature):
def __init__(self, args: dict):
super().__init__(args)
self.update_signature_and_abbr("aggregation_strategy", "s", args)


class DistanceAggregator(SignatureMixin):
_SIGNATURE_TYPE = DistanceAggregatorSignature

def __init__(self, aggregation_strategy: AggregationStrategy) -> None:
self.aggregator_function = get_aggregator_function(
strategy=aggregation_strategy
)
self.aggregation_strategy = aggregation_strategy

def __call__(self, distances: Iterable[float]) -> float:
return self.aggregator_function(distances)


def create_maskedarrray_and_cast_result_to_float(
callable_to_wrap: Callable,
) -> DistanceAggregatorFunction:
return lambda a: float(callable_to_wrap(MaskedArray(a)))


def get_aggregator_function(
strategy: AggregationStrategy,
) -> DistanceAggregatorFunction:

if strategy == "max":
return create_maskedarrray_and_cast_result_to_float(MaskedArray.max)

if strategy == "mean":
return create_maskedarrray_and_cast_result_to_float(MaskedArray.mean)

if strategy == "sum":
return create_maskedarrray_and_cast_result_to_float(MaskedArray.sum)
72 changes: 71 additions & 1 deletion pose_evaluation/metrics/base.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,76 @@
# pylint: disable=undefined-variable
from tqdm import tqdm
from typing import Any, Callable

class Signature:
"""Represents reproducibility signatures for metrics. Inspired by sacreBLEU
"""
def __init__(self, args: dict):

self._abbreviated = {
"name":"n",
"higher_is_better":"hb"
}

self.signature_info = {
"name": args.get("name", None),
"higher_is_better": args.get("higher_is_better", None)
}

def update(self, key: str, value: Any):
self.signature_info[key] = value

def update_signature_and_abbr(self, key:str, abbr:str, args:dict):
self._abbreviated.update({
key: abbr
})

self.signature_info.update({
key: args.get(key, None)
})

def format(self, short: bool = False) -> str:
pairs = []
keys = list(self.signature_info.keys())
for name in keys:
value = self.signature_info[name]
if value is not None:
# Check for nested signature objects
if hasattr(value, "get_signature"):

# Wrap nested signatures in brackets
nested_signature = value.get_signature()
if isinstance(nested_signature, Signature):
nested_signature = nested_signature.format(short=short)
value = f"{{{nested_signature}}}"
if isinstance(value, bool):
# Replace True/False with yes/no
value = "yes" if value else "no"
if isinstance(value, Callable):
value = value.__name__
final_name = self._abbreviated[name] if short else name
pairs.append(f"{final_name}:{value}")

return "|".join(pairs)

def __str__(self):
return self.format()

def __repr__(self):
return self.format()


class SignatureMixin:
_SIGNATURE_TYPE = Signature
def get_signature(self) -> Signature:
return self._SIGNATURE_TYPE(self.__dict__)

class BaseMetric[T]:
"""Base class for all metrics."""
# Each metric should define its Signature class' name here
_SIGNATURE_TYPE = Signature

def __init__(self, name: str, higher_is_better: bool = True):
def __init__(self, name: str, higher_is_better: bool = False):
self.name = name
self.higher_is_better = higher_is_better

Expand Down Expand Up @@ -38,3 +103,8 @@ def score_all(self, hypotheses: list[T], references: list[T], progress_bar=True)

def __str__(self):
return self.name

def get_signature(self) -> Signature:
return self._SIGNATURE_TYPE(self.__dict__)


54 changes: 52 additions & 2 deletions pose_evaluation/metrics/base_pose_metric.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,55 @@
from typing import Iterable, List, cast, Union
from pose_format import Pose
from pose_evaluation.metrics.pose_processors import PoseProcessor

from pose_evaluation.metrics.base import BaseMetric
from pose_evaluation.metrics.base import BaseMetric, Signature

PoseMetric = BaseMetric[Pose]

class PoseMetricSignature(Signature):

def __init__(self, args: dict):
super().__init__(args)

self._abbreviated.update({"pose_preprocessers": "pre"})

pose_preprocessors = args.get("pose_preprocessers", None)
prep_string = ""
if pose_preprocessors is not None:
prep_string = (
"{" + "|".join([f"{prep}" for prep in pose_preprocessors]) + "}"
)

self.signature_info.update(
{"pose_preprocessers": prep_string if pose_preprocessors else None}
)


class PoseMetric(BaseMetric[Pose]):
_SIGNATURE_TYPE = PoseMetricSignature

def __init__(
self,
name: str = "PoseMetric",
higher_is_better: bool = False,
pose_preprocessors: Union[None, List[PoseProcessor]] = None,
):

super().__init__(name, higher_is_better)
if pose_preprocessors is None:
self.pose_preprocessers = []
else:
self.pose_preprocessers = pose_preprocessors

def score(self, hypothesis: Pose, reference: Pose) -> float:
hypothesis, reference = self.process_poses([hypothesis, reference])
return self.score(hypothesis, reference)

def process_poses(self, poses: Iterable[Pose]) -> List[Pose]:
poses = list(poses)
for preprocessor in self.pose_preprocessers:
preprocessor = cast(PoseProcessor, preprocessor)
poses = preprocessor.process_poses(poses)
return poses

def add_preprocessor(self, processor: PoseProcessor):
self.pose_preprocessers.append(processor)
94 changes: 94 additions & 0 deletions pose_evaluation/metrics/build_ham2pose_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from pose_evaluation.metrics.distance_measure import (
PowerDistance,
)
from pose_evaluation.metrics.distance_metric import DistanceMetric
from pose_evaluation.metrics.ham2pose_distances import (
Ham2PoseMSEDistance,
Ham2PoseMaskedEuclideanDistance,
Ham2PoseAPEDistance,
)
from pose_evaluation.metrics.mje_metric import MeanJointErrorMetric
from pose_evaluation.metrics.dynamic_time_warping_metric import DTWMetric
from pose_evaluation.metrics.pose_processors import (
get_standard_pose_processors,
)

if __name__ == "__main__":

metrics = []
MJEMetric = (
MeanJointErrorMetric()
) # automatically sets distance measure, zero-padding.
metrics.append(MJEMetric)

Ham2Pose_DTW_MJE_Metric = DTWMetric(
name="DTW_MJE",
distance_measure=PowerDistance(2, 0),
pose_preprocessors=get_standard_pose_processors(zero_pad_shorter=False),
)

Ham2Pose_nDTW_MJE_Metric = DTWMetric(
name="nDTW_MJE",
distance_measure=Ham2PoseMaskedEuclideanDistance(),
pose_preprocessors=get_standard_pose_processors(zero_pad_shorter=False),
)

metrics.append(Ham2Pose_DTW_MJE_Metric)
metrics.append(Ham2Pose_nDTW_MJE_Metric)

# Ham2Pose APE is a PowerDistance. But with a few preprocessors.
# 1. standard preprocessors
# 2. then these: basically this is "zero_pad_shorter", and also setting masked values to zero.
# if len(trajectory1) < len(trajectory2):
# diff = len(trajectory2) - len(trajectory1)
# trajectory1 = np.concatenate((trajectory1, np.zeros((diff, 3))))
# elif len(trajectory2) < len(trajectory1):
# trajectory2 = np.concatenate((trajectory2, np.zeros((len(trajectory1) - len(trajectory2), 3))))
# pose1_mask = np.ma.getmask(trajectory1)
# pose2_mask = np.ma.getmask(trajectory2)
# trajectory1[pose1_mask] = 0
# trajectory1[pose2_mask] = 0
# trajectory2[pose1_mask] = 0
# trajectory2[pose2_mask] = 0
# 3. pointwise aggregate by SUM
# sq_error = np.power(trajectory1 - trajectory2, 2).sum(-1)
# 4. trajectorywise aggregate by MEAN
# np.sqrt(sq_error).mean()
Ham2PoseAPEMetric = DistanceMetric(
name="Ham2PoseAPEMetric",
distance_measure=Ham2PoseAPEDistance(),
pose_preprocessors=get_standard_pose_processors(
zero_pad_shorter=True, set_masked_values_to_origin=True
),
)
metrics.append(Ham2PoseAPEMetric)

# MSE from Ham2Pose is zero-padding, plus set to origin, and then squared error.
# if len(trajectory1) < len(trajectory2):
# diff = len(trajectory2) - len(trajectory1)
# trajectory1 = np.concatenate((trajectory1, np.zeros((diff, 3))))
# elif len(trajectory2) < len(trajectory1):
# trajectory2 = np.concatenate((trajectory2, np.zeros((len(trajectory1) - len(trajectory2), 3))))
# pose1_mask = np.ma.getmask(trajectory1)
# pose2_mask = np.ma.getmask(trajectory2)
# trajectory1[pose1_mask] = 0
# trajectory1[pose2_mask] = 0
# trajectory2[pose1_mask] = 0
# trajectory2[pose2_mask] = 0
# sq_error = np.power(trajectory1 - trajectory2, 2).sum(-1)
# return sq_error.mean()
Ham2PoseMSEMetric = DistanceMetric(
name="Ham2PoseMSEMetric",
distance_measure=Ham2PoseMSEDistance(),
pose_preprocessors=get_standard_pose_processors(
zero_pad_shorter=True, set_masked_values_to_origin=True
),
)
metrics.append(Ham2PoseMSEMetric)

for metric in metrics:
print("*" * 30)
print(f"METRIC: {metric}")
print(metric.get_signature())
print(metric.get_signature().format(short=True))
print()
Loading