Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ logging:
# Number of backup log files to keep
backup_count: 5

# =============================================================================
# PERSISTENCE CONFIGURATION
# =============================================================================
persistence:
# SQLite database path for persisting alerts and beacons
# Set to null to disable persistence (in-memory only)
db_path: "./beacon_detect.db"

# =============================================================================
# WHITELIST CONFIGURATION
# =============================================================================
Expand Down
45 changes: 44 additions & 1 deletion control_plane/alerter.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ def convert_value(v):
def to_json(self):
return json.dumps(self.to_dict(), indent=2)

@classmethod
def from_dict(cls, data):
return cls(
alert_id=data["alert_id"],
title=data["title"],
description=data["description"],
severity=AlertSeverity(data["severity"]),
source=data["source"],
details=data.get("details", {}),
timestamp=data.get("timestamp", ""),
tags=data.get("tags", []),
)

def to_syslog_message(self):
return (
f"[{self.severity.value.upper()}] {self.title} | "
Expand Down Expand Up @@ -293,9 +306,10 @@ def send(self, alert: Alert):

class AlertManager:

def __init__(self, config=None):
def __init__(self, config=None, persistence=None):

self.config = config or AlertingConfig()
self._persistence = persistence

# Initialize handlers
self._syslog = SyslogHandler(self.config)
Expand Down Expand Up @@ -406,6 +420,13 @@ def _deliver_alert(self, alert: Alert):
if len(self._recent_alerts) > self._max_recent_alerts:
self._recent_alerts = self._recent_alerts[-self._max_recent_alerts :]

# Persist to database
if self._persistence:
try:
self._persistence.save_alert(alert.to_dict())
except Exception as e:
logger.error(f"Failed to persist alert: {e}")

def send_alert(self, alert: Alert):

if not self.config.enabled:
Expand Down Expand Up @@ -456,6 +477,28 @@ def get_recent_alerts(self, limit: int = 50, severity=None):

return [a.to_dict() for a in reversed(alerts)]

def load_historical_alerts(self):
"""Load alerts from persistence on startup."""
if not self._persistence:
return

try:
alert_dicts = self._persistence.load_alerts(limit=self._max_recent_alerts)
for alert_dict in reversed(alert_dicts):
try:
alert = Alert.from_dict(alert_dict)
self._recent_alerts.append(alert)
except Exception as e:
logger.warning(
f"Failed to restore alert {alert_dict.get('alert_id')}: {e}"
)

logger.info(
f"Loaded {len(self._recent_alerts)} historical alerts from database"
)
except Exception as e:
logger.error(f"Failed to load historical alerts: {e}")

@property
def statistics(self):

Expand Down
89 changes: 87 additions & 2 deletions control_plane/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(self, run_id: str):
self.pairs_analyzed = 0
self.beacons_detected = 0
self.alerts_generated = 0
self.pairs_skipped = 0
self.errors = 0
self.results: List[DetectionResult] = []

Expand All @@ -61,6 +62,7 @@ def to_dict(self):
"pairs_analyzed": int(self.pairs_analyzed),
"beacons_detected": int(self.beacons_detected),
"alerts_generated": int(self.alerts_generated),
"pairs_skipped": int(self.pairs_skipped),
"errors": int(self.errors),
}

Expand All @@ -73,6 +75,8 @@ def __init__(
detector: BeaconDetector,
alert_manager: AlertManager,
config=None,
whitelist: dict = None,
persistence=None,
):
"""
Initialize the analyzer.
Expand All @@ -82,11 +86,15 @@ def __init__(
detector: BeaconDetector instance
alert_manager: AlertManager instance
config: Analyzer configuration
whitelist: Whitelist configuration for filtering pairs
persistence: SQLiteStore instance for persisting beacons
"""
self.storage = storage
self.detector = detector
self.alert_manager = alert_manager
self.config = config or AnalyzerConfig()
self._whitelist = whitelist or {}
self._persistence = persistence

# Thread safety lock for shared state
self._lock = threading.RLock()
Expand Down Expand Up @@ -118,6 +126,29 @@ def __init__(
f"ConnectionAnalyzer initialized: interval={self.config.analysis_interval}s"
)

def _is_whitelisted(self, pair) -> bool:
"""Check if a connection pair matches any whitelist rule."""
if pair.src_ip in self._whitelist.get("source_ips", []):
return True

if pair.dst_ip in self._whitelist.get("destination_ips", []):
return True

if pair.dst_port in self._whitelist.get("ports", []):
return True

pair_str = f"{pair.src_ip}:{pair.dst_ip}:{pair.dst_port}"
if pair_str in self._whitelist.get("pairs", []):
return True

return False

def update_whitelist(self, whitelist: dict):
"""Update whitelist configuration (thread-safe)."""
with self._lock:
self._whitelist = whitelist or {}
logger.info("Whitelist updated")

def start(self):

if self._running:
Expand Down Expand Up @@ -165,13 +196,23 @@ def run_analysis(self):
logger.info(f"Starting analysis run: {run_id}")

