Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export(read_waterdata_latest_continuous)
export(read_waterdata_latest_daily)
export(read_waterdata_metadata)
export(read_waterdata_monitoring_location)
export(read_waterdata_nearest_continuous)
export(read_waterdata_parameter_codes)
export(read_waterdata_samples)
export(read_waterdata_stats_daterange)
Expand Down
15 changes: 15 additions & 0 deletions NEWS
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
dataRetrieval 2.7.24
===================
* Let dataRetrieval take care of chunking up requests by monitoring_location_id.
* Added `read_waterdata_nearest_continuous()` which, given a vector of
target timestamps, returns the single continuous observation nearest
each target, fetched in one HTTP round-trip (auto-chunked when the
underlying CQL filter gets long). Knobs include `window` (half-window
around each target, default 7.5 minutes to match the 15-minute
continuous cadence) and `on_tie` (`"first"` / `"last"` / `"mean"`).
* Added `filter` and `filter_lang` arguments to the OGC `read_waterdata_*`
functions (continuous, daily, field_measurements, monitoring_location,
ts_meta, latest_continuous, latest_daily, channel). These are forwarded
to the OGC API's CQL `filter` / `filter-lang` query parameters, enabling
server-side filtering that isn't expressible via the other arguments --
most commonly, OR'ing multiple time ranges into a single request. For
`cql-text` filters, a long top-level `OR` chain is transparently split
into multiple sub-requests sized to stay under the server's 8 KB URL
byte limit; results are concatenated and deduplicated by id.

