diff --git a/src/tdamapper/_common.py b/src/tdamapper/_common.py
index 629701b..14aa0d8 100644
--- a/src/tdamapper/_common.py
+++ b/src/tdamapper/_common.py
@@ -27,7 +27,7 @@ def deprecated(msg: str) -> Callable[..., Any]:
     """

     def deprecated_func(func: Callable[..., Any]) -> Callable[..., Any]:
-        def wrapper(*args: list[Any], **kwargs: dict[str, Any]) -> Any:
+        def wrapper(*args: Any, **kwargs: Any) -> Any:
             warnings.warn(msg, DeprecationWarning, stacklevel=2)
             return func(*args, **kwargs)

@@ -179,10 +179,12 @@ def __repr__(self) -> str:
         obj_noargs = type(self)()
         args_repr = []
         for k, v in self.__dict__.items():
+            if not self._is_param_public(k):
+                continue
             v_default = getattr(obj_noargs, k)
             v_default_repr = repr(v_default)
             v_repr = repr(v)
-            if self._is_param_public(k) and not v_repr == v_default_repr:
+            if v_repr != v_default_repr:
                 args_repr.append(f"{k}={v_repr}")
         return f"{self.__class__.__name__}({', '.join(args_repr)})"

@@ -211,7 +213,7 @@ def profile(n_lines: int = 10) -> Callable[..., Any]:
     """

     def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
-        def wrapper(*args: list[Any], **kwargs: dict[str, Any]) -> Any:
+        def wrapper(*args: Any, **kwargs: Any) -> Any:
             profiler = cProfile.Profile()
             profiler.enable()
             result = func(*args, **kwargs)
diff --git a/src/tdamapper/app.py b/src/tdamapper/app.py
index 1dda0ca..aac6adc 100644
--- a/src/tdamapper/app.py
+++ b/src/tdamapper/app.py
@@ -202,7 +202,7 @@ def _umap(X: NDArray[np.float_]) -> NDArray[np.float_]:


 def run_mapper(
-    df: pd.DataFrame, **kwargs: dict[str, Any]
+    df: pd.DataFrame, **kwargs: Any
 ) -> Optional[tuple[nx.Graph, pd.DataFrame]]:
     """
     Runs the Mapper algorithm on the provided DataFrame and returns the Mapper
@@ -301,7 +301,7 @@ def create_mapper_figure(
     df_y: pd.DataFrame,
     df_target: pd.DataFrame,
     mapper_graph: nx.Graph,
-    **kwargs: dict[str, Any],
+    **kwargs: Any,
 ) -> go.Figure:
     """
     Renders the Mapper graph as a Plotly figure.
diff --git a/src/tdamapper/core.py b/src/tdamapper/core.py
index 333e567..d251643 100644
--- a/src/tdamapper/core.py
+++ b/src/tdamapper/core.py
@@ -279,6 +279,15 @@ class TrivialCover(ParamsMixin, Generic[T]):
     dataset.
     """

+    def fit(self, X: ArrayRead[T]) -> TrivialCover[T]:
+        """
+        Fit the cover algorithm to the data.
+
+        :param X: A dataset of n points. Ignored.
+        :return: The object itself.
+        """
+        return self
+
     def apply(self, X: ArrayRead[T]) -> Iterator[list[int]]:
         """
         Covers the dataset with a single open set.
