diff --git a/NAMESPACE b/NAMESPACE index 67434a709..70dfb5f2e 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -54,6 +54,7 @@ export(read_waterdata_latest_continuous) export(read_waterdata_latest_daily) export(read_waterdata_metadata) export(read_waterdata_monitoring_location) +export(read_waterdata_nearest_continuous) export(read_waterdata_parameter_codes) export(read_waterdata_samples) export(read_waterdata_stats_daterange) diff --git a/NEWS b/NEWS index aa42bd4a1..33a2682e2 100644 --- a/NEWS +++ b/NEWS @@ -1,6 +1,21 @@ dataRetrieval 2.7.24 =================== * Let dataRetrieval take care of chunking up requests by monitoring_location_id. +* Added `read_waterdata_nearest_continuous()` which, given a vector of + target timestamps, returns the single continuous observation nearest + each target, fetched in one HTTP round-trip (auto-chunked when the + underlying CQL filter gets long). Knobs include `window` (half-window + around each target, default 7.5 minutes to match the 15-minute + continuous cadence) and `on_tie` (`"first"` / `"last"` / `"mean"`). +* Added `filter` and `filter_lang` arguments to the OGC `read_waterdata_*` + functions (continuous, daily, field_measurements, monitoring_location, + ts_meta, latest_continuous, latest_daily, channel). These are forwarded + to the OGC API's CQL `filter` / `filter-lang` query parameters, enabling + server-side filtering that isn't expressible via the other arguments -- + most commonly, OR'ing multiple time ranges into a single request. For + `cql-text` filters, a long top-level `OR` chain is transparently split + into multiple sub-requests sized to stay under the server's 8 KB URL + byte limit; results are concatenated and deduplicated by id. dataRetrieval 2.7.23 =================== diff --git a/R/construct_api_requests.R b/R/construct_api_requests.R index 662b95662..3cef7fe05 100644 --- a/R/construct_api_requests.R +++ b/R/construct_api_requests.R @@ -17,6 +17,21 @@ #' each feature. 
The returning object will be a data frame with no spatial #' information. #' @keywords internal +#' @details +#' Two arguments forwarded via `...` deserve a callout: +#' \itemize{ +#' \item `filter` — a CQL text or JSON expression passed through to the +#' OGC API `filter` query parameter. Commonly used to OR several time +#' ranges into a single request. At the time of writing the server +#' accepts `cql-text` (default) and `cql-json`; `cql2-text` / +#' `cql2-json` are not yet supported. A long expression made up of a +#' top-level `OR` chain is automatically split into multiple requests +#' that each fit under the server's URI length limit (handled in +#' `get_ogc_data`); the results are concatenated and deduplicated by id. +#' \item `filter_lang` — language of the `filter` expression, for +#' example `cql-text` (default) or `cql-json`. Sent as `filter-lang` +#' in the URL. +#' } #' @examples #' site <- "USGS-02238500" #' pcode <- "00060" @@ -48,6 +63,13 @@ construct_api_requests <- function( full_list <- list(...) + # Translate the Python-style underscore name to the hyphenated URL key + # the OGC API expects. R doesn't allow hyphens in formal argument names, + # so callers pass `filter_lang`; the URL needs `filter-lang`. 
+ if ("filter_lang" %in% names(full_list)) { + names(full_list)[names(full_list) == "filter_lang"] <- "filter-lang" + } + time_periods <- c( "last_modified", "datetime", @@ -76,7 +98,9 @@ construct_api_requests <- function( "time", "limit", "begin_utc", - "end_utc" + "end_utc", + "filter", + "filter-lang" ) comma_params <- c( diff --git a/R/get_ogc_data.R b/R/get_ogc_data.R index e0a915ac1..ef8d686a7 100644 --- a/R/get_ogc_data.R +++ b/R/get_ogc_data.R @@ -44,18 +44,12 @@ get_ogc_data <- function(args, output_id, service, ..., chunk_size = 250) { args[["convertType"]] <- NULL args[["service"]] <- service - req <- do.call(construct_api_requests, args) - + chunks <- plan_filter_chunks(args) + fetched <- fetch_chunks(args, chunks) + return_list <- combine_chunk_frames(fetched$frames) + req <- fetched$req no_paging <- grepl("f=csv", req$url) - message("Requesting:\n", req$url) - - if (no_paging) { - return_list <- get_csv(req, limit = args[["limit"]]) - } else { - return_list <- walk_pages(req) - } - if (is.na(args[["skipGeometry"]])) { skipGeometry <- FALSE } else { @@ -179,3 +173,270 @@ switch_properties_id <- function(properties, id) { return(properties) } + +# Conservative fallback budget (characters) for a single CQL `filter` query +# parameter, used when `chunk_cql_or` is called directly without a `max_len`. +# `get_ogc_data` computes a tighter per-request budget from +# `.WATERDATA_URL_BYTE_LIMIT` below. +.CQL_FILTER_CHUNK_LEN <- 5000L + +# Total URL byte limit the Water Data API will accept before replying +# HTTP 414 (Request-URI Too Large). Empirically the cliff sits at +# ~8,200 bytes of full URL, which lines up with nginx's default +# `large_client_header_buffers` of 8 KB (8192). 8000 leaves ~200 bytes of +# headroom for request-line framing and any intermediate proxy variance. +.WATERDATA_URL_BYTE_LIMIT <- 8000L + + +#' Decide how to fan `args[["filter"]]` out across HTTP calls +#' +#' Returns one entry per request to send. 
#' Decide how args[["filter"]] is fanned out across HTTP calls
#'
#' Produces one list entry per sub-request to send. A `NULL` entry tells
#' the caller to send `args` untouched: either no filter was supplied,
#' or the filter language is not one we know how to split safely (only
#' cql-text top-level `OR` chains are chunkable). Character entries are
#' chunked cql-text expressions that stand in for `args[["filter"]]` on
#' their sub-request. Rows duplicated by overlapping user OR-clauses are
#' removed later, in [combine_chunk_frames], keyed on the feature id.
#'
#' @noRd
plan_filter_chunks <- function(args) {
  expr <- args[["filter"]]
  lang <- args[["filter_lang"]]

  # Splittable only when the filter is a single non-empty string...
  splittable_expr <- is.character(expr) &&
    length(expr) == 1L &&
    !is.na(expr) &&
    nzchar(expr)
  if (!splittable_expr) {
    return(list(NULL))
  }

  # ...expressed in cql-text (the default when no language is given).
  splittable_lang <- is.null(lang) ||
    is.na(lang) ||
    identical(lang, "cql-text")
  if (!splittable_lang) {
    return(list(NULL))
  }

  budget <- effective_filter_budget(args, expr)
  as.list(chunk_cql_or(expr, max_len = budget))
}
#' Issue one HTTP request per chunk and collect the per-chunk frames
#'
#' A `NULL` chunk stands for "send `args` untouched" (no filter
#' override). The returned `req` is the *first* sub-request issued; it
#' is the representative URL attached as the `request` attribute when
#' the caller asks for one.
#'
#' @noRd
fetch_chunks <- function(args, chunks) {
  frames <- vector("list", length(chunks))
  lead_req <- NULL

  for (idx in seq_along(chunks)) {
    call_args <- args
    if (!is.null(chunks[[idx]])) {
      call_args[["filter"]] <- chunks[[idx]]
    }

    req <- do.call(construct_api_requests, call_args)
    message("Requesting:\n", req$url)

    # CSV responses come back in one shot; everything else pages.
    frames[[idx]] <- if (grepl("f=csv", req$url)) {
      get_csv(req, limit = call_args[["limit"]])
    } else {
      walk_pages(req)
    }

    if (is.null(lead_req)) {
      lead_req <- req
    }
  }

  list(frames = frames, req = lead_req)
}


#' Stitch per-chunk frames back into one result
#'
#' Empty frames are discarded before concatenation: `walk_pages` hands
#' back a plain empty `data.frame` when a response has no features, and
#' rbind-ing that onto real `sf` results would demote them to a plain
#' frame, stripping geometry and CRS. Rows are then deduplicated on the
#' pre-rename feature `id` so overlapping user-supplied OR-clauses
#' cannot yield the same feature twice across chunks.
#'
#' @noRd
combine_chunk_frames <- function(frames) {
  kept <- frames[vapply(frames, nrow, integer(1L)) > 0L]

  if (length(kept) == 0L) {
    return(frames[[1L]])
  }
  if (length(kept) == 1L) {
    return(kept[[1L]])
  }

  stacked <- do.call(rbind, kept)
  if (!("id" %in% names(stacked))) {
    return(stacked)
  }
  stacked[!duplicated(stacked$id), , drop = FALSE]
}
#' Compute the raw CQL budget for `filter_expr` in this request
#'
#' The server limits total URL length in *bytes* (see
#' `.WATERDATA_URL_BYTE_LIMIT`), not raw CQL length. To derive a budget
#' we can hand to `chunk_cql_or`:
#'
#' 1. Probe the URL space consumed by the other query params by building
#'    the request with a 1-byte placeholder filter.
#' 2. Subtract from the URL limit to get the bytes available for the
#'    encoded filter value.
#' 3. Convert back to a raw CQL budget using the *maximum* per-clause
#'    encoding ratio, not the whole-filter average. A chunk can end up
#'    containing only the heavier-encoding clauses (e.g. heavy ones
#'    clustered at one end of the filter), so budgeting against the
#'    average lets such a chunk overflow the URL limit.
#'
#' URL-side measurements use `nchar(type = "bytes")`: the server's limit
#' is a byte limit, and byte and character counts diverge as soon as the
#' URL or a clause contains a multi-byte (non-ASCII) character. The
#' ratio is encoded bytes per raw character, so the returned budget is
#' in the same character units `chunk_cql_or` compares against.
#'
#' @noRd
effective_filter_budget <- function(args, filter_expr) {
  probe_args <- args
  probe_args[["filter"]] <- "x"
  probe_req <- do.call(construct_api_requests, probe_args)
  non_filter_url_bytes <- nchar(probe_req$url, type = "bytes") - 1L
  available_url_bytes <- .WATERDATA_URL_BYTE_LIMIT - non_filter_url_bytes
  if (available_url_bytes <= 0L) {
    # The non-filter URL already exceeds the byte limit, so no chunk we
    # could produce would fit. Return a budget larger than the filter so
    # chunk_cql_or passes it through unchanged -- one clear 414 from the
    # server is better feedback than a burst of N failing sub-requests.
    return(nchar(filter_expr) + 1L)
  }
  parts <- split_top_level_or(filter_expr)
  if (length(parts) == 0L) parts <- filter_expr
  parts <- parts[nzchar(parts)]
  # Include the " OR " joiner: it is 4 raw characters but encodes to
  # "%20OR%20" (8 bytes, ratio 2.0). For inputs whose clauses encode
  # lighter than that (e.g. time-interval clauses at ~1.6), the joiner
  # is what pushes the full URL past the limit -- leaving it out made
  # the budget too loose.
  ratio <- function(p) {
    nchar(utils::URLencode(p, reserved = TRUE), type = "bytes") / nchar(p)
  }
  encoding_ratio <- max(vapply(c(parts, " OR "), ratio, numeric(1)))
  max(100L, as.integer(available_url_bytes / encoding_ratio))
}
#' Split a CQL expression at each top-level `OR` separator
#'
#' Parentheses and single/double-quoted string literals are respected:
#' an `OR` inside `(A OR B)` or `'word OR word'` never acts as a
#' separator. Matching is case-insensitive and requires whitespace on
#' both sides of the `OR` token. Each emitted part has surrounding
#' whitespace stripped; empty parts are dropped.
#'
#' @noRd
#' @examples
#' dataRetrieval:::split_top_level_or("A OR B OR C")
#' dataRetrieval:::split_top_level_or("(A OR B) OR (C OR D)")
#' dataRetrieval:::split_top_level_or("name = 'foo OR bar' OR id = 1")
split_top_level_or <- function(expr) {
  glyphs <- strsplit(expr, "", fixed = TRUE)[[1L]]
  total <- length(glyphs)
  if (total == 0L) {
    return(character(0))
  }

  # Build a shadow of the expression in which every character that sits
  # inside parentheses or a quoted literal is blanked out, so a plain
  # regex scan over the shadow can only ever see top-level text.
  shadow <- glyphs
  nesting <- 0L
  open_quote <- ""
  for (pos in seq_len(total)) {
    glyph <- glyphs[pos]
    if (open_quote != "") {
      if (glyph == open_quote) {
        open_quote <- ""
      }
      shadow[pos] <- "_"
    } else if (glyph == "'" || glyph == "\"") {
      open_quote <- glyph
      shadow[pos] <- "_"
    } else if (glyph == "(") {
      nesting <- nesting + 1L
      shadow[pos] <- "_"
    } else if (glyph == ")") {
      nesting <- nesting - 1L
      shadow[pos] <- "_"
    } else if (nesting != 0L) {
      # Also masks text under unbalanced ')' (negative nesting), which
      # the character-by-character scan likewise never split on.
      shadow[pos] <- "_"
    }
  }

  # A separator is whitespace, OR (any case), whitespace -- all at the
  # top level, which the shadow guarantees.
  hits <- gregexpr(
    "[ \t\n\r]+[oO][rR][ \t\n\r]+",
    paste(shadow, collapse = "")
  )[[1L]]

  if (hits[1L] == -1L) {
    whole <- trimws(expr)
    return(whole[nzchar(whole)])
  }

  sep_from <- as.integer(hits)
  sep_to <- sep_from + attr(hits, "match.length") - 1L
  piece_from <- c(1L, sep_to + 1L)
  piece_to <- c(sep_from - 1L, total)
  pieces <- trimws(substring(expr, piece_from, piece_to))
  pieces[nzchar(pieces)]
}


#' Split a CQL expression into OR-chunks that each fit under `max_len`
#'
#' Only a top-level `OR` chain can be split, because that is the one
#' shape that recombines losslessly as a disjunction of independent
#' sub-queries. The expression comes back unchanged when it already
#' fits, when there is no top-level `OR` to split on, or when a single
#' clause is itself longer than `max_len` (better to send one too-long
#' request and surface the server's 414 than to silently drop data).
#'
#' @noRd
#' @examples
#' clause <- "(time >= '2023-01-01' AND time <= '2023-01-02')"
#' expr <- paste(rep(clause, 3), collapse = " OR ")
#' dataRetrieval:::chunk_cql_or(expr, max_len = 100)
chunk_cql_or <- function(expr, max_len = .CQL_FILTER_CHUNK_LEN) {
  if (nchar(expr) <= max_len) {
    return(expr)
  }
  clauses <- split_top_level_or(expr)
  if (length(clauses) < 2L || any(nchar(clauses) > max_len)) {
    return(expr)
  }

  joiner <- " OR "
  joiner_len <- nchar(joiner)
  finished <- character(0)
  bucket <- character(0)
  bucket_len <- 0L

  # Greedy packing: keep appending clauses to the current bucket until
  # the next one would push it past max_len, then start a new bucket.
  for (clause in clauses) {
    needed <- nchar(clause) +
      (if (length(bucket) > 0L) joiner_len else 0L)
    if (length(bucket) > 0L && bucket_len + needed > max_len) {
      finished <- c(finished, paste(bucket, collapse = joiner))
      bucket <- clause
      bucket_len <- nchar(clause)
    } else {
      bucket <- c(bucket, clause)
      bucket_len <- bucket_len + needed
    }
  }

  c(finished, paste(bucket, collapse = joiner))
}
A long expression made up of a top-level `OR` chain is +#' transparently split into multiple requests that each fit under the +#' server's URI length limit; the results are concatenated and +#' deduplicated by id. +#' @param filter_lang Language of the `filter` expression, for example +#' `cql-text` (default) or `cql-json`. Sent as `filter-lang` in the URL. #' @param convertType logical, defaults to `TRUE`. If `TRUE`, the function #' will convert the data to dates and qualifier to string vector. #' @param no_paging logical, defaults to `FALSE`. If `TRUE`, the data will @@ -95,6 +105,8 @@ read_waterdata_channel <- function( skipGeometry = NA, bbox = NA, limit = NA, + filter = NA_character_, + filter_lang = NA_character_, convertType = TRUE, no_paging = FALSE ) { diff --git a/R/read_waterdata_continuous.R b/R/read_waterdata_continuous.R index c63e6d87f..763d03df5 100644 --- a/R/read_waterdata_continuous.R +++ b/R/read_waterdata_continuous.R @@ -37,6 +37,16 @@ #' limit is 50000. It may be beneficial to set this number lower if your internet #' connection is spotty. The default (`NA`) will set the limit to the maximum #' allowable limit for the service. +#' @param filter A CQL text or JSON expression passed through to the OGC +#' API `filter` query parameter. Commonly used to OR several time ranges +#' into a single request. At the time of writing the server accepts +#' `cql-text` (default) and `cql-json`; `cql2-text` / `cql2-json` are not +#' yet supported. A long expression made up of a top-level `OR` chain is +#' transparently split into multiple requests that each fit under the +#' server's URI length limit; the results are concatenated and +#' deduplicated by id. +#' @param filter_lang Language of the `filter` expression, for example +#' `cql-text` (default) or `cql-json`. Sent as `filter-lang` in the URL. #' @param convertType logical, defaults to `TRUE`. 
If `TRUE`, the function #' will convert the data to dates and qualifier to string vector, and sepcifically #' order the returning data frame by time and monitoring_location_id. @@ -117,6 +127,32 @@ #' # Set the time to Eastern: #' # all_data$time <- lubridate::force_tz(all_data$time, "America/New_York") #' +#' # The `time` argument accepts a single instant or a single interval. +#' # To pull several disjoint windows in one call, pass a CQL-text `filter`: +#' two_windows <- read_waterdata_continuous( +#' monitoring_location_id = "USGS-02238500", +#' parameter_code = "00060", +#' filter = paste( +#' "(time >= '2023-06-01T12:00:00Z' AND time <= '2023-06-01T13:00:00Z')", +#' "OR", +#' "(time >= '2023-06-15T12:00:00Z' AND time <= '2023-06-15T13:00:00Z')" +#' ), +#' filter_lang = "cql-text" +#' ) +#' +#' # Long top-level OR chains (e.g. one short window per discrete-measurement +#' # timestamp) are built up the same way. If the resulting URL would exceed +#' # the server's length limit, the client transparently splits it into +#' # multiple sub-requests and returns the concatenated, deduplicated result. +#' windows <- sprintf( +#' "(time >= '2023-%02d-15T00:00:00Z' AND time <= '2023-%02d-15T00:30:00Z')", +#' 1:12, 1:12 +#' ) +#' many_windows <- read_waterdata_continuous( +#' monitoring_location_id = "USGS-02238500", +#' parameter_code = "00060", +#' filter = paste(windows, collapse = " OR ") +#' ) #' } read_waterdata_continuous <- function( monitoring_location_id = NA_character_, @@ -130,6 +166,8 @@ read_waterdata_continuous <- function( last_modified = NA_character_, time = NA_character_, limit = NA, + filter = NA_character_, + filter_lang = NA_character_, convertType = TRUE, no_paging = FALSE ) { diff --git a/R/read_waterdata_daily.R b/R/read_waterdata_daily.R index a05186746..e76134418 100644 --- a/R/read_waterdata_daily.R +++ b/R/read_waterdata_daily.R @@ -39,6 +39,16 @@ #' @param skipGeometry This option can be used to skip response geometries for #' each feature. 
The returning object will be a data frame with no spatial #' information. +#' @param filter A CQL text or JSON expression passed through to the OGC +#' API `filter` query parameter. Commonly used to OR several time ranges +#' into a single request. At the time of writing the server accepts +#' `cql-text` (default) and `cql-json`; `cql2-text` / `cql2-json` are not +#' yet supported. A long expression made up of a top-level `OR` chain is +#' transparently split into multiple requests that each fit under the +#' server's URI length limit; the results are concatenated and +#' deduplicated by id. +#' @param filter_lang Language of the `filter` expression, for example +#' `cql-text` (default) or `cql-json`. Sent as `filter-lang` in the URL. #' @param convertType logical, defaults to `TRUE`. If `TRUE`, the function #' will convert the data to dates and qualifier to string vector. #' @param no_paging logical, defaults to `FALSE`. If `TRUE`, the data will @@ -106,6 +116,8 @@ read_waterdata_daily <- function( time = NA_character_, bbox = NA, limit = NA, + filter = NA_character_, + filter_lang = NA_character_, convertType = TRUE, no_paging = FALSE ) { diff --git a/R/read_waterdata_field_measurements.R b/R/read_waterdata_field_measurements.R index b4cb90f4a..7d0143e47 100644 --- a/R/read_waterdata_field_measurements.R +++ b/R/read_waterdata_field_measurements.R @@ -45,6 +45,16 @@ #' @param skipGeometry This option can be used to skip response geometries for #' each feature. The returning object will be a data frame with no spatial #' information. +#' @param filter A CQL text or JSON expression passed through to the OGC +#' API `filter` query parameter. Commonly used to OR several time ranges +#' into a single request. At the time of writing the server accepts +#' `cql-text` (default) and `cql-json`; `cql2-text` / `cql2-json` are not +#' yet supported. 
A long expression made up of a top-level `OR` chain is +#' transparently split into multiple requests that each fit under the +#' server's URI length limit; the results are concatenated and +#' deduplicated by id. +#' @param filter_lang Language of the `filter` expression, for example +#' `cql-text` (default) or `cql-json`. Sent as `filter-lang` in the URL. #' @param convertType logical, defaults to `TRUE`. If `TRUE`, the function #' will convert the data to dates and qualifier to string vector. #' @param no_paging logical, defaults to `FALSE`. If `TRUE`, the data will @@ -112,6 +122,8 @@ read_waterdata_field_measurements <- function( time = NA_character_, bbox = NA, limit = NA, + filter = NA_character_, + filter_lang = NA_character_, convertType = TRUE, no_paging = FALSE ) { diff --git a/R/read_waterdata_latest_continuous.R b/R/read_waterdata_latest_continuous.R index b015c9192..754b0adbd 100644 --- a/R/read_waterdata_latest_continuous.R +++ b/R/read_waterdata_latest_continuous.R @@ -37,6 +37,16 @@ #' @param skipGeometry This option can be used to skip response geometries for #' each feature. The returning object will be a data frame with no spatial #' information. +#' @param filter A CQL text or JSON expression passed through to the OGC +#' API `filter` query parameter. Commonly used to OR several time ranges +#' into a single request. At the time of writing the server accepts +#' `cql-text` (default) and `cql-json`; `cql2-text` / `cql2-json` are not +#' yet supported. A long expression made up of a top-level `OR` chain is +#' transparently split into multiple requests that each fit under the +#' server's URI length limit; the results are concatenated and +#' deduplicated by id. +#' @param filter_lang Language of the `filter` expression, for example +#' `cql-text` (default) or `cql-json`. Sent as `filter-lang` in the URL. #' @param convertType logical, defaults to `TRUE`. If `TRUE`, the function #' will convert the data to dates and qualifier to string vector. 
#' @param no_paging logical, defaults to `FALSE`. If `TRUE`, the data will @@ -93,6 +103,8 @@ read_waterdata_latest_continuous <- function( time = NA_character_, bbox = NA, limit = NA, + filter = NA_character_, + filter_lang = NA_character_, convertType = TRUE, no_paging = FALSE ) { diff --git a/R/read_waterdata_latest_daily.R b/R/read_waterdata_latest_daily.R index dfb5deb38..421d21f6f 100644 --- a/R/read_waterdata_latest_daily.R +++ b/R/read_waterdata_latest_daily.R @@ -39,6 +39,16 @@ #' @param skipGeometry This option can be used to skip response geometries for #' each feature. The returning object will be a data frame with no spatial #' information. +#' @param filter A CQL text or JSON expression passed through to the OGC +#' API `filter` query parameter. Commonly used to OR several time ranges +#' into a single request. At the time of writing the server accepts +#' `cql-text` (default) and `cql-json`; `cql2-text` / `cql2-json` are not +#' yet supported. A long expression made up of a top-level `OR` chain is +#' transparently split into multiple requests that each fit under the +#' server's URI length limit; the results are concatenated and +#' deduplicated by id. +#' @param filter_lang Language of the `filter` expression, for example +#' `cql-text` (default) or `cql-json`. Sent as `filter-lang` in the URL. #' @param convertType logical, defaults to `TRUE`. If `TRUE`, the function #' will convert the data to dates and qualifier to string vector. #' @param no_paging logical, defaults to `FALSE`. 
If `TRUE`, the data will @@ -90,6 +100,8 @@ read_waterdata_latest_daily <- function( time = NA_character_, bbox = NA, limit = NA, + filter = NA_character_, + filter_lang = NA_character_, convertType = TRUE, no_paging = FALSE ) { diff --git a/R/read_waterdata_monitoring_location.R b/R/read_waterdata_monitoring_location.R index 7ec6d4b25..264c32c51 100644 --- a/R/read_waterdata_monitoring_location.R +++ b/R/read_waterdata_monitoring_location.R @@ -62,6 +62,15 @@ #' @param skipGeometry This option can be used to skip response geometries for #' each feature. The returning object will be a data frame with no spatial #' information. +#' @param filter A CQL text or JSON expression passed through to the OGC +#' API `filter` query parameter. At the time of writing the server accepts +#' `cql-text` (default) and `cql-json`; `cql2-text` / `cql2-json` are not +#' yet supported. A long expression made up of a top-level `OR` chain is +#' transparently split into multiple requests that each fit under the +#' server's URI length limit; the results are concatenated and +#' deduplicated by id. +#' @param filter_lang Language of the `filter` expression, for example +#' `cql-text` (default) or `cql-json`. Sent as `filter-lang` in the URL. 
#' @examplesIf is_dataRetrieval_user() #' #' \donttest{ @@ -136,7 +145,9 @@ read_waterdata_monitoring_location <- function( properties = NA_character_, bbox = NA, limit = NA, - skipGeometry = NA + skipGeometry = NA, + filter = NA_character_, + filter_lang = NA_character_ ) { service <- "monitoring-locations" output_id <- "monitoring_location_id" diff --git a/R/read_waterdata_nearest_continuous.R b/R/read_waterdata_nearest_continuous.R new file mode 100644 index 000000000..ef04b5125 --- /dev/null +++ b/R/read_waterdata_nearest_continuous.R @@ -0,0 +1,273 @@ +#' For each target timestamp, return the nearest continuous observation +#' +#' Builds one bracketed `(time >= t-window AND time <= t+window)` clause per +#' target, joins them into a top-level CQL `OR` filter, and lets +#' [read_waterdata_continuous()] (with its URL-length-safe auto-chunking) +#' fetch every observation that falls in any window. Then, per +#' `(monitoring_location_id, target)` pair, picks the single observation +#' whose time is closest to the target. +#' +#' The USGS continuous endpoint matches `time` parameters exactly rather +#' than fuzzily, and it does not implement `sortby` for arbitrary fields; +#' this function is the single-round-trip way to ask "what reading is +#' nearest this timestamp?" for many timestamps at once. +#' +#' @export +#' @param targets Target timestamps. Accepts a `POSIXct` vector, a +#' character vector in ISO 8601 form, a `Date` vector, or anything +#' [lubridate::as_datetime()] accepts. Character input without a +#' timezone is interpreted as UTC. +#' @param monitoring_location_id `r get_ogc_params("continuous")$monitoring_location_id` +#' Multiple monitoring_location_ids can be requested as a character vector. +#' @param parameter_code `r get_ogc_params("continuous")$parameter_code` +#' Multiple parameter_codes can be requested as a character vector. +#' @param window Half-window around each target. Accepts: +#' * a `"MM:SS"` or `"HH:MM:SS"` string (e.g. 
`"07:30"`, `"15:00"`, +#' `"00:30:00"`, `"01:00:00"`); +#' * an ISO 8601 duration string (e.g. `"PT7M30S"`, `"PT15M"`, +#' `"PT1H"`) or any other string `lubridate::duration()` parses +#' (e.g. `"7 minutes 30 seconds"`); +#' * a number of seconds, a `difftime`, or a `lubridate::Period` / +#' `lubridate::Duration`. +#' +#' Defaults to `"PT7M30S"` (7.5 minutes, ISO 8601), which is half +#' of the typical 15-minute continuous cadence — most targets' +#' windows contain exactly one observation. Widen for irregular +#' cadences or resilience to data gaps. +#' @param on_tie How to resolve ties when two observations are exactly +#' equidistant from a target (which happens when the target falls at +#' the midpoint between grid points — e.g. target 10:22:30 for a +#' 15-minute gauge). One of: +#' * `"first"` — keep the earlier observation (default). +#' * `"last"` — keep the later observation. +#' * `"mean"` — average numeric columns; set `time` to the target, +#' since no real observation exists at the midpoint. +#' @param ... Additional arguments forwarded to [read_waterdata_continuous()] +#' (e.g. `statistic_id`, `approval_status`, `properties`). Passing +#' `time`, `filter`, or `filter_lang` raises an error — this function +#' builds those itself. +#' +#' @details +#' *Window sizing and ties.* When `window` is exactly half the service +#' cadence, most targets' windows contain a single observation and +#' `on_tie` is moot. Ties arise only when a target sits exactly at the +#' window edge — rare in practice but possible. Setting `window` to a +#' full cadence (or larger) guarantees at least one observation per +#' target in steady state at the cost of more bytes per response. +#' +#' *Why windowed CQL rather than sort+limit.* The API's advertised +#' `sortby` parameter would make this a one-liner per target (`filter` +#' by `time <= t` and `limit 1`), but it is per-query — you would need +#' one HTTP round-trip per target. 
#' The CQL `OR`-chain approach folds
#' all N targets into one request (auto-chunked when the URL is long).
#'
#' @return A data frame with one row per `(target, monitoring_location_id)`
#' combination that had at least one observation in its window.
#' A `target_time` column is added identifying which target each row
#' corresponds to. Targets with no observations in their window are
#' silently dropped.
#'
#' @examplesIf is_dataRetrieval_user()
#'
#' \donttest{
#' # Pair three off-grid timestamps with nearby observations
#' targets <- as.POSIXct(
#'   c("2023-06-15 10:30:31", "2023-06-15 14:07:12", "2023-06-16 03:45:19"),
#'   tz = "UTC"
#' )
#'
#' near <- read_waterdata_nearest_continuous(
#'   targets = targets,
#'   monitoring_location_id = "USGS-02238500",
#'   parameter_code = "00060"
#' )
#'
#' # Widen the window for an irregular-cadence gauge and average on ties
#' near_wide <- read_waterdata_nearest_continuous(
#'   targets = targets,
#'   monitoring_location_id = "USGS-02238500",
#'   parameter_code = "00060",
#'   window = "PT30M",
#'   on_tie = "mean"
#' )
#' }
read_waterdata_nearest_continuous <- function(
  targets,
  monitoring_location_id = NA_character_,
  parameter_code = NA_character_,
  window = "PT7M30S",
  on_tie = c("first", "last", "mean"),
  ...
) {
  on_tie <- match.arg(on_tie)

  # `time`, `filter`, and `filter_lang` are owned by this function; a
  # user-supplied value would silently conflict with the window clauses
  # built below, so reject them loudly.
  dots <- list(...)
  forbidden <- c("time", "filter", "filter_lang")
  bad <- intersect(forbidden, names(dots))
  if (length(bad) > 0L) {
    stop(
      "read_waterdata_nearest_continuous constructs its own ",
      paste(shQuote(bad), collapse = ", "),
      "; do not pass it directly",
      call. = FALSE
    )
  }

  targets <- to_utc_posixct(targets)
  window_secs <- as_window_seconds(window)

  if (length(targets) == 0L) {
    # Nothing to ask about -- return an empty frame shaped like a real
    # read_waterdata_continuous response (via a trivially-empty time range).
    empty <- read_waterdata_continuous(
      monitoring_location_id = monitoring_location_id,
      parameter_code = parameter_code,
      time = "1900-01-01T00:00:00Z/1900-01-01T00:00:00Z",
      ...
    )
    empty$target_time <- as.POSIXct(character(0), tz = "UTC")
    return(empty[0, , drop = FALSE])
  }

  # One bracketed window clause per target, OR'd into a single cql-text
  # filter; read_waterdata_continuous auto-chunks it if the URL is long.
  fmt <- "%Y-%m-%dT%H:%M:%SZ"
  starts <- format(targets - window_secs, fmt, tz = "UTC")
  ends <- format(targets + window_secs, fmt, tz = "UTC")
  clauses <- sprintf("(time >= '%s' AND time <= '%s')", starts, ends)
  filter_expr <- paste(clauses, collapse = " OR ")

  df <- read_waterdata_continuous(
    monitoring_location_id = monitoring_location_id,
    parameter_code = parameter_code,
    filter = filter_expr,
    filter_lang = "cql-text",
    ...
  )

  if (nrow(df) == 0L) {
    df$target_time <- as.POSIXct(character(0), tz = "UTC")
    return(df)
  }

  df$time <- as.POSIXct(df$time, tz = "UTC")

  if ("monitoring_location_id" %in% names(df)) {
    site_groups <- split(df, df$monitoring_location_id, drop = FALSE)
  } else {
    site_groups <- list(df)
  }

  # Preallocate one slot per (site, target) pair -- the maximum number
  # of selected rows -- instead of growing the list element-by-element.
  selected <- vector("list", length(site_groups) * length(targets))
  n_selected <- 0L
  for (site_df in site_groups) {
    for (ti in seq_along(targets)) {
      # Subscript instead of iteration: for-loop on a POSIXct vector
      # strips the class from the loop variable, which would make the
      # target_time column numeric after rbind.
      target <- targets[ti]
      lo <- target - window_secs
      hi <- target + window_secs
      in_window <- site_df$time >= lo & site_df$time <= hi
      window_df <- site_df[in_window, , drop = FALSE]
      if (nrow(window_df) == 0L) next

      deltas <- abs(as.numeric(
        difftime(window_df$time, target, units = "secs")
      ))
      candidates <- window_df[deltas == min(deltas), , drop = FALSE]
      candidates <- candidates[order(candidates$time), , drop = FALSE]

      if (nrow(candidates) == 1L || on_tie == "first") {
        row <- candidates[1L, , drop = FALSE]
      } else if (on_tie == "last") {
        row <- candidates[nrow(candidates), , drop = FALSE]
      } else {
        # "mean" -- average numeric columns, pin time to the target since
        # no real observation exists at the midpoint.
        row <- candidates[1L, , drop = FALSE]
        num_cols <- vapply(candidates, is.numeric, logical(1L))
        for (nc in names(candidates)[num_cols]) {
          row[[nc]] <- mean(candidates[[nc]])
        }
        row$time <- target
      }

      row$target_time <- target
      n_selected <- n_selected + 1L
      selected[[n_selected]] <- row
    }
  }

  if (n_selected == 0L) {
    empty <- df[0, , drop = FALSE]
    empty$target_time <- as.POSIXct(character(0), tz = "UTC")
    return(empty)
  }

  result <- do.call(rbind, selected[seq_len(n_selected)])
  row.names(result) <- NULL
  result
}


#' Coerce various datetime-like inputs to a UTC `POSIXct` vector
#'
#' POSIXct input keeps its exact instants: only the `tzone` display
#' attribute is switched to UTC. (Round-tripping through `format()`
#' would silently truncate fractional seconds before the nearest-match
#' distance computation.)
#'
#' @noRd
to_utc_posixct <- function(x) {
  if (length(x) == 0L) {
    return(as.POSIXct(character(0), tz = "UTC"))
  }
  if (inherits(x, "POSIXct")) {
    # Same instants, UTC display -- preserves sub-second precision.
    attr(x, "tzone") <- "UTC"
    return(x)
  }
  if (inherits(x, "Date")) {
    return(as.POSIXct(format(x), tz = "UTC"))
  }
  # Character and anything lubridate::as_datetime can parse. Naive strings
  # are treated as UTC to match Python pd.to_datetime(..., utc=True).
  lubridate::as_datetime(x, tz = "UTC")
}


#' Coerce `window` to a number of seconds
#'
#' Accepts a `"MM:SS"` or `"HH:MM:SS"` clock-style string, an ISO 8601
#' duration string (or anything `lubridate::duration()` parses), a
#' plain number of seconds, a `difftime`, or a `lubridate::Period` /
#' `Duration`.
#'
#' @noRd
as_window_seconds <- function(window) {
  if (inherits(window, "difftime")) {
    return(as.numeric(window, units = "secs"))
  }
  if (inherits(window, c("Period", "Duration"))) {
    return(as.numeric(lubridate::as.duration(window), units = "secs"))
  }
  if (is.numeric(window) && length(window) == 1L) {
    return(as.numeric(window))
  }
  if (is.character(window) && length(window) == 1L) {
    # MM:SS or HH:MM:SS (with optional fractional seconds).
    if (grepl("^\\d+:\\d{1,2}(:\\d{1,2})?(\\.\\d+)?$", window)) {
      parts <- as.numeric(strsplit(window, ":", fixed = TRUE)[[1L]])
      if (length(parts) == 2L) {
        return(parts[1L] * 60 + parts[2L])
      }
      return(parts[1L] * 3600 + parts[2L] * 60 + parts[3L])
    }
    # ISO 8601 duration ("PT7M30S", "PT1H", "P1D", ...) or a natural-
    # language form that `lubridate::duration` recognises.
    parsed <- suppressWarnings(tryCatch(
      lubridate::duration(window),
      error = function(e) NULL
    ))
    if (!is.null(parsed) && !is.na(parsed)) {
      return(as.numeric(parsed, units = "secs"))
    }
  }
  stop(
    "`window` must be a clock-style 'MM:SS'/'HH:MM:SS' string, an ISO ",
    "8601 duration (e.g. 'PT7M30S'), a number of seconds, a difftime, ",
    "or a lubridate Period/Duration; got: ",
    paste(format(window), collapse = " "),
    call. = FALSE
  )
}
+#' @param filter A CQL text or JSON expression passed through to the OGC +#' API `filter` query parameter. At the time of writing the server accepts +#' `cql-text` (default) and `cql-json`; `cql2-text` / `cql2-json` are not +#' yet supported. A long expression made up of a top-level `OR` chain is +#' transparently split into multiple requests that each fit under the +#' server's URI length limit; the results are concatenated and +#' deduplicated by id. +#' @param filter_lang Language of the `filter` expression, for example +#' `cql-text` (default) or `cql-json`. Sent as `filter-lang` in the URL. #' #' @inherit read_waterdata_continuous details #' @@ -114,6 +123,8 @@ read_waterdata_ts_meta <- function( bbox = NA, begin = NA_character_, end = NA_character_, + filter = NA_character_, + filter_lang = NA_character_, convertType = TRUE, no_paging = FALSE ) { diff --git a/man/construct_api_requests.Rd b/man/construct_api_requests.Rd index 3e691d751..ce2a210e0 100644 --- a/man/construct_api_requests.Rd +++ b/man/construct_api_requests.Rd @@ -35,6 +35,22 @@ information.} Main documentation: \url{https://api.waterdata.usgs.gov/ogcapi/v0/}, Swagger docs: \url{https://api.waterdata.usgs.gov/ogcapi/v0/openapi?f=html}. } +\details{ +Two arguments forwarded via \code{...} deserve a callout: +\itemize{ +\item \code{filter} — a CQL text or JSON expression passed through to the +OGC API \code{filter} query parameter. Commonly used to OR several time +ranges into a single request. At the time of writing the server +accepts \code{cql-text} (default) and \code{cql-json}; \code{cql2-text} / +\code{cql2-json} are not yet supported. A long expression made up of a +top-level \code{OR} chain is automatically split into multiple requests +that each fit under the server's URI length limit (handled in +\code{get_ogc_data}); the results are concatenated and deduplicated by id. 
+\item \code{filter_lang} — language of the \code{filter} expression, for +example \code{cql-text} (default) or \code{cql-json}. Sent as \code{filter-lang} +in the URL. +} +} \examples{ site <- "USGS-02238500" pcode <- "00060" diff --git a/man/read_waterdata_channel.Rd b/man/read_waterdata_channel.Rd index 537a7fdf6..7d98056c2 100644 --- a/man/read_waterdata_channel.Rd +++ b/man/read_waterdata_channel.Rd @@ -34,6 +34,8 @@ read_waterdata_channel( skipGeometry = NA, bbox = NA, limit = NA, + filter = NA_character_, + filter_lang = NA_character_, convertType = TRUE, no_paging = FALSE ) @@ -135,6 +137,18 @@ limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (\code{NA}) will set the limit to the maximum allowable limit for the service.} +\item{filter}{A CQL text or JSON expression passed through to the OGC +API \code{filter} query parameter. Commonly used to OR several time ranges +into a single request. At the time of writing the server accepts +\code{cql-text} (default) and \code{cql-json}; \code{cql2-text} / \code{cql2-json} are not +yet supported. A long expression made up of a top-level \code{OR} chain is +transparently split into multiple requests that each fit under the +server's URI length limit; the results are concatenated and +deduplicated by id.} + +\item{filter_lang}{Language of the \code{filter} expression, for example +\code{cql-text} (default) or \code{cql-json}. Sent as \code{filter-lang} in the URL.} + \item{convertType}{logical, defaults to \code{TRUE}. 
If \code{TRUE}, the function will convert the data to dates and qualifier to string vector.} diff --git a/man/read_waterdata_continuous.Rd b/man/read_waterdata_continuous.Rd index 8b66cbb29..c07168bd7 100644 --- a/man/read_waterdata_continuous.Rd +++ b/man/read_waterdata_continuous.Rd @@ -16,6 +16,8 @@ read_waterdata_continuous( last_modified = NA_character_, time = NA_character_, limit = NA, + filter = NA_character_, + filter_lang = NA_character_, convertType = TRUE, no_paging = FALSE ) @@ -79,6 +81,18 @@ limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (\code{NA}) will set the limit to the maximum allowable limit for the service.} +\item{filter}{A CQL text or JSON expression passed through to the OGC +API \code{filter} query parameter. Commonly used to OR several time ranges +into a single request. At the time of writing the server accepts +\code{cql-text} (default) and \code{cql-json}; \code{cql2-text} / \code{cql2-json} are not +yet supported. A long expression made up of a top-level \code{OR} chain is +transparently split into multiple requests that each fit under the +server's URI length limit; the results are concatenated and +deduplicated by id.} + +\item{filter_lang}{Language of the \code{filter} expression, for example +\code{cql-text} (default) or \code{cql-json}. Sent as \code{filter-lang} in the URL.} + \item{convertType}{logical, defaults to \code{TRUE}. If \code{TRUE}, the function will convert the data to dates and qualifier to string vector, and sepcifically order the returning data frame by time and monitoring_location_id.} @@ -172,6 +186,32 @@ time_df <- data.frame(start = time_chunks[-length(time_chunks)], # Set the time to Eastern: # all_data$time <- lubridate::force_tz(all_data$time, "America/New_York") +# The `time` argument accepts a single instant or a single interval. 
+# To pull several disjoint windows in one call, pass a CQL-text `filter`: +two_windows <- read_waterdata_continuous( + monitoring_location_id = "USGS-02238500", + parameter_code = "00060", + filter = paste( + "(time >= '2023-06-01T12:00:00Z' AND time <= '2023-06-01T13:00:00Z')", + "OR", + "(time >= '2023-06-15T12:00:00Z' AND time <= '2023-06-15T13:00:00Z')" + ), + filter_lang = "cql-text" +) + +# Long top-level OR chains (e.g. one short window per discrete-measurement +# timestamp) are built up the same way. If the resulting URL would exceed +# the server's length limit, the client transparently splits it into +# multiple sub-requests and returns the concatenated, deduplicated result. +windows <- sprintf( + "(time >= '2023-\%02d-15T00:00:00Z' AND time <= '2023-\%02d-15T00:30:00Z')", + 1:12, 1:12 +) +many_windows <- read_waterdata_continuous( + monitoring_location_id = "USGS-02238500", + parameter_code = "00060", + filter = paste(windows, collapse = " OR ") +) } \dontshow{\}) # examplesIf} } diff --git a/man/read_waterdata_daily.Rd b/man/read_waterdata_daily.Rd index 9f3fa8a33..4f4436032 100644 --- a/man/read_waterdata_daily.Rd +++ b/man/read_waterdata_daily.Rd @@ -19,6 +19,8 @@ read_waterdata_daily( time = NA_character_, bbox = NA, limit = NA, + filter = NA_character_, + filter_lang = NA_character_, convertType = TRUE, no_paging = FALSE ) @@ -97,6 +99,18 @@ limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (\code{NA}) will set the limit to the maximum allowable limit for the service.} +\item{filter}{A CQL text or JSON expression passed through to the OGC +API \code{filter} query parameter. Commonly used to OR several time ranges +into a single request. At the time of writing the server accepts +\code{cql-text} (default) and \code{cql-json}; \code{cql2-text} / \code{cql2-json} are not +yet supported. 
A long expression made up of a top-level \code{OR} chain is +transparently split into multiple requests that each fit under the +server's URI length limit; the results are concatenated and +deduplicated by id.} + +\item{filter_lang}{Language of the \code{filter} expression, for example +\code{cql-text} (default) or \code{cql-json}. Sent as \code{filter-lang} in the URL.} + \item{convertType}{logical, defaults to \code{TRUE}. If \code{TRUE}, the function will convert the data to dates and qualifier to string vector.} diff --git a/man/read_waterdata_field_measurements.Rd b/man/read_waterdata_field_measurements.Rd index 508d2c9cd..b6ac20174 100644 --- a/man/read_waterdata_field_measurements.Rd +++ b/man/read_waterdata_field_measurements.Rd @@ -24,6 +24,8 @@ read_waterdata_field_measurements( time = NA_character_, bbox = NA, limit = NA, + filter = NA_character_, + filter_lang = NA_character_, convertType = TRUE, no_paging = FALSE ) @@ -41,7 +43,7 @@ Multiple parameter_codes can be requested as a character vector.} \item{properties}{A vector of requested columns to be returned from the query. Available options are: -geometry, field_measurement_id, field_visit_id, parameter_code, monitoring_location_id, observing_procedure_code, observing_procedure, value, unit_of_measure, time, qualifier, vertical_datum, approval_status, measuring_agency, last_modified, control_condition, measurement_rated. +geometry, field_measurement_id, field_measurements_series_id, field_visit_id, parameter_code, monitoring_location_id, observing_procedure_code, observing_procedure, value, unit_of_measure, time, qualifier, vertical_datum, approval_status, measuring_agency, last_modified, control_condition, measurement_rated. The default (\code{NA}) will return all columns of the data.} \item{field_visit_id}{A universally unique identifier (UUID) for the field visit. Multiple measurements may be made during a single field visit.} @@ -109,6 +111,18 @@ limit is 50000. 
It may be beneficial to set this number lower if your internet connection is spotty. The default (\code{NA}) will set the limit to the maximum allowable limit for the service.} +\item{filter}{A CQL text or JSON expression passed through to the OGC +API \code{filter} query parameter. Commonly used to OR several time ranges +into a single request. At the time of writing the server accepts +\code{cql-text} (default) and \code{cql-json}; \code{cql2-text} / \code{cql2-json} are not +yet supported. A long expression made up of a top-level \code{OR} chain is +transparently split into multiple requests that each fit under the +server's URI length limit; the results are concatenated and +deduplicated by id.} + +\item{filter_lang}{Language of the \code{filter} expression, for example +\code{cql-text} (default) or \code{cql-json}. Sent as \code{filter-lang} in the URL.} + \item{convertType}{logical, defaults to \code{TRUE}. If \code{TRUE}, the function will convert the data to dates and qualifier to string vector.} diff --git a/man/read_waterdata_latest_continuous.Rd b/man/read_waterdata_latest_continuous.Rd index 14b211de5..09484f81a 100644 --- a/man/read_waterdata_latest_continuous.Rd +++ b/man/read_waterdata_latest_continuous.Rd @@ -18,6 +18,8 @@ read_waterdata_latest_continuous( time = NA_character_, bbox = NA, limit = NA, + filter = NA_character_, + filter_lang = NA_character_, convertType = TRUE, no_paging = FALSE ) @@ -92,6 +94,18 @@ limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (\code{NA}) will set the limit to the maximum allowable limit for the service.} +\item{filter}{A CQL text or JSON expression passed through to the OGC +API \code{filter} query parameter. Commonly used to OR several time ranges +into a single request. At the time of writing the server accepts +\code{cql-text} (default) and \code{cql-json}; \code{cql2-text} / \code{cql2-json} are not +yet supported. 
A long expression made up of a top-level \code{OR} chain is +transparently split into multiple requests that each fit under the +server's URI length limit; the results are concatenated and +deduplicated by id.} + +\item{filter_lang}{Language of the \code{filter} expression, for example +\code{cql-text} (default) or \code{cql-json}. Sent as \code{filter-lang} in the URL.} + \item{convertType}{logical, defaults to \code{TRUE}. If \code{TRUE}, the function will convert the data to dates and qualifier to string vector.} diff --git a/man/read_waterdata_latest_daily.Rd b/man/read_waterdata_latest_daily.Rd index a50f727e9..0c82b892d 100644 --- a/man/read_waterdata_latest_daily.Rd +++ b/man/read_waterdata_latest_daily.Rd @@ -19,6 +19,8 @@ read_waterdata_latest_daily( time = NA_character_, bbox = NA, limit = NA, + filter = NA_character_, + filter_lang = NA_character_, convertType = TRUE, no_paging = FALSE ) @@ -97,6 +99,18 @@ limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (\code{NA}) will set the limit to the maximum allowable limit for the service.} +\item{filter}{A CQL text or JSON expression passed through to the OGC +API \code{filter} query parameter. Commonly used to OR several time ranges +into a single request. At the time of writing the server accepts +\code{cql-text} (default) and \code{cql-json}; \code{cql2-text} / \code{cql2-json} are not +yet supported. A long expression made up of a top-level \code{OR} chain is +transparently split into multiple requests that each fit under the +server's URI length limit; the results are concatenated and +deduplicated by id.} + +\item{filter_lang}{Language of the \code{filter} expression, for example +\code{cql-text} (default) or \code{cql-json}. Sent as \code{filter-lang} in the URL.} + \item{convertType}{logical, defaults to \code{TRUE}. 
If \code{TRUE}, the function will convert the data to dates and qualifier to string vector.} diff --git a/man/read_waterdata_monitoring_location.Rd b/man/read_waterdata_monitoring_location.Rd index b8e87b7d0..88222a3fd 100644 --- a/man/read_waterdata_monitoring_location.Rd +++ b/man/read_waterdata_monitoring_location.Rd @@ -48,7 +48,9 @@ read_waterdata_monitoring_location( properties = NA_character_, bbox = NA, limit = NA, - skipGeometry = NA + skipGeometry = NA, + filter = NA_character_, + filter_lang = NA_character_ ) } \arguments{ @@ -155,6 +157,17 @@ allowable limit for the service.} \item{skipGeometry}{This option can be used to skip response geometries for each feature. The returning object will be a data frame with no spatial information.} + +\item{filter}{A CQL text or JSON expression passed through to the OGC +API \code{filter} query parameter. At the time of writing the server accepts +\code{cql-text} (default) and \code{cql-json}; \code{cql2-text} / \code{cql2-json} are not +yet supported. A long expression made up of a top-level \code{OR} chain is +transparently split into multiple requests that each fit under the +server's URI length limit; the results are concatenated and +deduplicated by id.} + +\item{filter_lang}{Language of the \code{filter} expression, for example +\code{cql-text} (default) or \code{cql-json}. Sent as \code{filter-lang} in the URL.} } \description{ Location information is basic information about the monitoring location including the name, identifier, agency responsible for data collection, and the date the location was established. It also includes information about the type of location, such as stream, lake, or groundwater, and geographic information about the location, such as state, county, latitude and longitude, and hydrologic unit code (HUC). 
diff --git a/man/read_waterdata_nearest_continuous.Rd b/man/read_waterdata_nearest_continuous.Rd new file mode 100644 index 000000000..1f8f0e9ad --- /dev/null +++ b/man/read_waterdata_nearest_continuous.Rd @@ -0,0 +1,122 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/read_waterdata_nearest_continuous.R +\name{read_waterdata_nearest_continuous} +\alias{read_waterdata_nearest_continuous} +\title{For each target timestamp, return the nearest continuous observation} +\usage{ +read_waterdata_nearest_continuous( + targets, + monitoring_location_id = NA_character_, + parameter_code = NA_character_, + window = "PT7M30S", + on_tie = c("first", "last", "mean"), + ... +) +} +\arguments{ +\item{targets}{Target timestamps. Accepts a \code{POSIXct} vector, a +character vector in ISO 8601 form, a \code{Date} vector, or anything +\code{\link[lubridate:as_date]{lubridate::as_datetime()}} accepts. Character input without a +timezone is interpreted as UTC.} + +\item{monitoring_location_id}{A unique identifier representing a single monitoring location. This corresponds to the \code{id} field in the \code{monitoring-locations} endpoint. Monitoring location IDs are created by combining the agency code of the agency responsible for the monitoring location (e.g. USGS) with the ID number of the monitoring location (e.g. 02238500), separated by a hyphen (e.g. USGS-02238500). + +Multiple monitoring_location_ids can be requested as a character vector.} + +\item{parameter_code}{Parameter codes are 5-digit codes used to identify the constituent measured and the units of measure. A complete list of parameter codes and associated groupings can be found at \url{https://api.waterdata.usgs.gov/ogcapi/v0/collections/parameter-codes/items}. + +Multiple parameter_codes can be requested as a character vector.} + +\item{window}{Half-window around each target. Accepts: +\itemize{ +\item a \code{"MM:SS"} or \code{"HH:MM:SS"} string (e.g. 
\code{"07:30"}, \code{"15:00"}, +\code{"00:30:00"}, \code{"01:00:00"}); +\item an ISO 8601 duration string (e.g. \code{"PT7M30S"}, \code{"PT15M"}, +\code{"PT1H"}) or any other string \code{lubridate::duration()} parses +(e.g. \code{"7 minutes 30 seconds"}); +\item a number of seconds, a \code{difftime}, or a \code{lubridate::Period} / +\code{lubridate::Duration}. +} + +Defaults to \code{"PT7M30S"} (7.5 minutes, ISO 8601), which is half +of the typical 15-minute continuous cadence — most targets' +windows contain exactly one observation. Widen for irregular +cadences or resilience to data gaps.} + +\item{on_tie}{How to resolve ties when two observations are exactly +equidistant from a target (which happens when the target falls at +the midpoint between grid points — e.g. target 10:22:30 for a +15-minute gauge). One of: +\itemize{ +\item \code{"first"} — keep the earlier observation (default). +\item \code{"last"} — keep the later observation. +\item \code{"mean"} — average numeric columns; set \code{time} to the target, +since no real observation exists at the midpoint. +}} + +\item{...}{Additional arguments forwarded to \code{\link[=read_waterdata_continuous]{read_waterdata_continuous()}} +(e.g. \code{statistic_id}, \code{approval_status}, \code{properties}). Passing +\code{time}, \code{filter}, or \code{filter_lang} raises an error — this function +builds those itself.} +} +\value{ +A data frame with one row per \verb{(target, monitoring_location_id)} +combination that had at least one observation in its window. +A \code{target_time} column is added identifying which target each row +corresponds to. Targets with no observations in their window are +silently dropped. 
+}
+\description{
+Builds one bracketed \verb{(time >= t-window AND time <= t+window)} clause per
+target, joins them into a top-level CQL \code{OR} filter, and lets
+\code{\link[=read_waterdata_continuous]{read_waterdata_continuous()}} (with its URL-length-safe auto-chunking)
+fetch every observation that falls in any window. Then, per
+\verb{(monitoring_location_id, target)} pair, picks the single observation
+whose time is closest to the target.
+}
+\details{
+The USGS continuous endpoint matches \code{time} parameters exactly rather
+than fuzzily, and it does not implement \code{sortby} for arbitrary fields;
+this function is the single-round-trip way to ask "what reading is
+nearest this timestamp?" for many timestamps at once.
+
+\emph{Window sizing and ties.} When \code{window} is exactly half the service
+cadence, most targets' windows contain a single observation and
+\code{on_tie} is moot. Ties arise only when a target falls exactly midway
+between two observations — rare in practice but possible. Setting \code{window} to a
+full cadence (or larger) guarantees at least one observation per
+target in steady state at the cost of more bytes per response.
+
+\emph{Why windowed CQL rather than sort+limit.} The API's advertised
+\code{sortby} parameter would make this a one-liner per target (\code{filter}
+by \code{time <= t} and \verb{limit 1}), but it is per-query — you would need
+one HTTP round-trip per target. The CQL \code{OR}-chain approach folds
+all N targets into one request (auto-chunked when the URL is long).
+} +\examples{ +\dontshow{if (is_dataRetrieval_user()) withAutoprint(\{ # examplesIf} + +\donttest{ +# Pair three off-grid timestamps with nearby observations +targets <- as.POSIXct( + c("2023-06-15 10:30:31", "2023-06-15 14:07:12", "2023-06-16 03:45:19"), + tz = "UTC" +) + +near <- read_waterdata_nearest_continuous( + targets = targets, + monitoring_location_id = "USGS-02238500", + parameter_code = "00060" +) + +# Widen the window for an irregular-cadence gauge and average on ties +near_wide <- read_waterdata_nearest_continuous( + targets = targets, + monitoring_location_id = "USGS-02238500", + parameter_code = "00060", + window = "PT30M", + on_tie = "mean" +) +} +\dontshow{\}) # examplesIf} +} diff --git a/man/read_waterdata_ts_meta.Rd b/man/read_waterdata_ts_meta.Rd index 181b237f0..8aab886b3 100644 --- a/man/read_waterdata_ts_meta.Rd +++ b/man/read_waterdata_ts_meta.Rd @@ -30,6 +30,8 @@ read_waterdata_ts_meta( bbox = NA, begin = NA_character_, end = NA_character_, + filter = NA_character_, + filter_lang = NA_character_, convertType = TRUE, no_paging = FALSE ) @@ -151,6 +153,17 @@ Southern-most latitude, Eastern-most longitude, Northern-most longitude).} \item{end}{This field contains the same information as "end_utc", but in the local time of the monitoring location. It is retained for backwards compatibility, but will be removed in V1 of these APIs.} +\item{filter}{A CQL text or JSON expression passed through to the OGC +API \code{filter} query parameter. At the time of writing the server accepts +\code{cql-text} (default) and \code{cql-json}; \code{cql2-text} / \code{cql2-json} are not +yet supported. A long expression made up of a top-level \code{OR} chain is +transparently split into multiple requests that each fit under the +server's URI length limit; the results are concatenated and +deduplicated by id.} + +\item{filter_lang}{Language of the \code{filter} expression, for example +\code{cql-text} (default) or \code{cql-json}. 
Sent as \code{filter-lang} in the URL.} + \item{convertType}{logical, defaults to \code{TRUE}. If \code{TRUE}, the function will convert the data to dates and qualifier to string vector.} diff --git a/tests/testthat/test_nearest_continuous.R b/tests/testthat/test_nearest_continuous.R new file mode 100644 index 000000000..f95c5408a --- /dev/null +++ b/tests/testthat/test_nearest_continuous.R @@ -0,0 +1,278 @@ +context("read_waterdata_nearest_continuous") + +# --- Helpers ----------------------------------------------------------------- + +# Build a synthetic data frame of continuous observations (one per row). +fake_continuous <- function( + times, + values = seq_along(times), + site = "USGS-02238500" +) { + data.frame( + monitoring_location_id = rep(site, length(times)), + time = as.POSIXct(times, tz = "UTC"), + value = values, + stringsAsFactors = FALSE + ) +} + +# Capture the args that our stubbed read_waterdata_continuous receives, then +# return a canned data frame. +stub_continuous <- function(return_df, recorder = new.env()) { + recorder$calls <- list() + recorder$fn <- function(...) { + args <- list(...) 
+ recorder$calls[[length(recorder$calls) + 1L]] <- args + return_df + } + recorder +} + +# --- Filter construction ----------------------------------------------------- + +test_that("builds one AND-bracketed OR-clause per target", { + targets <- as.POSIXct( + c("2023-06-15 10:30:31", "2023-06-15 14:07:12"), + tz = "UTC" + ) + recorder <- stub_continuous(fake_continuous(targets)) + with_mocked_bindings( + read_waterdata_continuous = recorder$fn, + { + read_waterdata_nearest_continuous( + targets = targets, + monitoring_location_id = "USGS-02238500", + parameter_code = "00060", + window = 450 + ) + } + ) + expect_length(recorder$calls, 1L) + args <- recorder$calls[[1L]] + expect_equal(args$filter_lang, "cql-text") + expect_match(args$filter, "time >= '2023-06-15T10:23:01Z'", fixed = TRUE) + expect_match(args$filter, "time <= '2023-06-15T10:38:01Z'", fixed = TRUE) + expect_match(args$filter, "time >= '2023-06-15T13:59:42Z'", fixed = TRUE) + expect_match(args$filter, "time <= '2023-06-15T14:14:42Z'", fixed = TRUE) + # Exactly one OR joins the two bracketed clauses. 
+ expect_equal( + lengths(regmatches(args$filter, gregexpr(" OR ", args$filter))), + 1L + ) +}) + +# --- Nearest reduction ------------------------------------------------------- + +test_that("picks the observation with minimum absolute time delta", { + target <- as.POSIXct("2023-06-15 10:30:31", tz = "UTC") + obs_times <- as.POSIXct(c( + "2023-06-15 10:25:00", + "2023-06-15 10:30:00", # closest to target (31s away) + "2023-06-15 10:35:00" + ), tz = "UTC") + recorder <- stub_continuous(fake_continuous(obs_times)) + out <- with_mocked_bindings( + read_waterdata_continuous = recorder$fn, + { + read_waterdata_nearest_continuous( + targets = target, + monitoring_location_id = "USGS-02238500", + parameter_code = "00060", + window = 600 + ) + } + ) + expect_equal(nrow(out), 1L) + expect_equal(out$time, as.POSIXct("2023-06-15 10:30:00", tz = "UTC")) + expect_equal(out$target_time, target) +}) + +test_that("on_tie = 'first' keeps earlier equidistant observation", { + target <- as.POSIXct("2023-06-15 10:22:30", tz = "UTC") + obs_times <- as.POSIXct(c( + "2023-06-15 10:15:00", + "2023-06-15 10:30:00" + ), tz = "UTC") + recorder <- stub_continuous(fake_continuous(obs_times, values = c(10, 20))) + out <- with_mocked_bindings( + read_waterdata_continuous = recorder$fn, + { + read_waterdata_nearest_continuous( + targets = target, + window = 450, + on_tie = "first" + ) + } + ) + expect_equal(nrow(out), 1L) + expect_equal(out$time, as.POSIXct("2023-06-15 10:15:00", tz = "UTC")) + expect_equal(out$value, 10) +}) + +test_that("on_tie = 'last' keeps later equidistant observation", { + target <- as.POSIXct("2023-06-15 10:22:30", tz = "UTC") + obs_times <- as.POSIXct(c( + "2023-06-15 10:15:00", + "2023-06-15 10:30:00" + ), tz = "UTC") + recorder <- stub_continuous(fake_continuous(obs_times, values = c(10, 20))) + out <- with_mocked_bindings( + read_waterdata_continuous = recorder$fn, + { + read_waterdata_nearest_continuous( + targets = target, + window = 450, + on_tie = "last" + ) + } 
+ ) + expect_equal(nrow(out), 1L) + expect_equal(out$time, as.POSIXct("2023-06-15 10:30:00", tz = "UTC")) + expect_equal(out$value, 20) +}) + +test_that("on_tie = 'mean' averages numerics and pins time to the target", { + target <- as.POSIXct("2023-06-15 10:22:30", tz = "UTC") + obs_times <- as.POSIXct(c( + "2023-06-15 10:15:00", + "2023-06-15 10:30:00" + ), tz = "UTC") + recorder <- stub_continuous(fake_continuous(obs_times, values = c(10, 20))) + out <- with_mocked_bindings( + read_waterdata_continuous = recorder$fn, + { + read_waterdata_nearest_continuous( + targets = target, + window = 450, + on_tie = "mean" + ) + } + ) + expect_equal(nrow(out), 1L) + expect_equal(out$time, target) + expect_equal(out$value, 15) +}) + +# --- Edge cases -------------------------------------------------------------- + +test_that("targets with no observations in their window are dropped", { + targets <- as.POSIXct( + c("2023-06-15 10:30:00", "2030-01-01 00:00:00"), + tz = "UTC" + ) + obs_times <- as.POSIXct("2023-06-15 10:30:00", tz = "UTC") + recorder <- stub_continuous(fake_continuous(obs_times)) + out <- with_mocked_bindings( + read_waterdata_continuous = recorder$fn, + { + read_waterdata_nearest_continuous(targets = targets, window = 450) + } + ) + expect_equal(nrow(out), 1L) + expect_equal(out$target_time, targets[1L]) +}) + +test_that("multi-site returns one row per (target, site) pair", { + targets <- as.POSIXct( + c("2023-06-15 10:30:00", "2023-06-15 10:45:00"), + tz = "UTC" + ) + df <- rbind( + fake_continuous(targets, site = "USGS-02238500"), + fake_continuous(targets, site = "USGS-01646500") + ) + recorder <- stub_continuous(df) + out <- with_mocked_bindings( + read_waterdata_continuous = recorder$fn, + { + read_waterdata_nearest_continuous( + targets = targets, + monitoring_location_id = c("USGS-02238500", "USGS-01646500"), + window = 450 + ) + } + ) + expect_equal(nrow(out), 4L) + expect_setequal( + unique(out$monitoring_location_id), + c("USGS-02238500", 
"USGS-01646500")
+  )
+})
+
+test_that("empty targets returns an empty frame via a single shape-probe call", {
+  recorder <- stub_continuous(
+    data.frame(
+      monitoring_location_id = character(0),
+      time = as.POSIXct(character(0), tz = "UTC"),
+      value = numeric(0),
+      stringsAsFactors = FALSE
+    )
+  )
+  out <- with_mocked_bindings(
+    read_waterdata_continuous = recorder$fn,
+    {
+      read_waterdata_nearest_continuous(targets = character(0))
+    }
+  )
+  expect_length(recorder$calls, 1L)
+  # The single call is the "shape probe" — it uses a trivially-empty time
+  # range, not a filter.
+  expect_null(recorder$calls[[1L]]$filter)
+  expect_equal(nrow(out), 0L)
+  expect_true("target_time" %in% names(out))
+})
+
+# --- Argument validation -----------------------------------------------------
+
+test_that("forbidden kwargs raise an informative error", {
+  target <- as.POSIXct("2023-06-15 10:30:00", tz = "UTC")
+  for (forbidden in c("time", "filter", "filter_lang")) {
+    args <- list(targets = target)
+    args[[forbidden]] <- "anything"
+    expect_error(
+      do.call(read_waterdata_nearest_continuous, args),
+      forbidden,
+      info = forbidden
+    )
+  }
+})
+
+test_that("on_tie is validated", {
+  target <- as.POSIXct("2023-06-15 10:30:00", tz = "UTC")
+  expect_error(
+    read_waterdata_nearest_continuous(targets = target, on_tie = "bogus"),
+    "should be one of"
+  )
+})
+
+test_that("window accepts clock strings, ISO 8601, and programmatic forms", {
+  # MM:SS
+  expect_equal(dataRetrieval:::as_window_seconds("07:30"), 450)
+  expect_equal(dataRetrieval:::as_window_seconds("15:00"), 900)
+  # HH:MM:SS
+  expect_equal(dataRetrieval:::as_window_seconds("00:07:30"), 450)
+  expect_equal(dataRetrieval:::as_window_seconds("01:00:00"), 3600)
+  # Fractional seconds are allowed.
+  expect_equal(dataRetrieval:::as_window_seconds("00:01.5"), 1.5)
+  # ISO 8601 duration.
+ expect_equal(dataRetrieval:::as_window_seconds("PT7M30S"), 450) + expect_equal(dataRetrieval:::as_window_seconds("PT15M"), 900) + expect_equal(dataRetrieval:::as_window_seconds("PT1H"), 3600) + # Natural-language forms lubridate also accepts. + expect_equal( + dataRetrieval:::as_window_seconds("7 minutes 30 seconds"), + 450 + ) + # Programmatic forms. + expect_equal(dataRetrieval:::as_window_seconds(450), 450) + expect_equal( + dataRetrieval:::as_window_seconds(as.difftime(5, units = "mins")), + 300 + ) + expect_equal( + dataRetrieval:::as_window_seconds(lubridate::minutes(10)), + 600 + ) + # Reject garbage with a clear message. + expect_error(dataRetrieval:::as_window_seconds("bogus"), "MM:SS") +}) diff --git a/tests/testthat/tests_userFriendly_fxns.R b/tests/testthat/tests_userFriendly_fxns.R index 6ec117802..aa198046d 100644 --- a/tests/testthat/tests_userFriendly_fxns.R +++ b/tests/testthat/tests_userFriendly_fxns.R @@ -429,6 +429,156 @@ test_that("Construct USGS urls", { # nolint end }) +context("CQL filter passthrough") +test_that("filter is forwarded verbatim as a query parameter", { + expr <- paste0( + "(time >= '2023-01-06T16:00:00Z' AND time <= '2023-01-06T18:00:00Z') ", + "OR (time >= '2023-01-10T18:00:00Z' AND time <= '2023-01-10T20:00:00Z')" + ) + req <- construct_api_requests( + service = "continuous", + monitoring_location_id = "USGS-07374525", + parameter_code = "72255", + filter = expr + ) + qs <- httr2::url_parse(req$url)$query + expect_equal(qs[["filter"]], expr) +}) + +test_that("filter_lang is sent as the hyphenated URL key filter-lang", { + req <- construct_api_requests( + service = "continuous", + monitoring_location_id = "USGS-07374525", + parameter_code = "72255", + filter = "time >= '2023-01-01T00:00:00Z'", + filter_lang = "cql-text" + ) + qs <- httr2::url_parse(req$url)$query + expect_equal(qs[["filter-lang"]], "cql-text") + expect_false("filter_lang" %in% names(qs)) +}) + +test_that("filter passthrough works for every OGC collection 
endpoint", { + services <- c( + "daily", + "continuous", + "monitoring-locations", + "time-series-metadata", + "latest-continuous", + "latest-daily", + "field-measurements", + "channel-measurements" + ) + for (service in services) { + req <- construct_api_requests( + service = service, + filter = "value > 0", + filter_lang = "cql-text" + ) + qs <- httr2::url_parse(req$url)$query + expect_equal(qs[["filter"]], "value > 0", info = service) + expect_equal(qs[["filter-lang"]], "cql-text", info = service) + } +}) + +test_that("split_top_level_or splits at top-level OR only", { + expect_equal(dataRetrieval:::split_top_level_or("A OR B OR C"), c("A", "B", "C")) + # case-insensitive + expect_equal(dataRetrieval:::split_top_level_or("A or B Or C"), c("A", "B", "C")) + # parens are respected + expect_equal( + dataRetrieval:::split_top_level_or("(A OR B) OR (C OR D)"), + c("(A OR B)", "(C OR D)") + ) + # quotes are respected + expect_equal( + dataRetrieval:::split_top_level_or("name = 'foo OR bar' OR id = 1"), + c("name = 'foo OR bar'", "id = 1") + ) + # single clause is returned as-is + expect_equal( + dataRetrieval:::split_top_level_or("time >= '2023-01-01T00:00:00Z'"), + "time >= '2023-01-01T00:00:00Z'" + ) +}) + +test_that("chunk_cql_or keeps short expressions as-is", { + expr <- "time >= '2023-01-01T00:00:00Z'" + expect_equal(dataRetrieval:::chunk_cql_or(expr, max_len = 1000), expr) +}) + +test_that("chunk_cql_or splits a long top-level OR chain into fitting chunks", { + clause <- "(time >= '2023-01-01T00:00:00Z' AND time <= '2023-01-01T00:30:00Z')" + expr <- paste(rep(clause, 200), collapse = " OR ") + chunks <- dataRetrieval:::chunk_cql_or(expr, max_len = 1000) + expect_true(length(chunks) > 1) + expect_true(all(nchar(chunks) <= 1000)) + rejoined_clauses <- sum(lengths(strsplit(chunks, " OR ", fixed = TRUE))) + expect_equal(rejoined_clauses, 200) +}) + +test_that("chunk_cql_or returns input unchanged when it cannot be split losslessly", { + # A single AND-heavy 
expression with no top-level OR should be sent as-is + # so the server decides, rather than us mangling it. + big <- paste0("value > 0 AND ", paste(rep("A", 4000), collapse = " ")) + expect_equal(dataRetrieval:::chunk_cql_or(big, max_len = 1000), big) + + # Similarly, when a single clause alone already exceeds the budget. + huge_clause <- paste0("(value > ", paste(rep("9", 6000), collapse = ""), ")") + expr <- paste(huge_clause, "OR (value > 0)") + expect_equal(dataRetrieval:::chunk_cql_or(expr, max_len = 1000), expr) +}) + +test_that("split_top_level_or handles doubled single-quote CQL escape", { + # In CQL-text, a single quote inside a literal is written as '' (two + # consecutive single quotes). The scanner's toggle-on-quote logic + # handles this because there is no content between the two quotes to + # misclassify, but lock the behavior in so a future refactor can't + # regress it. + expr <- "name = 'It''s hot' OR id = 1" + expect_equal( + dataRetrieval:::split_top_level_or(expr), + c("name = 'It''s hot'", "id = 1") + ) +}) + +test_that("effective_filter_budget keeps every produced chunk under the URL byte limit", { + limit <- dataRetrieval:::.WATERDATA_URL_BYTE_LIMIT + args <- list( + service = "continuous", + monitoring_location_id = "USGS-02238500", + parameter_code = "00060" + ) + clause <- "(time >= '2023-01-01T00:00:00Z' AND time <= '2023-01-01T00:30:00Z')" + expr <- paste(rep(clause, 300), collapse = " OR ") + budget <- dataRetrieval:::effective_filter_budget(args, expr) + chunks <- dataRetrieval:::chunk_cql_or(expr, max_len = budget) + expect_gt(length(chunks), 1L) + url_bytes <- vapply(chunks, function(ch) { + a <- args + a[["filter"]] <- ch + nchar(do.call(construct_api_requests, a)$url) + }, integer(1)) + expect_true(all(url_bytes <= limit)) +}) + +test_that("non cql-text filter is passed through without chunking", { + # The splitter is cql-text-only; chunking a cql-json expression would + # corrupt it. 
`construct_api_requests` forwards whatever we give it, + # so the resulting URL has the full filter as a single value. + huge <- paste0("{\"op\":\"or\",\"args\":[", paste(rep("{}", 3000), collapse = ","), "]}") + req <- construct_api_requests( + service = "continuous", + monitoring_location_id = "USGS-02238500", + parameter_code = "00060", + filter = huge, + filter_lang = "cql-json" + ) + qs <- httr2::url_parse(req$url)$query + expect_equal(qs[["filter-lang"]], "cql-json") + expect_equal(qs[["filter"]], huge) +}) + context("Construct WQP urls") test_that("Construct WQP urls", { testthat::skip_on_cran()