From d6d81d57ff828b330d7db2c845036566571761da Mon Sep 17 00:00:00 2001 From: Farhan Syah Date: Wed, 8 Apr 2026 02:32:09 +0800 Subject: [PATCH 1/6] fix(timeseries): repair numeric aggregates and time_bucket() after nodedb-sql migration Fixes #15. The aggregate executor receives (op, field) pairs but all three PhysicalTask construction sites were passing the alias instead of the source field name, so the executor could never locate the column and returned null for every MIN/MAX/SUM/AVG result. parse_interval_to_ms only handled compact suffixes ("1h", "15m") and rejected the word-form intervals that DataFusion normalises SQL INTERVAL literals into ("1 hour", "15 minutes"), causing time_bucket() to bucket everything into epoch-zero and return an empty result set. Changes: - sql_plan_convert: extract agg_expr_to_pair() helper that reads the first argument expression (Column name or Wildcard) and use it at all three PhysicalTask construction sites instead of a.alias - aggregate: extend parse_interval_to_ms to accept singular and plural word-form units (hour/hours, minute/minutes, second/seconds, day/days) alongside the existing compact forms; bare numbers are treated as seconds; unrecognised units return 0 instead of silently mis-routing - aggregate: add unit tests covering word-form parsing --- nodedb-sql/src/planner/aggregate.rs | 39 +++++++++++++------ .../src/control/planner/sql_plan_convert.rs | 34 ++++++++++------ 2 files changed, 50 insertions(+), 23 deletions(-) diff --git a/nodedb-sql/src/planner/aggregate.rs b/nodedb-sql/src/planner/aggregate.rs index d2503671..04653d6d 100644 --- a/nodedb-sql/src/planner/aggregate.rs +++ b/nodedb-sql/src/planner/aggregate.rs @@ -133,23 +133,33 @@ fn extract_bucket_interval(func: &ast::Function) -> Result { Ok(parse_interval_to_ms(&interval_str)) } -/// Parse an interval string like "1h", "15m", "30s", "1d" to milliseconds. +/// Parse an interval string to milliseconds. +/// +/// Accepted forms: `"1h"`, `"15m"`, `"30s"`, `"1d"`, `"1 hour"`, `"15 minutes"`, +/// `"30 seconds"`, `"7 days"`. Plural and singular word forms both work. fn parse_interval_to_ms(s: &str) -> i64 { let s = s.trim(); if s.is_empty() { return 0; } - let (num_str, suffix) = s.split_at(s.len() - 1); - let num: i64 = num_str.trim().parse().unwrap_or(0); - match suffix { - "s" => num * 1_000, - "m" => num * 60_000, - "h" => num * 3_600_000, - "d" => num * 86_400_000, - _ => { - // Try full string as seconds. - s.parse::().unwrap_or(0) * 1_000 + + // Split into numeric part and unit part (handles both "1h" and "1 hour"). + let num_end = s + .find(|c: char| !c.is_ascii_digit() && c != '.') + .unwrap_or(s.len()); + let num: i64 = s[..num_end].trim().parse().unwrap_or(0); + let unit = s[num_end..].trim(); + + match unit { + "s" | "sec" | "second" | "seconds" => num * 1_000, + "m" | "min" | "minute" | "minutes" => num * 60_000, + "h" | "hr" | "hour" | "hours" => num * 3_600_000, + "d" | "day" | "days" => num * 86_400_000, + "" => { + // Bare number — treat as seconds. + num * 1_000 } + _ => 0, } } @@ -260,5 +270,12 @@ mod tests { assert_eq!(parse_interval_to_ms("15m"), 900_000); assert_eq!(parse_interval_to_ms("30s"), 30_000); assert_eq!(parse_interval_to_ms("7d"), 604_800_000); + // Word-form intervals. + assert_eq!(parse_interval_to_ms("1 hour"), 3_600_000); + assert_eq!(parse_interval_to_ms("2 hours"), 7_200_000); + assert_eq!(parse_interval_to_ms("15 minutes"), 900_000); + assert_eq!(parse_interval_to_ms("30 seconds"), 30_000); + assert_eq!(parse_interval_to_ms("1 day"), 86_400_000); + assert_eq!(parse_interval_to_ms("5 min"), 300_000); } } diff --git a/nodedb/src/control/planner/sql_plan_convert.rs b/nodedb/src/control/planner/sql_plan_convert.rs index 7a493234..eac023e2 100644 --- a/nodedb/src/control/planner/sql_plan_convert.rs +++ b/nodedb/src/control/planner/sql_plan_convert.rs @@ -249,10 +249,8 @@ fn convert_one( tiered, } => { let filter_bytes = serialize_filters(filters)?; - let agg_pairs: Vec<(String, String)> = aggregates - .iter() - .map(|a| (a.function.clone(), a.alias.clone())) - .collect(); + let agg_pairs: Vec<(String, String)> = + aggregates.iter().map(agg_expr_to_pair).collect(); // AUTO_TIER: split query across retention tiers if enabled. if *tiered @@ -667,10 +665,7 @@ fn convert_aggregate( let vshard = VShardId::from_collection(&left_collection); let group_strs = group_by_to_strings(group_by); - let agg_pairs = aggregates - .iter() - .map(|a| (a.function.clone(), a.alias.clone())) - .collect(); + let agg_pairs = aggregates.iter().map(agg_expr_to_pair).collect(); return Ok(vec![PhysicalTask { tenant_id, @@ -700,10 +695,7 @@ fn convert_aggregate( let having_bytes = serialize_filters(having)?; let group_strs = group_by_to_strings(group_by); - let agg_pairs = aggregates - .iter() - .map(|a| (a.function.clone(), a.alias.clone())) - .collect(); + let agg_pairs = aggregates.iter().map(agg_expr_to_pair).collect(); Ok(vec![PhysicalTask { tenant_id, @@ -734,6 +726,24 @@ fn extract_collection_name(plan: &SqlPlan) -> String { } } +/// Convert an `AggregateExpr` to the `(op, field)` pair the executor expects. +/// +/// The field is extracted from the first argument (e.g. `elapsed_ms` from +/// `AVG(elapsed_ms)`). Wildcard args produce `"*"`. Falls back to `"*"` when +/// there are no arguments (bare `COUNT`). +fn agg_expr_to_pair(a: &AggregateExpr) -> (String, String) { + let field = a + .args + .first() + .map(|arg| match arg { + SqlExpr::Column { name, .. } => name.clone(), + SqlExpr::Wildcard => "*".into(), + _ => format!("{arg:?}"), + }) + .unwrap_or_else(|| "*".into()); + (a.function.clone(), field) +} + fn group_by_to_strings(exprs: &[SqlExpr]) -> Vec { exprs .iter() From ec826161c9e6db0cb79be856834ff75c0bd9d7df Mon Sep 17 00:00:00 2001 From: Farhan Syah Date: Wed, 8 Apr 2026 02:34:43 +0800 Subject: [PATCH 2/6] style(kafka): fix clippy warnings in producer --- nodedb/src/event/kafka/producer.rs | 39 +++++++++++++++--------------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/nodedb/src/event/kafka/producer.rs b/nodedb/src/event/kafka/producer.rs index 7ffd36d2..b523fb14 100644 --- a/nodedb/src/event/kafka/producer.rs +++ b/nodedb/src/event/kafka/producer.rs @@ -66,14 +66,14 @@ pub fn spawn_kafka_task( }; // Initialize transactional producer if configured. - if config.transactional { - if let Err(e) = producer.init_transactions(Duration::from_secs(10)) { - warn!( - stream = %stream_name, - error = %e, - "failed to init Kafka transactions — falling back to at-least-once" - ); - } + if config.transactional + && let Err(e) = producer.init_transactions(Duration::from_secs(10)) + { + warn!( + stream = %stream_name, + error = %e, + "failed to init Kafka transactions — falling back to at-least-once" + ); } let group_name = format!("_kafka_{stream_name}"); @@ -99,11 +99,11 @@ pub fn spawn_kafka_task( let batch_size = events.events.len(); // Begin transaction if configured. - if config.transactional { - if let Err(e) = producer.begin_transaction() { - warn!(error = %e, "Kafka begin_transaction failed"); - continue; - } + if config.transactional + && let Err(e) = producer.begin_transaction() + { + warn!(error = %e, "Kafka begin_transaction failed"); + continue; } let mut published = 0u32; @@ -136,13 +136,12 @@ pub fn spawn_kafka_task( } // Commit Kafka transaction. - if config.transactional && published > 0 { - if let Err(e) = producer + if config.transactional && published > 0 + && let Err(e) = producer .commit_transaction(Duration::from_secs(10)) - { - warn!(error = %e, "Kafka commit_transaction failed"); - continue; - } + { + warn!(error = %e, "Kafka commit_transaction failed"); + continue; } // Commit consumer offsets for successfully published events. @@ -189,7 +188,7 @@ fn create_producer( if config.transactional { client_config.set("enable.idempotence", "true"); - client_config.set("transactional.id", &format!("nodedb-kafka-{stream_name}")); + client_config.set("transactional.id", format!("nodedb-kafka-{stream_name}")); } client_config From 458719707d3286311fd14537a01effca739b590b Mon Sep 17 00:00:00 2001 From: Farhan Syah Date: Wed, 8 Apr 2026 02:42:59 +0800 Subject: [PATCH 3/6] refactor(wal): replace deprecated GenericArray with .into() for nonce construction --- nodedb-wal/src/crypto.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/nodedb-wal/src/crypto.rs b/nodedb-wal/src/crypto.rs index 8a204863..75ec48b5 100644 --- a/nodedb-wal/src/crypto.rs +++ b/nodedb-wal/src/crypto.rs @@ -14,7 +14,6 @@ //! `payload_len` includes the 16-byte auth tag. use aes_gcm::Aes256Gcm; -use aes_gcm::aead::generic_array::GenericArray; use aes_gcm::aead::{Aead, KeyInit}; use crate::error::{Result, WalError}; @@ -192,7 +191,7 @@ pub const AUTH_TAG_SIZE: usize = 16; fn lsn_to_nonce(lsn: u64) -> aes_gcm::Nonce { let mut nonce_bytes = [0u8; 12]; nonce_bytes[..8].copy_from_slice(&lsn.to_le_bytes()); - *GenericArray::from_slice(&nonce_bytes) + nonce_bytes.into() } #[cfg(test)] From 2fc49d15982e322cd12f7d6e849530670e6e4e3a Mon Sep 17 00:00:00 2001 From: Farhan Syah Date: Wed, 8 Apr 2026 02:48:18 +0800 Subject: [PATCH 4/6] ci: add libcurl4-openssl-dev and libsasl2-dev to CI system deps Fix rdkafka-sys build failure when compiling with --all-features by adding the missing system libraries required for the Kafka client. --- .github/workflows/test.yml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index be8c890a..995f4091 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -32,7 +32,8 @@ jobs: run: | sudo apt-get update sudo apt-get install -y --no-install-recommends \ - cmake clang libclang-dev pkg-config protobuf-compiler perl + cmake clang libclang-dev pkg-config protobuf-compiler perl \ + libcurl4-openssl-dev libsasl2-dev - name: Check formatting run: cargo fmt --all -- --check - name: Run clippy @@ -52,6 +53,7 @@ jobs: run: | sudo apt-get update sudo apt-get install -y --no-install-recommends \ - cmake clang libclang-dev pkg-config protobuf-compiler perl + cmake clang libclang-dev pkg-config protobuf-compiler perl \ + libcurl4-openssl-dev libsasl2-dev - name: Run tests run: cargo test --all-features From 3f66bd257ec55ca7b5fd9c72355f25c79b48f9d7 Mon Sep 17 00:00:00 2001 From: Farhan Syah Date: Wed, 8 Apr 2026 02:55:55 +0800 Subject: [PATCH 5/6] fix(lite): replace deprecated Nonce::from_slice with Nonce::from in AES-GCM encrypt/decrypt --- nodedb-lite/src/storage/encrypted.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nodedb-lite/src/storage/encrypted.rs b/nodedb-lite/src/storage/encrypted.rs index 5258d907..4f82e1eb 100644 --- a/nodedb-lite/src/storage/encrypted.rs +++ b/nodedb-lite/src/storage/encrypted.rs @@ -151,7 +151,7 @@ impl EncryptedStorage { fn encrypt(&self, ns: Namespace, key: &[u8], plaintext: &[u8]) -> Result, LiteError> { let nonce = Self::derive_nonce(ns, key); self.cipher - .encrypt(Nonce::from_slice(&nonce), plaintext) + .encrypt(&Nonce::from(nonce), plaintext) .map_err(|e| LiteError::Storage { detail: format!("AES-GCM encrypt failed: {e}"), }) @@ -160,7 +160,7 @@ impl EncryptedStorage { fn decrypt(&self, ns: Namespace, key: &[u8], ciphertext: &[u8]) -> Result, LiteError> { let nonce = Self::derive_nonce(ns, key); self.cipher - .decrypt(Nonce::from_slice(&nonce), ciphertext) + .decrypt(&Nonce::from(nonce), ciphertext) .map_err(|e| LiteError::Storage { detail: format!("AES-GCM decrypt failed (wrong passphrase or corrupted data): {e}"), }) From 6187aeebb8ccdd39c298169f90fc9923e01c21ad Mon Sep 17 00:00:00 2001 From: Farhan Syah Date: Wed, 8 Apr 2026 03:06:35 +0800 Subject: [PATCH 6/6] fix(security): replace deprecated Nonce::from_slice with Nonce::from in DEK encryption Update encrypt_dek and decrypt_dek to use the non-deprecated Nonce::from constructor. The decrypt path now performs an explicit TryInto conversion to a [u8; 12] array before constructing the nonce, surfacing a proper error if the slice length is wrong. --- nodedb/src/control/security/encryption.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/nodedb/src/control/security/encryption.rs b/nodedb/src/control/security/encryption.rs index a380a5e1..090c3b87 100644 --- a/nodedb/src/control/security/encryption.rs +++ b/nodedb/src/control/security/encryption.rs @@ -185,11 +185,11 @@ impl VolumeEncryption { let cipher = Aes256Gcm::new_from_slice(master).map_err(|e| crate::Error::Encryption { detail: format!("AES-GCM key init failed: {e}"), })?; - let nonce = Nonce::from_slice(&nonce_bytes); + let nonce = Nonce::from(nonce_bytes); let ciphertext = cipher - .encrypt(nonce, dek.as_ref()) + .encrypt(&nonce, dek.as_ref()) .map_err(|e| crate::Error::Encryption { detail: format!("DEK encryption failed: {e}"), })?; @@ -230,11 +230,16 @@ impl VolumeEncryption { let cipher = Aes256Gcm::new_from_slice(master).map_err(|e| crate::Error::Encryption { detail: format!("AES-GCM key init failed: {e}"), })?; - let nonce = Nonce::from_slice(nonce_bytes); + let nonce_arr: [u8; 12] = nonce_bytes + .try_into() + .map_err(|_| crate::Error::Encryption { + detail: "nonce slice is not 12 bytes".into(), + })?; + let nonce = Nonce::from(nonce_arr); let plaintext = cipher - .decrypt(nonce, ciphertext) + .decrypt(&nonce, ciphertext) .map_err(|_| crate::Error::Encryption { detail: "DEK decryption failed: authentication tag mismatch".into(), })?;