Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"drain3>=0.9.11",
"pandas>=2.3.2",
"protobuf>=6.32.1",
"pydantic>=2.11.7",
"pyyaml>=6.0.3",
"regex>=2025.11.3",
"kafka-python>=2.3.0",
"numpy>=2.3.2",
]

[dependency-groups]
Expand All @@ -21,6 +21,8 @@ dev = [
"prek>=0.2.8",
"pytest>=8.4.2",
"pytest-cov>=6.2.1",
"pandas>=2.3.2",
"polars>=1.38.1",
]

[build-system]
Expand Down
63 changes: 63 additions & 0 deletions src/detectmatelibrary/utils/RLE_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Incremental RLE implementation.
# https://en.wikipedia.org/wiki/Run-length_encoding

from typing import Generic, Iterable, Iterator, List, Tuple, TypeVar

from .preview_helpers import list_preview_str

T = TypeVar("T")


class RLEList(Generic[T]):
"""List-like container storing data in run-length encoded form."""

def __init__(self, data: Iterable[T] | None = None):
self._runs: List[Tuple[T, int]] = []
self._len: int = 0
if data is not None:
for x in data:
self.append(x)

def append(self, x: T) -> None:
if self._runs and self._runs[-1][0] == x:
v, c = self._runs[-1]
self._runs[-1] = (v, c + 1)
else:
self._runs.append((x, 1))
self._len += 1

def extend(self, xs: Iterable[T]) -> None:
for x in xs:
self.append(x)

def __len__(self) -> int:
return self._len

def __iter__(self) -> Iterator[T]:
for v, c in self._runs:
for _ in range(c):
yield v

def runs(self) -> List[Tuple[T, int]]:
"""Return the internal RLE representation."""
return list(self._runs)

def __repr__(self) -> str:
# convert bool to int
runs_str = list_preview_str(self._runs)
return f"RLEList(len={self._len}, runs={runs_str})"


# example usage

# r = RLEList[str]()

# r.append("A")
# r.append("A")
# r.append("B")
# r.extend(["B", "B", "C"])

# print(len(r)) # 6
# print(list(r)) # ['A', 'A', 'B', 'B', 'B', 'C']
# print(r.runs()) # [('A', 2), ('B', 3), ('C', 1)]
# print(r) # RLEList(len=6, runs=[('A', 2), ('B', 3), ('C', 1)])
1 change: 0 additions & 1 deletion src/detectmatelibrary/utils/persistency.py

This file was deleted.

5 changes: 5 additions & 0 deletions src/detectmatelibrary/utils/persistency/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .event_persistency import EventPersistency

