Open
Conversation
1c7e5ae to
d65ecd4
Compare
qbey
reviewed
Nov 24, 2025
mascarpon3
reviewed
Nov 25, 2025
mascarpon3
reviewed
Nov 25, 2025
| @property | ||
| def is_loaded(self): | ||
| """Retuns true if in loaded status""" | ||
| return self.content_status == enums.ContentStatusEnum.LOADED |
Contributor
There was a problem hiding this comment.
isn't "indexing_status" more descriptive than "content_status" ?
Contributor
Author
There was a problem hiding this comment.
Hum... it is for the loading and the preprocessing of the content, but also the embedding... so you may be right.
mascarpon3
reviewed
Nov 25, 2025
55ccc2b to
06cb52f
Compare
Add support for deferred loading, preprocessing & embedding of documents. Add mimetype, language, content_status & content_uri fields in document schema. Signed-off-by: Fabre Florian <ffabre@hybird.org>
Add AlbertAI client to wrap embedding & conversion API calls Implement working pdf to markdown converter using Albert Signed-off-by: Fabre Florian <ffabre@hybird.org>
Use service.index_name instead of service.name in create_demo command. Signed-off-by: Fabre Florian <ffabre@hybird.org>
5c65736 to
3e822c5
Compare
New processors mechanism in IndexerTaskService : after the loading & conversion steps a list a functions can be chained to transform the document content (like django middlewares) Signed-off-by: Fabre Florian <ffabre@hybird.org>
3e822c5 to
a2d3b08
Compare
There was a problem hiding this comment.
Pull request overview
Refactors the backend document indexation pipeline to support deferred loading/preprocessing/embedding (via a new IndexerTaskService + Celery tasks) and introduces Albert AI integration for PDF parsing and embeddings.
Changes:
- Add
IndexerTaskService+ Celery tasks to split indexing into synchronous “store metadata/content state” and deferred “load/process/embed” steps. - Extend document schema/index mapping with
content_uri,content_status, andmimetype, and add Albert AI client + PDF converter. - Add/adjust tests and demo commands to cover the new indexing workflow.
Reviewed changes
Copilot reviewed 20 out of 21 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
src/backend/find/settings.py |
Adds indexer/albert-related settings and adjusts test settings. |
src/backend/demo/management/commands/create_demo.py |
Uses secrets and updates indexing to use service.index_name. |
src/backend/demo/management/commands/albert.py |
New management command for manually testing Albert parse. |
src/backend/core/views.py |
Refactors /documents/index/ to use IndexerTaskService + deferred task dispatch. |
src/backend/core/tests/utils.py |
Adds refresh_index() helper and uses it in index prep. |
src/backend/core/tests/test_indexer_services.py |
Large new test suite for indexing/loading/processing/embedding and task dispatch logic. |
src/backend/core/tests/test_api_documents_index_single.py |
Updates expectations for deferred embedding and adds content_uri coverage. |
src/backend/core/tests/test_api_documents_index_bulk.py |
Updates bulk error mocking to include _id. |
src/backend/core/tests/test_albert_services.py |
New tests for Albert embedding/convert client. |
src/backend/core/tasks/indexer.py |
New Celery tasks and dispatch helper for deferred steps. |
src/backend/core/services/opensearch.py |
Extends index mapping for new fields; marks embed_text as deprecated. |
src/backend/core/services/indexer_services.py |
New core service implementing bulk ops, conversion, deferred load/process/embed. |
src/backend/core/services/converters.py |
New PDF converter using Albert parse endpoint. |
src/backend/core/services/albert.py |
New Albert AI client wrapper for embedding + conversion. |
src/backend/core/schemas.py |
Adds content_uri and mimetype to the document schema. |
src/backend/core/models.py |
Adds IndexDocument dataclass representation for OpenSearch docs. |
src/backend/core/factories.py |
Adds factories/fields for new document attributes and IndexDocumentFactory. |
src/backend/core/enums.py |
Adds ContentStatusEnum. |
docs/env.md |
Documents new indexer environment variables. |
CHANGELOG.md |
Adds changelog entry for async indexer task service. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+155
to
+156
| return Response( | ||
| {"status": "error", **errors[0]}, status=status.HTTP_400_BAD_REQUEST |
Comment on lines
+31
to
+36
| if service is not None: | ||
| indexer = IndexerTaskService(service, force_refresh=True) | ||
|
|
||
| logger.info("Start deferred loading on index %s", service.index_name) | ||
| indexer.load_n_process_all() | ||
|
|
Comment on lines
+67
to
+74
| self.client = client or opensearch_client() | ||
| self.index_name = index_name | ||
| self._actions = [] | ||
| self._errors = {} | ||
|
|
||
| def errors(self): | ||
| """Returns errors from last commit""" | ||
| return self._errors |
Comment on lines
+299
to
+306
| def stream_content(self, document): | ||
| """Open download stream containing the file data""" | ||
| try: | ||
| response = requests.get( | ||
| document.content_uri, stream=True, timeout=self.download_timeout | ||
| ) | ||
| response.raise_for_status() | ||
| return response.raw |
| try: | ||
| return models.Service.objects.get(pk=service_id, is_active=True) | ||
| except models.Service.DoesNotExist: | ||
| logging.warning("The service {service_id} does not exit or disabled") |
| self.service = service | ||
| self.batch_size = batch_size | ||
| self.client = client or opensearch_client() | ||
| self.force_refresh = force_refresh or settings.INDEXER_FORCE_REFRESH |
|
|
||
| # Hybrid search is disabled, skip it | ||
| if not check_hybrid_search_enabled(): | ||
| return () |
Comment on lines
+23
to
+55
| "-f", "--format", dest="format", default="markdown", help="output format" | ||
| ) | ||
| parse.add_argument( | ||
| "-p", "--pages", dest="page", default="", help="extracted pages" | ||
| ) | ||
|
|
||
| def handle(self, *args, **options): | ||
| """Handling of the management command.""" | ||
| action = options.get("action") | ||
|
|
||
| try: | ||
| handler = getattr(self, f"handle_{action}") | ||
| except AttributeError: | ||
| self.print_help("albert", action) | ||
| return | ||
|
|
||
| handler(options) | ||
|
|
||
| def handle_parse(self, options): | ||
| """Handling of the file convertion using Albert AI (only pdf)""" | ||
| paths = [Path(p) for p in options.get("files", [])] | ||
| albert = AlbertAI() | ||
|
|
||
| for path in paths: | ||
| with open(path, "rb") as fd: | ||
| try: | ||
| self.stdout.write( | ||
| albert.convert( | ||
| content=fd, | ||
| mimetype=guess_type(path)[0], | ||
| output=options.get("format"), | ||
| pages=options.get("pages"), | ||
| ) |
Comment on lines
+132
to
+137
| # Update error status of documents | ||
| errors = {e.id: e for e in errors} | ||
|
|
||
| for result in results: | ||
| error = errors.get(result["_id"]) | ||
|
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Add support for deferred loading, preprocessing & embedding of documents.
TODO
index/endpointWhishlist