From e9953767ae275591930837ff23c1fbb664b6d090 Mon Sep 17 00:00:00 2001 From: Nik Kale Date: Fri, 26 Dec 2025 15:07:39 -0800 Subject: [PATCH] commit - security: add input validation and size limits for data ingestion --- autorca_core/__init__.py | 3 + autorca_core/ingestion/logs.py | 60 +++++++++-- autorca_core/validation.py | 181 +++++++++++++++++++++++++++++++++ 3 files changed, 237 insertions(+), 7 deletions(-) create mode 100644 autorca_core/validation.py diff --git a/autorca_core/__init__.py b/autorca_core/__init__.py index 7d1cbc7..f1d9769 100644 --- a/autorca_core/__init__.py +++ b/autorca_core/__init__.py @@ -14,6 +14,7 @@ from autorca_core.reasoning.loop import run_rca, RCARunResult from autorca_core.logging import configure_logging, get_logger from autorca_core.config import ThresholdConfig +from autorca_core.validation import IngestionLimits, ValidationError __all__ = [ "Event", @@ -28,4 +29,6 @@ "configure_logging", "get_logger", "ThresholdConfig", + "IngestionLimits", + "ValidationError", ] diff --git a/autorca_core/ingestion/logs.py b/autorca_core/ingestion/logs.py index bc86283..9d3d29f 100644 --- a/autorca_core/ingestion/logs.py +++ b/autorca_core/ingestion/logs.py @@ -8,10 +8,18 @@ import re from pathlib import Path from typing import List, Optional, Dict, Any -from datetime import datetime +from datetime import datetime, timezone from autorca_core.model.events import LogEvent, Severity from autorca_core.logging import get_logger +from autorca_core.validation import ( + IngestionLimits, + validate_path, + check_file_size, + check_line_length, + check_total_events, + sanitize_error_message, +) logger = get_logger(__name__) @@ -21,6 +29,7 @@ def load_logs( time_from: Optional[datetime] = None, time_to: Optional[datetime] = None, service_filter: Optional[str] = None, + limits: Optional[IngestionLimits] = None, ) -> List[LogEvent]: """ Load logs from a file or directory. 
@@ -30,10 +39,14 @@ def load_logs(
         time_from: Start of time window (inclusive)
         time_to: End of time window (inclusive)
         service_filter: Only include logs from this service
+        limits: Optional ingestion limits for security
 
     Returns:
         List of LogEvent objects
     """
+    if limits is None:
+        limits = IngestionLimits()
+
     source_path = Path(source)
 
     if not source_path.exists():
@@ -42,13 +55,39 @@
     events = []
 
     if source_path.is_file():
-        events.extend(_load_log_file(source_path))
+        check_file_size(source_path, limits)
+        events.extend(_load_log_file(source_path, limits))
     else:
         # Load all .log, .jsonl, .txt files in directory
         extensions = ['*.log', '*.jsonl', '*.txt']
-        for ext in extensions:
-            for file_path in source_path.glob(f"**/{ext}"):
-                events.extend(_load_log_file(file_path))
+        file_count = 0
+        for ext in extensions:
+            for file_path in source_path.glob(f"**/{ext}"):
+                # Validate path to prevent traversal
+                validate_path(source_path, file_path)
+
+                # Check file count limit
+                file_count += 1
+                if file_count > limits.max_files_per_directory:
+                    print(f"Warning: Reached file limit ({limits.max_files_per_directory}), skipping remaining files")
+                    break
+
+                # Check file size
+                try:
+                    check_file_size(file_path, limits)
+                    events.extend(_load_log_file(file_path, limits))
+
+                    # Check total event count
+                    check_total_events(len(events), limits)
+                except Exception as e:
+                    print(f"Warning: Skipping file {file_path.name}: {sanitize_error_message(e, file_path)}")
+                    continue
 
     # Apply filters
     if time_from:
@@ -61,7 +100,7 @@
     return sorted(events, key=lambda e: e.timestamp)
 
 
-def _load_log_file(file_path: Path) -> List[LogEvent]:
+def _load_log_file(file_path: Path, limits: IngestionLimits) -> List[LogEvent]:
     """Load a single log file."""
     events = []
 
@@ -72,6 +111,9 @@
             continue
 
         try:
+            # Check line length
+
check_line_length(line, limits)
+
             # Try JSON parsing first
             event = _parse_json_log(line)
             if event:
@@ -83,7 +125,11 @@
                 events.append(event)
         except Exception as e:
             # Log parsing errors are non-fatal
-            logger.warning(f"Failed to parse line {line_num} in {file_path}: {e}")
+            print(f"Warning: Failed to parse line {line_num} in {file_path.name}: {sanitize_error_message(e)}")
 
     return events
 
@@ -96,8 +142,8 @@
     # Extract timestamp
     timestamp_str = data.get('timestamp') or data.get('time') or data.get('@timestamp')
     if not timestamp_str:
-        # Use current time as fallback
-        timestamp = datetime.utcnow()
+        # Use current time as fallback (timezone-aware)
+        timestamp = datetime.now(timezone.utc)
     else:
         timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
 
@@ -151,7 +197,7 @@
     try:
         timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
     except ValueError:
