OPS-001: Scheduling and Automation
Overview
Implement scheduling and automation capabilities for running the wareflow pipeline on a scheduled basis (daily, weekly, etc.).
Description
Scheduling and automation enables:
- Automated daily pipeline execution
- Scheduled report generation and distribution
- Unattended operation (fire-and-forget)
- Error notifications and alerts
- Pipeline history and logging
Technical Approach
Built-in Scheduler
```python
# src/wareflow_analysis/scheduler.py
import time
from datetime import datetime
from pathlib import Path
from typing import Optional

import schedule
import yaml

from run.pipeline import PipelineRunner


class WareflowScheduler:
    """Schedule and automate wareflow pipeline execution."""

    def __init__(self, project_dir: Path, config_path: Optional[Path] = None):
        self.project_dir = project_dir
        self.config_path = config_path or project_dir / "config" / "schedule.yaml"
        self.pipeline = PipelineRunner(project_dir)
        self.load_config()

    def load_config(self):
        """Load scheduling configuration."""
        if self.config_path.exists():
            with open(self.config_path) as f:
                self.config = yaml.safe_load(f)
        else:
            self.config = {"jobs": []}

    def run_pipeline(self, job_name: str):
        """Run pipeline with logging and error handling."""
        print(f"\n{'=' * 60}")
        print(f"Scheduled Job: {job_name}")
        print(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"{'=' * 60}\n")
        try:
            results = self.pipeline.run_full_pipeline()
            # Log results
            self.log_results(job_name, results)
            # Send notifications if configured
            if self.config.get('notifications'):
                self.send_notifications(job_name, results)
        except Exception as e:
            print(f"❌ Pipeline failed: {e}")
            # log_error and send_alert are sketched under Error Handling below
            self.log_error(job_name, e)
            self.send_alert(job_name, e)

    def setup_schedules(self):
        """Set up scheduled jobs from configuration."""
        for job in self.config.get('jobs', []):
            if not job.get('enabled', True):
                continue
            frequency = job['frequency']
            time_str = job.get('time', '02:00')
            if frequency == 'daily':
                schedule.every().day.at(time_str).do(self.run_pipeline, job['name'])
            elif frequency == 'weekly':
                day = job.get('day', 'monday')
                getattr(schedule.every(), day.lower()).at(time_str).do(
                    self.run_pipeline, job['name']
                )
            elif frequency == 'hourly':
                schedule.every().hour.do(self.run_pipeline, job['name'])

    def start(self):
        """Start the scheduler loop."""
        print("🗓️ Wareflow Scheduler Started")
        print(f"Project: {self.project_dir}")
        print(f"Config: {self.config_path}")
        print("\nScheduled Jobs:")
        for job in self.config.get('jobs', []):
            if job.get('enabled', True):
                print(f"  ✓ {job['name']}: {job['frequency']} at {job.get('time', '02:00')}")
        print("\nScheduler running. Press Ctrl+C to stop.\n")
        self.setup_schedules()
        try:
            while True:
                schedule.run_pending()
                time.sleep(60)  # Check every minute
        except KeyboardInterrupt:
            print("\n\n🛑 Scheduler stopped by user")

    def log_results(self, job_name: str, results: dict):
        """Log pipeline execution results."""
        log_dir = self.project_dir / ".wareflow" / "logs"
        log_dir.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        log_file = log_dir / f"pipeline_{timestamp}.log"
        with open(log_file, 'w') as f:
            f.write(f"Job: {job_name}\n")
            f.write(f"Timestamp: {datetime.now().isoformat()}\n")
            f.write(f"Status: {results['status']}\n")
            f.write("\nResults:\n")
            for step, result in results.items():
                if step not in ('status', 'errors'):
                    f.write(f"  {step}: {result}\n")
            if results.get('errors'):
                f.write("\nErrors:\n")
                for error in results['errors']:
                    f.write(f"  - {error}\n")
```
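The `run_pipeline` method above calls `self.log_error`, which the sketch does not define. One plausible shape, mirroring `log_results` (the standalone function form, file naming, and traceback detail here are assumptions, not the settled design):

```python
from datetime import datetime
from pathlib import Path
import traceback


def log_error(log_dir: Path, job_name: str, error: Exception) -> Path:
    """Write a failure record alongside the regular pipeline logs."""
    log_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = log_dir / f"pipeline_{timestamp}_error.log"
    with open(log_file, "w") as f:
        f.write(f"Job: {job_name}\n")
        f.write(f"Timestamp: {datetime.now().isoformat()}\n")
        f.write(f"Error: {error}\n\n")
        # Full traceback so failures are diagnosable from the log alone
        f.write("".join(traceback.format_exception(type(error), error, error.__traceback__)))
    return log_file
```

Keeping error logs in the same `.wareflow/logs` directory means the retention policy below covers both success and failure records.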
Configuration Structure
Schedule Configuration
config/schedule.yaml:
```yaml
jobs:
  - name: "daily_pipeline"
    enabled: true
    frequency: "daily"
    time: "02:00"  # 2 AM
    warehouses:
      - paris
      - lyon
      - marseille
    steps:
      - import
      - analyze
      - export
    export:
      output_dir: "output/reports/daily"
      filename_pattern: "warehouse_report_{warehouse}_{date}.xlsx"
      consolidate: true

  - name: "weekly_summary"
    enabled: true
    frequency: "weekly"
    day: "friday"
    time: "08:00"
    warehouses: "all"
    steps:
      - analyze
    export:
      output_dir: "output/reports/weekly"
      filename_pattern: "weekly_summary_{date}.xlsx"
      consolidate: true
      email:
        to: ["execs@company.com"]
        subject: "Weekly Warehouse Summary"

notifications:
  on_success:
    email: ["warehouse-ops@company.com"]
  on_failure:
    email: ["warehouse-ops@company.com", "it-alerts@company.com"]
  slack:
    webhook_url: "${SLACK_WEBHOOK_URL}"
    channel: "#warehouse-alerts"

logging:
  level: "INFO"
  retention_days: 30
```
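The `wareflow scheduler validate` command below implies a config checker. A hedged sketch of what it might verify, using only field names that appear in the YAML above (the function name and exact messages are illustrative):

```python
VALID_FREQUENCIES = {"daily", "weekly", "hourly"}


def validate_schedule_config(config: dict) -> list:
    """Return a list of problems; an empty list means the config is usable."""
    problems = []
    jobs = config.get("jobs", [])
    if not jobs:
        problems.append("no jobs defined")
    names = set()
    for i, job in enumerate(jobs):
        label = job.get("name", f"jobs[{i}]")
        if "name" not in job:
            problems.append(f"{label}: missing 'name'")
        elif job["name"] in names:
            problems.append(f"{label}: duplicate job name")
        else:
            names.add(job["name"])
        freq = job.get("frequency")
        if freq not in VALID_FREQUENCIES:
            problems.append(f"{label}: frequency must be one of {sorted(VALID_FREQUENCIES)}")
        if freq == "weekly" and "day" not in job:
            problems.append(f"{label}: weekly jobs should set 'day'")
        # Times are HH:MM strings, matching the schedule library's .at() format
        t = job.get("time", "02:00")
        hh, _, mm = t.partition(":")
        if not (hh.isdigit() and mm.isdigit() and 0 <= int(hh) < 24 and 0 <= int(mm) < 60):
            problems.append(f"{label}: invalid time '{t}'")
    return problems
```

Validating up front lets the scheduler refuse a bad config at startup instead of failing silently at 2 AM.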
Implementation Plan
Phase 1: Core Scheduler (1-2 days)
- WareflowScheduler class
Phase 2: Enhanced Features (1 day)
CLI Usage
```bash
# Start scheduler
wareflow scheduler start

# Run scheduled jobs manually (test)
wareflow scheduler run --job daily_pipeline

# Validate schedule configuration
wareflow scheduler validate

# Show next scheduled runs
wareflow scheduler next

# View schedule history
wareflow scheduler history
```
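The subcommand surface above could be wired with argparse; a minimal sketch (only the command names come from the list above, the parser structure is an assumption):

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """CLI surface for the scheduler subcommands listed above."""
    parser = argparse.ArgumentParser(prog="wareflow")
    sub = parser.add_subparsers(dest="command", required=True)
    # 'wareflow scheduler <action>' with one nested subparser per action
    sched = sub.add_parser("scheduler").add_subparsers(dest="action", required=True)
    sched.add_parser("start")
    run = sched.add_parser("run")
    run.add_argument("--job", required=True, help="job name from schedule.yaml")
    sched.add_parser("validate")
    sched.add_parser("next")
    sched.add_parser("history")
    return parser
```

Each parsed `action` would dispatch to the corresponding `WareflowScheduler` method.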
System Integration
Windows Task Scheduler
```xml
<!-- Task Scheduler XML -->
<Task>
  <Triggers>
    <CalendarTrigger>
      <!-- First run; repeats daily thereafter -->
      <StartBoundary>2025-01-01T02:00:00</StartBoundary>
      <ScheduleByDay>
        <DaysInterval>1</DaysInterval>
      </ScheduleByDay>
    </CalendarTrigger>
  </Triggers>
  <Actions>
    <Exec>
      <Command>wareflow</Command>
      <Arguments>run</Arguments>
      <WorkingDirectory>C:\path\to\project</WorkingDirectory>
    </Exec>
  </Actions>
</Task>
```
Linux Cron
```
# /etc/cron.d/wareflow  (system crontab: the sixth field names the user to run as)
# Run daily at 2 AM
0 2 * * * root cd /path/to/project && wareflow run >> .wareflow/logs/cron.log 2>&1

# Run weekly report on Fridays at 8 AM
0 8 * * 5 root cd /path/to/project && wareflow export --consolidated --output reports/weekly/
```
systemd Timer (Linux)
```ini
# /etc/systemd/system/wareflow.service
[Unit]
Description=Wareflow Analysis Pipeline
After=network.target

[Service]
Type=oneshot
WorkingDirectory=/path/to/project
ExecStart=/usr/local/bin/wareflow run
# No [Install] section: the timer, not boot, starts this unit
```

```ini
# /etc/systemd/system/wareflow.timer
[Unit]
Description=Run Wareflow Pipeline Daily
Requires=wareflow.service

[Timer]
OnCalendar=*-*-* 02:00:00
Persistent=true

[Install]
WantedBy=timers.target
```

Activate with `systemctl enable --now wareflow.timer`.
Monitoring and Logging
Pipeline History
```sql
-- Track pipeline executions
CREATE TABLE pipeline_history (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_name TEXT,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    status TEXT,              -- 'success', 'failed', 'partial'
    import_rows INTEGER,
    export_files TEXT,
    error_message TEXT
);
```
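Since the schema is SQLite-compatible, recording a run from the scheduler could look like this (the `record_run` helper is a sketch, not a committed API):

```python
import sqlite3
from datetime import datetime

SCHEMA = """
CREATE TABLE IF NOT EXISTS pipeline_history (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_name TEXT,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    status TEXT,
    import_rows INTEGER,
    export_files TEXT,
    error_message TEXT
);
"""


def record_run(conn, job_name, started_at, completed_at, status,
               import_rows=None, export_files=None, error_message=None):
    """Insert one execution record into pipeline_history."""
    conn.execute(
        "INSERT INTO pipeline_history "
        "(job_name, started_at, completed_at, status, import_rows, export_files, error_message) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)",
        (job_name, started_at.isoformat(), completed_at.isoformat(), status,
         import_rows, export_files, error_message),
    )
    conn.commit()
```

`wareflow scheduler history` could then be a simple `SELECT` over this table.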
Log Files
```
.wareflow/
├── logs/
│   ├── pipeline_20250121_020000.log
│   ├── pipeline_20250120_020000.log
│   └── scheduler.log
└── history.json
```
Error Handling
Retry Logic
```yaml
# config/schedule.yaml
jobs:
  - name: "daily_pipeline"
    retry:
      max_attempts: 3
      backoff: "exponential"  # or "linear"
      initial_delay: 60       # seconds
```
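The retry config above maps to a small wrapper; a sketch that honors `max_attempts`, `backoff`, and `initial_delay` (the injectable `sleep` parameter is an addition for testability, not part of the config):

```python
import time


def run_with_retry(fn, max_attempts=3, backoff="exponential", initial_delay=60,
                   sleep=time.sleep):
    """Call fn(); on failure, wait and retry per the retry config above."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: let the alerting path handle it
            if backoff == "exponential":
                delay = initial_delay * 2 ** (attempt - 1)  # 60, 120, 240, ...
            else:  # "linear"
                delay = initial_delay * attempt             # 60, 120, 180, ...
            sleep(delay)
```

The scheduler would wrap `self.pipeline.run_full_pipeline` with this before falling through to `send_alert`.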
Alerting
```python
def send_alert(config: dict, job_name: str, error: Exception):
    """Send alert on pipeline failure."""
    notifications = config.get('notifications', {})
    if notifications.get('slack'):
        send_slack_alert(
            webhook_url=notifications['slack']['webhook_url'],
            channel=notifications['slack']['channel'],
            message=f"❌ Pipeline {job_name} failed: {error}"
        )
    if notifications.get('on_failure', {}).get('email'):
        send_email_alert(
            to=notifications['on_failure']['email'],
            subject=f"⚠️ Pipeline Alert: {job_name}",
            body=f"Pipeline execution failed:\n\n{error}"
        )
```
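`send_slack_alert` is referenced but not defined; a standard-library sketch against Slack's incoming-webhook interface (the payload field names follow Slack's webhook format; splitting out `build_slack_payload` is a choice made here for testability):

```python
import json
import urllib.request


def build_slack_payload(channel: str, message: str) -> dict:
    """Message body for a Slack incoming webhook."""
    return {"channel": channel, "text": message}


def send_slack_alert(webhook_url: str, channel: str, message: str) -> None:
    """POST the payload to the configured webhook (network call)."""
    data = json.dumps(build_slack_payload(channel, message)).encode("utf-8")
    req = urllib.request.Request(
        webhook_url, data=data, headers={"Content-Type": "application/json"}
    )
    urllib.request.urlopen(req, timeout=10)
```

The webhook URL should come from the `${SLACK_WEBHOOK_URL}` environment variable, as in the schedule config, never from a committed file.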
Success Criteria
Future Enhancements
- Web UI: Dashboard for viewing schedule and history
- Dynamic Scheduling: Adjust schedules based on data availability
- Distributed Execution: Run jobs across multiple servers
- Priority Queues: High-priority ad-hoc jobs
- Resource Limits: Prevent overlapping executions
Dependencies
Required
- CORE-005 (run command)
- schedule Python package (or use APScheduler)
Optional
- SMTP server (for email notifications)
- Slack webhook (for Slack alerts)
Related Issues
- Depends on: CORE-005
- Related to: ARCH-003 (Consolidated Reporting)
References
Notes
Scheduling and automation is the final piece for production deployment:
- Enables fire-and-forget operation
- Reduces manual intervention
- Ensures timely reports
- Provides operational monitoring
Key considerations:
- Reliability: Must run consistently without manual intervention
- Monitoring: Must detect and alert on failures
- Idempotency: Must handle re-runs safely
- Logging: Must maintain audit trail
The scheduler should be:
- Simple: Easy to configure and understand
- Robust: Handle errors gracefully
- Visible: Clear status and logging
- Flexible: Support different schedules per environment
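On the idempotency and overlapping-execution points above, one simple safeguard is an exclusive lock file taken for the duration of a run; a standard-library sketch (the lock path and error message are illustrative):

```python
import os
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def pipeline_lock(lock_path: Path):
    """Create lock_path exclusively; refuse to run if it already exists."""
    try:
        # O_EXCL makes creation atomic: only one process can win
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        raise RuntimeError(f"another run holds {lock_path}")
    try:
        os.write(fd, str(os.getpid()).encode())  # record owner for debugging
        os.close(fd)
        yield
    finally:
        lock_path.unlink(missing_ok=True)
```

The scheduler's `run_pipeline` could wrap the pipeline call in `with pipeline_lock(project_dir / ".wareflow" / "pipeline.lock"):` so a slow run and the next scheduled run never interleave.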