dataRetrieval 2.7.23
===================
Expand Down
26 changes: 25 additions & 1 deletion R/construct_api_requests.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,21 @@
#' each feature. The returning object will be a data frame with no spatial
#' information.
#' @keywords internal
#' @details
#' Two arguments forwarded via `...` deserve a callout:
#' \itemize{
#' \item `filter` — a CQL text or JSON expression passed through to the
#' OGC API `filter` query parameter. Commonly used to OR several time
#' ranges into a single request. At the time of writing the server
#' accepts `cql-text` (default) and `cql-json`; `cql2-text` /
#' `cql2-json` are not yet supported. A long expression made up of a
#' top-level `OR` chain is automatically split into multiple requests
#' that each fit under the server's URI length limit (handled in
#' `get_ogc_data`); the results are concatenated and deduplicated by id.
#' \item `filter_lang` — language of the `filter` expression, for
#' example `cql-text` (default) or `cql-json`. Sent as `filter-lang`
#' in the URL.
#' }
#' @examples
#' site <- "USGS-02238500"
#' pcode <- "00060"
Expand Down Expand Up @@ -48,6 +63,13 @@ construct_api_requests <- function(

full_list <- list(...)

# Translate the Python-style underscore name to the hyphenated URL key
# the OGC API expects. R doesn't allow hyphens in formal argument names,
# so callers pass `filter_lang`; the URL needs `filter-lang`.
if ("filter_lang" %in% names(full_list)) {
names(full_list)[names(full_list) == "filter_lang"] <- "filter-lang"
}

time_periods <- c(
"last_modified",
"datetime",
Expand Down Expand Up @@ -76,7 +98,9 @@ construct_api_requests <- function(
"time",
"limit",
"begin_utc",
"end_utc"
"end_utc",
"filter",
"filter-lang"
)

comma_params <- c(
Expand Down
281 changes: 271 additions & 10 deletions R/get_ogc_data.R
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,12 @@ get_ogc_data <- function(args, output_id, service, ..., chunk_size = 250) {
args[["convertType"]] <- NULL
args[["service"]] <- service

req <- do.call(construct_api_requests, args)

chunks <- plan_filter_chunks(args)
fetched <- fetch_chunks(args, chunks)
return_list <- combine_chunk_frames(fetched$frames)
req <- fetched$req
no_paging <- grepl("f=csv", req$url)

message("Requesting:\n", req$url)

if (no_paging) {
return_list <- get_csv(req, limit = args[["limit"]])
} else {
return_list <- walk_pages(req)
}

if (is.na(args[["skipGeometry"]])) {
skipGeometry <- FALSE
} else {
Expand Down Expand Up @@ -179,3 +173,270 @@ switch_properties_id <- function(properties, id) {

return(properties)
}

# Conservative fallback budget (characters) for a single CQL `filter` query
# parameter. Used only when `chunk_cql_or` is called directly without an
# explicit `max_len`; `get_ogc_data` instead derives a tighter per-request
# budget from `.WATERDATA_URL_BYTE_LIMIT` below (see
# `effective_filter_budget`).
.CQL_FILTER_CHUNK_LEN <- 5000L

# Total URL byte limit the Water Data API will accept before replying
# HTTP 414 (Request-URI Too Large). Empirically the cliff sits at
# ~8,200 bytes of full URL, which lines up with nginx's default
# `large_client_header_buffers` of 8 KB (8192). 8000 leaves ~200 bytes of
# headroom for request-line framing and any intermediate proxy variance.
.WATERDATA_URL_BYTE_LIMIT <- 8000L


#' Decide how to fan `args[["filter"]]` out across HTTP calls
#'
#' Produces one list entry per sub-request to send. A `NULL` entry means
#' "send `args` unmodified": either no filter was supplied, or its
#' language is not one we can safely split (only cql-text top-level `OR`
#' chains are chunkable). Character entries are chunked cql-text
#' expressions, each substituted for `args[["filter"]]` in its own
#' sub-request. Rows duplicated by overlapping user OR-clauses are
#' removed later, by feature id, in [combine_chunk_frames].
#'
#' @noRd
plan_filter_chunks <- function(args) {
  cql <- args[["filter"]]
  lang <- args[["filter_lang"]]
  # The language test stays last in the && chain so it is only evaluated
  # once the filter itself is known to be a single non-empty string.
  splittable <- is.character(cql) &&
    length(cql) == 1L &&
    !is.na(cql) &&
    nzchar(cql) &&
    (is.null(lang) ||
      is.na(lang) ||
      identical(lang, "cql-text"))
  if (!splittable) {
    return(list(NULL))
  }
  budget <- effective_filter_budget(args, cql)
  as.list(chunk_cql_or(cql, max_len = budget))
}


#' Issue one request per chunk; return per-chunk frames plus a request
#'
#' A `NULL` chunk stands for "send `args` unchanged" (no filter
#' override). The `req` in the returned list is the request built for
#' the *first* chunk, used downstream as the representative URL when the
#' caller asked for a `request` attribute.
#'
#' @noRd
fetch_chunks <- function(args, chunks) {
  frames <- vector("list", length(chunks))
  representative <- NULL
  for (idx in seq_along(chunks)) {
    chunk <- chunks[[idx]]
    request_args <- if (is.null(chunk)) {
      args
    } else {
      replace(args, "filter", list(chunk))
    }
    req <- do.call(construct_api_requests, request_args)
    # Remember the first request only; later chunks share its shape.
    if (is.null(representative)) {
      representative <- req
    }
    message("Requesting:\n", req$url)
    frames[[idx]] <- if (grepl("f=csv", req$url)) {
      # CSV responses are not paged; fetch in one shot.
      get_csv(req, limit = request_args[["limit"]])
    } else {
      walk_pages(req)
    }
  }
  list(frames = frames, req = representative)
}


#' Concatenate per-chunk frames, handling the edge cases
#'
#' Frames with zero rows are discarded before concatenation:
#' `walk_pages` returns a plain empty `data.frame` for no-feature
#' responses, and rbind-ing that together with real `sf` results would
#' downgrade the combined object to a plain frame, losing geometry and
#' CRS. Rows are then deduplicated on the pre-rename feature `id` column
#' (when present) so overlapping user-supplied OR-clauses cannot yield
#' the same feature twice across chunks.
#'
#' @noRd
combine_chunk_frames <- function(frames) {
  populated <- frames[vapply(frames, nrow, integer(1)) > 0L]
  if (length(populated) == 0L) {
    # Nothing had rows: hand back the first (empty) frame so callers
    # still see the expected column structure.
    return(frames[[1L]])
  }
  if (length(populated) == 1L) {
    return(populated[[1L]])
  }
  stacked <- do.call(rbind, populated)
  if ("id" %in% names(stacked)) {
    stacked <- stacked[!duplicated(stacked$id), , drop = FALSE]
  }
  stacked
}


#' Compute the raw CQL byte budget for `filter_expr` in this request
#'
#' The server limits total URL length (see `.WATERDATA_URL_BYTE_LIMIT`),
#' not raw CQL length. To turn that into a raw-byte budget for
#' `chunk_cql_or`:
#'
#' 1. Probe how much URL the *other* query parameters consume by
#'    building the request with a 1-byte placeholder filter.
#' 2. Subtract that from the URL limit to get the bytes left for the
#'    percent-encoded filter value.
#' 3. Convert back to raw CQL bytes using the *maximum* per-clause
#'    encoding ratio rather than the whole-filter average: a chunk can
#'    end up holding only the heaviest-encoding clauses (e.g. clustered
#'    at one end of the filter), and an average-based budget would let
#'    such a chunk overflow the URL limit.
#'
#' @noRd
effective_filter_budget <- function(args, filter_expr) {
  placeholder_args <- args
  placeholder_args[["filter"]] <- "x"
  placeholder_req <- do.call(construct_api_requests, placeholder_args)
  # Subtract the 1-byte placeholder to get the non-filter URL overhead.
  overhead <- nchar(placeholder_req$url) - 1L
  headroom <- .WATERDATA_URL_BYTE_LIMIT - overhead
  if (headroom <= 0L) {
    # The non-filter URL already blows the limit, so no chunk can fit.
    # Return a budget larger than the filter so chunk_cql_or passes it
    # through untouched: one clear 414 from the server is better
    # feedback than a burst of N failing sub-requests.
    return(nchar(filter_expr) + 1L)
  }
  clauses <- split_top_level_or(filter_expr)
  if (length(clauses) == 0L) {
    clauses <- filter_expr
  }
  clauses <- clauses[nzchar(clauses)]
  # The " OR " joiner is measured too: it is 4 raw bytes encoding to
  # "%20OR%20" (8 bytes, ratio 2.0). For inputs whose clauses encode
  # lighter than that (time-interval clauses sit near 1.6), the joiner
  # is what pushes the full URL past the limit, so omitting it made the
  # budget too loose.
  ratios <- vapply(
    c(clauses, " OR "),
    function(piece) {
      nchar(utils::URLencode(piece, reserved = TRUE)) / nchar(piece)
    },
    numeric(1)
  )
  max(100L, as.integer(headroom / max(ratios)))
}


#' Split a CQL expression at each top-level `OR` separator
#'
#' Parentheses and single/double-quoted string literals are respected,
#' so an `OR` inside `(A OR B)` or `'word OR word'` is never treated as
#' a separator. The `OR` keyword is matched case-insensitively and must
#' be flanked by whitespace on both sides (so `ORDER` is not a
#' separator). Emitted parts are trimmed; empty parts are dropped.
#'
#' @noRd
#' @examples
#' dataRetrieval:::split_top_level_or("A OR B OR C")
#' dataRetrieval:::split_top_level_or("(A OR B) OR (C OR D)")
#' dataRetrieval:::split_top_level_or("name = 'foo OR bar' OR id = 1")
split_top_level_or <- function(expr) {
  cs <- strsplit(expr, "", fixed = TRUE)[[1]]
  len <- length(cs)
  if (len == 0L) {
    return(character(0))
  }

  blank <- c(" ", "\t", "\n", "\r")
  # Parallel vectors marking each separator: the last character index of
  # the segment before it, and the first character index of the segment
  # after it.
  sep_from <- integer(0)
  sep_to <- integer(0)

  paren_depth <- 0L
  quote_char <- ""
  pos <- 1L
  while (pos <= len) {
    current <- cs[pos]
    if (quote_char != "") {
      # Inside a string literal: only its closing quote matters.
      if (current == quote_char) quote_char <- ""
      pos <- pos + 1L
    } else if (current == "'" || current == '"') {
      quote_char <- current
      pos <- pos + 1L
    } else if (current == "(") {
      paren_depth <- paren_depth + 1L
      pos <- pos + 1L
    } else if (current == ")") {
      paren_depth <- paren_depth - 1L
      pos <- pos + 1L
    } else if (paren_depth == 0L && current %in% blank) {
      # Candidate separator: whitespace run, then "OR", then whitespace.
      after_ws <- pos + 1L
      while (after_ws <= len && cs[after_ws] %in% blank) {
        after_ws <- after_ws + 1L
      }
      if (
        after_ws + 1L <= len &&
          (cs[after_ws] == "o" || cs[after_ws] == "O") &&
          (cs[after_ws + 1L] == "r" || cs[after_ws + 1L] == "R")
      ) {
        after_or <- after_ws + 2L
        if (after_or <= len && cs[after_or] %in% blank) {
          sep_from <- c(sep_from, pos - 1L)
          nxt <- after_or + 1L
          while (nxt <= len && cs[nxt] %in% blank) {
            nxt <- nxt + 1L
          }
          sep_to <- c(sep_to, nxt)
          pos <- nxt
          next
        }
      }
      pos <- pos + 1L
    } else {
      pos <- pos + 1L
    }
  }

  if (length(sep_from) == 0L) {
    # No top-level OR found: the whole expression is the single part.
    whole <- trimws(expr)
    return(if (nzchar(whole)) whole else character(0))
  }

  pieces <- substring(expr, c(1L, sep_to), c(sep_from, len))
  pieces <- trimws(pieces)
  pieces[nzchar(pieces)]
}


#' Split a CQL expression into OR-chunks that each fit under `max_len`
#'
#' Only top-level `OR` chains are split, since that is the only shape
#' that can be recombined losslessly as a disjunction of independent
#' sub-queries. The input is returned unchanged when it already fits,
#' when it has no top-level `OR`, or when any single clause exceeds
#' `max_len` on its own -- better to send one too-long request and
#' surface the server's 414 than to silently drop data.
#'
#' @noRd
#' @examples
#' clause <- "(time >= '2023-01-01' AND time <= '2023-01-02')"
#' expr <- paste(rep(clause, 3), collapse = " OR ")
#' dataRetrieval:::chunk_cql_or(expr, max_len = 100)
chunk_cql_or <- function(expr, max_len = .CQL_FILTER_CHUNK_LEN) {
  if (nchar(expr) <= max_len) {
    return(expr)
  }
  clauses <- split_top_level_or(expr)
  if (length(clauses) < 2L || any(nchar(clauses) > max_len)) {
    return(expr)
  }

  joiner <- " OR "
  joiner_len <- nchar(joiner)
  # Greedily pack clauses into buckets, charging the joiner length for
  # every clause after a bucket's first.
  buckets <- list()
  bucket <- character(0)
  bucket_len <- 0L
  for (clause in clauses) {
    added <- nchar(clause) + if (length(bucket) > 0L) joiner_len else 0L
    if (length(bucket) > 0L && bucket_len + added > max_len) {
      buckets[[length(buckets) + 1L]] <- bucket
      bucket <- clause
      bucket_len <- nchar(clause)
    } else {
      bucket <- c(bucket, clause)
      bucket_len <- bucket_len + added
    }
  }
  buckets[[length(buckets) + 1L]] <- bucket
  vapply(buckets, paste, character(1), collapse = joiner)
}
12 changes: 12 additions & 0 deletions R/read_waterdata_channel.R
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@
#' @param skipGeometry This option can be used to skip response geometries for
#' each feature. The returning object will be a data frame with no spatial
#' information.
#' @param filter A CQL text or JSON expression passed through to the OGC
#' API `filter` query parameter. Commonly used to OR several time ranges
#' into a single request. At the time of writing the server accepts
#' `cql-text` (default) and `cql-json`; `cql2-text` / `cql2-json` are not
#' yet supported. A long expression made up of a top-level `OR` chain is
#' transparently split into multiple requests that each fit under the
#' server's URI length limit; the results are concatenated and
#' deduplicated by id.
#' @param filter_lang Language of the `filter` expression, for example
#' `cql-text` (default) or `cql-json`. Sent as `filter-lang` in the URL.
#' @param convertType logical, defaults to `TRUE`. If `TRUE`, the function
#' will convert the data to dates and qualifier to string vector.
#' @param no_paging logical, defaults to `FALSE`. If `TRUE`, the data will
Expand Down Expand Up @@ -95,6 +105,8 @@ read_waterdata_channel <- function(
skipGeometry = NA,
bbox = NA,
limit = NA,
filter = NA_character_,
filter_lang = NA_character_,
convertType = TRUE,
no_paging = FALSE
) {
Expand Down
Loading
Loading