From f8a82b6c1f6237013ade045b5842f9c9158f8115 Mon Sep 17 00:00:00 2001 From: nuwangeek Date: Wed, 25 Feb 2026 17:58:53 +0530 Subject: [PATCH 1/4] fixed issue --- src/llm_orchestration_service.py | 90 ++++++++++++++++++++------------ 1 file changed, 58 insertions(+), 32 deletions(-) diff --git a/src/llm_orchestration_service.py b/src/llm_orchestration_service.py index 7432957..0224a53 100644 --- a/src/llm_orchestration_service.py +++ b/src/llm_orchestration_service.py @@ -133,45 +133,69 @@ def __init__(self) -> None: # This allows components to be initialized per-request with proper context self.tool_classifier = None - # Initialize shared guardrails adapter at startup - self.shared_guardrails_adapter = self._initialize_shared_guardrails_at_startup() + # Initialize shared guardrails adapters at startup (production and testing) + self.shared_guardrails_adapters = ( + self._initialize_shared_guardrails_at_startup() + ) # Log feature flag configuration FeatureFlags.log_configuration() - def _initialize_shared_guardrails_at_startup(self) -> Optional[NeMoRailsAdapter]: + def _initialize_shared_guardrails_at_startup(self) -> Dict[str, NeMoRailsAdapter]: """ - Initialize shared guardrails at startup. + Initialize shared guardrails adapters at startup for production and testing environments. Returns: - NeMoRailsAdapter if successful, None on failure (graceful degradation) + Dictionary mapping environment names to NeMoRailsAdapter instances. + Empty dict on failure (graceful degradation). 
""" - try: - logger.info(" Initializing shared guardrails at startup...") - start_time = time.time() + adapters: Dict[str, NeMoRailsAdapter] = {} - # Initialize with production environment and no specific connection - # This creates a shared guardrails instance using default/production config - guardrails_adapter = self._initialize_guardrails( - environment="production", - connection_id=None, # Shared configuration, not user-specific - ) + # Initialize adapters for commonly-used environments + environments_to_initialize = ["production", "testing"] - elapsed_time = time.time() - start_time - logger.info( - f" Shared guardrails initialized successfully in {elapsed_time:.3f}s" - ) + logger.info(" Initializing shared guardrails at startup...") + total_start_time = time.time() - return guardrails_adapter + for env in environments_to_initialize: + try: + logger.info(f" Initializing guardrails for environment: {env}") + start_time = time.time() - except Exception as e: - logger.error(f" Failed to initialize shared guardrails at startup: {e}") + # Initialize with specific environment and no connection (shared config) + guardrails_adapter = self._initialize_guardrails( + environment=env, + connection_id=None, # Shared configuration, not user-specific + ) + + elapsed_time = time.time() - start_time + adapters[env] = guardrails_adapter + logger.info( + f" Guardrails for '{env}' initialized successfully in {elapsed_time:.3f}s" + ) + + except Exception as e: + logger.error(f" Failed to initialize guardrails for '{env}': {e}") + logger.warning( + f" Service will fall back to per-request initialization for '{env}' environment" + ) + # Continue with other environments - partial success is acceptable + continue + + total_elapsed = time.time() - total_start_time + + if adapters: + logger.info( + f" Shared guardrails initialized for {len(adapters)} environment(s) " + f"in {total_elapsed:.3f}s total" + ) + else: logger.error( - " Service will continue without guardrails (graceful 
degradation)" + " Failed to initialize any shared guardrails - " + "service will use per-request initialization (slower)" ) - # Return None - service continues without guardrails - # Per-request fallback will be attempted if needed - return None + + return adapters @observe(name="orchestration_request", as_type="agent") async def process_orchestration_request( @@ -1079,16 +1103,18 @@ def _initialize_service_components( environment=request.environment, connection_id=request.connection_id ) - # Use shared guardrails adapter (initialized at startup) - # Falls back to per-request initialization if shared instance unavailable - if self.shared_guardrails_adapter is not None: - logger.debug( - "Using shared guardrails adapter (startup-initialized, zero overhead)" + if request.environment in self.shared_guardrails_adapters: + logger.info( + f" Using shared guardrails adapter for environment='{request.environment}' " + f"(startup-initialized, zero overhead)" ) - components["guardrails_adapter"] = self.shared_guardrails_adapter + components["guardrails_adapter"] = self.shared_guardrails_adapters[ + request.environment + ] else: logger.warning( - "Shared guardrails unavailable, initializing per-request (slower)" + f" Shared guardrails unavailable for environment='{request.environment}', " + f"initializing per-request (slower)" ) components["guardrails_adapter"] = self._safe_initialize_guardrails( request.environment, request.connection_id From 3b89fba35cf3b053c2d4fc9153224473ca0bda1a Mon Sep 17 00:00:00 2001 From: nuwangeek Date: Thu, 26 Feb 2026 12:01:34 +0530 Subject: [PATCH 2/4] added hybrid search for the service detection --- docker-compose.yml | 1 + src/intent_data_enrichment/constants.py | 4 + src/intent_data_enrichment/main_enrichment.py | 151 ++++++-- src/intent_data_enrichment/models.py | 21 +- src/intent_data_enrichment/qdrant_manager.py | 274 ++++++++++---- src/tool_classifier/classifier.py | 350 ++++++++++++++++-- src/tool_classifier/constants.py | 19 + 
src/tool_classifier/sparse_encoder.py | 82 ++++ .../workflows/service_workflow.py | 156 +++++++- 9 files changed, 916 insertions(+), 142 deletions(-) create mode 100644 src/tool_classifier/sparse_encoder.py diff --git a/docker-compose.yml b/docker-compose.yml index 1fec54b..976e27f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -178,6 +178,7 @@ services: - ./DSL/CronManager/DSL:/DSL - ./DSL/CronManager/script:/app/scripts - ./src/vector_indexer:/app/src/vector_indexer + - ./src/tool_classifier:/app/src/tool_classifier - ./src/intent_data_enrichment:/app/src/intent_data_enrichment - ./src/utils/decrypt_vault_secrets.py:/app/src/utils/decrypt_vault_secrets.py:ro # Decryption utility (read-only) - cron_data:/app/data diff --git a/src/intent_data_enrichment/constants.py b/src/intent_data_enrichment/constants.py index f1f35f3..f506880 100644 --- a/src/intent_data_enrichment/constants.py +++ b/src/intent_data_enrichment/constants.py @@ -24,6 +24,10 @@ class EnrichmentConstants: VECTOR_SIZE = 3072 # Azure text-embedding-3-large dimension DISTANCE_METRIC = "Cosine" + # Named Vector Configuration (for hybrid search) + DENSE_VECTOR_NAME = "dense" + SPARSE_VECTOR_NAME = "sparse" + # Context Generation CONTEXT_TEMPLATE = """ {full_service_info} diff --git a/src/intent_data_enrichment/main_enrichment.py b/src/intent_data_enrichment/main_enrichment.py index d718678..d82358b 100644 --- a/src/intent_data_enrichment/main_enrichment.py +++ b/src/intent_data_enrichment/main_enrichment.py @@ -3,19 +3,61 @@ Service Data Enrichment Script This script receives service data, enriches it with LLM-generated context, -creates embeddings, and stores in Qdrant intent_collections. +creates embeddings (dense + sparse per example), and stores in Qdrant intent_collections. 
+ +Indexing strategy: +- One 'example' point per example query (dense + sparse vectors of the example text) +- One 'summary' point per service (dense + sparse vectors of name + description + context) """ import sys import json import argparse import asyncio +from typing import List from loguru import logger from intent_data_enrichment.models import ServiceData, EnrichedService, EnrichmentResult from intent_data_enrichment.api_client import LLMAPIClient from intent_data_enrichment.qdrant_manager import QdrantManager +# Import sparse encoder from tool_classifier (shared module) +sys.path.insert(0, "/app/src") +try: + from tool_classifier.sparse_encoder import compute_sparse_vector +except ImportError: + # Fallback for local development + try: + from src.tool_classifier.sparse_encoder import compute_sparse_vector + except ImportError: + logger.warning( + "Could not import sparse_encoder from tool_classifier, " + "attempting direct import" + ) + import importlib.util + import os + + # Try to find the module relative to this file + module_path = os.path.join( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))), + "tool_classifier", + "sparse_encoder.py", + ) + if os.path.exists(module_path): + spec = importlib.util.spec_from_file_location("sparse_encoder", module_path) + if spec is not None and spec.loader is not None: + sparse_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(sparse_module) + compute_sparse_vector = sparse_module.compute_sparse_vector + else: + raise ImportError( + f"Cannot load spec or loader for sparse_encoder.py at {module_path}" + ) from None + else: + raise ImportError( + f"Cannot find sparse_encoder.py at {module_path}" + ) from None + def parse_arguments() -> ServiceData: """Parse command line arguments into ServiceData model.""" @@ -76,7 +118,8 @@ def parse_arguments() -> ServiceData: async def enrich_service(service_data: ServiceData) -> EnrichmentResult: """ - Main enrichment pipeline: generate context, create 
embedding, store in Qdrant. + Main enrichment pipeline: generate context, create per-example embeddings, + store in Qdrant with hybrid vectors (dense + sparse). Args: service_data: Service data to enrich @@ -85,14 +128,51 @@ async def enrich_service(service_data: ServiceData) -> EnrichmentResult: EnrichmentResult with success/failure information """ try: - # Step 1: Generate rich context using LLM + # Step 1: Generate rich context using LLM (unchanged from original) logger.info("Step 1: Generating rich context with LLM") async with LLMAPIClient() as api_client: context = await api_client.generate_context(service_data) logger.success(f"Context generated: {len(context)} characters") - # Step 2: Combine generated context with original metadata for embedding - logger.info("Step 2: Combining context with original service metadata") + # Step 2: Create per-example points (dense + sparse vectors) + logger.info( + f"Step 2: Creating per-example embeddings for " + f"{len(service_data.examples)} examples" + ) + enriched_points: List[EnrichedService] = [] + + for i, example in enumerate(service_data.examples): + logger.info( + f" Creating embeddings for example {i + 1}/{len(service_data.examples)}: " + f"'{example[:80]}...'" if len(example) > 80 else + f" Creating embeddings for example {i + 1}/{len(service_data.examples)}: " + f"'{example}'" + ) + + # Dense: embed the individual example + dense_embedding = await api_client.create_embedding(example) + + # Sparse: BM25-style term frequencies for the example + sparse_vec = compute_sparse_vector(example) + + enriched_points.append( + EnrichedService( + id=service_data.service_id, + name=service_data.name, + description=service_data.description, + examples=service_data.examples, + entities=service_data.entities, + context=context, + embedding=dense_embedding, + sparse_indices=sparse_vec.indices, + sparse_values=sparse_vec.values, + example_text=example, + point_type="example", + ) + ) + + # Step 3: Create summary point (combined 
name + description + context) + logger.info("Step 3: Creating summary embedding") combined_text_parts = [ f"Service Name: {service_data.name}", f"Description: {service_data.description}", @@ -108,35 +188,44 @@ async def enrich_service(service_data: ServiceData) -> EnrichmentResult: f"Required Entities: {', '.join(service_data.entities)}" ) - # Add generated context last (enriched understanding) combined_text_parts.append(f"Enriched Context: {context}") - combined_text = "\n".join(combined_text_parts) - logger.info(f"Combined text length: {len(combined_text)} characters") - - # Step 3: Create embedding for combined text - logger.info("Step 3: Creating embedding vector for combined text") - embedding = await api_client.create_embedding(combined_text) - logger.success(f"Embedding created: {len(embedding)}-dimensional vector") - - # Step 4: Prepare enriched service - enriched_service = EnrichedService( - id=service_data.service_id, - name=service_data.name, - description=service_data.description, - examples=service_data.examples, - entities=service_data.entities, - context=context, - embedding=embedding, - ) - # Step 5: Store in Qdrant - logger.info("Step 5: Storing in Qdrant") + summary_embedding = await api_client.create_embedding(combined_text) + summary_sparse = compute_sparse_vector(combined_text) + + enriched_points.append( + EnrichedService( + id=service_data.service_id, + name=service_data.name, + description=service_data.description, + examples=service_data.examples, + entities=service_data.entities, + context=context, + embedding=summary_embedding, + sparse_indices=summary_sparse.indices, + sparse_values=summary_sparse.values, + example_text=None, + point_type="summary", + ) + ) + + # Step 4: Delete existing points for this service (idempotent update) + logger.info("Step 4: Removing existing points for idempotent update") qdrant = QdrantManager() try: qdrant.connect() qdrant.ensure_collection() - success = qdrant.upsert_service(enriched_service) + + # Delete 
old points before inserting new ones + qdrant.delete_service_points(service_data.service_id) + + # Step 5: Bulk upsert all points (examples + summary) + logger.info( + f"Step 5: Storing {len(enriched_points)} points in Qdrant " + f"({len(service_data.examples)} examples + 1 summary)" + ) + success = qdrant.upsert_service_points(enriched_points) finally: qdrant.close() @@ -144,9 +233,13 @@ async def enrich_service(service_data: ServiceData) -> EnrichmentResult: return EnrichmentResult( success=True, service_id=service_data.service_id, - message=f"Service '{service_data.name}' enriched and indexed successfully", + message=( + f"Service '{service_data.name}' enriched and indexed successfully " + f"({len(enriched_points)} points: " + f"{len(service_data.examples)} examples + 1 summary)" + ), context_length=len(context), - embedding_dimension=len(embedding), + embedding_dimension=len(summary_embedding), error=None, ) else: diff --git a/src/intent_data_enrichment/models.py b/src/intent_data_enrichment/models.py index eb0ef64..9390e73 100644 --- a/src/intent_data_enrichment/models.py +++ b/src/intent_data_enrichment/models.py @@ -20,7 +20,12 @@ class ServiceData(BaseModel): class EnrichedService(BaseModel): - """Enriched service data ready for storage.""" + """Enriched service data ready for storage. 
+ + Each service produces multiple points in Qdrant: + - One 'example' point per example query (for precise matching) + - One 'summary' point for the combined service description + context + """ id: str = Field(..., description="Service ID (maps to service_id)") name: str = Field(..., description="Service name") @@ -28,7 +33,19 @@ class EnrichedService(BaseModel): examples: List[str] = Field(..., description="Example queries") entities: List[str] = Field(..., description="Expected entity names") context: str = Field(..., description="Generated rich context") - embedding: List[float] = Field(..., description="Context embedding vector") + embedding: List[float] = Field(..., description="Dense embedding vector") + sparse_indices: List[int] = Field( + default_factory=list, description="Sparse vector indices" + ) + sparse_values: List[float] = Field( + default_factory=list, description="Sparse vector values" + ) + example_text: Optional[str] = Field( + default=None, description="The specific example this point represents" + ) + point_type: str = Field( + default="summary", description="Point type: 'example' or 'summary'" + ) class EnrichmentResult(BaseModel): diff --git a/src/intent_data_enrichment/qdrant_manager.py b/src/intent_data_enrichment/qdrant_manager.py index 5024e23..579357a 100644 --- a/src/intent_data_enrichment/qdrant_manager.py +++ b/src/intent_data_enrichment/qdrant_manager.py @@ -1,10 +1,21 @@ -"""Qdrant manager for intent collections.""" +"""Qdrant manager for intent collections with hybrid search support.""" import uuid -from typing import Optional +from typing import Optional, List from loguru import logger from qdrant_client import QdrantClient -from qdrant_client.models import Distance, VectorParams, PointStruct +from qdrant_client.models import ( + Distance, + VectorParams, + PointStruct, + SparseVectorParams, + SparseIndexParams, + SparseVector, + Filter, + FieldCondition, + MatchValue, + FilterSelector, +) from intent_data_enrichment.constants 
import EnrichmentConstants from intent_data_enrichment.models import EnrichedService @@ -14,7 +25,7 @@ class QdrantManager: - """Manages Qdrant operations for intent collections.""" + """Manages Qdrant operations for intent collections with hybrid search.""" def __init__( self, @@ -44,7 +55,12 @@ def connect(self) -> None: raise def ensure_collection(self) -> None: - """Ensure the intent_collections collection exists with correct vector size.""" + """Ensure the intent_collections collection exists with hybrid vector config. + + The collection uses named vectors: + - 'dense': 3072-dim cosine similarity vectors for semantic matching + - 'sparse': BM25-style sparse vectors for keyword matching + """ try: if not self.client: raise RuntimeError(_CLIENT_NOT_INITIALIZED) @@ -53,48 +69,60 @@ def ensure_collection(self) -> None: collection_names = [col.name for col in collections] if self.collection_name in collection_names: - # Check if existing collection has correct vector size collection_info = self.client.get_collection(self.collection_name) - - # Qdrant vectors config is a dict - get the default vector config vectors_config = collection_info.config.params.vectors - existing_vector_size: Optional[int] = None + # Check if collection has the expected named vector configuration if isinstance(vectors_config, dict): - # Get first vector config (usually the default/unnamed one) - if vectors_config: - vector_params = next(iter(vectors_config.values())) - existing_vector_size = vector_params.size + if EnrichmentConstants.DENSE_VECTOR_NAME in vectors_config: + existing_vector_size = vectors_config[ + EnrichmentConstants.DENSE_VECTOR_NAME + ].size + if existing_vector_size != EnrichmentConstants.VECTOR_SIZE: + logger.error( + f"Collection '{self.collection_name}' has incompatible vector size: " + f"{existing_vector_size} (expected {EnrichmentConstants.VECTOR_SIZE})" + ) + raise RuntimeError( + f"Collection '{self.collection_name}' has incompatible vector size " + 
f"({existing_vector_size} vs expected {EnrichmentConstants.VECTOR_SIZE}). " + "To recreate the collection, manually delete it first using: " + f"qdrant.client.delete_collection('{self.collection_name}') or via Qdrant UI/API." + ) + logger.info( + f"Collection '{self.collection_name}' already exists " + f"with correct hybrid vector config (dense: {existing_vector_size}d + sparse)" + ) + else: + # Old collection format (unnamed/single vector) — needs migration + logger.error( + f"Collection '{self.collection_name}' exists but uses old single-vector format. " + "Migration to named vectors (dense + sparse) required." + ) + raise RuntimeError( + f"Collection '{self.collection_name}' uses old single-vector format. " + "Please delete the collection and re-index all services. " + f"Delete with: qdrant.client.delete_collection('{self.collection_name}') " + "or via Qdrant UI/API." + ) elif vectors_config is not None: - # Direct VectorParams object (older API) - existing_vector_size = vectors_config.size - - if existing_vector_size is None: + # Direct VectorParams object (old single-vector format) logger.error( - f"Collection '{self.collection_name}' exists but vector size cannot be determined" + f"Collection '{self.collection_name}' exists but uses old single-vector format." ) raise RuntimeError( - f"Collection '{self.collection_name}' exists but vector size cannot be determined. " - "This may indicate a Qdrant API issue or unexpected collection configuration. " - "Manual intervention required: verify Qdrant health, inspect collection config, " - "or manually delete the collection if recreating is intended." + f"Collection '{self.collection_name}' uses old single-vector format. " + "Please delete the collection and re-index all services. " + f"Delete with: qdrant.client.delete_collection('{self.collection_name}') " + "or via Qdrant UI/API." 
) - elif existing_vector_size != EnrichmentConstants.VECTOR_SIZE: + else: logger.error( - f"Collection '{self.collection_name}' has incompatible vector size: " - f"{existing_vector_size} (expected {EnrichmentConstants.VECTOR_SIZE})" + f"Collection '{self.collection_name}' exists but vector config cannot be determined" ) raise RuntimeError( - f"Collection '{self.collection_name}' has incompatible vector size " - f"({existing_vector_size} vs expected {EnrichmentConstants.VECTOR_SIZE}). " - "This prevents automatic deletion to avoid accidental data loss. " - "To recreate the collection, manually delete it first using: " - f"qdrant.client.delete_collection('{self.collection_name}') or via Qdrant UI/API." - ) - else: - logger.info( - f"Collection '{self.collection_name}' already exists " - f"with correct vector size ({existing_vector_size})" + f"Collection '{self.collection_name}' exists but vector config cannot be determined. " + "Manual intervention required." ) else: self._create_collection() @@ -104,77 +132,175 @@ def ensure_collection(self) -> None: raise def _create_collection(self) -> None: - """Create the collection with correct vector configuration.""" + """Create the collection with hybrid vector configuration (dense + sparse).""" if not self.client: raise RuntimeError(_CLIENT_NOT_INITIALIZED) logger.info( f"Creating collection '{self.collection_name}' " - f"with vector size {EnrichmentConstants.VECTOR_SIZE}" + f"with hybrid vectors (dense: {EnrichmentConstants.VECTOR_SIZE}d + sparse)" ) self.client.create_collection( collection_name=self.collection_name, - vectors_config=VectorParams( - size=EnrichmentConstants.VECTOR_SIZE, - distance=Distance.COSINE, - ), + vectors_config={ + EnrichmentConstants.DENSE_VECTOR_NAME: VectorParams( + size=EnrichmentConstants.VECTOR_SIZE, + distance=Distance.COSINE, + ), + }, + sparse_vectors_config={ + EnrichmentConstants.SPARSE_VECTOR_NAME: SparseVectorParams( + index=SparseIndexParams(on_disk=False), + ), + }, ) 
logger.success(f"Collection '{self.collection_name}' created successfully") - def upsert_service(self, enriched_service: EnrichedService) -> bool: - """ - Upsert enriched service to Qdrant (update if exists, insert if new). + def delete_service_points(self, service_id: str) -> bool: + """Delete all points belonging to a service. + + Used before re-indexing to ensure idempotent updates, and when + a service is deactivated. Args: - enriched_service: EnrichedService instance containing the embedding and - associated metadata to upsert into Qdrant. + service_id: Service identifier to delete all points for Returns: True if successful, False otherwise """ try: if not self.client: - raise RuntimeError("Qdrant client not initialized") - - logger.info(f"Upserting service '{enriched_service.id}' to Qdrant") - - # Convert service_id to UUID for Qdrant compatibility - # Qdrant requires point IDs to be either integers or UUIDs - point_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, enriched_service.id)) - - # Prepare payload (all metadata except embedding) - payload = { - "service_id": enriched_service.id, # Store original ID in payload - "name": enriched_service.name, - "description": enriched_service.description, - "examples": enriched_service.examples, - "entities": enriched_service.entities, - "context": enriched_service.context, - } - - # Create point with UUID - point = PointStruct( - id=point_id, # ✓ Now using UUID string - vector=enriched_service.embedding, - payload=payload, + raise RuntimeError(_CLIENT_NOT_INITIALIZED) + + logger.info( + f"Deleting existing points for service '{service_id}' from Qdrant" + ) + + self.client.delete( + collection_name=self.collection_name, + points_selector=FilterSelector( + filter=Filter( + must=[ + FieldCondition( + key="service_id", + match=MatchValue(value=service_id), + ) + ] + ) + ), + ) + + logger.success( + f"Successfully deleted points for service '{service_id}'" + ) + return True + + except Exception as e: + logger.error( + f"Failed to 
delete points for service '{service_id}': {e}" ) + return False + + def upsert_service_points( + self, enriched_points: List[EnrichedService] + ) -> bool: + """Upsert multiple enriched service points to Qdrant. + + Each point contains both dense and sparse vectors for hybrid search. + Points are identified by a deterministic UUID based on service_id + point_index. + + Args: + enriched_points: List of EnrichedService instances (examples + summary) - # Upsert to Qdrant + Returns: + True if all points upserted successfully, False otherwise + """ + try: + if not self.client: + raise RuntimeError(_CLIENT_NOT_INITIALIZED) + + if not enriched_points: + logger.warning("No points to upsert") + return True + + service_id = enriched_points[0].id + logger.info( + f"Upserting {len(enriched_points)} points for service '{service_id}'" + ) + + + from typing import Any, Dict + points: List[PointStruct] = [] + for idx, enriched_service in enumerate(enriched_points): + # Deterministic UUID based on service_id + index + point_id_source = f"{enriched_service.id}_{idx}" + point_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, point_id_source)) + + # Prepare payload + payload = { + "service_id": enriched_service.id, + "name": enriched_service.name, + "description": enriched_service.description, + "examples": enriched_service.examples, + "entities": enriched_service.entities, + "context": enriched_service.context, + "point_type": enriched_service.point_type, + } + + # Add example_text for example points + if enriched_service.example_text: + payload["example_text"] = enriched_service.example_text + + # Build named vectors (dense always, sparse if present) + vectors: Dict[str, Any] = { + EnrichmentConstants.DENSE_VECTOR_NAME: enriched_service.embedding, + } + if enriched_service.sparse_indices: + vectors[EnrichmentConstants.SPARSE_VECTOR_NAME] = SparseVector( + indices=enriched_service.sparse_indices, + values=enriched_service.sparse_values, + ) + + point = PointStruct( + id=point_id, + 
vector=vectors, + payload=payload, + ) + + points.append(point) + + # Bulk upsert self.client.upsert( collection_name=self.collection_name, - points=[point], + points=points, ) logger.success( - f"Successfully upserted service '{enriched_service.id}' " - f"({len(enriched_service.embedding)}-dim vector)" + f"Successfully upserted {len(points)} points for service '{service_id}' " + f"({sum(1 for p in enriched_points if p.point_type == 'example')} examples + " + f"{sum(1 for p in enriched_points if p.point_type == 'summary')} summary)" ) return True except Exception as e: - logger.error(f"Failed to upsert service '{enriched_service.id}': {e}") + logger.error( + f"Failed to upsert service points: {e}" + ) return False + def upsert_service(self, enriched_service: EnrichedService) -> bool: + """Upsert a single enriched service to Qdrant. + + Backward-compatible wrapper that delegates to upsert_service_points. + + Args: + enriched_service: EnrichedService instance + + Returns: + True if successful, False otherwise + """ + return self.upsert_service_points([enriched_service]) + def close(self) -> None: """Close Qdrant connection.""" if self.client: diff --git a/src/tool_classifier/classifier.py b/src/tool_classifier/classifier.py index ab9e402..2313b94 100644 --- a/src/tool_classifier/classifier.py +++ b/src/tool_classifier/classifier.py @@ -1,6 +1,7 @@ -"""Main tool classifier for workflow routing.""" +"""Main tool classifier for workflow routing with hybrid search classification.""" from typing import Any, AsyncIterator, Dict, List, Literal, Optional, Union, overload +import httpx from loguru import logger from models.request_models import ( @@ -10,6 +11,17 @@ ) from tool_classifier.enums import WorkflowType, WORKFLOW_DISPLAY_NAMES from tool_classifier.models import ClassificationResult +from tool_classifier.constants import ( + QDRANT_HOST, + QDRANT_PORT, + QDRANT_COLLECTION, + QDRANT_TIMEOUT, + HYBRID_SEARCH_TOP_K, + HYBRID_SEARCH_MIN_THRESHOLD, + 
SCORE_RATIO_THRESHOLD, + SCORE_GAP_THRESHOLD, +) +from tool_classifier.sparse_encoder import compute_sparse_vector from tool_classifier.workflows import ( ServiceWorkflowExecutor, ContextWorkflowExecutor, @@ -22,19 +34,16 @@ class ToolClassifier: """ Main classifier that determines which workflow should handle user queries. + Uses Qdrant hybrid search (dense + sparse + RRF fusion) to classify queries: + - High-confidence service match → SERVICE workflow (skip discovery + intent detection) + - Ambiguous match → SERVICE workflow with LLM confirmation + - No match → CONTEXT/RAG workflow (skip SERVICE entirely) + Implements a layer-wise filtering approach: Layer 1: Service Workflow → External API calls Layer 2: Context Workflow → Conversation history/greetings Layer 3: RAG Workflow → Knowledge base retrieval Layer 4: OOD Workflow → Out-of-domain fallback - - Each layer is tried in sequence. If a layer cannot handle the query - (returns None), the classifier falls back to the next layer. - - Architecture: - - Strategy Pattern: Each workflow is a pluggable strategy - - Chain of Responsibility: Layers form a fallback chain - - Dependency Injection: LLM manager and connections injected from main service """ def __init__( @@ -52,6 +61,17 @@ def __init__( self.llm_manager = llm_manager self.orchestration_service = orchestration_service + # Shared httpx client for Qdrant queries (connection pooling) + self._qdrant_base_url = f"http://{QDRANT_HOST}:{QDRANT_PORT}" + self._qdrant_client = httpx.AsyncClient( + base_url=self._qdrant_base_url, + timeout=QDRANT_TIMEOUT, + limits=httpx.Limits( + max_connections=20, + max_keepalive_connections=10, + ), + ) + # Initialize workflow executors self.service_workflow = ServiceWorkflowExecutor( llm_manager=llm_manager, @@ -65,7 +85,10 @@ def __init__( ) self.ood_workflow = OODWorkflowExecutor() - logger.info("Tool classifier initialized with all workflow executors") + logger.info( + "Tool classifier initialized with hybrid search 
classification " + f"(Qdrant: {self._qdrant_base_url})" + ) async def classify( self, @@ -74,13 +97,16 @@ async def classify( language: str, ) -> ClassificationResult: """ - Classify a user query to determine which workflow should handle it. + Classify a user query using Qdrant hybrid search (dense + sparse + RRF). - Implements layer-wise classification logic with fallback chain: - 1. SERVICE workflow (external API calls) - 2. CONTEXT workflow (greetings/conversation history) - 3. RAG workflow (knowledge base retrieval) - 4. OOD workflow (out-of-domain) + Classification flow: + 1. Generate dense embedding for the query + 2. Generate sparse vector for the query (BM25-style) + 3. Run hybrid search on intent_collections (prefetch dense + sparse → RRF fusion) + 4. Apply score-gap analysis: + - Clear winner (high ratio + gap) → SERVICE with high confidence + - Ambiguous (scores exist but close) → SERVICE with LLM confirmation flag + - No match (low/no scores) → CONTEXT (skip SERVICE entirely) Args: query: User's query string @@ -92,13 +118,291 @@ async def classify( """ logger.info(f"Classifying query: {query[:100]}...") - logger.info("Starting layer-wise fallback: ") - return ClassificationResult( - workflow=WorkflowType.SERVICE, - confidence=1.0, - metadata={}, - reasoning="Start with Service workflow - will cascade through layers", - ) + try: + # Step 1: Generate dense embedding for query + query_embedding = self._get_query_embedding(query) + if query_embedding is None: + logger.warning("Failed to generate query embedding, falling back to CONTEXT/RAG") + return ClassificationResult( + workflow=WorkflowType.CONTEXT, + confidence=1.0, + metadata={"reason": "embedding_generation_failed"}, + reasoning="Could not generate embedding - skip to Context/RAG", + ) + + # Step 2: Generate sparse vector for query + query_sparse = compute_sparse_vector(query) + + # Step 3: Qdrant hybrid search with RRF fusion + results = await self._hybrid_search( + dense_vector=query_embedding, + 
sparse_vector=query_sparse, + top_k=HYBRID_SEARCH_TOP_K, + ) + + if not results: + logger.info("No hybrid search results - routing to CONTEXT/RAG") + return ClassificationResult( + workflow=WorkflowType.CONTEXT, + confidence=1.0, + metadata={"reason": "no_service_match"}, + reasoning="No services matched the query", + ) + + # Step 4: Score-gap analysis + top = results[0] + top_score = top.get("rrf_score", 0.0) + top_service_id = top.get("service_id", "unknown") + top_service_name = top.get("name", "unknown") + + second_score = results[1].get("rrf_score", 0.0) if len(results) > 1 else 0.0 + + score_ratio = top_score / max(second_score, 0.0001) + score_gap = top_score - second_score + + logger.info( + f"Hybrid search results - " + f"top: {top_service_name} (score={top_score:.6f}), " + f"second: {results[1].get('name', 'none') if len(results) > 1 else 'none'} " + f"(score={second_score:.6f}), " + f"ratio={score_ratio:.2f}, gap={score_gap:.6f}" + ) + + # High confidence: clear winner → SERVICE (skip discovery + intent detection) + if score_ratio > SCORE_RATIO_THRESHOLD and score_gap > SCORE_GAP_THRESHOLD: + logger.info( + f"High-confidence service match: {top_service_name} " + f"(ratio={score_ratio:.2f}, gap={score_gap:.6f})" + ) + return ClassificationResult( + workflow=WorkflowType.SERVICE, + confidence=min(score_ratio / 5.0, 1.0), + metadata={ + "matched_service_id": top_service_id, + "matched_service_name": top_service_name, + "rrf_score": top_score, + "score_gap": score_gap, + "score_ratio": score_ratio, + "needs_llm_confirmation": False, + "top_results": results[:3], + }, + reasoning=( + f"High-confidence match: {top_service_name} " + f"(ratio={score_ratio:.2f}, gap={score_gap:.6f})" + ), + ) + + # Medium confidence: ambiguous → SERVICE with LLM confirmation + if top_score > HYBRID_SEARCH_MIN_THRESHOLD: + logger.info( + f"Ambiguous service match: {top_service_name} " + f"(score={top_score:.6f}, ratio={score_ratio:.2f}) - needs LLM confirmation" + ) + return 
ClassificationResult( + workflow=WorkflowType.SERVICE, + confidence=0.5, + metadata={ + "matched_service_id": top_service_id, + "matched_service_name": top_service_name, + "rrf_score": top_score, + "score_gap": score_gap, + "score_ratio": score_ratio, + "needs_llm_confirmation": True, + "top_results": results[:3], + }, + reasoning=( + f"Ambiguous match: {top_service_name} " + f"(score={top_score:.6f}) - LLM confirmation needed" + ), + ) + + # No confidence: skip SERVICE entirely → CONTEXT/RAG + logger.info( + f"No service match (top_score={top_score:.6f} below threshold " + f"{HYBRID_SEARCH_MIN_THRESHOLD}) - routing to CONTEXT/RAG" + ) + return ClassificationResult( + workflow=WorkflowType.CONTEXT, + confidence=1.0, + metadata={"reason": "below_threshold", "top_score": top_score}, + reasoning=f"Top score {top_score:.6f} below threshold - skip to Context/RAG", + ) + + except Exception as e: + logger.error(f"Hybrid classification failed: {e}", exc_info=True) + # Fallback: route to CONTEXT/RAG on any error + return ClassificationResult( + workflow=WorkflowType.CONTEXT, + confidence=1.0, + metadata={"reason": "classification_error", "error": str(e)}, + reasoning=f"Classification error - falling back to Context/RAG: {e}", + ) + + def _get_query_embedding(self, query: str) -> Optional[List[float]]: + """Generate dense embedding for a query using the orchestration service. 
+ + Args: + query: Query text to embed + + Returns: + List of floats representing the dense embedding, or None on failure + """ + try: + if not self.orchestration_service: + logger.error("Orchestration service not available for embedding") + return None + + result = self.orchestration_service.create_embeddings_for_indexer( + texts=[query], + environment="production", + batch_size=1, + ) + + embeddings = result.get("embeddings", []) + if embeddings and len(embeddings) > 0: + return embeddings[0] + + logger.error("No embedding returned for query") + return None + + except Exception as e: + logger.error(f"Failed to generate query embedding: {e}") + return None + + async def _hybrid_search( + self, + dense_vector: List[float], + sparse_vector: Any, + top_k: int = HYBRID_SEARCH_TOP_K, + ) -> List[Dict[str, Any]]: + """Execute hybrid search on Qdrant using prefetch + RRF fusion. + + Sends both dense and sparse vectors in a single Qdrant query, + using the prefetch API for parallel retrieval and RRF for fusion. 
+ + Args: + dense_vector: Dense embedding vector (3072-dim) + sparse_vector: SparseVector with indices and values + top_k: Number of results to return + + Returns: + List of result dicts with service metadata and rrf_score + """ + try: + # Check if collection exists and has data + try: + collection_info = await self._qdrant_client.get( + f"/collections/{QDRANT_COLLECTION}" + ) + if collection_info.status_code == 200: + info = collection_info.json() + points_count = info.get("result", {}).get("points_count", 0) + if points_count == 0: + logger.info("Intent collection is empty - no services indexed") + return [] + else: + logger.warning( + f"Could not verify collection: HTTP {collection_info.status_code}" + ) + return [] + except Exception as e: + logger.warning(f"Could not verify intent collection: {e}") + return [] + + # Build hybrid search payload with prefetch + RRF + search_payload: Dict[str, Any] = { + "prefetch": [ + { + "query": dense_vector, + "using": "dense", + "limit": top_k * 2, + }, + ], + "query": {"fusion": "rrf"}, + "limit": top_k, + "with_payload": True, + } + + # Add sparse prefetch only if sparse vector is non-empty + if not sparse_vector.is_empty(): + search_payload["prefetch"].append( + { + "query": sparse_vector.to_dict(), + "using": "sparse", + "limit": top_k * 2, + } + ) + + response = await self._qdrant_client.post( + f"/collections/{QDRANT_COLLECTION}/points/query", + json=search_payload, + ) + + if response.status_code != 200: + logger.error( + f"Qdrant hybrid search failed: HTTP {response.status_code} - " + f"{response.text}" + ) + return [] + + search_results = response.json() + points = search_results.get("result", {}).get("points", []) + + if not points: + logger.info("No results from hybrid search") + return [] + + # Parse and deduplicate results (group by service_id, keep best score) + service_results: Dict[str, Dict[str, Any]] = {} + for point in points: + payload = point.get("payload", {}) + score = float(point.get("score", 0)) + 
service_id = payload.get("service_id", "unknown") + + if service_id not in service_results or score > service_results[service_id].get("rrf_score", 0): + service_results[service_id] = { + "service_id": service_id, + "name": payload.get("name", ""), + "description": payload.get("description", ""), + "examples": payload.get("examples", []), + "entities": payload.get("entities", []), + "context": payload.get("context", ""), + "point_type": payload.get("point_type", "unknown"), + "example_text": payload.get("example_text"), + "rrf_score": score, + } + + # Sort by RRF score descending + sorted_results = sorted( + service_results.values(), + key=lambda x: x["rrf_score"], + reverse=True, + ) + + logger.info( + f"Hybrid search found {len(sorted_results)} unique services " + f"from {len(points)} points" + ) + + for i, r in enumerate(sorted_results[:3]): + logger.debug( + f" Rank {i + 1}: {r['name']} " + f"(service_id={r['service_id']}, " + f"rrf_score={r['rrf_score']:.6f}, " + f"type={r['point_type']})" + ) + + return sorted_results + + except httpx.TimeoutException: + logger.error( + f"Qdrant hybrid search timeout after {QDRANT_TIMEOUT}s" + ) + return [] + except Exception as e: + logger.error(f"Hybrid search failed: {e}", exc_info=True) + return [] + @overload async def route_to_workflow( diff --git a/src/tool_classifier/constants.py b/src/tool_classifier/constants.py index c885b52..7db6aa9 100644 --- a/src/tool_classifier/constants.py +++ b/src/tool_classifier/constants.py @@ -58,3 +58,22 @@ SERVICE_COUNT_THRESHOLD = 10 """Threshold for triggering semantic search. 
"""
Sparse vector encoder for term-frequency (BM25-style) vectors.

Shared module used by both:
- intent_data_enrichment (indexing time) — to create sparse vectors for service examples
- tool_classifier (query time) — to create sparse vectors for user queries

Uses hash-based indexing compatible with Qdrant's sparse vector format.

IMPORTANT: indices must be identical across processes. The indexing pipeline
and the query-time classifier run in separate Python processes, so the token
hash must be deterministic (see _token_index below).
"""

import re
import zlib
from collections import Counter
from dataclasses import dataclass, field
from typing import List


# Hash space for sparse vector indices.
# Larger = fewer collisions but more memory; 50K is a good balance for
# intent classification.
SPARSE_VOCAB_SIZE = 50_000

# Simple word tokenizer matching the pattern used in
# contextual_retrieval/bm25_search.py
TOKENIZER_PATTERN = re.compile(r"\w+")


@dataclass
class SparseVector:
    """Sparse vector representation for Qdrant.

    Attributes:
        indices: Sorted list of non-zero dimension indices
        values: Corresponding values for each index
    """

    indices: List[int] = field(default_factory=list)
    values: List[float] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Convert to Qdrant API format ({"indices": [...], "values": [...]})."""
        return {"indices": self.indices, "values": self.values}

    def is_empty(self) -> bool:
        """Check if the sparse vector has no entries."""
        return not self.indices


def _token_index(token: str) -> int:
    """Map a token to a stable index in [0, SPARSE_VOCAB_SIZE).

    Uses CRC32 instead of the builtin hash(): str hashing is randomized per
    process (PYTHONHASHSEED), so hash() would produce different indices in the
    indexing process and the query-serving process, silently breaking sparse
    matching between enrichment-time and query-time vectors. CRC32 over the
    UTF-8 bytes is deterministic everywhere.
    """
    return zlib.crc32(token.encode("utf-8")) % SPARSE_VOCAB_SIZE


def compute_sparse_vector(text: str) -> SparseVector:
    """Convert text to a sparse vector using term-frequency hashing.

    Tokenizes the input text, counts term frequencies, and maps each token to
    a deterministic hash-based index in the sparse vector space. This creates
    a BM25-compatible representation that Qdrant can use for sparse search.
    Hash collisions are resolved by summing the frequencies that land on the
    same index.

    Args:
        text: Input text to vectorize

    Returns:
        SparseVector with hash-based indices and term-frequency values;
        an empty SparseVector for blank or token-free input
    """
    if not text or not text.strip():
        return SparseVector()

    # Tokenize: lowercase and extract word tokens
    tokens = TOKENIZER_PATTERN.findall(text.lower())
    if not tokens:
        return SparseVector()

    # Term frequency per token, accumulated per hashed index
    hash_counts: dict[int, float] = {}
    for token, count in Counter(tokens).items():
        idx = _token_index(token)
        hash_counts[idx] = hash_counts.get(idx, 0.0) + float(count)

    # Qdrant requires sorted indices for a consistent representation
    sorted_indices = sorted(hash_counts)
    return SparseVector(
        indices=sorted_indices,
        values=[hash_counts[i] for i in sorted_indices],
    )
a/src/tool_classifier/workflows/service_workflow.py +++ b/src/tool_classifier/workflows/service_workflow.py @@ -553,9 +553,14 @@ async def execute_async( ) -> Optional[OrchestrationResponse]: """Execute service workflow in non-streaming mode. + Uses classification metadata from hybrid search: + - needs_llm_confirmation=False: Skip discovery + intent detection, use matched service + - needs_llm_confirmation=True: Run LLM intent detection on candidate services only + - No metadata: Fall back to original discovery flow + Args: request: Orchestration request - context: Workflow context + context: Workflow context (contains classification metadata) timing_dict: Optional timing dictionary for unified tracking """ import time @@ -568,12 +573,77 @@ async def execute_async( if timing_dict is None: timing_dict = {} - # Service discovery with timing - start_time = time.time() - await self._log_request_details( - request, context, mode="non-streaming", costs_dict=costs_dict - ) - timing_dict["service.discovery"] = time.time() - start_time + # Check if classifier provided hybrid search metadata + needs_llm_confirmation = context.get("needs_llm_confirmation") + + if needs_llm_confirmation is False: + # HIGH CONFIDENCE PATH: Classifier matched a service with high confidence + # Skip service discovery — use hybrid search match directly + matched_service_id = context.get("matched_service_id") + matched_service_name = context.get("matched_service_name") + rrf_score = context.get("rrf_score", 0) + + logger.info( + f"[{chat_id}] HIGH-CONFIDENCE SERVICE MATCH (non-streaming): " + f"{matched_service_name} (rrf_score={rrf_score:.6f}) - " + f"skipping discovery" + ) + + # Get service details from top_results (already retrieved by classifier) + top_results = context.get("top_results", []) + if top_results: + matched = top_results[0] + + # Run entity extraction via LLM (DSPy) for this single service + start_time = time.time() + await self._process_intent_detection( + services=[matched], + 
request=request, + chat_id=chat_id, + context=context, + costs_dict=costs_dict, + ) + timing_dict["service.intent_detection"] = time.time() - start_time + + # Ensure service_data is populated from hybrid match + # _process_intent_detection may not set it if DSPy returns + # a different service_id format, so we populate it explicitly + if not context.get("service_data"): + context["service_id"] = matched.get("service_id") + context["service_data"] = matched + logger.info( + f"[{chat_id}] Populated service_data from hybrid match: " + f"{matched.get('name')}" + ) + + elif needs_llm_confirmation is True: + # AMBIGUOUS PATH: Multiple services scored similarly + # Run LLM intent detection only on candidate services (not all services) + top_results = context.get("top_results", []) + logger.info( + f"[{chat_id}] AMBIGUOUS SERVICE MATCH (non-streaming): " + f"running LLM intent detection on {len(top_results)} candidates" + ) + + start_time = time.time() + if top_results: + await self._process_intent_detection( + services=top_results, + request=request, + chat_id=chat_id, + context=context, + costs_dict=costs_dict, + ) + timing_dict["service.discovery"] = time.time() - start_time + + else: + # LEGACY PATH: No hybrid search metadata (classifier disabled or error) + # Full service discovery + intent detection (original behavior) + start_time = time.time() + await self._log_request_details( + request, context, mode="non-streaming", costs_dict=costs_dict + ) + timing_dict["service.discovery"] = time.time() - start_time # Check if service was detected and validated if not context.get("service_id"): @@ -692,9 +762,11 @@ async def execute_streaming( ) -> Optional[AsyncIterator[str]]: """Execute service workflow in streaming mode. + Uses classification metadata from hybrid search (same as execute_async). 
+ Args: request: Orchestration request - context: Workflow context + context: Workflow context (contains classification metadata) timing_dict: Optional timing dictionary for unified tracking """ import time @@ -707,12 +779,68 @@ async def execute_streaming( if timing_dict is None: timing_dict = {} - # Service discovery with timing - start_time = time.time() - await self._log_request_details( - request, context, mode="streaming", costs_dict=costs_dict - ) - timing_dict["service.discovery"] = time.time() - start_time + # Check if classifier provided hybrid search metadata + needs_llm_confirmation = context.get("needs_llm_confirmation") + + if needs_llm_confirmation is False: + # HIGH CONFIDENCE PATH: Skip discovery, use matched service + matched_service_name = context.get("matched_service_name") + rrf_score = context.get("rrf_score", 0) + + logger.info( + f"[{chat_id}] HIGH-CONFIDENCE SERVICE MATCH (streaming): " + f"{matched_service_name} (rrf_score={rrf_score:.6f})" + ) + + top_results = context.get("top_results", []) + if top_results: + matched = top_results[0] + + start_time = time.time() + await self._process_intent_detection( + services=[matched], + request=request, + chat_id=chat_id, + context=context, + costs_dict=costs_dict, + ) + timing_dict["service.intent_detection"] = time.time() - start_time + + # Ensure service_data is populated from hybrid match + if not context.get("service_data"): + context["service_id"] = matched.get("service_id") + context["service_data"] = matched + logger.info( + f"[{chat_id}] Populated service_data from hybrid match: " + f"{matched.get('name')}" + ) + + elif needs_llm_confirmation is True: + # AMBIGUOUS PATH: Run LLM intent detection on candidates + top_results = context.get("top_results", []) + logger.info( + f"[{chat_id}] AMBIGUOUS SERVICE MATCH (streaming): " + f"{len(top_results)} candidates" + ) + + start_time = time.time() + if top_results: + await self._process_intent_detection( + services=top_results, + request=request, 
+ chat_id=chat_id, + context=context, + costs_dict=costs_dict, + ) + timing_dict["service.discovery"] = time.time() - start_time + + else: + # LEGACY PATH: Full service discovery (original behavior) + start_time = time.time() + await self._log_request_details( + request, context, mode="streaming", costs_dict=costs_dict + ) + timing_dict["service.discovery"] = time.time() - start_time # Check if service was detected and validated if not context.get("service_id"): From 789f062e3fdd6aca000ce1551fc8f411328020d8 Mon Sep 17 00:00:00 2001 From: nuwangeek Date: Sun, 1 Mar 2026 10:38:40 +0530 Subject: [PATCH 3/4] update tool classifier --- docs/HYBRID_SEARCH_CLASSIFICATION.md | 380 +++++++++++++++++++++++++++ src/tool_classifier/classifier.py | 263 ++++++++++++------ src/tool_classifier/constants.py | 24 +- 3 files changed, 579 insertions(+), 88 deletions(-) create mode 100644 docs/HYBRID_SEARCH_CLASSIFICATION.md diff --git a/docs/HYBRID_SEARCH_CLASSIFICATION.md b/docs/HYBRID_SEARCH_CLASSIFICATION.md new file mode 100644 index 0000000..3e29b99 --- /dev/null +++ b/docs/HYBRID_SEARCH_CLASSIFICATION.md @@ -0,0 +1,380 @@ +# Hybrid Search Classification & Intent Data Enrichment + +> Updated architecture for the Tool Classifier using hybrid search (dense + sparse + RRF) with per-example indexing. +> Replaces the single-embedding approach documented in `TOOL_CLASSIFIER_AND_SERVICE_WORKFLOW.md`. + +--- + +## Table of Contents + +1. [Architecture Overview](#architecture-overview) +2. [Intent Data Enrichment (Indexing)](#intent-data-enrichment-indexing) +3. [Classification Flow (Query Time)](#classification-flow-query-time) +4. [Intent Detection & Entity Extraction](#intent-detection--entity-extraction) +5. [Thresholds & Configuration](#thresholds--configuration) + +--- + +## Architecture Overview + +The system has two phases: + +1. **Indexing (offline):** For each service, create multiple Qdrant points with dense + sparse vectors +2. 
**Classification (query time):** Two-step search to route queries — dense for relevance, hybrid for service identification + +``` +┌─────────────────────────────────────────────────────────────────────┐ +│ INDEXING (Offline) │ +│ │ +│ service_enrichment.sh → main_enrichment.py │ +│ ├─ LLM context generation │ +│ ├─ Per-example: dense embedding + sparse BM25 vector │ +│ ├─ Summary: dense embedding + sparse BM25 vector │ +│ └─ Qdrant upsert (N examples + 1 summary = N+1 points) │ +├─────────────────────────────────────────────────────────────────────┤ +│ CLASSIFICATION (Query Time) │ +│ │ +│ User Query │ +│ ├─ Step 1: Dense search → cosine similarity (relevance check) │ +│ ├─ Step 2: Hybrid search → RRF fusion (service identification) │ +│ └─ Route: HIGH-CONFIDENCE / AMBIGUOUS / CONTEXT-RAG │ +└─────────────────────────────────────────────────────────────────────┘ +``` + +--- + +## Intent Data Enrichment (Indexing) + +### Source Files + +| File | Role | +|------|------| +| `DSL/CronManager/script/service_enrichment.sh` | Entry point — sets environment, runs Python script | +| `src/intent_data_enrichment/main_enrichment.py` | Orchestrates per-example and summary point creation | +| `src/intent_data_enrichment/qdrant_manager.py` | Qdrant collection management, upsert, and deletion | +| `src/intent_data_enrichment/api_client.py` | LLM API calls (context generation, embeddings) | +| `src/intent_data_enrichment/models.py` | `EnrichedService` data model | +| `src/tool_classifier/sparse_encoder.py` | BM25-style sparse vector computation | + +### What Changed: Single Embedding → Per-Example Indexing + +**Before (old):** One point per service from concatenated text. + +**After (new):** N+1 points per service — one per example query, plus one summary. 
+| **Sparse** (BM25-style TF) | Term frequency hashing (`sparse_encoder.py`) | Keyword overlap — exact token matching ("EUR", "USD", "THB") |
+# Each token → hashed to index in [0, SPARSE_VOCAB_SIZE), value = term frequency
`src/tool_classifier/workflows/service_workflow.py` | Service execution with 3 routing paths | + +### Step 1: Dense Search — "Is This a Service Query?" + +Queries Qdrant using only the dense vector to get **actual cosine similarity scores** (0.0 – 1.0). + +```python +# classifier.py → _dense_search() +POST /collections/intent_collections/points/query +{ + "query": [0.023, -0.041, ...], # 3072-dim dense vector + "using": "dense", + "limit": 6, + "with_payload": true +} +``` + +Results are deduplicated by `service_id` (best score per service). + +**Why not use RRF scores?** +Qdrant's RRF uses `1/(1+rank)`, producing fixed scores (0.50, 0.33, 0.25) regardless of actual relevance. A perfect match and a random query both get 0.50 for rank 1. Cosine similarity reflects true semantic closeness. + +### Step 2: Hybrid Search — "Which Service?" + +Only runs if cosine ≥ `DENSE_MIN_THRESHOLD`. Combines dense + sparse search with RRF fusion. + +```python +# classifier.py → _hybrid_search() +POST /collections/intent_collections/points/query +{ + "prefetch": [ + {"query": dense_vector, "using": "dense", "limit": 20}, + {"query": {"indices": [...], "values": [...]}, "using": "sparse", "limit": 20} + ], + "query": {"fusion": "rrf"}, + "limit": 5, + "with_payload": true +} +``` + +### Routing Decision + +``` +Dense cosine score + gap + │ + ├─ cosine < 0.20 → PATH 1: Skip SERVICE → CONTEXT/RAG + │ + ├─ cosine ≥ 0.40 AND → PATH 2: HIGH-CONFIDENCE SERVICE + │ gap ≥ 0.05 (skip discovery, entity extraction only) + │ + └─ else (0.20 ≤ cosine < 0.40 → PATH 3: AMBIGUOUS SERVICE + OR gap < 0.05) (LLM intent detection on candidates) +``` + +### Path 1: Non-Service Query → CONTEXT/RAG + +Top cosine score below minimum threshold. The query has no meaningful similarity to any indexed service. + +``` +Query: "Tere, kuidas läheb?" 
+Dense: top cosine=0.15 → below 0.20 → skip SERVICE +→ Routes directly to CONTEXT → RAG (saves ~2-4s) +``` + +### Path 2: HIGH-CONFIDENCE Service Match + +One service clearly stands out with high cosine and large gap to second result. + +``` +Query: "Palju saan 1 EUR eest THBdes?" +Dense: Valuutakursid (cosine=0.5511), gap=0.2371 +→ 0.5511 ≥ 0.40 AND 0.2371 ≥ 0.05 → HIGH-CONFIDENCE +→ Skips service discovery +→ Runs entity extraction on matched service only +→ Entities: {currency_from: EUR, currency_to: THB} +→ Validation: PASSED ✓ +``` + +### Path 3: AMBIGUOUS Service Match → LLM Confirmation + +Multiple services score similarly or cosine is in the medium range. + +``` +Query: "Mis on täna ilm?" +Dense: Ilmapäring (cosine=0.35), gap=0.02 +→ 0.35 ≥ 0.20 but 0.35 < 0.40 → AMBIGUOUS +→ Runs LLM Intent Detection on top 3 candidates +→ LLM confirms or rejects → falls back to RAG if rejected +``` + +### Fallback Chain + +Each workflow returns a response or `None` (fallback to next): + +``` +SERVICE (Layer 1) → CONTEXT (Layer 2) → RAG (Layer 3) → OOD (Layer 4) +``` + +--- + +## Intent Detection & Entity Extraction + +### When Does It Run? 
+ +| Path | Intent Detection | Entity Extraction | +|------|-----------------|-------------------| +| HIGH-CONFIDENCE | On 1 service (matched) | Yes — from LLM output | +| AMBIGUOUS | On 2-3 candidates | Yes — if LLM matches | +| Non-service | Not run | Not run | + +### Intent Detection Module (DSPy) + +**File:** `src/tool_classifier/intent_detector.py` + +The DSPy `IntentDetectionModule` receives: +- User query +- Candidate services (formatted as JSON) +- Conversation history (last 3 turns) + +It returns: +```json +{ + "matched_service_id": "common_service_exchange_rate", + "confidence": 0.92, + "entities": { + "currency_from": "EUR", + "currency_to": "THB" + }, + "reasoning": "User wants EUR to THB exchange rate" +} +``` + +### Entity Validation + +**File:** `src/tool_classifier/workflows/service_workflow.py` → `_validate_entities()` + +Extracted entities are validated against the service's schema: + +``` +Schema: ["currency_from", "currency_to"] +Extracted: {"currency_from": "EUR", "currency_to": "THB"} +Result: PASSED ✓ +``` + +- **Missing entities** → sent as empty strings (service validates) +- **Extra entities** → ignored +- **Validation is lenient** — always proceeds, lets the service endpoint validate + +### Entity Transformation + +Entities dict → ordered array matching service schema: + +```python +# Schema: ["currency_from", "currency_to"] +# Dict: {"currency_from": "EUR", "currency_to": "THB"} +# Array: ["EUR", "THB"] +``` + +--- + +## Thresholds & Configuration + +All defined in `src/tool_classifier/constants.py`. + +### Classification Thresholds + +| Constant | Value | Description | +|----------|-------|-------------| +| `DENSE_MIN_THRESHOLD` | `0.20` | Minimum cosine to consider any service match. Below → skip SERVICE entirely. Set low because multilingual (Estonian) queries yield lower cosine (0.25–0.55). | +| `DENSE_HIGH_CONFIDENCE_THRESHOLD` | `0.40` | Cosine for HIGH-CONFIDENCE path. 
Service queries with correct match score > 0.40 (observed: 0.55). Non-service score 0.27–0.35. | +| `DENSE_SCORE_GAP_THRESHOLD` | `0.05` | Required gap between top two services. Prevents false positives when multiple services score similarly. Service gaps: ~0.24, non-service gaps: ~0.01. | + +### Search Configuration + +| Constant | Value | Description | +|----------|-------|-------------| +| `DENSE_SEARCH_TOP_K` | `3` | Unique services from dense search | +| `HYBRID_SEARCH_TOP_K` | `5` | Results from hybrid RRF search | + +### Observed Score Distributions + +Based on real Estonian query testing: + +| Metric | Service Query | Non-Service Query | +|--------|:------------:|:-----------------:| +| Top cosine | **0.55** | 0.27 – 0.35 | +| Cosine gap | **0.24** | 0.005 – 0.017 | +| Decision | HIGH-CONFIDENCE | AMBIGUOUS → LLM reject | + +### Performance by Path + +| Path | Latency | LLM Calls | Cost | +|------|:-------:|:---------:|:----:| +| Non-service (below threshold) | ~0.3s | 0 | $0 | +| HIGH-CONFIDENCE service | ~2.0s | 1 | ~$0.002 | +| AMBIGUOUS service | ~3.5s | 1-2 | ~$0.002–0.004 | +| Legacy (no classifier) | ~4.0s | 2+ | ~$0.004+ | + +### Tuning Recommendations + +- **Adding more services:** Score distributions improve naturally — service queries score higher, non-service score lower. +- **Adding more examples per service:** Diverse phrasings expand the embedding coverage. Aim for 5-8 examples per service covering formal + informal + different word orders. +- **Adjusting thresholds:** Monitor the logs (`Dense search: top=... cosine=...`) and adjust if real-world scores differ from test data. 
diff --git a/src/tool_classifier/classifier.py b/src/tool_classifier/classifier.py index 2313b94..181b6cf 100644 --- a/src/tool_classifier/classifier.py +++ b/src/tool_classifier/classifier.py @@ -9,7 +9,7 @@ OrchestrationRequest, OrchestrationResponse, ) -from tool_classifier.enums import WorkflowType, WORKFLOW_DISPLAY_NAMES +from tool_classifier.enums import WorkflowType, WORKFLOW_DISPLAY_NAMES, WORKFLOW_LAYER_ORDER from tool_classifier.models import ClassificationResult from tool_classifier.constants import ( QDRANT_HOST, @@ -17,9 +17,10 @@ QDRANT_COLLECTION, QDRANT_TIMEOUT, HYBRID_SEARCH_TOP_K, - HYBRID_SEARCH_MIN_THRESHOLD, - SCORE_RATIO_THRESHOLD, - SCORE_GAP_THRESHOLD, + DENSE_SEARCH_TOP_K, + DENSE_MIN_THRESHOLD, + DENSE_HIGH_CONFIDENCE_THRESHOLD, + DENSE_SCORE_GAP_THRESHOLD, ) from tool_classifier.sparse_encoder import compute_sparse_vector from tool_classifier.workflows import ( @@ -34,7 +35,11 @@ class ToolClassifier: """ Main classifier that determines which workflow should handle user queries. - Uses Qdrant hybrid search (dense + sparse + RRF fusion) to classify queries: + Uses a two-step search approach for classification: + 1. Dense-only search → real cosine similarity scores for relevance check + 2. Hybrid search (dense + sparse + RRF) → best service identification + + Routing decisions: - High-confidence service match → SERVICE workflow (skip discovery + intent detection) - Ambiguous match → SERVICE workflow with LLM confirmation - No match → CONTEXT/RAG workflow (skip SERVICE entirely) @@ -97,16 +102,15 @@ async def classify( language: str, ) -> ClassificationResult: """ - Classify a user query using Qdrant hybrid search (dense + sparse + RRF). + Classify a user query using a two-step search approach. + + Step 1: Dense-only search → cosine similarity for relevance check + Step 2: Hybrid search (dense + sparse + RRF) → service identification - Classification flow: - 1. Generate dense embedding for the query - 2. 
Generate sparse vector for the query (BM25-style) - 3. Run hybrid search on intent_collections (prefetch dense + sparse → RRF fusion) - 4. Apply score-gap analysis: - - Clear winner (high ratio + gap) → SERVICE with high confidence - - Ambiguous (scores exist but close) → SERVICE with LLM confirmation flag - - No match (low/no scores) → CONTEXT (skip SERVICE entirely) + Routing: + - cosine < DENSE_MIN_THRESHOLD → CONTEXT/RAG (skip SERVICE) + - cosine ≥ HIGH_CONFIDENCE + large gap → SERVICE (no LLM needed) + - else → SERVICE with LLM confirmation Args: query: User's query string @@ -130,107 +134,126 @@ async def classify( reasoning="Could not generate embedding - skip to Context/RAG", ) - # Step 2: Generate sparse vector for query - query_sparse = compute_sparse_vector(query) - - # Step 3: Qdrant hybrid search with RRF fusion - results = await self._hybrid_search( + # Step 2: Dense-only search → get actual cosine similarity scores + dense_results = await self._dense_search( dense_vector=query_embedding, - sparse_vector=query_sparse, - top_k=HYBRID_SEARCH_TOP_K, + top_k=DENSE_SEARCH_TOP_K, ) - if not results: - logger.info("No hybrid search results - routing to CONTEXT/RAG") + if not dense_results: + logger.info("No dense search results - routing to CONTEXT/RAG") return ClassificationResult( workflow=WorkflowType.CONTEXT, confidence=1.0, metadata={"reason": "no_service_match"}, - reasoning="No services matched the query", + reasoning="No services matched the query (dense search empty)", ) - # Step 4: Score-gap analysis - top = results[0] - top_score = top.get("rrf_score", 0.0) - top_service_id = top.get("service_id", "unknown") - top_service_name = top.get("name", "unknown") - - second_score = results[1].get("rrf_score", 0.0) if len(results) > 1 else 0.0 - - score_ratio = top_score / max(second_score, 0.0001) - score_gap = top_score - second_score + top_cosine = dense_results[0].get("cosine_score", 0.0) + top_service_name = dense_results[0].get("name", "unknown") + 
second_cosine = dense_results[1].get("cosine_score", 0.0) if len(dense_results) > 1 else 0.0 + cosine_gap = top_cosine - second_cosine logger.info( - f"Hybrid search results - " - f"top: {top_service_name} (score={top_score:.6f}), " - f"second: {results[1].get('name', 'none') if len(results) > 1 else 'none'} " - f"(score={second_score:.6f}), " - f"ratio={score_ratio:.2f}, gap={score_gap:.6f}" + f"Dense search: top={top_service_name} " + f"(cosine={top_cosine:.4f}), " + f"second={dense_results[1].get('name', 'none') if len(dense_results) > 1 else 'none'} " + f"(cosine={second_cosine:.4f}), " + f"gap={cosine_gap:.4f}" ) - # High confidence: clear winner → SERVICE (skip discovery + intent detection) - if score_ratio > SCORE_RATIO_THRESHOLD and score_gap > SCORE_GAP_THRESHOLD: + # Decision: Is this a service query at all? + if top_cosine < DENSE_MIN_THRESHOLD: logger.info( - f"High-confidence service match: {top_service_name} " - f"(ratio={score_ratio:.2f}, gap={score_gap:.6f})" + f"Low relevance (cosine={top_cosine:.4f} < {DENSE_MIN_THRESHOLD}) " + f"- routing to CONTEXT/RAG, skipping SERVICE" ) return ClassificationResult( - workflow=WorkflowType.SERVICE, - confidence=min(score_ratio / 5.0, 1.0), + workflow=WorkflowType.CONTEXT, + confidence=1.0, metadata={ - "matched_service_id": top_service_id, - "matched_service_name": top_service_name, - "rrf_score": top_score, - "score_gap": score_gap, - "score_ratio": score_ratio, - "needs_llm_confirmation": False, - "top_results": results[:3], + "reason": "below_dense_threshold", + "top_cosine": top_cosine, + "top_service": top_service_name, }, reasoning=( - f"High-confidence match: {top_service_name} " - f"(ratio={score_ratio:.2f}, gap={score_gap:.6f})" + f"Dense cosine {top_cosine:.4f} below threshold " + f"{DENSE_MIN_THRESHOLD} - skip to Context/RAG" ), ) - # Medium confidence: ambiguous → SERVICE with LLM confirmation - if top_score > HYBRID_SEARCH_MIN_THRESHOLD: + # Step 3: Hybrid search → identify best service using RRF 
+ query_sparse = compute_sparse_vector(query) + hybrid_results = await self._hybrid_search( + dense_vector=query_embedding, + sparse_vector=query_sparse, + top_k=HYBRID_SEARCH_TOP_K, + ) + + # Use hybrid results for service identification, dense scores for confidence + if not hybrid_results: + # Dense matched but hybrid didn't — use dense results + hybrid_results = dense_results + + top_result = hybrid_results[0] + top_service_id = top_result.get("service_id", "unknown") + top_service_name_hybrid = top_result.get("name", "unknown") + + logger.info( + f"Hybrid search: best service={top_service_name_hybrid} " + f"(service_id={top_service_id})" + ) + + # High confidence: cosine is high AND clear gap to second result + if ( + top_cosine >= DENSE_HIGH_CONFIDENCE_THRESHOLD + and cosine_gap >= DENSE_SCORE_GAP_THRESHOLD + ): logger.info( - f"Ambiguous service match: {top_service_name} " - f"(score={top_score:.6f}, ratio={score_ratio:.2f}) - needs LLM confirmation" + f"HIGH-CONFIDENCE match: {top_service_name_hybrid} " + f"(cosine={top_cosine:.4f}, gap={cosine_gap:.4f})" ) return ClassificationResult( workflow=WorkflowType.SERVICE, - confidence=0.5, + confidence=min(top_cosine, 1.0), metadata={ "matched_service_id": top_service_id, - "matched_service_name": top_service_name, - "rrf_score": top_score, - "score_gap": score_gap, - "score_ratio": score_ratio, - "needs_llm_confirmation": True, - "top_results": results[:3], + "matched_service_name": top_service_name_hybrid, + "cosine_score": top_cosine, + "cosine_gap": cosine_gap, + "needs_llm_confirmation": False, + "top_results": hybrid_results[:3], }, reasoning=( - f"Ambiguous match: {top_service_name} " - f"(score={top_score:.6f}) - LLM confirmation needed" + f"High-confidence match: {top_service_name_hybrid} " + f"(cosine={top_cosine:.4f}, gap={cosine_gap:.4f})" ), ) - # No confidence: skip SERVICE entirely → CONTEXT/RAG + # Medium confidence: above min threshold but ambiguous logger.info( - f"No service match 
(top_score={top_score:.6f} below threshold " - f"{HYBRID_SEARCH_MIN_THRESHOLD}) - routing to CONTEXT/RAG" + f"AMBIGUOUS match: {top_service_name_hybrid} " + f"(cosine={top_cosine:.4f}, gap={cosine_gap:.4f}) - needs LLM confirmation" ) return ClassificationResult( - workflow=WorkflowType.CONTEXT, - confidence=1.0, - metadata={"reason": "below_threshold", "top_score": top_score}, - reasoning=f"Top score {top_score:.6f} below threshold - skip to Context/RAG", + workflow=WorkflowType.SERVICE, + confidence=0.5, + metadata={ + "matched_service_id": top_service_id, + "matched_service_name": top_service_name_hybrid, + "cosine_score": top_cosine, + "cosine_gap": cosine_gap, + "needs_llm_confirmation": True, + "top_results": hybrid_results[:3], + }, + reasoning=( + f"Ambiguous match: {top_service_name_hybrid} " + f"(cosine={top_cosine:.4f}) - LLM confirmation needed" + ), ) except Exception as e: logger.error(f"Hybrid classification failed: {e}", exc_info=True) - # Fallback: route to CONTEXT/RAG on any error return ClassificationResult( workflow=WorkflowType.CONTEXT, confidence=1.0, @@ -269,6 +292,92 @@ def _get_query_embedding(self, query: str) -> Optional[List[float]]: logger.error(f"Failed to generate query embedding: {e}") return None + async def _dense_search( + self, + dense_vector: List[float], + top_k: int = DENSE_SEARCH_TOP_K, + ) -> List[Dict[str, Any]]: + """Execute dense-only search on Qdrant to get actual cosine similarity scores. + + This is used as a pre-filter: the cosine scores tell us HOW RELEVANT + the top results actually are, unlike RRF scores which are purely rank-based. 
+ + Args: + dense_vector: Dense embedding vector (3072-dim) + top_k: Number of results to return + + Returns: + List of result dicts with service metadata and cosine_score, + deduplicated by service_id (best score per service) + """ + try: + search_payload = { + "query": dense_vector, + "using": "dense", + "limit": top_k * 2, # Get more to allow dedup by service + "with_payload": True, + } + + response = await self._qdrant_client.post( + f"/collections/{QDRANT_COLLECTION}/points/query", + json=search_payload, + ) + + if response.status_code != 200: + logger.error( + f"Qdrant dense search failed: HTTP {response.status_code} - " + f"{response.text}" + ) + return [] + + search_results = response.json() + points = search_results.get("result", {}).get("points", []) + + if not points: + logger.info("No results from dense search") + return [] + + # Deduplicate by service_id (keep best cosine score per service) + service_results: Dict[str, Dict[str, Any]] = {} + for point in points: + payload = point.get("payload", {}) + score = float(point.get("score", 0)) + service_id = payload.get("service_id", "unknown") + + if service_id not in service_results or score > service_results[service_id].get("cosine_score", 0): + service_results[service_id] = { + "service_id": service_id, + "name": payload.get("name", ""), + "description": payload.get("description", ""), + "examples": payload.get("examples", []), + "entities": payload.get("entities", []), + "context": payload.get("context", ""), + "point_type": payload.get("point_type", "unknown"), + "example_text": payload.get("example_text"), + "cosine_score": score, + } + + # Sort by cosine score descending + sorted_results = sorted( + service_results.values(), + key=lambda x: x["cosine_score"], + reverse=True, + ) + + logger.info( + f"Dense search found {len(sorted_results)} unique services " + f"(top cosine: {sorted_results[0]['cosine_score']:.4f})" + ) + + return sorted_results + + except httpx.TimeoutException: + 
logger.error(f"Qdrant dense search timeout after {QDRANT_TIMEOUT}s") + return [] + except Exception as e: + logger.error(f"Dense search failed: {e}", exc_info=True) + return [] + async def _hybrid_search( self, dense_vector: List[float], @@ -535,7 +644,6 @@ async def _execute_with_fallback_async( ) # Get the layer order starting from current layer - from tool_classifier.enums import WORKFLOW_LAYER_ORDER current_index = WORKFLOW_LAYER_ORDER.index(start_layer) remaining_layers = WORKFLOW_LAYER_ORDER[current_index + 1 :] @@ -557,7 +665,6 @@ async def _execute_with_fallback_async( return result logger.info(f"[{chat_id}] {next_name} returned None, continuing...") - current_index += 1 # This should never happen since RAG/OOD should always return result raise RuntimeError("All workflows returned None (unexpected)") @@ -617,7 +724,6 @@ async def _execute_with_fallback_streaming( ) # Get the layer order starting from current layer - from tool_classifier.enums import WORKFLOW_LAYER_ORDER current_index = WORKFLOW_LAYER_ORDER.index(start_layer) remaining_layers = WORKFLOW_LAYER_ORDER[current_index + 1 :] @@ -642,7 +748,6 @@ async def _execute_with_fallback_streaming( return logger.info(f"[{chat_id}] {next_name} returned None, continuing...") - current_index += 1 # This should never happen raise RuntimeError("All workflows returned None in streaming (unexpected)") diff --git a/src/tool_classifier/constants.py b/src/tool_classifier/constants.py index 7db6aa9..9c4adf8 100644 --- a/src/tool_classifier/constants.py +++ b/src/tool_classifier/constants.py @@ -65,15 +65,21 @@ # ============================================================================ HYBRID_SEARCH_TOP_K = 5 -"""Number of top results from hybrid search for classification.""" +"""Number of top results from hybrid search for service identification.""" -HYBRID_SEARCH_MIN_THRESHOLD = 0.01 -"""Minimum RRF score to consider a result as a potential match.""" +DENSE_SEARCH_TOP_K = 3 +"""Number of top results from dense-only 
search for relevance scoring.""" -SCORE_RATIO_THRESHOLD = 2.0 -"""Score ratio (top/second) for confident service classification. -If the top result's RRF score is > 2x the second result, it's a high-confidence match.""" +DENSE_MIN_THRESHOLD = 0.20 +"""Minimum dense cosine similarity to consider a result as a potential match. +Below this → skip SERVICE entirely, go to CONTEXT/RAG. +Note: Multilingual embeddings (Estonian/short queries) typically yield +lower cosine scores (0.25-0.40) than English. Tune based on observed scores.""" -SCORE_GAP_THRESHOLD = 0.005 -"""Absolute score gap for confident classification. -Prevents false positives when both scores are very low.""" +DENSE_HIGH_CONFIDENCE_THRESHOLD = 0.40 +"""Dense cosine similarity for high-confidence service classification. +Above this AND score gap is large → SERVICE without LLM confirmation.""" + +DENSE_SCORE_GAP_THRESHOLD = 0.05 +"""Cosine score gap (top - second) for high-confidence classification. +Ensures the top result is significantly better than the runner-up.""" From bee9fbfba8e0ce9bfaf62788159dfe8deb451b55 Mon Sep 17 00:00:00 2001 From: nuwangeek Date: Mon, 2 Mar 2026 21:57:15 +0530 Subject: [PATCH 4/4] fixed issue --- src/tool_classifier/sparse_encoder.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/tool_classifier/sparse_encoder.py b/src/tool_classifier/sparse_encoder.py index 0d0dc3f..06f38a8 100644 --- a/src/tool_classifier/sparse_encoder.py +++ b/src/tool_classifier/sparse_encoder.py @@ -8,6 +8,7 @@ Uses hash-based indexing compatible with Qdrant's sparse vector format. 
""" +import hashlib import re from collections import Counter from dataclasses import dataclass, field @@ -68,10 +69,12 @@ def compute_sparse_vector(text: str) -> SparseVector: token_counts = Counter(tokens) # Hash-based indexing: map each token to an index in [0, SPARSE_VOCAB_SIZE) - # Collisions are handled by summing values at the same index + # Uses MD5 (first 4 bytes) for deterministic cross-process indices. + # Collisions are handled by summing values at the same index. hash_counts: dict[int, float] = {} for token, count in token_counts.items(): - idx = hash(token) % SPARSE_VOCAB_SIZE + digest = hashlib.md5(token.encode(), usedforsecurity=False).digest() # noqa: S324 + idx = int.from_bytes(digest[:4], "little") % SPARSE_VOCAB_SIZE # Handle hash collisions by accumulating hash_counts[idx] = hash_counts.get(idx, 0) + float(count)