Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ protoc --version

This dependency is only needed if a proto file is modified. To compile the proto file do:
```bash
protoc --proto_path=src/schemas/ --python_out=src/schemas/ src/schemas/schemas.proto
protoc --proto_path=src/detectmatelibrary/schemas/ --python_out=src/detectmatelibrary/schemas/ src/detectmatelibrary/schemas/schemas.proto
```

### Step 3: Run unit tests
Expand Down
15 changes: 13 additions & 2 deletions src/detectmatelibrary/common/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@

from detectmatelibrary.schemas import BaseSchema

from tools.logging import logger, setup_logging

from typing import Any, Dict, Tuple, List


setup_logging()


class SchemaPipeline:
@staticmethod
def preprocess(
Expand Down Expand Up @@ -76,18 +81,24 @@ def train(

def process(self, data: BaseSchema | bytes) -> BaseSchema | bytes | None:
is_byte, data = SchemaPipeline.preprocess(self.input_schema(), data)
logger.info(f"<<{self.name}>> received:\n{data}")

if (data_buffered := self.data_buffer.add(data)) is None: # type: ignore
return None

if do_training(config=self.config, index=self.data_used_train):
self.data_used_train += 1
logger.info(f"<<{self.name}>> use data for training")
self.train(input_=data_buffered)

output_ = self.output_schema()
anomaly_detected = self.run(input_=data_buffered, output_=output_)
if not anomaly_detected:
logger.info(f"<<{self.name}>> processing data")
return_schema = self.run(input_=data_buffered, output_=output_)
if not return_schema:
logger.info(f"<<{self.name}>> returns None")
return None

logger.info(f"<<{self.name}>> processed:\n{output_}")
return SchemaPipeline.postprocess(output_, is_byte=is_byte)

def get_config(self) -> Dict[str, Any]:
Expand Down
81 changes: 81 additions & 0 deletions src/detectmatelibrary/common/output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from detectmatelibrary.common.core import CoreComponent, CoreConfig

from detectmatelibrary.utils.data_buffer import ArgsBuffer, BufferMode
from detectmatelibrary.utils.aux import get_timestamp

from detectmatelibrary.schemas import DetectorSchema, OutputSchema, FieldNotFound

from typing_extensions import override
from typing import List, Optional, Any


class CoreOutputConfig(CoreConfig):
    """Configuration for output components."""

    # Category label forwarded to CoreComponent as `type_` (see CoreOutput.__init__).
    comp_type: str = "outputs"
    # Identifier of the concrete output method; subclasses override it.
    method_type: str = "core_output"

    # NOTE(review): presumably disables automatic configuration — confirm against CoreConfig.
    auto_config: bool = False


class DetectorFieldNotFound(Exception):
    """Raised by get_field when the requested field is missing from a DetectorSchema."""
    pass


def get_field(input_: List[DetectorSchema] | DetectorSchema, field: str) -> List[Any]:
    """Collect *field* from one detector schema or a list of them.

    Repeated (list) fields are flattened into a single list; scalar fields
    yield one entry per schema. An empty input list yields an empty list.

    Args:
        input_: A single DetectorSchema or a list of them.
        field: Name of the schema field to collect.

    Returns:
        A flat list of the collected values.

    Raises:
        DetectorFieldNotFound: If *field* does not exist on the schema.
    """
    try:
        if isinstance(input_, list):
            if not input_:  # avoid IndexError on input_[0] for an empty batch
                return []
            if input_[0].is_field_list(field):
                return [item for detector in input_ for item in detector[field]]
            return [detector[field] for detector in input_]
        return [input_[field]]
    except FieldNotFound as err:
        # Chain the original error and carry the field name for diagnostics.
        raise DetectorFieldNotFound(field) from err


class CoreOutput(CoreComponent):
    """Base output component: aggregates detector results into an OutputSchema.

    Subclasses customize behavior by overriding :meth:`do_output`.
    """

    def __init__(
        self,
        name: str = "CoreOutput",
        buffer_mode: BufferMode = BufferMode.NO_BUF,
        buffer_size: Optional[int] = None,
        config: Optional[CoreOutputConfig | dict[str, Any]] = None,
    ) -> None:
        """Create the output component.

        Args:
            name: Component name used in logs and config lookups.
            buffer_mode: Buffering strategy for incoming data.
            buffer_size: Buffer capacity (None for the mode's default).
            config: A CoreOutputConfig, a plain dict, or None for defaults.
        """
        # Build the default config lazily: a default-argument instance would be
        # created once at def time and shared by every component instance.
        if config is None:
            config = CoreOutputConfig()
        elif isinstance(config, dict):
            config = CoreOutputConfig.from_dict(config, name)

        super().__init__(
            name=name,
            type_=config.comp_type,  # type: ignore
            config=config,  # type: ignore
            args_buffer=ArgsBuffer(mode=buffer_mode, size=buffer_size),
            input_schema=DetectorSchema,
            output_schema=OutputSchema,
        )

    @override
    def run(
        self, input_: List[DetectorSchema] | DetectorSchema, output_: OutputSchema  # type: ignore
    ) -> bool:
        """Copy the common detector fields into *output_* and delegate to do_output.

        Returns:
            True when the output should be emitted, False otherwise.
        """
        output_["detectorIDs"] = get_field(input_, "detectorID")
        output_["detectorTypes"] = get_field(input_, "detectorType")
        output_["alertIDs"] = get_field(input_, "alertID")
        output_["logIDs"] = get_field(input_, "logIDs")
        output_["extractedTimestamps"] = get_field(input_, "extractedTimestamps")

        do_output = self.do_output(input_, output_)
        # Stamp after do_output so the timestamp reflects the actual emit time.
        output_["outputTimestamp"] = get_timestamp()

        # None from do_output means "no opinion" — treat it as success.
        return do_output if do_output is not None else True

    def do_output(
        self,
        input_: List[DetectorSchema] | DetectorSchema,
        output_: OutputSchema,
    ) -> bool | None:
        """Hook for subclasses to enrich/emit *output_*.

        Return True/False to force the run() result, or None for the default.
        """
        return True

    @override
    def train(
        self, input_: DetectorSchema | list[DetectorSchema]  # type: ignore
    ) -> None:
        """Output components are stateless; training is a no-op."""
        pass
4 changes: 4 additions & 0 deletions src/detectmatelibrary/common/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from detectmatelibrary import schemas

from tools.logging import logger

from typing import Optional, Any


Expand Down Expand Up @@ -44,8 +46,10 @@ def __init_logs(self) -> schemas.LogSchema:
def process(self, as_bytes: bool = True) -> schemas.LogSchema | bytes | None:  # type: ignore
    """Read one log entry and return it (serialized when *as_bytes* is True).

    Returns:
        The log schema (or its byte serialization), or None when the reader
        produced no new log.
    """
    is_new_log = self.read(log := self.__init_logs())
    if not is_new_log:
        logger.info(f"<<{self.name}>> returns None")
        return None

    logger.info(f"<<{self.name}>> read:\n{log}")
    # is_new_log is known truthy here, so the former trailing
    # `... if is_new_log else None` conditional was dead code.
    return SchemaPipeline.postprocess(log, is_byte=as_bytes)  # type: ignore

def read(self, output_: schemas.LogSchema) -> bool:
Expand Down
46 changes: 46 additions & 0 deletions src/detectmatelibrary/outputs/json_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@

from detectmatelibrary.common.output import CoreOutput, CoreOutputConfig
from detectmatelibrary.schemas import DetectorSchema, OutputSchema
from detectmatelibrary.utils.data_buffer import BufferMode

from typing import cast, Any
import json
import os


def save_json_file(path_folder: str, id_: str, data: dict[str, Any]) -> None:
    """Serialize *data* to ``<path_folder>/<id_>.json``, creating the folder if needed.

    Args:
        path_folder: Destination directory (created, including parents, when missing).
        id_: Basename of the JSON file, typically the alert ID.
        data: JSON-serializable payload.
    """
    # makedirs(exist_ok=True) is race-free and creates missing parents,
    # unlike the previous exists()+mkdir() pair.
    os.makedirs(path_folder, exist_ok=True)

    path_file = os.path.join(path_folder, f"{id_}.json")
    with open(path_file, "w") as json_file:
        json.dump(data, json_file)


class JSONOutputConfig(CoreOutputConfig):
    """Configuration for JSONOutput."""

    method_type: str = "json_output"
    # Destination folder for the generated <alertID>.json files.
    path_folder: str = ""


class JSONOutput(CoreOutput):
    """Output component that writes each processed alert to a JSON file."""

    def __init__(
        self,
        name: str = "JsonOutput",
        config: JSONOutputConfig | dict[str, Any] | None = None,
    ) -> None:
        """Create the JSON output component.

        Args:
            name: Component name used in logs and config lookups.
            config: A JSONOutputConfig, a plain dict (converted via
                from_dict), or None for defaults.
        """
        # Build the default config per instance: a default-argument instance is
        # created once at def time and would be shared across all components.
        if config is None:
            config = JSONOutputConfig()
        elif isinstance(config, dict):
            config = JSONOutputConfig.from_dict(config, name)

        super().__init__(name=name, buffer_mode=BufferMode.NO_BUF, config=config)

        self.config = cast(JSONOutputConfig, self.config)
        # When True, skip disk writes (intended for unit tests).
        self.test_mode: bool = False

    def do_output(self, input_: DetectorSchema, output_: OutputSchema) -> None:  # type: ignore
        """Fill the human-readable fields and dump the schema to <path_folder>/<alertID>.json."""
        output_["description"] = f"Alert description: {input_['description']}"
        output_["alertsObtain"] = input_["alertsObtain"]

        if not self.test_mode:
            save_json_file(
                path_folder=self.config.path_folder,  # type: ignore
                id_=str(input_["alertID"]),
                data=output_.as_dict(),
            )
6 changes: 4 additions & 2 deletions src/detectmatelibrary/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
BaseSchema,
LogSchema,
ParserSchema,
DetectorSchema
DetectorSchema,
OutputSchema,
FieldNotFound,
)


__all__ = ["BaseSchema", "LogSchema", "ParserSchema", "DetectorSchema"]
__all__ = ["BaseSchema", "LogSchema", "ParserSchema", "DetectorSchema", "OutputSchema", "FieldNotFound"]
32 changes: 28 additions & 4 deletions src/detectmatelibrary/schemas/_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,19 @@ def __init__(
) -> None:
self.schema_id = schema_id
self.var_names: set[str]
self.__is_list: dict[str, bool] = {}
self.init_schema(kwargs=kwargs)

def __contains__(self, idx: str) -> bool:
return idx in self.var_names

def as_dict(self) -> dict[str, Any]:
"""Return the schema variables as a dictionary."""
return {var: getattr(self, var) for var in self.var_names}

def get_schema(self) -> op.SchemaT:
"""Retrieve the current schema instance."""
return _initialize_schema(
schema_id=self.schema_id,
kwargs={var: getattr(self, var) for var in self.var_names}
)
return _initialize_schema(schema_id=self.schema_id, kwargs=self.as_dict())

def set_schema(self, schema: op.SchemaT) -> None:
"""Set the schema instance and update attributes."""
Expand All @@ -53,6 +55,16 @@ def init_schema(self, kwargs: dict[str, Any] | None) -> None:
var_names.append(var)
self.var_names = set(var_names)

def is_field_list(self, field_name: str) -> bool:
    """Return True when *field_name* is a repeated (list) field of the schema."""
    cached = self.__is_list.get(field_name)
    if cached is not None:  # cache hit — skip rebuilding the schema
        return cached

    repeated = op.is_repeated(schema=self.get_schema(), field_name=field_name)
    self.__is_list[field_name] = repeated
    return repeated


class BaseSchema(SchemaVariables):
def __init__(
Expand Down Expand Up @@ -149,3 +161,15 @@ def __init__(
def copy(self) -> "DetectorSchema":
    """Return a copy of this schema, typed as DetectorSchema."""
    duplicate: DetectorSchema = super().copy()  # type: ignore
    return duplicate


class OutputSchema(BaseSchema):
    """Output schema class."""

    def __init__(self, kwargs: dict[str, Any] | None = None) -> None:
        """Initialize the underlying protobuf output schema with *kwargs*."""
        super().__init__(schema_id=op.OUTPUT_SCHEMA, kwargs=kwargs)

    def copy(self) -> "OutputSchema":
        """Return a copy of this schema, typed as OutputSchema."""
        duplicate: OutputSchema = super().copy()  # type: ignore
        return duplicate
70 changes: 40 additions & 30 deletions src/detectmatelibrary/schemas/_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,22 @@

# Main variables ************************************
# Use Union of actual protobuf classes for better type hints
SchemaT = Union[s.Schema, s.LogSchema, s.ParserSchema, s.DetectorSchema] # type: ignore
SchemaT = Union[s.Schema, s.LogSchema, s.ParserSchema, s.DetectorSchema, s.OutputSchema] # type: ignore
SchemaID = NewType("SchemaID", bytes)

BASE_SCHEMA: SchemaID = SchemaID(b"0")
LOG_SCHEMA: SchemaID = SchemaID(b"1")
PARSER_SCHEMA: SchemaID = SchemaID(b"2")
DETECTOR_SCHEMA: SchemaID = SchemaID(b"3")
OUTPUT_SCHEMA: SchemaID = SchemaID(b"4")

__current_version = "1.0.0"
__id_codes: Dict[SchemaID, Type[Message]] = {
BASE_SCHEMA: s.Schema, # type: ignore
LOG_SCHEMA: s.LogSchema, # type: ignore
PARSER_SCHEMA: s.ParserSchema, # type: ignore
DETECTOR_SCHEMA: s.DetectorSchema, # type: ignore
OUTPUT_SCHEMA: s.OutputSchema, # type: ignore
}


Expand Down Expand Up @@ -51,11 +53,47 @@ def __get_schema_class(schema_id: SchemaID) -> Type[Message]:
return __id_codes[schema_id]


def __is_repeated_field(field: Any) -> bool:
def __is_repeated(field: Any) -> bool:
    """Check if a field in the message is a repeated element."""
    # `field` is presumably a protobuf FieldDescriptor (obtained from
    # DESCRIPTOR.fields by the callers) — its is_repeated flag is coerced to bool.
    return bool(field.is_repeated)


# Auxiliary methods *****************************************
def is_repeated(schema: SchemaT, field_name: str) -> bool:
    """Check if a field is a repeated element."""
    descriptor = next(
        (f for f in schema.DESCRIPTOR.fields if f.name == field_name), None
    )
    if descriptor is None:
        # Unknown field name for this schema type.
        raise NotSupportedSchema()
    return __is_repeated(descriptor)


def check_is_same_schema(
    id_schema_1: SchemaID, id_schema_2: SchemaID
) -> None:
    """Raise exception if two schemas do not match.

    Raises:
        IncorrectSchema: When the two schema IDs differ.

    Note:
        The exception is raised, never returned, so the return type is plain
        None (the previous ``None | IncorrectSchema`` annotation was misleading).
    """
    if id_schema_1 != id_schema_2:
        raise IncorrectSchema()
    return None


def check_if_schema_is_complete(schema: SchemaT) -> None:
    """Check if the schema is complete.

    Raises:
        NotCompleteSchema: When any non-repeated field is unset. Repeated
            fields are skipped (HasField is presumably not applicable to
            them — TODO confirm against the protobuf message API).

    Note:
        The exception is raised, never returned, so the return type is plain
        None (the previous ``None | NotCompleteSchema`` annotation was misleading).
    """
    missing_fields = [
        field.name
        for field in schema.DESCRIPTOR.fields
        if not __is_repeated(field) and not schema.HasField(field.name)
    ]
    if missing_fields:
        raise NotCompleteSchema(f"Missing fields: {missing_fields}")
    return None


def get_variables_names(schema: SchemaT) -> list[str]:
    """Return the name of every field declared in *schema*."""
    return [descriptor.name for descriptor in schema.DESCRIPTOR.fields]


# Main methods *****************************************
def initialize(schema_id: SchemaID, **kwargs: Any) -> SchemaT | NotSupportedSchema:
"""Initialize a protobuf schema, it use its arguments and the assigned
Expand Down Expand Up @@ -95,31 +133,3 @@ def deserialize(message: bytes) -> Tuple[SchemaID, SchemaT]:
schema = schema_class()
schema.ParseFromString(message[1:])
return schema_id, schema


# Auxiliar methods *****************************************
def check_is_same_schema(
id_schema_1: SchemaID, id_schema_2: SchemaID
) -> None | IncorrectSchema:
"""Raise exception if two schemas do not match."""
if id_schema_1 != id_schema_2:
raise IncorrectSchema()
return None


def check_if_schema_is_complete(schema: SchemaT) -> None | NotCompleteSchema:
"""Check if the schema is complete."""
missing_fields = []
for field in schema.DESCRIPTOR.fields:
if not __is_repeated_field(field) and not schema.HasField(field.name):
missing_fields.append(field.name)

if len(missing_fields) > 0:
raise NotCompleteSchema(f"Missing fields: {missing_fields}")

return None


def get_variables_names(schema: SchemaT) -> list[str]:
"""Get the variable names of the schema."""
return [field.name for field in schema.DESCRIPTOR.fields]
12 changes: 12 additions & 0 deletions src/detectmatelibrary/schemas/schemas.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,15 @@ message DetectorSchema {
optional int32 receivedTimestamp = 11;
map<string, string> alertsObtain = 12;
}

// Aggregated pipeline output: one entry per contributing detector plus the
// human-readable description and the emit timestamp.
// NOTE(review): field numbers 7, 8 and 11 are skipped, and alertsObtain uses
// tag 12 as in DetectorSchema — confirm this numbering is intentional.
message OutputSchema {
optional string __version__ = 1;
repeated string detectorIDs = 2;
repeated string detectorTypes = 3;
repeated int32 alertIDs = 4;
optional int32 outputTimestamp = 5;
repeated int32 logIDs = 6;
repeated int32 extractedTimestamps = 9;
optional string description = 10;
map<string, string> alertsObtain = 12;
}
Loading