diff --git a/pyproject.toml b/pyproject.toml index 286ccda..4568340 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,12 +6,12 @@ readme = "README.md" requires-python = ">=3.12" dependencies = [ "drain3>=0.9.11", - "pandas>=2.3.2", "protobuf>=6.32.1", "pydantic>=2.11.7", "pyyaml>=6.0.3", "regex>=2025.11.3", "kafka-python>=2.3.0", + "numpy>=2.3.2", ] [dependency-groups] @@ -21,6 +21,8 @@ dev = [ "prek>=0.2.8", "pytest>=8.4.2", "pytest-cov>=6.2.1", + "pandas>=2.3.2", + "polars>=1.38.1", ] [build-system] diff --git a/src/detectmatelibrary/utils/RLE_list.py b/src/detectmatelibrary/utils/RLE_list.py new file mode 100644 index 0000000..8009433 --- /dev/null +++ b/src/detectmatelibrary/utils/RLE_list.py @@ -0,0 +1,63 @@ +# Incremental RLE implementation. +# https://en.wikipedia.org/wiki/Run-length_encoding + +from typing import Generic, Iterable, Iterator, List, Tuple, TypeVar + +from .preview_helpers import list_preview_str + +T = TypeVar("T") + + +class RLEList(Generic[T]): + """List-like container storing data in run-length encoded form.""" + + def __init__(self, data: Iterable[T] | None = None): + self._runs: List[Tuple[T, int]] = [] + self._len: int = 0 + if data is not None: + for x in data: + self.append(x) + + def append(self, x: T) -> None: + if self._runs and self._runs[-1][0] == x: + v, c = self._runs[-1] + self._runs[-1] = (v, c + 1) + else: + self._runs.append((x, 1)) + self._len += 1 + + def extend(self, xs: Iterable[T]) -> None: + for x in xs: + self.append(x) + + def __len__(self) -> int: + return self._len + + def __iter__(self) -> Iterator[T]: + for v, c in self._runs: + for _ in range(c): + yield v + + def runs(self) -> List[Tuple[T, int]]: + """Return the internal RLE representation.""" + return list(self._runs) + + def __repr__(self) -> str: + # convert bool to int + runs_str = list_preview_str(self._runs) + return f"RLEList(len={self._len}, runs={runs_str})" + + +# example usage + +# r = RLEList[str]() + +# r.append("A") +# r.append("A") +# 
# ---- src/detectmatelibrary/utils/persistency/__init__.py ----

from .event_persistency import EventPersistency

__all__ = [
    "EventPersistency",
]


# ---- src/detectmatelibrary/utils/persistency/event_data_structures/base.py ----

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class EventDataStructure(ABC):
    """Storage backend interface for event-based data analysis.

    Subclasses choose the storage format (pandas, polars, trackers, ...)
    and implement the four abstract methods below; EventPersistency only
    talks to this interface.
    """

    # -1 marks "no event id assigned yet".
    event_id: int = -1
    # Log template of this event type; empty string until known.
    template: str = ""

    @abstractmethod
    def add_data(self, data_object: Any) -> None:
        """Append one already-converted data object to the store."""

    @abstractmethod
    def get_data(self) -> Any:
        """Return all accumulated data in the backend's native format."""

    @abstractmethod
    def get_variables(self) -> List[str]:
        """Return the stored variable/column names."""

    @abstractmethod
    def to_data(self, raw_data: Any) -> Any:
        """Convert raw data into the appropriate data format for storage."""

    def get_template(self) -> str:
        """Return the event template string."""
        return self.template

    def get_event_id(self) -> int:
        """Return the event id."""
        return self.event_id


# ---- .../event_data_structures/dataframes/__init__.py ----

from .event_dataframe import EventDataFrame
from .chunked_event_dataframe import ChunkedEventDataFrame

__all__ = ["EventDataFrame", "ChunkedEventDataFrame"]


# ---- .../event_data_structures/dataframes/event_dataframe.py ----

import pandas as pd


# -------- Pandas backend --------

@dataclass
class EventDataFrame(EventDataStructure):
    """
    Pandas DataFrame backend:
    - Ingest appends data (expensive: every append concatenates)
    - Retention is not handled (can be extended)
    - DataFrame is always materialized
    """

    # All rows ingested so far; one row per ingested occurrence.
    data: pd.DataFrame = field(default_factory=pd.DataFrame)

    def add_data(self, data: pd.DataFrame) -> None:
        """Append *data* to the stored frame.

        Empty frames are ignored, matching ChunkedEventDataFrame's guard.
        """
        if data.empty:
            return
        if len(self.data) > 0:
            self.data = pd.concat([self.data, data], ignore_index=True)
        else:
            # First batch: adopt the frame directly (no copy). Callers must
            # not mutate the frame after handing it over.
            self.data = data

    def get_data(self) -> pd.DataFrame:
        """Return the accumulated DataFrame."""
        return self.data

    def get_variables(self) -> List[str]:
        """Return the column names of the accumulated DataFrame."""
        return list(self.data.columns)

    def to_data(self, raw_data: Dict[int | str, Any]) -> pd.DataFrame:
        """Wrap a mapping of scalar values into a single-row DataFrame."""
        return pd.DataFrame({key: [value] for key, value in raw_data.items()})

    def __repr__(self) -> str:
        return f"EventDataFrame(df=..., rows={len(self.data)}, variables={self.get_variables()})"


# ---- .../event_data_structures/dataframes/chunked_event_dataframe.py ----

import polars as pl


# -------- Polars backend --------

@dataclass
class ChunkedEventDataFrame(EventDataStructure):
    """
    Streaming-friendly Polars DataFrame backend:
    - Ingest appends chunks (cheap)
    - Retention by max_rows is handled internally
    - DataFrame is materialized on demand
    """

    # Row cap; None disables retention entirely.
    max_rows: Optional[int] = 10_000_000
    # Compact the chunk list into one frame once it grows past this length.
    compact_every: int = 1000

    chunks: list[pl.DataFrame] = field(default_factory=list)
    _rows: int = 0

    def add_data(self, data: pl.DataFrame) -> None:
        """Append a chunk, then enforce retention and compaction."""
        if data.height == 0:
            return
        self.chunks.append(data)
        self._rows += data.height

        if self.max_rows is not None:
            self._evict_oldest()

        if len(self.chunks) > self.compact_every:
            self._compact()

    def _evict_oldest(self) -> None:
        """Drop the oldest rows until the total is back under max_rows."""
        if self.max_rows is None:
            return
        overflow = self._rows - self.max_rows
        if overflow <= 0:
            return

        # Drop whole chunks while they fit inside the overflow.
        while self.chunks and overflow >= self.chunks[0].height:
            oldest = self.chunks.pop(0)
            overflow -= oldest.height
            self._rows -= oldest.height

        # Trim the remaining overflow off the (now) oldest chunk.
        if overflow > 0 and self.chunks:
            oldest = self.chunks[0]
            self.chunks[0] = oldest.tail(oldest.height - overflow)
            self._rows -= overflow

    def _compact(self) -> None:
        """Merge all chunks into a single DataFrame to bound list growth."""
        if not self.chunks:
            return
        merged = pl.concat(self.chunks, how="vertical", rechunk=False)
        self.chunks = [merged]
        self._rows = merged.height

    def get_data(self) -> pl.DataFrame:
        """Materialize and return all retained rows as one DataFrame."""
        if not self.chunks:
            return pl.DataFrame()
        if len(self.chunks) == 1:
            return self.chunks[0]
        return pl.concat(self.chunks, how="vertical", rechunk=False)

    def get_variables(self) -> List[str]:
        """Return the column names (empty when no data has been ingested)."""
        if not self.chunks:
            return []
        return self.chunks[0].columns

    def to_data(self, raw_data: Dict[str, List[Any]]) -> pl.DataFrame:
        """Build a Polars DataFrame chunk from a column mapping."""
        return pl.DataFrame(raw_data)

    def __repr__(self) -> str:
        return (
            f"ChunkedEventDataFrame(df=..., rows={self._rows}, chunks={len(self.chunks)}, "
            f"variables={self.get_variables()})"
        )
# ---- src/detectmatelibrary/utils/preview_helpers.py ----

from typing import Any, Dict


