-
Notifications
You must be signed in to change notification settings - Fork 0
Init: KDTree #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Init: KDTree #3
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| .idea/ | ||
| .mypy_cache/ | ||
| .pytest_cache/ | ||
| __pycache__/ | ||
| .hypothesis/ |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| from typing import Callable, Optional | ||
|
|
||
| import numpy as np | ||
|
|
||
| from src.knn.kd_tree.kdtree import KDTree | ||
|
|
||
|
|
||
class KNNClassifier:
    """k-nearest-neighbours classifier backed by a KD-tree.

    ``fit`` memorises the training set; prediction looks up the ``k``
    nearest training points and votes by label frequency.
    """

    def __init__(self, k: int, leaf_size: int, metric: Callable):
        self.targets: Optional[np.typing.NDArray] = None
        # Unique class labels, fixed at fit() time; they define the column
        # order of every probability vector returned by predict_proba().
        self.classes: Optional[np.typing.NDArray] = None
        # Maps a training point (as a tuple) to its label.
        self.classifier: Optional[dict] = None
        self.model: Optional[KDTree] = None
        self.k = k
        self.leaf_size = leaf_size
        self.metric = metric

    def fit(self, features: np.typing.NDArray, targets: np.typing.NDArray) -> None:
        """Build the KD-tree and the point -> label lookup table.

        Raises ValueError when features and targets differ in length.
        NOTE(review): duplicate points with different labels collapse to a
        single dictionary entry — the last label wins.
        """
        if len(features) != len(targets):
            raise ValueError("Features and targets must be same length")
        self.model = KDTree(features, self.leaf_size, self.metric)
        self.classifier = {tuple(point): label for point, label in zip(features.tolist(), targets.tolist())}
        self.targets = targets
        self.classes = np.unique(targets)

    def _predict_proba(self, data: np.typing.NDArray) -> list:
        """Return a list of (class_labels, probabilities) pairs, one per query point."""
        if self.model is None or self.classifier is None or self.classes is None:
            raise ValueError("Model unfitted")
        probability = []
        for point in data:
            key = tuple(point)
            if key in self.classifier:
                # Exact training point: probability 1 for its stored label.
                probability.append(
                    (self.classes, (self.classifier[key] == self.classes).astype(int))
                )
            else:
                neighbors = self.model.query([point], self.k)[0]
                neighbor_labels = np.array([self.classifier[tuple(n.tolist())] for n in neighbors])
                counts = np.array([(neighbor_labels == label).sum() for label in self.classes])
                # Bug fix: pair the probabilities with the unique class labels.
                # The old code paired them with the raw targets array, so
                # predict() indexed the wrong label for argmax positions >= 2.
                probability.append((self.classes, counts / len(neighbors)))
        return probability

    def predict_proba(self, data: np.typing.NDArray) -> np.typing.ArrayLike:
        """Class-probability matrix of shape (n_queries, n_classes)."""
        return np.array([pair[1] for pair in self._predict_proba(data)])

    def predict(self, data: np.typing.NDArray) -> np.typing.ArrayLike:
        """Most frequent neighbour label for each query point."""
        return np.array([labels[np.argmax(probs)] for labels, probs in self._predict_proba(data)])
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| import heapq | ||
| from typing import Optional | ||
|
|
||
| import numpy as np | ||
|
|
||
| from src.knn.knn_typing import PointType | ||
|
|
||
|
|
||
class Heap:
    """Bounded worst-first heap of (distance, point) pairs.

    Internally a heapq min-heap over negated distances, so the root is
    always the stored point with the LARGEST distance.
    """

    def __init__(self, size: int):
        # Monotonic tie-breaker: keeps heapq from ever comparing the point
        # payloads when two distances are equal.
        self.id = 1
        self.size = size
        # Sentinel entry with key -inf. While it is present, get_max()
        # reports an infinite worst distance; once the heap is full it is
        # the first entry to be evicted by push().
        self.heap: list[tuple[float, int, Optional[PointType]]] = [(-np.inf, 1, None)]

    def push(self, addition: tuple[float, PointType]) -> None:
        """Insert a (distance, point) pair, evicting the current worst if full."""
        distance, point = addition
        entry = (-distance, self.id, point)
        if len(self.heap) < self.size:
            heapq.heappush(self.heap, entry)
        elif entry[0] > self.heap[0][0]:
            # Closer than the current worst: replace it.
            heapq.heappop(self.heap)
            heapq.heappush(self.heap, entry)
        self.id += 1

    def get_max(self) -> tuple[float, int, Optional[PointType]]:
        """Entry with the largest distance (negated key at the heap root)."""
        return self.heap[0]

    def get_all_elements(self) -> list[tuple[float, int, Optional[PointType]]]:
        """All stored entries, in heap (not sorted) order."""
        return self.heap
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,93 @@ | ||
| from dataclasses import dataclass | ||
| from typing import Callable, Optional | ||
|
|
||
| import numpy as np | ||
|
|
||
| from src.knn.kd_tree.heap import Heap | ||
| from src.knn.knn_typing import PointsContainer, PointType | ||
|
|
||
|
|
||
@dataclass
class Leaf:
    """Terminal KD-tree node: holds its points verbatim, no further split."""

    points: PointsContainer


