Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
# Changelog for oda_reader

## 1.5.0 (2026-04-09)
- Replaces blind version-decrement fallback with authoritative SDMX metadata endpoint lookup for all datasets.
- Adds `clear_version_cache()` to the public API for forcing fresh version discovery mid-session.

## 1.4.3 (2026-04-09)
- Fixes DAC1 query filter dimension order to match the current DSD schema, which added SECTOR at position 2.
- Bumps DAC1 dataflow version from 1.7 to 1.8.
36 changes: 15 additions & 21 deletions docs/docs/advanced.md
@@ -14,12 +14,13 @@ qb = QueryBuilder()
# Build a custom DAC1 filter
filter_string = qb.build_dac1_filter(
donor="USA",
sector="_Z",
measure="1010",
flow_type="1140"
)

print(filter_string)
# Output: "USA..1010..1140.."
# Output: "USA._Z.1010..1140.."
```

This filter string can be used to manually construct API URLs.
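For instance, a manually constructed URL might look like this (a sketch only — the base URL and dataflow identifier below are assumptions for illustration, not values guaranteed by oda_reader; check the OECD Data Explorer for the exact identifiers):

```python
# Hypothetical SDMX data endpoint and dataflow identifier (assumptions).
BASE_URL = "https://sdmx.oecd.org/public/rest/data"
DATAFLOW = "OECD.DCD.FSD,DSD_DAC1@DF_DAC1,1.8"  # agency, dataflow, version

filter_string = "USA._Z.1010..1140.."  # as built by QueryBuilder above

url = f"{BASE_URL}/{DATAFLOW}/{filter_string}?startPeriod=2022&endPeriod=2022"
print(url)
```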
@@ -39,50 +40,43 @@ Each method returns a filter string suitable for the SDMX API.

## Dataflow Version Handling

OECD occasionally changes dataflow versions (schema updates). ODA Reader handles this automatically with version fallback.
OECD occasionally changes dataflow versions (schema updates). ODA Reader handles this automatically with version discovery.

### Automatic Fallback
### Automatic Version Discovery

When a dataflow version returns 404 (not found), ODA Reader automatically:
When a dataflow version request returns 404 (not found), ODA Reader automatically queries the OECD's SDMX metadata endpoint to discover the latest published version and retries with it:

1. Tries the configured version (e.g., `1.5`)
2. If 404, retries with `1.4`
3. Continues decrementing: `1.3`, `1.2`, `1.1`
4. Returns data from first successful version (up to 5 attempts)
1. Tries the configured default version
2. If not found, queries the metadata endpoint for the authoritative latest version
3. Retries once with the discovered version

This means your code keeps working even when OECD makes breaking schema changes.
This means your code keeps working even when OECD updates schema versions. You'll see a log message indicating which version was discovered.
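The retry shape described above can be sketched as follows (a minimal illustration; `fetch` and `discover` are hypothetical stand-ins for ODA Reader's internals, not its real API):

```python
def fetch_with_discovery(fetch, discover, dataflow_id: str, default_version: str):
    """Try the configured version; on a miss, discover the latest and retry once."""
    status, body = fetch(dataflow_id, default_version)
    if status == 200:
        return body
    # Ask the SDMX metadata endpoint (stubbed here) for the latest version.
    latest = discover(dataflow_id)
    status, body = fetch(dataflow_id, latest)
    if status != 200:
        raise RuntimeError(f"{dataflow_id}@{latest} failed with status {status}")
    return body


# Demo with stubs: version 1.7 is gone, 1.8 is the published one.
versions = {"1.8": (200, "csv-data")}
result = fetch_with_discovery(
    fetch=lambda df, v: versions.get(v, (404, "")),
    discover=lambda df: "1.8",
    dataflow_id="DSD_DAC1@DF_DAC1",
    default_version="1.7",
)
print(result)
```

Note there is exactly one retry: if even the discovered version fails, the error is surfaced rather than looped on.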

**Example**:
### Clearing the Version Cache

Discovered versions are cached in-process so repeated queries don't trigger extra metadata lookups. If the OECD publishes a new dataflow version mid-session, you can force re-discovery:

```python
from oda_reader import download_dac1
from oda_reader import clear_version_cache

# ODA Reader will automatically try:
# 1.5 -> 404
# 1.4 -> 404
# 1.3 -> Success! Returns data with version 1.3
data = download_dac1(start_year=2022, end_year=2022)
clear_version_cache()
```

You'll see a message indicating which version succeeded.

### Manual Version Override

You can specify an exact dataflow version:

```python
# Force use of version 1.3
data = download_dac1(
start_year=2022,
end_year=2022,
dataflow_version="1.3"
dataflow_version="1.7"
)
```

**When to override**:
- You know the correct version for reproducibility
- Debugging version-specific issues
- Avoiding the automatic discovery lookup (for performance)

**Available for**:
- `download_dac1(dataflow_version=...)`
12 changes: 12 additions & 0 deletions docs/docs/caching.md
@@ -141,6 +141,18 @@ disable_http_cache()
enable_http_cache()
```

### Clear Version Discovery Cache

ODA Reader caches discovered dataflow versions in-process. If the OECD publishes a new version mid-session:

```python
from oda_reader import clear_version_cache

clear_version_cache()
```

This forces a fresh metadata lookup on the next query.

### Clear HTTP Cache Only

```python
3 changes: 1 addition & 2 deletions docs/docs/index.md
@@ -15,8 +15,7 @@ ODA Reader eliminates these headaches. It provides a unified Python interface th
- **Bulk download large files** with memory-efficient streaming for the full CRS (1GB+)
- **Automatic rate limiting** and caching to work within API constraints
- **Schema translation** between Data Explorer API and OECD.Stat formats
- **Version fallback** automatically searches for the most recent schema version since they
can unexpectedly change with new data releases.
- **Automatic version discovery** queries OECD's SDMX metadata to find the latest dataflow version, handling schema changes transparently.

**Built for researchers, analysts, and developers** who need reliable, programmatic access to ODA data without fighting infrastructure.

2 changes: 1 addition & 1 deletion docs/docs/why-oda-reader.md
@@ -26,7 +26,7 @@ The OECD Development Assistance Committee publishes comprehensive data on offici
- Schema changes break queries without warning
- No automatic retries or fallback mechanisms

**What ODA Reader provides**: Automatic version fallback, rate limiting built-in, consistent function interface, handles schema changes gracefully.
**What ODA Reader provides**: Automatic version discovery, rate limiting built-in, consistent function interface, handles schema changes gracefully.

### Manual Downloads from OECD.Stat

13 changes: 10 additions & 3 deletions pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "oda_reader"
version = "1.4.3"
version = "1.5.0"
description = "A simple package to import ODA data from the OECD's API and AidData's database"
readme = "README.md"
license = "MIT"
@@ -24,13 +24,13 @@ classifiers = [
"Topic :: Software Development :: Libraries :: Python Modules",
]
dependencies = [
"filelock>=3.18.0",
"filelock>=3.20.3",
"joblib>=1.4",
"openpyxl>=3.1.0",
"pandas>=2.2.0",
"platformdirs>=4.5.0",
"pyarrow>=14.0.0",
"requests>=2.32",
"requests>=2.33.0",
"requests-cache>=1.2.0",
]

@@ -58,6 +58,13 @@ test = [
"pytest-xdist>=3.5",
]

[tool.uv]
constraint-dependencies = [
"urllib3>=2.6.3",
"virtualenv>=20.36.1",
"pygments>=2.20.0",
]

[tool.ruff]
# Set the maximum line length
line-length = 88
3 changes: 3 additions & 0 deletions src/oda_reader/__init__.py
@@ -26,6 +26,7 @@
enable_http_cache,
get_http_cache_info,
)
from oda_reader.download.version_discovery import clear_version_cache
from oda_reader.crs import bulk_download_crs, download_crs, download_crs_file
from oda_reader.dac1 import download_dac1
from oda_reader.dac2a import bulk_download_dac2a, download_dac2a
@@ -55,6 +56,8 @@
"disable_http_cache",
"clear_http_cache",
"get_http_cache_info",
# Version discovery cache
"clear_version_cache",
# DataFrame and bulk cache managers
"dataframe_cache",
"bulk_cache_manager",
145 changes: 145 additions & 0 deletions src/oda_reader/_http_primitives.py
@@ -0,0 +1,145 @@
"""Shared HTTP primitives used by both common.py and version_discovery.py.

This module exists to break the circular import that would arise if
version_discovery.py imported from common.py while common.py imports
discover_latest_version from version_discovery.py.

This module must not import from common.py or version_discovery.py to avoid
circular imports.
"""

import logging
import time
from collections import deque

import requests
import requests_cache

from oda_reader._cache.config import get_http_cache_path

logger = logging.getLogger("oda_importer")

# ---------------------------------------------------------------------------
# Rate limiter
# ---------------------------------------------------------------------------


class RateLimiter:
    """Simple blocking rate limiter.

    Parameters correspond to the maximum number of calls allowed within
    ``period`` seconds. ``wait`` pauses execution when the limit has been
    reached.
    """

    def __init__(self, max_calls: int = 20, period: float = 60.0) -> None:
        self.max_calls = max_calls
        self.period = period
        self._calls: deque[float] = deque()

    def wait(self) -> None:
        """Block until a new call is allowed."""
        now = time.monotonic()
        while self._calls and now - self._calls[0] >= self.period:
            self._calls.popleft()
        if len(self._calls) >= self.max_calls:
            sleep_for = self.period - (now - self._calls[0])
            time.sleep(max(sleep_for, 0))
            self._calls.popleft()
        self._calls.append(time.monotonic())


API_RATE_LIMITER = RateLimiter()

# ---------------------------------------------------------------------------
# HTTP cache session
# ---------------------------------------------------------------------------

# Global HTTP cache session (initialized lazily)
_HTTP_SESSION: requests_cache.CachedSession | None = None
_CACHE_ENABLED = True


def _get_http_session() -> requests_cache.CachedSession:
    """Get or create the global HTTP cache session.

    All responses are cached for 7 days (604800 seconds).
    Uses filesystem backend to handle large responses (>2GB).

    Returns:
        CachedSession: requests-cache session with 7-day expiration.
    """
    global _HTTP_SESSION

    if _HTTP_SESSION is None:
        cache_path = str(get_http_cache_path())

        _HTTP_SESSION = requests_cache.CachedSession(
            cache_name=cache_path,
            backend="filesystem",
            expire_after=604800,  # 7 days
            allowable_codes=(200, 404),  # Cache 404s so version discovery retries stay cheap
            stale_if_error=True,  # Use stale cache if API errors
        )

    return _HTTP_SESSION


def get_response_text(url: str, headers: dict) -> tuple[int, str, bool]:
    """GET request returning status code, text content, and cache hit status.

    This call is subject to the global rate limiter and HTTP caching.

    Args:
        url: The URL to fetch.
        headers: Headers to include in the request.

    Returns:
        tuple[int, str, bool]: Status code, text content, and whether from cache.
    """
    API_RATE_LIMITER.wait()

    if _CACHE_ENABLED:
        session = _get_http_session()
        response = session.get(url, headers=headers)
        from_cache = getattr(response, "from_cache", False)
        if from_cache:
            logger.info(f"Loading data from HTTP cache: {url}")
        else:
            logger.info(f"Fetching data from API: {url}")
    else:
        response = requests.get(url, headers=headers)
        from_cache = False
        logger.info(f"Fetching data from API (cache disabled): {url}")

    return response.status_code, response.text, from_cache


def get_response_content(url: str, headers: dict) -> tuple[int, bytes, bool]:
    """GET request returning status code, raw content, and cache hit status.

    This call is subject to the global rate limiter and HTTP caching.

    Args:
        url: The URL to fetch.
        headers: Headers to include in the request.

    Returns:
        tuple[int, bytes, bool]: Status code, content bytes, and whether from cache.
    """
    API_RATE_LIMITER.wait()

    if _CACHE_ENABLED:
        session = _get_http_session()
        response = session.get(url, headers=headers)
        from_cache = getattr(response, "from_cache", False)
        if from_cache:
            logger.info(f"Loading data from HTTP cache: {url}")
        else:
            logger.info(f"Fetching data from API: {url}")
    else:
        response = requests.get(url, headers=headers)
        from_cache = False
        logger.info(f"Fetching data from API (cache disabled): {url}")

    return response.status_code, response.content, from_cache
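As a quick sanity check on the sliding-window logic in `RateLimiter`, the limiter can be exercised standalone (the class body below reproduces the logic from this file with tighter limits so the demo runs fast; timings are approximate):

```python
import time
from collections import deque


class RateLimiter:
    """Sliding-window limiter: at most max_calls per period seconds."""

    def __init__(self, max_calls: int = 20, period: float = 60.0) -> None:
        self.max_calls = max_calls
        self.period = period
        self._calls: deque[float] = deque()

    def wait(self) -> None:
        now = time.monotonic()
        # Drop timestamps that have aged out of the window.
        while self._calls and now - self._calls[0] >= self.period:
            self._calls.popleft()
        if len(self._calls) >= self.max_calls:
            # Sleep until the oldest call leaves the window, then retire it.
            time.sleep(max(self.period - (now - self._calls[0]), 0))
            self._calls.popleft()
        self._calls.append(time.monotonic())


limiter = RateLimiter(max_calls=2, period=0.3)
start = time.monotonic()
for _ in range(3):
    limiter.wait()  # the third call must block for roughly one period
elapsed = time.monotonic() - start
print(f"3 calls took {elapsed:.2f}s")
```

The first two calls return immediately; only the third sleeps, which is why `elapsed` lands just above one `period`.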