Skip to content

WIP✨(backend) refactor indexation pipeline#26

Open
joehybird wants to merge 4 commits intomainfrom
index-pipeline
Open

WIP✨(backend) refactor indexation pipeline#26
joehybird wants to merge 4 commits intomainfrom
index-pipeline

Conversation

@joehybird
Copy link
Copy Markdown
Contributor

@joehybird joehybird commented Nov 24, 2025

Add support for deferred loading, preprocessing & embedding of documents.

TODO

  • Throttle indexation tasks or create commands + cron
  • Parser for pdf files with albert
  • Download directly from storage APIs (S3)
  • Use base64 + "encoding=base64" argument for small binary files sent directly through index/ endpoint

Whishlist

  • Add async support for download (use asyncio loop)
  • Parser for all formats with a dedicated service (docling, unstructured, ...)
  • Use hash to prevent indexing the same content multiple times

@joehybird joehybird changed the title ✨(backend) refactor indexation pipeline WIP✨(backend) refactor indexation pipeline Nov 24, 2025
Comment thread docs/env.md Outdated
Comment thread src/backend/core/schemas.py
Comment thread src/backend/core/factories.py
@property
def is_loaded(self):
"""Retuns true if in loaded status"""
return self.content_status == enums.ContentStatusEnum.LOADED
Copy link
Copy Markdown
Contributor

@mascarpon3 mascarpon3 Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't "indexing_status" more descriptive than "content_status" ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hum... it is for the loading and the preprocessing of the content, but also the embedding... so you may be right.

Comment thread src/backend/core/schemas.py Outdated
@joehybird joehybird force-pushed the index-pipeline branch 5 times, most recently from 55ccc2b to 06cb52f Compare November 26, 2025 17:14
Add support for deferred loading, preprocessing & embedding of documents.
Add mimetype, language, content_status & content_uri fields in document schema.

Signed-off-by: Fabre Florian <ffabre@hybird.org>
Add AlbertAI client to wrap embedding & conversion API calls
Implement working pdf to markdown converter using Albert

Signed-off-by: Fabre Florian <ffabre@hybird.org>
Use service.index_name instead of service.name in create_demo
command.

Signed-off-by: Fabre Florian <ffabre@hybird.org>
@joehybird joehybird force-pushed the index-pipeline branch 2 times, most recently from 5c65736 to 3e822c5 Compare November 27, 2025 14:00
New processors mechanism in IndexerTaskService : after the loading & conversion
steps a list a functions can be chained to transform the document content (like
django middlewares)

Signed-off-by: Fabre Florian <ffabre@hybird.org>
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Refactors the backend document indexation pipeline to support deferred loading/preprocessing/embedding (via a new IndexerTaskService + Celery tasks) and introduces Albert AI integration for PDF parsing and embeddings.

Changes:

  • Add IndexerTaskService + Celery tasks to split indexing into synchronous “store metadata/content state” and deferred “load/process/embed” steps.
  • Extend document schema/index mapping with content_uri, content_status, and mimetype, and add Albert AI client + PDF converter.
  • Add/adjust tests and demo commands to cover the new indexing workflow.

Reviewed changes

Copilot reviewed 20 out of 21 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
src/backend/find/settings.py Adds indexer/albert-related settings and adjusts test settings.
src/backend/demo/management/commands/create_demo.py Uses secrets and updates indexing to use service.index_name.
src/backend/demo/management/commands/albert.py New management command for manually testing Albert parse.
src/backend/core/views.py Refactors /documents/index/ to use IndexerTaskService + deferred task dispatch.
src/backend/core/tests/utils.py Adds refresh_index() helper and uses it in index prep.
src/backend/core/tests/test_indexer_services.py Large new test suite for indexing/loading/processing/embedding and task dispatch logic.
src/backend/core/tests/test_api_documents_index_single.py Updates expectations for deferred embedding and adds content_uri coverage.
src/backend/core/tests/test_api_documents_index_bulk.py Updates bulk error mocking to include _id.
src/backend/core/tests/test_albert_services.py New tests for Albert embedding/convert client.
src/backend/core/tasks/indexer.py New Celery tasks and dispatch helper for deferred steps.
src/backend/core/services/opensearch.py Extends index mapping for new fields; marks embed_text as deprecated.
src/backend/core/services/indexer_services.py New core service implementing bulk ops, conversion, deferred load/process/embed.
src/backend/core/services/converters.py New PDF converter using Albert parse endpoint.
src/backend/core/services/albert.py New Albert AI client wrapper for embedding + conversion.
src/backend/core/schemas.py Adds content_uri and mimetype to the document schema.
src/backend/core/models.py Adds IndexDocument dataclass representation for OpenSearch docs.
src/backend/core/factories.py Adds factories/fields for new document attributes and IndexDocumentFactory.
src/backend/core/enums.py Adds ContentStatusEnum.
docs/env.md Documents new indexer environment variables.
CHANGELOG.md Adds changelog entry for async indexer task service.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/backend/core/views.py
Comment on lines +155 to +156
return Response(
{"status": "error", **errors[0]}, status=status.HTTP_400_BAD_REQUEST
Comment on lines +31 to +36
if service is not None:
indexer = IndexerTaskService(service, force_refresh=True)

logger.info("Start deferred loading on index %s", service.index_name)
indexer.load_n_process_all()

Comment on lines +67 to +74
self.client = client or opensearch_client()
self.index_name = index_name
self._actions = []
self._errors = {}

def errors(self):
"""Returns errors from last commit"""
return self._errors
Comment on lines +299 to +306
def stream_content(self, document):
"""Open download stream containing the file data"""
try:
response = requests.get(
document.content_uri, stream=True, timeout=self.download_timeout
)
response.raise_for_status()
return response.raw
try:
return models.Service.objects.get(pk=service_id, is_active=True)
except models.Service.DoesNotExist:
logging.warning("The service {service_id} does not exit or disabled")
self.service = service
self.batch_size = batch_size
self.client = client or opensearch_client()
self.force_refresh = force_refresh or settings.INDEXER_FORCE_REFRESH

# Hybrid search is disabled, skip it
if not check_hybrid_search_enabled():
return ()
Comment on lines +23 to +55
"-f", "--format", dest="format", default="markdown", help="output format"
)
parse.add_argument(
"-p", "--pages", dest="page", default="", help="extracted pages"
)

def handle(self, *args, **options):
"""Handling of the management command."""
action = options.get("action")

try:
handler = getattr(self, f"handle_{action}")
except AttributeError:
self.print_help("albert", action)
return

handler(options)

def handle_parse(self, options):
"""Handling of the file convertion using Albert AI (only pdf)"""
paths = [Path(p) for p in options.get("files", [])]
albert = AlbertAI()

for path in paths:
with open(path, "rb") as fd:
try:
self.stdout.write(
albert.convert(
content=fd,
mimetype=guess_type(path)[0],
output=options.get("format"),
pages=options.get("pages"),
)
Comment thread src/backend/core/views.py
Comment on lines +132 to +137
# Update error status of documents
errors = {e.id: e for e in errors}

for result in results:
error = errors.get(result["_id"])

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants