Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions bigdata_briefs/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,29 @@ class BriefCreationRequest(BaseModel):
le=10,
examples=[settings.API_FRESHNESS_BOOST],
)
sentiment_threshold: float | None = Field(
None,
description=(
"Sentiment filter magnitude for every Bigdata /v1/search in this brief: chunks outside "
"[-1,-t] ∪ [t,1] are excluded. Omit to use the server default "
f"({settings.EXPLORATORY_SENTIMENT_THRESHOLD}). Use 0 to disable sentiment filtering."
),
ge=0,
le=1,
examples=[settings.EXPLORATORY_SENTIMENT_THRESHOLD],
)
rerank_threshold: float | None = Field(
None,
description=(
"Reranker score threshold for exploratory and follow-up Bigdata /v1/search calls "
"(ranking_params.reranker). Omit to use built-in defaults: exploratory "
f"{settings.API_RERANK_EXPLORATORY}, follow-up {settings.API_RERANK_FOLLOWUP}. "
"The initial lightweight \"has results\" probe always runs with reranking disabled."
),
ge=0,
le=1,
examples=[settings.API_RERANK_EXPLORATORY],
)


class BriefAcceptedResponse(BaseModel):
Expand Down
10 changes: 5 additions & 5 deletions bigdata_briefs/api/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Type
from typing import Any, cast

from pydantic import BaseModel

Expand All @@ -7,7 +7,7 @@
from bigdata_briefs.sql_models import SQLBriefReport


