The Common Knowledge Base implements a comprehensive ETL (Extract, Transform, Load) pipeline that automatically collects, processes, and manages data from various sources. The system is designed to maintain up-to-date knowledge for large language model consumption.
```mermaid
graph TB
    subgraph "Extract Phase"
        Web[Web Scraping]
        API[API Integration]
        Upload[File Upload]
        Schedule[Scheduled Tasks]
    end

    subgraph "Transform Phase"
        Clean[Content Cleaning]
        Meta[Metadata Extraction]
        Hash[Content Hashing]
        Valid[Validation]
    end

    subgraph "Load Phase"
        Store[Data Storage]
        Index[Content Indexing]
        Archive[Data Archival]
        Export[Data Export]
    end

    Web --> Clean
    API --> Clean
    Upload --> Clean
    Schedule --> Web
    Schedule --> API
    Clean --> Store
    Meta --> Store
    Hash --> Store
    Valid --> Store
    Store --> Index
    Store --> Archive
    Store --> Export
```
- Web Scraping
  - Estonian Government Sites: Specialized scraper for public sector content
  - General Websites: Configurable scraping for any website
  - Sitemap-based: Automatic URL discovery via sitemaps
  - Specific Pages: Targeted scraping of defined URLs
- API Integration
  - REST APIs: Direct integration with data APIs
  - File APIs: Processing files from API endpoints
  - Real-time Feeds: Periodic polling of data feeds
- File Upload
  - Document Upload: PDF, DOCX, DOC processing
  - Bulk Upload: Multiple file processing
  - Metadata Enrichment: Manual metadata addition
- Source Configuration (example below)
  - Define target URLs and scraping parameters
  - Set scraping intervals and scheduling
  - Configure content type expectations
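
To make the configuration concrete, a source definition might look roughly like the following; every field name here is an assumption for illustration, not the actual schema:

```python
from dataclasses import dataclass, field

@dataclass
class ScrapingSource:
    name: str
    base_url: str
    url_patterns: list = field(default_factory=list)      # pages to include
    run_interval_minutes: int = 60                         # how often to re-scrape
    expected_content_types: tuple = ("text/html", "application/pdf")

# Hypothetical example: a daily scrape of a public sector site
agency_site = ScrapingSource(
    name="example-agency",
    base_url="https://www.example.ee",
    url_patterns=["/news/*", "/services/*"],
    run_interval_minutes=1440,
)
```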
- Content Extraction (sketch below)
  - Use Playwright for JavaScript-heavy sites
  - Handle multiple content types (HTML, PDF, documents)
  - Extract text, images, and metadata
  - Generate content hashes for duplicate detection
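
A minimal sketch of the extraction step using Playwright's synchronous API; the function name is invented, and storage uploads, image handling, and richer metadata extraction are omitted:

```python
import hashlib
from playwright.sync_api import sync_playwright

def extract_page(url: str) -> dict:
    """Render a (possibly JavaScript-heavy) page and return its content and hash."""
    with sync_playwright() as p:
        browser = p.chromium.launch()
        page = browser.new_page()
        page.goto(url, wait_until="networkidle")   # wait for client-side rendering
        html = page.content()
        title = page.title()
        browser.close()
    return {
        "url": url,
        "title": title,
        "raw_html": html,
        # fingerprint used later for duplicate detection
        "content_hash": hashlib.sha1(html.encode("utf-8")).hexdigest(),
    }
```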
- Error Handling (sketch below)
  - Log failed requests and network errors
  - Implement retry mechanisms
  - Track source availability and status
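
One plausible shape for the retry logic, using requests with exponential backoff; the attempt count, delays, and log messages are illustrative:

```python
import logging
import time
from typing import Optional

import requests

log = logging.getLogger("scraper")

def fetch_with_retries(url: str, attempts: int = 3, backoff: float = 2.0) -> Optional[requests.Response]:
    """Fetch a URL, retrying transient failures and logging each one."""
    for attempt in range(1, attempts + 1):
        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            return response
        except requests.RequestException as exc:
            log.warning("Attempt %d/%d failed for %s: %s", attempt, attempts, url, exc)
            if attempt < attempts:
                time.sleep(backoff ** attempt)     # exponential backoff between attempts
    log.error("Source unavailable after %d attempts: %s", attempts, url)
    return None
```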
- API Configuration
  - Define API endpoints and authentication
  - Set polling intervals and data formats
  - Configure response parsing rules
- Data Retrieval (sketch below)
  - Execute scheduled API calls
  - Parse JSON/XML responses
  - Handle pagination and rate limiting
  - Process file downloads from APIs
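
A hedged sketch of a polling loop for a paginated JSON API; the `page`/`per_page` parameters, bearer-token authentication, and fixed delay are assumptions about the upstream service:

```python
import time
import requests

def poll_api(endpoint: str, api_key: str, page_size: int = 100, delay: float = 1.0) -> list:
    """Collect all pages from a paginated JSON endpoint with simple rate limiting."""
    records, page = [], 1
    while True:
        response = requests.get(
            endpoint,
            headers={"Authorization": f"Bearer {api_key}"},
            params={"page": page, "per_page": page_size},
            timeout=30,
        )
        response.raise_for_status()
        batch = response.json()
        if not batch:                  # empty page means we reached the end
            break
        records.extend(batch)
        page += 1
        time.sleep(delay)              # crude rate limiting between calls
    return records
```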
- Upload Management (example below)
  - Generate presigned URLs for secure uploads
  - Validate file types and sizes
  - Track upload progress and status
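
Presigned URLs can be produced with boto3 against any S3-compatible store; the endpoint, bucket, and key below are placeholders:

```python
import boto3

# Endpoint URL is a placeholder for the deployment's S3-compatible storage
s3 = boto3.client("s3", endpoint_url="https://s3.example.internal")

def presigned_upload_url(bucket: str, key: str, expires_seconds: int = 900) -> str:
    """Return a time-limited URL the client can PUT a file to."""
    return s3.generate_presigned_url(
        "put_object",
        Params={"Bucket": bucket, "Key": key},
        ExpiresIn=expires_seconds,
    )

url = presigned_upload_url("file-uploads", "incoming/report.pdf")
```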
- File Processing
  - Convert uploads to scraping tasks
  - Extract content using appropriate parsers
  - Generate metadata for uploaded files
```mermaid
sequenceDiagram
    participant Scrapper
    participant Cleaning
    participant Storage
    participant DB

    Scrapper->>Storage: Store raw content
    Scrapper->>DB: Store file metadata
    Scrapper->>Cleaning: Request cleaning
    Cleaning->>Storage: Download raw content
    Cleaning->>Cleaning: Process content

    alt HTML Content
        Cleaning->>Cleaning: Remove HTML tags
        Cleaning->>Cleaning: Extract text with BeautifulSoup
    else Document Content
        Cleaning->>Cleaning: Parse with Unstructured
        Cleaning->>Cleaning: Extract structured text
    end

    Cleaning->>Storage: Upload cleaned text
    Cleaning->>DB: Update processing status
```
- HTML Processing (sketch below)
  - Remove HTML tags and CSS styles
  - Extract readable text content
  - Preserve document structure where relevant
  - Handle special characters and encoding
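
The HTML pass can be as simple as the BeautifulSoup snippet below; the exact set of tags stripped in production may differ:

```python
from bs4 import BeautifulSoup

def clean_html(raw_html: str) -> str:
    """Strip markup and non-content elements, returning readable plain text."""
    soup = BeautifulSoup(raw_html, "html.parser")
    for tag in soup(["script", "style", "noscript"]):   # drop non-content elements
        tag.decompose()
    # newline separators keep paragraph boundaries in the plain-text output
    return soup.get_text(separator="\n", strip=True)
```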
- Document Processing (sketch below)
  - Parse PDF, DOCX, DOC files
  - Extract text while preserving formatting
  - Handle embedded images and tables
  - Support multiple languages
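
Document parsing can be delegated to Unstructured's automatic partitioner, which detects the file format and returns structured elements; the file path is a placeholder:

```python
from unstructured.partition.auto import partition

def extract_document_text(path: str) -> str:
    """Partition a PDF/DOCX/DOC file into elements and join their text."""
    elements = partition(filename=path)              # format detected automatically
    return "\n\n".join(str(element) for element in elements)

text = extract_document_text("/tmp/uploads/report.docx")   # placeholder path
```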
- Metadata Generation (example below)
  - Extract page titles and descriptions
  - Generate content fingerprints
  - Record processing timestamps
  - Track content source and lineage
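
An illustrative metadata record combining fingerprint, timestamp, and lineage fields; the real schema may use different keys:

```python
import hashlib
from datetime import datetime, timezone

def build_metadata(source_id: int, url: str, title: str, cleaned_text: str) -> dict:
    """Assemble the metadata stored alongside the cleaned content."""
    return {
        "source_id": source_id,                       # lineage back to the source record
        "url": url,
        "title": title,
        "content_hash": hashlib.sha1(cleaned_text.encode("utf-8")).hexdigest(),
        "content_length": len(cleaned_text),
        "processed_at": datetime.now(timezone.utc).isoformat(),
    }
```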
- Content Validation (sketch below)
  - Verify content integrity
  - Check for duplicate content
  - Validate extracted text quality
  - Flag problematic content
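
A sketch of the validation step; the length threshold, reason codes, and specific checks are assumptions about what counts as problematic content:

```python
def validate_content(cleaned_text: str, content_hash: str, known_hashes: set,
                     min_length: int = 200) -> tuple:
    """Return (is_valid, reason) for a piece of cleaned content."""
    if content_hash in known_hashes:
        return False, "duplicate"          # identical content already processed
    if len(cleaned_text.strip()) < min_length:
        return False, "too_short"          # likely an error page or empty extraction
    if "\ufffd" in cleaned_text:
        return False, "encoding_issue"     # replacement characters hint at bad decoding
    return True, "ok"
```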
```mermaid
sequenceDiagram
    participant Service
    participant RuuterInt
    participant Resql
    participant DB
    participant Storage

    Service->>RuuterInt: Submit processed data
    RuuterInt->>Resql: Execute SQL query
    Resql->>DB: Store/update records
    DB->>Resql: Return results
    Resql->>RuuterInt: Query response
    RuuterInt->>Storage: Update file references
    RuuterInt->>Service: Confirmation
```
- Database Storage (PostgreSQL)
  - Structured Data: Agency, source, and file metadata
  - Processing Status: Track ETL pipeline progress
  - Relationships: Maintain data lineage and dependencies
  - Indexing: Optimize for query performance
- Blob Storage (S3)
  - Raw Content: Original scraped files
  - Cleaned Content: Processed text files
  - Metadata: JSON metadata files
  - Archives: Compressed historical data
- File System
  - Temporary Files: Processing intermediates
  - Logs: Service execution logs
  - Exports: Generated data exports
  - Configuration: DSL and settings files
- Export Process (example below)
  - Scheduled Exports: Daily/weekly data exports
  - Compression: Gzip compression for efficiency
  - Timestamping: Versioned export files
  - Cleanup: Remove exported data from active tables
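
A compact example of a timestamped, gzip-compressed export; the directory layout and file naming convention are illustrative:

```python
import gzip
import json
from datetime import datetime, timezone
from pathlib import Path

def export_records(records: list, export_dir: str = "/data/exports") -> Path:
    """Write records as a timestamped, gzip-compressed JSON Lines file."""
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    path = Path(export_dir) / f"knowledge_export_{stamp}.jsonl.gz"
    path.parent.mkdir(parents=True, exist_ok=True)
    with gzip.open(path, "wt", encoding="utf-8") as fh:
        for record in records:
            fh.write(json.dumps(record, ensure_ascii=False) + "\n")
    return path
```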
- Archive Management
  - Cold Storage: Move old data to cheaper storage
  - Retention Policies: Automated data lifecycle management
  - Backup Strategy: Regular backup scheduling
The system uses multiple scheduled tasks for automation:
```yaml
# Every 5 minutes: Check for unscheduled sources
trigger_scheduler: "0 */5 * * * ?"

# Every 5 minutes: Execute scheduled scraping
trigger_pipeline: "0 */5 * * * ?"

# Hourly: Archive and compress data
trigger_zipping: "0 0 */1 * * ?"
```
- Source Scheduling (sketch below)
  - Identify sources due for updates
  - Calculate next run times based on intervals
  - Queue scraping tasks
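
The due-source calculation reduces to comparing each source's last run plus its interval against the current time; the field names below are assumptions about the source records:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

def sources_due(sources: list, now: Optional[datetime] = None) -> list:
    """Select sources whose next scheduled run has passed."""
    now = now or datetime.now(timezone.utc)
    due = []
    for source in sources:
        interval = timedelta(minutes=source["run_interval_minutes"])
        last_run = source.get("last_run_at") or datetime.min.replace(tzinfo=timezone.utc)
        if last_run + interval <= now:
            source["next_run_at"] = now + interval   # pencil in the following run
            due.append(source)
    return due
```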
- Task Execution
  - Distribute scraping tasks across workers
  - Monitor task progress and status
  - Handle failures and retries
- Data Processing
  - Trigger cleaning after successful scraping
  - Update metadata and status
  - Generate notifications for completion
- Duplicate Detection
  - SHA1 hashing for content fingerprinting
  - Cross-source duplicate identification
  - Prevent redundant processing
- Quality Metrics
  - Content length and structure validation
  - Language detection and processing
  - Encoding and character set handling
- Error Tracking
  - Comprehensive error logging
  - Failed operation tracking
  - Recovery and retry mechanisms
- Pipeline Health
  - Service availability monitoring
  - Queue depth and processing times
  - Error rate tracking
- Data Quality Metrics
  - Content freshness indicators
  - Processing success rates
  - Storage utilization metrics
- Horizontal Scaling
  - Stateless service design
  - Load-balanced deployments
  - Worker pool management
- Caching Strategy
  - Content deduplication
  - Metadata caching
  - Query result caching
- Resource Management
  - Connection pooling
  - Memory optimization
  - Disk space management
- Parallel Processing (sketch below)
  - Concurrent scraping tasks
  - Multi-threaded cleaning
  - Batch operations
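
A generic worker-pool helper showing how scraping or cleaning tasks could be fanned out; the pool size and error policy are illustrative:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable

def run_concurrently(task: Callable, urls: list, max_workers: int = 8) -> dict:
    """Run a per-URL task across a thread pool; one failure does not stop the batch."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(task, url): url for url in urls}
        for future in as_completed(futures):
            url = futures[future]
            try:
                results[url] = future.result()
            except Exception as exc:
                results[url] = {"url": url, "error": str(exc)}
    return results
```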
- Smart Scheduling
  - Content change detection
  - Adaptive polling intervals
  - Priority-based processing
- Environment Variables
  - Service endpoints and credentials
  - Storage configuration
  - Processing parameters
- DSL Configuration
  - API endpoint definitions
  - Database query specifications
  - Pipeline workflow definitions
- Database Maintenance
  - Index optimization
  - Statistics updates
  - Cleanup operations
- Storage Management
  - Archive old content
  - Clean temporary files
  - Monitor storage usage
- Service Health
  - Regular health checks
  - Performance monitoring
  - Capacity planning
- Scraping Failures
  - Network connectivity issues
  - Website structure changes
  - Authentication problems
- Processing Errors
  - File format compatibility
  - Encoding issues
  - Resource limitations
- Storage Issues
  - Disk space constraints
  - Network storage problems
  - Permission issues
- Logs Analysis
  - Service-specific log files
  - Error tracking and correlation
  - Performance metrics
- Database Queries
  - Direct database inspection
  - Query performance analysis
  - Data integrity checks
- Service Testing
  - Health check endpoints
  - Manual task triggering
  - Integration testing tools