diff --git a/CHANGELOG.md b/CHANGELOG.md index 65e8c07..f336471 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog for oda_reader +## 1.5.0 (2026-04-09) +- Replaces blind version-decrement fallback with authoritative SDMX metadata endpoint lookup for all datasets. +- Adds `clear_version_cache()` to the public API for forcing fresh version discovery mid-session. + ## 1.4.3 (2026-04-09) - Fixes DAC1 query filter dimension order to match the current DSD schema, which added SECTOR at position 2. - Bumps DAC1 dataflow version from 1.7 to 1.8. diff --git a/docs/docs/advanced.md b/docs/docs/advanced.md index 0f5287f..3dc80cc 100644 --- a/docs/docs/advanced.md +++ b/docs/docs/advanced.md @@ -14,12 +14,13 @@ qb = QueryBuilder() # Build a custom DAC1 filter filter_string = qb.build_dac1_filter( donor="USA", + sector="_Z", measure="1010", flow_type="1140" ) print(filter_string) -# Output: "USA..1010..1140.." +# Output: "USA._Z.1010..1140.." ``` This filter string can be used to manually construct API URLs. @@ -39,50 +40,43 @@ Each method returns a filter string suitable for the SDMX API. ## Dataflow Version Handling -OECD occasionally changes dataflow versions (schema updates). ODA Reader handles this automatically with version fallback. +OECD occasionally changes dataflow versions (schema updates). ODA Reader handles this automatically with version discovery. -### Automatic Fallback +### Automatic Version Discovery -When a dataflow version returns 404 (not found), ODA Reader automatically: +When a dataflow version returns an error (not found), ODA Reader automatically queries the OECD's SDMX metadata endpoint to discover the latest published version and retries with it: -1. Tries the configured version (e.g., `1.5`) -2. If 404, retries with `1.4` -3. Continues decrementing: `1.3`, `1.2`, `1.1` -4. Returns data from first successful version (up to 5 attempts) +1. Tries the configured default version +2. 
If not found, queries the metadata endpoint for the authoritative latest version +3. Retries once with the discovered version -This means your code keeps working even when OECD makes breaking schema changes. +This means your code keeps working even when OECD updates schema versions. You'll see a log message indicating which version was discovered. -**Example**: +### Clearing the Version Cache + +Discovered versions are cached in-process so repeated queries don't trigger extra metadata lookups. If the OECD publishes a new dataflow version mid-session, you can force re-discovery: ```python -from oda_reader import download_dac1 +from oda_reader import clear_version_cache -# ODA Reader will automatically try: -# 1.5 -> 404 -# 1.4 -> 404 -# 1.3 -> Success! Returns data with version 1.3 -data = download_dac1(start_year=2022, end_year=2022) +clear_version_cache() ``` -You'll see a message indicating which version succeeded. - ### Manual Version Override You can specify an exact dataflow version: ```python -# Force use of version 1.3 data = download_dac1( start_year=2022, end_year=2022, - dataflow_version="1.3" + dataflow_version="1.7" ) ``` **When to override**: - You know the correct version for reproducibility - Debugging version-specific issues -- Avoiding automatic fallback (for performance) **Available for**: - `download_dac1(dataflow_version=...)` diff --git a/docs/docs/caching.md b/docs/docs/caching.md index a504748..bfa04c3 100644 --- a/docs/docs/caching.md +++ b/docs/docs/caching.md @@ -141,6 +141,18 @@ disable_http_cache() enable_http_cache() ``` +### Clear Version Discovery Cache + +ODA Reader caches discovered dataflow versions in-process. If the OECD publishes a new version mid-session: + +```python +from oda_reader import clear_version_cache + +clear_version_cache() +``` + +This forces a fresh metadata lookup on the next query. 
+ ### Clear HTTP Cache Only ```python diff --git a/docs/docs/index.md b/docs/docs/index.md index ccab9b3..efe88a5 100644 --- a/docs/docs/index.md +++ b/docs/docs/index.md @@ -15,8 +15,7 @@ ODA Reader eliminates these headaches. It provides a unified Python interface th - **Bulk download large files** with memory-efficient streaming for the full CRS (1GB+) - **Automatic rate limiting** and caching to work within API constraints - **Schema translation** between Data Explorer API and OECD.Stat formats -- **Version fallback** automatically searches for the most recent schema version since they -can unexpectedly change with new data releases. +- **Automatic version discovery** queries OECD's SDMX metadata to find the latest dataflow version, handling schema changes transparently. **Built for researchers, analysts, and developers** who need reliable, programmatic access to ODA data without fighting infrastructure. diff --git a/docs/docs/why-oda-reader.md b/docs/docs/why-oda-reader.md index 5b6346f..81b6e1e 100644 --- a/docs/docs/why-oda-reader.md +++ b/docs/docs/why-oda-reader.md @@ -26,7 +26,7 @@ The OECD Development Assistance Committee publishes comprehensive data on offici - Schema changes break queries without warning - No automatic retries or fallback mechanisms -**What ODA Reader provides**: Automatic version fallback, rate limiting built-in, consistent function interface, handles schema changes gracefully. +**What ODA Reader provides**: Automatic version discovery, rate limiting built-in, consistent function interface, handles schema changes gracefully. 
### Manual Downloads from OECD.Stat diff --git a/pyproject.toml b/pyproject.toml index fb3c2db..101b82d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "oda_reader" -version = "1.4.3" +version = "1.5.0" description = "A simple package to import ODA data from the OECD's API and AidData's database" readme = "README.md" license = "MIT" @@ -24,13 +24,13 @@ classifiers = [ "Topic :: Software Development :: Libraries :: Python Modules", ] dependencies = [ - "filelock>=3.18.0", + "filelock>=3.20.3", "joblib>=1.4", "openpyxl>=3.1.0", "pandas>=2.2.0", "platformdirs>=4.5.0", "pyarrow>=14.0.0", - "requests>=2.32", + "requests>=2.33.0", "requests-cache>=1.2.0", ] @@ -58,6 +58,13 @@ test = [ "pytest-xdist>=3.5", ] +[tool.uv] +constraint-dependencies = [ + "urllib3>=2.6.3", + "virtualenv>=20.36.1", + "pygments>=2.20.0", +] + [tool.ruff] # Set the maximum line length line-length = 88 diff --git a/src/oda_reader/__init__.py b/src/oda_reader/__init__.py index 45f33af..55364ce 100644 --- a/src/oda_reader/__init__.py +++ b/src/oda_reader/__init__.py @@ -26,6 +26,7 @@ enable_http_cache, get_http_cache_info, ) +from oda_reader.download.version_discovery import clear_version_cache from oda_reader.crs import bulk_download_crs, download_crs, download_crs_file from oda_reader.dac1 import download_dac1 from oda_reader.dac2a import bulk_download_dac2a, download_dac2a @@ -55,6 +56,8 @@ "disable_http_cache", "clear_http_cache", "get_http_cache_info", + # Version discovery cache + "clear_version_cache", # DataFrame and bulk cache managers "dataframe_cache", "bulk_cache_manager", diff --git a/src/oda_reader/_http_primitives.py b/src/oda_reader/_http_primitives.py new file mode 100644 index 0000000..e2bca73 --- /dev/null +++ b/src/oda_reader/_http_primitives.py @@ -0,0 +1,145 @@ +"""Shared HTTP primitives used by both common.py and version_discovery.py. 
class RateLimiter:
    """Simple blocking rate limiter.

    At most ``max_calls`` calls are allowed within any sliding window of
    ``period`` seconds. ``wait`` blocks the caller once that budget is used
    up and returns as soon as a slot becomes free.

    Args:
        max_calls: Maximum number of calls permitted per window (at least 1).
        period: Length of the window in seconds (must not be negative).

    Raises:
        ValueError: If ``max_calls`` is below 1 or ``period`` is negative.
    """

    def __init__(self, max_calls: int = 20, period: float = 60.0) -> None:
        # Reject settings that wait() cannot honour: with max_calls < 1 the
        # limit check would index into an empty deque and raise IndexError.
        if max_calls < 1:
            raise ValueError(f"max_calls must be at least 1, got {max_calls}")
        if period < 0:
            raise ValueError(f"period must not be negative, got {period}")
        self.max_calls = max_calls
        self.period = period
        # Monotonic timestamps of the calls in the current window, oldest first.
        self._calls: deque[float] = deque()

    def wait(self) -> None:
        """Block until a new call is allowed, then record that call."""
        now = time.monotonic()
        # Forget calls that have aged out of the window.
        while self._calls and now - self._calls[0] >= self.period:
            self._calls.popleft()
        if len(self._calls) >= self.max_calls:
            # Window is full: sleep until the oldest call expires, then
            # release its slot for the call being made now.
            sleep_for = self.period - (now - self._calls[0])
            time.sleep(max(sleep_for, 0))
            self._calls.popleft()
        self._calls.append(time.monotonic())
