Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
.idea/
.mypy_cache/
.pytest_cache/
__pycache__/
.hypothesis/
50 changes: 50 additions & 0 deletions src/knn/classifier/knn_classifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from typing import Callable, Optional

import numpy as np

from src.knn.kd_tree.kdtree import KDTree


class KNNClassifier:
def __init__(self, k: int, leaf_size: int, metric: Callable):
self.targets: Optional[np._typing.NDArray] = None
self.classifier: Optional[dict] = None
self.model: Optional[KDTree] = None
self.k = k
self.leaf_size = leaf_size
self.metric = metric

def fit(self, features: np._typing.NDArray, targets: np._typing.NDArray) -> None:
if len(features) != len(targets):
raise ValueError("Features and targets must be same lenght")
self.model = KDTree(features, self.leaf_size, self.metric)
self.classifier = dict((tuple(pair[0]), pair[1]) for pair in zip(features.tolist(), targets.tolist()))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Что мешает двум одинаковым точкам иметь разные лейблы?

self.targets = targets
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Не понимаю зачем хранить таргеты отдельно, если они уже есть в self.classifier


def _predict_proba(self, data: np._typing.NDArray) -> list:
if self.model is None or self.classifier is None or self.targets is None:
raise ValueError("Model unfitted")
probability = []
for point in data:
point = tuple(point)
if point in self.classifier:
probability.append(
(
np.unique(self.targets),
(self.classifier[point] == np.unique(self.targets)).astype(int),
)
)
Comment on lines +30 to +36
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Слабо понимаю что тут вообще происходит, но кажется это некорректно, как минимум потому что могут быть две одинаковые точки с разными лейблами

else:
result = self.model.query([point], self.k)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

У тебя model.query принимает много точек сразу, почему бы их всех туда не передать?

target_result = np.array([self.classifier[tuple(neighbors.tolist())] for neighbors in result[0]])
counts = np.array([(target_result == val).sum() for val in np.unique(self.targets)])
probability.append((self.targets, counts / len(result[0])))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Кажется сохранять в каждом предикте таргеты это оверхед по памяти, у тебя они всё равно используются только в классе

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А еще почему выше np.unique(self.targets), а тут просто self.targets

return probability

def predict_proba(self, data: np._typing.NDArray) -> np._typing.ArrayLike:
results = self._predict_proba(data)
return np.array([result[1] for result in results])

def predict(self, data: np._typing.NDArray) -> np._typing.ArrayLike:
results = self._predict_proba(data)
return np.array([result[0][np.argmax(result[1])] for result in results])
29 changes: 29 additions & 0 deletions src/knn/kd_tree/heap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import heapq
from typing import Optional

import numpy as np

from src.knn.knn_typing import PointType


class Heap:
    """Bounded heap holding at most ``size`` nearest-neighbor candidates.

    Built on ``heapq`` (a min-heap) with negated distances, so the root is
    always the candidate with the LARGEST distance seen so far and can be
    evicted in O(log n) when a closer point arrives.
    """

    def __init__(self, size: int):
        # Monotonic tie-breaker so heapq never compares the point payloads
        # themselves (points may not be mutually orderable).
        self.id = 1
        # Capacity: maximum number of stored entries.
        self.size = size
        # The (-inf, 1, None) sentinel keeps ``get_max`` well-defined before
        # any push; it is evicted once ``size`` real candidates arrive.
        # NOTE(review): if fewer than ``size`` points are ever pushed, the
        # sentinel (payload None) remains visible in ``get_all_elements``.
        self.heap: list[tuple[float, int, Optional["PointType"]]] = [(-np.inf, 1, None)]
        # (a single-element list is already a valid heap — no heapify needed)

    def push(self, addition: tuple[float, "PointType"]) -> None:
        """Insert ``(distance, point)``, evicting the farthest entry if full."""
        entry = (-addition[0], self.id, addition[1])
        if len(self.heap) < self.size:
            heapq.heappush(self.heap, entry)
        elif entry[0] > self.heap[0][0]:
            # pop-then-push in one O(log n) operation
            heapq.heapreplace(self.heap, entry)
        self.id += 1

    def get_max(self) -> tuple[float, int, Optional["PointType"]]:
        """Entry with the largest distance (stored negated in slot 0)."""
        return self.heap[0]

    def get_all_elements(self) -> list[tuple[float, int, Optional["PointType"]]]:
        """All stored entries, in heap order (not sorted by distance)."""
        return self.heap
93 changes: 93 additions & 0 deletions src/knn/kd_tree/kdtree.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from dataclasses import dataclass
from typing import Callable, Optional

import numpy as np

from src.knn.kd_tree.heap import Heap
from src.knn.knn_typing import PointsContainer, PointType


@dataclass
class Leaf:
    """Terminal KD-tree node: stores its bucket of points directly."""

    points: PointsContainer  # the points in this bucket; rows of the (m, dim) training array


@dataclass
class Node:
    """Internal KD-tree node: splits space at ``key`` along ``axis``."""

    key: "PointType"  # the splitting point stored at this node
    axis: int  # coordinate axis this node splits on
    # Children may be an inner Node, a Leaf bucket, or absent.  Written as a
    # single quoted union; semantically identical to the original
    # ``Optional["Node"] | Leaf``, which obscured that both children accept
    # either node kind or None.
    left: "Node | Leaf | None" = None
    right: "Node | Leaf | None" = None


class KDTree:
    """KD-tree over a set of unique points supporting k-nearest queries."""

    def __init__(self, points: PointsContainer, leaf_size: int, metric: Callable):
        """Build the tree.

        ``leaf_size`` bounds bucket size; ``metric`` is a callable
        ``(point_a, point_b) -> float`` used for all distance computations.
        Raises ValueError for non-positive leaf_size or invalid points.
        """
        self.leaf_size = leaf_size
        self.metric = metric
        # Dimensionality of the indexed points; set during construction
        # (declared here so the attribute always exists — reviewer note).
        self.dim: Optional[int] = None
        self.root = self._build_tree(points)

    @staticmethod
    def _validate_points(points: PointsContainer) -> np._typing.NDArray:
        """Convert ``points`` to a 2-D ndarray; reject ragged or duplicate input."""
        try:
            valid_points = np.array(points)
        except ValueError:
            raise ValueError("Points container should be Sequence and all points must have same arity")
        if len(valid_points.shape) != 2:
            raise ValueError("Points should be from R^(m x n)")
        # BUGFIX: uniqueness is checked AFTER conversion; the original ran
        # np.unique on the raw container first, so ragged input failed inside
        # numpy with a confusing error instead of the arity message above.
        if len(np.unique(valid_points, axis=0)) != len(valid_points):
            raise ValueError("Points should be unique")
        return valid_points

    def _build_tree(self, points: PointsContainer) -> Node | Leaf:
        """Validate ``points`` and recursively build the tree."""

        def build_recursion(pts: np._typing.NDArray) -> Node | Leaf:
            # A split needs at least leaf_size points on each side plus the
            # median point itself; smaller sets become leaf buckets.
            if len(pts) < self.leaf_size * 2 + 1:
                return Leaf(pts)
            # Split along the axis with the largest spread.
            axis = int(np.argmax(pts.std(axis=0)))
            ordered = pts[pts[:, axis].argsort()]
            median = len(pts) // 2
            node = Node(ordered[median], axis)
            # Idiom: slicing instead of rebuilding arrays element-by-element.
            node.left = build_recursion(ordered[:median])
            node.right = build_recursion(ordered[median + 1:])
            return node

        if self.leaf_size <= 0:
            raise ValueError("Leaf size must be positive")

        valid_points = self._validate_points(points)
        self.dim = valid_points.shape[1]
        return build_recursion(valid_points)

    def _find_k_neighbors(self, fixed_point: PointType, k: int) -> PointsContainer:
        """Return the k stored points nearest ``fixed_point`` (heap order, unsorted).

        NOTE(review): if the tree holds fewer than ``k`` points the heap's
        ``None`` sentinel can leak into the result — confirm callers always
        index at least ``k`` points.
        """
        heap = Heap(k)

        def visit(node: Optional[Node | Leaf]) -> None:
            if node is None:
                return
            if isinstance(node, Leaf):
                # Brute-force the bucket; the heap keeps the best k.
                for candidate in node.points:
                    heap.push((self.metric(candidate, fixed_point), candidate))
                return
            distance = self.metric(node.key, fixed_point)
            # heap.get_max()[0] is the NEGATED current worst distance, so
            # "-distance > worst" means this key beats the worst candidate.
            if -distance > heap.get_max()[0]:
                heap.push((distance, node.key))
            # Descend into the side containing the query first; cross the
            # splitting plane only if it is closer than the worst candidate.
            gap = np.abs(node.key[node.axis] - fixed_point[node.axis])
            if fixed_point[node.axis] < node.key[node.axis]:
                visit(node.left)
                if gap < -heap.get_max()[0]:
                    visit(node.right)
            else:
                visit(node.right)
                if gap < -heap.get_max()[0]:
                    visit(node.left)

        visit(self.root)
        return np.array([entry[2] for entry in heap.get_all_elements()])

    def query(self, points: PointsContainer, k: int) -> PointsContainer:
        """For each row of ``points`` return its k nearest stored neighbors.

        Note that ``_validate_points`` also rejects duplicate QUERY points —
        an inherited quirk of sharing the training-set validator.
        Raises ValueError for non-positive k or arity mismatch.
        """
        if k <= 0:
            raise ValueError("Number of neighbors must be positive")
        valid_points = self._validate_points(points)
        if valid_points.shape[1] != self.dim:
            raise ValueError("Incorrect points arity")
        return np.array([self._find_k_neighbors(point, k) for point in valid_points])
