Skip to content

✨(backend) embed async#72

Open
mascarpon3 wants to merge 9 commits intomainfrom
embed-async
Open

✨(backend) embed async#72
mascarpon3 wants to merge 9 commits intomainfrom
embed-async

Conversation

@mascarpon3
Copy link
Copy Markdown
Contributor

@mascarpon3 mascarpon3 commented Apr 13, 2026

Purpose

improving the ability of the indexing pipeline to handle heavy loads.

Proposal

  • improve typing
  • improve format conversion
  • add async task for embedding
  • throttle embedding function to prevent spaming embedding endpoint
  • throttle embedding task to prevent running multiple tasks at once.

@mascarpon3 mascarpon3 force-pushed the embed-async branch 3 times, most recently from 0fb951f to 8115d0d Compare April 15, 2026 09:40
@mascarpon3 mascarpon3 changed the title ✨(backend) improve type checking and conversions ✨(backend) embed async Apr 15, 2026
@mascarpon3 mascarpon3 force-pushed the embed-async branch 2 times, most recently from cd65091 to 5037a2a Compare April 15, 2026 13:53
@mascarpon3 mascarpon3 requested a review from Copilot April 15, 2026 14:18
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR makes document embedding asynchronous and introduces throttling to improve indexing pipeline resilience under heavy load, while refactoring the indexing/reindexing flow and updating related evaluation data and tests.

Changes:

  • Add async embedding workflow: documents can be indexed as to-be-embedded and later embedded via a background task.
  • Add throttling controls for embedding API calls and debouncing for embedding task dispatch.
  • Refactor OpenSearch document formatting via adapters/schema updates; update evaluation corpora/test fixtures to UUID-based IDs.

Reviewed changes

Copilot reviewed 36 out of 37 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
src/backend/find/settings.py Adds embedding throttle/task countdown settings.
src/backend/evaluation/management/commands/evaluate_search_engine.py Updates evaluation indexing setup and imports.
src/backend/evaluation/data/v0/queries.py Updates expected IDs to UUIDs.
src/backend/evaluation/data/v0/documents.py Updates document IDs to UUIDs.
src/backend/evaluation/data/semantic/queries.py Updates expected IDs to UUIDs.
src/backend/evaluation/data/full_text/queries.py Updates expected IDs to UUIDs.
src/backend/evaluation/data/full_text/documents.py Updates document IDs to UUIDs.
src/backend/evaluation/data/corpus/simple_corpus.py Updates corpus IDs to UUIDs.
src/backend/core/views.py Indexing endpoint now sets indexing_status and schedules embedding task (debounced).
src/backend/core/utils.py Adds throttle_acquire + extract_language_code; removes old test-only prepare_index.
src/backend/core/tests/utils.py Introduces test prepare_index and mock indexing pipeline helpers.
src/backend/core/tests/test_utils.py Adds unit tests for extract_language_code.
src/backend/core/tests/test_search.py Uses new test prepare_index with embedding-enabled option.
src/backend/core/tests/test_reindex.py Adds coverage for new reindex-with-embedding service behavior.
src/backend/core/tests/test_embedding.py Adds chunking tests and imports chunk_document.
src/backend/core/tests/test_decorators.py Adds tests for new throttling decorator.
src/backend/core/tests/test_api_documents_search_access_control.py Updates IDs to UUIDs and adjusts fixtures.
src/backend/core/tests/test_api_documents_search.py Adjusts tests for updated indexing/search output expectations.
src/backend/core/tests/test_api_documents_index_single.py Updates single-index tests for async embedding dispatch.
src/backend/core/tests/test_api_documents_index_bulk.py Splits bulk indexing tests by hybrid enabled/disabled; checks async dispatch.
src/backend/core/tests/test_api_documents_delete.py Switches to new test prepare_index import.
src/backend/core/tests/test_adapters.py Adds tests for OpenSearch adapter roundtrips and language helpers.
src/backend/core/tests/commands/test_reindex_with_embedding.py Updates command tests to use call_command and new prepare_index import.
src/backend/core/tasks/indexing.py Adds Celery task intended to embed pending documents.
src/backend/core/tasks/init.py Declares tasks package.
src/backend/core/services/reindex.py Adds reindexing service for chunking+embedding and bulk updating.
src/backend/core/services/opensearch_configuration.py Adds indexing_status mapping to OpenSearch schema.
src/backend/core/services/indexing.py Removes synchronous prepare_document_for_indexing path.
src/backend/core/services/embedding.py Applies throttling decorator to embedding API calls.
src/backend/core/schemas.py Adds IndexedDocumentSchema including language/status fields.
src/backend/core/management/commands/reindex_with_embedding.py Refactors command to call new reindex service with query.
src/backend/core/factories.py Adds IndexedDocumentSchemaFactory.
src/backend/core/enums.py Adds IndexingStatusEnum.
src/backend/core/decorators.py Adds cache-based throttling decorator implementation.
src/backend/core/adapters.py Adds OpenSearch <-> domain conversion helpers.
docs/env.md Documents new embedding-related env vars.
CHANGELOG.md Notes async embedding and throttle system.
Comments suppressed due to low confidence (1)