try:
# logger.info(f"self.config.min_connections {self.config.min_connections} and self.config.min_duration {self.config.min_duration}")
# Get analyzable pairs from storage
pairs = self.storage.get_analyzable_pairs(
min_connections=self.config.min_connections,
min_duration=self.config.min_duration,
)
# logger.info(f"pairs{pairs}")

# Filter out whitelisted pairs
if self._whitelist:
original_count = len(pairs)
pairs = [p for p in pairs if not self._is_whitelisted(p)]
run.pairs_skipped = original_count - len(pairs)
if run.pairs_skipped > 0:
logger.info(
f"Whitelist filtered {run.pairs_skipped} pairs "
f"({original_count} -> {len(pairs)})"
)

# Limit pairs for performance
if len(pairs) > self.config.max_pairs_per_run:
logger.warning(
Expand Down Expand Up @@ -208,6 +249,17 @@ def run_analysis(self):
with self._lock:
self._known_beacons[result.pair_key] = result

# Persist beacon to database
if self._persistence:
try:
self._persistence.save_beacon(
result.pair_key, result.to_dict()
)
except Exception as e:
logger.error(
f"Failed to persist beacon {result.pair_key}: {e}"
)

except Exception as e:
logger.error(f"Error generating alert for {result.pair_key}: {e}")
run.errors += 1
Expand All @@ -220,6 +272,13 @@ def run_analysis(self):
]
for key in stale_keys:
del self._known_beacons[key]
if self._persistence:
try:
self._persistence.remove_beacon(key)
except Exception as e:
logger.error(
f"Failed to remove beacon {key} from database: {e}"
)

except Exception as e:
logger.error(f"Analysis run error: {e}", exc_info=True)
Expand Down Expand Up @@ -320,6 +379,26 @@ def get_run_history(self, limit: int = 10):
runs = self._run_history[-limit:]
return [r.to_dict() for r in reversed(runs)]

def load_historical_beacons(self):
"""Load known beacons from persistence on startup."""
if not self._persistence:
return

try:
beacon_dicts = self._persistence.load_beacons()
for pair_key, detection_dict in beacon_dicts.items():
try:
result = DetectionResult.from_dict(detection_dict)
self._known_beacons[pair_key] = result
except Exception as e:
logger.warning(f"Failed to restore beacon {pair_key}: {e}")

logger.info(
f"Loaded {len(self._known_beacons)} historical beacons from database"
)
except Exception as e:
logger.error(f"Failed to load historical beacons: {e}")

@property
def statistics(self):
"""Get analyzer statistics"""
Expand All @@ -331,4 +410,10 @@ def statistics(self):
"total_alerts_generated": self._total_alerts_generated,
"current_known_beacons": len(self._known_beacons),
"active_cooldowns": len(self._alert_cooldowns),
"whitelist_rules": {
"source_ips": len(self._whitelist.get("source_ips", [])),
"destination_ips": len(self._whitelist.get("destination_ips", [])),
"ports": len(self._whitelist.get("ports", [])),
"pairs": len(self._whitelist.get("pairs", [])),
},
}
45 changes: 45 additions & 0 deletions control_plane/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ def to_dict(self):
"jitter": float(round(self.jitter, 3)),
}

@classmethod
def from_dict(cls, data):
return cls(
count=data["count"],
mean=data["mean"],
std_dev=data["std_dev"],
cv=data["cv"],
median=data["median"],
min_interval=data["min_interval"],
max_interval=data["max_interval"],
jitter=data["jitter"],
)


@dataclass
class PeriodicityResult:
Expand All @@ -68,6 +81,15 @@ def to_dict(self):
],
}

@classmethod
def from_dict(cls, data):
return cls(
is_periodic=data["is_periodic"],
dominant_period=data["dominant_period"] or 0.0,
periodicity_score=data["periodicity_score"],
frequency_peaks=[(f, m) for f, m in data["frequency_peaks"]],
)


@dataclass
class DetectionResult:
Expand Down Expand Up @@ -125,6 +147,29 @@ def to_dict(self):
"analysis_time": str(self.analysis_time),
}

@classmethod
def from_dict(cls, data):
return cls(
pair_key=data["pair_key"],
src_ip=data["src_ip"],
dst_ip=data["dst_ip"],
dst_port=data["dst_port"],
protocol=data["protocol"],
cv_score=data["cv_score"],
periodicity_score=data["periodicity_score"],
jitter_score=data["jitter_score"],
combined_score=data["combined_score"],
is_beacon=data["is_beacon"],
confidence=BeaconConfidence(data["confidence"]),
interval_stats=IntervalStats.from_dict(data["interval_stats"]),
periodicity_result=PeriodicityResult.from_dict(data["periodicity_result"]),
connection_count=data["connection_count"],
duration_seconds=data["duration_seconds"],
first_seen=data["first_seen"],
last_seen=data["last_seen"],
analysis_time=data.get("analysis_time", ""),
)


@dataclass
class DetectorConfig:
Expand Down
Loading
Loading