# --- src/knn/classifier/knn_classifier.py ---
from typing import Callable, Optional

import numpy as np

from src.knn.kd_tree.kdtree import KDTree


class KNNClassifier:
    """k-nearest-neighbours classifier backed by a custom KD-tree.

    Training points are also kept in a dict keyed by the point tuple so a
    query point that exactly matches a training point short-circuits to a
    one-hot probability vector instead of a tree search.
    """

    def __init__(self, k: int, leaf_size: int, metric: Callable):
        self.targets: Optional[np._typing.NDArray] = None  # training labels
        self.classifier: Optional[dict] = None  # point tuple -> label
        self.model: Optional[KDTree] = None  # fitted KD-tree
        self.k = k
        self.leaf_size = leaf_size
        self.metric = metric

    def fit(self, features: np._typing.NDArray, targets: np._typing.NDArray) -> None:
        """Build the KD-tree and the point -> label lookup table."""
        if len(features) != len(targets):
            # fixed typo in the error message ("lenght")
            raise ValueError("Features and targets must be same length")
        self.model = KDTree(features, self.leaf_size, self.metric)
        self.classifier = {
            tuple(point): label for point, label in zip(features.tolist(), targets.tolist())
        }
        self.targets = targets

    def _predict_proba(self, data: np._typing.NDArray) -> list:
        """Return [(class_labels, class_probabilities), ...] per query point.

        Raises ValueError if called before fit().
        """
        if self.model is None or self.classifier is None or self.targets is None:
            raise ValueError("Model unfitted")
        classes = np.unique(self.targets)  # hoisted: invariant across queries
        probability = []
        for point in data:
            point = tuple(point)
            if point in self.classifier:
                # Exact training point: one-hot on its known label.
                probability.append((classes, (self.classifier[point] == classes).astype(float)))
            else:
                result = self.model.query([point], self.k)
                neighbor_labels = np.array(
                    [self.classifier[tuple(neighbor.tolist())] for neighbor in result[0]]
                )
                counts = np.array([(neighbor_labels == label).sum() for label in classes])
                # BUG FIX: previously the raw `self.targets` array was stored
                # as the label axis here, so `predict` indexed the *full*
                # target vector with an argmax over per-class counts and
                # could return an arbitrary sample's label. Store the unique
                # class labels, matching the exact-match branch above.
                probability.append((classes, counts / len(result[0])))
        return probability

    def predict_proba(self, data: np._typing.NDArray) -> np._typing.ArrayLike:
        """Class-probability matrix of shape (n_queries, n_classes)."""
        return np.array([pair[1] for pair in self._predict_proba(data)])

    def predict(self, data: np._typing.NDArray) -> np._typing.ArrayLike:
        """Majority-vote class label for each query point."""
        return np.array(
            [labels[np.argmax(probs)] for labels, probs in self._predict_proba(data)]
        )


# --- src/knn/kd_tree/heap.py ---
import heapq
from typing import Optional

import numpy as np

from src.knn.knn_typing import PointType


class Heap:
    """Bounded max-heap of (distance, point) pairs built on heapq.

    Distances are stored negated so Python's min-heap keeps the *largest*
    distance at the root; `self.id` is a monotonically increasing tiebreaker
    so point payloads (unorderable arrays) are never compared.
    """

    def __init__(self, size: int):
        self.id = 1
        self.size = size
        # -inf sentinel keeps get_max() valid before any push.
        self.heap: list[tuple[float, int, Optional[PointType]]] = [(-np.inf, 1, None)]
        heapq.heapify(self.heap)

    def push(self, addition: tuple[float, PointType]) -> None:
        """Insert (distance, point); evict the current worst when full."""
        entry = (-addition[0], self.id, addition[1])
        if len(self.heap) < self.size:
            heapq.heappush(self.heap, entry)
        elif entry[0] > self.heap[0][0]:
            # Closer than the current worst neighbour: replace the root.
            heapq.heappop(self.heap)
            heapq.heappush(self.heap, entry)
        self.id += 1

    def get_max(self) -> tuple[float, int, Optional[PointType]]:
        """Root entry: (-(largest stored distance), id, point)."""
        return self.heap[0]

    def get_all_elements(self) -> list[tuple[float, int, Optional[PointType]]]:
        """The raw heap list (heap order, not sorted by distance)."""
        return self.heap
# --- src/knn/kd_tree/kdtree.py ---
from dataclasses import dataclass
from typing import Callable, Optional

import numpy as np

from src.knn.kd_tree.heap import Heap
from src.knn.knn_typing import PointsContainer, PointType


@dataclass
class Leaf:
    # Terminal bucket holding the raw points of a small partition.
    points: PointsContainer


@dataclass
class Node:
    # Internal node: `key` is the median point splitting space along `axis`.
    key: PointType
    axis: int
    left: Optional["Node"] | Leaf = None
    right: Optional["Node"] | Leaf = None


class KDTree:
    """KD-tree over points in R^(m x n) supporting k-NN queries with a
    caller-supplied metric."""

    def __init__(self, points: PointsContainer, leaf_size: int, metric: Callable):
        self.leaf_size = leaf_size
        self.root = self._build_tree(points)
        self.metric = metric

    @staticmethod
    def _validate_points(points: PointsContainer) -> np._typing.NDArray:
        """Coerce `points` into a 2-D ndarray of unique, equal-arity points."""
        if len(np.unique(points, axis=0)) != len(points):
            raise ValueError("Points should be unique")
        try:
            valid_points = np.array(points)
        except ValueError:
            raise ValueError("Points container should be Sequence and all points must have same arity")
        if len(valid_points.shape) != 2:
            raise ValueError("Points should be from R^(m x n)")
        return valid_points

    def _build_tree(self, points: PointsContainer) -> Node | Leaf:
        """Recursively split on the highest-spread axis at the median point."""

        def recurse(pts: np._typing.NDArray) -> Node | Leaf:
            # Partitions too small to split into two leaves become a Leaf.
            if len(pts) < self.leaf_size * 2 + 1:
                return Leaf(pts)
            axis = int(np.argmax(pts.std(axis=0)))
            ordered = pts[pts[:, axis].argsort()]
            mid = len(pts) // 2
            node = Node(ordered[mid], axis)
            node.left = recurse(ordered[:mid])
            node.right = recurse(ordered[mid + 1:])
            return node

        if self.leaf_size <= 0:
            raise ValueError("Leaf size must be positive")

        valid_points = self._validate_points(points)
        self.dim: int = valid_points.shape[1]
        return recurse(valid_points)

    def _find_k_neighbors(self, fixed_point: PointType, k: int) -> PointsContainer:
        """Collect the k stored points nearest to `fixed_point`."""
        heap = Heap(k)

        def visit(node: Optional[Node | Leaf]) -> None:
            if node is None:
                return
            if isinstance(node, Leaf):
                for candidate in node.points:
                    heap.push((self.metric(candidate, fixed_point), candidate))
                return
            distance = self.metric(node.key, fixed_point)
            # The heap stores negated distances: its root is -(current worst).
            if -distance > heap.get_max()[0]:
                heap.push((distance, node.key))
            plane_gap = np.abs(node.key[node.axis] - fixed_point[node.axis])
            if fixed_point[node.axis] < node.key[node.axis]:
                visit(node.left)
                # Cross the splitting plane only if it is closer than the
                # current worst neighbour found so far.
                if plane_gap < -heap.get_max()[0]:
                    visit(node.right)
            else:
                visit(node.right)
                if plane_gap < -heap.get_max()[0]:
                    visit(node.left)

        visit(self.root)
        # NOTE(review): if fewer than k points were ever pushed, the heap may
        # still hold its (-inf, 1, None) sentinel and a None leaks into the
        # result -- confirm callers always use k <= number of stored points.
        return np.array([entry[2] for entry in heap.get_all_elements()])

    def query(self, points: PointsContainer, k: int) -> PointsContainer:
        """k nearest neighbours for each query point; result shape (m, k, dim)."""
        if k <= 0:
            raise ValueError("Number of neighbors must be positive")
        valid_points = self._validate_points(points)
        if valid_points.shape[1] != self.dim:
            raise ValueError("Incorrect points arity")
        return np.array([self._find_k_neighbors(point, k) for point in valid_points])


# --- src/knn/knn_typing.py ---
from typing import Sequence

import numpy as np

# Loose structural aliases: a point is any sequence/ndarray of coordinates,
# a container is a sequence of points or a 2-D ndarray.
PointType = Sequence | np._typing.NDArray
PointsContainer = Sequence[PointType] | np._typing.NDArray
== 0 or len(y_true) == 0: + raise ValueError("Empty y_pred or y_true") + if y_true.shape != y_pred.shape: + raise ValueError("y_true and y_pred must have the same shape") + return np.sum(y_pred == y_true) / len(y_true) + + +def f1_score( + y_pred: np._typing.NDArray, + y_true: np._typing.NDArray, + zero_division: float = 0.0, +) -> float: + cm = confusion_matrix(y_true, y_pred) + if cm.shape == (2, 2): + tn, fp, fn, tp = cm.ravel() + if tp == 0 or (tp + fp + fn) == 0: + return zero_division + else: + tp = np.diag(cm) + fp = np.sum(cm, axis=0) - tp + fn = np.sum(cm, axis=1) - tp + if np.any(tp == 0) or np.any((tp + fn + fp) == 0): + return zero_division + precision = tp / (tp + fp) + recall = tp / (tp + fn) + f1 = 2 * (precision * recall) / (precision + recall) + return f1.mean() diff --git a/src/knn/processing/scalers.py b/src/knn/processing/scalers.py new file mode 100644 index 0000000..840ca5d --- /dev/null +++ b/src/knn/processing/scalers.py @@ -0,0 +1,52 @@ +from abc import ABCMeta +from typing import Optional + +import numpy as np + + +class AbstractScaler(metaclass=ABCMeta): + def fit(self, data: np._typing.NDArray) -> None: + raise NotImplementedError() + + def transform(self, data: np._typing.NDArray) -> np._typing.NDArray: + raise NotImplementedError() + + def fit_transform(self, data: np._typing.NDArray) -> np._typing.NDArray: + self.fit(data) + return self.transform(data) + + +class MinMaxScaler(AbstractScaler): + def __init__(self) -> None: + self.data_min: Optional[float] = None + self.data_max: Optional[float] = None + + def fit(self, data: np._typing.NDArray) -> None: + if len(data) == 0: + raise ValueError("Empty input data") + self.data_min = data.min(axis=0) + self.data_max = data.max(axis=0) + + def transform(self, data: np._typing.NDArray) -> np._typing.NDArray: + if self.data_min is None or self.data_max is None: + raise ValueError("Scaler unfitted") + if self.data_min == self.data_max: + return np.zeros(shape=data.shape) + return (data - 
self.data_min) / (self.data_max - self.data_min) + + +class RobustScaler(AbstractScaler): + def __init__(self) -> None: + self.median: Optional[float] = None + self.iqr: Optional[float] = None + + def fit(self, data: np._typing.NDArray) -> None: + if len(data) == 0: + raise ValueError("Empty input data") + self.median = np.median(data, axis=0) + self.iqr = np.quantile(data, q=0.75, axis=0) - np.quantile(data, q=0.25, axis=0) + + def transform(self, data: np._typing.NDArray) -> np._typing.NDArray: + if self.median is None or self.iqr is None: + raise ValueError("Scaler unfitted") + return (data - self.median) / self.iqr diff --git a/src/knn/processing/train_test_split.py b/src/knn/processing/train_test_split.py new file mode 100644 index 0000000..68267cb --- /dev/null +++ b/src/knn/processing/train_test_split.py @@ -0,0 +1,35 @@ +from typing import Optional + +import numpy as np + + +def train_test_split( + X: np._typing.NDArray, + Y: np._typing.NDArray, + test_size: float = 0.2, + shuffle: bool = True, + random_seed: Optional[int] = None, +) -> tuple[np._typing.NDArray, np._typing.NDArray, np._typing.NDArray, np._typing.NDArray]: + if not 0 <= test_size <= 1: + raise ValueError("test_size must be between 0 and 1") + + if len(X) != len(Y): + raise ValueError("X and Y must be of the same length") + + if random_seed is not None: + np.random.seed(random_seed) + + indices = np.arange(len(X)) + + if shuffle: + np.random.shuffle(indices) + + split_idx = int(len(indices) * (1 - test_size)) + + train_indices = indices[:split_idx] + test_indices = indices[split_idx:] + + X_train, X_test = X[train_indices], X[test_indices] + Y_train, Y_test = Y[train_indices], Y[test_indices] + + return X_train, X_test, Y_train, Y_test diff --git a/tests/knn/classifier/test_classifier.py b/tests/knn/classifier/test_classifier.py new file mode 100644 index 0000000..e335d3d --- /dev/null +++ b/tests/knn/classifier/test_classifier.py @@ -0,0 +1,53 @@ +import numpy as np +import pytest +from 
# --- tests/knn/classifier/test_classifier.py ---
import numpy as np
import pytest
from scipy.spatial.distance import euclidean

from src.knn.classifier.knn_classifier import KNNClassifier


class TestKNNClassifier:
    """Behavioural tests for KNNClassifier (fit / predict / predict_proba)."""

    @pytest.fixture
    def knn(self):
        return KNNClassifier(k=3, leaf_size=2, metric=euclidean)

    @pytest.fixture
    def sample_data(self):
        X_train = np.array([[1, 2], [2, 3], [3, 4], [5, 5], [6, 6]])
        y_train = np.array([0, 0, 1, 1, 1])
        X_test = np.array([[2, 2], [4, 5]])
        return X_train, y_train, X_test

    @pytest.mark.parametrize("count", [10, 100, 500, 1000])
    def test_predict_proba_equal_probability(self, count):
        # With k == dataset size and balanced labels, probabilities must
        # average to a 50/50 split and each row must sum to one.
        features = np.random.rand(count, 2)
        target = np.random.permutation([1] * (count // 2) + [0] * (count // 2))
        x_test = np.random.rand(100, 2)
        classifier = KNNClassifier(count, 2, euclidean)
        classifier.fit(features, target)
        result = classifier.predict_proba(x_test)
        assert np.allclose(result.mean(axis=0), [0.5, 0.5])
        np.testing.assert_allclose(result.sum(axis=1), 1, atol=1e-6)

    def test_already_existed_point(self, knn):
        # A query identical to a training point gets a one-hot distribution.
        train = np.array([(0, 1), (1, 0), (0, 0), (1, 1)])
        target = np.array([0, 1, 0, 1])
        x = np.array([(0, 1)])
        knn.fit(train, target)
        assert np.array_equal(knn.predict_proba(x), np.array([[1, 0]]))

    def test_predict(self, knn, sample_data):
        X_train, y_train, X_test = sample_data
        knn.fit(X_train, y_train)
        predictions = knn.predict(X_test)

        assert len(predictions) == len(X_test)
        assert all(p in [0, 1] for p in predictions)

    def test_predict_proba(self, knn, sample_data):
        X_train, y_train, X_test = sample_data
        knn.fit(X_train, y_train)
        proba = knn.predict_proba(X_test)

        assert proba.shape == (len(X_test), len(set(y_train)))
        assert np.all(proba >= 0) and np.all(proba <= 1)
        assert np.allclose(proba.sum(axis=1), 1)


# --- tests/knn/kdtree/test_kdtree.py ---
import numpy as np
import pytest
from scipy.spatial.distance import euclidean

from src.knn.kd_tree.kdtree import KDTree


class DefaultFounder:
    """Brute-force k-NN reference implementation used as ground truth."""

    def __init__(self, X):
        self.X = X

    def k_neighbors(self, points, k):
        def one_point_find(fixed):
            res = sorted([(euclidean(fixed, p), p) for p in self.X], key=lambda x: x[0])[:k]
            return [np.array(pair[1]) for pair in res]

        return [one_point_find(point) for point in points]


class TestKDTree:
    @pytest.fixture
    def get_dataset(self):
        point_dim = list(range(2, 30))
        train_dataset = [np.random.rand(200, dim) for dim in point_dim]
        test_dataset = [np.random.rand(30, dim) for dim in point_dim]
        return train_dataset, test_dataset

    def test_k_neighbors_search(self, get_dataset):
        # The tree must agree with brute force across many dimensionalities.
        train_dataset, test_dataset = get_dataset
        for train, test in zip(train_dataset, test_dataset):
            tree = KDTree(train, 2, euclidean)
            tree_res = tree.query(test, 5)
            default = DefaultFounder(train)
            default_res = default.k_neighbors(test, 5)
            assert np.array_equiv(np.sort(tree_res, axis=1), np.sort(default_res, axis=1))

    @pytest.mark.parametrize(
        "points",
        [
            [1, 2, 3, 4, 5],
            [(1, 1), (1, 1), (2, 2)],
            [(1, 1), (1, 1, 1)],
            {1: 2, 3: 4},
            "123",
            (1, 2),
        ],
    )
    def test_validate_points(self, points):
        # 1-D, duplicated, ragged and non-sequence inputs are all rejected.
        with pytest.raises(ValueError):
            tree = KDTree(points, 2, euclidean)
# --- tests/knn/processing/test_metrics.py ---
import numpy as np
import pytest

from src.knn.processing.metrics import accuracy_score, f1_score


class TestAccuracyScore:
    @pytest.mark.parametrize(
        ["y_true", "y_pred", "expected"],
        [
            ([1, 0, 1, 1, 0], [1, 0, 1, 1, 0], 1.0),
            ([1, 0, 1, 1, 0], [0, 1, 0, 0, 1], 0.0),
            ([1, 0, 1, 1, 0], [1, 0, 0, 1, 1], 3 / 5),
            ([2, 0, 1, 2, 1, 0], [2, 0, 1, 1, 2, 0], 4 / 6),
        ],
    )
    def test_different_accuracy(self, y_true, y_pred, expected):
        assert accuracy_score(np.array(y_pred), np.array(y_true)) == expected

    def test_empty_input(self):
        with pytest.raises(ValueError):
            accuracy_score(np.array([]), np.array([]))

    def test_mismatched_lengths(self):
        y_true = np.array([1, 0, 1])
        y_pred = np.array([1, 0])
        with pytest.raises(ValueError):
            accuracy_score(y_pred, y_true)


class TestFScore:
    @pytest.mark.parametrize(
        "y_true, y_pred, expected_f1",
        [
            ([1, 0, 1, 1, 0], [1, 0, 1, 1, 0], 1.0),
            ([1, 1, 1, 0, 0], [0, 0, 0, 1, 1], 0.0),
            ([1, 0, 1, 1, 0], [1, 0, 0, 1, 1], 0.6667),
            ([2, 0, 1, 2, 1, 0], [2, 0, 1, 1, 2, 0], 0.6667),
        ],
    )
    def test_f1_score(self, y_true, y_pred, expected_f1):
        assert f1_score(np.array(y_pred), np.array(y_true)) == pytest.approx(expected_f1, rel=1e-3)

    @pytest.mark.parametrize(
        "y_true, y_pred, expected_exception",
        [([], [], ValueError), ([1, 0, 1], [1, 0], ValueError)],
    )
    def test_f1_score_exceptions(self, y_true, y_pred, expected_exception):
        with pytest.raises(expected_exception):
            f1_score(np.array(y_true), np.array(y_pred))


# --- tests/knn/processing/test_scalers.py ---
import numpy as np
import pytest

from src.knn.processing.scalers import MinMaxScaler, RobustScaler


class TestMinMaxScaler:
    @pytest.fixture
    def scaler(self):
        return MinMaxScaler()

    @pytest.mark.parametrize(
        "data, expected",
        [
            (
                np.array([[1], [2], [3], [4], [5]]),
                np.array([[0.0], [0.25], [0.5], [0.75], [1.0]]),
            ),
            (
                np.array([[-5], [0], [5], [10]]),
                np.array([[0.0], [0.33], [0.67], [1.0]]),
            ),
            # Constant feature collapses to all zeros.
            (np.array([[5], [5], [5]]), np.array([[0], [0], [0]])),
        ],
    )
    def test_different_scaling(self, scaler, data, expected):
        transformed = scaler.fit_transform(data)
        np.testing.assert_almost_equal(transformed, expected, decimal=2)

    def test_single_feature_input(self, scaler):
        data = np.array([[10], [20], [30]])
        transformed = scaler.fit_transform(data)
        assert transformed.min() == 0 and transformed.max() == 1

    def test_unfitted_exception(self, scaler):
        data = np.array([[5], [5], [5]])
        with pytest.raises(ValueError):
            scaler.transform(data)

    def test_empty_input(self, scaler):
        X = np.array([])
        with pytest.raises(ValueError):
            scaler.fit_transform(X)


class TestRobustScaler:
    @pytest.fixture
    def scaler(self):
        return RobustScaler()

    @pytest.mark.parametrize(
        "X, median, first_quantile, third_quantile",
        [
            (np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]), 0, -0.5, 0.5),
            # An extreme outlier row must not disturb the robust statistics.
            (
                np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12], [100, 100, 100]]),
                0,
                -0.5,
                0.5,
            ),
        ],
    )
    def test_different_scaling(self, scaler, X, median, first_quantile, third_quantile):
        X_scaled = scaler.fit_transform(X)
        assert np.allclose(np.median(X_scaled, axis=0), median)
        assert np.allclose(np.percentile(X_scaled, 25, axis=0), first_quantile, atol=1e-2)
        assert np.allclose(np.percentile(X_scaled, 75, axis=0), third_quantile, atol=1e-2)

    def test_empty_input(self, scaler):
        X = np.array([])
        with pytest.raises(ValueError):
            scaler.fit_transform(X)

    def test_multidimensional_data(self, scaler):
        X = np.array([[1, 2], [3, 4], [5, 6], [7, 8]])
        X_scaled = scaler.fit_transform(X)
        assert X_scaled.shape == X.shape
        assert np.allclose(np.median(X_scaled, axis=0), 0)


