diff --git a/doc/libazurekvp.md b/doc/libazurekvp.md index 476bdcb1..cbe972cb 100644 --- a/doc/libazurekvp.md +++ b/doc/libazurekvp.md @@ -80,40 +80,70 @@ For more on how to use these configuration variables, see the [configuration doc ## Practical Usage -### Instrumenting Functions +There are two valid ways to emit KVP data: +- Use the direct `Kvp` client API for explicit event/report writes. +- Use tracing instrumentation (`#[instrument]` and `event!`) with `setup_layers`. -To instrument code with tracing, use the `#[instrument]` attribute on functions: +### Using the KVP Client API -```rust -use tracing::{instrument, Level, event}; +For external callers that want to emit KVP diagnostics directly, use the `Kvp` client: -#[instrument(fields(user_id = ?user.id))] -async fn provision_user(user: User) -> Result<(), Error> { - event!(Level::INFO, "Starting user provisioning"); - - // Function logic - - event!(Level::INFO, "User provisioning completed successfully"); +```rust +use libazureinit::logging::{Kvp, KvpOptions}; +use tracing::Level; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // Simple path: all defaults + let kvp = Kvp::new()?; + kvp.emit("Provisioning started", None)?; // defaults to DEBUG + kvp.emit("Provisioning info event", Some(Level::INFO))?; + kvp.emit_health_report("result=success")?; + kvp.close().await?; + + // Advanced path: override defaults + let custom = Kvp::with_options( + KvpOptions::default() + .vm_id("00000000-0000-0000-0000-000000000001") + .event_prefix("my-service") + .file_path("/var/lib/hyperv/.kvp_pool_1") + .truncate_on_start(true), + )?; + custom.emit("Custom event payload", None)?; + custom.close().await?; Ok(()) } ``` -### Emitting Events +Default behavior for `Kvp::new()`: +- `vm_id`: resolved from platform metadata (`get_vm_id`) +- `event_prefix`: `"azure-init-"` (e.g. `"azure-init-0.1.1"`) +- `file_path`: `/var/lib/hyperv/.kvp_pool_1` +- `truncate_on_start`: `true` + +### Truncation and Locking Behavior + +On startup, KVP performs a stale-data check and may truncate the guest pool file. -To record specific points within a span: +- The truncate path uses file locking to avoid races between clients. +- If truncation lock acquisition is unavailable (another client already holds it), + initialization continues without failing. +- This lock-contention case is expected in multi-client scenarios and is logged. + +### Instrumenting with Tracing + +`azure-init` itself continues to use tracing layers (`setup_layers`) for KVP emission. +To instrument code with tracing, use the `#[instrument]` attribute and `event!`: ```rust -use tracing::{event, Level}; - -fn configure_ssh_keys(user: &str, keys: &[String]) { - event!(Level::INFO, user = user, key_count = keys.len(), "Configuring SSH keys"); - - for (i, key) in keys.iter().enumerate() { - event!(Level::DEBUG, user = user, key_index = i, "Processing SSH key"); - // Process each key - } - - event!(Level::INFO, user = user, "SSH keys configured successfully"); +use tracing::{event, instrument, Level}; + +#[instrument(fields(user_id = ?user.id))] +async fn provision_user(user: User) -> Result<(), Error> { + event!(Level::INFO, "Starting user provisioning"); + // Function logic + event!(Level::INFO, "User provisioning completed successfully"); + Ok(()) } ``` diff --git a/libazureinit/src/kvp.rs b/libazureinit/src/kvp.rs index ae73be50..33d77db0 100644 --- a/libazureinit/src/kvp.rs +++ b/libazureinit/src/kvp.rs @@ -15,7 +15,7 @@ use std::{ fs::{File, OpenOptions}, io::{self, ErrorKind, Write}, os::unix::fs::MetadataExt, - path::Path, + path::{Path, PathBuf}, time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; @@ -23,8 +23,9 @@ use fs2::FileExt; use tracing::{ field::Visit, + info, span::{Attributes, Id}, - Subscriber, + warn, Level, Subscriber, }; use tracing_subscriber::{ @@ -45,7 +46,52 @@ use uuid::Uuid; const HV_KVP_EXCHANGE_MAX_KEY_SIZE: usize = 512; const HV_KVP_EXCHANGE_MAX_VALUE_SIZE: usize = 2048; const HV_KVP_AZURE_MAX_VALUE_SIZE: usize = 1022; -const EVENT_PREFIX: &str = concat!("azure-init-", env!("CARGO_PKG_VERSION")); +/// The default event prefix used when no custom prefix is provided. +pub const EVENT_PREFIX: &str = + concat!("azure-init-", env!("CARGO_PKG_VERSION")); +const DEFAULT_KVP_FILE_PATH: &str = "/var/lib/hyperv/.kvp_pool_1"; + +/// Configuration options for creating a [`Kvp`] client. +#[derive(Clone, Debug)] +pub struct KvpOptions { + pub vm_id: Option, + pub event_prefix: Option, + pub file_path: PathBuf, + pub truncate_on_start: bool, +} + +impl Default for KvpOptions { + fn default() -> Self { + Self { + vm_id: None, + event_prefix: None, + file_path: PathBuf::from(DEFAULT_KVP_FILE_PATH), + truncate_on_start: true, + } + } +} + +impl KvpOptions { + pub fn vm_id>(mut self, vm_id: T) -> Self { + self.vm_id = Some(vm_id.into()); + self + } + + pub fn event_prefix>(mut self, event_prefix: T) -> Self { + self.event_prefix = Some(event_prefix.into()); + self + } + + pub fn file_path>(mut self, file_path: T) -> Self { + self.file_path = file_path.as_ref().to_path_buf(); + self + } + + pub fn truncate_on_start(mut self, truncate_on_start: bool) -> Self { + self.truncate_on_start = truncate_on_start; + self + } +} /// Encapsulates the KVP (Key-Value Pair) tracing infrastructure. /// @@ -61,9 +107,45 @@ pub struct Kvp { /// KVP data to the file. The caller can use this handle to wait for /// the writer to finish. pub(crate) writer: JoinHandle>, + shutdown: Option, } impl Kvp { + /// Create a new KVP client with sensible defaults. + pub fn new() -> Result { + Self::with_options(KvpOptions::default()) + } + + /// Create a new KVP client with explicit options. + pub fn with_options(options: KvpOptions) -> Result { + let vm_id = match options.vm_id { + Some(vm_id) => vm_id, + None => crate::get_vm_id() + .ok_or_else(|| anyhow::anyhow!("Failed to resolve VM ID"))?, + }; + let event_prefix = options + .event_prefix + .unwrap_or_else(|| EVENT_PREFIX.to_string()); + let shutdown = CancellationToken::new(); + let mut kvp = Self::new_internal( + options.file_path, + &vm_id, + &event_prefix, + shutdown.clone(), + options.truncate_on_start, + )?; + kvp.shutdown = Some(shutdown); + Ok(kvp) + } + + /// Opens a KVP guest pool file for appending. + /// + /// All callers that need a file handle for KVP writes should go through + /// this method so that the open-mode is consistent everywhere. + fn open_kvp_file(path: &Path) -> io::Result { + OpenOptions::new().append(true).create(true).open(path) + } + /// Creates a new `Kvp` instance, spawning a background task for writing /// KVP telemetry data to a file. /// @@ -72,17 +154,18 @@ impl Kvp { /// - It creates an unbounded channel for passing encoded KVP data from the /// tracing layer to the writer task. /// - It spawns the `kvp_writer` task, which listens for data and shutdown signals. - pub(crate) fn new( + pub(crate) fn new_internal( file_path: std::path::PathBuf, vm_id: &str, + event_prefix: &str, graceful_shutdown: CancellationToken, + truncate_on_start: bool, ) -> Result { - truncate_guest_pool_file(&file_path)?; + if truncate_on_start { + truncate_guest_pool_file(&file_path)?; + } - let file = OpenOptions::new() - .append(true) - .create(true) - .open(&file_path)?; + let file = Self::open_kvp_file(&file_path)?; let (events_tx, events_rx): ( UnboundedSender>, @@ -96,11 +179,47 @@ impl Kvp { tracing_layer: EmitKVPLayer { events_tx, vm_id: vm_id.to_string(), + event_prefix: event_prefix.to_string(), }, writer, + shutdown: None, }) } + /// Emit a KVP event. Defaults to `DEBUG` level if none provided. + pub fn emit( + &self, + message: &str, + level: Option, + ) -> Result<(), anyhow::Error> { + let level = level.unwrap_or(Level::DEBUG); + self.tracing_layer.handle_kvp_operation( + level.as_str(), + "kvp_emit", + &Uuid::new_v4().to_string(), + message, + ); + Ok(()) + } + + /// Emit a provisioning report (`PROVISIONING_REPORT`) entry. + pub fn emit_health_report>( + &self, + report: T, + ) -> Result<(), anyhow::Error> { + self.tracing_layer.emit_health_report(report.as_ref()); + Ok(()) + } + + /// Gracefully stop the writer and flush queued KVP data. + pub async fn close(self) -> Result<(), anyhow::Error> { + if let Some(shutdown) = self.shutdown { + shutdown.cancel(); + } + self.writer.await??; + Ok(()) + } + /// The background task that writes encoded KVP data to a file. /// /// This asynchronous function runs in a loop, waiting for two events: @@ -167,7 +286,10 @@ impl Kvp { /// # Arguments /// * `file` - A mutable reference to the file to write to. /// * `kvps` - A slice of encoded KVP messages to write. - fn write_kvps(file: &mut File, kvps: &[Vec]) -> io::Result<()> { + pub(crate) fn write_kvps( + file: &mut File, + kvps: &[Vec], + ) -> io::Result<()> { FileExt::lock_exclusive(file).map_err(|e| { io::Error::other(format!("Failed to lock KVP file: {e}")) })?; @@ -254,6 +376,7 @@ impl Visit for StringVisitor<'_> { pub struct EmitKVPLayer { events_tx: UnboundedSender>, vm_id: String, + event_prefix: String, } impl EmitKVPLayer { @@ -275,12 +398,23 @@ impl EmitKVPLayer { span_id: &str, event_value: &str, ) { - let event_key = - generate_event_key(&self.vm_id, event_level, event_name, span_id); + let event_key = generate_event_key( + &self.event_prefix, + &self.vm_id, + event_level, + event_name, + span_id, + ); let encoded_kvp = encode_kvp_item(&event_key, event_value); let encoded_kvp_flattened: Vec = encoded_kvp.concat(); self.send_event(encoded_kvp_flattened); } + + /// Sends a provisioning report directly into the writer queue. + pub(crate) fn emit_health_report(&self, report: &str) { + let msg = encode_kvp_item("PROVISIONING_REPORT", report).concat(); + self.send_event(msg); + } } impl Layer for EmitKVPLayer @@ -425,19 +559,24 @@ where } } -/// Generates a unique event key by combining the event level, name, and span ID. +/// Generates a unique event key by combining the event prefix, VM ID, level, +/// name, and span ID. /// /// # Arguments +/// * `event_prefix` - A prefix identifying the emitting application or library +/// (e.g., `"azure-init-0.1.1"` or `"my-library-2.0"`). +/// * `vm_id` - The unique identifier for the VM. /// * `event_level` - The logging level (e.g., "INFO", "DEBUG"). /// * `event_name` - The name of the event. /// * `span_id` - A unique identifier for the span. fn generate_event_key( + event_prefix: &str, vm_id: &str, event_level: &str, event_name: &str, span_id: &str, ) -> String { - format!("{EVENT_PREFIX}|{vm_id}|{event_level}|{event_name}|{span_id}") + format!("{event_prefix}|{vm_id}|{event_level}|{event_name}|{span_id}") } /// Encodes a key-value pair (KVP) into one or more byte slices. If the value @@ -452,7 +591,7 @@ fn generate_event_key( /// # Arguments /// * `key` - The key as a string slice. /// * `value` - The value associated with the key. -fn encode_kvp_item(key: &str, value: &str) -> Vec> { +pub(crate) fn encode_kvp_item(key: &str, value: &str) -> Vec> { let key_buf = key .as_bytes() .iter() @@ -532,37 +671,55 @@ pub fn decode_kvp_item( } /// Truncates the guest pool KVP file if it contains stale data (i.e., data -/// older than the system's boot time). Logs whether the file was truncated -/// or no action was needed. +/// older than the system's boot time). +/// +/// An exclusive `flock` is held while checking metadata and truncating so +/// that concurrent processes don't race on the same check-then-truncate +/// sequence. fn truncate_guest_pool_file(kvp_file: &Path) -> Result<(), anyhow::Error> { let boot_time = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() - get_uptime().as_secs(); - match kvp_file.metadata() { - Ok(metadata) => { - if metadata.mtime() < boot_time as i64 { - OpenOptions::new() - .write(true) - .truncate(true) - .open(kvp_file)?; - println!("Truncated the KVP file due to stale data."); - } else { - println!( - "File has been truncated since boot, no action taken." - ); - } - } + // Try to open the file; if it doesn't exist there is nothing to truncate. + let file = match OpenOptions::new().read(true).write(true).open(kvp_file) { + Ok(f) => f, Err(ref e) if e.kind() == ErrorKind::NotFound => { println!("File not found: {kvp_file:?}"); return Ok(()); } Err(e) => { return Err(anyhow::Error::from(e) - .context("Failed to access file metadata")); + .context("Failed to open KVP file for truncation")); } + }; + + // Hold an exclusive lock for the metadata-check + truncate window so + // that two concurrent callers cannot both decide the file is stale and + // truncate data the other has already written. + if let Err(e) = FileExt::try_lock_exclusive(&file) { + warn!( + "Could not acquire KVP truncation lock; assuming another client is handling truncation: {}", + e + ); + return Ok(()); } - Ok(()) + let result = (|| -> Result<(), anyhow::Error> { + let metadata = file.metadata()?; + if metadata.mtime() < boot_time as i64 { + file.set_len(0)?; + println!("Truncated the KVP file due to stale data."); + } else { + println!("File has been truncated since boot, no action taken."); + info!("KVP file already fresh since boot; no truncation needed."); + } + Ok(()) + })(); + + // Always release the lock, even if the inner operation failed. + let _ = FileExt::unlock(&file); + + result } /// Retrieves the system's uptime using the `sysinfo` crate, returning the duration @@ -686,9 +843,14 @@ mod tests { let test_vm_id = "00000000-0000-0000-0000-000000000001"; let graceful_shutdown = CancellationToken::new(); - let kvp = - Kvp::new(temp_path.clone(), test_vm_id, graceful_shutdown.clone()) - .expect("Failed to create Kvp"); + let kvp = Kvp::new_internal( + temp_path.clone(), + test_vm_id, + EVENT_PREFIX, + graceful_shutdown.clone(), + true, + ) + .expect("Failed to create Kvp"); let subscriber = Registry::default().with(kvp.tracing_layer); let default_guard = tracing::subscriber::set_default(subscriber); @@ -806,6 +968,81 @@ mod tests { } } + #[tokio::test] + async fn test_truncate_lock_contention_is_non_fatal() { + const LOCK_ENV_PATH: &str = "__KVP_TRUNCATION_LOCK_PATH"; + + // Child path: hold an exclusive lock briefly, then exit. + if let Ok(path) = std::env::var(LOCK_ENV_PATH) { + let lock_file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(false) + .open(&path) + .expect("Child: open lock file"); + FileExt::lock_exclusive(&lock_file).expect("Child: acquire lock"); + std::thread::sleep(std::time::Duration::from_millis(800)); + return; + } + + let temp_file = + NamedTempFile::new().expect("Failed to create tempfile"); + let temp_path = temp_file.path().to_path_buf(); + std::fs::write(&temp_path, "stale-ish data") + .expect("Failed to seed temp file"); + + let test_exe = std::env::current_exe() + .expect("Failed to determine test executable path"); + + let mut child = std::process::Command::new(&test_exe) + .env(LOCK_ENV_PATH, temp_path.to_str().unwrap()) + .arg("--exact") + .arg("kvp::tests::test_truncate_lock_contention_is_non_fatal") + .arg("--nocapture") + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .expect("Failed to spawn child lock holder"); + + // Give the child a moment to acquire the lock. + std::thread::sleep(std::time::Duration::from_millis(150)); + + let graceful_shutdown = CancellationToken::new(); + let start = std::time::Instant::now(); + let kvp = Kvp::new_internal( + temp_path.clone(), + "00000000-0000-0000-0000-000000000003", + EVENT_PREFIX, + graceful_shutdown.clone(), + true, + ); + let elapsed = start.elapsed(); + + assert!( + kvp.is_ok(), + "Kvp initialization should not fail under truncation lock contention" + ); + assert!( + elapsed < std::time::Duration::from_millis(500), + "Kvp initialization should return quickly under lock contention, took {:?}", + elapsed + ); + + graceful_shutdown.cancel(); + kvp.expect("Kvp should initialize") + .writer + .await + .expect("KVP writer task panicked") + .expect("KVP writer task returned an IO error"); + + let status = child.wait().expect("Failed to wait on child"); + assert!( + status.success(), + "Child lock holder exited with failure: {status}" + ); + } + #[test] fn test_encode_kvp_item_value_length() { let key = "test_key"; @@ -868,10 +1105,12 @@ mod tests { let graceful_shutdown = CancellationToken::new(); let emit_kvp_layer = if kvp_enabled { Some( - Kvp::new( + Kvp::new_internal( temp_path.clone(), test_vm_id, + EVENT_PREFIX, graceful_shutdown.clone(), + true, ) .expect("Failed to create Kvp") .tracing_layer, @@ -899,4 +1138,135 @@ mod tests { println!("KVP file is empty as expected because kvp_diagnostics is disabled."); } + + /// Helper: verify that a file contains exactly `expected` well-formed KVP + /// records (each `HV_KVP_EXCHANGE_MAX_KEY_SIZE + HV_KVP_EXCHANGE_MAX_VALUE_SIZE` + /// bytes). + fn assert_kvp_record_count(path: &std::path::Path, expected: usize) { + let contents = std::fs::read(path).expect("Failed to read KVP file"); + let record_size = + HV_KVP_EXCHANGE_MAX_KEY_SIZE + HV_KVP_EXCHANGE_MAX_VALUE_SIZE; + + assert_eq!( + contents.len() % record_size, + 0, + "File size ({}) is not a multiple of the record size ({record_size})", + contents.len() + ); + + let actual = contents.len() / record_size; + assert_eq!( + actual, expected, + "Expected {expected} KVP records but found {actual}" + ); + + // Validate every record is decodable. + for i in 0..actual { + let start = i * record_size; + let end = start + record_size; + decode_kvp_item(&contents[start..end]) + .unwrap_or_else(|e| panic!("Record {i} failed to decode: {e}")); + } + } + + /// 20 threads × 10,000 iterations writing to the same file via separate FDs. + #[test] + fn test_multi_thread_kvp_concurrent_writes() { + let temp_file = + NamedTempFile::new().expect("Failed to create tempfile"); + let temp_path = temp_file.path().to_path_buf(); + + let num_threads: usize = 20; + let iterations: usize = 10_000; + + let handles: Vec<_> = (0..num_threads) + .map(|tid| { + let path = temp_path.clone(); + std::thread::spawn(move || { + let mut file = Kvp::open_kvp_file(&path) + .expect("Failed to open KVP file"); + + for i in 0..iterations { + let key = format!("thread-{tid}-iter-{i}"); + let value = format!("value-{tid}-{i}"); + let encoded = encode_kvp_item(&key, &value).concat(); + Kvp::write_kvps(&mut file, &[encoded]) + .expect("write_kvps failed"); + } + }) + }) + .collect(); + + for h in handles { + h.join().expect("Thread panicked"); + } + + let expected_records = num_threads * iterations; + assert_kvp_record_count(&temp_path, expected_records); + println!( + "Multi-thread test passed: {expected_records} records verified." + ); + } + + /// 10 child processes × 10,000 iterations writing to the same file. + /// + /// When the env var `__KVP_CHILD_WORKER` is set the process acts as a + /// worker (encode + write); otherwise it orchestrates the children. + #[test] + fn test_multi_process_kvp_concurrent_writes() { + let num_processes: usize = 10; + let iterations: usize = 10_000; + + // --- Child worker path --- + if let Ok(path) = std::env::var("__KVP_CHILD_WORKER_PATH") { + let pid = std::process::id(); + let mut file = Kvp::open_kvp_file(Path::new(&path)) + .expect("Child: failed to open KVP file"); + + for i in 0..iterations { + let key = format!("proc-{pid}-iter-{i}"); + let value = format!("value-{pid}-{i}"); + let encoded = encode_kvp_item(&key, &value).concat(); + Kvp::write_kvps(&mut file, &[encoded]) + .expect("Child: write_kvps failed"); + } + return; // done – the parent will verify the file. + } + + // --- Parent orchestrator path --- + let temp_file = + NamedTempFile::new().expect("Failed to create tempfile"); + let temp_path = temp_file.path().to_path_buf(); + + let test_exe = std::env::current_exe() + .expect("Failed to determine test executable path"); + + let children: Vec<_> = (0..num_processes) + .map(|_| { + std::process::Command::new(&test_exe) + .env("__KVP_CHILD_WORKER_PATH", temp_path.to_str().unwrap()) + .arg("--exact") + .arg("kvp::tests::test_multi_process_kvp_concurrent_writes") + .arg("--nocapture") + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .expect("Failed to spawn child process") + }) + .collect(); + + for mut child in children { + let status = child.wait().expect("Failed to wait on child"); + assert!( + status.success(), + "Child process exited with failure: {status}" + ); + } + + let expected_records = num_processes * iterations; + assert_kvp_record_count(&temp_path, expected_records); + println!( + "Multi-process test passed: {expected_records} records verified." + ); + } } diff --git a/libazureinit/src/logging.rs b/libazureinit/src/logging.rs index fb5c543a..1152e048 100644 --- a/libazureinit/src/logging.rs +++ b/libazureinit/src/logging.rs @@ -11,12 +11,12 @@ use tracing::{event, Level, Subscriber}; use tracing_opentelemetry::OpenTelemetryLayer; use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::{ - filter::Filtered, fmt, layer::SubscriberExt, registry::LookupSpan, - EnvFilter, Layer, Registry, + fmt, layer::SubscriberExt, EnvFilter, Layer, Registry, }; use crate::config::Config; -use crate::kvp::{EmitKVPLayer, Kvp as KvpInternal}; +pub use crate::kvp::{Kvp, KvpOptions}; +use crate::kvp::{Kvp as KvpInternal, EVENT_PREFIX}; pub type LoggingSetup = ( Box, @@ -121,99 +121,6 @@ fn get_kvp_filter( } } -// Public KVP wrapper API for library consumers -struct KvpLayer(Filtered); - -/// Emit tracing data to the Hyper-V KVP. -/// -/// ## KVP Tracing Configuration -/// -/// The KVP tracing layer's filter can be configured at runtime by setting the -/// `AZURE_INIT_KVP_FILTER` environment variable. This allows any application -/// using this library to override the default filter and control which traces -/// are sent to the KVP pool. -/// -/// The value of the variable must be a string that follows the syntax for -/// `tracing_subscriber::EnvFilter`, which is a comma-separated list of -/// logging directives. For example: `warn,my_crate=debug` or `info,my_crate::api=trace`. -/// See `config.rs` for more details. -/// -/// The filter can also be configured via the `kvp_filter` field in the `Config` struct. -/// **Precedence**: Environment variable > Config field > Default filter. -/// If neither is set, a default filter tailored for `azure-init` (WARN level + specific modules) is used. -/// -/// If an invalid filter string is provided (via env or config), a warning is logged -/// and the default filter is used instead. -/// -/// # Example -/// -/// ```no_run -/// # use libazureinit::logging::Kvp; -/// use tracing_subscriber::layer::SubscriberExt; -/// -/// # #[tokio::main] -/// # async fn main() -> anyhow::Result<()> { -/// let mut kvp = Kvp::new("a-unique-id")?; -/// let registry = tracing_subscriber::Registry::default().with(kvp.layer()); -/// -/// // When it's time to shut down, doing this ensures all writes are flushed -/// kvp.halt().await?; -/// # Ok(()) -/// # } -/// ``` -pub struct Kvp { - layer: Option>, - /// The `JoinHandle` for the background task responsible for writing - /// KVP data to the file. The caller can use this handle to wait for - /// the writer to finish. - writer: JoinHandle>, - shutdown: CancellationToken, -} - -impl LookupSpan<'lookup>> Kvp { - /// Create a new tracing layer for KVP. - /// - /// Refer to [`libazureinit::get_vm_id`] to retrieve the VM's unique identifier. - pub fn new>(vm_id: T) -> Result { - let shutdown = CancellationToken::new(); - let inner = KvpInternal::new( - std::path::PathBuf::from("/var/lib/hyperv/.kvp_pool_1"), - vm_id.as_ref(), - shutdown.clone(), - )?; - - let kvp_filter = get_kvp_filter(None)?; - let layer = Some(KvpLayer(inner.tracing_layer.with_filter(kvp_filter))); - - Ok(Self { - layer, - writer: inner.writer, - shutdown, - }) - } - - /// Get a tracing [`Layer`] to use with a [`Registry`]. - /// - /// # Panics if this function is called more than once. - pub fn layer(&mut self) -> Filtered { - assert!( - self.layer.is_some(), - "Kvp::layer cannot be called multiple times!" - ); - self.layer.take().unwrap().0 - } - - /// Gracefully shut down the KVP writer. - /// - /// This will stop new KVP logs from being queued and wait for all pending writes to the KVP - /// pool to complete. After this returns, no further logs will be written to KVP. - pub async fn halt(self) -> Result<(), anyhow::Error> { - self.shutdown.cancel(); - self.writer.await??; - Ok(()) - } -} - /// Builds a `tracing` subscriber that can optionally write azure-init.log /// to a specific location if `Some(&Config)` is provided. /// @@ -241,10 +148,14 @@ pub fn setup_layers( .telemetry .kvp_diagnostics { - match KvpInternal::new( - std::path::PathBuf::from("/var/lib/hyperv/.kvp_pool_1"), + // Preserve existing azure-init behavior for subscriber wiring. + let options = KvpOptions::default(); + match KvpInternal::new_internal( + options.file_path, vm_id, + EVENT_PREFIX, graceful_shutdown, + options.truncate_on_start, ) { Ok(kvp) => { let layer = kvp.tracing_layer.with_filter(kvp_filter); @@ -633,4 +544,29 @@ mod tests { assert!(!stderr_contents.contains("This is a warn message")); assert!(stderr_contents.contains("This is an error message")); } + + #[tokio::test] + async fn test_kvp_client_emit_and_close_writes_data() { + let temp_file = + NamedTempFile::new().expect("Failed to create tempfile"); + let temp_path = temp_file.path().to_path_buf(); + + let options = KvpOptions::default() + .vm_id("00000000-0000-0000-0000-000000000099") + .event_prefix("kvp-client-test") + .file_path(&temp_path); + + let kvp = Kvp::with_options(options).expect("create kvp client"); + kvp.emit("hello-world", None).expect("emit event"); + kvp.emit_health_report("result=success") + .expect("emit report"); + kvp.close().await.expect("close kvp client"); + + let bytes = std::fs::read(&temp_path).expect("read kvp file"); + assert!(!bytes.is_empty(), "Expected KVP file to contain data"); + + let contents = String::from_utf8_lossy(&bytes); + assert!(contents.contains("hello-world")); + assert!(contents.contains("PROVISIONING_REPORT")); + } }