# ๐Ÿงช Testing Guide Comprehensive testing strategies for RssBot Platform including unit tests, integration tests, and end-to-end testing. ## ๐ŸŽฏ Testing Philosophy RssBot Platform follows a **multi-layered testing approach**: - **๐Ÿ”ฌ Unit Tests**: Test individual functions and components in isolation - **๐Ÿ”— Integration Tests**: Test service interactions and API endpoints - **๐ŸŒ End-to-End Tests**: Test complete user workflows - **โšก Performance Tests**: Test scalability and performance characteristics - **๐Ÿ”’ Security Tests**: Test authentication, authorization, and input validation ## ๐Ÿ“‹ Test Structure ### Test Organization ``` tests/ โ”œโ”€โ”€ unit/ # Unit tests โ”‚ โ”œโ”€โ”€ test_controller.py # Core controller logic โ”‚ โ”œโ”€โ”€ test_registry.py # Service registry tests โ”‚ โ”œโ”€โ”€ test_proxy.py # Service proxy tests โ”‚ โ””โ”€โ”€ services/ # Individual service tests โ”œโ”€โ”€ integration/ # Integration tests โ”‚ โ”œโ”€โ”€ test_api.py # API endpoint tests โ”‚ โ”œโ”€โ”€ test_services.py # Service interaction tests โ”‚ โ””โ”€โ”€ test_database.py # Database integration tests โ”œโ”€โ”€ e2e/ # End-to-end tests โ”‚ โ”œโ”€โ”€ test_bot_workflow.py โ”‚ โ”œโ”€โ”€ test_feed_processing.py โ”‚ โ””โ”€โ”€ test_admin_workflows.py โ”œโ”€โ”€ performance/ # Performance tests โ”‚ โ”œโ”€โ”€ test_load.py โ”‚ โ””โ”€โ”€ test_scalability.py โ””โ”€โ”€ fixtures/ # Test data and fixtures โ”œโ”€โ”€ feeds.json โ”œโ”€โ”€ users.json โ””โ”€โ”€ responses.json ``` ## ๐Ÿš€ Running Tests ### Quick Test Commands ```bash # Run all tests pytest # Run with coverage pytest --cov=src/rssbot --cov-report=html # Run specific test categories pytest tests/unit/ # Unit tests only pytest tests/integration/ # Integration tests only pytest tests/e2e/ # E2E tests only # Run specific test files pytest tests/unit/test_controller.py pytest tests/integration/test_api.py -v # Run tests matching pattern pytest -k "test_service_discovery" pytest -k "not slow" ``` ### Test Environment Setup ```bash # Install test dependencies rye sync --all-features # Or with pip pip install -r requirements-dev.lock # Set up test environment export ENVIRONMENT=testing export DATABASE_URL=sqlite:///test.db export REDIS_URL=redis://localhost:6379/1 # Run tests with test environment pytest tests/ --env testing ``` ## ๐Ÿ”ฌ Unit Testing ### Core Controller Tests ```python # tests/unit/test_controller.py import pytest from unittest.mock import AsyncMock, patch from rssbot.core.controller import create_platform_app from rssbot.discovery.cached_registry import CachedServiceRegistry @pytest.fixture async def test_app(): """Create test application instance""" with patch('rssbot.core.controller.get_cached_registry') as mock_registry: mock_registry.return_value = AsyncMock(spec=CachedServiceRegistry) app = await create_platform_app() yield app @pytest.mark.asyncio async def test_health_endpoint(test_client): """Test platform health endpoint""" response = await test_client.get("/health") assert response.status_code == 200 data = response.json() assert data["status"] == "healthy" assert "architecture" in data assert "services_count" in data @pytest.mark.asyncio async def test_services_list(test_client, mock_registry): """Test services listing endpoint""" # Mock service data mock_registry.get_all_services.return_value = [ { "name": "db_svc", "status": "running", "connection_method": "router" } ] response = await test_client.get("/services") assert response.status_code == 200 services = response.json() assert len(services) > 0 assert services[0]["name"] == "db_svc" ``` ### Service Registry Tests ```python # tests/unit/test_registry.py import pytest from unittest.mock import AsyncMock, patch from rssbot.discovery.cached_registry import CachedServiceRegistry from rssbot.models.service_registry import ServiceInfo @pytest.fixture def mock_redis(): """Mock Redis client""" redis_mock = AsyncMock() redis_mock.get.return_value = None redis_mock.set.return_value = True return redis_mock @pytest.fixture def mock_database(): """Mock database connection""" db_mock = AsyncMock() return db_mock @pytest.mark.asyncio async def test_service_cache_miss(mock_redis, mock_database): """Test service registry cache miss scenario""" # Setup registry = CachedServiceRegistry(redis=mock_redis, db=mock_database) service_data = ServiceInfo( name="test_svc", status="running", connection_method="router" ) # Configure mocks mock_redis.get.return_value = None # Cache miss mock_database.get_service.return_value = service_data # Execute result = await registry.get_service("test_svc") # Verify assert result.name == "test_svc" mock_redis.get.assert_called_once_with("service:test_svc") mock_database.get_service.assert_called_once_with("test_svc") mock_redis.set.assert_called_once() @pytest.mark.asyncio async def test_service_cache_hit(mock_redis, mock_database): """Test service registry cache hit scenario""" # Setup registry = CachedServiceRegistry(redis=mock_redis, db=mock_database) cached_data = '{"name": "test_svc", "status": "running"}' # Configure mocks mock_redis.get.return_value = cached_data # Execute result = await registry.get_service("test_svc") # Verify assert result.name == "test_svc" mock_redis.get.assert_called_once_with("service:test_svc") mock_database.get_service.assert_not_called() # Should not hit database ``` ## ๐Ÿ”— Integration Testing ### API Integration Tests ```python # tests/integration/test_api.py import pytest import httpx from fastapi.testclient import TestClient @pytest.fixture def test_client(): """Create test client for API testing""" from rssbot.core.controller import create_platform_app app = create_platform_app() return TestClient(app) @pytest.mark.integration def test_service_connection_method_update(test_client): """Test updating service connection method""" # Test data service_name = "ai_svc" new_method = "hybrid" # Execute API call response = test_client.post( f"/services/{service_name}/connection-method", json={"connection_method": new_method}, headers={"Authorization": "Bearer test_token"} ) # Verify response assert response.status_code == 200 data = response.json() assert data["service_name"] == service_name assert data["new_method"] == new_method # Verify change persisted status_response = test_client.get(f"/services/{service_name}/status") status_data = status_response.json() assert status_data["connection_method"] == new_method @pytest.mark.integration async def test_database_service_integration(test_client, test_database): """Test database service integration""" # Create test feed feed_data = { "url": "https://example.com/test-feed.xml", "title": "Test Feed", "chat_id": 12345, "update_interval": 3600 } # Create feed via API response = test_client.post( "/services/db_svc/feeds", json=feed_data, headers={"Authorization": "Bearer test_token"} ) assert response.status_code == 201 created_feed = response.json() feed_id = created_feed["id"] # Verify feed exists in database get_response = test_client.get(f"/services/db_svc/feeds/{feed_id}") assert get_response.status_code == 200 retrieved_feed = get_response.json() assert retrieved_feed["url"] == feed_data["url"] assert retrieved_feed["title"] == feed_data["title"] ``` ### Service Communication Tests ```python # tests/integration/test_services.py import pytest from unittest.mock import patch, AsyncMock @pytest.mark.integration async def test_ai_service_communication(): """Test AI service communication through proxy""" from rssbot.discovery.proxy import ServiceProxy from rssbot.models.service_registry import ServiceInfo # Setup proxy = ServiceProxy() with patch.object(proxy, 'registry') as mock_registry: # Mock service info service_info = ServiceInfo( name="ai_svc", connection_method="router", status="running", health_score=0.9 ) mock_registry.get_service.return_value = service_info # Mock AI service function with patch('rssbot.services.ai_svc.main.process_content') as mock_ai: mock_ai.return_value = {"summary": "Test summary"} # Execute result = await proxy.call_service( "ai_svc", "process_content", text="Test content", action="summarize" ) # Verify assert result["summary"] == "Test summary" mock_ai.assert_called_once() @pytest.mark.integration async def test_service_failover(): """Test service failover in hybrid mode""" from rssbot.discovery.proxy import ServiceProxy proxy = ServiceProxy() # Test automatic failover from router to REST with patch.object(proxy, '_direct_call') as mock_direct: mock_direct.side_effect = ConnectionError("Service unavailable") with patch.object(proxy, '_http_call') as mock_http: mock_http.return_value = {"status": "success"} # This should automatically failover to HTTP result = await proxy._hybrid_call(service_info, "test_method") assert result["status"] == "success" mock_direct.assert_called_once() mock_http.assert_called_once() ``` ## ๐ŸŒ End-to-End Testing ### Bot Workflow Tests ```python # tests/e2e/test_bot_workflow.py import pytest from unittest.mock import AsyncMock, patch @pytest.mark.e2e async def test_complete_bot_workflow(test_app, mock_telegram): """Test complete bot workflow from user interaction to feed processing""" # Setup test user and bot user_id = 12345 chat_id = 12345 # Step 1: User starts bot with patch('telegram.Bot.send_message') as mock_send: await mock_telegram.send_update({ "message": { "from": {"id": user_id}, "chat": {"id": chat_id}, "text": "/start" } }) # Verify welcome message sent mock_send.assert_called() welcome_call = mock_send.call_args assert "welcome" in welcome_call[1]["text"].lower() # Step 2: User subscribes to feed with patch('telegram.Bot.send_message') as mock_send: await mock_telegram.send_update({ "message": { "from": {"id": user_id}, "chat": {"id": chat_id}, "text": "/subscribe https://example.com/feed.xml" } }) # Verify subscription confirmation mock_send.assert_called() confirm_call = mock_send.call_args assert "subscribed" in confirm_call[1]["text"].lower() # Step 3: Simulate feed update feed_content = """ Test Article Test description https://example.com/article """ with patch('httpx.AsyncClient.get') as mock_get: mock_get.return_value.text = feed_content mock_get.return_value.status_code = 200 with patch('telegram.Bot.send_message') as mock_send: # Trigger feed processing await test_app.services.channel_mgr.process_feeds() # Verify article was sent to user mock_send.assert_called() article_call = mock_send.call_args assert "Test Article" in article_call[1]["text"] @pytest.mark.e2e async def test_ai_enhancement_workflow(test_app, mock_openai): """Test AI enhancement in complete workflow""" # Setup user_id = 12345 original_content = "This is a very long article about technology trends..." ai_summary = "Brief summary of technology trends." # Mock OpenAI response mock_openai.completions.create.return_value.choices[0].text = ai_summary # Create feed with AI enabled feed_data = { "url": "https://example.com/feed.xml", "title": "Tech Feed", "chat_id": user_id, "ai_processing": True, "ai_summary": True } # Process content with AI result = await test_app.services.ai_svc.process_content({ "text": original_content, "action": "summarize", "max_length": 200 }) # Verify AI processing assert result["summary"] == ai_summary assert len(result["summary"]) < len(original_content) ``` ### Feed Processing Tests ```python # tests/e2e/test_feed_processing.py import pytest from datetime import datetime from unittest.mock import patch, AsyncMock @pytest.mark.e2e async def test_feed_processing_pipeline(test_app): """Test complete feed processing pipeline""" # Step 1: Create feed feed_data = { "url": "https://example.com/test-feed.xml", "title": "Test Feed", "chat_id": 12345, "update_interval": 3600 } created_feed = await test_app.services.db_svc.create_feed(feed_data) feed_id = created_feed.id # Step 2: Mock RSS content rss_content = """ Test Feed Breaking News Important news update https://example.com/news1 Mon, 15 Jan 2024 10:00:00 GMT """ # Step 3: Process feed with patch('httpx.AsyncClient.get') as mock_get: mock_get.return_value.text = rss_content mock_get.return_value.status_code = 200 with patch('telegram.Bot.send_message') as mock_send: # Execute feed processing await test_app.services.channel_mgr.process_feed(feed_id) # Verify processing results mock_get.assert_called_once_with(feed_data["url"]) mock_send.assert_called_once() # Check message content send_args = mock_send.call_args message_text = send_args[1]["text"] assert "Breaking News" in message_text assert "Important news update" in message_text # Step 4: Verify feed status updated updated_feed = await test_app.services.db_svc.get_feed(feed_id) assert updated_feed.last_updated is not None assert updated_feed.status == "active" ``` ## โšก Performance Testing ### Load Testing ```python # tests/performance/test_load.py import pytest import asyncio import time from concurrent.futures import ThreadPoolExecutor @pytest.mark.performance async def test_concurrent_service_calls(test_app): """Test platform performance under concurrent load""" async def make_health_check(): """Single health check request""" start_time = time.time() response = await test_app.test_client.get("/health") end_time = time.time() return { "status_code": response.status_code, "response_time": end_time - start_time } # Execute 100 concurrent requests tasks = [make_health_check() for _ in range(100)] results = await asyncio.gather(*tasks) # Analyze results successful_requests = [r for r in results if r["status_code"] == 200] response_times = [r["response_time"] for r in successful_requests] # Performance assertions assert len(successful_requests) >= 95 # 95% success rate assert max(response_times) < 1.0 # Max 1 second response time assert sum(response_times) / len(response_times) < 0.1 # Average < 100ms @pytest.mark.performance async def test_service_discovery_performance(test_app): """Test service discovery performance""" # Warm up cache await test_app.registry.get_service("db_svc") # Test cached performance start_time = time.time() for _ in range(1000): await test_app.registry.get_service("db_svc") end_time = time.time() total_time = end_time - start_time avg_time_per_call = total_time / 1000 # Should be sub-millisecond for cached calls assert avg_time_per_call < 0.001 # Less than 1ms average @pytest.mark.performance async def test_feed_processing_performance(test_app): """Test feed processing performance with multiple feeds""" # Create 50 test feeds feeds = [] for i in range(50): feed_data = { "url": f"https://example.com/feed{i}.xml", "title": f"Test Feed {i}", "chat_id": 12345 + i, "update_interval": 3600 } feed = await test_app.services.db_svc.create_feed(feed_data) feeds.append(feed) # Process all feeds concurrently start_time = time.time() await test_app.services.channel_mgr.process_all_feeds() end_time = time.time() processing_time = end_time - start_time # Should process 50 feeds in reasonable time assert processing_time < 30 # Less than 30 seconds for 50 feeds assert processing_time / 50 < 1 # Less than 1 second per feed average ``` ## ๐Ÿ”’ Security Testing ### Authentication Tests ```python # tests/security/test_auth.py import pytest from unittest.mock import patch @pytest.mark.security async def test_unauthorized_api_access(test_client): """Test API endpoints require proper authentication""" # Test without token response = await test_client.post("/services/ai_svc/connection-method") assert response.status_code == 401 # Test with invalid token response = await test_client.post( "/services/ai_svc/connection-method", headers={"Authorization": "Bearer invalid_token"} ) assert response.status_code == 401 # Test with valid token response = await test_client.post( "/services/ai_svc/connection-method", json={"connection_method": "router"}, headers={"Authorization": "Bearer valid_test_token"} ) assert response.status_code in [200, 422] # 422 for validation errors @pytest.mark.security async def test_input_validation(test_client): """Test input validation and sanitization""" # Test SQL injection attempt malicious_feed = { "url": "https://example.com'; DROP TABLE feeds; --", "title": "Malicious Feed", "chat_id": 12345 } response = await test_client.post( "/services/db_svc/feeds", json=malicious_feed, headers={"Authorization": "Bearer valid_test_token"} ) # Should reject malicious input assert response.status_code == 422 # Validation error # Test XSS attempt xss_feed = { "url": "https://example.com/feed.xml", "title": "", "chat_id": 12345 } response = await test_client.post( "/services/db_svc/feeds", json=xss_feed, headers={"Authorization": "Bearer valid_test_token"} ) if response.status_code == 201: # If created, title should be sanitized created_feed = response.json() assert "