diff --git a/NEWS.md b/NEWS.md index a3aa8361..2e2dba99 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,13 @@ +**04/23/2026:** Added `waterdata.get_nearest_continuous(targets, ...)` — for each of N target timestamps, fetches the single continuous observation closest to that timestamp in one HTTP round-trip (auto-chunked when the resulting CQL filter is long, via the facility added in #238). The helper is designed for workflows that pair many discrete-measurement timestamps with surrounding instantaneous data, which the OGC `time` parameter can't express since it only accepts one instant or one interval per request. Ties at window midpoints are resolved per a configurable `on_tie` ∈ {`"first"`, `"last"`, `"mean"`}; the default `window="PT7M30S"` matches a 15-minute continuous gauge. + +**04/22/2026:** Highlights since the `v1.1.0` release (2025-11-26), which shipped the `waterdata` module: + +- Added `get_channel` for channel-measurement data (#218) and `get_stats_por` / `get_stats_date_range` for period-of-record and daily statistics (#207). +- Added `get_reference_table` (and made it considerably simpler and faster in #209), then extended it to accept arbitrary collections-API query parameters (#214). +- Removed the deprecated `waterwatch` module (#228) and several defunct NWIS stubs (#222, #225), and added `py.typed` so `dataretrieval` ships type information to downstream users (#186). +- Now supports `pandas` 3.x (#221). +- The OGC `waterdata` getters (`get_continuous`, `get_daily`, `get_field_measurements`, and the six others built on the same OGC collections) now accept `filter` and `filter_lang` kwargs that are passed through to the service's CQL filter parameter. This enables advanced server-side filtering that isn't expressible via the other kwargs — most commonly, OR'ing multiple time ranges into a single request. A long expression made up of a top-level `OR` chain is transparently split into multiple requests that each fit under the server's URI length limit, and the results are concatenated. + **12/04/2025:** The `get_continuous()` function was added to the `waterdata` module, which provides access to measurements collected via automated sensors at a high frequency (often 15 minute intervals) at a monitoring location. This is an early version of the continuous endpoint and should be used with caution as the API team improves its performance. In the future, we anticipate the addition of an endpoint(s) specifically for handling large data requests, so it may make sense for power users to hold off on heavy development using the new continuous endpoint. **11/24/2025:** `dataretrieval` is pleased to offer a new module, `waterdata`, which gives users access USGS's modernized [Water Data APIs](https://api.waterdata.usgs.gov/). The Water Data API endpoints include daily values, instantaneous values, field measurements (modernized groundwater levels service), time series metadata, and discrete water quality data from the Samples database. Though there will be a period of overlap, the functions within `waterdata` will eventually replace the `nwis` module, which currently provides access to the legacy [NWIS Water Services](https://waterservices.usgs.gov/). More example workflows and functions coming soon. Check `help(waterdata)` for more information. 
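To make the two NEWS entries above concrete, here is a minimal sketch of how the new surface composes, using only the signatures introduced in this diff (the site, parameter, windows, and timestamps are illustrative):

```python
import pandas as pd
from dataretrieval import waterdata

# Two disjoint one-hour windows OR'd into a single request; the library
# auto-chunks the filter when the resulting URL would be too long.
df, _ = waterdata.get_continuous(
    monitoring_location_id="USGS-02238500",
    parameter_code="00060",
    filter=(
        "(time >= '2023-06-01T12:00:00Z' AND time <= '2023-06-01T13:00:00Z') "
        "OR (time >= '2023-06-15T12:00:00Z' AND time <= '2023-06-15T13:00:00Z')"
    ),
    filter_lang="cql-text",
)

# Pair discrete-measurement timestamps with the nearest continuous
# reading, all targets in one round-trip.
targets = pd.to_datetime(["2023-06-15T10:30:31Z", "2023-06-16T03:45:19Z"])
nearest, _ = waterdata.get_nearest_continuous(
    targets,
    monitoring_location_id="USGS-02238500",
    parameter_code="00060",
    window="PT15M",  # widen beyond the PT7M30S default for gap resilience
    on_tie="first",
)
```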
diff --git a/dataretrieval/waterdata/__init__.py b/dataretrieval/waterdata/__init__.py index 2110de83..f0df1f1d 100644 --- a/dataretrieval/waterdata/__init__.py +++ b/dataretrieval/waterdata/__init__.py @@ -25,6 +25,8 @@ get_stats_por, get_time_series_metadata, ) +from .filters import FILTER_LANG +from .nearest import get_nearest_continuous from .types import ( CODE_SERVICES, PROFILE_LOOKUP, @@ -34,6 +36,7 @@ __all__ = [ "CODE_SERVICES", + "FILTER_LANG", "PROFILES", "PROFILE_LOOKUP", "SERVICES", @@ -45,6 +48,7 @@ "get_latest_continuous", "get_latest_daily", "get_monitoring_locations", + "get_nearest_continuous", "get_reference_table", "get_samples", "get_stats_date_range", diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index b2310e7a..a3c0a94b 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -16,6 +16,7 @@ from requests.models import PreparedRequest from dataretrieval.utils import BaseMetadata, to_str +from dataretrieval.waterdata.filters import FILTER_LANG from dataretrieval.waterdata.types import ( CODE_SERVICES, METADATA_COLLECTIONS, @@ -51,6 +52,8 @@ def get_daily( time: str | list[str] | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data provide one data value to represent water conditions for the @@ -177,6 +180,11 @@ def get_daily( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (NA) will set the limit to the maximum allowable limit for the service. + filter, filter_lang : optional + Server-side CQL filter passed through as the OGC ``filter`` / + ``filter-lang`` query parameters. See + :mod:`dataretrieval.waterdata.filters` for syntax, auto-chunking, + and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -228,6 +236,8 @@ def get_continuous( last_modified: str | None = None, time: str | list[str] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """ @@ -348,6 +358,11 @@ def get_continuous( allowable limit is 10000. It may be beneficial to set this number lower if your internet connection is spotty. The default (NA) will set the limit to the maximum allowable limit for the service. + filter, filter_lang : optional + Server-side CQL filter passed through as the OGC ``filter`` / + ``filter-lang`` query parameters. See + :mod:`dataretrieval.waterdata.filters` for syntax, auto-chunking, + and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, the function will convert the data to dates and qualifier to string vector @@ -370,6 +385,21 @@ def get_continuous( ... parameter_code="00065", ... time="2021-01-01T00:00:00Z/2022-01-01T00:00:00Z", ... ) + + >>> # Pull several disjoint time windows in one call via a CQL + >>> # ``filter``. See ``dataretrieval.waterdata.filters`` for the + >>> # full grammar, auto-chunking, and pitfalls. + >>> df, md = dataretrieval.waterdata.get_continuous( + ... monitoring_location_id="USGS-02238500", + ... parameter_code="00060", + ... filter=( + ... "(time >= '2023-06-01T12:00:00Z' " + ... "AND time <= '2023-06-01T13:00:00Z') " + ... "OR (time >= '2023-06-15T12:00:00Z' " + ... "AND time <= '2023-06-15T13:00:00Z')" + ... ), + ... 
filter_lang="cql-text", + ... ) """ service = "continuous" output_id = "continuous_id" @@ -426,6 +456,8 @@ def get_monitoring_locations( time: str | list[str] | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """Location information is basic information about the monitoring location @@ -635,6 +667,11 @@ def get_monitoring_locations( The returning object will be a data frame with no spatial information. Note that the USGS Water Data APIs use camelCase "skipGeometry" in CQL2 queries. + filter, filter_lang : optional + Server-side CQL filter passed through as the OGC ``filter`` / + ``filter-lang`` query parameters. See + :mod:`dataretrieval.waterdata.filters` for syntax, auto-chunking, + and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -697,6 +734,8 @@ def get_time_series_metadata( time: str | list[str] | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data and continuous measurements are grouped into time series, @@ -851,6 +890,11 @@ def get_time_series_metadata( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (None) will set the limit to the maximum allowable limit for the service. + filter, filter_lang : optional + Server-side CQL filter passed through as the OGC ``filter`` / + ``filter-lang`` query parameters. See + :mod:`dataretrieval.waterdata.filters` for syntax, auto-chunking, + and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -903,6 +947,8 @@ def get_latest_continuous( time: str | list[str] | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """This endpoint provides the most recent observation for each time series @@ -1026,6 +1072,11 @@ def get_latest_continuous( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (None) will set the limit to the maximum allowable limit for the service. + filter, filter_lang : optional + Server-side CQL filter passed through as the OGC ``filter`` / + ``filter-lang`` query parameters. See + :mod:`dataretrieval.waterdata.filters` for syntax, auto-chunking, + and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -1075,6 +1126,8 @@ def get_latest_daily( time: str | list[str] | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """Daily data provide one data value to represent water conditions for the @@ -1200,6 +1253,11 @@ def get_latest_daily( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (None) will set the limit to the maximum allowable limit for the service. 
+ filter, filter_lang : optional + Server-side CQL filter passed through as the OGC ``filter`` / + ``filter-lang`` query parameters. See + :mod:`dataretrieval.waterdata.filters` for syntax, auto-chunking, + and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -1251,6 +1309,8 @@ def get_field_measurements( time: str | list[str] | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """Field measurements are physically measured values collected during a @@ -1366,6 +1426,11 @@ def get_field_measurements( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (None) will set the limit to the maximum allowable limit for the service. + filter, filter_lang : optional + Server-side CQL filter passed through as the OGC ``filter`` / + ``filter-lang`` query parameters. See + :mod:`dataretrieval.waterdata.filters` for syntax, auto-chunking, + and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -2017,6 +2082,8 @@ def get_channel( skip_geometry: bool | None = None, bbox: list[float] | None = None, limit: int | None = None, + filter: str | None = None, + filter_lang: FILTER_LANG | None = None, convert_type: bool = True, ) -> tuple[pd.DataFrame, BaseMetadata]: """ @@ -2123,6 +2190,11 @@ def get_channel( vertical_velocity_description, longitudinal_velocity_description, measurement_type, last_modified, channel_measurement_type. The default (NA) will return all columns of the data. + filter, filter_lang : optional + Server-side CQL filter passed through as the OGC ``filter`` / + ``filter-lang`` query parameters. See + :mod:`dataretrieval.waterdata.filters` for syntax, auto-chunking, + and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, the function will convert the data to dates and qualifier to string vector diff --git a/dataretrieval/waterdata/filters.py b/dataretrieval/waterdata/filters.py new file mode 100644 index 00000000..bcbe6ccc --- /dev/null +++ b/dataretrieval/waterdata/filters.py @@ -0,0 +1,518 @@ +"""CQL ``filter`` support for the Water Data OGC getters. + +Everything related to the ``filter`` / ``filter_lang`` kwargs lives in +this module: the ``FILTER_LANG`` type alias, the top-level ``OR`` +splitter / chunker, the per-request URL-budget probe, the +lexicographic-pitfall guard (see the module comment on +``_NUMERIC_COMPARE_RE`` for why), and the ``chunked`` decorator that +``utils.py`` applies to its single-request fetch function. + +Scope — what this module parses vs. what passes through +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The CQL-text filter the caller supplies is **forwarded verbatim** to +the server as a ``filter=`` query parameter. This module doesn't +parse CQL semantics; it inspects the string for exactly two +purposes: + +1. **Chunking** (``_chunk_cql_or``) splits the filter on top-level + ``OR`` boundaries so the full URL can stay under the server's + ~8 KB limit. Only ``OR`` splits cleanly into independent + sub-requests whose result sets can be union'd back together by + ``pd.concat`` + dedup. Everything else — ``AND`` chains, + ``NOT``, ``LIKE``, ``IS NULL``, spatial / temporal predicates, + function calls — is sent as a single request. 
If such a filter + is long enough to trip the URL limit, the caller gets the + server's ``414``; we don't try to rewrite it, because no rewrite + preserves set semantics for those shapes. + +2. **Pitfall detection** (``_check_numeric_filter_pitfall``) scans + for three patterns where the user almost certainly means a + numeric comparison but the server would do a lexicographic one + (every queryable is string-typed): + + - ``<field> <op> <numeric-literal>`` (and the reverse) + - ``<field> [NOT] IN (<numeric-literal>, ...)`` + - ``<field> [NOT] BETWEEN <numeric-literal> AND <numeric-literal>`` + + ``LIKE``, ``IS NULL``, function-call RHS (``COUNT(x) > 5``), + ``CAST`` expressions, and arithmetic (``value > 1 + 1`` — + partially caught on ``value > 1``) are not flagged. The server + doesn't support the first three anyway; the last is a minor + offense-text imprecision rather than a correctness issue. + +Isolation contract (rolling the feature back) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The feature's footprint outside this module is deliberately small +and mechanical: + +- Files to delete: ``filters.py``, ``nearest.py`` (which depends on + filters), ``tests/waterdata_filters_test.py``, + ``tests/waterdata_nearest_test.py``. +- ``__init__.py``: drop two imports (``FILTER_LANG``, + ``get_nearest_continuous``) and two ``__all__`` entries. +- ``utils.py``: drop the ``from . import filters`` import and the + ``@filters.chunked(...)`` decorator on ``_fetch_once``. The two + function bodies (``_fetch_once``, ``get_ogc_data``) are + filter-unaware and need no changes. +- ``api.py``: drop the ``from .filters import FILTER_LANG`` import, + the eight ``filter, filter_lang`` kwarg pairs on the OGC getters, + and their short docstring pointers (of the form + ``filter, filter_lang : optional — see dataretrieval.waterdata.filters``). + Also the one compact filter example inside ``get_continuous``'s + docstring. + +The two-line ``filter_lang`` → ``filter-lang`` URL-key translation +inside ``_construct_api_requests`` becomes unreachable dead code (no +caller sets it); removing it is optional. + +Only two names are imported by other modules — ``FILTER_LANG`` and +``chunked``. Everything else is package-private. +""" + +from __future__ import annotations + +import functools +import re +from collections.abc import Callable, Iterator +from typing import Any, Literal, TypeVar +from urllib.parse import quote_plus + +import pandas as pd +import requests + +# --------------------------------------------------------------------- +# Public types +# --------------------------------------------------------------------- + +FILTER_LANG = Literal["cql-text", "cql-json"] + + +# --------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------- + +# Conservative fallback budget (characters) for a single CQL ``filter`` +# query parameter, used when ``_chunk_cql_or`` is called without a +# ``max_len``. The ``chunked`` decorator computes a tighter +# per-request budget from ``_WATERDATA_URL_BYTE_LIMIT`` below. +_CQL_FILTER_CHUNK_LEN = 5000 + +# Total URL byte limit the Water Data API will accept before replying +# HTTP 414 (Request-URI Too Large). Empirically the cliff sits at +# ~8,200 bytes of full URL, which lines up with nginx's default +# ``large_client_header_buffers`` of 8 KB (8192). 8000 leaves ~200 bytes +# of headroom for request-line framing ("GET ... HTTP/1.1\r\n") and any +# intermediate proxy variance.
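+# Together with ``_NON_FILTER_URL_HEADROOM`` below, this implies any +# filter whose encoded form fits in 8000 - 1000 = 7000 bytes can skip +# the budget probe entirely (the fast path in ``_effective_filter_budget``).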
+_WATERDATA_URL_BYTE_LIMIT = 8000 + +# Conservative over-estimate of the URL bytes consumed by everything +# *except* the filter value — the base URL, other query params, and the +# ``&filter=`` / ``&filter-lang=...`` keys. Used only to decide whether a +# filter is small enough that the expensive budget probe can be skipped. +_NON_FILTER_URL_HEADROOM = 1000 + + +# --------------------------------------------------------------------- +# Pitfall regexes +# --------------------------------------------------------------------- + +# Every queryable property on every OGC collection for the Water Data +# API is ``type: string`` (confirmed across ``continuous``, ``daily``, +# ``field-measurements``, ``monitoring-locations``, +# ``time-series-metadata``, ``latest-continuous``, ``latest-daily``, +# ``channel-measurements`` — see ``/collections/<collection-id>/queryables``). +# That includes fields whose *values* look numeric — ``value``, +# ``parameter_code`` (``'00060'``), ``statistic_id`` (``'00011'``), +# ``district_code`` (``'01'``), ``hydrologic_unit_code``, +# ``channel_flow``, and more. Comparing any of them to an *unquoted* +# numeric literal (``value >= 1000``) at best draws an HTTP 500 from +# the server and at worst runs a lexicographic comparison that silently +# produces wrong results — zero-padded codes are especially nasty +# (``parameter_code = '60'`` matches nothing because the real values +# are all ``'00060'``-shaped). So the rule we enforce +# client-side is the general one: any ``<field> <op> <unquoted-number>`` +# is a bug — quote the literal or drop the comparison and +# filter in pandas. + +# Unquoted numeric literal: integer, decimal (with or without leading +# zero), or scientific notation. ``\d+(?:\.\d+)?`` covers ``1``, +# ``1.5``; ``\.\d+`` covers the leading-dot form ``.5`` that users +# sometimes write as a fraction. Trailing-dot ``5.`` is deliberately +# not accepted (not a common numeric spelling and not required by CQL). +_NUM = r"-?(?:\d+(?:\.\d+)?|\.\d+)(?:[eE][+-]?\d+)?" +_IDENT = r"[A-Za-z_]\w*" +_OP = r">=|<=|<>|!=|==|=|>|<" + +# ``<field>`` in ``IN`` / ``BETWEEN`` contexts, with an optional ``NOT`` +# keyword after the field. ``(?!NOT\b)`` keeps the bare keyword +# ``NOT`` from being captured as the field when the caller writes +# ``value NOT IN (…)``; the ``(?P<negated>NOT\s+)?`` after the field +# captures the negation so the error message can report the offending +# form accurately. +_FIELD_NEGATED = rf"\b(?!NOT\b)(?P<field>{_IDENT})\s+(?P<negated>NOT\s+)?" + +_NUMERIC_COMPARE_RE = re.compile( + rf""" + (?: + \b(?P<field1>{_IDENT})\s* + (?P<op1>{_OP})\s* + (?P<num1>{_NUM})\b + | + \b(?P<num2>{_NUM})\s* + (?P<op2>{_OP})\s* + (?P<field2>{_IDENT})\b + ) + """, + re.VERBOSE, +) + +# ``<field> [NOT] IN (<num>, ...)`` — same footgun as a simple +# comparison but using the list form. Caught separately because +# ``IN`` isn't one of the comparison operators in ``_OP``. +_IN_NUMERIC_RE = re.compile( + rf"{_FIELD_NEGATED}IN\s*\(\s*{_NUM}", + re.IGNORECASE, +) + +# ``<field> [NOT] BETWEEN <num> AND <num>`` — range form. +_BETWEEN_NUMERIC_RE = re.compile( + rf"{_FIELD_NEGATED}BETWEEN\s+{_NUM}\s+AND\s+{_NUM}\b", + re.IGNORECASE, +) + + +# --------------------------------------------------------------------- +# Top-level OR splitter / chunker +# --------------------------------------------------------------------- + + +def _iter_or_boundaries(expr: str) -> Iterator[tuple[int, int]]: + """Yield ``(start, end)`` spans of each top-level ``OR`` separator. + + Tracks single/double-quoted string literals and parenthesized + sub-expressions so that ``OR`` tokens inside them are skipped.
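+ For example, in ``a = '1' OR (b = '2' OR c = '3')`` only the + first ``OR`` yields a span.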
+ Matching is case-insensitive and the yielded span covers the + surrounding whitespace on both sides. + """ + depth = 0 + in_quote = None + i = 0 + n = len(expr) + while i < n: + ch = expr[i] + if in_quote is not None: + if ch == in_quote: + in_quote = None + i += 1 + continue + if ch in ("'", '"'): + in_quote = ch + i += 1 + continue + if ch == "(": + depth += 1 + i += 1 + continue + if ch == ")": + depth -= 1 + i += 1 + continue + if depth == 0 and ch.isspace(): + j = i + 1 + while j < n and expr[j].isspace(): + j += 1 + if j + 2 <= n and expr[j : j + 2].lower() == "or": + k = j + 2 + if k < n and expr[k].isspace(): + m = k + 1 + while m < n and expr[m].isspace(): + m += 1 + yield i, m + i = m + continue + i += 1 + + +def _split_top_level_or(expr: str) -> list[str]: + """Split a CQL expression at each top-level ``OR`` separator. + + Respects parentheses and single/double-quoted string literals so that + ``OR`` tokens inside ``(A OR B)`` or ``'word OR word'`` are left alone. + Matching is case-insensitive. Whitespace around each emitted part is + stripped; empty parts are dropped. + """ + parts = [] + last = 0 + for start, end in _iter_or_boundaries(expr): + parts.append(expr[last:start].strip()) + last = end + parts.append(expr[last:].strip()) + return [p for p in parts if p] + + +def _chunk_cql_or(expr: str, max_len: int = _CQL_FILTER_CHUNK_LEN) -> list[str]: + """Split a CQL expression into OR-chunks that each fit under ``max_len``. + + The splitter only understands top-level ``OR`` chains, since that is + the only shape that can be recombined losslessly as a disjunction of + independent sub-queries. Returns ``[expr]`` unchanged when the whole + expression already fits, when it contains no top-level ``OR``, or when + any single clause is larger than ``max_len`` on its own (we would + rather send a too-long request and surface the server's 414 than + silently drop data). + """ + if len(expr) <= max_len: + return [expr] + parts = _split_top_level_or(expr) + if len(parts) < 2 or any(len(p) > max_len for p in parts): + return [expr] + + chunks = [] + current = [] + current_len = 0 + for part in parts: + join_cost = len(" OR ") if current else 0 + if current and current_len + join_cost + len(part) > max_len: + chunks.append(" OR ".join(current)) + current = [part] + current_len = len(part) + else: + current.append(part) + current_len += join_cost + len(part) + if current: + chunks.append(" OR ".join(current)) + return chunks + + +# --------------------------------------------------------------------- +# Per-request URL-byte budget +# --------------------------------------------------------------------- + + +def _effective_filter_budget( + args: dict[str, Any], + filter_expr: str, + build_request: Callable[..., Any], +) -> int: + """Compute the raw CQL byte budget for ``filter_expr`` in this request. + + The server limits total URL length (see ``_WATERDATA_URL_BYTE_LIMIT``), + not raw CQL length. To derive a raw-byte budget we can hand to + ``_chunk_cql_or``: + + 1. Probe the URL space consumed by the other query params by building + the request with a 1-byte placeholder filter. + 2. Subtract from the URL limit to get the bytes available for the + encoded filter value. + 3. Convert back to raw CQL bytes using the *maximum* per-clause + encoding ratio, not the whole-filter average. A chunk can end up + containing only the heavier-encoding clauses (e.g. heavy ones + clustered at one end of the filter), so budgeting against the + average lets such a chunk overflow the URL limit by a few bytes. 
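+ + Worked example (illustrative numbers): with the 8,000-byte URL + limit, a probe showing 900 bytes of non-filter URL, and a worst + per-clause encoding ratio of 1.3, the budget is + ``max(100, int((8000 - 900) / 1.3)) == 5461`` raw CQL bytes.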
+ """ + # Fast path: if the whole encoded filter already fits with room for + # any plausible non-filter URL overhead, skip the probe and the + # splitter entirely. Signals pass-through via a budget larger than + # the filter. Saves a PreparedRequest build + a full splitter scan + # on every short-filter call. + encoded_len = len(quote_plus(filter_expr)) + if encoded_len + _NON_FILTER_URL_HEADROOM <= _WATERDATA_URL_BYTE_LIMIT: + return len(filter_expr) + 1 + + probe = build_request(**{**args, "filter": "x"}) + non_filter_url_bytes = len(probe.url) - 1 + available_url_bytes = _WATERDATA_URL_BYTE_LIMIT - non_filter_url_bytes + if available_url_bytes <= 0: + # The non-filter URL already exceeds the byte limit, so no chunk + # we could produce would fit. Return a budget larger than the + # filter so _chunk_cql_or passes it through unchanged — one 414 + # from the server is clearer than a burst of N failing sub-requests. + return len(filter_expr) + 1 + # ``_split_top_level_or`` already drops empty clauses, and + # ``filter_expr`` is non-empty here (guarded by ``_is_chunkable``), + # so every ``p`` below is non-empty. + parts = _split_top_level_or(filter_expr) or [filter_expr] + encoding_ratio = max(len(quote_plus(p)) / len(p) for p in parts) + return max(100, int(available_url_bytes / encoding_ratio)) + + +# --------------------------------------------------------------------- +# Lexicographic-pitfall guard +# --------------------------------------------------------------------- + + +def _check_numeric_filter_pitfall(filter_expr: str) -> None: + """Raise if the filter pairs any field with an unquoted numeric literal. + + Every queryable property on this API is typed as a string on the + server, so any numeric-looking comparison — ``value >= 1000``, + ``parameter_code = 60``, ``parameter_code IN (60, 61)``, + ``value BETWEEN 5 AND 10`` — either gets rejected with HTTP 500 + or silently produces lexicographic results. Zero-padded codes are + especially nasty (``parameter_code = '60'`` matches nothing because + the real codes are ``'00060'``-shaped). + + Explicit string comparisons with quoted literals + (``value >= '1000'``) are not flagged — the caller has signalled + they know the column is textual. + """ + # Blank out single-quoted string literals so ``name = 'value > 5'`` + # doesn't false-positive. The ``"'" in`` pre-check saves the + # allocation on the common auto-chunked case (many-target OR chains + # always contain quotes, but short ad-hoc filters often don't). + masked = ( + re.sub(r"'[^']*'", "''", filter_expr) if "'" in filter_expr else filter_expr + ) + + compare = _NUMERIC_COMPARE_RE.search(masked) + if compare: + field = compare.group("field1") or compare.group("field2") + offense = ( + f"{field} {compare.group('op1') or compare.group('op2')} " + f"{compare.group('num1') or compare.group('num2')}" + ) + _raise_pitfall(field, offense) + + membership = _IN_NUMERIC_RE.search(masked) + if membership: + field = membership.group("field") + op = "NOT IN" if membership.group("negated") else "IN" + _raise_pitfall(field, f"{field} {op} (…)") + + between = _BETWEEN_NUMERIC_RE.search(masked) + if between: + field = between.group("field") + op = "NOT BETWEEN" if between.group("negated") else "BETWEEN" + _raise_pitfall(field, f"{field} {op} …") + + +def _raise_pitfall(field: str, offense: str) -> None: + raise ValueError( + f"Filter uses an unquoted numeric comparison against {field!r} " + f"(``{offense}``). 
Every queryable on the Water Data API is " + f"typed as a string, so the server rejects unquoted numeric " + f"literals with HTTP 500; even quoting the literal gives a " + f"lexicographic comparison (``value > '10'`` matches " + f"``value='34.52'``, ``parameter_code = '60'`` matches nothing " + f"because the real codes are ``'00060'``-shaped). For a true " + f"numeric filter, fetch a wider result and reduce in pandas." + ) + + +# --------------------------------------------------------------------- +# Chunked fan-out (decorator) +# --------------------------------------------------------------------- + + +def _is_chunkable(filter_expr: Any, filter_lang: Any) -> bool: + """Only non-empty cql-text filters can be safely split at top-level OR.""" + return ( + isinstance(filter_expr, str) + and bool(filter_expr) + and filter_lang in {None, "cql-text"} + ) + + +def _combine_chunk_frames(frames: list[pd.DataFrame]) -> pd.DataFrame: + """Concatenate per-chunk frames, handling the edge cases. + + Drops empty frames before concat — ``_get_resp_data`` returns a + plain ``pd.DataFrame()`` on empty responses, which would downgrade + a concat of real GeoDataFrames back to a plain DataFrame and strip + geometry/CRS. Also dedups on the pre-rename feature ``id`` so + overlapping user-supplied OR-clauses don't produce duplicate rows + across chunks. + """ + non_empty = [f for f in frames if not f.empty] + if not non_empty: + return pd.DataFrame() + if len(non_empty) == 1: + return non_empty[0] + combined = pd.concat(non_empty, ignore_index=True) + if "id" in combined.columns: + combined = combined.drop_duplicates(subset="id", ignore_index=True) + return combined + + +def _aggregate_chunk_responses( + responses: list[requests.Response], +) -> requests.Response: + """Return a response whose URL/headers come from the first chunk and + whose ``elapsed`` is the sum across all chunks. + + Mutates the first response in place (adjusting only ``elapsed``) so + the caller can still wrap it in ``BaseMetadata`` as it would any + single-request response — the decorator's output shape matches the + undecorated function's output shape. + """ + metadata_response = responses[0] + if len(responses) > 1: + metadata_response.elapsed = sum( + (r.elapsed for r in responses[1:]), + start=metadata_response.elapsed, + ) + return metadata_response + + +_FetchOnce = TypeVar( + "_FetchOnce", + bound=Callable[[dict[str, Any]], tuple[pd.DataFrame, requests.Response]], +) + + +def chunked(*, build_request: Callable[..., Any]) -> Callable[[_FetchOnce], _FetchOnce]: + """Decorator that adds CQL-filter chunking to a single-request fetch. + + The wrapped function must have signature + ``(args: dict) -> (pd.DataFrame, requests.Response)`` and represent + one HTTP round-trip (build a request, walk its pages). The + decorator inspects ``args``: + + - If no chunkable filter is present, it calls the wrapped function + once and returns the result unchanged. + - If a chunkable cql-text filter is present, it validates the + filter against the lexicographic-comparison pitfall, splits it + into URL-length-safe sub-expressions, calls the wrapped function + once per chunk with ``{**args, "filter": chunk}``, concatenates + the resulting frames (dropping empties, dedup'ing by feature + ``id``), and returns an aggregated response (first chunk's + URL/headers + summed ``elapsed``). + + Either way the return type matches the wrapped function's — the + caller wraps the response in ``BaseMetadata`` the same way in + both paths. 
That's what lets the feature be removed by dropping + just the decorator line. + + ``build_request`` is injected so the decorator can probe URL + length for budget computation without importing any specific HTTP + builder. It receives the same kwargs the wrapped function's + ``args`` would, and returns a prepared-request-like object with a + ``.url`` attribute. + """ + + def decorator(fetch_once: _FetchOnce) -> _FetchOnce: + @functools.wraps(fetch_once) + def wrapper( + args: dict[str, Any], + ) -> tuple[pd.DataFrame, requests.Response]: + filter_expr = args.get("filter") + if not _is_chunkable(filter_expr, args.get("filter_lang")): + return fetch_once(args) + + _check_numeric_filter_pitfall(filter_expr) + budget = _effective_filter_budget(args, filter_expr, build_request) + chunks = _chunk_cql_or(filter_expr, max_len=budget) + + frames: list[pd.DataFrame] = [] + responses: list[requests.Response] = [] + for chunk in chunks: + frame, response = fetch_once({**args, "filter": chunk}) + frames.append(frame) + responses.append(response) + + return _combine_chunk_frames(frames), _aggregate_chunk_responses(responses) + + return wrapper # type: ignore[return-value] + + return decorator diff --git a/dataretrieval/waterdata/nearest.py b/dataretrieval/waterdata/nearest.py new file mode 100644 index 00000000..904eb40f --- /dev/null +++ b/dataretrieval/waterdata/nearest.py @@ -0,0 +1,263 @@ +"""Nearest-timestamp convenience on top of ``get_continuous``. + +This module exists purely for isolation: ``get_nearest_continuous`` +is built on top of the CQL ``filter`` passthrough (see +``dataretrieval/waterdata/filters.py``) and has no meaning without +it, so the two features live in two sibling modules that can be +deleted together. + +Rolling back the filter feature: + +- Delete ``dataretrieval/waterdata/filters.py``, + ``dataretrieval/waterdata/nearest.py``, and their test files. +- Drop the ``FILTER_LANG`` and ``get_nearest_continuous`` imports + from ``waterdata/__init__.py`` (two lines). +- Drop the ``filter`` / ``filter_lang`` kwargs from the eight OGC + getters in ``api.py``. + +Only one name is imported from this module — ``get_nearest_continuous`` +— and that import sits in ``__init__.py``. Everything else is +package-private. +""" + +from __future__ import annotations + +from typing import Literal + +import pandas as pd + +from dataretrieval.utils import BaseMetadata +from dataretrieval.waterdata.api import get_continuous + + +def get_nearest_continuous( + targets, + monitoring_location_id: str | list[str] | None = None, + parameter_code: str | list[str] | None = None, + *, + window: str | pd.Timedelta = "PT7M30S", + on_tie: Literal["first", "last", "mean"] = "first", + **kwargs, +) -> tuple[pd.DataFrame, BaseMetadata]: + """For each target timestamp, return the nearest continuous observation. + + Builds one bracketed ``(time >= t-window AND time <= t+window)`` clause + per target, joins them as a top-level CQL ``OR`` filter, and lets + ``get_continuous`` (with its auto-chunking) fetch every observation + that falls in any window. Then, per ``(monitoring_location_id, target)`` + pair, picks the single observation with the smallest ``|time - target|``. + + The USGS continuous endpoint matches ``time`` parameters exactly rather + than fuzzily, and it does not implement ``sortby`` for arbitrary fields; + this function is the single-round-trip way to ask "what reading is + nearest this timestamp?" for many timestamps at once. 
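+ + For two targets ``t1`` and ``t2`` with the default window, the + constructed filter is, schematically:: + + (time >= 't1 - 7m30s' AND time <= 't1 + 7m30s') + OR (time >= 't2 - 7m30s' AND time <= 't2 + 7m30s')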
+ + Parameters + ---------- + targets : list-like of datetime-convertible + Target timestamps. Naive datetimes are treated as UTC. Accepts a + list, ``pandas.Series``, ``pandas.DatetimeIndex``, ``numpy.ndarray``, + or anything ``pandas.to_datetime`` consumes. + monitoring_location_id : string or list of strings, optional + Forwarded to ``get_continuous``. + parameter_code : string or list of strings, optional + Forwarded to ``get_continuous``. + window : string or ``pandas.Timedelta``, default ``"PT7M30S"`` + Half-window around each target, as an ISO 8601 duration + (``"PT7M30S"``, ``"PT15M"``, ``"PT1H"``, etc.). Also accepts + any other form ``pandas.Timedelta`` parses — ``HH:MM:SS`` + (``"00:07:30"``), pandas shorthand (``"7min30s"``, + ``"450s"``), or a ``pd.Timedelta`` directly. See the + `pandas.Timedelta docs + <https://pandas.pydata.org/docs/reference/api/pandas.Timedelta.html>`_ + for the full grammar. + + Must be small enough that every target's window captures + roughly one observation at the service cadence. The default + matches a 15-minute continuous gauge; widen (e.g. + ``"PT15M"``) for irregular cadences or resilience to data + gaps. + on_tie : {"first", "last", "mean"}, default ``"first"`` + How to resolve ties when two observations are exactly equidistant + from a target (which happens when the target falls at the midpoint + between grid points — e.g. target ``10:22:30`` for a 15-minute + gauge). + + - ``"first"``: keep the earlier observation. + - ``"last"``: keep the later observation. + - ``"mean"``: average numeric columns; set the ``time`` column to + the target, since no real observation exists at the midpoint. + + **kwargs + Additional keyword arguments forwarded to ``get_continuous`` + (e.g. ``statistic_id``, ``approval_status``, ``properties``). + Passing ``time``, ``filter``, or ``filter_lang`` raises + ``TypeError`` — this function builds those itself. + + Returns + ------- + df : ``pandas.DataFrame`` + One row per ``(target, monitoring_location_id)`` combination that + had at least one observation in its window. Rows are augmented + with a ``target_time`` column indicating which target they + correspond to. Targets with no observations in their window are + silently dropped. + md : :class:`~dataretrieval.utils.BaseMetadata` + Metadata from the underlying ``get_continuous`` call. + + Notes + ----- + *Window sizing and ties.* When ``window`` is exactly half the service + cadence, most targets' windows contain a single observation and + ``on_tie`` is moot. Ties arise only when a target sits exactly at the + window edge — rare in practice but possible. Setting ``window`` to a + full cadence (or larger) guarantees at least one observation per + target in steady state at the cost of more bytes per response. + + *Why windowed CQL rather than sort+limit.* The API's advertised + ``sortby`` parameter would make this a one-liner per target (``filter`` + by ``time <= t`` and ``limit 1``), but it is per-query — you would need + one HTTP round-trip per target. The CQL ``OR``-chain approach folds + all N targets into one request (auto-chunked when the URL is long). + + Examples + -------- + .. code:: + + >>> import pandas as pd + >>> from dataretrieval import waterdata + + >>> # Pair three off-grid timestamps with nearby observations + >>> targets = pd.to_datetime( + ... [ + ... "2023-06-15T10:30:31Z", + ... "2023-06-15T14:07:12Z", + ... "2023-06-16T03:45:19Z", + ... ] + ... ) + >>> df, md = waterdata.get_nearest_continuous( + ... targets, + ... monitoring_location_id="USGS-02238500", + ... parameter_code="00060", + ...
) + + >>> # Widen the window for an irregular-cadence gauge + >>> df, md = waterdata.get_nearest_continuous( + ... targets, + ... monitoring_location_id="USGS-02238500", + ... parameter_code="00060", + ... window="PT30M", + ... on_tie="mean", + ... ) + """ + _check_nearest_kwargs(kwargs, on_tie) + targets = pd.DatetimeIndex(pd.to_datetime(targets, utc=True)) + window_td = pd.Timedelta(window) + + if len(targets) == 0: + raise ValueError("targets must contain at least one timestamp") + + filter_expr = _build_window_or_filter(targets, window_td) + df, md = get_continuous( + monitoring_location_id=monitoring_location_id, + parameter_code=parameter_code, + filter=filter_expr, + filter_lang="cql-text", + **kwargs, + ) + if df.empty: + return _empty_nearest_result(df), md + + df = df.assign(time=pd.to_datetime(df["time"], utc=True)) + site_groups = ( + df.groupby("monitoring_location_id", sort=False) + if "monitoring_location_id" in df.columns + else [(None, df)] + ) + + selected = [ + row + for _, site_df in site_groups + for target in targets + if (row := _pick_nearest_row(site_df, target, window_td, on_tie)) is not None + ] + if not selected: + return _empty_nearest_result(df), md + return pd.DataFrame(selected).reset_index(drop=True), md + + +_VALID_ON_TIE = ("first", "last", "mean") + + +def _check_nearest_kwargs(kwargs: dict, on_tie: str) -> None: + """Reject kwargs the helper owns; validate ``on_tie``.""" + for forbidden in ("time", "filter", "filter_lang"): + if forbidden in kwargs: + raise TypeError( + f"get_nearest_continuous constructs its own {forbidden!r}; " + "do not pass it directly" + ) + if on_tie not in _VALID_ON_TIE: + raise ValueError(f"on_tie must be one of {_VALID_ON_TIE}; got {on_tie!r}") + + +def _build_window_or_filter(targets: pd.DatetimeIndex, window_td: pd.Timedelta) -> str: + """Build the CQL OR-chain of ``time >= ... AND time <= ...`` windows. + + ``get_continuous`` auto-chunks the result if the full URL would + exceed the server's length limit, so this is always safe to build + as one string even for many targets. + """ + return " OR ".join( + f"(time >= '{(t - window_td).strftime('%Y-%m-%dT%H:%M:%SZ')}' " + f"AND time <= '{(t + window_td).strftime('%Y-%m-%dT%H:%M:%SZ')}')" + for t in targets + ) + + +def _pick_nearest_row( + site_df: pd.DataFrame, + target: pd.Timestamp, + window_td: pd.Timedelta, + on_tie: str, +) -> pd.Series | None: + """Return the single row within ``window_td`` of ``target``, or ``None``. + + Resolves ties (two rows equidistant from ``target``) per ``on_tie``. + The returned row carries a ``target_time`` column identifying which + target it was selected for. + """ + in_window = site_df[ + (site_df["time"] >= target - window_td) + & (site_df["time"] <= target + window_td) + ] + if in_window.empty: + return None + deltas = (in_window["time"] - target).abs() + candidates = in_window[deltas == deltas.min()].sort_values("time") + + if len(candidates) == 1 or on_tie == "first": + row = candidates.iloc[0].copy() + elif on_tie == "last": + row = candidates.iloc[-1].copy() + else: # "mean" — synthesize a row whose numeric cols are averaged and + # whose ``time`` is the target (no real observation sits at the midpoint). 
+ row = candidates.iloc[0].copy() + for col in candidates.select_dtypes("number").columns: + row[col] = candidates[col].mean() + row["time"] = target + + row["target_time"] = target + return row + + +def _empty_nearest_result(template: pd.DataFrame | None = None) -> pd.DataFrame: + """Empty frame with a ``target_time`` column, for no-match cases. + + When ``template`` is provided, preserve its columns/dtypes so the + returned frame matches the shape of a real ``get_continuous`` + response. + """ + base = pd.DataFrame() if template is None else template.iloc[0:0].copy() + base["target_time"] = pd.Series(dtype="datetime64[ns, UTC]") + return base diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index c58148d5..7070da50 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -13,6 +13,7 @@ from dataretrieval import __version__ from dataretrieval.utils import BaseMetadata +from dataretrieval.waterdata import filters from dataretrieval.waterdata.types import ( PROFILE_LOOKUP, PROFILES, @@ -419,6 +420,12 @@ def _construct_api_requests( if properties: params["properties"] = ",".join(properties) + # Translate CQL filter Python names to the hyphenated URL parameter that + # the OGC API expects. The Python kwarg is `filter_lang` because hyphens + # aren't valid in Python identifiers. + if "filter_lang" in params: + params["filter-lang"] = params.pop("filter_lang") + headers = _default_headers() if POST: @@ -817,34 +824,41 @@ def get_ogc_data( - Applies column cleanup and reordering based on service and properties. """ args = args.copy() - # Add service as an argument args["service"] = service - # Switch the input id to "id" if needed args = _switch_arg_id(args, id_name=output_id, service=service) + # Capture `properties` before the id-switch so post-processing sees + # the user-facing names, not the wire-format ones. properties = args.get("properties") - # Switch properties id to "id" if needed args["properties"] = _switch_properties_id( properties, id_name=output_id, service=service ) convert_type = args.pop("convert_type", False) - # Create fresh dictionary of args without any None values args = {k: v for k, v in args.items() if v is not None} - # Build API request - req = _construct_api_requests(**args) - # Run API request and iterate through pages if needed - return_list, response = _walk_pages(geopd=GEOPANDAS, req=req) - # Manage some aspects of the returned dataset - return_list = _deal_with_empty(return_list, properties, service) + return_list, response = _fetch_once(args) + return_list = _deal_with_empty(return_list, properties, service) if convert_type: return_list = _type_cols(return_list) - return_list = _arrange_cols(return_list, properties, output_id) - return_list = _sort_rows(return_list) - # Create metadata object from response - metadata = BaseMetadata(response) - return return_list, metadata + + return return_list, BaseMetadata(response) + + +@filters.chunked(build_request=_construct_api_requests) +def _fetch_once( + args: dict[str, Any], +) -> tuple[pd.DataFrame, requests.Response]: + """Send one prepared-args OGC request; return the frame + response. + + Filter chunking is added orthogonally by the ``@filters.chunked`` + decorator: with no filter (or an un-chunkable one) the decorator + passes ``args`` through to this body; with a chunkable filter it + fans out and calls this body once per sub-filter, then combines. + Either way the return shape is ``(frame, response)``. 
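+ + For example, a cql-text filter that splits into three chunks drives + three calls to this body; the decorator concatenates the three + frames and returns the first chunk's response with summed ``elapsed``.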
+ """ + req = _construct_api_requests(**args) + return _walk_pages(geopd=GEOPANDAS, req=req) def _handle_stats_nesting( diff --git a/tests/waterdata_filters_test.py b/tests/waterdata_filters_test.py new file mode 100644 index 00000000..21eb6c1b --- /dev/null +++ b/tests/waterdata_filters_test.py @@ -0,0 +1,589 @@ +from datetime import timedelta +from types import SimpleNamespace +from unittest import mock +from urllib.parse import parse_qs, urlsplit + +import pandas as pd +import pytest + +from dataretrieval.waterdata.filters import ( + _CQL_FILTER_CHUNK_LEN, + _WATERDATA_URL_BYTE_LIMIT, + _check_numeric_filter_pitfall, + _chunk_cql_or, + _effective_filter_budget, + _split_top_level_or, +) +from dataretrieval.waterdata.utils import _construct_api_requests + + +def _query_params(prepared_request): + return parse_qs(urlsplit(prepared_request.url).query) + + +def _fake_prepared_request(url="https://example.test"): + """Stand-in for the object ``_construct_api_requests`` returns.""" + return SimpleNamespace(url=url, method="GET", headers={}) + + +def _fake_response(url="https://example.test", elapsed_ms=1): + """Stand-in for the response object ``_walk_pages`` returns.""" + return SimpleNamespace( + url=url, + elapsed=timedelta(milliseconds=elapsed_ms), + headers={}, + ) + + +def _build_request(**kwargs): + """Wrapper that matches the ``build_request`` callable shape.""" + return _construct_api_requests(**kwargs) + + +def test_construct_filter_passthrough(): + """`filter` is forwarded verbatim as a query parameter.""" + expr = ( + "(time >= '2023-01-06T16:00:00Z' AND time <= '2023-01-06T18:00:00Z') " + "OR (time >= '2023-01-10T18:00:00Z' AND time <= '2023-01-10T20:00:00Z')" + ) + req = _construct_api_requests( + service="continuous", + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter=expr, + ) + qs = _query_params(req) + assert qs["filter"] == [expr] + + +def test_construct_filter_lang_hyphenated(): + """The Python kwarg `filter_lang` is sent as URL key `filter-lang`.""" + req = _construct_api_requests( + service="continuous", + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter="time >= '2023-01-01T00:00:00Z'", + filter_lang="cql-text", + ) + qs = _query_params(req) + assert qs["filter-lang"] == ["cql-text"] + assert "filter_lang" not in qs + + +def test_split_top_level_or_simple(): + parts = _split_top_level_or("A OR B OR C") + assert parts == ["A", "B", "C"] + + +def test_split_top_level_or_case_insensitive(): + assert _split_top_level_or("A or B Or C") == ["A", "B", "C"] + + +def test_split_top_level_or_respects_parens(): + assert _split_top_level_or("(A OR B) OR (C OR D)") == ["(A OR B)", "(C OR D)"] + + +def test_split_top_level_or_respects_quotes(): + expr = "name = 'foo OR bar' OR id = 1" + assert _split_top_level_or(expr) == ["name = 'foo OR bar'", "id = 1"] + + +def test_split_top_level_or_handles_doubled_quote_escape(): + """CQL text escapes a single quote inside a literal as ``''``. The + two quotes are adjacent, so the scanner's naive toggle-on-quote logic + happens to land back in the correct state with nothing between the + toggles to misclassify. 
Lock that behavior in so a future refactor + can't regress it.""" + cases = [ + ("name = 'O''Reilly OR Co' OR id = 1", ["name = 'O''Reilly OR Co'", "id = 1"]), + ("name = 'It''s' OR id = 1", ["name = 'It''s'", "id = 1"]), + ( + "name = 'alpha ''or'' beta' OR id = 1", + ["name = 'alpha ''or'' beta'", "id = 1"], + ), + ("'x'' OR ''y' OR id = 1", ["'x'' OR ''y'", "id = 1"]), + ] + for expr, expected in cases: + assert _split_top_level_or(expr) == expected, expr + + +def test_split_top_level_or_single_clause(): + assert _split_top_level_or("time >= '2023-01-01T00:00:00Z'") == [ + "time >= '2023-01-01T00:00:00Z'" + ] + + +def test_chunk_cql_or_short_passthrough(): + expr = "time >= '2023-01-01T00:00:00Z'" + assert _chunk_cql_or(expr, max_len=1000) == [expr] + + +def test_chunk_cql_or_splits_into_multiple(): + clause = "(time >= '2023-01-01T00:00:00Z' AND time <= '2023-01-01T00:30:00Z')" + expr = " OR ".join([clause] * 200) + chunks = _chunk_cql_or(expr, max_len=1000) + # each chunk must be under the budget + assert all(len(c) <= 1000 for c in chunks) + # rejoined chunks must cover every clause + rejoined_clauses = sum(len(c.split(" OR ")) for c in chunks) + assert rejoined_clauses == 200 + # and must be a valid OR chain (each chunk is itself a top-level OR of clauses) + assert len(chunks) > 1 + + +def test_chunk_cql_or_unsplittable_returns_input(): + big = "value > 0 AND " + ("A " * 4000) + assert _chunk_cql_or(big, max_len=1000) == [big] + + +def test_chunk_cql_or_single_clause_over_budget_returns_input(): + huge_clause = "(value > " + "9" * 6000 + ")" + expr = f"{huge_clause} OR (value > 0)" + assert _chunk_cql_or(expr, max_len=1000) == [expr] + + +@pytest.mark.parametrize( + "service", + [ + "daily", + "continuous", + "monitoring-locations", + "time-series-metadata", + "latest-continuous", + "latest-daily", + "field-measurements", + "channel-measurements", + ], +) +def test_construct_filter_on_all_ogc_services(service): + """Filter passthrough works uniformly for every OGC collection endpoint.""" + req = _construct_api_requests( + service=service, + filter="value > 0", + filter_lang="cql-text", + ) + qs = _query_params(req) + assert qs["filter"] == ["value > 0"] + assert qs["filter-lang"] == ["cql-text"] + + +def test_long_filter_fans_out_into_multiple_requests(): + """An oversized top-level OR filter triggers multiple HTTP requests + whose results are concatenated.""" + from dataretrieval.waterdata import get_continuous + + clause = ( + "(time >= '2023-01-{day:02d}T00:00:00Z' " + "AND time <= '2023-01-{day:02d}T00:30:00Z')" + ) + expr = " OR ".join(clause.format(day=(i % 28) + 1) for i in range(300)) + assert len(expr) > _CQL_FILTER_CHUNK_LEN + + sent_filters = [] + + def fake_construct_api_requests(**kwargs): + sent_filters.append(kwargs.get("filter")) + return _fake_prepared_request() + + def fake_walk_pages(*_args, **_kwargs): + idx = len(sent_filters) + frame = pd.DataFrame({"id": [f"chunk-{idx}"], "value": [idx]}) + return frame, _fake_response() + + with mock.patch( + "dataretrieval.waterdata.utils._construct_api_requests", + side_effect=fake_construct_api_requests, + ), mock.patch( + "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages + ), mock.patch( + "dataretrieval.waterdata.filters._effective_filter_budget", + return_value=_CQL_FILTER_CHUNK_LEN, + ): + df, _ = get_continuous( + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter=expr, + filter_lang="cql-text", + ) + + # Mocking _effective_filter_budget bypasses the URL-length probe, so 
+ # sent_filters contains only real chunk requests. Assert invariants: + # chunking happened, every original clause is preserved exactly once + # in order, each chunk stays under the budget, and the mock's + # one-row-per-chunk responses concatenate to a row per chunk. + expected_parts = _split_top_level_or(expr) + assert len(sent_filters) > 1 + rejoined_parts = [] + for chunk in sent_filters: + rejoined_parts.extend(_split_top_level_or(chunk)) + assert rejoined_parts == expected_parts + assert len(df) == len(sent_filters) + assert all(len(chunk) <= _CQL_FILTER_CHUNK_LEN for chunk in sent_filters) + + +def test_long_filter_deduplicates_cross_chunk_overlap(): + """Features returned by multiple chunks (same feature `id`) are + deduplicated in the concatenated result.""" + from dataretrieval.waterdata import get_continuous + + clause = ( + "(time >= '2023-01-{day:02d}T00:00:00Z' " + "AND time <= '2023-01-{day:02d}T00:30:00Z')" + ) + expr = " OR ".join(clause.format(day=(i % 28) + 1) for i in range(300)) + + call_count = {"n": 0} + + def fake_walk_pages(*_args, **_kwargs): + call_count["n"] += 1 + frame = pd.DataFrame({"id": ["shared-feature"], "value": [1]}) + return frame, _fake_response() + + with mock.patch( + "dataretrieval.waterdata.utils._construct_api_requests", + return_value=_fake_prepared_request(), + ), mock.patch( + "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages + ), mock.patch( + "dataretrieval.waterdata.filters._effective_filter_budget", + return_value=_CQL_FILTER_CHUNK_LEN, + ): + df, _ = get_continuous( + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter=expr, + filter_lang="cql-text", + ) + + # Chunking must have happened (otherwise dedup wouldn't be exercised). + assert call_count["n"] > 1 + # Even though each chunk returned a feature, dedup by id collapses them. + assert len(df) == 1 + + +def test_empty_chunks_do_not_downgrade_geodataframe(): + """A mix of empty and non-empty chunk responses must not downgrade a + GeoDataFrame-typed result to a plain DataFrame. ``_get_resp_data`` + returns ``pd.DataFrame()`` on empty responses, which would otherwise + strip geometry/CRS from the concatenated output.""" + pytest.importorskip("geopandas") + import geopandas as gpd + from shapely.geometry import Point + + from dataretrieval.waterdata import get_continuous + + clause = ( + "(time >= '2023-01-{day:02d}T00:00:00Z' " + "AND time <= '2023-01-{day:02d}T00:30:00Z')" + ) + expr = " OR ".join(clause.format(day=(i % 28) + 1) for i in range(300)) + + call_count = {"n": 0} + + def fake_walk_pages(*_args, **_kwargs): + call_count["n"] += 1 + # Chunk 2 returns empty; chunks 1 and 3 return GeoDataFrames. + if call_count["n"] == 2: + frame = pd.DataFrame() + else: + frame = gpd.GeoDataFrame( + {"id": [f"feat-{call_count['n']}"], "value": [call_count["n"]]}, + geometry=[Point(call_count["n"], call_count["n"])], + crs="EPSG:4326", + ) + return frame, _fake_response() + + with mock.patch( + "dataretrieval.waterdata.utils._construct_api_requests", + return_value=_fake_prepared_request(), + ), mock.patch( + "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages + ), mock.patch( + "dataretrieval.waterdata.filters._effective_filter_budget", + return_value=_CQL_FILTER_CHUNK_LEN, + ): + df, _ = get_continuous( + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter=expr, + filter_lang="cql-text", + ) + + # The empty chunk must not have stripped the GeoDataFrame type. 
+ assert isinstance(df, gpd.GeoDataFrame) + assert "geometry" in df.columns + assert df.crs is not None + + +def test_effective_filter_budget_respects_url_limit(): + """The computed budget, once encoded, fits within the URL byte limit + alongside the other query params.""" + from urllib.parse import quote_plus + + filter_expr = "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')" + args = { + "service": "continuous", + "monitoring_location_id": "USGS-02238500", + "parameter_code": "00060", + "filter": filter_expr, + "filter_lang": "cql-text", + } + raw_budget = _effective_filter_budget(args, filter_expr, _build_request) + + # Build a chunk exactly at the raw budget (padded with the clause repeated) + # and confirm the full URL it produces stays under the URL byte limit. + padded = (" OR ".join([filter_expr] * 200))[:raw_budget] + req = _construct_api_requests(**{**args, "filter": padded}) + assert len(req.url) <= _WATERDATA_URL_BYTE_LIMIT + # And the budget scales inversely with encoding ratio (sanity). + assert raw_budget < _WATERDATA_URL_BYTE_LIMIT + # Quick sanity on the encoding math itself. + assert len(quote_plus(padded)) <= _WATERDATA_URL_BYTE_LIMIT + + +def test_effective_filter_budget_uses_max_clause_ratio(): + """Heavy clauses clustered in one part of the filter must not be able + to push any chunk over the URL limit. The budget is computed against + the max per-clause encoding ratio, not the whole-filter average, so + a chunk of only-heaviest-clauses still fits.""" + from urllib.parse import quote_plus + + heavy = ( + "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z' " + "AND approval_status IN ('Approved','Provisional','Revised'))" + ) + light = "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')" + # Heavy ratio < light ratio for these shapes; cluster them at opposite + # ends so the chunker must produce at least one light-only chunk. + clauses = [heavy] * 100 + [light] * 400 + expr = " OR ".join(clauses) + args = { + "service": "continuous", + "monitoring_location_id": "USGS-02238500", + "filter": expr, + "filter_lang": "cql-text", + } + budget = _effective_filter_budget(args, expr, _build_request) + chunks = _chunk_cql_or(expr, max_len=budget) + assert len(chunks) > 1 + + # Every chunk, once built into a full request, fits under the URL byte + # limit — even the all-light chunks that have a higher-than-average ratio. + for chunk in chunks: + req = _construct_api_requests(**{**args, "filter": chunk}) + assert len(req.url) <= _WATERDATA_URL_BYTE_LIMIT, ( + f"chunk url {len(req.url)} exceeds {_WATERDATA_URL_BYTE_LIMIT}" + ) + + # Budget should be tight enough that a chunk of only-light clauses + # (the heavier-encoding shape here) still fits. + assert len(quote_plus(light)) * (budget // len(light)) < _WATERDATA_URL_BYTE_LIMIT + + +def test_effective_filter_budget_passes_through_when_no_url_space(): + """If the non-filter URL already exceeds the byte limit, chunking + cannot make the request succeed. 
The budget helper should signal + pass-through (return a budget larger than the filter) so + ``_chunk_cql_or`` emits one chunk — one 414 from the server is + clearer than a burst of N guaranteed-414 sub-requests.""" + expr = " OR ".join( + ["(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')"] * 50 + ) + fake_build = mock.Mock( + return_value=_fake_prepared_request(url="https://example.test/" + "A" * 9000) + ) + budget = _effective_filter_budget({"filter": expr}, expr, fake_build) + # Budget is large enough that _chunk_cql_or returns the expression + # unchanged (passthrough) rather than producing many small chunks. + assert budget > len(expr) + assert _chunk_cql_or(expr, max_len=budget) == [expr] + + +def test_effective_filter_budget_shrinks_with_more_url_params(): + """Adding more scalar query params consumes URL bytes and should + shrink the raw filter budget accordingly. Use a filter large enough + to skip the short-circuit fast path so the probe actually runs.""" + clause = "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')" + expr = " OR ".join([clause] * 100) + sparse_args = { + "service": "continuous", + "monitoring_location_id": "USGS-02238500", + "filter": expr, + "filter_lang": "cql-text", + } + dense_args = { + **sparse_args, + "parameter_code": "00060", + "statistic_id": "00003", + "last_modified": "2023-01-01T00:00:00Z/2023-12-31T23:59:59Z", + } + sparse_budget = _effective_filter_budget(sparse_args, expr, _build_request) + dense_budget = _effective_filter_budget(dense_args, expr, _build_request) + assert dense_budget < sparse_budget + + +def test_cql_json_filter_is_not_chunked(): + """Chunking applies only to cql-text; cql-json is passed through unchanged.""" + from dataretrieval.waterdata import get_continuous + + clause = "(time >= '2023-01-01T00:00:00Z' AND time <= '2023-01-01T00:30:00Z')" + expr = " OR ".join([clause] * 300) + sent_filters = [] + + def fake_construct_api_requests(**kwargs): + sent_filters.append(kwargs.get("filter")) + return _fake_prepared_request() + + with mock.patch( + "dataretrieval.waterdata.utils._construct_api_requests", + side_effect=fake_construct_api_requests, + ), mock.patch( + "dataretrieval.waterdata.utils._walk_pages", + return_value=( + pd.DataFrame({"id": ["row-1"], "value": [1]}), + _fake_response(), + ), + ): + get_continuous( + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter=expr, + filter_lang="cql-json", + ) + + assert sent_filters == [expr] + + +@pytest.mark.parametrize( + "expr", + [ + # The motivating case — numeric-valued string field + "value >= 1000", + "value > 1000", + "value <= 1000", + "value < 1000", + "value = 1000", + "value != 1000", + "value >= 1000.5", + "value >= -50", + # Zero-padded codes: `parameter_code = 60` matches nothing + # because the real values are all `'00060'`-shaped + "parameter_code = 60", + "statistic_id = 11", + "district_code = 1", + "county_code != 0", + "hydrologic_unit_code = 20301030401", + # Channel-measurements numeric-looking string fields + "channel_flow > 500", + "channel_velocity >= 1.5", + # Scientific notation — floats expressed as 1e5, 1.5e-3 + "value > 1e5", + "value >= 2.5E+3", + "value < 1.5e-3", + # Leading-dot decimals (``.5`` is a fraction, not a typo) + "value > .5", + "value >= -.5", + "value < .5e-3", + # ``IN`` list form — same footgun, common pattern for codes + "parameter_code IN (60, 61)", + "value IN (10, 20, 30)", + "statistic_id in (11)", # case-insensitive, single-element + # ``NOT IN`` with numbers — same 
footgun via negation + "value NOT IN (1, 2, 3)", + "parameter_code not in (60, 61)", + # ``BETWEEN`` range form — same footgun + "value BETWEEN 5 AND 10", + "channel_flow between 100 and 500", + # ``NOT BETWEEN`` with numbers + "value NOT BETWEEN 0 AND 100", + "channel_flow not between 50 and 150", + # Composite expressions + "time >= '2023-01-01T00:00:00Z' AND value >= 1000", + "value > 1000 OR value < 0", + "parameter_code = 60 AND statistic_id = 11", + # Reverse (literal on the left) + "1000 <= value", + "60 = parameter_code", + ], +) +def test_check_numeric_filter_pitfall_raises(expr): + """Unquoted numeric comparisons against any field resolve + lexicographically on this API — every queryable is string-typed — + so reject them with a clear message before the request is sent.""" + with pytest.raises(ValueError, match="lexicographic"): + _check_numeric_filter_pitfall(expr) + + +@pytest.mark.parametrize( + "expr", + [ + # Quoted literals — caller has opted into string comparison + "value >= '1000'", + "value = '42.5'", + "parameter_code = '00060'", + "district_code = '01'", + "hydrologic_unit_code = '020301030401'", + # Pure string comparisons + "time >= '2023-01-01T00:00:00Z' AND time <= '2023-01-02T00:00:00Z'", + "monitoring_location_id = 'USGS-02238500'", + "approval_status = 'Approved'", + "qualifier IN ('A', 'P')", + "parameter_code IN ('00060', '00065')", + "value BETWEEN '1' AND '9'", + # Footgun identifiers appearing only inside string literals + "monitoring_location_id = 'USGS-value >= 1000'", + "name = 'why I care about parameter_code = 60'", + "note = 'see district_code = 1 in docs'", + "note = 'quoted: value IN (10, 20) within literal'", + # Multi-clause where every comparison is quoted + "parameter_code = '00060' AND statistic_id = '00011'", + # CQL escape-quote (``O''Reilly``) within a quoted literal + "name = 'O''Reilly 1000'", + # Identifiers that start with "NOT" (e.g. 
``NOTES``) must not be + # mistakenly treated as the CQL negation keyword + "NOTES = 'hello'", + "NOTE_VAL LIKE 'anything%'", + ], +) +def test_check_numeric_filter_pitfall_allows(expr): + """Quoted literals and comparisons that don't pair a field with an + unquoted numeric literal must not trigger the check.""" + _check_numeric_filter_pitfall(expr) # must not raise + + +@pytest.mark.parametrize( + "expr,field,op", + [ + ("value NOT IN (1, 2)", "value", "NOT IN"), + ("parameter_code NOT IN (60, 61)", "parameter_code", "NOT IN"), + ("value IN (1, 2)", "value", "IN"), + ("value NOT BETWEEN 0 AND 10", "value", "NOT BETWEEN"), + ("channel_flow between 100 and 500", "channel_flow", "BETWEEN"), + ], +) +def test_pitfall_error_names_real_field_not_NOT_keyword(expr, field, op): + """The CQL keyword ``NOT`` must not be reported as the offending field + — the error should identify the actual column and include ``NOT`` as + part of the operator form so the caller knows what to quote.""" + with pytest.raises(ValueError) as exc: + _check_numeric_filter_pitfall(expr) + msg = str(exc.value) + assert f"against {field!r}" in msg, msg + assert op.upper() in msg.upper(), msg + + +def test_get_continuous_surfaces_pitfall_to_caller(): + """End-to-end: the check runs at the ``get_continuous`` boundary, + not as a deep internal-only protection, so callers see the error + before any HTTP traffic.""" + from dataretrieval.waterdata import get_continuous + + with mock.patch("dataretrieval.waterdata.utils._construct_api_requests") as build: + with pytest.raises(ValueError, match="lexicographic"): + get_continuous( + monitoring_location_id="USGS-02238500", + parameter_code="00060", + filter="value >= 1000", + filter_lang="cql-text", + ) + build.assert_not_called() diff --git a/tests/waterdata_nearest_test.py b/tests/waterdata_nearest_test.py new file mode 100644 index 00000000..4dc0ab9d --- /dev/null +++ b/tests/waterdata_nearest_test.py @@ -0,0 +1,267 @@ +"""Tests for ``waterdata.get_nearest_continuous``. + +All network interaction is mocked at the ``get_continuous`` boundary, so +these run without an API key and without touching the USGS servers. 
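+
+A representative call shape, as exercised by the tests below (every
+argument shown is a real parameter of ``get_nearest_continuous``):
+
+    df, md = get_nearest_continuous(
+        ["2023-06-15T10:30:31Z"],
+        monitoring_location_id="USGS-02238500",
+        parameter_code="00060",
+        window="PT7M30S",
+        on_tie="first",
+    )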
+""" + +from unittest import mock + +import pandas as pd +import pytest + +from dataretrieval.waterdata.nearest import get_nearest_continuous + + +def _fake_df(rows): + """Build a minimal continuous-response-shaped DataFrame.""" + return pd.DataFrame( + { + "time": pd.to_datetime([r["time"] for r in rows], utc=True), + "value": [r["value"] for r in rows], + "monitoring_location_id": [r.get("site", "USGS-02238500") for r in rows], + } + ) + + +@pytest.fixture +def patch_get_continuous(): + """Replace ``waterdata.api.get_continuous`` with a controllable stub.""" + with mock.patch("dataretrieval.waterdata.nearest.get_continuous") as m: + yield m + + +def test_returns_nearest_per_target(patch_get_continuous): + targets = pd.to_datetime(["2023-06-15T10:30:31Z", "2023-06-15T10:45:16Z"], utc=True) + patch_get_continuous.return_value = ( + _fake_df( + [ + {"time": "2023-06-15T10:30:00Z", "value": 22.4}, + {"time": "2023-06-15T10:45:00Z", "value": 22.5}, + ] + ), + mock.Mock(), + ) + result, _ = get_nearest_continuous( + targets, + monitoring_location_id="USGS-02238500", + parameter_code="00060", + ) + assert len(result) == 2 + assert list(result["value"]) == [22.4, 22.5] + assert list(result["target_time"]) == list(targets) + + +def test_builds_one_or_clause_per_target(patch_get_continuous): + targets = pd.to_datetime(["2023-06-15T10:30:00Z", "2023-06-16T12:00:00Z"], utc=True) + patch_get_continuous.return_value = (_fake_df([]), mock.Mock()) + get_nearest_continuous( + targets, + monitoring_location_id="USGS-02238500", + parameter_code="00060", + window="PT7M30S", + ) + _, kwargs = patch_get_continuous.call_args + filter_expr = kwargs["filter"] + assert kwargs["filter_lang"] == "cql-text" + # Two windows — one top-level OR separator + assert filter_expr.count(") OR (") == 1 + # Each target produces >= and <= bounds + assert filter_expr.count("time >= '") == 2 + assert filter_expr.count("time <= '") == 2 + # Lower bound of the first window is 7:30 before the target + assert "'2023-06-15T10:22:30Z'" in filter_expr + assert "'2023-06-15T10:37:30Z'" in filter_expr + + +def test_tie_first_keeps_earlier(patch_get_continuous): + # Target at the midpoint between two grid points + targets = pd.to_datetime(["2023-06-15T10:22:30Z"], utc=True) + patch_get_continuous.return_value = ( + _fake_df( + [ + {"time": "2023-06-15T10:15:00Z", "value": 22.0}, + {"time": "2023-06-15T10:30:00Z", "value": 22.4}, + ] + ), + mock.Mock(), + ) + result, _ = get_nearest_continuous( + targets, + monitoring_location_id="USGS-02238500", + on_tie="first", + window="PT7M30S", + ) + assert len(result) == 1 + assert result.iloc[0]["value"] == 22.0 + assert result.iloc[0]["time"] == pd.Timestamp("2023-06-15T10:15:00Z") + + +def test_tie_last_keeps_later(patch_get_continuous): + targets = pd.to_datetime(["2023-06-15T10:22:30Z"], utc=True) + patch_get_continuous.return_value = ( + _fake_df( + [ + {"time": "2023-06-15T10:15:00Z", "value": 22.0}, + {"time": "2023-06-15T10:30:00Z", "value": 22.4}, + ] + ), + mock.Mock(), + ) + result, _ = get_nearest_continuous( + targets, + monitoring_location_id="USGS-02238500", + on_tie="last", + window="PT7M30S", + ) + assert result.iloc[0]["value"] == 22.4 + assert result.iloc[0]["time"] == pd.Timestamp("2023-06-15T10:30:00Z") + + +def test_tie_mean_averages_numeric_and_uses_target_time(patch_get_continuous): + targets = pd.to_datetime(["2023-06-15T10:22:30Z"], utc=True) + patch_get_continuous.return_value = ( + _fake_df( + [ + {"time": "2023-06-15T10:15:00Z", "value": 22.0}, + {"time": 
"2023-06-15T10:30:00Z", "value": 22.4}, + ] + ), + mock.Mock(), + ) + result, _ = get_nearest_continuous( + targets, + monitoring_location_id="USGS-02238500", + on_tie="mean", + window="PT7M30S", + ) + assert result.iloc[0]["value"] == pytest.approx(22.2) + # Time is set to the target since no real observation sits at the midpoint + assert result.iloc[0]["time"] == targets[0] + + +def test_target_without_observations_is_dropped(patch_get_continuous): + targets = pd.to_datetime(["2023-06-15T10:30:31Z", "2023-07-15T10:30:31Z"], utc=True) + # Only the June target has nearby data; July returns nothing. + patch_get_continuous.return_value = ( + _fake_df([{"time": "2023-06-15T10:30:00Z", "value": 22.4}]), + mock.Mock(), + ) + result, _ = get_nearest_continuous(targets, monitoring_location_id="USGS-02238500") + assert len(result) == 1 + assert result.iloc[0]["target_time"] == targets[0] + + +def test_multi_site_returns_row_per_target_per_site(patch_get_continuous): + targets = pd.to_datetime(["2023-06-15T10:30:31Z"], utc=True) + patch_get_continuous.return_value = ( + _fake_df( + [ + {"time": "2023-06-15T10:30:00Z", "value": 22.4, "site": "USGS-1"}, + {"time": "2023-06-15T10:30:00Z", "value": 99.9, "site": "USGS-2"}, + ] + ), + mock.Mock(), + ) + result, _ = get_nearest_continuous( + targets, + monitoring_location_id=["USGS-1", "USGS-2"], + parameter_code="00060", + ) + assert len(result) == 2 + assert set(result["monitoring_location_id"]) == {"USGS-1", "USGS-2"} + + +def test_empty_targets_raises(patch_get_continuous): + """An empty ``targets`` is a call with no useful work to do and + almost always a caller bug — raise rather than silently issuing a + no-op HTTP request.""" + with pytest.raises(ValueError, match="targets"): + get_nearest_continuous([], monitoring_location_id="USGS-02238500") + patch_get_continuous.assert_not_called() + + +def test_rejects_time_kwarg(patch_get_continuous): + with pytest.raises(TypeError, match="time"): + get_nearest_continuous( + [pd.Timestamp("2023-06-15", tz="UTC")], + monitoring_location_id="USGS-02238500", + time="2023-06-01/2023-07-01", + ) + + +def test_rejects_filter_kwarg(patch_get_continuous): + with pytest.raises(TypeError, match="filter"): + get_nearest_continuous( + [pd.Timestamp("2023-06-15", tz="UTC")], + monitoring_location_id="USGS-02238500", + filter="x = 1", + ) + + +def test_rejects_invalid_on_tie(patch_get_continuous): + with pytest.raises(ValueError, match="on_tie"): + get_nearest_continuous( + [pd.Timestamp("2023-06-15", tz="UTC")], + monitoring_location_id="USGS-02238500", + on_tie="random", + ) + + +def test_accepts_naive_datetimes_as_utc(patch_get_continuous): + """Naive inputs must be treated as UTC (matching pandas default).""" + naive = [pd.Timestamp("2023-06-15T10:30:00")] + patch_get_continuous.return_value = ( + _fake_df([{"time": "2023-06-15T10:30:00Z", "value": 22.4}]), + mock.Mock(), + ) + result, _ = get_nearest_continuous(naive, monitoring_location_id="USGS-02238500") + assert len(result) == 1 + + +def test_accepts_list_of_strings(patch_get_continuous): + patch_get_continuous.return_value = ( + _fake_df([{"time": "2023-06-15T10:30:00Z", "value": 22.4}]), + mock.Mock(), + ) + result, _ = get_nearest_continuous( + ["2023-06-15T10:30:31Z"], monitoring_location_id="USGS-02238500" + ) + assert len(result) == 1 + + +@pytest.mark.parametrize( + "window", + [ + "00:07:30", # HH:MM:SS + "7min30s", # pandas shorthand + "450s", # seconds shorthand + "PT7M30S", # ISO 8601 duration + pd.Timedelta(minutes=7, seconds=30), # Timedelta object 
+    ],
+)
+def test_window_accepts_any_pandas_timedelta_form(patch_get_continuous, window):
+    """Every representation ``pandas.Timedelta`` parses must produce the
+    same CQL filter. Documents the public contract: ``window`` may be
+    anything ``pd.Timedelta`` accepts."""
+    targets = pd.to_datetime(["2023-06-15T10:30:00Z"], utc=True)
+    patch_get_continuous.return_value = (_fake_df([]), mock.Mock())
+
+    get_nearest_continuous(targets, monitoring_location_id="USGS-1", window=window)
+    filter_expr = patch_get_continuous.call_args.kwargs["filter"]
+    # Bounds sit 7:30 either side of the target regardless of input spelling.
+    assert "'2023-06-15T10:22:30Z'" in filter_expr
+    assert "'2023-06-15T10:37:30Z'" in filter_expr
+
+
+def test_forwards_kwargs_to_get_continuous(patch_get_continuous):
+    patch_get_continuous.return_value = (_fake_df([]), mock.Mock())
+    get_nearest_continuous(
+        [pd.Timestamp("2023-06-15", tz="UTC")],
+        monitoring_location_id="USGS-02238500",
+        parameter_code="00060",
+        statistic_id="00011",
+        approval_status="Approved",
+    )
+    _, kwargs = patch_get_continuous.call_args
+    assert kwargs["statistic_id"] == "00011"
+    assert kwargs["approval_status"] == "Approved"
diff --git a/tests/waterdata_utils_test.py b/tests/waterdata_utils_test.py
index 772f75b7..36150be8 100644
--- a/tests/waterdata_utils_test.py
+++ b/tests/waterdata_utils_test.py
@@ -2,7 +2,10 @@
 
 import requests
 
-from dataretrieval.waterdata.utils import _get_args, _walk_pages
+from dataretrieval.waterdata.utils import (
+    _get_args,
+    _walk_pages,
+)
 
 
 def test_get_args_basic():