Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 35 additions & 27 deletions tests/services/corpus/test_filter_attributes_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,31 @@ def test_text_integer_boolean_filters(self, client, unique_id):
description="both documents indexed",
)

text_query = client.post(
"/v2/query",
data={
"query": "research and data",
"search": {
"corpora": [{"corpus_key": corpus_key, "metadata_filter": "part.category = 'tech'"}],
"limit": 10,
},
},
def _filter_query_returns_results(metadata_filter):
    """Build a zero-argument predicate that runs one metadata-filtered query.

    ``client`` and ``corpus_key`` are taken from the enclosing test scope.

    Args:
        metadata_filter: Filter expression sent as the corpus entry's
            ``metadata_filter`` in the ``/v2/query`` request body.

    Returns:
        A callable that issues the query once and returns the response when
        it succeeded with a non-empty ``search_results`` list, or ``None``
        otherwise.
    """

    def _check():
        resp = client.post(
            "/v2/query",
            data={
                "query": "research and data",
                "search": {
                    "corpora": [{"corpus_key": corpus_key, "metadata_filter": metadata_filter}],
                    "limit": 10,
                },
            },
        )
        # A failed request and an empty result set are treated the same way:
        # both report "not ready" by returning None.
        # NOTE(review): presumably wait_for keeps polling on None — confirm.
        if not resp.success:
            return None
        if not resp.data.get("search_results"):
            return None
        return resp

    return _check

text_query = wait_for(
_filter_query_returns_results("part.category = 'tech'"),
timeout=30,
interval=2,
description="text filter query to return results",
)
assert text_query.success, f"Text filter query failed: {text_query.status_code}"
text_results = text_query.data.get("search_results", [])
Expand All @@ -77,15 +93,11 @@ def test_text_integer_boolean_filters(self, client, unique_id):
"quantum" in r.get("text", "").lower() for r in text_results
), f"Text filter for 'tech' should only return tech doc: {[r.get('text', '')[:50] for r in text_results]}"

int_query = client.post(
"/v2/query",
data={
"query": "research and data",
"search": {
"corpora": [{"corpus_key": corpus_key, "metadata_filter": "part.priority >= 3"}],
"limit": 10,
},
},
int_query = wait_for(
_filter_query_returns_results("part.priority >= 3"),
timeout=30,
interval=2,
description="integer filter query to return results",
)
assert int_query.success, f"Integer filter query failed: {int_query.status_code}"
int_results = int_query.data.get("search_results", [])
Expand All @@ -94,15 +106,11 @@ def test_text_integer_boolean_filters(self, client, unique_id):
"climate" in r.get("text", "").lower() for r in int_results
), f"Integer filter >= 3 should only return science doc: {[r.get('text', '')[:50] for r in int_results]}"

bool_query = client.post(
"/v2/query",
data={
"query": "research and data",
"search": {
"corpora": [{"corpus_key": corpus_key, "metadata_filter": "part.is_public = true"}],
"limit": 10,
},
},
bool_query = wait_for(
_filter_query_returns_results("part.is_public = true"),
timeout=30,
interval=2,
description="boolean filter query to return results",
)
assert bool_query.success, f"Boolean filter query failed: {bool_query.status_code}"
bool_results = bool_query.data.get("search_results", [])
Expand Down
16 changes: 15 additions & 1 deletion tests/services/indexing/test_document_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,21 @@ def test_index_query_delete_query_cycle(self, client, test_corpus, unique_id):
description="document to be indexed",
)

query_resp = client.query(test_corpus, "Krakatoa volcano eruption", limit=10)
def _krakatoa_in_results():
    """Run the Krakatoa query once and report whether the document is searchable.

    ``client`` and ``test_corpus`` are taken from the enclosing test scope.

    Returns:
        The query response when it succeeded and at least one search result's
        text contains "krakatoa" (case-insensitive); ``None`` otherwise.
    """
    qr = client.query(test_corpus, "Krakatoa volcano eruption", limit=10)
    if not qr.success:
        return None
    hits = qr.data.get("search_results", [])
    # Any result mentioning the word is enough; ranking is not checked here.
    if any("krakatoa" in r.get("text", "").lower() for r in hits):
        return qr
    return None