6 changes: 6 additions & 0 deletions src/knn/knn_typing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from typing import Sequence

import numpy as np

PointType = Sequence | np._typing.NDArray
PointsContainer = Sequence[PointType] | np._typing.NDArray
51 changes: 51 additions & 0 deletions src/knn/processing/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import numpy as np


def confusion_matrix(y_pred: np._typing.NDArray, y_true: np._typing.NDArray) -> np._typing.NDArray:
if len(y_pred) == 0 or len(y_true) == 0:
raise ValueError("Empty y_pred or y_true")
if y_true.shape != y_pred.shape:
raise ValueError("y_true and y_pred must have the same shape")

unique_labels = np.unique(np.concatenate((y_true, y_pred)))
if unique_labels.size == 1:
unique_labels = np.array([unique_labels[0], unique_labels[0] + 1])

label_map = {label: idx for idx, label in enumerate(unique_labels)}

matrix = np.zeros((len(unique_labels), len(unique_labels)), dtype=int)

for true, pred in zip(y_true, y_pred):
matrix[label_map[true], label_map[pred]] += 1

return matrix


def accuracy_score(y_pred: np._typing.NDArray, y_true: np._typing.NDArray) -> float:
if len(y_pred) == 0 or len(y_true) == 0:
raise ValueError("Empty y_pred or y_true")
if y_true.shape != y_pred.shape:
raise ValueError("y_true and y_pred must have the same shape")
return np.sum(y_pred == y_true) / len(y_true)