def list_preview_str(listlike: Any, bool_to_int: bool = True) -> list[Any]:
    """Return a short preview of a listlike sequence.

    Inputs longer than 6 elements render as first three, "...", last
    three. With bool_to_int, booleans are shown as 0/1.
    """
    # Materialize once instead of three separate list() calls; this also
    # makes the helper work for one-shot iterables.
    items = list(listlike)
    if len(items) > 6:
        preview = items[:3] + ["..."] + items[-3:]
    else:
        preview = items
    if bool_to_int:
        preview = [int(x) if isinstance(x, bool) else x for x in preview]
    return preview


def format_dict_repr(items: Dict[str, Any], indent: str = "\t") -> str:
    """Format a dictionary as a multiline string with indentation."""
    return f"\n{indent}".join(f"{name}: {value}" for name, value in items.items())


# ---- .../event_data_structures/trackers/__init__.py ----

"""Trackers module for tracking variable behaviors over time/events.

A tracker is a data structure that stores a specific feature of a
variable and tracks the behavior of that feature over time/events. It
operates within the persistency framework to monitor how variables
evolve. It is interchangeable with other EventDataStructure
implementations.
"""

from .stability import (
    StabilityClassifier,
    SingleStabilityTracker,
    MultiStabilityTracker,
    EventStabilityTracker,
)
from .base import (
    EventTracker,
    MultiTracker,
    SingleTracker,
    Classification,
)

__all__ = [
    "EventTracker",
    "SingleTracker",
    "MultiTracker",
    "Classification",
    "StabilityClassifier",
    "SingleStabilityTracker",
    "MultiStabilityTracker",
    "EventStabilityTracker",
]


# ---- .../trackers/base/__init__.py ----

from .single_tracker import SingleTracker, Classification
from .multi_tracker import MultiTracker
from .event_tracker import EventTracker

__all__ = [
    "EventTracker",
    "MultiTracker",
    "SingleTracker",
    "Classification",
]


# ---- .../trackers/base/single_tracker.py ----

"""Tracks whether a variable is converging to a constant value."""

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any


@dataclass
class Classification:
    """Outcome of classifying one tracked feature."""

    # Classification label, e.g. "STATIC", "STABLE", "UNSTABLE".
    type: str = ""
    # Human-readable justification for the label.
    reason: str = ""


class SingleTracker(ABC):
    """Tracks whether a single variable is converging to a constant value."""

    @abstractmethod
    def add_value(self, value: Any) -> None:
        """Add a new value to the tracker."""

    @abstractmethod
    def classify(self) -> Classification:
        """Classify the tracker based on the current data."""

    @abstractmethod
    def __repr__(self) -> str:
        """Subclasses must provide a debug representation."""


# ---- .../trackers/base/multi_tracker.py ----

from typing import Any, Dict, Type

from detectmatelibrary.utils.preview_helpers import format_dict_repr

from .single_tracker import SingleTracker, Classification


class MultiTracker:
    """Tracks multiple features (e.g. variables or variable combos) using
    individual trackers, one SingleTracker per feature name."""

    # NOTE(review): the default SingleTracker is abstract — instantiating it
    # on first add_data would raise TypeError; callers always pass a
    # concrete subclass. Consider making the parameter required.
    def __init__(self, single_tracker_type: Type[SingleTracker] = SingleTracker) -> None:
        # Feature name -> tracker; trackers are created lazily on first sight.
        self.single_trackers: Dict[str, SingleTracker] = {}
        self.single_tracker_type: Type[SingleTracker] = single_tracker_type

    def add_data(self, data_object: Dict[str, Any]) -> None:
        """Route each (feature, value) pair to that feature's tracker."""
        for name, value in data_object.items():
            if name not in self.single_trackers:
                self.single_trackers[name] = self.single_tracker_type()
            self.single_trackers[name].add_value(value)

    def get_trackers(self) -> Dict[str, SingleTracker]:
        """Get the current feature trackers."""
        return self.single_trackers

    def classify(self) -> Dict[str, Classification]:
        """Classify all tracked features."""
        return {name: tracker.classify() for name, tracker in self.single_trackers.items()}

    def __repr__(self) -> str:
        strs = format_dict_repr(self.single_trackers, indent="\t")
        return f"{self.__class__.__name__}{{\n\t{strs}\n}}\n"


# ---- .../trackers/base/event_tracker.py ----

"""Event data structure that tracks variable behaviors over time/events."""

from typing import Any, Callable, Dict, Type

from detectmatelibrary.utils.preview_helpers import format_dict_repr

from .multi_tracker import MultiTracker
from .single_tracker import SingleTracker
from ...base import EventDataStructure


class EventTracker(EventDataStructure):
    """Event data structure that tracks the behavior of each event over time /
    number of events.

    Note: this overrides the dataclass __init__ of EventDataStructure, so
    event_id / template keep their class-level defaults (-1, "").
    """

    def __init__(
        self,
        single_tracker_type: Type[SingleTracker] = SingleTracker,
        multi_tracker_type: Type[MultiTracker] = MultiTracker,
        converter_function: Callable[[Any], Any] = lambda x: x,
    ) -> None:
        self.single_tracker_type = single_tracker_type
        self.multi_tracker_type = multi_tracker_type
        # Converts raw ingest payloads into the {feature: value} mapping
        # the multi-tracker expects.
        self.converter_function = converter_function
        self.multi_tracker = self.multi_tracker_type(single_tracker_type=self.single_tracker_type)

    def add_data(self, data_object: Any) -> None:
        """Add data to the variable trackers."""
        self.multi_tracker.add_data(data_object)

    def get_data(self) -> Dict[str, SingleTracker]:
        """Retrieve the tracker's stored data."""
        return self.multi_tracker.get_trackers()

    def get_variables(self) -> list[str]:
        """Get the list of tracked variable names."""
        return list(self.multi_tracker.get_trackers().keys())

    def to_data(self, raw_data: Dict[str, Any]) -> Any:
        """Transform raw data into the format expected by the tracker."""
        return self.converter_function(raw_data)

    def __repr__(self) -> str:
        strs = format_dict_repr(self.multi_tracker.get_trackers(), indent="\t")
        return f"{self.__class__.__name__}(data={{\n\t{strs}\n}})"


# ---- .../trackers/stability/__init__.py ----

from .stability_tracker import SingleStabilityTracker, MultiStabilityTracker, EventStabilityTracker
from .stability_classifier import StabilityClassifier

__all__ = [
    "EventStabilityTracker",
    "MultiStabilityTracker",
    "SingleStabilityTracker",
    "StabilityClassifier",
]


# ---- .../trackers/stability/stability_classifier.py ----

"""Classifier for stability based on segment means."""

from typing import List

import numpy as np

from detectmatelibrary.utils.RLE_list import RLEList


class StabilityClassifier:
    """Classifier for stability based on segment means.

    The change series is split into len(segment_thresholds) roughly equal
    segments; it is stable when every segment's mean change-rate stays
    below the corresponding threshold.
    """

    def __init__(self, segment_thresholds: List[float], min_samples: int = 10):
        self.segment_threshs = segment_thresholds
        # NOTE(review): min_samples is stored but not enforced here —
        # callers currently gate on their own minimum. Confirm intent.
        self.min_samples = min_samples
        self.n_segments = len(self.segment_threshs)
        # Segment means from the most recent is_stable() call (for reporting).
        self.segment_means: List[float] = []

    def is_stable(self, change_series: RLEList[bool] | List[bool]) -> bool:
        """Determine if a change series is stable.

        Works efficiently with RLEList without expanding to a full list.
        """
        if isinstance(change_series, RLEList):
            if len(change_series) == 0:
                return True
            self.segment_means = self._compute_segment_means_rle(change_series)
        else:
            self.segment_means = self._compute_segment_means(change_series)
        # NOTE(review): a NaN mean (empty segment) passes this check, because
        # `not nan >= t` is True — confirm "no data" should count as stable.
        return all(not q >= thresh for q, thresh in zip(self.segment_means, self.segment_threshs))

    def _compute_segment_means_rle(self, change_series: RLEList[bool]) -> List[float]:
        """Segment means computed directly from RLE runs (no expansion)."""
        total_len = len(change_series)
        segment_size = total_len / self.n_segments
        boundaries = [int(i * segment_size) for i in range(self.n_segments + 1)]
        boundaries[-1] = total_len  # guard against float rounding at the end

        sums = [0.0] * self.n_segments
        counts = [0] * self.n_segments
        position = 0
        for value, count in change_series.runs():
            run_start, run_end = position, position + count
            # Distribute this run over every segment it overlaps.
            for seg_idx in range(self.n_segments):
                overlap = (min(run_end, boundaries[seg_idx + 1])
                           - max(run_start, boundaries[seg_idx]))
                if overlap > 0:
                    sums[seg_idx] += value * overlap
                    counts[seg_idx] += overlap
            position = run_end

        return [
            sums[i] / counts[i] if counts[i] > 0 else np.nan
            for i in range(self.n_segments)
        ]

    def _compute_segment_means(self, change_series: List[bool]) -> List[float]:
        """Get means of each segment for a normal list."""
        segments = np.array_split(change_series, self.n_segments)
        return [np.mean(seg) if len(seg) > 0 else np.nan for seg in segments]

    def get_last_segment_means(self) -> List[float]:
        """Return the segment means computed by the last is_stable() call."""
        return self.segment_means

    def get_segment_thresholds(self) -> List[float]:
        """Return the configured per-segment thresholds."""
        return self.segment_threshs

    def __call__(self, change_series: RLEList[bool] | List[bool]) -> bool:
        return self.is_stable(change_series)

    def __repr__(self) -> str:
        return (
            f"StabilityClassifier(segment_threshs={self.segment_threshs}, "
            f"segment_means={self.segment_means})"
        )


