From fc3873175742bd33b7f706cf9595bb5775819623 Mon Sep 17 00:00:00 2001 From: Jorge Rivera Date: Thu, 9 Apr 2026 16:34:19 -0600 Subject: [PATCH 1/3] Autodiscover dataflow --- CHANGELOG.md | 4 + docs/docs/advanced.md | 36 +- docs/docs/caching.md | 12 + docs/docs/index.md | 3 +- docs/docs/why-oda-reader.md | 2 +- pyproject.toml | 2 +- src/oda_reader/__init__.py | 3 + src/oda_reader/_http_primitives.py | 145 ++++++++ src/oda_reader/common.py | 255 ++++++-------- src/oda_reader/crs.py | 1 + src/oda_reader/dac1.py | 1 + src/oda_reader/dac2a.py | 1 + src/oda_reader/download/download_tools.py | 125 +++++-- src/oda_reader/download/version_discovery.py | 163 +++++++++ src/oda_reader/multisystem.py | 1 + tests/common/unit/test_cache.py | 10 +- tests/conftest.py | 6 +- tests/download/unit/test_download_tools.py | 251 ++++++++++++- tests/download/unit/test_version_discovery.py | 330 ++++++++++++++++++ tests/download/unit/test_version_fallback.py | 64 +++- uv.lock | 2 +- 21 files changed, 1181 insertions(+), 236 deletions(-) create mode 100644 src/oda_reader/_http_primitives.py create mode 100644 src/oda_reader/download/version_discovery.py create mode 100644 tests/download/unit/test_version_discovery.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 65e8c07..f336471 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog for oda_reader +## 1.5.0 (2026-04-09) +- Replaces blind version-decrement fallback with authoritative SDMX metadata endpoint lookup for all datasets. +- Adds `clear_version_cache()` to the public API for forcing fresh version discovery mid-session. + ## 1.4.3 (2026-04-09) - Fixes DAC1 query filter dimension order to match the current DSD schema, which added SECTOR at position 2. - Bumps DAC1 dataflow version from 1.7 to 1.8. 
diff --git a/docs/docs/advanced.md b/docs/docs/advanced.md index 0f5287f..3dc80cc 100644 --- a/docs/docs/advanced.md +++ b/docs/docs/advanced.md @@ -14,12 +14,13 @@ qb = QueryBuilder() # Build a custom DAC1 filter filter_string = qb.build_dac1_filter( donor="USA", + sector="_Z", measure="1010", flow_type="1140" ) print(filter_string) -# Output: "USA..1010..1140.." +# Output: "USA._Z.1010..1140.." ``` This filter string can be used to manually construct API URLs. @@ -39,50 +40,43 @@ Each method returns a filter string suitable for the SDMX API. ## Dataflow Version Handling -OECD occasionally changes dataflow versions (schema updates). ODA Reader handles this automatically with version fallback. +OECD occasionally changes dataflow versions (schema updates). ODA Reader handles this automatically with version discovery. -### Automatic Fallback +### Automatic Version Discovery -When a dataflow version returns 404 (not found), ODA Reader automatically: +When a dataflow version returns an error (not found), ODA Reader automatically queries the OECD's SDMX metadata endpoint to discover the latest published version and retries with it: -1. Tries the configured version (e.g., `1.5`) -2. If 404, retries with `1.4` -3. Continues decrementing: `1.3`, `1.2`, `1.1` -4. Returns data from first successful version (up to 5 attempts) +1. Tries the configured default version +2. If not found, queries the metadata endpoint for the authoritative latest version +3. Retries once with the discovered version -This means your code keeps working even when OECD makes breaking schema changes. +This means your code keeps working even when OECD updates schema versions. You'll see a log message indicating which version was discovered. -**Example**: +### Clearing the Version Cache + +Discovered versions are cached in-process so repeated queries don't trigger extra metadata lookups. 
If the OECD publishes a new dataflow version mid-session, you can force re-discovery: ```python -from oda_reader import download_dac1 +from oda_reader import clear_version_cache -# ODA Reader will automatically try: -# 1.5 -> 404 -# 1.4 -> 404 -# 1.3 -> Success! Returns data with version 1.3 -data = download_dac1(start_year=2022, end_year=2022) +clear_version_cache() ``` -You'll see a message indicating which version succeeded. - ### Manual Version Override You can specify an exact dataflow version: ```python -# Force use of version 1.3 data = download_dac1( start_year=2022, end_year=2022, - dataflow_version="1.3" + dataflow_version="1.7" ) ``` **When to override**: - You know the correct version for reproducibility - Debugging version-specific issues -- Avoiding automatic fallback (for performance) **Available for**: - `download_dac1(dataflow_version=...)` diff --git a/docs/docs/caching.md b/docs/docs/caching.md index a504748..bfa04c3 100644 --- a/docs/docs/caching.md +++ b/docs/docs/caching.md @@ -141,6 +141,18 @@ disable_http_cache() enable_http_cache() ``` +### Clear Version Discovery Cache + +ODA Reader caches discovered dataflow versions in-process. If the OECD publishes a new version mid-session: + +```python +from oda_reader import clear_version_cache + +clear_version_cache() +``` + +This forces a fresh metadata lookup on the next query. + ### Clear HTTP Cache Only ```python diff --git a/docs/docs/index.md b/docs/docs/index.md index ccab9b3..efe88a5 100644 --- a/docs/docs/index.md +++ b/docs/docs/index.md @@ -15,8 +15,7 @@ ODA Reader eliminates these headaches. 
It provides a unified Python interface th - **Bulk download large files** with memory-efficient streaming for the full CRS (1GB+) - **Automatic rate limiting** and caching to work within API constraints - **Schema translation** between Data Explorer API and OECD.Stat formats -- **Version fallback** automatically searches for the most recent schema version since they -can unexpectedly change with new data releases. +- **Automatic version discovery** queries OECD's SDMX metadata to find the latest dataflow version, handling schema changes transparently. **Built for researchers, analysts, and developers** who need reliable, programmatic access to ODA data without fighting infrastructure. diff --git a/docs/docs/why-oda-reader.md b/docs/docs/why-oda-reader.md index 5b6346f..81b6e1e 100644 --- a/docs/docs/why-oda-reader.md +++ b/docs/docs/why-oda-reader.md @@ -26,7 +26,7 @@ The OECD Development Assistance Committee publishes comprehensive data on offici - Schema changes break queries without warning - No automatic retries or fallback mechanisms -**What ODA Reader provides**: Automatic version fallback, rate limiting built-in, consistent function interface, handles schema changes gracefully. +**What ODA Reader provides**: Automatic version discovery, rate limiting built-in, consistent function interface, handles schema changes gracefully. 
### Manual Downloads from OECD.Stat diff --git a/pyproject.toml b/pyproject.toml index fb3c2db..cc5e8b5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "oda_reader" -version = "1.4.3" +version = "1.5.0" description = "A simple package to import ODA data from the OECD's API and AidData's database" readme = "README.md" license = "MIT" diff --git a/src/oda_reader/__init__.py b/src/oda_reader/__init__.py index 45f33af..55364ce 100644 --- a/src/oda_reader/__init__.py +++ b/src/oda_reader/__init__.py @@ -26,6 +26,7 @@ enable_http_cache, get_http_cache_info, ) +from oda_reader.download.version_discovery import clear_version_cache from oda_reader.crs import bulk_download_crs, download_crs, download_crs_file from oda_reader.dac1 import download_dac1 from oda_reader.dac2a import bulk_download_dac2a, download_dac2a @@ -55,6 +56,8 @@ "disable_http_cache", "clear_http_cache", "get_http_cache_info", + # Version discovery cache + "clear_version_cache", # DataFrame and bulk cache managers "dataframe_cache", "bulk_cache_manager", diff --git a/src/oda_reader/_http_primitives.py b/src/oda_reader/_http_primitives.py new file mode 100644 index 0000000..e2bca73 --- /dev/null +++ b/src/oda_reader/_http_primitives.py @@ -0,0 +1,145 @@ +"""Shared HTTP primitives used by both common.py and version_discovery.py. + +This module exists to break the circular import that would arise if +version_discovery.py imported from common.py while common.py imports +discover_latest_version from version_discovery.py. + +This module must not import from common.py or version_discovery.py to avoid +circular imports. 
+""" + +import logging +import time +from collections import deque + +import requests +import requests_cache + +from oda_reader._cache.config import get_http_cache_path + +logger = logging.getLogger("oda_importer") + +# --------------------------------------------------------------------------- +# Rate limiter +# --------------------------------------------------------------------------- + + +class RateLimiter: + """Simple blocking rate limiter. + + Parameters correspond to the maximum number of calls allowed within + ``period`` seconds. ``wait`` pauses execution when the limit has been + reached. + """ + + def __init__(self, max_calls: int = 20, period: float = 60.0) -> None: + self.max_calls = max_calls + self.period = period + self._calls: deque[float] = deque() + + def wait(self) -> None: + """Block until a new call is allowed.""" + now = time.monotonic() + while self._calls and now - self._calls[0] >= self.period: + self._calls.popleft() + if len(self._calls) >= self.max_calls: + sleep_for = self.period - (now - self._calls[0]) + time.sleep(max(sleep_for, 0)) + self._calls.popleft() + self._calls.append(time.monotonic()) + + +API_RATE_LIMITER = RateLimiter() + +# --------------------------------------------------------------------------- +# HTTP cache session +# --------------------------------------------------------------------------- + +# Global HTTP cache session (initialized lazily) +_HTTP_SESSION: requests_cache.CachedSession | None = None +_CACHE_ENABLED = True + + +def _get_http_session() -> requests_cache.CachedSession: + """Get or create the global HTTP cache session. + + All responses are cached for 7 days (604800 seconds). + Uses filesystem backend to handle large responses (>2GB). + + Returns: + CachedSession: requests-cache session with 7-day expiration. 
+ """ + global _HTTP_SESSION + + if _HTTP_SESSION is None: + cache_path = str(get_http_cache_path()) + + _HTTP_SESSION = requests_cache.CachedSession( + cache_name=cache_path, + backend="filesystem", + expire_after=604800, # 7 days + allowable_codes=(200, 404), # Cache 404s for version fallback + stale_if_error=True, # Use stale cache if API errors + ) + + return _HTTP_SESSION + + +def get_response_text(url: str, headers: dict) -> tuple[int, str, bool]: + """GET request returning status code, text content, and cache hit status. + + This call is subject to the global rate limiter and HTTP caching. + + Args: + url: The URL to fetch. + headers: Headers to include in the request. + + Returns: + tuple[int, str, bool]: Status code, text content, and whether from cache. + """ + API_RATE_LIMITER.wait() + + if _CACHE_ENABLED: + session = _get_http_session() + response = session.get(url, headers=headers) + from_cache = getattr(response, "from_cache", False) + if from_cache: + logger.info(f"Loading data from HTTP cache: {url}") + else: + logger.info(f"Fetching data from API: {url}") + else: + response = requests.get(url, headers=headers) + from_cache = False + logger.info(f"Fetching data from API (cache disabled): {url}") + + return response.status_code, response.text, from_cache + + +def get_response_content(url: str, headers: dict) -> tuple[int, bytes, bool]: + """GET request returning status code, raw content, and cache hit status. + + This call is subject to the global rate limiter and HTTP caching. + + Args: + url: The URL to fetch. + headers: Headers to include in the request. + + Returns: + tuple[int, bytes, bool]: Status code, content bytes, and whether from cache. 
+ """ + API_RATE_LIMITER.wait() + + if _CACHE_ENABLED: + session = _get_http_session() + response = session.get(url, headers=headers) + from_cache = getattr(response, "from_cache", False) + if from_cache: + logger.info(f"Loading data from HTTP cache: {url}") + else: + logger.info(f"Fetching data from API: {url}") + else: + response = requests.get(url, headers=headers) + from_cache = False + logger.info(f"Fetching data from API (cache disabled): {url}") + + return response.status_code, response.content, from_cache diff --git a/src/oda_reader/common.py b/src/oda_reader/common.py index a16813c..e3a4e59 100644 --- a/src/oda_reader/common.py +++ b/src/oda_reader/common.py @@ -1,26 +1,30 @@ import logging import re -import time -from collections import deque from copy import deepcopy from io import StringIO from pathlib import Path import pandas as pd -import requests -import requests_cache -from oda_reader._cache.config import get_http_cache_path +import oda_reader._http_primitives as _http_primitives +from oda_reader._http_primitives import ( + API_RATE_LIMITER, + RateLimiter, + _get_http_session, + get_response_content as _get_response_content, + get_response_text as _get_response_text, +) +from oda_reader.download.version_discovery import ( + discover_latest_version, + get_dimension_count, +) logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") logger = logging.getLogger("oda_importer") -FALLBACK_STEP = 0.1 -MAX_RETRIES = 5 - # Error message patterns that indicate a dataflow/version not found error -# These should trigger a version fallback retry +# These should trigger a version discovery retry DATAFLOW_NOT_FOUND_PATTERNS = ( "Could not find Dataflow", "Could not find DSD", @@ -32,7 +36,7 @@ def _is_dataflow_not_found_error(response: str) -> bool: """Check if the response indicates a dataflow/version not found error. 
- These errors should trigger a version fallback retry rather than + These errors should trigger a version discovery retry rather than being treated as valid data. Args: @@ -54,36 +58,6 @@ def _is_dataflow_not_found_error(response: str) -> bool: return is_too_short and looks_like_error -# Global HTTP cache session (initialized lazily) -_HTTP_SESSION: requests_cache.CachedSession | None = None -_CACHE_ENABLED = True - - -def _get_http_session() -> requests_cache.CachedSession: - """Get or create the global HTTP cache session. - - All responses are cached for 7 days (604800 seconds). - Uses filesystem backend to handle large responses (>2GB). - - Returns: - CachedSession: requests-cache session with 7-day expiration. - """ - global _HTTP_SESSION - - if _HTTP_SESSION is None: - cache_path = str(get_http_cache_path()) - - _HTTP_SESSION = requests_cache.CachedSession( - cache_name=cache_path, - backend="filesystem", - expire_after=604800, # 7 days - allowable_codes=(200, 404), # Cache 404s for version fallback - stale_if_error=True, # Use stale cache if API errors - ) - - return _HTTP_SESSION - - def enable_http_cache() -> None: """Enable HTTP response caching. @@ -91,10 +65,9 @@ def enable_http_cache() -> None: >>> from oda_reader import enable_http_cache >>> enable_http_cache() """ - global _CACHE_ENABLED - _CACHE_ENABLED = True - if _HTTP_SESSION is not None: - _HTTP_SESSION.cache.clear() # Clear any cached data from disabled session + _http_primitives._CACHE_ENABLED = True + if _http_primitives._HTTP_SESSION is not None: + _http_primitives._HTTP_SESSION.cache.clear() def disable_http_cache() -> None: @@ -104,8 +77,7 @@ def disable_http_cache() -> None: >>> from oda_reader import disable_http_cache >>> disable_http_cache() """ - global _CACHE_ENABLED - _CACHE_ENABLED = False + _http_primitives._CACHE_ENABLED = False def clear_http_cache() -> None: @@ -138,32 +110,9 @@ def get_http_cache_info() -> dict: } -class RateLimiter: - """Simple blocking rate limiter. 
- - Parameters correspond to the maximum number of calls allowed within - ``period`` seconds. ``wait`` pauses execution when the limit has been - reached. - """ - - def __init__(self, max_calls: int = 20, period: float = 60.0) -> None: - self.max_calls = max_calls - self.period = period - self._calls: deque[float] = deque() - - def wait(self) -> None: - """Block until a new call is allowed.""" - now = time.monotonic() - while self._calls and now - self._calls[0] >= self.period: - self._calls.popleft() - if len(self._calls) >= self.max_calls: - sleep_for = self.period - (now - self._calls[0]) - time.sleep(max(sleep_for, 0)) - self._calls.popleft() - self._calls.append(time.monotonic()) - - -API_RATE_LIMITER = RateLimiter() +# RateLimiter and API_RATE_LIMITER live in _http_primitives to avoid a +# circular import with version_discovery.py. Re-exported here for +# backward compatibility. class ImporterPaths: @@ -191,121 +140,129 @@ def text_to_stringio(response_text: str) -> StringIO: def _replace_dataflow_version(url: str, version: str) -> str: - """Replace the dataflow version in the URL.""" - pattern = r",(\d+\.\d+)/" + """Replace the dataflow version in the URL. - return re.sub(pattern, f",{version}/", url) + Handles both comma-separated (v1) and slash-separated (v2) patterns. + """ + # v1 pattern: OECD.DCD.FSD,DATAFLOW_ID,VERSION/ + if re.search(r",(\d+\.\d+)/", url): + return re.sub(r",(\d+\.\d+)/", f",{version}/", url) + # v2 pattern: OECD.DCD.FSD/DATAFLOW_ID/VERSION/ + return re.sub(r"/(\d+\.\d+)/", f"/{version}/", url) def _get_dataflow_version(url: str) -> str | None: """Get the dataflow version from the URL. + Handles both comma-separated (v1) and slash-separated (v2) patterns. + Returns: The version string if found, None otherwise. 
""" - pattern = r",(\d+\.\d+)/" - match = re.search(pattern, url) + # v1 pattern: ,VERSION/ + match = re.search(r",(\d+\.\d+)/", url) + if match: + return match.group(1) + # v2 pattern: /VERSION/ — must not be a port or protocol fragment + match = re.search(r"/(\d+\.\d+)/", url) return match.group(1) if match else None -def _get_response_text(url: str, headers: dict) -> tuple[int, str, bool]: - """GET request returning status code, text content, and cache hit status. - - This call is subject to the global rate limiter and HTTP caching. - - Args: - url: The URL to fetch. - headers: Headers to include in the request. - - Returns: - tuple[int, str, bool]: Status code, text content, and whether from cache. - """ - API_RATE_LIMITER.wait() - - if _CACHE_ENABLED: - session = _get_http_session() - response = session.get(url, headers=headers) - from_cache = getattr(response, "from_cache", False) - if from_cache: - logger.info(f"Loading data from HTTP cache: {url}") - else: - logger.info(f"Fetching data from API: {url}") - else: - response = requests.get(url, headers=headers) - from_cache = False - logger.info(f"Fetching data from API (cache disabled): {url}") +def _extract_dataflow_id(url: str) -> str | None: + """Extract the dataflow ID from an SDMX data URL. - return response.status_code, response.text, from_cache + Handles both URL patterns used by the OECD SDMX API: - -def _get_response_content(url: str, headers: dict) -> tuple[int, bytes, bool]: - """GET request returning status code, content, and cache hit status. - - This call is subject to the global rate limiter and HTTP caching. + - v1 (comma-separated): ``…/OECD.DCD.FSD,DSD_DAC1@DF_DAC1,1.8/…`` + - v2 (slash-separated): ``…/OECD.DCD.FSD/DSD_DAC1@DF_DAC1/1.8/…`` Args: - url: The URL to fetch. - headers: Headers to include in the request. + url: A full SDMX data or dataflow URL string. Returns: - tuple[int, bytes, bool]: Status code, content, and whether from cache. + The dataflow identifier (e.g. 
``"DSD_DAC1@DF_DAC1"``) if found, + ``None`` if the URL does not match a recognised pattern. """ - API_RATE_LIMITER.wait() - - if _CACHE_ENABLED: - session = _get_http_session() - response = session.get(url, headers=headers) - from_cache = getattr(response, "from_cache", False) - if from_cache: - logger.info(f"Loading data from HTTP cache: {url}") - else: - logger.info(f"Fetching data from API: {url}") - else: - response = requests.get(url, headers=headers) - from_cache = False - logger.info(f"Fetching data from API (cache disabled): {url}") - - return response.status_code, response.content, from_cache + # v1: AGENCY,DATAFLOW_ID,VERSION + match = re.search(r"OECD\.DCD\.FSD,([^,/]+),\d+\.\d+", url) + if match: + return match.group(1) + # v2: AGENCY/DATAFLOW_ID/VERSION + match = re.search(r"OECD\.DCD\.FSD/([^/]+)/\d+\.\d+", url) + return match.group(1) if match else None -def get_data_from_api(url: str, compressed: bool = True, retries: int = 0) -> str: +def get_data_from_api(url: str, compressed: bool = True) -> str: """Download a CSV file from an API endpoint and return it as text. + If the initial request returns a "dataflow not found" error, the function + queries the SDMX metadata endpoint to discover the authoritative latest + version and retries once with that version. + Args: url: The URL of the API endpoint. compressed: Whether the data is fetched compressed. Strongly recommended. - retries: The number of retries to attempt. Returns: str: The response text from the API. + + Raises: + ConnectionError: If the request fails and version discovery cannot help, + or if the discovered version also fails. 
""" - # Set the headers with gzip encoding (if required) headers = {"Accept-Encoding": "gzip"} if compressed else {} - # Fetch the data with headers status_code, response, _ = _get_response_text(url, headers=headers) - # Check for dataflow not found errors - these should trigger version fallback - # This check happens regardless of status code since the API may return - # error messages with various status codes (404, 200, etc.) - # Only attempt fallback if the URL contains a dataflow version pattern + # If the response indicates a dataflow-not-found error and the URL contains + # a version, discover the latest version and retry. Before retrying we + # verify that the discovered DSD has the same number of key dimensions as + # the one this release was built for, so we never silently send filters to + # an incompatible schema. version = _get_dataflow_version(url) if _is_dataflow_not_found_error(response) and version is not None: - if retries < MAX_RETRIES: - new_version = str(round(float(version) - FALLBACK_STEP, 1)) - new_url = _replace_dataflow_version(url=url, version=new_version) + dataflow_id = _extract_dataflow_id(url) + if dataflow_id is not None: + discovered_version = discover_latest_version(dataflow_id) + if discovered_version == version: + raise ConnectionError( + f"Dataflow not found and discovered version '{discovered_version}' " + f"matches the attempted version. Response: {response[:200]}" + ) + + # Verify structural compatibility: the discovered DSD must have + # the same number of key dimensions as the version we expected. + try: + old_dims = get_dimension_count(dataflow_id, version) + new_dims = get_dimension_count(dataflow_id, discovered_version) + if old_dims != new_dims: + raise ConnectionError( + f"Discovered version {discovered_version} has " + f"{new_dims} key dimensions, but version {version} " + f"had {old_dims}. This is a breaking schema change. " + f"Please upgrade oda_reader." 
+ ) + except (ConnectionError, ValueError) as exc: + if "breaking schema change" in str(exc): + raise + logger.warning( + f"Could not verify DSD compatibility: {exc}. " + f"Proceeding with discovered version {discovered_version}." + ) + + new_url = _replace_dataflow_version(url=url, version=discovered_version) logger.info( - f"Dataflow not found, retrying with version {new_version} " - f"(attempt {retries + 1}/{MAX_RETRIES})" - ) - return get_data_from_api( - url=new_url, compressed=compressed, retries=retries + 1 - ) - else: - raise ConnectionError( - f"No data found after {MAX_RETRIES} version fallback attempts. " - f"Last response: {response[:200]}" + f"Dataflow not found at version {version}, retrying with " + f"discovered version {discovered_version}" ) + status_code, response, _ = _get_response_text(new_url, headers=headers) + if _is_dataflow_not_found_error(response) or status_code > 299: + raise ConnectionError( + f"Dataflow not found even after version discovery " + f"(tried version {discovered_version}). 
" + f"Response: {response[:200]}" + ) + return response if (status_code == 500) and (response.find("not set to") > 0): url = url.replace("public", "dcd-public") diff --git a/src/oda_reader/crs.py b/src/oda_reader/crs.py index f51c584..87a4e57 100644 --- a/src/oda_reader/crs.py +++ b/src/oda_reader/crs.py @@ -14,6 +14,7 @@ DATAFLOW_ID: str = "DSD_CRS@DF_CRS" DATAFLOW_ID_GE: str = "DSD_GREQ@DF_CRS_GREQ" +# Default version; actual version is discovered dynamically on fallback DATAFLOW_VERSION: str = "1.6" # CRS filter structure: diff --git a/src/oda_reader/dac1.py b/src/oda_reader/dac1.py index 4672301..808a2c7 100644 --- a/src/oda_reader/dac1.py +++ b/src/oda_reader/dac1.py @@ -4,6 +4,7 @@ from oda_reader.download.download_tools import download DATAFLOW_ID: str = "DSD_DAC1@DF_DAC1" +# Default version; actual version is discovered dynamically on fallback DATAFLOW_VERSION: str = "1.8" diff --git a/src/oda_reader/dac2a.py b/src/oda_reader/dac2a.py index da5d61c..f41fd67 100644 --- a/src/oda_reader/dac2a.py +++ b/src/oda_reader/dac2a.py @@ -13,6 +13,7 @@ ) DATAFLOW_ID: str = "DSD_DAC2@DF_DAC2A" +# Default version; actual version is discovered dynamically on fallback DATAFLOW_VERSION: str = "1.4" diff --git a/src/oda_reader/download/download_tools.py b/src/oda_reader/download/download_tools.py index c3b0130..f93e0c8 100644 --- a/src/oda_reader/download/download_tools.py +++ b/src/oda_reader/download/download_tools.py @@ -24,6 +24,7 @@ logger, ) from oda_reader.download.query_builder import QueryBuilder +from oda_reader.download.version_discovery import discover_latest_version from oda_reader.schemas.crs_translation import convert_crs_to_dotstat_codes from oda_reader.schemas.dac1_translation import convert_dac1_to_dotstat_codes from oda_reader.schemas.dac2_translation import convert_dac2a_to_dotstat_codes @@ -48,8 +49,6 @@ f"{AIDDATA_VERSION.replace('.', '_')}.zip" ) -FALLBACK_STEP = 0.1 -MAX_RETRIES = 5 def _detect_delimiter(file_obj, sample_size: int = 8192) -> str: @@ 
-626,55 +625,105 @@ def bulk_download_aiddata( return None +def _extract_dataflow_id_from_flow_url(flow_url: str) -> str | None: + """Extract the dataflow ID from a bulk-download flow URL. + + Flow URLs follow the pattern:: + + https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD// + + Args: + flow_url: A URL string such as + ``https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD/DSD_CRS@DF_CRS/``. + + Returns: + The dataflow identifier (e.g. ``"DSD_CRS@DF_CRS"``) if found, + ``None`` otherwise. + """ + match = re.search(r"OECD\.DCD\.FSD/([^/]+)/?$", flow_url) + return match.group(1) if match else None + + def get_bulk_file_id( - flow_url: str, search_string: str, latest_flow: float = 1.6, retries: int = 0 + flow_url: str, + search_string: str, + latest_flow: float | None = None, ) -> str: - """ - Retrieves the full bulk file ID from the OECD dataflow. + """Retrieve the full bulk file ID from the OECD dataflow. + + The version to query is determined as follows: + + 1. If *latest_flow* is provided, try that version first. + 2. If it fails (non-2xx or search string not found), call + :func:`~oda_reader.download.version_discovery.discover_latest_version` + to obtain the authoritative latest version and retry once. + 3. If *latest_flow* is ``None``, discover the version unconditionally + before making any request. Args: - flow_url (str): The URL of the dataflow to check. - search_string (str): The string to search for in the response content. - latest_flow (float): The latest version of the dataflow to check. - retries (int): The current number of retries (to avoid infinite recursion). + flow_url: The base URL of the dataflow (without a version suffix), + e.g. ``https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD/DSD_CRS@DF_CRS/``. + search_string: The string to search for in the response XML to locate + the bulk-download link. + latest_flow: An explicit starting version to try. If ``None`` the + version is discovered automatically. 
Returns: str: The ID of the bulk download file. Raises: - KeyError: If the bulk download file link could not be found. - RuntimeError: If the maximum number of retries is exceeded. + RuntimeError: If the bulk download file ID cannot be found even after + version discovery. """ - if retries > MAX_RETRIES: - raise RuntimeError(f"Maximum retries ({MAX_RETRIES}) exceeded.") - - if latest_flow == 1.0: - latest_flow = int(round(latest_flow, 0)) - headers = {"Accept-Encoding": "gzip"} - status, response, from_cache = _get_response_text( - f"{flow_url}{latest_flow}", headers=headers - ) - if status > 299: - return get_bulk_file_id( - flow_url=flow_url, - search_string=search_string, - latest_flow=round(latest_flow - FALLBACK_STEP, 1), - retries=retries + 1, - ) + dataflow_id = _extract_dataflow_id_from_flow_url(flow_url) - match = re.search(f"{re.escape(search_string)}(.*?) str | None: + """Attempt to retrieve the file ID for a given version. - if not match: - logger.info("The link to the bulk download file could not be found.") - return get_bulk_file_id( - flow_url=flow_url, - search_string=search_string, - latest_flow=round(latest_flow - FALLBACK_STEP, 1), - retries=retries + 1, + Returns the file ID on success, ``None`` if this version does not + contain the expected link. + """ + status, response, _ = _get_response_text( + f"{flow_url}{version}", headers=headers ) + if status > 299: + return None + found = re.search(f"{re.escape(search_string)}(.*?) version string +_version_cache: dict[str, str] = {} + + +def _build_metadata_url(dataflow_id: str) -> str: + """Construct the SDMX metadata URL for a given dataflow ID. + + Args: + dataflow_id: The SDMX dataflow identifier, e.g. ``DSD_DAC1@DF_DAC1``. + + Returns: + str: The full metadata URL. + """ + return f"{METADATA_BASE_URL}/{dataflow_id}/latest" + + +def _parse_version_from_xml(xml_text: str) -> str: + """Extract the version attribute from SDMX Dataflow XML. 
+ + Iterates over all elements looking for the one whose local name is + ``Dataflow`` and returns its ``version`` attribute. The search is + namespace-agnostic so it works regardless of the XML namespace prefix + used by the server. + + Args: + xml_text: Raw XML response text from the metadata endpoint. + + Returns: + str: The version string, e.g. ``"1.7"``. + + Raises: + ValueError: If no Dataflow element with a version attribute is found. + """ + root = ET.fromstring(xml_text) + for element in root.iter(): + local_name = element.tag.split("}")[-1] if "}" in element.tag else element.tag + if local_name == "Dataflow" and "version" in element.attrib: + return element.attrib["version"] + raise ValueError( + "No element found in SDMX metadata response." + ) + + +def discover_latest_version(dataflow_id: str) -> str: + """Query the OECD SDMX metadata endpoint to find the latest dataflow version. + + Results are cached in the module-level ``_version_cache`` dict so that + repeated calls for the same dataflow ID within a process session incur + only one network round-trip. + + The HTTP call uses the shared requests-cache session (7-day filesystem + cache) and the global rate limiter. + + Args: + dataflow_id: The SDMX dataflow identifier, e.g. ``DSD_DAC1@DF_DAC1``. + + Returns: + str: The latest version string, e.g. ``"1.7"``. + + Raises: + ConnectionError: If the metadata endpoint returns a non-2xx status. + ValueError: If the response XML does not contain a parseable version. 
+ """ + if dataflow_id in _version_cache: + return _version_cache[dataflow_id] + + url = _build_metadata_url(dataflow_id) + + status_code, text, _ = get_response_text(url, headers={}) + + if status_code > 299: + raise ConnectionError( + f"Metadata endpoint returned HTTP {status_code} for " + f"dataflow '{dataflow_id}': {text[:200]}" + ) + + version = _parse_version_from_xml(text) + _version_cache[dataflow_id] = version + logger.info(f"Discovered latest version for '{dataflow_id}': {version}") + return version + + +def get_dimension_count(dataflow_id: str, version: str) -> int: + """Fetch the DSD for a specific version and count key dimensions. + + This excludes the TimeDimension, counting only the dimensions that + form the positional filter key. + + Args: + dataflow_id: e.g. ``"DSD_DAC1@DF_DAC1"``. + version: e.g. ``"1.7"``. + + Returns: + int: Number of key dimensions. + + Raises: + ConnectionError: If the DSD endpoint is unreachable. + ValueError: If no dimensions are found in the response. + """ + # The DSD ID is the part before '@' in the dataflow ID + dsd_id = dataflow_id.split("@")[0] if "@" in dataflow_id else dataflow_id + url = f"{DSD_BASE_URL}/{dsd_id}/{version}" + + status_code, text, _ = get_response_text(url, headers={}) + + if status_code > 299: + raise ConnectionError( + f"DSD endpoint returned HTTP {status_code} for " + f"'{dsd_id}' version {version}: {text[:200]}" + ) + + root = ET.fromstring(text) + count = 0 + for element in root.iter(): + local_name = element.tag.split("}")[-1] if "}" in element.tag else element.tag + if local_name == "Dimension": + count += 1 + if count == 0: + raise ValueError( + f"No dimensions found in DSD '{dsd_id}' version {version}." + ) + return count + + +def clear_version_cache() -> None: + """Clear both the in-process version cache and any HTTP-cached metadata. + + Call this when you need to force a fresh metadata lookup, for example + after a new dataflow version has been published mid-session. 
+ + Example: + >>> from oda_reader import clear_version_cache + >>> clear_version_cache() + """ + # Evict HTTP-cached metadata responses so the next lookup hits the network. + session = _get_http_session() + for dataflow_id in _version_cache: + url = _build_metadata_url(dataflow_id) + session.cache.delete(urls=[url]) + + _version_cache.clear() + logger.info("Version discovery cache cleared.") diff --git a/src/oda_reader/multisystem.py b/src/oda_reader/multisystem.py index b53a163..cd1a730 100644 --- a/src/oda_reader/multisystem.py +++ b/src/oda_reader/multisystem.py @@ -13,6 +13,7 @@ ) DATAFLOW_ID: str = "DSD_MULTI@DF_MULTI" +# Default version; actual version is discovered dynamically on fallback DATAFLOW_VERSION: str = "1.6" diff --git a/tests/common/unit/test_cache.py b/tests/common/unit/test_cache.py index b6c1c18..3c18d55 100644 --- a/tests/common/unit/test_cache.py +++ b/tests/common/unit/test_cache.py @@ -2,9 +2,9 @@ import pytest +import oda_reader._http_primitives as _http_primitives from oda_reader import ( clear_http_cache, - common, disable_http_cache, enable_http_cache, get_http_cache_info, @@ -25,12 +25,12 @@ class TestHTTPCache: def test_disable_cache_sets_flag(self): """Test that disable_http_cache sets the flag.""" enable_http_cache() - assert common._CACHE_ENABLED is True + assert _http_primitives._CACHE_ENABLED is True disable_http_cache() - # Check the global variable through the module - assert common._CACHE_ENABLED is False + # Check the global variable through the primitives module + assert _http_primitives._CACHE_ENABLED is False # Cleanup enable_http_cache() @@ -41,7 +41,7 @@ def test_enable_cache_sets_flag(self): enable_http_cache() - assert common._CACHE_ENABLED is True + assert _http_primitives._CACHE_ENABLED is True def test_clear_cache_resets_counters(self, temp_cache_dir): """Test that clear_http_cache resets cache statistics.""" diff --git a/tests/conftest.py b/tests/conftest.py index f166012..fbc9a8f 100644 --- a/tests/conftest.py 
+++ b/tests/conftest.py @@ -27,19 +27,19 @@ def temp_cache_dir(tmp_path, monkeypatch): Yields: Path: Path to the temporary cache directory """ - from oda_reader import common + import oda_reader._http_primitives as _http_primitives cache_dir = tmp_path / "test_cache" cache_dir.mkdir() monkeypatch.setenv("ODA_READER_CACHE_DIR", str(cache_dir)) # Reset global HTTP session so it gets reinitialized with new cache path - common._HTTP_SESSION = None + _http_primitives._HTTP_SESSION = None yield cache_dir # Clean up: reset session again after test - common._HTTP_SESSION = None + _http_primitives._HTTP_SESSION = None @pytest.fixture diff --git a/tests/download/unit/test_download_tools.py b/tests/download/unit/test_download_tools.py index 9e638c8..bc3e5ed 100644 --- a/tests/download/unit/test_download_tools.py +++ b/tests/download/unit/test_download_tools.py @@ -10,8 +10,10 @@ from oda_reader.common import get_data_from_api from oda_reader.download.download_tools import ( _detect_delimiter, + _extract_dataflow_id_from_flow_url, _save_or_return_parquet_files_from_content, bulk_download_parquet, + get_bulk_file_id, ) @@ -33,28 +35,130 @@ def test_get_data_from_api_success(self, mocker, sample_csv_response): assert result == sample_csv_response assert "DONOR,RECIPIENT" in result - def test_get_data_from_api_404_triggers_retry(self, mocker): - """Test that 404 with 'Dataflow' message triggers version fallback.""" - # First call returns 404 with Dataflow message - # Second call (with fallback version) returns success - mock_responses = [ - (404, "Dataflow not found", False), - (200, "DONOR,VALUE\n1,100", False), - ] + def test_get_data_from_api_404_triggers_version_discovery(self, mocker): + """Test that a 'Dataflow not found' response triggers version discovery retry.""" + mock_get_response = mocker.patch( + "oda_reader.common._get_response_text", + side_effect=[ + (404, "Dataflow not found", False), + (200, "DONOR,VALUE\n1,100", False), + ], + ) + mocker.patch( + 
"oda_reader.common.discover_latest_version", + return_value="1.9", + ) + mocker.patch( + "oda_reader.common.get_dimension_count", + return_value=7, + ) + + url = "https://sdmx.oecd.org/public/rest/data/OECD.DCD.FSD,DSD_DAC1@DF_DAC1,2.0/" + result = get_data_from_api(url) + + assert mock_get_response.call_count == 2 + assert result == "DONOR,VALUE\n1,100" + + def test_get_data_from_api_discovered_version_matches_raises(self, mocker): + """Test that matching discovered version raises immediately without retry.""" + mocker.patch( + "oda_reader.common._get_response_text", + return_value=(404, "Dataflow not found", False), + ) + mocker.patch( + "oda_reader.common.discover_latest_version", + return_value="2.0", # same as URL version + ) + + url = "https://sdmx.oecd.org/public/rest/data/OECD.DCD.FSD,DSD_DAC1@DF_DAC1,2.0/" + with pytest.raises(ConnectionError, match="matches the attempted version"): + get_data_from_api(url) + + def test_get_data_from_api_incompatible_dsd_raises(self, mocker): + """Test that a discovered version with different dimension count raises.""" + mocker.patch( + "oda_reader.common._get_response_text", + side_effect=[ + (404, "Dataflow not found", False), + (200, "DONOR,VALUE\n1,100", False), + ], + ) + mocker.patch( + "oda_reader.common.discover_latest_version", + return_value="3.0", + ) + mocker.patch( + "oda_reader.common.get_dimension_count", + side_effect=[7, 8], # old has 7, new has 8 — breaking change + ) + + url = "https://sdmx.oecd.org/public/rest/data/OECD.DCD.FSD,DSD_DAC1@DF_DAC1,2.0/" + with pytest.raises(ConnectionError, match="breaking schema change"): + get_data_from_api(url) - mock = mocker.patch( + def test_get_data_from_api_compatible_upgrade_succeeds(self, mocker): + """Test that auto-upgrade works when dimension count matches.""" + mocker.patch( "oda_reader.common._get_response_text", - side_effect=mock_responses, + side_effect=[ + (404, "Dataflow not found", False), + (200, "DONOR,VALUE\n1,100", False), + ], + ) + mocker.patch( 
+ "oda_reader.common.discover_latest_version", + return_value="3.0", + ) + mocker.patch( + "oda_reader.common.get_dimension_count", + return_value=7, # same count — compatible ) - # URL with version 2.0 should fallback to 1.9 - url = "https://sdmx.oecd.org/public/rest/data/OECD.DCD.FSD,DF_DAC1,2.0/" + url = "https://sdmx.oecd.org/public/rest/data/OECD.DCD.FSD,DSD_DAC1@DF_DAC1,2.0/" result = get_data_from_api(url) + assert result == "DONOR,VALUE\n1,100" - # Should have made 2 calls (original + fallback) - assert mock.call_count == 2 + def test_get_data_from_api_dsd_check_fails_gracefully(self, mocker): + """Test that DSD check failure doesn't block the retry.""" + mocker.patch( + "oda_reader.common._get_response_text", + side_effect=[ + (404, "Dataflow not found", False), + (200, "DONOR,VALUE\n1,100", False), + ], + ) + mocker.patch( + "oda_reader.common.discover_latest_version", + return_value="1.9", + ) + mocker.patch( + "oda_reader.common.get_dimension_count", + side_effect=ConnectionError("DSD endpoint down"), + ) + + url = "https://sdmx.oecd.org/public/rest/data/OECD.DCD.FSD,DSD_DAC1@DF_DAC1,2.0/" + result = get_data_from_api(url) assert result == "DONOR,VALUE\n1,100" + def test_get_data_from_api_retry_also_fails_raises(self, mocker): + """Test that failed retry after discovery raises clearly.""" + mocker.patch( + "oda_reader.common._get_response_text", + return_value=(404, "Dataflow not found", False), + ) + mocker.patch( + "oda_reader.common.discover_latest_version", + return_value="1.9", + ) + mocker.patch( + "oda_reader.common.get_dimension_count", + return_value=7, + ) + + url = "https://sdmx.oecd.org/public/rest/data/OECD.DCD.FSD,DSD_DAC1@DF_DAC1,2.0/" + with pytest.raises(ConnectionError, match="even after version discovery"): + get_data_from_api(url) + def test_get_data_from_api_non_404_error_raises(self, mocker): """Test that non-404 errors raise ConnectionError.""" mock_response = (500, "Internal Server Error", False) @@ -337,3 +441,122 @@ def 
test_no_warning_when_is_txt_not_provided(self, mocker): warnings.simplefilter("error", DeprecationWarning) # Should not raise any DeprecationWarning bulk_download_parquet("fake-id") + + +@pytest.mark.unit +class TestExtractDataflowIdFromFlowUrl: + """Test the helper that extracts a dataflow ID from a bulk-download flow URL.""" + + @pytest.mark.parametrize( + "url,expected", + [ + ( + "https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD/DSD_CRS@DF_CRS/", + "DSD_CRS@DF_CRS", + ), + ( + "https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD/DSD_DAC2@DF_DAC2A/", + "DSD_DAC2@DF_DAC2A", + ), + ( + "https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD/DSD_MULTI@DF_MULTI/", + "DSD_MULTI@DF_MULTI", + ), + # trailing slash optional + ( + "https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD/DSD_CRS@DF_CRS", + "DSD_CRS@DF_CRS", + ), + ], + ) + def test_recognized_urls(self, url, expected): + assert _extract_dataflow_id_from_flow_url(url) == expected + + def test_unrecognized_url_returns_none(self): + assert _extract_dataflow_id_from_flow_url("https://example.com/other/path") is None + + +FLOW_URL = "https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD/DSD_CRS@DF_CRS/" +SEARCH_STRING = "DF_CRS_BULK=" + + +@pytest.mark.unit +class TestGetBulkFileId: + """Test get_bulk_file_id with discovery and fallback paths.""" + + def test_explicit_version_succeeds_immediately(self, mocker): + """When latest_flow is provided and works, discovery is not called.""" + mocker.patch( + "oda_reader.download.download_tools._get_response_text", + return_value=(200, f"{SEARCH_STRING}abc123", False), + ) + mock_discover = mocker.patch( + "oda_reader.download.download_tools.discover_latest_version", + ) + + result = get_bulk_file_id(FLOW_URL, SEARCH_STRING, latest_flow=1.6) + + assert result == "abc123" + mock_discover.assert_not_called() + + def test_discovery_succeeds_when_no_explicit_version(self, mocker): + """When latest_flow=None, discovery is used and succeeds.""" + mocker.patch( 
+ "oda_reader.download.download_tools._get_response_text", + return_value=(200, f"{SEARCH_STRING}xyz789", False), + ) + mocker.patch( + "oda_reader.download.download_tools.discover_latest_version", + return_value="1.7", + ) + + result = get_bulk_file_id(FLOW_URL, SEARCH_STRING) + assert result == "xyz789" + + def test_explicit_version_fails_then_discovery_rescues(self, mocker): + """When explicit version fails, discovery finds a working version.""" + mocker.patch( + "oda_reader.download.download_tools._get_response_text", + side_effect=[ + (404, "Not found", False), # explicit version fails + (200, f"{SEARCH_STRING}rescued", False), # discovered version works + ], + ) + mocker.patch( + "oda_reader.download.download_tools.discover_latest_version", + return_value="1.7", + ) + + result = get_bulk_file_id(FLOW_URL, SEARCH_STRING, latest_flow=1.8) + assert result == "rescued" + + def test_discovery_fails_then_scan_rescues(self, mocker): + """When discovery raises, the decrement scan finds a working version.""" + responses = [(404, "Not found", False)] * 5 + [ + (200, f"{SEARCH_STRING}scanned", False), + ] + mocker.patch( + "oda_reader.download.download_tools._get_response_text", + side_effect=responses, + ) + mocker.patch( + "oda_reader.download.download_tools.discover_latest_version", + side_effect=ConnectionError("metadata endpoint down"), + ) + + result = get_bulk_file_id(FLOW_URL, SEARCH_STRING, latest_flow=2.0) + assert result == "scanned" + + def test_all_methods_exhausted_raises(self, mocker): + """When discovery and scan both fail, RuntimeError is raised.""" + mocker.patch( + "oda_reader.download.download_tools._get_response_text", + return_value=(404, "Not found", False), + ) + mocker.patch( + "oda_reader.download.download_tools.discover_latest_version", + side_effect=ConnectionError("metadata endpoint down"), + ) + + with pytest.raises(RuntimeError, match="could not be found"): + get_bulk_file_id(FLOW_URL, SEARCH_STRING, latest_flow=1.0) diff --git 
a/tests/download/unit/test_version_discovery.py b/tests/download/unit/test_version_discovery.py
new file mode 100644
index 0000000..c717c5e
--- /dev/null
+++ b/tests/download/unit/test_version_discovery.py
@@ -0,0 +1,330 @@
+"""Unit tests for the version_discovery module."""
+
+import pytest
+
+from oda_reader.download.version_discovery import (
+    _build_metadata_url,
+    _parse_version_from_xml,
+    clear_version_cache,
+    discover_latest_version,
+    get_dimension_count,
+)
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+_VALID_XML = """\
+<?xml version="1.0" encoding="UTF-8"?>
+<Structure>
+  <Structures>
+    <Dataflows>
+      <Dataflow id="DF_DAC1" agencyID="OECD.DCD.FSD" version="1.7">
+        <Name>DAC1</Name>
+      </Dataflow>
+    </Dataflows>
+  </Structures>
+</Structure>
+"""
+
+_NAMESPACED_XML = """\
+<?xml version="1.0" encoding="UTF-8"?>
+<mes:Structure xmlns:mes="http://www.sdmx.org/resources/sdmxml/schemas/v2_1/message" xmlns:str="http://www.sdmx.org/resources/sdmxml/schemas/v2_1/structure" xmlns:com="http://www.sdmx.org/resources/sdmxml/schemas/v2_1/common">
+  <mes:Structures>
+    <str:Dataflows>
+      <str:Dataflow id="DF_DAC1" agencyID="OECD.DCD.FSD" version="2.3">
+        <com:Name>DAC1</com:Name>
+      </str:Dataflow>
+    </str:Dataflows>
+  </mes:Structures>
+</mes:Structure>
+"""
+
+_MISSING_VERSION_XML = """\
+<?xml version="1.0" encoding="UTF-8"?>
+<Structure>
+  <Structures>
+    <Dataflows>
+      <Dataflow id="DF_DAC1" agencyID="OECD.DCD.FSD">
+        <Name>DAC1</Name>
+      </Dataflow>
+    </Dataflows>
+  </Structures>
+</Structure>
+"""
+
+
+# ---------------------------------------------------------------------------
+# TestBuildMetadataUrl
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.unit
+class TestBuildMetadataUrl:
+    """Verify URL construction for various dataflow IDs."""
+
+    def test_basic_dataflow_id(self):
+        url = _build_metadata_url("DSD_DAC1@DF_DAC1")
+        assert url == (
+            "https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD"
+            "/DSD_DAC1@DF_DAC1/latest"
+        )
+
+    def test_crs_dataflow_id(self):
+        url = _build_metadata_url("DSD_CRS@DF_CRS")
+        assert url == (
+            "https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD"
+            "/DSD_CRS@DF_CRS/latest"
+        )
+
+    def test_multisystem_dataflow_id(self):
+        url = _build_metadata_url("DSD_MULTI@DF_MULTI")
+        assert url == (
+            "https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD"
+            "/DSD_MULTI@DF_MULTI/latest"
+        )
+
+    def test_dac2a_dataflow_id(self):
+        url = _build_metadata_url("DSD_DAC2@DF_DAC2A")
+        assert url == (
+            "https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD"
+            "/DSD_DAC2@DF_DAC2A/latest"
+        )
+
+
+# ---------------------------------------------------------------------------
+# 
TestParseVersionFromXml
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.unit
+class TestParseVersionFromXml:
+    """Verify XML parsing handles valid, namespaced, and invalid inputs."""
+
+    def test_valid_xml(self):
+        version = _parse_version_from_xml(_VALID_XML)
+        assert version == "1.7"
+
+    def test_namespaced_xml(self):
+        version = _parse_version_from_xml(_NAMESPACED_XML)
+        assert version == "2.3"
+
+    def test_missing_version_raises(self):
+        with pytest.raises(ValueError, match="No <Dataflow>"):
+            _parse_version_from_xml(_MISSING_VERSION_XML)
+
+    def test_malformed_xml_raises(self):
+        # ET.ParseError subclasses SyntaxError, so no extra import is needed.
+        with pytest.raises(SyntaxError):
+            _parse_version_from_xml("<<<not xml>>")
+
+
+# ---------------------------------------------------------------------------
+# TestDiscoverLatestVersion
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture()
+def _mock_http(mocker):
+    """Patch get_response_text in version_discovery and clear the cache."""
+    clear_version_cache()
+    return mocker.patch(
+        "oda_reader.download.version_discovery.get_response_text",
+    )
+
+
+@pytest.mark.unit
+class TestDiscoverLatestVersion:
+    """Verify HTTP calls, caching behaviour, and error propagation."""
+
+    def test_returns_parsed_version(self, _mock_http):
+        _mock_http.return_value = (200, _VALID_XML, False)
+
+        version = discover_latest_version("DSD_DAC1@DF_DAC1")
+        assert version == "1.7"
+
+    def test_result_is_cached(self, _mock_http):
+        """A second call for the same dataflow ID must not make another HTTP request."""
+        _mock_http.return_value = (200, _VALID_XML, False)
+
+        v1 = discover_latest_version("DSD_DAC1@DF_DAC1")
+        v2 = discover_latest_version("DSD_DAC1@DF_DAC1")
+
+        assert v1 == v2 == "1.7"
+        _mock_http.assert_called_once()
+
+    def test_http_error_raises_connection_error(self, _mock_http):
+        """Non-2xx status code raises ConnectionError."""
+        _mock_http.return_value = (404, "Not found", False)
+
+        with pytest.raises(ConnectionError, match="HTTP 404"):
+            discover_latest_version("DSD_DAC1@DF_DAC1")
+
+    def test_different_dataflows_cached_separately(self, _mock_http):
+        """Each 
dataflow ID is cached independently.""" + _mock_http.side_effect = [ + (200, _VALID_XML, False), + (200, _NAMESPACED_XML, False), + ] + + v_dac1 = discover_latest_version("DSD_DAC1@DF_DAC1") + v_crs = discover_latest_version("DSD_CRS@DF_CRS") + + assert v_dac1 == "1.7" + assert v_crs == "2.3" + + +# --------------------------------------------------------------------------- +# TestClearVersionCache +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestClearVersionCache: + """Verify that clear_version_cache empties the in-process cache.""" + + def test_clear_forces_refetch(self, _mock_http): + """After clearing, the next call re-fetches from the network.""" + _mock_http.return_value = (200, _VALID_XML, False) + + discover_latest_version("DSD_DAC1@DF_DAC1") + assert _mock_http.call_count == 1 + + clear_version_cache() + + discover_latest_version("DSD_DAC1@DF_DAC1") + assert _mock_http.call_count == 2 + + def test_clear_evicts_http_cache(self, _mock_http, mocker): + """clear_version_cache should also evict HTTP-cached metadata URLs.""" + _mock_http.return_value = (200, _VALID_XML, False) + + mock_session = mocker.MagicMock() + mocker.patch( + "oda_reader.download.version_discovery._get_http_session", + return_value=mock_session, + ) + + discover_latest_version("DSD_DAC1@DF_DAC1") + clear_version_cache() + + mock_session.cache.delete.assert_called_once() + # Verify the URL passed matches the metadata URL pattern + call_kwargs = mock_session.cache.delete.call_args + urls = call_kwargs[1]["urls"] if "urls" in call_kwargs[1] else call_kwargs[0][0] + assert any("DSD_DAC1@DF_DAC1" in u for u in urls) + + def test_clear_on_empty_cache_is_noop(self, mocker): + """Clearing an empty cache should not error.""" + clear_version_cache() # should not raise + + +# --------------------------------------------------------------------------- +# TestGetDimensionCount +# 
--------------------------------------------------------------------------- + +_DSD_XML_7_DIMS = """\ + + + + + + + + + + + + + + + + + + + + + +""" + +_DSD_XML_NO_DIMS = """\ + + + + + + + + + + + + +""" + + +@pytest.mark.unit +class TestGetDimensionCount: + """Verify DSD dimension counting.""" + + def test_counts_dimensions_excluding_time(self, mocker): + """TimeDimension should not be counted, only Dimension elements.""" + mocker.patch( + "oda_reader.download.version_discovery.get_response_text", + return_value=(200, _DSD_XML_7_DIMS, False), + ) + + count = get_dimension_count("DSD_DAC1@DF_DAC1", "1.7") + assert count == 7 + + def test_empty_dsd_raises(self, mocker): + """DSD with no Dimension elements should raise ValueError.""" + mocker.patch( + "oda_reader.download.version_discovery.get_response_text", + return_value=(200, _DSD_XML_NO_DIMS, False), + ) + + with pytest.raises(ValueError, match="No dimensions found"): + get_dimension_count("DSD_DAC1@DF_DAC1", "1.0") + + def test_http_error_raises(self, mocker): + """Non-2xx from DSD endpoint should raise ConnectionError.""" + mocker.patch( + "oda_reader.download.version_discovery.get_response_text", + return_value=(404, "Not found", False), + ) + + with pytest.raises(ConnectionError, match="HTTP 404"): + get_dimension_count("DSD_DAC1@DF_DAC1", "9.9") + + def test_splits_dataflow_id_on_at_sign(self, mocker): + """DSD ID should be derived from the part before '@'.""" + mock = mocker.patch( + "oda_reader.download.version_discovery.get_response_text", + return_value=(200, _DSD_XML_7_DIMS, False), + ) + + get_dimension_count("DSD_DAC1@DF_DAC1", "1.7") + + called_url = mock.call_args[0][0] + assert "/DSD_DAC1/1.7" in called_url + assert "@" not in called_url + + +# --------------------------------------------------------------------------- +# TestDiscoverLatestVersionEdgeCases +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class 
TestDiscoverLatestVersionEdgeCases: + """Edge cases for discover_latest_version.""" + + def test_bad_xml_in_200_raises_valueerror(self, _mock_http): + """200 with unparseable XML should raise ValueError.""" + _mock_http.return_value = (200, " Date: Thu, 9 Apr 2026 16:49:54 -0600 Subject: [PATCH 2/3] Cache --- src/oda_reader/download/version_discovery.py | 9 +++++---- tests/download/unit/test_version_discovery.py | 6 ++++++ 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/oda_reader/download/version_discovery.py b/src/oda_reader/download/version_discovery.py index d923091..809868b 100644 --- a/src/oda_reader/download/version_discovery.py +++ b/src/oda_reader/download/version_discovery.py @@ -154,10 +154,11 @@ def clear_version_cache() -> None: >>> clear_version_cache() """ # Evict HTTP-cached metadata responses so the next lookup hits the network. - session = _get_http_session() - for dataflow_id in _version_cache: - url = _build_metadata_url(dataflow_id) - session.cache.delete(urls=[url]) + if _version_cache: + session = _get_http_session() + for dataflow_id in _version_cache: + url = _build_metadata_url(dataflow_id) + session.cache.delete(urls=[url]) _version_cache.clear() logger.info("Version discovery cache cleared.") diff --git a/tests/download/unit/test_version_discovery.py b/tests/download/unit/test_version_discovery.py index c717c5e..021c273 100644 --- a/tests/download/unit/test_version_discovery.py +++ b/tests/download/unit/test_version_discovery.py @@ -128,6 +128,12 @@ def test_malformed_xml_raises(self): def _mock_http(mocker): """Patch get_response_text in version_discovery and clear the cache.""" clear_version_cache() + # Also mock _get_http_session so clear_version_cache() doesn't create a + # real SQLite-backed session during tests (causes conflicts in parallel). 
+ mocker.patch( + "oda_reader.download.version_discovery._get_http_session", + return_value=mocker.MagicMock(), + ) return mocker.patch( "oda_reader.download.version_discovery.get_response_text", ) From 45ae9441db905341672c588dd83eecc2f869c6e8 Mon Sep 17 00:00:00 2001 From: Jorge Rivera Date: Thu, 9 Apr 2026 17:00:25 -0600 Subject: [PATCH 3/3] bug --- pyproject.toml | 11 ++++++++-- tests/conftest.py | 5 +++++ uv.lock | 55 ++++++++++++++++++++++++++++++++--------------- 3 files changed, 52 insertions(+), 19 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index cc5e8b5..101b82d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,13 +24,13 @@ classifiers = [ "Topic :: Software Development :: Libraries :: Python Modules", ] dependencies = [ - "filelock>=3.18.0", + "filelock>=3.20.3", "joblib>=1.4", "openpyxl>=3.1.0", "pandas>=2.2.0", "platformdirs>=4.5.0", "pyarrow>=14.0.0", - "requests>=2.32", + "requests>=2.33.0", "requests-cache>=1.2.0", ] @@ -58,6 +58,13 @@ test = [ "pytest-xdist>=3.5", ] +[tool.uv] +constraint-dependencies = [ + "urllib3>=2.6.3", + "virtualenv>=20.36.1", + "pygments>=2.20.0", +] + [tool.ruff] # Set the maximum line length line-length = 88 diff --git a/tests/conftest.py b/tests/conftest.py index fbc9a8f..7863ac5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,8 +11,13 @@ @pytest.fixture(autouse=True) def disable_cache_for_tests(): """Disable HTTP cache for all tests by default.""" + import oda_reader._http_primitives as _http_primitives + disable_http_cache() yield + # Reset session before re-enabling to avoid SQLite contention + # in parallel test workers sharing the same cache directory. 
+ _http_primitives._HTTP_SESSION = None enable_http_cache() diff --git a/uv.lock b/uv.lock index b5845bb..f5532db 100644 --- a/uv.lock +++ b/uv.lock @@ -7,6 +7,13 @@ resolution-markers = [ "python_full_version < '3.11'", ] +[manifest] +constraints = [ + { name = "pygments", specifier = ">=2.20.0" }, + { name = "urllib3", specifier = ">=2.6.3" }, + { name = "virtualenv", specifier = ">=20.36.1" }, +] + [[package]] name = "attrs" version = "25.4.0" @@ -326,11 +333,11 @@ wheels = [ [[package]] name = "filelock" -version = "3.20.1" +version = "3.25.2" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/a7/23/ce7a1126827cedeb958fc043d61745754464eb56c5937c35bbf2b8e26f34/filelock-3.20.1.tar.gz", hash = "sha256:b8360948b351b80f420878d8516519a2204b07aefcdcfd24912a5d33127f188c", size = 19476, upload-time = "2025-12-15T23:54:28.027Z" } +sdist = { url = "https://files.pythonhosted.org/packages/94/b8/00651a0f559862f3bb7d6f7477b192afe3f583cc5e26403b44e59a55ab34/filelock-3.25.2.tar.gz", hash = "sha256:b64ece2b38f4ca29dd3e810287aa8c48182bbecd1ae6e9ae126c9b35f1382694", size = 40480, upload-time = "2026-03-11T20:45:38.487Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/e3/7f/a1a97644e39e7316d850784c642093c99df1290a460df4ede27659056834/filelock-3.20.1-py3-none-any.whl", hash = "sha256:15d9e9a67306188a44baa72f569d2bfd803076269365fdea0934385da4dc361a", size = 16666, upload-time = "2025-12-15T23:54:26.874Z" }, + { url = "https://files.pythonhosted.org/packages/a4/a5/842ae8f0c08b61d6484b52f99a03510a3a72d23141942d216ebe81fefbce/filelock-3.25.2-py3-none-any.whl", hash = "sha256:ca8afb0da15f229774c9ad1b455ed96e85a81373065fb10446672f64444ddf70", size = 26759, upload-time = "2026-03-11T20:45:37.437Z" }, ] [[package]] @@ -823,13 +830,13 @@ test = [ [package.metadata] requires-dist = [ - { name = "filelock", specifier = ">=3.18.0" }, + { name = "filelock", specifier = ">=3.20.3" }, { name = "joblib", specifier = ">=1.4" 
}, { name = "openpyxl", specifier = ">=3.1.0" }, { name = "pandas", specifier = ">=2.2.0" }, { name = "platformdirs", specifier = ">=4.5.0" }, { name = "pyarrow", specifier = ">=14.0.0" }, - { name = "requests", specifier = ">=2.32" }, + { name = "requests", specifier = ">=2.33.0" }, { name = "requests-cache", specifier = ">=1.2.0" }, ] @@ -1046,11 +1053,11 @@ wheels = [ [[package]] name = "pygments" -version = "2.19.2" +version = "2.20.0" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" } +sdist = { url = "https://files.pythonhosted.org/packages/c3/b2/bc9c9196916376152d655522fdcebac55e66de6603a76a02bca1b6414f6c/pygments-2.20.0.tar.gz", hash = "sha256:6757cd03768053ff99f3039c1a36d6c0aa0b263438fcab17520b30a303a82b5f", size = 4955991, upload-time = "2026-03-29T13:29:33.898Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" }, + { url = "https://files.pythonhosted.org/packages/f4/7e/a72dd26f3b0f4f2bf1dd8923c85f7ceb43172af56d63c7383eb62b332364/pygments-2.20.0-py3-none-any.whl", hash = "sha256:81a9e26dd42fd28a23a2d169d86d7ac03b46e2f8b59ed4698fb4785f946d0176", size = 1231151, upload-time = "2026-03-29T13:29:30.038Z" }, ] [[package]] @@ -1135,6 +1142,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427", size = 229892, 
upload-time = "2024-03-01T18:36:18.57Z" }, ] +[[package]] +name = "python-discovery" +version = "1.2.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "filelock" }, + { name = "platformdirs" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/de/ef/3bae0e537cfe91e8431efcba4434463d2c5a65f5a89edd47c6cf2f03c55f/python_discovery-1.2.2.tar.gz", hash = "sha256:876e9c57139eb757cb5878cbdd9ae5379e5d96266c99ef731119e04fffe533bb", size = 58872, upload-time = "2026-04-07T17:28:49.249Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d8/db/795879cc3ddfe338599bddea6388cc5100b088db0a4caf6e6c1af1c27e04/python_discovery-1.2.2-py3-none-any.whl", hash = "sha256:e1ae95d9af875e78f15e19aed0c6137ab1bb49c200f21f5061786490c9585c7a", size = 31894, upload-time = "2026-04-07T17:28:48.09Z" }, +] + [[package]] name = "pytz" version = "2025.2" @@ -1222,7 +1242,7 @@ wheels = [ [[package]] name = "requests" -version = "2.32.5" +version = "2.33.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "certifi" }, @@ -1230,9 +1250,9 @@ dependencies = [ { name = "idna" }, { name = "urllib3" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/c9/74/b3ff8e6c8446842c3f5c837e9c3dfcfe2018ea6ecef224c710c85ef728f4/requests-2.32.5.tar.gz", hash = "sha256:dbba0bac56e100853db0ea71b82b4dfd5fe2bf6d3754a8893c3af500cec7d7cf", size = 134517, upload-time = "2025-08-18T20:46:02.573Z" } +sdist = { url = "https://files.pythonhosted.org/packages/5f/a4/98b9c7c6428a668bf7e42ebb7c79d576a1c3c1e3ae2d47e674b468388871/requests-2.33.1.tar.gz", hash = "sha256:18817f8c57c6263968bc123d237e3b8b08ac046f5456bd1e307ee8f4250d3517", size = 134120, upload-time = "2026-03-30T16:09:15.531Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/1e/db/4254e3eabe8020b458f1a747140d32277ec7a271daf1d235b70dc0b4e6e3/requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6", size = 
64738, upload-time = "2025-08-18T20:46:00.542Z" }, + { url = "https://files.pythonhosted.org/packages/d7/8e/7540e8a2036f79a125c1d2ebadf69ed7901608859186c856fa0388ef4197/requests-2.33.1-py3-none-any.whl", hash = "sha256:4e6d1ef462f3626a1f0a0a9c42dd93c63bad33f9f1c1937509b8c5c8718ab56a", size = 64947, upload-time = "2026-03-30T16:09:13.83Z" }, ] [[package]] @@ -1368,26 +1388,27 @@ wheels = [ [[package]] name = "urllib3" -version = "2.6.2" +version = "2.6.3" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/1e/24/a2a2ed9addd907787d7aa0355ba36a6cadf1768b934c652ea78acbd59dcd/urllib3-2.6.2.tar.gz", hash = "sha256:016f9c98bb7e98085cb2b4b17b87d2c702975664e4f060c6532e64d1c1a5e797", size = 432930, upload-time = "2025-12-11T15:56:40.252Z" } +sdist = { url = "https://files.pythonhosted.org/packages/c7/24/5f1b3bdffd70275f6661c76461e25f024d5a38a46f04aaca912426a2b1d3/urllib3-2.6.3.tar.gz", hash = "sha256:1b62b6884944a57dbe321509ab94fd4d3b307075e0c2eae991ac71ee15ad38ed", size = 435556, upload-time = "2026-01-07T16:24:43.925Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/6d/b9/4095b668ea3678bf6a0af005527f39de12fb026516fb3df17495a733b7f8/urllib3-2.6.2-py3-none-any.whl", hash = "sha256:ec21cddfe7724fc7cb4ba4bea7aa8e2ef36f607a4bab81aa6ce42a13dc3f03dd", size = 131182, upload-time = "2025-12-11T15:56:38.584Z" }, + { url = "https://files.pythonhosted.org/packages/39/08/aaaad47bc4e9dc8c725e68f9d04865dbcb2052843ff09c97b08904852d84/urllib3-2.6.3-py3-none-any.whl", hash = "sha256:bf272323e553dfb2e87d9bfd225ca7b0f467b919d7bbd355436d3fd37cb0acd4", size = 131584, upload-time = "2026-01-07T16:24:42.685Z" }, ] [[package]] name = "virtualenv" -version = "20.35.4" +version = "21.2.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "distlib" }, { name = "filelock" }, { name = "platformdirs" }, + { name = "python-discovery" }, { name = "typing-extensions", marker = "python_full_version < '3.11'" }, 
] -sdist = { url = "https://files.pythonhosted.org/packages/20/28/e6f1a6f655d620846bd9df527390ecc26b3805a0c5989048c210e22c5ca9/virtualenv-20.35.4.tar.gz", hash = "sha256:643d3914d73d3eeb0c552cbb12d7e82adf0e504dbf86a3182f8771a153a1971c", size = 6028799, upload-time = "2025-10-29T06:57:40.511Z" } +sdist = { url = "https://files.pythonhosted.org/packages/97/c5/aff062c66b42e2183201a7ace10c6b2e959a9a16525c8e8ca8e59410d27a/virtualenv-21.2.1.tar.gz", hash = "sha256:b66ffe81301766c0d5e2208fc3576652c59d44e7b731fc5f5ed701c9b537fa78", size = 5844770, upload-time = "2026-04-09T18:47:11.482Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/79/0c/c05523fa3181fdf0c9c52a6ba91a23fbf3246cc095f26f6516f9c60e6771/virtualenv-20.35.4-py3-none-any.whl", hash = "sha256:c21c9cede36c9753eeade68ba7d523529f228a403463376cf821eaae2b650f1b", size = 6005095, upload-time = "2025-10-29T06:57:37.598Z" }, + { url = "https://files.pythonhosted.org/packages/20/0e/f083a76cb590e60dff3868779558eefefb8dfb7c9ed020babc7aa014ccbf/virtualenv-21.2.1-py3-none-any.whl", hash = "sha256:bd16b49c53562b28cf1a3ad2f36edb805ad71301dee70ddc449e5c88a9f919a2", size = 5828326, upload-time = "2026-04-09T18:47:09.331Z" }, ] [[package]]