def _fetch(url: str, headers: dict):
    """Issue a rate-limited GET and report whether the HTTP cache served it.

    Shared by ``get_response_text`` and ``get_response_content``, which differ
    only in how they read the response body.

    Args:
        url: The URL to fetch.
        headers: Headers to include in the request.

    Returns:
        A ``(response, from_cache)`` pair. ``from_cache`` is always ``False``
        when HTTP caching is disabled.
    """
    API_RATE_LIMITER.wait()

    if not _CACHE_ENABLED:
        # Caching switched off: bypass the cached session entirely.
        response = requests.get(url, headers=headers)
        logger.info(f"Fetching data from API (cache disabled): {url}")
        return response, False

    response = _get_http_session().get(url, headers=headers)
    # Only responses produced by the cached session carry ``from_cache``.
    from_cache = getattr(response, "from_cache", False)
    if from_cache:
        logger.info(f"Loading data from HTTP cache: {url}")
    else:
        logger.info(f"Fetching data from API: {url}")
    return response, from_cache


def get_response_text(url: str, headers: dict) -> tuple[int, str, bool]:
    """GET request returning status code, text content, and cache hit status.

    This call is subject to the global rate limiter and HTTP caching.

    Args:
        url: The URL to fetch.
        headers: Headers to include in the request.

    Returns:
        tuple[int, str, bool]: Status code, text content, and whether from cache.
    """
    response, from_cache = _fetch(url, headers)
    return response.status_code, response.text, from_cache


def get_response_content(url: str, headers: dict) -> tuple[int, bytes, bool]:
    """GET request returning status code, raw content, and cache hit status.

    This call is subject to the global rate limiter and HTTP caching.

    Args:
        url: The URL to fetch.
        headers: Headers to include in the request.

    Returns:
        tuple[int, bytes, bool]: Status code, content bytes, and whether from cache.
    """
    response, from_cache = _fetch(url, headers)
    return response.status_code, response.content, from_cache
