From d33ad107ccd7e6df1409998a7c2bbde58c70f5d2 Mon Sep 17 00:00:00 2001 From: katieannemills Date: Tue, 5 May 2026 15:14:51 -0400 Subject: [PATCH] first attempt at streaming --- api/Cargo.toml | 1 + api/src/helpers/transforms.rs | 266 ++++++++++++++++------------------ api/src/main.rs | 176 +++++++++++++++------- 3 files changed, 252 insertions(+), 191 deletions(-) diff --git a/api/Cargo.toml b/api/Cargo.toml index 98b91da..ee26a46 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -16,6 +16,7 @@ chrono = "0.4.38" tokio = { version = "1.40.0", features = ["macros", "rt-multi-thread"] } tokio-stream = "0.1.16" lazy_static = "1.4.0" +async-stream = "0.3" [dev-dependencies] reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } diff --git a/api/src/helpers/transforms.rs b/api/src/helpers/transforms.rs index deda4c9..2449344 100644 --- a/api/src/helpers/transforms.rs +++ b/api/src/helpers/transforms.rs @@ -2,9 +2,14 @@ use super::schema; use super::helpers; use mongodb::bson::DateTime as BsonDateTime; -pub fn transform_timeseries(params: serde_json::Value, ts: Vec, results: Vec) -> Vec { - - // extract query parameters ////////////////////////////////////// +/// Apply the user's `startDate` / `endDate` / `data` parameters to a single +/// timeseries document. Returns `None` if the document was filtered out +/// entirely (e.g. `data=somefield` produced no matching columns). +pub fn transform_timeseries( + params: &serde_json::Value, + ts: &[BsonDateTime], + mut doc: T, +) -> Option { let start_date = params.get("startDate") .and_then(|v| v.as_str()) .and_then(helpers::string2bsondate); @@ -16,116 +21,113 @@ pub fn transform_timeseries(params: serde_json: let data: Vec = params.get("data") .and_then(|v| v.as_str()) .map(|s| s.split(',').map(|s| s.to_string()).collect()) - .unwrap_or_else(Vec::new); - - // apply appropriate transforms //////////////////////////////////// - let mut r = results.clone(); + .unwrap_or_default(); if start_date.is_some() || end_date.is_some() { - r = slice_timerange(start_date, end_date, ts, r); + slice_timerange(start_date, end_date, ts, &mut doc); } - r = slice_data(data, r); - - return r; + slice_data(&data, doc) } -pub fn slice_timerange(start_date: Option, end_date: Option, ts: Vec, mut results: Vec) -> Vec { - - let start_index = start_date.and_then(|start_date| { - ts.iter().position(|&t| t >= start_date) - }).unwrap_or(0); - - let end_index = end_date.and_then(|end_date| { - ts.iter().rposition(|&t| t < end_date).map(|idx| idx + 1) - }).unwrap_or(ts.len()); +/// Slice the document's data columns and timeseries field to the time window +/// `[start_date, end_date)`. The window is computed against `ts` (the +/// timeseries metadata cached at startup); each variable's per-timestep +/// values are sliced in lockstep. Mutates `doc` in place. +pub fn slice_timerange( + start_date: Option, + end_date: Option, + ts: &[BsonDateTime], + doc: &mut T, +) { + let start_index = start_date + .and_then(|sd| ts.iter().position(|&t| t >= sd)) + .unwrap_or(0); + + let end_index = end_date + .and_then(|ed| ts.iter().rposition(|&t| t < ed).map(|i| i + 1)) + .unwrap_or(ts.len()); let time_window: Vec = ts[start_index..end_index] .iter() .map(|t| helpers::bsondate2string(t)) .collect(); - for result in &mut results { - let data = result.data(); - *data = data.iter().map(|inner_vec| { - let slice = &inner_vec[start_index..end_index]; - slice.to_vec() - }).collect(); + let data = doc.data(); + *data = data + .iter() + .map(|inner| inner[start_index..end_index].to_vec()) + .collect(); - match result.timeseries() { - Some(timeseries) => *timeseries = time_window.clone(), - None => result.set_timeseries(time_window.clone()), - } + match doc.timeseries() { + Some(timeseries) => *timeseries = time_window, + None => doc.set_timeseries(time_window), } - - results - } -// todo: this will probably be generic over more than just Timeseries -pub fn slice_data(data: Vec, mut results: Vec) -> Vec { - +/// Apply the `data=` query parameter to a single document. Behaviour mirrors +/// the previous Vec-based implementation: +/// +/// - empty `data`: clears the document's `data` field but keeps the row. +/// - `data` contains "all": leaves everything untouched. +/// - otherwise: filters the data columns down to the named variables; +/// returns `None` if no requested variables match (caller drops the row). +/// - if `data` also contains "except_data_values", clears the data field +/// after column-filtering, but keeps the row. +pub fn slice_data( + data: &[String], + mut doc: T, +) -> Option { if data.is_empty() { - for result in &mut results { - result.set_data(Vec::new()); - } - } else if data.contains(&"all".to_string()) { - return results; - } else { - for result in &mut results { - let data_info = result.data_info(); - - let indexes: Vec = data.iter() - .filter_map(|item| data_info.0.iter().position(|x| x == item)) - .collect(); - - // only keep the requested data - let filtered_data: Vec> = indexes.iter() - .filter_map(|&i| result.data().get(i).cloned()) - .collect(); - result.set_data(filtered_data); - - // create a custom data_info to go with this reduced data, and add it to the result object - let filtered_data_info: (Vec, Vec, Vec>) = ( - indexes.iter().filter_map(|&i| data_info.0.get(i).cloned()).collect(), - data_info.1.clone(), - indexes.iter().filter_map(|&i| data_info.2.get(i).cloned()).collect(), - ); - result.set_data_info(filtered_data_info); - } + doc.set_data(Vec::new()); + return Some(doc); + } - // if all the data is empty, remove the result - let mut i = 0; - while i != results.len() { - if results[i].data().is_empty() { - results.remove(i); - } else { - i += 1; - } - } + if data.iter().any(|s| s == "all") { + return Some(doc); + } - // if we set except_data_values, drop the data from every result - if data.contains(&"except_data_values".to_string()) { - for result in &mut results { - result.set_data(Vec::new()); - } - } + // Specific fields requested — filter columns. + let data_info = doc.data_info(); + let indexes: Vec = data + .iter() + .filter_map(|item| data_info.0.iter().position(|x| x == item)) + .collect(); + + let filtered_data: Vec> = indexes + .iter() + .filter_map(|&i| doc.data().get(i).cloned()) + .collect(); + doc.set_data(filtered_data); + + let filtered_data_info: (Vec, Vec, Vec>) = ( + indexes.iter().filter_map(|&i| data_info.0.get(i).cloned()).collect(), + data_info.1.clone(), + indexes.iter().filter_map(|&i| data_info.2.get(i).cloned()).collect(), + ); + doc.set_data_info(filtered_data_info); + + // No matching columns -> caller drops the row. + if doc.data().is_empty() { + return None; } - results -} + // except_data_values clears data after column-filtering. + if data.iter().any(|s| s == "except_data_values") { + doc.set_data(Vec::new()); + } -pub fn timeseries_stub(results: Vec) -> Vec { - let r = results.iter().map(|result| { - schema::TimeseriesStub { - _id: result._id(), - longitude: result.longitude(), - latitude: result.latitude(), - level: result.level(), - metadata: result.metadata(), - } - }).collect(); + Some(doc) +} - r +/// Project a single timeseries document down to its summary stub. +pub fn timeseries_stub(result: &T) -> schema::TimeseriesStub { + schema::TimeseriesStub { + _id: result._id(), + longitude: result.longitude(), + latitude: result.latitude(), + level: result.level(), + metadata: result.metadata(), + } } #[cfg(test)] @@ -166,11 +168,10 @@ mod tests { } fn ts(months: &[u32]) -> Vec { - // Build a BSON date for each (1st of month, year 2020) + // BSON date for the 1st of each named month in 2020. months .iter() .map(|&m| { - // milliseconds since epoch for 2020-{m:02}-01T00:00:00Z, computed naively let s = format!("2020-{:02}-01T00:00:00Z", m); let dt = chrono::DateTime::parse_from_rfc3339(&s).unwrap(); BsonDateTime::from_millis(dt.timestamp_millis()) @@ -183,8 +184,7 @@ mod tests { #[test] fn slice_timerange_inclusive_start_exclusive_end() { let timeseries = ts(&[1, 2, 3, 4]); // Jan, Feb, Mar, Apr - // 2 variables, 4 timestamps each - let r = make_bsose( + let mut doc = make_bsose( "doc1", vec![vec![1.0, 2.0, 3.0, 4.0], vec![10.0, 20.0, 30.0, 40.0]], &["temp", "salinity"], @@ -193,11 +193,10 @@ mod tests { let start = helpers::string2bsondate("2020-02-01T00:00:00Z"); let end = helpers::string2bsondate("2020-04-01T00:00:00Z"); // exclusive - let mut out = slice_timerange(start, end, timeseries, vec![r]); + slice_timerange(start, end, ×eries, &mut doc); // Expect indexes 1..3 -> Feb, Mar - assert_eq!(out.len(), 1); - assert_eq!(*out[0].data(), vec![vec![2.0, 3.0], vec![20.0, 30.0]]); - let ts_field = out[0].timeseries().unwrap(); + assert_eq!(*doc.data(), vec![vec![2.0, 3.0], vec![20.0, 30.0]]); + let ts_field = doc.timeseries().unwrap(); assert_eq!(ts_field.len(), 2); assert!(ts_field[0].starts_with("2020-02-01")); assert!(ts_field[1].starts_with("2020-03-01")); @@ -206,81 +205,75 @@ mod tests { #[test] fn slice_timerange_no_dates_keeps_full_range() { let timeseries = ts(&[1, 2, 3]); - let r = make_bsose( + let mut doc = make_bsose( "doc1", vec![vec![1.0, 2.0, 3.0]], &["temp"], ); - let mut out = slice_timerange(None, None, timeseries, vec![r]); - assert_eq!(*out[0].data(), vec![vec![1.0, 2.0, 3.0]]); + slice_timerange(None, None, ×eries, &mut doc); + assert_eq!(*doc.data(), vec![vec![1.0, 2.0, 3.0]]); } // ---- slice_data ---------------------------------------------------------- #[test] fn slice_data_empty_request_drops_data() { - let r = make_bsose( + let doc = make_bsose( "doc1", vec![vec![1.0, 2.0], vec![3.0, 4.0]], &["temp", "salinity"], ); - let mut out = slice_data(vec![], vec![r]); - // when no data params, slice_data drops the data — but the result row - // is preserved (the empty-data removal only applies in the "specific - // fields" branch). - assert_eq!(out.len(), 1); - assert!(out[0].data().is_empty()); + let mut out = slice_data(&[], doc).expect("empty data param keeps the row"); + assert!(out.data().is_empty()); } #[test] fn slice_data_all_keeps_everything() { - let r = make_bsose( + let doc = make_bsose( "doc1", vec![vec![1.0, 2.0], vec![3.0, 4.0]], &["temp", "salinity"], ); - let mut out = slice_data(vec!["all".to_string()], vec![r]); - assert_eq!(*out[0].data(), vec![vec![1.0, 2.0], vec![3.0, 4.0]]); + let mut out = slice_data(&["all".to_string()], doc).unwrap(); + assert_eq!(*out.data(), vec![vec![1.0, 2.0], vec![3.0, 4.0]]); } #[test] fn slice_data_specific_field_filters_columns() { - let r = make_bsose( + let doc = make_bsose( "doc1", vec![vec![1.0, 2.0], vec![3.0, 4.0]], &["temp", "salinity"], ); - let mut out = slice_data(vec!["salinity".to_string()], vec![r]); - assert_eq!(out.len(), 1); - assert_eq!(*out[0].data(), vec![vec![3.0, 4.0]]); + let mut out = slice_data(&["salinity".to_string()], doc).unwrap(); + assert_eq!(*out.data(), vec![vec![3.0, 4.0]]); } #[test] fn slice_data_unknown_field_drops_result() { - let r = make_bsose( + let doc = make_bsose( "doc1", vec![vec![1.0, 2.0]], &["temp"], ); - let out = slice_data(vec!["nonexistent".to_string()], vec![r]); - // Filtered data is empty -> the result row is removed entirely. - assert!(out.is_empty()); + let out = slice_data(&["nonexistent".to_string()], doc); + assert!(out.is_none(), "no matching columns -> row is dropped"); } #[test] fn slice_data_except_data_values_clears_after_filtering() { - let r = make_bsose( + let doc = make_bsose( "doc1", vec![vec![1.0, 2.0]], &["temp"], ); let mut out = slice_data( - vec!["temp".to_string(), "except_data_values".to_string()], - vec![r], - ); - assert_eq!(out.len(), 1); - assert!(out[0].data().is_empty()); + &["temp".to_string(), "except_data_values".to_string()], + doc, + ) + .unwrap(); + assert!(out.data().is_empty()); } // ---- transform_timeseries (full pipeline) -------------------------------- @@ -288,7 +281,7 @@ mod tests { #[test] fn transform_timeseries_combines_time_and_data_slices() { let timeseries = ts(&[1, 2, 3, 4]); - let r = make_bsose( + let doc = make_bsose( "doc1", vec![vec![1.0, 2.0, 3.0, 4.0], vec![10.0, 20.0, 30.0, 40.0]], &["temp", "salinity"], @@ -300,26 +293,23 @@ mod tests { "data": "salinity", }); - let mut out = transform_timeseries(params, timeseries, vec![r]); - assert_eq!(out.len(), 1); - assert_eq!(*out[0].data(), vec![vec![20.0, 30.0]]); + let mut out = transform_timeseries(¶ms, ×eries, doc).unwrap(); + assert_eq!(*out.data(), vec![vec![20.0, 30.0]]); } // ---- timeseries_stub ----------------------------------------------------- #[test] fn timeseries_stub_projects_summary_fields() { - let r = make_bsose( + let doc = make_bsose( "doc1", vec![vec![1.0, 2.0]], &["temp"], ); - let stubs = timeseries_stub(vec![r]); - assert_eq!(stubs.len(), 1); - assert_eq!(stubs[0]._id, "doc1"); - assert!((stubs[0].longitude - 10.0).abs() < 1e-9); - assert!((stubs[0].latitude - 20.0).abs() < 1e-9); - assert!((stubs[0].level - 5.0).abs() < 1e-9); + let stub = timeseries_stub(&doc); + assert_eq!(stub._id, "doc1"); + assert!((stub.longitude - 10.0).abs() < 1e-9); + assert!((stub.latitude - 20.0).abs() < 1e-9); + assert!((stub.level - 5.0).abs() < 1e-9); } } - diff --git a/api/src/main.rs b/api/src/main.rs index aa4e841..42df6ec 100644 --- a/api/src/main.rs +++ b/api/src/main.rs @@ -24,9 +24,11 @@ use once_cell::sync::Lazy; use std::sync::Mutex; use futures::stream::StreamExt; use std::env; +use std::convert::Infallible; use serde::de::DeserializeOwned; use mongodb::bson::DateTime; use std::collections::HashSet; +use async_stream::stream; static CLIENT: Lazy>> = Lazy::new(|| Mutex::new(None)); static TIMESERIES: Lazy>>> = Lazy::new(|| Mutex::new(None)); @@ -35,12 +37,6 @@ static TIMESERIES: Lazy>>> = Lazy::new(|| Mutex::new( async fn search_data_schema(query_params: web::Query) -> impl Responder { let params = query_params.into_inner(); - let page: i64 = params.get("page") - .and_then(|v| v.as_str()) - .and_then(|s| s.parse::().ok()) - .unwrap_or(0); - //let page_size = 1000; - // validate query params //////////////////////////////////////// match helpers::validate_query_params(¶ms) { Ok(_) => {}, @@ -50,40 +46,22 @@ async fn search_data_schema(query_params: web::Query) -> impl // construct filter from query params ////////////////////////// let filter = filters::filter_timeseries(params.clone()); - // Search for documents with matching filters ////////////////// - let options_builder = { - FindOptions::builder() - //.sort(mongodb::bson::doc! { "_id": 1 }) - //.skip(Some((page * page_size) as u64)) - //.limit(page_size) - }; - - let mut cursor = generate_cursor::("argo", "bsose", filter, Some(options_builder.build())).await.unwrap(); - - // extract results from db ////////////////////////////////////// - let mut results = Vec::new(); - - while let Some(result) = cursor.next().await { - match result { - Ok(document) => { - results.push(document); - }, - Err(e) => { - eprintln!("Error: {}", e); - return HttpResponse::InternalServerError().finish(); - } + // open the cursor ////////////////////////////////////////////// + let options = FindOptions::builder().build(); + let mut cursor = match generate_cursor::("argo", "bsose", filter, Some(options)).await { + Ok(c) => c, + Err(e) => { + eprintln!("Error opening cursor: {}", e); + return HttpResponse::InternalServerError().finish(); } - } + }; - // transform results //////////////////////////////////////////// + // grab the cached timeseries vector once let timeseries = { let ts = TIMESERIES.lock().unwrap(); ts.clone().unwrap() }; - let munged_results = transforms::transform_timeseries(params.clone(), timeseries, results); - - // return results /////////////////////////////////////////////// let compression: Option = params.get("compression") .and_then(|v| v.as_str()) .map(|s| s.to_string()); @@ -92,31 +70,123 @@ async fn search_data_schema(query_params: web::Query) -> impl .and_then(|v| v.as_str()) .map(|s| s.to_string()); - if let Some(compression) = compression { - if compression == "minimal" { - let r = transforms::timeseries_stub(munged_results.clone()); - helpers::create_response(r) - } else { - helpers::create_response(munged_results) + // ------------------------------------------------------------------- + // batchmeta: drain the bsose cursor, but only keep the (small) set of + // unique metadata IDs in memory. Then fetch the metadata documents and + // return them as a normal JSON array. Worst-case memory is bounded by + // the number of distinct metadata ids, not the number of bsose hits. + // ------------------------------------------------------------------- + if batchmeta.is_some() { + let mut unique_metadata: HashSet = HashSet::new(); + while let Some(result) = cursor.next().await { + match result { + Ok(doc) => { + if let Some(t) = transforms::transform_timeseries(¶ms, ×eries, doc) { + for m in t.metadata.iter() { + unique_metadata.insert(m.clone()); + } + } + } + Err(e) => { + eprintln!("Cursor error: {}", e); + return HttpResponse::InternalServerError().finish(); + } + } + } + if unique_metadata.is_empty() { + return helpers::create_response::(vec![]); } - } else if let Some(_batchmeta) = batchmeta { - let unique_metadata: HashSet<_> = munged_results.iter() - .flat_map(|item| item.metadata.clone()) - .collect(); - - let filter = mongodb::bson::doc! { - "_id": { - "$in": unique_metadata.into_iter().collect::>() + let meta_filter = mongodb::bson::doc! { + "_id": { "$in": unique_metadata.into_iter().collect::>() } + }; + let meta_cursor = match generate_cursor::("argo", "timeseriesMeta", meta_filter, None).await { + Ok(c) => c, + Err(e) => { + eprintln!("Error opening metadata cursor: {}", e); + return HttpResponse::InternalServerError().finish(); } }; + let results: Vec<_> = meta_cursor.map(|doc| doc.unwrap()).collect().await; + return helpers::create_response(results); + } - let cursor = generate_cursor::("argo", "timeseriesMeta", filter, None).await.unwrap(); - let results: Vec<_> = cursor.map(|doc| doc.unwrap()).collect().await; - - helpers::create_response(results) - } else { - helpers::create_response(munged_results) + // ------------------------------------------------------------------- + // Default and compression=minimal both stream the bsose cursor through + // the per-document transforms straight to the HTTP response, never + // materializing the full result set in memory. + // + // We do still need to peek ahead until we've found at least one doc + // that survives transformation, so we can preserve the existing + // 404-on-empty contract. Once we have one survivor in hand, we open + // the streamed JSON array `[`, emit it, and continue draining the + // cursor doc-by-doc. + // ------------------------------------------------------------------- + let is_minimal = matches!(compression.as_deref(), Some("minimal")); + + let mut first_doc: Option = None; + while let Some(result) = cursor.next().await { + match result { + Ok(doc) => { + if let Some(t) = transforms::transform_timeseries(¶ms, ×eries, doc) { + first_doc = Some(t); + break; + } + } + Err(e) => { + eprintln!("Cursor error: {}", e); + return HttpResponse::InternalServerError().finish(); + } + } } + + let first_doc = match first_doc { + Some(d) => d, + None => return helpers::create_response::(vec![]), + }; + + // Stream owns: cursor, params, timeseries, first_doc, is_minimal. + // Cursor errors mid-stream are logged and end the stream; we cannot + // change the HTTP status after bytes have been sent, so we close the + // JSON array cleanly and let the caller see whatever they already got. + let body = stream! { + yield Ok::<_, Infallible>(web::Bytes::from_static(b"[")); + + // Project + serialize the buffered first doc. + let first_bytes = if is_minimal { + let stub = transforms::timeseries_stub(&first_doc); + serde_json::to_vec(&stub).expect("serializing one stub should not fail") + } else { + serde_json::to_vec(&first_doc).expect("serializing one bsose doc should not fail") + }; + yield Ok(web::Bytes::from(first_bytes)); + + while let Some(result) = cursor.next().await { + match result { + Ok(doc) => { + if let Some(t) = transforms::transform_timeseries(¶ms, ×eries, doc) { + let bytes = if is_minimal { + let stub = transforms::timeseries_stub(&t); + serde_json::to_vec(&stub).expect("serializing one stub should not fail") + } else { + serde_json::to_vec(&t).expect("serializing one bsose doc should not fail") + }; + yield Ok(web::Bytes::from_static(b",")); + yield Ok(web::Bytes::from(bytes)); + } + } + Err(e) => { + eprintln!("Cursor error during stream: {}", e); + break; + } + } + } + + yield Ok(web::Bytes::from_static(b"]")); + }; + + HttpResponse::Ok() + .content_type("application/json") + .streaming(body) } #[actix_web::main]