Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ jobs:
run: |
sudo apt-get update
sudo apt-get install -y --no-install-recommends \
cmake clang libclang-dev pkg-config protobuf-compiler perl
cmake clang libclang-dev pkg-config protobuf-compiler perl \
libcurl4-openssl-dev libsasl2-dev
- name: Check formatting
run: cargo fmt --all -- --check
- name: Run clippy
Expand All @@ -52,6 +53,7 @@ jobs:
run: |
sudo apt-get update
sudo apt-get install -y --no-install-recommends \
cmake clang libclang-dev pkg-config protobuf-compiler perl
cmake clang libclang-dev pkg-config protobuf-compiler perl \
libcurl4-openssl-dev libsasl2-dev
- name: Run tests
run: cargo test --all-features
4 changes: 2 additions & 2 deletions nodedb-lite/src/storage/encrypted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl<S: StorageEngine> EncryptedStorage<S> {
fn encrypt(&self, ns: Namespace, key: &[u8], plaintext: &[u8]) -> Result<Vec<u8>, LiteError> {
let nonce = Self::derive_nonce(ns, key);
self.cipher
.encrypt(Nonce::from_slice(&nonce), plaintext)
.encrypt(&Nonce::from(nonce), plaintext)
.map_err(|e| LiteError::Storage {
detail: format!("AES-GCM encrypt failed: {e}"),
})
Expand All @@ -160,7 +160,7 @@ impl<S: StorageEngine> EncryptedStorage<S> {
fn decrypt(&self, ns: Namespace, key: &[u8], ciphertext: &[u8]) -> Result<Vec<u8>, LiteError> {
let nonce = Self::derive_nonce(ns, key);
self.cipher
.decrypt(Nonce::from_slice(&nonce), ciphertext)
.decrypt(&Nonce::from(nonce), ciphertext)
.map_err(|e| LiteError::Storage {
detail: format!("AES-GCM decrypt failed (wrong passphrase or corrupted data): {e}"),
})
Expand Down
39 changes: 28 additions & 11 deletions nodedb-sql/src/planner/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,23 +133,33 @@ fn extract_bucket_interval(func: &ast::Function) -> Result<i64> {
Ok(parse_interval_to_ms(&interval_str))
}

/// Parse an interval string to milliseconds.
///
/// Accepted forms: `"1h"`, `"15m"`, `"30s"`, `"1d"`, `"1 hour"`,
/// `"15 minutes"`, `"30 seconds"`, `"7 days"`. Plural and singular word
/// forms both work, as do the short words `"sec"`, `"min"`, `"hr"`.
/// A bare number (e.g. `"90"`) is treated as seconds. Empty, unparseable,
/// or unknown-unit input yields `0`.
fn parse_interval_to_ms(s: &str) -> i64 {
    let s = s.trim();
    if s.is_empty() {
        return 0;
    }

    // Split into numeric part and unit part (handles both "1h" and "1 hour").
    // The numeric prefix ends at the first char that is neither a digit nor
    // a dot; if none exists the whole string is numeric and the unit is "".
    let num_end = s
        .find(|c: char| !c.is_ascii_digit() && c != '.')
        .unwrap_or(s.len());
    // Non-integer prefixes (e.g. "1.5") fail the i64 parse and fall back to 0.
    let num: i64 = s[..num_end].trim().parse().unwrap_or(0);
    let unit = s[num_end..].trim();

    match unit {
        "s" | "sec" | "second" | "seconds" => num * 1_000,
        "m" | "min" | "minute" | "minutes" => num * 60_000,
        "h" | "hr" | "hour" | "hours" => num * 3_600_000,
        "d" | "day" | "days" => num * 86_400_000,
        "" => {
            // Bare number — treat as seconds.
            num * 1_000
        }
        // Unknown unit — caller treats 0 as "no bucketing".
        _ => 0,
    }
}

Expand Down Expand Up @@ -260,5 +270,12 @@ mod tests {
assert_eq!(parse_interval_to_ms("15m"), 900_000);
assert_eq!(parse_interval_to_ms("30s"), 30_000);
assert_eq!(parse_interval_to_ms("7d"), 604_800_000);
// Word-form intervals.
assert_eq!(parse_interval_to_ms("1 hour"), 3_600_000);
assert_eq!(parse_interval_to_ms("2 hours"), 7_200_000);
assert_eq!(parse_interval_to_ms("15 minutes"), 900_000);
assert_eq!(parse_interval_to_ms("30 seconds"), 30_000);
assert_eq!(parse_interval_to_ms("1 day"), 86_400_000);
assert_eq!(parse_interval_to_ms("5 min"), 300_000);
}
}
3 changes: 1 addition & 2 deletions nodedb-wal/src/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
//! `payload_len` includes the 16-byte auth tag.

use aes_gcm::Aes256Gcm;
use aes_gcm::aead::generic_array::GenericArray;
use aes_gcm::aead::{Aead, KeyInit};

use crate::error::{Result, WalError};
Expand Down Expand Up @@ -192,7 +191,7 @@ pub const AUTH_TAG_SIZE: usize = 16;
/// Build a deterministic 96-bit AES-GCM nonce from a WAL LSN.
///
/// The LSN fills the first 8 bytes (little-endian); the remaining 4 bytes are
/// zero. Because LSNs are unique and monotonically assigned, each record gets
/// a unique nonce under the WAL key — NOTE(review): this assumes the same key
/// is never reused across a WAL reset that restarts LSNs at 0; confirm.
fn lsn_to_nonce(lsn: u64) -> aes_gcm::Nonce<aes_gcm::aead::consts::U12> {
    let mut nonce_bytes = [0u8; 12];
    nonce_bytes[..8].copy_from_slice(&lsn.to_le_bytes());
    // `[u8; 12]` converts infallibly into the U12 Nonce — no GenericArray
    // slice round-trip needed.
    nonce_bytes.into()
}

#[cfg(test)]
Expand Down
34 changes: 22 additions & 12 deletions nodedb/src/control/planner/sql_plan_convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,8 @@ fn convert_one(
tiered,
} => {
let filter_bytes = serialize_filters(filters)?;
let agg_pairs: Vec<(String, String)> = aggregates
.iter()
.map(|a| (a.function.clone(), a.alias.clone()))
.collect();
let agg_pairs: Vec<(String, String)> =
aggregates.iter().map(agg_expr_to_pair).collect();

// AUTO_TIER: split query across retention tiers if enabled.
if *tiered
Expand Down Expand Up @@ -667,10 +665,7 @@ fn convert_aggregate(
let vshard = VShardId::from_collection(&left_collection);

let group_strs = group_by_to_strings(group_by);
let agg_pairs = aggregates
.iter()
.map(|a| (a.function.clone(), a.alias.clone()))
.collect();
let agg_pairs = aggregates.iter().map(agg_expr_to_pair).collect();

return Ok(vec![PhysicalTask {
tenant_id,
Expand Down Expand Up @@ -700,10 +695,7 @@ fn convert_aggregate(
let having_bytes = serialize_filters(having)?;

let group_strs = group_by_to_strings(group_by);
let agg_pairs = aggregates
.iter()
.map(|a| (a.function.clone(), a.alias.clone()))
.collect();
let agg_pairs = aggregates.iter().map(agg_expr_to_pair).collect();

Ok(vec![PhysicalTask {
tenant_id,
Expand Down Expand Up @@ -734,6 +726,24 @@ fn extract_collection_name(plan: &SqlPlan) -> String {
}
}

/// Convert an `AggregateExpr` to the `(op, field)` pair the executor expects.
///
/// The field is extracted from the first argument (e.g. `elapsed_ms` from
/// `AVG(elapsed_ms)`). Wildcard args produce `"*"`. Falls back to `"*"` when
/// there are no arguments (bare `COUNT`).
fn agg_expr_to_pair(a: &AggregateExpr) -> (String, String) {
    let field = match a.args.first() {
        Some(SqlExpr::Column { name, .. }) => name.clone(),
        Some(SqlExpr::Wildcard) | None => "*".into(),
        // Non-column, non-wildcard argument: fall back to its Debug form.
        Some(other) => format!("{other:?}"),
    };
    (a.function.clone(), field)
}

fn group_by_to_strings(exprs: &[SqlExpr]) -> Vec<String> {
exprs
.iter()
Expand Down
13 changes: 9 additions & 4 deletions nodedb/src/control/security/encryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,11 @@ impl VolumeEncryption {
let cipher = Aes256Gcm::new_from_slice(master).map_err(|e| crate::Error::Encryption {
detail: format!("AES-GCM key init failed: {e}"),
})?;
let nonce = Nonce::from_slice(&nonce_bytes);
let nonce = Nonce::from(nonce_bytes);

let ciphertext =
cipher
.encrypt(nonce, dek.as_ref())
.encrypt(&nonce, dek.as_ref())
.map_err(|e| crate::Error::Encryption {
detail: format!("DEK encryption failed: {e}"),
})?;
Expand Down Expand Up @@ -230,11 +230,16 @@ impl VolumeEncryption {
let cipher = Aes256Gcm::new_from_slice(master).map_err(|e| crate::Error::Encryption {
detail: format!("AES-GCM key init failed: {e}"),
})?;
let nonce = Nonce::from_slice(nonce_bytes);
let nonce_arr: [u8; 12] = nonce_bytes
.try_into()
.map_err(|_| crate::Error::Encryption {
detail: "nonce slice is not 12 bytes".into(),
})?;
let nonce = Nonce::from(nonce_arr);

let plaintext =
cipher
.decrypt(nonce, ciphertext)
.decrypt(&nonce, ciphertext)
.map_err(|_| crate::Error::Encryption {
detail: "DEK decryption failed: authentication tag mismatch".into(),
})?;
Expand Down
39 changes: 19 additions & 20 deletions nodedb/src/event/kafka/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ pub fn spawn_kafka_task(
};

// Initialize transactional producer if configured.
if config.transactional {
if let Err(e) = producer.init_transactions(Duration::from_secs(10)) {
warn!(
stream = %stream_name,
error = %e,
"failed to init Kafka transactions — falling back to at-least-once"
);
}
if config.transactional
&& let Err(e) = producer.init_transactions(Duration::from_secs(10))
{
warn!(
stream = %stream_name,
error = %e,
"failed to init Kafka transactions — falling back to at-least-once"
);
}

let group_name = format!("_kafka_{stream_name}");
Expand All @@ -99,11 +99,11 @@ pub fn spawn_kafka_task(
let batch_size = events.events.len();

// Begin transaction if configured.
if config.transactional {
if let Err(e) = producer.begin_transaction() {
warn!(error = %e, "Kafka begin_transaction failed");
continue;
}
if config.transactional
&& let Err(e) = producer.begin_transaction()
{
warn!(error = %e, "Kafka begin_transaction failed");
continue;
}

let mut published = 0u32;
Expand Down Expand Up @@ -136,13 +136,12 @@ pub fn spawn_kafka_task(
}

// Commit Kafka transaction.
if config.transactional && published > 0 {
if let Err(e) = producer
if config.transactional && published > 0
&& let Err(e) = producer
.commit_transaction(Duration::from_secs(10))
{
warn!(error = %e, "Kafka commit_transaction failed");
continue;
}
{
warn!(error = %e, "Kafka commit_transaction failed");
continue;
}

// Commit consumer offsets for successfully published events.
Expand Down Expand Up @@ -189,7 +188,7 @@ fn create_producer(

if config.transactional {
client_config.set("enable.idempotence", "true");
client_config.set("transactional.id", &format!("nodedb-kafka-{stream_name}"));
client_config.set("transactional.id", format!("nodedb-kafka-{stream_name}"));
}

client_config
Expand Down