__all__ = [
"EventPersistency"
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from abc import ABC, abstractmethod
from typing import Any, List
from dataclasses import dataclass


@dataclass
class EventDataStructure(ABC):
"""Storage backend interface for event-based data analysis."""

event_id: int = -1
template: str = ""

@abstractmethod
def add_data(self, data_object: Any) -> None: ...

@abstractmethod
def get_data(self) -> Any: ...

@abstractmethod
def get_variables(self) -> List[str]: ...

@abstractmethod
def to_data(self, raw_data: Any) -> Any:
"""Convert raw data into the appropriate data format for storage."""
pass

def get_template(self) -> str:
return self.template

def get_event_id(self) -> int:
return self.event_id
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .event_dataframe import EventDataFrame
from .chunked_event_dataframe import ChunkedEventDataFrame

__all__ = ["EventDataFrame", "ChunkedEventDataFrame"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, field

import polars as pl

from ..base import EventDataStructure


# -------- Polars backends --------
@dataclass
class ChunkedEventDataFrame(EventDataStructure):
"""
Streaming-friendly Polars DataFrame backend:
- Ingest appends chunks (cheap)
- Retention by max_rows is handled internally
- DataFrame is materialized on demand
"""
max_rows: Optional[int] = 10_000_000
compact_every: int = 1000

chunks: list[pl.DataFrame] = field(default_factory=list)
_rows: int = 0

def add_data(self, data: pl.DataFrame) -> None:
if data.height == 0:
return
self.chunks.append(data)
self._rows += data.height

if self.max_rows is not None:
self._evict_oldest()

if len(self.chunks) > self.compact_every:
self._compact()

def _evict_oldest(self) -> None:
if self.max_rows is not None:
overflow = self._rows - self.max_rows
if overflow <= 0:
return

# drop whole chunks
while self.chunks and overflow >= self.chunks[0].height:
oldest = self.chunks.pop(0)
overflow -= oldest.height
self._rows -= oldest.height

# trim remaining overflow from the oldest chunk
if overflow > 0 and self.chunks:
oldest = self.chunks[0]
keep = oldest.height - overflow
self.chunks[0] = oldest.tail(keep)
self._rows -= overflow

def _compact(self) -> None:
if not self.chunks:
return
df = pl.concat(self.chunks, how="vertical", rechunk=False)
self.chunks = [df]
self._rows = df.height

def get_data(self) -> pl.DataFrame:
if not self.chunks:
return pl.DataFrame()
if len(self.chunks) == 1:
return self.chunks[0]
return pl.concat(self.chunks, how="vertical", rechunk=False)

def get_variables(self) -> Any:
if not self.chunks:
return []
return self.chunks[0].columns

def to_data(self, raw_data: Dict[str, List[Any]]) -> pl.DataFrame:
return pl.DataFrame(raw_data)

def __repr__(self) -> str:
return (
f"ChunkedEventDataFrame(df=..., rows={self._rows}, chunks={len(self.chunks)}, "
f"variables={self.get_variables()})"
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from typing import Any, Dict, List

from dataclasses import dataclass, field

import pandas as pd

from ..base import EventDataStructure


# -------- Pandas backend --------

@dataclass
class EventDataFrame(EventDataStructure):
"""
Pandas DataFrame backend:
- Ingest appends data (expensive)
- Retention is not handled (can be extended)
- DataFrame is always materialized
"""
data: pd.DataFrame = field(default_factory=pd.DataFrame)

def add_data(self, data: pd.DataFrame) -> None:
if len(self.data) > 0:
self.data = pd.concat([self.data, data], ignore_index=True)
else:
self.data = data

def get_data(self) -> pd.DataFrame:
return self.data

def get_variables(self) -> List[str]:
return list(self.data.columns)

def to_data(self, raw_data: Dict[int | str, Any]) -> pd.DataFrame:
data = {key: [value] for key, value in raw_data.items()}
return pd.DataFrame(data)

def __repr__(self) -> str:
return f"EventDataFrame(df=..., rows={len(self.data)}, variables={self.get_variables()})"
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Trackers module for tracking variable behaviors over time/events.

A tracker is a data structure that stores a specific feature of a
variable and tracks the behavior of that feature over time/events. It
operates within the persistency framework to monitor how variables
evolve. It is interchangable with other EventDataStructure
implementations.
"""

from .stability import (
StabilityClassifier,
SingleStabilityTracker,
MultiStabilityTracker,
EventStabilityTracker
)
from .base import (
EventTracker,
MultiTracker,
SingleTracker,
Classification,
)

__all__ = [
"EventTracker",
"SingleTracker",
"MultiTracker",
"Classification",
"StabilityClassifier",
"SingleStabilityTracker",
"MultiStabilityTracker",
"EventStabilityTracker",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from .single_tracker import SingleTracker, Classification
from .multi_tracker import MultiTracker
from .event_tracker import EventTracker

__all__ = [
"EventTracker",
"MultiTracker",
"SingleTracker",
"Classification",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Event data structure that tracks variable behaviors over time/events."""

from typing import Any, Callable, Dict, Type

from detectmatelibrary.utils.preview_helpers import format_dict_repr

from .multi_tracker import MultiTracker
from .single_tracker import SingleTracker
from ...base import EventDataStructure


class EventTracker(EventDataStructure):
"""Event data structure that tracks the behavior of each event over time /
number of events."""

def __init__(
self,
single_tracker_type: Type[SingleTracker] = SingleTracker,
multi_tracker_type: Type[MultiTracker] = MultiTracker,
converter_function: Callable[[Any], Any] = lambda x: x,
) -> None:
self.single_tracker_type = single_tracker_type
self.multi_tracker_type = multi_tracker_type
self.converter_function = converter_function
self.multi_tracker = self.multi_tracker_type(single_tracker_type=self.single_tracker_type)

def add_data(self, data_object: Any) -> None:
"""Add data to the variable trackers."""
self.multi_tracker.add_data(data_object)

def get_data(self) -> Dict[str, SingleTracker]:
"""Retrieve the tracker's stored data."""
return self.multi_tracker.get_trackers()

def get_variables(self) -> list[str]:
"""Get the list of tracked variable names."""
return list(self.multi_tracker.get_trackers().keys())

def to_data(self, raw_data: Dict[str, Any]) -> Any:
"""Transform raw data into the format expected by the tracker."""
return self.converter_function(raw_data)

def __repr__(self) -> str:
strs = format_dict_repr(self.multi_tracker.get_trackers(), indent="\t")
return f"{self.__class__.__name__}(data={{\n\t{strs}\n}})"
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from typing import Any, Dict, Type

from detectmatelibrary.utils.preview_helpers import format_dict_repr

from .single_tracker import SingleTracker, Classification


class MultiTracker:
"""Tracks multiple features (e.g. variables or variable combos) using
individual trackers."""

def __init__(self, single_tracker_type: Type[SingleTracker] = SingleTracker) -> None:
self.single_trackers: Dict[str, SingleTracker] = {}
self.single_tracker_type: Type[SingleTracker] = single_tracker_type

def add_data(self, data_object: Dict[str, Any]) -> None:
"""Add data to the appropriate feature trackers."""
for name, value in data_object.items():
if name not in self.single_trackers:
self.single_trackers[name] = self.single_tracker_type()
self.single_trackers[name].add_value(value)

def get_trackers(self) -> Dict[str, SingleTracker]:
"""Get the current feature trackers."""
return self.single_trackers

def classify(self) -> Dict[str, Classification]:
"""Classify all tracked features."""
classifications = {}
for name, tracker in self.single_trackers.items():
classifications[name] = tracker.classify()
return classifications

def __repr__(self) -> str:
strs = format_dict_repr(self.single_trackers, indent="\t")
return f"{self.__class__.__name__}{{\n\t{strs}\n}}\n"
Loading
Loading