def get_example_values_from_schema(schema_model: Type[BaseModel]) -> dict:
def get_example_values_from_schema(schema_model: type[BaseModel]) -> dict:
"""
Extract example values from a Pydantic model's fields, falling back to defaults if no example is provided.
Args:
Expand All @@ -18,9 +18,9 @@ def get_example_values_from_schema(schema_model: Type[BaseModel]) -> dict:
example_values = {}
for field_name, field in schema_model.model_fields.items():
example = None
if isinstance(field.json_schema_extra, dict):
if "example" in field.json_schema_extra:
example = field.json_schema_extra["example"]
extra = field.json_schema_extra
if isinstance(extra, dict):
example = cast(dict[str, Any], extra).get("example")
elif field.examples:
if field.examples:
example = field.examples[0]
Expand Down
13 changes: 11 additions & 2 deletions bigdata_briefs/llm_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import openai
from pydantic import BaseModel

from bigdata_briefs import logger
from bigdata_briefs import LOG_LEVEL, logger
from bigdata_briefs.metrics import LLMMetrics
from bigdata_briefs.models import LLMUsage
from bigdata_briefs.settings import settings
Expand All @@ -22,7 +22,10 @@ class FollowUpQuestionsPromptDefaults(BaseModel):
class LLMClient:
def __init__(self, client: openai.OpenAI | None = None):
if client is None:
client = openai.OpenAI()
client = openai.OpenAI(
timeout=settings.OPENAI_TIMEOUT_SECONDS,
max_retries=settings.LLM_RETRIES,
)
self.client = client

@log_time
Expand Down Expand Up @@ -91,6 +94,12 @@ def _call_with_retries(self, func, *args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
if LOG_LEVEL == "DEBUG" and "timeout" in str(e).lower():
prompt_payload = kwargs.get("input") or kwargs.get("messages")
print(
"\n[DEBUG][LLM TIMEOUT] Prompt payload sent to LLM:\n",
json.dumps(prompt_payload, indent=2, ensure_ascii=False),
)
if attempt >= settings.LLM_RETRIES - 1:
raise
logger.warning(f"Error calling LLM: {e}. Attempt {attempt + 1}")
Expand Down
12 changes: 8 additions & 4 deletions bigdata_briefs/metrics.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
from queue import Queue
from threading import Lock
from typing import Any, ClassVar

from bigdata_briefs import logger
from bigdata_briefs.models import (
Expand All @@ -12,6 +13,9 @@


class Metrics(ABC):
lock: ClassVar[Any]
metrics_queue: ClassVar[Queue[Any]]

@classmethod
@abstractmethod
def track_usage(cls, usage): ...
Expand Down Expand Up @@ -66,12 +70,12 @@ class WarningsMetrics(Metrics):
lock = Lock()

@classmethod
def track_usage(cls, warning_message: str):
def track_usage(cls, usage: str):
with cls.lock:
# Avoid logging duplicate warnings
if warning_message not in cls.warnings:
logger.info("A warning have been suppressed", warning=warning_message)
cls.warnings.add(warning_message)
if usage not in cls.warnings:
logger.info("A warning have been suppressed", warning=usage)
cls.warnings.add(usage)

@classmethod
def get_total_usage(cls) -> set[str]:
Expand Down
6 changes: 5 additions & 1 deletion bigdata_briefs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ def from_api(cls, api_document):
ts=api_document["timestamp"],
document_scope=api_document.get("document_type", "Unknown"),
language=api_document.get("language", "Unknown"),
chunks=[Chunk.from_api(api_chunk) for api_chunk in api_document["chunks"]],
chunks=tuple(
Chunk.from_api(api_chunk) for api_chunk in api_document["chunks"]
),
)


Expand Down Expand Up @@ -205,6 +207,8 @@ class ValidatedInput(BaseModel):
categories: list[str] | None
source_rank_boost: int | None
freshness_boost: int | None
sentiment_threshold: float
rerank_threshold: float | None = None


class FollowUpAnalysis(BaseModel):
Expand Down
5 changes: 4 additions & 1 deletion bigdata_briefs/novelty/embedding_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ class EmbeddingClient:
def __init__(self, model: str, client: openai.OpenAI | None = None):
self.model = model
if client is None:
client = openai.OpenAI()
client = openai.OpenAI(
timeout=settings.OPENAI_TIMEOUT_SECONDS,
max_retries=settings.EMBEDDING_RETRIES,
)
self.client = client

def compute(self, texts: list[str], **kwargs) -> list[list[float]]:
Expand Down
18 changes: 18 additions & 0 deletions bigdata_briefs/query_service/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,13 @@ def run_exploratory_search(
self._run_single_exploratory_search,
entity_id=entity.id,
report_dates=report_dates,
source_filter=source_filter,
categories=categories,
sentiment_threshold=sentiment_threshold,
chunk_limit=chunk_limit,
rerank_threshold=rerank_threshold,
source_rank_boost=source_rank_boost,
freshness_boost=freshness_boost,
enable_metric=True,
metric_name=f"Exploratory search. Entity {entity.id}",
)
Expand All @@ -288,6 +295,13 @@ def run_exploratory_search(
return self._run_single_exploratory_search(
entity_id=entity.id,
report_dates=report_dates,
source_filter=source_filter,
categories=categories,
sentiment_threshold=sentiment_threshold,
chunk_limit=chunk_limit,
rerank_threshold=rerank_threshold,
source_rank_boost=source_rank_boost,
freshness_boost=freshness_boost,
enable_metric=True,
metric_name=f"Exploratory search. Entity {entity.id}",
)
Expand Down Expand Up @@ -348,6 +362,8 @@ def run_query_with_follow_up_questions(
executor: ThreadPoolExecutor,
source_rank_boost: int | None = settings.API_SOURCE_RANK_BOOST,
freshness_boost: int | None = settings.API_FRESHNESS_BOOST,
sentiment_threshold: float | None = settings.FOLLOWUP_SENTIMENT_THRESHOLD,
rerank_threshold: float | None = settings.API_RERANK_FOLLOWUP,
) -> QAPairs:
future_to_question = {
executor.submit(
Expand All @@ -359,6 +375,8 @@ def run_query_with_follow_up_questions(
categories=categories,
source_rank_boost=source_rank_boost,
freshness_boost=freshness_boost,
sentiment_threshold=sentiment_threshold,
rerank_threshold=rerank_threshold,
): question
for question in follow_up_questions
}
Expand Down
5 changes: 4 additions & 1 deletion bigdata_briefs/query_service/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def check_if_entity_has_results(
categories: list[str] | None = None,
sentiment_threshold: float | None = None,
chunk_limit: int | None = None,
rerank_threshold: float = 0.0,
rerank_threshold: float | None = None,
) -> list[Result]: ...

@abstractmethod
Expand Down Expand Up @@ -106,7 +106,10 @@ def run_query_with_follow_up_questions(
follow_up_questions: list[str],
report_dates: ReportDates,
source_filter: list[str] | None,
categories: list[str] | None,
executor: ThreadPoolExecutor,
source_rank_boost: int | None,
freshness_boost: int | None,
sentiment_threshold: float | None = None,
rerank_threshold: float | None = None,
) -> QAPairs: ...
20 changes: 14 additions & 6 deletions bigdata_briefs/query_service/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Literal, NotRequired, TypedDict
from typing import Literal, NotRequired, TypedDict


class TimestampFilter(TypedDict):
Expand All @@ -7,21 +7,29 @@ class TimestampFilter(TypedDict):


class EntityFilter(TypedDict):
any_of: List[str]
any_of: list[str]


class SentimentFilter(TypedDict):
values: List[Literal["positive", "negative", "neutral"]]
class SentimentRangeBand(TypedDict):
    """One numeric sentiment interval, described by its two float endpoints.

    Both keys are required because the TypedDict is declared with the
    default totality.
    """

    # Lower endpoint of the band.
    # NOTE(review): whether the endpoints are inclusive is not visible
    # here -- confirm against the search API contract.
    min: float
    # Upper endpoint of the band.
    max: float


class SentimentFilter(TypedDict, total=False):
    """Sentiment filter payload; every key is optional (``total=False``).

    The API supports categorical values or numeric range bands (magnitude
    filter).
    """

    # Categorical form: a list of sentiment labels.
    values: list[Literal["positive", "negative", "neutral"]]
    # Numeric form: a list of min/max bands.
    # NOTE(review): nothing in this type stops both keys being set at
    # once -- confirm whether the API accepts that combination.
    ranges: list[SentimentRangeBand]


class SourceFilter(TypedDict):
mode: Literal["INCLUDE", "EXCLUDE"]
values: List[str]
values: list[str]


class CategoryFilter(TypedDict):
mode: Literal["INCLUDE", "EXCLUDE"]
values: List[str]
values: list[str]


class Filters(TypedDict, total=False):
Expand Down
Loading
Loading