# ---- .../trackers/stability/stability_tracker.py ----

"""Tracks whether a variable is converging to a constant value."""

from typing import Any, Callable, List, Literal, Set

from detectmatelibrary.utils.preview_helpers import list_preview_str
from detectmatelibrary.utils.RLE_list import RLEList

from ..base import SingleTracker, MultiTracker, EventTracker, Classification
from .stability_classifier import StabilityClassifier


class SingleStabilityTracker(SingleTracker):
    """Tracks stability of a single feature.

    Keeps a run-length-encoded boolean series where entry i records
    whether the i-th observed value had never been seen before.
    """

    def __init__(self, min_samples: int = 3) -> None:
        self.min_samples = min_samples
        self.change_series: RLEList[bool] = RLEList()
        self.unique_set: Set[Any] = set()
        # First threshold > 1 means the first segment can never fail (a
        # boolean mean is at most 1): early novelty is always tolerated,
        # later segments get progressively stricter.
        self.stability_classifier: StabilityClassifier = StabilityClassifier(
            segment_thresholds=[1.1, 0.3, 0.1, 0.01],
        )

    def add_value(self, value: Any) -> None:
        """Record whether *value* is new, then remember it."""
        size_before = len(self.unique_set)
        self.unique_set.add(value)
        self.change_series.append(len(self.unique_set) > size_before)

    def classify(self) -> Classification:
        """Classify the variable as INSUFFICIENT_DATA / STATIC / RANDOM /
        STABLE / UNSTABLE (checked in that order)."""
        n = len(self.change_series)
        if n < self.min_samples:
            return Classification(
                type="INSUFFICIENT_DATA",
                reason=f"Not enough data (have {n}, need {self.min_samples})"
            )
        if len(self.unique_set) == 1:
            return Classification(
                type="STATIC",
                reason="Unique set size is 1"
            )
        if len(self.unique_set) == n:
            return Classification(
                type="RANDOM",
                reason=f"Unique set size equals number of samples ({n})"
            )
        if self.stability_classifier.is_stable(self.change_series):
            return Classification(
                type="STABLE",
                reason=(
                    f"Segment means of change series {self.stability_classifier.get_last_segment_means()} "
                    f"are below segment thresholds: {self.stability_classifier.get_segment_thresholds()}"
                )
            )
        return Classification(
            type="UNSTABLE",
            reason="No classification matched; variable is unstable"
        )

    def __repr__(self) -> str:
        # Only previews are shown. Note: set iteration order is arbitrary,
        # so the unique-set preview is not deterministic across runs.
        series_str = list_preview_str(self.change_series)
        unique_set_str = "{" + ", ".join(map(str, list_preview_str(self.unique_set))) + "}"
        rle_str = list_preview_str(self.change_series.runs())
        return (
            f"{self.__class__.__name__}(classification={self.classify()}, change_series={series_str}, "
            f"unique_set={unique_set_str}, RLE={rle_str})"
        )


class MultiStabilityTracker(MultiTracker):
    """Tracks stability of multiple features (e.g. variables or variable
    combos) using individual SingleStabilityTracker instances."""

    def get_variables_by_classification(
        self,
        classification_type: Literal["INSUFFICIENT_DATA", "STATIC", "RANDOM", "STABLE", "UNSTABLE"]
    ) -> List[str]:
        """Get a list of variable names that are classified as the given
        type."""
        return [
            name
            for name, tracker in self.single_trackers.items()
            if tracker.classify().type == classification_type
        ]


class EventStabilityTracker(EventTracker):
    """Event data structure that tracks the stability of each event over time /
    number of events."""

    def __init__(self, converter_function: Callable[[Any], Any] = lambda x: x) -> None:
        self.multi_tracker: MultiStabilityTracker  # narrow the type for checkers
        super().__init__(
            single_tracker_type=SingleStabilityTracker,
            multi_tracker_type=MultiStabilityTracker,
            converter_function=converter_function,
        )

    def get_variables_by_classification(
        self, classification_type: Literal["INSUFFICIENT_DATA", "STATIC", "RANDOM", "STABLE", "UNSTABLE"]
    ) -> List[str]:
        """Delegate to the underlying MultiStabilityTracker."""
        return self.multi_tracker.get_variables_by_classification(classification_type)


# ---- src/detectmatelibrary/utils/persistency/event_persistency.py ----

from typing import Any, Dict, List, Optional, Type

from .event_data_structures.base import EventDataStructure


# -------- Generic persistency --------