def f1_score(
    y_pred: np._typing.NDArray,
    y_true: np._typing.NDArray,
    zero_division: float = 0.0,
) -> float:
    """F1 of the positive class for binary labels; macro-averaged F1 otherwise.

    ``zero_division`` is the value substituted for a class whose F1 is
    undefined (no true positives for that class).
    """
    # BUGFIX: pass arguments in this function's own (y_pred, y_true) order;
    # the original swapped them — harmless only because F1 is symmetric under
    # a precision/recall swap (i.e. under transposing the matrix) — but it
    # made the code misleading.
    cm = confusion_matrix(y_pred, y_true)
    if cm.shape == (2, 2):
        _tn, fp, fn, tp = cm.ravel()
        if tp == 0:
            # No true positives: F1 is 0/undefined; report zero_division.
            return zero_division
        precision = tp / (tp + fp)
        recall = tp / (tp + fn)
        return float(2 * precision * recall / (precision + recall))
    # Macro average: per-class F1, then unweighted mean.
    tp = np.diag(cm)
    fp = np.sum(cm, axis=0) - tp
    fn = np.sum(cm, axis=1) - tp
    # BUGFIX: the original returned the scalar ``zero_division`` as soon as
    # ANY class had tp == 0; macro-F1 substitutes ``zero_division`` for that
    # class only and still averages over all classes.
    f1 = np.full(len(tp), float(zero_division))
    defined = tp > 0
    precision = tp[defined] / (tp[defined] + fp[defined])
    recall = tp[defined] / (tp[defined] + fn[defined])
    f1[defined] = 2 * precision * recall / (precision + recall)
    return float(f1.mean())
52 changes: 52 additions & 0 deletions src/knn/processing/scalers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from abc import ABCMeta, abstractmethod
from typing import Optional

import numpy as np


class AbstractScaler(metaclass=ABCMeta):
def fit(self, data: np._typing.NDArray) -> None:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

В декоратор @abstractmethod стоило бы обернуть

raise NotImplementedError()

def transform(self, data: np._typing.NDArray) -> np._typing.NDArray:
raise NotImplementedError()

def fit_transform(self, data: np._typing.NDArray) -> np._typing.NDArray:
self.fit(data)
return self.transform(data)


class MinMaxScaler(AbstractScaler):
    """Scales features into [0, 1] via ``(x - min) / (max - min)``, per feature."""

    def __init__(self) -> None:
        # Per-feature minima/maxima learned by ``fit`` (ndarray for 2-D input,
        # scalar for 1-D input); None until fitted.
        self.data_min: Optional[np._typing.NDArray] = None
        self.data_max: Optional[np._typing.NDArray] = None

    def fit(self, data: np._typing.NDArray) -> None:
        """Record per-feature min and max. Raises ValueError on empty input."""
        if len(data) == 0:
            raise ValueError("Empty input data")
        self.data_min = data.min(axis=0)
        self.data_max = data.max(axis=0)

    def transform(self, data: np._typing.NDArray) -> np._typing.NDArray:
        """Scale ``data`` into [0, 1]; constant features map to 0.

        Raises ValueError when called before ``fit``.
        """
        if self.data_min is None or self.data_max is None:
            raise ValueError("Scaler unfitted")
        spread = self.data_max - self.data_min
        # BUGFIX: the original compared ``data_min == data_max`` directly,
        # which raises "truth value is ambiguous" for any multi-feature fit.
        # Handle constant features per column instead: their scaled value is 0.
        safe_spread = np.where(spread == 0, 1, spread)
        return np.where(spread == 0, 0.0, (data - self.data_min) / safe_spread)


class RobustScaler(AbstractScaler):
    """Scales by ``(x - median) / IQR`` — robust to outliers."""

    def __init__(self) -> None:
        # Per-feature median and interquartile range learned by ``fit``
        # (ndarray for 2-D input, scalar for 1-D input); None until fitted.
        self.median: Optional[np._typing.NDArray] = None
        self.iqr: Optional[np._typing.NDArray] = None

    def fit(self, data: np._typing.NDArray) -> None:
        """Record per-feature median and IQR (q75 - q25). Raises on empty input."""
        if len(data) == 0:
            raise ValueError("Empty input data")
        self.median = np.median(data, axis=0)
        self.iqr = np.quantile(data, q=0.75, axis=0) - np.quantile(data, q=0.25, axis=0)

    def transform(self, data: np._typing.NDArray) -> np._typing.NDArray:
        """Center by median and scale by IQR; zero-IQR features are only centered.

        Raises ValueError when called before ``fit``.
        """
        if self.median is None or self.iqr is None:
            raise ValueError("Scaler unfitted")
        # BUGFIX (reviewer note): a constant feature has IQR == 0; dividing
        # by it produced inf/nan.  Treat its scale as 1 so the feature is
        # merely centered instead.
        safe_iqr = np.where(self.iqr == 0, 1, self.iqr)
        return (data - self.median) / safe_iqr
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.iqr может быть равным нулю

