diff --git a/.github/ISSUE_TEMPLATE/01_bug_report.md b/.github/ISSUE_TEMPLATE/01_bug_report.md new file mode 100644 index 0000000..8495c6c --- /dev/null +++ b/.github/ISSUE_TEMPLATE/01_bug_report.md @@ -0,0 +1,21 @@ +--- +name: 🐜 Bug report +about: If something isn't working 🔧 +--- + +### Subject of the issue +Describe your issue here. + +### Your environment +* Version of detectmate +* Version of python +* Docker or manual installation? + +### Steps to reproduce +Tell us how to reproduce this issue. + +### Expected behaviour +Tell us what should happen + +### Actual behaviour +Tell us what happens instead diff --git a/.github/ISSUE_TEMPLATE/02_feature_request.md b/.github/ISSUE_TEMPLATE/02_feature_request.md new file mode 100644 index 0000000..442b05c --- /dev/null +++ b/.github/ISSUE_TEMPLATE/02_feature_request.md @@ -0,0 +1,20 @@ +--- +name: 🚀 Feature request +about: If you have a feature request 💡 +--- + +**Context** + +What are you trying to do and how would you want to do it differently? Is it something you currently cannot do? Is this related to an issue/problem? + +**Alternatives** + +Can you achieve the same result doing it in an alternative way? Is the alternative considerable? + +**Has the feature been requested before?** + +Please provide a link to the issue. + +**If the feature request is approved, would you be willing to submit a PR?** + +Yes / No _(Help can be provided if you need assistance submitting a PR)_ diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml new file mode 100644 index 0000000..3ba13e0 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/config.yml @@ -0,0 +1 @@ +blank_issues_enabled: false diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 0000000..53dec30 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,19 @@ +# Task + + +# Description + + + + +# How Has This Been Tested? 
+ + +# Checklist + + +- [ ] This Pull-Request goes to the **development** branch. +- [ ] I have successfully run prek locally. +- [ ] I have added tests to cover my changes. +- [ ] I have linked the issue-id to the task-description. +- [ ] I have performed a self-review of my own code. diff --git a/.gitignore b/.gitignore index 3113ce1..b0a557c 100644 --- a/.gitignore +++ b/.gitignore @@ -199,3 +199,6 @@ cython_debug/ local/ test.ipynb test.py + +# claude code +CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..95c372c --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,102 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +DetectMateLibrary is a Python library for log processing and anomaly detection. It provides composable, stream-friendly components (parsers and detectors) that communicate via Protobuf-based schemas. The library is designed for both single-process and microservice deployments. + +## Development Commands + +```bash +# Install dependencies and pre-commit hooks +uv sync --dev +uv run prek install + +# Run tests +uv run pytest -q +uv run pytest -s # verbose with stdout +uv run pytest --cov=. --cov-report=term-missing # with coverage +uv run pytest tests/test_foo.py # single test file + +# Run linting/formatting (all pre-commit hooks) +uv run prek run -a + +# Recompile Protobuf (only if schemas.proto is modified) +protoc --proto_path=src/detectmatelibrary/schemas/ \ + --python_out=src/detectmatelibrary/schemas/ \ + src/detectmatelibrary/schemas/schemas.proto + +# Scaffold a new component workspace +mate create --type --name --dir +``` + +## Architecture + +### Data Flow + +``` +Raw Logs → Parser → ParserSchema → Detector → DetectorSchema (Alerts) +``` + +All data flows through typed Protobuf-backed schema objects. Components are stateful and support an optional training phase before detection. 
+ +### Core Abstractions (`src/detectmatelibrary/common/`) + +- **`CoreComponent`** — base class managing buffering, ID generation, and training state + - **`CoreParser(CoreComponent)`** — parse raw logs into `ParserSchema` + - **`CoreDetector(CoreComponent)`** — detect anomalies in `ParserSchema`, emit `DetectorSchema` +- **`CoreConfig`** / **`CoreParserConfig`** / **`CoreDetectorConfig`** — Pydantic-based configuration hierarchy + +### Schema System (`src/detectmatelibrary/schemas/`) + +- `BaseSchema` wraps generated Protobuf messages with dict-like access (`schema["field"]`) +- Key schemas: `LogSchema`, `ParserSchema`, `DetectorSchema` +- Support serialization to/from bytes for transport and persistence + +### Buffering Modes (`src/detectmatelibrary/utils/data_buffer.py`) + +Three modes via `ArgsBuffer` config: +- **NO_BUF** — one item at a time (default) +- **BATCH** — accumulate N items, process as batch +- **WINDOW** — sliding window of size N + +### Implementations + +- **Parsers** (`src/detectmatelibrary/parsers/`): `JsonParser`, `DummyParser`, `TemplateMatcherParser` (uses Drain3 for template mining) +- **Detectors** (`src/detectmatelibrary/detectors/`): `NewValueDetector`, `NewValueComboDetector`, `RandomDetector`, `DummyDetector` +- **Utilities** (`src/detectmatelibrary/utils/`): `DataBuffer`, `EventPersistency`, `KeyExtractor`, `TimeFormatHandler`, `IdGenerator` + +## Extending the Library + +Implement a custom detector by subclassing `CoreDetector`: + +```python +class MyDetectorConfig(CoreDetectorConfig): + method_type: str = "my_detector" + my_param: int = 10 + +class MyDetector(CoreDetector): + def __init__(self, name="MyDetector", config=MyDetectorConfig()): + super().__init__(name=name, config=config) + + def train(self, input_: ParserSchema) -> None: + pass # optional + + def detect(self, input_: ParserSchema, output_: DetectorSchema) -> bool: + output_["detectorID"] = self.name + output_["score"] = 0.0 + return False # True = anomaly detected 
+``` + +Same pattern applies for `CoreParser` — implement `parse(input_: LogSchema, output_: ParserSchema) -> bool`. + +## Code Quality + +Pre-commit hooks enforce: +- **mypy** strict mode +- **flake8** linting, **autopep8** formatting (max line 110) +- **bandit** security checks, **vulture** dead-code detection (70% threshold) +- **docformatter** docstring style + +Python 3.12 is required (see `.python-version`). diff --git a/SECURITY.md b/SECURITY.md new file mode 100644 index 0000000..e7cbac5 --- /dev/null +++ b/SECURITY.md @@ -0,0 +1,32 @@ +# Security Policy + +## Supported Versions + +| Version | Supported | +| ------- | ------------------ | +| 1.x.x | :white_check_mark: | +| < 1.0.0 | :x: | + +> [!IMPORTANT] +> Currently DetectMateService is a work in progress and heavily under development. Possible vulnerabilities will not receive any special treatment and can be reported using [GitHub-Issues](https://github.com/ait-detectmate/DetectMateService/issues). + +## Reporting a Vulnerability + +Please email reports about any security-related issues you find to aecid@ait.ac.at. This mail is delivered to a small developer team. Your email will be acknowledged within one business day, and you'll receive a more detailed response to your email within 7 days indicating the next steps in handling your report. + +Please use a descriptive subject line for your report email. After the initial reply to your report, our team will endeavor to keep you informed of the progress being made towards a fix and announcement. + +In addition, please include the following information along with your report: + +* Your name and affiliation (if any). +* A description of the technical details of the vulnerabilities. It is very important to let us know how we can reproduce your findings. +* An explanation of who can exploit this vulnerability, and what they gain when doing so -- write an attack scenario. This will help us evaluate your report quickly, especially if the issue is complex. 
+* Whether this vulnerability is public or known to third parties. If it is, please provide details. +* Whether we could mention your name in the changelogs. + +Once an issue is reported, we use the following disclosure process: + +* When a report is received, we confirm the issue and determine its severity. +* If we know of specific third-party services or software based on DetectMateService that require mitigation before publication, those projects will be notified. +* Fixes are prepared for the last minor release of the latest major release. +* Patch releases are published for all fixed released versions. diff --git a/config/pipeline_config_default.yaml b/config/pipeline_config_default.yaml index 0475495..4271752 100644 --- a/config/pipeline_config_default.yaml +++ b/config/pipeline_config_default.yaml @@ -68,8 +68,6 @@ detectors: NewValueComboDetector: method_type: new_value_combo_detector auto_config: False - params: - comb_size: 3 events: 1: test: diff --git a/docs/detectors.md b/docs/detectors.md index cb987f2..7625b9d 100644 --- a/docs/detectors.md +++ b/docs/detectors.md @@ -16,7 +16,7 @@ This document describes the minimal API, implementation guidance, a short exampl ```python class CoreDetectorConfig(CoreConfig): - comp_type: str = "detectors" + component_type: str = "detectors" method_type: str = "core_detector" parser: str = "" @@ -89,43 +89,63 @@ List of detectors: * [Combo Detector](detectors/combo.md): Detect new combination of variables in the logs. * [New Event](detectors/new_event.md): Detect new events in the variables in the logs. +## Configuration -## Auto-configuration (optional) - -Detectors can optionally support **auto-configuration** — a process where the detector automatically discovers which variables are worth monitoring, instead of requiring the user to specify them manually. - -### Enabling auto-configuration - -Auto-configuration is controlled by the `auto_config` flag in the pipeline config (e.g. 
`config/pipeline_config_default.yaml`): +When `auto_config` is set to `False`, the detector expects an explicit `events` block that specifies exactly which variables to monitor: ```yaml detectors: NewValueDetector: method_type: new_value_detector - auto_config: True # enable auto-configuration - params: {} - # no "events" block needed — it will be generated automatically + auto_config: False + params: {} # global parameters + events: # event-specific configuration + 1: # event_id + instance1: # name of instance (arbitrary) + params: {} # additional params + variables: + - pos: 0 # location of an unnamed variable from the log message + name: var1 # name of variable (arbitrary) + header_variables: + - pos: level # location of a named variable (defined in log_format of parser) + global: # define global instance for new_value_detector similar to "events" + global_instance1: # define instance name + header_variables: # same logic as header_variables in "events" + - pos: Status ``` -When `auto_config` is set to `False`, the detector expects an explicit `events` block that specifies exactly which variables to monitor: + +### Configuration semantics (preliminary) + +**`events` key** — The integer key is the `EventID` (or `event_id`) to monitor (see the MatcherParser docs for how EventID is assigned). + +**`variables[].pos`** — The 0-indexed position of the `<*>` wildcard in the matched template, counting from left to right starting at 0. For example, given: + +```text +pid=<*> uid=<*> auid=<*> ses=<*> msg='op=<*> acct=<*> exe=<*> hostname=<*> addr=<*> terminal=<*> res=<*>' +``` + +`pos: 0` captures `pid=`, `pos: 6` captures `exe=`, etc. + +**`header_variables[].pos`** — A named field from the log format string (e.g., `Type`, `Time`, `Content`) rather than a wildcard position. 
+ + +### Auto-configuration (optional) + +Detectors can optionally support **auto-configuration** — a process where the detector automatically discovers which variables are worth monitoring, instead of requiring the user to specify them manually. + +Auto-configuration is controlled by the `auto_config` flag in the pipeline config (e.g. `config/pipeline_config_default.yaml`): ```yaml detectors: NewValueDetector: method_type: new_value_detector - auto_config: False + auto_config: True # enable auto-configuration params: {} - events: - 1: - instance1: - params: {} - variables: - - pos: 0 - name: var1 - header_variables: - - pos: level + # no "events" block needed — it will be generated automatically ``` + ### How it works When auto-configuration is enabled, the detector goes through two extra phases before training: @@ -173,7 +193,7 @@ The `set_configuration()` method queries the tracker results and generates the f def set_configuration(self): variables = {} for event_id, tracker in self.auto_conf_persistency.get_events_data().items(): - stable_vars = tracker.get_variables_by_classification("STABLE") + stable_vars = tracker.get_features_by_classification("STABLE") variables[event_id] = stable_vars config_dict = generate_detector_config( diff --git a/docs/detectors/combo.md b/docs/detectors/combo.md index 0272668..1440167 100644 --- a/docs/detectors/combo.md +++ b/docs/detectors/combo.md @@ -18,7 +18,7 @@ detectors: method_type: new_value_combo_detector auto_config: False params: - comb_size: 3 + max_combo_size: 3 events: 1: test: diff --git a/docs/parsers.md b/docs/parsers.md index 68c35d8..744419a 100644 --- a/docs/parsers.md +++ b/docs/parsers.md @@ -102,4 +102,10 @@ def test_my_parser_parse(): assert out["variables"] == ["a", "b", "c"] ``` +## Available parsers + +- [JSON Parser](parsers/json_parser.md): extracts structured fields from JSON-formatted logs. +- [Template Matcher](parsers/template_matcher.md): matches logs against a predefined set of `<*>` templates. 
+- [LogBatcher Parser](parsers/logbatcher_parser.md): LLM-based parser that infers templates from raw logs with no training data. + Go back to [Index](index.md) diff --git a/docs/parsers/logbatcher_parser.md b/docs/parsers/logbatcher_parser.md new file mode 100644 index 0000000..5970876 --- /dev/null +++ b/docs/parsers/logbatcher_parser.md @@ -0,0 +1,106 @@ +# LogBatcher Parser + +LLM-based log parser that infers event templates from raw log messages using any OpenAI-compatible model. No training data or labeled examples are required. + +| | Schema | Description | +|------------|-------------------------------|------------------------------------------| +| **Input** | [LogSchema](../schemas.md) | Raw log string | +| **Output** | [ParserSchema](../schemas.md) | Structured log with template and variables | + +## Overview + +`LogBatcherParser` wraps the [LogBatcher](https://github.com/LogIntelligence/LogBatcher) engine (MIT, LogIntelligence 2024) as a `CoreParser`. Parsing proceeds in two phases: + +1. **Cache lookup** — the incoming log is matched against previously seen templates using a hash-based exact match followed by a tree-based similarity check. If a match is found, no LLM call is made. +2. **LLM query** — on a cache miss, the log is submitted to the configured model. The returned template is stored in the cache for future reuse. + +Variable slots in templates use the `<*>` wildcard notation (e.g. `User <*> logged in from <*>`). Extracted variables are written to `output_["variables"]` in order of appearance. + +## Configuration + +| Field | Type | Default | Description | +|---|---|---|---| +| `method_type` | string | `"logbatcher_parser"` | Parser type identifier | +| `model` | string | `"gpt-4o-mini"` | Model name passed to the OpenAI-compatible endpoint | +| `api_key` | string | `""` | API key for the chosen provider | +| `base_url` | string | `""` | Base URL of the OpenAI-compatible endpoint. 
Leave empty to use the default OpenAI endpoint | +| `batch_size` | int | `10` | Maximum number of logs submitted per LLM call | + +Example YAML fragment (OpenAI): + +```yaml +parsers: + LogBatcherParser: + method_type: logbatcher_parser + params: + model: "gpt-4o-mini" + api_key: "" + batch_size: 10 +``` + +Example YAML fragment (local Ollama): + +```yaml +parsers: + LogBatcherParser: + method_type: logbatcher_parser + params: + model: "llama3" + api_key: "ollama" + base_url: "http://localhost:11434/v1" + batch_size: 10 +``` + +## Usage examples + +Basic usage — parse a raw log and read the inferred template: + +```python +from detectmatelibrary.parsers.logbatcher import LogBatcherParser, LogBatcherParserConfig +import detectmatelibrary.schemas as schemas + +config = LogBatcherParserConfig( + api_key="", + model="gpt-4o-mini", + batch_size=10, +) + +parser = LogBatcherParser(name="LogBatcherParser", config=config) + +input_log = schemas.LogSchema({ + "logID": "1", + "log": "User admin logged in from 192.168.1.10", +}) + +output = schemas.ParserSchema() +parser.parse(input_log, output) + +print(output["template"]) # e.g. "User <*> logged in from <*>" +print(output["variables"]) # e.g. 
["admin", "192.168.1.10"] +print(output["EventID"]) # integer index assigned by the cache +``` + +Using a local Ollama instance: + +```python +config = LogBatcherParserConfig( + api_key="ollama", + model="llama3", + base_url="http://localhost:11434/v1", + batch_size=10, +) +parser = LogBatcherParser(name="LogBatcherParser", config=config) +``` + +Passing config as a dict: + +```python +parser = LogBatcherParser(config={ + "method_type": "logbatcher_parser", + "api_key": "", + "model": "gpt-4o-mini", + "batch_size": 10, +}) +``` + +Go back to [Index](../index.md) diff --git a/docs/parsers/template_matcher.md b/docs/parsers/template_matcher.md index dc42bf7..9b8a64c 100644 --- a/docs/parsers/template_matcher.md +++ b/docs/parsers/template_matcher.md @@ -14,9 +14,23 @@ The template matcher is a lightweight, fast parser intended for logs that follow - Preprocesses logs and templates (remove spaces, punctuation, lowercase) based on config. - Finds the first template that matches and extracts all wildcard parameters in order. - Populates ParserSchema fields: `EventID`, `template`, `variables`, `logID`, and related fields. +- **`EventID` is the 0-indexed line number of the matched template** in the template file (first line → `EventID: 0`, second line → `EventID: 1`, etc.). This parser is deterministic and designed for high-throughput use when templates are known in advance. +## EventID assignment (preliminary) + +The `EventID` (or `event_id`) field in the output `ParserSchema` identifies which template was matched. It equals the **0-indexed line number** of the matching template in the template file: + +| Line in template file | EventID | +|-----------------------|---------| +| 1st line | 0 | +| 2nd line | 1 | +| 3rd line | 2 | +| ... | ... | + +This `EventID` is the integer key used in detector configurations (e.g., `NewValueDetector`) to scope detection rules to logs of a particular template type. 
+ ## Template format - Templates are plain text lines in a template file. diff --git a/mkdocs.yml b/mkdocs.yml index b2b0de9..96f7fbd 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -18,6 +18,7 @@ nav: - Parsers Methods: - Template Matcher: parsers/template_matcher.md - Json Parser: parsers/json_parser.md + - LogBatcher Parser: parsers/logbatcher_parser.md - Detectors Methods: - Random Detector: detectors/random_detector.md - New Value: detectors/new_value.md diff --git a/pyproject.toml b/pyproject.toml index edd0e0a..dfd5c1d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detectmatelibrary" -version = "0.1.0" +version = "0.2.0" description = "DetectMate Library for log processing components" readme = "README.md" dynamic = ["authors"] @@ -10,6 +10,12 @@ dependencies = [ "pydantic>=2.11.7", "pyyaml>=6.0.3", "regex>=2025.11.3", + "kafka-python>=2.3.0", + "openai>=2.26.0", + "tenacity>=9.1.4", + "scipy>=1.17.1", + "scikit-learn>=1.8.0", + "tiktoken>=0.12.0", "numpy>=2.3.2", "pandas>=2.3.2", "polars>=1.38.1", diff --git a/setup.py b/setup.py index 4e86b68..7dcd355 100644 --- a/setup.py +++ b/setup.py @@ -25,4 +25,11 @@ def gather_dependencies(toml_path: str = "pyproject.toml") -> list[str]: author="voice", author_email="voice@example.com", install_requires=gather_dependencies(), + data_files=[( + "src/tools/workspace/templates/data", + [ + "src/tools/workspace/templates/data/logs.json", + "src/tools/workspace/templates/data/parsed_log.json", + ] + )] ) diff --git a/src/detectmatelibrary/common/_config/__init__.py b/src/detectmatelibrary/common/_config/__init__.py index 7c0265b..0ee1ed8 100644 --- a/src/detectmatelibrary/common/_config/__init__.py +++ b/src/detectmatelibrary/common/_config/__init__.py @@ -9,6 +9,14 @@ from typing import Any, Dict from copy import deepcopy +from numpy.random import choice +import string + + +def random_id(length: int = 10) -> str: + characters = [s for s in string.ascii_letters + string.digits] + return 
"".join(str(choice(characters)) for _ in range(length)) + class BasicConfig(BaseModel): """Base configuration class with helper methods.""" @@ -16,7 +24,7 @@ class BasicConfig(BaseModel): model_config = ConfigDict(extra="forbid") method_type: str = "default_method_type" - comp_type: str = "default_type" + component_type: str = "default_type" auto_config: bool = False @@ -33,13 +41,13 @@ def update_config(self, new_config: Dict[str, Any]) -> None: def from_dict(cls, data: Dict[str, Any], method_id: str) -> Self: aux = cls() config_ = ConfigMethods.get_method( - deepcopy(data), comp_type=aux.comp_type, method_id=method_id + deepcopy(data), component_type=aux.component_type, method_id=method_id ) ConfigMethods.check_type(config_, method_type=aux.method_type) return cls(**ConfigMethods.process(config_)) - def to_dict(self, method_id: str) -> Dict[str, Any]: + def to_dict(self, method_id: str = random_id()) -> Dict[str, Any]: """Convert the config back to YAML-compatible dictionary format. This is the inverse of from_dict() and ensures yaml -> pydantic -> yaml preservation. 
@@ -48,7 +56,7 @@ def to_dict(self, method_id: str) -> Dict[str, Any]: method_id: The method identifier to use in the output structure Returns: - Dictionary with structure: {comp_type: {method_id: config_data}} + Dictionary with structure: {component_type: {method_id: config_data}} """ # Build the config in the format expected by from_dict result: Dict[str, Any] = { @@ -63,7 +71,7 @@ def to_dict(self, method_id: str) -> Dict[str, Any]: for field_name, field_value in self: # Skip meta fields - if field_name in ("comp_type", "method_type", "auto_config"): + if field_name in ("component_type", "method_type", "auto_config"): continue # Handle EventsConfig specially @@ -96,9 +104,9 @@ def to_dict(self, method_id: str) -> Dict[str, Any]: if events_data is not None: result["events"] = events_data - # Wrap in the comp_type and method_id structure + # Wrap in the component_type and method_id structure return { - self.comp_type: { + self.component_type: { method_id: result } } diff --git a/src/detectmatelibrary/common/_config/_compile.py b/src/detectmatelibrary/common/_config/_compile.py index 5198629..0f01d5d 100644 --- a/src/detectmatelibrary/common/_config/_compile.py +++ b/src/detectmatelibrary/common/_config/_compile.py @@ -41,9 +41,9 @@ def _classify_variables( class MethodNotFoundError(Exception): - def __init__(self, method_id: str, comp_type: str) -> None: + def __init__(self, method_id: str, component_type: str) -> None: super().__init__( - f"Method '{method_id}' of type '{comp_type}' not found in configuration." + f"Method '{method_id}' of type '{component_type}' not found in configuration." 
) @@ -72,15 +72,15 @@ def __init__(self) -> None: class ConfigMethods: @staticmethod def get_method( - config: Dict[str, Dict[str, Dict[str, Any]]], method_id: str, comp_type: str + config: Dict[str, Dict[str, Dict[str, Any]]], method_id: str, component_type: str ) -> Dict[str, Any]: - if comp_type not in config: - raise TypeNotFoundError(comp_type) + if component_type not in config: + raise TypeNotFoundError(component_type) - args = config[comp_type] + args = config[component_type] if method_id not in args: - raise MethodNotFoundError(method_id, comp_type) + raise MethodNotFoundError(method_id, component_type) return args[method_id] @@ -142,7 +142,7 @@ def generate_detector_config( detector_name: Name of the detector, used as the base instance_id. method_type: Type of detection method (e.g., "new_value_detector"). **additional_params: Additional parameters for the detector's params - dict (e.g., comb_size=3). + dict (e.g., max_combo_size=3). Returns: Dictionary with structure compatible with detector config classes. 
@@ -162,7 +162,7 @@ def generate_detector_config( variable_selection={1: [("username", "src_ip"), ("var_0", "var_1")]}, detector_name="MyDetector", method_type="new_value_combo_detector", - comb_size=2, + max_combo_size=2, ) """ var_pattern = re.compile(r"^var_(\d+)$") diff --git a/src/detectmatelibrary/common/_config/_formats.py b/src/detectmatelibrary/common/_config/_formats.py index 564d5e5..dcee7e2 100644 --- a/src/detectmatelibrary/common/_config/_formats.py +++ b/src/detectmatelibrary/common/_config/_formats.py @@ -6,16 +6,17 @@ # Sub-formats ********************************************************+ class Variable(BaseModel): - pos: int - name: str + pos: str | int + name: str = "" params: Dict[str, Any] = {} def to_dict(self) -> Dict[str, Any]: """Convert Variable to YAML-compatible dictionary.""" result: Dict[str, Any] = { "pos": self.pos, - "name": self.name, } + if self.name: + result["name"] = self.name if self.params: result["params"] = self.params return result @@ -38,7 +39,7 @@ def to_dict(self) -> Dict[str, Any]: class _EventInstance(BaseModel): """Configuration for a specific instance within an event.""" params: Dict[str, Any] = {} - variables: Dict[int, Variable] = {} + variables: Dict[str | int, Variable] = {} header_variables: Dict[str, Header] = {} @classmethod @@ -79,7 +80,7 @@ def _init(cls, instances_dict: Dict[str, Dict[str, Any]]) -> "_EventConfig": return cls(instances=instances) @property - def variables(self) -> Dict[int, Variable]: + def variables(self) -> Dict[str | int, Variable]: """Pass-through to first instance for compatibility.""" if self.instances: return next(iter(self.instances.values())).variables diff --git a/src/detectmatelibrary/common/_core_op/_fit_logic.py b/src/detectmatelibrary/common/_core_op/_fit_logic.py new file mode 100644 index 0000000..7a4aa61 --- /dev/null +++ b/src/detectmatelibrary/common/_core_op/_fit_logic.py @@ -0,0 +1,119 @@ + +from enum import Enum + + +class TrainState(Enum): + DEFAULT = 0 + 
STOP_TRAINING = 1 + KEEP_TRAINING = 2 + + def describe(self) -> str: + descriptions = [ + "Follow default training behavior.", + "Force stop training.", + "Keep training regardless of default behavior." + ] + + return descriptions[self.value] + + +class ConfigState(Enum): + DEFAULT = 0 + STOP_CONFIGURE = 1 + KEEP_CONFIGURE = 2 + + def describe(self) -> str: + descriptions = [ + "Follow default configuration behavior.", + "Force stop configuration.", + "Keep configuring regardless of default behavior." + ] + + return descriptions[self.value] + + +def do_training( + data_use_training: int | None, index: int, train_state: TrainState +) -> bool: + if train_state == TrainState.STOP_TRAINING: + return False + elif train_state == TrainState.KEEP_TRAINING: + return True + + return data_use_training is not None and data_use_training > index + + +def do_configure( + data_use_configure: int | None, index: int, configure_state: ConfigState +) -> bool: + if configure_state == ConfigState.STOP_CONFIGURE: + return False + elif configure_state == ConfigState.KEEP_CONFIGURE: + return True + + return data_use_configure is not None and data_use_configure > index + + +class FitLogicState(Enum): + DO_CONFIG = 0 + DO_TRAIN = 1 + NOTHING = 2 + + +class FitLogic: + def __init__( + self, data_use_configure: int | None, data_use_training: int | None + ) -> None: + + self.train_state = TrainState.DEFAULT + self.configure_state = ConfigState.DEFAULT + + self.data_used_train = 0 + self.data_used_configure = 0 + + self._configuration_done = False + self.config_finished = False + + self._training_done = False + self.training_finished = False + + self.data_use_configure = data_use_configure + self.data_use_training = data_use_training + + def finish_config(self) -> bool: + if self._configuration_done and not self.config_finished: + self.config_finished = True + return True + + return False + + def finish_training(self) -> bool: + if self._training_done and not self.training_finished: + 
self.training_finished = True + return True + + return False + + def run(self) -> FitLogicState: + if do_configure( + data_use_configure=self.data_use_configure, + index=self.data_used_configure, + configure_state=self.configure_state + ): + self.data_used_configure += 1 + return FitLogicState.DO_CONFIG + else: + if self.data_used_configure > 0 and not self._configuration_done: + self._configuration_done = True + + if do_training( + data_use_training=self.data_use_training, + index=self.data_used_train, + train_state=self.train_state + ): + self.data_used_train += 1 + return FitLogicState.DO_TRAIN + elif self.data_used_train > 0 and not self._training_done: + self._training_done = True + + return FitLogicState.NOTHING \ No newline at end of file diff --git a/src/detectmatelibrary/common/_core_op/_schema_pipeline.py b/src/detectmatelibrary/common/_core_op/_schema_pipeline.py new file mode 100644 index 0000000..401cad6 --- /dev/null +++ b/src/detectmatelibrary/common/_core_op/_schema_pipeline.py @@ -0,0 +1,28 @@ +from detectmatelibrary.schemas import BaseSchema + + +from typing import Tuple + + +class SchemaPipeline: + @staticmethod + def preprocess( + input_: BaseSchema, data: BaseSchema | bytes + ) -> Tuple[bool, BaseSchema]: + + is_byte = False + if isinstance(data, bytes): + is_byte = True + input_.deserialize(data) + data = input_.copy() + else: + data = data.copy() + + return is_byte, data + + @staticmethod + def postprocess( + data: BaseSchema, is_byte: bool + ) -> BaseSchema | bytes: + + return data if not is_byte else data.serialize() diff --git a/src/detectmatelibrary/common/core.py b/src/detectmatelibrary/common/core.py index 17eaa9c..02afb3c 100644 --- a/src/detectmatelibrary/common/core.py +++ b/src/detectmatelibrary/common/core.py @@ -1,3 +1,7 @@ +from detectmatelibrary.common._core_op._fit_logic import FitLogicState +from detectmatelibrary.common._core_op._schema_pipeline import SchemaPipeline +from detectmatelibrary.common._core_op._fit_logic import 
FitLogic + from detectmatelibrary.utils.data_buffer import DataBuffer, ArgsBuffer, BufferMode from detectmatelibrary.utils.id_generator import SimpleIDGenerator @@ -7,113 +11,27 @@ from tools.logging import logger, setup_logging -from typing import Any, Dict, Tuple, List -from enum import Enum +from typing import Any, Dict, List setup_logging() -class SchemaPipeline: - @staticmethod - def preprocess( - input_: BaseSchema, data: BaseSchema | bytes - ) -> Tuple[bool, BaseSchema]: - - is_byte = False - if isinstance(data, bytes): - is_byte = True - input_.deserialize(data) - data = input_.copy() - else: - data = data.copy() - - return is_byte, data - - @staticmethod - def postprocess( - data: BaseSchema, is_byte: bool - ) -> BaseSchema | bytes: - - return data if not is_byte else data.serialize() - - -class TrainState(Enum): - DEFAULT = 0 - STOP_TRAINING = 1 - KEEP_TRAINING = 2 - - def describe(self) -> str: - descriptions = [ - "Follow default training behavior.", - "Force stop training.", - "Keep training regardless of default behavior." - ] - - return descriptions[self.value] - - -class ConfigState(Enum): - DEFAULT = 0 - STOP_CONFIGURE = 1 - KEEP_CONFIGURE = 2 - - def describe(self) -> str: - descriptions = [ - "Follow default configuration behavior.", - "Force stop configuration.", - "Keep configuring regardless of default behavior." 
- ] - - return descriptions[self.value] - - class CoreConfig(BasicConfig): start_id: int = 10 data_use_training: int | None = None data_use_configure: int | None = None -def do_training(config: CoreConfig, index: int, train_state: TrainState) -> bool: - if train_state == TrainState.STOP_TRAINING: - return False - elif train_state == TrainState.KEEP_TRAINING: - return True - - return config.data_use_training is not None and config.data_use_training > index - - -def do_configure(config: CoreConfig, index: int, configure_state: ConfigState) -> bool: - if configure_state == ConfigState.STOP_CONFIGURE: - return False - elif configure_state == ConfigState.KEEP_CONFIGURE: - return True - - return config.data_use_configure is not None and config.data_use_configure > index - - -class CoreComponent: - """Base class for all components in the system.""" +class Component: + """Empty methods.""" def __init__( self, name: str, type_: str = "Core", config: CoreConfig = CoreConfig(), - args_buffer: ArgsBuffer = ArgsBuffer(BufferMode.NO_BUF), - input_schema: type[BaseSchema] = BaseSchema, - output_schema: type[BaseSchema] = BaseSchema ) -> None: - self.name, self.type_, self.config = name, type_, config - self.input_schema, self.output_schema = input_schema, output_schema - - self.data_buffer = DataBuffer(args_buffer) - self.id_generator = SimpleIDGenerator(self.config.start_id) - self.data_used_train = 0 - self.train_state: TrainState = TrainState.DEFAULT - self.data_used_configure = 0 - self.configure_state: ConfigState = ConfigState.DEFAULT - self._configuration_done = False def __repr__(self) -> str: return f"<{self.type_}> {self.name}: {self.config}" @@ -136,6 +54,37 @@ def configure( def set_configuration(self) -> None: pass + def post_train(self) -> None: + pass + + def get_config(self) -> Dict[str, Any]: + return self.config.get_config() + + def update_config(self, new_config: Dict[str, Any]) -> None: + self.config.update_config(new_config) + + +class 
CoreComponent(Component): + """Base class for all components in the system.""" + def __init__( + self, + name: str, + type_: str = "Core", + config: CoreConfig = CoreConfig(), + args_buffer: ArgsBuffer = ArgsBuffer(BufferMode.NO_BUF), + input_schema: type[BaseSchema] = BaseSchema, + output_schema: type[BaseSchema] = BaseSchema + ) -> None: + super().__init__(name=name, type_=type_, config=config) + self.input_schema, self.output_schema = input_schema, output_schema + + self.data_buffer = DataBuffer(args_buffer) + self.id_generator = SimpleIDGenerator(self.config.start_id) + self.fitlogic = FitLogic( + data_use_configure=self.config.data_use_configure, + data_use_training=self.config.data_use_training, + ) + def process(self, data: BaseSchema | bytes) -> BaseSchema | bytes | None: is_byte, data = SchemaPipeline.preprocess(self.input_schema(), data) logger.debug(f"<<{self.name}>> received:\n{data}") @@ -143,25 +92,20 @@ def process(self, data: BaseSchema | bytes) -> BaseSchema | bytes | None: if (data_buffered := self.data_buffer.add(data)) is None: # type: ignore return None - if do_configure( - config=self.config, - index=self.data_used_configure, - configure_state=self.configure_state - ): - self.data_used_configure += 1 + if (fit_state := self.fitlogic.run()) == FitLogicState.DO_CONFIG: logger.info(f"<<{self.name}>> use data for configuration") self.configure(input_=data_buffered) return None - else: - if self.data_used_configure > 0 and not self._configuration_done: - self._configuration_done = True - logger.info(f"<<{self.name}>> finalizing configuration") - self.set_configuration() + elif self.fitlogic.finish_config(): + logger.info(f"<<{self.name}>> finalizing configuration") + self.set_configuration() - if do_training(config=self.config, index=self.data_used_train, train_state=self.train_state): - self.data_used_train += 1 - logger.info(f"<<{self.name}>> use data for training") - self.train(input_=data_buffered) + if fit_state == FitLogicState.DO_TRAIN: + 
logger.info(f"<<{self.name}>> use data for training") + self.train(input_=data_buffered) + elif self.fitlogic.finish_training(): + logger.info(f"<<{self.name}>> finalizing training") + self.post_train() output_ = self.output_schema() logger.info(f"<<{self.name}>> processing data") @@ -172,9 +116,3 @@ def process(self, data: BaseSchema | bytes) -> BaseSchema | bytes | None: logger.debug(f"<<{self.name}>> processed:\n{output_}") return SchemaPipeline.postprocess(output_, is_byte=is_byte) - - def get_config(self) -> Dict[str, Any]: - return self.config.get_config() - - def update_config(self, new_config: Dict[str, Any]) -> None: - self.config.update_config(new_config) diff --git a/src/detectmatelibrary/common/detector.py b/src/detectmatelibrary/common/detector.py index 331eae2..2be7153 100644 --- a/src/detectmatelibrary/common/detector.py +++ b/src/detectmatelibrary/common/detector.py @@ -3,6 +3,7 @@ from detectmatelibrary.utils.data_buffer import ArgsBuffer, BufferMode from detectmatelibrary.utils.aux import get_timestamp +from detectmatelibrary.utils.persistency.event_persistency import EventPersistency from detectmatelibrary.schemas import ParserSchema, DetectorSchema @@ -10,6 +11,7 @@ from typing import Dict, List, Optional, Any from detectmatelibrary.utils.time_format_handler import TimeFormatHandler +from tools.logging import logger _time_handler = TimeFormatHandler() @@ -56,7 +58,7 @@ def get_configured_variables( # Extract template variables by position if hasattr(event_config, "variables"): for pos, var in event_config.variables.items(): - if pos < len(input_["variables"]): + if isinstance(pos, int) and pos < len(input_["variables"]): result[var.name] = input_["variables"][pos] # Extract header/log format variables by name @@ -89,12 +91,53 @@ def get_global_variables( return result +def validate_config_coverage( + detector_name: str, + config_events: EventsConfig | dict[str, Any], + persistency: EventPersistency, +) -> None: + """Log warnings when configured 
EventIDs or variables have no training + data. + + Args: + detector_name: Name of the detector (used in warning messages). + config_events: The detector's events configuration. + persistency: The persistency object populated during training. + """ + config_ids = ( + config_events.events.keys() + if isinstance(config_events, EventsConfig) + else config_events.keys() + ) + if not config_ids: + return + + events_seen = persistency.get_events_seen() + events_with_data = set(persistency.get_events_data().keys()) + + for event_id in config_ids: + if event_id not in events_seen: + logger.warning( + f"[{detector_name}] EventID {event_id!r} is configured but was " + "never observed in training data. Verify that EventIDs in your " + "config match those produced by the parser." + ) + elif event_id not in events_with_data: + logger.warning( + f"[{detector_name}] EventID {event_id!r} was observed in training " + "data but no configured variables were extracted. Verify that " + "variable names/positions in your config match those in the data." 
+ ) + + class CoreDetectorConfig(CoreConfig): - comp_type: str = "detectors" + component_type: str = "detectors" method_type: str = "core_detector" parser: str = "" auto_config: bool = True + events: EventsConfig | dict[str, Any] = {} + global_instances: Dict[str, _EventInstance] = {} class CoreDetector(CoreComponent): @@ -110,7 +153,7 @@ def __init__( super().__init__( name=name, - type_=config.comp_type, # type: ignore + type_=config.component_type, # type: ignore config=config, # type: ignore args_buffer=ArgsBuffer(mode=buffer_mode, size=buffer_size), input_schema=ParserSchema, @@ -156,3 +199,7 @@ def configure( @override def set_configuration(self) -> None: pass + + @override + def post_train(self) -> None: + pass diff --git a/src/detectmatelibrary/common/parser.py b/src/detectmatelibrary/common/parser.py index ca4e5c4..4bfc170 100644 --- a/src/detectmatelibrary/common/parser.py +++ b/src/detectmatelibrary/common/parser.py @@ -12,7 +12,7 @@ class CoreParserConfig(CoreConfig): - comp_type: str = "parsers" + component_type: str = "parsers" method_type: str = "core_parser" log_format: str | None = None diff --git a/src/detectmatelibrary/detectors/new_value_combo_detector.py b/src/detectmatelibrary/detectors/new_value_combo_detector.py index fe0c896..5f5a781 100644 --- a/src/detectmatelibrary/detectors/new_value_combo_detector.py +++ b/src/detectmatelibrary/detectors/new_value_combo_detector.py @@ -1,11 +1,12 @@ from detectmatelibrary.common._config import generate_detector_config -from detectmatelibrary.common._config._formats import EventsConfig, _EventInstance +from detectmatelibrary.common._config._formats import EventsConfig from detectmatelibrary.common.detector import ( CoreDetectorConfig, CoreDetector, get_configured_variables, - get_global_variables + get_global_variables, + validate_config_coverage, ) from detectmatelibrary.utils.data_buffer import BufferMode @@ -20,6 +21,9 @@ from typing import Any, Dict, Sequence, cast, Tuple from itertools import 
combinations +from typing_extensions import override +from tools.logging import logger + def get_combo(variables: Dict[str, Any]) -> Dict[Tuple[str, ...], Tuple[Any, ...]]: """Get a single combination of all variables as a key-value pair.""" @@ -53,9 +57,9 @@ def get_all_possible_combos( class NewValueComboDetectorConfig(CoreDetectorConfig): method_type: str = "new_value_combo_detector" - events: EventsConfig | dict[str, Any] = {} - global_instances: Dict[str, _EventInstance] = {} - comb_size: int = 2 + max_combo_size: int = 3 + use_stable_vars: bool = True + use_static_vars: bool = False class NewValueComboDetector(CoreDetector): @@ -147,6 +151,12 @@ def detect( return True return False + @override + def post_train(self) -> None: + config = cast(NewValueComboDetectorConfig, self.config) + if not config.auto_config: + validate_config_coverage(self.name, config.events, self.persistency) + def configure(self, input_: ParserSchema) -> None: # type: ignore """Configure the detector based on the stability of individual variables, then learn value combinations based on that @@ -162,7 +172,7 @@ def configure(self, input_: ParserSchema) -> None: # type: ignore named_variables=input_["logFormatVariables"], ) - def set_configuration(self, max_combo_size: int = 3) -> None: + def set_configuration(self, max_combo_size: int | None = None) -> None: """Set the detector configuration based on the stability of variable combinations. @@ -172,18 +182,18 @@ def set_configuration(self, max_combo_size: int = 3) -> None: 3. Re-ingest all events to learn the stability of these combos (testing all possible combos right away would explode combinatorially). 
""" + config = cast(NewValueComboDetectorConfig, self.config) # run WITH auto_conf_persistency variable_combos = {} for event_id, tracker in self.auto_conf_persistency.get_events_data().items(): - classified_vars = (tracker.get_variables_by_classification("STABLE") + # type: ignore - tracker.get_variables_by_classification("STATIC")) # type: ignore - if len(classified_vars) > 1: - variable_combos[event_id] = classified_vars + stable_vars = tracker.get_features_by_classification("STABLE") # type: ignore + if len(stable_vars) > 1: + variable_combos[event_id] = stable_vars config_dict = generate_detector_config( variable_selection=variable_combos, detector_name=self.name, method_type=self.config.method_type, - comb_size=max_combo_size + max_combo_size=max_combo_size or config.max_combo_size ) # Update the config object from the dictionary instead of replacing it self.config = NewValueComboDetectorConfig.from_dict(config_dict, self.name) @@ -200,15 +210,28 @@ def set_configuration(self, max_combo_size: int = 3) -> None: # rerun to set final config WITH auto_conf_persistency_combos combo_selection = {} for event_id, tracker in self.auto_conf_persistency_combos.get_events_data().items(): - stable_combos = tracker.get_variables_by_classification("STABLE") # type: ignore + stable_combos = [] + if self.config.use_stable_vars: + stable_combos = tracker.get_features_by_classification("STABLE") # type: ignore + static_combos = [] + if self.config.use_static_vars: + static_combos = tracker.get_features_by_classification("STATIC") # type: ignore + combos = stable_combos + static_combos # Keep combos as tuples - each will become a separate config entry - if len(stable_combos) >= 1: - combo_selection[event_id] = stable_combos + if len(combos) > 0: + combo_selection[event_id] = combos config_dict = generate_detector_config( variable_selection=combo_selection, detector_name=self.name, method_type=self.config.method_type, - comb_size=max_combo_size + max_combo_size=max_combo_size or 
self.config.max_combo_size ) # Update the config object from the dictionary instead of replacing it self.config = NewValueComboDetectorConfig.from_dict(config_dict, self.name) + events = self.config.events + if isinstance(events, EventsConfig) and not events.events: + logger.warning( + f"[{self.name}] auto_config=True generated an empty configuration. " + "No stable variable combinations were found in configure-phase data. " + "The detector will produce no alerts." + ) diff --git a/src/detectmatelibrary/detectors/new_value_detector.py b/src/detectmatelibrary/detectors/new_value_detector.py index bf9f1a3..b7a051f 100644 --- a/src/detectmatelibrary/detectors/new_value_detector.py +++ b/src/detectmatelibrary/detectors/new_value_detector.py @@ -1,11 +1,12 @@ from detectmatelibrary.common._config._compile import generate_detector_config -from detectmatelibrary.common._config._formats import EventsConfig, _EventInstance +from detectmatelibrary.common._config._formats import EventsConfig from detectmatelibrary.common.detector import ( CoreDetectorConfig, CoreDetector, get_configured_variables, - get_global_variables + get_global_variables, + validate_config_coverage, ) from detectmatelibrary.utils.persistency.event_data_structures.trackers.stability.stability_tracker import ( EventStabilityTracker @@ -16,14 +17,15 @@ from detectmatelibrary.schemas import ParserSchema, DetectorSchema from detectmatelibrary.constants import GLOBAL_EVENT_ID -from typing import Any, Dict +from typing_extensions import override +from tools.logging import logger class NewValueDetectorConfig(CoreDetectorConfig): method_type: str = "new_value_detector" - events: EventsConfig | dict[str, Any] = {} - global_instances: Dict[str, _EventInstance] = {} + use_stable_vars: bool = True + use_static_vars: bool = True class NewValueDetector(CoreDetector): @@ -115,12 +117,23 @@ def configure(self, input_: ParserSchema) -> None: # type: ignore named_variables=input_["logFormatVariables"], ) + @override + def 
post_train(self) -> None: + if not self.config.auto_config: + validate_config_coverage(self.name, self.config.events, self.persistency) + def set_configuration(self) -> None: variables = {} for event_id, tracker in self.auto_conf_persistency.get_events_data().items(): - classified_vars = (tracker.get_variables_by_classification("STABLE") + # type: ignore - tracker.get_variables_by_classification("STABLE")) # type: ignore - variables[event_id] = classified_vars + stable = [] + if self.config.use_stable_vars: + stable = tracker.get_features_by_classification("STABLE") # type: ignore + static = [] + if self.config.use_static_vars: + static = tracker.get_features_by_classification("STATIC") # type: ignore + vars_ = stable + static + if len(vars_) > 0: + variables[event_id] = vars_ config_dict = generate_detector_config( variable_selection=variables, detector_name=self.name, @@ -128,3 +141,10 @@ def set_configuration(self) -> None: ) # Update the config object from the dictionary instead of replacing it self.config = NewValueDetectorConfig.from_dict(config_dict, self.name) + events = self.config.events + if isinstance(events, EventsConfig) and not events.events: + logger.warning( + f"[{self.name}] auto_config=True generated an empty configuration. " + "No stable variables were found in configure-phase data. " + "The detector will produce no alerts." 
+ ) diff --git a/src/detectmatelibrary/parsers/logbatcher/__init__.py b/src/detectmatelibrary/parsers/logbatcher/__init__.py new file mode 100644 index 0000000..f3cfc57 --- /dev/null +++ b/src/detectmatelibrary/parsers/logbatcher/__init__.py @@ -0,0 +1,29 @@ +# MIT License +# +# Copyright (c) 2024 LogIntelligence +# +# Based on LogBatcher (https://github.com/LogIntelligence/LogBatcher) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+ +# import sys, os +# sys.path.append(os.path.join(os.getcwd(), "parsing", "parsers")) + +# flake8: noqa +from .parser import LogBatcherParserConfig, LogBatcherParser # noqa: F401 diff --git a/src/detectmatelibrary/parsers/logbatcher/engine/LICENSE b/src/detectmatelibrary/parsers/logbatcher/engine/LICENSE new file mode 100644 index 0000000..493952e --- /dev/null +++ b/src/detectmatelibrary/parsers/logbatcher/engine/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 LogIntelligence + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/src/detectmatelibrary/parsers/logbatcher/engine/README.md b/src/detectmatelibrary/parsers/logbatcher/engine/README.md new file mode 100644 index 0000000..feb62ad --- /dev/null +++ b/src/detectmatelibrary/parsers/logbatcher/engine/README.md @@ -0,0 +1,266 @@ +# LogBatcher +[![pypi package](https://img.shields.io/pypi/v/logbatcher.svg)](https://pypi.org/project/logbatcher/) +[![Build and test](https://github.com/LogIntelligence/LogBatcher/actions/workflows/build_and_test.yml/badge.svg)](https://github.com/LogIntelligence/LogBatcher/actions/workflows/build_and_test.yml) +[![Upload Python Package](https://github.com/LogIntelligence/LogBatcher/actions/workflows/python-publish.yml/badge.svg)](https://github.com/LogIntelligence/LogBatcher/actions/workflows/python-publish.yml) +[![Downloads](https://static.pepy.tech/badge/logbatcher)](https://pepy.tech/projects/logbatcher) + + +**LogBatcher** is a cost-effective LLM-based log parser that requires no training process or labeled data. This repository includes artifacts for reuse and reproduction of experimental results presented in our ASE'24 paper titled *"Demonstration-Free: Towards More Practical Log Parsing with Large Language Models"*. 
+ +## Work Flow +![workflow](outputs/figures/workflow.png) +LogBatcher contains three main components: **Partitioning, Caching and Batching - Querying** + + +**Table of Contents** + - [Setup](#setup) + - [Get start](#get-start) + - [Project Tree](#project-tree) + - [Usage](#usage) + - [Data format](#data-format) + - [Usage example](#usage-example) + - [Example Evaluation](#example-evaluation) + - [Benchmark](#benchmark) + - [Prepare datasets](#prepare-datasets) + - [Reproduce](#reproduce) + - [Benchmark Evaluation](#benchmark-evaluation) + + +## Setup + + +### Get start + +_To run at the local environment:_ + +Clone LogBatcher from GitHub +```bash +git clone https://github.com/LogIntelligence/LogBatcher.git && cd LogBatcher +``` + +The code is implemented in **Python >= 3.9**. To install the required packages, run the following command (conda is optional): +```bash +conda create -n logbatcher python==3.9 +conda activate logbatcher +pip install -r requirements.txt +``` + +Install LogBatcher from PyPI +```bash +pip install logbatcher +``` + +OR, Install LogBatcher from source +```bash +pip install -e . +``` + +Set your **API Key** in `config.json` + +If you lose access to a specific API version, please refer to the following: + +To ensure the long-term reusability of LogBatcher, we recommend using OpenAI's latest released models. For example, as indicated on [OpenAI](https://platform.openai.com/docs/deprecations), the GPT-3.5 series is soon to be deprecated, and it is recommended to switch to the newer gpt-4o-mini model. Additionally, we also support open-source LLMs as the base model. You can use the API provided by [Together AI](https://www.together.ai/) to replace LogBatcher's base model with their commercially available open-source models (such as Llama 3.1, etc.). 
+ +```json +"api_key_from_openai": "", +"api_key_from_together":"", +``` + +_To run with docker:_ + +Download the pre-installed docker image from our Zenodo repository, which also includes the source code, benchmarks and scripts. + +Zenodo repository DOI: [![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.13752709.svg)](https://doi.org/10.5281/zenodo.13752709) + +Run the following commands after downloading the pre-built Docker image: + +```bash +docker load -i logbatcher.tar +docker images +docker run -it logbatcher +``` + +Or you can build the docker image from the `Dockerfile` we provide: +```bash +docker build -t logbatcher . +docker images +docker run -it logbatcher +``` + +### Project Tree + +``` +📦LogBatcher + ┣ 📂datasets + ┃ ┣ 📂loghub-2k + ┃ ┃ ┣ 📂Android + ┃ ┃ ┃ ┣ 📜Android_2k.log + ┃ ┃ ┃ ┣ 📜Android_2k.log_structured.csv + ┃ ┃ ┃ ┣ 📜Android_2k.log_templates.csv + ┃ ┃ ┃ ┣ 📜Android_2k.log_structured_corrected.csv + ┃ ┃ ┃ ┗ 📜Android_2k.log_templates_corrected.csv + ┃ ┃ ┣ ... + ┃ ┗ 📂loghub-2.0 + ┣ 📂evaluation + ┃ ┣ 📂utils + ┃ ┣ 📜logbatcher_eval.py + ┃ ┗ 📜settings.py + ┣ 📂logbatcher + ┃ ┣ 📜additional_cluster.py + ┃ ┣ 📜cluster.py + ┃ ┣ 📜parser.py + ┃ ┣ 📜matching.py + ┃ ┣ 📜parsing_base.py + ┃ ┣ 📜postprocess.py + ┃ ┣ 📜sample.py + ┃ ┗ 📜util.py + ┣ 📂outputs + ┃ ┣ 📂figures + ┃ ┗ 📂parser + ┣ 📜README.md + ┣ 📜benchmark.py + ┣ 📜config.json + ┣ 📜requirements.txt + ┗ 📜demo.py +``` + +## Usage + +### Data format + +LogBatcher mainly takes **a raw log file** (in plain text format) as input and outputs the **parsed log file** (in CSV format). A **raw log file** is a log file with each line representing a complete log. + +Following the data format from [LOGPAI](https://github.com/logpai/loghub), the data can also be a **structured log file**. A **structured log file** is a CSV file that includes at least the `LineID` and `Content` columns for parsing, with optional `EventID` and `EventTemplate` columns for evaluation. 
+ +### Usage example + +We provide a usage example for more convenient reuse, which is presented as follows. The usage example can be found in file `demo.py`. The example provides a test on a specific dataset **Apache** from [LOGPAI](https://github.com/logpai/loghub). If you want to evaluate LogBatcher on your own dataset, please replace the arguments `file_name` and `dataset_format` with your own raw log file path to load log data and the corresponding dataset format to extract the contents. Run `python demo.py` and find the results in `outputs/parser/test` folder. + +```python +import json +from logbatcher.parsing_base import single_dataset_paring +from logbatcher.parser import Parser +from logbatcher.util import data_loader + +# load api key, dataset format and parser +model, dataset, folder_name ='gpt-3.5-turbo-0125', 'Apache', 'test' +config = json.load(open('config.json', 'r')) +parser = Parser(model, folder_name, config) + +# load contents from raw log file, structured log file or content list +contents = data_loader( + file_name=f"datasets/loghub-2k/{dataset}/{dataset}_2k.log", + dataset_format= config['datasets_format'][dataset], + file_format ='raw' +) + +# parse logs +single_dataset_paring( + dataset=dataset, + contents=contents, + output_dir= f'outputs/parser/{folder_name}/', + parser=parser, + debug=False +) +``` + +
+Expected output + +``` +python demo.py +Parsing 2000 logs in dataset Apache... +100%|██████████████████████████████████| 2000/2000 [00:04<00:00, 420.55log/s] +parsing time: 4.756490230560303 +idetified templates: 6 +``` +
+ +### Example Evaluation + +To evaluate the output of the usage example, run the following command +```bash +cd evaluation && python logbatcher_eval.py --config test --dataset Apache +``` + +
+Expected output + + +``` +Calculating Edit Distance.... +100%|███████████████████████████████████████████████████████████| 2000/2000 [00:00<00:00, 4029110.47it/s] +Normalized_Edit_distance (NED): 1.0000, ED: 0.0000, +Grouping Accuracy calculation done. [Time taken: 0.002] +Start compute grouping accuracy +100%|███████████████████████████████████████████████████████████| 6/6 [00:00<00:00, 2084.64it/s] +Grouping_Accuracy (GA): 1.0000, FGA: 1.0000, +Grouping Accuracy calculation done. [Time taken: 0.006] +Parsing_Accuracy (PA): 1.0000 +Parsing Accuracy calculation done. [Time taken: 0.001] +100%|███████████████████████████████████████████████████████████| 6/6 [00:00<00:00, 10677.06it/s] +PTA: 1.0000, RTA: 1.0000 FTA: 1.0000 +Identify : 6, Groundtruth : 6 +Template-level accuracy calculation done. [Time taken: 0.003] +``` +
+ +The results of evaluation metrics can be found in `outputs/parser/test` folder + +## Benchmark + +### Prepare datasets + +We have already provided _loghub-2k_ datasets in `datasets/loghub-2k` folder. + +If you want to benchmark on _Loghub-2.0_ datasets, please run `datasets/loghub-2.0/download.sh` or download the datasets: + + +1. Datasets DOI: [![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.8275861.svg)](https://doi.org/10.5281/zenodo.8275861) +2. Datasets Homepage: [Loghub-2.0](https://zenodo.org/records/8275861) + +### Reproduce + +To benchmark on all datasets in loghub-2k or loghub-2.0, you can run the following command: +```bash +python benchmark.py --data_type [DATATYPE] --model [MODEL] --batch_size [BATCHSIZE] --chunk_size [CHUNKSIZE] --sampling_method [SAMPLINGMETHOD] +``` + +The description of the arguments can be found in `benchmark.py` or below: + +```bash +--data_type + Datasets type, Options: ['2k', 'full'], default: '2k'. +--model + the large language model used in LogBatcher, default: 'gpt-3.5-turbo-0125'. +--batch_size + size of a batch query, default: 10. +--chunk_size + size of a log chunk, default: 2000. +--clustering_method + clustering method used in the partitioning stage, Options: ['dbscan', 'meanshift', 'hierarchical'], default: 'dbscan'. +--sampling_method + sampling method used in the batching stage, Options: ['dpp', 'similar', 'random'], default: 'dpp'. +``` + +### Benchmark Evaluation + +To evaluate the output of benchmark, run the following command +```bash +cd evaluation && python logbatcher_eval.py --config logbatcher_2k +``` + + +The expected results will be similar to those presented in the paper; see also [experimental_results](docs/experimental_results.md). 
+ + +The description of the arguments: + +```bash +--config + The folder name of the outputs, Options: ['test', 'logbatcher_2k', 'logbatcher_full'] +--data_type + Datasets type, Options: ['2k', 'full'], default: '2k' +--dataset + To evaluate on a single dataset, default: 'null'. +``` diff --git a/src/detectmatelibrary/parsers/logbatcher/engine/__init__.py b/src/detectmatelibrary/parsers/logbatcher/engine/__init__.py new file mode 100644 index 0000000..02e47b9 --- /dev/null +++ b/src/detectmatelibrary/parsers/logbatcher/engine/__init__.py @@ -0,0 +1,5 @@ +# MIT License +# +# Copyright (c) 2024 LogIntelligence +# +# Based on LogBatcher (https://github.com/LogIntelligence/LogBatcher) diff --git a/src/detectmatelibrary/parsers/logbatcher/engine/additional_cluster.py b/src/detectmatelibrary/parsers/logbatcher/engine/additional_cluster.py new file mode 100644 index 0000000..af6d61d --- /dev/null +++ b/src/detectmatelibrary/parsers/logbatcher/engine/additional_cluster.py @@ -0,0 +1,185 @@ +# MIT License +# +# Copyright (c) 2024 LogIntelligence +# +# Based on LogBatcher (https://github.com/LogIntelligence/LogBatcher) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +from __future__ import annotations + +import re +import heapq +from collections import Counter +from typing import Any, Dict, FrozenSet, List, Optional, Tuple + +import time +import calendar +import random +import os +from sklearn.cluster import MeanShift +from sklearn.feature_extraction.text import TfidfVectorizer + + + +class Vocab: + def __init__(self, stopwords: Optional[List[str]] = None) -> None: + if stopwords is None: + stopwords = ["<*>"] + stopwords = [ + "a", + "an", + "and", + "i", + "ie", + "so", + "to", + "the", + + ] + list(calendar.day_name) + list(calendar.day_abbr) \ + + list(calendar.month_name) + list(calendar.month_abbr) + self.token_counter: Counter[str] = Counter() + self.stopwords: FrozenSet[str] = frozenset(set(stopwords)) + #print(self.__filter_stopwords(['LDAP', 'Built', 'with'])) + + def build(self, sequences: List[List[str]]) -> None: + print("Build vocab with examples: ", len(sequences)) + for sequence in sequences: + sequence = self.__filter_stopwords(sequence) + #print(sequence) + self.update(sequence) + + def update(self, sequence: List[str]) -> None: + sequence = self.__filter_stopwords(sequence) + self.token_counter.update(sequence) + + def topk_tokens(self, sequence: List[str], topk: int = 3) -> Tuple[str, ...]: + sequence = self.__filter_stopwords(sequence) + token_count = [(token, self.token_counter[token]) for token in set(sequence)] + topk_tuples = heapq.nlargest(topk, token_count, key=lambda x: x[1]) + topk_keys = tuple([t[0] for t in topk_tuples]) + return topk_keys + + def __len__(self) -> int: + return len(self.token_counter) + + def __filter_stopwords(self, sequence: List[str]) -> List[str]: + return [ + token + for token in sequence + if (len(token) > 
def clean(s: str) -> Tuple[str, str]:
    """Normalise a raw log line for hierarchical clustering.

    Args:
        s: Raw log message.

    Returns:
        A pair ``(text, signature)``:

        * ``text`` is ``s`` with the listed punctuation replaced by spaces
          and every whitespace-separated word containing a digit removed.
        * ``signature`` is the sorted set of characters that remain in ``s``
          once all runs of ASCII letters, digits, commas and spaces are
          stripped; callers use it as the fine-grained "log format" key.
    """
    log_format = re.sub(r'[0-9A-Za-z, ]+', '', s)
    sorted_string = ''.join(sorted(set(log_format)))
    # ``\$`` is escaped on purpose.  Unescaped, ``$`` is the end-of-string
    # anchor: it only appended a space that was stripped again, so literal
    # dollar signs were never treated as separators.
    s = re.sub(r':|\(|\)|=|,|"|\{|\}|@|\$|\[|\]|\||;|\.?!', ' ', s)
    s = " ".join(word for word in s.split() if not re.search(r'\d', word))
    return s, sorted_string


def h_clustering(
    contents: Dict[int, Tuple[str, str]],
) -> Tuple[Dict[Tuple[str, ...], Dict[str, Any]], int, int]:
    """Group cleaned logs first by frequent tokens, then by format signature.

    Args:
        contents: Maps a log id to the ``(text, signature)`` pair produced
            by ``clean``.

    Returns:
        ``(clusters, coarse_count, fine_count)`` where ``clusters`` maps the
        sorted tuple returned by ``vocab.topk_tokens(tokens, 3)`` to
        ``{"size": <number of logs>, "cluster": {signature: [log ids]}}``,
        ``coarse_count`` is the number of such keys and ``fine_count`` the
        total number of signatures over all keys.
    """
    # NOTE(review): ``Vocab`` is defined outside the visible part of the file;
    # ``build`` presumably collects token statistics - confirm there.
    vocab = Vocab()
    vocab.build([text.split() for text, _ in contents.values()])

    hierichical_clusters: Dict[Tuple[str, ...], Dict[str, Any]] = {}
    for log_id, (text, log_format) in contents.items():
        frequent_token = tuple(sorted(vocab.topk_tokens(text.split(), 3)))
        group = hierichical_clusters.setdefault(frequent_token, {"size": 0, "cluster": {}})
        group["size"] += 1
        group["cluster"].setdefault(log_format, []).append(log_id)

    total_coarse_clusters = len(hierichical_clusters)
    print("Number of coarse-grained clusters: ", total_coarse_clusters)
    total_fine_clusters = sum(len(group["cluster"]) for group in hierichical_clusters.values())
    print("Number of fine-grained clusters: ", total_fine_clusters)
    return hierichical_clusters, total_coarse_clusters, total_fine_clusters


def assign_labels(
    clusters: Dict[Tuple[str, ...], Dict[str, Any]], logs: List[str], granularity: str = "coarse"
) -> List[int]:
    """Turn the nested cluster mapping into one integer label per log.

    Args:
        clusters: Mapping produced by ``h_clustering``.
        logs: The original logs; only their count is used.
        granularity: ``"coarse"`` gives one label per frequent-token group,
            ``"fine"`` one label per format signature inside each group.

    Returns:
        A list as long as ``logs``.  Logs that appear in no cluster keep the
        label ``-1``.

    Raises:
        ValueError: If ``granularity`` is neither ``"coarse"`` nor ``"fine"``
            (previously this silently produced all ``-1`` labels).
    """
    if granularity not in ("coarse", "fine"):
        raise ValueError("Invalid granularity")

    labels = [-1] * len(logs)
    cluster_id = 0
    for cluster_info in clusters.values():
        for log_ids in cluster_info["cluster"].values():
            for log_id in log_ids:
                labels[log_id] = cluster_id
            if granularity == "fine":
                # every format signature is its own cluster
                cluster_id += 1
        if granularity == "coarse":
            # all signatures of one frequent-token group share a cluster
            cluster_id += 1
    return labels


def hierichical_clustering(
    logs: List[str], granularity: str = "fine"
) -> Tuple[List[int], int]:
    """Cluster ``logs`` hierarchically and return ``(labels, cluster_count)``.

    Logs whose cleaned text has at most one token are not passed to
    ``h_clustering``.  They used to keep the label ``-1``, which callers that
    index a cluster list by label interpret as "last cluster".  Such logs now
    receive fresh cluster ids (identical raw logs share one id) and are
    included in the returned count, so every label is in
    ``range(cluster_count)``.

    Args:
        logs: Raw log messages.
        granularity: ``"coarse"`` or ``"fine"``; see ``assign_labels``.
    """
    contents = {}
    for i, raw in enumerate(logs):
        text, signature = clean(raw)
        if len(text.split()) > 1:
            contents[i] = (text, signature)

    clusters, coarse_count, fine_count = h_clustering(contents)
    labels = assign_labels(clusters, logs, granularity)
    cluster_count = coarse_count if granularity == "coarse" else fine_count

    leftover_ids: Dict[str, int] = {}
    for i, label in enumerate(labels):
        if label != -1:
            continue
        if logs[i] not in leftover_ids:
            leftover_ids[logs[i]] = cluster_count
            cluster_count += 1
        labels[i] = leftover_ids[logs[i]]
    return labels, cluster_count


def replace_numbers_with_zero(text: str) -> str:
    """Replace every integer or decimal number in ``text`` with ``0``."""
    return re.sub(r'\d+(\.\d+)?', '0', text)


def meanshift_clustering(logs: List[str]) -> Tuple[List[int], int]:
    """Cluster ``logs`` with Mean Shift over TF-IDF vectors.

    Numbers are replaced by ``0`` before vectorisation.

    Returns:
        ``(labels, cluster_count)`` with ``cluster_count == max(labels) + 1``.
    """
    text_column = [replace_numbers_with_zero(log) for log in logs]

    # Text preprocessing and vectorization (dense array, as MeanShift is
    # fitted on the result of ``toarray()``)
    data_matrix = TfidfVectorizer().fit_transform(text_column).toarray()

    # Mean Shift clustering
    labels = MeanShift(bandwidth=0.5).fit_predict(data_matrix).tolist()
    return labels, max(labels) + 1
to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
class Cluster:
    """A group of similar log lines plus the batch sampled from them."""

    def __init__(self) -> None:
        # every log line added to the cluster, in insertion order
        self.logs: List[str] = []
        # de-duplicated (and possibly sub-sampled) view of ``logs``
        self.batch_logs: List[str] = []
        # caller-supplied index of each entry in ``logs``
        self.indexs: List[int] = []
        # incremented by ``append_log`` only; it is NOT refreshed when
        # ``logs`` is reassigned from outside
        self.size: int = 0
        # first element of ``batch_logs``, set by ``batching``
        self.sample_log: str = ''

    def append_log(self, log: str, index: int) -> None:
        """Add ``log`` with its original position ``index`` to the cluster."""
        self.logs.append(log)
        self.indexs.append(index)
        self.size += 1

    def _pick_indices(self, matrix: np.ndarray, batch_size: int, sample_method: str) -> List[int]:
        """Choose ``batch_size`` positions of ``batch_logs`` using ``sample_method``.

        Shared by ``varaible_sampling`` and ``sample``.

        Raises:
            ValueError: If ``sample_method`` is not ``"dpp"``, ``"random"``
                or ``"similar"``.
        """
        if sample_method == "dpp":
            return dpp_sample(cosine_similarity(matrix), batch_size)
        if sample_method == "random":
            # A private generator seeded with 0 draws the same sample as
            # ``random.seed(0); random.sample(...)`` without resetting the
            # process-wide random state as a side effect.
            return random.Random(0).sample(range(len(self.batch_logs)), batch_size)
        if sample_method == "similar":
            return group_samples_clustering(matrix, batch_size)[0]
        raise ValueError("Invalid sample method")

    def varaible_sampling(self, batch_size: int = 5, sample_method: str = "dpp") -> None:
        """Reduce ``logs`` (historical variable values) to at most ``batch_size`` entries.

        Duplicates are dropped first.  When more than ``batch_size`` values
        remain, each value is reduced to its "shape" (digits -> ``0``, ASCII
        letters -> ``a``), the shapes are TF-IDF vectorised and
        ``sample_method`` picks the survivors.

        Raises:
            ValueError: If vectorisation fails or ``sample_method`` is unknown.
        """
        self.batch_logs = list(OrderedDict.fromkeys(self.logs))  # remove duplicates
        if len(self.batch_logs) <= batch_size:
            # Nothing to sample.  Returning before vectorising matters: the
            # vectoriser can reject short inputs (e.g. single-character
            # shapes yield an empty vocabulary), which used to raise even
            # though no sampling was required.
            return

        def _replacer(match: re.Match[str]) -> str:
            char = match.group()
            return '0' if char.isdigit() else 'a'

        shapes = [re.sub(r'[0-9a-zA-Z]', _replacer, value) for value in self.batch_logs]
        try:
            tfidf_matrix = TfidfVectorizer().fit_transform(shapes).toarray()
        except Exception as e:
            raise ValueError(f"Error during TF-IDF vectorization of {shapes!r}") from e

        chosen = self._pick_indices(tfidf_matrix, batch_size, sample_method)
        self.batch_logs = [self.batch_logs[i] for i in chosen]

    def batching(self, batch_size: int = 10, min_size: int = 3, sample_method: str = "dpp") -> None:
        """Build ``batch_logs`` and ``sample_log`` from ``logs``.

        Duplicates are removed, the batch is sampled down to ``batch_size``
        when larger, and when ``not_varibility`` reports true for the batch
        it is truncated to ``min_size`` entries.  An empty cluster is left
        with an empty batch instead of raising ``IndexError``.
        """
        self.batch_logs = list(OrderedDict.fromkeys(self.logs))  # remove duplicates
        if not self.batch_logs:
            return
        if len(self.batch_logs) > batch_size:
            self.sample(batch_size, sample_method)
        if isinstance(self.batch_logs, str):
            self.batch_logs = [self.batch_logs]
        self.sample_log = self.batch_logs[0]
        if not_varibility(self.batch_logs) and len(self.batch_logs) > min_size:
            self.batch_logs = self.batch_logs[:min_size]

    def sample(self, batch_size: int, sample_method: str) -> None:
        """Replace ``batch_logs`` by ``batch_size`` entries picked with ``sample_method``.

        Raises:
            ValueError: If ``sample_method`` is unknown.
        """
        # vectorize logs
        tfidf_matrix = TfidfVectorizer().fit_transform(self.batch_logs).toarray()
        chosen = self._pick_indices(tfidf_matrix, batch_size, sample_method)
        self.batch_logs = [self.batch_logs[i] for i in chosen]


def tokenize(log_content: str, tokenize_pattern: str = r'[ ,|]', removeDight: bool = True) -> List[str]:
    """Split a log line into the tokens used for clustering.

    Args:
        log_content: Raw log message.
        tokenize_pattern: Regex the message is split on.
        removeDight: When true, words containing a digit are dropped.

    Returns:
        The kept tokens.  ``key=value`` words contribute only ``key`` (words
        with more than one ``=`` are dropped), words containing ``/`` and
        one-letter flags such as ``a+`` / ``-x`` are dropped, and
        parenthesised parts are removed from the remaining words.  If nothing
        survives, the result is the whole message with numbers replaced by
        ``0`` as a single token.
    """
    new_words = []
    for word in re.split(tokenize_pattern, log_content):
        if '=' in word:
            ws = word.split('=')
            # more than one '=' might be some parameters of a URL: skip
            if len(ws) <= 2:
                new_words.append(ws[0])
        elif removeDight and re.search(r'\d', word):
            continue
        elif '/' in word or re.match(r"^[a-zA-Z][+-]$|^[+-][a-zA-Z]$", word):
            continue
        else:
            new_words.append(re.sub(r"\([^)]*\)", "", word))
    new_words = [word for word in new_words if word]  # remove empty tokens
    if not new_words:
        new_words.append(re.sub(r'\d+(\.\d+)?', '0', log_content))
    return new_words


def vectorize(tokenized_logs: List[List[str]]) -> spmatrix:
    """TF-IDF vectorise pre-tokenised logs (tokens are used unchanged)."""
    vectorizer = TfidfVectorizer(tokenizer=lambda x: x, lowercase=False, token_pattern=None)
    return vectorizer.fit_transform(tokenized_logs)


def cluster(vectorized_logs: spmatrix, eps: float = 0.5) -> Tuple[np.ndarray, int]:
    """Run DBSCAN (``min_samples=5``) and return ``(labels, cluster_count)``.

    ``cluster_count`` is ``max(labels) + 1`` as a plain ``int``; noise points
    keep the label ``-1``.
    """
    model = DBSCAN(eps=eps, min_samples=5)
    model.fit(vectorized_logs)
    labels = model.labels_
    return labels, int(max(labels)) + 1


def reassign_clusters(
    labels: np.ndarray, cluster_nums: int, tokenized_logs: List[List[str]]
) -> Tuple[np.ndarray, int]:
    """Give every noise point (label ``-1``) a real cluster id.

    Noise logs with identical token sequences share one new cluster; each new
    cluster takes the next free id, in order of first occurrence.  ``labels``
    is modified in place.

    Returns:
        ``(labels, cluster_nums)`` with ``cluster_nums`` increased by the
        number of clusters created.
    """
    # One dictionary lookup per noise point replaces the pairwise scan over
    # all later noise points and yields the same labels.
    new_ids = {}
    for i in range(len(labels)):
        if labels[i] != -1:
            continue
        merged_log = ' '.join(tokenized_logs[i])
        if merged_log not in new_ids:
            new_ids[merged_log] = cluster_nums
            cluster_nums += 1
        labels[i] = new_ids[merged_log]
    return labels, cluster_nums


def process_new_cluster(
    new_cluster: Cluster, clusters: List[Optional[Cluster]], batch_size: int, min_size: int = 3
) -> int:
    """Batch ``new_cluster`` and append it to ``clusters`` unless it is empty.

    Returns:
        ``1`` if the cluster was appended, ``0`` otherwise.
    """
    if new_cluster.size == 0:
        return 0
    new_cluster.batching(batch_size, min_size)
    clusters.append(new_cluster)
    return 1
class TimeoutException(Exception):
    """Raised by ``timeout_handler`` when the regex time budget is exhausted."""


def timeout_handler(_signum: int, _frame: Optional[FrameType]) -> None:
    """``SIGALRM`` handler that aborts the running search."""
    raise TimeoutException()


def safe_search(pattern: str, string: str, timeout: float = 0.5) -> Optional[re.Match[str]]:
    """Run ``re.search`` with a wall-clock time limit.

    Args:
        pattern: Regular expression to search for.
        string: Text to search.
        timeout: Limit in seconds; fractions are allowed.  A value ``<= 0``
            disables the limit.

    Returns:
        The match object, or ``None`` if nothing matched or the limit was hit.

    The limit is armed with ``signal.setitimer`` because ``signal.alarm``
    only accepts whole seconds, so the default of ``0.5`` raised
    ``TypeError``.  Where ``SIGALRM`` cannot be used (platforms without it,
    or a call from a non-main thread) the search runs without a limit.
    """
    if timeout <= 0 or not hasattr(signal, "SIGALRM"):
        return re.search(pattern, string)
    try:
        signal.signal(signal.SIGALRM, timeout_handler)
    except ValueError:
        # signal handlers can only be installed from the main thread
        return re.search(pattern, string)

    signal.setitimer(signal.ITIMER_REAL, timeout)
    try:
        result = re.search(pattern, string)
    except TimeoutException:
        result = None
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # disarm the timer
    return result


def extract_variables(log: str, template: str) -> Optional[Tuple[str, ...]]:
    """Match ``log`` against ``template`` and return the wildcard values.

    Whitespace runs in ``log`` are collapsed to one space first.  Every
    ``<*>`` in ``template`` becomes a non-greedy capture group, everything
    else is matched literally, and the whole log must match.

    Returns:
        One string per ``<*>``, or ``None`` if the log does not match (or the
        one-second search limit was exceeded).
    """
    log = re.sub(r'\s+', ' ', log.strip())
    literal_parts = [re.escape(part) for part in template.split("<*>")]
    regex = "^" + "(.*?)".join(literal_parts) + "$"
    matches = safe_search(regex, log, 1)
    return matches.groups() if matches else None


def matches_template(log: str, cached_pair: Tuple[str, str]) -> Optional[str]:
    """Check ``log`` against a cached ``(reference_log, template)`` pair.

    Returns:
        ``None`` when the word counts of ``log`` and the reference log differ
        by more than one or the template does not match.  Otherwise the
        template, with every ``<*>`` whose captured value is empty removed.
    """
    reference_log, template = cached_pair[0], cached_pair[1]

    # length matters
    if abs(len(log.split()) - len(reference_log.split())) > 1:
        return None

    try:
        groups = extract_variables(log, template)
    except Exception:
        # best effort: any failure while matching counts as "no match"
        groups = None
    if groups is None:
        return None

    # consider the case where the variable is empty
    parts = []
    for index, part in enumerate(template.split("<*>")):
        parts.append(part)
        if index < len(groups) and groups[index] != '':
            parts.append('<*>')
    return ''.join(parts)


def prune_from_cluster(template: str, cluster: Cluster) -> Tuple[Cluster, Cluster]:
    """Move the logs that do not match ``template`` into a new cluster.

    Returns:
        ``(cluster, new_cluster)``.  ``new_cluster`` holds the non-matching
        logs with their indexes.  When it is non-empty, ``cluster.logs`` and
        ``cluster.indexs`` are replaced by the matching entries;
        ``cluster.size`` is left untouched.
    """
    new_cluster = Cluster()
    kept_logs, kept_indexs = [], []
    # A single pass splits the cluster; each log is matched exactly once.
    for log, index in zip(cluster.logs, cluster.indexs):
        if extract_variables(log, template) is None:
            new_cluster.append_log(log, index)
        else:
            kept_logs.append(log)
            kept_indexs.append(index)
    if new_cluster.size != 0:
        cluster.logs = kept_logs
        cluster.indexs = kept_indexs
    return cluster, new_cluster
class Parser:
    """LLM-backed template extractor for clusters of log messages."""

    def __init__(self, model: str, theme: str, config: Dict[str, str]) -> None:
        """Store the model settings and create the OpenAI client.

        Args:
            model: Model name passed to the chat-completions call.
            theme: Stored on the instance; not read elsewhere in this class.
            config: Must contain ``'api_key'``; an optional ``'base_url'``
                (empty values are treated as unset) selects the endpoint.

        Raises:
            KeyError: If ``config`` has no ``'api_key'`` entry.
        """

        self.model: str = model
        self.theme: str = theme
        self.dataset: str = 'null'
        # [number of successful LLM calls, accumulated message-token count]
        self.token_list: List[int] = [0, 0]
        # seconds spent in successful LLM calls
        self.time_consumption_llm: float = 0
        self.api_key = config['api_key']
        base_url = config.get('base_url') or None
        self.client = OpenAI(
            api_key=self.api_key,
            base_url=base_url,
        )

    # Retried up to 10 times with randomised exponential back-off (1-8 s)
    # whenever the call raises.
    @retry(wait=wait_random_exponential(min=1, max=8), stop=stop_after_attempt(10))
    def chat(self, messages: List[Dict[str, str]]) -> str:
        """Send ``messages`` to the model and return the first choice's text.

        Leading and trailing newlines are stripped from the answer.
        """
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=0.05,
        )
        return response.choices[0].message.content.strip('\n')

    def get_responce(self, cluster: Cluster, cache_base: ParsingCache) -> Tuple[str, Cluster, Cluster]:
        """Determine the template for ``cluster``, consulting the cache first.

        Args:
            cluster: Cluster to parse; its ``logs``/``indexs`` may be replaced.
            cache_base: Cache queried via ``match_event`` and read for
                ``variable_candidates``.

        Returns:
            ``(template, cluster, new_cluster)`` where ``new_cluster`` holds
            the logs pruned from ``cluster`` because they do not match
            ``template`` (empty if none were pruned).
        """

        # initialize
        logs = cluster.batch_logs
        sample_log = cluster.sample_log

        # Matching and Pruning
        new_cluster = Cluster()
        for log in cluster.logs:
            template, _, _ = cache_base.match_event(log)
            if template != "NoMatch":
                cluster, new_cluster = prune_from_cluster(
                    template, cluster)
                # ``cluster.size`` still holds the count from before pruning
                # (the size comparisons below rely on that): a smaller
                # ``new_cluster`` means at least one log matched the cached
                # template, so that template is returned right away.
                if new_cluster.size >= 0 and new_cluster.size < cluster.size:
                    return template, cluster, new_cluster
                # Every log was pruned: put them back and keep looking.
                elif new_cluster.size == cluster.size:
                    cluster.logs, cluster.indexs = new_cluster.logs, new_cluster.indexs
                    new_cluster = Cluster()

        # historical variables
        variable_cluster = Cluster()
        variable_cluster.logs = cache_base.variable_candidates
        if variable_cluster.logs != []:
            variable_cluster.varaible_sampling(5)
        variables = variable_cluster.batch_logs

        variable_prompt = f' Historical variables: {variables}.' if variables != [] else ''
        # Plain (non-f) string, so ``{{placeholders}}`` reaches the model verbatim.
        instruction = "You will be provided with some log messages separated by line break. You must abstract variables with `{{placeholders}}` to extract the corresponding template. The variable type in log messages can be any of the following: ['url', 'IPv4_port', 'host_port', 'package_host', 'IPv6', 'Mac_address', 'time', 'path', 'id', 'date', 'duration', 'size', 'numerical', 'weekday_months', 'user_name']." + variable_prompt + " Constant text and strings should not be recognized as variables.\nPrint the input log's template delimited by backticks."

        # invoke LLM
        messages = [
            {"role": "system", "content": instruction},
            {"role": "user", "content": '\n'.join(f'Log[{i+1}]: `{log}`' for i, log in enumerate(logs))}
        ]
        try:
            t0 = time.time()
            answer = self.chat(messages)
            # print(messages)
            # print(answer)
            self.token_list[0] += 1
            self.token_list[1] += count_message_tokens(messages, self.model)
            self.time_consumption_llm += (time.time() - t0)
        except Exception as e:
            # Fall back to the sample log itself when the LLM call fails.
            logger.error(f"invoke LLM error: {e}")
            answer = sample_log

        template = post_process(answer)
        if not verify_template(template):
            template = correct_single_template(sample_log)

        cluster, new_cluster = prune_from_cluster(template, cluster)
        # The template matched none of the logs: restore them and derive the
        # template from the sample log instead.
        if new_cluster.size == cluster.size:
            cluster.logs, cluster.indexs = new_cluster.logs, new_cluster.indexs
            new_cluster = Cluster()
            template = correct_single_template(sample_log)
        return template, cluster, new_cluster
+# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +# Changes from original (parsing_base_old.py): +# - Returns a result dict (logs_df, templates_df, cache, metrics, template_samples) +# instead of writing CSV/JSON files directly to disk. +# - Replaced print() calls with structured logger (tools.logging.logger). +# - Made `cache` an optional parameter to support reuse across calls. +# - Added _extract_template_samples() helper to extract template→sample-log mappings. +# - Default chunk_size raised from 10 000 to 30 000. + +import time +import pandas as pd +from collections import Counter +from typing import List, Dict, Any, Optional +from tqdm import tqdm +from tools.logging import logger +from .vars import vars_update +from .cluster import Cluster,tokenize, vectorize, cluster, reassign_clusters, process_new_cluster +from .additional_cluster import hierichical_clustering,meanshift_clustering +from .util import verify_template +from .parsing_cache import ParsingCache + +def _extract_template_samples(cache: ParsingCache) -> Dict[str, str]: + """Extract template to sample log mapping from cache. 
+ + Args: + cache: ParsingCache instance containing template_tree + + Returns: + Dictionary mapping template strings to their sample logs + """ + template_samples = {} + + def traverse_tree(node): + """Recursively traverse template tree to find all templates.""" + for key, value in node.items(): + if isinstance(value, tuple): + # Tuple structure: (stat_len, wildcard_count, template, template_id, refer_log) + template = value[2] # event_template + refer_log = value[4] # sample log + template_samples[template] = refer_log + elif isinstance(value, dict): + traverse_tree(value) + + traverse_tree(cache.template_tree) + return template_samples + +def single_dataset_parsing( + dataset: str, + contents: List[str], + parser: Any, + cache: Optional[ParsingCache] = None, + batch_size: int = 10, # number of logs that can be sent to LLM at once + chunk_size: int = 30000, + clustering_method: str = 'dbscan', + debug: bool = True +) -> Dict[str, Any]: + """Parse logs using clustering and LLM-based template extraction. 
+ + Args: + dataset: Name of the dataset being parsed + contents: List of log messages to parse + parser: Parser object with get_responce method + cache: Optional ParsingCache instance for template caching + batch_size: Size of batches for processing clusters + chunk_size: Number of logs to process in each chunk + clustering_method: Method for clustering ('dbscan', 'hierarchical', or 'meanshift') + debug: Enable debug logging + + Returns: + Dictionary containing: + - logs_df: DataFrame with Content and EventTemplate columns + - templates_df: DataFrame with EventId, EventTemplate, and Occurrence columns + - cache: Updated ParsingCache instance + - metrics: Dictionary with parsing statistics + - template_samples: Dictionary mapping templates to sample logs + """ + if cache is None: + cache = ParsingCache() + + logs = contents + log_chunk: List[str] = [] + log_chunk_index: List[int] = [] + + logger.info(f'Parsing {len(logs)} logs in dataset {dataset}...') + + outputs: List[Optional[str]] = [None for _ in range(len(logs))] + outputs_index: List[Optional[int]] = [None for _ in range(len(logs))] + + # Parsing + t1 = time.time() + iterable = tqdm(enumerate(logs), total=len(logs), unit="log") + for index, log in iterable: + + match_results = cache.match_event(log) + if match_results[0] != "NoMatch": + # outputs[index] = match_results[0] + outputs_index[index] = match_results[1] + else: + log_chunk.append(log) + log_chunk_index.append(index) + + + # Parsing with LLM + if len(log_chunk) == chunk_size or (len(log_chunk)!=0 and index == len(logs) - 1): + # parsing start + if debug: + logger.debug(f'Parsing {len(log_chunk)} logs...') + if clustering_method == 'dbscan': + # tokenize -> vectorize -> cluster -> reassign_clusters + tokenized_logs = [tokenize(log) for log in log_chunk] + labels, cluster_nums = cluster(vectorize(tokenized_logs)) + labels, cluster_nums = reassign_clusters(labels, cluster_nums, tokenized_logs) + elif clustering_method == 'hierarchical': + labels, 
cluster_nums = hierichical_clustering(log_chunk) + elif clustering_method == 'meanshift': + labels, cluster_nums = meanshift_clustering(log_chunk) + else: + raise ValueError('Invalid clustering method') + + # create clusters + clusters: List[Optional[Cluster]] = [None for _ in range(cluster_nums)] + for i, label in enumerate(labels): + if clusters[label] is None: + clusters[label] = Cluster() + clusters[label].append_log(log_chunk[i], log_chunk_index[i]) + + # sorting + clusters = sorted(clusters, key=lambda cluster: len(cluster.logs), reverse=True) + + # batching + [cluster.batching(batch_size) for cluster in clusters] + + # parsing + # print(len(clusters), 'clusters identified') if debug else None + for index, old_cluster in enumerate(clusters): + template, old_cluster, new_cluster = parser.get_responce(old_cluster, cache_base = cache) + # update clusters + cluster_nums += process_new_cluster(new_cluster, clusters, batch_size) + refer_log = old_cluster.logs[0] + if template not in cache.template_list: + if verify_template(template): + if debug: + logger.debug('=' * 20) + logger.debug(f'New cluster processed, {len(set(cache.template_list))} templates identified till now:') + logger.debug(f'Refer Log: {refer_log}') + logger.debug(f'Output Template: {template}') + id, _, _ = cache.add_templates(event_template=template, insert=False, refer_log = refer_log) + cache.variable_candidates.extend(vars_update(refer_log, template, cache.variable_candidates)) + else: + id, _, _ = cache.add_templates(event_template=refer_log, insert=False, refer_log = refer_log) + else: + id = cache.template_list.index(template) + for index in old_cluster.indexs: + outputs_index[index] = id + log_chunk = [] + log_chunk_index = [] + + outputs = [cache.template_list[i] for i in outputs_index] + t2 = time.time() + parsing_time = t2 - t1 + template_count = len(set(outputs)) + + logger.info(f'Parsing complete: {parsing_time:.3f}s, {template_count} unique templates identified') + + # Create 
structured logs DataFrame + logs_df = pd.DataFrame({'Content': logs, 'EventTemplate': outputs}) + + # Create templates DataFrame + counter = Counter(outputs) + items = list(counter.items()) + items.sort(key=lambda x: x[1], reverse=True) + templates_df = pd.DataFrame(items, columns=['EventTemplate', 'Occurrence']) + templates_df['EventId'] = [f"E{i + 1}" for i in range(len(templates_df))] + templates_df = templates_df[['EventId', 'EventTemplate', 'Occurrence']] + + # Extract template-to-sample-log mapping + template_samples = _extract_template_samples(cache) + + # Collect metrics + metrics = { + 'dataset': dataset, + 'parsing_time': round(parsing_time, 3), + 'llm_invocation_time': round(parser.time_consumption_llm, 3), + 'cache_hit_num': cache.hit_num, + 'hash_table_size': len(cache.hashing_cache), + 'token_stats': parser.token_list, + 'template_count': template_count, + 'log_count': len(logs) + } + + return { + 'logs_df': logs_df, + 'templates_df': templates_df, + 'cache': cache, + 'metrics': metrics, + 'template_samples': template_samples, + } \ No newline at end of file diff --git a/src/detectmatelibrary/parsers/logbatcher/engine/parsing_cache.py b/src/detectmatelibrary/parsers/logbatcher/engine/parsing_cache.py new file mode 100644 index 0000000..6cd7a22 --- /dev/null +++ b/src/detectmatelibrary/parsers/logbatcher/engine/parsing_cache.py @@ -0,0 +1,416 @@ +# MIT License +# +# Copyright (c) 2024 LogIntelligence +# +# Based on LogBatcher (https://github.com/LogIntelligence/LogBatcher) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice 
class TimeoutException(Exception):
    """Raised by ``timeout_handler`` when the regex time budget is exhausted."""


def timeout_handler(_signum: int, _frame: Optional[FrameType]) -> None:
    """``SIGALRM`` handler that aborts the running search."""
    raise TimeoutException()


def safe_search(pattern: str, string: str, timeout: int = 1) -> Optional[Match[str]]:
    """Run ``re.search`` with a wall-clock limit of ``timeout`` seconds.

    Returns the match object, or ``None`` if nothing matched or the limit was
    hit.  ``timeout <= 0`` disables the limit.  The limit is armed with
    ``signal.setitimer`` so fractional values work as well; where ``SIGALRM``
    cannot be used (platforms without it, or a non-main thread) the search
    runs without a limit.
    """
    if timeout <= 0 or not hasattr(signal, "SIGALRM"):
        return re.search(pattern, string)
    try:
        signal.signal(signal.SIGALRM, timeout_handler)
    except ValueError:
        # signal handlers can only be installed from the main thread
        return re.search(pattern, string)

    signal.setitimer(signal.ITIMER_REAL, timeout)
    try:
        result = re.search(pattern, string)
    except TimeoutException:
        result = None
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # disarm the timer
    return result


# TODO: logb2 v3.1
_PATTERN1 = re.compile(r'/([^/]*)(?=/)')  # path segment that is followed by another '/'
_PATTERN2 = re.compile(r'\d')  # digit
_PATTERN3 = re.compile(r'[\/:,._-]+')  # / : , . _ -
_PATTERN4 = re.compile(r'\s')  # whitespace


def standardize(input_string: str) -> str:
    """Reduce a log or template to the key used by the hashing cache.

    Removes, in this order: path segments followed by another ``/``, digits,
    runs of ``/ : , . _ -`` and all whitespace.
    """
    result = input_string
    for pattern in (_PATTERN1, _PATTERN2, _PATTERN3, _PATTERN4):
        result = pattern.sub('', result)
    return result


def print_tree(move_tree: Dict[str, Any], indent: str = ' ') -> None:
    """Print a template tree to stdout, one node per line (debug helper)."""
    # NOTE(review): the indent literals were transcribed from a
    # whitespace-collapsed source; confirm their exact widths.
    for key, value in move_tree.items():
        if isinstance(value, dict):
            print(f'{indent}|- {key}')
            print_tree(value, indent + '| ')
        elif isinstance(value, tuple):
            print(f'{indent}|- {key}: tuple')
        else:
            print(f'{indent}|- {key}: {value}')


def lcs_similarity(X: List[str], Y: List[str]) -> float:
    """Return ``2 * LCS(X, Y) / (len(X) + len(Y))``.

    The result lies in ``[0, 1]``.  Two empty sequences are identical and
    yield ``1.0`` (the convention of ``difflib.SequenceMatcher.ratio``)
    instead of raising ``ZeroDivisionError``.
    """
    m, n = len(X), len(Y)
    if m + n == 0:
        return 1.0
    # Only the previous DP row is needed, so keep two rows, not the table.
    previous = [0] * (n + 1)
    for x in X:
        current = [0]
        for j, y in enumerate(Y, start=1):
            if x == y:
                current.append(previous[j - 1] + 1)
            else:
                current.append(max(current[j - 1], previous[j]))
        previous = current
    return 2 * previous[n] / (m + n)


class ParsingCache:
    """Template store with a prefix tree and an exact-match hash cache."""

    def __init__(self) -> None:
        # nested dicts keyed by template token; each leaf is the tuple
        # (constant-token count, wildcard count, template, template id, refer log)
        self.template_tree: Dict[str, Any] = {}
        # templates, indexed by template id
        self.template_list: List[str] = []
        # sha256(standardized text) -> (standardized text, template, template id)
        self.hashing_cache: Dict[str, Tuple[str, str, int]] = {}
        # variable values collected by callers
        self.variable_candidates: List[str] = []
        # number of ``match_event`` calls answered from ``hashing_cache``
        self.hit_num: int = 0

    def _insert_new(self, event_template: str, template_tokens: List[str], refer_log: str) -> int:
        """Insert ``event_template`` under the next free id and record it."""
        template_id = self.insert(event_template, template_tokens, len(self.template_list), refer_log)
        self.template_list.append(event_template)
        return template_id

    def add_templates(
        self,
        event_template: str,
        insert: bool = True,
        relevant_templates: Optional[List[str]] = None,
        refer_log: str = '',
    ) -> Tuple[int, Optional[str], Optional[bool]]:
        """Add ``event_template``, merging it into a similar template if possible.

        Args:
            event_template: Template to add.
            insert: When true, always insert as a new template.
            relevant_templates: Candidates to merge with; merging is tried
                only if ``insert`` is false and this list is non-empty.
            refer_log: Sample log stored next to the template.

        Returns:
            ``(template_id, similar_template, success)``.  ``template_id`` is
            ``-1`` if the template has no tokens or is exactly ``"<*>"``.
            ``similar_template``/``success`` are ``None`` unless a merge with
            a candidate of equal word count and LCS similarity above 0.8 was
            attempted.
        """
        relevant_templates = relevant_templates or []
        template_tokens = message_split(event_template)
        if not template_tokens or event_template == "<*>":
            return -1, None, None
        if insert or not relevant_templates:
            return self._insert_new(event_template, template_tokens, refer_log), None, None

        event_words = event_template.split()
        max_similarity = 0
        similar_template = None
        for candidate in relevant_templates:
            candidate_words = candidate.split()
            if len(candidate_words) != len(event_words):
                continue
            similarity = lcs_similarity(candidate_words, event_words)
            if similarity > max_similarity:
                max_similarity = similarity
                similar_template = candidate

        if max_similarity > 0.8:
            success, template_id = self.modify(similar_template, event_template, refer_log)
            if not success:
                template_id = self._insert_new(event_template, template_tokens, refer_log)
            return template_id, similar_template, success
        return self._insert_new(event_template, template_tokens, refer_log), None, None

    def insert(self, event_template: str, template_tokens: List[str], template_id: int, refer_log: str = '') -> int:
        """Store ``event_template`` in the hash cache and the prefix tree.

        The tree path follows ``template_tokens``; the leaf is stored under
        the concatenation of all tokens.

        Returns:
            ``template_id`` unchanged.
        """
        standardized = standardize(event_template)
        hash_key = sha256(standardized.encode()).hexdigest()
        self.hashing_cache[hash_key] = (standardized, event_template, template_id)

        move_tree = self.template_tree
        for token in template_tokens:
            move_tree = move_tree.setdefault(token, {})

        move_tree["".join(template_tokens)] = (
            sum(1 for s in template_tokens if s != "<*>"),
            template_tokens.count("<*>"),
            event_template,
            template_id,
            refer_log
        )  # statistic length, count of <*>, original_log, template_id, refer log
        return template_id

    def modify(self, similar_template: str, event_template: str, refer_log: str) -> Tuple[bool, int]:
        """Merge ``event_template`` into ``similar_template``.

        Words that differ between the two templates become ``<*>``; the
        merged template replaces ``similar_template`` under its old id.
        ``add_templates`` only calls this for templates of equal word count.

        Returns:
            ``(True, template_id)`` on success, ``(False, -1)`` if
            ``similar_template`` is not in the tree.
        """
        merged_template = " ".join(
            old if old == new else "<*>"
            for old, new in zip(similar_template.split(), event_template.split())
        )
        success, old_id = self.delete(similar_template)
        if not success:
            return False, -1
        self.insert(merged_template, message_split(merged_template), old_id, refer_log)
        self.template_list[old_id] = merged_template
        return True, old_id

    def delete(self, event_template: str) -> Tuple[bool, int | List[Any]]:
        """Remove ``event_template`` from the prefix tree.

        Returns:
            ``(True, template_id)`` if the template was found, otherwise
            ``(False, [])``.  A template without tokens or a path without a
            leaf is reported as "not found" instead of raising.
        """
        template_tokens = message_split(event_template)
        if not template_tokens:
            return False, []

        move_tree = self.template_tree
        for token in template_tokens:
            if token not in move_tree:
                return False, []
            move_tree = move_tree[token]

        leaf_key = "".join(template_tokens)
        leaf = move_tree.get(leaf_key)
        if not isinstance(leaf, tuple):
            return False, []
        del move_tree[leaf_key]
        return True, leaf[3]

    def match_event(self, log: str) -> Tuple[str, Any, List[str]]:
        """Look up the template for ``log``.

        The hash cache is tried first (a hit returns ``(template, id, [])``
        and increments ``hit_num``); otherwise the result of ``tree_match``
        is returned and, unless its first element is ``"NoMatch"``, cached.
        """
        standardized = standardize(log)
        hash_key = sha256(standardized.encode()).hexdigest()
        cached = self.hashing_cache.get(hash_key)
        if cached is not None and cached[0] == standardized:
            self.hit_num += 1
            return cached[1], cached[2], []

        results = tree_match(self.template_tree, self.template_list, log)
        if results[0] != "NoMatch":
            self.hashing_cache[hash_key] = (standardized, results[0], results[1])
        return results

    def _preprocess_template(self, template: str) -> str:
        """Return ``template`` unchanged (preprocessing hook)."""
        return template
new_str = "" + for s in tokens[i]: + if (s not in punc and s != ' ') or s in excluded_str: + new_str += s + tokens[i] = new_str + return tokens + + +def message_split(message: str) -> List[str]: + punc = "!\"#$%&'()+,-/;:=?@.[\\]^_`{|}~" + splitters = "\\s\\" + "\\".join(punc) + splitter_regex = re.compile("([{}])".format(splitters)) + tokens = re.split(splitter_regex, message) + + tokens = list(filter(lambda x: x != "", tokens)) + + #print("tokens: ", tokens) + tokens = post_process_tokens(tokens, punc) + + tokens = [ + token.strip() + for token in tokens + if token != "" and token != ' ' + ] + tokens = [ + token + for idx, token in enumerate(tokens) + if not (token == "<*>" and idx > 0 and tokens[idx - 1] == "<*>") + ] + return tokens + + + +def tree_match(match_tree: Dict[str, Any], template_list: List[str], log_content: str) -> Tuple[str, Any, List[str]]: + log_tokens = message_split(log_content) + template, template_id, refer_log, relevant_templates = match_template(match_tree, log_tokens) + # length matters + if template: + if abs(len(log_content.split()) - len(refer_log.split())) <= 1: + return (template, template_id, relevant_templates) + elif len(relevant_templates) > 0: + if match_log(log_content, relevant_templates[0]): + return (relevant_templates[0], template_list.index(relevant_templates[0]), relevant_templates) + return ("NoMatch", "NoMatch", relevant_templates) + +def match_log(log: str, template: str) -> bool: + pattern_parts = template.split("<*>") + pattern_parts_escaped = [re.escape(part) for part in pattern_parts] + regex_pattern = "(.*?)".join(pattern_parts_escaped) + regex = "^" + regex_pattern + "$" + matches = safe_search(regex, log) + + if matches == None: + return False + else: + return True #all(len(var.split()) == 1 for var in matches.groups()) + +def match_template( + match_tree: Dict[str, Any], log_tokens: List[str] +) -> Tuple[Any, Any, str, List[str]]: + results = [] + find_results = find_template(match_tree, log_tokens, results, 
[], 1) + relevant_templates = find_results[1] + if len(results) > 1: + new_results = [] + for result in results: + if result[0] is not None and result[1] is not None and result[2] is not None: + new_results.append(result) + else: + new_results = results + if len(new_results) > 0: + if len(new_results) > 1: + new_results.sort(key=lambda x: (-x[1][0], x[1][1])) + return new_results[0][1][2], new_results[0][1][3], new_results[0][1][4], relevant_templates + return False, False, '', relevant_templates + + +def get_all_templates(move_tree: Dict[str, Any]) -> List[str]: + result = [] + for key, value in move_tree.items(): + if isinstance(value, tuple): + result.append(value[2]) + else: + result = result + get_all_templates(value) + return result + + +def find_template( + move_tree: Dict[str, Any], + log_tokens: List[str], + result: List[Tuple[Any, ...]], + parameter_list: List[str], + depth: int, +) -> Tuple[bool, List[str]]: + flag = 0 # no futher find + if len(log_tokens) == 0: + for key, value in move_tree.items(): + if isinstance(value, tuple): + result.append((key, value, tuple(parameter_list))) + flag = 2 # match + if "<*>" in move_tree: + parameter_list.append("") + move_tree = move_tree["<*>"] + if isinstance(move_tree, tuple): + result.append(("<*>", None, None)) + flag = 2 # match + else: + for key, value in move_tree.items(): + if isinstance(value, tuple): + result.append((key, value, tuple(parameter_list))) + flag = 2 # match + # return (True, []) + else: + token = log_tokens[0] + + relevant_templates = [] + if token in move_tree: + find_result = find_template(move_tree[token], log_tokens[1:], result, parameter_list,depth+1) + if find_result[0]: + flag = 2 # match + elif flag != 2: + flag = 1 # futher find but no match + relevant_templates = relevant_templates + find_result[1] + if "<*>" in move_tree: + if isinstance(move_tree["<*>"], dict): + next_keys = move_tree["<*>"].keys() + next_continue_keys = [] + for nk in next_keys: + nv = move_tree["<*>"][nk] + if 
not isinstance(nv, tuple): + next_continue_keys.append(nk) + idx = 0 + # print("len : ", len(log_tokens)) + while idx < len(log_tokens): + token = log_tokens[idx] + # print("try", token) + if token in next_continue_keys: + # print("add", "".join(log_tokens[0:idx])) + parameter_list.append("".join(log_tokens[0:idx])) + # print("End at", idx, parameter_list) + find_result = find_template( + move_tree["<*>"], log_tokens[idx:], result, parameter_list,depth+1 + ) + if find_result[0]: + flag = 2 # match + elif flag != 2: + flag = 1 # futher find but no match + relevant_templates = relevant_templates + find_result[1] + if parameter_list: + parameter_list.pop() + next_continue_keys.remove(token) + idx += 1 + if idx == len(log_tokens): + parameter_list.append("".join(log_tokens[0:idx])) + find_result = find_template( + move_tree["<*>"], log_tokens[idx + 1 :], result, parameter_list,depth+1 + ) + if find_result[0]: + flag = 2 # match + else: + if flag != 2: + flag = 1 + # relevant_templates = relevant_templates + find_result[1] + if parameter_list: + parameter_list.pop() + if flag == 2: + return (True, []) + if flag == 1: + return (False, relevant_templates) + if flag == 0: + # print(log_tokens, flag) + if depth >= 2: + return (False, get_all_templates(move_tree)) + else: + return (False, []) \ No newline at end of file diff --git a/src/detectmatelibrary/parsers/logbatcher/engine/postprocess.py b/src/detectmatelibrary/parsers/logbatcher/engine/postprocess.py new file mode 100644 index 0000000..d3868fd --- /dev/null +++ b/src/detectmatelibrary/parsers/logbatcher/engine/postprocess.py @@ -0,0 +1,195 @@ +# MIT License +# +# Copyright (c) 2024 LogIntelligence +# +# Based on LogBatcher (https://github.com/LogIntelligence/LogBatcher) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to 
import re
from typing import Optional, Set

# Reduced delimiter set used to tokenize for the path-like string check (PS).
_PATH_DELIMITERS = (
    r'\s', r'\,', r'\!', r'\;', r'\:',
    r'\=', r'\|', r'\"', r'\'', r'\+',
    r'\[', r'\]', r'\(', r'\)', r'\{', r'\}',
)
# Full delimiter set used to tokenize for the remaining rules.
_TOKEN_DELIMITERS = _PATH_DELIMITERS + (
    r'\.', r'\-', r'\@', r'\#', r'\$', r'\%', r'\&', r'\/',
)
# Capture groups make re.split() keep the delimiters, so ''.join() restores the text.
_PATH_SPLIT_RE = re.compile('(' + '|'.join(_PATH_DELIMITERS) + ')')
_TOKEN_SPLIT_RE = re.compile('(' + '|'.join(_TOKEN_DELIMITERS) + ')')

_PATH_LIKE_RES = (
    re.compile(r'^(\/[^\/]+)+\/?$'),
    re.compile(r'.*/.*\..*'),
    re.compile(r'^([a-zA-Z0-9-]+\.){3,}[a-z]+$'),
)
_WORD_WITH_VARIABLE_RES = (
    re.compile(r'^[^\s\/]*<\*>[^\s\/]*$'),
    re.compile(r'^<\*>.*<\*>$'),
)
_SIZE_UNIT_RE = re.compile(r'<\*> [KGTM]?B\b')

# Literal rewrites, each applied until it no longer matches, in this exact
# order. Some rules appear twice on purpose: an earlier rewrite can create a
# new occurrence of a pattern that was already processed.
_COLLAPSE_RULES = (
    ("<*>.<*>", "<*>"),      # DV: dot-separated variables
    ("<*><*>", "<*>"),       # CV: consecutive variables
    ("#<*>#", "<*>"),
    ("<*>:<*>", "<*>"),
    ("<*>/<*>", "<*>"),
    (" #<*> ", " <*> "),
    ("<*>:<*>", "<*>"),
    ("<*>#<*>", "<*>"),
    ("<*>/<*>", "<*>"),
    ("<*>@<*>", "<*>"),
    ("<*>.<*>", "<*>"),
    (' "<*>" ', ' <*> '),
    (" '<*>' ", " <*> "),
    ("<*><*>", "<*>"),
)


def post_process(response: str) -> str:
    """Extract a log template from an LLM response.

    The template is the longest non-blank segment between the first and the
    last backtick. ``{{...}}`` and ``${...}`` placeholders become ``<*>``
    and the result is normalized by ``correct_single_template``.

    Returns:
        The cleaned template, or ``''`` when the response holds no backtick
        pair or the template consists only of ``<*>`` and spaces.
    """
    response = response.replace('\n', '')
    first_backtick_index = response.find('`')
    last_backtick_index = response.rfind('`')
    if first_backtick_index == -1 or last_backtick_index == -1 or first_backtick_index == last_backtick_index:
        candidates = []
    else:
        segments = response[first_backtick_index: last_backtick_index + 1].split('`')
        # Build a new list: removing items from the list being iterated
        # skipped elements and let blank "<*>" segments survive.
        candidates = [
            segment for segment in segments
            if segment.replace(' ', '').replace('<*>', '') != ''
        ]
    tmp = max(candidates, key=len) if candidates else ''

    template = re.sub(r'\{\{.*?\}\}', '<*>', tmp)
    template = re.sub(r'\$\{.*?\}', '<*>', template)
    template = correct_single_template(template)
    if template.replace('<*>', '').replace(' ', '') == '':
        template = ''

    return template


def exclude_digits(string: str) -> bool:
    """Return whether ``string`` is a digit-dominated token (a variable).

    A token is never excluded when it has no digit, starts with a letter or
    contains an uppercase character. Otherwise it is excluded when it has
    at least four digits or more than 30% of its characters are digits.
    """
    digits = re.findall(r'\d', string)
    if len(digits) == 0 or string[0].isalpha() or any(c.isupper() for c in string):
        return False
    if len(digits) >= 4:
        return True
    return len(digits) / len(string) > 0.3


def correct_single_template(template: str, user_strings: Optional[Set[str]] = None) -> str:
    """Apply all rules to process a template.

    DS (Double Space)
    BL (Boolean)
    US (User String)
    DG (Digit)
    PS (Path-like String)
    WV (Word concatenated with Variable)
    DV (Dot-separated Variables)
    CV (Consecutive Variables)

    Args:
        template: Raw template text.
        user_strings: Extra exact tokens to treat as variables, in addition
            to ``true``, ``false``, ``null`` and ``root``.

    Returns:
        The normalized template.
    """
    replaceable = {'true', 'false', 'null', 'root'}
    if user_strings:
        replaceable = replaceable.union(user_strings)

    # apply DS
    template = template.strip()
    template = re.sub(r'\s+', ' ', template)

    # apply PS
    new_p_tokens = []
    for p_token in _PATH_SPLIT_RE.split(template):
        if any(path_re.match(p_token) for path_re in _PATH_LIKE_RES):
            p_token = '<*>'
        new_p_tokens.append(p_token)
    template = ''.join(new_p_tokens)

    # tokenize for the remaining rules, keeping the delimiters
    new_tokens = []
    for token in _TOKEN_SPLIT_RE.split(template):
        # apply BL, US (exact, case-sensitive match)
        if token in replaceable:
            token = '<*>'
        # apply DG
        if exclude_digits(token):
            token = '<*>'
        # apply WV
        if any(wv_re.match(token) for wv_re in _WORD_WITH_VARIABLE_RES):
            token = '<*>'
        new_tokens.append(token)
    template = ''.join(new_tokens)

    # apply DV, CV and the remaining separator rewrites
    for old, new in _COLLAPSE_RULES:
        while old in template:
            template = template.replace(old, new)

    # drop a size unit that follows a variable, e.g. "<*> KB" -> "<*>"
    return _SIZE_UNIT_RE.sub('<*>', template)
a/src/detectmatelibrary/parsers/logbatcher/engine/sample.py b/src/detectmatelibrary/parsers/logbatcher/engine/sample.py new file mode 100644 index 0000000..d0444de --- /dev/null +++ b/src/detectmatelibrary/parsers/logbatcher/engine/sample.py @@ -0,0 +1,140 @@ +# MIT License +# +# Copyright (c) 2024 LogIntelligence +# +# Based on LogBatcher (https://github.com/LogIntelligence/LogBatcher) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
import random
from typing import Any, List, Tuple

import numpy as np

# scikit-learn is imported inside the functions that need it, so the
# numpy-only helpers in this module stay importable without it.


def dpp_sample(S: np.ndarray, k: int) -> List[int]:
    """Greedily pick up to ``k`` row indices of the similarity matrix ``S``.

    Each round adds the index ``i`` that maximizes ``det / (1 + det)``,
    where ``det`` is the determinant of ``S`` restricted to the already
    chosen indices plus ``i``.

    Args:
        S: Square similarity matrix.
        k: Number of items to sample; capped at ``S.shape[0]``.

    Returns:
        The chosen indices. Asking for more items than exist no longer adds
        the sentinel index ``-1`` to the result.
    """
    n = S.shape[0]
    Y = set()
    for _ in range(min(k, n)):
        best_i = -1
        best_p = -1

        for i in range(n):
            if i in Y:
                continue
            subset = list(Y) + [i]
            det_Yi = np.linalg.det(S[np.ix_(subset, subset)])
            p_add = det_Yi / (1 + det_Yi)
            if p_add > best_p:
                best_p = p_add
                best_i = i

        if best_i < 0:
            # No candidate scored above the sentinel (e.g. NaN scores).
            break
        Y.add(best_i)

    return list(Y)


def sample_from_clusters(clusters: List[Any], shot: int = 32) -> List[Tuple[str, str]]:
    """Pick up to ``shot`` ``(log, oracle_template)`` demonstration pairs.

    Clusters are visited from largest to smallest (by ``len(cluster.indexs)``)
    and at most one cluster per distinct ``oracle_template`` is kept. Logs
    are then drawn round-robin from the kept clusters at fixed pseudo-random
    offsets.

    The offsets come from a private ``random.Random(0)``, which yields the
    same numbers as seeding the global generator with 0 but leaves the
    caller's random state untouched. Clusters without logs are skipped
    instead of raising ``ZeroDivisionError``.
    """
    clusters = sorted(clusters, key=lambda cluster: len(cluster.indexs), reverse=True)
    rng = random.Random(0)
    offsets = [rng.randint(0, 1000) for _ in range(10)]

    seen_templates = set()
    sample_clusters = []
    for cluster in clusters:
        if len(sample_clusters) >= shot:
            break
        if not cluster.logs:
            continue
        if cluster.oracle_template not in seen_templates:
            seen_templates.add(cluster.oracle_template)
            sample_clusters.append((cluster, cluster.oracle_template))

    sample_pairs: List[Tuple[str, str]] = []
    for offset in offsets:
        for cluster, template in sample_clusters:
            if len(sample_pairs) >= shot:
                return sample_pairs
            sample_pairs.append((cluster.logs[offset % len(cluster.logs)], template))
    return sample_pairs


def nearest_k_pairs_from_log(
    log: str, sample_pairs: List[Tuple[str, str]], k: int
) -> List[Tuple[str, str]]:
    """Return the ``k`` pairs whose log is most TF-IDF-similar to ``log``.

    Pairs are ordered from most to least similar. ``k <= 0`` or an empty
    ``sample_pairs`` yields ``[]``; previously ``k == 0`` returned every
    pair because ``[-0:]`` slices the whole array.
    """
    if k <= 0 or not sample_pairs:
        return []
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.metrics.pairwise import cosine_similarity

    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform([log] + [pair[0] for pair in sample_pairs])
    # Only the query row is needed, so compare it with the samples directly
    # instead of building the full pairwise matrix.
    similarity = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:])[0]
    nearest_k_indices = similarity.argsort()[-k:][::-1]
    return [sample_pairs[i] for i in nearest_k_indices]


def group_samples_clustering(embed_matrix: np.ndarray, num_in_batch: int) -> List[List[int]]:
    """Group row indices of ``embed_matrix`` into batches via K-means.

    ``ceil(n_rows / num_in_batch)`` clusters are fitted. Groups larger than
    ``num_in_batch`` keep their members most similar to the centroid; the
    rest move to the most similar group that still has room.

    Returns:
        One list of row indices per cluster.
    """
    from sklearn.cluster import KMeans

    def _calculate_cos_similarities(v1: np.ndarray, v2: np.ndarray) -> np.ndarray:
        # Cosine similarity rescaled from [-1, 1] to [0, 1].
        num = np.dot(v1, v2.T)
        denom = np.linalg.norm(v1, axis=1).reshape(-1, 1) * \
            np.linalg.norm(v2, axis=1)
        similarity_matrix = num / denom
        similarity_matrix[np.isneginf(similarity_matrix)] = 0
        similarity_matrix = 0.5 + 0.5 * similarity_matrix
        return similarity_matrix

    if embed_matrix.shape[0] % num_in_batch:
        n_clusters = embed_matrix.shape[0] // num_in_batch + 1
    else:
        n_clusters = embed_matrix.shape[0] // num_in_batch

    # K-means clustering
    kmeans = KMeans(n_clusters=n_clusters, random_state=0,
                    n_init="auto").fit(embed_matrix)
    similarity_matrix = _calculate_cos_similarities(
        embed_matrix, kmeans.cluster_centers_)  # [n_samples, n_clusters]
    similarity_rankings = np.argsort(-similarity_matrix, axis=1)
    groups: List[List[int]] = [[] for _ in range(n_clusters)]
    for sample_idx, label in enumerate(kmeans.labels_):
        groups[label].append(sample_idx)
    # Reassign to equalize the number of samples in each cluster
    for group_idx, group in enumerate(groups):
        if len(group) > num_in_batch:
            groups[group_idx] = sorted(
                group, key=lambda x: similarity_matrix[x, group_idx], reverse=True)
            samples_to_reassign = groups[group_idx][num_in_batch:]
            groups[group_idx] = groups[group_idx][:num_in_batch]
            for sample_idx in samples_to_reassign:
                for candi_group_idx in similarity_rankings[sample_idx]:
                    if len(groups[candi_group_idx]) < num_in_batch:
                        groups[candi_group_idx].append(sample_idx)
                        break
    return groups
-0,0 +1,169 @@ +# MIT License +# +# Copyright (c) 2024 LogIntelligence +# +# Based on LogBatcher (https://github.com/LogIntelligence/LogBatcher) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
import re
import string
from itertools import islice
from typing import Dict, List, Tuple

import pandas as pd

# Models accepted by count_prompt_tokens() and count_message_tokens().
_SUPPORTED_TOKEN_MODELS = ('gpt-4o-mini', 'gpt-3.5-turbo')


def data_loader(file_name: str, dataset_format: str, file_format: str) -> List[str]:
    """Load log contents from a structured CSV or a raw log file.

    Args:
        file_name: Path of the file to read.
        dataset_format: Log format such as ``"<Date> <Level> <Content>"``;
            only used for raw files.
        file_format: ``'structured'`` (CSV with a ``Content`` column) or
            ``'raw'``.

    Returns:
        The log contents, one entry per parsed line.

    Raises:
        ValueError: If ``file_format`` is neither ``'structured'`` nor
            ``'raw'`` (previously an ``UnboundLocalError``).
    """
    if file_format == 'structured':
        return pd.read_csv(file_name)['Content'].tolist()
    if file_format == 'raw':
        with open(file_name, 'r') as f:
            total = sum(1 for _ in f)
        print(f"Total log lines: {total}")
        headers, regex = generate_logformat_regex(dataset_format)
        return log_to_dataframe(file_name, regex, headers, total)
    raise ValueError("Unsupported file format: {}".format(file_format))


def _get_encoder(model_name: str):
    """Return the tiktoken encoder for a supported model.

    tiktoken is imported here rather than at module level, so the other
    helpers in this module work without it.

    Raises:
        ValueError: If ``model_name`` is not supported.
    """
    if model_name not in _SUPPORTED_TOKEN_MODELS:
        raise ValueError("Unsupported model: {}".format(model_name))
    import tiktoken
    return tiktoken.encoding_for_model(model_name)


def count_prompt_tokens(prompt: str, model_name: str) -> int:
    """
    Count the number of tokens in the prompt
    Models supported: gpt-4o-mini, gpt-3.5-turbo
    """
    encoder = _get_encoder(model_name)
    # Count the tokens of the encoded prompt.
    return len(encoder.encode(prompt))


def count_message_tokens(messages: List[Dict[str, str]], model_name: str = "gpt-3.5-turbo") -> int:
    """
    Count the number of tokens in the messages
    Models supported: gpt-4o-mini, gpt-3.5-turbo

    Each message contributes the tokens of its ``role`` and ``content``
    plus a fixed overhead of 4.
    """
    encoder = _get_encoder(model_name)
    return sum(
        len(encoder.encode(message['role'])) + len(encoder.encode(message['content'])) + 4
        for message in messages
    )


def generate_logformat_regex(logformat: str) -> Tuple[List[str], str]:
    """
    Function to generate regular expression to split log messages
    Args:
        logformat: log format, a string
    Returns:
        headers: headers of log messages
        regex: regular expression (pattern string) to split log messages
    """
    headers = []
    regex = ''
    # Splitting on a capture group puts literal text at even indexes and
    # the <Header> fields at odd indexes.
    for k, part in enumerate(re.split(r'(<[^<>]+>)', logformat)):
        if k % 2 == 0:
            regex += re.sub(' +', r'\\s+', part)
        else:
            header = part.strip('<').strip('>')
            regex += '(?P<%s>.*?)' % header
            headers.append(header)
    return headers, '^' + regex + '$'


def log_to_dataframe(log_file: str, regex: str, headers: List[str], size: int) -> List[str]:
    """
    Function to transform log file to contents
    Args:
        log_file: log file path
        regex: regular expression to split log messages (a pattern string
            as returned by generate_logformat_regex, or a compiled pattern)
        headers: headers of log messages
        size: maximum number of log messages to read
    Returns:
        log_messages: list of log contents (the last header's field of
        every matching line; the stripped line itself without headers)
    """
    # The pattern arrives as a string. Calling .search() on it raised
    # AttributeError, which the old blanket `except` swallowed, so every
    # raw file parsed to an empty list.
    pattern = re.compile(regex) if isinstance(regex, str) else regex
    log_contents = []
    with open(log_file, 'r') as file:
        # islice stops at end of file when `size` exceeds the line count.
        for line in islice(file, size):
            line = line.strip()
            if not headers:  # If no headers are defined
                log_contents.append(line)
                continue
            match = pattern.search(line)
            if match is None:
                # Lines that do not follow the format are skipped.
                continue
            log_contents.append(match.group(headers[-1]))
    return log_contents


def not_varibility(logs: List[str]) -> bool:
    """Return whether all ``logs`` are identical once digit runs are removed."""
    return len({re.sub(r'\d+', '', log) for log in logs}) == 1


def verify_template(template: str) -> bool:
    """Return whether ``template`` has a character that is not ``<*>``, space or punctuation."""
    template = template.replace("<*>", "")
    template = template.replace(" ", "")
    return any(char not in string.punctuation for char in template)


def _main() -> None:
    """Write per-dataset token counts for the messages dump to a CSV file."""
    import csv
    import json

    # LogBatcher
    with open('/root/LogBatcher/messages.json', 'r') as file:
        messages_dict = json.load(file)
    data = []
    datasets = ['BGL', 'HDFS', 'OpenStack', 'OpenSSH', 'HPC', 'Zookeeper', 'Spark', 'Proxifier', 'HealthApp', 'Mac', 'Hadoop', 'Apache', 'Linux', 'Thunderbird']
    total = 0
    for dataset in datasets:
        messages = messages_dict[dataset]
        count = sum(count_message_tokens(message) for message in messages)
        # Round the average; the old expression rounded len(messages) instead.
        average = round(count / len(messages), 2) if messages else 0.0
        print(f"{dataset}: [{count}, {len(messages)}] -> {average}")
        data.append([dataset, count, len(messages), average])
        total += count
    print(f"all: {total}")
    with open('/root/LogBatcher/output_lilac_0.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["Dataset", "Value1", "Value2", "Value3"])  # write the header row
        writer.writerows(data)  # write the data rows


if __name__ == "__main__":
    _main()
# --- src/detectmatelibrary/parsers/logbatcher/engine/vars.py ---
from __future__ import annotations

import re
from typing import List

from .matching import extract_variables

def vars_update(refer_log: str, template: str, candidates: List[str]) -> List[str]:
    """Return new variable candidates extracted from ``refer_log``.

    The values that ``extract_variables`` returns for ``refer_log`` and
    ``template`` are stripped of one enclosing pair of ``()`` or ``[]``.
    A value is kept when it is not already in ``candidates``, is neither
    purely digits nor purely letters, and has at most three
    whitespace-separated words.

    Returns:
        The kept values, or an empty list when nothing was extracted.
    """
    new_variables = extract_variables(refer_log, template)
    extend_vars = []
    if not new_variables:
        return extend_vars
    for var in new_variables:
        # Drop one pair of enclosing brackets: "(x)" -> "x", "[x]" -> "x".
        var = re.sub(r'^\((.*)\)$|^\[(.*)\]$', r'\1\2', var)
        if var not in candidates and not var.isdigit() and not var.isalpha() and len(var.split()) <= 3:
            extend_vars.append(var)
    return extend_vars


# --- src/detectmatelibrary/parsers/logbatcher/parser.py ---
from detectmatelibrary.common.parser import CoreParser, CoreParserConfig
from detectmatelibrary.parsers.logbatcher.engine.parser import Parser as LLMParser
from detectmatelibrary.parsers.logbatcher.engine.parsing_cache import ParsingCache
from detectmatelibrary.parsers.logbatcher.engine.cluster import Cluster
from detectmatelibrary.parsers.logbatcher.engine.matching import extract_variables
from detectmatelibrary import schemas

from typing import Any


class LogBatcherParserConfig(CoreParserConfig):
    """Configuration for LogBatcherParser."""
    method_type: str = "logbatcher_parser"
    # Model name handed to the LLM parser.
    model: str = "gpt-4o-mini"
    # Credentials and endpoint forwarded to the LLM parser's config dict.
    api_key: str = ""
    base_url: str = ""
    # Value passed to Cluster.batching() for every unmatched log.
    batch_size: int = 10


class LogBatcherParser(CoreParser):
    """LLM-based log parser wrapping LogBatcher, integrated as a CoreParser."""

    def __init__(
        self,
        name: str = "LogBatcherParser",
        config: LogBatcherParserConfig | dict[str, Any] = LogBatcherParserConfig(),
    ) -> None:
        """Create the parser with an LLM backend and an empty template cache.

        Args:
            name: Component name passed to ``CoreParser``.
            config: A ``LogBatcherParserConfig`` or a dict that is converted
                with ``LogBatcherParserConfig.from_dict``.
        """
        # NOTE(review): the default config instance is created once at
        # definition time and shared by every parser built without an
        # explicit config - confirm it is never mutated.
        if isinstance(config, dict):
            config = LogBatcherParserConfig.from_dict(config, name)

        super().__init__(name=name, config=config)

        llm_config = {
            "api_key": config.api_key,
            "base_url": config.base_url,
        }
        self._llm_parser = LLMParser(model=config.model, theme="default", config=llm_config)
        self._cache = ParsingCache()
        self._batch_size = config.batch_size

    def parse(
        self,
        input_: schemas.LogSchema,
        output_: schemas.ParserSchema,
    ) -> None:
        """Fill ``output_`` with the template, variables and event id of a log.

        The template cache is consulted first. On a miss the log is sent to
        the LLM parser as a single-log cluster and the returned template is
        added to the cache unless it is already listed there.
        """
        log_content = input_["log"]

        template, event_id, _ = self._cache.match_event(log_content)

        if template == "NoMatch":
            # Cache miss: ask the LLM for a template of this one log.
            cluster = Cluster()
            cluster.append_log(log_content, 0)
            cluster.batching(self._batch_size)

            template, cluster, _ = self._llm_parser.get_responce(cluster, cache_base=self._cache)

            # NOTE(review): the nesting of this check under the cache-miss
            # branch was reconstructed from a whitespace-collapsed patch -
            # confirm against the original file.
            if template not in self._cache.template_list:
                event_id, _, _ = self._cache.add_templates(template, refer_log=log_content)
            else:
                event_id = self._cache.template_list.index(template)

        # extract_variables may return a falsy value; fall back to no variables.
        variables = extract_variables(log_content, template) or ()

        output_["template"] = template
        output_["variables"].extend(list(variables))
        output_["EventID"] = event_id
def compile_events_config(self, events_config: EventsConfig) -> EventsConfig:
    """Rewrite label-based keys of an events config as positional ints.

    Event keys that are strings and match a known event ID label are
    replaced by the corresponding template index; every other event key is
    carried over unchanged. Within each instance, variables keyed by a
    string label are re-keyed by the label's position in the resolved
    template, while variables with non-string keys are kept as they are.

    Args:
        events_config: Configuration whose event keys and variable keys may
            be labels instead of ints.

    Returns:
        A new EventsConfig built from the translated events. Instance
        ``params`` and ``header_variables`` are passed through untouched.

    Raises:
        ValueError: If a variable is keyed by a label that is not known for
            its event (this includes events without any label mapping).
    """
    compiled_events: Dict[Any, _EventConfig] = {}

    for event_key, source_event in events_config.events.items():
        # Start from the key as given; swap it only for a registered label.
        target_key: str | int = event_key
        if isinstance(event_key, str) and event_key in self._event_label_to_idx:
            target_key = self._event_label_to_idx[event_key]

        # Non-int keys are looked up under -1, which normally yields no labels.
        lookup_idx = target_key if isinstance(target_key, int) else -1
        var_map = self._idx_to_var_map.get(lookup_idx, {})

        compiled_instances: Dict[str, _EventInstance] = {}
        for instance_name, source_instance in source_event.instances.items():
            positional_vars: Dict[str | int, Variable] = {}

            for pos, source_var in source_instance.variables.items():
                if not isinstance(pos, str):
                    # Already positional: reuse the existing Variable object.
                    positional_vars[pos] = source_var
                    continue

                if pos not in var_map:
                    raise ValueError(
                        f"Label '{pos}' not found in template for event '{event_key}'. "
                        f"Available labels: {list(var_map)}"
                    )

                # Keep the label as the variable name, store it by position.
                position = var_map[pos]
                positional_vars[position] = Variable(
                    pos=position, name=pos, params=source_var.params
                )

            compiled_instances[instance_name] = _EventInstance(
                params=source_instance.params,
                variables=positional_vars,
                header_variables=source_instance.header_variables,
            )

        compiled_events[target_key] = _EventConfig(instances=compiled_instances)

    return EventsConfig(events=compiled_events)
| None: """Extract parameters from the log based on the template.""" diff --git a/src/detectmatelibrary/parsers/template_matcher/_parser.py b/src/detectmatelibrary/parsers/template_matcher/_parser.py index 3192a84..edcae99 100644 --- a/src/detectmatelibrary/parsers/template_matcher/_parser.py +++ b/src/detectmatelibrary/parsers/template_matcher/_parser.py @@ -1,10 +1,13 @@ -from detectmatelibrary.parsers.template_matcher._matcher_op import TemplateMatcher +from detectmatelibrary.parsers.template_matcher._matcher_op import TemplateMatcher, TemplateMetadata from detectmatelibrary.common.parser import CoreParser, CoreParserConfig from detectmatelibrary import schemas from typing import Any import csv import os +import re + +_NAMED_WC_RE = re.compile(r'<([A-Za-z_]\w*)>') class TemplatesNotFoundError(Exception): @@ -15,34 +18,93 @@ class TemplateNoPermissionError(Exception): pass -def load_templates(path: str) -> list[str]: +def _compile_templates( + raw_templates: list[str], + event_id_labels: list[str | None] | None = None, +) -> tuple[list[str], dict[int, TemplateMetadata]]: + """Convert named wildcards to <*> and record label order and event ID + labels. + + Args: + raw_templates: Raw template strings, possibly containing named wildcards. + event_id_labels: Optional per-template event ID labels (from CSV EventId column). + If provided, must have the same length as raw_templates. + + Returns: + compiled: Template strings with only <*> wildcards, ready for TemplatesManager. + metadata: Mapping of template index to TemplateMetadata. + + Raises: + ValueError: If a template mixes <*> and named wildcards. + """ + compiled: list[str] = [] + metadata: dict[int, TemplateMetadata] = {} + + for i, raw in enumerate(raw_templates): + has_anon = "<*>" in raw + labels = _NAMED_WC_RE.findall(raw) + has_named = bool(labels) + + if has_anon and has_named: + raise ValueError( + f"Template mixes <*> and named wildcards: {raw!r}. " + "Use either <*> (positional) or