-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbatch_processor.py
More file actions
396 lines (322 loc) · 11.1 KB
/
Copy pathbatch_processor.py
File metadata and controls
396 lines (322 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
"""
Batch Processing Service for Echo Desktop Application.
Handles bulk flashcard generation from multiple text sources:
- Multiple clipboard selections
- File imports
- URL content extraction
- Document processing
Architecture: Core (Hub) - batch processing logic.
"""
import os
import json
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed
from flashcard_generator import FlashcardGenerator, Flashcard
from ai_providers import AIProviderFactory, AIConfig
@dataclass
class BatchJob:
"""Represents a batch processing job."""
job_id: str
source_type: str # "clipboard", "file", "url", "text"
source_location: str
status: str # "pending", "processing", "completed", "failed"
total_items: int
processed_items: int
flashcards_generated: int
errors: List[str]
class BatchProcessor:
"""
Processes multiple text sources in batch mode.
Features:
- Parallel processing for performance
- Progress tracking and callbacks
- Error handling and retry logic
- Memory-efficient streaming for large files
"""
def __init__(
self,
flashcard_generator: FlashcardGenerator,
max_workers: int = 4,
batch_size: int = 10,
on_progress: Optional[Callable[[int, int], None]] = None
):
"""
Initialize batch processor.
Args:
flashcard_generator: FlashcardGenerator instance
max_workers: Maximum parallel workers
batch_size: Items per batch
on_progress: Progress callback function
"""
self.generator = flashcard_generator
self.max_workers = max_workers
self.batch_size = batch_size
self.on_progress = on_progress
def process_texts(
self,
texts: List[str],
auto_add_to_srs: bool = False
) -> Dict:
"""
Process multiple texts in batch.
Args:
texts: List of text strings to process
auto_add_to_srs: Whether to automatically add to SRS
Returns:
Dictionary with results summary
"""
total = len(texts)
all_flashcards = []
errors = []
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Submit all jobs
futures = {
executor.submit(
self.generator.generate,
text
): idx
for idx, text in enumerate(texts)
}
# Process completed jobs
for future in as_completed(futures):
idx = futures[future]
try:
flashcards = future.result()
all_flashcards.extend(flashcards)
# Progress callback
if self.on_progress:
self.on_progress(idx + 1, total)
except Exception as e:
errors.append({
"text_index": idx,
"error": str(e)
})
return {
"total_texts": total,
"processed": total - len(errors),
"total_flashcards": len(all_flashcards),
"flashcards": [
{
"question": card.question,
"answer": card.answer,
"difficulty": card.difficulty,
"quality_score": card.quality_score,
"tags": card.tags
}
for card in all_flashcards
],
"errors": errors
}
def process_file(
self,
file_path: str,
chunk_size: int = 1000
) -> Dict:
"""
Process a large file in chunks.
Args:
file_path: Path to file
chunk_size: Characters per chunk
Returns:
Dictionary with results
"""
if not os.path.exists(file_path):
return {
"error": f"File not found: {file_path}",
"flashcards": []
}
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Split into chunks
chunks = self._split_into_chunks(content, chunk_size)
# Process chunks
return self.process_texts(chunks)
except Exception as e:
return {
"error": str(e),
"flashcards": []
}
def process_directory(
self,
directory_path: str,
extensions: List[str] = None
) -> Dict:
"""
Process all text files in a directory.
Args:
directory_path: Path to directory
extensions: File extensions to process (default: .txt, .md)
Returns:
Dictionary with results
"""
if extensions is None:
extensions = ['.txt', '.md']
if not os.path.isdir(directory_path):
return {
"error": f"Directory not found: {directory_path}",
"flashcards": []
}
# Collect all files
files = []
for root, _, filenames in os.walk(directory_path):
for filename in filenames:
if any(filename.endswith(ext) for ext in extensions):
files.append(os.path.join(root, filename))
# Process all files
all_results = []
for file_path in files:
result = self.process_file(file_path)
all_results.append({
"file": file_path,
"result": result
})
# Aggregate results
total_flashcards = sum(
len(r["result"].get("flashcards", []))
for r in all_results
)
return {
"total_files": len(files),
"processed_files": len([r for r in all_results if "error" not in r["result"]]),
"total_flashcards": total_flashcards,
"file_results": all_results
}
def _split_into_chunks(
self,
text: str,
chunk_size: int
) -> List[str]:
"""
Split text into chunks at sentence boundaries.
Args:
text: Text to split
chunk_size: Target chunk size
Returns:
List of text chunks
"""
chunks = []
current_chunk = ""
# Split into sentences
sentences = text.split('. ')
for sentence in sentences:
if len(current_chunk) + len(sentence) < chunk_size:
current_chunk += sentence + '. '
else:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = sentence + '. '
# Add last chunk
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
class ClipboardBatchCollector:
"""
Collects multiple clipboard selections over time for batch processing.
Features:
- Time-windowed collection
- Manual trigger for batch processing
- Deduplication of similar text
"""
def __init__(
self,
time_window: int = 300, # 5 minutes
similarity_threshold: float = 0.8
):
"""
Initialize clipboard batch collector.
Args:
time_window: Time window in seconds for collection
similarity_threshold: Threshold for deduplication
"""
self.time_window = time_window
self.similarity_threshold = similarity_threshold
self.collected_texts: List[Dict] = []
def add_text(self, text: str) -> bool:
"""
Add text to collection.
Args:
text: Text to add
Returns:
True if text was added, False if duplicate
"""
import time
# Check for duplicates
for item in self.collected_texts:
if self._is_similar(text, item["text"]):
return False
# Add new text
self.collected_texts.append({
"text": text,
"timestamp": time.time()
})
return True
def get_batch(self, clear: bool = True) -> List[str]:
"""
Get collected texts for batch processing.
Args:
clear: Whether to clear collection after retrieval
Returns:
List of collected texts
"""
import time
current_time = time.time()
# Filter by time window
texts = [
item["text"]
for item in self.collected_texts
if current_time - item["timestamp"] <= self.time_window
]
# Clear if requested
if clear:
self.collected_texts = []
return texts
def _is_similar(self, text1: str, text2: str) -> bool:
"""
Check if two texts are similar (deduplication).
Simple similarity check using word overlap.
"""
words1 = set(text1.lower().split())
words2 = set(text2.lower().split())
if not words1 or not words2:
return False
overlap = len(words1 & words2)
similarity = overlap / min(len(words1), len(words2))
return similarity >= self.similarity_threshold
# Example usage
if __name__ == "__main__":
print("Testing Batch Processor...")
print("=" * 60)
# Create generator
generator = FlashcardGenerator(
provider="rule_based",
max_flashcards=3,
min_quality_score=0.5
)
# Create batch processor
processor = BatchProcessor(
flashcard_generator=generator,
max_workers=2
)
# Test texts
test_texts = [
"Machine learning is a subset of artificial intelligence.",
"Neural networks are computing systems inspired by biological neural networks.",
"Deep learning is a subset of machine learning using multi-layered networks."
]
def on_progress(current, total):
print(f"Progress: {current}/{total}")
processor.on_progress = on_progress
# Process batch
results = processor.process_texts(test_texts)
print(f"\nBatch Results:")
print(f" Total texts: {results['total_texts']}")
print(f" Processed: {results['processed']}")
print(f" Total flashcards: {results['total_flashcards']}")
print(f" Errors: {len(results['errors'])}")
print(f"\nGenerated Flashcards:")
for i, card in enumerate(results['flashcards'], 1):
print(f"{i}. {card['question']}")
print(f" A: {card['answer']}")
print()