class EventPersistency:
    """
    Event-based persistency orchestrator:
    - manages multiple EventDataStructure instances, one per event ID
    - doesn't know retention strategy
    - only delegates to EventDataStructure

    Args:
        event_data_class: The EventDataStructure subclass to use for storing event data.
        variable_blacklist: Variable names to exclude from storage. "Content" is excluded by default.
        event_data_kwargs: Additional keyword arguments to pass to the EventDataStructure constructor.
    """

    def __init__(
        self,
        event_data_class: Type[EventDataStructure],
        variable_blacklist: Optional[List[str | int]] = ["Content"],  # noqa: B006 — copied below, never mutated
        *,
        event_data_kwargs: Optional[dict[str, Any]] = None,
    ):
        self.events_data: Dict[int, EventDataStructure] = {}
        self.event_data_class = event_data_class
        self.event_data_kwargs = event_data_kwargs or {}
        # Copy so the shared default list can never be mutated through the
        # instance; falsy values (None, []) disable the blacklist entirely.
        self.variable_blacklist = list(variable_blacklist) if variable_blacklist else []
        self.event_templates: Dict[int, str] = {}

    def ingest_event(
        self,
        event_id: int,
        event_template: str,
        variables: Optional[list[Any]] = None,
        named_variables: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Ingest event data into the appropriate EventData store.

        Events carrying no variables at all are dropped (and no template
        is recorded for them).
        """
        # None defaults instead of mutable [] / {} defaults; None is now
        # also accepted explicitly (backward compatible).
        variables = variables or []
        named_variables = named_variables or {}
        if not variables and not named_variables:
            return
        self.event_templates[event_id] = event_template
        all_variables = self.get_all_variables(variables, named_variables)

        # Create the backend lazily on first sight of this event id.
        data_structure = self.events_data.get(event_id)
        if data_structure is None:
            data_structure = self.event_data_class(**self.event_data_kwargs)
            self.events_data[event_id] = data_structure

        data_structure.add_data(data_structure.to_data(all_variables))

    def get_event_data(self, event_id: int) -> Any | None:
        """Retrieve the data for a specific event ID (None if unknown)."""
        data_structure = self.events_data.get(event_id)
        return data_structure.get_data() if data_structure is not None else None

    def get_events_data(self) -> Dict[int, EventDataStructure]:
        """Retrieve the events' data."""
        return self.events_data

    def get_event_template(self, event_id: int) -> str | None:
        """Retrieve the template for a specific event ID."""
        return self.event_templates.get(event_id)

    def get_event_templates(self) -> Dict[int, str]:
        """Retrieve all event templates."""
        return self.event_templates

    def get_all_variables(
        self,
        variables: list[Any],
        log_format_variables: Dict[str, Any],
        event_var_prefix: str = "var_",
    ) -> dict[str, list[Any]]:
        """Combine log format variables and event variables into a single
        dictionary.

        Schema-friendly by using string column names. Named variables are
        filtered by name, positional variables by their integer index.
        """
        all_vars: dict[str, list[Any]] = {
            k: v for k, v in log_format_variables.items()
            if k not in self.variable_blacklist
        }
        all_vars.update({
            f"{event_var_prefix}{i}": val for i, val in enumerate(variables)
            if i not in self.variable_blacklist
        })
        return all_vars

    def __getitem__(self, event_id: int) -> EventDataStructure | None:
        return self.events_data.get(event_id)

    def __repr__(self) -> str:
        return (
            f"EventPersistency(num_event_types={len(self.events_data)}, "
            f"keys={list(self.events_data.keys())})"
        )
@@ -0,0 +1,463 @@ +"""Tests for the persistency module core functionality. + +This module tests EventPersistency and data structure backends including +EventDataFrame (Pandas) and ChunkedEventDataFrame (Polars). +""" + + +import pandas as pd +import polars as pl +from detectmatelibrary.utils.persistency.event_persistency import EventPersistency +from detectmatelibrary.utils.persistency.event_data_structures.dataframes import ( + EventDataFrame, + ChunkedEventDataFrame, +) +from detectmatelibrary.utils.persistency.event_data_structures.trackers import ( + EventTracker, + SingleStabilityTracker, + EventStabilityTracker +) + + +# Sample test data - variables is a list, not a dict +SAMPLE_EVENT_1 = { + "event_id": "E001", + "event_template": "User <*> logged in from <*>", + "variables": ["alice", "192.168.1.1"], + "named_variables": {"timestamp": "2024-01-01 10:00:00"}, +} + +SAMPLE_EVENT_2 = { + "event_id": "E002", + "event_template": "Error in module <*>: <*>", + "variables": ["auth", "timeout"], + "named_variables": {"timestamp": "2024-01-01 10:01:00"}, +} + +SAMPLE_EVENT_3 = { + "event_id": "E001", + "event_template": "User <*> logged in from <*>", + "variables": ["bob", "192.168.1.2"], + "named_variables": {"timestamp": "2024-01-01 10:02:00"}, +} + + +class TestEventPersistency: + """Test suite for EventPersistency orchestrator class.""" + + def test_initialization_with_pandas_backend(self): + """Test initialization with EventDataFrame backend.""" + persistency = EventPersistency(event_data_class=EventDataFrame) + assert persistency is not None + assert persistency.event_data_class == EventDataFrame + + def test_initialization_with_polars_backend(self): + """Test initialization with ChunkedEventDataFrame backend.""" + persistency = EventPersistency( + event_data_class=ChunkedEventDataFrame, + event_data_kwargs={"max_rows": 100}, + ) + assert persistency is not None + assert persistency.event_data_class == ChunkedEventDataFrame + + def 
test_initialization_with_tracker_backend(self): + """Test initialization with EventVariableTrackerData backend.""" + persistency = EventPersistency( + event_data_class=EventTracker, + event_data_kwargs={"tracker_type": SingleStabilityTracker}, + ) + assert persistency is not None + assert persistency.event_data_class == EventTracker + + def test_ingest_single_event(self): + """Test ingesting a single event.""" + persistency = EventPersistency(event_data_class=EventDataFrame) + persistency.ingest_event(**SAMPLE_EVENT_1) + + data = persistency.get_event_data("E001") + assert data is not None + assert len(data) == 1 + assert "var_0" in data.columns # alice + assert "var_1" in data.columns # 192.168.1.1 + assert "timestamp" in data.columns + + def test_ingest_multiple_events_same_id(self): + """Test ingesting multiple events with the same ID.""" + persistency = EventPersistency(event_data_class=EventDataFrame) + persistency.ingest_event(**SAMPLE_EVENT_1) + persistency.ingest_event(**SAMPLE_EVENT_3) + + data = persistency.get_event_data("E001") + assert len(data) == 2 + assert data["var_0"].tolist() == ["alice", "bob"] + + def test_ingest_multiple_events_different_ids(self): + """Test ingesting events with different IDs.""" + persistency = EventPersistency(event_data_class=EventDataFrame) + persistency.ingest_event(**SAMPLE_EVENT_1) + persistency.ingest_event(**SAMPLE_EVENT_2) + + data1 = persistency.get_event_data("E001") + data2 = persistency.get_event_data("E002") + + assert len(data1) == 1 + assert len(data2) == 1 + assert "var_0" in data1.columns + assert "var_0" in data2.columns + + def test_get_all_events_data(self): + """Test retrieving data for all events.""" + persistency = EventPersistency(event_data_class=EventDataFrame) + persistency.ingest_event(**SAMPLE_EVENT_1) + persistency.ingest_event(**SAMPLE_EVENT_2) + + all_data = persistency.get_events_data() + assert "E001" in all_data + assert "E002" in all_data + assert isinstance(all_data["E001"], 
EventDataFrame) + assert isinstance(all_data["E002"], EventDataFrame) + + def test_template_storage_and_retrieval(self): + """Test template storage and retrieval.""" + persistency = EventPersistency(event_data_class=EventDataFrame) + persistency.ingest_event(**SAMPLE_EVENT_1) + persistency.ingest_event(**SAMPLE_EVENT_2) + + template1 = persistency.get_event_template("E001") + template2 = persistency.get_event_template("E002") + + assert template1 == "User <*> logged in from <*>" + assert template2 == "Error in module <*>: <*>" + + def test_get_all_templates(self): + """Test retrieving all templates.""" + persistency = EventPersistency(event_data_class=EventDataFrame) + persistency.ingest_event(**SAMPLE_EVENT_1) + persistency.ingest_event(**SAMPLE_EVENT_2) + + templates = persistency.get_event_templates() + assert len(templates) == 2 + assert templates["E001"] == "User <*> logged in from <*>" + assert templates["E002"] == "Error in module <*>: <*>" + + def test_variable_blacklist(self): + """Test variable blacklisting functionality.""" + persistency = EventPersistency( + event_data_class=EventDataFrame, + variable_blacklist=[1], # Blacklist index 1 (second variable) + ) + persistency.ingest_event(**SAMPLE_EVENT_1) + + data = persistency.get_event_data("E001") + assert "var_0" in data.columns # First variable should be present + assert "var_1" not in data.columns # Second variable should be blocked + + def test_get_all_variables_method(self): + """Test the get_all_variables instance method.""" + variables = ["value1", "value2", "value3"] + named_variables = {"timestamp": "2024-01-01", "level": "INFO"} + blacklist = [1] # Blacklist index 1 + + persistency = EventPersistency( + event_data_class=EventDataFrame, + variable_blacklist=blacklist, + ) + combined = persistency.get_all_variables(variables, named_variables) + + assert "timestamp" in combined + assert "level" in combined + assert "var_0" in combined # First variable + assert "var_1" not in combined # Blacklisted 
+ assert "var_2" in combined # Third variable + + def test_dict_like_access(self): + """Test dictionary-like access via __getitem__.""" + persistency = EventPersistency(event_data_class=EventDataFrame) + persistency.ingest_event(**SAMPLE_EVENT_1) + + data_structure = persistency["E001"] + assert data_structure is not None + assert isinstance(data_structure, EventDataFrame) + + +class TestEventDataFrame: + """Test suite for EventDataFrame (Pandas backend).""" + + def test_initialization(self): + """Test EventDataFrame initialization.""" + edf = EventDataFrame() + assert edf is not None + assert len(edf.data) == 0 # Empty DataFrame + + def test_add_single_data(self): + """Test adding single data entry.""" + edf = EventDataFrame() + data_dict = {"user": "alice", "ip": "192.168.1.1"} + data_df = edf.to_data(data_dict) + edf.add_data(data_df) + + assert edf.data is not None + assert len(edf.data) == 1 + assert "user" in edf.data.columns + + def test_add_multiple_data(self): + """Test adding multiple data entries.""" + edf = EventDataFrame() + edf.add_data(edf.to_data({"user": "alice", "ip": "192.168.1.1"})) + edf.add_data(edf.to_data({"user": "bob", "ip": "192.168.1.2"})) + + assert len(edf.data) == 2 + assert edf.data["user"].tolist() == ["alice", "bob"] + + def test_get_data(self): + """Test retrieving data.""" + edf = EventDataFrame() + edf.add_data(edf.to_data({"user": "alice", "ip": "192.168.1.1"})) + + data = edf.get_data() + assert isinstance(data, pd.DataFrame) + assert len(data) == 1 + + def test_get_variable_names(self): + """Test retrieving variable names.""" + edf = EventDataFrame() + edf.add_data(edf.to_data({"user": "alice", "ip": "192.168.1.1", "port": "22"})) + + var_names = edf.get_variables() + assert "user" in var_names + assert "ip" in var_names + assert "port" in var_names + + +class TestChunkedEventDataFrame: + """Test suite for ChunkedEventDataFrame (Polars backend).""" + + def test_initialization_default(self): + """Test ChunkedEventDataFrame 
initialization with defaults.""" + cedf = ChunkedEventDataFrame() + assert cedf is not None + assert cedf.max_rows == 10_000_000 + assert cedf.compact_every == 1000 + + def test_initialization_custom_params(self): + """Test initialization with custom parameters.""" + cedf = ChunkedEventDataFrame(max_rows=500, compact_every=100) + assert cedf.max_rows == 500 + assert cedf.compact_every == 100 + + def test_add_single_data(self): + """Test adding single data entry.""" + cedf = ChunkedEventDataFrame(max_rows=10) + data_dict = {"user": ["alice"], "ip": ["192.168.1.1"]} + data_df = cedf.to_data(data_dict) + cedf.add_data(data_df) + + data = cedf.get_data() + assert data is not None + assert len(data) == 1 + + def test_add_data_triggers_compaction(self): + """Test that adding data beyond compact_every triggers compaction.""" + cedf = ChunkedEventDataFrame(max_rows=10000, compact_every=5) + + # Add 6 entries (should trigger compaction at 5) + for i in range(6): + cedf.add_data(cedf.to_data({"user": [f"user{i}"], "value": [i]})) + + # After compaction, should have 1 chunk + data = cedf.get_data() + assert len(data) == 6 + + def test_chunked_storage(self): + """Test that data is stored in chunks.""" + cedf = ChunkedEventDataFrame(max_rows=5, compact_every=1000) + + # Add more than max_rows + for i in range(8): + cedf.add_data(cedf.to_data({"user": [f"user{i}"], "value": [i]})) + + # Should have evicted oldest to stay within max_rows + data = cedf.get_data() + assert data is not None + assert len(data) <= 5 + + def test_get_variable_names(self): + """Test retrieving variable names from chunks.""" + cedf = ChunkedEventDataFrame() + cedf.add_data( + cedf.to_data({"user": ["alice"], "ip": ["192.168.1.1"], "port": ["22"]}) + ) + + var_names = cedf.get_variables() + assert "user" in var_names + assert "ip" in var_names + assert "port" in var_names + + def test_dict_to_dataframe_conversion(self): + """Test to_data method.""" + cedf = ChunkedEventDataFrame() + data_dict = {"user": 
["alice"], "ip": ["192.168.1.1"]} + df = cedf.to_data(data_dict) + + assert isinstance(df, pl.DataFrame) + assert len(df) == 1 + assert "user" in df.columns + + +class TestEventPersistencyIntegration: + """Integration tests for EventPersistency with different backends.""" + + def test_pandas_backend_full_workflow(self): + """Test complete workflow with Pandas backend.""" + persistency = EventPersistency(event_data_class=EventDataFrame) + + # Ingest multiple events + for i in range(10): + persistency.ingest_event( + event_id=f"E{i % 3}", + event_template=f"Template {i % 3}", + variables=[str(i), str(i * 10)], + named_variables={}, + ) + + # Verify all events stored + all_data = persistency.get_events_data() + assert len(all_data) == 3 # 3 unique event IDs + + # Verify correct grouping + assert len(all_data["E0"].get_data()) == 4 # 0, 3, 6, 9 + assert len(all_data["E1"].get_data()) == 3 # 1, 4, 7 + assert len(all_data["E2"].get_data()) == 3 # 2, 5, 8 + + def test_polars_backend_full_workflow(self): + """Test complete workflow with Polars backend.""" + persistency = EventPersistency( + event_data_class=ChunkedEventDataFrame, + event_data_kwargs={"max_rows": 5, "compact_every": 10}, + ) + + # Ingest events + for i in range(10): + persistency.ingest_event( + event_id="E001", + event_template="Test template", + variables=[str(i)], + named_variables={}, + ) + + # Verify data retrieval works + data = persistency.get_event_data("E001") + assert data is not None + assert len(data) <= 5 # Should be trimmed to max_rows + + def test_tracker_backend_full_workflow(self): + """Test complete workflow with Tracker backend.""" + persistency = EventPersistency( + event_data_class=EventStabilityTracker, + ) + + # Ingest events with patterns + for i in range(20): + persistency.ingest_event( + event_id="E001", + event_template="Test template", + variables=["constant", str(i)], + named_variables={}, + ) + + # Verify tracker functionality + data_structure = persistency.events_data["E001"] 
+ assert isinstance(data_structure, EventTracker) + + def test_mixed_event_ids_and_templates(self): + """Test handling mixed event IDs and templates.""" + persistency = EventPersistency(event_data_class=EventDataFrame) + + events = [ + ("E001", "Login from <*>", ["192.168.1.1"]), + ("E002", "Error: <*>", ["timeout"]), + ("E001", "Login from <*>", ["192.168.1.2"]), + ("E003", "Logout <*>", ["alice"]), + ("E002", "Error: <*>", ["connection refused"]), + ] + + for event_id, template, variables in events: + persistency.ingest_event( + event_id=event_id, + event_template=template, + variables=variables, + named_variables={}, + ) + + # Verify correct storage + all_data = persistency.get_events_data() + assert len(all_data) == 3 + assert len(all_data["E001"].get_data()) == 2 + assert len(all_data["E002"].get_data()) == 2 + assert len(all_data["E003"].get_data()) == 1 + + # Verify templates + templates = persistency.get_event_templates() + assert templates["E001"] == "Login from <*>" + assert templates["E002"] == "Error: <*>" + assert templates["E003"] == "Logout <*>" + + def test_large_scale_ingestion(self): + """Test ingesting a large number of events.""" + persistency = EventPersistency(event_data_class=EventDataFrame) + + num_events = 1000 + for i in range(num_events): + persistency.ingest_event( + event_id=f"E{i % 10}", + event_template=f"Template {i % 10}", + variables=[str(i), str(i * 2)], + named_variables={"timestamp": f"2024-01-01 10:{i % 60}:00"}, + ) + + # Verify all data stored + all_data = persistency.get_events_data() + assert len(all_data) == 10 + + # Verify counts + total_rows = sum(len(data_structure.get_data()) for data_structure in all_data.values()) + assert total_rows == num_events + + def test_variable_blacklist_across_backends(self): + """Test variable blacklist works with different backends.""" + # Blacklist log format variables by name and event variables by index + log_blacklist = ["timestamp"] + event_blacklist = [1] # Second event variable + 
blacklist = log_blacklist + event_blacklist + + # Test with Pandas + p1 = EventPersistency( + event_data_class=EventDataFrame, + variable_blacklist=blacklist, + ) + p1.ingest_event( + event_id="E001", + event_template="Test", + variables=["alice", "1234"], + named_variables={"timestamp": "2024-01-01"}, + ) + data1 = p1.get_event_data("E001") + assert "var_0" in data1.columns # First variable + assert "var_1" not in data1.columns # Blacklisted + assert "timestamp" not in data1.columns # Blacklisted + + # Test with Polars + p2 = EventPersistency( + event_data_class=ChunkedEventDataFrame, + variable_blacklist=blacklist, + ) + p2.ingest_event( + event_id="E001", + event_template="Test", + variables=["bob", "5678"], + named_variables={"timestamp": "2024-01-02"}, + ) + data2 = p2.get_event_data("E001") + assert "var_0" in data2.columns # First variable + assert "var_1" not in data2.columns # Blacklisted + assert "timestamp" not in data2.columns # Blacklisted diff --git a/tests/test_utils/test_stability_tracking.py b/tests/test_utils/test_stability_tracking.py new file mode 100644 index 0000000..1b92a8d --- /dev/null +++ b/tests/test_utils/test_stability_tracking.py @@ -0,0 +1,554 @@ +"""Tests for the stability tracking module. + +This module tests StabilityClassifier, SingleVariableTracker, +MultiVariableTracker, and EventVariableTrackerData for variable +convergence and stability analysis. 
+""" + +from detectmatelibrary.utils.persistency.event_data_structures.trackers import ( + StabilityClassifier, + SingleStabilityTracker, + MultiStabilityTracker, + EventStabilityTracker, + Classification, +) +from detectmatelibrary.utils.RLE_list import RLEList + + +class TestStabilityClassifier: + """Test suite for StabilityClassifier.""" + + def test_initialization_default(self): + """Test StabilityClassifier initialization with defaults.""" + classifier = StabilityClassifier(segment_thresholds=[1.1, 0.5, 0.2, 0.1]) + assert classifier.segment_threshs == [1.1, 0.5, 0.2, 0.1] + assert classifier is not None + + def test_initialization_custom_threshold(self): + """Test initialization with custom segment thresholds.""" + classifier = StabilityClassifier(segment_thresholds=[0.8, 0.4, 0.2, 0.05]) + assert classifier.segment_threshs == [0.8, 0.4, 0.2, 0.05] + + def test_is_stable_with_rle_list_stable_pattern(self): + """Test stability detection with RLEList - stable pattern.""" + classifier = StabilityClassifier(segment_thresholds=[1.1, 0.3, 0.1, 0.01]) + + # Create RLEList: 10 True, 5 False, 15 True (stabilizing to True) + rle = RLEList() + for _ in range(10): + rle.append(True) + for _ in range(5): + rle.append(False) + for _ in range(15): + rle.append(True) + + result = classifier.is_stable(rle) + assert isinstance(result, bool) + + def test_is_stable_with_rle_list_unstable_pattern(self): + """Test stability detection with RLEList - unstable pattern.""" + classifier = StabilityClassifier(segment_thresholds=[1.1, 0.3, 0.1, 0.01]) + + # Create alternating pattern + rle = RLEList() + for _ in range(20): + rle.append(True) + rle.append(False) + + result = classifier.is_stable(rle) + assert isinstance(result, bool) + + def test_is_stable_with_regular_list(self): + """Test stability detection with regular list.""" + classifier = StabilityClassifier(segment_thresholds=[1.1, 0.3, 0.1, 0.01]) + + # Stable pattern + stable_list = [1] * 10 + [0] * 5 + [1] * 15 + result = 
classifier.is_stable(stable_list) + assert isinstance(result, bool) + + def test_different_segment_thresholds(self): + """Test behavior with different segment thresholds.""" + series = [1] * 10 + [0] * 3 + [1] * 15 + + strict = StabilityClassifier(segment_thresholds=[0.5, 0.2, 0.05, 0.01]) + lenient = StabilityClassifier(segment_thresholds=[2.0, 1.0, 0.5, 0.3]) + + result_strict = strict.is_stable(series) + result_lenient = lenient.is_stable(series) + + assert isinstance(result_strict, bool) + assert isinstance(result_lenient, bool) + + +class TestSingleVariableTracker: + """Test suite for SingleVariableTracker.""" + + def test_initialization_default(self): + """Test SingleVariableTracker initialization with defaults.""" + tracker = SingleStabilityTracker() + assert tracker.min_samples == 3 + assert isinstance(tracker.change_series, RLEList) + assert isinstance(tracker.unique_set, set) + + def test_initialization_custom_params(self): + """Test initialization with custom parameters.""" + tracker = SingleStabilityTracker(min_samples=20) + assert tracker.min_samples == 20 + + def test_add_value_single(self): + """Test adding a single value.""" + tracker = SingleStabilityTracker() + tracker.add_value("value1") + + assert len(tracker.unique_set) == 1 + assert len(tracker.change_series) == 1 + + def test_add_value_multiple_same(self): + """Test adding multiple same values.""" + tracker = SingleStabilityTracker() + + for i in range(10): + tracker.add_value("constant") + + assert len(tracker.unique_set) == 1 + assert "constant" in tracker.unique_set + + def test_add_value_multiple_different(self): + """Test adding multiple different values.""" + tracker = SingleStabilityTracker() + + for i in range(10): + tracker.add_value(f"value_{i}") + + assert len(tracker.unique_set) == 10 + + def test_classification_insufficient_data(self): + """Test classification with insufficient data.""" + tracker = SingleStabilityTracker(min_samples=30) + + for i in range(10): # Less than 
min_samples + tracker.add_value(f"value_{i}") + + result = tracker.classify() + assert result.type == "INSUFFICIENT_DATA" + + def test_classification_static(self): + """Test classification as STATIC (single unique value).""" + tracker = SingleStabilityTracker(min_samples=10) + + for i in range(40): + tracker.add_value("constant") + + result = tracker.classify() + assert result.type == "STATIC" + + def test_classification_random(self): + """Test classification as RANDOM (all unique values).""" + tracker = SingleStabilityTracker(min_samples=10) + + for i in range(40): + tracker.add_value(f"unique_{i}") + + result = tracker.classify() + assert result.type == "RANDOM" + + def test_classification_stable(self): + """Test classification as STABLE (converging pattern).""" + tracker = SingleStabilityTracker(min_samples=10) + + # Pattern: changing values that stabilize + for i in range(15): + tracker.add_value(f"value_{i % 5}") + for i in range(25): + tracker.add_value("stable_value") + + result = tracker.classify() + assert result.type in ["STABLE", "STATIC"] + + def test_classification_unstable(self): + """Test classification as UNSTABLE (no clear pattern).""" + tracker = SingleStabilityTracker(min_samples=10) + + # Alternating pattern + for i in range(40): + tracker.add_value("value_a" if i % 2 == 0 else "value_b") + + result = tracker.classify() + # Could be UNSTABLE or classified differently depending on classifier logic + assert result.type in ["UNSTABLE", "STABLE", "RANDOM"] + + def test_rle_list_integration(self): + """Test that RLEList is used for efficient storage.""" + tracker = SingleStabilityTracker() + + # Add values that create runs + for i in range(20): + tracker.add_value("a") + for i in range(20): + tracker.add_value("b") + + # RLEList should be populated + assert len(tracker.change_series) > 0 + + def test_unique_values_tracking(self): + """Test unique values are tracked in a set.""" + tracker = SingleStabilityTracker() + + values = ["a", "b", "c", "a", 
"b", "a"] + for v in values: + tracker.add_value(v) + + assert len(tracker.unique_set) == 3 + assert "a" in tracker.unique_set + assert "b" in tracker.unique_set + assert "c" in tracker.unique_set + + def test_change_detection(self): + """Test that add_value correctly detects changes.""" + tracker = SingleStabilityTracker() + + tracker.add_value("a") + tracker.add_value("a") + tracker.add_value("b") + + # First value creates a change (new unique value) + # Second doesn't (same value) + # Third does (new unique value) + assert len(tracker.change_series) == 3 + + +class TestMultiVariableTracker: + """Test suite for MultiStabilityTracker manager.""" + + def test_initialization_default(self): + """Test MultiStabilityTracker initialization.""" + trackers = MultiStabilityTracker(single_tracker_type=SingleStabilityTracker) + assert trackers is not None + assert trackers.single_tracker_type == SingleStabilityTracker + + def test_initialization_with_kwargs(self): + """Test initialization without kwargs - MultiStabilityTracker doesn't store tracker kwargs.""" + trackers = MultiStabilityTracker(single_tracker_type=SingleStabilityTracker) + assert trackers.single_tracker_type == SingleStabilityTracker + + def test_add_data_single_variable(self): + """Test adding data for a single variable.""" + trackers = MultiStabilityTracker(single_tracker_type=SingleStabilityTracker) + data = {"var1": "value1"} + trackers.add_data(data) + + all_trackers = trackers.get_trackers() + assert "var1" in all_trackers + assert isinstance(all_trackers["var1"], SingleStabilityTracker) + + def test_add_data_multiple_variables(self): + """Test adding data for multiple variables.""" + trackers = MultiStabilityTracker(single_tracker_type=SingleStabilityTracker) + data = {"var1": "value1", "var2": "value2", "var3": "value3"} + trackers.add_data(data) + + all_trackers = trackers.get_trackers() + assert len(all_trackers) == 3 + assert "var1" in all_trackers + assert "var2" in all_trackers + assert "var3" in 
all_trackers + + def test_add_data_multiple_times(self): + """Test adding data multiple times.""" + trackers = MultiStabilityTracker(single_tracker_type=SingleStabilityTracker) + + trackers.add_data({"var1": "a", "var2": "x"}) + trackers.add_data({"var1": "b", "var2": "y"}) + trackers.add_data({"var1": "a", "var3": "z"}) + + all_trackers = trackers.get_trackers() + assert len(all_trackers) == 3 + assert "var3" in all_trackers # New variable dynamically added + + def test_classify_all_variables(self): + """Test classifying all variables.""" + trackers = MultiStabilityTracker(single_tracker_type=SingleStabilityTracker) + + # Add enough data for classification + for i in range(10): + trackers.add_data({"var1": "constant", "var2": f"changing_{i}"}) + + classifications = trackers.classify() + assert "var1" in classifications + assert "var2" in classifications + assert isinstance(classifications["var1"], Classification) + + def test_get_stable_variables(self): + """Test retrieving stable variables.""" + trackers = MultiStabilityTracker(single_tracker_type=SingleStabilityTracker) + + # Create stable pattern + for i in range(40): + trackers.add_data({ + "stable_var": "constant", + "random_var": f"unique_{i}", + }) + + stable_vars = trackers.get_variables_by_classification("STABLE") + assert isinstance(stable_vars, list) + # "stable_var" should be classified as STATIC + + def test_get_trackers(self): + """Test retrieving all trackers.""" + trackers = MultiStabilityTracker(single_tracker_type=SingleStabilityTracker) + trackers.add_data({"var1": "a", "var2": "b"}) + + all_trackers = trackers.get_trackers() + assert isinstance(all_trackers, dict) + assert len(all_trackers) == 2 + + def test_dynamic_tracker_creation(self): + """Test that trackers are created dynamically.""" + trackers = MultiStabilityTracker(single_tracker_type=SingleStabilityTracker) + + # First add + trackers.add_data({"var1": "a"}) + assert len(trackers.get_trackers()) == 1 + + # Second add with new variable 
+ trackers.add_data({"var1": "b", "var2": "x"}) + assert len(trackers.get_trackers()) == 2 + + # Third add with another new variable + trackers.add_data({"var3": "z"}) + assert len(trackers.get_trackers()) == 3 + + +class TestEventVariableTrackerData: + """Test suite for EventStabilityTracker.""" + + def test_initialization(self): + """Test EventStabilityTracker initialization.""" + evt = EventStabilityTracker() + assert evt is not None + assert isinstance(evt.multi_tracker, MultiStabilityTracker) + + def test_add_data(self): + """Test adding data.""" + evt = EventStabilityTracker() + data = {"var1": "value1", "var2": "value2"} + evt.add_data(data) + + # Should have created trackers + trackers = evt.get_data() + assert len(trackers) == 2 + + def test_get_variables(self): + """Test retrieving variable names.""" + evt = EventStabilityTracker() + evt.add_data({"var1": "a", "var2": "b", "var3": "c"}) + + var_names = evt.get_variables() + assert "var1" in var_names + assert "var2" in var_names + assert "var3" in var_names + assert len(var_names) == 3 + + def test_get_stable_variables(self): + """Test retrieving stable variables.""" + evt = EventStabilityTracker() + + for i in range(40): + evt.add_data({ + "stable_var": "constant", + "random_var": f"unique_{i}", + }) + + stable_vars = evt.get_variables_by_classification("STABLE") + assert isinstance(stable_vars, list) + + def test_integration_with_stability_tracker(self): + """Test full integration with SingleStabilityTracker.""" + evt = EventStabilityTracker() + + # Simulate log processing + for i in range(50): + evt.add_data({ + "user": f"user_{i % 5}", + "status": "success" if i > 30 else f"status_{i}", + "request_id": f"req_{i}", + }) + + # Get data and variables + var_names = evt.get_variables() + stable_vars = evt.get_variables_by_classification("STABLE") + + assert len(var_names) == 3 + assert isinstance(stable_vars, list) + + def test_to_data_default_converter(self): + """Test to_data method with default 
converter (identity function).""" + evt = EventStabilityTracker() + + raw_data = {"var1": "value1", "var2": "value2"} + converted_data = evt.to_data(raw_data) + + # Default converter returns the dict as-is + assert converted_data == raw_data + assert "var1" in converted_data + assert "var2" in converted_data + + def test_to_data_with_custom_converter(self): + """Test to_data method with custom converter function.""" + # Custom converter that uppercases string values + def uppercase_converter(data: dict) -> dict: + return {k: v.upper() if isinstance(v, str) else v for k, v in data.items()} + + evt = EventStabilityTracker(converter_function=uppercase_converter) + + raw_data = {"var1": "value1", "var2": "value2"} + converted_data = evt.to_data(raw_data) + + assert converted_data["var1"] == "VALUE1" + assert converted_data["var2"] == "VALUE2" + + def test_to_data_integration_with_add_data(self): + """Test that to_data and add_data work together correctly.""" + evt = EventStabilityTracker() + + # Use to_data to convert raw data, then add it + raw_data = {"user": "alice", "ip": "192.168.1.1"} + converted_data = evt.to_data(raw_data) + evt.add_data(converted_data) + + # Verify data was added correctly + trackers = evt.get_data() + assert len(trackers) == 2 + assert "user" in trackers + assert "ip" in trackers + + def test_converter_function_is_callable(self): + """Test that converter_function is correctly set.""" + evt = EventStabilityTracker() + # Default converter should be identity function + assert callable(evt.converter_function) + assert evt.converter_function({"a": 1}) == {"a": 1} + + +class TestClassification: + """Test suite for Classification dataclass.""" + + def test_initialization(self): + """Test Classification initialization.""" + result = Classification(type="STABLE", reason="Test reason") + assert result.type == "STABLE" + assert result.reason == "Test reason" + + def test_all_classification_types(self): + """Test Classification with all classification 
types.""" + types = ["INSUFFICIENT_DATA", "STATIC", "RANDOM", "STABLE", "UNSTABLE"] + + for cls_type in types: + result = Classification(type=cls_type, reason=f"Reason for {cls_type}") + assert result.type == cls_type + assert isinstance(result.reason, str) + + +class TestStabilityTrackingIntegration: + """Integration tests for stability tracking components.""" + + def test_full_workflow_static_variable(self): + """Test full workflow with static variable.""" + tracker = SingleStabilityTracker(min_samples=10) + + # Add 50 identical values + for i in range(50): + tracker.add_value("constant_value") + + classification = tracker.classify() + assert classification.type == "STATIC" + + def test_full_workflow_random_variable(self): + """Test full workflow with random variable.""" + tracker = SingleStabilityTracker(min_samples=10) + + # Add 50 unique values + for i in range(50): + tracker.add_value(f"unique_value_{i}") + + classification = tracker.classify() + assert classification.type == "RANDOM" + + def test_full_workflow_stabilizing_variable(self): + """Test full workflow with stabilizing variable.""" + tracker = SingleStabilityTracker(min_samples=10) + + # Start with varied values, then stabilize + for i in range(15): + tracker.add_value(f"value_{i % 7}") + for i in range(35): + tracker.add_value("final_stable_value") + + classification = tracker.classify() + # Should be STABLE or STATIC + assert classification.type in ["STABLE", "STATIC"] + + def test_multiple_variables_with_different_patterns(self): + """Test tracking multiple variables with different patterns.""" + trackers = MultiStabilityTracker(single_tracker_type=SingleStabilityTracker) + + # Simulate 100 events + for i in range(100): + trackers.add_data({ + "static_var": "always_same", + "random_var": f"unique_{i}", + "user_id": f"user_{i % 10}", # 10 users cycling + "status": "ok" if i > 80 else f"status_{i % 5}", # Stabilizing + }) + + classifications = trackers.classify() + + # static_var should be STATIC + 
assert classifications["static_var"].type == "STATIC" + + # random_var should be RANDOM + assert classifications["random_var"].type == "RANDOM" + + # user_id and status depend on classifier logic + assert isinstance(classifications["user_id"], Classification) + assert isinstance(classifications["status"], Classification) + + def test_event_variable_tracker_real_world_scenario(self): + """Test EventStabilityTracker with realistic log data.""" + evt = EventStabilityTracker() + + # Simulate web server logs + for i in range(200): + evt.add_data({ + "method": "GET" if i % 10 != 0 else "POST", # Mostly GET + "status_code": "200" if i > 150 else str(200 + (i % 5)), # Stabilizing to 200 + "user_agent": f"Browser_{i % 3}", # 3 different browsers + "request_id": f"req_{i}", # Unique per request + "server": "prod-server-1", # Static + }) + + var_names = evt.get_variables() + stable_vars = evt.get_variables_by_classification("STABLE") + + assert len(var_names) == 5 + assert isinstance(stable_vars, list) + + # Server should definitely be stable + # Request_id should be random + # Others depend on exact classifier logic + + def test_classifier_with_varying_thresholds(self): + """Test stability classifier with different thresholds.""" + # Create a borderline case + pattern = [1] * 15 + [0] * 5 + [1] * 20 + + strict = StabilityClassifier(segment_thresholds=[0.5, 0.2, 0.08, 0.02]) + lenient = StabilityClassifier(segment_thresholds=[2.0, 1.5, 1.0, 0.5]) + + result_strict = strict.is_stable(pattern) + result_lenient = lenient.is_stable(pattern) + + # Both should produce boolean results + assert isinstance(result_strict, bool) + assert isinstance(result_lenient, bool) diff --git a/uv.lock b/uv.lock index caf0ecc..1fe4445 100644 --- a/uv.lock +++ b/uv.lock @@ -110,7 +110,7 @@ source = { editable = "." 
} dependencies = [ { name = "drain3" }, { name = "kafka-python" }, - { name = "pandas" }, + { name = "numpy" }, { name = "protobuf" }, { name = "pydantic" }, { name = "pyyaml" }, @@ -118,6 +118,18 @@ dependencies = [ ] [package.optional-dependencies] +all = [ + { name = "pandas" }, + { name = "polars" }, +] +pandas = [ + { name = "pandas" }, +] +polars = [ + { name = "polars" }, +] + +[package.dev-dependencies] dev = [ { name = "prek" }, { name = "pytest" }, @@ -128,16 +140,24 @@ dev = [ requires-dist = [ { name = "drain3", specifier = ">=0.9.11" }, { name = "kafka-python", specifier = ">=2.3.0" }, - { name = "pandas", specifier = ">=2.3.2" }, - { name = "prek", marker = "extra == 'dev'", specifier = ">=0.2.8" }, + { name = "numpy", specifier = ">=2.3.2" }, + { name = "pandas", marker = "extra == 'all'", specifier = ">=2.3.2" }, + { name = "pandas", marker = "extra == 'pandas'", specifier = ">=2.0" }, + { name = "polars", marker = "extra == 'all'", specifier = ">=1.38.1" }, + { name = "polars", marker = "extra == 'polars'", specifier = ">=1.38.1" }, { name = "protobuf", specifier = ">=6.32.1" }, { name = "pydantic", specifier = ">=2.11.7" }, - { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.4.2" }, - { name = "pytest-cov", marker = "extra == 'dev'", specifier = ">=6.2.1" }, { name = "pyyaml", specifier = ">=6.0.3" }, { name = "regex", specifier = ">=2025.11.3" }, ] -provides-extras = ["dev"] +provides-extras = ["pandas", "polars", "all"] + +[package.metadata.requires-dev] +dev = [ + { name = "prek", specifier = ">=0.2.8" }, + { name = "pytest", specifier = ">=8.4.2" }, + { name = "pytest-cov", specifier = ">=6.2.1" }, +] [[package]] name = "drain3" @@ -291,6 +311,34 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = 
"2025-05-15T12:30:06.134Z" }, ] +[[package]] +name = "polars" +version = "1.38.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "polars-runtime-32" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c6/5e/208a24471a433bcd0e9a6889ac49025fd4daad2815c8220c5bd2576e5f1b/polars-1.38.1.tar.gz", hash = "sha256:803a2be5344ef880ad625addfb8f641995cfd777413b08a10de0897345778239", size = 717667, upload-time = "2026-02-06T18:13:23.013Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/0a/49/737c1a6273c585719858261753da0b688454d1b634438ccba8a9c4eb5aab/polars-1.38.1-py3-none-any.whl", hash = "sha256:a29479c48fed4984d88b656486d221f638cba45d3e961631a50ee5fdde38cb2c", size = 810368, upload-time = "2026-02-06T18:11:55.819Z" }, +] + +[[package]] +name = "polars-runtime-32" +version = "1.38.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/07/4b/04d6b3fb7cf336fbe12fbc4b43f36d1783e11bb0f2b1e3980ec44878df06/polars_runtime_32-1.38.1.tar.gz", hash = "sha256:04f20ed1f5c58771f34296a27029dc755a9e4b1390caeaef8f317e06fdfce2ec", size = 2812631, upload-time = "2026-02-06T18:13:25.206Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ae/a2/a00defbddadd8cf1042f52380dcba6b6592b03bac8e3b34c436b62d12d3b/polars_runtime_32-1.38.1-cp310-abi3-macosx_10_12_x86_64.whl", hash = "sha256:18154e96044724a0ac38ce155cf63aa03c02dd70500efbbf1a61b08cadd269ef", size = 44108001, upload-time = "2026-02-06T18:11:58.127Z" }, + { url = "https://files.pythonhosted.org/packages/a7/fb/599ff3709e6a303024efd7edfd08cf8de55c6ac39527d8f41cbc4399385f/polars_runtime_32-1.38.1-cp310-abi3-macosx_11_0_arm64.whl", hash = "sha256:c49acac34cc4049ed188f1eb67d6ff3971a39b4af7f7b734b367119970f313ac", size = 40230140, upload-time = "2026-02-06T18:12:01.181Z" }, + { url = 
"https://files.pythonhosted.org/packages/dc/8c/3ac18d6f89dc05fe2c7c0ee1dc5b81f77a5c85ad59898232c2500fe2ebbf/polars_runtime_32-1.38.1-cp310-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fef2ef2626a954e010e006cc8e4de467ecf32d08008f130cea1c78911f545323", size = 41994039, upload-time = "2026-02-06T18:12:04.332Z" }, + { url = "https://files.pythonhosted.org/packages/f2/5a/61d60ec5cc0ab37cbd5a699edb2f9af2875b7fdfdfb2a4608ca3cc5f0448/polars_runtime_32-1.38.1-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e8a5f7a8125e2d50e2e060296551c929aec09be23a9edcb2b12ca923f555a5ba", size = 45755804, upload-time = "2026-02-06T18:12:07.846Z" }, + { url = "https://files.pythonhosted.org/packages/91/54/02cd4074c98c361ccd3fec3bcb0bd68dbc639c0550c42a4436b0ff0f3ccf/polars_runtime_32-1.38.1-cp310-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:10d19cd9863e129273b18b7fcaab625b5c8143c2d22b3e549067b78efa32e4fa", size = 42159605, upload-time = "2026-02-06T18:12:10.919Z" }, + { url = "https://files.pythonhosted.org/packages/8e/f3/b2a5e720cc56eaa38b4518e63aa577b4bbd60e8b05a00fe43ca051be5879/polars_runtime_32-1.38.1-cp310-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:61e8d73c614b46a00d2f853625a7569a2e4a0999333e876354ac81d1bf1bb5e2", size = 45336615, upload-time = "2026-02-06T18:12:14.074Z" }, + { url = "https://files.pythonhosted.org/packages/f1/8d/ee2e4b7de948090cfb3df37d401c521233daf97bfc54ddec5d61d1d31618/polars_runtime_32-1.38.1-cp310-abi3-win_amd64.whl", hash = "sha256:08c2b3b93509c1141ac97891294ff5c5b0c548a373f583eaaea873a4bf506437", size = 45680732, upload-time = "2026-02-06T18:12:19.097Z" }, + { url = "https://files.pythonhosted.org/packages/bf/18/72c216f4ab0c82b907009668f79183ae029116ff0dd245d56ef58aac48e7/polars_runtime_32-1.38.1-cp310-abi3-win_arm64.whl", hash = "sha256:6d07d0cc832bfe4fb54b6e04218c2c27afcfa6b9498f9f6bbf262a00d58cc7c4", size = 41639413, upload-time = "2026-02-06T18:12:22.044Z" }, +] + [[package]] name = "prek" version = 
"0.2.10"