diff --git a/Cargo.toml b/Cargo.toml index 31693d8e..83d7e630 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ path = "tests/functional_tests.rs" [workspace] members = [ "libazureinit", + "libazureinit-kvp", ] [features] diff --git a/doc/libazurekvp.md b/doc/libazurekvp.md deleted file mode 100644 index 476bdcb1..00000000 --- a/doc/libazurekvp.md +++ /dev/null @@ -1,126 +0,0 @@ -# Azure-init Tracing System - -## Overview - -Azure-init implements a comprehensive tracing system that captures detailed information about the provisioning process. -This information is crucial for monitoring, debugging, and troubleshooting VM provisioning issues in Azure environments. -The tracing system is built on a multi-layered architecture that provides flexibility and robustness. - -## Architecture - -The tracing architecture consists of four specialized layers, each handling a specific aspect of the tracing process: - -### 1. EmitKVPLayer - -**Purpose**: Processes spans and events by capturing metadata, generating key-value pairs (KVPs), and writing to Hyper-V's data exchange file. - -**Key Functions**: -- Captures span lifecycle events (creation, entry, exit, closing) -- Processes emitted events within spans -- Formats data as KVPs for Hyper-V consumption -- Writes encoded data to `/var/lib/hyperv/.kvp_pool_1` - -Additionally, events emitted with a `health_report` field are written as special provisioning reports using the key `PROVISIONING_REPORT`. - -**Integration with Azure**: -- The `/var/lib/hyperv/.kvp_pool_1` file is monitored by the Hyper-V `hv_kvp_daemon` service -- This enables key metrics and logs to be transferred from the VM to the Azure platform -- Administrators can access this data through the Azure portal or API - -### 2. OpenTelemetryLayer - -**Purpose**: Propagates tracing context and prepares span data for export. 
- -**Key Functions**: -- Maintains distributed tracing context across service boundaries -- Exports standardized trace data to compatible backends -- Enables integration with broader monitoring ecosystems - -### 3. Stderr Layer - -**Purpose**: Formats and logs trace data to stderr. - -**Key Functions**: -- Provides human-readable logging for immediate inspection -- Supports debugging during development -- Captures trace events even when other layers might fail - -### 4. File Layer - -**Purpose**: Writes formatted logs to a file (default path: `/var/log/azure-init.log`). - -**Key Functions**: -- Provides a persistent log for post-provisioning inspection -- Uses file permissions `0600` when possible -- Log level controlled by `AZURE_INIT_LOG` (defaults to `info` for the file layer) - -## How the Layers Work Together - -Despite operating independently, these layers collaborate to provide comprehensive tracing: - -1. **Independent Processing**: Each layer processes spans and events without dependencies on other layers -2. **Ordered Execution**: Layers are executed in the order they are registered in `setup_layers` (stderr, OpenTelemetry, KVP if enabled, file if available) -3. **Complementary Functions**: Each layer serves a specific purpose in the tracing ecosystem: - - `EmitKVPLayer` focuses on Azure Hyper-V integration - - `OpenTelemetryLayer` handles standardized tracing and exports - - `Stderr Layer` provides immediate visibility for debugging - -### Configuration - -The tracing system's behavior is controlled through configuration files and environment variables, allowing more control over what data is captured and where it's sent: - -- `telemetry.kvp_diagnostics` (config): Enables/disables KVP emission. Default: `true`. -- `telemetry.kvp_filter` (config): Optional `EnvFilter`-style directives to select which spans/events go to KVP. -- `azure_init_log_path.path` (config): Target path for the file layer. Default: `/var/log/azure-init.log`. 
-- `AZURE_INIT_KVP_FILTER` (env): Overrides `telemetry.kvp_filter`. Precedence: env > config > default. -- `AZURE_INIT_LOG` (env): Controls stderr and file fmt layers’ levels (defaults: stderr=`error`, file=`info`). - -The KVP layer uses a conservative default filter aimed at essential provisioning signals; adjust that via the settings above as needed. -For more on how to use these configuration variables, see the [configuration documentation](./configuration.md#complete-configuration-example). - -## Practical Usage - -### Instrumenting Functions - -To instrument code with tracing, use the `#[instrument]` attribute on functions: - -```rust -use tracing::{instrument, Level, event}; - -#[instrument(fields(user_id = ?user.id))] -async fn provision_user(user: User) -> Result<(), Error> { - event!(Level::INFO, "Starting user provisioning"); - - // Function logic - - event!(Level::INFO, "User provisioning completed successfully"); - Ok(()) -} -``` - -### Emitting Events - -To record specific points within a span: - -```rust -use tracing::{event, Level}; - -fn configure_ssh_keys(user: &str, keys: &[String]) { - event!(Level::INFO, user = user, key_count = keys.len(), "Configuring SSH keys"); - - for (i, key) in keys.iter().enumerate() { - event!(Level::DEBUG, user = user, key_index = i, "Processing SSH key"); - // Process each key - } - - event!(Level::INFO, user = user, "SSH keys configured successfully"); -} -``` - -## Reference Documentation - -For more details on how the Hyper-V Data Exchange Service works, refer to the official documentation: -[Hyper-V Data Exchange Service (KVP)](https://learn.microsoft.com/en-us/virtualization/hyper-v-on-windows/reference/integration-services#hyper-v-data-exchange-service-kvp) - -For OpenTelemetry integration details: -[OpenTelemetry for Rust](https://opentelemetry.io/docs/instrumentation/rust/) \ No newline at end of file diff --git a/libazureinit-kvp/Cargo.toml b/libazureinit-kvp/Cargo.toml new file mode 100644 index 
00000000..6d5a8106
--- /dev/null
+++ b/libazureinit-kvp/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "libazureinit-kvp"
+version = "0.1.0"
+edition = "2021"
+rust-version = "1.88"
+repository = "https://github.com/Azure/azure-init/"
+homepage = "https://github.com/Azure/azure-init/"
+license = "MIT"
+description = "Hyper-V KVP (Key-Value Pair) storage library for azure-init."
+
+[dependencies]
+fs2 = "0.4"
+sysinfo = "0.38"
+
+[dev-dependencies]
+tempfile = "3"
+
+[lib]
+name = "libazureinit_kvp"
+path = "src/lib.rs"
diff --git a/libazureinit-kvp/src/azure.rs b/libazureinit-kvp/src/azure.rs
new file mode 100644
index 00000000..7c35c9c9
--- /dev/null
+++ b/libazureinit-kvp/src/azure.rs
@@ -0,0 +1,134 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+//! Azure-specific KVP store.
+//!
+//! Wraps [`HyperVKvpStore`] with the stricter value-size limit imposed
+//! by the Azure host (1,022 bytes). All other behavior — record
+//! format, file locking, append-only writes — is inherited from the
+//! underlying Hyper-V pool file implementation.
+
+use std::collections::HashMap;
+use std::path::{Path, PathBuf};
+
+use crate::hyperv::HyperVKvpStore;
+use crate::{KvpError, KvpStore};
+
+/// Azure host-side value limit (values beyond this are truncated).
+const AZURE_MAX_VALUE_BYTES: usize = 1022;
+
+/// Azure KVP store backed by a Hyper-V pool file.
+///
+/// Identical to [`HyperVKvpStore`] except that
+/// [`MAX_VALUE_SIZE`](KvpStore::MAX_VALUE_SIZE) is set to 1,022 bytes,
+/// matching the Azure host's truncation behavior.
+#[derive(Clone, Debug)]
+pub struct AzureKvpStore {
+    inner: HyperVKvpStore,
+}
+
+impl AzureKvpStore {
+    /// Create a new Azure KVP store backed by the pool file at `path`.
+    ///
+    /// When `truncate_on_stale` is `true` the constructor checks
+    /// whether the pool file predates the current boot and, if so,
+    /// truncates it before returning.
+    pub fn new(
+        path: impl Into<PathBuf>,
+        truncate_on_stale: bool,
+    ) -> Result<Self, KvpError> {
+        Ok(Self {
+            inner: HyperVKvpStore::new(path, truncate_on_stale)?,
+        })
+    }
+
+    /// Return a reference to the pool file path.
+    pub fn path(&self) -> &Path {
+        self.inner.path()
+    }
+}
+
+impl KvpStore for AzureKvpStore {
+    const MAX_KEY_SIZE: usize = HyperVKvpStore::MAX_KEY_SIZE;
+    const MAX_VALUE_SIZE: usize = AZURE_MAX_VALUE_BYTES;
+
+    fn backend_read(&self, key: &str) -> Result<Option<String>, KvpError> {
+        self.inner.backend_read(key)
+    }
+
+    fn backend_write(&self, key: &str, value: &str) -> Result<(), KvpError> {
+        self.inner.backend_write(key, value)
+    }
+
+    fn entries(&self) -> Result<HashMap<String, String>, KvpError> {
+        self.inner.entries()
+    }
+
+    fn entries_raw(&self) -> Result<Vec<(String, String)>, KvpError> {
+        self.inner.entries_raw()
+    }
+
+    fn delete(&self, key: &str) -> Result<bool, KvpError> {
+        self.inner.delete(key)
+    }
+
+    fn backend_clear(&self) -> Result<(), KvpError> {
+        self.inner.backend_clear()
+    }
+
+    fn is_stale(&self) -> Result<bool, KvpError> {
+        self.inner.is_stale()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use tempfile::NamedTempFile;
+
+    fn azure_store(path: &Path) -> AzureKvpStore {
+        AzureKvpStore::new(path, false).unwrap()
+    }
+
+    #[test]
+    fn test_azure_rejects_value_over_1022() {
+        let tmp = NamedTempFile::new().unwrap();
+        let store = azure_store(tmp.path());
+
+        let value = "V".repeat(AZURE_MAX_VALUE_BYTES + 1);
+        let err = store.write("k", &value).unwrap_err();
+        assert!(
+            matches!(err, KvpError::ValueTooLarge { ..
+            }),
+            "expected ValueTooLarge, got: {err}"
+        );
+    }
+
+    #[test]
+    fn test_azure_accepts_value_at_1022() {
+        let tmp = NamedTempFile::new().unwrap();
+        let store = azure_store(tmp.path());
+
+        let value = "V".repeat(AZURE_MAX_VALUE_BYTES);
+        store.write("k", &value).unwrap();
+        assert_eq!(store.read("k").unwrap(), Some(value));
+    }
+
+    #[test]
+    fn test_azure_write_and_read() {
+        let tmp = NamedTempFile::new().unwrap();
+        let store = azure_store(tmp.path());
+
+        store.write("key", "value").unwrap();
+        assert_eq!(store.read("key").unwrap(), Some("value".to_string()));
+    }
+
+    #[test]
+    fn test_azure_clear() {
+        let tmp = NamedTempFile::new().unwrap();
+        let store = azure_store(tmp.path());
+
+        store.write("key", "value").unwrap();
+        store.clear().unwrap();
+        assert_eq!(store.read("key").unwrap(), None);
+    }
+}
diff --git a/libazureinit-kvp/src/hyperv.rs b/libazureinit-kvp/src/hyperv.rs
new file mode 100644
index 00000000..f2b359f5
--- /dev/null
+++ b/libazureinit-kvp/src/hyperv.rs
@@ -0,0 +1,624 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+//! Hyper-V KVP (Key-Value Pair) pool file backend.
+//!
+//! Hyper-V exposes a Data Exchange Service that lets a guest and its
+//! host share key-value pairs through a set of pool files. Each pool
+//! file is a flat sequence of fixed-size records (512-byte key +
+//! 2,048-byte value, zero-padded). There is no record-count header;
+//! the file grows by one record per write.
+//!
+//! On Azure, the host-side KVP consumer truncates values beyond
+//! 1,022 bytes, so Azure guests should use [`AzureKvpStore`](crate::AzureKvpStore).
+//!
+//! ## Reference
+//!
- [Hyper-V Data Exchange Service (KVP)](https://learn.microsoft.com/en-us/virtualization/hyper-v-on-windows/reference/integration-services#hyper-v-data-exchange-service-kvp)
+
+use std::collections::HashMap;
+use std::fs::{File, OpenOptions};
+use std::io::{self, ErrorKind, Read, Seek, Write};
+use std::os::unix::fs::MetadataExt;
+use std::path::{Path, PathBuf};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+
+use fs2::FileExt;
+use sysinfo::System;
+
+use crate::{KvpError, KvpStore};
+
+/// Key field width in the on-disk record format (bytes).
+const HV_KVP_EXCHANGE_MAX_KEY_SIZE: usize = 512;
+
+/// Value field width in the on-disk record format (bytes).
+const HV_KVP_EXCHANGE_MAX_VALUE_SIZE: usize = 2048;
+
+/// Total size of one on-disk record (key + value).
+const RECORD_SIZE: usize =
+    HV_KVP_EXCHANGE_MAX_KEY_SIZE + HV_KVP_EXCHANGE_MAX_VALUE_SIZE;
+
+/// Hyper-V KVP pool file store.
+///
+/// Reads and writes the binary Hyper-V KVP format: fixed-size records
+/// of [`RECORD_SIZE`] bytes (512-byte key + 2,048-byte value) with
+/// flock-based concurrency control.
+#[derive(Clone, Debug)]
+pub struct HyperVKvpStore {
+    path: PathBuf,
+}
+
+impl HyperVKvpStore {
+    /// Create a new store backed by the pool file at `path`.
+    ///
+    /// When `truncate_on_stale` is `true` the constructor checks
+    /// whether the pool file predates the current boot and, if so,
+    /// truncates it before returning.
+    pub fn new(
+        path: impl Into<PathBuf>,
+        truncate_on_stale: bool,
+    ) -> Result<Self, KvpError> {
+        let store = Self { path: path.into() };
+        if truncate_on_stale && store.pool_is_stale()? {
+            store.truncate_pool()?;
+        }
+        Ok(store)
+    }
+
+    /// Return a reference to the pool file path.
+    pub fn path(&self) -> &Path {
+        &self.path
+    }
+
+    // -- Private helpers ------------------------------------------------
+
+    // NOTE(review): return type reconstructed as io::Result<i64> — the
+    // map_err below produces io::Error and the sole caller uses `?`
+    // with `From<io::Error> for KvpError`; confirm against upstream.
+    fn boot_time() -> io::Result<i64> {
+        let now = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .map_err(|e| io::Error::other(format!("clock error: {e}")))?
+            .as_secs();
+        Ok(now.saturating_sub(get_uptime().as_secs()) as i64)
+    }
+
+    /// Check whether the pool file's mtime predates the current boot.
+    ///
+    /// Returns `false` if the file does not exist.
+    fn pool_is_stale(&self) -> Result<bool, KvpError> {
+        let metadata = match std::fs::metadata(&self.path) {
+            Ok(m) => m,
+            Err(ref e) if e.kind() == ErrorKind::NotFound => {
+                return Ok(false);
+            }
+            Err(e) => return Err(e.into()),
+        };
+        let boot = Self::boot_time()?;
+        Ok(metadata.mtime() < boot)
+    }
+
+    #[cfg(test)]
+    fn pool_is_stale_at_boot(&self, boot_time: i64) -> Result<bool, KvpError> {
+        let metadata = match std::fs::metadata(&self.path) {
+            Ok(m) => m,
+            Err(ref e) if e.kind() == ErrorKind::NotFound => {
+                return Ok(false);
+            }
+            Err(e) => return Err(e.into()),
+        };
+        Ok(metadata.mtime() < boot_time)
+    }
+
+    /// Truncate the pool file to zero length under an exclusive flock.
+    fn truncate_pool(&self) -> Result<(), KvpError> {
+        let file =
+            match OpenOptions::new().read(true).write(true).open(&self.path) {
+                Ok(f) => f,
+                Err(ref e) if e.kind() == ErrorKind::NotFound => {
+                    return Ok(());
+                }
+                Err(e) => return Err(e.into()),
+            };
+
+        FileExt::lock_exclusive(&file).map_err(|e| {
+            io::Error::other(format!("failed to lock KVP file: {e}"))
+        })?;
+
+        let result = file.set_len(0).map_err(KvpError::from);
+
+        let _ = FileExt::unlock(&file);
+        result
+    }
+
+    fn open_for_append(&self) -> io::Result<File> {
+        OpenOptions::new()
+            .append(true)
+            .create(true)
+            .open(&self.path)
+    }
+
+    fn open_for_read(&self) -> io::Result<File> {
+        OpenOptions::new().read(true).open(&self.path)
+    }
+
+    fn open_for_read_write(&self) -> io::Result<File> {
+        OpenOptions::new().read(true).write(true).open(&self.path)
+    }
+}
+
+/// Encode a key-value pair into a single fixed-size record.
+///
+/// The key is zero-padded to [`HV_KVP_EXCHANGE_MAX_KEY_SIZE`] bytes
+/// and the value is zero-padded to [`HV_KVP_EXCHANGE_MAX_VALUE_SIZE`]
+/// bytes.
The caller is responsible for ensuring the key and value do
+/// not exceed the on-disk field widths; if they do, only the first N
+/// bytes are written (no error is raised at this level -- validation
+/// happens in [`HyperVKvpStore::write`]).
+pub(crate) fn encode_record(key: &str, value: &str) -> Vec<u8> {
+    let mut buf = vec![0u8; RECORD_SIZE];
+
+    let key_bytes = key.as_bytes();
+    let key_len = key_bytes.len().min(HV_KVP_EXCHANGE_MAX_KEY_SIZE);
+    buf[..key_len].copy_from_slice(&key_bytes[..key_len]);
+
+    let val_bytes = value.as_bytes();
+    let val_len = val_bytes.len().min(HV_KVP_EXCHANGE_MAX_VALUE_SIZE);
+    buf[HV_KVP_EXCHANGE_MAX_KEY_SIZE..HV_KVP_EXCHANGE_MAX_KEY_SIZE + val_len]
+        .copy_from_slice(&val_bytes[..val_len]);
+
+    buf
+}
+
+/// Decode a fixed-size record into its key and value strings.
+///
+/// Trailing null bytes are stripped from both fields. Returns an error
+/// if `data` is not exactly [`RECORD_SIZE`] bytes or if either field
+/// contains invalid UTF-8.
+pub(crate) fn decode_record(data: &[u8]) -> io::Result<(String, String)> {
+    if data.len() != RECORD_SIZE {
+        return Err(io::Error::other(format!(
+            "record size mismatch: expected {RECORD_SIZE}, got {}",
+            data.len()
+        )));
+    }
+
+    let (key_bytes, value_bytes) = data.split_at(HV_KVP_EXCHANGE_MAX_KEY_SIZE);
+
+    let key = std::str::from_utf8(key_bytes)
+        .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?
+        .trim_end_matches('\0')
+        .to_string();
+
+    let value = std::str::from_utf8(value_bytes)
+        .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?
+        .trim_end_matches('\0')
+        .to_string();
+
+    Ok((key, value))
+}
+
+/// Read all records from a file that is already open and locked.
+fn read_all_records(file: &mut File) -> io::Result<Vec<(String, String)>> {
+    let metadata = file.metadata()?;
+    let len = metadata.len() as usize;
+
+    if len == 0 {
+        return Ok(Vec::new());
+    }
+
+    if !len.is_multiple_of(RECORD_SIZE) {
+        return Err(io::Error::other(format!(
+            "file size ({len}) is not a multiple of record size ({RECORD_SIZE})"
+        )));
+    }
+
+    // Ensure we start reading from the beginning of the file.
+    file.seek(io::SeekFrom::Start(0))?;
+
+    let record_count = len / RECORD_SIZE;
+    let mut records = Vec::with_capacity(record_count);
+    let mut buf = [0u8; RECORD_SIZE];
+
+    for _ in 0..record_count {
+        file.read_exact(&mut buf)?;
+        records.push(decode_record(&buf)?);
+    }
+
+    Ok(records)
+}
+
+impl KvpStore for HyperVKvpStore {
+    const MAX_KEY_SIZE: usize = HV_KVP_EXCHANGE_MAX_KEY_SIZE;
+    const MAX_VALUE_SIZE: usize = HV_KVP_EXCHANGE_MAX_VALUE_SIZE;
+
+    /// Append one fixed-size record to the pool file.
+    ///
+    /// Acquires an exclusive flock, writes the record, flushes, and
+    /// releases the lock.
+    fn backend_write(&self, key: &str, value: &str) -> Result<(), KvpError> {
+        let mut file = self.open_for_append()?;
+        let record = encode_record(key, value);
+
+        FileExt::lock_exclusive(&file).map_err(|e| {
+            io::Error::other(format!("failed to lock KVP file: {e}"))
+        })?;
+
+        let write_result = file.write_all(&record).and_then(|_| file.flush());
+
+        let unlock_result = FileExt::unlock(&file).map_err(|e| {
+            io::Error::other(format!("failed to unlock KVP file: {e}"))
+        });
+
+        if let Err(err) = write_result {
+            let _ = unlock_result;
+            return Err(err.into());
+        }
+        unlock_result.map_err(KvpError::from)
+    }
+
+    /// Scan all records and return the value of the last record
+    /// matching `key` (last-write-wins).
+    ///
+    /// Acquires a shared flock during the scan. Returns `Ok(None)` if
+    /// the pool file does not exist or no record matches.
+    fn backend_read(&self, key: &str) -> Result<Option<String>, KvpError> {
+        let mut file = match self.open_for_read() {
+            Ok(f) => f,
+            Err(ref e) if e.kind() == ErrorKind::NotFound => {
+                return Ok(None);
+            }
+            Err(e) => return Err(e.into()),
+        };
+
+        FileExt::lock_shared(&file).map_err(|e| {
+            io::Error::other(format!("failed to lock KVP file: {e}"))
+        })?;
+
+        let records = read_all_records(&mut file);
+        let _ = FileExt::unlock(&file);
+        let records = records?;
+
+        Ok(records
+            .into_iter()
+            .rev()
+            .find(|(k, _)| k == key)
+            .map(|(_, v)| v))
+    }
+
+    /// Return all key-value pairs as a deduplicated `HashMap`.
+    ///
+    /// Duplicate keys are resolved by last-write-wins. Acquires a
+    /// shared flock during the scan. Returns an empty map if the pool
+    /// file does not exist.
+    fn entries(&self) -> Result<HashMap<String, String>, KvpError> {
+        let mut file = match self.open_for_read() {
+            Ok(f) => f,
+            Err(ref e) if e.kind() == ErrorKind::NotFound => {
+                return Ok(HashMap::new());
+            }
+            Err(e) => return Err(e.into()),
+        };
+
+        FileExt::lock_shared(&file).map_err(|e| {
+            io::Error::other(format!("failed to lock KVP file: {e}"))
+        })?;
+
+        let records = read_all_records(&mut file);
+        let _ = FileExt::unlock(&file);
+        let records = records?;
+
+        let mut map = HashMap::new();
+        for (k, v) in records {
+            map.insert(k, v);
+        }
+        Ok(map)
+    }
+
+    /// Return all raw records without deduplication.
+    ///
+    /// Acquires a shared flock during the scan. Returns an empty list
+    /// if the pool file does not exist.
+    fn entries_raw(&self) -> Result<Vec<(String, String)>, KvpError> {
+        let mut file = match self.open_for_read() {
+            Ok(f) => f,
+            Err(ref e) if e.kind() == ErrorKind::NotFound => {
+                return Ok(Vec::new());
+            }
+            Err(e) => return Err(e.into()),
+        };
+
+        FileExt::lock_shared(&file).map_err(|e| {
+            io::Error::other(format!("failed to lock KVP file: {e}"))
+        })?;
+
+        let records = read_all_records(&mut file);
+        let _ = FileExt::unlock(&file);
+        Ok(records?)
+    }
+
+    /// Rewrite the pool file without the record(s) matching `key`.
+    ///
+    /// Acquires an exclusive flock for the duration. Returns `true` if
+    /// at least one record was removed, `false` if the key was not
+    /// found. Returns `Ok(false)` if the pool file does not exist.
+    fn delete(&self, key: &str) -> Result<bool, KvpError> {
+        let mut file = match self.open_for_read_write() {
+            Ok(f) => f,
+            Err(ref e) if e.kind() == ErrorKind::NotFound => {
+                return Ok(false);
+            }
+            Err(e) => return Err(e.into()),
+        };
+
+        FileExt::lock_exclusive(&file).map_err(|e| {
+            io::Error::other(format!("failed to lock KVP file: {e}"))
+        })?;
+
+        let result = (|| -> Result<bool, KvpError> {
+            let records = read_all_records(&mut file)?;
+            let original_count = records.len();
+            let kept: Vec<_> =
+                records.into_iter().filter(|(k, _)| k != key).collect();
+
+            if kept.len() == original_count {
+                return Ok(false);
+            }
+
+            file.set_len(0)?;
+            file.seek(io::SeekFrom::Start(0))?;
+
+            for (k, v) in &kept {
+                file.write_all(&encode_record(k, v))?;
+            }
+            file.flush()?;
+            Ok(true)
+        })();
+
+        let _ = FileExt::unlock(&file);
+        result
+    }
+
+    /// Truncate the pool file to zero length, removing all records.
+    fn backend_clear(&self) -> Result<(), KvpError> {
+        self.truncate_pool()
+    }
+
+    /// Whether the pool file's mtime predates the current boot.
+    fn is_stale(&self) -> Result<bool, KvpError> {
+        self.pool_is_stale()
+    }
+}
+
+fn get_uptime() -> Duration {
+    let mut system = System::new();
+    system.refresh_memory();
+    system.refresh_cpu_usage();
+    Duration::from_secs(System::uptime())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use tempfile::NamedTempFile;
+
+    fn hyperv_store(path: &Path) -> HyperVKvpStore {
+        HyperVKvpStore::new(path, false).unwrap()
+    }
+
+    #[test]
+    fn test_encode_decode_roundtrip() {
+        let key = "test_key";
+        let value = "test_value";
+        let record = encode_record(key, value);
+
+        assert_eq!(record.len(), RECORD_SIZE);
+
+        let (decoded_key, decoded_value) =
+            decode_record(&record).expect("decode failed");
+        assert_eq!(decoded_key, key);
+        assert_eq!(decoded_value, value);
+    }
+
+    #[test]
+    fn test_write_rejects_empty_key() {
+        let tmp = NamedTempFile::new().unwrap();
+        let store = hyperv_store(tmp.path());
+
+        let err = store.write("", "value").unwrap_err();
+        assert!(
+            matches!(err, KvpError::EmptyKey),
+            "expected EmptyKey, got: {err}"
+        );
+    }
+
+    #[test]
+    fn test_write_rejects_null_in_key() {
+        let tmp = NamedTempFile::new().unwrap();
+        let store = hyperv_store(tmp.path());
+
+        let err = store.write("bad\0key", "value").unwrap_err();
+        assert!(
+            matches!(err, KvpError::KeyContainsNull),
+            "expected KeyContainsNull, got: {err}"
+        );
+    }
+
+    #[test]
+    fn test_write_rejects_oversized_key() {
+        let tmp = NamedTempFile::new().unwrap();
+        let store = hyperv_store(tmp.path());
+
+        let key = "K".repeat(HV_KVP_EXCHANGE_MAX_KEY_SIZE + 1);
+        let err = store.write(&key, "v").unwrap_err();
+        assert!(
+            matches!(err, KvpError::KeyTooLarge { .. }),
+            "expected KeyTooLarge, got: {err}"
+        );
+    }
+
+    #[test]
+    fn test_write_rejects_oversized_value_hyperv() {
+        let tmp = NamedTempFile::new().unwrap();
+        let store = hyperv_store(tmp.path());
+
+        let value = "V".repeat(HV_KVP_EXCHANGE_MAX_VALUE_SIZE + 1);
+        let err = store.write("k", &value).unwrap_err();
+        assert!(
+            matches!(err, KvpError::ValueTooLarge { ..
+            }),
+            "expected ValueTooLarge, got: {err}"
+        );
+    }
+
+    #[test]
+    fn test_write_and_read() {
+        let tmp = NamedTempFile::new().unwrap();
+        let store = hyperv_store(tmp.path());
+
+        store.write("key1", "value1").unwrap();
+        store.write("key2", "value2").unwrap();
+
+        assert_eq!(store.read("key1").unwrap(), Some("value1".to_string()));
+        assert_eq!(store.read("key2").unwrap(), Some("value2".to_string()));
+        assert_eq!(store.read("nonexistent").unwrap(), None);
+    }
+
+    #[test]
+    fn test_read_returns_last_match() {
+        let tmp = NamedTempFile::new().unwrap();
+        let store = hyperv_store(tmp.path());
+
+        store.write("key", "first").unwrap();
+        store.write("key", "second").unwrap();
+        store.write("key", "third").unwrap();
+
+        assert_eq!(store.read("key").unwrap(), Some("third".to_string()));
+    }
+
+    #[test]
+    fn test_entries_deduplicates_with_last_write_wins() {
+        let tmp = NamedTempFile::new().unwrap();
+        let store = hyperv_store(tmp.path());
+
+        store.write("key", "v1").unwrap();
+        store.write("key", "v2").unwrap();
+        store.write("other", "v3").unwrap();
+
+        let entries = store.entries().unwrap();
+        assert_eq!(entries.len(), 2);
+        assert_eq!(entries.get("key"), Some(&"v2".to_string()));
+        assert_eq!(entries.get("other"), Some(&"v3".to_string()));
+    }
+
+    #[test]
+    fn test_entries_raw_preserves_duplicates() {
+        let tmp = NamedTempFile::new().unwrap();
+        let store = hyperv_store(tmp.path());
+
+        store.write("key", "v1").unwrap();
+        store.write("key", "v2").unwrap();
+        store.write("other", "v3").unwrap();
+
+        let raw = store.entries_raw().unwrap();
+        assert_eq!(raw.len(), 3);
+        assert_eq!(raw[0], ("key".to_string(), "v1".to_string()));
+        assert_eq!(raw[1], ("key".to_string(), "v2".to_string()));
+        assert_eq!(raw[2], ("other".to_string(), "v3".to_string()));
+    }
+
+    #[test]
+    fn test_clear_empties_store() {
+        let tmp = NamedTempFile::new().unwrap();
+        let store = hyperv_store(tmp.path());
+
+        store.write("key", "value").unwrap();
+        assert!(tmp.path().metadata().unwrap().len()
> 0);
+
+        store.clear().unwrap();
+        assert_eq!(tmp.path().metadata().unwrap().len(), 0);
+        assert_eq!(store.read("key").unwrap(), None);
+    }
+
+    #[test]
+    fn test_is_stale_false_for_fresh_file() {
+        let tmp = NamedTempFile::new().unwrap();
+        let store = hyperv_store(tmp.path());
+
+        store.write("key", "value").unwrap();
+        assert!(!store.is_stale().unwrap());
+    }
+
+    #[test]
+    fn test_is_stale_false_when_file_missing() {
+        let store = hyperv_store(Path::new("/tmp/nonexistent-kvp-pool"));
+        assert!(!store.is_stale().unwrap());
+    }
+
+    #[test]
+    fn test_pool_is_stale_at_boot_detects_old_file() {
+        let tmp = NamedTempFile::new().unwrap();
+        let store = hyperv_store(tmp.path());
+
+        store.write("key", "value").unwrap();
+        assert!(store.pool_is_stale_at_boot(i64::MAX).unwrap());
+    }
+
+    #[test]
+    fn test_pool_is_stale_at_boot_keeps_new_file() {
+        let tmp = NamedTempFile::new().unwrap();
+        let store = hyperv_store(tmp.path());
+
+        store.write("key", "value").unwrap();
+        assert!(!store.pool_is_stale_at_boot(0).unwrap());
+    }
+
+    #[test]
+    fn test_clear_ok_when_file_missing() {
+        let store = hyperv_store(Path::new("/tmp/nonexistent-kvp-pool"));
+        store.clear().unwrap();
+    }
+
+    #[test]
+    fn test_delete_removes_all_matches() {
+        let tmp = NamedTempFile::new().unwrap();
+        let store = hyperv_store(tmp.path());
+
+        store.write("key", "v1").unwrap();
+        store.write("key", "v2").unwrap();
+        store.write("other", "v3").unwrap();
+
+        assert!(store.delete("key").unwrap());
+        assert_eq!(store.read("key").unwrap(), None);
+        assert_eq!(store.read("other").unwrap(), Some("v3".to_string()));
+
+        let entries = store.entries().unwrap();
+        assert_eq!(entries.len(), 1);
+    }
+
+    #[test]
+    fn test_multi_thread_concurrent_writes() {
+        let tmp = NamedTempFile::new().unwrap();
+        let path = tmp.path().to_path_buf();
+
+        let num_threads: usize = 20;
+        let iterations: usize = 1_000;
+
+        let handles: Vec<_> = (0..num_threads)
+            .map(|tid| {
+                let p = path.clone();
+                std::thread::spawn(move || {
+                    let
store = HyperVKvpStore::new(&p, false).unwrap();
+                    for i in 0..iterations {
+                        let key = format!("thread-{tid}-iter-{i}");
+                        let value = format!("value-{tid}-{i}");
+                        store.write(&key, &value).expect("write failed");
+                    }
+                })
+            })
+            .collect();
+
+        for h in handles {
+            h.join().expect("thread panicked");
+        }
+
+        let store = HyperVKvpStore::new(&path, false).unwrap();
+        let entries = store.entries().unwrap();
+        assert_eq!(entries.len(), num_threads * iterations);
+    }
+}
diff --git a/libazureinit-kvp/src/lib.rs b/libazureinit-kvp/src/lib.rs
new file mode 100644
index 00000000..a248b8ea
--- /dev/null
+++ b/libazureinit-kvp/src/lib.rs
@@ -0,0 +1,175 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+//! `libazureinit-kvp` provides a storage trait and Hyper-V-backed
+//! implementation for KVP pool files.
+//!
+//! - [`KvpStore`]: storage interface used by higher layers.
+//! - [`HyperVKvpStore`]: Hyper-V pool file implementation.
+//! - [`AzureKvpStore`]: Azure-specific wrapper with stricter value limits.
+
+use std::collections::HashMap;
+use std::fmt;
+use std::io;
+
+pub mod azure;
+pub mod hyperv;
+
+/// Errors returned by [`KvpStore`] operations.
+#[derive(Debug)]
+pub enum KvpError {
+    /// The key was empty.
+    EmptyKey,
+    /// The key exceeds the store's maximum key size.
+    KeyTooLarge { max: usize, actual: usize },
+    /// The value exceeds the store's maximum value size.
+    ValueTooLarge { max: usize, actual: usize },
+    /// The key contains a null byte, which is incompatible with the
+    /// on-disk format (null-padded fixed-width fields).
+    KeyContainsNull,
+    /// An underlying I/O error.
+    Io(io::Error),
+}
+
+impl fmt::Display for KvpError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::EmptyKey => write!(f, "KVP key must not be empty"),
+            Self::KeyTooLarge { max, actual } => {
+                write!(f, "KVP key length ({actual}) exceeds maximum ({max})")
+            }
+            Self::ValueTooLarge { max, actual } => {
+                write!(f, "KVP value length ({actual}) exceeds maximum ({max})")
+            }
+            Self::KeyContainsNull => {
+                write!(f, "KVP key must not contain null bytes")
+            }
+            Self::Io(e) => write!(f, "{e}"),
+        }
+    }
+}
+
+impl std::error::Error for KvpError {
+    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+        match self {
+            Self::Io(e) => Some(e),
+            _ => None,
+        }
+    }
+}
+
+impl From<io::Error> for KvpError {
+    fn from(err: io::Error) -> Self {
+        Self::Io(err)
+    }
+}
+
+pub use azure::AzureKvpStore;
+pub use hyperv::HyperVKvpStore;
+
+/// Key-value store that supports Hyper-V KVP semantics while being
+/// generic enough for non-file-backed in-memory implementations
+/// (e.g. tests).
+pub trait KvpStore: Send + Sync {
+    /// Maximum key size in bytes for this store.
+    const MAX_KEY_SIZE: usize;
+    /// Maximum value size in bytes for this store.
+    const MAX_VALUE_SIZE: usize;
+
+    // -- Backend callouts for read/write (required) ---------------------
+
+    /// Backend-specific read implementation.
+    fn backend_read(&self, key: &str) -> Result<Option<String>, KvpError>;
+
+    /// Backend-specific write implementation.
+    fn backend_write(&self, key: &str, value: &str) -> Result<(), KvpError>;
+
+    // -- Required methods (no shared validation wrapper) ---------------
+
+    /// Return all key-value pairs, deduplicated with last-write-wins.
+    fn entries(&self) -> Result<HashMap<String, String>, KvpError>;
+
+    /// Return all raw key-value records without deduplication.
+    ///
+    /// Useful for testing or diagnostic dump commands where the full
+    /// record history is needed.
+    fn entries_raw(&self) -> Result<Vec<(String, String)>, KvpError>;
+
+    /// Remove all records matching `key`.
+    ///
+    /// Returns `true` if at least one record was removed, `false` if
+    /// the key was not found.
+    fn delete(&self, key: &str) -> Result<bool, KvpError>;
+
+    /// Backend-specific clear implementation (empty the store).
+    fn backend_clear(&self) -> Result<(), KvpError>;
+
+    // -- Public API with shared validation ----------------------------
+
+    /// Empty the store, removing all records.
+    fn clear(&self) -> Result<(), KvpError> {
+        self.backend_clear()
+    }
+
+    /// Write a key-value pair into the store.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`KvpError::EmptyKey`] if the key is empty,
+    /// [`KvpError::KeyContainsNull`] if the key contains a null byte,
+    /// [`KvpError::KeyTooLarge`] if the key exceeds [`Self::MAX_KEY_SIZE`],
+    /// [`KvpError::ValueTooLarge`] if the value exceeds
+    /// [`Self::MAX_VALUE_SIZE`], or [`KvpError::Io`] on I/O failure.
+    fn write(&self, key: &str, value: &str) -> Result<(), KvpError> {
+        Self::validate_key(key)?;
+        Self::validate_value(value)?;
+        self.backend_write(key, value)
+    }
+
+    /// Read the value for a given key, returning `None` if absent.
+    ///
+    /// When multiple records share the same key, the most recent value
+    /// is returned (last-write-wins).
+    fn read(&self, key: &str) -> Result<Option<String>, KvpError> {
+        Self::validate_key(key)?;
+        self.backend_read(key)
+    }
+
+    /// Whether the store's data is stale (e.g. predates the current
+    /// boot). Defaults to `false`; file-backed stores can override.
+    fn is_stale(&self) -> Result<bool, KvpError> {
+        Ok(false)
+    }
+
+    // -- Validation helpers -------------------------------------------
+
+    /// Validate a key against common constraints.
+    ///
+    /// Keys must be non-empty, must not contain null bytes (the on-disk
+    /// format uses null-padding), and must not exceed
+    /// [`Self::MAX_KEY_SIZE`] bytes.
+    fn validate_key(key: &str) -> Result<(), KvpError> {
+        if key.is_empty() {
+            return Err(KvpError::EmptyKey);
+        }
+        if key.as_bytes().contains(&0) {
+            return Err(KvpError::KeyContainsNull);
+        }
+        let actual = key.len();
+        let max = Self::MAX_KEY_SIZE;
+        if actual > max {
+            return Err(KvpError::KeyTooLarge { max, actual });
+        }
+        Ok(())
+    }
+
+    /// Validate a value against the store's size limit.
+    fn validate_value(value: &str) -> Result<(), KvpError> {
+        let actual = value.len();
+        let max = Self::MAX_VALUE_SIZE;
+        if actual > max {
+            return Err(KvpError::ValueTooLarge { max, actual });
+        }
+        Ok(())
+    }
+}