35 changes: 35 additions & 0 deletions src/knn/processing/train_test_split.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from typing import Optional

import numpy as np


def train_test_split(
X: np._typing.NDArray,
Y: np._typing.NDArray,
test_size: float = 0.2,
shuffle: bool = True,
random_seed: Optional[int] = None,
) -> tuple[np._typing.NDArray, np._typing.NDArray, np._typing.NDArray, np._typing.NDArray]:
if not 0 <= test_size <= 1:
raise ValueError("test_size must be between 0 and 1")

if len(X) != len(Y):
raise ValueError("X and Y must be of the same length")

if random_seed is not None:
np.random.seed(random_seed)

indices = np.arange(len(X))

if shuffle:
np.random.shuffle(indices)

split_idx = int(len(indices) * (1 - test_size))

train_indices = indices[:split_idx]
test_indices = indices[split_idx:]

X_train, X_test = X[train_indices], X[test_indices]
Y_train, Y_test = Y[train_indices], Y[test_indices]

return X_train, X_test, Y_train, Y_test
53 changes: 53 additions & 0 deletions tests/knn/classifier/test_classifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import numpy as np
import pytest
from scipy.spatial.distance import euclidean

from src.knn.classifier.knn_classifier import KNNClassifier


class TestKNNClassifier:
    """Unit tests for KNNClassifier."""

    @pytest.fixture
    def knn(self):
        # A small classifier shared by the deterministic tests below.
        return KNNClassifier(k=3, leaf_size=2, metric=euclidean)

    @pytest.fixture
    def sample_data(self):
        features = np.array([[1, 2], [2, 3], [3, 4], [5, 5], [6, 6]])
        labels = np.array([0, 0, 1, 1, 1])
        queries = np.array([[2, 2], [4, 5]])
        return features, labels, queries

    @pytest.mark.parametrize("count", [10, 100, 500, 1000])
    def test_predict_proba_equal_probability(self, count):
        # With k == n and a perfectly balanced target, every query point sees
        # the whole training set, so the mean distribution must be (0.5, 0.5).
        features = np.random.rand(count, 2)
        target = np.random.permutation([1] * (count // 2) + [0] * (count // 2))
        queries = np.random.rand(100, 2)
        classifier = KNNClassifier(count, 2, euclidean)
        classifier.fit(features, target)
        proba = classifier.predict_proba(queries)
        assert np.allclose(proba.mean(axis=0), [0.5, 0.5])
        np.testing.assert_allclose(proba.sum(axis=1), 1, atol=1e-6)

    def test_already_existed_point(self, knn):
        # A query identical to a training point gets probability 1 for its label.
        features = np.array([(0, 1), (1, 0), (0, 0), (1, 1)])
        labels = np.array([0, 1, 0, 1])
        knn.fit(features, labels)
        proba = knn.predict_proba(np.array([(0, 1)]))
        np.testing.assert_array_equal(proba, np.array([[1, 0]]))

    def test_predict(self, knn, sample_data):
        features, labels, queries = sample_data
        knn.fit(features, labels)
        predictions = knn.predict(queries)

        assert predictions.shape[0] == queries.shape[0]
        assert set(np.unique(predictions)).issubset({0, 1})

    def test_predict_proba(self, knn, sample_data):
        features, labels, queries = sample_data
        knn.fit(features, labels)
        proba = knn.predict_proba(queries)

        assert proba.shape == (len(queries), len(np.unique(labels)))
        assert ((proba >= 0) & (proba <= 1)).all()
        assert np.allclose(proba.sum(axis=1), 1)
Loading