A Python-based distributed log processing system built with PySpark and Streamlit. This system processes log files at scale, performs analytics, generates reports, and provides an interactive dashboard for visualization.
- Distributed Processing: Uses PySpark for scalable log processing
- Comprehensive Analytics: Error analysis, trends, IP/service breakdowns
- Alert System: Configurable alerts based on thresholds
- Interactive Dashboard: Modern Streamlit-based visualization
- Automated Reporting: CSV and JSON report generation
- Production Ready: Clean, modular, scalable architecture
distributed-log-system/
│
├── data/
│   └── raw_logs/              # Input CSV log files
│
├── reports/
│   ├── csv/                   # CSV reports
│   ├── json/                  # JSON reports
│   └── alerts.log             # Alert history
│
├── src/
│   ├── spark/
│   │   ├── spark_session.py   # Spark session management
│   │   ├── ingest_logs.py     # Log ingestion
│   │   ├── parse_logs.py      # Log parsing & normalization
│   │   ├── analytics.py       # Core analytics
│   │   ├── alerts.py          # Alert system
│   │   └── export_reports.py  # Report generation
│   │
│   ├── dashboard/
│   │   └── app.py             # Streamlit dashboard
│   │
│   └── main.py                # Main processing pipeline
│
├── config/
│   └── config.yaml            # Configuration file
│
├── requirements.txt
└── README.md
- Python 3.8 or higher
- Java 8 or higher (required for PySpark)
- Clone or navigate to the project directory
- Create a virtual environment (recommended)
  python -m venv venv
  source venv/bin/activate  # On Windows: venv\Scripts\activate
- Install dependencies
  pip install -r requirements.txt
- Verify Java installation
  java -version
Place your log CSV files in the data/raw_logs/ directory. The CSV files should contain at least the following columns:
- timestamp: Timestamp of the log entry (various formats supported)
- log_level: Log level (INFO, WARN, ERROR, DEBUG)
- message: Log message content

Optional columns:
- ip or IP: IP address
- service: Service name
- endpoint: Endpoint path
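The sketch below illustrates, in plain Python, what checking a file against these column requirements could look like: lower-casing headers so that `IP` and `ip` are treated alike, rejecting files that lack a required column, and trying several timestamp formats in turn. The project does its parsing in PySpark (`parse_logs.py`), so treat this only as an illustration; the helper names and the list of timestamp formats are assumptions, since the supported formats are not listed here.

```python
import csv
import io
from datetime import datetime
from typing import Dict, List, Optional

# Column requirements as described in this README.
REQUIRED_COLUMNS = {"timestamp", "log_level", "message"}
OPTIONAL_COLUMNS = {"ip", "service", "endpoint"}

# Hypothetical set of accepted timestamp formats (not taken from the project).
TIMESTAMP_FORMATS = ["%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%d/%m/%Y %H:%M:%S"]


def normalize_header(header: List[str]) -> List[str]:
    """Strip and lower-case column names so 'IP' and 'ip' map to the same field."""
    return [name.strip().lower() for name in header]


def parse_timestamp(value: str) -> Optional[datetime]:
    """Try each known format in turn; return None if none of them match."""
    for fmt in TIMESTAMP_FORMATS:
        try:
            return datetime.strptime(value.strip(), fmt)
        except ValueError:
            continue
    return None


def read_log_csv(text: str) -> List[Dict[str, str]]:
    """Validate the header, then return each row as a dict keyed by normalized column name."""
    reader = csv.reader(io.StringIO(text))
    header = normalize_header(next(reader))
    missing = REQUIRED_COLUMNS - set(header)
    if missing:
        raise ValueError(f"missing required columns: {sorted(missing)}")
    return [dict(zip(header, row)) for row in reader if row]
```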
Example CSV file:

timestamp,log_level,message,ip,service
2024-01-15 10:30:00,INFO,Request processed successfully,192.168.1.100,api-service
2024-01-15 10:31:00,ERROR,Connection timeout exception,192.168.1.101,api-service
2024-01-15 10:32:00,WARN,High response time detected,192.168.1.102,web-service

Edit config/config.yaml to customize:
- Spark settings: Memory, executor settings
- Paths: Input/output directories
- Alert thresholds: Error rate, error count, critical errors
- Analytics: Top N errors, time windows
- Dashboard: Auto-refresh settings
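One common way to handle these settings is to merge the user's YAML on top of built-in defaults and look values up by dotted key. The sketch below assumes the YAML has already been parsed into a dict (for example with PyYAML's `yaml.safe_load`); the default values are the ones quoted in this README, storing the error rate as a fraction (0.10 rather than 10) is an assumption, and `deep_merge` / `get_setting` are hypothetical helpers, not the project's API.

```python
import copy
from typing import Any, Dict

# Defaults mirroring values mentioned in this README.
# Assumption: the error rate threshold is stored as a fraction, not a percentage.
DEFAULTS: Dict[str, Any] = {
    "spark": {"master": "local[*]"},
    "alerts": {
        "error_rate_threshold": 0.10,
        "error_count_threshold": 100,
        "critical_error_count": 5,
    },
}


def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of base with override applied, recursing into nested dicts."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


def get_setting(config: Dict[str, Any], dotted_key: str) -> Any:
    """Look up a dotted key such as 'alerts.error_rate_threshold'."""
    node: Any = config
    for part in dotted_key.split("."):
        node = node[part]
    return node
```

Because only the keys present in the user's file are overridden, a config.yaml that sets just `spark.master` keeps every alert default intact.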
Run the main processing pipeline:
python src/main.py

Or run individual modules:
# Ingest logs
python src/spark/ingest_logs.py
# Parse logs
python src/spark/parse_logs.py
# Run analytics
python src/spark/analytics.py
# Check alerts
python src/spark/alerts.py
# Export reports
python src/spark/export_reports.py

Start the Streamlit dashboard:
streamlit run src/dashboard/app.py

The dashboard will open in your browser at http://localhost:8501.
The interactive dashboard provides:
- Key Metrics: Total logs, errors, error rate, active alerts
- Charts:
- Errors over time (line chart)
- Top error types (bar chart)
- Errors per IP (bar chart)
- Errors per service (bar chart)
- Filters: Date range, log level selection
- Alert Panel: Recent alerts with timestamps
- Detailed Tables: Top errors, trends, severity breakdown
- Auto-refresh: Automatic data refresh capability
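The filter and metric logic behind such a dashboard can be sketched in plain Python as below. How app.py actually implements it is not shown in this README; in a Streamlit app the date range and level selection could come from widgets such as `st.date_input` and `st.multiselect`. The record shape (a dict with a `datetime` under `timestamp` and a `log_level` string) and the function names are assumptions.

```python
from datetime import date, datetime
from typing import Dict, Iterable, List, Set


def filter_logs(records: Iterable[Dict], start: date, end: date, levels: Set[str]) -> List[Dict]:
    """Keep records dated within [start, end] (inclusive) whose level is selected."""
    return [
        r for r in records
        if start <= r["timestamp"].date() <= end and r["log_level"] in levels
    ]


def key_metrics(records: List[Dict]) -> Dict[str, float]:
    """Total logs, error count and error rate for the key-metric cards."""
    total = len(records)
    errors = sum(1 for r in records if r["log_level"] == "ERROR")
    return {
        "total_logs": total,
        "errors": errors,
        "error_rate": errors / total if total else 0.0,  # avoid dividing by zero
    }


if __name__ == "__main__":
    sample = [
        {"timestamp": datetime(2024, 1, 15, 10, 30), "log_level": "INFO"},
        {"timestamp": datetime(2024, 1, 15, 10, 31), "log_level": "ERROR"},
    ]
    print(key_metrics(filter_logs(sample, date(2024, 1, 15), date(2024, 1, 15), {"INFO", "ERROR"})))
```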
After processing, reports are available in:
- CSV reports (reports/csv/):
  - errors_by_type.csv: Errors grouped by type
  - errors_by_hour.csv: Errors by hour
  - top_errors.csv: Top N most frequent errors
  - errors_per_ip.csv: Errors grouped by IP
  - errors_per_service.csv: Errors grouped by service
- JSON reports (reports/json/):
  - error_trends.json: Error trends over time
  - errors_by_day.json: Errors by day
  - errors_by_severity.json: Errors by severity level
- Alert log (reports/alerts.log): History of all alerts
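As an illustration of what a report like errors_by_day.json involves, the plain-Python sketch below counts ERROR records per day and writes them as indented JSON. In the project the aggregation runs in PySpark and the export happens in export_reports.py; the file's real layout is not documented here, so the date-to-count mapping and the helper names are assumptions.

```python
import json
import tempfile
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Dict, Iterable


def errors_by_day(records: Iterable[Dict]) -> Dict[str, int]:
    """Count ERROR records per calendar day, keyed by ISO date (YYYY-MM-DD)."""
    counts = Counter(
        r["timestamp"].date().isoformat()
        for r in records
        if r["log_level"] == "ERROR"
    )
    # Sort by date so the JSON file is stable and easy to diff.
    return dict(sorted(counts.items()))


def write_json_report(data: Dict, path: Path) -> Path:
    """Write data as indented JSON, creating parent directories if needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(data, indent=2), encoding="utf-8")
    return path


if __name__ == "__main__":
    sample = [
        {"timestamp": datetime(2024, 1, 15, 10, 31), "log_level": "ERROR"},
        {"timestamp": datetime(2024, 1, 15, 10, 32), "log_level": "WARN"},
    ]
    with tempfile.TemporaryDirectory() as tmp:
        out = write_json_report(errors_by_day(sample), Path(tmp) / "json" / "errors_by_day.json")
        print(out.read_text(encoding="utf-8"))
```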
The system automatically checks for:
- High Error Rate: Exceeds configured threshold (default: 10%)
- High Error Count: Total errors exceed threshold (default: 100)
- Critical Errors: Critical error count exceeds threshold (default: 5)
Alerts are:
- Printed to the console
- Logged to reports/alerts.log
- Displayed in the dashboard
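A minimal sketch of the three threshold checks and the print-and-log step, assuming the counts have already been computed. It is not the code in alerts.py: treating "exceeds" as a strict `>` comparison and storing the rate as a fraction are assumptions, and because this README does not say how critical errors are identified, the critical count is simply passed in.

```python
import tempfile
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import List


@dataclass
class AlertThresholds:
    # Defaults quoted in this README; the rate is assumed to be a fraction.
    error_rate: float = 0.10
    error_count: int = 100
    critical_count: int = 5


def check_alerts(total: int, errors: int, critical: int, t: AlertThresholds) -> List[str]:
    """Return one message for each threshold that is strictly exceeded."""
    alerts: List[str] = []
    rate = errors / total if total else 0.0
    if rate > t.error_rate:
        alerts.append(f"High error rate: {rate:.1%} > {t.error_rate:.1%}")
    if errors > t.error_count:
        alerts.append(f"High error count: {errors} > {t.error_count}")
    if critical > t.critical_count:
        alerts.append(f"Critical errors: {critical} > {t.critical_count}")
    return alerts


def emit_alerts(alerts: List[str], log_path: Path) -> None:
    """Print each alert and append it, timestamped, to the alert log file."""
    log_path.parent.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().isoformat(timespec="seconds")
    with log_path.open("a", encoding="utf-8") as fh:
        for message in alerts:
            print(f"[ALERT] {message}")
            fh.write(f"{stamp} {message}\n")


if __name__ == "__main__":
    found = check_alerts(total=1000, errors=150, critical=6, t=AlertThresholds())
    with tempfile.TemporaryDirectory() as tmp:
        emit_alerts(found, Path(tmp) / "alerts.log")
```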
- Modular Design: Each module has a specific responsibility
- PySpark Only: All data processing uses PySpark (no Pandas for transformations)
- Configurable: Settings in YAML configuration
- Logging: Comprehensive logging throughout
- Error Handling: Robust error handling and validation
To add a new analytic:
- Add a function to src/spark/analytics.py
- Call it in run_all_analytics()
- Export its results in export_reports.py
- Visualize it in the dashboard (app.py)
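The sketch below shows the general shape of steps one and two: a new analytic function plus a registry that runs every analytic and collects results by name. It is plain Python so it runs without Spark; in this project the function would operate on a PySpark DataFrame instead, e.g. `df.filter(df.log_level == "ERROR").groupBy("endpoint").count()`. The names `errors_per_endpoint`, `ANALYTICS` and `run_registered_analytics` are hypothetical, and the real run_all_analytics() may be organized quite differently.

```python
from collections import Counter
from typing import Callable, Dict, List

Record = Dict[str, str]
Analytic = Callable[[List[Record]], Dict[str, int]]


def errors_per_endpoint(records: List[Record]) -> Dict[str, int]:
    """Hypothetical new analytic: count ERROR records per endpoint."""
    return dict(Counter(
        r.get("endpoint") or "unknown"  # endpoint is an optional column
        for r in records
        if r["log_level"] == "ERROR"
    ))


# Hypothetical registry: adding an entry here is the "call it" step.
ANALYTICS: Dict[str, Analytic] = {
    "errors_per_endpoint": errors_per_endpoint,
}


def run_registered_analytics(records: List[Record]) -> Dict[str, Dict[str, int]]:
    """Run every registered analytic and collect the results by name."""
    return {name: fn(records) for name, fn in ANALYTICS.items()}
```

Keying results by name makes the export step straightforward: each entry can be written to its own CSV or JSON file.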
Edit config/config.yaml to adjust the alert thresholds:
- alerts.error_rate_threshold
- alerts.error_count_threshold
- alerts.critical_error_count
The system is currently configured for local[*] execution. To run on a cluster:
- Update config/config.yaml:
  spark:
    master: "spark://your-cluster:7077"
- Ensure the Spark cluster is accessible
- Run the same commands
The code is structured to be cloud-ready. For AWS EMR, Databricks, or other platforms:
- Package the code
- Update Spark master URL
- Ensure data paths are accessible (S3, HDFS, etc.)
- Deploy and run
- No Pandas for Processing: All analytics use PySpark DataFrames
- Caching: Intermediate results are cached for performance
- Partitioning: Spark optimizations are applied automatically
- Scalability: Designed to handle large-scale log processing
If Java is not found or Spark fails to start:
- Install Java 8+ and set the JAVA_HOME environment variable
- Check the Java installation
- Verify that the Spark dependencies are installed
- Check the file paths in the configuration file

If the dashboard shows no data:
- Ensure Spark processing has been run first
- Check that reports have been generated in the reports/ directory
- Verify the file paths in the configuration

If you see import errors:
- Ensure you're running from the project root
- Check that all dependencies are installed
- Verify that the Python path includes the project directory
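If running from the project root is not convenient, a script can locate the root itself and put it on `sys.path`. This is a hedged sketch, not something the project ships: it assumes config/config.yaml marks the root, and `find_project_root` / `ensure_on_sys_path` are hypothetical helpers. Whether the root or `src/` is the right directory to add depends on how the modules import each other, which this README does not show.

```python
import sys
import tempfile
from pathlib import Path
from typing import Optional


def find_project_root(start: Path, marker: str = "config/config.yaml") -> Optional[Path]:
    """Walk up from start until a directory containing marker is found."""
    start = start.resolve()
    for candidate in [start, *start.parents]:
        if (candidate / marker).is_file():
            return candidate
    return None


def ensure_on_sys_path(root: Path) -> None:
    """Prepend root to sys.path (only once) so project modules can be imported."""
    root_str = str(root)
    if root_str not in sys.path:
        sys.path.insert(0, root_str)


if __name__ == "__main__":
    # Build a throwaway layout and locate its root from a nested directory.
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp).resolve()
        (root / "config").mkdir()
        (root / "config" / "config.yaml").write_text("spark: {}\n")
        (root / "src" / "spark").mkdir(parents=True)
        print(find_project_root(root / "src" / "spark") == root)
```

In a script such as src/main.py this might be called as `ensure_on_sys_path(find_project_root(Path(__file__).parent))`, after checking that the lookup did not return None.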
This project is provided as-is for educational and development purposes.
Feel free to extend this system with:
- Additional analytics
- New visualization types
- Enhanced alerting
- Performance optimizations
Built with PySpark & Streamlit