diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index be8c890a..995f4091 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -32,7 +32,8 @@ jobs: run: | sudo apt-get update sudo apt-get install -y --no-install-recommends \ - cmake clang libclang-dev pkg-config protobuf-compiler perl + cmake clang libclang-dev pkg-config protobuf-compiler perl \ + libcurl4-openssl-dev libsasl2-dev - name: Check formatting run: cargo fmt --all -- --check - name: Run clippy @@ -52,6 +53,7 @@ jobs: run: | sudo apt-get update sudo apt-get install -y --no-install-recommends \ - cmake clang libclang-dev pkg-config protobuf-compiler perl + cmake clang libclang-dev pkg-config protobuf-compiler perl \ + libcurl4-openssl-dev libsasl2-dev - name: Run tests run: cargo test --all-features diff --git a/nodedb-lite/src/storage/encrypted.rs b/nodedb-lite/src/storage/encrypted.rs index 5258d907..4f82e1eb 100644 --- a/nodedb-lite/src/storage/encrypted.rs +++ b/nodedb-lite/src/storage/encrypted.rs @@ -151,7 +151,7 @@ impl EncryptedStorage { fn encrypt(&self, ns: Namespace, key: &[u8], plaintext: &[u8]) -> Result, LiteError> { let nonce = Self::derive_nonce(ns, key); self.cipher - .encrypt(Nonce::from_slice(&nonce), plaintext) + .encrypt(&Nonce::from(nonce), plaintext) .map_err(|e| LiteError::Storage { detail: format!("AES-GCM encrypt failed: {e}"), }) @@ -160,7 +160,7 @@ impl EncryptedStorage { fn decrypt(&self, ns: Namespace, key: &[u8], ciphertext: &[u8]) -> Result, LiteError> { let nonce = Self::derive_nonce(ns, key); self.cipher - .decrypt(Nonce::from_slice(&nonce), ciphertext) + .decrypt(&Nonce::from(nonce), ciphertext) .map_err(|e| LiteError::Storage { detail: format!("AES-GCM decrypt failed (wrong passphrase or corrupted data): {e}"), }) diff --git a/nodedb-sql/src/planner/aggregate.rs b/nodedb-sql/src/planner/aggregate.rs index d2503671..04653d6d 100644 --- a/nodedb-sql/src/planner/aggregate.rs +++ b/nodedb-sql/src/planner/aggregate.rs @@ -133,23 +133,33 @@ fn extract_bucket_interval(func: &ast::Function) -> Result { Ok(parse_interval_to_ms(&interval_str)) } -/// Parse an interval string like "1h", "15m", "30s", "1d" to milliseconds. +/// Parse an interval string to milliseconds. +/// +/// Accepted forms: `"1h"`, `"15m"`, `"30s"`, `"1d"`, `"1 hour"`, `"15 minutes"`, +/// `"30 seconds"`, `"7 days"`. Plural and singular word forms both work. fn parse_interval_to_ms(s: &str) -> i64 { let s = s.trim(); if s.is_empty() { return 0; } - let (num_str, suffix) = s.split_at(s.len() - 1); - let num: i64 = num_str.trim().parse().unwrap_or(0); - match suffix { - "s" => num * 1_000, - "m" => num * 60_000, - "h" => num * 3_600_000, - "d" => num * 86_400_000, - _ => { - // Try full string as seconds. - s.parse::().unwrap_or(0) * 1_000 + + // Split into numeric part and unit part (handles both "1h" and "1 hour"). + let num_end = s + .find(|c: char| !c.is_ascii_digit() && c != '.') + .unwrap_or(s.len()); + let num: i64 = s[..num_end].trim().parse().unwrap_or(0); + let unit = s[num_end..].trim(); + + match unit { + "s" | "sec" | "second" | "seconds" => num * 1_000, + "m" | "min" | "minute" | "minutes" => num * 60_000, + "h" | "hr" | "hour" | "hours" => num * 3_600_000, + "d" | "day" | "days" => num * 86_400_000, + "" => { + // Bare number — treat as seconds. + num * 1_000 } + _ => 0, } } @@ -260,5 +270,12 @@ mod tests { assert_eq!(parse_interval_to_ms("15m"), 900_000); assert_eq!(parse_interval_to_ms("30s"), 30_000); assert_eq!(parse_interval_to_ms("7d"), 604_800_000); + // Word-form intervals. + assert_eq!(parse_interval_to_ms("1 hour"), 3_600_000); + assert_eq!(parse_interval_to_ms("2 hours"), 7_200_000); + assert_eq!(parse_interval_to_ms("15 minutes"), 900_000); + assert_eq!(parse_interval_to_ms("30 seconds"), 30_000); + assert_eq!(parse_interval_to_ms("1 day"), 86_400_000); + assert_eq!(parse_interval_to_ms("5 min"), 300_000); } } diff --git a/nodedb-wal/src/crypto.rs b/nodedb-wal/src/crypto.rs index 8a204863..75ec48b5 100644 --- a/nodedb-wal/src/crypto.rs +++ b/nodedb-wal/src/crypto.rs @@ -14,7 +14,6 @@ //! `payload_len` includes the 16-byte auth tag. use aes_gcm::Aes256Gcm; -use aes_gcm::aead::generic_array::GenericArray; use aes_gcm::aead::{Aead, KeyInit}; use crate::error::{Result, WalError}; @@ -192,7 +191,7 @@ pub const AUTH_TAG_SIZE: usize = 16; fn lsn_to_nonce(lsn: u64) -> aes_gcm::Nonce { let mut nonce_bytes = [0u8; 12]; nonce_bytes[..8].copy_from_slice(&lsn.to_le_bytes()); - *GenericArray::from_slice(&nonce_bytes) + nonce_bytes.into() } #[cfg(test)] diff --git a/nodedb/src/control/planner/sql_plan_convert.rs b/nodedb/src/control/planner/sql_plan_convert.rs index 7a493234..eac023e2 100644 --- a/nodedb/src/control/planner/sql_plan_convert.rs +++ b/nodedb/src/control/planner/sql_plan_convert.rs @@ -249,10 +249,8 @@ fn convert_one( tiered, } => { let filter_bytes = serialize_filters(filters)?; - let agg_pairs: Vec<(String, String)> = aggregates - .iter() - .map(|a| (a.function.clone(), a.alias.clone())) - .collect(); + let agg_pairs: Vec<(String, String)> = + aggregates.iter().map(agg_expr_to_pair).collect(); // AUTO_TIER: split query across retention tiers if enabled. if *tiered @@ -667,10 +665,7 @@ fn convert_aggregate( let vshard = VShardId::from_collection(&left_collection); let group_strs = group_by_to_strings(group_by); - let agg_pairs = aggregates - .iter() - .map(|a| (a.function.clone(), a.alias.clone())) - .collect(); + let agg_pairs = aggregates.iter().map(agg_expr_to_pair).collect(); return Ok(vec![PhysicalTask { tenant_id, @@ -700,10 +695,7 @@ fn convert_aggregate( let having_bytes = serialize_filters(having)?; let group_strs = group_by_to_strings(group_by); - let agg_pairs = aggregates - .iter() - .map(|a| (a.function.clone(), a.alias.clone())) - .collect(); + let agg_pairs = aggregates.iter().map(agg_expr_to_pair).collect(); Ok(vec![PhysicalTask { tenant_id, @@ -734,6 +726,24 @@ fn extract_collection_name(plan: &SqlPlan) -> String { } } +/// Convert an `AggregateExpr` to the `(op, field)` pair the executor expects. +/// +/// The field is extracted from the first argument (e.g. `elapsed_ms` from +/// `AVG(elapsed_ms)`). Wildcard args produce `"*"`. Falls back to `"*"` when +/// there are no arguments (bare `COUNT`). +fn agg_expr_to_pair(a: &AggregateExpr) -> (String, String) { + let field = a + .args + .first() + .map(|arg| match arg { + SqlExpr::Column { name, .. } => name.clone(), + SqlExpr::Wildcard => "*".into(), + _ => format!("{arg:?}"), + }) + .unwrap_or_else(|| "*".into()); + (a.function.clone(), field) +} + fn group_by_to_strings(exprs: &[SqlExpr]) -> Vec { exprs .iter() diff --git a/nodedb/src/control/security/encryption.rs b/nodedb/src/control/security/encryption.rs index a380a5e1..090c3b87 100644 --- a/nodedb/src/control/security/encryption.rs +++ b/nodedb/src/control/security/encryption.rs @@ -185,11 +185,11 @@ impl VolumeEncryption { let cipher = Aes256Gcm::new_from_slice(master).map_err(|e| crate::Error::Encryption { detail: format!("AES-GCM key init failed: {e}"), })?; - let nonce = Nonce::from_slice(&nonce_bytes); + let nonce = Nonce::from(nonce_bytes); let ciphertext = cipher - .encrypt(nonce, dek.as_ref()) + .encrypt(&nonce, dek.as_ref()) .map_err(|e| crate::Error::Encryption { detail: format!("DEK encryption failed: {e}"), })?; @@ -230,11 +230,16 @@ impl VolumeEncryption { let cipher = Aes256Gcm::new_from_slice(master).map_err(|e| crate::Error::Encryption { detail: format!("AES-GCM key init failed: {e}"), })?; - let nonce = Nonce::from_slice(nonce_bytes); + let nonce_arr: [u8; 12] = nonce_bytes + .try_into() + .map_err(|_| crate::Error::Encryption { + detail: "nonce slice is not 12 bytes".into(), + })?; + let nonce = Nonce::from(nonce_arr); let plaintext = cipher - .decrypt(nonce, ciphertext) + .decrypt(&nonce, ciphertext) .map_err(|_| crate::Error::Encryption { detail: "DEK decryption failed: authentication tag mismatch".into(), })?; diff --git a/nodedb/src/event/kafka/producer.rs b/nodedb/src/event/kafka/producer.rs index 7ffd36d2..b523fb14 100644 --- a/nodedb/src/event/kafka/producer.rs +++ b/nodedb/src/event/kafka/producer.rs @@ -66,14 +66,14 @@ pub fn spawn_kafka_task( }; // Initialize transactional producer if configured. - if config.transactional { - if let Err(e) = producer.init_transactions(Duration::from_secs(10)) { - warn!( - stream = %stream_name, - error = %e, - "failed to init Kafka transactions — falling back to at-least-once" - ); - } + if config.transactional + && let Err(e) = producer.init_transactions(Duration::from_secs(10)) + { + warn!( + stream = %stream_name, + error = %e, + "failed to init Kafka transactions — falling back to at-least-once" + ); } let group_name = format!("_kafka_{stream_name}"); @@ -99,11 +99,11 @@ pub fn spawn_kafka_task( let batch_size = events.events.len(); // Begin transaction if configured. - if config.transactional { - if let Err(e) = producer.begin_transaction() { - warn!(error = %e, "Kafka begin_transaction failed"); - continue; - } + if config.transactional + && let Err(e) = producer.begin_transaction() + { + warn!(error = %e, "Kafka begin_transaction failed"); + continue; } let mut published = 0u32; @@ -136,13 +136,12 @@ pub fn spawn_kafka_task( } // Commit Kafka transaction. - if config.transactional && published > 0 { - if let Err(e) = producer + if config.transactional && published > 0 + && let Err(e) = producer .commit_transaction(Duration::from_secs(10)) - { - warn!(error = %e, "Kafka commit_transaction failed"); - continue; - } + { + warn!(error = %e, "Kafka commit_transaction failed"); + continue; } // Commit consumer offsets for successfully published events. @@ -189,7 +188,7 @@ fn create_producer( if config.transactional { client_config.set("enable.idempotence", "true"); - client_config.set("transactional.id", &format!("nodedb-kafka-{stream_name}")); + client_config.set("transactional.id", format!("nodedb-kafka-{stream_name}")); } client_config