query_resp = wait_for(
_krakatoa_in_results,
timeout=30,
interval=2,
description="Krakatoa to appear in search",
)
assert query_resp.success, f"Query failed: {query_resp.status_code}"
results = query_resp.data.get("search_results", [])
found = any("krakatoa" in r.get("text", "").lower() for r in results)
Expand Down
29 changes: 21 additions & 8 deletions tests/services/query/test_query_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,28 @@ def test_query_with_valid_metadata_filter(self, client, unique_id):
description="document to be indexed",
)

query_resp = client.post(
"/v2/query",
data={
"query": "artificial intelligence",
"search": {
"corpora": [{"corpus_key": corpus_key, "metadata_filter": "part.topic = 'ai'"}],
"limit": 10,
def _query_returns_results():
    """Run the ``part.topic = 'ai'`` filtered query once.

    ``client`` and ``corpus_key`` are taken from the enclosing test scope.

    Returns:
        The response when the request succeeded and ``search_results`` is
        non-empty; ``None`` otherwise.
    """
    resp = client.post(
        "/v2/query",
        data={
            "query": "artificial intelligence",
            "search": {
                "corpora": [{"corpus_key": corpus_key, "metadata_filter": "part.topic = 'ai'"}],
                "limit": 10,
            },
        },
    )
    # A failed request and an empty result set both report "not ready".
    # NOTE(review): presumably wait_for keeps polling on None — confirm.
    if not resp.success:
        return None
    if not resp.data.get("search_results"):
        return None
    return resp

query_resp = wait_for(
_query_returns_results,
timeout=30,
interval=2,
description="filter query to return results",
)
assert query_resp.success, f"Query failed: {query_resp.status_code} - {query_resp.data}"
results = query_resp.data.get("search_results", [])
Expand Down
75 changes: 64 additions & 11 deletions utils/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import logging
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Optional
Expand All @@ -20,6 +21,39 @@

from .config import Config

# HTTP methods that are safe to auto-retry on 5xx/429. POST/PUT/PATCH/DELETE
# are excluded because urllib3 cannot tell whether a retried mutation already
# committed server-side, and a retried POST that comes back as 409 is
# indistinguishable from a "real" duplicate-key bug. See the PR that fixed the
# flaky agent-creation tests, where a transient 5xx on the first attempt left
# the resource created and the retried POST returned 409 "already exists".
RETRY_SAFE_METHODS = frozenset({"GET", "HEAD", "OPTIONS"})


def _extract_retry_history(response: requests.Response) -> list:
    """Return urllib3's per-attempt retry history as a list of plain dicts.

    The history is read from ``response.raw.retries.history``. Every hop is
    looked up with ``getattr(..., None)``, so a response missing any of those
    attributes yields an empty list instead of raising.

    Each history entry is a RequestHistory namedtuple from urllib3 with the
    fields method/url/error/status/redirect_location; entries are rendered as
    plain dicts so failed-test logs and APIResponse consumers don't need
    urllib3 imports.

    Args:
        response: The final response object returned by ``requests``.

    Returns:
        One dict per recorded attempt with the keys ``method``, ``url``,
        ``status``, ``error`` and ``redirect_location``. ``error`` is the
        stringified error, or ``None`` when the entry carries none. An empty
        list means no retry history was recorded for this response.
    """
    # getattr(None, name, None) is simply None, so the chain needs no
    # explicit "is not None" checks between hops.
    raw = getattr(response, "raw", None)
    retries = getattr(raw, "retries", None)
    history = getattr(retries, "history", None)
    if not history:
        return []

    return [
        {
            "method": getattr(item, "method", None),
            "url": getattr(item, "url", None),
            "status": getattr(item, "status", None),
            # Stringify so the entry stays printable/serialisable in logs.
            "error": (
                str(getattr(item, "error", None))
                if getattr(item, "error", None) is not None
                else None
            ),
            "redirect_location": getattr(item, "redirect_location", None),
        }
        for item in history
    ]


@dataclass
class APIResponse:
Expand All @@ -30,6 +64,8 @@ class APIResponse:
elapsed_ms: float
headers: dict = field(default_factory=dict)
error: Optional[str] = None
request_id: Optional[str] = None
retry_history: list = field(default_factory=list)

@property
def success(self) -> bool:
Expand All @@ -52,12 +88,11 @@ def session(self) -> requests.Session:
if self._session is None:
self._session = requests.Session()

# Configure retry strategy
retry_strategy = Retry(
total=self.config.max_retries,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
allowed_methods=RETRY_SAFE_METHODS,
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self._session.mount("https://", adapter)
Expand Down Expand Up @@ -102,9 +137,10 @@ def _request(
APIResponse with status, data, and timing
"""
url = self._build_url(endpoint)
request_headers = {**(headers or {})}
request_id = uuid.uuid4().hex
request_headers = {"X-Request-Id": request_id, **(headers or {})}

self.logger.debug(f"{method} {url}")
self.logger.debug(f"{method} {url} request_id={request_id}")

start_time = time.time()

Expand All @@ -120,49 +156,65 @@ def _request(

elapsed_ms = (time.time() - start_time) * 1000

# Try to parse JSON response
try:
response_data = response.json()
except ValueError:
response_data = response.text

self.logger.debug(f"Response: {response.status_code} ({elapsed_ms:.1f}ms)")
retry_history = _extract_retry_history(response)
if retry_history:
self.logger.warning(
"Request retried %d time(s): method=%s url=%s request_id=%s " "final_status=%s history=%s",
len(retry_history),
method,
url,
request_id,
response.status_code,
retry_history,
)

self.logger.debug(f"Response: {response.status_code} ({elapsed_ms:.1f}ms) request_id={request_id}")

return APIResponse(
status_code=response.status_code,
data=response_data,
elapsed_ms=elapsed_ms,
headers=dict(response.headers),
request_id=request_id,
retry_history=retry_history,
)

except requests.exceptions.Timeout:
elapsed_ms = (time.time() - start_time) * 1000
self.logger.error(f"Request timeout after {elapsed_ms:.1f}ms")
self.logger.error(f"Request timeout after {elapsed_ms:.1f}ms request_id={request_id}")
return APIResponse(
status_code=408,
data=None,
elapsed_ms=elapsed_ms,
error="Request timeout",
request_id=request_id,
)

except requests.exceptions.ConnectionError as e:
elapsed_ms = (time.time() - start_time) * 1000
self.logger.error(f"Connection error: {e}")
self.logger.error(f"Connection error: {e} request_id={request_id}")
return APIResponse(
status_code=0,
data=None,
elapsed_ms=elapsed_ms,
error=f"Connection error: {str(e)}",
request_id=request_id,
)

except Exception as e:
elapsed_ms = (time.time() - start_time) * 1000
self.logger.error(f"Unexpected error: {e}")
self.logger.error(f"Unexpected error: {e} request_id={request_id}")
return APIResponse(
status_code=0,
data=None,
elapsed_ms=elapsed_ms,
error=f"Unexpected error: {str(e)}",
request_id=request_id,
)

def _request_raw(
Expand Down Expand Up @@ -198,9 +250,10 @@ def _request_raw(
The raw :class:`requests.Response` object.
"""
url = self._build_url(endpoint)
request_headers = {**(headers or {})}
request_id = uuid.uuid4().hex
request_headers = {"X-Request-Id": request_id, **(headers or {})}

self.logger.debug(f"{method} {url}")
self.logger.debug(f"{method} {url} request_id={request_id}")

kwargs: dict = {
"method": method,
Expand Down
Loading