-        timestamp = datetime.utcnow()
+        timestamp = datetime.now(timezone.utc)
 
     level = _parse_severity(level_str)
 
@@ -165,7 +211,7 @@
     # If pattern doesn't match, create a basic log event
     return LogEvent(
-        timestamp=datetime.utcnow(),
+        timestamp=datetime.now(timezone.utc),
         service="unknown",
         message=line,
         level=Severity.INFO,
diff --git a/autorca_core/validation.py b/autorca_core/validation.py
new file mode 100644
index 0000000..d13ae5d
--- /dev/null
+++ b/autorca_core/validation.py
@@ -0,0 +1,181 @@
+"""
+Input validation and security controls for AutoRCA-Core.
+
+Provides path validation, size limits, and sanitization to prevent security issues.
+""" + +import os +from pathlib import Path +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class IngestionLimits: + """ + Limits for data ingestion to prevent resource exhaustion. + + Attributes: + max_file_size_mb: Maximum file size in megabytes + max_total_events: Maximum number of events to ingest + max_line_length: Maximum length of a single line + max_files_per_directory: Maximum files to process from a directory + """ + + max_file_size_mb: float = 100.0 + max_total_events: int = 1_000_000 + max_line_length: int = 65536 # 64KB per line + max_files_per_directory: int = 1000 + + @classmethod + def strict(cls) -> "IngestionLimits": + """Create strict limits for untrusted data.""" + return cls( + max_file_size_mb=10.0, + max_total_events=100_000, + max_line_length=8192, + max_files_per_directory=100, + ) + + @classmethod + def relaxed(cls) -> "IngestionLimits": + """Create relaxed limits for trusted data.""" + return cls( + max_file_size_mb=500.0, + max_total_events=10_000_000, + max_line_length=131072, # 128KB + max_files_per_directory=10000, + ) + + +class ValidationError(Exception): + """Base exception for validation errors.""" + + pass + + +class PathTraversalError(ValidationError): + """Raised when path traversal is detected.""" + + pass + + +class FileSizeError(ValidationError): + """Raised when file size exceeds limits.""" + + pass + + +class LineLengthError(ValidationError): + """Raised when line length exceeds limits.""" + + pass + + +def validate_path(source_path: Path, file_path: Path) -> bool: + """ + Ensure file_path is within source_path to prevent path traversal attacks. 
+
+    Args:
+        source_path: The expected root directory
+        file_path: The file path to validate
+
+    Returns:
+        True if the path is inside source_path (never returns False;
+        raises PathTraversalError instead)
+
+    Raises:
+        PathTraversalError: If path traversal is detected
+    """
+    try:
+        # Resolve both paths to absolute paths
+        source_resolved = source_path.resolve()
+        file_resolved = file_path.resolve()
+
+        # Check if file_path is within source_path
+        file_resolved.relative_to(source_resolved)
+        return True
+    except ValueError:
+        raise PathTraversalError(
+            f"Path traversal detected: {file_path} is outside {source_path}"
+        )
+
+
+def check_file_size(file_path: Path, limits: IngestionLimits) -> None:
+    """
+    Check if file size is within limits.
+
+    Args:
+        file_path: Path to the file
+        limits: Ingestion limits configuration
+
+    Raises:
+        FileSizeError: If file exceeds size limit
+    """
+    try:
+        size_mb = file_path.stat().st_size / (1024 * 1024)
+        if size_mb > limits.max_file_size_mb:
+            raise FileSizeError(
+                f"File size {size_mb:.1f}MB exceeds limit of {limits.max_file_size_mb}MB"
+            )
+    except OSError as e:
+        raise ValidationError(f"Error checking file size: {e}")
+
+
+def check_line_length(line: str, limits: IngestionLimits) -> None:
+    """
+    Check if line length is within limits.
+
+    Args:
+        line: The line to check
+        limits: Ingestion limits configuration
+
+    Raises:
+        LineLengthError: If line exceeds length limit
+    """
+    if len(line) > limits.max_line_length:
+        raise LineLengthError(
+            f"Line length {len(line)} exceeds limit of {limits.max_line_length}"
+        )
+
+
+def sanitize_error_message(error: Exception, file_path: Optional[Path] = None) -> str:
+    """
+    Sanitize error messages to avoid leaking sensitive path information.
+
+    Args:
+        error: The exception to sanitize
+        file_path: Optional file path to redact
+
+    Returns:
+        Sanitized error message
+    """
+    message = str(error)
+
+    # Redact absolute paths, keeping only the base name as a placeholder
+    if file_path:
+        message = message.replace(str(file_path.resolve()), f"<{file_path.name}>")
+
+    # Redact home directory paths
+    home = os.path.expanduser("~")
+    if home in message:
+        message = message.replace(home, "~")
+
+    return message
+
+
+def check_total_events(current_count: int, limits: IngestionLimits) -> None:
+    """
+    Check if total event count is within limits.
+
+    Args:
+        current_count: Current number of events
+        limits: Ingestion limits configuration
+
+    Raises:
+        ValidationError: If event count exceeds limit
+    """
+    if current_count >= limits.max_total_events:
+        raise ValidationError(
+            f"Event count {current_count} exceeds limit of {limits.max_total_events}"
+        )