@@ -286,7 +295,8 @@ def apply(self, X: ArrayRead[T]) -> Iterator[list[int]]:
         :param X: A dataset of n points.
         :return: A generator of lists of ids.
         """
-        yield list(range(0, len(X)))
+        if len(X) > 0:
+            yield list(range(0, len(X)))


 class FailSafeClustering(ParamsMixin, Generic[T]):
diff --git a/src/tdamapper/cover.py b/src/tdamapper/cover.py
index fdcccc2..de5c171 100644
--- a/src/tdamapper/cover.py
+++ b/src/tdamapper/cover.py
@@ -377,6 +377,8 @@ def fit(self, X: ArrayRead[NDArray[np.float_]]) -> BaseCubicalCover:

         :param X: A dataset of n points.
         :return: The object itself.
""" + if len(X) == 0: + return self X_ = np.asarray(X).reshape(len(X), -1).astype(float) if self.overlap_frac is None: dim = 1 if X_.ndim == 1 else X_.shape[1] diff --git a/src/tdamapper/learn.py b/src/tdamapper/learn.py index c336795..b98b690 100644 --- a/src/tdamapper/learn.py +++ b/src/tdamapper/learn.py @@ -159,10 +159,12 @@ def fit( """ y_ = X if y is None else y X, y_ = self._validate_X_y(X, y_) - self._cover = TrivialCover() if self.cover is None else self.cover - self._clustering = ( - TrivialClustering() if self.clustering is None else self.clustering - ) + self._cover = TrivialCover() + if self.cover is not None: + self._cover = clone(self.cover) + self._clustering = TrivialClustering() + if self.clustering is not None: + self._clustering = clone(self.clustering) self._verbose = self.verbose self._failsafe = self.failsafe if self._failsafe: @@ -170,8 +172,6 @@ def fit( clustering=self._clustering, verbose=self._verbose, ) - self._cover = clone(self._cover) - self._clustering = clone(self._clustering) self._n_jobs = self.n_jobs self.graph_ = mapper_graph( X, diff --git a/src/tdamapper/utils/heap.py b/src/tdamapper/utils/heap.py index 6fad7af..33e653f 100644 --- a/src/tdamapper/utils/heap.py +++ b/src/tdamapper/utils/heap.py @@ -1,3 +1,10 @@ +""" +This module implements a max-heap data structure that allows for efficient +retrieval and removal of the maximum element. The heap supports adding +elements, retrieving the maximum element, and removing the maximum element +while maintaining the heap property. +""" + from __future__ import annotations from typing import Generic, Iterator, Optional, Protocol, TypeVar @@ -16,6 +23,9 @@ def _parent(i: int) -> int: class Comparable(Protocol): + """ + Protocol for comparison methods required for a key in the heap. + """ def __lt__(self: K, other: K) -> bool: ... @@ -32,6 +42,14 @@ def __ge__(self: K, other: K) -> bool: ... class _HeapNode(Generic[K, V]): + """ + A node in the heap that holds a key-value pair. + + The key is used for comparison, and the value is stored alongside it. + + :param key: The key used for comparison. + :param value: The value associated with the key. + """ _key: K _value: V @@ -41,6 +59,11 @@ def __init__(self, key: K, value: V) -> None: self._value = value def get(self) -> tuple[K, V]: + """ + Returns the key-value pair stored in the node. + + :return: A tuple containing the key and value. + """ return self._key, self._value def __lt__(self, other: _HeapNode[K, V]) -> bool: @@ -57,6 +80,12 @@ def __ge__(self, other: _HeapNode[K, V]) -> bool: class MaxHeap(Generic[K, V]): + """ + A max-heap implementation that allows for efficient retrieval of the + maximum element. This heap supports adding elements, retrieving the maximum + element, and removing the maximum element while maintaining the heap + property. + """ _heap: list[_HeapNode[K, V]] _iter: Iterator[_HeapNode[K, V]] @@ -75,12 +104,32 @@ def __next__(self) -> tuple[K, V]: def __len__(self) -> int: return len(self._heap) + def is_empty(self) -> bool: + """ + Check if the heap is empty. + + :return: True if the heap is empty, False otherwise. + """ + return len(self._heap) == 0 + def top(self) -> Optional[tuple[K, V]]: + """ + Returns the maximum element in the heap without removing it. + + :return: A tuple containing the key and value of the maximum element, + or None if the heap is empty. + """ if not self._heap: return None return self._heap[0].get() def pop(self) -> Optional[tuple[K, V]]: + """ + Removes and returns the maximum element from the heap. 
+
+        :return: A tuple containing the key and value of the maximum element,
+            or None if the heap is empty.
+        """
         if not self._heap:
             return None
         max_val = self._heap[0]
@@ -89,8 +138,14 @@ def pop(self) -> Optional[tuple[K, V]]:
             self._bubble_down()
         return max_val.get()

-    def add(self, key: K, val: V) -> None:
-        self._heap.append(_HeapNode(key, val))
+    def add(self, key: K, value: V) -> None:
+        """
+        Adds a new key-value pair to the heap.
+
+        :param key: The key used for comparison.
+        :param value: The value associated with the key.
+        """
+        self._heap.append(_HeapNode(key, value))
         self._bubble_up()

     def _get_local_max(self, i: int) -> int:
diff --git a/src/tdamapper/utils/metrics.py b/src/tdamapper/utils/metrics.py
index 46d54ed..4d7bde1 100644
--- a/src/tdamapper/utils/metrics.py
+++ b/src/tdamapper/utils/metrics.py
@@ -58,7 +58,7 @@ def get_supported_metrics() -> list[MetricLiteral]:
     return list(get_args(MetricLiteral))


-def euclidean(**kwargs: dict[str, Any]) -> Metric[Any]:
+def euclidean() -> Metric[Any]:
     """
     Return the Euclidean distance function for vectors.

@@ -70,7 +70,7 @@ def euclidean(**kwargs: dict[str, Any]) -> Metric[Any]:
     return _metrics.euclidean


-def manhattan(**kwargs: dict[str, Any]) -> Metric[Any]:
+def manhattan() -> Metric[Any]:
     """
     Return the Manhattan distance function for vectors.

@@ -82,7 +82,7 @@ def manhattan(**kwargs: dict[str, Any]) -> Metric[Any]:
     return _metrics.manhattan


-def chebyshev(**kwargs: dict[str, Any]) -> Metric[Any]:
+def chebyshev() -> Metric[Any]:
     """
     Return the Chebyshev distance function for vectors.

@@ -94,7 +94,7 @@ def chebyshev(**kwargs: dict[str, Any]) -> Metric[Any]:
     return _metrics.chebyshev


-def minkowski(**kwargs: dict[str, Any]) -> Metric[Any]:
+def minkowski(p: Union[int, float]) -> Metric[Any]:
     """
     Return the Minkowski distance function for order p on vectors.

@@ -106,9 +106,6 @@ def minkowski(**kwargs: dict[str, Any]) -> Metric[Any]:
     :param p: The order of the Minkowski distance.
     :return: The Minkowski distance function.
     """
-    p = kwargs.get("p", 2)
-    if not isinstance(p, (int, float)):
-        raise TypeError("p must be an integer or a float")
     if p == 1:
         return manhattan()
     if p == 2:
@@ -122,7 +119,7 @@ def dist(x: Any, y: Any) -> float:
         return dist


-def cosine(**kwargs: dict[str, Any]) -> Metric[Any]:
+def cosine() -> Metric[Any]:
     """
     Return the cosine distance function for vectors.

@@ -145,9 +142,7 @@ def cosine(**kwargs: dict[str, Any]) -> Metric[Any]:
     return _metrics.cosine


-def get_metric(
-    metric: Union[MetricLiteral, Metric[Any]], **kwargs: dict[str, Any]
-) -> Metric[Any]:
+def get_metric(metric: Union[MetricLiteral, Metric[Any]], **kwargs: Any) -> Metric[Any]:
     """
     Return a distance function based on the specified string or callable.

diff --git a/src/tdamapper/utils/unionfind.py b/src/tdamapper/utils/unionfind.py
index 05ef3ce..00fee61 100644
--- a/src/tdamapper/utils/unionfind.py
+++ b/src/tdamapper/utils/unionfind.py
@@ -1,16 +1,36 @@
+"""
+This module implements a Union-Find data structure that supports union and
+find operations.
+"""
+
 from typing import Any, Iterable


 class UnionFind:
+    """
+    A Union-Find data structure that supports union and find operations.
+
+    This implementation uses path compression for efficient find operations
+    and union by size to keep the tree flat. It allows for efficient
+    determination of connected components in a set of elements.
+
+    :param items: An iterable of elements to initialize the Union-Find structure.
+ """ _parent: dict[Any, Any] _size: dict[Any, int] - def __init__(self, X: Iterable[Any]): - self._parent = {x: x for x in X} - self._size = {x: 1 for x in X} + def __init__(self, items: Iterable[Any]): + self._parent = {x: x for x in items} + self._size = {x: 1 for x in items} def find(self, x: Any) -> Any: + """ + Finds the class of an element, applying path compression. + + :param x: The element to find the class of. + :return: The representative of the class containing x. + """ root = x while root != self._parent[root]: root = self._parent[root] @@ -22,6 +42,13 @@ def find(self, x: Any) -> Any: return root def union(self, x: Any, y: Any) -> Any: + """ + Unites the classes of two elements. + + :param x: The first element. + :param y: The second element. + :return: The representative of the class after the union operation. + """ x, y = self.find(x), self.find(y) if x != y: x_size, y_size = self._size[x], self._size[y] diff --git a/src/tdamapper/utils/vptree_flat/builder.py b/src/tdamapper/utils/vptree_flat/builder.py index 3a989ab..728518c 100644 --- a/src/tdamapper/utils/vptree_flat/builder.py +++ b/src/tdamapper/utils/vptree_flat/builder.py @@ -91,7 +91,8 @@ def build(self) -> VPArray[T]: :return: A tuple containing the constructed vp-tree and the VPArray. """ - self._build_iter() + if self._array.size() > 0: + self._build_iter() return self._array def _build_iter(self) -> None: diff --git a/src/tdamapper/utils/vptree_hier/builder.py b/src/tdamapper/utils/vptree_hier/builder.py index 450ebbb..3f5f21e 100644 --- a/src/tdamapper/utils/vptree_hier/builder.py +++ b/src/tdamapper/utils/vptree_hier/builder.py @@ -94,10 +94,15 @@ def build(self) -> tuple[Tree[T], VPArray[T]]: :return: A tuple containing the constructed vp-tree and the VPArray. """ - tree = self._build_rec(0, self._array.size()) + if self._array.size() > 0: + tree = self._build_rec(0, self._array.size()) + else: + tree = Leaf(0, 0) return tree, self._array def _build_rec(self, start: int, end: int) -> Tree[T]: + if end - start <= self._leaf_capacity: + return Leaf(start, end) mid = _mid(start, end) self._update(start, end) v_point = self._array.get_point(start) @@ -106,7 +111,7 @@ def _build_rec(self, start: int, end: int) -> Tree[T]: self._array.set_distance(start, v_radius) left: Tree[T] right: Tree[T] - if (end - start <= 2 * self._leaf_capacity) or (v_radius <= self._leaf_radius): + if v_radius <= self._leaf_radius: left = Leaf(start + 1, mid) right = Leaf(mid, end) else: diff --git a/tests/test_unit_cover.py b/tests/test_unit_cover.py index b9c5add..4e0dec9 100644 --- a/tests/test_unit_cover.py +++ b/tests/test_unit_cover.py @@ -1,74 +1,232 @@ +""" +Unit tests for the cover algorithms. +""" + +import math + import numpy as np +import pytest from tdamapper.core import TrivialCover -from tdamapper.cover import BallCover, CubicalCover, KNNCover +from tdamapper.cover import ( + BallCover, + CubicalCover, + KNNCover, + ProximityCubicalCover, + StandardCubicalCover, +) +from tdamapper.utils.unionfind import UnionFind +from tests.test_utils import dataset_grid, dataset_simple, dataset_two_lines + +SIMPLE = dataset_simple() +TWO_LINES = dataset_two_lines() -def dataset(dim=1, num=10000): - return [np.random.rand(dim) for _ in range(num)] +GRID = dataset_grid(10) -def test_trivial_cover(): - data = dataset() - cover = TrivialCover() +def assert_coverage(data, cover): + """ + Assert that the cover applies to the data and covers all points. 
+ """ + covered = set() charts = list(cover.apply(data)) - assert 1 == len(charts) + for point_ids in charts: + for point_id in point_ids: + covered.add(point_id) + assert len(covered) == len(data) + return charts -def test_ball_cover_simple(): - data = [ - np.array([0.0, 1.0]), - np.array([1.0, 0.0]), - np.array([0.0, 0.0]), - np.array([1.0, 1.0]), - ] - cover = BallCover(radius=1.1, metric="euclidean") - charts = list(cover.apply(data)) - assert 2 == len(charts) +def count_components(charts): + """ + Count the number of unique connected components in the charts. + Each chart is a list of point ids. When multiple charts share points, + they are considered connected. + """ + # Create a mapping from point ids to chart ids + point_charts = {} + for chart_id, point_ids in enumerate(charts): + for point_id in point_ids: + if point_id not in point_charts: + point_charts[point_id] = [] + point_charts[point_id].append(chart_id) + uf = UnionFind(list(range(len(charts)))) + for point_id, chart_ids in point_charts.items(): + for i in range(len(chart_ids) - 1): + uf.union(chart_ids[i], chart_ids[i + 1]) + # Count the number of unique components + unique_components = set() + for chart_id in range(len(charts)): + unique_components.add(uf.find(chart_id)) + return len(unique_components) -def test_knn_cover_simple(): - data = [ - np.array([0.0, 1.0]), - np.array([1.1, 0.0]), - np.array([0.0, 0.0]), - np.array([1.1, 1.0]), - ] - cover = KNNCover(neighbors=2, metric="euclidean") - charts = list(cover.apply(data)) - assert 2 == len(charts) +@pytest.mark.parametrize( + "cover", + [ + TrivialCover(), + BallCover(radius=0.1, metric="euclidean"), + KNNCover(neighbors=1, metric="euclidean"), + StandardCubicalCover(n_intervals=2, overlap_frac=0.5), + ProximityCubicalCover(n_intervals=2, overlap_frac=0.5), + ], +) +def test_cover_empty(cover): + """ + Test that the cover algorithms handle empty datasets correctly. + """ + empty_data = np.array([]) + cover.fit(empty_data) + charts = cover.apply(empty_data) + assert len(list(charts)) == 0 -def test_cubical_cover_simple(): - data = [ - np.array([0.0, 1.0]), - np.array([1.1, 0.0]), - np.array([0.0, 0.0]), - np.array([1.1, 1.0]), - ] - cover = CubicalCover(n_intervals=2, overlap_frac=0.5) - charts = list(cover.apply(data)) - assert 4 == len(charts) + +@pytest.mark.parametrize( + "dataset, cover, num_charts, num_components", + [ + # Simple dataset + (SIMPLE, TrivialCover(), 1, 1), + # BallCover: components are expected to merge when the radius crosses the + # lenghts of the rectangle sides. + (SIMPLE, BallCover(radius=0.9, metric="euclidean"), 4, 4), + (SIMPLE, BallCover(radius=1.1, metric="euclidean"), 2, 2), + (SIMPLE, BallCover(radius=1.5, metric="euclidean"), 1, 1), + # KNNCover: components are expected to merge when the number of neighbors + # is enough to cover a given number of rectangle sides. + (SIMPLE, KNNCover(neighbors=1, metric="euclidean"), 4, 4), + (SIMPLE, KNNCover(neighbors=2, metric="euclidean"), 2, 2), + (SIMPLE, KNNCover(neighbors=3, metric="euclidean"), 2, 1), + # StandardCubicalCover: components are expected to merge when intervals + # are big enough to cover the rectangle sides. 
+        (SIMPLE, StandardCubicalCover(n_intervals=2, overlap_frac=0.1), 4, 4),
+        (SIMPLE, StandardCubicalCover(n_intervals=2, overlap_frac=0.5), 4, 4),
+        (SIMPLE, StandardCubicalCover(n_intervals=1, overlap_frac=0.5), 1, 1),
+        (SIMPLE, ProximityCubicalCover(n_intervals=2, overlap_frac=0.1), 4, 4),
+        (SIMPLE, ProximityCubicalCover(n_intervals=2, overlap_frac=0.5), 4, 4),
+        (SIMPLE, ProximityCubicalCover(n_intervals=1, overlap_frac=0.5), 1, 1),
+        # Two lines dataset
+        (TWO_LINES, TrivialCover(), 1, 1),
+        # BallCover: components are expected to merge when the radius crosses the
+        # distance between the two lines.
+        (TWO_LINES, BallCover(radius=0.2, metric="euclidean"), 10, 2),
+        (TWO_LINES, BallCover(radius=0.5, metric="euclidean"), 4, 2),
+        (TWO_LINES, BallCover(radius=1.0, metric="euclidean"), 4, 2),
+        (TWO_LINES, BallCover(radius=1.1, metric="euclidean"), 2, 1),
+        (TWO_LINES, BallCover(radius=1.5, metric="euclidean"), 1, 1),
+        # KNNCover: components are expected to merge when the number of neighbors
+        # is more than the cardinality of a single line.
+        (TWO_LINES, KNNCover(neighbors=3, metric="euclidean"), None, 2),
+        (TWO_LINES, KNNCover(neighbors=10, metric="euclidean"), None, 2),
+        (TWO_LINES, KNNCover(neighbors=100, metric="euclidean"), None, 2),
+        (TWO_LINES, KNNCover(neighbors=1001, metric="euclidean"), 2, 1),
+        # StandardCubicalCover: components are expected to merge when intervals
+        # are big enough to cover the distance between the two lines.
+        (TWO_LINES, StandardCubicalCover(n_intervals=2, overlap_frac=0.5), 4, 2),
+        (TWO_LINES, ProximityCubicalCover(n_intervals=2, overlap_frac=0.5), 4, 2),
+        # Grid dataset
+        (GRID, TrivialCover(), 1, 1),
+        # BallCover: components are expected to jump from many singleton sets
+        # to a single one when the radius crosses the grid spacing.
+        (GRID, BallCover(radius=0.01, metric="euclidean"), 100, 100),
+        (GRID, BallCover(radius=0.2, metric="euclidean"), None, 1),
+        # KNNCover: components are expected to merge when the number of neighbors
+        # is more than the number of adjacent points in the grid.
+        (GRID, KNNCover(neighbors=1, metric="euclidean"), 100, 100),
+        (GRID, KNNCover(neighbors=10, metric="euclidean"), None, 1),
+        (GRID, StandardCubicalCover(n_intervals=2, overlap_frac=0.5), 4, 1),
+        (GRID, StandardCubicalCover(n_intervals=2), 4, 1),
+        (GRID, ProximityCubicalCover(n_intervals=2, overlap_frac=0.5), 4, 1),
+        (
+            GRID,
+            CubicalCover(n_intervals=2, overlap_frac=0.5, algorithm="proximity"),
+            4,
+            1,
+        ),
+        (
+            GRID,
+            CubicalCover(n_intervals=2, overlap_frac=0.5, algorithm="standard"),
+            4,
+            1,
+        ),
+    ],
+)
+def test_cover(dataset, cover, num_charts, num_components):
+    """
+    Test that the cover algorithm covers the dataset correctly, and that the
+    number of charts and components is as expected. If num_charts or
+    num_components is None, the test will not check that value.
+ """ + charts = assert_coverage(dataset, cover) + for chart in charts: + assert len(chart) > 0 + if num_charts is not None: + assert len(charts) == num_charts + if num_components is not None: + assert count_components(charts) == num_components -def test_params(): - cover = CubicalCover(n_intervals=2, overlap_frac=0.5) +@pytest.mark.parametrize( + "cover, params", + [ + (TrivialCover(), {}), + ( + BallCover(radius=0.2, metric="euclidean"), + {"radius": 0.21, "metric": "euclidean"}, + ), + ( + KNNCover(neighbors=10, metric="euclidean"), + {"neighbors": 13, "metric": "euclidean"}, + ), + ( + StandardCubicalCover(n_intervals=2, overlap_frac=0.5), + {"n_intervals": 4, "overlap_frac": 0.145}, + ), + ( + ProximityCubicalCover(n_intervals=2, overlap_frac=0.5), + {"n_intervals": 4, "overlap_frac": 0.145}, + ), + ( + CubicalCover(n_intervals=2, overlap_frac=0.5, algorithm="standard"), + {"n_intervals": 4, "overlap_frac": 0.145, "algorithm": "proximity"}, + ), + ], +) +def test_params(cover, params): + """ + Test that the cover can get and set parameters correctly. + """ + cover.set_params(**params) params = cover.get_params(deep=True) - assert 2 == params["n_intervals"] - assert 0.5 == params["overlap_frac"] - - -def test_standard_cover_simple(): - data = [ - np.array([0.0, 1.0]), - np.array([1.1, 0.0]), - np.array([0.0, 0.0]), - np.array([1.1, 1.0]), - ] - cover = CubicalCover( - n_intervals=2, - overlap_frac=0.5, - algorithm="standard", - ) - charts = list(cover.apply(data)) - assert 4 == len(charts) + for k, v in params.items(): + assert params[k] == v + + +def test_cubical_cover(): + """ + Test the CubicalCover with a specific configuration. + This test checks that the cover correctly identifies the intervals and + overlaps based on the given parameters. 
+ """ + m, M = 0, 99 + n = 10 + p = 0.1 + w = (M - m) / (n * (1.0 - p)) + delta = p * w + data = list(range(m, M + 1)) + cover = CubicalCover(n_intervals=n, overlap_frac=p) + cover.fit(data) + for x in data[:-1]: + result = cover.search(x) + i = math.floor((x - m) / (w - delta)) + a_i = m + i * (w - delta) - delta / 2.0 + b_i = m + (i + 1) * (w - delta) + delta / 2.0 + expected = [y for y in data if y > a_i and y < b_i] + for c in result: + assert c in expected + for c in expected: + assert c in result + x = data[-1] + last_result = cover.search(x) + assert result == last_result diff --git a/tests/test_unit_heap.py b/tests/test_unit_heap.py index 55f0ce3..b020c5b 100644 --- a/tests/test_unit_heap.py +++ b/tests/test_unit_heap.py @@ -1,39 +1,41 @@ -import random +import pytest from tdamapper.utils.heap import MaxHeap - - -def maxheap(data): +from tests.test_utils import list_int_random + + +def _check_heap_property(data, i=0): + if i >= len(data): + return + i_left = 2 * i + 1 + if i_left < len(data): + assert data[i] >= data[i_left] + i_right = 2 * i + 2 + if i_right < len(data): + assert data[i] >= data[i_right] + _check_heap_property(data, i_left) + _check_heap_property(data, i_right) + + +@pytest.mark.parametrize( + "data", + [ + [], + [1, 2, 3, 4, 5], + [5, 4, 3, 2, 1], + [1, 1, 1, 1, 2], + list_int_random(10), + list_int_random(100), + list_int_random(1000), + ], +) +def test_max_heap(data): m = MaxHeap() for x in data: m.add(x, x) - return m - - -def test_empty(): - m = MaxHeap() - assert 0 == len(m) - - -def test_max(): - data = list(range(10)) - random.shuffle(data) - m = maxheap(data) - assert (9, 9) == m.top() - assert 10 == len(m) - - -def test_max_random(): - data = random.sample(list(range(1000)), 100) - m = maxheap(data) - assert 100 == len(m) - max_data = max(data) - assert (max_data, max_data) == m.top() - assert 0 != len(m) - collected = [] - for _ in range(10): - collected.append(m.pop()) - data.sort() - collected.sort() - assert collected == [(x, x) for x in data[-10:]] - assert 90 == len(m) + _check_heap_property(list(m)) + assert len(m) == len(data) + if not data: + assert m.is_empty() + assert m.top() is None + assert m.pop() is None diff --git a/tests/test_unit_knn.py b/tests/test_unit_knn.py deleted file mode 100644 index 69da0b6..0000000 --- a/tests/test_unit_knn.py +++ /dev/null @@ -1,162 +0,0 @@ -import numpy as np - -from tdamapper.cover import KNNCover -from tdamapper.utils.metrics import euclidean, get_metric -from tdamapper.utils.vptree_flat.vptree import VPTree - -distance = get_metric("euclidean") - -X = np.array( - [ - [99.30543214, 99.85036546], - [98.82687659, 101.94362119], - [99.260437, 101.5430146], - [99.5444675, 100.01747916], - [99.49034782, 99.5619257], - [99.73199663, 100.8024564], - [100.3130677, 99.14590426], - [101.18802979, 100.31694261], - [98.96575716, 100.68159452], - [101.8831507, 98.65224094], - [99.59682305, 101.22244507], - [99.32566734, 100.03183056], - [100.04575852, 99.81281615], - [101.48825219, 101.89588918], - [100.15494743, 100.37816252], - [98.95144703, 98.57998206], - [101.86755799, 99.02272212], - [100.72909056, 100.12898291], - [99.65208785, 100.15634897], - [99.97181777, 100.42833187], - [99.36567791, 99.63725883], - [99.89678115, 100.4105985], - [100.46566244, 98.46375631], - [99.25524518, 99.17356146], - [100.92085882, 100.31872765], - [100.12691209, 100.40198936], - [101.53277921, 101.46935877], - [101.12663592, 98.92006849], - [99.61267318, 99.69769725], - [100.17742614, 99.59821906], - [98.74720464, 100.77749036], - 
[99.09270164, 100.0519454], - [99.13877431, 101.91006495], - [97.44701018, 100.6536186], - [99.10453344, 100.3869025], - [99.12920285, 99.42115034], - [99.90154748, 99.33652171], - [98.99978465, 98.4552289], - [98.85253135, 99.56217996], - [100.20827498, 100.97663904], - [99.96071718, 98.8319065], - [101.49407907, 99.79484174], - [99.50196755, 101.92953205], - [100.40234164, 99.31518991], - [98.29372981, 101.9507754], - [100.1666735, 100.63503144], - [99.32753955, 99.64044684], - [100.14404357, 101.45427351], - [99.3563816, 97.77659685], - [98.50874241, 100.4393917], - [99.36415392, 100.67643329], - [102.38314477, 100.94447949], - [100.44386323, 100.33367433], - [100.94942081, 100.08755124], - [100.85683061, 99.34897441], - [99.58638102, 99.25254519], - [99.08717777, 101.11701629], - [98.729515, 100.96939671], - [99.48919486, 98.81936782], - [100.57659082, 99.79170124], - [100.06651722, 100.3024719], - [98.36980165, 100.46278226], - [100.37642553, 98.90059921], - [98.83485016, 100.90082649], - [98.70714309, 100.26705087], - [100.76103773, 100.12167502], - [100.39600671, 98.90693849], - [101.13940068, 98.76517418], - [98.89561666, 100.05216508], - [98.38610215, 99.78725972], - [99.68844747, 100.05616534], - [101.76405235, 100.40015721], - [100.97873798, 102.2408932], - [100.94725197, 99.84498991], - [102.26975462, 98.54563433], - [100.95008842, 99.84864279], - [101.17877957, 99.82007516], - [100.52327666, 99.82845367], - [100.77179055, 100.82350415], - [98.68409259, 99.5384154], - ] -) - - -x = np.array([99.73199663, 100.8024564]) - - -def test_knn_search(): - knn_cover = KNNCover(neighbors=5, metric=distance) - knn_cover.fit(X) - neigh_ids = knn_cover.search(x) - d = euclidean() - dists = [d(x, X[j]) for j in neigh_ids] - x_dist = d(x, X[5]) - assert x_dist in dists - - -def test_vptree(): - vptree = VPTree(X[:80], metric=distance, leaf_capacity=5) - neigh = vptree.knn_search(x, 5) - d = euclidean() - dists = [d(x, y) for y in neigh] - x_dist = d(x, X[5]) - check_vptree(vptree) - assert x_dist in dists - - -def test_vptree_simple(): - XX = np.array([np.array([x, x / 2]) for x in range(30)]) - vptree = VPTree(XX, metric=distance, leaf_capacity=5, leaf_radius=0.0) - xx = np.array([3, 3 / 2]) - neigh = vptree.knn_search(xx, 2) - d = euclidean() - dists = [d(xx, y) for y in neigh] - check_vptree(vptree) - assert 0.0 in dists - - -def check_vptree(vpt): - arr = vpt.array - data = arr._dataset - distances = arr._distances - indices = arr._indices - - dist = vpt.metric - leaf_capacity = vpt.leaf_capacity - leaf_radius = vpt.leaf_radius - - def check_sub(start, end): - v_radius = distances[start] - v_point_index = indices[start] - v_point = data[v_point_index] - - mid = (start + end) // 2 - for i in range(start + 1, mid): - y_index = indices[i] - y = data[y_index] - assert dist(v_point, y) <= v_radius - for i in range(mid, end): - y_index = indices[i] - y = data[y_index] - assert dist(v_point, y) >= v_radius - - def check_rec(start, end): - v_radius = distances[start] - if (end - start > leaf_capacity) and (v_radius > leaf_radius): - check_sub(start, end) - mid = (start + end) // 2 - check_rec(start + 1, mid) - check_rec(mid, end) - - check_rec(0, len(data)) diff --git a/tests/test_unit_metrics.py b/tests/test_unit_metrics.py index ca2c192..7a0cd06 100644 --- a/tests/test_unit_metrics.py +++ b/tests/test_unit_metrics.py @@ -1,51 +1,101 @@ +""" +Unit tests for the metrics module. 
+""" + +import math + import numpy as np +import pytest + +from tdamapper.utils.metrics import ( + chebyshev, + cosine, + euclidean, + get_metric, + get_supported_metrics, + manhattan, + minkowski, +) +from tests.test_utils import ( + dataset_empty, + dataset_grid, + dataset_random, + dataset_simple, + dataset_two_lines, +) + +EMPTY = dataset_empty() + +SIMPLE = dataset_simple() + +TWO_LINES = dataset_two_lines(100) + +GRID = dataset_grid(10) + +RANDOM = dataset_random(2, 100) + + +def _check_values(m1, m2, a, b): + m1_div_by_zero = False + m1_is_nan = False + + m2_div_by_zero = False + m2_is_nan = False + + try: + m1_value = m1(a, b) + if np.isnan(m1_value): + m1_is_nan = True + except ZeroDivisionError: + m1_div_by_zero = True + try: + m2_value = m2(a, b) + if np.isnan(m2_value): + m2_is_nan = True + except ZeroDivisionError: + m2_div_by_zero = True + assert m1_div_by_zero == m2_div_by_zero + assert m1_is_nan == m2_is_nan + if m1_div_by_zero or m2_div_by_zero: + return True + if m1_is_nan or m2_is_nan: + return True + return math.isclose(m1_value, m2_value) + + +@pytest.mark.parametrize("data", [SIMPLE, TWO_LINES, GRID, RANDOM]) +@pytest.mark.parametrize( + "m1, m2", + [ + (euclidean(), get_metric("euclidean")), + (manhattan(), get_metric("manhattan")), + (chebyshev(), get_metric("chebyshev")), + (manhattan(), get_metric("minkowski", p=1)), + (euclidean(), get_metric("minkowski", p=2)), + (minkowski(p=2.5), get_metric("minkowski", p=2.5)), + (minkowski(p=3), get_metric("minkowski", p=3)), + (chebyshev(), get_metric("minkowski", p=float("inf"))), + (cosine(), get_metric("cosine")), + ], +) +def test_metrics(m1, m2, data): + for a in data: + for b in data: + assert _check_values(m1, m2, a, b) + + +def test_supported_metrics(): + expected_metrics = [ + "euclidean", + "manhattan", + "chebyshev", + "cosine", + "minkowski", + ] + supported_metrics = get_supported_metrics() + assert set(supported_metrics) == set(expected_metrics) + -import tdamapper.utils.metrics as metrics - - -def test_euclidean(): - d = metrics.euclidean() - a = np.array([1.0, 0.0]) - b = np.array([0.0, 1.0]) - ab = d(a, b) - assert ab >= 1.414 - assert ab <= 1.415 - - -def test_manhattan(): - d = metrics.manhattan() - a = np.array([1.0, 0.0]) - b = np.array([0.0, 1.0]) - ab = d(a, b) - assert ab == 2.0 - - -def test_chebyshev(): - d = metrics.chebyshev() - a = np.array([1.0, 0.0]) - b = np.array([0.0, 1.0]) - ab = d(a, b) - assert ab == 1.0 - - -def test_cosine(): - d = metrics.cosine() - a = np.array([1.0, 0.0]) - b = np.array([0.0, 1.0]) - c = np.array([0.0, 2.0]) - ab = d(a, b) - assert ab >= 1.414 - assert ab <= 1.415 - bc = d(b, c) - assert bc == 0.0 - - -def test_get_metric(): - assert metrics.euclidean() == metrics.get_metric("euclidean") - assert metrics.euclidean() == metrics.get_metric("minkowski") - assert metrics.chebyshev() == metrics.get_metric("chebyshev") - assert metrics.chebyshev() == metrics.get_metric("minkowski", p=np.inf) - assert metrics.chebyshev() == metrics.get_metric("minkowski", p=float("inf")) - assert metrics.manhattan() == metrics.get_metric("manhattan") - assert metrics.manhattan() == metrics.get_metric("minkowski", p=1) - assert metrics.cosine() == metrics.get_metric("cosine") +def test_non_existent_metric(): + with pytest.raises(ValueError): + get_metric("non_existent_metric") diff --git a/tests/test_unit_params.py b/tests/test_unit_params.py deleted file mode 100644 index 5b52f85..0000000 --- a/tests/test_unit_params.py +++ /dev/null @@ -1,130 +0,0 @@ -from sklearn.cluster import DBSCAN - 
-from tdamapper._common import clone -from tdamapper.cover import BallCover, CubicalCover -from tdamapper.learn import MapperAlgorithm, MapperClustering - - -def _test_clone(obj): - obj_repr = repr(obj) - obj_cln = clone(obj) - cln_repr = repr(obj_cln) - assert obj_repr == cln_repr - - -def _test_repr(obj): - obj_repr = repr(obj) - _obj = eval(obj_repr) - _obj_repr = repr(_obj) - assert obj_repr == _obj_repr - - -def _test_clone_and_repr(obj): - _test_clone(obj) - _test_repr(obj) - - -def test_params_mapper_algorithm(): - est = MapperAlgorithm( - cover=CubicalCover( - n_intervals=3, - overlap_frac=0.3, - ), - ) - params = est.get_params(deep=False) - assert 5 == len(params) - params = est.get_params() - assert 12 == len(params) - assert 3 == params["cover__n_intervals"] - assert 0.3 == params["cover__overlap_frac"] - est.set_params(cover__n_intervals=2, cover__overlap_frac=0.2) - params = est.get_params() - assert 12 == len(params) - assert 2 == params["cover__n_intervals"] - assert 0.2 == params["cover__overlap_frac"] - - -def test_params_mapper_clustering(): - est = MapperClustering( - cover=CubicalCover( - n_intervals=3, - overlap_frac=0.3, - ), - ) - params = est.get_params(deep=False) - assert 3 == len(params) - params = est.get_params() - assert 10 == len(params) - assert 3 == params["cover__n_intervals"] - assert 0.3 == params["cover__overlap_frac"] - est.set_params(cover__n_intervals=2, cover__overlap_frac=0.2) - params = est.get_params() - assert 10 == len(params) - assert 2 == params["cover__n_intervals"] - assert 0.2 == params["cover__overlap_frac"] - - -def test_clone_and_repr_ball_cover(): - _test_clone_and_repr(BallCover()) - _test_clone_and_repr( - BallCover( - radius=2.0, - metric="test", - metric_params={"f": 4}, - kind="kind_test", - leaf_capacity=3.0, - leaf_radius=-2.0, - pivoting=7, - ) - ) - - -def test_clone_and_repr_cubical_cover(): - _test_clone_and_repr(CubicalCover()) - _test_clone_and_repr( - CubicalCover( - n_intervals=4, - overlap_frac=5, - algorithm="algo_test", - kind="simple", - leaf_radius=5, - leaf_capacity=6, - pivoting="no", - ) - ) - - -def test_clone_repr_mapper_algorithm(): - _test_clone_and_repr(MapperAlgorithm()) - _test_clone_and_repr( - MapperAlgorithm( - cover=CubicalCover( - n_intervals=3, - overlap_frac=0.3, - ), - clustering=DBSCAN( - eps="none", - min_samples=5.4, - ), - failsafe=4, - n_jobs="foo", - verbose=4, - ) - ) - - -def test_clone_repr_mapper_clustering(): - _test_clone_and_repr(MapperClustering()) - _test_clone_and_repr( - MapperClustering( - cover=CubicalCover( - n_intervals=3, - overlap_frac=0.3, - ), - clustering=DBSCAN( - eps="none", - min_samples=5.4, - ), - n_jobs="foo", - ) - ) diff --git a/tests/test_unit_proximity.py b/tests/test_unit_proximity.py deleted file mode 100644 index ca54bb3..0000000 --- a/tests/test_unit_proximity.py +++ /dev/null @@ -1,90 +0,0 @@ -import math - -import numpy as np - -from tdamapper.cover import BallCover, CubicalCover, KNNCover - - -def dataset(dim=1, num=10000): - return [np.random.rand(dim) for _ in range(num)] - - -def absdist(x, y): - return abs(x - y) - - -def test_ball_proximity(): - data = list(range(100)) - cover = BallCover(radius=10, metric=absdist) - cover.fit(data) - for x in data: - result = cover.search(x) - expected = [y for y in data if abs(x - y) < 10] - assert len(expected) == len(result) - - -def test_knn_proximity(): - data = list(range(100)) - cover = KNNCover(neighbors=11, metric=absdist) - cover.fit(data) - for x in range(5, 94): - result = cover.search(x) - expected = [x 
+ i for i in range(-5, 6)] - assert set(expected) == set(result) - - -def test_cubical_proximity(): - m, M = 0, 99 - n = 10 - p = 0.1 - w = (M - m) / (n * (1.0 - p)) - delta = p * w - data = list(range(m, M + 1)) - cover = CubicalCover(n_intervals=n, overlap_frac=p) - cover.fit(data) - for x in data[:-1]: - result = cover.search(x) - i = math.floor((x - m) / (w - delta)) - a_i = m + i * (w - delta) - delta / 2.0 - b_i = m + (i + 1) * (w - delta) + delta / 2.0 - expected = [y for y in data if y > a_i and y < b_i] - for c in result: - assert c in expected - for c in expected: - assert c in result - x = data[-1] - last_result = cover.search(x) - assert result == last_result - - -def test_cubical_params(): - cover = CubicalCover(n_intervals=10, overlap_frac=0.5) - params = cover.get_params() - assert 10 == params["n_intervals"] - assert 0.5 == params["overlap_frac"] - cover.set_params(n_intervals=5, overlap_frac=0.25) - params = cover.get_params() - assert 5 == params["n_intervals"] - assert 0.25 == params["overlap_frac"] - - -def test_knn_params(): - cover = KNNCover(neighbors=10, metric="chebyshev") - params = cover.get_params() - assert 10 == params["neighbors"] - assert "chebyshev" == params["metric"] - cover.set_params(neighbors=5, metric="euclidean") - params = cover.get_params() - assert 5 == params["neighbors"] - assert "euclidean" == params["metric"] - - -def test_ball_params(): - cover = BallCover(radius=10.0, metric="chebyshev") - params = cover.get_params() - assert 10.0 == params["radius"] - assert "chebyshev" == params["metric"] - cover.set_params(radius=5.0, metric="euclidean") - params = cover.get_params() - assert 5.0 == params["radius"] - assert "euclidean" == params["metric"] diff --git a/tests/test_unit_quickselect.py b/tests/test_unit_quickselect.py index 8b8ab0a..07361f9 100755 --- a/tests/test_unit_quickselect.py +++ b/tests/test_unit_quickselect.py @@ -1,14 +1,23 @@ -import random - import numpy as np +import pytest from tdamapper.utils.quickselect import partition, quickselect - - -def test_partition(): - n = 1000 - arr = np.array([i for i in range(n)]) - arr_extra = np.array([random.randint(0, n - 1) for i in range(n)]) +from tests.test_utils import list_int_random + + +@pytest.mark.parametrize( + "arr, arr_extra", + [ + (np.array([0, 1, -1]), np.array([4, 5, 6])), + (np.array([3, 2, 1]), np.array([7, 8, 9])), + (np.array([10, 20, 30]), np.array([11, 12, 13])), + (np.array([-5, -10, -15]), np.array([-1, -2, -3])), + (np.array([0, 0, 1, 6, 9, 1, 1]), np.array([-1, -2, -3, 0, 0, 0, 0])), + (np.array(list_int_random(1000)), np.array(list_int_random(1000))), + ], +) +def test_partition(arr, arr_extra): + n = len(arr) for choice in range(n): h = partition(arr, 0, n, choice, arr_extra) for i in range(0, h): @@ -17,22 +26,19 @@ def test_partition(): assert arr[i] >= choice -def test_quickselect_bounds(): - arr = np.array([0, 1, -1]) - arr_extra = np.array([4, 5, 6]) - quickselect(arr, 1, 2, 0, arr_extra) - assert 0 == arr[0] - assert 1 == arr[1] - assert -1 == arr[2] - assert 4 == arr_extra[0] - assert 5 == arr_extra[1] - assert 6 == arr_extra[2] - - -def test_quickselect(): - n = 1000 - arr = np.array([i for i in range(n)]) - arr_extra = np.array([random.randint(0, n - 1) for i in range(n)]) +@pytest.mark.parametrize( + "arr, arr_extra", + [ + (np.array([0, 1, -1]), np.array([4, 5, 6])), + (np.array([3, 2, 1]), np.array([7, 8, 9])), + (np.array([10, 20, 30]), np.array([11, 12, 13])), + (np.array([-5, -10, -15]), np.array([-1, -2, -3])), + (np.array([0, 0, 1, 6, 9, 1, 1]), 
np.array([-1, -2, -3, 0, 0, 0, 0])), + (np.array(list_int_random(1000)), np.array(list_int_random(1000))), + ], +) +def test_quickselect(arr, arr_extra): + n = len(arr) for choice in range(n): quickselect(arr, 0, n, choice, arr_extra) val = arr[choice] @@ -40,28 +46,3 @@ def test_quickselect(): assert arr[i] <= val for i in range(choice, n): assert arr[i] >= val - - -def test_partition_tuple(): - n = 1000 - arr_data = np.array([random.randint(0, n - 1) for i in range(n)]) - arr_ord = np.array(list(range(n))) - for choice in range(n): - h = partition(arr_ord, 0, n, choice, arr_data) - for i in range(0, h): - assert arr_ord[i] < choice - for i in range(h, n): - assert arr_ord[i] >= choice - - -def test_quickselect_tuple(): - n = 1000 - arr_data = np.array([random.randint(0, n - 1) for i in range(n)]) - arr_ord = np.array(list(range(n))) - for choice in range(n): - quickselect(arr_ord, 0, n, choice, arr_data) - val = arr_ord[choice] - for i in range(0, choice): - assert arr_ord[i] <= val - for i in range(choice, n): - assert arr_ord[i] >= val diff --git a/tests/test_unit_sklearn.py b/tests/test_unit_sklearn.py index 2f12b99..d3aeb7a 100644 --- a/tests/test_unit_sklearn.py +++ b/tests/test_unit_sklearn.py @@ -1,52 +1,111 @@ import logging import numpy as np +import pytest +from sklearn.cluster import DBSCAN, AgglomerativeClustering from sklearn.utils.estimator_checks import check_estimator -from tdamapper.cover import BallCover, CubicalCover, KNNCover +from tdamapper._common import clone +from tdamapper.core import TrivialClustering, TrivialCover +from tdamapper.cover import ( + BallCover, + CubicalCover, + KNNCover, + ProximityCubicalCover, + StandardCubicalCover, +) from tdamapper.learn import MapperAlgorithm, MapperClustering from tests.setup_logging import setup_logging - -def euclidean(x, y): - return np.linalg.norm(x - y) - - setup_logging() logger = logging.getLogger(__name__) -def run_tests(estimator): - for est, check in check_estimator(estimator, generate_only=True): - # logger.info(f'{check}') - check(est) - - -def test_trivial(): - est = MapperAlgorithm() - run_tests(est) - - -def test_ball(): - est = MapperAlgorithm(cover=BallCover(metric=euclidean)) - run_tests(est) - - -def test_knn(): - est = MapperAlgorithm(cover=KNNCover(metric=euclidean)) - run_tests(est) - - -def test_cubical(): - est = MapperAlgorithm(cover=CubicalCover()) - run_tests(est) - - -def test_clustering_trivial(): - est = MapperClustering() - run_tests(est) +def _euclidean(x, y): + """ + Calculate the Euclidean distance between two points. 
+ """ + return np.linalg.norm(x - y) -def test_clustering_ball(): - est = MapperClustering(cover=BallCover(metric=euclidean)) - run_tests(est) +def _test_clone(obj): + obj_repr = repr(obj) + obj_cln = clone(obj) + cln_repr = repr(obj_cln) + assert obj_repr == cln_repr + + +def _test_repr(obj): + obj_repr = repr(obj) + _obj = eval(obj_repr) + _obj_repr = repr(_obj) + assert obj_repr == _obj_repr + + +def _test_params(obj, params): + obj.set_params(**params) + params = obj.get_params(deep=True) + for k, v in params.items(): + assert params[k] == v + + +@pytest.mark.parametrize( + "clustering", + [ + TrivialClustering(), + # DBSCAN(eps=0.5, min_samples=5), + # AgglomerativeClustering(n_clusters=3, linkage="ward"), + # AgglomerativeClustering(n_clusters=3, linkage="average"), + ], +) +@pytest.mark.parametrize( + "cover", + [ + TrivialCover(), + BallCover(metric=_euclidean), + BallCover(metric=_euclidean), + KNNCover(metric=_euclidean), + CubicalCover(n_intervals=3, overlap_frac=0.2), + StandardCubicalCover(n_intervals=3, overlap_frac=0.2), + ProximityCubicalCover(n_intervals=3, overlap_frac=0.2), + ], +) +@pytest.mark.parametrize("estimator", [MapperAlgorithm, MapperClustering]) +def test_mapper_estimator(estimator, cover, clustering): + """ + Test that the estimator is compliant with scikit-learn's estimator checks. + """ + est = estimator(cover=cover, clustering=clustering) + for est_, check in check_estimator(est, generate_only=True): + # logger.info(f'{check}') + check(est_) + + +@pytest.mark.parametrize( + "obj, params", + [ + (TrivialClustering(), {}), + (TrivialCover(), {}), + (MapperAlgorithm(), {"cover": BallCover(metric=_euclidean)}), + ( + MapperAlgorithm(), + { + "cover": BallCover(metric="euclidean"), + "clustering": DBSCAN(eps=0.5, min_samples=5), + }, + ), + (MapperClustering(), {"cover": KNNCover(metric="euclidean")}), + (BallCover(), {"radius": 0.5, "metric": "euclidean"}), + (KNNCover(), {"neighbors": 5, "metric": "euclidean"}), + (CubicalCover(), {"n_intervals": 3, "overlap_frac": 0.2}), + (StandardCubicalCover(), {"n_intervals": 3, "overlap_frac": 0.2}), + (ProximityCubicalCover(), {"n_intervals": 3, "overlap_frac": 0.2}), + ], +) +def test_obj(obj, params): + """ + Test that the object can be cloned, represented, and has correct parameters. 
+ """ + _test_clone(obj) + _test_repr(obj) + _test_params(obj, params) diff --git a/tests/test_unit_unionfind.py b/tests/test_unit_unionfind.py index d2c42f6..bae09a4 100644 --- a/tests/test_unit_unionfind.py +++ b/tests/test_unit_unionfind.py @@ -1,14 +1,30 @@ +import pytest + from tdamapper.utils.unionfind import UnionFind -def test_list(): - data = [1, 2, 3, 4] - uf = UnionFind(data) - for i in data: - assert i == uf.find(i) - j = uf.union(1, 2) - assert j == uf.find(1) - assert j == uf.find(2) - k = uf.union(3, 4) - assert k == uf.find(3) - assert k == uf.find(4) +@pytest.mark.parametrize( + "datasets, expected_components", + [ + ([[], []], 0), + ([[], [1]], 1), + ([[], [1, 2]], 1), + ([[1, 2], [3, 4]], 2), + ([[1, 2, 3], [3, 4]], 1), + ], +) +def test_unionfind(datasets, expected_components): + all_data = [] + for dataset in datasets: + all_data.extend(dataset) + uf = UnionFind(all_data) + for dataset in datasets: + if dataset: + for x, y in zip(dataset, dataset[1:]): + uf.union(x, y) + + components = set() + for x in all_data: + components.add(uf.find(x)) + + assert len(components) == expected_components diff --git a/tests/test_unit_vptree.py b/tests/test_unit_vptree.py index 2dad076..b6780f9 100644 --- a/tests/test_unit_vptree.py +++ b/tests/test_unit_vptree.py @@ -1,118 +1,199 @@ -import random +""" +Unit tests for the vp-tree implementations. +""" -import numpy as np +import math + +import pytest from tdamapper.utils.metrics import get_metric +from tdamapper.utils.vptree import VPTree from tdamapper.utils.vptree_flat.vptree import VPTree as FVPT from tdamapper.utils.vptree_hier.vptree import VPTree as HVPT from tests.ball_tree import SkBallTree +from tests.test_utils import ( + dataset_empty, + dataset_grid, + dataset_random, + dataset_simple, + dataset_two_lines, +) -distance = get_metric("euclidean") - +EMPTY = dataset_empty() -def dataset(dim=10, num=1000): - return [np.random.rand(dim) for _ in range(num)] +SIMPLE = dataset_simple() +TWO_LINES = dataset_two_lines(10) -eps = 0.25 +GRID = dataset_grid(10) -neighbors = 5 +RANDOM = dataset_random(2, 10) -def _test_ball_search(data, dist, vpt): - for _ in range(len(data) // 10): - point = random.choice(data) - ball = vpt.ball_search(point, eps) - d = get_metric(dist) - near = [y for y in data if d(point, y) < eps] - for x in ball: - assert any(d(x, y) == 0.0 for y in near) - for x in near: - assert any(d(x, y) == 0.0 for y in ball) +def _test_ball_search(data, dist, vpt, eps): + d = get_metric(dist) + for point in data: + neigh = vpt.ball_search(point, eps, inclusive=False) + neigh_naive = [y for y in data if d(point, y) < eps] + dist_neigh = [d(point, y) for y in neigh] + dist_neigh_naive = [d(point, y) for y in neigh_naive] + dist_neigh.sort() + dist_neigh_naive.sort() + assert len(dist_neigh) == len(dist_neigh_naive) + assert len(neigh) == len(neigh_naive) + for x, y in zip(dist_neigh, dist_neigh_naive): + assert math.isclose(x, y) -def _test_knn_search(data, dist, vpt): - for _ in range(len(data) // 10): - point = random.choice(data) +def _test_knn_search(data, dist, vpt, neighbors): + d = get_metric(dist) + for point in data: neigh = vpt.knn_search(point, neighbors) - assert neighbors == len(neigh) - d = get_metric(dist) + neigh_len = len(neigh) + assert min(neighbors, len(data)) == neigh_len dist_neigh = [d(point, y) for y in neigh] dist_data = [d(point, y) for y in data] dist_data.sort() dist_neigh.sort() - assert 0.0 == dist_data[0] - assert 0.0 == dist_neigh[0] - assert dist_neigh == dist_data[:neighbors] - assert 
set(dist_neigh) == set(dist_data[:neighbors]) + assert math.isclose(dist_data[0], 0.0) + assert math.isclose(dist_neigh[0], 0.0) + for x, y in zip(dist_neigh, dist_data[:neigh_len]): + assert math.isclose(x, y) def _test_nn_search(data, dist, vpt): d = get_metric(dist) for val in data: neigh = vpt.knn_search(val, 1) - assert 0.0 == d(val, neigh[0]) + assert math.isclose(d(val, neigh[0]), 0.0) -def _test_vptree(builder, data, dist): - vpt = builder(data, metric=dist, leaf_radius=eps, leaf_capacity=neighbors) - _test_ball_search(data, dist, vpt) - _test_knn_search(data, dist, vpt) - _test_nn_search(data, dist, vpt) +def _test_vptree(builder, data, dist, eps, neighbors, pivoting): vpt = builder( data, metric=dist, leaf_radius=eps, leaf_capacity=neighbors, - pivoting="random", + pivoting=pivoting, ) - _test_ball_search(data, dist, vpt) - _test_knn_search(data, dist, vpt) + _check_vptree_property(vpt) + _test_ball_search(data, dist, vpt, eps) + _test_knn_search(data, dist, vpt, neighbors) _test_nn_search(data, dist, vpt) + + +def _check_vptree_property(vpt): + arr = vpt.array + data = arr._dataset + distances = arr._distances + indices = arr._indices + + dist = vpt.metric + leaf_capacity = vpt.leaf_capacity + leaf_radius = vpt.leaf_radius + + def _check_sub(start, end): + v_radius = distances[start] + v_point_index = indices[start] + v_point = data[v_point_index] + + mid = (start + end) // 2 + for i in range(start + 1, mid): + y_index = indices[i] + y = data[y_index] + assert dist(v_point, y) <= v_radius + for i in range(mid, end): + y_index = indices[i] + y = data[y_index] + assert dist(v_point, y) >= v_radius + + def _check_rec(start, end): + v_radius = distances[start] + if (end - start > leaf_capacity) and (v_radius > leaf_radius): + _check_sub(start, end) + mid = (start + end) // 2 + _check_rec(start + 1, mid) + _check_rec(mid, end) + + _check_rec(0, len(data)) + + +@pytest.mark.parametrize("builder", [HVPT, FVPT]) +@pytest.mark.parametrize("dataset", [[], [1], [1, 2]]) +def test_vptree_small_dataset(builder, dataset): + """ + Test the vp-tree implementations with an empty dataset. + """ + vpt = builder(dataset, metric=lambda x, y: abs(x - y)) + array = vpt.array + assert array.size() == len(dataset) + + +@pytest.mark.parametrize("pivoting", ["disabled", "random", "furthest"]) +@pytest.mark.parametrize("eps", [0.1, 0.5]) +@pytest.mark.parametrize("neighbors", [2, 10]) +@pytest.mark.parametrize("builder", [HVPT, FVPT]) +@pytest.mark.parametrize("metric", ["euclidean", "manhattan"]) +@pytest.mark.parametrize("dataset", [TWO_LINES, GRID, RANDOM]) +def test_vptree(builder, dataset, metric, eps, neighbors, pivoting): + """ + Test the vp-tree implementations with various datasets and metrics. 
+ """ + metric = get_metric(metric) vpt = builder( - data, - metric=dist, + dataset, + metric=metric, leaf_radius=eps, leaf_capacity=neighbors, - pivoting="furthest", + pivoting=pivoting, ) - _test_ball_search(data, dist, vpt) - _test_knn_search(data, dist, vpt) - _test_nn_search(data, dist, vpt) - - -def test_vptree_hier_refs(): - data = dataset() - data_refs = list(range(len(data))) - d = get_metric(distance) - - def dist_refs(i, j): - return d(data[i], data[j]) - - _test_vptree(HVPT, data_refs, dist_refs) - - -def test_vptree_hier_data(): - data = dataset() - _test_vptree(HVPT, data, distance) - - -def test_vptree_flat_refs(): - data = dataset() - data_refs = list(range(len(data))) - d = get_metric(distance) - - def dist_refs(i, j): - return d(data[i], data[j]) - - _test_vptree(FVPT, data_refs, dist_refs) - - -def test_vptree_flat_data(): - data = dataset() - _test_vptree(FVPT, data, distance) - - -def test_ball_tree_data(): - data = dataset() - _test_vptree(SkBallTree, data, distance) + _check_vptree_property(vpt) + _test_ball_search(dataset, metric, vpt, eps) + _test_knn_search(dataset, metric, vpt, neighbors) + _test_nn_search(dataset, metric, vpt) + + +@pytest.mark.parametrize("pivoting", ["disabled", "random", "furthest"]) +@pytest.mark.parametrize("eps", [0.1, 0.5]) +@pytest.mark.parametrize("neighbors", [2, 10]) +@pytest.mark.parametrize("kind", ["flat", "hierarchical"]) +@pytest.mark.parametrize("metric", ["euclidean", "manhattan"]) +@pytest.mark.parametrize("dataset", [SIMPLE, TWO_LINES]) +def test_vptree_public(kind, dataset, metric, eps, neighbors, pivoting): + """ + Test the vp-tree implementations with various datasets and metrics. + """ + metric = get_metric(metric) + vpt = VPTree( + dataset, + kind=kind, + metric=metric, + leaf_radius=eps, + leaf_capacity=neighbors, + pivoting=pivoting, + ) + _test_ball_search(dataset, metric, vpt, eps) + _test_knn_search(dataset, metric, vpt, neighbors) + _test_nn_search(dataset, metric, vpt) + + +@pytest.mark.parametrize("pivoting", ["disabled", "random", "furthest"]) +@pytest.mark.parametrize("eps", [0.1, 0.5]) +@pytest.mark.parametrize("neighbors", [2, 10]) +@pytest.mark.parametrize("metric", ["euclidean", "manhattan"]) +@pytest.mark.parametrize("dataset", [TWO_LINES, GRID, RANDOM]) +def test_vptree_sklearn(dataset, metric, eps, neighbors, pivoting): + """ + Test the baseline vp-tree implementation with various datasets and metrics. + """ + metric = get_metric(metric) + vpt = SkBallTree( + dataset, + metric=metric, + leaf_radius=eps, + leaf_capacity=neighbors, + pivoting=pivoting, + ) + _test_ball_search(dataset, metric, vpt, eps) + _test_knn_search(dataset, metric, vpt, neighbors) + _test_nn_search(dataset, metric, vpt) diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..255772d --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,67 @@ +""" +Test utilities. +""" + +import random + +import numpy as np + + +def list_int_random(num=1000): + """ + Create a list of random integers. + """ + return random.sample(list(range(num)), num) + + +def dataset_empty(): + """ + Create an empty dataset. + """ + return np.array([]) + + +def dataset_simple(scale=1.0): + """ + Create a simple dataset of points in a 2D space. + + This dataset consists of four points forming the corners of a rectangle + such that two sides are longer than the other two. 
+ """ + return scale * np.array( + [ + [0.0, 1.0], + [1.1, 0.0], + [0.0, 0.0], + [1.1, 1.0], + ] + ) + + +def dataset_random(dim=1, num=1000, scale=1.0): + """ + Create a random dataset of points in the unit square. + """ + return np.array([scale * np.random.rand(dim) for _ in range(num)]) + + +def dataset_two_lines(num=1000, scale=1.0): + """ + Create a dataset consisting of two lines in the unit square. + One line is horizontal at y=0, the other is vertical at x=1. + """ + t = np.linspace(0.0, 1.0, num) + line1 = scale * np.array([[x, 0.0] for x in t]) + line2 = scale * np.array([[x, 1.0] for x in t]) + return np.concatenate((line1, line2), axis=0) + + +def dataset_grid(num=1000, scale=1.0): + """ + Create a grid dataset in the unit square. + The grid consists of points evenly spaced in both dimensions. + """ + t = np.linspace(0.0, 1.0, num) + s = np.linspace(0.0, 1.0, num) + grid = scale * np.array([[x, y] for x in t for y in s]) + return grid