From f435f7064504931f6be1d4005d8b814dbf49018f Mon Sep 17 00:00:00 2001 From: Peyton Date: Mon, 9 Mar 2026 11:18:46 -0700 Subject: [PATCH 1/5] feat(kvp): add store trait and Hyper-V KVP storage crate --- Cargo.toml | 1 + doc/libazurekvp.md | 166 ++++------ libazureinit-kvp/Cargo.toml | 20 ++ libazureinit-kvp/src/hyperv.rs | 578 +++++++++++++++++++++++++++++++++ libazureinit-kvp/src/lib.rs | 121 +++++++ 5 files changed, 785 insertions(+), 101 deletions(-) create mode 100644 libazureinit-kvp/Cargo.toml create mode 100644 libazureinit-kvp/src/hyperv.rs create mode 100644 libazureinit-kvp/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 31693d8e..83d7e630 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ path = "tests/functional_tests.rs" [workspace] members = [ "libazureinit", + "libazureinit-kvp", ] [features] diff --git a/doc/libazurekvp.md b/doc/libazurekvp.md index 476bdcb1..8b34fb54 100644 --- a/doc/libazurekvp.md +++ b/doc/libazurekvp.md @@ -1,126 +1,90 @@ -# Azure-init Tracing System +# `libazureinit-kvp` -## Overview +`libazureinit-kvp` is the storage layer for Hyper-V KVP (Key-Value Pair) +pool files used by Azure guests. -Azure-init implements a comprehensive tracing system that captures detailed information about the provisioning process. -This information is crucial for monitoring, debugging, and troubleshooting VM provisioning issues in Azure environments. -The tracing system is built on a multi-layered architecture that provides flexibility and robustness. +It defines: +- `KvpStore`: storage trait with explicit read/write/delete semantics. +- `HyperVKvpStore`: production implementation backed by the Hyper-V + binary pool file format. +- `KvpLimits`: exported key/value byte limits for Hyper-V and Azure. -## Architecture +## Record Format -The tracing architecture consists of four specialized layers, each handling a specific aspect of the tracing process: +The Hyper-V pool file record format is fixed width: +- Key field: 512 bytes +- Value field: 2048 bytes +- Total record size: 2560 bytes -### 1. EmitKVPLayer +Records are appended to the file and zero-padded to fixed widths. -**Purpose**: Processes spans and events by capturing metadata, generating key-value pairs (KVPs), and writing to Hyper-V's data exchange file. +## Store Semantics -**Key Functions**: -- Captures span lifecycle events (creation, entry, exit, closing) -- Processes emitted events within spans -- Formats data as KVPs for Hyper-V consumption -- Writes encoded data to `/var/lib/hyperv/.kvp_pool_1` +### `write(key, value)` -Additionally, events emitted with a `health_report` field are written as special provisioning reports using the key `PROVISIONING_REPORT`. +- Append-only behavior: each call appends one new record. +- Duplicate keys are allowed in the file. +- Returns an error when: + - key is empty + - key byte length exceeds `max_key_size` + - value byte length exceeds `max_value_size` + - an I/O error occurs +- Oversized values are rejected by the store (no silent truncation). + Higher layers are responsible for chunking/splitting when required. -**Integration with Azure**: -- The `/var/lib/hyperv/.kvp_pool_1` file is monitored by the Hyper-V `hv_kvp_daemon` service -- This enables key metrics and logs to be transferred from the VM to the Azure platform -- Administrators can access this data through the Azure portal or API +### `read(key)` -### 2. OpenTelemetryLayer +- Scans records and returns the value from the most recent matching key + (last-write-wins). +- Returns `Ok(None)` when the key is missing or file does not exist. -**Purpose**: Propagates tracing context and prepares span data for export. +### `entries()` -**Key Functions**: -- Maintains distributed tracing context across service boundaries -- Exports standardized trace data to compatible backends -- Enables integration with broader monitoring ecosystems +- Returns `HashMap`. +- Deduplicates duplicate keys using last-write-wins, matching `read`. +- This exposes a logical unique-key view even though the file itself is + append-only and may contain multiple records per key. -### 3. Stderr Layer +### `delete(key)` -**Purpose**: Formats and logs trace data to stderr. +- Rewrites the file without any matching key records. +- Returns `true` if at least one record was removed, else `false`. -**Key Functions**: -- Provides human-readable logging for immediate inspection -- Supports debugging during development -- Captures trace events even when other layers might fail +## Truncate Semantics (`truncate_if_stale`) -### 4. File Layer +`HyperVKvpStore::truncate_if_stale` clears stale records from previous +boots by comparing file `mtime` to the current boot timestamp. -**Purpose**: Writes formatted logs to a file (default path: `/var/log/azure-init.log`). +- If file predates boot: truncate to zero length. +- If file is current: leave unchanged. +- If lock contention occurs (`WouldBlock`): return `Ok(())` and skip. +- Non-contention lock failures are returned as errors. -**Key Functions**: -- Provides a persistent log for post-provisioning inspection -- Uses file permissions `0600` when possible -- Log level controlled by `AZURE_INIT_LOG` (defaults to `info` for the file layer) +## Limits and Azure Compatibility -## How the Layers Work Together +`KvpLimits` is exported so callers (including diagnostics layers) can +enforce and reuse exact bounds. -Despite operating independently, these layers collaborate to provide comprehensive tracing: +- `KvpLimits::hyperv()` + - `max_key_size = 512` + - `max_value_size = 2048` +- `KvpLimits::azure()` + - `max_key_size = 512` + - `max_value_size = 1022` (UTF-16: 511 characters + null terminator) -1. **Independent Processing**: Each layer processes spans and events without dependencies on other layers -2. **Ordered Execution**: Layers are executed in the order they are registered in `setup_layers` (stderr, OpenTelemetry, KVP if enabled, file if available) -3. **Complementary Functions**: Each layer serves a specific purpose in the tracing ecosystem: - - `EmitKVPLayer` focuses on Azure Hyper-V integration - - `OpenTelemetryLayer` handles standardized tracing and exports - - `Stderr Layer` provides immediate visibility for debugging +Why Azure limit is lower for values: +- Hyper-V record format allows 2048-byte values. +- Azure host handling is stricter; values beyond 1022 bytes are + silently truncated by host-side consumers. +- For Azure VMs, use `KvpLimits::azure()` and rely on higher-level + chunking when larger payloads must be preserved. -### Configuration +## Record Count Behavior -The tracing system's behavior is controlled through configuration files and environment variables, allowing more control over what data is captured and where it's sent: +There is no explicit record-count cap in this storage layer. +The file grows with each append until external constraints (disk space, +retention policy, or caller behavior) are applied. -- `telemetry.kvp_diagnostics` (config): Enables/disables KVP emission. Default: `true`. -- `telemetry.kvp_filter` (config): Optional `EnvFilter`-style directives to select which spans/events go to KVP. -- `azure_init_log_path.path` (config): Target path for the file layer. Default: `/var/log/azure-init.log`. -- `AZURE_INIT_KVP_FILTER` (env): Overrides `telemetry.kvp_filter`. Precedence: env > config > default. -- `AZURE_INIT_LOG` (env): Controls stderr and file fmt layers’ levels (defaults: stderr=`error`, file=`info`). +## References -The KVP layer uses a conservative default filter aimed at essential provisioning signals; adjust that via the settings above as needed. -For more on how to use these configuration variables, see the [configuration documentation](./configuration.md#complete-configuration-example). - -## Practical Usage - -### Instrumenting Functions - -To instrument code with tracing, use the `#[instrument]` attribute on functions: - -```rust -use tracing::{instrument, Level, event}; - -#[instrument(fields(user_id = ?user.id))] -async fn provision_user(user: User) -> Result<(), Error> { - event!(Level::INFO, "Starting user provisioning"); - - // Function logic - - event!(Level::INFO, "User provisioning completed successfully"); - Ok(()) -} -``` - -### Emitting Events - -To record specific points within a span: - -```rust -use tracing::{event, Level}; - -fn configure_ssh_keys(user: &str, keys: &[String]) { - event!(Level::INFO, user = user, key_count = keys.len(), "Configuring SSH keys"); - - for (i, key) in keys.iter().enumerate() { - event!(Level::DEBUG, user = user, key_index = i, "Processing SSH key"); - // Process each key - } - - event!(Level::INFO, user = user, "SSH keys configured successfully"); -} -``` - -## Reference Documentation - -For more details on how the Hyper-V Data Exchange Service works, refer to the official documentation: -[Hyper-V Data Exchange Service (KVP)](https://learn.microsoft.com/en-us/virtualization/hyper-v-on-windows/reference/integration-services#hyper-v-data-exchange-service-kvp) - -For OpenTelemetry integration details: -[OpenTelemetry for Rust](https://opentelemetry.io/docs/instrumentation/rust/) \ No newline at end of file +- [Hyper-V Data Exchange Service (KVP)](https://learn.microsoft.com/en-us/virtualization/hyper-v-on-windows/reference/integration-services#hyper-v-data-exchange-service-kvp) \ No newline at end of file diff --git a/libazureinit-kvp/Cargo.toml b/libazureinit-kvp/Cargo.toml new file mode 100644 index 00000000..6d5a8106 --- /dev/null +++ b/libazureinit-kvp/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "libazureinit-kvp" +version = "0.1.0" +edition = "2021" +rust-version = "1.88" +repository = "https://github.com/Azure/azure-init/" +homepage = "https://github.com/Azure/azure-init/" +license = "MIT" +description = "Hyper-V KVP (Key-Value Pair) storage library for azure-init." + +[dependencies] +fs2 = "0.4" +sysinfo = "0.38" + +[dev-dependencies] +tempfile = "3" + +[lib] +name = "libazureinit_kvp" +path = "src/lib.rs" diff --git a/libazureinit-kvp/src/hyperv.rs b/libazureinit-kvp/src/hyperv.rs new file mode 100644 index 00000000..c0be3429 --- /dev/null +++ b/libazureinit-kvp/src/hyperv.rs @@ -0,0 +1,578 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Hyper-V-backed [`KvpStore`] implementation. +//! +//! ## Record format +//! - Fixed-size records of [`RECORD_SIZE`] bytes (512-byte key + +//! 2,048-byte value), zero-padded on disk. +//! - No record-count header or explicit record cap in this layer. +//! +//! ## Behavior summary +//! - **`write`**: append-only; one record appended per call. +//! - **`read`**: last-write-wins for duplicate keys. +//! - **`entries`**: returns a deduplicated `HashMap` with last-write-wins. +//! - **`delete`**: rewrites file and removes all records for the key. +//! - **`truncate_if_stale`**: truncates if file predates boot; on lock +//! contention (`WouldBlock`) it returns `Ok(())` and skips. +//! +//! ## Limits +//! Writes validate key/value byte lengths using [`KvpLimits`] and return +//! errors for empty/oversized keys or oversized values. The on-disk +//! format is always 512 + 2,048 bytes; limits only constrain what may +//! be written (for example, Azure's 1,024-byte value limit). +//! +//! Higher layers are responsible for splitting/chunking oversized +//! diagnostics payloads before calling this store. +//! +//! ## Reference +//! - [Hyper-V Data Exchange Service (KVP)](https://learn.microsoft.com/en-us/virtualization/hyper-v-on-windows/reference/integration-services#hyper-v-data-exchange-service-kvp) + +use std::collections::HashMap; +use std::fs::{File, OpenOptions}; +use std::io::{self, ErrorKind, Read, Seek, Write}; +use std::os::unix::fs::MetadataExt; +use std::path::{Path, PathBuf}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use fs2::FileExt; +use sysinfo::System; + +use crate::{ + KvpLimits, KvpStore, HYPERV_MAX_KEY_BYTES, HYPERV_MAX_VALUE_BYTES, +}; + +/// Key field width in the on-disk record format (bytes). +const HV_KVP_EXCHANGE_MAX_KEY_SIZE: usize = HYPERV_MAX_KEY_BYTES; + +/// Value field width in the on-disk record format (bytes). +const HV_KVP_EXCHANGE_MAX_VALUE_SIZE: usize = HYPERV_MAX_VALUE_BYTES; + +/// Total size of one on-disk record (key + value). +const RECORD_SIZE: usize = + HV_KVP_EXCHANGE_MAX_KEY_SIZE + HV_KVP_EXCHANGE_MAX_VALUE_SIZE; + +/// Hyper-V KVP pool file store. +/// +/// Reads and writes the binary Hyper-V KVP format: fixed-size records +/// of [`RECORD_SIZE`] bytes (512-byte key + 2,048-byte value) with +/// flock-based concurrency control. +/// +/// Constructed via [`HyperVKvpStore::new`] with a file path and a +/// [`KvpLimits`] that determines the maximum allowed key and value +/// byte lengths for writes. +#[derive(Clone, Debug)] +pub struct HyperVKvpStore { + path: PathBuf, + limits: KvpLimits, +} + +impl HyperVKvpStore { + /// Create a store backed by the pool file at `path`. + /// + /// The file is created on first write if it does not already exist. + /// `limits` controls the maximum key and value sizes that + /// [`write`](KvpStore::write) will accept. + pub fn new(path: impl Into, limits: KvpLimits) -> Self { + Self { + path: path.into(), + limits, + } + } + + /// Return a reference to the pool file path. + pub fn path(&self) -> &Path { + &self.path + } + + /// Truncate the file when its mtime predates the current boot + /// (stale-data guard). + /// + /// An exclusive `flock` is held while checking metadata and + /// truncating so that concurrent processes don't race on the same + /// check-then-truncate sequence. If the lock cannot be acquired + /// immediately (another client holds it), the call returns `Ok(())` + /// without blocking. + pub fn truncate_if_stale(&self) -> io::Result<()> { + let boot_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|e| io::Error::other(format!("clock error: {e}")))? + .as_secs() + .saturating_sub(get_uptime().as_secs()) + as i64; + + let file = + match OpenOptions::new().read(true).write(true).open(&self.path) { + Ok(f) => f, + Err(ref e) if e.kind() == ErrorKind::NotFound => { + return Ok(()); + } + Err(e) => return Err(e), + }; + + if FileExt::try_lock_exclusive(&file).is_err() { + return Ok(()); + } + + let result = (|| -> io::Result<()> { + let metadata = file.metadata()?; + if metadata.mtime() < boot_time { + file.set_len(0)?; + } + Ok(()) + })(); + + let _ = FileExt::unlock(&file); + result + } + + fn open_for_append(&self) -> io::Result { + OpenOptions::new() + .append(true) + .create(true) + .open(&self.path) + } + + fn open_for_read(&self) -> io::Result { + OpenOptions::new().read(true).open(&self.path) + } + + fn open_for_read_write(&self) -> io::Result { + OpenOptions::new().read(true).write(true).open(&self.path) + } + + fn validate_key_value(&self, key: &str, value: &str) -> io::Result<()> { + if key.is_empty() { + return Err(io::Error::other("KVP key must not be empty")); + } + if key.len() > self.limits.max_key_size { + return Err(io::Error::other(format!( + "KVP key length ({}) exceeds maximum ({})", + key.len(), + self.limits.max_key_size + ))); + } + if value.len() > self.limits.max_value_size { + return Err(io::Error::other(format!( + "KVP value length ({}) exceeds maximum ({})", + value.len(), + self.limits.max_value_size + ))); + } + Ok(()) + } +} + +/// Encode a key-value pair into a single fixed-size record. +/// +/// The key is zero-padded to [`HV_KVP_EXCHANGE_MAX_KEY_SIZE`] bytes +/// and the value is zero-padded to [`HV_KVP_EXCHANGE_MAX_VALUE_SIZE`] +/// bytes. The caller is responsible for ensuring the key and value do +/// not exceed the on-disk field widths; if they do, only the first N +/// bytes are written (no error is raised at this level -- validation +/// happens in [`HyperVKvpStore::write`]). +pub(crate) fn encode_record(key: &str, value: &str) -> Vec { + let mut buf = vec![0u8; RECORD_SIZE]; + + let key_bytes = key.as_bytes(); + let key_len = key_bytes.len().min(HV_KVP_EXCHANGE_MAX_KEY_SIZE); + buf[..key_len].copy_from_slice(&key_bytes[..key_len]); + + let val_bytes = value.as_bytes(); + let val_len = val_bytes.len().min(HV_KVP_EXCHANGE_MAX_VALUE_SIZE); + buf[HV_KVP_EXCHANGE_MAX_KEY_SIZE..HV_KVP_EXCHANGE_MAX_KEY_SIZE + val_len] + .copy_from_slice(&val_bytes[..val_len]); + + buf +} + +/// Decode a fixed-size record into its key and value strings. +/// +/// Trailing null bytes are stripped from both fields. Returns an error +/// if `data` is not exactly [`RECORD_SIZE`] bytes. +pub(crate) fn decode_record(data: &[u8]) -> io::Result<(String, String)> { + if data.len() != RECORD_SIZE { + return Err(io::Error::other(format!( + "record size mismatch: expected {RECORD_SIZE}, got {}", + data.len() + ))); + } + + let key = String::from_utf8(data[..HV_KVP_EXCHANGE_MAX_KEY_SIZE].to_vec()) + .unwrap_or_default() + .trim_end_matches('\0') + .to_string(); + + let value = + String::from_utf8(data[HV_KVP_EXCHANGE_MAX_KEY_SIZE..].to_vec()) + .unwrap_or_default() + .trim_end_matches('\0') + .to_string(); + + Ok((key, value)) +} + +/// Read all records from a file that is already open and locked. +fn read_all_records(file: &mut File) -> io::Result> { + let mut contents = Vec::new(); + file.read_to_end(&mut contents)?; + + if contents.is_empty() { + return Ok(Vec::new()); + } + + if contents.len() % RECORD_SIZE != 0 { + return Err(io::Error::other(format!( + "file size ({}) is not a multiple of record size ({RECORD_SIZE})", + contents.len() + ))); + } + + contents + .chunks_exact(RECORD_SIZE) + .map(decode_record) + .collect() +} + +impl KvpStore for HyperVKvpStore { + fn limits(&self) -> KvpLimits { + self.limits + } + + /// Append one fixed-size record to the pool file. + /// + /// Validates key and value against the configured [`KvpLimits`], + /// acquires an exclusive flock, writes the record, flushes, and + /// releases the lock. + fn write(&self, key: &str, value: &str) -> io::Result<()> { + self.validate_key_value(key, value)?; + + let mut file = self.open_for_append()?; + let record = encode_record(key, value); + + FileExt::lock_exclusive(&file).map_err(|e| { + io::Error::other(format!("failed to lock KVP file: {e}")) + })?; + + let write_result = file.write_all(&record).and_then(|_| file.flush()); + + let unlock_result = FileExt::unlock(&file).map_err(|e| { + io::Error::other(format!("failed to unlock KVP file: {e}")) + }); + + if let Err(err) = write_result { + let _ = unlock_result; + return Err(err); + } + unlock_result + } + + /// Scan all records and return the value of the last record + /// matching `key` (last-write-wins). + /// + /// Acquires a shared flock during the scan. Returns `Ok(None)` if + /// the pool file does not exist or no record matches. + fn read(&self, key: &str) -> io::Result> { + let mut file = match self.open_for_read() { + Ok(f) => f, + Err(ref e) if e.kind() == ErrorKind::NotFound => { + return Ok(None); + } + Err(e) => return Err(e), + }; + + FileExt::lock_shared(&file).map_err(|e| { + io::Error::other(format!("failed to lock KVP file: {e}")) + })?; + + let records = read_all_records(&mut file); + let _ = FileExt::unlock(&file); + let records = records?; + + Ok(records + .into_iter() + .rev() + .find(|(k, _)| k == key) + .map(|(_, v)| v)) + } + + /// Return all key-value pairs as a deduplicated `HashMap`. + /// + /// Duplicate keys are resolved by last-write-wins, matching + /// [`read`](KvpStore::read) semantics. Acquires a shared flock + /// during the scan. Returns an empty map if the pool file does + /// not exist. + fn entries(&self) -> io::Result> { + let mut file = match self.open_for_read() { + Ok(f) => f, + Err(ref e) if e.kind() == ErrorKind::NotFound => { + return Ok(HashMap::new()); + } + Err(e) => return Err(e), + }; + + FileExt::lock_shared(&file).map_err(|e| { + io::Error::other(format!("failed to lock KVP file: {e}")) + })?; + + let records = read_all_records(&mut file); + let _ = FileExt::unlock(&file); + let records = records?; + + let mut map = HashMap::new(); + for (k, v) in records { + map.insert(k, v); + } + Ok(map) + } + + /// Rewrite the pool file without the record(s) matching `key`. + /// + /// Acquires an exclusive flock for the duration. Returns `true` if + /// at least one record was removed, `false` if the key was not + /// found. Returns `Ok(false)` if the pool file does not exist. + fn delete(&self, key: &str) -> io::Result { + let mut file = match self.open_for_read_write() { + Ok(f) => f, + Err(ref e) if e.kind() == ErrorKind::NotFound => { + return Ok(false); + } + Err(e) => return Err(e), + }; + + FileExt::lock_exclusive(&file).map_err(|e| { + io::Error::other(format!("failed to lock KVP file: {e}")) + })?; + + let result = (|| -> io::Result { + let records = read_all_records(&mut file)?; + let original_count = records.len(); + let kept: Vec<_> = + records.into_iter().filter(|(k, _)| k != key).collect(); + + if kept.len() == original_count { + return Ok(false); + } + + file.set_len(0)?; + file.seek(io::SeekFrom::Start(0))?; + + for (k, v) in &kept { + file.write_all(&encode_record(k, v))?; + } + file.flush()?; + Ok(true) + })(); + + let _ = FileExt::unlock(&file); + result + } +} + +fn get_uptime() -> Duration { + let mut system = System::new(); + system.refresh_memory(); + system.refresh_cpu_usage(); + Duration::from_secs(System::uptime()) +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::NamedTempFile; + + fn truncate_with_boot_time(path: &Path, boot_time: i64) -> io::Result<()> { + let file = OpenOptions::new().read(true).write(true).open(path)?; + let metadata = file.metadata()?; + if metadata.mtime() < boot_time { + file.set_len(0)?; + } + Ok(()) + } + + fn hyperv_store(path: &Path) -> HyperVKvpStore { + HyperVKvpStore::new(path, KvpLimits::hyperv()) + } + + fn azure_store(path: &Path) -> HyperVKvpStore { + HyperVKvpStore::new(path, KvpLimits::azure()) + } + + #[test] + fn test_encode_decode_roundtrip() { + let key = "test_key"; + let value = "test_value"; + let record = encode_record(key, value); + + assert_eq!(record.len(), RECORD_SIZE); + + let (decoded_key, decoded_value) = + decode_record(&record).expect("decode failed"); + assert_eq!(decoded_key, key); + assert_eq!(decoded_value, value); + } + + #[test] + fn test_write_rejects_empty_key() { + let tmp = NamedTempFile::new().unwrap(); + let store = hyperv_store(tmp.path()); + + let err = store.write("", "value").unwrap_err(); + assert!( + err.to_string().contains("empty"), + "expected empty-key error, got: {err}" + ); + } + + #[test] + fn test_write_rejects_oversized_key() { + let tmp = NamedTempFile::new().unwrap(); + let store = hyperv_store(tmp.path()); + + let key = "K".repeat(HV_KVP_EXCHANGE_MAX_KEY_SIZE + 1); + let err = store.write(&key, "v").unwrap_err(); + assert!( + err.to_string().contains("key length"), + "expected key-length error, got: {err}" + ); + } + + #[test] + fn test_write_rejects_oversized_value_hyperv() { + let tmp = NamedTempFile::new().unwrap(); + let store = hyperv_store(tmp.path()); + + let value = "V".repeat(HV_KVP_EXCHANGE_MAX_VALUE_SIZE + 1); + let err = store.write("k", &value).unwrap_err(); + assert!( + err.to_string().contains("value length"), + "expected value-length error, got: {err}" + ); + } + + #[test] + fn test_azure_limits_reject_long_value() { + let tmp = NamedTempFile::new().unwrap(); + let store = azure_store(tmp.path()); + + let value = "V".repeat(1023); + let err = store.write("k", &value).unwrap_err(); + assert!( + err.to_string().contains("value length"), + "expected value-length error with azure limits, got: {err}" + ); + } + + #[test] + fn test_write_and_read() { + let tmp = NamedTempFile::new().unwrap(); + let store = hyperv_store(tmp.path()); + + store.write("key1", "value1").unwrap(); + store.write("key2", "value2").unwrap(); + + assert_eq!(store.read("key1").unwrap(), Some("value1".to_string())); + assert_eq!(store.read("key2").unwrap(), Some("value2".to_string())); + assert_eq!(store.read("nonexistent").unwrap(), None); + } + + #[test] + fn test_read_returns_last_match() { + let tmp = NamedTempFile::new().unwrap(); + let store = hyperv_store(tmp.path()); + + store.write("key", "first").unwrap(); + store.write("key", "second").unwrap(); + store.write("key", "third").unwrap(); + + assert_eq!(store.read("key").unwrap(), Some("third".to_string())); + } + + #[test] + fn test_entries_deduplicates_with_last_write_wins() { + let tmp = NamedTempFile::new().unwrap(); + let store = hyperv_store(tmp.path()); + + store.write("key", "v1").unwrap(); + store.write("key", "v2").unwrap(); + store.write("other", "v3").unwrap(); + + let entries = store.entries().unwrap(); + assert_eq!(entries.len(), 2); + assert_eq!(entries.get("key"), Some(&"v2".to_string())); + assert_eq!(entries.get("other"), Some(&"v3".to_string())); + } + + #[test] + fn test_truncate_if_stale_truncates_when_file_is_older_than_boot() { + let tmp = NamedTempFile::new().unwrap(); + let store = hyperv_store(tmp.path()); + + store.write("key", "value").unwrap(); + assert!(tmp.path().metadata().unwrap().len() > 0); + + truncate_with_boot_time(tmp.path(), i64::MAX).unwrap(); + assert_eq!(tmp.path().metadata().unwrap().len(), 0); + } + + #[test] + fn test_truncate_if_stale_keeps_file_when_newer_than_boot() { + let tmp = NamedTempFile::new().unwrap(); + let store = hyperv_store(tmp.path()); + + store.write("key", "value").unwrap(); + let len_before = tmp.path().metadata().unwrap().len(); + + // Epoch boot time ensures any current file mtime is considered fresh. + truncate_with_boot_time(tmp.path(), 0).unwrap(); + assert_eq!(tmp.path().metadata().unwrap().len(), len_before); + } + + #[test] + fn test_delete_removes_all_matches() { + let tmp = NamedTempFile::new().unwrap(); + let store = hyperv_store(tmp.path()); + + store.write("key", "v1").unwrap(); + store.write("key", "v2").unwrap(); + store.write("other", "v3").unwrap(); + + assert!(store.delete("key").unwrap()); + assert_eq!(store.read("key").unwrap(), None); + assert_eq!(store.read("other").unwrap(), Some("v3".to_string())); + + let entries = store.entries().unwrap(); + assert_eq!(entries.len(), 1); + } + + #[test] + fn test_multi_thread_concurrent_writes() { + let tmp = NamedTempFile::new().unwrap(); + let path = tmp.path().to_path_buf(); + + let num_threads: usize = 20; + let iterations: usize = 1_000; + + let handles: Vec<_> = (0..num_threads) + .map(|tid| { + let p = path.clone(); + std::thread::spawn(move || { + let store = HyperVKvpStore::new(&p, KvpLimits::hyperv()); + for i in 0..iterations { + let key = format!("thread-{tid}-iter-{i}"); + let value = format!("value-{tid}-{i}"); + store.write(&key, &value).expect("write failed"); + } + }) + }) + .collect(); + + for h in handles { + h.join().expect("thread panicked"); + } + + let store = HyperVKvpStore::new(&path, KvpLimits::hyperv()); + let entries = store.entries().unwrap(); + assert_eq!(entries.len(), num_threads * iterations); + } +} diff --git a/libazureinit-kvp/src/lib.rs b/libazureinit-kvp/src/lib.rs new file mode 100644 index 00000000..6d8b4dff --- /dev/null +++ b/libazureinit-kvp/src/lib.rs @@ -0,0 +1,121 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! `libazureinit-kvp` provides a storage trait and Hyper-V-backed +//! implementation for KVP pool files. +//! +//! - [`KvpStore`]: storage interface used by higher layers. +//! - [`HyperVKvpStore`]: production implementation. +//! - [`KvpLimits`]: exported Hyper-V and Azure byte limits. + +use std::collections::HashMap; +use std::io; + +pub mod hyperv; + +pub use hyperv::HyperVKvpStore; + +/// Hyper-V key limit in bytes (policy/default preset). +pub const HYPERV_MAX_KEY_BYTES: usize = 512; +/// Hyper-V value limit in bytes (policy/default preset). +pub const HYPERV_MAX_VALUE_BYTES: usize = 2048; +/// Azure key limit in bytes (policy/default preset). +pub const AZURE_MAX_KEY_BYTES: usize = 512; +/// Azure value limit in bytes (UTF-16: 511 characters + null terminator). +pub const AZURE_MAX_VALUE_BYTES: usize = 1022; + +/// Storage abstraction for KVP backends. +/// +/// Semantics: +/// - `write`: stores one key/value or returns validation/I/O error. +/// - `read`: returns the most recent value for a key (last-write-wins). +/// - `entries`: returns deduplicated key/value pairs as `HashMap`. +/// - `delete`: removes all records for a key and reports whether any were removed. +/// - `limits`: returns the [`KvpLimits`] that govern maximum key/value +/// sizes for this store, allowing consumers to chunk or validate +/// data generically. +pub trait KvpStore: Send + Sync { + /// The key and value byte-size limits for this store. + /// + /// Consumers (e.g. diagnostics, tracing layers) should call this + /// instead of hardcoding size constants, so the limits stay correct + /// regardless of the underlying implementation. + fn limits(&self) -> KvpLimits; + + /// Write a key-value pair into the store. + /// + /// Returns an error if: + /// - The key is empty. + /// - The key exceeds the configured maximum key size. + /// - The value exceeds the configured maximum value size. + /// - An I/O error occurs during the write. + fn write(&self, key: &str, value: &str) -> io::Result<()>; + + /// Read the value for a given key, returning `None` if absent. + /// + /// When multiple records exist for the same key (append-only + /// storage), the value from the most recent record is returned + /// (last-write-wins). + fn read(&self, key: &str) -> io::Result>; + + /// Return all key-value pairs currently in the store. + /// + /// Keys are deduplicated using last-write-wins semantics, matching + /// the behavior of [`read`](KvpStore::read). + fn entries(&self) -> io::Result>; + + /// Remove all records matching `key`. + /// + /// Returns `true` if at least one record was removed, `false` if + /// the key was not found. + fn delete(&self, key: &str) -> io::Result; +} + +/// Configurable key/value byte limits for writes. +/// +/// Presets: +/// - [`KvpLimits::hyperv`]: [`HYPERV_MAX_KEY_BYTES`] / +/// [`HYPERV_MAX_VALUE_BYTES`]. +/// - [`KvpLimits::azure`]: [`AZURE_MAX_KEY_BYTES`] / +/// [`AZURE_MAX_VALUE_BYTES`]. +/// +/// Use `azure()` for Azure guests, where host-side consumers are stricter +/// on value byte length than raw Hyper-V format. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct KvpLimits { + pub max_key_size: usize, + pub max_value_size: usize, +} + +impl KvpLimits { + /// Raw Hyper-V wire format limits. + /// + /// - Max key size: 512 bytes + /// - Max value size: 2,048 bytes + /// + /// Use this when writing to a Hyper-V KVP pool file that will only + /// be consumed by Hyper-V tooling (not the Azure host agent). + pub const fn hyperv() -> Self { + Self { + max_key_size: HYPERV_MAX_KEY_BYTES, + max_value_size: HYPERV_MAX_VALUE_BYTES, + } + } + + /// Azure platform limits. + /// + /// - Max key size: 512 bytes + /// - Max value size: 1,022 bytes (UTF-16: 511 characters + null + /// terminator) + /// + /// The Azure host agent reads KVP records from the guest but is + /// stricter than the underlying Hyper-V format. Values beyond + /// 1,022 bytes are silently truncated by the host. Use this preset + /// for any code running on Azure VMs. + pub const fn azure() -> Self { + Self { + max_key_size: AZURE_MAX_KEY_BYTES, + max_value_size: AZURE_MAX_VALUE_BYTES, + } + } +} From 5226e32e9256c1b14918bd5e5fb7c7ded3655b9a Mon Sep 17 00:00:00 2001 From: Peyton Date: Mon, 9 Mar 2026 13:04:10 -0700 Subject: [PATCH 2/5] add Azure autodetect limits selection for HyperVKvpStore --- doc/libazurekvp.md | 2 +- libazureinit-kvp/src/hyperv.rs | 79 ++++++++++++++++++++++++++++++++-- libazureinit-kvp/src/lib.rs | 35 +++------------ 3 files changed, 81 insertions(+), 35 deletions(-) diff --git a/doc/libazurekvp.md b/doc/libazurekvp.md index 8b34fb54..2bef650c 100644 --- a/doc/libazurekvp.md +++ b/doc/libazurekvp.md @@ -70,7 +70,7 @@ enforce and reuse exact bounds. - `max_value_size = 2048` - `KvpLimits::azure()` - `max_key_size = 512` - - `max_value_size = 1022` (UTF-16: 511 characters + null terminator) + - `max_value_size = 1022` Why Azure limit is lower for values: - Hyper-V record format allows 2048-byte values. diff --git a/libazureinit-kvp/src/hyperv.rs b/libazureinit-kvp/src/hyperv.rs index c0be3429..7e300f04 100644 --- a/libazureinit-kvp/src/hyperv.rs +++ b/libazureinit-kvp/src/hyperv.rs @@ -20,7 +20,7 @@ //! Writes validate key/value byte lengths using [`KvpLimits`] and return //! errors for empty/oversized keys or oversized values. The on-disk //! format is always 512 + 2,048 bytes; limits only constrain what may -//! be written (for example, Azure's 1,024-byte value limit). +//! be written (for example, Azure's 1,022-byte value limit). //! //! Higher layers are responsible for splitting/chunking oversized //! diagnostics payloads before calling this store. @@ -42,6 +42,18 @@ use crate::{ KvpLimits, KvpStore, HYPERV_MAX_KEY_BYTES, HYPERV_MAX_VALUE_BYTES, }; +/// DMI chassis asset tag used to identify Azure VMs. +const AZURE_CHASSIS_ASSET_TAG: &str = "7783-7084-3265-9085-8269-3286-77"; +const AZURE_CHASSIS_ASSET_TAG_PATH: &str = + "/sys/class/dmi/id/chassis_asset_tag"; + +fn is_azure_vm(tag_path: Option<&str>) -> bool { + let path = tag_path.unwrap_or(AZURE_CHASSIS_ASSET_TAG_PATH); + std::fs::read_to_string(path) + .map(|s| s.trim() == AZURE_CHASSIS_ASSET_TAG) + .unwrap_or(false) +} + /// Key field width in the on-disk record format (bytes). const HV_KVP_EXCHANGE_MAX_KEY_SIZE: usize = HYPERV_MAX_KEY_BYTES; @@ -68,11 +80,10 @@ pub struct HyperVKvpStore { } impl HyperVKvpStore { - /// Create a store backed by the pool file at `path`. + /// Create a store with explicit limits. /// /// The file is created on first write if it does not already exist. - /// `limits` controls the maximum key and value sizes that - /// [`write`](KvpStore::write) will accept. + /// Use [`HyperVKvpStore::new_autodetect`] to choose limits automatically. pub fn new(path: impl Into, limits: KvpLimits) -> Self { Self { path: path.into(), @@ -80,6 +91,38 @@ impl HyperVKvpStore { } } + /// Create a store with limits chosen from host platform detection. + /// + /// If the Azure DMI asset tag is present, uses [`KvpLimits::azure`]. + /// Otherwise, uses [`KvpLimits::hyperv`]. + pub fn new_autodetect(path: impl Into) -> Self { + let limits = if is_azure_vm(None) { + KvpLimits::azure() + } else { + KvpLimits::hyperv() + }; + Self { + path: path.into(), + limits, + } + } + + #[cfg(test)] + fn new_autodetect_with_tag_path( + path: impl Into, + tag_path: &str, + ) -> Self { + let limits = if is_azure_vm(Some(tag_path)) { + KvpLimits::azure() + } else { + KvpLimits::hyperv() + }; + Self { + path: path.into(), + limits, + } + } + /// Return a reference to the pool file path. pub fn path(&self) -> &Path { &self.path @@ -398,6 +441,34 @@ mod tests { HyperVKvpStore::new(path, KvpLimits::azure()) } + #[test] + fn test_autodetect_uses_azure_limits_on_azure_host() { + let tag_file = NamedTempFile::new().unwrap(); + let pool_file = NamedTempFile::new().unwrap(); + // DMI files on Linux have a trailing newline. + std::fs::write(tag_file.path(), format!("{AZURE_CHASSIS_ASSET_TAG}\n")) + .unwrap(); + + let store = HyperVKvpStore::new_autodetect_with_tag_path( + pool_file.path(), + tag_file.path().to_str().unwrap(), + ); + assert_eq!(store.limits(), KvpLimits::azure()); + } + + #[test] + fn test_autodetect_uses_hyperv_limits_on_bare_hyperv() { + let tag_file = NamedTempFile::new().unwrap(); + let pool_file = NamedTempFile::new().unwrap(); + std::fs::write(tag_file.path(), "bare-hyperv-tag").unwrap(); + + let store = HyperVKvpStore::new_autodetect_with_tag_path( + pool_file.path(), + tag_file.path().to_str().unwrap(), + ); + assert_eq!(store.limits(), KvpLimits::hyperv()); + } + #[test] fn test_encode_decode_roundtrip() { let key = "test_key"; diff --git a/libazureinit-kvp/src/lib.rs b/libazureinit-kvp/src/lib.rs index 6d8b4dff..89fef04f 100644 --- a/libazureinit-kvp/src/lib.rs +++ b/libazureinit-kvp/src/lib.rs @@ -21,7 +21,7 @@ pub const HYPERV_MAX_KEY_BYTES: usize = 512; pub const HYPERV_MAX_VALUE_BYTES: usize = 2048; /// Azure key limit in bytes (policy/default preset). pub const AZURE_MAX_KEY_BYTES: usize = 512; -/// Azure value limit in bytes (UTF-16: 511 characters + null terminator). +/// Azure value limit in bytes, matching Azure host behavior. pub const AZURE_MAX_VALUE_BYTES: usize = 1022; /// Storage abstraction for KVP backends. @@ -32,8 +32,7 @@ pub const AZURE_MAX_VALUE_BYTES: usize = 1022; /// - `entries`: returns deduplicated key/value pairs as `HashMap`. /// - `delete`: removes all records for a key and reports whether any were removed. /// - `limits`: returns the [`KvpLimits`] that govern maximum key/value -/// sizes for this store, allowing consumers to chunk or validate -/// data generically. +/// sizes for this store. pub trait KvpStore: Send + Sync { /// The key and value byte-size limits for this store. /// @@ -71,16 +70,7 @@ pub trait KvpStore: Send + Sync { fn delete(&self, key: &str) -> io::Result; } -/// Configurable key/value byte limits for writes. -/// -/// Presets: -/// - [`KvpLimits::hyperv`]: [`HYPERV_MAX_KEY_BYTES`] / -/// [`HYPERV_MAX_VALUE_BYTES`]. -/// - [`KvpLimits::azure`]: [`AZURE_MAX_KEY_BYTES`] / -/// [`AZURE_MAX_VALUE_BYTES`]. -/// -/// Use `azure()` for Azure guests, where host-side consumers are stricter -/// on value byte length than raw Hyper-V format. +/// Key/value byte limits for write validation. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct KvpLimits { pub max_key_size: usize, @@ -88,13 +78,7 @@ pub struct KvpLimits { } impl KvpLimits { - /// Raw Hyper-V wire format limits. - /// - /// - Max key size: 512 bytes - /// - Max value size: 2,048 bytes - /// - /// Use this when writing to a Hyper-V KVP pool file that will only - /// be consumed by Hyper-V tooling (not the Azure host agent). + /// Hyper-V limits (512-byte key, 2,048-byte value). pub const fn hyperv() -> Self { Self { max_key_size: HYPERV_MAX_KEY_BYTES, @@ -102,16 +86,7 @@ impl KvpLimits { } } - /// Azure platform limits. - /// - /// - Max key size: 512 bytes - /// - Max value size: 1,022 bytes (UTF-16: 511 characters + null - /// terminator) - /// - /// The Azure host agent reads KVP records from the guest but is - /// stricter than the underlying Hyper-V format. Values beyond - /// 1,022 bytes are silently truncated by the host. Use this preset - /// for any code running on Azure VMs. + /// Azure limits (512-byte key, 1,022-byte value). pub const fn azure() -> Self { Self { max_key_size: AZURE_MAX_KEY_BYTES, From c1440c960c1a9d4c7dc052227b35ec1e05803152 Mon Sep 17 00:00:00 2001 From: Peyton Date: Mon, 9 Mar 2026 15:04:21 -0700 Subject: [PATCH 3/5] Improving libazurekvp.md clarity --- doc/libazurekvp.md | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/doc/libazurekvp.md b/doc/libazurekvp.md index 2bef650c..6fe7bbba 100644 --- a/doc/libazurekvp.md +++ b/doc/libazurekvp.md @@ -29,8 +29,7 @@ Records are appended to the file and zero-padded to fixed widths. - key byte length exceeds `max_key_size` - value byte length exceeds `max_value_size` - an I/O error occurs -- Oversized values are rejected by the store (no silent truncation). - Higher layers are responsible for chunking/splitting when required. +- This store never truncates: oversized values cause `write()` to return an error. ### `read(key)` @@ -74,10 +73,9 @@ enforce and reuse exact bounds. Why Azure limit is lower for values: - Hyper-V record format allows 2048-byte values. -- Azure host handling is stricter; values beyond 1022 bytes are - silently truncated by host-side consumers. -- For Azure VMs, use `KvpLimits::azure()` and rely on higher-level - chunking when larger payloads must be preserved. +- On Azure, host-side consumers truncate values beyond 1022 bytes, so + `KvpLimits::azure()` caps at 1022. Layers above this crate (e.g. + diagnostics/tracing) handle chunking of larger payloads. ## Record Count Behavior From d63dd42f74144c6acce895db7066c13700eee11c Mon Sep 17 00:00:00 2001 From: peytonr18 Date: Tue, 10 Mar 2026 09:28:53 -0700 Subject: [PATCH 4/5] feat(kvp): harden Hyper-V record decode and refactor stale truncate tests --- libazureinit-kvp/src/hyperv.rs | 95 ++++++++++++++++++++++------------ 1 file changed, 61 insertions(+), 34 deletions(-) diff --git a/libazureinit-kvp/src/hyperv.rs b/libazureinit-kvp/src/hyperv.rs index 7e300f04..4d4a47a3 100644 --- a/libazureinit-kvp/src/hyperv.rs +++ b/libazureinit-kvp/src/hyperv.rs @@ -144,6 +144,10 @@ impl HyperVKvpStore { .saturating_sub(get_uptime().as_secs()) as i64; + self.truncate_if_stale_at_boot(boot_time) + } + + fn truncate_if_stale_at_boot(&self, boot_time: i64) -> io::Result<()> { let file = match OpenOptions::new().read(true).write(true).open(&self.path) { Ok(f) => f, @@ -153,8 +157,11 @@ impl HyperVKvpStore { Err(e) => return Err(e), }; - if FileExt::try_lock_exclusive(&file).is_err() { - return Ok(()); + if let Err(e) = FileExt::try_lock_exclusive(&file) { + if e.kind() == ErrorKind::WouldBlock { + return Ok(()); + } + return Err(e); } let result = (|| -> io::Result<()> { @@ -232,7 +239,8 @@ pub(crate) fn encode_record(key: &str, value: &str) -> Vec { /// Decode a fixed-size record into its key and value strings. /// /// Trailing null bytes are stripped from both fields. Returns an error -/// if `data` is not exactly [`RECORD_SIZE`] bytes. +/// if `data` is not exactly [`RECORD_SIZE`] bytes or if either field +/// contains invalid UTF-8. pub(crate) fn decode_record(data: &[u8]) -> io::Result<(String, String)> { if data.len() != RECORD_SIZE { return Err(io::Error::other(format!( @@ -241,40 +249,49 @@ pub(crate) fn decode_record(data: &[u8]) -> io::Result<(String, String)> { ))); } - let key = String::from_utf8(data[..HV_KVP_EXCHANGE_MAX_KEY_SIZE].to_vec()) - .unwrap_or_default() + let (key_bytes, value_bytes) = data.split_at(HV_KVP_EXCHANGE_MAX_KEY_SIZE); + + let key = std::str::from_utf8(key_bytes) + .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))? .trim_end_matches('\0') .to_string(); - let value = - String::from_utf8(data[HV_KVP_EXCHANGE_MAX_KEY_SIZE..].to_vec()) - .unwrap_or_default() - .trim_end_matches('\0') - .to_string(); + let value = std::str::from_utf8(value_bytes) + .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))? + .trim_end_matches('\0') + .to_string(); Ok((key, value)) } /// Read all records from a file that is already open and locked. fn read_all_records(file: &mut File) -> io::Result> { - let mut contents = Vec::new(); - file.read_to_end(&mut contents)?; + let metadata = file.metadata()?; + let len = metadata.len() as usize; - if contents.is_empty() { + if len == 0 { return Ok(Vec::new()); } - if contents.len() % RECORD_SIZE != 0 { + if len % RECORD_SIZE != 0 { return Err(io::Error::other(format!( - "file size ({}) is not a multiple of record size ({RECORD_SIZE})", - contents.len() + "file size ({len}) is not a multiple of record size ({RECORD_SIZE})" ))); } - contents - .chunks_exact(RECORD_SIZE) - .map(decode_record) - .collect() + // Ensure we start reading from the beginning of the file. + file.seek(io::SeekFrom::Start(0))?; + + let record_count = len / RECORD_SIZE; + let mut records = Vec::with_capacity(record_count); + let mut buf = [0u8; RECORD_SIZE]; + + for _ in 0..record_count { + file.read_exact(&mut buf)?; + records.push(decode_record(&buf)?); + } + + Ok(records) } impl KvpStore for HyperVKvpStore { @@ -424,15 +441,6 @@ mod tests { use super::*; use tempfile::NamedTempFile; - fn truncate_with_boot_time(path: &Path, boot_time: i64) -> io::Result<()> { - let file = OpenOptions::new().read(true).write(true).open(path)?; - let metadata = file.metadata()?; - if metadata.mtime() < boot_time { - file.set_len(0)?; - } - Ok(()) - } - fn hyperv_store(path: &Path) -> HyperVKvpStore { HyperVKvpStore::new(path, KvpLimits::hyperv()) } @@ -575,28 +583,47 @@ mod tests { } #[test] - fn test_truncate_if_stale_truncates_when_file_is_older_than_boot() { + fn test_truncate_if_stale_at_boot_truncates_when_file_is_older_than_boot() { let tmp = NamedTempFile::new().unwrap(); let store = hyperv_store(tmp.path()); store.write("key", "value").unwrap(); assert!(tmp.path().metadata().unwrap().len() > 0); - truncate_with_boot_time(tmp.path(), i64::MAX).unwrap(); + store.truncate_if_stale_at_boot(i64::MAX).unwrap(); assert_eq!(tmp.path().metadata().unwrap().len(), 0); } #[test] - fn test_truncate_if_stale_keeps_file_when_newer_than_boot() { + fn test_truncate_if_stale_at_boot_keeps_file_when_newer_than_boot() { + let tmp = NamedTempFile::new().unwrap(); + let store = hyperv_store(tmp.path()); + + store.write("key", "value").unwrap(); + let len_before = tmp.path().metadata().unwrap().len(); + + store.truncate_if_stale_at_boot(0).unwrap(); + assert_eq!(tmp.path().metadata().unwrap().len(), len_before); + } + + #[test] + fn test_truncate_if_stale_keeps_fresh_file() { let tmp = NamedTempFile::new().unwrap(); let store = hyperv_store(tmp.path()); store.write("key", "value").unwrap(); let len_before = tmp.path().metadata().unwrap().len(); + assert!(len_before > 0); - // Epoch boot time ensures any current file mtime is considered fresh. - truncate_with_boot_time(tmp.path(), 0).unwrap(); + store.truncate_if_stale().unwrap(); assert_eq!(tmp.path().metadata().unwrap().len(), len_before); + assert_eq!(store.read("key").unwrap(), Some("value".to_string())); + } + + #[test] + fn test_truncate_if_stale_ok_when_file_missing() { + let store = hyperv_store(Path::new("/tmp/nonexistent-kvp-pool")); + store.truncate_if_stale().unwrap(); } #[test] From 62a52fede6b86e0891b9d224b3b13b4a5a5ae6ed Mon Sep 17 00:00:00 2001 From: Peyton Date: Tue, 10 Mar 2026 17:17:07 -0700 Subject: [PATCH 5/5] Refactor libazureinit-kvp: KvpStore trait, KvpError, HyperV/Azure split, clear() API --- doc/libazurekvp.md | 88 -------- libazureinit-kvp/src/azure.rs | 134 ++++++++++++ libazureinit-kvp/src/hyperv.rs | 386 ++++++++++++++------------------- libazureinit-kvp/src/lib.rs | 205 +++++++++++------ 4 files changed, 443 insertions(+), 370 deletions(-) delete mode 100644 doc/libazurekvp.md create mode 100644 libazureinit-kvp/src/azure.rs diff --git a/doc/libazurekvp.md b/doc/libazurekvp.md deleted file mode 100644 index 6fe7bbba..00000000 --- a/doc/libazurekvp.md +++ /dev/null @@ -1,88 +0,0 @@ -# `libazureinit-kvp` - -`libazureinit-kvp` is the storage layer for Hyper-V KVP (Key-Value Pair) -pool files used by Azure guests. - -It defines: -- `KvpStore`: storage trait with explicit read/write/delete semantics. -- `HyperVKvpStore`: production implementation backed by the Hyper-V - binary pool file format. -- `KvpLimits`: exported key/value byte limits for Hyper-V and Azure. - -## Record Format - -The Hyper-V pool file record format is fixed width: -- Key field: 512 bytes -- Value field: 2048 bytes -- Total record size: 2560 bytes - -Records are appended to the file and zero-padded to fixed widths. - -## Store Semantics - -### `write(key, value)` - -- Append-only behavior: each call appends one new record. -- Duplicate keys are allowed in the file. -- Returns an error when: - - key is empty - - key byte length exceeds `max_key_size` - - value byte length exceeds `max_value_size` - - an I/O error occurs -- This store never truncates: oversized values cause `write()` to return an error. - -### `read(key)` - -- Scans records and returns the value from the most recent matching key - (last-write-wins). -- Returns `Ok(None)` when the key is missing or file does not exist. - -### `entries()` - -- Returns `HashMap`. -- Deduplicates duplicate keys using last-write-wins, matching `read`. -- This exposes a logical unique-key view even though the file itself is - append-only and may contain multiple records per key. - -### `delete(key)` - -- Rewrites the file without any matching key records. -- Returns `true` if at least one record was removed, else `false`. - -## Truncate Semantics (`truncate_if_stale`) - -`HyperVKvpStore::truncate_if_stale` clears stale records from previous -boots by comparing file `mtime` to the current boot timestamp. - -- If file predates boot: truncate to zero length. -- If file is current: leave unchanged. -- If lock contention occurs (`WouldBlock`): return `Ok(())` and skip. -- Non-contention lock failures are returned as errors. - -## Limits and Azure Compatibility - -`KvpLimits` is exported so callers (including diagnostics layers) can -enforce and reuse exact bounds. - -- `KvpLimits::hyperv()` - - `max_key_size = 512` - - `max_value_size = 2048` -- `KvpLimits::azure()` - - `max_key_size = 512` - - `max_value_size = 1022` - -Why Azure limit is lower for values: -- Hyper-V record format allows 2048-byte values. -- On Azure, host-side consumers truncate values beyond 1022 bytes, so - `KvpLimits::azure()` caps at 1022. Layers above this crate (e.g. - diagnostics/tracing) handle chunking of larger payloads. - -## Record Count Behavior - -There is no explicit record-count cap in this storage layer. -The file grows with each append until external constraints (disk space, -retention policy, or caller behavior) are applied. - -## References - -- [Hyper-V Data Exchange Service (KVP)](https://learn.microsoft.com/en-us/virtualization/hyper-v-on-windows/reference/integration-services#hyper-v-data-exchange-service-kvp) \ No newline at end of file diff --git a/libazureinit-kvp/src/azure.rs b/libazureinit-kvp/src/azure.rs new file mode 100644 index 00000000..7c35c9c9 --- /dev/null +++ b/libazureinit-kvp/src/azure.rs @@ -0,0 +1,134 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Azure-specific KVP store. +//! +//! Wraps [`HyperVKvpStore`] with the stricter value-size limit imposed +//! by the Azure host (1,022 bytes). All other behavior — record +//! format, file locking, append-only writes — is inherited from the +//! underlying Hyper-V pool file implementation. + +use std::collections::HashMap; +use std::path::{Path, PathBuf}; + +use crate::hyperv::HyperVKvpStore; +use crate::{KvpError, KvpStore}; + +/// Azure host-side value limit (values beyond this are truncated). +const AZURE_MAX_VALUE_BYTES: usize = 1022; + +/// Azure KVP store backed by a Hyper-V pool file. +/// +/// Identical to [`HyperVKvpStore`] except that +/// [`MAX_VALUE_SIZE`](KvpStore::MAX_VALUE_SIZE) is set to 1,022 bytes, +/// matching the Azure host's truncation behavior. +#[derive(Clone, Debug)] +pub struct AzureKvpStore { + inner: HyperVKvpStore, +} + +impl AzureKvpStore { + /// Create a new Azure KVP store backed by the pool file at `path`. + /// + /// When `truncate_on_stale` is `true` the constructor checks + /// whether the pool file predates the current boot and, if so, + /// truncates it before returning. + pub fn new( + path: impl Into, + truncate_on_stale: bool, + ) -> Result { + Ok(Self { + inner: HyperVKvpStore::new(path, truncate_on_stale)?, + }) + } + + /// Return a reference to the pool file path. + pub fn path(&self) -> &Path { + self.inner.path() + } +} + +impl KvpStore for AzureKvpStore { + const MAX_KEY_SIZE: usize = HyperVKvpStore::MAX_KEY_SIZE; + const MAX_VALUE_SIZE: usize = AZURE_MAX_VALUE_BYTES; + + fn backend_read(&self, key: &str) -> Result, KvpError> { + self.inner.backend_read(key) + } + + fn backend_write(&self, key: &str, value: &str) -> Result<(), KvpError> { + self.inner.backend_write(key, value) + } + + fn entries(&self) -> Result, KvpError> { + self.inner.entries() + } + + fn entries_raw(&self) -> Result, KvpError> { + self.inner.entries_raw() + } + + fn delete(&self, key: &str) -> Result { + self.inner.delete(key) + } + + fn backend_clear(&self) -> Result<(), KvpError> { + self.inner.backend_clear() + } + + fn is_stale(&self) -> Result { + self.inner.is_stale() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::NamedTempFile; + + fn azure_store(path: &Path) -> AzureKvpStore { + AzureKvpStore::new(path, false).unwrap() + } + + #[test] + fn test_azure_rejects_value_over_1022() { + let tmp = NamedTempFile::new().unwrap(); + let store = azure_store(tmp.path()); + + let value = "V".repeat(AZURE_MAX_VALUE_BYTES + 1); + let err = store.write("k", &value).unwrap_err(); + assert!( + matches!(err, KvpError::ValueTooLarge { .. }), + "expected ValueTooLarge, got: {err}" + ); + } + + #[test] + fn test_azure_accepts_value_at_1022() { + let tmp = NamedTempFile::new().unwrap(); + let store = azure_store(tmp.path()); + + let value = "V".repeat(AZURE_MAX_VALUE_BYTES); + store.write("k", &value).unwrap(); + assert_eq!(store.read("k").unwrap(), Some(value)); + } + + #[test] + fn test_azure_write_and_read() { + let tmp = NamedTempFile::new().unwrap(); + let store = azure_store(tmp.path()); + + store.write("key", "value").unwrap(); + assert_eq!(store.read("key").unwrap(), Some("value".to_string())); + } + + #[test] + fn test_azure_clear() { + let tmp = NamedTempFile::new().unwrap(); + let store = azure_store(tmp.path()); + + store.write("key", "value").unwrap(); + store.clear().unwrap(); + assert_eq!(store.read("key").unwrap(), None); + } +} diff --git a/libazureinit-kvp/src/hyperv.rs b/libazureinit-kvp/src/hyperv.rs index 4d4a47a3..f2b359f5 100644 --- a/libazureinit-kvp/src/hyperv.rs +++ b/libazureinit-kvp/src/hyperv.rs @@ -1,29 +1,16 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -//! Hyper-V-backed [`KvpStore`] implementation. +//! Hyper-V KVP (Key-Value Pair) pool file backend. //! -//! ## Record format -//! - Fixed-size records of [`RECORD_SIZE`] bytes (512-byte key + -//! 2,048-byte value), zero-padded on disk. -//! - No record-count header or explicit record cap in this layer. +//! Hyper-V exposes a Data Exchange Service that lets a guest and its +//! host share key-value pairs through a set of pool files. Each pool +//! file is a flat sequence of fixed-size records (512-byte key + +//! 2,048-byte value, zero-padded). There is no record-count header; +//! the file grows by one record per write. //! -//! ## Behavior summary -//! - **`write`**: append-only; one record appended per call. -//! - **`read`**: last-write-wins for duplicate keys. -//! - **`entries`**: returns a deduplicated `HashMap` with last-write-wins. -//! - **`delete`**: rewrites file and removes all records for the key. -//! - **`truncate_if_stale`**: truncates if file predates boot; on lock -//! contention (`WouldBlock`) it returns `Ok(())` and skips. -//! -//! ## Limits -//! Writes validate key/value byte lengths using [`KvpLimits`] and return -//! errors for empty/oversized keys or oversized values. The on-disk -//! format is always 512 + 2,048 bytes; limits only constrain what may -//! be written (for example, Azure's 1,022-byte value limit). -//! -//! Higher layers are responsible for splitting/chunking oversized -//! diagnostics payloads before calling this store. +//! On Azure, the host-side KVP consumer truncates values beyond +//! 1,022 bytes, so Azure guests should use [`AzureKvpStore`](crate::AzureKvpStore). //! //! ## Reference //! - [Hyper-V Data Exchange Service (KVP)](https://learn.microsoft.com/en-us/virtualization/hyper-v-on-windows/reference/integration-services#hyper-v-data-exchange-service-kvp) @@ -38,27 +25,13 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use fs2::FileExt; use sysinfo::System; -use crate::{ - KvpLimits, KvpStore, HYPERV_MAX_KEY_BYTES, HYPERV_MAX_VALUE_BYTES, -}; - -/// DMI chassis asset tag used to identify Azure VMs. -const AZURE_CHASSIS_ASSET_TAG: &str = "7783-7084-3265-9085-8269-3286-77"; -const AZURE_CHASSIS_ASSET_TAG_PATH: &str = - "/sys/class/dmi/id/chassis_asset_tag"; - -fn is_azure_vm(tag_path: Option<&str>) -> bool { - let path = tag_path.unwrap_or(AZURE_CHASSIS_ASSET_TAG_PATH); - std::fs::read_to_string(path) - .map(|s| s.trim() == AZURE_CHASSIS_ASSET_TAG) - .unwrap_or(false) -} +use crate::{KvpError, KvpStore}; /// Key field width in the on-disk record format (bytes). -const HV_KVP_EXCHANGE_MAX_KEY_SIZE: usize = HYPERV_MAX_KEY_BYTES; +const HV_KVP_EXCHANGE_MAX_KEY_SIZE: usize = 512; /// Value field width in the on-disk record format (bytes). -const HV_KVP_EXCHANGE_MAX_VALUE_SIZE: usize = HYPERV_MAX_VALUE_BYTES; +const HV_KVP_EXCHANGE_MAX_VALUE_SIZE: usize = 2048; /// Total size of one on-disk record (key + value). const RECORD_SIZE: usize = @@ -69,58 +42,26 @@ const RECORD_SIZE: usize = /// Reads and writes the binary Hyper-V KVP format: fixed-size records /// of [`RECORD_SIZE`] bytes (512-byte key + 2,048-byte value) with /// flock-based concurrency control. -/// -/// Constructed via [`HyperVKvpStore::new`] with a file path and a -/// [`KvpLimits`] that determines the maximum allowed key and value -/// byte lengths for writes. #[derive(Clone, Debug)] pub struct HyperVKvpStore { path: PathBuf, - limits: KvpLimits, } impl HyperVKvpStore { - /// Create a store with explicit limits. - /// - /// The file is created on first write if it does not already exist. - /// Use [`HyperVKvpStore::new_autodetect`] to choose limits automatically. - pub fn new(path: impl Into, limits: KvpLimits) -> Self { - Self { - path: path.into(), - limits, - } - } - - /// Create a store with limits chosen from host platform detection. + /// Create a new store backed by the pool file at `path`. /// - /// If the Azure DMI asset tag is present, uses [`KvpLimits::azure`]. - /// Otherwise, uses [`KvpLimits::hyperv`]. - pub fn new_autodetect(path: impl Into) -> Self { - let limits = if is_azure_vm(None) { - KvpLimits::azure() - } else { - KvpLimits::hyperv() - }; - Self { - path: path.into(), - limits, - } - } - - #[cfg(test)] - fn new_autodetect_with_tag_path( + /// When `truncate_on_stale` is `true` the constructor checks + /// whether the pool file predates the current boot and, if so, + /// truncates it before returning. + pub fn new( path: impl Into, - tag_path: &str, - ) -> Self { - let limits = if is_azure_vm(Some(tag_path)) { - KvpLimits::azure() - } else { - KvpLimits::hyperv() - }; - Self { - path: path.into(), - limits, + truncate_on_stale: bool, + ) -> Result { + let store = Self { path: path.into() }; + if truncate_on_stale && store.pool_is_stale()? { + store.truncate_pool()?; } + Ok(store) } /// Return a reference to the pool file path. @@ -128,49 +69,59 @@ impl HyperVKvpStore { &self.path } - /// Truncate the file when its mtime predates the current boot - /// (stale-data guard). - /// - /// An exclusive `flock` is held while checking metadata and - /// truncating so that concurrent processes don't race on the same - /// check-then-truncate sequence. If the lock cannot be acquired - /// immediately (another client holds it), the call returns `Ok(())` - /// without blocking. - pub fn truncate_if_stale(&self) -> io::Result<()> { - let boot_time = SystemTime::now() + // -- Private helpers ------------------------------------------------ + + fn boot_time() -> Result { + let now = SystemTime::now() .duration_since(UNIX_EPOCH) .map_err(|e| io::Error::other(format!("clock error: {e}")))? - .as_secs() - .saturating_sub(get_uptime().as_secs()) - as i64; + .as_secs(); + Ok(now.saturating_sub(get_uptime().as_secs()) as i64) + } - self.truncate_if_stale_at_boot(boot_time) + /// Check whether the pool file's mtime predates the current boot. + /// + /// Returns `false` if the file does not exist. + fn pool_is_stale(&self) -> Result { + let metadata = match std::fs::metadata(&self.path) { + Ok(m) => m, + Err(ref e) if e.kind() == ErrorKind::NotFound => { + return Ok(false); + } + Err(e) => return Err(e.into()), + }; + let boot = Self::boot_time()?; + Ok(metadata.mtime() < boot) } - fn truncate_if_stale_at_boot(&self, boot_time: i64) -> io::Result<()> { + #[cfg(test)] + fn pool_is_stale_at_boot(&self, boot_time: i64) -> Result { + let metadata = match std::fs::metadata(&self.path) { + Ok(m) => m, + Err(ref e) if e.kind() == ErrorKind::NotFound => { + return Ok(false); + } + Err(e) => return Err(e.into()), + }; + Ok(metadata.mtime() < boot_time) + } + + /// Truncate the pool file to zero length under an exclusive flock. + fn truncate_pool(&self) -> Result<(), KvpError> { let file = match OpenOptions::new().read(true).write(true).open(&self.path) { Ok(f) => f, Err(ref e) if e.kind() == ErrorKind::NotFound => { return Ok(()); } - Err(e) => return Err(e), + Err(e) => return Err(e.into()), }; - if let Err(e) = FileExt::try_lock_exclusive(&file) { - if e.kind() == ErrorKind::WouldBlock { - return Ok(()); - } - return Err(e); - } + FileExt::lock_exclusive(&file).map_err(|e| { + io::Error::other(format!("failed to lock KVP file: {e}")) + })?; - let result = (|| -> io::Result<()> { - let metadata = file.metadata()?; - if metadata.mtime() < boot_time { - file.set_len(0)?; - } - Ok(()) - })(); + let result = file.set_len(0).map_err(KvpError::from); let _ = FileExt::unlock(&file); result @@ -190,27 +141,6 @@ impl HyperVKvpStore { fn open_for_read_write(&self) -> io::Result { OpenOptions::new().read(true).write(true).open(&self.path) } - - fn validate_key_value(&self, key: &str, value: &str) -> io::Result<()> { - if key.is_empty() { - return Err(io::Error::other("KVP key must not be empty")); - } - if key.len() > self.limits.max_key_size { - return Err(io::Error::other(format!( - "KVP key length ({}) exceeds maximum ({})", - key.len(), - self.limits.max_key_size - ))); - } - if value.len() > self.limits.max_value_size { - return Err(io::Error::other(format!( - "KVP value length ({}) exceeds maximum ({})", - value.len(), - self.limits.max_value_size - ))); - } - Ok(()) - } } /// Encode a key-value pair into a single fixed-size record. @@ -273,7 +203,7 @@ fn read_all_records(file: &mut File) -> io::Result> { return Ok(Vec::new()); } - if len % RECORD_SIZE != 0 { + if !len.is_multiple_of(RECORD_SIZE) { return Err(io::Error::other(format!( "file size ({len}) is not a multiple of record size ({RECORD_SIZE})" ))); @@ -295,18 +225,14 @@ fn read_all_records(file: &mut File) -> io::Result> { } impl KvpStore for HyperVKvpStore { - fn limits(&self) -> KvpLimits { - self.limits - } + const MAX_KEY_SIZE: usize = HV_KVP_EXCHANGE_MAX_KEY_SIZE; + const MAX_VALUE_SIZE: usize = HV_KVP_EXCHANGE_MAX_VALUE_SIZE; /// Append one fixed-size record to the pool file. /// - /// Validates key and value against the configured [`KvpLimits`], - /// acquires an exclusive flock, writes the record, flushes, and + /// Acquires an exclusive flock, writes the record, flushes, and /// releases the lock. - fn write(&self, key: &str, value: &str) -> io::Result<()> { - self.validate_key_value(key, value)?; - + fn backend_write(&self, key: &str, value: &str) -> Result<(), KvpError> { let mut file = self.open_for_append()?; let record = encode_record(key, value); @@ -322,9 +248,9 @@ impl KvpStore for HyperVKvpStore { if let Err(err) = write_result { let _ = unlock_result; - return Err(err); + return Err(err.into()); } - unlock_result + unlock_result.map_err(KvpError::from) } /// Scan all records and return the value of the last record @@ -332,13 +258,13 @@ impl KvpStore for HyperVKvpStore { /// /// Acquires a shared flock during the scan. Returns `Ok(None)` if /// the pool file does not exist or no record matches. - fn read(&self, key: &str) -> io::Result> { + fn backend_read(&self, key: &str) -> Result, KvpError> { let mut file = match self.open_for_read() { Ok(f) => f, Err(ref e) if e.kind() == ErrorKind::NotFound => { return Ok(None); } - Err(e) => return Err(e), + Err(e) => return Err(e.into()), }; FileExt::lock_shared(&file).map_err(|e| { @@ -358,17 +284,16 @@ impl KvpStore for HyperVKvpStore { /// Return all key-value pairs as a deduplicated `HashMap`. /// - /// Duplicate keys are resolved by last-write-wins, matching - /// [`read`](KvpStore::read) semantics. Acquires a shared flock - /// during the scan. Returns an empty map if the pool file does - /// not exist. - fn entries(&self) -> io::Result> { + /// Duplicate keys are resolved by last-write-wins. Acquires a + /// shared flock during the scan. Returns an empty map if the pool + /// file does not exist. + fn entries(&self) -> Result, KvpError> { let mut file = match self.open_for_read() { Ok(f) => f, Err(ref e) if e.kind() == ErrorKind::NotFound => { return Ok(HashMap::new()); } - Err(e) => return Err(e), + Err(e) => return Err(e.into()), }; FileExt::lock_shared(&file).map_err(|e| { @@ -386,25 +311,47 @@ impl KvpStore for HyperVKvpStore { Ok(map) } + /// Return all raw records without deduplication. + /// + /// Acquires a shared flock during the scan. Returns an empty list + /// if the pool file does not exist. + fn entries_raw(&self) -> Result, KvpError> { + let mut file = match self.open_for_read() { + Ok(f) => f, + Err(ref e) if e.kind() == ErrorKind::NotFound => { + return Ok(Vec::new()); + } + Err(e) => return Err(e.into()), + }; + + FileExt::lock_shared(&file).map_err(|e| { + io::Error::other(format!("failed to lock KVP file: {e}")) + })?; + + let records = read_all_records(&mut file); + let _ = FileExt::unlock(&file); + Ok(records?) + } + /// Rewrite the pool file without the record(s) matching `key`. /// /// Acquires an exclusive flock for the duration. Returns `true` if /// at least one record was removed, `false` if the key was not /// found. Returns `Ok(false)` if the pool file does not exist. - fn delete(&self, key: &str) -> io::Result { + fn delete(&self, key: &str) -> Result { let mut file = match self.open_for_read_write() { Ok(f) => f, Err(ref e) if e.kind() == ErrorKind::NotFound => { return Ok(false); } - Err(e) => return Err(e), + Err(e) => return Err(e.into()), }; FileExt::lock_exclusive(&file).map_err(|e| { io::Error::other(format!("failed to lock KVP file: {e}")) })?; - let result = (|| -> io::Result { + let result = (|| -> Result { let records = read_all_records(&mut file)?; let original_count = records.len(); let kept: Vec<_> = @@ -427,6 +374,16 @@ impl KvpStore for HyperVKvpStore { let _ = FileExt::unlock(&file); result } + + /// Truncate the pool file to zero length, removing all records. + fn backend_clear(&self) -> Result<(), KvpError> { + self.truncate_pool() + } + + /// Whether the pool file's mtime predates the current boot. + fn is_stale(&self) -> Result { + self.pool_is_stale() + } } fn get_uptime() -> Duration { @@ -442,39 +399,7 @@ mod tests { use tempfile::NamedTempFile; fn hyperv_store(path: &Path) -> HyperVKvpStore { - HyperVKvpStore::new(path, KvpLimits::hyperv()) - } - - fn azure_store(path: &Path) -> HyperVKvpStore { - HyperVKvpStore::new(path, KvpLimits::azure()) - } - - #[test] - fn test_autodetect_uses_azure_limits_on_azure_host() { - let tag_file = NamedTempFile::new().unwrap(); - let pool_file = NamedTempFile::new().unwrap(); - // DMI files on Linux have a trailing newline. - std::fs::write(tag_file.path(), format!("{AZURE_CHASSIS_ASSET_TAG}\n")) - .unwrap(); - - let store = HyperVKvpStore::new_autodetect_with_tag_path( - pool_file.path(), - tag_file.path().to_str().unwrap(), - ); - assert_eq!(store.limits(), KvpLimits::azure()); - } - - #[test] - fn test_autodetect_uses_hyperv_limits_on_bare_hyperv() { - let tag_file = NamedTempFile::new().unwrap(); - let pool_file = NamedTempFile::new().unwrap(); - std::fs::write(tag_file.path(), "bare-hyperv-tag").unwrap(); - - let store = HyperVKvpStore::new_autodetect_with_tag_path( - pool_file.path(), - tag_file.path().to_str().unwrap(), - ); - assert_eq!(store.limits(), KvpLimits::hyperv()); + HyperVKvpStore::new(path, false).unwrap() } #[test] @@ -498,47 +423,46 @@ mod tests { let err = store.write("", "value").unwrap_err(); assert!( - err.to_string().contains("empty"), - "expected empty-key error, got: {err}" + matches!(err, KvpError::EmptyKey), + "expected EmptyKey, got: {err}" ); } #[test] - fn test_write_rejects_oversized_key() { + fn test_write_rejects_null_in_key() { let tmp = NamedTempFile::new().unwrap(); let store = hyperv_store(tmp.path()); - let key = "K".repeat(HV_KVP_EXCHANGE_MAX_KEY_SIZE + 1); - let err = store.write(&key, "v").unwrap_err(); + let err = store.write("bad\0key", "value").unwrap_err(); assert!( - err.to_string().contains("key length"), - "expected key-length error, got: {err}" + matches!(err, KvpError::KeyContainsNull), + "expected KeyContainsNull, got: {err}" ); } #[test] - fn test_write_rejects_oversized_value_hyperv() { + fn test_write_rejects_oversized_key() { let tmp = NamedTempFile::new().unwrap(); let store = hyperv_store(tmp.path()); - let value = "V".repeat(HV_KVP_EXCHANGE_MAX_VALUE_SIZE + 1); - let err = store.write("k", &value).unwrap_err(); + let key = "K".repeat(HV_KVP_EXCHANGE_MAX_KEY_SIZE + 1); + let err = store.write(&key, "v").unwrap_err(); assert!( - err.to_string().contains("value length"), - "expected value-length error, got: {err}" + matches!(err, KvpError::KeyTooLarge { .. }), + "expected KeyTooLarge, got: {err}" ); } #[test] - fn test_azure_limits_reject_long_value() { + fn test_write_rejects_oversized_value_hyperv() { let tmp = NamedTempFile::new().unwrap(); - let store = azure_store(tmp.path()); + let store = hyperv_store(tmp.path()); - let value = "V".repeat(1023); + let value = "V".repeat(HV_KVP_EXCHANGE_MAX_VALUE_SIZE + 1); let err = store.write("k", &value).unwrap_err(); assert!( - err.to_string().contains("value length"), - "expected value-length error with azure limits, got: {err}" + matches!(err, KvpError::ValueTooLarge { .. }), + "expected ValueTooLarge, got: {err}" ); } @@ -583,47 +507,71 @@ mod tests { } #[test] - fn test_truncate_if_stale_at_boot_truncates_when_file_is_older_than_boot() { + fn test_entries_raw_preserves_duplicates() { + let tmp = NamedTempFile::new().unwrap(); + let store = hyperv_store(tmp.path()); + + store.write("key", "v1").unwrap(); + store.write("key", "v2").unwrap(); + store.write("other", "v3").unwrap(); + + let raw = store.entries_raw().unwrap(); + assert_eq!(raw.len(), 3); + assert_eq!(raw[0], ("key".to_string(), "v1".to_string())); + assert_eq!(raw[1], ("key".to_string(), "v2".to_string())); + assert_eq!(raw[2], ("other".to_string(), "v3".to_string())); + } + + #[test] + fn test_clear_empties_store() { let tmp = NamedTempFile::new().unwrap(); let store = hyperv_store(tmp.path()); store.write("key", "value").unwrap(); assert!(tmp.path().metadata().unwrap().len() > 0); - store.truncate_if_stale_at_boot(i64::MAX).unwrap(); + store.clear().unwrap(); assert_eq!(tmp.path().metadata().unwrap().len(), 0); + assert_eq!(store.read("key").unwrap(), None); } #[test] - fn test_truncate_if_stale_at_boot_keeps_file_when_newer_than_boot() { + fn test_is_stale_false_for_fresh_file() { let tmp = NamedTempFile::new().unwrap(); let store = hyperv_store(tmp.path()); store.write("key", "value").unwrap(); - let len_before = tmp.path().metadata().unwrap().len(); + assert!(!store.is_stale().unwrap()); + } - store.truncate_if_stale_at_boot(0).unwrap(); - assert_eq!(tmp.path().metadata().unwrap().len(), len_before); + #[test] + fn test_is_stale_false_when_file_missing() { + let store = hyperv_store(Path::new("/tmp/nonexistent-kvp-pool")); + assert!(!store.is_stale().unwrap()); } #[test] - fn test_truncate_if_stale_keeps_fresh_file() { + fn test_pool_is_stale_at_boot_detects_old_file() { let tmp = NamedTempFile::new().unwrap(); let store = hyperv_store(tmp.path()); store.write("key", "value").unwrap(); - let len_before = tmp.path().metadata().unwrap().len(); - assert!(len_before > 0); + assert!(store.pool_is_stale_at_boot(i64::MAX).unwrap()); + } + + #[test] + fn test_pool_is_stale_at_boot_keeps_new_file() { + let tmp = NamedTempFile::new().unwrap(); + let store = hyperv_store(tmp.path()); - store.truncate_if_stale().unwrap(); - assert_eq!(tmp.path().metadata().unwrap().len(), len_before); - assert_eq!(store.read("key").unwrap(), Some("value".to_string())); + store.write("key", "value").unwrap(); + assert!(!store.pool_is_stale_at_boot(0).unwrap()); } #[test] - fn test_truncate_if_stale_ok_when_file_missing() { + fn test_clear_ok_when_file_missing() { let store = hyperv_store(Path::new("/tmp/nonexistent-kvp-pool")); - store.truncate_if_stale().unwrap(); + store.clear().unwrap(); } #[test] @@ -655,7 +603,7 @@ mod tests { .map(|tid| { let p = path.clone(); std::thread::spawn(move || { - let store = HyperVKvpStore::new(&p, KvpLimits::hyperv()); + let store = HyperVKvpStore::new(&p, false).unwrap(); for i in 0..iterations { let key = format!("thread-{tid}-iter-{i}"); let value = format!("value-{tid}-{i}"); @@ -669,7 +617,7 @@ mod tests { h.join().expect("thread panicked"); } - let store = HyperVKvpStore::new(&path, KvpLimits::hyperv()); + let store = HyperVKvpStore::new(&path, false).unwrap(); let entries = store.entries().unwrap(); assert_eq!(entries.len(), num_threads * iterations); } diff --git a/libazureinit-kvp/src/lib.rs b/libazureinit-kvp/src/lib.rs index 89fef04f..a248b8ea 100644 --- a/libazureinit-kvp/src/lib.rs +++ b/libazureinit-kvp/src/lib.rs @@ -5,92 +5,171 @@ //! implementation for KVP pool files. //! //! - [`KvpStore`]: storage interface used by higher layers. -//! - [`HyperVKvpStore`]: production implementation. -//! - [`KvpLimits`]: exported Hyper-V and Azure byte limits. +//! - [`HyperVKvpStore`]: Hyper-V pool file implementation. +//! - [`AzureKvpStore`]: Azure-specific wrapper with stricter value limits. use std::collections::HashMap; +use std::fmt; use std::io; +pub mod azure; pub mod hyperv; +/// Errors returned by [`KvpStore`] operations. +#[derive(Debug)] +pub enum KvpError { + /// The key was empty. + EmptyKey, + /// The key exceeds the store's maximum key size. + KeyTooLarge { max: usize, actual: usize }, + /// The value exceeds the store's maximum value size. + ValueTooLarge { max: usize, actual: usize }, + /// The key contains a null byte, which is incompatible with the + /// on-disk format (null-padded fixed-width fields). + KeyContainsNull, + /// An underlying I/O error. + Io(io::Error), +} + +impl fmt::Display for KvpError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::EmptyKey => write!(f, "KVP key must not be empty"), + Self::KeyTooLarge { max, actual } => { + write!(f, "KVP key length ({actual}) exceeds maximum ({max})") + } + Self::ValueTooLarge { max, actual } => { + write!(f, "KVP value length ({actual}) exceeds maximum ({max})") + } + Self::KeyContainsNull => { + write!(f, "KVP key must not contain null bytes") + } + Self::Io(e) => write!(f, "{e}"), + } + } +} + +impl std::error::Error for KvpError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::Io(e) => Some(e), + _ => None, + } + } +} + +impl From for KvpError { + fn from(err: io::Error) -> Self { + Self::Io(err) + } +} + +pub use azure::AzureKvpStore; pub use hyperv::HyperVKvpStore; -/// Hyper-V key limit in bytes (policy/default preset). -pub const HYPERV_MAX_KEY_BYTES: usize = 512; -/// Hyper-V value limit in bytes (policy/default preset). -pub const HYPERV_MAX_VALUE_BYTES: usize = 2048; -/// Azure key limit in bytes (policy/default preset). -pub const AZURE_MAX_KEY_BYTES: usize = 512; -/// Azure value limit in bytes, matching Azure host behavior. -pub const AZURE_MAX_VALUE_BYTES: usize = 1022; - -/// Storage abstraction for KVP backends. -/// -/// Semantics: -/// - `write`: stores one key/value or returns validation/I/O error. -/// - `read`: returns the most recent value for a key (last-write-wins). -/// - `entries`: returns deduplicated key/value pairs as `HashMap`. -/// - `delete`: removes all records for a key and reports whether any were removed. -/// - `limits`: returns the [`KvpLimits`] that govern maximum key/value -/// sizes for this store. +/// Key-value store that supports Hyper-V KVP semantics while being +/// generic enough for non-file-backed in-memory implementations +/// (e.g. tests). pub trait KvpStore: Send + Sync { - /// The key and value byte-size limits for this store. - /// - /// Consumers (e.g. diagnostics, tracing layers) should call this - /// instead of hardcoding size constants, so the limits stay correct - /// regardless of the underlying implementation. - fn limits(&self) -> KvpLimits; + /// Maximum key size in bytes for this store. + const MAX_KEY_SIZE: usize; + /// Maximum value size in bytes for this store. + const MAX_VALUE_SIZE: usize; - /// Write a key-value pair into the store. - /// - /// Returns an error if: - /// - The key is empty. - /// - The key exceeds the configured maximum key size. - /// - The value exceeds the configured maximum value size. - /// - An I/O error occurs during the write. - fn write(&self, key: &str, value: &str) -> io::Result<()>; + // -- Backend callouts for read/write (required) --------------------- - /// Read the value for a given key, returning `None` if absent. - /// - /// When multiple records exist for the same key (append-only - /// storage), the value from the most recent record is returned - /// (last-write-wins). - fn read(&self, key: &str) -> io::Result>; + /// Backend-specific read implementation. + fn backend_read(&self, key: &str) -> Result, KvpError>; + + /// Backend-specific write implementation. + fn backend_write(&self, key: &str, value: &str) -> Result<(), KvpError>; + + // -- Required methods (no shared validation wrapper) --------------- + + /// Return all key-value pairs, deduplicated with last-write-wins. + fn entries(&self) -> Result, KvpError>; - /// Return all key-value pairs currently in the store. + /// Return all raw key-value records without deduplication. /// - /// Keys are deduplicated using last-write-wins semantics, matching - /// the behavior of [`read`](KvpStore::read). - fn entries(&self) -> io::Result>; + /// Useful for testing or diagnostic dump commands where the full + /// record history is needed. + fn entries_raw(&self) -> Result, KvpError>; /// Remove all records matching `key`. /// /// Returns `true` if at least one record was removed, `false` if /// the key was not found. - fn delete(&self, key: &str) -> io::Result; -} + fn delete(&self, key: &str) -> Result; -/// Key/value byte limits for write validation. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub struct KvpLimits { - pub max_key_size: usize, - pub max_value_size: usize, -} + /// Backend-specific clear implementation (empty the store). + fn backend_clear(&self) -> Result<(), KvpError>; + + // -- Public API with shared validation ---------------------------- + + /// Empty the store, removing all records. + fn clear(&self) -> Result<(), KvpError> { + self.backend_clear() + } + + /// Write a key-value pair into the store. + /// + /// # Errors + /// + /// Returns [`KvpError::EmptyKey`] if the key is empty, + /// [`KvpError::KeyContainsNull`] if the key contains a null byte, + /// [`KvpError::KeyTooLarge`] if the key exceeds [`Self::MAX_KEY_SIZE`], + /// [`KvpError::ValueTooLarge`] if the value exceeds + /// [`Self::MAX_VALUE_SIZE`], or [`KvpError::Io`] on I/O failure. + fn write(&self, key: &str, value: &str) -> Result<(), KvpError> { + Self::validate_key(key)?; + Self::validate_value(value)?; + self.backend_write(key, value) + } -impl KvpLimits { - /// Hyper-V limits (512-byte key, 2,048-byte value). - pub const fn hyperv() -> Self { - Self { - max_key_size: HYPERV_MAX_KEY_BYTES, - max_value_size: HYPERV_MAX_VALUE_BYTES, + /// Read the value for a given key, returning `None` if absent. + /// + /// When multiple records share the same key, the most recent value + /// is returned (last-write-wins). + fn read(&self, key: &str) -> Result, KvpError> { + Self::validate_key(key)?; + self.backend_read(key) + } + + /// Whether the store's data is stale (e.g. predates the current + /// boot). Defaults to `false`; file-backed stores can override. + fn is_stale(&self) -> Result { + Ok(false) + } + + // -- Validation helpers ------------------------------------------- + + /// Validate a key against common constraints. + /// + /// Keys must be non-empty, must not contain null bytes (the on-disk + /// format uses null-padding), and must not exceed + /// [`Self::MAX_KEY_SIZE`] bytes. + fn validate_key(key: &str) -> Result<(), KvpError> { + if key.is_empty() { + return Err(KvpError::EmptyKey); + } + if key.as_bytes().contains(&0) { + return Err(KvpError::KeyContainsNull); + } + let actual = key.len(); + let max = Self::MAX_KEY_SIZE; + if actual > max { + return Err(KvpError::KeyTooLarge { max, actual }); } + Ok(()) } - /// Azure limits (512-byte key, 1,022-byte value). - pub const fn azure() -> Self { - Self { - max_key_size: AZURE_MAX_KEY_BYTES, - max_value_size: AZURE_MAX_VALUE_BYTES, + /// Validate a value against the store's size limit. + fn validate_value(value: &str) -> Result<(), KvpError> { + let actual = value.len(); + let max = Self::MAX_VALUE_SIZE; + if actual > max { + return Err(KvpError::ValueTooLarge { max, actual }); } + Ok(()) } }