Pull request overview
This PR makes document embedding asynchronous and introduces throttling to improve indexing pipeline resilience under heavy load, while refactoring the indexing/reindexing flow and updating related evaluation data and tests.
Changes:
- Add async embedding workflow: documents can be indexed as `to-be-embedded` and later embedded via a background task.
- Add throttling controls for embedding API calls and debouncing for embedding task dispatch.
- Refactor OpenSearch document formatting via adapters/schema updates; update evaluation corpora/test fixtures to UUID-based IDs.
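The async workflow in the first bullet can be pictured with a minimal in-memory sketch. Everything here is hypothetical and only illustrates the two-phase flow: the `IndexingStatus` values other than `to-be-embedded`, the `Document` dataclass, `fake_embed`, and the dict used as an index are assumptions, not the PR's actual code.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class IndexingStatus(str, Enum):
    """Hypothetical statuses; only "to-be-embedded" is named in the overview."""

    TO_BE_EMBEDDED = "to-be-embedded"
    READY = "ready"  # assumed name for the final state


@dataclass
class Document:
    id: str
    content: str
    indexing_status: IndexingStatus = IndexingStatus.TO_BE_EMBEDDED
    embedding: list[float] | None = None


def fake_embed(text: str) -> list[float]:
    """Placeholder for the real (slow, rate-limited) embedding API call."""
    return [float(len(text))]


def index_document(index: dict[str, Document], doc_id: str, content: str) -> None:
    """Fast path: store the document without computing its embedding."""
    index[doc_id] = Document(id=doc_id, content=content)


def embed_pending(index: dict[str, Document]) -> int:
    """Background step: embed every pending document, return how many were processed."""
    pending = [
        doc for doc in index.values()
        if doc.indexing_status is IndexingStatus.TO_BE_EMBEDDED
    ]
    for doc in pending:
        doc.embedding = fake_embed(doc.content)
        doc.indexing_status = IndexingStatus.READY
    return len(pending)
```

In this shape the indexing endpoint only pays for the cheap write, and the expensive embedding call happens later in the background step.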
Reviewed changes
Copilot reviewed 36 out of 37 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| src/backend/find/settings.py | Adds embedding throttle/task countdown settings. |
| src/backend/evaluation/management/commands/evaluate_search_engine.py | Updates evaluation indexing setup and imports. |
| src/backend/evaluation/data/v0/queries.py | Updates expected IDs to UUIDs. |
| src/backend/evaluation/data/v0/documents.py | Updates document IDs to UUIDs. |
| src/backend/evaluation/data/semantic/queries.py | Updates expected IDs to UUIDs. |
| src/backend/evaluation/data/full_text/queries.py | Updates expected IDs to UUIDs. |
| src/backend/evaluation/data/full_text/documents.py | Updates document IDs to UUIDs. |
| src/backend/evaluation/data/corpus/simple_corpus.py | Updates corpus IDs to UUIDs. |
| src/backend/core/views.py | Indexing endpoint now sets indexing_status and schedules embedding task (debounced). |
| src/backend/core/utils.py | Adds throttle_acquire + extract_language_code; removes old test-only prepare_index. |
| src/backend/core/tests/utils.py | Introduces test prepare_index and mock indexing pipeline helpers. |
| src/backend/core/tests/test_utils.py | Adds unit tests for extract_language_code. |
| src/backend/core/tests/test_search.py | Uses new test prepare_index with embedding-enabled option. |
| src/backend/core/tests/test_reindex.py | Adds coverage for new reindex-with-embedding service behavior. |
| src/backend/core/tests/test_embedding.py | Adds chunking tests and imports chunk_document. |
| src/backend/core/tests/test_decorators.py | Adds tests for new throttling decorator. |
| src/backend/core/tests/test_api_documents_search_access_control.py | Updates IDs to UUIDs and adjusts fixtures. |
| src/backend/core/tests/test_api_documents_search.py | Adjusts tests for updated indexing/search output expectations. |
| src/backend/core/tests/test_api_documents_index_single.py | Updates single-index tests for async embedding dispatch. |
| src/backend/core/tests/test_api_documents_index_bulk.py | Splits bulk indexing tests by hybrid enabled/disabled; checks async dispatch. |
| src/backend/core/tests/test_api_documents_delete.py | Switches to new test prepare_index import. |
| src/backend/core/tests/test_adapters.py | Adds tests for OpenSearch adapter roundtrips and language helpers. |
| src/backend/core/tests/commands/test_reindex_with_embedding.py | Updates command tests to use call_command and new prepare_index import. |
| src/backend/core/tasks/indexing.py | Adds Celery task intended to embed pending documents. |
| src/backend/core/tasks/__init__.py | Declares tasks package. |
| src/backend/core/services/reindex.py | Adds reindexing service for chunking+embedding and bulk updating. |
| src/backend/core/services/opensearch_configuration.py | Adds indexing_status mapping to OpenSearch schema. |
| src/backend/core/services/indexing.py | Removes synchronous prepare_document_for_indexing path. |
| src/backend/core/services/embedding.py | Applies throttling decorator to embedding API calls. |
| src/backend/core/schemas.py | Adds IndexedDocumentSchema including language/status fields. |
| src/backend/core/management/commands/reindex_with_embedding.py | Refactors command to call new reindex service with query. |
| src/backend/core/factories.py | Adds IndexedDocumentSchemaFactory. |
| src/backend/core/enums.py | Adds IndexingStatusEnum. |
| src/backend/core/decorators.py | Adds cache-based throttling decorator implementation. |
| src/backend/core/adapters.py | Adds OpenSearch <-> domain conversion helpers. |
| docs/env.md | Documents new embedding-related env vars. |
| CHANGELOG.md | Notes async embedding and throttle system. |
Comments suppressed due to low confidence (1)
src/backend/evaluation/management/commands/evaluate_search_engine.py:33
`evaluate_search_engine` now imports `prepare_index` from `core.tests.utils`. Pulling test helpers into a management command (non-test code) creates an undesirable dependency on the test suite and can break in production installs where test modules aren’t packaged. Move `prepare_index` to a non-test module (e.g. `core.services.indexing` or an `evaluation.utils`) and import from there instead.
    key = f"throttle-lock:{name}"

    # Redis is used as cache database (not in tests). Use the lock feature here
    # to ensure atomicity of changes to the throttle flag.
    if isinstance(cache, RedisCache) and atomic:
        with cache.client.get_client().lock(key, timeout=timeout):
            return throttle_acquire(name, timeout, atomic=False)

    # cache.add() is atomic test-and-set operation:
    # - If key doesn't exist: creates it with timeout and returns True
    # - If key already exists: does nothing and returns False
    # The key expires after timeout seconds, releasing the lock.
    # The value 1 is irrelevant, only the key presence/absence matters.
    return cache.add(key, 1, timeout=timeout)
throttle_acquire uses the same Redis key for both the distributed lock and the debounce flag. With django-redis, lock(key) will set that key in Redis, so the subsequent cache.add(key, ...) will always return False, meaning the throttle will never be acquired in production (and embedding tasks may never be scheduled). Use a separate lock key (e.g. lock:{key}) and a distinct flag key, or rely solely on an atomic Redis primitive (e.g. SET key value NX EX timeout) instead of mixing lock() + add() on the same key.
I confess I do not understand the recursive part of this function. It is a copy of logic from Docs.
To me, it should work with only `cache.add(key, 1, timeout=timeout)`.
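A minimal sketch of the `cache.add`-only variant discussed in this thread. `InMemoryCache` is a hypothetical stand-in that mimics the test-and-set behaviour of `add()` in Django's cache API so the example runs without a configured cache backend. With a real backend, whether `add()` is atomic depends on that backend.

```python
import time


class InMemoryCache:
    """Hypothetical stand-in mimicking the add() semantics of Django's cache API."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._store = {}  # key -> (value, expiry time)

    def add(self, key, value, timeout):
        """Set the key only if it is absent or expired; return True when it was set."""
        now = self._clock()
        entry = self._store.get(key)
        if entry is not None and entry[1] > now:
            return False
        self._store[key] = (value, now + timeout)
        return True


def throttle_acquire(cache, name, timeout):
    """Return True for the first caller within `timeout` seconds, False otherwise."""
    # No separate lock: the flag key itself is the only piece of shared state.
    return cache.add(f"throttle-lock:{name}", 1, timeout=timeout)
```

Because a single key is involved, the lock and the flag cannot collide, which sidesteps the concern raised in the review comment.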
    def decorator(func):
        redis_key = f"throttle:{func.__module__}.{func.__name__}"

        @wraps(func)
        def wrapper(*args, **kwargs):
            now = time.time()
            window_timestamps = get_window_timestamps(redis_key, now - period)

            while len(window_timestamps) >= max_calls:
                wait_time = period - (now - min(window_timestamps))
                if wait_time > 0:
                    logger.info(
                        "Throttle limit reached for '%s' "
                        "(%d calls in %d seconds). Waiting %.2f seconds.",
                        redis_key,
                        max_calls,
                        period,
                        wait_time,
                    )
                    time.sleep(wait_time)

                now = time.time()
                # move the window after waiting
                window_timestamps = get_window_timestamps(redis_key, now - period)

            window_timestamps.append(now)
            cache.set(redis_key, window_timestamps, timeout=period)
The throttle decorator claims to be a distributed rate limiter, but the implementation does a non-atomic read-modify-write on a list stored in cache (get -> filter -> set). With multiple workers, concurrent calls can race and exceed the intended rate limit. Consider using Redis atomic operations (e.g. sorted sets + ZADD/ZREMRANGEBYSCORE/ZCARD, or an INCR-based fixed window) to enforce limits correctly under concurrency.
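To illustrate what "atomic" means for this limiter, here is a minimal in-process sketch, not the PR's implementation. The `SlidingWindowLimiter` class and its `try_acquire` method are hypothetical names. A `threading.Lock` makes pruning, counting, and recording a single step. Across several workers the same shape would have to run atomically inside Redis (for example with the sorted-set commands mentioned above, inside a transaction or script), which is not shown here.

```python
import threading
import time
from collections import deque


class SlidingWindowLimiter:
    """In-process sketch: the lock makes prune + count + record one atomic step."""

    def __init__(self, max_calls, period, clock=time.monotonic):
        self.max_calls = max_calls
        self.period = period
        self._clock = clock
        self._timestamps = deque()
        self._lock = threading.Lock()

    def try_acquire(self):
        """Record a call and return True if the window has room, else return False."""
        with self._lock:
            now = self._clock()
            # Drop timestamps that have left the window.
            while self._timestamps and self._timestamps[0] <= now - self.period:
                self._timestamps.popleft()
            if len(self._timestamps) >= self.max_calls:
                return False
            self._timestamps.append(now)
            return True
```

A caller that needs blocking behaviour, as in the decorator above, could sleep and retry while `try_acquire()` returns False. The check and the write still happen under one lock, so concurrent callers cannot both claim the last free slot.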
this improves typing
this task takes charge of embeddings
the whole indexing pipeline is changed to use the async task
the embedding must not be spammed. the throttling prevents spamming it.
this adds a throttle on the task itself
some tests are impacted by the changes
the reindex command can be (even) better
this adds more tests
this adds a validator on the schema
Purpose
Improve the ability of the indexing pipeline to handle heavy loads.
Proposal