- These errors should trigger a version fallback retry rather than + These errors should trigger a version discovery retry rather than being treated as valid data. Args: @@ -54,36 +58,6 @@ def _is_dataflow_not_found_error(response: str) -> bool: return is_too_short and looks_like_error -# Global HTTP cache session (initialized lazily) -_HTTP_SESSION: requests_cache.CachedSession | None = None -_CACHE_ENABLED = True - - -def _get_http_session() -> requests_cache.CachedSession: - """Get or create the global HTTP cache session. - - All responses are cached for 7 days (604800 seconds). - Uses filesystem backend to handle large responses (>2GB). - - Returns: - CachedSession: requests-cache session with 7-day expiration. - """ - global _HTTP_SESSION - - if _HTTP_SESSION is None: - cache_path = str(get_http_cache_path()) - - _HTTP_SESSION = requests_cache.CachedSession( - cache_name=cache_path, - backend="filesystem", - expire_after=604800, # 7 days - allowable_codes=(200, 404), # Cache 404s for version fallback - stale_if_error=True, # Use stale cache if API errors - ) - - return _HTTP_SESSION - - def enable_http_cache() -> None: """Enable HTTP response caching. @@ -91,10 +65,9 @@ def enable_http_cache() -> None: >>> from oda_reader import enable_http_cache >>> enable_http_cache() """ - global _CACHE_ENABLED - _CACHE_ENABLED = True - if _HTTP_SESSION is not None: - _HTTP_SESSION.cache.clear() # Clear any cached data from disabled session + _http_primitives._CACHE_ENABLED = True + if _http_primitives._HTTP_SESSION is not None: + _http_primitives._HTTP_SESSION.cache.clear() def disable_http_cache() -> None: @@ -104,8 +77,7 @@ def disable_http_cache() -> None: >>> from oda_reader import disable_http_cache >>> disable_http_cache() """ - global _CACHE_ENABLED - _CACHE_ENABLED = False + _http_primitives._CACHE_ENABLED = False def clear_http_cache() -> None: @@ -138,32 +110,9 @@ def get_http_cache_info() -> dict: } -class RateLimiter: - """Simple blocking rate limiter. 
- - Parameters correspond to the maximum number of calls allowed within - ``period`` seconds. ``wait`` pauses execution when the limit has been - reached. - """ - - def __init__(self, max_calls: int = 20, period: float = 60.0) -> None: - self.max_calls = max_calls - self.period = period - self._calls: deque[float] = deque() - - def wait(self) -> None: - """Block until a new call is allowed.""" - now = time.monotonic() - while self._calls and now - self._calls[0] >= self.period: - self._calls.popleft() - if len(self._calls) >= self.max_calls: - sleep_for = self.period - (now - self._calls[0]) - time.sleep(max(sleep_for, 0)) - self._calls.popleft() - self._calls.append(time.monotonic()) - - -API_RATE_LIMITER = RateLimiter() +# RateLimiter and API_RATE_LIMITER live in _http_primitives to avoid a +# circular import with version_discovery.py. Re-exported here for +# backward compatibility. class ImporterPaths: @@ -191,121 +140,129 @@ def text_to_stringio(response_text: str) -> StringIO: def _replace_dataflow_version(url: str, version: str) -> str: - """Replace the dataflow version in the URL.""" - pattern = r",(\d+\.\d+)/" + """Replace the dataflow version in the URL. - return re.sub(pattern, f",{version}/", url) + Handles both comma-separated (v1) and slash-separated (v2) patterns. + """ + # v1 pattern: OECD.DCD.FSD,DATAFLOW_ID,VERSION/ + if re.search(r",(\d+\.\d+)/", url): + return re.sub(r",(\d+\.\d+)/", f",{version}/", url) + # v2 pattern: OECD.DCD.FSD/DATAFLOW_ID/VERSION/ + return re.sub(r"/(\d+\.\d+)/", f"/{version}/", url) def _get_dataflow_version(url: str) -> str | None: """Get the dataflow version from the URL. + Handles both comma-separated (v1) and slash-separated (v2) patterns. + Returns: The version string if found, None otherwise. 
""" - pattern = r",(\d+\.\d+)/" - match = re.search(pattern, url) + # v1 pattern: ,VERSION/ + match = re.search(r",(\d+\.\d+)/", url) + if match: + return match.group(1) + # v2 pattern: /VERSION/ — must not be a port or protocol fragment + match = re.search(r"/(\d+\.\d+)/", url) return match.group(1) if match else None -def _get_response_text(url: str, headers: dict) -> tuple[int, str, bool]: - """GET request returning status code, text content, and cache hit status. - - This call is subject to the global rate limiter and HTTP caching. - - Args: - url: The URL to fetch. - headers: Headers to include in the request. - - Returns: - tuple[int, str, bool]: Status code, text content, and whether from cache. - """ - API_RATE_LIMITER.wait() - - if _CACHE_ENABLED: - session = _get_http_session() - response = session.get(url, headers=headers) - from_cache = getattr(response, "from_cache", False) - if from_cache: - logger.info(f"Loading data from HTTP cache: {url}") - else: - logger.info(f"Fetching data from API: {url}") - else: - response = requests.get(url, headers=headers) - from_cache = False - logger.info(f"Fetching data from API (cache disabled): {url}") +def _extract_dataflow_id(url: str) -> str | None: + """Extract the dataflow ID from an SDMX data URL. - return response.status_code, response.text, from_cache + Handles both URL patterns used by the OECD SDMX API: - -def _get_response_content(url: str, headers: dict) -> tuple[int, bytes, bool]: - """GET request returning status code, content, and cache hit status. - - This call is subject to the global rate limiter and HTTP caching. + - v1 (comma-separated): ``…/OECD.DCD.FSD,DSD_DAC1@DF_DAC1,1.8/…`` + - v2 (slash-separated): ``…/OECD.DCD.FSD/DSD_DAC1@DF_DAC1/1.8/…`` Args: - url: The URL to fetch. - headers: Headers to include in the request. + url: A full SDMX data or dataflow URL string. Returns: - tuple[int, bytes, bool]: Status code, content, and whether from cache. + The dataflow identifier (e.g. 
``"DSD_DAC1@DF_DAC1"``) if found, + ``None`` if the URL does not match a recognised pattern. """ - API_RATE_LIMITER.wait() - - if _CACHE_ENABLED: - session = _get_http_session() - response = session.get(url, headers=headers) - from_cache = getattr(response, "from_cache", False) - if from_cache: - logger.info(f"Loading data from HTTP cache: {url}") - else: - logger.info(f"Fetching data from API: {url}") - else: - response = requests.get(url, headers=headers) - from_cache = False - logger.info(f"Fetching data from API (cache disabled): {url}") - - return response.status_code, response.content, from_cache + # v1: AGENCY,DATAFLOW_ID,VERSION + match = re.search(r"OECD\.DCD\.FSD,([^,/]+),\d+\.\d+", url) + if match: + return match.group(1) + # v2: AGENCY/DATAFLOW_ID/VERSION + match = re.search(r"OECD\.DCD\.FSD/([^/]+)/\d+\.\d+", url) + return match.group(1) if match else None -def get_data_from_api(url: str, compressed: bool = True, retries: int = 0) -> str: +def get_data_from_api(url: str, compressed: bool = True) -> str: """Download a CSV file from an API endpoint and return it as text. + If the initial request returns a "dataflow not found" error, the function + queries the SDMX metadata endpoint to discover the authoritative latest + version and retries once with that version. + Args: url: The URL of the API endpoint. compressed: Whether the data is fetched compressed. Strongly recommended. - retries: The number of retries to attempt. Returns: str: The response text from the API. + + Raises: + ConnectionError: If the request fails and version discovery cannot help, + or if the discovered version also fails. 
""" - # Set the headers with gzip encoding (if required) headers = {"Accept-Encoding": "gzip"} if compressed else {} - # Fetch the data with headers status_code, response, _ = _get_response_text(url, headers=headers) - # Check for dataflow not found errors - these should trigger version fallback - # This check happens regardless of status code since the API may return - # error messages with various status codes (404, 200, etc.) - # Only attempt fallback if the URL contains a dataflow version pattern + # If the response indicates a dataflow-not-found error and the URL contains + # a version, discover the latest version and retry. Before retrying we + # verify that the discovered DSD has the same number of key dimensions as + # the one this release was built for, so we never silently send filters to + # an incompatible schema. version = _get_dataflow_version(url) if _is_dataflow_not_found_error(response) and version is not None: - if retries < MAX_RETRIES: - new_version = str(round(float(version) - FALLBACK_STEP, 1)) - new_url = _replace_dataflow_version(url=url, version=new_version) + dataflow_id = _extract_dataflow_id(url) + if dataflow_id is not None: + discovered_version = discover_latest_version(dataflow_id) + if discovered_version == version: + raise ConnectionError( + f"Dataflow not found and discovered version '{discovered_version}' " + f"matches the attempted version. Response: {response[:200]}" + ) + + # Verify structural compatibility: the discovered DSD must have + # the same number of key dimensions as the version we expected. + try: + old_dims = get_dimension_count(dataflow_id, version) + new_dims = get_dimension_count(dataflow_id, discovered_version) + if old_dims != new_dims: + raise ConnectionError( + f"Discovered version {discovered_version} has " + f"{new_dims} key dimensions, but version {version} " + f"had {old_dims}. This is a breaking schema change. " + f"Please upgrade oda_reader." 
+ ) + except (ConnectionError, ValueError) as exc: + if "breaking schema change" in str(exc): + raise + logger.warning( + f"Could not verify DSD compatibility: {exc}. " + f"Proceeding with discovered version {discovered_version}." + ) + + new_url = _replace_dataflow_version(url=url, version=discovered_version) logger.info( - f"Dataflow not found, retrying with version {new_version} " - f"(attempt {retries + 1}/{MAX_RETRIES})" - ) - return get_data_from_api( - url=new_url, compressed=compressed, retries=retries + 1 - ) - else: - raise ConnectionError( - f"No data found after {MAX_RETRIES} version fallback attempts. " - f"Last response: {response[:200]}" + f"Dataflow not found at version {version}, retrying with " + f"discovered version {discovered_version}" ) + status_code, response, _ = _get_response_text(new_url, headers=headers) + if _is_dataflow_not_found_error(response) or status_code > 299: + raise ConnectionError( + f"Dataflow not found even after version discovery " + f"(tried version {discovered_version}). 
" + f"Response: {response[:200]}" + ) + return response if (status_code == 500) and (response.find("not set to") > 0): url = url.replace("public", "dcd-public") diff --git a/src/oda_reader/crs.py b/src/oda_reader/crs.py index f51c584..87a4e57 100644 --- a/src/oda_reader/crs.py +++ b/src/oda_reader/crs.py @@ -14,6 +14,7 @@ DATAFLOW_ID: str = "DSD_CRS@DF_CRS" DATAFLOW_ID_GE: str = "DSD_GREQ@DF_CRS_GREQ" +# Default version; actual version is discovered dynamically on fallback DATAFLOW_VERSION: str = "1.6" # CRS filter structure: diff --git a/src/oda_reader/dac1.py b/src/oda_reader/dac1.py index 4672301..808a2c7 100644 --- a/src/oda_reader/dac1.py +++ b/src/oda_reader/dac1.py @@ -4,6 +4,7 @@ from oda_reader.download.download_tools import download DATAFLOW_ID: str = "DSD_DAC1@DF_DAC1" +# Default version; actual version is discovered dynamically on fallback DATAFLOW_VERSION: str = "1.8" diff --git a/src/oda_reader/dac2a.py b/src/oda_reader/dac2a.py index da5d61c..f41fd67 100644 --- a/src/oda_reader/dac2a.py +++ b/src/oda_reader/dac2a.py @@ -13,6 +13,7 @@ ) DATAFLOW_ID: str = "DSD_DAC2@DF_DAC2A" +# Default version; actual version is discovered dynamically on fallback DATAFLOW_VERSION: str = "1.4" diff --git a/src/oda_reader/download/download_tools.py b/src/oda_reader/download/download_tools.py index c3b0130..f93e0c8 100644 --- a/src/oda_reader/download/download_tools.py +++ b/src/oda_reader/download/download_tools.py @@ -24,6 +24,7 @@ logger, ) from oda_reader.download.query_builder import QueryBuilder +from oda_reader.download.version_discovery import discover_latest_version from oda_reader.schemas.crs_translation import convert_crs_to_dotstat_codes from oda_reader.schemas.dac1_translation import convert_dac1_to_dotstat_codes from oda_reader.schemas.dac2_translation import convert_dac2a_to_dotstat_codes @@ -48,8 +49,6 @@ f"{AIDDATA_VERSION.replace('.', '_')}.zip" ) -FALLBACK_STEP = 0.1 -MAX_RETRIES = 5 def _detect_delimiter(file_obj, sample_size: int = 8192) -> str: @@ 
-626,55 +625,105 @@ def bulk_download_aiddata( return None +def _extract_dataflow_id_from_flow_url(flow_url: str) -> str | None: + """Extract the dataflow ID from a bulk-download flow URL. + + Flow URLs follow the pattern:: + + https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD// + + Args: + flow_url: A URL string such as + ``https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD/DSD_CRS@DF_CRS/``. + + Returns: + The dataflow identifier (e.g. ``"DSD_CRS@DF_CRS"``) if found, + ``None`` otherwise. + """ + match = re.search(r"OECD\.DCD\.FSD/([^/]+)/?$", flow_url) + return match.group(1) if match else None + + def get_bulk_file_id( - flow_url: str, search_string: str, latest_flow: float = 1.6, retries: int = 0 + flow_url: str, + search_string: str, + latest_flow: float | None = None, ) -> str: - """ - Retrieves the full bulk file ID from the OECD dataflow. + """Retrieve the full bulk file ID from the OECD dataflow. + + The version to query is determined as follows: + + 1. If *latest_flow* is provided, try that version first. + 2. If it fails (non-2xx or search string not found), call + :func:`~oda_reader.download.version_discovery.discover_latest_version` + to obtain the authoritative latest version and retry once. + 3. If *latest_flow* is ``None``, discover the version unconditionally + before making any request. Args: - flow_url (str): The URL of the dataflow to check. - search_string (str): The string to search for in the response content. - latest_flow (float): The latest version of the dataflow to check. - retries (int): The current number of retries (to avoid infinite recursion). + flow_url: The base URL of the dataflow (without a version suffix), + e.g. ``https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD/DSD_CRS@DF_CRS/``. + search_string: The string to search for in the response XML to locate + the bulk-download link. + latest_flow: An explicit starting version to try. If ``None`` the + version is discovered automatically. 
Returns: str: The ID of the bulk download file. Raises: - KeyError: If the bulk download file link could not be found. - RuntimeError: If the maximum number of retries is exceeded. + RuntimeError: If the bulk download file ID cannot be found even after + version discovery. """ - if retries > MAX_RETRIES: - raise RuntimeError(f"Maximum retries ({MAX_RETRIES}) exceeded.") - - if latest_flow == 1.0: - latest_flow = int(round(latest_flow, 0)) - headers = {"Accept-Encoding": "gzip"} - status, response, from_cache = _get_response_text( - f"{flow_url}{latest_flow}", headers=headers - ) - if status > 299: - return get_bulk_file_id( - flow_url=flow_url, - search_string=search_string, - latest_flow=round(latest_flow - FALLBACK_STEP, 1), - retries=retries + 1, - ) + dataflow_id = _extract_dataflow_id_from_flow_url(flow_url) - match = re.search(f"{re.escape(search_string)}(.*?) str | None: + """Attempt to retrieve the file ID for a given version. - if not match: - logger.info("The link to the bulk download file could not be found.") - return get_bulk_file_id( - flow_url=flow_url, - search_string=search_string, - latest_flow=round(latest_flow - FALLBACK_STEP, 1), - retries=retries + 1, + Returns the file ID on success, ``None`` if this version does not + contain the expected link. + """ + status, response, _ = _get_response_text( + f"{flow_url}{version}", headers=headers ) + if status > 299: + return None + found = re.search(f"{re.escape(search_string)}(.*?) version string +_version_cache: dict[str, str] = {} + + +def _build_metadata_url(dataflow_id: str) -> str: + """Construct the SDMX metadata URL for a given dataflow ID. + + Args: + dataflow_id: The SDMX dataflow identifier, e.g. ``DSD_DAC1@DF_DAC1``. + + Returns: + str: The full metadata URL. + """ + return f"{METADATA_BASE_URL}/{dataflow_id}/latest" + + +def _parse_version_from_xml(xml_text: str) -> str: + """Extract the version attribute from SDMX Dataflow XML. 
+ + Iterates over all elements looking for the one whose local name is + ``Dataflow`` and returns its ``version`` attribute. The search is + namespace-agnostic so it works regardless of the XML namespace prefix + used by the server. + + Args: + xml_text: Raw XML response text from the metadata endpoint. + + Returns: + str: The version string, e.g. ``"1.7"``. + + Raises: + ValueError: If no Dataflow element with a version attribute is found. + """ + root = ET.fromstring(xml_text) + for element in root.iter(): + local_name = element.tag.split("}")[-1] if "}" in element.tag else element.tag + if local_name == "Dataflow" and "version" in element.attrib: + return element.attrib["version"] + raise ValueError( + "No element found in SDMX metadata response." + ) + + +def discover_latest_version(dataflow_id: str) -> str: + """Query the OECD SDMX metadata endpoint to find the latest dataflow version. + + Results are cached in the module-level ``_version_cache`` dict so that + repeated calls for the same dataflow ID within a process session incur + only one network round-trip. + + The HTTP call uses the shared requests-cache session (7-day filesystem + cache) and the global rate limiter. + + Args: + dataflow_id: The SDMX dataflow identifier, e.g. ``DSD_DAC1@DF_DAC1``. + + Returns: + str: The latest version string, e.g. ``"1.7"``. + + Raises: + ConnectionError: If the metadata endpoint returns a non-2xx status. + ValueError: If the response XML does not contain a parseable version. 
+ """ + if dataflow_id in _version_cache: + return _version_cache[dataflow_id] + + url = _build_metadata_url(dataflow_id) + + status_code, text, _ = get_response_text(url, headers={}) + + if status_code > 299: + raise ConnectionError( + f"Metadata endpoint returned HTTP {status_code} for " + f"dataflow '{dataflow_id}': {text[:200]}" + ) + + version = _parse_version_from_xml(text) + _version_cache[dataflow_id] = version + logger.info(f"Discovered latest version for '{dataflow_id}': {version}") + return version + + +def get_dimension_count(dataflow_id: str, version: str) -> int: + """Fetch the DSD for a specific version and count key dimensions. + + This excludes the TimeDimension, counting only the dimensions that + form the positional filter key. + + Args: + dataflow_id: e.g. ``"DSD_DAC1@DF_DAC1"``. + version: e.g. ``"1.7"``. + + Returns: + int: Number of key dimensions. + + Raises: + ConnectionError: If the DSD endpoint is unreachable. + ValueError: If no dimensions are found in the response. + """ + # The DSD ID is the part before '@' in the dataflow ID + dsd_id = dataflow_id.split("@")[0] if "@" in dataflow_id else dataflow_id + url = f"{DSD_BASE_URL}/{dsd_id}/{version}" + + status_code, text, _ = get_response_text(url, headers={}) + + if status_code > 299: + raise ConnectionError( + f"DSD endpoint returned HTTP {status_code} for " + f"'{dsd_id}' version {version}: {text[:200]}" + ) + + root = ET.fromstring(text) + count = 0 + for element in root.iter(): + local_name = element.tag.split("}")[-1] if "}" in element.tag else element.tag + if local_name == "Dimension": + count += 1 + if count == 0: + raise ValueError( + f"No dimensions found in DSD '{dsd_id}' version {version}." + ) + return count + + +def clear_version_cache() -> None: + """Clear both the in-process version cache and any HTTP-cached metadata. + + Call this when you need to force a fresh metadata lookup, for example + after a new dataflow version has been published mid-session. 
+ + Example: + >>> from oda_reader import clear_version_cache + >>> clear_version_cache() + """ + # Evict HTTP-cached metadata responses so the next lookup hits the network. + if _version_cache: + session = _get_http_session() + for dataflow_id in _version_cache: + url = _build_metadata_url(dataflow_id) + session.cache.delete(urls=[url]) + + _version_cache.clear() + logger.info("Version discovery cache cleared.") diff --git a/src/oda_reader/multisystem.py b/src/oda_reader/multisystem.py index b53a163..cd1a730 100644 --- a/src/oda_reader/multisystem.py +++ b/src/oda_reader/multisystem.py @@ -13,6 +13,7 @@ ) DATAFLOW_ID: str = "DSD_MULTI@DF_MULTI" +# Default version; actual version is discovered dynamically on fallback DATAFLOW_VERSION: str = "1.6" diff --git a/tests/common/unit/test_cache.py b/tests/common/unit/test_cache.py index b6c1c18..3c18d55 100644 --- a/tests/common/unit/test_cache.py +++ b/tests/common/unit/test_cache.py @@ -2,9 +2,9 @@ import pytest +import oda_reader._http_primitives as _http_primitives from oda_reader import ( clear_http_cache, - common, disable_http_cache, enable_http_cache, get_http_cache_info, @@ -25,12 +25,12 @@ class TestHTTPCache: def test_disable_cache_sets_flag(self): """Test that disable_http_cache sets the flag.""" enable_http_cache() - assert common._CACHE_ENABLED is True + assert _http_primitives._CACHE_ENABLED is True disable_http_cache() - # Check the global variable through the module - assert common._CACHE_ENABLED is False + # Check the global variable through the primitives module + assert _http_primitives._CACHE_ENABLED is False # Cleanup enable_http_cache() @@ -41,7 +41,7 @@ def test_enable_cache_sets_flag(self): enable_http_cache() - assert common._CACHE_ENABLED is True + assert _http_primitives._CACHE_ENABLED is True def test_clear_cache_resets_counters(self, temp_cache_dir): """Test that clear_http_cache resets cache statistics.""" diff --git a/tests/conftest.py b/tests/conftest.py index f166012..7863ac5 100644 --- 
a/tests/conftest.py +++ b/tests/conftest.py @@ -11,8 +11,13 @@ @pytest.fixture(autouse=True) def disable_cache_for_tests(): """Disable HTTP cache for all tests by default.""" + import oda_reader._http_primitives as _http_primitives + disable_http_cache() yield + # Reset session before re-enabling to avoid SQLite contention + # in parallel test workers sharing the same cache directory. + _http_primitives._HTTP_SESSION = None enable_http_cache() @@ -27,19 +32,19 @@ def temp_cache_dir(tmp_path, monkeypatch): Yields: Path: Path to the temporary cache directory """ - from oda_reader import common + import oda_reader._http_primitives as _http_primitives cache_dir = tmp_path / "test_cache" cache_dir.mkdir() monkeypatch.setenv("ODA_READER_CACHE_DIR", str(cache_dir)) # Reset global HTTP session so it gets reinitialized with new cache path - common._HTTP_SESSION = None + _http_primitives._HTTP_SESSION = None yield cache_dir # Clean up: reset session again after test - common._HTTP_SESSION = None + _http_primitives._HTTP_SESSION = None @pytest.fixture diff --git a/tests/download/unit/test_download_tools.py b/tests/download/unit/test_download_tools.py index 9e638c8..bc3e5ed 100644 --- a/tests/download/unit/test_download_tools.py +++ b/tests/download/unit/test_download_tools.py @@ -10,8 +10,10 @@ from oda_reader.common import get_data_from_api from oda_reader.download.download_tools import ( _detect_delimiter, + _extract_dataflow_id_from_flow_url, _save_or_return_parquet_files_from_content, bulk_download_parquet, + get_bulk_file_id, ) @@ -33,28 +35,130 @@ def test_get_data_from_api_success(self, mocker, sample_csv_response): assert result == sample_csv_response assert "DONOR,RECIPIENT" in result - def test_get_data_from_api_404_triggers_retry(self, mocker): - """Test that 404 with 'Dataflow' message triggers version fallback.""" - # First call returns 404 with Dataflow message - # Second call (with fallback version) returns success - mock_responses = [ - (404, "Dataflow not 
found", False), - (200, "DONOR,VALUE\n1,100", False), - ] + def test_get_data_from_api_404_triggers_version_discovery(self, mocker): + """Test that a 'Dataflow not found' response triggers version discovery retry.""" + mock_get_response = mocker.patch( + "oda_reader.common._get_response_text", + side_effect=[ + (404, "Dataflow not found", False), + (200, "DONOR,VALUE\n1,100", False), + ], + ) + mocker.patch( + "oda_reader.common.discover_latest_version", + return_value="1.9", + ) + mocker.patch( + "oda_reader.common.get_dimension_count", + return_value=7, + ) + + url = "https://sdmx.oecd.org/public/rest/data/OECD.DCD.FSD,DSD_DAC1@DF_DAC1,2.0/" + result = get_data_from_api(url) + + assert mock_get_response.call_count == 2 + assert result == "DONOR,VALUE\n1,100" + + def test_get_data_from_api_discovered_version_matches_raises(self, mocker): + """Test that matching discovered version raises immediately without retry.""" + mocker.patch( + "oda_reader.common._get_response_text", + return_value=(404, "Dataflow not found", False), + ) + mocker.patch( + "oda_reader.common.discover_latest_version", + return_value="2.0", # same as URL version + ) + + url = "https://sdmx.oecd.org/public/rest/data/OECD.DCD.FSD,DSD_DAC1@DF_DAC1,2.0/" + with pytest.raises(ConnectionError, match="matches the attempted version"): + get_data_from_api(url) + + def test_get_data_from_api_incompatible_dsd_raises(self, mocker): + """Test that a discovered version with different dimension count raises.""" + mocker.patch( + "oda_reader.common._get_response_text", + side_effect=[ + (404, "Dataflow not found", False), + (200, "DONOR,VALUE\n1,100", False), + ], + ) + mocker.patch( + "oda_reader.common.discover_latest_version", + return_value="3.0", + ) + mocker.patch( + "oda_reader.common.get_dimension_count", + side_effect=[7, 8], # old has 7, new has 8 — breaking change + ) + + url = "https://sdmx.oecd.org/public/rest/data/OECD.DCD.FSD,DSD_DAC1@DF_DAC1,2.0/" + with pytest.raises(ConnectionError, 
match="breaking schema change"): + get_data_from_api(url) - mock = mocker.patch( + def test_get_data_from_api_compatible_upgrade_succeeds(self, mocker): + """Test that auto-upgrade works when dimension count matches.""" + mocker.patch( "oda_reader.common._get_response_text", - side_effect=mock_responses, + side_effect=[ + (404, "Dataflow not found", False), + (200, "DONOR,VALUE\n1,100", False), + ], + ) + mocker.patch( + "oda_reader.common.discover_latest_version", + return_value="3.0", + ) + mocker.patch( + "oda_reader.common.get_dimension_count", + return_value=7, # same count — compatible ) - # URL with version 2.0 should fallback to 1.9 - url = "https://sdmx.oecd.org/public/rest/data/OECD.DCD.FSD,DF_DAC1,2.0/" + url = "https://sdmx.oecd.org/public/rest/data/OECD.DCD.FSD,DSD_DAC1@DF_DAC1,2.0/" result = get_data_from_api(url) + assert result == "DONOR,VALUE\n1,100" - # Should have made 2 calls (original + fallback) - assert mock.call_count == 2 + def test_get_data_from_api_dsd_check_fails_gracefully(self, mocker): + """Test that DSD check failure doesn't block the retry.""" + mocker.patch( + "oda_reader.common._get_response_text", + side_effect=[ + (404, "Dataflow not found", False), + (200, "DONOR,VALUE\n1,100", False), + ], + ) + mocker.patch( + "oda_reader.common.discover_latest_version", + return_value="1.9", + ) + mocker.patch( + "oda_reader.common.get_dimension_count", + side_effect=ConnectionError("DSD endpoint down"), + ) + + url = "https://sdmx.oecd.org/public/rest/data/OECD.DCD.FSD,DSD_DAC1@DF_DAC1,2.0/" + result = get_data_from_api(url) assert result == "DONOR,VALUE\n1,100" + def test_get_data_from_api_retry_also_fails_raises(self, mocker): + """Test that failed retry after discovery raises clearly.""" + mocker.patch( + "oda_reader.common._get_response_text", + return_value=(404, "Dataflow not found", False), + ) + mocker.patch( + "oda_reader.common.discover_latest_version", + return_value="1.9", + ) + mocker.patch( + 
"oda_reader.common.get_dimension_count", + return_value=7, + ) + + url = "https://sdmx.oecd.org/public/rest/data/OECD.DCD.FSD,DSD_DAC1@DF_DAC1,2.0/" + with pytest.raises(ConnectionError, match="even after version discovery"): + get_data_from_api(url) + def test_get_data_from_api_non_404_error_raises(self, mocker): """Test that non-404 errors raise ConnectionError.""" mock_response = (500, "Internal Server Error", False) @@ -337,3 +441,122 @@ def test_no_warning_when_is_txt_not_provided(self, mocker): warnings.simplefilter("error", DeprecationWarning) # Should not raise any DeprecationWarning bulk_download_parquet("fake-id") + + +@pytest.mark.unit +class TestExtractDataflowIdFromFlowUrl: + """Test the helper that extracts a dataflow ID from a bulk-download flow URL.""" + + @pytest.mark.parametrize( + "url,expected", + [ + ( + "https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD/DSD_CRS@DF_CRS/", + "DSD_CRS@DF_CRS", + ), + ( + "https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD/DSD_DAC2@DF_DAC2A/", + "DSD_DAC2@DF_DAC2A", + ), + ( + "https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD/DSD_MULTI@DF_MULTI/", + "DSD_MULTI@DF_MULTI", + ), + # trailing slash optional + ( + "https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD/DSD_CRS@DF_CRS", + "DSD_CRS@DF_CRS", + ), + ], + ) + def test_recognized_urls(self, url, expected): + assert _extract_dataflow_id_from_flow_url(url) == expected + + def test_unrecognized_url_returns_none(self): + assert _extract_dataflow_id_from_flow_url("https://example.com/other/path") is None + + +FLOW_URL = "https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD/DSD_CRS@DF_CRS/" +SEARCH_STRING = "DF_CRS_BULK=" + + +@pytest.mark.unit +class TestGetBulkFileId: + """Test get_bulk_file_id with discovery and fallback paths.""" + + def test_explicit_version_succeeds_immediately(self, mocker): + """When latest_flow is provided and works, discovery is not called.""" + mocker.patch( + 
"oda_reader.download.download_tools._get_response_text", + return_value=(200, f"{SEARCH_STRING}abc123", False), + ) + mock_discover = mocker.patch( + "oda_reader.download.download_tools.discover_latest_version", + ) + + result = get_bulk_file_id(FLOW_URL, SEARCH_STRING, latest_flow=1.6) + + assert result == "abc123" + mock_discover.assert_not_called() + + def test_discovery_succeeds_when_no_explicit_version(self, mocker): + """When latest_flow=None, discovery is used and succeeds.""" + mocker.patch( + "oda_reader.download.download_tools._get_response_text", + return_value=(200, f"{SEARCH_STRING}xyz789", False), + ) + mocker.patch( + "oda_reader.download.download_tools.discover_latest_version", + return_value="1.7", + ) + + result = get_bulk_file_id(FLOW_URL, SEARCH_STRING) + assert result == "xyz789" + + def test_explicit_version_fails_then_discovery_rescues(self, mocker): + """When explicit version fails, discovery finds a working version.""" + mocker.patch( + "oda_reader.download.download_tools._get_response_text", + side_effect=[ + (404, "Not found", False), # explicit version fails + (200, f"{SEARCH_STRING}rescued", False), # discovered version works + ], + ) + mocker.patch( + "oda_reader.download.download_tools.discover_latest_version", + return_value="1.7", + ) + + result = get_bulk_file_id(FLOW_URL, SEARCH_STRING, latest_flow=1.8) + assert result == "rescued" + + def test_discovery_fails_then_scan_rescues(self, mocker): + """When discovery raises, the decrement scan finds a working version.""" + responses = [(404, "Not found", False)] * 5 + [ + (200, f"{SEARCH_STRING}scanned", False), + ] + mocker.patch( + "oda_reader.download.download_tools._get_response_text", + side_effect=responses, + ) + mocker.patch( + "oda_reader.download.download_tools.discover_latest_version", + side_effect=ConnectionError("metadata endpoint down"), + ) + + result = get_bulk_file_id(FLOW_URL, SEARCH_STRING, latest_flow=2.0) + assert result == "scanned" + + def 
test_all_methods_exhausted_raises(self, mocker): + """When discovery and scan both fail, RuntimeError is raised.""" + mocker.patch( + "oda_reader.download.download_tools._get_response_text", + return_value=(404, "Not found", False), + ) + mocker.patch( + "oda_reader.download.download_tools.discover_latest_version", + side_effect=ConnectionError("metadata endpoint down"), + ) + + with pytest.raises(RuntimeError, match="could not be found"): + get_bulk_file_id(FLOW_URL, SEARCH_STRING, latest_flow=1.0) diff --git a/tests/download/unit/test_version_discovery.py b/tests/download/unit/test_version_discovery.py new file mode 100644 index 0000000..021c273 --- /dev/null +++ b/tests/download/unit/test_version_discovery.py @@ -0,0 +1,336 @@ +"""Unit tests for the version_discovery module.""" + +import pytest + +from oda_reader.download.version_discovery import ( + _build_metadata_url, + _parse_version_from_xml, + clear_version_cache, + discover_latest_version, + get_dimension_count, +) + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +_VALID_XML = """\ + + + + + + DAC1 + + + + +""" + +_NAMESPACED_XML = """\ + + + + + + DAC1 + + + + +""" + +_MISSING_VERSION_XML = """\ + + + + + + DAC1 + + + + +""" + + +# --------------------------------------------------------------------------- +# TestBuildMetadataUrl +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestBuildMetadataUrl: + """Verify URL construction for various dataflow IDs.""" + + def test_basic_dataflow_id(self): + url = _build_metadata_url("DSD_DAC1@DF_DAC1") + assert url == ( + "https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD" + "/DSD_DAC1@DF_DAC1/latest" + ) + + def test_crs_dataflow_id(self): + url = _build_metadata_url("DSD_CRS@DF_CRS") + assert url == ( + "https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD" + 
"/DSD_CRS@DF_CRS/latest" + ) + + def test_multisystem_dataflow_id(self): + url = _build_metadata_url("DSD_MULTI@DF_MULTI") + assert url == ( + "https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD" + "/DSD_MULTI@DF_MULTI/latest" + ) + + def test_dac2a_dataflow_id(self): + url = _build_metadata_url("DSD_DAC2@DF_DAC2A") + assert url == ( + "https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD" + "/DSD_DAC2@DF_DAC2A/latest" + ) + + +# --------------------------------------------------------------------------- +# TestParseVersionFromXml +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestParseVersionFromXml: + """Verify XML parsing handles valid, namespaced, and invalid inputs.""" + + def test_valid_xml(self): + version = _parse_version_from_xml(_VALID_XML) + assert version == "1.7" + + def test_namespaced_xml(self): + version = _parse_version_from_xml(_NAMESPACED_XML) + assert version == "2.3" + + def test_missing_version_raises(self): + with pytest.raises(ValueError, match="No >>") + + +# --------------------------------------------------------------------------- +# TestDiscoverLatestVersion +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def _mock_http(mocker): + """Patch get_response_text in version_discovery and clear the cache.""" + clear_version_cache() + # Also mock _get_http_session so clear_version_cache() doesn't create a + # real SQLite-backed session during tests (causes conflicts in parallel). 
+ mocker.patch( + "oda_reader.download.version_discovery._get_http_session", + return_value=mocker.MagicMock(), + ) + return mocker.patch( + "oda_reader.download.version_discovery.get_response_text", + ) + + +@pytest.mark.unit +class TestDiscoverLatestVersion: + """Verify HTTP calls, caching behaviour, and error propagation.""" + + def test_returns_parsed_version(self, _mock_http): + _mock_http.return_value = (200, _VALID_XML, False) + + version = discover_latest_version("DSD_DAC1@DF_DAC1") + assert version == "1.7" + + def test_result_is_cached(self, _mock_http): + """A second call for the same dataflow ID must not make another HTTP request.""" + _mock_http.return_value = (200, _VALID_XML, False) + + v1 = discover_latest_version("DSD_DAC1@DF_DAC1") + v2 = discover_latest_version("DSD_DAC1@DF_DAC1") + + assert v1 == v2 == "1.7" + _mock_http.assert_called_once() + + def test_http_error_raises_connection_error(self, _mock_http): + """Non-2xx status code raises ConnectionError.""" + _mock_http.return_value = (404, "Not found", False) + + with pytest.raises(ConnectionError, match="HTTP 404"): + discover_latest_version("DSD_DAC1@DF_DAC1") + + def test_different_dataflows_cached_separately(self, _mock_http): + """Each dataflow ID is cached independently.""" + _mock_http.side_effect = [ + (200, _VALID_XML, False), + (200, _NAMESPACED_XML, False), + ] + + v_dac1 = discover_latest_version("DSD_DAC1@DF_DAC1") + v_crs = discover_latest_version("DSD_CRS@DF_CRS") + + assert v_dac1 == "1.7" + assert v_crs == "2.3" + + +# --------------------------------------------------------------------------- +# TestClearVersionCache +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestClearVersionCache: + """Verify that clear_version_cache empties the in-process cache.""" + + def test_clear_forces_refetch(self, _mock_http): + """After clearing, the next call re-fetches from the network.""" + _mock_http.return_value = (200, 
_VALID_XML, False) + + discover_latest_version("DSD_DAC1@DF_DAC1") + assert _mock_http.call_count == 1 + + clear_version_cache() + + discover_latest_version("DSD_DAC1@DF_DAC1") + assert _mock_http.call_count == 2 + + def test_clear_evicts_http_cache(self, _mock_http, mocker): + """clear_version_cache should also evict HTTP-cached metadata URLs.""" + _mock_http.return_value = (200, _VALID_XML, False) + + mock_session = mocker.MagicMock() + mocker.patch( + "oda_reader.download.version_discovery._get_http_session", + return_value=mock_session, + ) + + discover_latest_version("DSD_DAC1@DF_DAC1") + clear_version_cache() + + mock_session.cache.delete.assert_called_once() + # Verify the URL passed matches the metadata URL pattern + call_kwargs = mock_session.cache.delete.call_args + urls = call_kwargs[1]["urls"] if "urls" in call_kwargs[1] else call_kwargs[0][0] + assert any("DSD_DAC1@DF_DAC1" in u for u in urls) + + def test_clear_on_empty_cache_is_noop(self, mocker): + """Clearing an empty cache should not error.""" + clear_version_cache() # should not raise + + +# --------------------------------------------------------------------------- +# TestGetDimensionCount +# --------------------------------------------------------------------------- + +_DSD_XML_7_DIMS = """\ + + + + + + + + + + + + + + + + + + + + + +""" + +_DSD_XML_NO_DIMS = """\ + + + + + + + + + + + + +""" + + +@pytest.mark.unit +class TestGetDimensionCount: + """Verify DSD dimension counting.""" + + def test_counts_dimensions_excluding_time(self, mocker): + """TimeDimension should not be counted, only Dimension elements.""" + mocker.patch( + "oda_reader.download.version_discovery.get_response_text", + return_value=(200, _DSD_XML_7_DIMS, False), + ) + + count = get_dimension_count("DSD_DAC1@DF_DAC1", "1.7") + assert count == 7 + + def test_empty_dsd_raises(self, mocker): + """DSD with no Dimension elements should raise ValueError.""" + mocker.patch( + 
"oda_reader.download.version_discovery.get_response_text", + return_value=(200, _DSD_XML_NO_DIMS, False), + ) + + with pytest.raises(ValueError, match="No dimensions found"): + get_dimension_count("DSD_DAC1@DF_DAC1", "1.0") + + def test_http_error_raises(self, mocker): + """Non-2xx from DSD endpoint should raise ConnectionError.""" + mocker.patch( + "oda_reader.download.version_discovery.get_response_text", + return_value=(404, "Not found", False), + ) + + with pytest.raises(ConnectionError, match="HTTP 404"): + get_dimension_count("DSD_DAC1@DF_DAC1", "9.9") + + def test_splits_dataflow_id_on_at_sign(self, mocker): + """DSD ID should be derived from the part before '@'.""" + mock = mocker.patch( + "oda_reader.download.version_discovery.get_response_text", + return_value=(200, _DSD_XML_7_DIMS, False), + ) + + get_dimension_count("DSD_DAC1@DF_DAC1", "1.7") + + called_url = mock.call_args[0][0] + assert "/DSD_DAC1/1.7" in called_url + assert "@" not in called_url + + +# --------------------------------------------------------------------------- +# TestDiscoverLatestVersionEdgeCases +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestDiscoverLatestVersionEdgeCases: + """Edge cases for discover_latest_version.""" + + def test_bad_xml_in_200_raises_valueerror(self, _mock_http): + """200 with unparseable XML should raise ValueError.""" + _mock_http.return_value = (200, "