@dataclass
class Node:
    """Internal KD-tree node: splits space at ``key`` along dimension ``axis``."""

    key: PointType
    axis: int
    left: Optional["Node"] | Leaf = None
    right: Optional["Node"] | Leaf = None
|
|
||
|
|
||
class KDTree:
    """KD-tree for k-nearest-neighbour queries under an arbitrary metric."""

    def __init__(self, points: PointsContainer, leaf_size: int, metric: Callable):
        if leaf_size <= 0:
            raise ValueError("Leaf size must be positive")
        self.leaf_size = leaf_size
        self.metric = metric
        valid_points = self._validate_points(points)
        # Arity of every stored point; declared here (not in a helper) so the
        # attribute set is visible from the constructor.
        self.dim: int = valid_points.shape[1]
        self.root = self._build_tree(valid_points)

    @staticmethod
    def _validate_points(points: PointsContainer) -> np.typing.NDArray:
        """Convert to a 2-D ndarray, rejecting ragged or duplicate points."""
        # Convert FIRST: calling np.unique on a ragged container raised an
        # unhelpful error before the friendly message below could be reached.
        try:
            valid_points = np.array(points)
        except ValueError:
            raise ValueError("Points container should be Sequence and all points must have same arity")
        if len(valid_points.shape) != 2:
            raise ValueError("Points should be from R^(m x n)")
        if len(np.unique(valid_points, axis=0)) != len(valid_points):
            raise ValueError("Points should be unique")
        return valid_points

    def _build_tree(self, points: np.typing.NDArray) -> Node | Leaf:
        """Recursively split along the axis of largest spread at the median."""
        # Stop once the node is too small to yield two non-trivial children.
        if len(points) < self.leaf_size * 2 + 1:
            return Leaf(points)
        axis = int(np.argmax(points.std(axis=0)))
        sorted_points = points[points[:, axis].argsort()]
        median = len(points) // 2
        node = Node(sorted_points[median], axis)
        # Slices instead of per-element list comprehensions: same split,
        # without copying element by element.
        node.left = self._build_tree(sorted_points[:median])
        node.right = self._build_tree(sorted_points[median + 1:])
        return node

    def _find_k_neighbors(self, fixed_point: PointType, k: int) -> PointsContainer:
        """Collect the (up to) k stored points nearest to ``fixed_point``."""
        heap = Heap(k)

        def _find_recursion(curr_node: Optional[Node | Leaf]) -> None:
            if curr_node is None:
                return
            if isinstance(curr_node, Leaf):
                for point in curr_node.points:
                    heap.push((self.metric(point, fixed_point), point))
                return
            distance = self.metric(curr_node.key, fixed_point)
            # Heap stores negated distances: get_max()[0] == -(current worst).
            if -distance > heap.get_max()[0]:
                heap.push((distance, curr_node.key))
            if fixed_point[curr_node.axis] < curr_node.key[curr_node.axis]:
                near, far = curr_node.left, curr_node.right
            else:
                near, far = curr_node.right, curr_node.left
            _find_recursion(near)
            # Visit the far side only if the splitting plane is closer than
            # the current worst neighbour (otherwise it cannot improve).
            if np.abs(curr_node.key[curr_node.axis] - fixed_point[curr_node.axis]) < -heap.get_max()[0]:
                _find_recursion(far)

        _find_recursion(self.root)
        # Drop the heap's -inf sentinel: it survives (as None) whenever the
        # tree holds fewer than k points, and previously produced an object
        # array containing None.
        return np.array([entry[2] for entry in heap.get_all_elements() if entry[2] is not None])

    def query(self, points: PointsContainer, k: int) -> PointsContainer:
        """Return the (up to) k nearest stored points for each query point."""
        if k <= 0:
            raise ValueError("Number of neighbors must be positive")
        valid_points = self._validate_points(points)
        if valid_points.shape[1] != self.dim:
            raise ValueError("Incorrect points arity")
        return np.array([self._find_k_neighbors(point, k) for point in valid_points])
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| from typing import Sequence | ||
|
|
||
| import numpy as np | ||
|
|
||
| PointType = Sequence | np._typing.NDArray | ||
| PointsContainer = Sequence[PointType] | np._typing.NDArray |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| import numpy as np | ||
|
|
||
|
|
||
| def confusion_matrix(y_pred: np._typing.NDArray, y_true: np._typing.NDArray) -> np._typing.NDArray: | ||
| if len(y_pred) == 0 or len(y_true) == 0: | ||
| raise ValueError("Empty y_pred or y_true") | ||
| if y_true.shape != y_pred.shape: | ||
| raise ValueError("y_true and y_pred must have the same shape") | ||
|
|
||
| unique_labels = np.unique(np.concatenate((y_true, y_pred))) | ||
| if unique_labels.size == 1: | ||
| unique_labels = np.array([unique_labels[0], unique_labels[0] + 1]) | ||
|
|
||
| label_map = {label: idx for idx, label in enumerate(unique_labels)} | ||
|
|
||
| matrix = np.zeros((len(unique_labels), len(unique_labels)), dtype=int) | ||
|
|
||
| for true, pred in zip(y_true, y_pred): | ||
| matrix[label_map[true], label_map[pred]] += 1 | ||
|
|
||
| return matrix | ||
|
|
||
|
|
||
| def accuracy_score(y_pred: np._typing.NDArray, y_true: np._typing.NDArray) -> float: | ||
| if len(y_pred) == 0 or len(y_true) == 0: | ||
| raise ValueError("Empty y_pred or y_true") | ||
| if y_true.shape != y_pred.shape: | ||
| raise ValueError("y_true and y_pred must have the same shape") | ||
| return np.sum(y_pred == y_true) / len(y_true) | ||
|
|
||
|
|
||
def f1_score(
    y_pred: np._typing.NDArray,
    y_true: np._typing.NDArray,
    zero_division: float = 0.0,
) -> float:
    """Binary F1 for a 2x2 confusion matrix, macro-averaged F1 otherwise.

    Returns ``zero_division`` whenever any class has zero true positives.
    """
    # Consistency fix: pass arguments in confusion_matrix's declared
    # (y_pred, y_true) order. The old swapped call produced the transposed
    # matrix; F1 itself is invariant under transposition (it only swaps
    # fp and fn), but the code was misleading to read.
    cm = confusion_matrix(y_pred, y_true)
    if cm.shape == (2, 2):
        _tn, fp, fn, tp = cm.ravel()
        if tp == 0 or (tp + fp + fn) == 0:
            return zero_division
    else:
        # Per-class counts for macro averaging.
        tp = np.diag(cm)
        fp = np.sum(cm, axis=0) - tp
        fn = np.sum(cm, axis=1) - tp
        if np.any(tp == 0) or np.any((tp + fn + fp) == 0):
            return zero_division
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1 = 2 * (precision * recall) / (precision + recall)
    return f1.mean()
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
from abc import ABCMeta, abstractmethod
from typing import Optional