src/backend/evaluation/management/commands/evaluate_search_engine.py:33

  • evaluate_search_engine now imports prepare_index from core.tests.utils. Pulling test helpers into a management command (non-test code) creates an undesirable dependency on the test suite and can break in production installs where test modules aren’t packaged. Move prepare_index to a non-test module (e.g. core.services.indexing or an evaluation.utils) and import from there instead.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/backend/core/utils.py
Comment on lines +32 to +45
key = f"throttle-lock:{name}"

# Redis is used as cache database (not in tests). Use the lock feature here
# to ensure atomicity of changes to the throttle flag.
if isinstance(cache, RedisCache) and atomic:
with cache.client.get_client().lock(key, timeout=timeout):
return throttle_acquire(name, timeout, atomic=False)

# cache.add() is atomic test-and-set operation:
# - If key doesn't exist: creates it with timeout and returns True
# - If key already exists: does nothing and returns False
# The key expires after timeout seconds, releasing the lock.
# The value 1 is irrelevant, only the key presence/absence matters.
return cache.add(key, 1, timeout=timeout)
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throttle_acquire uses the same Redis key for both the distributed lock and the debounce flag. With django-redis, lock(key) will set that key in Redis, so the subsequent cache.add(key, ...) will always return False, meaning the throttle will never be acquired in production (and embedding tasks may never be scheduled). Use a separate lock key (e.g. lock:{key}) and a distinct flag key, or rely solely on an atomic Redis primitive (e.g. SET key value NX EX timeout) instead of mixing lock() + add() on the same key.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i confess i do not understand the recursive part of this function. this is a copy of a logic in Docs.

to mei should work only with

cache.add(key, 1, timeout=timeout)

Comment thread src/backend/core/tasks/indexing.py
Comment thread src/backend/core/schemas.py Outdated
Comment thread src/backend/core/views.py Outdated
Comment on lines +24 to +51
def decorator(func):
redis_key = f"throttle:{func.__module__}.{func.__name__}"

@wraps(func)
def wrapper(*args, **kwargs):
now = time.time()
window_timestamps = get_window_timestamps(redis_key, now - period)

while len(window_timestamps) >= max_calls:
wait_time = period - (now - min(window_timestamps))
if wait_time > 0:
logger.info(
"Throttle limit reached for '%s' "
"(%d calls in %d seconds). Waiting %.2f seconds.",
redis_key,
max_calls,
period,
wait_time,
)
time.sleep(wait_time)

now = time.time()
# move the window after waiting
window_timestamps = get_window_timestamps(redis_key, now - period)

window_timestamps.append(now)
cache.set(redis_key, window_timestamps, timeout=period)

Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The throttle decorator claims to be a distributed rate limiter, but the implementation does a non-atomic read-modify-write on a list stored in cache (get -> filter -> set). With multiple workers, concurrent calls can race and exceed the intended rate limit. Consider using Redis atomic operations (e.g. sorted sets + ZADD/ZREMRANGEBYSCORE/ZCARD, or an INCR-based fixed window) to enforce limits correctly under concurrency.

Copilot uses AI. Check for mistakes.
Comment thread src/backend/core/utils.py Outdated
Comment thread src/backend/core/tests/test_api_documents_search_access_control.py
Comment thread src/backend/core/tests/test_api_documents_index_bulk.py Outdated
@suitenumerique suitenumerique deleted a comment from Copilot AI Apr 16, 2026
@mascarpon3 mascarpon3 marked this pull request as ready for review April 16, 2026 08:14
this task takes embeddings in charged
the all indexing pipeline is changed to
use the async task
the embedding must not be spamed.
the throttling prevents spaming it.
this adds a throttle the task itself
some tests are impacted by the changes
the reindex command can be (even) better
this adds more tests
this adds validator on schema
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants