-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_processor.py
More file actions
156 lines (124 loc) · 5.37 KB
/
test_processor.py
File metadata and controls
156 lines (124 loc) · 5.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import json
import time
import unittest
from datetime import datetime
from unittest.mock import Mock, patch

from feed_processor.processor import FeedProcessor
from feed_processor.webhook import WebhookConfig, WebhookManager, WebhookResponse
class TestFeedProcessor(unittest.TestCase):
    """Tests for ``FeedProcessor`` feed queueing and webhook batching.

    ``setUp`` builds a new processor and sample feed before each test. Tests
    that exercise the webhook path replace ``WebhookManager.batch_send`` with
    a mock so each test decides what a send returns.
    """

    # Queue capacity passed to the processor; the queue-full test adds exactly
    # this many feeds before expecting a rejection.
    MAX_QUEUE_SIZE = 10

    # Batch size passed to the processor as ``webhook_batch_size``.
    WEBHOOK_BATCH_SIZE = 2

    # How long (in seconds) the processor is left running before being stopped.
    PROCESSING_WAIT_SECONDS = 0.5

    # Dotted path of the method that webhook tests replace with a mock.
    BATCH_SEND_TARGET = "feed_processor.webhook.WebhookManager.batch_send"

    # Minimal RSS 2.0 document with a single item. Written as adjacent string
    # literals so the text does not depend on how this file is indented.
    SAMPLE_RSS = (
        "\n"
        '<?xml version="1.0" encoding="UTF-8" ?>\n'
        '<rss version="2.0">\n'
        "<channel>\n"
        "<title>Test Feed</title>\n"
        "<link>http://example.com/feed</link>\n"
        "<description>Test Description</description>\n"
        "<item>\n"
        "<title>Test Item</title>\n"
        "<link>http://example.com/item1</link>\n"
        "<description>Test Item Description</description>\n"
        "</item>\n"
        "</channel>\n"
        "</rss>\n"
    )

    def setUp(self):
        """Create the processor under test and a sample feed payload."""
        self.processor = FeedProcessor(
            max_queue_size=self.MAX_QUEUE_SIZE,
            webhook_endpoint="https://example.com/webhook",
            webhook_auth_token="test-token",
            webhook_batch_size=self.WEBHOOK_BATCH_SIZE,
        )
        self.sample_feed = {"content": self.SAMPLE_RSS}

    def _add_sample_feeds(self, count):
        """Add ``count`` copies of the sample feed, asserting each is accepted."""
        for _ in range(count):
            self.assertTrue(self.processor.add_feed(self.sample_feed))

    def _run_processor_briefly(self):
        """Start the processor, wait ``PROCESSING_WAIT_SECONDS``, then stop it.

        ``stop()`` is in a ``finally`` so the processor is still stopped if
        the wait is interrupted.
        """
        self.processor.start()
        try:
            time.sleep(self.PROCESSING_WAIT_SECONDS)
        finally:
            self.processor.stop()

    def test_add_feed_invalid_content(self):
        """Test adding feed with invalid content."""
        self.assertFalse(self.processor.add_feed({"content": "invalid content"}))

    def test_add_feed_queue_full(self):
        """Test adding feed when queue is full."""
        # Fill the queue to its configured capacity. The results of these
        # adds are deliberately not asserted; only the overflow add is.
        for _ in range(self.MAX_QUEUE_SIZE):
            self.processor.add_feed(self.sample_feed)

        # One more feed than the capacity must be rejected.
        self.assertFalse(self.processor.add_feed(self.sample_feed))

    def test_add_feed_success(self):
        """Test successfully adding a feed."""
        self.assertTrue(self.processor.add_feed(self.sample_feed))

    def test_add_feed_with_webhook(self):
        """Test adding a feed with webhook enabled."""
        with patch(self.BATCH_SEND_TARGET) as mock_send:
            mock_send.return_value = [WebhookResponse(success=True, status_code=200)]

            # Two feeds equal the configured batch size.
            self._add_sample_feeds(2)
            self._run_processor_briefly()

            # Verify webhook was called
            mock_send.assert_called()

    def test_webhook_batch_processing(self):
        """Test that feeds are properly batched before sending."""
        with patch(self.BATCH_SEND_TARGET) as mock_send:
            mock_send.return_value = [WebhookResponse(success=True, status_code=200)]

            # Three feeds with a batch size of two: the test expects one full
            # batch and one partial batch.
            self._add_sample_feeds(3)
            self._run_processor_briefly()

            # Should have been called twice (one full batch, one partial)
            self.assertEqual(mock_send.call_count, 2)

    def test_webhook_failure_handling(self):
        """Test handling of webhook failures."""
        with patch(self.BATCH_SEND_TARGET) as mock_send:
            # Simulate a failed webhook call
            mock_send.return_value = [
                WebhookResponse(
                    success=False,
                    status_code=500,
                    error_message="Internal Server Error",
                    retry_count=3,
                )
            ]

            self._add_sample_feeds(2)
            self._run_processor_briefly()

            # Only checks that a send was attempted and that processing did
            # not raise; metric updates are not asserted here.
            mock_send.assert_called()

    def test_rate_limiting(self):
        """Test handling of rate limiting in webhook calls."""
        with patch(self.BATCH_SEND_TARGET) as mock_send:
            # Simulate rate limiting
            mock_send.return_value = [
                WebhookResponse(
                    success=False,
                    status_code=429,
                    error_message="Rate limit exceeded",
                    retry_count=1,
                    rate_limited=True,
                )
            ]

            self._add_sample_feeds(2)
            self._run_processor_briefly()

            # Only checks that a send was attempted and that processing did
            # not raise; the rate-limit reaction itself is not asserted here.
            mock_send.assert_called()

    def test_process_feed(self):
        """Test processing a single feed."""
        feed_data = {"type": "rss", "title": "Test", "link": "http://example.com"}
        self.processor._process_feed(feed_data)
        self.assertEqual(len(self.processor.current_batch), 1)
# Run this module's tests when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()