import numpy as np
|
|
||
|
|
||
| class AbstractScaler(metaclass=ABCMeta): | ||
| def fit(self, data: np._typing.NDArray) -> None: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. В декоратор |
||
| raise NotImplementedError() | ||
|
|
||
| def transform(self, data: np._typing.NDArray) -> np._typing.NDArray: | ||
| raise NotImplementedError() | ||
|
|
||
| def fit_transform(self, data: np._typing.NDArray) -> np._typing.NDArray: | ||
| self.fit(data) | ||
| return self.transform(data) | ||
|
|
||
|
|
||
class MinMaxScaler(AbstractScaler):
    """Scales each feature linearly to [0, 1] using its fitted min and max."""

    def __init__(self) -> None:
        # Per-feature minima/maxima (scalars for 1-D data), set by fit().
        self.data_min: Optional[np._typing.NDArray] = None
        self.data_max: Optional[np._typing.NDArray] = None

    def fit(self, data: np._typing.NDArray) -> None:
        """Record per-feature min and max. Raises ValueError on empty input."""
        if len(data) == 0:
            raise ValueError("Empty input data")
        self.data_min = data.min(axis=0)
        self.data_max = data.max(axis=0)

    def transform(self, data: np._typing.NDArray) -> np._typing.NDArray:
        """Scale ``data`` to [0, 1]. Raises ValueError if fit() was not called."""
        if self.data_min is None or self.data_max is None:
            raise ValueError("Scaler unfitted")
        value_range = self.data_max - self.data_min
        # Bug fix: the old `if self.data_min == self.data_max` raised
        # "truth value of an array is ambiguous" for 2-D data. Handle the
        # zero-range case per feature instead: constant features map to 0.
        safe_range = np.where(value_range == 0, 1, value_range)
        return np.where(value_range == 0, 0.0, (data - self.data_min) / safe_range)
