diff --git a/eu_fact_force/ingestion/embedding.py b/eu_fact_force/ingestion/embedding.py index 2834742..f3de901 100644 --- a/eu_fact_force/ingestion/embedding.py +++ b/eu_fact_force/ingestion/embedding.py @@ -1,6 +1,12 @@ -from eu_fact_force.ingestion.models import DocumentChunk from typing import Iterator +import structlog + +from eu_fact_force.ingestion.models import DocumentChunk +from eu_fact_force.utils.decorators import tracker + +LOGGER = structlog.get_logger(__name__) + MODEL_ID = "intfloat/multilingual-e5-base" # E5 models expect "passage: " for documents to index and "query: " for search queries (asymmetric retrieval). PASSAGE_PREFIX = "passage: " @@ -37,11 +43,14 @@ def embed_query(query: str) -> list[float]: return out.tolist() if hasattr(out, "tolist") else list(out) -def _iter_batches(items: list[DocumentChunk], batch_size: int) -> Iterator[list[DocumentChunk]]: +def _iter_batches( + items: list[DocumentChunk], batch_size: int +) -> Iterator[list[DocumentChunk]]: for start in range(0, len(items), batch_size): yield items[start : start + batch_size] +@tracker(ulogger=LOGGER, inputs=True, log_start=True) def add_embeddings(chunks: list[DocumentChunk]): """ Add embeddings to the chunks and update in the DB. @@ -61,5 +70,7 @@ def add_embeddings(chunks: list[DocumentChunk]): normalize_embeddings=True, ) for chunk, vector in zip(batch, vectors): - chunk.embedding = vector.tolist() if hasattr(vector, "tolist") else list(vector) + chunk.embedding = ( + vector.tolist() if hasattr(vector, "tolist") else list(vector) + ) DocumentChunk.objects.bulk_update(batch, ["embedding"]) diff --git a/eu_fact_force/ingestion/parsing/__init__.py b/eu_fact_force/ingestion/parsing/__init__.py index 1577584..d020c5e 100644 --- a/eu_fact_force/ingestion/parsing/__init__.py +++ b/eu_fact_force/ingestion/parsing/__init__.py @@ -2,14 +2,24 @@ from __future__ import annotations +import tempfile from contextlib import contextmanager from pathlib import Path -import tempfile + +import structlog +from django.core.files.storage import default_storage + from eu_fact_force.exploration.parsing_benchmarking.benchmarking.parsers import ( parse_docling, ) -from eu_fact_force.ingestion.chunking import MAX_CHUNK_CHARS, split_into_paragraph_chunks -from django.core.files.storage import default_storage +from eu_fact_force.ingestion.chunking import ( + MAX_CHUNK_CHARS, + split_into_paragraph_chunks, +) +from eu_fact_force.utils.decorators import tracker + +LOGGER = structlog.get_logger(__name__) + @contextmanager def _source_file_local_path(source_file): @@ -47,6 +57,7 @@ def _extract_text_from_source_file(source_file) -> str: return full_text +@tracker(ulogger=LOGGER, inputs=True, log_start=True) def parse_file(source_file) -> list[str]: """ Parse the source file and return paragraph-bounded text chunks. diff --git a/eu_fact_force/utils/decorators.py b/eu_fact_force/utils/decorators.py new file mode 100644 index 0000000..5be70b3 --- /dev/null +++ b/eu_fact_force/utils/decorators.py @@ -0,0 +1,124 @@ +import functools +import inspect +import logging +import time + +import structlog + + +def _is_structlog_logger(logger) -> bool: + """Best-effort check to detect structlog loggers.""" + if logger is None: + return False + return logger.__class__.__module__.startswith("structlog") + + +def log_msg(logger, level, msg, extra): + # If this is a structlog logger, keep structured kwargs + if _is_structlog_logger(logger): + if level.lower() == "info": + logger.info(msg, **extra) + elif level.lower() == "debug": + logger.debug(msg, **extra) + else: + logger.log(level.upper(), msg, **extra) # type: ignore[attr-defined] + return + + # For non-structlog loggers (e.g. Dagster / stdlib logging), + # stringify the extra dict and log it as part of the message. + if not isinstance(logger, logging.Logger | logging.LoggerAdapter): + logger = logging.getLogger() # root logger + + logging_level = getattr(logging, level.upper(), logging.INFO) + extra_str = str(extra) if extra is not None else "" + if extra_str: + logger.log(logging_level, "%s %s", msg, extra_str) + else: + logger.log(logging_level, "%s", msg) + + +def _build_extra(func_name, args, kwargs, include_inputs): + extra = {"function_": func_name} + if include_inputs: + for k, v in kwargs.items(): + extra["args_" + k] = v + + for i, v in enumerate(args): + extra["args_" + str(i)] = v + return extra + + +def _log_start(ulogger, level, log_start, extra): + if log_start: + log_msg(ulogger, level, "start", extra) + + +def _log_end(ulogger, level, tracker_state: dict) -> None: + outputs = tracker_state["outputs"] + value = tracker_state["value"] + start_time = tracker_state["start_time"] + extra = tracker_state["extra"] + end_time = time.time() + extra["duration_"] = round((end_time - start_time), 3) + if outputs: + extra["return_"] = value + log_msg(ulogger, level, "tracker", extra) + + +def tracker( + _func=None, ulogger=None, inputs=False, outputs=False, log_start=False, level="info" +): + """Log the trace of the program""" + if ulogger is None: + ulogger = structlog.get_logger() + + def decorator_tracker(func): + if inspect.iscoroutinefunction(func): + + @functools.wraps(func) + async def wrapper_logger(*args, **kwargs): + extra = _build_extra(func.__name__, args, kwargs, inputs) + _log_start(ulogger, level, log_start, extra) + start_time = time.time() + value = await func(*args, **kwargs) + _log_end( + ulogger, + level, + { + "outputs": outputs, + "value": value, + "start_time": start_time, + "extra": extra, + }, + ) + + return value + + return wrapper_logger + else: + + @functools.wraps(func) + def wrapper_logger(*args, **kwargs): + extra = _build_extra(func.__name__, args, kwargs, inputs) + _log_start(ulogger, level, log_start, extra) + start_time = time.time() + value = func(*args, **kwargs) + _log_end( + ulogger, + level, + { + "outputs": outputs, + "value": value, + "start_time": start_time, + "extra": extra, + }, + ) + + return value + + return wrapper_logger + + if _func is None: + return decorator_tracker + else: + return decorator_tracker(_func) diff --git a/pyproject.toml b/pyproject.toml index 96020a7..4ee2659 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ dependencies = [ "docling-hierarchical-pdf>=0.1.6", "requests>=2.32.5", "arxiv>=2.4.1", + "structlog>=25.5.0", ] [tool.pytest.ini_options] diff --git a/uv.lock b/uv.lock index ecd5cd4..b1ba1a2 100644 --- a/uv.lock +++ b/uv.lock @@ -20,6 +20,7 @@ dependencies = [ { name = "python-dotenv" }, { name = "requests" }, { name = "sentence-transformers" }, + { name = "structlog" }, ] [package.dev-dependencies] @@ -56,6 +57,7 @@ requires-dist = [ { name = "python-dotenv", specifier = ">=1.0" }, { name = "requests", specifier = ">=2.32.5" }, { name = "sentence-transformers", specifier = ">=5.2.3" }, + { name = "structlog", specifier = ">=25.5.0" }, ] [package.metadata.requires-dev] @@ -3526,6 +3528,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f1/7b/ce1eafaf1a76852e2ec9b22edecf1daa58175c090266e9f6c64afcd81d91/stack_data-0.6.3-py3-none-any.whl", hash = "sha256:d5558e0c25a4cb0853cddad3d77da9891a08cb85dd9f9f91b9f8cd66e511e695", size = 24521, upload-time = "2023-09-30T13:58:03.53Z" }, ] +[[package]] +name = "structlog" +version = "25.5.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/ef/52/9ba0f43b686e7f3ddfeaa78ac3af750292662284b3661e91ad5494f21dbc/structlog-25.5.0.tar.gz", hash = "sha256:098522a3bebed9153d4570c6d0288abf80a031dfdb2048d59a49e9dc2190fc98", size = 1460830, upload-time = "2025-10-27T08:28:23.028Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a8/45/a132b9074aa18e799b891b91ad72133c98d8042c70f6240e4c5f9dabee2f/structlog-25.5.0-py3-none-any.whl", hash = "sha256:a8453e9b9e636ec59bd9e79bbd4a72f025981b3ba0f5837aebf48f02f37a7f9f", size = 72510, upload-time = "2025-10-27T08:28:21.535Z" }, +] + [[package]] name = "sympy" version = "1.14.0"