# --- tests/knn/processing/test_train_test_split.py ---
import numpy as np
import pytest

from src.knn.processing.train_test_split import train_test_split


class TestTrainTestSplit:
    def test_basic_split(self):
        X = np.arange(10).reshape((10, 1))
        y = np.arange(10)

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

        assert len(X_train) == 8
        assert len(X_test) == 2
        assert len(y_train) == 8
        assert len(y_test) == 2

    def test_different_test_sizes(self):
        X = np.arange(20).reshape((20, 1))
        y = np.arange(20)

        for test_size in [0.1, 0.2, 0.5]:
            X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size)
            assert len(X_test) == int(len(X) * test_size)
            assert len(y_test) == int(len(y) * test_size)

    def test_random_state(self):
        # The same seed must reproduce the same partition.
        X = np.arange(50).reshape((50, 1))
        y = np.arange(50)
        X_train1, X_test1, y_train1, y_test1 = train_test_split(X, y, test_size=0.2, random_seed=42)
        X_train2, X_test2, y_train2, y_test2 = train_test_split(X, y, test_size=0.2, random_seed=42)

        assert np.array_equal(X_train1, X_train2)
        assert np.array_equal(X_test1, X_test2)
        assert np.array_equal(y_train1, y_train2)
        assert np.array_equal(y_test1, y_test2)

    def test_small_dataset(self):
        X = np.array([[1], [2], [3]])
        y = np.array([0, 1, 0])

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_seed=1)

        assert len(X_train) + len(X_test) == len(X)
        assert len(y_train) + len(y_test) == len(y)