|
|
||
|
|
||
class RobustScaler(AbstractScaler):
    """Scales each feature by (x - median) / IQR, robust to outliers."""

    def __init__(self) -> None:
        # Per-feature median and interquartile range, set by fit().
        self.median: Optional[np._typing.NDArray] = None
        self.iqr: Optional[np._typing.NDArray] = None

    def fit(self, data: np._typing.NDArray) -> None:
        """Record per-feature median and IQR. Raises ValueError on empty input."""
        if len(data) == 0:
            raise ValueError("Empty input data")
        self.median = np.median(data, axis=0)
        self.iqr = np.quantile(data, q=0.75, axis=0) - np.quantile(data, q=0.25, axis=0)

    def transform(self, data: np._typing.NDArray) -> np._typing.NDArray:
        """Apply robust scaling. Raises ValueError if fit() was not called."""
        if self.median is None or self.iqr is None:
            raise ValueError("Scaler unfitted")
        # Bug fix: a zero IQR previously produced a division-by-zero
        # (inf/nan output). Such features now map to 0 instead.
        safe_iqr = np.where(self.iqr == 0, 1, self.iqr)
        return np.where(self.iqr == 0, 0.0, (data - self.median) / safe_iqr)
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| from typing import Optional | ||
|
|
||
| import numpy as np | ||
|
|
||
|
|
||
| def train_test_split( | ||
| X: np._typing.NDArray, | ||
| Y: np._typing.NDArray, | ||
| test_size: float = 0.2, | ||
| shuffle: bool = True, | ||
| random_seed: Optional[int] = None, | ||
| ) -> tuple[np._typing.NDArray, np._typing.NDArray, np._typing.NDArray, np._typing.NDArray]: | ||
| if not 0 <= test_size <= 1: | ||
| raise ValueError("test_size must be between 0 and 1") | ||
|
|
||
| if len(X) != len(Y): | ||
| raise ValueError("X and Y must be of the same length") | ||
|
|
||
| if random_seed is not None: | ||
| np.random.seed(random_seed) | ||
|
|
||
| indices = np.arange(len(X)) | ||
|
|
||
| if shuffle: | ||
| np.random.shuffle(indices) | ||
|
|
||
| split_idx = int(len(indices) * (1 - test_size)) | ||
|
|
||
| train_indices = indices[:split_idx] | ||
| test_indices = indices[split_idx:] | ||
|
|
||
| X_train, X_test = X[train_indices], X[test_indices] | ||
| Y_train, Y_test = Y[train_indices], Y[test_indices] | ||
|
|
||
| return X_train, X_test, Y_train, Y_test |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| import numpy as np | ||
| import pytest | ||
| from scipy.spatial.distance import euclidean | ||
|
|
||
| from src.knn.classifier.knn_classifier import KNNClassifier | ||
|
|
||
|
|
||
class TestKNNClassifier:
    """Behavioural tests for KNNClassifier."""

    @pytest.fixture
    def knn(self):
        """A small classifier: k=3, leaf_size=2, Euclidean metric."""
        return KNNClassifier(k=3, leaf_size=2, metric=euclidean)

    @pytest.fixture
    def sample_data(self):
        """Tiny two-class training set plus two query points."""
        X_train = np.array([[1, 2], [2, 3], [3, 4], [5, 5], [6, 6]])
        y_train = np.array([0, 0, 1, 1, 1])
        X_test = np.array([[2, 2], [4, 5]])
        return X_train, y_train, X_test

    @pytest.mark.parametrize("count", [10, 100, 500, 1000])
    def test_predict_proba_equal_probability(self, count):
        """With k == n and perfectly balanced labels, queries average ~[0.5, 0.5]."""
        features = np.random.rand(count, 2)
        target = np.random.permutation([1] * (count // 2) + [0] * (count // 2))
        queries = np.random.rand(100, 2)
        model = KNNClassifier(count, 2, euclidean)
        model.fit(features, target)
        probabilities = model.predict_proba(queries)
        assert np.allclose(probabilities.mean(axis=0), [0.5, 0.5])
        np.testing.assert_allclose(probabilities.sum(axis=1), 1, atol=1e-6)

    def test_already_existed_point(self, knn):
        """Querying an exact training point yields probability 1 for its label."""
        train = np.array([(0, 1), (1, 0), (0, 0), (1, 1)])
        target = np.array([0, 1, 0, 1])
        knn.fit(train, target)
        assert np.array_equal(knn.predict_proba(np.array([(0, 1)])), np.array([[1, 0]]))

    def test_predict(self, knn, sample_data):
        """predict() returns one known label per query point."""
        X_train, y_train, X_test = sample_data
        knn.fit(X_train, y_train)
        predictions = knn.predict(X_test)
        assert len(predictions) == len(X_test)
        assert all(label in [0, 1] for label in predictions)

    def test_predict_proba(self, knn, sample_data):
        """predict_proba() rows are valid distributions over the classes."""
        X_train, y_train, X_test = sample_data
        knn.fit(X_train, y_train)
        proba = knn.predict_proba(X_test)
        assert proba.shape == (len(X_test), len(set(y_train)))
        assert np.all(proba >= 0) and np.all(proba <= 1)
        assert np.allclose(proba.sum(axis=1), 1)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Что мешает двум одинаковым точкам иметь разные лейблы?